DLC Data Sources
CSV Source
CSVs can be loaded by an implementing class of ACsvScopedFetchSource that supports fetching data for a specific scope (for example, CobDate or sub-folder) specified at runtime for each fetch iteration.
DLC supports loading CSVs from either the local filesystem or cloud storage. Among the cloud storage options, AWS S3, Azure Blob and Google Storage are currently supported.
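Throughout the examples below, a fetch scope is simply a set of key/value pairs submitted with each fetch request. A minimal sketch, purely for illustration (the scope keys are defined per project, not fixed by DLC):
// Illustrative only: a scope restricting one fetch iteration to a single CobDate
// and sub-folder. The keys must match the scope keys configured on the topics.
Map<String, String> fetchScope = new HashMap<>();
fetchScope.put("CobDate", "2023-09-26");
fetchScope.put("SubFolder", "cashflows");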
Load from local filesystem
We first must ensure we are importing the correct dependency. Your pom should have:
<!-- Local CSV Source -->
<dependency>
<groupId>com.activeviam.io</groupId>
<artifactId>data-connectors-csv</artifactId>
<version>${dataconnectors.version}</version>
</dependency>
To load from the local filesystem, a CsvFilesScopedFetchTopic needs to be created.
Example:
@Bean
public LocalCsvScopedFetchSource localCsvScopedFetchSource() {
    final Properties sourceProps = new Properties();
    sourceProps.setProperty(ACsvScopedFetchSource.PROP__SOURCE_NAME, "LocalCSVScopedSource");

    List<String> scopeKeys = new ArrayList<>();
    scopeKeys.add(DataLoadControllerRestService.SCOPE_KEY__COB_DATE);

    final LocalCsvScopedFetchSource<Path> localCsvScopedFetchSource = new LocalCsvScopedFetchSource<>(sourceProps);
    localCsvScopedFetchSource.addTopic(
            new CsvFilesScopedFetchTopic(
                    CSV_TOPIC__CASHFLOW,
                    csvDataRootDirPath,
                    CASHFLOW_FILE_PATTERN == null ? null : FileSystems.getDefault().getPathMatcher(CASHFLOW_FILE_PATTERN),
                    // Maps the fetch scope (here, the CobDate) to the file scan parameters.
                    new CsvFilesScopedFetchTopic.FetchScopeToFileScanParametersConverter(
                            scopeKeys.toArray(new String[scopeKeys.size()])),
                    localCsvScopedFetchSource.createParserConfiguration(CASHFLOW_FILE_COLUMNS, linesToSkip)));
    return localCsvScopedFetchSource;
}
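The bean above references a few project-level constants and fields. A minimal sketch of plausible values follows; the names and the column list are illustrative assumptions, not part of the connector API:
// Hypothetical values for the constants used in the bean above.
private static final String CSV_TOPIC__CASHFLOW = "Cashflow";

// Root directory scanned for cashflow CSV files.
private final Path csvDataRootDirPath = Paths.get("/data/csv");

// Java NIO PathMatcher pattern ("glob:" or "regex:" syntax).
private static final String CASHFLOW_FILE_PATTERN = "glob:**/Cashflow_*.csv";

// Column names, in file order, used to build the parser configuration.
private static final List<String> CASHFLOW_FILE_COLUMNS =
        Arrays.asList("TradeId", "CobDate", "Currency", "Amount");

// Number of header lines to skip in each file.
private final int linesToSkip = 1;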
Load from a cloud source
We first must ensure we are importing the correct dependency. Your pom should have one or more of the following:
<!-- For Aws CSV Source -->
<dependency>
<groupId>com.activeviam.io</groupId>
<artifactId>data-connectors-aws</artifactId>
<version>${dataconnectors.version}</version>
</dependency>
<!-- For Azure CSV Source -->
<dependency>
<groupId>com.activeviam.io</groupId>
<artifactId>data-connectors-azure</artifactId>
<version>${dataconnectors.version}</version>
</dependency>
<!-- For Google-Cloud-Platform CSV Source -->
<dependency>
<groupId>com.activeviam.io</groupId>
<artifactId>data-connectors-gcp</artifactId>
<version>${dataconnectors.version}</version>
</dependency>
To load from cloud storage, the CSV source and a cloud-specific CSV topic can be configured as shown below:
@Bean
public S3CsvScopedFetchSource s3CsvScopedFetchSource() {
    final Properties sourceProps = new Properties();
    sourceProps.setProperty(ACsvScopedFetchSource.PROP__SOURCE_NAME, "S3CSVScopedSource");

    final S3CsvScopedFetchSource<Path> s3CsvScopedFetchSource = new S3CsvScopedFetchSource<>(sourceProps);
    s3CsvScopedFetchSource.addTopic(
            createDirectoryTopic(
                    s3CsvScopedFetchSource, CSV_TOPIC__TRADE, inputDataRootDirPath, DELIV_SUBDIR,
                    INPUT_FILE_PATH_MATCHER_TRADE,
                    s3CsvScopedFetchSource.createParserConfiguration(generateTradeCsvFields(), NB_HEADER_LINES_TO_SKIP)));
    return s3CsvScopedFetchSource;
}
@Override
protected ICSVTopic<ICloudEntityPath<S3Object>> createDirectoryTopic(ICSVSource source, String topic, String path, String subdirectory,
        String pathMatcherSyntaxAndPattern, ICSVParserConfiguration parserConfig) {
    ICloudDirectory<S3Object> cloudDirectory = rootDirectory();
    if (!subdirectory.isEmpty()) {
        // Strip a leading file separator from the sub-directory, if present.
        if (subdirectory.charAt(0) == '\\' || subdirectory.charAt(0) == '/') {
            subdirectory = subdirectory.substring(1);
        }
        cloudDirectory = cloudDirectory.getSubDirectory(subdirectory);
    }
    // Convert the Java NIO glob pattern into the regex expected by the cloud topic,
    // e.g. "glob:{trade,cashflow}*.csv" becomes "(trade|cashflow).*.csv".
    pathMatcherSyntaxAndPattern = pathMatcherSyntaxAndPattern.replace("**", "*");
    pathMatcherSyntaxAndPattern = pathMatcherSyntaxAndPattern.replace("*", ".*");
    pathMatcherSyntaxAndPattern = pathMatcherSyntaxAndPattern.replace("glob:", "");
    pathMatcherSyntaxAndPattern = pathMatcherSyntaxAndPattern.replace("{", "(");
    pathMatcherSyntaxAndPattern = pathMatcherSyntaxAndPattern.replace("}", ")");
    pathMatcherSyntaxAndPattern = pathMatcherSyntaxAndPattern.replace(",", "|");

    return new CsvCloudScopedFetchTopic(topic, cloudDirectory, dataProviderFactory(), pathMatcherSyntaxAndPattern,
            new FilesScopedFetchTopic.FetchScopeToFileScanParametersConverter(
                    DataLoadControllerRestService.SCOPE_KEY__COB_DATE),
            parserConfig, "", null);
}
AWS S3
Dependency to include:
<!-- For Aws CSV Source -->
<dependency>
<groupId>com.activeviam.io</groupId>
<artifactId>data-connectors-aws</artifactId>
<version>${dataconnectors.version}</version>
</dependency>
To load from S3 storage, a cloud-specific topic (CsvCloudScopedFetchTopic) needs to be created as explained above. A few additional beans are also required to establish connectivity with the remote storage, as shown in the example below.
The connectivity details for the S3 bucket are provided via application properties.
@Bean
public ICloudDirectory<S3Object> rootDirectory() {
    return new S3CloudDirectory(
            client(),
            env.getRequiredProperty(ACsvSourceConfig.AWS_BUCKET),
            env.getRequiredProperty(ACsvSourceConfig.AWS_ROOT_DIRECTORY));
}

@Bean
public AmazonS3 client() {
    return AmazonS3Client.builder()
            .withCredentials(new DefaultAWSCredentialsProviderChain())
            .withRegion(Regions.fromName(env.getRequiredProperty(ACsvSourceConfig.AWS_REGION)))
            .withClientConfiguration(new ClientConfiguration().withMaxConnections(128))
            .build();
}

@Bean
public ICloudCsvDataProviderFactory<S3Object> dataProviderFactory() {
    // The maximum number of threads that download parts of an entity in parallel.
    String cloudFetchThread = env.getProperty(ACsvSourceConfig.ACTIVE_CLOUD_FETCH_THREAD, "10");
    return new AwsCsvDataProviderFactory(new CloudFetchingConfig(Integer.parseInt(cloudFetchThread)));
}
Azure Blob
Dependency to include:
<!-- For Azure CSV Source -->
<dependency>
<groupId>com.activeviam.io</groupId>
<artifactId>data-connectors-azure</artifactId>
<version>${dataconnectors.version}</version>
</dependency>
Here we create a CsvCloudScopedFetchTopic similar to S3.
@Bean
public ACloudDirectory<BlobClientBase> rootDirectory() {
    return new AzureCloudDirectory(
            client(),
            env.getRequiredProperty(AZURE_CONTAINER_NAME_PROP),
            env.getRequiredProperty(AZURE_ROOT_DIRECTORY_PROP));
}

/*
 * Expects the Azure connection string in the VM parameter
 * -Dazure.connection.string
 */
@Bean
public BlobServiceClient client() {
    String storageConnectionString = System.getProperty(AZURE_STORGE_CONNECTION_STRING_PROP);
    return new BlobServiceClientBuilder()
            .connectionString(storageConnectionString)
            .buildClient();
}

@Bean
public ICloudCsvDataProviderFactory<BlobClientBase> dataProviderFactory() {
    // The maximum number of threads that download parts of an entity in parallel.
    String cloudFetchThread = env.getProperty(ACTIVE_CLOUD_FETCH_THREAD_PROP, "10");
    return new AzureCsvDataProviderFactory(new CloudFetchingConfig(Integer.parseInt(cloudFetchThread)));
}
Google Storage
Dependency to include:
<!-- For Google-Cloud-Platform CSV Source -->
<dependency>
<groupId>com.activeviam.io</groupId>
<artifactId>data-connectors-gcp</artifactId>
<version>${dataconnectors.version}</version>
</dependency>
Similar to S3 and Azure, we create a CsvCloudScopedFetchTopic. The cloud directory and client are configured as below:
@Bean
public ICloudDirectory<Blob> rootDirectory() {
    return new GoogleCloudDirectory(
            client(),
            env.getRequiredProperty(ACsvSourceConfig.GOOGLE_CONTAINER_NAME),
            env.getRequiredProperty(ACsvSourceConfig.GOOGLE_ROOT_DIRECTORY));
}

@Bean
public Storage client() {
    // Uses the default Google Cloud credentials available in the environment.
    return StorageOptions.getDefaultInstance().getService();
}

@Bean
public ICloudCsvDataProviderFactory<Blob> dataProviderFactory() {
    // The maximum number of threads that download parts of an entity in parallel.
    String cloudFetchThread = env.getProperty(ACsvSourceConfig.ACTIVE_CLOUD_FETCH_THREAD, "10");
    return new GoogleCsvDataProviderFactory(new CloudFetchingConfig(Integer.parseInt(cloudFetchThread)));
}
Application properties
Below is an example of the application properties configured to load from cloud storage options mentioned above. Please provide the values obtained from your account.
# General Cloud Source Properties
cloud.fetch.thread=
# AWS Source Properties
aws.bucket=
aws.root_directory=
aws.region=
aws.accessKeyId=
aws.secretKey=
# Azure Source Properties
azure.container=
azure.root_directory=
azure.connection.string=
# Google Cloud Platform Source Properties
google.container=
google.root_directory=
# Local CSV Source Properties
local.root_directory=
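For reference, the ACsvSourceConfig constants used by the beans above are assumed to resolve to the property keys listed here. A sketch; the actual constant definitions live in the project configuration:
// Assumed mapping between configuration constants and application property keys.
public static final String ACTIVE_CLOUD_FETCH_THREAD = "cloud.fetch.thread";
public static final String AWS_BUCKET = "aws.bucket";
public static final String AWS_ROOT_DIRECTORY = "aws.root_directory";
public static final String AWS_REGION = "aws.region";
public static final String GOOGLE_CONTAINER_NAME = "google.container";
public static final String GOOGLE_ROOT_DIRECTORY = "google.root_directory";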
JDBC Source
We first must ensure we are importing the correct dependency. Your pom should have:
<!-- For JDBC Source -->
<dependency>
<groupId>com.activeviam.io</groupId>
<artifactId>data-connectors-jdbc</artifactId>
<version>${dataconnectors.version}</version>
</dependency>
We build an instance of JdbcScopedFetchSource that collects all the JDBC topics. It loads data from a database into ActivePivot based on SQL statements (defined via the source topics) and on business scopes defined at fetch time, from which the SQL input parameters injected into those statements are derived.
Example:
@Bean(destroyMethod = "close")
public IJdbcScopedFetchSource jdbcSource() {
    final IJdbcScopedFetchSource source = new JdbcScopedFetchSource(dbConnectionConfig.dbConnectionFactory(), null);

    final String productTopicSql = String.join(" ",
            "SELECT ProductId AS ", DatastoreNames.PRODUCT_ID,
            ", Name AS ", DatastoreNames.PRODUCT_NAME,
            ", Color AS ", DatastoreNames.PRODUCT_COLOR,
            ", Price AS ", DatastoreNames.PRODUCT_PRICE,
            ", CobDate AS ", DatastoreNames.PRODUCT_COB_DATE,
            " FROM ", PRODUCT_TABLE_NAME,
            " WHERE CobDate = ?");

    source.addTopic(
            new JdbcStatementTopic(
                    JDBC_TOPIC__PRODUCT,
                    productTopicSql,
                    // Converts the fetch scope (CobDate) into the SQL statement parameter.
                    new CobDateScopeToSqlParametersConverter(),
                    null,
                    JdbcPreparedStatementTask.PLUGIN_KEY,
                    null));
    return source;
}
private static class CobDateScopeToSqlParametersConverter extends JdbcStatementTopic.ScopeToSqlParametersConverter {

    public CobDateScopeToSqlParametersConverter() {
        super(DataLoadControllerRestService.SCOPE_KEY__COB_DATE);
    }

    @Override
    protected Object formatValue(final String key, final Object value) {
        if (DataLoadControllerRestService.SCOPE_KEY__COB_DATE.equals(key)) {
            return java.sql.Date.valueOf(LocalDate.parse((String) value));
        }
        return super.formatValue(key, value);
    }
}
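To make the scope-to-parameter flow concrete, assume a fetch is submitted with a CobDate scope value of "2023-09-26" (an illustrative date). The converter above turns it into a java.sql.Date that is bound to the single ? placeholder of productTopicSql:
// Illustrative only: what formatValue produces for a CobDate scope entry.
Object sqlParameter = java.sql.Date.valueOf(LocalDate.parse("2023-09-26"));
// The statement then effectively executes as: ... WHERE CobDate = '2023-09-26'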
Messaging Source
We first must ensure we are importing the correct dependency. Your pom should have one of the following:
<!-- For Kafka Source -->
<dependency>
<groupId>com.activeviam.io</groupId>
<artifactId>data-connectors-kafka</artifactId>
<version>${dataconnectors.version}</version>
</dependency>
<!-- For RabbitMq Source -->
<dependency>
<groupId>com.activeviam.io</groupId>
<artifactId>data-connectors-rabbitmq</artifactId>
<version>${dataconnectors.version}</version>
</dependency>
MessagingSource currently supports loading from Apache Kafka and RabbitMQ out of the box.
To load data from a different messaging system, an implementation of IMessagingConsumer needs to be provided.
The following section shows how to configure the source and topic for such a system via Spring config.
Remote broker details
One can specify the connectivity details of a remote broker via a Spring config. Depending on the kind of security configuration in a project, one may need to add or remove certain properties.
The connection properties can be wrapped in a Properties object:
@Bean
public Properties kafkaConnectionProperties() {
    Properties kafkaConnectionProps = new Properties();
    kafkaConnectionProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
    kafkaConnectionProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, autoOffsetReset);
    kafkaConnectionProps.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, maxPollRecords);
    kafkaConnectionProps.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, enableAutoCommit);
    kafkaConnectionProps.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, autoCommitIntervalMs);
    kafkaConnectionProps.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, maxPollIntervalMs);
    kafkaConnectionProps.put(ConsumerConfig.FETCH_MAX_BYTES_CONFIG, fetchMaxBytes);
    kafkaConnectionProps.put("message.max.bytes", messageMaxBytes);
    kafkaConnectionProps.put(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, maxPartitionFetchBytes);
    kafkaConnectionProps.put(PROP_POLL_TIMEOUT, pollTimeout);
    kafkaConnectionProps.put(PROP_SCHEMA_REGISTRY_URL, schemaRegistryUrl);
    return kafkaConnectionProps;
}
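The bean above reads its values from fields that are typically injected from application properties. A minimal sketch of what they might look like; the property names below are assumptions, not part of the connector API:
// Hypothetical property-backed fields consumed by kafkaConnectionProperties().
@Value("${kafka.bootstrap.servers}")
private String bootstrapServers;

@Value("${kafka.auto.offset.reset:latest}")
private String autoOffsetReset;

@Value("${kafka.max.poll.records:500}")
private String maxPollRecords;

@Value("${kafka.enable.auto.commit:false}")
private String enableAutoCommit;

@Value("${kafka.poll.timeout.ms:1000}")
private String pollTimeout;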
Messaging Source and Topic
Next, one needs to construct the messaging source and add topics.
The ActivePivot topic name, the name of the topic on the remote broker, and various other properties are passed to the MessagingTopic constructor as below:
@Bean
public MessagingSource kafkaMessagingSource() {
    Properties kafkaConnectionProperties = kafkaConnectionConfig.kafkaConnectionProperties();
    MessagingSource messagingSource = new MessagingSource("KafkaMessagingSource");

    Properties tradeProps = (Properties) kafkaConnectionProperties.clone();
    // All the consumers for a topic should have the same group.id
    tradeProps.put(ConsumerConfig.GROUP_ID_CONFIG,
            tradeTopicGroupId.equals("UNIQUE")
                    ? serviceUsername + "." + UUID.randomUUID().toString()
                    : tradeTopicGroupId);
    tradeProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
    tradeProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, KafkaJsonDeserializer.class.getName());

    messagingSource.addTopic(
            new MessagingTopic(
                    TOPIC__KAFKA_MESSAGING_TRADE,
                    tradeKafkaBrokerTopic,
                    TRADE_STORE_FIELDS,
                    tradeProps,
                    tradeKafkaConsumers,
                    datastoreConfig.datastore(),
                    new String[] {DatastoreNames.TRADE_STORE_NAME},
                    messageConsumerPluginKey));
    return messagingSource;
}
private static final List<String> TRADE_STORE_FIELDS = Arrays.asList(
        DatastoreNames.TRADE_ID,
        DatastoreNames.TRADE_PRODUCT_ID,
        DatastoreNames.TRADE_PRODUCT_QUANTITY,
        DatastoreNames.TRADE_COB_DATE);
Messaging Consumer
Messaging consumers are responsible for connecting to the remote messaging system, listening for new data, and consuming it as it becomes available.
The consumers are created when the request to start "listening" to a remote topic is submitted at runtime. The underlying Kafka / RabbitMQ client consumers are created in the constructor of the messaging consumer implementation (KafkaMessageConsumer, RabbitMqMessageConsumer).
The remote topics are polled for records once the consumer thread starts, i.e. inside the Java Callable's call method.
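For intuition, the poll loop inside such a consumer's call method looks roughly like the sketch below, written against the plain Kafka client API rather than the connector's actual implementation; the field names (consumerProperties, brokerTopicName, pollTimeoutMs) and the publish helper are illustrative, and the real KafkaMessageConsumer additionally handles batching, error handling and hand-off to the DLC publishing pipeline:
// Illustrative sketch only, not the connector's implementation.
@Override
public Void call() {
    try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(consumerProperties)) {
        consumer.subscribe(Collections.singletonList(brokerTopicName));
        while (!Thread.currentThread().isInterrupted()) {
            // Block for up to pollTimeoutMs waiting for new records.
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(pollTimeoutMs));
            for (ConsumerRecord<String, String> record : records) {
                // Parse the message and publish it into the target datastore store(s).
                publish(record.value());
            }
        }
    }
    return null;
}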