Kafka
Kafka Source Configuration
The Kafka source allows streaming data from a Kafka cluster into the application.
Maven Dependency
First, ensure the correct dependency is imported. Your pom.xml should contain:
<!-- For Kafka Source -->
<dependency>
    <groupId>com.activeviam.io</groupId>
    <artifactId>data-connectors-kafka</artifactId>
    <version>${dataconnectors.version}</version>
</dependency>
Connection Properties
The connection properties can be wrapped in a Properties object:
@Bean
public Properties kafkaConnectionProperties() {
    Properties kafkaConnectionProps = new Properties();
    kafkaConnectionProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
    kafkaConnectionProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, autoOffsetReset);
    kafkaConnectionProps.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, maxPollRecords);
    kafkaConnectionProps.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, enableAutoCommit);
    kafkaConnectionProps.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, autoCommitIntervalMs);
    kafkaConnectionProps.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, maxPollIntervalMs);
    kafkaConnectionProps.put(ConsumerConfig.FETCH_MAX_BYTES_CONFIG, fetchMaxBytes);
    kafkaConnectionProps.put("message.max.bytes", messageMaxBytes);
    kafkaConnectionProps.put(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, maxPartitionFetchBytes);
    kafkaConnectionProps.put(PROP_POLL_TIMEOUT, pollTimeout);
    kafkaConnectionProps.put(PROP_SCHEMA_REGISTRY_URL, schemaRegistryUrl);
    return kafkaConnectionProps;
}
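The values referenced above (bootstrapServers, autoOffsetReset, and so on) are typically injected from external configuration, for example via Spring @Value. A sketch of what such a properties file might look like; the property names and values below are illustrative assumptions, not names mandated by the connector:

```properties
# Illustrative only: these property names are an assumption of this sketch
kafka.bootstrap.servers=localhost:9092
kafka.auto.offset.reset=latest
kafka.max.poll.records=500
kafka.enable.auto.commit=true
kafka.auto.commit.interval.ms=5000
kafka.max.poll.interval.ms=300000
kafka.fetch.max.bytes=52428800
kafka.message.max.bytes=1048576
kafka.max.partition.fetch.bytes=1048576
kafka.poll.timeout=1000
kafka.schema.registry.url=http://localhost:8081
```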
Source Configuration
MessagingSource currently supports loading from Apache Kafka and RabbitMQ out of the box.
To load data from a different messaging system, an implementation of IMessagingConsumer
must be provided. The following example shows how to configure the source and a topic
via Spring configuration:
@Bean
public MessagingSource kafkaMessagingSource() {
    Properties kafkaConnectionProperties = kafkaConnectionConfig.kafkaConnectionProperties();
    MessagingSource messagingSource = new MessagingSource("KafkaMessagingSource");

    Properties tradeProps = (Properties) kafkaConnectionProperties.clone();
    // All the consumers for a topic should have the same group.id
    tradeProps.put(ConsumerConfig.GROUP_ID_CONFIG,
            tradeTopicGroupId.equals("UNIQUE")
                    ? serviceUsername + "." + UUID.randomUUID().toString()
                    : tradeTopicGroupId);
    tradeProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
    tradeProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, KafkaJsonDeserializer.class.getName());

    messagingSource.addTopic(
            new MessagingTopic(
                    TOPIC__KAFKA_MESSAGING_TRADE,
                    tradeKafkaBrokerTopic,
                    TRADE_STORE_FIELDS,
                    tradeProps,
                    tradeKafkaConsumers,
                    datastoreConfig.datastore(),
                    new String[] {DatastoreNames.TRADE_STORE_NAME},
                    KafkaMessageConsumer.PLUGIN_KEY)); // Plugin key of the messaging consumer to use
    return messagingSource;
}
The TRADE_STORE_FIELDS constant referenced above lists the datastore fields populated from each message:

private static final List<String> TRADE_STORE_FIELDS = Arrays.asList(
        DatastoreNames.TRADE_ID,
        DatastoreNames.TRADE_PRODUCT_ID,
        DatastoreNames.TRADE_PRODUCT_QUANTITY,
        DatastoreNames.TRADE_COB_DATE);
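The group.id ternary in the bean above controls whether service instances share a consumer group. Here is a minimal, self-contained sketch of that logic; the class, method, and parameter names are hypothetical, introduced only for illustration:

```java
import java.util.UUID;

public class GroupIdExample {

    // Mirrors the ternary in the sample bean: the sentinel value "UNIQUE" gives
    // every instance its own consumer group (so each instance receives all
    // messages), while any other value is used as a shared group id, meaning
    // instances split the topic's partitions between them.
    static String resolveGroupId(String configuredGroupId, String serviceUsername) {
        return configuredGroupId.equals("UNIQUE")
                ? serviceUsername + "." + UUID.randomUUID()
                : configuredGroupId;
    }

    public static void main(String[] args) {
        System.out.println(resolveGroupId("trade-group", "svc_user")); // shared: "trade-group"
        System.out.println(resolveGroupId("UNIQUE", "svc_user"));      // unique, e.g. "svc_user.<uuid>"
    }
}
```

Using "UNIQUE" is appropriate when every instance must see the full stream; a fixed group id is appropriate when instances should load-balance consumption.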
Messaging Consumer
Messaging consumers are responsible for connecting to the remote messaging system, listening for new data, and consuming it as it becomes available.
The consumers are created when a request to start "listening" to a remote topic is submitted at runtime. The underlying Kafka / RabbitMQ client consumers are created in the constructor of the messaging consumer implementation, such as KafkaMessageConsumer.
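To illustrate the listen-and-consume pattern without requiring a live broker, below is a stdlib-only sketch in which a BlockingQueue stands in for the Kafka client's poll() call. Everything here, including the class and method names, is an illustrative assumption, not the actual KafkaMessageConsumer implementation:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

public class ConsumerLoopSketch {

    // "broker" stands in for the remote messaging system; in the real
    // consumer this role is played by the Kafka/RabbitMQ client.
    static List<String> consume(BlockingQueue<String> broker, int maxMessages)
            throws InterruptedException {
        List<String> delivered = new ArrayList<>();
        while (delivered.size() < maxMessages) {
            // Analogous to consumer.poll(pollTimeout): wait briefly for data,
            // then loop again if nothing arrived.
            String message = broker.poll(100, TimeUnit.MILLISECONDS);
            if (message != null) {
                // A real consumer would deserialize the record here and
                // publish the mapped fields into the target datastore.
                delivered.add(message);
            }
        }
        return delivered;
    }

    public static void main(String[] args) throws InterruptedException {
        BlockingQueue<String> broker = new LinkedBlockingQueue<>();
        broker.add("trade-1");
        broker.add("trade-2");
        System.out.println(consume(broker, 2)); // prints [trade-1, trade-2]
    }
}
```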