JDBC Source

You should be familiar with the source ETL concepts before reading the JDBC specific documentation.

Check out the example code snippets while reading this documentation.

An IJdbcSource is a database-related implementation of the ISource interface. ISource is the generic framework used in Atoti for fetching data from external data sources and contributing it to an Atoti IDatastore.

The purpose of the IJdbcSource is to load data from a database, through a JDBC driver.

ETL pipeline

The ETL pipeline for a JDBC source is made of the Extract, Transform, and Load steps described below.

Extract

To perform the extraction step, Atoti relies on:

  • a JDBC driver provided by the client
  • a query to execute, which can be either:
    • a String, which is passed directly to the JDBC driver
    • a java.sql.PreparedStatement provided by a com.activeviam.source.jdbc.api.IPreparedStatementSupplier.

This allows data to be extracted from a database into a java.sql.ResultSet that can be used in the rest of the pipeline.

An IJdbcSource is built with an IJdbcSourceBuilder created by IJdbcSource.builder().

final IJdbcSource<ResultSetRow> source =
    IJdbcSource.builder()
        .nativeRows()
        .withConnectionInfo(properties.getProperty("url"), H2_DRIVER_CLASS, properties)
        .withName(NAME)
        .build();

Row types

First, choose one of the following types of IJdbcSource, which mostly differ in the way the java.sql.ResultSet obtained by executing the query is parsed:

  • IJdbcSourceBuilder.nativeRows() parses the ResultSet as a ResultSetRow; this is the recommended option.
  • IJdbcSourceBuilder.arrayRows() parses the ResultSet as an Object[].

Native rows

A ResultSetRow is a thin wrapper around an Object[], with additional metadata:

  • the java.sql.ResultSetMetaData provided by the JDBC driver.
  • a Map<String, Integer> that maps the column names to their indexes in the Object[].

Native rows rely on the presence of the java.sql.ResultSetMetaData to provide a more ergonomic way to manipulate the result: for example, a specific column can be accessed by its name using ResultSetRow#getObject(String). The java.sql.ResultSetMetaData is included in ResultSetRow so that a developer can use this metadata in custom column calculators.
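Conceptually, a native row is just an Object[] paired with a column-name index. The following self-contained sketch (a simplified stand-in, not the actual Atoti ResultSetRow class, which also carries the java.sql.ResultSetMetaData) illustrates the lookup that access by column name performs:

```java
import java.util.Map;

// Simplified stand-in for a native row: an Object[] plus a name-to-index map.
final class NativeRowSketch {
    private final Object[] values;
    private final Map<String, Integer> columnIndexes;

    NativeRowSketch(final Object[] values, final Map<String, Integer> columnIndexes) {
        this.values = values;
        this.columnIndexes = columnIndexes;
    }

    /** Access a column by position, as with array rows. */
    Object getObject(final int index) {
        return this.values[index];
    }

    /** Access a column by name: the ergonomic benefit of native rows. */
    Object getObject(final String columnName) {
        final Integer index = this.columnIndexes.get(columnName);
        if (index == null) {
            throw new IllegalArgumentException("Unknown column: " + columnName);
        }
        return this.values[index];
    }

    public static void main(final String[] args) {
        final NativeRowSketch row = new NativeRowSketch(
            new Object[] {42L, "EUR"}, Map.of("TRADE_ID", 0, "CURRENCY", 1));
        System.out.println(row.getObject("CURRENCY")); // prints EUR
    }
}
```

With array rows, the same value would have to be read as `row[1]`, with the index hard-coded by the developer.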

Array rows

Array rows parse the java.sql.ResultSet into a plain Object[]. This is the simplest representation, but it is less ergonomic than ResultSetRow because a developer must know the index of each column in the Object[].

Connection information

The JDBC source connects to a given database by using the provided connection information:

  • the URL, username, password, and class name of the JDBC driver, OR
  • an implementation of the IConnectionSupplier interface, which wraps the connection information. IConnectionSupplier#createConnection() must load the JDBC driver and provide a java.sql.Connection.

Optional parameters

  • name: specifies the JDBC source name. Default value: "JDBC_SOURCE"
  • poolSize: the size of the thread pool in which the tasks that execute the queries and fill the channels are executed. Default value: 2

Topic creation

Topics do not need to exist before instantiating an IJdbcSource, but they must be added to the source before loading the data. This is done with IJdbcSource#addTopic(IJdbcTopic).

Downloading the data from the database must be done in a mono-threaded fashion, due to the nature of the JDBC standard. However, once this step is performed, the data can be processed in parallel before being loaded into the datastore.

In order to create a JdbcTopic, use the fluent JdbcTopic.builder() method. The following parameters can be set:

  • topicName: the name of the topic.
  • query OR preparedStatementSupplier:
    • query: the string representation of the query associated with the topic (usually a SQL query, but it can be a stringified JSON for NoSQL databases, for example)
    • preparedStatementSupplier: a user implementation of IPreparedStatementSupplier which provides a java.sql.PreparedStatement
  • queueSize: the size of the queue that receives the downloaded records. This property should be set to a larger value than batchSize and fetchSize, since those properties correspond to push/pop operations on the queue.
  • appendThreads: the number of threads which process the raw rows that have been downloaded
  • chunkSize: the size of the data chunks which store the processed rows before sending them to the datastore
  • fetchSize: the size of one fetch performed by the JDBC driver. This parameter is forwarded to java.sql.ResultSet#setFetchSize() and can be ignored by the JDBC driver. It can be useful for controlling network congestion, which can be the performance bottleneck of the source.
  • batchSize: the maximum number of elements polled from the append queue by each append thread

While fetching data with an IJdbcSource, Atoti monitors the size of the append queue. If the queue reaches full capacity, it becomes the limiting factor of the data fetching operation. In that case, increase the number of append threads through appendThreads, or change the batchSize value of the JDBC source.

Overall view

The JDBC source operates with a given poolSize to process the tasks.

  • Extract:
    • Each topic fetches data (fetchSize) from the external database and puts it in the append queue (queueSize)
    • This step is mono-threaded within each topic
  • Transform:
    • Several threads (appendThreads) poll the data from the append queue in batches (batchSize) and transform it into IMessages
  • Load:
    • The IMessages are loaded into the datastore in chunks (chunkSize)
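The queueing model above can be sketched with plain JDK concurrency primitives. This is a simplified model, not the Atoti implementation: a single fetch thread fills a bounded queue, and several append threads drain it in batches.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;

// Simplified model of the JDBC source pipeline:
// one fetch thread per topic -> bounded append queue -> batch-draining append threads.
final class AppendQueueSketch {
    static int processAll(final int rowCount, final int queueSize, final int batchSize,
                          final int appendThreads) throws InterruptedException {
        final BlockingQueue<Object[]> queue = new ArrayBlockingQueue<>(queueSize);
        final AtomicInteger processed = new AtomicInteger();

        // Mono-threaded "download": put() blocks when the queue is full (queueSize).
        final Thread fetcher = new Thread(() -> {
            try {
                for (int i = 0; i < rowCount; i++) {
                    queue.put(new Object[] {i}); // one raw row
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        fetcher.start();

        // Parallel "transform": each append thread drains batches of up to batchSize.
        final List<Thread> appenders = new ArrayList<>();
        for (int t = 0; t < appendThreads; t++) {
            final Thread appender = new Thread(() -> {
                final List<Object[]> batch = new ArrayList<>(batchSize);
                while (processed.get() < rowCount) {
                    batch.clear();
                    queue.drainTo(batch, batchSize);
                    processed.addAndGet(batch.size()); // stand-in for building an IMessage
                }
            });
            appender.start();
            appenders.add(appender);
        }
        fetcher.join();
        for (final Thread appender : appenders) {
            appender.join();
        }
        return processed.get();
    }

    public static void main(final String[] args) throws InterruptedException {
        System.out.println(processAll(1_000, 16, 4, 2)); // prints 1000
    }
}
```

Because the queue is bounded, a slow transform step back-pressures the download, which is why queueSize should exceed batchSize and fetchSize.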

Transform

Column calculators

Custom column calculators for specific columns can be registered on the IStoreMessageChannelFactory to transform the data received from the JDBC database as needed.

This example transforms a given column from a java.sql.Date to a java.time.LocalDate:

public final class LocalDateJdbcColumnCalculator extends AIndexedColumnCalculator<ResultSetRow> {

    /**
     * Constructor.
     *
     * @param columnName Name of the column to change.
     */
    public LocalDateJdbcColumnCalculator(final String columnName) {
        super(columnName, -1);
    }

    @Override
    public Object compute(final IColumnCalculationContext<ResultSetRow> context) {
        final Object value = context.getContext().getObject(this.sourcePosition);
        if (value == null) {
            return null;
        }
        return ((Date) value).toLocalDate();
    }
}

Handling dates and timestamps

The following column calculators are available:

  • LocalDateJdbcColumnCalculator (whose implementation is given as an example above) to transform java.sql.Date into java.time.LocalDate
  • LocalDateTimeJdbcColumnCalculator to transform java.sql.Timestamp into java.time.LocalDateTime
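Stripped of the Atoti machinery, the conversions performed by these two calculators are plain JDK calls:

```java
import java.sql.Date;
import java.sql.Timestamp;
import java.time.LocalDate;
import java.time.LocalDateTime;

// The JDK conversions performed by the two calculators above.
final class DateConversions {
    static LocalDate toLocalDate(final Date sqlDate) {
        return sqlDate.toLocalDate();
    }

    static LocalDateTime toLocalDateTime(final Timestamp timestamp) {
        return timestamp.toLocalDateTime();
    }

    public static void main(final String[] args) {
        System.out.println(toLocalDate(Date.valueOf("2024-01-31")));                   // prints 2024-01-31
        System.out.println(toLocalDateTime(Timestamp.valueOf("2024-01-31 10:30:00"))); // prints 2024-01-31T10:30
    }
}
```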

Handling vectors

The JDBC ARRAY type is automatically converted to an IVector whose underlying type depends on the type of the ARRAY in the database. To pick a specific vector type, use the VectorColumnCalculatorFactory and call the factory for native rows or for array rows. In the following example, the factory is called for native rows to create a calculator for a JDBC array of doubles:

final JdbcTopic topic =
    JdbcTopic.builder().topicName(topicName).query("SELECT ID, PNL_VECTOR FROM PNL").build();
source.addTopic(topic);

final IStoreMessageChannelFactory<String, ResultSetRow> channelFactory =
    new JdbcMessageChannelFactory(source, datastore);
final IColumnCalculator<ResultSetRow> calculator =
    VectorColumnCalculatorFactory.createForNativeRows(
        "PNL_VECTOR", ResultSetArrayFormat.JDBC_ARRAY, ContentType.DOUBLE_ARRAY);
channelFactory.setCalculatedColumns(topicName, PNL_STORE, List.of(calculator));

Some JDBC connectors serialize the ARRAY type into a JSON String. For this kind of vector, use the JSON ResultSetArrayFormat. In this case there is an inconsistency between the java.sql.ResultSet (JSON String) and the datastore table (IVector): when using these calculators, call JdbcMessageChannelFactory#setOverridingType(String, String, int) to set the type of the column to a vector type.
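Conceptually, parsing such a JSON-serialized vector amounts to something like the following JDK-only sketch. This is a simplified illustration, not the actual handling behind ResultSetArrayFormat.JSON, which belongs to Atoti; a real implementation would use a proper JSON parser and handle nulls and nesting.

```java
import java.util.Arrays;

// Minimal sketch: turn a JSON array string such as "[1.5, 2.0, -3.25]" into a double[].
final class JsonVectorSketch {
    static double[] parseDoubleVector(final String json) {
        final String body = json.trim();
        if (!body.startsWith("[") || !body.endsWith("]")) {
            throw new IllegalArgumentException("Not a JSON array: " + json);
        }
        final String inner = body.substring(1, body.length() - 1).trim();
        if (inner.isEmpty()) {
            return new double[0];
        }
        return Arrays.stream(inner.split(","))
            .mapToDouble(s -> Double.parseDouble(s.trim()))
            .toArray();
    }

    public static void main(final String[] args) {
        System.out.println(Arrays.toString(parseDoubleVector("[1.5, 2.0, -3.25]"))); // prints [1.5, 2.0, -3.25]
    }
}
```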

Load

To load data from a JDBC source, an implementation of the IStoreMessageChannelFactory interface must be provided. This object is used to create the channels required to load the data into the datastore.

For a JDBC source, com.activeviam.source.jdbc.api.AJdbcMessageChannelFactory can be extended to create a custom implementation of the IStoreMessageChannelFactory interface.

Out of the box, Atoti provides the following implementations:

  • JdbcMessageChannelFactory extends AJdbcMessageChannelFactory<ResultSetRow>: to use with IJdbcSourceBuilder.nativeRows()
  • ArrayJdbcMessageChannelFactory extends AJdbcMessageChannelFactory<Object[]>: to use with IJdbcSourceBuilder.arrayRows()

Pipeline execution

The JDBC source connects to an external service. Therefore, before executing the pipeline, the connection to the database is tested.

Fetch

Data can be fetched from the database by calling the IJdbcSource#fetch([...]) methods.

The fetch([...]) method retrieves all available data as defined by the topics exposed by the channels given as arguments, then fills the corresponding channels with the parsed results.

IJdbcSource#fetch([...]) performs most of the work of an IJdbcSource: it executes queries on a database and puts the parsed data into channels that feed an IDatastore.

For each entry in the input map (for instance, a MessageChannel-Parameters pair), fetch([...]) creates a task. These tasks are executed concurrently on a thread pool whose size is set by the poolSize property of the source.
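A simplified model of this dispatch, using only the JDK (not the Atoti implementation): one task per channel, all submitted to a fixed pool of poolSize threads, with the caller waiting for every task to complete. The channel names and the "fetched:" result are illustrative.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Simplified model of fetch([...]): one task per channel entry,
// executed concurrently on a fixed-size pool (poolSize).
final class FetchDispatchSketch {
    static List<String> fetchAll(final List<String> channels, final int poolSize)
            throws InterruptedException, ExecutionException {
        final ExecutorService pool = Executors.newFixedThreadPool(poolSize);
        try {
            final List<Future<String>> futures = new ArrayList<>();
            for (final String channel : channels) {
                // Stand-in for the per-channel fetch task.
                futures.add(pool.submit(() -> "fetched:" + channel));
            }
            final List<String> results = new ArrayList<>();
            for (final Future<String> future : futures) {
                results.add(future.get()); // wait for every task to complete
            }
            return results;
        } finally {
            pool.shutdown();
        }
    }

    public static void main(final String[] args) throws Exception {
        System.out.println(fetchAll(List.of("trades", "risks"), 2)); // prints [fetched:trades, fetched:risks]
    }
}
```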

Monitoring

Parsing reports

If the property parsingReportEnabled is set to true in the IJdbcSourceBuilder, the IJdbcSource#fetch methods return an IJdbcFetchingInfo object for each topic, which contains various statistics about the fetched data, such as which columns it contains, how many lines were published to the datastore and how long this process took.

Health Event Monitoring

The JDBC Source contributes to the Health Event Monitoring, under the tags jdbc and source.

The following snippet adds the basic implementation of the listener to the handler stack.

HealthEventDispatcher.INSTANCE.addEventHandler(new LoggingJdbcHealthEventHandler());

This uses the default logger to report all JDBC operations. By default, there is no filtering on the received events.