Custom Monitoring / Event Handling

CSV Message Handlers

A CSV channel must use a DlcMessageHandler so that the DLC is notified when an error is thrown during tuple publishing. By default, a CSVMessageChannel uses a LogMessageHandler, which simply logs the error and continues, so the error is not propagated up to the DLC.

The simplest way to meet this requirement is to create the channels with a DlcCSVMessageChannelFactory, which removes the need to add the DlcMessageHandler yourself. IMessageChannels created by the DlcCSVMessageChannelFactory only accept IMessageHandlers of type DlcMessageHandler, which helps ensure that the DLC catches and reports errors during CSV tuple publishing.
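To make the difference between the two behaviours concrete, the following self-contained sketch uses stand-in types rather than the real Atoti classes. SketchMessageHandler, LoggingOnlyHandler, ReportingHandler and HandlerPropagationSketch are hypothetical names introduced for illustration only. The sketch assumes nothing more than what is described above: one handler logs an error and continues, the other passes the error on to the component that started the load.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Stand-in for a message handler. Hypothetical type, NOT the Atoti IMessageHandler.
interface SketchMessageHandler {
    void onError(String key, Throwable error);
}

// Logs the error and continues: the caller never learns that something failed.
class LoggingOnlyHandler implements SketchMessageHandler {
    @Override
    public void onError(String key, Throwable error) {
        System.err.println("Error while publishing '" + key + "': " + error.getMessage());
    }
}

// Passes every error on to a list owned by the caller (the loader in this sketch).
class ReportingHandler implements SketchMessageHandler {
    private final List<Throwable> reported;

    ReportingHandler(List<Throwable> reported) {
        this.reported = reported;
    }

    @Override
    public void onError(String key, Throwable error) {
        reported.add(error);
    }
}

class HandlerPropagationSketch {

    // Pretends to publish each line; a line that is not an integer counts as a failure.
    static int publish(List<String> lines, SketchMessageHandler handler) {
        int published = 0;
        for (String line : lines) {
            try {
                Integer.parseInt(line.trim());
                published++;
            } catch (NumberFormatException e) {
                handler.onError(line, e);
            }
        }
        return published;
    }

    public static void main(String[] args) {
        List<String> lines = Arrays.asList("1", "oops", "3");

        // With the logging-only handler the failure is visible in the log and nowhere else.
        publish(lines, new LoggingOnlyHandler());

        // With the reporting handler the caller can inspect the failure afterwards.
        List<Throwable> reported = new ArrayList<>();
        publish(lines, new ReportingHandler(reported));
        System.out.println("Errors reported to the caller: " + reported.size());
    }
}
```

In both runs two of the three lines are published, but only the second run leaves the caller with something to report.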

If you do want to implement your own IMessageHandler, extend DlcMessageHandler and make sure that you call dispatchErrorMessage(Throwable) in your onError method. Below is an example of a custom IMessageHandler:

// A simple message handler to write all errors to a file
public class CustomLogToFileMessageHandler<I> extends DlcMessageHandler<I> {
    
    @Override
    public void onMessageStarted(I key){
        // print message to file
        FileUtil.write("Message Started");
    }
    
    @Override
    public void onMessagePublished(I key, IPublicationInfo info){
        // print message to file
        FileUtil.write("Published Message: " + info.toString());
    }

    /**
     * Ensure we call DlcMessageHandler's dispatchErrorMessage() so DLC is aware of any errors.
     */
    @Override
    public void onError(I key, Throwable ex){
        dispatchErrorMessage(ex);
        
        FileUtil.write("Error encountered: " + ex);
    }
}

The CustomLogToFileMessageHandler can then be added to the IMessageChannel as shown below:

CSVMessageChannelFactory channelFactory = new CSVMessageChannelFactory(...);
final IStoreMessageChannel<IFileInfo<I>, ILineReader> channel = channelFactory.createChannel(...);
channel.withMessageHandler(new CustomLogToFileMessageHandler<>());

Custom Health Event Handler

The first step is to create a custom health event handler interface that extends IDataLoadSourceHealthEventHandler and declares our custom event handler methods.


    /**
     * @author ActiveViam
     */
    public interface ITuplePublisherDlcHealthEventHandler extends IDataLoadSourceHealthEventHandler {

        void onPublish(TuplePublisherDlcPublish publish);

        void onTuplesAdded(TuplePublisherDlcTuplesAdded tuplesAdded);

    }

Next, we implement the newly created ITuplePublisherDlcHealthEventHandler interface. The implementation class must be annotated with @QuartetExtendedPluginValue so that it is picked up by Atoti Server’s Registry on startup.

    /**
     * @author ActiveViam
     */
    @QuartetExtendedPluginValue(intf = IDataLoadSourceHealthEventHandler.class, key = TuplePublisherDlcHealthEventHandler.PLUGIN_KEY)
    public class TuplePublisherDlcHealthEventHandler implements ITuplePublisherDlcHealthEventHandler {

        private static final long serialVersionUID = -5806171621353218782L;

        public static final String PLUGIN_KEY = "TuplePublisher";

        @Override
        public String getType() {
            return PLUGIN_KEY;
        }

        @Override
        public void onUnknownEvent(IHealthEvent e) {
            // ignore
        }

        @Override
        public void onPublish(TuplePublisherDlcPublish publish) {
            CacheUtils.addEventToCache(publish, publish.getEventType());
        }

        @Override
        public void onTuplesAdded(TuplePublisherDlcTuplesAdded tuplesAdded) {
            CacheUtils.addEventToCache(tuplesAdded, tuplesAdded.getEventType());
        }

    }

note

Remember to add your event to the cache.

The DLC internally uses a cache to store the events of every request. Events added to the cache are later retrieved and returned as a JSON response for a given Trace ID or taskName.

    CacheUtils.addEventToCache(tuplesAdded, tuplesAdded.getEventType());
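The DLC's actual cache implementation is not shown here, so the following is only a rough, self-contained sketch of the idea: events are grouped under a task name and later rendered as a JSON array for that task. EventCacheSketch, addEvent and toJson are hypothetical names, and the JSON layout is an assumption made for illustration; none of this reflects how CacheUtils works internally.

```java
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;

// Illustrative stand-in for an event cache keyed by task name. NOT the DLC's CacheUtils.
class EventCacheSketch {

    private final Map<String, List<String>> eventsByTask = new ConcurrentHashMap<>();

    // Stores a small JSON object for the event under the given task name.
    void addEvent(String taskName, String eventType, String detail) {
        String json = "{\"type\":\"" + escape(eventType) + "\",\"detail\":\"" + escape(detail) + "\"}";
        eventsByTask.computeIfAbsent(taskName, k -> new CopyOnWriteArrayList<>()).add(json);
    }

    // Renders every event recorded for the task as a JSON array ("[]" if there are none).
    String toJson(String taskName) {
        List<String> events = eventsByTask.getOrDefault(taskName, Collections.emptyList());
        return "[" + String.join(",", events) + "]";
    }

    // Minimal escaping for this sketch: backslashes and double quotes only.
    private static String escape(String s) {
        return s.replace("\\", "\\\\").replace("\"", "\\\"");
    }

    public static void main(String[] args) {
        EventCacheSketch cache = new EventCacheSketch();
        cache.addEvent("task-1", "PUBLISH_TUPLES", "42 tuples");
        cache.addEvent("task-2", "PUBLISH_TUPLES", "7 tuples");
        System.out.println(cache.toJson("task-1"));
    }
}
```

The point of the sketch is that an event that is never added to the cache cannot appear in the response for its task, which is why each handler method above ends with a call to addEventToCache.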

Custom Health Event

The second step is to create a custom health event by extending the ADlcHealthEvent class and implementing your custom event interface. Each event must be associated with an appropriate DlcEventType that indicates the type of the event. Because the event type is a String value, you can define your own DlcEventType.

Every event is given a taskName by IDlcHealthEventDispatcher, the DLC’s custom implementation of IHealthEventDispatcher. The taskName is stored in the event’s properties mapping, so there is no need to customize it.

    /**
     * @author ActiveViam
     */
    public class TuplePublisherDlcPublish extends ADlcHealthEvent implements ITuplePublisherDlcHealthEvent {

        public static final Set<String> TAGS = new HashSet<>(Arrays.asList("publish", "source"));

        public static final String TYPE = "PUBLISH_TUPLES";

        public final String tuplePublisher;
        public final String messageIdentifier;
        public final int tupleCount;

        public TuplePublisherDlcPublish(String tuplePublisher, Object messageIdentifier, int tupleCount) {
            this.tuplePublisher = tuplePublisher;
            this.messageIdentifier = messageIdToString(messageIdentifier);
            this.tupleCount = tupleCount;
        }


        @Override
        public Set<String> getTags() {
            return TAGS;
        }

        @Override
        public Level getLevel() {
            return Level.INFO;
        }

        @Override
        public void accept(ITuplePublisherDlcHealthEventHandler handler) {
            handler.onPublish(this);
        }

        @Override
        public String getEventType() {
            return TYPE;
        }

        @Override
        public Throwable getCause() {
            return null;
        }

        public String messageIdToString(Object messageIdentifier) {
            if (messageIdentifier instanceof IFileInfo) {
                return ((IFileInfo<?>)messageIdentifier).getFullName();
            } else {
                return messageIdentifier.toString();
            }
        }

        @Override
        public boolean equals(Object obj) {
            if (this == obj) return true;
            if (obj == null || getClass() != obj.getClass()) return false;
            TuplePublisherDlcPublish other = (TuplePublisherDlcPublish) obj;
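            // Objects.equals keeps the comparison null-safe, matching Objects.hash in hashCode()
            return tupleCount == other.tupleCount &&
                    Objects.equals(taskName, other.taskName) &&
                    Objects.equals(tuplePublisher, other.tuplePublisher) &&
                    Objects.equals(messageIdentifier, other.messageIdentifier);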
        }

        @Override
        public int hashCode() {
            return Objects.hash(taskName, tuplePublisher, messageIdentifier, tupleCount);
        }

    }
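The accept method in the event above hands the event to the matching handler callback, a form of double dispatch. The self-contained sketch below shows that mechanism in isolation. All types in it (SketchHandler, SketchEvent, SketchPublish, SketchTuplesAdded, RecordingHandler, DoubleDispatchSketch) are hypothetical stand-ins and not the Atoti interfaces.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical stand-ins for the handler and event types; not the Atoti interfaces.
interface SketchHandler {
    void onPublish(SketchPublish event);
    void onTuplesAdded(SketchTuplesAdded event);
}

interface SketchEvent {
    // Each event picks the handler callback that matches its own type.
    void accept(SketchHandler handler);
}

class SketchPublish implements SketchEvent {
    final int tupleCount;

    SketchPublish(int tupleCount) {
        this.tupleCount = tupleCount;
    }

    @Override
    public void accept(SketchHandler handler) {
        handler.onPublish(this);
    }
}

class SketchTuplesAdded implements SketchEvent {
    final int tupleCount;

    SketchTuplesAdded(int tupleCount) {
        this.tupleCount = tupleCount;
    }

    @Override
    public void accept(SketchHandler handler) {
        handler.onTuplesAdded(this);
    }
}

// Records which callback was invoked so the dispatch order can be inspected.
class RecordingHandler implements SketchHandler {
    final List<String> calls = new ArrayList<>();

    @Override
    public void onPublish(SketchPublish event) {
        calls.add("onPublish(" + event.tupleCount + ")");
    }

    @Override
    public void onTuplesAdded(SketchTuplesAdded event) {
        calls.add("onTuplesAdded(" + event.tupleCount + ")");
    }
}

class DoubleDispatchSketch {

    // The dispatcher only knows the SketchEvent interface; accept() selects the callback.
    static List<String> dispatchAll(List<SketchEvent> events) {
        RecordingHandler handler = new RecordingHandler();
        for (SketchEvent event : events) {
            event.accept(handler);
        }
        return handler.calls;
    }

    public static void main(String[] args) {
        List<SketchEvent> events = Arrays.asList(new SketchPublish(10), new SketchTuplesAdded(10));
        System.out.println(dispatchAll(events));
    }
}
```

With this arrangement, adding a new event type means adding one callback to the handler interface and one accept implementation, and the dispatching code does not need to change.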

note

Custom health events are serialized when they are saved to the cache, so make sure to override equals and hashCode in your event classes.
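As a minimal sketch of why this matters, the example below sends an event through a serialization round trip and compares the copy with the original. It assumes standard Java serialization, which may differ from the mechanism the cache actually uses. SerializableEventSketch and SerializationRoundTripSketch are hypothetical names and not DLC classes.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.Objects;

// Hypothetical event used only for this sketch; not a DLC class.
class SerializableEventSketch implements Serializable {
    private static final long serialVersionUID = 1L;

    final String taskName;
    final int tupleCount;

    SerializableEventSketch(String taskName, int tupleCount) {
        this.taskName = taskName;
        this.tupleCount = tupleCount;
    }

    @Override
    public boolean equals(Object obj) {
        if (this == obj) return true;
        if (obj == null || getClass() != obj.getClass()) return false;
        SerializableEventSketch other = (SerializableEventSketch) obj;
        return tupleCount == other.tupleCount && Objects.equals(taskName, other.taskName);
    }

    @Override
    public int hashCode() {
        return Objects.hash(taskName, tupleCount);
    }
}

class SerializationRoundTripSketch {

    // Serializes the value to bytes and reads it back, producing a distinct copy.
    static <T extends Serializable> T roundTrip(T value) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(value);
            }
            try (ObjectInputStream in =
                    new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
                @SuppressWarnings("unchecked")
                T copy = (T) in.readObject();
                return copy;
            }
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException("Round trip failed", e);
        }
    }

    public static void main(String[] args) {
        SerializableEventSketch original = new SerializableEventSketch("task-1", 42);
        SerializableEventSketch copy = roundTrip(original);
        System.out.println("same instance: " + (copy == original));
        System.out.println("equal: " + copy.equals(original));
    }
}
```

The deserialized copy is a different object, so without the equals and hashCode overrides it would compare as unequal to the original even though it carries the same data.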