Custom Order for Loading Topics

Overview

In some cases, certain topics must be loaded before or after other topics. The simplest way to achieve this is on the client's data orchestrator: execute multiple transactions in order, placing the order-sensitive topics in separate transactions.
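As a rough sketch of the multiple-transaction approach, the code below runs one load transaction per batch of topics, strictly in order, so each batch is committed before the next one starts. `TransactionalLoader` and `OrderedLoadOrchestrator` are hypothetical names standing in for whatever your orchestrator uses to start a load transaction; they are not part of the ActiveViam API.

```java
import java.util.List;

/** Hypothetical stand-in for whatever starts one load transaction on the client. */
interface TransactionalLoader {
    // Loads all given topics in a single transaction and returns once it is committed.
    void loadInOneTransaction(List<String> topics);
}

final class OrderedLoadOrchestrator {
    private OrderedLoadOrchestrator() {}

    // Runs one transaction per batch, in list order. Because each call returns only
    // after its transaction is committed, earlier batches are fully available
    // before later batches start loading. Empty batches are skipped.
    static void loadInOrder(TransactionalLoader loader, List<List<String>> batches) {
        for (List<String> batch : batches) {
            if (!batch.isEmpty()) {
                loader.loadInOneTransaction(batch);
            }
        }
    }
}
```

For example, passing the batches `[["Products"], ["Trades"]]` (hypothetical topic names) commits the products before any trades are loaded.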

It is also possible to customize the order in which topics are loaded within a single transaction. However, this is not a full replacement for multiple load transactions; see the Issues With Data Availability section below for details.

Example

Here we walk through an example of creating a custom topic load order for data sourcing.

The following example comes from the TopicOrderLoadTask test class in the CSV module.

Creating a Custom Class

First, we extend LoadDataTxControllerTask and annotate the class with @QuartetExtendedPluginValue, as shown in the snippet below. We also override the processForTopics() method, which is where the ordering of the topics is defined. Finally, we call fetchForTopics() to load the given topics. fetchForTopics() is a blocking call: it does not return until all of the given topics have been loaded.

It is important to fetch all topics: any topic not fetched here will not be processed anywhere else.

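import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentMap;
// ActiveViam imports (QuartetExtendedPluginValue, IDataLoadController, LoadDataTxControllerTask,
// DataLoadController, IDatastore) omitted: their packages depend on your version.

@QuartetExtendedPluginValue(intf = IDataLoadController.IExecutionTask.class, key = LoadDataTxControllerTask.PLUGIN_KEY)
public class TopicOrderLoadTask extends LoadDataTxControllerTask {

    // Constructor matching the parent's signature.
    public TopicOrderLoadTask(IDatastore datastore, ConcurrentMap<String, DataLoadController.TopicControllerDelegates> controllerDelegatesByTopic, List<String> topics, Map<String, Object> scope) {
        super(datastore, controllerDelegatesByTopic, topics, scope);
    }

    @Override
    protected void processForTopics(List<String> topics) {
        // TOPIC__ORDERED_TOPIC_TRADES is the trades topic name, defined elsewhere in the test sources.
        // Work on a copy in case the list passed in is immutable.
        List<String> otherTopics = new ArrayList<>(topics);
        boolean hasTradesTopic = otherTopics.remove(TOPIC__ORDERED_TOPIC_TRADES);

        // Fetch all other topics first. This call blocks until they have been loaded.
        fetchForTopics(otherTopics);

        // Then fetch the trades topic.
        if (hasTradesTopic) {
            fetchForTopics(Collections.singletonList(TOPIC__ORDERED_TOPIC_TRADES));
        }
    }
}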
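If the ordering rules grow beyond a single topic, it can help to pull them out of processForTopics() into a pure function that is easy to unit-test without a datastore. The sketch below is one way to do that; `TopicOrdering`, `batches` and the topic names in the example are hypothetical and not part of the ActiveViam API. It returns the batches that would be passed, one after another, to fetchForTopics(), and guarantees every input topic appears exactly once, so no topic is silently skipped.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

final class TopicOrdering {
    private TopicOrdering() {}

    // Returns the topics split into ordered batches: all topics not listed in
    // loadLast first (original order kept), then the loadLast topics that are
    // actually present (in loadLast order). Empty batches are omitted.
    static List<List<String>> batches(List<String> topics, List<String> loadLast) {
        List<String> first = new ArrayList<>(topics); // copy: the input may be immutable
        List<String> last = new ArrayList<>();
        for (String topic : loadLast) {
            if (first.remove(topic)) { // remove(Object): true if the topic was present
                last.add(topic);
            }
        }
        List<List<String>> result = new ArrayList<>();
        if (!first.isEmpty()) {
            result.add(Collections.unmodifiableList(first));
        }
        if (!last.isEmpty()) {
            result.add(Collections.unmodifiableList(last));
        }
        return result;
    }
}
```

Inside processForTopics(), one would then call fetchForTopics() once per returned batch, in order.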

Add Package to Registry

For our class to override the default LoadDataTxControllerTask extended plugin, the Registry must give it precedence over LoadDataTxControllerTask. To do this, we statically add the package containing TopicOrderLoadTask to the Registry's contribution provider, listed before the other ActiveViam packages.

Here we add our package, for example “Our.Example.Package”, to the Registry:

    // Give the package "Our.Example.Package" the highest precedence so that our class overrides the
    // LoadDataTxControllerTask extended plugin: list it first in the contribution provider.
    Registry.setContributionProvider(new ClasspathContributionProvider("Our.Example.Package", "com.activeviam", "com.quartetfs", "com.qfs"));
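To illustrate why the order of packages matters, the sketch below is a deliberately simplified model of "first listed package wins". It is an assumption made for illustration, not the Registry's actual resolution code, and `PluginPrecedence` as well as the package used for the default task in the example are hypothetical.

```java
import java.util.List;
import java.util.Map;
import java.util.Optional;

final class PluginPrecedence {
    private PluginPrecedence() {}

    // Simplified model: given the package prefixes in contribution-provider order
    // and the implementations found for one plugin key (class name -> package),
    // return the class whose package matches the earliest prefix in the list.
    // If several classes match the same winning prefix, which one is returned
    // is unspecified in this model.
    static Optional<String> resolve(List<String> packageOrder, Map<String, String> packageByClass) {
        for (String prefix : packageOrder) {
            for (Map.Entry<String, String> entry : packageByClass.entrySet()) {
                String pkg = entry.getValue();
                if (pkg.equals(prefix) || pkg.startsWith(prefix + ".")) {
                    return Optional.of(entry.getKey());
                }
            }
        }
        return Optional.empty();
    }
}
```

With "Our.Example.Package" listed first, the model picks TopicOrderLoadTask; listing it after the ActiveViam packages would let the default task win instead.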

Issues With Data Availability

All topics are loaded in a single transaction, regardless of the order in which they are fetched.

Data loaded for a topic is not visible in the most recent version of the datastore until all topics have been processed and the datastore transaction has been submitted. Until then, that data can only be queried through the Query Runner of the Transaction Manager.

The example below, taken from OrderedTradesTuplePublisher#isProductsStoreEmpty() in the data-connectors-csv test sources, queries intermediate data that exists only in the current transaction and not yet in the most recent datastore version:

	// Query data that exists in the current transaction but not yet in the most recent
	// datastore version, because the transaction has not been submitted yet.
	ICursor transactionQueryCursor = datastore
			// Run the query through the transaction manager, not the latest datastore version.
			.getTransactionManager()
			.getQueryRunner()
			.forStore(PRODUCT_STORE_NAME)
			.withoutCondition()
			.selecting(PRODUCT_ID)
			.run();

	boolean productStoreHasData = transactionQueryCursor.iterator().hasNext();
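The visibility rule can be pictured with the toy model below. It is a conceptual sketch only, not the ActiveViam datastore API: `ToyVersionedStore` and its methods are hypothetical. Rows written during an open transaction are visible to queries run through that transaction, but readers of the latest committed version only see them after the commit.

```java
import java.util.HashMap;
import java.util.Map;

// Conceptual model only (not the ActiveViam API): one committed version plus,
// while a transaction is open, a pending copy that collects new rows.
final class ToyVersionedStore {
    private Map<String, String> committed = new HashMap<>();
    private Map<String, String> pending = null; // non-null while a transaction is open

    void startTransaction() {
        if (pending != null) throw new IllegalStateException("transaction already open");
        pending = new HashMap<>(committed); // the transaction starts from the latest version
    }

    void add(String key, String value) {
        requireOpen();
        pending.put(key, value);
    }

    // Query against the most recent committed version.
    boolean committedHasData() {
        return !committed.isEmpty();
    }

    // Query through the open transaction (analogous to the transaction manager's query runner).
    boolean transactionHasData() {
        requireOpen();
        return !pending.isEmpty();
    }

    // Publish the pending rows as the new latest version.
    void commit() {
        requireOpen();
        committed = pending;
        pending = null;
    }

    private void requireOpen() {
        if (pending == null) throw new IllegalStateException("no open transaction");
    }
}
```

In this model, after `startTransaction()` and `add("P1", "Product 1")`, `transactionHasData()` is true while `committedHasData()` stays false until `commit()` is called.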