Load a CSV file

Load a simple CSV

This guide shows how to load a CSV file into a store using a CSV source. For a description of the CSV source and the advanced options not covered in this guide, refer to the CSV source.

Goal

The goal is to load the first two columns of the "trades.csv" file into the Trades store.

File content

id,amount,currency,trader,traderEntity,bank
1,12394,chf,E F,E3,BNP Paribas
2,13149,gbp,A B,E4,BNP Paribas

Expected Trades content

We want to load the first two columns into the Trades store.

id | amount
1 | 12394
2 | 13149
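
To make the expected result concrete, here is a minimal plain-JDK sketch that derives the two kept columns from the sample file content. It does not use the CSV source API at all: the class name FirstTwoColumnsSketch and its method are hypothetical, and parsing both columns as long is an assumption about the store's field types.

```java
import java.util.ArrayList;
import java.util.List;

/** Plain-JDK illustration only; it does not use the CSV source API. */
class FirstTwoColumnsSketch {

    /** The sample file content from this guide. */
    static final String SAMPLE = String.join("\n",
            "id,amount,currency,trader,traderEntity,bank",
            "1,12394,chf,E F,E3,BNP Paribas",
            "2,13149,gbp,A B,E4,BNP Paribas");

    /** Returns {id, amount} pairs, skipping the header and ignoring the extra columns. */
    static long[][] firstTwoColumns(String csv) {
        List<long[]> rows = new ArrayList<>();
        String[] lines = csv.split("\n");
        for (int i = 1; i < lines.length; i++) { // i = 1 skips the header line
            String[] cells = lines[i].split(",");
            // Assumption: both columns hold integers that fit in a long.
            rows.add(new long[] {Long.parseLong(cells[0]), Long.parseLong(cells[1])});
        }
        return rows.toArray(new long[0][]);
    }

    public static void main(String[] args) {
        for (long[] row : firstTwoColumns(SAMPLE)) {
            System.out.println(row[0] + " | " + row[1]);
        }
    }
}
```

The naive split on ',' is enough for this sample only; it does not handle quoted fields, which is one reason to rely on a real CSV parser.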

Setup

First, we need to add the artifact com.activeviam.source:csv-source as a dependency of our project.

Build a source

The first step to load a CSV file into a datastore is to build a CSV source.

final ICsvSource<Path> csvSource = CsvSourceFactory.create();

A CSV source is a collection of CSV topics. A CSV topic references a CSV file or a directory containing CSV files, and is associated with a parser configuration. Let's build this parser configuration.

final CsvParserConfiguration parserConfig = new CsvParserConfiguration(columnNames);
// The default separator is ','; you can customize it.
parserConfig.setSeparator(',');
// Skip the first line if your file has a header.
parserConfig.setNumberSkippedLines(1);
// You can override the default charset.
parserConfig.setCharset(StandardCharsets.UTF_8);
// Accept lines that have more columns than declared; the supplementary columns are ignored.
parserConfig.setAcceptOverflowingLines(true);

Then build a topic with the parser configuration. This topic loads a single file into the Trades store.
Because the mapping is this simple, the name of the store is used as the topic name. The topic is then registered in the source.

// The topic will be closed by the source.
final SingleFileCsvTopic csvTopic =
    new SingleFileCsvTopic(topicName, parserConfig, filePath, null);
csvSource.addTopic(csvTopic);

Load into datastore

The second step is to create a channel and use the source to fetch the data into the datastore.
A channel is the link between a topic and a store.
You can build a channel factory like this:

final IStoreMessageChannelFactory<IFileInfo<Path>, ILineReader> channelFactory =
    new CsvMessageChannelFactory<>(csvSource, datastore);

Finally, we use the source to process the channel. The source loads the topic content through the channel and streams it into the datastore.
There are two ways of doing so.

Using the fetch utility:

final Fetch<IFileInfo<Path>, ILineReader> fetch = new Fetch<>(channelFactory);
fetch.fetch(csvSource);

Or manually, by managing the transaction yourself:

final IStoreMessageChannel<IFileInfo<Path>, ILineReader> channel =
    channelFactory.createChannel(TOPIC_NAME, STORE_NAME);
datastore.getTransactionManager().startTransaction();
csvSource.fetch(List.of(channel));
datastore.getTransactionManager().commitTransaction();
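
With the manual approach, the start and commit calls bracket the load, so it is worth deciding what happens if the load fails in between. The following is only a sketch of that control flow using a hypothetical stand-in class, DemoTransactionManager, not the datastore's transaction manager. In particular, the rollbackTransaction method here is an assumption made for illustration; check the transaction manager's documentation for the actual way to abandon a transaction.

```java
import java.util.ArrayList;
import java.util.List;

/** Control-flow illustration with stand-in types; it does not use the datastore API. */
class ManualTransactionSketch {

    /** Hypothetical stand-in that only records which calls were made. */
    static final class DemoTransactionManager {
        final List<String> log = new ArrayList<>();
        void startTransaction() { log.add("start"); }
        void commitTransaction() { log.add("commit"); }
        void rollbackTransaction() { log.add("rollback"); } // assumed method, for illustration
    }

    /** Runs the load inside a transaction: commit on success, roll back on failure. */
    static List<String> loadInTransaction(Runnable load) {
        DemoTransactionManager tm = new DemoTransactionManager();
        tm.startTransaction();
        try {
            load.run(); // stands for csvSource.fetch(List.of(channel))
            tm.commitTransaction();
        } catch (RuntimeException e) {
            // A real loader would usually also report or rethrow the failure.
            tm.rollbackTransaction();
        }
        return tm.log;
    }

    public static void main(String[] args) {
        System.out.println(loadInTransaction(() -> { }));
        System.out.println(loadInTransaction(() -> {
            throw new IllegalStateException("bad line");
        }));
    }
}
```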

Be careful: ICsvSource is an AutoCloseable resource and must be closed once it is no longer needed.
Closing a source closes all the registered topics.

csvSource.close();
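
Because the source is AutoCloseable, Java's try-with-resources statement can call close() for you, even when loading throws. The sketch below shows the pattern with hypothetical stand-in classes (DemoSource, DemoTopic) rather than the real ICsvSource and topic types; it only mimics the behavior described above, where closing the source closes its registered topics.

```java
import java.util.ArrayList;
import java.util.List;

/** try-with-resources illustration with stand-in types; it does not use the CSV source API. */
class CloseOrderSketch {

    /** Hypothetical stand-in for a topic; records when it is closed. */
    static final class DemoTopic implements AutoCloseable {
        private final String name;
        private final List<String> events;
        DemoTopic(String name, List<String> events) {
            this.name = name;
            this.events = events;
        }
        @Override
        public void close() {
            events.add("topic closed: " + name);
        }
    }

    /** Hypothetical stand-in for a source that owns the topics registered in it. */
    static final class DemoSource implements AutoCloseable {
        private final List<DemoTopic> topics = new ArrayList<>();
        private final List<String> events;
        DemoSource(List<String> events) {
            this.events = events;
        }
        void addTopic(DemoTopic topic) {
            topics.add(topic);
        }
        @Override
        public void close() {
            for (DemoTopic topic : topics) {
                topic.close(); // closing the source closes every registered topic
            }
            events.add("source closed");
        }
    }

    /** Registers one topic, "loads", and lets try-with-resources close the source. */
    static List<String> run() {
        List<String> events = new ArrayList<>();
        try (DemoSource source = new DemoSource(events)) {
            source.addTopic(new DemoTopic("Trades", events));
            events.add("fetch");
        } // source.close() is called here automatically
        return events;
    }

    public static void main(String[] args) {
        run().forEach(System.out::println);
    }
}
```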

Load with a calculated column

Goal

The goal is still to load the same file into the Trades store, but we want to compute a new column from the amount column.

Expected Trades content

id | amount | negatedAmount
1 | 12394 | -12394
2 | 13149 | -13149

Load into the datastore

To add a new column computed from existing ones, we use an IColumnCalculator.

final AColumnCalculator<ILineReader> calculator =
    new AColumnCalculator<>(negatedAmountField) {
      @Override
      public Object compute(final IColumnCalculationContext<ILineReader> context) {
        final Long amountValue = (Long) context.getValue(AMOUNT_FIELD);
        return Math.negateExact(amountValue);
      }
    };
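
The arithmetic inside compute can be tried in isolation with the JDK alone. The sketch below uses a hypothetical helper class, NegateExactSketch, to show what Math.negateExact does with the sample amounts and with the one long value that cannot be negated. It is independent of the calculator API.

```java
/** Plain-JDK illustration of the calculator's arithmetic. */
class NegateExactSketch {

    /** Same operation as the calculator body: negate, failing loudly on overflow. */
    static long negatedAmount(long amount) {
        return Math.negateExact(amount);
    }

    public static void main(String[] args) {
        System.out.println(negatedAmount(12394L));
        System.out.println(negatedAmount(13149L));
        try {
            // Long.MIN_VALUE has no positive counterpart, so negateExact throws.
            negatedAmount(Long.MIN_VALUE);
        } catch (ArithmeticException e) {
            System.out.println("overflow detected");
        }
    }
}
```

Note that in the calculator above the amount is a boxed Long, so a null amount would fail with a NullPointerException when it is unboxed for Math.negateExact.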

This calculator must be registered in the channel factory.

channelFactory.setCalculatedColumns(STORE_NAME, List.of(calculator));

Then you can use your source to load the data as usual.

final Fetch<IFileInfo<Path>, ILineReader> fetch = new Fetch<>(channelFactory);
fetch.fetch(csvSource);