Wednesday, December 18, 2019

Sending unsolicited messages

Go to the Table of Contents for the Java API Gateway

Finally we are in a position to send messages to a group of connected clients.

As I have previously commented, the "normal" thing to do from here is to have the handler send out the incoming message to all the connected clients.  That is certainly possible, and is left as an exercise for the reader.

But that is not the way I usually work.  In general, I have clients connect, express an interest in something happening in the system and then receive updates whenever that happens, completely disconnected from incoming messages from other clients.

To most easily simulate that behavior in this example, I've opted to use a timer.

This is on the tag API_GATEWAY_UNSOLICITED.

Timers

A timer in AWS is allegedly a "Cloudwatch Event" but its actual provenance seems more murky than that.  However, there is a CloudFormation type called AWS::Events::Rule, documented under EventBridge which can be used to create a timer which can then be connected to a Lambda.

This is what we are going to do first.  We are going to create a new lambda, a timer and connect the two together using a "Lambda Permission" (basically an invocation rule, much like the one that enables API Gateway to call our existing lambda).

Why a new Lambda?  Well, that's a good question and it comes back to the basic software engineering question of when do you think you are duplicating something and when do you think something is significantly different.

We could expand upon our technique from the last post and check the context to see the structure of the event and handle it appropriately.  That may, in certain contexts, be the right choice.  But in spite of overlap and connections - both lambdas will use websockets and Couchbase and reference the same list of subscribers in Couchbase, this feels different to me and so I am going to put it in a separate lambda.  If forced to justify my decision, I would look at the amount of actual user code (as opposed to infrastructure code or libraries) that they shared.  If they shared a great deal, I would humbly reverse my decision and merge the two into a single lambda.

This also requires changes to our create, update and package scripts to correctly package and deploy the new lambda.

Note that this lambda is going to fire every minute until you tear it down; that is 1440 times a day, or around 45,000 times a month.  This is a waste, but well below the "free" limit of 1,000,000 lambda invocations a month.

Handling a Timer

In addition to creating a new lambda, we identified a new entry point in our configuration: blog.ignorance.timer.OnTimer.  This is, in fact, the main motivation for the new lambda declaration: for our purposes I would be happy enough to share the code - and indeed exactly the same contents are put into both lambdas for now.

OnTimer, like its cousin TDAHandler, is basically there to provide a buffer between AWS and AWS-independent code.  As with TDAHandler, it delegates the creation of internal state to a private TDACentralConfiguration object.  Note that the semantics of static creation in Java gives rise to a prospect of there being two such objects created; the simplest fix is to make sure that only one of these two packages is placed in any given lambda; alternatives include sharing or abstracting the central configuration singleton.  All such approaches are left as an exercise for the reader.

OnTimer needs to instantiate a handler class; again, there are many ways in which this can be done - I feel the most correct way would be to extract something from the incoming event, but for now I have just hardcoded the TimerHandler class being used in this example.

The handler code thus looks like this:
public TimerResponse handleRequest(TimerEvent ev, Context cx) {
  cx.getLogger().log(new Date() + ": timer invoked by " + cx.getAwsRequestId());
  TimerRequest userHandler = new TimerHandler();
  if (userHandler instanceof DesiresLogger) {
    ServerLogger logger = new AWSLambdaLogger(cx.getLogger());
    ((DesiresLogger)userHandler).provideLogger(logger);
  }
  if (userHandler instanceof WithCouchbase) {
    central.applyCouchbase((WithCouchbase) userHandler);
  }
  userHandler.onTimer();
  return new TimerResponse();
}
We can now do most of the work of sending an unsolicited message - everything except sending it, that is.   We create the hardcoded TimerHandler, letting it implement the TimerRequest interface and the WithCouchbase interface.  It can then read the members object, create a message to send and iterate over the members.
public void onTimer() {
  try {
    JsonDocument doc = this.bucket.get("groups/everybody");
    if (doc == null || doc.content() == null || !doc.content().containsKey("members"))
      return;
    JsonArray ja = doc.content().getArray("members");
    String msg = "Time is now " + new Date().toString();
    for (int i=0;i<ja.size();i++) {
       // need to send the message
    }
  } catch (Throwable t) {
    logger.log("couchbase error", t);
  }
}
So how do we send the message?  Well, that involves the same logic we provided in the implementation of WSResponder, just with a different value for connectionId - the id of the connection we want to send it to.

As it happens, that's just the thing we have in our hand - the element of the array is an abstract representation of the connection; in our case an AWS connectionId.

What we need though is an abstraction that isolates the hard work from the application logic.  Let's call it WSConnections.
public interface WSConnections {
  boolean sendTo(String connection, String data);
}
Pretty simple really. It has one method that takes the connectionId and the data to send. It returns a boolean which indicates if the connection was valid (true if valid, false otherwise).

Of course, it's not quite enough to have this interface - we need an instance of it provided to our handler. Following our pattern of injection, we declare our handler to implement the ProvideWSConnections interface which receives a concrete instance of this interface for our use:
public interface ProvideWSConnections {
  void wsConnections(WSConnections connections);
}
We can then implement this in our handler class:
public void wsConnections(WSConnections connections) {
  this.connections = connections;
}
And then use it in our handler. Again, life becomes a little bit tricky because of the potential for failure and Couchbase not directly supporting a "remove" operation, so most of this code goes into handling that bookkeeping:
JsonArray copyTo = JsonArray.ja();
for (int i=0;i<ja.size();i++) {
  if (connections.sendTo(ja.getString(i), msg))
    copyTo.add(ja.getString(i));
}
doc.content().put("members", copyTo);
bucket.replace(doc);
Strictly speaking, replacing the document is only necessary if we have made any changes. This is probably a change worth making in the real world if only because it would avoid contention on the document - especially likely if multiple writers notice around the same time that the list of members has changed.

AWS Implementation of Sender

Finally, we are ready to present the AWS implementation of WSConnections in TDACentralConfiguration.  Because this is basically the same code as the responder, we will start by refactoring that to extract a sendMessageTo method.
private boolean sendMessageTo(ServerLogger logger, String endpoint,
                              String connId, String text, String hint) {
  try {
    AmazonApiGatewayManagementApi wsapi =
      AmazonApiGatewayManagementApiClientBuilder
       .standard()
       .withEndpointConfiguration(
          new EndpointConfiguration(endpoint, System.getenv("AWS_REGION"))
       ).build();
    PostToConnectionRequest msg = new PostToConnectionRequest();
    msg.setConnectionId(connId);
    msg.setData(ByteBuffer.wrap(text.getBytes()));
    wsapi.postToConnection(msg);
    return true;
  } catch (GoneException ex) {
    return false;
  } catch (Throwable t) {
    logger.log(hint, t);
    return true;
  }
}
and the WSResponder caller now looks like:
public boolean send(String text) {
  return
    sendMessageTo(logger, domainName + "/" + stage, connId, text, "error responding");
}
We have all the information we need in the abstract connectionName string, formatted as endpoint:connectionId. We can extract this and pass it to our extracted function as follows:
public boolean sendTo(String connection, String data) {
  String[] split = connection.split(":");
  return 
    sendMessageTo(logger, split[0], split[1], data, "error sending to " + connection);
}
This is presented as a nested class within a function that is called to inject the connections object through ProvideWSConnections.

Conclusion

And that is that.  When clients connect, they are automatically bombarded - around once a minute - with a message reminding them of the time.  When they disconnect (or are found to have been disconnected), the code cleans up its memory of the connection.  While they remain connected, they receive the timer events as they crop up.

Back to the Table of Contents 

Handling $connect and $disconnect events

Go to the Table of Contents for the Java API Gateway

As we near our goal of being able to send unsolicited messages to a group of connected websocket listeners, we need to be able to figure out who is in our group.

To be in the group, you obviously have to have connected and not yet have disconnected.  You probably should also register an interest in the group, but we will leave that as an exercise for the reader (especially since it feels more application level).

Building on what we did in the last post, we are going to capture $connect and $disconnect events and store the current set of members in an array in an object in Couchbase.

This can all be found under the tag API_GATEWAY_CONNECTING.

Configuration

First and foremost, we need to update our configuration to add the $connect and $disconnect routes.

I'm not going to show all the JSON code for the CounterConnect and CounterDisconnect route definitions (which is basically the same as CounterRoute) but there are a couple of points I wanted to make:
  • It is important to add these new objects to the DependsOn field of the CounterDeployment
  • It appears to be necessary to tear down the entire gateway and recreate it
I'm not entirely sure on the latter point, but I tried everything I could think of and nothing worked, but tearing it down and starting again did work.  It's possible that there is a way to achieve the same results from the UI, but as far as I could tell everything was being constructed correctly, it just didn't "take".  Since I had had a similar problem in defining the $default route originally, I suspect that there is a problem updating the "built-in" routes on the API Gateway.  It may be connected with the issues around updating deployments with RestApis (you need to change them in some way more than just their dependencies, for instance the logical name) but it seems more insidious than that.

Handling the Routes

Through my setup, I have intentionally chosen to direct the connection routes to the same lambda and same handler that we have already set up.  This is very definitely a choice, but it is a choice driven by a desire to have minimal overlap of similar code (in the next section we will see the consequences of the other choice).

Before starting to handle these routes, I'm going to do a refactoring.  Websocket events have a different "structure" to HTTP events, and I'm going to use this to break up the handleIt code in ProcessorRequest into two functions: handleHTTPMethod will handle "regular" HTTP requests and handleWSEvent will handle websocket events.  I'm also going to add cases to cover not receiving a context (a 500) and the context not having any of the parameters I expect (a 400).

The rest of the changes we are going to make are isolated in handleWSEvent.

First, I have extracted the responder from the call to onText into its own variable, since we will be passing it to each of the three methods.  Then I have looked at the context's routeKey variable and decided whether the route is $connect, $default or $disconnect and called open, onText or close as appropriate.
WSResponder responder = central.responderFor(logger,
                                            (String) context.get("connectionId"),
                                            (String)context.get("domainName"),
                                            (String)context.get("stage"));
if ("$connect".equals(context.get("routeKey"))) {
  wsproc.open(responder);
} else if ("$default".equals(context.get("routeKey"))) {
  wsproc.onText(responder, body);
} else if ("$disconnect".equals(context.get("routeKey"))) {
  wsproc.close(responder);
}

Handler Code

On the handler side, we have quite a bit of complex logic to do.  I think in any serious application, this would be hidden in a separate data-management object, itself hidden behind an interface.  But because we don't have that whole application, I am just going to put the Couchbase code directly in the handler.

First off, we acquire a handle to the Couchbase bucket.  We do this by implementing the WithCouchbase interface we created in the last post and storing the handle passed into it.  This is probably the best moment to make the point that, for professional purposes, an asynchronous handle to the bucket is to be preferred to using the synchronous Couchbase client, but that makes the code here significantly more cluttered; if you are using Couchbase professionally, you will know what to do.  If you are not, then do not take this as an example of what to do.

The logic is quite simple but contains two cases: whether the object already exists or not.  If the group does not exist, we create a new object with a members array containing the current connection id; if the group does exist, we add the current connection to the existing array and replace the object.

For simplicity, I have also chosen to ignore error handling, instead just logging the exception.  Note that because of concurrency, it is entirely possible for two lambdas to come through here at the same time: if they do, the insert or replace could fail with a write conflict.  In the real world this exception would need to be caught and retried.
public void open(WSResponder responder) {
  try {
    JsonDocument doc = bucket.get("groups/everybody");
    JsonArray members;
    if (doc == null) {
      members = JsonArray.ja();
      members.add(responder.connectionName());
      bucket.insert(JsonDocument.create("groups/everybody",
                                        JsonObject.jo().put("members", members)));
    } else {
      members = doc.content().getArray("members");
      members.add(responder.connectionName());
      bucket.replace(doc);
    }
  } catch (Throwable t) {
    logger.log("couchbase error", t);
  }
}
To handle the logging of the error, I added a new method to ServerLogger to log a message along with an exception.

More importantly, we need some storable representation of the connection name.  We delegate this to the WSResponder interface, requiring it to return us a string value that we can store.  While we have the connectionId as a string already, that is not quite enough: when we want to send a message to the connection, we also need to know the endpoint name (the domain and the stage) and the region.  We can "always" (unless you are going to deploy this application across multiple regions) obtain the region from the environment, so we will just capture the endpoint; we separate it from the connectionId by a colon, since we believe this is not a valid character in either token.
public String connectionName() {
  return domainName + "/" + stage + ":" + connId;
}
The logic for close() is very similar, except that we don't need to consider the case that the object has not been created but we do have some complex juggling because Couchbase does not have a method for removing an object from an array.
public void close(WSResponder responder) {
  try {
    JsonDocument doc = bucket.get("groups/everybody");
    JsonArray ja = doc.content().getArray("members");
    JsonArray copyTo = JsonArray.ja();
    String conn = responder.connectionName();
    for (int i=0;i<ja.size();i++) {
      if (!conn.equals(ja.getString(i)))
        copyTo.add(ja.getString(i));
    }
    doc.content().put("members", copyTo);
    bucket.replace(doc);
  } catch (Throwable t) {
    logger.log("couchbase error", t);
  }
}

Conclusion

We have configured our API Gateway to be able to handle the $connect and $disconnnect routes and, in doing so, keep track of a list of everybody currently connected in Couchbase.

Next: Finally Sending Unsolicited Messages

Integrating Couchbase

Go to the Table of Contents for the Java API Gateway

At this point, most authors use DynamoDB, for the very simple reason that it's easy, provisioned and you don't really need to think about it.

For various reasons, I want to use Couchbase, so I'm going to do that here and just apologize if that makes your life more difficult.  By all means, follow along and apply the same "principles" but using DynamoDB; it's certainly possible.

The code for this is in the repository tagged API_GATEWAY_COUCHBASE.

Getting started

The overall arc that we are going to follow is to have an entry in Couchbase that holds an array of connectionIds.  We want to add to that on $connect and remove on $disconnect and be able to iterate that list at will.  All of that is too much to bite off in one post, however, so we will start by just getting Couchbase integrated into our Lambda so that we could, theoretically, do something with it.

It all starts with libraries, so we need to download the Couchbase client library and its dependencies in package.sh.

We then want to make sure we have a connection to a bucket established in the lambda.  To do this, we need to provide some parameters: a cluster endpoint, a bucket name, a username and a password.  These are going to be passed to our creation and update scripts as environment variables which are then turned into parameters for the JSON.
aws cloudformation create-stack \
  --stack-name 'ignorant-gateway' \
  --capabilities CAPABILITY_IAM \
  --parameters \
    "ParameterKey=BUCKET,ParameterValue=$BUCKET" \
    "ParameterKey=COUCHBASE,ParameterValue=$COUCHBASE" \
    "ParameterKey=CBUSER,ParameterValue=$CBUSER" \
    "ParameterKey=CBPASSWD,ParameterValue=$CBPASSWD" \
    "ParameterKey=CBBUCKET,ParameterValue=$CBBUCKET" \
  --template-body "`cat src/main/resources/gateway-cf.json`"
Because we now have so many arguments, I have broken the one command out onto multiple lines.

The parameters are handled near the top of the configuration JSON:
"Parameters": {
  "BUCKET": { "Type": "String" },
  "COUCHBASE": { "Type": "String" },
  "CBUSER": { "Type": "String" },
  "CBPASSWD": { "Type": "String" },
  "CBBUCKET": { "Type": "String" }
}
And are then passed in to the lambda as environment variables again
  "COUCHBASE": { "Ref": "COUCHBASE" },
  "CBUSER": { "Ref": "CBUSER" },
  "CBPASSWD": { "Ref": "CBPASSWD" },
  "CBBUCKET": { "Ref": "CBBUCKET" }
Now we can go and write some code.

The initial connection obviously wants to be in our central repository of things for the lambda, so that it is covered under the same "static initialization" policy we came up with before.  At the same time, we don't want too much conflation of concerns, so we will delegate to its own class:
private CouchbaseEnvironment couchbase = new CouchbaseEnvironment();
Ignoring all the boilerplate, this basically comes down to a constructor which opens the Couchbase bucket identified in the parameters:
public CouchbaseEnvironment() {
  Builder builder = new DefaultCouchbaseEnvironment.Builder();
  builder.connectTimeout(10000);
  builder.kvTimeout(10000);
  builder.retryStrategy(BestEffortRetryStrategy.INSTANCE);
  DefaultCouchbaseEnvironment env = builder.build();

  String server = System.getenv("COUCHBASE");
  CouchbaseCluster cluster = CouchbaseCluster.fromConnectionString(env, server);
  String cbuser = System.getenv("CB_USER");
  String cbpasswd = System.getenv("CB_PASSWD");
  PasswordAuthenticator authenticator = new PasswordAuthenticator(cbuser, cbpasswd);
  cluster.authenticate(authenticator);
  String testBucket = System.getenv("CB_BUCKET");
  bucket = cluster.openBucket(testBucket, 15, TimeUnit.SECONDS);
}
Obviously, we also need code to provide for injecting this into handlers.  Other than to say that there is an interface WithCouchbase which handlers can implement, I'm just going to put that code in the repository for you to find.

Conclusion

This post has not really advanced "the state of the world".  It is just a technical post that allows us to have a bucket to hand to record things in.

Now we need to capture the connectionIds on connect and store them in Couchbase for later use.

Next: Handling connections and disconnections

Tuesday, December 17, 2019

Handling websocket responses with API Gateway

Go to the Table of Contents for the Java API Gateway

Apologies if that previous post seemed to end abruptly.  The simple reason is that I realized I had painted myself into something of a corner.  In every other environment I have used, web socket handlers are long-lived, leading to stickiness on the server.  That is, the websocket handler goes through a lifecycle of being created, receiving messages and being closed.  All on the same in-memory object.

For all I know, API Gateway takes the same approach internally.  But the separation of concerns, by which it delegates processing to a lambda, means that even if the actual socket connection is sticky, the lambda which is invoked quite possibly has no recollection of every having dealt with this connection before.

I was hoping that it would be simple enough to "respond" to the "calling" websocket and then defer the broader issues of connections to this post, but (reasonably enough) AWS takes the attitude that all websockets are equal and all must be accessed through the same mechanism.

In a moment, we'll look at that mechanism, but what stopped me in my tracks was a broken abstraction: in the WSProcessor abstraction presented in the previous post, the open method is handed a WSResponder and expected to hold onto it.  We simply can't do that here, because we don't know that an object in a given lambda will see the operations in the correct order.

So it's time for a new abstraction.  For now, we don't need to do anything too dramatic, although when we (finally) get around to considering other connections (as well as data storage), we will need to update the abstraction again.

For now, the code can be found in the repository tagged API_GATEWAY_RESPOND.

The Updated Abstraction

In the WSProcessor class, we have methods for handling the opening, closing and error conditions on websockets as well as processing messages.  In the original abstraction, only the open method is provided with the WSResponder object.  We need to change that so that the message processing method onText is provided with it every time as well.  Remember that WSResponder is just an interface: it doesn't give away how it's implemented; it just sends messages back to where they came from.  At the moment, neither error nor close need this handle: they cannot actually respond.

This makes it easy to finish our implementation of onText in CounterSocket:
public void onText(WSResponder responder, String text) {
  responder.send("Length: " + text.length());
}
But this obviously won't work until we have completed the behind-the-scenes implementation.

Posting to Connections in API Gateway

The key to writing to connection in API Gateway is to use the AmazonApiGatewayManagementApi interface in their client library.  This has a method postToConnection which enables you to send a message to an arbitrary connection by id.

The skeleton code for this seems quite easy.  We just "create" one of these somewhere and then call it with an appropriate object.  There is a small complexity in that the post method wants a byte buffer, but briefly ignoring character encodings, we can make that happen.
AmazonApiGatewayManagementApi wsapi = ...
PostToConnectionRequest msg = new PostToConnectionRequest();
msg.setConnectionId(connId);
msg.setData(ByteBuffer.wrap(text.getBytes()));
wsapi.postToConnection(msg);
The problems start to come when you try and create the ApiGatewayManagementApi object.  First off, you need to include a jar with it in (at time of writing, aws-java-sdk-apigatewaymanagementapi-1.11.688.jar).  This also has a string of dependencies (which are automatically resolved if you are using maven or gradle, but make quite a difference in my hand-rolled package.sh).  It needs to know how to reach the API Gateway, which requires the current region, the domain name and the stage name:
AmazonApiGatewayManagementApi wsapi =
  AmazonApiGatewayManagementApiClientBuilder.standard()
    .withEndpointConfiguration(
       new EndpointConfiguration(domainName + "/" + stage, System.getenv("AWS_REGION"))
    )
    .build();
Where can we get these from?  The region (as shown) comes from an environment variable.  The domain name and stage name are included in the request context, so we can provide these to the function from the calling site.
public WSResponder responderFor(ServerLogger logger, String connId,
                                String domainName, String stage) {
  return new WSResponder() {
    ...
  }
}
The calling site is in ProcessorRequest, well away from user code.
wsproc.onText(
  central.responderFor(logger,
                       (String)context.get("connectionId"),
                       (String)context.get("domainName"),
                       (String)context.get("stage")
  ),
  body
);
Unfortunately, that is not everything that you need.  If you run the code in this state, it fails.  It actually took me quite a long time to figure out why, in part because (as previously noted), the debug loop when you are depending on lambdas is quite long.

One problem I had was I had failed to understand how to correctly configure the Gateway Management Api (the code shown above is correct, but there are a number of other ways of trying to configure it which are wrong).

But more importantly, I kept getting "OutOfMemory" errors.  These are very unusual in normal Java development, but AWS Lambda has a default memory limit of 128MB.  Amazingly, 128MB is not enough to load in all the libraries we now need.  As I upped the limit, I found (in my environment) that 150MB or so was required.  I gave it 256MB and it still didn't work.  It kept timing out.  I increased the timeout to 25s and it returned (on cold start) in about 15s, and about 3s thereafter.  While that probably counts as "working", I wasn't pleased.

It turns out if you increase the memory limit more, you also get a bigger slice of all the other pies: specifically CPU time.  I found at around 1GB, I was getting performance I considered acceptable.  Your mileage may vary.

Conclusion

I was surprised by how very hard it was just to respond to the client calling you.  I can't help feeling I'm missing something - especially since every message returns a response to the gateway and whenever the handler throws an exception the gateway automatically sends a message back to the client.  But it certainly doesn't seem to directly couple your response to the gateway response.

There is a further wrinkle which is that I have used lambda proxies everywhere because that seemed to be the best fit when I was working with REST APIs (in particular, being able to obtain headers).  There is a lot of information in the documentation about building response routes and response integrations but that only applies if you are using non-proxy integrations.  I did not - but possibly should - take the time to investigate that.

I was also surprised by the complexity and slowness of the overall mechanism of writing to clients in general.  It concerns me that, if it takes 2-3s just to respond to one client, it is going to be unscalable when we try writing to hundreds of clients.

Next: Integrating Couchbase to Store Connected Clients

Handling WebSocket traffic

Go to the Table of Contents for the Java API Gateway

Having rejected RPC as a technology back in the 1980s, the whole request/response nature of HTTP and the web seemed a backward step to me when I first started doing Web Development in the 1990s.  Consequently, I've been a keen enthusiast of WebSockets as they have slowly permeated the web.

One of my reasons for not investigating API Gateway before now was its lack of support for WebSockets.  Part of my reason for pursuing it now is that the support is there (admittedly, it has been for a year, but I've been busy).

In this post, we are going to extend the work of the last post to support a WebSocket client.  Like most such examples to be found on the web, it is going to be trivial; but for variety not a chat room.  Instead, you can send lines of text (one at a time) to the server and it will send back a character count.  Trivial logic, complex plumbing.

All of this code is in the repository, tagged API_GATEWAY_WEBSOCKET.

First, a Client

One of the things about using WebSockets is that fairly quickly curl stops working for almost anything you want to do.  wscurl and wscat exist, but I've had a lot of issues getting them to do what I want to do.

On the other hand, writing websocket clients with grizzly is "fairly" trivial, particularly when so much of the code is boilerplate.  Consequently, I've written a quick-and-dirty client that sends and receives messages as pure text without any processing and placed it in the blog.ignorance.wsclient directory.

Now AWS

There are a number of online resources that I've followed in understanding websockets on AWS.
  • This blog post seems to be the top hit that I always come back to and gives a basic outline of the approach but requires manual setup.
  • This announcement says that CloudFormation now supports configuration of API Gateway V2 (with websockets) but doesn't give any direct links.
  • It would seem that to support this, CloudFormation added a completely new gateway resource type, helpfully called APIGatewayV2.
  • There is a developer guide for Websocket APIs.
Quite apart from the fact that a new resource type is involved, it is necessary to choose between REST and WebSocket APIs when creating a gateway.  This presumably means that you cannot have both on the same server with different paths, but I haven't fully established that yet.  In spite of this, I have configured the new gateway to direct traffic to the existing lambda.

Once again, we are required to create a number of resources:
  • A new (V2) gateway
  • An integration between this and our existing lambda
  • A "route" that says that we want to send all our messages to this lambda as they are
  • A "deployment" and a "stage" to make it exist in the outside world
With all this work done, we can recreate our gateway configuration and try running the scripts/wscli.sh script.

As an aside, for this operation, I created an updateGateway.sh script, which avoids tearing down and rebuilding all the components continually.  You may use this if you wish, but from time to time there appear to be issues with "updating" things in CloudFormation, at which point you may have to go back and drop and recreate everything anyway.

If you have tried running the script, it will probably work ... but we have yet to implement the code in the lambda.

I'm not sure how much is V1 vs V2, but obviously what comes across in the request is significantly different for WebSockets than it is for REST API calls - for a start, there is no path parameter.  Instead, we need to dig into the "request context".

This is a map which is passed across in the integration proxy JSON (for both V1 and V2), just like query parameters or headers.  Unlike those, it seems the values can plausibly be non-String values, such as nested maps.  In the V2 request context for websockets, there is a field called "eventType" which appears as "MESSAGE", a "routeKey" which contains the route key determined by the gateway and a "connectionId" which is an abstract string describing each connection uniquely.

To avoid the kind of duplication we dealt with in the API integration before, I've opted to configure the gateway to pass everything as "$default" - the default route.  It would not be unreasonable to have a number of "large scale" lambdas, each dedicated to handling a significant part of the logic of the websocket flow.  As always, what you decide on depends on the constraints of the system that you are building.

Either way, I claim that the knowledge of the route is not interesting: either all the routes are "$default" or else there is one route per lambda: whatever the value, we want to take the same action (I am deliberately ignoring $connect and $disconnect events here, along with the connectionId, but will return to them in the next post).

So we can add to our Initialization class a simple request that we want to have a handler which processes all the WebSocket messages, and have our "implementation detail" handler observe if the eventType is MESSAGE and if so, invoke this handler instead.
public void initialize(Central central) {
  ...
  central.websocket(() -> new CounterSocket());
}
public void handleIt(TDACentralConfiguration central, ServerLogger logger,
                     Responder response, Context cx) throws Exception {
  if (context != null && "MESSAGE".equals(context.get("eventType"))) {
    WSProcessor wsproc = central.websocketHandler();
    if (wsproc instanceof DesiresLogger) {
      ((DesiresLogger)wsproc).provideLogger(logger);
    }
    wsproc.onText(body);
    return;
  }
  ...
}
The CounterSocket class is then an implementation of WSProcessor which, for now, just simply logs out what it would like to do.  It turns out that responding is trickier than I had imagined and involves tangling with that connectionId.  So time to cut our losses and move on to the next post which deals with that.

Conclusion

In this post, we have managed to set up a second gateway, which is using the AWS V2 model, and which can handle websockets.  We have successfully managed to configure our lambda to receive and log the incoming messages, but we can't respond yet because we need to first deal with connections.

Next: Responding to Websockets

Friday, December 13, 2019

Integrating with a Real Server

Go to the Table of Contents for the Java API Gateway

Now things start to get interesting.  We have completed our "characterization test" phase of AWS and we have figured out how to build a gateway, deploy some Java into it and handle the request and response objects.  Now we want to connect this to a system designed to do real work.

The eagle-eyed among you will have noticed that when I created more resources and methods during the input blog post, I connected them to the same lambda I had been working with.  This is very definitely a choice - the "integration" part of each APIGateway Method definition allows you to choose which lambda to proxy to - but it was a choice I intentionally made.  All the information we need is there to handle our own routing within the lambda.

How much to put into a single lambda - and when to start creating new lambdas - is a thorny topic and not one I am going to get into here.  Suffice it to say that general software engineering best practices should apply and if there is significant code duplication, put it all in one lambda.  On the other hand, the duplication only needs to be in the packaged product, which is just storage and configuration, so you may find going towards more lambdas is a good idea.  What I would definitely advise against is literally duplicating code to make multiple lambdas.  Code duplication is almost always a bad idea.

All of this code is in the repository, tagged API_GATEWAY_TDA.

A Tell-Don't-Ask Server

As trailed in the last post, I'm going to be working towards integrating a tell-don't-ask HTTP server into API Gateway here.  Again, for full disclosure, I already have this working within a Grizzly setup.  I am going to copy-and-paste some of that code into the blog area to make it clear what is going on, and then try and rework the handler, request and response to fit with that.  You will also notice that I have changed the API Gateway configuration to point to a new resource, so if you are following along and haven't already, you will need to drop and recreate your gateway configuration.

What is a tell-don't-ask server?  Simply put, it views an HTTP request as a pipeline: that is, the data from the request is fed into the handler which then places its output (status, data, etc) in a response object, which in turn feeds it back to a client.

To be clear, no public HTTP processing framework I am aware of works like this: most pass a handler both a request object (from which it is supposed to pull data) and a response object to write to.  The AWS API we are working with here expects the handler to pull from a request object and then create a response object.  While I'm not going to get into the philosophy of why I prefer functional and pipeline approaches here, suffice it to say that there are reasons of modularity, composability and testability which I find compelling.

The intent is that the server is configured to map input paths to "handler generators": that is, classes which act as factories for handlers, creating a new object for each incoming request and attaching any global configuration (such as access to databases).  The server then processes the incoming request and "tells" the handlers the data that it wants to know to do its job.  Once the handler has been created and configured, it is given a response object and told to get on with it.  When complete, the response can be processed by the server. 

What is being Requested, anyway?

One of the things we did not look at in the last post when considering input data was how to find out what the actual resource being requested was.  It is possible to determine this by implementing additional setters on the Request object.  The documentation says that httpMethod, resource and path are the ones to look at.  For our purposes, the overlap between path and resource means that path is probably not going to be particularly useful except for debugging and errors.

For this post, I've essentially thrown away what I've done up to this point and started again.  Specifically, I've created a set of new packages to hold the "client" code, the necessary interfaces and an implementation server.

The code that we have been working with over the past posts becomes "implementation detail" in this version and thus is buried inside the server.  At the same time, I have taken the opportunity to add setters for these additional data items.

Initialization in Lambda

As far as I can tell from reading the AWS documentation, there is no specific mechanism for starting up or "bootstrapping" a lambda.  They are clear that by and large lambdas can be reused, but the first time through you have to hope that everything is initialized.

The one guarantee that we have is that the API Gateway is going to call our Handler method before anything else we care about, and we are guaranteed by Java semantics that before an object is created all of the static members will be initialized and any static constructor will have been called.

Like most software engineers, I don't like static variables and I don't like this way of approaching problems.  On the other hand, it is possible to use a static member to declare that there is a singleton configuration object which needs to be created.

So to make the initialization work, I have defined an Initialization class and created a single static instance of it in the top-level handler.  For testing purposes (not that I am seriously testing this version of this code) it is possible to create the Initialization class independently of this static variable and I will treat the Handler class as I would a main method: it does the minimal amount of wiring up which can be "tested" in production and otherwise depend on thorough testing of all its components.

This Initialization class is again an "implementation detail" and is mainly responsible for making sure that a central configuration repository is initialized, accessible to all handlers, and then calling a user-provided configuration class (passing it the repository instance).  The name of this class is determined from an environment variable passed into the lambda definition.

The Processing Flow

The processing of a request begins in the "implementation detail" handler.  It is provided with a ProcessorRequest object which has been populated by the AWS server.  But our handler, being tell-don't-ask, rejects the opportunity to extract data from the request and instead creates a response object and asks the request to get on with the job of processing the request and passing any output to the response.  When complete, it returns the response to AWS.

The ProcessorRequest object first talks back to the central repository and asks it whether it can create a handler for the given combination of method and resource.  If it cannot, it populates the response with a 404 and gives up.

Otherwise, it looks at the capabilities of the defined handler using reflection and populates it accordingly: if it wants headers, it gives it those; if it wants path parameters, it gives it those; if it wants query or form parameters, it gives it those; and if it wants the body, it gives it the body.

Finally, when the handler is fully configured, the obligatory process method is called, providing the response object, and the handler is expected to "do its thing" and complete the response object.  Ultimately, the userspace handler completes, so does the handler method in the ProcessorRequest object, and finally the AWS handler returns the response.

What about the duplication?

Apologies to those of you who have been following along and asking this question.   Clearly, there is duplication between the paths that are set up in API Gateway and the path matching that is being done here.  The question is how to avoid it.

If you follow the "pure" API Gateway approach, it is possible to eliminate the duplication by associating each of the methods with its own endpoint in its own lambda, and could then remove the initialization and pattern matching here.  The problem with that (from my perspective) is you are switching a small amount of mapping duplication for a huge amount of infrastructural duplication.

The other possibility is to deal with the duplication by creating one of the copies from the other.  The duplication will still technically "exist" but it will no longer be your problem.

My solution in real life is to extract all of the mapping setup into external (JSON) files which are then read to configure the server.  These same files can then be used at deployment time to generate the appropriate configurations in API Gateway.  Doing this is left as an exercise to the reader.

Abstractions and Testing

What may or may not be clear is that I have taken the same (actually slightly simplified) abstraction as I was already using for Grizzly and redeployed it under API Gateway.  That means, that with minimal effort, I can continue to run "local" end-to-end tests inside a Grizzly server on actual hardware, and only deploy to API Gateway for "staging" and "production" purposes.

The whole point, of course, of using these interfaces is to isolate my development cycle from the underlying technologies and so unit testing of handlers in this form becomes a breeze (although you do need to be careful that your tests populate the handlers in the same way that the configuration does).

Conclusion

Hopefully that was not too breathless a tour of integrating a separate method dispatcher into API Gateway.  In principle, I don't see any reason why the same technique would not work for commercial libraries such as Spring or Struts.

For me, the key conclusion is that it is possible to deploy an API Gateway with minimal or no changes to the underlying logic, just a separate adapter/wrapper to the web technology.  This gives me the freedom to develop and test my code in units and deploy to either grizzly containers or to AWS API Gateway.

Next: Logging to Cloudwatch

Logging to Cloudwatch from API Gateway

Go to the Table of Contents for the Java API Gateway

Logging is an important means of being able to determine what is going on in any system.  This is particularly true for otherwise opaque systems in the cloud.

API Gateway offers the ability to log through Cloudwatch by providing the handler with a Context object containing a logging service in its getLogger property.  We can obtain this in our handler, wrap it in a "generic" interface (to log a string) and then pass that to any userspace handlers that want it.

These changes are tagged AWS_GATEWAY_LOGGING in the repository.

So, in TDAHandler we add this constructor:
  ServerLogger logger = new AWSLambdaLogger(cx.getLogger());
The AWSLambdaLogger is define as:
public class AWSLambdaLogger implements ServerLogger {
  private final LambdaLogger logger;

  public AWSLambdaLogger(LambdaLogger logger) {
    this.logger = logger;
  }

  @Override
  public void log(String string) {
    logger.log(string);
  }
}
Finally, in ProcessorRequest we pass it off to anybody who asks for it by implementing the DesiresLogger interface:
  if (handler instanceof DesiresLogger) {
    ((DesiresLogger)handler).provideLogger(logger);
  }
It would, of course, be possible to integrate this with frameworks such as SLF4J by providing a suitable implementation of the concrete classes.  This is left as an exercise to the reader (but look at aws-lambda-java-log4j).

If you don't have access to "the context" you can still log.  You just need to obtain the logger from a static method on LambdaRuntime:

LambdaLogger logger = LambdaRuntime.getLogger();
One thing to note (if you are unused to Cloudwatch Logging): it often takes a while for logging to work its way through Amazon's cloud.  This can be very distracting, particularly since a lot of issues never seem to get logged at all.  How do you determine if a log failed to get through, or just hasn't come through yet?  And having to wait a minute or two (sometimes more) for feedback on what just went wrong can kill productivity.  Sadly, I don't have a solution for  this other than to "test" as little as possible on AWS and to always assume that every time you push everything except the environment is perfect.  And then to change the environment as little as possible, and always in small steps so that it is always "green" to "green" and any time you do see a failure, you know what the problem "must be".

Next: Getting Started with Websockets

Thursday, December 12, 2019

Handling Input in a Lambda Proxy

Go to the Table of Contents for the Java API Gateway

If we don't want to greet the entire world, but one individual by name, then we will need to obtain information about that person and their name.  There are many ways to do this, and this section will look at some of them in an attempt to develop a more complete Lambda Integration before we go full-bore with trying to actually integrate an existing Java server.

All of the code here is checked in together and can be found at the tag API_GATEWAY_HELLO_INPUT.

Reading from a query parameter

According to the AWS documentation, the query parameters are available as a JSON object in the lambda proxy input request in a field called queryStringParameters. There is also a multiValueQueryStringParameters which allows multiple copies of the same parameter to be specified, but let's start small.

In order to obtain this, we need to create a setter on our IgnorantRequest POJO.  I am guessing that in the Java binding, it will happily translate a JSON object into a Java Map.  Thus I can write this code:
public class IgnorantRequest {
 private Map<String, String> values = new HashMap<>();

 public void setQueryStringParameters(Map<String, String> values) {
  if (values != null)
   this.values = values;
 }
}
In my experiments, I found that with no parameters, AWS could choose to pass null in to this function, which could cause an exception.  On the other hand, it does seem to always call the function.  But since I can't be sure, I wrote this code in the most defensive way possible: values will always have a valid map regardless of how it is invoked.

For full disclosure (for those that don't know me), I really don't like the usual Java-style POJOs with setters and getters (to be precise, it's the getters that get me) and prefer a "tell-don't-ask" style of programming.  We'll see what this looks like when we integrate my "tell-don't-ask-server" in a later post, but for now I'm just going to grit my teeth and add a getter to this class.  To make myself a little happier (and a little less primitive-obsessed), I'm not going to let you ask for all of the parameters: you have to know which one you want.  I'm also going to let you ask first if we have that one.
public class IgnorantRequest {
 private Map<String, String> values = new HashMap<>();

 public void setQueryStringParameters(Map<String, String> values) {
  if (values != null)
   this.values = values;
 }
 
 public boolean hasQueryParameter(String p) {
  return values.containsKey(p);
 }
 
 public String queryParameter(String p) {
  return values.get(p);
 }
}
It's now quite easy to update both the main function and the response to handle the fact that we may have a query parameter (or may not) and that we want to use it in the greeting if we do, and carry on with the old "hello, world" behavior if not.
public class Handler implements RequestHandler<IgnorantRequest, IgnorantResponse> {
 public IgnorantResponse handleRequest(IgnorantRequest arg0, Context arg1) {
  if (arg0.hasQueryParameter("name"))
   return new IgnorantResponse(arg0.queryParameter("name"));
  else
   return new IgnorantResponse("world");
 }
}
public class IgnorantResponse {
 private final String helloTo;

 public IgnorantResponse(String helloTo) {
  this.helloTo = helloTo;
 }

 public String getBody() {
  return "hello, " + helloTo;
 }
}
And you can package that up and try to curl it again with and without a parameter:
$ scripts/package.sh
$ curl https://tovogqsfoj.execute-api.us-east-1.amazonaws.com/ignorance/hello
hello, world
$ curl https://tovogqsfoj.execute-api.us-east-1.amazonaws.com/ignorance/hello?name=Fred
hello, Fred

Reading from an HTTP header

HTTP headers are a standard way of passing information between clients and servers.  The headers are passed to a lambda proxy through the headers field of the JSON object, which can again be interpreted as a Map in Java.

There is something of a wrinkle here, which possibly applies to the query parameters as well, but simply didn't come up, that often HTTP headers are thought of in upper case or mixed case but can be any case and curl in particular shifts them to lower case.

To handle this, we define a case-insensitive TreeMap to hold the header values and then put all of the headers passed across into that.  Note that I don't see how it is possible for the headers array to be null, but I've defended against it anyway.
public class IgnorantRequest {
  private Map<String, String> headers = new TreeMap<>(String.CASE_INSENSITIVE_ORDER);

  public void setHeaders(Map<String, String> headers) {
    if (headers != null)
      this.headers.putAll(headers);
  }
 
  public boolean hasHeader(String hdr) {
    return headers != null && headers.containsKey(hdr);
  }

  public String getHeader(String hdr) {
    return headers.get(hdr);
  }
}
(Note that I have omitted code already shown for brevity.)

The updates to the main code are very similar to those already shown for the query parameter case.
$ scripts/package.sh
$ curl -HX-USER-NAME:Fred https://tovogqsfoj.execute-api.us-east-1.amazonaws.com/ignorance/hello
hello, Fred

Reading from a path parameter

In order to move on, we need new resource methods.  I'm going to add two at the same time to the gateway configuration: a POST method on the existing hello resource to handle reading from the body (see next section) and another GET method on a new hello/{who} resource that will enable us to read from the who path parameter to find out who we should be greeting.  If you are following along and have already deployed the appropriate version of the gateway, you'll be fine.  If you have checked out the code but not dropped and recreated the gateway, you will need to do that before you can continue.

Path parameters come across in a map called pathParameters.  This is starting to look easy (and repetitive).  A little bit of cutten-and-pasten (making sure not to make any stupid duplication mistakes) and we can try again:
$ scripts/package.sh
$ curl https://tovogqsfoj.execute-api.us-east-1.amazonaws.com/ignorance/hello/George
hello, George

Reading from the body

The body can also be passed in for a POST request, so it is important to add that method (done above) and then the body should be available through the body parameter.  The documentation describes it as a "JSON string" but we have to assume that will be translated into a Java String for our purposes (although note that binary bodies apparently come across as Byte64 encoded strings).

I am not going to try and parse the body or do anything fancy.  I just assume that if there is a non-empty body, it is the name to be greeted.  I store the body (if any) and provide code to test the body exists and return it much the same as for the other cases.  The handler is updated to test this in turn.

$ scripts/package.sh
$ curl --data Henry https://vuhsa5vvlj.execute-api.us-east-1.amazonaws.com/ignorance/hello
hello, Henry

Ordering

You may be wondering at this point what happens if you specify multiple names to greet.  The answer, of course, is that it depends on the logic in the handler.  What I chose to do was to test each of the cases we have considered in turn and return the first one that matches; if none of them match, we continue to greet the world.

This is not the only possible choice, but it is simple and, in any case, it is not our greeting strategy that is under review here: it is whether we can integrate with AWS.

Conclusion

We have experimented with four different ways of handling input from a client (query parameters, path parameters, headers and body) and used these to customize our greeting response.

Next: Integrating our TDA Server