Publishing and handling the blocks is basically the same as dealing with the messages, so a lot of this code will seem familiar, and I'll skip a bunch of the details.
Publishing our blocks
The first thing we want to do is to publish the blocks from within the loop in the goroutine that creates and stores the blocks. Coming here, I notice that there is already duplication in this code, and one of my fairly consistent rules is that I don't add duplication to duplication: it's time to refactor.

So, first off, I'm going to extract that code and then add the corresponding code to publish to the senders on the path /remoteblock:
func (builder *SleepBlockBuilder) buildRecordAndSend(prevTime types.Timestamp, currTime types.Timestamp, lastBlock *records.Block) *records.Block {
block := builder.buildBlock(prevTime, currTime, lastBlock)
builder.journaller.RecordBlock(block)
blob, err := block.MarshalBinary()
if err != nil {
log.Printf("Error marshalling block: %v %v\n", block.ID, err)
return block
}
for _, bs := range builder.senders {
go bs.Send("/remoteblock", blob)
}
return block
}
INTERNODE_PUBLISH_BLOCKS:internal/block/builder.go
This is called twice from the main loop above:
func (builder *SleepBlockBuilder) Run() {
blocktime := builder.clock.Time()
timer := builder.clock.After(delay)
lastBlock, err := builder.blocker.Build(blocktime, nil, nil)
if err != nil {
panic("error returned from building block 0")
}
builder.journaller.RecordBlock(lastBlock)
for {
prev := blocktime
select {
case pingback := <-builder.control:
log.Printf("%s asked to build final block and quit\n", builder.Name.String())
builder.buildRecordAndSend(prev, builder.clock.Time(), lastBlock)
pingback.Send()
return
case blocktime = <-timer:
timer = builder.clock.After(delay)
nowis := <-builder.clock.After(pause)
// we are ready to build a block
log.Printf("%s building block at %s\n", builder.Name.String(), nowis.IsoTime())
lastBlock = builder.buildRecordAndSend(prev, blocktime, lastBlock)
}
}
}
INTERNODE_PUBLISH_BLOCKS:internal/block/builder.go
It may seem an error that one leg stores the returned block (in lastBlock) and the other doesn't, but the first leg almost immediately returns - it is tidying up, after all - and so there would be no opportunity to use the returned value even if we did store it (and attempting to store it, as I originally did, is a compile-time error in Go).

We also need to be able to marshal the block into a binary buffer in the same way as we did the StoredTransaction:
func (b *Block) MarshalBinary() ([]byte, error) {
ret := types.NewBinaryMarshallingBuffer()
b.ID.MarshalBinaryInto(ret)
b.PrevID.MarshalBinaryInto(ret)
types.MarshalStringInto(ret, b.BuiltBy.String())
b.UpUntil.MarshalBinaryInto(ret)
types.MarshalInt32Into(ret, int32(len(b.Txs)))
for _, tx := range b.Txs {
tx.MarshalBinaryInto(ret)
}
b.Signature.MarshalBinaryInto(ret)
return ret.Bytes(), nil
}
INTERNODE_PUBLISH_BLOCKS:internal/records/block.go
And, as always seems to be the case at the moment, we need to do a bunch of plumbing to make sure that everything else lines up. Specifically, we needed to make sure that the block generator goroutine had the list of binary senders.
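To give a flavour of that plumbing, here is a rough sketch of the constructor wiring involved. The constructor name, the Clock type and the parameter list are my guesses for illustration, not the actual ChainLedger code; only the field names match the ones used in Run above.

func NewSleepBlockBuilder(clock helpers.Clock, journaller storage.Journaller,
	blocker Blocker, senders []internode.BinarySender) *SleepBlockBuilder {
	// Hypothetical sketch: the important part is that the list of binary
	// senders is handed to the builder when it is constructed, so the
	// goroutine has somewhere to publish its blocks to.
	return &SleepBlockBuilder{
		clock:      clock,
		journaller: journaller,
		blocker:    blocker,
		senders:    senders,
	}
}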
Then we can run the harness and check that something happens:
2025/03/03 14:43:05 sending blob(458) to http://localhost:5002/remoteblock
2025/03/03 14:43:05 error sending to http://localhost:5002/remoteblock: Post "http://localhost:5002/remoteblock": EOF
Something is happening. I suspect that the EOF here does not so much mean "not handled" as "the node is shutting down". Because we send out a final block just before shutting down, the remote node has probably closed its listener before it gets around to processing the block. We will have to fix that, but I'm going to press on writing more code first.
Listening for the Remote Blocks
In the same way as we have a handler for /remotetx, we need a handler for /remoteblock.
func (node *ListenerNode) startAPIListener(resolver Resolver, journaller storage.Journaller, senders []internode.BinarySender) {
cliapi := http.NewServeMux()
pingMe := PingHandler{}
cliapi.Handle("/ping", pingMe)
storeRecord := NewRecordStorage(resolver, journaller, senders)
cliapi.Handle("/store", storeRecord)
remoteTxHandler := internode.NewTransactionHandler(node.config)
cliapi.Handle("/remotetx", remoteTxHandler)
remoteBlockHandler := internode.NewBlockHandler(node.config)
cliapi.Handle("/remoteblock", remoteBlockHandler)
node.server = &http.Server{Addr: node.config.ListenOn(), Handler: cliapi}
err := node.server.ListenAndServe()
if err != nil && !errors.Is(err, http.ErrServerClosed) {
fmt.Printf("error starting server: %s\n", err)
}
}
INTERNODE_HANDLE_BLOCKS:internal/clienthandler/node.go
And that is implemented in a new file, blockhandler.go (while I'm about it, I'm also refactoring and renaming various other files, classes and methods):
package internode
import (
"fmt"
"io"
"log"
"net/http"
"github.com/gmmapowell/ChainLedger/internal/config"
"github.com/gmmapowell/ChainLedger/internal/records"
)
type BlockHandler struct {
nodeConfig config.LaunchableNodeConfig
}
// ServeHTTP implements http.Handler.
func (t *BlockHandler) ServeHTTP(resp http.ResponseWriter, req *http.Request) {
buf, err := io.ReadAll(req.Body)
if err != nil {
log.Printf("could not read the buffer from the request")
return
}
log.Printf("have received an internode block length: %d\n", len(buf))
block, err := records.UnmarshalBinaryBlock(buf)
if err != nil {
log.Printf("could not unpack the internode block: %v\n", err)
return
}
log.Printf("unmarshalled message to: %v\n", block)
publishedBy := block.BuiltBy.String()
storer := t.nodeConfig.RemoteStorer(publishedBy)
if storer == nil {
log.Printf("could not find a handler for remote node %s\n", publishedBy)
return
}
err = storer.StoreBlock(block)
if err != nil {
panic(fmt.Sprintf("failed to store remote transaction: %v", err))
}
}
func NewBlockHandler(c config.LaunchableNodeConfig) *BlockHandler {
return &BlockHandler{nodeConfig: c}
}
INTERNODE_HANDLE_BLOCKS:internal/internode/blockhandler.go
The correspondence to the code for handling stored transactions should be obvious (as should the fact that this is basically a copy-paste-modify of that code).

This depends on being able to unmarshal the binary blob we marshalled when publishing the block:
func UnmarshalBinaryBlock(bytes []byte) (*Block, error) {
block := Block{}
buf := types.NewBinaryUnmarshallingBuffer(bytes)
var err error
block.ID, err = types.UnmarshalHashFrom(buf)
if err != nil {
return nil, err
}
block.PrevID, err = types.UnmarshalHashFrom(buf)
if err != nil {
return nil, err
}
cls, err := types.UnmarshalStringFrom(buf)
if err != nil {
return nil, err
}
block.BuiltBy, err = url.Parse(cls)
if err != nil {
return nil, err
}
block.UpUntil, err = types.UnmarshalTimestampFrom(buf)
if err != nil {
return nil, err
}
ntxs, err := types.UnmarshalInt32From(buf)
if err != nil {
return nil, err
}
block.Txs = make([]types.Hash, ntxs)
for i := 0; i < int(ntxs); i++ {
block.Txs[i], err = types.UnmarshalHashFrom(buf)
if err != nil {
return nil, err
}
}
block.Signature, err = types.UnmarshalSignatureFrom(buf)
if err != nil {
return nil, err
}
err = buf.ShouldBeDone()
if err != nil {
return nil, err
}
return &block, nil
}
INTERNODE_HANDLE_BLOCKS:internal/records/block.go
Let's try running the harness again:
2025/03/03 15:11:16 error sending to http://localhost:5002/remoteblock: Post "http://localhost:5002/remoteblock": dial tcp 127.0.0.1:5002: connect: connection refused
There is clearly a problem with the process terminating before it handles the /remoteblock request. Let's fix that.
Making the Node Wait to Shut Down
As with everything else in life, there is a right way to do this and a wrong way. And, as is generally the case, the "right" way is a lot more pain and hassle than the "wrong" way. So I'm going to do it the "wrong" way for now. We may revisit this later when we have all the code in place (that will not be today).

The right way to do it would be to track all the blocks that were generated, and not allow any of the nodes in the harness to shut down until (there is a rough sketch of this coordination after the list):
- all the nodes were ready to shut down;
- all the nodes reported that they had received all the blocks;
- but still cope with a failure and eventually terminate if a block was not sent for some reason.
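Here is the rough sketch promised above of what that coordination might look like. None of these names exist in the real harness code; it is only meant to show the shape of the bookkeeping, including the give-up-eventually case in the last bullet. It assumes it lives in the harness's main package and uses only the standard library.

import (
	"log"
	"sync"
	"time"
)

// Hypothetical: trackPublished, trackDelivered and waitForDelivery are
// invented for this sketch. Each node would call trackPublished when it sends
// a block to a peer and trackDelivered when the peer confirms it has stored
// the block; the harness would call waitForDelivery before terminating.
var blockDeliveries sync.WaitGroup

func trackPublished() { blockDeliveries.Add(1) }
func trackDelivered() { blockDeliveries.Done() }

// waitForDelivery waits until every published block has been confirmed, or
// gives up after the timeout so that one lost block cannot hang the harness.
func waitForDelivery(timeout time.Duration) {
	delivered := make(chan struct{})
	go func() {
		blockDeliveries.Wait()
		close(delivered)
	}()
	select {
	case <-delivered:
	case <-time.After(timeout):
		log.Printf("gave up waiting for all blocks to be delivered")
	}
}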
Currently, the main harness code waits for all the clients to finish, and then starts terminating the nodes. Whatever we do, this is where we need to do it.
for _, c := range clients {
c.Begin()
}
for _, c := range clients {
c.WaitFor()
}
for _, n := range nodes {
n.Terminate()
}
INTERNODE_HANDLE_BLOCKS:cmd/harness/main.go
For now, I'm just going to add a sleep for two seconds. Unfortunately, when I do this, I discover that it is only during Terminate that we build and publish the final block. So, we need to split Terminate into ClientsDone and Terminate, so that we can wait between the two:
for _, c := range clients {
c.Begin()
}
for _, c := range clients {
c.WaitFor()
}
for _, n := range nodes {
n.ClientsDone()
}
time.Sleep(2 * time.Second)
for _, n := range nodes {
n.Terminate()
}
INTERNODE_WAIT_TERMINATE:cmd/harness/main.go
And then we need to do the refactoring in ListenerNode:
func (node *ListenerNode) ClientsDone() {
node.waitChan = make(types.Signal)
node.Control <- node.waitChan.Sender()
}
func (node *ListenerNode) Terminate() {
node.server.Shutdown(context.Background())
<-node.waitChan
node.journaller.Quit()
log.Printf("node %s finished\n", node.Name())
}
INTERNODE_WAIT_TERMINATE:internal/clienthandler/node.go
And, magically, it works:
2025/03/03 16:50:58 unmarshalled block message to: Block[EBXNbN907S4H7tkfU3mCwLO34yakpfpUcLidKsnawaSsKOOpZ5bMUdBuvhnApGHs9EGhAHZ9D4XZRb9LgxKiXg==]
2025/03/03 16:50:58 asked to check and store remote tx
(Ignoring the fact that the copied message says "remote tx" instead of "remote block".)
Now we can go back and check the integrity of the block and actually store it.
Checking the block
First off, we need to check for fraud by re-hashing the block and then checking both the hash against the ID and that the signature is a valid signature for the publisher:
func (cas *CheckAndStore) StoreBlock(block *records.Block) error {
err := block.VerifySignature(cas.hasher, cas.signer, cas.key)
if err != nil {
return err
}
return nil
}
INTERNODE_VERIFY_BLOCK_SIGNATURE:internal/storage/remotestorer.go
Which requires implementing VerifySignature:
func (b *Block) VerifySignature(hasher helpers.HasherFactory, signer helpers.Signer, pub *rsa.PublicKey) error {
id := b.HashMe(hasher)
if !id.Is(b.ID) {
return fmt.Errorf("remote block id %s was not the result of computing it locally: %s", b.ID.String(), id.String())
}
return signer.Verify(pub, id, b.Signature)
}
INTERNODE_VERIFY_BLOCK_SIGNATURE:internal/records/block.go
Which in turn requires us to extract the hashing of the block into a method HashMe:
func (b *Block) HashMe(hf helpers.HasherFactory) types.Hash {
hasher := hf.NewHasher()
hasher.Write(b.PrevID)
hasher.Write([]byte(b.BuiltBy.String()))
hasher.Write([]byte("\n"))
hasher.Write(b.UpUntil.AsBytes())
for _, m := range b.Txs {
hasher.Write(m)
}
return hasher.Sum(nil)
}
INTERNODE_VERIFY_BLOCK_SIGNATURE:internal/records/block.go
Note that unlike in the transaction case, this is a public method because the block is built externally, and so this function will need to be called from that location. On the subject of which, since we extracted the function above from the block building function, we need to update that as well:
func (b Blocker) Build(to types.Timestamp, last *records.Block, txs []*records.StoredTransaction) (*records.Block, error) {
ls := "<none>"
var lastID types.Hash
if last != nil {
ls = last.String()
lastID = last.ID
}
log.Printf("Building block before %s, following %s with %d records\n", to.IsoTime(), ls, len(txs))
txids := make([]types.Hash, len(txs))
for i, tx := range txs {
txids[i] = tx.TxID
}
block := &records.Block{
UpUntil: to,
BuiltBy: b.name,
PrevID: lastID,
Txs: txids,
}
var err error
block.ID = block.HashMe(b.hasher)
block.Signature, err = b.signer.Sign(b.pk, types.Hash(block.ID))
if err != nil {
return nil, err
}
return block, nil
}
INTERNODE_VERIFY_BLOCK_SIGNATURE:internal/block/blocker.go
While I was there, I discovered that the set of transaction IDs was just always set to nil, which is not going to work well, so I updated that to have the correct list of IDs.

That deals with the "fraud" case, where the block is inconsistent or incorrectly signed. But in addition to checking the hash and signature of the block, we also need to check that we have received the previous block and all the messages it claims to summarize in Txs. These errors are of a different nature: the most likely cause is some kind of transmission or transient error, or else that they are still "in transit". No matter what, we should collect a list of them all, send it back to the remote node and ask for them to be sent again. We will come back to this, but only when we want to test it properly (which is the main point of this whole project, but it feels like I have lost sight of that at the moment and I'm just trying to reach some version of "feature-complete"; bear with me).
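As a purely speculative sketch of the shape that request might take (nothing like this exists in the code yet, and the type name and fields are invented):

// Speculative sketch only: ResendRequest is invented for illustration. The
// idea is simply to bundle up everything a block referenced that we have not
// yet seen and post it back to the publishing node.
type ResendRequest struct {
	Block       types.Hash   // the block whose contents we could not verify
	MissingPrev types.Hash   // its predecessor, if we have not seen it
	MissingTxs  []types.Hash // transactions it summarizes that we do not have
}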
So we will start by adding two methods to the journal to check if it has a slice of transactions and a block. We're going to do it in this way because we "know" that we are going to have to pass these requests across a channel to a goroutine, and I "suspect" that is going to be slow if we do it one at a time. This may not be true, but I suspect significant latency if we are going to wait for each response in turn. We wouldn't have to do it that way, but separating the sending and the waiting would add significant complexity to the code.
So, starting with StoreBlock again:
func (cas *CheckAndStore) StoreBlock(block *records.Block) error {
err := block.VerifySignature(cas.hasher, cas.signer, cas.key)
if err != nil {
return err
}
hasBlock := cas.journal.HasBlock(block.PrevID)
if !hasBlock {
return fmt.Errorf("block %v does not have prev %v", block.ID, block.PrevID)
}
missingTxs := cas.journal.CheckTxs(block.Txs)
if missingTxs != nil {
return fmt.Errorf("block %v does not have %d txs", block.ID, len(missingTxs))
}
return nil
}
INTERNODE_CHECK_BLOCK_PRESENT:internal/storage/remotestorer.go
We can declare these methods in the API:
type Journaller interface {
RecordTx(tx *records.StoredTransaction) error
RecordBlock(block *records.Block) error
HasBlock(id types.Hash) bool
CheckTxs(ids []types.Hash) []types.Hash
ReadTransactionsBetween(from types.Timestamp, upto types.Timestamp) ([]*records.StoredTransaction, error)
Quit() error
}
INTERNODE_CHECK_BLOCK_PRESENT:internal/storage/journal.go
And implement them in the MemoryJournaller:
func (d *MemoryJournaller) HasBlock(id types.Hash) bool {
messageMe := make(chan bool)
d.finj.NextWaiter("journal-has-block")
d.tothread <- JournalHasBlockCommand{ID: id, ResultChan: messageMe}
ret := <-messageMe
return ret
}
func (d *MemoryJournaller) CheckTxs(ids []types.Hash) []types.Hash {
messageMe := make(chan []types.Hash)
d.finj.NextWaiter("journal-check-txs")
d.tothread <- JournalCheckTxsCommand{IDs: ids, ResultChan: messageMe}
ret := <-messageMe
return ret
}
INTERNODE_CHECK_BLOCK_PRESENT:internal/storage/journal.go
This requires quite a bit of tinkering with the threaded journal. We need the new request types:
type JournalHasBlockCommand struct {
ID types.Hash
ResultChan chan<- bool
}
type JournalCheckTxsCommand struct {
IDs []types.Hash
ResultChan chan<- []types.Hash
}
INTERNODE_CHECK_BLOCK_PRESENT:internal/storage/journal_thread.go
And these need implementing in the monster LaunchJournalThread function:
func LaunchJournalThread(name string, finj helpers.FaultInjection) chan<- JournalCommand {
var txs []*records.StoredTransaction
var blocks []*records.Block
ret := make(chan JournalCommand, 20)
log.Printf("launching new journal thread with channel %p\n", ret)
go func() {
whenDone:
for {
x := <-ret
switch v := x.(type) {
case JournalStoreCommand:
txs = append(txs, v.Tx)
log.Printf("%s recording tx with id %v, have %d at %p\n", name, v.Tx.TxID, len(txs), txs)
case JournalBlockCommand:
blocks = append(blocks, v.Block)
log.Printf("%s recording block with id %v, have %d at %p\n", name, v.Block.ID, len(blocks), blocks)
case JournalRetrieveCommand:
log.Printf("reading txs = %p, len = %d\n", txs, len(txs))
var ret []*records.StoredTransaction
for _, tx := range txs {
if tx.WhenReceived >= v.From && tx.WhenReceived < v.Upto {
ret = append(ret, tx)
}
}
v.ResultChan <- ret
case JournalCheckCapacityCommand:
ret := cap(txs) == len(txs) && cap(txs) >= v.AtLeast
log.Printf("checking capacity, returning %v\n", ret)
v.ResultChan <- ret
case JournalHasBlockCommand:
for _, b := range blocks {
if b.ID.Is(v.ID) {
v.ResultChan <- true
continue whenDone
}
}
v.ResultChan <- false
case JournalCheckTxsCommand:
tmp := make([]types.Hash, len(v.IDs))
copy(tmp, v.IDs)
nextTx:
// go through all the TXs we _do_ have
for _, tx := range txs {
// is it in the list they are asking about
for pos, id := range tmp {
if id.Is(tx.TxID) {
// if so, remove it and move on
tmp[pos] = tmp[len(tmp)-1]
tmp = tmp[:len(tmp)-1]
continue nextTx
}
}
}
if len(tmp) == 0 {
v.ResultChan <- nil
} else {
v.ResultChan <- tmp
}
case JournalDoneCommand:
log.Printf("was a done command %v\n", v)
v.NotifyMe <- struct{}{}
break whenDone
default:
log.Printf("not a valid journal command %v\n", x)
}
}
}()
return ret
}
INTERNODE_CHECK_BLOCK_PRESENT:internal/storage/journal_thread.go
This function has grown very long and there is definitely a case to be made for breaking it up, possibly inverting it to have the main loop dispatch the "current state" to the commands. Again, pull requests welcome.
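For what it's worth, one possible shape for that inversion is sketched below; the journalState type and the Apply signature are my invention, not anything in the repository.

// Sketch only: give each command an Apply method so that the loop shrinks to
// a dispatcher. The state that currently lives in local variables inside
// LaunchJournalThread moves into a struct that is passed to every command.
type journalState struct {
	txs    []*records.StoredTransaction
	blocks []*records.Block
}

type JournalCommand interface {
	// Apply runs the command against the state and reports whether the
	// journal thread should keep going.
	Apply(state *journalState) bool
}

func runJournal(cmds <-chan JournalCommand) {
	state := &journalState{}
	for cmd := range cmds {
		if !cmd.Apply(state) {
			return
		}
	}
}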
Unsurprisingly, this fails with the error that it cannot find the previous block:
2025/03/04 10:01:19 http: panic serving [::1]:52059: failed to store remote transaction: block 4a113c0023594ee58d317133566cd67c1faff3807668831b6440a4f43f284741f14c3062f3f38a5b6f87b5d30a191b56a8f10c6bd7eb4b642aea08969b65436c does not have prev e133f474c621709530e098a1c8e212cd070fb964e5c1e30d05024896a6445e4e49129c931d7e59a43670dad78761df852247c0fa712643c05d763410636eb1f3
Why? Because while we've done all the groundwork to check that the block can be stored, we haven't actually stored any of them yet! So, when the second block comes along saying "I follow this block", we can't find the referent. Let's fix that:
func (cas *CheckAndStore) StoreBlock(block *records.Block) error {
err := block.VerifySignature(cas.hasher, cas.signer, cas.key)
if err != nil {
return err
}
hasBlock := cas.journal.HasBlock(block.PrevID)
if !hasBlock {
return fmt.Errorf("block %v does not have prev %v", block.ID, block.PrevID)
}
missingTxs := cas.journal.CheckTxs(block.Txs)
if missingTxs != nil {
return fmt.Errorf("block %v does not have %d txs", block.ID, len(missingTxs))
}
return cas.journal.RecordBlock(block)
}
INTERNODE_SEND_INITIAL_BLOCK:internal/storage/remotestorer.go
Stop Right There!
This doesn't actually fix the problem. Why not? Digging in a little deeper, we send all the subsequent blocks out to the remote nodes, but not the initial block we built at the top of the loop. Fixing that is a little tricky, but at the end of the day it is just a refactoring (it is mainly tricky because of the project dependencies; BinarySender needs to be moved from the internode package to the helpers package).
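For reference, this is roughly what the relocated interface must look like, judging only by how it is used in this post (bs.Send(path, blob)); the actual declaration in the repository may differ (it might return an error, for instance).

// In internal/helpers (previously internal/internode). Inferred from usage
// elsewhere in this post, not copied from the repository.
type BinarySender interface {
	// Send posts the binary blob to the given path on the remote node.
	Send(path string, blob []byte)
}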
The main change is this one:
func (builder *SleepBlockBuilder) Run() {
blocktime := builder.clock.Time()
timer := builder.clock.After(delay)
lastBlock, err := builder.blocker.Build(blocktime, nil, nil)
if err != nil {
panic("error returned from building block 0")
}
builder.journaller.RecordBlock(lastBlock)
lastBlock.MarshalAndSend(builder.senders)
INTERNODE_SEND_INITIAL_BLOCK:internal/block/builder.go
where MarshalAndSend is the marshal-and-send code extracted from buildRecordAndSend and put in a public method on Block:
func (b *Block) MarshalAndSend(senders []helpers.BinarySender) {
blob, err := b.MarshalBinary()
if err != nil {
log.Printf("Error marshalling block: %v %v\n", b.ID, err)
return
}
for _, bs := range senders {
go bs.Send("/remoteblock", blob)
}
}
INTERNODE_SEND_INITIAL_BLOCK:internal/records/block.go
And yet it still doesn't work. Looking more closely, however, we can see a subtle shift in the first error that appears:
2025/03/04 10:25:49 http: panic serving [::1]:52154: failed to store remote transaction: block cbb0463a23e65bdcf5e3e74401f2f12e5c75c13096a5689dd33013056cd0849022e2ff3f6634cfa09c98bc3e82dff433a542e23877f1813dbd2324c7c759e2bc does not have prev
Specifically, it does not have a value for prev. This is because the first block specifically has a nil PrevID. So we need to look for that case in the code that is checking remote blocks and specifically allow it. On trying this, however, it would seem that marshalling translates that to an empty slice. As it happens, len(nil) in Go returns 0, so this code handles both cases:
func (cas *CheckAndStore) StoreBlock(block *records.Block) error {
err := block.VerifySignature(cas.hasher, cas.signer, cas.key)
if err != nil {
return err
}
if len(block.PrevID) > 0 {
hasBlock := cas.journal.HasBlock(block.PrevID)
if !hasBlock {
return fmt.Errorf("block %v does not have prev %v", block.ID, block.PrevID)
}
missingTxs := cas.journal.CheckTxs(block.Txs)
if missingTxs != nil {
return fmt.Errorf("block %v does not have %d txs", block.ID, len(missingTxs))
}
}
return cas.journal.RecordBlock(block)
}
INTERNODE_HANDLE_EMPTY_PREVID:internal/storage/remotestorer.go
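If the len(nil) claim seems surprising, it is easy to confirm with a few lines of standalone Go (nothing here touches the ChainLedger types; a nil hash behaves like a nil byte slice for this purpose):

package main

import "fmt"

func main() {
	var never []byte  // analogous to the first block's nil PrevID
	empty := []byte{} // what it looks like after a marshal/unmarshal round trip
	fmt.Println(len(never), len(empty)) // prints: 0 0
}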
It's worth pointing out that we could be a lot more assertive here. If this really is the first block from this node, the list of blocks in the journal will be empty: we could check that. The first block must not have any transaction ids, so we could check that too. It would be over-reaching to check that the journal did not have any transactions, since they might have arrived before the initial (empty) block, but we could check that the timestamp of the initial block pre-dated any of the transactions we have received. I'm not going to do any of that: I believe the signature is enough to protect me against fraud, and while it might be a good idea to "program defensively" in this way, this is just a demo. As always, send me pull requests.

And now it does run through.
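For completeness, if someone did want those extra defensive checks, the shape inside StoreBlock might be something like the following; HasAnyBlocks is an invented helper here, not part of the journal API shown above.

// Hypothetical extra checks for the "first block" case, to be placed before
// recording the block; HasAnyBlocks does not exist in the Journaller
// interface above and would need to be added.
if len(block.PrevID) == 0 {
	if cas.journal.HasAnyBlocks() {
		return fmt.Errorf("block %v claims to be the first block but we already have blocks", block.ID)
	}
	if len(block.Txs) != 0 {
		return fmt.Errorf("initial block %v should not contain any transactions", block.ID)
	}
}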