Continuous Query Engine
The Continuous Query Engine (CQE) is responsible for computing the updates for the registered continuous queries and notifying their listeners.
Continuous Query 101: Events, Streams, and Handlers
Events
Several types of real-time events may trigger continuous query updates.
For instance (non-exhaustive list):
- A transaction that causes new facts to be contributed to the cube.
- A new price quote that is sent by a market data feed.
- A change to a configuration parameter in a formula.
Streams
In Atoti, in the context of continuous querying, a source of such real-time events is called a stream.
Here we consider streams in the context of continuous query engine activity occurring inside an Atoti Server. Don't confuse this with the streams seen in the context of Streaming API (for clients-server interactions). In other words,
com.activeviam.activepivot.core.intf.api.realtime.IStream
(the one we are talking about here) is not the same ascom.quartetfs.tech.streaming.IStream
(from Streaming API).
In practice:
TransactionStream
implementation (core product) encapsulates the stream of Atoti's transactions (a.k.a. facts contribution).- Other streams are usually custom implementations to support a particular type of real-time event (e.g. market data feed that impacts post-processed measures).
Handlers
Once configured, a stream publishes real-time events into the Atoti continuous query engine - possibly impacting the measures that you want to analyze through a given cube. Now, not all real-time events can impact all the measures at all the locations. In other words, specific measures and/or locations in the cube might only be impacted by specific events.
To calculate such impact, the continuous query engine relies on the Aggregates Continuous Handlers:
com.quartetfs.biz.pivot.query.aggregates.IAggregatesContinuousHandler
.
More information on handlers and their configuration is provided in the Handlers section.
Continuous Query engine
The Continuous Query Engine can be viewed as an orchestrator; any time it receives an event, it launches the impact computation on the relevant handlers, based on the measures and locations of the registered continuous queries. When the impact is not null, it launches the recomputation of the impacted locations and then notifies the listeners.
Atoti offers two different implementations of CQE - "synchronous" and "conflating" - to best fit different performance and feature use cases.
The Synchronous CQE
The synchronous CQE computes the updates for each event (Atoti transaction, real-time event, etc.) sequentially.
When an event is received by the engine, it is either processed immediately if the engine is currently idle, or placed on hold until the processing of the current event is complete and then worked on.
This behavior has several implications:
- A new transaction cannot be committed in the Atoti cube before the previous transaction's continuous query updates have been computed and the listeners notified.
- A stream sending a real-time event to a cube will be blocked until all events previously sent to the engine have been processed and the newly sent event is also processed.
Impact on transactions
As mentioned above, in order to process the events sequentially, Atoti has to block a commit until the previous transaction's continuous query computations are complete.
Therefore, the continuous queries may slow down the insertion rate of an Atoti system. This happens if the continuous query computations last longer than the interval between two commits.
The continuous query computation time depends mainly on the time spent on three operations:
- Computing the locations impacted by the event for each query.
- Computing the measure values for the impacted locations.
- Notifying the listeners.
Recommendation
The synchronous continuous query engine is the best choice if the listeners must be notified for each transaction and event.
To achieve the best possible performance and not slow down the cube's insertion rate, it should only be used if the continuous query computations do not last longer than the interval between two commits, at least most of the time.
Conflating CQE
To reduce the impact of the continuous query updates, the conflating CQE conflates events of the same type whenever possible.
Upon receiving an event (e.g. a transaction), the conflating engine either:
- Immediately processes the event if it is currently idle.
- Adds the event to a queue to be processed later. When the current processing task is finished, all the events in the queue are conflated and processed as one large event.
Impact on transactions
In contrast to the synchronous CQE, the conflating CQE does not need to block commits since they are put into a queue to be conflated and processed whenever possible.
Therefore, the insertion rate of the Atoti system is not affected.
On the other hand, since multiple events can be conflated, a single notification may account for multiple events. Thus, listeners will not be able to distinguish the update of each of the conflated events independently.
For instance, let's assume that a very slow continuous query is registered to monitor a measure per currency and the system receives the following flow of transactions:
- At 1:00:00, an update in
EUR
moves the euroPnL
to 30 - At 1:00:20, an update in
USD
moves the dollarPnL
to 10 - At 1:00:40, an update in
EUR
moves the euroPnL
to 40 - At 1:01:00, an update in
USD
moves the dollarPnL
to 0
Let's further assume the continuous query engine takes one minute to compute aggregated values and notify listeners for the registered query.
- The processing of the first update starts immediately at 1:00:00 since the engine is currently idle.
- It will take one minute to process the update and notify the listeners, which will receive the new
PnL
at 1:01:00. - The second, third and fourth updates will be added to the queue mentioned previously and wait until the first update has been processed. \ At 1:01:00, the engine has completed processing the first event and will conflate, then process, the updates currently in the queue.
Therefore the listeners will receive the following sequence of events:
- At 1:01:00, the first transaction has moved the euro
PnL
to 30 - At 1:02:00, transactions 2, 3 and 4 have been conflated and moved
the euro
PnL
to 40 and the dollarPnL
to 0.
Recommendation
The conflating CQE is the best choice to reduce the impact of heavy continuous queries on the system. It avoids slowing down the insertion rate of the Atoti Server.
However, since multiple events can be conflated, the listeners may not receive updates for all events. They will never see incorrect results, but they may miss intermediary values in order to catch up with the real-time feed.
Configuration
You can select the CQE implementation in the cube definition as follows:
final IActivePivotInstanceDescription cube =
StartBuilding.cube("MyCube")
.withMeasures(mesuresAdder)
.withDimensions(dimensionAdder)
// start continuous query engine configuration
.withAggregatesContinuousQueryEngine()
.withKey(ConflatingAggregatesContinuousQueryEngine.PLUGIN_KEY)
.withNoProperty()
// end continuous query engine configuration
.build();
This code block defines an Atoti cube "MyCube" relying on the conflating CQE.
The plugin keys are SYNC
for the synchronous CQE and CONFLATE
for the conflating CQE.
By default, if you do not explicitly choose the CQE implementation when configuring a cube, the "synchronous" CQE will be used.\ You can always refer to
IAggregatesContinuousQueryEngine.DEFAULT_PLUGIN_KEY
to double-check what the actual default is for a given version of Atoti Server.
Configuration Specific to the Conflation Engine
The following properties can be used to configure the Conflation engine.
activeviam.conflation.maxQueueSize
Sets the maximum queue size for transaction events in the queue.
This defaults to -1, meaning there is no maximum queue size.
activeviam.conflation.maxDelayTime
Sets the maximum time (in milliseconds) that the oldest transaction of the master branch waits
in the queue before we stop accepting transactions.
This defaults to -1, meaning there is no maximum.
The maximum amount of time (in milliseconds) that a non-transactional event can wait before being forced to be processed can be tweaked using the Conflation Continuous Query Engine's MBean : This defaults to 60 000 (that is, a maximum of one minute before a non-transactional event is processed).