The next step in the process is to enable the sending of updates from the cloud to our webapp.
This is essentially very simple, but as with everything else, we need to put in significant effort to wire it up.
But there is a very cheap way of sending an update to the browser, which is to use the apigatewaymanagementapi subcommand of the aws command line tool. This is always a little weird to me because the mental model I have of a websocket is analogous to a socket, and there is a lambda on the far end. This is not true, of course.
The websocket is owned by APIGateway, and so anybody that wants to send anything down the socket needs to contact APIGateway and send a request. As we saw last time, for this to work they need two things:
- connectivity to the APIGateway;
- the "execute-api:ManageConnections" permission on the appropriate resource.
AWS_PROFILE=ziniki-admin aws apigatewaymanagementapi post-to-connection --data '"Hello from API Gateway!"' --endpoint-url https://n2n2psybtd.execute-api.us-east-1.amazonaws.com/development --connection-id "O_nmJfJWoAMCE9w="And if I look in the console of my browser, I see a message like this:
MessageEvent {isTrusted: true, data: '"Hello from API Gateway!"', origin: 'wss://n2n2psybtd.execute-api.us-east-1.amazonaws.com', lastEventId: '', source: null, …}I know what you're thinking: how did that work and where did that connection-id come from? (Actually, you're probably only wondering one of those).
The connection-id is a key which tells APIGateway which of the many websockets it's currently holding onto to send the message to. Each websocket is automatically given this identifier with every incoming request. You can use it to respond as we did in the last episode, or you can store it to send asynchronous messages (as we'll do in the next episode). For simplicity, I sent it over with the data in the response from the lambda, and just pulled it out of there to put it in the command line now.
Doing this Programmatically
I just sent a trivial message here, which was displayed, but did not have the appropriate action. With a little bit of thought better could be done from the command line. But we are going to skip that step and move straight onto doing it with another API Gateway and another lambda. The idea here is that we can send new prices for one or more stocks from the command line (using curl) and then they will be automatically updated and the connections notified. For now, we are going to provide the connection ids; the final step (next time) will be to use neptune and dynamo to coordinate all this.For some reason, when using APIGateway, you have to configure either a websocket gateway or an http gateway, so we can't reuse our existing gateway but need to declare a new one. We could reuse the same lambda, but that doesn't seem like a good choice to me right now, so we are going to declare another one of those as well. Obviously (since it will need access to Neptune) it needs to be in the same VPC. And since it will want to send messages to the websockets, we need the IPv6 "dualstack" property. Even though the new API Gateway won't be handling websockets, it seems to make sense to make that IPv6 compatible as well.
So we add this to our infrastructure.dply file:
Now we "duplicate" everything for the flow to publish new prices. Things are
not quite the same, but close.
lambda.function "publish-lambda" => publish_lambda
@teardown delete
Runtime <- "go"
Code <= aws.S3.Location
Bucket <- "ignorance-bucket.blogger.com"
Key <- "lambda-publish.zip"
Role <= aws.IAM.WithRole "ignorance-lambda-role"
assume
allow "sts:AssumeRole"
principal "Service" "lambda.amazonaws.com"
policy
allow aws.action.ec2.CreateNetworkInterface "*"
allow aws.action.ec2.DescribeNetworkInterfaces "*"
allow aws.action.ec2.DeleteNetworkInterface "*"
allow "logs:CreateLogGroup" "*"
allow "logs:CreateLogStream" "*"
allow "logs:PutLogEvents" "*"
allow "execute-api:ManageConnections" "arn:aws:execute-api:*:*:*/development/*/@connections/*"
PublishVersion <- true
Alias <- "next"
VpcConfig <= aws.VPC.Config
DualStack <- true
Subnets <- vpc->subnets
SecurityGroups <- vpc->securityGroups
We want to work through the alias, which was just created, so find that specifically
find aws.Lambda.Alias "next" => publishAlias
FunctionName <- "publish-lambda"
Now we need to define a websocket API Gateway to access this lambda:
api.gatewayV2 "stock-publish"
@teardown delete
Protocol <- "http"
IpAddressType <- "dualstack"
integration "lambda"
Type <- "AWS_PROXY"
Uri <- publishAlias->arn
PayloadFormatVersion <- "2.0"
route "$default" "lambda"
stage "development"
NEPTUNE_PUBLISH:neptune/dply/infrastructure.dply
This is basically the same as for the watch lambda except we are configuring an HTTP gateway rather than a websocket one.So we just need to implement a lambda which does the publication for us. This is similar in a lot of ways to the watch lambda, but it has a different prototype for the handle function because it's an HTTP handler, not a websocket handler.
Here's the prologue ceremony:
package main
import (
"context"
"encoding/base64"
"fmt"
"log"
"net/url"
"strconv"
"strings"
"github.com/aws/aws-lambda-go/events"
"github.com/aws/aws-lambda-go/lambda"
"github.com/gmmapowell/ignorance/neptune/internal/client"
)
NEPTUNE_PUBLISH:neptune/lambda/publish/main.go
Starting at the end, main() identifies the actual handler function:func main() {
lambda.Start(handleRequest)
}
NEPTUNE_PUBLISH:neptune/lambda/publish/main.go
So now we can move on to actually writing the handler:var sender *client.Sender
func handleRequest(ctx context.Context, event events.APIGatewayV2HTTPRequest) (events.APIGatewayV2HTTPResponse, error) {
if sender == nil {
sender = client.NewSender("n2n2psybtd.execute-api.us-east-1.amazonaws.com", "development")
}
formData, r, err := readForm(&event)
if r != nil {
return *r, err
}
quotes, r, err := buildQuotes(formData)
if r != nil {
return *r, err
}
// a hack right now; will be replaced with neptune
connIds := formData["connId"]
for _, connId := range connIds {
log.Printf("have quotes %v; sending to %s\n", quotes, connId)
sender.SendTo(connId, quotes)
}
resp := events.APIGatewayV2HTTPResponse{StatusCode: 200, Body: ""}
return resp, nil
}
NEPTUNE_PUBLISH:neptune/lambda/publish/main.go
Note that to make this work, I have refactored the watch lambda to extract all the relevant methods to the internal packages. In reality, files in lambda packages should be treated exactly the same as those in main packages: the minimum amount of code should go there. I broke this rule with the watcher because I wanted to get something to work. But even so, I feel more of both of these lambda main functions should be extracted into internal.The sender is an instance variable which is the client responsible for communicating with the websocket(s). The Sender is the name of the abstraction we created when refactoring. Note that we are not using the name of this api gateway but the one that serves the websockets. Blind copying of the watcher code would lead to us passing in the current domain name. Obviously, in the "real" world, this would not be hardcoded; probably it would be an environment variable attached to the lambda definition in the deployer file, but this is just a demo, so let's press on.
There are obviously decisions to be made about how we indicate the variables to be passed in the HTTP request; I have decided to use form data; this would be a good fit for an old-style web form, but is also compatible with AJAX requests and, most importantly for us, is easy to use with curl.
Once we have the form data, we can build this into a list of Quote objects, which are the same objects we already used with the watcher, which have already been set up to encode into JSON.
We then retrieve the list of connection ids from the form data, and then for each connection id we post the quotes to the management gateway with that id.
Finally, we return a status code of 200 to indicate success with no message (yes, we possibly should use 204).
This code obviously depends on supporting functions which are dull processing, but for completeness here they are:
func readForm(event *events.APIGatewayV2HTTPRequest) (url.Values, *events.APIGatewayV2HTTPResponse, error) {
method := event.RequestContext.HTTP.Method
if method != "POST" {
log.Printf("request was not POST but %s\n", method)
return nil, &events.APIGatewayV2HTTPResponse{StatusCode: 400, Body: "must use POST"}, nil
}
contentType := event.Headers["content-type"]
if !strings.Contains(contentType, "application/x-www-form-urlencoded") {
log.Printf("content type did not say it was a form but %s\n", contentType)
return nil, &events.APIGatewayV2HTTPResponse{StatusCode: 400, Body: "must use content type application/x-www-form-urlencoded"}, nil
}
body := event.Body
if event.IsBase64Encoded {
decodedBody, err := base64.StdEncoding.DecodeString(body)
if err != nil {
log.Printf("error decoding base64: %v\n", err)
return nil, &events.APIGatewayV2HTTPResponse{StatusCode: 500, Body: "decoding base64 failed"}, err
}
body = string(decodedBody)
}
// Parse the form data
formData, err := url.ParseQuery(body)
if err != nil {
log.Printf("error parsing body as query: %v\n", err)
return nil, &events.APIGatewayV2HTTPResponse{StatusCode: 500, Body: "parsing failed"}, err
}
return formData, nil, nil
}
func buildQuotes(formData url.Values) ([]client.Quote, *events.APIGatewayV2HTTPResponse, error) {
tickers := formData["ticker"]
prices := formData["price"]
if len(tickers) != len(prices) {
log.Printf("mismatched tickers and prices: %d %d\n", len(tickers), len(prices))
return nil, &events.APIGatewayV2HTTPResponse{StatusCode: 400, Body: "mismatched tickers and prices"}, nil
}
var quotes []client.Quote
for i, t := range tickers {
ps := prices[i]
p, err := strconv.Atoi(ps)
if err != nil {
log.Printf("could not parse %s as a number\n", ps)
return nil, &events.APIGatewayV2HTTPResponse{StatusCode: 400, Body: fmt.Sprintf("not a number: %s", ps)}, err
}
quotes = append(quotes, client.Quote{Ticker: t, Price: p})
}
return quotes, nil, nil
}
NEPTUNE_PUBLISH:neptune/lambda/publish/main.go
We can now send through an update message like so:curl -vk -HContent-Type:application/x-www-form-urlencoded -d ticker=AAPL -d price=2400 -d ticker=GOOG -d price=31300 -d connId=PCjUkcrRIAMCI-w= https://uxfcmjy8e7.execute-api.us-east-1.amazonaws.com/developmentThis appears in our browser window.