Monitoring and Event Handling
As part of the data loading/unloading phase, clients can monitor the status of the load, create monitoring dashboards, which will enable them to track events like the start/end times and exceptions/failures if any. This section is mainly aimed at developers looking to tap into the event handling mechanism that has been built into the DLC.
Basic events, like task summary, task completion, task failure, and event handlers for the supported data sources have been provided as part of the DLC. These event handlers write the corresponding events to the DLC cache.
DLC Cache
The data load controller uses an internal cache (Ehcache) to store events. The default configuration can be
found in the ehcache.xml
(located at data-connectors-core/src/main/resources/ehcache.xml
).
The cache configuration can be customized based on the nature
and size of the project’s data. This can be done by including your own ehcache.xml
in your project’s classpath.
You may override the heap setting and how long the events and event handlers will be kept in cache.
If you wish to implement a custom event handler, maybe to write an event to logs (in addition to cache), you would need a custom event handler. The following section explains how to implement a custom health event handler.
Custom health event handler
The first step is to create a custom health event handler by extending the IDataLoadSourceHealthEventHandler
interface and add our custom event handler methods.
/**
* @author ActiveViam
*/
public interface ITuplePublisherDlcHealthEventHandler implements IDataLoadSourceHealthEventHandler {
@Override
public void onPublish(TuplePublisherDlcPublish publish);
@Override
public void onTuplesAdded(TuplePublisherDlcTuplesAdded tuplesAdded);
}
Next we will implement our newly created interface ITuplePublisherDlcHealthEventHandler
. This implementation class needs to be annotated with the QuartetExtendedPluginValue
,
so that this can be picked up in ActivePivot’s Registry
on startup.
/**
* @author ActiveViam
*/
@QuartetExtendedPluginValue(intf = IDataLoadSourceHealthEventHandler.class, key = TuplePublisherDlcHealthEventHandler.PLUGIN_KEY)
public class TuplePublisherDlcHealthEventHandler implements ITuplePublisherDlcHealthEventHandler {
private static final long serialVersionUID = -5806171621353218782L;
public static final String PLUGIN_KEY = "TuplePublisher";
@Override
public String getType() {
return PLUGIN_KEY;
}
@Override
public void onUnknownEvent(IHealthEvent e) {
// ignore
}
@Override
public void onPublish(TuplePublisherDlcPublish publish) {
CacheUtils.addEventToCache(publish, publish.getEventType());
}
@Override
public void onTuplesAdded(TuplePublisherDlcTuplesAdded tuplesAdded) {
CacheUtils.addEventToCache(tuplesAdded, tuplesAdded.getEventType());
}
}
Keep in mind to add your event to the cache.
DLC internally uses a cache to store events for every transaction. The events added to the cache would be retrieved and output as JSON response for a given Transaction ID or taskName.
CacheUtils.addEventToCache(tuplesAdded, tuplesAdded.getEventType());
Custom health event
The second step is to create a custom health event by extending the ADlcHealthEvent
class
and implementing your custom interface. An appropriate DlcEventType
needs to be associated
with an event, to indicate the type of event. You can define your custom DlcEventType
as the argument takes String
value.
Every event is given a taskName
through the DLC’s custom implementation of IHealthEventDispatcher
: IDlcHealthEventDispatcher
. The taskName
is contained within the event’s event properties mapping. There is no need to customize the event’s taskName
.
/**
* @author ActiveViam
*/
public class TuplePublisherDlcPublish extends ADlcHealthEvent implements ITuplePublisherDlcHealthEvent {
public static final Set<String> TAGS = new HashSet<>(Arrays.asList("publish", "source"));
public static final String TYPE = "PUBLISH_TUPLES";
public final String tuplePublisher;
public final String messageIdentifier;
public final int tupleCount;
public TuplePublisherDlcPublish(String tuplePublisher, Object messageIdentifier, int tupleCount) {
this.tuplePublisher = tuplePublisher;
this.messageIdentifier = messageIdToString(messageIdentifier);
this.tupleCount = tupleCount;
}
@Override
public Set<String> getTags() {
return TAGS;
}
@Override
public Level getLevel() {
return Level.INFO;
}
@Override
public void accept(ITuplePublisherDlcHealthEventHandler handler) {
handler.onPublish(this);
}
@Override
public String getEventType() {
return TYPE;
}
@Override
public Throwable getCause() {
return null;
}
public String messageIdToString(Object messageIdentifier) {
if (messageIdentifier instanceof IFileInfo) {
return ((IFileInfo<?>)messageIdentifier).getFullName();
} else {
return messageIdentifier.toString();
}
}
@Override
public boolean equals(Object obj) {
if (this == obj) return true;
if (obj == null || getClass() != obj.getClass()) return false;
TuplePublisherDlcPublish other = (TuplePublisherDlcPublish) obj;
return tupleCount == other.tupleCount &&
taskName.equals(other.taskName) &&
tuplePublisher.equals(other.tuplePublisher) &&
messageIdentifier.equals(other.messageIdentifier);
}
@Override
public int hashCode() {
return Objects.hash(taskName, tuplePublisher, messageIdentifier, tupleCount);
}
}
The custom health events are serialized in order to save to the cache. Therefore,
please override their equals
and hashCode
implementation.