blockchain/indexers: update indexing to use stxos instead of utxo view for blocks

In this commit, we update all the indexers to use the stxo set for a
particular block rather than the utxo view for the block. We do this as
we can eliminate a large number of random reads for each block, and can
instead deserialize a single instance of all the outputs spent in that
block and feed in the prev input scripts to each indexer.
This commit is contained in:
Olaoluwa Osuntokun 2018-05-28 21:55:34 -07:00
parent ad69a7121b
commit a26e2634fa
No known key found for this signature in database
GPG key ID: 964EA263DD637C21
5 changed files with 88 additions and 62 deletions

View file

@ -692,23 +692,26 @@ func (idx *AddrIndex) indexPkScript(data writeIndexData, pkScript []byte, txIdx
// indexBlock extract all of the standard addresses from all of the transactions
// in the passed block and maps each of them to the associated transaction using
// the passed map.
func (idx *AddrIndex) indexBlock(data writeIndexData, block *btcutil.Block, view *blockchain.UtxoViewpoint) {
func (idx *AddrIndex) indexBlock(data writeIndexData, block *btcutil.Block,
stxos []blockchain.SpentTxOut) {
stxoIndex := 0
for txIdx, tx := range block.Transactions() {
// Coinbases do not reference any inputs. Since the block is
// required to have already gone through full validation, it has
// already been proven on the first transaction in the block is
// a coinbase.
if txIdx != 0 {
for _, txIn := range tx.MsgTx().TxIn {
// The view should always have the input since
// the index contract requires it, however, be
// safe and simply ignore any missing entries.
entry := view.LookupEntry(txIn.PreviousOutPoint)
if entry == nil {
continue
}
for range tx.MsgTx().TxIn {
// We'll access the slice of all the
// transactions spent in this block properly
// ordered to fetch the previous input script.
pkScript := stxos[stxoIndex].PkScript
idx.indexPkScript(data, pkScript, txIdx)
idx.indexPkScript(data, entry.PkScript(), txIdx)
// With an input indexed, we'll advance the
// stxo coutner.
stxoIndex++
}
}
@ -723,7 +726,9 @@ func (idx *AddrIndex) indexBlock(data writeIndexData, block *btcutil.Block, view
// the transactions in the block involve.
//
// This is part of the Indexer interface.
func (idx *AddrIndex) ConnectBlock(dbTx database.Tx, block *btcutil.Block, view *blockchain.UtxoViewpoint) error {
func (idx *AddrIndex) ConnectBlock(dbTx database.Tx, block *btcutil.Block,
stxos []blockchain.SpentTxOut) error {
// The offset and length of the transactions within the serialized
// block.
txLocs, err := block.TxLoc()
@ -739,7 +744,7 @@ func (idx *AddrIndex) ConnectBlock(dbTx database.Tx, block *btcutil.Block, view
// Build all of the address to transaction mappings in a local map.
addrsToTxns := make(writeIndexData)
idx.indexBlock(addrsToTxns, block, view)
idx.indexBlock(addrsToTxns, block, stxos)
// Add all of the index entries for each address.
addrIdxBucket := dbTx.Metadata().Bucket(addrIndexKey)
@ -761,10 +766,12 @@ func (idx *AddrIndex) ConnectBlock(dbTx database.Tx, block *btcutil.Block, view
// each transaction in the block involve.
//
// This is part of the Indexer interface.
func (idx *AddrIndex) DisconnectBlock(dbTx database.Tx, block *btcutil.Block, view *blockchain.UtxoViewpoint) error {
func (idx *AddrIndex) DisconnectBlock(dbTx database.Tx, block *btcutil.Block,
stxos []blockchain.SpentTxOut) error {
// Build all of the address to transaction mappings in a local map.
addrsToTxns := make(writeIndexData)
idx.indexBlock(addrsToTxns, block, view)
idx.indexBlock(addrsToTxns, block, stxos)
// Remove all of the index entries for each address.
bucket := dbTx.Metadata().Bucket(addrIndexKey)

View file

@ -202,7 +202,7 @@ func storeFilter(dbTx database.Tx, block *btcutil.Block, f *gcs.Filter,
// connected to the main chain. This indexer adds a hash-to-cf mapping for
// every passed block. This is part of the Indexer interface.
func (idx *CfIndex) ConnectBlock(dbTx database.Tx, block *btcutil.Block,
view *blockchain.UtxoViewpoint) error {
stxos []blockchain.SpentTxOut) error {
f, err := builder.BuildBasicFilter(block.MsgBlock())
if err != nil {
@ -226,7 +226,7 @@ func (idx *CfIndex) ConnectBlock(dbTx database.Tx, block *btcutil.Block,
// disconnected from the main chain. This indexer removes the hash-to-cf
// mapping for every passed block. This is part of the Indexer interface.
func (idx *CfIndex) DisconnectBlock(dbTx database.Tx, block *btcutil.Block,
view *blockchain.UtxoViewpoint) error {
_ []blockchain.SpentTxOut) error {
for _, key := range cfIndexKeys {
err := dbDeleteFilterIdxEntry(dbTx, key, block.Hash())

View file

@ -50,13 +50,17 @@ type Indexer interface {
// every load, including the case the index was just created.
Init() error
// ConnectBlock is invoked when the index manager is notified that a new
// block has been connected to the main chain.
ConnectBlock(dbTx database.Tx, block *btcutil.Block, view *blockchain.UtxoViewpoint) error
// ConnectBlock is invoked when a new block has been connected to the
// main chain. The set of output spent within a block is also passed in
// so indexers can access the pevious output scripts input spent if
// required.
ConnectBlock(database.Tx, *btcutil.Block, []blockchain.SpentTxOut) error
// DisconnectBlock is invoked when the index manager is notified that a
// block has been disconnected from the main chain.
DisconnectBlock(dbTx database.Tx, block *btcutil.Block, view *blockchain.UtxoViewpoint) error
// DisconnectBlock is invoked when a block has been disconnected from
// the main chain. The set of outputs scripts that were spent within
// this block is also returned so indexers can clean up the prior index
// state for this block
DisconnectBlock(database.Tx, *btcutil.Block, []blockchain.SpentTxOut) error
}
// AssertError identifies an error that indicates an internal code consistency

View file

@ -68,7 +68,9 @@ func dbFetchIndexerTip(dbTx database.Tx, idxKey []byte) (*chainhash.Hash, int32,
// given block using the provided indexer and updates the tip of the indexer
// accordingly. An error will be returned if the current tip for the indexer is
// not the previous block for the passed block.
func dbIndexConnectBlock(dbTx database.Tx, indexer Indexer, block *btcutil.Block, view *blockchain.UtxoViewpoint) error {
func dbIndexConnectBlock(dbTx database.Tx, indexer Indexer, block *btcutil.Block,
stxo []blockchain.SpentTxOut) error {
// Assert that the block being connected properly connects to the
// current tip of the index.
idxKey := indexer.Key()
@ -84,7 +86,7 @@ func dbIndexConnectBlock(dbTx database.Tx, indexer Indexer, block *btcutil.Block
}
// Notify the indexer with the connected block so it can index it.
if err := indexer.ConnectBlock(dbTx, block, view); err != nil {
if err := indexer.ConnectBlock(dbTx, block, stxo); err != nil {
return err
}
@ -96,7 +98,9 @@ func dbIndexConnectBlock(dbTx database.Tx, indexer Indexer, block *btcutil.Block
// given block using the provided indexer and updates the tip of the indexer
// accordingly. An error will be returned if the current tip for the indexer is
// not the passed block.
func dbIndexDisconnectBlock(dbTx database.Tx, indexer Indexer, block *btcutil.Block, view *blockchain.UtxoViewpoint) error {
func dbIndexDisconnectBlock(dbTx database.Tx, indexer Indexer, block *btcutil.Block,
stxo []blockchain.SpentTxOut) error {
// Assert that the block being disconnected is the current tip of the
// index.
idxKey := indexer.Key()
@ -113,7 +117,7 @@ func dbIndexDisconnectBlock(dbTx database.Tx, indexer Indexer, block *btcutil.Bl
// Notify the indexer with the disconnected block so it can remove all
// of the appropriate entries.
if err := indexer.DisconnectBlock(dbTx, block, view); err != nil {
if err := indexer.DisconnectBlock(dbTx, block, stxo); err != nil {
return err
}
@ -299,7 +303,8 @@ func (m *Manager) Init(chain *blockchain.BlockChain, interrupt <-chan struct{})
// loaded directly since it is no longer in the main
// chain and thus the chain.BlockByHash function would
// error.
err = m.db.Update(func(dbTx database.Tx) error {
var block *btcutil.Block
err := m.db.View(func(dbTx database.Tx) error {
blockBytes, err := dbTx.FetchBlock(hash)
if err != nil {
return err
@ -309,24 +314,27 @@ func (m *Manager) Init(chain *blockchain.BlockChain, interrupt <-chan struct{})
return err
}
block.SetHeight(height)
return err
})
if err != nil {
return err
}
// When the index requires all of the referenced
// txouts they need to be retrieved from the
// transaction index.
var view *blockchain.UtxoViewpoint
if indexNeedsInputs(indexer) {
var err error
view, err = makeUtxoView(dbTx, block,
interrupt)
if err != nil {
return err
}
}
// We'll also grab the set of outputs spent by this
// block so we can remove them from the index.
spentTxos, err := chain.FetchSpendJournal(block)
if err != nil {
return err
}
// With the block and stxo set for that block retrieved,
// we can now update the index itself.
err = m.db.Update(func(dbTx database.Tx) error {
// Remove all of the index entries associated
// with the block and update the indexer tip.
err = dbIndexDisconnectBlock(dbTx, indexer,
block, view)
err = dbIndexDisconnectBlock(
dbTx, indexer, block, spentTxos,
)
if err != nil {
return err
}
@ -407,7 +415,7 @@ func (m *Manager) Init(chain *blockchain.BlockChain, interrupt <-chan struct{})
}
// Connect the block for all indexes that need it.
var view *blockchain.UtxoViewpoint
var spentTxos []blockchain.SpentTxOut
for i, indexer := range m.enabledIndexes {
// Skip indexes that don't need to be updated with this
// block.
@ -415,21 +423,20 @@ func (m *Manager) Init(chain *blockchain.BlockChain, interrupt <-chan struct{})
continue
}
err := m.db.Update(func(dbTx database.Tx) error {
// When the index requires all of the referenced
// txouts and they haven't been loaded yet, they
// need to be retrieved from the transaction
// index.
if view == nil && indexNeedsInputs(indexer) {
var err error
view, err = makeUtxoView(dbTx, block,
interrupt)
if err != nil {
return err
}
// When the index requires all of the referenced txouts
// and they haven't been loaded yet, they need to be
// retrieved from the spend journal.
if spentTxos == nil && indexNeedsInputs(indexer) {
spentTxos, err = chain.FetchSpendJournal(block)
if err != nil {
return err
}
return dbIndexConnectBlock(dbTx, indexer, block,
view)
}
err := m.db.Update(func(dbTx database.Tx) error {
return dbIndexConnectBlock(
dbTx, indexer, block, spentTxos,
)
})
if err != nil {
return err
@ -528,11 +535,13 @@ func makeUtxoView(dbTx database.Tx, block *btcutil.Block, interrupt <-chan struc
// checks, and invokes each indexer.
//
// This is part of the blockchain.IndexManager interface.
func (m *Manager) ConnectBlock(dbTx database.Tx, block *btcutil.Block, view *blockchain.UtxoViewpoint) error {
func (m *Manager) ConnectBlock(dbTx database.Tx, block *btcutil.Block,
stxos []blockchain.SpentTxOut) error {
// Call each of the currently active optional indexes with the block
// being connected so they can update accordingly.
for _, index := range m.enabledIndexes {
err := dbIndexConnectBlock(dbTx, index, block, view)
err := dbIndexConnectBlock(dbTx, index, block, stxos)
if err != nil {
return err
}
@ -546,11 +555,13 @@ func (m *Manager) ConnectBlock(dbTx database.Tx, block *btcutil.Block, view *blo
// the index entries associated with the block.
//
// This is part of the blockchain.IndexManager interface.
func (m *Manager) DisconnectBlock(dbTx database.Tx, block *btcutil.Block, view *blockchain.UtxoViewpoint) error {
func (m *Manager) DisconnectBlock(dbTx database.Tx, block *btcutil.Block,
stxo []blockchain.SpentTxOut) error {
// Call each of the currently active optional indexes with the block
// being disconnected so they can update accordingly.
for _, index := range m.enabledIndexes {
err := dbIndexDisconnectBlock(dbTx, index, block, view)
err := dbIndexDisconnectBlock(dbTx, index, block, stxo)
if err != nil {
return err
}

View file

@ -388,7 +388,9 @@ func (idx *TxIndex) Create(dbTx database.Tx) error {
// for every transaction in the passed block.
//
// This is part of the Indexer interface.
func (idx *TxIndex) ConnectBlock(dbTx database.Tx, block *btcutil.Block, view *blockchain.UtxoViewpoint) error {
func (idx *TxIndex) ConnectBlock(dbTx database.Tx, block *btcutil.Block,
stxos []blockchain.SpentTxOut) error {
// Increment the internal block ID to use for the block being connected
// and add all of the transactions in the block to the index.
newBlockID := idx.curBlockID + 1
@ -411,7 +413,9 @@ func (idx *TxIndex) ConnectBlock(dbTx database.Tx, block *btcutil.Block, view *b
// hash-to-transaction mapping for every transaction in the block.
//
// This is part of the Indexer interface.
func (idx *TxIndex) DisconnectBlock(dbTx database.Tx, block *btcutil.Block, view *blockchain.UtxoViewpoint) error {
func (idx *TxIndex) DisconnectBlock(dbTx database.Tx, block *btcutil.Block,
stxos []blockchain.SpentTxOut) error {
// Remove all of the transactions in the block from the index.
if err := dbRemoveTxIndexEntries(dbTx, block); err != nil {
return err