Distributed Post-Processors
A post-processor is a function that computes a measure at query time based on other measures in the cube. Thus, retrieving a post-processed measure involves retrieving the measures upon which it is based, potentially cascading through a dependency chain until ActivePivot finds primitive measures at the given location.
Prefetchers
are responsible for computing underlying dependencies, and are declared within
each post-processor.
A major feature of a distributed ActivePivot cluster is the fact that post-processors can have distributed prefetchers. A post-processor can ask for any underlying measure, be it pre-aggregated or post-processed, to be evaluated on a given set of locations spreading across all the remote ActivePivot cubes.
This allows the user to pre-compute some aggregates on the local instances before sending them to the query cube to finish the calculation. This way, the computation can benefit from each local instance's aggregates cache. Each data cube stores the results of the most popular queries, so that the pre-computed results are already available the next time similar queries hit the node.
For efficient query planning, post-processors declare a list of prefetchers. This often allows ActivePivot to retrieve all underlying measures in a single pass before evaluating the post- processors. In a distributed environment, this is of paramount importance, as a single pass also means a single round trip on the network. Do not ignore the prefetchers' warnings while working with a distributed cluster, as bad prefetching or lack of prefetching can lead to incorrect results.
Post-processors can be defined on both query cubes and data cubes. In what follows, we explain the different post-processor settings when using distribution.
Post-Processor Definition in the Query Cube
Declaring post-processors directly in the query cube allows performing advanced calculations on measures coming from data cubes of different applications that do not share the same topology.
In the following example, we consider one query cube linked to two data cubes from two different applications of a supply chain use case.
One data cube contains all the inbound flux, the other data cube contains all the outbound flux. To calculate the current stock for a product in a warehouse, the query cube needs to get all the inbound flux entries for that specific product/warehouse location, and needs to subtract all the outbound flux entries.
StartBuilding.cube("QueryCube")
.withUpdateTimestamp()
.withinFolder("Native_measures")
.withAlias("Distributed_Timestamp")
.withFormatter("DATE[HH:mm:ss]")
.withPostProcessor("CURRENT_STOCK")
// CURRENT_STOCK = incoming - leaving
.withPluginKey("DiffPostProcessor")
.withUnderlyingMeasures("incoming_flux", "leaving_flux")
.asQueryCube()
.withClusterDefinition()
.withClusterId("SupplyChainCubeCluster")
.withMessengerDefinition()
.withProtocolPath("jgroups-protocols/protocol-tcp.xml")
.end()
.withApplication("app-incoming-flux")
.withoutDistributingFields()
.withApplication("app-leaving-flux")
.withoutDistributingFields()
.end()
.build();
Post-Processor Definition in the Data Cube
In order to leverage the remote data cube instances, ActivePivot relies on DistributedPostProcessor
,
whose computations are distributed on each data cube, and completed on the query cube where all the partial results are reduced
using a given aggregation function (default is SUM). There are three approaches to make use of such post-processors. They are explained
below:
Implicit Distribution of a Post-Processor
A post-processor is considered to be implicitly distributed if it implements
IPartitionedPostProcessor
and if it exposes at least one of the distributing fields as a
partitioning level.
In such cases, the post-processor and all its underlying measures are executed on each
involved cube locally, and the intermediate results are aggregated into the final result using the
IPartitionedPostProcessor#reduce
method.
Auto-Distributed Post-Processors
In a distributed environment, the data cube has two additional properties:
executePostProcessorsInDataCube
can be set within theDataClusterDefinition
builder. If set to true, all post-processors in the given data cube are evaluated locally. The default value istrue
.executeInDataCube
can be set as a single post-processor property, to allow for more granularity.
For a given post-processor, the value of executeInDataCube
takes precedence over the value of
executePostProcessorsInDataCube
when both properties are specified, with different values (see example below).
StartBuilding.cube("DataCube")
.withAggregatedMeasure()
.sum("pnl")
.withPostProcessor(FIRST_PP)
.withPluginKey(PLUGIN_KEY_1)
.withUnderlyingMeasures("pnl.SUM")
.withPostProcessor(SECOND_PP)
.withPluginKey(PLUGIN_KEY_2)
.withUnderlyingMeasures(SECOND_PP)
.withProperty(IDistributedPostProcessor.EXECUTE_IN_DATA_CUBE_PROPERTY, "true")
.withSingleLevelDimension("Currency")
.asDataCube()
.withClusterDefinition()
.withClusterId("distributed_cluster")
.withMessengerDefinition()
.withProtocolPath("jgroups-protocols/protocol-tcp.xml")
.end()
.withApplicationId("app")
.withAllHierarchies()
.withAllMeasures()
.withProperty(IClusterDefinition.EXECUTE_POST_PROCESSORS_IN_DATA_CUBE_PROPERTY, "false")
.end()
.build();
Having these properties set to true
is equivalent to wrapping the corresponding
post-processors in DistributedPostProcessor
s. Thus, we recommend using both these properties as much as possible (setting them to true
),
in order reduce network traffic and speed up queries. This will induce more work on local instances and less data sent over to the query cubes
(data cubes will sent their partial result instead of the underlying measures the post-processors rely on).
Specific cases where setting these properties to true
(i.e. default value) can be useful:
- A post-processor that needs to be specifically initialized in the data cube.
- A post-processor whose code cannot be shared with the query cube.
- A post-processor that requires datastore querying for computation.
- A post-processor that is computed in a MapReduce style, using the SUM aggregation function to sum up the values and get the final result.
Specific cases where these properties should be set to false
:
- A post-processor that cannot be computed in a MapReduce style without inducing erroneous results.
In this case, advanced ActivePivot users can implement
IDistributedPostProcessor
to force local execution and decide how to finish the computation. This incurs data transfers between the data nodes and the query cube to perform the final aggregation. - The user is already using the wrapping technique demonstrated above.
Note that a
DistributedPostProcessor
defined on the data cube will see its definition automatically copied to the distributed cube.
Chained Distributed Post-Processors
A distributed post-processor at the top of a chain of post-processors allows the entire chain to
be processed locally on each remote cube, and the results will be aggregated using the distributed post-processor's aggregation function
as shown in the ForexPostProcessor
example.
Therefore, a good practice is to use distributed post-processors for the most computation intensive measures, especially on the top of the chain.
Be careful, the above does not mean that the chained post-processors are distributed.
The following example illustrates well the concept of chained distributed post-processors.
Example: The ForexPostProcessor
Forex stands for Foreign Exchange Market. It is the currency market. PnL stands for Profits and Losses, and is one of the financial statements companies issue to demonstrate revenues and expenses during a set period of time.
Let's imagine a PnL application distributed along the field Currency, with one query cube and three data cubes, that contains:
- USD
- EUR and JPY
- GBP
To compute its post-processed measure, the ForexPostProcessor
requests the value of the pnl.SUM
measure for each currency on the given location. It then converts the values from each currency into
a reference currency, for example, USD, sums them up and returns the result.
If a user requests the top level location with the Grand Totals, we could ask each remote instance
for its total pnl.SUM
per currency, bring all this data back to the query cube, apply the currency
conversion, and sum the converted totals before returning the final result to the user.
This is however quite inefficient, as the currency conversion computation could be done in the remote instances. Of course, this becomes more interesting with more expensive computations.
Setting the property executeInDataCube
is equivalent to the following process,
which takes advantage of having several nodes:
Create an intermediate DistributedPostProcessor
that requests the value of the
ForexPostProcessor
for the requested location on each remote cube, and use the
DistributedPostProcessor
's aggregation function (SUM) to sum up the values and compute the final
result. This way:
- most of the computation is spread on multiple servers/more CPUs,
- less data is traveling through the network.
The following example is no longer the recommended way to do this. The properties detailed above automatically provide this distribution without requiring additional work from the developer. However, the following code helps to understand what has been done to simplify the user's work.
StartBuilding.cube("DataCube")
.withAggregatedMeasure()
.sum("pnl")
.withPostProcessor(PNL_FOREX)
.withPluginKey(FOREX_PLUGIN_KEY)
.withUnderlyingMeasures("pnl.SUM")
.withContinuousQueryHandlers("STORED", "FOREX")
.withProperty("leafLevels", "Currency@Currency@Currency")
.withPostProcessor(DISTRIBUTED_FOREX)
.withPluginKey(DistributedPostProcessor.PLUGIN_KEY)
.withUnderlyingMeasures(PNL_FOREX)
.withSingleLevelDimension("Currency")
.asDataCube()