Local CSV

Local CSV Source

The LocalCsvSource allows for loading of CSV files from the local filesystem. The Local CSV Source can be configured as a Fetching or Listening source.

Maven Dependency

We first must ensure we are importing the correct dependency. Your pom should have:

    <!-- Local CSV Source -->
    <dependency>
        <groupId>com.activeviam.io</groupId>a
        <artifactId>data-connectors-csv</artifactId>
        <version>${dataconnectors.version}</version>
    </dependency>

Channel Configuration

A CSV Channel must use a DlcMessageHandler so the DLC can be notified in the event an error occurs during tuple publishing. A CSVMessageChannel uses a LogMessageHandler by default. This simply logs the error and continues. Because of this, the error isn’t propagated upwards to the DLC.

To do this, use a DlcCSVMessageChannelFactory to create the channels without having to worry about adding the DlcMessageHandler. Alternatively, you can use a CSVMessageChannelFactory to create the CSVMessageChannels, but you must set the message handler to use a DlcMessageHandler or a class extending DlcMessageHandler.

For more details on extending the DlcMessageHandler see Creating Custom CSV Message Handlers.

We recommend using a DlcCSVMessageChannelFactory, as the DlcStoreMessageChannel channel created will only accept message handlers that are or extend DlcMessageHandler.

// Create Dlc message channel factory
CSVMessageChannelFactory csvMessageChannelFactory = new DlcCSVMessageChannelFactory(csvSource, datastore);
// Channels created using the DlcCSVMessageChannelFactory will have a DlcMessageHandler so there is no need to set the message handler.
IStoreMessageChannel channel = csvMessageChannelFactory.createChannel(...);
// Create the CSV message channel factory
CSVMessageChannelFactory csvMessageChannelFactory = new CSVMessageChannelFactory(csvSource, datastore);
// Channels created using the CSVMessageChannelFactory will not have a DlcMessageHandler, so we will need to add it
IMessageChannel channel = csvMessageChannelFactory.createChannel(...);
channel.withMessageHandler(new DlcMessageHandler());

Source Configuration

CSVs can be loaded by any implementing class of ACsvScopedFetchSource that supports fetching data for a specific scope (for example, CobDate sub-folder) that is specified at runtime for each topic.

To load from local filesystem, a LocalCsvScopedFetchSource or LocalCsvScopedListenSource source object needs to be created. This object will contain many CsvFilesScopedFetchTopic or CsvFilesScopedFetchTopic topics.

Below is an example:

@Bean
public LocalCsvScopedFetchSource localCsvScopedFetchSource() {
    final Properties sourceProps = new Properties();
    sourceProps.setProperty(ACsvScopedFetchSource.PROP__SOURCE_NAME, "LocalCSVScopedSource");

    // Additional Properties to optimize the CSV Source.
    final String parserThreads = Runtime.getRuntime().availableProcessors() / 2;
    sourceProps.setProperty(ICSVSourceConfiguration.PARSER_THREAD_PROPERTY, parserThreads);
    final String bufferSize = 1024;
    sourceProps.setProperty(ICSVSourceConfiguration.BUFFER_SIZE_PROPERTY, bufferSize);

    List<String> scopeKeys = new ArrayList<>();
    scopeKeys.add(DataLoadControllerRestService.SCOPE_KEY__COB_DATE);

    final LocalCsvScopedFetchSource<Path> localCsvScopedFetchSource = new LocalCsvScopedFetchSource<>(sourceProps);

    localCsvScopedFetchSource.addTopic(                
        new CsvFilesScopedFetchTopic(CSV_TOPIC__CASHFLOW,
                csvDataRootDirPath,
                CASHFLOW_FILE_PATTERN == null ? null : FileSystems.getDefault().getPathMatcher(CASHFLOW_FILE_PATTERN),
                new CsvFilesScopedFetchTopic.FetchScopeToFileScanParametersConverter(
                        scopeKeys.toArray(new String[scopeKeys.size()])),
                localCsvScopedFetchSource.createParserConfiguration(CASHFLOW_FILE_COLUMNS, linesToSkip)
        )
    );

    return localCsvScopedFetchSource;
}

Topic Configuration

The Local CSV Source consists of one or many Topics of type CsvFilesScopedFetchTopic or CsvFilesScopedListenTopic for Fetch or Listen Sources respectively. Fetch and Listen Topics cannot be mixed with opposite Fetch or Listen sources. The Topic name must be globally unique (to all other Topics). The Local CSV Topic’s consist of the following parameters:

Topic Parameter Description
name Name of this topic. Must be globally unique among all Topics
rootDirectoryPath Base directory to apply pathMatcher to
pathMatcher PathMatcher to match file paths for this topic
fetchScopeToFileScanParametersConverter Function to use Scope keys to an IFileScanParameters to scan files to load. Can use provided CsvFilesScopedFetchTopic.FetchScopeToFileScanParametersConverter
csvParserConfiguration ICSVParserConfiguration for CSV file Column Names, number of lines to skip, etc. Can use LocalCsvScopedFetchSource.createParserConfiguration(...)
Topic Parameter Description
topicName Name of this topic. Must be globally unique among all Topics
rootDirectoryPath Base directory to apply pathMatcher to
pathMatcher PathMatcher to match file paths for this topic
scopeToFileScanParametersConverter Function to use Scope keys to an IFileScanParameters to scan files to load. Can use provided CsvFilesScopedListenTopic.ScopeToFileScanParametersConverter
csvParserConfiguration ICSVParserConfiguration for CSV file Column Names, number of lines to skip, etc. Can use LocalCsvScopedFetchSource.createParserConfiguration(...)
extraProperties Can contain optional property “pollingDelay” describing millisecond pause between checks for CSV file changes. Default is 500

Java Topic properties:

public CsvFilesScopedFetchTopic(
    final String name,
    final String rootDirectoryPath,
    final PathMatcher pathMatcher,
    final IFilesScopedTopic.IScopeToFileScanParametersConverter fetchScopeToFileScanParametersConverter,
    final ICSVParserConfiguration csvParserConfiguration)
public CsvFilesScopedListenTopic(
    final String topicName,
    final String rootDirectoryPath,
    final PathMatcher pathMatcher,
    final IFilesScopedTopic.IScopeToFileScanParametersConverter scopeToFileScanParametersConverter,
    final ICSVParserConfiguration configuration,
    final Properties extraProperties)
search.js