Load data from a JDBC source
This guide shows how to load data from a database using a JDBC driver into a store, thanks to the JDBC source. For a description of what the JDBC Source is and other advanced options not included in this guide, refer to the JDBC source.
Basic load example
Goal
The goal is to load the first two columns of the Trades
table into the Trades
store.
Table content
ID | AMOUNT | CURRENCY | TRADER | TRADER_ENTITY | BANK |
---|---|---|---|---|---|
1 | 12394 | chf | E F | E3 | BNP Paribas |
2 | 13149 | gbp | A B | E4 | BNP Paribas |
Expected Trades
content
We want to load the first two columns into Trades
store.
id | amount |
---|---|
1 | 12394 |
2 | 13149 |
Setup
First, we need to import the artifact com.activeviam.source:jdbc-source
in our project.
Build a source
In order to load data from a database into a datastore, the first step is to build a JDBC source.
final IJdbcSource<ResultSetRow> source =
IJdbcSource.builder()
.nativeRows()
.withConnectionInfo(properties.getProperty("url"), H2_DRIVER_CLASS, properties)
.withName(NAME)
.build();
A JDBC source is a collection of JDBC topics. JDBC topics are references to a query on the database. A topic is defined using the query language supported by the JDBC driver (most of the time SQL).
final JdbcTopic topic =
JdbcTopic.builder().topicName(topicName).query("SELECT ID, AMOUNT FROM TRADES").build();
source.addTopic(topic);
Keep in mind that the topic can be configured with more advanced options.
Load into datastore
The second step is to create a channel and use the source to fetch the data into the Datastore.
A channel is the link between a topic and a store.
The way to do so is to provide a channel factory to the source.
final IStoreMessageChannelFactory<String, ResultSetRow> channelFactory =
new JdbcMessageChannelFactory(source, datastore);
Finally, the source can be used to process the channel.
The source loads topic content using the channel and stream it into the Datastore.
There is two ways of doing so.
Using the fetch utility:
final Fetch<String, ResultSetRow> fetch = new Fetch<>(channelFactory);
fetch.fetch(source);
Or in a manual way:
final IStoreMessageChannel<String, ResultSetRow> channel =
channelFactory.createChannel(topicName, TRADES_STORE);
datastore.getTransactionManager().startTransaction();
source.fetch(List.of(channel));
datastore.getTransactionManager().commitTransaction();
Access the loaded data
After this loading, you can access your data thanks to the query API.
final ListQuery query =
datastore
.getQueryManager()
.listQuery()
.forTable(TRADES_STORE)
.withoutCondition()
.selectingAllTableFields()
.toQuery();
@Cleanup
final ICursor result = datastore.getMasterHead().getQueryRunner().listQuery(query).run();
result.next();
final Object[] firstRow = result.getRecord().toTuple();
result.next();
final Object[] secondRow = result.getRecord().toTuple();
Load with a parameterized query
Using a prepared statement, you can load data from a JDBC source with a parameterized query.
It is identical to the previous example, except for the topic definition:
final JdbcTopic topic =
JdbcTopic.builder()
.topicName(topicName)
.query("SELECT ID, AMOUNT FROM TRADES WHERE TRADER_ENTITY=?")
.build();
source.addTopic(topic);
The rest of the process is the same as in the previous example, except that the parameter value must be provided when fetching the data:
source.fetch(channel, List.of("E3"));
Load with a calculated column
It is also possible to load data from a JDBC source with a calculated column, for example to apply an offset to a column.
To do so, one must define a column calculator that will be applied to the column when loading the data:
final int offset = 1_000_000;
final IColumnCalculator<ResultSetRow> calculator =
new IColumnCalculator<>() {
@Override
public String getColumnName() {
return OFFSET_AMOUNT_FIELD;
}
@Override
public Object compute(final IColumnCalculationContext<ResultSetRow> context) {
final ResultSetRow row = context.getContext();
final Integer amountValue = (Integer) row.getObject(AMOUNT_FIELD);
return offset + amountValue;
}
};
Then, one must register the column calculator in the channel factory:
channelFactory.setCalculatedColumns(topicName, TRADES_STORE, List.of(calculator));
And that's it!