Local CSV
Local CSV Source
The LocalCsvSource allows for loading of CSV files from the local filesystem. The Local CSV Source can be configured as a Fetching or Listening source.
Maven Dependency
We first must ensure we are importing the correct dependency. Your pom should have:
<!-- Local CSV Source -->
<dependency>
<groupId>com.activeviam.io</groupId>a
<artifactId>data-connectors-csv</artifactId>
<version>${dataconnectors.version}</version>
</dependency>
Channel Configuration
A CSV Channel must use a DlcMessageHandler
so the DLC can be notified in the event an error occurs during tuple publishing. A CSVMessageChannel
uses a LogMessageHandler
by default. This simply logs the error and continues. Because of this, the error isn’t propagated upwards to the DLC.
To do this, use a DlcCSVMessageChannelFactory
to create the channels without having to worry about adding the DlcMessageHandler
.
Alternatively, you can use a CSVMessageChannelFactory
to create the CSVMessageChannel
s, but you must set the message handler to use a DlcMessageHandler
or a class extending DlcMessageHandler
.
For more details on extending the DlcMessageHandler
see Creating Custom CSV Message Handlers.
We recommend using a DlcCSVMessageChannelFactory
, as the DlcStoreMessageChannel
channel created will only accept message handlers that are or extend DlcMessageHandler
.
// Create Dlc message channel factory
CSVMessageChannelFactory csvMessageChannelFactory = new DlcCSVMessageChannelFactory(csvSource, datastore);
// Channels created using the DlcCSVMessageChannelFactory will have a DlcMessageHandler so there is no need to set the message handler.
IStoreMessageChannel channel = csvMessageChannelFactory.createChannel(...);
// Create the CSV message channel factory
CSVMessageChannelFactory csvMessageChannelFactory = new CSVMessageChannelFactory(csvSource, datastore);
// Channels created using the CSVMessageChannelFactory will not have a DlcMessageHandler, so we will need to add it
IMessageChannel channel = csvMessageChannelFactory.createChannel(...);
channel.withMessageHandler(new DlcMessageHandler());
Source Configuration
CSVs can be loaded by any implementing class of ACsvScopedFetchSource
that supports fetching data for a
specific scope (for example, CobDate sub-folder) that is specified at runtime for each topic.
To load from local filesystem, a LocalCsvScopedFetchSource
or LocalCsvScopedListenSource
source object needs to be created. This object will contain many CsvFilesScopedFetchTopic
or CsvFilesScopedFetchTopic
topics.
Below is an example:
@Bean
public LocalCsvScopedFetchSource localCsvScopedFetchSource() {
final Properties sourceProps = new Properties();
sourceProps.setProperty(ACsvScopedFetchSource.PROP__SOURCE_NAME, "LocalCSVScopedSource");
// Additional Properties to optimize the CSV Source.
final String parserThreads = Runtime.getRuntime().availableProcessors() / 2;
sourceProps.setProperty(ICSVSourceConfiguration.PARSER_THREAD_PROPERTY, parserThreads);
final String bufferSize = 1024;
sourceProps.setProperty(ICSVSourceConfiguration.BUFFER_SIZE_PROPERTY, bufferSize);
List<String> scopeKeys = new ArrayList<>();
scopeKeys.add(DataLoadControllerRestService.SCOPE_KEY__COB_DATE);
final LocalCsvScopedFetchSource<Path> localCsvScopedFetchSource = new LocalCsvScopedFetchSource<>(sourceProps);
localCsvScopedFetchSource.addTopic(
new CsvFilesScopedFetchTopic(CSV_TOPIC__CASHFLOW,
csvDataRootDirPath,
CASHFLOW_FILE_PATTERN == null ? null : FileSystems.getDefault().getPathMatcher(CASHFLOW_FILE_PATTERN),
new CsvFilesScopedFetchTopic.FetchScopeToFileScanParametersConverter(
scopeKeys.toArray(new String[scopeKeys.size()])),
localCsvScopedFetchSource.createParserConfiguration(CASHFLOW_FILE_COLUMNS, linesToSkip)
)
);
return localCsvScopedFetchSource;
}
Topic Configuration
The Local CSV Source consists of one or many Topics of type CsvFilesScopedFetchTopic
or CsvFilesScopedListenTopic
for Fetch or Listen Sources respectively. Fetch and Listen Topics cannot be mixed with opposite Fetch or Listen sources. The Topic name must be globally unique (to all other Topics). The Local CSV Topic’s consist of the following parameters:
Topic Parameter | Description |
---|---|
name | Name of this topic. Must be globally unique among all Topics |
rootDirectoryPath | Base directory to apply pathMatcher to |
pathMatcher | PathMatcher to match file paths for this topic |
fetchScopeToFileScanParametersConverter | Function to use Scope keys to an IFileScanParameters to scan files to load. Can use provided CsvFilesScopedFetchTopic.FetchScopeToFileScanParametersConverter |
csvParserConfiguration | ICSVParserConfiguration for CSV file Column Names, number of lines to skip, etc. Can use LocalCsvScopedFetchSource.createParserConfiguration(...) |
Topic Parameter | Description |
---|---|
topicName | Name of this topic. Must be globally unique among all Topics |
rootDirectoryPath | Base directory to apply pathMatcher to |
pathMatcher | PathMatcher to match file paths for this topic |
scopeToFileScanParametersConverter | Function to use Scope keys to an IFileScanParameters to scan files to load. Can use provided CsvFilesScopedListenTopic.ScopeToFileScanParametersConverter |
csvParserConfiguration | ICSVParserConfiguration for CSV file Column Names, number of lines to skip, etc. Can use LocalCsvScopedFetchSource.createParserConfiguration(...) |
extraProperties | Can contain optional property “pollingDelay” describing millisecond pause between checks for CSV file changes. Default is 500 |
Java Topic properties:
public CsvFilesScopedFetchTopic(
final String name,
final String rootDirectoryPath,
final PathMatcher pathMatcher,
final IFilesScopedTopic.IScopeToFileScanParametersConverter fetchScopeToFileScanParametersConverter,
final ICSVParserConfiguration csvParserConfiguration)
public CsvFilesScopedListenTopic(
final String topicName,
final String rootDirectoryPath,
final PathMatcher pathMatcher,
final IFilesScopedTopic.IScopeToFileScanParametersConverter scopeToFileScanParametersConverter,
final ICSVParserConfiguration configuration,
final Properties extraProperties)