DLC Real-Time Data Sources
CSV Source
CSV files can be loaded in real time by extending the ACsvScopedListenSource class and implementing its startListening() and stopListening() methods.
The currently supported listening sources are as follows:
| CSV File Location | Source Implementation Class | Data-Connectors Module |
|---|---|---|
| Local Filesystem | LocalCsvScopedListenSource | data-connectors-csv |
| Azure | AzureCsvScopedListenSource | data-connectors-azure |
| Google Cloud Platform | GcpCsvScopedListenSource | data-connectors-gcp |
| AWS S3 Bucket | S3CsvScopedListenSource | data-connectors-aws |
note
- A CSV Listen topic CANNOT have the same name as a CSV Fetch topic; an ActiveViamRuntimeException will be thrown if it does.
- Deleting a listened file will not remove that file's data from the cube. This feature is not supported.
Additional properties:
The CSV Listen source polls the target file at an interval set by the PROP_POLLING_DELAY property of the FileScopedListenTopic. By default, the CSV listeners poll every 100ms. This can be overridden by setting PROP_POLLING_DELAY in the Properties passed to the FileScopedListenTopic.
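For example, a minimal sketch of overriding the default polling delay (the 500ms value is illustrative):
Properties topicProperties = new Properties();
// Poll the listened files every 500ms instead of the default 100ms
topicProperties.put(FileScopedListenTopic.PROP_POLLING_DELAY, 500);
// topicProperties is then passed as the last argument of the FileScopedListenTopic constructor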
Load real-time data from the local filesystem
First, make sure you are importing the correct dependency. Your POM should contain:
<!-- Local CSV Source -->
<dependency>
    <groupId>com.activeviam.io</groupId>
    <artifactId>data-connectors-csv</artifactId>
    <version>${dataconnectors.version}</version>
</dependency>
To establish real-time data loading from the local filesystem, a FileScopedListenTopic needs to be created.
Example:
@Bean
public LocalCsvScopedListenSource<Path> localCsvScopedListenSource() {
    final Properties sourceProps = new Properties();
    sourceProps.setProperty(ACsvScopedListenSource.PROP__SOURCE_NAME, "LocalCsvScopedListenSource");

    final LocalCsvScopedListenSource<Path> localCsvScopedListenSource = new LocalCsvScopedListenSource<>(sourceProps);

    final Properties topicProperties = new Properties();
    topicProperties.put(CSVFileListener.PROP__SOURCE_NAME, localCsvScopedListenSource);
    topicProperties.put(FileScopedListenTopic.PROP_POLLING_DELAY, pollingDelayValue); // Set via application properties, see section below

    localCsvScopedListenSource.addTopic(
            new FileScopedListenTopic(
                    CSV_TOPIC__CASHFLOW,
                    localCsvScopedListenSource.createParserConfiguration(CASHFLOW_FILE_COLUMNS, linesToSkip),
                    csvDataRootDirPath,
                    new WatcherService(),
                    pathMatcherSyntaxAndPattern == null ? null
                            : FileSystems.getDefault().getPathMatcher(pathMatcherSyntaxAndPattern),
                    new FileScopedListenTopic.FetchScopeToFileScanParametersConverter(
                            DataLoadControllerRestService.SCOPE_KEY__COB_DATE),
                    topicProperties));

    return localCsvScopedListenSource;
}
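The pollingDelayValue referenced above is not defined in the snippet; one way to supply it, assuming a Spring context and a hypothetical csv.polling.delay application property, is:
// Hypothetical property binding; falls back to the 100ms default
@Value("${csv.polling.delay:100}")
private String pollingDelayValue;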
Load real-time data from the cloud
First, make sure you are importing the correct dependency. Your POM should contain one or more of the following:
<!-- For AWS CSV Source -->
<dependency>
    <groupId>com.activeviam.io</groupId>
    <artifactId>data-connectors-aws</artifactId>
    <version>${dataconnectors.version}</version>
</dependency>
<!-- For Azure CSV Source -->
<dependency>
    <groupId>com.activeviam.io</groupId>
    <artifactId>data-connectors-azure</artifactId>
    <version>${dataconnectors.version}</version>
</dependency>
<!-- For Google-Cloud-Platform CSV Source -->
<dependency>
    <groupId>com.activeviam.io</groupId>
    <artifactId>data-connectors-gcp</artifactId>
    <version>${dataconnectors.version}</version>
</dependency>
To load from cloud storage, the CSV source and the cloud-specific CSV topic can be configured as below:
@Bean
public S3CsvScopedListenSource s3CsvScopedListenSource() {
    final Properties sourceProps = new Properties();
    sourceProps.setProperty(ACsvScopedListenSource.PROP__SOURCE_NAME, "S3CsvScopedListenSource");

    final S3CsvScopedListenSource s3CsvScopedListenSource = new S3CsvScopedListenSource(sourceProps);
    s3CsvScopedListenSource.addTopic(
            createListenDirectoryTopic(
                    s3CsvScopedListenSource, CSV_TOPIC__TRADE, inputDataRootDirPath, DELIV_SUBDIR,
                    INPUT_FILE_PATH_MATCHER_TRADE,
                    s3CsvScopedListenSource.createParserConfiguration(generateTradeCsvFields(), NB_HEADER_LINES_TO_SKIP)));
    return s3CsvScopedListenSource;
}
@Override
protected ICSVTopic<ICloudEntityPath<S3Object>> createListenDirectoryTopic(
        ICSVSource<ICloudEntityPath<S3Object>> source, String topic, String inputDataRootDirPath,
        String subdirectory, String pathMatcherSyntaxAndPattern, ICSVParserConfiguration parserConfiguration) {
    ICloudDirectory<S3Object> cloudDirectory = rootDirectory();
    if (!subdirectory.isEmpty()) {
        // Strip a leading file separator from the subdirectory, if present.
        if (subdirectory.charAt(0) == '\\' || subdirectory.charAt(0) == '/') {
            subdirectory = subdirectory.substring(1);
        }
        cloudDirectory = cloudDirectory.getSubDirectory(subdirectory);
    }
    // Convert the glob pattern into the regular expression expected by the cloud topic.
    pathMatcherSyntaxAndPattern = pathMatcherSyntaxAndPattern.replace("**", "*");
    pathMatcherSyntaxAndPattern = pathMatcherSyntaxAndPattern.replace("*", ".*");
    pathMatcherSyntaxAndPattern = pathMatcherSyntaxAndPattern.replace("glob:", "");
    pathMatcherSyntaxAndPattern = pathMatcherSyntaxAndPattern.replace("{", "(");
    pathMatcherSyntaxAndPattern = pathMatcherSyntaxAndPattern.replace("}", ")");
    pathMatcherSyntaxAndPattern = pathMatcherSyntaxAndPattern.replace(",", "|");

    Properties topicProps = new Properties();
    topicProps.put("source-name", source);
    topicProps.put("polling.delay", 100); // polling delay in ms

    return new CsvCloudScopedListenTopic(
            topic,
            parserConfiguration,
            pathMatcherSyntaxAndPattern,
            dataProviderFactory(),
            cloudDirectory,
            new FileScopedListenTopic.FetchScopeToFileScanParametersConverter(
                    DataLoadControllerRestService.SCOPE_KEY__COB_DATE),
            topicProps);
}
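For illustration, here is what that conversion does to a typical glob pattern (the pattern itself is hypothetical):
// Input pattern            : "glob:**/*Trade*.csv"
// After "**"  -> "*"       : "glob:*/*Trade*.csv"
// After "*"   -> ".*"      : "glob:.*/.*Trade.*.csv"
// After "glob:" is removed : ".*/.*Trade.*.csv"   <- regex handed to CsvCloudScopedListenTopic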
AWS S3
Dependency to include:
<!-- For AWS CSV Source -->
<dependency>
    <groupId>com.activeviam.io</groupId>
    <artifactId>data-connectors-aws</artifactId>
    <version>${dataconnectors.version}</version>
</dependency>
To load from S3 storage, a cloud-specific topic CsvCloudScopedListenTopic needs to be created as explained above. A few additional beans are also created to establish connectivity with the remote storage. Below is an example.
The connectivity details for the S3 bucket are provided via application properties.
@Bean
public ICloudDirectory<S3Object> rootDirectory() {
    return new S3CloudDirectory(
            client(),
            env.getRequiredProperty(ACsvSourceConfig.AWS_BUCKET),
            env.getRequiredProperty(ACsvSourceConfig.AWS_ROOT_DIRECTORY));
}
@Bean
public AmazonS3 client() {
    return AmazonS3Client.builder()
            .withCredentials(new DefaultAWSCredentialsProviderChain())
            .withRegion(Regions.fromName(env.getRequiredProperty(ACsvSourceConfig.AWS_REGION)))
            .withClientConfiguration(new ClientConfiguration().withMaxConnections(128))
            .build();
}
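The DefaultAWSCredentialsProviderChain used above resolves credentials from, among other places, the JVM system properties aws.accessKeyId and aws.secretKey and the standard AWS environment variables. If you keep the keys in the application properties listed at the end of this page, one possible (hypothetical) bridge is:
// Sketch: expose the Spring properties as JVM system properties so that
// DefaultAWSCredentialsProviderChain can pick them up. The property names
// are the ones from the "Application properties" section below.
System.setProperty("aws.accessKeyId", env.getRequiredProperty("aws.accessKeyId"));
System.setProperty("aws.secretKey", env.getRequiredProperty("aws.secretKey"));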
@Bean
public ICloudCsvDataProviderFactory<S3Object> dataProviderFactory() {
    // The maximum number of threads that will download parts of an entity in parallel
    String cloudFetchThread = env.getProperty(ACsvSourceConfig.ACTIVE_CLOUD_FETCH_THREAD, "10");
    return new AwsCsvDataProviderFactory(new CloudFetchingConfig(Integer.parseInt(cloudFetchThread)));
}
Azure Blob
Dependency to include:
<!-- For Azure CSV Source -->
<dependency>
    <groupId>com.activeviam.io</groupId>
    <artifactId>data-connectors-azure</artifactId>
    <version>${dataconnectors.version}</version>
</dependency>
Here we create a CsvCloudScopedListenTopic similar to S3. Below are the necessary methods to connect to an Azure Blob.
@Bean
public ICloudDirectory<CloudBlob> rootDirectory() {
    return new AzureCloudDirectory(
            client(),
            env.getRequiredProperty(AZURE_CONTAINER_NAME),
            env.getRequiredProperty(AZURE_ROOT_DIRECTORY));
}
@Bean
public CloudBlobClient client() {
    // Expects the Azure connection string in the VM parameter -Dazure.connection.string
    String storageConnectionString = System.getProperty(AZURE_STORAGE_CONNECTION_STRING);
    // Parse the connection string and create a blob client to interact with Blob storage
    try {
        CloudStorageAccount storageAccount = CloudStorageAccount.parse(storageConnectionString);
        CloudBlobClient blobClient = storageAccount.createCloudBlobClient();
        // Resolve the container reference up front to fail fast on connectivity issues
        blobClient.getContainerReference(env.getRequiredProperty(AZURE_CONTAINER_NAME));
        return blobClient;
    } catch (InvalidKeyException | URISyntaxException | StorageException e) {
        throw new UnsupportedOperationException("Azure Cloud Blob Client issues", e);
    }
}
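For local testing, the connection string can also be set programmatically before the client bean is created (a sketch; the azure.connection.string key mirrors the VM parameter above and is assumed to be present in the application properties):
// Hypothetical alternative to passing -Dazure.connection.string on the command line
System.setProperty("azure.connection.string",
        env.getRequiredProperty("azure.connection.string"));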
@Bean
public ICloudCsvDataProviderFactory<CloudBlob> dataProviderFactory() {
    // The maximum number of threads that will download parts of an entity in parallel
    String cloudFetchThread = env.getProperty(ACTIVE_CLOUD_FETCH_THREAD, "10");
    return new AzureCsvDataProviderFactory(new CloudFetchingConfig(Integer.parseInt(cloudFetchThread)));
}
Google Storage
Dependency to include:
<!-- For Google-Cloud-Platform CSV Source -->
<dependency>
    <groupId>com.activeviam.io</groupId>
    <artifactId>data-connectors-gcp</artifactId>
    <version>${dataconnectors.version}</version>
</dependency>
Similar to S3 and Azure, we create a CsvCloudScopedListenTopic. The cloud directory and client are configured as below:
@Bean
public ICloudDirectory<Blob> rootDirectory() {
    return new GoogleCloudDirectory(
            client(),
            env.getRequiredProperty(ACsvSourceConfig.GOOGLE_CONTAINER_NAME),
            env.getRequiredProperty(ACsvSourceConfig.GOOGLE_ROOT_DIRECTORY));
}
@Bean
public Storage client() {
    return StorageOptions.getDefaultInstance().getService();
}
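StorageOptions.getDefaultInstance() authenticates with Google Application Default Credentials, typically supplied by pointing the GOOGLE_APPLICATION_CREDENTIALS environment variable at a service-account key file. A quick sanity check (a sketch, not part of the configuration):
// Verify that application-default credentials resolve before wiring the source
Storage storage = StorageOptions.getDefaultInstance().getService();
System.out.println("GCP project: " + storage.getOptions().getProjectId());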
@Bean
public ICloudCsvDataProviderFactory<Blob> dataProviderFactory() {
    // The maximum number of threads that will download parts of an entity in parallel
    String cloudFetchThread = env.getProperty(ACsvSourceConfig.ACTIVE_CLOUD_FETCH_THREAD, "10");
    return new GoogleCsvDataProviderFactory(new CloudFetchingConfig(Integer.parseInt(cloudFetchThread)));
}
Application properties
Below is an example of the application properties configured to load from the cloud storage options mentioned above. Please provide the values obtained from your account.
# General Cloud Source Properties
cloud.fetch.thread=
# AWS Source Properties
aws.bucket=
aws.root_directory=
aws.region=
aws.accessKeyId=
aws.secretKey=
# Azure Source Properties
azure.container=
azure.root_directory=
azure.connection.string=
# Google Cloud Platform Source Properties
google.container=
google.root_directory=
# Local Source Properties
local.root_directory=
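For reference, here is the same file with illustrative placeholder values (all values below are hypothetical; substitute your own account details):
# General Cloud Source Properties
cloud.fetch.thread=10
# AWS Source Properties
aws.bucket=my-data-bucket
aws.root_directory=input/csv
aws.region=eu-west-1
aws.accessKeyId=AKIAXXXXXXXXXXXXXXXX
aws.secretKey=<your-secret-key>
# Azure Source Properties
azure.container=my-container
azure.root_directory=input/csv
azure.connection.string=DefaultEndpointsProtocol=https;AccountName=myaccount;AccountKey=<key>;EndpointSuffix=core.windows.net
# Google Cloud Platform Source Properties
google.container=my-bucket
google.root_directory=input/csv
# Local Source Properties
local.root_directory=/data/input/csv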