ActivePivot

Introduction to JDBCSource

JDBCSource

A JDBCSource is a database-related implementation of the ISource interface, the generic framework used in ActivePivot for fetching data from an external source and putting it into an ActivePivot Datastore.

The purpose of the JDBCSource is to load data from a database, through a JDBC driver.

A JDBCSource has several parameters:

  • Connection information

    • User Name/Password/Class of the JDBC Driver

    OR

    • Implementation of the IConnectionSupplier functional interface, which wraps the connection information. The createConnection() method of the supplier returns a java.sql.Connection and loads the JDBC driver as defined by the implementation.
  • JDBC Topics

  • Processing properties

    • Size of the Tasks thread pool
    • Size of the queue containing rows retrieved from the SQL query
    • The max number of elements polled at once from the queue
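To make the second connection option more concrete, a connection supplier can be pictured as a functional interface with a single createConnection() method. The sketch below is not the ActivePivot IConnectionSupplier: the ConnectionSupplierSketch name and the fromUrl helper are assumptions made for illustration, and only java.sql.DriverManager is a real JDK API.

```java
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;

// Hypothetical stand-in for a connection supplier; not the ActivePivot interface.
@FunctionalInterface
interface ConnectionSupplierSketch {

    /** Opens a new connection each time it is called. */
    Connection createConnection() throws SQLException;

    /** Assumed helper: wraps a URL and credentials in a supplier backed by DriverManager. */
    static ConnectionSupplierSketch fromUrl(String url, String user, String password) {
        // Nothing is opened here; the connection is only created when createConnection() is called.
        return () -> DriverManager.getConnection(url, user, password);
    }
}
```

Because the interface has a single abstract method, any lambda that returns a java.sql.Connection (for instance one backed by a connection pool) could be used in its place.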

A JDBCSource connects to a given Database by using the provided connection information.

Once the connection has been tested, data can be fetched from the Database by calling the fetch([...]) method:

void fetch(Map<IMessageChannel<String, T>, List<Object>> channelsAndParams)

A JDBCSource does not need a topic to be instantiated; however, topics must be added to the source before any query is executed. This is done with the addTopic([...]) method.

The fetch([...]) method retrieves all the available data defined by the topics of the channels passed as arguments. Each topic is either an SQL query or a java.sql.PreparedStatement. The parsed results then fill the corresponding channels.

The provided implementations of IJDBCSource extend AJDBCSource and differ mainly in how the ResultSet obtained by executing the query is parsed:

  • ArrayJDBCSource parses the ResultSet as Object[]
  • MapJDBCSource parses the ResultSet as Map<String,Object>
  • NativeJDBCSource parses the ResultSet as QfsResultSetRow
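The difference between the array and map representations listed above can be illustrated with plain JDBC. This is a minimal sketch, assuming each parser reads the current row of a java.sql.ResultSet; the RowParsingSketch class and its methods are hypothetical and are not the ActivePivot createRecord() implementations.

```java
import java.sql.ResultSet;
import java.sql.ResultSetMetaData;
import java.sql.SQLException;
import java.util.LinkedHashMap;
import java.util.Map;

// Illustrative only: these helpers are assumptions, not ActivePivot code.
final class RowParsingSketch {

    /** Reads the current row into an array, one slot per column. */
    static Object[] asArray(ResultSet rs) throws SQLException {
        int columns = rs.getMetaData().getColumnCount();
        Object[] row = new Object[columns];
        for (int i = 0; i < columns; i++) {
            row[i] = rs.getObject(i + 1); // JDBC column indexes are 1-based
        }
        return row;
    }

    /** Reads the current row into a map keyed by column label, in column order. */
    static Map<String, Object> asMap(ResultSet rs) throws SQLException {
        ResultSetMetaData meta = rs.getMetaData();
        Map<String, Object> row = new LinkedHashMap<>();
        for (int i = 1; i <= meta.getColumnCount(); i++) {
            row.put(meta.getColumnLabel(i), rs.getObject(i));
        }
        return row;
    }
}
```

The array form is positional and compact, while the map form lets downstream code address values by column label.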

Use Example

Create the ArrayJDBCSource by using an IConnectionSupplier implementation that will attempt to use the H2 database driver. No processing parameters are passed to the source constructor, so the default values of the processing properties are used. These properties can be modified for a more fine-tuned configuration of the source (see Source configuration):

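// url, username and pwd are assumed to be defined elsewhere
final IConnectionSupplier supplier = new SimpleConnectionSupplier(url, username, pwd, "org.h2.Driver");

// The second argument is the name of the source
final IJDBCSource source = new ArrayJDBCSource(supplier, "Source");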

Create the topic, here a prepared statement, and add it to the source. Default topic parameters are implied here, but they can also be modified to tune the performance of the source (see Source configuration):

final JDBCTopic topic = new JDBCTopic("Order_Topic",
                                      "SELECT DESK, BOOK, PNL FROM RECORDS WHERE DESK=?");
source.addTopic(topic);

An IMessageChannelFactory implementation creates the channels between the topics of the source and the stores of a datastore:

final JDBCMessageChannelFactory channelFactory = new JDBCMessageChannelFactory(source, datastore);

final IMessageChannel<String, QfsResultSetRow> channel = channelFactory.createChannel("Order_Topic", store.toString());

Supply the values for the parameters of the topic's query (here the DESK placeholder), then execute the query and fill the channel:

source.fetch(channel, Arrays.asList("Desk A"));
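How a parameter list such as the one above might end up in the `?` placeholders of a prepared statement can be sketched with standard JDBC. This is only an assumption about the mechanism, not the JDBCSource code: ParameterBindingSketch and bind are hypothetical names, while PreparedStatement.setObject is the real JDBC call.

```java
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.util.List;

// Illustrative only: a hypothetical helper, not part of ActivePivot.
final class ParameterBindingSketch {

    /** Binds each value to the placeholder at the same position in the statement. */
    static void bind(PreparedStatement statement, List<Object> params) throws SQLException {
        for (int i = 0; i < params.size(); i++) {
            statement.setObject(i + 1, params.get(i)); // JDBC placeholders are 1-based
        }
    }
}
```

With the topic above, a list containing "Desk A" would bind that value to the single DESK=? placeholder.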

Fetching data

The fetch([...]) method does most of the work of a JDBCSource: it executes queries on a database and puts the parsed data into channels that feed a Datastore.

This is performed in parallel:

For each entry in the input map (i.e. a MessageChannel-Parameters pair), fetch([...]) creates a JDBCTask. These tasks are executed concurrently on a thread pool whose size is set by an input property of the source.
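The one-task-per-entry pattern described above can be sketched with a standard fixed-size executor. This is a simplified illustration under assumptions, not the JDBCSource implementation: ParallelFetchSketch, its fetch signature and the task callback are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.function.BiConsumer;

// Illustrative only: the names below are assumptions, not the ActivePivot implementation.
final class ParallelFetchSketch {

    /**
     * Runs one task per (channel, parameters) entry on a fixed-size pool
     * and returns once every task has finished.
     */
    static <C> void fetch(Map<C, List<Object>> channelsAndParams,
                          int poolSize,
                          BiConsumer<C, List<Object>> task)
            throws InterruptedException, ExecutionException {
        ExecutorService pool = Executors.newFixedThreadPool(poolSize);
        try {
            List<Future<?>> pending = new ArrayList<>();
            for (Map.Entry<C, List<Object>> entry : channelsAndParams.entrySet()) {
                pending.add(pool.submit(() -> task.accept(entry.getKey(), entry.getValue())));
            }
            for (Future<?> future : pending) {
                future.get(); // rethrows a task failure as an ExecutionException
            }
        } finally {
            pool.shutdown();
        }
    }
}
```

With a pool smaller than the number of entries, the remaining tasks simply wait in the executor until a thread becomes free.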

[Figure: Diagram of fetch]

The sequence diagram shows the execution of the fetch([...]) method on multiple channels. JDBCTask and JDBCAppendRunnable are documented more thoroughly below.

JDBCTask

Each JDBCTask performs the following steps:

  • Execute the query on the database.

  • Launch and keep track of concurrent JDBCAppendRunnable instances on a thread pool whose size is set by a property of the topic related to the JDBCTask.

  • Create an appendQueue from which the runnables will poll.

  • Fetch the corresponding java.sql.ResultSet.

    In order to avoid network congestion, data from the ResultSet is retrieved in pieces of fetchSize rows.

  • Iterate over each fetchSize-sized piece of the ResultSet until all the data has been retrieved:

    • Parse the data as a collection of Record by using the JDBCSource::createRecord() implementation of the current source.
    • Feed it to the appendQueue.

When the ResultSet has been entirely processed, the JDBCAppendRunnable instances are notified, and the JDBCTask waits for all the linked threads to terminate before terminating itself.
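The task lifecycle described above is a producer feeding a bounded queue that several consumers poll. The following is a minimal sketch of that pattern, assuming an iterator stands in for the ResultSet and a completion flag stands in for the notification; TaskSketch and all its parameters are hypothetical, not the JDBCTask implementation.

```java
import java.util.Iterator;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;

// Illustrative only: a generic producer/consumer sketch, not ActivePivot code.
final class TaskSketch {

    static <R> void run(Iterator<R> rows, int queueCapacity, int consumers, Consumer<R> sink)
            throws InterruptedException {
        BlockingQueue<R> appendQueue = new ArrayBlockingQueue<>(queueCapacity);
        AtomicBoolean producerDone = new AtomicBoolean(false);
        ExecutorService pool = Executors.newFixedThreadPool(consumers);
        for (int i = 0; i < consumers; i++) {
            pool.execute(() -> {
                try {
                    // Keep polling until the producer is done AND the queue is empty.
                    while (!producerDone.get() || !appendQueue.isEmpty()) {
                        R row = appendQueue.poll(10, TimeUnit.MILLISECONDS);
                        if (row != null) {
                            sink.accept(row);
                        }
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
        }
        try {
            while (rows.hasNext()) {
                appendQueue.put(rows.next()); // blocks while the queue is full
            }
        } finally {
            producerDone.set(true); // tells the consumers that no more rows will arrive
            pool.shutdown();
        }
        pool.awaitTermination(1, TimeUnit.MINUTES); // wait for the consumers to finish
    }
}
```

The bounded queue provides back-pressure: when the consumers fall behind, the producer blocks on put() instead of buffering the whole result in memory.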

JDBCAppendRunnable

From its creation until it is notified by the underlying JDBCTask that its appendQueue is empty and will not be filled anymore, each JDBCAppendRunnable will:

  • Attempt to drain the queue into a List<Record>, in batches of up to batchSize records.
  • Push the Records from the List into the channel's IMessage in chunks of up to chunkSize records.
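The two steps above, draining in batches and then pushing in smaller chunks, can be sketched with BlockingQueue.drainTo. This is an illustration under assumptions rather than the JDBCAppendRunnable code: AppendSketch, drainOnce and the Consumer standing in for the channel are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.function.Consumer;

// Illustrative only: a hypothetical helper, not part of ActivePivot.
final class AppendSketch {

    /**
     * Drains at most batchSize records from the queue, then forwards them
     * in chunks of at most chunkSize. Returns the number of records drained.
     */
    static <R> int drainOnce(BlockingQueue<R> queue, int batchSize, int chunkSize,
                             Consumer<List<R>> channel) {
        List<R> batch = new ArrayList<>(batchSize);
        queue.drainTo(batch, batchSize); // non-blocking: takes only what is available
        for (int from = 0; from < batch.size(); from += chunkSize) {
            int to = Math.min(from + chunkSize, batch.size());
            // Copy the slice so the receiver does not hold a view of the batch list.
            channel.accept(new ArrayList<>(batch.subList(from, to)));
        }
        return batch.size();
    }
}
```

For example, with ten queued records, a batchSize of 7 and a chunkSize of 3, the first call forwards chunks of 3, 3 and 1 records, and a second call forwards the remaining 3.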
Copyright © 2019 ActiveViam