
CSV Source

You should be familiar with the source ETL concepts before reading the CSV-specific documentation.

Check out the example code snippets while reading this documentation.

An ICsvSource is a CSV-specific implementation of the ISource interface. ISource is the generic framework used in Atoti to fetch data from external data sources and contribute it to an Atoti IDatastore.

The purpose of the ICsvSource is to load data from CSV files.

ETL pipeline

Extract

CSV Topics

ICsvTopics are used to load data from CSV files into a datastore.

For CSV files that come from the local file system, we provide three implementations:

  • a single CSV file topic, where a single file is watched for updates
  • a directory topic, where a directory is watched for new or updated files that match a given pattern
  • a multiple file topic, where a list of files is watched for updates.

These topics can easily be configured through a FileSystemCsvTopicFactory.

A FileSystemCsvTopicFactory creates topics together with their parser configuration. When creating one, you can decide whether to follow symbolic links when registering a folder to watch.

Note that other CSV topic implementations exist for files that do not come from the local file system (Cloud source); they are not covered in this article. Even though the way to access the file contents differs, all other aspects of data loading stay the same.

Each ICsvTopic holds its own CsvParserConfiguration, which states the delimiter used, the columns of the files and their position, and other properties.

CSV parser

Atoti provides a high-performance multithreaded parser to extract data from CSV files into memory, in order to perform the extraction step of the ETL pipeline.

There are several levels of configuration for the ICsvSource.

Each defined topic, whether it is a SingleFileCsvTopic, a DirectoryCsvTopic, or is created using a FileSystemCsvTopicFactory, comes with its own CsvParserConfiguration.

Parser configuration

A CsvParserConfiguration can be created using CsvParserConfiguration.builder().

First, the builder requires you to define the columns of the CSV file; this information must be provided when creating the parser. The columns can be defined in several ways:

  • by specifying the number of columns: .withColumnCount(int)
  • by specifying the names of the columns: .withColumnNames(List<String>)
    • the user must specify all column names in the file. The ones that do not correspond to a matching column in the datastore are skipped
  • by specifying the mapping of the column index to the column name: .withColumnNamesMapping(Map<Integer, String>)
    • the user can specify only the columns that are of interest
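To make the difference between the two naming styles concrete, here is a minimal Java sketch that models them on a record that has already been split into fields. The class and method names (ColumnSelectionSketch, byNames, byMapping) are hypothetical and do not reflect how Atoti implements the builder; the storeFields set merely stands in for the datastore schema.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical model of the two ways of naming columns; not the CsvParserConfiguration API.
public final class ColumnSelectionSketch {

    // Full list of names (cf. withColumnNames): every position is named,
    // and names that do not exist in the store are skipped.
    static Map<String, String> byNames(List<String> fields, List<String> columnNames, Set<String> storeFields) {
        Map<String, String> row = new LinkedHashMap<>();
        for (int i = 0; i < columnNames.size() && i < fields.size(); i++) {
            if (storeFields.contains(columnNames.get(i))) {
                row.put(columnNames.get(i), fields.get(i));
            }
        }
        return row;
    }

    // Index-to-name mapping (cf. withColumnNamesMapping): only the mapped positions are read.
    static Map<String, String> byMapping(List<String> fields, Map<Integer, String> mapping) {
        Map<String, String> row = new LinkedHashMap<>();
        mapping.forEach((index, name) -> {
            if (index < fields.size()) {
                row.put(name, fields.get(index));
            }
        });
        return row;
    }
}
```

With the mapping style, columns the store does not need are never named at all, which is why only the columns of interest have to be listed.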

CsvParserConfiguration contains other properties, as described below:

  • acceptOverflowingLines (default: false): when set to false, a record that contains more elements than expected is rejected; otherwise, the additional elements are ignored.
  • acceptIncompleteLines (default: true): when set to false, a record that contains fewer elements than expected is rejected. When set to true, null values are inserted into the missing columns.
  • separator (default: , (comma)): the separator used between fields in the CSV file.
  • processQuotes (default: true): sets the behavior of the parser regarding quotes.
    When set to true, the parser follows the official CSV specification, meaning:
      • Each field may or may not be enclosed in double quotes. If a field is not enclosed in double quotes, double quotes may not appear inside it.
      • A double quote appearing inside a quoted field must be escaped by preceding it with another double quote; an unescaped double quote must not appear inside a double-quote pair.
      • Fields containing line breaks (CRLF), double quotes, or commas MUST be enclosed in double quotes.
    When set to false, all double quotes within a field are treated as regular characters, following Microsoft Excel's behavior. In this mode, fields should not be enclosed in double quotes, and line breaks should not be used within a field.
  • numberSkippedLines (default: 0): the number of lines to skip at the beginning of each file (for example, a header).
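The following Java sketch models how acceptOverflowingLines, acceptIncompleteLines, separator, and processQuotes could interact on a single line, assuming a single-character separator. The Options record and the LineRulesSketch methods are hypothetical stand-ins, not the CsvParserConfiguration API, and the quote handling is deliberately lenient.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;

// Hypothetical model of the line-level options described above; not the Atoti parser.
public final class LineRulesSketch {

    record Options(char separator, boolean processQuotes,
                   boolean acceptOverflowingLines, boolean acceptIncompleteLines) {}

    // Splits one line into fields; quotes are only special when processQuotes is true.
    static List<String> split(String line, Options o) {
        List<String> fields = new ArrayList<>();
        StringBuilder field = new StringBuilder();
        boolean inQuotes = false;
        for (int i = 0; i < line.length(); i++) {
            char c = line.charAt(i);
            if (o.processQuotes() && c == '"') {
                if (inQuotes && i + 1 < line.length() && line.charAt(i + 1) == '"') {
                    field.append('"'); // "" inside a quoted field is an escaped quote
                    i++;
                } else {
                    inQuotes = !inQuotes; // opening or closing quote
                }
            } else if (c == o.separator() && !inQuotes) {
                fields.add(field.toString());
                field.setLength(0);
            } else {
                field.append(c); // regular character (including quotes when processQuotes is false)
            }
        }
        fields.add(field.toString());
        return fields;
    }

    // Applies the overflow/incomplete rules; an empty Optional means the record is rejected.
    static Optional<List<String>> toRecord(String line, int columnCount, Options o) {
        List<String> fields = split(line, o);
        if (fields.size() > columnCount) {
            if (!o.acceptOverflowingLines()) {
                return Optional.empty();
            }
            fields = new ArrayList<>(fields.subList(0, columnCount)); // extra elements ignored
        } else if (fields.size() < columnCount) {
            if (!o.acceptIncompleteLines()) {
                return Optional.empty();
            }
            fields = new ArrayList<>(fields);
            while (fields.size() < columnCount) {
                fields.add(null); // missing columns become null
            }
        }
        return Optional.of(fields);
    }
}
```

With processQuotes enabled, the line 0,"a, b",1 yields three fields; with it disabled, the quotes are ordinary characters and the same line yields four.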

Parser internals

Principle

The parser relies on NIO channels, memory buffers, and multithreading to speed up character decoding and object parsing, so that data is loaded quickly and becomes available for analysis sooner.

The implementation aims at high performance and fine-grained configuration. It fetches data from CSV files, and listens for changes in them, in a multithreaded way and in minimal time, while keeping the amount of transient memory to a minimum.

Internally, CSV work is organized into workloads that are then executed. Custom code can nevertheless bypass the standard CSV Source pipelines to take control of parsing operations, deciding which files are loaded into which channel, and when.

Multithreaded CSV parsing isn't as straightforward as splitting the file and assigning each chunk to a thread.

This example illustrates the difficulties:

Given the following CSV:

ID, Name, Price, Comment
0,"nameA",1,"This is a comment, is it not"
1,"nameB",2,"Another comment"

If this file is split in half (character-wise, since at that point of the algorithm the parser doesn't know about context, line delimiters, and so on), the first thread gets the bytes that correspond to

0,"nameA",1,"This is a comment

while the second thread gets

, is it not"
1,"nameB",2,"Another comment"

Now the second thread sees a delimiter, the character ',', but as it doesn't know the context, it cannot tell whether that character is inside a quoted comment or not. It could count the '"' characters and deduce, after parsing the entire block, that it started within a comment, but that technique has its own limits.
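The example can be reproduced with a short Java sketch that cuts the file at the same point, parses the second half naively, and then applies the quote-counting heuristic mentioned above. The class and method names are hypothetical, and the heuristic is shown only to make its limits visible, not as what Atoti does.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical demonstration of the chunk-boundary problem; not part of Atoti.
public final class NaiveSplitSketch {

    static final String CSV = "ID, Name, Price, Comment\n"
        + "0,\"nameA\",1,\"This is a comment, is it not\"\n"
        + "1,\"nameB\",2,\"Another comment\"\n";

    // What a thread would do if it assumed its chunk starts at a line boundary, outside any quotes.
    static List<String> naiveFieldsOfFirstLine(String chunk) {
        String firstLine = chunk.split("\n", -1)[0];
        return Arrays.asList(firstLine.split(",", -1));
    }

    // Quote-parity heuristic: an odd number of '"' suggests the chunk began inside a quoted field.
    // It needs a full pass over the chunk first, and it assumes the chunk ends outside quotes.
    static boolean probablyStartsInsideQuotes(String chunk) {
        long quotes = chunk.chars().filter(c -> c == '"').count();
        return quotes % 2 == 1;
    }

    public static void main(String[] args) {
        String second = CSV.substring(CSV.indexOf(", is it not")); // same cut point as above
        System.out.println("naive fields: " + naiveFieldsOfFirstLine(second));
        System.out.println("starts inside quotes? " + probablyStartsInsideQuotes(second));
    }
}
```

The naive parse turns the tail of one comment into two bogus fields. The parity check flags the second half correctly, but it also returns true for the first half, which starts outside quotes and merely ends inside one: the count alone cannot tell the two situations apart.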

ActiveViam's multithreaded parser can read over a million entries per second.

Concurrency and cache locality

The CSV Source uses a single unified thread pool that runs a single type of task. Each task performs the following sub-tasks:

  1. Fetch bytes from a byte channel that represents the file
  2. Decode the bytes into characters
  3. Split these characters into lines
  4. Parse the lines into records
  5. Publish the records to a source listener

The parser optimizes memory access by having all transient data remain in the CPU's cache for the duration of the computation. Each task works separately without the need for synchronization with the other tasks in the thread pool.

In a typical usage of the CSV source, the first three sub-tasks of each task (fetch, decode, split) are much less expensive than the following two (parse, publish). Thus, as soon as a task is done with its decoding sub-task, it spawns the next task, and can give that task context regarding delimiters.

A CSV parsing process can therefore be pictured as follows:

[Figure: Parsing Multithreaded]
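The task chaining described above can be approximated by a simplified Java model: each task decodes its chunk, splits it into complete lines, hands the unfinished last line to the next task, spawns that task, and only then parses its own lines. This is a sketch under explicit assumptions, not Atoti's implementation: it assumes ASCII input and '\n' line endings, uses the partial line as the delimiter context passed to the next task, and collects records in memory instead of publishing them to a source listener. All names are hypothetical.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Simplified model of chained parsing tasks (hypothetical, not Atoti's implementation).
// Assumes ASCII input (chunk boundaries never split a character) and '\n' line endings.
public final class ChainedParsingSketch {

    // Split step: complete lines only, ignoring '\n' inside quotes; the unfinished tail goes to tail[0].
    static List<String> splitLines(String text, String[] tail) {
        List<String> lines = new ArrayList<>();
        boolean inQuotes = false;
        int start = 0;
        for (int i = 0; i < text.length(); i++) {
            char c = text.charAt(i);
            if (c == '"') {
                inQuotes = !inQuotes; // an escaped "" toggles twice, leaving the state unchanged
            } else if (c == '\n' && !inQuotes) {
                lines.add(text.substring(start, i));
                start = i + 1;
            }
        }
        tail[0] = text.substring(start);
        return lines;
    }

    // Parse step: one line into fields, honoring quotes and "" escapes.
    static List<String> parseLine(String line) {
        List<String> fields = new ArrayList<>();
        StringBuilder field = new StringBuilder();
        boolean inQuotes = false;
        for (int i = 0; i < line.length(); i++) {
            char c = line.charAt(i);
            if (c == '"' && inQuotes && i + 1 < line.length() && line.charAt(i + 1) == '"') {
                field.append('"');
                i++;
            } else if (c == '"') {
                inQuotes = !inQuotes;
            } else if (c == ',' && !inQuotes) {
                fields.add(field.toString());
                field.setLength(0);
            } else {
                field.append(c);
            }
        }
        fields.add(field.toString());
        return fields;
    }

    static void runTask(ExecutorService pool, List<byte[]> chunks, int index, String carry,
                        Map<Integer, List<List<String>>> out, CountDownLatch done) {
        // Fetch + decode: this chunk, prefixed with the partial line handed over by the previous task.
        String text = carry + new String(chunks.get(index), StandardCharsets.US_ASCII);
        String[] tail = new String[1];
        List<String> lines = splitLines(text, tail);
        if (index + 1 < chunks.size()) {
            String context = tail[0]; // lets the next task start exactly at a line boundary
            pool.execute(() -> runTask(pool, chunks, index + 1, context, out, done));
        } else if (!tail[0].isEmpty()) {
            lines.add(tail[0]); // last line of the file without a trailing '\n'
        }
        // Parse + publish: the expensive steps, overlapping with the task just spawned.
        List<List<String>> records = new ArrayList<>();
        for (String line : lines) {
            records.add(parseLine(line));
        }
        out.put(index, records);
        done.countDown();
    }

    static List<List<String>> parse(String csv, int chunkSize) throws InterruptedException {
        byte[] bytes = csv.getBytes(StandardCharsets.US_ASCII);
        List<byte[]> chunks = new ArrayList<>();
        for (int i = 0; i < bytes.length; i += chunkSize) {
            chunks.add(Arrays.copyOfRange(bytes, i, Math.min(bytes.length, i + chunkSize)));
        }
        List<List<String>> all = new ArrayList<>();
        if (chunks.isEmpty()) {
            return all;
        }
        ExecutorService pool = Executors.newFixedThreadPool(2);
        Map<Integer, List<List<String>>> out = new ConcurrentSkipListMap<>();
        CountDownLatch done = new CountDownLatch(chunks.size());
        pool.execute(() -> runTask(pool, chunks, 0, "", out, done));
        done.await();
        pool.shutdown();
        out.values().forEach(all::addAll); // sorted by chunk index, so file order is kept
        return all;
    }
}
```

Because a task spawns its successor before parsing, the cheap fetch/decode/split work of chunk n+1 runs while chunk n is still being parsed, and no task ever has to guess whether it starts inside quotes.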

Channels and messages

The IMessageChannels are each linked to a single topic and are used to gather the data, as it is being parsed, into messages. They are also responsible for sending those messages to the datastore.

Most messages are IStoreMessages, and gather the data as tuples to add to the datastore.

When asked to fetch data for specific IMessageChannels, given as arguments, the ICsvSource polls the corresponding ICsvTopics and generates the corresponding workload.

Note that several channels can correspond to the same topic. For example, a file may contain some columns that correspond to a given store, and other columns that correspond to a different store. In this case, there is one channel for each store. Both channels depend on the same file, and therefore on the same topic. They are grouped into a composite channel so that polling the topic and splitting the file into lines are done only once; the lines are then dispatched to each individual channel, which parses them into records.
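A composite channel can be sketched in Java as follows, with one hypothetical StoreChannel per store that extracts its own columns from each line. The lines of the shared topic are produced once and dispatched to every channel. None of these types are Atoti's IMessageChannel classes, and the naive comma split assumes unquoted input.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of a composite channel (not the Atoti IMessageChannel API):
// the topic's file is split into lines once, and every line is dispatched to each store channel.
public final class CompositeChannelSketch {

    // One channel per store: it parses each line into the record its store needs.
    static final class StoreChannel {
        final String store;
        final int[] columns; // positions of the columns this store is interested in
        final List<List<String>> tuples = new ArrayList<>();

        StoreChannel(String store, int... columns) {
            this.store = store;
            this.columns = columns;
        }

        void accept(String line) {
            String[] fields = line.split(",", -1); // unquoted input assumed for brevity
            List<String> tuple = new ArrayList<>();
            for (int column : columns) {
                tuple.add(fields[column]);
            }
            tuples.add(tuple);
        }
    }

    // Composite channel: lines are produced once and shared by all channels of the topic.
    static void dispatch(List<String> lines, List<StoreChannel> channels) {
        for (String line : lines) {
            for (StoreChannel channel : channels) {
                channel.accept(line);
            }
        }
    }
}
```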

Translators

A default ITranslator is automatically created within the MessageChannelFactory, and converts parsed lines to tuples that can be inserted into the datastore.

Data can be enriched in the ITranslator using IColumnCalculators. IColumnCalculators handle single-line data transformations, where all the data needed for the transformation is contained in one line of the CSV file. For example:

  • convert existing data (transform a numerical month into a text string)
  • generate new data

For each tuple written into an IMessage, an IColumnCalculator calculates the value to write in the corresponding added column.

An ITranslator with IColumnCalculators can be created using CsvMessageChannelFactory#createTranslator().

An IColumnCalculator is related to a single topic.
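As an illustration of what a translator with calculators does, the Java sketch below builds a tuple from the parsed fields and appends one value per calculator, using the month example above and a line-index ID. The Calculator interface and its compute signature are hypothetical simplifications introduced for this sketch; the real IColumnCalculator API differs.

```java
import java.time.Month;
import java.time.format.TextStyle;
import java.util.ArrayList;
import java.util.List;
import java.util.Locale;

// Hypothetical translator: parsed fields plus calculated columns -> tuple. Not the Atoti ITranslator API.
public final class TranslatorSketch {

    // Minimal stand-in for a column calculator: computes one extra value from the current line.
    interface Calculator {
        Object compute(List<String> fields, long lineIndex);
    }

    // Converts the numerical month found at the given position into its English name.
    static Calculator monthName(int position) {
        return (fields, lineIndex) -> Month.of(Integer.parseInt(fields.get(position).trim()))
            .getDisplayName(TextStyle.FULL, Locale.ENGLISH);
    }

    // Uses the line index as a generated ID, similar in spirit to LineIndexCalculator.
    static Calculator lineIndex() {
        return (fields, lineIndex) -> lineIndex;
    }

    // Builds one tuple: the parsed fields followed by one value per calculator.
    static List<Object> translate(List<String> fields, long lineIndex, List<Calculator> calculators) {
        List<Object> tuple = new ArrayList<>(fields);
        for (Calculator calculator : calculators) {
            tuple.add(calculator.compute(fields, lineIndex));
        }
        return tuple;
    }
}
```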

ColumnCalculator

  • CsvColumnParser: converts a column to another type.
    A CsvColumnParser parses the sequence of characters of a given column into the requested type. A custom IParser can be created using the @AtotiPluginValue annotation.
    Formatters can be added, provided they are known to Atoti Server. For instance, a "date" column at position 1 in the source can be parsed like so: new CsvColumnParser("date", "localDate[yyyy-MM-dd]", 1)
  • ConstantCalculator: adds the same predetermined constant value at the given column position, for every tuple.
  • LineIndexCalculator: adds the index of the line being parsed in the current CSV file. This is primarily used to add an ID to each element.
  • FileNameCalculator: adds the name of the current CSV file as a value for the given column. This is primarily used when each imported file represents a specific entity.
  • FilePathCalculator: adds the absolute path of the current CSV file as a value for the given column.
  • FileLastModifiedCalculator: adds the last modified date of the current CSV file as a value for the given column.
  • EmptyCalculator: adds an empty (null) cell to the tuple. This has two major use cases:
      • When the data is not in the source file but is still expected in the datastore.
      • When the cell is filled by another, multi-value column calculator using the IColumnCalculationContext write-back feature. This allows a column calculator to write into sibling columns, so that processing resources are used only once while filling several columns.

You can define your own logic in a column calculator by creating a new IColumnCalculator and overriding the compute method.
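The write-back use case can be illustrated with a hypothetical calculator that parses a date column once, returns the year for its own column, and writes the month and day into sibling cells that start out empty. The Context class below is a stand-in assumed for this sketch; it does not reproduce IColumnCalculationContext or the signature of the compute method.

```java
import java.time.LocalDate;
import java.util.Arrays;

// Hypothetical sketch of the write-back idea; it does not reproduce Atoti's
// IColumnCalculator or IColumnCalculationContext signatures.
public final class WriteBackSketch {

    // Stand-in for a calculation context: the raw fields and the tuple under construction.
    static final class Context {
        final String[] fields;
        final Object[] tuple; // every cell starts empty (null), as with EmptyCalculator columns

        Context(String[] fields, int tupleSize) {
            this.fields = fields;
            this.tuple = new Object[tupleSize];
        }

        void write(int position, Object value) {
            tuple[position] = value;
        }

        @Override
        public String toString() {
            return Arrays.toString(tuple);
        }
    }

    // Parses the date once, writes month and day into sibling columns,
    // and returns the year as the value of the calculator's own column.
    static Object computeYear(Context context, int dateField, int monthColumn, int dayColumn) {
        LocalDate date = LocalDate.parse(context.fields[dateField]); // ISO yyyy-MM-dd assumed
        context.write(monthColumn, date.getMonthValue());
        context.write(dayColumn, date.getDayOfMonth());
        return date.getYear();
    }
}
```

In this sketch the tuple layout is [id, year, month, day]: the month and day cells would be declared with an empty calculator, and the single date parse fills all three derived columns.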

ICsvSourceConfiguration

The CSV Source configuration is a higher-level configuration that applies to the whole source.

  • parserThreads (default: half of the available processors, but no fewer than 2 and no more than 8): sets the number of threads that run the parsing in parallel.
  • bufferSize (default: 64 kB): sets the size of a byte buffer, in kilobytes.
  • synchronousMode (default: false): sets the processing mode of the CSV source.
    In synchronous mode, the processing of a new file can only start when all the activity related to the previous file has been completed, such as the fetching, parsing, and publishing of the parsed records. The publishing phase may also include any processing implemented on the CSV source listener side. However, if the listener performs asynchronous processing once it receives the records, the CSV Source does not wait for that processing before moving on to the next file.
    Note also that archives are always processed in synchronous mode, in order to keep the parsers' APIs fully agnostic of the data source's location. For example, source files may come from the local storage or be lazily fetched from the Internet. For this reason, files are always handled as byte streams: bidirectional or random-access iteration over the file data is not allowed. As a result, individual entries in ZIP and TAR archives cannot be located before parsing them.
  • fileComparator (default: none): sets the comparator used to sort the files of a topic before parsing them.
    Use it if you need the files of a topic to be parsed in a specific order (for example, you have one file per day and want to parse the files in chronological order). If your topic is a local file system directory topic, the provided comparator must be an IComparator<Path>.
    Note that the comparator guarantees that files are parsed in the appropriate order only if synchronousMode is set to true. Otherwise, even though the file order is respected when starting the parsing workloads, the source does not wait for one file to finish parsing before starting the next one, so the final order in which records are published cannot be guaranteed.
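Two of these properties lend themselves to a quick Java sketch: the default parserThreads value and a comparator for one-file-per-day topics. The helper names are hypothetical, integer division is assumed for "half of the available processors", and the comparator is a plain java.util.Comparator<Path>; adapting it to the IComparator<Path> that a directory topic expects is not shown.

```java
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

// Hypothetical helpers illustrating two of the properties above; not Atoti API.
public final class SourceConfigSketch {

    // parserThreads default as described: half of the available processors
    // (integer division assumed here), clamped to the range [2, 8].
    static int defaultParserThreads(int availableProcessors) {
        return Math.max(2, Math.min(8, availableProcessors / 2));
    }

    // Orders daily files by the ISO date (yyyy-MM-dd) found in their name, e.g. "trades_2024-03-15.csv".
    // ISO dates sort correctly as strings; names without a date fall back to the whole name.
    static final Comparator<Path> BY_DATE_IN_NAME = Comparator.comparing(
        (Path p) -> p.getFileName().toString().replaceAll(".*?(\\d{4}-\\d{2}-\\d{2}).*", "$1"));

    static List<Path> sortedByDate(List<Path> files) {
        List<Path> sorted = new ArrayList<>(files);
        sorted.sort(BY_DATE_IN_NAME);
        return sorted;
    }
}
```

Keep in mind that such an ordering only carries through to record publication when synchronousMode is set to true.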

Pipeline execution

Fetch

ICsvSource#fetch is the method that triggers the data loading from the source to the datastore.

Listen

ICsvSource#listen is the method that triggers listening for new data.

Listening allows any new data in the files to be fetched when it is added. To do so, two different approaches are available in Atoti:

  • by polling the files at a given interval: if the last modification time of a file has changed, its content is reloaded and compared to the data already loaded
  • via a WatcherService, the files are watched for changes and the data is loaded as soon as a change is detected (WatcherService relies on a java.nio.file.WatchService)

By default, the approach via WatcherService is used, as it provides better latency. Indeed, when using the polling mechanism, the last modification time of the file is checked when deciding whether to reload a file. On some file systems, the last modification time precision can be up to a second:

  • This can delay the reloading of the file, as the minimal refresh period is one second.
  • This can lead to long delays when changes occur at irregular intervals: if two updates happen within the same second, the first update could trigger the reload while the second would not, so the second update would only be processed along with the next one, which could come much later.

Reloading a file is a necessary step to check for changes in its content. We recommend splitting topics into several files (using a DirectoryCsvTopic, for instance). This avoids large files that take a long time to reload: with that design, only the files that have been updated are reloaded, not the whole topic.
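The WatcherService approach builds on java.nio.file.WatchService. The sketch below shows only that underlying JDK mechanism for a directory topic: register a directory, wait for creation or modification events, and keep the files matching a glob pattern. It is not Atoti's WatcherService, and the class and method names are hypothetical.

```java
import java.io.IOException;
import java.nio.file.*;
import java.util.LinkedHashSet;
import java.util.Set;
import java.util.concurrent.TimeUnit;

// Minimal sketch of directory watching with java.nio.file.WatchService (hypothetical names;
// not Atoti's WatcherService).
public final class DirectoryWatchSketch implements AutoCloseable {
    private final Path dir;
    private final PathMatcher matcher;
    private final WatchService watcher;

    public DirectoryWatchSketch(Path dir, String glob) throws IOException {
        this.dir = dir;
        this.matcher = dir.getFileSystem().getPathMatcher("glob:" + glob);
        this.watcher = dir.getFileSystem().newWatchService();
        // Only creations and modifications matter for loading new or updated files.
        dir.register(watcher, StandardWatchEventKinds.ENTRY_CREATE, StandardWatchEventKinds.ENTRY_MODIFY);
    }

    // Waits up to timeoutSeconds for the next batch of events and returns the matching files.
    public Set<Path> pollChanges(long timeoutSeconds) throws InterruptedException {
        Set<Path> changed = new LinkedHashSet<>();
        WatchKey key = watcher.poll(timeoutSeconds, TimeUnit.SECONDS);
        if (key == null) {
            return changed; // nothing happened within the timeout
        }
        for (WatchEvent<?> event : key.pollEvents()) {
            if (event.kind() == StandardWatchEventKinds.OVERFLOW) {
                continue; // events were lost; a real implementation would rescan the directory
            }
            Path name = (Path) event.context(); // file name relative to the watched directory
            if (matcher.matches(name)) {
                changed.add(dir.resolve(name));
            }
        }
        key.reset(); // re-arm the key so that further events are delivered
        return changed;
    }

    @Override
    public void close() throws IOException {
        watcher.close();
    }
}
```

Unlike polling, this reacts to file system notifications rather than to last-modified timestamps, which is why it avoids the one-second precision issue described above. On some platforms the JDK itself falls back to polling internally, so latency still varies.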

Monitoring

Reporting

Optionally, the CSV Source can generate an ICsvTopicParsingReport for each fetched topic. This report includes an ICsvFileParsingReport for each parsed file of that topic, including the parsing errors encountered. These reports are returned by the fetch() method of the source if parsingReportEnabled(boolean) is set to true in the CsvSourceConfiguration.

Each ICsvFileParsingReport includes a wide range of parsing information:

  • a list of the encountered errors, in the form of ICsvAnomaly instances,
  • the number of successfully parsed lines and of lines loaded into the datastore,
  • aggregated execution times for various steps of the parsing process.

Only the first errors are kept, to avoid creating an enormous report that would consume a lot of memory (for instance, when parsing a large malformed file). The number of stored errors can be set with maxParsingAnomalies(int) in the CsvSourceConfiguration; it defaults to 1000.
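The capping behavior can be pictured with a small, single-threaded Java model of a file report that stores at most a fixed number of anomalies. The class is hypothetical and stores anomalies as strings rather than ICsvAnomaly instances; keeping a total count beyond the cap is an assumption of this sketch, not documented behavior.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical, single-threaded model of a per-file report that keeps only the first
// maxAnomalies errors (compare maxParsingAnomalies, 1000 by default). Not Atoti's ICsvFileParsingReport.
public final class CappedReportSketch {
    private final int maxAnomalies;
    private final List<String> anomalies = new ArrayList<>();
    private long anomalyCount; // assumption of this sketch: total count kept beyond the cap
    private long parsedLines;

    public CappedReportSketch(int maxAnomalies) {
        this.maxAnomalies = maxAnomalies;
    }

    public void lineParsed() {
        parsedLines++;
    }

    public void anomaly(long lineNumber, String message) {
        anomalyCount++;
        if (anomalies.size() < maxAnomalies) {
            anomalies.add("line " + lineNumber + ": " + message); // stored only up to the cap
        }
    }

    public List<String> anomalies() {
        return Collections.unmodifiableList(anomalies);
    }

    public long anomalyCount() {
        return anomalyCount;
    }

    public long parsedLines() {
        return parsedLines;
    }
}
```

Memory use is bounded by the cap no matter how malformed the file is, while cheap counters still show how bad it was.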

CsvSource MBean

The CSV Source can first be monitored using the associated JMX MBean, which can be created as follows:

@Bean
@DependsOn(value = "csvSource")
public JMXEnabler JMXCsvSourceEnabler() {
    // Exposes the CSV Source monitoring operations and attributes through JMX
    return new JMXEnabler(new CsvMonitoring(csvSource()));
}

where csvSource() is itself a bean definition.

The following is a list of available operations and attributes:

  • getTopics: Retrieves the list of topics defined in the CSV Source, coupled with their current state (paused or not; see pauseListen(topic) and resumeListen(topic) below).
  • getCommittedState(topic): Returns the committed state of a given topic: all the files submitted for processing and their last modification time.
  • pauseListen(topic): Allows you to pause the listening of events on a given topic, meaning that the WatcherService of the CSV Source doesn't process events that concern the given topic, until the resume method is called. To be able to call this pause method, the topic must extend the ACsvTopic class.
  • resumeListen(topic): Resumes the listening of events on a given topic. Additionally, this method automatically triggers the differentialReload() method on the given topic.
  • differentialReload(topic): Triggers a differential reload of the topic. It computes the current state of the topic, compares it to the committed state, and generates an event that only loads the difference between the two states. This means that after calling resumeListen() on a topic, the CSV Source processes all the changes that happened while the topic was paused.

    Warning: This method relies on the Last Modified Time property of the files, which in some file systems can take up to a second to be updated.

  • fullReload(topic): Performs a full reload of the topic: all files linked to this topic are reloaded.
  • simulateDifferentialReload(topic): Simulates a differential reload of the given topic and returns the changes that the differential reload would have triggered.
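The differential reload described above boils down to comparing two snapshots of file modification times. The following hypothetical Java sketch computes the current state of a directory and diffs it against a committed state; it is not the CsvMonitoring implementation, reporting deleted files is an assumption of the sketch, and, like the real mechanism, it depends on the precision of the file system's last-modified timestamps.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.attribute.FileTime;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;
import java.util.stream.Stream;

// Hypothetical sketch of a differential reload: the committed state maps each file to the
// last modification time that was loaded; the current state is read from the directory.
public final class DifferentialReloadSketch {

    record Diff(Set<Path> added, Set<Path> modified, Set<Path> deleted) {}

    // Current state of a directory topic: each regular file and its last modification time.
    static Map<Path, FileTime> currentState(Path dir) throws IOException {
        Map<Path, FileTime> state = new HashMap<>();
        try (Stream<Path> entries = Files.list(dir)) {
            for (Path file : (Iterable<Path>) entries::iterator) {
                if (Files.isRegularFile(file)) {
                    state.put(file, Files.getLastModifiedTime(file));
                }
            }
        }
        return state;
    }

    // Only the difference between the two states needs to be processed.
    static Diff diff(Map<Path, FileTime> committed, Map<Path, FileTime> current) {
        Set<Path> added = new TreeSet<>();
        Set<Path> modified = new TreeSet<>();
        Set<Path> deleted = new TreeSet<>(); // reporting deletions is an assumption of this sketch
        current.forEach((file, time) -> {
            FileTime before = committed.get(file);
            if (before == null) {
                added.add(file);
            } else if (!before.equals(time)) {
                modified.add(file); // misses changes made within the timestamp precision
            }
        });
        for (Path file : committed.keySet()) {
            if (!current.containsKey(file)) {
                deleted.add(file);
            }
        }
        return new Diff(added, modified, deleted);
    }
}
```

A simulateDifferentialReload-style check would stop at diff() and report the result instead of loading anything.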

Customized Monitoring

Before reading this section, please refer to the monitoring documentation.

The CSV Source dispatches several ICsvHealthEvent types to ease monitoring:

  • CsvFileParsingProgress is dispatched to inform the user about the progress of the current parsing. It contains the full name of the file and the parsing information at the exact moment of the dispatch. Associated tags: csv, parsing, source.
  • CsvFileParsingDone is dispatched when the parsing of a given file is complete. The ChannelTask is closed, meaning that no new ParsingTask can be created. It contains the full name of the file and the parsing information at the exact moment of the dispatch. Associated tags: csv, parsing, source.
  • CsvFileParsingFailure is dispatched when the parsing of a CSV file fails entirely. It contains the full name of the file, the error message, and the parsing information at the exact moment of the failure. Associated tags: csv, parsing, source.
  • CsvLineParsingFailure is dispatched when a single line of a CSV file fails to parse. It contains the full name of the file, the content and number of the failing line, and the error message. Associated tags: csv, parsing, source.