# Load a CSV file

## Load a simple CSV

This guide shows how to load a CSV file into a store using a CSV source.
For a description of the CSV source and the advanced options not covered in this guide,
refer to the [CSV source](../../sources/csv_source) page.

### Goal

The goal is to load the first two columns of the `trades.csv` file into the `Trades` store.

**File content**

```
id,amount,currency,trader,traderEntity,bank
1,12394,chf,E F,E3,BNP Paribas
2,13149,gbp,A B,E4,BNP Paribas
```

**Expected `Trades` content**

Only the first two columns end up in the `Trades` store.

| id | amount |
| -- | ------ |
| 1  | 12394  |
| 2  | 13149  |

### Setup

First, add the `com.activeviam.source:csv-source` artifact as a dependency of the project.

### Build a source

The first step to load a CSV file into a datastore is to build a CSV source.

```java theme={"languages":{"custom":["/engine/python-sdk/0.9/languages/pycon.tmLanguage.json"]}}
final ICsvSource<Path> csvSource = ICsvSource.builderForLocalFiles().build();
```

A CSV source is a collection of CSV topics.
A CSV topic references a CSV file or a directory containing CSV files.
Each topic is associated with a parser configuration. Let's build this parser configuration.

```java theme={"languages":{"custom":["/engine/python-sdk/0.9/languages/pycon.tmLanguage.json"]}}
final CsvParserConfiguration parserConfig =
    CsvParserConfiguration.builder()
        // Names of the columns to load; in this guide, the first two columns (id and amount).
        .withColumnNames(columnNames)
        .separator(',') // The default separator is ','; override it here if needed.
        .numberSkippedLines(1) // Skip one line, e.g. if your file has a header.
        .charset(StandardCharsets.UTF_8) // You can override the default charset.
        // Accept lines with more columns than expected and ignore the extra columns.
        .acceptOverflowingLines(true)
        .build();
```
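
To make the effect of these options concrete, here is a plain-Java sketch that mimics them on the sample file using only the standard library. It is not the Atoti parser: it simply skips one header line, splits on `,`, and ignores the columns beyond the expected ones. The names `ParserOptionsSketch` and `parse` are hypothetical and exist only for this illustration.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.regex.Pattern;

/** Illustration only: mimics the parser options with the standard library. */
final class ParserOptionsSketch {

  /**
   * Skips the first {@code skippedLines} lines, splits the others on the separator,
   * and keeps only the first {@code expectedColumns} values of each line.
   * Lines shorter than expected are padded with null in this sketch.
   */
  static List<String[]> parse(
      final List<String> lines,
      final char separator,
      final int skippedLines,
      final int expectedColumns) {
    final String splitRegex = Pattern.quote(String.valueOf(separator));
    final int start = Math.min(skippedLines, lines.size());
    final List<String[]> rows = new ArrayList<>();
    for (final String line : lines.subList(start, lines.size())) {
      final String[] values = line.split(splitRegex, -1);
      rows.add(Arrays.copyOf(values, expectedColumns));
    }
    return rows;
  }

  public static void main(final String[] args) {
    final List<String> lines =
        List.of(
            "id,amount,currency,trader,traderEntity,bank",
            "1,12394,chf,E F,E3,BNP Paribas",
            "2,13149,gbp,A B,E4,BNP Paribas");
    // Prints "1 | 12394" then "2 | 13149".
    for (final String[] row : parse(lines, ',', 1, 2)) {
      System.out.println(String.join(" | ", row));
    }
  }
}
```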

Then build a topic with the parser configuration.
This topic loads a single file into the `Trades` store.\
Because the mapping is this simple, the store name is also used as the topic name.
The topic is then registered in the source.

```java theme={"languages":{"custom":["/engine/python-sdk/0.9/languages/pycon.tmLanguage.json"]}}
// Will be closed by the source
final SingleFileCsvTopic csvTopic =
    new SingleFileCsvTopic(topicName, parserConfig, filePath, null);
csvSource.addTopic(csvTopic);
```

### Load into datastore

The second step is to create a channel and use the source to fetch the data into the datastore.\
A channel is the link between a topic and a store.\
You can build a channel factory like this:

```java theme={"languages":{"custom":["/engine/python-sdk/0.9/languages/pycon.tmLanguage.json"]}}
final IStoreMessageChannelFactory<IFileInfo<Path>, ILineReader> channelFactory =
    new CsvMessageChannelFactory<>(csvSource, datastore);
```

Finally, we use the source to process the channel.
The source loads the topic content through the channel and streams it into the datastore.\
There are two ways of doing so.

Using the fetch utility:

```java theme={"languages":{"custom":["/engine/python-sdk/0.9/languages/pycon.tmLanguage.json"]}}
final Fetch<IFileInfo<Path>, ILineReader> fetch = new Fetch<>(channelFactory);
fetch.fetch(csvSource);
```

Or in a manual way:

```java theme={"languages":{"custom":["/engine/python-sdk/0.9/languages/pycon.tmLanguage.json"]}}
final IStoreMessageChannel<IFileInfo<Path>, ILineReader> channel =
    channelFactory.createChannel(TOPIC_NAME, STORE_NAME);
datastore.getTransactionManager().startTransaction();
csvSource.fetch(List.of(channel));
datastore.getTransactionManager().commitTransaction();
```

Be careful: `ICsvSource` is an `AutoCloseable` resource and should be closed once loading is done.\
Closing a source closes all of its registered topics.

```java theme={"languages":{"custom":["/engine/python-sdk/0.9/languages/pycon.tmLanguage.json"]}}
csvSource.close();
```
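
Because the source is `AutoCloseable`, a try-with-resources block is one way to make sure it is closed even if loading fails. The sketch below does not use the real `ICsvSource`: `DemoSource` and `DemoTopic` are hypothetical stand-ins, written only to show the pattern and the order in which resources are released.

```java
import java.util.ArrayList;
import java.util.List;

/** Illustration only: hypothetical stand-ins showing the try-with-resources pattern. */
final class CloseSketch {

  /** Hypothetical topic that records when it is closed. */
  static final class DemoTopic implements AutoCloseable {
    private final String name;
    private final List<String> log;

    DemoTopic(final String name, final List<String> log) {
      this.name = name;
      this.log = log;
    }

    @Override
    public void close() {
      log.add("closed topic " + name);
    }
  }

  /** Hypothetical source that owns its registered topics. */
  static final class DemoSource implements AutoCloseable {
    private final List<DemoTopic> topics = new ArrayList<>();
    private final List<String> log;

    DemoSource(final List<String> log) {
      this.log = log;
    }

    void addTopic(final DemoTopic topic) {
      topics.add(topic);
    }

    @Override
    public void close() {
      // Closing the source closes every registered topic first.
      topics.forEach(DemoTopic::close);
      log.add("closed source");
    }
  }

  static List<String> run() {
    final List<String> log = new ArrayList<>();
    try (DemoSource source = new DemoSource(log)) {
      source.addTopic(new DemoTopic("Trades", log));
      log.add("fetching");
    } // source.close() is called here, even if the body throws.
    return log;
  }

  public static void main(final String[] args) {
    // Prints "fetching", "closed topic Trades", "closed source".
    run().forEach(System.out::println);
  }
}
```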

## Load with a calculated column

### Goal

The goal is still to load the same file into the `Trades` store, but we want to compute a new column
from the amount column.

**Expected `Trades` content**

| id | amount | negatedAmount |
| -- | ------ | ------------- |
| 1  | 12394  | -12394        |
| 2  | 13149  | -13149        |

### Load into the datastore

To add a new column based on existing ones, we use an `IColumnCalculator`, implemented here by extending `AColumnCalculator`.

```java theme={"languages":{"custom":["/engine/python-sdk/0.9/languages/pycon.tmLanguage.json"]}}
final AColumnCalculator<ILineReader> calculator =
    new AColumnCalculator<>(negatedAmountField) {
      @Override
      public Object compute(final IColumnCalculationContext<ILineReader> context) {
        final Long amountValue = (Long) context.getValue(AMOUNT_FIELD);
        return Math.negateExact(amountValue);
      }
    };
```
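
Note that `Math.negateExact` throws an `ArithmeticException` when the result overflows, which for a `long` only happens with `Long.MIN_VALUE`. The standalone sketch below shows both cases in plain Java; the `negate` helper is a hypothetical name that mirrors the body of `compute`.

```java
/** Illustration only: the negation logic of the calculator, outside of Atoti. */
final class NegatedAmountSketch {

  /** Mirrors the calculator body: negates the amount and fails on overflow. */
  static long negate(final long amount) {
    return Math.negateExact(amount);
  }

  public static void main(final String[] args) {
    System.out.println(negate(12394L)); // prints -12394
    System.out.println(negate(13149L)); // prints -13149
    try {
      negate(Long.MIN_VALUE);
    } catch (final ArithmeticException e) {
      // -Long.MIN_VALUE does not fit in a long, so negateExact rejects it.
      System.out.println("overflow detected: " + e.getMessage());
    }
  }
}
```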

This calculator must be registered in the channel factory.

```java theme={"languages":{"custom":["/engine/python-sdk/0.9/languages/pycon.tmLanguage.json"]}}
channelFactory.setCalculatedColumns(STORE_NAME, List.of(calculator));
```

Then use the source to load the data as usual.

```java theme={"languages":{"custom":["/engine/python-sdk/0.9/languages/pycon.tmLanguage.json"]}}
final Fetch<IFileInfo<Path>, ILineReader> fetch = new Fetch<>(channelFactory);
fetch.fetch(csvSource);
```

## Load only specific rows

### Goal

The goal here is to load only the rows of a CSV file that satisfy a filtering condition. In this
example, only the first line, whose currency is `chf`, is kept.

Currently, there is no direct native method to apply such a filter on a CSV source in Atoti. The
recommended approach is to filter these rows during pre-processing, before loading the data into
Atoti, if possible.

However, the following solutions demonstrate how to implement CSV source filtering directly within
Atoti.

**Expected `Trades` content**

| id | amount | currency |
| -- | ------ | -------- |
| 1  | 12394  | chf      |

### Build the filter

The filtering logic is based on a simple predicate:

```java theme={"languages":{"custom":["/engine/python-sdk/0.9/languages/pycon.tmLanguage.json"]}}
final Predicate<Object[]> filter =
    tuple -> {
      final String currency = (String) tuple[2];
      return currency.equals("chf");
    };
```
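
As a quick check of this predicate outside of Atoti, the plain-Java sketch below applies it to the two sample rows, represented as `Object[]` tuples with the currency at index 2. It writes the comparison as `"chf".equals(...)`, a common null-safe variant; whether a tuple can actually hold a null currency depends on your data and is only an assumption here. `TupleFilterSketch` and `keepMatching` are hypothetical names.

```java
import java.util.Arrays;
import java.util.List;
import java.util.function.Predicate;
import java.util.stream.Collectors;

/** Illustration only: applies the currency filter to in-memory tuples. */
final class TupleFilterSketch {

  /** Null-safe variant of the predicate: keeps tuples whose currency is "chf". */
  static final Predicate<Object[]> FILTER = tuple -> "chf".equals(tuple[2]);

  /** Returns the tuples accepted by the filter, in their original order. */
  static List<Object[]> keepMatching(final List<Object[]> tuples) {
    return tuples.stream().filter(FILTER).collect(Collectors.toList());
  }

  public static void main(final String[] args) {
    final List<Object[]> tuples =
        List.of(
            new Object[] {1L, 12394L, "chf"},
            new Object[] {2L, 13149L, "gbp"});
    // Prints [1, 12394, chf]
    keepMatching(tuples).forEach(tuple -> System.out.println(Arrays.toString(tuple)));
  }
}
```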

### Filtering using a custom tuple publisher

As mentioned earlier, there is no direct native way to apply such a filter on a CSV source. One
solution is to create a custom `TuplePublisher` that incorporates the filter.

The filter is applied when the `TuplePublisher` processes the tuples.
Here is a possible custom `TuplePublisher`:

```java theme={"languages":{"custom":["/engine/python-sdk/0.9/languages/pycon.tmLanguage.json"]}}
final ITuplePublisher<IFileInfo<Path>> tuplePublisher =
    new TuplePublisher<>(datastore, STORE_NAME) {
      @Override
      public List<Object[]> process(
          final IStoreMessage<? extends IFileInfo<Path>, ?> message,
          final List<Object[]> tuples) {
        return tuples.stream().filter(filter).toList();
      }
    };
```

Once the tuple publisher is defined and instantiated, the remaining step is to create a channel
between the topic and the store using this custom tuple publisher:

```java theme={"languages":{"custom":["/engine/python-sdk/0.9/languages/pycon.tmLanguage.json"]}}
// `factory` is the channel factory (named `channelFactory` in the first section).
final StoreMessageChannel<IFileInfo<Path>, ILineReader> channel =
    (StoreMessageChannel<IFileInfo<Path>, ILineReader>)
        factory.createChannel(TOPIC_NAME, STORE_NAME, tuplePublisher);
```

Other approaches are possible, but we recommend this one because it is lightweight and filters at a
natural stage of the tuple processing.

## Load only specific rows using a filtering CSV column

### Goal

The goal here is to load only the rows of a CSV file that satisfy a filtering condition based on a
CSV column, without loading that column into the store.\
The example file for this section is as follows:

| id | amount | currency | trader | traderEntity | bank        | valid |
| -- | ------ | -------- | ------ | ------------ | ----------- | ----- |
| 1  | 12394  | chf      | E F    | E3           | BNP Paribas | Y     |
| 2  | 13149  | gbp      | A B    | E4           | BNP Paribas | N     |

The filtering column is the *valid* column here.

### Create a custom channel factory with a filtering translator

The solution involves defining a custom `CsvMessageChannelFactory` that creates a
custom `ITranslator`. This custom translator overrides the `translate` method to apply the
filter and returns `null` for rejected lines. Below is the code snippet for this custom factory:

```java theme={"languages":{"custom":["/engine/python-sdk/0.9/languages/pycon.tmLanguage.json"]}}
final CsvMessageChannelFactory<Path> factory =
    new CsvMessageChannelFactory<>(csvSource, datastore) {
      @Override
      protected ITranslator<ILineReader, Object[], ILineReader> createTranslator(
          final List<IColumnCalculator<ILineReader>> columnCalculators) {
        return new AColumnarTranslator<>(columnCalculators) {
          @Override
          public Object[] translate(final ILineReader lineReader) {
            // Index 6 is the "valid" column (column indices start at 0).
            final String validField = lineReader.read(6);
            return filter.test(validField) ? translateTuple(lineReader) : null;
          }
        };
      }
    };
```

The `filter` predicate used by the translator is defined as follows:

```java theme={"languages":{"custom":["/engine/python-sdk/0.9/languages/pycon.tmLanguage.json"]}}
final Predicate<String> filter = s -> s.equals("Y");
```

The rest of the process remains unchanged: instantiate a channel using the custom channel factory
and fetch the data.
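
The same idea can be sketched with the standard library only: each raw line is split, the `valid` value at index 6 is tested, and the line is either translated into a tuple holding the first two columns or mapped to `null` and dropped. This sketch assumes, as the translator above suggests, that a `null` result means the row is skipped, and that `id` and `amount` are parsed as `long` values. All class and method names are hypothetical.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Objects;
import java.util.function.Predicate;
import java.util.stream.Collectors;

/** Illustration only: filters on a column that is not loaded into the result. */
final class ValidColumnFilterSketch {

  /** Position of the "valid" column in the example file (0-based). */
  static final int VALID_COLUMN_INDEX = 6;

  /** Same logic as the predicate above, written in a null-safe way. */
  static final Predicate<String> FILTER = "Y"::equals;

  /** Returns the first two columns of the line, or null when the line is rejected. */
  static Object[] translate(final String line) {
    final String[] values = line.split(",", -1);
    if (values.length <= VALID_COLUMN_INDEX || !FILTER.test(values[VALID_COLUMN_INDEX])) {
      return null;
    }
    return new Object[] {Long.parseLong(values[0]), Long.parseLong(values[1])};
  }

  /** Translates every line and drops the rejected ones (null results). */
  static List<Object[]> load(final List<String> lines) {
    return lines.stream()
        .map(ValidColumnFilterSketch::translate)
        .filter(Objects::nonNull)
        .collect(Collectors.toList());
  }

  public static void main(final String[] args) {
    final List<String> lines =
        List.of(
            "1,12394,chf,E F,E3,BNP Paribas,Y",
            "2,13149,gbp,A B,E4,BNP Paribas,N");
    // Prints [1, 12394]
    load(lines).forEach(tuple -> System.out.println(Arrays.toString(tuple)));
  }
}
```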
