Local CSV Source

The LocalCsvSource loads CSV files from the local filesystem. It can be configured as either a Fetching or a Listening source.

Maven Dependency

First, make sure the correct dependency is imported. Your pom should contain:

    <!-- Local CSV Source -->
    <dependency>
        <groupId>com.activeviam.io</groupId>
        <artifactId>data-connectors-csv</artifactId>
        <version>${dataconnectors.version}</version>
    </dependency>

Channel Configuration

A CSV Channel must use a DlcMessageHandler so that the DLC is notified when an error is thrown during tuple publishing. By default, a CSVMessageChannel uses a LogMessageHandler, which simply logs the error and continues, so the error is not propagated up to the DLC.

The simplest way to do this is to create the channels with a DlcCSVMessageChannelFactory, which adds the DlcMessageHandler for you. Alternatively, a CSVMessageChannelFactory can be used to create the CSVMessageChannels, but you must then set the message handler to a DlcMessageHandler or a class extending DlcMessageHandler. For information on extending the DlcMessageHandler, see the Creating Custom CSV Message Handlers section.

It is recommended to use a DlcCSVMessageChannelFactory, because the DlcStoreMessageChannel it creates only accepts message handlers that are, or extend, DlcMessageHandler.

See CSV Source Channel Configuration on how to register a Channel.

// Option 1 (recommended): create the DLC message channel factory.
CSVMessageChannelFactory dlcChannelFactory = new DlcCSVMessageChannelFactory(csvSource, datastore);
// Channels created by the DlcCSVMessageChannelFactory already have a DlcMessageHandler, so there is no need to set one.
IStoreMessageChannel dlcChannel = dlcChannelFactory.createChannel(...);

// Option 2: create the plain CSV message channel factory.
CSVMessageChannelFactory csvChannelFactory = new CSVMessageChannelFactory(csvSource, datastore);
// Channels created by the CSVMessageChannelFactory do not have a DlcMessageHandler, so it must be set explicitly.
IMessageChannel csvChannel = csvChannelFactory.createChannel(...);
csvChannel.withMessageHandler(new DlcMessageHandler());

Source Configuration

CSVs can be loaded by any class implementing ACsvScopedFetchSource. Such a source supports fetching data for a specific scope (for example, a CobDate sub-folder), which is specified at runtime for each topic.

To load from the local filesystem, create a LocalCsvScopedFetchSource or LocalCsvScopedListenSource object. This object will contain one or more CsvFilesScopedFetchTopic or CsvFilesScopedListenTopic topics, respectively.

Below is an example:

/**
 * Create a Local CSV Scoped Fetch Source
 * 
 * @return A new LocalCsvScopedFetchSource
 */
public LocalCsvScopedFetchSource<Path> localCsvScopedFetchSource() {
	/*
	 Configure Local CSV Source
	 */
	
	var csvParserConfiguration = new CSVSourceConfiguration.CSVSourceConfigurationBuilder<Path>()
			.parserThreads(10)
			.bufferSize(1024)
			.build();

	final var sourceName = "Local Filesystem CSV Source";
	
	var dlcCSVSourceConfiguration = new DlcCSVSourceConfiguration<Path>(
            csvParserConfiguration, 
            sourceName);
    
	var csvSource = new LocalCsvScopedFetchSource<Path>(dlcCSVSourceConfiguration);
	
	return csvSource;
}
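
As a rough illustration of what fetching for a scope means, the sketch below resolves a scope value to a sub-folder of a root directory and lists the CSV files found there, using only the JDK. It is not the DLC implementation: the class ScopedFolderSketch, its methods, and the folder layout are hypothetical names chosen for this example. In the DLC itself, this mapping is configured on each topic through its scope-to-file-scan-parameters converter.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import java.util.stream.Stream;

// Hypothetical illustration only: none of these names belong to the DLC API.
class ScopedFolderSketch {

    /** Resolves the sub-folder for a scope, e.g. root/2024-01-31 for {CobDate=2024-01-31}. */
    static Path resolveScopedDirectory(Path root, Map<String, String> scope, String scopeKey) {
        String value = scope.get(scopeKey);
        if (value == null) {
            throw new IllegalArgumentException("Scope is missing key: " + scopeKey);
        }
        return root.resolve(value);
    }

    /** Lists the files ending in .csv that sit directly under the given folder. */
    static List<Path> listCsvFiles(Path scopedDirectory) {
        try (Stream<Path> files = Files.list(scopedDirectory)) {
            return files
                    .filter(p -> p.getFileName().toString().endsWith(".csv"))
                    .sorted()
                    .collect(Collectors.toList());
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    /** Builds a throwaway directory tree with one CSV and one non-CSV file. */
    static Path createDemoTree() {
        try {
            Path root = Files.createTempDirectory("csv-root");
            Path cobFolder = Files.createDirectory(root.resolve("2024-01-31"));
            Files.writeString(cobFolder.resolve("trades.csv"), "a,b\n1,2\n");
            Files.writeString(cobFolder.resolve("notes.txt"), "not a CSV");
            return root;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        Path root = createDemoTree();
        // The scope is supplied at runtime; here it selects the 2024-01-31 sub-folder.
        Path scoped = resolveScopedDirectory(root, Map.of("CobDate", "2024-01-31"), "CobDate");
        System.out.println(listCsvFiles(scoped));
    }
}
```

Only trades.csv is listed for the CobDate scope; notes.txt is filtered out because it does not end in .csv.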

Topic Configuration

The Local CSV Source consists of one or more Topics of type CsvFilesScopedFetchTopic (for Fetch Sources) or CsvFilesScopedListenTopic (for Listen Sources). Fetch Topics cannot be added to Listen Sources, and Listen Topics cannot be added to Fetch Sources. The Topic name must be globally unique across all Topics. The Local CSV Topics take the following parameters:

CsvFilesScopedFetchTopic parameters:

| Topic Parameter | Description |
|---|---|
| name | Name of this topic. Must be globally unique among all Topics |
| rootDirectoryPath | Base directory to apply pathMatcher to |
| pathMatcher | PathMatcher to match file paths for this topic |
| fetchScopeToFileScanParametersConverter | Function that converts Scope keys to an IFileScanParameters used to scan for files to load. Can use the provided CsvFilesScopedFetchTopic.FetchScopeToFileScanParametersConverter |
| csvParserConfiguration | ICSVParserConfiguration for CSV file column names, number of lines to skip, etc. Can use LocalCsvScopedFetchSource.createParserConfiguration(...) |

CsvFilesScopedListenTopic parameters:

| Topic Parameter | Description |
|---|---|
| topicName | Name of this topic. Must be globally unique among all Topics |
| rootDirectoryPath | Base directory to apply pathMatcher to |
| pathMatcher | PathMatcher to match file paths for this topic |
| scopeToFileScanParametersConverter | Function that converts Scope keys to an IFileScanParameters used to scan for files to load. Can use the provided CsvFilesScopedListenTopic.ScopeToFileScanParametersConverter |
| csvParserConfiguration | ICSVParserConfiguration for CSV file column names, number of lines to skip, etc. Can use LocalCsvScopedFetchSource.createParserConfiguration(...) |
| extraProperties | Can contain the optional property “pollingDelay”, the pause in milliseconds between checks for CSV file changes. Default is 500 |
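
The extraProperties parameter is a java.util.Properties object, so a pollingDelay override might be built as in the sketch below. The property name and the default of 500 come from the description above. Everything else is an assumption: in particular, storing the value as a string and the readPollingDelay helper are only one plausible way the value could be set and read, not the library's confirmed behaviour.

```java
import java.util.Properties;

// Hypothetical helper class, not part of the DLC API.
class PollingDelaySketch {

    // Property name and default value as documented for extraProperties.
    static final String POLLING_DELAY = "pollingDelay";
    static final long DEFAULT_POLLING_DELAY_MS = 500L;

    /** Builds a Properties object carrying a polling delay in milliseconds. */
    static Properties withPollingDelay(long millis) {
        Properties properties = new Properties();
        // Assumption: the value is stored as a string.
        properties.setProperty(POLLING_DELAY, Long.toString(millis));
        return properties;
    }

    /** Reads the polling delay, falling back to the documented default when absent. */
    static long readPollingDelay(Properties properties) {
        String raw = properties.getProperty(POLLING_DELAY, Long.toString(DEFAULT_POLLING_DELAY_MS));
        return Long.parseLong(raw);
    }

    public static void main(String[] args) {
        Properties extraProperties = withPollingDelay(2000L);
        System.out.println(readPollingDelay(extraProperties));  // 2000
        System.out.println(readPollingDelay(new Properties())); // 500
    }
}
```

The resulting Properties object would then be passed as the extraProperties argument when constructing a CsvFilesScopedListenTopic.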

Below is an example of creating and adding a Topic to a Local CSV Scoped Fetch Source:

/**
 * This method will create and add the Cash Flow topic to the CSV Source.
 * 
 * @param localCsvScopedFetchSource The CSV Source to add the Cash Flow Topic to.
 */
public void addCashFlowTopicToSource(LocalCsvScopedFetchSource<Path> localCsvScopedFetchSource){

	/*
	 Topic Properties:
	 */
	
	final var topicName = "Cash Flow Topic";
	
	// The scope key "AsOfDate" will be used to match a folder within the rootDirectoryPath directory.
	var fetchScopeToFileScanParametersConverter = new CsvFilesScopedFetchTopic.FetchScopeToFileScanParametersConverter("AsOfDate");
	
	
	/*
	CSV File Properties:
	*/
	
	final var rootDirectoryPath = "/path/to/data/";
	final var cashFlowFilePattern = "glob:**/cash_flow_input_data.csv";
	final var cashFlowFileColumns = List.of("Header 1", "Header 2", "Header 3");
	final var numLinesToSkip = 1;
	
	final var cashFlowFileMatcher = FileSystems.getDefault().getPathMatcher(cashFlowFilePattern);
	
	
	/*
	Create Topic
	 */
	var cashFlowTopic = new CsvFilesScopedFetchTopic(
			topicName,
			rootDirectoryPath,
			cashFlowFileMatcher,
			fetchScopeToFileScanParametersConverter,
			localCsvScopedFetchSource.createParserConfiguration(cashFlowFileColumns, numLinesToSkip)
	);
	
	/*
	Add Topic to CSV Source:
	 */
	localCsvScopedFetchSource.addTopic(cashFlowTopic);
}
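
Before wiring a glob into a topic, it can help to check which paths it actually matches. The sketch below does this with the standard java.nio.file.PathMatcher and the same pattern as the example above. The class GlobMatcherSketch and the sample paths are hypothetical. Note that the leading `**/` requires at least one directory separator, so a bare file name with no parent directory does not match.

```java
import java.nio.file.FileSystems;
import java.nio.file.Path;
import java.nio.file.PathMatcher;

// Hypothetical helper for experimenting with glob patterns; not part of the DLC API.
class GlobMatcherSketch {

    /** Returns true if the path matches the given "glob:..." pattern on the default filesystem. */
    static boolean matches(String pattern, Path path) {
        PathMatcher matcher = FileSystems.getDefault().getPathMatcher(pattern);
        return matcher.matches(path);
    }

    public static void main(String[] args) {
        String pattern = "glob:**/cash_flow_input_data.csv";

        // File nested under a dated sub-folder: matches.
        System.out.println(matches(pattern, Path.of("data", "2024-01-31", "cash_flow_input_data.csv"))); // true

        // Different file name in the same folder: does not match.
        System.out.println(matches(pattern, Path.of("data", "2024-01-31", "other.csv"))); // false

        // Bare file name without any directory: "**/" needs a separator, so no match.
        System.out.println(matches(pattern, Path.of("cash_flow_input_data.csv"))); // false
    }
}
```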

Java Topic properties:

public CsvFilesScopedFetchTopic(
    final String name,
    final String rootDirectoryPath,
    final PathMatcher pathMatcher,
    final IFilesScopedTopic.IScopeToFileScanParametersConverter fetchScopeToFileScanParametersConverter,
    final ICSVParserConfiguration csvParserConfiguration)

public CsvFilesScopedListenTopic(
    final String topicName,
    final String rootDirectoryPath,
    final PathMatcher pathMatcher,
    final IFilesScopedTopic.IScopeToFileScanParametersConverter scopeToFileScanParametersConverter,
    final ICSVParserConfiguration configuration,
    final Properties extraProperties)