Custom Topic Ordering

A common customization to the LOAD operation is to enforce the order in which certain topics are processed.

We can achieve this by building a List of Sets of topics from the incoming request and then processing those sets in the desired order. This can be done by overriding the existing LOAD operation.

Configuration

/**
 * Spring configuration that registers {@link CustomLoadOperation} as the
 * {@code IDlcOperation} bean used for the LOAD operation.
 */
@Configuration
public class CustomOperationsConfig {

	/**
	 * Creates the custom LOAD operation.
	 *
	 * @param dlcLoadOperationsService   service the operation delegates the actual loading to
	 * @param namedEntityResolverService resolver required by the {@code CustomLoadOperation} constructor
	 * @return the custom LOAD operation bean
	 */
	@Bean
	public IDlcOperation loadOperation(DlcLoadOperationsService dlcLoadOperationsService,
			NamedEntityResolverService namedEntityResolverService) {
		// CustomLoadOperation declares a two-argument constructor; both collaborators must be supplied.
		return new CustomLoadOperation(dlcLoadOperationsService, namedEntityResolverService);
	}
}

Alternatively, you can annotate the CustomLoadOperation class with @Component and let Spring manage the bean creation.

Custom Operation

/**
 * LOAD operation that processes the requested topics in a caller-defined order.
 *
 * <p>The resolved topics are split into an ordered list of batches by
 * {@link #organizeTopics(Set)}; each non-empty batch is loaded in turn and the
 * individual responses are combined into a single {@code DlcLoadResponse}.
 */
class CustomLoadOperation extends DlcLoadOperation {

    /**
     * @param dlcLoadOperationsService   service used to resolve aliases and perform the loads
     * @param namedEntityResolverService resolver forwarded to the parent operation
     */
    public CustomLoadOperation(DlcLoadOperationsService dlcLoadOperationsService, NamedEntityResolverService namedEntityResolverService) {
        super(dlcLoadOperationsService, namedEntityResolverService);
    }

    /**
     * Resolves the topics of the request, orders them into batches and loads
     * the batches one after another.
     *
     * @param request the incoming LOAD request
     * @return the merged response of all batch loads
     */
    @Override
    public DlcLoadResponse process(DlcLoadRequest request) {
        Set<String> topics = dlcLoadOperationsService.resolveAliases(request);

        // Pass the resolved topics (not an undefined variable) to the ordering hook.
        List<Set<String>> organizedTopics = organizeTopics(topics);

        // List-backed streams are ordered and sequential, so batches are loaded in list order.
        var responses = organizedTopics.stream()
                .filter(Predicate.not(Set::isEmpty))
                .map(topicBatch -> dlcLoadOperationsService.load(topicBatch, request.topicOverrides(), request.scope(), request.sourceName(), request.sourceType()))
                .toList();

        return merge(responses);
    }

    /**
     * Combines the reports of several load responses into one response.
     *
     * <p>List-valued report entries that share a key are concatenated in
     * response order. The resulting status is the first status that is not
     * {@code DlcStatus.OK}; if every response is OK, the status is OK.
     *
     * @param responses the per-batch responses, in processing order
     * @return a single response aggregating all reports
     */
    protected DlcLoadResponse merge(SequencedCollection<DlcLoadResponse> responses) {
        Map<String, List<DlcMessageCollector.OnMessageStarted>> messageStartedMap = new HashMap<>();
        Map<String, List<DlcMessageCollector.OnMessagePublished>> messagesPublishedMap = new HashMap<>();
        Map<String, Map<String, ITopicParsingReport>> parsingReportMap = new HashMap<>();
        Map<String, List<DlcMessageCollector.OnError>> errorsMap = new HashMap<>();
        // NOTE(review): stays null when responses is empty - confirm callers accept a null status.
        IDlcStatus status = null;

        for (var response : responses) {
            var report = response.getReport();
            report.messageStarted().forEach((k, v) ->
                    messageStartedMap.merge(k, v,
                            (a, b) -> Stream.concat(a.stream(), b.stream()).toList()));
            report.messagesPublished().forEach((k, v) ->
                    messagesPublishedMap.merge(k, v,
                            (a, b) -> Stream.concat(a.stream(), b.stream()).toList()));
            // Collectors.toMap without a merge function throws IllegalStateException
            // if both inner maps contain the same key.
            report.parsingReport().forEach((k, v) ->
                    parsingReportMap.merge(k, v,
                            (a, b) -> Stream.concat(a.entrySet().stream(), b.entrySet().stream())
                                    .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue))));
            report.errors().forEach((k, v) ->
                    errorsMap.merge(k, v, (a, b) -> Stream.concat(a.stream(), b.stream()).toList()));
            // Once a non-OK status has been recorded it is never overwritten.
            if (status == null || status == DlcStatus.OK) {
                status = response.getStatus();
            }
        }

        return DlcLoadResponse.builder()
                .report(DlcLoadResponse.LoadReport.builder()
                        .messageStarted(messageStartedMap)
                        .messagesPublished(messagesPublishedMap)
                        .parsingReport(parsingReportMap)
                        .errors(errorsMap)
                        .build())
                .status(status)
                .build();
    }

    /**
     * Splits the topics into ordered batches; earlier batches are loaded first.
     *
     * @param topics the resolved topics of the request
     * @return the batches in the order they must be processed
     */
    List<Set<String>> organizeTopics(Set<String> topics) {
        // Custom logic to order topics
        // Placeholder: replace with project-specific ordering. Until then, all topics form one batch.
        return List.of(topics);
    }
}