Distributed Post-Processors
A post-processor is a function that computes a measure at query time, based on other measures in the cube. Thus, retrieving a post-processed measure involves retrieving the measures upon which it is based, potentially cascading through a dependency chain, until ActivePivot finds primitive measures at the given location.
Prefetchers are responsible for computing underlying dependencies, and are declared within each post-processor.
A major feature of a distributed ActivePivot cluster is that post-processors can have distributed prefetchers. A post-processor can ask for any underlying measure, be it pre-aggregated or post-processed, to be evaluated on a given set of locations spread across all of the remote ActivePivot cubes.
This allows the user to pre-compute some aggregates on the local instances before sending them to the query cube to finish the calculation. This way, the computation can benefit from each local instance's aggregates cache. Each data node stores the results of the most popular queries, so that the pre-computed results are already available the next time similar queries hit the node.
For efficient query planning, post-processors declare a list of prefetchers. This often allows ActivePivot to retrieve all underlying measures in a single pass before evaluating the post-processors. In a distributed environment, this is of paramount importance, as a single pass also means a single round trip on the network. Do not ignore the prefetchers' warnings while working with a distributed cluster, as bad prefetching or a lack of prefetching can lead to incorrect results.
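As an illustration, the sketch below models this contract with simplified stand-in types. Prefetcher, SketchPostProcessor, and DiffPostProcessorSketch are not the actual ActivePivot classes; they only show the shape of the mechanism: dependencies are declared up front, then evaluation runs on the prefetched values.

import java.util.ArrayList;
import java.util.List;

// Simplified model of the prefetching contract; not the real ActivePivot API.
final class Prefetcher {
    // Underlying measures to retrieve before the post-processor runs.
    final List<String> underlyingMeasures;

    Prefetcher(List<String> underlyingMeasures) {
        this.underlyingMeasures = underlyingMeasures;
    }
}

abstract class SketchPostProcessor {
    // Prefetchers declared by the post-processor, inspected by the engine
    // to plan a single retrieval pass over all underlying measures.
    protected final List<Prefetcher> prefetchers = new ArrayList<>();

    abstract double evaluate(double[] underlyingValues);
}

// CURRENT_STOCK = incoming_flux - leaving_flux
final class DiffPostProcessorSketch extends SketchPostProcessor {
    DiffPostProcessorSketch() {
        // One declaration, one pass: in a distributed cluster this is also
        // a single network round trip to the remote cubes.
        prefetchers.add(new Prefetcher(List.of("incoming_flux", "leaving_flux")));
    }

    @Override
    double evaluate(double[] underlyingValues) {
        return underlyingValues[0] - underlyingValues[1];
    }
}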
Post-Processor Definition in the Query Node
Advanced calculations can be done with post-processors declared directly in the query cube. They can rely on measures coming from data nodes that do not share a topology.
In the following example, we consider one query node linked to two data nodes on a supply chain use case.
One data node contains all the inbound flux, the other data node contains all the outbound flux. To calculate the current stock for a product in a warehouse, the query cube needs to get all the inbound flux entries for that specific product/warehouse location, and needs to subtract all the outbound flux entries.
StartBuilding.cube("QueryCube")
.withUpdateTimestamp()
.withinFolder("Native_measures")
.withAlias("Distributed_Timestamp")
.withFormatter("DATE[HH:mm:ss]")
.withPostProcessor("CURRENT_STOCK")
// CURRENT_STOCK = incoming - leaving
.withPluginKey("DiffPostProcessor")
.withUnderlyingMeasures("incoming_flux", "leaving_flux")
.asQueryCube()
.withClusterDefinition()
.withClusterId("SupplyChainCubeCluster")
.withMessengerDefinition()
.withProtocolPath("jgroups-protocols/protocol-tcp.xml")
.end()
.withApplication("supply-chain")
.withDistributingFields("FluxType")
.end()
.build();
Implicit Distribution of a Post-Processor
A post-processor is considered to be implicitly distributed if it implements IPartitionedPostProcessor and if it exposes at least one of the distributing fields as a partitioning level. In such cases, the post-processor and all its underlying measures are executed locally on each involved cube, and the intermediate results are aggregated into the final result using the IPartitionedPostProcessor#reduce method.
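The sketch below models this contract. PartitionedPostProcessorSketch is a simplified stand-in, not the actual IPartitionedPostProcessor interface, whose signatures differ: each involved cube evaluates the measure on its own partition, and reduce combines the per-partition results.

import java.util.List;

// Simplified stand-in for the IPartitionedPostProcessor contract; the real
// interface's signatures differ.
interface PartitionedPostProcessorSketch {
    // Executed locally on each involved cube, on its own data partition.
    double evaluateOnPartition(Object location);

    // Combines the per-cube intermediate results into the final value.
    double reduce(List<Double> partitionResults);
}

// A SUM-like measure reduces by simply adding the per-node results.
final class PartitionedSumSketch implements PartitionedPostProcessorSketch {
    @Override
    public double evaluateOnPartition(Object location) {
        return 0.0; // placeholder: real code would aggregate the local facts
    }

    @Override
    public double reduce(List<Double> partitionResults) {
        return partitionResults.stream().mapToDouble(Double::doubleValue).sum();
    }
}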
Auto-Distributed Post-Processors
In a distributed environment, the data cube has two additional properties:
- executePostProcessorsInDataCube can be set with the DataClusterDefinition builder. If set to true, all post-processors in the given data cube are evaluated locally. It amounts to wrapping all the post-processors in DistributedPostProcessors, as demonstrated in the ForexPostProcessor example below. The default value is false with AP 5.5.9+, and true with AP 5.6+.
- executeInDataCube can be set as a single post-processor property, to allow for more granularity.
We recommend using both these properties as much as possible, to reduce network traffic and speed up queries.
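As a sketch of the second property, and assuming withProperty accepts it on your ActivePivot version (it is the same builder call that sets leafLevels in the Forex example below), a single post-processor could be marked for local execution as follows:

StartBuilding.cube("PnLDataCube")
    .withPostProcessor("PnL.Forex")
        .withPluginKey("FOREX")
        .withUnderlyingMeasures("PnL.SUM")
        // Hypothetical usage: evaluate this post-processor locally,
        // inside the data cube, instead of in the query cube.
        .withProperty("executeInDataCube", "true")
    .build();

executePostProcessorsInDataCube, on the other hand, is set once on the DataClusterDefinition builder and applies to every post-processor of the data cube.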
Specific cases where it can be useful:
- A post-processor that needs to be specifically initialized in the data cube.
- A post-processor whose code cannot be shared with the query cube.
- A post-processor that requires datastore querying for computation.
- A post-processor that is computed in a MapReduce style, using the SUM aggregation function to sum up the values and get the final result.
Specific cases where not to use this property:
- A post-processor that cannot be computed in a MapReduce style without producing erroneous results. In this case, advanced ActivePivot users can implement IDistributedPostProcessor to force local execution and decide how to finish the computation (see the sketch after this list). This incurs data transfers between the data nodes and the query node to perform the final aggregation.
- The user is already using the wrapping technique, as demonstrated in the ForexPostProcessor example below.
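To see why plain SUM reduction can be wrong, consider an average: adding per-node averages gives an incorrect result, so the computation must ship intermediate (sum, count) pairs and finish in the query node. The sketch below models this with a simplified stand-in interface, not the actual IDistributedPostProcessor signatures:

import java.util.List;

// Simplified stand-in for the IDistributedPostProcessor idea; not the real
// ActivePivot interface. Each data node computes an intermediate result
// locally, and the query node decides how to finish the computation.
interface DistributedPostProcessorSketch<I, O> {
    I evaluateLocally(Object location);    // runs on each data node
    O finishComputation(List<I> partials); // runs on the query node
}

// An average cannot be reduced with SUM: summing per-node averages is
// wrong. Instead, ship (sum, count) pairs and divide at the very end.
final class AveragePnlSketch implements DistributedPostProcessorSketch<double[], Double> {
    @Override
    public double[] evaluateLocally(Object location) {
        return new double[] {0.0, 0.0}; // placeholder: {local sum, local count}
    }

    @Override
    public Double finishComputation(List<double[]> partials) {
        double sum = 0, count = 0;
        for (double[] p : partials) {
            sum += p[0];
            count += p[1];
        }
        return count == 0 ? null : sum / count;
    }
}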
Example: The ForexPostProcessor
Forex stands for Foreign Exchange, the global currency market. PnL stands for Profit and Loss, one of the financial statements companies issue to report their revenues and expenses over a set period of time.
Let's imagine a PnL application distributed along the field Currency, with one query cube and three data nodes containing, respectively:
- USD
- EUR and JPY
- GBP
To compute its post-processed measure, the ForexPostProcessor requests the value of the pnl.SUM measure for each currency on the given location. It then converts the values from each currency into a reference currency, for example USD, sums them up, and returns the result.
If a user requests the top-level location with the Grand Totals, we could ask each remote instance for its total pnl.SUM per currency, bring all this data back to the query cube, apply the currency conversion, and sum the converted totals before returning the final result to the user.
This is, however, quite inefficient, as most of the computation, such as the currency conversions, could be done in the remote instances. Of course, this becomes more interesting with more expensive computations.
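With hypothetical pnl.SUM totals and USD conversion rates (numbers invented for illustration), the Grand Total combines as follows; the only question is whether the per-currency conversions run on the data nodes or in the query cube:

import java.util.Map;

// Hypothetical pnl.SUM totals per currency and USD conversion rates,
// invented for illustration only.
public final class ForexGrandTotalSketch {
    public static void main(String[] args) {
        // Node 1 holds USD, node 2 holds EUR and JPY, node 3 holds GBP.
        Map<String, Double> pnlByCurrency =
                Map.of("USD", 100.0, "EUR", 80.0, "JPY", 9000.0, "GBP", 50.0);
        Map<String, Double> usdRate =
                Map.of("USD", 1.0, "EUR", 1.10, "JPY", 0.007, "GBP", 1.25);

        // Convert each per-currency total into USD, then sum:
        // 100*1.0 + 80*1.10 + 9000*0.007 + 50*1.25 = 100 + 88 + 63 + 62.5
        double grandTotal = pnlByCurrency.entrySet().stream()
                .mapToDouble(e -> e.getValue() * usdRate.get(e.getKey()))
                .sum();
        System.out.println("Grand Total (USD): " + grandTotal); // 313.5
    }
}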
Setting the property executeInDataCube is equivalent to the following process, which takes advantage of having several nodes: create an intermediate DistributedPostProcessor that requests the value of the ForexPostProcessor for the requested location on each remote cube, and use the DistributedPostProcessor's aggregation function (SUM) to sum up the values and compute the final result. This way:
- most of the computation is spread over multiple servers/more CPUs,
- less data travels through the network.
The following example is no longer the recommended way to do this: the properties detailed above provide this distribution automatically, without requiring additional work from the developer. However, the code helps to understand what ActivePivot now does behind the scenes to simplify the user's work.
StartBuilding.cube("PnL")
.withMeasures(builder -> builder
.withAggregatedMeasure().sum("PnL.SUM")
.withPostProcessor("PnL.Forex")
.withPluginKey("FOREX")
.withUnderlyingMeasures("PnL.SUM")
.withContinuousQueryHandlers("STORED", "FOREX")
.withProperty("leafLevels", "Currency@Currency@Currency")
.withPostProcessor("PnL.DistributedForex")
.withPluginKey(DISTRIBUTED_POST_PROCESSOR_PLUGIN_KEY)
.withUnderlyingMeasures("PnL.Forex")
.withContinuousQueryHandlers("STORED", "FOREX"))
.asQueryCube()
Chained Distributed Post-Processors
A distributed post-processor at the top of a chain of post-processors allows the entire chain to be processed locally.
Once intermediate results have been sent back to the query cube, all subsequent computation takes place in the query cube.
Therefore, the best practice is to use distributed post-processors for the most computation-intensive measures and, if possible, at the top of the chain.
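For instance, in the sketch below (hypothetical measure names and plugin key, assuming the executeInDataCube property described above and the builder methods used elsewhere in this document), marking only the topmost post-processor pushes its whole underlying chain into each data cube:

StartBuilding.cube("PnLDataCube")
    .withPostProcessor("PnL.Forex")
        .withPluginKey("FOREX")
        .withUnderlyingMeasures("PnL.SUM")
    .withPostProcessor("PnL.ScaledForex")   // hypothetical measure
        .withPluginKey("SCALING")           // hypothetical plugin key
        .withUnderlyingMeasures("PnL.Forex")
        // Top of the chain: executing it locally pulls PnL.Forex and
        // PnL.SUM into the data cube as well, so only the final values
        // travel over the network.
        .withProperty("executeInDataCube", "true")
    .build();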
In a distributed cluster, if some granted measures are declared, then all distributed measures must be declared as granted for visibility purposes.