# Aggregation Functions

## Concept

In Database Management Systems (*DBMS*), an Aggregation Function, or Aggregate Function, is a function that operates on values across rows
to perform mathematical calculations such as sum, average, count, minimum/maximum values, standard deviation,
as well as some non-mathematical operations, like finding the range of dates covered by the values.
An aggregate function takes multiple rows as input and produces a single output.

While the term can also be found in OnLine Transaction Processing (OLTP) workflows, with standard SQL aggregation functions like `COUNT`, `MIN`, etc.,
OnLine Analytical Processing (OLAP) systems such as ActivePivot are built specifically for aggregation.

Similarly to how an aggregation function is, in any DBMS, associated with some columns at query time, ActivePivot aggregation functions are associated with table columns to create aggregated measures.

The SQL query `SELECT SUM(dollar_value) FROM trades` will naturally return the sum of the values of all trades stored in the *trades* table.
Similarly, the ActivePivot `SumFunction` can be associated with the column `dollar_value` to create the measure `dollar_value.SUM`.
The same SUM value for all trades can then be found in the cube, as a top level aggregate.

The concept of aggregation is orthogonal to data storage, and should rather be considered as an analytics notion.

### Vocabulary

An aggregate function takes multiple rows as input and produces a single output.

The input values are called `source data`, or `source values`, and the output value is called `aggregated value`.

Aggregating the source value `trade_value` into the aggregated value `trade_aggregate` is the act of adding a `trade_value` contribution to `trade_aggregate`, according to the selected aggregation function.
For instance, if `trade_aggregate = 2` and `trade_value = 3`, aggregating `trade_value` into `trade_aggregate`:

- using the `SUM` function will produce `trade_aggregate = 5`.
- using the `MAX` function will produce `trade_aggregate = 3`.
- using the `SQUARE_SUM` function will produce `trade_aggregate = 2 + 3*3 = 11`.
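The three results above can be reproduced with a minimal, plain-Java sketch. The `VocabularyExample` class and its operator constants are illustrative names only and are not part of the ActivePivot API; each operator takes the current aggregated value and one contribution.

```java
import java.util.function.LongBinaryOperator;

/** Toy aggregation functions: each combines the current aggregate with one contribution. */
final class VocabularyExample {

    // (aggregate, value) -> new aggregate
    static final LongBinaryOperator SUM = (aggregate, value) -> aggregate + value;
    static final LongBinaryOperator MAX = (aggregate, value) -> Math.max(aggregate, value);
    static final LongBinaryOperator SQUARE_SUM = (aggregate, value) -> aggregate + value * value;

    public static void main(String[] args) {
        final long tradeAggregate = 2;
        final long tradeValue = 3;
        System.out.println(SUM.applyAsLong(tradeAggregate, tradeValue));        // 5
        System.out.println(MAX.applyAsLong(tradeAggregate, tradeValue));        // 3
        System.out.println(SQUARE_SUM.applyAsLong(tradeAggregate, tradeValue)); // 11
    }
}
```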

## ActivePivot Specifics

As mentioned in the Concept Section, ActivePivot is tailored for OLAP workloads.
To provide adequate performance, ActivePivot's aggregation engine can store pre-aggregated data
for some selected cube locations, in the `AggregateProvider` component.

For instance, if a bank has a million open positions on the market, visualizing the total amount involved in these trades requires retrieving a million values and summing them.

However, if the bank is divided into 15 entities, and ActivePivot is storing pre-aggregated data at the entity level, computing that total amount only requires summing the 15 aggregated values.

Let's use a very small example to represent the aforementioned storage component:

| Location | `dollar_value.SUM` |
|---|---|
| EntityA | 1000 |
| EntityB | 50 |
| EntityC | `null` |

`EntityC` has no open position as of end of day yesterday.
Today, all 3 entities closed some positions, and opened some new ones.

Added:

| Location | `dollar_value.SUM` |
|---|---|
| EntityB | 2 |
| EntityA | 264 |
| EntityC | 46000 |
| EntityD | 150 |

The bank created a new entity and traded in its name.

Removed:

| Location | `dollar_value.SUM` |
|---|---|
| EntityA | 110 |
| EntityB | 27 |

In these tables, the entities are not in the same order as in the storage. The transactional flow that updates the aggregate provider will take care of aligning the locations for data coherence, resulting in:

| Location | `dollar_value.SUM` |
|---|---|
| EntityA | 1000 - 110 + 264 = 1154 |
| EntityB | 50 + 2 - 27 = 25 |
| EntityC | 46000 |
| EntityD | 150 |
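The alignment of locations can be pictured with a small stand-alone sketch. It is not how the `AggregateProvider` is implemented: the `ProviderUpdateExample` class and its `update` method are hypothetical, and a `Map` keyed by location name stands in for the real storage. The point is only that contributions are matched by location rather than by row order, that a missing or `null` aggregate is initialized by its first contribution, and that removals are subtracted.

```java
import java.util.LinkedHashMap;
import java.util.Map;

final class ProviderUpdateExample {

    /** Applies added, then removed, contributions to the stored SUM aggregates, location by location. */
    static Map<String, Long> update(
            final Map<String, Long> stored,
            final Map<String, Long> added,
            final Map<String, Long> removed) {
        final Map<String, Long> result = new LinkedHashMap<>(stored);
        // Aggregate: an absent or null aggregate is initialized with the first contribution.
        added.forEach((location, value) -> result.merge(location, value, Long::sum));
        // Disaggregate: subtract the removed contributions from the existing aggregates.
        removed.forEach((location, value) ->
                result.computeIfPresent(location, (key, current) -> current - value));
        return result;
    }

    /** Replays the tables of this section. */
    static Map<String, Long> example() {
        final Map<String, Long> stored = new LinkedHashMap<>();
        stored.put("EntityA", 1000L);
        stored.put("EntityB", 50L);
        stored.put("EntityC", null); // no open position yet

        final Map<String, Long> added = new LinkedHashMap<>();
        added.put("EntityB", 2L);
        added.put("EntityA", 264L);
        added.put("EntityC", 46000L);
        added.put("EntityD", 150L);

        final Map<String, Long> removed = new LinkedHashMap<>();
        removed.put("EntityA", 110L);
        removed.put("EntityB", 27L);

        return update(stored, added, removed);
    }

    public static void main(String[] args) {
        System.out.println(example()); // {EntityA=1154, EntityB=25, EntityC=46000, EntityD=150}
    }
}
```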

The previous example showcases how the `SUM` aggregation function is responsible for more than simply taking a group of values and summing them.
It knows how to initialize the first value for `EntityD`, it knows how to aggregate some trades to that first copied value, and how to disaggregate other trades from the stored value.
It knows how to group pre-aggregated values to output an aggregated value that represents a bigger scope.

To be able to do all of this, ActivePivot aggregation functions are able to take some source columns, and an aggregated column, and bind them together. The use of that binding is outside the scope of the function, which only handles its creation.

In ActivePivot, the aggregation function concept is divided into 3 separate contracts:

- Specified by the `IGenericAggregationFunction` contract, aggregation functions represent the exposed API. Each aggregation function is an `IPluginValue`, stored in the registry under the `Plugin` `GenericAggregationFunctionPlugin`. Custom implementations must be added to the registry, through annotations or manual insertion.
- Aggregation Functions are factories for `IGenericAggregation`s, which can each be seen as an implementation of its associated Aggregation Function for specific data sources. `IGenericAggregation`s are objects that keep:
  - the names of the columns that hold the data being aggregated. These may be table fields, measure names, etc. This is used to uniquely identify the sources an aggregation requires.
  - the types, from the `com.qfs.store.Types` API, of these columns.
  - the type of the aggregated data. It is the type of the result of the reduction function mentioned beforehand.

  Based on this information, an `IGenericAggregation` will specify how the chunks that will hold the aggregated data are created (typically, chunks of the particular aggregated type).
- `IGenericAggregation`s are used to produce `IAggregationBinding`s, which are responsible for the computation of the aggregated values.

## Natively Supported Aggregation Functions

This is the list of natively supported aggregation functions, in alphabetical order.

The following notations are used:

| Symbol | Meaning |
|---|---|
| | fully supported |
| | partially supported |
| | will not be supported |

In cases where `null` is supported, the aggregated type will depend on the nullability of the types of the sources.

### Average

Returns the average of the source values. The average is defined as the sum of the contributions, divided by the number of contributions.

#### Support

`null` values are strictly ignored. They do not influence the sum of the contributions, nor do they influence the number of contributions.
If all the values aggregated into the output value are `null`, the output value is `null` as well.

| Type | Support | Aggregated Type | Details |
|---|---|---|---|
| Numerical scalar | | `double` | |
| Non-Numerical scalar | | | |
| Numerical vectors | | `double vector` | Returns a vector containing the term by term average. Aggregated vectors must all have the same size. |
| Non-Numerical vectors | | | |

This function supports disaggregation. Disaggregating a value removes its contribution from the sum, and reduces the number of contributions.
Disaggregating down to zero contributions will return `null`, to prevent dividing by zero.
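The behaviour described above (ignored `null` contributions, disaggregation, `null` result at zero contributions) can be sketched by keeping a running sum and a running count. This is a plain-Java illustration with a hypothetical `AverageAggregate` class, not the ActivePivot implementation.

```java
/** Running average that supports removing contributions. */
final class AverageAggregate {

    private double sum;
    private long count;

    /** Adds one contribution; null contributions are ignored. */
    void aggregate(final Double value) {
        if (value == null) {
            return;
        }
        sum += value;
        count++;
    }

    /** Removes one previously aggregated contribution; null contributions are ignored. */
    void disaggregate(final Double value) {
        if (value == null) {
            return;
        }
        sum -= value;
        count--;
    }

    /** Returns null when there is no contribution left, which avoids dividing by zero. */
    Double get() {
        return count == 0 ? null : sum / count;
    }
}
```

For instance, aggregating `2.0`, `null` and `4.0` yields `3.0`; disaggregating both non-null values afterwards brings the result back to `null`.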

### Copy

Copies the given source value and replaces the aggregated value.

#### Support

`null` values are supported. Copying `null` will effectively write `null` in place of the previous aggregated value.
The type of the aggregated value is exactly the type of the source values.

| Type | Support |
|---|---|
| Numerical scalar | |
| Non-Numerical scalar | |
| Numerical vectors | |
| Non-Numerical vectors | |

This function does not support aggregating or disaggregating. It only supports replacing the current value with the source value.

### Count

Counts the number of contributions to the aggregated value, no matter the contributed values.

#### Support

`null` values are strictly ignored.
If all the values aggregated into the output value are `null`, the output value is `null` as well.
This function returns `long` values.

| Type | Support |
|---|---|
| Numerical scalar | |
| Non-Numerical scalar | |
| Numerical vectors | |
| Non-Numerical vectors | |

This function supports disaggregation. Disaggregating a value reduces the number of contributions.

### Gross Sum

Returns the sum of the absolute values of the contributions.

#### Support

`null` values are strictly ignored.
If all the values aggregated into the output value are `null`, the output value is `null` as well.

| Type | Support | Aggregated Type | Details |
|---|---|---|---|
| Numerical scalar | | Same as source type | |
| Non-Numerical scalar | | | |
| Numerical vectors | | Same as source type | Returns a vector containing the term by term gross sum. Supports aggregating vectors of different sizes. The aggregated value's size is the largest of the source vectors, the smaller ones being padded with zeroes. |
| Non-Numerical vectors | | | |

This function supports disaggregation.

### Long

Returns the sum of the positive contributions.

#### Support

`null` values are strictly ignored.
If all the values aggregated into the output value are `null`, the output value is `null` as well.

| Type | Support | Aggregated Type | Details |
|---|---|---|---|
| Numerical scalar | | Same as source type | |
| Non-Numerical scalar | | | |
| Numerical vectors | | Same as source type | Returns a vector containing the term by term long sum. Supports aggregating vectors of different sizes. The aggregated value's size is the largest of the source vectors, the smaller ones being padded with zeroes. |
| Non-Numerical vectors | | | |

This function supports disaggregation.

### Max

Returns the maximum of the contributions.

#### Support

`null` values are strictly ignored.
If all the values aggregated into the output value are `null`, the output value is `null` as well.

Objects implementing `Comparable` are **NOT** supported.

| Type | Support | Aggregated Type | Details |
|---|---|---|---|
| Numerical scalar | | Same as source type | |
| Non-Numerical scalar | | | |
| Numerical vectors | | | |
| Non-Numerical vectors | | | |

This function does not support disaggregation. A specialized version of this function, `MaxHistoryFunction`, supports the disaggregation of values.
To do so, it must keep the entire history of contributed values, to be able to retrieve the next maximum. Thus, its usage is memory intensive.
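To make the memory cost concrete, here is a minimal sketch of a maximum that can be disaggregated. It is not the code of `MaxHistoryFunction`; the `MaxWithHistory` class is hypothetical and simply keeps every contribution in a sorted multiset so that the next maximum can be found once the current one is removed.

```java
import java.util.TreeMap;

/** Maximum that supports removal by remembering every contribution. */
final class MaxWithHistory {

    // Sorted multiset of all contributions: value -> number of occurrences.
    private final TreeMap<Double, Integer> history = new TreeMap<>();

    void aggregate(final double value) {
        history.merge(value, 1, Integer::sum);
    }

    void disaggregate(final double value) {
        // Returning null from the remapping function removes the entry.
        history.computeIfPresent(value, (key, occurrences) -> occurrences == 1 ? null : occurrences - 1);
    }

    /** Returns the current maximum, or null when no contribution is left. */
    Double max() {
        return history.isEmpty() ? null : history.lastKey();
    }
}
```

A plain maximum only stores one value per aggregate, whereas this structure grows with the number of distinct contributions, which is the trade-off mentioned above.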

### Median

Returns the median of the contributions. In other words, if there is an odd number of contributions, it returns their middle value. If there is an even number of contributions, the average of the two values that are closest to the middle is returned.

#### Support

`null` values are strictly ignored.
If all the values aggregated into the output value are `null`, or no values are aggregated, the output value is `null` as well.

| Type | Support | Aggregated Type | Details |
|---|---|---|---|
| Numerical scalar | | `double` | |
| Non-Numerical scalar | | | |
| Numerical vectors | | | |
| Non-Numerical vectors | | | |

This function supports disaggregation, albeit with poor performance. As it must keep the entire history of contributed values, this function is quite memory-intensive.

### Min

Returns the minimum of the contributions.

#### Support

`null` values are strictly ignored.
If all the values aggregated into the output value are `null`, the output value is `null` as well.

Objects implementing `Comparable` are **NOT** supported.

| Type | Support | Aggregated Type | Details |
|---|---|---|---|
| Numerical scalar | | Same as source type | |
| Non-Numerical scalar | | | |
| Numerical vectors | | | |
| Non-Numerical vectors | | | |

This function does not support disaggregation. A specialized version of this function, `MinHistoryFunction`, supports the disaggregation of values.
To do so, it must keep the entire history of contributed values, to be able to retrieve the next minimum. Thus, its usage is memory intensive.

### Multiply

Returns the product of the contributions.

#### Support

`null` values are strictly ignored.
If all the values aggregated into the output value are `null`, the output value is `null` as well.

| Type | Support | Aggregated Type | Details |
|---|---|---|---|
| Numerical scalar | | Same as source type* | *`int` values are aggregated as `long` to prevent overflows. |
| Non-Numerical scalar | | | |
| Numerical vectors | | | |
| Non-Numerical vectors | | | |

This function supports disaggregation.

### Short

Returns the sum of the negative contributions.

#### Support

`null` values are strictly ignored.
If all the values aggregated into the output value are `null`, the output value is `null` as well.

| Type | Support | Aggregated Type | Details |
|---|---|---|---|
| Numerical scalar | | Same as source type | |
| Non-Numerical scalar | | | |
| Numerical vectors | | Same as source type | Returns a vector containing the term by term short sum. Supports aggregating vectors of different sizes. The aggregated value's size is the largest of the source vectors, the smaller ones being padded with zeroes. |
| Non-Numerical vectors | | | |

This function supports disaggregation.
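The Gross Sum, Long and Short functions are easiest to compare side by side on the same contributions. The sketch below is plain Java with a hypothetical `SignedSums` helper; it only illustrates the three definitions given in this page.

```java
import java.util.Arrays;

final class SignedSums {

    /** Long: sum of the positive contributions. */
    static double longSum(final double[] values) {
        return Arrays.stream(values).filter(v -> v > 0).sum();
    }

    /** Short: sum of the negative contributions. */
    static double shortSum(final double[] values) {
        return Arrays.stream(values).filter(v -> v < 0).sum();
    }

    /** Gross Sum: sum of the absolute values of the contributions. */
    static double grossSum(final double[] values) {
        return Arrays.stream(values).map(Math::abs).sum();
    }

    public static void main(String[] args) {
        final double[] contributions = {10, -4, 6, -1};
        System.out.println(longSum(contributions));  // 16.0
        System.out.println(shortSum(contributions)); // -5.0
        System.out.println(grossSum(contributions)); // 21.0
    }
}
```

On these values, the gross sum equals the long sum minus the short sum.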

### Single Value

Copies the given source value. Once an aggregated value is set, it is only possible to contribute a value that is equal to the aggregated value. This contribution has no effect. This aggregation function is mostly used to acknowledge the presence of values, without needing to know which ones.

#### Support

`null` values are supported. Copying `null` will effectively write `null` in place of the previous aggregated value.
Contributing a non-`null` value when the aggregated value is `null` will simply copy the source value into the aggregated column.
Contributing `null` has no effect, even if the aggregated value isn't `null`.

The type of the aggregated value is exactly the type of the source values.

| Type | Support |
|---|---|
| Numerical scalar | |
| Non-Numerical scalar | |
| Numerical vectors | |
| Non-Numerical vectors | |

This function does not support disaggregation.

### Square Sum

Returns the square sum of the contributions.

#### Support

`null` values are strictly ignored.
If all the values aggregated into the output value are `null`, the output value is `null` as well.

| Type | Support | Aggregated Type | Details |
|---|---|---|---|
| Numerical scalar | | Same as source type* | *`int` values are aggregated as `long` to prevent overflows. |
| Non-Numerical scalar | | | |
| Numerical vectors | | | |
| Non-Numerical vectors | | | |

This function supports disaggregation.

### Standard deviation

Calculates the standard deviation of the contributions using either the population or sample formulas depending on the plugin key.

#### Support

`null` values are strictly ignored. If all the values aggregated into the output value are `null`, the output value is `null` as well.

| Type | Support | Aggregated Type | Details |
|---|---|---|---|
| Numerical scalar | | `double` | All values are aggregated as `double`s |
| Non-Numerical scalar | | | |
| Numerical vectors | | | |
| Non-Numerical vectors | | | |

This function supports disaggregation.

### Sum

Returns the sum of the contributions.

#### Support

`null` values are strictly ignored.
If all the values aggregated into the output value are `null`, the output value is `null` as well.

| Type | Support | Aggregated Type | Details |
|---|---|---|---|
| Numerical scalar | | Same as source type | |
| Non-Numerical scalar | | | |
| Numerical vectors | | Same as source type | Returns a vector containing the term by term sum. Supports aggregating vectors of different sizes. The aggregated value's size is the largest of the source vectors, the smaller ones being padded with zeroes. |
| Non-Numerical vectors | | | |

This function supports disaggregation.
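The zero-padding rule for vectors of different sizes can be illustrated with plain `double[]` arrays. This sketch uses a hypothetical `VectorSum` helper and does not involve ActivePivot's vector types or their memory management.

```java
import java.util.Arrays;

final class VectorSum {

    /** Term by term sum; the shorter vector is treated as if padded with zeroes. */
    static double[] sum(final double[] aggregate, final double[] contribution) {
        // Arrays.copyOf pads the copy with zeroes when the requested length is larger.
        final double[] result = Arrays.copyOf(aggregate, Math.max(aggregate.length, contribution.length));
        for (int i = 0; i < contribution.length; i++) {
            result[i] += contribution[i];
        }
        return result;
    }

    public static void main(String[] args) {
        final double[] total = sum(new double[] {1, 2, 3}, new double[] {10, 20});
        System.out.println(Arrays.toString(total)); // [11.0, 22.0, 3.0]
    }
}
```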

### Timestamp

Records the timestamp of the last update. When contributing a source value to the aggregated value, this function will check which of the two temporal values is the latest, and keep it as the aggregated value.

#### Support

This function supports `int` and `long` representations of a timestamp. `null` is not supported in that case.
It also supports `ITimestamp` objects. In that case, `null` values are supported.

Disaggregating is also an update. For this function, disaggregating and aggregating values are the same thing.

### Variance

Calculates the variance of the contributions using either the population or sample formulas depending on the plugin key.

#### Support

`null` values are strictly ignored. If all the values aggregated into the output value are `null`, the output value is `null` as well.

| Type | Support | Aggregated Type | Details |
|---|---|---|---|
| Numerical scalar | | `double` | All values are aggregated as `double`s |
| Non-Numerical scalar | | | |
| Numerical vectors | | | |
| Non-Numerical vectors | | | |

This function supports disaggregation.
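This page does not describe how the variance is computed internally. One classic way to obtain a variance that supports disaggregation, shown here purely as an assumption, is to maintain three additive quantities (count, sum and square sum) and to derive the population or sample variance from them on demand. The `RunningVariance` class below is hypothetical, and this textbook formula can lose precision when the mean is large compared to the spread.

```java
/** Running variance built from three additive aggregates, so contributions can be removed. */
final class RunningVariance {

    private long count;
    private double sum;
    private double squareSum;

    void aggregate(final Double value) {
        if (value == null) {
            return; // null contributions are ignored
        }
        count++;
        sum += value;
        squareSum += value * value;
    }

    void disaggregate(final Double value) {
        if (value == null) {
            return;
        }
        count--;
        sum -= value;
        squareSum -= value * value;
    }

    /** Population variance, E[x^2] - E[x]^2, or null without contributions. */
    Double populationVariance() {
        if (count == 0) {
            return null;
        }
        final double mean = sum / count;
        return squareSum / count - mean * mean;
    }

    /** Sample variance (n - 1 denominator), or null with fewer than two contributions. */
    Double sampleVariance() {
        if (count < 2) {
            return null;
        }
        final double mean = sum / count;
        return (squareSum - count * mean * mean) / (count - 1);
    }
}
```

The standard deviation would simply be the square root of either result.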

## Modifying native Aggregation Functions

The Aggregation Functions in ActivePivot are `PluginValue`s. As such, they are registered in the global `Registry`, where they can
be replaced by a different implementation.

For instance, the `SumFunction` outputs `int` primitive values when summing `int` source values. A user may know that the volume of
data aggregated with `SUM` might cause integer overflows. The user can then override the `SumFunction` with a `CustomSumFunction`,
and modify its `getAggregatedType(sourceType)` method to return `Types.TYPE_LONG` instead of `Types.TYPE_INT` when the source type is
`Types.TYPE_INT`.

Then, this user needs to ensure `CustomSumFunction` will take the place of `SumFunction` in the Registry.
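The motivation for widening the aggregated type can be seen with plain Java arithmetic, independently of ActivePivot. The `OverflowExample` class below is only an illustration: summing `int` values into an `int` silently wraps around, while summing the same values into a `long` does not.

```java
final class OverflowExample {

    /** Sums int values into an int accumulator: wraps around on overflow. */
    static int sumAsInt(final int[] values) {
        int total = 0;
        for (final int value : values) {
            total += value;
        }
        return total;
    }

    /** Sums the same int values into a long accumulator. */
    static long sumAsLong(final int[] values) {
        long total = 0L;
        for (final int value : values) {
            total += value;
        }
        return total;
    }

    public static void main(String[] args) {
        final int[] values = {Integer.MAX_VALUE, 1};
        System.out.println(sumAsInt(values));  // -2147483648
        System.out.println(sumAsLong(values)); // 2147483648
    }
}
```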

## Writing a custom Aggregation Function

Writing a custom aggregation function will require implementing the 3 aforementioned contracts:

- `IGenericAggregationFunction`, or one of its specialized interfaces or abstract classes
- `IGenericAggregation`, or one of its specialized interfaces or abstract classes
- `IAggregationBinding`, or one of its specialized abstract classes

Let's write a function that counts the number of strictly positive contributions.

First, one needs to implement the `IGenericAggregationFunction` contract.
Since this function will only count one source column at a time,
it can extend the `AAggregationFunction` abstract class to reduce boilerplate code.
The function `CountIfPositiveFunction` needs to define the key that uniquely identifies it in the Registry,
and to make itself known to the Registry:

```java
@QuartetPluginValue(intf = IGenericAggregationFunction.class)
public class CountIfPositiveFunction extends AAggregationFunction {

    public static final String PLUGIN_KEY = "COUNT_IF_POSITIVE";

    public CountIfPositiveFunction() {
        super(PLUGIN_KEY);
    }

    @Override
    public Object key() {
        return PLUGIN_KEY;
    }
```

The function can now be called wherever it is needed, through its plugin key. Let's now write its logic.

The function will always output `long` values, meaning it can override the `getAggregatedType` method like so:

```java
@Override
public int getAggregatedType(final int sourceType) {
    if (Types.isDictionary(sourceType) || !Types.isNumericalType(sourceType))
        throw unsupportedSourceType(sourceType);
    return Types.TYPE_LONG;
}
```

Since the function will check for positive values, it must be numerical. The values cannot be dictionarized, as the function needs to read the value to perform this check.

Then, the function must produce an `IGenericAggregation`, or, in that case with one source column, its specialization `IAggregation`:

```java
@Override
public IAggregation createAggregation(final String sourceIdentifier, final int sourceType) {
    return new CountIfPositiveAggregation(sourceIdentifier, this, sourceType, getAggregatedType(sourceType));
}
```

`CountIfPositiveAggregation` can extend `AAggregation<CountIfPositiveFunction>`. That way, boilerplate getters are already defined in the parent class.

```java
public static class CountIfPositiveAggregation extends AAggregation<CountIfPositiveFunction> {
```

First, the `IGenericAggregation` must be able to return the identifier(s) of the source column(s) being aggregated, their types, the aggregated type, and the function that created it:

```java
public CountIfPositiveAggregation(
        final String sourceIdentifier,
        final CountIfPositiveFunction aggFun,
        final int sourceType,
        final int aggregatedType) {
    super(sourceIdentifier, aggFun, sourceType, aggregatedType);
}
```

Since this aggregation outputs `Types.TYPE_LONG`, the factory is relatively straightforward:

```java
@Override
public IChunkFactory<?> createChunkFactory(
        final boolean isTransient,
        final IAllocationSettings allocationSettings) {
    final boolean nullable = false;
    return new ChunkFactory<>(
            isTransient,
            allocationSettings,
            (allocator, settings, chunkSize) -> allocator.allocateChunkLong(chunkSize, nullable));
}
```

This aggregation could also use the helper factories from `ChunkFactories`.
The code snippet above showcases a more generic implementation.

The aggregation must be able to bind a source of data to a column of aggregates. To do so, it must create a specialized binding:

```java
@Override
public IAggregationBinding bindSource(IArrayReader source, IArrayWriter destination) {
    return new CountIfPositiveBinding(source, destination);
}
```

Finally, the aggregation must be able to bind 2 columns of pre-aggregated data together.
That means, if both columns contain aggregated values obtained through the `CountIfPositiveFunction`, they can be combined.

Since the "positivity of the source values" condition has been checked using the `CountIfPositiveBinding` mentioned above,
when aggregating source values, the bindings of the `SumFunction` can be re-used to sum the count of the two columns.

```java
@Override
public IAggregationBinding bindAggregates(IArrayReader source, IArrayWriter destination) {
    return new SumBindingLong(source, destination);
}
```

Let's now write the `CountIfPositiveBinding`:

```java
public static class CountIfPositiveBinding extends AAggregationBinding {

    private final IArrayReader source;
    private final IArrayWriter aggregates;

    public CountIfPositiveBinding(final IArrayReader source, final IArrayWriter aggregates) {
        super();
        this.source = source;
        this.aggregates = aggregates;
    }

    @Override
    public void copy(int from, int to) {
        aggregates.writeLong(to, source.readDouble(from) > 0 ? 1L : 0L);
    }

    @Override
    public void aggregate(int from, int to) {
        if (source.readDouble(from) > 0)
            aggregates.addLong(to, 1L);
    }

    @Override
    public void disaggregate(int from, int to) {
        if (source.readDouble(from) > 0)
            aggregates.addLong(to, -1L);
    }

}
```
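To see what the three operations of the binding do without an ActivePivot runtime, the same logic can be replayed on plain Java arrays. The `CountIfPositiveSimulation` class is a hypothetical stand-in: a `double[]` plays the role of the source column and a `long[]` the role of the aggregated column.

```java
/** Array-based stand-in for the binding: same copy / aggregate / disaggregate logic. */
final class CountIfPositiveSimulation {

    private final double[] source;   // stands in for the source column
    private final long[] aggregates; // stands in for the aggregated column

    CountIfPositiveSimulation(final double[] source, final long[] aggregates) {
        this.source = source;
        this.aggregates = aggregates;
    }

    /** Initializes the aggregate with the first contribution. */
    void copy(final int from, final int to) {
        aggregates[to] = source[from] > 0 ? 1L : 0L;
    }

    /** Adds one contribution to an existing aggregate. */
    void aggregate(final int from, final int to) {
        if (source[from] > 0) {
            aggregates[to] += 1L;
        }
    }

    /** Removes one contribution from an existing aggregate. */
    void disaggregate(final int from, final int to) {
        if (source[from] > 0) {
            aggregates[to] -= 1L;
        }
    }

    public static void main(String[] args) {
        final long[] aggregates = new long[1];
        final CountIfPositiveSimulation binding =
                new CountIfPositiveSimulation(new double[] {5, -2, 0, 7}, aggregates);
        binding.copy(0, 0);
        binding.aggregate(1, 0);
        binding.aggregate(2, 0);
        binding.aggregate(3, 0);
        System.out.println(aggregates[0]); // 2
        binding.disaggregate(0, 0);
        System.out.println(aggregates[0]); // 1
    }
}
```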

## Advanced considerations

### Strongly typed aggregation functions

In the `CountIfPositiveFunction` example, one may note that in order to perform the comparison to 0, the sources are read using the specialized `readDouble` method.
This allows the example to correctly function for the four numerical primitive types in Java supported in ActivePivot, using the automatic primitive conversions of the language.

However, a user can write specifically typed aggregation bindings for each of the supported primitive types. In doing so, the user avoids this unnecessary conversion.

To do so, one must rewrite the `IAggregation createAggregation(final String sourceIdentifier, final int sourceType)` function to handle specific types in a different way,
and follow the previous tutorial's flow for each type.

### Null support

Note that for simplicity's sake, the above snippets do not handle `null` values in the sources.
The correct condition should probably be: `if (!source.isNull(from) && source.readDouble(from) > 0)`.
In that case, it would be even better to create two bindings: `CountIfPositiveBinding` and `CountIfPositiveBindingWithNullSupport`.
The second would check for null values in the sources, at the cost of a null check, while the first one would be specialized for sources that are **not** nullable,
and would skip that additional check for performance.
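The split between a null-aware and a null-free code path can be sketched on plain arrays. The `NullAwareCountIfPositive` helper below is hypothetical and does not use the ActivePivot reader and writer interfaces: a `double[]` models a non-nullable source, and a `Double[]` models a nullable one.

```java
final class NullAwareCountIfPositive {

    /** Path for non-nullable sources: no null check on the hot path. */
    static void aggregate(final double[] source, final long[] aggregates, final int from, final int to) {
        if (source[from] > 0) {
            aggregates[to] += 1L;
        }
    }

    /** Path for nullable sources: pays one extra null check per contribution. */
    static void aggregateNullable(final Double[] source, final long[] aggregates, final int from, final int to) {
        final Double value = source[from];
        if (value != null && value > 0) {
            aggregates[to] += 1L;
        }
    }
}
```

Choosing between the two paths once, when the binding is created, keeps the per-row work of the non-nullable case as small as possible.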

### Mutability/Immutability

While writing bindings for primitive types is relatively straightforward, aggregating objects is a much more delicate matter,
because of the memory footprint of `Object`s.
Since Java passes object references by value, the same object can be referenced from several places at once; one must therefore carefully plan around the potential modification of the aggregated `Object`.

For instance, let's write an aggregation function that takes a source of `java.time.Duration` (that represents the time that
passed between the submission of a market order and its execution), and computes the total time it took to execute all orders.

This can be adapted to compute the average time for an order execution, but for simplicity's sake, this example will showcase the SUM's implementation.

Let's say these are the values stored in a database table named *trades* (notation: `Duration(1, 500)` is a `Duration` object that has been created using `private Duration.create(1L second, 500 nanoseconds)`):

| TradeId | dollar_value | execution_time |
|---|---|---|
| 0 | 1000 | `Duration(1, 500)` |
| 1 | 50 | `Duration(0, 65000)` |
| 2 | 250 | `Duration(12, 888 888)` |

To compute `execution_time.SUM`, the binding can start by copying `Duration(1, 500)` in the aggregated column.

To simplify this example, let's assume that all data must be aggregated at index 0 of the aggregated column.

First, `binding.copy(0, 0)` is called, with this definition:

```java
public class DurationBinding extends AAggregationBinding {

    @Override
    public void copy(int from, int to) {
        final Duration sourceDuration = source.read(from);
        destination.write(to, sourceDuration);
    }

}
```

Following the Java Memory model, this action will add in the corresponding cell of the aggregated column
a reference to the actual `Duration(1, 500)` that is stored on heap.
Then, to aggregate the next value, the binding can call `binding.aggregate(1, 0)`:

```java
public class DurationBinding extends AAggregationBinding {

    @Override
    public void aggregate(int from, int to) {
        final Duration sourceDuration = source.read(from);
        final Duration currentDuration = destination.read(to);
        destination.write(to, currentDuration.plus(sourceDuration));
    }

}
```

However, doing so will, according to the javadoc of `Duration#plus(Duration)`, create a new `Duration` object as the result of the sum.
This means that every aggregation operation creates a new object on the heap, and allows the previous object to be garbage collected once it is no longer referenced.

This is problematic, because aggregation bindings are performance sensitive, and this method may be called millions of times, thus creating millions of transient
instances of the class `java.time.Duration` and needlessly putting pressure on the application's garbage collector.
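The allocation behaviour can be checked with the standard `java.time.Duration` API alone. In the sketch below, `ImmutableDurationExample` is an illustrative class name, and `Duration.ofSeconds(seconds, nanoAdjustment)` is used as the public way to build the values written `Duration(1, 500)` and `Duration(0, 65000)` in this page.

```java
import java.time.Duration;

final class ImmutableDurationExample {

    /** Sums two durations; Duration is immutable, so the result is a separate object. */
    static Duration sum(final Duration aggregate, final Duration contribution) {
        return aggregate.plus(contribution);
    }

    public static void main(String[] args) {
        final Duration first = Duration.ofSeconds(1, 500);
        final Duration second = Duration.ofSeconds(0, 65_000);
        final Duration total = sum(first, second);

        System.out.println(total != first);  // true: a distinct instance was returned
        System.out.println(first.getNano()); // 500: the original value is untouched
        System.out.println(total.getNano()); // 65500
    }
}
```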

Since no way around this issue can be found while using the immutable `java.time.Duration`, there is a strong incentive to create a mutable copy of the class,
which will be called `MutableDuration` in the rest of this page.

This isn't so far-fetched: before Java 8, time representations were so poor that `org.joda.time` was developed as a very popular open-source alternative. `org.joda.time.MutableDateTime` was created as part of this alternative, and serves as a model for this example.

`MutableDuration` will have the exact same API as `java.time.Duration`, but for this example's sake, `public Duration plus(Duration)` is replaced by `public void plus(MutableDuration)`.
This method mutates the current object.

The binding now evolves to become:

```java
public class MutableDurationBinding extends AAggregationBinding {

    @Override
    public void aggregate(int from, int to) {
        final MutableDuration sourceDuration = source.read(from);
        final MutableDuration currentDuration = destination.read(to);
        // No need to write to the destination column anymore, as currentDuration is modified in place.
        currentDuration.plus(sourceDuration);
    }

}
```

To recap:

- `binding.copy(0, 0)` will copy `MutableDuration(1, 500)` to the aggregated column, in the cell of index 0.
- `binding.aggregate(1, 0)` will aggregate `MutableDuration(0, 65000)` into the current value in the cell at index 0.

**However, this operation is now mutating the instance `MutableDuration(1, 500)`, which is the same instance as the one stored in the trades table. It just became `MutableDuration(1, 65500)`.
Because Java passes object references by value, the `MutableDuration` being modified is the very object on the Java heap that is referenced from the database table.**

To prevent that from ever happening, the `copy` method of the binding can be smarter: it can clone the `MutableDuration` instead of simply passing it, and store the cloned
object in the aggregated column:

```java
public class MutableDurationBinding extends AAggregationBinding {

    @Override
    public void copy(int from, int to) {
        final MutableDuration sourceDuration = source.read(from);
        // Write a clone that is only owned by the aggregated column.
        destination.write(to, MutableDuration.from(sourceDuration));
    }

}
```

This solution will not have any side effect, but it will still prove costly memory-wise.
Indeed, if there are lots of aggregated values that are computed simply by copying an initial contribution, without ever aggregating further contributions,
it will have cloned many `MutableDuration` objects to no benefit.

An optimal solution will only clone when it's needed. It will first copy the value, but mark it as unmodifiable. Then, at the time of calling `aggregate`,
it will clone the current value, and then call `MutableDuration#plus(MutableDuration)` on the cloned value.

That cloned object will forever be the one stored in the aggregated column, and all users are free to modify that value without impacting any other code part that could hold a reference to the same address on the Java Heap.
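The clone-on-first-write idea can be sketched in a few lines of stand-alone Java. Everything in this example is hypothetical: `MutableDuration` is a minimal stand-in that assumes non-negative values, and `Cell` models a single cell of the aggregated column that remembers whether it owns the object it references.

```java
final class CopyOnWriteExample {

    /** Minimal mutable duration (hypothetical, non-negative values only). */
    static final class MutableDuration {
        long seconds;
        long nanos;

        MutableDuration(final long seconds, final long nanos) {
            this.seconds = seconds;
            this.nanos = nanos;
        }

        static MutableDuration from(final MutableDuration other) {
            return new MutableDuration(other.seconds, other.nanos);
        }

        /** Adds the other duration to this one, in place. */
        void plus(final MutableDuration other) {
            nanos += other.nanos;
            seconds += other.seconds + nanos / 1_000_000_000L;
            nanos %= 1_000_000_000L;
        }
    }

    /** One aggregated cell: shares the source object until the first modification. */
    static final class Cell {
        private MutableDuration value;
        private boolean owned;

        /** Stores a reference to the source object, marked as read-only. */
        void copy(final MutableDuration source) {
            value = source;
            owned = false;
        }

        /** Clones the shared object lazily, then mutates the private clone. */
        void aggregate(final MutableDuration source) {
            if (value == null) {
                copy(source);
                return;
            }
            if (!owned) {
                value = MutableDuration.from(value);
                owned = true;
            }
            value.plus(source);
        }

        MutableDuration get() {
            return value;
        }
    }
}
```

With this scheme, a cell that only ever receives one contribution never allocates, and the object referenced from the source table is never mutated.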

To help users achieve this ideal scenario, ActivePivot comes with the abstract aggregation binding `AAggregationBindingCloneable`.
This binding simply requires the user to implement `MutableDuration cloneAggregate(MutableDuration)`, on top of the regular methods of a binding.
In exchange, it provides multiple methods that handle cloning for the user.

```java
public class MutableDurationBinding extends AAggregationBindingCloneable<MutableDuration> {

    protected MutableDurationBinding(final IArrayReader source, final IArrayWriter destination) {
        super(source, destination);
    }

    @Override
    protected MutableDuration cloneAggregate(final MutableDuration aggregate) {
        return MutableDuration.from(aggregate);
    }

    @Override
    public void copy(final int fromPosition, final int toPosition) {
        final MutableDuration sourceDuration = readInput(fromPosition);
        writeReadOnlyAggregate(toPosition, sourceDuration); // can be null
    }

    @Override
    public void aggregate(final int fromPosition, final int toPosition) {
        final MutableDuration sourceDuration = readInput(fromPosition);
        if (Objects.isNull(sourceDuration)) {
            return; // Nothing to do.
        }

        final MutableDuration currentAggregate = readReadOnlyAggregate(toPosition);
        if (Objects.isNull(currentAggregate)) {
            copy(fromPosition, toPosition);
        } else {
            // will handle cloning for me if necessary
            final MutableDuration writableCurrentAggregate = readWritableAggregate(toPosition);
            writableCurrentAggregate.plus(sourceDuration);
        }
    }

    @Override
    public void disaggregate(final int fromPosition, final int toPosition) {
        final MutableDuration sourceDuration = readInput(fromPosition);
        if (Objects.isNull(sourceDuration)) {
            return; // Nothing to do.
        }

        final MutableDuration currentAggregate = readReadOnlyAggregate(toPosition);
        if (Objects.isNull(currentAggregate)) {
            throw new IllegalStateException("Cannot disaggregate from a null value");
        } else if (sourceDuration == currentAggregate) {
            // Shortcut in the exceptional case where this is the exact same object, replace it by null.
            removeAggregate(toPosition);
        } else {
            // will handle cloning for me if necessary
            final MutableDuration writableCurrentAggregate = readWritableAggregate(toPosition);
            writableCurrentAggregate.minus(sourceDuration);
        }
    }

}
```

### Aggregating Vectors

Vectors are one of the most frequently used features of ActivePivot. Querying and aggregating vectors is extremely CPU and memory intensive, and should be accomplished with great care.

ActivePivot provides a specialization of the class `AAggregationBindingCloneable`, designed for vector aggregation: `AVectorAggregationBinding`.
This binding handles the internal aspects of vector aggregation: on-heap and off-heap allocation, cloning, reference counting for garbage collection, etc.

In order to implement a vector aggregation binding, it is **strongly** recommended to extend this class.

Let's write an aggregation function that produces the sum of each source vector, limited to their first two elements.
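As a reference point, the expected result can be modelled in plain Java, with `double[]` standing in for `IVector`. The class and method names below are hypothetical and no ActivePivot API is involved; this is only a sketch of the semantics, assuming null contributions are skipped and vectors shorter than two elements are rejected.

```java
import java.util.Arrays;
import java.util.List;

/** Plain-Java model of the target semantics; double[] stands in for IVector. */
final class SubVectorSumSemantics {

    /** Number of leading elements kept from each source vector. */
    static final int PREFIX_LENGTH = 2;

    /** Sums the first two elements of every non-null source vector, element by element. */
    static double[] aggregate(final List<double[]> sources) {
        double[] result = null;
        for (final double[] source : sources) {
            if (source == null) {
                continue; // null contributions are ignored
            }
            if (source.length < PREFIX_LENGTH) {
                throw new IllegalArgumentException("Source vectors need at least two elements.");
            }
            if (result == null) {
                // First contribution: keep a private copy of the first two elements.
                result = Arrays.copyOf(source, PREFIX_LENGTH);
            } else {
                for (int i = 0; i < PREFIX_LENGTH; i++) {
                    result[i] += source[i];
                }
            }
        }
        return result; // null when there was nothing to aggregate
    }

    public static void main(final String[] args) {
        final double[] sum = aggregate(Arrays.asList(
                new double[] {1.0, 2.0, 3.0},
                new double[] {10.0, 20.0, 30.0, 40.0}));
        System.out.println(Arrays.toString(sum)); // prints [11.0, 22.0]
    }
}
```

Elements past the second one never reach the aggregate, whatever the length of the source vectors.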

This function will typically be difficult to write, for the following reasons:

- The source `IVector`s may be on-heap or off-heap.
- The aggregated `IVector` can be lazily copied, as seen in the previous example with `MutableDuration`.
- Acquiring and storing a sub-part of an `IVector` may have an impact on the vector's reference counting.
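The lazy copy mentioned in the second point can be pictured with a small copy-on-write slot. This is a plain-Java model under stated assumptions, not the ActivePivot implementation: `LazyCopySlot` and its methods are hypothetical names that merely echo the read-only and writable accessors used by the bindings in this section, and `double[]` stands in for `IVector`.

```java
import java.util.Arrays;

/** Plain-Java model of lazy cloning; not the ActivePivot implementation. */
final class LazyCopySlot {

    private double[] value;  // current aggregate, possibly shared with a source
    private boolean shared;  // true while 'value' must not be mutated

    /** Stores a reference without copying; the slot only promises to read it. */
    void writeReadOnly(final double[] source) {
        this.value = source;
        this.shared = source != null;
    }

    /** Returns the aggregate for reading; callers must not mutate it. */
    double[] readReadOnly() {
        return value;
    }

    /** Returns a privately owned array, cloning first if it is still shared. */
    double[] readWritable() {
        if (shared) {
            value = Arrays.copyOf(value, value.length);
            shared = false;
        }
        return value;
    }

    public static void main(final String[] args) {
        final double[] source = {1.0, 2.0};
        final LazyCopySlot slot = new LazyCopySlot();
        slot.writeReadOnly(source);       // no copy yet
        slot.readWritable()[0] += 5.0;    // first write triggers the clone
        System.out.println(source[0]);              // prints 1.0
        System.out.println(slot.readReadOnly()[0]); // prints 6.0
    }
}
```

The copy is paid only when an aggregate is actually modified, which is why the bindings distinguish read-only access from writable access.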

First, one needs to implement the `IGenericAggregationFunction` contract.
Since this function only aggregates one source column at a time, it can extend the `AAggregationFunction` abstract class to reduce boilerplate code.
The `SubVectorSumFunction` needs to add itself to the Registry using a unique key:

```java
@QuartetPluginValue(intf = IGenericAggregationFunction.class)
public class SubVectorSumFunction extends AAggregationFunction {

    public static final String PLUGIN_KEY = "SUB_VECTOR_SIZE_2_SUM";

    public SubVectorSumFunction() {
        super(PLUGIN_KEY);
    }

    @Override
    public Object key() {
        return PLUGIN_KEY;
    }
```

The function can now be called wherever it is needed, using its plugin key. Let's write its logic.
The function will always output numerical arrays, meaning it can override the `getAggregatedType` method like so:

```java
@Override
public int getAggregatedType(final int sourceType) {
    if (sourceType == Types.TYPE_INT_ARRAY) {
        return sourceType;
    } else if (sourceType == Types.TYPE_LONG_ARRAY) {
        return sourceType;
    } else if (sourceType == Types.TYPE_FLOAT_ARRAY) {
        return sourceType;
    } else if (sourceType == Types.TYPE_DOUBLE_ARRAY) {
        return sourceType;
    } else {
        throw unsupportedSourceType(sourceType);
    }
}
```

Then, the function must produce an `IAggregation`, which is the appropriate `IGenericAggregation` type in the single source column case:

```java
@Override
protected IAggregation createAggregation(final String sourceIdentifier, final int sourceType) {
    return new SubVectorSumAggregation(sourceIdentifier, this, sourceType, getAggregatedType(sourceType));
}
```

`SubVectorSumAggregation` can extend `AAggregation<SubVectorSumFunction>` to inherit boilerplate getters.

```java
public static class SubVectorSumAggregation extends AAggregation<SubVectorSumFunction> {
```

First, the `IGenericAggregation` must be able to return the identifier(s) of the source column(s) being aggregated, their types, the aggregated type, and the function that created it:

```java
public SubVectorSumAggregation(
        final String sourceIdentifier,
        final SubVectorSumFunction aggFun,
        final int sourceType,
        final int aggregatedType) {
    super(sourceIdentifier, aggFun, sourceType, aggregatedType);
}
```

Since this aggregation outputs vectors, the factory is relatively straightforward:

```java
@Override
public IChunkFactory<?> createChunkFactory(
        final boolean isTransient,
        final IAllocationSettings allocationSettings) {
    return ChunkFactories.chunkMarkedVectorFactory(getInputDataType(), isTransient, allocationSettings);
}
```

This time, the example uses a helper method from the class `ChunkFactories`. Vectors need to be aggregated into `ChunkMarkedVector`, which the helper factory ensures.

The aggregation must be able to bind a source of data to a column of aggregates. To do so, it must create a specialized binding:

```java
@Override
public IAggregationBinding bindSource(final IArrayReader source, final IArrayWriter aggregates) {
    return new SubVectorSumBinding(source, aggregates);
}
```

Finally, the aggregation must be able to bind two columns of pre-aggregated data together.
That is, if both columns contain aggregated values obtained through the `SubVectorSumAggregation`, they can be combined.

Since the "2 elements" condition has already been applied by the `SubVectorSumBinding` mentioned above when aggregating source values, the bindings of the `SumFunction` can be reused to sum the vectors of the two columns.

```java
@Override
public IAggregationBinding bindAggregates(final IArrayReader input, final IArrayWriter output) {
    return new SumVectorAggregationBinding(input, output);
}
```
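Why a plain sum is enough for pre-aggregated columns can be checked with a small plain-Java model. The names `prefixSum` and `plainSum` are hypothetical stand-ins for the source binding and the aggregate binding respectively, `double[]` stands in for `IVector`, and null handling is left out to keep the sketch short.

```java
import java.util.Arrays;

/** Plain-Java check that truncated aggregates can be merged with an ordinary vector sum. */
final class MergePartialAggregates {

    /** Sums the first two elements of each source vector (the source binding's job). */
    static double[] prefixSum(final double[]... sources) {
        final double[] result = new double[2];
        for (final double[] source : sources) {
            result[0] += source[0];
            result[1] += source[1];
        }
        return result;
    }

    /** Element-wise sum of two aggregates of equal length (the aggregate binding's job). */
    static double[] plainSum(final double[] left, final double[] right) {
        final double[] result = Arrays.copyOf(left, left.length);
        for (int i = 0; i < right.length; i++) {
            result[i] += right[i];
        }
        return result;
    }

    public static void main(final String[] args) {
        final double[] a = {1.0, 2.0, 3.0};
        final double[] b = {4.0, 5.0, 6.0};
        final double[] c = {7.0, 8.0, 9.0, 10.0};

        // Two partial aggregates, merged afterwards...
        final double[] merged = plainSum(prefixSum(a, b), prefixSum(c));
        // ...give the same result as aggregating every source directly.
        final double[] direct = prefixSum(a, b, c);

        System.out.println(Arrays.equals(merged, direct)); // prints true
    }
}
```

Both partial aggregates already have exactly two elements, so merging them requires no further truncation.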

All that is left to do is to write the `SubVectorSumBinding` implementation, extending the abstract vector binding `AVectorAggregationBinding`:

```java
public static class SubVectorSumBinding extends AVectorAggregationBinding {

    public SubVectorSumBinding(final IArrayReader source, final IArrayWriter aggregates) {
        super(source, aggregates);
    }

    @Override
    protected void doCopy(final int fromPosition, final int toPosition) {
        final IVector inputVec = readInput(fromPosition);
        if (inputVec == null) {
            writeReadOnlyAggregate(toPosition, null);
            return;
        }
        // Only the first two elements are kept. The sub-vector is stored read-only,
        // so the input vector is never written into.
        final IVector vectorToWrite = inputVec.subVector(0, 2);
        writeReadOnlyAggregate(toPosition, vectorToWrite);
    }

    @Override
    protected void doAggregate(final int fromPosition, final int toPosition) {
        final IVector inputVec = readInput(fromPosition);
        if (inputVec == null) {
            return; // Nothing to aggregate.
        }
        if (inputVec.size() < 2) {
            throw new IllegalArgumentException(
                    "Source vectors are expected to contain more than one element.");
        }
        if (readReadOnlyAggregate(toPosition) == null) {
            // First contribution at this position: a copy is enough.
            doCopy(fromPosition, toPosition);
            return;
        }
        // Handles necessary cloning behind the scenes. The initially copied sub-vector
        // must not be written into.
        final IVector currentAggregate = readWritableAggregate(toPosition);
        currentAggregate.plus(inputVec.subVector(0, 2));
    }

    @Override
    protected void doDisaggregate(final int fromPosition, final int toPosition) {
        final IVector inputVec = readInput(fromPosition);
        if (inputVec == null) {
            return; // Nothing to disaggregate.
        }
        final IVector currentAggregate = readReadOnlyAggregate(toPosition);
        if (currentAggregate == null) {
            throw new IllegalStateException("Disaggregating from a null value.");
        } else {
            final IVector currentWritableAggregate = readWritableAggregate(toPosition);
            currentWritableAggregate.minus(inputVec.subVector(0, 2));
        }
    }
}
```
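To see the contract of the aggregate and disaggregate operations in isolation, here is a plain-Java model of such a binding. It is a sketch under stated assumptions rather than ActivePivot code: `SubVectorSumBindingModel` is a hypothetical class, `double[]` stands in for `IVector`, and the first contribution is copied eagerly instead of being shared lazily.

```java
import java.util.Arrays;

/** Plain-Java model of a binding between a column of source vectors and a column of aggregates. */
final class SubVectorSumBindingModel {

    private final double[][] source;     // one source vector per row, may contain nulls
    private final double[][] aggregates; // one aggregated vector per position, initially null

    SubVectorSumBindingModel(final double[][] source, final int aggregateCount) {
        this.source = source;
        this.aggregates = new double[aggregateCount][];
    }

    /** Adds the first two elements of the source row to the aggregate at toPosition. */
    void aggregate(final int fromPosition, final int toPosition) {
        final double[] input = source[fromPosition];
        if (input == null) {
            return; // nothing to aggregate
        }
        if (input.length < 2) {
            throw new IllegalArgumentException("Source vectors need at least two elements.");
        }
        if (aggregates[toPosition] == null) {
            // Eager copy here; the real binding would store a read-only sub-vector instead.
            aggregates[toPosition] = Arrays.copyOf(input, 2);
            return;
        }
        aggregates[toPosition][0] += input[0];
        aggregates[toPosition][1] += input[1];
    }

    /** Removes a previously aggregated contribution from the aggregate at toPosition. */
    void disaggregate(final int fromPosition, final int toPosition) {
        final double[] input = source[fromPosition];
        if (input == null) {
            return; // nothing to disaggregate
        }
        if (aggregates[toPosition] == null) {
            throw new IllegalStateException("Disaggregating from a null value.");
        }
        aggregates[toPosition][0] -= input[0];
        aggregates[toPosition][1] -= input[1];
    }

    double[] read(final int position) {
        return aggregates[position];
    }

    public static void main(final String[] args) {
        final double[][] rows = {{1.0, 2.0, 3.0}, {10.0, 20.0, 30.0}, null, {100.0, 200.0}};
        final SubVectorSumBindingModel binding = new SubVectorSumBindingModel(rows, 1);
        for (int row = 0; row < rows.length; row++) {
            binding.aggregate(row, 0);
        }
        System.out.println(Arrays.toString(binding.read(0))); // prints [111.0, 222.0]
        binding.disaggregate(1, 0);
        System.out.println(Arrays.toString(binding.read(0))); // prints [101.0, 202.0]
    }
}
```

Disaggregating a row undoes exactly what aggregating it contributed, because both operations apply the same two-element truncation.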