// Copyright (c) 2013-2014 Conformal Systems LLC. // Use of this source code is governed by an ISC // license that can be found in the LICENSE file. package main import ( "container/heap" "fmt" "runtime" "sync" "sync/atomic" "github.com/btcsuite/btcd/blockchain" "github.com/btcsuite/btcd/database" "github.com/btcsuite/btcd/txscript" "github.com/btcsuite/btcd/wire" "github.com/btcsuite/btcutil" "github.com/btcsuite/golangcrypto/ripemd160" ) type indexState int const ( // Our two operating modes. // We go into "CatchUp" mode when, on boot, the current best // chain height is greater than the last block we've indexed. // "CatchUp" mode is characterized by several concurrent worker // goroutines indexing blocks organized by a manager goroutine. // When in "CatchUp" mode, incoming requests to index newly solved // blocks are backed up for later processing. Once we've finished // catching up, we process these queued jobs, and then enter into // "maintainence" mode. indexCatchUp indexState = iota // When in "maintainence" mode, we have a single worker serially // processing incoming jobs to index newly solved blocks. indexMaintain ) // Limit the number of goroutines that concurrently // build the index to catch up based on the number // of processor cores. This help ensure the system // stays reasonably responsive under heavy load. var numCatchUpWorkers = runtime.NumCPU() * 3 // indexBlockMsg packages a request to have the addresses of a block indexed. type indexBlockMsg struct { blk *btcutil.Block done chan struct{} } // writeIndexReq represents a request to have a completed address index // committed to the database. type writeIndexReq struct { blk *btcutil.Block addrIndex database.BlockAddrIndex } // addrIndexer provides a concurrent service for indexing the transactions of // target blocks based on the addresses involved in the transaction. type addrIndexer struct { server *server started int32 shutdown int32 state indexState quit chan struct{} wg sync.WaitGroup addrIndexJobs chan *indexBlockMsg writeRequests chan *writeIndexReq progressLogger *blockProgressLogger currentIndexTip int64 chainTip int64 sync.Mutex } // newAddrIndexer creates a new block address indexer. // Use Start to begin processing incoming index jobs. func newAddrIndexer(s *server) (*addrIndexer, error) { _, chainHeight, err := s.db.NewestSha() if err != nil { return nil, err } _, lastIndexedHeight, err := s.db.FetchAddrIndexTip() if err != nil && err != database.ErrAddrIndexDoesNotExist { return nil, err } var state indexState if chainHeight == lastIndexedHeight { state = indexMaintain } else { state = indexCatchUp } ai := &addrIndexer{ server: s, quit: make(chan struct{}), state: state, addrIndexJobs: make(chan *indexBlockMsg), writeRequests: make(chan *writeIndexReq, numCatchUpWorkers), currentIndexTip: lastIndexedHeight, chainTip: chainHeight, progressLogger: newBlockProgressLogger("Indexed addresses of", adxrLog), } return ai, nil } // Start begins processing of incoming indexing jobs. func (a *addrIndexer) Start() { // Already started? if atomic.AddInt32(&a.started, 1) != 1 { return } adxrLog.Trace("Starting address indexer") a.wg.Add(2) go a.indexManager() go a.indexWriter() } // Stop gracefully shuts down the address indexer by stopping all ongoing // worker goroutines, waiting for them to finish their current task. func (a *addrIndexer) Stop() error { if atomic.AddInt32(&a.shutdown, 1) != 1 { adxrLog.Warnf("Address indexer is already in the process of " + "shutting down") return nil } adxrLog.Infof("Address indexer shutting down") close(a.quit) a.wg.Wait() return nil } // IsCaughtUp returns a bool representing if the address indexer has // caught up with the best height on the main chain. func (a *addrIndexer) IsCaughtUp() bool { a.Lock() defer a.Unlock() return a.state == indexMaintain } // indexManager creates, and oversees worker index goroutines. // indexManager is the main goroutine for the addresses indexer. // It creates, and oversees worker goroutines to index incoming blocks, with // the exact behavior depending on the current index state // (catch up, vs maintain). Completion of catch-up mode is always proceeded by // a gracefull transition into "maintain" mode. // NOTE: Must be run as a goroutine. func (a *addrIndexer) indexManager() { if a.state == indexCatchUp { adxrLog.Infof("Building up address index from height %v to %v.", a.currentIndexTip+1, a.chainTip) // Quit semaphores to gracefully shut down our worker tasks. runningWorkers := make([]chan struct{}, 0, numCatchUpWorkers) shutdownWorkers := func() { for _, quit := range runningWorkers { close(quit) } } criticalShutdown := func() { shutdownWorkers() a.server.Stop() } // Spin up all of our "catch up" worker goroutines, giving them // a quit channel and WaitGroup so we can gracefully exit if // needed. var workerWg sync.WaitGroup catchUpChan := make(chan *indexBlockMsg) for i := 0; i < numCatchUpWorkers; i++ { quit := make(chan struct{}) runningWorkers = append(runningWorkers, quit) workerWg.Add(1) go a.indexCatchUpWorker(catchUpChan, &workerWg, quit) } // Starting from the next block after our current index tip, // feed our workers each successive block to index until we've // caught up to the current highest block height. lastBlockIdxHeight := a.currentIndexTip + 1 for lastBlockIdxHeight <= a.chainTip { targetSha, err := a.server.db.FetchBlockShaByHeight(lastBlockIdxHeight) if err != nil { adxrLog.Errorf("Unable to look up the sha of the "+ "next target block (height %v): %v", lastBlockIdxHeight, err) criticalShutdown() goto fin } targetBlock, err := a.server.db.FetchBlockBySha(targetSha) if err != nil { // Unable to locate a target block by sha, this // is a critical error, we may have an // inconsistency in the DB. adxrLog.Errorf("Unable to look up the next "+ "target block (sha %v): %v", targetSha, err) criticalShutdown() goto fin } // Send off the next job, ready to exit if a shutdown is // signalled. indexJob := &indexBlockMsg{blk: targetBlock} select { case catchUpChan <- indexJob: lastBlockIdxHeight++ case <-a.quit: shutdownWorkers() goto fin } _, a.chainTip, err = a.server.db.NewestSha() if err != nil { adxrLog.Errorf("Unable to get latest block height: %v", err) criticalShutdown() goto fin } } a.Lock() a.state = indexMaintain a.Unlock() // We've finished catching up. Signal our workers to quit, and // wait until they've all finished. shutdownWorkers() workerWg.Wait() } adxrLog.Infof("Address indexer has caught up to best height, entering " + "maintainence mode") // We're all caught up at this point. We now serially process new jobs // coming in. for { select { case indexJob := <-a.addrIndexJobs: addrIndex, err := a.indexBlockAddrs(indexJob.blk) if err != nil { adxrLog.Errorf("Unable to index transactions of"+ " block: %v", err) a.server.Stop() goto fin } a.writeRequests <- &writeIndexReq{blk: indexJob.blk, addrIndex: addrIndex} case <-a.quit: goto fin } } fin: a.wg.Done() } // UpdateAddressIndex asynchronously queues a newly solved block to have its // transactions indexed by address. func (a *addrIndexer) UpdateAddressIndex(block *btcutil.Block) { go func() { job := &indexBlockMsg{blk: block} a.addrIndexJobs <- job }() } // pendingIndexWrites writes is a priority queue which is used to ensure the // address index of the block height N+1 is written when our address tip is at // height N. This ordering is necessary to maintain index consistency in face // of our concurrent workers, which may not necessarily finish in the order the // jobs are handed out. type pendingWriteQueue []*writeIndexReq // Len returns the number of items in the priority queue. It is part of the // heap.Interface implementation. func (pq pendingWriteQueue) Len() int { return len(pq) } // Less returns whether the item in the priority queue with index i should sort // before the item with index j. It is part of the heap.Interface implementation. func (pq pendingWriteQueue) Less(i, j int) bool { return pq[i].blk.Height() < pq[j].blk.Height() } // Swap swaps the items at the passed indices in the priority queue. It is // part of the heap.Interface implementation. func (pq pendingWriteQueue) Swap(i, j int) { pq[i], pq[j] = pq[j], pq[i] } // Push pushes the passed item onto the priority queue. It is part of the // heap.Interface implementation. func (pq *pendingWriteQueue) Push(x interface{}) { *pq = append(*pq, x.(*writeIndexReq)) } // Pop removes the highest priority item (according to Less) from the priority // queue and returns it. It is part of the heap.Interface implementation. func (pq *pendingWriteQueue) Pop() interface{} { n := len(*pq) item := (*pq)[n-1] (*pq)[n-1] = nil *pq = (*pq)[0 : n-1] return item } // indexWriter commits the populated address indexes created by the // catch up workers to the database. Since we have concurrent workers, the writer // ensures indexes are written in ascending order to avoid a possible gap in the // address index triggered by an unexpected shutdown. // NOTE: Must be run as a goroutine func (a *addrIndexer) indexWriter() { var pendingWrites pendingWriteQueue minHeightWrite := make(chan *writeIndexReq) workerQuit := make(chan struct{}) writeFinished := make(chan struct{}, 1) // Spawn a goroutine to feed our writer address indexes such // that, if our address tip is at N, the index for block N+1 is always // written first. We use a priority queue to enforce this condition // while accepting new write requests. go func() { for { top: select { case incomingWrite := <-a.writeRequests: heap.Push(&pendingWrites, incomingWrite) // Check if we've found a write request that // satisfies our condition. If we have, then // chances are we have some backed up requests // which wouldn't be written until a previous // request showed up. If this is the case we'll // quickly flush our heap of now available in // order writes. We also accept write requests // with a block height *before* the current // index tip, in order to re-index new prior // blocks added to the main chain during a // re-org. writeReq := heap.Pop(&pendingWrites).(*writeIndexReq) _, addrTip, _ := a.server.db.FetchAddrIndexTip() for writeReq.blk.Height() == (addrTip+1) || writeReq.blk.Height() <= addrTip { minHeightWrite <- writeReq // Wait for write to finish so we get a // fresh view of the addrtip. <-writeFinished // Break to grab a new write request if pendingWrites.Len() == 0 { break top } writeReq = heap.Pop(&pendingWrites).(*writeIndexReq) _, addrTip, _ = a.server.db.FetchAddrIndexTip() } // We haven't found the proper write request yet, // push back onto our heap and wait for the next // request which may be our target write. heap.Push(&pendingWrites, writeReq) case <-workerQuit: return } } }() out: // Our main writer loop. Here we actually commit the populated address // indexes to the database. for { select { case nextWrite := <-minHeightWrite: sha, _ := nextWrite.blk.Sha() // Can never fail. height := nextWrite.blk.Height() err := a.server.db.UpdateAddrIndexForBlock(sha, height, nextWrite.addrIndex) if err != nil { adxrLog.Errorf("Unable to write index for block, "+ "sha %v, height %v", sha, height) a.server.Stop() break out } writeFinished <- struct{}{} a.progressLogger.LogBlockHeight(nextWrite.blk) case <-a.quit: break out } } close(workerQuit) a.wg.Done() } // indexCatchUpWorker indexes the transactions of previously validated and // stored blocks. // NOTE: Must be run as a goroutine func (a *addrIndexer) indexCatchUpWorker(workChan chan *indexBlockMsg, wg *sync.WaitGroup, quit chan struct{}) { out: for { select { case indexJob := <-workChan: addrIndex, err := a.indexBlockAddrs(indexJob.blk) if err != nil { adxrLog.Errorf("Unable to index transactions of"+ " block: %v", err) a.server.Stop() break out } a.writeRequests <- &writeIndexReq{blk: indexJob.blk, addrIndex: addrIndex} case <-quit: break out } } wg.Done() } // indexScriptPubKey indexes all data pushes greater than 8 bytes within the // passed SPK. Our "address" index is actually a hash160 index, where in the // ideal case the data push is either the hash160 of a publicKey (P2PKH) or // a Script (P2SH). func indexScriptPubKey(addrIndex database.BlockAddrIndex, scriptPubKey []byte, locInBlock *wire.TxLoc) error { dataPushes, err := txscript.PushedData(scriptPubKey) if err != nil { adxrLog.Tracef("Couldn't get pushes: %v", err) return err } for _, data := range dataPushes { // Only index pushes greater than 8 bytes. if len(data) < 8 { continue } var indexKey [ripemd160.Size]byte // A perfect little hash160. if len(data) <= 20 { copy(indexKey[:], data) // Otherwise, could be a payToPubKey or an OP_RETURN, so we'll // make a hash160 out of it. } else { copy(indexKey[:], btcutil.Hash160(data)) } addrIndex[indexKey] = append(addrIndex[indexKey], locInBlock) } return nil } // indexBlockAddrs returns a populated index of the all the transactions in the // passed block based on the addresses involved in each transaction. func (a *addrIndexer) indexBlockAddrs(blk *btcutil.Block) (database.BlockAddrIndex, error) { addrIndex := make(database.BlockAddrIndex) txLocs, err := blk.TxLoc() if err != nil { return nil, err } for txIdx, tx := range blk.Transactions() { // Tx's offset and length in the block. locInBlock := &txLocs[txIdx] // Coinbases don't have any inputs. if !blockchain.IsCoinBase(tx) { // Index the SPK's of each input's previous outpoint // transaction. for _, txIn := range tx.MsgTx().TxIn { // Lookup and fetch the referenced output's tx. prevOut := txIn.PreviousOutPoint txList, err := a.server.db.FetchTxBySha(&prevOut.Hash) if len(txList) == 0 { return nil, fmt.Errorf("transaction %v not found", prevOut.Hash) } if err != nil { adxrLog.Errorf("Error fetching tx %v: %v", prevOut.Hash, err) return nil, err } prevOutTx := txList[len(txList)-1] inputOutPoint := prevOutTx.Tx.TxOut[prevOut.Index] indexScriptPubKey(addrIndex, inputOutPoint.PkScript, locInBlock) } } for _, txOut := range tx.MsgTx().TxOut { indexScriptPubKey(addrIndex, txOut.PkScript, locInBlock) } } return addrIndex, nil }