Monitoring and Event Handling

During the data loading/unloading phase, clients can monitor the status of a load and build monitoring dashboards that track events such as start and end times and any exceptions or failures. This section is aimed mainly at developers who want to tap into the event handling mechanism built into the DLC.

The DLC ships with basic events, such as task summary, task completion, and task failure, along with event handlers for the supported data sources. These event handlers write the corresponding events to the DLC cache.

DLC Cache

The data load controller uses an internal cache (Ehcache) to store events. The default configuration is in ehcache.xml (located at data-connectors-core/src/main/resources/ehcache.xml). You can customize the cache configuration to suit the nature and size of your project’s data by including your own ehcache.xml in your project’s classpath.

You may override the heap setting and how long the events and event handlers are kept in the cache.

If you want to handle events differently, for example to write an event to the logs in addition to the cache, you need a custom event handler. The following section explains how to implement a custom health event handler.

Custom health event handler

The first step is to create a custom health event handler interface by extending the IDataLoadSourceHealthEventHandler interface and adding your custom event handler methods.


    /**
     * @author ActiveViam
     */
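    public interface ITuplePublisherDlcHealthEventHandler extends IDataLoadSourceHealthEventHandler {
     
        // Handles a TuplePublisherDlcPublish event.
        // No @Override: this method is declared here, not in the parent interface.
        void onPublish(TuplePublisherDlcPublish publish);
     
        // Handles a TuplePublisherDlcTuplesAdded event.
        void onTuplesAdded(TuplePublisherDlcTuplesAdded tuplesAdded);
     
    }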

Next, implement the newly created ITuplePublisherDlcHealthEventHandler interface. The implementation class must be annotated with QuartetExtendedPluginValue so that it is picked up by ActivePivot’s Registry on startup.

    /**
     * @author ActiveViam
     */
    @QuartetExtendedPluginValue(intf = IDataLoadSourceHealthEventHandler.class, key = TuplePublisherDlcHealthEventHandler.PLUGIN_KEY)
    public class TuplePublisherDlcHealthEventHandler implements ITuplePublisherDlcHealthEventHandler {
     
        private static final long serialVersionUID = -5806171621353218782L;
     
        public static final String PLUGIN_KEY = "TuplePublisher";
     
        @Override
        public String getType() {
            return PLUGIN_KEY;
        }
     
        @Override
        public void onUnknownEvent(IHealthEvent e) {
            // ignore
        }
     
        @Override
        public void onPublish(TuplePublisherDlcPublish publish) {
            CacheUtils.addEventToCache(publish, publish.getEventType());
        }
     
        @Override
        public void onTuplesAdded(TuplePublisherDlcTuplesAdded tuplesAdded) {
            CacheUtils.addEventToCache(tuplesAdded, tuplesAdded.getEventType());
        }
     
    }

Remember to add your event to the cache.

The DLC internally uses a cache to store events for every transaction. Events added to the cache are retrieved and returned as a JSON response for a given transaction ID or taskName.

    CacheUtils.addEventToCache(tuplesAdded, tuplesAdded.getEventType());
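The lookup-by-task behavior described above can be pictured with a minimal, self-contained sketch. It is not the DLC's implementation: CachedEvent and EventCacheSketch are hypothetical stand-ins, a plain in-memory map replaces Ehcache, and JSON rendering is left out. It only illustrates the idea of grouping events under a task name so they can be fetched together later.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;

/** Hypothetical stand-in for a cached event; not part of the DLC API. */
final class CachedEvent {
    final String taskName;
    final String eventType;
    final String message;

    CachedEvent(String taskName, String eventType, String message) {
        this.taskName = taskName;
        this.eventType = eventType;
        this.message = message;
    }
}

/** Hypothetical in-memory store that groups events by task name. */
final class EventCacheSketch {
    private final Map<String, List<CachedEvent>> eventsByTask = new ConcurrentHashMap<>();

    /** Appends the event to the list kept for its task, creating the list on first use. */
    void add(CachedEvent event) {
        eventsByTask
                .computeIfAbsent(event.taskName, key -> new CopyOnWriteArrayList<>())
                .add(event);
    }

    /** Returns a snapshot of the task's events in insertion order; empty if the task is unknown. */
    List<CachedEvent> eventsFor(String taskName) {
        List<CachedEvent> events = eventsByTask.getOrDefault(taskName, Collections.emptyList());
        return new ArrayList<>(events);
    }
}
```

Returning a copy from eventsFor keeps callers from modifying the stored list, which matters if a dashboard polls the store while a load is still adding events.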

Custom health event

The second step is to create a custom health event by extending the ADlcHealthEvent class and implementing your custom event interface. Each event must be associated with an appropriate DlcEventType that indicates the type of event. Because the event type is passed as a String value, you can define your own custom DlcEventType.

Every event is given a taskName through the DLC’s custom implementation of IHealthEventDispatcher: IDlcHealthEventDispatcher. The taskName is contained within the event’s event properties mapping. There is no need to customize the event’s taskName.

    /**
     * @author ActiveViam
     */
    public class TuplePublisherDlcPublish extends ADlcHealthEvent implements ITuplePublisherDlcHealthEvent {
     
        public static final Set<String> TAGS = new HashSet<>(Arrays.asList("publish", "source"));
     
        public static final String TYPE = "PUBLISH_TUPLES";
          
        public final String tuplePublisher;
        public final String messageIdentifier;
        public final int tupleCount;
     
        public TuplePublisherDlcPublish(String tuplePublisher, Object messageIdentifier, int tupleCount) {
            this.tuplePublisher = tuplePublisher;
            this.messageIdentifier = messageIdToString(messageIdentifier);
            this.tupleCount = tupleCount;
        }
     
     
        @Override
        public Set<String> getTags() {
            return TAGS;
        }
     
        @Override
        public Level getLevel() {
            return Level.INFO;
        }
     
        @Override
        public void accept(ITuplePublisherDlcHealthEventHandler handler) {
            handler.onPublish(this);
        }
     
        @Override
        public String getEventType() {
            return TYPE;
        }
     
        @Override
        public Throwable getCause() {
            return null;
        }
     
        public String messageIdToString(Object messageIdentifier) {
            if (messageIdentifier instanceof IFileInfo) {
                return ((IFileInfo<?>)messageIdentifier).getFullName();
            } else {
                return messageIdentifier.toString();
            }
        }
     
        @Override
        public boolean equals(Object obj) {
            if (this == obj) return true;
            if (obj == null || getClass() != obj.getClass()) return false;
            TuplePublisherDlcPublish other = (TuplePublisherDlcPublish) obj;
            // Objects.equals is null-safe, matching Objects.hash in hashCode()
            return tupleCount == other.tupleCount &&
                    Objects.equals(taskName, other.taskName) &&
                    Objects.equals(tuplePublisher, other.tuplePublisher) &&
                    Objects.equals(messageIdentifier, other.messageIdentifier);
        }
     
        @Override
        public int hashCode() {
            return Objects.hash(taskName, tuplePublisher, messageIdentifier, tupleCount);
        }
 
    }
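The accept method in the event class hands the event to the matching handler callback, a pattern commonly called double dispatch. The sketch below reproduces that pattern in isolation. All type names in it (SketchHandler, SketchEvent, and so on) are hypothetical and stand in for the DLC interfaces, and the recording handler takes the place of a handler that would write to the cache or to a log.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical handler interface with one callback per event type. */
interface SketchHandler {
    void onPublish(SketchPublishEvent event);
    void onTuplesAdded(SketchTuplesAddedEvent event);
}

/** Hypothetical event contract: each event decides which callback to invoke. */
interface SketchEvent {
    void accept(SketchHandler handler);
}

final class SketchPublishEvent implements SketchEvent {
    final int tupleCount;

    SketchPublishEvent(int tupleCount) {
        this.tupleCount = tupleCount;
    }

    @Override
    public void accept(SketchHandler handler) {
        handler.onPublish(this); // routes to the publish-specific callback
    }
}

final class SketchTuplesAddedEvent implements SketchEvent {
    final int tupleCount;

    SketchTuplesAddedEvent(int tupleCount) {
        this.tupleCount = tupleCount;
    }

    @Override
    public void accept(SketchHandler handler) {
        handler.onTuplesAdded(this); // routes to the tuples-added callback
    }
}

/** Records which callback ran; a real handler might cache or log the event instead. */
final class RecordingHandler implements SketchHandler {
    final List<String> calls = new ArrayList<>();

    @Override
    public void onPublish(SketchPublishEvent event) {
        calls.add("publish:" + event.tupleCount);
    }

    @Override
    public void onTuplesAdded(SketchTuplesAddedEvent event) {
        calls.add("added:" + event.tupleCount);
    }
}
```

With this shape, code that dispatches events only needs to call accept on each one. Adding a new event type means adding one callback to the handler interface and one event class, without type checks in the dispatching code.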

Custom health events are serialized when they are saved to the cache, so override equals and hashCode in each custom event class.
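One way to see why value-based equality matters for serialized events: an object read back from its serialized form is a different instance, so the default identity-based equals would not match the original. The sketch below shows this with plain Java serialization. RoundTripEvent and RoundTripSketch are hypothetical names, and the sketch does not reflect how the DLC or Ehcache serializes events internally.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.Objects;

/** Hypothetical serializable event with value-based equality; not a DLC class. */
final class RoundTripEvent implements Serializable {
    private static final long serialVersionUID = 1L;

    final String taskName;
    final int tupleCount;

    RoundTripEvent(String taskName, int tupleCount) {
        this.taskName = taskName;
        this.tupleCount = tupleCount;
    }

    @Override
    public boolean equals(Object obj) {
        if (this == obj) return true;
        if (obj == null || getClass() != obj.getClass()) return false;
        RoundTripEvent other = (RoundTripEvent) obj;
        return tupleCount == other.tupleCount && Objects.equals(taskName, other.taskName);
    }

    @Override
    public int hashCode() {
        return Objects.hash(taskName, tupleCount);
    }
}

final class RoundTripSketch {
    /** Serializes the value to bytes and reads it back, yielding a distinct copy. */
    static Object roundTrip(Serializable value) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(value);
            }
            try (ObjectInputStream in =
                    new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
                return in.readObject();
            }
        } catch (IOException | ClassNotFoundException e) {
            // Wrapped so callers of this sketch need no checked-exception handling.
            throw new IllegalStateException("Serialization round trip failed", e);
        }
    }
}
```

Calling RoundTripSketch.roundTrip on a RoundTripEvent returns a new instance that still compares equal to the original and has the same hash code, which is what lookups and comparisons against stored copies depend on.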
