Tuesday, August 12, 2025

Delivering Prices from a Lambda


Step two is to assemble all the infrastructure in the cloud to deliver prices. For now, we are just going to deliver one price, statically, on connection, but, believe me, it's the principle of the thing that counts.

Websocket Quote Listener

The first thing we're going to do here is to add a JavaScript listener. Basically, we're going to copy the outline of the code we wrote for the mock listener, open a websocket and connect that to the watcher.
class WebsocketStockQuoter {
    constructor(wsuri) {
        this.conn = new WebSocket(wsuri);
        this.conn.onmessage = msg => {
            console.log(msg);
            if (this.lsnr) {
                var data = JSON.parse(msg.data);
                if (data.action && data.action === 'quotes') {
                    this.lsnr.quotes(data.quotes)
                }
            }
        }
        this.conn.onopen = () => {
            console.log("telling lambda who we are - expect quotes after this");
            this.conn.send(JSON.stringify({"action":"user","id":"user003"}));
        }
        this.conn.onclose = evt => {
            // onclose receives a CloseEvent (with code and reason), not an Error
            console.log("closing because", evt.code, evt.reason);
        }
    }

    provideQuotesTo(lsnr) {
        this.lsnr = lsnr
    }
}

export { WebsocketStockQuoter }

NEPTUNE_WEBSOCKET_QUOTER:neptune/app/js/webstockquoter.js

The console.log statements are there for the debugging I know I am going to have to do.
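
The message handling in onmessage can be tried out without a live websocket by pulling it into a standalone function. What follows is only a sketch: deliverQuotes and recordingListener are hypothetical names that are not part of the app, and the quote shape (ticker and price) is borrowed from the lambda's output defined later in this post.

```javascript
// Hypothetical stand-alone version of the onmessage logic, for experimenting
// without a websocket. Returns true if the listener was given some quotes.
function deliverQuotes(rawData, lsnr) {
    if (!lsnr) {
        return false; // nobody has called provideQuotesTo yet
    }
    const data = JSON.parse(rawData);
    if (data.action === "quotes") {
        lsnr.quotes(data.quotes);
        return true;
    }
    return false; // some other kind of message; ignore it
}

// A stand-in for the QuoteWatcher that just records what it is given.
const received = [];
const recordingListener = { quotes: qs => received.push(...qs) };

deliverQuotes('{"action":"quotes","quotes":[{"ticker":"EIQQ","price":2200}]}', recordingListener);
console.log(received);
```

Anything with a quotes method can play the listener role here, which is the same contract that provideQuotesTo relies on.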

When you use APIGateway with Lambda over a websocket, there are three predefined routes you can listen to: $connect, $default and $disconnect. I am just going to listen to $default, so nothing is going to happen until I send a message, which I do in onopen. In reality, the user would generally need to authenticate in some way, so it's rare that you can in fact do anything on the server side before the client sends their first message anyway. Consider passing over our user name as "doing security".
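
As a rough model of why listening only to $default is enough: the gateway evaluates a route selection expression against each incoming message (the infrastructure script later in this post uses $request.body.action) and falls back to $default when no route key matches. The sketch below is an illustration of that rule only, not API Gateway's actual implementation, and selectRoute is a hypothetical name.

```javascript
// Hypothetical model of websocket route selection for the expression
// "$request.body.action". Returns the chosen route key, or null if none applies.
function selectRoute(rawBody, definedRoutes) {
    let key;
    try {
        const body = JSON.parse(rawBody);
        key = body && body.action; // models "$request.body.action"
    } catch (e) {
        key = undefined; // a non-JSON body cannot match a custom route
    }
    if (typeof key === "string" && definedRoutes.includes(key)) {
        return key;
    }
    return definedRoutes.includes("$default") ? "$default" : null;
}

// With only $default defined, the "user" message from onopen lands there.
console.log(selectRoute('{"action":"user","id":"user003"}', ["$default"]));
```

If a route named "user" were added later, the same message would be sent to that route instead, without any change to the client.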

And we change stocks.js to use this (keeping the old code around commented out "just in case"):
// import { MockStockQuoter } from './mockstockquoter.js';
import { WebsocketStockQuoter } from './webstockquoter.js';
import { QuoteWatcher } from './quotewatcher.js';

window.addEventListener("load", () => {
    var table = document.querySelector(".stock-table tbody");
    var templ = document.getElementById("stockrow");
    var quoteWatcher = new QuoteWatcher(table, templ);
    var quoter = new WebsocketStockQuoter("wss://n2n2psybtd.execute-api.us-east-1.amazonaws.com/development/");
    // var quoter = new MockStockQuoter();
    quoter.provideQuotesTo(quoteWatcher);
});

NEPTUNE_WEBSOCKET_QUOTER:neptune/app/js/stocks.js

Defining the Infrastructure

In order to work in the AWS cloud, we need to deploy a lot of infrastructure. In large part, it was this specific use case that set me down the path of writing my own, "sensible" deployer tool, so let's use it here. Hopefully the commentary within the script explains what is going on.
We want to have a web app that can access Neptune.  In order for that to work, we need to
define and declare a lambda, and then connect it through an API Gateway.  We actually need
multiple units to handle watching prices and updating them

First off, we need an S3 bucket to store our code in

        ensure aws.S3.Bucket "ignorance-bucket.blogger.com" => bucket
            @teardown preserve

We need to recover the VPC we have put Neptune in

        find aws.VPC.VPC "Test" => vpc

Then we need a lambda

        lambda.function "watch-lambda" => watch_lambda
            @teardown delete
            Runtime <- "go"
            Code <= aws.S3.Location
                Bucket <- "ignorance-bucket.blogger.com"
                Key <- "lambda-watch.zip"

The role for the lambda needs to say that it can be assumed by lambda,
and then needs to have the permissions to set up the VPC, along with
permissions to access other services we will need.

            Role <= aws.IAM.WithRole "ignorance-lambda-role"
                assume
                    allow "sts:AssumeRole"
                        principal "Service" "lambda.amazonaws.com"
                policy

These permissions are needed to allow the lambda to configure its VPC

                    allow aws.action.ec2.CreateNetworkInterface "*"
                    allow aws.action.ec2.DescribeNetworkInterfaces "*"
                    allow aws.action.ec2.DeleteNetworkInterface "*"

These allow the lambda to write to CloudWatch

                    allow "logs:CreateLogGroup" "*"
                    allow "logs:CreateLogStream" "*"
                    allow "logs:PutLogEvents" "*"

This permission is a weird one, and it's hard to track down a definitive reference for the resource API,
but it's what allows the lambda to send websocket messages.  The resource pattern is:

arn:aws:execute-api:REGION:ACCOUNT:GWID/STAGE/METHOD/@connections/CONNECTION-ID

You obviously stand no chance of guessing the connection ID.  We could generate the rest exactly when
we create the APIGW below, but it's easier to just do this.  Note that, unlike most resource IDs in permissions,
something needs to appear in both the REGION and ACCOUNT slots (I have used *); it is not acceptable to
just leave them blank between colons.

                    allow "execute-api:ManageConnections" "arn:aws:execute-api:*:*:*/development/*/@connections/*"

Lambda has a lot of complicated features, but we will just set up to use
the basic publication and alias features using the alias "next"

            PublishVersion <- true
            Alias <- "next"

In order to access Neptune, the Lambda needs to be in the same VPC.  For whatever
reason, we can't just specify the VPC name, we have to find it (above) and then
copy across the Subnets and Security Groups.

            VpcConfig <= aws.VPC.Config
                Subnets <- vpc->subnets
                SecurityGroups <- vpc->securityGroups

We want to work through the alias, which was just created, so find that specifically

        find aws.Lambda.Alias "next" => nextAlias
            FunctionName <- "watch-lambda"

Now we need to define a websocket API Gateway to access this lambda:

        api.gatewayV2 "stock-watch"
            @teardown delete
            Protocol <- "websocket"
            RouteSelectionExpression <- "$request.body.action"
            integration "lambda"
                Type <- "AWS_PROXY"
                Uri <- nextAlias->arn
            route "$default" "lambda"
            stage "development"

NEPTUNE_DEPLOYER_API:neptune/dply/infrastructure.dply
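
The resource pattern used for the execute-api:ManageConnections permission in the script above can be made concrete with a small helper. This is a sketch for illustration only: manageConnectionsArn is a hypothetical function, and the account number in the second call is a made-up placeholder. The gateway ID and region are the ones that appear in the websocket URL in stocks.js.

```javascript
// Hypothetical helper that fills in the slots of
//   arn:aws:execute-api:REGION:ACCOUNT:GWID/STAGE/METHOD/@connections/CONNECTION-ID
// Every slot except the stage defaults to the wildcard "*".
function manageConnectionsArn({ region = "*", account = "*", apiId = "*", stage,
                                method = "*", connectionId = "*" }) {
    return `arn:aws:execute-api:${region}:${account}:${apiId}/${stage}/${method}/@connections/${connectionId}`;
}

// The permissive form used in the deployer script.
const loose = manageConnectionsArn({ stage: "development" });

// A tighter form that pins the region, account and gateway.
// "123456789012" is a placeholder account number, not a real one.
const tight = manageConnectionsArn({
    region: "us-east-1",
    account: "123456789012",
    apiId: "n2n2psybtd",
    stage: "development",
});

console.log(loose);
console.log(tight);
```

The connection ID slot stays as a wildcard in both cases, since it is only known once a client has connected.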

So now we have all the pieces in place to run our websocket quoter, but we still need the code for a lambda, which we're going to write in Go.

The Lambda

We're going to start with a fairly simple lambda that responds to input and sends a static stock price back.

First, the obligatory ceremony at the top of the file:
package main

import (
    "context"
    "encoding/json"
    "log"
    "net/url"

    "github.com/aws/aws-lambda-go/events"
    "github.com/aws/aws-lambda-go/lambda"
    "github.com/aws/aws-sdk-go-v2/aws"
    "github.com/aws/aws-sdk-go-v2/config"
    "github.com/aws/aws-sdk-go-v2/service/apigatewaymanagementapi"
    transport "github.com/aws/smithy-go/endpoints"
)

NEPTUNE_WATCH_LAMBDA:neptune/lambda/watch/main.go

Skipping to the end, every Go lambda needs a main() function that calls lambda.Start() with the actual handler, which must have one of a number of permitted function signatures.
func main() {
    lambda.Start(handleRequest)
}

NEPTUNE_WATCH_LAMBDA:neptune/lambda/watch/main.go

The input message is an event which has a defined type in the AWS Go library, events.APIGatewayWebsocketProxyRequest, which contains all the elements we need, such as the ConnectionID (inside its RequestContext) and the Body as a string. We can then unmarshal the body into our own type. So we define our input and output types as follows:
type messagePayload struct {
    Action string `json:"action"`
    Userid string `json:"id"`
}

type quotesPayload struct {
    Action       string  `json:"action"`
    ConnectionId string  `json:"connection-id"`
    Quotes       []Quote `json:"quotes"`
}

type Quote struct {
    Ticker string `json:"ticker"`
    Price  int    `json:"price"`
}

NEPTUNE_WATCH_LAMBDA:neptune/lambda/watch/main.go
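
Seen from the browser, the json tags on these structs imply the following message shapes on the wire. This is a sketch of what the tags suggest, not a capture of real traffic: the connection id is a made-up placeholder and isQuotesPayload is a hypothetical helper.

```javascript
// What the browser sends in onopen; the lambda decodes it into messagePayload.
const userMessage = { action: "user", id: "user003" };

// What the lambda is expected to send back; mirrors quotesPayload and Quote.
// "EXAMPLE-CONNECTION-ID" is a placeholder, not a real connection id.
const quotesMessage = {
    action: "quotes",
    "connection-id": "EXAMPLE-CONNECTION-ID",
    quotes: [{ ticker: "EIQQ", price: 2200 }],
};

// Hypothetical check that a decoded message has the quotesPayload shape.
// Price is an int in the Go struct, so only whole numbers are accepted here.
function isQuotesPayload(msg) {
    return msg !== null && typeof msg === "object"
        && msg.action === "quotes"
        && typeof msg["connection-id"] === "string"
        && Array.isArray(msg.quotes)
        && msg.quotes.every(q => q !== null && typeof q === "object"
            && typeof q.ticker === "string" && Number.isInteger(q.price));
}

console.log(JSON.stringify(quotesMessage));
console.log(isQuotesPayload(quotesMessage), isQuotesPayload(userMessage));
```

Note that the key is "connection-id" with a hyphen, so on the JavaScript side it has to be read with bracket notation rather than a dot.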

And then we can actually handle the request in the handleRequest function (the name is not significant; what matters is that it is the function passed as the argument to lambda.Start). This starts by unpacking the event body into the messagePayload structure, and then creates a client to send messages back across the websocket. The client is kept in a package-level variable so that it is only created on the first request and reused after that.
var apiClient *apigatewaymanagementapi.Client

func handleRequest(ctx context.Context, event events.APIGatewayWebsocketProxyRequest) error {
    if event.IsBase64Encoded {
        log.Printf("cannot unmarshal request in base64")
        return nil
    }

    var request messagePayload
    if err := json.Unmarshal([]byte(event.Body), &request); err != nil {
        log.Printf("Failed to unmarshal body: %v", err)
        return err
    }

    if apiClient == nil {
        apiClient = NewAPIGatewayManagementClient(event.RequestContext.DomainName, event.RequestContext.Stage)
    }

NEPTUNE_WATCH_LAMBDA:neptune/lambda/watch/main.go

The action field determines what we are being asked to do, and so we can switch on that. At the moment, we have only one type of incoming action, which is to identify the user interested in receiving stock quotes. For now, when we receive this message, we immediately respond with a static list of quotes, marshalled from the structures above.
    switch request.Action {
    case "user":
        log.Printf("request for stocks for %s\n", request.Userid)
        resp := quotesPayload{Action: "quotes", ConnectionId: event.RequestContext.ConnectionID, Quotes: []Quote{{Ticker: "EIQQ", Price: 2200}}}
        msgData, err := json.Marshal(&resp)
        if err != nil {
            return err
        }
        connectionInput := &apigatewaymanagementapi.PostToConnectionInput{
            ConnectionId: aws.String(event.RequestContext.ConnectionID),
            Data:         msgData,
        }
        // pass on the handler's own context rather than context.TODO()
        _, err = apiClient.PostToConnection(ctx, connectionInput)
        log.Printf("sent message to %s, err = %v\n", event.RequestContext.ConnectionID, err)
        return err
    default:
        log.Printf("cannot handle user request: %s", request.Action)
    }
    return nil
}

NEPTUNE_WATCH_LAMBDA:neptune/lambda/watch/main.go

Finally, in order to send messages across a websocket, you need to create an apigatewaymanagementapi client, which has to be "told" exactly which APIGateway to connect to. It took me a fair while to figure this out, sampling from a number of sources across the internet, until I reached this, which "almost" works:
func NewAPIGatewayManagementClient(domain, stage string) *apigatewaymanagementapi.Client {
    cfg, err := config.LoadDefaultConfig(context.TODO())
    if err != nil {
        log.Printf("could not init config: %v\n", err)
        return nil
    }
    return apigatewaymanagementapi.NewFromConfig(cfg, func(opts *apigatewaymanagementapi.Options) {
        opts.EndpointResolverV2 = &endpointResolver{domain: domain, stage: stage}
    })
}

type endpointResolver struct {
    domain string
    stage  string
}

func (e *endpointResolver) ResolveEndpoint(ctx context.Context, params apigatewaymanagementapi.EndpointParameters) (transport.Endpoint, error) {
    uri := url.URL{Scheme: "https", Host: e.domain, Path: e.stage}
    return transport.Endpoint{URI: uri}, nil
}

NEPTUNE_WATCH_LAMBDA:neptune/lambda/watch/main.go

The core of this is where the code creates a client using the default configuration; this is modified by a function which takes the Options associated with that client and sets EndpointResolverV2 to our own resolver, whose ResolveEndpoint method builds the URI from the domain and stage (the url.URL line).
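
To make the effect of the resolver concrete: the base URI it returns is https://DOMAIN/STAGE, and messages are posted to the @connections path beneath it. The sketch below assumes the documented https://{domain}/{stage}/@connections/{connection_id} form; managementBaseUrl and connectionUrl are hypothetical helpers for illustration and are not part of the SDK. The domain comes from the websocket URL in stocks.js and the connection id is the one from the error message later in this post.

```javascript
// Hypothetical helper: the base endpoint that the resolver is expected to produce.
function managementBaseUrl(domain, stage) {
    return `https://${domain}/${stage}`;
}

// Hypothetical helper: the URL a message for one connection would be posted to.
// The connection id is URL-encoded because ids can contain characters such as "=".
function connectionUrl(domain, stage, connectionId) {
    return `${managementBaseUrl(domain, stage)}/@connections/${encodeURIComponent(connectionId)}`;
}

const domain = "n2n2psybtd.execute-api.us-east-1.amazonaws.com";
console.log(managementBaseUrl(domain, "development"));
console.log(connectionUrl(domain, "development", "O9iVddyboAMCJ1A="));
```

This is also why the handler passes in event.RequestContext.DomainName and event.RequestContext.Stage: between them they identify the gateway and stage that the reply has to go back through.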

So far, so good. We can then build and deploy this using the scripts/deploy command. The key to deploying a lambda in Go is to build the binary as an executable called bootstrap, and then put that as the only (or last) file in a zip archive. We can then upload this file to an S3 bucket and point our lambda configuration to that location. The deployer knows to update the function code every time it is asked to update the infrastructure and then to publish it and update the next alias. In short, every time we re-run scripts/deploy, all the necessary work is done to bring the lambda up to date.

We can now reload our webapp, and find out what happens. Which it turns out is nothing. After a while, an error appears in the browser console:
{"message": "Endpoint request timed out", "connectionId":"O9iVddyboAMCJ1A=", "requestId":"O9iVeH0VoAMEtnw="}
It's not entirely clear from this what's gone wrong, but I have past experience with lambdas running in a VPC and I was totally expecting this. By default, lambdas in a VPC cannot access the internet or (almost) any AWS services outside of the VPC. It's a real pain.

Conclusion

We managed to build a lambda, and update our code to connect to it. But as yet, we don't get any stock prices because the lambda, running inside the VPC, cannot reach the API Gateway management endpoint. This strikes me as incredibly dumb: to require many lambdas to be in VPCs (because Neptune HAS to be in a VPC), but then not allow them to send messages back to their clients.

There are solutions, and in a special bonus episode, we will figure this out next time.
