Custom Order for Loading Topics
Overview
In some cases, certain topics of a transaction must be loaded before or after another topic. This is easily done in the client's data orchestrator by executing multiple transactions in sequence, with the order-sensitive topics placed in separate transactions.
It is also possible to customize the order in which topics are loaded within a single transaction. However, this is not a full replacement for multiple load transactions; see [Issues With Data Availability][data-availability] for details.
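The multi-transaction approach amounts to calling the loader once per group of topics and waiting for each transaction to be committed before starting the next one. The sketch below is plain Java and assumes a hypothetical `TransactionalLoader` interface (not an ActiveViam API) whose `loadInTransaction` method loads the given topics in one transaction and returns only after that transaction has been committed.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical abstraction (not an ActiveViam API): loads the given topics in one
// transaction and returns only after that transaction has been committed.
interface TransactionalLoader {
    void loadInTransaction(List<String> topics);
}

final class OrderedTransactionsOrchestrator {
    private final TransactionalLoader loader;

    OrderedTransactionsOrchestrator(TransactionalLoader loader) {
        this.loader = loader;
    }

    // Runs one transaction per non-empty group, in list order. Because each call
    // commits before returning, data from earlier groups is already committed
    // when later groups are loaded.
    void loadInOrder(List<List<String>> orderedGroups) {
        for (List<String> group : orderedGroups) {
            if (!group.isEmpty()) {
                loader.loadInTransaction(new ArrayList<>(group));
            }
        }
    }
}
```

For example, passing `[["products"], ["trades"]]` commits the products before the trades transaction starts, so the trades load can rely on product data being in the most recent datastore version.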
Example
Here we will walk through an example of creating a custom task ordering for data sourcing.
The following example comes from the test class TopicOrderLoadTask in the CSV module.
Creating a Custom Class
First, we extend LoadDataTxControllerTask and annotate the class with @QuartetExtendedPluginValue, as shown in the snippet below. We also override the processForTopics() method, since this is where the order of the topics is defined. Finally, we call fetchForTopics() to load the given topics. fetchForTopics() is a blocking call: it does not return until it has finished loading all the topics passed to it.
It is important to fetch all topics: any topic not fetched here will not be processed anywhere else.
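import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentMap;
// ActiveViam imports omitted. TOPIC__ORDERED_TOPIC_TRADES is a String constant defined elsewhere in the test.

@QuartetExtendedPluginValue(intf = IDataLoadController.IExecutionTask.class, key = LoadDataTxControllerTask.PLUGIN_KEY)
public class TopicOrderLoadTask extends LoadDataTxControllerTask {

    // Constructor matching the parent's signature.
    public TopicOrderLoadTask(IDatastore datastore,
            ConcurrentMap<String, DataLoadController.TopicControllerDelegates> controllerDelegatesByTopic,
            List<String> topics, Map<String, Object> scope) {
        super(datastore, controllerDelegatesByTopic, topics, scope);
    }

    @Override
    protected void processForTopics(List<String> topics) {
        // Work on a copy so the provided list is not mutated (it may be unmodifiable).
        List<String> otherTopics = new ArrayList<>(topics);
        boolean hasTradesTopic = otherTopics.remove(TOPIC__ORDERED_TOPIC_TRADES);
        // Fetch all other topics first. This call blocks until they have been loaded.
        fetchForTopics(otherTopics);
        // Then fetch the trades topic, if it was requested.
        if (hasTradesTopic) {
            fetchForTopics(Collections.singletonList(TOPIC__ORDERED_TOPIC_TRADES));
        }
    }
}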
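The same pattern generalizes to more than one order-sensitive topic. The sketch below is plain Java with no ActiveViam dependency: a hypothetical `TopicPhases.plan` helper splits the requested topics into ordered batches. Unlisted topics come first, as in TopicOrderLoadTask, followed by each deferred topic in the given order. Deferred topics that were not requested are skipped. Inside processForTopics() one would then call fetchForTopics() once per batch. Because every requested topic lands in exactly one batch, none is silently dropped.

```java
import java.util.ArrayList;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

// Hypothetical helper, not part of any ActiveViam API.
final class TopicPhases {
    private TopicPhases() {}

    // Returns ordered batches: first every requested topic not listed in deferredTopics,
    // then one single-topic batch per deferred topic that was actually requested, in the
    // order given. Every requested topic appears in exactly one batch.
    static List<List<String>> plan(List<String> requestedTopics, List<String> deferredTopics) {
        Set<String> remaining = new LinkedHashSet<>(requestedTopics);
        List<String> deferredPresent = new ArrayList<>();
        for (String topic : deferredTopics) {
            if (remaining.remove(topic)) {
                deferredPresent.add(topic);
            }
        }
        List<List<String>> batches = new ArrayList<>();
        if (!remaining.isEmpty()) {
            batches.add(new ArrayList<>(remaining));
        }
        for (String topic : deferredPresent) {
            batches.add(List.of(topic));
        }
        return batches;
    }
}
```

Keeping the planning logic free of datastore types makes it easy to unit-test the ordering separately from the loading itself.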
Add Package to Registry
It is important that our class properly overrides the default LoadDataTxControllerTask extended plugin in the Registry. To achieve this, we tell the Registry that our class takes precedence over LoadDataTxControllerTask by statically adding the package containing our TopicOrderLoadTask class and declaring it before the other ActiveViam packages.
Here we add our package, say “Our.Example.Package”, to the Registry:
/* Make sure that the package "Our.Example.Package" has the highest precedence, so that our class overrides the
   LoadDataTxControllerTask extended plugin: list it first in the contribution provider. */
Registry.setContributionProvider(new ClasspathContributionProvider("Our.Example.Package", "com.activeviam", "com.quartetfs", "com.qfs"));
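The effect of the package order can be pictured with a toy model. This is not how the Registry is implemented. `PluginPrecedence.select` is a hypothetical helper that takes the packages in the order passed to the contribution provider, along with several classes contributing the same plugin key. It returns the class from the earliest-listed package, so in this model, listing “Our.Example.Package” first makes TopicOrderLoadTask win over the default task.

```java
import java.util.List;
import java.util.Optional;

// Toy model of package precedence; not the ActiveViam Registry implementation.
final class PluginPrecedence {
    private PluginPrecedence() {}

    // Returns the first candidate (by candidate order) that lives in, or below, the
    // earliest-listed package prefix. Empty if no candidate matches any prefix.
    static Optional<String> select(List<String> packagePrefixes, List<String> candidateClassNames) {
        for (String prefix : packagePrefixes) {
            for (String className : candidateClassNames) {
                if (className.startsWith(prefix + ".")) {
                    return Optional.of(className);
                }
            }
        }
        return Optional.empty();
    }
}
```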
Issues With Data Availability
All topics are loaded in a single transaction, regardless of the order in which they are fetched.
A topic’s data is not available in the most recent version of the datastore until all topics have been processed and the datastore transaction has been submitted. Until then, the loaded data can be queried only through the Query Runner of the Transaction Manager.
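The visibility rule can be illustrated with a toy versioned store in plain Java. This is not the ActiveViam datastore, and the class and method names are hypothetical. Rows staged by the open transaction show up when querying through the transaction, but queries against the latest committed version see them only after the whole transaction is committed.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Toy model (hypothetical names): committed rows form the latest version;
// staged rows belong to the single open transaction.
final class ToyVersionedStore {
    private final Map<String, List<String>> committed = new HashMap<>();
    private final Map<String, List<String>> staged = new HashMap<>();

    // Adds a row to the open transaction only.
    void stage(String store, String row) {
        staged.computeIfAbsent(store, k -> new ArrayList<>()).add(row);
    }

    // Query against the latest committed version: staged rows are not visible.
    List<String> queryLatestVersion(String store) {
        return Collections.unmodifiableList(committed.getOrDefault(store, List.of()));
    }

    // Query through the open transaction: committed rows plus staged rows.
    List<String> queryInTransaction(String store) {
        List<String> rows = new ArrayList<>(committed.getOrDefault(store, List.of()));
        rows.addAll(staged.getOrDefault(store, List.of()));
        return rows;
    }

    // Publishes every staged row at once, as when the single load transaction is submitted.
    void commit() {
        staged.forEach((store, rows) -> committed.computeIfAbsent(store, k -> new ArrayList<>()).addAll(rows));
        staged.clear();
    }
}
```

In this model, a products topic loaded before the trades topic in the same transaction is visible to `queryInTransaction` while trades are being loaded, but `queryLatestVersion` still returns nothing for it until `commit()` runs.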
Below is an example, taken from OrderedTradesTuplePublisher#isProductsStoreEmpty() in the data-connectors-csv tests, of querying intermediate data that exists only in the current transaction and not yet in the most recent datastore version:
// Query data that is in the current transaction, not in the most recent datastore version, as the transaction
// has not been submitted yet.
ICursor transactionQueryCursor = datastore
        // Run the query through the transaction manager, as the transaction has not been submitted to the datastore yet.
        .getTransactionManager()
        .getQueryRunner()
        .forStore(PRODUCT_STORE_NAME)
        .withoutCondition()
        .selecting(PRODUCT_ID)
        .run();
boolean productStoreHasData = transactionQueryCursor.iterator().hasNext();