Load a simple csv
This guide shows how to load a CSV file into a store using a CSV source.
For a description of what the CSV source is, and for advanced options not covered in this guide,
refer to the CSV source documentation.
Goal
The goal is to load the first two columns of the “trades.csv” file into the Trades store.
File content
id,amount,currency,trader,traderEntity,bank
1,12394,chf,E F,E3,BNP Paribas
2,13149,gbp,A B,E4,BNP Paribas
Expected Trades content
We want to load the first two columns into the Trades store:
| id | amount |
|---|---|
| 1 | 12394 |
| 2 | 13149 |
Setup
First, we need to import the artifact com.activeviam.source:csv-source in our project.
Build a source
The first step in loading a CSV file into a datastore is to build a CSV source.
final ICsvSource<Path> csvSource = ICsvSource.builderForLocalFiles().build();
A CSV source is a collection of CSV topics.
A CSV topic is a reference to a CSV file or to a directory containing CSV files.
Each topic is associated with a parser configuration. Let’s build this parser configuration.
final CsvParserConfiguration parserConfig =
    CsvParserConfiguration.builder()
        .withColumnNames(columnNames)
        .separator(',') // Default separator is ','; you can customize this.
        .numberSkippedLines(1) // Skip one line, e.g. if your file has a header.
        .charset(StandardCharsets.UTF_8) // You can override the default charset.
        // Accept lines with more columns than expected and ignore the extra columns.
        .acceptOverflowingLines(true)
        .build();
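To make the effect of these options concrete, here is a minimal plain-Java sketch that mimics them on the sample file. It skips one header line, splits on the separator, and keeps only the expected number of columns, ignoring any overflow. It does not use the Atoti API: ParserOptionsSketch and parse are hypothetical names used for illustration only, and the sketch assumes that columnNames lists just id and amount. The real parser also handles type parsing, among other things, which this sketch leaves out.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.regex.Pattern;

/** Illustration only: mimics the parser options above without the Atoti API. */
final class ParserOptionsSketch {

  /** Splits raw CSV lines into rows holding at most expectedColumns values. */
  static List<String[]> parse(
      final List<String> lines,
      final char separator,
      final int skippedLines,
      final int expectedColumns) {
    final String separatorRegex = Pattern.quote(String.valueOf(separator));
    final List<String[]> rows = new ArrayList<>();
    // Start after the skipped lines (for example a header).
    for (int i = skippedLines; i < lines.size(); i++) {
      final String[] values = lines.get(i).split(separatorRegex, -1);
      // Overflowing line: keep the expected columns and ignore the rest.
      rows.add(Arrays.copyOf(values, Math.min(values.length, expectedColumns)));
    }
    return rows;
  }

  public static void main(final String[] args) {
    final List<String> lines =
        List.of(
            "id,amount,currency,trader,traderEntity,bank",
            "1,12394,chf,E F,E3,BNP Paribas",
            "2,13149,gbp,A B,E4,BNP Paribas");
    for (final String[] row : parse(lines, ',', 1, 2)) {
      System.out.println(String.join(" | ", row));
    }
  }
}
```

With two expected columns, each data line is reduced to its id and amount values, which matches the goal stated at the top of this guide.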
Then build a topic with the parser configuration.
This topic loads a single file into the Trades store.
Because of this simple mapping, the name of the store is used as the topic name.
The topic is then registered in the source.
// Will be closed by the source
final SingleFileCsvTopic csvTopic =
    new SingleFileCsvTopic(topicName, parserConfig, filePath, null);
csvSource.addTopic(csvTopic);
Load into datastore
The second step is to create a channel and use the source to fetch the data into the datastore.
A channel is the link between a topic and a store.
You can build a channel factory like this:
final IStoreMessageChannelFactory<IFileInfo<Path>, ILineReader> channelFactory =
new CsvMessageChannelFactory<>(csvSource, datastore);
Finally, we use the source to process the channel.
The source loads the topic content through the channel and streams it into the datastore.
There are two ways of doing so.
Using the fetch utility:
final Fetch<IFileInfo<Path>, ILineReader> fetch = new Fetch<>(channelFactory);
fetch.fetch(csvSource);
Or in a manual way:
final IStoreMessageChannel<IFileInfo<Path>, ILineReader> channel =
channelFactory.createChannel(TOPIC_NAME, STORE_NAME);
datastore.getTransactionManager().startTransaction();
csvSource.fetch(List.of(channel));
datastore.getTransactionManager().commitTransaction();
Be careful: ICsvSource is an AutoCloseable resource and should be closed once it is no longer needed.
Closing a source closes all the registered topics.
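Because the source is AutoCloseable, a try-with-resources block is a natural way to make sure it gets closed. The sketch below uses hypothetical stand-in classes (DemoSource and DemoTopic, nested in CloseSketch), not the Atoti types, to show the pattern and the cascading close described above.

```java
import java.util.ArrayList;
import java.util.List;

/** Illustration only: stand-in classes, not part of the Atoti API. */
final class CloseSketch {

  /** Hypothetical stand-in for a topic. */
  static final class DemoTopic implements AutoCloseable {
    private boolean closed;

    @Override
    public void close() {
      closed = true;
    }

    boolean isClosed() {
      return closed;
    }
  }

  /** Hypothetical stand-in for a source that owns its registered topics. */
  static final class DemoSource implements AutoCloseable {
    private final List<DemoTopic> topics = new ArrayList<>();

    void addTopic(final DemoTopic topic) {
      topics.add(topic);
    }

    @Override
    public void close() {
      // Closing the source closes every registered topic.
      topics.forEach(DemoTopic::close);
    }
  }

  public static void main(final String[] args) {
    final DemoTopic topic = new DemoTopic();
    try (DemoSource source = new DemoSource()) {
      source.addTopic(topic);
      // Data would be fetched here while the source is still open.
    }
    // The topic was closed together with the source.
    System.out.println("topic closed: " + topic.isClosed());
  }
}
```

The same structure should apply to a real ICsvSource: declare it in the try header, register the topics and fetch inside the block, and let the block close everything on exit.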
Load with a calculated column
Goal
The goal is still to load the same file into the Trades store, but we want to compute a new column
from the amount column.
Expected trade content
| id | amount | negatedAmount |
|---|---|---|
| 1 | 12394 | -12394 |
| 2 | 13149 | -13149 |
Load into the datastore
To add a new column based on existing ones, we use an IColumnCalculator.
final AColumnCalculator<ILineReader> calculator =
    new AColumnCalculator<>(negatedAmountField) {
      @Override
      public Object compute(final IColumnCalculationContext<ILineReader> context) {
        final Long amountValue = (Long) context.getValue(AMOUNT_FIELD);
        return Math.negateExact(amountValue);
      }
    };
This calculator must be registered in the channel factory.
channelFactory.setCalculatedColumns(STORE_NAME, List.of(calculator));
Then you can use your source to do the feeding as usual.
final Fetch<IFileInfo<Path>, ILineReader> fetch = new Fetch<>(channelFactory);
fetch.fetch(csvSource);
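As a sanity check on the calculation itself, the following plain-Java sketch applies the same Math.negateExact logic to the sample rows, outside of any Atoti class. NegatedAmountSketch and withNegatedAmount are hypothetical names. Note that Math.negateExact throws an ArithmeticException for Long.MIN_VALUE, and that a calculator unboxing a null Long would fail with a NullPointerException, so both cases may deserve explicit handling in real data.

```java
import java.util.List;

/** Illustration only: the negation logic of the calculator, without the Atoti API. */
final class NegatedAmountSketch {

  /** Returns {id, amount, negatedAmount} for a row made of {id, amount}. */
  static long[] withNegatedAmount(final long id, final long amount) {
    // negateExact fails fast on overflow instead of silently wrapping around.
    return new long[] {id, amount, Math.negateExact(amount)};
  }

  public static void main(final String[] args) {
    final List<long[]> rows =
        List.of(withNegatedAmount(1, 12394), withNegatedAmount(2, 13149));
    for (final long[] row : rows) {
      System.out.println(row[0] + " | " + row[1] + " | " + row[2]);
    }
    // Prints:
    // 1 | 12394 | -12394
    // 2 | 13149 | -13149
  }
}
```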
Load only specific rows
Goal
The goal here is to load only the first line of the CSV file by applying a filtering condition.
Currently, there is no direct native method to apply such a filter on a CSV source in Atoti. The
recommended approach is to filter these rows during pre-processing, before loading the data into
Atoti, if possible.
However, the following solutions demonstrate how to implement CSV source filtering directly within
Atoti.
Expected trade content
| id | amount | currency |
|---|---|---|
| 1 | 12394 | chf |
Build the filter
The filtering logic is based on a simple predicate:
final Predicate<Object[]> filter =
    tuple -> {
      final String currency = (String) tuple[2];
      // Constant-first comparison avoids a NullPointerException on a null currency.
      return "chf".equals(currency);
    };
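To see what this predicate keeps, the sketch below applies it to the two sample rows represented as plain Object[] tuples. It assumes the tuples hold the columns in file order, so that index 2 is the currency. The class name CurrencyFilterSketch and the method keepMatching are hypothetical.

```java
import java.util.List;
import java.util.function.Predicate;
import java.util.stream.Collectors;

/** Illustration only: applies the currency predicate to in-memory tuples. */
final class CurrencyFilterSketch {

  // Same logic as the predicate above, written so that a null currency is rejected.
  static final Predicate<Object[]> FILTER = tuple -> "chf".equals(tuple[2]);

  /** Returns only the tuples accepted by the filter, in their original order. */
  static List<Object[]> keepMatching(final List<Object[]> tuples) {
    return tuples.stream().filter(FILTER).collect(Collectors.toList());
  }

  public static void main(final String[] args) {
    final List<Object[]> tuples =
        List.of(new Object[] {1L, 12394L, "chf"}, new Object[] {2L, 13149L, "gbp"});
    for (final Object[] tuple : keepMatching(tuples)) {
      System.out.println(tuple[0] + " | " + tuple[1] + " | " + tuple[2]);
    }
    // Prints:
    // 1 | 12394 | chf
  }
}
```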
Filtering using a custom tuple publisher
As mentioned earlier, there is no direct native way to apply such a filter on a CSV source. One
solution is to create a custom TuplePublisher that incorporates the filter.
The filter is applied when the TuplePublisher processes the tuples.
Here is a possible custom TuplePublisher:
final ITuplePublisher<IFileInfo<Path>> tuplePublisher =
    new TuplePublisher<>(datastore, STORE_NAME) {
      @Override
      public List<Object[]> process(
          final IStoreMessage<? extends IFileInfo<Path>, ?> message,
          final List<Object[]> tuples) {
        return tuples.stream().filter(filter).toList();
      }
    };
Once the tuple publisher is defined and instantiated, the remaining step is to create a channel
between the topic and the store using this custom tuple publisher:
final StoreMessageChannel<IFileInfo<Path>, ILineReader> channel =
(StoreMessageChannel<IFileInfo<Path>, ILineReader>)
factory.createChannel(TOPIC_NAME, STORE_NAME, tuplePublisher);
There are other possible approaches to this problem, but this is the one we recommend because it
is lightweight and runs at a coherent stage of the tuple processing.
Load only specific rows using a filtering CSV column
Goal
The goal here is to load the first line of a CSV file with a filtering condition based on a CSV
column, without loading that column into our store.
The example file for this section is as follows:
| id | amount | currency | trader | traderEntity | bank | valid |
|---|---|---|---|---|---|---|
| 1 | 12394 | chf | E F | E3 | BNP Paribas | Y |
| 2 | 13149 | gbp | A B | E4 | BNP Paribas | N |
Here, the filtering column is the valid column.
Create a custom channel factory with a filtering translator
The solution involves defining a custom CsvMessageChannelFactory that creates a
custom ITranslator. This custom translator overrides the translate method to incorporate the
filtering logic. Below is the code snippet for this custom factory:
final CsvMessageChannelFactory<Path> factory =
    new CsvMessageChannelFactory<>(csvSource, datastore) {
      @Override
      protected ITranslator<ILineReader, Object[], ILineReader> createTranslator(
          final List<IColumnCalculator<ILineReader>> columnCalculators) {
        return new AColumnarTranslator<>(columnCalculators) {
          @Override
          public Object[] translate(final ILineReader lineReader) {
            final String validField = lineReader.read(6);
            return filter.test(validField) ? translateTuple(lineReader) : null;
          }
        };
      }
    };
The filtering logic is as follows:
final Predicate<String> filter = s -> s.equals("Y");
The rest of the process remains unchanged: instantiate a channel using the custom channel factory
and fetch the data.
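The sketch below imitates that translate step in plain Java, without the Atoti classes. It reads the valid value at index 6, returns null for rejected lines, and otherwise builds a tuple that leaves the filtering column out. It assumes, as the factory code above suggests, that a null result means the line is skipped. ValidFilterSketch, translate, and the storedColumns parameter are hypothetical names for illustration.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Objects;
import java.util.function.Predicate;

/** Illustration only: filters on a column that is not kept in the resulting tuple. */
final class ValidFilterSketch {

  /** Position of the valid column in the sample file. */
  static final int VALID_INDEX = 6;

  /** Accepts only lines flagged "Y"; a null value is rejected. */
  static final Predicate<String> FILTER = "Y"::equals;

  /** Returns the first storedColumns values, or null when the line is filtered out. */
  static Object[] translate(final String[] line, final int storedColumns) {
    if (!FILTER.test(line[VALID_INDEX])) {
      return null;
    }
    // Copy only the stored columns, so the filtering column never reaches the tuple.
    return Arrays.copyOf(line, storedColumns, Object[].class);
  }

  public static void main(final String[] args) {
    final List<String[]> lines =
        List.of(
            new String[] {"1", "12394", "chf", "E F", "E3", "BNP Paribas", "Y"},
            new String[] {"2", "13149", "gbp", "A B", "E4", "BNP Paribas", "N"});
    lines.stream()
        .map(line -> translate(line, 6))
        .filter(Objects::nonNull) // Skipped lines are represented by null.
        .forEach(tuple -> System.out.println(Arrays.toString(tuple)));
  }
}
```

Only the first sample line carries the Y flag, so it is the only one that yields a tuple, and that tuple holds the six columns preceding valid.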