Load a simple CSV

This guide shows how to load a CSV file into a store using a CSV source. For a description of the CSV source and of the advanced options not covered in this guide, refer to the CSV source documentation.

Goal

The goal is to load the first two columns of the “trades.csv” file into the Trades store.
File content:
id,amount,currency,trader,traderEntity,bank
1,12394,chf,E F,E3,BNP Paribas
2,13149,gbp,A B,E4,BNP Paribas
Expected Trades content:
| id | amount |
|----|--------|
| 1  | 12394  |
| 2  | 13149  |

Setup

First, we need to add the artifact com.activeviam.source:csv-source as a dependency of our project.

Build a source

The first step in loading a CSV file into a datastore is to build a CSV source.
final ICsvSource<Path> csvSource = ICsvSource.builderForLocalFiles().build();
A CSV source is a collection of CSV topics. A CSV topic references a CSV file or a directory containing CSV files, and is associated with a parser configuration. Let’s build this parser configuration.
final CsvParserConfiguration parserConfig =
    CsvParserConfiguration.builder()
        .withColumnNames(columnNames)
        .separator(',') // The default separator is ','; override it here if needed.
        .numberSkippedLines(1) // Skip one line, e.g. if your file has a header.
        .charset(StandardCharsets.UTF_8) // Override the default charset if needed.
        // Accept lines with more columns than expected and ignore the extra columns.
        .acceptOverflowingLines(true)
        .build();
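To make the effect of these options more concrete, here is a minimal plain-Java sketch of what skipping lines, splitting on a separator, and accepting overflowing lines amount to. It does not use the Atoti API: the class CsvParsingSketch and its parse method are hypothetical names introduced for illustration, and the real parser handles more cases (quoting, typed fields, error reporting) that are not modeled here.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.regex.Pattern;

/** Hypothetical illustration only; this is not the Atoti CSV parser. */
final class CsvParsingSketch {

  /**
   * Skips the first lines, splits the remaining ones on the separator, and
   * keeps only the first expectedColumns values of each line.
   */
  static List<String[]> parse(
      final List<String> lines,
      final char separator,
      final int skippedLines,
      final int expectedColumns,
      final boolean acceptOverflowingLines) {
    final String regex = Pattern.quote(String.valueOf(separator));
    final int start = Math.min(skippedLines, lines.size());
    final List<String[]> rows = new ArrayList<>();
    for (final String line : lines.subList(start, lines.size())) {
      final String[] values = line.split(regex, -1);
      // Sketch assumption: short lines are rejected; the real parser may differ.
      if (values.length < expectedColumns) {
        throw new IllegalArgumentException("Too few columns: " + line);
      }
      if (values.length > expectedColumns && !acceptOverflowingLines) {
        throw new IllegalArgumentException("Too many columns: " + line);
      }
      // Extra columns, if any, are dropped here.
      rows.add(Arrays.copyOf(values, expectedColumns));
    }
    return rows;
  }

  public static void main(final String[] args) {
    final List<String> lines =
        List.of(
            "id,amount,currency,trader,traderEntity,bank",
            "1,12394,chf,E F,E3,BNP Paribas",
            "2,13149,gbp,A B,E4,BNP Paribas");
    for (final String[] row : parse(lines, ',', 1, 2, true)) {
      System.out.println(String.join(" | ", row));
    }
  }
}
```

With two expected columns and overflowing lines accepted, only id and amount survive for each data line, which matches the expected Trades content of this guide.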
Then build a topic with the parser configuration. This topic will load a single file into the Trades store.
Because the mapping is this simple, the name of the store is used as the topic name. The topic is then registered in the source.
// The topic will be closed by the source.
final SingleFileCsvTopic csvTopic =
    new SingleFileCsvTopic(topicName, parserConfig, filePath, null);
csvSource.addTopic(csvTopic);

Load into datastore

The second step is to create a channel and use the source to fetch the data into the datastore.
A channel is the link between a topic and a store.
You can build a channel factory like this.
final IStoreMessageChannelFactory<IFileInfo<Path>, ILineReader> channelFactory =
    new CsvMessageChannelFactory<>(csvSource, datastore);
Finally, we use the source to process the channel. The source loads the topic content through the channel and streams it into the datastore.
There are two ways of doing so.
Using the fetch utility:
final Fetch<IFileInfo<Path>, ILineReader> fetch = new Fetch<>(channelFactory);
fetch.fetch(csvSource);
Or in a manual way:
final IStoreMessageChannel<IFileInfo<Path>, ILineReader> channel =
    channelFactory.createChannel(TOPIC_NAME, STORE_NAME);
datastore.getTransactionManager().startTransaction();
csvSource.fetch(List.of(channel));
datastore.getTransactionManager().commitTransaction();
Note that ICsvSource is an AutoCloseable resource, so it must be closed once loading is done.
Closing a source closes all of its registered topics.
csvSource.close();
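Because the source is AutoCloseable, the usual Java idiom is try-with-resources, which closes the resource even when loading throws. The following minimal sketch shows the pattern with hypothetical stand-in classes (SketchSource and SketchTopic are not Atoti types); in real code the resource declared in the try header would be the ICsvSource built earlier.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical stand-ins that only mimic "closing a source closes its topics". */
final class CloseSketch {

  static final class SketchTopic implements AutoCloseable {
    private boolean closed;

    boolean isClosed() {
      return closed;
    }

    @Override
    public void close() {
      closed = true;
    }
  }

  static final class SketchSource implements AutoCloseable {
    private final List<SketchTopic> topics = new ArrayList<>();

    void addTopic(final SketchTopic topic) {
      topics.add(topic);
    }

    @Override
    public void close() {
      // Closing the source closes every registered topic.
      topics.forEach(SketchTopic::close);
    }
  }

  /** Registers a topic, "loads" inside try-with-resources, then returns the topic. */
  static SketchTopic loadAndClose() {
    final SketchTopic topic = new SketchTopic();
    try (SketchSource source = new SketchSource()) {
      source.addTopic(topic);
      // Fetching would happen here; the source is closed even if it throws.
    }
    return topic;
  }

  public static void main(final String[] args) {
    System.out.println("Topic closed: " + loadAndClose().isClosed());
  }
}
```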

Load with a calculated column

Goal

The goal is still to load the same file into the Trades store, but this time we also want to compute a new column from the amount column.
Expected Trades content:
| id | amount | negatedAmount |
|----|--------|---------------|
| 1  | 12394  | -12394        |
| 2  | 13149  | -13149        |

Load into the datastore

To add a new column based on existing ones, we use an IColumnCalculator.
final AColumnCalculator<ILineReader> calculator =
    new AColumnCalculator<>(negatedAmountField) {
      @Override
      public Object compute(final IColumnCalculationContext<ILineReader> context) {
        final Long amountValue = (Long) context.getValue(AMOUNT_FIELD);
        return Math.negateExact(amountValue);
      }
    };
This calculator must be registered in the channel factory.
channelFactory.setCalculatedColumns(STORE_NAME, List.of(calculator));
Then you can use your source to do the feeding as usual.
final Fetch<IFileInfo<Path>, ILineReader> fetch = new Fetch<>(channelFactory);
fetch.fetch(csvSource);
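The calculator above relies on Math.negateExact, which throws an ArithmeticException on overflow instead of silently wrapping. The plain-Java sketch below isolates that computation so it can be tried without a datastore. The class NegatedAmountSketch is a hypothetical name, the sketch assumes the amount has already been parsed as a Long (as the cast in the calculator suggests), and the null handling is an illustrative choice rather than something the calculator above does.

```java
/** Hypothetical illustration of the computation performed by the calculator. */
final class NegatedAmountSketch {

  /** Returns the negated amount, or null when no amount is available. */
  static Long negate(final Long amount) {
    if (amount == null) {
      // Sketch choice: propagate a missing amount instead of failing.
      return null;
    }
    // Throws ArithmeticException for Long.MIN_VALUE, whose negation overflows.
    return Math.negateExact(amount.longValue());
  }

  public static void main(final String[] args) {
    System.out.println(negate(12394L));
    System.out.println(negate(13149L));
    try {
      negate(Long.MIN_VALUE);
    } catch (final ArithmeticException e) {
      System.out.println("Overflow detected for Long.MIN_VALUE");
    }
  }
}
```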

Load only specific rows

Goal

The goal here is to load only the rows of a CSV file that satisfy a filtering condition; in this example, only the first line (the trade in chf) is kept. Currently, there is no direct native method to apply such a filter on a CSV source in Atoti. The recommended approach is to filter these rows during pre-processing, before loading the data into Atoti, if possible. However, the following solutions demonstrate how to implement CSV source filtering directly within Atoti.
Expected Trades content:
| id | amount | currency |
|----|--------|----------|
| 1  | 12394  | chf      |

Build the filter

The filtering logic is based on a simple predicate:
final Predicate<Object[]> filter =
    tuple -> {
      final String currency = (String) tuple[2];
      return "chf".equals(currency); // Null-safe: a tuple without currency is filtered out.
    };
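To see what such a predicate does to a batch of tuples, here is a minimal plain-Java sketch that applies it to the two sample trades. It is independent of Atoti: TupleFilterSketch, CHF_ONLY and keepMatching are hypothetical names, and the tuple layout (id, amount, currency, with the currency at index 2) is assumed from the predicate above.

```java
import java.util.List;
import java.util.function.Predicate;
import java.util.stream.Collectors;

/** Hypothetical illustration of filtering tuples with a predicate. */
final class TupleFilterSketch {

  /** Assumed position of the currency in a tuple. */
  static final int CURRENCY_INDEX = 2;

  /** Keeps only the tuples whose currency is "chf"; null-safe. */
  static final Predicate<Object[]> CHF_ONLY = tuple -> "chf".equals(tuple[CURRENCY_INDEX]);

  /** Returns the tuples accepted by the filter, in their original order. */
  static List<Object[]> keepMatching(
      final List<Object[]> tuples, final Predicate<Object[]> filter) {
    return tuples.stream().filter(filter).collect(Collectors.toList());
  }

  public static void main(final String[] args) {
    final List<Object[]> tuples =
        List.<Object[]>of(
            new Object[] {1L, 12394L, "chf"},
            new Object[] {2L, 13149L, "gbp"});
    for (final Object[] tuple : keepMatching(tuples, CHF_ONLY)) {
      System.out.println(tuple[0] + " | " + tuple[1] + " | " + tuple[2]);
    }
  }
}
```

Only the first trade passes the filter, which corresponds to the expected content of this section.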

Filtering using a custom tuple publisher

As mentioned earlier, there is no direct native way to apply such a filter on a CSV source. One solution is to create a custom TuplePublisher that incorporates the filter, so that the filter is applied when the TuplePublisher processes the tuples. Here is a possible custom TuplePublisher:
final ITuplePublisher<IFileInfo<Path>> tuplePublisher =
    new TuplePublisher<>(datastore, STORE_NAME) {
      @Override
      public List<Object[]> process(
          final IStoreMessage<? extends IFileInfo<Path>, ?> message,
          final List<Object[]> tuples) {
        return tuples.stream().filter(filter).toList();
      }
    };
Once the tuple publisher is defined and instantiated, the remaining step is to create a channel between the topic and the store using this custom tuple publisher:
final StoreMessageChannel<IFileInfo<Path>, ILineReader> channel =
    (StoreMessageChannel<IFileInfo<Path>, ILineReader>)
        factory.createChannel(TOPIC_NAME, STORE_NAME, tuplePublisher);
There are other possible approaches to tackle this problem, but this is the one we recommend as it is lightweight and occurs at a coherent stage during the tuple’s processing.

Load only specific rows using a filtering CSV column

Goal

The goal here is to load only the first line of a CSV file, using a filtering condition based on a CSV column, without loading that column into our store.
The example file for this section is as follows:
| id | amount | currency | trader | traderEntity | bank        | valid |
|----|--------|----------|--------|--------------|-------------|-------|
| 1  | 12394  | chf      | E F    | E3           | BNP Paribas | Y     |
| 2  | 13149  | gbp      | A B    | E4           | BNP Paribas | N     |
Here, the filtering column is the valid column.

Create a custom channel factory with a filtering translator

The solution involves defining a custom CsvMessageChannelFactory that creates a custom ITranslator. This custom translator overrides the translate method to incorporate the filtering logic. Below is the code snippet for this custom factory:
final CsvMessageChannelFactory<Path> factory =
    new CsvMessageChannelFactory<>(csvSource, datastore) {
      @Override
      protected ITranslator<ILineReader, Object[], ILineReader> createTranslator(
          final List<IColumnCalculator<ILineReader>> columnCalculators) {
        return new AColumnarTranslator<>(columnCalculators) {
          @Override
          public Object[] translate(final ILineReader lineReader) {
            final String validField = lineReader.read(6);
            return filter.test(validField) ? translateTuple(lineReader) : null;
          }
        };
      }
    };
The filtering logic is as follows:
final Predicate<String> filter = s -> "Y".equals(s); // Null-safe comparison.
The rest of the process remains unchanged: instantiate a channel using the custom channel factory and fetch the data.
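The idea behind the filtering translator can be sketched in plain Java: read the filtering column, return null for rejected lines, and build the tuple from the other columns only. The sketch below is not the Atoti API. FilteringTranslatorSketch and its methods are hypothetical, the store is assumed to hold the first three columns (id, amount, currency), and it assumes, as the snippet above implies, that a null result means the line is skipped.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Objects;
import java.util.function.Predicate;
import java.util.stream.Collectors;

/** Hypothetical illustration of a translator that filters on a CSV column. */
final class FilteringTranslatorSketch {

  /** Zero-based position of the "valid" column in the example file. */
  static final int VALID_INDEX = 6;

  /** Assumed number of leading columns kept in the store. */
  static final int STORE_COLUMNS = 3;

  /** Returns the tuple to store, or null when the line is rejected. */
  static String[] translate(final String[] line, final Predicate<String> filter) {
    // The filtering column is read but never copied into the tuple.
    return filter.test(line[VALID_INDEX]) ? Arrays.copyOf(line, STORE_COLUMNS) : null;
  }

  /** Translates every line and drops the rejected ones. */
  static List<String[]> load(final List<String[]> lines, final Predicate<String> filter) {
    return lines.stream()
        .map(line -> translate(line, filter))
        .filter(Objects::nonNull)
        .collect(Collectors.toList());
  }

  public static void main(final String[] args) {
    final List<String[]> lines =
        List.<String[]>of(
            "1,12394,chf,E F,E3,BNP Paribas,Y".split(","),
            "2,13149,gbp,A B,E4,BNP Paribas,N".split(","));
    for (final String[] tuple : load(lines, "Y"::equals)) {
      System.out.println(String.join(" | ", tuple));
    }
  }
}
```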