Monday, July 21, 2025

Notifying Watchers


I think I've reached the point where I've actually done all the Neptune experiments that I really need to do.

Now I'm going to start moving on to building a dummy app that tests that I can do this for real. I'm not going to present all this code because there will be a lot of it and a lot of it is orthogonal to what I really want to do. But it feels that there is one thing that is still relevant to this discussion and that is being aware of connected users.

Before I go any further, I just want to make a few observations that I have noticed in getting to this point.

First off, I'm disappointed that when you choose "serverless" mode for your database, the only way of actually not having a server running is to shut down the database instance. It apparently knows when it is "idle", but still charges you. We're not talking vast sums of money here, but that is different to how (say) Dynamo works.

Secondly, in the introduction I said that I like the way relational databases model relationships but that "they don't scale". It would seem that Neptune is, in fact, built on top of a relational database, not from the ground up. I'm not sure, therefore, what does happen if you try and build a Neptune Cluster across regions or availability zones. I should at least research that if not experiment with that.

Thirdly, my reading of the documentation had led me to believe that I needed two engines: a "writer" and a "reader". That's a fallacy. You need a "primary" (which is both a writer and a reader) and then you can scale by adding more readers. You cannot add multiple writers. So I have shut down one of my two engines.

Tracking Connections

I'm planning on deploying my application to AWS using APIGateway and Lambda. APIGateway (v2) allows websocket connections and, when a websocket connects, provides you with a unique handle that you can later use to send "unsolicited" messages (as opposed to replies). I want to store that handle in the Neptune graph. Each user can have several of these (they could be logged on from a computer and a phone, for instance). From my previous experience, the handle is just a string, but we will have a node type of Endpoint which could have multiple properties; for now we are just going to model a connectionId.

So I want to do three things:
  • Add a method to "connect" a user by adding a new Endpoint node with a given connectionId in Neptune.
  • Add a method to "disconnect" a user by removing an existing Endpoint with a given connectionId from Neptune.
  • Update the FindWatchers logic to return a list of pairs (username, connectionId).
In the fulness of time, these will just come from the lambda code, but for now I'm going to add a new main() program endpoint which takes three arguments:
  • c to connect or d to disconnect;
  • a userid;
  • a connectionid.
Remember that this code is not expected to generate a unique id; in the case we are considering, that is given to us by APIGateway when the user connects.
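
As a rough sketch of how the lambda code might eventually call these two operations: APIGateway websocket APIs invoke the backend with the route keys $connect and $disconnect. Everything else here (the EndpointStore interface, HandleRoute, the in-memory memStore, and the assumption that the username is already known at connect time) is a hypothetical stand-in, not the code in this repo.

```go
package main

import (
	"errors"
	"fmt"
)

// EndpointStore is a hypothetical interface over the two graph operations
// described above; a real implementation would talk to Neptune.
type EndpointStore interface {
	Connect(user, connId string) error
	Disconnect(connId string) error
}

// memStore is an in-memory stand-in used only to exercise the dispatch logic.
type memStore struct {
	byConn map[string]string // connId -> username
}

func (m *memStore) Connect(user, connId string) error {
	m.byConn[connId] = user
	return nil
}

func (m *memStore) Disconnect(connId string) error {
	if _, ok := m.byConn[connId]; !ok {
		return errors.New("no such connection: " + connId)
	}
	delete(m.byConn, connId)
	return nil
}

// HandleRoute maps a websocket route key onto the matching store operation.
func HandleRoute(store EndpointStore, routeKey, user, connId string) error {
	switch routeKey {
	case "$connect":
		return store.Connect(user, connId)
	case "$disconnect":
		return store.Disconnect(connId)
	default:
		return fmt.Errorf("unhandled route: %s", routeKey)
	}
}

func main() {
	store := &memStore{byConn: map[string]string{}}
	fmt.Println(HandleRoute(store, "$connect", "user003", "xx-cc-3"))
	fmt.Println(len(store.byConn))
	fmt.Println(HandleRoute(store, "$disconnect", "", "xx-cc-3"))
	fmt.Println(len(store.byConn))
}
```

Note that the disconnect path only needs the connection id, which matches the shape of the command line arguments above.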

Going back to curl, I came up with this to implement connecting:
$ curl https://user-stocks.cluster-ckgvna81hufy.us-east-1.neptune.amazonaws.com:8182/openCypher -d 'query=
  MATCH (u:User {username:$username})
  CREATE (e:Endpoint {connId:$connId})
  CREATE (u)-[r:endpoint]->(e) RETURN e, r'
-d 'parameters={"username": "user003", "connId": "xx-cc-3"}'
{
  "results": [{
      "e": {
        "~id": "83617f37-c299-4c43-9d56-58e7d97c6b12",
        "~entityType": "node",
        "~labels": ["Endpoint"],
        "~properties": {
          "connId": "xx-cc-3"
        }
      },
      "r": {
        "~id": "5e23d753-f4c2-4ca4-91be-0f366374ad0f",
        "~entityType": "relationship",
        "~start": "492f8845-e1e3-4592-8fb8-27fdeb9351e1",
        "~end": "83617f37-c299-4c43-9d56-58e7d97c6b12",
        "~type": "endpoint",
        "~properties": {}
      }
    }]
}
It took me a while to figure this out. I was trying to understand how I could "create" a node and a relationship at the same time; eventually I realized that this is a "program" and so I can do multiple steps: first, find the user node; then create the new endpoint node with its associated connection id; and then create a relationship between them. I opted to return the answers so that I can see that it worked; it's my expectation that if no user can be found, no new node will be created. I checked and this is correct:
$ curl https://user-stocks.cluster-ckgvna81hufy.us-east-1.neptune.amazonaws.com:8182/openCypher -d 'query=
  MATCH (u:User {username:$username})
  CREATE (e:Endpoint {connId:$connId})
  CREATE (u)-[r:endpoint]->(e) RETURN e, r'
-d 'parameters={"username": "userNOTFOUND", "connId": "xx-cc-3"}'
{"results":[]}
So in my code I can report if the user matched or not.

Showing Endpoints

Given we have forced an endpoint in there "by hand", I can now make the fairly simple updates to watchers to return the endpoint along with the user:
func FindStockWatchers(db string, stock string) ([]*Connection, error) {
    svc, err := openNeptune(db)
    if err != nil {
        return nil, err
    }
    query := `
    MATCH (u:User)-[r]->(s:Stock {symbol:$symbol})
    MATCH (u)-[]->(e:Endpoint)
    RETURN u.username, s.symbol, e.connId
    `
    params := fmt.Sprintf(`{"symbol": "%s"}`, stock)
    linkQuery := neptunedata.ExecuteOpenCypherQueryInput{OpenCypherQuery: aws.String(query), Parameters: aws.String(params)}
    out, err := svc.ExecuteOpenCypherQuery(context.TODO(), &linkQuery)
    if err != nil {
        return nil, err
    }

    results, err := unpack(out.Results)
    if err != nil {
        return nil, err
    }
    var ret []*Connection
    for _, m := range results {
        ret = append(ret, &Connection{User: m["u.username"].(string), ConnectionId: m["e.connId"].(string)})
    }

    return ret, nil
}

NEPTUNE_SHOW_ENDPOINTS:neptune/internal/neptune/findWatchers.go

I have highlighted the changes. It may be that it is possible to do all of the matching in one MATCH expression, but certainly my openCypher fu is not up to the task. Instead, it's possible to find all the users that have a stock, and then all the endpoints for "that" user. This should return all the endpoints for all users watching the stock (along with the user id). Because we are now returning a pair, we need to declare a struct for that:
package neptune

import "strings"

type Connection struct {
    User         string
    ConnectionId string
}

NEPTUNE_SHOW_ENDPOINTS:neptune/internal/neptune/connection.go

In the main function for watchers, we need to receive this list of connections and display them appropriately:
func main() {
    if len(os.Args) < 2 {
        log.Printf("Usage: watchers <stock>")
        return
    }
    stock := os.Args[1]
    watchers, err := neptune.FindStockWatchers("user-stocks", stock)
    if err != nil {
        panic(err)
    }
    if len(watchers) == 0 {
        fmt.Printf("no watchers found\n")
        return
    }
    slices.SortFunc(watchers, neptune.OrderConnection)
    curr := ""
    fmt.Printf("Stock %s watched by:\n", stock)
    for _, w := range watchers {
        if w.User != curr {
            fmt.Printf("  %s\n", w.User)
            curr = w.User
        }
        fmt.Printf("    connected at %s\n", w.ConnectionId)
    }
}

NEPTUNE_SHOW_ENDPOINTS:neptune/cmd/watchers/main.go

In order to sort the Connections, we need to provide a comparison function, which I've put in the same file as the Connection:
func OrderConnection(left, right *Connection) int {
    ret := strings.Compare(left.User, right.User)
    if ret != 0 {
        return ret
    }
    return strings.Compare(left.ConnectionId, right.ConnectionId)
}

NEPTUNE_SHOW_ENDPOINTS:neptune/internal/neptune/connection.go
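
For what it's worth, on Go 1.22 or later the same two-key ordering could probably be written with cmp.Or, which returns the first of its arguments that is not zero. This is only an alternative sketch, not what is in the repo; the Connection struct is repeated so that the example stands alone.

```go
package main

import (
	"cmp"
	"fmt"
	"slices"
	"strings"
)

type Connection struct {
	User         string
	ConnectionId string
}

// OrderConnection orders by user first and by connection id second.
// cmp.Or returns the first non-zero comparison result, or 0 if all are equal.
func OrderConnection(left, right *Connection) int {
	return cmp.Or(
		strings.Compare(left.User, right.User),
		strings.Compare(left.ConnectionId, right.ConnectionId),
	)
}

func main() {
	conns := []*Connection{
		{User: "user015", ConnectionId: "aa-1"},
		{User: "user003", ConnectionId: "xx-cc-3"},
		{User: "user003", ConnectionId: "bb-2"},
	}
	slices.SortFunc(conns, OrderConnection)
	for _, c := range conns {
		fmt.Println(c.User, c.ConnectionId)
	}
}
```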

And when we run it, we see this output:
Stock UPM6 watched by:
  user003
    connected at xx-cc-3

Deleting Endpoints

When the user disconnects, we will want to delete the associated endpoint. We should be able to find it by matching on connId and then "calling" DELETE:
$ curl https://user-stocks.cluster-ckgvna81hufy.us-east-1.neptune.amazonaws.com:8182/openCypher -d 'query=
  MATCH (e:Endpoint {connId:$connId})
  DELETE (e)
  RETURN e'
-d 'parameters={"connId": "xx-cc-3"}'
{
  "code": "BadRequestException",
  "requestId": "4cf7d73a-332c-4cbf-be7c-88e1df36e933",
  "detailedMessage": "Cannot delete node, because it still has relationships. To delete this node, you must first delete its relationships.",
  "message": "Cannot delete node, because it still has relationships. To delete this node, you must first delete its relationships."
}
Unsurprisingly, we are not allowed to delete a node which still has relationships, and this one is linked to the user node. However, according to the cheat sheet, openCypher has a keyword DETACH to deal with this exact situation.
$ curl https://user-stocks.cluster-ckgvna81hufy.us-east-1.neptune.amazonaws.com:8182/openCypher -d 'query=
  MATCH (e:Endpoint {connId:$connId})
  DETACH DELETE (e)
  RETURN e'
-d 'parameters={"connId": "xx-cc-3"}'
{
  "results": [{
      "e": {
        "~id": "83617f37-c299-4c43-9d56-58e7d97c6b12",
        "~entityType": "node",
        "~labels": [],
        "~properties": {}
      }
    }]
}
It's interesting to me that this node shows up with the ~id that it had when we created it, but its label and properties have been removed before returning it.

Fixing Watchers with no Endpoints

When we run the watchers program again, we see:
no watchers found
Now, I realize at this point that I have significantly changed the semantics of watchers here. When updating FindWatchers above, I was focused on getting the connection back, and that worked for the user with a connection, but now I notice that I am not seeing the watchers on here who are not connected. While that is fine if all I am interested in is notifying connected users, it's not what I intended to happen. (Yes, yes, regression tests, I know.)

The problem, of course, is that I have what in relational terms would be called an "inner" join, but I want a "left" join. Can I do that in openCypher? You betcha. Going back to the cheat sheet, there is a special section on OPTIONAL MATCH which is exactly what we want. If the endpoint is there, it's included. If it's not, null comes back in its place. Let's add that.

First, in the query portion:
func FindStockWatchers(db string, stock string) ([]*Connection, error) {
    svc, err := openNeptune(db)
    if err != nil {
        return nil, err
    }
    query := `
    MATCH (u:User)-[r]->(s:Stock {symbol:$symbol})
    OPTIONAL MATCH (u)-[]->(e:Endpoint)
    RETURN u.username, s.symbol, e.connId
    `
    params := fmt.Sprintf(`{"symbol": "%s"}`, stock)
    linkQuery := neptunedata.ExecuteOpenCypherQueryInput{OpenCypherQuery: aws.String(query), Parameters: aws.String(params)}
    out, err := svc.ExecuteOpenCypherQuery(context.TODO(), &linkQuery)
    if err != nil {
        return nil, err
    }

    results, err := unpack(out.Results)
    if err != nil {
        return nil, err
    }
    var ret []*Connection
    for _, m := range results {
        connId := ""
        cid := m["e.connId"]
        if cid != nil {
            connId = cid.(string)
        }
        ret = append(ret, &Connection{User: m["u.username"].(string), ConnectionId: connId})
    }

    return ret, nil
}

NEPTUNE_OPTIONAL_ENDPOINT:neptune/internal/neptune/findWatchers.go

My feeling is that this code is complicated by the way Go handles type assertions and empty strings, but it is a matter of adding steps and lines rather than any real complexity. Anyway, the key thing is that when there is no endpoint, the row's map exists and contains u.username, but looking up e.connId yields nil, and asserting nil to string would panic.
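
One way to trim those extra steps is Go's two-value ("comma ok") type assertion, which yields the zero value instead of panicking when the value is nil or not a string. A minimal sketch, assuming the unpacked rows are map[string]any; stringOrEmpty is a hypothetical helper, not something in the repo.

```go
package main

import "fmt"

// stringOrEmpty returns row[key] as a string, or "" when the key is missing,
// the value is nil (a JSON null), or the value is not a string.
func stringOrEmpty(row map[string]any, key string) string {
	s, _ := row[key].(string)
	return s
}

func main() {
	connected := map[string]any{"u.username": "user003", "e.connId": "xx-cc-3"}
	unconnected := map[string]any{"u.username": "user015", "e.connId": nil}

	fmt.Printf("%q\n", stringOrEmpty(connected, "e.connId"))   // "xx-cc-3"
	fmt.Printf("%q\n", stringOrEmpty(unconnected, "e.connId")) // ""
	fmt.Printf("%q\n", stringOrEmpty(unconnected, "missing"))  // ""
}
```

The trade-off is that a value of an unexpected type is silently treated as "no endpoint", which may or may not be what you want.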

Much the same happens in the main routine:
func main() {
    if len(os.Args) < 2 {
        log.Printf("Usage: watchers <stock>")
        return
    }
    stock := os.Args[1]
    watchers, err := neptune.FindStockWatchers("user-stocks", stock)
    if err != nil {
        panic(err)
    }
    if len(watchers) == 0 {
        fmt.Printf("no watchers found\n")
        return
    }
    slices.SortFunc(watchers, neptune.OrderConnection)
    curr := ""
    fmt.Printf("Stock %s watched by:\n", stock)
    for _, w := range watchers {
        if w.User != curr {
            fmt.Printf("  %s\n", w.User)
            curr = w.User
        }
        if w.ConnectionId != "" {
            fmt.Printf("    connected at %s\n", w.ConnectionId)
        }
    }
}

NEPTUNE_OPTIONAL_ENDPOINT:neptune/cmd/watchers/main.go

Here, we may get multiple rows back for any given user, and we will get at least one for each user watching the stock. We are not guaranteed a valid ConnectionId, though: when there is no endpoint, FindStockWatchers leaves it as the empty string. So we need to look for that and guard against printing an invalid endpoint.

And, finally, we are back to:
Stock UPM6 watched by:
  user003
  user015
When we have our connection code working, we can check all the other cases also work.

Encoding Connection Updates

We may be further along than it appears, but what we don't have yet is any code to add and delete connections. Let's fix that, starting with the new main() program endpoint:
package main

import (
    "log"
    "os"

    "github.com/gmmapowell/ignorance/neptune/internal/neptune"
)

func main() {
    if len(os.Args) < 3 {
        log.Printf("Usage: endpoint c <user> <connId>")
        log.Printf(" or    endpoint d <connId>")
        return
    }
    command := os.Args[1]
    var err error
    switch command {
    case "c":
        if len(os.Args) != 4 {
            log.Printf("Usage: endpoint c <user> <connId>")
            return
        }
        watcher := os.Args[2]
        connId := os.Args[3]
        err = neptune.ConnectEndpoint("user-stocks", watcher, connId)
    case "d":
        connId := os.Args[2]
        err = neptune.DisconnectEndpoint("user-stocks", connId)
    default:
        log.Printf("Usage: the command must be 'c' or 'd'")
        return
    }
    if err != nil {
        panic(err)
    }
}

NEPTUNE_ENDPOINT_CONNECTOR:neptune/cmd/endpoint/main.go

This is basically just a lot of argument processing, but it comes down to calling one of two functions, ConnectEndpoint and DisconnectEndpoint, which I have put together in the same file.

Here is ConnectEndpoint, which should be fairly familiar:
package neptune

import (
    "context"
    "fmt"

    "github.com/aws/aws-sdk-go-v2/aws"
    "github.com/aws/aws-sdk-go-v2/service/neptunedata"
)

func ConnectEndpoint(db string, watcher string, connId string) error {
    svc, err := openNeptune(db)
    if err != nil {
        return err
    }

    program := `
        MATCH (u:User {username:$username})
        CREATE (e:Endpoint {connId:$connId})
        CREATE (u)-[r:endpoint]->(e)
        RETURN e, r
`
    params := fmt.Sprintf(`{"username": "%s", "connId":"%s"}`, watcher, connId)
    linkQuery := neptunedata.ExecuteOpenCypherQueryInput{OpenCypherQuery: aws.String(program), Parameters: aws.String(params)}
    out, err := svc.ExecuteOpenCypherQuery(context.TODO(), &linkQuery)
    if err != nil {
        return err
    }

    ret, err := unpack(out.Results)
    if err != nil {
        return err
    }

    if len(ret) == 0 {
        return fmt.Errorf("no user found to connect: %s", watcher)
    }

    return nil
}

NEPTUNE_ENDPOINT_CONNECTOR:neptune/internal/neptune/endpoints.go

This is just our usual query infrastructure with the CREATE query we figured out in curl above, and processing to make sure that at least one row of results is returned, otherwise it generates an error that it (presumably) couldn't find the user.
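
One thing to be wary of here: building the parameters with fmt.Sprintf produces invalid JSON if a username or connection id ever contains a quote or a backslash. A hedged alternative is to let encoding/json do the quoting. In this sketch, buildParams is a hypothetical helper name, not something in the repo.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// buildParams encodes query parameters as a JSON object, escaping any
// quotes or backslashes in the values.
func buildParams(params map[string]string) (string, error) {
	bs, err := json.Marshal(params)
	if err != nil {
		return "", err
	}
	return string(bs), nil
}

func main() {
	p, err := buildParams(map[string]string{
		"username": "user003",
		"connId":   `xx-"cc"-3`,
	})
	if err != nil {
		panic(err)
	}
	fmt.Println(p) // {"connId":"xx-\"cc\"-3","username":"user003"}
}
```

The resulting string could then be passed to aws.String in place of the Sprintf result.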

And DisconnectEndpoint:
func DisconnectEndpoint(db string, connId string) error {
    svc, err := openNeptune(db)
    if err != nil {
        return err
    }

    program := `
        MATCH (e:Endpoint {connId:$connId})
        DETACH DELETE (e)
        RETURN e
`
    params := fmt.Sprintf(`{"connId":"%s"}`, connId)
    linkQuery := neptunedata.ExecuteOpenCypherQueryInput{OpenCypherQuery: aws.String(program), Parameters: aws.String(params)}
    out, err := svc.ExecuteOpenCypherQuery(context.TODO(), &linkQuery)
    if err != nil {
        return err
    }

    ret, err := unpack(out.Results)
    if err != nil {
        return err
    }

    if len(ret) == 0 {
        return fmt.Errorf("no connectionId found to disconnect: %s", connId)
    }

    return nil
}

NEPTUNE_ENDPOINT_CONNECTOR:neptune/internal/neptune/endpoints.go

This is the same thing again but with our other (DETACH DELETE) query.

Using this to create an endpoint, we see no feedback because it is written to only provide error feedback:
$ cmd/endpoint/endpoint c user003 xx-cc-3
But we can check that it connected successfully by running our watchers program:
Stock UPM6 watched by:
  user003
    connected at xx-cc-3
  user015
This also confirms that watchers handles both the connected and unconnected cases.

And we can likewise run the disconnect version of endpoint and see it return to no endpoints:
$ cmd/endpoint/endpoint d xx-cc-3
$ cmd/watchers/watchers UPM6
Stock UPM6 watched by:
  user003
  user015

Conclusion

We have successfully managed to navigate the processes associated with having endpoint relationships: creating and deleting them, along with finding the endpoints associated with the users watching a stock. This is basically all the (database) code we need in order to implement a stock watching webapp.

Building such a webapp obviously requires a bunch of other code, mainly around wrapping in a lambda and building out a JavaScript client; and then requires more infrastructure in my deployer. I want to do this, but I'm not sure I've got the energy. On the other hand, I do need to implement a lot of that code in my deployer, so it's as good a testbed as anything.
