Apologies if you find the title misleading. In a very real sense, this is why we're here and building features is the digression.
If you remember back to my original thesis statement, I said the main purpose of this was to investigate massively parallel and distributed systems in the context of building a blockchain thingy. This episode is dedicated to an important part of that, rather than building blocks and adding communication between nodes.
Let's review from last time. When we tried running five clients at once for about 2,000 messages each, we had a panic attack.
Here's the message again:
fatal error: concurrent map read and map write
goroutine 10305 [running]:
github.com/gmmapowell/ChainLedger/internal/storage.MemoryPendingStorage.PendingTx({0xc0001da3c0}, 0xc000444740)
/home/gareth/Projects/ChainLedger/internal/storage/pending.go:16 +0xde
github.com/gmmapowell/ChainLedger/internal/clienthandler.TxResolver.ResolveTx({{0x987af8, 0xb965a0}, {0x9865e0, 0xb965a0}, {0x986600, 0xb965a0}, 0xc000128000, {0x987080, 0xc0000b6050}}, 0xc000444740)
/home/gareth/Projects/ChainLedger/internal/clienthandler/resolver.go:25 +0x91
github.com/gmmapowell/ChainLedger/internal/clienthandler.RecordStorage.ServeHTTP({{0x986540, 0xc000214000}, {0x988468, 0xc0001da3f0}}, {0x9891d8, 0xc0009e00e0}, 0xc0013f4c80)
/home/gareth/Projects/ChainLedger/internal/clienthandler/recordstorage.go:43 +0x6a2
net/http.(*ServeMux).ServeHTTP(0xc0000e60e0, {0x9891d8, 0xc0009e00e0}, 0xc0013f4c80)
/usr/local/go/src/net/http/server.go:2747 +0x3c2
net/http.serverHandler.ServeHTTP({0xc0002161e0}, {0x9891d8, 0xc0009e00e0}, 0xc0013f4c80)
/usr/local/go/src/net/http/server.go:3210 +0x257
net/http.(*conn).serve(0xc00199cea0, {0x989730, 0xc000214af0})
/usr/local/go/src/net/http/server.go:2092 +0x1ab5
created by net/http.(*Server).Serve in goroutine 23
/usr/local/go/src/net/http/server.go:3360 +0xa9a
This error is actually in the
MemoryPendingStorage rather than in the
Journal, but trust me, the journal has the same problem. Because I'm me - and because I can - we are going to solve both of these problems right now, but we are going to do so in different ways. There are two reasons for this: the first is simply didactic - I want to investigate and show both methods; the second is that the ways that the
MemoryPendingStorage and the
Journal work are fundamentally different, and so different approaches make sense.
Pending Storage
You will remember me saying earlier that the pending storage was the only place in the system where we are going to "update" anything - that is, change something once we have written it. The consequence of this is that we could have two threads both trying to update the same object at the same time. This makes any attempt to linearize access through channels and a single thread incredibly tricky - and possibly inefficient - so we are going to use the traditional tool of a mutex, and we are going to do so exclusively.
This may sound like a tautology, but there are (at least) two approaches to using a mutex: what we are going to do here is to lock
everybody out while we do our work; the alternative is to allow multiple readers in at once but have any writer lock out all other writers and all readers.
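(For reference, here is a minimal sketch of that second, reader/writer approach, using Go's sync.RWMutex. This is not what MemoryPendingStorage is going to do; it is here only to illustrate the alternative:)

package example

import "sync"

// rwStore sketches the reader/writer style: any number of readers may hold
// the lock at once, but a writer excludes readers and writers alike.
type rwStore struct {
	mu    sync.RWMutex
	store map[string]string
}

func (s *rwStore) get(id string) string {
	s.mu.RLock() // shared lock: many readers at once
	defer s.mu.RUnlock()
	return s.store[id]
}

func (s *rwStore) put(id, value string) {
	s.mu.Lock() // exclusive lock: no readers, no other writers
	defer s.mu.Unlock()
	s.store[id] = value
}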
Let's review the code in the
MemoryPendingStorage before we start:
func (mps MemoryPendingStorage) PendingTx(tx *api.Transaction) *api.Transaction {
curr := mps.store[string(tx.ID())]
if curr == nil {
mps.store[string(tx.ID())] = tx
}
return curr
}
JOURNAL_MEMORY_QUERY:internal/storage/pending.go
This accesses the
mps.store at least once, and possibly twice. Especially given that it may write to it
after it has read it - and concluded that there is not an entry there - it is really important that nobody else is doing the same thing at the same time. In the worst possible case, two different users could decide that the transaction wasn't there, both write to it at "the same time" and one of them would have their changes lost. We can't have that.
You will often hear it said that this kind of thing "is very difficult to test". This is not true: it's easy enough to test. What's hard is getting the failure to happen "randomly" at runtime - and when it does, it's very hard to diagnose what's gone wrong. Testing it, by contrast, just means adding some delays in the right places, and channels make it very easy to add delays.
(On the other hand, I'm never quite sure what happens to this extra code "in production"; back in my days doing this in C++ all the fault injections were macros that were compiled out unless you turned on the
DEBUG flag (written -DDEBUG on the command line) during compilation. I will have to see what mechanisms Go has for turning code on and off in debug and production modes.)
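(One mechanism I know Go does have is build tags: a file that starts with a //go:build constraint is only compiled when you build or test with that tag, e.g. go test -tags debug. A hedged sketch of how the fault injection might one day be split - the file names and the constant are my invention, nothing like this exists in the project yet:)

// internal/helpers/finj_debug.go (hypothetical) - only compiled with -tags debug

//go:build debug

package helpers

// FaultInjectionEnabled reports whether this is a fault-injecting build.
const FaultInjectionEnabled = true

// internal/helpers/finj_release.go (hypothetical) - compiled when the tag is absent

//go:build !debug

package helpers

const FaultInjectionEnabled = false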
So let's write a test that proves that we can do two updates "at the same time" without problems here (this test will FAIL of course, because we know we can't do two updates at the same time, but the test is
trying to prove that we can).
The key to having this test work is to make the gap between the two accesses unreasonably long, thus ensuring that both forks of code go through the first channel before either of them can get to the second. How long is long enough? No, I don't know either (although realistically I imagine 10ms will be), so what we are going to do is to control it in the test. And we're going to build a "general purpose mechanism" to do this, because we want to make it so easy to do that everybody will do it. And then we can consider how to optimise it out during production.
package storage_test
import (
"testing"
"time"
"github.com/gmmapowell/ChainLedger/internal/api"
"github.com/gmmapowell/ChainLedger/internal/helpers"
"github.com/gmmapowell/ChainLedger/internal/storage"
"github.com/gmmapowell/ChainLedger/internal/types"
)
func TestTwoThreadsCannotBeInCriticalZoneAtOnce(t *testing.T) {
finj := helpers.FaultInjectionLibrary(t)
mps := storage.TestMemoryPendingStorage(finj)
results := make(chan *api.Transaction, 2)
go func() {
tx1, _ := api.NewTransaction("https://hello.com", types.Hash("hello"))
results <- tx1
sx := mps.PendingTx(tx1)
results <- sx
}()
w1 := finj.AllocatedWaiter()
go func() {
tx2, _ := api.NewTransaction("https://hello.com", types.Hash("hello"))
results <- tx2
rx := mps.PendingTx(tx2)
results <- rx
}()
w2 := finj.AllocatedWaiterOrNil(50 * time.Millisecond)
if w2 != nil {
t.Fatalf("second waiter allocated before first released")
}
w1.Release()
w2 = finj.AllocatedWaiter()
if w2 == nil {
t.Fatalf("second waiter could not be allocated after first released")
}
w2.Release()
tx1 := <-results
tx2 := <-results
rx1 := <-results
rx2 := <-results
if rx1 != nil {
t.Fatalf("the first call to PendingTx should return nil, not %v\n", rx1)
}
if tx1 != rx2 {
t.Fatalf("we did not get back the same transaction: %v %v\n", tx1, rx2)
}
if tx2 == rx2 {
t.Fatalf("we received the same second tx: %v %v\n", tx2, rx2)
}
}
CONCURRENCY_PENDING_TX:internal/storage/pending_sync_test.go
What the
bleep is going on here? And will we eventually refactor it to become clearer?
Well, the main thing that is going on is that there are two goroutines being executed, each of which calls
PendingTx. And the idea is that they run in parallel (which is why they are in goroutines) and that they are coordinated in the test. The coordination happens with the assistance of a
fault injection library - that is, a library deliberately designed to inject faults of this nature (and others) directly into our code, so that we don't have to wait for them to happen.
w1 and
w2 are allocated by asking this library to wait until "some other code" (we'll get to that) has requested a
Waiter. Now, a
Waiter in this context is a piece of fault injection code which "waits" until it is released. The idea here is that we put this in the middle of a critical section and then demonstrate that either two threads can both be in it at the same time, or that one is prevented from doing so.
The call which initializes
w2 specifies a duration to wait: if this duration elapses before the
Waiter is allocated,
nil will be returned. Because we are testing that the second thread cannot get into the critical section before we explicitly
Release w1, we expect this to return
nil, and failure to do so is an error.
We then
Release w1 which should then finish up its work and put the
Transaction it received back from
PendingTx in the
results channel. This channel is just used to communicate between the goroutines and the test thread.
We then try retrieving the
Waiter that should now have been allocated for the second goroutine - once the first thread has cleared the critical section, the second should be able to enter. Assuming this is successful, we can immediately
Release it.
Finally, we can recover the values returned from PendingTx in the two threads and check them: the first call should have returned nil, and the second should have returned the transaction submitted by the first goroutine, not its own.
So far, we have only made one (significant) change to the production code which is to ask for a
Waiter after we have requested the transaction from the
map, but before we update it if it is not there.
func (mps *MemoryPendingStorage) PendingTx(tx *api.Transaction) *api.Transaction {
curr := mps.store[string(tx.ID())]
mps.finj.NextWaiter()
if curr == nil {
mps.store[string(tx.ID())] = tx
}
return curr
}
CONCURRENCY_PENDING_TX:internal/storage/pending.go
When we first run this, it fails for the simple reason that, because there is no exclusion, both threads enter the critical section and each attempts to allocate a waiter:
--- FAIL: TestTwoThreadsCannotBeInCriticalZoneAtOnce (0.00s)
pendingsynctest.go:32: second waiter allocated before first released
FAIL
So let's do the obvious thing and wrap that with a Mutex:
type MemoryPendingStorage struct {
mu sync.Mutex
store map[string]*api.Transaction
finj helpers.FaultInjection
}
func (mps *MemoryPendingStorage) PendingTx(tx *api.Transaction) *api.Transaction {
mps.mu.Lock()
defer mps.mu.Unlock()
curr := mps.store[string(tx.ID())]
mps.finj.NextWaiter()
if curr == nil {
mps.store[string(tx.ID())] = tx
}
return curr
}
CONCURRENCY_PENDING_MUTEX:internal/storage/pending.go
By declaring the Mutex as a value rather than a pointer inside the MemoryPendingStorage struct, we guarantee that it is initialized and we are assured that its "zero state" is ready to go. We then call both
Lock and defer Unlock on the
Mutex as soon as we enter
PendingTx. This guarantees that we immediately lock all access
and that the moment the function exits (through any path) the
Mutex will be released.
And then you find that the test passes.
Making it Work without the Injection Library
Our entire test
suite, on the other hand, does not pass. We have introduced a regression.
The Mutex is fine - as noted above, that is created and initialized all in one go. The problem is that we don't have an instance of the fault injection library and thus we panic on a nil pointer. The easy thing to do is to add a "dummy" fault injection library that just ignores the call to
NextWaiter (technically, the call goes ahead but the implementation is empty).
func NewMemoryPendingStorage() PendingStorage {
return TestMemoryPendingStorage(helpers.IgnoreFaultInjection())
}
CONCURRENCY_IGNORE_FAULTS:internal/storage/pending.go
OK, So Show Me the Injection Library
So far, I've glossed over what's
in the injection library; this is mainly because it is messy. But now is the time to present that in all its glory. I'm just going to present it a block at a time as a finished product; I'm not going to show you how it came to be.
package helpers
import (
"testing"
"time"
)
type PairedWaiter interface {
Wait()
Release()
}
type SimplePairedWaiter struct {
t *testing.T
notifyMe chan struct{}
delay time.Duration
}
func (spw *SimplePairedWaiter) Wait() {
select {
case <-time.After(spw.delay):
spw.t.Fatalf("waited for %d but not notified", spw.delay)
case <-spw.notifyMe:
}
}
func (spw SimplePairedWaiter) Release() {
spw.notifyMe <- struct{}{}
}
Apart from the usual boilerplate, this presents the idea of a
PairedWaiter, which is an interface which is intended to be able to
Wait in one goroutine and be
Released in another. The implementation is
SimplePairedWaiter which waits either to be notified on the designated channel (
notifyMe) by
Release, or else for the
delay timeout to occur.
Release sends a signal down the
notifyMe channel.
type FaultInjection interface {
NextWaiter()
AllocatedWaiter() PairedWaiter
AllocatedWaiterOrNil(waitFor time.Duration) PairedWaiter
}
type TestingFaultInjection struct {
t *testing.T
allocations chan PairedWaiter
}
// AllocatedWaiter implements FaultInjection.
func (t *TestingFaultInjection) AllocatedWaiter() PairedWaiter {
r := t.AllocatedWaiterOrNil(5 * time.Second)
if r == nil {
t.t.Fatalf("waiter had not been allocated after 5s")
}
return r
}
// AllocatedWaiter implements FaultInjection.
func (t *TestingFaultInjection) AllocatedWaiterOrNil(waitFor time.Duration) PairedWaiter {
select {
case <-time.After(waitFor):
return nil
case ret := <-t.allocations:
return ret
}
}
// NextWaiter implements FaultInjection.
func (t *TestingFaultInjection) NextWaiter() {
ret := &SimplePairedWaiter{notifyMe: make(chan struct{}), delay: 10 * time.Second}
t.allocations <- ret
ret.Wait()
}
func FaultInjectionLibrary(t *testing.T) FaultInjection {
return &TestingFaultInjection{t: t, allocations: make(chan PairedWaiter, 10)}
}
FaultInjection is the root of all of the fault injection technology we are hoping to develop over the course of this project. The idea is that there are two versions of the
interface - one that actually injects faults in the testing infrastructure, and one that doesn't. Ideally, I would like to completely eliminate this from the build, but I'm not sure whether I can. Paying the price of a few extra method calls to have testable code is a tradeoff I'm willing to make.
NextWaiter is the call we expect to make from production code, and the contract is that, in a test environment, it allocates a waiter and passes it down a channel to where the test script can access it. It then puts this new
Waiter in its waiting state until either it times out or is released.
The other half of this, as called by the test harness, is
AllocatedWaiter (and its companion
AllocatedWaiterOrNil). This reads from the channel
allocations, which is where the newly created
Waiters are placed when they are allocated. If a
Waiter has not been created in
NextWaiter, the call to
AllocatedWaiterOrNil will time out, returning
nil.
AllocatedWaiter considers this an error and fails the test.
Finally, the constructor function
FaultInjectionLibrary takes a testing environment (so that errors can be reported gracefully) and creates a channel for the allocated
Waiters to be passed down. Note that this channel has a "size" of 10, meaning that up to 10
Waiters can be allocated before the channel will block. This may not be enough, but you have to pick some number.
type InactiveFaultInjection struct{}
// AllocatedWaiter implements FaultInjection.
func (i *InactiveFaultInjection) AllocatedWaiter() PairedWaiter {
panic("this should only be called from test methods, I think")
}
// AllocatedWaiterOrNil implements FaultInjection.
func (i *InactiveFaultInjection) AllocatedWaiterOrNil(waitFor time.Duration) PairedWaiter {
panic("this should only be called from test methods, I think")
}
// NextWaiter implements FaultInjection.
func (i *InactiveFaultInjection) NextWaiter() {
}
func IgnoreFaultInjection() FaultInjection {
return &InactiveFaultInjection{}
}
CONCURRENCY_IGNORE_FAULTS:internal/helpers/faults.go
Finally, for production use there is a version of the Fault Injection Library which has a "nothing" implementation of
NextWaiter. At the moment, this is the only call that should be in production code, so the other two methods in the class are implemented using
panic.
Shared Instances are, in fact, Fine
It may be objected that we are returning these instances and then updating them at the same time in
ResolveTx:
func (r TxResolver) ResolveTx(tx *api.Transaction) (*records.StoredTransaction, error) {
curr := r.store.PendingTx(tx)
complete := true
for i, v := range tx.Signatories {
if v.Signature != nil && curr != nil {
curr.Signatories[i] = v
} else if v.Signature == nil {
if curr == nil || curr.Signatories[i].Signature == nil {
complete = false
}
}
}
if complete {
return records.CreateStoredTransaction(r.clock, r.hasher, r.signer, r.nodeKey, curr)
}
return nil, nil
}
CONCURRENCY_IGNORE_FAULTS:internal/clienthandler/resolver.go
This is absolutely true, but it is not really an objection. The object does not get reallocated, and there is no contention for a memory location. There are two possibilities: one is that the two goroutines are updating
different signatures, and the other is that they are trying to update the
same signature. If they are trying to update different signatures, both will succeed and it will all be fine. If they are trying to update the same signature, it doesn't matter
what we do, because one will win and the other will lose. Because pointer writes are atomic (at least on the architectures we care about), one or the other will win - they cannot mangle each other.
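(To illustrate the "different signatures" case concretely - this is a standalone sketch, not the project code - two goroutines writing to distinct elements of the same slice are touching distinct memory locations, so both writes land:)

package main

import (
	"fmt"
	"sync"
)

func main() {
	sigs := make([]string, 2)
	var wg sync.WaitGroup
	wg.Add(2)
	// Each goroutine writes a different element: different memory locations,
	// so there is no conflict between them.
	go func() { defer wg.Done(); sigs[0] = "signed-by-user1" }()
	go func() { defer wg.Done(); sigs[1] = "signed-by-user2" }()
	wg.Wait()
	fmt.Println(sigs) // both signatures are present
}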
OK, let's assume that that argument didn't convince you. How would we test this? Well, there's only one place where we store a signature, so let's put another
Waiter just before that, and then write a test that goes through there and makes sure that both goroutines have arrived there before it releases either of them.
So we think we can write this test, with a view to coming back and making it more "solid" once we have seen it work or not work.
package clienthandler_test
import (
"testing"
"github.com/gmmapowell/ChainLedger/internal/helpers"
"github.com/gmmapowell/ChainLedger/internal/records"
)
func TestThatTwoThreadsCanSignDifferentFieldsAtTheSameTime(t *testing.T) {
clock := helpers.ClockDoubleIsoTimes("2024-12-25_03:00:00.121")
setup(t, clock)
collector := make(chan *records.StoredTransaction, 2)
go func() {
tx := maketx("https://test.com/msg1", "hash", "https://user1.com/", true, "https://user2.com/")
stx, _ := r.ResolveTx(tx)
collector <- stx
}()
go func() {
tx := maketx("https://test.com/msg1", "hash", "https://user1.com/", "https://user2.com/", true)
stx, _ := r.ResolveTx(tx)
collector <- stx
}()
s1 := <-collector
s2 := <-collector
if s1 != s2 {
t.Fatalf("The two transactions were not the same")
}
if s1.Signatories[0].Signature == nil {
t.Fatalf("the first signature is missing")
}
if s1.Signatories[1].Signature == nil {
t.Fatalf("the second signature is missing")
}
}
RESOLVER_SYNC_SIGNATURE:internal/clienthandler/resolver_sync_test.go
Note that because this is in the same package (
clienthandler_test) as the previous resolver tests, it is able to just "borrow" all the infrastructure we built up there (e.g.
maketx).
But when we run this test, it doesn't even fail. It just hangs. What's the problem? Well, here's a hint: I have done something very bad, and I didn't realize it until just now. It's because I'm used to the Java threading and exception models, and Go does not work the same way.
The immediate problem is that I have called
t.Fatalf from within a goroutine, and that, apparently, is just not done, because it calls
FailNow, which must be called from the goroutine running the test function, not from goroutines created during the test.
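(For the record, the idiomatic workaround - which I'm not going to use here, because I want my own plumbing - is to call t.Errorf, which the testing package documents as safe to call from other goroutines, and to let the main test goroutine decide when the test ends. A minimal standalone sketch, with a made-up somethingWentWrong helper:)

// A hypothetical standalone test, not part of ChainLedger.
package example

import "testing"

// somethingWentWrong stands in for whatever condition the goroutine checks.
func somethingWentWrong() bool { return false }

func TestReportingFromAGoroutine(t *testing.T) {
	done := make(chan struct{})
	go func() {
		defer close(done)
		if somethingWentWrong() {
			// Error/Errorf may be called from other goroutines;
			// Fatal/Fatalf/FailNow must only be called from the test goroutine.
			t.Errorf("worker goroutine failed")
			return
		}
	}()
	<-done // only the main test goroutine ends the test
}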
While I was debugging this, I realized that the code to put
Signatures in place was actually overwriting the whole
Signatory which is both overkill and probably asking for trouble. So I fixed that:
func (r TxResolver) ResolveTx(tx *api.Transaction) (*records.StoredTransaction, error) {
curr := r.store.PendingTx(tx)
complete := true
for i, v := range tx.Signatories {
if v.Signature != nil && curr != nil {
curr.Signatories[i].Signature = v.Signature
} else if v.Signature == nil {
if curr == nil || curr.Signatories[i].Signature == nil {
complete = false
}
}
}
if complete {
return records.CreateStoredTransaction(r.clock, r.hasher, r.signer, r.nodeKey, curr)
}
return nil, nil
}
RESOLVER_SYNC_SIGNATURE:internal/clienthandler/resolver.go
But obviously it still hangs. So what is happening here is that I am calling
FailNow indirectly, and that is doing something to cause the current goroutine to exit, but is not causing the
whole test to fail. The main test is left sitting, waiting for a message to come down a channel that is never going to come. And so it all hangs.
We need to rework our plumbing and infrastructure to make sure that this does something (like closing the channels) that will release the main thread.
For the record, following the code through to the bitter end, the actual fault in our test is that we haven't defined any entries for the
HasherFactory and it complains about this inside the function to create a
StoredTransaction inside
ResolveTx. But that's not important right now.
Reworking the Test
I'm going to just dive off the deep end here and start building stuff. I don't know what the consequences of my actions are going to be, and right now I don't really care. In the immortal words of Kent Beck, "if you don't know what to do to succeed, try to fail". We'll end up somewhere and then we can always rework whatever we've got.
Now, when in doubt, what I tend to do is to wrap something up in a class and make the problem "go away" in the sense of putting it somewhere else rather than in my eyeline. So I'm going to wrap up all of the channel communication in a class:
func TestThatTwoThreadsCanSignDifferentFieldsAtTheSameTime(t *testing.T) {
clock := helpers.ClockDoubleIsoTimes("2024-12-25_03:00:00.121")
cc := helpers.NewChanCollector(t, 2)
setup(cc, clock)
go func() {
tx := maketx("https://test.com/msg1", "hash", "https://user1.com/", true, "https://user2.com/")
stx, _ := r.ResolveTx(tx)
cc.Send(stx)
}()
go func() {
tx := maketx("https://test.com/msg1", "hash", "https://user1.com/", "https://user2.com/", true)
stx, _ := r.ResolveTx(tx)
cc.Send(stx)
}()
s1 := cc.Recv()
s2 := cc.Recv()
if s1 != s2 {
t.Fatalf("The two transactions were not the same")
}
tx1 := s1.(records.StoredTransaction)
if tx1.Signatories[0].Signature == nil {
t.Fatalf("the first signature is missing")
}
if tx1.Signatories[1].Signature == nil {
t.Fatalf("the second signature is missing")
}
}
INTRODUCED_CHAN_COLLECTOR:internal/clienthandler/resolver_sync_test.go
Tackling the changes I
wanted to make here, I introduced a new class
ChanCollector and created an instance, passing it a pointer to the testing environment (so it can complain if the channel is closed while it is waiting on it) and the size of the channel buffer I want. I then replace the explicit send with a call to
Send and the explicit channel read with a call to
Recv. The idea is that I have isolated any other problems in this class.
However, this doesn't actually address the problem, since the problem is that we are calling
FailNow in a goroutine. So we need to stop doing that: instead of passing in the testing environment, we pass in a pointer to this
ChanCollector object.
How can we do that? It isn't the right type. Ah, yes, well, more plumbing changes...
I introduced a new interface
helpers.Fatals and gave it a subset of methods from the
testing.T class (I did look to see if such an interface already exists, but didn't find one):
package helpers
type Fatals interface {
Fatalf(format string, args ...any)
Log(args ...any)
Logf(format string, args ...any)
Fail()
}
INTRODUCED_CHAN_COLLECTOR:internal/helpers/fatals.go
I then implemented those methods on
ChanCollector and replaced all the places where I was passing a
testing.T around to instead take a
helpers.Fatals. And then, I can pass the
ChanCollector where I previously had
t.
This is the
ChanCollector:
package helpers
import (
"testing"
)
type ChanCollector struct {
t *testing.T
collector chan any
}
// Fail implements Fatals.
func (cc *ChanCollector) Fail() {
close(cc.collector)
}
// Fatalf implements Fatals.
func (cc *ChanCollector) Fatalf(format string, args ...any) {
cc.t.Logf(format, args...)
cc.Fail()
}
// Log implements Fatals.
func (cc *ChanCollector) Log(args ...any) {
cc.t.Log(args...)
}
// Logf implements Fatals.
func (cc *ChanCollector) Logf(format string, args ...any) {
cc.t.Logf(format, args...)
}
func (cc *ChanCollector) Send(obj any) {
cc.collector <- obj
}
func (cc *ChanCollector) Recv() any {
msg, ok := <-cc.collector
if !ok {
cc.t.FailNow()
}
return msg
}
func NewChanCollector(t *testing.T, nc int) *ChanCollector {
return &ChanCollector{t: t, collector: make(chan any, nc)}
}
INTRODUCED_CHAN_COLLECTOR:internal/helpers/chancollector.go
Which is so much just a glue layer that I'm not even going to bother commenting on it.
Sadly, by itself, this doesn't help much. Why not? Well, the semantics of
FailNow, and thus
Fatalf, have changed subtly: previously, the
FailNow method would stop the test in its tracks. Now, it simply closes the channel back to the main thread and carries on. This is a problem in the failing code,
NewHasher:
func (f *MockHasherFactory) NewHasher() hash.Hash {
if f.next >= len(f.hashers) {
f.t.Fatalf("The mock hasher does not have %d hashers configured", f.next+1)
}
r := f.hashers[f.next]
f.next++
return r
}
RESOLVER_SYNC_SIGNATURE:internal/helpers/hashing.go
Consequently, it drops through to the next line where it tries to read a non-existent member of the slice. This isn't ever going to (and doesn't) end well. So we might as well protect it with our own
panic:
func (f *MockHasherFactory) NewHasher() hash.Hash {
if f.next >= len(f.hashers) {
f.t.Fatalf("The mock hasher does not have %d hashers configured", f.next+1)
panic("not enough hashers")
}
r := f.hashers[f.next]
f.next++
return r
}
INTRODUCED_CHAN_COLLECTOR:internal/helpers/hashing.go
So nothing really worked the way I wanted, but at least we are in control of our own destiny and we get a decent
panic message before we quit:
--- FAIL: TestThatTwoThreadsCanSignDifferentFieldsAtTheSameTime (7.98s)
chancollector.go:19: The mock hasher does not have 1 hashers configured
panic: not enough hashers
goroutine 21 [running]:
github.com/gmmapowell/ChainLedger/internal/helpers.(*MockHasherFactory).NewHasher(0xc0000a0fc0)
/home/gareth/Projects/ChainLedger/internal/helpers/hashing.go:37 +0x18d
github.com/gmmapowell/ChainLedger/internal/records.CreateStoredTransaction({0x727af0, 0xc000092b40}, {0x727140, 0xc0000a0fc0}, {0x727160, 0xc0000a0ff0}, 0xc0000da280, 0xc000092000)
/home/gareth/Projects/ChainLedger/internal/records/storedtransaction.go:26 +0x2cb
github.com/gmmapowell/ChainLedger/internal/clienthandler.TxResolver.ResolveTx({{0x727af0, 0xc000092b40}, {0x727140, 0xc0000a0fc0}, {0x727160, 0xc0000a0ff0}, 0xc0000da280, {0x727560, 0xc00007c4e0}}, 0xc000404000)
/home/gareth/Projects/ChainLedger/internal/clienthandler/resolver.go:38 +0x28c
github.com/gmmapowell/ChainLedger/internal/clienthandler_test.TestThatTwoThreadsCanSignDifferentFieldsAtTheSameTime.func1()
/home/gareth/Projects/ChainLedger/internal/clienthandler/resolversynctest.go:16 +0x165
created by github.com/gmmapowell/ChainLedger/internal/clienthandler_test.TestThatTwoThreadsCanSignDifferentFieldsAtTheSameTime in goroutine 7
/home/gareth/Projects/ChainLedger/internal/clienthandler/resolversynctest.go:14 +0x153
So, I'm just going to press on at this point and fix the problem (i.e. add some fake hashes) and see what happens.
Fixing the test
So I went ahead and added a
Hasher and said "sure, let it accept anything". I'm not here to test the hashing algorithm, I'm here to look into concurrency issues; checking real hashes would not only waste my time, it would reduce the clarity of an already unclear test, cluttering it up with irrelevancies.
And then I found it failed. Looking at it closely, I'm not surprised. I was specifically testing that both messages that came back were the same. They're not, of course: one of them (whichever gets processed first) should be
nil and the other one should be the
records.StoredTransaction. I tried to fix this and was surprised by the outcome.
func TestThatTwoThreadsCanSignDifferentFieldsAtTheSameTime(t *testing.T) {
clock := helpers.ClockDoubleIsoTimes("2024-12-25_03:00:00.121")
cc := helpers.NewChanCollector(t, 2)
setup(cc, clock)
h1 := hasher.AddMock("fred")
h1.AcceptAnything()
signer.Expect(types.Signature("tx-sig"), nodeKey, types.Hash("fred"))
go func() {
tx := maketx("https://test.com/msg1", "hash", "https://user1.com/", true, "https://user2.com/")
stx, _ := r.ResolveTx(tx)
cc.Send(stx)
}()
go func() {
tx := maketx("https://test.com/msg1", "hash", "https://user1.com/", "https://user2.com/", true)
stx, _ := r.ResolveTx(tx)
cc.Send(stx)
}()
s1 := cc.Recv()
s2 := cc.Recv()
if s1 == nil && s2 == nil {
t.Fatalf("both transactions were nil")
}
if s1 != nil && s2 != nil {
t.Fatalf("both transactions were NOT nil: %v %v", s1, s2)
}
tx1 := s1.(records.StoredTransaction)
if tx1.Signatories[0].Signature == nil {
t.Fatalf("the first signature is missing")
}
if tx1.Signatories[1].Signature == nil {
t.Fatalf("the second signature is missing")
}
}
RESOLVER_SYNC_FIX_1:internal/clienthandler/resolver_sync_test.go
--- FAIL: TestThatTwoThreadsCanSignDifferentFieldsAtTheSameTime (733.59s)
resolversynctest.go:37: both transactions were NOT nil: <nil> &{[102 114 101 100] 1735095600121 https://test.com/msg1 [104 97 115 104] [0xc00007c2e0 0xc00007c420] [116 120 45 115 105 103]}
That clearly says "both were NOT nil" and then shows the first one
IS nil. Uh, what now, Go?
After some googling, I came across this article about nil, from which I learned a lot - not just about nil, but also about pointers and interfaces - and that has helped me achieve some more clarity about why interfaces are at the same level of indirection as pointers (because an interface is a value which contains a pointer).
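(To make the gotcha concrete, here is a minimal standalone sketch - nothing to do with the project code - of why a nil pointer stored in an interface does not compare equal to nil:)

package main

import "fmt"

type thing struct{}

func main() {
	var p *thing                   // a nil pointer
	var i any = p                  // the interface now holds a type (*thing) and a nil value
	fmt.Println(p == nil)          // true
	fmt.Println(i == nil)          // false: the interface itself is not nil
	fmt.Println(i.(*thing) == nil) // true again once we unwrap the pointer
}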
OK, so I need to unwrap the
any I got passed back and lay my hands on a real pointer. Hopefully this works without panicking because otherwise I'm in an infinite loop of "cast this so I can test it is nil/is it non-nil so that I can safely cast it?"
func TestThatTwoThreadsCanSignDifferentFieldsAtTheSameTime(t *testing.T) {
clock := helpers.ClockDoubleIsoTimes("2024-12-25_03:00:00.121")
cc := helpers.NewChanCollector(t, 2)
setup(cc, clock)
h1 := hasher.AddMock("fred")
h1.AcceptAnything()
signer.Expect(types.Signature("tx-sig"), nodeKey, types.Hash("fred"))
go func() {
tx := maketx("https://test.com/msg1", "hash", "https://user1.com/", true, "https://user2.com/")
stx, _ := r.ResolveTx(tx)
cc.Send(stx)
}()
go func() {
tx := maketx("https://test.com/msg1", "hash", "https://user1.com/", "https://user2.com/", true)
stx, _ := r.ResolveTx(tx)
cc.Send(stx)
}()
s1 := cc.Recv()
s2 := cc.Recv()
tx1 := s1.(*records.StoredTransaction)
tx2 := s2.(*records.StoredTransaction)
if tx1 == nil && tx2 == nil {
t.Fatalf("both transactions were nil")
}
if tx1 != nil && tx2 != nil {
t.Fatalf("both transactions were NOT nil: %v %v", tx1, tx2)
}
if tx1 == nil {
tx1 = tx2
}
if tx1.Signatories[0].Signature == nil {
t.Fatalf("the first signature is missing")
}
if tx1.Signatories[1].Signature == nil {
t.Fatalf("the second signature is missing")
}
}
RESOLVER_SYNC_FIX_2:internal/clienthandler/resolver_sync_test.go
So I'm glad to report that
did work, and then I ran into the obvious problem, that my code assumed that
tx1 was valid, which may or may not be the case, depending on what happens. So I added an extra test here: if
tx1 is nil then, by deduction,
tx2 must be non-nil, so we assign
tx2 to
tx1 and carry on.
OK, we've managed to work around these issues with Go and get a passing test. What were we trying to do again?
Trying to make the test fail
We were trying to demonstrate that it doesn't matter if both goroutines see the same transaction at the same time and both update it: at some point, one of them will have a transaction in their hands that has both signatures and
at some point after that they will test to see if it has all the signatures attached. And will not crash.
A failure would most likely look like both goroutines returning
nil because both of them believed that they only had one signature.
So we need to put some wait points in to our code to make sure that we make it as dramatic as possible, rather than just depending on luck.
func TestThatTwoThreadsCanSignDifferentFieldsAtTheSameTime(t *testing.T) {
clock := helpers.ClockDoubleIsoTimes("2024-12-25_03:00:00.121")
cc := helpers.NewChanCollector(t, 2)
setup(cc, clock, true)
h1 := hasher.AddMock("fred")
h1.AcceptAnything()
signer.Expect(types.Signature("tx-sig"), nodeKey, types.Hash("fred"))
go func() {
tx := maketx("https://test.com/msg1", "hash", "https://user1.com/", true, "https://user2.com/")
stx, _ := r.ResolveTx(tx)
cc.Send(stx)
}()
go func() {
tx := maketx("https://test.com/msg1", "hash", "https://user1.com/", "https://user2.com/", true)
stx, _ := r.ResolveTx(tx)
cc.Send(stx)
}()
// Now wait for both of them to get to the critical section
w1 := finj.AllocatedWaiter()
w2 := finj.AllocatedWaiter()
// Then we can release both of them
w1.Release()
w2.Release()
s1 := cc.Recv()
s2 := cc.Recv()
tx1 := s1.(*records.StoredTransaction)
tx2 := s2.(*records.StoredTransaction)
if tx1 == nil && tx2 == nil {
t.Fatalf("both transactions were nil")
}
if tx1 != nil && tx2 != nil {
t.Fatalf("both transactions were NOT nil: %v %v", tx1, tx2)
}
if tx1 == nil {
tx1 = tx2
}
if tx1.Signatories[0].Signature == nil {
t.Fatalf("the first signature is missing")
}
if tx1.Signatories[1].Signature == nil {
t.Fatalf("the second signature is missing")
}
}
RESOLVER_SYNC_TEST_FINJ:internal/clienthandler/resolver_sync_test.go
This asks the
setup() function to allocate a fault injector, and uses that to align the two threads and then release them. In the
Resolver, we allocate the waiter once the pending transaction has been returned:
func (r TxResolver) ResolveTx(tx *api.Transaction) (*records.StoredTransaction, error) {
curr := r.store.PendingTx(tx)
r.finj.NextWaiter()
complete := true
for i, v := range tx.Signatories {
if v.Signature != nil && curr != nil {
curr.Signatories[i].Signature = v.Signature
} else if v.Signature == nil {
if curr == nil || curr.Signatories[i].Signature == nil {
complete = false
}
}
}
if complete {
return records.CreateStoredTransaction(r.clock, r.hasher, r.signer, r.nodeKey, curr)
}
return nil, nil
}
RESOLVER_SYNC_TEST_FINJ:internal/clienthandler/resolver.go
And, look at that! It panics with a
nil pointer dereference:
panic: runtime error: invalid memory address or nil pointer dereference
[signal SIGSEGV: segmentation violation code=0x1 addr=0x0 pc=0x685ac8]
goroutine 22 [running]:
github.com/gmmapowell/ChainLedger/internal/records.CreateStoredTransaction({0x728ff0, 0xc00010ab40}, {0x728640, 0xc000118fc0}, {0x728660, 0xc000118ff0}, 0xc000308080, 0x0)
/home/gareth/Projects/ChainLedger/internal/records/storedtransaction.go:24 +0x88
github.com/gmmapowell/ChainLedger/internal/clienthandler.TxResolver.ResolveTx({{0x728ff0, 0xc00010ab40}, {0x728640, 0xc000118fc0}, {0x728660, 0xc000118ff0}, 0xc000308080, {0x728a60, 0xc00007c3a0}, {0x729988, ...}}, ...)
/home/gareth/Projects/ChainLedger/internal/clienthandler/resolver.go:40 +0x26b
github.com/gmmapowell/ChainLedger/internal/clienthandler_test.TestThatTwoThreadsCanSignDifferentFieldsAtTheSameTime.func2()
/home/gareth/Projects/ChainLedger/internal/clienthandler/resolversynctest.go:28 +0x165
created by github.com/gmmapowell/ChainLedger/internal/clienthandler_test.TestThatTwoThreadsCanSignDifferentFieldsAtTheSameTime in goroutine 7
/home/gareth/Projects/ChainLedger/internal/clienthandler/resolversynctest.go:26 +0x2ad
What has happened? Amazingly, this error has nothing to do with the synchronization issue here; it is just a simple bug that has been sitting there all the time, one which I think would only be exposed by a single-client transaction.
PendingTx returns
nil (which is stored in
curr) if the transaction passed in is the first instance of that transaction. If that version of the transaction has all the signatures it needs already, then
nil is passed in as the transaction to be converted into a
StoredTransaction.
func TestSubmittingACompleteTransactionStoresItImmediately(t *testing.T) {
clock := helpers.ClockDoubleIsoTimes("2024-12-25_03:00:00.121")
setup(t, clock, false)
h1 := hasher.AddMock("fred")
h1.AcceptAnything()
signer.SignAnythingAs("hello")
tx := maketx("https://test.com/msg1", "hash", "https://user1.com/", true, "https://user2.com/", true)
stx, _ := r.ResolveTx(tx)
if stx == nil {
t.Fatalf("the transaction was not stored")
}
}
RESOLVER_COMPLETED_TEST:internal/clienthandler/resolver_test.go
And, lo and behold, we come across the same panic as above. The problem is that the
PendingTransaction is
nil. So let's fix that by updating the value of
curr just before we call
CreateStoredTransaction (but only if it's
nil).
func (r TxResolver) ResolveTx(tx *api.Transaction) (*records.StoredTransaction, error) {
curr := r.store.PendingTx(tx)
r.finj.NextWaiter()
complete := true
for i, v := range tx.Signatories {
if v.Signature != nil && curr != nil {
curr.Signatories[i].Signature = v.Signature
} else if v.Signature == nil {
if curr == nil || curr.Signatories[i].Signature == nil {
complete = false
}
}
}
if complete {
if curr == nil {
curr = tx
}
return records.CreateStoredTransaction(r.clock, r.hasher, r.signer, r.nodeKey, curr)
}
return nil, nil
}
RESOLVER_COMPLETED_TEST_FIXED:internal/clienthandler/resolver.go
So our new test passes - but what about the synchronization test?
Well, no. It makes it further, though:
panic: runtime error: index out of range [1] with length 1
goroutine 9 [running]:
github.com/gmmapowell/ChainLedger/internal/helpers.(*ClockDouble).Time(0xc00008ab40)
/home/gareth/Projects/ChainLedger/internal/helpers/clock.go:48 +0x7c
github.com/gmmapowell/ChainLedger/internal/records.CreateStoredTransaction({0x72a090, 0xc00008ab40}, {0x7296e0, 0xc0000a0fc0}, {0x729700, 0xc0000a0ff0}, 0xc0000da100, 0xc000134000)
/home/gareth/Projects/ChainLedger/internal/records/storedtransaction.go:25 +0x11f
github.com/gmmapowell/ChainLedger/internal/clienthandler.TxResolver.ResolveTx({{0x72a090, 0xc00008ab40}, {0x7296e0, 0xc0000a0fc0}, {0x729700, 0xc0000a0ff0}, 0xc0000da100, {0x729b00, 0xc00035e260}, {0x72aa28, ...}}, ...)
/home/gareth/Projects/ChainLedger/internal/clienthandler/resolver.go:43 +0x28e
github.com/gmmapowell/ChainLedger/internal/clienthandler_test.TestThatTwoThreadsCanSignDifferentFieldsAtTheSameTime.func2()
/home/gareth/Projects/ChainLedger/internal/clienthandler/resolversynctest.go:28 +0x165
created by github.com/gmmapowell/ChainLedger/internal/clienthandler_test.TestThatTwoThreadsCanSignDifferentFieldsAtTheSameTime in goroutine 7
/home/gareth/Projects/ChainLedger/internal/clienthandler/resolversynctest.go:26 +0x2ad
This fails because while it is possible to update both signatures at the same time, if both legs see all the signatures complete, then both will try and create a stored transaction and we will have more
StoredTransaction objects than we want. Can we fix this?
Well, yes, of course we can. There are a number of strategies, and we may try another one in a minute, but one possibility is to add a flag to the pending transaction to indicate if it has already been "selected" by a previous thread.
There is a problem, of course, which is "how do we test the flag, and then update it before the other leg can test it?" We could, of course, use another mutex, but
this specific problem has a specific solution right down at the processor level: an "atomic test-and-set" operation. In Go, this is provided by the sync/atomic package (the atomic.Bool type we use below was added in Go 1.19).
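(As a standalone illustration of what test-and-set buys us - this is not the project code - however many goroutines race on the flag, Swap hands false back to exactly one of them:)

package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

func main() {
	var completed atomic.Bool
	var winners atomic.Int32
	var wg sync.WaitGroup
	for i := 0; i < 10; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			// Swap atomically sets the flag and returns its previous value,
			// so only the first goroutine to get here sees false.
			if !completed.Swap(true) {
				winners.Add(1)
			}
		}()
	}
	wg.Wait()
	fmt.Println("winners:", winners.Load()) // always 1
}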
So we can update
ResolveTx to look like this:
func (r TxResolver) ResolveTx(tx *api.Transaction) (*records.StoredTransaction, error) {
curr := r.store.PendingTx(tx)
r.finj.NextWaiter()
complete := true
for i, v := range tx.Signatories {
if v.Signature != nil && curr != nil {
curr.Signatories[i].Signature = v.Signature
} else if v.Signature == nil {
if curr == nil || curr.Signatories[i].Signature == nil {
complete = false
}
}
}
if complete {
if curr == nil {
curr = tx
}
if !curr.AlreadyCompleted() {
return records.CreateStoredTransaction(r.clock, r.hasher, r.signer, r.nodeKey, curr)
}
}
return nil, nil
}
RESOLVER_COMPLETE_ONLY_ONCE:internal/clienthandler/resolver.go
And
AlreadyCompleted is implemented in
transaction.go:
type Transaction struct {
ContentLink *url.URL
ContentHash types.Hash
Signatories []*types.Signatory
completed atomic.Bool
}
...
func (tx *Transaction) AlreadyCompleted() bool {
return tx.completed.Swap(true)
}
RESOLVER_COMPLETE_ONLY_ONCE:internal/api/transaction.go
And now all our tests pass. Whew!
Journal
Happily, even our big test now passes, but I believe that is good fortune, rather than good design. The overlap happens in the two methods in
MemoryJournaller:
func (d *MemoryJournaller) RecordTx(tx *records.StoredTransaction) error {
d.txs = append(d.txs, tx)
fmt.Printf("%s recording tx with id %v, have %d\n", d.name, tx.TxID, len(d.txs))
return nil
}
func (d MemoryJournaller) ReadTransactionsBetween(from types.Timestamp, upto types.Timestamp) ([]*records.StoredTransaction, error) {
var ret []*records.StoredTransaction
for _, tx := range d.txs {
if tx.WhenReceived >= from && tx.WhenReceived < upto {
ret = append(ret, tx)
}
}
return ret, nil
}
RESOLVER_COMPLETE_ONLY_ONCE:internal/storage/journal.go
But the only time that anything is written here is during the
append function call, so the opportunities for conflict are small - most likely if we are in the process of reading a block of transactions when an "updating append" occurs.
Using fault injection - and some cunning white box testing - we can force the
Read method to pause while we pump some more records into the slice, forcing a reallocation, and then release the reader.
I'm going to do this in a couple of steps. So, first, we need to get the journal in a state where it has at least one message in the
txs slice and the slice has reached its capacity. How do we know these things? Well, there are meta operations on slices (
len and
cap) that enable you to know the length of a slice and its capacity (i.e. how long it can become before reallocation will occur). So, let's add some methods to the journal to expose this:
func (d *MemoryJournaller) HaveSome() bool {
fmt.Printf("len = %d\n", len(d.txs))
return len(d.txs) > 0
}
func (d *MemoryJournaller) NotAtCapacity() bool {
fmt.Printf("cap = %d len = %d\n", cap(d.txs), len(d.txs))
return cap(d.txs) > len(d.txs)
}
JOURNAL_SYNC_1:internal/storage/journal.go
The
fmt.Printf statements here just show my mind at work during development - I like to see what's going on. I'll remove them in a couple of iterations.
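(If len and cap are unfamiliar, here is a tiny standalone sketch - nothing to do with the journal - showing how they change as append grows a slice; the exact growth pattern is an implementation detail, but the len/cap relationship is what HaveSome and NotAtCapacity are probing:)

package main

import "fmt"

func main() {
	var txs []string
	for i := 0; i < 5; i++ {
		txs = append(txs, "tx")
		// len is how many elements are present; cap is how many fit in the
		// current backing array before append must allocate a bigger one.
		fmt.Printf("len = %d cap = %d at capacity = %v\n", len(txs), cap(txs), len(txs) == cap(txs))
	}
}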
With this in place we can write the following test:
package storage_test
import (
"testing"
"github.com/gmmapowell/ChainLedger/internal/helpers"
"github.com/gmmapowell/ChainLedger/internal/records"
"github.com/gmmapowell/ChainLedger/internal/storage"
)
func TestWeCanAddAndRecoverAtTheSameTime(t *testing.T) {
finj := helpers.FaultInjectionLibrary(t)
tj := storage.TestJournaller("journal", finj)
journal := tj.(*storage.MemoryJournaller)
completed := false
go func() {
for !completed {
journal.RecordTx(storableTx())
}
}()
aw := finj.AllocatedWaiter()
for !journal.HaveSome() || journal.NotAtCapacity() {
aw.Release()
aw = finj.AllocatedWaiter()
}
}
func storableTx() *records.StoredTransaction {
return &records.StoredTransaction{TxID: []byte("hello")}
}
JOURNAL_SYNC_1:internal/storage/journal_sync_test.go
Apart from all the boilerplate and setup, this kicks off a goroutine (obviously to test anything to do with concurrency, you need at least two goroutines - usually in addition to the test). This keeps adding new records to the journal until the
completed flag is set - or the goroutine is shut down when the test ends. Inside the
RecordTx method (as we'll see in a minute) it pauses at the critical point, allocating a waiter, and blocking the goroutine. In the main test, we wait for that waiter to be allocated, and then enter a loop, waiting for the desired condition to exist and repeatedly releasing the goroutine, then waiting for it to pause again. Using this pattern, there is no possibility that both threads are running at the same time.
At the end of this, we are in the desired condition: the slice is at capacity with at least one entry in it. As it happens, it seems to be exactly one entry, as shown here:
Starting: /home/gareth/go/bin/dlv dap --listen=127.0.0.1:38689 --log-dest=3 from /home/gareth/Projects/ChainLedger/internal/storage
DAP server listening at: 127.0.0.1:38689
Type 'dlv help' for list of commands.
len = 0
journal recording tx with id [104 101 108 108 111], have 1
len = 1
cap = 1 len = 1
PASS
Process 73675 has exited with status 0
Detaching
dlv dap (73454) exited with code: 0
Finally, these are the wait points we added to the journaller code:
func (d *MemoryJournaller) RecordTx(tx *records.StoredTransaction) error {
d.finj.NextWaiter()
d.txs = append(d.txs, tx)
fmt.Printf("%s recording tx with id %v, have %d\n", d.name, tx.TxID, len(d.txs))
return nil
}
func (d MemoryJournaller) ReadTransactionsBetween(from types.Timestamp, upto types.Timestamp) ([]*records.StoredTransaction, error) {
var ret []*records.StoredTransaction
for _, tx := range d.txs {
d.finj.NextWaiter()
if tx.WhenReceived >= from && tx.WhenReceived < upto {
ret = append(ret, tx)
}
}
return ret, nil
}
JOURNAL_SYNC_1:internal/storage/journal.go
So now we want to kick off our second goroutine, to try and recover the messages to add to a block, and that is going to hit a wait point, which we will wait for and capture. We can then release the first thread, triggering a reallocation. If all goes well, when we release the second thread, it will
panic.
func TestWeCanAddAndRecoverAtTheSameTime(t *testing.T) {
clock := helpers.ClockDoubleSameMinute("2024-12-25_03:00", "05.121", "07.282", "11.281", "19.202")
finj := helpers.FaultInjectionLibrary(t)
tj := storage.TestJournaller("journal", finj)
journal := tj.(*storage.MemoryJournaller)
completed := false
go func() {
for !completed {
journal.RecordTx(storableTx(clock))
}
}()
aw := finj.AllocatedWaiter()
for !journal.HaveSome() || journal.NotAtCapacity() {
aw.Release()
aw = finj.AllocatedWaiter()
}
go func() {
txs, _ := tj.ReadTransactionsBetween(clock.Times[0], clock.Times[3])
fmt.Printf("%v\n", txs)
}()
rw := finj.AllocatedWaiter()
aw.Release()
/*aw = */ finj.AllocatedWaiter()
rw.Release()
// aw.Release()
}
JOURNAL_SYNC_2:internal/storage/journal_sync_test.go
And this is what happens:
len = 0
journal recording tx with id [104 101 108 108 111], have 1 at 0xc000130060
len = 1
cap = 1 len = 1
before waiting txs = 0xc000130060
journal recording tx with id [104 101 108 108 111], have 2 at 0xc00012c760
after waiting txs = 0xc000130060
[0xc00014e1c0]
PASS
I freely admit I wasn't expecting that: so much so, that I added extra tracing to be sure I was seeing what I was seeing, specifically to report the "pointer value" of the slice in both goroutines. As you can see, the
append in
RecordTx has changed the pointer value of
d.txs (from 0xc000130060 to 0xc00012c760), but the thread doing the recovery has not noticed the change. If the memory underlying the original slice is still valid, then everything is fine; if not, we should have had a panic. I think I need to go off and research what Go is supposed to do in these circumstances. If this is guaranteed behaviour, we are done. If not, we need to figure out how to expose that we have broken something; it may be that my assumption that recovering one message was enough was invalid.
Googling didn't help me very much, but going back to
The Go Programming Language, Section 9.1 makes it very clear that there is nothing magical going on and bad things can definitely happen, and I am perhaps even being optimistic about there being a
panic - it would seem that scribbling over random memory is a distinct possibility.
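(What the trace is showing is just slice-header copying. A standalone sketch, independent of the journal code: a copy of a slice header goes on pointing at the old backing array after append reallocates:)

package main

import "fmt"

func main() {
	a := make([]int, 1, 1) // len 1, cap 1
	b := a                 // b copies the slice header: same backing array as a
	a = append(a, 2)       // over capacity, so append moves a to a new, bigger array
	a[0] = 99              // visible only through the new array
	fmt.Println(a[0], b[0])     // 99 0 - b still sees the old backing array
	fmt.Printf("%p %p\n", a, b) // two different backing array addresses
}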
So I am going to increase the number of writes and reads that I do and see what happens.
func TestWeCanAddAndRecoverAtTheSameTime(t *testing.T) {
clock := helpers.ClockDoubleSameMinute("2024-12-25_03:00", "05.121", "07.282", "08.301", "08.402", "11.281", "14.010", "19.202")
finj := helpers.FaultInjectionLibrary(t)
tj := storage.TestJournaller("journal", finj)
journal := tj.(*storage.MemoryJournaller)
completed := false
go func() {
for !completed {
journal.RecordTx(storableTx(clock))
}
}()
aw := finj.AllocatedWaiter()
for !journal.HaveAtLeast(3) || journal.NotAtCapacity() {
aw.Release()
aw = finj.AllocatedWaiter()
}
waitAll := make(chan struct{})
go func() {
txs, _ := tj.ReadTransactionsBetween(clock.Times[0], clock.Times[6])
fmt.Printf("%v\n", txs)
txs, _ = tj.ReadTransactionsBetween(clock.Times[0], clock.Times[6])
fmt.Printf("%v\n", txs)
waitAll <- struct{}{}
}()
rw := finj.AllocatedWaiter()
aw.Release()
/*aw = */ finj.AllocatedWaiter()
rw.Release()
finj.JustRun()
<-waitAll
}
JOURNAL_SYNC_3:internal/storage/journal_sync_test.go
It still doesn't end as disastrously as you might think, but it's not "right" and I think the actual consequences probably depend as much on the compiler as anything else.
Starting: /home/gareth/go/bin/dlv dap --listen=127.0.0.1:35803 --log-dest=3 from /home/gareth/Projects/ChainLedger/internal/storage
DAP server listening at: 127.0.0.1:35803
Type 'dlv help' for list of commands.
journal recording tx with id [104 101 108 108 111], have 1 at 0xc000180000
journal recording tx with id [104 101 108 108 111], have 2 at 0xc000182020
journal recording tx with id [104 101 108 108 111], have 3 at 0xc000198060
journal recording tx with id [104 101 108 108 111], have 4 at 0xc000198060
before waiting txs = 0xc000198060
journal recording tx with id [104 101 108 108 111], have 5 at 0xc00019e040
after waiting txs = 0xc000198060
before waiting txs = 0xc000198060
after waiting txs = 0xc000198060
before waiting txs = 0xc000198060
after waiting txs = 0xc000198060
before waiting txs = 0xc000198060
after waiting txs = 0xc000198060
[0xc00011c1c0 0xc000194000 0xc000194230 0xc000194460]
before waiting txs = 0xc00019e040
after waiting txs = 0xc00019e040
before waiting txs = 0xc00019e040
after waiting txs = 0xc00019e040
before waiting txs = 0xc00019e040
after waiting txs = 0xc00019e040
before waiting txs = 0xc00019e040
after waiting txs = 0xc00019e040
before waiting txs = 0xc00019e040
after waiting txs = 0xc00019e040
[0xc00011c1c0 0xc000194000 0xc000194230 0xc000194460 0xc000194690]
PASS
Process 146902 has exited with status 0
Detaching
dlv dap (146676) exited with code: 0
Clearly, in the goroutine doing the reading, it does not get the memo that
d.txs has changed until after it leaves the
range loop. It is not obvious what code must be being generated for this to be true. Bear in mind that it is not just that the
range might have cached it; I am printing out the value and it does not change until after the loop has completed. As a compiler designer I can only imagine that the value of
d.txs is captured in a local variable before the loop starts, equivalent to this:
dtxs := d.txs
for _, tx := range dtxs {
	fmt.Printf("before waiting txs = %p\n", dtxs)
	d.finj.NextWaiter()
	fmt.Printf("after waiting txs = %p\n", dtxs)
	...
}
Thus the whole of the slice header is read (and the backing array kept alive, so that it is not released by the memory manager) for the duration of the loop - and indeed the Go specification says that the range expression is evaluated just once, before the loop begins. If so, the code "as written" (i.e. if the only possible race point were inside the loop) would be valid; but it is still theoretically possible (although not practically testable) that the slice
d.txs is updated while the header is being copied; if so, the three parts of the slice header (pointer, length and capacity) could be out of whack.
Either way, we are not going to take the risk; instead we are going to put the whole of the journal in a separate goroutine and submit both write requests and read requests through a channel. The current API will be preserved, but will be implemented in terms of these channel interactions.
A Channel-based Journal
So let's start with drafting out what such a journal would look like. This isn't really a test, but it gives a flavour of what we're going to be doing (although we will need to add fields to the requests to
Store and
Retrieve).
func TestThread(t *testing.T) {
ch := storage.LaunchJournalThread()
ch <- storage.JournalStoreCommand{}
ch <- storage.JournalRetrieveCommand{}
ch <- 42
donech := make(chan struct{})
ch <- storage.JournalDoneCommand{NotifyMe: donech}
<-donech
}
JOURNAL_THREAD_DRAFT:internal/storage/journal_sync_test.go
This "launches" a new journal thread - a goroutine responsible for isolating all of the journal contents - and returns a channel which can be used to submit requests. As I understand it, writing to a channel
is thread safe, so we can store this anywhere we want and all our threads can submit requests to the journal via this channel without issues. We then submit a variety of items.
All of the items referenced here are to be found in the new file
journal_thread.go:
package storage
import "fmt"
type JournalCommand interface {
}
type JournalStoreCommand struct {
}
type JournalRetrieveCommand struct {
}
type JournalDoneCommand struct {
NotifyMe chan<- struct{}
}
func LaunchJournalThread() chan<- JournalCommand {
ret := make(chan JournalCommand)
go func() {
whenDone:
for {
x := <-ret
switch v := x.(type) {
case JournalStoreCommand:
fmt.Printf("was a store command %v\n", v)
case JournalRetrieveCommand:
fmt.Printf("was a retrieve command %v\n", v)
case JournalDoneCommand:
fmt.Printf("was a done command %v\n", v)
v.NotifyMe <- struct{}{}
break whenDone
default:
fmt.Printf("not a valid journal command %v\n", x)
}
}
}()
return ret
}
JOURNAL_THREAD_DRAFT:internal/storage/journal_thread.go
And when we run this test, we see the following (fairly boring) output:
was a store command {}
was a retrieve command {}
not a valid journal command 42
was a done command {0xc000180000}
It's interesting to note that, because of the way Go implements interfaces, the interface here is functionally equivalent to
any - any object is a valid instance of the interface because it trivially implements the (non-existent) interface methods.
So, what have we actually achieved here? Not a lot, which is why I've described it as a "draft": it's an outline of what we want to achieve. But it's worth noting that from here, pretty much everything is a refactoring.
The intention of this "refactoring" is that the user API shouldn't change (much), but rather that we should bury all references to the journal thread within the existing API so that it is compatible with other APIs (such as the Dummy API, and also the "future" DynamoDB implementation) that don't need an exclusionary thread. The one API change we do want is to allow the thread to be terminated by having a
Quit method on the journal.
So, pulling across the code from the existing journal API, we end up with this:
type JournalStoreCommand struct {
Tx *records.StoredTransaction
}
type JournalRetrieveCommand struct {
From, Upto types.Timestamp
ResultChan chan<- []*records.StoredTransaction
}
type JournalCheckCapacityCommand struct {
AtLeast int
ResultChan chan<- bool
}
type JournalDoneCommand struct {
NotifyMe chan<- struct{}
}
func LaunchJournalThread(name string, finj helpers.FaultInjection) chan<- JournalCommand {
var txs []*records.StoredTransaction
ret := make(chan JournalCommand)
log.Printf("launching new journal thread with channel %p", ret)
go func() {
whenDone:
for {
x := <-ret
switch v := x.(type) {
case JournalStoreCommand:
txs = append(txs, v.Tx)
log.Printf("%s recording tx with id %v, have %d at %p", name, v.Tx.TxID, len(txs), txs)
case JournalRetrieveCommand:
log.Printf("reading txs = %p, len = %d", txs, len(txs))
var ret []*records.StoredTransaction
for _, tx := range txs {
if tx.WhenReceived >= v.From && tx.WhenReceived < v.Upto {
ret = append(ret, tx)
}
}
v.ResultChan <- ret
case JournalCheckCapacityCommand:
ret := cap(txs) == len(txs) && cap(txs) >= v.AtLeast
log.Printf("checking capacity, returning %v\n", ret)
v.ResultChan <- ret
case JournalDoneCommand:
log.Printf("was a done command %v\n", v)
v.NotifyMe <- struct{}{}
break whenDone
default:
log.Printf("not a valid journal command %v\n", x)
}
}
}()
return ret
}
JOURNAL_THREAD_COMPLETED:internal/storage/journal_thread.go
One of the interesting things that happens with these kinds of tests is that depending on how you arrange them, you can have different things that you can test. Here, we haven't copied across the wait point inside the loop, because if we did, it would block the thread, and then everything would end up deadlocked (because we couldn't then store another record). Instead, we have left that wait point back in the API, but there it does not block while the read is happening. In one sense, this is frustrating, but in a very real sense, we cannot test this behaviour for the very simple reason that
it cannot any longer go wrong in this way. In other words, our simple inability to write the test tells us that we have eliminated the possibility of the problem.
In doing this, I have finally reached a point I had been expecting to reach some time ago. Based on my previous experience, I expected to need to "name" the individual fault injection points, and then control them independently. Up until now, I have been able to get away without that. But now, I believe that this change does introduce a real race condition between the
Store and
Read at the end, so I have named them and allowed them to be controlled independently. An additional benefit is that I am now able to produce better tracing.
2025/01/09 20:02:11 launching new journal thread with channel 0xc00010e310
2025/01/09 20:02:11 next(journal-store-tx) allocated 0xc000148080, waiting ...
2025/01/09 20:02:11 allocated(journal-store-tx) was 0xc000148080
2025/01/09 20:02:11 checking capacity, returning false
2025/01/09 20:02:11 released(journal-store-tx, 0xc000148080)
2025/01/09 20:02:11 next(journal-store-tx) allocated 0xc0001480c0, waiting ...
2025/01/09 20:02:11 allocated(journal-store-tx) was 0xc0001480c0
2025/01/09 20:02:11 journal recording tx with id [104 101 108 108 111], have 1 at 0xc00011c050
2025/01/09 20:02:11 checking capacity, returning false
2025/01/09 20:02:11 released(journal-store-tx, 0xc0001480c0)
2025/01/09 20:02:11 next(journal-store-tx) allocated 0xc0001480e0, waiting ...
2025/01/09 20:02:11 allocated(journal-store-tx) was 0xc0001480e0
2025/01/09 20:02:11 journal recording tx with id [104 101 108 108 111], have 2 at 0xc000118810
2025/01/09 20:02:11 checking capacity, returning false
2025/01/09 20:02:11 released(journal-store-tx, 0xc0001480e0)
2025/01/09 20:02:11 next(journal-store-tx) allocated 0xc000148100, waiting ...
2025/01/09 20:02:11 allocated(journal-store-tx) was 0xc000148100
2025/01/09 20:02:11 journal recording tx with id [104 101 108 108 111], have 3 at 0xc000180000
2025/01/09 20:02:11 checking capacity, returning false
2025/01/09 20:02:11 released(journal-store-tx, 0xc000148100)
2025/01/09 20:02:11 next(journal-store-tx) allocated 0xc000180020, waiting ...
2025/01/09 20:02:11 journal recording tx with id [104 101 108 108 111], have 4 at 0xc000180000
2025/01/09 20:02:11 allocated(journal-store-tx) was 0xc000180020
2025/01/09 20:02:11 checking capacity, returning true
2025/01/09 20:02:11 next(journal-read-txs) allocated 0xc000180060, waiting ...
2025/01/09 20:02:11 allocated(journal-read-txs) was 0xc000180060
2025/01/09 20:02:11 released(journal-store-tx, 0xc000180020)
2025/01/09 20:02:11 next(journal-store-tx) allocated 0xc00007e020, waiting ...
2025/01/09 20:02:11 allocated(journal-store-tx) was 0xc00007e020
2025/01/09 20:02:11 released(journal-read-txs, 0xc000180060)
2025/01/09 20:02:11 journal recording tx with id [104 101 108 108 111], have 5 at 0xc00012a940
2025/01/09 20:02:11 reading txs = 0xc00012a940, len = 5
2025/01/09 20:02:11 [0xc000146230 0xc000146460 0xc000146690 0xc0001468c0 0xc0001920e0]
2025/01/09 20:02:11 next(journal-read-txs) allocated 0xc000148140, waiting ...
2025/01/09 20:02:11 allocated(journal-read-txs) was 0xc000148140
2025/01/09 20:02:11 released(journal-read-txs, 0xc000148140)
2025/01/09 20:02:11 reading txs = 0xc00012a940, len = 5
2025/01/09 20:02:11 [0xc000146230 0xc000146460 0xc000146690 0xc0001468c0 0xc0001920e0]
2025/01/09 20:02:11 next(pending-tx) allocated 0xc0001481c0, waiting ...
2025/01/09 20:02:11 allocated(pending-tx) was 0xc0001481c0
2025/01/09 20:02:11 non-allocated(pending-tx) after 50000000
2025/01/09 20:02:11 released(pending-tx, 0xc0001481c0)
2025/01/09 20:02:11 next(pending-tx) allocated 0xc000148200, waiting ...
2025/01/09 20:02:11 allocated(pending-tx) was 0xc000148200
2025/01/09 20:02:11 released(pending-tx, 0xc000148200)
PASS
Finally, here is what is left in the stubbed API:
type MemoryJournaller struct {
name string
tothread chan<- JournalCommand
finj helpers.FaultInjection
}
// RecordTx implements Journaller.
func (d *MemoryJournaller) RecordTx(tx *records.StoredTransaction) error {
d.finj.NextWaiter("journal-store-tx")
d.tothread <- JournalStoreCommand{Tx: tx}
return nil
}
func (d MemoryJournaller) ReadTransactionsBetween(from types.Timestamp, upto types.Timestamp) ([]*records.StoredTransaction, error) {
messageMe := make(chan []*records.StoredTransaction)
d.finj.NextWaiter("journal-read-txs")
d.tothread <- JournalRetrieveCommand{From: from, Upto: upto, ResultChan: messageMe}
ret := <-messageMe
return ret, nil
}
func (d *MemoryJournaller) Quit() error {
return nil
}
func (d *MemoryJournaller) AtCapacityWithAtLeast(n int) bool {
messageMe := make(chan bool)
d.tothread <- JournalCheckCapacityCommand{AtLeast: n, ResultChan: messageMe}
return <-messageMe
}
func NewJournaller(name string) Journaller {
return TestJournaller(name, helpers.IgnoreFaultInjection())
}
func TestJournaller(name string, finj helpers.FaultInjection) Journaller {
ret := MemoryJournaller{name: name, finj: finj}
ret.tothread = LaunchJournalThread(name, finj)
return &ret
}
JOURNAL_THREAD_COMPLETED:internal/storage/journal.go
Performance
The question will naturally arise as to what the performance implications of this change are, and how they compare with those of using a mutex. The real answer is "you need to run a large-scale performance test", which we are not ready to do yet, but the simple answer is that when you are trying to synchronize threads, there is always going to be a "critical section" that you are explicitly saying only one thread can be in at any one time, and you want to make that as small as possible. It seems to me that in this case the critical section consists of the
append in
Record and the
for..range in
Read. The thread here has very little overhead other than that, and I believe the channel queuing mechanism (we have configured it to have a queue of 20 messages) enables it to smooth out the requests.
The obvious problem is actually the implementation of the
Read method: this is scanning through all the messages, looking for ones that are in a given time range. If we want to improve the performance, we should probably decide "in advance" what the time bounds of the block are going to be, and keep a list of lists of messages: one list for each block. It wouldn't surprise me if we came back to that in the fullness of time. But always remember that this is just a cheap "in-memory" implementation: what really counts is the production version built on a real database.
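(Purely as a sketch of that possible future optimisation - none of this exists in the project, and all the names here are invented for illustration - the journal thread could bucket transactions by block window as they arrive, so that reading a block becomes a map lookup rather than a scan of every transaction:)

package main

import "fmt"

// Timestamp stands in for types.Timestamp; blockWindow is an assumed,
// pre-agreed block duration in the same (hypothetical) units.
type Timestamp int64

const blockWindow Timestamp = 1000

// bucketedJournal keys transactions by the start of their block window.
type bucketedJournal struct {
	buckets map[Timestamp][]string
}

func (b *bucketedJournal) record(when Timestamp, tx string) {
	key := when - (when % blockWindow)
	b.buckets[key] = append(b.buckets[key], tx)
}

func (b *bucketedJournal) readBlock(start Timestamp) []string {
	return b.buckets[start] // one lookup instead of scanning everything
}

func main() {
	j := &bucketedJournal{buckets: map[Timestamp][]string{}}
	j.record(1005, "tx1")
	j.record(1999, "tx2")
	j.record(2001, "tx3")
	fmt.Println(j.readBlock(1000)) // [tx1 tx2]
	fmt.Println(j.readBlock(2000)) // [tx3]
}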