ActivePivot 5.10.4

Distributed Architecture
Distributed Post-Processors

A post-processor is a function that computes a measure at query time based on other measures in the cube. Thus, retrieving a post-processed measure involves retrieving the measures upon which it is based, potentially cascading through a dependency chain until ActivePivot finds primitive measures at the given location.
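The dependency cascade described above can be sketched in plain Java. The `Measure` record and `primitivesNeeded` helper below are illustrative names of ours, not the ActivePivot API:

```java
import java.util.*;

public class DependencyChain {
    // A measure is either primitive (pre-aggregated) or derived from
    // underlying measures by a post-processor.
    record Measure(String name, List<Measure> underlyings) {
        boolean primitive() { return underlyings.isEmpty(); }
    }

    // Walk the dependency chain and collect every primitive measure
    // reachable from the requested one: this is the set that must
    // ultimately be retrieved at the given location.
    static Set<String> primitivesNeeded(Measure m) {
        Set<String> out = new LinkedHashSet<>();
        if (m.primitive()) { out.add(m.name()); return out; }
        for (Measure u : m.underlyings()) out.addAll(primitivesNeeded(u));
        return out;
    }

    public static void main(String[] args) {
        Measure in = new Measure("incoming_flux", List.of());
        Measure out = new Measure("leaving_flux", List.of());
        Measure stock = new Measure("CURRENT_STOCK", List.of(in, out));
        System.out.println(primitivesNeeded(stock)); // prints [incoming_flux, leaving_flux]
    }
}
```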

Prefetchers are responsible for computing underlying dependencies, and are declared within each post-processor.

A major feature of a distributed ActivePivot cluster is the fact that post-processors can have distributed prefetchers. A post-processor can ask for any underlying measure, be it pre-aggregated or post-processed, to be evaluated on a given set of locations spreading across all the remote ActivePivot cubes.

This allows the user to pre-compute some aggregates on the local instances before sending them to the query cube to finish the calculation. This way, the computation can benefit from each local instance's aggregates cache. Each data cube stores the results of the most popular queries, so that the pre-computed results are already available the next time similar queries hit the node.

For efficient query planning, post-processors declare a list of prefetchers. This often allows ActivePivot to retrieve all underlying measures in a single pass before evaluating the post-processors. In a distributed environment, this is of paramount importance, as a single pass also means a single round trip on the network. Do not ignore the prefetchers' warnings while working with a distributed cluster, as bad prefetching or lack of prefetching can lead to incorrect results.

Post-processors can be defined on both query cubes and data cubes. In what follows, we explain the different post-processor settings when using distribution.

Post-Processor Definition in the Query Cube

Declaring post-processors directly in the query cube allows performing advanced calculations on measures coming from data cubes of different applications that do not share the same topology.

In the following example, we consider one query cube linked to two data cubes from two different applications of a supply chain use case.

One data cube contains all the inbound flux, the other data cube contains all the outbound flux. To calculate the current stock for a product in a warehouse, the query cube needs to get all the inbound flux entries for that specific product/warehouse location, and needs to subtract all the outbound flux entries.

StartBuilding.cube("QueryCube")
        .withUpdateTimestamp()
            .withinFolder("Native_measures")
            .withAlias("Distributed_Timestamp")
            .withFormatter("DATE[HH:mm:ss]")
        .withPostProcessor("CURRENT_STOCK")
            // CURRENT_STOCK = incoming - leaving
            .withPluginKey("DiffPostProcessor")
            .withUnderlyingMeasures("incoming_flux", "leaving_flux")
        .asQueryCube()
        .withClusterDefinition()
            .withClusterId("SupplyChainCubeCluster")
            .withMessengerDefinition()
            .withProtocolPath("jgroups-protocols/protocol-tcp.xml")
            .end()
            .withApplication("app-incoming-flux")
                .withoutDistributingFields()
            .withApplication("app-leaving-flux")
                .withoutDistributingFields()
        .end()
        .build();

Post-Processor Definition in the Data Cube

In order to leverage the remote data cube instances, ActivePivot relies on DistributedPostProcessor, whose computations are distributed across the data cubes. The partial results are then reduced within the query cube, using a given aggregation function (the default is SUM). There are three ways to make use of such post-processors, explained below:

Implicit Distribution of a Post-Processor

A post-processor is considered to be implicitly distributed if it implements IPartitionedPostProcessor and if it exposes at least one of the distributing fields as a partitioning level.

In such cases, the post-processor and all its underlying measures are executed on each involved cube locally, and the intermediate results are aggregated into the final result using the IPartitionedPostProcessor#reduce method.
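As a rough sketch of this partition-then-reduce flow, in plain Java with illustrative names of ours (not the ActivePivot API):

```java
import java.util.*;
import java.util.function.DoubleBinaryOperator;

public class PartitionedReduce {
    // Local evaluation on one partition (one data cube), e.g. the sum of
    // that node's pnl rows.
    static double evaluateLocally(double[] partition) {
        return Arrays.stream(partition).sum();
    }

    // The reduce step run in the query cube, mirroring the role of
    // IPartitionedPostProcessor#reduce: combine one partial result per node.
    static double reduce(List<Double> partials, DoubleBinaryOperator op) {
        return partials.stream().reduce(0.0, (a, b) -> op.applyAsDouble(a, b));
    }

    public static void main(String[] args) {
        double[][] nodes = { {1.0, 2.0}, {3.0}, {4.0, 5.0} }; // three data cubes
        List<Double> partials = new ArrayList<>();
        for (double[] node : nodes) partials.add(evaluateLocally(node));
        System.out.println(reduce(partials, Double::sum)); // prints 15.0
    }
}
```

Only one partial result per node crosses the network, instead of every underlying row or measure.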

Auto-Distributed Post-Processors

In a distributed environment, the data cube has two additional properties:

  • executePostProcessorsInDataCube can be set within the DataClusterDefinition builder. If set to true, all post-processors in the given data cube are evaluated locally. The default value is true.
  • executeInDataCube can be set as a single post-processor property, to allow for more granularity.

For a given post-processor, the value of executeInDataCube takes precedence over the value of executePostProcessorsInDataCube when both properties are specified with different values (see the example below).

StartBuilding.cube("DataCube")
        .withAggregatedMeasure().sum("pnl")
        .withPostProcessor(FIRST_PP)
            .withPluginKey(PLUGIN_KEY_1)
            .withUnderlyingMeasures("pnl.SUM")
        .withPostProcessor(SECOND_PP)
            .withPluginKey(PLUGIN_KEY_2)
            .withUnderlyingMeasures(FIRST_PP) // chained on the first post-processor
            .withProperty(IDistributedPostProcessor.EXECUTE_IN_DATA_CUBE_PROPERTY, "true")
        .withSingleLevelDimension("Currency")
        .asDataCube()
        .withClusterDefinition()
            .withClusterId("distributed_cluster")
            .withMessengerDefinition()
            .withProtocolPath("jgroups-protocols/protocol-tcp.xml")
            .end()
            .withApplicationId("app")
            .withAllHierarchies()
            .withAllMeasures()
            .withProperty(IClusterDefinition.EXECUTE_POST_PROCESSORS_IN_DATA_CUBE_PROPERTY, "false")
        .end()
        .build();
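The precedence rule between the two properties reduces to a one-liner. This plain-Java sketch (names are ours, not ActivePivot's) makes it explicit:

```java
public class PropertyPrecedence {
    // The per-post-processor executeInDataCube flag, when explicitly set,
    // overrides the cluster-wide executePostProcessorsInDataCube value.
    static boolean executeInDataCube(Boolean perPostProcessor, boolean clusterWide) {
        return perPostProcessor != null ? perPostProcessor : clusterWide;
    }

    public static void main(String[] args) {
        System.out.println(executeInDataCube(null, true));  // prints true (cluster default applies)
        System.out.println(executeInDataCube(false, true)); // prints false (overridden per post-processor)
        System.out.println(executeInDataCube(true, false)); // prints true (overridden per post-processor)
    }
}
```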

Having these properties set to true is equivalent to wrapping the corresponding post-processors in DistributedPostProcessors. We therefore recommend setting both properties to true whenever possible, in order to reduce network traffic and speed up queries. This induces more work on the local instances and less data sent to the query cubes: data cubes send their partial results instead of the underlying measures the post-processors rely on.

Specific cases where setting these properties to true (their default value) can be useful:

  • A post-processor that needs to be specifically initialized in the data cube.
  • A post-processor whose code cannot be shared with the query cube.
  • A post-processor that requires datastore querying for computation.
  • A post-processor that is computed in a MapReduce style, using the SUM aggregation function to sum up the values and get the final result.

Specific cases where these properties should be set to false:

  • A post-processor that cannot be computed in a MapReduce style without inducing erroneous results. In this case, advanced ActivePivot users can implement IDistributedPostProcessor to force local execution and decide how to finish the computation. This incurs data transfers between the data nodes and the query cube to perform the final aggregation.
  • The user is already using the wrapping technique demonstrated above.

Note that a DistributedPostProcessor defined on the data cube will see its definition automatically copied to the distributed cube.

Chained Distributed Post-Processors

A distributed post-processor at the top of a chain of post-processors allows the entire chain to be processed locally on each remote cube, and the results will be aggregated using the distributed post-processor's aggregation function as shown in the ForexPostProcessor example.

Therefore, a good practice is to use distributed post-processors for the most computation intensive measures, especially on the top of the chain.

Be careful: the above does not mean that the chained post-processors are themselves distributed.

The following example illustrates the concept of chained distributed post-processors.

Example: The ForexPostProcessor

Forex stands for Foreign Exchange Market, the currency market. PnL stands for Profits and Losses, one of the financial statements companies issue to report revenues and expenses over a set period of time.

Let's imagine a PnL application distributed along the field Currency, with one query cube and three data cubes containing, respectively:

  • USD
  • EUR and JPY
  • GBP

To compute its post-processed measure, the ForexPostProcessor requests the value of the pnl.SUM measure for each currency on the given location. It then converts the values from each currency into a reference currency, for example, USD, sums them up and returns the result.

If a user requests the top level location with the Grand Totals, we could ask each remote instance for its total pnl.SUM per currency, bring all this data back to the query cube, apply the currency conversion, and sum the converted totals before returning the final result to the user.

This is however quite inefficient, as the currency conversion computation could be done in the remote instances. Of course, this becomes more interesting with more expensive computations.
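To make the gain concrete, here is a plain-Java sketch of the distributed strategy, with invented exchange rates and figures (this is illustrative code of ours, not ActivePivot code): each node converts its own per-currency totals and ships a single number, and the query cube only sums the partials.

```java
import java.util.*;

public class ForexDistribution {
    // Invented conversion rates to the reference currency (USD).
    static final Map<String, Double> USD_RATE =
            Map.of("USD", 1.0, "EUR", 1.1, "JPY", 0.007, "GBP", 1.3);

    // Executed in each data cube: convert the local per-currency totals to
    // USD and return a single partial result per node.
    static double convertLocally(Map<String, Double> pnlByCurrency) {
        return pnlByCurrency.entrySet().stream()
                .mapToDouble(e -> e.getValue() * USD_RATE.get(e.getKey()))
                .sum();
    }

    public static void main(String[] args) {
        // Three data cubes, distributed along the Currency field.
        List<Map<String, Double>> nodes = List.of(
                Map.of("USD", 100.0),
                Map.of("EUR", 200.0, "JPY", 50_000.0),
                Map.of("GBP", 80.0));

        // Query cube: SUM reduction over one partial result per node.
        double total = nodes.stream()
                .mapToDouble(ForexDistribution::convertLocally).sum();
        System.out.println(total); // ≈ 774 USD under the invented rates
    }
}
```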

Setting the executeInDataCube property to true is equivalent to the following process, which takes advantage of having several nodes:

Create an intermediate DistributedPostProcessor that requests the value of the ForexPostProcessor for the requested location on each remote cube, and use the DistributedPostProcessor's aggregation function (SUM) to sum up the values and compute the final result. This way:

  • most of the computation is spread on multiple servers/more CPUs,
  • less data is traveling through the network.

The following example is no longer the recommended way to do this. The properties detailed above automatically provide this distribution without requiring additional work from the developer. However, the following code helps to understand what has been done to simplify the user's work.

StartBuilding.cube("DataCube")
        .withAggregatedMeasure().sum("pnl")
        .withPostProcessor(PNL_FOREX)
            .withPluginKey(FOREX_PLUGIN_KEY)
            .withUnderlyingMeasures("pnl.SUM")
            .withContinuousQueryHandlers("STORED", "FOREX")
            .withProperty("leafLevels", "Currency@Currency@Currency")
        .withPostProcessor(DISTRIBUTED_FOREX)
            .withPluginKey(DistributedPostProcessor.PLUGIN_KEY)
            .withUnderlyingMeasures(PNL_FOREX)
        .withSingleLevelDimension("Currency")
        .asDataCube()
        // cluster definition and .build() omitted, as in the previous examples

Copyright © 2021 ActiveViam