
Aggregation Functions

Concept

In Database Management Systems (DBMS), an Aggregation Function, or Aggregate Function, is a function that operates on values across rows to perform mathematical calculations such as sum, average, count, minimum/maximum values, standard deviation, as well as some non-mathematical operations, like finding the range of dates covered by the values. An aggregate function takes multiple rows as input and produces a single output.

While the term can also be found in OnLine Transaction Processing (OLTP) workflows, with standard SQL aggregation functions like COUNT, MIN, etc., OnLine Analytical Processing (OLAP) systems such as ActivePivot are built specifically for aggregation.

Just as a DBMS associates an aggregation function with some columns at query time, ActivePivot associates its aggregation functions with table columns to create aggregated measures.

The SQL query SELECT SUM(dollar_value) FROM trades returns the sum of the values of all trades stored in the trades table. Similarly, the ActivePivot SumFunction can be associated with the column dollar_value to create the measure dollar_value.SUM. The same SUM value for all trades can then be found in the cube, as a top-level aggregate.

The concept of aggregation is orthogonal to data storage, and should rather be considered as an analytics notion.

Vocabulary

An aggregate function takes multiple rows as input and produces a single output.

The input values are called source data, or source values, and the output value is called the aggregated value.

Aggregating the source value trade_value into the aggregated value trade_aggregate means adding a trade_value contribution to trade_aggregate, according to the selected aggregation function. For instance, if trade_aggregate = 2 and trade_value = 3, aggregating trade_value into trade_aggregate:

  • using the SUM function will produce trade_aggregate = 5.
  • using the MAX function will produce trade_aggregate = 3.
  • using the SQUARE_SUM function will produce trade_aggregate = 2 + 3*3 = 11.
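The three bullets above can be sketched with plain Java arithmetic. This is a minimal illustration, not ActivePivot code: the class AggregationSteps and its methods are hypothetical, and values are modeled as long.

```java
// Hypothetical helper: one "aggregate a contribution" step for three functions,
// using plain long arithmetic (not the ActivePivot API).
final class AggregationSteps {
    private AggregationSteps() {}

    /** SUM: adds the contribution to the running total. */
    static long sum(long aggregate, long contribution) {
        return aggregate + contribution;
    }

    /** MAX: keeps the larger of the current maximum and the contribution. */
    static long max(long aggregate, long contribution) {
        return Math.max(aggregate, contribution);
    }

    /** SQUARE_SUM: the aggregate already holds a sum of squares, so only the contribution is squared. */
    static long squareSum(long aggregate, long contribution) {
        return aggregate + contribution * contribution;
    }

    public static void main(String[] args) {
        long tradeAggregate = 2;
        long tradeValue = 3;
        System.out.println(sum(tradeAggregate, tradeValue));       // 5
        System.out.println(max(tradeAggregate, tradeValue));       // 3
        System.out.println(squareSum(tradeAggregate, tradeValue)); // 11
    }
}
```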

ActivePivot Specifics

As mentioned in the Concept section, ActivePivot is tailored for OLAP workloads. To provide adequate performance, ActivePivot's aggregation engine can store pre-aggregated data for some selected cube locations in the AggregateProvider component.

For instance, if a bank has a million open positions on the market, visualizing the total amount involved in these trades requires retrieving a million values and summing them.

However, if the bank is divided into 15 entities, and ActivePivot is storing pre-aggregated data at the entity level, computing that total amount only requires summing the 15 aggregated values.

Let's use a very small example to represent the aforementioned storage component:

Location | dollar_value.SUM
EntityA | 1000
EntityB | 50
EntityC | null

EntityC had no open positions as of yesterday's end of day. Today, EntityA and EntityB closed some positions, and all three entities opened new ones.

Added:

Location | dollar_value.SUM
EntityB | 2
EntityA | 264
EntityC | 46000
EntityD | 150

The bank also created a new entity, EntityD, and traded in its name.

Removed:

Location | dollar_value.SUM
EntityA | 110
EntityB | 27

In these tables, the entities are not in the same order as in the storage. The transactional flow that updates the aggregate provider will take care of aligning the locations for data coherence, resulting in:

Location | dollar_value.SUM
EntityA | 1000 - 110 + 264 = 1154
EntityB | 50 + 2 - 27 = 25
EntityC | 46000
EntityD | 150

The previous example shows that the SUM aggregation function is responsible for more than simply taking a group of values and summing them. It knows how to initialize the first value for EntityD, how to aggregate new trades into that first copied value, and how to disaggregate closed trades from a stored value. It also knows how to combine pre-aggregated values into an aggregated value that represents a larger scope.
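These responsibilities (initializing, aggregating, disaggregating, and combining pre-aggregated values) can be sketched with a plain map standing in for the AggregateProvider. PreAggregatedSum is a hypothetical class, not the ActivePivot storage, and a location with no entry plays the role of the null EntityC cell.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical sketch: pre-aggregated SUM values keyed by location.
final class PreAggregatedSum {
    private final Map<String, Long> aggregates = new LinkedHashMap<>();

    /** Aggregates a contribution; the first contribution to a location initializes it. */
    void add(String location, long value) {
        aggregates.merge(location, value, Long::sum);
    }

    /** Disaggregates a previously aggregated contribution. */
    void remove(String location, long value) {
        Long current = aggregates.get(location);
        if (current == null) {
            throw new IllegalStateException("Nothing to disaggregate at " + location);
        }
        aggregates.put(location, current - value);
    }

    /** Combines the pre-aggregated values into a top-level total. */
    long total() {
        long total = 0;
        for (long v : aggregates.values()) {
            total += v;
        }
        return total;
    }

    /** Returns the stored aggregate, or null if the location was never contributed to. */
    Long get(String location) {
        return aggregates.get(location);
    }
}
```

Replaying the Added and Removed tables against the stored values gives EntityA = 1154, EntityB = 25, EntityC = 46000 and EntityD = 150, and total() only has to combine these four pre-aggregated values (47329) instead of every underlying trade.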

To do all of this, an ActivePivot aggregation function takes some source columns and an aggregated column, and binds them together. How that binding is then used is outside the scope of the function, which only handles its creation.

In ActivePivot, the aggregation function concept is divided into 3 separate contracts:

  • Specified by the IGenericAggregationFunction contract, aggregation functions represent the exposed API. Each aggregation function is an IPluginValue, stored in the registry, under the Plugin GenericAggregationFunctionPlugin. Custom implementations must be added to the registry, through annotations or manual insertion.

  • Aggregation Functions are factories for IGenericAggregations, which can each be seen as an implementation of its associated Aggregation Function for specific data sources. IGenericAggregations are objects that keep:

    • the names of the columns that hold the data being aggregated. These may be table fields, measure names, etc. They uniquely identify the sources an aggregation requires.
    • the types, from the com.qfs.store.Types API, of these columns.
    • the type of the aggregated data, that is, the type of the result produced by the aggregation function.

    Based on this information, an IGenericAggregation specifies how the chunks that hold the aggregated data are created (typically, chunks of the aggregated type).

  • IGenericAggregations are used to produce IAggregationBindings, which are responsible for the computation of the aggregated values.

Natively Supported Aggregation Functions

This is the list of natively supported aggregation functions, in alphabetical order.

The following notations are used:

Symbol | Meaning
tick | fully supported
plus | partially supported
cross | will not be supported

In cases where null is supported, the aggregated type will depend on the nullability of the types of the sources.

Average

Returns the average of the source values. The average is defined as the sum of the contributions, divided by the number of contributions.

Support

null values are strictly ignored. They do not influence the sum of the contributions, nor do they influence the number of contributions. If all the values aggregated into the output value are null, the output value is null as well.

Type | Support | Aggregated Type | Details
Numerical scalar | tick | double | -
Non-Numerical scalar | cross | - | -
Numerical vectors | tick | double vector | Returns a vector containing the term-by-term average. Aggregated vectors must all have the same size.
Non-Numerical vectors | cross | - | -

This function supports disaggregation. Disaggregating a value removes its contribution from the sum, and reduces the number of contributions. Disaggregating down to zero contributions will return null, to prevent dividing by zero.
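The disaggregation behavior described above can be sketched by keeping a running sum and a count of non-null contributions. AverageAggregate is a hypothetical class working on boxed Double values, not the ActivePivot binding, and it does not check that a disaggregated value was previously aggregated.

```java
// Hypothetical AVG aggregate: keeps the sum and the number of non-null contributions.
final class AverageAggregate {
    private double sum;
    private long count;

    void aggregate(Double value) {
        if (value == null) {
            return; // null contributions are ignored
        }
        sum += value;
        count++;
    }

    void disaggregate(Double value) {
        if (value == null) {
            return; // null contributions were never counted
        }
        sum -= value;
        count--;
    }

    /** Returns null instead of dividing by zero when no contributions remain. */
    Double value() {
        return count == 0 ? null : sum / count;
    }
}
```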

Copy

Copies the given source value, replacing the aggregated value.

Support

null values are supported. Copying null will effectively write null in place of the previous aggregated value. The type of the aggregated value is exactly the type of the source values.

Type | Support
Numerical scalar | tick
Non-Numerical scalar | tick
Numerical vectors | tick
Non-Numerical vectors | tick

This function does not support aggregating or disaggregating. It only supports replacing the current value with the source value.

Count

Counts the number of contributions to the aggregated value, regardless of the contributed values.

Support

null values are strictly ignored. If all the values aggregated into the output value are null, the output value is null as well. This function returns long values.

Type | Support
Numerical scalar | tick
Non-Numerical scalar | tick
Numerical vectors | tick
Non-Numerical vectors | tick

This function supports disaggregation. Disaggregating a value reduces the number of contributions.

Gross Sum

Returns the sum of the absolute values of the contributions.

Support

null values are strictly ignored. If all the values aggregated into the output value are null, the output value is null as well.

Type | Support | Aggregated Type | Details
Numerical scalar | tick | Same as source type | -
Non-Numerical scalar | cross | - | -
Numerical vectors | tick | Same as source type | Returns a vector containing the term-by-term gross sum. Supports aggregating vectors of different sizes: the aggregated value's size is that of the largest source vector, the smaller ones being padded with zeroes.
Non-Numerical vectors | cross | - | -

This function supports disaggregation.

Long

Returns the sum of the positive contributions.

Support

null values are strictly ignored. If all the values aggregated into the output value are null, the output value is null as well.

Type | Support | Aggregated Type | Details
Numerical scalar | tick | Same as source type | -
Non-Numerical scalar | cross | - | -
Numerical vectors | tick | Same as source type | Returns a vector containing the term-by-term long sum. Supports aggregating vectors of different sizes: the aggregated value's size is that of the largest source vector, the smaller ones being padded with zeroes.
Non-Numerical vectors | cross | - | -

This function supports disaggregation.

Max

Returns the maximum of the contributions.

Support

null values are strictly ignored. If all the values aggregated into the output value are null, the output value is null as well.

Objects implementing Comparable are NOT supported.

Type | Support | Aggregated Type | Details
Numerical scalar | tick | Same as source type | -
Non-Numerical scalar | cross | - | -
Numerical vectors | cross | - | -
Non-Numerical vectors | cross | - | -

This function does not support disaggregation. A specialized version of this function, MaxHistoryFunction, supports the disaggregation of values. To do so, it must keep the entire history of contributed values, to be able to retrieve the next maximum. Thus, its usage is memory intensive.
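The idea behind a history-keeping maximum can be sketched with a sorted multiset of contributions. MaxWithHistory is hypothetical and is not the actual MaxHistoryFunction implementation; it only shows why supporting disaggregation requires keeping every contributed value, and therefore costs memory.

```java
import java.util.TreeMap;

// Hypothetical MAX with history: value -> number of occurrences, kept sorted.
final class MaxWithHistory {
    private final TreeMap<Long, Integer> history = new TreeMap<>();

    void aggregate(long value) {
        history.merge(value, 1, Integer::sum);
    }

    void disaggregate(long value) {
        Integer occurrences = history.get(value);
        if (occurrences == null) {
            throw new IllegalStateException("Value was never aggregated: " + value);
        }
        if (occurrences == 1) {
            history.remove(value);
        } else {
            history.put(value, occurrences - 1);
        }
    }

    /** Current maximum, or null when there are no contributions left. */
    Long max() {
        return history.isEmpty() ? null : history.lastKey();
    }
}
```

Removing the current maximum simply exposes the next largest key, which a plain MAX that only stores one value cannot do.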

Median

Returns the median of the contributions. In other words, if there is an odd number of contributions, it returns their middle value. If there is an even number of contributions, the average of the two values that are closest to the middle is returned.

Support

null values are strictly ignored. If all the values aggregated into the output value are null, or no values are aggregated, the output value is null as well.

Type | Support | Aggregated Type | Details
Numerical scalar | tick | double | -
Non-Numerical scalar | cross | - | -
Numerical vectors | cross | - | -
Non-Numerical vectors | cross | - | -

This function supports disaggregation, albeit with poor performance. As it must keep the entire history of contributed values, this function is quite memory-intensive.

Min

Returns the minimum of the contributions.

Support

null values are strictly ignored. If all the values aggregated into the output value are null, the output value is null as well.

Objects implementing Comparable are NOT supported.

Type | Support | Aggregated Type | Details
Numerical scalar | tick | Same as source type | -
Non-Numerical scalar | cross | - | -
Numerical vectors | cross | - | -
Non-Numerical vectors | cross | - | -

This function does not support disaggregation. A specialized version of this function, MinHistoryFunction, supports the disaggregation of values. To do so, it must keep the entire history of contributed values, to be able to retrieve the next minimum. Thus, its usage is memory intensive.

Multiply

Returns the product of the contributions.

Support

null values are strictly ignored. If all the values aggregated into the output value are null, the output value is null as well.

Type | Support | Aggregated Type | Details
Numerical scalar | tick | Same as source type* | *int values are aggregated as long to prevent overflows.
Non-Numerical scalar | cross | - | -
Numerical vectors | cross | - | -
Non-Numerical vectors | cross | - | -

This function supports disaggregation.

Short

Returns the sum of the negative contributions.

Support

null values are strictly ignored. If all the values aggregated into the output value are null, the output value is null as well.

Type | Support | Aggregated Type | Details
Numerical scalar | tick | Same as source type | -
Non-Numerical scalar | cross | - | -
Numerical vectors | tick | Same as source type | Returns a vector containing the term-by-term short sum. Supports aggregating vectors of different sizes: the aggregated value's size is that of the largest source vector, the smaller ones being padded with zeroes.
Non-Numerical vectors | cross | - | -

This function supports disaggregation.
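Gross Sum, Long and Short, as described above, are each a plain sum over a transformed contribution. The hypothetical SignedSums class below sketches the scalar case on double values, ignoring null handling.

```java
// Hypothetical scalar sketches of GROSS_SUM, LONG and SHORT.
final class SignedSums {
    private SignedSums() {}

    /** GROSS_SUM: sum of the absolute values of the contributions. */
    static double grossSum(double... values) {
        double total = 0;
        for (double v : values) {
            total += Math.abs(v);
        }
        return total;
    }

    /** LONG: sum of the positive contributions only. */
    static double longSum(double... values) {
        double total = 0;
        for (double v : values) {
            if (v > 0) {
                total += v;
            }
        }
        return total;
    }

    /** SHORT: sum of the negative contributions only, so the result is never positive. */
    static double shortSum(double... values) {
        double total = 0;
        for (double v : values) {
            if (v < 0) {
                total += v;
            }
        }
        return total;
    }
}
```

Because each result is a sum of independent per-contribution terms, disaggregating a value only requires subtracting the same term, which is consistent with all three functions supporting disaggregation.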

Single Value

Copies the given source value. Once an aggregated value exists, it is only possible to contribute a value that is equal to it, and such a contribution has no effect. This aggregation function is mostly used to acknowledge the presence of values, without needing to know which ones.

Support

null values are supported. Copying null will effectively write null in place of the previous aggregated value. Contributing a non-null value when the aggregated value is null will simply copy the source value into the aggregated column. Contributing null has no effect, even if the aggregated value isn't null.

The type of the aggregated value is exactly the type of the source values.

Type | Support
Numerical scalar | tick
Non-Numerical scalar | tick
Numerical vectors | tick
Non-Numerical vectors | tick

This function does not support disaggregation.

Square Sum

Returns the sum of the squares of the contributions.

Support

null values are strictly ignored. If all the values aggregated into the output value are null, the output value is null as well.

Type | Support | Aggregated Type | Details
Numerical scalar | tick | Same as source type* | *int values are aggregated as long to prevent overflows.
Non-Numerical scalar | cross | - | -
Numerical vectors | cross | - | -
Non-Numerical vectors | cross | - | -

This function supports disaggregation.

Standard deviation

Calculates the standard deviation of the contributions using either the population or sample formulas depending on the plugin key.

Support

null values are strictly ignored. If all the values aggregated into the output value are null, the output value is null as well.

Type | Support | Aggregated Type | Details
Numerical scalar | tick | double | All values are aggregated as doubles
Non-Numerical scalar | cross | - | -
Numerical vectors | tick | - | -
Non-Numerical vectors | cross | - | -

This function supports disaggregation.

Sum

Returns the sum of the contributions.

Support

null values are strictly ignored. If all the values aggregated into the output value are null, the output value is null as well.

Type | Support | Aggregated Type | Details
Numerical scalar | tick | Same as source type | -
Non-Numerical scalar | cross | - | -
Numerical vectors | tick | Same as source type | Returns a vector containing the term-by-term sum. Supports aggregating vectors of different sizes: the aggregated value's size is that of the largest source vector, the smaller ones being padded with zeroes.
Non-Numerical vectors | cross | - | -

This function supports disaggregation.
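The term-by-term vector behavior of Sum, with the shorter vector padded with zeroes, can be sketched on plain double[] arrays. VectorSum is a hypothetical helper, not the ActivePivot IVector API.

```java
import java.util.Arrays;

// Hypothetical term-by-term vector SUM with zero padding.
final class VectorSum {
    private VectorSum() {}

    /** Returns a vector as long as the larger input, shorter inputs contributing 0 past their end. */
    static double[] aggregate(double[] aggregate, double[] contribution) {
        // Arrays.copyOf pads with 0.0 when the requested length exceeds the original one.
        double[] result = Arrays.copyOf(aggregate, Math.max(aggregate.length, contribution.length));
        for (int i = 0; i < contribution.length; i++) {
            result[i] += contribution[i];
        }
        return result;
    }
}
```

For clarity, this sketch returns a new array on every call; a real binding would rather write into the aggregated vector in place.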

Timestamp

Records the timestamp of the last update. When contributing a source value to the aggregated value, this function will check which of the two temporal values is the latest, and keep it as the aggregated value.

Support

This function supports int and long representations of a timestamp. null is not supported in that case. It also supports ITimestamp objects. In that case, null values are supported.

Disaggregating is also an update. For this function, disaggregating and aggregating values are the same thing.

Variance

Calculates the variance of the contributions using either the population or sample formulas depending on the plugin key.

Support

null values are strictly ignored. If all the values aggregated into the output value are null, the output value is null as well.

Type | Support | Aggregated Type | Details
Numerical scalar | tick | double | All values are aggregated as doubles
Non-Numerical scalar | cross | - | -
Numerical vectors | tick | - | -
Non-Numerical vectors | cross | - | -

This function supports disaggregation.
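Variance and Standard deviation can both support disaggregation if the aggregate keeps the count, the sum and the sum of squares of the contributions. The hypothetical VarianceAggregate below sketches the population formula (dividing by n) and the sample formula (dividing by n - 1). It uses the naive sum-of-squares formula, which is simpler but numerically less stable than approaches such as Welford's algorithm, and returning null below two contributions for the sample variance is an assumption of this sketch.

```java
// Hypothetical VARIANCE / STD_DEV aggregate supporting disaggregation.
final class VarianceAggregate {
    private long count;
    private double sum;
    private double sumOfSquares;

    void aggregate(double v) {
        count++;
        sum += v;
        sumOfSquares += v * v;
    }

    void disaggregate(double v) {
        count--;
        sum -= v;
        sumOfSquares -= v * v;
    }

    /** Population variance: E[x^2] - E[x]^2, or null without contributions. */
    Double populationVariance() {
        if (count == 0) {
            return null;
        }
        double mean = sum / count;
        return sumOfSquares / count - mean * mean;
    }

    /** Sample variance: (sum of squares - sum^2 / n) / (n - 1). */
    Double sampleVariance() {
        if (count < 2) {
            return null;
        }
        return (sumOfSquares - sum * sum / count) / (count - 1);
    }

    Double populationStdDev() {
        Double variance = populationVariance();
        return variance == null ? null : Math.sqrt(variance);
    }
}
```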

Modifying native Aggregation Functions

The Aggregation Functions in ActivePivot are PluginValues. As such, they are registered in the global Registry, where they can be replaced by a different implementation.

For instance, the SumFunction outputs int primitive values when summing int source values. A user may know that the volume of data aggregated with SUM could exceed the int range, in which case Java int arithmetic silently wraps around instead of raising an error. The user can then override the SumFunction with a CustomSumFunction, and modify its getAggregatedType(sourceType) method to return Types.TYPE_LONG instead of Types.TYPE_INT when the source type is Types.TYPE_INT.

Then, this user needs to ensure CustomSumFunction will take the place of SumFunction in the Registry.
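The overflow risk that motivates such a CustomSumFunction can be reproduced with plain Java arithmetic. SumOverflow is a hypothetical helper; summing into a long is what returning Types.TYPE_LONG as the aggregated type would allow.

```java
// Hypothetical helper contrasting an int aggregate with a long aggregate.
final class SumOverflow {
    private SumOverflow() {}

    static int sumAsInt(int... values) {
        int total = 0;
        for (int v : values) {
            total += v; // silently wraps around on overflow
        }
        return total;
    }

    static long sumAsLong(int... values) {
        long total = 0;
        for (int v : values) {
            total += v; // each int is widened to long before the addition
        }
        return total;
    }
}
```

Summing Integer.MAX_VALUE and 1 yields Integer.MIN_VALUE with the int aggregate, but 2147483648 with the long one.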

Writing a custom Aggregation Function

Writing a custom aggregation function requires implementing the 3 aforementioned contracts:

  • IGenericAggregationFunction, or one of its specialized interfaces or abstract classes
  • IGenericAggregation, or one of its specialized interfaces or abstract classes
  • IAggregationBinding, or one of its specialized abstract classes

Let's write a function that counts the number of strictly positive contributions.

First, one needs to implement the IGenericAggregationFunction contract. Since this function will only count one source column at a time, it can extend the AAggregationFunction abstract class to reduce boilerplate code. The function CountIfPositiveFunction needs to define the key that uniquely identifies it in the Registry, and to make itself known to the Registry:

@QuartetPluginValue(intf = IGenericAggregationFunction.class)
public class CountIfPositiveFunction extends AAggregationFunction {

    public static final String PLUGIN_KEY = "COUNT_IF_POSITIVE";

    public CountIfPositiveFunction() {
        super(PLUGIN_KEY);
    }

    @Override
    public Object key() {
        return PLUGIN_KEY;
    }

The function can now be called wherever it is needed, through its plugin key. Let's now write its logic.

The function will always output long values, meaning it can override the getAggregatedType method like so:

    @Override
    public int getAggregatedType(final int sourceType) {
        if (Types.isDictionary(sourceType) || !Types.isNumericalType(sourceType)) {
            throw unsupportedSourceType(sourceType);
        }
        return Types.TYPE_LONG;
    }

Since the function checks for positive values, the source type must be numerical. The values cannot be dictionarized either, as the function needs to read the actual value to perform this check.

Then, the function must produce an IGenericAggregation or, in this case with a single source column, its specialization IAggregation:

    @Override
    public IAggregation createAggregation(final String sourceIdentifier, final int sourceType) {
        return new CountIfPositiveAggregation(sourceIdentifier, this, sourceType, getAggregatedType(sourceType));
    }

CountIfPositiveAggregation can extend AAggregation<CountIfPositiveFunction>. That way, boilerplate getters are already defined in the parent class.

public static class CountIfPositiveAggregation extends AAggregation<CountIfPositiveFunction> {

First, the IGenericAggregation must be able to return the identifier(s) of the source column(s) being aggregated, their types, the aggregated type, and the function that created it:

    public CountIfPositiveAggregation(
            final String sourceIdentifier,
            final CountIfPositiveFunction aggFun,
            final int sourceType,
            final int aggregatedType) {
        super(sourceIdentifier, aggFun, sourceType, aggregatedType);
    }

Since this aggregation outputs Types.TYPE_LONG, its chunk factory is relatively straightforward:

    @Override
    public IChunkFactory<?> createChunkFactory(
            final boolean isTransient,
            final IAllocationSettings allocationSettings) {
        final boolean nullable = false;
        return new ChunkFactory<>(
                isTransient,
                allocationSettings,
                (allocator, settings, chunkSize) -> allocator.allocateChunkLong(chunkSize, nullable));
    }

This aggregation could also use the helper factories from ChunkFactories. The code snippet above showcases a more generic implementation.

The aggregation must be able to bind a source of data to a column of aggregates. To do so, it must create a specialized binding:

    @Override
    public IAggregationBinding bindSource(IArrayReader source, IArrayWriter destination) {
        return new CountIfPositiveBinding(source, destination);
    }

Finally, the aggregation must be able to bind 2 columns of pre-aggregated data together: if both columns contain aggregated values obtained through the CountIfPositiveFunction, it must be able to combine them.

The positivity condition is already checked by the CountIfPositiveBinding when source values are aggregated, so pre-aggregated values are plain counts. The bindings of the SumFunction can therefore be reused to sum the counts of the two columns.

    @Override
    public IAggregationBinding bindAggregates(IArrayReader source, IArrayWriter destination) {
        return new SumBindingLong(source, destination);
    }

Let's now write the CountIfPositiveBinding:

public static class CountIfPositiveBinding extends AAggregationBinding {

    private final IArrayReader source;
    private final IArrayWriter aggregates;

    public CountIfPositiveBinding(final IArrayReader source, final IArrayWriter aggregates) {
        super();
        this.source = source;
        this.aggregates = aggregates;
    }

    @Override
    public void copy(int from, int to) {
        aggregates.writeLong(to, source.readDouble(from) > 0 ? 1L : 0L);
    }

    @Override
    public void aggregate(int from, int to) {
        if (source.readDouble(from) > 0) {
            aggregates.addLong(to, 1L);
        }
    }

    @Override
    public void disaggregate(int from, int to) {
        if (source.readDouble(from) > 0) {
            aggregates.addLong(to, -1L);
        }
    }
}
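To see the copy, aggregate and disaggregate contract in isolation, the binding logic above can be simulated on plain arrays. This is a self-contained sketch, not ActivePivot code: a double[] stands in for the IArrayReader, a long[] for the IArrayWriter, and CountIfPositiveSimulation is a hypothetical name.

```java
// Hypothetical simulation of CountIfPositiveBinding on plain arrays.
final class CountIfPositiveSimulation {
    private final double[] source;
    private final long[] aggregates;

    CountIfPositiveSimulation(double[] source, long[] aggregates) {
        this.source = source;
        this.aggregates = aggregates;
    }

    /** First contribution to a cell: initialize it with 1 or 0. */
    void copy(int from, int to) {
        aggregates[to] = source[from] > 0 ? 1L : 0L;
    }

    /** Counts the contribution only if it is strictly positive. */
    void aggregate(int from, int to) {
        if (source[from] > 0) {
            aggregates[to] += 1L;
        }
    }

    /** Removes a strictly positive contribution from the count. */
    void disaggregate(int from, int to) {
        if (source[from] > 0) {
            aggregates[to] -= 1L;
        }
    }

    /** Pre-aggregated counts are combined with a plain sum. */
    static long mergeCounts(long left, long right) {
        return left + right;
    }
}
```

mergeCounts mirrors why bindAggregates can reuse the SumFunction's bindings: once positivity has been checked, pre-aggregated counts only need to be added.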

Advanced considerations

Strongly typed aggregation functions

In the CountIfPositiveFunction example, one may note that the sources are read using the specialized readDouble method in order to perform the comparison to 0. This allows the example to function correctly for the four numerical primitive types supported in ActivePivot, using the automatic primitive conversions of the language.

However, a user can write a specifically typed aggregation binding for each of the supported primitive types, thus avoiding this unnecessary conversion.

To do so, one must rewrite the IAggregation createAggregation(final String sourceIdentifier, final int sourceType) method so that it handles each source type differently, and follow the previous tutorial's flow for each type.

Null support

Note that, for simplicity's sake, the above snippets do not handle null values in the sources. The correct condition should probably be: if (!source.isNull(from) && source.readDouble(from) > 0). In that case, it would be even better to create two bindings: CountIfPositiveBinding and CountIfPositiveBindingWithNullSupport. The second one checks for null values in the sources, at the cost of one null check per contribution, while the first one is specialized for non-nullable sources and skips that check for better performance.
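The two-binding approach can be sketched as a factory that picks the implementation once, when the binding is created, so the per-contribution code never tests the nullability flag. SourceColumn, CountBinding and CountIfPositiveBindings are hypothetical stand-ins, not ActivePivot interfaces.

```java
// Hypothetical minimal source column.
interface SourceColumn {
    boolean isNull(int row);

    double readDouble(int row);
}

// Hypothetical binding: aggregates the row "from" of the source into aggregates[to].
interface CountBinding {
    void aggregate(int from, long[] aggregates, int to);
}

final class CountIfPositiveBindings {
    private CountIfPositiveBindings() {}

    /** Chooses the binding once, based on whether the source column can contain null. */
    static CountBinding create(SourceColumn source, boolean sourceIsNullable) {
        if (sourceIsNullable) {
            // Pays for one null check per contribution.
            return (from, aggregates, to) -> {
                if (!source.isNull(from) && source.readDouble(from) > 0) {
                    aggregates[to]++;
                }
            };
        }
        // No null check: only valid if the source can never contain null.
        return (from, aggregates, to) -> {
            if (source.readDouble(from) > 0) {
                aggregates[to]++;
            }
        };
    }
}
```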

Mutability/Immutability

While writing bindings for primitive types is relatively straightforward, aggregating objects is a much more delicate matter, because of the memory footprint of Objects. Since Java passes object references by value, the aggregated column and the source may end up referencing the same object, so one must carefully plan around the potential modification of the aggregated Object.

For instance, let's write an aggregation function that takes a source of java.time.Duration values (each representing the time that passed between the submission of a market order and its execution), and computes the total time it took to execute all orders.

This can be adapted to compute the average time for an order execution, but for simplicity's sake, this example showcases the SUM implementation.

Let's say these are the values stored in a database table named trades (notation: Duration(1, 500) denotes a Duration of 1 second and 500 nanoseconds, such as the one returned by Duration.ofSeconds(1, 500)):

TradeId | dollar_value | execution_time
0 | 1000 | Duration(1, 500)
1 | 50 | Duration(0, 65000)
2 | 250 | Duration(12, 888888)

To compute execution_time.SUM, the binding can start by copying Duration(1, 500) in the aggregated column.

To simplify this example, let's assume that all data must be aggregated at index 0 of the aggregated column.

First, binding.copy(0, 0) is called, with this definition:

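public class DurationBinding extends AAggregationBinding {
    // The source (IArrayReader) and destination (IArrayWriter) fields are declared
    // as in CountIfPositiveBinding, and omitted here for brevity.
    @Override
    public void copy(int from, int to) {
        final Duration sourceDuration = (Duration) source.read(from);
        destination.write(to, sourceDuration); // stores a reference, not a copy
    }
}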

Since Java objects are handled through references, this action writes into the corresponding cell of the aggregated column a reference to the actual Duration(1, 500) instance stored on the heap, not a copy of it. Then, to aggregate the next value, the binding can call binding.aggregate(1, 0):

public class DurationBinding extends AAggregationBinding {
    @Override
    public void aggregate(int from, int to) {
        final Duration sourceDuration = (Duration) source.read(from);
        final Duration currentDuration = (Duration) destination.read(to);
        // Duration is immutable: plus(...) returns a new instance that must be written back.
        destination.write(to, currentDuration.plus(sourceDuration));
    }
}

However, according to the javadoc of Duration#plus(Duration), doing so creates a new Duration object as the result of the sum. This means that every aggregation operation creates a new object on the heap, leaving the previous one to be garbage collected once it is no longer referenced.

This is problematic, because aggregation bindings are performance sensitive, and this method may be called millions of times, thus creating millions of transient instances of the class java.time.Duration and needlessly putting pressure on the application.
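The allocation behavior of java.time.Duration can be observed directly with the standard library. ImmutableDurationDemo is a hypothetical helper, and the values match the trades table above.

```java
import java.time.Duration;

// Hypothetical helper: sums durations the only way an immutable type allows.
final class ImmutableDurationDemo {
    private ImmutableDurationDemo() {}

    static Duration sumAll(Duration... durations) {
        Duration total = Duration.ZERO;
        for (Duration d : durations) {
            total = total.plus(d); // allocates a new Duration for each non-zero contribution
        }
        return total;
    }
}
```

Summing Duration.ofSeconds(1, 500), Duration.ofNanos(65000) and Duration.ofSeconds(12, 888888) gives Duration.ofSeconds(13, 954388), and none of the three inputs is modified along the way.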

Since there is no way around this issue while using the immutable java.time.Duration, there is a strong incentive to create a mutable copy of the class, which will be called MutableDuration in the rest of this page.

This isn't so far-fetched: before Java 8, the standard time representations were so poor that Joda-Time (org.joda.time) was developed as a very popular open-source alternative. org.joda.time.MutableDateTime was created as part of this alternative, and serves as a model for this example.

MutableDuration will have the exact same API as java.time.Duration, but for this example's sake, public Duration plus(Duration) is replaced by public void plus(MutableDuration). This method mutates the current object.
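A minimal MutableDuration could look like the following sketch. It is hypothetical: only the members used on this page (from, plus and minus) are implemented, along with two getters, and the seconds/nanoseconds normalization mimics that of java.time.Duration.

```java
// Hypothetical mutable duration: seconds plus nanoseconds in [0, 999_999_999].
final class MutableDuration {
    private static final long NANOS_PER_SECOND = 1_000_000_000L;
    private long seconds;
    private int nanos;

    MutableDuration(long seconds, long nanoAdjustment) {
        set(seconds, nanoAdjustment);
    }

    /** Clone factory: the copy shares no state with the original. */
    static MutableDuration from(MutableDuration other) {
        return new MutableDuration(other.seconds, other.nanos);
    }

    /** Mutates this instance instead of allocating a new one. */
    void plus(MutableDuration other) {
        set(seconds + other.seconds, (long) nanos + other.nanos);
    }

    /** Mutates this instance instead of allocating a new one. */
    void minus(MutableDuration other) {
        set(seconds - other.seconds, (long) nanos - other.nanos);
    }

    long getSeconds() {
        return seconds;
    }

    int getNano() {
        return nanos;
    }

    /** Carries whole seconds out of the nanosecond adjustment, including negative ones. */
    private void set(long secs, long nanoAdjustment) {
        this.seconds = Math.addExact(secs, Math.floorDiv(nanoAdjustment, NANOS_PER_SECOND));
        this.nanos = (int) Math.floorMod(nanoAdjustment, NANOS_PER_SECOND);
    }

    @Override
    public String toString() {
        return "MutableDuration(" + seconds + ", " + nanos + ")";
    }
}
```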

The binding now evolves to become:

public class MutableDurationBinding extends AAggregationBinding {
    @Override
    public void aggregate(int from, int to) {
        final MutableDuration sourceDuration = (MutableDuration) source.read(from);
        final MutableDuration currentDuration = (MutableDuration) destination.read(to);
        // No need to write to the destination column anymore: currentDuration is modified in place.
        currentDuration.plus(sourceDuration);
    }
}

To recap,

  • binding.copy(0, 0) will copy MutableDuration(1, 500) to the aggregated column, in the cell of index 0.
  • binding.aggregate(1, 0) will aggregate MutableDuration(0, 65000) into the current value in the cell at index 0.

However, this operation now mutates the instance MutableDuration(1, 500), which is the same instance as the one stored in the trades table: it just became MutableDuration(1, 65500). Because the aggregated column only holds a reference to the source object, the MutableDuration being modified is the very object on the Java heap that is referenced from the database table, and the source data has been corrupted.
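The aliasing problem can be reproduced without ActivePivot, using arrays for the table and the aggregated column. MutableAmount is a hypothetical, simplified stand-in for MutableDuration that holds a single long, and AliasingDemo is a hypothetical helper.

```java
// Hypothetical, simplified stand-in for MutableDuration.
final class MutableAmount {
    long value;

    MutableAmount(long value) {
        this.value = value;
    }

    void plus(MutableAmount other) {
        value += other.value;
    }
}

final class AliasingDemo {
    private AliasingDemo() {}

    /**
     * Naive copy followed by in-place aggregation into a single aggregated cell.
     * Returns the aggregated value, which is the very same object as table[0].
     */
    static MutableAmount aggregateNaively(MutableAmount[] table) {
        MutableAmount[] aggregates = new MutableAmount[1];
        aggregates[0] = table[0];           // copy(0, 0): stores a reference, not a clone
        for (int row = 1; row < table.length; row++) {
            aggregates[0].plus(table[row]); // aggregate(row, 0): also mutates table[0]
        }
        return aggregates[0];
    }
}
```

After aggregating rows holding 500 and 65000, both the aggregated cell and the first row of the "table" hold 65500.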

To prevent that from ever happening, the copy method of the binding can be smarter: it can clone the MutableDuration instead of simply passing it, and store the cloned object in the aggregated column:

public class MutableDurationBinding extends AAggregationBinding {
    @Override
    public void copy(int from, int to) {
        final MutableDuration sourceDuration = (MutableDuration) source.read(from);
        // Write a clone that is only owned by the aggregated column.
        destination.write(to, MutableDuration.from(sourceDuration));
    }
}

This solution will not have any side effect, but it will still prove costly memory-wise. Indeed, if there are lots of aggregated values that are computed simply by copying an initial contribution, without ever aggregating further contributions, it will have cloned many MutableDuration objects to no benefit.

An optimal solution only clones when needed. It first copies the value, but marks it as unmodifiable. Then, the first time aggregate is called on that cell, it clones the current value and calls MutableDuration#plus(MutableDuration) on the clone.

That cloned object then remains the one stored in the aggregated column, and subsequent aggregations are free to modify it without impacting any other code that could hold a reference to the original object on the Java heap.
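The lazy cloning strategy can be sketched with a read-only flag per cell of the aggregated column. This is a hypothetical illustration of the idea, not the AAggregationBindingCloneable API: MutableAmount is a simplified stand-in for MutableDuration, and LazyCloneColumn merges the roles of the binding and the aggregated column.

```java
// Hypothetical, simplified stand-in for MutableDuration.
final class MutableAmount {
    long value;

    MutableAmount(long value) {
        this.value = value;
    }

    void plus(MutableAmount other) {
        value += other.value;
    }
}

// Hypothetical aggregated column whose cells are cloned on their first write.
final class LazyCloneColumn {
    private final MutableAmount[] cells;
    private final boolean[] readOnly; // true while a cell still references a source object
    private int clones;               // number of clones performed, for illustration

    LazyCloneColumn(int size) {
        cells = new MutableAmount[size];
        readOnly = new boolean[size];
    }

    /** Stores the source reference without cloning, and marks the cell read-only. */
    void copy(MutableAmount source, int to) {
        cells[to] = source;
        readOnly[to] = true;
    }

    /** Aggregates into a cell, cloning it first if it still references a source object. */
    void aggregate(MutableAmount source, int to) {
        if (cells[to] == null) {
            copy(source, to);
            return;
        }
        writable(to).plus(source);
    }

    /** Returns an object owned by this column, cloning the shared one on first write. */
    private MutableAmount writable(int to) {
        if (readOnly[to]) {
            cells[to] = new MutableAmount(cells[to].value);
            readOnly[to] = false;
            clones++;
        }
        return cells[to];
    }

    MutableAmount read(int index) {
        return cells[index];
    }

    int cloneCount() {
        return clones;
    }
}
```

A cell that only ever receives one contribution keeps referencing the source object and is never cloned, while a cell that receives several contributions is cloned exactly once.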

To help users achieve this ideal scenario, ActivePivot comes with the abstract aggregation binding AAggregationBindingCloneable. This binding simply requires the user to implement MutableDuration cloneAggregate(MutableDuration), on top of the regular methods of a binding. In exchange, it provides multiple methods that handle cloning for the user.

import java.util.Objects;

public class MutableDurationBinding extends AAggregationBindingCloneable<MutableDuration> {

    protected MutableDurationBinding(final IArrayReader source, final IArrayWriter destination) {
        super(source, destination);
    }

    @Override
    protected MutableDuration cloneAggregate(final MutableDuration aggregate) {
        return MutableDuration.from(aggregate);
    }

    @Override
    public void copy(final int fromPosition, final int toPosition) {
        final MutableDuration sourceDuration = readInput(fromPosition);
        writeReadOnlyAggregate(toPosition, sourceDuration); // can be null
    }

    @Override
    public void aggregate(final int fromPosition, final int toPosition) {
        final MutableDuration sourceDuration = readInput(fromPosition);
        if (Objects.isNull(sourceDuration)) {
            return; // Nothing to do.
        }
        final MutableDuration currentAggregate = readReadOnlyAggregate(toPosition);
        if (Objects.isNull(currentAggregate)) {
            copy(fromPosition, toPosition);
        } else {
            // Handles cloning if necessary.
            final MutableDuration writableCurrentAggregate = readWritableAggregate(toPosition);
            writableCurrentAggregate.plus(sourceDuration);
        }
    }

    @Override
    public void disaggregate(final int fromPosition, final int toPosition) {
        final MutableDuration sourceDuration = readInput(fromPosition);
        if (Objects.isNull(sourceDuration)) {
            return; // Nothing to do.
        }
        final MutableDuration currentAggregate = readReadOnlyAggregate(toPosition);
        if (Objects.isNull(currentAggregate)) {
            throw new IllegalStateException("Cannot disaggregate from a null value");
        } else if (sourceDuration == currentAggregate) {
            // Shortcut: the cell still references the exact object being removed, so replace it by null.
            removeAggregate(toPosition);
        } else {
            // Handles cloning if necessary.
            final MutableDuration writableCurrentAggregate = readWritableAggregate(toPosition);
            writableCurrentAggregate.minus(sourceDuration);
        }
    }

}

Aggregating Vectors

Vectors are one of the most frequently used features of ActivePivot. Querying and aggregating vectors is extremely CPU- and memory-intensive, and should be done with great care.

ActivePivot provides a specialization of the class AAggregationBindingCloneable, designed for vector aggregation: AVectorAggregationBinding. This binding handles the internal aspects of vector aggregation: on-heap and off-heap allocation, cloning, reference counting for garbage collection, etc.

In order to implement a vector aggregation binding, it is strongly recommended to extend this class.

Let's write an aggregation function that produces the term-by-term sum of the source vectors, restricted to their first two elements.

Writing this function is harder than it looks, for the following reasons:

  • The source IVectors may be on-heap or off-heap.
  • The aggregated IVector can be lazily copied, as seen in the previous example with MutableDuration.
  • Acquiring and storing a sub-part of an IVector may have an impact on the vector's reference counting.
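The arithmetic itself is simple; the difficulties listed above all come from the IVector machinery. Stripped of it, the computation can be sketched on plain double[] arrays. SubVectorSum is a hypothetical helper, and letting sources shorter than two elements contribute only the elements they have is an assumption of this sketch.

```java
// Hypothetical plain-array version of SUB_VECTOR_SIZE_2_SUM.
final class SubVectorSum {
    private static final int SIZE = 2;

    private SubVectorSum() {}

    /** Term-by-term sum of the sources, restricted to their first SIZE elements. */
    static double[] sum(double[]... sources) {
        double[] result = new double[SIZE];
        for (double[] source : sources) {
            for (int i = 0; i < Math.min(SIZE, source.length); i++) {
                result[i] += source[i];
            }
        }
        return result;
    }
}
```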

First, one needs to implement the IGenericAggregationFunction contract. Since this function only aggregates one source column at a time, it can extend the AAggregationFunction abstract class to reduce boilerplate code. The SubVectorSumFunction needs to add itself to the Registry using a unique key:

@QuartetPluginValue(intf = IGenericAggregationFunction.class)
public class SubVectorSumFunction extends AAggregationFunction {

    public static final String PLUGIN_KEY = "SUB_VECTOR_SIZE_2_SUM";

    public SubVectorSumFunction() {
        super(PLUGIN_KEY);
    }

    @Override
    public Object key() {
        return PLUGIN_KEY;
    }

The function can now be called wherever it is needed, using its plugin key. Let's write its logic. The function always outputs a numerical array of the same type as its source, meaning it can override the getAggregatedType method like so:

@Override
public int getAggregatedType(final int sourceType) {
if (sourceType == Types.TYPE_INT_ARRAY) {
return sourceType;
} else if (sourceType == Types.TYPE_LONG_ARRAY) {
return sourceType;
} else if (sourceType == Types.TYPE_FLOAT_ARRAY) {
return sourceType;
} else if (sourceType == Types.TYPE_DOUBLE_ARRAY) {
return sourceType;
} else {
throw unsupportedSourceType(sourceType);
}
}

Then, the function must create an IAggregation, which is the IGenericAggregation specialization used when a single source column is aggregated:

@Override
protected IAggregation createAggregation(final String sourceIdentifier, final int sourceType) {
return new SubVectorSumAggregation(sourceIdentifier, this, sourceType, getAggregatedType(sourceType));
}

SubVectorSumAggregation can extend AAggregation<SubVectorSumFunction> to inherit boilerplate getters:

public static class SubVectorSumAggregation extends AAggregation<SubVectorSumFunction> {

First, the IGenericAggregation must be able to return the identifier(s) of the source column(s) being aggregated, their types, the aggregated type, and the function that created it. Passing these values to the AAggregation constructor is enough:

public SubVectorSumAggregation(
final String sourceIdentifier,
final SubVectorSumFunction aggFun,
final int sourceType,
final int aggregatedType) {
super(sourceIdentifier, aggFun, sourceType, aggregatedType);
}

Since this aggregation outputs vectors, the factory is relatively straightforward:

@Override
public IChunkFactory<?> createChunkFactory(
final boolean isTransient,
final IAllocationSettings allocationSettings) {
return ChunkFactories.chunkMarkedVectorFactory(getInputDataType(), isTransient, allocationSettings);
}

This time, the example uses a helper method from the class ChunkFactories. Vectors must be aggregated into ChunkMarkedVector chunks, and the helper factory guarantees it.

The aggregation must be able to bind a source of data to a column of aggregates. To do so, it must create a specialized binding:

@Override
public IAggregationBinding bindSource(final IArrayReader source, final IArrayWriter aggregates) {
return new SubVectorSumBinding(source, aggregates);
}
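Conceptually, a binding ties one source column to one aggregate column: each call reads the value at fromPosition in the source and folds it into the aggregate at toPosition, so several source rows can land on the same aggregate position. The sketch below models that contract with plain arrays. ToyBinding and its aggregate method are hypothetical names chosen to mirror doAggregate; unlike the real binding, the sketch copies eagerly and has no notion of read-only views.

```java
import java.util.Arrays;

/** Hypothetical binding between a source column and an aggregate column, both modeled as arrays. */
final class ToyBinding {
    private final double[][] source;     // one vector per source row
    private final double[][] aggregates; // one vector per aggregate position (null = empty)

    ToyBinding(final double[][] source, final double[][] aggregates) {
        this.source = source;
        this.aggregates = aggregates;
    }

    /** Adds the first two elements of the source value at fromPosition into the aggregate at toPosition. */
    void aggregate(final int fromPosition, final int toPosition) {
        final double[] input = source[fromPosition];
        if (input == null) {
            return; // null source values do not contribute
        }
        if (input.length < 2) {
            throw new IllegalArgumentException("Source vectors must contain at least two elements.");
        }
        if (aggregates[toPosition] == null) {
            aggregates[toPosition] = Arrays.copyOf(input, 2); // first contribution: eager copy
            return;
        }
        aggregates[toPosition][0] += input[0];
        aggregates[toPosition][1] += input[1];
    }

    public static void main(final String[] args) {
        final double[][] source = {{1, 2, 3}, {4, 5, 6}, {7, 8, 9}};
        final double[][] aggregates = new double[2][];
        final ToyBinding binding = new ToyBinding(source, aggregates);
        // Rows 0 and 2 fall into aggregate position 0, row 1 into position 1.
        binding.aggregate(0, 0);
        binding.aggregate(1, 1);
        binding.aggregate(2, 0);
        System.out.println(Arrays.deepToString(aggregates)); // [[8.0, 10.0], [4.0, 5.0]]
    }
}
```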

Finally, the aggregation must be able to bind two columns of pre-aggregated data together: when both columns contain aggregated values produced by SubVectorSumAggregation, they must be combinable.

Since SubVectorSumBinding, described below, already truncates each source vector to its first two elements when source values are aggregated, pre-aggregated values are plain two-element vectors. The vector binding of the SumFunction can therefore be reused to sum the vectors of the two columns.

@Override
public IAggregationBinding bindAggregates(final IArrayReader input, final IArrayWriter output) {
return new SumVectorAggregationBinding(input, output);
}
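Reusing a plain vector sum at the aggregate level is sound because the truncation happens once, at the source level, and element-wise addition is associative (up to floating-point rounding). The sketch below checks this with plain arrays: merging two partial results gives the same vector as aggregating all sources at once. MergePartials and its methods are hypothetical names used only for this illustration.

```java
import java.util.Arrays;

/** Checks that merging partial SUB_VECTOR_SIZE_2_SUM results matches a direct aggregation. */
final class MergePartials {

    /** Source-level aggregation: truncates each source vector to its first two elements. */
    static double[] aggregateSources(final double[]... sources) {
        final double[] result = new double[2];
        for (final double[] source : sources) {
            result[0] += source[0];
            result[1] += source[1];
        }
        return result;
    }

    /** Aggregate-level merge: a plain element-wise vector sum, no truncation needed. */
    static double[] mergeAggregates(final double[] left, final double[] right) {
        final double[] result = new double[left.length];
        for (int i = 0; i < left.length; i++) {
            result[i] = left[i] + right[i];
        }
        return result;
    }

    public static void main(final String[] args) {
        final double[] a = {1, 2, 3}, b = {4, 5, 6, 7}, c = {8, 9};
        final double[] direct = aggregateSources(a, b, c);
        final double[] merged = mergeAggregates(aggregateSources(a, b), aggregateSources(c));
        // prints [13.0, 16.0] == [13.0, 16.0]
        System.out.println(Arrays.toString(direct) + " == " + Arrays.toString(merged));
    }
}
```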

All that is left to do is to write the SubVectorSumBinding implementation, extending the abstract vector binding AVectorAggregationBinding:

public static class SubVectorSumBinding extends AVectorAggregationBinding {
public SubVectorSumBinding(final IArrayReader source, final IArrayWriter aggregates) {
super(source, aggregates);
}
@Override
protected void doCopy(final int fromPosition, final int toPosition) {
final IVector inputVec = readInput(fromPosition);
if (inputVec == null) {
writeReadOnlyAggregate(toPosition, null);
return;
}
// The sub-vector is a view over the source vector. It is written as a read-only aggregate so
// that the source data itself is never modified.
final IVector vectorToWrite = firstTwoElements(inputVec);
writeReadOnlyAggregate(toPosition, vectorToWrite);
}
@Override
protected void doAggregate(final int fromPosition, final int toPosition) {
final IVector inputVec = readInput(fromPosition);
if (inputVec == null) {
return;
}
if (readReadOnlyAggregate(toPosition) == null) {
// First contribution to this aggregate: store a read-only view instead of allocating a copy.
doCopy(fromPosition, toPosition);
return;
}
// Handles the necessary cloning behind the scenes: the initially copied sub-vector is a view
// over source data and must not be written into.
final IVector currentAggregate = readWritableAggregate(toPosition);
currentAggregate.plus(firstTwoElements(inputVec));
}
@Override
protected void doDisaggregate(final int fromPosition, final int toPosition) {
final IVector inputVec = readInput(fromPosition);
if (inputVec == null) {
return;
}
if (readReadOnlyAggregate(toPosition) == null) {
throw new IllegalStateException("Disaggregating from a null value.");
}
final IVector currentWritableAggregate = readWritableAggregate(toPosition);
currentWritableAggregate.minus(firstTwoElements(inputVec));
}
/**
 * Returns a view over the first two elements of a source vector, so that the size check is applied
 * consistently when copying, aggregating and disaggregating.
 *
 * @throws IllegalArgumentException if the vector contains fewer than two elements
 */
private static IVector firstTwoElements(final IVector vector) {
if (vector.size() < 2) {
throw new IllegalArgumentException(
"Source vectors are expected to contain at least two elements.");
}
return vector.subVector(0, 2);
}
}
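doDisaggregate is the inverse of doAggregate: it subtracts a contribution that was previously added, which is typically needed when a source value is removed or replaced. The sketch below models that round trip with plain arrays, including the error raised when disaggregating from an empty aggregate. ToyDisaggregation is a hypothetical class; it only mirrors the control flow of the binding above, without views, cloning or reference counting.

```java
import java.util.Arrays;

/** Hypothetical model of aggregate/disaggregate on a two-element sub-vector sum. */
final class ToyDisaggregation {
    private double[] aggregate; // null while nothing has been aggregated

    /** Adds the first two elements of the input, ignoring null inputs. */
    void aggregate(final double[] input) {
        if (input == null) {
            return;
        }
        if (aggregate == null) {
            aggregate = new double[2];
        }
        aggregate[0] += input[0];
        aggregate[1] += input[1];
    }

    /** Subtracts the first two elements of the input, ignoring null inputs. */
    void disaggregate(final double[] input) {
        if (input == null) {
            return;
        }
        if (aggregate == null) {
            throw new IllegalStateException("Disaggregating from a null value.");
        }
        aggregate[0] -= input[0];
        aggregate[1] -= input[1];
    }

    /** Returns a copy of the current aggregate, or null if nothing was aggregated. */
    double[] current() {
        return aggregate == null ? null : aggregate.clone();
    }

    public static void main(final String[] args) {
        final ToyDisaggregation agg = new ToyDisaggregation();
        agg.aggregate(new double[] {1, 2, 3});
        agg.aggregate(new double[] {10, 20, 30});
        agg.disaggregate(new double[] {10, 20, 30}); // e.g. the second source value was removed
        System.out.println(Arrays.toString(agg.current())); // [1.0, 2.0]
    }
}
```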