Custom Monitoring / Event Handling
CSV Message Handlers
A CSV Channel must use a DlcMessageHandler
so the DLC can be notified in the event an error was throw during tuple publishing. A CSVMessageChannel
by default uses a LogMessageHandler
which will simply log the error and continue. Because of this the error will not be propagated upwards to the DLC.
This can be done by using a DlcCSVMessageChannelFactory
to create the channels without having to worry about adding the DlcMessageHandler
. IMessageChannel
’s created by the DlcCSVMessageChannelFactory
will only accept IMessageHandler
’s of type DlcMessageHandler
- this can help to ensure that the DLC will catch and report errors during CSV tuple publishing.
If you do however want to implement your own IMessageHandler
s you can do so by extending DlcMessageHandler
and ensure that you call dispatchErrorMessage(Throwable)
in your onError
method. Below is an example of using a custom IMessageHandler
:
// A simple message handler to write all errors to a file
public class CustomLogToFileMessageHandler<I> extends DlcMessageHandler<I> {
@Override
public void onMessageStarted(I key){
// print message to file
FileUtil.write("Message Started");
}
@Override
public void onMessagePublished(I key, IPublicationInfo info){
// print message to file
FileUtil.write("Published Message: " + info.toString());
}
/**
* Ensure we call DlcMessageHandler's dispatchErrorMessage() so DLC is aware of any errors.
*/
void onError(I key, Throwable ex){
dispatchErrorMessage(ex);
FileUtil.write("Error encountered: " + ex);
}
}
This CustomLogToFileMessageHandler
can be added to the IMessageChannel
as seen below:
CSVMessageChannelFactory channelFactory = new CSVMessageChannelFactory(...);
final IStoreMessageChannel<IFileInfo<I>, ILineReader> channel = channelFactory.createChannel(...);
channel.withMessageHandler(new CustomLogToFileMessageHandler());
Custom Health Event Handler
The first step is to create a custom health event handler by extending the IDataLoadSourceHealthEventHandler
interface and add our custom event handler methods.
/**
* @author ActiveViam
*/
public interface ITuplePublisherDlcHealthEventHandler implements IDataLoadSourceHealthEventHandler {
@Override
public void onPublish(TuplePublisherDlcPublish publish);
@Override
public void onTuplesAdded(TuplePublisherDlcTuplesAdded tuplesAdded);
}
Next we will implement our newly created interface ITuplePublisherDlcHealthEventHandler
. This implementation class needs to be annotated with the QuartetExtendedPluginValue
,
so that this can be picked up in Atoti Server’s Registry
on startup.
/**
* @author ActiveViam
*/
@QuartetExtendedPluginValue(intf = IDataLoadSourceHealthEventHandler.class, key = TuplePublisherDlcHealthEventHandler.PLUGIN_KEY)
public class TuplePublisherDlcHealthEventHandler implements ITuplePublisherDlcHealthEventHandler {
private static final long serialVersionUID = -5806171621353218782L;
public static final String PLUGIN_KEY = "TuplePublisher";
@Override
public String getType() {
return PLUGIN_KEY;
}
@Override
public void onUnknownEvent(IHealthEvent e) {
// ignore
}
@Override
public void onPublish(TuplePublisherDlcPublish publish) {
CacheUtils.addEventToCache(publish, publish.getEventType());
}
@Override
public void onTuplesAdded(TuplePublisherDlcTuplesAdded tuplesAdded) {
CacheUtils.addEventToCache(tuplesAdded, tuplesAdded.getEventType());
}
}
note
Keep in mind to add your event to the cache.
DLC internally uses a cache to store events for every request. The events added to the cache would be retrieved and output as JSON response for a given Trace ID or taskName.
CacheUtils.addEventToCache(tuplesAdded, tuplesAdded.getEventType());
Custom health event
The second step is to create a custom health event by extending the ADlcHealthEvent
class
and implementing your custom interface. An appropriate DlcEventType
needs to be associated
with an event, to indicate the type of event. You can define your custom DlcEventType
as the argument takes String
value.
Every event is given a taskName
through the DLC’s custom implementation of IHealthEventDispatcher
: IDlcHealthEventDispatcher
. The taskName
is contained within the event’s event properties mapping. There is no need to customize the event’s taskName
.
/**
* @author ActiveViam
*/
public class TuplePublisherDlcPublish extends ADlcHealthEvent implements ITuplePublisherDlcHealthEvent {
public static final Set<String> TAGS = new HashSet<>(Arrays.asList("publish", "source"));
public static final String TYPE = "PUBLISH_TUPLES";
public final String tuplePublisher;
public final String messageIdentifier;
public final int tupleCount;
public TuplePublisherDlcPublish(String tuplePublisher, Object messageIdentifier, int tupleCount) {
this.tuplePublisher = tuplePublisher;
this.messageIdentifier = messageIdToString(messageIdentifier);
this.tupleCount = tupleCount;
}
@Override
public Set<String> getTags() {
return TAGS;
}
@Override
public Level getLevel() {
return Level.INFO;
}
@Override
public void accept(ITuplePublisherDlcHealthEventHandler handler) {
handler.onPublish(this);
}
@Override
public String getEventType() {
return TYPE;
}
@Override
public Throwable getCause() {
return null;
}
public String messageIdToString(Object messageIdentifier) {
if (messageIdentifier instanceof IFileInfo) {
return ((IFileInfo<?>)messageIdentifier).getFullName();
} else {
return messageIdentifier.toString();
}
}
@Override
public boolean equals(Object obj) {
if (this == obj) return true;
if (obj == null || getClass() != obj.getClass()) return false;
TuplePublisherDlcPublish other = (TuplePublisherDlcPublish) obj;
return tupleCount == other.tupleCount &&
taskName.equals(other.taskName) &&
tuplePublisher.equals(other.tuplePublisher) &&
messageIdentifier.equals(other.messageIdentifier);
}
@Override
public int hashCode() {
return Objects.hash(taskName, tuplePublisher, messageIdentifier, tupleCount);
}
}
note
The custom health events are serialized in order to save to the cache. Therefore,
please override their equals
and hashCode
implementation.