Thursday, July 17, 2025

Populating the Tables


Before we can do anything else, we need to start populating the DynamoDB tables and then linking random things together. None of this code is hard, but it does require us to be able to use all the APIs. I've worked with DynamoDB before (but probably not in Go), but I've never so much as touched Neptune.

Let's get started!

Populating and Cleaning DynamoDB

I'm going to create two scripts, create and clean, which set up the initial data and clean up all the data respectively.

Because I've done it before, I'm going to start by writing code to generate some random ticker symbols (with random prices) and some unique usernames, and put all that in DynamoDB. Then I'll add code to delete all of that. Then we can come back and do the linking in Neptune.

I'm feeling bored today, so I'm going to do this slowly and in steps. Step 1 is just to try to get one record inserted into Dynamo, basically just getting used to the API. I'm going to do this directly in the cmd script:
package main

import (
    "context"
    "log"

    "github.com/aws/aws-sdk-go-v2/aws"
    "github.com/aws/aws-sdk-go-v2/config"
    "github.com/aws/aws-sdk-go-v2/feature/dynamodb/attributevalue"
    "github.com/aws/aws-sdk-go-v2/service/dynamodb"
)

type Stock struct {
    Symbol string
    Price  int
}

func main() {
    cfg, err := config.LoadDefaultConfig(context.TODO())
    if err != nil {
        log.Fatal(err)
    }
    svc := dynamodb.NewFromConfig(cfg)

    tableName := "Stocks"

    stock := Stock{
        Symbol: "HWX2",
        Price:  1195,
    }

    av, err := attributevalue.MarshalMap(stock)
    if err != nil {
        log.Fatalf("Got error marshalling new stock item: %s", err)
    }
    input := &dynamodb.PutItemInput{
        Item:      av,
        TableName: aws.String(tableName),
    }

    _, err = svc.PutItem(context.TODO(), input)
    if err != nil {
        log.Fatalf("Got error calling PutItem: %s", err)
    }
}

NEPTUNE_INSERT_1:neptune/cmd/create/main.go

This feels easier than previous experiences I have had in Java, mainly on account of that MarshalMap. Even so, I want to refactor. To keep things simple, I'll do my first bout of refactoring in the cmd script.
type Stock struct {
    Symbol string
    Price  int
}

type Inserter struct {
    svc *dynamodb.Client
}

func (ins *Inserter) Insert(table string, item any) error {
    av, err := attributevalue.MarshalMap(item)
    if err != nil {
        return err
    }

    input := &dynamodb.PutItemInput{
        Item:      av,
        TableName: aws.String(table),
    }

    _, err = ins.svc.PutItem(context.TODO(), input)
    return err
}

func NewInserter() *Inserter {
    cfg, err := config.LoadDefaultConfig(context.TODO())
    if err != nil {
        log.Fatal(err)
    }
    return &Inserter{svc: dynamodb.NewFromConfig(cfg)}
}

func main() {
    inserter := NewInserter()

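    // Insert returns an error, so report it rather than dropping it.
    err := inserter.Insert("Stocks", Stock{
        Symbol: "HWX2",
        Price:  1195,
    })
    if err != nil {
        log.Fatal(err)
    }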
}

NEPTUNE_DDB_REFACTOR:neptune/cmd/create/main.go

So now I have created an Inserter which can (try to) insert any kind of struct into any table. As you can see, the main() function is now very short and straightforward.

Turning to our other program, we want to delete all the records in the table. This is slightly more tricky, since it involves first scanning the table, then deleting everything we find. And scanning is the sort of operation that can "pause" halfway through, so you may have to issue another scan request with a continuation token. On the other hand, since I started by copying the other script, it's already factored to the same degree.
package main

import (
    "context"
    "log"

    "github.com/aws/aws-sdk-go-v2/config"
    "github.com/aws/aws-sdk-go-v2/service/dynamodb"
    "github.com/aws/aws-sdk-go-v2/service/dynamodb/types"
)

type Cleaner struct {
    svc *dynamodb.Client
}

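// Clean deletes every item in table. keys names the table's key attributes
// (the partition key and, if there is one, the sort key).
func (c *Cleaner) Clean(table string, keys ...string) error {
    var startKey map[string]types.AttributeValue
    for {
        // ExclusiveStartKey is nil on the first pass; after that it resumes
        // the scan from where the previous page stopped.
        scanner := dynamodb.ScanInput{TableName: &table, ExclusiveStartKey: startKey}
        result, err := c.svc.Scan(context.TODO(), &scanner)
        if err != nil {
            return err
        }
        for _, it := range result.Items {
            // Copy just the key attributes out of the scanned item.
            key := make(map[string]types.AttributeValue)
            for _, k := range keys {
                key[k] = it[k]
            }
            di := dynamodb.DeleteItemInput{TableName: &table, Key: key}
            _, err = c.svc.DeleteItem(context.TODO(), &di)
            if err != nil {
                return err
            }
        }
        // An empty LastEvaluatedKey means the scan reached the end of the table.
        if len(result.LastEvaluatedKey) == 0 {
            break
        }
        startKey = result.LastEvaluatedKey
    }
    return nil
}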

func NewCleaner() *Cleaner {
    cfg, err := config.LoadDefaultConfig(context.TODO())
    if err != nil {
        log.Fatal(err)
    }
    return &Cleaner{svc: dynamodb.NewFromConfig(cfg)}
}

func main() {
    c := NewCleaner()
    if err := c.Clean("Stocks", "Symbol"); err != nil {
        log.Fatal(err)
    }
}

NEPTUNE_CLEAN_DYNAMO:neptune/cmd/clean/main.go

And then we can move it all into its own package. This leaves the main() function longer than I would really like, but at least it is a reasonable size:
type Stock struct {
    Symbol string
    Price  int
}

func main() {
    inserter, err := dynamo.NewInserter()
    if err != nil {
        log.Fatal(err)
    }
    
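    // Insert returns an error, so report it rather than dropping it.
    err = inserter.Insert("Stocks", Stock{
        Symbol: "HWX2",
        Price:  1195,
    })
    if err != nil {
        log.Fatal(err)
    }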
}

NEPTUNE_DYNAMO_PACKAGE:neptune/cmd/create/main.go

This also puts the Inserter (and the Cleaner, not shown) into its own file:
package dynamo

import (
    "context"

    "github.com/aws/aws-sdk-go-v2/aws"
    "github.com/aws/aws-sdk-go-v2/feature/dynamodb/attributevalue"
    "github.com/aws/aws-sdk-go-v2/service/dynamodb"
)

type Inserter struct {
    svc *dynamodb.Client
}

func (ins *Inserter) Insert(table string, item any) error {
    av, err := attributevalue.MarshalMap(item)
    if err != nil {
        return err
    }

    input := &dynamodb.PutItemInput{
        Item:      av,
        TableName: aws.String(table),
    }

    _, err = ins.svc.PutItem(context.TODO(), input)
    return err
}

func NewInserter() (*Inserter, error) {
    svc, err := openDynamo()
    if err != nil {
        return nil, err
    }
    return &Inserter{svc: svc}, nil
}

NEPTUNE_DYNAMO_PACKAGE:neptune/internal/dynamo/insert.go

The code to connect to DynamoDB and build a service instance is extracted and shared in base.go:
package dynamo

import (
    "context"

    "github.com/aws/aws-sdk-go-v2/config"
    "github.com/aws/aws-sdk-go-v2/service/dynamodb"
)

func openDynamo() (*dynamodb.Client, error) {
    cfg, err := config.LoadDefaultConfig(context.TODO())
    if err != nil {
        return nil, err
    }
    return dynamodb.NewFromConfig(cfg), nil
}

NEPTUNE_DYNAMO_PACKAGE:neptune/internal/dynamo/base.go

OK, it's starting to feel like I'm ignoring Neptune - which is, after all, the purpose of this whole exercise.

Creating Records in Neptune

Unlike Dynamo, which for all its sharp corners is a simple API, Neptune is a conglomeration of different bits and pieces bolted together. For starters, it has two different data models you can use (property graphs and RDF; sorry, you have to scroll down). There are then three different query languages: two for property graphs (Gremlin and openCypher) and one for RDF graphs (SPARQL). Choosing between these is somewhat complicated, but I'm fairly certain that I want to go with property graphs and openCypher.

This then seems to want you to use a third-party library, of which Neo4j seems to be the "standard", but then there are all kinds of warnings about how the language it supports is not entirely consistent with the openCypher standard. This then needs to be used with custom Neptune endpoints, which I don't think we even have yet. However, digging through the sample code, "they" use something else called neptunedata and, with a bit more digging, I managed to find the Go implementation of that. But as that is the only use of it I found, I'm pretty much on my own. Nothing new there: that's why we're here and not in production code.

So, let's get started.

Remember that with Neptune, we need to build a graph and that means inserting both nodes and edges. We want one node in Neptune for every Item in DynamoDB, and then we want an edge for every relationship (yet to be done, partly because we don't have any users yet).

So let's add code to create/main.go to insert a node into Neptune. The basic mechanism for interacting with openCypher is much the same as it is with a relational database using SQL: everything is a query and you need to know the precise syntax of the query language. AWS provide a "cheat sheet" (i.e. documentation in their github repo) and we will be using this, e.g. to know how to create a node. In order to get this to work, we need to use parameters, which are one more poorly explained feature (in the API), so we'll have to guess.

In summary, then, I've added this code to implement the NodeCreator which roughly corresponds to the Inserter we already had for Dynamo.
type NodeCreator struct {
    svc *neptunedata.Client
}

func (nc *NodeCreator) Insert(label string) error {
    // The property map goes inside the node's parentheses.
    create := "CREATE (a:stock {Symbol: $symbol})"
    // Parameters is a JSON object; its keys must match the $names in the query.
    params := fmt.Sprintf(`{"symbol": "%s"}`, label)
    insertQuery := neptunedata.ExecuteOpenCypherQueryInput{OpenCypherQuery: aws.String(create), Parameters: aws.String(params)}
    _, err := nc.svc.ExecuteOpenCypherQuery(context.TODO(), &insertQuery)
    return err
}

func NewNodeCreator() (*NodeCreator, error) {
    svc, err := openNeptune()
    if err != nil {
        return nil, err
    }
    return &NodeCreator{svc: svc}, nil
}

func openNeptune() (*neptunedata.Client, error) {
    cfg, err := config.LoadDefaultConfig(context.TODO())
    if err != nil {
        return nil, err
    }
    return neptunedata.NewFromConfig(cfg), nil
}

NEPTUNE_NODE_CREATOR:neptune/cmd/create/main.go
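
The code above builds the parameters string with fmt.Sprintf, which is fine for a symbol like HWX2 but would produce broken JSON for any value containing a quote or a backslash. A possible alternative, sketched here with only the standard library, is to marshal a map with encoding/json. The helper name cypherParams is made up, and the query string reflects my understanding of openCypher's CREATE syntax (property map inside the node parentheses) rather than anything I have run against Neptune.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// cypherParams encodes query parameters as a JSON object string.
// The keys must match the $names used in the query.
func cypherParams(params map[string]any) (string, error) {
	b, err := json.Marshal(params)
	if err != nil {
		return "", err
	}
	return string(b), nil
}

func main() {
	query := "CREATE (a:stock {Symbol: $symbol})"
	params, err := cypherParams(map[string]any{"symbol": "HWX2"})
	if err != nil {
		fmt.Println("encoding parameters:", err)
		return
	}
	fmt.Println(query)
	fmt.Println(params)
}
```

The resulting string would then be passed as the Parameters field of the ExecuteOpenCypherQueryInput in place of the Sprintf result.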

And then we can use this from main():
    nodeCreator, err := NewNodeCreator()
    if err != nil {
        log.Fatal(err)
    }

    err = nodeCreator.Insert("HWX2")
    if err != nil {
        log.Fatal(err)
    }

NEPTUNE_NODE_CREATOR:neptune/cmd/create/main.go

Does this work? Of course not. The better question is, what is the exact error message and where should we direct our attention first?
2025/07/11 07:44:06 operation error neptunedata: ExecuteOpenCypherQuery, https response error StatusCode: 0, RequestID: , request send failed, Post "https://neptune-db.us-east-1.amazonaws.com/opencypher": dial tcp: lookup neptune-db.us-east-1.amazonaws.com: no such host
In other words, it can't find a Neptune instance to talk to. Given that I haven't specifically created an instance, nor have I specifically created an endpoint, the fact that there isn't anything to talk to doesn't surprise me. On top of that, even if one or the other does exist, I know it's in a VPC and I haven't done anything to connect to that. In short, we have a lot of work still to do. But we have at least written some code.
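
The error message shows the client falling back to a generic regional host and appending /opencypher to it, so one of the missing pieces is telling the client where the cluster actually lives. As a small sketch of that step, the code below builds a base URL from a cluster host name. The NEPTUNE_ENDPOINT variable name and the baseEndpoint helper are assumptions of mine; 8182 is Neptune's default port. I believe the SDK's per-client BaseEndpoint option is where such a URL would go, but that is untested here, and it does nothing about the VPC problem.

```go
package main

import (
	"errors"
	"fmt"
	"os"
)

// neptunePort is the default port for a Neptune cluster.
const neptunePort = 8182

// baseEndpoint turns a cluster host name into the base URL for the client.
// It deliberately omits the /opencypher path, since the error message above
// suggests the client appends that itself.
func baseEndpoint(host string) (string, error) {
	if host == "" {
		return "", errors.New("no Neptune cluster endpoint configured")
	}
	return fmt.Sprintf("https://%s:%d", host, neptunePort), nil
}

func main() {
	// NEPTUNE_ENDPOINT is a hypothetical variable name for this sketch.
	url, err := baseEndpoint(os.Getenv("NEPTUNE_ENDPOINT"))
	if err != nil {
		fmt.Println(err)
		return
	}
	fmt.Println(url)
}
```

Failing early when no endpoint is configured at least gives a clearer message than a DNS lookup failure on a host that was never going to exist.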

Conclusion

We have managed to write some code to populate the tables. The Dynamo code even works and is in quite good shape.

On the other hand, the Neptune code does not even work and still needs refactoring and extracting.
