Skip to main content

Load data from a JDBC source

This guide shows how to load data from a database using a JDBC driver into a store, thanks to the JDBC source. For a description of what the JDBC Source is and other advanced options not included in this guide, refer to the JDBC source.

Basic load example

Goal

The goal is to load the first two columns of the Trades table into the Trades store.

Table content

IDAMOUNTCURRENCYTRADERTRADER_ENTITYBANK
112394chfE FE3BNP Paribas
213149gbpA BE4BNP Paribas

Expected Trades content

We want to load the first two columns into Trades store.

idamount
112394
213149

Setup

First, we need to import the artifact com.activeviam.source:jdbc-source in our project.

Build a source

In order to load data from a database into a datastore, the first step is to build a JDBC source.

final IJdbcSource<ResultSetRow> source =
IJdbcSource.builder()
.nativeRows()
.withConnectionInfo(properties.getProperty("url"), H2_DRIVER_CLASS, properties)
.withName(NAME)
.build();

A JDBC source is a collection of JDBC topics. JDBC topics are references to a query on the database. A topic is defined using the query language supported by the JDBC driver (most of the time SQL).

final IJdbcTopic topic = new JdbcTopic(topicName, "SELECT ID, AMOUNT FROM TRADES");
source.addTopic(topic);

Keep in mind that the topic can be configured with more advanced options.

Load into datastore

The second step is to create a channel and use the source to fetch the data into the Datastore.
A channel is the link between a topic and a store.
The way to do so is to provide a channel factory to the source.

final IStoreMessageChannelFactory<String, ResultSetRow> channelFactory =
new JdbcMessageChannelFactory(source, datastore);

Finally, the source can be used to process the channel. The source loads topic content using the channel and stream it into the Datastore.
There is two ways of doing so.

Using the fetch utility:

final Fetch<String, ResultSetRow> fetch = new Fetch<>(channelFactory);
fetch.fetch(source);

Or in a manual way:

final IStoreMessageChannel<String, ResultSetRow> channel =
channelFactory.createChannel(topicName, TRADES_STORE);
datastore.getTransactionManager().startTransaction();
source.fetch(List.of(channel));
datastore.getTransactionManager().commitTransaction();

Access the loaded data

After this loading, you can access your data thanks to the query API.

final ListQuery query =
datastore
.getQueryManager()
.listQuery()
.forTable(TRADES_STORE)
.withoutCondition()
.selectingAllTableFields()
.toQuery();
@Cleanup
final ICursor result = datastore.getMasterHead().getQueryRunner().listQuery(query).run();
result.next();
final Object[] firstRow = result.getRecord().toTuple();
result.next();
final Object[] secondRow = result.getRecord().toTuple();

Load with a parameterized query

Using a prepared statement, you can load data from a JDBC source with a parameterized query.

It is identical to the previous example, except for the topic definition:

final IJdbcTopic topic =
new JdbcTopic(topicName, "SELECT ID, AMOUNT FROM TRADES WHERE TRADER_ENTITY=?");
source.addTopic(topic);

The rest of the process is the same as in the previous example, except that the parameter value must be provided when fetching the data:

source.fetch(channel, List.of("E3"));

Load with a calculated column

It is also possible to load data from a JDBC source with a calculated column, for example to apply an offset to a column.

To do so, one must define a column calculator that will be applied to the column when loading the data:

final int offset = 1_000_000;
final IColumnCalculator<ResultSetRow> calculator =
new IColumnCalculator<>() {
@Override
public String getColumnName() {
return OFFSET_AMOUNT_FIELD;
}
@Override
public Object compute(final IColumnCalculationContext<ResultSetRow> context) {
final ResultSetRow row = context.getContext();
final Integer amountValue = (Integer) row.getObject(AMOUNT_FIELD);
return offset + amountValue;
}
};

Then, one must register the column calculator in the channel factory:

channelFactory.setCalculatedColumns(topicName, TRADES_STORE, List.of(calculator));

And that's it!