lbcd/blockchain/upgrade.go
Dave Collins a59ac5b18f
multi: Rework utxoset/view to use outpoints.
This modifies the utxoset in the database and related UtxoViewpoint to
store and work with unspent transaction outputs on a per-output basis
instead of at a transaction level.  This was inspired by similar recent
changes in Bitcoin Core.

The primary motivation is to simplify the code, pave the way for a
utxo cache, and generally focus on optimizing runtime performance.

The tradeoff is that this approach does somewhat increase the size of
the serialized utxoset since it means that the transaction hash is
duplicated for each output as a part of the key and some additional
details such as whether the containing transaction is a coinbase and the
block height it was a part of are duplicated in each output.

However, in practice, the size difference isn't all that large, disk
space is relatively cheap, certainly cheaper than memory, and it is much
more important to provide more efficient runtime operation since that is
the ultimate purpose of the daemon.
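
To illustrate the size tradeoff described above, here is a minimal sketch
of per-output keys. The outPoint type and simpleOutpointKey helper are
hypothetical stand-ins, and the fixed-width index encoding is an
assumption made for brevity rather than the actual on-disk key format.
It only shows that every output of a transaction repeats the full
transaction hash in its key.

```go
package main

import (
	"encoding/binary"
	"fmt"
)

// outPoint is a simplified, hypothetical stand-in for wire.OutPoint.
type outPoint struct {
	Hash  [32]byte
	Index uint32
}

// simpleOutpointKey builds a per-output key laid out as <hash><index>.
//
// NOTE: The 4-byte big-endian index is an assumption for illustration only;
// it is not claimed to match the key encoding used by the real code.
func simpleOutpointKey(op outPoint) []byte {
	key := make([]byte, len(op.Hash)+4)
	copy(key, op.Hash[:])
	binary.BigEndian.PutUint32(key[len(op.Hash):], op.Index)
	return key
}

func main() {
	var hash [32]byte
	hash[0] = 0xab

	// Three outputs of the same transaction produce three keys that all
	// begin with the same 32-byte transaction hash.
	for idx := uint32(0); idx < 3; idx++ {
		key := simpleOutpointKey(outPoint{Hash: hash, Index: idx})
		fmt.Printf("output %d: %d-byte key, hash prefix %x\n",
			idx, len(key), key[:4])
	}
}
```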

While performing this conversion, it also simplifies the code to remove
the transaction version information from the utxoset as well as the
spend journal.  The logic for only serializing it under certain
circumstances is complicated, and the version isn't actually used
anywhere aside from the gettxout RPC, where nothing important depends on
it either.  Consequently, this also removes the version field of the
gettxout RPC result.

The utxos in the database are automatically migrated to the new format
with this commit and it is possible to interrupt and resume the
migration process.
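
The interrupt-and-resume behavior can be sketched roughly as follows.
This is a simplified model, not the migration code itself: plain maps
stand in for the old and new database buckets, and migrateBatch and
migrateAll are hypothetical names. Each batch moves entries to the new
set and deletes them from the old one, so whatever a finished batch
moved stays migrated and a later run simply continues with what is left.

```go
package main

import "fmt"

// interruptRequested reports whether the interrupt channel has been closed.
func interruptRequested(interrupt <-chan struct{}) bool {
	select {
	case <-interrupt:
		return true
	default:
	}
	return false
}

// migrateBatch moves up to maxItems entries from oldSet to newSet, deleting
// each from oldSet as it goes, and returns the number of entries moved. The
// maps are hypothetical stand-ins for the old and new buckets.
func migrateBatch(oldSet, newSet map[string]string, maxItems int) int {
	moved := 0
	for k, v := range oldSet {
		if moved >= maxItems {
			break
		}
		newSet[k] = v
		delete(oldSet, k)
		moved++
	}
	return moved
}

// migrateAll runs batches until nothing is left or an interrupt is seen. The
// interrupt is checked after each batch so finished work is never discarded.
// It reports whether the migration ran to completion.
func migrateAll(oldSet, newSet map[string]string, maxItems int,
	interrupt <-chan struct{}) bool {

	for {
		moved := migrateBatch(oldSet, newSet, maxItems)
		if interruptRequested(interrupt) {
			return false
		}
		if moved == 0 {
			return true
		}
	}
}

func main() {
	oldSet := map[string]string{"a": "1", "b": "2", "c": "3", "d": "4", "e": "5"}
	newSet := make(map[string]string)

	// First run: the interrupt is already signalled, so one batch is
	// migrated and then the run stops early.
	stop := make(chan struct{})
	close(stop)
	done := migrateAll(oldSet, newSet, 2, stop)
	fmt.Println("completed:", done, "remaining:", len(oldSet))

	// Second run: resume with no interrupt and finish the remainder.
	done = migrateAll(oldSet, newSet, 2, make(chan struct{}))
	fmt.Println("completed:", done, "remaining:", len(oldSet))
}
```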

Finally, it also updates the tests for the new format and adds a new
function to the tests to convert the old test data to the new format for
convenience.  The data has already been converted and updated in the
commit.

An overview of the changes is as follows:

- Remove transaction version from both spent and unspent output entries
  - Update utxo serialization format to exclude the version
  - Modify the spend journal serialization format
    - The old version field is now reserved; it is always written as
      zero and ignored when reading
    - This allows old entries to be used by new code without having to
      migrate the entire spend journal
  - Remove version field from gettxout RPC result
- Convert UtxoEntry to represent a specific utxo instead of a
  transaction with all remaining utxos
  - Optimize for memory usage with an eye towards a utxo cache
    - Combine details such as whether the txout was contained in a
      coinbase, is spent, and is modified into a single packed field of
      bit flags
    - Align entry fields to eliminate extra padding since ultimately
      there will be a lot of these in memory
    - Introduce a free list for serializing an outpoint to the database
      key format to significantly reduce pressure on the GC
  - Update all related functions that previously dealt with transaction
    hashes to accept outpoints instead
  - Update all callers accordingly
  - Only add individually requested outputs from the mempool when
    constructing a mempool view
- Modify the spend journal to always store the block height and coinbase
  information with every spent txout
  - Introduce code to handle fetching the missing information from
    another utxo from the same transaction in the event an old style
    entry is encountered
    - Make use of a database cursor with seek to do this much more
      efficiently than testing every possible output
- Always decompress data loaded from the database now that a utxo entry
  only consists of a specific output
- Introduce upgrade code to migrate the utxo set to the new format
  - Store versions of the utxoset and spend journal buckets
  - Allow migration process to be interrupted and resumed
- Update all tests to expect the correct encodings, remove tests that no
  longer apply, and add new ones for the new expected behavior
  - Convert old tests for the legacy utxo format deserialization code to
    test the new function that is used during upgrade
  - Update the utxostore test data and add the function that was used to
    convert it
- Introduce a few new functions on UtxoViewpoint
  - AddTxOut for adding an individual txout versus all of them
  - addTxOut to handle the common code between the new AddTxOut and
    existing AddTxOuts
  - RemoveEntry for removing an individual txout
  - fetchEntryByHash for fetching any remaining utxo for a given
    transaction hash
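
As a rough illustration of the packed bit flags mentioned in the
overview, the sketch below stores the coinbase, spent, and modified
states in a single byte. The utxoEntry type here is a simplified,
hypothetical stand-in for UtxoEntry. The tfCoinBase name appears in the
upgrade code; tfSpent and tfModified are assumed names for the other
two states.

```go
package main

import "fmt"

// txoFlags packs several boolean properties of an output into one byte.
type txoFlags uint8

const (
	// tfCoinBase marks an output that was created by a coinbase.
	tfCoinBase txoFlags = 1 << iota

	// tfSpent and tfModified are assumed names for the spent and
	// modified states listed in the overview.
	tfSpent
	tfModified
)

// utxoEntry is a simplified, hypothetical stand-in for UtxoEntry. The small
// fields are grouped at the end so they can share the same machine word.
type utxoEntry struct {
	amount      int64
	pkScript    []byte
	blockHeight int32
	packedFlags txoFlags
}

// isCoinBase reports whether the output was contained in a coinbase.
func (e *utxoEntry) isCoinBase() bool { return e.packedFlags&tfCoinBase != 0 }

// isSpent reports whether the output has been marked spent.
func (e *utxoEntry) isSpent() bool { return e.packedFlags&tfSpent != 0 }

// isModified reports whether the entry changed since it was loaded.
func (e *utxoEntry) isModified() bool { return e.packedFlags&tfModified != 0 }

// spend marks the entry as spent and modified. It does nothing if the entry
// is already spent.
func (e *utxoEntry) spend() {
	if e.isSpent() {
		return
	}
	e.packedFlags |= tfSpent | tfModified
}

func main() {
	entry := &utxoEntry{amount: 5000000000, blockHeight: 1, packedFlags: tfCoinBase}
	fmt.Println(entry.isCoinBase(), entry.isSpent(), entry.isModified())

	entry.spend()
	fmt.Println(entry.isCoinBase(), entry.isSpent(), entry.isModified())
}
```

Keeping the three states in one field means adding a state costs no
extra memory per entry, which matters when many entries are held in
memory at once.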
2018-05-27 03:07:41 -05:00

604 lines
20 KiB
Go

// Copyright (c) 2017 The btcsuite developers
// Use of this source code is governed by an ISC
// license that can be found in the LICENSE file.

package blockchain

import (
	"bytes"
	"container/list"
	"errors"
	"fmt"
	"time"

	"github.com/btcsuite/btcd/chaincfg/chainhash"
	"github.com/btcsuite/btcd/database"
	"github.com/btcsuite/btcd/wire"
)

const (
	// blockHdrOffset defines the offsets into a v1 block index row for the
	// block header.
	//
	// The serialized block index row format is:
	//   <blocklocation><blockheader>
	blockHdrOffset = 12
)

// errInterruptRequested indicates that an operation was cancelled due
// to a user-requested interrupt.
var errInterruptRequested = errors.New("interrupt requested")

// interruptRequested returns true when the provided channel has been closed.
// This simplifies early shutdown slightly since the caller can just use an if
// statement instead of a select.
func interruptRequested(interrupted <-chan struct{}) bool {
	select {
	case <-interrupted:
		return true
	default:
	}

	return false
}

// blockChainContext represents a particular block's placement in the block
// chain. This is used by the block index migration to track block metadata that
// will be written to disk.
type blockChainContext struct {
	parent    *chainhash.Hash
	children  []*chainhash.Hash
	height    int32
	mainChain bool
}

// migrateBlockIndex migrates all block entries from the v1 block index bucket
// to the v2 bucket. The v1 bucket stores all block entries keyed by block hash,
// whereas the v2 bucket stores the exact same values, but keyed instead by
// block height + hash.
func migrateBlockIndex(db database.DB) error {
	// Hardcoded bucket names so updates to the global values do not affect
	// old upgrades.
	v1BucketName := []byte("ffldb-blockidx")
	v2BucketName := []byte("blockheaderidx")

	err := db.Update(func(dbTx database.Tx) error {
		v1BlockIdxBucket := dbTx.Metadata().Bucket(v1BucketName)
		if v1BlockIdxBucket == nil {
			return fmt.Errorf("Bucket %s does not exist", v1BucketName)
		}

		log.Info("Re-indexing block information in the database. This might take a while...")

		v2BlockIdxBucket, err :=
			dbTx.Metadata().CreateBucketIfNotExists(v2BucketName)
		if err != nil {
			return err
		}

		// Get tip of the main chain.
		serializedData := dbTx.Metadata().Get(chainStateKeyName)
		state, err := deserializeBestChainState(serializedData)
		if err != nil {
			return err
		}
		tip := &state.hash

		// Scan the old block index bucket and construct a mapping of each block
		// to parent block and all child blocks.
		blocksMap, err := readBlockTree(v1BlockIdxBucket)
		if err != nil {
			return err
		}

		// Use the block graph to calculate the height of each block.
		err = determineBlockHeights(blocksMap)
		if err != nil {
			return err
		}

		// Find blocks on the main chain with the block graph and current tip.
		determineMainChainBlocks(blocksMap, tip)

		// Now that we have heights for all blocks, scan the old block index
		// bucket and insert all rows into the new one.
		return v1BlockIdxBucket.ForEach(func(hashBytes, blockRow []byte) error {
			endOffset := blockHdrOffset + blockHdrSize
			headerBytes := blockRow[blockHdrOffset:endOffset:endOffset]

			var hash chainhash.Hash
			copy(hash[:], hashBytes[0:chainhash.HashSize])
			chainContext := blocksMap[hash]

			if chainContext.height == -1 {
				return fmt.Errorf("Unable to calculate chain height for "+
					"stored block %s", hash)
			}

			// Mark blocks as valid if they are part of the main chain.
			status := statusDataStored
			if chainContext.mainChain {
				status |= statusValid
			}

			// Write header to v2 bucket
			value := make([]byte, blockHdrSize+1)
			copy(value[0:blockHdrSize], headerBytes)
			value[blockHdrSize] = byte(status)

			key := blockIndexKey(&hash, uint32(chainContext.height))
			err := v2BlockIdxBucket.Put(key, value)
			if err != nil {
				return err
			}

			// Delete header from v1 bucket
			truncatedRow := blockRow[0:blockHdrOffset:blockHdrOffset]
			return v1BlockIdxBucket.Put(hashBytes, truncatedRow)
		})
	})
	if err != nil {
		return err
	}

	log.Infof("Block database migration complete")
	return nil
}

// readBlockTree reads the old block index bucket and constructs a mapping of
// each block to its parent block and all child blocks. This mapping represents
// the full tree of blocks. This function does not populate the height or
// mainChain fields of the returned blockChainContext values.
func readBlockTree(v1BlockIdxBucket database.Bucket) (map[chainhash.Hash]*blockChainContext, error) {
	blocksMap := make(map[chainhash.Hash]*blockChainContext)
	err := v1BlockIdxBucket.ForEach(func(_, blockRow []byte) error {
		var header wire.BlockHeader
		endOffset := blockHdrOffset + blockHdrSize
		headerBytes := blockRow[blockHdrOffset:endOffset:endOffset]
		err := header.Deserialize(bytes.NewReader(headerBytes))
		if err != nil {
			return err
		}

		blockHash := header.BlockHash()
		prevHash := header.PrevBlock

		if blocksMap[blockHash] == nil {
			blocksMap[blockHash] = &blockChainContext{height: -1}
		}
		if blocksMap[prevHash] == nil {
			blocksMap[prevHash] = &blockChainContext{height: -1}
		}

		blocksMap[blockHash].parent = &prevHash
		blocksMap[prevHash].children =
			append(blocksMap[prevHash].children, &blockHash)
		return nil
	})
	return blocksMap, err
}

// determineBlockHeights takes a map of block hashes to block contexts, each of
// which holds the hashes of its child blocks, and uses it to compute the height
// for each block. The function assigns a height of 0 to the genesis hash and
// explores the tree of blocks breadth-first, assigning a height to every block
// with a path back to the genesis block. This function modifies the height
// field on the blocksMap entries.
func determineBlockHeights(blocksMap map[chainhash.Hash]*blockChainContext) error {
	queue := list.New()

	// The genesis block is included in blocksMap as a child of the zero hash
	// because that is the value of the PrevBlock field in the genesis header.
	preGenesisContext, exists := blocksMap[zeroHash]
	if !exists || len(preGenesisContext.children) == 0 {
		return fmt.Errorf("Unable to find genesis block")
	}

	for _, genesisHash := range preGenesisContext.children {
		blocksMap[*genesisHash].height = 0
		queue.PushBack(genesisHash)
	}

	for e := queue.Front(); e != nil; e = queue.Front() {
		queue.Remove(e)
		hash := e.Value.(*chainhash.Hash)
		height := blocksMap[*hash].height

		// For each block with this one as a parent, assign it a height and
		// push to queue for future processing.
		for _, childHash := range blocksMap[*hash].children {
			blocksMap[*childHash].height = height + 1
			queue.PushBack(childHash)
		}
	}

	return nil
}

// determineMainChainBlocks traverses the block graph down from the tip to
// determine which block hashes are part of the main chain. This function
// modifies the mainChain field on the blocksMap entries.
func determineMainChainBlocks(blocksMap map[chainhash.Hash]*blockChainContext, tip *chainhash.Hash) {
	for nextHash := tip; *nextHash != zeroHash; nextHash = blocksMap[*nextHash].parent {
		blocksMap[*nextHash].mainChain = true
	}
}

// deserializeUtxoEntryV0 decodes a utxo entry from the passed serialized byte
// slice according to the legacy version 0 format into a map of utxos keyed by
// the output index within the transaction. The map is necessary because the
// previous format encoded all unspent outputs for a transaction using a single
// entry, whereas the new format encodes each unspent output individually.
//
// The legacy format is as follows:
//
//   <version><height><header code><unspentness bitmap>[<compressed txouts>,...]
//
//   Field                Type     Size
//   version              VLQ      variable
//   block height         VLQ      variable
//   header code          VLQ      variable
//   unspentness bitmap   []byte   variable
//   compressed txouts
//     compressed amount  VLQ      variable
//     compressed script  []byte   variable
//
// The serialized header code format is:
//   bit 0 - containing transaction is a coinbase
//   bit 1 - output zero is unspent
//   bit 2 - output one is unspent
//   bits 3-x - number of bytes in unspentness bitmap. When both bits 1 and 2
//     are unset, it encodes N-1 since there must be at least one unspent
//     output.
//
// The rationale for the header code scheme is as follows:
//   - Transactions which only pay to a single output and a change output are
//     extremely common, thus an extra byte for the unspentness bitmap can be
//     avoided for them by encoding those two outputs in the low order bits.
//   - Given it is encoded as a VLQ which can encode values up to 127 with a
//     single byte, that leaves 4 bits to represent the number of bytes in the
//     unspentness bitmap while still only consuming a single byte for the
//     header code. In other words, an unspentness bitmap with up to 120
//     transaction outputs can be encoded with a single-byte header code.
//     This covers the vast majority of transactions.
//   - Encoding N-1 bytes when both bits 1 and 2 are unset allows an additional
//     8 outpoints to be encoded before causing the header code to require an
//     additional byte.
//
// Example 1:
// From tx in main blockchain:
// Blk 1, 0e3e2357e806b6cdb1f70b54c3a3a17b6714ee1f0e68bebb44a74b1efd512098
//
//    010103320496b538e853519c726a2c91e61ec11600ae1390813a627c66fb8be7947be63c52
//    <><><><------------------------------------------------------------------>
//     | | \--------\                               |
//     | height     |                      compressed txout 0
//  version    header code
//
//  - version: 1
//  - height: 1
//  - header code: 0x03 (coinbase, output zero unspent, 0 bytes of unspentness)
//  - unspentness: Nothing since it is zero bytes
//  - compressed txout 0:
//    - 0x32: VLQ-encoded compressed amount for 5000000000 (50 BTC)
//    - 0x04: special script type pay-to-pubkey
//    - 0x96...52: x-coordinate of the pubkey
//
// Example 2:
// From tx in main blockchain:
// Blk 113931, 4a16969aa4764dd7507fc1de7f0baa4850a246de90c45e59a3207f9a26b5036f
//
//    0185f90b0a011200e2ccd6ec7c6e2e581349c77e067385fa8236bf8a800900b8025be1b3efc63b0ad48e7f9f10e87544528d58
//    <><----><><><------------------------------------------><-------------------------------------------->
//     |    |  | \-------------------\            |                                  |
//  version |  \--------\       unspentness       |                         compressed txout 2
//        height   header code           compressed txout 0
//
//  - version: 1
//  - height: 113931
//  - header code: 0x0a (output zero unspent, 1 byte in unspentness bitmap)
//  - unspentness: [0x01] (bit 0 is set, so output 0+2 = 2 is unspent)
//    NOTE: It's +2 since the first two outputs are encoded in the header code
//  - compressed txout 0:
//    - 0x12: VLQ-encoded compressed amount for 20000000 (0.2 BTC)
//    - 0x00: special script type pay-to-pubkey-hash
//    - 0xe2...8a: pubkey hash
//  - compressed txout 2:
//    - 0x8009: VLQ-encoded compressed amount for 15000000 (0.15 BTC)
//    - 0x00: special script type pay-to-pubkey-hash
//    - 0xb8...58: pubkey hash
//
// Example 3:
// From tx in main blockchain:
// Blk 338156, 1b02d1c8cfef60a189017b9a420c682cf4a0028175f2f563209e4ff61c8c3620
//
//    0193d06c100000108ba5b9e763011dd46a006572d820e448e12d2bbb38640bc718e6
//    <><----><><----><-------------------------------------------------->
//     |    |  | \-----------------\            |
//  version |  \--------\     unspentness       |
//        height   header code         compressed txout 22
//
//  - version: 1
//  - height: 338156
//  - header code: 0x10 (2+1 = 3 bytes in unspentness bitmap)
//    NOTE: It's +1 since neither bit 1 nor 2 are set, so N-1 is encoded.
//  - unspentness: [0x00 0x00 0x10] (bit 20 is set, so output 20+2 = 22 is unspent)
//    NOTE: It's +2 since the first two outputs are encoded in the header code
//  - compressed txout 22:
//    - 0x8ba5b9e763: VLQ-encoded compressed amount for 366875659 (3.66875659 BTC)
//    - 0x01: special script type pay-to-script-hash
//    - 0x1d...e6: script hash
func deserializeUtxoEntryV0(serialized []byte) (map[uint32]*UtxoEntry, error) {
	// Deserialize the version.
	//
	// NOTE: Ignore version since it is no longer used in the new format.
	_, bytesRead := deserializeVLQ(serialized)
	offset := bytesRead
	if offset >= len(serialized) {
		return nil, errDeserialize("unexpected end of data after version")
	}

	// Deserialize the block height.
	blockHeight, bytesRead := deserializeVLQ(serialized[offset:])
	offset += bytesRead
	if offset >= len(serialized) {
		return nil, errDeserialize("unexpected end of data after height")
	}

	// Deserialize the header code.
	code, bytesRead := deserializeVLQ(serialized[offset:])
	offset += bytesRead
	if offset >= len(serialized) {
		return nil, errDeserialize("unexpected end of data after header")
	}

	// Decode the header code.
	//
	// Bit 0 indicates whether the containing transaction is a coinbase.
	// Bit 1 indicates output 0 is unspent.
	// Bit 2 indicates output 1 is unspent.
	// Bits 3-x encode the number of non-zero unspentness bitmap bytes that
	// follow. When both output 0 and 1 are spent, it encodes N-1.
	isCoinBase := code&0x01 != 0
	output0Unspent := code&0x02 != 0
	output1Unspent := code&0x04 != 0
	numBitmapBytes := code >> 3
	if !output0Unspent && !output1Unspent {
		numBitmapBytes++
	}

	// Ensure there are enough bytes left to deserialize the unspentness
	// bitmap.
	if uint64(len(serialized[offset:])) < numBitmapBytes {
		return nil, errDeserialize("unexpected end of data for " +
			"unspentness bitmap")
	}

	// Add sparse output for unspent outputs 0 and 1 as needed based on the
	// details provided by the header code.
	var outputIndexes []uint32
	if output0Unspent {
		outputIndexes = append(outputIndexes, 0)
	}
	if output1Unspent {
		outputIndexes = append(outputIndexes, 1)
	}

	// Decode the unspentness bitmap adding a sparse output for each unspent
	// output.
	for i := uint32(0); i < uint32(numBitmapBytes); i++ {
		unspentBits := serialized[offset]
		for j := uint32(0); j < 8; j++ {
			if unspentBits&0x01 != 0 {
				// The first 2 outputs are encoded via the
				// header code, so adjust the output number
				// accordingly.
				outputNum := 2 + i*8 + j
				outputIndexes = append(outputIndexes, outputNum)
			}
			unspentBits >>= 1
		}
		offset++
	}

	// Map to hold all of the converted outputs.
	entries := make(map[uint32]*UtxoEntry)

	// All entries will need to potentially be marked as a coinbase.
	var packedFlags txoFlags
	if isCoinBase {
		packedFlags |= tfCoinBase
	}

	// Decode and add all of the utxos.
	for i, outputIndex := range outputIndexes {
		// Decode the next utxo.
		amount, pkScript, bytesRead, err := decodeCompressedTxOut(
			serialized[offset:])
		if err != nil {
			return nil, errDeserialize(fmt.Sprintf("unable to "+
				"decode utxo at index %d: %v", i, err))
		}
		offset += bytesRead

		// Create a new utxo entry with the details deserialized above.
		entries[outputIndex] = &UtxoEntry{
			amount:      int64(amount),
			pkScript:    pkScript,
			blockHeight: int32(blockHeight),
			packedFlags: packedFlags,
		}
	}

	return entries, nil
}

// upgradeUtxoSetToV2 migrates the utxo set entries from version 1 to 2 in
// batches. The utxo set is guaranteed to be updated if this returns without
// failure.
func upgradeUtxoSetToV2(db database.DB, interrupt <-chan struct{}) error {
	// Hardcoded bucket names so updates to the global values do not affect
	// old upgrades.
	var (
		v1BucketName = []byte("utxoset")
		v2BucketName = []byte("utxosetv2")
	)

	log.Infof("Upgrading utxo set to v2. This will take a while...")
	start := time.Now()

	// Create the new utxo set bucket as needed.
	err := db.Update(func(dbTx database.Tx) error {
		_, err := dbTx.Metadata().CreateBucketIfNotExists(v2BucketName)
		return err
	})
	if err != nil {
		return err
	}

	// doBatch contains the primary logic for upgrading the utxo set from
	// version 1 to 2 in batches. This is done because the utxo set can be
	// huge and thus attempting to migrate in a single database transaction
	// would result in massive memory usage and could potentially crash on
	// many systems due to ulimits.
	//
	// It returns the number of utxos processed.
	const maxUtxos = 200000
	doBatch := func(dbTx database.Tx) (uint32, error) {
		v1Bucket := dbTx.Metadata().Bucket(v1BucketName)
		v2Bucket := dbTx.Metadata().Bucket(v2BucketName)
		v1Cursor := v1Bucket.Cursor()

		// Migrate utxos so long as the max number of utxos for this
		// batch has not been exceeded.
		var numUtxos uint32
		for ok := v1Cursor.First(); ok && numUtxos < maxUtxos; ok =
			v1Cursor.Next() {

			// Old key was the transaction hash.
			oldKey := v1Cursor.Key()
			var txHash chainhash.Hash
			copy(txHash[:], oldKey)

			// Deserialize the old entry which included all utxos
			// for the given transaction.
			utxos, err := deserializeUtxoEntryV0(v1Cursor.Value())
			if err != nil {
				return 0, err
			}

			// Add an entry for each utxo into the new bucket using
			// the new format.
			for txOutIdx, utxo := range utxos {
				reserialized, err := serializeUtxoEntry(utxo)
				if err != nil {
					return 0, err
				}

				key := outpointKey(wire.OutPoint{
					Hash:  txHash,
					Index: txOutIdx,
				})
				err = v2Bucket.Put(*key, reserialized)
				// NOTE: The key is intentionally not recycled
				// here since the database interface contract
				// prohibits modifications. It will be garbage
				// collected normally when the database is done
				// with it.
				if err != nil {
					return 0, err
				}
			}

			// Remove old entry.
			err = v1Bucket.Delete(oldKey)
			if err != nil {
				return 0, err
			}

			numUtxos += uint32(len(utxos))

			if interruptRequested(interrupt) {
				// No error here so the database transaction
				// is not cancelled and therefore outstanding
				// work is written to disk.
				break
			}
		}

		return numUtxos, nil
	}

	// Migrate all entries in batches for the reasons mentioned above.
	var totalUtxos uint64
	for {
		var numUtxos uint32
		err := db.Update(func(dbTx database.Tx) error {
			var err error
			numUtxos, err = doBatch(dbTx)
			return err
		})
		if err != nil {
			return err
		}

		if interruptRequested(interrupt) {
			return errInterruptRequested
		}

		if numUtxos == 0 {
			break
		}

		totalUtxos += uint64(numUtxos)
		log.Infof("Migrated %d utxos (%d total)", numUtxos, totalUtxos)
	}

	// Remove the old bucket and update the utxo set version once it has
	// been fully migrated.
	err = db.Update(func(dbTx database.Tx) error {
		err := dbTx.Metadata().DeleteBucket(v1BucketName)
		if err != nil {
			return err
		}

		return dbPutVersion(dbTx, utxoSetVersionKeyName, 2)
	})
	if err != nil {
		return err
	}

	seconds := int64(time.Since(start) / time.Second)
	log.Infof("Done upgrading utxo set. Total utxos: %d in %d seconds",
		totalUtxos, seconds)
	return nil
}

// maybeUpgradeDbBuckets checks the database version of the buckets used by this
// package and performs any needed upgrades to bring them to the latest version.
//
// All buckets used by this package are guaranteed to be the latest version if
// this function returns without error.
func (b *BlockChain) maybeUpgradeDbBuckets(interrupt <-chan struct{}) error {
	// Load or create bucket versions as needed.
	var utxoSetVersion uint32
	err := b.db.Update(func(dbTx database.Tx) error {
		// Load the utxo set version from the database or create it and
		// initialize it to version 1 if it doesn't exist.
		var err error
		utxoSetVersion, err = dbFetchOrCreateVersion(dbTx,
			utxoSetVersionKeyName, 1)
		return err
	})
	if err != nil {
		return err
	}

	// Update the utxo set to v2 if needed.
	if utxoSetVersion < 2 {
		if err := upgradeUtxoSetToV2(b.db, interrupt); err != nil {
			return err
		}
	}

	return nil
}