We have calls to methods in the journal which are not yet implemented. So when our node runs in the wild, it neither stores the transactions (even in memory!) or recovers any transactions to hash into a block. We aren't going to get very far without that!
Storing the Messages
In one sense, what we are going to do here is not a "serious" implementation. For a start, it is going to store the data in memory, which obviously won't survive the process dying, let alone the whole machine rebooting. But I am working on the principle of "the simplest thing that could possibly work" and, when we need it, we can either come back and save this to disk, or we can replace the whole thing with a more reasonable implementation (spoiler: we will use DynamoDB when we move to AWS much later). None of this will affect any other code in the system: this is the joy of programming to interfaces.So, for now, all we are going to do is store all the messages in a slice, appending as we go.
type MemoryJournaller struct {
name string
txs []*records.StoredTransaction
}
// RecordTx implements Journaller.
func (d *MemoryJournaller) RecordTx(tx *records.StoredTransaction) error {
d.txs = append(d.txs, tx)
fmt.Printf("%s recording tx with id %v, have %d\n", d.name, tx.TxID, len(d.txs))
return nil
}
func (d MemoryJournaller) ReadTransactionsBetween(from types.Timestamp, upto types.Timestamp) ([]records.StoredTransaction, error) {
return nil, nil
}
func NewJournaller(name string) Journaller {
return &MemoryJournaller{name: name}
}
JOURNAL_MEMORY_RECORD:internal/storage/journal.go
I have copied out the code for DummyJournaller and just added a few lines: the txs slice in the object and the append line in RecordTx. The astute observer will notice I also included the node name in the logging message, which involved the usual plumbing to get it where it was needed. The message has also been updated to include the number of messages currently being stored.Retrieving the Messages
We want to find messages in a certain time range (specifically, the time range of messages to be included in the block) and the easiest way to do this is to scan along the list of messages looking for ones whose time is in that range. No, obviously this is not at all efficient. But it is simple and it could possibly work, at least in the current context - we can do this differently later if we need to.func (d MemoryJournaller) ReadTransactionsBetween(from types.Timestamp, upto types.Timestamp) ([]*records.StoredTransaction, error) {
var ret []*records.StoredTransaction
for _, tx := range d.txs {
if tx.WhenReceived >= from && tx.WhenReceived < upto {
ret = append(ret, tx)
}
}
return ret, nil
}
JOURNAL_MEMORY_QUERY:internal/storage/journal.go
This is code and I believe it works - but only because I wrote it and it looks really simple. I haven't written any tests for it - because I generally don't test code I'm only writing to support testing - and, if you look closely, you'll see that it never gets executed. This is because when I run the harness it comes to an end so quickly that we never get as far as building a block, so we never need to retrieve the messages.To ameliorate that, I'm going to build a much bigger configuration and see if we can't make it last at least the five seconds we currently have configured as the block interval.
The problem is that before we get around to writing a block, we end up having concurrency issues:
fatal error: concurrent map read and map writeNow, I don't know about you, but I was totally expecting this. Although, looking at the whole error message, I wasn't expecting was to have goroutines with IDs in the 10,000s but we do:
goroutine 10305 [running]:
github.com/gmmapowell/ChainLedger/internal/storage.MemoryPendingStorage.PendingTx({0xc0001da3c0}, 0xc000444740)
/home/gareth/Projects/ChainLedger/internal/storage/pending.go:16 +0xde
github.com/gmmapowell/ChainLedger/internal/clienthandler.TxResolver.ResolveTx({{0x987af8, 0xb965a0}, {0x9865e0, 0xb965a0}, {0x986600, 0xb965a0}, 0xc000128000, {0x987080, 0xc0000b6050}}, 0xc000444740)
/home/gareth/Projects/ChainLedger/internal/clienthandler/resolver.go:25 +0x91
github.com/gmmapowell/ChainLedger/internal/clienthandler.RecordStorage.ServeHTTP({{0x986540, 0xc000214000}, {0x988468, 0xc0001da3f0}}, {0x9891d8, 0xc0009e00e0}, 0xc0013f4c80)
/home/gareth/Projects/ChainLedger/internal/clienthandler/recordstorage.go:43 +0x6a2
net/http.(*ServeMux).ServeHTTP(0xc0000e60e0, {0x9891d8, 0xc0009e00e0}, 0xc0013f4c80)
/usr/local/go/src/net/http/server.go:2747 +0x3c2
net/http.serverHandler.ServeHTTP({0xc0002161e0}, {0x9891d8, 0xc0009e00e0}, 0xc0013f4c80)
/usr/local/go/src/net/http/server.go:3210 +0x257
net/http.(*conn).serve(0xc00199cea0, {0x989730, 0xc000214af0})
/usr/local/go/src/net/http/server.go:2092 +0x1ab5
created by net/http.(*Server).Serve in goroutine 23
/usr/local/go/src/net/http/server.go:3360 +0xa9a
goroutine 10531 [IO wait]:However, I don't think there are 10,000 active goroutines: I think it's just that we have burned our way through that many; most of them have died.
I'm not ready to deal with that yet, so I will go back to my earlier harness configuration (which just works through the pure luck of not running long enough to have two transactions clash into each other) and make sure that the harness won't quit until all the nodes have finished writing the most recent block.
Synchronizing the Harness and Blockers
So the sequence of events we want to happen are:- harness waits for all the clients to finish
- the harness tells each of the nodes that their blocker threads need to wrap up
- the harness waits for each of the nodes to answer and say they are done
In the main harness command (still under cmd/ but I can't imagine it will be long before I want to move some of it elsewhere):
func main() {
if len(os.Args) != 2 {
fmt.Printf("Usage: harness <json>\n")
return
}
log.Println("starting harness")
config := harness.ReadConfig(os.Args[1])
nodes := harness.StartNodes(config)
clients := harness.PrepareClients(config)
startedAt := time.Now().UnixMilli()
for _, c := range clients {
c.Begin()
}
for _, c := range clients {
c.WaitFor()
}
for _, n := range nodes {
n.Terminate()
}
endedAt := time.Now().UnixMilli()
log.Printf("elapsed time = %d", endedAt-startedAt)
log.Println("harness complete")
}
HARNESS_SHUTDOWN_NODES:cmd/harness/main.go
We now collect the list of nodes we start in StartNodes and then we call Terminate on each of those once the clients have all finished doing their work (and, by specification, all their transactions have been resolved and stored in the journal).This requires us to implement Terminate:
func (node *ListenerNode) Terminate() {
node.server.Shutdown(context.Background())
waitChan := make(types.Signal)
node.Control <- waitChan.Sender()
<-waitChan
log.Printf("node %s finished\n", node.name)
}
HARNESS_SHUTDOWN_NODES:internal/clienthandler/node.go
This looks somewhat complicated, but isn't too bad if you take it slowly. Or, I suspect, if you are familiar with what I have to assume is an idiom. The blocker is provided with a Control member which is of type types.PingBack (see below); when we want to terminate, we send a message down this channel. What do we send? Well, it's a channel we want to wait on. The idea here is that the Blocker will be waiting for a message on this Control channel and, when it receives it, will recover the channel we sent and, when it has finished shutting down will send a notification message back on our channel. And, as you can see, we wait until that message arrives before concluding that the node has terminated.We can see the code waiting for the message in Blocker here:
func (builder *SleepBlockBuilder) Run() {
blocktime := builder.clock.Time()
timer := builder.clock.After(delay)
lastBlock, err := builder.blocker.Build(blocktime, nil, nil)
if err != nil {
panic("error returned from building block 0")
}
for {
prev := blocktime
select {
case pingback := <-builder.control:
log.Printf("%s asked to build final block and quit", builder.Name.String())
builder.buildBlock(prev, builder.clock.Time(), lastBlock)
pingback.Send()
return
case blocktime = <-timer:
timer = builder.clock.After(delay)
nowis := <-builder.clock.After(pause)
// we are ready to build a block
log.Printf("%s building block at %s", builder.Name.String(), nowis.IsoTime())
lastBlock = builder.buildBlock(prev, blocktime, lastBlock)
}
}
}
HARNESS_SHUTDOWN_NODES:internal/block/builder.go
We do not (and indeed, cannot, by the single-threaded nature of a goroutine) actually wait for this notification to come through. What we need to do instead is to use a "channel select" construct in Go. This works in much the same way as the select(2) function call in Unix: there are a number of channels that you could wait on, and it dispatches the first one to be ready (it is also possible to specify a default to handle the case that none of them are ready; otherwise it just blocks).The second case reflects the code we already had. It's the first case that's new. Here we simply build a "final" block which is from whenever the previous block was built up until "the current time" (i.e. clock.Time()). Once that block is built, we can notify the channel we read from the PingBack channel.
Because of the duplication between these two arms of the select statement, I extracted the code to build a block, panic on errors and update local variables into a new method.
As I've said before, I find a lot of the channel syntax (specifically declaring types around channels) confusing, so I decided to "simplify" it by extracting all of this signalling to a separate set of general-purpose types, in types/signal.go:
package types
// The syntax for sending "signal" messages through a channel is ugly,
// so I'm going to fix it here
type Signal chan struct{}
type SendSignal chan<- struct{}
type OnSignal <-chan struct{}
type PingBack chan SendSignal
func (s Signal) Sender() SendSignal {
var k chan struct{} = s
return SendSignal(k)
}
func (s Signal) Reader() OnSignal {
var k chan struct{} = s
return OnSignal(k)
}
func (s SendSignal) Send() {
s <- struct{}{}
}
HARNESS_SHUTDOWN_NODES:internal/types/signal.go
The Signal type is a channel which can only be used for signalling: the type struct{} is the simplest value which is just the presence of a value. SendSignal is the type of channel which can only send values to the channel; and OnSignal is a type which can only be used to read values. For reasons I don't understand, converting between these types is not easy, so I've added a couple of helper methods to Signal to obtain the appropriate unidirectional channels. I also added a method to Send a nothing message down this nothing channel.PingBack is just a type that can send SendSignal channels. It is intended to be used in exactly the way we used it here. In the fulness of time I expect to add at least one method to encapsulate the three lines we used in Terminate.
Having done all this, we can run the harness on our small test sample of 23 messages; I've removed most of the output, but when it gets to the end, we see this:
2024/12/22 18:32:46 http://localhost:5001 asked to build final block and quitI may be easily convinced, but it seems to me that each node has read the correct number of transactions and I believe that our current tests have established that the correct blocks are generated.
2024/12/22 18:32:46 Building block before 2024-12-22_18:32:46.745, following Block with 12 records
2024/12/22 18:32:46 node http://localhost:5001 finished
2024/12/22 18:32:46 http://localhost:5002 asked to build final block and quit
2024/12/22 18:32:46 Building block before 2024-12-22_18:32:46.749, following Block with 11 records
2024/12/22 18:32:46 node http://localhost:5002 finished
2024/12/22 18:32:46 elapsed time = 201
2024/12/22 18:32:46 harness complete
Conclusion
We are anything but finished with dealing with blocks: we have created them but we haven't stored them yet, and we haven't shared them with other nodes.But we have more pressing problems: the first of which is the panic we saw above. Next time we are going to have to digress from the task of building blocks to make sure that we have more stability.
No comments:
Post a Comment