Kafka Topic
Configuration Properties
Properties for a Kafka Messaging Topic are defined by name in the dlc.messaging.kafka.topics namespace and are picked up as configurations for the DLC.
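As a minimal sketch of that namespace (the topic name MyTopic, the broker topic name and the host:port address are placeholders; the full set of keys is described below), a single topic entry could look like:

```yaml
dlc:
  messaging:
    kafka:
      topics:
        # The key under "topics" is the Topic's name.
        MyTopic:
          messaging-broker-topic: my-producer-topic
          # Required; see "Map<String, String> messagingConsumerProperties" below.
          messaging-consumer-properties:
            bootstrap:
              servers: host:port
```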
KafkaTopicProperties
The Kafka Message Topic defines the topic itself along with the configuration of the Message Consumer and the Transaction Controller Task.
Key | Required | Type | Default | Description |
---|---|---|---|---|
name | Y | String | | The name of the Topic. |
messaging-broker-topic | Y | String | | The Kafka Topic the KafkaConsumer should be subscribed to. This will match the Producer's Topic. |
columns | | List<String> | Columns implicitly come from the store which is being published to. | Columns coming from the Kafka Producer. You must specify all column names; the ones that do not correspond to a matching column in the datastore are skipped. |
target-stores | | Set<String> | The Topic's name. | Stores to write tuples into. If null, the Topic's name is assumed to be the target store. |
messaging-topic-config-description | | MessagingTopicConfigProperties | Default MessagingTopicConfigProperties | Performance configurations of this Topic. |
messaging-consumer-properties | Y | Map<String, String> | | The configuration Properties to be passed into the KafkaConsumer. |
tx-controller-task-description | | TxControllerTaskConfigProperties | Default TxControllerTaskConfigProperties | Configuration of the TxControllerTask that this Topic will use. |
channels | | Set<Channel> | A channel is created using the Topic's name as the name of the target. | Channels of this Topic. |
restrict-to-sources | | Set<String> | | Sources to which the topic is restricted. See source to topic matching. |
restrict-from-sources | | Set<String> | | Sources from which the topic is restricted. See source to topic matching. |
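The restrict-to-sources and restrict-from-sources keys are not shown in the YAML Example further down. Assuming source names bind as a plain list of strings, restricting a topic to a specific source might look like the following sketch (MyTradeTopic and MySource are placeholder names):

```yaml
dlc:
  messaging:
    kafka:
      topics:
        MyTradeTopic:
          messaging-broker-topic: Kafka-Source-Trades
          # Only the sources listed here may use this topic (see source to topic matching).
          restrict-to-sources:
            - MySource
          # restrict-from-sources takes the same shape, excluding the listed sources instead.
```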
MessagingTopicConfigProperties
Defines some performance configuration properties for the message consumer.
Key | Required | Type | Default | Description |
---|---|---|---|---|
num-consumers | | Integer | 1 | Number of KafkaConsumers to open. |
publish-queue-capacity | | Integer | 100,000,000 | The maximum size of the BlockingQueue shared between the KafkaMessageConsumer and the TxSizeController. |
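For reference, a sketch of how these settings sit under a topic entry, with the interpretation suggested by the descriptions above (each KafkaConsumer feeds the shared queue, whose capacity bounds how much data can be buffered before it is published):

```yaml
# Under dlc.messaging.kafka.topics.<topic-name>
messaging-topic-config-description:
  num-consumers: 1                  # KafkaConsumers opened for this topic
  publish-queue-capacity: 100000000 # capacity of the BlockingQueue between the KafkaMessageConsumer and the TxSizeController
```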
TxControllerTaskConfigProperties
Defines some performance configuration properties for the Transaction Controller Task (TxControllerTask).
Key | Required | Type | Default | Description |
---|---|---|---|---|
num-publishers | | Integer | 4 | Number of parallel TxSizeController.PublishTasks to run. Each will publish tuples into the IMessageChannel. |
publish-chunk-size | | Integer | 100,000 | Maximum number of records to publish in a single TxSizeController.PublishTask. |
max-tx-drain-size | | Integer | 400,000 | Threshold (in records) to flush and commit a Datastore Transaction. The counter is reset on each Transaction. |
max-tx-drain-time-ms | | Integer | 3000 | Threshold (in milliseconds) to flush and commit a Datastore Transaction. The clock is reset on each Transaction. |
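Reading the two drain thresholds together, a Datastore Transaction appears to be flushed and committed once either max-tx-drain-size records have been drained or max-tx-drain-time-ms milliseconds have elapsed since the last commit, whichever is reached first. With the defaults, that is at most four publish chunks or three seconds per Transaction, as sketched below:

```yaml
# Under dlc.messaging.kafka.topics.<topic-name>
tx-controller-task-description:
  num-publishers: 4          # parallel PublishTasks writing into the IMessageChannel
  publish-chunk-size: 100000 # records published per PublishTask
  max-tx-drain-size: 400000  # commit after ~4 chunks of 100,000 records ...
  max-tx-drain-time-ms: 3000 # ... or after 3 seconds, whichever is reached first
```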
Map<String, String> messagingConsumerProperties
Defines the properties required for the KafkaConsumer. These properties are passed directly to the KafkaConsumer.
The individual properties required may vary depending on your Kafka Producer. Below are a few that may be required:
Key | Required | Type | Default | Description |
---|---|---|---|---|
bootstrap-servers | Y | String | | URL (host:port) of your Kafka broker(s). |
key-deserializer | Y | String | | Classpath of the Deserializer used to parse the keys of the messages from the Kafka Producer. |
value-deserializer | Y | String | | Classpath of the Deserializer used to parse the values of the messages from the Kafka Producer. |
group-id | Y | String | | Identifier of the Kafka Consumer group. |
auto-offset-reset | Y | String | | Kafka Consumer offset handling behavior (for example, earliest or latest). |
offsets | | Map<String, String> | | Offset Key-Value pairs. |
YAML Example
```yaml
dlc:
  messaging:
    kafka:
      topics:
        MyTradeTopic:
          messaging-broker-topic: Kafka-Source-Trades
          columns:
            - TradeId
            - AsOfDate
          target-stores:
            - TradesBaseStore
          tx-controller-task-description:
            num-publishers: 4
            publish-chunk-size: 100000
            max-tx-drain-size: 400000
            max-tx-drain-time-ms: 3000
          messaging-topic-config-description:
            num-consumers: 1
            publish-queue-capacity: 100000000
          # The nested keys below correspond to the Kafka consumer properties
          # bootstrap.servers, key.deserializer, value.deserializer, group.id and auto.offset.reset.
          messaging-consumer-properties:
            bootstrap:
              servers: host:port
            key:
              deserializer: com.activeviam.io.dlc.impl.description.source.messaging.StringTupleDeserializer
            value:
              deserializer: com.activeviam.io.dlc.impl.description.source.messaging.StringTupleDeserializer
            group:
              id: my-consumer-group-1
            auto:
              offset:
                reset: earliest
```
Java Configuration
Spring Beans are picked up as configurations for the DLC.
KafkaTopicDescription
The Kafka Message Topic defines the topic itself along with the configuration of the Message Consumer and the Transaction Controller Task.
Key | Required | Type | Default | Description |
---|---|---|---|---|
name | Y | String | | The name of the Topic. |
channels | | Set<Channel> | A channel is created using the Topic's name as the name of the target. | Channels of this Topic. |
restrictToSources | | Set<String> | | Sources to which the topic is restricted. See source to topic matching. |
restrictFromSources | | Set<String> | | Sources from which the topic is restricted. See source to topic matching. |
messagingBrokerTopic | Y | String | | The Kafka Topic the KafkaConsumer should be subscribed to. This will match the Producer's Topic. |
columns | | List<String> | Columns implicitly come from the store which is being published to. | Columns coming from the Kafka Producer. You must specify all column names; the ones that do not correspond to a matching column in the datastore are skipped. |
targetStores | | Set<String> | The Topic's name. | Stores to write tuples into. If null, the Topic's name is assumed to be the target store. |
messagingTopicConfigDescription | | KafkaTopicConfigDescription | Default KafkaTopicConfigDescription | Performance configurations of this Topic. |
txControllerTaskDescription | | TxControllerTaskDescription | Default TxControllerTaskDescription | Configuration of the TxControllerTask that this Topic will use. |
messagingConsumerProperties | Y | Properties | | The configuration Properties to be passed into the KafkaConsumer. |
KafkaTopicConfigDescription
Defines some performance configuration properties for the message consumer.
Key | Required | Type | Default | Description |
---|---|---|---|---|
numConsumers | | Integer | 1 | Number of KafkaConsumers to open. |
publishQueueCapacity | | Integer | 100,000,000 | The maximum size of the BlockingQueue shared between the KafkaMessageConsumer and the TxSizeController. |
TxControllerTaskDescription
Defines some performance configuration properties for the Transaction Controller Task (TxControllerTask).
Key | Required | Type | Default | Description |
---|---|---|---|---|
numPublishers | | Integer | 4 | Number of parallel TxSizeController.PublishTasks to run. Each will publish tuples into the IMessageChannel. |
publishChunkSize | | Integer | 100,000 | Maximum number of records to publish in a single TxSizeController.PublishTask. |
maxTxDrainSize | | Integer | 400,000 | Threshold (in records) to flush and commit a Datastore Transaction. The counter is reset on each Transaction. |
maxTxDrainTimeMs | | Integer | 3000 | Threshold (in milliseconds) to flush and commit a Datastore Transaction. The clock is reset on each Transaction. |
Properties messagingConsumerProperties
Defines the properties required for the KafkaConsumer. These properties are passed directly to the KafkaConsumer.
The individual properties required may vary depending on your Kafka Producer. Below are a few that may be required:
Key | Required | Type | Default | Description |
---|---|---|---|---|
bootstrap-servers | Y | String | | URL (host:port) of your Kafka broker(s). |
key-deserializer | Y | String | | Classpath of the Deserializer used to parse the keys of the messages from the Kafka Producer. |
value-deserializer | Y | String | | Classpath of the Deserializer used to parse the values of the messages from the Kafka Producer. |
group-id | Y | String | | Identifier of the Kafka Consumer group. |
auto-offset-reset | Y | String | | Kafka Consumer offset handling behavior (for example, earliest or latest). |
offsets | | Map<String, String> | | Offset Key-Value pairs. |
Java Example
```java
@Bean
KafkaTopicDescription kafkaTopicDescription() {
    return KafkaTopicDescription.builder(SIMPLE_BASE_STORE_NAME)
            .messagingBrokerTopic(SIMPLE_BASE_STORE_NAME)
            .columns(List.of("idKey", "price"))
            .targetStore(SIMPLE_BASE_STORE_NAME)
            .messagingTopicConfigDescription(messagingTopicConfigDescription())
            .messagingConsumerProperties(messagingConsumerProperties())
            .txControllerTaskDescription(transactionControllerTaskDescription())
            .build();
}

@Bean
KafkaTopicConfigDescription messagingTopicConfigDescription() {
    return KafkaTopicConfigDescription.builder()
            .numConsumers(1)
            .publishQueueCapacity(100_000_000)
            .build();
}

@Bean
TxControllerTaskDescription transactionControllerTaskDescription() {
    return TxControllerTaskDescription.builder()
            .numPublishers(4)
            .publishChunkSize(100_000)
            .maxTxDrainSize(400_000)
            .maxTxDrainTimeMs(3000)
            .build();
}

@Bean
Properties messagingConsumerProperties() {
    var messagingConsumerProperties = new Properties();
    // Kafka broker(s) to connect to.
    messagingConsumerProperties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "host:PORT");
    messagingConsumerProperties.put(PROP_POLL_TIMEOUT_MS, "50");
    // Deserializers for the message keys and values coming from the Producer.
    messagingConsumerProperties.put("key.deserializer", StringTupleDeserializer.class.getName());
    messagingConsumerProperties.put("value.deserializer", StringTupleDeserializer.class.getName());
    // Consumer group and offset-reset behavior.
    messagingConsumerProperties.put("group.id", "test-consumer-group");
    messagingConsumerProperties.put("auto.offset.reset", "earliest");
    return messagingConsumerProperties;
}
```