Aggregation Functions
Concept
In Database Management Systems (DBMS), an Aggregation Function, or Aggregate Function, is a function that operates on values across rows to perform mathematical calculations such as sum, average, count, minimum/maximum values, standard deviation, as well as some non-mathematical operations, like finding the range of dates covered by the values. An aggregate function takes multiple rows as input and produces a single output.
While the term can also be found in OnLine Transaction Processing (OLTP) workflows, with standard SQL aggregation functions like COUNT, MIN, etc., OnLine Analytical Processing (OLAP) systems such as ActivePivot are built specifically for aggregation.
Similarly to how an aggregation function is, in any DBMS, associated with some columns at query time, ActivePivot aggregation functions are associated with table columns to create aggregated measures.
The SQL query SELECT SUM(dollar_value) FROM trades will naturally return the sum of the values of all trades stored in the trades table.
Similarly, the ActivePivot SumFunction can be associated with the column dollar_value to create the measure dollar_value.SUM.
The same SUM value for all trades can then be found in the cube, as a top level aggregate.
The concept of aggregation is orthogonal to data storage, and should rather be considered as an analytics notion.
Vocabulary
An aggregate function takes multiple rows as input and produces a single output.
The input values are called source data, or source values, and the output value is called the aggregated value.
Aggregating the source value trade_value into the aggregated value trade_aggregate is the act of adding a trade_value contribution to trade_aggregate, according to the selected aggregation function.
For instance, if trade_aggregate = 2 and trade_value = 3, aggregating trade_value into trade_aggregate:
- using the SUM function will produce trade_aggregate = 5.
- using the MAX function will produce trade_aggregate = 3.
- using the SQUARE_SUM function will produce trade_aggregate = 2 + 3*3 = 11.
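The three cases above, with an aggregate of 2 and a source value of 3, can be sketched in plain Java. This is only an illustration of the contribution rules: the ContributionRules class and its operators are hypothetical names and are not part of the ActivePivot API.

```java
import java.util.function.LongBinaryOperator;

// Illustrative only: these operators are not ActivePivot classes.
final class ContributionRules {
    // Each rule takes (current aggregate, incoming source value) and returns the new aggregate.
    static final LongBinaryOperator SUM = (aggregate, value) -> aggregate + value;
    static final LongBinaryOperator MAX = (aggregate, value) -> Math.max(aggregate, value);
    static final LongBinaryOperator SQUARE_SUM = (aggregate, value) -> aggregate + value * value;

    public static void main(String[] args) {
        long tradeAggregate = 2L;
        long tradeValue = 3L;
        System.out.println(SUM.applyAsLong(tradeAggregate, tradeValue));        // prints 5
        System.out.println(MAX.applyAsLong(tradeAggregate, tradeValue));        // prints 3
        System.out.println(SQUARE_SUM.applyAsLong(tradeAggregate, tradeValue)); // prints 11
    }
}
```

Each rule only needs the current aggregate and the incoming value, which is what allows an aggregate to be updated one contribution at a time.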
ActivePivot Specifics
As mentioned in the Concept Section, ActivePivot is tailored for OLAP workloads.
To provide adequate performance, ActivePivot's aggregation engine can store pre-aggregated data for some selected cube locations, in the AggregateProvider component.
For instance, if a bank has a million open positions on the market, visualizing the total amount involved in these trades requires retrieving a million values and summing them.
However, if the bank is divided into 15 entities, and ActivePivot is storing pre-aggregated data at the entity level, computing that total amount only requires summing the 15 aggregated values.
Let's use a very small example to represent the aforementioned storage component:
Location | dollar_value.SUM |
---|---|
EntityA | 1000 |
EntityB | 50 |
EntityC | null |
EntityC has no open position as of end of day yesterday.
Today, the entities closed some positions and opened some new ones.
Added:
Location | dollar_value.SUM |
---|---|
EntityB | 2 |
EntityA | 264 |
EntityC | 46000 |
EntityD | 150 |
The bank created a new entity and traded in its name.
Removed:
Location | dollar_value.SUM |
---|---|
EntityA | 110 |
EntityB | 27 |
In these tables, the entities are not in the same order as in the storage. The transactional flow that updates the aggregate provider will take care of aligning the locations for data coherence, resulting in:
Location | dollar_value.SUM |
---|---|
EntityA | 1000 - 110 + 264 = 1154 |
EntityB | 50 + 2 - 27 = 25 |
EntityC | 46000 |
EntityD | 150 |
The previous example showcases how the SUM aggregation function is responsible for more than simply taking a group of values and summing them.
It knows how to initialize the first value for EntityD, how to aggregate some trades to that first copied value, and how to disaggregate other trades from the stored value.
It knows how to group pre-aggregated values to output an aggregated value that represents a bigger scope.
To be able to do all of this, ActivePivot aggregation functions are able to take some source columns, and an aggregated column, and bind them together. The use of that binding is outside the scope of the function, which only handles its creation.
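The entity example can be replayed with a minimal sketch of a store of pre-aggregated SUM values. EntitySumStore is a hypothetical class written for this page; it is not the AggregateProvider API and ignores chunks, transactions and bindings entirely.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Illustrative stand-in for a store of pre-aggregated SUM values.
final class EntitySumStore {
    private final Map<String, Long> sums = new LinkedHashMap<>();

    // Aggregates a contribution; the first contribution to a location is simply copied.
    void add(String location, long value) {
        sums.merge(location, value, Long::sum);
    }

    // Disaggregates a contribution that was previously added to an existing location.
    void remove(String location, long value) {
        sums.computeIfPresent(location, (key, current) -> current - value);
    }

    // Returns the stored aggregate, or null for a location without contributions.
    Long get(String location) {
        return sums.get(location);
    }

    // Groups the stored pre-aggregated values into a total for the bigger scope.
    long total() {
        return sums.values().stream().mapToLong(Long::longValue).sum();
    }
}
```

Starting from EntityA = 1000 and EntityB = 50, applying the additions and removals of the tables above leaves 1154, 25, 46000 and 150 for entities A to D, whatever the order in which the rows are applied.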
In ActivePivot, the aggregation function concept is divided into 3 separate contracts:
- Specified by the IGenericAggregationFunction contract, aggregation functions represent the exposed API. Each aggregation function is an IPluginValue, stored in the registry, under the Plugin GenericAggregationFunctionPlugin. Custom implementations must be added to the registry, through annotations or manual insertion.
- Aggregation Functions are factories for IGenericAggregations, which can each be seen as an implementation of its associated Aggregation Function for specific data sources. IGenericAggregations are objects that keep:
  - the names of the columns that hold the data being aggregated. These may be table fields, measure names, etc. This is used to uniquely identify the sources an aggregation requires.
  - the types, from the com.qfs.store.Types API, of these columns.
  - the type of the aggregated data. It is the type of the result of the reduction function mentioned beforehand.

  Based on this information, an IGenericAggregation will specify how the chunks that will hold the aggregated data are created (typically, chunks of the particular aggregated type).
- IGenericAggregations are used to produce IAggregationBindings, which are responsible for the computation of the aggregated values.
Natively Supported Aggregation Functions
This is the list of natively supported aggregation functions, in alphabetical order.
The following notations are used:
Symbol | Meaning |
---|---|
fully supported | |
partially supported | |
will not be supported |
In cases where null is supported, the aggregated type will depend on the nullability of the types of the sources.
Average
Returns the average of the source values. The average is defined as the sum of the contributions, divided by the number of contributions.
Support
null values are strictly ignored. They do not influence the sum of the contributions, nor do they influence the number of contributions.
If all the values aggregated into the output value are null, the output value is null as well.
Type | Support | Aggregated Type | Details |
---|---|---|---|
Numerical scalar | | double | |
Non-Numerical scalar | |||
Numerical vectors | | double vector | Returns a vector containing the term by term average. Aggregated vectors must all have the same size. |
Non-Numerical vectors |
This function supports disaggregation. Disaggregating a value removes its contribution from the sum, and reduces the number of contributions.
Disaggregating down to zero contributions will return null, to prevent dividing by zero.
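The behavior described for the average (ignored null values, disaggregation, and a null result once no contribution remains) can be sketched as follows. RunningAverage is a hypothetical class for illustration and does not reflect how ActivePivot stores its intermediate sum and count.

```java
// Illustrative running average; the class and method names are hypothetical.
final class RunningAverage {
    private double sum;
    private long count;

    // null contributions are ignored: they change neither the sum nor the count.
    void aggregate(Double value) {
        if (value != null) {
            sum += value;
            count++;
        }
    }

    // Removes a previously aggregated contribution from both the sum and the count.
    void disaggregate(Double value) {
        if (value != null) {
            sum -= value;
            count--;
        }
    }

    // Returns null when no contribution remains, which avoids dividing by zero.
    Double get() {
        return count == 0 ? null : sum / count;
    }
}
```

Keeping the sum and the count separately, rather than the average itself, is what makes both aggregation and disaggregation possible.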
Copy
Copies the given source value and replaces the aggregated value.
Support
null values are supported. Copying null will effectively write null in place of the previous aggregated value.
The type of the aggregated value is exactly the type of the source values.
Type | Support |
---|---|
Numerical scalar | |
Non-Numerical scalar | |
Numerical vectors | |
Non-Numerical vectors |
This function does not support aggregating or disaggregating. It only supports replacing the current value with the source value.
Count
Counts the number of contributions to the aggregated value, no matter the contributed values.
Support
null values are strictly ignored.
If all the values aggregated into the output value are null, the output value is null as well.
This function returns long values.
Type | Support |
---|---|
Numerical scalar | |
Non-Numerical scalar | |
Numerical vectors | |
Non-Numerical vectors |
This function supports disaggregation. Disaggregating a value reduces the number of contributions.
Gross Sum
Returns the sum of the absolute values of the contributions.
Support
null values are strictly ignored.
If all the values aggregated into the output value are null, the output value is null as well.
Type | Support | Aggregated Type | Details |
---|---|---|---|
Numerical scalar | | Same as source type | |
Non-Numerical scalar | |||
Numerical vectors | | Same as source type | Returns a vector containing the term by term gross sum. Supports aggregating vectors of different sizes. The aggregated value's size is the largest of the source vectors, the smaller ones being padded with zeroes. |
Non-Numerical vectors |
This function supports disaggregation.
Long
Returns the sum of the positive contributions.
Support
null values are strictly ignored.
If all the values aggregated into the output value are null, the output value is null as well.
Type | Support | Aggregated Type | Details |
---|---|---|---|
Numerical scalar | | Same as source type | |
Non-Numerical scalar | |||
Numerical vectors | | Same as source type | Returns a vector containing the term by term long sum. Supports aggregating vectors of different sizes. The aggregated value's size is the largest of the source vectors, the smaller ones being padded with zeroes. |
Non-Numerical vectors |
This function supports disaggregation.
Max
Returns the maximum of the contributions.
Support
null values are strictly ignored.
If all the values aggregated into the output value are null, the output value is null as well.
Objects implementing Comparable are NOT supported.
Type | Support | Aggregated Type | Details |
---|---|---|---|
Numerical scalar | | Same as source type | |
Non-Numerical scalar | |||
Numerical vectors | |||
Non-Numerical vectors |
This function does not support disaggregation. A specialized version of this function, MaxHistoryFunction, supports the disaggregation of values.
To do so, it must keep the entire history of contributed values, to be able to retrieve the next maximum. Thus, its usage is memory intensive.
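The need for a history can be illustrated with a small sketch. MaxWithHistory is a hypothetical class built on a sorted multiset; it is not the implementation of MaxHistoryFunction, only a way to see why removing the current maximum requires remembering every other contribution.

```java
import java.util.TreeMap;

// Illustrative sketch only; this is not the MaxHistoryFunction implementation.
final class MaxWithHistory {
    // Sorted multiset: each value maps to the number of times it was contributed.
    private final TreeMap<Long, Integer> history = new TreeMap<>();

    void aggregate(long value) {
        history.merge(value, 1, Integer::sum);
    }

    // Removing a contribution may expose the next maximum, hence the retained history.
    void disaggregate(long value) {
        history.computeIfPresent(value, (key, occurrences) -> occurrences == 1 ? null : occurrences - 1);
    }

    // Returns null when no contribution remains.
    Long max() {
        return history.isEmpty() ? null : history.lastKey();
    }
}
```

The memory cost grows with the number of distinct contributed values, which is the trade-off mentioned above.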
Median
Returns the median of the contributions. In other words, if there is an odd number of contributions, it returns their middle value. If there is an even number of contributions, the average of the two values that are closest to the middle is returned.
Support
null values are strictly ignored.
If all the values aggregated into the output value are null, or no values are aggregated, the output value is null as well.
Type | Support | Aggregated Type | Details |
---|---|---|---|
Numerical scalar | | double | |
Non-Numerical scalar | |||
Numerical vectors | |||
Non-Numerical vectors |
This function supports disaggregation, albeit with poor performance. As it must keep the entire history of contributed values, this function is quite memory-intensive.
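The median rule described in this section (middle value for an odd count, average of the two central values for an even count, null values ignored) can be sketched as below. MedianSketch is a hypothetical helper that recomputes the median from the full list of contributions; it makes no claim about how ActivePivot maintains its history.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Illustrative helper; recomputes the median from all contributions each time.
final class MedianSketch {
    // Returns the median of the non-null values, or null when there are none.
    static Double median(List<Double> contributions) {
        List<Double> values = new ArrayList<>();
        for (Double value : contributions) {
            if (value != null) {
                values.add(value);
            }
        }
        if (values.isEmpty()) {
            return null;
        }
        Collections.sort(values);
        int middle = values.size() / 2;
        if (values.size() % 2 == 1) {
            return values.get(middle);
        }
        // Even count: average of the two values closest to the middle.
        return (values.get(middle - 1) + values.get(middle)) / 2.0;
    }
}
```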
Min
Returns the minimum of the contributions.
Support
null values are strictly ignored.
If all the values aggregated into the output value are null, the output value is null as well.
Objects implementing Comparable are NOT supported.
Type | Support | Aggregated Type | Details |
---|---|---|---|
Numerical scalar | | Same as source type | |
Non-Numerical scalar | |||
Numerical vectors | |||
Non-Numerical vectors |
This function does not support disaggregation. A specialized version of this function, MinHistoryFunction, supports the disaggregation of values.
To do so, it must keep the entire history of contributed values, to be able to retrieve the next minimum. Thus, its usage is memory intensive.
Multiply
Returns the product of the contributions.
Support
null values are strictly ignored.
If all the values aggregated into the output value are null, the output value is null as well.
Type | Support | Aggregated Type | Details |
---|---|---|---|
Numerical scalar | | Same as source type*. | *int values are aggregated as long to prevent overflows. |
Non-Numerical scalar | |||
Numerical vectors | |||
Non-Numerical vectors |
This function supports disaggregation.
Short
Returns the sum of the negative contributions.
Support
null values are strictly ignored.
If all the values aggregated into the output value are null, the output value is null as well.
Type | Support | Aggregated Type | Details |
---|---|---|---|
Numerical scalar | | Same as source type | |
Non-Numerical scalar | |||
Numerical vectors | | Same as source type | Returns a vector containing the term by term short sum. Supports aggregating vectors of different sizes. The aggregated value's size is the largest of the source vectors, the smaller ones being padded with zeroes. |
Non-Numerical vectors |
This function supports disaggregation.
Single Value
Copies the given source value. Once an aggregated value is set, it is only possible to contribute a value that is equal to the aggregated value. This contribution has no effect. This aggregation function is mostly used to acknowledge the presence of values, without needing to know which ones.
Support
null values are supported. Copying null will effectively write null in place of the previous aggregated value.
Contributing a non-null value when the aggregated value is null will simply copy the source value into the aggregated column.
Contributing null has no effect, even if the aggregated value isn't null.
The type of the aggregated value is exactly the type of the source values.
Type | Support |
---|---|
Numerical scalar | |
Non-Numerical scalar | |
Numerical vectors | |
Non-Numerical vectors |
This function does not support disaggregation.
Square Sum
Returns the square sum of the contributions.
Support
null values are strictly ignored.
If all the values aggregated into the output value are null, the output value is null as well.
Type | Support | Aggregated Type | Details |
---|---|---|---|
Numerical scalar | | Same as source type*. | *int values are aggregated as long to prevent overflows. |
Non-Numerical scalar | |||
Numerical vectors | |||
Non-Numerical vectors |
This function supports disaggregation.
Standard deviation
Calculates the standard deviation of the contributions using either the population or sample formulas depending on the plugin key.
Support
null values are strictly ignored. If all the values aggregated into the output value are null, the output value is null as well.
Type | Support | Aggregated Type | Details |
---|---|---|---|
Numerical scalar | | double | All values are aggregated as doubles |
Non-Numerical scalar | |||
Numerical vectors | |||
Non-Numerical vectors |
This function supports disaggregation.
Sum
Returns the sum of the contributions.
Support
null values are strictly ignored.
If all the values aggregated into the output value are null, the output value is null as well.
Type | Support | Aggregated Type | Details |
---|---|---|---|
Numerical scalar | | Same as source type | |
Non-Numerical scalar | |||
Numerical vectors | | Same as source type | Returns a vector containing the term by term sum. Supports aggregating vectors of different sizes. The aggregated value's size is the largest of the source vectors, the smaller ones being padded with zeroes. |
Non-Numerical vectors |
This function supports disaggregation.
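The padding rule for vectors of different sizes can be illustrated on plain Java arrays. VectorSumSketch is a hypothetical helper; ActivePivot works on its own vector types, which this sketch does not use.

```java
// Illustrative term-by-term sum on plain arrays; not ActivePivot's vector implementation.
final class VectorSumSketch {
    // The result has the size of the longer input; the shorter one is treated as zero-padded.
    static double[] sum(double[] left, double[] right) {
        double[] result = new double[Math.max(left.length, right.length)];
        for (int i = 0; i < result.length; i++) {
            double l = i < left.length ? left[i] : 0.0;
            double r = i < right.length ? right[i] : 0.0;
            result[i] = l + r;
        }
        return result;
    }
}
```

For example, summing {1, 2, 3} with {10, 20} yields {11, 22, 3}.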
Timestamp
Records the timestamp of the last update. When contributing a source value to the aggregated value, this function will check which of the two temporal values is the latest, and keep it as the aggregated value.
Support
This function supports int and long representations of a timestamp. null is not supported in that case.
It also supports ITimestamp objects. In that case, null values are supported.
Disaggregating is also an update. For this function, disaggregating and aggregating values are the same thing.
Variance
Calculates the variance of the contributions using either the population or sample formulas depending on the plugin key.
Support
null values are strictly ignored. If all the values aggregated into the output value are null, the output value is null as well.
Type | Support | Aggregated Type | Details |
---|---|---|---|
Numerical scalar | | double | All values are aggregated as doubles |
Non-Numerical scalar | |||
Numerical vectors | |||
Non-Numerical vectors |
This function supports disaggregation.
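One way to support disaggregation for variance (and for standard deviation, its square root) is to keep the count, the sum and the sum of squares of the contributions. The sketch below assumes that approach; RunningVariance is a hypothetical class, the null results for too few contributions are an assumption, and nothing here is taken from ActivePivot's implementation. The sum-of-squares formula is also known to lose precision when the values are large compared to their spread.

```java
// Illustrative running variance based on count, sum and sum of squares.
final class RunningVariance {
    private long count;
    private double sum;
    private double squareSum;

    void aggregate(double value) {
        count++;
        sum += value;
        squareSum += value * value;
    }

    void disaggregate(double value) {
        count--;
        sum -= value;
        squareSum -= value * value;
    }

    // Population formula: divides by n. Assumed to return null without contributions.
    Double populationVariance() {
        if (count == 0) {
            return null;
        }
        double mean = sum / count;
        return squareSum / count - mean * mean;
    }

    // Sample formula: divides by n - 1. Assumed to return null with fewer than two contributions.
    Double sampleVariance() {
        if (count < 2) {
            return null;
        }
        double mean = sum / count;
        return (squareSum - count * mean * mean) / (count - 1);
    }
}
```

For the values 2, 4, 4, 4, 5, 5, 7, 9 the population variance is 4, so the population standard deviation is 2.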
Modifying native Aggregation Functions
The Aggregation Functions in ActivePivot are PluginValues. As such, they are registered in the global Registry, where they can be replaced by a different implementation.
For instance, the SumFunction outputs int primitive values when summing int source values. A user may know that the amount of data being summed with SUM might overflow the int range. The user can then override the SumFunction with a CustomSumFunction, and modify its getAggregatedType(sourceType) method to return Types.TYPE_LONG instead of Types.TYPE_INT when the source type is Types.TYPE_INT.
Then, this user needs to ensure CustomSumFunction will take the place of SumFunction in the Registry.
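The motivation for widening the aggregated type can be shown in plain Java, independently of ActivePivot. In Java, an int accumulator does not raise an error on overflow: it silently wraps around. OverflowDemo is a hypothetical class written for this illustration.

```java
// Illustrative comparison of int and long accumulators for int source values.
final class OverflowDemo {
    // Sums into an int accumulator: silently wraps around on overflow.
    static int sumAsInt(int[] values) {
        int total = 0;
        for (int value : values) {
            total += value;
        }
        return total;
    }

    // Sums the same values into a long accumulator, which has room for far more contributions.
    static long sumAsLong(int[] values) {
        long total = 0L;
        for (int value : values) {
            total += value;
        }
        return total;
    }
}
```

With the two values Integer.MAX_VALUE and 1, the int accumulator ends at Integer.MIN_VALUE while the long accumulator holds 2147483648.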
Writing a custom Aggregation Function
Writing a custom aggregation function will require implementing the 3 aforementioned contracts:
- IGenericAggregationFunction, or one of its specialized interfaces or abstract classes
- IGenericAggregation, or one of its specialized interfaces or abstract classes
- IAggregationBinding, or one of its specialized abstract classes
Let's write a function that counts the number of strictly positive contributions.
First, one needs to implement the IGenericAggregationFunction contract.
Since this function will only count one source column at a time, it can extend the AAggregationFunction abstract class to reduce boilerplate code.
The function CountIfPositiveFunction needs to define the key that uniquely identifies it in the Registry, and to make itself known to the Registry:
@QuartetPluginValue(intf = IGenericAggregationFunction.class)
public class CountIfPositiveFunction extends AAggregationFunction {

    /** Key under which the function is registered and looked up. */
    public static final String PLUGIN_KEY = "COUNT_IF_POSITIVE";

    public CountIfPositiveFunction() {
        super(PLUGIN_KEY);
    }

    @Override
    public Object key() {
        return PLUGIN_KEY;
    }
The function can now be called wherever it is needed, through its plugin key. Let's now write its logic.
The function will always output long values, meaning it can override the getAggregatedType method like so:
@Override
public int getAggregatedType(final int sourceType) {
    // Only non-dictionarized numerical sources can be compared to zero.
    if (Types.isDictionary(sourceType) || !Types.isNumericalType(sourceType)) {
        throw unsupportedSourceType(sourceType);
    }
    return Types.TYPE_LONG;
}
Since the function will check for positive values, the source must be numerical. The values cannot be dictionarized, as the function needs to read the value to perform this check.
Then, the function must produce an IGenericAggregation, or, in this case with one source column, its specialization IAggregation:
@Override
public IAggregation createAggregation(final String sourceIdentifier, final int sourceType) {
    return new CountIfPositiveAggregation(sourceIdentifier, this, sourceType, getAggregatedType(sourceType));
}
CountIfPositiveAggregation can extend AAggregation<CountIfPositiveFunction>. That way, boilerplate getters are already defined in the parent class.
public static class CountIfPositiveAggregation extends AAggregation<CountIfPositiveFunction> {
First, the IGenericAggregation must be able to return the identifier(s) of the source column(s) being aggregated, their types, the aggregated type, and the function that created it:
    public CountIfPositiveAggregation(
            final String sourceIdentifier,
            final CountIfPositiveFunction aggFun,
            final int sourceType,
            final int aggregatedType) {
        super(sourceIdentifier, aggFun, sourceType, aggregatedType);
    }
Since this aggregation outputs Types.TYPE_LONG, the factory is relatively straightforward:
    @Override
    public IChunkFactory<?> createChunkFactory(
            final boolean isTransient,
            final IAllocationSettings allocationSettings) {
        // This example never stores null counts, hence non-nullable chunks of long values.
        final boolean nullable = false;
        return new ChunkFactory<>(
                isTransient,
                allocationSettings,
                (allocator, settings, chunkSize) -> allocator.allocateChunkLong(chunkSize, nullable));
    }
This aggregation could also use the helper factories from ChunkFactories.
The code snippet above showcases a more generic implementation.
The aggregation must be able to bind a source of data to a column of aggregates. To do so, it must create a specialized binding:
    @Override
    public IAggregationBinding bindSource(IArrayReader source, IArrayWriter destination) {
        return new CountIfPositiveBinding(source, destination);
    }
Finally, the aggregation must be able to bind 2 columns of pre-aggregated data together.
That means, if both columns contain aggregated values obtained through the CountIfPositiveFunction, they can be combined.
Since the "positivity of the source values" condition has already been checked by the CountIfPositiveBinding mentioned above when aggregating source values, the bindings of the SumFunction can be re-used to sum the counts of the two columns.
    @Override
    public IAggregationBinding bindAggregates(IArrayReader source, IArrayWriter destination) {
        return new SumBindingLong(source, destination);
    }
Let's now write the CountIfPositiveBinding:
public static class CountIfPositiveBinding extends AAggregationBinding {

    private final IArrayReader source;
    private final IArrayWriter aggregates;

    public CountIfPositiveBinding(final IArrayReader source, final IArrayWriter aggregates) {
        super();
        this.source = source;
        this.aggregates = aggregates;
    }

    /** Initializes the aggregate with the first contribution: 1 if strictly positive, 0 otherwise. */
    @Override
    public void copy(int from, int to) {
        aggregates.writeLong(to, source.readDouble(from) > 0 ? 1L : 0L);
    }

    /** Adds one to the count when the contribution is strictly positive. */
    @Override
    public void aggregate(int from, int to) {
        if (source.readDouble(from) > 0) {
            aggregates.addLong(to, 1L);
        }
    }

    /** Removes one from the count when the withdrawn contribution is strictly positive. */
    @Override
    public void disaggregate(int from, int to) {
        if (source.readDouble(from) > 0) {
            aggregates.addLong(to, -1L);
        }
    }
}
Advanced considerations
Strongly typed aggregation functions
In the CountIfPositiveFunction example, one may note that in order to perform the comparison to 0, the sources are read using the specialized readDouble method.
This allows the example to function correctly for the four numerical primitive types in Java supported in ActivePivot, using the automatic primitive conversions of the language.
However, a user can write specifically typed aggregation bindings for each of the supported primitive types. In doing so, the user avoids this unnecessary conversion.
To do so, one must rewrite the IAggregation createAggregation(final String sourceIdentifier, final int sourceType) function to handle specific types in a different way, and follow the previous tutorial's flow for each type.
Null support
Note that for simplicity's sake, the above snippets do not handle null values in the sources.
The correct condition should probably be: if (!source.isNull(from) && source.readDouble(from) > 0).
In that case, it would be even better to create two bindings: CountIfPositiveBinding and CountIfPositiveBindingWithNullSupport.
The second would check for null values in the sources, at the cost of a null check, while the first one would be specialized for sources that are not nullable and would skip that additional check for better performance.
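The two variants can be compared in a self-contained sketch. Everything below is hypothetical: NullableDoubleColumn is a minimal stand-in written for this page, not the ActivePivot IArrayReader contract, and the static methods only mimic what the two bindings would do row by row.

```java
// Hypothetical, minimal stand-in for a column reader; not the ActivePivot IArrayReader API.
interface NullableDoubleColumn {
    boolean isNull(int position);
    double readDouble(int position);
}

// Array-backed column used to exercise the two variants.
final class ArrayColumn implements NullableDoubleColumn {
    private final Double[] values;

    ArrayColumn(Double... values) {
        this.values = values;
    }

    public boolean isNull(int position) {
        return values[position] == null;
    }

    public double readDouble(int position) {
        return values[position];
    }
}

final class PositiveCounters {
    // Variant for non-nullable sources: no null check on the hot path.
    static long countPositive(NullableDoubleColumn source, int size) {
        long count = 0L;
        for (int i = 0; i < size; i++) {
            if (source.readDouble(i) > 0) {
                count++;
            }
        }
        return count;
    }

    // Variant for nullable sources: pays for one extra check per row.
    static long countPositiveWithNullSupport(NullableDoubleColumn source, int size) {
        long count = 0L;
        for (int i = 0; i < size; i++) {
            if (!source.isNull(i) && source.readDouble(i) > 0) {
                count++;
            }
        }
        return count;
    }
}
```

Choosing the variant once, when the binding is created from the nullability of the source type, keeps the per-row loop free of a decision that never changes.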
Mutability/Immutability
While writing bindings for primitive types is relatively straightforward, aggregating objects is a much more delicate matter, because of the memory footprint of Objects.
Since Java passes object references by value, a method that receives an object shares the same instance with its caller, so one must carefully plan around the potential modification of the aggregated Object.
For instance, let's write an aggregation function that takes a source of java.time.Duration (that represents the time that passed between the submission of a market order and its execution), and computes the total time it took to execute all orders.
This can be adapted to compute the average time for an order execution, but for simplicity's sake, this example will showcase the SUM's implementation.
Let's say these are the values stored in a database table named trades (notation: Duration(1, 500) is a Duration object that has been created using the private Duration.create(1L second, 500 nanoseconds)):
TradeId | dollar_value | execution_time |
---|---|---|
0 | 1000 | Duration(1, 500) |
1 | 50 | Duration(0, 65000) |
2 | 250 | Duration(12, 888 888) |
To compute execution_time.SUM, the binding can start by copying Duration(1, 500) in the aggregated column.
To simplify this example, let's assume that all data must be aggregated at index 0 of the aggregated column.
First, binding.copy(0, 0) is called, with this definition:
public class DurationBinding extends AAggregationBinding {
    @Override
    public void copy(int from, int to) {
        final Duration sourceDuration = source.read(from);
        destination.write(to, sourceDuration);
    }
}
Following the Java Memory model, this action will add in the corresponding cell of the aggregated column a reference to the actual Duration(1, 500) that is stored on heap.
Then, to aggregate the next value, the binding can call binding.aggregate(1, 0):
public class DurationBinding extends AAggregationBinding {
    @Override
    public void aggregate(int from, int to) {
        final Duration sourceDuration = source.read(from);
        final Duration currentDuration = destination.read(to);
        destination.write(to, currentDuration.plus(sourceDuration));
    }
}
However, doing so will, according to the javadoc of Duration#plus(Duration), create a new Duration object as the result of the sum.
This means that every aggregation operation creates a new object on the heap, and allows the previous object to be garbage collected once it is no longer referenced.
This is problematic, because aggregation bindings are performance sensitive, and this method may be called millions of times, thus creating millions of transient instances of the class java.time.Duration and needlessly putting pressure on the garbage collector.
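The immutability of java.time.Duration can be observed directly with the standard library. The sketch below uses the public factory Duration.ofSeconds(seconds, nanoAdjustment); ImmutableDurationDemo is a hypothetical class name.

```java
import java.time.Duration;

// Illustrative only: sums durations the immutable way.
final class ImmutableDurationDemo {
    // plus() never modifies its receiver; each step of the loop produces a separate result object.
    static Duration total(Duration... durations) {
        Duration total = Duration.ZERO;
        for (Duration duration : durations) {
            total = total.plus(duration); // the previous intermediate result becomes garbage
        }
        return total;
    }
}
```

Summing the three execution times of the trades table this way goes through one intermediate Duration per contribution, which is the allocation pattern described above.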
Since no way around this issue can be found while using the immutable java.time.Duration, there is a strong incentive to create a mutable copy of the class, which will be called MutableDuration in the rest of this page.
This isn't so far-fetched: before Java 8, time representations were so poor that org.joda.time was developed as a very popular open-source alternative. org.joda.time.MutableDateTime was created as part of this alternative, and serves as a model for this example.
MutableDuration will have the exact same API as java.time.Duration, but for this example's sake, public Duration plus(Duration) is replaced by public void plus(MutableDuration).
This method mutates the current object.
The binding now evolves to become:
public class MutableDurationBinding extends AAggregationBinding {
    @Override
    public void aggregate(int from, int to) {
        final MutableDuration sourceDuration = source.read(from);
        final MutableDuration currentDuration = destination.read(to);
        // No need to write to the destination column anymore, as currentDuration is modified in place.
        currentDuration.plus(sourceDuration);
    }
}
To recap:
- binding.copy(0, 0) will copy MutableDuration(1, 500) to the aggregated column, in the cell of index 0.
- binding.aggregate(1, 0) will aggregate MutableDuration(0, 65000) into the current value in the cell at index 0.
However, this operation is now mutating the instance MutableDuration(1, 500), which is the same instance as the one stored in the trades table. It just became MutableDuration(1, 65500).
Because Java passes object references by value, the aggregated column and the database table point to the same object on the Java heap, so the MutableDuration being modified is the very object referenced from the database table.
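The aliasing problem can be reproduced in a few lines. The MutableDuration below is a deliberately tiny, hypothetical version of the class discussed on this page (no carry from nanoseconds to seconds), and AliasingDemo mimics copy followed by aggregate on a one-cell aggregated column that stores references.

```java
// Hypothetical mutable duration, written for this illustration only.
final class MutableDuration {
    long seconds;
    long nanos;

    MutableDuration(long seconds, long nanos) {
        this.seconds = seconds;
        this.nanos = nanos;
    }

    // Mutates this instance in place (no carry handling, for brevity).
    void plus(MutableDuration other) {
        seconds += other.seconds;
        nanos += other.nanos;
    }
}

final class AliasingDemo {
    // Mimics copy() then aggregate() on a one-cell aggregated column.
    static MutableDuration[] copyThenAggregate(MutableDuration first, MutableDuration second) {
        MutableDuration[] aggregatedColumn = new MutableDuration[1];
        aggregatedColumn[0] = first;      // copy: stores a reference, not a clone
        aggregatedColumn[0].plus(second); // aggregate: mutates the shared instance
        return aggregatedColumn;
    }
}
```

After the call, the object passed as first holds 65500 nanoseconds: the source row has been modified through the aggregated column.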
To prevent that from ever happening, the copy method of the binding can be smarter: it can clone the MutableDuration instead of simply passing it, and store the cloned object in the aggregated column:
public class MutableDurationBinding extends AAggregationBinding {
    @Override
    public void copy(int from, int to) {
        final MutableDuration sourceDuration = source.read(from);
        // Write a clone that is only owned by the aggregated column.
        destination.write(to, MutableDuration.from(sourceDuration));
    }
}
This solution will not have any side effect, but it will still prove costly memory-wise.
Indeed, if there are lots of aggregated values that are computed simply by copying an initial contribution, without ever aggregating further contributions, it will have cloned many MutableDuration objects to no benefit.
An optimal solution will only clone when needed. It will first copy the value, but mark it as unmodifiable. Then, at the time of calling aggregate, it will clone the current value, and then call MutableDuration#plus(MutableDuration) on the cloned value.
That cloned object will forever be the one stored in the aggregated column, and all users are free to modify that value without impacting any other part of the code that could hold a reference to the same address on the Java Heap.
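The clone-only-when-needed idea is a form of copy-on-write. The sketch below shows it for a single cell; CopyOnWriteCell is a hypothetical class, and the way ActivePivot tracks read-only aggregates internally may well differ.

```java
import java.util.function.UnaryOperator;

// Illustrative copy-on-write cell; not an ActivePivot class.
final class CopyOnWriteCell<T> {
    private final UnaryOperator<T> cloner;
    private T value;
    private boolean readOnly;

    CopyOnWriteCell(UnaryOperator<T> cloner) {
        this.cloner = cloner;
    }

    // Stores the source reference without cloning, and remembers that it is shared.
    void writeReadOnly(T shared) {
        value = shared;
        readOnly = true;
    }

    // Returns the current value; callers must not mutate it.
    T read() {
        return value;
    }

    // Clones at most once, on the first request for a writable value.
    T readWritable() {
        if (readOnly) {
            if (value != null) {
                value = cloner.apply(value);
            }
            readOnly = false;
        }
        return value;
    }
}
```

A cell that only ever receives its initial copy never pays for a clone, while a cell that is aggregated into pays for exactly one.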
To help users achieve this ideal scenario, ActivePivot comes with the abstract aggregation binding AAggregationBindingCloneable.
This binding simply requires the user to implement MutableDuration cloneAggregate(MutableDuration), on top of the regular methods of a binding.
In exchange, it provides multiple methods that handle cloning for the user.
public class MutableDurationBinding extends AAggregationBindingCloneable<MutableDuration> {

    protected MutableDurationBinding(final IArrayReader source, final IArrayWriter destination) {
        super(source, destination);
    }

    @Override
    protected MutableDuration cloneAggregate(final MutableDuration aggregate) {
        return MutableDuration.from(aggregate);
    }

    @Override
    public void copy(final int fromPosition, final int toPosition) {
        final MutableDuration sourceDuration = readInput(fromPosition);
        writeReadOnlyAggregate(toPosition, sourceDuration); // can be null
    }

    @Override
    public void aggregate(final int fromPosition, final int toPosition) {
        final MutableDuration sourceDuration = readInput(fromPosition);
        if (Objects.isNull(sourceDuration)) {
            return; // Nothing to do.
        }
        final MutableDuration currentAggregate = readReadOnlyAggregate(toPosition);
        if (Objects.isNull(currentAggregate)) {
            copy(fromPosition, toPosition);
        } else {
            // will handle cloning for me if necessary
            final MutableDuration writableCurrentAggregate = readWritableAggregate(toPosition);
            writableCurrentAggregate.plus(sourceDuration);
        }
    }

    @Override
    public void disaggregate(final int fromPosition, final int toPosition) {
        final MutableDuration sourceDuration = readInput(fromPosition);
        if (Objects.isNull(sourceDuration)) {
            return; // Nothing to do.
        }
        final MutableDuration currentAggregate = readReadOnlyAggregate(toPosition);
        if (Objects.isNull(currentAggregate)) {
            throw new IllegalStateException("Cannot disaggregate from a null value");
        } else if (sourceDuration == currentAggregate) {
            // Shortcut in the exceptional case where this is the exact same object, replace it by null.
            removeAggregate(toPosition);
        } else {
            // will handle cloning for me if necessary
            final MutableDuration writableCurrentAggregate = readWritableAggregate(toPosition);
            writableCurrentAggregate.minus(sourceDuration);
        }
    }
}
Aggregating Vectors
Vectors are one of the most frequently used features of ActivePivot. Querying and aggregating vectors is extremely CPU and memory intensive, and should be accomplished with great care.
ActivePivot provides a specialization of the class AAggregationBindingCloneable, designed for vector aggregation: AVectorAggregationBinding.
This binding handles the internal aspects of vector aggregation: on-heap and off-heap allocation, cloning, reference counting for garbage collection, etc.
In order to implement a vector aggregation binding, it is strongly recommended to extend this class.
Let's write an aggregation function that produces the sum of each source vector, limited to their first two elements.
This function will typically be difficult to write, for the following reasons:
- The source IVectors may be on-heap or off-heap.
- The aggregated IVector can be lazily copied, as seen in the previous example with MutableDuration.
- Acquiring and storing a sub-part of an IVector may have an impact on the vector's reference counting.
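Setting those storage concerns aside, the arithmetic itself is small. The sketch below assumes the intended result is a two-element vector holding the term-by-term sum of the first two elements of every source vector; it works on plain double arrays and SubVectorSumSketch is a hypothetical name, unrelated to the IVector API.

```java
// Illustrative arithmetic only, on plain arrays; allocation and reference counting are ignored.
final class SubVectorSumSketch {
    static final int PREFIX_LENGTH = 2;

    // Adds the first two elements of 'source' into 'aggregate', term by term.
    // 'aggregate' is expected to hold at least PREFIX_LENGTH elements.
    static void aggregate(double[] source, double[] aggregate) {
        int limit = Math.min(PREFIX_LENGTH, source.length);
        for (int i = 0; i < limit; i++) {
            aggregate[i] += source[i];
        }
    }
}
```

Aggregating {1, 2, 3} and then {10, 20, 30, 40} into an aggregate initialized with zeroes leaves {11, 22}.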
First, one needs to implement the IGenericAggregationFunction
contract.
Since this function will only count one source column at a time, it can extend the AAggregationFunction
abstract class to reduce boilerplate code.
The SubVectorSumFunction
function needs to add itself to the Registry using a unique key:
@QuartetPluginValue(intf = IGenericAggregationFunction.class)
public class SubVectorSumFunction extends AAggregationFunction {

    public static final String PLUGIN_KEY = "SUB_VECTOR_SIZE_2_SUM";

    public SubVectorSumFunction() {
        super(PLUGIN_KEY);
    }

    @Override
    public Object key() {
        return PLUGIN_KEY;
    }
The function can now be called wherever it is needed, using its plugin key. Let's write its logic.
The function will always output numerical arrays, meaning it can override the getAggregatedType method like so:
@Override
public int getAggregatedType(final int sourceType) {
    if (sourceType == Types.TYPE_INT_ARRAY) {
        return sourceType;
    } else if (sourceType == Types.TYPE_LONG_ARRAY) {
        return sourceType;
    } else if (sourceType == Types.TYPE_FLOAT_ARRAY) {
        return sourceType;
    } else if (sourceType == Types.TYPE_DOUBLE_ARRAY) {
        return sourceType;
    } else {
        throw unsupportedSourceType(sourceType);
    }
}
Then, the function must produce an IAggregation, which is the appropriate IGenericAggregation type when there is a single source column.
@Override
protected IAggregation createAggregation(final String sourceIdentifier, final int sourceType) {
    return new SubVectorSumAggregation(sourceIdentifier, this, sourceType, getAggregatedType(sourceType));
}
SubVectorSumAggregation can extend AAggregation<SubVectorSumFunction> to inherit boilerplate getters.
public static class SubVectorSumAggregation extends AAggregation<SubVectorSumFunction> {
First, the IGenericAggregation must be able to return the identifier(s) of the source column(s) being aggregated, their types, the aggregated type, and the function that created it:
public SubVectorSumAggregation(
        final String sourceIdentifier,
        final SubVectorSumFunction aggFun,
        final int sourceType,
        final int aggregatedType) {
    super(sourceIdentifier, aggFun, sourceType, aggregatedType);
}
Since this aggregation outputs vectors, the factory is relatively straightforward:
@Override
public IChunkFactory<?> createChunkFactory(
        final boolean isTransient,
        final IAllocationSettings allocationSettings) {
    return ChunkFactories.chunkMarkedVectorFactory(getInputDataType(), isTransient, allocationSettings);
}
This time, the example uses a helper method from the class ChunkFactories. Vectors need to be aggregated into ChunkMarkedVector, which the helper factory ensures.
The aggregation must be able to bind a source of data to a column of aggregates. To do so, it must create a specialized binding:
@Override
public IAggregationBinding bindSource(final IArrayReader source, final IArrayWriter aggregates) {
    return new SubVectorSumBinding(source, aggregates);
}
Finally, the aggregation must be able to bind two columns of pre-aggregated data together.
That means that if both columns contain aggregated values obtained through the SubVectorSumAggregation, they can be combined.
Since the "2 elements" condition has already been applied by the SubVectorSumBinding mentioned above when aggregating source values, the bindings of the SumFunction can be re-used to sum the vectors of the two columns.
@Override
public IAggregationBinding bindAggregates(final IArrayReader input, final IArrayWriter output) {
    return new SumVectorAggregationBinding(input, output);
}
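The reason a plain vector sum is enough at this stage can be checked with ordinary arrays: once every partial aggregate already holds two elements, summing partial aggregates gives the same result as aggregating all source vectors directly. The sketch below is plain Java, independent of ActivePivot; MergePartialAggregatesSketch and its methods are hypothetical names used only for this illustration.

```java
import java.util.Arrays;

/** Plain-Java check that partial sub-vector sums can be merged with an ordinary vector sum. */
final class MergePartialAggregatesSketch {

    /** Sums the first two elements of every source vector (each must hold at least two). */
    static double[] subVectorSum(final double[]... sources) {
        final double[] result = new double[2];
        for (final double[] source : sources) {
            result[0] += source[0];
            result[1] += source[1];
        }
        return result;
    }

    /** Ordinary element-wise sum of two vectors of equal length. */
    static double[] plainSum(final double[] left, final double[] right) {
        final double[] result = new double[left.length];
        for (int i = 0; i < left.length; i++) {
            result[i] = left[i] + right[i];
        }
        return result;
    }

    public static void main(final String[] args) {
        final double[] a = {1, 2, 3};
        final double[] b = {4, 5, 6};
        final double[] c = {7, 8, 9, 10};

        // Two "columns" of pre-aggregated values, merged with a plain sum.
        final double[] merged = plainSum(subVectorSum(a, b), subVectorSum(c));
        // The same sources aggregated in one pass.
        final double[] direct = subVectorSum(a, b, c);

        System.out.println(Arrays.toString(merged));       // [12.0, 15.0]
        System.out.println(Arrays.equals(merged, direct)); // true
    }
}
```

The truncation to two elements only matters when reading source values, so it does not need to be repeated when combining aggregates.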
All that is left to do is to write the SubVectorSumBinding implementation, extending the abstract vector binding AVectorAggregationBinding:
public static class SubVectorSumBinding extends AVectorAggregationBinding {

    public SubVectorSumBinding(final IArrayReader source, final IArrayWriter aggregates) {
        super(source, aggregates);
    }

    @Override
    protected void doCopy(final int fromPosition, final int toPosition) {
        final IVector inputVec = readInput(fromPosition);
        if (inputVec == null) {
            writeReadOnlyAggregate(toPosition, null);
            return;
        }
        // Take care not to copy the input vector
        final IVector vectorToWrite = inputVec.subVector(0, 2);
        writeReadOnlyAggregate(toPosition, vectorToWrite);
    }

    @Override
    protected void doAggregate(final int fromPosition, final int toPosition) {
        final IVector inputVec = readInput(fromPosition);
        if (inputVec == null) {
            return; // Nothing to aggregate.
        }
        if (inputVec.size() < 2) {
            throw new IllegalArgumentException(
                    "Source vectors are expected to contain more than one element.");
        }
        if (readReadOnlyAggregate(toPosition) == null) {
            // No aggregate yet: the first contribution is simply copied.
            doCopy(fromPosition, toPosition);
            return;
        }
        // Handles necessary cloning behind the scenes. The initially copied sub-vector must not be
        // written into.
        final IVector currentAggregate = readWritableAggregate(toPosition);
        currentAggregate.plus(inputVec.subVector(0, 2));
    }

    @Override
    protected void doDisaggregate(final int fromPosition, final int toPosition) {
        final IVector inputVec = readInput(fromPosition);
        if (inputVec == null) {
            return; // Nothing to disaggregate.
        }
        final IVector currentAggregate = readReadOnlyAggregate(toPosition);
        if (currentAggregate == null) {
            throw new IllegalStateException("Disaggregating from a null value.");
        } else {
            final IVector currentWritableAggregate = readWritableAggregate(toPosition);
            currentWritableAggregate.minus(inputVec.subVector(0, 2));
        }
    }
}
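The comments in this binding rely on the distinction between a read-only aggregate, which may still share its data with the source, and a writable aggregate, which is cloned when needed. The toy model below illustrates that lazy-cloning idea with plain arrays. It is a simplified assumption about the behavior described in the text, not how AVectorAggregationBinding is implemented; LazyAggregateSlot and its method names are hypothetical.

```java
import java.util.Arrays;

/** Toy model of a lazily cloned aggregate slot (hypothetical; not the ActivePivot implementation). */
final class LazyAggregateSlot {

    /** Current aggregate, possibly the very array handed over by the source. */
    private double[] value;

    /** True while value is still shared with whoever wrote it. */
    private boolean shared;

    /** Stores a reference without copying, comparable to what doCopy does in the text. */
    void writeReadOnly(final double[] vector) {
        value = vector;
        shared = vector != null;
    }

    /** Returns the aggregate for reading only; callers must not mutate it. */
    double[] readReadOnly() {
        return value;
    }

    /** Returns an array that is safe to mutate, cloning the shared one on first use. */
    double[] readWritable() {
        if (shared) {
            value = value.clone();
            shared = false;
        }
        return value;
    }

    public static void main(final String[] args) {
        final double[] source = {1, 2};
        final LazyAggregateSlot slot = new LazyAggregateSlot();
        slot.writeReadOnly(source); // No copy yet.

        final double[] writable = slot.readWritable(); // The clone happens here.
        writable[0] += 10;
        writable[1] += 20;

        System.out.println(Arrays.toString(source));              // [1.0, 2.0]
        System.out.println(Arrays.toString(slot.readReadOnly())); // [11.0, 22.0]
    }
}
```

In this model the copy is deferred until the first write, so positions that only ever receive a single contribution never pay for a clone.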