JDBC Source
You should be familiar with the source ETL concepts before reading the JDBC specific documentation.
Check out the example code snippets while reading this documentation.
An IJdbcSource is a database-related implementation of the ISource interface.
ISource is the generic framework used in Atoti for fetching data from external data sources and contributing it to an Atoti IDatastore.
The purpose of the IJdbcSource is to load data from a database through a JDBC driver.
ETL pipeline
Here is a representation of the ETL pipeline for a JDBC source:
Extract
To perform the extraction step, Atoti relies on:
- a JDBC driver provided by the client
- a query to execute, which can be either:
  - a String, which will be passed to the JDBC driver
  - a java.sql.PreparedStatement provided by a com.activeviam.source.jdbc.api.IPreparedStatementSupplier.
This allows data to be extracted from a database into a java.sql.ResultSet, which can be used in the rest of the pipeline.
An IJdbcSource is built with an IJdbcSourceBuilder created by IJdbcSource.builder().
final IJdbcSource<ResultSetRow> source =
    IJdbcSource.builder()
        .nativeRows()
        .withConnectionInfo(properties.getProperty("url"), H2_DRIVER_CLASS, properties)
        .withName(NAME)
        .build();
Row types
First, choose one of these types of IJdbcSource, which mostly differ in the way the java.sql.ResultSet obtained by executing the query is parsed:
- IJdbcSourceBuilder.nativeRows() parses the ResultSet as ResultSetRow; it is the recommended option.
- IJdbcSourceBuilder.arrayRows() parses the ResultSet as Object[].
Native rows
A ResultSetRow is a thin wrapper around an Object[], with additional metadata:
- the java.sql.ResultSetMetaData provided by the JDBC driver.
- the Map<String, Integer> that maps the column names to their index in the Object[].
Native rows rely on the presence of the java.sql.ResultSetMetaData to provide a more ergonomic way to manipulate the result: for example, a specific column can be accessed by its name using ResultSetRow#getObject(String).
The java.sql.ResultSetMetaData is included in ResultSetRow to allow a developer to use this metadata in custom column calculators.
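To make the idea of a thin wrapper with a name-to-index map concrete, here is a minimal sketch in plain Java. NamedRow is a hypothetical class written for this illustration, not the actual ResultSetRow, and it assumes zero-based indexes into the underlying array.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Hypothetical stand-in for a row wrapper; this is NOT the Atoti ResultSetRow class. */
final class NamedRow {
  private final Object[] values;
  private final Map<String, Integer> indexByName;

  NamedRow(final String[] columnNames, final Object[] values) {
    if (columnNames.length != values.length) {
      throw new IllegalArgumentException("Column names and values must have the same length");
    }
    this.values = values.clone();
    // Assumption: indexes are zero-based positions in the Object[].
    this.indexByName = new LinkedHashMap<>();
    for (int i = 0; i < columnNames.length; i++) {
      this.indexByName.put(columnNames[i], i);
    }
  }

  /** Positional access, equivalent to reading a plain Object[]. */
  Object getObject(final int index) {
    return this.values[index];
  }

  /** Name-based access: resolves the name to an index, then reads the array. */
  Object getObject(final String columnName) {
    final Integer index = this.indexByName.get(columnName);
    if (index == null) {
      throw new IllegalArgumentException("Unknown column: " + columnName);
    }
    return this.values[index];
  }
}
```

With such a wrapper, a calculator can read a column by name and keeps working if the column order of the query changes, which is the ergonomic gain that array rows do not offer.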
Array rows
Array rows are the simplest way to parse the java.sql.ResultSet: each row becomes an Object[].
They are less ergonomic than ResultSetRow, as they require a developer to know the index of each column in the Object[].
Connection information
The JDBC source connects to a given database by using the provided connection information:
- URL, username, password and class name of the JDBC driver, OR
- An implementation of the IConnectionSupplier interface, which wraps the connection information. IConnectionSupplier#createConnection() must provide a java.sql.Connection and must load the JDBC driver.
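The two responsibilities of a connection supplier (loading the driver, then opening the connection) can be sketched with standard JDBC classes only. ConnectionSupplierSketch and DriverManagerConnectionSupplier are hypothetical names used for this illustration; the real IConnectionSupplier interface may declare a different signature.

```java
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;
import java.util.Properties;

/** Hypothetical interface for this sketch; NOT the Atoti IConnectionSupplier. */
interface ConnectionSupplierSketch {
  Connection createConnection() throws SQLException;
}

/** Loads the driver class explicitly, then delegates to java.sql.DriverManager. */
final class DriverManagerConnectionSupplier implements ConnectionSupplierSketch {
  private final String url;
  private final String driverClassName;
  private final Properties properties;

  DriverManagerConnectionSupplier(
      final String url, final String driverClassName, final Properties properties) {
    this.url = url;
    this.driverClassName = driverClassName;
    this.properties = properties;
  }

  @Override
  public Connection createConnection() throws SQLException {
    try {
      // Loading the class lets the driver register itself with DriverManager.
      Class.forName(this.driverClassName);
    } catch (final ClassNotFoundException e) {
      throw new SQLException("JDBC driver not found on the classpath: " + this.driverClassName, e);
    }
    // The properties typically carry the "user" and "password" entries.
    return DriverManager.getConnection(this.url, this.properties);
  }
}
```

Failing early with a clear message when the driver class is missing keeps configuration errors separate from actual connectivity problems.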
Optional parameters
Parameter | Description | Default value |
---|---|---|
name | Specifies the JDBC source name | "JDBC_SOURCE" |
poolSize | The size of the thread pool in which the task that executes and fills the channel is executed | 2 |
Topic creation
Topics do not need to exist before instantiating an IJdbcSource, but they must be added to the source before loading the data.
This is done with IJdbcSource#addTopic(IJdbcTopic).
Downloading the data from the database must be done in a mono-threaded fashion, due to the nature of the JDBC standard. However, once this step is performed, the data can be processed in parallel to be loaded into the datastore.
To create a JdbcTopic, use the fluent JdbcTopic.builder() method.
The following parameters can be set:
Parameter | Description |
---|---|
topicName | The name of the topic. |
query OR preparedStatementSupplier | Either the string representation of the query associated with the topic (usually a SQL query, but it can be, for example, a stringified JSON for NoSQL databases), or a user implementation of IPreparedStatementSupplier which provides a java.sql.PreparedStatement |
queueSize | The size of the queue that receives the downloaded records. Details: This property should be set at a larger value than |
appendThreads | The number of threads which process the raw rows that have been downloaded |
chunkSize | The size of the data chunks which store the processed rows before sending them to the datastore |
fetchSize | The size of one fetch performed by the JDBC driver. Details: This parameter is forwarded to |
batchSize | The max number of elements to poll from the append queue by the append threads |
While fetching data with an IJdbcSource, Atoti monitors the size of the append queue.
If the queue reaches full capacity, it becomes the limiting factor for the data fetching operation.
In that case, increase the number of append threads through appendThreads, or change the batchSize value of the JDBC source.
Overall view
The JDBC source operates with a given poolSize to process the tasks.
- Extract:
  - Each topic fetches data (fetchSize) from the external database and puts it in the append queue (queueSize)
  - This step is mono-threaded within each topic
- Transform:
  - Several threads (appendThreads) poll the data from the append queue in batches (batchSize) and transform it into IMessage instances
- Load:
  - The IMessage instances are loaded into the datastore in chunks (chunkSize)
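The interplay of queueSize, appendThreads, batchSize and chunkSize described above can be illustrated with a small producer/consumer sketch built on java.util.concurrent. This is not the Atoti implementation: PipelineSketch and its run method are hypothetical, rows are generated in memory instead of being fetched, and "loading" only counts rows.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical illustration of the three stages; none of these names are Atoti APIs. */
final class PipelineSketch {
  private PipelineSketch() {}

  /** Runs extract, transform and load on in-memory rows; returns the number of rows loaded. */
  static int run(
      final int rowCount,
      final int queueSize,
      final int appendThreads,
      final int batchSize,
      final int chunkSize) {
    final BlockingQueue<Object[]> appendQueue = new ArrayBlockingQueue<>(queueSize);
    final AtomicBoolean extractionDone = new AtomicBoolean(false);
    final AtomicInteger loaded = new AtomicInteger();
    final ExecutorService pool = Executors.newFixedThreadPool(appendThreads + 1);
    try {
      final List<Future<?>> tasks = new ArrayList<>();
      // Extract: a single thread produces the rows; put() blocks while the queue is full.
      tasks.add(
          pool.submit(
              () -> {
                for (int i = 0; i < rowCount; i++) {
                  appendQueue.put(new Object[] {i, "row-" + i});
                }
                extractionDone.set(true);
                return null;
              }));
      // Transform: several threads poll the queue, at most batchSize rows at a time.
      for (int t = 0; t < appendThreads; t++) {
        tasks.add(
            pool.submit(
                () -> {
                  final List<Object[]> batch = new ArrayList<>(batchSize);
                  final List<String> chunk = new ArrayList<>(chunkSize);
                  while (true) {
                    final Object[] first = appendQueue.poll(10, TimeUnit.MILLISECONDS);
                    if (first == null) {
                      if (extractionDone.get() && appendQueue.isEmpty()) {
                        break; // nothing left to process
                      }
                      continue;
                    }
                    batch.clear();
                    batch.add(first);
                    appendQueue.drainTo(batch, batchSize - 1);
                    for (final Object[] row : batch) {
                      chunk.add(row[0] + ":" + row[1]); // stand-in for building a message
                      if (chunk.size() == chunkSize) {
                        loaded.addAndGet(chunk.size()); // Load: a full chunk is sent
                        chunk.clear();
                      }
                    }
                  }
                  loaded.addAndGet(chunk.size()); // flush the last, partially filled chunk
                  return null;
                }));
      }
      for (final Future<?> task : tasks) {
        task.get();
      }
      return loaded.get();
    } catch (final InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new IllegalStateException(e);
    } catch (final ExecutionException e) {
      throw new IllegalStateException(e.getCause());
    } finally {
      pool.shutdownNow();
    }
  }
}
```

In this sketch, a full queue means the single extracting thread waits in put(), which mirrors the situation where the append queue is the limiting factor and more append threads or a different batch size may help.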
Transform
Column calculators
Custom column calculators can be registered on the IStoreMessageChannelFactory for specific columns, to transform the data received from the database as required.
This example transforms a given column from a java.sql.Date to a java.time.LocalDate:
public final class LocalDateJdbcColumnCalculator extends AIndexedColumnCalculator<ResultSetRow> {

  /**
   * Constructor.
   *
   * @param columnName Name of the column to change.
   */
  public LocalDateJdbcColumnCalculator(final String columnName) {
    super(columnName, -1);
  }

  @Override
  public Object compute(final IColumnCalculationContext<ResultSetRow> context) {
    // Read the raw value of the source column from the current row.
    final Object value = context.getContext().getObject(this.sourcePosition);
    if (value == null) {
      return null;
    }
    // The column is expected to hold a java.sql.Date.
    return ((java.sql.Date) value).toLocalDate();
  }
}
Handling dates and timestamps
The following column calculators are available:
- LocalDateJdbcColumnCalculator (whose implementation is given as an example above) to transform java.sql.Date into java.time.LocalDate
- LocalDateTimeJdbcColumnCalculator to transform java.sql.Timestamp into java.time.LocalDateTime
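The two conversions listed above rely on standard JDK methods. The sketch below isolates them from any Atoti class, so JdbcTemporalConversions is a hypothetical helper written for this illustration; it only shows the null handling and the casts a calculator would perform.

```java
import java.sql.Date;
import java.sql.Timestamp;
import java.time.LocalDate;
import java.time.LocalDateTime;

/** Hypothetical helper isolating the JDK conversions; not part of the Atoti API. */
final class JdbcTemporalConversions {
  private JdbcTemporalConversions() {}

  /** Converts a java.sql.Date read from a row into a LocalDate, keeping SQL NULL as null. */
  static LocalDate toLocalDate(final Object value) {
    return value == null ? null : ((Date) value).toLocalDate();
  }

  /** Converts a java.sql.Timestamp read from a row into a LocalDateTime, keeping null as null. */
  static LocalDateTime toLocalDateTime(final Object value) {
    return value == null ? null : ((Timestamp) value).toLocalDateTime();
  }
}
```

Both conversions interpret the value in the JVM default time zone, which matters if the database and the application run in different zones.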
Handling vectors
A JDBC ARRAY type is automatically converted to an IVector whose underlying type depends on the type of the ARRAY in the database.
To pick a specific vector type, use the VectorColumnCalculatorFactory and call its factory method for native rows or for array rows.
In this example, the factory method for native rows is called to create a calculator for a JDBC array of doubles:
final JdbcTopic topic =
    JdbcTopic.builder().topicName(topicName).query("SELECT ID, PNL_VECTOR FROM PNL").build();
source.addTopic(topic);
final IStoreMessageChannelFactory<String, ResultSetRow> channelFactory =
    new JdbcMessageChannelFactory(source, datastore);
final IColumnCalculator<ResultSetRow> calculator =
    VectorColumnCalculatorFactory.createForNativeRows(
        "PNL_VECTOR", ResultSetArrayFormat.JDBC_ARRAY, ContentType.DOUBLE_ARRAY);
channelFactory.setCalculatedColumns(topicName, PNL_STORE, List.of(calculator));
Some JDBC connectors serialize the ARRAY type into a JSON String.
For this kind of vector, use the JSON ResultSetArrayFormat.
In this case, there is a type mismatch between the java.sql.ResultSet (JSON String) and the datastore table (IVector).
When using these calculators, call JdbcMessageChannelFactory#setOverridingType(String, String, int) to set the type of the column to vector.
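To show why the column type differs between the result set and the datastore, here is a minimal sketch of the kind of conversion involved: a JSON String such as "[1.0, 2.5]" becomes an array of doubles. JsonVectorParsing is a hypothetical helper, not the parser used by Atoti, and it only handles a flat array of plain numbers.

```java
/** Hypothetical helper; a simplified stand-in for a JSON array parser. */
final class JsonVectorParsing {
  private JsonVectorParsing() {}

  /** Parses a flat JSON array of numbers, for instance "[1.0, 2.5, -3]", into a double[]. */
  static double[] parseDoubleArray(final String json) {
    final String trimmed = json.trim();
    if (!trimmed.startsWith("[") || !trimmed.endsWith("]")) {
      throw new IllegalArgumentException("Not a JSON array: " + json);
    }
    final String body = trimmed.substring(1, trimmed.length() - 1).trim();
    if (body.isEmpty()) {
      return new double[0];
    }
    final String[] parts = body.split(",");
    final double[] result = new double[parts.length];
    for (int i = 0; i < parts.length; i++) {
      result[i] = Double.parseDouble(parts[i].trim());
    }
    return result;
  }
}
```

A production setup would rely on a real JSON parser; the point here is only that the value read from the driver is a String while the value written to the datastore is a vector.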
Load
To load data from a JDBC source, an implementation of the IStoreMessageChannelFactory interface must be provided.
This object is used to create the channels required to load the data into the datastore.
For a JDBC source, com.activeviam.source.jdbc.api.AJdbcMessageChannelFactory can be extended to create a custom implementation of the IStoreMessageChannelFactory interface.
Out of the box, Atoti provides the following implementations:
- JdbcMessageChannelFactory extends AJdbcMessageChannelFactory<ResultSetRow>: to use with IJdbcSourceBuilder.nativeRows()
- ArrayJdbcMessageChannelFactory extends AJdbcMessageChannelFactory<Object[]>: to use with IJdbcSourceBuilder.arrayRows()
Pipeline execution
The JDBC source connects to an external service. Therefore, before executing the pipeline, the connection to the database is tested.
Fetch
Data can be fetched from the database by calling the IJdbcSource#fetch([...]) methods.
The fetch([...]) method retrieves all available data as defined by the topics exposed by the channels given as arguments.
Then it fills the corresponding channels with the parsed results.
IJdbcSource#fetch([...]) performs most of the work of an IJdbcSource: it executes queries on a database and puts the parsed data in channels that feed into an IDatastore.
Also, if the parsingReportEnabled property is set to true in the source, it returns an IJdbcFetchingInfo object for each topic, which contains various statistics about the fetched data, such as which columns it contains, how many lines were published to the datastore and how long this process took.
For each entry in the input map (for instance, a MessageChannel-Parameters pair), fetch([...]) creates a task.
Those tasks are executed concurrently on a thread pool whose size is set by a property of the source (poolSize).
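The "one task per entry, executed on a fixed-size pool" behavior can be sketched with a plain ExecutorService. ConcurrentFetchSketch is a hypothetical class: each task here simply returns a row count, whereas a real fetch task would execute a query and fill a channel.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

/** Hypothetical illustration of per-topic tasks on a fixed pool; not the Atoti fetch method. */
final class ConcurrentFetchSketch {
  private ConcurrentFetchSketch() {}

  /** Submits one task per topic and waits for all of them; returns the row count per topic. */
  static Map<String, Integer> fetchAll(
      final Map<String, Callable<Integer>> tasksByTopic, final int poolSize) {
    final ExecutorService pool = Executors.newFixedThreadPool(poolSize);
    try {
      // At most poolSize tasks run at the same time; the others wait in the pool queue.
      final Map<String, Future<Integer>> futures = new LinkedHashMap<>();
      tasksByTopic.forEach((topic, task) -> futures.put(topic, pool.submit(task)));
      final Map<String, Integer> rowCounts = new LinkedHashMap<>();
      for (final Map.Entry<String, Future<Integer>> entry : futures.entrySet()) {
        rowCounts.put(entry.getKey(), entry.getValue().get());
      }
      return rowCounts;
    } catch (final InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new IllegalStateException(e);
    } catch (final ExecutionException e) {
      throw new IllegalStateException(e.getCause());
    } finally {
      pool.shutdown();
    }
  }
}
```

With more topics than threads in the pool, the extra tasks start only when a running one finishes, so the pool size bounds the number of simultaneous database queries in this sketch.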
Monitoring
Parsing reports
If the parsingReportEnabled property is set to true in the IJdbcSourceBuilder, the IJdbcSource#fetch methods return an IJdbcFetchingInfo object for each topic, which contains various statistics about the fetched data, such as which columns it contains, how many lines were published to the datastore and how long this process took.
Health Event Monitoring
The JDBC Source contributes to the Health Event Monitoring, under the tags jdbc and source.
The following snippet adds the basic implementation of the listener to the handler stack.
HealthEventDispatcher.INSTANCE.addEventHandler(new LoggingJdbcHealthEventHandler());
This uses the default logger to report all JDBC operations. By default, there is no filtering on the received events.