Streaming Overview
ActivePivot offers a Streaming API to enable clients to submit queries only once and then receive continuous updates.
ActiveUI communicates with an ActivePivot Server using the Websocket API built on the Streaming API, but the API is open and can be leveraged by all sorts of custom client, for instance a remote C# or Java client, or an alert agent running on another server.
The Streaming API complements the other standard APIs supported by ActivePivot.
Typically, although the MDX query language over the XMLA protocol allows a client to send adhoc queries to
ActivePivot and obtain a resulting cell set, the XMLA protocol does not support continuous queries or real-time cell updates.
API Design
Two logical connections are required to establish streaming between a client and an ActivePivot server:
- Streaming control connection: a server-side service that makes it possible to create, pause and stop streams.
- Streaming data connection: the connection over which the server sends continuous events to the client for the streams it is interested in.
Stream Control
To create a stream, the client sends a query to the server (over the streaming control connection). Out of the box, ActivePivot supports continuous MDX queries and continuous Drillthrough queries.
To create a stream, the client also provides some stream properties:
- Target stream ID
- Publication domain (stream events resulting from the submitted query will be published within this domain)
- Stream initial state (STARTED, PAUSED or INITIALIZED)
- Stream "push data" flag. When set to false the stream will not push data events. It will, however, push notifications that query results have changed so that basic clients can decide when to refresh themselves by resubmitting the query (this is useful when, for example, the updates are big enough to cause network contention, and/or the update rate is so quick that a person would not have time to read the update anyway before a new update appears).
- The "strict mode" flag. Specifies whether the associated query should error out if it cannot be resolved to locations in the cube (strict mode = true) or should still be registered and wait for the locations to be created later (strict mode = false). The default mode is "true", and can be modified by setStrictMode() on the implementation class.
It is the responsibility of the client to provide the ID of the stream when it is created. This is because it is possible that stream events start flowing immediately after the subscription of the query, published asynchronously over the data channel. If the client were to receive a stream ID after the stream is created, it is very possible that it would receive stream events for a stream ID it does not yet know about. To ensure stream IDs are unique in the context of multiple clients, the ActivePivot server exposes an ID Generator service.
The service interface of the streaming control is com.quartetfs.tech.streaming.IStreamingService
.
Apart from creating a stream, you can pause, resume, or stop an existing stream.
You can also update the underlying stream query in real-time, without pausing the stream.
To resynchronize a client over an existing stream, you can ask for the "current view" of the stream.
Out of the box, an ActivePivot server supports the following options:
- Web Service
- Spring HTTP Invoker
- All the streaming functionalities are provided by the Websocket API (which is used by ActiveUI)
The choices are not exclusive; you can expose the streaming service as a Web Service for .NET interoperability, as a Spring HTTP Invoker for high performance communications from a Java client, and use the WebSocket through ActiveUI.
Data Stream
The technology used to transport the data stream is more challenging because the communication goes from the server to the client. Yet, the design allows the data stream to be independently configured from stream control. Options include:
- Stream Listener pattern (for Java code running within the ActivePivot JVM only)
- Messaging technology (JMS, MQSeries, Rendez-Vous...)
- Long Polling Web Service
- Websockets allow for communication going both ways: the same websocket is used for stream control and data stream
Aside from the Websocket API, the Long Polling Web Service is implemented in ActivePivot and provides the interoperable streaming technology for .NET clients.
Stream Events
Whatever the submitted query, an ActivePivot client will receive continuous real-time updates wrapped in stream events. There are multiple stream event implementations depending on what the event means and what data it contains. But all stream events contain their parent stream Id, and a timestamp that uniquely identifies them.
Examples of stream events:
- MDX Query
- ICellEvent: indicates a collection of cells that changed
- ICellSetEvent: notifies that the entire cell set has changed (including the axis)
- Drillthrough Query
- IDrillthroughEvent: wraps the result of a drillthrough
- IDrillthroughUpdateEvent: indicates the changes within an existing drillthrough result set
- IFailureEvent: reports an error in the stream processing
Stream Listener
If the streaming client runs within the same JVM, creating the data stream is as easy as registering a stream listener object to the ActivePivot Registration Service.
/**
* Adds a listener for a given domain.
*
* @param domain the domain to add the listener to
* @param streamListener the listener to add to the domain
* @throws RegistrationServiceException if adding a listener failed.
*/
public void addListener(String domain, IStreamListener streamListener) throws RegistrationServiceException;
One stream listener can register multiple times for multiple domains. The callback method that must be implemented by stream listeners passes a collection of stream events and their publication domain so the listener knows what the domain is.
Long Polling
Of course, most of the time, the streaming client is remote and cannot register itself as a Java object. ActivePivot comes with a long polling service that fills the gap.
Principle of long polling:
- The client submits a listen request to the server (with a unique listener Id)
- The server does not respond to the client immediately, but waits until it has an update to send
- If there is no update during the timeout period, the server eventually responds with an empty response
- If there is an update during the timeout period, the server sends it
- In any case the client immediately resubmits a listen request to the server. This emulates a continuous stream of data from the server to the client, although all operations are initiated by the client.
The long polling service interface is com.quartetfs.tech.streaming.ILongPollingService
.
Long polling introduces the new concept of "listener Id".
As with stream Ids, the client is responsible for providing the unique listener Id.
One listener can register its Id for multiple domains.
The received events are multiplexed within IBulkedStreamEvents
instances.
A bulked stream event contains a collection of domain events.
And a domain event is tagged with a domain and contains a collection of stream events.
This design makes it possible to multiplex events from various streams and domains on one physical connection, reducing resource consumption. For instance, a rich client with multiple continuous queries open at the same time can efficiently communicate with an ActivePivot server over a single physical connection.
Like the streaming service, the long polling service can be exported with the following technologies:
- Web Service
- Spring HTTP Invoker
A rich and real-time .NET client can be developed with the long polling technology. What is left on the client side is to write efficient code to register listener(s) with the long polling service, and demultiplex the received events.
Note that our dedicated user interface, ActiveUI, uses the Websocket API.
Long Polling and Stream Garbage Collection
It is very common for long polling clients to abandon their activity without properly unregistering themselves as a listener. To avoid leaking resources, the long polling service has a special timeout. When the server returns an event to the client (as the result of calling listen) it expects the client to immediately call listen again as this is the principle of a long polling connection. If the client does not call listen again before the timeout expires, the listener is considered to have left. Then once there are no more listeners attached to a stream, the stream itself can be garbage collected by the streaming service.
There are two common issues associated with the long polling timeout:
- Custom long polling clients are sometimes written without a proper understanding of how long polling works, and perform some synchronous work when they receive events following a call to listen. They wait for the workload to be processed before calling listen again. When the client-side workload is significant, this can trigger the timeout.
- Server-side ActivePivot solutions that struggle with memory resources and undergo large stop the world garbage collection events. If the JVM hangs for a long time, this can trigger the long polling timeout (and, of course, all other sorts of timeouts, such as query timeouts).
By default in ActivePivot 5.x.x the long polling client timeout is set to 10 seconds.
You can change this by setting the timeoutMs
property on the long polling service in your project.
Streams, Domains and Listeners
A quick reference to clarify the interaction of streams, domains and listeners:
- A stream is identified by a stream Id and is associated with one query
- A stream belongs to a single domain, the events generated by the stream are published in that domain
- A domain can contain multiple streams. All the events generated by all the streams are published in the parent domain
- A listener is identified by a listener Id and listens to the events published by one or several domains
- One domain can be listened to by multiple listeners
Consuming Streaming API as a Web Service
The Streaming API is used to create streams, while the LongPolling service is used to get the result of a query, and any subsequent updates. The long polling mechanism emulates the behavior of a push service.
The services are exposed mainly via two different protocols:
Protocol | Serialization | Data | Comment |
---|---|---|---|
Spring's HTTP invoker (HTTP Remoting) | Java | binary | Only use with Java clients |
Apache CXF's JAX-WS | JAXB | xml | SOAP Web Service to be interoperable with any technology |
To consume such services, you can draw your inspiration from the sandbox project: see the WebServiceClient
class.
Understanding Timeouts
During a connection between a client and a server, there are three important timeouts that you might want to consider:
- client ReceiveTimeout: defined by the HTTP layer from the client side and specifies the amount of time that the client waits for a response before it times out.
- server MaxWait: a parameter from the Streaming Service, it defines the amount of time the server waits for incoming data before returning a "null" object to the client.
- server Timeout: a parameter from the Streaming Service, it defines the amount of time the server waits between two subsequent listens. After that, the server disposes of the underlying client.
The client's timeout can be configured in the HTTP layer. Here is a code example:
HTTPClientPolicy policy = new HTTPClientPolicy();
policy.setConnectionTimeout(5000);
policy.setReceiveTimeout(30000);
The server's timeouts can be configured in the StreamingServices.xml file:
<!-- LongPolling Service -->
<bean id="longPollingService" class="com.quartetfs.tech.streaming.impl.LongPollingService" destroy-method="stop">
<property name="streamPublisher" ref="streamPublisher" />
<property name="maxWaitMs" value="10000" />
<property name="timeoutMs" value="10000" />
<property name="timeoutPoolSize" value="2" />
</bean>
The schema below illustrates where the timeouts occur in the streaming process:
It is important for the client to delegate the data processing to another thread, in order to release the listening thread, and let it listen to the server before the server "timeout" period expires.