CSV Source
You should be familiar with the source ETL concepts before reading the CSV-specific documentation.
Check out the example code snippets while reading this documentation.
An ICsvSource is a CSV-specific implementation of the ISource interface. ISource is the generic framework used in Atoti for fetching data from external data sources and contributing it to an Atoti IDatastore. The purpose of the ICsvSource is to load data from CSV files.
ETL pipeline
Extract
CSV Topics
ICsvTopics are used to load data from CSV files into a datastore.
For CSV files that come from the local file system, we provide three implementations:
- a single CSV file topic, where a single file is watched for updates
- a directory topic, where a directory is watched for new or updated files that match a given pattern
- a multiple file topic, where a list of files is watched for updates.
These topics can easily be configured through a FileSystemCsvTopicFactory, which creates topics together with their parser configuration. When creating a FileSystemCsvTopicFactory, you can decide whether to follow symbolic links when registering a folder to watch.
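As a hedged illustration, creating topics through the factory could look like the sketch below. The constructor flag and the createTopic/createDirectoryTopic method names are assumptions based on the descriptions above, and parserConfig is a CsvParserConfiguration as described later in this article.
// Hedged sketch; imports omitted because package names vary across Atoti
// Server versions, and the factory method names are assumptions.
FileSystemCsvTopicFactory topicFactory =
        new FileSystemCsvTopicFactory(true /* follow symbolic links */);

// Single CSV file topic: one file watched for updates.
ICsvTopic tradeTopic =
        topicFactory.createTopic("TradesTopic", "data/trades.csv", parserConfig);

// Directory topic: a directory watched for new or updated files
// matching a pattern.
ICsvTopic riskTopic =
        topicFactory.createDirectoryTopic("RiskTopic", "data/risk", "glob:*.csv", parserConfig);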
Note that other CSV topic implementations exist for files that do not come from the local file system (Cloud source), which we do not cover in this article. Even though the way to access the file contents differs, all other aspects of data loading stay the same.
Each ICsvTopic holds its own CsvParserConfiguration, which states the delimiter used, the columns of the file and their positions, and other properties.
CSV parser
Atoti provides a high-performance multithreaded parser to extract data from CSV files into memory, in order to perform the extraction step of the ETL pipeline.
There are several levels of configuration for the ICsvSource. Each defined topic, whether it is a SingleFileCsvTopic, a DirectoryCsvTopic, or one created using a FileSystemCsvTopicFactory, comes with its own CsvParserConfiguration.
Parser configuration
A CsvParserConfiguration can be created using CsvParserConfiguration.builder().
First, this builder requires the columns of the CSV file to be defined. This information must be provided when creating the parser, and can be expressed in several ways (a configuration sketch follows the properties table below):
- by specifying the number of columns: .withColumnCount(int)
- by specifying the names of the columns: .withColumnNames(List<String>). All column names in the file must be specified; the ones that do not match a column in the datastore are skipped.
- by specifying a mapping from column index to column name: .withColumnNamesMapping(Map<Integer, String>). Only the columns of interest need to be specified.
CsvParserConfiguration contains other properties, as described in the table below:
Property | Description | Default value |
---|---|---|
acceptOverflowingLines | When set to false, a record that contains more elements than the declared columns is rejected; otherwise the additional elements are ignored. | false |
acceptIncompleteLines | When set to false, a record that contains fewer elements than the declared columns is rejected; when set to true, null values are inserted into the missing columns. | true |
separator | The separator used to separate fields in the CSV file. | , (comma) |
processQuotes | Sets the behavior of the parser regarding quotes. When set to true, the parser follows the official CSV specification regarding quoted fields. | true |
numberSkippedLines | The number of lines to skip at the beginning of each file (header). | 0 |
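As a hedged illustration, a parser configuration could be built as follows. Only builder() and the withXxx column methods are documented above; the remaining setter names and build() are assumptions patterned on the property names in the table.
// Hedged sketch; imports omitted. Only builder() and withColumnNames(...)
// are documented above; the other setters mirror the table's property
// names and are assumptions.
CsvParserConfiguration parserConfig = CsvParserConfiguration.builder()
        .withColumnNames(Arrays.asList("ID", "Name", "Price", "Comment"))
        .withSeparator(',')            // assumed setter for the separator property
        .withNumberSkippedLines(1)     // assumed setter: skip the one-line header
        .build();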
Parser internals
Principle
The parser relies on NIO (non-blocking I/O) channels, memory buffers, and multithreading to speed up character decoding and object parsing, thus enabling fast data loading and high-speed data analysis.
The implementation aims at high performance and detailed configuration. It allows fetching data from, and listening for changes in, CSV files in a multithreaded way and in minimal time, while keeping the amount of transient memory to a minimum.
Moreover, CSV work is organized internally into workloads that the parser threads execute. Nevertheless, it is also possible for custom code to bypass the CSV Source's standard pipelines and take control of parsing operations, deciding which files are loaded into which channel, and when.
Multithreaded CSV parsing isn't as straightforward as splitting the file and assigning each chunk to a single thread.
The following example illustrates the difficulty:
Given the following CSV:
ID, Name, Price, Comment
0,"nameA",1,"This is a comment, is it not"
1,"nameB",2,"Another comment"
If this file is split in half (character-wise, since at that point of the algorithm the parser doesn't know about context, line delimiters, and so on), the first thread gets the bytes that correspond to
0,"nameA",1,"This is a comment
while the second thread gets
, is it not"
1,"nameB",2,"Another comment"
Now the second thread sees a delimiter, the character ",", but as it doesn't know the context, it cannot tell whether it is within a comment or not. It could count the " characters and deduce, after parsing the entire block, that it was within a comment, but that technique has its own limits.
ActiveViam's multithreaded parser can read over a million entries per second.
Concurrency and cache locality
The CSV Source uses a single unified thread pool that runs a single type of task. Each task performs the following sub-tasks:
- Fetch bytes from a byte channel that represents the file
- Decode the bytes into characters
- Split these characters into lines
- Parse the lines into records
- Publish the records to a source listener
The parser optimizes memory access by having all transient data remain in the CPU's cache for the duration of the computation. Each task works separately without the need for synchronization with the other tasks in the thread pool.
In a typical usage of the CSV source, the first three sub-tasks of each task (fetch, decode, split) are much less expensive than the following two (parse, publish). Thus, as soon as a task is done with its decoding sub-task, it spawns the next task, and can give that task context regarding delimiters.
A CSV parsing process can therefore be seen as a chain of such tasks, each spawning its successor as soon as its decoding sub-task completes and handing it the delimiter context it needs.
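The following is a purely illustrative sketch of that chained-task pattern, not Atoti's internal classes: each task performs the cheap fetch/decode/split work, spawns its successor with the context it discovered, and only then does the expensive parse/publish work.
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Purely illustrative sketch of the chained-task pattern described above;
// none of these classes belong to Atoti.
public final class ChainedParsingSketch {

    /** Context handed from one task to the next, e.g. "are we inside quotes?". */
    record SplitContext(boolean insideQuotes) {}

    static void submitChunkTask(ExecutorService pool, SplitContext ctx, int chunk, int totalChunks) {
        if (chunk >= totalChunks) {
            return;
        }
        pool.submit(() -> {
            // Cheap sub-tasks: fetch bytes, decode characters, split lines.
            SplitContext nextCtx = fetchDecodeSplit(ctx, chunk);
            // Spawn the successor immediately, so its I/O overlaps with
            // this task's expensive parsing work.
            submitChunkTask(pool, nextCtx, chunk + 1, totalChunks);
            // Expensive sub-tasks: parse lines into records, publish them.
            parseAndPublish(chunk);
        });
    }

    static SplitContext fetchDecodeSplit(SplitContext ctx, int chunk) {
        return new SplitContext(false); // stand-in for the real work
    }

    static void parseAndPublish(int chunk) {
        // stand-in for the real work
    }

    public static void main(String[] args) {
        ExecutorService pool = Executors.newFixedThreadPool(4);
        submitChunkTask(pool, new SplitContext(false), 0, 8);
        pool.shutdown();
    }
}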
Channels and messages
IMessageChannels are each linked to a single topic and are used to gather the data, as it is being parsed, into messages. They are also responsible for sending those messages to the datastore.
Most messages are IStoreMessages, which gather the data as tuples to add to the datastore.
When asked to fetch data for specific IMessageChannels, given as arguments, the ICsvSource polls the corresponding ICsvTopics and generates the corresponding workload.
Note that several channels can correspond to the same topic. For example, a file may contain some columns that correspond to one store and other columns that correspond to a different store; in this case there is one channel per store. The two channels depend on the same file and therefore on the same topic. They are grouped into a composite channel, so that the topic polling and the parsing of the file into lines are done only once; the lines are then dispatched to each individual channel for specific parsing into records.
Translators
A default ITranslator is automatically created within the MessageChannelFactory, and converts parsed lines into tuples that can be inserted into the datastore.
Data can be enriched in the ITranslator using IColumnCalculators.
IColumnCalculators perform single-line data transformations: all the data needed for the transformation is contained on one line of the CSV file.
For example:
- convert existing data (transform a numerical month into a text string)
- generate new data
For each tuple written into an IMessage, an IColumnCalculator computes the value to write in the corresponding added column.
An ITranslator with IColumnCalculators can be created with CsvMessageChannelFactory#createTranslator(); a usage sketch follows the table below.
An IColumnCalculator is related to a single topic.
ColumnCalculator
ColumnCalculator | Description |
---|---|
CsvColumnParser | Converts a column to another type. Formatters can be added, provided they are known to Atoti Server. For instance, a "date" column at position 1 in the source can be parsed like so: new CsvColumnParser("date", "localDate[yyyy-MM-dd]", 1) |
ConstantCalculator | Adds the same predetermined constant value in the given column position, for every tuple. |
LineIndexCalculator | Adds the index of the line being parsed in the current CSV file. This is primarily used to add an ID to each element. |
FileNameCalculator | Adds the name of the current CSV file as a value for the given column. This is primarily used when the imported files each represent a specific entity. |
FilePathCalculator | Adds the absolute path to the current CSV file as a value for the given column. |
FileLastModifiedCalculator | Adds the last modified date of the current CSV file as a value for the given column. |
EmptyCalculator | Adds an empty (null) cell to the tuple. |
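As a hedged sketch (only the CsvColumnParser constructor above is documented here; the FileNameCalculator argument and the setCalculatedColumns wiring call are assumptions), enriching a channel with calculated columns could look like:
// Hedged sketch; imports omitted. The FileNameCalculator argument and the
// setCalculatedColumns call are assumptions.
List<IColumnCalculator> calculators = Arrays.asList(
        // Parse the raw text at position 1 into a date for the "date" column.
        new CsvColumnParser("date", "localDate[yyyy-MM-dd]", 1),
        // Hypothetical: store the source file name in a "sourceFile" column.
        new FileNameCalculator("sourceFile"));
channelFactory.setCalculatedColumns("TradesTopic", "TradeStore", calculators);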
ICsvSourceConfiguration
The CSV Source configuration is a higher-level configuration that applies to the source as a whole.
Property | Description | Default value |
---|---|---|
parserThreads | Sets the number of threads that run the parsing in parallel. | Half of the available processors, but no more than 8 and no fewer than 2 |
bufferSize | Sets the size of a byte buffer, in kilobytes. | 64 kB |
synchronousMode | Sets the processing mode of the CSV source. In synchronous mode, the processing of a new file can only start once all the activity related to the previous file has completed, such as the fetching, parsing, and publishing of the parsed records. The publishing phase might also include processing implemented on the CSV source listener side; however, if the listener performs asynchronous processing once it receives the records, the CSV Source won't wait for it before switching to the following file. | false |
fileComparator | Sets the comparator used to sort the files of a topic before parsing them. This is useful if the files of a topic must be parsed in a specific order (for example, one file per day, parsed in historical order). Note that local file system directory topics require a comparator of a specific type. | None |
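As a hedged sketch (the builder entry point and setter names are assumptions patterned on the property names above), the source configuration could look like:
// Hedged sketch; the builder entry point and setter names are assumptions
// derived from the property table above.
CsvSourceConfiguration sourceConfig = CsvSourceConfiguration.builder()
        .parserThreads(4)        // parse with 4 threads
        .bufferSize(128)         // use 128 kB byte buffers
        .synchronousMode(false)  // keep the default asynchronous pipeline
        .build();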
Pipeline execution
Fetch
ICsvSource#fetch is the method that triggers the data loading from the source to the datastore.
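As a hedged sketch (the createChannel arguments and the fetch signature are assumptions), a one-shot load could look like:
// Hedged sketch; the createChannel arguments and the fetch signature
// are assumptions.
IMessageChannel channel = channelFactory.createChannel("TradesTopic", "TradeStore");
csvSource.fetch(Collections.singleton(channel)); // load the topic's files into the datastore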
Listen
ICsvSource#listen is the method that triggers listening for new data.
Listening allows any new data in the files to be fetched as it is added. To do so, two different approaches are available in Atoti:
- by polling the files at a given interval: if the last modification time of a file has changed, its content is reloaded and compared to the data already loaded
- via a WatcherService: the files are watched for changes and the data is loaded as soon as a change is detected (WatcherService relies on a java.nio.file.WatchService)
By default, the WatcherService approach is used, as it provides better latency.
Indeed, when using the polling mechanism, the last modification time of the file is checked to decide whether to reload it.
On some file systems, the precision of the last modification time can be as coarse as one second:
- This can delay reloading the file, as the minimal refresh frequency is one second.
- This can lead to huge delays when the change frequency is irregular: if two updates happen in the same second, the first update could trigger the reload while the second one would not, ending up being processed with the next update, which could be much later.
Reloading a file is a necessary step in order to check for changes in its content.
We recommend splitting topics into several files (using a DirectoryCsvTopic, for instance). This avoids large files that take a long time to reload: only the files that have been updated are reloaded, rather than the whole topic.
Monitoring
Reporting
Optionally, the CSV Source can generate an ICsvTopicParsingReport for each fetched topic.
This report includes an ICsvFileParsingReport for each parsed file of the topic, including all the parsing errors encountered.
These reports are returned by the fetch() method of the source if parsingReportEnabled(boolean) is set to true in the CsvSourceConfiguration.
Each ICsvFileParsingReport includes a wide range of parsing information:
- a list of all encountered errors, in the form of ICsvAnomaly
- the number of successfully parsed lines and of lines loaded into the datastore
- aggregated execution times for the various steps of the parsing process.
Only the first errors are taken into account, in order to avoid creating an enormous report that would result in high memory consumption (for instance, while parsing a large malformed file).
The number of stored errors can be set by specifying maxParsingAnomalies(int) in the CsvSourceConfiguration, which defaults to 1000.
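As a hedged sketch (parsingReportEnabled and maxParsingAnomalies are named above; the builder entry point is an assumption), enabling the reports could look like:
// Hedged sketch: the builder entry point is an assumption; the two
// setter names are taken from the text above.
CsvSourceConfiguration reportingConfig = CsvSourceConfiguration.builder()
        .parsingReportEnabled(true) // make fetch() return parsing reports
        .maxParsingAnomalies(500)   // keep at most 500 errors per report
        .build();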
CsvSource MBean
The CSV Source can be monitored using the associated JMX Bean, which can be created using:
@Bean
@DependsOn(value = "csvSource")
public JMXEnabler JMXCsvSourceEnabler() {
return new JMXEnabler(new CsvMonitoring(csvSource()));
}
with csvSource being a Bean itself.
The following is a list of available operations and attributes:
- getTopics: Retrieves the list of topics defined in the CSV Source, together with their current state (paused or not, see pauseListen(topic) and resumeListen(topic) below).
- getCommittedState(topic): Returns the committed state of a given topic: all the files submitted for processing and their last modification time.
- pauseListen(topic): Pauses the listening of events on a given topic, meaning that the WatcherService of the CSV Source doesn't process events that concern the given topic until the resume method is called. To be able to call this pause method, the topic must extend the ACsvTopic class.
- resumeListen(topic): Resumes the listening of events on a given topic. Additionally, this method automatically triggers the differentialReload() method on the given topic.
- differentialReload(topic): Triggers a differential reload of the topic: it computes the current state of the topic, compares it to the committed state, and generates an event that loads only the difference between the two states. This means that after calling resumeListen() on a topic, the CSV Source processes all the changes that happened during the pause period. Warning: this method relies on the Last Modified Time property of the files, which on some file systems can take up to a second to be updated.
- fullReload(topic): Performs a full reload of the topic: all files linked to this topic are reloaded.
- simulateDifferentialReload(topic): Simulates a differential reload of the given topic and returns the changes that the differential reload would have triggered.
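These operations can also be invoked programmatically through the standard JMX API. In the sketch below, the ObjectName is hypothetical and depends on how JMXEnabler registers the bean in your application.
import java.lang.management.ManagementFactory;
import javax.management.MBeanServer;
import javax.management.ObjectName;

// Invoke pauseListen("TradesTopic") on the CSV Source MBean.
// The ObjectName below is hypothetical; check your JMX console for the real one.
MBeanServer server = ManagementFactory.getPlatformMBeanServer();
ObjectName csvSourceBean = new ObjectName("com.activeviam:type=CsvSource");
server.invoke(csvSourceBean, "pauseListen",
        new Object[] {"TradesTopic"},
        new String[] {String.class.getName()});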
Customized Monitoring
Before reading this section, please refer to the monitoring documentation.
Several ICsvHealthEvent implementations have been added to the CSV Source to ease monitoring:
- CsvFileParsingProgress is dispatched to inform the user about the progress of the current parsing. It contains the full name of the file and the parsing information at the exact moment of the dispatch. Associated tags: csv, parsing, source
- CsvFileParsingDone is dispatched when the parsing of a given file is completed. The ChannelTask is closed, meaning that no new ParsingTask can be created. It contains the full name of the file and the parsing information at the exact moment of the dispatch. Associated tags: csv, parsing, source
- CsvFileParsingFailure is dispatched in case of a full failure of a CSV file parsing. It contains the full name of the file, the error message, and the parsing information at the exact moment of the failure. Associated tags: csv, parsing, source
- CsvLineParsingFailure is dispatched when the parsing of a single line in a CSV file fails. It contains the full name of the file, the failing line content and line number, and the error message. Associated tags: csv, parsing, source