Datastore Transactions

Loading, updating, and removing data in the datastore is done through transactions. Datastore transactions are atomic, consistent, isolated and durable, meaning that users performing queries will only see the impact of a transaction when it is fully committed.

Because the Datastore implements Multiversion Concurrency Control (MVCC), transactions and queries do not lock each other. A query that started on version 'v' of the data will not be paused or interrupted by a subsequent transaction. When the transaction is ready to commit, it will not wait for the query to complete but will commit concurrently with the running query, introducing version 'v+1' of the data. Until the query has completed, it will consistently see version 'v' of the data.
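The versioning behavior described above can be illustrated with a minimal, self-contained sketch. It is not the Datastore's implementation: the `MvccSketch` class, its `Version` type, and the sample key are hypothetical names introduced for illustration, and the sketch assumes that each version is an immutable snapshot published through an atomic reference.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.atomic.AtomicReference;

/** Toy model of MVCC: queries pin an immutable version, commits publish a new one. */
final class MvccSketch {

  /** An immutable snapshot of the data, tagged with its version number. */
  static final class Version {
    final long number;
    final Map<String, Double> data;

    Version(long number, Map<String, Double> data) {
      this.number = number;
      this.data = Map.copyOf(data);
    }
  }

  private final AtomicReference<Version> head =
      new AtomicReference<>(new Version(0, Map.of()));

  /** A query pins the latest committed version and reads only from it. */
  Version beginQuery() {
    return head.get();
  }

  /** A commit never waits for queries: it copies, applies the changes, and publishes v+1. */
  synchronized Version commit(Map<String, Double> changes) {
    final Version current = head.get();
    final Map<String, Double> next = new HashMap<>(current.data);
    next.putAll(changes);
    final Version published = new Version(current.number + 1, next);
    head.set(published);
    return published;
  }

  public static void main(String[] args) {
    final MvccSketch store = new MvccSketch();
    store.commit(Map.of("AAPL.delta", 10.0));
    final Version pinned = store.beginQuery(); // the query starts on version 1
    store.commit(Map.of("AAPL.delta", 12.5)); // version 2 is committed while the query runs
    System.out.println("running query sees v" + pinned.number + ": " + pinned.data);
    System.out.println("new queries see v" + store.beginQuery().number);
  }
}
```

In this sketch the running query keeps reading the version it pinned, while any query started after the second commit reads the newer version.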

A datastore has one Transaction Manager that manages access to all the stores (com.activeviam.database.datastore.api.transaction.ITransactionManager). The Transaction Manager knows how the stores are linked together by references.

A configurable pool of threads is allocated to the Transaction Manager (see Managing Transaction and Query Thread Pools).

Business Requirement

Users who make decisions based on Atoti must be sure that the information they are looking at is consistent.

Consider a situation where Atoti is used as a real-time sensitivity tool for equity derivatives. It is important that all the risks that a security is sensitive to are consistent with each other and use the same security market data. In this case, Atoti must ensure that all the sensitivities derived for the last market data update are aggregated together, and that the user looking at their dashboard does not see sensitivities that combine values derived from the last re-evaluation together with values derived from the previous one for a specific security.

Atoti's transaction mechanism implements this requirement and enables sensitivities to be updated continuously, in real time, while keeping aggregated values consistent.

Technical Requirement

All operations performed on a hypercube must be transactional. That is, the set of operations that performs a specific task must be:

  • Atomic: Either the set of operations succeeds completely, or the whole task is aborted. The system cannot be left with a task partially done.
    For example, in a transfer of funds, both debit and credit movements take place, or neither occurs.
  • Consistent: The set of operations starts with the system in a consistent state, and leaves the system in a (different) consistent state.
    For example, the amount of debit movements must balance the amount of credit movements.
  • Isolated: The intermediate states within a set of operations are never visible to users of a system, or to other transactions.
    This means that queries will be temporarily prevented during execution of a transaction (or, thanks to MVCC, will run on a different version of the data than the one affected by the transaction).
  • Durable: Once a set of operations has completed successfully, the results persist in the system and cannot be undone, although another transaction might reverse them.

These properties are sometimes referred to by the acronym 'ACID'.
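The funds transfer used as an example in the list above can be sketched in a few lines. This is a generic illustration of atomicity and consistency, not Atoti code: the `AtomicTransferSketch` class and its methods are hypothetical, and the sketch assumes a single-threaded caller that stages changes on a copy and publishes them only if the result is valid.

```java
import java.util.HashMap;
import java.util.Map;

/** Toy all-or-nothing transfer: changes are staged on a copy, then published or dropped. */
final class AtomicTransferSketch {

  private Map<String, Long> balances = new HashMap<>();

  /** Credits an account outside of any transfer (used to set up the example). */
  AtomicTransferSketch deposit(String account, long amount) {
    balances.merge(account, amount, Long::sum);
    return this;
  }

  long balance(String account) {
    return balances.getOrDefault(account, 0L);
  }

  /** Returns true if both movements were applied, false if the whole transfer was aborted. */
  boolean transfer(String from, String to, long amount) {
    final Map<String, Long> staged = new HashMap<>(balances);
    staged.merge(from, -amount, Long::sum); // debit movement
    staged.merge(to, amount, Long::sum); // credit movement
    if (staged.get(from) < 0) {
      return false; // abort: the staged copy is dropped, the visible state is untouched
    }
    balances = staged; // commit: both movements become visible together
    return true;
  }

  public static void main(String[] args) {
    final AtomicTransferSketch bank = new AtomicTransferSketch().deposit("A", 100);
    System.out.println(bank.transfer("A", "B", 150)); // false, nothing changes
    System.out.println(bank.transfer("A", "B", 40)); // true, A = 60 and B = 40
  }
}
```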

Transaction Manager

The Transaction Manager exposes methods to manage transactions. It can be retrieved from the Datastore (com.activeviam.database.datastore.api.IDatastore.getTransactionManager()), or from the ActivePivotSchema. It manages sequences of operations between explicit startTransaction and commit calls.

We advise building a few large transactions rather than many small ones. All of Atoti's components have been designed to handle large sets of data.

Here is a simple example:

// Get the transaction manager
final ITransactionManager tm = database.getTransactionManager();
// Start a transaction on a list of impacted stores
// (allows for parallel transactions on non-related stores)
tm.startTransaction(STORE_NAME);
try {
  // Get the data to add:
  // each object array represents a record,
  // in the same field order as the fields declared in the datastore description
  final List<Object[]> toAdd = generateData();
  // Add all records in the same transaction
  tm.addAll(STORE_NAME, toAdd);
} catch (final Exception ex) {
  tm.rollbackTransaction();
  throw new MyApplicativeException("Problem while adding data in transaction", ex);
}
// Commit the transaction: either all records will be added,
// or the transaction will be rolled back if an error has occurred
tm.commitTransaction();

If you insert a record whose key already exists in the store, it is considered an update, which is internally represented as a delete followed by an insert.

If the same key is added twice during the same transaction, the behavior depends on the duplicate key handler defined for the store (see com.activeviam.database.datastore.api.description.impl.DuplicateKeyHandlers).
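The two cases above (a key that already exists in the store, and a key added twice within one transaction) can be told apart in a small sketch. It does not use the Datastore API: `KeyedStoreSketch` is a hypothetical class, the first field of each record is assumed to be the key, and the duplicate key handler is modeled as a plain `BinaryOperator` that receives the earlier and the later record.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.function.BinaryOperator;

/** Toy keyed store: committed keys are overwritten, in-transaction duplicates use a handler. */
final class KeyedStoreSketch {

  private final Map<String, Object[]> committed = new LinkedHashMap<>();
  private final Map<String, Object[]> pending = new LinkedHashMap<>();
  private final BinaryOperator<Object[]> duplicateHandler;

  KeyedStoreSketch(BinaryOperator<Object[]> duplicateHandler) {
    this.duplicateHandler = duplicateHandler;
  }

  /** Stages a record; a key already staged in this transaction goes through the handler. */
  void add(Object[] record) {
    pending.merge((String) record[0], record, duplicateHandler);
  }

  /** Publishes staged records; a key that already exists is replaced, which is an update. */
  void commit() {
    committed.putAll(pending);
    pending.clear();
  }

  Object[] get(String key) {
    return committed.get(key);
  }

  int size() {
    return committed.size();
  }

  public static void main(String[] args) {
    // Hypothetical policy: within one transaction, the last record added wins.
    final KeyedStoreSketch store = new KeyedStoreSketch((earlier, later) -> later);
    store.add(new Object[] {"T1", 10});
    store.commit();
    store.add(new Object[] {"T1", 20}); // existing key: treated as an update
    store.add(new Object[] {"T1", 30}); // same key again: resolved by the handler
    store.commit();
    System.out.println(store.size() + " record, value " + store.get("T1")[1]);
  }
}
```

A stricter policy can be expressed by passing a handler that throws, so that the second `add` of the same key fails immediately.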

Only one transaction can be started on a given store at any given time. Locking occurs in the manager because you cannot process two feeds in parallel. The lock is held until the transaction is committed.

If a call to a basic operation fails validation, the transaction will fail during the "prepare commit" phase, which happens just before the submitted data is merged into the target schema. The calling code can then handle the exception as it sees fit.

How to correctly handle potential exceptions of a transaction

There are multiple steps where a transaction can fail:

  • when starting a transaction,
  • during the transaction,
  • when committing, and
  • when rolling back.

Here is an example showing how to structure the try/catch blocks to cover each of these steps.

final ITransactionManager transactionManager = database.getTransactionManager();
try {
  transactionManager.startTransaction(STORE_NAME);
} catch (final Exception ex) {
  throw new MyApplicativeException("An error occurred when starting a new transaction.", ex);
}
// now the transaction is started
try {
  doMyTransactionalWork();
} catch (final Exception ex) {
  try {
    transactionManager.rollbackTransaction();
  } catch (final DatastoreTransactionException re) {
    ex.addSuppressed(
        new MyApplicativeException(
            "Automatic transaction rollback following an error: "
                + "the transaction rollback has failed.",
            re));
  }
  throw new MyApplicativeException("An error occurred inside the transactional work.", ex);
}
// Commit the transaction
try {
  transactionManager.commitTransaction();
} catch (final Exception ex) {
  throw new MyApplicativeException("An error occurred during the commit of a transaction.", ex);
}

If you do not want to handle every exception yourself, and simply want to roll back and throw a runtime exception whenever an error occurs, you can use the much simpler ITransactionManager.performInTransaction method:

final ITransactionManager transactionManager = database.getTransactionManager();
transactionManager.performInTransaction(
    t -> {
      t.addAll(STORE_NAME, generateData());
    },
    STORE_NAME);

Transaction Listeners

Atoti transactions are not accessible from outside the Transaction Manager. However, Atoti generates transaction information objects, which can be received by Listener objects.

A transaction information object contains information about:

  • objects added,
  • objects removed,
  • the commit date/time, and
  • custom properties.

To start listening for transactions, a developer only needs to implement the transaction listener interface and register a listener instance with the transaction manager.

Here is the transaction listener interface to implement:

/**
 * A transaction listener is listening transaction managers in order to intercept every important
 * action:
 *
 * <ul>
 * <li>When a transaction is started.
 * <li>When a transaction is rolled back.
 * <li>When a transaction is committed.
 * </ul>
 *
 * <p>Each callback method is passed the id of the transaction manager, and the id of the associated
 * transaction within the scope of that manager.
 *
 * <p>Unlike an {@link IAsyncCompositeViewListener}, this listener focuses only on the execution of
 * the transaction. The content of the transaction is not relevant to such a listener.
 *
 * <p>Listeners are expected to have very fast operations. Listeners are sequentially notified of
 * every transaction step before the transaction can move to the next step. One shall not assume
 * that there are any effort made to notify listeners as fast as possible.<br>
 * Listeners are also expected to not fail. Exceptions from listeners are ignored.
 *
 * @author ActiveViam
 * @see ITransactionManager
 */
public interface ITransactionListener {
  /** Method called when a transaction manager has just started a new transaction. */
  void transactionStarted(long transactionId, ITransactionInformation transactionInfo);

  /** Method called when a transaction manager has successfully committed a transaction. */
  void transactionCommitted(long transactionId, IEpoch newVersionEpoch);

  /** Method called when a transaction manager has rolled back a transaction. */
  void transactionRolledBack(long transactionId);
}

Here is how to register a transaction listener:

database.getTransactionManager().addTransactionListener(listener);

We guarantee that transaction listeners are synchronously notified.

For instance, if a listener has to query the cube whenever a transaction is committed, no transaction will be started until the listener finishes its work.
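The notification contract described in this section (listeners called one after another on the notifying thread, with their exceptions ignored) can be sketched without the Atoti library. Everything in the block below is a stand-in: `TxListener` is a simplified, hypothetical version of the listener interface that only carries the transaction id, and `ListenerNotificationSketch` is not the Transaction Manager.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicLong;

/** Toy notifier: listeners are called in order on the calling thread, failures are ignored. */
final class ListenerNotificationSketch {

  /** Stand-in for the listener contract; the real callbacks take additional arguments. */
  interface TxListener {
    default void transactionStarted(long transactionId) {}

    default void transactionCommitted(long transactionId) {}

    default void transactionRolledBack(long transactionId) {}
  }

  /** A deliberately cheap listener: it only increments counters. */
  static final class CountingListener implements TxListener {
    final AtomicLong commits = new AtomicLong();
    final AtomicLong rollbacks = new AtomicLong();

    @Override
    public void transactionCommitted(long transactionId) {
      commits.incrementAndGet();
    }

    @Override
    public void transactionRolledBack(long transactionId) {
      rollbacks.incrementAndGet();
    }
  }

  private final List<TxListener> listeners = new CopyOnWriteArrayList<>();

  void addTransactionListener(TxListener listener) {
    listeners.add(listener);
  }

  /** Returns only once every listener has been notified; a failing listener is skipped. */
  void fireCommitted(long transactionId) {
    for (TxListener listener : listeners) {
      try {
        listener.transactionCommitted(transactionId);
      } catch (RuntimeException ignored) {
        // Exceptions from listeners are ignored, as in the contract quoted above.
      }
    }
  }

  void fireRolledBack(long transactionId) {
    for (TxListener listener : listeners) {
      try {
        listener.transactionRolledBack(transactionId);
      } catch (RuntimeException ignored) {
        // Same policy as for commits.
      }
    }
  }

  public static void main(String[] args) {
    final ListenerNotificationSketch notifier = new ListenerNotificationSketch();
    final CountingListener counter = new CountingListener();
    notifier.addTransactionListener(counter);
    notifier.fireCommitted(1L);
    System.out.println("commits seen: " + counter.commits.get());
  }
}
```

Because notification is synchronous, a slow listener in this sketch delays the return of `fireCommitted`, which is why listeners should keep their work short.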

Transaction Types

There are two types of transactions in the datastore. The engine selects the transaction type automatically depending on the state of the application and the stores declared in the transaction. The choice cannot be forced by the user.

  • Initial transaction: This transaction type was designed for the initial load of an application, for instance, when you start Atoti and it loads a start-of-day file set from scratch.

    In this mode, Atoti Server knows that there is no previous data to update and can therefore apply a number of optimizations for quicker loading. It is thus recommended to load the initial data in one big transaction rather than several smaller ones.

    A transaction will be set to this type if there has never been a commit on any of the stores it impacts. In the specific case of several concurrent transactions, as each transaction locks the stores it impacts, the same reasoning is applied.

  • Incremental transaction: This transaction type was designed for updates. Incremental transactions can affect multiple stores.

    A transaction will be set to this type if any one of the impacted stores has already been committed to.

Reference resolution is done at commit time of a transaction. It is recommended to commit both the target and owner store of a reference in the same transaction to avoid performance issues. If owner and target stores must be updated in different commits, it is recommended to commit the target store before the owner store.

Transaction Rules

There are rules about how threads manage transactions and write into the datastore:

  • Same Thread to Commit: Any thread can start a transaction, but only that same thread is allowed to commit or roll back the transaction concerned.
  • Any Thread to Push Data: Once a transaction is started, any other thread can push data into the datastore (multithreaded loading is, of course, possible and required for high performance).
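The two threading rules above can be modeled in a short sketch. It is a toy, not the Transaction Manager: `OwnerThreadTransactionSketch` and its methods are hypothetical, and the sketch assumes that the thread creating the object plays the role of the thread that started the transaction.

```java
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

/** Toy transaction: any thread may add records, only the starting thread may commit. */
final class OwnerThreadTransactionSketch {

  private final Thread owner = Thread.currentThread();
  private final Queue<Object[]> pending = new ConcurrentLinkedQueue<>();

  /** Any thread may push records while the transaction is open. */
  void add(Object[] record) {
    pending.add(record);
  }

  /** Only the thread that started the transaction may commit; returns the record count. */
  int commit() {
    if (Thread.currentThread() != owner) {
      throw new IllegalStateException("Only the starting thread may commit");
    }
    return pending.size();
  }

  /** Pushes records from several worker threads, then commits from the calling thread. */
  static int loadInParallel(int workers, int recordsPerWorker) {
    final OwnerThreadTransactionSketch tx = new OwnerThreadTransactionSketch();
    final ExecutorService pool = Executors.newFixedThreadPool(workers);
    for (int w = 0; w < workers; w++) {
      final int worker = w;
      pool.execute(() -> {
        for (int i = 0; i < recordsPerWorker; i++) {
          tx.add(new Object[] {worker, i});
        }
      });
    }
    pool.shutdown();
    try {
      if (!pool.awaitTermination(30, TimeUnit.SECONDS)) {
        throw new IllegalStateException("Workers did not finish in time");
      }
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new IllegalStateException("Interrupted while loading", e);
    }
    return tx.commit(); // same thread as the one that created the transaction
  }

  public static void main(String[] args) {
    System.out.println(loadInParallel(4, 1000) + " records committed");
  }
}
```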

There are also rules about resource reservation:

  • No Concurrent Transactions: It is not possible to start two transactions concurrently on the same store. Two transactions that may end up updating the same piece of data are always mutually exclusive (an update propagation following references is taken into account for this rule). The start of a new transaction always denotes the beginning of a new Epoch.
  • Disjoint Stores: The datastore supports concurrent transactions on disjoint stores. Disjoint stores are stores that do not reference each other (directly or following a path of references).
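Whether two stores are disjoint can be checked with a simple graph traversal. The sketch below is an illustration under stated assumptions rather than the engine's locking logic: `DisjointStoresSketch` and the store names are hypothetical, and references are followed in both directions, which is a conservative simplification.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

/** Toy check of the disjoint stores rule, using an undirected graph of references. */
final class DisjointStoresSketch {

  private final Map<String, Set<String>> links = new HashMap<>();

  /** Declares a reference from an owner store to a target store. */
  DisjointStoresSketch reference(String owner, String target) {
    links.computeIfAbsent(owner, k -> new HashSet<>()).add(target);
    links.computeIfAbsent(target, k -> new HashSet<>()).add(owner);
    return this;
  }

  /** Returns the given store plus every store reachable through a path of references. */
  Set<String> connected(String store) {
    final Set<String> seen = new HashSet<>();
    final Deque<String> toVisit = new ArrayDeque<>();
    toVisit.add(store);
    while (!toVisit.isEmpty()) {
      final String current = toVisit.poll();
      if (seen.add(current)) {
        toVisit.addAll(links.getOrDefault(current, Set.of()));
      }
    }
    return seen;
  }

  /** Two transactions may overlap only if no path of references links their stores. */
  boolean canRunConcurrently(String storeA, String storeB) {
    return !connected(storeA).contains(storeB);
  }

  public static void main(String[] args) {
    final DisjointStoresSketch schema =
        new DisjointStoresSketch()
            .reference("Trades", "Products")
            .reference("Products", "Currencies");
    System.out.println(schema.canRunConcurrently("Trades", "Currencies")); // false
    System.out.println(schema.canRunConcurrently("Trades", "Users")); // true
  }
}
```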

Managing Transaction and Query Thread Pools

The Atoti engine is heavily multithreaded to get as much performance as possible from multicore architectures.

Data loading, compression, aggregation, and querying are each performed within their own multithreaded pipelines. Two distinct thread pools manage the threads:

  • the Transaction Thread Pool, and
  • the Query Thread Pool.

Atoti Server uses a special fork/join thread pool implementation that supports work stealing of tasks among worker threads, maximizing the usage of the threads in the pools, and allowing divide & conquer multithreaded algorithms. For more details on the implementation and internal working of the ForkJoinPool see its documentation.
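As an illustration of the divide and conquer style that fork/join pools support, here is a minimal sketch using the standard `java.util.concurrent.ForkJoinPool`. It does not use Atoti's own pool implementation: the `ForkJoinSumSketch` class, the threshold value, and the parallelism passed to the pool are arbitrary choices made for the example.

```java
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveTask;

/** Sums an array by splitting it recursively; idle workers can steal the forked halves. */
final class ForkJoinSumSketch extends RecursiveTask<Long> {

  private static final int THRESHOLD = 10_000;

  private final long[] values;
  private final int from;
  private final int to;

  ForkJoinSumSketch(long[] values, int from, int to) {
    this.values = values;
    this.from = from;
    this.to = to;
  }

  @Override
  protected Long compute() {
    if (to - from <= THRESHOLD) {
      // Small enough: sum sequentially.
      long sum = 0;
      for (int i = from; i < to; i++) {
        sum += values[i];
      }
      return sum;
    }
    final int mid = (from + to) >>> 1;
    final ForkJoinSumSketch left = new ForkJoinSumSketch(values, from, mid);
    left.fork(); // queue the left half so that another worker can steal it
    final long right = new ForkJoinSumSketch(values, mid, to).compute();
    return left.join() + right;
  }

  /** Runs the sum on a dedicated pool with the given number of worker threads. */
  static long parallelSum(long[] values, int parallelism) {
    final ForkJoinPool pool = new ForkJoinPool(parallelism);
    try {
      return pool.invoke(new ForkJoinSumSketch(values, 0, values.length));
    } finally {
      pool.shutdown();
    }
  }

  public static void main(String[] args) {
    final long[] values = new long[1_000_000];
    for (int i = 0; i < values.length; i++) {
      values[i] = i + 1;
    }
    final int cores = Runtime.getRuntime().availableProcessors();
    System.out.println(parallelSum(values, cores));
  }
}
```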

The threads in the transaction thread pool are used to feed data into Atoti and in the Datastore, handling data classification, compression, aggregation, dictionarization, indexing, and so on. When the additions and removals of a transaction would trigger some updates on registered continuous queries in Atoti Server, those continuous queries are also processed within the transaction pool.

The threads in the query thread pool are used to execute all kinds of queries in Atoti (MDX queries, GetAggregates queries, Drillthrough queries) and in the Datastore. When multiple queries are sent concurrently to an Atoti application, they are processed in parallel on as many threads as possible. In addition, the Atoti query engine implements deep intra-query parallelism, so a single large query also runs faster by being split across multiple threads.

By default, an Atoti Server deployment allocates exactly one transaction thread pool and one query thread pool per NUMA node, and the size of each pool is equal to the number of cores on that node. Thus, if your machine has 64 cores and no NUMA architecture, there is one transaction pool with 64 worker threads and one query pool, also with 64 worker threads.

Those two pools are shared among all Atoti Managers, Schemas, Cubes, Stores, and so on, that run in the JVM.

The size of those two pools can be configured with the pool size property (see Properties in Atoti Server).

If your machine has a NUMA architecture, more details can be found in the NUMA Optimization documentation.

Datastore Transactions in Distributed Architecture

The workflow to monitor the progress of datastore transaction propagation in a distributed cluster is described here.