Continuous Query Engine

The Continuous Query Engine (CQE) is responsible for computing the updates for the registered continuous queries and notifying their listeners.

Continuous Query 101: Events, Streams, and Handlers

Events

Several types of real-time events may trigger continuous query updates.

For instance (non-exhaustive list):

  • A transaction that causes new facts to be contributed to the cube.
  • A new price quote that is sent by a market data feed.
  • A change to a configuration parameter in a formula.

Streams

In ActivePivot, in the context of continuous querying, a source of such real-time events is called a stream.

Here we consider streams in the context of continuous query engine activity occurring inside an ActivePivot server. Do not confuse them with the streams of the Streaming API (used for client-server interactions). In other words, com.quartetfs.biz.pivot.query.aggregates.IStream (the one discussed here) is not the same as com.quartetfs.tech.streaming.IStream (from the Streaming API).

In practice:

  • The TransactionStream implementation (core product) encapsulates the stream of ActivePivot transactions (i.e. fact contributions).
  • Other streams are usually custom implementations to support a particular type of real-time event (e.g. market data feed that impacts post-processed measures).

Handlers

Once configured, a stream publishes real-time events into the ActivePivot continuous query engine - possibly impacting the measures that you want to analyse through a given cube. Now, not all real-time events can impact all the measures at all the locations. In other words, specific measures and/or locations in the cube might only be impacted by specific events.

To calculate such impact, the continuous query engine relies on the Aggregates Continuous Handlers: com.quartetfs.biz.pivot.query.aggregates.IAggregatesContinuousHandler.

More information on handlers and their configuration is provided in the Handlers section.

Continuous Query Engine

The Continuous Query Engine can be viewed as an orchestrator; any time it receives an event, it launches the impact computation on the relevant handlers, based on the measures and locations of the registered continuous queries. When the impact is not null, it launches the recomputation of the impacted locations and then notifies the listeners.

ActivePivot offers two different implementations of CQE - "synchronous" and "conflating" - to best fit different performance and feature use cases.

The Synchronous CQE

The synchronous CQE computes the updates for each event (ActivePivot transaction, real-time event, etc.) sequentially.

When an event is received by the engine, it is either processed immediately if the engine is currently idle, or placed on hold until the processing of the current event is complete and then worked on.

This behavior has several implications:

  • A new transaction cannot be committed in the ActivePivot cube before the previous transaction's continuous query updates have been computed and the listeners notified.
  • A stream sending a real-time event to a cube will be blocked until all events previously sent to the engine have been processed and the newly sent event is also processed.
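
The blocking behavior described above can be illustrated with a small toy model. This is only a sketch under stated assumptions: SynchronousEngineSketch, publish and addListener are hypothetical names invented for the illustration, not ActivePivot classes or methods. A fair lock stands in for the engine's one-event-at-a-time processing.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Consumer;

/** Toy model of a synchronous engine; hypothetical, not the ActivePivot implementation. */
final class SynchronousEngineSketch {

    // Fair lock: callers that have to wait are served roughly in arrival order.
    private final ReentrantLock lock = new ReentrantLock(true);
    private final List<Consumer<String>> listeners = new CopyOnWriteArrayList<>();

    void addListener(Consumer<String> listener) {
        listeners.add(listener);
    }

    /**
     * Publishes one event. The calling thread (a committing transaction or a
     * stream) is blocked until earlier events and this one have been processed.
     */
    void publish(String event) {
        lock.lock();
        try {
            // Stand-in for impact computation and recomputation of the impacted locations.
            String update = "update for " + event;
            // Listeners are notified once per event, before the caller is released.
            for (Consumer<String> listener : listeners) {
                listener.accept(update);
            }
        } finally {
            lock.unlock();
        }
    }
}
```

In this model every published event yields exactly one notification per listener, and the publisher pays the full processing cost before it can continue.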

Impact on transactions

As mentioned above, in order to process the events sequentially, ActivePivot has to block a commit until the previous transaction's continuous query computations are complete.

Therefore, the continuous queries may slow down the insertion rate of an ActivePivot system. This happens if the continuous query computations last longer than the interval between two commits.

The continuous query computation time depends mainly on the time spent on three operations:

  • Computing the locations impacted by the event for each query.
  • Computing the measure values for the impacted locations.
  • Notifying the listeners.
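
As a rough back-of-the-envelope model of the slowdown, assume the three operations above run one after the other for each commit and that nothing else limits the insertion rate. Under that simplifying assumption, the effective interval between two commits is the larger of the feed's own interval and the total continuous query time. The class and method names below are hypothetical.

```java
/** Simplified arithmetic model; the names and the additive cost model are assumptions. */
final class SyncThroughputSketch {

    /**
     * Returns the effective interval between two commits, in milliseconds,
     * when each commit must wait for the previous continuous query update.
     */
    static long effectiveCommitIntervalMs(long commitIntervalMs,
                                          long impactMs,
                                          long computeMs,
                                          long notifyMs) {
        // Total continuous query time: impact + measure computation + notification.
        long queryUpdateMs = impactMs + computeMs + notifyMs;
        // Commits cannot be closer together than the time spent on query updates.
        return Math.max(commitIntervalMs, queryUpdateMs);
    }
}
```

For example, with a commit every 100 ms and 60 ms of continuous query work, the model leaves the interval at 100 ms. With 150 ms of work, the interval stretches to 150 ms and the insertion rate drops accordingly.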

Recommendation

The synchronous continuous query engine is the best choice if the listeners must be notified for each transaction and event.

To achieve the best possible performance and not slow down the cube's insertion rate, it should only be used if the continuous query computations do not last longer than the interval between two commits, at least most of the time.

The Conflating CQE

To reduce the impact of the continuous query updates, the conflating CQE conflates events of the same type whenever possible.

Upon receiving an event (e.g. a transaction), the conflating engine either:

  • Immediately processes the event if it is currently idle.
  • Adds the event to a queue to be processed later. When the current processing task is finished, all the events in the queue are conflated and processed as one large event.
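
The two cases above can be sketched as a small state machine. This is a minimal illustration, not the ActivePivot implementation: ConflatingEngineSketch and its methods are hypothetical names, an event is modelled simply as the set of locations it touches, and all events are assumed to be of the same type so that they can always be conflated.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Deque;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

/** Toy model of event conflation; hypothetical, single-threaded for clarity. */
final class ConflatingEngineSketch {

    private final Deque<Set<String>> queue = new ArrayDeque<>();
    private final List<Set<String>> started = new ArrayList<>();
    private boolean busy;

    /** An event arrives: process it right away if idle, otherwise queue it. */
    void onEvent(String... impactedLocations) {
        Set<String> event = new LinkedHashSet<>(Arrays.asList(impactedLocations));
        if (busy) {
            queue.add(event);
        } else {
            busy = true;
            started.add(event);
        }
    }

    /** The current processing task finished: conflate the whole queue, if any. */
    void onProcessingFinished() {
        if (queue.isEmpty()) {
            busy = false;
            return;
        }
        Set<String> conflated = new LinkedHashSet<>();
        while (!queue.isEmpty()) {
            conflated.addAll(queue.poll());
        }
        // The engine stays busy, now working on one large conflated event.
        started.add(conflated);
    }

    /** Events (single or conflated) in the order their processing started. */
    List<Set<String>> startedEvents() {
        return started;
    }
}
```

Note that onEvent never blocks the caller in this sketch, which is the property that keeps commits from waiting on continuous query updates.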

Impact on transactions

In contrast to the synchronous CQE, the conflating CQE does not need to block commits since they are put into a queue to be conflated and processed whenever possible.

Therefore, the insertion rate of the ActivePivot system is not affected.

On the other hand, since multiple events can be conflated, a single notification may account for multiple events. Thus listeners will not be able to distinguish the update of each of the conflated events independently.

For instance, let's assume that a very slow continuous query is registered to monitor a measure per currency and the system receives the following flow of transactions:

  1. At 1:00:00, an update in EUR moves the euro PnL to 30
  2. At 1:00:20, an update in USD moves the dollar PnL to 10
  3. At 1:00:40, an update in EUR moves the euro PnL to 40
  4. At 1:01:00, an update in USD moves the dollar PnL to 0

Let's further assume the continuous query engine takes one minute to compute aggregated values and notify listeners for the registered query.

  • The processing of the first update starts immediately at 1:00:00 since the engine is currently idle.
  • It will take one minute to process the update and notify the listeners, which will receive the new PnL at 1:01:00.
  • The second, third and fourth updates will be added to the queue mentioned previously and wait until the first update has been processed. At 1:01:00, the engine has completed processing the first event and will conflate, then process, the updates currently in the queue.

Therefore the listeners will receive the following sequence of events:

  1. At 1:01:00, the first transaction has moved the euro PnL to 30
  2. At 1:02:00, transactions 2, 3 and 4 have been conflated and moved the euro PnL to 40 and the dollar PnL to 0.
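
The timeline above can be replayed with a short simulation. It is a sketch under stated assumptions: the class and method names are hypothetical, times are expressed in seconds after 1:00:00, every processing task takes a fixed duration, and an update arriving at the exact moment the engine becomes free joins the batch that starts at that moment (as the fourth update does in the example).

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Toy replay of the PnL example; hypothetical names, not ActivePivot API. */
final class ConflationTimelineSketch {

    /** One transaction: arrival time in seconds after 1:00:00 and the resulting PnL. */
    static final class Update {
        final long arrivalSec;
        final String currency;
        final int pnl;

        Update(long arrivalSec, String currency, int pnl) {
            this.arrivalSec = arrivalSec;
            this.currency = currency;
            this.pnl = pnl;
        }
    }

    /** What listeners receive: send time and the latest PnL per impacted currency. */
    static final class Notification {
        final long sentAtSec;
        final Map<String, Integer> pnlByCurrency;

        Notification(long sentAtSec, Map<String, Integer> pnlByCurrency) {
            this.sentAtSec = sentAtSec;
            this.pnlByCurrency = pnlByCurrency;
        }
    }

    /** Replays updates (sorted by arrival time) through a conflating engine. */
    static List<Notification> replay(List<Update> updates, long processingSec) {
        List<Notification> notifications = new ArrayList<>();
        long freeAt = Long.MIN_VALUE; // the engine starts idle
        int next = 0;
        while (next < updates.size()) {
            // Processing starts when the update arrives or when the engine frees up.
            long start = Math.max(updates.get(next).arrivalSec, freeAt);
            Map<String, Integer> conflated = new LinkedHashMap<>();
            // Conflate everything that has arrived by then; later values overwrite earlier ones.
            while (next < updates.size() && updates.get(next).arrivalSec <= start) {
                Update u = updates.get(next++);
                conflated.put(u.currency, u.pnl);
            }
            freeAt = start + processingSec;
            notifications.add(new Notification(freeAt, conflated));
        }
        return notifications;
    }

    /** The four updates from the example, with a one-minute processing time. */
    static List<Notification> exampleFromText() {
        List<Update> updates = new ArrayList<>();
        updates.add(new Update(0, "EUR", 30));
        updates.add(new Update(20, "USD", 10));
        updates.add(new Update(40, "EUR", 40));
        updates.add(new Update(60, "USD", 0));
        return replay(updates, 60);
    }
}
```

Calling exampleFromText() yields two notifications: one sent 60 seconds in with EUR at 30, and one sent 120 seconds in with EUR at 40 and USD at 0. The intermediate dollar PnL of 10 never reaches the listeners.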

Recommendation

The conflating CQE is the best choice to reduce the impact of heavy continuous queries on the system. It avoids slowing down the insertion rate of the ActivePivot system.

However, since multiple events can be conflated, the listeners may not receive updates for all events. They will never see incorrect results, but they may miss intermediary values in order to catch up with the real-time feed.

Configuration

You can select the CQE implementation in the cube definition as follows:

final IActivePivotInstanceDescription cube = StartBuilding.cube("MyCube")
    .withMeasures(measuresAdder)
    .withDimensions(dimensionAdder)
    // start continuous query engine configuration
    .withAggregatesContinuousQueryEngine()
    .withKey(ConflatingAggregatesContinuousQueryEngine.PLUGIN_KEY)
    .withNoProperty()
    // end continuous query engine configuration
    .build();

This code block defines an ActivePivot cube "MyCube" relying on the conflating CQE. The plugin keys are SYNC for the synchronous CQE and CONFLATE for the conflating CQE.

By default, if you do not explicitly choose the CQE implementation when configuring a cube, the "synchronous" CQE will be used. You can always refer to IAggregatesContinuousQueryEngine.DEFAULT_PLUGIN_KEY to double-check what the actual default is for a given version of ActivePivot.

Configuration Specific to the Conflation Engine

The following properties can be used to configure the Conflation engine.

qfs.conflation.maxQueueSize: Sets the maximum queue size for transaction events in the queue. This defaults to -1, meaning there is no maximum queue size.

qfs.conflation.maxDelayTime: Sets the maximum time (in milliseconds) that the oldest transaction of the master branch waits in the queue before we stop accepting transactions. This defaults to -1, meaning there is no maximum.
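
The way the two limits and their -1 defaults combine can be sketched as a simple predicate. This is an illustration only: ConflationLimitsSketch is a hypothetical class, and both the boundary conditions (strict comparisons) and the idea that a full queue stops accepting transactions are assumptions, not documented engine behavior.

```java
/** Hypothetical model of the two queue limits; not the engine's actual logic. */
final class ConflationLimitsSketch {

    /** Value that disables a limit, matching the documented default of -1. */
    static final long NO_LIMIT = -1;

    /**
     * Returns true if, under the assumptions above, a new transaction
     * would still be accepted into the conflation queue.
     */
    static boolean acceptsTransaction(int queuedTransactions,
                                      long oldestWaitMs,
                                      long maxQueueSize,
                                      long maxDelayTimeMs) {
        // Assumption: the queue is considered full once it holds maxQueueSize events.
        boolean queueHasRoom = maxQueueSize == NO_LIMIT || queuedTransactions < maxQueueSize;
        // Assumption: acceptance stops once the oldest transaction has waited maxDelayTimeMs.
        boolean delayWithinLimit = maxDelayTimeMs == NO_LIMIT || oldestWaitMs < maxDelayTimeMs;
        return queueHasRoom && delayWithinLimit;
    }
}
```

With both properties left at -1 the predicate always returns true, which corresponds to the unbounded default described above.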

The maximum amount of time (in milliseconds) that a non-transactional event can wait before being forced to be processed can be tweaked through setMaxEventWaitInterval on the Conflation Continuous Query Engine's MBean. This defaults to 60 000 (that is, a maximum of one minute before a non-transactional event is processed).