Introduction to JDBCSource
JDBCSource
A JDBCSource is a database-related implementation of the ISource interface, which is the generic framework used in ActivePivot for fetching data from an external source and putting it in an ActivePivot Datastore.
The purpose of the JDBCSource is to load data from a database, through a JDBC driver.
A JDBCSource has several parameters:
Connection information
- User Name/Password/Class of the JDBC Driver
OR
- Implementation of the IConnectionSupplier functional interface, which wraps the connection information. The createConnection() method of the connectionSupplier will provide a java.sql.Connection and load JDBC drivers according to the implementation.
JDBC Topics
Processing properties
- Size of the Tasks thread pool
- Size of the queue containing rows retrieved from the SQL query
- The max number of elements polled at once from the queue
A JDBCSource connects to a given Database by using the provided connection information.
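To illustrate the supplier-based option, here is a minimal sketch of a functional interface that wraps connection information and opens the connection only when asked. ConnectionSupplierSketch, its of(...) factory, the canConnect(...) helper and the jdbc:example:unknown URL are hypothetical stand-ins written for this example. They are not the library's IConnectionSupplier, whose exact signature should be checked in the API documentation.

```java
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;

final class ConnectionSupplierDemo {
    /** Returns true if the supplier can open (and then close) a connection. */
    static boolean canConnect(ConnectionSupplierSketch supplier) {
        try (Connection connection = supplier.createConnection()) {
            return connection != null;
        } catch (SQLException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        // Made-up URL: no registered driver is expected to accept it.
        ConnectionSupplierSketch supplier =
                ConnectionSupplierSketch.of("jdbc:example:unknown", "user", "secret");
        System.out.println("Connection available: " + canConnect(supplier));
    }
}

/** Hypothetical stand-in for a connection-supplying functional interface. */
@FunctionalInterface
interface ConnectionSupplierSketch {
    Connection createConnection() throws SQLException;

    /** Wraps URL and credentials; nothing is opened until createConnection() is called. */
    static ConnectionSupplierSketch of(String url, String user, String password) {
        return () -> DriverManager.getConnection(url, user, password);
    }
}
```

Because the interface is functional, a test can pass a lambda that returns an in-memory connection or throws, without touching the code that consumes the supplier.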
Once the connection has been tested, data can be fetched from the database by calling the fetch([...]) method:
void fetch(Map<IMessageChannel<String, T>, List<Object>> channelsAndParams);
A JDBCSource does not need a topic to be instantiated, but topics must be added to the source before any query execution. This is done with the addTopic([...]) method.
The fetch([...]) method retrieves all the available data defined by the topics exposed by the channels given as arguments. Each topic can be either a SQL query or a java.sql.PreparedStatement. The method then fills the corresponding channels with the parsed results.
The provided implementations of IJDBCSource extend AJDBCSource and mostly differ in the way the ResultSet obtained by executing the query is parsed:
- ArrayJDBCSource parses the ResultSet as Object[]
- MapJDBCSource parses the ResultSet as Map<String,Object>
- NativeJDBCSource parses the ResultSet as QfsResultRow
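The difference between the positional and keyed shapes can be sketched with plain Java collections. This is only an illustration: RowShapesSketch and its methods are hypothetical, the row is given as in-memory lists rather than read from a ResultSet, and the library's own parsing code may differ.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

final class RowShapesSketch {
    /** Positional shape: one slot per selected column, in SELECT order. */
    static Object[] asArray(List<Object> values) {
        return values.toArray();
    }

    /** Keyed shape: column label mapped to its value, keeping SELECT order. */
    static Map<String, Object> asMap(List<String> labels, List<Object> values) {
        if (labels.size() != values.size()) {
            throw new IllegalArgumentException("labels and values must have the same size");
        }
        Map<String, Object> row = new LinkedHashMap<>();
        for (int i = 0; i < labels.size(); i++) {
            row.put(labels.get(i), values.get(i));
        }
        return row;
    }

    public static void main(String[] args) {
        List<String> labels = Arrays.asList("DESK", "BOOK", "PNL");
        List<Object> values = Arrays.<Object>asList("Desk A", "Book 1", 12.5);
        System.out.println(Arrays.toString(asArray(values)));
        System.out.println(asMap(labels, values));
    }
}
```

The array shape is compact but relies on column order, while the map shape lets downstream code look values up by column label.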
Use Example
Create the ArrayJDBCSource source by using an IConnectionSupplier implementation which will attempt to use the H2 database driver:
final IConnectionSupplier supplier = new SimpleConnectionSupplier(
    url,
    username,
    pwd,
    driverClass);
final IJDBCSource<Object[]> source = new ArrayJDBCSource(
    supplier, sourceName, -1, -1);
By specifying the value -1 in the source constructor, the default values for the processing properties are used. However, the source properties can be modified for a more fine-tuned configuration of the source.
Create the topic, here a prepared statement, and add it to the source:
final JDBCTopic topic = new JDBCTopic(
    topicName,
    "SELECT DESK, BOOK, PNL FROM RECORDS WHERE DESK=?");
source.addTopic(topic);
Default topic parameters are implied here, but they can also be modified to tune the performance of the source.
An IMessageChannelFactory implementation creates the channels between the topics of the source and the stores of a datastore:
final JDBCMessageChannelFactory channelFactory =
    new JDBCMessageChannelFactory(source, datastore);
final IMessageChannel<String, QfsResultSetRow> channel =
    channelFactory.createChannel(topicName, store.toString());
Specify the requested values in the topic, then execute the query and fill the channel:
source.fetch(channel, Arrays.asList("Desk A"));
Fetching data
The fetch([...]) method performs most of the work of a JDBCSource, which is to execute queries on a database and put the parsed data in channels that will feed a Datastore.
This is performed in parallel:
For each entry in the input map (i.e. a MessageChannel-Parameters pair), fetch([...]) creates a JDBCTask. These tasks are executed concurrently on a thread pool whose size is set by an input property of the source.
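The one-task-per-entry pattern can be sketched with the JDK's ExecutorService. This is a simplified illustration under stated assumptions: ParallelFetchSketch is a hypothetical name, channels are represented by plain String names, and each task merely counts its parameters instead of running a query.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

final class ParallelFetchSketch {
    /**
     * Runs one task per (channel name, parameters) entry on a fixed-size pool and
     * returns, per channel name, the number of parameters its task received.
     */
    static Map<String, Integer> fetchAll(Map<String, List<Object>> channelsAndParams, int poolSize) {
        ExecutorService pool = Executors.newFixedThreadPool(poolSize);
        try {
            List<String> names = new ArrayList<>();
            List<Callable<Integer>> tasks = new ArrayList<>();
            for (Map.Entry<String, List<Object>> entry : channelsAndParams.entrySet()) {
                List<Object> params = entry.getValue();
                names.add(entry.getKey());
                // Stand-in for a real task, which would execute the query with these parameters.
                tasks.add(() -> params.size());
            }
            // invokeAll blocks until all tasks complete; futures keep the order of the task list.
            List<Future<Integer>> futures = pool.invokeAll(tasks);
            Map<String, Integer> results = new LinkedHashMap<>();
            for (int i = 0; i < names.size(); i++) {
                results.put(names.get(i), futures.get(i).get());
            }
            return results;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("Interrupted while fetching", e);
        } catch (ExecutionException e) {
            throw new IllegalStateException("A fetch task failed", e.getCause());
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        Map<String, List<Object>> input = new LinkedHashMap<>();
        input.put("deskChannel", Arrays.<Object>asList("Desk A"));
        input.put("bookChannel", Arrays.<Object>asList("Book 1", "Book 2"));
        System.out.println(fetchAll(input, 2));
    }
}
```

With a pool smaller than the number of entries, the extra tasks simply wait for a free thread, which is how a pool-size property bounds the number of queries running at once.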
Here is a sequence diagram displaying the execution of the fetch([...]) method on multiple channels. JDBCTask and JDBCAppendRunnable are documented more thoroughly below.
JDBCTask
Each JDBCTask has to perform the following tasks:
- Execute the query on the database.
- Launch and keep track of concurrent JDBCAppendRunnable instances on a thread pool whose size is set by a property of the Topic related to the JDBCTask. Create an appendQueue from which the runnables will poll.
- Fetch the corresponding java.sql.ResultSet. In order to avoid network congestion, data from the ResultSet is retrieved in fetchSize-sized pieces.
- Iterate over the fetchSize-sized pieces of the ResultSet until the data has been entirely retrieved:
  - Parse the data as a collection of Record by using the JDBCSource::createRecord() implementation of the current source.
  - Feed it to the appendQueue.
When the ResultSet has been entirely processed, the JDBCAppendRunnable instances are notified, and the JDBCTask waits for all the linked threads to terminate before terminating itself.
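The producer side of this pipeline can be sketched with a bounded BlockingQueue and a completion flag. This is a minimal sketch under stated assumptions, not the library's implementation: TaskPipelineSketch is a hypothetical name, rows come from an in-memory list instead of a ResultSet, and the workers only count rows instead of appending them to a channel.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

final class TaskPipelineSketch {
    /**
     * Feeds every row to a bounded queue while `appenders` workers consume it, then
     * signals completion and waits for the workers. Returns the number of rows consumed.
     */
    static int run(List<Object[]> rows, int appenders, int queueCapacity) {
        BlockingQueue<Object[]> appendQueue = new LinkedBlockingQueue<>(queueCapacity);
        AtomicBoolean producerDone = new AtomicBoolean(false);
        AtomicInteger consumed = new AtomicInteger();
        ExecutorService pool = Executors.newFixedThreadPool(appenders);
        try {
            for (int i = 0; i < appenders; i++) {
                pool.execute(() -> {
                    try {
                        while (true) {
                            Object[] row = appendQueue.poll(10, TimeUnit.MILLISECONDS);
                            if (row != null) {
                                consumed.incrementAndGet();
                            } else if (producerDone.get() && appendQueue.isEmpty()) {
                                return; // producer finished and nothing is left to consume
                            }
                        }
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                });
            }
            for (Object[] row : rows) {
                appendQueue.put(row); // blocks while the queue is full, throttling the producer
            }
            producerDone.set(true); // notify the workers that no more rows will arrive
            pool.shutdown();
            if (!pool.awaitTermination(30, TimeUnit.SECONDS)) {
                throw new IllegalStateException("Appenders did not terminate in time");
            }
            return consumed.get();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("Interrupted while feeding the queue", e);
        } finally {
            pool.shutdownNow();
        }
    }

    public static void main(String[] args) {
        List<Object[]> rows = new ArrayList<>();
        for (int i = 0; i < 1000; i++) {
            rows.add(new Object[] {"Desk A", "Book " + i, i * 1.5});
        }
        System.out.println("Rows consumed: " + run(rows, 3, 16));
    }
}
```

The bounded queue is the design choice worth noting: when the workers fall behind, put(...) blocks the producer, so memory use stays capped by the queue capacity rather than by the size of the result set.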
JDBCAppendRunnable
From its creation, and until it has been notified by the underlying JDBCTask that its appendQueue is empty and won't be filled anymore, each JDBCAppendRunnable will:
- Attempt to drain the queue in batchSize-sized pieces into a List<Record>.
- Push the Records from the List into the channel's IMessage in chunkSize-sized pieces.
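The drain-then-chunk step can be sketched with BlockingQueue.drainTo. This is an illustrative sketch only: DrainAndChunkSketch and drainInChunks are hypothetical names, plain integers stand in for Record objects, and the chunks are returned to the caller instead of being pushed into a channel message.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

final class DrainAndChunkSketch {
    /**
     * Drains up to batchSize elements from the queue, then splits that batch into
     * consecutive chunks of at most chunkSize elements.
     */
    static <T> List<List<T>> drainInChunks(BlockingQueue<T> queue, int batchSize, int chunkSize) {
        if (batchSize <= 0 || chunkSize <= 0) {
            throw new IllegalArgumentException("batchSize and chunkSize must be positive");
        }
        List<T> batch = new ArrayList<>(batchSize);
        // Non-blocking: takes whatever is currently available, up to batchSize elements.
        queue.drainTo(batch, batchSize);
        List<List<T>> chunks = new ArrayList<>();
        for (int start = 0; start < batch.size(); start += chunkSize) {
            int end = Math.min(start + chunkSize, batch.size());
            chunks.add(new ArrayList<>(batch.subList(start, end)));
        }
        return chunks;
    }

    public static void main(String[] args) {
        BlockingQueue<Integer> queue = new LinkedBlockingQueue<>();
        for (int i = 1; i <= 10; i++) {
            queue.add(i);
        }
        System.out.println(drainInChunks(queue, 7, 3)); // prints [[1, 2, 3], [4, 5, 6], [7]]
        System.out.println(queue);                      // prints [8, 9, 10]
    }
}
```

Draining in batches limits how often a worker contends on the queue, while the smaller chunk size bounds how much is handed to the channel in a single call.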
Monitoring JDBC Source
The JDBC Source contributes to the Health Event Monitoring, under the tags jdbc and source.
The following snippet adds the basic implementation of the listener to the handler stack.
HealthEventDispatcher.INSTANCE.addEventHandler(
    new LoggingJdbcHealthEventHandler());
This uses the default logger to report all JDBC operations. By default, there is no filtering on the received events.