Distributed Architecture Communication Flows
Distributed Messenger
The distributed messenger is the component used to communicate between all the data and query nodes within a cluster. It allows for the discovery of different members of the cluster, so that the different nodes can talk to each other. It also keeps an up-to-date list of cluster members, to prevent communication with disconnected members.
All nodes within the same cluster must use the same messenger type. ActivePivot relies on the following two types of messengers:
LocalMessenger
The LocalMessenger
is an extremely efficient, pointer-passing message framework that avoids serialization
and other expensive operations when all the members of the cluster reside in the same JVM.
NettyMessenger
Netty is a lightweight asynchronous event-driven network application framework for high performance protocol servers & clients.
The NettyMessenger
relies on this framework to achieve point-to-point communication between cluster members over TCP.
It is the default messenger in ActivePivot.
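The messenger is selected per cube through its cluster definition. Below is a minimal sketch for a data cube, reusing the builder calls from the complete example in the "Awaiting transaction processing" section of this page; CLUSTER_ID, APPLICATION_ID and cubeBuilder are placeholders assumed to be defined elsewhere.

// Sketch only: selects the Netty messenger for a data cube.
// All nodes of the same cluster must use the same messenger key.
final IActivePivotInstanceDescription dataCubeDescription =
    cubeBuilder
        .asDataCube()
        .withClusterDefinition()
        .withClusterId(CLUSTER_ID)
        .withMessengerDefinition()
        .withKey(NettyMessenger.PLUGIN_KEY) // default messenger, point-to-point over TCP
        .end()
        .end()
        .withApplicationId(APPLICATION_ID)
        .withAllHierarchies()
        .withAllMeasures()
        .end()
        .build();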
The Netty messenger uses JGroups for member discovery and group membership.
JGroups
JGroups is a reliable multicast system that adds a "grouping" layer over a transport protocol. It provides the underlying group communication support for ActivePivot Distribution, allowing members of a cluster to seamlessly discover each other using UDP multicast, or using TCP with additional setup.
A distributed node requiring group communication with remote node(s) will obtain a JGroups Channel to use for its communication with cluster members. The characteristics of a JGroups Channel are described by the set of protocols that compose it, where each protocol handles a single aspect of the overall group communication task (Membership, Transport, Failure, Compression, etc.). You can learn more about protocol configuration on the JGroups website: JGroups Advanced Concepts, JGroups Protocols List and JGroups Services.
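For illustration only, this is roughly what obtaining and connecting a channel looks like with the plain JGroups API. ActivePivot creates and manages its own channel internally; the configuration file name and cluster name below are placeholders.

import org.jgroups.JChannel;

// Illustration of the plain JGroups API; not ActivePivot code.
// "jgroups-udp.xml" and "MyCluster" are placeholder names.
public final class JGroupsChannelSketch {
    public static void main(String[] args) throws Exception {
        // The XML configuration describes the protocol stack:
        // transport, discovery, membership, failure detection, etc.
        JChannel channel = new JChannel("jgroups-udp.xml");
        channel.connect("MyCluster"); // join (or create) the group for this cluster name
        // ... communicate with the other members ...
        channel.close();
    }
}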
The JGroups configuration section can be found here: JGroups Configuration.
In the following sections, references to message exchanges or message properties assume the Netty messenger.
Message Types
There are two main categories of messages flowing in a distributed cluster: request messages and response messages.
Request Messages
Request messages are messages initiating a two-way communication between data nodes and query nodes. They can be broadcast by either type of node depending on the purpose of the message. In that sense, request messages can be classified as follows:
Messages sent by query nodes to data nodes
Type | Description |
---|---|
RetrieveAggregatesMessage | Messages sent by the query node to forward GetAggregatesQuery queries to data nodes. |
DrillThroughHeadersMessage, InternalDrillthroughHeadersMessage and DrillthroughMessageWithHeaders | Messages sent by the query node to forward DrillthroughQuery queries to data nodes. A DrillthroughQuery consists of several sub-queries, resulting in several exchanges of messages between the query node and the underlying data nodes. |
NotificationMessage | Messages sent to data nodes as a response to transaction / discovery messages in order to acknowledge the fact that a transaction / discovery request was successfully processed by the query node. |
Messages sent by data cubes to query cubes
Type | Description |
---|---|
HelloMessage | Simple "hello messages" sent from data nodes to a query node within the cluster to make sure they can communicate with each other. |
GoodbyeMessage | Simple "goodbye message" sent from a data node to a query node within the cluster to notify the latter that the former is leaving the cluster. |
IInitialDiscoveryMessage | Messages sent by data nodes to each query node in the cluster. They enclose a serialized version of their entire hierarchies: the measure hierarchy with aggregate measures and post-processed measures, plus all other member hierarchies. Once received and de-serialized by the query cube, these hierarchies are merged into the query cube's own hierarchies, creating new ones if necessary. |
ITransactionCommittedMessage | Messages sent to notify the query node that a transaction happened on data nodes, resulting in member updates. The query node updates its hierarchies to reflect the members that have been added or removed as a result of the transaction. |
Response Messages
Response messages are messages sent as responses to request messages. They are in the form of BroadcastResult messages that can effectively wrap different types of response. These wrapped answers can be classified as follows:
Answers sent by query cubes to data cubes
Type | Description |
---|---|
HelloMessageAnswer | Answers sent by a query node to a data node as a response to HelloMessage messages. |
Null | BroadcastResult response messages sent by a query node to a data node as a response to IInitialDiscoveryMessage, ITransactionCommittedMessage and GoodbyeMessage messages. They have Null answers. |
Answers sent by data cubes to query cubes
Type | Description |
---|---|
ScopedAggregatesRetrievalResultTransporter | Answers sent by data nodes to a query node as a response to GetAggregatesQuery queries. They represent the result of a single GAQ (GetAggregatesQuery) requested through a RetrieveAggregatesMessage request. This type of message holds the list of locations and the retrieval results for the corresponding GAQ. |
DrillthroughHeader | Answers sent by data nodes to a query node as a response to a DrillthroughHeadersQuery query (DrillThroughHeadersMessage and InternalDrillthroughHeadersMessage requests). |
DrillthroughMessageWithHeadersAnswers | Answers sent by data nodes to a query node as a response to a DrillthroughQuery query (DrillThroughMessageWithHeaders request). |
Communication Flows and Associated Diagrams
Discovery Workflow
This section details how several cubes recognize that they should be working as a single application.
When starting a distributed ActivePivot, each cube starts communicating with all the other future members of the distribution through its JGroups layer. For each clusterId, the first node able to communicate (the node that cannot find anyone to interact with) becomes the JGroups coordinator.
The distributed ActivePivot can be seen as a complete graph, whose vertices are the distributed cubes, and the edges are open communication channels between two nodes.
A cube's messenger won't communicate with remote instances whose clusterIds (stated in each distributed cube's description) are different from its own.
JGroups also handles cluster security, as explained in Distributed Security.
The JGroups cluster coordinator maintains a View
object, including an incremental ID
and a list of all cluster members.
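The View can be observed directly through the JGroups API. The sketch below is for illustration only, since ActivePivot manages its channel internally; it assumes JGroups 4+, where the Receiver interface provides default methods.

import org.jgroups.JChannel;
import org.jgroups.Receiver;
import org.jgroups.View;

// Illustration only: how a View exposes its incremental id and member list.
public final class ViewLoggerSketch {
    public static void logViewChanges(JChannel channel) {
        channel.setReceiver(new Receiver() {
            @Override
            public void viewAccepted(View view) {
                // The view id is incremented each time the coordinator installs a new view;
                // getMembers() lists the addresses of all current cluster members.
                System.out.println("View " + view.getViewId() + ": " + view.getMembers());
            }
        });
    }
}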
Restricting communication to nodes that share the same clusterId is equivalent to cutting edges of the initial graph. By the end of this phase, the initial graph becomes a disconnected graph, whose connected components each form a complete graph and represent a single cluster.
After the JGroups layer has initialized the communications between the nodes of a cluster, the data
nodes send a HelloMessage
to each query node of the cluster, making sure that communication is
possible with the query node.
Each cluster can be seen as a complete bipartite graph, one group being the data nodes, the other being the query nodes.
When a query cube starts, it is composed of only one hierarchy: the measure hierarchy made with contributors.COUNT and potentially other measures, like post-processed measures or calculated formulas.
When a data cube starts, it sends an InitialDiscoveryMessage
to each query cube already present in
the cluster.
The message contains a serialized version of the data cube's hierarchies: the measure hierarchy with
the aggregate measures and post-processed measures, as well as all the other hierarchies of the data
cube, with their members. Once received and deserialized by the query cube, the data cube's
hierarchies are merged into the query cube's own hierarchies, creating new ones if necessary.
Thus, the query cube topology represents the merger of all its data cubes' topologies.
Since a query cube will potentially receive InitialDiscoveryMessages from dozens of data cubes, processing these messages can take a long time, especially if the data cubes' hierarchies contain many members.
You can limit the number of InitialDiscoveryMessages accepted at any given time by a query node using the property -Dactiveviam.distribution.maxPendingDiscoveries=[integer].
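If you prefer not to pass the -D flag on the command line, the same system property can presumably be set programmatically, provided this runs before the query node (and its messenger) is created; this equivalence is an assumption, and the value 4 is arbitrary.

// Hypothetical launcher: sets the property before the query node application starts.
public final class QueryNodeLauncher {
    public static void main(String[] args) {
        // Assumed equivalent to -Dactiveviam.distribution.maxPendingDiscoveries=4
        System.setProperty("activeviam.distribution.maxPendingDiscoveries", "4");
        // ... start the query node (Spring context, ActivePivot manager, etc.) here ...
    }
}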
Sequence Diagram
If for some reason a data cube is removed from the cluster, the associated query cubes don't remove the associated
hierarchy contributions immediately.
They will do so after a given period of time that can be set using the removalDelay
property. If
the data cube comes back online before this timer expires, and if no new transaction occurred in the
data cube since it left the cluster, no initial discovery message needs to be sent.
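A plausible sketch of setting this delay follows, assuming it is exposed through the IDistributedActivePivotInstanceDescription.REMOVAL_DELAY property mentioned later on this page and accepted by the query cube's messenger definition; the asQueryCube() call, the exact builder location, the unit and the 60-second value are all assumptions, not confirmed API.

// Sketch only: asQueryCube() and the location of REMOVAL_DELAY are assumptions.
// queryCubeBuilder and CLUSTER_ID are placeholders defined elsewhere.
final IActivePivotInstanceDescription queryCubeDescription =
    queryCubeBuilder
        .asQueryCube()
        .withClusterDefinition()
        .withClusterId(CLUSTER_ID)
        .withMessengerDefinition()
        .withKey(NettyMessenger.PLUGIN_KEY)
        .withProperty(
            IDistributedActivePivotInstanceDescription.REMOVAL_DELAY,
            String.valueOf(60)) // assumed to be in seconds; arbitrary value
        .end()
        .end()
        .end()
        .build();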
Transaction Workflow
When a transaction occurs in a data cube, some members may appear or disappear. This information needs to be forwarded to each query cube in the cluster. This is done by sending a message containing the new or removed members to each query cube in the cluster. Once received, query cubes update the content of their own hierarchies accordingly.
Sequence Diagram
For the sake of simplicity, only one data cube is shown in the diagram.
In a data cube, transactions do not always result in added or removed members. By default, query cubes are still notified when an empty transaction occurs. This can be deactivated by setting discardEmptyTransactions to true.
Query Workflow
When an MDX query is started on a query cube, the MDX engine plans the GetAggregatesQueries
(GAQ)
needed to compute the result and executes them, as it is done on a regular ActivePivot.
A GAQ executed on a query cube is dispatched to the appropriate data cubes. This computation step is done by the query cube's query plan, in the form of a RetrieveAggregatesMessage.
The way a GAQ is dispatched highly depends on its attributes: the set of locations, the measures and the filters. Not all data cubes may be contacted by the query cube.
Once received by a data cube, the GAQ is computed and the result is sent back to the query cube. The query cube then merges and combines all intermediate results before handing them to the MDX engine, which builds the final result.
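As a purely conceptual illustration of this merge step (this is not ActivePivot's implementation, and the location strings are simplified), partial results returned by two data cubes can be combined by summing an additive aggregate such as contributors.COUNT:

import java.util.HashMap;
import java.util.Map;

// Conceptual illustration only: merging partial COUNT aggregates per location,
// the way a query node combines results coming from several data cubes.
public final class MergeSketch {

    public static Map<String, Long> merge(Map<String, Long> fromCubeA, Map<String, Long> fromCubeB) {
        final Map<String, Long> merged = new HashMap<>(fromCubeA);
        // COUNT is additive, so overlapping locations simply sum their contributions.
        fromCubeB.forEach((location, count) -> merged.merge(location, count, Long::sum));
        return merged;
    }

    public static void main(String[] args) {
        final Map<String, Long> cubeA = Map.of("Desk=EQ", 120L);
        final Map<String, Long> cubeB = Map.of("Desk=EQ", 80L, "Desk=FX", 40L);
        // Prints the combined counts: Desk=EQ -> 200, Desk=FX -> 40
        System.out.println(merge(cubeA, cubeB));
    }
}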
In order to reduce the number of messages exchanged between the data cube and the query cube, a single RetrieveAggregatesMessage requests all the intermediate results that a given data cube must compute.
Sequence Diagram
Cube Removal Workflow
To leave its cluster, a node must send a leave message to the JGroups coordinator.
For details, see the JGroups documentation on this subject.
If the coordinator tries to leave, a new one is seamlessly selected from the cluster members.
When a member is suspected to have crashed, the coordinator broadcasts a suspicion message to all other members of the cluster. If the coordinator crashes, a new one is seamlessly selected from the cluster members.
A member leaving or crashing induces a change in the View
object maintained by the coordinator.
When a data cube leaves the cluster, the query cube is informed by its messenger that the View
object has changed. This Data Node's unique contributions to the global topology are removed from
the query cube's hierarchies if they were not shared with another data cube.
An entire hierarchy can disappear if all of its members are removed.
Sometimes network issues or calculation saturation can cause a data node to be unresponsive for a short time, triggering a suspicion message from the JGroups coordinator. Since removing that data node's contributions is an expensive operation, the query cube waits for a certain time before dropping these contributions, giving the data node some time to rejoin the cluster.
When the data cube rejoins in time, it sends an EmptyInitialDiscoveryMessage to notify the query cube that its topology has not changed.
You can specify the waiting time using the
removalDelay
property.
In order to distinguish unresponsive data nodes (for instance due to high workload or network issues) from data nodes that left the cluster
gracefully, each data node, when leaving the cluster, will send a GoodbyeMessage
notifying the query node about its intent to leave the cluster.
This immediately triggers an asynchronous task responsible for removing all of the leaving data node's contributions. On success, a GoodbyeMessageApplicationEvent
is generated. Otherwise,
on failure of the removal task, a GoodbyeMessageApplicationFailureEvent
is generated.
When a query cube leaves the cluster, the data cube remains unchanged.
If a query is running when a cube leaves the cluster:
- if the cube has sent a normal leave message, the data from that cube is simply ignored
- if the cube is non-responsive, the query cube waits for results until the query fails due to timeout, thus avoiding "surprise" partial results due to network flakiness.
Awaiting transaction processing
By default, when transactions are performed on a datastore that is connected to a data cube, the method committing the transaction sends a message to the query nodes asynchronously and does not wait for them to actually process the transaction delta. This behavior is illustrated in the following diagram:
As described above, the method used to commit a datastore transaction may return before the critical section is entered. Thus, the members removed during the transaction will still reside in the query node's hierarchies, and subsequent transactions adding these members on another data node may result in the following exception:
Caused by: java.lang.IllegalArgumentException: The member 2023-10-23 in cube linuxuser_VarCube-master_DATA_321188_15764 was also found in cube linuxuser_VarCube-master_DATA_285033_12974. There could be cluster configuration issues or the cube's data may not have been discarded yet if it left the cluster recently (see IDistributedActivePivotInstanceDescription.REMOVAL_DELAY).
In order to ensure that all the nodes in the cluster have correctly processed the transaction, one may use a property called IDataClusterDefinition#AWAIT_NOTIFICATIONS_TIME.
The default value for this property is 0, and any positive value behaves as a timeout.
The data cube waits up to waitNotificationSeconds for all query cubes to confirm that the remote transaction was successfully committed.
If this timeout is reached, no exception is thrown by the committing method itself.
The only side effect would be a message written to a data node log:
INFO c.q.messenger.impl.DataInstanceDistributionManager - [linuxuser_VarCube-master_DATA_285033_12974] Not all the members have answered after `waitNotificationSeconds` seconds: Should have [linuxuser_RiskCube_QUERY_908738_7133] instead of []
This may be illustrated using the following diagram:
The committing method only returns after awaiting a NotificationMessage for some time, and the only consequence of the timeout is a message written to the data cube log.
If you want to set this property globally, you may add it directly to the cube description like this:
final long waitNotificationSeconds = 5;
final IActivePivotInstanceDescription dataCubeDescription =
cubeBuilder
.asDataCube()
.withClusterDefinition()
.withClusterId(CLUSTER_ID)
.withMessengerDefinition()
.withKey(NettyMessenger.PLUGIN_KEY)
.withProperty(
IDataClusterDefinition.AWAIT_NOTIFICATIONS_TIME,
String.valueOf(waitNotificationSeconds))
.end()
.end()
.withApplicationId(APPLICATION_ID)
.withAllHierarchies()
.withAllMeasures()
.end()
.build();
However, if the frequency of data cube transactions is high, this setup may lead to performance issues even if you don't explicitly await notifications.
If you only want to wait for some "critical" transactions to synchronize with the query node,
you may set this property "locally"
using IMultiVersionActivePivot#setTransactionProperties(Properties)
just before launching a transaction:
// Records per store ID
final Map<String, List<Object[]>> data = generateData();
final IDatastore datastore = app.getDatastore();
final IMultiVersionDataActivePivot dataCube =
(IMultiVersionDataActivePivot) manager.getActivePivot(DATA_CUBE_ID);
dataCube.setTransactionProperties(
NotificationMessage.putNotificationTime(waitNotificationSeconds));
try {
datastore.edit(tm -> data.forEach(tm::addAll) /* put any transaction code here */);
} finally {
dataCube.setTransactionProperties(NotificationMessage.putNotificationTime(0));
}
In order to get a TimeoutException when this timeout is reached, one may use IMultiVersionActivePivot#awaitNotifications(long) immediately after committing the transaction:
try {
dataCube.awaitNotifications(Duration.ofSeconds(waitNotificationSeconds));
} catch (TimeoutException e) {
throw new RuntimeException(e);
}
The diagram will then look like this:
It is important to note that the timeout provided in the millis argument of IMultiVersionActivePivot#awaitNotifications(long) should never exceed the timeout specified in NotificationMessage.putNotificationTime. Otherwise, there is no guarantee that a TimeoutException will be thrown on error.
If a query node leaves the cluster while the data node is waiting for the transaction to be processed, IMultiVersionActivePivot#awaitNotifications(long) will wait for the cluster view to be updated, i.e. the data node will send discovery messages to all query nodes.
The following message in the data node log:
FINE c.q.messenger.impl.DataInstanceDistributionManager - [linuxuser_VarCube-master_DATA_285033_12974]: On member changed with view Id = 151 newMembers = [] removedMembers = [linuxuser_RiskCube_QUERY_908738_7133]
may indicate that the transaction was not properly awaited on other query nodes of the cluster. However, the query node that was disconnected will have the newest data when it reconnects to the cluster, thanks to a discovery message sent from the data node.