Flicking back to what we did with Neptune, we have functions ConnectEndpoint and DisconnectEndpoint. To use these, we need a db endpoint address, a user id and a connection id. We don't have the db endpoint in our new lambdas, but we can easily provide that (through hardcoding or otherwise). As I look at this code, I see that it has been written based on the assumption it is running within a one-shot main routine, so I think some refactoring will need to be done there to make it work with the more long-term model expected by the lambdas.
For the publisher, we have the FindStockWatchers method, which again opens the database internally, so again needs to be refactored. In looking through this, I realized that I don't have any code to update the stock price in DynamoDB, so I will need to add a "hardcore" function to do that, although strictly that isn't necessary to complete the update and publication tasks. The calls to openDynamo also need to be moved up to avoid repeatedly opening dynamo connections. And then we have all the publication code in the publish lambda we wrote last time.
OK, let's get to it.
I did all the refactoring and checked it in as NEPTUNE_EXTRACT_OPEN.
Wiring up the watcher
The handler in the websocket listener is a piece of code which follows a somewhat confusing pattern. The handler function is only called once for any given client, because our clients only send one message to the server; but it may be called multiple times for different clients. So, when it is called, it may have been called before, or it may not. We thus make the provisioning of resources conditional on them not already existing. It is, however, my firm belief that the code does not need to handle multi-threading; if AWS needs higher throughput it creates multiple instances of a lambda; it does not invoke the same one multiple times in parallel.
On the other hand, every time it is invoked, it will have a different connection id.
The first responsibility of the lambda is to make sure that all of the AWS client connections are in place; so we need to open neptune and dynamo.
var sender *client.Sender
var nepcli *neptunedata.Client
var dyncli *dynamodb.Client
func handleRequest(ctx context.Context, event events.APIGatewayWebsocketProxyRequest) error {
if nepcli == nil {
var err error
nepcli, err = neptune.OpenNeptune("user-stocks")
if err != nil {
log.Printf("could not open neptune")
return err
}
}
if dyncli == nil {
var err error
dyncli, err = dynamo.OpenDynamo()
if err != nil {
log.Printf("could not open dynamo")
return err
}
}
if sender == nil {
sender = client.NewSender(event.RequestContext.DomainName, event.RequestContext.Stage)
}
NEPTUNE_CONNECTING_PARTS:neptune/lambda/watch/main.go
Then we basically rewrite all of the code to respond to the user connection:
case "user":
log.Printf("request for stocks for %s\n", request.Userid)
err := neptune.ConnectEndpoint(nepcli, request.Userid, event.RequestContext.ConnectionID)
if err != nil {
log.Printf("Failed to record connection id for %s in neptune: %v\n", request.Userid, err)
return err
}
stocks, err := neptune.FindWatchedStocks(nepcli, request.Userid)
if err != nil {
log.Printf("Failed to find stocks for %s: %v\n", request.Userid, err)
return err
}
quotes, err := dynamo.FindStockPrices(dyncli, "Stocks", stocks)
if err != nil {
log.Printf("Failed to find stock prices for %s (%v): %v\n", request.Userid, stocks, err)
return err
}
var payload []client.Quote
for t, p := range quotes {
payload = append(payload, client.Quote{Ticker: t, Price: p})
}
return sender.SendTo(event.RequestContext.ConnectionID, payload)
NEPTUNE_CONNECTING_PARTS:neptune/lambda/watch/main.go
Here, it first records the new connection in Neptune; then it asks Neptune to find all the stocks that are watched by this user; then it turns to Dynamo and finds the prices of all those stocks; it then constructs the payload by turning those into an appropriate list (there is an obvious refactoring to be done here to smooth the two data models); and then it sends that payload back to the newly opened connection.
Wiring up the publisher
In the publisher, we have a somewhat different pattern. The client here is a single-shot: that is, it is invoked exactly once for each HTTP gateway request. But again, the lambda itself is long lived and needs to consider that it might be called multiple times. This being the case, it again needs to open the databases just the once.
After that, it has two responsibilities: first it must update the dynamo database so that any clients which join after this will see the updated price, and then it must find and notify all the clients which are already connected.
The publisher starts off in exactly the same way as the watcher (which suggests a refactoring: move more code into internal, extract the (different) error handling, create a shared base class):
var sender *client.Sender
var nepcli *neptunedata.Client
var dyncli *dynamodb.Client
func handleRequest(ctx context.Context, event events.APIGatewayV2HTTPRequest) (events.APIGatewayV2HTTPResponse, error) {
if nepcli == nil {
var err error
nepcli, err = neptune.OpenNeptune("user-stocks")
if err != nil {
log.Printf("could not open neptune")
return events.APIGatewayV2HTTPResponse{StatusCode: 500, Body: "could not open neptune"}, err
}
}
if dyncli == nil {
var err error
dyncli, err = dynamo.OpenDynamo()
if err != nil {
log.Printf("could not open dynamo")
return events.APIGatewayV2HTTPResponse{StatusCode: 500, Body: "could not open dynamo"}, err
}
}
if sender == nil {
sender = client.NewSender("n2n2psybtd.execute-api.us-east-1.amazonaws.com", "development")
}
NEPTUNE_CONNECTING_PARTS:neptune/lambda/publish/main.go
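The refactoring hinted at above (pulling the open-once logic into shared code while leaving each lambda its own error handling) might look something like the following minimal sketch. The `ensure` helper and the `fakeClient` type are hypothetical names invented for illustration and are not in the repository; in the real lambdas the opener would wrap neptune.OpenNeptune or dynamo.OpenDynamo. It assumes Go 1.18 or later for generics.

```go
package main

import "fmt"

// ensure returns cur if it has already been set; otherwise it calls open and
// returns whatever open produces. It is deliberately not safe for concurrent
// use, matching the assumption that a lambda instance handles one invocation
// at a time.
func ensure[T any](cur *T, open func() (*T, error)) (*T, error) {
	if cur != nil {
		return cur, nil
	}
	return open()
}

// fakeClient is a stand-in for *neptunedata.Client or *dynamodb.Client.
type fakeClient struct{ name string }

// cached plays the role of the package-level nepcli / dyncli variables.
var cached *fakeClient

func main() {
	opens := 0
	open := func() (*fakeClient, error) {
		opens++
		return &fakeClient{name: "user-stocks"}, nil
	}
	// Simulate three invocations of the same lambda instance.
	for i := 0; i < 3; i++ {
		var err error
		cached, err = ensure(cached, open)
		if err != nil {
			// Each lambda would report this in its own way (plain error
			// for the watcher, a 500 response for the publisher).
			fmt.Println("could not open:", err)
			return
		}
	}
	fmt.Println("client:", cached.name, "opened", opens, "time(s)")
}
```

Each handler would then reduce its opening boilerplate to one call per client, keeping only the part that differs: how a failure is reported back to the caller.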
The code to retrieve the information from the form and build the quotes is preserved, and then the rest of the handler is basically rewritten:
updater, err := dynamo.NewUpdater(dyncli)
if err != nil {
log.Printf("could not create updater")
return events.APIGatewayV2HTTPResponse{StatusCode: 500, Body: "could not create updater"}, err
}
for _, q := range quotes {
s := model.Stock{Symbol: q.Ticker, Price: q.Price}
err := updater.Update("Stocks", s)
if err != nil {
log.Printf("could not update price for %s: %v\n", q.Ticker, err)
continue
}
conns, err := neptune.FindStockWatchers(nepcli, q.Ticker)
if err != nil {
log.Printf("could not recover watchers for %s: %v\n", q.Ticker, err)
continue
}
for _, conn := range conns {
log.Printf("have quotes %v; sending to %s\n", quotes, conn)
err := sender.SendTo(conn.ConnectionId, quotes)
NEPTUNE_CONNECTING_PARTS:neptune/lambda/publish/main.go
We create a new Updater, which is a new class based on Inserter (because of the semantics of DynamoDB it is in fact identical) and is designed to update an existing item.
We then scan through all the quotes we have been asked to update, first notifying Dynamo of the new price, then asking Neptune for all the watchers for this stock.
Finally, we scan through the list of watchers and send the latest quotes package to all of them.
Cleaning up errors
For completeness, the lambda should handle the $disconnect route for when the client disconnects, but I can't really be bothered. But we really do need to handle the error case where a send to a client reports that the connection has disappeared, and respond by calling the neptune.DisconnectEndpoint method.
err := sender.SendTo(conn.ConnectionId, quotes)
if err != nil {
log.Printf("Failed to send to connection id %s for %s in neptune: %v\n", conn.ConnectionId, conn.User, err)
err = neptune.DisconnectEndpoint(nepcli, conn.ConnectionId)
if err != nil {
log.Printf("Error disconnecting %s for %s: %v\n", conn.ConnectionId, conn.User, err)
}
}
}
}
NEPTUNE_CONNECTING_PARTS:neptune/lambda/publish/main.go
This ensures that we don't end up with an ever-increasing graph full of dead connections.
Getting it to Work
Writing code is one thing; getting it to work, especially in a pernicious environment like AWS Lambdas within VPCs, is a different matter. For me, the only realistic way to do this is to add lots of tracing so that you know where it is falling down. I'm not going to show you my tracing statements here, but they're in the repository under the tag NEPTUNE_GETTING_WORKING.
When we try to run the code, it hangs and eventually we receive a message that the lambda has timed out. Adding the appropriate tracing leads us to the conclusion that it is timing out trying to find the db cluster endpoints. There are a handful of quick things to think of when this happens: permissions and connectivity are the top two. On this occasion, I believe I have solved all my connectivity problems with IPv6 and the egress-only gateway. So, it must be permissions. And just thinking that makes me think, "oh, yeah, I don't have neptune permissions on my lambdas". When I was doing it before, by hand, I was doing everything as my admin user who has all the privileges.
Looking at the documentation, there are a number of managed policies I can apply to quickly get up to speed. So let's add some permissions and see what happens.
Specific things not covered in the managed policies
allow "rds:DescribeDBClusterEndpoints" "*"
policy "NeptuneFullAccess"
policy "AmazonDynamoDBFullAccess"
NEPTUNE_GETTING_WORKING:neptune/dply/infrastructure.dply
But this doesn't make any difference. I actually did this in two stages - I didn't realize that for some reason, in the long list of permissions included in the NeptuneFullAccess policy, rds:DescribeDBClusterEndpoints is nowhere to be seen so I added that separately.
At which point, I go back to connectivity. It turns out that the calls to the neptune management API don't operate over IPv6 (what????), so our IPv6 egress-only gateway does not work to route traffic to the neptune management API (really????). I am getting so frustrated by all this. I am sure there are reasons for it, but I just cannot imagine what they are, especially when you have to put your neptune instances inside a VPC.
Well, at this point in life, I just want to get this done. This discovery is the "best" way of finding the endpoints, but we could also use configuration, or, even, hardcoding. That seems the easiest thing to get working, so I'll do that. Now, I run into the problem that the endpoint cannot be reached. There's a simple reason for that: I don't have an instance running. But I have a deployer rule to fix that, and in 20 minutes I can have an instance up and running.
Ok, now I can take one further step and interact with neptune. But my lambda still hangs when it attempts to connect to Dynamo. Aaargh. Again, it would seem Dynamo does not support IPv6, but a different approach seems to be applicable here, using a service gateway endpoint. These are free and it would seem available for some services and not others. Dynamo is one which is supported, so let's add that to our VPC.
And, as if by magic, I suddenly see stock prices in my watcher.
And Publishing
So now, let's see if the publisher works. Hopefully, the permissions will be shared because we are using the same role. And hopefully the connectivity issues have all been resolved, and the change we made to OpenNeptune above was refactored into common code.
Lo and behold it works first time! In the browser window, the stock prices update.
On the other hand, the response to curl shows an error:
{"message":"Internal Server Error"}
but on examining the log, that's because some of the connections dropped. That is probably a false error and should not be reported, but I cannot be bothered to tidy up that loose end at this point. I'm done: the one that is currently open worked and saw the updates. Yay! I feel an overwhelming sense of relief!
Conclusion
We have been able to build an end-to-end process for publishing stock prices to websocket listeners, using neptune to keep track of all the links.
If it wasn't obvious, there are a whole host of sloppy corners in this code; for example, the combination of updating multiple stock prices at once, sending the entire packet to all subscribers, and having the web app just happily insert any "new" stocks it finds means that apps display ever more stocks that users aren't actually interested in. It is left as an exercise to the reader to (a) choose an appropriate solution to this problem and (b) implement it, all while (c) refactoring everything to remove duplication of quote objects and payloads. I also didn't circle back and sort out the green/red fade when stock prices change. It may or may not work for green; it won't work for red.
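As a hint at one possible direction for part (a), and certainly not the only one, the publisher could group quotes by connection before sending, so that each subscriber only receives the stocks it is actually watching. Everything in this sketch is an assumption: the Quote type only approximates client.Quote (the Price type in particular is a guess), and the watchers callback stands in for neptune.FindStockWatchers.

```go
package main

import "fmt"

// Quote approximates client.Quote; float64 for Price is an assumption.
type Quote struct {
	Ticker string
	Price  float64
}

// payloadsByConnection builds one payload per connection id, containing only
// the quotes for tickers which that connection watches. Quotes keep the
// order in which they were supplied.
func payloadsByConnection(quotes []Quote, watchers func(ticker string) []string) map[string][]Quote {
	out := make(map[string][]Quote)
	for _, q := range quotes {
		for _, id := range watchers(q.Ticker) {
			out[id] = append(out[id], q)
		}
	}
	return out
}

func main() {
	// A toy watcher table: ticker -> connection ids.
	table := map[string][]string{
		"AAA": {"conn1", "conn2"},
		"BBB": {"conn2"},
	}
	watchers := func(t string) []string { return table[t] }
	quotes := []Quote{{"AAA", 10}, {"BBB", 20}, {"CCC", 30}}
	for id, payload := range payloadsByConnection(quotes, watchers) {
		fmt.Println(id, payload)
	}
}
```

The send loop would then iterate over the resulting map and make one SendTo call per connection, which also avoids sending the same packet to a connection once per matching stock.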
Final Thoughts
So if you really want to use a graph database, it's clear how to use Neptune to get some simple work done, and more complex queries are just extensions of that.
Of course, in general, you don't want to "use a graph database", you want to solve a problem in the real world. And in this case I want to solve a problem identifying relationships.
I've shown how I can model that in Neptune, but I also know how to model it in a plain old relational database. These have a bad rap in cloud computing circles, but, having done what I've done here, I've come to the following conclusions:
- whether I use Neptune or Aurora, I need to use two databases;
- neither solution scales seamlessly across regions;
- both cost money to keep servers live continuously, and, indeed, Aurora seems cheaper;
- it seems that Neptune is, in fact, built on top of a relational database;
- the claim seems to be that Aurora has better IPv6 support.