From 7c174620f7631612bd73d9f9f1df6e1afcb0f9c9 Mon Sep 17 00:00:00 2001 From: Dave Collins Date: Thu, 18 Feb 2016 22:51:18 -0600 Subject: [PATCH] indexers: Implement optional tx/address indexes. This introduces a new indexing infrastructure for supporting optional indexes using the new database and blockchain infrastructure along with two concrete indexer implementations which provide both a transaction-by-hash and a transaction-by-address index. The new infrastructure is mostly separated into a package named indexers which is housed under the blockchain package. In order to support this, a new interface named IndexManager has been introduced in the blockchain package which provides methods to be notified when the chain has been initialized and when blocks are connected and disconnected from the main chain. A concrete implementation of an index manager is provided by the new indexers package. The new indexers package also provides a new interface named Indexer which allows the index manager to manage concrete index implementations which conform to the interface. The following is high level overview of the main index infrastructure changes: - Define a new IndexManager interface in the blockchain package and modify the package to make use of the interface when specified - Create a new indexers package - Provides an Index interface which allows concrete indexes to plugin to an index manager - Provides a concrete IndexManager implementation - Handles the lifecycle of all indexes it manages - Tracks the index tips - Handles catching up disabled indexes that have been reenabled - Handles reorgs while the index was disabled - Invokes the appropriate methods for all managed indexes to allow them to index and deindex the blocks and transactions - Implement a transaction-by-hash index - Makes use of internal block IDs to save a significant amount of space and indexing costs over the old transaction index format - Implement a transaction-by-address index - Makes use of a leveling scheme in order to provide a good tradeoff between space required and indexing costs - Supports enabling and disabling indexes at will - Support the ability to drop indexes if they are no longer desired The following is an overview of the btcd changes: - Add a new index logging subsystem - Add new options --txindex and --addrindex in order to enable the optional indexes - NOTE: The transaction index will automatically be enabled when the address index is enabled because it depends on it - Add new options --droptxindex and --dropaddrindex to allow the indexes to be removed - NOTE: The address index will also be removed when the transaction index is dropped because it depends on it - Update getrawtransactions RPC to make use of the transaction index - Reimplement the searchrawtransaction RPC that makes use of the address index - Update sample-btcd.conf to include sample usage for the new optional index flags --- blockchain/chain.go | 71 +- blockchain/chainio.go | 13 + blockchain/indexers/README.md | 44 ++ blockchain/indexers/addrindex.go | 932 ++++++++++++++++++++++++++ blockchain/indexers/addrindex_test.go | 275 ++++++++ blockchain/indexers/blocklogger.go | 76 +++ blockchain/indexers/common.go | 90 +++ blockchain/indexers/log.go | 30 + blockchain/indexers/manager.go | 653 ++++++++++++++++++ blockchain/indexers/txindex.go | 477 +++++++++++++ blockmanager.go | 8 +- btcd.go | 22 + config.go | 44 +- database2/ffldb/db.go | 10 +- log.go | 7 + mempool.go | 25 +- rpcserver.go | 733 +++++++++++++++++--- sample-btcd.conf | 13 + server.go | 44 +- 19 files changed, 3472 insertions(+), 95 deletions(-) create mode 100644 blockchain/indexers/README.md create mode 100644 blockchain/indexers/addrindex.go create mode 100644 blockchain/indexers/addrindex_test.go create mode 100644 blockchain/indexers/blocklogger.go create mode 100644 blockchain/indexers/common.go create mode 100644 blockchain/indexers/log.go create mode 100644 blockchain/indexers/manager.go create mode 100644 blockchain/indexers/txindex.go diff --git a/blockchain/chain.go b/blockchain/chain.go index 1026c2e4..aacda4b8 100644 --- a/blockchain/chain.go +++ b/blockchain/chain.go @@ -166,6 +166,7 @@ type BlockChain struct { chainParams *chaincfg.Params notifications NotificationCallback sigCache *txscript.SigCache + indexManager IndexManager // chainLock protects concurrent access to the vast majority of the // fields in this struct below this point. @@ -732,6 +733,20 @@ func (b *BlockChain) getReorganizeNodes(node *blockNode) (*list.List, *list.List return detachNodes, attachNodes } +// dbMaybeStoreBlock stores the provided block in the database if it's not +// already there. +func dbMaybeStoreBlock(dbTx database.Tx, block *btcutil.Block) error { + hasBlock, err := dbTx.HasBlock(block.Sha()) + if err != nil { + return err + } + if hasBlock { + return nil + } + + return dbTx.StoreBlock(block) +} + // connectBlock handles connecting the passed node/block to the end of the main // (best) chain. // @@ -797,12 +812,19 @@ func (b *BlockChain) connectBlock(node *blockNode, block *btcutil.Block, view *U } // Insert the block into the database if it's not already there. - hasBlock, err := dbTx.HasBlock(block.Sha()) + err = dbMaybeStoreBlock(dbTx, block) if err != nil { return err } - if !hasBlock { - return dbTx.StoreBlock(block) + + // Allow the index manager to call each of the currently active + // optional indexes with the block being connected so they can + // update themselves accordingly. + if b.indexManager != nil { + err := b.indexManager.ConnectBlock(dbTx, block, view) + if err != nil { + return err + } } return nil @@ -913,6 +935,16 @@ func (b *BlockChain) disconnectBlock(node *blockNode, block *btcutil.Block, view return err } + // Allow the index manager to call each of the currently active + // optional indexes with the block being disconnected so they + // can update themselves accordingly. + if b.indexManager != nil { + err := b.indexManager.DisconnectBlock(dbTx, block, view) + if err != nil { + return err + } + } + return nil }) if err != nil { @@ -1339,6 +1371,23 @@ func (b *BlockChain) BestSnapshot() *BestState { return snapshot } +// IndexManager provides a generic interface that the is called when blocks are +// connected and disconnected to and from the tip of the main chain for the +// purpose of supporting optional indexes. +type IndexManager interface { + // Init is invoked during chain initialize in order to allow the index + // manager to initialize itself and any indexes it is managing. + Init(*BlockChain) error + + // ConnectBlock is invoked when a new block has been connected to the + // main chain. + ConnectBlock(database.Tx, *btcutil.Block, *UtxoViewpoint) error + + // DisconnectBlock is invoked when a block has been disconnected from + // the main chain. + DisconnectBlock(database.Tx, *btcutil.Block, *UtxoViewpoint) error +} + // Config is a descriptor which specifies the blockchain instance configuration. type Config struct { // DB defines the database which houses the blocks and will be used to @@ -1370,6 +1419,13 @@ type Config struct { // This field can be nil if the caller is not interested in using a // signature cache. SigCache *txscript.SigCache + + // IndexManager defines an index manager to use when initializing the + // chain and connecting and disconnecting blocks. + // + // This field can be nil if the caller does not wish to make use of an + // index manager. + IndexManager IndexManager } // New returns a BlockChain instance using the provided configuration details. @@ -1399,6 +1455,7 @@ func New(config *Config) (*BlockChain, error) { chainParams: params, notifications: config.Notifications, sigCache: config.SigCache, + indexManager: config.IndexManager, root: nil, bestNode: nil, index: make(map[wire.ShaHash]*blockNode), @@ -1415,6 +1472,14 @@ func New(config *Config) (*BlockChain, error) { return nil, err } + // Initialize and catch up all of the currently active optional indexes + // as needed. + if config.IndexManager != nil { + if err := config.IndexManager.Init(&b); err != nil { + return nil, err + } + } + log.Infof("Chain state (height %d, hash %v, totaltx %d, work %v)", b.bestNode.height, b.bestNode.hash, b.stateSnapshot.TotalTxns, b.bestNode.workSum) diff --git a/blockchain/chainio.go b/blockchain/chainio.go index 02869366..531814b3 100644 --- a/blockchain/chainio.go +++ b/blockchain/chainio.go @@ -1286,6 +1286,19 @@ func dbMainChainHasBlock(dbTx database.Tx, hash *wire.ShaHash) bool { return hashIndex.Get(hash[:]) != nil } +// MainChainHasBlock returns whether or not the block with the given hash is in +// the main chain. +// +// This function is safe for concurrent access. +func (b *BlockChain) MainChainHasBlock(hash *wire.ShaHash) (bool, error) { + var exists bool + err := b.db.View(func(dbTx database.Tx) error { + exists = dbMainChainHasBlock(dbTx, hash) + return nil + }) + return exists, err +} + // BlockHeightByHash returns the height of the block with the given hash in the // main chain. // diff --git a/blockchain/indexers/README.md b/blockchain/indexers/README.md new file mode 100644 index 00000000..8ec12e2b --- /dev/null +++ b/blockchain/indexers/README.md @@ -0,0 +1,44 @@ +indexers +======== + +[![Build Status](https://travis-ci.org/btcsuite/btcd.png?branch=master)] +(https://travis-ci.org/btcsuite/btcd) + +Package indexers implements optional block chain indexes. + +These indexes are typically used to enhance the amount of information available +via an RPC interface. + +## Supported Indexers + +- Transaction-by-hash (txbyhashidx) Index + - Creates a mapping from the hash of each transaction to the block that + contains it along with its offset and length within the serialized block +- Transaction-by-address (txbyaddridx) Index + - Creates a mapping from every address to all transactions which either credit + or debit the address + - Requires the transaction-by-hash index + +## Documentation + +[![GoDoc](https://godoc.org/github.com/btcsuite/btcd/blockchain/indexers?status.png)] +(http://godoc.org/github.com/btcsuite/btcd/blockchain/indexers) + +Full `go doc` style documentation for the project can be viewed online without +installing this package by using the GoDoc site here: +http://godoc.org/github.com/btcsuite/btcd/blockchain/indexers + +You can also view the documentation locally once the package is installed with +the `godoc` tool by running `godoc -http=":6060"` and pointing your browser to +http://localhost:6060/pkg/github.com/btcsuite/btcd/blockchain/indexers + +## Installation + +```bash +$ go get -u github.com/btcsuite/btcd/blockchain/indexers +``` + +## License + +Package indexers is licensed under the [copyfree](http://copyfree.org) ISC +License. diff --git a/blockchain/indexers/addrindex.go b/blockchain/indexers/addrindex.go new file mode 100644 index 00000000..ce146ad9 --- /dev/null +++ b/blockchain/indexers/addrindex.go @@ -0,0 +1,932 @@ +// Copyright (c) 2016 The btcsuite developers +// Use of this source code is governed by an ISC +// license that can be found in the LICENSE file. + +package indexers + +import ( + "errors" + "fmt" + "sync" + + "github.com/btcsuite/btcd/blockchain" + "github.com/btcsuite/btcd/chaincfg" + database "github.com/btcsuite/btcd/database2" + "github.com/btcsuite/btcd/txscript" + "github.com/btcsuite/btcd/wire" + "github.com/btcsuite/btcutil" +) + +const ( + // addrIndexName is the human-readable name for the index. + addrIndexName = "address index" + + // level0MaxEntries is the maximum number of transactions that are + // stored in level 0 of an address index entry. Subsequent levels store + // 2^n * level0MaxEntries entries, or in words, double the maximum of + // the previous level. + level0MaxEntries = 8 + + // addrKeySize is the number of bytes an address key consumes in the + // index. It consists of 1 byte address type + 20 bytes hash160. + addrKeySize = 1 + 20 + + // levelKeySize is the number of bytes a level key in the address index + // consumes. It consists of the address key + 1 byte for the level. + levelKeySize = addrKeySize + 1 + + // levelOffset is the offset in the level key which identifes the level. + levelOffset = levelKeySize - 1 + + // addrKeyTypePubKeyHash is the address type in an address key which + // represents both a pay-to-pubkey-hash and a pay-to-pubkey address. + // This is done because both are identical for the purposes of the + // address index. + addrKeyTypePubKeyHash = 0 + + // addrKeyTypeScriptHash is the address type in an address key which + // represents a pay-to-script-hash address. This is necessary because + // the hash of a pubkey address might be the same as that of a script + // hash. + addrKeyTypeScriptHash = 1 + + // Size of a transaction entry. It consists of 4 bytes block id + 4 + // bytes offset + 4 bytes length. + txEntrySize = 4 + 4 + 4 +) + +var ( + // addrIndexKey is the key of the address index and the db bucket used + // to house it. + addrIndexKey = []byte("txbyaddridx") + + // errUnsupportedAddressType is an error that is used to signal an + // unsupported address type has been used. + errUnsupportedAddressType = errors.New("address type is not supported " + + "by the address index") +) + +// ----------------------------------------------------------------------------- +// The address index maps addresses referenced in the blockchain to a list of +// all the transactions involving that address. Transactions are stored +// according to their order of appearance in the blockchain. That is to say +// first by block height and then by offset inside the block. It is also +// important to note that this implementation requires the transaction index +// since it is needed in order to catch up old blocks due to the fact the spent +// outputs will already be pruned from the utxo set. +// +// The approach used to store the index is similar to a log-structured merge +// tree (LSM tree) and is thus similar to how leveldb works internally. +// +// Every address consists of one or more entries identified by a level starting +// from 0 where each level holds a maximum number of entries such that each +// subsequent level holds double the maximum of the previous one. In equation +// form, the number of entries each level holds is 2^n * firstLevelMaxSize. +// +// New transactions are appended to level 0 until it becomes full at which point +// the entire level 0 entry is appended to the level 1 entry and level 0 is +// cleared. This process continues until level 1 becomes full at which point it +// will be appended to level 2 and cleared and so on. +// +// The result of this is the lower levels contain newer transactions and the +// transactions within each level are ordered from oldest to newest. +// +// The intent of this approach is to provide a balance between space efficiency +// and indexing cost. Storing one entry per transaction would have the lowest +// indexing cost, but would waste a lot of space because the same address hash +// would be duplicated for every transaction key. On the other hand, storing a +// single entry with all transactions would be the most space efficient, but +// would cause indexing cost to grow quadratically with the number of +// transactions involving the same address. The approach used here provides +// logarithmic insertion and retrieval. +// +// The serialized key format is: +// +// +// +// Field Type Size +// addr type uint8 1 byte +// addr hash hash160 20 bytes +// level uint8 1 byte +// ----- +// Total: 22 bytes +// +// The serialized value format is: +// +// [,...] +// +// Field Type Size +// block id uint32 4 bytes +// start offset uint32 4 bytes +// tx length uint32 4 bytes +// ----- +// Total: 12 bytes per indexed tx +// ----------------------------------------------------------------------------- + +// fetchBlockHashFunc defines a callback function to use in order to convert a +// serialized block ID to an associated block hash. +type fetchBlockHashFunc func(serializedID []byte) (*wire.ShaHash, error) + +// serializeAddrIndexEntry serializes the provided block id and transaction +// location according to the format described in detail above. +func serializeAddrIndexEntry(blockID uint32, txLoc wire.TxLoc) []byte { + // Serialize the entry. + serialized := make([]byte, 12) + byteOrder.PutUint32(serialized, blockID) + byteOrder.PutUint32(serialized[4:], uint32(txLoc.TxStart)) + byteOrder.PutUint32(serialized[8:], uint32(txLoc.TxLen)) + return serialized +} + +// deserializeAddrIndexEntry decodes the passed serialized byte slice into the +// provided region struct according to the format described in detail above and +// uses the passed block hash fetching function in order to conver the block ID +// to the associated block hash. +func deserializeAddrIndexEntry(serialized []byte, region *database.BlockRegion, fetchBlockHash fetchBlockHashFunc) error { + // Ensure there are enough bytes to decode. + if len(serialized) < txEntrySize { + return errDeserialize("unexpected end of data") + } + + hash, err := fetchBlockHash(serialized[0:4]) + if err != nil { + return err + } + region.Hash = hash + region.Offset = byteOrder.Uint32(serialized[4:8]) + region.Len = byteOrder.Uint32(serialized[8:12]) + return nil +} + +// keyForLevel returns the key for a specific address and level in the address +// index entry. +func keyForLevel(addrKey [addrKeySize]byte, level uint8) [levelKeySize]byte { + var key [levelKeySize]byte + copy(key[:], addrKey[:]) + key[levelOffset] = level + return key +} + +// dbPutAddrIndexEntry updates the address index to include the provided entry +// according to the level-based scheme described in detail above. +func dbPutAddrIndexEntry(bucket internalBucket, addrKey [addrKeySize]byte, blockID uint32, txLoc wire.TxLoc) error { + // Start with level 0 and its initial max number of entries. + curLevel := uint8(0) + maxLevelBytes := level0MaxEntries * txEntrySize + + // Simply append the new entry to level 0 and return now when it will + // fit. This is the most common path. + newData := serializeAddrIndexEntry(blockID, txLoc) + level0Key := keyForLevel(addrKey, 0) + level0Data := bucket.Get(level0Key[:]) + if len(level0Data)+len(newData) <= maxLevelBytes { + mergedData := newData + if len(level0Data) > 0 { + mergedData = make([]byte, len(level0Data)+len(newData)) + copy(mergedData, level0Data) + copy(mergedData[len(level0Data):], newData) + } + return bucket.Put(level0Key[:], mergedData) + } + + // At this point, level 0 is full, so merge each level into higher + // levels as many times as needed to free up level 0. + prevLevelData := level0Data + for { + // Each new level holds twice as much as the previous one. + curLevel++ + maxLevelBytes *= 2 + + // Move to the next level as long as the current level is full. + curLevelKey := keyForLevel(addrKey, curLevel) + curLevelData := bucket.Get(curLevelKey[:]) + if len(curLevelData) == maxLevelBytes { + prevLevelData = curLevelData + continue + } + + // The current level has room for the data in the previous one, + // so merge the data from previous level into it. + mergedData := prevLevelData + if len(curLevelData) > 0 { + mergedData = make([]byte, len(curLevelData)+ + len(prevLevelData)) + copy(mergedData, curLevelData) + copy(mergedData[len(curLevelData):], prevLevelData) + } + err := bucket.Put(curLevelKey[:], mergedData) + if err != nil { + return err + } + + // Move all of the levels before the previous one up a level. + for mergeLevel := curLevel - 1; mergeLevel > 0; mergeLevel-- { + mergeLevelKey := keyForLevel(addrKey, mergeLevel) + prevLevelKey := keyForLevel(addrKey, mergeLevel-1) + prevData := bucket.Get(prevLevelKey[:]) + err := bucket.Put(mergeLevelKey[:], prevData) + if err != nil { + return err + } + } + break + } + + // Finally, insert the new entry into level 0 now that it is empty. + return bucket.Put(level0Key[:], newData) +} + +// dbFetchAddrIndexEntries returns block regions for transactions referenced by +// the given address key and the number of entries skipped since it could have +// been less in the case where there are less total entries than the requested +// number of entries to skip. +func dbFetchAddrIndexEntries(bucket internalBucket, addrKey [addrKeySize]byte, numToSkip, numRequested uint32, reverse bool, fetchBlockHash fetchBlockHashFunc) ([]database.BlockRegion, uint32, error) { + // When the reverse flag is not set, all levels need to be fetched + // because numToSkip and numRequested are counted from the oldest + // transactions (highest level) and thus the total count is needed. + // However, when the reverse flag is set, only enough records to satisfy + // the requested amount are needed. + var level uint8 + var serialized []byte + for !reverse || len(serialized) < int(numToSkip+numRequested)*txEntrySize { + curLevelKey := keyForLevel(addrKey, level) + levelData := bucket.Get(curLevelKey[:]) + if levelData == nil { + // Stop when there are no more levels. + break + } + + // Higher levels contain older transactions, so prepend them. + prepended := make([]byte, len(serialized)+len(levelData)) + copy(prepended, levelData) + copy(prepended[len(levelData):], serialized) + serialized = prepended + level++ + } + + // When the requested number of entries to skip is larger than the + // number available, skip them all and return now with the actual number + // skipped. + numEntries := uint32(len(serialized) / txEntrySize) + if numToSkip >= numEntries { + return nil, numEntries, nil + } + + // Nothing more to do when there are no requested entries. + if numRequested == 0 { + return nil, numToSkip, nil + } + + // Limit the number to load based on the number of available entries, + // the number to skip, and the number requested. + numToLoad := numEntries - numToSkip + if numToLoad > numRequested { + numToLoad = numRequested + } + + // Start the offset after all skipped entries and load the calculated + // number. + results := make([]database.BlockRegion, numToLoad) + for i := uint32(0); i < numToLoad; i++ { + // Calculate the read offset according to the reverse flag. + var offset uint32 + if reverse { + offset = (numEntries - numToSkip - i - 1) * txEntrySize + } else { + offset = (numToSkip + i) * txEntrySize + } + + // Deserialize and populate the result. + err := deserializeAddrIndexEntry(serialized[offset:], + &results[i], fetchBlockHash) + if err != nil { + // Ensure any deserialization errors are returned as + // database corruption errors. + if isDeserializeErr(err) { + err = database.Error{ + ErrorCode: database.ErrCorruption, + Description: fmt.Sprintf("failed to "+ + "deserialized address index "+ + "for key %x: %v", addrKey, err), + } + } + + return nil, 0, err + } + } + + return results, numToSkip, nil +} + +// minEntriesToReachLevel returns the minimum number of entries that are +// required to reach the given address index level. +func minEntriesToReachLevel(level uint8) int { + maxEntriesForLevel := level0MaxEntries + minRequired := 1 + for l := uint8(1); l <= level; l++ { + minRequired += maxEntriesForLevel + maxEntriesForLevel *= 2 + } + return minRequired +} + +// maxEntriesForLevel returns the maximum number of entries allowed for the +// given address index level. +func maxEntriesForLevel(level uint8) int { + numEntries := level0MaxEntries + for l := level; l > 0; l-- { + numEntries *= 2 + } + return numEntries +} + +// dbRemoveAddrIndexEntries removes the specified number of entries from from +// the address index for the provided key. An assertion error will be returned +// if the count exceeds the total number of entries in the index. +func dbRemoveAddrIndexEntries(bucket internalBucket, addrKey [addrKeySize]byte, count int) error { + // Nothing to do if no entries are being deleted. + if count <= 0 { + return nil + } + + // Make use of a local map to track pending updates and define a closure + // to apply it to the database. This is done in order to reduce the + // number of database reads and because there is more than one exit + // path that needs to apply the updates. + pendingUpdates := make(map[uint8][]byte) + applyPending := func() error { + for level, data := range pendingUpdates { + curLevelKey := keyForLevel(addrKey, level) + if len(data) == 0 { + err := bucket.Delete(curLevelKey[:]) + if err != nil { + return err + } + continue + } + err := bucket.Put(curLevelKey[:], data) + if err != nil { + return err + } + } + return nil + } + + // Loop fowards through the levels while removing entries until the + // specified number has been removed. This will potentially result in + // entirely empty lower levels which will be backfilled below. + var highestLoadedLevel uint8 + numRemaining := count + for level := uint8(0); numRemaining > 0; level++ { + // Load the data for the level from the database. + curLevelKey := keyForLevel(addrKey, level) + curLevelData := bucket.Get(curLevelKey[:]) + if len(curLevelData) == 0 && numRemaining > 0 { + return AssertError(fmt.Sprintf("dbRemoveAddrIndexEntries "+ + "not enough entries for address key %x to "+ + "delete %d entries", addrKey, count)) + } + pendingUpdates[level] = curLevelData + highestLoadedLevel = level + + // Delete the entire level as needed. + numEntries := len(curLevelData) / txEntrySize + if numRemaining >= numEntries { + pendingUpdates[level] = nil + numRemaining -= numEntries + continue + } + + // Remove remaining entries to delete from the level. + offsetEnd := len(curLevelData) - (numRemaining * txEntrySize) + pendingUpdates[level] = curLevelData[:offsetEnd] + break + } + + // When all elements in level 0 were not removed there is nothing left + // to do other than updating the database. + if len(pendingUpdates[0]) != 0 { + return applyPending() + } + + // At this point there are one or more empty levels before the current + // level which need to be backfilled and the current level might have + // had some entries deleted from it as well. Since all levels after + // level 0 are required to either be empty, half full, or completely + // full, the current level must be adjusted accordingly by backfilling + // each previous levels in a way which satisfies the requirements. Any + // entries that are left are assigned to level 0 after the loop as they + // are guaranteed to fit by the logic in the loop. In other words, this + // effectively squashes all remaining entries in the current level into + // the lowest possible levels while following the level rules. + // + // Note that the level after the current level might also have entries + // and gaps are not allowed, so this also keeps track of the lowest + // empty level so the code below knows how far to backfill in case it is + // required. + lowestEmptyLevel := uint8(255) + curLevelData := pendingUpdates[highestLoadedLevel] + curLevelMaxEntries := maxEntriesForLevel(highestLoadedLevel) + for level := highestLoadedLevel; level > 0; level-- { + // When there are not enough entries left in the current level + // for the number that would be required to reach it, clear the + // the current level which effectively moves them all up to the + // previous level on the next iteration. Otherwise, there are + // are sufficient entries, so update the current level to + // contain as many entries as possible while still leaving + // enough remaining entries required to reach the level. + numEntries := len(curLevelData) / txEntrySize + prevLevelMaxEntries := curLevelMaxEntries / 2 + minPrevRequired := minEntriesToReachLevel(level - 1) + if numEntries < prevLevelMaxEntries+minPrevRequired { + lowestEmptyLevel = level + pendingUpdates[level] = nil + } else { + // This level can only be completely full or half full, + // so choose the appropriate offset to ensure enough + // entries remain to reach the level. + var offset int + if numEntries-curLevelMaxEntries >= minPrevRequired { + offset = curLevelMaxEntries * txEntrySize + } else { + offset = prevLevelMaxEntries * txEntrySize + } + pendingUpdates[level] = curLevelData[:offset] + curLevelData = curLevelData[offset:] + } + + curLevelMaxEntries = prevLevelMaxEntries + } + pendingUpdates[0] = curLevelData + if len(curLevelData) == 0 { + lowestEmptyLevel = 0 + } + + // When the highest loaded level is empty, it's possible the level after + // it still has data and thus that data needs to be backfilled as well. + for len(pendingUpdates[highestLoadedLevel]) == 0 { + // When the next level is empty too, the is no data left to + // continue backfilling, so there is nothing left to do. + // Otherwise, populate the pending updates map with the newly + // loaded data and update the highest loaded level accordingly. + level := highestLoadedLevel + 1 + curLevelKey := keyForLevel(addrKey, level) + levelData := bucket.Get(curLevelKey[:]) + if len(levelData) == 0 { + break + } + pendingUpdates[level] = levelData + highestLoadedLevel = level + + // At this point the highest level is not empty, but it might + // be half full. When that is the case, move it up a level to + // simplify the code below which backfills all lower levels that + // are still empty. This also means the current level will be + // empty, so the loop will perform another another iteration to + // potentially backfill this level with data from the next one. + curLevelMaxEntries := maxEntriesForLevel(level) + if len(levelData)/txEntrySize != curLevelMaxEntries { + pendingUpdates[level] = nil + pendingUpdates[level-1] = levelData + level-- + curLevelMaxEntries /= 2 + } + + // Backfill all lower levels that are still empty by iteratively + // halfing the data until the lowest empty level is filled. + for level > lowestEmptyLevel { + offset := (curLevelMaxEntries / 2) * txEntrySize + pendingUpdates[level] = levelData[:offset] + levelData = levelData[offset:] + pendingUpdates[level-1] = levelData + level-- + curLevelMaxEntries /= 2 + } + + // The lowest possible empty level is now the highest loaded + // level. + lowestEmptyLevel = highestLoadedLevel + } + + // Apply the pending updates. + return applyPending() +} + +// addrToKey converts known address types to an addrindex key. An error is +// returned for unsupported types. +func addrToKey(addr btcutil.Address) ([addrKeySize]byte, error) { + switch addr := addr.(type) { + case *btcutil.AddressPubKeyHash: + var result [addrKeySize]byte + result[0] = addrKeyTypePubKeyHash + copy(result[1:], addr.Hash160()[:]) + return result, nil + + case *btcutil.AddressScriptHash: + var result [addrKeySize]byte + result[0] = addrKeyTypeScriptHash + copy(result[1:], addr.Hash160()[:]) + return result, nil + + case *btcutil.AddressPubKey: + var result [addrKeySize]byte + result[0] = addrKeyTypePubKeyHash + copy(result[1:], addr.AddressPubKeyHash().Hash160()[:]) + return result, nil + } + + return [addrKeySize]byte{}, errUnsupportedAddressType +} + +// AddrIndex implements a transaction by address index. That is to say, it +// supports querying all transactions that reference a given address because +// they are either crediting or debiting the address. The returned transactions +// are ordered according to their order of appearance in the blockchain. In +// other words, first by block height and then by offset inside the block. +// +// In addition, support is provided for a memory-only index of unconfirmed +// transactions such as those which are kept in the memory pool before inclusion +// in a block. +type AddrIndex struct { + // The following fields are set when the instance is created and can't + // be changed afterwards, so there is no need to protect them with a + // separate mutex. + db database.DB + chainParams *chaincfg.Params + + // The following fields are used to quickly link transactions and + // addresses that have not been included into a block yet when an + // address index is being maintained. The are protected by the + // unconfirmedLock field. + // + // The txnsByAddr field is used to keep an index of all transactions + // which either create an output to a given address or spend from a + // previous output to it keyed by the address. + // + // The addrsByTx field is essentially the reverse and is used to + // keep an index of all addresses which a given transaction involves. + // This allows fairly efficient updates when transactions are removed + // once they are included into a block. + unconfirmedLock sync.RWMutex + txnsByAddr map[[addrKeySize]byte]map[wire.ShaHash]*btcutil.Tx + addrsByTx map[wire.ShaHash]map[[addrKeySize]byte]struct{} +} + +// Ensure the AddrIndex type implements the Indexer interface. +var _ Indexer = (*AddrIndex)(nil) + +// Ensure the AddrIndex type implements the NeedsInputser interface. +var _ NeedsInputser = (*AddrIndex)(nil) + +// NeedsInputs signals that the index requires the referenced inputs in order +// to properly create the index. +// +// This implements the NeedsInputser interface. +func (idx *AddrIndex) NeedsInputs() bool { + return true +} + +// Init is only provided to satisfy the Indexer interface as there is nothing to +// initialize for this index. +// +// This is part of the Indexer interface. +func (idx *AddrIndex) Init() error { + // Nothing to do. + return nil +} + +// Key returns the database key to use for the index as a byte slice. +// +// This is part of the Indexer interface. +func (idx *AddrIndex) Key() []byte { + return addrIndexKey +} + +// Name returns the human-readable name of the index. +// +// This is part of the Indexer interface. +func (idx *AddrIndex) Name() string { + return addrIndexName +} + +// Create is invoked when the indexer manager determines the index needs +// to be created for the first time. It creates the bucket for the address +// index. +// +// This is part of the Indexer interface. +func (idx *AddrIndex) Create(dbTx database.Tx) error { + _, err := dbTx.Metadata().CreateBucket(addrIndexKey) + return err +} + +// writeIndexData represents the address index data to be written for one block. +// It consistens of the address mapped to an ordered list of the transactions +// that involve the address in block. It is ordered so the transactions can be +// stored in the order they appear in the block. +type writeIndexData map[[addrKeySize]byte][]int + +// indexPkScript extracts all standard addresses from the passed public key +// script and maps each of them to the associated transaction using the passed +// map. +func (idx *AddrIndex) indexPkScript(data writeIndexData, pkScript []byte, txIdx int) { + // Nothing to index if the script is non-standard or otherwise doesn't + // contain any addresses. + _, addrs, _, err := txscript.ExtractPkScriptAddrs(pkScript, + idx.chainParams) + if err != nil || len(addrs) == 0 { + return + } + + for _, addr := range addrs { + addrKey, err := addrToKey(addr) + if err != nil { + // Ignore unsupported address types. + continue + } + + // Avoid inserting the transaction more than once. Since the + // transactions are indexed serially any duplicates will be + // indexed in a row, so checking the most recent entry for the + // address is enough to detect duplicates. + indexedTxns := data[addrKey] + numTxns := len(indexedTxns) + if numTxns > 0 && indexedTxns[numTxns-1] == txIdx { + continue + } + indexedTxns = append(indexedTxns, txIdx) + data[addrKey] = indexedTxns + } +} + +// indexBlock extract all of the standard addresses from all of the transactions +// in the passed block and maps each of them to the assocaited transaction using +// the passed map. +func (idx *AddrIndex) indexBlock(data writeIndexData, block *btcutil.Block, view *blockchain.UtxoViewpoint) { + for txIdx, tx := range block.Transactions() { + // Coinbases do not reference any inputs. Since the block is + // required to have already gone through full validation, it has + // already been proven on the first transaction in the block is + // a coinbase. + if txIdx != 0 { + for _, txIn := range tx.MsgTx().TxIn { + // The view should always have the input since + // the index contract requires it, however, be + // safe and simply ignore any missing entries. + origin := &txIn.PreviousOutPoint + entry := view.LookupEntry(&origin.Hash) + if entry == nil { + continue + } + + pkScript := entry.PkScriptByIndex(origin.Index) + idx.indexPkScript(data, pkScript, txIdx) + } + } + + for _, txOut := range tx.MsgTx().TxOut { + idx.indexPkScript(data, txOut.PkScript, txIdx) + } + } +} + +// ConnectBlock is invoked by the index manager when a new block has been +// connected to the main chain. This indexer adds a mapping for each address +// the transactions in the block involve. +// +// This is part of the Indexer interface. +func (idx *AddrIndex) ConnectBlock(dbTx database.Tx, block *btcutil.Block, view *blockchain.UtxoViewpoint) error { + // The offset and length of the transactions within the serialized + // block. + txLocs, err := block.TxLoc() + if err != nil { + return err + } + + // Get the internal block ID associated with the block. + blockID, err := dbFetchBlockIDByHash(dbTx, block.Sha()) + if err != nil { + return err + } + + // Build all of the address to transaction mappings in a local map. + addrsToTxns := make(writeIndexData) + idx.indexBlock(addrsToTxns, block, view) + + // Add all of the index entries for each address. + addrIdxBucket := dbTx.Metadata().Bucket(addrIndexKey) + for addrKey, txIdxs := range addrsToTxns { + for _, txIdx := range txIdxs { + err := dbPutAddrIndexEntry(addrIdxBucket, addrKey, + blockID, txLocs[txIdx]) + if err != nil { + return err + } + } + } + + return nil +} + +// DisconnectBlock is invoked by the index manager when a block has been +// disconnected from the main chain. This indexer removes the address mappings +// each transaction in the block involve. +// +// This is part of the Indexer interface. +func (idx *AddrIndex) DisconnectBlock(dbTx database.Tx, block *btcutil.Block, view *blockchain.UtxoViewpoint) error { + // Build all of the address to transaction mappings in a local map. + addrsToTxns := make(writeIndexData) + idx.indexBlock(addrsToTxns, block, view) + + // Remove all of the index entries for each address. + bucket := dbTx.Metadata().Bucket(addrIndexKey) + for addrKey, txIdxs := range addrsToTxns { + err := dbRemoveAddrIndexEntries(bucket, addrKey, len(txIdxs)) + if err != nil { + return err + } + } + + return nil +} + +// TxRegionsForAddress returns a slice of block regions which identify each +// transaction that involves the passed address according to the specified +// number to skip, number requested, and whether or not the results should be +// reversed. It also returns the number actually skipped since it could be less +// in the case where there are not enough entries. +// +// NOTE: These results only include transactions confirmed in blocks. See the +// UnconfirmedTxnsForAddress method for obtaining unconfirmed transactions +// that involve a given address. +// +// This function is safe for concurrent access. +func (idx *AddrIndex) TxRegionsForAddress(dbTx database.Tx, addr btcutil.Address, numToSkip, numRequested uint32, reverse bool) ([]database.BlockRegion, uint32, error) { + addrKey, err := addrToKey(addr) + if err != nil { + return nil, 0, err + } + + var regions []database.BlockRegion + var skipped uint32 + err = idx.db.View(func(dbTx database.Tx) error { + // Create closure to lookup the block hash given the ID using + // the database transaction. + fetchBlockHash := func(id []byte) (*wire.ShaHash, error) { + // Deserialize and populate the result. + return dbFetchBlockHashBySerializedID(dbTx, id) + } + + var err error + addrIdxBucket := dbTx.Metadata().Bucket(addrIndexKey) + regions, skipped, err = dbFetchAddrIndexEntries(addrIdxBucket, + addrKey, numToSkip, numRequested, reverse, + fetchBlockHash) + return err + }) + + return regions, skipped, err +} + +// indexUnconfirmedAddresses modifies the unconfirmed (memory-only) address +// index to include mappings for the addresses encoded by the passed public key +// script to the transaction. +// +// This function is safe for concurrent access. +func (idx *AddrIndex) indexUnconfirmedAddresses(pkScript []byte, tx *btcutil.Tx) { + // The error is ignored here since the only reason it can fail is if the + // script fails to parse and it was already validated before being + // admitted to the mempool. + _, addresses, _, _ := txscript.ExtractPkScriptAddrs(pkScript, + idx.chainParams) + for _, addr := range addresses { + // Ignore unsupported address types. + addrKey, err := addrToKey(addr) + if err != nil { + continue + } + + // Add a mapping from the address to the transaction. + idx.unconfirmedLock.Lock() + addrIndexEntry := idx.txnsByAddr[addrKey] + if addrIndexEntry == nil { + addrIndexEntry = make(map[wire.ShaHash]*btcutil.Tx) + idx.txnsByAddr[addrKey] = addrIndexEntry + } + addrIndexEntry[*tx.Sha()] = tx + + // Add a mapping from the transaction to the address. + addrsByTxEntry := idx.addrsByTx[*tx.Sha()] + if addrsByTxEntry == nil { + addrsByTxEntry = make(map[[addrKeySize]byte]struct{}) + idx.addrsByTx[*tx.Sha()] = addrsByTxEntry + } + addrsByTxEntry[addrKey] = struct{}{} + idx.unconfirmedLock.Unlock() + } +} + +// AddUnconfirmedTx adds all addresses related to the transaction to the +// unconfirmed (memory-only) address index. +// +// NOTE: This transaction MUST have already been validated by the memory pool +// before calling this function with it and have all of the inputs available in +// the provided utxo view. Failure to do so could result in some or all +// addresses not being indexed. +// +// This function is safe for concurrent access. +func (idx *AddrIndex) AddUnconfirmedTx(tx *btcutil.Tx, utxoView *blockchain.UtxoViewpoint) { + // Index addresses of all referenced previous transaction outputs. + // + // The existence checks are elided since this is only called after the + // transaction has already been validated and thus all inputs are + // already known to exist. + for _, txIn := range tx.MsgTx().TxIn { + entry := utxoView.LookupEntry(&txIn.PreviousOutPoint.Hash) + if entry == nil { + // Ignore missing entries. This should never happen + // in practice since the function comments specifically + // call out all inputs must be available. + continue + } + pkScript := entry.PkScriptByIndex(txIn.PreviousOutPoint.Index) + idx.indexUnconfirmedAddresses(pkScript, tx) + } + + // Index addresses of all created outputs. + for _, txOut := range tx.MsgTx().TxOut { + idx.indexUnconfirmedAddresses(txOut.PkScript, tx) + } +} + +// RemoveUnconfirmedTx removes the passed transaction from the unconfirmed +// (memory-only) address index. +// +// This function is safe for concurrent access. +func (idx *AddrIndex) RemoveUnconfirmedTx(hash *wire.ShaHash) { + idx.unconfirmedLock.Lock() + defer idx.unconfirmedLock.Unlock() + + // Remove all address references to the transaction from the address + // index and remove the entry for the address altogether if it no longer + // references any transactions. + for addrKey := range idx.addrsByTx[*hash] { + delete(idx.txnsByAddr[addrKey], *hash) + if len(idx.txnsByAddr[addrKey]) == 0 { + delete(idx.txnsByAddr, addrKey) + } + } + + // Remove the entry from the transaction to address lookup map as well. + delete(idx.addrsByTx, *hash) +} + +// UnconfirmedTxnsForAddress returns all transactions currently in the +// unconfirmed (memory-only) address index that involve the passed address. +// Unsupported address types are ignored and will result in no results. +// +// This function is safe for concurrent access. +func (idx *AddrIndex) UnconfirmedTxnsForAddress(addr btcutil.Address) []*btcutil.Tx { + // Ignore unsupported address types. + addrKey, err := addrToKey(addr) + if err != nil { + return nil + } + + // Protect concurrent access. + idx.unconfirmedLock.RLock() + defer idx.unconfirmedLock.RUnlock() + + // Return a new slice with the results if there are any. This ensures + // safe concurrency. + if txns, exists := idx.txnsByAddr[addrKey]; exists { + addressTxns := make([]*btcutil.Tx, 0, len(txns)) + for _, tx := range txns { + addressTxns = append(addressTxns, tx) + } + return addressTxns + } + + return nil +} + +// NewAddrIndex returns a new instance of an indexer that is used to create a +// mapping of all addresses in the blockchain to the respective transactions +// that involve them. +// +// It implements the Indexer interface which plugs into the IndexManager that in +// turn is used by the blockchain package. This allows the index to be +// seamlessly maintained along with the chain. +func NewAddrIndex(db database.DB, chainParams *chaincfg.Params) *AddrIndex { + return &AddrIndex{ + db: db, + chainParams: chainParams, + txnsByAddr: make(map[[addrKeySize]byte]map[wire.ShaHash]*btcutil.Tx), + addrsByTx: make(map[wire.ShaHash]map[[addrKeySize]byte]struct{}), + } +} + +// DropAddrIndex drops the address index from the provided database if it +// exists. +func DropAddrIndex(db database.DB) error { + return dropIndex(db, addrIndexKey, addrIndexName) +} diff --git a/blockchain/indexers/addrindex_test.go b/blockchain/indexers/addrindex_test.go new file mode 100644 index 00000000..92fce538 --- /dev/null +++ b/blockchain/indexers/addrindex_test.go @@ -0,0 +1,275 @@ +// Copyright (c) 2016 The btcsuite developers +// Use of this source code is governed by an ISC +// license that can be found in the LICENSE file. + +package indexers + +import ( + "bytes" + "fmt" + "testing" + + "github.com/btcsuite/btcd/wire" +) + +// addrIndexBucket provides a mock address index database bucket by implementing +// the internalBucket interface. +type addrIndexBucket struct { + levels map[[levelKeySize]byte][]byte +} + +// Clone returns a deep copy of the mock adress index bucket. +func (b *addrIndexBucket) Clone() *addrIndexBucket { + levels := make(map[[levelKeySize]byte][]byte) + for k, v := range b.levels { + vCopy := make([]byte, len(v)) + copy(vCopy, v) + levels[k] = vCopy + } + return &addrIndexBucket{levels: levels} +} + +// Get returns the value associated with the key from the mock address index +// bucket. +// +// This is part of the internalBucket interface. +func (b *addrIndexBucket) Get(key []byte) []byte { + var levelKey [levelKeySize]byte + copy(levelKey[:], key) + return b.levels[levelKey] +} + +// Put stores the provided key/value pair to the mock address index bucket. +// +// This is part of the internalBucket interface. +func (b *addrIndexBucket) Put(key []byte, value []byte) error { + var levelKey [levelKeySize]byte + copy(levelKey[:], key) + b.levels[levelKey] = value + return nil +} + +// Delete removes the provided key from the mock address index bucket. +// +// This is part of the internalBucket interface. +func (b *addrIndexBucket) Delete(key []byte) error { + var levelKey [levelKeySize]byte + copy(levelKey[:], key) + delete(b.levels, levelKey) + return nil +} + +// printLevels returns a string with a visual representation of the provided +// address key taking into account the max size of each level. It is useful +// when creating and debugging test cases. +func (b *addrIndexBucket) printLevels(addrKey [addrKeySize]byte) string { + highestLevel := uint8(0) + for k := range b.levels { + if !bytes.Equal(k[:levelOffset], addrKey[:]) { + continue + } + level := uint8(k[levelOffset]) + if level > highestLevel { + highestLevel = level + } + } + + var levelBuf bytes.Buffer + _, _ = levelBuf.WriteString("\n") + maxEntries := level0MaxEntries + for level := uint8(0); level <= highestLevel; level++ { + data := b.levels[keyForLevel(addrKey, level)] + numEntries := len(data) / txEntrySize + for i := 0; i < numEntries; i++ { + start := i * txEntrySize + num := byteOrder.Uint32(data[start:]) + _, _ = levelBuf.WriteString(fmt.Sprintf("%02d ", num)) + } + for i := numEntries; i < maxEntries; i++ { + _, _ = levelBuf.WriteString("_ ") + } + _, _ = levelBuf.WriteString("\n") + maxEntries *= 2 + } + + return levelBuf.String() +} + +// sanityCheck ensures that all data stored in the bucket for the given address +// adheres to the level-based rules described by the address index +// documentation. +func (b *addrIndexBucket) sanityCheck(addrKey [addrKeySize]byte, expectedTotal int) error { + // Find the highest level for the key. + highestLevel := uint8(0) + for k := range b.levels { + if !bytes.Equal(k[:levelOffset], addrKey[:]) { + continue + } + level := uint8(k[levelOffset]) + if level > highestLevel { + highestLevel = level + } + } + + // Ensure the expected total number of entries are present and that + // all levels adhere to the rules described in the address index + // documentation. + var totalEntries int + maxEntries := level0MaxEntries + for level := uint8(0); level <= highestLevel; level++ { + // Level 0 can'have more entries than the max allowed if the + // levels after it have data and it can't be empty. All other + // levels must either be half full or full. + data := b.levels[keyForLevel(addrKey, level)] + numEntries := len(data) / txEntrySize + totalEntries += numEntries + if level == 0 { + if (highestLevel != 0 && numEntries == 0) || + numEntries > maxEntries { + + return fmt.Errorf("level %d has %d entries", + level, numEntries) + } + } else if numEntries != maxEntries && numEntries != maxEntries/2 { + return fmt.Errorf("level %d has %d entries", level, + numEntries) + } + maxEntries *= 2 + } + if totalEntries != expectedTotal { + return fmt.Errorf("expected %d entries - got %d", expectedTotal, + totalEntries) + } + + // Ensure all of the numbers are in order starting from the highest + // level moving to the lowest level. + expectedNum := uint32(0) + for level := highestLevel + 1; level > 0; level-- { + data := b.levels[keyForLevel(addrKey, level)] + numEntries := len(data) / txEntrySize + for i := 0; i < numEntries; i++ { + start := i * txEntrySize + num := byteOrder.Uint32(data[start:]) + if num != expectedNum { + return fmt.Errorf("level %d offset %d does "+ + "not contain the expected number of "+ + "%d - got %d", level, i, num, + expectedNum) + } + expectedNum++ + } + } + + return nil +} + +// TestAddrIndexLevels ensures that adding and deleting entries to the address +// index creates multiple levels as decribed by the address index documentation. +func TestAddrIndexLevels(t *testing.T) { + t.Parallel() + + tests := []struct { + name string + key [addrKeySize]byte + numInsert int + printLevels bool // Set to help debug a specific test. + }{ + { + name: "level 0 not full", + numInsert: level0MaxEntries - 1, + }, + { + name: "level 1 half", + numInsert: level0MaxEntries + 1, + }, + { + name: "level 1 full", + numInsert: level0MaxEntries*2 + 1, + }, + { + name: "level 2 half, level 1 half", + numInsert: level0MaxEntries*3 + 1, + }, + { + name: "level 2 half, level 1 full", + numInsert: level0MaxEntries*4 + 1, + }, + { + name: "level 2 full, level 1 half", + numInsert: level0MaxEntries*5 + 1, + }, + { + name: "level 2 full, level 1 full", + numInsert: level0MaxEntries*6 + 1, + }, + { + name: "level 3 half, level 2 half, level 1 half", + numInsert: level0MaxEntries*7 + 1, + }, + { + name: "level 3 full, level 2 half, level 1 full", + numInsert: level0MaxEntries*12 + 1, + }, + } + +nextTest: + for testNum, test := range tests { + // Insert entries in order. + populatedBucket := &addrIndexBucket{ + levels: make(map[[levelKeySize]byte][]byte), + } + for i := 0; i < test.numInsert; i++ { + txLoc := wire.TxLoc{TxStart: i * 2} + err := dbPutAddrIndexEntry(populatedBucket, test.key, + uint32(i), txLoc) + if err != nil { + t.Errorf("dbPutAddrIndexEntry #%d (%s) - "+ + "unexpected error: %v", testNum, + test.name, err) + continue nextTest + } + } + if test.printLevels { + t.Log(populatedBucket.printLevels(test.key)) + } + + // Delete entries from the populated bucket until all entries + // have been deleted. The bucket is reset to the fully + // populated bucket on each iteration so every combination is + // tested. Notice the upper limit purposes exceeds the number + // of entries to ensure attempting to delete more entries than + // there are works correctly. + for numDelete := 0; numDelete <= test.numInsert+1; numDelete++ { + // Clone populated bucket to run each delete against. + bucket := populatedBucket.Clone() + + // Remove the number of entries for this iteration. + err := dbRemoveAddrIndexEntries(bucket, test.key, + numDelete) + if err != nil { + if numDelete <= test.numInsert { + t.Errorf("dbRemoveAddrIndexEntries (%s) "+ + " delete %d - unexpected error: "+ + "%v", test.name, numDelete, err) + continue nextTest + } + } + if test.printLevels { + t.Log(bucket.printLevels(test.key)) + } + + // Sanity check the levels to ensure the adhere to all + // rules. + numExpected := test.numInsert + if numDelete <= test.numInsert { + numExpected -= numDelete + } + err = bucket.sanityCheck(test.key, numExpected) + if err != nil { + t.Errorf("sanity check fail (%s) delete %d: %v", + test.name, numDelete, err) + continue nextTest + } + } + } +} diff --git a/blockchain/indexers/blocklogger.go b/blockchain/indexers/blocklogger.go new file mode 100644 index 00000000..88d6f269 --- /dev/null +++ b/blockchain/indexers/blocklogger.go @@ -0,0 +1,76 @@ +// Copyright (c) 2016 The btcsuite developers +// Use of this source code is governed by an ISC +// license that can be found in the LICENSE file. + +package indexers + +import ( + "sync" + "time" + + "github.com/btcsuite/btclog" + "github.com/btcsuite/btcutil" +) + +// blockProgressLogger provides periodic logging for other services in order +// to show users progress of certain "actions" involving some or all current +// blocks. Ex: syncing to best chain, indexing all blocks, etc. +type blockProgressLogger struct { + receivedLogBlocks int64 + receivedLogTx int64 + lastBlockLogTime time.Time + + subsystemLogger btclog.Logger + progressAction string + sync.Mutex +} + +// newBlockProgressLogger returns a new block progress logger. +// The progress message is templated as follows: +// {progressAction} {numProcessed} {blocks|block} in the last {timePeriod} +// ({numTxs}, height {lastBlockHeight}, {lastBlockTimeStamp}) +func newBlockProgressLogger(progressMessage string, logger btclog.Logger) *blockProgressLogger { + return &blockProgressLogger{ + lastBlockLogTime: time.Now(), + progressAction: progressMessage, + subsystemLogger: logger, + } +} + +// LogBlockHeight logs a new block height as an information message to show +// progress to the user. In order to prevent spam, it limits logging to one +// message every 10 seconds with duration and totals included. +func (b *blockProgressLogger) LogBlockHeight(block *btcutil.Block) { + b.Lock() + defer b.Unlock() + + b.receivedLogBlocks++ + b.receivedLogTx += int64(len(block.MsgBlock().Transactions)) + + now := time.Now() + duration := now.Sub(b.lastBlockLogTime) + if duration < time.Second*10 { + return + } + + // Truncate the duration to 10s of milliseconds. + durationMillis := int64(duration / time.Millisecond) + tDuration := 10 * time.Millisecond * time.Duration(durationMillis/10) + + // Log information about new block height. + blockStr := "blocks" + if b.receivedLogBlocks == 1 { + blockStr = "block" + } + txStr := "transactions" + if b.receivedLogTx == 1 { + txStr = "transaction" + } + b.subsystemLogger.Infof("%s %d %s in the last %s (%d %s, height %d, %s)", + b.progressAction, b.receivedLogBlocks, blockStr, tDuration, b.receivedLogTx, + txStr, block.Height(), block.MsgBlock().Header.Timestamp) + + b.receivedLogBlocks = 0 + b.receivedLogTx = 0 + b.lastBlockLogTime = now +} diff --git a/blockchain/indexers/common.go b/blockchain/indexers/common.go new file mode 100644 index 00000000..e18fdcdd --- /dev/null +++ b/blockchain/indexers/common.go @@ -0,0 +1,90 @@ +// Copyright (c) 2016 The btcsuite developers +// Use of this source code is governed by an ISC +// license that can be found in the LICENSE file. + +/* +Package indexers implements optional block chain indexes. +*/ +package indexers + +import ( + "encoding/binary" + + "github.com/btcsuite/btcd/blockchain" + database "github.com/btcsuite/btcd/database2" + "github.com/btcsuite/btcutil" +) + +var ( + // byteOrder is the preferred byte order used for serializing numeric + // fields for storage in the database. + byteOrder = binary.LittleEndian +) + +// NeedsInputser provides a generic interface for an indexer to specify the it +// requires the ability to look up inputs for a transaction. +type NeedsInputser interface { + NeedsInputs() bool +} + +// Indexer provides a generic interface for an indexer that is managed by an +// index manager such as the Manager type provided by this package. +type Indexer interface { + // Key returns the key of the index as a byte slice. + Key() []byte + + // Name returns the human-readable name of the index. + Name() string + + // Create is invoked when the indexer manager determines the index needs + // to be created for the first time. + Create(dbTx database.Tx) error + + // Init is invoked when the index manager is first initializing the + // index. This differs from the Create method in that it is called on + // every load, including the case the index was just created. + Init() error + + // ConnectBlock is invoked when the index manager is notified that a new + // block has been connected to the main chain. + ConnectBlock(dbTx database.Tx, block *btcutil.Block, view *blockchain.UtxoViewpoint) error + + // DisconnectBlock is invoked when the index manager is notified that a + // block has been disconnected from the main chain. + DisconnectBlock(dbTx database.Tx, block *btcutil.Block, view *blockchain.UtxoViewpoint) error +} + +// AssertError identifies an error that indicates an internal code consistency +// issue and should be treated as a critical and unrecoverable error. +type AssertError string + +// Error returns the assertion error as a huma-readable string and satisfies +// the error interface. +func (e AssertError) Error() string { + return "assertion failed: " + string(e) +} + +// errDeserialize signifies that a problem was encountered when deserializing +// data. +type errDeserialize string + +// Error implements the error interface. +func (e errDeserialize) Error() string { + return string(e) +} + +// isDeserializeErr returns whether or not the passed error is an errDeserialize +// error. +func isDeserializeErr(err error) bool { + _, ok := err.(errDeserialize) + return ok +} + +// internalBucket is an abstraction over a database bucket. It is used to make +// the code easier to test since it allows mock objects in the tests to only +// implement these functions instead of everything a database.Bucket supports. +type internalBucket interface { + Get(key []byte) []byte + Put(key []byte, value []byte) error + Delete(key []byte) error +} diff --git a/blockchain/indexers/log.go b/blockchain/indexers/log.go new file mode 100644 index 00000000..0172da07 --- /dev/null +++ b/blockchain/indexers/log.go @@ -0,0 +1,30 @@ +// Copyright (c) 2016 The btcsuite developers +// Use of this source code is governed by an ISC +// license that can be found in the LICENSE file. + +package indexers + +import "github.com/btcsuite/btclog" + +// log is a logger that is initialized with no output filters. This +// means the package will not perform any logging by default until the caller +// requests it. +var log btclog.Logger + +// The default amount of logging is none. +func init() { + DisableLog() +} + +// DisableLog disables all library log output. Logging output is disabled +// by default until either UseLogger or SetLogWriter are called. +func DisableLog() { + log = btclog.Disabled +} + +// UseLogger uses a specified Logger to output package logging info. +// This should be used in preference to SetLogWriter if the caller is also +// using btclog. +func UseLogger(logger btclog.Logger) { + log = logger +} diff --git a/blockchain/indexers/manager.go b/blockchain/indexers/manager.go new file mode 100644 index 00000000..928b1f36 --- /dev/null +++ b/blockchain/indexers/manager.go @@ -0,0 +1,653 @@ +// Copyright (c) 2016 The btcsuite developers +// Use of this source code is governed by an ISC +// license that can be found in the LICENSE file. + +package indexers + +import ( + "bytes" + "fmt" + + "github.com/btcsuite/btcd/blockchain" + database "github.com/btcsuite/btcd/database2" + "github.com/btcsuite/btcd/wire" + "github.com/btcsuite/btcutil" +) + +var ( + // indexTipsBucketName is the name of the db bucket used to house the + // current tip of each index. + indexTipsBucketName = []byte("idxtips") +) + +// ----------------------------------------------------------------------------- +// The index manager tracks the current tip of each index by using a parent +// bucket that contains an entry for index. +// +// The serialized format for an index tip is: +// +// [],... +// +// Field Type Size +// block hash wire.ShaHash wire.HashSize +// block height uint32 4 bytes +// ----------------------------------------------------------------------------- + +// dbPutIndexerTip uses an existing database transaction to update or add the +// current tip for the given index to the provided values. +func dbPutIndexerTip(dbTx database.Tx, idxKey []byte, hash *wire.ShaHash, height int32) error { + serialized := make([]byte, wire.HashSize+4) + copy(serialized, hash[:]) + byteOrder.PutUint32(serialized[wire.HashSize:], uint32(height)) + + indexesBucket := dbTx.Metadata().Bucket(indexTipsBucketName) + return indexesBucket.Put(idxKey, serialized) +} + +// dbFetchIndexerTip uses an existing database transaction to retrieve the +// hash and height of the current tip for the provided index. +func dbFetchIndexerTip(dbTx database.Tx, idxKey []byte) (*wire.ShaHash, int32, error) { + indexesBucket := dbTx.Metadata().Bucket(indexTipsBucketName) + serialized := indexesBucket.Get(idxKey) + if len(serialized) < wire.HashSize+4 { + return nil, 0, database.Error{ + ErrorCode: database.ErrCorruption, + Description: fmt.Sprintf("unexpected end of data for "+ + "index %q tip", string(idxKey)), + } + } + + var hash wire.ShaHash + copy(hash[:], serialized[:wire.HashSize]) + height := int32(byteOrder.Uint32(serialized[wire.HashSize:])) + return &hash, height, nil +} + +// dbIndexConnectBlock adds all of the index entries associated with the +// given block using the provided indexer and updates the tip of the indexer +// accordingly. An error will be returned if the current tip for the indexer is +// not the previous block for the passed block. +func dbIndexConnectBlock(dbTx database.Tx, indexer Indexer, block *btcutil.Block, view *blockchain.UtxoViewpoint) error { + // Assert that the block being connected properly connects to the + // current tip of the index. + idxKey := indexer.Key() + curTipHash, _, err := dbFetchIndexerTip(dbTx, idxKey) + if err != nil { + return err + } + if !curTipHash.IsEqual(&block.MsgBlock().Header.PrevBlock) { + return AssertError(fmt.Sprintf("dbIndexConnectBlock must be "+ + "called with a block that extends the current index "+ + "tip (%s, tip %s, block %s)", indexer.Name(), + curTipHash, block.Sha())) + } + + // Notify the indexer with the connected block so it can index it. + if err := indexer.ConnectBlock(dbTx, block, view); err != nil { + return err + } + + // Update the current index tip. + return dbPutIndexerTip(dbTx, idxKey, block.Sha(), block.Height()) +} + +// dbIndexDisconnectBlock removes all of the index entries associated with the +// given block using the provided indexer and updates the tip of the indexer +// accordingly. An error will be returned if the current tip for the indexer is +// not the passed block. +func dbIndexDisconnectBlock(dbTx database.Tx, indexer Indexer, block *btcutil.Block, view *blockchain.UtxoViewpoint) error { + // Assert that the block being disconnected is the current tip of the + // index. + idxKey := indexer.Key() + curTipHash, _, err := dbFetchIndexerTip(dbTx, idxKey) + if err != nil { + return err + } + if !curTipHash.IsEqual(block.Sha()) { + return AssertError(fmt.Sprintf("dbIndexDisconnectBlock must "+ + "be called with the block at the current index tip "+ + "(%s, tip %s, block %s)", indexer.Name(), + curTipHash, block.Sha())) + } + + // Notify the indexer with the disconnected block so it can remove all + // of the appropriate entries. + if err := indexer.DisconnectBlock(dbTx, block, view); err != nil { + return err + } + + // Update the current index tip. + prevHash := &block.MsgBlock().Header.PrevBlock + return dbPutIndexerTip(dbTx, idxKey, prevHash, block.Height()-1) +} + +// Manager defines an index manager that manages multiple optional indexes and +// implements the blockchain.IndexManager interface so it can be seamlessly +// plugged into normal chain processing. +type Manager struct { + db database.DB + enabledIndexes []Indexer +} + +// Ensure the Manager type implements the blockchain.IndexManager interface. +var _ blockchain.IndexManager = (*Manager)(nil) + +// indexDropKey returns the key for an index which indicates it is in the +// process of being dropped. +func indexDropKey(idxKey []byte) []byte { + dropKey := make([]byte, len(idxKey)+1) + dropKey[0] = 'd' + copy(dropKey[1:], idxKey) + return dropKey +} + +// maybeFinishDrops determines if each of the enabled indexes are in the middle +// of being dropped and finishes dropping them when the are. This is necessary +// because dropping and index has to be done in several atomic steps rather than +// one big atomic step due to the massive number of entries. +func (m *Manager) maybeFinishDrops() error { + indexNeedsDrop := make([]bool, len(m.enabledIndexes)) + err := m.db.View(func(dbTx database.Tx) error { + // None of the indexes needs to be dropped if the index tips + // bucket hasn't been created yet. + indexesBucket := dbTx.Metadata().Bucket(indexTipsBucketName) + if indexesBucket == nil { + return nil + } + + // Make the indexer as requiring a drop if one is already in + // progress. + for i, indexer := range m.enabledIndexes { + dropKey := indexDropKey(indexer.Key()) + if indexesBucket.Get(dropKey) != nil { + indexNeedsDrop[i] = true + } + } + + return nil + }) + if err != nil { + return err + } + + // Finish dropping any of the enabled indexes that are already in the + // middle of being dropped. + for i, indexer := range m.enabledIndexes { + if !indexNeedsDrop[i] { + continue + } + + log.Infof("Resuming %s drop", indexer.Name()) + err := dropIndex(m.db, indexer.Key(), indexer.Name()) + if err != nil { + return err + } + } + + return nil +} + +// maybeCreateIndexes determines if each of the enabled indexes have already +// been created and creates them if not. +func (m *Manager) maybeCreateIndexes(dbTx database.Tx) error { + indexesBucket := dbTx.Metadata().Bucket(indexTipsBucketName) + for _, indexer := range m.enabledIndexes { + // Nothing to do if the index tip already exists. + idxKey := indexer.Key() + if indexesBucket.Get(idxKey) != nil { + continue + } + + // The tip for the index does not exist, so create it and + // invoke the create callback for the index so it can perform + // any one-time initialization it requires. + if err := indexer.Create(dbTx); err != nil { + return err + } + + // Set the tip for the index to values which represent an + // uninitialized index. + err := dbPutIndexerTip(dbTx, idxKey, &wire.ShaHash{}, -1) + if err != nil { + return err + } + } + + return nil +} + +// Init initializes the enabled indexes. This is called during chain +// initialization and primarily consists of catching up all indexes to the +// current best chain tip. This is necessary since each index can be disabled +// and re-enabled at any time and attempting to catch-up indexes at the same +// time new blocks are being downloaded would lead to an overall longer time to +// catch up due to the I/O contention. +// +// This is part of the blockchain.IndexManager interface. +func (m *Manager) Init(chain *blockchain.BlockChain) error { + // Nothing to do when no indexes are enabled. + if len(m.enabledIndexes) == 0 { + return nil + } + + // Finish and drops that were previously interrupted. + if err := m.maybeFinishDrops(); err != nil { + return err + } + + // Create the initial state for the indexes as needed. + err := m.db.Update(func(dbTx database.Tx) error { + // Create the bucket for the current tips as needed. + meta := dbTx.Metadata() + _, err := meta.CreateBucketIfNotExists(indexTipsBucketName) + if err != nil { + return err + } + + return m.maybeCreateIndexes(dbTx) + }) + if err != nil { + return err + } + + // Initialize each of the enabled indexes. + for _, indexer := range m.enabledIndexes { + if err := indexer.Init(); err != nil { + return err + } + } + + // Rollback indexes to the main chain if their tip is an orphaned fork. + // This is fairly unlikely, but it can happen if the chain is + // reorganized while the index is disabled. This has to be done in + // reverse order because later indexes can depend on earlier ones. + for i := len(m.enabledIndexes); i > 0; i-- { + indexer := m.enabledIndexes[i-1] + + // Fetch the current tip for the index. + var height int32 + var hash *wire.ShaHash + err := m.db.View(func(dbTx database.Tx) error { + idxKey := indexer.Key() + hash, height, err = dbFetchIndexerTip(dbTx, idxKey) + if err != nil { + return err + } + return nil + }) + if err != nil { + return err + } + + // Nothing to do if the index does not have any entries yet. + if height == -1 { + continue + } + + // Loop until the tip is a block that exists in the main chain. + initialHeight := height + for { + exists, err := chain.MainChainHasBlock(hash) + if err != nil { + return err + } + if exists { + break + } + + // At this point the index tip is orphaned, so load the + // orphaned block from the database directly and + // disconnect it from the index. The block has to be + // loaded directly since it is no longer in the main + // chain and thus the chain.BlockByHash function would + // error. + err = m.db.Update(func(dbTx database.Tx) error { + blockBytes, err := dbTx.FetchBlock(hash) + if err != nil { + return err + } + block, err := btcutil.NewBlockFromBytes(blockBytes) + if err != nil { + return err + } + block.SetHeight(height) + + // When the index requires all of the referenced + // txouts they need to be retrieved from the + // transaction index. + var view *blockchain.UtxoViewpoint + if indexNeedsInputs(indexer) { + var err error + view, err = makeUtxoView(dbTx, block) + if err != nil { + return err + } + } + + // Remove all of the index entries associated + // with the block and update the indexer tip. + err = dbIndexDisconnectBlock(dbTx, indexer, + block, view) + if err != nil { + return err + } + + // Update the tip to the previous block. + hash = &block.MsgBlock().Header.PrevBlock + height-- + + return nil + }) + if err != nil { + return err + } + } + + if initialHeight != height { + log.Infof("Removed %d orphaned blocks from %s "+ + "(heights %d to %d)", initialHeight-height, + indexer.Name(), height+1, initialHeight) + } + } + + // Fetch the current tip heights for each index along with tracking the + // lowest one so the catchup code only needs to start at the earliest + // block and is able to skip connecting the block for the indexes that + // don't need it. + bestHeight := chain.BestSnapshot().Height + lowestHeight := bestHeight + indexerHeights := make([]int32, len(m.enabledIndexes)) + err = m.db.View(func(dbTx database.Tx) error { + for i, indexer := range m.enabledIndexes { + idxKey := indexer.Key() + hash, height, err := dbFetchIndexerTip(dbTx, idxKey) + if err != nil { + return err + } + + log.Debugf("Current %s tip (height %d, hash %v)", + indexer.Name(), height, hash) + indexerHeights[i] = height + if height < lowestHeight { + lowestHeight = height + } + } + return nil + }) + if err != nil { + return err + } + + // Nothing to index if all of the indexes are caught up. + if lowestHeight == bestHeight { + return nil + } + + // Create a progress logger for the indexing process below. + progressLogger := newBlockProgressLogger("Indexed", log) + + // At this point, one or more indexes are behind the current best chain + // tip and need to be caught up, so log the details and loop through + // each block that needs to be indexed. + log.Infof("Catching up indexes from height %d to %d", lowestHeight, + bestHeight) + for height := lowestHeight + 1; height <= bestHeight; height++ { + // Load the block for the height since it is required to index + // it. + block, err := chain.BlockByHeight(height) + if err != nil { + return err + } + + // Connect the block for all indexes that need it. + var view *blockchain.UtxoViewpoint + for i, indexer := range m.enabledIndexes { + // Skip indexes that don't need to be updated with this + // block. + if indexerHeights[i] >= height { + continue + } + + err := m.db.Update(func(dbTx database.Tx) error { + // When the index requires all of the referenced + // txouts and they haven't been loaded yet, they + // need to be retrieved from the transaction + // index. + if view == nil && indexNeedsInputs(indexer) { + var err error + view, err = makeUtxoView(dbTx, block) + if err != nil { + return err + } + } + return dbIndexConnectBlock(dbTx, indexer, block, + view) + }) + if err != nil { + return err + } + indexerHeights[i] = height + } + + // Log indexing progress. + progressLogger.LogBlockHeight(block) + } + + log.Infof("Indexes caught up to height %d", bestHeight) + return nil +} + +// indexNeedsInputs returns whether or not the index needs access to the txouts +// referenced by the transaction inputs being indexed. +func indexNeedsInputs(index Indexer) bool { + if idx, ok := index.(NeedsInputser); ok { + return idx.NeedsInputs() + } + + return false +} + +// dbFetchTx looks up the passed transaction hash in the transaction index and +// loads it from the database. +func dbFetchTx(dbTx database.Tx, hash *wire.ShaHash) (*wire.MsgTx, error) { + // Look up the location of the transaction. + blockRegion, err := dbFetchTxIndexEntry(dbTx, hash) + if err != nil { + return nil, err + } + if blockRegion == nil { + return nil, fmt.Errorf("transaction %v not found", hash) + } + + // Load the raw transaction bytes from the database. + txBytes, err := dbTx.FetchBlockRegion(blockRegion) + if err != nil { + return nil, err + } + + // Deserialize the transaction. + var msgTx wire.MsgTx + err = msgTx.Deserialize(bytes.NewReader(txBytes)) + if err != nil { + return nil, err + } + + return &msgTx, nil +} + +// makeUtxoView creates a mock unspent transaction output view by using the +// transaction index in order to look up all inputs referenced by the +// transactions in the block. This is sometimes needed when catching indexes up +// because many of the txouts could actually already be spent however the +// associated scripts are still required to index them. +func makeUtxoView(dbTx database.Tx, block *btcutil.Block) (*blockchain.UtxoViewpoint, error) { + view := blockchain.NewUtxoViewpoint() + for txIdx, tx := range block.Transactions() { + // Coinbases do not reference any inputs. Since the block is + // required to have already gone through full validation, it has + // already been proven on the first transaction in the block is + // a coinbase. + if txIdx == 0 { + continue + } + + // Use the transaction index to load all of the referenced + // inputs and add their outputs to the view. + for _, txIn := range tx.MsgTx().TxIn { + originOut := &txIn.PreviousOutPoint + originTx, err := dbFetchTx(dbTx, &originOut.Hash) + if err != nil { + return nil, err + } + + view.AddTxOuts(btcutil.NewTx(originTx), 0) + } + } + + return view, nil +} + +// ConnectBlock must be invoked when a block is extending the main chain. It +// keeps track of the state of each index it is managing, performs some sanity +// checks, and invokes each indexer. +// +// This is part of the blockchain.IndexManager interface. +func (m *Manager) ConnectBlock(dbTx database.Tx, block *btcutil.Block, view *blockchain.UtxoViewpoint) error { + // Call each of the currently active optional indexes with the block + // being connected so they can update accordingly. + for _, index := range m.enabledIndexes { + err := dbIndexConnectBlock(dbTx, index, block, view) + if err != nil { + return err + } + } + return nil +} + +// DisconnectBlock must be invoked when a block is being disconnected from the +// end of the main chain. It keeps track of the state of each index it is +// managing, performs some sanity checks, and invokes each indexer to remove +// the index entries associated with the block. +// +// This is part of the blockchain.IndexManager interface. +func (m *Manager) DisconnectBlock(dbTx database.Tx, block *btcutil.Block, view *blockchain.UtxoViewpoint) error { + // Call each of the currently active optional indexes with the block + // being disconnected so they can update accordingly. + for _, index := range m.enabledIndexes { + err := dbIndexDisconnectBlock(dbTx, index, block, view) + if err != nil { + return err + } + } + return nil +} + +// NewManager returns a new index manager with the provided indexes enabled. +// +// The manager returned satisfies the blockchain.IndexManager interface and thus +// cleanly plugs into the normal blockchain processing path. +func NewManager(db database.DB, enabledIndexes []Indexer) *Manager { + return &Manager{ + db: db, + enabledIndexes: enabledIndexes, + } +} + +// dropIndex drops the passed index from the database. Since indexes can be +// massive, it deletes the index in multiple database transactions in order to +// keep memory usage to reasonable levels. It also marks the drop in progress +// so the drop can be resumed if it is stopped before it is done before the +// index can be used again. +func dropIndex(db database.DB, idxKey []byte, idxName string) error { + // Nothing to do if the index doesn't already exist. + var needsDelete bool + err := db.View(func(dbTx database.Tx) error { + indexesBucket := dbTx.Metadata().Bucket(indexTipsBucketName) + if indexesBucket != nil && indexesBucket.Get(idxKey) != nil { + needsDelete = true + } + return nil + }) + if err != nil { + return err + } + if !needsDelete { + log.Infof("Not dropping %s because it does not exist", idxName) + return nil + } + + // Mark that the index is in the process of being dropped so that it + // can be resumed on the next start if interrupted before the process is + // complete. + log.Infof("Dropping all %s entries. This might take a while...", + idxName) + err = db.Update(func(dbTx database.Tx) error { + indexesBucket := dbTx.Metadata().Bucket(indexTipsBucketName) + return indexesBucket.Put(indexDropKey(idxKey), idxKey) + }) + if err != nil { + return err + } + + // Since the indexes can be so large, attempting to simply delete + // the bucket in a single database transaction would result in massive + // memory usage and likely crash many systems due to ulimits. In order + // to avoid this, use a cursor to delete a maximum number of entries out + // of the bucket at a time. + const maxDeletions = 2000000 + var totalDeleted uint64 + for numDeleted := maxDeletions; numDeleted == maxDeletions; { + numDeleted = 0 + err := db.Update(func(dbTx database.Tx) error { + bucket := dbTx.Metadata().Bucket(idxKey) + cursor := bucket.Cursor() + for ok := cursor.First(); ok; ok = cursor.Next() && + numDeleted < maxDeletions { + + if err := cursor.Delete(); err != nil { + return err + } + numDeleted++ + } + return nil + }) + if err != nil { + return err + } + + if numDeleted > 0 { + totalDeleted += uint64(numDeleted) + log.Infof("Deleted %d keys (%d total) from %s", + numDeleted, totalDeleted, idxName) + } + } + + // Call extra index specific deinitialization for the transaction index. + if idxName == txIndexName { + if err := dropBlockIDIndex(db); err != nil { + return err + } + } + + // Remove the index tip, index bucket, and in-progress drop flag now + // that all index entries have been removed. + err = db.Update(func(dbTx database.Tx) error { + meta := dbTx.Metadata() + indexesBucket := meta.Bucket(indexTipsBucketName) + if err := indexesBucket.Delete(idxKey); err != nil { + return err + } + + if err := meta.DeleteBucket(idxKey); err != nil { + return err + } + + return indexesBucket.Delete(indexDropKey(idxKey)) + }) + if err != nil { + return err + } + + log.Infof("Dropped %s", idxName) + return nil +} diff --git a/blockchain/indexers/txindex.go b/blockchain/indexers/txindex.go new file mode 100644 index 00000000..0c6b207d --- /dev/null +++ b/blockchain/indexers/txindex.go @@ -0,0 +1,477 @@ +// Copyright (c) 2016 The btcsuite developers +// Use of this source code is governed by an ISC +// license that can be found in the LICENSE file. + +package indexers + +import ( + "errors" + "fmt" + + "github.com/btcsuite/btcd/blockchain" + database "github.com/btcsuite/btcd/database2" + "github.com/btcsuite/btcd/wire" + "github.com/btcsuite/btcutil" +) + +const ( + // txIndexName is the human-readable name for the index. + txIndexName = "transaction index" +) + +var ( + // txIndexKey is the key of the transaction index and the db bucket used + // to house it. + txIndexKey = []byte("txbyhashidx") + + // idByHashIndexBucketName is the name of the db bucket used to house + // the block id -> block hash index. + idByHashIndexBucketName = []byte("idbyhashidx") + + // hashByIDIndexBucketName is the name of the db bucket used to house + // the block hash -> block id index. + hashByIDIndexBucketName = []byte("hashbyididx") + + // errNoBlockIDEntry is an error that indicates a requested entry does + // not exist in the block ID index. + errNoBlockIDEntry = errors.New("no entry in the block ID index") +) + +// ----------------------------------------------------------------------------- +// The transaction index consists of an entry for every transaction in the main +// chain. In order to significanly optimize the space requirements a separate +// index which provides an internal mapping between each block that has been +// indexed and a unique ID for use within the hash to location mappings. The ID +// is simply a sequentially incremented uint32. This is useful because it is +// only 4 bytes versus 32 bytes hashes and thus saves a ton of space in the +// index. +// +// There are three buckets used in total. The first bucket maps the hash of +// each transaction to the specific block location. The second bucket maps the +// hash of each block to the unique ID and the third maps that ID back to the +// block hash. +// +// NOTE: Although it is technically possible for multiple transactions to have +// the same hash as long as the previous transaction with the same hash is fully +// spent, this code only stores the most recent one because doing otherwise +// would add a non-trivial amount of space and overhead for something that will +// realistically never happen per the probability and even if it did, the old +// one must be fully spent and so the most likely transaction a caller would +// want for a given hash is the most recent one anyways. +// +// The serialized format for keys and values in the block hash to ID bucket is: +// = +// +// Field Type Size +// hash wire.ShaHash 32 bytes +// ID uint32 4 bytes +// ----- +// Total: 36 bytes +// +// The serialized format for keys and values in the ID to block hash bucket is: +// = +// +// Field Type Size +// ID uint32 4 bytes +// hash wire.ShaHash 32 bytes +// ----- +// Total: 36 bytes +// +// The serialized format for the keys and values in the tx index bucket is: +// +// = +// +// Field Type Size +// txhash wire.ShaHash 32 bytes +// block id uint32 4 bytes +// start offset uint32 4 bytes +// tx length uint32 4 bytes +// ----- +// Total: 44 bytes +// ----------------------------------------------------------------------------- + +// dbPutBlockIDIndexEntry uses an existing database transaction to update or add +// the index entries for the hash to id and id to hash mappings for the provided +// values. +func dbPutBlockIDIndexEntry(dbTx database.Tx, hash *wire.ShaHash, id uint32) error { + // Serialize the height for use in the index entries. + var serializedID [4]byte + byteOrder.PutUint32(serializedID[:], id) + + // Add the block hash to ID mapping to the index. + meta := dbTx.Metadata() + hashIndex := meta.Bucket(idByHashIndexBucketName) + if err := hashIndex.Put(hash[:], serializedID[:]); err != nil { + return err + } + + // Add the block ID to hash mapping to the index. + idIndex := meta.Bucket(hashByIDIndexBucketName) + return idIndex.Put(serializedID[:], hash[:]) +} + +// dbRemoveBlockIDIndexEntry uses an existing database transaction remove index +// entries from the hash to id and id to hash mappings for the provided hash. +func dbRemoveBlockIDIndexEntry(dbTx database.Tx, hash *wire.ShaHash) error { + // Remove the block hash to ID mapping. + meta := dbTx.Metadata() + hashIndex := meta.Bucket(idByHashIndexBucketName) + serializedID := hashIndex.Get(hash[:]) + if serializedID == nil { + return nil + } + if err := hashIndex.Delete(hash[:]); err != nil { + return err + } + + // Remove the block ID to hash mapping. + idIndex := meta.Bucket(hashByIDIndexBucketName) + return idIndex.Delete(serializedID) +} + +// dbFetchBlockIDByHash uses an existing database transaction to retrieve the +// block id for the provided hash from the index. +func dbFetchBlockIDByHash(dbTx database.Tx, hash *wire.ShaHash) (uint32, error) { + hashIndex := dbTx.Metadata().Bucket(idByHashIndexBucketName) + serializedID := hashIndex.Get(hash[:]) + if serializedID == nil { + return 0, errNoBlockIDEntry + } + + return byteOrder.Uint32(serializedID), nil +} + +// dbFetchBlockHashBySerializedID uses an existing database transaction to +// retrieve the hash for the provided serialized block id from the index. +func dbFetchBlockHashBySerializedID(dbTx database.Tx, serializedID []byte) (*wire.ShaHash, error) { + idIndex := dbTx.Metadata().Bucket(hashByIDIndexBucketName) + hashBytes := idIndex.Get(serializedID) + if hashBytes == nil { + return nil, errNoBlockIDEntry + } + + var hash wire.ShaHash + copy(hash[:], hashBytes) + return &hash, nil +} + +// dbFetchBlockHashByID uses an existing database transaction to retrieve the +// hash for the provided block id from the index. +func dbFetchBlockHashByID(dbTx database.Tx, id uint32) (*wire.ShaHash, error) { + var serializedID [4]byte + byteOrder.PutUint32(serializedID[:], id) + return dbFetchBlockHashBySerializedID(dbTx, serializedID[:]) +} + +// putTxIndexEntry serializes the provided values according to the format +// described about for a transaction index entry. The target byte slice must +// be at least large enough to handle the number of bytes defined by the +// txEntrySize constant or it will panic. +func putTxIndexEntry(target []byte, blockID uint32, txLoc wire.TxLoc) { + byteOrder.PutUint32(target, blockID) + byteOrder.PutUint32(target[4:], uint32(txLoc.TxStart)) + byteOrder.PutUint32(target[8:], uint32(txLoc.TxLen)) +} + +// dbPutTxIndexEntry uses an existing database transaction to update the +// transaction index given the provided serialized data that is expected to have +// been serialized putTxIndexEntry. +func dbPutTxIndexEntry(dbTx database.Tx, txHash *wire.ShaHash, serializedData []byte) error { + txIndex := dbTx.Metadata().Bucket(txIndexKey) + return txIndex.Put(txHash[:], serializedData) +} + +// dbFetchTxIndexEntry uses an existing database transaction to fetch the block +// region for the provided transaction hash from the transaction index. When +// there is no entry for the provided hash, nil will be returned for the both +// the region and the error. +func dbFetchTxIndexEntry(dbTx database.Tx, txHash *wire.ShaHash) (*database.BlockRegion, error) { + // Load the record from the database and return now if it doesn't exist. + txIndex := dbTx.Metadata().Bucket(txIndexKey) + serializedData := txIndex.Get(txHash[:]) + if len(serializedData) == 0 { + return nil, nil + } + + // Ensure the serialized data has enough bytes to properly deserialize. + if len(serializedData) < 12 { + return nil, database.Error{ + ErrorCode: database.ErrCorruption, + Description: fmt.Sprintf("corrupt transaction index "+ + "entry for %s", txHash), + } + } + + // Load the block hash associated with the block ID. + hash, err := dbFetchBlockHashBySerializedID(dbTx, serializedData[0:4]) + if err != nil { + return nil, database.Error{ + ErrorCode: database.ErrCorruption, + Description: fmt.Sprintf("corrupt transaction index "+ + "entry for %s: %v", txHash, err), + } + } + + // Deserialize the final entry. + region := database.BlockRegion{Hash: &wire.ShaHash{}} + copy(region.Hash[:], hash[:]) + region.Offset = byteOrder.Uint32(serializedData[4:8]) + region.Len = byteOrder.Uint32(serializedData[8:12]) + + return ®ion, nil +} + +// dbAddTxIndexEntries uses an existing database transaction to add a +// transaction index entry for every transaction in the passed block. +func dbAddTxIndexEntries(dbTx database.Tx, block *btcutil.Block, blockID uint32) error { + // The offset and length of the transactions within the serialized + // block. + txLocs, err := block.TxLoc() + if err != nil { + return err + } + + // As an optimization, allocate a single slice big enough to hold all + // of the serialized transaction index entries for the block and + // serialize them directly into the slice. Then, pass the appropriate + // subslice to the database to be written. This approach significantly + // cuts down on the number of required allocations. + offset := 0 + serializedValues := make([]byte, len(block.Transactions())*txEntrySize) + for i, tx := range block.Transactions() { + putTxIndexEntry(serializedValues[offset:], blockID, txLocs[i]) + endOffset := offset + txEntrySize + err := dbPutTxIndexEntry(dbTx, tx.Sha(), + serializedValues[offset:endOffset:endOffset]) + if err != nil { + return err + } + offset += txEntrySize + } + + return nil +} + +// dbRemoveTxIndexEntry uses an existing database transaction to remove the most +// recent transaction index entry for the given hash. +func dbRemoveTxIndexEntry(dbTx database.Tx, txHash *wire.ShaHash) error { + txIndex := dbTx.Metadata().Bucket(txIndexKey) + serializedData := txIndex.Get(txHash[:]) + if len(serializedData) == 0 { + return fmt.Errorf("can't remove non-existent transaction %s "+ + "from the transaction index", txHash) + } + + return txIndex.Delete(txHash[:]) +} + +// dbRemoveTxIndexEntries uses an existing database transaction to remove the +// latest transaction entry for every transaction in the passed block. +func dbRemoveTxIndexEntries(dbTx database.Tx, block *btcutil.Block) error { + for _, tx := range block.Transactions() { + err := dbRemoveTxIndexEntry(dbTx, tx.Sha()) + if err != nil { + return err + } + } + + return nil +} + +// TxIndex implements a transaction by hash index. That is to say, it supports +// querying all transactions by their hash. +type TxIndex struct { + db database.DB + curBlockID uint32 +} + +// Ensure the TxIndex type implements the Indexer interface. +var _ Indexer = (*TxIndex)(nil) + +// Init initializes the hash-based transaction index. In particular, it finds +// the highest used block ID and stores it for later use when connecting or +// disconnecting blocks. +// +// This is part of the Indexer interface. +func (idx *TxIndex) Init() error { + // Find the latest known block id field for the internal block id + // index and initialize it. This is done because it's a lot more + // efficient to do a single search at initialize time than it is to + // write another value to the database on every update. + err := idx.db.View(func(dbTx database.Tx) error { + // Scan forward in large gaps to find a block id that doesn't + // exist yet to serve as an upper bound for the binary search + // below. + var highestKnown, nextUnknown uint32 + testBlockID := uint32(1) + increment := uint32(100000) + for { + _, err := dbFetchBlockHashByID(dbTx, testBlockID) + if err != nil { + nextUnknown = testBlockID + break + } + + highestKnown = testBlockID + testBlockID += increment + } + log.Tracef("Forward scan (highest known %d, next unknown %d)", + highestKnown, nextUnknown) + + // No used block IDs due to new database. + if nextUnknown == 1 { + return nil + } + + // Use a binary search to find the final highest used block id. + // This will take at most ceil(log_2(increment)) attempts. + for { + testBlockID = (highestKnown + nextUnknown) / 2 + _, err := dbFetchBlockHashByID(dbTx, testBlockID) + if err != nil { + nextUnknown = testBlockID + } else { + highestKnown = testBlockID + } + log.Tracef("Binary scan (highest known %d, next "+ + "unknown %d)", highestKnown, nextUnknown) + if highestKnown+1 == nextUnknown { + break + } + } + + idx.curBlockID = highestKnown + return nil + }) + if err != nil { + return err + } + + log.Debugf("Current internal block ID: %d", idx.curBlockID) + return nil +} + +// Key returns the database key to use for the index as a byte slice. +// +// This is part of the Indexer interface. +func (idx *TxIndex) Key() []byte { + return txIndexKey +} + +// Name returns the human-readable name of the index. +// +// This is part of the Indexer interface. +func (idx *TxIndex) Name() string { + return txIndexName +} + +// Create is invoked when the indexer manager determines the index needs +// to be created for the first time. It creates the buckets for the hash-based +// transaction index and the internal block ID indexes. +// +// This is part of the Indexer interface. +func (idx *TxIndex) Create(dbTx database.Tx) error { + meta := dbTx.Metadata() + if _, err := meta.CreateBucket(idByHashIndexBucketName); err != nil { + return err + } + if _, err := meta.CreateBucket(hashByIDIndexBucketName); err != nil { + return err + } + _, err := meta.CreateBucket(txIndexKey) + return err +} + +// ConnectBlock is invoked by the index manager when a new block has been +// connected to the main chain. This indexer adds a hash-to-transaction mapping +// for every transaction in the passed block. +// +// This is part of the Indexer interface. +func (idx *TxIndex) ConnectBlock(dbTx database.Tx, block *btcutil.Block, view *blockchain.UtxoViewpoint) error { + // Increment the internal block ID to use for the block being connected + // and add all of the transactions in the block to the index. + newBlockID := idx.curBlockID + 1 + if err := dbAddTxIndexEntries(dbTx, block, newBlockID); err != nil { + return err + } + + // Add the new block ID index entry for the block being connected and + // update the current internal block ID accordingly. + err := dbPutBlockIDIndexEntry(dbTx, block.Sha(), newBlockID) + if err != nil { + return err + } + idx.curBlockID = newBlockID + return nil +} + +// DisconnectBlock is invoked by the index manager when a block has been +// disconnected from the main chain. This indexer removes the +// hash-to-transaction mapping for every transaction in the block. +// +// This is part of the Indexer interface. +func (idx *TxIndex) DisconnectBlock(dbTx database.Tx, block *btcutil.Block, view *blockchain.UtxoViewpoint) error { + // Remove all of the transactions in the block from the index. + if err := dbRemoveTxIndexEntries(dbTx, block); err != nil { + return err + } + + // Remove the block ID index entry for the block being disconnected and + // decrement the current internal block ID to account for it. + if err := dbRemoveBlockIDIndexEntry(dbTx, block.Sha()); err != nil { + return err + } + idx.curBlockID-- + return nil +} + +// TxBlockRegion returns the block region for the provided transaction hash +// from the transaction index. The block region can in turn be used to load the +// raw transaction bytes. When there is no entry for the provided hash, nil +// will be returned for the both the entry and the error. +// +// This function is safe for concurrent access. +func (idx *TxIndex) TxBlockRegion(hash *wire.ShaHash) (*database.BlockRegion, error) { + var region *database.BlockRegion + err := idx.db.View(func(dbTx database.Tx) error { + var err error + region, err = dbFetchTxIndexEntry(dbTx, hash) + return err + }) + return region, err +} + +// NewTxIndex returns a new instance of an indexer that is used to create a +// mapping of the hashes of all transactions in the blockchain to the respective +// block, location within the block, and size of the transaction. +// +// It implements the Indexer interface which plugs into the IndexManager that in +// turn is used by the blockchain package. This allows the index to be +// seamlessly maintained along with the chain. +func NewTxIndex(db database.DB) *TxIndex { + return &TxIndex{db: db} +} + +// dropBlockIDIndex drops the internal block id index. +func dropBlockIDIndex(db database.DB) error { + return db.Update(func(dbTx database.Tx) error { + meta := dbTx.Metadata() + err := meta.DeleteBucket(idByHashIndexBucketName) + if err != nil { + return err + } + + return meta.DeleteBucket(hashByIDIndexBucketName) + }) +} + +// DropTxIndex drops the transaction index from the provided database if it +// exists. Since the address index relies on it, the address index will also be +// dropped when it exists. +func DropTxIndex(db database.DB) error { + if err := dropIndex(db, addrIndexKey, addrIndexName); err != nil { + return err + } + + return dropIndex(db, txIndexKey, txIndexName) +} diff --git a/blockmanager.go b/blockmanager.go index 074101a0..c50aeca1 100644 --- a/blockmanager.go +++ b/blockmanager.go @@ -576,6 +576,10 @@ func (b *blockManager) handleBlockMsg(bmsg *blockMsg) { bmgrLog.Errorf("Failed to process block %v: %v", blockSha, err) } + if dbErr, ok := err.(database.Error); ok && dbErr.ErrorCode == + database.ErrCorruption { + panic(dbErr) + } // Convert the error into an appropriate reject message and // send it. @@ -1172,7 +1176,6 @@ func (b *blockManager) handleNotifyMsg(notification *blockchain.Notification) { case blockchain.NTBlockAccepted: // Don't relay if we are not current. Other peers that are // current should already know about it. - if !b.current() { return } @@ -1379,7 +1382,7 @@ func (b *blockManager) Pause() chan<- struct{} { // newBlockManager returns a new bitcoin block manager. // Use Start to begin processing asynchronous block and inv updates. -func newBlockManager(s *server) (*blockManager, error) { +func newBlockManager(s *server, indexManager blockchain.IndexManager) (*blockManager, error) { bm := blockManager{ server: s, rejectedTxns: make(map[wire.ShaHash]struct{}), @@ -1398,6 +1401,7 @@ func newBlockManager(s *server) (*blockManager, error) { ChainParams: s.chainParams, Notifications: bm.handleNotifyMsg, SigCache: s.sigCache, + IndexManager: indexManager, }) if err != nil { return nil, err diff --git a/btcd.go b/btcd.go index c6da3a56..e1fdf04b 100644 --- a/btcd.go +++ b/btcd.go @@ -13,6 +13,7 @@ import ( "runtime" "runtime/pprof" + "github.com/btcsuite/btcd/blockchain/indexers" "github.com/btcsuite/btcd/limits" ) @@ -87,6 +88,27 @@ func btcdMain(serverChan chan<- *server) error { db.Close() }) + // Drop indexes and exit if requested. + // + // NOTE: The order is important here because dropping the tx index also + // drops the address index since it relies on it. + if cfg.DropAddrIndex { + if err := indexers.DropAddrIndex(db); err != nil { + btcdLog.Errorf("%v", err) + return err + } + + return nil + } + if cfg.DropTxIndex { + if err := indexers.DropTxIndex(db); err != nil { + btcdLog.Errorf("%v", err) + return err + } + + return nil + } + // Create server and start it. server, err := newServer(cfg.Listeners, db, activeNetParams.Params) if err != nil { diff --git a/config.go b/config.go index 2654542c..e05d90ad 100644 --- a/config.go +++ b/config.go @@ -47,6 +47,8 @@ const ( defaultMaxOrphanTransactions = 1000 defaultMaxOrphanTxSize = 5000 defaultSigCacheMaxSize = 50000 + defaultTxIndex = false + defaultAddrIndex = false ) var ( @@ -128,9 +130,13 @@ type config struct { BlockMaxSize uint32 `long:"blockmaxsize" description:"Maximum block size in bytes to be used when creating a block"` BlockPrioritySize uint32 `long:"blockprioritysize" description:"Size in bytes for high-priority/low-fee transactions when creating a block"` GetWorkKeys []string `long:"getworkkey" description:"DEPRECATED -- Use the --miningaddr option instead"` - NoPeerBloomFilters bool `long:"nopeerbloomfilters" description:"Disable bloom filtering support."` - SigCacheMaxSize uint `long:"sigcachemaxsize" description:"The maximum number of entries in the signature verification cache."` + NoPeerBloomFilters bool `long:"nopeerbloomfilters" description:"Disable bloom filtering support"` + SigCacheMaxSize uint `long:"sigcachemaxsize" description:"The maximum number of entries in the signature verification cache"` BlocksOnly bool `long:"blocksonly" description:"Do not accept transactions from remote peers."` + TxIndex bool `long:"txindex" description:"Maintain a full hash-based transaction index which makes all transactions available via the getrawtransaction RPC"` + DropTxIndex bool `long:"droptxindex" description:"Deletes the hash-based transaction index from the database on start up and then exits."` + AddrIndex bool `long:"addrindex" description:"Maintain a full address-based transaction index which makes the searchrawtransactions RPC available"` + DropAddrIndex bool `long:"dropaddrindex" description:"Deletes the address-based transaction index from the database on start up and then exits."` onionlookup func(string) ([]net.IP, error) lookup func(string) ([]net.IP, error) oniondial func(string, string) (net.Conn, error) @@ -342,6 +348,8 @@ func loadConfig() (*config, []string, error) { MaxOrphanTxs: defaultMaxOrphanTransactions, SigCacheMaxSize: defaultSigCacheMaxSize, Generate: defaultGenerate, + TxIndex: defaultTxIndex, + AddrIndex: defaultAddrIndex, } // Service options which are only added on Windows. @@ -629,6 +637,38 @@ func loadConfig() (*config, []string, error) { cfg.BlockPrioritySize = minUint32(cfg.BlockPrioritySize, cfg.BlockMaxSize) cfg.BlockMinSize = minUint32(cfg.BlockMinSize, cfg.BlockMaxSize) + // --txindex and --droptxindex do not mix. + if cfg.TxIndex && cfg.DropTxIndex { + err := fmt.Errorf("%s: the --txindex and --droptxindex "+ + "options may not be activated at the same time", + funcName) + fmt.Fprintln(os.Stderr, err) + fmt.Fprintln(os.Stderr, usageMessage) + return nil, nil, err + } + + // --addrindex and --dropaddrindex do not mix. + if cfg.AddrIndex && cfg.DropAddrIndex { + err := fmt.Errorf("%s: the --addrindex and --dropaddrindex "+ + "options may not be activated at the same time", + funcName) + fmt.Fprintln(os.Stderr, err) + fmt.Fprintln(os.Stderr, usageMessage) + return nil, nil, err + } + + // --addrindex and --droptxindex do not mix. + if cfg.AddrIndex && cfg.DropTxIndex { + err := fmt.Errorf("%s: the --addrindex and --droptxindex "+ + "options may not be activated at the same time "+ + "because the address index relies on the transaction "+ + "index", + funcName) + fmt.Fprintln(os.Stderr, err) + fmt.Fprintln(os.Stderr, usageMessage) + return nil, nil, err + } + // Check getwork keys are valid and saved parsed versions. cfg.miningAddrs = make([]btcutil.Address, 0, len(cfg.GetWorkKeys)+ len(cfg.MiningAddrs)) diff --git a/database2/ffldb/db.go b/database2/ffldb/db.go index 1b3d59eb..1b79c921 100644 --- a/database2/ffldb/db.go +++ b/database2/ffldb/db.go @@ -1971,18 +1971,16 @@ func (db *db) Close() error { } db.closed = true + // NOTE: Since the above lock waits for all transactions to finish and + // prevents any new ones from being started, it is safe to flush the + // cache and clear all state without the individual locks. + // Close the database cache which will flush any existing entries to // disk and close the underlying leveldb database. Any error is saved // and returned at the end after the remaining cleanup since the // database will be marked closed even if this fails given there is no // good way for the caller to recover from a failure here anyways. - db.writeLock.Lock() closeErr := db.cache.Close() - db.writeLock.Unlock() - - // NOTE: Since the above lock waits for all transactions to finish and - // prevents any new ones from being started, it is safe to clear all - // state without the individual locks. // Close any open flat files that house the blocks. wc := db.store.writeCursor diff --git a/log.go b/log.go index 58279156..febbc340 100644 --- a/log.go +++ b/log.go @@ -12,6 +12,7 @@ import ( "github.com/btcsuite/btcd/addrmgr" "github.com/btcsuite/btcd/blockchain" + "github.com/btcsuite/btcd/blockchain/indexers" database "github.com/btcsuite/btcd/database2" "github.com/btcsuite/btcd/peer" "github.com/btcsuite/btcd/txscript" @@ -39,6 +40,7 @@ var ( btcdLog = btclog.Disabled chanLog = btclog.Disabled discLog = btclog.Disabled + indxLog = btclog.Disabled minrLog = btclog.Disabled peerLog = btclog.Disabled rpcsLog = btclog.Disabled @@ -56,6 +58,7 @@ var subsystemLoggers = map[string]btclog.Logger{ "BTCD": btcdLog, "CHAN": chanLog, "DISC": discLog, + "INDX": indxLog, "MINR": minrLog, "PEER": peerLog, "RPCS": rpcsLog, @@ -113,6 +116,10 @@ func useLogger(subsystemID string, logger btclog.Logger) { case "DISC": discLog = logger + case "INDX": + indxLog = logger + indexers.UseLogger(logger) + case "MINR": minrLog = logger diff --git a/mempool.go b/mempool.go index 2d5af096..6947424d 100644 --- a/mempool.go +++ b/mempool.go @@ -15,6 +15,7 @@ import ( "time" "github.com/btcsuite/btcd/blockchain" + "github.com/btcsuite/btcd/blockchain/indexers" "github.com/btcsuite/btcd/mining" "github.com/btcsuite/btcd/txscript" "github.com/btcsuite/btcd/wire" @@ -60,6 +61,11 @@ type mempoolConfig struct { // TimeSource defines the timesource to use. TimeSource blockchain.MedianTimeSource + + // AddrIndex defines the optional address index instance to use for + // indexing the unconfirmed transactions in the memory pool. + // This can be nil if the address index is not enabled. + AddrIndex *indexers.AddrIndex } // mempoolPolicy houses the policy (configuration parameters) which is used to @@ -324,16 +330,21 @@ func (mp *txMemPool) removeTransaction(tx *btcutil.Tx, removeRedeemers bool) { } } - // Remove the transaction and mark the referenced outpoints as unspent - // by the pool. + // Remove the transaction if needed. if txDesc, exists := mp.pool[*txHash]; exists { + // Remove unconfirmed address index entries associated with the + // transaction if enabled. + if mp.cfg.AddrIndex != nil { + mp.cfg.AddrIndex.RemoveUnconfirmedTx(txHash) + } + + // Mark the referenced outpoints as unspent by the pool. for _, txIn := range txDesc.Tx.MsgTx().TxIn { delete(mp.outpoints, txIn.PreviousOutPoint) } delete(mp.pool, *txHash) atomic.StoreInt64(&mp.lastUpdated, time.Now().Unix()) } - } // RemoveTransaction removes the passed transaction from the mempool. When the @@ -354,7 +365,7 @@ func (mp *txMemPool) RemoveTransaction(tx *btcutil.Tx, removeRedeemers bool) { // passed transaction from the memory pool. Removing those transactions then // leads to removing all transactions which rely on them, recursively. This is // necessary when a block is connected to the main chain because the block may -// contain transactions which were previously unknown to the memory pool +// contain transactions which were previously unknown to the memory pool. // // This function is safe for concurrent access. func (mp *txMemPool) RemoveDoubleSpends(tx *btcutil.Tx) { @@ -392,6 +403,12 @@ func (mp *txMemPool) addTransaction(utxoView *blockchain.UtxoViewpoint, tx *btcu mp.outpoints[txIn.PreviousOutPoint] = tx } atomic.StoreInt64(&mp.lastUpdated, time.Now().Unix()) + + // Add unconfirmed address index entries associated with the transaction + // if enabled. + if mp.cfg.AddrIndex != nil { + mp.cfg.AddrIndex.AddUnconfirmedTx(tx, utxoView) + } } // checkPoolDoubleSpend checks whether or not the passed transaction is diff --git a/rpcserver.go b/rpcserver.go index 2e2b11db..b68067c3 100644 --- a/rpcserver.go +++ b/rpcserver.go @@ -131,45 +131,46 @@ type commandHandler func(*rpcServer, interface{}, <-chan struct{}) (interface{}, // a dependency loop. var rpcHandlers map[string]commandHandler var rpcHandlersBeforeInit = map[string]commandHandler{ - "addnode": handleAddNode, - "createrawtransaction": handleCreateRawTransaction, - "debuglevel": handleDebugLevel, - "decoderawtransaction": handleDecodeRawTransaction, - "decodescript": handleDecodeScript, - "generate": handleGenerate, - "getaddednodeinfo": handleGetAddedNodeInfo, - "getbestblock": handleGetBestBlock, - "getbestblockhash": handleGetBestBlockHash, - "getblock": handleGetBlock, - "getblockcount": handleGetBlockCount, - "getblockhash": handleGetBlockHash, - "getblockheader": handleGetBlockHeader, - "getblocktemplate": handleGetBlockTemplate, - "getconnectioncount": handleGetConnectionCount, - "getcurrentnet": handleGetCurrentNet, - "getdifficulty": handleGetDifficulty, - "getgenerate": handleGetGenerate, - "gethashespersec": handleGetHashesPerSec, - "getinfo": handleGetInfo, - "getmempoolinfo": handleGetMempoolInfo, - "getmininginfo": handleGetMiningInfo, - "getnettotals": handleGetNetTotals, - "getnetworkhashps": handleGetNetworkHashPS, - "getpeerinfo": handleGetPeerInfo, - "getrawmempool": handleGetRawMempool, - "getrawtransaction": handleGetRawTransaction, - "gettxout": handleGetTxOut, - "getwork": handleGetWork, - "help": handleHelp, - "node": handleNode, - "ping": handlePing, - "sendrawtransaction": handleSendRawTransaction, - "setgenerate": handleSetGenerate, - "stop": handleStop, - "submitblock": handleSubmitBlock, - "validateaddress": handleValidateAddress, - "verifychain": handleVerifyChain, - "verifymessage": handleVerifyMessage, + "addnode": handleAddNode, + "createrawtransaction": handleCreateRawTransaction, + "debuglevel": handleDebugLevel, + "decoderawtransaction": handleDecodeRawTransaction, + "decodescript": handleDecodeScript, + "generate": handleGenerate, + "getaddednodeinfo": handleGetAddedNodeInfo, + "getbestblock": handleGetBestBlock, + "getbestblockhash": handleGetBestBlockHash, + "getblock": handleGetBlock, + "getblockcount": handleGetBlockCount, + "getblockhash": handleGetBlockHash, + "getblockheader": handleGetBlockHeader, + "getblocktemplate": handleGetBlockTemplate, + "getconnectioncount": handleGetConnectionCount, + "getcurrentnet": handleGetCurrentNet, + "getdifficulty": handleGetDifficulty, + "getgenerate": handleGetGenerate, + "gethashespersec": handleGetHashesPerSec, + "getinfo": handleGetInfo, + "getmempoolinfo": handleGetMempoolInfo, + "getmininginfo": handleGetMiningInfo, + "getnettotals": handleGetNetTotals, + "getnetworkhashps": handleGetNetworkHashPS, + "getpeerinfo": handleGetPeerInfo, + "getrawmempool": handleGetRawMempool, + "getrawtransaction": handleGetRawTransaction, + "gettxout": handleGetTxOut, + "getwork": handleGetWork, + "help": handleHelp, + "node": handleNode, + "ping": handlePing, + "searchrawtransactions": handleSearchRawTransactions, + "sendrawtransaction": handleSendRawTransaction, + "setgenerate": handleSetGenerate, + "stop": handleStop, + "submitblock": handleSubmitBlock, + "validateaddress": handleValidateAddress, + "verifychain": handleVerifyChain, + "verifymessage": handleVerifyMessage, } // list of commands that we recognize, but for which btcd has no support because @@ -222,12 +223,11 @@ var rpcAskWallet = map[string]struct{}{ // Commands that are currently unimplemented, but should ultimately be. var rpcUnimplemented = map[string]struct{}{ - "estimatefee": {}, - "estimatepriority": {}, - "getblockchaininfo": {}, - "getchaintips": {}, - "getnetworkinfo": {}, - "searchrawtransactions": {}, + "estimatefee": {}, + "estimatepriority": {}, + "getblockchaininfo": {}, + "getchaintips": {}, + "getnetworkinfo": {}, } // Commands that are available to a limited user @@ -301,6 +301,15 @@ func rpcDecodeHexError(gotHex string) *btcjson.RPCError { gotHex)) } +// rpcNoTxInfoError is a convenience function for returning a nicely formatted +// RPC error which indiactes there is no information available for the provided +// transaction hash. +func rpcNoTxInfoError(txHash *wire.ShaHash) *btcjson.RPCError { + return btcjson.NewRPCError(btcjson.ErrRPCNoTxInfo, + fmt.Sprintf("No information available about transaction %v", + txHash)) +} + // workStateBlockInfo houses information about how to reconstruct a block given // its template and signature script. type workStateBlockInfo struct { @@ -731,8 +740,8 @@ func createTxRawResult(chainParams *chaincfg.Params, mtx *wire.MsgTx, txReply := &btcjson.TxRawResult{ Hex: mtxHex, Txid: txHash, - Vout: createVoutList(mtx, chainParams, nil), Vin: createVinList(mtx), + Vout: createVoutList(mtx, chainParams, nil), Version: mtx.Version, LockTime: mtx.LockTime, } @@ -2282,34 +2291,118 @@ func handleGetRawTransaction(s *rpcServer, cmd interface{}, closeChan <-chan str verbose = *c.Verbose != 0 } - // Try to fetch the transaction from the memory pool. + // Try to fetch the transaction from the memory pool and if that fails, + // try the block database. + var mtx *wire.MsgTx + var blkHash *wire.ShaHash + var blkHeight int32 tx, err := s.server.txMemPool.FetchTransaction(txHash) if err != nil { - // TODO(davec): Implement optional transaction index. - return nil, &btcjson.RPCError{ - Code: btcjson.ErrRPCNoTxInfo, - Message: "No information available about transaction", + txIndex := s.server.txIndex + if txIndex == nil { + return nil, &btcjson.RPCError{ + Code: btcjson.ErrRPCNoTxInfo, + Message: "The transaction index must be " + + "enabled to query the blockchain " + + "(specify --txindex)", + } } - } - // When the verbose flag isn't set, simply return the - // network-serialized transaction as a hex-encoded string. - if !verbose { - // Note that this is intentionally not directly - // returning because the first return value is a - // string and it would result in returning an empty - // string to the client instead of nothing (nil) in the - // case of an error. - mtxHex, err := messageToHex(tx.MsgTx()) + // Look up the location of the transaction. + blockRegion, err := txIndex.TxBlockRegion(txHash) if err != nil { - return nil, err + context := "Failed to retrieve transaction location" + return nil, internalRPCError(err.Error(), context) } - return mtxHex, nil + if blockRegion == nil { + return nil, rpcNoTxInfoError(txHash) + } + + // Load the raw transaction bytes from the database. + var txBytes []byte + err = s.server.db.View(func(dbTx database.Tx) error { + var err error + txBytes, err = dbTx.FetchBlockRegion(blockRegion) + return err + }) + if err != nil { + return nil, rpcNoTxInfoError(txHash) + } + + // When the verbose flag isn't set, simply return the serialized + // transaction as a hex-encoded string. This is done here to + // avoid deserializing it only to reserialize it again later. + if !verbose { + return hex.EncodeToString(txBytes), nil + } + + // Grab the block height. + blkHash = blockRegion.Hash + blkHeight, err = s.chain.BlockHeightByHash(blkHash) + if err != nil { + context := "Failed to retrieve block height" + return nil, internalRPCError(err.Error(), context) + } + + // Deserialize the transaction + var msgTx wire.MsgTx + err = msgTx.Deserialize(bytes.NewReader(txBytes)) + if err != nil { + context := "Failed to deserialize transaction" + return nil, internalRPCError(err.Error(), context) + } + mtx = &msgTx + } else { + // When the verbose flag isn't set, simply return the + // network-serialized transaction as a hex-encoded string. + if !verbose { + // Note that this is intentionally not directly + // returning because the first return value is a + // string and it would result in returning an empty + // string to the client instead of nothing (nil) in the + // case of an error. + mtxHex, err := messageToHex(tx.MsgTx()) + if err != nil { + return nil, err + } + return mtxHex, nil + } + + mtx = tx.MsgTx() } // The verbose flag is set, so generate the JSON object and return it. - rawTxn, err := createTxRawResult(s.server.chainParams, tx.MsgTx(), - txHash.String(), nil, "", 0, 0) + var blkHeader *wire.BlockHeader + var blkHashStr string + var chainHeight int32 + if blkHash != nil { + // Load the raw header bytes. + var headerBytes []byte + err := s.server.db.View(func(dbTx database.Tx) error { + var err error + headerBytes, err = dbTx.FetchBlockHeader(blkHash) + return err + }) + if err != nil { + context := "Failed to fetch block header" + return nil, internalRPCError(err.Error(), context) + } + + // Deserialize the header. + var header wire.BlockHeader + err = header.Deserialize(bytes.NewReader(headerBytes)) + if err != nil { + context := "Failed to deserialize block header" + return nil, internalRPCError(err.Error(), context) + } + + blkHeader = &header + blkHashStr = blkHash.String() + chainHeight = s.chain.BestSnapshot().Height + } + + rawTxn, err := createTxRawResult(s.server.chainParams, mtx, + txHash.String(), blkHeader, blkHashStr, blkHeight, chainHeight) if err != nil { return nil, err } @@ -2378,10 +2471,7 @@ func handleGetTxOut(s *rpcServer, cmd interface{}, closeChan <-chan struct{}) (i if includeMempool && s.server.txMemPool.HaveTransaction(txHash) { tx, err := s.server.txMemPool.FetchTransaction(txHash) if err != nil { - return nil, &btcjson.RPCError{ - Code: btcjson.ErrRPCNoTxInfo, - Message: "No information available about transaction", - } + return nil, rpcNoTxInfoError(txHash) } mtx := tx.MsgTx() @@ -2410,10 +2500,7 @@ func handleGetTxOut(s *rpcServer, cmd interface{}, closeChan <-chan struct{}) (i } else { entry, err := s.chain.FetchUtxoEntry(txHash) if err != nil { - return nil, &btcjson.RPCError{ - Code: btcjson.ErrRPCNoTxInfo, - Message: "No information available about transaction", - } + return nil, rpcNoTxInfoError(txHash) } // To match the behavior of the reference client, return nil @@ -2835,6 +2922,500 @@ func handlePing(s *rpcServer, cmd interface{}, closeChan <-chan struct{}) (inter return nil, nil } +// retrievedTx represents a transaction that was either loaded from the +// transaction memory pool or from the database. When a transaction is loaded +// from the database, it is loaded with the raw serialized bytes while the +// mempool has the fully deserialized structure. This structure therefore will +// have one of the two fields set depending on where is was retrieved from. +// This is mainly done for efficiency to avoid extra serialization steps when +// possible. +type retrievedTx struct { + txBytes []byte + blkHash *wire.ShaHash // Only set when transaction is in a block. + tx *btcutil.Tx +} + +// fetchInputTxos fetches the outpoints from all transactions referenced by the +// inputs to the passed transaction by checking the transaction mempool first +// then the transaction index for those already mined into blocks. +func fetchInputTxos(s *rpcServer, tx *wire.MsgTx) (map[wire.OutPoint]wire.TxOut, error) { + mp := s.server.txMemPool + originOutputs := make(map[wire.OutPoint]wire.TxOut) + for txInIndex, txIn := range tx.TxIn { + // Attempt to fetch and use the referenced transaction from the + // memory pool. + origin := &txIn.PreviousOutPoint + originTx, err := mp.FetchTransaction(&origin.Hash) + if err == nil { + txOuts := originTx.MsgTx().TxOut + if origin.Index >= uint32(len(txOuts)) { + errStr := fmt.Sprintf("unable to find output "+ + "%v referenced from transaction %s:%d", + origin, tx.TxSha(), txInIndex) + return nil, internalRPCError(errStr, "") + } + + originOutputs[*origin] = *txOuts[origin.Index] + continue + } + + // Look up the location of the transaction. + blockRegion, err := s.server.txIndex.TxBlockRegion(&origin.Hash) + if err != nil { + context := "Failed to retrieve transaction location" + return nil, internalRPCError(err.Error(), context) + } + if blockRegion == nil { + return nil, rpcNoTxInfoError(&origin.Hash) + } + + // Load the raw transaction bytes from the database. + var txBytes []byte + err = s.server.db.View(func(dbTx database.Tx) error { + var err error + txBytes, err = dbTx.FetchBlockRegion(blockRegion) + return err + }) + if err != nil { + return nil, rpcNoTxInfoError(&origin.Hash) + } + + // Deserialize the transaction + var msgTx wire.MsgTx + err = msgTx.Deserialize(bytes.NewReader(txBytes)) + if err != nil { + context := "Failed to deserialize transaction" + return nil, internalRPCError(err.Error(), context) + } + + // Add the referenced output to the map. + if origin.Index >= uint32(len(msgTx.TxOut)) { + errStr := fmt.Sprintf("unable to find output %v "+ + "referenced from transaction %s:%d", origin, + tx.TxSha(), txInIndex) + return nil, internalRPCError(errStr, "") + } + originOutputs[*origin] = *msgTx.TxOut[origin.Index] + } + + return originOutputs, nil +} + +// createVinListPrevOut returns a slice of JSON objects for the inputs of the +// passed transaction. +func createVinListPrevOut(s *rpcServer, mtx *wire.MsgTx, chainParams *chaincfg.Params, vinExtra bool, filterAddrMap map[string]struct{}) ([]btcjson.VinPrevOut, error) { + // Coinbase transactions only have a single txin by definition. + if blockchain.IsCoinBaseTx(mtx) { + // Only include the transaction if the filter map is empty + // because a coinbase input has no addresses and so would never + // match a non-empty filter. + if len(filterAddrMap) != 0 { + return nil, nil + } + + txIn := mtx.TxIn[0] + vinList := make([]btcjson.VinPrevOut, 1) + vinList[0].Coinbase = hex.EncodeToString(txIn.SignatureScript) + vinList[0].Sequence = txIn.Sequence + return vinList, nil + } + + // Use a dynamically sized list to accomodate the address filter. + vinList := make([]btcjson.VinPrevOut, 0, len(mtx.TxIn)) + + // Lookup all of the referenced transaction outputs needed to populate + // the previous output information if requested. + var originOutputs map[wire.OutPoint]wire.TxOut + if vinExtra || len(filterAddrMap) > 0 { + var err error + originOutputs, err = fetchInputTxos(s, mtx) + if err != nil { + return nil, err + } + } + + for _, txIn := range mtx.TxIn { + // The disassembled string will contain [error] inline + // if the script doesn't fully parse, so ignore the + // error here. + disbuf, _ := txscript.DisasmString(txIn.SignatureScript) + + // Create the basic input entry without the additional optional + // previous output details which will be added later if + // requested and available. + prevOut := &txIn.PreviousOutPoint + vinEntry := btcjson.VinPrevOut{ + Txid: prevOut.Hash.String(), + Vout: prevOut.Index, + Sequence: txIn.Sequence, + ScriptSig: &btcjson.ScriptSig{ + Asm: disbuf, + Hex: hex.EncodeToString(txIn.SignatureScript), + }, + } + + // Add the entry to the list now if it already passed the filter + // since the previous output might not be available. + passesFilter := len(filterAddrMap) == 0 + if passesFilter { + vinList = append(vinList, vinEntry) + } + + // Only populate previous output information if requested and + // available. + if len(originOutputs) == 0 { + continue + } + originTxOut, ok := originOutputs[*prevOut] + if !ok { + continue + } + + // Ignore the error here since an error means the script + // couldn't parse and there is no additional information about + // it anyways. + _, addrs, _, _ := txscript.ExtractPkScriptAddrs( + originTxOut.PkScript, chainParams) + + // Encode the addresses while checking if the address passes the + // filter when needed. + encodedAddrs := make([]string, len(addrs)) + for j, addr := range addrs { + encodedAddr := addr.EncodeAddress() + encodedAddrs[j] = encodedAddr + + // No need to check the map again if the filter already + // passes. + if passesFilter { + continue + } + if _, exists := filterAddrMap[encodedAddr]; exists { + passesFilter = true + } + } + + // Ignore the entry if it doesn't pass the filter. + if !passesFilter { + continue + } + + // Add entry to the list if it wasn't already done above. + if len(filterAddrMap) != 0 { + vinList = append(vinList, vinEntry) + } + + // Update the entry with previous output information if + // requested. + if vinExtra { + vinListEntry := &vinList[len(vinList)-1] + vinListEntry.PrevOut = &btcjson.PrevOut{ + Addresses: encodedAddrs, + Value: btcutil.Amount(originTxOut.Value).ToBTC(), + } + } + } + + return vinList, nil +} + +// fetchMempoolTxnsForAddress queries the address index for all unconfirmed +// transactions that involve the provided address. The results will be limited +// by the number to skip and the number requested. +func fetchMempoolTxnsForAddress(s *rpcServer, addr btcutil.Address, numToSkip, numRequested uint32) ([]*btcutil.Tx, uint32) { + // There are no entries to return when there are less available than the + // number being skipped. + mpTxns := s.server.addrIndex.UnconfirmedTxnsForAddress(addr) + numAvailable := uint32(len(mpTxns)) + if numToSkip > numAvailable { + return nil, numAvailable + } + + // Filter the available entries based on the number to skip and number + // requested. + rangeEnd := numToSkip + numRequested + if rangeEnd > numAvailable { + rangeEnd = numAvailable + } + return mpTxns[numToSkip:rangeEnd], numToSkip +} + +// handleSearchRawTransactions implements the searchrawtransactions command. +func handleSearchRawTransactions(s *rpcServer, cmd interface{}, closeChan <-chan struct{}) (interface{}, error) { + // Respond with an error if the address index is not enabled. + addrIndex := s.server.addrIndex + if addrIndex == nil { + return nil, &btcjson.RPCError{ + Code: btcjson.ErrRPCMisc, + Message: "Address index must be enabled (--addrindex)", + } + } + + // Override the flag for including extra previous output information in + // each input if needed. + c := cmd.(*btcjson.SearchRawTransactionsCmd) + vinExtra := false + if c.VinExtra != nil { + vinExtra = *c.VinExtra != 0 + } + + // Including the extra previous output information requires the + // transaction index. Currently the address index relies on the + // transaction index, so this check is redundant, but it's better to be + // safe in case the address index is ever changed to not rely on it. + if vinExtra && s.server.txIndex == nil { + return nil, &btcjson.RPCError{ + Code: btcjson.ErrRPCMisc, + Message: "Transaction index must be enabled (--txindex)", + } + } + + // Attempt to decode the supplied address. + addr, err := btcutil.DecodeAddress(c.Address, s.server.chainParams) + if err != nil { + return nil, &btcjson.RPCError{ + Code: btcjson.ErrRPCInvalidAddressOrKey, + Message: "Invalid address or key: " + err.Error(), + } + } + + // Override the default number of requested entries if needed. Also, + // just return now if the number of requested entries is zero to avoid + // extra work. + numRequested := 100 + if c.Count != nil { + numRequested = *c.Count + if numRequested < 0 { + numRequested = 1 + } + } + if numRequested == 0 { + return nil, nil + } + + // Override the default number of entries to skip if needed. + var numToSkip int + if c.Skip != nil { + numToSkip = *c.Skip + if numToSkip < 0 { + numToSkip = 0 + } + } + + // Override the reverse flag if needed. + var reverse bool + if c.Reverse != nil { + reverse = *c.Reverse + } + + // Add transactions from mempool first if client asked for reverse + // order. Otherwise, they will be added last (as needed depending on + // the requested counts). + // + // NOTE: This code doesn't sort by dependency. This might be something + // to do in the future for the client's convenience, or leave it to the + // client. + numSkipped := uint32(0) + addressTxns := make([]retrievedTx, 0, numRequested) + if reverse { + // Transactions in the mempool are not in a block header yet, + // so the block header field in the retieved transaction struct + // is left nil. + mpTxns, mpSkipped := fetchMempoolTxnsForAddress(s, addr, + uint32(numToSkip), uint32(numRequested)) + numSkipped += mpSkipped + for _, tx := range mpTxns { + addressTxns = append(addressTxns, retrievedTx{tx: tx}) + } + } + + // Fetch transactions from the database in the desired order if more are + // needed. + if len(addressTxns) < numRequested { + err = s.server.db.View(func(dbTx database.Tx) error { + regions, dbSkipped, err := addrIndex.TxRegionsForAddress( + dbTx, addr, uint32(numToSkip)-numSkipped, + uint32(numRequested-len(addressTxns)), reverse) + if err != nil { + return err + } + + // Load the raw transaction bytes from the database. + serializedTxns, err := dbTx.FetchBlockRegions(regions) + if err != nil { + return err + } + + // Add the transaction and the hash of the block it is + // contained in to the list. Note that the transaction + // is left serialized here since the caller might have + // requested non-verbose output and hence there would be + // no point in deserializing it just to reserialize it + // later. + for i, serializedTx := range serializedTxns { + addressTxns = append(addressTxns, retrievedTx{ + txBytes: serializedTx, + blkHash: regions[i].Hash, + }) + } + numSkipped += dbSkipped + + return nil + }) + if err != nil { + context := "Failed to load address index entries" + return nil, internalRPCError(err.Error(), context) + } + + } + + // Add transactions from mempool last if client did not request reverse + // order and the number of results is still under the number requested. + if !reverse && len(addressTxns) < numRequested { + // Transactions in the mempool are not in a block header yet, + // so the block header field in the retieved transaction struct + // is left nil. + mpTxns, mpSkipped := fetchMempoolTxnsForAddress(s, addr, + uint32(numToSkip)-numSkipped, uint32(numRequested- + len(addressTxns))) + numSkipped += mpSkipped + for _, tx := range mpTxns { + addressTxns = append(addressTxns, retrievedTx{tx: tx}) + } + } + + // Address has never been used if neither source yielded any results. + if len(addressTxns) == 0 { + return nil, &btcjson.RPCError{ + Code: btcjson.ErrRPCNoTxInfo, + Message: "No information available about address", + } + } + + // Serialize all of the transactions to hex. + hexTxns := make([]string, len(addressTxns)) + for i := range addressTxns { + // Simply encode the raw bytes to hex when the retrieved + // transaction is already in serialized form. + rtx := &addressTxns[i] + if rtx.txBytes != nil { + hexTxns[i] = hex.EncodeToString(rtx.txBytes) + continue + } + + // Serialize the transaction first and convert to hex when the + // retrieved transaction is the deserialized structure. + hexTxns[i], err = messageToHex(rtx.tx.MsgTx()) + if err != nil { + return nil, err + } + } + + // When not in verbose mode, simply return a list of serialized txns. + if c.Verbose != nil && *c.Verbose == 0 { + return hexTxns, nil + } + + // Normalize the provided filter addresses (if any) to ensure there are + // no duplicates. + filterAddrMap := make(map[string]struct{}) + if c.FilterAddrs != nil && len(*c.FilterAddrs) > 0 { + for _, addr := range *c.FilterAddrs { + filterAddrMap[addr] = struct{}{} + } + } + + // The verbose flag is set, so generate the JSON object and return it. + best := s.chain.BestSnapshot() + chainParams := s.server.chainParams + srtList := make([]btcjson.SearchRawTransactionsResult, len(addressTxns)) + for i := range addressTxns { + // The deserialized transaction is needed, so deserialize the + // retrieved transaction if it's in serialized form (which will + // be the case when it was lookup up from the database). + // Otherwise, use the existing deserialized transaction. + rtx := &addressTxns[i] + var mtx *wire.MsgTx + if rtx.tx == nil { + // Deserialize the transaction. + mtx = new(wire.MsgTx) + err := mtx.Deserialize(bytes.NewReader(rtx.txBytes)) + if err != nil { + context := "Failed to deserialize transaction" + return nil, internalRPCError(err.Error(), + context) + } + } else { + mtx = rtx.tx.MsgTx() + } + + result := &srtList[i] + result.Hex = hexTxns[i] + result.Txid = mtx.TxSha().String() + result.Vin, err = createVinListPrevOut(s, mtx, chainParams, + vinExtra, filterAddrMap) + if err != nil { + return nil, err + } + result.Vout = createVoutList(mtx, chainParams, filterAddrMap) + result.Version = mtx.Version + result.LockTime = mtx.LockTime + + // Transactions grabbed from the mempool aren't yet in a block, + // so conditionally fetch block details here. This will be + // reflected in the final JSON output (mempool won't have + // confirmations or block information). + var blkHeader *wire.BlockHeader + var blkHashStr string + var blkHeight int32 + if blkHash := rtx.blkHash; blkHash != nil { + // Load the raw header bytes from the database. + var headerBytes []byte + err := s.server.db.View(func(dbTx database.Tx) error { + var err error + headerBytes, err = dbTx.FetchBlockHeader(blkHash) + return err + }) + if err != nil { + return nil, &btcjson.RPCError{ + Code: btcjson.ErrRPCBlockNotFound, + Message: "Block not found", + } + } + + // Deserialize the block header. + var header wire.BlockHeader + err = header.Deserialize(bytes.NewReader(headerBytes)) + if err != nil { + context := "Failed to deserialize block header" + return nil, internalRPCError(err.Error(), context) + } + + // Get the block height from chain. + height, err := s.chain.BlockHeightByHash(blkHash) + if err != nil { + context := "Failed to obtain block height" + return nil, internalRPCError(err.Error(), context) + } + + blkHeader = &header + blkHashStr = blkHash.String() + blkHeight = height + } + + // Add the block information to the result if there is any. + if blkHeader != nil { + // This is not a typo, they are identical in Bitcoin + // Core as well. + result.Time = blkHeader.Timestamp.Unix() + result.Blocktime = blkHeader.Timestamp.Unix() + result.BlockHash = blkHashStr + result.Confirmations = uint64(1 + best.Height - blkHeight) + } + } + + return srtList, nil +} + // handleSendRawTransaction implements the sendrawtransaction command. func handleSendRawTransaction(s *rpcServer, cmd interface{}, closeChan <-chan struct{}) (interface{}, error) { c := cmd.(*btcjson.SendRawTransactionCmd) @@ -3239,8 +3820,7 @@ func (s *rpcServer) decrementClients() { // the second bool return value specifies whether the user can change the state // of the server (true) or whether the user is limited (false). The second is // always false if the first is. -func (s *rpcServer) checkAuth(r *http.Request, require bool) (bool, bool, - error) { +func (s *rpcServer) checkAuth(r *http.Request, require bool) (bool, bool, error) { authhdr := r.Header["Authorization"] if len(authhdr) <= 0 { if require { @@ -3355,8 +3935,7 @@ func createMarshalledReply(id, result interface{}, replyErr error) ([]byte, erro } // jsonRPCRead handles reading and responding to RPC messages. -func (s *rpcServer) jsonRPCRead(w http.ResponseWriter, r *http.Request, - isAdmin bool) { +func (s *rpcServer) jsonRPCRead(w http.ResponseWriter, r *http.Request, isAdmin bool) { if atomic.LoadInt32(&s.shutdown) != 0 { return } diff --git a/sample-btcd.conf b/sample-btcd.conf index 61ec0eac..da4e4b8c 100644 --- a/sample-btcd.conf +++ b/sample-btcd.conf @@ -249,6 +249,19 @@ ; dropaddrindex=0 +; ------------------------------------------------------------------------------ +; Optional Indexes +; ------------------------------------------------------------------------------ + +; Build and maintain a full hash-based transaction index which makes all +; transactions available via the getrawtransaction RPC. +; txindex=1 + +; Build and maintain a full address-based transaction index which makes the +; searchrawtransactions RPC available. +; addrindex=1 + + ; ------------------------------------------------------------------------------ ; Signature Verification Cache ; ------------------------------------------------------------------------------ diff --git a/server.go b/server.go index d0238e34..1b2889ed 100644 --- a/server.go +++ b/server.go @@ -22,6 +22,7 @@ import ( "github.com/btcsuite/btcd/addrmgr" "github.com/btcsuite/btcd/blockchain" + "github.com/btcsuite/btcd/blockchain/indexers" "github.com/btcsuite/btcd/chaincfg" database "github.com/btcsuite/btcd/database2" "github.com/btcsuite/btcd/mining" @@ -201,6 +202,13 @@ type server struct { db database.DB timeSource blockchain.MedianTimeSource services wire.ServiceFlag + + // The following fields are used for optional indexes. They will be nil + // if the associated index is not enabled. These fields are set during + // initial creation of the server and never changed afterwards, so they + // do not need to be protected for concurrent access. + txIndex *indexers.TxIndex + addrIndex *indexers.AddrIndex } // serverPeer extends the peer to maintain state shared by the server and @@ -2480,7 +2488,40 @@ func newServer(listenAddrs []string, db database.DB, chainParams *chaincfg.Param services: services, sigCache: txscript.NewSigCache(cfg.SigCacheMaxSize), } - bm, err := newBlockManager(&s) + + // Create the transaction and address indexes if needed. + // + // CAUTION: the txindex needs to be first in the indexes array because + // the addrindex uses data from the txindex during catchup. If the + // addrindex is run first, it may not have the transactions from the + // current block indexed. + var indexes []indexers.Indexer + if cfg.TxIndex || cfg.AddrIndex { + // Enable transaction index if address index is enabled since it + // requires it. + if !cfg.TxIndex { + indxLog.Infof("Transaction index enabled because it " + + "is required by the address index") + cfg.TxIndex = true + } else { + indxLog.Info("Transaction index is enabled") + } + + s.txIndex = indexers.NewTxIndex(db) + indexes = append(indexes, s.txIndex) + } + if cfg.AddrIndex { + indxLog.Info("Address index is enabled") + s.addrIndex = indexers.NewAddrIndex(db, chainParams) + indexes = append(indexes, s.addrIndex) + } + + // Create an index manager if any of the optional indexes are enabled. + var indexManager blockchain.IndexManager + if len(indexes) > 0 { + indexManager = indexers.NewManager(db, indexes) + } + bm, err := newBlockManager(&s, indexManager) if err != nil { return nil, err } @@ -2500,6 +2541,7 @@ func newServer(listenAddrs []string, db database.DB, chainParams *chaincfg.Param RelayNtfnChan: s.relayNtfnChan, SigCache: s.sigCache, TimeSource: s.timeSource, + AddrIndex: s.addrIndex, } s.txMemPool = newTxMemPool(&txC)