
# JDBC Source

> You should be familiar with the [source ETL concepts](./data_sources_introduction) before reading the JDBC-specific documentation.

> Check out the example [code snippets](how-to/load_data_from_jdbc_in_store) while reading this documentation.

An `IJdbcSource` is a database-related implementation of the `ISource` interface.
`ISource` is the generic framework used in Atoti for fetching data from external data sources and contributing it to an Atoti `IDatastore`.

The purpose of the `IJdbcSource` is to load data from a database, through a JDBC driver.

## ETL pipeline

Here is a representation of the ETL pipeline for a JDBC source:

```mermaid theme={"languages":{"custom":["/engine/python-sdk/0.9/languages/pycon.tmLanguage.json"]}}
%%{init: {'sequence': { 'mirrorActors': false}}}%%
sequenceDiagram
    participant Database as External Database
    participant Queue as Queue
    participant Message as Message
    participant Datastore as Datastore
    Database-->>Queue: Extract
    Queue-->>Message: Transform
    Message-->>Datastore: Load
```

### Extract

To perform the extraction step, Atoti relies on:

* a JDBC driver provided by the client
* a query to execute, which can be either:
  * a `String`, which will be passed to the JDBC driver
  * a `java.sql.PreparedStatement` provided by a `com.activeviam.source.jdbc.api.IPreparedStatementSupplier`.

This step extracts data from the database as a `java.sql.ResultSet`, which is used in the rest of the pipeline.

An `IJdbcSource` is built with an `IJdbcSourceBuilder` created by `IJdbcSource.builder()`.

```java theme={"languages":{"custom":["/engine/python-sdk/0.9/languages/pycon.tmLanguage.json"]}}
final IJdbcSource<ResultSetRow> source =
    IJdbcSource.builder()
        .nativeRows()
        .withConnectionInfo(properties.getProperty("url"), H2_DRIVER_CLASS, properties)
        .withName(NAME)
        .build();
```

#### Row types

First, choose one of the following types of `IJdbcSource`. They mostly differ in how the `java.sql.ResultSet` obtained by executing the query is parsed:

* `IJdbcSourceBuilder.nativeRows()` parses the `ResultSet` as `ResultSetRow`. This is the recommended option.
* `IJdbcSourceBuilder.arrayRows()` parses the `ResultSet` as `Object[]`.

##### Native rows

A `ResultSetRow` is a thin wrapper around an `Object[]`, with additional metadata:

* the `java.sql.ResultSetMetaData` provided by the JDBC driver.
* the `Map<String, Integer>` that maps the column names to their index in the `Object[]`.

Native rows rely on the presence of the `java.sql.ResultSetMetaData` to provide a more ergonomic way to manipulate the result: for example, a specific column can be accessed by its name using `ResultSetRow#getObject(String)`.
The `java.sql.ResultSetMetaData` is included in `ResultSetRow` to allow a developer to use this metadata in custom column calculators.

##### Array rows

Array rows are the simplest way to parse the `java.sql.ResultSet`: each row is an `Object[]`.
They are less ergonomic than `ResultSetRow`, as they require the developer to know the index of each column in the `Object[]`.
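
The difference between the two row types can be illustrated with a minimal sketch. `NamedRowSketch` below is a hypothetical stand-in written only for this illustration. It is not the real `ResultSetRow` class and it omits the `java.sql.ResultSetMetaData`. It only shows how a name-to-index map on top of an `Object[]` enables access by column name in addition to access by position.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Hypothetical stand-in for a named row; this is NOT the real ResultSetRow class. */
final class NamedRowSketch {
  private final Object[] values;
  private final Map<String, Integer> indexByName;

  NamedRowSketch(final String[] columnNames, final Object[] values) {
    if (columnNames.length != values.length) {
      throw new IllegalArgumentException("Expected one value per column");
    }
    this.values = values.clone();
    this.indexByName = new LinkedHashMap<>();
    for (int i = 0; i < columnNames.length; i++) {
      this.indexByName.put(columnNames[i], i);
    }
  }

  /** Positional access, which is all a bare Object[] offers. */
  Object getObject(final int index) {
    return this.values[index];
  }

  /** Named access, resolved through the name-to-index map. */
  Object getObject(final String columnName) {
    final Integer index = this.indexByName.get(columnName);
    if (index == null) {
      throw new IllegalArgumentException("Unknown column: " + columnName);
    }
    return this.values[index];
  }
}
```

With array rows, only the positional form is available, so a change in the column order of the query silently changes what each index means.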

#### Connection information

The JDBC source connects to a given database by using the provided connection information:

* The URL, username, password and class name of the JDBC driver.
  **OR**
* An implementation of the `IConnectionSupplier` interface, which wraps the connection information. `IConnectionSupplier#createConnection()` must provide a `java.sql.Connection` and must load the JDBC driver.
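
As an illustration of the second option, here is a minimal sketch of a supplier that loads the driver and opens a connection using only the JDK. The `ConnectionSupplierSketch` interface is a hypothetical stand-in for `IConnectionSupplier`: its exact signature, including the `throws SQLException` clause, is an assumption, as is the `DriverManagerConnectionSupplier` class name.

```java
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;
import java.util.Properties;

/** Hypothetical stand-in for IConnectionSupplier; the real signature may differ. */
interface ConnectionSupplierSketch {
  Connection createConnection() throws SQLException;
}

final class DriverManagerConnectionSupplier implements ConnectionSupplierSketch {
  private final String url;
  private final String driverClassName;
  private final Properties properties;

  DriverManagerConnectionSupplier(
      final String url, final String driverClassName, final Properties properties) {
    this.url = url;
    this.driverClassName = driverClassName;
    this.properties = properties;
  }

  @Override
  public Connection createConnection() throws SQLException {
    try {
      // Loading the class lets the driver register itself with DriverManager.
      Class.forName(this.driverClassName);
    } catch (ClassNotFoundException e) {
      throw new SQLException("JDBC driver not on the classpath: " + this.driverClassName, e);
    }
    // The properties typically carry the "user" and "password" entries.
    return DriverManager.getConnection(this.url, this.properties);
  }
}
```

Wrapping the `ClassNotFoundException` in a `SQLException` is a design choice of this sketch, so that callers only have to handle one exception type.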

#### Optional parameters

| Parameter  | Description                                                                                   | Default value   |
| ---------- | --------------------------------------------------------------------------------------------- | --------------- |
| `name`     | Specifies the JDBC source name                                                                | `"JDBC_SOURCE"` |
| `poolSize` | The size of the thread pool running the tasks that execute queries and fill the channels      | 2               |

#### Topic creation

Topics do not need to exist before instantiating an `IJdbcSource`, but they must be added to the source before loading the data.
This is done with `IJdbcSource#addTopic(IJdbcTopic)`.

Downloading the data from the database must be done in a single-threaded fashion, due to the nature of the JDBC standard.
However, once this step is performed, the data can be processed in parallel to be loaded into the datastore.

In order to create a `JdbcTopic`, use the fluent `JdbcTopic.builder()` method.
The following parameters can be set:

| Parameter                                                        | Description                                                                                                                                                                                                                                                                                                          |
| ---------------------------------------------------------------- | -------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- |
| `topicName`                                                      | The name of the topic.                                                                                                                                                                                                                                                                                               |
| `query`<br /><br />**OR**<br /><br />`preparedStatementSupplier` | The string representation of the query associated with the topic (usually a SQL query, but it can be a stringified JSON for NoSQL databases for example)<br /><br />A user implementation of `IPreparedStatementSupplier` which provides a `java.sql.PreparedStatement`                                              |
| `queueSize`                                                      | The size of the queue that receives the downloaded records. <details><p>This property should be set to a larger value than <code>batchSize</code> and <code>fetchSize</code>, since those properties correspond to push/pop operations on the queue.</p></details>                                                   |
| `appendThreads`                                                  | The number of threads which process the raw rows that have been downloaded                                                                                                                                                                                                                                           |
| `chunkSize`                                                      | The size of the data chunks which store the processed rows before sending them to the datastore                                                                                                                                                                                                                      |
| `fetchSize`                                                      | The size of one fetch performed by the JDBC driver. <details><p>This parameter is forwarded to <code>java.sql.ResultSet#setFetchSize()</code> and can be ignored by the JDBC driver. It can be useful for controlling network congestion, which can be the performance bottleneck of the source.</p></details>       |
| `batchSize`                                                      | The max number of elements to poll from the append queue by the append threads                                                                                                                                                                                                                                       |

While fetching data with an `IJdbcSource`, Atoti monitors the size of the append queue.
If the queue reaches full capacity, it becomes the limiting factor for the data fetching operation.
In that case, increase the size of the thread pool through `appendThreads`, or change the `batchSize` value of the JDBC Source.

### Overall view

```mermaid theme={"languages":{"custom":["/engine/python-sdk/0.9/languages/pycon.tmLanguage.json"]}}
%%{init: {'sequence': { 'mirrorActors': false}}}%%
sequenceDiagram
    participant Database as External Database
    participant Queue as Queue
    participant Message as Message
    participant Datastore as Datastore
    par Topic
      Note over Queue: queueSize
      Database-->>Queue: Extract
      Note over Database,Queue: fetchSize
      par
          Note over Queue,Datastore: appendThreads
          Queue-->>Message: Transform
          Note over Queue,Message: batchSize
          Message-->>Datastore: Load
          Note over Message,Datastore: chunkSize
      and
          Queue-->>Message: Transform
          Note over Queue,Message: batchSize
          Message-->>Datastore: Load
          Note over Message,Datastore: chunkSize
      end
    and Topic
      Note over Queue: queueSize
      Database-->>Queue: Extract
      Note over Database,Queue: fetchSize
      par
          Note over Queue,Datastore: appendThreads
          Queue-->>Message: Transform
          Note over Queue,Message: batchSize
          Message-->>Datastore: Load
          Note over Message,Datastore: chunkSize
      and
          Queue-->>Message: Transform
          Note over Queue,Message: batchSize
          Message-->>Datastore: Load
          Note over Message,Datastore: chunkSize
      end
    end
```

The JDBC source operates with a given `poolSize` to process the tasks.

* Extract:
  * Each topic will fetch data (`fetchSize`) from the external database and put it in the append queue (`queueSize`)
  * This step is single-threaded within each topic
* Transform:
  * Several threads (`appendThreads`) poll the data from the append queue in batches (`batchSize`) and transform it into an `IMessage`
* Load:
  * The `IMessage` instances are loaded into the datastore in chunks (`chunkSize`)
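
The interaction between `queueSize`, `appendThreads` and `batchSize` can be sketched with plain `java.util.concurrent` classes. This is a simplified model under stated assumptions, not the Atoti implementation: the class `AppendQueueSketch` and its `run` method are hypothetical, rows are generated in memory instead of being fetched from a database, and `chunkSize` is not modeled.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical model of one topic: one extracting thread, several append threads. */
final class AppendQueueSketch {

  /** Returns the number of rows that went through the append threads. */
  static int run(
      final int rowCount, final int queueSize, final int appendThreads, final int batchSize) {
    final BlockingQueue<Object[]> queue = new ArrayBlockingQueue<>(queueSize);
    final AtomicBoolean extractionDone = new AtomicBoolean(false);
    final AtomicInteger loadedRows = new AtomicInteger();
    final ExecutorService appenders = Executors.newFixedThreadPool(appendThreads);

    for (int t = 0; t < appendThreads; t++) {
      appenders.execute(() -> {
        final List<Object[]> batch = new ArrayList<>(batchSize);
        try {
          while (true) {
            // Read the flag before polling: if it was already set and the queue is
            // still empty afterwards, no more rows can arrive.
            final boolean done = extractionDone.get();
            final Object[] first = queue.poll(10, TimeUnit.MILLISECONDS);
            if (first == null) {
              if (done) {
                return;
              }
              continue;
            }
            batch.clear();
            batch.add(first);
            // Take up to batchSize rows in total in one pass.
            queue.drainTo(batch, batchSize - 1);
            // A real pipeline would transform and load the batch here.
            loadedRows.addAndGet(batch.size());
          }
        } catch (InterruptedException e) {
          Thread.currentThread().interrupt();
        }
      });
    }

    try {
      // Single-threaded extraction: put() blocks whenever the queue is full.
      for (int i = 0; i < rowCount; i++) {
        queue.put(new Object[] {i});
      }
      extractionDone.set(true);
      appenders.shutdown();
      if (!appenders.awaitTermination(1, TimeUnit.MINUTES)) {
        throw new IllegalStateException("Append threads did not finish in time");
      }
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new IllegalStateException("Interrupted while extracting", e);
    }
    return loadedRows.get();
  }
}
```

In this model, a full queue makes the extracting thread block in `put()`. This mirrors the situation where the append queue becomes the limiting factor and more append threads or a different batch size may help.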

### Transform

#### Column calculators

Custom column calculators can be registered on the `IStoreMessageChannelFactory` for specific columns, to transform the data received from the database as needed.

This example transforms a given column from a `java.sql.Date` to a `java.time.LocalDate`:

```java theme={"languages":{"custom":["/engine/python-sdk/0.9/languages/pycon.tmLanguage.json"]}}
// Atoti imports are omitted here.
import java.sql.Date;

/** Converts a {@code java.sql.Date} column into a {@code java.time.LocalDate}. */
public final class LocalDateJdbcColumnCalculator extends AIndexedColumnCalculator<ResultSetRow> {
  /**
   * Constructor.
   *
   * @param columnName Name of the column to change.
   */
  public LocalDateJdbcColumnCalculator(final String columnName) {
    super(columnName, -1);
  }

  @Override
  public Object compute(final IColumnCalculationContext<ResultSetRow> context) {
    final Object value = context.getContext().getObject(this.sourcePosition);
    // Propagate SQL NULL as a null value.
    if (value == null) {
      return null;
    }
    // java.sql.Date (not java.util.Date) provides toLocalDate().
    return ((Date) value).toLocalDate();
  }
}
```

#### Handling numbers

The column calculator `NumberJdbcColumnCalculator` is available to easily convert any `java.lang.Number` (boxed types, `BigInteger`, ...) to the desired type.
This column calculator is not required when the types already match (e.g. from `Double` to `double`, ...).
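
To illustrate the kind of conversion involved, here is a minimal sketch based only on the accessor methods of `java.lang.Number`. The class `NumberConversionSketch` and its `convert` method are hypothetical and do not reflect how `NumberJdbcColumnCalculator` is implemented.

```java
import java.math.BigDecimal;
import java.math.BigInteger;

/** Hypothetical helper; it does not reflect the NumberJdbcColumnCalculator internals. */
final class NumberConversionSketch {

  /** Converts any Number to the requested boxed type, or returns null for a null input. */
  static Object convert(final Number value, final Class<? extends Number> target) {
    if (value == null) {
      return null;
    }
    if (target == Integer.class) {
      return value.intValue();
    }
    if (target == Long.class) {
      return value.longValue();
    }
    if (target == Float.class) {
      return value.floatValue();
    }
    if (target == Double.class) {
      return value.doubleValue();
    }
    throw new IllegalArgumentException("Unsupported target type: " + target);
  }

  public static void main(final String[] args) {
    // A DECIMAL column is typically returned as a BigDecimal by JDBC drivers.
    System.out.println(convert(new BigDecimal("12.5"), Double.class));
    System.out.println(convert(BigInteger.valueOf(42), Long.class));
  }
}
```

Note that the `Number` accessors narrow silently: converting a large `BigInteger` with `intValue()` truncates it instead of failing.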

#### Handling dates and timestamps

The following column calculators are available:

* `LocalDateJdbcColumnCalculator` (whose implementation is given as an example above) to transform `java.sql.Date` into `java.time.LocalDate`
* `LocalDateTimeJdbcColumnCalculator` to transform `java.sql.Timestamp` into `java.time.LocalDateTime`
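
Both calculators rely on conversions that the JDK already provides. The following sketch shows the two conversions side by side. The class `JdbcTemporalSketch` is hypothetical and only wraps the standard `java.sql.Date#toLocalDate()` and `java.sql.Timestamp#toLocalDateTime()` methods with a null check.

```java
import java.sql.Date;
import java.sql.Timestamp;
import java.time.LocalDate;
import java.time.LocalDateTime;

/** Hypothetical helper wrapping the JDK conversions with null handling. */
final class JdbcTemporalSketch {

  /** Converts a java.sql.Date value, keeping SQL NULL as null. */
  static LocalDate toLocalDate(final Object value) {
    return value == null ? null : ((Date) value).toLocalDate();
  }

  /** Converts a java.sql.Timestamp value, keeping SQL NULL as null. */
  static LocalDateTime toLocalDateTime(final Object value) {
    return value == null ? null : ((Timestamp) value).toLocalDateTime();
  }
}
```

Both conversions interpret the value in the default time zone of the JVM, which is worth keeping in mind when the database and the application run in different zones.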

#### Handling vectors

The JDBC `ARRAY` type is automatically converted to an `IVector` whose underlying type depends on the type of the `ARRAY` in the database.
To pick a specific vector type, use the `VectorColumnCalculatorFactory` and call the factory method for native rows or for array rows.
The following example calls the factory for native rows to create a calculator for a JDBC array of doubles:

```java theme={"languages":{"custom":["/engine/python-sdk/0.9/languages/pycon.tmLanguage.json"]}}
final JdbcTopic topic =
    JdbcTopic.builder().topicName(topicName).query("SELECT ID, PNL_VECTOR FROM PNL").build();
source.addTopic(topic);
final IStoreMessageChannelFactory<String, ResultSetRow> channelFactory =
    new JdbcMessageChannelFactory(source, datastore);
final IColumnCalculator<ResultSetRow> calculator =
    VectorColumnCalculatorFactory.createForNativeRows(
        "PNL_VECTOR", ResultSetArrayFormat.JDBC_ARRAY, ContentType.DOUBLE_ARRAY);
channelFactory.setCalculatedColumns(topicName, PNL_STORE, List.of(calculator));
```

Some JDBC connectors serialize the `ARRAY` type into a JSON `String`.
For this kind of vector, use the JSON `ResultSetArrayFormat`.
In this case, the type in the `java.sql.ResultSet` (JSON `String`) does not match the type in the datastore table (`IVector`).
When using these calculators, call `JdbcMessageChannelFactory#setOverridingType(String, String, int)` to set the type of the column as vector.
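
To make the type mismatch concrete, the sketch below parses a flat JSON array of numbers, as such a connector might return it, into a `double[]`. It is a simplified illustration that assumes a flat array without `null` elements or nesting. The class `JsonVectorSketch` is hypothetical and unrelated to the parsing performed by the Atoti calculators.

```java
/** Hypothetical parser for a flat JSON array of numbers such as "[1.0, 2.5, -3]". */
final class JsonVectorSketch {

  static double[] parseDoubleArray(final String json) {
    final String trimmed = json.trim();
    if (!trimmed.startsWith("[") || !trimmed.endsWith("]")) {
      throw new IllegalArgumentException("Not a JSON array: " + json);
    }
    final String body = trimmed.substring(1, trimmed.length() - 1).trim();
    if (body.isEmpty()) {
      return new double[0];
    }
    final String[] tokens = body.split(",");
    final double[] result = new double[tokens.length];
    for (int i = 0; i < tokens.length; i++) {
      // Double.parseDouble rejects anything that is not a plain number.
      result[i] = Double.parseDouble(tokens[i].trim());
    }
    return result;
  }
}
```

The value read from the `java.sql.ResultSet` is a `String`, while the value written to the datastore is numeric, which is why the column type has to be overridden on the channel factory.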

### Load

To load data from a JDBC source, an implementation of the `IStoreMessageChannelFactory` interface must be provided.
This object is used to create the channels required to load the data into the datastore.

For a JDBC source, `com.activeviam.source.jdbc.api.AJdbcMessageChannelFactory` can be extended to create a custom implementation of the `IStoreMessageChannelFactory` interface.

Out of the box, Atoti provides the following implementations:

* `JdbcMessageChannelFactory extends AJdbcMessageChannelFactory<ResultSetRow>`: to use with `IJdbcSourceBuilder.nativeRows()`
* `ArrayJdbcMessageChannelFactory extends AJdbcMessageChannelFactory<Object[]>`: to use with `IJdbcSourceBuilder.arrayRows()`

## Pipeline execution

The JDBC source connects to an external service.
Therefore, before executing the pipeline, the connection to the database is tested.

### Fetch

Data can be fetched from the database by calling the `IJdbcSource#fetch([...])` methods.

The `fetch([...])` method retrieves all available data as defined by the topics exposed by the channels passed as arguments.
Then it fills the corresponding channels with the parsed results.

`IJdbcSource#fetch([...])` performs most of the work of an `IJdbcSource`: it executes queries on a database and puts the parsed data in channels that feed into an `IDatastore`.
Also, if the `parsingReportEnabled` property is set to `true` in the source, it returns an `IJdbcFetchingInfo` object for each topic, which contains various statistics about the fetched data,
such as which columns it contains, how many lines were published to the datastore and how long this process took.

For each entry in the input map (for instance, a MessageChannel-Parameters pair), `fetch([...])` creates a task.
Those tasks are executed concurrently on a thread pool sized by the `poolSize` property of the source.
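
The scheduling described above can be modeled with a fixed thread pool from the JDK. This is a sketch of the concurrency pattern only: `FetchSchedulingSketch`, `fetchAll` and the idea of a loader returning a row count are hypothetical, and the real `fetch([...])` methods take channels and return fetching information rather than integers.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.function.Supplier;

/** Hypothetical model: one task per map entry, run on a pool of poolSize threads. */
final class FetchSchedulingSketch {

  /** Runs every loader on the pool and returns the loaded row count per topic. */
  static Map<String, Integer> fetchAll(
      final Map<String, Supplier<Integer>> loadersByTopic, final int poolSize) {
    final ExecutorService pool = Executors.newFixedThreadPool(poolSize);
    try {
      // Submit one task per entry; at most poolSize of them run at the same time.
      final Map<String, Future<Integer>> pending = new LinkedHashMap<>();
      for (final Map.Entry<String, Supplier<Integer>> entry : loadersByTopic.entrySet()) {
        final Supplier<Integer> loader = entry.getValue();
        final Callable<Integer> task = loader::get;
        pending.put(entry.getKey(), pool.submit(task));
      }
      // Wait for every task and collect its result.
      final Map<String, Integer> rowsByTopic = new LinkedHashMap<>();
      for (final Map.Entry<String, Future<Integer>> entry : pending.entrySet()) {
        rowsByTopic.put(entry.getKey(), entry.getValue().get());
      }
      return rowsByTopic;
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new IllegalStateException("Interrupted while fetching", e);
    } catch (ExecutionException e) {
      throw new IllegalStateException("A fetch task failed", e.getCause());
    } finally {
      pool.shutdown();
    }
  }
}
```

With more entries than threads, the extra tasks wait in the queue of the pool, so the pool size bounds how many topics are fetched at the same time.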

### Monitoring

#### Parsing reports

If the property `parsingReportEnabled` is set to `true` in the `IJdbcSourceBuilder`,
the `IJdbcSource#fetch` methods return an `IJdbcFetchingInfo` object for each topic, which contains various statistics about the fetched data,
such as which columns it contains, how many lines were published to the datastore and how long this process took.

#### Health Event Monitoring

The JDBC Source contributes to the [Health Event Monitoring](monitoring/health-dispatcher), under the tags *jdbc* and *source*.

The following snippet adds the basic implementation of the listener to the handler stack.

```java theme={"languages":{"custom":["/engine/python-sdk/0.9/languages/pycon.tmLanguage.json"]}}
HealthEventDispatcher.INSTANCE.addEventHandler(new LoggingJdbcHealthEventHandler());
```

This uses the default logger to report all JDBC operations. By default, there is no filtering on the received events.
