Since we last did any work with blocks, we've totally rewritten the memory journaller. Now we need to update it again so that we can store the block we created. Hopefully, this is nice and simple and we can be out of here in just five minutes. Let's see - this is why engineers always hate estimating.
The first step is to add the three calls to store a block each time we generate one:
// Run is the block builder's main loop. It builds and journals block 0
// straight away, then builds and journals a further block every time the
// delay timer fires. When a message arrives on builder.control it builds and
// journals one final block, acknowledges the request and returns.
//
// A failure to build block 0 panics (now including the underlying error).
// A failure to journal a block is logged rather than silently dropped; it is
// not treated as fatal so that block production carries on.
func (builder *SleepBlockBuilder) Run() {
	blocktime := builder.clock.Time()
	timer := builder.clock.After(delay)
	lastBlock, err := builder.blocker.Build(blocktime, nil, nil)
	if err != nil {
		log.Panicf("error returned from building block 0: %v", err)
	}

	// record journals whatever lastBlock currently refers to and reports any
	// error the journaller returns instead of discarding it.
	record := func() {
		if err := builder.journaller.RecordBlock(lastBlock); err != nil {
			log.Printf("%s could not record block: %v", builder.Name.String(), err)
		}
	}

	record()
	for {
		// Remember where the previous block ended before blocktime is
		// overwritten by the timer case below.
		prev := blocktime
		select {
		case pingback := <-builder.control:
			log.Printf("%s asked to build final block and quit", builder.Name.String())
			lastBlock = builder.buildBlock(prev, builder.clock.Time(), lastBlock)
			record()
			pingback.Send()
			return
		case blocktime = <-timer:
			// Re-arm the timer first so the next block is scheduled relative
			// to this tick rather than to when building finishes.
			timer = builder.clock.After(delay)
			// Wait out the pause interval before building.
			nowis := <-builder.clock.After(pause)
			// we are ready to build a block
			log.Printf("%s building block at %s", builder.Name.String(), nowis.IsoTime())
			lastBlock = builder.buildBlock(prev, blocktime, lastBlock)
			record()
		}
	}
}
STORE_BLOCK:internal/block/builder.go
And we need to add the RecordBlock method to the Journaller interface:type Journaller interface {
// Journaller is the storage abstraction used to record transactions and
// blocks and to read transactions back by time range.

// RecordTx stores a single transaction in the journal.
RecordTx(tx *records.StoredTransaction) error
// RecordBlock stores a single block in the journal.
RecordBlock(block *records.Block) error
// ReadTransactionsBetween returns the stored transactions for the range
// from..upto.
// NOTE(review): the in-memory journal thread selects from <= WhenReceived < upto;
// confirm other implementations use the same half-open range.
ReadTransactionsBetween(from types.Timestamp, upto types.Timestamp) ([]*records.StoredTransaction, error)
// Quit shuts the journaller down.
// NOTE(review): presumably waits for the journal goroutine to finish - confirm.
Quit() error
}
STORE_BLOCK:internal/storage/journal.go
And then we need to create a struct to allow it to be a "command" to be sent to the journaller goroutine:type JournalBlockCommand struct {
// JournalBlockCommand is the journal command that asks the journal
// goroutine to append a block to its list of recorded blocks.

// Block is the block to be recorded.
Block *records.Block
}
STORE_BLOCK:internal/storage/journal_thread.go
And then implement the MemoryJournaller API RecordBlock method to create one of these and sent it down the channel:func (d *MemoryJournaller) RecordBlock(block *records.Block) error {
// RecordBlock wraps block in a JournalBlockCommand and sends it on
// d.tothread (presumably the channel returned by LaunchJournalThread -
// confirm). The send blocks until the channel can accept the command.
d.tothread <- JournalBlockCommand{Block: block}
// The command is only handed off here, so there is no storage error to
// report: this method always returns nil.
return nil
}
STORE_BLOCK:internal/storage/journal.go
And finally actually process this command when it comes through:
// LaunchJournalThread starts a goroutine that owns the in-memory lists of
// stored transactions and blocks, and returns the send-only channel (buffered
// to 20 commands) on which callers submit JournalCommand values.
//
// name is used only as a prefix in log messages. finj is accepted but not
// used by the code shown here.
//
// The goroutine handles one command at a time, so the lists need no locking.
// It exits after acknowledging a JournalDoneCommand, and also if the channel
// is closed; previously a closed channel made it spin forever logging
// "not a valid journal command".
func LaunchJournalThread(name string, finj helpers.FaultInjection) chan<- JournalCommand {
	var txs []*records.StoredTransaction
	var blocks []*records.Block
	commands := make(chan JournalCommand, 20)
	log.Printf("launching new journal thread with channel %p", commands)
	go func() {
		// Ranging over the channel terminates the goroutine cleanly when the
		// channel is closed instead of receiving nil commands indefinitely.
		for cmd := range commands {
			switch v := cmd.(type) {
			case JournalStoreCommand:
				txs = append(txs, v.Tx)
				log.Printf("%s recording tx with id %v, have %d at %p", name, v.Tx.TxID, len(txs), txs)
			case JournalBlockCommand:
				blocks = append(blocks, v.Block)
				log.Printf("%s recording block with id %v, have %d at %p", name, v.Block.ID, len(blocks), blocks)
			case JournalRetrieveCommand:
				log.Printf("reading txs = %p, len = %d", txs, len(txs))
				// Select transactions received in the half-open range
				// [v.From, v.Upto).
				var matching []*records.StoredTransaction
				for _, tx := range txs {
					if tx.WhenReceived >= v.From && tx.WhenReceived < v.Upto {
						matching = append(matching, tx)
					}
				}
				v.ResultChan <- matching
			case JournalCheckCapacityCommand:
				// True only when the tx slice is exactly full and its
				// capacity has reached the requested minimum.
				atCapacity := cap(txs) == len(txs) && cap(txs) >= v.AtLeast
				log.Printf("checking capacity, returning %v\n", atCapacity)
				v.ResultChan <- atCapacity
			case JournalDoneCommand:
				log.Printf("was a done command %v\n", v)
				// Acknowledge before exiting so the requester knows every
				// earlier command has been processed.
				v.NotifyMe <- struct{}{}
				return
			default:
				log.Printf("not a valid journal command %v\n", cmd)
			}
		}
	}()
	return commands
}
STORE_BLOCK:internal/storage/journal_thread.go
For once, that was (almost) as easy as I'd thought. I did have to move a few other things around: the harness now closes down the journal thread and waits for it to finish before allowing the node, and thus the harness, to finish, and the journaller object is now available wherever it is needed. On the whole, though, not too bad ...
No comments:
Post a Comment