As we near our goal of being able to send unsolicited messages to a group of connected websocket listeners, we need to be able to figure out who is in our group.
To be in the group, you obviously have to have connected and not yet have disconnected. You probably should also register an interest in the group, but we will leave that as an exercise for the reader (especially since it feels more application level).
Building on what we did in the last post, we are going to capture $connect and $disconnect events and store the current set of members in an array in an object in Couchbase.
This can all be found under the tag API_GATEWAY_CONNECTING.
Configuration
First and foremost, we need to update our configuration to add the $connect and $disconnect routes.
I'm not going to show all the JSON for the CounterConnect and CounterDisconnect route definitions (which are basically the same as CounterRoute), but there are a couple of points I want to make:
- It is important to add these new objects to the DependsOn field of the CounterDeployment
- It appears to be necessary to tear down the entire gateway and recreate it
I'm not entirely sure about the latter point. I tried everything I could think of and nothing worked, whereas tearing the gateway down and starting again did. It's possible that there is a way to achieve the same result from the UI, but as far as I could tell everything was being constructed correctly; it just didn't "take". Since I had a similar problem when originally defining the $default route, I suspect that there is a problem updating the "built-in" routes on the API Gateway. It may be connected with the issues around updating deployments with RestApis (you need to change them in some way beyond just their dependencies, for instance by changing the logical name), but it seems more insidious than that.
Handling the Routes
Through my setup, I have intentionally chosen to direct the connection routes to the same lambda and same handler that we have already set up. This is very definitely a choice, but it is a choice driven by a desire to have minimal overlap of similar code (in the next section we will see the consequences of the other choice).
Before starting to handle these routes, I'm going to do a refactoring. Websocket events have a different "structure" to HTTP events, and I'm going to use this to break up the handleIt code in ProcessorRequest into two functions: handleHTTPMethod will handle "regular" HTTP requests and handleWSEvent will handle websocket events. I'm also going to add cases to cover not receiving a context (a 500) and the context not having any of the parameters I expect (a 400).
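To make the shape of that refactoring concrete, here is a minimal sketch of the decision it describes. It is not the actual ProcessorRequest code: the class name, the enum and the classify method are hypothetical, and I am assuming that an HTTP request context carries an httpMethod entry while a websocket context carries a routeKey entry.

```java
import java.util.Map;

/** Hypothetical sketch of the HTTP/websocket split; not the real ProcessorRequest. */
final class EventDispatchSketch {
    enum Target { HTTP_METHOD, WS_EVENT, BAD_REQUEST, SERVER_ERROR }

    /** Decide which path a request context should take. */
    static Target classify(Map<String, Object> context) {
        if (context == null) {
            return Target.SERVER_ERROR;   // no context at all: answer with a 500
        }
        if (context.containsKey("httpMethod")) {
            return Target.HTTP_METHOD;    // assumed marker of a "regular" HTTP request
        }
        if (context.containsKey("routeKey")) {
            return Target.WS_EVENT;       // assumed marker of a websocket event
        }
        return Target.BAD_REQUEST;        // context present but nothing we expect: a 400
    }
}
```

In the real handler, the first two outcomes would call handleHTTPMethod and handleWSEvent respectively, and the last two would produce the 400 and 500 responses.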
The rest of the changes we are going to make are isolated in handleWSEvent.
First, I have extracted the responder from the call to onText into its own variable, since we will be passing it to each of the three methods. Then I have looked at the context's routeKey variable and decided whether the route is $connect, $default or $disconnect and called open, onText or close as appropriate.
WSResponder responder = central.responderFor(logger,
    (String) context.get("connectionId"),
    (String) context.get("domainName"),
    (String) context.get("stage"));
if ("$connect".equals(context.get("routeKey"))) {
    wsproc.open(responder);
} else if ("$default".equals(context.get("routeKey"))) {
    wsproc.onText(responder, body);
} else if ("$disconnect".equals(context.get("routeKey"))) {
    wsproc.close(responder);
}
Handler Code
On the handler side, we have quite a bit of complex logic to do. I think in any serious application, this would be hidden in a separate data-management object, itself hidden behind an interface. But because we don't have that whole application, I am just going to put the Couchbase code directly in the handler.
First off, we acquire a handle to the Couchbase bucket. We do this by implementing the WithCouchbase interface we created in the last post and storing the handle passed into it. This is probably the best moment to make the point that, for professional purposes, an asynchronous handle to the bucket is to be preferred to using the synchronous Couchbase client, but that makes the code here significantly more cluttered; if you are using Couchbase professionally, you will know what to do. If you are not, then do not take this as an example of what to do.
The logic is quite simple but contains two cases: whether the object already exists or not. If the group does not exist, we create a new object with a members array containing the current connection id; if the group does exist, we add the current connection to the existing array and replace the object.
For simplicity, I have also chosen to ignore error handling, instead just logging the exception. Note that because of concurrency, it is entirely possible for two lambdas to come through here at the same time: if they do, the insert or replace could fail with a write conflict. In the real world this exception would need to be caught and retried.
public void open(WSResponder responder) {
    try {
        JsonDocument doc = bucket.get("groups/everybody");
        JsonArray members;
        if (doc == null) {
            // First connection ever: create the group with a single member
            members = JsonArray.ja();
            members.add(responder.connectionName());
            bucket.insert(JsonDocument.create("groups/everybody",
                JsonObject.jo().put("members", members)));
        } else {
            // Group exists: the array is part of doc's content, so adding to it updates doc
            members = doc.content().getArray("members");
            members.add(responder.connectionName());
            bucket.replace(doc);
        }
    } catch (Throwable t) {
        // No retry here; a conflicting concurrent write just gets logged
        logger.log("couchbase error", t);
    }
}
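For what the "catch and retry" might look like, here is a minimal sketch of an optimistic retry loop. To keep it self-contained it does not use Couchbase at all: an AtomicReference stands in for the stored document, and compareAndSet plays the role of a replace that fails when somebody else has written in the meantime. The class name, method name and attempt limit are all hypothetical. With the real client, I believe the conflict would surface as an exception from insert or replace (such as DocumentAlreadyExistsException or CASMismatchException), and the loop would catch that and go round again.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.atomic.AtomicReference;

/** Hypothetical in-memory stand-in for the read-modify-write cycle in open(). */
final class RetrySketch {
    // Plays the role of the "groups/everybody" document; null means "not created yet"
    private final AtomicReference<List<String>> members = new AtomicReference<>(null);

    /** Try to add a connection, re-reading and retrying if another writer got in first. */
    boolean addMember(String conn, int maxAttempts) {
        for (int attempt = 0; attempt < maxAttempts; attempt++) {
            List<String> current = members.get();              // like bucket.get(...)
            List<String> updated = (current == null)
                ? new ArrayList<>() : new ArrayList<>(current);
            updated.add(conn);
            // Succeeds only if nobody has replaced the value since we read it
            if (members.compareAndSet(current, Collections.unmodifiableList(updated))) {
                return true;
            }
            // Otherwise somebody else won the race: loop and start from their value
        }
        return false;                                          // gave up after maxAttempts
    }

    List<String> snapshot() {
        return members.get();
    }
}
```

The important point is that each retry starts again from a fresh read; retrying the same write with stale data would just fail again or overwrite the other lambda's change.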
To handle the logging of the error, I added a new method to ServerLogger to log a message along with an exception.
More importantly, we need some storable representation of the connection name. We delegate this to the WSResponder interface, requiring it to return us a string value that we can store. While we have the connectionId as a string already, that is not quite enough: when we want to send a message to the connection, we also need to know the endpoint name (the domain and the stage) and the region. We can "always" (unless you are going to deploy this application across multiple regions) obtain the region from the environment, so we will just capture the endpoint; we separate it from the connectionId by a colon, since we believe this is not a valid character in either token.
public String connectionName() {
    return domainName + "/" + stage + ":" + connId;
}
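When we come to send messages, this string will need to be taken apart again. As a minimal sketch of the round trip, assuming (as above) that a colon never appears in the connection id, something like the following would do; the class and method names are hypothetical and the domain and connection id in the usage example are made up.

```java
/** Hypothetical helper showing the stored format and how to split it again. */
final class ConnectionNameSketch {
    /** Same format as connectionName(): endpoint, then a colon, then the connection id. */
    static String format(String domainName, String stage, String connId) {
        return domainName + "/" + stage + ":" + connId;
    }

    /** Returns { endpoint, connectionId }, splitting on the last colon. */
    static String[] parse(String name) {
        int colon = name.lastIndexOf(':');
        if (colon < 0) {
            throw new IllegalArgumentException("not a connection name: " + name);
        }
        return new String[] { name.substring(0, colon), name.substring(colon + 1) };
    }
}
```

For example, parse(format("example.invalid", "dev", "abc123=")) gives back "example.invalid/dev" as the endpoint and "abc123=" as the connection id.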
The logic for close() is very similar, except that we don't need to consider the case that the object has not been created, but we do have some complex juggling because Couchbase does not have a method for removing an object from an array.
public void close(WSResponder responder) {
    try {
        JsonDocument doc = bucket.get("groups/everybody");
        JsonArray ja = doc.content().getArray("members");
        JsonArray copyTo = JsonArray.ja();
        String conn = responder.connectionName();
        for (int i = 0; i < ja.size(); i++) {
            if (!conn.equals(ja.getString(i)))
                copyTo.add(ja.getString(i));
        }
        doc.content().put("members", copyTo);
        bucket.replace(doc);
    } catch (Throwable t) {
        logger.log("couchbase error", t);
    }
}
Conclusion
We have configured our API Gateway to handle the $connect and $disconnect routes and, in doing so, to keep track in Couchbase of everybody currently connected.
Next: Finally Sending Unsolicited Messages