
# CSV Source

> You should be familiar with the [source ETL concepts](./data_sources_introduction) before reading the CSV-specific documentation.

> Check out the example [code snippets](how-to/load_csv_file_in_store) while reading this documentation.

An `ICsvSource` is the CSV-specific implementation of the `ISource` interface.
`ISource` is the generic framework that Atoti uses to fetch data from external sources and contribute it to an Atoti `IDatastore`.

The purpose of the `ICsvSource` is to load data from CSV files.

## ETL pipeline

### Extract

#### CSV Topics

`ICsvTopic`s are used to load data from CSV files into a datastore.

For CSV files that come from the local file system, we provide three implementations:

* a single CSV file topic, where a single file is watched for updates
* a directory topic, where a directory is watched for new or updated files that match a given pattern
* a multiple file topic, where a list of files is watched for updates.
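
To make the idea of "files that match a given pattern" concrete, here is a minimal sketch that relies only on the JDK. It is not the Atoti topic implementation: the class name `DirectoryPatternSketch`, the `select` helper, and the use of a glob pattern are assumptions made for illustration.

```java
import java.nio.file.FileSystems;
import java.nio.file.Path;
import java.nio.file.PathMatcher;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper, not part of the Atoti API.
final class DirectoryPatternSketch {

    /** Keeps the files whose name matches the glob pattern, e.g. "trades_*.csv". */
    static List<Path> select(List<Path> files, String globPattern) {
        PathMatcher matcher = FileSystems.getDefault().getPathMatcher("glob:" + globPattern);
        List<Path> selected = new ArrayList<>();
        for (Path file : files) {
            // Match on the file name only, so the parent directory does not matter.
            if (matcher.matches(file.getFileName())) {
                selected.add(file);
            }
        }
        return selected;
    }

    public static void main(String[] args) {
        List<Path> files = List.of(
                Paths.get("data", "trades_1.csv"),
                Paths.get("data", "readme.txt"),
                Paths.get("data", "trades_2.csv"));
        // Only the two "trades_" files are kept.
        System.out.println(select(files, "trades_*.csv"));
    }
}
```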

> These topics can be configured through a `FileSystemCsvTopicFactory`.

A `FileSystemCsvTopicFactory` creates topics together with their parser configuration. When creating the factory, you can choose whether symbolic links are followed when a folder is registered for watching.

Other CSV topic implementations exist for files that do not come from the local file system (Cloud source).
They are not covered in this article. Even though the file contents are accessed differently,
all other aspects of data loading stay the same.

Each `ICsvTopic` holds its own `CsvParserConfiguration`, which states the delimiter used, the
columns of the files and their position, and other properties.

#### CSV parser

Atoti provides a high-performance multithreaded parser to extract data from CSV files into memory, in order to perform the extraction step of the ETL pipeline.

There are several levels of configuration for the `ICsvSource`.

Each defined topic, whether it is a `SingleFileCsvTopic`, a `DirectoryCsvTopic`,
or is created using a `FileSystemCsvTopicFactory`, comes with its own `CsvParserConfiguration`.

##### Parser configuration

A `CsvParserConfiguration` can be created using `CsvParserConfiguration.builder()`.

First, the builder requires the columns of the CSV file to be defined.
This information must be provided when creating the parser configuration. It can be defined in several ways:

* by specifying the number of columns: `.withColumnCount(int)`
* by specifying the names of the columns: `.withColumnNames(List<String>)`
  * the user must specify all column names in the file. The ones that do not correspond to a matching column in the datastore are skipped
* by specifying the mapping of the column index to the column name: `.withColumnNamesMapping(Map<Integer, String>)`
  * the user can specify only the columns that are of interest
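
The effect of an index-to-name mapping can be pictured with the following library-free sketch. It only illustrates the idea of keeping the columns of interest. `ColumnMappingSketch` and `select` are hypothetical names, and the code does not call the Atoti builder.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical illustration, not part of the Atoti API.
final class ColumnMappingSketch {

    /** Builds a name-to-value view of one line, keeping only the mapped column indexes. */
    static Map<String, String> select(String[] fields, Map<Integer, String> indexToName) {
        Map<String, String> record = new LinkedHashMap<>();
        // TreeMap iterates the indexes in ascending order, so the result follows file order.
        for (Map.Entry<Integer, String> column : new TreeMap<>(indexToName).entrySet()) {
            int index = column.getKey();
            // A missing field is represented by null in this sketch.
            record.put(column.getValue(), index < fields.length ? fields[index] : null);
        }
        return record;
    }

    public static void main(String[] args) {
        String[] fields = {"0", "nameA", "1", "a comment"};
        // Only columns 0 and 2 are of interest; columns 1 and 3 are ignored.
        System.out.println(select(fields, Map.of(0, "ID", 2, "Price")));
    }
}
```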

`CsvParserConfiguration` contains other properties, as described in the table below:

| Property               | Description                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                | Default value |
| ---------------------- | ------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------ | ------------- |
| acceptOverflowingLines | When set to false, a record that contains more elements is rejected, otherwise the additional elements are ignored                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                         | `false`       |
| acceptIncompleteLines  | When set to false, a record that contains fewer elements is rejected<br />If set to true, null values are inserted into the missing columns.                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                               | `true`        |
| separator              | The separator used to separate fields in the CSV file                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                      | `,` (comma)   |
| processQuotes          | Sets the behavior of the parser regarding quotes <br /><br /><details><p>When set to true, the parser follows the official CSV specification, meaning:<ul><li>Each field may or may not be enclosed in double quotes. If fields are not enclosed with double quotes, then double quotes may not appear inside the fields.</li><li>A double-quote appearing inside a field must be escaped by preceding it with another double-quote. The double-quote character *MUST NOT* be enclosed in a double-quote pair.</li><li>Fields containing line breaks (CRLF), double quotes, and commas *MUST* be enclosed in double-quotes.</li></ul>When set to false, all double-quotes within a field are treated as any regular character, following Microsoft Excel's behavior.<blockquote><p>In this mode, fields shouldn't be enclosed in double-quotes. In addition, line breaks should not be used within a field.</p></blockquote></p></details> | `true`        |
| numberSkippedLines     | The number of lines to skip at the beginning of each file (header)                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                         | 0             |
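
The two line-length properties can be read as a small decision rule. The sketch below is one plain-Java interpretation of the table above, not the parser's actual code; `LineLengthPolicySketch` and `normalize` are hypothetical names.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;

// Hypothetical illustration of acceptOverflowingLines / acceptIncompleteLines.
final class LineLengthPolicySketch {

    /** Returns the record adjusted to columnCount fields, or empty when the line is rejected. */
    static Optional<List<String>> normalize(
            List<String> fields,
            int columnCount,
            boolean acceptOverflowingLines,
            boolean acceptIncompleteLines) {
        if (fields.size() > columnCount) {
            if (!acceptOverflowingLines) {
                return Optional.empty(); // too many elements: rejected
            }
            // Additional elements are ignored.
            return Optional.of(new ArrayList<>(fields.subList(0, columnCount)));
        }
        List<String> record = new ArrayList<>(fields);
        if (record.size() < columnCount) {
            if (!acceptIncompleteLines) {
                return Optional.empty(); // too few elements: rejected
            }
            // Missing columns are filled with null values.
            while (record.size() < columnCount) {
                record.add(null);
            }
        }
        return Optional.of(record);
    }

    public static void main(String[] args) {
        System.out.println(normalize(List.of("a", "b", "c"), 2, false, true)); // rejected
        System.out.println(normalize(List.of("a"), 2, false, true));           // padded
    }
}
```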

##### Parser internals

###### Principle

The parser relies on Java NIO (`java.nio`) channels, memory buffers, and multithreading to speed up
character decoding and object parsing. Fast data loading in turn shortens the time before the data can be analyzed.

The implementation aims at high performance and detailed configuration. It fetches data from CSV files,
and listens for changes in them, in a multithreaded way and in minimal time,
while keeping the amount of transient memory to a minimum.

Moreover, CSV work is organized internally into workloads that the parser threads execute. Nevertheless,
custom code can bypass the standard CSV Source pipelines and take control of parsing operations,
deciding which files are loaded into which channel, and when.

Multithreaded CSV parsing isn't as straightforward as splitting the file and assigning each chunk to a single thread.

This example illustrates the difficulties:

Given the following CSV:

```
ID, Name, Price, Comment
0,"nameA",1,"This is a comment, is it not"
1,"nameB",2,"Another comment"
```

If this file is split in half (character-wise, since at that point of the algorithm the parser knows nothing
about context, line delimiters, and so on), the first thread gets the bytes that correspond to

```
0,"nameA",1,"This is a comment
```

while the second thread gets

```
, is it not"
1,"nameB",2,"Another comment"
```

Now the second thread sees the character `,`, which is normally a delimiter, but as it doesn't know the context,
it cannot know whether that character is inside a quoted field (here, the comment) or not. It could count the `"` characters
and deduce, after parsing the entire block, that it started inside a quoted field, but that technique has its own limits.
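
The quote-counting idea can be sketched in a few lines of plain Java. This is an illustration of the technique mentioned above, not the strategy used by the Atoti parser; `QuoteParitySketch` and `endsInsideQuotes` are hypothetical names.

```java
// Hypothetical illustration of quote counting across a chunk boundary.
final class QuoteParitySketch {

    /**
     * Returns whether the reader is inside a quoted field after reading the chunk.
     * An escaped quote ("") toggles the state twice, so it leaves the state unchanged.
     */
    static boolean endsInsideQuotes(String chunk, boolean startsInsideQuotes) {
        boolean inside = startsInsideQuotes;
        for (int i = 0; i < chunk.length(); i++) {
            if (chunk.charAt(i) == '"') {
                inside = !inside;
            }
        }
        return inside;
    }

    public static void main(String[] args) {
        String first = "0,\"nameA\",1,\"This is a comment";
        String second = ", is it not\"\n1,\"nameB\",2,\"Another comment\"\n";
        // The first chunk stops in the middle of a quoted field...
        boolean context = endsInsideQuotes(first, false);
        System.out.println("second chunk starts inside quotes: " + context);
        // ...so the second chunk is only consistent when it is given that context.
        System.out.println("second chunk ends inside quotes: " + endsInsideQuotes(second, context));
    }
}
```

Without the context of the first chunk, the second chunk appears to end inside a quoted field. That can only be detected once the whole block has been scanned, and only because this chunk happens to be the last one of the file.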

ActiveViam's multithreaded parser can read over a million entries per second.

###### Concurrency and cache locality

The CSV Source uses a single unified thread pool that runs a single type of task.
Each task performs the following sub-tasks:

1. Fetch bytes from a byte channel that represents the file
2. Decode the bytes into characters
3. Split these characters into lines
4. Parse the lines into records
5. Publish the records to a source listener

The parser optimizes memory access by having all transient data remain in the CPU's cache for the duration of the computation.
Each task works separately without the need for synchronization with the other tasks in the thread pool.

In a typical usage of the CSV source, the first three sub-tasks of each task (fetch, decode, split) are much less expensive than the following two (parse, publish).
Thus, as soon as a task is done with its decoding sub-task, it spawns the next task, and can give that task context regarding delimiters.

A CSV parsing process can therefore be pictured as follows:

<Frame>
  <img src="https://mintcdn.com/activeviam/KszPZqdDnmT6EpJc/engine/java-sdk/6.1/assets/sources/csv/csv_parsing_multithreading.drawio.png?fit=max&auto=format&n=KszPZqdDnmT6EpJc&q=85&s=9770441a9b3fe486a6a22831a421f6a5" alt="Parsing Multithreaded" width="1031" height="491" data-path="engine/java-sdk/6.1/assets/sources/csv/csv_parsing_multithreading.drawio.png" />
</Frame>
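
The five sub-tasks listed above can be sketched with JDK classes only. This single-threaded example is a simplification under stated assumptions: it is not the Atoti parser, it ignores quotes when splitting fields, and `ChunkedPipelineSketch` and `run` are hypothetical names. It does show how an unfinished line is carried from one buffer to the next, which is the kind of context a task has to hand over.

```java
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;
import java.nio.CharBuffer;
import java.nio.channels.Channels;
import java.nio.channels.ReadableByteChannel;
import java.nio.charset.CharsetDecoder;
import java.nio.charset.CodingErrorAction;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.function.Consumer;

// Hypothetical, single-threaded illustration of fetch / decode / split / parse / publish.
final class ChunkedPipelineSketch {

    /** Reads the channel buffer by buffer (bufferSize >= 4) and publishes one record per line. */
    static void run(ReadableByteChannel channel, int bufferSize, Consumer<String[]> listener) {
        CharsetDecoder decoder = StandardCharsets.UTF_8.newDecoder()
                .onMalformedInput(CodingErrorAction.REPLACE)
                .onUnmappableCharacter(CodingErrorAction.REPLACE);
        ByteBuffer bytes = ByteBuffer.allocate(bufferSize);
        CharBuffer chars = CharBuffer.allocate(bufferSize);
        StringBuilder pendingLine = new StringBuilder(); // unfinished line carried to the next buffer
        try {
            boolean endOfInput = false;
            while (!endOfInput) {
                endOfInput = channel.read(bytes) == -1;   // 1. fetch bytes
                bytes.flip();
                decoder.decode(bytes, chars, endOfInput); // 2. decode bytes into characters
                if (endOfInput) {
                    decoder.flush(chars);
                }
                bytes.compact(); // keeps the bytes of a character cut by the buffer boundary
                chars.flip();
                while (chars.hasRemaining()) {            // 3. split characters into lines
                    char c = chars.get();
                    if (c == '\n') {
                        publish(pendingLine, listener);
                    } else if (c != '\r') {
                        pendingLine.append(c);
                    }
                }
                chars.clear();
            }
            if (pendingLine.length() > 0) {
                publish(pendingLine, listener); // last line without a trailing line break
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    private static void publish(StringBuilder line, Consumer<String[]> listener) {
        String[] fields = line.toString().split(",", -1); // 4. parse (naive, quotes are ignored)
        listener.accept(fields);                          // 5. publish to the listener
        line.setLength(0);
    }

    public static void main(String[] args) {
        byte[] data = "0,nameA,1\n1,nameB,2\n".getBytes(StandardCharsets.UTF_8);
        run(Channels.newChannel(new ByteArrayInputStream(data)), 8,
                fields -> System.out.println(Arrays.toString(fields)));
    }
}
```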

### Channels and messages

The `IMessageChannel`s are each linked to a single topic and are used to gather the data, as it is being parsed,
into messages. They are also responsible for sending those messages to the datastore.

Most messages are `IStoreMessage`s, and gather the data as tuples to add to the datastore.

When asked to `fetch` data for specific `IMessageChannel`s, given as arguments, the `ICsvSource`
polls the corresponding `ICsvTopic`s and generates the corresponding workload.

Note that several channels can correspond to the same topic.
For example, a file contains some columns that correspond to a given store, and other columns that correspond to a different store.
In this case there will be one channel for each store.
The two channels depend on the same file and therefore on the same topic.
The two channels are grouped into a composite channel, so that the topic is polled and the file is split into lines only once.
The lines are then dispatched to each individual channel, which parses them into its own records.
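
The composite channel idea can be illustrated with the following plain-Java sketch. The types are hypothetical stand-ins (`CompositeChannelSketch`, `StoreChannel`, `dispatch`), not Atoti's `IMessageChannel`. The point is only that lines are produced once and that each channel then extracts its own columns.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical illustration of one file feeding two stores.
final class CompositeChannelSketch {

    /** Hypothetical channel feeding one store with a subset of the file columns. */
    static final class StoreChannel {
        final String store;
        final int[] positions;
        final List<String[]> records = new ArrayList<>();

        StoreChannel(String store, int... positions) {
            this.store = store;
            this.positions = positions;
        }

        /** Channel-specific parsing: extracts only this store's columns from the line. */
        void accept(String line) {
            String[] fields = line.split(",", -1);
            String[] record = new String[positions.length];
            for (int i = 0; i < positions.length; i++) {
                record[i] = fields[positions[i]];
            }
            records.add(record);
        }
    }

    /** The lines are produced once, then handed to every channel of the composite. */
    static void dispatch(List<String> lines, List<StoreChannel> channels) {
        for (String line : lines) {
            for (StoreChannel channel : channels) {
                channel.accept(line);
            }
        }
    }

    public static void main(String[] args) {
        StoreChannel products = new StoreChannel("Products", 0, 1);
        StoreChannel prices = new StoreChannel("Prices", 0, 2);
        dispatch(List.of("0,nameA,1", "1,nameB,2"), List.of(products, prices));
        System.out.println(Arrays.toString(products.records.get(0)));
        System.out.println(Arrays.toString(prices.records.get(0)));
    }
}
```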

### Translators

A default `ITranslator` is automatically created within the `MessageChannelFactory`,
and converts parsed lines to tuples that can be inserted into the datastore.

Data can be enriched in the `ITranslator` using `IColumnCalculator`s.
`IColumnCalculator`s work for single line data transformation (the data needed for the transformation is contained on one line of the CSV file).
For example:

* convert existing data (transform a numerical month into a text string)
* generate new data

For each tuple written into an `IMessage`, an `IColumnCalculator` calculates the value to write in the corresponding added column.

An `ITranslator` with `IColumnCalculator`s can be created with `CsvMessageChannelFactory#createTranslator()`.

> An `IColumnCalculator` is related to a single topic.

#### ColumnCalculator

| ColumnCalculator             | Description                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                |
| ---------------------------- | -------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- |
| `CsvColumnParser`            | Converts a column to another type <br /><br /><details><p>A <code>CsvColumnParser</code> parses the sequence of characters for a given column into the asked type. <br /><br /><blockquote><p>A custom <code>IParser</code> can be created using the annotation <code>@AtotiPluginValue</code></p></blockquote> Formatters can be added, provided they are known to Atoti Server. For instance, a "date" column at position 1 in the source can be parsed like so: <code>new CsvColumnParser("date", "localDate\[yyyy-MM-dd]", 1)</code></p></details>                                                                                                                     |
| `ConstantCalculator`         | Adds the same predetermined constant value in the given column position, for every tuple.                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                  |
| `LineIndexCalculator`        | Adds the index of the line being parsed in the current CSV file. This is primarily used to add an ID to each element.                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                      |
| `FileNameCalculator`         | Adds the name of the current CSV file as a value for the given column. This is primarily used when the imported files each represent a specific entity.                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                    |
| `FilePathCalculator`         | Adds the absolute path to the current CSV file as a value for the given column.                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                            |
| `FileLastModifiedCalculator` | Adds the last modified date of the current CSV file as a value for the given column.                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                       |
| `EmptyCalculator`            | Adds an empty (null) cell to the tuple. <br /><br /><details><p>This has two major use cases:<ul><li>When the data is not in the source file but is still expected in the datastore.</li><li>When it's filled by another multi-value column calculator using the <code>IColumnCalculationContext</code> write-back feature. This allows a column calculator to write into sibling columns, so that processing resources are used only once while several columns are filled.</li></ul><blockquote><p>You can define your own logic in a column calculator by creating a new <code>IColumnCalculator</code> and overriding the <code>compute</code> method.</p></blockquote></p></details> |
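
The role of the calculators listed above can be sketched with a small, library-free model. The `Calculator` interface and the `Context` class below are hypothetical stand-ins and do not reproduce the signatures of `IColumnCalculator` or `IColumnCalculationContext`. The sketch only shows how one calculator per target column turns a parsed line into a tuple.

```java
import java.time.LocalDate;
import java.time.format.DateTimeFormatter;
import java.util.Arrays;
import java.util.List;

// Hypothetical model of column calculators, not the Atoti API.
final class ColumnCalculatorSketch {

    /** What a calculator may read about the line being translated. */
    static final class Context {
        final String fileName;
        final long lineIndex;
        final String[] fields;

        Context(String fileName, long lineIndex, String[] fields) {
            this.fileName = fileName;
            this.lineIndex = lineIndex;
            this.fields = fields;
        }
    }

    /** Hypothetical stand-in for a column calculator: one value per tuple. */
    interface Calculator {
        Object compute(Context context);
    }

    static Calculator constant(Object value) { return context -> value; }

    static Calculator lineIndex() { return context -> context.lineIndex; }

    static Calculator fileName() { return context -> context.fileName; }

    static Calculator empty() { return context -> null; }

    /** Parses the field at the given position as a date, in the spirit of a column parser. */
    static Calculator localDate(int position, String pattern) {
        DateTimeFormatter formatter = DateTimeFormatter.ofPattern(pattern);
        return context -> LocalDate.parse(context.fields[position], formatter);
    }

    /** Builds the tuple by asking each calculator for the value of its column. */
    static Object[] translate(List<Calculator> calculators, Context context) {
        Object[] tuple = new Object[calculators.size()];
        for (int i = 0; i < tuple.length; i++) {
            tuple[i] = calculators.get(i).compute(context);
        }
        return tuple;
    }

    public static void main(String[] args) {
        Context context = new Context("trades.csv", 7, new String[] {"T1", "2024-01-31"});
        List<Calculator> calculators =
                List.of(lineIndex(), localDate(1, "yyyy-MM-dd"), constant("EUR"), fileName(), empty());
        System.out.println(Arrays.toString(translate(calculators, context)));
    }
}
```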

### ICsvSourceConfiguration

The CSV Source configuration is a higher-level configuration: it applies to the source as a whole, whereas each topic has its own `CsvParserConfiguration`.

| Property        | Description                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                          | Default value                                                            |
| --------------- | ---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- | ------------------------------------------------------------------------ |
| parserThreads   | Sets the number of threads that run the parsing in parallel.                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                         | Half of the available processors, but no more than 8 and not less than 2 |
| bufferSize      | Sets the size of a byte buffer, in kilobytes.                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                        | 64 kB                                                                    |
| synchronousMode | Sets the processing mode of the CSV source. <br /><br /><details><p>In synchronous mode, the processing of a new file can only start when all the activity related to the previous file has been completed, such as the fetching, parsing, and publishing of the parsed records. The publishing phase might also include any processing implemented on the CSV source listener side. So, if the listener implements any asynchronous processing once it receives the records, the CSV Source processing won't wait before switching to the following file.<br /><br />Please also note that archives are always processed in synchronous mode in order to keep the parsers' APIs fully agnostic of the data source's location. For example, source files may come from the local storage or be lazily fetched from the Internet. For this reason files are always handled as if they were byte streams, i.e. bidirectional or random access iteration on the file data is prohibited. As such it is impossible to find individual entries in ZIP and TAR archives before parsing them.</p></details> | false                                                                    |
| fileComparator  | Sets the comparator used to sort the files of a topic before parsing them. <br /><br /><details><p>Use this property if you need the files from a topic to be parsed in a specific order (for example, you have one file per day and want to parse the files in historical order). If your topic is a local file system directory topic, the provided comparator must be an <code>IComparator&lt;Path&gt;</code>.<br /><br />Note that the comparator guarantees that the files are parsed in the appropriate order only if <code>synchronousMode</code> is set to true. Otherwise, the parsing workloads are started in file order, but as the source does not wait for one file to finish parsing before starting the next one, the final order of record publishing cannot be guaranteed.</p></details> | None |
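
Two of the properties above lend themselves to a short sketch. The first method spells out the default `parserThreads` rule as stated in the table, assuming that "half" uses integer division. The comparator orders one-file-per-day topics historically, using `java.util.Comparator` as a stand-in for the Atoti `IComparator`. The class name, the helper names, and the file naming scheme are assumptions made for the example.

```java
import java.nio.file.Path;
import java.nio.file.Paths;
import java.time.LocalDate;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical illustration, not part of the Atoti API.
final class SourceConfigSketch {

    private static final Pattern ISO_DATE = Pattern.compile("\\d{4}-\\d{2}-\\d{2}");

    /** Half of the available processors, but no more than 8 and not less than 2. */
    static int defaultParserThreads(int availableProcessors) {
        return Math.max(2, Math.min(8, availableProcessors / 2));
    }

    /** Extracts the first ISO date found in the file name, e.g. trades_2024-01-31.csv. */
    static LocalDate dateInName(Path file) {
        Matcher matcher = ISO_DATE.matcher(file.getFileName().toString());
        // Files without a date are sorted first in this sketch.
        return matcher.find() ? LocalDate.parse(matcher.group()) : LocalDate.MIN;
    }

    /** Orders the files from the oldest date to the most recent one. */
    static final Comparator<Path> HISTORICAL_ORDER =
            Comparator.comparing(SourceConfigSketch::dateInName);

    public static void main(String[] args) {
        System.out.println(defaultParserThreads(Runtime.getRuntime().availableProcessors()));
        List<Path> files = new ArrayList<>(List.of(
                Paths.get("trades_2024-02-01.csv"),
                Paths.get("trades_2023-12-31.csv"),
                Paths.get("trades_2024-01-15.csv")));
        files.sort(HISTORICAL_ORDER);
        System.out.println(files);
    }
}
```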

## Pipeline execution

### Fetch

`ICsvSource#fetch` is the method that triggers the data loading from the source to the datastore.

### Listen

`ICsvSource#listen` is the method that triggers listening for new data.

Listening allows any new data in the files to be fetched when it is added.
To do so, two different approaches are available in Atoti:

* by polling the files at a given interval: if the last modification time of a file has changed, its content is reloaded and compared to the data already loaded
* via a `WatcherService`: the files are watched for changes and the data is loaded as soon as a change is detected (`WatcherService` relies on a `java.nio.file.WatchService`)

By default, the approach via `WatcherService` is used, as it provides better latency.
Indeed, when using the polling mechanism, the last modification time of the file is checked when deciding whether to reload a file.
On some file systems, the last modification time precision can be up to a second:

* This can delay the reload of a file, as the minimal refresh period is one second.
* This can lead to huge delays when changes are irregular: if two updates happen in the same second, the first update may trigger the reload while the second one does not. The second update is then only processed along with the next update, which could happen much later.
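
The second pitfall can be reproduced with a few lines of plain Java. This is a simplified model of a polling loop, not the Atoti implementation. `LastModifiedPollerSketch` is a hypothetical class, and timestamps are passed in directly instead of being read with `Files.getLastModifiedTime`.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical model of polling based on the last modification time.
final class LastModifiedPollerSketch {

    private final Map<String, Long> lastSeen = new HashMap<>();

    /** Returns true when the file must be reloaded, i.e. its timestamp differs from the last poll. */
    boolean poll(String file, long lastModifiedMillis) {
        Long previous = lastSeen.put(file, lastModifiedMillis);
        return previous == null || previous != lastModifiedMillis;
    }

    /** Models a file system that only stores whole seconds. */
    static long truncateToSeconds(long millis) {
        return (millis / 1000) * 1000;
    }

    public static void main(String[] args) {
        LastModifiedPollerSketch poller = new LastModifiedPollerSketch();
        // Two updates within the same second look identical to the poller.
        System.out.println(poller.poll("trades.csv", truncateToSeconds(10_200)));
        System.out.println(poller.poll("trades.csv", truncateToSeconds(10_900)));
        // The second update is only picked up once a later update changes the timestamp.
        System.out.println(poller.poll("trades.csv", truncateToSeconds(11_100)));
    }
}
```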

Reloading a file is a necessary step to check for changes in its content.
We recommend splitting topics into several files (using a `DirectoryCsvTopic` for instance), which avoids large files that take a long time to reload.
With that design, only the files that have been updated are reloaded, not the whole topic.

## Monitoring

### Reporting

Optionally, the CSV Source can generate an `ICsvTopicParsingReport` for each fetched topic.
This report includes an `ICsvFileParsingReport` for each parsed file of that topic, with the parsing errors encountered.
These reports are returned by the `fetch()` method of the source if `parsingReportEnabled(boolean)` is set to `true` in the `CsvSourceConfiguration`.

Each `ICsvFileParsingReport` includes a wide range of parsing information:

* a list of the encountered errors, in the form of `ICsvAnomaly` objects
* the number of lines successfully parsed and loaded into the datastore
* aggregated execution times for various steps of the parsing process

Only the first errors are recorded, to avoid creating an enormous report that would consume a lot of memory (for instance while parsing a large malformed file).
The number of stored errors can be set with `maxParsingAnomalies(int)` in the `CsvSourceConfiguration`; it defaults to `1000`.
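
The reason for capping the number of stored errors can be shown with a small bounded collector. This is a sketch of the general technique, not the report implementation. `BoundedAnomalyLogSketch` and its methods are hypothetical, and counting the dropped anomalies is an addition made for the example.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical bounded collector: keeps the first anomalies and only counts the others.
final class BoundedAnomalyLogSketch {

    private final int maxAnomalies;
    private final List<String> anomalies = new ArrayList<>();
    private long dropped;

    BoundedAnomalyLogSketch(int maxAnomalies) {
        this.maxAnomalies = maxAnomalies;
    }

    /** Stores the anomaly while there is room, otherwise only increments a counter. */
    void report(String anomaly) {
        if (anomalies.size() < maxAnomalies) {
            anomalies.add(anomaly);
        } else {
            dropped++;
        }
    }

    List<String> stored() {
        return Collections.unmodifiableList(anomalies);
    }

    long droppedCount() {
        return dropped;
    }

    public static void main(String[] args) {
        BoundedAnomalyLogSketch log = new BoundedAnomalyLogSketch(1000);
        // A large malformed file: memory use stays bounded by the cap.
        for (int line = 0; line < 1_000_000; line++) {
            log.report("line " + line + ": unexpected number of fields");
        }
        System.out.println(log.stored().size() + " stored, " + log.droppedCount() + " dropped");
    }
}
```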

### CsvSource MBean

The CSV Source can be monitored through its associated JMX bean.
The bean can be created as follows:

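```java
// Spring configuration method that exposes the CSV Source monitoring MBean.
// @DependsOn ensures that the "csvSource" bean is created first.
@Bean
@DependsOn(value = "csvSource")
public JMXEnabler JMXCsvSourceEnabler() {
    return new JMXEnabler(new CsvMonitoring(csvSource()));
}
```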

with `csvSource` being a Bean itself.

The following is a list of available operations and attributes:

* `getTopics`: Retrieves the list of topics defined in the CSV Source, coupled with their current
  state (paused or not, see `pauseListen(topic)` and `resumeListen(topic)` below).
* `getCommittedState(topic)`: Returns the committed state of a given topic: all the files
  submitted for processing and their last modification time.
* `pauseListen(topic)`: Allows you to pause the listening of events on a given topic, meaning
  that the WatcherService of the CSV Source doesn't process events that concern the given topic,
  until the resume method is called. To be able to call this pause method, the topic must extend the
  `ACsvTopic` class.
* `resumeListen(topic)`: Resumes the listening of events on a given topic. Additionally, this method
  automatically triggers the `differentialReload()` method on the given topic.
* `differentialReload(topic)`: Triggers a differential reload of the topic. It computes the current
  state of the topic, compares it to the committed state, and generates an event that only loads
  the difference between the two states.
  This means that after calling `resumeListen(topic)` on a topic, the CSV Source processes all the
  changes that happened during the pause.

> Warning: This method relies on the **Last Modified Time** property of the files, which in some file
> systems can take up to a second to be updated.

* `fullReload(topic)`: Performs a full reload of the topic: all files linked to this topic
  are reloaded.
* `simulateDifferentialReload(topic)`: Simulates a differential reload of the given topic and
  returns the changes that the differential reload would have triggered.
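
The comparison behind a differential reload can be sketched as a diff between two maps of file names to last modification times. This is an illustration of the principle described above, not the MBean's code. `DifferentialReloadSketch`, `Difference`, and the separate tracking of removed files are assumptions.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical illustration of comparing a committed state with the current state.
final class DifferentialReloadSketch {

    /** Result of the comparison: the files to load and the files that disappeared. */
    static final class Difference {
        final List<String> toLoad = new ArrayList<>();
        final List<String> removed = new ArrayList<>();
    }

    /** Both maps associate a file name with its last modification time in milliseconds. */
    static Difference diff(Map<String, Long> committed, Map<String, Long> current) {
        Difference difference = new Difference();
        // TreeMap gives a stable, alphabetical iteration order.
        for (Map.Entry<String, Long> file : new TreeMap<>(current).entrySet()) {
            Long committedTime = committed.get(file.getKey());
            // New files and files whose timestamp changed must be loaded.
            if (committedTime == null || !committedTime.equals(file.getValue())) {
                difference.toLoad.add(file.getKey());
            }
        }
        for (String name : new TreeMap<>(committed).keySet()) {
            if (!current.containsKey(name)) {
                difference.removed.add(name);
            }
        }
        return difference;
    }

    public static void main(String[] args) {
        Map<String, Long> committed = Map.of("a.csv", 1L, "b.csv", 2L, "c.csv", 3L);
        Map<String, Long> current = Map.of("a.csv", 1L, "b.csv", 5L, "d.csv", 7L);
        Difference difference = diff(committed, current);
        System.out.println("to load: " + difference.toLoad + ", removed: " + difference.removed);
    }
}
```

Computing the difference without acting on it corresponds to what a simulated differential reload would report.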

### Customized Monitoring

Before reading this section, please refer to
[the monitoring documentation](../monitoring/health-dispatcher).

The CSV Source dispatches several `ICsvHealthEvent`s to ease monitoring:

* `CsvFileParsingProgress` is dispatched to inform the user about the progress of the current
  parsing. It contains the full name of the file and the parsing information related to the exact
  moment of the dispatch. **associated tags: csv, parsing, source**
* `CsvFileParsingDone` is dispatched when a given file parsing is completed. The `ChannelTask` is closed,
  meaning that no new `ParsingTask` can be created. It contains the full name of the file and the parsing information
  related to the exact moment of the dispatch. **associated tags: csv, parsing, source**
* `CsvFileParsingFailure` is dispatched in case of a full failure of a CSV file parsing. It contains
  the full name of the file, the error message, and the parsing information related to the exact
  moment of the failure. **associated tags: csv, parsing, source**
* `CsvLineParsingFailure` is dispatched when the parsing of a single line of a CSV file fails. It contains
  the full name of the file, the failing line content and line number, and the error message.
  **associated tags: csv, parsing, source**
