Listen on Fetch Topics

Overview

We can Listen on Fetch Topics directly by initiating a START_LISTENING operation on a Fetch Topic.

Background

Fetch Topics can be executed together in a single batch operation. When this happens, the LoadDataTxControllerTask can specify the order in which the Topics are processed. This is useful when Topics depend on each other.

When Listening on Listen Topics, the Topics are processed in parallel and there is no set order in which they will be executed.

Because of this, when Listening on Fetch Topics the filesystem events will be collected in the FileListenerCollector and then processed in a single batch operation. This way we can use the Fetch Topic’s ordering to process the Listen Topics.
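The collect-then-order idea described above can be illustrated with a minimal sketch. This is not the FileListenerCollector implementation: the class name, method names, and the use of plain Topic name strings are all hypothetical, chosen only to show how events that arrive in arbitrary order can be drained in a fixed fetch order.

```java
import java.util.ArrayList;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

/** Hypothetical sketch: collects Topic names from file events and drains them in a fixed order. */
final class OrderedEventCollectorSketch {
    // Order in which a batch should process Topics (assumed to come from the fetch configuration).
    private final List<String> fetchOrder;
    // Topics that have reported at least one file event since the last batch.
    private final Set<String> pending = new LinkedHashSet<>();

    OrderedEventCollectorSketch(List<String> fetchOrder) {
        this.fetchOrder = List.copyOf(fetchOrder);
    }

    /** Called when a listener reports a file change for a Topic; arrival order is arbitrary. */
    synchronized void onFileEvent(String topic) {
        pending.add(topic);
    }

    /**
     * Returns the pending Topics sorted by fetch order and resets the collector.
     * Topics that are not part of fetchOrder are ignored in this sketch.
     */
    synchronized List<String> drainInFetchOrder() {
        List<String> batch = new ArrayList<>();
        for (String topic : fetchOrder) {
            if (pending.contains(topic)) {
                batch.add(topic);
            }
        }
        pending.clear();
        return batch;
    }
}
```

With a fetch order of Trades, Positions, Risk, events arriving for Risk and then Trades would be drained as Trades followed by Risk, regardless of which file changed first.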

Example

Here we can see an example of a custom LoadDataTxControllerTask that loads Topics in a specific order and performs some custom operations: 000_FetchTopicOrdering.drawio.png

When we Listen on Listen Topics, we can see that the Topics are processed in parallel and there is no set order in which Topics will be executed: 000_ListenTopicsParallel.drawio.png

When we Listen on Fetch Topics, the filesystem events will be collected in the FileListenerCollector and then processed in a single batch operation. This way we can use the Fetch Topic’s ordering to process the Listen Topics: 000_ListenEventCollector.drawio.png

Configuration

When constructing the DataLoadController some additional properties can be provided to configure the dynamically created Listen Topics as well as the scheduling of the FileListenerCollector tasks.

These properties can be provided in the DataLoadController configuration:

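IDataLoadController dataLoadController(IDatastore datastore) {
    Properties properties = new Properties();
    // Delay between filesystem polls, in milliseconds.
    properties.put(PROP_LISTEN_ON_FETCH_FILE_LISTENER_POLLING_DELAY_MS, 500L);
    // Delay between a new Topic being added and it being processed, in milliseconds.
    properties.put(PROP_LISTEN_ON_FETCH_ADDED_TOPIC_DELAY_MS, 5000L);
    // Minimum delay before a batch of Topics is processed, in milliseconds.
    properties.put(PROP_LISTEN_ON_FETCH_BATCH_MIN_DELAY_MS, 30000L);
    // Maximum delay before a batch of Topics is processed, in milliseconds.
    properties.put(PROP_LISTEN_ON_FETCH_BATCH_MAX_DELAY_MS, 120000L);

    return new DataLoadController(datastore, properties);
}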

Properties

dlc.listen-on-fetch.file-listener-polling-delay-ms

Default Value: 500

The delay in milliseconds between polling the filesystem for new events.

dlc.listen-on-fetch.added-topic-delay-ms

Default Value: 5000

The delay in milliseconds between the time a new Topic is added and the time it is processed.

dlc.listen-on-fetch.batch-min-delay-ms

Default Value: 30000

The minimum delay in milliseconds between processing batches of Topics.

dlc.listen-on-fetch.batch-max-delay-ms

Default Value: 120000

The maximum delay in milliseconds between processing batches of Topics.

FileListenerCollector Task Scheduling

The FileListenerCollector will collect events from the individual Listen Topics. When a Listen Topic detects a file change, it will notify the FileListenerCollector. The FileListenerCollector will then wait a minimum of dlc.listen-on-fetch.batch-min-delay-ms before processing a batch.

If another event is detected during this time, an additional dlc.listen-on-fetch.added-topic-delay-ms will be added to the wait time. The FileListenerCollector will wait a maximum of dlc.listen-on-fetch.batch-max-delay-ms before processing a batch.
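The timing rule above can be read as a clamp: the batch runs a fixed delay after the latest event, but never earlier than the minimum delay and never later than the maximum delay. The sketch below is one interpretation of that rule, not the FileListenerCollector's actual code. The class and method names are hypothetical, and it assumes that the minimum and maximum delays are measured from the first event of the batch.

```java
/** Hypothetical sketch of the batch timing rule; not the DLC implementation. */
final class BatchDeadlineSketch {
    private BatchDeadlineSketch() {}

    /**
     * Computes when a batch would be processed.
     *
     * @param batchStartMs      time of the first event in the batch (assumed reference point)
     * @param latestEventMs     time of the most recent event in the batch
     * @param addedTopicDelayMs value of dlc.listen-on-fetch.added-topic-delay-ms
     * @param batchMinDelayMs   value of dlc.listen-on-fetch.batch-min-delay-ms
     * @param batchMaxDelayMs   value of dlc.listen-on-fetch.batch-max-delay-ms
     * @return the time, in the same clock as the inputs, at which the batch is processed
     */
    static long fireTimeMs(long batchStartMs, long latestEventMs,
                           long addedTopicDelayMs, long batchMinDelayMs, long batchMaxDelayMs) {
        long earliest = batchStartMs + batchMinDelayMs;       // never process before this
        long latest = batchStartMs + batchMaxDelayMs;         // never process after this
        long afterLatestEvent = latestEventMs + addedTopicDelayMs;
        // Clamp the event-driven deadline into the [earliest, latest] window.
        return Math.min(latest, Math.max(earliest, afterLatestEvent));
    }
}
```

With added-topic-delay-ms of 20000, batch-min-delay-ms of 30000, and batch-max-delay-ms of 60000, events at 0 ms, 20000 ms, and 45000 ms give fire times of 30000, 40000, and 60000 ms respectively, which matches the timings in the example later in this section.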

Example

Configuration

Here we have a FileListenerCollector configured with the following timings:

# Pick up filesystem changes immediately:
file-listener-polling-delay-ms: 0

# Wait 20 seconds before processing a batch when a new Topic is added:
added-topic-delay-ms: 20000

# Wait a minimum of 30 seconds before processing a batch of Topics:
batch-min-delay-ms: 30000

# Wait a maximum of 60 seconds before processing a batch of Topics:
batch-max-delay-ms: 60000

We can visualize the timings configured for the FileListenerCollector: FileListenerCollector_Example_0.drawio.png

File Event 1

A new file is added in the file system. This will trigger the FileListenerCollector to start collecting events.

Since FilesystemEventTime + added-topic-delay-ms (0s + 20s = 20s) < batch-min-delay-ms (30s), the FileListenerCollector will wait until batch-min-delay-ms (30s) is reached before processing the batch: FileListenerCollector_Example_1.drawio.png

File Event 2

A new file is added in the file system at T+20s.

Since FilesystemEventTime + added-topic-delay-ms (20s + 20s = 40s) > batch-min-delay-ms (30s), the FileListenerCollector will wait until FilesystemEventTime + added-topic-delay-ms (40s) is reached before processing the batch: FileListenerCollector_Example_2.drawio.png

File Event 3

A new file is added in the file system at T+45s.

Since FilesystemEventTime + added-topic-delay-ms (45s + 20s = 65s) > batch-max-delay-ms (60s), the FileListenerCollector will wait until batch-max-delay-ms (60s) is reached before processing the batch: FileListenerCollector_Example_3.drawio.png

Any events that are detected after T+60s will be processed in the next batch.

Limitations

While Listening on Fetch Topics, the FileListenerCollector may not process all Topics in a single batch operation. This could lead to race conditions when Topics depend on each other.