User-Defined Aggregation Functions
DirectQuery delegates some aggregation work to the remote database.
The core ActivePivot aggregations (such as `SUM`, `AVG`, `MAX`, ...) are natively converted to their equivalents in the remote database.
However, if you write custom aggregation functions (UDAFs), you also need to define how each of them is computed in the remote database.
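To illustrate the distinction, here is a minimal plain-Java sketch of a translation table keyed by aggregation name. It is not the DirectQuery API: the `NATIVE` map and the `translate` method are hypothetical. Core aggregations have a ready-made SQL template, while a custom key such as `MY_SUM_PRODUCT` has no entry until someone supplies one.

```java
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.function.Function;

public class PushDownSketch {
    // Hypothetical translation table: aggregation key -> SQL template over column expressions.
    // Core aggregations translate directly; a custom key has no entry until one is registered.
    static final Map<String, Function<List<String>, String>> NATIVE = Map.of(
            "SUM", cols -> "SUM(" + cols.get(0) + ")",
            "AVG", cols -> "AVG(" + cols.get(0) + ")",
            "MAX", cols -> "MAX(" + cols.get(0) + ")");

    // Returns the SQL for a known key, or an empty Optional for an unknown (custom) key.
    static Optional<String> translate(String key, List<String> columns) {
        return Optional.ofNullable(NATIVE.get(key)).map(f -> f.apply(columns));
    }

    public static void main(String[] args) {
        System.out.println(translate("SUM", List.of("price")));                // Optional[SUM(price)]
        System.out.println(translate("MY_SUM_PRODUCT", List.of("q", "p")));   // Optional.empty
    }
}
```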
Sum product example
This example is based on a ClickHouse database and SQL, but it can easily be ported to other databases.
For instance, consider a simple sum product on scalar values, which in SQL would be `SUM(quantity * price)`.
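The sketch below, in plain Java with illustrative values, spells out what `SUM(quantity * price)` computes: the product is taken per row before summing. It also shows why this measure cannot be rebuilt from separate `SUM` measures, since the sum of products differs from the product of sums.

```java
public class SumProduct {
    // Equivalent of SQL SUM(quantity * price): multiply per row, then add up.
    static double sumProduct(double[] quantity, double[] price) {
        if (quantity.length != price.length) {
            throw new IllegalArgumentException("Columns must have the same length.");
        }
        double total = 0.0;
        for (int i = 0; i < quantity.length; i++) {
            total += quantity[i] * price[i];
        }
        return total;
    }

    public static void main(String[] args) {
        double[] quantity = {2, 3};
        double[] price = {5, 1};
        // 2*5 + 3*1 = 13, whereas SUM(quantity) * SUM(price) would give 5 * 6 = 30.
        System.out.println(sumProduct(quantity, price)); // 13.0
    }
}
```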
Here is a simple implementation with Copper:
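```java
Copper.userDefinedAgg("QUANTITY", "UNIT_PRICE")
        // One double slot in the aggregation buffer holds the running sum
        .aggregationBuffer(Types.TYPE_DOUBLE)
        // Key used by DirectQuery to find the matching SQL aggregation
        .pluginKey("MY_SUM_PRODUCT")
        // Add QUANTITY * UNIT_PRICE of each record into slot 0
        .contribute((reader, agg) -> agg.addDouble(0, reader.readDouble(0) * reader.readDouble(1)))
        // Combine partial buffers by adding their slot 0 values
        .merge((in, out) -> out.addDouble(0, in.readDouble(0)))
        .outputFromBuffer(0)
        .as("My Sum Product")
        .publish(context);
```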
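The following plain-Java sketch mimics how the `contribute` and `merge` steps cooperate, assuming rows are processed in independent partitions whose partial buffers are then combined. The `Row` record, the `aggregate` method and the partitioning are hypothetical; the one-slot `double[]` stands in for the `Types.TYPE_DOUBLE` buffer. Because a sum product is a plain sum of per-row products, merging is simple addition.

```java
import java.util.List;

public class ContributeMergeSketch {
    // One input row: (QUANTITY, UNIT_PRICE). Hypothetical type for illustration.
    record Row(double quantity, double unitPrice) {}

    // Mirrors .contribute(...): add quantity * price of one row into buffer slot 0.
    static void contribute(Row row, double[] buffer) {
        buffer[0] += row.quantity() * row.unitPrice();
    }

    // Mirrors .merge(...): add a partial buffer's slot 0 into the target buffer.
    static void merge(double[] in, double[] out) {
        out[0] += in[0];
    }

    static double aggregate(List<List<Row>> partitions) {
        double[] total = new double[1];
        for (List<Row> partition : partitions) {
            double[] partial = new double[1]; // fresh buffer per partition
            for (Row row : partition) {
                contribute(row, partial);
            }
            merge(partial, total); // combine the partial result into the total
        }
        return total[0]; // mirrors .outputFromBuffer(0)
    }

    public static void main(String[] args) {
        List<List<Row>> partitions = List.of(
                List.of(new Row(2, 5), new Row(1, 4)),
                List.of(new Row(3, 1)));
        System.out.println(aggregate(partitions)); // 17.0
    }
}
```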
When converting the query to SQL for ClickHouse, DirectQuery looks for an `IClickhouseAggregationFunction` with the same plugin key.
This aggregation function is responsible for creating the `ISqlAggregation` for the given fields.
The `ISqlAggregation` converts the fields to SQL code.
Let's implement these two classes:
- The aggregation function creates the `ISqlAggregation`:
```java
public class ExampleSumProductAggregationFunction extends ASqlAggregationFunction {

    // Must match the plugin key used in the Copper definition
    public static final String KEY = "MY_SUM_PRODUCT";

    @Override
    public String key() {
        return KEY;
    }

    @Override
    public ISqlAggregation createAggregation(
            final List<ISqlFieldDescription> fields,
            final IFieldType outputType) {
        if (fields.size() != 2) {
            throw new IllegalArgumentException("Sum product must have 2 fields.");
        }
        return new ExampleSumProductAggregation(outputType);
    }
}
```
- The SQL aggregation is made of two methods:
  - `getSqlAggregationElements` receives the fields and generates the SQL code.
  - `transformResultingColumn` receives the columns from the query result and converts them to an ActivePivot reader.
```java
public class ExampleSumProductAggregation extends ASqlAggregation {

    protected ExampleSumProductAggregation(final IFieldType outputType) {
        super(outputType);
    }

    @Override
    public List<SqlAggregationElement> getSqlAggregationElements(final List<SqlQueryTableField> sqlFields) {
        // A single SQL element: SUM(<field 0> * <field 1>)
        return List.of(
                new SqlAggregationElement(
                        getOutputType(),
                        sqlFields,
                        fields -> "SUM(" + ClickhouseSqlGenerator.toSql(fields.get(0)) + " * "
                                + ClickhouseSqlGenerator.toSql(fields.get(1)) + ")"));
    }

    @Override
    public IArrayReader transformResultingColumn(final List<IArrayReader> columns) {
        // The SQL above produces exactly one result column
        AggregationUtil.ensureResultingColumnCount(columns, 1);
        return AggregationUtil.createUdafReader(columns, List.of(getOutputType()));
    }
}
```
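For intuition about the string built in `getSqlAggregationElements`, here is a standalone sketch of the same concatenation. `quote` is a hypothetical stand-in for `ClickhouseSqlGenerator.toSql`, assumed here to backtick-quote a bare column name; the real method may produce a different expression (for example, one that includes a table alias).

```java
import java.util.List;

public class SumProductSqlSketch {
    // Hypothetical stand-in for ClickhouseSqlGenerator.toSql: wraps a column name in
    // backticks. Assumes the name contains no backtick, so no escaping is done.
    static String quote(String column) {
        return "`" + column + "`";
    }

    // Same concatenation as the lambda passed to SqlAggregationElement.
    static String sumProductSql(List<String> fields) {
        return "SUM(" + quote(fields.get(0)) + " * " + quote(fields.get(1)) + ")";
    }

    public static void main(String[] args) {
        // Prints: SUM(`QUANTITY` * `UNIT_PRICE`)
        System.out.println(sumProductSql(List.of("QUANTITY", "UNIT_PRICE")));
    }
}
```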
Finally, register the additional aggregation function:
```java
Registry.getPlugin(IClickhouseAggregationFunction.class).add(new ExampleSumProductAggregationFunction());
```
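The lookup by plugin key can be pictured with the simplified model below. The `AggregationFunction` and `SqlAggregation` interfaces and the `REGISTRY` map are hypothetical reductions of the roles described above, not DirectQuery types. The point it illustrates: the key returned by `key()` must match the `.pluginKey(...)` used in the Copper definition, otherwise no SQL aggregation is found.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class PluginKeyRegistrySketch {
    // Hypothetical role: renders SQL for the given column expressions.
    interface SqlAggregation {
        String toSql(List<String> columns);
    }

    // Hypothetical role: identified by a key, creates an aggregation for some fields.
    interface AggregationFunction {
        String key();

        SqlAggregation createAggregation(List<String> fields);
    }

    static final Map<String, AggregationFunction> REGISTRY = new HashMap<>();

    // Registration indexes the function by its key.
    static void add(AggregationFunction function) {
        REGISTRY.put(function.key(), function);
    }

    // Query translation looks the function up by the measure's plugin key.
    static String render(String pluginKey, List<String> fields) {
        AggregationFunction function = REGISTRY.get(pluginKey);
        if (function == null) {
            throw new IllegalStateException("No SQL aggregation registered for key " + pluginKey);
        }
        return function.createAggregation(fields).toSql(fields);
    }

    static final class SumProductFunction implements AggregationFunction {
        @Override
        public String key() {
            return "MY_SUM_PRODUCT";
        }

        @Override
        public SqlAggregation createAggregation(List<String> fields) {
            if (fields.size() != 2) {
                throw new IllegalArgumentException("Sum product must have 2 fields.");
            }
            return columns -> "SUM(" + columns.get(0) + " * " + columns.get(1) + ")";
        }
    }

    public static void main(String[] args) {
        add(new SumProductFunction());
        // Prints: SUM(QUANTITY * UNIT_PRICE)
        System.out.println(render("MY_SUM_PRODUCT", List.of("QUANTITY", "UNIT_PRICE")));
    }
}
```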
The new aggregation can now be used in a DirectQuery project, and its computation is pushed down to the remote database.
```java
final ICellSet result = ActivePivotQueryRunner.create()
        .withWildcardCoordinate("DATE")
        .forMeasures("My Sum Product")
        .run(pivot);
GetAggregatesResultCellset.check(pivot, result).getHumanRepresentationOfCellSet();
```
This produces the following cell set:
DATE | My Sum Product |
---|---|
2022-02-03 | 580.0 |
2022-01-31 | 798.0 |
2022-02-04 | 258.0 |
2022-02-01 | 1030.0 |
2022-02-02 | 604.0 |
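Conceptually, each row of this result is a sum product restricted to one `DATE`, that is, a `GROUP BY` on the date column. The sketch below computes that grouping in memory with Java streams. The `Sale` record and its rows are hypothetical and unrelated to the data behind the table above, and the SQL in the comment only illustrates the shape of the computation, not the exact query DirectQuery generates.

```java
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.stream.Collectors;

public class PerDateSumProduct {
    // Hypothetical row type for illustration.
    record Sale(String date, double quantity, double unitPrice) {}

    // In-memory equivalent of:
    //   SELECT DATE, SUM(QUANTITY * UNIT_PRICE) FROM <table> GROUP BY DATE
    // A TreeMap keeps the dates sorted.
    static Map<String, Double> byDate(List<Sale> sales) {
        return sales.stream().collect(Collectors.groupingBy(
                Sale::date,
                TreeMap::new,
                Collectors.summingDouble(s -> s.quantity() * s.unitPrice())));
    }

    public static void main(String[] args) {
        // Hypothetical rows, not the data behind the table above.
        List<Sale> sales = List.of(
                new Sale("2022-01-31", 2, 5),
                new Sale("2022-01-31", 1, 4),
                new Sale("2022-02-01", 3, 1));
        System.out.println(byDate(sales)); // {2022-01-31=14.0, 2022-02-01=3.0}
    }
}
```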