Sunday, January 26, 2025

Configuring Nodes

It is starting to feel (to me, at least) as if we are making progress towards having an actual blockchain.

The biggest obstacle remaining is that the nodes are all operating independently, and none of them are checking each other's work, let alone certifying it in a non-repudiable fashion.

The next three posts are going to tackle that, as we divide the work up:
  • In this post, we are going to improve configurability, so that nodes are at least aware of each other's existence;
  • In the next post, we will handle the communication by making sure that each node sends and receives the transactions and blocks it is processing;
  • In the third post, we will check and store all the incoming messages.
We still have quite a bit of work to do after that, but don't panic! We'll take it one step at a time.

The Current State

As it currently stands, node configuration is a mess. This is mainly because I haven't thought about it very much. Or rather, because the products of my thoughts have not yet made it into the code. So let's start by reviewing my thoughts.

There are three different ways in which I visualize a node starting:
  • As a chainledger node from the command line;
  • As a node within a harness test;
  • As a lambda function (or equivalent) launched in an AWS environment.
Now, we'll come back to thinking about the third one later, but clearly these three are significantly different from each other. And want to be configured differently. For now, it's reasonable to assume that configuring a node on the command line would be most easily done by reading a (JSON?) file. Within the harness, I would expect the harness to have information about all the nodes, and to be able to slice and dice this to give each node the information it specifically needs about the world. In the AWS universe, it is possible to read from a file in S3, a table in DynamoDB, parameters in the parameter store, etc. (we will probably use some combination of all these).

Clearly, however, from the perspective of a node starting up, it wants an interface that can hide all of this.

In order to run properly, each node needs to know at least:
  • Its own name, which is the base URL other nodes will use to communicate with it;
  • A port to listen on;
  • A private signing key;
As well as information about all the other nodes, specifically, a record for each remote node, detailing:
  • A URL to communicate with it;
  • A public key to check the signatures on the records it sends.
This is not at all what we have at the moment. To review:
  • In config/config.go, we have a NodeConfig, but it's a struct, not an interface:
type NodeConfig struct {
    Name     *url.URL
    ListenOn string
    NodeKey  *rsa.PrivateKey
}

STORE_BLOCK:internal/config/config.go

  • In chainledger/main.go, we just hardcode in a couple of values and call ReadNodeConfig (inside Start()) to generate a random key:
func main() {
    url, _ := url.Parse("https://localhost:5001")
    node := clienthandler.NewListenerNode(url, ":5001")
    node.Start()
}

STORE_BLOCK:cmd/chainledger/main.go

  • In the harness, we read the same couple of values for each node from the configuration file, and likewise call ReadNodeConfig inside Start.
func (nc *NodeConfig) UnmarshalJSON(bs []byte) error {
    var wire struct {
        Name     string
        ListenOn string
    }
    if err := json.Unmarshal(bs, &wire); err != nil {
        return err
    }
    if url, err := url.Parse(wire.Name); err == nil {
        nc.Name = url
    } else {
        return err
    }
    nc.ListenOn = wire.ListenOn

    return nil
}

STORE_BLOCK:internal/config/config.go

  • And ReadNodeConfig doesn't do any reading, but generates a new key pair:
func ReadNodeConfig(name *url.URL, addr string) (*NodeConfig, error) {
    pk, err := rsa.GenerateKey(rand.Reader, 2048)
    if err != nil {
        return nil, err
    }

    return &NodeConfig{Name: name, ListenOn: addr, NodeKey: pk}, nil
}

STORE_BLOCK:internal/config/config.go

  • The common code to run a node (ListenerNode) is created in NewListenerNode:
func NewListenerNode(name *url.URL, addr string) Node {
    return &ListenerNode{name: name, addr: addr, Control: make(types.PingBack)}
}

STORE_BLOCK:internal/clienthandler/node.go

  • And then Start looks like this:
func (node *ListenerNode) Start() {
    log.Printf("starting chainledger node %s\n", node.name)
    clock := &helpers.ClockLive{}
    hasher := &helpers.SHA512Factory{}
    signer := &helpers.RSASigner{}
    config, err := config.ReadNodeConfig(node.name, node.addr)
    if err != nil {
        fmt.Printf("error reading config: %s\n", err)
        return
    }
    pending := storage.NewMemoryPendingStorage()
    resolver := NewResolver(clock, hasher, signer, config.NodeKey, pending)
    node.journaller = storage.NewJournaller(node.name.String())
    node.runBlockBuilder(clock, node.journaller, config)
    node.startAPIListener(resolver, node.journaller)
}

STORE_BLOCK:internal/clienthandler/node.go

So let's rip it up and start again.

The Interfaces

Naming things is one of the two irreducibly hard problems of computer science. I'm not very happy with the names I've come up with in this section, so don't be surprised if I change them at some point.

We have two different "sorts" of nodes: ones that we want to launch, and ones that we want to talk to. Obviously, each node fills both roles "in the real world", but within the code of a single node, the node that is running is qualitatively different from the others. I have distinguished these as just a plain NodeConfig and a LaunchableNodeConfig if it has the additional properties needed to be launched. When we try and initialize a ListenerNode, we will pass it a LaunchableNodeConfig which, in turn, contains a list of the OtherNodes we want to talk to, each of which is a NodeConfig. Clear?
type NodeConfig interface {
    Name() *url.URL
    PublicKey() *rsa.PublicKey
}

type LaunchableNodeConfig interface {
    NodeConfig
    ListenOn() string
    PrivateKey() *rsa.PrivateKey
    OtherNodes() []NodeConfig
}

REDO_CONFIG:internal/config/config.go

Here the use of NodeConfig in LaunchableNodeConfig is approximately equivalent to saying that the interface LaunchableNodeConfig extends the interface NodeConfig. Or, in other words, all the methods declared in NodeConfig are also part of LaunchableNodeConfig.
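
Since this embedding comes up again in the harness code, here is a minimal illustration of what it buys us (the describe functions are hypothetical, not part of the project): anything satisfying LaunchableNodeConfig automatically satisfies NodeConfig, so it can be passed wherever the smaller interface is expected:
func describe(nc NodeConfig) string {
    // accepts the smaller interface
    return nc.Name().String()
}

func describeLaunchable(lnc LaunchableNodeConfig) string {
    // lnc also satisfies NodeConfig, so no conversion is needed
    return describe(lnc)
}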

The rest of the code that was in config.go has been thrown away. We are going to do things differently.

Using the Configuration

Now that we have an interface, we can rework the code in Start. Firstly, here is the new code for NewListenerNode:
func NewListenerNode(config config.LaunchableNodeConfig) Node {
    return &ListenerNode{config: config, Control: make(types.PingBack)}
}

REDO_CONFIG:internal/clienthandler/node.go

Instead of taking a name and an address, this now takes a LaunchableNodeConfig, which has all the information we need to start a node. It stores this in the ListenerNode struct:
type ListenerNode struct {
    config     config.LaunchableNodeConfig
    Control    types.PingBack
    server     *http.Server
    journaller storage.Journaller
}

REDO_CONFIG:internal/clienthandler/node.go

And inside Start we now have everything we need to pass around:
func (node *ListenerNode) Start() {
    log.Printf("starting chainledger node %s\n", node.Name())
    clock := &helpers.ClockLive{}
    hasher := &helpers.SHA512Factory{}
    signer := &helpers.RSASigner{}
    pending := storage.NewMemoryPendingStorage()
    resolver := NewResolver(clock, hasher, signer, node.config.PrivateKey(), pending)
    node.journaller = storage.NewJournaller(node.Name())
    node.runBlockBuilder(clock, node.journaller, node.config)
    node.startAPIListener(resolver, node.journaller)
}

REDO_CONFIG:internal/clienthandler/node.go

So we need to update the chainledger and harness commands to provide this.

The chainledger command

The simpler of the two is the chainledger command, but it is also the more dramatic, because it is such a big change.

The chainledger command now takes an argument and reads its configuration from that file:
func main() {
    if len(os.Args) < 2 {
        fmt.Println("Usage: chainledger <config>")
        return
    }
    config := config.ReadNodeConfig(os.Args[1])
    node := clienthandler.NewListenerNode(config)
    node.Start()
}

REDO_CONFIG:cmd/chainledger/main.go

Although ReadNodeConfig has the same name as a function we used to have, it is a completely new function in a completely new file in the config directory:
package config

import (
    "crypto/rsa"
    "crypto/x509"
    "encoding/base64"
    "encoding/json"

    "io"
    "net/url"
    "os"
)

type NodeJsonConfig struct {
    Name       string
    ListenOn   string
    PrivateKey string
    PublicKey  string
    OtherNodes []NodeJsonConfig
}

type NodeConfigWrapper struct {
    config  NodeJsonConfig
    url     *url.URL
    private *rsa.PrivateKey
    public  *rsa.PublicKey
    others  []NodeConfig
}

// ListenOn implements LaunchableNodeConfig.
func (n *NodeConfigWrapper) ListenOn() string {
    return n.config.ListenOn
}

// Name implements LaunchableNodeConfig.
func (n *NodeConfigWrapper) Name() *url.URL {
    return n.url
}

// OtherNodes implements LaunchableNodeConfig.
func (n *NodeConfigWrapper) OtherNodes() []NodeConfig {
    return n.others
}

// PrivateKey implements LaunchableNodeConfig.
func (n *NodeConfigWrapper) PrivateKey() *rsa.PrivateKey {
    return n.private
}

// PublicKey implements LaunchableNodeConfig.
func (n *NodeConfigWrapper) PublicKey() *rsa.PublicKey {
    return n.public
}

func ReadNodeConfig(file string) LaunchableNodeConfig {
    fd, err := os.Open(file)
    if err != nil {
        panic(err)
    }
    defer fd.Close()

    bytes, _ := io.ReadAll(fd)
    var config NodeJsonConfig
    json.Unmarshal(bytes, &config)

    url, err := url.Parse(config.Name)
    if err != nil {
        panic("cannot parse url " + config.Name)
    }

    pkbs, err := base64.StdEncoding.DecodeString(config.PrivateKey)
    if err != nil {
        panic("cannot parse base64 private key " + config.PrivateKey)
    }
    pk, err := x509.ParsePKCS1PrivateKey(pkbs)
    if err != nil {
        panic("cannot parse private key after conversion from " + config.PrivateKey)
    }

    others := make([]NodeConfig, len(config.OtherNodes))
    for i, json := range config.OtherNodes {
        bs, err := base64.StdEncoding.DecodeString(json.PublicKey)
        if err != nil {
            panic("cannot parse base64 public key " + json.PublicKey)
        }
        pub, err := x509.ParsePKCS1PublicKey(bs)
        if err != nil {
            panic("cannot parse public key after conversion from " + json.PublicKey)
        }

        others[i] = &NodeConfigWrapper{config: json, public: pub}
    }
    return &NodeConfigWrapper{config: config, url: url, private: pk, public: &pk.PublicKey, others: others}
}

REDO_CONFIG:internal/config/node_json_config.go

There's a lot going on here, so let's take it slowly.

Because we want to parse a JSON file using the standard json.Unmarshal code provided by Go, we declare a type NodeJsonConfig which exactly matches the JSON input (in terms of strings and nested NodeJsonConfig objects for the remote nodes). We then separately define a NodeConfigWrapper which stores both the underlying configuration and the "parsed" versions of url, private key and public key.

In ReadNodeConfig, the first few lines open the file (checking for errors and deferring the closing of it):
    fd, err := os.Open(file)
    if err != nil {
        panic(err)
    }
    defer fd.Close()
The next few read the file and then parse out the JSON into a NodeJsonConfig struct:
    bytes, _ := io.ReadAll(fd)
    var config NodeJsonConfig
    json.Unmarshal(bytes, &config)
And then the rest of the code turns the various string values (including the nested string values inside the OtherNodes) into the appropriate internal objects. The final line builds up a NodeConfigWrapper object, which can then handle all the config requests easily:
    return &NodeConfigWrapper{config: config, url: url, private: pk, public: &pk.PublicKey, others: others}
We also need to provide an actual JSON configuration file (or two). Here's the first one, for node "5001":
{
        "name": "http://localhost:5001",
        "listenOn": ":5001",
        "privateKey": "MIIEowIBAAKCAQEArcZdpZM6fipqtMMmts3xkD7s6PQWNnF0KCYEESRIebSFX0fKVC8urvF6wkf4EMFDT36bDtzg3Lh/fxaCYadxaxxs36M1MpYRoBi9CX/VyIIiwpej7Zccm2cfGSAghy48ArAX2SPZS0EGEjTNBuVSh+gkFsy3rQkmQs8/XFR5C9iPhpzUCkqhue6k9euyfN14YoOdEB1xlfp42YEXISuhWoMNyN8Qb4qk39JxxsYE7YBxUbIN6gB7Hi8eAoI6bbcITUifGP4Ax0t/O9YnO/kL6h+hEECK4izQU8kKvVE4jNoBScBwfQChD48vFgNdcDAs+4cwyJkMebV8FXplRIKMDwIDAQABAoIBAQCWE/NcxEKII+n0I3aT+ljd0vqYVfW5H1LKOcrZYxSUx6tIFqBPBFC1FiiHEdDT55VSWm1f8LLi7RRvlekUnZ/+eZYtrq6K+cBPHA5m3disSnfqxzv0PcWfEPhyoqR1GyEI0TxHdAZ+T7IGl0Na6ULVzU8dwb//2R8KJCL8gpfn+bui99835zfTCJj0NTTgfb78VpiSASZ2UJK03YZ0w3n+RiuehBErPVe8mxpdQrucgavIvZvCgnifF25FmWHoH3thEM7Mcvauug3qhZeAEeU+4OhNd8kYX5N5AmtvjDWbGzgJMbZZ7/VxIOR+cv9PPwxXK31vGLC3O0pFMXCW0AhBAoGBANtKtHF7vyV1QCibTHf+kRCDty6cNua4HlMy7ICyy7Lnoz/e0Y6FR8JIxTtyr6+wsd0IsefC+X/UTw+8HE2kSgTyqEls/EW07ot1b8AtpY4OR3rbQ1tYGYtM1nacK2goh0E0Hzj+snx6OHxeOe3GK6S3ydqDxvHzlQaGsFUr4DrdAoGBAMrdIGx44LxNpzshD90TzD7K1gi8zZLUWRW9zzC1UoNM/TaSmRsVgfCe2r2U58v1t9rCm5w4L7XYS1D5nigfEQSndi5jhdLM9Zco/B8rYBiBLRzFN458GvWZ5jhJRu9eRt4rXkdl0QW2tSYAl0Jsnw6M8Kkwtgafi7y6oJH1Y2XbAoGAPnU2k6P1O0v77BTfYMXmt1dskx/3GxuRt4yng7hpABmti4GBGiCn4ZQsaNQvadDft97EHQiRW3Ey235uaUbDtkkO2WrrJ0dzMdFO9OOLZbx3a2yL8LZVADHwW3P7gP0aGN4pjmgsmfuNnw6PXUO2JoIaQdyKi1sfNO6jxn5qrRkCgYAoZe6+Czhd52zlFoltMjMbUhNbfBXIJqdy7/Cht4ouAZfvVTROM3ND8q6G0G90q4Moela4vmup39/nyT3YqY8fCSY8yK7ussg5iPzkTCP/3UGZmCCfLFHGFRbGoLkSlAiy15oXx8vfQmpCnh2BKdZm9GQ8nSmymfUe6V9ukZpwvwKBgBuEiNLS2TE8E4V6bhP9mMjDFP3hhIm9BngnJECCW7RdPIiNpwqd4LngUeM5xbRcOoxPBMzXtfJSazFDJcO0mXZgqTDr4cwSon3fVhboyim4JFHWIo5fgnRoH5m6Ty11SzaT73pLsa8g83VECBgR/oWSQqn2EoqYs4xjbCtBBjqX",
        "otherNodes": [
            {"name": "http://localhost:5002", "publicKey": "MIIBCgKCAQEArRX2JO4Shwb1dsw6/3vIV7aTDWWjEHvI8sYsV3qcRt6pQGMlmLu8+h5Wn76iuM5+TIfTJu8Ct3x/xeD0DrGWgjjTsb8ehMnkzviU+qKOWkeDzqmxRWZNlfayZRxl4gAC8JShQA8mGTs2im8EcJTFP6FsX7aBBpIXiM0C7JHKnmmYGhHJixHl4fPxdnfeunqgJWNuQNZ0sYgcdQcwgZoAAcZUVbLUOLKvkT4odovQLo7knVlfa+2rDt6hJ00v5Q17OCedNyYD16Rp7JBGeV8d9M7ZD7+/gFKzRfSfFONiNO0wXJo4LtgVFMZ3Jr3Z493uOb/po4IR+Ui+ij8YdECsXQIDAQAB"}
        ]
}

REDO_CONFIG:config/nodes/node5001.json

Where did those private and public keys come from? Well, I created a new command called keypair:
package main

import (
    "crypto/rand"
    "crypto/rsa"
    "crypto/x509"
    "encoding/base64"
    "fmt"
)

func main() {
    pk, err := rsa.GenerateKey(rand.Reader, 2048)
    if err != nil {
        panic("could not generate key")
    }
    fmt.Printf("private key: %s\n", base64.StdEncoding.EncodeToString(x509.MarshalPKCS1PrivateKey(pk)))
    fmt.Printf("public  key: %s\n", base64.StdEncoding.EncodeToString(x509.MarshalPKCS1PublicKey(&pk.PublicKey)))
}

REDO_CONFIG:cmd/keypair/main.go

This command generates a new keypair every time it is run, so that you can initialize all the different nodes. This is a tedious business, but in reality you have to do something like this.
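Running it is just a matter of something like go run ./cmd/keypair (or however you prefer to build and run the commands in this layout), which prints the two encoded keys: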
private key: MIIEpAIBAAKCAQEAwODwdGwL7jGUG7yuuTvRjYz3fQZUrulj5ULzfPKVR01IFQv7GYdWuaRVJtKd6dTDSYRge0tcUp4X9yFiIMBRWVZAOInRHUE8ISN2eGLuznQdRK73U8Dr31l91rrVcp3WGw594Bksmvb47oDZM6Zmywiv5niH0IQ3CHu8BD7fXwo4GTQ4kC7UAVks2HikKUpInsW+tR+yPgajfNsOPM/iiRVG/wTNRnRoRLi0u7+WH14fBTQdA6uvAIxik+dNtQT291hPWL7QcSK5WnNTjqfHwYAmtHTplN2XkLRc0G7YD7l4ZD4EqAFFXK8vmkdJSfIHxmHZmrd1k0WOxDOcsKywOQIDAQABAoIBAFd8L9S+xVKPDlzeYmoGZfBMhl0hJ/wGRJdSnNqJtYgX16AkRQq5Rm8ByNXJJnNPXBzWfGSwM/oNV1VywO2WDc/1vT9n03/vfPSS/0NvrF3ccQIcUnacxOAT2W4yZGqOiPTQx+uDv6WybArSSrKQwYNKN27UMNY1gjjI1ukeE3cpg50V/FrAXZEUUcvh9VS8ixbF9VwCV2b0KwghZjyWQxxdoeCyFssRahfNUyzWSs2ZCN0zFmpQLdNq6/cx94ofnEUZDHuOoSVLCMZMvPcYeg/YlueIU3Z9giHRPw3t7eMJhw1C+7HhwsqfOUlmVWwvkgfYN8fXlsyXgLU627bRmLkCgYEA0zBfjWw6DvW3sg0yka0l3Qiz0TXwtypbhVFydr8JQfKfVIjop0adwzWf9a2vAhJWbmMt+edtOx+FV0AUViP0sT+yhTnq092nS+gGEQuXTqbODW6XKXwnSHS55qo+/FQ8XDiSsuqBkx25BFQvVDWX5JYCjA/m8tv14yQuUOaWgFsCgYEA6c32+8batQPCqaijbtV779IRwRxjrIqjXf/CYJBs118uvbf98SVvxe3XmMgoP0aCArzTLLn53ZYHIrvbhWNbwvs0t7y6We5Ep5nSfdDIo5w7TGjsc3osnXv0qrB8rcmm4J7YfVOgsQ3fhTHgOw5Zf5Dx6qirSLirV6tJ9b2+NfsCgYEAuSyZG+/hmGxrfXuE86bWpFCVGsQpJPHG/cbEjspC28hZXE4PcVzByAClGU4JPc/GaVQdZBo/9K9Ww4I0UrOEQkaPybFW7h5UKoJvj1KSgSxRUAXAFWf/KdDvkAmG4Mkbg+E3ABoPM2fEar9GIJg9bvj5ksX+wsOLfnajBdyp6jECgYAF+qpyTeeR8YKs7A8h6nu86lZh5eP2qaT75mqGJati5qA/YdEwtZBiM27sDVJaK+dvQnz0C92D+S49iShYBO530gzLFhx96EYBM0HazdgTtw8dKSHC4kD51g2vv8uwdhO6ctV+fwEBBiXNNjVRzVAknwRQx/d5aJ+ZIlxF2JBguQKBgQCsWwfln+HK4jWftaJTUbzIQ8dQmogIYTuF3aSc9OeB5222VNO8IeeYttvxI4U1xv3g+JSOIFnTJ+QdX+4LitdiKlqPy8lYV9DcpLLgZQhOTLgilXtcPVhTddSwhcEni9N1UZLu+HO1DF39UN8OYkh0fAmy/CEWCoUaImOwbYZVIg==
public key: MIIBCgKCAQEAwODwdGwL7jGUG7yuuTvRjYz3fQZUrulj5ULzfPKVR01IFQv7GYdWuaRVJtKd6dTDSYRge0tcUp4X9yFiIMBRWVZAOInRHUE8ISN2eGLuznQdRK73U8Dr31l91rrVcp3WGw594Bksmvb47oDZM6Zmywiv5niH0IQ3CHu8BD7fXwo4GTQ4kC7UAVks2HikKUpInsW+tR+yPgajfNsOPM/iiRVG/wTNRnRoRLi0u7+WH14fBTQdA6uvAIxik+dNtQT291hPWL7QcSK5WnNTjqfHwYAmtHTplN2XkLRc0G7YD7l4ZD4EqAFFXK8vmkdJSfIHxmHZmrd1k0WOxDOcsKywOQIDAQAB
I then just copied and pasted the output into the files. Obviously, if you are doing this seriously, you need to take a lot more care about how you store (and where you show) your private keys.

The harness

The harness is more complicated by far, for two reasons. Firstly, it has to handle multiple nodes. Secondly, I want it to infer from a single data source all the configurations for all the nodes and all the clients. I don't want to have to explain to each node what all the other nodes are (although I may want the ability to fudge something later so that I can test what happens when one or more nodes are misconfigured). And I don't want to go through the tedious business of hand-generating public and private keys when the harness can do all of that itself.

In reworking this, I also decided that my previous layout with nodes and clients separate was not the best fit for this brave new world, so I reworked the configuration like so:
{
    "nodes": [{
        "name": "http://localhost:5001",
        "listenOn": ":5001",
        "clients": [
            { "user": "https://user1.com/", "count": 10 },
            { "user": "https://user2.com/", "count": 2 }
        ]
    },
    {
        "name": "http://localhost:5002",
        "listenOn": ":5002",
        "clients": [
            { "user": "https://user1.com/", "count": 5 },
            { "user": "https://user2.com/", "count": 6 }
        ]
    }]
}

REDO_CONFIG:config/harness/node_2.json

Now the clients associated with each node are embedded within the definition of the node. I commented at the time that I wanted them separate so that I could run clients without running nodes. If I still want to do that, I will have to figure out how to make that happen. The obvious thing would be to remove the listenOn field, since that would stop the nodes being launched. We could also add an extra launch field on each node which, when set to true, would launch the node and, when set to false, would only launch the clients.

To read this configuration, I have used much the same technique as for the single node, of unpacking the JSON into a "holding" configuration, and then building out the actual configurations on top of that:
type HarnessConfig struct {
    Nodes []*HarnessNode
    keys  map[string]*rsa.PrivateKey
}

type HarnessNode struct {
    Name     string
    ListenOn string
    Clients  []*CliConfig
    url      *url.URL
}

type CliConfig struct {
    User  string
    Count int
}

REDO_CONFIG:internal/harness/config.go

Here HarnessConfig holds the overall configuration - a list of nodes (we'll come back to the map keys). Each node is represented by a HarnessNode (we'll come back to the url), and each of the clients is read into a CliConfig struct.

This is read from the configuration file by ReadConfig as before, although the contents of the function have changed dramatically:
func ReadConfig(file string) Config {
    fd, err := os.Open(file)
    if err != nil {
        panic(err)
    }
    defer fd.Close()

    bytes, _ := io.ReadAll(fd)
    var ret HarnessConfig
    json.Unmarshal(bytes, &ret)
    ret.keys = make(map[string]*rsa.PrivateKey)

    for _, n := range ret.Nodes {
        name := n.Name
        url, err := url.Parse(name)
        if err != nil {
            panic("could not parse name " + name)
        }
        n.url = url
        pk, err := rsa.GenerateKey(rand.Reader, 2048)
        if err != nil {
            panic("key generation failed")
        }

        ret.keys[name] = pk
    }

    return &ret
}

REDO_CONFIG:internal/harness/config.go

The first two blocks operate in much the same way as the node configuration above, but note that we slip in an initialization of the keys map.

The range loop iterates over all the nodes, parsing the node name and storing it in url and then generating a private key and storing it in the keys map. (We could have chosen different strategies for storing these; this is just what came naturally to me at the time.)

This is basically a direct representation of the data that we read from the harness configuration file. We want to use this information to generate information about:
  • each node we want to launch;
  • each client we want to launch;
  • all the remote nodes as viewed from any given node.
To do this, we start by declaring an interface harness.Config like so:
type Config interface {
    NodeNames() []string
    Launcher(forNode string) config.LaunchableNodeConfig
    Remote(forNode string) config.NodeConfig
    ClientsFor(forNode string) []*CliConfig
}

REDO_CONFIG:internal/harness/config.go

which represents all the operations we will want to carry out from within the harness driver.

HarnessConfig then implements these various methods given the data it has loaded and generated.

It can return the list of node names thus:
func (c *HarnessConfig) NodeNames() []string {
    ret := make([]string, len(c.Nodes))
    for i, n := range c.Nodes {
        ret[i] = n.Name
    }
    return ret
}

REDO_CONFIG:internal/harness/config.go

This allocates a slice of strings, then goes through the list of nodes storing the name of each node in it, and finally returns the slice. Remote and ClientsFor similarly scan the list of nodes, returning the appropriate entries:
func (c *HarnessConfig) Remote(forNode string) config.NodeConfig {
    for _, n := range c.Nodes {
        if n.Name == forNode {
            return &HarnessRemote{from: n, public: &c.keys[forNode].PublicKey}
        }
    }
    panic("no node found for " + forNode)
}

// ClientsPerNode implements Config.
func (c *HarnessConfig) ClientsFor(forNode string) []*CliConfig {
    for _, n := range c.Nodes {
        if n.Name == forNode {
            return n.Clients
        }
    }
    panic("no node found for " + forNode)
}

REDO_CONFIG:internal/harness/config.go

(As an aside, I notice when reviewing this that while I have refactored this to change the function names, the comments have not been updated. These comments seem cool, but if they are not going to be kept up to date, they really aren't.)

Finally, Launcher does much the same thing, but it returns a HarnessLauncher which is quite a complex beast in its own right.
func (c *HarnessConfig) Launcher(forNode string) config.LaunchableNodeConfig {
    for _, n := range c.Nodes {
        if n.Name == forNode {
            return &HarnessLauncher{config: c, launching: n, private: c.keys[n.Name], public: &c.keys[n.Name].PublicKey}
        }
    }
    panic("no node found for " + forNode)
}

REDO_CONFIG:internal/harness/config.go

So much so that I put it in its own file, which I'll reproduce here with almost no commentary. It's much like HarnessConfig but on a smaller scale.
package harness

import (
    "crypto/rsa"
    "net/url"

    "github.com/gmmapowell/ChainLedger/internal/config"
)

type HarnessLauncher struct {
    config    *HarnessConfig
    launching *HarnessNode
    private   *rsa.PrivateKey
    public    *rsa.PublicKey
}

// Name implements config.LaunchableNodeConfig.
func (h *HarnessLauncher) Name() *url.URL {
    return h.launching.url
}

// PublicKey implements config.LaunchableNodeConfig.
func (h *HarnessLauncher) PublicKey() *rsa.PublicKey {
    return &h.config.keys[h.launching.Name].PublicKey
}

// ListenOn implements config.LaunchableNodeConfig.
func (h *HarnessLauncher) ListenOn() string {
    return h.launching.ListenOn
}

// OtherNodes implements config.LaunchableNodeConfig.
func (h *HarnessLauncher) OtherNodes() []config.NodeConfig {
    ret := make([]config.NodeConfig, len(h.config.NodeNames())-1)
    j := 0
    for _, n := range h.config.NodeNames() {
        if n == h.launching.Name {
            continue
        }
        ret[j] = h.config.Remote(n)
        j++
    }
    return ret
}

// PrivateKey implements config.LaunchableNodeConfig.
func (h *HarnessLauncher) PrivateKey() *rsa.PrivateKey {
    return h.private
}

REDO_CONFIG:internal/harness/harness_launcher.go

The one thing that feels worth pointing out is the OtherNodes method (which is not yet used, but will be in the next episode). This has to return a list of all the nodes except the current node under consideration. When the HarnessLauncher is created, it is a configuration created from the list of all nodes for a specific node. So by going back to the original configuration - and specifically the list of all the nodes - we can find a list of all the nodes except this one, and then ask the configuration to return us the Remote configuration for all of those. You'll notice that we again allocate a slice with the correct number of elements in it so that we don't need to use append or reallocate the slice at any point.
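
To make the shape of all this concrete, here is a hedged sketch of how a harness driver might consume the Config interface; the real driver isn't shown in this post, so treat the names and flow as illustrative rather than as what the harness actually does:
cfg := harness.ReadConfig("config/harness/node_2.json")
for _, name := range cfg.NodeNames() {
    // each node gets a LaunchableNodeConfig derived from the shared config
    node := clienthandler.NewListenerNode(cfg.Launcher(name))
    node.Start()

    // the clients attached to this node come from the same source
    for _, cli := range cfg.ClientsFor(name) {
        log.Printf("would launch %d client(s) for %s against %s", cli.Count, cli.User, name)
    }
}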

Conclusion

This feels as if we have reached an abrupt end, but that's possibly because we did things backwards and wrote all the code to launch the nodes first, and then came back and did all the work with the configuration. In doing so, I tried to change as little as possible, so most of the harness code didn't even change.

All we were really trying to do was to set up the node configuration so that each node could ask for the URLs and public keys of the other nodes in the system.

Thursday, January 23, 2025

Storing the Block


Since we last did any work with blocks, we've totally rewritten the memory journaller. Now we need to update it again so that we can store the block we created. Hopefully, this is nice and simple and we can be out of here in just five minutes. Let's see - this is why engineers always hate estimating.

The first step is to add the three calls to store a block each time we generate one:
func (builder *SleepBlockBuilder) Run() {
    blocktime := builder.clock.Time()
    timer := builder.clock.After(delay)
    lastBlock, err := builder.blocker.Build(blocktime, nil, nil)
    if err != nil {
        panic("error returned from building block 0")
    }
    builder.journaller.RecordBlock(lastBlock)
    for {
        prev := blocktime
        select {
        case pingback := <-builder.control:
            log.Printf("%s asked to build final block and quit", builder.Name.String())
            lastBlock = builder.buildBlock(prev, builder.clock.Time(), lastBlock)
            builder.journaller.RecordBlock(lastBlock)
            pingback.Send()
            return
        case blocktime = <-timer:
            timer = builder.clock.After(delay)
            nowis := <-builder.clock.After(pause)
            // we are ready to build a block
            log.Printf("%s building block at %s", builder.Name.String(), nowis.IsoTime())
            lastBlock = builder.buildBlock(prev, blocktime, lastBlock)
            builder.journaller.RecordBlock(lastBlock)
        }
    }
}

STORE_BLOCK:internal/block/builder.go

And we need to add the RecordBlock method to the Journaller interface:
type Journaller interface {
    RecordTx(tx *records.StoredTransaction) error
    RecordBlock(block *records.Block) error
    ReadTransactionsBetween(from types.Timestamp, upto types.Timestamp) ([]*records.StoredTransaction, error)
    Quit() error
}

STORE_BLOCK:internal/storage/journal.go

And then we need to create a struct to allow it to be a "command" to be sent to the journaller goroutine:
type JournalBlockCommand struct {
    Block *records.Block
}

STORE_BLOCK:internal/storage/journal_thread.go

And then implement the RecordBlock method on MemoryJournaller to create one of these and send it down the channel:
func (d *MemoryJournaller) RecordBlock(block *records.Block) error {
    d.tothread <- JournalBlockCommand{Block: block}
    return nil
}

STORE_BLOCK:internal/storage/journal.go

And finally actually process this command when it comes through:
func LaunchJournalThread(name string, finj helpers.FaultInjection) chan<- JournalCommand {
    var txs []*records.StoredTransaction
    var blocks []*records.Block
    ret := make(chan JournalCommand, 20)
    log.Printf("launching new journal thread with channel %p", ret)
    go func() {
    whenDone:
        for {
            x := <-ret
            switch v := x.(type) {
            case JournalStoreCommand:
                txs = append(txs, v.Tx)
                log.Printf("%s recording tx with id %v, have %d at %p", name, v.Tx.TxID, len(txs), txs)
            case JournalBlockCommand:
                blocks = append(blocks, v.Block)
                log.Printf("%s recording block with id %v, have %d at %p", name, v.Block.ID, len(blocks), blocks)
            case JournalRetrieveCommand:
                log.Printf("reading txs = %p, len = %d", txs, len(txs))
                var ret []*records.StoredTransaction
                for _, tx := range txs {
                    if tx.WhenReceived >= v.From && tx.WhenReceived < v.Upto {
                        ret = append(ret, tx)
                    }
                }
                v.ResultChan <- ret
            case JournalCheckCapacityCommand:
                ret := cap(txs) == len(txs) && cap(txs) >= v.AtLeast
                log.Printf("checking capacity, returning %v\n", ret)
                v.ResultChan <- ret
            case JournalDoneCommand:
                log.Printf("was a done command %v\n", v)
                v.NotifyMe <- struct{}{}
                break whenDone
            default:
                log.Printf("not a valid journal command %v\n", x)
            }
        }
    }()
    return ret
}

STORE_BLOCK:internal/storage/journal_thread.go

For once, that was (almost) as easy as I'd thought. There were a few other things that I had to move around: making sure that the harness closed down the journal thread and waited for that to finish before allowing the node, and thus the harness, to finish, and making the journaller object available where it was needed. But on the whole, not too bad ...
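
As an aside, the shutdown half of that is worth a sketch. Assuming Quit on the MemoryJournaller works along the lines the JournalDoneCommand case above suggests (the real code may differ in detail), it would look something like this:
func (d *MemoryJournaller) Quit() error {
    done := make(chan struct{})
    d.tothread <- JournalDoneCommand{NotifyMe: done}
    <-done // block until the journal goroutine acknowledges and exits
    return nil
}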

Tuesday, January 21, 2025

A Digression on Concurrency

Apologies if you find the title misleading. In a very real sense, this is why we're here and building features is the digression.

If you remember back to my original thesis statement, I said the main purpose of this was to investigate massively parallel and distributed systems in the context of building a blockchain thingy. This episode is dedicated to an important part of that, rather than building blocks and adding communication between nodes.

Let's review from last time. When we tried running five clients at once for about 2,000 messages each, we had a panic attack.

Here's the message again:
fatal error: concurrent map read and map write


goroutine 10305 [running]:
github.com/gmmapowell/ChainLedger/internal/storage.MemoryPendingStorage.PendingTx({0xc0001da3c0}, 0xc000444740)
        /home/gareth/Projects/ChainLedger/internal/storage/pending.go:16 +0xde
github.com/gmmapowell/ChainLedger/internal/clienthandler.TxResolver.ResolveTx({{0x987af8, 0xb965a0}, {0x9865e0, 0xb965a0}, {0x986600, 0xb965a0}, 0xc000128000, {0x987080, 0xc0000b6050}}, 0xc000444740)
        /home/gareth/Projects/ChainLedger/internal/clienthandler/resolver.go:25 +0x91
github.com/gmmapowell/ChainLedger/internal/clienthandler.RecordStorage.ServeHTTP({{0x986540, 0xc000214000}, {0x988468, 0xc0001da3f0}}, {0x9891d8, 0xc0009e00e0}, 0xc0013f4c80)
        /home/gareth/Projects/ChainLedger/internal/clienthandler/recordstorage.go:43 +0x6a2
net/http.(*ServeMux).ServeHTTP(0xc0000e60e0, {0x9891d8, 0xc0009e00e0}, 0xc0013f4c80)
        /usr/local/go/src/net/http/server.go:2747 +0x3c2
net/http.serverHandler.ServeHTTP({0xc0002161e0}, {0x9891d8, 0xc0009e00e0}, 0xc0013f4c80)
        /usr/local/go/src/net/http/server.go:3210 +0x257
net/http.(*conn).serve(0xc00199cea0, {0x989730, 0xc000214af0})
        /usr/local/go/src/net/http/server.go:2092 +0x1ab5
created by net/http.(*Server).Serve in goroutine 23
        /usr/local/go/src/net/http/server.go:3360 +0xa9a
This error is actually in the MemoryPendingStorage rather than in the Journal, but trust me, the journal has the same problem. Because I'm me - and because I can - we are going to solve both these problems right now, but we are going to do so in different ways. There are two reasons for this: the first is simply didactic: I want to both investigate and show both methods; the second is that the ways that the MemoryPendingStorage and the Journal work are fundamentally different, and so different approaches make sense.

Pending Storage

You will remember me saying earlier that the pending storage was the only place in the system where we are going to "update" anything - that is, change something once we have written it. The consequence of this is that we could have two threads both trying to update the same object at the same time. This makes any attempt to linearize access through channels and a single thread incredibly tricky - and possibly inefficient - so we are going to use the traditional tool of a mutex, and we are going to do so exclusively.

This may sound like a tautology, but there are (at least) two approaches to using a mutex: what we are going to do here is to lock everybody out while we do our work; the alternative is to allow multiple readers to access the mutex at once but have any writer lock out all other writers and all readers.
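
For contrast, that reader/writer variant would use sync.RWMutex rather than plain sync.Mutex. A minimal sketch of the shape it would take (rwStore is a hypothetical type, not something we are adding to the project):
type rwStore struct {
    mu    sync.RWMutex
    store map[string]*api.Transaction
}

func (s *rwStore) get(id string) *api.Transaction {
    s.mu.RLock() // any number of readers may hold the read lock at once
    defer s.mu.RUnlock()
    return s.store[id]
}

func (s *rwStore) put(id string, tx *api.Transaction) {
    s.mu.Lock() // a writer excludes all readers and other writers
    defer s.mu.Unlock()
    s.store[id] = tx
}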

Let's review the code in the MemoryPendingStorage before we start:
func (mps MemoryPendingStorage) PendingTx(tx *api.Transaction) *api.Transaction {
    curr := mps.store[string(tx.ID())]
    if curr == nil {
        mps.store[string(tx.ID())] = tx
    }
    return curr
}

JOURNAL_MEMORY_QUERY:internal/storage/pending.go

This accesses the mps.store at least once, and possibly twice. Especially given that it may write to it after it has read it - and concluded that there is not an entry there - it is really important that nobody else is doing the same thing at the same time. In the worst possible case, two different users could decide that the transaction wasn't there, both write to it at "the same time" and one of them would have their changes lost. We can't have that.

You will often hear it said that this kind of thing "is very difficult to test". This is not true: it's easy enough to test. What's hard is getting it to happen "randomly" at runtime. And when it does, it's very hard to diagnose what's gone wrong. On the other hand, testing it is easy: you just add some delays in there. And channels make it very easy to add delays.

(On the other hand, I'm never quite sure what happens to this extra code "in production"; back in my days doing this in C++ all the fault injections were macros that were compiled out unless you turned on the EBUG flag (written -DEBUG on the command line) during compilation. I will have to see what mechanisms Go has for turning code on and off in debug and production modes.)
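
One mechanism Go does offer is build tags: the same identifier lives in two files, and which one is compiled depends on the tags passed to the build. A hypothetical sketch, not wired into this project:
// faults_debug.go -- compiled only with: go build -tags debug
//go:build debug

package helpers

const faultInjectionEnabled = true

// faults_release.go -- the default build gets the no-op version
//go:build !debug

package helpers

const faultInjectionEnabled = false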

So let's write a test that proves that we can do two updates "at the same time" without problems here (this test will FAIL of course, because we know we can't do two updates at the same time, but the test is trying to prove that we can).

The key to having this test work is to make the gap between the two accesses unreasonably long, thus ensuring that both forks of code go through the first channel before either of them can get to the second. How long is long enough? No, I don't know either (although realistically I imagine 10ms will be), so what we are going to do is to control it in the test. And we're going to build a "general purpose mechanism" to do this, because we want to make it so easy to do that everybody will do it. And then we can consider how to optimise it out during production.
package storage_test

import (
    "testing"
    "time"

    "github.com/gmmapowell/ChainLedger/internal/api"
    "github.com/gmmapowell/ChainLedger/internal/helpers"
    "github.com/gmmapowell/ChainLedger/internal/storage"
    "github.com/gmmapowell/ChainLedger/internal/types"
)

func TestTwoThreadsCannotBeInCriticalZoneAtOnce(t *testing.T) {
    finj := helpers.FaultInjectionLibrary(t)
    mps := storage.TestMemoryPendingStorage(finj)
    results := make(chan *api.Transaction, 2)
    go func() {
        tx1, _ := api.NewTransaction("https://hello.com", types.Hash("hello"))
        results <- tx1
        sx := mps.PendingTx(tx1)
        results <- sx
    }()
    w1 := finj.AllocatedWaiter()

    go func() {
        tx2, _ := api.NewTransaction("https://hello.com", types.Hash("hello"))
        results <- tx2
        rx := mps.PendingTx(tx2)
        results <- rx
    }()
    w2 := finj.AllocatedWaiterOrNil(50 * time.Millisecond)

    if w2 != nil {
        t.Fatalf("second waiter allocated before first released")
    }
    w1.Release()

    w2 = finj.AllocatedWaiter()
    if w2 == nil {
        t.Fatalf("second waiter could not be allocated after first released")
    }
    w2.Release()

    tx1 := <-results
    tx2 := <-results
    rx1 := <-results
    rx2 := <-results

    if rx1 != nil {
        t.Fatalf("the first call to PendingTx should return nil, not %v\n", rx1)
    }
    if tx1 != rx2 {
        t.Fatalf("we did not get back the same transaction: %v %v\n", tx1, rx2)
    }
    if tx2 == rx2 {
        t.Fatalf("we received the same second tx: %v %v\n", tx2, rx2)
    }
}

CONCURRENCY_PENDING_TX:internal/storage/pending_sync_test.go

What the bleep is going on here? And will we eventually refactor it to become clearer?

Well, the main thing that is going on is that there are two goroutines being executed, each of which calls PendingTx. And the idea is that they run in parallel (which is why they are in goroutines) and that they are coordinated in the test. The coordination happens with the assistance of a fault injection library, that is a library which is deliberately designed to inject faults of this (and other natures) directly into our code, so that we don't have to wait for them to happen.

w1 and w2 are allocated by asking this library to wait until "some other code" (we'll get to that) has requested a Waiter. Now, a Waiter in this context is a piece of fault injection code which "waits" until it is released. The idea here is that we put this in the middle of a critical section and then demonstrate that either two threads can both be in it at the same time, or that one is prevented from doing so.

The call which initializes w2 specifies a duration to wait: if this duration elapses before the Waiter is allocated, nil will be returned. Because we are testing that the second thread cannot get into the critical section before we explicitly Release w1, we expect this to return nil and failure to do so will be an error.

We then Release w1 which should then finish up its work and put the Transaction it received back from PendingTx in the results channel. This channel is just used to communicate between the goroutines and the test thread.

We then try retrieving the Waiter that should now have been allocated for the second goroutine - once the first thread has cleared the critical section, the second should be able to enter. Assuming this is successful, we can immediately Release it.

Finally, we can recover the two transactions returned from PendingTx in the two threads and check that the same transaction was returned in each case - because they should match.

So far, we have only made one (significant) change to the production code which is to ask for a Waiter after we have requested the transaction from the map, but before we update it if it is not there.
func (mps *MemoryPendingStorage) PendingTx(tx *api.Transaction) *api.Transaction {
    curr := mps.store[string(tx.ID())]
    mps.finj.NextWaiter()
    if curr == nil {
        mps.store[string(tx.ID())] = tx
    }
    return curr
}

CONCURRENCY_PENDING_TX:internal/storage/pending.go

When we first run this, it fails for the simple reason that, because there is no exclusion, both threads enter the critical section and attempt to allocate a Waiter:
--- FAIL: TestTwoThreadsCannotBeInCriticalZoneAtOnce (0.00s)
    pendingsynctest.go:32: second waiter allocated before first released
FAIL
So let's do the obvious thing and wrap that with a Mutex:
type MemoryPendingStorage struct {
    mu    sync.Mutex
    store map[string]*api.Transaction
    finj  helpers.FaultInjection
}

func (mps *MemoryPendingStorage) PendingTx(tx *api.Transaction) *api.Transaction {
    mps.mu.Lock()
    defer mps.mu.Unlock()
    curr := mps.store[string(tx.ID())]
    mps.finj.NextWaiter()
    if curr == nil {
        mps.store[string(tx.ID())] = tx
    }
    return curr
}

CONCURRENCY_PENDING_MUTEX:internal/storage/pending.go

By declaring the Mutex as an object and not a pointer inside the MemoryPendingStorage object, we guarantee that it is initialized and we are assured that its "zero value" is ready to go. We then call both Lock and defer Unlock on the Mutex as soon as we enter PendingTx. This guarantees that we immediately lock all access and that the moment the function exits (through any path) the Mutex will be released.
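
This is just the general Go rule that a zero-valued sync.Mutex is already an unlocked, usable mutex:
var mu sync.Mutex // no constructor needed; the zero value is unlocked
mu.Lock()
// ... critical section ...
mu.Unlock()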

And then you find that the test passes.

Making it Work without the Injection Library

Our entire test suite, on the other hand, does not pass. We have introduced a regression.

The Mutex is fine - as noted above, that is created and initialized all in one go. The problem is that we don't have an instance of the fault injection library and thus we panic on a nil pointer. The easy thing to do is to add a "dummy" fault injection library that just ignores the call to NextWaiter (technically, the call goes ahead but the implementation is empty).
func NewMemoryPendingStorage() PendingStorage {
    return TestMemoryPendingStorage(helpers.IgnoreFaultInjection())
}

CONCURRENCY_IGNORE_FAULTS:internal/storage/pending.go

OK, So Show Me the Injection Library

So far, I've glossed over what's in the injection library; this is mainly because it is messy. But now is the time to present that in all its glory. I'm just going to present it a block at a time as a finished product; I'm not going to show you how it came to be.
package helpers

import (
    "testing"
    "time"
)

type PairedWaiter interface {
    Wait()
    Release()
}

type SimplePairedWaiter struct {
    t        *testing.T
    notifyMe chan struct{}
    delay    time.Duration
}

func (spw *SimplePairedWaiter) Wait() {
    select {
    case <-time.After(spw.delay):
        spw.t.Fatalf("waited for %d but not notified", spw.delay)
    case <-spw.notifyMe:
    }
}

func (spw SimplePairedWaiter) Release() {
    spw.notifyMe <- struct{}{}
}

Apart from the usual boilerplate, this presents the idea of a PairedWaiter, which is an interface which is intended to be able to Wait in one goroutine and be Released in another. The implementation is SimplePairedWaiter which waits either to be notified on the designated channel (notifyMe) by Release, or else for the delay timeout to occur. Release sends a signal down the notifyMe channel.
type FaultInjection interface {
    NextWaiter()
    AllocatedWaiter() PairedWaiter
    AllocatedWaiterOrNil(waitFor time.Duration) PairedWaiter
}

type TestingFaultInjection struct {
    t           *testing.T
    allocations chan PairedWaiter
}

// AllocatedWaiter implements FaultInjection.
func (t *TestingFaultInjection) AllocatedWaiter() PairedWaiter {
    r := t.AllocatedWaiterOrNil(5 * time.Second)
    if r == nil {
        t.t.Fatalf("waiter had not been allocated after 5s")
    }
    return r
}

// AllocatedWaiter implements FaultInjection.
func (t *TestingFaultInjection) AllocatedWaiterOrNil(waitFor time.Duration) PairedWaiter {
    select {
    case <-time.After(waitFor):
        return nil
    case ret := <-t.allocations:
        return ret
    }
}

// NextWaiter implements FaultInjection.
func (t *TestingFaultInjection) NextWaiter() {
    ret := &SimplePairedWaiter{notifyMe: make(chan struct{}), delay: 10 * time.Second}
    t.allocations <- ret
    ret.Wait()
}

func FaultInjectionLibrary(t *testing.T) FaultInjection {
    return &TestingFaultInjection{t: t, allocations: make(chan PairedWaiter, 10)}
}

FaultInjection is the root of all of the fault injection technology we are hoping to develop over the course of this project. The idea is that there are two versions of the interface - one that actually injects faults in the testing infrastructure, and one that doesn't. Ideally, I would like to completely eliminate this from the build, but I'm not sure whether I can. But the price of a few method calls to have testable code is a tradeoff I'm willing to make.

NextWaiter is the call we expect to make from production code and the contract is that, in a test environment, it allocates a waiter and passes back a pointer to it to where the test script can access it. It then puts this new Waiter in its waiting state until either it times out or is released.

The other half of this, as called by the test harness, is AllocatedWaiter (and its companion AllocatedWaiterOrNil). This reads from the channel allocations, which is where the newly created Waiters are placed when they are allocated. If a Waiter has not been created in NextWaiter, the call to AllocatedWaiterOrNil will time out, returning nil. AllocatedWaiter considers this an error and fails the test.

Finally, the function FaultInjectionLibrary takes the testing environment (so that errors can be reported gracefully) and creates the channel the allocated Waiters will be passed down. Note that this channel has a "size" of 10, meaning that up to 10 Waiters can be allocated before the channel will block. This may not be enough, but you have to pick some number.
type InactiveFaultInjection struct{}

// AllocatedWaiter implements FaultInjection.
func (i *InactiveFaultInjection) AllocatedWaiter() PairedWaiter {
    panic("this should only be called from test methods, I think")
}

// AllocatedWaiterOrNil implements FaultInjection.
func (i *InactiveFaultInjection) AllocatedWaiterOrNil(waitFor time.Duration) PairedWaiter {
    panic("this should only be called from test methods, I think")
}

// NextWaiter implements FaultInjection.
func (i *InactiveFaultInjection) NextWaiter() {
}

func IgnoreFaultInjection() FaultInjection {
    return &InactiveFaultInjection{}

}

CONCURRENCY_IGNORE_FAULTS:internal/helpers/faults.go

Finally, for production use there is a version of the Fault Injection Library which has a "nothing" implementation of NextWaiter. At the moment, this is the only call that should be in production code, so the other two methods in the class are implemented using panic.

Shared Instances are, in fact, Fine

It may be objected that we are returning these instances and then updating them at the same time in ResolveTx:
func (r TxResolver) ResolveTx(tx *api.Transaction) (*records.StoredTransaction, error) {
    curr := r.store.PendingTx(tx)
    complete := true
    for i, v := range tx.Signatories {
        if v.Signature != nil && curr != nil {
            curr.Signatories[i] = v
        } else if v.Signature == nil {
            if curr == nil || curr.Signatories[i].Signature == nil {
                complete = false
            }
        }
    }

    if complete {
        return records.CreateStoredTransaction(r.clock, r.hasher, r.signer, r.nodeKey, curr)
    }

    return nil, nil
}

CONCURRENCY_IGNORE_FAULTS:internal/clienthandler/resolver.go

This is absolutely true, but it is not really an objection. The object does not get reallocated, and there is no contention for a memory location. There are two possibilities: one is that the two goroutines are updating different signatures, and the other is that they are trying to update the same signature. If they are trying to update different signatures, both will succeed and it will all be fine. If they are trying to update the same signature, it doesn't matter what we do, because one will win and the other will lose. Because the pointer writes are atomic, one or the other will win - they cannot mangle each other.

OK, let's assume that that argument didn't convince you. How would we test this? Well, there's only one place where we store a signature, so let's put another Waiter just before that, and then write a test that goes through there and makes sure that both goroutines have arrived there before it releases either of them.

So we think we can write this test, with a view to coming back and making it more "solid" once we have seen it work or not work.
package clienthandler_test

import (
    "testing"

    "github.com/gmmapowell/ChainLedger/internal/helpers"
    "github.com/gmmapowell/ChainLedger/internal/records"
)

func TestThatTwoThreadsCanSignDifferentFieldsAtTheSameTime(t *testing.T) {
    clock := helpers.ClockDoubleIsoTimes("2024-12-25_03:00:00.121")
    setup(t, clock)
    collector := make(chan *records.StoredTransaction, 2)
    go func() {
        tx := maketx("https://test.com/msg1", "hash", "https://user1.com/", true, "https://user2.com/")
        stx, _ := r.ResolveTx(tx)
        collector <- stx
    }()
    go func() {
        tx := maketx("https://test.com/msg1", "hash", "https://user1.com/", "https://user2.com/", true)
        stx, _ := r.ResolveTx(tx)
        collector <- stx
    }()
    s1 := <-collector
    s2 := <-collector
    if s1 != s2 {
        t.Fatalf("The two transactions were not the same")
    }
    if s1.Signatories[0].Signature == nil {
        t.Fatalf("the first signature is missing")
    }
    if s1.Signatories[1].Signature == nil {
        t.Fatalf("the second signature is missing")
    }
}

RESOLVER_SYNC_SIGNATURE:internal/clienthandler/resolver_sync_test.go

Note that because this is in the same package (clienthandler_test) as the previous resolver tests, it is able to just "borrow" all the infrastructure we built up there (e.g. maketx).

But when we run this test, it doesn't even fail. It just hangs. What's the problem? Well, here's a hint: I have done something very bad, and I didn't realize it until just now. It's because I'm used to the Java threading and exception models, and Go does not work the same way.

The immediate problem is that I have called t.Fatalf from within a goroutine, and that, apparently, is just not done, because it calls FailNow, which must be called from the goroutine running the test function, not from goroutines created during the test.

While I was debugging this, I realized that the code to put Signatures in place was actually overwriting the whole Signatory which is both overkill and probably asking for trouble. So I fixed that:
func (r TxResolver) ResolveTx(tx *api.Transaction) (*records.StoredTransaction, error) {
    curr := r.store.PendingTx(tx)
    complete := true
    for i, v := range tx.Signatories {
        if v.Signature != nil && curr != nil {
            curr.Signatories[i].Signature = v.Signature
        } else if v.Signature == nil {
            if curr == nil || curr.Signatories[i].Signature == nil {
                complete = false
            }
        }
    }

    if complete {
        return records.CreateStoredTransaction(r.clock, r.hasher, r.signer, r.nodeKey, curr)
    }

    return nil, nil
}

RESOLVER_SYNC_SIGNATURE:internal/clienthandler/resolver.go

But obviously it still hangs. So what is happening here is that I am calling FailNow indirectly, and that is doing something to cause the current goroutine to exit, but is not causing the whole test to fail. That is sitting, waiting for a message to come down a channel that is never going to come. And so it all hangs.

We need to rework our plumbing and infrastructure to make sure that this does something (like closing the channels) that will release the main thread.

For the record, following the code through to the bitter end, the actual fault in our test is that we haven't defined any entries for the HasherFactory and it complains about this inside the function to create a StoredTransaction inside ResolveTx. But that's not important right now.

Reworking the Test

I'm going to just dive off the deep end here and start building stuff. I don't know what the consequences of my actions are going to be, and right now I don't really care. In the immortal words of Kent Beck, "if you don't know what to do to succeed, try to fail". We'll end up somewhere and then we can always rework whatever we've got.

Now, when in doubt, what I tend to do is to wrap something up in a class and make the problem "go away" in the sense of putting it somewhere else rather than in my eyeline. So I'm going to wrap up all of the channel communication in a class:
func TestThatTwoThreadsCanSignDifferentFieldsAtTheSameTime(t *testing.T) {
    clock := helpers.ClockDoubleIsoTimes("2024-12-25_03:00:00.121")
    cc := helpers.NewChanCollector(t, 2)
    setup(cc, clock)
    go func() {
        tx := maketx("https://test.com/msg1", "hash", "https://user1.com/", true, "https://user2.com/")
        stx, _ := r.ResolveTx(tx)
        cc.Send(stx)
    }()
    go func() {
        tx := maketx("https://test.com/msg1", "hash", "https://user1.com/", "https://user2.com/", true)
        stx, _ := r.ResolveTx(tx)
        cc.Send(stx)
    }()
    s1 := cc.Recv()
    s2 := cc.Recv()
    if s1 != s2 {
        t.Fatalf("The two transactions were not the same")
    }
    tx1 := s1.(records.StoredTransaction)
    if tx1.Signatories[0].Signature == nil {
        t.Fatalf("the first signature is missing")
    }
    if tx1.Signatories[1].Signature == nil {
        t.Fatalf("the second signature is missing")
    }
}

INTRODUCED_CHAN_COLLECTOR:internal/clienthandler/resolver_sync_test.go

Tackling the changes I wanted to make here, I introduced a new class ChanCollector and created an instance, passing it a pointer to the testing environment (so it can complain if the channel is closed when it is waiting for it) and the size of channel buffer I want. I then replace the explicit send with a call to Send and the explicit channel read with a call to Recv. The idea is that I have isolated any other problems in this class.

However, this doesn't actually address the problem, since the problem is that we are calling FailNow in a goroutine. We need to stop doing this, so instead of passing in the testing environment, we pass in a pointer to this ChanCollector object.

How can we do that? It isn't the right type. Ah, yes, well, more plumbing changes...

I introduced a new interface helpers.Fatals and gave it a subset of methods from the testing.T class (I did look to see if such an interface already exists, but didn't find one):
package helpers

type Fatals interface {
    Fatalf(format string, args ...any)
    Log(args ...any)
    Logf(format string, args ...any)
    Fail()
}

INTRODUCED_CHAN_COLLECTOR:internal/helpers/fatals.go

I then implemented those methods on ChanCollector and replaced all the places where I was passing a testing.T around to instead take a helpers.Fatals. And then, I can pass the ChanCollector where I previously had t.

This is the ChanCollector:
package helpers

import (
    "testing"
)

type ChanCollector struct {
    t         *testing.T
    collector chan any
}

// Fail implements Fatals.
func (cc *ChanCollector) Fail() {
    close(cc.collector)
}

// Fatalf implements Fatals.
func (cc *ChanCollector) Fatalf(format string, args ...any) {
    cc.t.Logf(format, args...)
    cc.Fail()
}

// Log implements Fatals.
func (cc *ChanCollector) Log(args ...any) {
    cc.t.Log(args...)
}

// Logf implements Fatals.
func (cc *ChanCollector) Logf(format string, args ...any) {
    cc.t.Logf(format, args...)
}

func (cc *ChanCollector) Send(obj any) {
    cc.collector <- obj
}

func (cc *ChanCollector) Recv() any {
    msg, ok := <-cc.collector
    if !ok {
        cc.t.FailNow()
    }
    return msg
}

func NewChanCollector(t *testing.T, nc int) *ChanCollector {
    return &ChanCollector{t: t, collector: make(chan any, nc)}
}

INTRODUCED_CHAN_COLLECTOR:internal/helpers/chancollector.go

This is so much of a glue layer that I'm not even going to bother commenting on it.

Sadly, by itself, this doesn't help much. Why not? Well, the semantics of FailNow, and thus Fatalf, have changed subtly: previously, testing.T's FailNow would stop the calling goroutine dead (it works by calling runtime.Goexit, which is also why it is only supposed to be called from the goroutine running the test). Now, it simply closes the channel back to the main thread and carries on. This is a problem in the offending code, NewHasher:
func (f *MockHasherFactory) NewHasher() hash.Hash {
    if f.next >= len(f.hashers) {
        f.t.Fatalf("The mock hasher does not have %d hashers configured", f.next+1)
    }
    r := f.hashers[f.next]
    f.next++
    return r
}

RESOLVER_SYNC_SIGNATURE:internal/helpers/hashing.go

Consequently, it drops through to the next line where it tries to read a non-existent member of the slice. This isn't ever going to (and doesn't) end well. So we might as well protect it with our own panic:
func (f *MockHasherFactory) NewHasher() hash.Hash {
    if f.next >= len(f.hashers) {
        f.t.Fatalf("The mock hasher does not have %d hashers configured", f.next+1)
        panic("not enough hashers")
    }
    r := f.hashers[f.next]
    f.next++
    return r
}

INTRODUCED_CHAN_COLLECTOR:internal/helpers/hashing.go

So nothing really worked the way I wanted, but at least we are in control of our own destiny and we get a decent panic message before we quit:
--- FAIL: TestThatTwoThreadsCanSignDifferentFieldsAtTheSameTime (7.98s)
    chancollector.go:19: The mock hasher does not have 1 hashers configured
panic: not enough hashers


goroutine 21 [running]:
github.com/gmmapowell/ChainLedger/internal/helpers.(*MockHasherFactory).NewHasher(0xc0000a0fc0)
        /home/gareth/Projects/ChainLedger/internal/helpers/hashing.go:37 +0x18d
github.com/gmmapowell/ChainLedger/internal/records.CreateStoredTransaction({0x727af0, 0xc000092b40}, {0x727140, 0xc0000a0fc0}, {0x727160, 0xc0000a0ff0}, 0xc0000da280, 0xc000092000)
        /home/gareth/Projects/ChainLedger/internal/records/storedtransaction.go:26 +0x2cb
github.com/gmmapowell/ChainLedger/internal/clienthandler.TxResolver.ResolveTx({{0x727af0, 0xc000092b40}, {0x727140, 0xc0000a0fc0}, {0x727160, 0xc0000a0ff0}, 0xc0000da280, {0x727560, 0xc00007c4e0}}, 0xc000404000)
        /home/gareth/Projects/ChainLedger/internal/clienthandler/resolver.go:38 +0x28c
github.com/gmmapowell/ChainLedger/internal/clienthandler_test.TestThatTwoThreadsCanSignDifferentFieldsAtTheSameTime.func1()
        /home/gareth/Projects/ChainLedger/internal/clienthandler/resolversynctest.go:16 +0x165
created by github.com/gmmapowell/ChainLedger/internal/clienthandler_test.TestThatTwoThreadsCanSignDifferentFieldsAtTheSameTime in goroutine 7
        /home/gareth/Projects/ChainLedger/internal/clienthandler/resolversynctest.go:14 +0x153
So, I'm just going to press on at this point and fix the problem (i.e. add some fake hashes) and see what happens.

Fixing the test

So I went ahead and added a Hasher and said "sure, let it accept anything". I'm not here to test the hashing algorithm, I'm here to look into concurrency issues: testing real hashing would not only waste my time, it would also reduce the clarity of an already unclear test, cluttering it up with irrelevancies.

And then I found it failed. Looking at it closely, I'm not surprised. I was specifically testing that both messages that came back were the same. They're not, of course: one of them (whichever gets processed first) should be nil and the other one should be the records.StoredTransaction. I tried to fix this and was surprised by the outcome.
func TestThatTwoThreadsCanSignDifferentFieldsAtTheSameTime(t *testing.T) {
    clock := helpers.ClockDoubleIsoTimes("2024-12-25_03:00:00.121")
    cc := helpers.NewChanCollector(t, 2)
    setup(cc, clock)

    h1 := hasher.AddMock("fred")
    h1.AcceptAnything()

    signer.Expect(types.Signature("tx-sig"), nodeKey, types.Hash("fred"))

    go func() {
        tx := maketx("https://test.com/msg1", "hash", "https://user1.com/", true, "https://user2.com/")
        stx, _ := r.ResolveTx(tx)
        cc.Send(stx)
    }()
    go func() {
        tx := maketx("https://test.com/msg1", "hash", "https://user1.com/", "https://user2.com/", true)
        stx, _ := r.ResolveTx(tx)
        cc.Send(stx)
    }()
    s1 := cc.Recv()
    s2 := cc.Recv()
    if s1 == nil && s2 == nil {
        t.Fatalf("both transactions were nil")
    }
    if s1 != nil && s2 != nil {
        t.Fatalf("both transactions were NOT nil: %v %v", s1, s2)
    }
    tx1 := s1.(records.StoredTransaction)
    if tx1.Signatories[0].Signature == nil {
        t.Fatalf("the first signature is missing")
    }
    if tx1.Signatories[1].Signature == nil {
        t.Fatalf("the second signature is missing")
    }
}

RESOLVER_SYNC_FIX_1:internal/clienthandler/resolver_sync_test.go

--- FAIL: TestThatTwoThreadsCanSignDifferentFieldsAtTheSameTime (733.59s)
    resolversynctest.go:37: both transactions were NOT nil: <nil> &{[102 114 101 100] 1735095600121 https://test.com/msg1 [104 97 115 104] [0xc00007c2e0 0xc00007c420] [116 120 45 115 105 103]}
That clearly says "both were NOT nil" and then shows the first one IS nil. Uh, what now, Go?

After some googling, I came across this article about nil, from which I learned a lot - not just about nil, but also about pointers and interfaces - and that has helped me achieve some more clarity about why interfaces are at the same level of indirection as pointers: an interface value is itself a small object which contains a pointer to the value, along with its type.
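
This is the classic "typed nil" gotcha. A minimal, standalone sketch (using a stand-in struct, not the real records.StoredTransaction) shows what bit me: an interface holding a nil pointer is not itself nil.
package main

import "fmt"

type StoredTransaction struct{}

// resolver stands in for ResolveTx returning nil on the "incomplete" leg
func resolver() *StoredTransaction {
    return nil
}

func main() {
    var s any = resolver()       // s holds the pair (*StoredTransaction, nil)
    fmt.Println(s == nil)        // false: the interface value itself is not nil
    tx := s.(*StoredTransaction) // the assertion succeeds and yields a nil pointer
    fmt.Println(tx == nil)       // true
}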

OK, so I need to unwrap the any I got passed back and lay my hands on a real pointer. Hopefully this works without panicking because otherwise I'm in an infinite loop of "cast this so I can test it is nil/is it non-nil so that I can safely cast it?"
func TestThatTwoThreadsCanSignDifferentFieldsAtTheSameTime(t *testing.T) {
    clock := helpers.ClockDoubleIsoTimes("2024-12-25_03:00:00.121")
    cc := helpers.NewChanCollector(t, 2)
    setup(cc, clock)

    h1 := hasher.AddMock("fred")
    h1.AcceptAnything()

    signer.Expect(types.Signature("tx-sig"), nodeKey, types.Hash("fred"))

    go func() {
        tx := maketx("https://test.com/msg1", "hash", "https://user1.com/", true, "https://user2.com/")
        stx, _ := r.ResolveTx(tx)
        cc.Send(stx)
    }()
    go func() {
        tx := maketx("https://test.com/msg1", "hash", "https://user1.com/", "https://user2.com/", true)
        stx, _ := r.ResolveTx(tx)
        cc.Send(stx)
    }()
    s1 := cc.Recv()
    s2 := cc.Recv()
    tx1 := s1.(*records.StoredTransaction)
    tx2 := s2.(*records.StoredTransaction)
    if tx1 == nil && tx2 == nil {
        t.Fatalf("both transactions were nil")
    }
    if tx1 != nil && tx2 != nil {
        t.Fatalf("both transactions were NOT nil: %v %v", tx1, tx2)
    }
    if tx1 == nil {
        tx1 = tx2
    }
    if tx1.Signatories[0].Signature == nil {
        t.Fatalf("the first signature is missing")
    }
    if tx1.Signatories[1].Signature == nil {
        t.Fatalf("the second signature is missing")
    }
}

RESOLVER_SYNC_FIX_2:internal/clienthandler/resolver_sync_test.go

So I'm glad to report that did work, and then I ran into the obvious problem, that my code assumed that tx1 was valid, which may or may not be the case, depending on what happens. So I added an extra test here: if tx1 is nil then, by deduction, tx2 must be non-nil, so we assign tx2 to tx1 and carry on.

OK, we've managed to work around these issues with Go and get a passing test. What were we trying to do again?

Trying to make the test fail

We were trying to demonstrate that it doesn't matter if both goroutines see the same transaction at the same time and both update it: at some point, one of them will have a transaction in their hands that has both signatures and at some point after that they will test to see if it has all the signatures attached. And will not crash.

A failure would most likely look like both goroutines returning nil because both of them believed that they only had one signature.

So we need to put some wait points into our code to make the collision as dramatic as possible, rather than just depending on luck.
func TestThatTwoThreadsCanSignDifferentFieldsAtTheSameTime(t *testing.T) {
    clock := helpers.ClockDoubleIsoTimes("2024-12-25_03:00:00.121")
    cc := helpers.NewChanCollector(t, 2)
    setup(cc, clock, true)

    h1 := hasher.AddMock("fred")
    h1.AcceptAnything()

    signer.Expect(types.Signature("tx-sig"), nodeKey, types.Hash("fred"))

    go func() {
        tx := maketx("https://test.com/msg1", "hash", "https://user1.com/", true, "https://user2.com/")
        stx, _ := r.ResolveTx(tx)
        cc.Send(stx)
    }()
    go func() {
        tx := maketx("https://test.com/msg1", "hash", "https://user1.com/", "https://user2.com/", true)
        stx, _ := r.ResolveTx(tx)
        cc.Send(stx)
    }()

    // Now wait for both of them to get to the critical section
    w1 := finj.AllocatedWaiter()
    w2 := finj.AllocatedWaiter()

    // Then we can release both of them
    w1.Release()
    w2.Release()

    s1 := cc.Recv()
    s2 := cc.Recv()
    tx1 := s1.(*records.StoredTransaction)
    tx2 := s2.(*records.StoredTransaction)
    if tx1 == nil && tx2 == nil {
        t.Fatalf("both transactions were nil")
    }
    if tx1 != nil && tx2 != nil {
        t.Fatalf("both transactions were NOT nil: %v %v", tx1, tx2)
    }
    if tx1 == nil {
        tx1 = tx2
    }
    if tx1.Signatories[0].Signature == nil {
        t.Fatalf("the first signature is missing")
    }
    if tx1.Signatories[1].Signature == nil {
        t.Fatalf("the second signature is missing")
    }
}

RESOLVER_SYNC_TEST_FINJ:internal/clienthandler/resolver_sync_test.go

This asks the setup() function to allocate a fault injector, and uses that to align the two threads and then release them. In the Resolver, we allocate the waiter once the pending transaction has been returned:
func (r TxResolver) ResolveTx(tx *api.Transaction) (*records.StoredTransaction, error) {
    curr := r.store.PendingTx(tx)
    r.finj.NextWaiter()
    complete := true
    for i, v := range tx.Signatories {
        if v.Signature != nil && curr != nil {
            curr.Signatories[i].Signature = v.Signature
        } else if v.Signature == nil {
            if curr == nil || curr.Signatories[i].Signature == nil {
                complete = false
            }
        }
    }

    if complete {
        return records.CreateStoredTransaction(r.clock, r.hasher, r.signer, r.nodeKey, curr)
    }

    return nil, nil
}

RESOLVER_SYNC_TEST_FINJ:internal/clienthandler/resolver.go

And, look at that! It panics with a nil pointer dereference:
panic: runtime error: invalid memory address or nil pointer dereference
[signal SIGSEGV: segmentation violation code=0x1 addr=0x0 pc=0x685ac8]


goroutine 22 [running]:
github.com/gmmapowell/ChainLedger/internal/records.CreateStoredTransaction({0x728ff0, 0xc00010ab40}, {0x728640, 0xc000118fc0}, {0x728660, 0xc000118ff0}, 0xc000308080, 0x0)
        /home/gareth/Projects/ChainLedger/internal/records/storedtransaction.go:24 +0x88
github.com/gmmapowell/ChainLedger/internal/clienthandler.TxResolver.ResolveTx({{0x728ff0, 0xc00010ab40}, {0x728640, 0xc000118fc0}, {0x728660, 0xc000118ff0}, 0xc000308080, {0x728a60, 0xc00007c3a0}, {0x729988, ...}}, ...)
        /home/gareth/Projects/ChainLedger/internal/clienthandler/resolver.go:40 +0x26b
github.com/gmmapowell/ChainLedger/internal/clienthandler_test.TestThatTwoThreadsCanSignDifferentFieldsAtTheSameTime.func2()
        /home/gareth/Projects/ChainLedger/internal/clienthandler/resolversynctest.go:28 +0x165
created by github.com/gmmapowell/ChainLedger/internal/clienthandler_test.TestThatTwoThreadsCanSignDifferentFieldsAtTheSameTime in goroutine 7
        /home/gareth/Projects/ChainLedger/internal/clienthandler/resolversynctest.go:26 +0x2ad
What has happened? Amazingly, this error has nothing to do with the synchronization issue here; it is just a simple bug that has been sitting there all along, and which I think would only be exposed by a single-client transaction.

PendingTx returns nil (which is stored in curr) if the transaction passed in is the first instance of that transaction. If that version of the transaction already has all the signatures it needs, then nil is passed in as the transaction to be converted into a StoredTransaction. Here is a simpler, single-client test that exposes exactly that:
func TestSubmittingACompleteTransactionStoresItImmediately(t *testing.T) {
    clock := helpers.ClockDoubleIsoTimes("2024-12-25_03:00:00.121")
    setup(t, clock, false)
    h1 := hasher.AddMock("fred")
    h1.AcceptAnything()
    signer.SignAnythingAs("hello")
    tx := maketx("https://test.com/msg1", "hash", "https://user1.com/", true, "https://user2.com/", true)
    stx, _ := r.ResolveTx(tx)
    if stx == nil {
        t.Fatalf("the transaction was not stored")
    }
}

RESOLVER_COMPLETED_TEST:internal/clienthandler/resolver_test.go

And, lo and behold, we come across the same panic as above. The problem is that the PendingTransaction is nil. So let's fix that by updating the value of curr just before we call CreateStoredTransaction (but only if it's nil).
func (r TxResolver) ResolveTx(tx *api.Transaction) (*records.StoredTransaction, error) {
    curr := r.store.PendingTx(tx)
    r.finj.NextWaiter()
    complete := true
    for i, v := range tx.Signatories {
        if v.Signature != nil && curr != nil {
            curr.Signatories[i].Signature = v.Signature
        } else if v.Signature == nil {
            if curr == nil || curr.Signatories[i].Signature == nil {
                complete = false
            }
        }
    }

    if complete {
        if curr == nil {
            curr = tx
        }
        return records.CreateStoredTransaction(r.clock, r.hasher, r.signer, r.nodeKey, curr)
    }

    return nil, nil
}

RESOLVER_COMPLETED_TEST_FIXED:internal/clienthandler/resolver.go

So our new test passes - but what about the synchronization test?

Well, no. It makes it further, though:
panic: runtime error: index out of range [1] with length 1


goroutine 9 [running]:
github.com/gmmapowell/ChainLedger/internal/helpers.(*ClockDouble).Time(0xc00008ab40)
        /home/gareth/Projects/ChainLedger/internal/helpers/clock.go:48 +0x7c
github.com/gmmapowell/ChainLedger/internal/records.CreateStoredTransaction({0x72a090, 0xc00008ab40}, {0x7296e0, 0xc0000a0fc0}, {0x729700, 0xc0000a0ff0}, 0xc0000da100, 0xc000134000)
        /home/gareth/Projects/ChainLedger/internal/records/storedtransaction.go:25 +0x11f
github.com/gmmapowell/ChainLedger/internal/clienthandler.TxResolver.ResolveTx({{0x72a090, 0xc00008ab40}, {0x7296e0, 0xc0000a0fc0}, {0x729700, 0xc0000a0ff0}, 0xc0000da100, {0x729b00, 0xc00035e260}, {0x72aa28, ...}}, ...)
        /home/gareth/Projects/ChainLedger/internal/clienthandler/resolver.go:43 +0x28e
github.com/gmmapowell/ChainLedger/internal/clienthandler_test.TestThatTwoThreadsCanSignDifferentFieldsAtTheSameTime.func2()
        /home/gareth/Projects/ChainLedger/internal/clienthandler/resolversynctest.go:28 +0x165
created by github.com/gmmapowell/ChainLedger/internal/clienthandler_test.TestThatTwoThreadsCanSignDifferentFieldsAtTheSameTime in goroutine 7
        /home/gareth/Projects/ChainLedger/internal/clienthandler/resolversynctest.go:26 +0x2ad
This fails because, while it is fine for both legs to update their own signature at the same time, if both legs then see all the signatures complete, both will try to create a stored transaction and we will end up with more StoredTransaction objects than we want (here, the ClockDouble runs out of configured timestamps and panics). Can we fix this?

Well, yes, of course we can. There are a number of strategies, and we may try another one in a minute, but one possibility is to add a flag to the pending transaction to indicate if it has already been "selected" by a previous thread.

There is a problem, of course, which is "how do we test the flag, and then update it before the other leg can test it?" We could, of course, use another mutex, but this specific problem has a specific solution right down at the processor level: an atomic "test and set" operation. In Go, this is provided by the atomic.Bool type in the sync/atomic package (added in Go 1.19).
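
As a quick illustration of the semantics (this is just a sketch, not repo code): Swap stores the new value and returns whatever was there before, so exactly one of two racing goroutines sees false.
package main

import (
    "fmt"
    "sync"
    "sync/atomic"
)

func main() {
    var completed atomic.Bool
    var wg sync.WaitGroup
    for i := 0; i < 2; i++ {
        wg.Add(1)
        go func(n int) {
            defer wg.Done()
            if !completed.Swap(true) { // set the flag atomically; a false return means we got there first
                fmt.Printf("goroutine %d wins and would create the StoredTransaction\n", n)
            }
        }(i)
    }
    wg.Wait()
}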

So we can update ResolveTx to look like this:
func (r TxResolver) ResolveTx(tx *api.Transaction) (*records.StoredTransaction, error) {
    curr := r.store.PendingTx(tx)
    r.finj.NextWaiter()
    complete := true
    for i, v := range tx.Signatories {
        if v.Signature != nil && curr != nil {
            curr.Signatories[i].Signature = v.Signature
        } else if v.Signature == nil {
            if curr == nil || curr.Signatories[i].Signature == nil {
                complete = false
            }
        }
    }

    if complete {
        if curr == nil {
            curr = tx
        }
        if !curr.AlreadyCompleted() {
            return records.CreateStoredTransaction(r.clock, r.hasher, r.signer, r.nodeKey, curr)
        }
    }

    return nil, nil
}

RESOLVER_COMPLETE_ONLY_ONCE:internal/clienthandler/resolver.go

And AlreadyCompleted is implemented in transaction.go:
type Transaction struct {
    ContentLink *url.URL
    ContentHash types.Hash
    Signatories []*types.Signatory
    completed   atomic.Bool
}
...
func (tx *Transaction) AlreadyCompleted() bool {
    return tx.completed.Swap(true)
}

RESOLVER_COMPLETE_ONLY_ONCE:internal/api/transaction.go

And now all our tests pass. Whew!

Journal

Happily, even our big test now passes, but I believe that is good fortune, rather than good design. The overlap happens in the two methods in MemoryJournaller:
func (d *MemoryJournaller) RecordTx(tx *records.StoredTransaction) error {
    d.txs = append(d.txs, tx)
    fmt.Printf("%s recording tx with id %v, have %d\n", d.name, tx.TxID, len(d.txs))
    return nil
}

func (d MemoryJournaller) ReadTransactionsBetween(from types.Timestamp, upto types.Timestamp) ([]*records.StoredTransaction, error) {
    var ret []*records.StoredTransaction
    for _, tx := range d.txs {
        if tx.WhenReceived >= from && tx.WhenReceived < upto {
            ret = append(ret, tx)
        }
    }
    return ret, nil
}

RESOLVER_COMPLETE_ONLY_ONCE:internal/storage/journal.go

But the only time that anything is written here is during the append function call, so the opportunities for conflict are small - most likely if we are in the process of reading a block of transactions when an "updating append" occurs.

Using fault injection - and some cunning white box testing - we can force the Read method to pause while we pump some more records into the array, forcing an extension, and then release the reader.

I'm going to do this in a couple of steps. So, first, we need to get the journal into a state where it has at least one message in the txs slice and the slice has reached its capacity. How do we know these things? Well, there are built-in functions on slices (len and cap) that tell you the length of a slice and its capacity (i.e. how long it can become before an append will reallocate the backing array); there is a short standalone sketch of this shortly after the next listing. So, let's add some methods to the journal to expose this:
func (d *MemoryJournaller) HaveSome() bool {
    fmt.Printf("len = %d\n", len(d.txs))
    return len(d.txs) > 0
}

func (d *MemoryJournaller) NotAtCapacity() bool {
    fmt.Printf("cap = %d len = %d\n", cap(d.txs), len(d.txs))
    // the slice still has headroom: the next append will not reallocate
    return len(d.txs) < cap(d.txs)
}

JOURNAL_SYNC_1:internal/storage/journal.go

The fmt.Printf statements here just show my mind at work during development - I like to see what's going on. I'll remove them in a couple of iterations.
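
Since the whole exercise hinges on append reallocating the backing array once the slice is at capacity, here is the promised minimal, standalone illustration of that behaviour (not repo code):
package main

import "fmt"

func main() {
    s := make([]int, 0, 2)
    fmt.Println(len(s), cap(s)) // 0 2
    s = append(s, 1, 2)
    fmt.Println(len(s), cap(s)) // 2 2: at capacity, so the next append must reallocate
    old := s
    s = append(s, 3)
    fmt.Println(len(s), cap(s))   // 3 4 on current compilers (the growth policy is not guaranteed)
    fmt.Println(&old[0] == &s[0]) // false: s now points at a new backing array; old still sees the original
}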

With this in place we can write the following test:
package storage_test

import (
    "testing"

    "github.com/gmmapowell/ChainLedger/internal/helpers"
    "github.com/gmmapowell/ChainLedger/internal/records"
    "github.com/gmmapowell/ChainLedger/internal/storage"
)

func TestWeCanAddAndRecoverAtTheSameTime(t *testing.T) {
    finj := helpers.FaultInjectionLibrary(t)
    tj := storage.TestJournaller("journal", finj)
    journal := tj.(*storage.MemoryJournaller)
    completed := false
    go func() {
        for !completed {
            journal.RecordTx(storableTx())
        }
    }()
    aw := finj.AllocatedWaiter()
    for !journal.HaveSome() || journal.NotAtCapacity() {
        aw.Release()
        aw = finj.AllocatedWaiter()
    }

}

func storableTx() *records.StoredTransaction {
    return &records.StoredTransaction{TxID: []byte("hello")}
}

JOURNAL_SYNC_1:internal/storage/journal_sync_test.go

Apart from all the boilerplate and setup, this kicks off a goroutine (obviously to test anything to do with concurrency, you need at least two goroutines - usually in addition to the test). This keeps adding new records to the journal until the completed flag is set - or the goroutine is shut down when the test ends. Inside the RecordTx method (as we'll see in a minute) it pauses at the critical point, allocating a waiter, and blocking the goroutine. In the main test, we wait for that waiter to be allocated, and then enter a loop, waiting for the desired condition to exist and repeatedly releasing the goroutine, then waiting for it to pause again. Using this pattern, there is no possibility that both threads are running at the same time.
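
In case the waiter pattern is unfamiliar, here is a minimal sketch of how something like it can be built out of channels. To be clear, this is illustrative only - it is not the helpers.FaultInjection code in the repo, just the shape of the idea.
package sketch

// Waiter represents one goroutine paused at a wait point in the code under test.
type Waiter struct {
    release chan struct{}
}

// Release lets the paused goroutine continue.
func (w *Waiter) Release() {
    close(w.release)
}

// Injector hands paused goroutines over to the test.
type Injector struct {
    allocated chan *Waiter
}

func NewInjector() *Injector {
    return &Injector{allocated: make(chan *Waiter, 10)} // arbitrary buffer size
}

// NextWaiter is called from the code under test: announce a wait point, then
// block until the test releases it.
func (inj *Injector) NextWaiter() {
    w := &Waiter{release: make(chan struct{})}
    inj.allocated <- w
    <-w.release
}

// AllocatedWaiter is called from the test: block until the code under test
// reaches a wait point, and return a handle that can release it.
func (inj *Injector) AllocatedWaiter() *Waiter {
    return <-inj.allocated
}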

At the end of this, we are in the desired condition: the slice is at capacity with at least one entry in it. As it happens, it seems to be exactly one entry, as shown here:
Starting: /home/gareth/go/bin/dlv dap --listen=127.0.0.1:38689 --log-dest=3 from /home/gareth/Projects/ChainLedger/internal/storage
DAP server listening at: 127.0.0.1:38689
Type 'dlv help' for list of commands.
len = 0
journal recording tx with id [104 101 108 108 111], have 1
len = 1
cap = 1 len = 1
PASS
Process 73675 has exited with status 0
Detaching
dlv dap (73454) exited with code: 0
Finally, these are the wait points we added to the journaller code:
func (d *MemoryJournaller) RecordTx(tx *records.StoredTransaction) error {
    d.finj.NextWaiter()
    d.txs = append(d.txs, tx)
    fmt.Printf("%s recording tx with id %v, have %d\n", d.name, tx.TxID, len(d.txs))
    return nil
}

func (d MemoryJournaller) ReadTransactionsBetween(from types.Timestamp, upto types.Timestamp) ([]*records.StoredTransaction, error) {
    var ret []*records.StoredTransaction
    for _, tx := range d.txs {
        d.finj.NextWaiter()
        if tx.WhenReceived >= from && tx.WhenReceived < upto {
            ret = append(ret, tx)
        }
    }
    return ret, nil
}

JOURNAL_SYNC_1:internal/storage/journal.go

So now we want to kick off our second goroutine, to try and recover the messages to add to a block, and that is going to hit a wait point, which we will wait for and capture. We can then release the first thread, triggering a reallocation. If all goes well, when we release the second thread, it will panic.
func TestWeCanAddAndRecoverAtTheSameTime(t *testing.T) {
    clock := helpers.ClockDoubleSameMinute("2024-12-25_03:00", "05.121", "07.282", "11.281", "19.202")

    finj := helpers.FaultInjectionLibrary(t)
    tj := storage.TestJournaller("journal", finj)
    journal := tj.(*storage.MemoryJournaller)
    completed := false
    go func() {
        for !completed {
            journal.RecordTx(storableTx(clock))
        }
    }()
    aw := finj.AllocatedWaiter()
    for !journal.HaveSome() || journal.NotAtCapacity() {
        aw.Release()
        aw = finj.AllocatedWaiter()
    }
    go func() {
        txs, _ := tj.ReadTransactionsBetween(clock.Times[0], clock.Times[3])
        fmt.Printf("%v\n", txs)
    }()
    rw := finj.AllocatedWaiter()
    aw.Release()
    /*aw = */ finj.AllocatedWaiter()
    rw.Release()
    // aw.Release()
}

JOURNAL_SYNC_2:internal/storage/journal_sync_test.go

And this is what happens:
len = 0
journal recording tx with id [104 101 108 108 111], have 1 at 0xc000130060
len = 1
cap = 1 len = 1
before waiting txs = 0xc000130060
journal recording tx with id [104 101 108 108 111], have 2 at 0xc00012c760
after waiting txs = 0xc000130060
[0xc00014e1c0]
PASS
I freely admit I wasn't expecting that: so much so that I added extra tracing to be sure I was seeing what I was seeing, specifically to report the "pointer value" of the slice in both goroutines. As you can see, the append in RecordTx has changed the pointer value of d.txs (from 0xc000130060 to 0xc00012c760), but the thread doing the recovery has not noticed the change. If the memory underlying the original slice is still valid, then everything is fine; if not, we should have had a panic. I think I need to go off and research what Go is supposed to do in these circumstances. If this is guaranteed behaviour, we are done. If not, we need to figure out how to expose that we have broken something; it may be that my assumption that recovering one message was enough was invalid.

Googling didn't help me very much, but going back to The Go Programming Language, Section 9.1 makes it very clear that there is nothing magical going on and bad things can definitely happen, and I am perhaps even being optimistic about there being a panic - it would seem that scribbling over random memory is a distinct possibility.

So I am going to increase the number of writes and reads that I do and see what happens.
func TestWeCanAddAndRecoverAtTheSameTime(t *testing.T) {
    clock := helpers.ClockDoubleSameMinute("2024-12-25_03:00", "05.121", "07.282", "08.301", "08.402", "11.281", "14.010", "19.202")

    finj := helpers.FaultInjectionLibrary(t)
    tj := storage.TestJournaller("journal", finj)
    journal := tj.(*storage.MemoryJournaller)
    completed := false
    go func() {
        for !completed {
            journal.RecordTx(storableTx(clock))
        }
    }()
    aw := finj.AllocatedWaiter()
    for !journal.HaveAtLeast(3) || journal.NotAtCapacity() {
        aw.Release()
        aw = finj.AllocatedWaiter()
    }
    waitAll := make(chan struct{})
    go func() {
        txs, _ := tj.ReadTransactionsBetween(clock.Times[0], clock.Times[6])
        fmt.Printf("%v\n", txs)
        txs, _ = tj.ReadTransactionsBetween(clock.Times[0], clock.Times[6])
        fmt.Printf("%v\n", txs)
        waitAll <- struct{}{}
    }()
    rw := finj.AllocatedWaiter()
    aw.Release()
    /*aw = */ finj.AllocatedWaiter()
    rw.Release()
    finj.JustRun()
    <-waitAll
}

JOURNAL_SYNC_3:internal/storage/journal_sync_test.go

It still doesn't end as disastrously as you might think, but it's not "right" and I think the actual consequences probably depend as much on the compiler as anything else.
Starting: /home/gareth/go/bin/dlv dap --listen=127.0.0.1:35803 --log-dest=3 from /home/gareth/Projects/ChainLedger/internal/storage
DAP server listening at: 127.0.0.1:35803
Type 'dlv help' for list of commands.
journal recording tx with id [104 101 108 108 111], have 1 at 0xc000180000
journal recording tx with id [104 101 108 108 111], have 2 at 0xc000182020
journal recording tx with id [104 101 108 108 111], have 3 at 0xc000198060
journal recording tx with id [104 101 108 108 111], have 4 at 0xc000198060
before waiting txs = 0xc000198060
journal recording tx with id [104 101 108 108 111], have 5 at 0xc00019e040
after waiting txs = 0xc000198060
before waiting txs = 0xc000198060
after waiting txs = 0xc000198060
before waiting txs = 0xc000198060
after waiting txs = 0xc000198060
before waiting txs = 0xc000198060
after waiting txs = 0xc000198060
[0xc00011c1c0 0xc000194000 0xc000194230 0xc000194460]
before waiting txs = 0xc00019e040
after waiting txs = 0xc00019e040
before waiting txs = 0xc00019e040
after waiting txs = 0xc00019e040
before waiting txs = 0xc00019e040
after waiting txs = 0xc00019e040
before waiting txs = 0xc00019e040
after waiting txs = 0xc00019e040
before waiting txs = 0xc00019e040
after waiting txs = 0xc00019e040
[0xc00011c1c0 0xc000194000 0xc000194230 0xc000194460 0xc000194690]
PASS
Process 146902 has exited with status 0
Detaching
dlv dap (146676) exited with code: 0
Clearly, the goroutine doing the reading does not get the memo that d.txs has changed until after it leaves the range loop. This turns out to be less mysterious than it looks: the Go specification says that the range expression is evaluated just once, before the loop begins, and on top of that ReadTransactionsBetween has a value receiver, so the d it is working on - slice header included - is a copy made when the method was called. Bear in mind that it is not just that the range might have cached it; I am printing out the value and it does not change until after the loop has completed. In effect, the value of d.txs is captured in a local variable before the loop starts, equivalent to this:
    dtxs := d.txs // the range expression, evaluated once before the loop begins
    for _, tx := range dtxs {
        fmt.Printf("before waiting txs = %p\n", dtxs)
        d.finj.NextWaiter()
        fmt.Printf("after waiting txs = %p\n", dtxs)
        ...
    }
Thus the whole of the slice is read (and "locked", in the sense that its backing array is not released by the memory manager) for the duration of the loop. If so, the code "as written" (i.e. if the only possible race point were inside the loop) would be valid; but it is still theoretically possible (although not practically testable) that the slice d.txs is updated while it is being captured; if so, the three parts of the slice header (pointer, length and capacity) could be out of whack with each other.
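
A tiny standalone program (again, not repo code) shows the "evaluated once" behaviour on its own: appending inside the loop does not change what the loop iterates over.
package main

import "fmt"

func main() {
    s := []int{10}
    for i, v := range s { // the range expression s is evaluated once, right here
        s = append(s, v+1) // grows (and here reallocates) s, but the loop does not see it
        fmt.Println(i, v)  // prints "0 10" exactly once
    }
    fmt.Println(len(s)) // 2
}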

Either way, we are not going to take the risk, but instead we are going to put the whole of the journal in a separate goroutine and submit both write requests and read requests through a channel. The current API will be preserved but will do the channel interactions.

A Channel-based Journal

So let's start with drafting out what such a journal would look like. This isn't really a test, but it gives a flavour of what we're going to be doing (although we will need to add fields to the requests to Store and Retrieve).
func TestThread(t *testing.T) {
    ch := storage.LaunchJournalThread()
    ch <- storage.JournalStoreCommand{}
    ch <- storage.JournalRetrieveCommand{}
    ch <- 42

    donech := make(chan struct{})
    ch <- storage.JournalDoneCommand{NotifyMe: donech}
    <-donech
}

JOURNAL_THREAD_DRAFT:internal/storage/journal_sync_test.go

This "launches" a new journal thread - a goroutine responsible for isolating all of the journal contents - and returns a channel which can be used to submit requests. As I understand it, writing to a channel is thread safe, so we can store this anywhere we want and all our threads can submit requests to the journal via this channel without issues. We then submit a variety of items.

All of the items referenced here are to be found in the new file journal_thread.go:
package storage

import "fmt"

type JournalCommand interface {
}

type JournalStoreCommand struct {
}

type JournalRetrieveCommand struct {
}

type JournalDoneCommand struct {
    NotifyMe chan<- struct{}
}

func LaunchJournalThread() chan<- JournalCommand {
    ret := make(chan JournalCommand)
    go func() {
    whenDone:
        for {
            x := <-ret
            switch v := x.(type) {
            case JournalStoreCommand:
                fmt.Printf("was a store command %v\n", v)
            case JournalRetrieveCommand:
                fmt.Printf("was a retrieve command %v\n", v)
            case JournalDoneCommand:
                fmt.Printf("was a done command %v\n", v)
                v.NotifyMe <- struct{}{}
                break whenDone
            default:
                fmt.Printf("not a valid journal command %v\n", x)
            }
        }
    }()
    return ret
}

JOURNAL_THREAD_DRAFT:internal/storage/journal_thread.go

And when we run this test, we see the following (fairly boring) output:
was a store command {}
was a retrieve command {}
not a valid journal command 42
was a done command {0xc000180000}
It's interesting to note that, because of the way Go implements interfaces, the interface here is functionally equivalent to any - any object is a valid instance of the interface because it trivially implements the (non-existent) interface methods.
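
A two-line illustration of that point (not repo code): because the interface declares no methods, literally anything satisfies it, which is exactly why the switch needs its default case.
package main

type Command interface{} // no methods at all

var _ Command = 42             // an int satisfies it...
var _ Command = "so does this" // ...as does any other type

func main() {}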

So, what have we actually achieved here? Not a lot, which is why I've described it as a "draft": it's an outline of what we want to achieve. But it's worth noting that from here, pretty much everything is a refactoring.

The intention of this "refactoring" is that the user API shouldn't change (much), but rather that we should bury all references to the journal thread within the existing API so that it is compatible with other APIs (such as the Dummy API, and also the "future" DynamoDB implementation) that don't need an exclusionary thread. The one API change we do want is to allow the thread to be terminated by having a Quit method on the journal.

So, pulling across the code from the existing journal API, we end up with this:
type JournalStoreCommand struct {
    Tx *records.StoredTransaction
}

type JournalRetrieveCommand struct {
    From, Upto types.Timestamp
    ResultChan chan<- []*records.StoredTransaction
}

type JournalCheckCapacityCommand struct {
    AtLeast    int
    ResultChan chan<- bool
}

type JournalDoneCommand struct {
    NotifyMe chan<- struct{}
}

func LaunchJournalThread(name string, finj helpers.FaultInjection) chan<- JournalCommand {
    var txs []*records.StoredTransaction
    ret := make(chan JournalCommand)
    log.Printf("launching new journal thread with channel %p", ret)
    go func() {
    whenDone:
        for {
            x := <-ret
            switch v := x.(type) {
            case JournalStoreCommand:
                txs = append(txs, v.Tx)
                log.Printf("%s recording tx with id %v, have %d at %p", name, v.Tx.TxID, len(txs), txs)
            case JournalRetrieveCommand:
                log.Printf("reading txs = %p, len = %d", txs, len(txs))
                var ret []*records.StoredTransaction
                for _, tx := range txs {
                    if tx.WhenReceived >= v.From && tx.WhenReceived < v.Upto {
                        ret = append(ret, tx)
                    }
                }
                v.ResultChan <- ret
            case JournalCheckCapacityCommand:
                ret := cap(txs) == len(txs) && cap(txs) >= v.AtLeast
                log.Printf("checking capacity, returning %v\n", ret)
                v.ResultChan <- ret
            case JournalDoneCommand:
                log.Printf("was a done command %v\n", v)
                v.NotifyMe <- struct{}{}
                break whenDone
            default:
                log.Printf("not a valid journal command %v\n", x)
            }
        }
    }()
    return ret
}

JOURNAL_THREAD_COMPLETED:internal/storage/journal_thread.go

One of the interesting things that happens with these kinds of tests is that depending on how you arrange them, you can have different things that you can test. Here, we haven't copied across the wait point inside the loop, because if we did, it would block the thread, and then everything would end up deadlocked (because we couldn't then store another record). Instead, we have left that wait point back in the API, but there it does not block while the read is happening. In one sense, this is frustrating, but in a very real sense, we cannot test this behaviour for the very simple reason that it cannot any longer go wrong in this way. In other words, our simple inability to write the test tells us that we have eliminated the possibility of the problem.

In doing this, I have finally reached a point I had been expecting to reach some time ago. Based on my previous experience, I expected to need to "name" the individual fault injection points, and then control them independently. Up until now, I have been able to get away without naming them and still control the points individually. But now, I believe that this change does introduce a real race condition between the Store and the Read at the end, so I have named the points and allowed them to be controlled independently. An additional benefit is that I am now able to produce better tracing.
2025/01/09 20:02:11 launching new journal thread with channel 0xc00010e310
2025/01/09 20:02:11 next(journal-store-tx) allocated 0xc000148080, waiting ...
2025/01/09 20:02:11 allocated(journal-store-tx) was 0xc000148080
2025/01/09 20:02:11 checking capacity, returning false
2025/01/09 20:02:11 released(journal-store-tx, 0xc000148080)
2025/01/09 20:02:11 next(journal-store-tx) allocated 0xc0001480c0, waiting ...
2025/01/09 20:02:11 allocated(journal-store-tx) was 0xc0001480c0
2025/01/09 20:02:11 journal recording tx with id [104 101 108 108 111], have 1 at 0xc00011c050
2025/01/09 20:02:11 checking capacity, returning false
2025/01/09 20:02:11 released(journal-store-tx, 0xc0001480c0)
2025/01/09 20:02:11 next(journal-store-tx) allocated 0xc0001480e0, waiting ...
2025/01/09 20:02:11 allocated(journal-store-tx) was 0xc0001480e0
2025/01/09 20:02:11 journal recording tx with id [104 101 108 108 111], have 2 at 0xc000118810
2025/01/09 20:02:11 checking capacity, returning false
2025/01/09 20:02:11 released(journal-store-tx, 0xc0001480e0)
2025/01/09 20:02:11 next(journal-store-tx) allocated 0xc000148100, waiting ...
2025/01/09 20:02:11 allocated(journal-store-tx) was 0xc000148100
2025/01/09 20:02:11 journal recording tx with id [104 101 108 108 111], have 3 at 0xc000180000
2025/01/09 20:02:11 checking capacity, returning false
2025/01/09 20:02:11 released(journal-store-tx, 0xc000148100)
2025/01/09 20:02:11 next(journal-store-tx) allocated 0xc000180020, waiting ...
2025/01/09 20:02:11 journal recording tx with id [104 101 108 108 111], have 4 at 0xc000180000
2025/01/09 20:02:11 allocated(journal-store-tx) was 0xc000180020
2025/01/09 20:02:11 checking capacity, returning true
2025/01/09 20:02:11 next(journal-read-txs) allocated 0xc000180060, waiting ...
2025/01/09 20:02:11 allocated(journal-read-txs) was 0xc000180060
2025/01/09 20:02:11 released(journal-store-tx, 0xc000180020)
2025/01/09 20:02:11 next(journal-store-tx) allocated 0xc00007e020, waiting ...
2025/01/09 20:02:11 allocated(journal-store-tx) was 0xc00007e020
2025/01/09 20:02:11 released(journal-read-txs, 0xc000180060)
2025/01/09 20:02:11 journal recording tx with id [104 101 108 108 111], have 5 at 0xc00012a940
2025/01/09 20:02:11 reading txs = 0xc00012a940, len = 5
2025/01/09 20:02:11 [0xc000146230 0xc000146460 0xc000146690 0xc0001468c0 0xc0001920e0]
2025/01/09 20:02:11 next(journal-read-txs) allocated 0xc000148140, waiting ...
2025/01/09 20:02:11 allocated(journal-read-txs) was 0xc000148140
2025/01/09 20:02:11 released(journal-read-txs, 0xc000148140)
2025/01/09 20:02:11 reading txs = 0xc00012a940, len = 5
2025/01/09 20:02:11 [0xc000146230 0xc000146460 0xc000146690 0xc0001468c0 0xc0001920e0]
2025/01/09 20:02:11 next(pending-tx) allocated 0xc0001481c0, waiting ...
2025/01/09 20:02:11 allocated(pending-tx) was 0xc0001481c0
2025/01/09 20:02:11 non-allocated(pending-tx) after 50000000
2025/01/09 20:02:11 released(pending-tx, 0xc0001481c0)
2025/01/09 20:02:11 next(pending-tx) allocated 0xc000148200, waiting ...
2025/01/09 20:02:11 allocated(pending-tx) was 0xc000148200
2025/01/09 20:02:11 released(pending-tx, 0xc000148200)
PASS
Finally, here is what is left in the stubbed API:
type MemoryJournaller struct {
    name     string
    tothread chan<- JournalCommand
    finj     helpers.FaultInjection
}

// RecordTx implements Journaller.
func (d *MemoryJournaller) RecordTx(tx *records.StoredTransaction) error {
    d.finj.NextWaiter("journal-store-tx")
    d.tothread <- JournalStoreCommand{Tx: tx}
    return nil
}

func (d MemoryJournaller) ReadTransactionsBetween(from types.Timestamp, upto types.Timestamp) ([]*records.StoredTransaction, error) {
    messageMe := make(chan []*records.StoredTransaction)
    d.finj.NextWaiter("journal-read-txs")
    d.tothread <- JournalRetrieveCommand{From: from, Upto: upto, ResultChan: messageMe}
    ret := <-messageMe
    return ret, nil
}

func (d *MemoryJournaller) Quit() error {
    return nil
}

func (d *MemoryJournaller) AtCapacityWithAtLeast(n int) bool {
    messageMe := make(chan bool)
    d.tothread <- JournalCheckCapacityCommand{AtLeast: n, ResultChan: messageMe}
    return <-messageMe
}

func NewJournaller(name string) Journaller {
    return TestJournaller(name, helpers.IgnoreFaultInjection())
}

func TestJournaller(name string, finj helpers.FaultInjection) Journaller {
    ret := MemoryJournaller{name: name, finj: finj}
    ret.tothread = LaunchJournalThread(name, finj)
    return &ret
}

JOURNAL_THREAD_COMPLETED:internal/storage/journal.go

Performance

The question will naturally arise as to what the performance implications of this change are, and how they compare with using a mutex. The real answer is "you need to run a large-scale performance test", which we are not ready to do yet, but the simple answer is that when you are trying to synchronize threads, there is always going to be a "critical section" that you are explicitly saying only one thread can be in at any one time, and you want to make that as small as possible. It seems to me that in this case the critical section consists of the append in RecordTx and the for..range in the retrieve case. The thread here has very little overhead other than that, and I believe giving the channel a queue (a buffer of, say, 20 messages - note that the LaunchJournalThread listing above still creates an unbuffered channel) would let it smooth out bursts of requests.
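
If we do want that buffering, it is a one-line change inside LaunchJournalThread - shown here as a hypothetical tweak, not something in the listings above:
    // Hypothetical: allow up to 20 commands to queue before a sender blocks.
    ret := make(chan JournalCommand, 20)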

The obvious problem is actually the implementation of the Read method: it scans through all the messages, looking for the ones that fall in a given time range. If we want to improve the performance, we should probably decide "in advance" what the time bounds of each block are going to be, and keep a list of lists of messages: one list for each block. It wouldn't surprise me if we came back to that in the fullness of time. But always remember that this is just a cheap "in-memory" implementation: what really counts is the production version built on a real database.