Custom Order for Loading Topics

Overview

In some cases, specific topics must be loaded before other topics. There are two ways to achieve this:

  1. Execute a Load operation on the first topics, then execute a separate Load operation on the second topics. This is easily done through the Client’s data-orchestrator by executing multiple operations, with the order-sensitive topics placed in different operations.
  2. Implement a custom ordering of topics by extending LoadDataTxControllerTask and overriding its processForTopics() method. This allows a single DLC request to contain multiple order-sensitive topics, without the need for multiple DLC requests. An example can be found below.
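As a rough illustration of the first approach, the plain-Java sketch below issues one blocking load operation per group of topics, so each group finishes loading before the next one starts. `LoadOperation` and `loadInOrder` are hypothetical stand-ins, not the data-orchestrator’s API, and the topic names are invented; the recorder only captures the calls instead of loading data.

```java
import java.util.ArrayList;
import java.util.List;

public class SequentialLoadSketch {

    /** Hypothetical stand-in for a blocking load operation issued through the data orchestrator. */
    interface LoadOperation {
        void load(List<String> topics);
    }

    /** Issues one load operation per group, waiting for each to finish before starting the next. */
    static void loadInOrder(LoadOperation operation, List<List<String>> orderedGroups) {
        for (List<String> group : orderedGroups) {
            operation.load(group); // blocks until this group's topics are loaded
        }
    }

    public static void main(String[] args) {
        List<List<String>> calls = new ArrayList<>();
        // Record each operation instead of loading real data.
        LoadOperation recorder = topics -> calls.add(new ArrayList<>(topics));

        loadInOrder(recorder, List.of(List.of("Products", "Risks"), List.of("Trades")));
        System.out.println(calls); // [[Products, Risks], [Trades]]
    }
}
```

Because each group is a separate operation, each one presumably ends in its own datastore commit, which is what distinguishes this approach from ordering topics inside a single request.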

You can customize the order in which topics are loaded within a single request, but this is not a full replacement for multiple load requests. See Issues With Data Availability below for more.

Example

Here we will walk through an example of creating a custom task ordering for data sourcing.

The following example comes from the test class TopicOrderLoadTask in the CSV module.

Creating a Custom Class

  1. First, we extend LoadDataTxControllerTask and annotate the class with @QuartetExtendedPluginValue, as shown in the snippet below.
  2. We then override the processForTopics() method, as this is where the ordering of the topics is defined.
  3. We call fetchForTopics() to execute the loading operation on the provided topics.
    • fetchForTopics() is a blocking call: it does not return until it has finished loading all the topics it was given.

It is important to fetch all topics: any topic not fetched here will not be processed anywhere else.

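import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentMap;
// ActiveViam imports (IDatastore, IDataLoadController, DataLoadController,
// LoadDataTxControllerTask, QuartetExtendedPluginValue) omitted.

@QuartetExtendedPluginValue(intf = IDataLoadController.IExecutionTask.class, key = LoadDataTxControllerTask.PLUGIN_KEY)
public class TopicOrderLoadTask extends LoadDataTxControllerTask {

    // Constructor matching the parent's signature.
    public TopicOrderLoadTask(IDatastore datastore, ConcurrentMap<String, DataLoadController.TopicControllerDelegates> controllerDelegatesByTopic, List<String> topics, Map<String, Object> scope) {
        super(datastore, controllerDelegatesByTopic, topics, scope);
    }

    @Override
    protected void processForTopics(List<String> topics) {
        // TOPIC__ORDERED_TOPIC_TRADES is a topic-name constant defined outside this snippet.
        // Work on a copy in case the provided list is unmodifiable.
        List<String> otherTopics = new ArrayList<>(topics);
        boolean hasTradesTopic = otherTopics.remove(TOPIC__ORDERED_TOPIC_TRADES);

        // Fetch all other topics first. This blocks until they have been loaded.
        fetchForTopics(otherTopics);

        // Then fetch the trades topic, if it was requested.
        if (hasTradesTopic) {
            fetchForTopics(Collections.singletonList(TOPIC__ORDERED_TOPIC_TRADES));
        }
    }
}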
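The same pattern can be generalized to several order-sensitive topics. The sketch below is plain Java with no ActiveViam dependency: `processInOrder`, `loadLast`, and the `Consumer` passed as `fetch` are hypothetical stand-ins for processForTopics() and fetchForTopics(), and the topic names are invented. It fetches every non-deferred topic in one batch, then each deferred topic on its own, so every requested topic is fetched exactly once.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

public class OrderedTopicsSketch {

    /**
     * Fetches every topic not listed in loadLast in one batch, then each requested
     * topic of loadLast on its own, in the order given by loadLast.
     * fetch is a hypothetical stand-in for a blocking call such as fetchForTopics().
     */
    static void processInOrder(List<String> topics, List<String> loadLast, Consumer<List<String>> fetch) {
        // Work on a copy so the caller's list is never mutated.
        List<String> remaining = new ArrayList<>(topics);
        List<String> deferred = new ArrayList<>();
        for (String topic : loadLast) {
            if (remaining.remove(topic)) {
                deferred.add(topic);
            }
        }

        // Topics that are not order-sensitive go first, in a single batch.
        if (!remaining.isEmpty()) {
            fetch.accept(remaining);
        }
        // Order-sensitive topics follow, each in its own blocking fetch.
        for (String topic : deferred) {
            fetch.accept(List.of(topic));
        }
    }

    public static void main(String[] args) {
        List<List<String>> batches = new ArrayList<>();
        processInOrder(List.of("Trades", "Products", "Risks"), List.of("Trades"), batches::add);
        System.out.println(batches); // [[Products, Risks], [Trades]]
    }
}
```

Copying the list before removing from it avoids an UnsupportedOperationException if the incoming list happens to be unmodifiable.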

Add Package to Registry

It is important to properly override the default LoadDataTxControllerTask extended plugin in the Registry by telling the Registry that our class takes precedence over LoadDataTxControllerTask. To do this, we statically add the package containing our TopicOrderLoadTask class to the Registry, listing it before the other ActiveViam packages.

Here we add our package, say “Our.Example.Package”, to the Registry:

    /* Make sure that the package "Our.Example.Package" has the highest precedence, so that we can override the
       LoadDataTxControllerTask extended plugin: we list it first in the contribution provider. */
    Registry.setContributionProvider(new ClasspathContributionProvider("Our.Example.Package", "com.activeviam", "com.quartetfs", "com.qfs"));
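To make the effect of package order concrete, here is a deliberately simplified model: among candidate implementations registered for the same plugin key, it picks the one whose package appears earliest in the precedence list. This is an assumption about the behavior described above, not ActiveViam's Registry implementation; `resolve`, `rank`, and the `com.activeviam.hypothetical` package name are invented for illustration.

```java
import java.util.List;
import java.util.Optional;

public class PackagePrecedenceSketch {

    /** Index of the first package that contains the class, or -1 if none matches. */
    static int rank(String className, List<String> packages) {
        for (int i = 0; i < packages.size(); i++) {
            // The trailing "." prevents "com.qfs" from matching "com.qfsx...".
            if (className.startsWith(packages.get(i) + ".")) {
                return i;
            }
        }
        return -1;
    }

    /** Picks the candidate whose package appears earliest in the precedence list. */
    static Optional<String> resolve(List<String> candidates, List<String> packages) {
        String best = null;
        int bestRank = Integer.MAX_VALUE;
        for (String candidate : candidates) {
            int r = rank(candidate, packages);
            if (r >= 0 && r < bestRank) {
                best = candidate;
                bestRank = r;
            }
        }
        return Optional.ofNullable(best);
    }

    public static void main(String[] args) {
        List<String> packages = List.of("Our.Example.Package", "com.activeviam", "com.quartetfs", "com.qfs");
        // Hypothetical fully qualified names for the default and the custom task.
        List<String> candidates = List.of(
                "com.activeviam.hypothetical.LoadDataTxControllerTask",
                "Our.Example.Package.TopicOrderLoadTask");
        System.out.println(resolve(candidates, packages).orElse("none")); // Our.Example.Package.TopicOrderLoadTask
    }
}
```

In this model, moving "Our.Example.Package" after "com.activeviam" would make the default task win again, which is why the custom package is listed first.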

Issues With Data Availability

All topics are still loaded in a single request, regardless of the order in which they are fetched.

Data loaded for a topic is not available in the most recent version of the datastore until all topics have been processed and the datastore transaction has been submitted. Until then, the loaded data can be queried through the Query Runner of the Transaction Manager.
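The visibility rule above can be pictured with a toy model in which rows written by an open transaction are visible to that transaction's own queries but not to queries against the latest committed version. This is a simplified in-memory illustration under that assumption only; the class and its methods are hypothetical and do not reflect how the datastore implements versioning.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

public class TransactionVisibilitySketch {

    /** Rows in the most recent committed datastore version. */
    private final List<String> committed = new ArrayList<>();
    /** Rows added by the open transaction but not yet submitted. */
    private final List<String> pending = new ArrayList<>();

    void add(String row) {
        pending.add(row);
    }

    /** What a query against the most recent datastore version sees. */
    List<String> queryLatestVersion() {
        return Collections.unmodifiableList(new ArrayList<>(committed));
    }

    /** What a query through the transaction's own query runner sees. */
    List<String> queryInTransaction() {
        List<String> view = new ArrayList<>(committed);
        view.addAll(pending);
        return view;
    }

    /** Submitting the transaction makes pending rows part of the latest version. */
    void submit() {
        committed.addAll(pending);
        pending.clear();
    }

    public static void main(String[] args) {
        TransactionVisibilitySketch store = new TransactionVisibilitySketch();
        store.add("product-1"); // e.g. loaded by a products topic earlier in the same request
        System.out.println(store.queryLatestVersion()); // []
        System.out.println(store.queryInTransaction()); // [product-1]
        store.submit();
        System.out.println(store.queryLatestVersion()); // [product-1]
    }
}
```

In this picture, a topic fetched later in the same request can only see an earlier topic's rows through the transaction's view.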

Below is an example of querying intermediate data that exists only in the current transaction and not yet in the most recent datastore version. It is taken from OrderedTradesTuplePublisher#isProductsStoreEmpty() in the data-connectors-csv test classes:

	// Query data that is in the current transaction, not in the most recent datastore version as the transaction 
	// has not been submitted yet.
	ICursor transactionQueryCursor = datastore
			// Run query through transaction manager as the transaction has not been submitted to Datastore yet.
			.getTransactionManager()
			.getQueryRunner()
			.forStore(PRODUCT_STORE_NAME)
			.withoutCondition()
			.selecting(PRODUCT_ID)
			.run();

	boolean productStoreHasData = transactionQueryCursor.iterator().hasNext();