Kafka

Kafka Source Configuration

The Kafka Source streams data from Kafka topics into the datastore.

Maven Dependency

First, make sure the correct dependency is imported. Your pom.xml should contain:

    <!-- For Kafka Source -->
    <dependency>
        <groupId>com.activeviam.io</groupId>
        <artifactId>data-connectors-kafka</artifactId>
        <version>${dataconnectors.version}</version>
    </dependency>

Connection Properties

The connection properties can be wrapped in a Properties object:

@Bean
public Properties kafkaConnectionProperties() {

    Properties kafkaConnectionProps = new Properties();
    kafkaConnectionProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
    kafkaConnectionProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, autoOffsetReset);
    kafkaConnectionProps.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, maxPollRecords);
    kafkaConnectionProps.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, enableAutoCommit);
    kafkaConnectionProps.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, autoCommitIntervalMs);
    kafkaConnectionProps.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, maxPollIntervalMs);
    kafkaConnectionProps.put(ConsumerConfig.FETCH_MAX_BYTES_CONFIG, fetchMaxBytes);
    kafkaConnectionProps.put("message.max.bytes", messageMaxBytes);
    kafkaConnectionProps.put(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, maxPartitionFetchBytes);
    kafkaConnectionProps.put(PROP_POLL_TIMEOUT, pollTimeout);

    kafkaConnectionProps.put(PROP_SCHEMA_REGISTRY_URL, schemaRegistryUrl);

    return kafkaConnectionProps;
}  
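As a rough illustration of what the shared properties end up containing, the sketch below builds a similar Properties object with the literal Kafka consumer keys behind the ConsumerConfig constants. It depends only on the JDK. The class name, the method name, and every value are assumptions chosen for illustration, not settings recommended by the connector.

```java
import java.util.Properties;

// Hypothetical helper, not part of the data connectors library.
final class KafkaConnectionPropsSketch {

    // Builds connection properties using the literal keys that the
    // corresponding ConsumerConfig constants resolve to.
    static Properties connectionProperties(String bootstrapServers) {
        Properties props = new Properties();
        props.put("bootstrap.servers", bootstrapServers);   // BOOTSTRAP_SERVERS_CONFIG
        props.put("auto.offset.reset", "earliest");         // AUTO_OFFSET_RESET_CONFIG
        props.put("max.poll.records", "500");               // MAX_POLL_RECORDS_CONFIG
        props.put("enable.auto.commit", "false");           // ENABLE_AUTO_COMMIT_CONFIG
        props.put("max.poll.interval.ms", "300000");        // MAX_POLL_INTERVAL_MS_CONFIG
        props.put("fetch.max.bytes", "52428800");           // FETCH_MAX_BYTES_CONFIG
        props.put("max.partition.fetch.bytes", "1048576");  // MAX_PARTITION_FETCH_BYTES_CONFIG
        return props;
    }

    public static void main(String[] args) {
        // Illustrative broker address only.
        Properties props = connectionProperties("localhost:9092");
        props.forEach((key, value) -> System.out.println(key + "=" + value));
    }
}
```

In a real configuration these values would normally be injected from application properties rather than hard-coded.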

Source Configuration

MessagingSource currently supports loading from Apache Kafka and RabbitMQ out of the box. To load data from a different messaging system, provide an implementation of IMessagingConsumer. The following section shows how to configure the source and a topic via Spring config.

Below is an example:

@Bean
public MessagingSource kafkaMessagingSource() {

    Properties kafkaConnectionProperties = kafkaConnectionConfig.kafkaConnectionProperties();
    MessagingSource messagingSource = new MessagingSource("KafkaMessagingSource");

    Properties tradeProps = (Properties) kafkaConnectionProperties.clone();

    // All consumers of a topic should share the same group.id.
    // The special value "UNIQUE" generates a fresh group.id for this instance instead.
    tradeProps.put(ConsumerConfig.GROUP_ID_CONFIG,
            tradeTopicGroupId.equals("UNIQUE")
                    ? serviceUsername + "." + UUID.randomUUID()
                    : tradeTopicGroupId);

    tradeProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
    tradeProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, KafkaJsonDeserializer.class.getName());

    messagingSource.addTopic(
            new MessagingTopic(
                    TOPIC__KAFKA_MESSAGING_TRADE,
                    tradeKafkaBrokerTopic,
                    TRADE_STORE_FIELDS,
                    tradeProps,
                    tradeKafkaConsumers,
                    datastoreConfig.datastore(),
                    new String[] {DatastoreNames.TRADE_STORE_NAME},
                    KafkaMessageConsumer.PLUGIN_KEY)); // Plugin key of the Messaging Consumer to use

    return messagingSource;
}

private static final List<String> TRADE_STORE_FIELDS = Arrays.asList(
        DatastoreNames.TRADE_ID,
        DatastoreNames.TRADE_PRODUCT_ID,
        DatastoreNames.TRADE_PRODUCT_QUANTITY,
        DatastoreNames.TRADE_COB_DATE
    );
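Two details of the configuration above can be tried in isolation: the shared connection properties are cloned before topic-specific keys are added, and the group id is either taken as configured or generated when the configured value is "UNIQUE". The sketch below reproduces both with the JDK only. The class and method names are hypothetical, and "group.id" is the literal key behind ConsumerConfig.GROUP_ID_CONFIG.

```java
import java.util.Properties;
import java.util.UUID;

// Hypothetical helper, not part of the data connectors library.
final class TopicPropsSketch {

    // Returns the configured group id, or a unique one when "UNIQUE" is requested.
    static String resolveGroupId(String configuredGroupId, String serviceUsername) {
        return "UNIQUE".equals(configuredGroupId)
                ? serviceUsername + "." + UUID.randomUUID()
                : configuredGroupId;
    }

    // Copies the shared properties so topic-specific keys never leak back into them.
    static Properties topicProperties(Properties shared, String groupId) {
        Properties copy = (Properties) shared.clone();
        copy.put("group.id", groupId);
        return copy;
    }

    public static void main(String[] args) {
        Properties shared = new Properties();
        shared.put("bootstrap.servers", "localhost:9092"); // illustrative address

        Properties tradeProps = topicProperties(shared, resolveGroupId("UNIQUE", "svc"));
        System.out.println("topic group.id  = " + tradeProps.getProperty("group.id"));
        System.out.println("shared group.id = " + shared.getProperty("group.id"));
    }
}
```

Consumers that share a group id split the partitions of a topic between them, while a unique group id makes an instance receive every message on its own. That is presumably why the "UNIQUE" option exists.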

Messaging Consumer

Messaging consumers are responsible for connecting to the remote messaging system, listening for new data, and consuming it when it becomes available.

The consumers are created when a request to start “listening” to a remote topic is submitted at runtime. The underlying Kafka or RabbitMQ client consumers are created in the constructor of the messaging consumer implementation (KafkaMessageConsumer for Kafka).
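The lifecycle described above (client created in the constructor, then repeated polling for data) can be sketched without a broker. The example below is a simplified stand-in, not the connector's IMessagingConsumer or KafkaMessageConsumer: every class name is hypothetical, and an in-memory queue plays the role of the remote topic. Unlike a real consumer, which keeps listening, the sketch stops at the first empty poll so that it terminates.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical illustration of the consumer lifecycle; all names are made up.
final class ConsumerLifecycleSketch {

    // Stand-in for the underlying Kafka or RabbitMQ client.
    static final class InMemoryClient {
        private final BlockingQueue<String> topic;

        InMemoryClient(BlockingQueue<String> topic) {
            this.topic = topic;
        }

        // Waits up to timeoutMs for a record; returns null when none arrives.
        String poll(long timeoutMs) throws InterruptedException {
            return topic.poll(timeoutMs, TimeUnit.MILLISECONDS);
        }
    }

    static final class SketchConsumer {
        private final InMemoryClient client;
        private final List<String> received = new ArrayList<>();

        // The client is created here, when listening is requested.
        SketchConsumer(BlockingQueue<String> topic) {
            this.client = new InMemoryClient(topic);
        }

        // Polls until a poll returns no data, collecting every record seen.
        List<String> drain(long timeoutMs) {
            try {
                String record;
                while ((record = client.poll(timeoutMs)) != null) {
                    received.add(record);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // preserve the interrupt flag
            }
            return received;
        }
    }

    // Publishes two records to the in-memory topic and consumes them.
    static List<String> demo() {
        BlockingQueue<String> topic = new LinkedBlockingQueue<>();
        topic.add("trade-1");
        topic.add("trade-2");
        return new SketchConsumer(topic).drain(10);
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```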
