Getting Started

Supported sources

The DLC supports loading data from multiple sources into an Atoti datastore.

Source          Where data is loaded from
Local CSV       CSV files stored locally.
JDBC            Databases accessed via the JDBC API.
Tuple           Tuples generated by Java code.
AWS CSV         CSV files stored on Amazon S3.
Azure CSV       CSV files stored on Azure Blob Storage.
GCP CSV         CSV files stored on Google Cloud Storage.
Local Avro      Avro files stored locally.
Local Parquet   Parquet files stored locally.
Kafka           Messages from a Kafka producer.

Imports

Import the module(s) for your data source. Spring Auto Configuration will then load the required configuration classes.

If you are not using Spring Auto Configuration, you can manually load the configuration classes.
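
For example, if you are configuring the Local CSV source without Spring Auto Configuration, the classes listed below can be imported directly into one of your Spring configuration classes. This is a minimal sketch (the class name DlcConfig is only illustrative):

@Configuration
@Import({
    DataLoadControllerConfig.class,
    LocalCsvConnectorConfig.class
})
public class DlcConfig {
}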

Local CSV
Module Dependency
<dependency>
    <groupId>com.activeviam.io</groupId>
    <artifactId>data-load-controller</artifactId>
    <version>${data.connectors.version}</version>
</dependency>
Spring Auto Configuration Classes
DataLoadControllerConfig.class,
LocalCsvConnectorConfig.class

JDBC
Module Dependency
<dependency>
    <groupId>com.activeviam.io</groupId>
    <artifactId>data-load-controller</artifactId>
    <version>${data.connectors.version}</version>
</dependency>
<!-- ActivePivot Core JDBC Dependency -->
<dependency>
    <groupId>com.activeviam.source</groupId>
    <artifactId>jdbc-source</artifactId>
    <version>${atoti.version}</version>
</dependency>
Spring Auto Configuration Classes
DataLoadControllerConfig.class,
JdbcConnectorConfig.class

Tuple
Module Dependency
<dependency>
    <groupId>com.activeviam.io</groupId>
    <artifactId>data-load-controller</artifactId>
    <version>${data.connectors.version}</version>
</dependency>
Spring Auto Configuration Classes
DataLoadControllerConfig.class,
TupleConnectorConfig.class

AWS CSV
Module Dependency
<dependency>
    <groupId>com.activeviam.io</groupId>
    <artifactId>data-load-controller</artifactId>
    <version>${data.connectors.version}</version>
</dependency>
<!-- ActivePivot Core AWS Dependency-->
<dependency>
    <groupId>com.activeviam.source</groupId>
    <artifactId>cloud-source-aws</artifactId>
    <version>${atoti.version}</version>
</dependency>
Spring Auto Configuration Classes
DataLoadControllerConfig.class,
DefaultAwsClientConfig.class,
AwsCsvConnectorConfig.class

Azure CSV
Module Dependency
<dependency>
    <groupId>com.activeviam.io</groupId>
    <artifactId>data-load-controller</artifactId>
    <version>${data.connectors.version}</version>
</dependency>
<!-- ActivePivot Core Azure Dependency-->
<dependency>
    <groupId>com.activeviam.source</groupId>
    <artifactId>cloud-source-azure</artifactId>
    <version>${atoti.version}</version>
</dependency>
Spring Auto Configuration Classes
DataLoadControllerConfig.class,
DefaultAzureClientConfig.class,
AzureCsvConnectorConfig.class

GCP CSV
Module Dependency
<dependency>
    <groupId>com.activeviam.io</groupId>
    <artifactId>data-load-controller</artifactId>
    <version>${data.connectors.version}</version>
</dependency>
<!-- ActivePivot Core GCP Dependency-->
<dependency>
    <groupId>com.activeviam.source</groupId>
    <artifactId>cloud-source-google</artifactId>
    <version>${atoti.version}</version>
</dependency>
Spring Auto Configuration Classes
DataLoadControllerConfig.class,
DefaultGcpClientConfig.class,
GcpCsvConnectorConfig.class

Local Avro
Module Dependency
<dependency>
    <groupId>com.activeviam.io</groupId>
    <artifactId>data-load-controller</artifactId>
    <version>${data.connectors.version}</version>
</dependency>
<!-- Avro Dependency-->
<dependency>
    <groupId>org.apache.avro</groupId>
    <artifactId>avro</artifactId>
    <version>${avro.version}</version>
</dependency>
Spring Auto Configuration Classes
DataLoadControllerConfig.class,
LocalAvroConnectorConfig.class

Local Parquet
Module Dependency
<dependency>
    <groupId>com.activeviam.io</groupId>
    <artifactId>data-load-controller</artifactId>
    <version>${data.connectors.version}</version>
</dependency>
<!-- ActivePivot Core Parquet Dependency-->
<dependency>
    <groupId>com.activeviam</groupId>
    <artifactId>parquet-source</artifactId>
    <version>${atoti.version}</version>
</dependency>
Spring Auto Configuration Classes
DataLoadControllerConfig.class,
LocalParquetConnectorConfig.class

Kafka
Module Dependency
<dependency>
    <groupId>com.activeviam.io</groupId>
    <artifactId>data-load-controller</artifactId>
    <version>${data.connectors.version}</version>
</dependency>
<!-- Kafka Dependencies -->
<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
    <version>${kafka.version}</version>
</dependency>
Spring Auto Configuration Classes
DataLoadControllerConfig.class,
KafkaSourceConfig.class

Configuration

The DLC can be configured in Java via Spring Beans.

Most of the configuration can also be provided through Spring Boot’s Externalized Configuration, for example using application.yml.

Sources

The first thing to configure is a source, which is a location where data is loaded from.

This is also the minimum configuration for loading data into an Atoti Server.

Local CSV
YAML Configuration
dlc:
  csv:
    local:
      sources:
        localCsvSource:
          root-base-dir: 'data'
          topics-to-include:
            - topic1
            - topic2
Java Configuration
@Bean
LocalCsvSourceDescription source() {
    return LocalCsvSourceDescription.builder("localCsvSource", "data").build();
}

For more information on configuration, see Local CSV Source.

JDBC
YAML Configuration
dlc:
  jdbc:
    sources:
      jdbcSource1:
        url: jdbc:mysql://localhost:3306/mydb
        username: user
        password: password
        driver-class: org.h2.Driver
        topics-to-include:
          - topic1
          - topic2
Java Configuration
@Bean
JdbcSourceDescription jdbcSource() {
    return JdbcSourceDescription.builder(
                    "jdbcSource1",
                    "jdbc:mysql://localhost:3306/mydb",
                    "user",
                    "password",
                    "org.h2.Driver")
            .build();
}

For more information on configuration, see JDBC Source.

Tuple

note

No configuration is required as the DLC provides a default TupleSource.

For more information on configuration, see Tuple Source.

AWS CSV
YAML Configuration
dlc:
  csv:
    aws:
      sources:
        awsCsvSource:
          bucket: bucket
          prefix: dataDir
Java Configuration
@Bean
AwsCsvSourceDescription source() {
    return AwsCsvSourceDescription.builder(
                    "awsCsvSource",
                    "bucket",
                    "dataDir")
            .build();
}

For more information on configuration, see AWS CSV Source.

Azure CSV
YAML Configuration
dlc:
  csv:
    azure:
      sources:
        azureCsvSource:
          container: container
          prefix: dataDir
Java Configuration
@Bean
AzureCsvSourceDescription azureCsvSource() {
    return AzureCsvSourceDescription.builder(
                    "azureCsvSource",
                    "container",
                    "dataDir")
            .build();
}

For more information on configuration, see Azure CSV Source.

GCP CSV
YAML Configuration
dlc:
  csv:
    gcp:
      sources:
        gcpCsvSource:
          bucket: bucket
          prefix: dataDir
Java Configuration
@Bean
GcpCsvSourceDescription gcpCsvSource() {
    return GcpCsvSourceDescription.builder(
                    "gcpCsvSource",
                    "bucket",
                    "dataDir")
            .build();
}

For more information on configuration, see GCP CSV Source.

Local Avro
YAML Configuration
dlc:
  avro:
    local:
      sources:
        localAvro:
          root-base-dir: 'data'
          topics-to-include:
            - topic1
            - topic2
Java Configuration
@Bean
LocalAvroSourceDescription source() {
    return LocalAvroSourceDescription.builder("localAvro", "data").build();
}

For more information on configuration, see Local Avro Source.

Local Parquet
YAML Configuration
dlc:
  parquet:
    local:
      sources:
        localParquetSource:
          root-base-dir: 'data'
          topics-to-include:
            - topic1
            - topic2
Java Configuration
@Bean
LocalParquetSourceDescription source() {
    return LocalParquetSourceDescription.builder("localParquetSource", "data").build();
}

For more information on configuration, see Local Parquet Source.

Kafka
YAML Configuration
dlc:
  messaging:
    kafka:
      sources:
        MyKafkaSource:
          topics-to-exclude:
            - Topic1
            - Topic2
          topics-to-include:
            - Topic3
          accepts-topic-overrides: true
Java Configuration
@Bean
KafkaSourceDescription source(){
	return KafkaSourceDescription.builder("KafkaSource")
			.topicsToExclude(Set.of("Topic1", "Topic2"))
			.topicsToInclude(Set.of("Topic3"))
			.acceptsTopicOverrides(true)
			.build();
}

For more information on configuration, see Kafka Source.

Topics

A topic describes a specific collection of columnar data for loading into an Atoti Server.

A topic includes the source format as well as a target for the data to be loaded into; in a minimal configuration, some of this is implicit.

The DLC can also create implicit topics for CSV.

CSV
The following configuration examples configure a topic called “Trades”, which loads files that match the file pattern trades*.csv.

This configuration does not specify a target for the data or the data format.

note

If you do not name your topic the same as your store, you must explicitly specify the target in the channel of the topic (see the sketch after the Java configuration below).

  • The target is implicitly the Atoti table named “Trades”.
  • The format (column order and how input fields are parsed) implicitly comes from the columns and types of the targeted Atoti table, in this case “Trades”.
YAML Configuration
dlc:
  csv:
    topics:
      Trades:
        file-pattern: 'glob:trades*.csv'
Java Configuration
@Bean
public CsvTopicDescription trades() {
    return CsvTopicDescription.builder("Trades", "glob:trades*.csv")
            .build();
}
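
As the note above mentions, a topic named differently from the target store must declare its target explicitly on a channel. A hypothetical sketch, assuming the channel is declared with a target-name property (mirroring the targetName field used in the request-override examples later in this guide):

dlc:
  csv:
    topics:
      TradeFiles:
        file-pattern: 'glob:trades*.csv'
        channels:
          - target-name: 'Trades'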

For more information on configuration, see CSV Topic.

JDBC
The following configuration examples configure a topic called “Trades”, which uses the query SELECT * FROM table1.

This configuration does not specify a target for the data or the data format.

note

If you do not name your topic the same as your store, you must explicitly specify the target in the channel of the topic.

  • The target is implicitly the Atoti table named “Trades”.
  • The format (column order in the result of the SQL query and how input fields are parsed) implicitly comes from the columns and types of the targeted Atoti table, in this case “Trades”.
YAML Configuration
dlc:
  jdbc:
    topics:
      Trades:
        sql: 'SELECT * FROM table1'
Java Configuration
@Bean
public JdbcTopicDescription trades() {
    return JdbcTopicDescription.builder("Trades", "SELECT * FROM table1")
            .build();
}

For more information on configuration, see JDBC Topic.

Tuple
This configuration specifies a tuple generator, which produces tuples (arrays of objects) that match the store format.

This configuration does not specify a target for the data or the data format.

note

If you do not name your topic the same as your store, you must explicitly specify the target in the channel of the topic.

  • The target is implicitly the Atoti table named “Trades”.
  • The format (column order of the tuples and how input fields are parsed) implicitly comes from the columns and types of the targeted Atoti table, in this case “Trades”.
YAML Configuration

You can not define a tuple topic in Yaml.

Java Configuration
@Bean
public TupleTopicDescription tupleTopic() {
    return TupleTopicDescription.builder("Trades", this::generateTuples)
            .build();
}
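
The generateTuples method referenced above is user code. A minimal sketch, assuming the generator returns rows as Object[] arrays whose column order matches the “Trades” table (the column values shown are purely illustrative):

private List<Object[]> generateTuples() {
    // One Object[] per row, in the column order of the "Trades" table.
    return List.of(
            new Object[] { "T1", LocalDate.of(2024, 12, 12), 1_000_000d },
            new Object[] { "T2", LocalDate.of(2024, 12, 12), 2_500_000d });
}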

For more information on configuration, see Tuple Topic.

Avro
The following configuration examples configure a topic called “Trades”, which loads files that match the file pattern trades*.avro.

This configuration does not specify a target for the data or the data format.

note

If you do not name your topic the same as your store, you must explicitly specify the target in the channel of the topic.

  • The target is implicitly the Atoti table named “Trades”.
  • The format comes from the schema file specified in the configuration.
YAML Configuration
dlc:
  avro:
    topics:
      Trades:
        file-pattern: glob:trades*.avro
        schema-path: schemas/schema.avsc
Java Configuration
@Bean
public AvroTopicDescription trades() {
    return AvroTopicDescription.builder("Trades", "glob:trades*.avro", "schemas/schema.avsc")
            .build();
}
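
For illustration, schemas/schema.avsc is a standard Avro schema file. A minimal hypothetical schema for trade records could look like this (the record and field names are assumptions, not part of the DLC):

{
  "type": "record",
  "name": "Trade",
  "fields": [
    { "name": "TradeId", "type": "string" },
    { "name": "AsOfDate", "type": "string" },
    { "name": "Notional", "type": "double" }
  ]
}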

For more information on configuration, see Avro Topic.

Parquet
The following configuration examples configure a topic called “Trades”, which loads files that match the file pattern trades*.parquet.

This configuration does not specify a target for the data or the data format.

note

If you do not name your topic the same as your store, you must explicitly specify the target in the channel of the topic.

  • The target is implicitly the Atoti table named “Trades”.
  • The format implicitly comes from the columns of the Atoti table.
YAML Configuration
dlc:
  parquet:
    topics:
      Trades:
        file-pattern: glob:trades*.parquet
Java Configuration
@Bean
public ParquetTopicDescription trades() {
    return ParquetTopicDescription.builder("Trades", "glob:trades*.parquet")
            .build();
}

For more information on configuration, see Parquet Topic.

Kafka
The following configuration examples configure a topic called “MyTradeTopic”, which listens for messages from a Kafka producer located on the same host machine on port 9092.

Kafka Consumer Configuration

The KafkaConsumer requires configuration properties supplied via a Properties object. These properties are specified in messagingConsumerProperties and passed directly on to the KafkaConsumer.

The following configuration examples configure some of the required properties. Please be aware that your Kafka producer may require additional configuration properties in order to connect.

YAML Configuration

We can configure just the minimal properties and allow the Data Load Controller to infer the rest from the Datastore.

dlc:
  messaging:
    kafka:
      topics:
        MyTradeTopic:
          messaging-broker-topic: Kafka-Source-Trades
          messaging-consumer-properties:
            bootstrap:
              servers: localhost:9092
            key:
              deserializer: com.activeviam.io.dlc.impl.description.source.messaging.StringTupleDeserializer
            value:
              deserializer: com.activeviam.io.dlc.impl.description.source.messaging.StringTupleDeserializer
            group:
              id: my-consumer-group-1
            auto:
              offset:
                reset: earliest
Java Configuration
@Bean
KafkaTopicDescription kafkaTopicDescription() {
    return KafkaTopicDescription.builder("MyTradeTopic")
            .messagingBrokerTopic("Kafka-Source-Trades")
            .messagingConsumerProperties(messagingConsumerProperties())
            .build();
}

@Bean
Properties messagingConsumerProperties() {
    var messagingConsumerProperties = new Properties();
    messagingConsumerProperties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
    messagingConsumerProperties.put(PROP_POLL_TIMEOUT_MS, "50");
    messagingConsumerProperties.put("key.deserializer", StringTupleDeserializer.class.getName());
    messagingConsumerProperties.put("value.deserializer", StringTupleDeserializer.class.getName());
    messagingConsumerProperties.put("group.id", "my-consumer-group-1");
    messagingConsumerProperties.put("auto.offset.reset", "earliest");
    return messagingConsumerProperties;
}

For more information on configuration and additional properties, see Kafka Topic.

Aliases

Within the DLC you can define aliases for topics. This allows you to group multiple topics for loading and unloading.

YAML Configuration
dlc:
  aliases:
    alias:
      - Trades
      - sensitivities
Java Configuration
@Bean
AliasesDescription aliases() {
    var aliases = new AliasesDescription();
    aliases.put("alias", Set.of("Trades", "sensitivities"));
    return aliases;
}
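
An alias can then be referenced in DLC requests wherever topics are accepted; for example, assuming the alias name is simply passed in the topics list (see the DLC requests section below):

DlcLoadRequest.builder()
    .topics("alias")
    .build();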

For more information on configuration, see Aliases.

DLC requests

Loading and unloading is initiated by sending requests to the DLC.

The DLC provides APIs for Java requests as well as REST requests.

Requests contain an operation and a list of topics or aliases.

Operations

The DLC comes with the following operations:

  • LOAD - Loads data into the datastore.
  • UNLOAD - Unloads data from the datastore.
  • START_LISTEN - Starts listening for files, triggering a load operation when a file is detected.
  • LISTEN_STATUS - Gets the status of a listener.
  • STOP_LISTEN - Stops listening for files.

For more information, see DLC Operations.

Request

A request contains an operation and a list of topics or aliases.

Additionally, a scope can be provided to a DLC request.

The request can also override existing topic configurations; see Request Time Overrides below.

Example: Load a topic

Given the above Local CSV configuration of the localCsvSource source and the Trades topic, the following request loads the data/trades1.csv and data/trades2.csv files into the Trades store.

POST https://<hostname>:<port>/<app-context>/connectors/rest/dlc/v2
Content-Type: application/json

{
  "operation": "LOAD",
  "topics": [ "Trades" ]
}
DlcLoadRequest.builder()
	.topics("Trades")
	.build();

Example: Start Listening on a topic

Given the above Local CSV configuration of the localCsvSource source and the Trades topic, the following request starts listening for file changes on the data/trades1.csv and data/trades2.csv files and loads any changes into the Trades store.

POST https://<hostname>:<port>/<app-context>/connectors/rest/dlc/v2
Content-Type: application/json

{
  "operation": "START_LISTEN",
  "topics": [ "Trades" ]
}
DlcStartListenRequest.builder()
		.listenId("Trades")
		.build();

Example: Stop Listening

In the above example we started listening on a topic. This operation returns a map (String to String) of topic -> listenId. We can cancel a listener by its listenId, which is a unique hex string.

For the examples below, we will say that our “Trades” topic has a listenId of 5F38A0.
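
For illustration only, the returned topic-to-listenId map for this example would then associate the topic with its listenId along these lines (the exact response format may differ):

{
  "Trades": "5F38A0"
}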

POST https://<hostname>:<port>/<app-context>/connectors/rest/dlc/v2
Content-Type: application/json

{
  "operation": "STOP_LISTEN",
  "topics": [ "5F38A0" ]
}
DlcStopListenRequest.builder()
		.listenId("5F38A0")
		.build();

Example: Unload a Store

Data can be unloaded from the application on a store-by-store basis. The default implicit unload topics map the provided scope to store fields and match on the provided values.

The following request will unload all facts from the Trades store where the AsOfDate field is equal to 2024-12-12.

POST https://<hostname>:<port>/<app-context>/connectors/rest/dlc/v2
Content-Type: application/json

{
  "operation": "UNLOAD",
  "topics": [ "Trades" ],
  "scope": {
    "AsOfDate": "2024-12-12"
  }
}
DlcUnloadRequest.builder()
    .topics("Trades")
    .scope(DlcScope.of("AsOfDate", LocalDate.of(2024, 12, 12)))
    .build();

Example: Initial Load

This is an example configuration class which configures data loading on application startup.

@Configuration
public class InitialDataLoadConfig {

    @Bean
    public ApplicationRunner initialConfigDataLoad(IDataLoadControllerService dlc) {
        return args -> {
            dlc.execute(
                    DlcLoadRequest.builder()
                            .topics("Trades")
                            .build()
            );
        };
    }
}

Load into a Branch

The DLC provides the ability to load directly into a specific branch. This can be done by specifying a branch in the DlcLoadRequest.

The DLC opens a transaction on the specified branch through the transaction manager. Atoti automatically creates the branch (here, CustomBranchToLoadInto) if it does not exist:

note

The DLC does not manage deletion of branches.

var properties = new Properties();
properties.setProperty(ITransactionManager.BRANCH, "CustomBranchToLoadInto");
datastore.getTransactionManager().startTransaction(properties);
POST https://<hostname>:<port>/<app-context>/connectors/rest/dlc/v2
Content-Type: application/json

{
  "operation": "LOAD",
  "topics": [ "Trades" ],
  "branch": "CustomBranchToLoadInto"
}
DlcLoadRequest.builder()
    .topics(Set.of("Trades"))
    .branch("CustomBranchToLoadInto")
    .build();

Request Time Overrides

We can override parts of a topic’s configuration at request time by defining csvTopicOverrides and/or jdbcTopicOverrides in the request. The properties provided override those of the existing topic. It is also possible to define an entirely new topic at request time by providing a topic name that does not already exist and does not match the name of any store.

All Topic Overrides will only exist for the duration of the request and will not be persisted.

A source can be configured to allow or disallow the use of Topic Overrides.

Given the above Trades topic, we can override it at request time to specify a new file pattern. The following request loads the data/alternative_trades1.csv and data/alternative_trades2.csv files into the Trades store.

POST https://<hostname>:<port>/<app-context>/connectors/rest/dlc/v2
Content-Type: application/json

{
    "operation": "LOAD",
    "csvTopicOverrides": {
        "Trades": {
            "filePattern": "glob:alternative_trades*.csv"
        }
    }
}
DlcLoadRequest.builder()
    .topicDescriptions(Set.of(
            CsvTopicDescription.builder("Trades", "glob:alternative_trades*.csv")
                    .build()
    ))
    .build();

Example: New Load Topic

Here we define a new load topic that loads a file into the Trades store.

Since we are not overriding an existing topic, we need to define all aspects of the topic. We can reuse targets and other pre-defined components in a new topic.

The name we provide for the topic does not matter and will not persist.

POST https://<hostname>:<port>/<app-context>/connectors/rest/dlc/v2
Content-Type: application/json

{
    "operation": "LOAD",
    "csvTopicOverrides": {
        "newTopic": {
            "filePattern": "glob:alternative_trades*.csv",
            "channels": [
                {
                    "targetName": "Trades"
                }
            ]
        }
    }
}
DlcLoadRequest.builder()
		.topicOverrides(Set.of(
				CsvTopicDescription.builder("NewTopic", "glob:alternative_trades*.csv")
						.channel(ChannelDescription.builder(
										AnonymousTargetDescription.of(
												"Trades",
												scope -> new TuplePublisher<>(datastore, "Trades")
										)
								).build()
						)
						.build()
		))
		.build();

Next Steps

The DLC offers more configuration options than are covered in this guide. To learn more, read the following sections:

  • Channels - for connecting topics to targets and column calculators
  • ParserOverrides - for overriding the default parser behavior
  • Custom Fields - for enriching data during loading
  • Targets - for defining where data is loaded to and using tuple publishers
  • Custom Topic Ordering - for enforcing the order in which certain topics are processed