Listen on Fetch Topics
Overview
We can Listen on Fetch Topics directly by initiating a START_LISTENING operation on a Fetch Topic.
Background
Fetch Topics can be executed together in a single batch operation. When this happens, the LoadDataTxControllerTask has the ability to specify an order to process the Topics. This is useful when Topics are dependent on each other.
When Listening on Listen Topics, the Topics are processed in parallel and there is no set order in which they will be executed.
Because of this, when Listening on Fetch Topics the filesystem events will be collected in the FileListenerCollector
and then processed in a single batch operation. This way we can use the Fetch Topic’s ordering to process the Listen Topics.
Example
Here we can see an example of a custom LoadDataTxControllerTask
that loads Topics in a specific order, as well as performing some custom operations:
When we Listen on Listen Topics, we can see that the Topics are processed in parallel and there is no set order in which Topics will be executed:
When we Listen on Fetch Topics, the filesystem events will be collected in the FileListenerCollector
and then processed in a single batch operation. This way we can use the Fetch Topic’s ordering to process the Listen Topics:
Configuration
When constructing the DataLoadController
some additional properties can be provided to configure the dynamically created Listen Topics as well as the scheduling of the FileListenerCollector
tasks.
These proeprties can be provided in the DataLoadController
configuration:
IDataLoadController dataLoadController(IDatastore datastore) {
Properties properties = new Properties();
properties.put(PROP_LISTEN_ON_FETCH_FILE_LISTENER_POLLING_DELAY_MS, 500L);
properties.put(PROP_LISTEN_ON_FETCH_ADDED_TOPIC_DELAY_MS, 5000L);
properties.put(PROP_LISTEN_ON_FETCH_BATCH_MIN_DELAY_MS, 30000L);
properties.put(PROP_LISTEN_ON_FETCH_BATCH_MAX_DELAY_MS, 120000L);
return new DataLoadController(datastore, properties);
}
Properties
dlc.listen-on-fetch.file-listener-polling-delay-ms
Default Value: 500
The delay in milliseconds between polling the filesystem for new events.
dlc.listen-on-fetch.added-topic-delay-ms
Default Value: 5000
The delay in milliseconds between the time a new Topic is added and the time it is processed.
dlc.listen-on-fetch.batch-min-delay-ms
Default Value: 30000
The minimum delay in milliseconds between processing batches of Topics.
dlc.listen-on-fetch.batch-max-delay-ms
Default Value: 120000
The maximum delay in milliseconds between processing batches of Topics.
FileListenerCollector
Task Scheduling
The FileListenerCollector
will collect events from the individual Listen Topics. When a Listen Topic detects a file change, it will notify the FileListenerCollector
. The FileListenerCollector
will then wait a minimum of dlc.listen-on-fetch.batch-min-delay-ms
before processing a batch.
If another event is detected during this time, an additional dlc.listen-on-fetch.added-topic-delay-ms
will be added to the wait time. The FileListenerCollector
will wait a maximum of dlc.listen-on-fetch.batch-max-delay-ms
before processing a batch.
Example
Configuration
Here we have a FileListenerCollector
configured with the following timings:
# Pickup filesystem changes immediately:
file-listener-polling-delay-ms: 0
# Wait 20 seconds before processing a batch when a new Topic is added:
added-topic-delay-ms: 20000
# Wait a minimum of 30 seconds before processing a batch of Topics:
batch-min-delay-ms: 30000
# Wait a maximum of 60 seconds before processing a batch of Topics:
batch-max-delay-ms: 60000
We can visualize the timings configured for the FileListenerCollector
:
File Event 1
A new file is added in the file system. This will trigger the FileListenerCollector
to start collecting events.
Since the FilesystemEventTime
+ added-topic-delay-ms
< batch-min-delay-ms
, the FileListenerCollector
will wait until the batch-min-delay-ms
is reached before processing the batch:
File Event 2
A new file is added in the file system at T+20s.
Since the FilesystemEventTime
+ added-topic-delay-ms
> batch-min-delay-ms
, the FileListenerCollector
will wait until the currentTime + added-topic-delay-ms
(40s) is reached before processing the batch:
File Event 3
A new file is added in the file system at T+45s.
Since the FilesystemEventTime
+ added-topic-delay-ms
> batch-max-delay-ms
, the FileListenerCollector
will wait until the batch-max-delay-ms
(60s) is reached before processing the batch:
Any events that are detected after T+60s will be processed in the next batch.
Limitations
While Listening on Fetch Topics, the FileListenerCollector
may not process all Topics in a single batch operation, this could lead to race-cases when Topics
are dependent on each other.