Aggregation Functions

Concept

In Database Management Systems (DBMS), an Aggregation Function, or Aggregate Function, is a function that operates on values across rows to perform mathematical calculations such as sum, average, count, minimum/maximum values, standard deviation, as well as some non-mathematical operations, like finding the range of dates covered by the values. An aggregate function takes multiple rows as input and produces a single output.

While the term also applies to OnLine Transaction Processing (OLTP) workflows, through standard SQL aggregation functions like COUNT or MIN, OnLine Analytical Processing (OLAP) systems such as Atoti are built specifically for aggregation.

Similarly to how an aggregation function is, in any DBMS, associated with some columns at query time, Atoti aggregation functions are associated with table columns to create aggregated measures.

The SQL query SELECT SUM(dollar_value) FROM trades will naturally return the sum of the values of all trades stored in the trades table. Similarly, the Atoti SumFunction can be associated with the column dollar_value to create the measure dollar_value.SUM. The same SUM value for all trades can then be found in the cube, as a top level aggregate.

The concept of aggregation is orthogonal to data storage, and should rather be considered as an analytics notion.

Vocabulary

An aggregate function takes multiple rows as input and produces a single output.

The input values are called source data, or source values, and the output value is called aggregated value.

Aggregating source value trade_value into the aggregated value trade_aggregate is the act of adding a trade_value contribution to trade_aggregate, according to the selected aggregation function. For instance, if trade_aggregate = 2 and trade_value = 3, aggregating trade_value to trade_aggregate:

  • using the SUM function will modify the aggregate trade_aggregate to trade_aggregate = 5.
  • using the MAX function will modify the aggregate trade_aggregate to trade_aggregate = 3.
  • using the SQUARE_SUM function will modify the aggregate trade_aggregate to trade_aggregate = 2 + 3*3 = 11.
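The three cases above can be sketched in plain Java. This only illustrates the arithmetic of a single aggregation step; the class and method names are hypothetical and are not part of the Atoti API.

```java
/** Illustrative only: one aggregation step for three functions (hypothetical names). */
final class AggregationStepSketch {
    private AggregationStepSketch() {}

    /** SUM: adds the contribution to the aggregate. */
    static long sum(long aggregate, long value) {
        return aggregate + value;
    }

    /** MAX: keeps the larger of the aggregate and the contribution. */
    static long max(long aggregate, long value) {
        return Math.max(aggregate, value);
    }

    /** SQUARE_SUM: adds the square of the contribution to the aggregate. */
    static long squareSum(long aggregate, long value) {
        return aggregate + value * value;
    }
}
```

With trade_aggregate = 2 and trade_value = 3, these methods return 5, 3 and 11 respectively.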

Atoti Specifics - Aggregate Providers

As mentioned in the Concept Section, Atoti is tailored for OLAP workloads. To provide better performance, Atoti's aggregation engine can store pre-aggregated data for some selected cube locations, in the AggregateProvider component.

For instance, if a bank has a million open positions on the market, visualizing the total amount involved in these trades requires retrieving a million values and summing them.

However, if the bank is divided into 15 entities, and Atoti is storing pre-aggregated data at the entity level, computing that total amount only requires summing the 15 aggregated values.

Let's use a very small example to represent the aforementioned storage component:

| Location | dollar_value.SUM |
| --- | --- |
| EntityA | 1000 |
| EntityB | 50 |
| EntityC | null |

EntityC has no open position as of end of day yesterday. Today, all 3 entities closed some positions, and opened some new ones. These updates are published to the database in one single transaction, whose content is the following:

Added facts:

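| Key | Entity | Instrument | dollar_value |
| --- | --- | --- | --- |
| 12 | EntityB | X | 1 |
| 13 | EntityB | Y | 1 |
| 14 | EntityA | Y | 264 |
| 15 | EntityC | X | 1 |
| 16 | EntityC | X | 1000 |
| 17 | EntityC | Y | 44899 |
| 18 | EntityD | Z | 150 |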

Removed facts:

| Key | Entity | Instrument | dollar_value |
| --- | --- | --- | --- |
| 2 | EntityA | X | 10 |
| 7 | EntityA | Y | 100 |
| 8 | EntityB | Y | 27 |

These changes accumulated in the database transaction are aggregated into the aggregate provider. The aggregated changes for dollar_value.SUM are:

Added:

| Location | dollar_value.SUM |
| --- | --- |
| EntityB | 1 + 1 = 2 |
| EntityA | 264 |
| EntityC | 1 + 1000 + 44899 = 46000 |
| EntityD | 150 |

Removed:

| Location | dollar_value.SUM |
| --- | --- |
| EntityA | 10 + 100 = 110 |
| EntityB | 27 |

In these tables, the entities are not in the same order as in the storage. The transactional flow that updates the aggregate provider will take care of aligning the locations for data coherence, resulting in:

| Location | dollar_value.SUM |
| --- | --- |
| EntityA | 1000 - 110 + 264 = 1154 |
| EntityB | 50 + 2 - 27 = 25 |
| EntityC | 46000 |
| EntityD | 150 |

The previous example showcases the responsibilities of the SUM aggregation function, which go further than simply taking a group of values and summing them. It knows how to initialize the first value for EntityD, how to aggregate some trades into that first copied value, and how to disaggregate other trades from the stored value. And to compute the dollar_value.SUM aggregated value for all entities, it must know how to group pre-aggregated values to output an aggregated value that represents a bigger scope.
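The update flow above can be mimicked with a small in-memory map. This sketch is plain Java and does not use the AggregateProvider API: the class and its copy, aggregate, disaggregate and total methods are hypothetical stand-ins, and it starts from the aggregated changes listed in the "Added" and "Removed" tables.

```java
import java.util.HashMap;
import java.util.Map;

/** Illustrative stand-in for a SUM column stored per location (not the Atoti API). */
final class SumProviderSketch {
    private final Map<String, Long> sums = new HashMap<>();

    /** Initializes the aggregate of a location with a first value. */
    void copy(String location, long value) {
        sums.put(location, value);
    }

    /** Adds a contribution, falling back to copy when nothing is stored yet. */
    void aggregate(String location, long value) {
        Long current = sums.get(location);
        if (current == null) {
            copy(location, value);
        } else {
            sums.put(location, current + value);
        }
    }

    /** Removes a contribution from an existing aggregate. */
    void disaggregate(String location, long value) {
        Long current = sums.get(location);
        if (current == null) {
            throw new IllegalStateException("Nothing stored for " + location);
        }
        sums.put(location, current - value);
    }

    /** Returns the stored aggregate, or null when the location has none. */
    Long get(String location) {
        return sums.get(location);
    }

    /** Groups the pre-aggregated values into a wider scope (all entities). */
    long total() {
        long total = 0L;
        for (long value : sums.values()) {
            total += value;
        }
        return total;
    }

    /** Replays the example: initial state, then added and removed changes. */
    static SumProviderSketch example() {
        SumProviderSketch provider = new SumProviderSketch();
        provider.copy("EntityA", 1000L);
        provider.copy("EntityB", 50L); // EntityC is null: nothing is stored for it
        provider.aggregate("EntityB", 2L);
        provider.aggregate("EntityA", 264L);
        provider.aggregate("EntityC", 46000L); // first value: copied
        provider.aggregate("EntityD", 150L);   // first value: copied
        provider.disaggregate("EntityA", 110L);
        provider.disaggregate("EntityB", 27L);
        return provider;
    }
}
```

Calling SumProviderSketch.example() leaves 1154, 25, 46000 and 150 in the map, and total() only has to sum these four values to answer a query on all entities.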

To be able to do all of this, Atoti aggregation functions are able to take some source columns, and an aggregated column, and bind them together. The use of that binding is outside the scope of the function, which only handles its creation.

In Atoti Server, the aggregation function concept is divided into 3 separate contracts:

  • Specified by the IGenericAggregationFunction contract, aggregation functions represent the exposed API. Each aggregation function is an IPluginValue, stored in the registry, under the IPlugin GenericAggregationFunctionPlugin. Their plugin key for the Registry can be found in the base interfaces for aggregation functions. Custom implementations must be added to the registry, through annotations or through manual insertion.

  • Aggregation Functions are singleton factories for IGenericAggregations, which can each be seen as an implementation of its associated Aggregation Function for specific data sources. IGenericAggregations are objects that keep:

    • the names of the columns that hold the data being aggregated. These may be table fields, measure names, etc... This is used to uniquely identify the sources an aggregation requires.
    • the types, from the com.activeviam.tech.chunks.api.types.Types API, of these columns.
    • the type of the aggregated data. It is the type of the result of the reduction function mentioned beforehand.
  • IGenericAggregations are used to produce IAggregationBindings, which are responsible for the computation of the aggregated values. As mentioned in the above example, the binding knows, for its given source and aggregated types, how to copy the first value into the result column via the copy method, how to aggregate subsequent values into the column via the aggregate method, and how to disaggregate values from the result column when updating an AggregateProvider with removals in the underlying database via the disaggregate method.

Bindings have 3 methods, copy, aggregate, and disaggregate. The disaggregate method is only ever used for AggregateProvider updates, when underlying data have been removed. This means that it is not necessary to implement that method when writing a custom aggregation function for a Dynamic Aggregation, for instance. In such a case, one should defensively override and throw in IGenericAggregationFunction#withRemovalSupport.

Natively Supported Aggregation Functions

This is the list of natively supported aggregation functions, in alphabetical order.

The following notations are used:

| Symbol | Meaning |
| --- | --- |
| tick | fully supported |
| plus | partially supported |
| cross | will not be supported |

In cases where null is supported, the aggregated type will depend on the nullability of the types of the sources.

Average

Returns the average of the source values. The average is defined as the sum of the contributions, divided by the number of contributions.

Support

null values are strictly ignored. They do not influence the sum of the contributions, nor do they influence the number of contributions. If all the values aggregated into the output value are null, the output value is null as well.

| Type | Support | Aggregated Type | Details |
| --- | --- | --- | --- |
| Numerical scalar | tick | double | |
| Non-Numerical scalar | cross | | |
| Numerical vectors | tick | double vector | Returns a vector containing the term by term average. Aggregated vectors must all have the same size. |
| Non-Numerical vectors | cross | | |

This function supports disaggregation. Disaggregating a value removes its contribution from the sum, and reduces the number of contributions. Disaggregating down to zero contributions will return null, to prevent dividing by zero.
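A minimal sketch of a state that behaves this way, assuming the sum and the number of contributions are stored side by side. It is plain Java with hypothetical names, not the Atoti implementation.

```java
/** Illustrative average state supporting disaggregation (hypothetical, not the Atoti implementation). */
final class AverageSketch {
    private double sum;
    private long count;

    /** Adds a contribution; null contributions are ignored. */
    void aggregate(Double value) {
        if (value == null) {
            return;
        }
        sum += value;
        count++;
    }

    /** Removes a previously aggregated contribution; null is ignored. */
    void disaggregate(Double value) {
        if (value == null) {
            return;
        }
        if (count == 0) {
            throw new IllegalStateException("No contribution left to remove");
        }
        sum -= value;
        count--;
    }

    /** Returns the average, or null when no contribution remains (avoids dividing by zero). */
    Double result() {
        return count == 0 ? null : Double.valueOf(sum / count);
    }
}
```

Aggregating 2.0, null and 4.0 yields 3.0. Disaggregating 2.0 then yields 4.0, and disaggregating 4.0 brings the result back to null.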

Copy

Copies the given source value and replaces the aggregated value with it.

Support

null values are supported. Copying null will effectively write null in place of the previous aggregated value. The type of the aggregated value is exactly the type of the source values.

| Type | Support |
| --- | --- |
| Numerical scalar | tick |
| Non-Numerical scalar | tick |
| Numerical vectors | tick |
| Non-Numerical vectors | tick |

This function does not support aggregating or disaggregating. It only supports replacing the current value with the source value.

Count

Counts the number of contributions to the aggregated value, no matter the contributed values.

Support

null values are strictly ignored. If all the values aggregated into the output value are null, the output value is null as well. This function returns long values.

| Type | Support |
| --- | --- |
| Numerical scalar | tick |
| Non-Numerical scalar | tick |
| Numerical vectors | tick |
| Non-Numerical vectors | tick |

This function supports disaggregation. Disaggregating a value reduces the number of contributions.

Distinct Count

Counts the number of distinct non-null contributions to the aggregated value.

Support

null values are strictly ignored. If all the values aggregated into the output value are null, the output value is null as well. This function returns long values.

| Type | Support |
| --- | --- |
| Numerical scalar | tick |
| Non-Numerical scalar | tick |
| Numerical vectors | tick |
| Non-Numerical vectors | tick |

This function supports disaggregation, and is memory intensive, whether disaggregation support is needed or not: for a single aggregated value of the distinct count, a history of all the distinct contributions that have been counted so far must be kept in memory.
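To see where the memory goes, here is a minimal sketch that keeps one entry per distinct contribution, with an occurrence counter so that removals stay correct. The data structure is an assumption made for illustration; the way Atoti actually stores this history is not described here.

```java
import java.util.HashMap;
import java.util.Map;

/** Illustrative distinct count with removal support (hypothetical names). */
final class DistinctCountSketch {
    // One entry per distinct value: this is what makes the function memory intensive.
    private final Map<Object, Integer> occurrences = new HashMap<>();

    /** Records a contribution; null contributions are ignored. */
    void aggregate(Object value) {
        if (value != null) {
            occurrences.merge(value, 1, Integer::sum);
        }
    }

    /** Forgets one occurrence; the value stops being counted once none remains. */
    void disaggregate(Object value) {
        if (value != null) {
            occurrences.computeIfPresent(value, (key, n) -> n == 1 ? null : n - 1);
        }
    }

    /** Returns the number of distinct values, or null when there is none. */
    Long result() {
        if (occurrences.isEmpty()) {
            return null;
        }
        return Long.valueOf(occurrences.size());
    }
}
```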

Gross Sum

Returns the sum of the absolute values of the contributions.

Support

null values are strictly ignored. If all the values aggregated into the output value are null, the output value is null as well.

| Type | Support | Aggregated Type | Details |
| --- | --- | --- | --- |
| Numerical scalar | tick | Same as source type | |
| Non-Numerical scalar | cross | | |
| Numerical vectors | tick | Same as source type | Returns a vector containing the term by term gross sum. Supports aggregating vectors of different sizes. The aggregated value's size is the largest of the source vectors, the smaller ones being padded with zeroes. |
| Non-Numerical vectors | cross | | |

This function supports disaggregation.

Long

Returns the sum of the positive contributions.

Support

null values are strictly ignored. If all the values aggregated into the output value are null, the output value is null as well.

| Type | Support | Aggregated Type | Details |
| --- | --- | --- | --- |
| Numerical scalar | tick | Same as source type | |
| Non-Numerical scalar | cross | | |
| Numerical vectors | tick | Same as source type | Returns a vector containing the term by term long sum. Supports aggregating vectors of different sizes. The aggregated value's size is the largest of the source vectors, the smaller ones being padded with zeroes. |
| Non-Numerical vectors | cross | | |

This function supports disaggregation.

Max

Returns the maximum of the contributions.

Support

null values are strictly ignored. If all the values aggregated into the output value are null, the output value is null as well.

Objects implementing Comparable are NOT supported.

| Type | Support | Aggregated Type | Details |
| --- | --- | --- | --- |
| Numerical scalar | tick | Same as source type | |
| Non-Numerical scalar | cross | | |
| Numerical vectors | cross | | |
| Non-Numerical vectors | cross | | |

This function does not support disaggregation. A specialized version of this function, MaxHistoryFunction, supports the disaggregation of values. To do so, it must keep the entire history of contributed values, to be able to retrieve the next maximum. Thus, its usage is memory intensive.
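The reason a history is needed can be sketched as follows: once the current maximum is removed, the next one can only be found if the other contributions are still known. The sorted map below is an assumption chosen for illustration and does not describe how MaxHistoryFunction is implemented.

```java
import java.util.TreeMap;

/** Illustrative maximum with removal support (hypothetical names). */
final class MaxHistorySketch {
    // Every contributed value is kept, with its number of occurrences.
    private final TreeMap<Double, Integer> history = new TreeMap<>();

    /** Records a contribution; null contributions are ignored. */
    void aggregate(Double value) {
        if (value != null) {
            history.merge(value, 1, Integer::sum);
        }
    }

    /** Removes one occurrence of a previously contributed value. */
    void disaggregate(Double value) {
        if (value != null) {
            history.computeIfPresent(value, (key, n) -> n == 1 ? null : n - 1);
        }
    }

    /** Returns the largest remaining value, or null when the history is empty. */
    Double result() {
        return history.isEmpty() ? null : history.lastKey();
    }
}
```

After aggregating 3.0, 7.0 and 5.0 the result is 7.0, and disaggregating 7.0 falls back to 5.0, which a plain running maximum could not recover.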

Median

Returns the median of the contributions. In other words, if there is an odd number of contributions, it returns their middle value. If there is an even number of contributions, the average of the two values that are closest to the middle is returned.

Support

null values are strictly ignored. If all the values aggregated into the output value are null, or no values are aggregated, the output value is null as well.

| Type | Support | Aggregated Type | Details |
| --- | --- | --- | --- |
| Numerical scalar | tick | double | |
| Non-Numerical scalar | cross | | |
| Numerical vectors | cross | | |
| Non-Numerical vectors | cross | | |

This function supports disaggregation, albeit with poor performance. As it must keep the entire history of contributed values, this function is quite memory-intensive.
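The definition given at the top of this section can be sketched in a few lines of plain Java. The class name is hypothetical and the sketch recomputes the median from the full list of contributions, which is exactly why the whole history has to stay in memory.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

/** Illustrative median over a list of contributions (hypothetical names). */
final class MedianSketch {
    private MedianSketch() {}

    /** Returns the median of the non-null contributions, or null if there is none. */
    static Double median(List<Double> contributions) {
        List<Double> sorted = new ArrayList<>();
        for (Double value : contributions) {
            if (value != null) {
                sorted.add(value); // null contributions are ignored
            }
        }
        if (sorted.isEmpty()) {
            return null;
        }
        Collections.sort(sorted);
        int middle = sorted.size() / 2;
        if (sorted.size() % 2 == 1) {
            return sorted.get(middle); // odd count: the middle value
        }
        // even count: average of the two values closest to the middle
        return (sorted.get(middle - 1) + sorted.get(middle)) / 2.0;
    }
}
```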

Min

Returns the minimum of the contributions.

Support

null values are strictly ignored. If all the values aggregated into the output value are null, the output value is null as well.

Objects implementing Comparable are NOT supported.

| Type | Support | Aggregated Type | Details |
| --- | --- | --- | --- |
| Numerical scalar | tick | Same as source type | |
| Non-Numerical scalar | cross | | |
| Numerical vectors | cross | | |
| Non-Numerical vectors | cross | | |

This function does not support disaggregation. A specialized version of this function, MinHistoryFunction, supports the disaggregation of values. To do so, it must keep the entire history of contributed values, to be able to retrieve the next minimum. Thus, its usage is memory intensive.

Multiply

Returns the product of the contributions.

Support

null values are strictly ignored. If all the values aggregated into the output value are null, the output value is null as well.

| Type | Support | Aggregated Type | Details |
| --- | --- | --- | --- |
| Numerical scalar | tick | Same as source type* | *int are aggregated as long to prevent overflows. |
| Non-Numerical scalar | cross | | |
| Numerical vectors | cross | | |
| Non-Numerical vectors | cross | | |

This function supports disaggregation.

Percentile

Returns a percentile of the contributions.

Support

null values are strictly ignored. If all the values aggregated into the output value are null, the output value is null as well.

| Type | Support | Aggregated Type | Details |
| --- | --- | --- | --- |
| Numerical scalar | tick | double | |
| Non-Numerical scalar | cross | | |
| Numerical vectors | cross | | |
| Non-Numerical vectors | cross | | |

This function supports disaggregation, albeit with poor performance. As it must keep the entire history of contributed values, this function is quite memory-intensive.

To use this function, call one of the various PercentileFunction.generateKey() methods to get a plugin key associated with the desired percentile. For example, PercentileFunction.generateKey(10.) would generate a plugin key for a first decile function. A few extra parameters are available if one needs to exactly match a particular definition of percentile.

Exact definition

There is no single official method to compute a percentile from a discrete set of observations, mostly because all methods converge to the same result as the size of the set increases. If a specific definition of a percentile is needed for small samples, the Percentile function can take parameters to match the most usual methods. PercentileFunction.generateKey() accepts two optional arguments on top of the desired percentile:

  • The first represents the method used to find the theoretical index of the selected percentile. The available options select a particular definition of this index.
  • The second represents the interpolation mode, which determines how the percentile is computed if its index is not an integer.

Here are the values to supply to match these standard computation methods:

  • R-1: IndexCalculationMode.SIMPLE and InterpolationMode.HIGHER.
  • R-2: IndexCalculationMode.SIMPLE and InterpolationMode.HIGHER_WITH_AVG.
  • R-3: Not supported yet.
  • R-4: IndexCalculationMode.SIMPLE and InterpolationMode.LINEAR.
  • R-5: IndexCalculationMode.CENTERED and InterpolationMode.LINEAR.
  • R-6: IndexCalculationMode.EXCLUSIVE and InterpolationMode.LINEAR.
  • R-7: IndexCalculationMode.INCLUSIVE and InterpolationMode.LINEAR.
  • R-8 and R-9: Not supported yet.
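As an illustration of what one of these definitions computes, the sketch below implements the R-7 method (inclusive index, linear interpolation) in plain Java. It does not call PercentileFunction: the class and method names are hypothetical, and the percentile is assumed to be expressed between 0 and 100, as in the generateKey(10.) example.

```java
import java.util.Arrays;

/** Illustrative R-7 percentile (hypothetical names, not the Atoti implementation). */
final class PercentileR7Sketch {
    private PercentileR7Sketch() {}

    /**
     * @param values the contributions, in any order
     * @param percentile the requested percentile, between 0 and 100
     */
    static double percentile(double[] values, double percentile) {
        if (values.length == 0) {
            throw new IllegalArgumentException("At least one contribution is required");
        }
        if (percentile < 0.0 || percentile > 100.0) {
            throw new IllegalArgumentException("Percentile must be between 0 and 100");
        }
        double[] sorted = values.clone();
        Arrays.sort(sorted);
        // Theoretical zero-based index of the percentile for R-7: (n - 1) * p.
        double index = (sorted.length - 1) * (percentile / 100.0);
        int lower = (int) Math.floor(index);
        int upper = Math.min(lower + 1, sorted.length - 1);
        double fraction = index - lower;
        // Linear interpolation between the two surrounding observations.
        return sorted[lower] + fraction * (sorted[upper] - sorted[lower]);
    }
}
```

For the observations 15, 20, 35, 40 and 50, the 40th percentile has the theoretical index 1.6, which interpolates between 20 and 35 to give approximately 29.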

Short

Returns the sum of the negative contributions.

Support

null values are strictly ignored. If all the values aggregated into the output value are null, the output value is null as well.

| Type | Support | Aggregated Type | Details |
| --- | --- | --- | --- |
| Numerical scalar | tick | Same as source type | |
| Non-Numerical scalar | cross | | |
| Numerical vectors | tick | Same as source type | Returns a vector containing the term by term short sum. Supports aggregating vectors of different sizes. The aggregated value's size is the largest of the source vectors, the smaller ones being padded with zeroes. |
| Non-Numerical vectors | cross | | |

This function supports disaggregation.
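The Gross Sum, Long and Short functions differ only in which part of each contribution they keep. A minimal sketch on scalar long values, with hypothetical names and without null handling:

```java
/** Illustrative gross, long and short sums over scalar contributions (hypothetical names). */
final class SignedSumsSketch {
    private SignedSumsSketch() {}

    /** Gross sum: sum of the absolute values. */
    static long grossSum(long[] contributions) {
        long total = 0L;
        for (long value : contributions) {
            total += Math.abs(value);
        }
        return total;
    }

    /** Long: sum of the positive contributions only. */
    static long longSum(long[] contributions) {
        long total = 0L;
        for (long value : contributions) {
            if (value > 0) {
                total += value;
            }
        }
        return total;
    }

    /** Short: sum of the negative contributions only. */
    static long shortSum(long[] contributions) {
        long total = 0L;
        for (long value : contributions) {
            if (value < 0) {
                total += value;
            }
        }
        return total;
    }
}
```

For the contributions 5, -3 and 2, the gross sum is 10, the long sum is 7 and the short sum is -3.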

Single Value

Copies the given source value. Once an aggregated value has been set, it is only possible to contribute a value that is equal to it, and such a contribution has no effect. This aggregation function is mostly used to acknowledge the presence of values, without needing to know which ones.

Support

null values are supported. Copying null will effectively write null in place of the previous aggregated value. Contributing a non-null value when the aggregated value is null will simply copy the source value into the aggregated column. Contributing null has no effect, even if the aggregated value isn't null.

The type of the aggregated value is exactly the type of the source values.

| Type | Support |
| --- | --- |
| Numerical scalar | tick |
| Non-Numerical scalar | tick |
| Numerical vectors | tick |
| Non-Numerical vectors | tick |

This function does not support disaggregation.

Square Sum

Returns the square sum of the contributions.

Support

null values are strictly ignored. If all the values aggregated into the output value are null, the output value is null as well.

| Type | Support | Aggregated Type | Details |
| --- | --- | --- | --- |
| Numerical scalar | tick | Same as source type* | *int are aggregated as long to prevent overflows. |
| Non-Numerical scalar | cross | | |
| Numerical vectors | cross | | |
| Non-Numerical vectors | cross | | |

This function supports disaggregation.

Standard deviation

Calculates the standard deviation of the contributions using either the population or sample formulas depending on the plugin key.

Support

null values are strictly ignored. If all the values aggregated into the output value are null, the output value is null as well.

| Type | Support | Aggregated Type | Details |
| --- | --- | --- | --- |
| Numerical scalar | tick | double | All values are aggregated as doubles |
| Non-Numerical scalar | cross | | |
| Numerical vectors | tick | | |
| Non-Numerical vectors | cross | | |

This function supports disaggregation.

Sum

Returns the sum of the contributions.

Support

null values are strictly ignored. If all the values aggregated into the output value are null, the output value is null as well.

| Type | Support | Aggregated Type | Details |
| --- | --- | --- | --- |
| Numerical scalar | tick | Same as source type | |
| Non-Numerical scalar | cross | | |
| Numerical vectors | tick | Same as source type | Returns a vector containing the term by term sum. Supports aggregating vectors of different sizes. The aggregated value's size is the largest of the source vectors, the smaller ones being padded with zeroes. |
| Non-Numerical vectors | cross | | |

This function supports disaggregation.
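The term by term sum with zero padding described for numerical vectors can be sketched on plain double arrays. This is only an illustration with hypothetical names; Atoti vectors have their own API, which is not used here.

```java
import java.util.Arrays;

/** Illustrative term by term sum of vectors of different sizes (hypothetical names). */
final class VectorSumSketch {
    private VectorSumSketch() {}

    /** Returns a new vector as long as the longest input, the shorter one being padded with zeroes. */
    static double[] sum(double[] aggregate, double[] contribution) {
        int size = Math.max(aggregate.length, contribution.length);
        // Arrays.copyOf pads the copy with 0.0 when the requested size is larger.
        double[] result = Arrays.copyOf(aggregate, size);
        for (int i = 0; i < contribution.length; i++) {
            result[i] += contribution[i];
        }
        return result;
    }
}
```

Summing [1, 2, 3] with [10, 20] returns [11, 22, 3].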

Timestamp

Records the timestamp of the last update. When contributing a source value to the aggregated value, this function will check which of the two temporal values is the latest, and keep it as the aggregated value.

Support

This function supports int and long representations of a timestamp. null is not supported in that case. It also supports ITimestamp objects. In that case, null values are supported.

Disaggregating is also an update. For this function, disaggregating and aggregating values are the same thing.

Variance

Calculates the variance of the contributions using either the population or sample formulas depending on the plugin key.

Support

null values are strictly ignored. If all the values aggregated into the output value are null, the output value is null as well.

| Type | Support | Aggregated Type | Details |
| --- | --- | --- | --- |
| Numerical scalar | tick | double | All values are aggregated as doubles |
| Non-Numerical scalar | cross | | |
| Numerical vectors | tick | | |
| Non-Numerical vectors | cross | | |

This function supports disaggregation.
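Disaggregation for variance and standard deviation is possible when the state keeps running totals rather than the final result. How Atoti stores this state is not described here; the sketch below assumes a count, a sum and a sum of squares, uses hypothetical names, and returns null for the sample variance of fewer than two values as a choice of the sketch.

```java
/** Illustrative variance state supporting disaggregation (hypothetical, not the Atoti implementation). */
final class VarianceSketch {
    private long count;
    private double sum;
    private double sumOfSquares;

    /** Adds a contribution; null contributions are ignored. */
    void aggregate(Double value) {
        if (value == null) {
            return;
        }
        count++;
        sum += value;
        sumOfSquares += value * value;
    }

    /** Removes a previously aggregated contribution; null is ignored. */
    void disaggregate(Double value) {
        if (value == null) {
            return;
        }
        count--;
        sum -= value;
        sumOfSquares -= value * value;
    }

    /** Population formula: mean of the squares minus the square of the mean. */
    Double populationVariance() {
        if (count <= 0) {
            return null;
        }
        double mean = sum / count;
        return sumOfSquares / count - mean * mean;
    }

    /** Sample formula: divides by (count - 1) instead of count. */
    Double sampleVariance() {
        if (count < 2) {
            return null;
        }
        return (sumOfSquares - sum * sum / count) / (count - 1);
    }

    /** Standard deviation is the square root of the variance. */
    Double populationStandardDeviation() {
        Double variance = populationVariance();
        // Clamp tiny negative values caused by floating point rounding.
        return variance == null ? null : Double.valueOf(Math.sqrt(Math.max(variance, 0.0)));
    }
}
```

This formulation is simple but can lose precision when the values are large compared to their spread; numerically safer update schemes exist and may be preferable in production code.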

Modifying native Aggregation Functions

The Aggregation Functions in Atoti Server are IPluginValues. As such, they are registered in the global Registry, where they can be replaced by a different implementation.

For instance, the SumFunction outputs int primitive values when summing int source values. A user may know that the volume of data aggregated with SUM could cause silent integer overflows. They can then override the SumFunction with a CustomSumFunction, whose getAggregatedType(sourceType) method returns Types.TYPE_LONG instead of Types.TYPE_INT when the source type is Types.TYPE_INT.

Then, they need to ensure CustomSumFunction will take the place of SumFunction in the Registry.
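The overflow that motivates this override is easy to reproduce in plain Java. The sketch below uses hypothetical names and does not involve the Registry; it only contrasts an int accumulator with a long one.

```java
/** Illustrative comparison of int and long accumulators (hypothetical names). */
final class OverflowSketch {
    private OverflowSketch() {}

    /** Sums into an int: the result silently wraps around on overflow. */
    static int sumAsInt(int[] values) {
        int total = 0;
        for (int value : values) {
            total += value;
        }
        return total;
    }

    /** Sums the same int values into a long, which has room for the result. */
    static long sumAsLong(int[] values) {
        long total = 0L;
        for (int value : values) {
            total += value;
        }
        return total;
    }
}
```

Summing Integer.MAX_VALUE and 1 returns Integer.MIN_VALUE with the int accumulator, and 2147483648 with the long one.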

Writing a custom Aggregation Function

Writing a custom aggregation function will require implementing the 3 aforementioned contracts:

  • IGenericAggregationFunction, or one of its specialized interfaces or abstract classes
  • IGenericAggregation, or one of its specialized interfaces or abstract classes
  • IAggregationBinding, or one of its specialized abstract classes

Let's write a function that counts the number of strictly positive contributions.

First, one needs to implement the IGenericAggregationFunction contract. Since this function will only count one source column at a time, it can extend the AAggregationFunction abstract class to reduce boilerplate code. The function CountIfPositiveFunction needs to define the key that uniquely identifies it in the Registry, and to make itself known to the Registry:

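@AtotiPluginValue(intf = IGenericAggregationFunction.class)
public class CountIfPositiveFunction extends AAggregationFunction {

    /** Key that uniquely identifies this function in the Registry. */
    public static final String PLUGIN_KEY = "COUNT_IF_POSITIVE";

    public CountIfPositiveFunction() {
        super(PLUGIN_KEY);
    }

    @Override
    public String key() {
        return PLUGIN_KEY;
    }

    // The methods and nested classes introduced in the rest of this section belong here.
}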

The function can now be called wherever it is needed, through its plugin key. Let's now write its logic.

The function will always output long values, meaning it can override the getAggregatedType method like so:

@Override
public int getAggregatedType(final int sourceType) {
    // Only plain (non-dictionarized) numerical sources can be compared to 0.
    if (Types.isDictionarized(sourceType) || !Types.isNumericalType(sourceType)) {
        throw unsupportedSourceType(sourceType);
    }
    return Types.TYPE_LONG;
}

Since the function checks for positive values, the source must be numerical. The values cannot be dictionarized, as the function needs to read the actual value to perform this check.

Then, the function must produce an IGenericAggregation, or, in that case with one source column, its specialization IAggregation:

@Override
public IAggregation createAggregation(final String sourceIdentifier, final int sourceType) {
    return new CountIfPositiveAggregation(
            sourceIdentifier, this, sourceType, getAggregatedType(sourceType));
}

CountIfPositiveAggregation can extend AAggregation<CountIfPositiveFunction>. That way, boilerplate getters are already defined in the parent class.

public static class CountIfPositiveAggregation extends AAggregation<CountIfPositiveFunction> {

First, the IGenericAggregation must be able to return the identifier(s) of the source column(s) being aggregated, their types, the aggregated type, and the function that created it:

public CountIfPositiveAggregation(
        final String sourceIdentifier,
        final CountIfPositiveFunction aggFun,
        final int sourceType,
        final int aggregatedType) {
    super(sourceIdentifier, aggFun, sourceType, aggregatedType);
}

The aggregation must be able to bind a source of data to a column of aggregates. To do so, it must create a specialized binding:

@Override
public IAggregationBinding bindSource(
        final IPositionalReader source, final IArrayWriter destination) {
    return new CountIfPositiveSourceBinding(source, destination);
}

Finally, the aggregation must be able to bind 2 columns of pre-aggregated data together. This means that both columns contain aggregated values obtained through the CountIfPositiveFunction, and can be directly combined.

To get the total count of positive values from 2 columns containing partial counts of positive values, one simply needs to sum the 2 partial counts.

@Override
public IAggregationBinding bindAggregates(
        final IPositionalReader source, final IArrayWriter destination) {
    return new CountIfPositiveAggregateBinding(source, destination);
}

Let's now write the source binding:

public static class CountIfPositiveSourceBinding extends AAggregationBinding {

    private final IPositionalReader source;
    private final IArrayWriter aggregates;

    public CountIfPositiveSourceBinding(
            final IPositionalReader source, final IArrayWriter aggregates) {
        this.source = source;
        this.aggregates = aggregates;
    }

    /** Initializes the count with the first contribution: 1 if positive, 0 otherwise. */
    @Override
    public void copy(final int from, final int to) {
        aggregates.writeLong(to, source.readDouble(from) > 0 ? 1L : 0L);
    }

    /** Increments the count when the contribution is strictly positive. */
    @Override
    public void aggregate(final int from, final int to) {
        if (source.readDouble(from) > 0) {
            aggregates.addLong(to, 1L);
        }
    }

    /** Decrements the count when the removed contribution was strictly positive. */
    @Override
    public void disaggregate(final int from, final int to) {
        if (source.readDouble(from) > 0) {
            aggregates.addLong(to, -1L);
        }
    }
}

And the aggregation binding:

public static class CountIfPositiveAggregateBinding extends AAggregationBinding {

    private final IPositionalReader source;
    private final IArrayWriter aggregates;

    public CountIfPositiveAggregateBinding(
            final IPositionalReader source, final IArrayWriter aggregates) {
        this.source = source;
        this.aggregates = aggregates;
    }

    /** Initializes the total with a first partial count. */
    @Override
    public void copy(final int from, final int to) {
        aggregates.writeLong(to, source.readLong(from));
    }

    /** Adds a partial count to the total. */
    @Override
    public void aggregate(final int from, final int to) {
        aggregates.addLong(to, source.readLong(from));
    }

    /** Subtracts a partial count from the total. */
    @Override
    public void disaggregate(final int from, final int to) {
        aggregates.addLong(to, -source.readLong(from));
    }
}

Implementing aggregation functions allows users to fine-tune the behavior around nullability, or to override native implementations with custom algorithms, such as Kahan Summation instead of the natural Sum aggregation.
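For reference, Kahan summation keeps a running compensation for the low-order bits lost at each floating point addition. The sketch below is plain Java with hypothetical names; it is not wired into an Atoti binding and only shows the algorithm such a custom Sum could apply.

```java
import java.util.Arrays;

/** Illustrative Kahan (compensated) summation (hypothetical names). */
final class KahanSumSketch {
    private KahanSumSketch() {}

    /** Natural sum: small terms can be entirely lost against a large running total. */
    static double naiveSum(double[] values) {
        double sum = 0.0;
        for (double value : values) {
            sum += value;
        }
        return sum;
    }

    /** Kahan sum: re-injects at each step the error made by the previous addition. */
    static double kahanSum(double[] values) {
        double sum = 0.0;
        double compensation = 0.0;
        for (double value : values) {
            double corrected = value - compensation;
            double next = sum + corrected;
            // (next - sum) is what was actually added; the difference is the lost part.
            compensation = (next - sum) - corrected;
            sum = next;
        }
        return sum;
    }

    /** Sample input: 1.0 followed by one million terms of 1e-16. */
    static double[] sample() {
        double[] values = new double[1_000_001];
        Arrays.fill(values, 1e-16);
        values[0] = 1.0;
        return values;
    }
}
```

On the sample input, each 1e-16 term is too small to change a running total of 1.0, so the natural sum stays at exactly 1.0, while the compensated sum ends close to the expected 1.0000000001.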

Advanced considerations

Strongly typed aggregation functions

In the CountIfPositiveFunction example, one may note that in order to perform the comparison to 0, the sources are read using the specialized readDouble method. This allows the example to correctly function for the four numerical primitive types in Java supported in Atoti Server, using the automatic primitive conversions of the language.

However, a user can write specifically typed aggregation bindings for each of the supported primitive types. In doing so, the user avoids this unnecessary type widening.

To achieve this, one must rewrite the IAggregation createAggregation(final String sourceIdentifier, final int sourceType) function to handle specific types in a different way, and follow the previous tutorial's flow for each type.

Null support

For simplicity's sake, the above snippets do not handle null values in the sources. The correct condition should probably be: if (!source.isNull(from) && source.readDouble(from) > 0). In that case, it would be even better to create two bindings: one that supports null values, and one that doesn't. The first would check for null values in the sources, at the cost of a null check, while the second would be specialized for sources that are not nullable and would skip that additional check for performance reasons.
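The split between a null-aware path and a specialized non-nullable path can be illustrated without the Atoti readers. In this sketch, a Double[] stands in for a nullable source and a double[] for a non-nullable one; all names are hypothetical.

```java
/** Illustrative null-aware and non-nullable counting paths (hypothetical names). */
final class CountIfPositiveNullSketch {
    private CountIfPositiveNullSketch() {}

    /** Nullable source: every value pays for a null check before the comparison. */
    static long countNullable(Double[] source) {
        long count = 0L;
        for (Double value : source) {
            if (value != null && value > 0) {
                count++;
            }
        }
        return count;
    }

    /** Non-nullable source: the null check is not needed and is skipped. */
    static long countNonNullable(double[] source) {
        long count = 0L;
        for (double value : source) {
            if (value > 0) {
                count++;
            }
        }
        return count;
    }
}
```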

Mutability/Immutability

While writing bindings for primitive types is relatively straightforward, aggregating objects is a much more delicate matter.

  • The memory footprint of Objects should be taken into account when storing in an AggregateProvider a measure that relies on stored objects.
  • Because Java passes object references by value, a binding receives a reference to the same instance that other components may hold. One must therefore carefully plan around the potential mutations of the aggregated Object when it is manipulated in the bindings.

For instance, let's write an aggregation function that takes a source of java.time.Duration (that represents the time that passed between the submission of a market order and its execution), and computes the total time it took to execute all orders.

This can be adapted to compute the average time for an order execution, but for simplicity's sake, this example will showcase the SUM's implementation.

Let's say these are the values stored in a database table named trades. (Notation: Duration(1, 500) is a Duration object that has been created using the private Duration.create(1L second, 500 nanoseconds).)

| TradeId | dollar_value | execution_time |
| --- | --- | --- |
| 0 | 1000 | Duration(1, 500) |
| 1 | 50 | Duration(0, 65000) |
| 2 | 250 | Duration(12, 888 888) |

To compute execution_time.SUM, the binding can start by copying Duration(1, 500) in the aggregated column.

To simplify this example, let's assume that all data must be aggregated at index 0 of the aggregated column.

First, binding.copy(0, 0) is called, with this definition:

public class DurationBinding extends AAggregationBinding {

    // source and destination are the reader and writer of the binding (declarations omitted).

    @Override
    public void copy(final int from, final int to) {
        final Duration sourceDuration = source.read(from);
        destination.write(to, sourceDuration);
    }
}

Following the Java Memory model, this action will add in the corresponding cell of the aggregated column a reference to the actual Duration(1, 500) that is stored on heap. Then, to aggregate the next value, the binding can call binding.aggregate(1, 0):

public class DurationBinding extends AAggregationBinding {

    @Override
    public void aggregate(final int from, final int to) {
        final Duration sourceDuration = source.read(from);
        final Duration currentDuration = destination.read(to);
        // Duration is immutable: plus() returns a new instance that must be written back.
        destination.write(to, currentDuration.plus(sourceDuration));
    }
}

However, doing so will, according to the javadoc of Duration#plus(Duration), create a new Duration object as the result of the sum. This means that every aggregation operation creates a new object on the heap, and leaves the previous object to be garbage collected once it is no longer referenced.

This is problematic, because aggregation bindings are performance sensitive, and this method may be called millions of times, thus creating millions of transient instances of the class java.time.Duration and needlessly putting pressure on the application.
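The immutability of java.time.Duration can be observed directly with the standard library. The helper class below is hypothetical; Duration.ofSeconds, Duration.ZERO and Duration#plus are the public JDK API.

```java
import java.time.Duration;

/** Illustrative total of execution times using the immutable java.time.Duration. */
final class DurationSumSketch {
    private DurationSumSketch() {}

    /** Sums the durations; plus() returns a result object instead of mutating its receiver. */
    static Duration total(Duration... executionTimes) {
        Duration total = Duration.ZERO;
        for (Duration executionTime : executionTimes) {
            total = total.plus(executionTime); // the previous total becomes garbage
        }
        return total;
    }
}
```

With the three execution times of the trades table, total(Duration.ofSeconds(1, 500), Duration.ofSeconds(0, 65000), Duration.ofSeconds(12, 888888)) equals Duration.ofSeconds(13, 954388), and none of the three inputs is modified along the way.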

Since no way around this issue can be found while using the immutable java.time.Duration, there is a strong incentive to create a mutable copy of the class, which will be called MutableDuration in the rest of this page.

This isn't so far-fetched: before Java 8, time representations were so poor that org.joda.time was developed as a very popular open-source alternative. org.joda.time.MutableDateTime was created as part of this alternative, and serves as a model for this example.

MutableDuration will have the exact same API as java.time.Duration, but for this example's sake, public Duration plus(Duration) is replaced by public void plus(MutableDuration). This method mutates the current object.

The binding now evolves to become:

public class MutableDurationBinding extends AAggregationBinding {

    @Override
    public void aggregate(final int from, final int to) {
        final MutableDuration sourceDuration = source.read(from);
        final MutableDuration currentDuration = destination.read(to);
        // No need to write to the destination column anymore,
        // as currentDuration is modified in place.
        currentDuration.plus(sourceDuration);
    }
}

To recap,

  • binding.copy(0, 0) will copy MutableDuration(1, 500) to the aggregated column, in the cell of index 0.
  • binding.aggregate(1, 0) will aggregate MutableDuration(0, 65000) into the current value in the cell at index 0.

However, this operation is now mutating the instance MutableDuration(1, 500). It just became MutableDuration(1, 65500). Because the aggregated column holds a reference to the source object rather than a copy of it, the MutableDuration being modified is the very object on the Java heap that is referenced from the database table trades.
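This side effect can be reproduced with two arrays standing in for the source and aggregated columns. The MutableDuration class below is a hypothetical, minimal stand-in written for this illustration only.

```java
/** Illustrative only: a source object mutated through a shared reference. */
final class AliasingSketch {
    private AliasingSketch() {}

    /** Hypothetical minimal mutable duration (seconds and nanoseconds). */
    static final class MutableDuration {
        long seconds;
        int nanos;

        MutableDuration(long seconds, int nanos) {
            this.seconds = seconds;
            this.nanos = nanos;
        }

        /** Adds the other duration to this one, in place. */
        void plus(MutableDuration other) {
            long totalNanos = (long) nanos + other.nanos;
            seconds += other.seconds + totalNanos / 1_000_000_000L;
            nanos = (int) (totalNanos % 1_000_000_000L);
        }
    }

    /** Runs a naive copy followed by an aggregate, and returns the first source object. */
    static MutableDuration naiveCopyThenAggregate() {
        MutableDuration[] source = {
            new MutableDuration(1, 500), new MutableDuration(0, 65_000)
        };
        MutableDuration[] aggregates = new MutableDuration[1];
        aggregates[0] = source[0];     // copy(0, 0): stores the same reference
        aggregates[0].plus(source[1]); // aggregate(1, 0): mutates the shared instance
        return source[0];
    }
}
```

The object returned by naiveCopyThenAggregate() is the one held by the source array, and it now reads 1 second and 65500 nanoseconds instead of 500.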

To prevent this undesirable side-effect from ever happening, the copy method of the binding must be smarter: it needs to clone the MutableDuration instead of simply passing it, and store the cloned object in the aggregated column:

public class MutableDurationBinding extends AAggregationBinding {

    @Override
    public void copy(final int from, final int to) {
        final MutableDuration sourceDuration = source.read(from);
        // Write a clone that is only owned by the aggregated column.
        destination.write(to, MutableDuration.from(sourceDuration));
    }
}

This solution will not have any side effect, but it will still prove costly memory-wise. Indeed, if there are lots of aggregated values that are computed simply by copying an initial contribution, without ever aggregating further contributions, it will have cloned many MutableDuration objects to no benefit.

An optimal solution will only clone when needed. It will first copy the value, but mark it as unmodifiable. Then, the first time aggregate is called on that value, it will clone the current value, store the clone in the aggregated column, and call MutableDuration#plus(MutableDuration) on the clone.

That cloned object will forever be the one stored in the aggregated column, and all users are free to modify that value without impacting any other code part that could hold a reference to the same address on the Java Heap.
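The clone-on-first-write idea can be sketched with a plain array and one flag per cell. This is an illustration under stated assumptions, not Atoti's implementation: LazyCloneColumn, its method names (loosely modeled on the binding methods used on this page) and the nested MutableDuration are all hypothetical.

```java
import java.util.BitSet;

final class LazyCloneColumn {
    /** Minimal hypothetical MutableDuration, sufficient for this illustration. */
    static final class MutableDuration {
        long seconds;
        int nanos;

        MutableDuration(final long seconds, final int nanos) {
            this.seconds = seconds;
            this.nanos = nanos;
        }

        /** Adds in place. Nanosecond overflow is ignored for brevity. */
        void plus(final MutableDuration other) {
            seconds += other.seconds;
            nanos += other.nanos;
        }
    }

    private final MutableDuration[] cells;
    /** One flag per cell: set once the cell holds a clone owned by this column. */
    private final BitSet owned = new BitSet();

    LazyCloneColumn(final int size) {
        cells = new MutableDuration[size];
    }

    /** Stores a shared reference without cloning, and marks the cell as not owned. */
    void writeReadOnly(final int position, final MutableDuration shared) {
        cells[position] = shared;
        owned.clear(position);
    }

    /** Returns the stored value. Callers must not mutate it. */
    MutableDuration readReadOnly(final int position) {
        return cells[position];
    }

    /** Returns a value that is safe to mutate, cloning it on first access only. */
    MutableDuration readWritable(final int position) {
        final MutableDuration current = cells[position];
        if (current != null && !owned.get(position)) {
            cells[position] = new MutableDuration(current.seconds, current.nanos);
            owned.set(position);
        }
        return cells[position];
    }
}
```

With this layout, a cell that only ever receives one contribution never pays for a clone, while a cell that is aggregated into pays for exactly one.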

To help users achieve this ideal scenario, Atoti Server comes with the abstract aggregation binding AAggregationBindingCloneable. This binding simply requires the user to implement MutableDuration cloneAggregate(MutableDuration), on top of the regular methods of a binding. In exchange, it provides multiple methods that handle cloning for the user.

public static class MutableDurationSumBinding
        extends AAggregationBindingCloneable<MutableDuration> {

    protected MutableDurationSumBinding(final IPositionalReader input, final IArrayWriter output) {
        super(input, output);
    }

    @Override
    public void copy(final int fromPosition, final int toPosition) {
        final MutableDuration sourceDuration = readInput(fromPosition);
        writeReadOnlyAggregate(toPosition, sourceDuration); // can be null
    }

    @Override
    public void aggregate(final int fromPosition, final int toPosition) {
        final MutableDuration sourceDuration = readInput(fromPosition);
        if (Objects.isNull(sourceDuration) || sourceDuration.isZero()) {
            return; // Nothing to do.
        }
        final MutableDuration currentAggregate = readReadOnlyAggregate(toPosition);
        if (Objects.isNull(currentAggregate) || currentAggregate.isZero()) {
            copy(fromPosition, toPosition);
        } else {
            // Handles cloning if necessary.
            final MutableDuration writableCurrentAggregate = readWritableAggregate(toPosition);
            writableCurrentAggregate.plus(sourceDuration);
        }
    }

    @Override
    public void disaggregate(final int fromPosition, final int toPosition) {
        final MutableDuration sourceDuration = readInput(fromPosition);
        if (Objects.isNull(sourceDuration) || sourceDuration.isZero()) {
            return; // Nothing to do.
        }
        final MutableDuration currentAggregate = readReadOnlyAggregate(toPosition);
        if (Objects.isNull(currentAggregate)) {
            throw new IllegalStateException("Cannot disaggregate from a null value");
        } else {
            // Handles cloning if necessary.
            final MutableDuration writableCurrentAggregate = readWritableAggregate(toPosition);
            writableCurrentAggregate.minus(sourceDuration);
            if (writableCurrentAggregate.isZero()) {
                removeAggregate(toPosition);
            }
        }
    }

    @Override
    protected MutableDuration cloneAggregate(final MutableDuration aggregate) {
        return MutableDuration.ofSeconds(aggregate.seconds, aggregate.nanos);
    }
}

To be able to use this binding implementation, Atoti Server needs to be able to store, for each aggregated value, a flag stating whether it has already been cloned or not. To do so, the aggregation must declare that it requires "marked chunks" in its chunks settings:

@Override
public AggregationChunksSetting getChunksSetting() {
    return AggregationChunksSetting.builder().useMarkedChunk(true).build();
}

Aggregating Vectors

Vectors are one of the most frequently used features of Atoti. Querying and aggregating vectors is extremely CPU and memory intensive, and should be accomplished with great care.

Atoti Server provides a specialization of the class AAggregationBindingCloneable, designed for vector aggregation: AVectorAggregationBinding. This binding handles the internal aspects of vector aggregation: on-heap and off-heap allocation, cloning, reference counting for garbage collection, etc.

In order to implement a vector aggregation binding, it is strongly recommended to extend this class.

Let's write an aggregation function that produces the element-wise sum of the source vectors, limited to their first two elements.
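To make the expected result concrete, here is a sketch of the intended semantics using plain double arrays instead of IVector. It deliberately ignores every storage concern handled by Atoti Server; the class and method names are hypothetical.

```java
import java.util.Arrays;

final class SubVectorSumSketch {
    /** Number of leading elements kept from each source vector. */
    static final int PREFIX_LENGTH = 2;

    /** Sums the first two elements of every non-null source vector, element by element. */
    static double[] sumOfPrefixes(final double[][] sourceVectors) {
        double[] aggregate = null;
        for (final double[] vector : sourceVectors) {
            if (vector == null) {
                continue; // a null contribution leaves the aggregate unchanged
            }
            if (vector.length < PREFIX_LENGTH) {
                throw new IllegalArgumentException(
                    "Source vectors are expected to contain at least two elements.");
            }
            if (aggregate == null) {
                // First contribution: start from its two-element prefix.
                aggregate = Arrays.copyOfRange(vector, 0, PREFIX_LENGTH);
            } else {
                for (int i = 0; i < PREFIX_LENGTH; i++) {
                    aggregate[i] += vector[i];
                }
            }
        }
        return aggregate;
    }
}
```

For instance, aggregating [1, 2, 3] and [10, 20, 30, 40] yields [11, 22]: the elements beyond the second position never contribute.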

This function will typically be difficult to write, for the following reasons:

  • The source IVectors may be on-heap or off-heap.
  • The aggregated IVector can be lazily copied, as seen in the previous example with MutableDuration.
  • Acquiring and storing a sub-part of an IVector may have an impact on the vector's reference counting.

First, one needs to implement the IGenericAggregationFunction contract. Since this function will only aggregate one source column at a time, it can extend the AAggregationFunction abstract class to reduce boilerplate code. The SubVectorSumFunction function needs to add itself to the Registry using a unique key:

@AtotiPluginValue(intf = IGenericAggregationFunction.class)
public class SubVectorSumFunction extends AAggregationFunction {

    public static final String PLUGIN_KEY = "SUB_VECTOR_SIZE_2_SUM";

    public SubVectorSumFunction() {
        super(PLUGIN_KEY);
    }

    @Override
    public String key() {
        return PLUGIN_KEY;
    }

The function can now be called wherever it is needed, using its plugin key. Let's write its logic. The function will always output numerical arrays, meaning it can override the getAggregatedType method like so:

@Override
public int getAggregatedType(final int sourceType) {
    if (sourceType == Types.TYPE_INT_ARRAY) {
        return sourceType;
    } else if (sourceType == Types.TYPE_LONG_ARRAY) {
        return sourceType;
    } else if (sourceType == Types.TYPE_FLOAT_ARRAY) {
        return sourceType;
    } else if (sourceType == Types.TYPE_DOUBLE_ARRAY) {
        return sourceType;
    } else {
        throw unsupportedSourceType(sourceType);
    }
}

Then, the function must produce an IAggregation, which is the appropriate IGenericAggregation type in the single source column case.

@Override
protected IAggregation createAggregation(final String sourceIdentifier, final int sourceType) {
    return new SubVectorSumAggregation(
        sourceIdentifier, this, sourceType, getAggregatedType(sourceType));
}

SubVectorSumAggregation can extend AAggregation<SubVectorSumFunction> to inherit boilerplate getters.

public static class SubVectorSumAggregation extends AAggregation<SubVectorSumFunction> {

First, the IGenericAggregation must be able to return the identifier(s) of the source column(s) being aggregated, their types, the aggregated type, and the function that created it:

public SubVectorSumAggregation(
        final String sourceIdentifier,
        final SubVectorSumFunction aggFun,
        final int sourceType,
        final int aggregatedType) {
    super(sourceIdentifier, aggFun, sourceType, aggregatedType);
}

The aggregation must be able to bind a source of data to a column of aggregates. To do so, it must create a specialized binding:

@Override
public IAggregationBinding bindSource(
        final IPositionalReader source, final IArrayWriter aggregates) {
    return new SubVectorSumBinding(source, aggregates);
}

Finally, the aggregation must be able to bind two columns of pre-aggregated data together. That means that, if both columns contain aggregated values obtained through the SubVectorSumAggregation, they can be combined.

Since the "2 elements" condition has already been applied by the SubVectorSumBinding mentioned above when aggregating source values, the pre-aggregated vectors all have two elements. The bindings of the SumFunction can therefore be re-used to sum the vectors of the two columns.
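The reasoning can be checked on plain arrays: once each partial aggregate has been truncated to two elements, merging partial aggregates is an ordinary element-wise sum. The sketch below is an illustration with hypothetical names and does not use the Atoti API.

```java
final class MergePartialsSketch {
    /** Keeps the first two elements of a source vector. */
    static double[] truncate(final double[] vector) {
        return new double[] {vector[0], vector[1]};
    }

    /** Plain element-wise sum of two vectors of equal length. */
    static double[] plus(final double[] left, final double[] right) {
        final double[] result = new double[left.length];
        for (int i = 0; i < left.length; i++) {
            result[i] = left[i] + right[i];
        }
        return result;
    }

    /** Aggregates a and b into one partial result, c into another, then merges both. */
    static double[] mergeExample(final double[] a, final double[] b, final double[] c) {
        final double[] firstPartial = plus(truncate(a), truncate(b)); // source-level aggregation
        final double[] secondPartial = truncate(c);                   // source-level aggregation
        return plus(firstPartial, secondPartial);                     // merge: plain sum suffices
    }
}
```

Merging never needs to truncate again, which is why a regular vector sum binding is enough at that stage.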

@Override
public IAggregationBinding bindAggregates(
        final IPositionalReader input, final IArrayWriter output) {
    return new SumVectorAggregationBinding(input, output);
}

All that is left to do is to write the SubVectorSumBinding implementation, extending the abstract vector binding AVectorAggregationBinding:

public static class SubVectorSumBinding extends AVectorAggregationBinding {

    public SubVectorSumBinding(final IPositionalReader source, final IArrayWriter aggregates) {
        super(source, aggregates);
    }

    @Override
    protected void doCopy(final int fromPosition, final int toPosition) {
        final IVector inputVec = readInput(fromPosition);
        if (inputVec == null) {
            writeReadOnlyAggregate(toPosition, null);
            return;
        }
        // Take care not to copy the input vector.
        final IVector vectorToWrite = inputVec.subVector(0, 2);
        writeReadOnlyAggregate(toPosition, vectorToWrite);
    }

    @Override
    protected void doAggregate(final int fromPosition, final int toPosition) {
        final IVector inputVec = readInput(fromPosition);
        if (inputVec == null) {
            return;
        }
        if (inputVec.size() < 2) {
            throw new IllegalArgumentException(
                "Source vectors are expected to contain at least two elements.");
        }
        if (readReadOnlyAggregate(toPosition) == null) {
            doCopy(fromPosition, toPosition);
            return;
        }
        // Handles necessary cloning behind the scenes.
        // The initially copied sub-vector must not be written into.
        final IVector currentAggregate = readWritableAggregate(toPosition);
        currentAggregate.plus(inputVec.subVector(0, 2));
    }

    @Override
    protected void doDisaggregate(final int fromPosition, final int toPosition) {
        final IVector inputVec = readInput(fromPosition);
        if (inputVec == null) {
            return;
        }
        final IVector currentAggregate = readReadOnlyAggregate(toPosition);
        if (currentAggregate == null) {
            throw new IllegalStateException("Disaggregating from a null value.");
        } else {
            final IVector currentWritableAggregate = readWritableAggregate(toPosition);
            currentWritableAggregate.minus(inputVec.subVector(0, 2));
        }
    }
}