Load a CSV file
Load a simple CSV file
This guide shows how to load a CSV file into a store using a CSV source. For a description of the CSV source and of advanced options not covered in this guide, refer to the CSV source documentation.
Goal
The goal is to load the first two columns of the "trades.csv" file into the Trades store.
File content
id,amount,currency,trader,traderEntity,bank
1,12394,chf,E F,E3,BNP Paribas
2,13149,gbp,A B,E4,BNP Paribas
Expected Trades content
We want to load the first two columns into the Trades store.
id | amount |
---|---|
1 | 12394 |
2 | 13149 |
Setup
First, add the com.activeviam.source:csv-source artifact as a dependency of your project.
Build a source
The first step in loading a CSV file into a datastore is to build a CSV source.
final ICsvSource<Path> csvSource = ICsvSource.builderForLocalFiles().build();
A CSV source is a collection of CSV topics. A CSV topic references a CSV file or a directory containing CSV files, and is associated with a parser configuration. Let's build this parser configuration.
final CsvParserConfiguration parserConfig =
    CsvParserConfiguration.builder()
        .withColumnNames(columnNames) // Names of the columns to load, defined elsewhere.
        .separator(',') // Default separator is ',', you can customize this.
        .numberSkippedLines(1) // Skip one line, e.g. if your file has a header.
        .charset(StandardCharsets.UTF_8) // You can override the default charset.
        // Accept lines with more columns than expected and ignore the extra columns.
        .acceptOverflowingLines(true)
        .build();
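As a rough illustration of what these options mean, here is a JDK-only sketch of a parser driven by similar settings: column names, separator, number of skipped lines, charset and overflow policy.

SimpleCsvParser is hypothetical. Its behaviour is an assumption made for this sketch and not the behaviour of CsvParserConfiguration. It splits lines naively, so quoted fields are not handled. It also throws on an overflowing line when overflowing lines are not accepted.

With the two column names id and amount and overflowing lines accepted, the sketch yields the expected two-column content of "trades.csv".

```java
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.regex.Pattern;

// Hypothetical parser driven by the same kind of options as the configuration above.
// Not the CSV source implementation: throwing on overflowing lines is an assumption.
final class SimpleCsvParser {
    private final List<String> columnNames;
    private final char separator;
    private final int numberSkippedLines;
    private final Charset charset;
    private final boolean acceptOverflowingLines;

    SimpleCsvParser(List<String> columnNames, char separator, int numberSkippedLines,
            Charset charset, boolean acceptOverflowingLines) {
        this.columnNames = List.copyOf(columnNames);
        this.separator = separator;
        this.numberSkippedLines = numberSkippedLines;
        this.charset = charset;
        this.acceptOverflowingLines = acceptOverflowingLines;
    }

    // Decodes the bytes with the configured charset, skips the first lines, splits each
    // remaining line on the separator and keeps at most one value per declared column.
    List<List<String>> parse(byte[] content) {
        String[] lines = new String(content, charset).split("\\R");
        String separatorRegex = Pattern.quote(String.valueOf(separator));
        List<List<String>> rows = new ArrayList<>();
        for (int i = numberSkippedLines; i < lines.length; i++) {
            if (lines[i].isEmpty()) {
                continue; // ignore blank lines
            }
            String[] cells = lines[i].split(separatorRegex, -1);
            if (cells.length > columnNames.size() && !acceptOverflowingLines) {
                throw new IllegalArgumentException("Line " + (i + 1) + " has too many columns");
            }
            int kept = Math.min(cells.length, columnNames.size());
            rows.add(List.copyOf(Arrays.asList(cells).subList(0, kept)));
        }
        return rows;
    }

    public static void main(String[] args) {
        String trades = "id,amount,currency,trader,traderEntity,bank\n"
                + "1,12394,chf,E F,E3,BNP Paribas\n"
                + "2,13149,gbp,A B,E4,BNP Paribas\n";
        SimpleCsvParser parser = new SimpleCsvParser(
                List.of("id", "amount"), ',', 1, StandardCharsets.UTF_8, true);
        // Prints [[1, 12394], [2, 13149]]
        System.out.println(parser.parse(trades.getBytes(StandardCharsets.UTF_8)));
    }
}
```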
Then build a topic with this parser configuration.
This topic loads a single file into the Trades store.
Because the mapping is this simple, the store name is used as the topic name.
Finally, the topic is registered in the source.
// The topic will be closed by the source.
final SingleFileCsvTopic csvTopic =
    new SingleFileCsvTopic(topicName, parserConfig, filePath, null);
csvSource.addTopic(csvTopic);
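The topic above references a single file, but as noted earlier a topic can also reference a directory containing CSV files. This JDK-only sketch shows one way such a reference could be expanded into the files to read.

TopicFiles and its selection rule are hypothetical. The rule keeps regular files whose name ends in .csv and sorts them by path. The real topic classes may select files differently.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

// Hypothetical helper: expands a topic reference (a CSV file or a directory of CSV files)
// into the list of files to read. Illustration only; the real topic classes may use other rules.
final class TopicFiles {

    static List<Path> resolveFiles(Path reference) {
        if (Files.isRegularFile(reference)) {
            return List.of(reference); // single-file topic
        }
        // Directory topic: every regular file whose name ends with ".csv", in a stable order.
        try (Stream<Path> entries = Files.list(reference)) {
            return entries
                    .filter(Files::isRegularFile)
                    .filter(path -> path.getFileName().toString().endsWith(".csv"))
                    .sorted()
                    .collect(Collectors.toList());
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```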
Load into the datastore
The second step is to create a channel and use the source to fetch the data into the datastore.
A channel is the link between a topic and a store.
You can build a channel factory as follows:
final IStoreMessageChannelFactory<IFileInfo<Path>, ILineReader> channelFactory =
new CsvMessageChannelFactory<>(csvSource, datastore);
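Conceptually, a channel routes what one topic produces into one store. The JDK-only sketch below models that link with a hypothetical SketchChannel, using an in-memory map as a stand-in for the datastore. It is not how CsvMessageChannelFactory is implemented.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical model of a channel: it links one topic to one store and forwards the
// lines parsed from the topic into that store. It is not the CsvMessageChannelFactory API.
final class SketchChannel {
    private final String topicName;
    private final String storeName;
    private final Map<String, List<List<String>>> stores; // store name -> stored rows

    SketchChannel(String topicName, String storeName, Map<String, List<List<String>>> stores) {
        this.topicName = topicName;
        this.storeName = storeName;
        this.stores = stores;
    }

    String topicName() {
        return topicName;
    }

    // Appends the lines parsed from the topic to the target store.
    void publish(List<List<String>> parsedLines) {
        stores.computeIfAbsent(storeName, name -> new ArrayList<>()).addAll(parsedLines);
    }

    public static void main(String[] args) {
        Map<String, List<List<String>>> stores = new HashMap<>();
        // As in this guide, the topic is named after the store it feeds.
        SketchChannel channel = new SketchChannel("Trades", "Trades", stores);
        channel.publish(List.of(List.of("1", "12394"), List.of("2", "13149")));
        System.out.println(stores); // {Trades=[[1, 12394], [2, 13149]]}
    }
}
```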
Finally, use the source to process the channel: the source reads the topic content through the channel and streams it into the datastore.
There are two ways of doing so.
Using the Fetch utility:
final Fetch<IFileInfo<Path>, ILineReader> fetch = new Fetch<>(channelFactory);
fetch.fetch(csvSource);
Or manually, by creating the channel yourself and running the fetch inside a datastore transaction:
final IStoreMessageChannel<IFileInfo<Path>, ILineReader> channel =
channelFactory.createChannel(TOPIC_NAME, STORE_NAME);
datastore.getTransactionManager().startTransaction();
csvSource.fetch(List.of(channel));
datastore.getTransactionManager().commitTransaction();
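The manual variant makes the transaction boundaries explicit. The following sketch uses a hypothetical in-memory store, not the datastore's transaction manager, to show the pattern. Rows fetched inside a transaction are staged and only become visible on commit.

Rolling back when loading fails is an addition of this sketch. The snippet in this guide only shows the successful path.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Objects;

// Hypothetical in-memory store illustrating transaction boundaries; not the datastore API.
// Rows added during a transaction are staged and only become visible on commit.
final class SketchTransactionalStore {
    private final List<List<String>> committed = new ArrayList<>();
    private List<List<String>> staged; // null when no transaction is in progress

    void startTransaction() {
        if (staged != null) throw new IllegalStateException("Transaction already started");
        staged = new ArrayList<>();
    }

    void add(List<String> row) {
        if (staged == null) throw new IllegalStateException("No transaction in progress");
        staged.add(Objects.requireNonNull(row, "row"));
    }

    void commitTransaction() {
        if (staged == null) throw new IllegalStateException("No transaction in progress");
        committed.addAll(staged);
        staged = null;
    }

    void rollbackTransaction() {
        staged = null; // discard everything added since startTransaction()
    }

    List<List<String>> rows() {
        return List.copyOf(committed);
    }

    // Same shape as the manual fetch above: start, load everything, commit.
    // On failure, the partial load is rolled back instead of committed.
    static void fetchInTransaction(SketchTransactionalStore store, List<List<String>> lines) {
        store.startTransaction();
        try {
            for (List<String> line : lines) {
                store.add(line);
            }
            store.commitTransaction();
        } catch (RuntimeException e) {
            store.rollbackTransaction();
            throw e;
        }
    }
}
```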
Be careful: ICsvSource is an AutoCloseable resource.
Closing a source closes all the registered topics.
csvSource.close();
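Because the source is AutoCloseable, a try-with-resources statement is a natural way to make sure it is closed even if loading fails. The JDK-only sketch below shows the behaviour described above: closing the source also closes the topics registered in it.

SketchSource and SketchTopic are hypothetical stand-ins, not ICsvSource or SingleFileCsvTopic.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-ins for a source and its topics; they only illustrate the
// close propagation described above and are not ICsvSource or SingleFileCsvTopic.
final class SketchSource implements AutoCloseable {

    static final class SketchTopic implements AutoCloseable {
        final String name;
        boolean closed;

        SketchTopic(String name) {
            this.name = name;
        }

        @Override
        public void close() {
            closed = true;
        }
    }

    private final List<SketchTopic> topics = new ArrayList<>();

    void addTopic(SketchTopic topic) {
        topics.add(topic);
    }

    // Closing the source closes every registered topic.
    @Override
    public void close() {
        for (SketchTopic topic : topics) {
            topic.close();
        }
    }
}
```

Because close() declares no checked exception, the try-with-resources block needs no catch clause. The source, and with it every registered topic, is closed as soon as the block exits.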
Load with a calculated column
Goal
The goal is still to load the same file into the Trades store, but we also want to compute a new column from the amount column.
Expected Trades content
id | amount | negatedAmount |
---|---|---|
1 | 12394 | -12394 |
2 | 13149 | -13149 |
Load into the datastore
To add a new column based on existing ones, use an IColumnCalculator. Here, the calculator extends AColumnCalculator and negates the value of the amount column.
final AColumnCalculator<ILineReader> calculator =
    new AColumnCalculator<>(negatedAmountField) {
      @Override
      public Object compute(final IColumnCalculationContext<ILineReader> context) {
        // Read the value of the amount column for the current line.
        final Long amountValue = (Long) context.getValue(AMOUNT_FIELD);
        // negateExact throws an ArithmeticException on overflow (Long.MIN_VALUE).
        return Math.negateExact(amountValue);
      }
    };
This calculator must be registered in the channel factory.
channelFactory.setCalculatedColumns(STORE_NAME, List.of(calculator));
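The sketch below models what registering calculated columns per store amounts to. It uses hypothetical JDK-only types rather than the AColumnCalculator API. Each registered calculator derives a new field from the values already parsed for a row.

Typing amount as Long is an assumption carried over from the cast in the calculator above.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

// Hypothetical model of calculated columns: each store name maps to a list of
// (field name, function of the parsed row). Not the AColumnCalculator API.
final class SketchCalculatedColumns {

    static final class Calculator {
        final String fieldName;
        final Function<Map<String, Object>, Object> compute;

        Calculator(String fieldName, Function<Map<String, Object>, Object> compute) {
            this.fieldName = fieldName;
            this.compute = compute;
        }
    }

    // Same logic as the calculator above; Math.negateExact throws ArithmeticException
    // for Long.MIN_VALUE, whose negation does not fit in a long.
    static final Calculator NEGATED_AMOUNT =
            new Calculator("negatedAmount", row -> Math.negateExact((Long) row.get("amount")));

    private final Map<String, List<Calculator>> calculatorsByStore = new LinkedHashMap<>();

    void setCalculatedColumns(String storeName, List<Calculator> calculators) {
        calculatorsByStore.put(storeName, List.copyOf(calculators));
    }

    // Returns a copy of the row extended with every calculated column registered for the store.
    Map<String, Object> apply(String storeName, Map<String, Object> row) {
        Map<String, Object> result = new LinkedHashMap<>(row);
        for (Calculator calculator : calculatorsByStore.getOrDefault(storeName, List.of())) {
            result.put(calculator.fieldName, calculator.compute.apply(result));
        }
        return result;
    }
}
```

Take the row id=1, amount=12394 fed to the Trades store. The sketch adds negatedAmount=-12394, as in the expected table. Rows for stores without registered calculators are left unchanged.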
Then use the source to feed the datastore as usual.
final Fetch<IFileInfo<Path>, ILineReader> fetch = new Fetch<>(channelFactory);
fetch.fetch(csvSource);