Monday, February 3, 2025

Internode Communication

Now that the nodes know about each other, they can communicate with each other. This consists of sending messages and receiving them. For now, all we want to do is broadcast every thought we have to all the other nodes and have them be accepted.

Sending Transactions

It's always hard to know whether to write the sender or receiver first, but given that it is clearer that the code is "present", if not necessarily correct, I'm going to do the sending first with the expectation that I will see some kind of error because the URL does not exist.

The idea is going to be that every time we create an item and store it in the journal, we will then also turn around and ask the configuration what all the other nodes are and then send the message (in binary form) to a URL constructed of the node URL and a path, say /remotetx.

So, let's review the code in clienthandler I'm planning to change:
// ServeHTTP implements http.Handler.
func (r RecordStorage) ServeHTTP(resp http.ResponseWriter, req *http.Request) {
    log.Printf("asked to store record with length %d\n", req.ContentLength)

    body, err := io.ReadAll(req.Body)
    if err != nil {
        log.Printf("Error: %v\n", err)
        resp.WriteHeader(http.StatusBadRequest)
        return
    }
    log.Printf("have json input %s\n", string(body))

    var tx = api.Transaction{}
    err = json.Unmarshal(body, &tx)
    if err != nil {
        log.Printf("Error unmarshalling: %v\n", err)
        resp.WriteHeader(http.StatusBadRequest)
        return
    }

    log.Printf("Have transaction %v\n", &tx)
    if stx, err := r.resolver.ResolveTx(&tx); stx != nil {
        r.journal.RecordTx(stx)
    } else if err != nil {
        log.Printf("Error resolving tx: %v\n", err)
        resp.WriteHeader(http.StatusInternalServerError)
        return
    } else {
        log.Printf("have acknowledged this transaction, but not yet ready")
    }
}

REDO_CONFIG:internal/clienthandler/recordstorage.go

This is a method on the class RecordStorage which has the following members:
type RecordStorage struct {
    resolver Resolver
    journal  storage.Journaller
}

REDO_CONFIG:internal/clienthandler/recordstorage.go

What I want to do then is to have an array of endpoints to connect to and send the binary version of the stored transaction to each one. What does that look like? Well, what I have "in my hand", as it were, from the last chapter is a slice of NodeConfigs for the OtherNodes. But what I want here is something that can send a binary blob. Given that I would like to be able to test this code, I think what I'm going to do is declare a BinarySender interface and require that we are given a slice of those. (Note: I actually have no intention of actually writing any unit tests for this specific code because I am too lazy, but I will happily accept any pull requests; I am merely making sure that it would be testable.)

The interface looks like this:
package internode

type BinarySender interface {
    Send(path string, blob []byte)
}

INTERNODE_TRANSACTIONS:internal/internode/binarysender.go

We can do the work here to make sure we have these passed in and available to us:
type RecordStorage struct {
    resolver Resolver
    journal  storage.Journaller
    senders  []internode.BinarySender
}

func NewRecordStorage(r Resolver, j storage.Journaller, senders []internode.BinarySender) RecordStorage {
    return RecordStorage{resolver: r, journal: j, senders: senders}
}

INTERNODE_TRANSACTIONS:internal/clienthandler/recordstorage.go

And then we can marshal the transaction to binary and send it over the wire:
// ServeHTTP implements http.Handler.
func (r RecordStorage) ServeHTTP(resp http.ResponseWriter, req *http.Request) {
    log.Printf("asked to store record with length %d\n", req.ContentLength)

    body, err := io.ReadAll(req.Body)
    if err != nil {
        log.Printf("Error: %v\n", err)
        resp.WriteHeader(http.StatusBadRequest)
        return
    }
    log.Printf("have json input %s\n", string(body))

    var tx = api.Transaction{}
    err = json.Unmarshal(body, &tx)
    if err != nil {
        log.Printf("Error unmarshalling: %v\n", err)
        resp.WriteHeader(http.StatusBadRequest)
        return
    }

    log.Printf("Have transaction %v\n", &tx)
    if stx, err := r.resolver.ResolveTx(&tx); stx != nil {
        r.journal.RecordTx(stx)
        blob, err := stx.MarshalBinary()
        if err != nil {
            log.Printf("Error marshalling tx: %v %v\n", tx.ID(), err)
            return
        }
        for _, bs := range r.senders {
            go bs.Send("/remotetx", blob)
        }
    } else if err != nil {
        log.Printf("Error resolving tx: %v\n", err)
        resp.WriteHeader(http.StatusInternalServerError)
        return
    } else {
        log.Printf("have acknowledged this transaction, but not yet ready")
    }
}

INTERNODE_TRANSACTIONS:internal/clienthandler/recordstorage.go

Note that when we invoke the sender, we do so in a new goroutine. This ensures that all of our messages are sent in parallel, which is really important because almost all network communication is a question of sitting around for a long time waiting for responses. Especially when you are doing HTTPS requests halfway around the world (which is absolutely our intention here).

That leaves us with a few loose ends. Firstly, we aren't passing in any BinarySenders to the NewRecordStorage constructor. I'm going to go off and quietly sort that out since it's mainly a bookkeeping exercise turning NodeConfigs into HttpBinarySender objects. Secondly, there isn't in fact an HttpBinarySender object, so we need to create one.
package internode

import (
    "bytes"
    "log"
    "net/http"
    "net/url"
)

type HttpBinarySender struct {
    cli *http.Client
    url *url.URL
}

// Send implements BinarySender.
func (h *HttpBinarySender) Send(path string, blob []byte) {
    tourl := h.url.JoinPath(path).String()
    log.Printf("sending blob(%d) to %s\n", len(blob), tourl)
    resp, err := h.cli.Post(tourl, "application/octet-stream", bytes.NewReader(blob))
    if err != nil {
        log.Printf("error sending to %s: %v\n", tourl, err)
    } else if resp.StatusCode/100 != 2 {
        log.Printf("bad status code sending to %s: %d\n", tourl, resp.StatusCode)
    }
}

func NewHttpBinarySender(url *url.URL) BinarySender {
    return &HttpBinarySender{cli: &http.Client{}, url: url}
}

INTERNODE_TRANSACTIONS:internal/internode/httpbinarysender.go

We create one of these for each of the remote nodes. When we create it, we create a new http.Client instance. I'm assuming that these can be used from multiple threads at the same time, but I'm not entirely sure. We also store the "base url" which comes from the "name" of the remote node.

When Send is called, we append the relative path which has been passed in to the base url, and then call Post on the client, specifying the binary MIME type application/octet-stream and the binary blob. Finally we check for errors and invalid status returns.

And finally, we don't currently have a method to marshal the stored transaction to binary format, so we want to add that. It's all a bit clumsy, but the main method looks like this:
func (s *StoredTransaction) MarshalBinary() ([]byte, error) {
    ret := types.NewBinaryMarshallingBuffer()
    s.TxID.MarshalBinaryInto(ret)
    s.WhenReceived.MarshalBinaryInto(ret)
    types.MarshalStringInto(ret, s.ContentLink.String())
    s.ContentHash.MarshalBinaryInto(ret)
    types.MarshalInt32Into(ret, int32(len(s.Signatories)))
    for _, sg := range s.Signatories {
        sg.MarshalBinaryInto(ret)
    }
    s.NodeSig.MarshalBinaryInto(ret)
    return ret.Bytes(), nil
}

INTERNODE_TRANSACTIONS:internal/records/storedtransaction.go

All of the subsidiary methods are implemented in various files in the types package. You can look at them if you want, but they all have fairly obvious implementations. At the end of the day, marshalling everything comes down to either an integer or a length followed by a byte slice.

When we run the harness tests, we see these messages come out:
2025/01/11 21:13:04 sending blob(998) to http://localhost:5002/remotetx
2025/01/11 21:13:04 bad status code sending to http://localhost:5002/remotetx: 404
This confirms that we have built a blob (a total of 998 bytes long) and that it was sent but not received. The 404 code of course means that there wasn't anyone at home listening for a /remotetx request.

Maybe we should fix that.

Receiving transactions

On the other end, we want to implement a handler for the /remotetx request and within that handler unmarshal the binary buffer. In the fulness of time we want to check that the message was valid and store it, as well as starting to build up a block identical to the one on the originating server. But that's for later. For now, can we just get far enough that the 404s go away and we have a StoredTransaction object in our hands?

First, let's just make the message go away:
func (node *ListenerNode) startAPIListener(resolver Resolver, journaller storage.Journaller) {
    cliapi := http.NewServeMux()
    pingMe := PingHandler{}
    cliapi.Handle("/ping", pingMe)
    senders := make([]internode.BinarySender, len(node.config.OtherNodes()))
    for i, n := range node.config.OtherNodes() {
        senders[i] = internode.NewHttpBinarySender(n.Name())
    }
    storeRecord := NewRecordStorage(resolver, journaller, senders)
    cliapi.Handle("/store", storeRecord)
    remoteTxHandler := internode.NewTransactionHandler()
    cliapi.Handle("/remotetx", remoteTxHandler)
    node.server = &http.Server{Addr: node.config.ListenOn(), Handler: cliapi}
    err := node.server.ListenAndServe()
    if err != nil && !errors.Is(err, http.ErrServerClosed) {
        fmt.Printf("error starting server: %s\n", err)
    }
}

INTERNODE_NO_404:internal/clienthandler/node.go

which of course requires an implementation of internode.NewTransactionHandler:
package internode

import (
    "io"
    "log"
    "net/http"
)

type TransactionHandler struct {
}

// ServeHTTP implements http.Handler.
func (t *TransactionHandler) ServeHTTP(resp http.ResponseWriter, req *http.Request) {
    buf, _ := io.ReadAll(req.Body)
    log.Printf("have received an internode request length: %d\n", len(buf))
}

func NewTransactionHandler() *TransactionHandler {
    return &TransactionHandler{}
}

INTERNODE_NO_404:internal/internode/transactionhandler.go

And then when we run the harness again, we receive positive affirmation from the recipient rather than errors from the sender:
2025/01/11 21:31:41 http://localhost:5001 recording tx with id [4 218 55 214 175 54 220 250 78 117 196 247 29 62 161 49 195 14 174 5 130 83 68 180 28 142 193 230 200 152 149 6 200 103 247 206 210 105 168 58 109 68 240 129 167 170 44 72 245 166 178 5 137 126 30 96 51 3 107 47 13 169 33 22], have 12 at 0xc000500080
2025/01/11 21:31:41 sending blob(998) to http://localhost:5002/remotetx
2025/01/11 21:31:41 have received an internode request length 998
Excellent. Now let's unmarshal the buffer. This is basically the mirror image of what we did for marshalling, so as before, I'll just show you the overview. This is what happens in the HTTP handler:
func (t *TransactionHandler) ServeHTTP(resp http.ResponseWriter, req *http.Request) {
    buf, err := io.ReadAll(req.Body)
    if err != nil {
        log.Printf("could not read the buffer from the request")
        return
    }
    log.Printf("have received an internode request length: %d\n", len(buf))
    stx, err := records.UnmarshalBinaryStoredTransaction(buf)
    if err != nil {
        log.Printf("could not unpack the internode message")
        return
    }
    log.Printf("unmarshalled message to: %v\n", stx)
}

INTERNODE_UNMARSHALLING:internal/internode/transactionhandler.go

And we need to add this code to unmarshal the stored transaction:
func UnmarshalBinaryStoredTransaction(bytes []byte) (*StoredTransaction, error) {
    buf := types.NewBinaryUnmarshallingBuffer(bytes)
    stx := StoredTransaction{}
    stx.TxID, _ = types.UnmarshalHashFrom(buf)
    stx.WhenReceived, _ = types.UnmarshalTimestampFrom(buf)
    cls, _ := types.UnmarshalStringFrom(buf)
    stx.ContentLink, _ = url.Parse(cls)
    stx.ContentHash, _ = types.UnmarshalHashFrom(buf)
    nsigs, _ := types.UnmarshalInt32From(buf)
    stx.Signatories = make([]*types.Signatory, nsigs)
    for i := 0; i < int(nsigs); i++ {
        stx.Signatories[i], _ = types.UnmarshalSignatoryFrom(buf)
    }
    _ = buf.ShouldBeDone()
    return &stx, nil
}

INTERNODE_UNMARSHALLING:internal/records/storedtransaction.go

(Note that to make this easier to understand, I have omitted all the error handling for now; I am going to go back and add that now.)

And then, running the harness again, we get a lot of interleaved messages, but these might be a pair:
2025/02/03 14:59:11 sending blob(1002) to http://localhost:5002/remotetx
2025/02/03 14:59:11 have received an internode request length: 1002
2025/02/03 14:59:11 unmarshalled message to: &{[222 208 107 140 79 14 157 97 132 106 128 254 150 37 195 208 157 67 234 94 15 14 57 28 42 106 84 33 94 158 113 99 114 59 115 191 11 163 49 7 12 78 173 152 174 234 179 80 103 172 83 80 58 123 30 24 29 177 103 29 74 187 45 168] 1738594751553 http://tx.info/t23ehi_zz07 [142 222 72 165 226 123 251 31 164 239 135 236 222 53 131 204 225 168 66 127 56 173 120 145 187 223 122 203 98 144 15 113 206 250 116 90 25 169 146 58 255 210 166 155 120 29 147 1 37 150 218 82 253 40 51 148 104 17 37 20 240 169 255 83] [0xc000016480 0xc0000164a0] []}

Conclusion

Great! We've managed to send across all the transactions from all the nodes to all the other nodes and, on arrival, unpack that transaction so that we're ready to use it.

Next time, we'll check the signatures and then we'll store it on the "other nodes" along with their own transactions.

No comments:

Post a Comment