Implementing Support for Clustered Environments
- Understanding the Concept of Event Log
- Opening an Event Log
- Managing Event Log Snapshots
- Node Identity and Membership Events
- Feature Flag
Important: Experimental feature
The Collaboration Kit backend for clustering support is an experimental feature. Its behavior, API, and look and feel might change.
You can enable it with a feature flag.
If you use Collaboration Kit with its default settings in an application running in a clustered environment, users can collaborate only with others connected to the same application instance, that is, on the same node.
To support clustered deployments, Collaboration Kit provides the Backend superclass, which you can extend to support such multi-instance environments.
This page shows how to implement a custom backend to support clustering based on the Hazelcast platform.
Understanding the Concept of Event Log
Collaboration Kit uses a custom Backend implementation as the gateway for access to Event Logs.
An Event Log is a strictly ordered log of submitted events involving Topic data, such as newly added items or value changes.
The EventLog API provides methods to submit new events to the log and to add a subscriber that receives all past and future events.
Every event is marked by a unique identifier, which lets the API provide a method to remove all events in the log prior to a given identifier.
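To make this contract concrete, the following is a minimal single-JVM sketch built on plain Java collections. The class name InMemoryEventLogSketch and its method signatures are hypothetical and only mirror the operations described above; this is not the Collaboration Kit EventLog interface, and it does nothing to share state between nodes.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.UUID;
import java.util.function.BiConsumer;

// Hypothetical, single-JVM illustration of the Event Log contract.
// It is not the Collaboration Kit EventLog interface.
class InMemoryEventLogSketch {
    private final List<UUID> ids = new ArrayList<>();
    private final List<String> payloads = new ArrayList<>();
    private BiConsumer<UUID, String> subscriber;

    // Appends an event; the subscriber, if any, is notified in order.
    synchronized void submitEvent(UUID id, String payload) {
        ids.add(id);
        payloads.add(payload);
        if (subscriber != null) {
            subscriber.accept(id, payload);
        }
    }

    // Replays all past events in order, then keeps delivering new ones.
    synchronized void subscribe(BiConsumer<UUID, String> newSubscriber) {
        if (subscriber != null) {
            throw new IllegalStateException("Only one subscriber is allowed");
        }
        for (int i = 0; i < ids.size(); i++) {
            newSubscriber.accept(ids.get(i), payloads.get(i));
        }
        subscriber = newSubscriber;
    }

    // Removes every event that precedes the given identifier.
    // Unknown identifiers are ignored in this sketch.
    synchronized void truncate(UUID olderThan) {
        int index = ids.indexOf(olderThan);
        if (index > 0) {
            ids.subList(0, index).clear();
            payloads.subList(0, index).clear();
        }
    }

    synchronized int size() {
        return ids.size();
    }
}
```

A subscriber added after two events have been submitted first receives those two events, then any later ones, which is the catch-up behavior a clustered implementation has to preserve.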
Implementing an Event Log for Hazelcast
Start by implementing the EventLog interface for the reference Hazelcast platform.
Hazelcast provides a straightforward streaming API based on shared maps and lists.
The Event Log can be implemented on top of a Hazelcast IList. First, however, you need a class to store both the event identifier and its payload.
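// Pairs an event identifier with its payload; Serializable so that
// Hazelcast can store it in a distributed list.
private static final class IdAndPayload implements Serializable {
    private final UUID id;
    private final String payload;

    private IdAndPayload(UUID id, String payload) {
        this.id = id;
        this.payload = payload;
    }
}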
Once you have that, you can start implementing the interface methods.
The submitEvent method receives the event identifier and payload. Store them in a new IdAndPayload object and add it to the Hazelcast list for this Event Log.
public static class HazelcastEventLog implements EventLog {
    private final IList<IdAndPayload> list;

    public HazelcastEventLog(IList<IdAndPayload> list) {
        this.list = list;
    }

    @Override
    public void submitEvent(UUID id, String payload) {
        list.add(new IdAndPayload(id, payload));
    }
}
To implement subscriptions to events, add an item listener to the Hazelcast list.
The subscriber receives all past and future events for this Event Log.
A newly added subscriber should first receive all previous events in the log, in their original order, so that it can catch up to the latest state.
New events should be delivered, in order, only after all previous events have been delivered.
The subscribe method must not be invoked again until the previous subscriber has been removed.
The subscribe method optionally takes the identifier of the last known event, so that the subscriber is notified only about newer events.
If an identifier is provided but not found in the Event Log, an EventIdNotFoundException should be thrown.
Tip: Exception handling
When the code calling this method catches the exception, it may re-attempt the subscription with another identifier.
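The retry described in the tip can be sketched as follows. All names here are hypothetical stand-ins: Log mirrors only the subscribe method, IdNotFound plays the role of EventIdNotFoundException (made unchecked to keep the sketch short), and falling back to a second identifier chosen by the caller is an assumption about how the calling code might react.

```java
import java.util.UUID;
import java.util.function.BiConsumer;

class SubscribeRetrySketch {
    // Hypothetical stand-in for EventIdNotFoundException
    // (unchecked here only to keep the sketch short).
    static class IdNotFound extends RuntimeException {
        IdNotFound(String message) {
            super(message);
        }
    }

    // Hypothetical stand-in for the subscribe method of an Event Log.
    // The returned Runnable plays the role of the Registration.
    interface Log {
        Runnable subscribe(UUID newerThan,
                BiConsumer<UUID, String> subscriber);
    }

    // Tries the preferred identifier first. If the log no longer
    // contains it, re-attempts once with the fallback identifier;
    // a null fallback asks for all events from the beginning.
    static Runnable subscribeWithFallback(Log log, UUID preferred,
            UUID fallback, BiConsumer<UUID, String> subscriber) {
        try {
            return log.subscribe(preferred, subscriber);
        } catch (IdNotFound e) {
            return log.subscribe(fallback, subscriber);
        }
    }
}
```

If the second attempt also fails, the exception propagates to the caller, which keeps the retry bounded.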
@Override
public synchronized Registration subscribe(UUID newerThan,
        BiConsumer<UUID, String> eventSubscriber)
        throws EventIdNotFoundException {
    if (this.eventSubscriber != null) {
        throw new IllegalStateException(); // (1)
    }

    if (newerThan != null) {
        Optional<IdAndPayload> newerThanIdAndEvent = list.stream()
                .filter(item -> newerThan.equals(item.id)).findFirst();
        if (newerThanIdAndEvent.isEmpty()) {
            throw new EventIdNotFoundException(
                    "newerThan doesn't exist in the log."); // (2)
        }
    }
    this.newerThan = newerThan;
    this.eventSubscriber = eventSubscriber;
    nextEventIndex = 0;

    UUID registrationId = list
            .addItemListener(new ItemListener<IdAndPayload>() {
                @Override
                public void itemAdded(ItemEvent<IdAndPayload> item) {
                    deliverEvents();
                }

                @Override
                public void itemRemoved(ItemEvent<IdAndPayload> item) {
                    handleRemoveItem();
                }
            }, false); // (3)

    // Deliver initial events
    deliverEvents(); // (4)

    return () -> {
        synchronized (this) {
            list.removeItemListener(registrationId);
            this.eventSubscriber = null;
        }
    }; // (5)
}
1. Only a single subscriber is allowed, so an exception is thrown if one is already set.
2. If an event identifier is provided, the list is checked to see if it contains it. If it doesn't, an EventIdNotFoundException is thrown.
3. An item listener is added to the Hazelcast list to handle new items and ones that have been removed.
4. Then all past events are initially delivered.
5. Finally, a Registration is returned that can be used to remove the subscriber.
The deliverEvents method is synchronized to prevent it from being invoked by multiple threads simultaneously, which also avoids delivering duplicate events to the subscriber.
The method keeps track of the Hazelcast list index of the next event and increments that index until all of the events have been delivered.
If an event identifier has been set as the starting point, no events are delivered until that identifier has been reached.
private synchronized void deliverEvents() {
    while (nextEventIndex < list.size()) {
        IdAndPayload event = list.get(nextEventIndex++);
        if (this.newerThan == null) {
            eventSubscriber.accept(event.id, event.payload);
        } else {
            // Skip events up to and including the starting identifier
            if (event.id.equals(newerThan)) {
                this.newerThan = null;
            }
        }
    }
}
The last method to implement for the EventLog interface is the truncate method.
This method is used to limit the number of events contained in the log, preventing it from growing indefinitely.
It takes the identifier of the oldest known event that should be preserved and removes every event before it. If a null identifier is provided, it empties the entire log.
To implement this behavior for Hazelcast, create a simple Predicate and pass it to the list's removeIf method.
@Override
public synchronized void truncate(UUID olderThan) {
    // With a null identifier, every event matches and is removed
    Predicate<IdAndPayload> filter = e -> true;
    if (olderThan != null) {
        Optional<IdAndPayload> olderThanEvent = list.stream()
                .filter(item -> olderThan.equals(item.id)).findFirst();
        if (olderThanEvent.isEmpty()) {
            // NOOP
            return;
        }
        // Matches events only until the given identifier is reached
        filter = new Predicate<>() {
            boolean found;

            @Override
            public boolean test(IdAndPayload event) {
                found = found || olderThan.equals(event.id);
                return !found;
            }
        };
    }
    list.removeIf(filter);
}
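The stateful predicate can be tried out in isolation on a plain ArrayList. The sketch below is an illustration under assumptions: TruncateSketch and truncateBefore are hypothetical names, and the approach relies on removeIf testing elements in list order, which holds for ArrayList but is worth verifying for any other list implementation.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Predicate;

class TruncateSketch {
    // Removes every element that precedes keepFrom. A null keepFrom
    // empties the list; an unknown keepFrom leaves it untouched.
    static <T> void truncateBefore(List<T> log, T keepFrom) {
        if (keepFrom == null) {
            log.clear();
            return;
        }
        if (!log.contains(keepFrom)) {
            return;
        }
        Predicate<T> beforeKeepFrom = new Predicate<>() {
            private boolean found;

            @Override
            public boolean test(T element) {
                // Once keepFrom has been seen, nothing else matches
                found = found || keepFrom.equals(element);
                return !found;
            }
        };
        log.removeIf(beforeKeepFrom);
    }
}
```

For a list holding e1, e2, e3, e4, calling truncateBefore with e3 leaves e3 and e4. On a local list the same result could be had with subList(0, index).clear(); the predicate form is shown only because it matches the removeIf approach used above.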
Opening an Event Log
Now that you have a Hazelcast implementation of the EventLog interface, you need to extend the Backend class to be able to create and get instances of it.
Since this implementation depends only on a single Hazelcast IList, the openEventLog method is short: return a new instance of HazelcastEventLog with the list named after the logId parameter.
@Override
public EventLog openEventLog(String logId) {
    return new HazelcastEventLog(hz.getList(logId));
}
Managing Event Log Snapshots
A snapshot is an opaque representation of data at a certain moment in time. It can be used, for example, by nodes joining a cluster to catch up quickly with a recent state of the data, without having to replay all of the events from the beginning. Snapshots are identified by name. Each version of a named snapshot is assigned an additional unique identifier.
Loading a Snapshot
To load the latest version of a snapshot, the Backend class provides the loadLatestSnapshot method.
This method can be implemented for Hazelcast by using a map to store the latest available snapshot.
@Override
public CompletableFuture<Snapshot> loadLatestSnapshot(String name) {
    return CompletableFuture.completedFuture(snapshots.get(name));
}
Submitting a New Snapshot
To submit a new snapshot of data, use the replaceSnapshot method.
It takes the name of the snapshot, the expected unique identifier of the latest snapshot, the unique identifier of the new snapshot, and the payload of the snapshot itself.
To implement this method for Hazelcast, you need some logic to verify that the latest snapshot is the expected one before replacing it.
@Override
public CompletableFuture<Void> replaceSnapshot(String name, UUID expectedId,
        UUID newId, String payload) {
    Snapshot currentSnapshot = snapshots.computeIfAbsent(name,
            k -> new Snapshot(null, null));

    // Replace only if the stored snapshot is the expected one
    if (Objects.equals(expectedId, currentSnapshot.getId())) {
        Snapshot idAndPayload = new Snapshot(newId, payload);
        snapshots.put(name, idAndPayload);
    }
    return CompletableFuture.completedFuture(null);
}
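The comparison and the write in replaceSnapshot are separate map operations, so two callers could in principle both pass the check before either one writes. As an illustration only, the sketch below performs the compare-and-replace in a single atomic step on a local ConcurrentHashMap. SnapshotStoreSketch and Versioned are hypothetical names, and whether the same pattern carries over unchanged to a distributed map is an assumption that would need to be checked against that map's documentation.

```java
import java.util.Objects;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

class SnapshotStoreSketch {
    // Hypothetical value type standing in for the Snapshot class.
    static final class Versioned {
        final UUID id;
        final String payload;

        Versioned(UUID id, String payload) {
            this.id = id;
            this.payload = payload;
        }
    }

    private final ConcurrentMap<String, Versioned> snapshots =
            new ConcurrentHashMap<>();

    Versioned latest(String name) {
        return snapshots.get(name);
    }

    // Replaces the snapshot only if the stored identifier matches
    // expectedId (null means "no snapshot yet"). The comparison and
    // the write happen inside one atomic compute call.
    boolean replace(String name, UUID expectedId, UUID newId,
            String payload) {
        boolean[] replaced = { false };
        snapshots.compute(name, (key, current) -> {
            UUID currentId = current == null ? null : current.id;
            replaced[0] = Objects.equals(expectedId, currentId);
            return replaced[0] ? new Versioned(newId, payload) : current;
        });
        return replaced[0];
    }
}
```

Returning a boolean is a choice made for this sketch so that the caller can tell whether its snapshot was accepted; the method in the text above returns a CompletableFuture<Void> instead.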
Node Identity and Membership Events
The primary purpose of the Backend API is to support collaboration in applications deployed in clustered environments.
Every Backend instance represents a member of the cluster and is uniquely identified by a UUID, which should be returned by the getNodeId method.
For your Hazelcast implementation, return the local member identifier.
@Override
public UUID getNodeId() {
    return hz.getCluster().getLocalMember().getUuid();
}
When many Backend instances are involved, it's necessary to know when they join or leave the cluster.
For this purpose, the Backend should provide an implementation of the addMembershipListener method that takes a MembershipListener and notifies it when cluster members join or leave.
Since Hazelcast itself uses the same concept, the implementation is straightforward: you only need to map Hazelcast's events to Collaboration Kit's MembershipEvent events. A MembershipEvent takes the MembershipEventType (JOIN or LEAVE) and the identifier of the member.
@Override
public Registration addMembershipListener(
        MembershipListener membershipListener) {
    UUID registrationId = hz.getCluster()
            .addMembershipListener(new InitialMembershipListener() {
                @Override
                public void init(InitialMembershipEvent event) {
                    // Report members already in the cluster as joined
                    event.getMembers()
                            .forEach(member -> submitEvent(
                                    MembershipEventType.JOIN,
                                    member.getUuid()));
                }

                @Override
                public void memberAdded(MembershipEvent membershipEvent) {
                    submitEvent(MembershipEventType.JOIN,
                            membershipEvent.getMember().getUuid());
                }

                @Override
                public void memberRemoved(MembershipEvent membershipEvent) {
                    submitEvent(MembershipEventType.LEAVE,
                            membershipEvent.getMember().getUuid());
                }

                private void submitEvent(MembershipEventType type,
                        UUID id) {
                    membershipListener.handleMembershipEvent(
                            new com.vaadin.collaborationengine.MembershipEvent(
                                    type, id, getCollaborationEngine()));
                }
            });
    return () -> hz.getCluster().removeMembershipListener(registrationId);
}
Note: Returning a registration object
The addMembershipListener method should return a Registration object that can be used later to remove the listener.
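As an illustration of what a consumer of these events might do, the sketch below keeps a set of currently active nodes. ActiveNodesSketch and its Type enum are hypothetical and only mirror the JOIN and LEAVE event types; how Collaboration Kit itself processes membership events is not shown here.

```java
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;

class ActiveNodesSketch {
    // Hypothetical mirror of the JOIN and LEAVE event types.
    enum Type { JOIN, LEAVE }

    // Concurrent set, since events may arrive on different threads.
    private final Set<UUID> activeNodes = ConcurrentHashMap.newKeySet();

    // Applies one membership event. A repeated JOIN for the same node,
    // for example from an initial member list, has no further effect.
    void handle(Type type, UUID nodeId) {
        if (type == Type.JOIN) {
            activeNodes.add(nodeId);
        } else {
            activeNodes.remove(nodeId);
        }
    }

    boolean isActive(UUID nodeId) {
        return activeNodes.contains(nodeId);
    }

    int size() {
        return activeNodes.size();
    }
}
```

Because the set ignores duplicate JOIN events, it tolerates a node being reported both by the initial member list and by a later join notification.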
Feature Flag
To use the Collaboration Kit backend for clustering support, you must enable it with the collaborationEngineBackend feature flag.
See Feature Flags for more information.