Aggregate Tables
Aggregate Tables are an experimental feature; the API may change in any release.
To use this feature, add the following JVM argument: -Dactiveviam.feature.aggregate_tables.enabled=true
DirectQuery uses the external database to feed some aggregate providers and to answer some queries. This can produce large queries that put a heavy load on the database, and if the same cube is started several times, the same query is repeated each time.
Aggregate Tables cache some aggregated data in the external database to make these queries faster and cheaper.
How does it work
An Aggregate Table is a table in the external database that is fed with pre-aggregated data from the base tables.
Whenever possible, DirectQuery queries the Aggregate Table instead of the base schema, which results in a more efficient query.
For instance, consider a Sales table such as this one:
Id | Date | Product | Buyer | Quantity | Price |
---|---|---|---|---|---|
1 | 2023-01-01 | P1 | Buyer_A | 100.0 | 15.5 |
2 | 2023-01-01 | P1 | Buyer_B | 100.0 | 16.5 |
3 | 2023-01-01 | P2 | Buyer_A | 100.0 | 30 |
4 | 2023-01-01 | P2 | Buyer_B | 100.0 | 31 |
5 | 2023-01-02 | P1 | Buyer_A | 100.0 | 15.5 |
6 | 2023-01-02 | P1 | Buyer_B | 100.0 | 15.5 |
7 | 2023-01-02 | P1 | Buyer_A | 100.0 | 18 |
8 | 2023-01-03 | P1 | Buyer_B | 100.0 | 18 |
9 | 2023-01-03 | P2 | Buyer_A | 100.0 | 35 |
10 | 2023-01-03 | P2 | Buyer_B | 100.0 | 40 |
It is possible to aggregate the sum of Quantity per Date and Product in the external database into another table, agg_sales:
date | product | sum_of_quantity | count_of_rows |
---|---|---|---|
2023-01-01 | P1 | 200.0 | 2 |
2023-01-01 | P2 | 200.0 | 2 |
2023-01-02 | P1 | 300.0 | 3 |
2023-01-03 | P1 | 100.0 | 1 |
2023-01-03 | P2 | 200.0 | 2 |
If this table is added as an Aggregate Table in DirectQuery, it is used instead of the original Sales table whenever possible.
For instance, when asked for the SUM(Quantity) per Product in the Sales table, DirectQuery retrieves the SUM(sum_of_quantity) per product in agg_sales.
Because agg_sales is smaller and already aggregated, the result is faster to compute and can avoid a large scan of the database, saving both time and compute costs.
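The equivalence between the two queries can be checked with a small sketch. It uses plain Java collections in place of the database and does not call any DirectQuery API; the class and method names (AggSalesSketch, preAggregate, and so on) are made up for this illustration. It rebuilds agg_sales from the Sales rows above and compares the sum per product computed from both tables.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Illustration only: plain Java standing in for the SQL that the database would run.
final class AggSalesSketch {

    // One row of the Sales table (only the columns needed here).
    record Sale(String date, String product, double quantity) {}

    // One row of agg_sales.
    record AggRow(String date, String product, double sumOfQuantity, long countOfRows) {}

    // The ten rows of the Sales table shown above.
    static List<Sale> sales() {
        return List.of(
            new Sale("2023-01-01", "P1", 100.0),
            new Sale("2023-01-01", "P1", 100.0),
            new Sale("2023-01-01", "P2", 100.0),
            new Sale("2023-01-01", "P2", 100.0),
            new Sale("2023-01-02", "P1", 100.0),
            new Sale("2023-01-02", "P1", 100.0),
            new Sale("2023-01-02", "P1", 100.0),
            new Sale("2023-01-03", "P1", 100.0),
            new Sale("2023-01-03", "P2", 100.0),
            new Sale("2023-01-03", "P2", 100.0));
    }

    // Like: SELECT date, product, SUM(quantity), COUNT(*) FROM Sales GROUP BY date, product
    static List<AggRow> preAggregate(List<Sale> sales) {
        Map<String, AggRow> groups = new LinkedHashMap<>();
        for (Sale s : sales) {
            groups.merge(
                s.date() + "|" + s.product(),
                new AggRow(s.date(), s.product(), s.quantity(), 1L),
                (a, b) -> new AggRow(
                    a.date(),
                    a.product(),
                    a.sumOfQuantity() + b.sumOfQuantity(),
                    a.countOfRows() + b.countOfRows()));
        }
        return new ArrayList<>(groups.values());
    }

    // Like: SELECT product, SUM(quantity) FROM Sales GROUP BY product
    static Map<String, Double> sumPerProductFromSales(List<Sale> sales) {
        Map<String, Double> result = new TreeMap<>();
        for (Sale s : sales) {
            result.merge(s.product(), s.quantity(), Double::sum);
        }
        return result;
    }

    // Like: SELECT product, SUM(sum_of_quantity) FROM agg_sales GROUP BY product
    static Map<String, Double> sumPerProductFromAggregate(List<AggRow> rows) {
        Map<String, Double> result = new TreeMap<>();
        for (AggRow r : rows) {
            result.merge(r.product(), r.sumOfQuantity(), Double::sum);
        }
        return result;
    }

    public static void main(String[] args) {
        List<AggRow> aggSales = preAggregate(sales());
        System.out.println(aggSales.size() + " pre-aggregated rows instead of " + sales().size());
        System.out.println("From Sales:     " + sumPerProductFromSales(sales()));
        System.out.println("From agg_sales: " + sumPerProductFromAggregate(aggSales));
    }
}
```

Both maps hold the same totals, while the second one is computed from 5 rows instead of 10. On a real table the reduction depends on how many distinct (Date, Product) pairs exist.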
Use cases
There are a few classic use cases for an Aggregate Table:
- Feeding an aggregate provider faster by pre-aggregating over the provider fields
- Having an "external database aggregate provider", i.e. being able to answer some queries from the pre-aggregated data in the external database but without having to store additional data in-memory.
- Feeding some hierarchies faster
Supported aggregations
The following aggregation functions can be used in an Aggregate Table:
- Sum
- Min
- Max
- Average
- Sum product
- Count
- Long
- Short
- Gross
- Square sum
Custom Aggregations
If you have a custom aggregation function, you must implement an equivalent ISqlReAggregationFunction plugin value and register it in the Registry with the same key as your aggregation function.
Examples
Create a simple Aggregate Table to feed an aggregate provider
Let’s create a new table that pre-aggregates the quantity (in the tutorial sales table) per Product:
- BigQuery
- ClickHouse
- Databricks
- MsSql
- Redshift
- Snowflake
- Synapse
CREATE OR REPLACE TABLE `tutorial`.SALES_BY_PRODUCT (
PRODUCT STRING NOT NULL,
QUANTITY_SUM FLOAT64 NOT NULL,
CONTRIBUTORS_COUNT INT64 NOT NULL
);
We feed it with the data from the other tables:
INSERT INTO `tutorial`.SALES_BY_PRODUCT
SELECT
T0.PRODUCT AS PRODUCT,
sum(T0.QUANTITY) AS QUANTITY_SUM,
COUNT(*) AS CONTRIBUTORS_COUNT
FROM
`tutorial`.SALES AS T0
GROUP BY
T0.PRODUCT;
create table
TUTORIAL.sales_by_product
(
`product` String NOT NULL,
`quantity_sum` Float64 NOT NULL,
`contributors_count` UInt64 NOT NULL
)
ENGINE = MergeTree()
ORDER BY (product);
We feed it with the data from the other tables:
INSERT INTO TUTORIAL.sales_by_product
SELECT
T0.PRODUCT AS `product`,
sum(T0.QUANTITY) AS `quantity_sum`,
COUNT(*) AS `contributors_count`
FROM
TUTORIAL.SALES AS T0
GROUP BY
T0.PRODUCT;
CREATE OR REPLACE TABLE
tutorial.SALES_BY_PRODUCT
(
PRODUCT String NOT NULL,
QUANTITY_SUM FLOAT NOT NULL,
CONTRIBUTORS_COUNT INTEGER NOT NULL
)
We feed it with the data from the other tables:
INSERT INTO tutorial.SALES_BY_PRODUCT
SELECT
T0.PRODUCT AS PRODUCT,
sum(T0.QUANTITY) AS QUANTITY_SUM,
COUNT(*) AS CONTRIBUTORS_COUNT
FROM
tutorial.SALES AS T0
GROUP BY
T0.PRODUCT;
if not exists (select * from sys.tables where name = 'SALES_BY_PRODUCT')
CREATE TABLE TUTORIAL.SALES_BY_PRODUCT (
[PRODUCT] VARCHAR(50) NOT NULL,
[QUANTITY_SUM] FLOAT NOT NULL,
[CONTRIBUTORS_COUNT] BIGINT NOT NULL
);
We feed it with the data from the other tables:
INSERT INTO TUTORIAL.SALES_BY_PRODUCT
SELECT
T0.PRODUCT AS PRODUCT,
sum(T0.QUANTITY) AS QUANTITY_SUM,
COUNT(*) AS CONTRIBUTORS_COUNT
FROM
TUTORIAL.SALES AS T0
GROUP BY
T0.PRODUCT;
CREATE TABLE IF NOT EXISTS
tutorial."SALES_BY_PRODUCT"
(
"PRODUCT" VARCHAR NOT NULL,
"QUANTITY_SUM" FLOAT NOT NULL,
"CONTRIBUTORS_COUNT" BIGINT NOT NULL
);
We feed it with the data from the other tables:
INSERT INTO tutorial."SALES_BY_PRODUCT"
SELECT
T0."PRODUCT" AS "PRODUCT",
sum(T0."QUANTITY") AS "QUANTITY_SUM",
COUNT(*) AS "CONTRIBUTORS_COUNT"
FROM
tutorial."SALES" AS T0
GROUP BY
T0."PRODUCT";
CREATE OR REPLACE TABLE
TUTORIAL.SALES_BY_PRODUCT
(
PRODUCT String NOT NULL,
QUANTITY_SUM FLOAT NOT NULL,
CONTRIBUTORS_COUNT INTEGER NOT NULL
)
We feed it with the data from the other tables:
INSERT INTO TUTORIAL.SALES_BY_PRODUCT
SELECT
T0.PRODUCT AS PRODUCT,
sum(T0.QUANTITY) AS QUANTITY_SUM,
COUNT(*) AS CONTRIBUTORS_COUNT
FROM
TUTORIAL.SALES AS T0
GROUP BY
T0.PRODUCT;
CREATE VIEW TUTORIAL.SALES_BY_PRODUCT_VIEW AS
SELECT
T0.PRODUCT AS PRODUCT,
sum(T0.QUANTITY) AS QUANTITY_SUM,
COUNT(*) AS CONTRIBUTORS_COUNT
FROM
TUTORIAL.SALES AS T0
GROUP BY
T0.PRODUCT;
To simplify this example on Synapse, we create a view containing the aggregated data. However, a view is recomputed for each query, so it is not a wise choice for an Aggregate Table. We recommend using a materialized table instead.
The pre-aggregated data looks like this:
PRODUCT | QUANTITY_SUM | CONTRIBUTORS_COUNT |
---|---|---|
BED_2 | 6 | 4 |
HOO_5 | 3 | 3 |
TAB_0 | 4 | 4 |
TAB_1 | 4 | 3 |
TSH_3 | 3 | 3 |
TSH_4 | 5 | 3 |
Let's first discover the SQL table corresponding to the Aggregate Table:
- BigQuery
- ClickHouse
- Databricks
- MsSql
- Redshift
- Snowflake
- Synapse
final Table salesTable = session.discoverTable(TableId.of(PROJECT_ID, "tutorial", "SALES"));
final Table table = session.discoverTable(TableId.of(PROJECT_ID, "tutorial", "SALES_BY_PRODUCT"));
final Table salesTable = session.discoverTable("TUTORIAL", "SALES");
final Table table = session.discoverTable("TUTORIAL", "sales_by_product");
final Table salesTable =
session.discoverTable(new SqlTableId(TEST_CATALOG_NAME, TUTORIAL_SCHEMA_NAME, "SALES"));
final Table table =
session.discoverTable(new SqlTableId(TEST_CATALOG_NAME, TUTORIAL_SCHEMA_NAME, "SALES_BY_PRODUCT"));
final Table salesTable = session.discoverTable(new SqlTableId(DATABASE_NAME, SCHEMA_NAME, "SALES"));
final Table table = session.discoverTable(new SqlTableId(DATABASE_NAME, SCHEMA_NAME, "SALES_BY_PRODUCT"));
final Table salesTable =
session.discoverTable(new SqlTableId(TEST_DATABASE_NAME, TUTORIAL_SCHEMA_NAME, "SALES"));
final Table table = session
.discoverTable(new SqlTableId(TEST_DATABASE_NAME, TUTORIAL_SCHEMA_NAME, "SALES_BY_PRODUCT"));
final Table salesTable =
session.discoverTable(new SqlTableId(TEST_DATABASE_NAME, TUTORIAL_SCHEMA_NAME, "SALES"));
final Table table = session
.discoverTable(new SqlTableId(TEST_DATABASE_NAME, TUTORIAL_SCHEMA_NAME, "SALES_BY_PRODUCT"));
final Table salesTable = session.discoverTable(new SqlTableId(TEST_DATABASE_NAME, "TUTORIAL", "SALES"));
final Table table =
session.discoverTable(new SqlTableId(TEST_DATABASE_NAME, "TUTORIAL", "SALES_BY_PRODUCT_VIEW"));
We will map the PRODUCT field of the sales table to the product field of the Aggregate Table:
- BigQuery
- ClickHouse
- Databricks
- MsSql
- Redshift
- Snowflake
- Synapse
final Map<FieldPath, String> fieldMapping = Map.of(FieldPath.of("PRODUCT"), "PRODUCT");
final Map<FieldPath, String> fieldMapping = Map.of(FieldPath.of("PRODUCT"), "product");
final Map<FieldPath, String> fieldMapping = Map.of(FieldPath.of("PRODUCT"), "PRODUCT");
final Map<FieldPath, String> fieldMapping = Map.of(FieldPath.of("PRODUCT"), "PRODUCT");
final Map<FieldPath, String> fieldMapping = Map.of(FieldPath.of("PRODUCT"), "PRODUCT");
final Map<FieldPath, String> fieldMapping = Map.of(FieldPath.of("PRODUCT"), "PRODUCT");
final Map<FieldPath, String> fieldMapping = Map.of(FieldPath.of("PRODUCT"), "PRODUCT");
We will also map the sum of the sales quantity column to a column called quantity_sum, and the count to the contributors_count column:
- BigQuery
- ClickHouse
- Databricks
- MsSql
- Redshift
- Snowflake
- Synapse
final List<MeasureMapping> measureMapping = List.of(
MeasureMapping.sum(FieldPath.of("QUANTITY"), "QUANTITY_SUM"),
MeasureMapping.count("CONTRIBUTORS_COUNT"));
final List<MeasureMapping> measureMapping = List.of(
MeasureMapping.sum(FieldPath.of("QUANTITY"), "quantity_sum"),
MeasureMapping.count("contributors_count"));
final List<MeasureMapping> measureMapping = List.of(
MeasureMapping.sum(FieldPath.of("QUANTITY"), "QUANTITY_SUM"),
MeasureMapping.count("CONTRIBUTORS_COUNT"));
final List<MeasureMapping> measureMapping = List.of(
MeasureMapping.sum(FieldPath.of("QUANTITY"), "QUANTITY_SUM"),
MeasureMapping.count("CONTRIBUTORS_COUNT"));
final List<MeasureMapping> measureMapping = List.of(
MeasureMapping.sum(FieldPath.of("QUANTITY"), "QUANTITY_SUM"),
MeasureMapping.count("CONTRIBUTORS_COUNT"));
final List<MeasureMapping> measureMapping = List.of(
MeasureMapping.sum(FieldPath.of("QUANTITY"), "QUANTITY_SUM"),
MeasureMapping.count("CONTRIBUTORS_COUNT"));
final List<MeasureMapping> measureMapping = List.of(
MeasureMapping.sum(FieldPath.of("QUANTITY"), "QUANTITY_SUM"),
MeasureMapping.count("CONTRIBUTORS_COUNT"));
We can now build the AggregateTable (the code is the same for all databases):
AggregateTable aggregateTable = AggregateTable.builder()
    .fromTable(table)
    .asAggregationOf(salesTable)
    .withGroupByFieldsMapping(fieldMapping)
    .withMeasuresMappings(measureMapping)
    .build();
And add it to the schema (the code is the same for all databases):
final Schema schema = Schema.builder()
    .withExternalTables(List.of(salesTable), List.of())
    .withExternalAggregateTables(List.of(aggregateTable))
    .build();
Let's now define a bitmap aggregate provider on the sum of quantity per Product. The in-memory provider will be fed directly from the Aggregate Table (the code is the same for all databases):
final IActivePivotInstanceDescription cubeDescription = StartBuilding.cube("MyCube")
    .withMeasures(
        superMeasureBuilder -> superMeasureBuilder
            .withCalculations(c -> Copper.sum("QUANTITY").as("Quantity.SUM").publish(c)))
    .withDimensions(builder -> builder.withSingleLevelDimensions(List.of("DATE", "PRODUCT", "SHOP")))
    .withAggregateProvider()
    .withPartialProvider()
    .withName("MyProvider")
    .bitmap()
    .includingOnlyHierarchies(HierarchyIdentifier.simple("PRODUCT"))
    .includingOnlyMeasures("Quantity.SUM")
    .build();
final ISelectionDescription selection =
    StartBuilding.selection(schema).fromBaseStore("SALES").withAllReachableFields().build();
final IActivePivotManagerDescription managerDescription = StartBuilding.managerDescription()
    .withName("MyManager")
    .withSchema()
    .withSelection(selection)
    .withCube(cubeDescription)
    .build();
final Application app =
    session.applicationBuilder().schema(schema).managerDescription(managerDescription).build();
Note that it is possible to make sure that a provider will be fed by a given Aggregate Table by adding the provider coordinate to this Aggregate Table. This simply triggers a check when the application starts to make sure that they are compatible. The code is the same for all databases:
aggregateTable = AggregateTable.builder()
    .fromTable(table)
    .asAggregationOf(salesTable)
    .withGroupByFieldsMapping(fieldMapping)
    .withMeasuresMappings(measureMapping)
    .forProviders(List.of(ProviderCoordinate.at("MyCube", "MyProvider")))
    .build();
More complex aggregations such as average
In the previous example, SUM was based on a single input and pre-aggregated into a single column, but there are some more complex cases. For instance:
- count has no input and produces 1 pre-aggregated column
- sumProduct can take N input columns and produces 1 pre-aggregated column
- average takes a single input column but requires 2 pre-aggregated columns in order to be re-aggregated: one column for the sum and one for the count
- BigQuery
- ClickHouse
- Databricks
- MsSql
- Redshift
- Snowflake
- Synapse
final AggregateTable aggregateTable = AggregateTable.builder()
.fromTable(table)
.asAggregationOf(salesTable)
.withGroupByFieldsMapping(Map.of(FieldPath.of("PRODUCT"), "PRODUCT"))
.withMeasuresMappings(
List.of(
MeasureMapping.sum(FieldPath.of("QUANTITY"), "QUANTITY_SUM"),
MeasureMapping.avg(FieldPath.of("QUANTITY"), "QUANTITY_SUM", "CONTRIBUTORS_COUNT"),
MeasureMapping.count("CONTRIBUTORS_COUNT")))
.build();
final AggregateTable aggregateTable = AggregateTable.builder()
.fromTable(table)
.asAggregationOf(salesTable)
.withGroupByFieldsMapping(Map.of(FieldPath.of("PRODUCT"), "product"))
.withMeasuresMappings(
List.of(
MeasureMapping.sum(FieldPath.of("QUANTITY"), "quantity_sum"),
MeasureMapping.avg(FieldPath.of("QUANTITY"), "quantity_sum", "contributors_count"),
MeasureMapping.count("contributors_count")))
.build();
final AggregateTable aggregateTable = AggregateTable.builder()
.fromTable(table)
.asAggregationOf(salesTable)
.withGroupByFieldsMapping(Map.of(FieldPath.of("PRODUCT"), "PRODUCT"))
.withMeasuresMappings(
List.of(
MeasureMapping.sum(FieldPath.of("QUANTITY"), "QUANTITY_SUM"),
MeasureMapping.avg(FieldPath.of("QUANTITY"), "QUANTITY_SUM", "CONTRIBUTORS_COUNT"),
MeasureMapping.count("CONTRIBUTORS_COUNT")))
.build();
final AggregateTable aggregateTable = AggregateTable.builder()
.fromTable(table)
.asAggregationOf(salesTable)
.withGroupByFieldsMapping(Map.of(FieldPath.of("PRODUCT"), "PRODUCT"))
.withMeasuresMappings(
List.of(
MeasureMapping.sum(FieldPath.of("QUANTITY"), "QUANTITY_SUM"),
MeasureMapping.avg(FieldPath.of("QUANTITY"), "QUANTITY_SUM", "CONTRIBUTORS_COUNT"),
MeasureMapping.count("CONTRIBUTORS_COUNT")))
.build();
final AggregateTable aggregateTable = AggregateTable.builder()
.fromTable(table)
.asAggregationOf(salesTable)
.withGroupByFieldsMapping(Map.of(FieldPath.of("PRODUCT"), "PRODUCT"))
.withMeasuresMappings(
List.of(
MeasureMapping.sum(FieldPath.of("QUANTITY"), "QUANTITY_SUM"),
MeasureMapping.avg(FieldPath.of("QUANTITY"), "QUANTITY_SUM", "CONTRIBUTORS_COUNT"),
MeasureMapping.count("CONTRIBUTORS_COUNT")))
.build();
final AggregateTable aggregateTable = AggregateTable.builder()
.fromTable(table)
.asAggregationOf(salesTable)
.withGroupByFieldsMapping(Map.of(FieldPath.of("PRODUCT"), "PRODUCT"))
.withMeasuresMappings(
List.of(
MeasureMapping.sum(FieldPath.of("QUANTITY"), "QUANTITY_SUM"),
MeasureMapping.avg(FieldPath.of("QUANTITY"), "QUANTITY_SUM", "CONTRIBUTORS_COUNT"),
MeasureMapping.count("CONTRIBUTORS_COUNT")))
.build();
final AggregateTable aggregateTable = AggregateTable.builder()
.fromTable(table)
.asAggregationOf(salesTable)
.withGroupByFieldsMapping(Map.of(FieldPath.of("PRODUCT"), "PRODUCT"))
.withMeasuresMappings(
List.of(
MeasureMapping.sum(FieldPath.of("QUANTITY"), "QUANTITY_SUM"),
MeasureMapping.avg(FieldPath.of("QUANTITY"), "QUANTITY_SUM", "CONTRIBUTORS_COUNT"),
MeasureMapping.count("CONTRIBUTORS_COUNT")))
.build();
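To see why average needs both a sum column and a count column, here is a minimal sketch in plain Java. It does not use the DirectQuery API; the AverageReAggregationSketch class and its methods are hypothetical names chosen for this illustration. It re-aggregates two rows of the pre-aggregated table shown earlier (BED_2 and HOO_5) in two ways: dividing the total sum by the total count, and naively averaging the per-row averages.

```java
import java.util.List;

// Illustration only: why an average is re-aggregated from a sum column and a count column.
final class AverageReAggregationSketch {

    // One pre-aggregated row: the group key, the sum of the quantity and the number of source rows.
    record AggRow(String product, double quantitySum, long contributorsCount) {}

    // Correct re-aggregation: SUM(quantity_sum) / SUM(contributors_count).
    static double average(List<AggRow> rows) {
        double sum = 0.0;
        long count = 0L;
        for (AggRow row : rows) {
            sum += row.quantitySum();
            count += row.contributorsCount();
        }
        return sum / count;
    }

    // Naive re-aggregation: the mean of the per-group averages, which ignores the group sizes.
    static double averageOfAverages(List<AggRow> rows) {
        return rows.stream()
            .mapToDouble(row -> row.quantitySum() / row.contributorsCount())
            .average()
            .orElse(Double.NaN);
    }

    public static void main(String[] args) {
        List<AggRow> rows = List.of(new AggRow("BED_2", 6.0, 4L), new AggRow("HOO_5", 3.0, 3L));
        System.out.println("sum / count:         " + average(rows));
        System.out.println("average of averages: " + averageOfAverages(rows));
    }
}
```

The first method gives 9 / 7, the true average over the 7 underlying rows. The second gives (1.5 + 1) / 2 = 1.25, which is wrong because the two groups do not have the same number of contributors. A single pre-aggregated average column would therefore not be enough to re-aggregate.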
Count
The count is a measure that is automatically added to any query sent to the database. When an Aggregate Table is added, a check is therefore performed to ensure that it contains a column with a count (otherwise it would never be used).
Slicing hierarchies
Slicing hierarchies are always expressed in the queries sent to the database. When a field expressed in a query is not part of the Aggregate Table, the query cannot use the Aggregate Table. It is therefore recommended to include the slicing hierarchies in the Aggregate Table definition.
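The rule can be pictured as a simple set inclusion. The sketch below is a simplified model in plain Java, not the matching logic that DirectQuery actually runs; the class name, the canAnswer method and the assumption that DATE is a slicing hierarchy are all hypothetical.

```java
import java.util.Set;

// Simplified model: an aggregate table can serve a query only if it exposes every queried field.
final class AggregateTableEligibilitySketch {

    // True when all fields used by the query are group-by fields of the aggregate table.
    static boolean canAnswer(Set<String> queryFields, Set<String> aggregateTableFields) {
        return aggregateTableFields.containsAll(queryFields);
    }

    public static void main(String[] args) {
        // Assumption for this example: DATE is a slicing hierarchy, so it appears in every query.
        Set<String> queryPerProduct = Set.of("DATE", "PRODUCT");

        System.out.println(canAnswer(queryPerProduct, Set.of("PRODUCT")));
        System.out.println(canAnswer(queryPerProduct, Set.of("DATE", "PRODUCT")));
    }
}
```

Under this model, a table grouped by PRODUCT alone is never eligible once DATE is slicing, whereas a table grouped by DATE and PRODUCT is.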
Aggregate tables with vectors
Arrays can be aggregated in an Aggregate Table, but they need to have a specific format that depends on the type of the original array being aggregated:
Original Array | Aggregate Table Array |
---|---|
Native | Native |
Multi Column Array | Multi Column Array |
Multi Row Array | Native |
See the DirectQuery vector documentation for more information about array types.
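As an illustration of the last row of the table, the sketch below collapses a multi-row array into one native array per group. It assumes that a multi-row array stores one element per row together with its index, and that the aggregation is an element-wise sum; the class, record and method names are hypothetical, and the code runs on plain Java collections rather than on a database.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Illustration only: turning a multi-row array into one native array per group.
final class MultiRowToNativeSketch {

    // One source row: the group key, the position in the vector and the value at that position.
    record MultiRow(String product, int index, double value) {}

    // Sums the values element-wise and returns one array of the given length per product.
    static Map<String, double[]> toNativeArrays(List<MultiRow> rows, int vectorLength) {
        Map<String, double[]> result = new TreeMap<>();
        for (MultiRow row : rows) {
            double[] vector = result.computeIfAbsent(row.product(), p -> new double[vectorLength]);
            vector[row.index()] += row.value();
        }
        return result;
    }

    public static void main(String[] args) {
        List<MultiRow> rows = List.of(
            new MultiRow("P1", 0, 1.0),
            new MultiRow("P1", 1, 2.0),
            new MultiRow("P1", 0, 3.0),
            new MultiRow("P2", 1, 5.0));
        toNativeArrays(rows, 2)
            .forEach((product, vector) -> System.out.println(product + " -> " + Arrays.toString(vector)));
    }
}
```

Each product ends up with a single row holding the whole vector, which is the shape expected for a native array column in the Aggregate Table.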
Generate SQL for Aggregate Table matching an Aggregate Provider
Aggregate Tables are a good tool to feed Aggregate Providers quickly because the providers can read the already aggregated data instead of running a new aggregation. DirectQuery provides a bootstrapper that generates the SQL to create and feed an Aggregate Table matching an Aggregate Provider.
Let's imagine we have the following Aggregate Provider defined (the definition is the same for all databases):
.withPartialProvider()
.withName("MyProvider")
.bitmap()
.includingOnlyHierarchies(HierarchyIdentifier.simple("PRODUCT"))
.includingOnlyMeasures("Quantity.SUM")
We can create a bootstrapper for it:
- BigQuery
- ClickHouse
- Databricks
- MsSql
- Redshift
- Snowflake
final ProviderCoordinate provider = ProviderCoordinate.at("MyCube", "MyProvider");
final AggregateTableBootstrapper bootstrapper = session.getAggregateTableBootstrapper(
schema,
provider,
managerDescription,
aggregateTableName,
null,
AGGREGATE_TABLE_TEMP_DATASET);
final ProviderCoordinate provider = ProviderCoordinate.at("MyCube", "MyProvider");
final AggregateTableBootstrapper bootstrapper =
session.getAggregateTableBootstrapper(schema, provider, managerDescription, null, null);
final ProviderCoordinate provider = ProviderCoordinate.at("MyCube", "MyProvider");
final AggregateTableBootstrapper bootstrapper = session.getAggregateTableBootstrapper(
schema,
provider,
managerDescription,
aggregateTableName,
null,
AGGREGATE_TABLE_TEMP_SCHEMA_NAME);
final ProviderCoordinate provider = ProviderCoordinate.at("MyCube", "MyProvider");
final AggregateTableBootstrapper bootstrapper =
session.getAggregateTableBootstrapper(schema, provider, managerDescription, null, null, null);
final ProviderCoordinate provider = ProviderCoordinate.at("MyCube", "MyProvider");
final AggregateTableBootstrapper bootstrapper = session.getAggregateTableBootstrapper(
schema,
provider,
managerDescription,
aggregateTableName,
null,
AGGREGATE_TABLE_TEMP_SCHEMA_NAME);
final ProviderCoordinate provider = ProviderCoordinate.at("MyCube", "MyProvider");
final AggregateTableBootstrapper bootstrapper = session.getAggregateTableBootstrapper(
schema,
provider,
managerDescription,
aggregateTableName,
TEST_DATABASE_NAME,
AGGREGATE_TABLE_TEMP_SCHEMA_NAME);
This bootstrapper can provide the SQL to create and feed the Aggregate Table:
- BigQuery
- ClickHouse
- Databricks
- MsSql
- Redshift
- Snowflake
System.out.println(bootstrapper.getSqlForCreation());
CREATE TABLE `cubulus-tests`.`test_aggregate_table_temp`.`Aggregate_Table_MyProvider` (
`PRODUCT` String NOT NULL,
`Quantity__PERIOD__SUM` INT64,
`contributors__PERIOD__COUNT` INT64
)
System.out.println(bootstrapper.getSqlForFeeding());
INSERT INTO `cubulus-tests`.`test_aggregate_table_temp`.`Aggregate_Table_MyProvider` SELECT
`T0`.`PRODUCT` AS `PRODUCT_0`,
SUM(COALESCE(`T0`.`QUANTITY`, 0)) AS `Quantity__PERIOD__SUM_1`,
COUNT(*) AS `contributors__PERIOD__COUNT_2`
FROM
`cubulus-tests`.`tutorial`.`SALES` AS `T0`
GROUP BY
`T0`.`PRODUCT`
System.out.println(bootstrapper.getSqlForCreation());
CREATE TABLE `TUTORIAL`.`Aggregate_Table_MyProvider` (
`PRODUCT` String,
`Quantity.SUM` Nullable(Float64),
`contributors.COUNT` Nullable(Int64)
)
ENGINE = MergeTree()
ORDER BY `PRODUCT`
System.out.println(bootstrapper.getSqlForFeeding());
INSERT INTO `TUTORIAL`.`Aggregate_Table_MyProvider` SELECT
`T0`.`PRODUCT` AS `PRODUCT_0`,
SUM(COALESCE(`T0`.`QUANTITY`, 0.0)) AS `Quantity.SUM_1`,
COUNT(*) AS `contributors.COUNT_2`
FROM
`TUTORIAL`.`SALES` AS `T0`
GROUP BY
`T0`.`PRODUCT`
System.out.println(bootstrapper.getSqlForCreation());
CREATE TABLE `hive_metastore`.`aggregate_table_temp_schema`.`Aggregate_Table_MyProvider` (
`PRODUCT` String NOT NULL,
`Quantity.SUM` DOUBLE,
`contributors.COUNT` BIGINT
)
System.out.println(bootstrapper.getSqlForFeeding());
INSERT INTO `hive_metastore`.`aggregate_table_temp_schema`.`Aggregate_Table_MyProvider` SELECT
`T0`.`PRODUCT` AS `PRODUCT_0`,
SUM(`T0`.`QUANTITY`) AS `Quantity.SUM_1`,
COUNT(*) AS `contributors.COUNT_2`
FROM
`hive_metastore`.`tutorial`.`SALES` AS `T0`
GROUP BY
`T0`.`PRODUCT`
System.out.println(bootstrapper.getSqlForCreation());
CREATE TABLE "dev"."TUTORIAL"."Aggregate_Table_MyProvider" (
"PRODUCT" nvarchar(4000) NOT NULL,
"Quantity.SUM" BigInt,
"contributors.COUNT" BigInt
)
System.out.println(bootstrapper.getSqlForFeeding());
INSERT INTO "dev"."TUTORIAL"."Aggregate_Table_MyProvider" SELECT
"T0"."PRODUCT" AS "PRODUCT_0",
SUM("T0"."QUANTITY") AS "Quantity.SUM_1",
COUNT(*) AS "contributors.COUNT_2"
FROM
"dev"."TUTORIAL"."SALES" AS "T0"
GROUP BY
"T0"."PRODUCT"
System.out.println(bootstrapper.getSqlForCreation());
SET enable_case_sensitive_identifier TO true;
CREATE TABLE "dev"."test_aggregate_table_temp"."Aggregate_Table_MyProvider" (
"PRODUCT" VARCHAR NOT NULL,
"Quantity.SUM" FLOAT8,
"contributors.COUNT" BIGINT
)
System.out.println(bootstrapper.getSqlForFeeding());
SET enable_case_sensitive_identifier TO true;
INSERT INTO "dev"."test_aggregate_table_temp"."Aggregate_Table_MyProvider" SELECT
"T0"."PRODUCT" AS "PRODUCT_0",
SUM(COALESCE("T0"."QUANTITY", 0.0)) AS "Quantity.SUM_1",
COUNT(*) AS "contributors.COUNT_2"
FROM
"dev"."tutorial"."SALES" AS "T0"
GROUP BY
"T0"."PRODUCT"
System.out.println(bootstrapper.getSqlForCreation());
CREATE TRANSIENT TABLE "TEST_RESOURCES"."TEST_AGGREGATE_TABLE_TEMP"."Aggregate_Table_MyProvider" (
"PRODUCT" VARCHAR NOT NULL,
"Quantity.SUM" DOUBLE,
"contributors.COUNT" NUMBER
)
System.out.println(bootstrapper.getSqlForFeeding());
INSERT INTO "TEST_RESOURCES"."TEST_AGGREGATE_TABLE_TEMP"."Aggregate_Table_MyProvider" SELECT
"T0"."PRODUCT" AS "PRODUCT_0",
SUM(COALESCE("T0"."QUANTITY", 0.0)) AS "Quantity.SUM_1",
COUNT(*) AS "contributors.COUNT_2"
FROM
"TEST_RESOURCES"."TUTORIAL"."SALES" AS "T0"
GROUP BY
"T0"."PRODUCT"
You need to execute this SQL to create and feed the table. Creating and feeding the table only needs to be done once: the purpose of an Aggregate Table is to avoid re-running the same queries at every startup. A good way to do that is to run the queries manually.
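If you prefer to script this one-off step rather than paste the SQL into a console, the following is a minimal sketch using plain JDBC. The class `AggregateTableSqlRunner` and its methods are hypothetical helpers, not part of the DirectQuery API. It assumes that a `java.sql.Connection` to the external database is available and that the generated script contains no semicolon inside a string literal or identifier, because the split is deliberately naive. Some generated scripts, such as the Redshift one above, contain several statements, which is why the script is split before execution.

```java
import java.sql.Connection;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.ArrayList;
import java.util.List;

/** Hypothetical helper (not a DirectQuery class) that runs a generated SQL script once. */
final class AggregateTableSqlRunner {

    /** Naive split on ';'. Assumes no semicolon appears inside literals or identifiers. */
    static List<String> splitStatements(String script) {
        List<String> statements = new ArrayList<>();
        for (String part : script.split(";")) {
            String trimmed = part.trim();
            if (!trimmed.isEmpty()) {
                statements.add(trimmed);
            }
        }
        return statements;
    }

    /** Executes each statement of the script in order on the given connection. */
    static void run(Connection connection, String script) throws SQLException {
        try (Statement statement = connection.createStatement()) {
            for (String sql : splitStatements(script)) {
                statement.execute(sql);
            }
        }
    }
}
```

Assuming `getSqlForCreation()` and `getSqlForFeeding()` return the scripts as strings, calling `AggregateTableSqlRunner.run(connection, ...)` with the creation script and then the feeding script would create and fill the table.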
Once the table exists, the Aggregate Table can be used in the application, and the Aggregate Provider will use it for feeding. The code is identical for BigQuery, ClickHouse, Databricks, MsSql, Redshift and Snowflake:
final AggregateTable aggregateTable = bootstrapper.toAggregateTable();
final Application app = session.applicationBuilder()
.schema(schema)
.managerDescription(managerDescription)
.additionalAggregateTables(List.of(aggregateTable))
.build();
app.start();
Is my query using the Aggregate Table?
The simplest way to check whether a query uses an aggregate table is to look at the query received by the external database. If it does, the aggregate table name appears in the SQL query as the base table (the first table in the FROM clause).
Additionally, for databases supporting tags such as Snowflake, the tag aggregate_table_name will be added to the query with the name of the aggregate table used.
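The base-table check can be automated when SQL text is collected from the database query history. The sketch below is a hypothetical helper, not part of DirectQuery. It uses a simple regular expression rather than a SQL parser and assumes table identifiers contain no whitespace. It takes the first table following a `FROM` keyword, strips double quotes and backticks, and compares the last path segment with the aggregate table name.

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/** Hypothetical helper (not a DirectQuery class) checking the base table of a SQL query. */
final class AggregateTableUsageCheck {

    // Captures the first token following a FROM keyword, e.g. "dev"."TUTORIAL"."SALES".
    private static final Pattern FIRST_FROM =
            Pattern.compile("\\bFROM\\s+([^\\s,()]+)", Pattern.CASE_INSENSITIVE);

    /** Returns true if the first table after FROM is the given aggregate table. */
    static boolean usesAsBaseTable(String sql, String aggregateTableName) {
        Matcher matcher = FIRST_FROM.matcher(sql);
        if (!matcher.find()) {
            return false;
        }
        // Drop the quoting characters used by the dialects shown in this page.
        String unquoted = matcher.group(1).replace("\"", "").replace("`", "");
        return unquoted.equals(aggregateTableName)
                || unquoted.endsWith("." + aggregateTableName);
    }

    public static void main(String[] args) {
        String sql = "SELECT SUM(\"T0\".\"Quantity.SUM\") FROM "
                + "\"dev\".\"TUTORIAL\".\"Aggregate_Table_MyProvider\" AS \"T0\"";
        System.out.println(usesAsBaseTable(sql, "Aggregate_Table_MyProvider"));
    }
}
```

Because it relies on a regular expression, this check is only a quick filter. Reading the query itself remains the reference.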
While building a project, to get more information about why a query is or is not compatible with aggregate tables, enable additional logs by setting the logger com.activeviam.database.sql.internal.query.AggregateTableQueryOptimizer to FINE.
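One way to raise this logger to FINE programmatically is sketched below with the standard `java.util.logging` API. It assumes the application logs through `java.util.logging` without a bridge to another framework, which the log format on this page suggests but does not guarantee. The class `AggregateTableLogging` is a hypothetical helper. A dedicated handler is attached because the default console handler only prints INFO and above.

```java
import java.util.logging.ConsoleHandler;
import java.util.logging.Handler;
import java.util.logging.Level;
import java.util.logging.Logger;

/** Hypothetical helper (not a DirectQuery class) enabling the optimizer's FINE logs. */
final class AggregateTableLogging {

    // Keep a strong reference: java.util.logging only holds loggers weakly,
    // so a configured logger could otherwise be garbage collected.
    private static final Logger OPTIMIZER_LOGGER = Logger.getLogger(
            "com.activeviam.database.sql.internal.query.AggregateTableQueryOptimizer");

    static Logger enableFineLogs() {
        OPTIMIZER_LOGGER.setLevel(Level.FINE);
        if (OPTIMIZER_LOGGER.getHandlers().length == 0) {
            // The default console handler filters out FINE records, so add one that keeps them.
            Handler handler = new ConsoleHandler();
            handler.setLevel(Level.FINE);
            OPTIMIZER_LOGGER.addHandler(handler);
            // Avoid printing INFO and above twice through the root handlers.
            OPTIMIZER_LOGGER.setUseParentHandlers(false);
        }
        return OPTIMIZER_LOGGER;
    }
}
```

Calling `AggregateTableLogging.enableFineLogs()` before starting the application should be enough in that setup. Projects that configure logging through a properties file or another framework would set the same logger name to the equivalent level there instead.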
The additional logs look like this:
FINE: Trying to match query with 1 aggregate tables. Query is [...]
Aug 21, 2023 4:46:37 PM com.activeviam.database.sql.internal.query.AggregateTableQueryOptimizer isCompatibleAggregateTable
FINE: The query did not match the Aggregate Table AggregateTable-0. Compatible base table: true, compatible group by fields: true, compatible selection fields: true, compatible aggregations: true, compatible condition: false. Query was [...]
Aug 21, 2023 4:46:37 PM com.activeviam.database.sql.internal.query.AggregateTableQueryOptimizer findMatchingAggregateTable
FINE: No matching Aggregate Table for query [...]
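When many of these messages are produced, it can help to extract which compatibility check failed. The sketch below is a hypothetical helper, not part of DirectQuery, and it assumes the message keeps the `compatible <check>: true|false` wording shown in the sample above. Since the feature is experimental, that wording may change between releases.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Locale;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/** Hypothetical helper (not a DirectQuery class) reading the optimizer's FINE messages. */
final class AggregateTableLogReader {

    // Matches fragments such as "Compatible base table: true" or "compatible condition: false".
    private static final Pattern CHECK =
            Pattern.compile("(?i)compatible ([a-z ]+?): (true|false)");

    /** Returns the names of the checks reported as false, in order of appearance. */
    static List<String> failedChecks(String logMessage) {
        List<String> failed = new ArrayList<>();
        Matcher matcher = CHECK.matcher(logMessage);
        while (matcher.find()) {
            if (matcher.group(2).equalsIgnoreCase("false")) {
                failed.add(matcher.group(1).toLowerCase(Locale.ROOT));
            }
        }
        return failed;
    }
}
```

Applied to the second message of the sample, `failedChecks` returns a single entry, `condition`, which indicates that the query's condition is what prevented the aggregate table from being used.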