User Defined Aggregation Functions

DirectQuery delegates some aggregation work to the remote database. The core ActivePivot aggregations (such as SUM, AVG and MAX) are natively converted to the equivalent aggregations of the remote database.

However, if you write a custom aggregation function (a user-defined aggregation function, or UDAF), you must also define how this aggregation is computed in your remote database.

Sum product example

This example is based on a ClickHouse database and SQL but can easily be ported to other databases.

For instance, consider a simple sum product on scalar values, which in SQL would be SUM(quantity * price).

Here is a simple implementation with Copper:

Copper.userDefinedAgg("QUANTITY", "UNIT_PRICE")
    .aggregationBuffer(Types.TYPE_DOUBLE)
    .pluginKey("MY_SUM_PRODUCT")
    .contribute((reader, agg) -> agg.addDouble(0, reader.readDouble(0) * reader.readDouble(1)))
    .merge((in, out) -> out.addDouble(0, in.readDouble(0)))
    .outputFromBuffer(0)
    .as("My Sum Product")
    .publish(context);
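
To make the contribute and merge steps concrete, here is a minimal plain-Java sketch of the same logic. It does not use the ActivePivot API: the class name SumProductSketch, the representation of rows as {quantity, unitPrice} pairs and the sample values are all assumptions made for illustration. It only shows that contributing rows into per-partition buffers and then merging the buffers yields the same value as SUM(quantity * price) over all rows.

```java
/** Plain-Java illustration of the contribute/merge logic; not ActivePivot API. */
final class SumProductSketch {

    /**
     * Contribute: fold every row of one partition into a single-slot buffer.
     * Each row is a hypothetical {quantity, unitPrice} pair.
     */
    static double[] contribute(double[][] partition) {
        double[] buffer = new double[1];
        for (double[] row : partition) {
            buffer[0] += row[0] * row[1];
        }
        return buffer;
    }

    /** Merge: add the partial buffer "in" into the buffer "out". */
    static void merge(double[] in, double[] out) {
        out[0] += in[0];
    }

    public static void main(String[] args) {
        // Illustrative data only, split into two partitions.
        double[][] partition1 = {{2, 10.0}, {3, 5.0}}; // 20 + 15
        double[][] partition2 = {{1, 4.0}};            // 4

        double[] total = contribute(partition1);
        merge(contribute(partition2), total);

        System.out.println(total[0]); // prints 39.0
    }
}
```

Because the merge step is a plain addition, the result does not depend on how the rows are partitioned, which is what allows the whole computation to be expressed as a single SQL SUM.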

When converting the query to SQL for ClickHouse, DirectQuery looks for an IClickhouseAggregationFunction with the same plugin key. This aggregation function is responsible for creating the ISqlAggregation for the given fields. The ISqlAggregation in turn converts the fields to SQL code.

Let's implement these two classes:

  • The aggregation function validates the input fields and creates the SQL aggregation.

public class ExampleSumProductAggregationFunction extends ASqlAggregationFunction {

    // Must match the plugin key declared on the Copper measure.
    public static final String KEY = "MY_SUM_PRODUCT";

    @Override
    public String key() {
        return KEY;
    }

    @Override
    public ISqlAggregation createAggregation(
            final List<ISqlFieldDescription> fields,
            final IFieldType outputType) {
        // A sum product needs exactly two input fields.
        if (fields.size() != 2) {
            throw new IllegalArgumentException("Sum product must have 2 fields.");
        }
        return new ExampleSumProductAggregation(outputType);
    }
}

  • The SQL aggregation is made of two methods:
    • getSqlAggregationElements receives the fields and generates the SQL code.
    • transformResultingColumn receives the columns from the query result and converts them to an ActivePivot reader.

public class ExampleSumProductAggregation extends ASqlAggregation {

    protected ExampleSumProductAggregation(final IFieldType outputType) {
        super(outputType);
    }

    @Override
    public List<SqlAggregationElement> getSqlAggregationElements(List<SqlQueryTableField> sqlFields) {
        // A single SQL expression: SUM(<first field> * <second field>).
        return List.of(
                new SqlAggregationElement(
                        getOutputType(),
                        sqlFields,
                        fields -> "SUM(" + ClickhouseSqlGenerator.toSql(fields.get(0)) + " * "
                                + ClickhouseSqlGenerator.toSql(fields.get(1)) + ")"));
    }

    @Override
    public IArrayReader transformResultingColumn(List<IArrayReader> columns) {
        // Exactly one result column is expected, matching the single element above.
        AggregationUtil.ensureResultingColumnCount(columns, 1);
        return AggregationUtil.createUdafReader(columns, List.of(getOutputType()));
    }
}
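
To see what kind of SQL string the lambda in getSqlAggregationElements builds, here is a standalone sketch that mimics it with plain strings. The class SumProductSqlSketch and its toSql helper are hypothetical stand-ins: the helper simply wraps a column name in backticks, whereas the real ClickhouseSqlGenerator.toSql works on SqlQueryTableField objects and may quote or qualify names differently.

```java
import java.util.List;
import java.util.function.Function;

/** Standalone illustration of the SQL expression built for the sum product. */
final class SumProductSqlSketch {

    /**
     * Hypothetical stand-in for ClickhouseSqlGenerator.toSql.
     * Naively wraps the column name in backticks, without any escaping.
     */
    static String toSql(String column) {
        return "`" + column + "`";
    }

    /** Same shape as the lambda passed to SqlAggregationElement. */
    static final Function<List<String>, String> SUM_PRODUCT =
            fields -> "SUM(" + toSql(fields.get(0)) + " * " + toSql(fields.get(1)) + ")";

    public static void main(String[] args) {
        // prints SUM(`QUANTITY` * `UNIT_PRICE`)
        System.out.println(SUM_PRODUCT.apply(List.of("QUANTITY", "UNIT_PRICE")));
    }
}
```

The order of the fields matters only for readability here, since multiplication is commutative, but for a non-symmetric UDAF the index passed to fields.get would have to follow the order of the fields declared in Copper.userDefinedAgg.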

Finally, register the additional aggregation function:

Registry.getPlugin(IClickhouseAggregationFunction.class).add(new ExampleSumProductAggregationFunction());

This new aggregation can now be used in a DirectQuery project, and the computation will be pushed to the remote database.

final ICellSet result = ActivePivotQueryRunner.create()
    .withWildcardCoordinate("DATE")
    .forMeasures("My Sum Product")
    .run(pivot);
GetAggregatesResultCellset.check(pivot, result).getHumanRepresentationOfCellSet();

DATE        My Sum Product
2022-02-03  580.0
2022-01-31  798.0
2022-02-04  258.0
2022-02-01  1030.0
2022-02-02  604.0