Kafka Topic

Configuration Properties

Properties for a Kafka Messaging Topic are defined by name in the dlc.messaging.kafka.topics namespace, and are picked up as configurations for the DLC.

KafkaTopicProperties

The Kafka Message Topic defines the topic itself along with the configuration of the Message Consumer and the Transaction Controller Task.

| Key | Required | Type | Default | Description |
| --- | --- | --- | --- | --- |
| name | Y | String | | Name of the topic. |
| messaging-broker-topic | Y | String | | The Kafka Topic the KafkaConsumer should be subscribed to. This must match the Producer’s Topic. |
| columns | | List<String> | Columns are taken implicitly from the store being published to. | Columns coming from the Kafka Producer. You must specify all column names. Those that do not correspond to a matching column in the datastore are skipped. |
| target-stores | | Set<String> | The Topic’s name. | Stores to write tuples into. If null, the Topic’s name is assumed to be the target store. |
| messaging-topic-config-description | | MessagingTopicConfigProperties | Default MessagingTopicConfigProperties | Performance configuration of this Topic. |
| messaging-consumer-properties | Y | Map<String, String> | | The configuration properties to be passed to the KafkaConsumer. |
| tx-controller-task-description | | TxControllerTaskConfigProperties | Default TxControllerTaskConfigProperties | Configuration of the TxControllerTask that this Topic will use. |
| channels | | Set<Channel> | A channel is created using the topic’s name as the name of the target. | Channels. |
| restrict-to-sources | | Set<String> | | Sources to which the topic is restricted. See source to topic matching. |
| restrict-from-sources | | Set<String> | | Sources from which the topic is restricted. See source to topic matching. |
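
The defaulting and skipping rules described in the table can be illustrated with a small, self-contained sketch. This is not the DLC implementation: the class `TopicDefaultsSketch` and its methods are hypothetical names introduced here, and the sketch assumes that message values arrive in the same order as the declared columns.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical helper for illustration only; not part of the DLC API.
final class TopicDefaultsSketch {

    // If no target stores are configured (null), fall back to the topic's name.
    static Set<String> resolveTargetStores(String topicName, Set<String> targetStores) {
        return targetStores == null ? Set.of(topicName) : targetStores;
    }

    // Pair each declared column with the value at the same position in the message,
    // keeping only the columns that exist in the target store.
    static Map<String, Object> keepStoreColumns(
            List<String> columns, List<?> values, Set<String> storeColumns) {
        if (columns.size() != values.size()) {
            throw new IllegalArgumentException("Every declared column needs a value");
        }
        Map<String, Object> row = new LinkedHashMap<>();
        for (int i = 0; i < columns.size(); i++) {
            String column = columns.get(i);
            if (storeColumns.contains(column)) {
                row.put(column, values.get(i));
            }
            // Columns unknown to the store are skipped, not treated as an error.
        }
        return row;
    }
}
```

For instance, declaring `TradeId`, `Comment` and `AsOfDate` against a store that only has `TradeId` and `AsOfDate` would leave `Comment` out of the written row.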

MessagingTopicConfigProperties

Defines some performance configuration properties for the message consumer.

| Key | Required | Type | Default | Description |
| --- | --- | --- | --- | --- |
| num-consumers | | Integer | 1 | Number of KafkaConsumers to open. |
| publish-queue-capacity | | Integer | 100,000,000 | The maximum size of the BlockingQueue shared between the KafkaMessageConsumer and TxSizeController. |
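
To make the effect of a queue capacity concrete, here is a minimal sketch of how a bounded `java.util.concurrent.BlockingQueue` behaves when nothing drains it. Which `BlockingQueue` implementation the DLC uses is not stated here, so `LinkedBlockingQueue` is an assumption, and `BoundedQueueSketch` is a hypothetical name.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical illustration of a capacity-bounded queue; not DLC code.
final class BoundedQueueSketch {

    // Offer `messages` items to a queue of the given capacity while no consumer
    // is draining it, and return how many were accepted.
    static int offerAll(int capacity, int messages) {
        BlockingQueue<String> queue = new LinkedBlockingQueue<>(capacity);
        int accepted = 0;
        for (int i = 0; i < messages; i++) {
            // offer() returns false immediately when the queue is full;
            // put() would block the producing thread until space is available.
            if (queue.offer("msg-" + i)) {
                accepted++;
            }
        }
        return accepted;
    }
}
```

With a capacity of 3 and 5 messages, only 3 are accepted. A producer using `put()` instead would wait, which is how a bounded queue applies back-pressure to a fast consumer of Kafka messages.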

TxControllerTaskConfigProperties

Defines some performance configuration properties for the Transaction Controller Task.

| Key | Required | Type | Default | Description |
| --- | --- | --- | --- | --- |
| num-publishers | | Integer | 4 | Number of parallel TxSizeController.PublishTask to run. Each will publish tuples into the IMessageChannel. |
| publish-chunk-size | | Integer | 100,000 | Maximum number of records to publish in a single TxSizeController.PublishTask. |
| max-tx-drain-size | | Integer | 400,000 | Record-count threshold at which a Datastore Transaction is flushed and committed. The counter is reset on each Transaction. |
| max-tx-drain-time-ms | | Integer | 3000 | Time threshold, in milliseconds, at which a Datastore Transaction is flushed and committed. The clock is reset on each Transaction. |
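
The two drain thresholds can be read as a size limit and a time limit on one transaction. The sketch below shows that reading under a stated assumption: the transaction commits as soon as either threshold is reached, and both the counter and the clock restart afterwards. `TxDrainSketch` is a hypothetical class, not the TxSizeController itself, and timestamps are passed in explicitly to keep the example deterministic.

```java
// Hypothetical illustration of size/time commit thresholds; not DLC code.
final class TxDrainSketch {
    private final int maxTxDrainSize;
    private final long maxTxDrainTimeMs;
    private int pending;      // records accumulated in the current transaction
    private long txStartMs;   // time at which the current transaction started

    TxDrainSketch(int maxTxDrainSize, long maxTxDrainTimeMs, long nowMs) {
        this.maxTxDrainSize = maxTxDrainSize;
        this.maxTxDrainTimeMs = maxTxDrainTimeMs;
        this.txStartMs = nowMs;
    }

    // Account for newly drained records; returns true when the transaction
    // should be flushed and committed (assumed: either threshold is enough).
    boolean add(int records, long nowMs) {
        pending += records;
        boolean commit = pending >= maxTxDrainSize
                || nowMs - txStartMs >= maxTxDrainTimeMs;
        if (commit) {
            // Counter and clock are reset on each transaction.
            pending = 0;
            txStartMs = nowMs;
        }
        return commit;
    }
}
```

With the default values, four batches of 100,000 records arriving within a second would trigger a commit on the fourth batch, while a trickle of records would be committed once 3000 ms have elapsed.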

Map<String, String> messagingConsumerProperties

Defines properties required for the KafkaConsumer. These properties are passed directly to the KafkaConsumer. The individual properties required may vary depending on your Kafka Producer. Below are a few that may be required:

| Key | Required | Type | Default | Description |
| --- | --- | --- | --- | --- |
| bootstrap.servers | Y | String | | Address (host:port) of the Kafka broker(s) to connect to. |
| key.deserializer | Y | String | | Fully qualified class name of the Deserializer used to parse the keys of the messages from the Kafka Producer. |
| value.deserializer | Y | String | | Fully qualified class name of the Deserializer used to parse the values of the messages from the Kafka Producer. |
| group.id | Y | String | | Identifier of the consumer group the KafkaConsumer belongs to. |
| auto.offset.reset | Y | String | | Kafka Consumer offset handling behavior, for example earliest or latest. |
| offsets | | Map<String, String> | | Offset Key-Value pairs. |

YAML Example

dlc:
  messaging:
    kafka:
      topics:
        MyTradeTopic:
          messaging-broker-topic: Kafka-Source-Trades
          columns:
            - TradeId
            - AsOfDate
          target-stores:
            - TradesBaseStore
          tx-controller-task-description:
            num-publishers: 4
            publish-chunk-size: 100000
            max-tx-drain-size: 400000
            max-tx-drain-time-ms: 3000
          messaging-topic-config-description:
            num-consumers: 1
            publish-queue-capacity: 100000000
          messaging-consumer-properties:
            bootstrap:
              servers: host:port
            key:
              deserializer: com.activeviam.io.dlc.impl.description.source.messaging.StringTupleDeserializer
            value:
              deserializer: com.activeviam.io.dlc.impl.description.source.messaging.StringTupleDeserializer
            group:
              id: my-consumer-group-1
            auto:
              offset:
                reset: earliest
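
In the YAML above, the consumer properties are written as nested keys (`bootstrap` then `servers`), while Kafka itself expects dotted names such as `bootstrap.servers`. The sketch below shows one way such nesting maps to dotted keys, assuming nested keys are simply joined with a dot. It is an illustration only: `FlattenKeysSketch` is a hypothetical class, and the actual binding is performed by the configuration framework, not by this code.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical illustration of nested-to-dotted key flattening; not DLC code.
final class FlattenKeysSketch {

    // Turn {"bootstrap": {"servers": "host:port"}} into {"bootstrap.servers": "host:port"}.
    static Map<String, String> flatten(Map<String, ?> nested) {
        Map<String, String> flat = new LinkedHashMap<>();
        flattenInto("", nested, flat);
        return flat;
    }

    private static void flattenInto(String prefix, Map<String, ?> node, Map<String, String> out) {
        for (Map.Entry<String, ?> entry : node.entrySet()) {
            String key = prefix.isEmpty() ? entry.getKey() : prefix + "." + entry.getKey();
            Object value = entry.getValue();
            if (value instanceof Map) {
                // Descend into nested maps, extending the dotted prefix.
                @SuppressWarnings("unchecked")
                Map<String, ?> child = (Map<String, ?>) value;
                flattenInto(key, child, out);
            } else {
                // Leaf value: store it as a String under the full dotted key.
                out.put(key, String.valueOf(value));
            }
        }
    }
}
```

Applied to the `messaging-consumer-properties` block of the example, this would yield the keys `bootstrap.servers`, `key.deserializer`, `value.deserializer`, `group.id` and `auto.offset.reset`.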

Java Configuration

KafkaTopicDescription Spring Beans are picked up as configurations for the DLC.

KafkaTopicDescription

The Kafka Message Topic defines the topic itself along with the configuration of the Message Consumer and the Transaction Controller Task.

| Key | Required | Type | Default | Description |
| --- | --- | --- | --- | --- |
| name | Y | String | | Name of the topic. |
| channels | | Set<Channel> | A channel is created using the topic’s name as the name of the target. | Channels. |
| restrictToSources | | Set<String> | | Sources to which the topic is restricted. See source to topic matching. |
| restrictFromSources | | Set<String> | | Sources from which the topic is restricted. See source to topic matching. |
| messagingBrokerTopic | Y | String | | The Kafka Topic the KafkaConsumer should be subscribed to. This must match the Producer’s Topic. |
| columns | | List<String> | Columns are taken implicitly from the store being published to. | Columns coming from the Kafka Producer. You must specify all column names. Those that do not correspond to a matching column in the datastore are skipped. |
| targetStores | | Set<String> | The Topic’s name. | Stores to write tuples into. If null, the Topic’s name is assumed to be the target store. |
| messagingTopicConfigDescription | | KafkaTopicConfigDescription | Default KafkaTopicConfigDescription | Performance configuration of this Topic. |
| txControllerTaskDescription | | TxControllerTaskDescription | Default TxControllerTaskDescription | Configuration of the TxControllerTask that this Topic will use. |
| messagingConsumerProperties | Y | Properties | | The configuration properties to be passed to the KafkaConsumer. |

KafkaTopicConfigDescription

Defines some performance configuration properties for the message consumer.

| Key | Required | Type | Default | Description |
| --- | --- | --- | --- | --- |
| numConsumers | | Integer | 1 | Number of KafkaConsumers to open. |
| publishQueueCapacity | | Integer | 100,000,000 | The maximum size of the BlockingQueue shared between the KafkaMessageConsumer and TxSizeController. |

TxControllerTaskDescription

Defines some performance configuration properties for the Transaction Controller Task.

| Key | Required | Type | Default | Description |
| --- | --- | --- | --- | --- |
| numPublishers | | Integer | 4 | Number of parallel TxSizeController.PublishTask to run. Each will publish tuples into the IMessageChannel. |
| publishChunkSize | | Integer | 100,000 | Maximum number of records to publish in a single TxSizeController.PublishTask. |
| maxTxDrainSize | | Integer | 400,000 | Record-count threshold at which a Datastore Transaction is flushed and committed. The counter is reset on each Transaction. |
| maxTxDrainTimeMs | | Integer | 3000 | Time threshold, in milliseconds, at which a Datastore Transaction is flushed and committed. The clock is reset on each Transaction. |
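
As a rough illustration of how a publisher count and a chunk size interact, the sketch below splits a batch of records into chunks of at most `chunkSize` elements and hands each chunk to a fixed pool of `numPublishers` threads. It only uses the JDK: `ChunkedPublishSketch` is a hypothetical class, and the task body is a stand-in that counts records where a real publish task would send them to a channel.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Hypothetical illustration of chunked, parallel publishing; not DLC code.
final class ChunkedPublishSketch {

    // Split records into consecutive chunks of at most chunkSize elements.
    static <T> List<List<T>> chunk(List<T> records, int chunkSize) {
        List<List<T>> chunks = new ArrayList<>();
        for (int from = 0; from < records.size(); from += chunkSize) {
            int to = Math.min(from + chunkSize, records.size());
            chunks.add(records.subList(from, to));
        }
        return chunks;
    }

    // Run one task per chunk on numPublishers threads; returns the records handled.
    static <T> int publish(List<T> records, int numPublishers, int chunkSize) {
        ExecutorService pool = Executors.newFixedThreadPool(numPublishers);
        try {
            List<Callable<Integer>> tasks = new ArrayList<>();
            for (List<T> oneChunk : chunk(records, chunkSize)) {
                // Stand-in for publishing the chunk: just report its size.
                tasks.add(oneChunk::size);
            }
            int total = 0;
            for (Future<Integer> result : pool.invokeAll(tasks)) {
                total += result.get();
            }
            return total;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } catch (ExecutionException e) {
            throw new IllegalStateException(e);
        } finally {
            pool.shutdown();
        }
    }
}
```

With 10 records and a chunk size of 4, three tasks are created (4, 4 and 2 records). With the default values, 400,000 drained records would form four chunks of 100,000, one per publisher.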

Properties messagingConsumerProperties

Defines properties required for the KafkaConsumer. These properties are passed directly to the KafkaConsumer. The individual properties required may vary depending on your Kafka Producer. Below are a few that may be required:

| Key | Required | Type | Default | Description |
| --- | --- | --- | --- | --- |
| bootstrap.servers | Y | String | | Address (host:port) of the Kafka broker(s) to connect to. |
| key.deserializer | Y | String | | Fully qualified class name of the Deserializer used to parse the keys of the messages from the Kafka Producer. |
| value.deserializer | Y | String | | Fully qualified class name of the Deserializer used to parse the values of the messages from the Kafka Producer. |
| group.id | Y | String | | Identifier of the consumer group the KafkaConsumer belongs to. |
| auto.offset.reset | Y | String | | Kafka Consumer offset handling behavior, for example earliest or latest. |
| offsets | | Map<String, String> | | Offset Key-Value pairs. |
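
Because these properties are passed straight to the KafkaConsumer, a missing entry typically only surfaces when the consumer is created. A small pre-flight check can report gaps earlier. The sketch below is one possible approach, not a DLC feature: `ConsumerPropertiesCheckSketch` is a hypothetical class, and the list of keys uses the dotted names Kafka expects and treats all five as required, which may not hold for every setup.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;

// Hypothetical pre-flight check for consumer properties; not DLC code.
final class ConsumerPropertiesCheckSketch {

    // Keys assumed to be required, using Kafka's dotted property names.
    static final List<String> REQUIRED = List.of(
            "bootstrap.servers",
            "key.deserializer",
            "value.deserializer",
            "group.id",
            "auto.offset.reset");

    // Return the required keys that are absent or blank, in declaration order.
    static List<String> missingKeys(Properties properties) {
        List<String> missing = new ArrayList<>();
        for (String key : REQUIRED) {
            String value = properties.getProperty(key);
            if (value == null || value.trim().isEmpty()) {
                missing.add(key);
            }
        }
        return missing;
    }
}
```

Calling `missingKeys` on the `Properties` object before handing it to the topic description gives a list that can be logged or turned into a startup error.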

Java Example

@Bean
KafkaTopicDescription kafkaTopicDescription() {
	return KafkaTopicDescription.builder(SIMPLE_BASE_STORE_NAME)
			.messagingBrokerTopic(SIMPLE_BASE_STORE_NAME)
			.columns(List.of("idKey", "price"))
			.targetStore(SIMPLE_BASE_STORE_NAME)
			.messagingTopicConfigDescription(messagingTopicConfigDescription())
			.messagingConsumerProperties(messagingConsumerProperties())
			.txControllerTaskDescription(transactionControllerTaskDescription())
			.build();
}

@Bean
KafkaTopicConfigDescription messagingTopicConfigDescription(){
	return KafkaTopicConfigDescription.builder()
			.numConsumers(1)
			.publishQueueCapacity(100_000_000)
			.build();
}

@Bean
TxControllerTaskDescription transactionControllerTaskDescription(){
	return TxControllerTaskDescription.builder()
			.numPublishers(4)
			.publishChunkSize(100_000)
			.maxTxDrainSize(400_000)
			.maxTxDrainTimeMs(3000)
			.build();
}

@Bean
Properties messagingConsumerProperties(){
	var messagingConsumerProperties = new Properties();
	// Kafka broker address. ConsumerConfig (org.apache.kafka.clients.consumer) holds the consumer-side constants.
	messagingConsumerProperties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "host:PORT");
	messagingConsumerProperties.put(PROP_POLL_TIMEOUT_MS, "50");
	// Deserializers used for message keys and values.
	messagingConsumerProperties.put("key.deserializer", StringTupleDeserializer.class.getName());
	messagingConsumerProperties.put("value.deserializer", StringTupleDeserializer.class.getName());
	// Consumer group, and where to start reading when no committed offset exists.
	messagingConsumerProperties.put("group.id", "test-consumer-group");
	messagingConsumerProperties.put("auto.offset.reset", "earliest");
	return messagingConsumerProperties;
}