RabbitMQ
RabbitMQ Source Configuration
RabbitMQ Source allows for streaming of data from a RabbitMQ Messaging Source.
Maven Dependency
We first must ensure we are importing the correct dependency. Your pom should have:
<!-- For RabbitMq Source -->
<dependency>
<groupId>com.activeviam.io</groupId>
<artifactId>data-connectors-rabbitmq</artifactId>
<version>${dataconnectors.version}</version>
</dependency>
Connection Properties
The connection properties can be wrapped in a Properties
object
@Bean
public Properties rabbitMQConnectionProperties() {
Properties rabbitMQConnectionProps = new Properties();
rabbitMQConnectionProps.put(PROP__RABBITMQ_BROKER_URI, rabbitMQBrokerUri);
return rabbitMQConnectionProps;
}
Source Configuration
MessagingSource
currently supports loading from Apache Kafka and RabbitMQ out of the box.
To load data from a different messaging system, an implementation of IMessagingConsumer
needs to be provided. The Following section shows how to configure the source and topic
for such a system, via Spring config.
Below is an example:
@Bean
public MessagingSource rabbitMqMessagingSource() {
MessagingSource messagingSource = new MessagingSource("RabbitMqMessagingSource");
Properties rabbitMqConnectionProperties = rabbitMQConnectionConfig.rabbitMQConnectionProperties();
Properties tradeProps = (Properties) rabbitMqConnectionProperties.clone();
messagingSource.addTopic(
new MessagingTopic(
TOPIC__RABBITMQ_MESSAGING_TRADE,
tradeRabbitMqBrokerTopic,
TRADE_STORE_FIELDS,
tradeProps,
tradeRabbitMqConsumers,
datastoreConfig.datastore(),
new String[] { DataStoreNames.TRADE_STORE_NAME, DataStoreNames.BASE_STORE_NAME },
RabbitMqMessageConsumer.PLUGIN_KEY)); // Plugin key of the Messaging Consumer to use
return messagingSource;
}
Messaging Consumer
Messaging consumers are the ones responsible for connecting to remote messaging system, keep listening for new data, and consume data when available.
The consumers are created when the request to start “listening” to remote topic is submitted at runtime. The underlying Kafka / RabbitMQ client consumers are created in the constructor of the messaging consumer implementation - RabbitMqMessageConsumer
.