Custom Topic Ordering
A common customization to the `LOAD` operation is to enforce the order in which certain topics are processed.
We can achieve this by creating a List of Sets of topics from the incoming request and then processing the sets in the desired order.
This can be done by overriding the existing `LOAD` operation.
Configuration
/**
 * Spring configuration that registers {@link CustomLoadOperation} as the LOAD operation bean.
 */
@Configuration
public class CustomOperationsConfig {

    /**
     * Creates the custom LOAD operation.
     *
     * <p>Both collaborators are required by the {@link CustomLoadOperation} constructor and are
     * injected by Spring as bean-method parameters (assumes both are available as beans in the
     * application context).
     *
     * @param dlcLoadOperationsService service the operation delegates the actual loading to
     * @param namedEntityResolverService resolver forwarded to the parent operation's constructor
     * @return the custom LOAD operation
     */
    @Bean
    public IDlcOperation loadOperation(DlcLoadOperationsService dlcLoadOperationsService,
                                       NamedEntityResolverService namedEntityResolverService) {
        return new CustomLoadOperation(dlcLoadOperationsService, namedEntityResolverService);
    }
}
Alternatively, you can annotate the `CustomLoadOperation` class with `@Component` and let Spring manage the bean creation.
Custom Operation
/**
 * LOAD operation that processes the requested topics in ordered groups instead of all at once.
 *
 * <p>The resolved topics are split into an ordered list of sets by {@link #organizeTopics(Set)}.
 * Each non-empty set is loaded in list order, and the individual responses are merged into a
 * single {@link DlcLoadResponse}.
 */
class CustomLoadOperation extends DlcLoadOperation {

    public CustomLoadOperation(DlcLoadOperationsService dlcLoadOperationsService,
                               NamedEntityResolverService namedEntityResolverService) {
        super(dlcLoadOperationsService, namedEntityResolverService);
    }

    /**
     * Resolves the topics of the request, groups them with {@link #organizeTopics(Set)} and loads
     * each non-empty group in order.
     *
     * @param request the incoming load request
     * @return the merged response of all per-group loads
     */
    @Override
    public DlcLoadResponse process(DlcLoadRequest request) {
        Set<String> topics = dlcLoadOperationsService.resolveAliases(request);
        List<Set<String>> organizedTopics = organizeTopics(topics);
        // The stream is sequential and List-backed, so groups are loaded in list order.
        var responses = organizedTopics.stream()
                .filter(Predicate.not(Set::isEmpty))
                .map(topicGroup -> dlcLoadOperationsService.load(
                        topicGroup,
                        request.topicOverrides(),
                        request.scope(),
                        request.sourceName(),
                        request.sourceType()))
                .toList();
        return merge(responses);
    }

    /**
     * Merges the per-group responses into a single response.
     *
     * <p>Report entries that share a key are concatenated in iteration order. The resulting status
     * is the first status that is not {@code DlcStatus.OK}, or the last status seen when every
     * response is OK. It is {@code null} when {@code responses} is empty.
     *
     * @param responses the responses to merge, in processing order
     * @return a response carrying the combined report and the resulting status
     * @throws IllegalStateException if two parsing reports merged under the same outer key contain
     *     the same inner key (the inner maps are combined without a merge function)
     */
    protected DlcLoadResponse merge(SequencedCollection<DlcLoadResponse> responses) {
        Map<String, List<DlcMessageCollector.OnMessageStarted>> messageStartedMap = new HashMap<>();
        Map<String, List<DlcMessageCollector.OnMessagePublished>> messagesPublishedMap = new HashMap<>();
        Map<String, Map<String, ITopicParsingReport>> parsingReportMap = new HashMap<>();
        Map<String, List<DlcMessageCollector.OnError>> errorsMap = new HashMap<>();
        IDlcStatus status = null;

        for (var response : responses) {
            var report = response.getReport();
            report.messageStarted().forEach((k, v) ->
                    messageStartedMap.merge(k, v, CustomLoadOperation::concat));
            report.messagesPublished().forEach((k, v) ->
                    messagesPublishedMap.merge(k, v, CustomLoadOperation::concat));
            // NOTE(review): toMap without a merge function throws on duplicate inner keys;
            // confirm that different topic groups can never report the same inner key.
            report.parsingReport().forEach((k, v) ->
                    parsingReportMap.merge(k, v,
                            (a, b) -> Stream.concat(a.entrySet().stream(), b.entrySet().stream())
                                    .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue))));
            report.errors().forEach((k, v) ->
                    errorsMap.merge(k, v, CustomLoadOperation::concat));

            // Once a non-OK status has been recorded it is never overwritten.
            if (status == null || status == DlcStatus.OK) {
                status = response.getStatus();
            }
        }

        return DlcLoadResponse.builder()
                .report(DlcLoadResponse.LoadReport.builder()
                        .messageStarted(messageStartedMap)
                        .messagesPublished(messagesPublishedMap)
                        .parsingReport(parsingReportMap)
                        .errors(errorsMap)
                        .build())
                .status(status)
                .build();
    }

    /**
     * Splits the topics into the ordered groups in which they must be loaded.
     *
     * <p>Placeholder implementation: returns all topics as a single group, so the load behaves
     * like one unordered load. Replace the body with the custom ordering logic.
     *
     * @param topics the resolved topics of the request
     * @return the topic groups, in the order they should be processed
     */
    List<Set<String>> organizeTopics(Set<String> topics) {
        // Custom logic to order topics
        return List.of(topics);
    }

    /** Returns a new unmodifiable list holding the elements of {@code first} followed by {@code second}. */
    private static <T> List<T> concat(List<T> first, List<T> second) {
        return Stream.concat(first.stream(), second.stream()).toList();
    }
}