Aggregate Tables
DirectQuery is using the external database to feed some aggregate providers and answer some queries. This can result in some large queries with heavy load on the database, and if the same cube is started multiple times, the same query can be repeated.
Aggregate tables are a way to cache some aggregated data in the external database to make some queries faster and cheaper.
How does it work
Aggregate Tables are tables added in the external database as a table fed from aggregated data.
Whenever possible, DirectQuery will query the Aggregate Table instead of the base Schema for a more optimized query:
For instance, consider a Sales table containing sales such as this one:
Id | Date | Product | Buyer | Quantity | Price |
---|---|---|---|---|---|
1 | 2023-01-01 | P1 | Buyer_A | 100.0 | 15.5 |
2 | 2023-01-01 | P1 | Buyer_B | 100.0 | 16.5 |
3 | 2023-01-01 | P2 | Buyer_A | 100.0 | 30 |
4 | 2023-01-01 | P2 | Buyer_B | 100.0 | 31 |
5 | 2023-01-02 | P1 | Buyer_A | 100.0 | 15.5 |
6 | 2023-01-02 | P1 | Buyer_B | 100.0 | 15.5 |
7 | 2023-01-02 | P1 | Buyer_A | 100.0 | 18 |
8 | 2023-01-03 | P1 | Buyer_B | 100.0 | 18 |
9 | 2023-01-03 | P2 | Buyer_A | 100.0 | 35 |
10 | 2023-01-03 | P2 | Buyer_B | 100.0 | 40 |
It is possible to aggregate the sum of the Quantity per Date and Product in the external database into an other table agg_sales:
date | product | sum_of_quantity | count_of_rows |
---|---|---|---|
2023-01-01 | P1 | 200.0 | 2 |
2023-01-01 | P2 | 200.0 | 2 |
2023-01-02 | P1 | 300.0 | 3 |
2023-01-03 | P1 | 100.0 | 1 |
2023-01-03 | P2 | 200.0 | 2 |
If this table is added as an Aggregate Table in DirectQuery, it will be used whenever possible instead of the original Sales table.
For instance when asking for the SUM(Quantity)
per Product in the Sales table, DirectQuery will retrieve the SUM(sum_of_quantity)
per product in agg_sales.
As the agg_sales is smaller and already aggregated the result will be faster to compute and will potentially avoid a huge scan of the database, saving both time and compute costs.
Use cases
There are a few classic use cases for using an Aggregate Table:
- Feeding an aggregate provider faster by pre-aggregating over the provider fields
- Having an "external database aggregate provider", i.e. being able to answer some queries from the pre-aggregated data in the external database but without having to store additional data in-memory.
- Feeding some hierarchies faster
Supported aggregations
The following aggregation function can be used in an Aggregate Table:
- Sum
- Min
- Max
- Average
- Sum product
- Count
- Long
- Short
- Gross
- Square sum
Custom Aggregations
To use a custom aggregation in an aggregate table, you need to define the corresponding user defined aggregation.
You can also register its corresponding re-aggregation function.
This re-aggregation function will be used to compute the aggregates at a coarser granularity than the one stored in the aggregate table.
Examples
Create a simple Aggregate Table to feed an aggregate provider
Let’s create a new table that will pre-aggregate the quantity (in the tutorial sales table) per Date and Product:
- BigQuery
- ClickHouse
- Databricks
- MsSql
- Redshift
- Snowflake
- Synapse
CREATE OR REPLACE TABLE `tutorial`.SALES_BY_PRODUCT (
PRODUCT STRING NOT NULL,
QUANTITY_SUM FLOAT64 NOT NULL,
CONTRIBUTORS_COUNT INT64 NOT NULL
);
We feed it with the data from the other tables:
INSERT INTO `tutorial`.SALES_BY_PRODUCT
SELECT
T0.PRODUCT AS PRODUCT,
sum(T0.QUANTITY) AS QUANTITY_SUM,
COUNT(*) AS CONTRIBUTORS_COUNT
FROM
`tutorial`.SALES AS T0
GROUP BY
T0.PRODUCT;
create table
TUTORIAL.sales_by_product
(
`product` String NOT NULL,
`quantity_sum` Float64 NOT NULL,
`contributors_count` UInt64 NOT NULL
)
ENGINE = MergeTree()
ORDER BY (product);
We feed it with the data from the other tables:
INSERT INTO TUTORIAL.sales_by_product
SELECT
T0.PRODUCT AS `product`,
sum(T0.QUANTITY) AS `quantity_sum`,
COUNT(*) AS `contributors_count`
FROM
TUTORIAL.SALES AS T0
GROUP BY
T0.PRODUCT;
CREATE OR REPLACE TABLE
tutorial.SALES_BY_PRODUCT
(
PRODUCT String NOT NULL,
QUANTITY_SUM FLOAT NOT NULL,
CONTRIBUTORS_COUNT INTEGER NOT NULL
)
We feed it with the data from the other tables:
INSERT INTO tutorial.SALES_BY_PRODUCT
SELECT
T0.PRODUCT AS PRODUCT,
sum(T0.QUANTITY) AS QUANTITY_SUM,
COUNT(*) AS CONTRIBUTORS_COUNT
FROM
tutorial.SALES AS T0
GROUP BY
T0.PRODUCT;
if not exists (select * from sys.tables where name = 'SALES_BY_PRODUCT')
CREATE TABLE TUTORIAL.SALES_BY_PRODUCT (
[PRODUCT] VARCHAR(50) NOT NULL,
[QUANTITY_SUM] FLOAT NOT NULL,
[CONTRIBUTORS_COUNT] BIGINT NOT NULL
);
We feed it with the data from the other tables:
INSERT INTO TUTORIAL.SALES_BY_PRODUCT
SELECT
T0.PRODUCT AS PRODUCT,
sum(T0.QUANTITY) AS QUANTITY_SUM,
COUNT(*) AS CONTRIBUTORS_COUNT
FROM
TUTORIAL.SALES AS T0
GROUP BY
T0.PRODUCT;
CREATE TABLE IF NOT EXISTS
tutorial."SALES_BY_PRODUCT"
(
"PRODUCT" VARCHAR NOT NULL,
"QUANTITY_SUM" FLOAT NOT NULL,
"CONTRIBUTORS_COUNT" BIGINT NOT NULL
);
We feed it with the data from the other tables:
INSERT INTO tutorial."SALES_BY_PRODUCT"
SELECT
T0."PRODUCT" AS "PRODUCT",
sum(T0."QUANTITY") AS "QUANTITY_SUM",
COUNT(*) AS "CONTRIBUTORS_COUNT"
FROM
tutorial."SALES" AS T0
GROUP BY
T0."PRODUCT";
CREATE OR REPLACE TABLE
TUTORIAL.SALES_BY_PRODUCT
(
PRODUCT String NOT NULL,
QUANTITY_SUM FLOAT NOT NULL,
CONTRIBUTORS_COUNT INTEGER NOT NULL
)
We feed it with the data from the other tables:
INSERT INTO TUTORIAL.SALES_BY_PRODUCT
SELECT
T0.PRODUCT AS PRODUCT,
sum(T0.QUANTITY) AS QUANTITY_SUM,
COUNT(*) AS CONTRIBUTORS_COUNT
FROM
TUTORIAL.SALES AS T0
GROUP BY
T0.PRODUCT;
CREATE VIEW TUTORIAL.SALES_BY_PRODUCT_VIEW AS
SELECT
T0.PRODUCT AS PRODUCT,
sum(T0.QUANTITY) AS QUANTITY_SUM,
COUNT(*) AS CONTRIBUTORS_COUNT
FROM
TUTORIAL.SALES AS T0
GROUP BY
T0.PRODUCT;
To simplify this example on Synapse we create a view containing the Aggregated data. However, views means that the result will be recomputed for each query so it is not a wise choice for an Aggregate Table. We recommend to use a materialized table.
The pre-aggregated data looks like this:
PRODUCT | QUANTITY_SUM | CONTRIBUTORS_COUNT |
---|---|---|
BED_2 | 6 | 4 |
HOO_5 | 3 | 3 |
TAB_0 | 4 | 4 |
TAB_1 | 4 | 3 |
TSH_3 | 3 | 3 |
TSH_4 | 5 | 3 |
Let's first discover the SQL table corresponding to the Aggregate Table:
- BigQuery
- ClickHouse
- Databricks
- MsSql
- Redshift
- Snowflake
- Synapse
final TableDescription salesTable =
discoverer.discoverTable(new SqlTableId(PROJECT_ID, "tutorial", "SALES"));
final TableDescription table =
discoverer.discoverTable(new SqlTableId(PROJECT_ID, "tutorial", "SALES_BY_PRODUCT"));
final TableDescription salesTable =
discoverer.discoverTable(new ClickhouseTableId("TUTORIAL", "SALES"));
final TableDescription table =
discoverer.discoverTable(new ClickhouseTableId("TUTORIAL", "sales_by_product"));
final TableDescription salesTable =
discoverer.discoverTable(new SqlTableId(CATALOG_NAME, TUTORIAL_SCHEMA_NAME, "SALES"));
final TableDescription table =
discoverer.discoverTable(
new SqlTableId(CATALOG_NAME, TUTORIAL_SCHEMA_NAME, "SALES_BY_PRODUCT"));
final TableDescription salesTable =
discoverer.discoverTable(new SqlTableId(DATABASE_NAME, SCHEMA_NAME, "SALES"));
final TableDescription table =
discoverer.discoverTable(new SqlTableId(DATABASE_NAME, SCHEMA_NAME, "SALES_BY_PRODUCT"));
final TableDescription salesTable =
discoverer.discoverTable(new SqlTableId(TEST_DATABASE_NAME, TUTORIAL_SCHEMA_NAME, "SALES"));
final TableDescription table =
discoverer.discoverTable(
new SqlTableId(TEST_DATABASE_NAME, TUTORIAL_SCHEMA_NAME, "SALES_BY_PRODUCT"));
final TableDescription salesTable =
discoverer.discoverTable(new SqlTableId(TEST_DATABASE_NAME, TUTORIAL_SCHEMA_NAME, "SALES"));
final TableDescription table =
discoverer.discoverTable(
new SqlTableId(TEST_DATABASE_NAME, TUTORIAL_SCHEMA_NAME, "SALES_BY_PRODUCT"));
final TableDescription salesTable =
discoverer.discoverTable(new SqlTableId(TEST_DATABASE_NAME, "TUTORIAL", "SALES"));
final TableDescription table =
discoverer.discoverTable(
new SqlTableId(TEST_DATABASE_NAME, "TUTORIAL", "SALES_BY_PRODUCT_VIEW"));
We will map the PRODUCT
field of the sales table to the product
field of the Aggregate Table:
- BigQuery
- ClickHouse
- Databricks
- MsSql
- Redshift
- Snowflake
- Synapse
final Map<FieldPath, String> fieldMapping = Map.of(FieldPath.of("PRODUCT"), "PRODUCT");
final Map<FieldPath, String> fieldMapping = Map.of(FieldPath.of("PRODUCT"), "product");
final Map<FieldPath, String> fieldMapping = Map.of(FieldPath.of("PRODUCT"), "PRODUCT");
final Map<FieldPath, String> fieldMapping = Map.of(FieldPath.of("PRODUCT"), "PRODUCT");
final Map<FieldPath, String> fieldMapping = Map.of(FieldPath.of("PRODUCT"), "PRODUCT");
final Map<FieldPath, String> fieldMapping = Map.of(FieldPath.of("PRODUCT"), "PRODUCT");
final Map<FieldPath, String> fieldMapping = Map.of(FieldPath.of("PRODUCT"), "PRODUCT");
We will also map the sum of the sales quantity
column into a column called quantity_sum
:
- BigQuery
- ClickHouse
- Databricks
- MsSql
- Redshift
- Snowflake
- Synapse
final List<MeasureMapping> measureMapping =
List.of(
MeasureMapping.sum(FieldPath.of("QUANTITY"), "QUANTITY_SUM"),
MeasureMapping.count("CONTRIBUTORS_COUNT"));
final List<MeasureMapping> measureMapping =
List.of(
MeasureMapping.sum(FieldPath.of("QUANTITY"), "quantity_sum"),
MeasureMapping.count("contributors_count"));
final List<MeasureMapping> measureMapping =
List.of(
MeasureMapping.sum(FieldPath.of("QUANTITY"), "QUANTITY_SUM"),
MeasureMapping.count("CONTRIBUTORS_COUNT"));
final List<MeasureMapping> measureMapping =
List.of(
MeasureMapping.sum(FieldPath.of("QUANTITY"), "QUANTITY_SUM"),
MeasureMapping.count("CONTRIBUTORS_COUNT"));
final List<MeasureMapping> measureMapping =
List.of(
MeasureMapping.sum(FieldPath.of("QUANTITY"), "QUANTITY_SUM"),
MeasureMapping.count("CONTRIBUTORS_COUNT"));
final List<MeasureMapping> measureMapping =
List.of(
MeasureMapping.sum(FieldPath.of("QUANTITY"), "QUANTITY_SUM"),
MeasureMapping.count("CONTRIBUTORS_COUNT"));
final List<MeasureMapping> measureMapping =
List.of(
MeasureMapping.sum(FieldPath.of("QUANTITY"), "QUANTITY_SUM"),
MeasureMapping.count("CONTRIBUTORS_COUNT"));
We can now build the AggregateTable
:
- BigQuery
- ClickHouse
- Databricks
- MsSql
- Redshift
- Snowflake
- Synapse
final AggregateTableDescription aggregateTable =
AggregateTableDescription.builder()
.underlyingTable(table)
.originBaseTableName(salesTable.getName())
.withGroupByFieldsMapping(fieldMapping)
.withMeasuresMappings(measureMapping)
.build();
final AggregateTableDescription aggregateTable =
AggregateTableDescription.builder()
.underlyingTable(table)
.originBaseTableName(salesTable.getName())
.withGroupByFieldsMapping(fieldMapping)
.withMeasuresMappings(measureMapping)
.build();
final AggregateTableDescription aggregateTable =
AggregateTableDescription.builder()
.underlyingTable(table)
.originBaseTableName(salesTable.getName())
.withGroupByFieldsMapping(fieldMapping)
.withMeasuresMappings(measureMapping)
.build();
final AggregateTableDescription aggregateTable =
AggregateTableDescription.builder()
.underlyingTable(table)
.originBaseTableName(salesTable.getName())
.withGroupByFieldsMapping(fieldMapping)
.withMeasuresMappings(measureMapping)
.build();
final AggregateTableDescription aggregateTable =
AggregateTableDescription.builder()
.underlyingTable(table)
.originBaseTableName(salesTable.getName())
.withGroupByFieldsMapping(fieldMapping)
.withMeasuresMappings(measureMapping)
.build();
final AggregateTableDescription aggregateTable =
AggregateTableDescription.builder()
.originBaseTableName(salesTable.getName())
.underlyingTable(table)
.withGroupByFieldsMapping(fieldMapping)
.withMeasuresMappings(measureMapping)
.build();
final AggregateTableDescription aggregateTable =
AggregateTableDescription.builder()
.underlyingTable(table)
.originBaseTableName(salesTable.getName())
.withGroupByFieldsMapping(fieldMapping)
.withMeasuresMappings(measureMapping)
.build();
And add it to the schema:
- BigQuery
- ClickHouse
- Databricks
- MsSql
- Redshift
- Snowflake
- Synapse
final SchemaDescription schema =
SchemaDescription.builder()
.externalTables(List.of(salesTable))
.externalAggregateTableDescriptions(List.of(aggregateTable))
.build();
final SchemaDescription schema =
SchemaDescription.builder()
.externalTables(List.of(salesTable))
.externalAggregateTableDescriptions(List.of(aggregateTable))
.build();
final SchemaDescription schema =
SchemaDescription.builder()
.externalTables(List.of(salesTable))
.externalJoins(List.of())
.externalAggregateTableDescriptions(List.of(aggregateTable))
.build();
final SchemaDescription schema =
SchemaDescription.builder()
.externalTables(List.of(salesTable))
.externalAggregateTableDescriptions(List.of(aggregateTable))
.build();
final SchemaDescription schema =
SchemaDescription.builder()
.externalTables(List.of(salesTable))
.externalAggregateTableDescriptions(List.of(aggregateTable))
.build();
final SchemaDescription schema =
SchemaDescription.builder()
.externalTables(List.of(salesTable))
.externalAggregateTableDescriptions(List.of(aggregateTable))
.build();
final SchemaDescription schema =
SchemaDescription.builder()
.externalTables(List.of(salesTable))
.externalAggregateTableDescriptions(List.of(aggregateTable))
.build();
Let's now define a bitmap aggregate provider on the sum of quantity per Product
, the feeding of the in-memory provider will be done directly from the Aggregate Table:
- BigQuery
- ClickHouse
- Databricks
- MsSql
- Redshift
- Snowflake
- Synapse
final IActivePivotInstanceDescription cubeDescription =
StartBuilding.cube("MyCube")
.withMeasures(
superMeasureBuilder ->
superMeasureBuilder.withCalculations(
c -> Copper.sum("QUANTITY").as("Quantity.SUM").publish(c)))
.withDimensions(
builder -> builder.withSingleLevelDimensions(List.of("DATE", "PRODUCT", "SHOP")))
.withAggregateProvider()
.withPartialProvider()
.withName("MyProvider")
.bitmap()
.includingOnlyHierarchies(HierarchyIdentifier.simple("PRODUCT"))
.includingOnlyMeasures("Quantity.SUM")
.build();
final ISelectionDescription selection =
StartBuilding.selection(schema).fromBaseStore("SALES").withAllReachableFields().build();
final IActivePivotManagerDescription managerDescription =
StartBuilding.managerDescription()
.withName("MyManager")
.withSchema()
.withSelection(selection)
.withCube(cubeDescription)
.build();
final Application app =
Application.builder(connector)
.schema(schema)
.managerDescription(managerDescription)
.databaseSettings(BigqueryDatabaseSettings.defaults())
.build();
final IActivePivotInstanceDescription cubeDescription =
StartBuilding.cube("MyCube")
.withMeasures(
superMeasureBuilder ->
superMeasureBuilder.withCalculations(
c -> Copper.sum("QUANTITY").as("Quantity.SUM").publish(c)))
.withDimensions(
builder -> builder.withSingleLevelDimensions(List.of("DATE", "PRODUCT", "SHOP")))
.withAggregateProvider()
.withPartialProvider()
.withName("MyProvider")
.bitmap()
.includingOnlyHierarchies(HierarchyIdentifier.simple("PRODUCT"))
.includingOnlyMeasures("Quantity.SUM")
.build();
final ISelectionDescription selection =
StartBuilding.selection(schema).fromBaseStore("SALES").withAllReachableFields().build();
final IActivePivotManagerDescription managerDescription =
StartBuilding.managerDescription()
.withName("MyManager")
.withSchema()
.withSelection(selection)
.withCube(cubeDescription)
.build();
final Application app =
Application.builder(connector)
.schema(schema)
.managerDescription(managerDescription)
.build();
final IActivePivotInstanceDescription cubeDescription =
StartBuilding.cube("MyCube")
.withMeasures(
superMeasureBuilder ->
superMeasureBuilder.withCalculations(
c -> Copper.sum("QUANTITY").as("Quantity.SUM").publish(c)))
.withDimensions(
builder -> builder.withSingleLevelDimensions(List.of("DATE", "PRODUCT", "SHOP")))
.withAggregateProvider()
.withPartialProvider()
.withName("MyProvider")
.bitmap()
.includingOnlyHierarchies(HierarchyIdentifier.simple("PRODUCT"))
.includingOnlyMeasures("Quantity.SUM")
.build();
final ISelectionDescription selection =
StartBuilding.selection(schema).fromBaseStore("SALES").withAllReachableFields().build();
final IActivePivotManagerDescription managerDescription =
StartBuilding.managerDescription()
.withName("MyManager")
.withSchema()
.withSelection(selection)
.withCube(cubeDescription)
.build();
final Application app =
Application.builder(connector)
.schema(schema)
.managerDescription(managerDescription)
.build();
final IActivePivotInstanceDescription cubeDescription =
StartBuilding.cube("MyCube")
.withMeasures(
superMeasureBuilder ->
superMeasureBuilder.withCalculations(
c -> Copper.sum("QUANTITY").as("Quantity.SUM").publish(c)))
.withDimensions(
builder -> builder.withSingleLevelDimensions(List.of("DATE", "PRODUCT", "SHOP")))
.withAggregateProvider()
.withPartialProvider()
.withName("MyProvider")
.bitmap()
.includingOnlyHierarchies(HierarchyIdentifier.simple("PRODUCT"))
.includingOnlyMeasures("Quantity.SUM")
.build();
final ISelectionDescription selection =
StartBuilding.selection(schema).fromBaseStore("SALES").withAllReachableFields().build();
final IActivePivotManagerDescription managerDescription =
StartBuilding.managerDescription()
.withName("MyManager")
.withSchema()
.withSelection(selection)
.withCube(cubeDescription)
.build();
final Application app =
Application.builder(connector)
.schema(schema)
.managerDescription(managerDescription)
.build();
final IActivePivotInstanceDescription cubeDescription =
StartBuilding.cube("MyCube")
.withMeasures(
superMeasureBuilder ->
superMeasureBuilder.withCalculations(
c -> Copper.sum("QUANTITY").as("Quantity.SUM").publish(c)))
.withDimensions(
builder -> builder.withSingleLevelDimensions(List.of("DATE", "PRODUCT", "SHOP")))
.withAggregateProvider()
.withPartialProvider()
.withName("MyProvider")
.bitmap()
.includingOnlyHierarchies(HierarchyIdentifier.simple("PRODUCT"))
.includingOnlyMeasures("Quantity.SUM")
.build();
final ISelectionDescription selection =
StartBuilding.selection(schema).fromBaseStore("SALES").withAllReachableFields().build();
final IActivePivotManagerDescription managerDescription =
StartBuilding.managerDescription()
.withName("MyManager")
.withSchema()
.withSelection(selection)
.withCube(cubeDescription)
.build();
final Application app =
Application.builder(connector)
.schema(schema)
.managerDescription(managerDescription)
.build();
final IActivePivotInstanceDescription cubeDescription =
StartBuilding.cube("MyCube")
.withMeasures(
superMeasureBuilder ->
superMeasureBuilder.withCalculations(
c -> Copper.sum("QUANTITY").as("Quantity.SUM").publish(c)))
.withDimensions(
builder -> builder.withSingleLevelDimensions(List.of("DATE", "PRODUCT", "SHOP")))
.withAggregateProvider()
.withPartialProvider()
.withName("MyProvider")
.bitmap()
.includingOnlyHierarchies(HierarchyIdentifier.simple("PRODUCT"))
.includingOnlyMeasures("Quantity.SUM")
.build();
final ISelectionDescription selection =
StartBuilding.selection(schema).fromBaseStore("SALES").withAllReachableFields().build();
final IActivePivotManagerDescription managerDescription =
StartBuilding.managerDescription()
.withName("MyManager")
.withSchema()
.withSelection(selection)
.withCube(cubeDescription)
.build();
final Application app =
Application.builder(connector)
.schema(schema)
.managerDescription(managerDescription)
.build();
final IActivePivotInstanceDescription cubeDescription =
StartBuilding.cube("MyCube")
.withMeasures(
superMeasureBuilder ->
superMeasureBuilder.withCalculations(
c -> Copper.sum("QUANTITY").as("Quantity.SUM").publish(c)))
.withDimensions(
builder -> builder.withSingleLevelDimensions(List.of("DATE", "PRODUCT", "SHOP")))
.withAggregateProvider()
.withPartialProvider()
.withName("MyProvider")
.bitmap()
.includingOnlyHierarchies(HierarchyIdentifier.simple("PRODUCT"))
.includingOnlyMeasures("Quantity.SUM")
.build();
final ISelectionDescription selection =
StartBuilding.selection(schema).fromBaseStore("SALES").withAllReachableFields().build();
final IActivePivotManagerDescription managerDescription =
StartBuilding.managerDescription()
.withName("MyManager")
.withSchema()
.withSelection(selection)
.withCube(cubeDescription)
.build();
final Application app =
Application.builder(connector)
.schema(schema)
.managerDescription(managerDescription)
.build();
To make sure that the feeding can be done from the Aggregate Table, we can call a validator that will check that the provider is compatible with the Aggregate Table before starting the application:
- BigQuery
- ClickHouse
- Databricks
- MsSql
- Redshift
- Snowflake
- Synapse
AggregateTableValidator.validateAggregateProvidersUseAggregateTable(
aggregateTable, managerDescription, List.of(ProviderCoordinate.at("MyCube", "MyProvider")));
AggregateTableValidator.validateAggregateProvidersUseAggregateTable(
aggregateTable, managerDescription, List.of(ProviderCoordinate.at("MyCube", "MyProvider")));
AggregateTableValidator.validateAggregateProvidersUseAggregateTable(
aggregateTable, managerDescription, List.of(ProviderCoordinate.at("MyCube", "MyProvider")));
AggregateTableValidator.validateAggregateProvidersUseAggregateTable(
aggregateTable, managerDescription, List.of(ProviderCoordinate.at("MyCube", "MyProvider")));
AggregateTableValidator.validateAggregateProvidersUseAggregateTable(
aggregateTable, managerDescription, List.of(ProviderCoordinate.at("MyCube", "MyProvider")));
AggregateTableValidator.validateAggregateProvidersUseAggregateTable(
aggregateTable, managerDescription, List.of(ProviderCoordinate.at("MyCube", "MyProvider")));
AggregateTableValidator.validateAggregateProvidersUseAggregateTable(
aggregateTable, managerDescription, List.of(ProviderCoordinate.at("MyCube", "MyProvider")));
More complex aggregations such as average
In the previous example, SUM was based on a single input and pre-aggregated into a single column, but there are some more complex cases. For instance:
count
has no input and produces 1 pre-aggregated columnsumProduct
can takes N input columns and produces 1 pre-aggregated columnaverage
takes a single input column but requires 2 pre-aggregated column in order to be able to re-aggregate: one column for the sum and one for the count
- BigQuery
- ClickHouse
- Databricks
- MsSql
- Redshift
- Snowflake
- Synapse
final AggregateTableDescription aggregateTable =
AggregateTableDescription.builder()
.underlyingTable(table)
.originBaseTableName(salesTable.getName())
.withGroupByFieldsMapping(Map.of(FieldPath.of("PRODUCT"), "PRODUCT"))
.withMeasuresMappings(
List.of(
MeasureMapping.sum(FieldPath.of("QUANTITY"), "QUANTITY_SUM"),
MeasureMapping.avg(
FieldPath.of("QUANTITY"), "QUANTITY_SUM", "CONTRIBUTORS_COUNT"),
MeasureMapping.count("CONTRIBUTORS_COUNT")))
.build();
final AggregateTableDescription aggregateTable =
AggregateTableDescription.builder()
.underlyingTable(table)
.originBaseTableName(salesTable.getName())
.withGroupByFieldsMapping(Map.of(FieldPath.of("PRODUCT"), "product"))
.withMeasuresMappings(
List.of(
MeasureMapping.sum(FieldPath.of("QUANTITY"), "quantity_sum"),
MeasureMapping.avg(
FieldPath.of("QUANTITY"), "quantity_sum", "contributors_count"),
MeasureMapping.count("contributors_count")))
.build();
final AggregateTableDescription aggregateTable =
AggregateTableDescription.builder()
.underlyingTable(table)
.originBaseTableName(salesTable.getName())
.withGroupByFieldsMapping(Map.of(FieldPath.of("PRODUCT"), "PRODUCT"))
.withMeasuresMappings(
List.of(
MeasureMapping.sum(FieldPath.of("QUANTITY"), "QUANTITY_SUM"),
MeasureMapping.avg(
FieldPath.of("QUANTITY"), "QUANTITY_SUM", "CONTRIBUTORS_COUNT"),
MeasureMapping.count("CONTRIBUTORS_COUNT")))
.build();
final AggregateTableDescription aggregateTable =
AggregateTableDescription.builder()
.underlyingTable(table)
.originBaseTableName(salesTable.getName())
.withGroupByFieldsMapping(Map.of(FieldPath.of("PRODUCT"), "PRODUCT"))
.withMeasuresMappings(
List.of(
MeasureMapping.sum(FieldPath.of("QUANTITY"), "QUANTITY_SUM"),
MeasureMapping.avg(
FieldPath.of("QUANTITY"), "QUANTITY_SUM", "CONTRIBUTORS_COUNT"),
MeasureMapping.count("CONTRIBUTORS_COUNT")))
.build();
final AggregateTableDescription aggregateTable =
AggregateTableDescription.builder()
.underlyingTable(table)
.originBaseTableName(salesTable.getName())
.withGroupByFieldsMapping(Map.of(FieldPath.of("PRODUCT"), "PRODUCT"))
.withMeasuresMappings(
List.of(
MeasureMapping.sum(FieldPath.of("QUANTITY"), "QUANTITY_SUM"),
MeasureMapping.avg(
FieldPath.of("QUANTITY"), "QUANTITY_SUM", "CONTRIBUTORS_COUNT"),
MeasureMapping.count("CONTRIBUTORS_COUNT")))
.build();
final AggregateTableDescription aggregateTable =
AggregateTableDescription.builder()
.underlyingTable(table)
.originBaseTableName(salesTable.getName())
.withGroupByFieldsMapping(Map.of(FieldPath.of("PRODUCT"), "PRODUCT"))
.withMeasuresMappings(
List.of(
MeasureMapping.sum(FieldPath.of("QUANTITY"), "QUANTITY_SUM"),
MeasureMapping.avg(
FieldPath.of("QUANTITY"), "QUANTITY_SUM", "CONTRIBUTORS_COUNT"),
MeasureMapping.count("CONTRIBUTORS_COUNT")))
.build();
final AggregateTableDescription aggregateTable =
AggregateTableDescription.builder()
.underlyingTable(table)
.originBaseTableName(salesTable.getName())
.withGroupByFieldsMapping(Map.of(FieldPath.of("PRODUCT"), "PRODUCT"))
.withMeasuresMappings(
List.of(
MeasureMapping.sum(FieldPath.of("QUANTITY"), "QUANTITY_SUM"),
MeasureMapping.avg(
FieldPath.of("QUANTITY"), "QUANTITY_SUM", "CONTRIBUTORS_COUNT"),
MeasureMapping.count("CONTRIBUTORS_COUNT")))
.build();
Count
The count is measure automatically added to any query sent to the database. When adding an aggregate table, a check is therefore performed to ensure it contains a column with a count. (otherwise it would never be used).
Slicing hierarchies
Slicing hierarchies are always expressed in the queries sent to the database. When a field expressed in a query is not part of the Aggregate Table, the query will not use the Aggregate Table. Therefore it is recommended to include the slicing hierarchies in Aggregate Table definition.
Aggregate tables with vectors
Arrays can be aggregated in an aggregate table but they need to have a specific format depending on the type of original array being aggregated:
Original Array | Aggregate Table Array |
---|---|
Native | Native |
Multi Column Array | Multi Column Array |
Multi Row Array | Native |
See the DirectQuery vector documentation for more information about array types.
Generate SQL for Aggregate Table matching an Aggregate Provider
Aggregate Tables are a good tool to feed Aggregate Providers quickly because it can use the aggregated data instead of running a new aggregation. DirectQuery provides a bootstrapper to generate the SQL to create and feed an Aggregate Table matching an Aggregate Provider.
Let's imagine we have the following Aggregate Provider defined:
- BigQuery
- ClickHouse
- Databricks
- MsSql
- Redshift
- Snowflake
.withPartialProvider()
.withName("MyProvider")
.bitmap()
.includingOnlyHierarchies(HierarchyIdentifier.simple("PRODUCT"))
.includingOnlyMeasures("Quantity.SUM")
.withPartialProvider()
.withName("MyProvider")
.bitmap()
.includingOnlyHierarchies(HierarchyIdentifier.simple("PRODUCT"))
.includingOnlyMeasures("Quantity.SUM")
.withPartialProvider()
.withName("MyProvider")
.bitmap()
.includingOnlyHierarchies(HierarchyIdentifier.simple("PRODUCT"))
.includingOnlyMeasures("Quantity.SUM")
.withPartialProvider()
.withName("MyProvider")
.bitmap()
.includingOnlyHierarchies(HierarchyIdentifier.simple("PRODUCT"))
.includingOnlyMeasures("Quantity.SUM")
.withPartialProvider()
.withName("MyProvider")
.bitmap()
.includingOnlyHierarchies(HierarchyIdentifier.simple("PRODUCT"))
.includingOnlyMeasures("Quantity.SUM")
.withPartialProvider()
.withName("MyProvider")
.bitmap()
.includingOnlyHierarchies(HierarchyIdentifier.simple("PRODUCT"))
.includingOnlyMeasures("Quantity.SUM")
We can create a bootstrapper for it:
- BigQuery
- ClickHouse
- Databricks
- MsSql
- Redshift
- Snowflake
final ProviderCoordinate provider = ProviderCoordinate.at("MyCube", "MyProvider");
final IAggregateTableBootstrapper bootstrapper =
IAggregateTableBootstrapper.builder()
.connector(connector)
.schema(schema)
.providerCoordinate(provider)
.managerDescription(managerDescription)
.targetTableName(aggregateTableName)
.targetSchemaName(AGGREGATE_TABLE_TEMP_DATASET)
.build();
final ProviderCoordinate provider = ProviderCoordinate.at("MyCube", "MyProvider");
final IAggregateTableBootstrapper bootstrapper =
IAggregateTableBootstrapper.builder()
.connector(connector)
.schema(schema)
.providerCoordinate(provider)
.managerDescription(managerDescription)
.build();
final ProviderCoordinate provider = ProviderCoordinate.at("MyCube", "MyProvider");
final IAggregateTableBootstrapper bootstrapper =
IAggregateTableBootstrapper.builder()
.connector(connector)
.schema(schema)
.providerCoordinate(provider)
.managerDescription(managerDescription)
.targetTableName(aggregateTableName)
.targetSchemaName(AGGREGATE_TABLE_TEMP_SCHEMA_NAME)
.build();
final ProviderCoordinate provider = ProviderCoordinate.at("MyCube", "MyProvider");
final IAggregateTableBootstrapper bootstrapper =
IAggregateTableBootstrapper.builder()
.connector(connector)
.schema(schema)
.providerCoordinate(provider)
.managerDescription(managerDescription)
.build();
final ProviderCoordinate provider = ProviderCoordinate.at("MyCube", "MyProvider");
final IAggregateTableBootstrapper bootstrapper =
IAggregateTableBootstrapper.builder()
.connector(connector)
.schema(schema)
.providerCoordinate(provider)
.managerDescription(managerDescription)
.targetTableName(aggregateTableName)
.targetSchemaName(AGGREGATE_TABLE_TEMP_SCHEMA_NAME)
.build();
final ProviderCoordinate provider = ProviderCoordinate.at("MyCube", "MyProvider");
final IAggregateTableBootstrapper bootstrapper =
IAggregateTableBootstrapper.builder()
.connector(connector)
.schema(schema)
.providerCoordinate(provider)
.managerDescription(managerDescription)
.targetTableName(aggregateTableName)
.targetCatalogName(TEST_DATABASE_NAME)
.targetSchemaName(AGGREGATE_TABLE_TEMP_SCHEMA_NAME)
.build();
This bootstrapper can provide the SQL to create and feed the provider:
- BigQuery
- ClickHouse
- Databricks
- MsSql
- Redshift
- Snowflake
System.out.println(bootstrapper.getSqlForCreation());
CREATE TABLE `cubulus-tests`.`test_aggregate_table_temp`.`Aggregate_Table_MyProvider` (
`PRODUCT` String NOT NULL,
`Quantity__PERIOD__SUM` INT64,
`contributors__PERIOD__COUNT` INT64
)
System.out.println(bootstrapper.getSqlForFeeding());
INSERT INTO `cubulus-tests`.`test_aggregate_table_temp`.`Aggregate_Table_MyProvider` SELECT
`T0`.`PRODUCT` AS `PRODUCT_0`,
SUM(COALESCE(`T0`.`QUANTITY`, 0)) AS `Quantity__PERIOD__SUM_1`,
COUNT(*) AS `contributors__PERIOD__COUNT_2`
FROM
`cubulus-tests`.`tutorial`.`SALES` AS `T0`
GROUP BY
`T0`.`PRODUCT`
System.out.println(bootstrapper.getSqlForCreation());
CREATE TABLE `TUTORIAL`.`Aggregate_Table_MyProvider` (
`PRODUCT` String,
`Quantity.SUM` Nullable(Float64),
`contributors.COUNT` Nullable(Int64)
) ENGINE = MergeTree()
ORDER BY
`PRODUCT`
System.out.println(bootstrapper.getSqlForFeeding());
INSERT INTO `TUTORIAL`.`Aggregate_Table_MyProvider` SELECT
`T0`.`PRODUCT` AS `PRODUCT_0`,
SUM(COALESCE(`T0`.`QUANTITY`, 0.0)) AS `Quantity.SUM_1`,
COUNT(*) AS `contributors.COUNT_2`
FROM
`TUTORIAL`.`SALES` AS `T0`
GROUP BY
`T0`.`PRODUCT`
System.out.println(bootstrapper.getSqlForCreation());
CREATE TABLE `hive_metastore`.`aggregate_table_temp_schema`.`Aggregate_Table_MyProvider` (
`PRODUCT` String NOT NULL,
`Quantity.SUM` DOUBLE,
`contributors.COUNT` BIGINT
)
System.out.println(bootstrapper.getSqlForFeeding());
INSERT INTO `hive_metastore`.`aggregate_table_temp_schema`.`Aggregate_Table_MyProvider` SELECT
`T0`.`PRODUCT` AS `PRODUCT_0`,
SUM(`T0`.`QUANTITY`) AS `Quantity.SUM_1`,
COUNT(*) AS `contributors.COUNT_2`
FROM
`hive_metastore`.`tutorial`.`SALES` AS `T0`
GROUP BY
`T0`.`PRODUCT`
System.out.println(bootstrapper.getSqlForCreation());
CREATE TABLE "dev"."TUTORIAL"."Aggregate_Table_MyProvider" (
"PRODUCT" nvarchar(4000) NOT NULL,
"Quantity.SUM" BigInt,
"contributors.COUNT" BigInt
)
System.out.println(bootstrapper.getSqlForFeeding());
INSERT INTO "dev"."TUTORIAL"."Aggregate_Table_MyProvider" SELECT
"T0"."PRODUCT" AS "PRODUCT_0",
SUM("T0"."QUANTITY") AS "Quantity.SUM_1",
COUNT(*) AS "contributors.COUNT_2"
FROM
"dev"."TUTORIAL"."SALES" AS "T0"
GROUP BY
"T0"."PRODUCT"
System.out.println(bootstrapper.getSqlForCreation());
SET
enable_case_sensitive_identifier TO true;
CREATE TABLE "dev"."test_aggregate_table_temp"."Aggregate_Table_MyProvider" (
"PRODUCT" VARCHAR NOT NULL,
"Quantity.SUM" FLOAT8,
"contributors.COUNT" BigInt
)
System.out.println(bootstrapper.getSqlForFeeding());
SET enable_case_sensitive_identifier TO true;
INSERT INTO "dev"."test_aggregate_table_temp"."Aggregate_Table_MyProvider" SELECT
"T0"."PRODUCT" AS "PRODUCT_0",
SUM(COALESCE("T0"."QUANTITY", 0.0)) AS "Quantity.SUM_1",
COUNT(*) AS "contributors.COUNT_2"
FROM
"dev"."tutorial"."SALES" AS "T0"
GROUP BY
"T0"."PRODUCT"
System.out.println(bootstrapper.getSqlForCreation());
CREATE TRANSIENT TABLE "TEST_RESOURCES"."TEST_AGGREGATE_TABLE_TEMP"."Aggregate_Table_MyProvider" (
"PRODUCT" VARCHAR NOT NULL,
"Quantity.SUM" DOUBLE,
"contributors.COUNT" NUMBER
)
System.out.println(bootstrapper.getSqlForFeeding());
INSERT INTO "TEST_RESOURCES"."TEST_AGGREGATE_TABLE_TEMP"."Aggregate_Table_MyProvider" SELECT
"T0"."PRODUCT" AS "PRODUCT_0",
SUM(COALESCE("T0"."QUANTITY", 0.0)) AS "Quantity.SUM_1",
COUNT(*) AS "contributors.COUNT_2"
FROM
"TEST_RESOURCES"."TUTORIAL"."SALES" AS "T0"
GROUP BY
"T0"."PRODUCT"
You will need to execute this SQL to create and feed the table. Creating and feeding the SQL table only need to be done once: remember that the purpose of using an Aggregate Table is to avoid re-running the same queries at every start up. A good way to do that is to run the queries manually.
When the table exists, it is possible to use the Aggregate Table in the application, the Aggregate Provider will use it for feeding:
- BigQuery
- ClickHouse
- Databricks
- MsSql
- Redshift
- Snowflake
final AggregateTableDescription aggregateTable = bootstrapper.toAggregateTable();
final Application app =
Application.builder(connector)
.schema(schema)
.managerDescription(managerDescription)
.additionalAggregateTables(List.of(aggregateTable))
.build();
app.start();
final AggregateTableDescription aggregateTable = bootstrapper.toAggregateTable();
final Application app =
Application.builder(connector)
.schema(schema)
.managerDescription(managerDescription)
.additionalAggregateTables(List.of(aggregateTable))
.build();
app.start();
final AggregateTableDescription aggregateTable = bootstrapper.toAggregateTable();
final Application app =
Application.builder(connector)
.schema(schema)
.managerDescription(managerDescription)
.additionalAggregateTables(List.of(aggregateTable))
.build();
app.start();
final AggregateTableDescription aggregateTable = bootstrapper.toAggregateTable();
final Application app =
Application.builder(connector)
.schema(schema)
.managerDescription(managerDescription)
.additionalAggregateTables(List.of(aggregateTable))
.build();
app.start();
final AggregateTableDescription aggregateTable = bootstrapper.toAggregateTable();
final Application app =
Application.builder(connector)
.schema(schema)
.managerDescription(managerDescription)
.additionalAggregateTables(List.of(aggregateTable))
.build();
app.start();
final AggregateTableDescription aggregateTable = bootstrapper.toAggregateTable();
final Application app =
Application.builder(connector)
.schema(schema)
.managerDescription(managerDescription)
.additionalAggregateTables(List.of(aggregateTable))
.build();
app.start();
Database specific features
Creating a view for an Aggregate Table is not a solution as the view must be recomputed for each query. Creating a table requires to update it when the data changes. Some databases offers native implementations to automatically create and materialize the content of a view (so it is fast to access) while automatically updating it (so it is always up-to-date).
- Snowflake
In Snowflake there is a powerful feature called Dynamic tables. As these Dynamic tables also support time travel it is the recommended way to implement an Aggregate Table in Snowflake.
Is my query using the Aggregate Table ?
The simplest solution to check whether the query is using an aggregate table is to look at the query in the external database. If that is the case, then the aggregate table name should appear in the SQL query as the base table (the first table in the FROM clause). If it using an aggregate table, the aggregate table name should appears as the base table (the first table in FROM clause) in the SQL query.
Additionally, for databases supporting tags such as Snowflake, the tag aggregate_table_name
will be added to the query with the name of the aggregate table used.
While building a project, to have more information about why a query is compatible or not with aggregate tables, it is possible to add additional logs by setting the logger atoti.server.directquery.query_resolution.aggregate_table
(see Logger naming) to FINE
.
The additional logs look like this:
FINE: Trying to match query with 1 aggregate tables. Query is [...]
Aug 21, 2023 4:46:37 PM atoti.server.directquery.query_resolution.aggregate_table isCompatibleAggregateTable
FINE: The query did not match the Aggregate Table AggregateTable-0. Compatible base table: true, compatible group by fields: true, compatible selection fields: true, compatible aggregations: true, compatible condition: false. Query was [...]
Aug 21, 2023 4:46:37 PM atoti.server.directquery.query_resolution.aggregate_table findMatchingAggregateTable
FINE: No matching Aggregate Table for query [...]