# Aggregates Continuous Handlers

When a [continuous query](continuous_queries_overview) is registered in Atoti and a real-time event is detected
that may impact the results of a query, the continuous query engine recomputes the measures where needed
and forwards the updates to listeners.

Handlers are responsible for computing the impact of real-time events on given locations.
Those events are published by streams in response to certain actions or events.
Handlers and streams are respectively represented by the `IAggregatesContinuousHandler` and `IStream` interfaces.

Atoti provides the following handlers:

* `STORED` (`IAggregatesContinuousHandler.BASIC_HANDLER_PLUGIN_KEY`): when a measure uses this handler as one of its continuous handlers, any continuous query relying on this measure will listen to the stream of pivot transactions (see `IStream.ACTIVEPIVOT_PLUGIN_KEY`).
  On each pivot transaction, this handler will determine whether the facts in the transaction have an impact on the continuous query's location by finding its affected sub-parts.
* `FULL_REFRESH` (`IAggregatesContinuousHandler.FORCE_FULL_REFRESH_PLUGIN_KEY`): similar to `STORED`, this handler also listens to the stream of pivot transactions but instead triggers a full refresh of the query results on each pivot transaction.
  This generally provides worse performance as the entire query needs to be recomputed, but it can be more efficient in certain scenarios if the impact is costly to compute.
* `UPDATE_ANALYSIS_HIERARCHY` (`IAggregatesContinuousHandler.MULTI_ANALYSIS_HIERARCHY_MEASURE_HANDLER`): similar to `STORED` with the difference that this handler first computes the impact on the cleansed locations, and then expands the result on analysis hierarchies if it has not overflowed.

Other handlers are usually custom implementations that support one of the following cases:

* a real-time event that is not related to facts contribution
  (e.g. market data feed that impacts post-processed measures).
  These types of handlers do not rely on the stream of the Atoti transactions (`IStream.ACTIVEPIVOT_PLUGIN_KEY`) but on a custom stream.
* a measure that is impacted by facts that do not contribute to the location
  it is computed on (for example a customized location shifting post-processor).
  These types of handlers do rely on the stream of the Atoti transactions (`IStream.ACTIVEPIVOT_PLUGIN_KEY`) but implement their own logic for computing the impact.

## Handlers and streams registration

Handlers are configured on a per-measure (cube measure) basis, by using the handler *plugin key*.

By default, every measure (including post-processed measures that have no explicit handler configuration)
registers the handler identified by the `IAggregatesContinuousHandler.BASIC_HANDLER_PLUGIN_KEY` plugin key.

Note that sensitivity to custom real-time events is only relevant for post-processed measures,
because it is within the post-processing logic that you make use of data not provided
via the traditional facts contribution mechanism.
A given post-processed measure can be sensitive to multiple types of real-time events (generated by multiple streams)
and hence define multiple handlers for registration.

Here is an example of a custom handler defined on a post-processed measure, using the cube builder:

```java
final IActivePivotInstanceDescription cube =
    StartBuilding.cube("myCube")
        .withPostProcessor("myCustomMeasure")
        .withPluginKey("MY_CUSTOM_PP_KEY")
        .withContinuousQueryHandlers(
            IAggregatesContinuousHandler.BASIC_HANDLER_PLUGIN_KEY, "MY_CUSTOM_HANDLER_KEY")
        .withDimensions(dimensionAdder) // whatever you need to describe your cube ...
        .build();
```

Another way to manage a measure's continuous query handlers is to use [the Copper experimental API](./../copper/copper_measures#continuous-query-handlers-experimental).

> Do not forget the STORED handler when defining custom handlers!
> When explicitly configuring handlers for a given measure on a stream other than `IStream.ACTIVEPIVOT_PLUGIN_KEY`, remember to also add the default handler
> (plugin key = STORED), which makes the measure sensitive to real-time fact contributions.
> This handler is not added automatically alongside the custom handlers
> (it is only the default when there is no explicit configuration).

The stream a handler is sensitive to is defined by the handler implementation itself,
through the `IAggregatesContinuousHandler.getStreamKey()` method it must implement.

> A measure is not allowed to define several handlers impacted by the same stream.
> If a measure is impacted in several ways (for example must be recomputed if the primitive measure for its location
> changes but also if the primitive measure for a shifted location to the previous day changes), then a custom handler
> implementation has to take into account the combined logic.
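
The one-handler-per-stream rule above can be pictured with a small plain-Java check. This is only a sketch: `HandlerInfo`, `firstSharedStream` and the `TRANSACTIONS` and `SHIFTED_DAY` keys are hypothetical names invented for the illustration, and none of this is part of the Atoti API or a description of how Atoti validates the configuration.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Plain-Java illustration only: none of these types belong to the Atoti API. */
final class StreamUniquenessSketch {

  /** Hypothetical stand-in for a handler: its plugin key and the key of the stream it listens to. */
  static final class HandlerInfo {
    final String handlerKey;
    final String streamKey;

    HandlerInfo(String handlerKey, String streamKey) {
      this.handlerKey = handlerKey;
      this.streamKey = streamKey;
    }
  }

  /** Returns the first stream key claimed by two handlers, or null if every stream is used once. */
  static String firstSharedStream(List<HandlerInfo> handlers) {
    Map<String, String> handlerByStream = new HashMap<>();
    for (HandlerInfo handler : handlers) {
      // putIfAbsent returns the handler already registered on this stream, if any.
      String previous = handlerByStream.putIfAbsent(handler.streamKey, handler.handlerKey);
      if (previous != null) {
        return handler.streamKey;
      }
    }
    return null;
  }

  public static void main(String[] args) {
    // Two handlers on the (made-up) "TRANSACTIONS" stream: the measure declaration is invalid.
    List<HandlerInfo> handlers = List.of(
        new HandlerInfo("STORED", "TRANSACTIONS"),
        new HandlerInfo("SHIFTED_DAY", "TRANSACTIONS"),
        new HandlerInfo("MARKETDATA", "MARKETDATA"));
    System.out.println(firstSharedStream(handlers)); // prints TRANSACTIONS
  }
}
```

In this situation the two transaction-based handlers would have to be merged into a single custom handler that combines both impact computations.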

## A concrete example

In the following example, a query is submitted that requires three measures to run over a set of cells:

* contributors.COUNT
* Delta.SUM
* PnL.SUM

contributors.COUNT and Delta.SUM are aggregated non-post-processed measures,
whereas PnL.SUM is a post-processed measure that depends on Delta.SUM and real-time market data.

Because we know that at least one measure depends on real-time market data,
we implement a specific handler which listens to market data stream events and computes
which cube cells are impacted as those events occur.

Let's say that this custom handler's plugin key is `MARKETDATA`.
Then, we have the following handler dependencies for our measures:

| Measure            | Continuous Handler Keys |
| ------------------ | ----------------------- |
| contributors.COUNT | STORED                  |
| Delta.SUM          | STORED                  |
| PnL.SUM            | STORED, MARKETDATA      |

When a continuous query is created, it is bound to the two required handlers (STORED and MARKETDATA)
as follows:

| Continuous Handler | Sensitive To         | Handled Aggregated Measures            |
| ------------------ | -------------------- | -------------------------------------- |
| STORED             | Atoti's transactions | contributors.COUNT, Delta.SUM, PnL.SUM |
| MARKETDATA         | Market Data updates  | PnL.SUM                                |

This implies that:

* Whenever a transaction is committed, the `STORED` handler computes the impacted cells and pushes updated
  values of contributors.COUNT, Delta.SUM and PnL.SUM to the parent continuous query's listeners.
* Whenever market data is updated, the `MARKETDATA` handler computes the impacted cells and pushes updated
  values of PnL.SUM to the parent continuous query's listeners.
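
The second table is simply the first one inverted. As a minimal sketch of that derivation, assuming nothing about how Atoti performs the binding internally, the following plain-Java snippet turns a "measure to handler keys" map into a "handler key to measures" map. The class and method names are hypothetical.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/** Plain-Java illustration only: none of these types belong to the Atoti API. */
final class HandlerBindingSketch {

  /** Inverts "measure -> handler keys" into "handler key -> measures it handles". */
  static Map<String, List<String>> measuresByHandler(Map<String, List<String>> handlersByMeasure) {
    // TreeMap keeps handler keys sorted so that the output is deterministic.
    Map<String, List<String>> result = new TreeMap<>();
    for (Map.Entry<String, List<String>> entry : handlersByMeasure.entrySet()) {
      for (String handlerKey : entry.getValue()) {
        result.computeIfAbsent(handlerKey, key -> new ArrayList<>()).add(entry.getKey());
      }
    }
    return result;
  }

  public static void main(String[] args) {
    // The measure/handler table of the example, in declaration order.
    Map<String, List<String>> handlersByMeasure = new LinkedHashMap<>();
    handlersByMeasure.put("contributors.COUNT", List.of("STORED"));
    handlersByMeasure.put("Delta.SUM", List.of("STORED"));
    handlersByMeasure.put("PnL.SUM", List.of("STORED", "MARKETDATA"));

    // prints {MARKETDATA=[PnL.SUM], STORED=[contributors.COUNT, Delta.SUM, PnL.SUM]}
    System.out.println(measuresByHandler(handlersByMeasure));
  }
}
```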

## Continuous handlers and chains of post-processors

When post-processors are chained (one post-processor takes the output of another post-processor as its input),
the Atoti query engine cannot infer any custom inheritance logic for the continuous query handlers.

Therefore, when custom handlers are required, always declare them explicitly
on every post-processor they may impact.

To be clear: if post-processor P1 is attached to handler H1 and post-processor P2 declares P1 as its underlying measure,
then P2 will **not** automatically inherit H1 registration.
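
When a chain gets long, it can help to list which handler keys each post-processor should declare. The sketch below is a hypothetical helper, not an Atoti feature: it walks a hand-written map of underlying measures and collects the handler keys declared anywhere below a given measure. It assumes the measure chain contains no cycle.

```java
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;

/** Plain-Java illustration only: none of these types belong to the Atoti API. */
final class HandlerChainSketch {

  /**
   * Collects the handler keys declared on a measure and on all of its (transitive) underlying
   * measures. Assumes the underlying-measure graph is acyclic.
   */
  static Set<String> handlersToDeclare(
      String measure,
      Map<String, List<String>> underlyingMeasures,
      Map<String, List<String>> declaredHandlers) {
    Set<String> result = new TreeSet<>(declaredHandlers.getOrDefault(measure, List.of()));
    for (String underlying : underlyingMeasures.getOrDefault(measure, List.of())) {
      result.addAll(handlersToDeclare(underlying, underlyingMeasures, declaredHandlers));
    }
    return result;
  }

  public static void main(String[] args) {
    // P2 reads P1, and only P1 declares handlers so far.
    Map<String, List<String>> underlyingMeasures = Map.of("P2", List.of("P1"));
    Map<String, List<String>> declaredHandlers = Map.of("P1", List.of("STORED", "H1"));

    // prints [H1, STORED]: the keys P2 has to declare itself, since nothing is inherited.
    System.out.println(handlersToDeclare("P2", underlyingMeasures, declaredHandlers));
  }
}
```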

## Implementing a custom handler

Continuous query handlers rely on streams to produce events at the appropriate time.
A handler may use either a stream provided by Atoti Server (see `IStream`) or a custom stream implementation.

### Implementing a custom stream

The main responsibility of streams is to produce events and notify their listeners.

Custom streams should generally be created by extending the `AStream` class.
This class handles registration and notification of listeners such that subclasses only need to create new events and call the protected `sendEvent` method when needed.

Alternatively, custom streams can be defined by implementing the `IStream` interface from scratch.

Due to the way streams are instantiated, they must expose a public constructor accepting a single parameter of type `IMultiVersionActivePivot`.
They should also be annotated with the `@AtotiExtendedPluginValue` annotation for Atoti Server to detect them.

Below is an example of a stream implementation for market data.
It assumes we have access to a `MarketDataProvider` object that will handle market data itself and that can notify its listeners of updates using the `MarketDataEventListener.handleMarketDataEvent` method.

```java
@AtotiExtendedPluginValue(key = MarketDataStream.PLUGIN_KEY, intf = IStream.class)
public class MarketDataStream extends AStream<MarketDataEvent>
    implements MarketDataEventListener {

  public static final String PLUGIN_KEY = "MARKETDATA";

  /** Object handling market data that notifies this stream of updates. */
  private final MarketDataProvider marketDataProvider = getMarketDataProvider();

  public MarketDataStream(final IMultiVersionActivePivot pivot) {
    super(pivot);
  }

  @Override
  public String getType() {
    return PLUGIN_KEY;
  }

  @Override
  public Class<MarketDataEvent> getEventType() {
    return MarketDataEvent.class;
  }

  @Override
  public void enable() {
    super.enable();
    // Start receiving market data updates.
    this.marketDataProvider.addListener(this);
  }

  @Override
  public void disable() {
    super.disable();
    // Stop receiving market data updates.
    this.marketDataProvider.removeListener(this);
  }

  @Override // Method inherited from MarketDataEventListener and called by MarketDataProvider
  public void handleMarketDataEvent(final MarketDataEvent event) {
    sendEvent(event, null);
  }
}
```

### Implementing the handler itself

> There is only one handler instance per handler (plugin) key within a given cube instance,
> except for those that inherit from `IMeasureDependentAggregatesContinuousHandler`.
> If multiple post-processors use the same handler plugin key, they will use the same handler instance.
> Beware of side effects if the different post-processors do not have the same data (for example, different hierarchies, data source name).
> Typically, this means ensuring that the handler does not hold any state or cache anything beyond the execution scope / lifespan
> of each handler method.
> Then, when it really makes sense to re-use some common logic, you should rely on class inheritance:
> have a parent abstract class implement the common logic and keep specific handler implementations
> (with separate plugin keys!) for different types of data.

A continuous query handler must implement `IAggregatesContinuousHandler`.
We strongly recommend that any custom implementation inherits from the core product base abstract class
`AAggregatesContinuousHandler`.

For Atoti Server to detect your new continuous query handler, it should be annotated with the `@AtotiExtendedPluginValue` annotation.

Then, the main purpose of the handler implementation is to compute the impact of a real-time event on a (query) location.

For instance, if the event is a quote change for the IBM stock, then the `[2013, EQUITY/IBM]` location will be impacted,
but the `[2015, EQUITY/BNP]` or `[2016, IRD/TBOND]` locations will not.
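
The reasoning above can be sketched in plain Java. This is a deliberately simplified model, not the signature of `computeImpact()`: locations are reduced to `[year, instrument]` lists, a `null` coordinate stands for a wildcard, and all class and method names are hypothetical.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Plain-Java illustration only: none of these types belong to the Atoti API. */
final class QuoteImpactSketch {

  /** Index of the instrument coordinate in the simplified [year, instrument] locations. */
  private static final int INSTRUMENT = 1;

  /**
   * Returns the sub-locations of queryLocation affected by a quote change on the given instrument.
   * A null coordinate stands for a wildcard ("all members").
   */
  static List<List<String>> impactedLocations(List<String> queryLocation, String instrument) {
    List<List<String>> impacted = new ArrayList<>();
    String coordinate = queryLocation.get(INSTRUMENT);
    if (coordinate == null) {
      // Wildcard: only the slice of the quoted instrument needs to be recomputed.
      impacted.add(Arrays.asList(queryLocation.get(0), instrument));
    } else if (coordinate.equals(instrument)) {
      // The query targets exactly the quoted instrument.
      impacted.add(queryLocation);
    }
    // Any other instrument: nothing is impacted, the list stays empty.
    return impacted;
  }

  public static void main(String[] args) {
    String quoted = "EQUITY/IBM";
    System.out.println(impactedLocations(Arrays.asList("2013", "EQUITY/IBM"), quoted)); // prints [[2013, EQUITY/IBM]]
    System.out.println(impactedLocations(Arrays.asList("2015", "EQUITY/BNP"), quoted)); // prints []
    System.out.println(impactedLocations(Arrays.asList("2013", null), quoted)); // prints [[2013, EQUITY/IBM]]
  }
}
```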

When implementing the `computeImpact()` method, a new `Impact` object must be returned.
Its constructor requires three parameters:

1. The original location (i.e. the one passed as a parameter of `computeImpact()`).
   This corresponds to the location that is asked for in the query.
2. The collection of impacted locations.
   These are the sub-locations of the original location for which we need to recompute the measure
   because the value might have changed.
3. The collection of removable points: **point** locations (not range locations with null coordinates)
   that might be "removed" as a result of the event.
   Note that range locations with collection coordinates are allowed.
Note that continuous queries are updated incrementally.
The incremental update is composed of the results of the queries that were made with the impacted locations,
and the point locations that actually need to be removed (i.e. points that were in the collection of removable points
but do not appear in the results of the impacted locations computation).
Since the impacted locations are just a sub-part of the original query, and the query engine does not "remember"
the results of the query on the previous version, correctly computing the removable points is the only way to make
values "disappear" from a continuous query.

If a post-processor is expected to return null on a location where it returned a non-null value in a previous
version, then the corresponding point locations must be added as removable points, otherwise the change will not be seen.
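
To make the role of removable points concrete, here is a minimal plain-Java model of an incremental update as described above. It is a sketch under simplifying assumptions: locations are plain strings, the view is a map held by a listener, and `apply` is a hypothetical helper, not something the Atoti API exposes.

```java
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;

/** Plain-Java illustration only: none of these types belong to the Atoti API. */
final class IncrementalUpdateSketch {

  /**
   * Applies an incremental update to a listener-side view.
   *
   * @param currentView values currently displayed, by point location
   * @param recomputed results of re-running the query on the impacted locations (nulls are absent)
   * @param removablePoints point locations that may have disappeared with this event
   */
  static Map<String, Double> apply(
      Map<String, Double> currentView,
      Map<String, Double> recomputed,
      Set<String> removablePoints) {
    Map<String, Double> updated = new TreeMap<>(currentView);
    updated.putAll(recomputed);
    for (String point : removablePoints) {
      // A removable point with no recomputed value has really disappeared.
      if (!recomputed.containsKey(point)) {
        updated.remove(point);
      }
    }
    return updated;
  }

  public static void main(String[] args) {
    Map<String, Double> view = Map.of("A", 1.0, "B", 2.0, "C", 3.0);
    Map<String, Double> recomputed = Map.of("A", 5.0);

    // B is declared removable and has no new value: it leaves the view.
    System.out.println(apply(view, recomputed, Set.of("A", "B"))); // prints {A=5.0, C=3.0}
    // Without removable points, the stale value of B stays visible.
    System.out.println(apply(view, recomputed, Set.of())); // prints {A=5.0, B=2.0, C=3.0}
  }
}
```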

> **Retrieve a collection of non-range locations from a range location**
>
> The easiest way to iterate over point locations covered by a range location is to use
> the `LocationUtil.expandRangeLevels()` function.

This function iterates over the given collection of locations and,
for each range location it finds, uses the hierarchy information provided to transform it
into a collection of expanded locations.

In most Atoti projects this does not affect performance.
However, for some projects a huge number of point locations may be generated,
which results in large computations and a very high memory footprint.

To limit this, follow Atoti best practices and avoid hierarchies with very high cardinality.
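
The following plain-Java sketch shows why cardinality matters. It is not the implementation of `LocationUtil.expandRangeLevels()`, only a hypothetical cartesian expansion: the number of point locations is the product of the member counts of the expanded levels.

```java
import java.util.ArrayList;
import java.util.List;

/** Plain-Java illustration only: none of these types belong to the Atoti API. */
final class RangeExpansionSketch {

  /** Expands one list of members per level into every point location (cartesian product). */
  static List<List<String>> expand(List<List<String>> membersPerLevel) {
    List<List<String>> points = new ArrayList<>();
    points.add(new ArrayList<>()); // start from a single empty location
    for (List<String> members : membersPerLevel) {
      List<List<String>> next = new ArrayList<>();
      for (List<String> prefix : points) {
        for (String member : members) {
          List<String> point = new ArrayList<>(prefix);
          point.add(member);
          next.add(point);
        }
      }
      points = next;
    }
    return points;
  }

  public static void main(String[] args) {
    List<List<String>> range = List.of(
        List.of("2013", "2014"),
        List.of("EQUITY/IBM", "EQUITY/BNP", "IRD/TBOND"));
    // 2 years x 3 instruments = 6 point locations.
    System.out.println(expand(range).size()); // prints 6
  }
}
```

With two levels of 10,000 members each, the same expansion would produce 100 million point locations, which is the kind of blow-up the best practice above is meant to avoid.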

As mentioned above, the continuous handler also exposes the key of the event stream whose events it can process.
Thanks to the extended plugin design, Atoti binds event streams and continuous handlers together at runtime,
associating each stream with its respective handlers.

Finally, the continuous handler (itself an extended plugin value) must expose its type,
so that post-processed measures can reference it.

## Measure dependent handler

For most measures, the impact of new data on a continuous query depends only on the affected cell itself,
so we can have a single instance of each type of handler.

There are however some measures which must know over which hierarchy they act in order to calculate their impact.
A typical example is `Stream2PositionPostProcessor`, which calculates cumulative sums along an ordered hierarchy.
In this case, the value of a cell is impacted by all previous siblings of the chosen hierarchy,
which is implied by the measure.
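
A cumulative sum makes this dependency easy to picture: new data on one member changes the running total of that member and of every member after it. The sketch below models this with plain Java lists. The names are hypothetical and it does not reflect how `Stream2PositionPostProcessor` or its handler are implemented.

```java
import java.util.List;

/** Plain-Java illustration only: none of these types belong to the Atoti API. */
final class CumulativeImpactSketch {

  /**
   * Returns the members whose running total changes when updatedMember receives new data:
   * the member itself and every member that follows it in the ordered hierarchy.
   */
  static List<String> impactedMembers(List<String> orderedMembers, String updatedMember) {
    int index = orderedMembers.indexOf(updatedMember);
    if (index < 0) {
      return List.of(); // unknown member: nothing to refresh
    }
    return orderedMembers.subList(index, orderedMembers.size());
  }

  public static void main(String[] args) {
    List<String> days = List.of("Mon", "Tue", "Wed", "Thu");
    System.out.println(impactedMembers(days, "Tue")); // prints [Tue, Wed, Thu]
  }
}
```

The handler can only compute this if it knows which hierarchy the measure accumulates along, which is why such handlers cannot be shared across measures.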

Every handler that depends on its measure must implement the marker interface `IMeasureDependentAggregatesContinuousHandler`.
These handlers usually store a reference to their measure or the corresponding post-processor.

In the core product, the only `IMeasureDependentAggregatesContinuousHandler` is `TimeLineHandler`
(plugin key = `TIMELINE`), which is used by `Stream2PositionPostProcessor`.

If you have a measure that depends on another measure that has an `IMeasureDependentAggregatesContinuousHandler`,
you must add the `continuousQueryHandlerMeasure` property as explained
in the Javadoc for `IMeasureDependentAggregatesContinuousHandler`.

## Configuration

The continuous query handlers used by a post-processor can be specified by setting the
`IPostProcessor#CONTINUOUS_QUERY_HANDLER_KEYS` property, or by calling `withContinuousQueryHandlers` when
using [Copper post-processors](../copper/copper_measures#post-processor).

In specific cases, computing the impact of a transaction is too costly and it is more efficient to refresh the entire query results.
In those scenarios, the `IAggregatesContinuousHandler#FORCE_FULL_REFRESH_ENGINE_PROPERTY` property can be set as shown
below to force all measures to behave as if they used the `FULL_REFRESH` handler.
This setting only affects handlers based on the `IStream.ACTIVEPIVOT_PLUGIN_KEY` stream; all other handlers are left
untouched.

```java
.withAggregatesContinuousQueryEngine()
.withKey(ContinuousQueryEngineConstants.SYNC_PLUGIN_KEY)
.withProperty(
    IAggregatesContinuousHandler.FORCE_FULL_REFRESH_ENGINE_PROPERTY, "true")
```
