Aggregate Tables

danger

Aggregate Tables are an experimental feature. The API may change in any release.

To use this feature, please add the following JVM argument: -Dactiveviam.feature.aggregate_tables.enabled=true.

Note that only Snowflake and ClickHouse support Aggregate Tables at the moment.

DirectQuery uses the external database to feed some aggregate providers and to answer some queries. This can produce large queries that put a heavy load on the database, and if the same cube is started several times, the same query is repeated each time.

Aggregate tables are a way to cache some aggregated data in the external database to make some queries faster and cheaper.

How does it work

An Aggregate Table is a table in the external database that is fed with pre-aggregated data from another table.

Whenever possible, DirectQuery queries the Aggregate Table instead of the base schema, which results in a cheaper query.

For instance, consider a Sales table containing sales such as this one:

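Id | Date       | Product | Buyer   | Quantity | Price
---|------------|---------|---------|----------|------
1  | 2023-01-01 | P1      | Buyer_A | 100.0    | 15.5
2  | 2023-01-01 | P1      | Buyer_B | 100.0    | 16.5
3  | 2023-01-01 | P2      | Buyer_A | 100.0    | 30
4  | 2023-01-01 | P2      | Buyer_B | 100.0    | 31
5  | 2023-01-02 | P1      | Buyer_A | 100.0    | 15.5
6  | 2023-01-02 | P1      | Buyer_B | 100.0    | 15.5
7  | 2023-01-02 | P1      | Buyer_A | 100.0    | 18
8  | 2023-01-03 | P1      | Buyer_B | 100.0    | 18
9  | 2023-01-03 | P2      | Buyer_A | 100.0    | 35
10 | 2023-01-03 | P2      | Buyer_B | 100.0    | 40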

It is possible to aggregate the sum of the Quantity per Date and Product in the external database into another table, agg_sales:

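date       | product | sum_of_quantity | count_of_rows
-----------|---------|-----------------|--------------
2023-01-01 | P1      | 200.0           | 2
2023-01-01 | P2      | 200.0           | 2
2023-01-02 | P1      | 300.0           | 3
2023-01-03 | P1      | 100.0           | 1
2023-01-03 | P2      | 200.0           | 2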

If this table is added as an Aggregate Table in DirectQuery, it will be used whenever possible instead of the original Sales table.

For instance, when asking for the SUM(Quantity) per Product in the Sales table, DirectQuery will retrieve the SUM(sum_of_quantity) per product from agg_sales. Because agg_sales is smaller and already aggregated, the result is faster to compute and can avoid a large scan of the database, saving both time and compute costs.
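To see why this rewrite gives the same answer, the sketch below re-aggregates the agg_sales rows from the example and compares the totals per product with those computed from the raw Sales rows. It is plain Java with no DirectQuery dependency. The class and method names (ReAggregationSketch, sumFromSales, sumFromAggSales) are illustrative assumptions, not part of the product API.

```java
import java.util.Map;
import java.util.TreeMap;

/** Illustrative only: a plain-Java model of the Sales / agg_sales example. */
final class ReAggregationSketch {

    // Raw Sales rows from the example, reduced to {Date, Product, Quantity}.
    static final Object[][] SALES = {
        {"2023-01-01", "P1", 100.0}, {"2023-01-01", "P1", 100.0},
        {"2023-01-01", "P2", 100.0}, {"2023-01-01", "P2", 100.0},
        {"2023-01-02", "P1", 100.0}, {"2023-01-02", "P1", 100.0},
        {"2023-01-02", "P1", 100.0}, {"2023-01-03", "P1", 100.0},
        {"2023-01-03", "P2", 100.0}, {"2023-01-03", "P2", 100.0},
    };

    // agg_sales rows: {date, product, sum_of_quantity, count_of_rows}.
    static final Object[][] AGG_SALES = {
        {"2023-01-01", "P1", 200.0, 2L},
        {"2023-01-01", "P2", 200.0, 2L},
        {"2023-01-02", "P1", 300.0, 3L},
        {"2023-01-03", "P1", 100.0, 1L},
        {"2023-01-03", "P2", 200.0, 2L},
    };

    /** SUM(Quantity) per Product, scanning every raw row. */
    static Map<String, Double> sumFromSales() {
        Map<String, Double> result = new TreeMap<>();
        for (Object[] row : SALES) {
            result.merge((String) row[1], (Double) row[2], Double::sum);
        }
        return result;
    }

    /** SUM(sum_of_quantity) per product, scanning the smaller pre-aggregated table. */
    static Map<String, Double> sumFromAggSales() {
        Map<String, Double> result = new TreeMap<>();
        for (Object[] row : AGG_SALES) {
            result.merge((String) row[1], (Double) row[2], Double::sum);
        }
        return result;
    }

    public static void main(String[] args) {
        System.out.println(sumFromSales());    // {P1=600.0, P2=400.0}
        System.out.println(sumFromAggSales()); // {P1=600.0, P2=400.0}
    }
}
```

Both methods return the same totals, but the second one reads 5 rows instead of 10. On a real dataset the gap between the two tables is typically much larger.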

Use cases

There are a few classic use cases for using an Aggregate Table:

  • Feeding an aggregate provider faster by pre-aggregating over the provider fields
  • Having an "external database aggregate provider", i.e. being able to answer some queries from the pre-aggregated data in the external database but without having to store additional data in-memory.
  • Feeding some hierarchies faster

Supported aggregations

The following aggregation functions can be used in an Aggregate Table:

  • Sum
  • Min
  • Max
  • Average
  • Sum product
  • Count
  • Long
  • Short
  • Gross
  • Square sum

Custom Aggregations

If you have a custom aggregation function, you must implement an equivalent ISqlReAggregationFunction plugin value and register it in the Registry with the same key as your aggregation function.

Examples

Create a simple Aggregate Table to feed an aggregate provider

Let's create a new table that will pre-aggregate the quantity (in the tutorial sales table) per Product:

CREATE OR REPLACE TABLE
    tutorial.SALES_BY_PRODUCT
(
    PRODUCT String NOT NULL,
    QUANTITY_SUM FLOAT64 NOT NULL,
    CONTRIBUTORS_COUNT INT64 NOT NULL
)

We feed it with the data from the sales table:

INSERT INTO tutorial.SALES_BY_PRODUCT
SELECT
    T0.PRODUCT AS PRODUCT,
    sum(T0.QUANTITY) AS QUANTITY_SUM,
    COUNT(*) AS CONTRIBUTORS_COUNT
FROM
    tutorial.SALES AS T0
GROUP BY
    T0.PRODUCT;

The pre-aggregated data looks like this:

PRODUCT | QUANTITY_SUM | CONTRIBUTORS_COUNT
--------|--------------|-------------------
BED_2   | 6            | 4
HOO_5   | 3            | 3
TAB_0   | 4            | 4
TAB_1   | 4            | 3
TSH_3   | 3            | 3
TSH_4   | 5            | 3

Let's first discover the sales table and the SQL table backing the Aggregate Table:

final Table salesTable = session.discoverTable(TableId.of(PROJECT_ID, "tutorial", "SALES"));
final Table table = session.discoverTable(TableId.of(PROJECT_ID, "tutorial", "SALES_BY_PRODUCT"));

We will map the PRODUCT field of the sales table to the PRODUCT column of the Aggregate Table:

final Map<FieldPath, String> fieldMapping = Map.of(FieldPath.of("PRODUCT"), "PRODUCT");

We will also map the sum of the sales QUANTITY column to the QUANTITY_SUM column, and the row count to the CONTRIBUTORS_COUNT column:

final List<MeasureMapping> measureMapping = List.of(
    MeasureMapping.sum(FieldPath.of("QUANTITY"), "QUANTITY_SUM"),
    MeasureMapping.count("CONTRIBUTORS_COUNT"));

We can now build the AggregateTable:

AggregateTable aggregateTable = AggregateTable.builder()
    .fromTable(table)
    .asAggregationOf(salesTable)
    .withGroupByFieldsMapping(fieldMapping)
    .withMeasuresMappings(measureMapping)
    .build();

And add it to the schema:

final Schema schema = Schema.builder()
    .withExternalTables(List.of(salesTable), List.of())
    .withExternalAggregateTables(List.of(aggregateTable))
    .build();

Let's now define a bitmap aggregate provider on the sum of quantity per Product. The in-memory provider will be fed directly from the Aggregate Table:

final IActivePivotInstanceDescription cubeDescription = StartBuilding.cube("MyCube")
    .withMeasures(
        superMeasureBuilder -> superMeasureBuilder
            .withCalculations(c -> Copper.sum("QUANTITY").as("Quantity.SUM").publish(c)))
    .withDimensions(builder -> builder.withSingleLevelDimensions(List.of("DATE", "PRODUCT", "SHOP")))
    .withAggregateProvider()
    .withPartialProvider()
    .withName("MyProvider")
    .bitmap()
    .includingOnlyHierarchies(HierarchyIdentifier.simple("PRODUCT"))
    .includingOnlyMeasures("Quantity.SUM")
    .build();
final ISelectionDescription selection =
    StartBuilding.selection(schema).fromBaseStore("SALES").withAllReachableFields().build();
final IActivePivotManagerDescription managerDescription = StartBuilding.managerDescription()
    .withName("MyManager")
    .withSchema()
    .withSelection(selection)
    .withCube(cubeDescription)
    .build();
final Application app =
    session.applicationBuilder().schema(schema).managerDescription(managerDescription).build();

Note that it is possible to make sure that a provider will be fed by a given Aggregate Table by adding the provider coordinate to this Aggregate Table. This will simply trigger a check when starting the application to make sure that they are compatible.

aggregateTable = AggregateTable.builder()
    .fromTable(table)
    .asAggregationOf(salesTable)
    .withGroupByFieldsMapping(fieldMapping)
    .withMeasuresMappings(measureMapping)
    .forProviders(List.of(ProviderCoordinate.at("MyCube", "MyProvider")))
    .build();

More complex aggregations such as average

In the previous example, SUM was based on a single input and pre-aggregated into a single column, but there are some more complex cases. For instance:

  • count has no input and produces 1 pre-aggregated column
  • sumProduct takes N input columns and produces 1 pre-aggregated column
  • average takes a single input column but requires 2 pre-aggregated columns in order to be re-aggregated: one column for the sum and one for the count

final AggregateTable aggregateTable = AggregateTable.builder()
    .fromTable(table)
    .asAggregationOf(salesTable)
    .withGroupByFieldsMapping(Map.of(FieldPath.of("PRODUCT"), "PRODUCT"))
    .withMeasuresMappings(
        List.of(
            MeasureMapping.sum(FieldPath.of("QUANTITY"), "QUANTITY_SUM"),
            MeasureMapping.avg(FieldPath.of("QUANTITY"), "QUANTITY_SUM", "CONTRIBUTORS_COUNT"),
            MeasureMapping.count("CONTRIBUTORS_COUNT")))
    .build();
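The reason average needs both a sum column and a count column can be shown with a small plain-Java sketch. Averaging the per-group averages gives the wrong answer when groups have different sizes, whereas dividing the re-aggregated sum by the re-aggregated count is exact. The numbers and the class name AverageReAggregationSketch are made up for illustration. They are not tutorial data and not part of the DirectQuery API.

```java
/** Illustrative only: why an average is re-aggregated from a sum and a count. */
final class AverageReAggregationSketch {

    /** Correct re-aggregation: total of the sums divided by total of the counts. */
    static double averageFromSumAndCount(double[] sums, long[] counts) {
        double totalSum = 0.0;
        long totalCount = 0L;
        for (int i = 0; i < sums.length; i++) {
            totalSum += sums[i];
            totalCount += counts[i];
        }
        return totalSum / totalCount;
    }

    /** Naive re-aggregation: average of the per-group averages (wrong in general). */
    static double averageOfAverages(double[] sums, long[] counts) {
        double total = 0.0;
        for (int i = 0; i < sums.length; i++) {
            total += sums[i] / counts[i];
        }
        return total / sums.length;
    }

    public static void main(String[] args) {
        // Two hypothetical pre-aggregated rows built from quantities {10, 20, 30} and {100}.
        double[] sums = {60.0, 100.0};
        long[] counts = {3L, 1L};
        System.out.println(averageFromSumAndCount(sums, counts)); // 40.0, the true average
        System.out.println(averageOfAverages(sums, counts));      // 60.0, biased toward the small group
    }
}
```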

Count

The count is a measure automatically added to every query sent to the database. When an Aggregate Table is added, a check therefore ensures that it contains a count column; without one, the table would never be used.

Slicing hierarchies

Slicing hierarchies are always expressed in the queries sent to the database. When a query references a field that is not part of the Aggregate Table, the query cannot use the Aggregate Table. It is therefore recommended to include the slicing hierarchies in the Aggregate Table definition.
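The rule above, together with the count requirement, amounts to a coverage check. The sketch below is a simplified plain-Java model of that idea, assuming a query is described only by the set of fields it references. The class and method names are hypothetical, and the actual matching logic in DirectQuery may take more into account (for example, the requested measures).

```java
import java.util.Set;

/** Illustrative only: a simplified model of when an Aggregate Table can answer a query. */
final class AggregateTableCoverageSketch {

    /**
     * Returns true when every field referenced by the query is a group-by field of
     * the Aggregate Table and the table exposes a count column.
     */
    static boolean canAnswer(Set<String> queryFields, Set<String> groupByFields, boolean hasCountColumn) {
        return hasCountColumn && groupByFields.containsAll(queryFields);
    }

    public static void main(String[] args) {
        Set<String> groupBy = Set.of("PRODUCT");

        // A query per PRODUCT is covered by the Aggregate Table.
        System.out.println(canAnswer(Set.of("PRODUCT"), groupBy, true)); // true

        // Suppose DATE is a slicing hierarchy: it appears in every query, so the table is never used.
        System.out.println(canAnswer(Set.of("PRODUCT", "DATE"), groupBy, true)); // false

        // Adding DATE to the group-by fields of the Aggregate Table restores coverage.
        System.out.println(canAnswer(Set.of("PRODUCT", "DATE"), Set.of("PRODUCT", "DATE"), true)); // true
    }
}
```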

Aggregate tables with vectors

Arrays can be aggregated in an aggregate table but they need to have a specific format depending on the type of original array being aggregated:

Original Array     | Aggregate Table Array
-------------------|----------------------
Native             | Native
Multi Column Array | Multi Column Array
Multi Row Array    | Native

See the DirectQuery vector documentation for more information about array types.

Generate SQL for Aggregate Table matching an Aggregate Provider

note

Only ClickHouse supports generating SQL for Aggregate Tables.

Aggregate Tables are a good way to feed Aggregate Providers quickly, because the provider can load the pre-aggregated data instead of running a new aggregation. DirectQuery provides a bootstrapper that generates the SQL to create and feed an Aggregate Table matching an Aggregate Provider.

Let's imagine we have the following Aggregate Provider defined:

.withPartialProvider()
.withName("MyProvider")
.bitmap()
.includingOnlyHierarchies(HierarchyIdentifier.simple("PRODUCT"))
.includingOnlyMeasures("Quantity.SUM")

We can create a bootstrapper for it:

final ProviderCoordinate provider = ProviderCoordinate.at("MyCube", "MyProvider");
final AggregateTableBootstrapper bootstrapper =
    session.getAggregateTableBootstrapper(schema, provider, managerDescription, null, null);

This bootstrapper can provide the SQL to create and feed the Aggregate Table:

System.out.println(bootstrapper.getSqlForCreation());

CREATE TABLE `TUTORIAL`.`Aggregate_Table_MyProvider` (
    `PRODUCT` String,
    `Quantity.SUM` Nullable(Float64),
    `contributors.COUNT` Nullable(Int64),
)
ENGINE = MergeTree()
ORDER BY `PRODUCT`

System.out.println(bootstrapper.getSqlForFeeding());

INSERT INTO `TUTORIAL`.`Aggregate_Table_MyProvider` SELECT
    `T0`.`PRODUCT` AS `PRODUCT_0`,
    sum(COALESCE(`T0`.`QUANTITY`, 0.0)) AS `Quantity.SUM_1`,
    COUNT(*) AS `contributors.COUNT_2`
FROM
    `TUTORIAL`.`SALES` AS `T0`
GROUP BY
    `T0`.`PRODUCT`

You will need to execute this SQL to create and feed the table. Creating and feeding the SQL table only needs to be done once: remember that the purpose of an Aggregate Table is to avoid re-running the same queries at every startup. A good way to do that is to run the queries manually.

Once the table exists, the Aggregate Table can be used in the application, and the Aggregate Provider will use it for feeding:

final AggregateTable aggregateTable = bootstrapper.toAggregateTable();
final Application app = session.applicationBuilder()
    .schema(schema)
    .managerDescription(managerDescription)
    .additionalAggregateTables(List.of(aggregateTable))
    .build();
app.start();