blockchain: Reconstruct headers from block nodes.

This modifies the block node structure to include a couple of extra
fields needed to be able to reconstruct the block header from a node,
and exposes a new function from chain to fetch the block headers which
takes advantage of the new functionality to reconstruct the headers from
memory when possible.  Finally, it updates both the p2p and RPC servers
to make use of the new function.

This is useful since many of the block header fields need to be kept in
order to form the block index anyways and storing the extra fields means
the database does not have to be consulted when headers are requested if
the associated node is still in memory.

The following timings show representative performance gains as measured
from one system:

new: Time to fetch 100000 headers:   59ms
old: Time to fetch 100000 headers: 4783ms
This commit is contained in:
Dave Collins 2017-01-31 00:36:15 -06:00
parent d9241e91a9
commit 59169540c3
No known key found for this signature in database
GPG key ID: B8904D9D9C93D1F2
3 changed files with 80 additions and 83 deletions

View file

@ -59,10 +59,15 @@ type blockNode struct {
// ancestor when switching chains.
inMainChain bool
// Some fields from block headers to aid in best chain selection.
version int32
bits uint32
timestamp int64
// Some fields from block headers to aid in best chain selection and
// reconstructing headers from memory. These must be treated as
// immutable and are intentionally ordered to avoid padding on 64-bit
// platforms.
version int32
bits uint32
nonce uint32
timestamp int64
merkleRoot chainhash.Hash
}
// newBlockNode returns a new block node for the given block header. It is
@ -81,11 +86,28 @@ func newBlockNode(blockHeader *wire.BlockHeader, blockHash *chainhash.Hash, heig
height: height,
version: blockHeader.Version,
bits: blockHeader.Bits,
nonce: blockHeader.Nonce,
timestamp: blockHeader.Timestamp.Unix(),
merkleRoot: blockHeader.MerkleRoot,
}
return &node
}
// Header constructs a block header from the node and returns it.
//
// This function is safe for concurrent access.
func (node *blockNode) Header() wire.BlockHeader {
// No lock is needed because all accessed fields are immutable.
return wire.BlockHeader{
Version: node.version,
PrevBlock: *node.parentHash,
MerkleRoot: node.merkleRoot,
Timestamp: time.Unix(node.timestamp, 0),
Bits: node.bits,
Nonce: node.nonce,
}
}
// orphanBlock represents a block that we don't yet have the parent for. It
// is a normal block plus an expiration time to prevent caching the orphan
// forever.
@ -1561,6 +1583,30 @@ func (b *BlockChain) BestSnapshot() *BestState {
return snapshot
}
// FetchHeader returns the block header identified by the given hash or an error
// if it doesn't exist.
func (b *BlockChain) FetchHeader(hash *chainhash.Hash) (wire.BlockHeader, error) {
// Reconstruct the header from the block index if possible.
b.chainLock.RLock()
node, ok := b.index[*hash]
b.chainLock.RUnlock()
if ok {
return node.Header(), nil
}
// Fall back to loading it from the database.
var header *wire.BlockHeader
err := b.db.View(func(dbTx database.Tx) error {
var err error
header, err = dbFetchHeaderByHash(dbTx, hash)
return err
})
if err != nil {
return wire.BlockHeader{}, err
}
return *header, nil
}
// IndexManager provides a generic interface that the is called when blocks are
// connected and disconnected to and from the tip of the main chain for the
// purpose of supporting optional indexes.

View file

@ -1243,17 +1243,12 @@ func handleGetBlockHash(s *rpcServer, cmd interface{}, closeChan <-chan struct{}
func handleGetBlockHeader(s *rpcServer, cmd interface{}, closeChan <-chan struct{}) (interface{}, error) {
c := cmd.(*btcjson.GetBlockHeaderCmd)
// Load the raw header bytes from the database.
// Fetch the header from chain.
hash, err := chainhash.NewHashFromStr(c.Hash)
if err != nil {
return nil, rpcDecodeHexError(c.Hash)
}
var headerBytes []byte
err = s.server.db.View(func(dbTx database.Tx) error {
var err error
headerBytes, err = dbTx.FetchBlockHeader(hash)
return err
})
blockHeader, err := s.chain.FetchHeader(hash)
if err != nil {
return nil, &btcjson.RPCError{
Code: btcjson.ErrRPCBlockNotFound,
@ -1264,19 +1259,17 @@ func handleGetBlockHeader(s *rpcServer, cmd interface{}, closeChan <-chan struct
// When the verbose flag isn't set, simply return the serialized block
// header as a hex-encoded string.
if c.Verbose != nil && !*c.Verbose {
return hex.EncodeToString(headerBytes), nil
var headerBuf bytes.Buffer
err := blockHeader.Serialize(&headerBuf)
if err != nil {
context := "Failed to serialize block header"
return nil, internalRPCError(err.Error(), context)
}
return hex.EncodeToString(headerBuf.Bytes()), nil
}
// The verbose flag is set, so generate the JSON object and return it.
// Deserialize the header.
var blockHeader wire.BlockHeader
err = blockHeader.Deserialize(bytes.NewReader(headerBytes))
if err != nil {
context := "Failed to deserialize block header"
return nil, internalRPCError(err.Error(), context)
}
// Get the block height from chain.
blockHeight, err := s.chain.BlockHeightByHash(hash)
if err != nil {
@ -2150,7 +2143,7 @@ func handleGetHeaders(s *rpcServer, cmd interface{}, closeChan <-chan struct{})
"headers: " + err.Error(),
}
}
blockHeaders, err := fetchHeaders(s.server.db, blockHashes)
blockHeaders, err := fetchHeaders(s.chain, blockHashes)
if err != nil {
return nil, &btcjson.RPCError{
Code: btcjson.ErrRPCDatabase,
@ -2315,26 +2308,13 @@ func handleGetNetworkHashPS(s *rpcServer, cmd interface{}, closeChan <-chan stru
return nil, internalRPCError(err.Error(), context)
}
// Load the raw header bytes.
var headerBytes []byte
err = s.server.db.View(func(dbTx database.Tx) error {
var err error
headerBytes, err = dbTx.FetchBlockHeader(hash)
return err
})
// Fetch the header from chain.
header, err := s.chain.FetchHeader(hash)
if err != nil {
context := "Failed to fetch block header"
return nil, internalRPCError(err.Error(), context)
}
// Deserialize the header.
var header wire.BlockHeader
err = header.Deserialize(bytes.NewReader(headerBytes))
if err != nil {
context := "Failed to deserialize block header"
return nil, internalRPCError(err.Error(), context)
}
if curHeight == startHeight {
minTimestamp = header.Timestamp
maxTimestamp = minTimestamp
@ -2521,26 +2501,13 @@ func handleGetRawTransaction(s *rpcServer, cmd interface{}, closeChan <-chan str
var blkHashStr string
var chainHeight int32
if blkHash != nil {
// Load the raw header bytes.
var headerBytes []byte
err := s.server.db.View(func(dbTx database.Tx) error {
var err error
headerBytes, err = dbTx.FetchBlockHeader(blkHash)
return err
})
// Fetch the header from chain.
header, err := s.chain.FetchHeader(blkHash)
if err != nil {
context := "Failed to fetch block header"
return nil, internalRPCError(err.Error(), context)
}
// Deserialize the header.
var header wire.BlockHeader
err = header.Deserialize(bytes.NewReader(headerBytes))
if err != nil {
context := "Failed to deserialize block header"
return nil, internalRPCError(err.Error(), context)
}
blkHeader = &header
blkHashStr = blkHash.String()
chainHeight = s.chain.BestSnapshot().Height
@ -3161,13 +3128,8 @@ func handleSearchRawTransactions(s *rpcServer, cmd interface{}, closeChan <-chan
var blkHashStr string
var blkHeight int32
if blkHash := rtx.blkHash; blkHash != nil {
// Load the raw header bytes from the database.
var headerBytes []byte
err := s.server.db.View(func(dbTx database.Tx) error {
var err error
headerBytes, err = dbTx.FetchBlockHeader(blkHash)
return err
})
// Fetch the header from chain.
header, err := s.chain.FetchHeader(blkHash)
if err != nil {
return nil, &btcjson.RPCError{
Code: btcjson.ErrRPCBlockNotFound,
@ -3175,14 +3137,6 @@ func handleSearchRawTransactions(s *rpcServer, cmd interface{}, closeChan <-chan
}
}
// Deserialize the block header.
var header wire.BlockHeader
err = header.Deserialize(bytes.NewReader(headerBytes))
if err != nil {
context := "Failed to deserialize block header"
return nil, internalRPCError(err.Error(), context)
}
// Get the block height from chain.
height, err := s.chain.BlockHeightByHash(blkHash)
if err != nil {

View file

@ -726,24 +726,17 @@ func (s *server) locateBlocks(locators []*chainhash.Hash, hashStop *chainhash.Ha
// fetchHeaders fetches and decodes headers from the db for each hash in
// blockHashes.
func fetchHeaders(db database.DB, blockHashes []chainhash.Hash) ([]*wire.BlockHeader, error) {
headers := make([]*wire.BlockHeader, 0, len(blockHashes))
err := db.View(func(dbTx database.Tx) error {
rawHeaders, err := dbTx.FetchBlockHeaders(blockHashes)
func fetchHeaders(chain *blockchain.BlockChain, blockHashes []chainhash.Hash) ([]wire.BlockHeader, error) {
headers := make([]wire.BlockHeader, 0, len(blockHashes))
for i := range blockHashes {
header, err := chain.FetchHeader(&blockHashes[i])
if err != nil {
return err
return nil, err
}
for _, headerBytes := range rawHeaders {
h := new(wire.BlockHeader)
err = h.Deserialize(bytes.NewReader(headerBytes))
if err != nil {
return err
}
headers = append(headers, h)
}
return nil
})
return headers, err
headers = append(headers, header)
}
return headers, nil
}
// OnGetHeaders is invoked when a peer receives a getheaders bitcoin
@ -760,12 +753,16 @@ func (sp *serverPeer) OnGetHeaders(_ *peer.Peer, msg *wire.MsgGetHeaders) {
peerLog.Errorf("OnGetHeaders: failed to fetch hashes: %v", err)
return
}
blockHeaders, err := fetchHeaders(sp.server.db, blockHashes)
headers, err := fetchHeaders(sp.server.blockManager.chain, blockHashes)
if err != nil {
peerLog.Errorf("OnGetHeaders: failed to fetch block headers: "+
"%v", err)
return
}
blockHeaders := make([]*wire.BlockHeader, len(headers))
for i := range headers {
blockHeaders[i] = &headers[i]
}
if len(blockHeaders) > wire.MaxBlockHeadersPerMsg {
peerLog.Warnf("OnGetHeaders: fetched more block headers than " +