Aggregates Continuous Handlers
When a continuous query is registered in Atoti and a real-time event is detected that may impact the results of a query, the continuous query engine recomputes the measures where needed and forwards the updates to listeners.
Handlers are responsible for computing the impact of a real-time event on a given location.
To calculate such impact, the continuous query engine relies on the Aggregates Continuous Handlers: com.activeviam.activepivot.core.intf.api.realtime.IAggregatesContinuousHandler.
In practice:
- The (core) handler whose plugin key is IAggregatesContinuousHandler.BASIC_HANDLER_PLUGIN_KEY (STORED) receives transaction events coming from the IStream.ACTIVEPIVOT_PLUGIN_KEY stream. It handles all measures that are only impacted by facts that contribute to the locations they are computed on.
- Other handlers are usually custom implementations that support either of these cases:
  - a real-time event that is not related to facts contribution (e.g. a market data feed that impacts post-processed measures). These handlers do not rely on the stream of the Atoti transactions (IStream.ACTIVEPIVOT_PLUGIN_KEY) but on a custom stream.
  - a measure that is impacted by facts that do not contribute to the location it is computed on (for example a customized location-shifting post-processor). These handlers do rely on the stream of the Atoti transactions (IStream.ACTIVEPIVOT_PLUGIN_KEY) but implement their own logic for computing the impact.
Handlers (and Streams) Registration
Handlers are configured on a per-measure (cube measure) basis, by using the handler plugin key.
By default, all measures (including post-processed measures, if no explicit handler configuration has been done) register the IAggregatesContinuousHandler.BASIC_HANDLER_PLUGIN_KEY plugin key.
Note that sensitivity to custom real-time events is only relevant for post-processed measures, because it is within the post-processing logic that you make use of data not provided via the traditional facts contribution mechanism. A given post-processed measure can be sensitive to multiple types of real-time events (generated by multiple streams) and hence define multiple handlers for registration.
Here is an example of a custom handler defined on a post-processed measure, using the cube builder:
    final IActivePivotInstanceDescription cube =
        StartBuilding.cube("myCube")
            .withPostProcessor("myCustomMeasure")
            .withPluginKey("MY_CUSTOM_PP_KEY")
            .withContinuousQueryHandlers(
                IAggregatesContinuousHandler.BASIC_HANDLER_PLUGIN_KEY, "MY_CUSTOM_HANDLER_KEY")
            .withDimensions(dimensionAdder) // whatever you need to describe your cube ...
            .build();
Another way to manage a measure's continuous query handlers is to use the Copper experimental API.
Do not forget the STORED handler when defining custom handlers!
When explicitly configuring handlers for a given measure on a stream other than IStream.ACTIVEPIVOT_PLUGIN_KEY, remember to also add the default handler (plugin key = STORED), which is responsible for the measure's sensitivity to real-time events for facts contribution. This handler is not added automatically on top of the custom handlers: it is only the default when there is no explicit configuration.
The sensitivity of a handler to events coming from a particular stream is defined intrinsically by the handler implementation, as it must implement IAggregatesContinuousHandler.getStreamKey().
A measure is not allowed to define several handlers impacted by the same stream. If a measure is impacted in several ways (for example, it must be recomputed if the primitive measure changes for its own location, but also if it changes for the location shifted to the previous day), then a single custom handler implementation has to take the combined logic into account.
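The two registration rules above (keep a handler on the transaction stream, and never put two handlers of the same stream on one measure) can be checked mechanically. The following is a minimal, self-contained sketch and not Atoti API: the class `HandlerConfigCheck`, its `validate` method, the `SHIFTED` handler key, and the stream key values are all hypothetical. In a real project the stream of each handler would come from `IAggregatesContinuousHandler.getStreamKey()`.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical helper (not part of Atoti) that checks the handler list of one measure. */
final class HandlerConfigCheck {

  /** Illustrative stand-in for the value behind IStream.ACTIVEPIVOT_PLUGIN_KEY. */
  static final String TRANSACTION_STREAM = "ACTIVEPIVOT";

  /**
   * @param handlerKeys handler plugin keys configured on one measure
   * @param streamOfHandler stream key exposed by each known handler
   * @return human-readable problems; empty when both rules are respected
   */
  static List<String> validate(List<String> handlerKeys, Map<String, String> streamOfHandler) {
    List<String> problems = new ArrayList<>();
    Map<String, String> handlerByStream = new HashMap<>();
    for (String handler : handlerKeys) {
      String stream = streamOfHandler.get(handler);
      if (stream == null) {
        problems.add("unknown handler " + handler);
        continue;
      }
      // Rule: at most one handler per stream for a given measure.
      String previous = handlerByStream.putIfAbsent(stream, handler);
      if (previous != null) {
        problems.add(previous + " and " + handler + " both listen to stream " + stream);
      }
    }
    // Rule: without a handler on the transaction stream, fact contributions are ignored.
    if (!handlerByStream.containsKey(TRANSACTION_STREAM)) {
      problems.add("no handler listens to transactions; add STORED");
    }
    return problems;
  }

  public static void main(String[] args) {
    Map<String, String> streams = new HashMap<>();
    streams.put("STORED", TRANSACTION_STREAM);
    streams.put("SHIFTED", TRANSACTION_STREAM); // hypothetical location-shifting handler
    streams.put("MARKETDATA", "MARKETDATA_STREAM"); // hypothetical custom stream

    System.out.println(validate(Arrays.asList("STORED", "MARKETDATA"), streams));
    System.out.println(validate(Arrays.asList("MARKETDATA"), streams));
    System.out.println(validate(Arrays.asList("STORED", "SHIFTED"), streams));
  }
}
```

In this sketch a custom handler on the transaction stream (such as the hypothetical `SHIFTED`) replaces `STORED` rather than being added next to it, which matches the rule that the combined logic must live in a single handler.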
A Concrete Example
In the following example, a query is submitted that requires three measures to run over a set of cells:
- contributors.COUNT
- Delta.SUM
- PnL.SUM
contributors.COUNT and Delta.SUM are aggregated non-post-processed measures, whereas PnL.SUM is a post-processed measure that depends on Delta.SUM and real-time market data.
Because we know that at least one measure depends on real-time market data, we implement a specific handler which listens to market data stream events and computes which cube cells are impacted as those events occur.
Let's say that this custom handler's plugin key is MARKETDATA.
Then, we have the following handler dependencies for our measures:
Measure | Continuous Handler Keys |
---|---|
contributors.COUNT | STORED |
Delta.SUM | STORED |
PnL.SUM | STORED, MARKETDATA |
When a continuous query is created, it is bound to the two required handlers (STORED and MARKETDATA) as follows:
Continuous Handler | Sensitive To | Handled Aggregated Measures |
---|---|---|
STORED | Atoti's transactions | contributors.COUNT, Delta.SUM, PnL.SUM |
MARKETDATA | Market Data updates | PnL.SUM |
It implies that:
- Whenever a transaction is committed, the STORED handler computes impacted cells and pushes {"contributors.COUNT","Delta.SUM","PnL.SUM"} updated values to the parent continuous query's listeners.
- Whenever Market Data are updated, the MARKETDATA handler computes impacted cells and pushes {"PnL.SUM"} updated values to the parent continuous query's listeners.
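The second table is simply the first one inverted: each handler ends up responsible for every measure that lists its key. A small sketch of that inversion, using plain Java collections rather than any Atoti type (`HandlerBindingSketch` and its methods are hypothetical names), could look like this:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical illustration of how measures are grouped per handler; not Atoti code. */
final class HandlerBindingSketch {

  /** Inverts "measure -> handler keys" into "handler key -> measures it handles". */
  static Map<String, List<String>> measuresByHandler(Map<String, List<String>> handlersByMeasure) {
    Map<String, List<String>> result = new LinkedHashMap<>();
    for (Map.Entry<String, List<String>> entry : handlersByMeasure.entrySet()) {
      for (String handlerKey : entry.getValue()) {
        result.computeIfAbsent(handlerKey, k -> new ArrayList<>()).add(entry.getKey());
      }
    }
    return result;
  }

  /** The measure configuration of the example, in the order of the first table. */
  static Map<String, List<String>> exampleConfiguration() {
    Map<String, List<String>> config = new LinkedHashMap<>();
    config.put("contributors.COUNT", Arrays.asList("STORED"));
    config.put("Delta.SUM", Arrays.asList("STORED"));
    config.put("PnL.SUM", Arrays.asList("STORED", "MARKETDATA"));
    return config;
  }

  public static void main(String[] args) {
    // Prints one line per handler, followed by the measures bound to it.
    measuresByHandler(exampleConfiguration())
        .forEach((handler, measures) -> System.out.println(handler + " -> " + measures));
  }
}
```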
Continuous Handlers and Chains of Post-Processors
When post-processors are chained (one post-processor whose input is the output of another post-processor), the Atoti query engine cannot guess any custom inheritance logic for the continuous query handlers.
So, always explicitly declare all the required handlers (when custom handlers are required) on all the post-processors they may impact.
To be clear: if post-processor P1 is attached to handler H1 and post-processor P2 declares P1 as its underlying measure, then P2 will not automatically inherit H1 registration.
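Because nothing is inherited, it can help to audit a chain of measures for handlers that were declared on an underlying measure but forgotten higher up. The sketch below is one possible way to do that with plain maps. It is not an Atoti feature: `ChainedHandlerCheck`, the `underlyings` map, and the `declared` map are hypothetical, and measures without an explicit declaration are assumed to carry only the default STORED handler.

```java
import java.util.ArrayDeque;
import java.util.Arrays;
import java.util.Collections;
import java.util.Deque;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;

/** Hypothetical audit helper; it only inspects maps supplied by the caller. */
final class ChainedHandlerCheck {

  /** Handlers assumed for a measure with no explicit configuration. */
  static final Set<String> DEFAULT_HANDLERS = Collections.singleton("STORED");

  /** Returns handler keys declared below {@code measure} in the chain but not on it. */
  static Set<String> missingHandlers(
      String measure, Map<String, List<String>> underlyings, Map<String, Set<String>> declared) {
    Set<String> required = new TreeSet<>();
    Set<String> seen = new HashSet<>();
    Deque<String> toVisit =
        new ArrayDeque<>(underlyings.getOrDefault(measure, Collections.emptyList()));
    while (!toVisit.isEmpty()) {
      String current = toVisit.pop();
      if (!seen.add(current)) {
        continue; // already visited through another path
      }
      required.addAll(declared.getOrDefault(current, DEFAULT_HANDLERS));
      toVisit.addAll(underlyings.getOrDefault(current, Collections.emptyList()));
    }
    required.removeAll(declared.getOrDefault(measure, DEFAULT_HANDLERS));
    return required;
  }

  public static void main(String[] args) {
    Map<String, List<String>> underlyings = new HashMap<>();
    underlyings.put("P2", Arrays.asList("P1")); // P2 declares P1 as its underlying measure

    Map<String, Set<String>> declared = new HashMap<>();
    declared.put("P1", new HashSet<>(Arrays.asList("STORED", "H1")));

    // P2 has no explicit configuration, so H1 is reported as missing.
    System.out.println(missingHandlers("P2", underlyings, declared));
  }
}
```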
Implementing a Custom Handler
There is only one handler instance per handler (plugin) key within a given cube instance, except for handlers that implement IMeasureDependentAggregatesContinuousHandler. This means that, if multiple post-processors need to use the same handler logic on different data (different hierarchies, data source name), you must be careful about how the handler is implemented. Typically, this means ensuring that it does not hold any state or cache anything beyond the execution scope / lifespan of each handler method. Then, when it really makes sense to re-use some common logic, you should rely on class inheritance: have a parent abstract class implement the common logic and keep specific handler implementations (with separate plugin keys!) for different types of data.
A continuous query handler must implement com.activeviam.activepivot.core.intf.api.realtime.IAggregatesContinuousHandler.
We strongly recommend that any custom implementation inherits from the core product base abstract class com.activeviam.activepivot.core.impl.api.realtime.AAggregatesContinuousHandler.
Then, the main purpose of the handler implementation is to compute the impact of a real-time event on a (query) location.
For instance, if the event is a quote change for the IBM stock, then the [2013, EQUITY/IBM] location will be impacted, but the [2015, EQUITY/BNP] or [2016, IRD/TBOND] locations will not.
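To make the IBM example concrete, here is a deliberately simplified sketch of the decision a handler has to take. It does not use Atoti's location types: a location is modeled as a two-element array {year, instrument path}, a null path stands for a wildcard, and `QuoteImpactSketch` and `isImpacted` are hypothetical names.

```java
/** Hypothetical, simplified model of "is this location impacted by this quote change?". */
final class QuoteImpactSketch {

  /**
   * @param location {year, instrument path}; a null path means "all instruments"
   * @param quotedPath path of the instrument whose quote changed, e.g. "EQUITY/IBM"
   * @return true if the measure must be recomputed on this location
   */
  static boolean isImpacted(String[] location, String quotedPath) {
    // In this toy model a quote change affects every year, so only the path is inspected.
    String path = location[1];
    return path == null // wildcard covers every instrument
        || path.equals(quotedPath) // the quoted instrument itself
        || quotedPath.startsWith(path + "/"); // an ancestor such as "EQUITY"
  }

  public static void main(String[] args) {
    String quoted = "EQUITY/IBM";
    System.out.println(isImpacted(new String[] {"2013", "EQUITY/IBM"}, quoted));
    System.out.println(isImpacted(new String[] {"2015", "EQUITY/BNP"}, quoted));
    System.out.println(isImpacted(new String[] {"2016", "IRD/TBOND"}, quoted));
  }
}
```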
When implementing the computeImpact() method, a new Impact object must be returned.
Its constructor requires three parameters:
- The original location (i.e. the one passed as a parameter of computeImpact()). This corresponds to the location that is asked for in the query.
- The collection of impacted locations. These are the sub-locations of the original location for which we need to recompute the measure because the value might have changed.
- The collection of removable points - point locations (not range locations with null coordinates) which might be "removed" as a result of the event concerned. Note that range locations with collection coordinates are allowed.
Note that continuous queries are updated incrementally. The incremental update is composed of the results of the queries that were made with the impacted locations, and the point locations that actually need to be removed (i.e. points that were in the collection of removable points and that do not appear in the results of the impacted locations computation). Since the impacted locations are just a sub-part of the original query, and the query engine does not "remember" the results of the query on the previous version, correctly computing the removable points is the only way to make values "disappear" from a continuous query.
If a post-processor is expected to return null on a location where it returned a non-null value in previous versions, then the corresponding point locations must be added as removable points, otherwise the change will not be seen.
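The role of removable points is easier to see on a toy model of the incremental update. The sketch below is an assumption-laden illustration and not the engine's implementation: the listener's view is a plain map keyed by a string form of the point location, and `IncrementalUpdateSketch` and `apply` are hypothetical names.

```java
import java.util.Collection;
import java.util.Collections;
import java.util.Map;
import java.util.TreeMap;

/** Hypothetical model of how an incremental update changes the view held by a listener. */
final class IncrementalUpdateSketch {

  /**
   * @param view current cell values keyed by point location
   * @param recomputed non-null results obtained by re-querying the impacted locations
   * @param removablePoints point locations that may have disappeared with the event
   * @return the view after the update
   */
  static Map<String, Double> apply(
      Map<String, Double> view, Map<String, Double> recomputed, Collection<String> removablePoints) {
    Map<String, Double> updated = new TreeMap<>(view);
    updated.putAll(recomputed); // new and changed values
    for (String point : removablePoints) {
      // A removable point is dropped only if the recomputation returned nothing for it.
      if (!recomputed.containsKey(point)) {
        updated.remove(point);
      }
    }
    return updated;
  }

  public static void main(String[] args) {
    Map<String, Double> view = new TreeMap<>();
    view.put("2013|EQUITY/IBM", 10.0);
    view.put("2015|EQUITY/BNP", 5.0);

    // After the event the post-processor returns null for IBM: nothing is recomputed for it.
    Map<String, Double> recomputed = Collections.emptyMap();

    // Without removable points the stale IBM value stays in the view.
    System.out.println(apply(view, recomputed, Collections.emptyList()));
    // With IBM declared as removable, the value disappears.
    System.out.println(apply(view, recomputed, Collections.singletonList("2013|EQUITY/IBM")));
  }
}
```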
Retrieve a collection of non-range locations from a collection of range locations
The easiest way to construct a collection of point locations from a collection of range locations is to use the LocationUtil.expandAll() function: LocationUtil.expandAll(pivot.getHierarchies(), myCollectionOfLocations).
This function iterates over the specified collection of locations and, for each range location it finds, uses the hierarchy information provided to transform the range location into a collection of expanded locations.
In most Atoti projects this does not affect performance. However, for some projects a huge number of point locations may be generated, which results in large computations and a very high memory footprint.
To limit this, follow Atoti best practices and avoid hierarchies with very high cardinality.
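The reason the number of point locations can explode is that expansion is conceptually a Cartesian product over the range coordinates. The following sketch models that idea with plain lists. It is not the implementation of `LocationUtil.expandAll()`: `RangeExpansionSketch`, `expand`, and the list-based representation of locations and hierarchy members are hypothetical.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Hypothetical model of range-location expansion as a Cartesian product. */
final class RangeExpansionSketch {

  /**
   * @param location one entry per hierarchy: a list of selected members, or null for "all members"
   * @param members all members of each hierarchy, in the same hierarchy order
   * @return every point location covered by the range location
   */
  static List<List<String>> expand(List<List<String>> location, List<List<String>> members) {
    List<List<String>> points = new ArrayList<>();
    points.add(new ArrayList<>()); // start from a single empty prefix
    for (int h = 0; h < location.size(); h++) {
      List<String> candidates = location.get(h) == null ? members.get(h) : location.get(h);
      List<List<String>> next = new ArrayList<>();
      for (List<String> prefix : points) {
        for (String member : candidates) {
          List<String> point = new ArrayList<>(prefix);
          point.add(member);
          next.add(point);
        }
      }
      points = next; // size is multiplied by the number of candidates
    }
    return points;
  }

  public static void main(String[] args) {
    List<List<String>> members =
        Arrays.asList(Arrays.asList("2013", "2015", "2016"), Arrays.asList("IBM", "BNP", "TBOND"));
    // Year fixed to 2013, wildcard on the instrument hierarchy.
    List<List<String>> range = Arrays.asList(Arrays.asList("2013"), null);
    System.out.println(expand(range, members));
  }
}
```

In this model the number of points is the product of the candidate counts per hierarchy, so two wildcard coordinates on hierarchies of 10,000 members each would already yield 100,000,000 point locations.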
As said before, the continuous handler also exposes the key of the event stream for which it can process events. Thanks to the extended plugin design, Atoti binds event streams and continuous handlers together at runtime, associating each stream with its respective handlers.
Finally, the continuous handler (itself an extended plugin value) must expose its type, so that post-processed measures can reference it.
Don't forget to register your new handler as a valid extended plugin value!
Add an @AtotiExtendedPluginValue annotation to your new continuous query handler.
Measure Dependent Handler
For most measures, the impact of new data on a continuous query depends only on the affected cell itself, so we can have a single instance of each type of handler.
There are however some measures which must know over which hierarchy they act in order to calculate their impact.
A typical example is Stream2PositionPostProcessor, which calculates cumulative sums along an ordered hierarchy.
In this case, the value of a cell is impacted by all previous siblings of the chosen hierarchy, which is implied by the measure.
Every handler that depends on its measure must implement the marker interface IMeasureDependentAggregatesContinuousHandler.
These handlers usually store a reference to their measure or the corresponding post-processor.
In the core product, the only IMeasureDependentAggregatesContinuousHandler is TimeLineHandler (plugin key = TIMELINE), which is used by Stream2PositionPostProcessor.
If you have a measure that depends on another measure that has an IMeasureDependentAggregatesContinuousHandler, you must add the continuousQueryHandlerMeasure property as explained in the Javadoc for IMeasureDependentAggregatesContinuousHandler.