JDBC Source
Check out the example code snippets while reading this documentation.
Introduction
An IJdbcSource is a database-related implementation of the ISource interface.
ISource is the generic framework used in Atoti for fetching data from external data sources and contributing it to an Atoti IDatastore.
The purpose of the IJdbcSource is to load data from a database through a JDBC driver.
Parameters
An IJdbcSource is built with an IJdbcSourceBuilder created by IJdbcSource.builder().
First, choose one of these types of IJdbcSource, which mostly differ in the way the java.sql.ResultSet obtained by executing the query is parsed:
- IJdbcSourceBuilder.arrayRows() parses the ResultSet as Object[]
- IJdbcSourceBuilder.mapRows() parses the ResultSet as Map<String,Object>
- IJdbcSourceBuilder.nativeRows() parses the ResultSet as ResultSetRow
- IJdbcSourceBuilder.customRows() parses the ResultSet as specified by the provided IJdbcRowCreator
Then you can set several parameters:
- Connection information: an IJdbcSource connects to a given database by using the provided connection information, which is either:
  - the URL, username, password and class name of the JDBC driver, or
  - an implementation of the IConnectionSupplier functional interface, which wraps the connection information. IConnectionSupplier#createConnection() provides a java.sql.Connection and loads JDBC drivers according to the implementation.
- Source properties: these properties are either a field of the JDBC source or a property of one of its fields:
  - name: specifies the JDBC source name (defaults to JDBC_SOURCE).
  - poolSize: argument given to the source constructor that sets the size of the thread pool running the tasks that execute the queries and fill the channels (defaults to 2). It can be a limiting factor when fetching data of the underlying topic from numerous channels at the same time.
  - appendBatchSize: the maximum number of elements polled from the append queue by the append threads (defaults to 1000).
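The IConnectionSupplier option described above can be pictured as a one-method functional interface that defers opening the connection until it is needed. The sketch below is plain JDK code and deliberately does not use the Atoti type: ConnectionSupplierSketch and ConnectionSupplierExample are hypothetical stand-ins, and the URL, username and password are placeholders.

```java
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;

/** Hypothetical stand-in for a connection-supplying functional interface. */
@FunctionalInterface
interface ConnectionSupplierSketch {
  /** Opens a new connection each time it is called. */
  Connection createConnection() throws SQLException;
}

/** Hypothetical helper showing how connection information can be wrapped. */
final class ConnectionSupplierExample {
  private ConnectionSupplierExample() {}

  /**
   * Wraps the URL and credentials in a supplier. Nothing is opened here:
   * the driver lookup and the connection happen in createConnection().
   */
  static ConnectionSupplierSketch fromUrl(String url, String user, String password) {
    return () -> DriverManager.getConnection(url, user, password);
  }
}
```

Because the lambda is only evaluated on createConnection(), a missing driver or a wrong URL surfaces as a SQLException at connection time rather than when the supplier is built.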
An IJdbcSource does not need a topic to be instantiated, but topics must be added to the source before any query execution. This is done with IJdbcSource#addTopic(IJdbcTopic).
Here are the parameters a topic can take:
- topicName: the name of the topic.
- query: the string representation of the SQL query associated with the topic.
- nbAppendThreads: the number of threads which process the raw rows and put them in the database.
- appendQueueSize: the size of the append queue that receives the parsed records of a given task. This property should be set to a larger value than appendBatchSize and fetchSize, since those properties correspond to push/pop operations on a collection.
- chunkSize: the size of the data chunks which store the processed rows before sending them to the database.
- fetchSize: the number of rows retrieved at a time when reading the answer to the SQL request. While fetching data from a java.sql.ResultSet, only fetchSize-sized pieces of the ResultSet are fetched at once, which can be useful for controlling network congestion. This property can be the performance bottleneck of the source.
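The sizing guidance for appendQueueSize can be turned into a quick sanity check before building a topic. The sketch below is plain Java and calls no Atoti API; TopicSizingSketch and isQueueLargeEnough are hypothetical names chosen for illustration.

```java
/** Hypothetical helper, not part of Atoti: checks the sizing guidance for a topic. */
final class TopicSizingSketch {
  private TopicSizingSketch() {}

  /**
   * Returns true when the append queue is strictly larger than both the batch
   * polled by the append threads and the piece fetched from the ResultSet.
   */
  static boolean isQueueLargeEnough(int appendQueueSize, int appendBatchSize, int fetchSize) {
    return appendQueueSize > appendBatchSize && appendQueueSize > fetchSize;
  }
}
```

For instance, a queue of 10000 elements passes the check against an appendBatchSize of 1000 and a fetchSize of 5000, while a queue of 2000 elements does not, because a single fetched piece would not fit.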
While fetching data with an IJdbcSource, Atoti monitors the size of the append queue.
If the queue reaches full capacity, it becomes the limiting factor for the data fetching operation.
In that case, one can increase the size of the thread pool through JdbcTopic#nbAppendThreads, or change the appendBatchSize value of the JDBC Source.
Once the connection has been tested, data can be fetched from the database by calling the IJdbcSource#fetch([...]) methods.
The fetch([...]) method retrieves all available data as defined by the topics exposed by the channels given as arguments. These topics can be either SQL queries or java.sql.PreparedStatement statements.
Then it fills the corresponding channels with the parsed results.
ResultSet type and IStoreMessageChannelFactory implementations
To load data from a JDBC source, one must provide an implementation of the IStoreMessageChannelFactory interface.
This interface is used to create the channels that load the data into the datastore.
For a JDBC source, com.activeviam.source.jdbc.api.AJdbcMessageChannelFactory can be extended to create a custom implementation of the IStoreMessageChannelFactory interface.
Out of the box, Atoti provides the following implementations:
- JdbcMessageChannelFactory extends AJdbcMessageChannelFactory<ResultSetRow>: to use with IJdbcSourceBuilder.nativeRows()
- ArrayJdbcMessageChannelFactory extends AJdbcMessageChannelFactory<Object[]>: to use with IJdbcSourceBuilder.arrayRows()
Handling vectors
The JDBC ARRAY type is automatically converted to an IVector whose underlying type depends on the type of the ARRAY in the database.
To pick a specific vector type, use the VectorColumnCalculatorFactory and call the factory method for native rows or for array rows.
In this example, which calls the factory for native rows, we request a calculator for a JDBC array of doubles:
final IJdbcTopic topic = new JdbcTopic(topicName, "SELECT ID, PNL_VECTOR FROM PNL");
source.addTopic(topic);
final IStoreMessageChannelFactory<String, ResultSetRow> channelFactory =
    new JdbcMessageChannelFactory(source, datastore);
final IColumnCalculator<ResultSetRow> calculator =
    VectorColumnCalculatorFactory.createForNativeRows(
        "PNL_VECTOR", ResultSetArrayFormat.JDBC_ARRAY, ContentType.DOUBLE_ARRAY);
channelFactory.setCalculatedColumns(topicName, PNL_STORE, List.of(calculator));
Some JDBC connectors serialize the ARRAY type into a JSON String.
For this kind of vector, you have to use the JSON ResultSetArrayFormat.
As the type is inconsistent between the JDBC ResultSet (String) and the table (IVector) when using these calculators, one must also call JdbcMessageChannelFactory#setOverridingType(String, String, int) to set the type of the column as vector.
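To make the type mismatch concrete, the sketch below shows the kind of conversion needed when a connector returns a vector as a JSON String. It is plain JDK code and is not the Atoti calculator: JsonVectorSketch and parseDoubleArray are hypothetical names, and the parser only handles a flat JSON array of numbers.

```java
import java.util.Arrays;

/** Hypothetical helper, not part of Atoti: turns a JSON array String into doubles. */
final class JsonVectorSketch {
  private JsonVectorSketch() {}

  /** Parses a flat JSON array of numbers such as "[1.0, 2.5, -3]" into a double[]. */
  static double[] parseDoubleArray(String json) {
    String trimmed = json.trim();
    if (!trimmed.startsWith("[") || !trimmed.endsWith("]")) {
      throw new IllegalArgumentException("Not a JSON array: " + json);
    }
    // Strip the brackets and handle the empty array separately.
    String body = trimmed.substring(1, trimmed.length() - 1).trim();
    if (body.isEmpty()) {
      return new double[0];
    }
    return Arrays.stream(body.split(","))
        .map(String::trim)
        .mapToDouble(Double::parseDouble)
        .toArray();
  }
}
```

The ResultSet column holds a String while the store column holds a vector, which is why the column type has to be overridden in addition to registering the calculator.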
Column calculators
More generally, customized column calculators for specific columns can be registered on the IStoreMessageChannelFactory (as shown above for vectors) to transform data received from the JDBC database as requested.
This example transforms a given column from a java.sql.Date to a java.time.LocalDate:
public final class LocalDateJdbcColumnCalculator extends AIndexedColumnCalculator<ResultSetRow> {

  /**
   * Constructor.
   *
   * @param columnName Name of the column to change.
   */
  public LocalDateJdbcColumnCalculator(final String columnName) {
    super(columnName, -1);
  }

  @Override
  public Object compute(final IColumnCalculationContext<ResultSetRow> context) {
    final Object value = context.getContext().getObject(this.sourcePosition);
    if (value == null) {
      return null;
    }
    return ((Date) value).toLocalDate();
  }
}
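The conversion performed inside compute can be tried in isolation with the JDK alone. The sketch below assumes that Date in the calculator refers to java.sql.Date; SqlDateConversionSketch is a hypothetical class name used only for this illustration.

```java
import java.sql.Date;
import java.time.LocalDate;

/** Hypothetical helper isolating the null-safe Date to LocalDate conversion. */
final class SqlDateConversionSketch {
  private SqlDateConversionSketch() {}

  /** Returns null for a null input, otherwise the calendar date held by the java.sql.Date. */
  static LocalDate toLocalDate(Object value) {
    if (value == null) {
      return null;
    }
    return ((Date) value).toLocalDate();
  }
}
```

Keeping the null check before the cast matters because a NULL column value would otherwise cause a NullPointerException on toLocalDate().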
Fetching data
IJdbcSource#fetch([...]) performs most of the work of an IJdbcSource: it executes queries on a database and puts the parsed data in channels that feed into an IDatastore.
Also, if the PARSING_REPORT_ENABLED property is set to true, it returns an IJdbcFetchingInfo object for each topic, which contains various statistics about the fetched data, such as which columns it contains, how many lines were published to the datastore and how long this process took.
This is performed in parallel:
For each entry in the input map (for instance, a MessageChannel-Parameters pair), fetch([...]) creates a JdbcTask.
These tasks are executed concurrently on a thread pool sized by an input property of the source.
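The parallel execution described above can be pictured with a fixed thread pool from java.util.concurrent. This is a simplified sketch, not the Atoti implementation: ParallelFetchSketch, fetchAll and the choice of a Callable returning a row count per topic are assumptions made for illustration.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

/** Hypothetical sketch: one task per map entry, run on a pool of poolSize threads. */
final class ParallelFetchSketch {
  private ParallelFetchSketch() {}

  /** Submits every task, then waits for each result and returns the row count per topic. */
  static Map<String, Integer> fetchAll(Map<String, Callable<Integer>> tasksByTopic, int poolSize) {
    ExecutorService pool = Executors.newFixedThreadPool(poolSize);
    try {
      // Submit everything first so that up to poolSize tasks run concurrently.
      Map<String, Future<Integer>> pending = new LinkedHashMap<>();
      for (Map.Entry<String, Callable<Integer>> entry : tasksByTopic.entrySet()) {
        pending.put(entry.getKey(), pool.submit(entry.getValue()));
      }
      // Then collect the results in submission order.
      Map<String, Integer> rowCounts = new LinkedHashMap<>();
      for (Map.Entry<String, Future<Integer>> entry : pending.entrySet()) {
        rowCounts.put(entry.getKey(), entry.getValue().get());
      }
      return rowCounts;
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new IllegalStateException("Interrupted while waiting for tasks", e);
    } catch (ExecutionException e) {
      throw new IllegalStateException("A task failed", e.getCause());
    } finally {
      pool.shutdown();
    }
  }
}
```

With more entries than threads, the extra tasks wait in the executor's queue, which is why the pool size can limit throughput when many channels are fetched at once.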
Here is a sequence diagram displaying the execution of the fetch([...]) method on multiple channels.
JdbcTask and JdbcAppendRunnable are documented more thoroughly below.
JdbcTask
Each JdbcTask performs the following tasks:
1. Executes the query on the database.
2. Launches and keeps track of concurrent JdbcAppendRunnables on a thread pool sized by a property of the topic related to the JdbcTask.
3. Creates an append queue from which the runnables poll.
4. Fetches the corresponding java.sql.ResultSet. To avoid network congestion, data from the ResultSet is retrieved in fetchSize-sized pieces.
5. Iterates over the fetchSize-sized pieces of the ResultSet until the data is entirely retrieved:
   - Parses the data as a collection of records by using the implementation of the current source.
   - Feeds it to the append queue.
When the ResultSet has been entirely processed, the JdbcAppendRunnables are notified, and the JdbcTask waits for all the linked threads to terminate before terminating itself.
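The producer side of these steps can be sketched with a bounded java.util.concurrent.BlockingQueue. This is an illustration under simplifying assumptions, not the JdbcTask code: the rows are already in memory as Strings instead of coming from a ResultSet, "parsing" is reduced to trimming, and FetchPiecesSketch, pieces and feed are hypothetical names.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;

/** Hypothetical sketch of a task that reads rows piece by piece and feeds a queue. */
final class FetchPiecesSketch {
  private FetchPiecesSketch() {}

  /** Splits the rows into consecutive pieces of at most fetchSize elements. */
  static <T> List<List<T>> pieces(List<T> rows, int fetchSize) {
    if (fetchSize <= 0) {
      throw new IllegalArgumentException("fetchSize must be positive");
    }
    List<List<T>> result = new ArrayList<>();
    for (int start = 0; start < rows.size(); start += fetchSize) {
      int end = Math.min(start + fetchSize, rows.size());
      result.add(new ArrayList<>(rows.subList(start, end)));
    }
    return result;
  }

  /** Parses each piece and feeds the records to the append queue; returns the number fed. */
  static int feed(List<String> rawRows, int fetchSize, BlockingQueue<String> appendQueue) {
    int fed = 0;
    for (List<String> piece : pieces(rawRows, fetchSize)) {
      for (String raw : piece) {
        try {
          // put() blocks while the queue is full, so a slow consumer throttles the fetch.
          appendQueue.put(raw.trim());
        } catch (InterruptedException e) {
          Thread.currentThread().interrupt();
          throw new IllegalStateException("Interrupted while feeding the append queue", e);
        }
        fed++;
      }
    }
    return fed;
  }
}
```

The blocking put() is what makes a full append queue the limiting factor: the task stops reading further pieces until the append threads have made room.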
JdbcAppendRunnable
Each JdbcAppendRunnable performs the following tasks until the underlying JdbcTask sends notification that its append queue is empty and won't be filled anymore:
- Attempts to drain the queue by batchSize-sized pieces into a List<Record>.
- Pushes the records from the List into the channel's IMessage by chunkSize-sized pieces.
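The drain-then-publish loop can be sketched with BlockingQueue#drainTo. This is a simplified model rather than the JdbcAppendRunnable implementation: the end-of-input notification is represented by an AtomicBoolean, the channel by a Consumer of chunks, and AppendLoopSketch and drainAndPublish are hypothetical names.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;

/** Hypothetical sketch of an append loop: drain by batch, publish by chunk. */
final class AppendLoopSketch {
  private AppendLoopSketch() {}

  /** Runs until the producer is done and the queue is empty; returns the records published. */
  static <T> int drainAndPublish(
      BlockingQueue<T> queue,
      AtomicBoolean producerDone,
      int batchSize,
      int chunkSize,
      Consumer<List<T>> channel) {
    int published = 0;
    List<T> batch = new ArrayList<>(batchSize);
    try {
      while (true) {
        // Read the flag before polling: if it was already set and the queue is
        // then found empty, no further record can arrive.
        boolean done = producerDone.get();
        batch.clear();
        T first = queue.poll(10, TimeUnit.MILLISECONDS);
        if (first == null) {
          if (done) {
            return published;
          }
          continue;
        }
        batch.add(first);
        queue.drainTo(batch, batchSize - 1); // at most batchSize records per batch
        // Push the batch to the channel in chunkSize-sized pieces.
        for (int start = 0; start < batch.size(); start += chunkSize) {
          int end = Math.min(start + chunkSize, batch.size());
          List<T> chunk = new ArrayList<>(batch.subList(start, end));
          channel.accept(chunk);
          published += chunk.size();
        }
      }
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new IllegalStateException("Interrupted while draining the append queue", e);
    }
  }
}
```

In this model a larger batch size means fewer polls on the queue per record, while the chunk size controls how many records each message to the channel carries.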
Monitoring JDBC Source
The JDBC Source contributes to the Health Event Monitoring, under the tags jdbc and source.
The following snippet adds the basic implementation of the listener to the handler stack.
HealthEventDispatcher.INSTANCE.addEventHandler(new LoggingJdbcHealthEventHandler());
This uses the default logger to report all JDBC operations. By default, there is no filtering on the received events.