Local CSV Source
The LocalCsvSource loads CSV files from the local filesystem. It can be configured as either a Fetching or a Listening source.
Maven Dependency
First, make sure the correct dependency is imported. Your pom should contain:
<!-- Local CSV Source -->
<dependency>
    <groupId>com.activeviam.io</groupId>
    <artifactId>data-connectors-csv</artifactId>
    <version>${dataconnectors.version}</version>
</dependency>
Channel Configuration
A CSV channel must use a DlcMessageHandler so that the DLC is notified when an error is thrown during tuple publishing. By default, a CSVMessageChannel uses a LogMessageHandler, which simply logs the error and continues, so the error is not propagated up to the DLC.
The simplest way to satisfy this requirement is to create the channels with a DlcCSVMessageChannelFactory, which adds the DlcMessageHandler for you.
Alternatively, a CSVMessageChannelFactory can be used to create the CSVMessageChannels, but you must then set the message handler to a DlcMessageHandler or to a class extending DlcMessageHandler. For information on extending the DlcMessageHandler, see the Creating Custom CSV Message Handlers section.
Using a DlcCSVMessageChannelFactory is recommended, because the DlcStoreMessageChannel it creates only accepts message handlers that are, or extend, DlcMessageHandler.
See CSV Source Channel Configuration for how to register a channel.
// Create Dlc message channel factory
CSVMessageChannelFactory csvMessageChannelFactory = new DlcCSVMessageChannelFactory(csvSource, datastore);
// Channels created using the DlcCSVMessageChannelFactory will have a DlcMessageHandler so there is no need to set the message handler.
IStoreMessageChannel channel = csvMessageChannelFactory.createChannel(...);
// Create the CSV message channel factory
CSVMessageChannelFactory csvMessageChannelFactory = new CSVMessageChannelFactory(csvSource, datastore);
// Channels created using the CSVMessageChannelFactory will not have a DlcMessageHandler, so we will need to add it
IMessageChannel channel = csvMessageChannelFactory.createChannel(...);
channel.withMessageHandler(new DlcMessageHandler());
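The difference between a handler that only logs and one that reports errors back to its caller can be illustrated without any DLC classes. The sketch below is a simplified model and not the DLC API: `PublishErrorHandler`, `LoggingHandler`, `PropagatingHandler`, and `publishAll` are hypothetical names that stand in for the roles described above.

```java
import java.util.ArrayList;
import java.util.List;

/** Simplified model of error handling during tuple publishing; every name here is hypothetical. */
final class HandlerSketch {

    /** Stand-in for a message handler that is told about publishing errors. */
    interface PublishErrorHandler {
        void onError(String fileName, Exception error);
    }

    /** Mimics a log-only handler: the error is written out and then forgotten. */
    static final class LoggingHandler implements PublishErrorHandler {
        @Override
        public void onError(String fileName, Exception error) {
            System.err.println("Error while publishing " + fileName + ": " + error.getMessage());
        }
    }

    /** Mimics a handler that records errors so the caller can inspect them afterwards. */
    static final class PropagatingHandler implements PublishErrorHandler {
        private final List<String> errors = new ArrayList<>();

        @Override
        public void onError(String fileName, Exception error) {
            errors.add(fileName + ": " + error.getMessage());
        }

        List<String> errors() {
            return List.copyOf(errors);
        }
    }

    /** Pretends to publish each file; files whose name contains "bad" fail. */
    static void publishAll(List<String> fileNames, PublishErrorHandler handler) {
        for (String fileName : fileNames) {
            try {
                if (fileName.contains("bad")) {
                    throw new IllegalStateException("malformed row");
                }
            } catch (IllegalStateException e) {
                handler.onError(fileName, e);
            }
        }
    }

    public static void main(String[] args) {
        List<String> files = List.of("trades.csv", "bad_cash_flow.csv");

        // With the logging handler, the caller learns nothing about the failure.
        publishAll(files, new LoggingHandler());

        // With the propagating handler, the failure is available after the load.
        PropagatingHandler propagating = new PropagatingHandler();
        publishAll(files, propagating);
        System.out.println("Errors visible to the caller: " + propagating.errors());
    }
}
```

In this model only the second handler lets the code that triggered the load decide what to do about the failure, which is the reason given above for requiring a DlcMessageHandler.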
Source Configuration
CSVs can be loaded by any implementation of ACsvScopedFetchSource that supports fetching data for a specific scope (for example, a CobDate sub-folder) specified at runtime for each topic.
To load from the local filesystem, create a LocalCsvScopedFetchSource or LocalCsvScopedListenSource object. This source will contain one or more CsvFilesScopedFetchTopic or CsvFilesScopedListenTopic topics, respectively.
Below is an example:
/**
 * Create a Local CSV Scoped Fetch Source
 *
 * @return A new LocalCsvScopedFetchSource
 */
public LocalCsvScopedFetchSource<Path> localCsvScopedFetchSource() {
    /*
        Configure Local CSV Source
     */
    var csvParserConfiguration = new CSVSourceConfiguration.CSVSourceConfigurationBuilder<Path>()
            .parserThreads(10)
            .bufferSize(1024)
            .build();
    final var sourceName = "Local Filesystem CSV Source";
    var dlcCSVSourceConfiguration = new DlcCSVSourceConfiguration<Path>(
            csvParserConfiguration,
            sourceName);
    var csvSource = new LocalCsvScopedFetchSource<Path>(dlcCSVSourceConfiguration);
    return csvSource;
}
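To make the idea of a runtime scope more concrete, the following sketch shows one way a scope value such as a CobDate could select a sub-folder under a root directory. It uses only the JDK and is not the DLC implementation: `ScopeDirectorySketch` and `resolveScanDirectory` are hypothetical names, and falling back to the root directory when the scope key is absent is an assumption made for illustration.

```java
import java.nio.file.Path;
import java.util.Map;

/** Hypothetical helper showing how a runtime scope value could select a sub-folder. */
final class ScopeDirectorySketch {

    /**
     * Resolves the directory to scan for a given scope.
     * Assumption: when the scope does not contain the key, the root directory itself is used.
     */
    static Path resolveScanDirectory(Path rootDirectory, Map<String, Object> scope, String scopeKey) {
        Object value = scope.get(scopeKey);
        if (value == null) {
            return rootDirectory;
        }
        Path resolved = rootDirectory.resolve(value.toString()).normalize();
        // Guard against scope values such as "../other" escaping the root directory.
        if (!resolved.startsWith(rootDirectory.normalize())) {
            throw new IllegalArgumentException("Scope value escapes the root directory: " + value);
        }
        return resolved;
    }

    public static void main(String[] args) {
        Path root = Path.of("data");
        // The scope is supplied at load time, for example one CobDate per request.
        System.out.println(resolveScanDirectory(root, Map.of("CobDate", "2024-01-31"), "CobDate"));
    }
}
```

Keeping the scope out of the source configuration means the same source and topics can be reused for every date, with only the scope changing between loads.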
Topic Configuration
The Local CSV Source consists of one or more topics of type CsvFilesScopedFetchTopic or CsvFilesScopedListenTopic, for Fetch or Listen sources respectively. Fetch topics cannot be added to Listen sources, and Listen topics cannot be added to Fetch sources. Each topic name must be globally unique across all topics. Local CSV topics take the following parameters:
CsvFilesScopedFetchTopic parameters:

Topic Parameter | Description |
---|---|
name | Name of this topic. Must be globally unique among all topics |
rootDirectoryPath | Base directory to apply pathMatcher to |
pathMatcher | PathMatcher used to match file paths for this topic |
fetchScopeToFileScanParametersConverter | Function that converts scope keys into the IFileScanParameters used to scan for files to load. The provided CsvFilesScopedFetchTopic.FetchScopeToFileScanParametersConverter can be used |
csvParserConfiguration | ICSVParserConfiguration describing the CSV file column names, number of lines to skip, etc. Can be created with LocalCsvScopedFetchSource.createParserConfiguration(...) |

CsvFilesScopedListenTopic parameters:

Topic Parameter | Description |
---|---|
topicName | Name of this topic. Must be globally unique among all topics |
rootDirectoryPath | Base directory to apply pathMatcher to |
pathMatcher | PathMatcher used to match file paths for this topic |
scopeToFileScanParametersConverter | Function that converts scope keys into the IFileScanParameters used to scan for files to load. The provided CsvFilesScopedListenTopic.ScopeToFileScanParametersConverter can be used |
csvParserConfiguration | ICSVParserConfiguration describing the CSV file column names, number of lines to skip, etc. Can be created with LocalCsvScopedFetchSource.createParserConfiguration(...) |
extraProperties | Can contain the optional property “pollingDelay”, the pause in milliseconds between checks for CSV file changes. Default is 500 |
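
As a rough illustration of the extraProperties parameter, the sketch below builds a java.util.Properties object carrying a "pollingDelay" value and reads it back with the documented default of 500 milliseconds. `PollingDelaySketch` and its methods are hypothetical, and storing the value as a string is an assumption; how the Listen topic actually parses the property may differ.

```java
import java.util.Properties;

/** Sketch of building and reading the optional "pollingDelay" extra property. */
final class PollingDelaySketch {

    static final String POLLING_DELAY_KEY = "pollingDelay";
    static final long DEFAULT_POLLING_DELAY_MS = 500L;

    /** Builds extra properties carrying a polling delay in milliseconds (stored as a string, by assumption). */
    static Properties withPollingDelay(long delayMs) {
        Properties extraProperties = new Properties();
        extraProperties.setProperty(POLLING_DELAY_KEY, Long.toString(delayMs));
        return extraProperties;
    }

    /** Reads the delay, falling back to the documented default of 500 ms when the property is absent. */
    static long readPollingDelay(Properties extraProperties) {
        String raw = extraProperties.getProperty(POLLING_DELAY_KEY);
        return raw == null ? DEFAULT_POLLING_DELAY_MS : Long.parseLong(raw.trim());
    }

    public static void main(String[] args) {
        Properties oneSecond = withPollingDelay(1000L);
        System.out.println("Configured delay: " + readPollingDelay(oneSecond) + " ms");
        System.out.println("Default delay: " + readPollingDelay(new Properties()) + " ms");
    }
}
```

A Properties object built this way could then be passed as the last argument of the CsvFilesScopedListenTopic constructor.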
Below is an example of creating and adding a Topic to a Local CSV Scoped Fetch Source:
/**
 * This method will create and add the Cash Flow topic to the CSV Source.
 *
 * @param localCsvScopedFetchSource The CSV Source to add the Cash Flow Topic to.
 */
public void addCashFlowTopicToSource(LocalCsvScopedFetchSource<Path> localCsvScopedFetchSource) {
    /*
        Topic Properties:
     */
    final var topicName = "Cash Flow Topic";
    // The scope key "AsOfDate" will be used to match a folder within the rootDirectoryPath directory.
    var fetchScopeToFileScanParametersConverter = new CsvFilesScopedFetchTopic.FetchScopeToFileScanParametersConverter("AsOfDate");
    /*
        CSV File Properties:
     */
    final var rootDirectoryPath = "/path/to/data/";
    final var cashFlowFilePattern = "glob:**/cash_flow_input_data.csv";
    final var cashFlowFileColumns = List.of("Header 1", "Header 2", "Header 3");
    final var numLinesToSkip = 1;
    final var cashFlowFileMatcher = FileSystems.getDefault().getPathMatcher(cashFlowFilePattern);
    /*
        Create Topic
     */
    var cashFlowTopic = new CsvFilesScopedFetchTopic(
            topicName,
            rootDirectoryPath,
            cashFlowFileMatcher,
            fetchScopeToFileScanParametersConverter,
            localCsvScopedFetchSource.createParserConfiguration(cashFlowFileColumns, numLinesToSkip)
    );
    /*
        Add Topic to CSV Source:
     */
    localCsvScopedFetchSource.addTopic(cashFlowTopic);
}
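The glob pattern used in the example above can be tried out on its own with the JDK. The sketch below shows which paths `glob:**/cash_flow_input_data.csv` accepts; `GlobMatcherSketch` and `filterMatching` are hypothetical names introduced for illustration, while `FileSystems.getDefault().getPathMatcher(...)` is the standard JDK call already used in the example.

```java
import java.nio.file.FileSystems;
import java.nio.file.Path;
import java.nio.file.PathMatcher;
import java.util.List;
import java.util.stream.Collectors;

/** Shows which candidate paths a glob-based PathMatcher accepts. */
final class GlobMatcherSketch {

    /** Returns the candidates accepted by the given "glob:" or "regex:" pattern. */
    static List<Path> filterMatching(String pattern, List<Path> candidates) {
        PathMatcher matcher = FileSystems.getDefault().getPathMatcher(pattern);
        return candidates.stream().filter(matcher::matches).collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<Path> candidates = List.of(
                // Matches: "**" spans any number of directories before the file name.
                Path.of("data", "2024-01-31", "cash_flow_input_data.csv"),
                // Does not match: different file name.
                Path.of("data", "2024-01-31", "trades.csv"),
                // Does not match: the pattern requires at least one directory component.
                Path.of("cash_flow_input_data.csv"));

        System.out.println(filterMatching("glob:**/cash_flow_input_data.csv", candidates));
    }
}
```

Because the matcher is applied to the whole path, a bare file name with no parent directory is rejected by this pattern, which is worth keeping in mind when choosing rootDirectoryPath and the pattern together.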
Java topic constructor signatures:
public CsvFilesScopedFetchTopic(
        final String name,
        final String rootDirectoryPath,
        final PathMatcher pathMatcher,
        final IFilesScopedTopic.IScopeToFileScanParametersConverter fetchScopeToFileScanParametersConverter,
        final ICSVParserConfiguration csvParserConfiguration)

public CsvFilesScopedListenTopic(
        final String topicName,
        final String rootDirectoryPath,
        final PathMatcher pathMatcher,
        final IFilesScopedTopic.IScopeToFileScanParametersConverter scopeToFileScanParametersConverter,
        final ICSVParserConfiguration configuration,
        final Properties extraProperties)