blockchain: Persist block status changes to disk.

The block index now tracks the set of dirty block nodes with status
changes that haven't been persisted and flushes the changes to the DB
at the appropriate times.
This commit is contained in:
Jim Posen 2017-09-14 14:41:21 -07:00 committed by Dave Collins
parent 31444f5890
commit 52cddc19cd
4 changed files with 134 additions and 46 deletions

View file

@ -60,12 +60,18 @@ func (b *BlockChain) maybeAcceptBlock(block *btcutil.Block, flags BehaviorFlags)
return false, err
}
// Create a new block node for the block and add it to the in-memory
// block chain (could be either a side chain or the main chain).
// Create a new block node for the block and add it to the node index. Even
// if the block ultimately gets connected to the main chain, it starts out
// on a side chain.
blockHeader := &block.MsgBlock().Header
newNode := newBlockNode(blockHeader, prevNode)
newNode.status = statusDataStored
b.index.AddNode(newNode)
err = b.index.flushToDB()
if err != nil {
return false, err
}
// Connect the passed block to the chain while respecting proper chain
// selection according to the chain with the most proof of work. This

View file

@ -231,6 +231,7 @@ type blockIndex struct {
sync.RWMutex
index map[chainhash.Hash]*blockNode
dirty map[*blockNode]struct{}
}
// newBlockIndex returns a new empty instance of a block index. The index will
@ -241,6 +242,7 @@ func newBlockIndex(db database.DB, chainParams *chaincfg.Params) *blockIndex {
db: db,
chainParams: chainParams,
index: make(map[chainhash.Hash]*blockNode),
dirty: make(map[*blockNode]struct{}),
}
}
@ -265,16 +267,25 @@ func (bi *blockIndex) LookupNode(hash *chainhash.Hash) *blockNode {
return node
}
// AddNode adds the provided node to the block index. Duplicate entries are not
// checked so it is up to caller to avoid adding them.
// AddNode adds the provided node to the block index and marks it as dirty.
// Duplicate entries are not checked so it is up to caller to avoid adding them.
//
// This function is safe for concurrent access.
func (bi *blockIndex) AddNode(node *blockNode) {
bi.Lock()
bi.index[node.hash] = node
bi.addNode(node)
bi.dirty[node] = struct{}{}
bi.Unlock()
}
// addNode adds the provided node to the block index, but does not mark it as
// dirty. This can be used while initializing the block index.
//
// This function is NOT safe for concurrent access.
func (bi *blockIndex) addNode(node *blockNode) {
bi.index[node.hash] = node
}
// NodeStatus provides concurrent-safe access to the status field of a node.
//
// This function is safe for concurrent access.
@ -293,6 +304,7 @@ func (bi *blockIndex) NodeStatus(node *blockNode) blockStatus {
func (bi *blockIndex) SetStatusFlags(node *blockNode, flags blockStatus) {
bi.Lock()
node.status |= flags
bi.dirty[node] = struct{}{}
bi.Unlock()
}
@ -303,5 +315,34 @@ func (bi *blockIndex) SetStatusFlags(node *blockNode, flags blockStatus) {
func (bi *blockIndex) UnsetStatusFlags(node *blockNode, flags blockStatus) {
bi.Lock()
node.status &^= flags
bi.dirty[node] = struct{}{}
bi.Unlock()
}
// flushToDB writes all dirty block nodes to the database. If all writes
// succeed, this clears the dirty set.
func (bi *blockIndex) flushToDB() error {
bi.Lock()
if len(bi.dirty) == 0 {
bi.Unlock()
return nil
}
err := bi.db.Update(func(dbTx database.Tx) error {
for node := range bi.dirty {
err := dbStoreBlockNode(dbTx, node)
if err != nil {
return err
}
}
return nil
})
// If write was successful, clear the dirty set.
if err == nil {
bi.dirty = make(map[*blockNode]struct{})
}
bi.Unlock()
return err
}

View file

@ -495,6 +495,8 @@ func LockTimeToSequence(isSeconds bool, locktime uint32) uint32 {
// passed node is the new end of the main chain. The lists will be empty if the
// passed node is not on a side chain.
//
// This function may modify node statuses in the block index without flushing.
//
// This function MUST be called with the chain state lock held (for reads).
func (b *BlockChain) getReorganizeNodes(node *blockNode) (*list.List, *list.List) {
attachNodes := list.New()
@ -585,6 +587,12 @@ func (b *BlockChain) connectBlock(node *blockNode, block *btcutil.Block, view *U
}
}
// Write any block status changes to DB before updating best state.
err := b.index.flushToDB()
if err != nil {
return err
}
// Generate a new best state snapshot that will be used to update the
// database and later memory if all database updates are successful.
b.stateLock.RLock()
@ -597,7 +605,7 @@ func (b *BlockChain) connectBlock(node *blockNode, block *btcutil.Block, view *U
curTotalTxns+numTxns, node.CalcPastMedianTime())
// Atomically insert info into the database.
err := b.db.Update(func(dbTx database.Tx) error {
err = b.db.Update(func(dbTx database.Tx) error {
// Update best block state.
err := dbPutBestState(dbTx, state, node.workSum)
if err != nil {
@ -691,6 +699,12 @@ func (b *BlockChain) disconnectBlock(node *blockNode, block *btcutil.Block, view
return err
}
// Write any block status changes to DB before updating best state.
err = b.index.flushToDB()
if err != nil {
return err
}
// Generate a new best state snapshot that will be used to update the
// database and later memory if all database updates are successful.
b.stateLock.RLock()
@ -792,6 +806,8 @@ func countSpentOutputs(block *btcutil.Block) int {
// the chain) and nodes the are being attached must be in forwards order
// (think pushing them onto the end of the chain).
//
// This function may modify node statuses in the block index without flushing.
//
// This function MUST be called with the chain state lock held (for writes).
func (b *BlockChain) reorganizeChain(detachNodes, attachNodes *list.List) error {
// All of the blocks to detach and related spend journal entries needed
@ -1030,13 +1046,26 @@ func (b *BlockChain) connectBestChain(node *blockNode, block *btcutil.Block, fla
stxos := make([]spentTxOut, 0, countSpentOutputs(block))
if !fastAdd {
err := b.checkConnectBlock(node, block, view, &stxos)
if err != nil {
if _, ok := err.(RuleError); ok {
b.index.SetStatusFlags(node, statusValidateFailed)
}
if err == nil {
b.index.SetStatusFlags(node, statusValid)
} else if _, ok := err.(RuleError); ok {
b.index.SetStatusFlags(node, statusValidateFailed)
} else {
return false, err
}
// Intentionally ignore errors writing updated node status to DB. If
// it fails to write, it's not the end of the world. If the block is
// valid, we flush in connectBlock and if the block is invalid, the
// worst that can happen is we revalidate the block after a restart.
if writeErr := b.index.flushToDB(); writeErr != nil {
log.Warnf("Error flushing block index changes to disk: %v",
writeErr)
}
if err != nil {
return false, err
}
b.index.SetStatusFlags(node, statusValid)
}
// In the fast add case the code to check the block connection
@ -1097,11 +1126,16 @@ func (b *BlockChain) connectBestChain(node *blockNode, block *btcutil.Block, fla
// Reorganize the chain.
log.Infof("REORGANIZE: Block %v is causing a reorganize.", node.hash)
err := b.reorganizeChain(detachNodes, attachNodes)
if err != nil {
return false, err
// Either getReorganizeNodes or reorganizeChain could have made unsaved
// changes to the block index, so flush regardless of whether there was an
// error. The index would only be dirty if the block failed to connect, so
// we can ignore any errors writing.
if writeErr := b.index.flushToDB(); writeErr != nil {
log.Warnf("Error flushing block index changes to disk: %v", writeErr)
}
return true, nil
return err == nil, err
}
// isCurrent returns whether or not the chain believes it is current. Several

View file

@ -1076,7 +1076,7 @@ func (b *BlockChain) createChainState() error {
b.bestChain.SetTip(node)
// Add the new node to the index which is used for faster lookups.
b.index.AddNode(node)
b.index.addNode(node)
// Initialize the state related to the best block. Since it is the
// genesis block, use its timestamp for the median time.
@ -1150,8 +1150,7 @@ func (b *BlockChain) createChainState() error {
func (b *BlockChain) initChainState() error {
// Determine the state of the chain database. We may need to initialize
// everything from scratch or upgrade certain buckets.
var initialized bool
var hasBlockIndex bool
var initialized, hasBlockIndex bool
err := b.db.View(func(dbTx database.Tx) error {
initialized = dbTx.Metadata().Get(chainStateKeyName) != nil
hasBlockIndex = dbTx.Metadata().Bucket(blockIndexBucketName) != nil
@ -1209,9 +1208,7 @@ func (b *BlockChain) initChainState() error {
var lastNode *blockNode
cursor = blockIndexBucket.Cursor()
for ok := cursor.First(); ok; ok = cursor.Next() {
var header wire.BlockHeader
headerBytes := cursor.Value()
err := header.Deserialize(bytes.NewReader(headerBytes))
header, status, err := deserializeBlockRow(cursor.Value())
if err != nil {
return err
}
@ -1243,9 +1240,9 @@ func (b *BlockChain) initChainState() error {
// Initialize the block node for the block, connect it,
// and add it to the block index.
node := &blockNodes[i]
initBlockNode(node, &header, parent)
node.status = statusDataStored | statusValid
b.index.AddNode(node)
initBlockNode(node, header, parent)
node.status = status
b.index.addNode(node)
lastNode = node
i++
@ -1281,6 +1278,25 @@ func (b *BlockChain) initChainState() error {
})
}
// deserializeBlockRow parses a value in the block index bucket into a block
// header and block status bitfield.
func deserializeBlockRow(blockRow []byte) (*wire.BlockHeader, blockStatus, error) {
buffer := bytes.NewReader(blockRow)
var header wire.BlockHeader
err := header.Deserialize(buffer)
if err != nil {
return nil, statusNone, err
}
statusByte, err := buffer.ReadByte()
if err != nil {
return nil, statusNone, err
}
return &header, blockStatus(statusByte), nil
}
// dbFetchHeaderByHash uses an existing database transaction to retrieve the
// block header for the provided hash.
func dbFetchHeaderByHash(dbTx database.Tx, hash *chainhash.Hash) (*wire.BlockHeader, error) {
@ -1329,40 +1345,31 @@ func dbFetchBlockByNode(dbTx database.Tx, node *blockNode) (*btcutil.Block, erro
return block, nil
}
// dbStoreBlock stores the provided block in the database. The block header is
// written to the block index bucket and full block data is written to ffldb.
func dbStoreBlockHeader(dbTx database.Tx, blockHeader *wire.BlockHeader, height uint32) error {
// Serialize block data to be stored. This is just the serialized header.
w := bytes.NewBuffer(make([]byte, 0, blockHdrSize))
err := blockHeader.Serialize(w)
// dbStoreBlockNode stores the block header and validation status to the block
// index bucket. This overwrites the current entry if there exists one.
func dbStoreBlockNode(dbTx database.Tx, node *blockNode) error {
// Serialize block data to be stored.
w := bytes.NewBuffer(make([]byte, 0, blockHdrSize+1))
header := node.Header()
err := header.Serialize(w)
if err != nil {
return err
}
err = w.WriteByte(byte(node.status))
if err != nil {
return err
}
value := w.Bytes()
// Write block header data to block index bucket.
blockHash := blockHeader.BlockHash()
blockIndexBucket := dbTx.Metadata().Bucket(blockIndexBucketName)
key := blockIndexKey(&blockHash, height)
key := blockIndexKey(&node.hash, uint32(node.height))
return blockIndexBucket.Put(key, value)
}
// dbStoreBlock stores the provided block in the database. The block header is
// written to the block index bucket and full block data is written to ffldb.
// dbStoreBlock stores the provided block in the database if it is not already
// there. The full block data is written to ffldb.
func dbStoreBlock(dbTx database.Tx, block *btcutil.Block) error {
if block.Height() == btcutil.BlockHeightUnknown {
return fmt.Errorf("cannot store block %s with unknown height",
block.Hash())
}
// First store block header in the block index bucket.
err := dbStoreBlockHeader(dbTx, &block.MsgBlock().Header,
uint32(block.Height()))
if err != nil {
return err
}
// Then store block data in ffldb if we haven't already.
hasBlock, err := dbTx.HasBlock(block.Hash())
if err != nil {
return err