Getting Started
Supported sources
The DLC supports loading data from multiple sources into an Atoti datastore.
Source | Where data is loaded from |
---|---|
Local CSV | CSV files stored locally. |
JDBC | Via JDBC API. |
Tuple | Generated by Java code. |
AWS CSV | CSV files stored on Amazon S3. |
Azure CSV | CSV files stored on Azure Blob Storage. |
GCP CSV | CSV files stored on Google Cloud Storage. |
Local Avro | Avro files stored locally. |
Local Parquet | Parquet files stored locally. |
Kafka | A Kafka messaging producer. |
Imports
Import the module(s) for your data source. Spring Auto Configuration will then load the required configuration classes.
If you are not using Spring Auto Configuration, you can manually load the configuration classes.
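For example, if you are wiring the configuration manually, a minimal sketch of a Spring @Configuration class importing the Local CSV classes could look like this (DlcImportConfig is a hypothetical application class; substitute the configuration classes listed below for your source):
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Import;

@Configuration
@Import({
    DataLoadControllerConfig.class,
    LocalCsvConnectorConfig.class
})
public class DlcImportConfig {
    // The imported configuration classes provide the DLC and Local CSV connector beans.
}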
Local CSV
Module Dependency
<dependency>
<groupId>com.activeviam.io</groupId>
<artifactId>data-load-controller</artifactId>
<version>${data.connectors.version}</version>
</dependency>
Spring Auto Configuration Classes
DataLoadControllerConfig.class,
LocalCsvConnectorConfig.class
JDBC
Module Dependency
<dependency>
<groupId>com.activeviam.io</groupId>
<artifactId>data-load-controller</artifactId>
<version>${data.connectors.version}</version>
</dependency>
<!-- ActivePivot Core JDBC Dependency -->
<dependency>
<groupId>com.activeviam.source</groupId>
<artifactId>jdbc-source</artifactId>
<version>${atoti.version}</version>
</dependency>
Spring Auto Configuration Classes
DataLoadControllerConfig.class,
JdbcConnectorConfig.class
Tuple
Module Dependency
<dependency>
<groupId>com.activeviam.io</groupId>
<artifactId>data-load-controller</artifactId>
<version>${data.connectors.version}</version>
</dependency>
Spring Auto Configuration Classes
DataLoadControllerConfig.class,
TupleConnectorConfig.class
AWS CSV
Module Dependency
<dependency>
<groupId>com.activeviam.io</groupId>
<artifactId>data-load-controller</artifactId>
<version>${data.connectors.version}</version>
</dependency>
<!-- ActivePivot Core AWS Dependency-->
<dependency>
<groupId>com.activeviam.source</groupId>
<artifactId>cloud-source-aws</artifactId>
<version>${atoti.version}</version>
</dependency>
Spring Auto Configuration Classes
DataLoadControllerConfig.class,
DefaultAwsClientConfig.class,
AwsCsvConnectorConfig.class
Azure CSV
Module Dependency
<dependency>
<groupId>com.activeviam.io</groupId>
<artifactId>data-load-controller</artifactId>
<version>${data.connectors.version}</version>
</dependency>
<!-- ActivePivot Core Azure Dependency-->
<dependency>
<groupId>com.activeviam.source</groupId>
<artifactId>cloud-source-azure</artifactId>
<version>${atoti.version}</version>
</dependency>
Spring Auto Configuration Classes
DataLoadControllerConfig.class,
DefaultAzureClientConfig.class,
AzureCsvConnectorConfig.class
GCP CSV
Module Dependency
<dependency>
<groupId>com.activeviam.io</groupId>
<artifactId>data-load-controller</artifactId>
<version>${data.connectors.version}</version>
</dependency>
<!-- ActivePivot Core GCP Dependency-->
<dependency>
<groupId>com.activeviam.source</groupId>
<artifactId>cloud-source-google</artifactId>
<version>${atoti.version}</version>
</dependency>
Spring Auto Configuration Classes
DataLoadControllerConfig.class,
DefaultGcpClientConfig.class,
GcpCsvConnectorConfig.class
Local Avro
Module Dependency
<dependency>
<groupId>com.activeviam.io</groupId>
<artifactId>data-load-controller</artifactId>
<version>${data.connectors.version}</version>
</dependency>
<!-- Avro Dependency-->
<dependency>
<groupId>org.apache.avro</groupId>
<artifactId>avro</artifactId>
<version>${avro.version}</version>
</dependency>
Spring Auto Configuration Classes
DataLoadControllerConfig.class,
LocalAvroConnectorConfig.class
Local Parquet
Module Dependency
<dependency>
<groupId>com.activeviam.io</groupId>
<artifactId>data-load-controller</artifactId>
<version>${data.connectors.version}</version>
</dependency>
<!-- ActivePivot Core Parquet Dependency-->
<dependency>
<groupId>com.activeviam</groupId>
<artifactId>parquet-source</artifactId>
<version>${atoti.version}</version>
</dependency>
Spring Auto Configuration Classes
DataLoadControllerConfig.class,
LocalParquetConnectorConfig.class
Kafka
Module Dependency
<dependency>
<groupId>com.activeviam.io</groupId>
<artifactId>data-load-controller</artifactId>
<version>${data.connectors.version}</version>
</dependency>
<!-- Kafka Dependencies -->
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
<version>${kafka.version}</version>
</dependency>
Spring Auto Configuration Classes
DataLoadControllerConfig.class,
KafkaSourceConfig.class
Configuration
The DLC can be configured in Java via Spring Beans.
Most settings can also be configured via Spring Boot's Externalized Configuration, for example using application.yml.
Sources
The first thing to configure is a source, which is a location where data is loaded from.
This is also the minimum configuration for loading data into an Atoti Server.
YAML Configuration
dlc:
csv:
local:
sources:
localCsvSource:
root-base-dir: 'data'
topics-to-include:
- topic1
- topic2
Java Configuration
@Bean
LocalCsvSourceDescription source() {
return LocalCsvSourceDescription.builder("localCsvSource", "data").build();
}
For more information on configuration, see Local CSV Source.
YAML Configuration
dlc:
jdbc:
sources:
jdbcSource1:
url: jdbc:mysql://localhost:3306/mydb
username: user
password: password
driver-class: com.mysql.cj.jdbc.Driver
topics-to-include:
- topic1
- topic2
Java Configuration
@Bean
JdbcSourceDescription jdbcSource() {
return JdbcSourceDescription.builder(
"jdbcSource1",
"jdbc:mysql://localhost:3306/mydb",
"user",
"password",
"org.h2.Driver")
.build();
}
For more information on configuration, see JDBC Source.
note
No configuration is required as the DLC provides a default TupleSource.
For more information on configuration, see Tuple Source.
YAML Configuration
dlc:
csv:
aws:
sources:
awsCsvSource:
bucket: bucket
prefix: dataDir
Java Configuration
@Bean
AwsCsvSourceDescription source() {
return AwsCsvSourceDescription.builder(
"awsCsvSource",
"bucket",
"dataDir")
.build();
}
For more information on configuration, see AWS CSV Source.
YAML Configuration
dlc:
csv:
azure:
sources:
azureCsvSource:
container: container
prefix: dataDir
Java Configuration
@Bean
AzureCsvSourceDescription azureCsvSource() {
return AzureCsvSourceDescription.builder(
"azureCsvSource",
"container",
"dataDir")
.build();
}
For more information on configuration, see Azure CSV Source.
YAML Configuration
dlc:
csv:
gcp:
sources:
gcpCsvSource:
bucket: bucket
prefix: dataDir
Java Configuration
@Bean
GcpCsvSourceDescription gcpCsvSource() {
return GcpCsvSourceDescription.builder(
"gcpCsvSource",
"bucket",
"dataDir")
.build();
}
For more information on configuration, see GCP CSV Source.
YAML Configuration
dlc:
avro:
local:
sources:
localAvro:
root-base-dir: 'data'
topics-to-include:
- topic1
- topic2
Java Configuration
@Bean
LocalAvroSourceDescription source() {
return LocalAvroSourceDescription.builder("localAvro", "data").build();
}
For more information on configuration, see Local Avro Source.
YAML Configuration
dlc:
parquet:
local:
sources:
localParquetSource:
root-base-dir: 'data'
topics-to-include:
- topic1
- topic2
Java Configuration
@Bean
LocalParquetSourceDescription source() {
return LocalParquetSourceDescription.builder("localParquetSource", "data").build();
}
For more information on configuration, see Local Parquet Source.
YAML Configuration
dlc:
messaging:
kafka:
sources:
MyKafkaSource:
topics-to-exclude:
- Topic1
- Topic2
topics-to-include:
- Topic3
accepts-topic-overrides: true
Java Configuration
@Bean
KafkaSourceDescription source(){
return KafkaSourceDescription.builder("KafkaSource")
.topicsToExclude(Set.of("Topic1", "Topic2"))
.topicsToInclude(Set.of("Topic3"))
.acceptsTopicOverrides(true)
.build();
}
For more information on configuration, see Kafka Source.
Topics
A topic describes a specific collection of columnar data for loading into an Atoti Server.
A topic includes the source format as well as a target into which the data is loaded; in a minimal configuration, some of this is implicit.
The DLC can also create implicit topics for CSV.
The following configuration examples configure a topic called “Trades”, which loads files that match the file pattern trades*.csv.
This configuration does not specify a target for the data or the data format.
note
If you do not name your topic the same as your store, you will have to explicitly specify the target in the channel of the topic.
- The target is implicitly the Atoti table named “Trades”.
- The format (column order and how input fields are parsed) implicitly comes from the columns and types of the targeted Atoti table, in this case “Trades”.
YAML Configuration
dlc:
csv:
topics:
Trades:
file-pattern: 'glob:trades*.csv'
Java Configuration
@Bean
public CsvTopicDescription trades() {
return CsvTopicDescription.builder("Trades", "glob:trades*.csv")
.build();
}
For more information on configuration, see CSV Topic.
The following configuration examples configure a topic called “Trades”, which uses the query SELECT * FROM table1.
This configuration does not specify a target for the data or the data format.
note
If you do not name your topic the same as your store, you will have to explicitly specify the target in the channel of the topic.
- The target is implicitly the Atoti table named “Trades”.
- The format (column order in the result of the SQL query and how input fields are parsed) implicitly comes from the columns and types of the targeted Atoti table, in this case “Trades”.
YAML Configuration
dlc:
jdbc:
topics:
Trades:
sql: 'SELECT * FROM table1'
Java Configuration
@Bean
public JdbcTopicDescription trades() {
return JdbcTopicDescription.builder("Trades", "SELECT * FROM table1")
.build();
}
For more information on configuration, see JDBC Topic.
This configuration specifies a tuple generator that produces tuples (arrays of objects) matching the store format.
This configuration does not specify a target for the data or the data format.
note
If you do not name your topic the same as your store, you will have to explicitly specify the target in the channel of the topic.
- The target is implicitly the Atoti table named “Trades”.
- The format (column order of the tuples and how input fields are parsed) implicitly comes from the columns and types of the targeted Atoti table, in this case “Trades”.
YAML Configuration
You cannot define a tuple topic in YAML.
Java Configuration
@Bean
public TupleTopicDescription tupleTopic() {
return TupleTopicDescription.builder("Trades", this::generateTuples)
.build();
}
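The generateTuples method referenced above is application code, not part of the DLC. Here is a minimal sketch, assuming the generator supplies Object[] rows whose column order matches the “Trades” store (the exact functional interface expected by the builder, and the column values shown, are illustrative assumptions):
private List<Object[]> generateTuples() {
    // Each Object[] is one tuple; columns must follow the field order of the "Trades" store.
    List<Object[]> tuples = new ArrayList<>();
    tuples.add(new Object[] {"TRADE-001", LocalDate.of(2024, 12, 12), 1_000_000d});
    tuples.add(new Object[] {"TRADE-002", LocalDate.of(2024, 12, 12), 250_000d});
    return tuples;
}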
For more information on configuration, see Tuple Topic.
Topics for the AWS, Azure, and GCP CSV sources are configured in exactly the same way as the CSV topic shown above. For more information on configuration, see CSV Topic.
The following configuration examples configure a topic called “Trades”, which loads files that match the file pattern trades*.avro.
This configuration does not specify a target for the data or the data format.
note
If you do not name your topic the same as your store, you will have to explicitly specify the target in the channel of the topic.
- The target is implicitly the Atoti table named “Trades”.
- The format comes from the schema file specified in the configuration.
YAML Configuration
dlc:
avro:
topics:
Trades:
file-pattern: glob:trades*.avro
schema-path: schemas/schema.avsc
Java Configuration
@Bean
public AvroTopicDescription trades() {
return AvroTopicDescription.builder("Trades", "glob:trades*.avro", "schemas/schema.avsc")
.build();
}
For more information on configuration, see Avro Topic.
The following configuration examples configure a topic called “Trades”, which loads files that match the file pattern trades*.parquet.
This configuration does not specify a target for the data or the data format.
note
If you do not name your topic the same as your store, you will have to explicitly specify the target in the channel of the topic.
- The target is implicitly the Atoti table named “Trades”.
- The format implicitly comes from the columns of the Atoti table.
YAML Configuration
dlc:
parquet:
topics:
Trades:
file-pattern: glob:trades*.parquet
Java Configuration
@Bean
public ParquetTopicDescription trades() {
return ParquetTopicDescription.builder("Trades", "glob:trades*.parquet")
.build();
}
For more information on configuration, see Parquet Topic.
The following configuration examples configure a topic called “MyTradeTopic”, which will listen for messages from a Kafka producer located on the same host machine at port 9092.
Kafka Consumer Configuration
The KafkaConsumer requires some configuration properties via a Properties object. These properties are specified in the messagingConsumerProperties and are passed directly to the KafkaConsumer.
The following configuration examples configure some of the required properties. Please be aware that your Kafka producer may require additional configuration properties in order to connect.
YAML Configuration
We can configure just the minimal properties and allow the Data Load Controller to infer the rest from the Datastore.
dlc:
messaging:
kafka:
topics:
MyTradeTopic:
messaging-broker-topic: Kafka-Source-Trades
messaging-consumer-properties:
bootstrap:
servers: localhost:9092
key:
deserializer: com.activeviam.io.dlc.impl.description.source.messaging.StringTupleDeserializer
value:
deserializer: com.activeviam.io.dlc.impl.description.source.messaging.StringTupleDeserializer
group:
id: my-consumer-group-1
auto:
offset:
reset: earliest
Java Configuration
@Bean
KafkaTopicDescription kafkaTopicDescription() {
return KafkaTopicDescription.builder("MyTradeTopic")
.messagingBrokerTopic("Kafka-Source-Trades")
.messagingConsumerProperties(messagingConsumerProperties())
.build();
}
@Bean
Properties messagingConsumerProperties(){
var messagingConsumerProperties = new Properties();
messagingConsumerProperties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
messagingConsumerProperties.put(PROP_POLL_TIMEOUT_MS, "50");
messagingConsumerProperties.put("key.deserializer", StringTupleDeserializer.class.getName());
messagingConsumerProperties.put("value.deserializer", StringTupleDeserializer.class.getName());
messagingConsumerProperties.put("group.id", "test-consumer-group");
messagingConsumerProperties.put("auto.offset.reset", "earliest");
return messagingConsumerProperties;
}
For more information on configuration and additional properties, see Kafka Topic.
Aliases
Within the DLC you can define aliases for topics. This allows you to group multiple topics for loading and unloading.
YAML Configuration
dlc:
aliases:
alias:
- Trades
- sensitivities
Java Configuration
@Bean
AliasesDescription aliases() {
var aliases = new AliasesDescription();
aliases.put("alias", Set.of("Trades", "sensitivities"));
return aliases;
}
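An alias can then be used anywhere a request accepts a topic name. As a sketch, using the DlcLoadRequest API shown later in this guide, a single request can load every topic grouped under the alias:
// Loads all topics grouped under the "alias" alias in one request.
DlcLoadRequest.builder()
    .topics("alias")
    .build();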
For more information on configuration, see Aliases.
DLC requests
Loading and unloading are initiated by sending requests to the DLC.
The DLC provides APIs for Java requests as well as REST requests.
Requests contain an operation and a list of topics or aliases.
Operations
The DLC comes with the following operations:
- LOAD - Loads data into the datastore.
- UNLOAD - Unloads data from the datastore.
- START_LISTEN - Starts listening for files, triggering a load operation when a file is detected.
- LISTEN_STATUS - Gets the status of a listener.
- STOP_LISTEN - Stops listening for files.
For more information, see DLC Operations.
Request
A request contains an operation and a list of topics or aliases.
Additionally, a scope can be provided to a DLC request.
The request can also override existing topic configurations; see Request Time Overrides below.
Example: Load a topic
Given the above Local CSV configuration of the localCsvSource source and the “Trades” topic, the following request loads the data/trades1.csv and data/trades2.csv files into the Trades store.
POST https://<hostname>:<port>/<app-context>/connectors/rest/dlc/v2
Content-Type: application/json
{
"operation": "LOAD",
"topics": [ "Trades" ]
}
DlcLoadRequest.builder()
.topics("Trades")
.build();
Example: Start Listening on a topic
Given the above Local CSV configuration of the localCsvSource source and the “Trades” topic, the following request starts listening for file changes on the data/trades1.csv and data/trades2.csv files and loads any changes into the Trades store.
POST https://<hostname>:<port>/<app-context>/connectors/rest/dlc/v2
Content-Type: application/json
{
"operation": "START_LISTEN",
"topics": [ "Trades" ]
}
DlcStartListenRequest.builder()
.listenId("Trades")
.build();
Example: Stop Listening
In the above example we started listening on a topic. This operation returns a map (String to String) of topic -> listenId. We can cancel a listener by its listenId, which is a unique hex string.
For the examples below, we will say that our “Trades” topic has a listenId of 5F38A0.
POST https://<hostname>:<port>/<app-context>/connectors/rest/dlc/v2
Content-Type: application/json
{
"operation": "STOP_LISTEN",
"topics": [ "5F38A0" ]
}
DlcStopListenRequest.builder()
.listenId("5F38A0")
.build();
Example: Unload a Store
Data can be unloaded from the application on a store-by-store basis. The default implicit unload topics map the provided scope to store fields and match the provided values.
The following request unloads all facts from the Trades store where the AsOfDate field is equal to 2024-12-12.
POST https://<hostname>:<port>/<app-context>/connectors/rest/dlc/v2
Content-Type: application/json
{
"operation": "UNLOAD",
"topics": [ "Trades" ]
"scope": {
"AsOfDate": "2024-12-12"
}
}
DlcUnloadRequest.builder()
.topics("Trades")
.scope(DlcScope.of("AsOfDate", LocalDate.of(2024, 12, 12)))
.build();
Example: Initial Load
This is an example configuration class which configures data loading on application startup.
@Configuration
public class InitialDataLoadConfig {
@Bean
public ApplicationRunner initialConfigDataLoad(IDataLoadControllerService dlc) {
return args -> {
dlc.execute(
DlcLoadRequest.builder()
.topics("Trades")
.build()
);
};
}
}
Load into a Branch
The DLC provides the ability to load directly into a specific branch. This can be done by specifying a branch in the DlcLoadRequest.
The DLC opens a transaction on the specified branch through the transaction manager. Atoti will automatically create the CustomBranchToLoadInto branch if it does not exist:
note
The DLC does not manage deletion of branches.
var properties = new Properties();
properties.setProperty(ITransactionManager.BRANCH, "CustomBranchToLoadInto");
datastore.getTransactionManager().startTransaction(properties);
POST https://<hostname>:<port>/<app-context>/connectors/rest/dlc/v2
Content-Type: application/json
{
"operation": "LOAD",
"topics": [ "Trades" ],
"branch": "CustomBranchToLoadInto"
}
DlcLoadRequest.builder()
.topics(Set.of("Trades"))
.branch("CustomBranchToLoadInto")
.build();
Request Time Overrides
We can override parts of a Topic’s configuration at request time by defining csvTopicOverrides and/or jdbcTopicOverrides in the request. The topic properties will override the existing topic. It is also possible to define an entirely new topic at request time by providing a topic name that does not already exist and does not match any datastore name.
All Topic Overrides will only exist for the duration of the request and will not be persisted.
A source can be configured to allow or disallow the use of Topic Overrides.
Given the above “Trades” topic, we can override it at request time to specify a new file pattern to use. The following request will load the data/alternative_trades1.csv and data/alternative_trades2.csv files into the Trades store.
POST https://<hostname>:<port>/<app-context>/connectors/rest/dlc/v2
Content-Type: application/json
{
"operation": "LOAD",
"csvTopicOverrides": {
"newTopic": {
"filePattern": "glob:alternative_trades*.csv"
}
}
}
DlcLoadRequest.builder()
.topicDescriptions(Set.of(
CsvTopicDescription.builder("Trades", "glob:alternative_trades*.csv")
.build()
))
.build();
Example: New Load Topic
Here we will define a new load topic that loads a file into the Trades store. Since we are not overriding an existing topic, we need to define all aspects of the topic. We can re-use targets and other pre-defined components in a new topic.
The name we provide for the topic does not matter and will not persist.
POST https://<hostname>:<port>/<app-context>/connectors/rest/dlc/v2
Content-Type: application/json
{
"operation": "LOAD",
"csvTopicOverrides": {
"newTopic": {
"filePattern": "glob:alternative_trades*.csv",
"channels": [
{
"targetName": "Trades"
}
]
}
}
}
DlcLoadRequest.builder()
.topicOverrides(Set.of(
CsvTopicDescription.builder("NewTopic", "glob:alternative_trades*.csv")
.channel(ChannelDescription.builder(
AnonymousTargetDescription.of(
"Trades",
scope -> new TuplePublisher<>(datastore, "Trades")
)
).build()
)
.build()
))
.build();
Next Steps
The DLC offers more configurations than are covered in this guide. To learn more, read the following sections:
- Channels - for connecting topics to targets and column calculators
- ParserOverrides - for overriding the default parser behavior
- Custom Fields - for enriching data during loading
- Targets - for defining where data is loaded to and using tuple publishers
- Custom Topic Ordering - for enforcing the order in which certain topics are processed