RabbitMQ

RabbitMQ Source Configuration

RabbitMQ Source allows for streaming of data from a RabbitMQ Messaging Source.

Maven Dependency

We first must ensure we are importing the correct dependency. Your pom should have:

    <!-- For RabbitMq Source -->
    <dependency>
        <groupId>com.activeviam.io</groupId>
        <artifactId>data-connectors-rabbitmq</artifactId>
        <version>${dataconnectors.version}</version>
    </dependency>

Connection Properties

The connection properties can be wrapped in a Properties object

@Bean
public Properties rabbitMQConnectionProperties() {

    Properties rabbitMQConnectionProps = new Properties();
    rabbitMQConnectionProps.put(PROP__RABBITMQ_BROKER_URI, rabbitMQBrokerUri);

    return rabbitMQConnectionProps;
}

Source Configuration

MessagingSource currently supports loading from Apache Kafka and RabbitMQ out of the box. To load data from a different messaging system, an implementation of IMessagingConsumer needs to be provided. The Following section shows how to configure the source and topic for such a system, via Spring config.

Below is an example:

@Bean
public MessagingSource rabbitMqMessagingSource() {

    MessagingSource messagingSource = new MessagingSource("RabbitMqMessagingSource");

    Properties rabbitMqConnectionProperties = rabbitMQConnectionConfig.rabbitMQConnectionProperties();
    Properties tradeProps = (Properties) rabbitMqConnectionProperties.clone();

    messagingSource.addTopic(
            new MessagingTopic(
                    TOPIC__RABBITMQ_MESSAGING_TRADE,
                    tradeRabbitMqBrokerTopic,
                    TRADE_STORE_FIELDS,
                    tradeProps,
                    tradeRabbitMqConsumers,
                    datastoreConfig.datastore(),
                    new String[] { DataStoreNames.TRADE_STORE_NAME, DataStoreNames.BASE_STORE_NAME },
                    RabbitMqMessageConsumer.PLUGIN_KEY)); // Plugin key of the Messaging Consumer to use

    return messagingSource;
}

Messaging Consumer

Messaging consumers are the ones responsible for connecting to remote messaging system, keep listening for new data, and consume data when available.

The consumers are created when the request to start “listening” to remote topic is submitted at runtime. The underlying Kafka / RabbitMQ client consumers are created in the constructor of the messaging consumer implementation - RabbitMqMessageConsumer.

search.js