From 8c883d1fcad75ca146ab6657b0640208cb1555f4 Mon Sep 17 00:00:00 2001 From: Dave Collins Date: Sun, 3 Sep 2017 19:50:38 -0500 Subject: [PATCH] blockchain/indexers: Allow interrupts. This propagates the interrupt channel through to blockchain and the indexers so that it is possible to interrupt long-running operations such as catching up indexes. --- blockchain/chain.go | 17 +++++++++-- blockchain/indexers/addrindex.go | 4 +-- blockchain/indexers/common.go | 18 ++++++++++++ blockchain/indexers/manager.go | 48 ++++++++++++++++++++++++++------ blockchain/indexers/txindex.go | 7 +++-- btcd.go | 15 +++++----- server.go | 3 +- 7 files changed, 87 insertions(+), 25 deletions(-) diff --git a/blockchain/chain.go b/blockchain/chain.go index f773549b..47289a52 100644 --- a/blockchain/chain.go +++ b/blockchain/chain.go @@ -1421,8 +1421,11 @@ func (b *BlockChain) LocateHeaders(locator BlockLocator, hashStop *chainhash.Has // purpose of supporting optional indexes. type IndexManager interface { // Init is invoked during chain initialize in order to allow the index - // manager to initialize itself and any indexes it is managing. - Init(*BlockChain) error + // manager to initialize itself and any indexes it is managing. The + // channel parameter specifies a channel the caller can close to signal + // that the process should be interrupted. It can be nil if that + // behavior is not desired. + Init(*BlockChain, <-chan struct{}) error // ConnectBlock is invoked when a new block has been connected to the // main chain. @@ -1441,6 +1444,13 @@ type Config struct { // This field is required. DB database.DB + // Interrupt specifies a channel the caller can close to signal that + // long running operations, such as catching up indexes or performing + // database migrations, should be interrupted. + // + // This field can be nil if the caller does not desire the behavior. + Interrupt <-chan struct{} + // ChainParams identifies which chain parameters the chain is associated // with. // @@ -1555,7 +1565,8 @@ func New(config *Config) (*BlockChain, error) { // Initialize and catch up all of the currently active optional indexes // as needed. if config.IndexManager != nil { - if err := config.IndexManager.Init(&b); err != nil { + err := config.IndexManager.Init(&b, config.Interrupt) + if err != nil { return nil, err } } diff --git a/blockchain/indexers/addrindex.go b/blockchain/indexers/addrindex.go index 6400b120..d512c32f 100644 --- a/blockchain/indexers/addrindex.go +++ b/blockchain/indexers/addrindex.go @@ -958,6 +958,6 @@ func NewAddrIndex(db database.DB, chainParams *chaincfg.Params) *AddrIndex { // DropAddrIndex drops the address index from the provided database if it // exists. -func DropAddrIndex(db database.DB) error { - return dropIndex(db, addrIndexKey, addrIndexName) +func DropAddrIndex(db database.DB, interrupt <-chan struct{}) error { + return dropIndex(db, addrIndexKey, addrIndexName, interrupt) } diff --git a/blockchain/indexers/common.go b/blockchain/indexers/common.go index be4f07af..9b0320dc 100644 --- a/blockchain/indexers/common.go +++ b/blockchain/indexers/common.go @@ -9,6 +9,7 @@ package indexers import ( "encoding/binary" + "errors" "github.com/btcsuite/btcd/blockchain" "github.com/btcsuite/btcd/database" @@ -19,6 +20,10 @@ var ( // byteOrder is the preferred byte order used for serializing numeric // fields for storage in the database. byteOrder = binary.LittleEndian + + // errInterruptRequested indicates that an operation was cancelled due + // to a user-requested interrupt. + errInterruptRequested = errors.New("interrupt requested") ) // NeedsInputser provides a generic interface for an indexer to specify the it @@ -88,3 +93,16 @@ type internalBucket interface { Put(key []byte, value []byte) error Delete(key []byte) error } + +// interruptRequested returns true when the provided channel has been closed. +// This simplifies early shutdown slightly since the caller can just use an if +// statement instead of a select. +func interruptRequested(interrupted <-chan struct{}) bool { + select { + case <-interrupted: + return true + default: + } + + return false +} diff --git a/blockchain/indexers/manager.go b/blockchain/indexers/manager.go index 7de6f5f5..cc996dd8 100644 --- a/blockchain/indexers/manager.go +++ b/blockchain/indexers/manager.go @@ -146,7 +146,7 @@ func indexDropKey(idxKey []byte) []byte { // of being dropped and finishes dropping them when the are. This is necessary // because dropping and index has to be done in several atomic steps rather than // one big atomic step due to the massive number of entries. -func (m *Manager) maybeFinishDrops() error { +func (m *Manager) maybeFinishDrops(interrupt <-chan struct{}) error { indexNeedsDrop := make([]bool, len(m.enabledIndexes)) err := m.db.View(func(dbTx database.Tx) error { // None of the indexes needs to be dropped if the index tips @@ -156,7 +156,7 @@ func (m *Manager) maybeFinishDrops() error { return nil } - // Make the indexer as requiring a drop if one is already in + // Mark the indexer as requiring a drop if one is already in // progress. for i, indexer := range m.enabledIndexes { dropKey := indexDropKey(indexer.Key()) @@ -171,6 +171,10 @@ func (m *Manager) maybeFinishDrops() error { return err } + if interruptRequested(interrupt) { + return errInterruptRequested + } + // Finish dropping any of the enabled indexes that are already in the // middle of being dropped. for i, indexer := range m.enabledIndexes { @@ -179,7 +183,7 @@ func (m *Manager) maybeFinishDrops() error { } log.Infof("Resuming %s drop", indexer.Name()) - err := dropIndex(m.db, indexer.Key(), indexer.Name()) + err := dropIndex(m.db, indexer.Key(), indexer.Name(), interrupt) if err != nil { return err } @@ -225,14 +229,18 @@ func (m *Manager) maybeCreateIndexes(dbTx database.Tx) error { // catch up due to the I/O contention. // // This is part of the blockchain.IndexManager interface. -func (m *Manager) Init(chain *blockchain.BlockChain) error { +func (m *Manager) Init(chain *blockchain.BlockChain, interrupt <-chan struct{}) error { // Nothing to do when no indexes are enabled. if len(m.enabledIndexes) == 0 { return nil } + if interruptRequested(interrupt) { + return errInterruptRequested + } + // Finish and drops that were previously interrupted. - if err := m.maybeFinishDrops(); err != nil { + if err := m.maybeFinishDrops(interrupt); err != nil { return err } @@ -308,7 +316,8 @@ func (m *Manager) Init(chain *blockchain.BlockChain) error { var view *blockchain.UtxoViewpoint if indexNeedsInputs(indexer) { var err error - view, err = makeUtxoView(dbTx, block) + view, err = makeUtxoView(dbTx, block, + interrupt) if err != nil { return err } @@ -331,6 +340,10 @@ func (m *Manager) Init(chain *blockchain.BlockChain) error { if err != nil { return err } + + if interruptRequested(interrupt) { + return errInterruptRequested + } } if initialHeight != height { @@ -389,6 +402,10 @@ func (m *Manager) Init(chain *blockchain.BlockChain) error { return err } + if interruptRequested(interrupt) { + return errInterruptRequested + } + // Connect the block for all indexes that need it. var view *blockchain.UtxoViewpoint for i, indexer := range m.enabledIndexes { @@ -405,7 +422,8 @@ func (m *Manager) Init(chain *blockchain.BlockChain) error { // index. if view == nil && indexNeedsInputs(indexer) { var err error - view, err = makeUtxoView(dbTx, block) + view, err = makeUtxoView(dbTx, block, + interrupt) if err != nil { return err } @@ -421,6 +439,10 @@ func (m *Manager) Init(chain *blockchain.BlockChain) error { // Log indexing progress. progressLogger.LogBlockHeight(block) + + if interruptRequested(interrupt) { + return errInterruptRequested + } } log.Infof("Indexes caught up to height %d", bestHeight) @@ -470,7 +492,7 @@ func dbFetchTx(dbTx database.Tx, hash *chainhash.Hash) (*wire.MsgTx, error) { // transactions in the block. This is sometimes needed when catching indexes up // because many of the txouts could actually already be spent however the // associated scripts are still required to index them. -func makeUtxoView(dbTx database.Tx, block *btcutil.Block) (*blockchain.UtxoViewpoint, error) { +func makeUtxoView(dbTx database.Tx, block *btcutil.Block, interrupt <-chan struct{}) (*blockchain.UtxoViewpoint, error) { view := blockchain.NewUtxoViewpoint() for txIdx, tx := range block.Transactions() { // Coinbases do not reference any inputs. Since the block is @@ -492,6 +514,10 @@ func makeUtxoView(dbTx database.Tx, block *btcutil.Block) (*blockchain.UtxoViewp view.AddTxOuts(btcutil.NewTx(originTx), 0) } + + if interruptRequested(interrupt) { + return nil, errInterruptRequested + } } return view, nil @@ -548,7 +574,7 @@ func NewManager(db database.DB, enabledIndexes []Indexer) *Manager { // keep memory usage to reasonable levels. It also marks the drop in progress // so the drop can be resumed if it is stopped before it is done before the // index can be used again. -func dropIndex(db database.DB, idxKey []byte, idxName string) error { +func dropIndex(db database.DB, idxKey []byte, idxName string, interrupt <-chan struct{}) error { // Nothing to do if the index doesn't already exist. var needsDelete bool err := db.View(func(dbTx database.Tx) error { @@ -610,6 +636,10 @@ func dropIndex(db database.DB, idxKey []byte, idxName string) error { log.Infof("Deleted %d keys (%d total) from %s", numDeleted, totalDeleted, idxName) } + + if interruptRequested(interrupt) { + return errInterruptRequested + } } // Call extra index specific deinitialization for the transaction index. diff --git a/blockchain/indexers/txindex.go b/blockchain/indexers/txindex.go index 6d36ac60..9b1826cc 100644 --- a/blockchain/indexers/txindex.go +++ b/blockchain/indexers/txindex.go @@ -469,10 +469,11 @@ func dropBlockIDIndex(db database.DB) error { // DropTxIndex drops the transaction index from the provided database if it // exists. Since the address index relies on it, the address index will also be // dropped when it exists. -func DropTxIndex(db database.DB) error { - if err := dropIndex(db, addrIndexKey, addrIndexName); err != nil { +func DropTxIndex(db database.DB, interrupt <-chan struct{}) error { + err := dropIndex(db, addrIndexKey, addrIndexName, interrupt) + if err != nil { return err } - return dropIndex(db, txIndexKey, txIndexName) + return dropIndex(db, txIndexKey, txIndexName, interrupt) } diff --git a/btcd.go b/btcd.go index 8d4b8802..2395974b 100644 --- a/btcd.go +++ b/btcd.go @@ -57,7 +57,7 @@ func btcdMain(serverChan chan<- *server) error { // Get a channel that will be closed when a shutdown signal has been // triggered either from an OS signal such as SIGINT (Ctrl+C) or from // another subsystem such as the RPC server. - interruptedChan := interruptListener() + interrupt := interruptListener() defer btcdLog.Info("Shutdown complete") // Show version at startup. @@ -94,7 +94,7 @@ func btcdMain(serverChan chan<- *server) error { } // Return now if an interrupt signal was triggered. - if interruptRequested(interruptedChan) { + if interruptRequested(interrupt) { return nil } @@ -111,7 +111,7 @@ func btcdMain(serverChan chan<- *server) error { }() // Return now if an interrupt signal was triggered. - if interruptRequested(interruptedChan) { + if interruptRequested(interrupt) { return nil } @@ -120,7 +120,7 @@ func btcdMain(serverChan chan<- *server) error { // NOTE: The order is important here because dropping the tx index also // drops the address index since it relies on it. if cfg.DropAddrIndex { - if err := indexers.DropAddrIndex(db); err != nil { + if err := indexers.DropAddrIndex(db, interrupt); err != nil { btcdLog.Errorf("%v", err) return err } @@ -128,7 +128,7 @@ func btcdMain(serverChan chan<- *server) error { return nil } if cfg.DropTxIndex { - if err := indexers.DropTxIndex(db); err != nil { + if err := indexers.DropTxIndex(db, interrupt); err != nil { btcdLog.Errorf("%v", err) return err } @@ -137,7 +137,8 @@ func btcdMain(serverChan chan<- *server) error { } // Create server and start it. - server, err := newServer(cfg.Listeners, db, activeNetParams.Params) + server, err := newServer(cfg.Listeners, db, activeNetParams.Params, + interrupt) if err != nil { // TODO: this logging could do with some beautifying. btcdLog.Errorf("Unable to start server on %v: %v", @@ -158,7 +159,7 @@ func btcdMain(serverChan chan<- *server) error { // Wait until the interrupt signal is received from an OS signal or // shutdown is requested through one of the subsystems such as the RPC // server. - <-interruptedChan + <-interrupt return nil } diff --git a/server.go b/server.go index c2092c44..5b3c8385 100644 --- a/server.go +++ b/server.go @@ -2153,7 +2153,7 @@ func setupRPCListeners() ([]net.Listener, error) { // newServer returns a new btcd server configured to listen on addr for the // bitcoin network type specified by chainParams. Use start to begin accepting // connections from peers. -func newServer(listenAddrs []string, db database.DB, chainParams *chaincfg.Params) (*server, error) { +func newServer(listenAddrs []string, db database.DB, chainParams *chaincfg.Params, interrupt <-chan struct{}) (*server, error) { services := defaultServices if cfg.NoPeerBloomFilters { services &^= wire.SFNodeBloom @@ -2237,6 +2237,7 @@ func newServer(listenAddrs []string, db database.DB, chainParams *chaincfg.Param var err error s.chain, err = blockchain.New(&blockchain.Config{ DB: s.db, + Interrupt: interrupt, ChainParams: s.chainParams, Checkpoints: checkpoints, TimeSource: s.timeSource,