Finally we are in a position to send messages to a group of connected clients.
As I have previously commented, the "normal" thing to do from here is to have the handler send out the incoming message to all the connected clients. That is certainly possible, and is left as an exercise for the reader.
But that is not the way I usually work. In general, I have clients connect, express an interest in something happening in the system and then receive updates whenever that happens, completely disconnected from incoming messages from other clients.
To most easily simulate that behavior in this example, I've opted to use a timer.
This is on the tag API_GATEWAY_UNSOLICITED.
Timers
A timer in AWS is allegedly a "Cloudwatch Event" but its actual provenance seems more murky than that. However, there is a CloudFormation type called AWS::Events::Rule, documented under EventBridge which can be used to create a timer which can then be connected to a Lambda.
This is what we are going to do first. We are going to create a new lambda, a timer and connect the two together using a "Lambda Permission" (basically an invocation rule, much like the one that enables API Gateway to call our existing lambda).
Why a new Lambda? Well, that's a good question and it comes back to the basic software engineering question of when do you think you are duplicating something and when do you think something is significantly different.
We could expand upon our technique from the last post and check the context to see the structure of the event and handle it appropriately. That may, in certain contexts, be the right choice. But in spite of overlap and connections - both lambdas will use websockets and Couchbase and reference the same list of subscribers in Couchbase, this feels different to me and so I am going to put it in a separate lambda. If forced to justify my decision, I would look at the amount of actual user code (as opposed to infrastructure code or libraries) that they shared. If they shared a great deal, I would humbly reverse my decision and merge the two into a single lambda.
This also requires changes to our create, update and package scripts to correctly package and deploy the new lambda.
Note that this lambda is going to fire every minute until you tear it down; that is 1440 times a day, or around 45,000 times a month. This is a waste, but well below the "free" limit of 1,000,000 lambda invocations a month.
This is what we are going to do first. We are going to create a new lambda, a timer and connect the two together using a "Lambda Permission" (basically an invocation rule, much like the one that enables API Gateway to call our existing lambda).
Why a new Lambda? Well, that's a good question and it comes back to the basic software engineering question of when do you think you are duplicating something and when do you think something is significantly different.
We could expand upon our technique from the last post and check the context to see the structure of the event and handle it appropriately. That may, in certain contexts, be the right choice. But in spite of overlap and connections - both lambdas will use websockets and Couchbase and reference the same list of subscribers in Couchbase, this feels different to me and so I am going to put it in a separate lambda. If forced to justify my decision, I would look at the amount of actual user code (as opposed to infrastructure code or libraries) that they shared. If they shared a great deal, I would humbly reverse my decision and merge the two into a single lambda.
This also requires changes to our create, update and package scripts to correctly package and deploy the new lambda.
Note that this lambda is going to fire every minute until you tear it down; that is 1440 times a day, or around 45,000 times a month. This is a waste, but well below the "free" limit of 1,000,000 lambda invocations a month.
Handling a Timer
In addition to creating a new lambda, we identified a new entry point in our configuration: blog.ignorance.timer.OnTimer. This is, in fact, the main motivation for the new lambda declaration: for our purposes I would be happy enough to share the code - and indeed exactly the same contents are put into both lambdas for now.
OnTimer, like its cousin TDAHandler, is basically there to provide a buffer between AWS and AWS-independent code. As with TDAHandler, it delegates the creation of internal state to a private TDACentralConfiguration object. Note that the semantics of static creation in Java gives rise to a prospect of there being two such objects created; the simplest fix is to make sure that only one of these two packages is placed in any given lambda; alternatives include sharing or abstracting the central configuration singleton. All such approaches are left as an exercise for the reader.
OnTimer needs to instantiate a handler class; again, there are many ways in which this can be done - I feel the most correct way would be to extract something from the incoming event, but for now I have just hardcoded the TimerHandler class being used in this example.
The handler code thus looks like this:
public TimerResponse handleRequest(TimerEvent ev, Context cx) {
cx.getLogger().log(new Date() + ": timer invoked by " + cx.getAwsRequestId());
TimerRequest userHandler = new TimerHandler();
if (userHandler instanceof DesiresLogger) {
ServerLogger logger = new AWSLambdaLogger(cx.getLogger());
((DesiresLogger)userHandler).provideLogger(logger);
}
if (userHandler instanceof WithCouchbase) {
central.applyCouchbase((WithCouchbase) userHandler);
}
userHandler.onTimer();
return new TimerResponse();
}
We can now do most of the work of sending an unsolicited message - everything except sending it, that is. We create the hardcoded TimerHandler, letting it implement the TimerRequest interface and the WithCouchbase interface. It can then read the members object, create a message to send and iterate over the members.
As it happens, that's just the thing we have in our hand - the element of the array is an abstract representation of the connection; in our case an AWS connectionId.
What we need though is an abstraction that isolates the hard work from the application logic. Let's call it WSConnections.
Of course, it's not quite enough to have this interface - we need an instance of it provided to our handler. Following our pattern of injection, we declare our handler to implement the ProvideWSConnections interface which receives a concrete instance of this interface for our use:
Back to the Table of Contents
public void onTimer() {
try {
JsonDocument doc = this.bucket.get("groups/everybody");
if (doc == null || doc.content() == null || !doc.content().containsKey("members"))
return;
JsonArray ja = doc.content().getArray("members");
String msg = "Time is now " + new Date().toString();
for (int i=0;i<ja.size();i++) {
// need to send the message
}
} catch (Throwable t) {
logger.log("couchbase error", t);
}
}
So how do we send the message? Well, that involves the same logic we provided in the implementation of WSResponder, just with a different value for connectionId - the id of the connection we want to send it to.As it happens, that's just the thing we have in our hand - the element of the array is an abstract representation of the connection; in our case an AWS connectionId.
What we need though is an abstraction that isolates the hard work from the application logic. Let's call it WSConnections.
public interface WSConnections {
boolean sendTo(String connection, String data);
}
Pretty simple really. It has one method that takes the connectionId and the data to send. It returns a boolean which indicates if the connection was valid (true if valid, false otherwise).Of course, it's not quite enough to have this interface - we need an instance of it provided to our handler. Following our pattern of injection, we declare our handler to implement the ProvideWSConnections interface which receives a concrete instance of this interface for our use:
public interface ProvideWSConnections {
void wsConnections(WSConnections connections);
}
We can then implement this in our handler class:public void wsConnections(WSConnections connections) {
this.connections = connections;
}
And then use it in our handler. Again, life becomes a little bit tricky because of the potential for failure and Couchbase not directly supporting a "remove" operation, so most of this code goes into handling that bookkeeping:
JsonArray copyTo = JsonArray.ja();
for (int i=0;i<ja.size();i++) {
if (connections.sendTo(ja.getString(i), msg))
copyTo.add(ja.getString(i));
}
doc.content().put("members", copyTo);
bucket.replace(doc);
Strictly speaking, replacing the document is only necessary if we have made any changes. This is probably a change worth making in the real world if only because it would avoid contention on the document - especially likely if multiple writers notice around the same time that the list of members has changed.
AWS Implementation of Sender
Finally, we are ready to present the AWS implementation of WSConnections in TDACentralConfiguration. Because this is basically the same code as the responder, we will start by refactoring that to extract a sendMessageTo method.private boolean sendMessageTo(ServerLogger logger, String endpoint,
String connId, String text, String hint) {
try {
AmazonApiGatewayManagementApi wsapi =
AmazonApiGatewayManagementApiClientBuilder
.standard()
.withEndpointConfiguration(
new EndpointConfiguration(endpoint, System.getenv("AWS_REGION"))
).build();
PostToConnectionRequest msg = new PostToConnectionRequest();
msg.setConnectionId(connId);
msg.setData(ByteBuffer.wrap(text.getBytes()));
wsapi.postToConnection(msg);
return true;
} catch (GoneException ex) {
return false;
} catch (Throwable t) {
logger.log(hint, t);
return true;
}
}
and the WSResponder caller now looks like:
public boolean send(String text) {
return
sendMessageTo(logger, domainName + "/" + stage, connId, text, "error responding");
}
We have all the information we need in the abstract connectionName string, formatted as endpoint:connectionId. We can extract this and pass it to our extracted function as follows:
public boolean sendTo(String connection, String data) {
String[] split = connection.split(":");
return
sendMessageTo(logger, split[0], split[1], data, "error sending to " + connection);
}
This is presented as a nested class within a function that is called to inject the connections object through ProvideWSConnections.Conclusion
And that is that. When clients connect, they are automatically bombarded - around once a minute - with a message reminding them of the time. When they disconnect (or are found to have been disconnected), the code cleans up its memory of the connection. While they remain connected, they receive the timer events as they crop up.Back to the Table of Contents