From 68e02afbf42b6748465f7501b0a8b16ae5675241 Mon Sep 17 00:00:00 2001 From: Alex Date: Mon, 13 Feb 2017 16:00:02 -0700 Subject: [PATCH] SPV implementation - hodgepodge mishmash not done yet --- spvsvc/log.go | 26 + spvsvc/spvchain/blocklogger.go | 76 ++ spvsvc/spvchain/blockmanager.go | 1483 ++++++++++++++++++++++++++++++ spvsvc/spvchain/db.go | 429 +++++++++ spvsvc/spvchain/filter.go | 73 ++ spvsvc/spvchain/log.go | 26 + spvsvc/spvchain/notifications.go | 149 +++ spvsvc/spvchain/spvchain.go | 1373 +++++++++++++++++++++++++++ spvsvc/spvsvc.go | 277 ++++++ 9 files changed, 3912 insertions(+) create mode 100644 spvsvc/log.go create mode 100644 spvsvc/spvchain/blocklogger.go create mode 100644 spvsvc/spvchain/blockmanager.go create mode 100644 spvsvc/spvchain/db.go create mode 100644 spvsvc/spvchain/filter.go create mode 100644 spvsvc/spvchain/log.go create mode 100644 spvsvc/spvchain/notifications.go create mode 100644 spvsvc/spvchain/spvchain.go create mode 100644 spvsvc/spvsvc.go diff --git a/spvsvc/log.go b/spvsvc/log.go new file mode 100644 index 0000000..05be319 --- /dev/null +++ b/spvsvc/log.go @@ -0,0 +1,26 @@ +package spvsvc + +import "github.com/btcsuite/btclog" + +// log is a logger that is initialized with no output filters. This +// means the package will not perform any logging by default until the caller +// requests it. +var log btclog.Logger + +// The default amount of logging is none. +func init() { + DisableLog() +} + +// DisableLog disables all library log output. Logging output is disabled +// by default until either UseLogger or SetLogWriter are called. +func DisableLog() { + log = btclog.Disabled +} + +// UseLogger uses a specified Logger to output package logging info. +// This should be used in preference to SetLogWriter if the caller is also +// using btclog. +func UseLogger(logger btclog.Logger) { + log = logger +} diff --git a/spvsvc/spvchain/blocklogger.go b/spvsvc/spvchain/blocklogger.go new file mode 100644 index 0000000..dbc3c69 --- /dev/null +++ b/spvsvc/spvchain/blocklogger.go @@ -0,0 +1,76 @@ +package spvchain + +import ( + "sync" + "time" + + "github.com/btcsuite/btclog" + "github.com/btcsuite/btcutil" +) + +// blockProgressLogger provides periodic logging for other services in order +// to show users progress of certain "actions" involving some or all current +// blocks. Ex: syncing to best chain, indexing all blocks, etc. +type blockProgressLogger struct { + receivedLogBlocks int64 + receivedLogTx int64 + lastBlockLogTime time.Time + + subsystemLogger btclog.Logger + progressAction string + sync.Mutex +} + +// newBlockProgressLogger returns a new block progress logger. +// The progress message is templated as follows: +// {progressAction} {numProcessed} {blocks|block} in the last {timePeriod} +// ({numTxs}, height {lastBlockHeight}, {lastBlockTimeStamp}) +func newBlockProgressLogger(progressMessage string, logger btclog.Logger) *blockProgressLogger { + return &blockProgressLogger{ + lastBlockLogTime: time.Now(), + progressAction: progressMessage, + subsystemLogger: logger, + } +} + +// LogBlockHeight logs a new block height as an information message to show +// progress to the user. In order to prevent spam, it limits logging to one +// message every 10 seconds with duration and totals included. +func (b *blockProgressLogger) LogBlockHeight(block *btcutil.Block) { + b.Lock() + defer b.Unlock() + + b.receivedLogBlocks++ + b.receivedLogTx += int64(len(block.MsgBlock().Transactions)) + + now := time.Now() + duration := now.Sub(b.lastBlockLogTime) + if duration < time.Second*10 { + return + } + + // Truncate the duration to 10s of milliseconds. + durationMillis := int64(duration / time.Millisecond) + tDuration := 10 * time.Millisecond * time.Duration(durationMillis/10) + + // Log information about new block height. + blockStr := "blocks" + if b.receivedLogBlocks == 1 { + blockStr = "block" + } + txStr := "transactions" + if b.receivedLogTx == 1 { + txStr = "transaction" + } + b.subsystemLogger.Infof("%s %d %s in the last %s (%d %s, height %d, %s)", + b.progressAction, b.receivedLogBlocks, blockStr, tDuration, b.receivedLogTx, + txStr, block.Height(), block.MsgBlock().Header.Timestamp) + + b.receivedLogBlocks = 0 + b.receivedLogTx = 0 + b.lastBlockLogTime = now +} + +func (b *blockProgressLogger) SetLastLogTime(time time.Time) { + b.lastBlockLogTime = time +} diff --git a/spvsvc/spvchain/blockmanager.go b/spvsvc/spvchain/blockmanager.go new file mode 100644 index 0000000..a52840c --- /dev/null +++ b/spvsvc/spvchain/blockmanager.go @@ -0,0 +1,1483 @@ +package spvchain + +import ( + "container/list" + "math/big" + "sync" + "sync/atomic" + "time" + + "github.com/btcsuite/btcd/blockchain" + "github.com/btcsuite/btcd/chaincfg" + "github.com/btcsuite/btcd/chaincfg/chainhash" + "github.com/btcsuite/btcd/wire" + "github.com/btcsuite/btcutil" +) + +const ( + // minInFlightBlocks is the minimum number of blocks that should be + // in the request queue for headers-first mode before requesting + // more. + minInFlightBlocks = 10 + + // blockDbNamePrefix is the prefix for the block database name. The + // database type is appended to this value to form the full block + // database name. + blockDbNamePrefix = "blocks" + + // maxRequestedBlocks is the maximum number of requested block + // hashes to store in memory. + maxRequestedBlocks = wire.MaxInvPerMsg +) + +// zeroHash is the zero value hash (all zeros). It is defined as a convenience. +var zeroHash chainhash.Hash + +// newPeerMsg signifies a newly connected peer to the block handler. +type newPeerMsg struct { + peer *serverPeer +} + +// blockMsg packages a bitcoin block message and the peer it came from together +// so the block handler has access to that information. +type blockMsg struct { + block *btcutil.Block + peer *serverPeer +} + +// invMsg packages a bitcoin inv message and the peer it came from together +// so the block handler has access to that information. +type invMsg struct { + inv *wire.MsgInv + peer *serverPeer +} + +// headersMsg packages a bitcoin headers message and the peer it came from +// together so the block handler has access to that information. +type headersMsg struct { + headers *wire.MsgHeaders + peer *serverPeer +} + +// donePeerMsg signifies a newly disconnected peer to the block handler. +type donePeerMsg struct { + peer *serverPeer +} + +// txMsg packages a bitcoin tx message and the peer it came from together +// so the block handler has access to that information. +type txMsg struct { + tx *btcutil.Tx + peer *serverPeer +} + +// getSyncPeerMsg is a message type to be sent across the message channel for +// retrieving the current sync peer. +type getSyncPeerMsg struct { + reply chan *serverPeer +} + +// processBlockResponse is a response sent to the reply channel of a +// processBlockMsg. +type processBlockResponse struct { + isOrphan bool + err error +} + +// processBlockMsg is a message type to be sent across the message channel +// for requested a block is processed. Note this call differs from blockMsg +// above in that blockMsg is intended for blocks that came from peers and have +// extra handling whereas this message essentially is just a concurrent safe +// way to call ProcessBlock on the internal block chain instance. +type processBlockMsg struct { + block *btcutil.Block + flags blockchain.BehaviorFlags + reply chan processBlockResponse +} + +// isCurrentMsg is a message type to be sent across the message channel for +// requesting whether or not the block manager believes it is synced with +// the currently connected peers. +type isCurrentMsg struct { + reply chan bool +} + +// headerNode is used as a node in a list of headers that are linked together +// between checkpoints. +type headerNode struct { + height int32 + header *wire.BlockHeader +} + +// blockManager provides a concurrency safe block manager for handling all +// incoming blocks. +type blockManager struct { + server *ChainService + started int32 + shutdown int32 + requestedBlocks map[chainhash.Hash]struct{} + progressLogger *blockProgressLogger + syncPeer *serverPeer + msgChan chan interface{} + wg sync.WaitGroup + quit chan struct{} + + headerList *list.List + startHeader *list.Element + nextCheckpoint *chaincfg.Checkpoint + + minRetargetTimespan int64 // target timespan / adjustment factor + maxRetargetTimespan int64 // target timespan * adjustment factor + blocksPerRetarget int32 // target timespan / target time per block +} + +// newBlockManager returns a new bitcoin block manager. +// Use Start to begin processing asynchronous block and inv updates. +func newBlockManager(s *ChainService) (*blockManager, error) { + targetTimespan := int64(s.chainParams.TargetTimespan / time.Second) + targetTimePerBlock := int64(s.chainParams.TargetTimePerBlock / time.Second) + adjustmentFactor := s.chainParams.RetargetAdjustmentFactor + + bm := blockManager{ + server: s, + requestedBlocks: make(map[chainhash.Hash]struct{}), + progressLogger: newBlockProgressLogger("Processed", log), + msgChan: make(chan interface{}, MaxPeers*3), + headerList: list.New(), + quit: make(chan struct{}), + blocksPerRetarget: int32(targetTimespan / targetTimePerBlock), + minRetargetTimespan: targetTimespan / adjustmentFactor, + maxRetargetTimespan: targetTimespan * adjustmentFactor, + } + + // Initialize the next checkpoint based on the current height. + header, height, err := s.LatestBlock() + if err != nil { + return nil, err + } + bm.nextCheckpoint = bm.findNextHeaderCheckpoint(int32(height)) + bm.resetHeaderState(&header, int32(height)) + + return &bm, nil +} + +// Start begins the core block handler which processes block and inv messages. +func (b *blockManager) Start() { + // Already started? + if atomic.AddInt32(&b.started, 1) != 1 { + return + } + + log.Trace("Starting block manager") + b.wg.Add(1) + go b.blockHandler() +} + +// Stop gracefully shuts down the block manager by stopping all asynchronous +// handlers and waiting for them to finish. +func (b *blockManager) Stop() error { + if atomic.AddInt32(&b.shutdown, 1) != 1 { + log.Warnf("Block manager is already in the process of " + + "shutting down") + return nil + } + + log.Infof("Block manager shutting down") + close(b.quit) + b.wg.Wait() + return nil +} + +// NewPeer informs the block manager of a newly active peer. +func (b *blockManager) NewPeer(sp *serverPeer) { + // Ignore if we are shutting down. + if atomic.LoadInt32(&b.shutdown) != 0 { + return + } + b.msgChan <- &newPeerMsg{peer: sp} +} + +// handleNewPeerMsg deals with new peers that have signalled they may +// be considered as a sync peer (they have already successfully negotiated). It +// also starts syncing if needed. It is invoked from the syncHandler goroutine. +func (b *blockManager) handleNewPeerMsg(peers *list.List, sp *serverPeer) { + // Ignore if in the process of shutting down. + if atomic.LoadInt32(&b.shutdown) != 0 { + return + } + + log.Infof("New valid peer %s (%s)", sp, sp.UserAgent()) + + // Ignore the peer if it's not a sync candidate. + if !b.isSyncCandidate(sp) { + return + } + + // Add the peer as a candidate to sync from. + peers.PushBack(sp) + + // Start syncing by choosing the best candidate if needed. + b.startSync(peers) +} + +// DonePeer informs the blockmanager that a peer has disconnected. +func (b *blockManager) DonePeer(sp *serverPeer) { + // Ignore if we are shutting down. + if atomic.LoadInt32(&b.shutdown) != 0 { + return + } + + b.msgChan <- &donePeerMsg{peer: sp} +} + +// handleDonePeerMsg deals with peers that have signalled they are done. It +// removes the peer as a candidate for syncing and in the case where it was +// the current sync peer, attempts to select a new best peer to sync from. It +// is invoked from the syncHandler goroutine. +func (b *blockManager) handleDonePeerMsg(peers *list.List, sp *serverPeer) { + // Remove the peer from the list of candidate peers. + for e := peers.Front(); e != nil; e = e.Next() { + if e.Value == sp { + peers.Remove(e) + break + } + } + + log.Infof("Lost peer %s", sp) + + // Remove requested blocks from the global map so that they will be + // fetched from elsewhere next time we get an inv. + // TODO: we could possibly here check which peers have these blocks + // and request them now to speed things up a little. + for k := range sp.requestedBlocks { + delete(b.requestedBlocks, k) + } + + // Attempt to find a new peer to sync from if the quitting peer is the + // sync peer. Also, reset the headers-first state if in headers-first + // mode so + if b.syncPeer != nil && b.syncPeer == sp { + b.syncPeer = nil + header, height, err := b.server.LatestBlock() + if err != nil { + return + } + b.resetHeaderState(&header, int32(height)) + b.startSync(peers) + } +} + +// blockHandler is the main handler for the block manager. It must be run +// as a goroutine. It processes block and inv messages in a separate goroutine +// from the peer handlers so the block (MsgBlock) messages are handled by a +// single thread without needing to lock memory data structures. This is +// important because the block manager controls which blocks are needed and how +// the fetching should proceed. +func (b *blockManager) blockHandler() { + candidatePeers := list.New() +out: + for { + select { + case m := <-b.msgChan: + switch msg := m.(type) { + case *newPeerMsg: + b.handleNewPeerMsg(candidatePeers, msg.peer) + + /*case *blockMsg: + b.handleBlockMsg(msg) + msg.peer.blockProcessed <- struct{}{}*/ + + case *invMsg: + b.handleInvMsg(msg) + + case *headersMsg: + b.handleHeadersMsg(msg) + + case *donePeerMsg: + b.handleDonePeerMsg(candidatePeers, msg.peer) + + case getSyncPeerMsg: + msg.reply <- b.syncPeer + + /*case processBlockMsg: + _, isOrphan, err := b.chain.ProcessBlock( + msg.block, msg.flags) + if err != nil { + msg.reply <- processBlockResponse{ + isOrphan: false, + err: err, + } + } + + msg.reply <- processBlockResponse{ + isOrphan: isOrphan, + err: nil, + }*/ + + case isCurrentMsg: + msg.reply <- b.current() + + default: + log.Warnf("Invalid message type in block "+ + "handler: %T", msg) + } + + case <-b.quit: + break out + } + } + + b.wg.Done() + log.Trace("Block handler done") +} + +// isSyncCandidate returns whether or not the peer is a candidate to consider +// syncing from. +func (b *blockManager) isSyncCandidate(sp *serverPeer) bool { + // The peer is not a candidate for sync if it's not a full node. + return sp.Services()&wire.SFNodeNetwork == wire.SFNodeNetwork +} + +// findNextHeaderCheckpoint returns the next checkpoint after the passed height. +// It returns nil when there is not one either because the height is already +// later than the final checkpoint or some other reason such as disabled +// checkpoints. +func (b *blockManager) findNextHeaderCheckpoint(height int32) *chaincfg.Checkpoint { + // There is no next checkpoint if checkpoints are disabled or there are + // none for this current network. + checkpoints := b.server.chainParams.Checkpoints + if len(checkpoints) == 0 { + return nil + } + + // There is no next checkpoint if the height is already after the final + // checkpoint. + finalCheckpoint := &checkpoints[len(checkpoints)-1] + if height >= finalCheckpoint.Height { + return nil + } + + // Find the next checkpoint. + nextCheckpoint := finalCheckpoint + for i := len(checkpoints) - 2; i >= 0; i-- { + if height >= checkpoints[i].Height { + break + } + nextCheckpoint = &checkpoints[i] + } + return nextCheckpoint +} + +// resetHeaderState sets the headers-first mode state to values appropriate for +// syncing from a new peer. +func (b *blockManager) resetHeaderState(newestHeader *wire.BlockHeader, + newestHeight int32) { + b.headerList.Init() + b.startHeader = nil + + // Add an entry for the latest known block into the header pool. + // This allows the next downloaded header to prove it links to the chain + // properly. + node := headerNode{header: newestHeader, height: newestHeight} + b.headerList.PushBack(&node) +} + +// startSync will choose the best peer among the available candidate peers to +// download/sync the blockchain from. When syncing is already running, it +// simply returns. It also examines the candidates for any which are no longer +// candidates and removes them as needed. +func (b *blockManager) startSync(peers *list.List) { + // Return now if we're already syncing. + if b.syncPeer != nil { + return + } + + best, err := b.server.BestSnapshot() + if err != nil { + log.Errorf("Failed to get hash and height for the "+ + "latest block: %v", err) + return + } + var bestPeer *serverPeer + var enext *list.Element + for e := peers.Front(); e != nil; e = enext { + enext = e.Next() + sp := e.Value.(*serverPeer) + + // Remove sync candidate peers that are no longer candidates due + // to passing their latest known block. NOTE: The < is + // intentional as opposed to <=. While techcnically the peer + // doesn't have a later block when it's equal, it will likely + // have one soon so it is a reasonable choice. It also allows + // the case where both are at 0 such as during regression test. + if sp.LastBlock() < best.Height { + peers.Remove(e) + continue + } + + // TODO: Use a better algorithm to choose the best peer. + // For now, just pick the candidate with the highest last block. + if bestPeer == nil || sp.LastBlock() > bestPeer.LastBlock() { + bestPeer = sp + } + } + + // Start syncing from the best peer if one was selected. + if bestPeer != nil { + // Clear the requestedBlocks if the sync peer changes, otherwise + // we may ignore blocks we need that the last sync peer failed + // to send. + b.requestedBlocks = make(map[chainhash.Hash]struct{}) + + locator, err := b.server.LatestBlockLocator() + if err != nil { + log.Errorf("Failed to get block locator for the "+ + "latest block: %v", err) + return + } + + log.Infof("Syncing to block height %d from peer %v", + bestPeer.LastBlock(), bestPeer.Addr()) + + // When the current height is less than a known checkpoint we + // can use block headers to learn about which blocks comprise + // the chain up to the checkpoint and perform less validation + // for them. This is possible since each header contains the + // hash of the previous header and a merkle root. Therefore if + // we validate all of the received headers link together + // properly and the checkpoint hashes match, we can be sure the + // hashes for the blocks in between are accurate. Further, once + // the full blocks are downloaded, the merkle root is computed + // and compared against the value in the header which proves the + // full block hasn't been tampered with. + // + // Once we have passed the final checkpoint, or checkpoints are + // disabled, use standard inv messages learn about the blocks + // and fully validate them. Finally, regression test mode does + // not support the headers-first approach so do normal block + // downloads when in regression test mode. + b.syncPeer = bestPeer + if b.nextCheckpoint != nil && + best.Height < b.nextCheckpoint.Height { + + b.syncPeer.PushGetHeadersMsg(locator, b.nextCheckpoint.Hash) + log.Infof("Downloading headers for blocks %d to "+ + "%d from peer %s", best.Height+1, + b.nextCheckpoint.Height, bestPeer.Addr()) + // This will get adjusted when we process headers if + // we request more headers than the peer is willing to + // give us in one message. + } else { + b.syncPeer.PushGetBlocksMsg(locator, &zeroHash) + } + } else { + log.Warnf("No sync peer candidates available") + } +} + +// current returns true if we believe we are synced with our peers, false if we +// still have blocks to check +func (b *blockManager) current() bool { + // Figure out the latest block we know. + header, height, err := b.server.LatestBlock() + if err != nil { + return false + } + + // There is no last checkpoint if checkpoints are disabled or there are + // none for this current network. + checkpoints := b.server.chainParams.Checkpoints + if len(checkpoints) != 0 { + // We aren't current if the newest block we know of isn't ahead + // of all checkpoints. + if checkpoints[len(checkpoints)-1].Height >= int32(height) { + return false + } + } + + // If we have a syncPeer and are below the block we are syncing to, we + // are not current. + if b.syncPeer != nil && int32(height) < b.syncPeer.LastBlock() { + return false + } + + // If our time source (median times of all the connected peers) is at + // least 24 hours ahead of our best known block, we aren't current. + minus24Hours := b.server.timeSource.AdjustedTime().Add(-24 * time.Hour) + return !header.Timestamp.Before(minus24Hours) +} + +// IsCurrent returns whether or not the block manager believes it is synced with +// the connected peers. +func (b *blockManager) IsCurrent() bool { + reply := make(chan bool) + b.msgChan <- isCurrentMsg{reply: reply} + return <-reply +} + +// QueueInv adds the passed inv message and peer to the block handling queue. +func (b *blockManager) QueueInv(inv *wire.MsgInv, sp *serverPeer) { + // No channel handling here because peers do not need to block on inv + // messages. + if atomic.LoadInt32(&b.shutdown) != 0 { + return + } + + b.msgChan <- &invMsg{inv: inv, peer: sp} +} + +// handleInvMsg handles inv messages from all peers. +// We examine the inventory advertised by the remote peer and act accordingly. +func (b *blockManager) handleInvMsg(imsg *invMsg) { + // Attempt to find the final block in the inventory list. There may + // not be one. + lastBlock := -1 + invVects := imsg.inv.InvList + for i := len(invVects) - 1; i >= 0; i-- { + if invVects[i].Type == wire.InvTypeBlock { + lastBlock = i + break + } + } + + // If this inv contains a block announcement, and this isn't coming from + // our current sync peer or we're current, then update the last + // announced block for this peer. We'll use this information later to + // update the heights of peers based on blocks we've accepted that they + // previously announced. + if lastBlock != -1 && (imsg.peer != b.syncPeer || b.current()) { + imsg.peer.UpdateLastAnnouncedBlock(&invVects[lastBlock].Hash) + } + + // Ignore invs from peers that aren't the sync if we are not current. + // Helps prevent dealing with orphans. + if imsg.peer != b.syncPeer && !b.current() { + return + } + + // If our chain is current and a peer announces a block we already + // know of, then update their current block height. + if lastBlock != -1 && b.current() { + _, blkHeight, err := b.server.GetBlockByHash(invVects[lastBlock].Hash) + if err == nil { + imsg.peer.UpdateLastBlockHeight(int32(blkHeight)) + } + } + + // Add blocks to the cache of known inventory for the peer. + for _, iv := range invVects { + if iv.Type == wire.InvTypeBlock { + imsg.peer.AddKnownInventory(iv) + } + } + + // If this is the sync peer and we're not current, get the headers + // for the announced blocks and update the last announced block. + if lastBlock != -1 && imsg.peer == b.syncPeer && !b.current() { + // Make a locator starting from the latest known header we've + // processed. + locator := make(blockchain.BlockLocator, 0, + wire.MaxBlockLocatorsPerMsg) + lastHash := b.headerList.Back().Value.(*headerNode).header.BlockHash() + locator = append(locator, &lastHash) + // Add locator from the database as backup. + knownLocator, err := b.server.LatestBlockLocator() + if err == nil { + locator = append(locator, knownLocator...) + } + // Get headers based on locator. + b.syncPeer.PushGetHeadersMsg(locator, &invVects[lastBlock].Hash) + } +} + +// QueueHeaders adds the passed headers message and peer to the block handling +// queue. +func (b *blockManager) QueueHeaders(headers *wire.MsgHeaders, sp *serverPeer) { + // No channel handling here because peers do not need to block on + // headers messages. + if atomic.LoadInt32(&b.shutdown) != 0 { + return + } + + b.msgChan <- &headersMsg{headers: headers, peer: sp} +} + +// handleHeadersMsg handles headers messages from all peers. +func (b *blockManager) handleHeadersMsg(hmsg *headersMsg) { + msg := hmsg.headers + numHeaders := len(msg.Headers) + + // Nothing to do for an empty headers message. + if numHeaders == 0 { + return + } + + // Process all of the received headers ensuring each one connects to the + // previous and that checkpoints match. + receivedCheckpoint := false + var finalHash *chainhash.Hash + for _, blockHeader := range msg.Headers { + blockHash := blockHeader.BlockHash() + finalHash = &blockHash + + // Ensure there is a previous header to compare against. + prevNodeEl := b.headerList.Back() + if prevNodeEl == nil { + log.Warnf("Header list does not contain a previous" + + "element as expected -- disconnecting peer") + hmsg.peer.Disconnect() + return + } + + // Ensure the header properly connects to the previous one and + // the proof of work is good, and add it to the list of headers. + node := headerNode{header: blockHeader} + prevNode := prevNodeEl.Value.(*headerNode) + prevHash := prevNode.header.BlockHash() + if prevHash.IsEqual(&blockHeader.PrevBlock) { + diff, err := b.calcNextRequiredDifficulty( + blockHeader.Timestamp) + if err != nil { + log.Warnf("Unable to calculate next difficulty"+ + ": %v -- disconnecting peer", err) + hmsg.peer.Disconnect() + return + } + stubBlock := btcutil.NewBlock(&wire.MsgBlock{ + Header: *blockHeader, + }) + err = blockchain.CheckProofOfWork(stubBlock, + blockchain.CompactToBig(diff)) + if err != nil { + log.Warnf("Received header doesn't match "+ + "required difficulty: %v -- "+ + "disconnecting peer", err) + hmsg.peer.Disconnect() + return + } + node.height = prevNode.height + 1 + err = b.server.putBlock(*blockHeader, + uint32(node.height)) + if err != nil { + log.Criticalf("Couldn't write block to "+ + "database: %v", err) + } + err = b.server.putMaxBlockHeight(uint32(node.height)) + if err != nil { + log.Criticalf("Couldn't write max block height"+ + " to database: %v", err) + } + e := b.headerList.PushBack(&node) + if b.startHeader == nil { + b.startHeader = e + } + } else { + log.Warnf("Received block header that does not "+ + "properly connect to the chain from peer %s "+ + "-- disconnecting", hmsg.peer.Addr()) + hmsg.peer.Disconnect() + return + } + + // Verify the header at the next checkpoint height matches. + if b.nextCheckpoint != nil && node.height == b.nextCheckpoint.Height { + nodeHash := node.header.BlockHash() + if nodeHash.IsEqual(b.nextCheckpoint.Hash) { + receivedCheckpoint = true + log.Infof("Verified downloaded block "+ + "header against checkpoint at height "+ + "%d/hash %s", node.height, nodeHash) + } else { + log.Warnf("Block header at height %d/hash "+ + "%s from peer %s does NOT match "+ + "expected checkpoint hash of %s -- "+ + "disconnecting", node.height, + nodeHash, hmsg.peer.Addr(), + b.nextCheckpoint.Hash) + hmsg.peer.Disconnect() + return + } + break + } + } + + // When this header is a checkpoint, switch to fetching the blocks for + // all of the headers since the last checkpoint. + if receivedCheckpoint { + // Since the first entry of the list is always the final block + // that is already in the database and is only used to ensure + // the next header links properly, it must be removed before + // fetching the blocks. + b.headerList.Remove(b.headerList.Front()) + log.Infof("Received %v block headers: Fetching blocks", + b.headerList.Len()) + b.progressLogger.SetLastLogTime(time.Now()) + //b.fetchHeaderBlocks() + return + } + + // This header is not a checkpoint, so request the next batch of + // headers starting from the latest known header and ending with the + // next checkpoint. + locator := blockchain.BlockLocator([]*chainhash.Hash{finalHash}) + nextHash := zeroHash + if b.nextCheckpoint != nil { + nextHash = *b.nextCheckpoint.Hash + } + err := hmsg.peer.PushGetHeadersMsg(locator, &nextHash) + if err != nil { + log.Warnf("Failed to send getheaders message to "+ + "peer %s: %v", hmsg.peer.Addr(), err) + return + } +} + +// calcNextRequiredDifficulty calculates the required difficulty for the block +// after the passed previous block node based on the difficulty retarget rules. +func (b *blockManager) calcNextRequiredDifficulty(newBlockTime time.Time) (uint32, error) { + + lastNodeEl := b.headerList.Back() + + // Genesis block. + if lastNodeEl == nil { + return b.server.chainParams.PowLimitBits, nil + } + + lastNode := lastNodeEl.Value.(*headerNode) + + // Return the previous block's difficulty requirements if this block + // is not at a difficulty retarget interval. + if (lastNode.height+1)%b.blocksPerRetarget != 0 { + // For networks that support it, allow special reduction of the + // required difficulty once too much time has elapsed without + // mining a block. + if b.server.chainParams.ReduceMinDifficulty { + // Return minimum difficulty when more than the desired + // amount of time has elapsed without mining a block. + reductionTime := int64( + b.server.chainParams.MinDiffReductionTime / + time.Second) + allowMinTime := lastNode.header.Timestamp.Unix() + + reductionTime + if newBlockTime.Unix() > allowMinTime { + return b.server.chainParams.PowLimitBits, nil + } + + // The block was mined within the desired timeframe, so + // return the difficulty for the last block which did + // not have the special minimum difficulty rule applied. + prevBits, err := b.findPrevTestNetDifficulty() + if err != nil { + return 0, err + } + return prevBits, nil + } + + // For the main network (or any unrecognized networks), simply + // return the previous block's difficulty requirements. + return lastNode.header.Bits, nil + } + + // Get the block node at the previous retarget (targetTimespan days + // worth of blocks). + firstNode, _, err := b.server.GetBlockByHeight( + uint32(lastNode.height + 1 - b.blocksPerRetarget)) + if err != nil { + return 0, err + } + + // Limit the amount of adjustment that can occur to the previous + // difficulty. + actualTimespan := lastNode.header.Timestamp.Unix() - + firstNode.Timestamp.Unix() + adjustedTimespan := actualTimespan + if actualTimespan < b.minRetargetTimespan { + adjustedTimespan = b.minRetargetTimespan + } else if actualTimespan > b.maxRetargetTimespan { + adjustedTimespan = b.maxRetargetTimespan + } + + // Calculate new target difficulty as: + // currentDifficulty * (adjustedTimespan / targetTimespan) + // The result uses integer division which means it will be slightly + // rounded down. Bitcoind also uses integer division to calculate this + // result. + oldTarget := blockchain.CompactToBig(lastNode.header.Bits) + newTarget := new(big.Int).Mul(oldTarget, big.NewInt(adjustedTimespan)) + targetTimeSpan := int64(b.server.chainParams.TargetTimespan / + time.Second) + newTarget.Div(newTarget, big.NewInt(targetTimeSpan)) + + // Limit new value to the proof of work limit. + if newTarget.Cmp(b.server.chainParams.PowLimit) > 0 { + newTarget.Set(b.server.chainParams.PowLimit) + } + + // Log new target difficulty and return it. The new target logging is + // intentionally converting the bits back to a number instead of using + // newTarget since conversion to the compact representation loses + // precision. + newTargetBits := blockchain.BigToCompact(newTarget) + log.Debugf("Difficulty retarget at block height %d", lastNode.height+1) + log.Debugf("Old target %08x (%064x)", lastNode.header.Bits, oldTarget) + log.Debugf("New target %08x (%064x)", newTargetBits, + blockchain.CompactToBig(newTargetBits)) + log.Debugf("Actual timespan %v, adjusted timespan %v, target timespan %v", + time.Duration(actualTimespan)*time.Second, + time.Duration(adjustedTimespan)*time.Second, + b.server.chainParams.TargetTimespan) + + return newTargetBits, nil +} + +// findPrevTestNetDifficulty returns the difficulty of the previous block which +// did not have the special testnet minimum difficulty rule applied. +func (b *blockManager) findPrevTestNetDifficulty() (uint32, error) { + startNodeEl := b.headerList.Back() + + // Genesis block. + if startNodeEl == nil { + return b.server.chainParams.PowLimitBits, nil + } + + startNode := startNodeEl.Value.(*headerNode) + + // Search backwards through the chain for the last block without + // the special rule applied. + iterEl := startNodeEl + iterNode := startNode.header + iterHeight := startNode.height + for iterNode != nil && iterHeight%b.blocksPerRetarget != 0 && + iterNode.Bits == b.server.chainParams.PowLimitBits { + + // Get the previous block node. This function is used over + // simply accessing iterNode.parent directly as it will + // dynamically create previous block nodes as needed. This + // helps allow only the pieces of the chain that are needed + // to remain in memory. + iterHeight-- + el := iterEl.Prev() + if el != nil { + iterNode = el.Value.(*headerNode).header + } else { + node, _, err := b.server.GetBlockByHeight(uint32(iterHeight)) + if err != nil { + log.Errorf("GetBlockByHeight: %v", err) + return 0, err + } + iterNode = &node + } + } + + // Return the found difficulty or the minimum difficulty if no + // appropriate block was found. + lastBits := b.server.chainParams.PowLimitBits + if iterNode != nil { + lastBits = iterNode.Bits + } + return lastBits, nil +} + +/* +import ( + "container/list" + "os" + "path/filepath" + "sort" + "sync" + "sync/atomic" + "time" + + "github.com/btcsuite/btcd/blockchain" + "github.com/btcsuite/btcd/chaincfg" + "github.com/btcsuite/btcd/chaincfg/chainhash" + "github.com/btcsuite/btcd/database" + "github.com/btcsuite/btcd/wire" + "github.com/btcsuite/btcutil" +) + +// handleBlockMsg handles block messages from all peers. +func (b *blockManager) handleBlockMsg(bmsg *blockMsg) { + // If we didn't ask for this block then the peer is misbehaving. + blockHash := bmsg.block.Hash() + if _, exists := bmsg.peer.requestedBlocks[*blockHash]; !exists { + log.Warnf("Got unrequested block %v from %s -- "+ + "disconnecting", blockHash, bmsg.peer.Addr()) + bmsg.peer.Disconnect() + return + } + + // When in headers-first mode, if the block matches the hash of the + // first header in the list of headers that are being fetched, it's + // eligible for less validation since the headers have already been + // verified to link together and are valid up to the next checkpoint. + // Also, remove the list entry for all blocks except the checkpoint + // since it is needed to verify the next round of headers links + // properly. + isCheckpointBlock := false + behaviorFlags := blockchain.BFNone + firstNodeEl := b.headerList.Front() + if firstNodeEl != nil { + firstNode := firstNodeEl.Value.(*headerNode) + if blockHash.IsEqual(firstNode.hash) { + behaviorFlags |= blockchain.BFFastAdd + if firstNode.hash.IsEqual(b.nextCheckpoint.Hash) { + isCheckpointBlock = true + } else { + b.headerList.Remove(firstNodeEl) + } + } + } + + // Remove block from request maps. Either chain will know about it and + // so we shouldn't have any more instances of trying to fetch it, or we + // will fail the insert and thus we'll retry next time we get an inv. + delete(bmsg.peer.requestedBlocks, *blockHash) + delete(b.requestedBlocks, *blockHash) + + // Process the block to include validation, best chain selection, orphan + // handling, etc. + + _, isOrphan, err := b.chain.ProcessBlock(bmsg.block, behaviorFlags) + if err != nil { + // When the error is a rule error, it means the block was simply + // rejected as opposed to something actually going wrong, so log + // it as such. Otherwise, something really did go wrong, so log + // it as an actual error. + if _, ok := err.(blockchain.RuleError); ok { + log.Infof("Rejected block %v from %s: %v", blockHash, + bmsg.peer, err) + } else { + log.Errorf("Failed to process block %v: %v", + blockHash, err) + } + if dbErr, ok := err.(database.Error); ok && dbErr.ErrorCode == + database.ErrCorruption { + panic(dbErr) + } + + // Convert the error into an appropriate reject message and + // send it. + code, reason := mempool.ErrToRejectErr(err) + bmsg.peer.PushRejectMsg(wire.CmdBlock, code, reason, + blockHash, false) + return + } + + // Meta-data about the new block this peer is reporting. We use this + // below to update this peer's lastest block height and the heights of + // other peers based on their last announced block hash. This allows us + // to dynamically update the block heights of peers, avoiding stale + // heights when looking for a new sync peer. Upon acceptance of a block + // or recognition of an orphan, we also use this information to update + // the block heights over other peers who's invs may have been ignored + // if we are actively syncing while the chain is not yet current or + // who may have lost the lock announcment race. + var heightUpdate int32 + var blkHashUpdate *chainhash.Hash + + // Request the parents for the orphan block from the peer that sent it. + if isOrphan { + // We've just received an orphan block from a peer. In order + // to update the height of the peer, we try to extract the + // block height from the scriptSig of the coinbase transaction. + // Extraction is only attempted if the block's version is + // high enough (ver 2+). + header := &bmsg.block.MsgBlock().Header + if blockchain.ShouldHaveSerializedBlockHeight(header) { + coinbaseTx := bmsg.block.Transactions()[0] + cbHeight, err := blockchain.ExtractCoinbaseHeight(coinbaseTx) + if err != nil { + log.Warnf("Unable to extract height from "+ + "coinbase tx: %v", err) + } else { + log.Debugf("Extracted height of %v from "+ + "orphan block", cbHeight) + heightUpdate = cbHeight + blkHashUpdate = blockHash + } + } + + orphanRoot := b.chain.GetOrphanRoot(blockHash) + locator, err := b.chain.LatestBlockLocator() + if err != nil { + log.Warnf("Failed to get block locator for the "+ + "latest block: %v", err) + } else { + bmsg.peer.PushGetBlocksMsg(locator, orphanRoot) + } + } else { + // When the block is not an orphan, log information about it and + // update the chain state. + b.progressLogger.LogBlockHeight(bmsg.block) + + // Update this peer's latest block height, for future + // potential sync node candidacy. + best := b.chain.BestSnapshot() + heightUpdate = best.Height + blkHashUpdate = &best.Hash + + // Clear the rejected transactions. + b.rejectedTxns = make(map[chainhash.Hash]struct{}) + + // Allow any clients performing long polling via the + // getblocktemplate RPC to be notified when the new block causes + // their old block template to become stale. + rpcServer := b.server.rpcServer + if rpcServer != nil { + rpcServer.gbtWorkState.NotifyBlockConnected(blockHash) + } + } + + // Update the block height for this peer. But only send a message to + // the server for updating peer heights if this is an orphan or our + // chain is "current". This avoids sending a spammy amount of messages + // if we're syncing the chain from scratch. + if blkHashUpdate != nil && heightUpdate != 0 { + bmsg.peer.UpdateLastBlockHeight(heightUpdate) + if isOrphan || b.current() { + go b.server.UpdatePeerHeights(blkHashUpdate, heightUpdate, bmsg.peer) + } + } + + // Nothing more to do if we aren't in headers-first mode. + if !b.headersFirstMode { + return + } + + // This is headers-first mode, so if the block is not a checkpoint + // request more blocks using the header list when the request queue is + // getting short. + if !isCheckpointBlock { + if b.startHeader != nil && + len(bmsg.peer.requestedBlocks) < minInFlightBlocks { + b.fetchHeaderBlocks() + } + return + } + + // This is headers-first mode and the block is a checkpoint. When + // there is a next checkpoint, get the next round of headers by asking + // for headers starting from the block after this one up to the next + // checkpoint. + prevHeight := b.nextCheckpoint.Height + prevHash := b.nextCheckpoint.Hash + b.nextCheckpoint = b.findNextHeaderCheckpoint(prevHeight) + if b.nextCheckpoint != nil { + locator := blockchain.BlockLocator([]*chainhash.Hash{prevHash}) + err := bmsg.peer.PushGetHeadersMsg(locator, b.nextCheckpoint.Hash) + if err != nil { + log.Warnf("Failed to send getheaders message to "+ + "peer %s: %v", bmsg.peer.Addr(), err) + return + } + log.Infof("Downloading headers for blocks %d to %d from "+ + "peer %s", prevHeight+1, b.nextCheckpoint.Height, + b.syncPeer.Addr()) + return + } + + // This is headers-first mode, the block is a checkpoint, and there are + // no more checkpoints, so switch to normal mode by requesting blocks + // from the block after this one up to the end of the chain (zero hash). + b.headersFirstMode = false + b.headerList.Init() + log.Infof("Reached the final checkpoint -- switching to normal mode") + locator := blockchain.BlockLocator([]*chainhash.Hash{blockHash}) + err = bmsg.peer.PushGetBlocksMsg(locator, &zeroHash) + if err != nil { + log.Warnf("Failed to send getblocks message to peer %s: %v", + bmsg.peer.Addr(), err) + return + } +} + +// fetchHeaderBlocks creates and sends a request to the syncPeer for the next +// list of blocks to be downloaded based on the current list of headers. +func (b *blockManager) fetchHeaderBlocks() { + // Nothing to do if there is no start header. + if b.startHeader == nil { + log.Warnf("fetchHeaderBlocks called with no start header") + return + } + + // Build up a getdata request for the list of blocks the headers + // describe. The size hint will be limited to wire.MaxInvPerMsg by + // the function, so no need to double check it here. + gdmsg := wire.NewMsgGetDataSizeHint(uint(b.headerList.Len())) + numRequested := 0 + for e := b.startHeader; e != nil; e = e.Next() { + node, ok := e.Value.(*headerNode) + if !ok { + log.Warn("Header list node type is not a headerNode") + continue + } + + iv := wire.NewInvVect(wire.InvTypeBlock, node.hash) + haveInv, err := b.haveInventory(iv) + if err != nil { + log.Warnf("Unexpected failure when checking for "+ + "existing inventory during header block "+ + "fetch: %v", err) + } + if !haveInv { + b.requestedBlocks[*node.hash] = struct{}{} + b.syncPeer.requestedBlocks[*node.hash] = struct{}{} + gdmsg.AddInvVect(iv) + numRequested++ + } + b.startHeader = e.Next() + if numRequested >= wire.MaxInvPerMsg { + break + } + } + if len(gdmsg.InvList) > 0 { + b.syncPeer.QueueMessage(gdmsg, nil) + } +} + +// haveInventory returns whether or not the inventory represented by the passed +// inventory vector is known. This includes checking all of the various places +// inventory can be when it is in different states such as blocks that are part +// of the main chain, on a side chain, in the orphan pool, and transactions that +// are in the memory pool (either the main pool or orphan pool). +func (b *blockManager) haveInventory(invVect *wire.InvVect) (bool, error) { + switch invVect.Type { + case wire.InvTypeBlock: + // Ask chain if the block is known to it in any form (main + // chain, side chain, or orphan). + return b.chain.HaveBlock(&invVect.Hash) + + case wire.InvTypeTx: + // Ask the transaction memory pool if the transaction is known + // to it in any form (main pool or orphan). + if b.server.txMemPool.HaveTransaction(&invVect.Hash) { + return true, nil + } + + // Check if the transaction exists from the point of view of the + // end of the main chain. + entry, err := b.chain.FetchUtxoEntry(&invVect.Hash) + if err != nil { + return false, err + } + return entry != nil && !entry.IsFullySpent(), nil + } + + // The requested inventory is is an unsupported type, so just claim + // it is known to avoid requesting it. + return true, nil +} + +// limitMap is a helper function for maps that require a maximum limit by +// evicting a random transaction if adding a new value would cause it to +// overflow the maximum allowed. +func (b *blockManager) limitMap(m map[chainhash.Hash]struct{}, limit int) { + if len(m)+1 > limit { + // Remove a random entry from the map. For most compilers, Go's + // range statement iterates starting at a random item although + // that is not 100% guaranteed by the spec. The iteration order + // is not important here because an adversary would have to be + // able to pull off preimage attacks on the hashing function in + // order to target eviction of specific entries anyways. + for txHash := range m { + delete(m, txHash) + return + } + } +} + +// handleNotifyMsg handles notifications from blockchain. It does things such +// as request orphan block parents and relay accepted blocks to connected peers. +func (b *blockManager) handleNotifyMsg(notification *blockchain.Notification) { + switch notification.Type { + // A block has been accepted into the block chain. Relay it to other + // peers. + case blockchain.NTBlockAccepted: + // Don't relay if we are not current. Other peers that are + // current should already know about it. + if !b.current() { + return + } + + block, ok := notification.Data.(*btcutil.Block) + if !ok { + log.Warnf("Chain accepted notification is not a block.") + break + } + + // Generate the inventory vector and relay it. + //iv := wire.NewInvVect(wire.InvTypeBlock, block.Hash()) + //b.server.RelayInventory(iv, block.MsgBlock().Header) + + // A block has been connected to the main block chain. + case blockchain.NTBlockConnected: + block, ok := notification.Data.(*btcutil.Block) + if !ok { + log.Warnf("Chain connected notification is not a block.") + break + } + + // Remove all of the transactions (except the coinbase) in the + // connected block from the transaction pool. Secondly, remove any + // transactions which are now double spends as a result of these + // new transactions. Finally, remove any transaction that is + // no longer an orphan. Transactions which depend on a confirmed + // transaction are NOT removed recursively because they are still + // valid. + for _, tx := range block.Transactions()[1:] { + b.server.txMemPool.RemoveTransaction(tx, false) + b.server.txMemPool.RemoveDoubleSpends(tx) + b.server.txMemPool.RemoveOrphan(tx) + acceptedTxs := b.server.txMemPool.ProcessOrphans(tx) + b.server.AnnounceNewTransactions(acceptedTxs) + } + + if r := b.server.rpcServer; r != nil { + // Now that this block is in the blockchain we can mark + // all the transactions (except the coinbase) as no + // longer needing rebroadcasting. + for _, tx := range block.Transactions()[1:] { + iv := wire.NewInvVect(wire.InvTypeTx, tx.Hash()) + b.server.RemoveRebroadcastInventory(iv) + } + + // Notify registered websocket clients of incoming block. + r.ntfnMgr.NotifyBlockConnected(block) + } + + // A block has been disconnected from the main block chain. + case blockchain.NTBlockDisconnected: + block, ok := notification.Data.(*btcutil.Block) + if !ok { + log.Warnf("Chain disconnected notification is not a block.") + break + } + + // Reinsert all of the transactions (except the coinbase) into + // the transaction pool. + for _, tx := range block.Transactions()[1:] { + _, _, err := b.server.txMemPool.MaybeAcceptTransaction(tx, + false, false) + if err != nil { + // Remove the transaction and all transactions + // that depend on it if it wasn't accepted into + // the transaction pool. + b.server.txMemPool.RemoveTransaction(tx, true) + } + } + + // Notify registered websocket clients. + if r := b.server.rpcServer; r != nil { + r.ntfnMgr.NotifyBlockDisconnected(block) + } + } +} + +// QueueBlock adds the passed block message and peer to the block handling queue. +func (b *blockManager) QueueBlock(block *btcutil.Block, sp *serverPeer) { + // Don't accept more blocks if we're shutting down. + if atomic.LoadInt32(&b.shutdown) != 0 { + sp.blockProcessed <- struct{}{} + return + } + + b.msgChan <- &blockMsg{block: block, peer: sp} +} + +// SyncPeer returns the current sync peer. +func (b *blockManager) SyncPeer() *serverPeer { + reply := make(chan *serverPeer) + b.msgChan <- getSyncPeerMsg{reply: reply} + return <-reply +} + +// ProcessBlock makes use of ProcessBlock on an internal instance of a block +// chain. It is funneled through the block manager since btcchain is not safe +// for concurrent access. +func (b *blockManager) ProcessBlock(block *btcutil.Block, flags blockchain.BehaviorFlags) (bool, error) { + reply := make(chan processBlockResponse, 1) + b.msgChan <- processBlockMsg{block: block, flags: flags, reply: reply} + response := <-reply + return response.isOrphan, response.err +} + + +// checkpointSorter implements sort.Interface to allow a slice of checkpoints to +// be sorted. +type checkpointSorter []chaincfg.Checkpoint + +// Len returns the number of checkpoints in the slice. It is part of the +// sort.Interface implementation. +func (s checkpointSorter) Len() int { + return len(s) +} + +// Swap swaps the checkpoints at the passed indices. It is part of the +// sort.Interface implementation. +func (s checkpointSorter) Swap(i, j int) { + s[i], s[j] = s[j], s[i] +} + +// Less returns whether the checkpoint with index i should sort before the +// checkpoint with index j. It is part of the sort.Interface implementation. +func (s checkpointSorter) Less(i, j int) bool { + return s[i].Height < s[j].Height +} + +// mergeCheckpoints returns two slices of checkpoints merged into one slice +// such that the checkpoints are sorted by height. In the case the additional +// checkpoints contain a checkpoint with the same height as a checkpoint in the +// default checkpoints, the additional checkpoint will take precedence and +// overwrite the default one. +func mergeCheckpoints(defaultCheckpoints, additional []chaincfg.Checkpoint) []chaincfg.Checkpoint { + // Create a map of the additional checkpoints to remove duplicates while + // leaving the most recently-specified checkpoint. + extra := make(map[int32]chaincfg.Checkpoint) + for _, checkpoint := range additional { + extra[checkpoint.Height] = checkpoint + } + + // Add all default checkpoints that do not have an override in the + // additional checkpoints. + numDefault := len(defaultCheckpoints) + checkpoints := make([]chaincfg.Checkpoint, 0, numDefault+len(extra)) + for _, checkpoint := range defaultCheckpoints { + if _, exists := extra[checkpoint.Height]; !exists { + checkpoints = append(checkpoints, checkpoint) + } + } + + // Append the additional checkpoints and return the sorted results. + for _, checkpoint := range extra { + checkpoints = append(checkpoints, checkpoint) + } + sort.Sort(checkpointSorter(checkpoints)) + return checkpoints +} + +// removeRegressionDB removes the existing regression test database if running +// in regression test mode and it already exists. +func removeRegressionDB(dbPath string) error { + // Don't do anything if not in regression test mode. + if !cfg.RegressionTest { + return nil + } + + // Remove the old regression test database if it already exists. + fi, err := os.Stat(dbPath) + if err == nil { + btcdLog.Infof("Removing regression test database from '%s'", dbPath) + if fi.IsDir() { + err := os.RemoveAll(dbPath) + if err != nil { + return err + } + } else { + err := os.Remove(dbPath) + if err != nil { + return err + } + } + } + + return nil +} + +// dbPath returns the path to the block database given a database type. +func blockDbPath(dbType string) string { + // The database name is based on the database type. + dbName := blockDbNamePrefix + "_" + dbType + if dbType == "sqlite" { + dbName = dbName + ".db" + } + dbPath := filepath.Join(cfg.DataDir, dbName) + return dbPath +} + +// warnMultipeDBs shows a warning if multiple block database types are detected. +// This is not a situation most users want. It is handy for development however +// to support multiple side-by-side databases. +func warnMultipeDBs() { + // This is intentionally not using the known db types which depend + // on the database types compiled into the binary since we want to + // detect legacy db types as well. + dbTypes := []string{"ffldb", "leveldb", "sqlite"} + duplicateDbPaths := make([]string, 0, len(dbTypes)-1) + for _, dbType := range dbTypes { + if dbType == cfg.DbType { + continue + } + + // Store db path as a duplicate db if it exists. + dbPath := blockDbPath(dbType) + if fileExists(dbPath) { + duplicateDbPaths = append(duplicateDbPaths, dbPath) + } + } + + // Warn if there are extra databases. + if len(duplicateDbPaths) > 0 { + selectedDbPath := blockDbPath(cfg.DbType) + btcdLog.Warnf("WARNING: There are multiple block chain databases "+ + "using different database types.\nYou probably don't "+ + "want to waste disk space by having more than one.\n"+ + "Your current database is located at [%v].\nThe "+ + "additional database is located at %v", selectedDbPath, + duplicateDbPaths) + } +} + +// loadBlockDB loads (or creates when needed) the block database taking into +// account the selected database backend and returns a handle to it. It also +// contains additional logic such warning the user if there are multiple +// databases which consume space on the file system and ensuring the regression +// test database is clean when in regression test mode. +func loadBlockDB() (database.DB, error) { + // The memdb backend does not have a file path associated with it, so + // handle it uniquely. We also don't want to worry about the multiple + // database type warnings when running with the memory database. + if cfg.DbType == "memdb" { + btcdLog.Infof("Creating block database in memory.") + db, err := database.Create(cfg.DbType) + if err != nil { + return nil, err + } + return db, nil + } + + warnMultipeDBs() + + // The database name is based on the database type. + dbPath := blockDbPath(cfg.DbType) + + // The regression test is special in that it needs a clean database for + // each run, so remove it now if it already exists. + removeRegressionDB(dbPath) + + btcdLog.Infof("Loading block database from '%s'", dbPath) + db, err := database.Open(cfg.DbType, dbPath, activeNetParams.Net) + if err != nil { + // Return the error if it's not because the database doesn't + // exist. + if dbErr, ok := err.(database.Error); !ok || dbErr.ErrorCode != + database.ErrDbDoesNotExist { + + return nil, err + } + + // Create the db if it does not exist. + err = os.MkdirAll(cfg.DataDir, 0700) + if err != nil { + return nil, err + } + db, err = database.Create(cfg.DbType, dbPath, activeNetParams.Net) + if err != nil { + return nil, err + } + } + + btcdLog.Info("Block database loaded") + return db, nil +} +*/ diff --git a/spvsvc/spvchain/db.go b/spvsvc/spvchain/db.go new file mode 100644 index 0000000..5b9322b --- /dev/null +++ b/spvsvc/spvchain/db.go @@ -0,0 +1,429 @@ +package spvchain + +import ( + "bytes" + "encoding/binary" + "time" + + "github.com/btcsuite/btcd/blockchain" + "github.com/btcsuite/btcd/chaincfg" + "github.com/btcsuite/btcd/chaincfg/chainhash" + "github.com/btcsuite/btcd/wire" + "github.com/btcsuite/btcutil/gcs" + "github.com/btcsuite/btcwallet/waddrmgr" + "github.com/btcsuite/btcwallet/walletdb" +) + +const ( + // LatestDBVersion is the most recent database version. + LatestDBVersion = 1 +) + +var ( + // latestDBVersion is the most recent database version as a variable so + // the tests can change it to force errors. + latestDBVersion uint32 = LatestDBVersion +) + +// Key names for various database fields. +var ( + // Bucket names. + spvBucketName = []byte("spv") + blockHeaderBucketName = []byte("bh") + basicHeaderBucketName = []byte("bfh") + basicFilterBucketName = []byte("bf") + extHeaderBucketName = []byte("efh") + extFilterBucketName = []byte("ef") + + // Db related key names (main bucket). + dbVersionName = []byte("dbver") + dbCreateDateName = []byte("dbcreated") + maxBlockHeightName = []byte("maxblockheight") +) + +// uint32ToBytes converts a 32 bit unsigned integer into a 4-byte slice in +// little-endian order: 1 -> [1 0 0 0]. +func uint32ToBytes(number uint32) []byte { + buf := make([]byte, 4) + binary.LittleEndian.PutUint32(buf, number) + return buf +} + +// uint64ToBytes converts a 64 bit unsigned integer into a 8-byte slice in +// little-endian order: 1 -> [1 0 0 0 0 0 0 0]. +func uint64ToBytes(number uint64) []byte { + buf := make([]byte, 8) + binary.LittleEndian.PutUint64(buf, number) + return buf +} + +// fetchDBVersion fetches the current manager version from the database. +func fetchDBVersion(tx walletdb.Tx) (uint32, error) { + bucket := tx.RootBucket().Bucket(spvBucketName) + verBytes := bucket.Get(dbVersionName) + if verBytes == nil { + str := "required version number not stored in database" + return 0, log.Error(str) + } + version := binary.LittleEndian.Uint32(verBytes) + return version, nil +} + +// putDBVersion stores the provided version to the database. +func putDBVersion(tx walletdb.Tx, version uint32) error { + bucket := tx.RootBucket().Bucket(spvBucketName) + + verBytes := uint32ToBytes(version) + err := bucket.Put(dbVersionName, verBytes) + if err != nil { + str := "failed to store version: %v" + return log.Errorf(str, err) + } + return nil +} + +// putMaxBlockHeight stores the max block height to the database. +func putMaxBlockHeight(tx walletdb.Tx, maxBlockHeight uint32) error { + bucket := tx.RootBucket().Bucket(spvBucketName) + + maxBlockHeightBytes := uint32ToBytes(maxBlockHeight) + err := bucket.Put(maxBlockHeightName, maxBlockHeightBytes) + if err != nil { + str := "failed to store max block height: %v" + return log.Errorf(str, err) + } + return nil +} + +// putBlock stores the provided block header and height, keyed to the block +// hash, in the database. +func putBlock(tx walletdb.Tx, header wire.BlockHeader, height uint32) error { + var buf bytes.Buffer + err := header.Serialize(&buf) + if err != nil { + return err + } + _, err = buf.Write(uint32ToBytes(height)) + if err != nil { + return err + } + + bucket := tx.RootBucket().Bucket(spvBucketName).Bucket(blockHeaderBucketName) + blockHash := header.BlockHash() + + err = bucket.Put(blockHash[:], buf.Bytes()) + if err != nil { + str := "failed to store SPV block info: %v" + return log.Errorf(str, err) + } + + err = bucket.Put(uint32ToBytes(height), blockHash[:]) + if err != nil { + str := "failed to store block height info: %v" + return log.Errorf(str, err) + } + + return nil +} + +// putFilter stores the provided filter, keyed to the block hash, in the +// appropriate filter bucket in the database. +func putFilter(tx walletdb.Tx, blockHash chainhash.Hash, bucketName []byte, + filter *gcs.Filter) error { + var buf bytes.Buffer + _, err := buf.Write(filter.NBytes()) + if err != nil { + return err + } + + bucket := tx.RootBucket().Bucket(spvBucketName).Bucket(bucketName) + + err = bucket.Put(blockHash[:], buf.Bytes()) + if err != nil { + str := "failed to store filter: %v" + return log.Errorf(str, err) + } + + return nil +} + +// putBasicFilter stores the provided filter, keyed to the block hash, in the +// basic filter bucket in the database. +func putBasicFilter(tx walletdb.Tx, blockHash chainhash.Hash, + filter *gcs.Filter) error { + return putFilter(tx, blockHash, basicFilterBucketName, filter) +} + +// putExtFilter stores the provided filter, keyed to the block hash, in the +// extended filter bucket in the database. +func putExtFilter(tx walletdb.Tx, blockHash chainhash.Hash, + filter *gcs.Filter) error { + return putFilter(tx, blockHash, extFilterBucketName, filter) +} + +// putHeader stores the provided filter, keyed to the block hash, in the +// appropriate filter bucket in the database. +func putHeader(tx walletdb.Tx, blockHash chainhash.Hash, bucketName []byte, + filterTip chainhash.Hash) error { + + bucket := tx.RootBucket().Bucket(spvBucketName).Bucket(bucketName) + + err := bucket.Put(blockHash[:], filterTip[:]) + if err != nil { + str := "failed to store filter header: %v" + return log.Errorf(str, err) + } + + return nil +} + +// putBasicHeader stores the provided filter, keyed to the block hash, in the +// basic filter bucket in the database. +func putBasicHeader(tx walletdb.Tx, blockHash chainhash.Hash, + filterTip chainhash.Hash) error { + return putHeader(tx, blockHash, basicHeaderBucketName, filterTip) +} + +// putExtHeader stores the provided filter, keyed to the block hash, in the +// extended filter bucket in the database. +func putExtHeader(tx walletdb.Tx, blockHash chainhash.Hash, + filterTip chainhash.Hash) error { + return putHeader(tx, blockHash, extHeaderBucketName, filterTip) +} + +// GetBlockByHash retrieves the block header, filter, and filter tip, based on +// the provided block hash, from the database. +func GetBlockByHash(tx walletdb.Tx, blockHash chainhash.Hash) (wire.BlockHeader, + uint32, error) { + //chainhash.Hash, chainhash.Hash, + bucket := tx.RootBucket().Bucket(spvBucketName).Bucket(blockHeaderBucketName) + blockBytes := bucket.Get(blockHash[:]) + if len(blockBytes) == 0 { + str := "failed to retrieve block info for hash: %s" + return wire.BlockHeader{}, 0, log.Errorf(str, blockHash) + } + + buf := bytes.NewReader(blockBytes[:wire.MaxBlockHeaderPayload]) + var header wire.BlockHeader + err := header.Deserialize(buf) + if err != nil { + str := "failed to deserialize block header for hash: %s" + return wire.BlockHeader{}, 0, log.Errorf(str, blockHash) + } + + height := binary.LittleEndian.Uint32(blockBytes[wire.MaxBlockHeaderPayload : wire.MaxBlockHeaderPayload+4]) + + return header, height, nil +} + +// GetBlockHashByHeight retrieves the hash of a block by its height. +func GetBlockHashByHeight(tx walletdb.Tx, height uint32) (chainhash.Hash, + error) { + bucket := tx.RootBucket().Bucket(spvBucketName).Bucket(blockHeaderBucketName) + var hash chainhash.Hash + hashBytes := bucket.Get(uint32ToBytes(height)) + if hashBytes == nil { + str := "no block hash for height %v" + return hash, log.Errorf(str, height) + } + hash.SetBytes(hashBytes) + return hash, nil +} + +// GetBlockByHeight retrieves a block's information by its height. +func GetBlockByHeight(tx walletdb.Tx, height uint32) (wire.BlockHeader, uint32, + error) { + // chainhash.Hash, chainhash.Hash + blockHash, err := GetBlockHashByHeight(tx, height) + if err != nil { + return wire.BlockHeader{}, 0, err + } + + return GetBlockByHash(tx, blockHash) +} + +// SyncedTo retrieves the most recent block's height and hash. +func SyncedTo(tx walletdb.Tx) (*waddrmgr.BlockStamp, error) { + header, height, err := LatestBlock(tx) + if err != nil { + return nil, err + } + var blockStamp waddrmgr.BlockStamp + blockStamp.Hash = header.BlockHash() + blockStamp.Height = int32(height) + return &blockStamp, nil +} + +// LatestBlock retrieves all the info about the latest stored block. +func LatestBlock(tx walletdb.Tx) (wire.BlockHeader, uint32, error) { + bucket := tx.RootBucket().Bucket(spvBucketName) + + maxBlockHeightBytes := bucket.Get(maxBlockHeightName) + if maxBlockHeightBytes == nil { + str := "no max block height stored" + return wire.BlockHeader{}, 0, log.Error(str) + } + + maxBlockHeight := binary.LittleEndian.Uint32(maxBlockHeightBytes) + header, height, err := GetBlockByHeight(tx, maxBlockHeight) + if err != nil { + return wire.BlockHeader{}, 0, err + } + if height != maxBlockHeight { + str := "max block height inconsistent" + return wire.BlockHeader{}, 0, log.Error(str) + } + return header, height, nil +} + +// BlockLocatorFromHash returns a block locator based on the provided hash. +func BlockLocatorFromHash(tx walletdb.Tx, hash chainhash.Hash) blockchain.BlockLocator { + locator := make(blockchain.BlockLocator, 0, wire.MaxBlockLocatorsPerMsg) + locator = append(locator, &hash) + + // If hash isn't found in DB or this is the genesis block, return + // the locator as is + _, height, err := GetBlockByHash(tx, hash) + if (err != nil) || (height == 0) { + return locator + } + + decrement := uint32(1) + for (height > 0) && (len(locator) < wire.MaxBlockLocatorsPerMsg) { + // Decrement by 1 for the first 10 blocks, then double the + // jump until we get to the genesis hash + if len(locator) > 10 { + decrement *= 2 + } + if decrement > height { + height = 0 + } else { + height -= decrement + } + blockHash, err := GetBlockHashByHeight(tx, height) + if err != nil { + return locator + } + locator = append(locator, &blockHash) + } + + return locator +} + +// createSPVNS creates the initial namespace structure needed for all of the +// SPV-related data. This includes things such as all of the buckets as well as +// the version and creation date. +func createSPVNS(namespace walletdb.Namespace, params *chaincfg.Params) error { + err := namespace.Update(func(tx walletdb.Tx) error { + rootBucket := tx.RootBucket() + spvBucket, err := rootBucket.CreateBucketIfNotExists(spvBucketName) + if err != nil { + str := "failed to create main bucket: %v" + return log.Errorf(str, err) + } + + _, err = spvBucket.CreateBucketIfNotExists(blockHeaderBucketName) + if err != nil { + str := "failed to create block header bucket: %v" + return log.Errorf(str, err) + } + + _, err = spvBucket.CreateBucketIfNotExists(basicFilterBucketName) + if err != nil { + str := "failed to create basic filter bucket: %v" + return log.Errorf(str, err) + } + + _, err = spvBucket.CreateBucketIfNotExists(basicHeaderBucketName) + if err != nil { + str := "failed to create basic header bucket: %v" + return log.Errorf(str, err) + } + + _, err = spvBucket.CreateBucketIfNotExists(extFilterBucketName) + if err != nil { + str := "failed to create extended filter bucket: %v" + return log.Errorf(str, err) + } + + _, err = spvBucket.CreateBucketIfNotExists(extHeaderBucketName) + if err != nil { + str := "failed to create extended header bucket: %v" + return log.Errorf(str, err) + } + + createDate := spvBucket.Get(dbCreateDateName) + if createDate != nil { + log.Infof("Wallet SPV namespace already created.") + return nil + } + + log.Infof("Creating wallet SPV namespace.") + + basicFilter, err := buildBasicFilter(params.GenesisBlock) + if err != nil { + return err + } + + basicFilterTip := makeHeaderForFilter(basicFilter, + params.GenesisBlock.Header.PrevBlock) + + extFilter, err := buildExtFilter(params.GenesisBlock) + if err != nil { + return err + } + + extFilterTip := makeHeaderForFilter(extFilter, + params.GenesisBlock.Header.PrevBlock) + + err = putBlock(tx, params.GenesisBlock.Header, 0) + if err != nil { + return err + } + + err = putBasicFilter(tx, *params.GenesisHash, basicFilter) + if err != nil { + return err + } + + err = putBasicHeader(tx, *params.GenesisHash, basicFilterTip) + if err != nil { + return err + } + + err = putExtFilter(tx, *params.GenesisHash, extFilter) + if err != nil { + return err + } + + err = putExtHeader(tx, *params.GenesisHash, extFilterTip) + if err != nil { + return err + } + + err = putDBVersion(tx, latestDBVersion) + if err != nil { + return err + } + + err = putMaxBlockHeight(tx, 0) + if err != nil { + return err + } + + err = spvBucket.Put(dbCreateDateName, + uint64ToBytes(uint64(time.Now().Unix()))) + if err != nil { + str := "failed to store database creation time: %v" + return log.Errorf(str, err) + } + + return nil + }) + if err != nil { + str := "failed to update database: %v" + return log.Errorf(str, err) + } + + return nil +} diff --git a/spvsvc/spvchain/filter.go b/spvsvc/spvchain/filter.go new file mode 100644 index 0000000..f47bec4 --- /dev/null +++ b/spvsvc/spvchain/filter.go @@ -0,0 +1,73 @@ +package spvchain + +import ( + "github.com/btcsuite/btcd/chaincfg/chainhash" + "github.com/btcsuite/btcd/wire" + "github.com/btcsuite/btcutil/gcs" + "github.com/btcsuite/btcutil/gcs/builder" +) + +func buildBasicFilter(block *wire.MsgBlock) (*gcs.Filter, error) { + blockHash := block.BlockHash() + b := builder.WithKeyHash(&blockHash) + _, err := b.Key() + if err != nil { + str := "failed to create filter builder: %v" + return nil, log.Errorf(str, err) + } + for i, tx := range block.Transactions { + // Skip the inputs for the coinbase transaction + if i != 0 { + for _, txIn := range tx.TxIn { + b.AddOutPoint(txIn.PreviousOutPoint) + } + } + for _, txOut := range tx.TxOut { + b.AddScript(txOut.PkScript) + } + } + f, err := b.Build() + if err != nil { + str := "failed to build filter: %v" + return nil, log.Errorf(str, err) + } + return f, nil +} + +func buildExtFilter(block *wire.MsgBlock) (*gcs.Filter, error) { + blockHash := block.BlockHash() + b := builder.WithKeyHash(&blockHash) + _, err := b.Key() + if err != nil { + str := "failed to create filter builder: %v" + return nil, log.Errorf(str, err) + } + for i, tx := range block.Transactions { + txHash := tx.TxHash() + b.AddHash(&txHash) + // Skip the inputs for the coinbase transaction + if i != 0 { + for _, txIn := range tx.TxIn { + b.AddScript(txIn.SignatureScript) + } + } + } + f, err := b.Build() + if err != nil { + str := "failed to build filter: %v" + return nil, log.Errorf(str, err) + } + return f, nil +} + +func getFilterHash(filter *gcs.Filter) chainhash.Hash { + return chainhash.HashH(filter.NBytes()) +} + +func makeHeaderForFilter(filter *gcs.Filter, prevHeader chainhash.Hash) chainhash.Hash { + filterTip := make([]byte, 2*chainhash.HashSize) + filterHash := getFilterHash(filter) + copy(filterTip, filterHash[:]) + copy(filterTip[chainhash.HashSize:], prevHeader[:]) + return chainhash.HashH(filterTip) +} diff --git a/spvsvc/spvchain/log.go b/spvsvc/spvchain/log.go new file mode 100644 index 0000000..48f1e9a --- /dev/null +++ b/spvsvc/spvchain/log.go @@ -0,0 +1,26 @@ +package spvchain + +import "github.com/btcsuite/btclog" + +// log is a logger that is initialized with no output filters. This +// means the package will not perform any logging by default until the caller +// requests it. +var log btclog.Logger + +// The default amount of logging is none. +func init() { + DisableLog() +} + +// DisableLog disables all library log output. Logging output is disabled +// by default until either UseLogger or SetLogWriter are called. +func DisableLog() { + log = btclog.Disabled +} + +// UseLogger uses a specified Logger to output package logging info. +// This should be used in preference to SetLogWriter if the caller is also +// using btclog. +func UseLogger(logger btclog.Logger) { + log = logger +} diff --git a/spvsvc/spvchain/notifications.go b/spvsvc/spvchain/notifications.go new file mode 100644 index 0000000..2ba05c5 --- /dev/null +++ b/spvsvc/spvchain/notifications.go @@ -0,0 +1,149 @@ +// Copyright (c) 2013-2016 The btcsuite developers +// Use of this source code is governed by an ISC +// license that can be found in the LICENSE file. + +package spvchain + +import ( + "errors" + + "github.com/btcsuite/btcd/addrmgr" + "github.com/btcsuite/btcd/connmgr" +) + +type getConnCountMsg struct { + reply chan int32 +} + +type getPeersMsg struct { + reply chan []*serverPeer +} + +type getOutboundGroup struct { + key string + reply chan int +} + +type getAddedNodesMsg struct { + reply chan []*serverPeer +} + +type disconnectNodeMsg struct { + cmp func(*serverPeer) bool + reply chan error +} + +type connectNodeMsg struct { + addr string + permanent bool + reply chan error +} + +type removeNodeMsg struct { + cmp func(*serverPeer) bool + reply chan error +} + +// handleQuery is the central handler for all queries and commands from other +// goroutines related to peer state. +func (s *ChainService) handleQuery(state *peerState, querymsg interface{}) { + switch msg := querymsg.(type) { + case getConnCountMsg: + nconnected := int32(0) + state.forAllPeers(func(sp *serverPeer) { + if sp.Connected() { + nconnected++ + } + }) + msg.reply <- nconnected + + case getPeersMsg: + peers := make([]*serverPeer, 0, state.Count()) + state.forAllPeers(func(sp *serverPeer) { + if !sp.Connected() { + return + } + peers = append(peers, sp) + }) + msg.reply <- peers + + case connectNodeMsg: + // TODO: duplicate oneshots? + // Limit max number of total peers. + if state.Count() >= MaxPeers { + msg.reply <- errors.New("max peers reached") + return + } + for _, peer := range state.persistentPeers { + if peer.Addr() == msg.addr { + if msg.permanent { + msg.reply <- errors.New("peer already connected") + } else { + msg.reply <- errors.New("peer exists as a permanent peer") + } + return + } + } + + netAddr, err := addrStringToNetAddr(msg.addr) + if err != nil { + msg.reply <- err + return + } + + // TODO: if too many, nuke a non-perm peer. + go s.connManager.Connect(&connmgr.ConnReq{ + Addr: netAddr, + Permanent: msg.permanent, + }) + msg.reply <- nil + case removeNodeMsg: + found := disconnectPeer(state.persistentPeers, msg.cmp, func(sp *serverPeer) { + // Keep group counts ok since we remove from + // the list now. + state.outboundGroups[addrmgr.GroupKey(sp.NA())]-- + }) + + if found { + msg.reply <- nil + } else { + msg.reply <- errors.New("peer not found") + } + case getOutboundGroup: + count, ok := state.outboundGroups[msg.key] + if ok { + msg.reply <- count + } else { + msg.reply <- 0 + } + // Request a list of the persistent (added) peers. + case getAddedNodesMsg: + // Respond with a slice of the relavent peers. + peers := make([]*serverPeer, 0, len(state.persistentPeers)) + for _, sp := range state.persistentPeers { + peers = append(peers, sp) + } + msg.reply <- peers + case disconnectNodeMsg: + // Check outbound peers. + found := disconnectPeer(state.outboundPeers, msg.cmp, func(sp *serverPeer) { + // Keep group counts ok since we remove from + // the list now. + state.outboundGroups[addrmgr.GroupKey(sp.NA())]-- + }) + if found { + // If there are multiple outbound connections to the same + // ip:port, continue disconnecting them all until no such + // peers are found. + for found { + found = disconnectPeer(state.outboundPeers, msg.cmp, func(sp *serverPeer) { + state.outboundGroups[addrmgr.GroupKey(sp.NA())]-- + }) + } + msg.reply <- nil + return + } + + msg.reply <- errors.New("peer not found") + } +} diff --git a/spvsvc/spvchain/spvchain.go b/spvsvc/spvchain/spvchain.go new file mode 100644 index 0000000..e5ac9c5 --- /dev/null +++ b/spvsvc/spvchain/spvchain.go @@ -0,0 +1,1373 @@ +package spvchain + +import ( + "errors" + "fmt" + "net" + "strconv" + "sync" + "sync/atomic" + "time" + + "github.com/btcsuite/btcd/addrmgr" + "github.com/btcsuite/btcd/blockchain" + "github.com/btcsuite/btcd/chaincfg" + "github.com/btcsuite/btcd/chaincfg/chainhash" + "github.com/btcsuite/btcd/connmgr" + "github.com/btcsuite/btcd/peer" + "github.com/btcsuite/btcd/wire" + "github.com/btcsuite/btcutil" + "github.com/btcsuite/btcwallet/waddrmgr" + "github.com/btcsuite/btcwallet/wallet" + "github.com/btcsuite/btcwallet/walletdb" +) + +// These are exported variables so they can be changed by users. +var ( + // ConnectionRetryInterval is the base amount of time to wait in between + // retries when connecting to persistent peers. It is adjusted by the + // number of retries such that there is a retry backoff. + ConnectionRetryInterval = time.Second * 5 + + // UserAgentName is the user agent name and is used to help identify + // ourselves to other bitcoin peers. + UserAgentName = "spvchain" + + // UserAgentVersion is the user agent version and is used to help + // identify ourselves to other bitcoin peers. + UserAgentVersion = "0.0.1-alpha" + + // Services describes the services that are supported by the server. + Services = wire.SFNodeCF + + // RequiredServices describes the services that are required to be + // supported by outbound peers. + RequiredServices = wire.SFNodeNetwork | wire.SFNodeCF + + // BanThreshold is the maximum ban score before a peer is banned. + BanThreshold = uint32(100) + + // BanDuration is the duration of a ban. + BanDuration = time.Hour * 24 + + // TargetOutbound is the number of outbound peers to target. + TargetOutbound = 8 + + // MaxPeers is the maximum number of connections the client maintains. + MaxPeers = 125 + + // DisableDNSSeed disables getting initial addresses for Bitcoin nodes + // from DNS. + DisableDNSSeed = false +) + +// updatePeerHeightsMsg is a message sent from the blockmanager to the server +// after a new block has been accepted. The purpose of the message is to update +// the heights of peers that were known to announce the block before we +// connected it to the main chain or recognized it as an orphan. With these +// updates, peer heights will be kept up to date, allowing for fresh data when +// selecting sync peer candidacy. +type updatePeerHeightsMsg struct { + newHash *chainhash.Hash + newHeight int32 + originPeer *serverPeer +} + +// peerState maintains state of inbound, persistent, outbound peers as well +// as banned peers and outbound groups. +type peerState struct { + outboundPeers map[int32]*serverPeer + persistentPeers map[int32]*serverPeer + banned map[string]time.Time + outboundGroups map[string]int +} + +// Count returns the count of all known peers. +func (ps *peerState) Count() int { + return len(ps.outboundPeers) + len(ps.persistentPeers) +} + +// forAllOutboundPeers is a helper function that runs closure on all outbound +// peers known to peerState. +func (ps *peerState) forAllOutboundPeers(closure func(sp *serverPeer)) { + for _, e := range ps.outboundPeers { + closure(e) + } + for _, e := range ps.persistentPeers { + closure(e) + } +} + +// forAllPeers is a helper function that runs closure on all peers known to +// peerState. +func (ps *peerState) forAllPeers(closure func(sp *serverPeer)) { + ps.forAllOutboundPeers(closure) +} + +// serverPeer extends the peer to maintain state shared by the server and +// the blockmanager. +type serverPeer struct { + // The following variables must only be used atomically + feeFilter int64 + + *peer.Peer + + connReq *connmgr.ConnReq + server *ChainService + persistent bool + continueHash *chainhash.Hash + relayMtx sync.Mutex + requestQueue []*wire.InvVect + requestedFilters map[chainhash.Hash]bool + requestedBlocks map[chainhash.Hash]struct{} + knownAddresses map[string]struct{} + banScore connmgr.DynamicBanScore + quit chan struct{} + // The following chans are used to sync blockmanager and server. + blockProcessed chan struct{} +} + +// newServerPeer returns a new serverPeer instance. The peer needs to be set by +// the caller. +func newServerPeer(s *ChainService, isPersistent bool) *serverPeer { + return &serverPeer{ + server: s, + persistent: isPersistent, + requestedFilters: make(map[chainhash.Hash]bool), + requestedBlocks: make(map[chainhash.Hash]struct{}), + knownAddresses: make(map[string]struct{}), + quit: make(chan struct{}), + blockProcessed: make(chan struct{}, 1), + } +} + +// newestBlock returns the current best block hash and height using the format +// required by the configuration for the peer package. +func (sp *serverPeer) newestBlock() (*chainhash.Hash, int32, error) { + best, err := sp.server.BestSnapshot() + if err != nil { + return nil, 0, err + } + return &best.Hash, best.Height, nil +} + +// addKnownAddresses adds the given addresses to the set of known addresses to +// the peer to prevent sending duplicate addresses. +func (sp *serverPeer) addKnownAddresses(addresses []*wire.NetAddress) { + for _, na := range addresses { + sp.knownAddresses[addrmgr.NetAddressKey(na)] = struct{}{} + } +} + +// addressKnown true if the given address is already known to the peer. +func (sp *serverPeer) addressKnown(na *wire.NetAddress) bool { + _, exists := sp.knownAddresses[addrmgr.NetAddressKey(na)] + return exists +} + +// addBanScore increases the persistent and decaying ban score fields by the +// values passed as parameters. If the resulting score exceeds half of the ban +// threshold, a warning is logged including the reason provided. Further, if +// the score is above the ban threshold, the peer will be banned and +// disconnected. +func (sp *serverPeer) addBanScore(persistent, transient uint32, reason string) { + // No warning is logged and no score is calculated if banning is disabled. + warnThreshold := BanThreshold >> 1 + if transient == 0 && persistent == 0 { + // The score is not being increased, but a warning message is still + // logged if the score is above the warn threshold. + score := sp.banScore.Int() + if score > warnThreshold { + log.Warnf("Misbehaving peer %s: %s -- ban score is %d, "+ + "it was not increased this time", sp, reason, score) + } + return + } + score := sp.banScore.Increase(persistent, transient) + if score > warnThreshold { + log.Warnf("Misbehaving peer %s: %s -- ban score increased to %d", + sp, reason, score) + if score > BanThreshold { + log.Warnf("Misbehaving peer %s -- banning and disconnecting", + sp) + sp.server.BanPeer(sp) + sp.Disconnect() + } + } +} + +// pushGetCFHeadersMsg sends a getcfheaders message for the provided block +// locator and stop hash to the connected peer. +func (sp *serverPeer) pushGetCFHeadersMsg(locator blockchain.BlockLocator, + stopHash *chainhash.Hash) error { + msg := wire.NewMsgGetCFHeaders() + msg.HashStop = *stopHash + for _, hash := range locator { + err := msg.AddBlockLocatorHash(hash) + if err != nil { + return err + } + } + sp.QueueMessage(msg, nil) + return nil +} + +// OnVersion is invoked when a peer receives a version bitcoin message +// and is used to negotiate the protocol version details as well as kick start +// the communications. +func (sp *serverPeer) OnVersion(_ *peer.Peer, msg *wire.MsgVersion) { + // Add the remote peer time as a sample for creating an offset against + // the local clock to keep the network time in sync. + sp.server.timeSource.AddTimeSample(sp.Addr(), msg.Timestamp) + + // Signal the block manager this peer is a new sync candidate. + sp.server.blockManager.NewPeer(sp) + + // Update the address manager and request known addresses from the + // remote peer for outbound connections. This is skipped when running + // on the simulation test network since it is only intended to connect + // to specified peers and actively avoids advertising and connecting to + // discovered peers. + if sp.server.chainParams.Net != chaincfg.SimNetParams.Net { + addrManager := sp.server.addrManager + // Request known addresses if the server address manager needs + // more and the peer has a protocol version new enough to + // include a timestamp with addresses. + hasTimestamp := sp.ProtocolVersion() >= + wire.NetAddressTimeVersion + if addrManager.NeedMoreAddresses() && hasTimestamp { + sp.QueueMessage(wire.NewMsgGetAddr(), nil) + } + + // Mark the address as a known good address. + addrManager.Good(sp.NA()) + } + + // Add valid peer to the server. + sp.server.AddPeer(sp) +} + +// OnBlock is invoked when a peer receives a block bitcoin message. It +// blocks until the bitcoin block has been fully processed. +func (sp *serverPeer) OnBlock(_ *peer.Peer, msg *wire.MsgBlock, buf []byte) { + log.Tracef("got block %v", msg.BlockHash()) + // Convert the raw MsgBlock to a btcutil.Block which provides some + // convenience methods and things such as hash caching. + block := btcutil.NewBlockFromBlockAndBytes(msg, buf) + + // Add the block to the known inventory for the peer. + iv := wire.NewInvVect(wire.InvTypeBlock, block.Hash()) + sp.AddKnownInventory(iv) + + // Queue the block up to be handled by the block + // manager and intentionally block further receives + // until the bitcoin block is fully processed and known + // good or bad. This helps prevent a malicious peer + // from queuing up a bunch of bad blocks before + // disconnecting (or being disconnected) and wasting + // memory. Additionally, this behavior is depended on + // by at least the block acceptance test tool as the + // reference implementation processes blocks in the same + // thread and therefore blocks further messages until + // the bitcoin block has been fully processed. + //sp.server.blockManager.QueueBlock(block, sp) + <-sp.blockProcessed +} + +// OnInv is invoked when a peer receives an inv bitcoin message and is +// used to examine the inventory being advertised by the remote peer and react +// accordingly. We pass the message down to blockmanager which will call +// QueueMessage with any appropriate responses. +func (sp *serverPeer) OnInv(_ *peer.Peer, msg *wire.MsgInv) { + log.Tracef("Got inv with %v items", len(msg.InvList)) + newInv := wire.NewMsgInvSizeHint(uint(len(msg.InvList))) + for _, invVect := range msg.InvList { + if invVect.Type == wire.InvTypeTx { + log.Tracef("Ignoring tx %v in inv from %v -- "+ + "SPV mode", invVect.Hash, sp) + if sp.ProtocolVersion() >= wire.BIP0037Version { + log.Infof("Peer %v is announcing "+ + "transactions -- disconnecting", sp) + sp.Disconnect() + return + } + continue + } + err := newInv.AddInvVect(invVect) + if err != nil { + log.Errorf("Failed to add inventory vector: %v", err) + break + } + } + + if len(newInv.InvList) > 0 { + sp.server.blockManager.QueueInv(newInv, sp) + } +} + +// OnHeaders is invoked when a peer receives a headers bitcoin +// message. The message is passed down to the block manager. +func (sp *serverPeer) OnHeaders(_ *peer.Peer, msg *wire.MsgHeaders) { + log.Tracef("Got headers with %v items", len(msg.Headers)) + sp.server.blockManager.QueueHeaders(msg, sp) +} + +// handleGetData is invoked when a peer receives a getdata bitcoin message and +// is used to deliver block and transaction information. +func (sp *serverPeer) OnGetData(_ *peer.Peer, msg *wire.MsgGetData) { + numAdded := 0 + notFound := wire.NewMsgNotFound() + + length := len(msg.InvList) + // A decaying ban score increase is applied to prevent exhausting resources + // with unusually large inventory queries. + // Requesting more than the maximum inventory vector length within a short + // period of time yields a score above the default ban threshold. Sustained + // bursts of small requests are not penalized as that would potentially ban + // peers performing IBD. + // This incremental score decays each minute to half of its value. + sp.addBanScore(0, uint32(length)*99/wire.MaxInvPerMsg, "getdata") + + // We wait on this wait channel periodically to prevent queuing + // far more data than we can send in a reasonable time, wasting memory. + // The waiting occurs after the database fetch for the next one to + // provide a little pipelining. + var waitChan chan struct{} + doneChan := make(chan struct{}, 1) + + for i, iv := range msg.InvList { + var c chan struct{} + // If this will be the last message we send. + if i == length-1 && len(notFound.InvList) == 0 { + c = doneChan + } else if (i+1)%3 == 0 { + // Buffered so as to not make the send goroutine block. + c = make(chan struct{}, 1) + } + var err error + switch iv.Type { + case wire.InvTypeTx: + err = sp.server.pushTxMsg(sp, &iv.Hash, c, waitChan) + default: + log.Warnf("Unsupported type in inventory request %d", + iv.Type) + continue + } + if err != nil { + notFound.AddInvVect(iv) + + // When there is a failure fetching the final entry + // and the done channel was sent in due to there + // being no outstanding not found inventory, consume + // it here because there is now not found inventory + // that will use the channel momentarily. + if i == len(msg.InvList)-1 && c != nil { + <-c + } + } + numAdded++ + waitChan = c + } + if len(notFound.InvList) != 0 { + sp.QueueMessage(notFound, doneChan) + } + + // Wait for messages to be sent. We can send quite a lot of data at this + // point and this will keep the peer busy for a decent amount of time. + // We don't process anything else by them in this time so that we + // have an idea of when we should hear back from them - else the idle + // timeout could fire when we were only half done sending the blocks. + if numAdded > 0 { + <-doneChan + } +} + +// OnFeeFilter is invoked when a peer receives a feefilter bitcoin message and +// is used by remote peers to request that no transactions which have a fee rate +// lower than provided value are inventoried to them. The peer will be +// disconnected if an invalid fee filter value is provided. +func (sp *serverPeer) OnFeeFilter(_ *peer.Peer, msg *wire.MsgFeeFilter) { + // Check that the passed minimum fee is a valid amount. + if msg.MinFee < 0 || msg.MinFee > btcutil.MaxSatoshi { + log.Debugf("Peer %v sent an invalid feefilter '%v' -- "+ + "disconnecting", sp, btcutil.Amount(msg.MinFee)) + sp.Disconnect() + return + } + + atomic.StoreInt64(&sp.feeFilter, msg.MinFee) +} + +// OnReject is invoked when a peer receives a reject bitcoin message and is +// used to notify the server about a rejected transaction. +func (sp *serverPeer) OnReject(_ *peer.Peer, msg *wire.MsgReject) { + +} + +// OnCFHeaders is invoked when a peer receives a cfheaders bitcoin message and +// is used to notify the server about a list of committed filter headers. +func (sp *serverPeer) OnCFHeaders(_ *peer.Peer, msg *wire.MsgCFHeaders) { + log.Trace("Got cfheaders message!") +} + +// OnCFilter is invoked when a peer receives a cfilter bitcoin message and is +// used to notify the server about a committed filter. +func (sp *serverPeer) OnCFilter(_ *peer.Peer, msg *wire.MsgCFilter) { + +} + +// OnAddr is invoked when a peer receives an addr bitcoin message and is +// used to notify the server about advertised addresses. +func (sp *serverPeer) OnAddr(_ *peer.Peer, msg *wire.MsgAddr) { + // Ignore addresses when running on the simulation test network. This + // helps prevent the network from becoming another public test network + // since it will not be able to learn about other peers that have not + // specifically been provided. + if sp.server.chainParams.Net == chaincfg.SimNetParams.Net { + return + } + + // Ignore old style addresses which don't include a timestamp. + if sp.ProtocolVersion() < wire.NetAddressTimeVersion { + return + } + + // A message that has no addresses is invalid. + if len(msg.AddrList) == 0 { + log.Errorf("Command [%s] from %s does not contain any addresses", + msg.Command(), sp) + sp.Disconnect() + return + } + + for _, na := range msg.AddrList { + // Don't add more address if we're disconnecting. + if !sp.Connected() { + return + } + + // Set the timestamp to 5 days ago if it's more than 24 hours + // in the future so this address is one of the first to be + // removed when space is needed. + now := time.Now() + if na.Timestamp.After(now.Add(time.Minute * 10)) { + na.Timestamp = now.Add(-1 * time.Hour * 24 * 5) + } + + // Add address to known addresses for this peer. + sp.addKnownAddresses([]*wire.NetAddress{na}) + } + + // Add addresses to server address manager. The address manager handles + // the details of things such as preventing duplicate addresses, max + // addresses, and last seen updates. + // XXX bitcoind gives a 2 hour time penalty here, do we want to do the + // same? + sp.server.addrManager.AddAddresses(msg.AddrList, sp.NA()) +} + +// OnRead is invoked when a peer receives a message and it is used to update +// the bytes received by the server. +func (sp *serverPeer) OnRead(_ *peer.Peer, bytesRead int, msg wire.Message, err error) { + sp.server.AddBytesReceived(uint64(bytesRead)) +} + +// OnWrite is invoked when a peer sends a message and it is used to update +// the bytes sent by the server. +func (sp *serverPeer) OnWrite(_ *peer.Peer, bytesWritten int, msg wire.Message, err error) { + sp.server.AddBytesSent(uint64(bytesWritten)) +} + +// ChainService is instantiated with functional options +type ChainService struct { + // The following variables must only be used atomically. + // Putting the uint64s first makes them 64-bit aligned for 32-bit systems. + bytesReceived uint64 // Total bytes received from all peers since start. + bytesSent uint64 // Total bytes sent by all peers since start. + started int32 + shutdown int32 + + namespace walletdb.Namespace + chainParams chaincfg.Params + addrManager *addrmgr.AddrManager + connManager *connmgr.ConnManager + blockManager *blockManager + newPeers chan *serverPeer + donePeers chan *serverPeer + banPeers chan *serverPeer + query chan interface{} + peerHeightsUpdate chan updatePeerHeightsMsg + wg sync.WaitGroup + quit chan struct{} + timeSource blockchain.MedianTimeSource + services wire.ServiceFlag + + userAgentName string + userAgentVersion string +} + +// BanPeer bans a peer that has already been connected to the server by ip. +func (s *ChainService) BanPeer(sp *serverPeer) { + s.banPeers <- sp +} + +// BestSnapshot returns the best block hash and height known to the database. +func (s *ChainService) BestSnapshot() (*waddrmgr.BlockStamp, error) { + var best *waddrmgr.BlockStamp + var err error + err = s.namespace.View(func(tx walletdb.Tx) error { + best, err = SyncedTo(tx) + return err + }) + if err != nil { + return nil, err + } + return best, nil +} + +// LatestBlockLocator returns the block locator for the latest known block +// stored in the database. +func (s *ChainService) LatestBlockLocator() (blockchain.BlockLocator, error) { + var locator blockchain.BlockLocator + var err error + err = s.namespace.View(func(tx walletdb.Tx) error { + best, err := SyncedTo(tx) + if err != nil { + return err + } + locator = BlockLocatorFromHash(tx, best.Hash) + return nil + }) + if err != nil { + return nil, err + } + return locator, nil +} + +// AddPeer adds a new peer that has already been connected to the server. +func (s *ChainService) AddPeer(sp *serverPeer) { + s.newPeers <- sp +} + +// AddBytesSent adds the passed number of bytes to the total bytes sent counter +// for the server. It is safe for concurrent access. +func (s *ChainService) AddBytesSent(bytesSent uint64) { + atomic.AddUint64(&s.bytesSent, bytesSent) +} + +// AddBytesReceived adds the passed number of bytes to the total bytes received +// counter for the server. It is safe for concurrent access. +func (s *ChainService) AddBytesReceived(bytesReceived uint64) { + atomic.AddUint64(&s.bytesReceived, bytesReceived) +} + +// NetTotals returns the sum of all bytes received and sent across the network +// for all peers. It is safe for concurrent access. +func (s *ChainService) NetTotals() (uint64, uint64) { + return atomic.LoadUint64(&s.bytesReceived), + atomic.LoadUint64(&s.bytesSent) +} + +// pushTxMsg sends a tx message for the provided transaction hash to the +// connected peer. An error is returned if the transaction hash is not known. +func (s *ChainService) pushTxMsg(sp *serverPeer, hash *chainhash.Hash, doneChan chan<- struct{}, waitChan <-chan struct{}) error { + // Attempt to fetch the requested transaction from the pool. A + // call could be made to check for existence first, but simply trying + // to fetch a missing transaction results in the same behavior. + /* tx, err := s.txMemPool.FetchTransaction(hash) + if err != nil { + log.Tracef("Unable to fetch tx %v from transaction "+ + "pool: %v", hash, err) + + if doneChan != nil { + doneChan <- struct{}{} + } + return err + } + + // Once we have fetched data wait for any previous operation to finish. + if waitChan != nil { + <-waitChan + } + + sp.QueueMessage(tx.MsgTx(), doneChan) */ + + return nil +} + +// peerHandler is used to handle peer operations such as adding and removing +// peers to and from the server, banning peers, and broadcasting messages to +// peers. It must be run in a goroutine. +func (s *ChainService) peerHandler() { + // Start the address manager and block manager, both of which are needed + // by peers. This is done here since their lifecycle is closely tied + // to this handler and rather than adding more channels to sychronize + // things, it's easier and slightly faster to simply start and stop them + // in this handler. + s.addrManager.Start() + s.blockManager.Start() + + state := &peerState{ + persistentPeers: make(map[int32]*serverPeer), + outboundPeers: make(map[int32]*serverPeer), + banned: make(map[string]time.Time), + outboundGroups: make(map[string]int), + } + + if !DisableDNSSeed { + // Add peers discovered through DNS to the address manager. + connmgr.SeedFromDNS(&s.chainParams, RequiredServices, + net.LookupIP, func(addrs []*wire.NetAddress) { + // Bitcoind uses a lookup of the dns seeder here. This + // is rather strange since the values looked up by the + // DNS seed lookups will vary quite a lot. + // to replicate this behaviour we put all addresses as + // having come from the first one. + s.addrManager.AddAddresses(addrs, addrs[0]) + }) + } + go s.connManager.Start() + +out: + for { + select { + // New peers connected to the server. + case p := <-s.newPeers: + s.handleAddPeerMsg(state, p) + + // Disconnected peers. + case p := <-s.donePeers: + s.handleDonePeerMsg(state, p) + + // Block accepted in mainchain or orphan, update peer height. + case umsg := <-s.peerHeightsUpdate: + s.handleUpdatePeerHeights(state, umsg) + + // Peer to ban. + case p := <-s.banPeers: + s.handleBanPeerMsg(state, p) + + case qmsg := <-s.query: + s.handleQuery(state, qmsg) + + case <-s.quit: + // Disconnect all peers on server shutdown. + state.forAllPeers(func(sp *serverPeer) { + log.Tracef("Shutdown peer %s", sp) + sp.Disconnect() + }) + break out + } + } + + s.connManager.Stop() + s.blockManager.Stop() + s.addrManager.Stop() + + // Drain channels before exiting so nothing is left waiting around + // to send. +cleanup: + for { + select { + case <-s.newPeers: + case <-s.donePeers: + case <-s.peerHeightsUpdate: + case <-s.query: + default: + break cleanup + } + } + s.wg.Done() + log.Tracef("Peer handler done") +} + +// Config is a struct detailing the configuration of the chain service. +type Config struct { + DataDir string + Namespace walletdb.Namespace + ChainParams chaincfg.Params + ConnectPeers []string + AddPeers []string +} + +// NewChainService returns a new chain service configured to connect to the +// bitcoin network type specified by chainParams. Use start to begin syncing +// with peers. +func NewChainService(cfg Config) (*ChainService, error) { + amgr := addrmgr.New(cfg.DataDir, net.LookupIP) + + s := ChainService{ + chainParams: cfg.ChainParams, + addrManager: amgr, + newPeers: make(chan *serverPeer, MaxPeers), + donePeers: make(chan *serverPeer, MaxPeers), + banPeers: make(chan *serverPeer, MaxPeers), + query: make(chan interface{}), + quit: make(chan struct{}), + peerHeightsUpdate: make(chan updatePeerHeightsMsg), + namespace: cfg.Namespace, + timeSource: blockchain.NewMedianTime(), + services: Services, + userAgentName: UserAgentName, + userAgentVersion: UserAgentVersion, + } + + err := createSPVNS(s.namespace, &s.chainParams) + if err != nil { + return nil, err + } + + bm, err := newBlockManager(&s) + if err != nil { + return nil, err + } + s.blockManager = bm + + // Only setup a function to return new addresses to connect to when + // not running in connect-only mode. The simulation network is always + // in connect-only mode since it is only intended to connect to + // specified peers and actively avoid advertising and connecting to + // discovered peers in order to prevent it from becoming a public test + // network. + var newAddressFunc func() (net.Addr, error) + if s.chainParams.Net != chaincfg.SimNetParams.Net { + newAddressFunc = func() (net.Addr, error) { + for tries := 0; tries < 100; tries++ { + addr := s.addrManager.GetAddress() + if addr == nil { + break + } + + // Address will not be invalid, local or unroutable + // because addrmanager rejects those on addition. + // Just check that we don't already have an address + // in the same group so that we are not connecting + // to the same network segment at the expense of + // others. + key := addrmgr.GroupKey(addr.NetAddress()) + if s.OutboundGroupCount(key) != 0 { + continue + } + + // only allow recent nodes (10mins) after we failed 30 + // times + if tries < 30 && time.Since(addr.LastAttempt()) < 10*time.Minute { + continue + } + + // allow nondefault ports after 50 failed tries. + if tries < 50 && fmt.Sprintf("%d", addr.NetAddress().Port) != + s.chainParams.DefaultPort { + continue + } + + addrString := addrmgr.NetAddressKey(addr.NetAddress()) + return addrStringToNetAddr(addrString) + } + + return nil, errors.New("no valid connect address") + } + } + + // Create a connection manager. + if MaxPeers < TargetOutbound { + TargetOutbound = MaxPeers + } + cmgr, err := connmgr.New(&connmgr.Config{ + RetryDuration: ConnectionRetryInterval, + TargetOutbound: uint32(TargetOutbound), + Dial: func(addr net.Addr) (net.Conn, error) { + return net.Dial(addr.Network(), addr.String()) + }, + OnConnection: s.outboundPeerConnected, + GetNewAddress: newAddressFunc, + }) + if err != nil { + return nil, err + } + s.connManager = cmgr + + // Start up persistent peers. + permanentPeers := cfg.ConnectPeers + if len(permanentPeers) == 0 { + permanentPeers = cfg.AddPeers + } + for _, addr := range permanentPeers { + tcpAddr, err := addrStringToNetAddr(addr) + if err != nil { + return nil, err + } + + go s.connManager.Connect(&connmgr.ConnReq{ + Addr: tcpAddr, + Permanent: true, + }) + } + + return &s, nil +} + +// addrStringToNetAddr takes an address in the form of 'host:port' and returns +// a net.Addr which maps to the original address with any host names resolved +// to IP addresses. +func addrStringToNetAddr(addr string) (net.Addr, error) { + host, strPort, err := net.SplitHostPort(addr) + if err != nil { + return nil, err + } + + // Attempt to look up an IP address associated with the parsed host. + ips, err := net.LookupIP(host) + if err != nil { + return nil, err + } + if len(ips) == 0 { + return nil, fmt.Errorf("no addresses found for %s", host) + } + + port, err := strconv.Atoi(strPort) + if err != nil { + return nil, err + } + + return &net.TCPAddr{ + IP: ips[0], + Port: port, + }, nil +} + +// handleUpdatePeerHeight updates the heights of all peers who were known to +// announce a block we recently accepted. +func (s *ChainService) handleUpdatePeerHeights(state *peerState, umsg updatePeerHeightsMsg) { + state.forAllPeers(func(sp *serverPeer) { + // The origin peer should already have the updated height. + if sp == umsg.originPeer { + return + } + + // This is a pointer to the underlying memory which doesn't + // change. + latestBlkHash := sp.LastAnnouncedBlock() + + // Skip this peer if it hasn't recently announced any new blocks. + if latestBlkHash == nil { + return + } + + // If the peer has recently announced a block, and this block + // matches our newly accepted block, then update their block + // height. + if *latestBlkHash == *umsg.newHash { + sp.UpdateLastBlockHeight(umsg.newHeight) + sp.UpdateLastAnnouncedBlock(nil) + } + }) +} + +// handleAddPeerMsg deals with adding new peers. It is invoked from the +// peerHandler goroutine. +func (s *ChainService) handleAddPeerMsg(state *peerState, sp *serverPeer) bool { + if sp == nil { + return false + } + + // Ignore new peers if we're shutting down. + if atomic.LoadInt32(&s.shutdown) != 0 { + log.Infof("New peer %s ignored - server is shutting down", sp) + sp.Disconnect() + return false + } + + // Disconnect banned peers. + host, _, err := net.SplitHostPort(sp.Addr()) + if err != nil { + log.Debugf("can't split hostport %v", err) + sp.Disconnect() + return false + } + if banEnd, ok := state.banned[host]; ok { + if time.Now().Before(banEnd) { + log.Debugf("Peer %s is banned for another %v - disconnecting", + host, banEnd.Sub(time.Now())) + sp.Disconnect() + return false + } + + log.Infof("Peer %s is no longer banned", host) + delete(state.banned, host) + } + + // TODO: Check for max peers from a single IP. + + // Limit max number of total peers. + if state.Count() >= MaxPeers { + log.Infof("Max peers reached [%d] - disconnecting peer %s", + MaxPeers, sp) + sp.Disconnect() + // TODO: how to handle permanent peers here? + // they should be rescheduled. + return false + } + + // Add the new peer and start it. + log.Debugf("New peer %s", sp) + state.outboundGroups[addrmgr.GroupKey(sp.NA())]++ + if sp.persistent { + state.persistentPeers[sp.ID()] = sp + } else { + state.outboundPeers[sp.ID()] = sp + } + + return true +} + +// handleDonePeerMsg deals with peers that have signalled they are done. It is +// invoked from the peerHandler goroutine. +func (s *ChainService) handleDonePeerMsg(state *peerState, sp *serverPeer) { + var list map[int32]*serverPeer + if sp.persistent { + list = state.persistentPeers + } else { + list = state.outboundPeers + } + if _, ok := list[sp.ID()]; ok { + if !sp.Inbound() && sp.VersionKnown() { + state.outboundGroups[addrmgr.GroupKey(sp.NA())]-- + } + if !sp.Inbound() && sp.connReq != nil { + s.connManager.Disconnect(sp.connReq.ID()) + } + delete(list, sp.ID()) + log.Debugf("Removed peer %s", sp) + return + } + + if sp.connReq != nil { + s.connManager.Disconnect(sp.connReq.ID()) + } + + // Update the address' last seen time if the peer has acknowledged + // our version and has sent us its version as well. + if sp.VerAckReceived() && sp.VersionKnown() && sp.NA() != nil { + s.addrManager.Connected(sp.NA()) + } + + // If we get here it means that either we didn't know about the peer + // or we purposefully deleted it. +} + +// handleBanPeerMsg deals with banning peers. It is invoked from the +// peerHandler goroutine. +func (s *ChainService) handleBanPeerMsg(state *peerState, sp *serverPeer) { + host, _, err := net.SplitHostPort(sp.Addr()) + if err != nil { + log.Debugf("can't split ban peer %s %v", sp.Addr(), err) + return + } + log.Infof("Banned peer %s for %v", host, BanDuration) + state.banned[host] = time.Now().Add(BanDuration) +} + +// disconnectPeer attempts to drop the connection of a tageted peer in the +// passed peer list. Targets are identified via usage of the passed +// `compareFunc`, which should return `true` if the passed peer is the target +// peer. This function returns true on success and false if the peer is unable +// to be located. If the peer is found, and the passed callback: `whenFound' +// isn't nil, we call it with the peer as the argument before it is removed +// from the peerList, and is disconnected from the server. +func disconnectPeer(peerList map[int32]*serverPeer, compareFunc func(*serverPeer) bool, whenFound func(*serverPeer)) bool { + for addr, peer := range peerList { + if compareFunc(peer) { + if whenFound != nil { + whenFound(peer) + } + + // This is ok because we are not continuing + // to iterate so won't corrupt the loop. + delete(peerList, addr) + peer.Disconnect() + return true + } + } + return false +} + +// sendUnminedTxs iterates through all transactions that spend from wallet +// credits that are not known to have been mined into a block, and attempts to +// send each to the chain server for relay. +// +// TODO: This should return an error if any of these lookups or sends fail, but +// since send errors due to double spends need to be handled gracefully and this +// isn't done yet, all sending errors are simply logged. +func (s *ChainService) sendUnminedTxs(w *wallet.Wallet) error { + /*txs, err := w.TxStore.UnminedTxs() + if err != nil { + return err + } + rpcClient := s.rpcClient + for _, tx := range txs { + resp, err := rpcClient.SendRawTransaction(tx, false) + if err != nil { + // TODO(jrick): Check error for if this tx is a double spend, + // remove it if so. + log.Debugf("Could not resend transaction %v: %v", + tx.TxHash(), err) + continue + } + log.Debugf("Resent unmined transaction %v", resp) + }*/ + return nil +} + +// PublishTransaction sends the transaction to the consensus RPC server so it +// can be propigated to other nodes and eventually mined. +func (s *ChainService) PublishTransaction(tx *wire.MsgTx) error { + /*_, err := s.rpcClient.SendRawTransaction(tx, false) + return err*/ + return nil +} + +// AnnounceNewTransactions generates and relays inventory vectors and notifies +// both websocket and getblocktemplate long poll clients of the passed +// transactions. This function should be called whenever new transactions +// are added to the mempool. +func (s *ChainService) AnnounceNewTransactions( /*newTxs []*mempool.TxDesc*/ ) { + // Generate and relay inventory vectors for all newly accepted + // transactions into the memory pool due to the original being + // accepted. + /*for _, txD := range newTxs { + // Generate the inventory vector and relay it. + iv := wire.NewInvVect(wire.InvTypeTx, txD.Tx.Hash()) + s.RelayInventory(iv, txD) + + if s.rpcServer != nil { + // Notify websocket clients about mempool transactions. + s.rpcServer.ntfnMgr.NotifyMempoolTx(txD.Tx, true) + + // Potentially notify any getblocktemplate long poll clients + // about stale block templates due to the new transaction. + s.rpcServer.gbtWorkState.NotifyMempoolTx( + s.txMemPool.LastUpdated()) + } + }*/ +} + +// newPeerConfig returns the configuration for the given serverPeer. +func newPeerConfig(sp *serverPeer) *peer.Config { + return &peer.Config{ + Listeners: peer.MessageListeners{ + OnVersion: sp.OnVersion, + OnBlock: sp.OnBlock, + OnInv: sp.OnInv, + OnHeaders: sp.OnHeaders, + OnCFHeaders: sp.OnCFHeaders, + OnCFilter: sp.OnCFilter, + OnGetData: sp.OnGetData, + OnReject: sp.OnReject, + OnFeeFilter: sp.OnFeeFilter, + OnAddr: sp.OnAddr, + OnRead: sp.OnRead, + OnWrite: sp.OnWrite, + + // Note: The reference client currently bans peers that send alerts + // not signed with its key. We could verify against their key, but + // since the reference client is currently unwilling to support + // other implementations' alert messages, we will not relay theirs. + OnAlert: nil, + }, + NewestBlock: sp.newestBlock, + HostToNetAddress: sp.server.addrManager.HostToNetAddress, + UserAgentName: sp.server.userAgentName, + UserAgentVersion: sp.server.userAgentVersion, + ChainParams: &sp.server.chainParams, + Services: sp.server.services, + ProtocolVersion: wire.FeeFilterVersion, + DisableRelayTx: true, + } +} + +// outboundPeerConnected is invoked by the connection manager when a new +// outbound connection is established. It initializes a new outbound server +// peer instance, associates it with the relevant state such as the connection +// request instance and the connection itself, and finally notifies the address +// manager of the attempt. +func (s *ChainService) outboundPeerConnected(c *connmgr.ConnReq, conn net.Conn) { + sp := newServerPeer(s, c.Permanent) + p, err := peer.NewOutboundPeer(newPeerConfig(sp), c.Addr.String()) + if err != nil { + log.Debugf("Cannot create outbound peer %s: %v", c.Addr, err) + s.connManager.Disconnect(c.ID()) + } + sp.Peer = p + sp.connReq = c + sp.AssociateConnection(conn) + go s.peerDoneHandler(sp) + s.addrManager.Attempt(sp.NA()) +} + +// peerDoneHandler handles peer disconnects by notifiying the server that it's +// done along with other performing other desirable cleanup. +func (s *ChainService) peerDoneHandler(sp *serverPeer) { + sp.WaitForDisconnect() + s.donePeers <- sp + + // Only tell block manager we are gone if we ever told it we existed. + if sp.VersionKnown() { + s.blockManager.DonePeer(sp) + } + close(sp.quit) +} + +// ConnectedCount returns the number of currently connected peers. +func (s *ChainService) ConnectedCount() int32 { + replyChan := make(chan int32) + + s.query <- getConnCountMsg{reply: replyChan} + + return <-replyChan +} + +// OutboundGroupCount returns the number of peers connected to the given +// outbound group key. +func (s *ChainService) OutboundGroupCount(key string) int { + replyChan := make(chan int) + s.query <- getOutboundGroup{key: key, reply: replyChan} + return <-replyChan +} + +// AddedNodeInfo returns an array of btcjson.GetAddedNodeInfoResult structures +// describing the persistent (added) nodes. +func (s *ChainService) AddedNodeInfo() []*serverPeer { + replyChan := make(chan []*serverPeer) + s.query <- getAddedNodesMsg{reply: replyChan} + return <-replyChan +} + +// Peers returns an array of all connected peers. +func (s *ChainService) Peers() []*serverPeer { + replyChan := make(chan []*serverPeer) + + s.query <- getPeersMsg{reply: replyChan} + + return <-replyChan +} + +// DisconnectNodeByAddr disconnects a peer by target address. Both outbound and +// inbound nodes will be searched for the target node. An error message will +// be returned if the peer was not found. +func (s *ChainService) DisconnectNodeByAddr(addr string) error { + replyChan := make(chan error) + + s.query <- disconnectNodeMsg{ + cmp: func(sp *serverPeer) bool { return sp.Addr() == addr }, + reply: replyChan, + } + + return <-replyChan +} + +// DisconnectNodeByID disconnects a peer by target node id. Both outbound and +// inbound nodes will be searched for the target node. An error message will be +// returned if the peer was not found. +func (s *ChainService) DisconnectNodeByID(id int32) error { + replyChan := make(chan error) + + s.query <- disconnectNodeMsg{ + cmp: func(sp *serverPeer) bool { return sp.ID() == id }, + reply: replyChan, + } + + return <-replyChan +} + +// RemoveNodeByAddr removes a peer from the list of persistent peers if +// present. An error will be returned if the peer was not found. +func (s *ChainService) RemoveNodeByAddr(addr string) error { + replyChan := make(chan error) + + s.query <- removeNodeMsg{ + cmp: func(sp *serverPeer) bool { return sp.Addr() == addr }, + reply: replyChan, + } + + return <-replyChan +} + +// RemoveNodeByID removes a peer by node ID from the list of persistent peers +// if present. An error will be returned if the peer was not found. +func (s *ChainService) RemoveNodeByID(id int32) error { + replyChan := make(chan error) + + s.query <- removeNodeMsg{ + cmp: func(sp *serverPeer) bool { return sp.ID() == id }, + reply: replyChan, + } + + return <-replyChan +} + +// ConnectNode adds `addr' as a new outbound peer. If permanent is true then the +// peer will be persistent and reconnect if the connection is lost. +// It is an error to call this with an already existing peer. +func (s *ChainService) ConnectNode(addr string, permanent bool) error { + replyChan := make(chan error) + + s.query <- connectNodeMsg{addr: addr, permanent: permanent, reply: replyChan} + + return <-replyChan +} + +// UpdatePeerHeights updates the heights of all peers who have have announced +// the latest connected main chain block, or a recognized orphan. These height +// updates allow us to dynamically refresh peer heights, ensuring sync peer +// selection has access to the latest block heights for each peer. +func (s *ChainService) UpdatePeerHeights(latestBlkHash *chainhash.Hash, latestHeight int32, updateSource *serverPeer) { + s.peerHeightsUpdate <- updatePeerHeightsMsg{ + newHash: latestBlkHash, + newHeight: latestHeight, + originPeer: updateSource, + } +} + +// rebroadcastHandler keeps track of user submitted inventories that we have +// sent out but have not yet made it into a block. We periodically rebroadcast +// them in case our peers restarted or otherwise lost track of them. +func (s *ChainService) rebroadcastHandler() { + // Wait 5 min before first tx rebroadcast. + timer := time.NewTimer(5 * time.Minute) + //pendingInvs := make(map[wire.InvVect]interface{}) + +out: + for { + select { + /*case riv := <-s.modifyRebroadcastInv: + switch msg := riv.(type) { + // Incoming InvVects are added to our map of RPC txs. + case broadcastInventoryAdd: + pendingInvs[*msg.invVect] = msg.data + + // When an InvVect has been added to a block, we can + // now remove it, if it was present. + case broadcastInventoryDel: + if _, ok := pendingInvs[*msg]; ok { + delete(pendingInvs, *msg) + } + }*/ + + case <-timer.C: /* + // Any inventory we have has not made it into a block + // yet. We periodically resubmit them until they have. + for iv, data := range pendingInvs { + ivCopy := iv + s.RelayInventory(&ivCopy, data) + } + + // Process at a random time up to 30mins (in seconds) + // in the future. + timer.Reset(time.Second * + time.Duration(randomUint16Number(1800))) */ + + case <-s.quit: + break out + } + } + + timer.Stop() + + // Drain channels before exiting so nothing is left waiting around + // to send. +cleanup: + for { + select { + //case <-s.modifyRebroadcastInv: + default: + break cleanup + } + } + s.wg.Done() +} + +// Start begins connecting to peers and syncing the blockchain. +func (s *ChainService) Start() { + // Already started? + if atomic.AddInt32(&s.started, 1) != 1 { + return + } + + // Start the peer handler which in turn starts the address and block + // managers. + s.wg.Add(2) + go s.peerHandler() + go s.rebroadcastHandler() + +} + +// Stop gracefully shuts down the server by stopping and disconnecting all +// peers and the main listener. +func (s *ChainService) Stop() error { + // Make sure this only happens once. + if atomic.AddInt32(&s.shutdown, 1) != 1 { + return nil + } + + // Signal the remaining goroutines to quit. + close(s.quit) + s.wg.Wait() + return nil +} + +// GetBlockByHeight gets block information from the ChainService database by +// its height. +func (s *ChainService) GetBlockByHeight(height uint32) (wire.BlockHeader, + uint32, error) { + var bh wire.BlockHeader + var h uint32 + var err error + err = s.namespace.View(func(dbTx walletdb.Tx) error { + bh, h, err = GetBlockByHeight(dbTx, height) + return err + }) + return bh, h, err +} + +// GetBlockByHash gets block information from the ChainService database by its +// hash. +func (s *ChainService) GetBlockByHash(hash chainhash.Hash) (wire.BlockHeader, + uint32, error) { + var bh wire.BlockHeader + var h uint32 + var err error + err = s.namespace.View(func(dbTx walletdb.Tx) error { + bh, h, err = GetBlockByHash(dbTx, hash) + return err + }) + return bh, h, err +} + +// LatestBlock gets the latest block's information from the ChainService +// database. +func (s *ChainService) LatestBlock() (wire.BlockHeader, uint32, error) { + var bh wire.BlockHeader + var h uint32 + var err error + err = s.namespace.View(func(dbTx walletdb.Tx) error { + bh, h, err = LatestBlock(dbTx) + return err + }) + return bh, h, err +} + +// putBlock puts a verified block header and height in the ChainService +// database. +func (s *ChainService) putBlock(header wire.BlockHeader, height uint32) error { + return s.namespace.Update(func(dbTx walletdb.Tx) error { + return putBlock(dbTx, header, height) + }) +} + +// putMaxBlockHeight puts the max block height to the ChainService database. +func (s *ChainService) putMaxBlockHeight(maxBlockHeight uint32) error { + return s.namespace.Update(func(dbTx walletdb.Tx) error { + return putMaxBlockHeight(dbTx, maxBlockHeight) + }) +} diff --git a/spvsvc/spvsvc.go b/spvsvc/spvsvc.go new file mode 100644 index 0000000..a3de2d3 --- /dev/null +++ b/spvsvc/spvsvc.go @@ -0,0 +1,277 @@ +package spvsvc + +import ( + "fmt" + "net" + "time" + + "github.com/btcsuite/btcd/addrmgr" + "github.com/btcsuite/btcd/connmgr" + "github.com/btcsuite/btcd/wire" + "github.com/btcsuite/btcwallet/spvsvc/spvchain" + "github.com/btcsuite/btcwallet/wallet" +) + +// SynchronizationService provides an SPV, p2p-based backend for a wallet to +// synchronize it with the network and send transactions it signs. +type SynchronizationService struct { + wallet *wallet.Wallet + chainService spvchain.ChainService +} + +// SynchronizationServiceOpt is the return type of functional options for +// creating a SynchronizationService object. +type SynchronizationServiceOpt func(*SynchronizationService) error + +// NewSynchronizationService creates a new SynchronizationService with +// functional options. +func NewSynchronizationService(opts ...SynchronizationServiceOpt) (*SynchronizationService, error) { + s := SynchronizationService{ + userAgentName: defaultUserAgentName, + userAgentVersion: defaultUserAgentVersion, + } + for _, opt := range opts { + err := opt(&s) + if err != nil { + return nil, err + } + } + return &s, nil +} + +// UserAgent is a functional option to set the user agent information as it +// appears to other nodes. +func UserAgent(agentName, agentVersion string) SynchronizationServiceOpt { + return func(s *SynchronizationService) error { + s.userAgentName = agentName + s.userAgentVersion = agentVersion + return nil + } +} + +// AddrManager is a functional option to create an address manager for the +// synchronization service. It takes a string as an argument to specify the +// directory in which to store addresses. +func AddrManager(dir string) SynchronizationServiceOpt { + return func(s *SynchronizationService) error { + m := addrmgr.New(dir, spvLookup) + s.addrManager = m + return nil + } +} + +// ConnManagerOpt is the return type of functional options to create a +// connection manager for the synchronization service. +type ConnManagerOpt func(*connmgr.Config) error + +// ConnManager is a functional option to create a connection manager for the +// synchronization service. +func ConnManager(opts ...ConnManagerOpt) SynchronizationServiceOpt { + return func(s *SynchronizationService) error { + c := connmgr.Config{ + TargetOutbound: defaultTargetOutbound, + RetryDuration: connectionRetryInterval, + GetNewAddress: s.getNewAddress, + } + for _, opt := range opts { + err := opt(&c) + if err != nil { + return err + } + } + connManager, err := connmgr.New(&c) + if err != nil { + return err + } + s.connManager = connManager + return nil + } +} + +// TargetOutbound is a functional option to specify how many outbound +// connections should be made by the ConnManager to peers. Defaults to 8. +func TargetOutbound(target uint32) ConnManagerOpt { + return func(c *connmgr.Config) error { + c.TargetOutbound = target + return nil + } +} + +// RetryDuration is a functional option to specify how long to wait before +// retrying a connection request. Defaults to 5s. +func RetryDuration(duration time.Duration) ConnManagerOpt { + return func(c *connmgr.Config) error { + c.RetryDuration = duration + return nil + } +} + +func (s *SynchronizationService) getNewAddress() (net.Addr, error) { + if s.addrManager == nil { + return nil, log.Error("Couldn't get address for new " + + "connection: address manager is nil.") + } + s.addrManager.Start() + for tries := 0; tries < 100; tries++ { + addr := s.addrManager.GetAddress() + if addr == nil { + break + } + // If we already have peers in this group, skip this address + key := addrmgr.GroupKey(addr.NetAddress()) + if s.outboundGroupCount(key) != 0 { + continue + } + if tries < 30 && time.Since(addr.LastAttempt()) < 10*time.Minute { + continue + } + if tries < 50 && fmt.Sprintf("%d", addr.NetAddress().Port) != + s.wallet.ChainParams().DefaultPort { + continue + } + addrString := addrmgr.NetAddressKey(addr.NetAddress()) + return addrStringToNetAddr(addrString) + } + return nil, log.Error("Couldn't get address for new connection: no " + + "valid addresses known.") +} + +func (s *SynchronizationService) outboundGroupCount(key string) int { + replyChan := make(chan int) + s.query <- getOutboundGroup{key: key, reply: replyChan} + return <-replyChan +} + +// SynchronizeWallet associates a wallet with the consensus RPC client, +// synchronizes the wallet with the latest changes to the blockchain, and +// continuously updates the wallet through RPC notifications. +// +// This function does not return without error until the wallet is synchronized +// to the current chain state. +func (s *SynchronizationService) SynchronizeWallet(w *wallet.Wallet) error { + s.wallet = w + + s.wg.Add(3) + go s.notificationQueueHandler() + go s.processQueuedNotifications() + go s.queryHandler() + + return s.syncWithNetwork(w) +} + +func (s *SynchronizationService) queryHandler() { + +} + +func (s *SynchronizationService) processQueuedNotifications() { + for n := range s.dequeueNotification { + var err error + notificationSwitch: + switch n := n.(type) { + case *wire.MsgBlock: + if n.BlockHash().String() != "" { + break notificationSwitch + } + case *wire.MsgHeaders: + case *wire.MsgInv: + case *wire.MsgReject: + } + + if err != nil { + log.Errorf("Cannot handle peer notification: %v", err) + } + } + s.wg.Done() +} + +// syncWithNetwork brings the wallet up to date with the current chain server +// connection. It creates a rescan request and blocks until the rescan has +// finished. +func (s *SynchronizationService) syncWithNetwork(w *wallet.Wallet) error { + /*chainClient := s.rpcClient + + // Request notifications for connected and disconnected blocks. + // + // TODO(jrick): Either request this notification only once, or when + // btcrpcclient is modified to allow some notification request to not + // automatically resent on reconnect, include the notifyblocks request + // as well. I am leaning towards allowing off all btcrpcclient + // notification re-registrations, in which case the code here should be + // left as is. + err := chainClient.NotifyBlocks() + if err != nil { + return err + } + + // Request notifications for transactions sending to all wallet + // addresses. + addrs, unspent, err := w.ActiveData() + if err != nil { + return err + } + + // TODO(jrick): How should this handle a synced height earlier than + // the chain server best block? + + // When no addresses have been generated for the wallet, the rescan can + // be skipped. + // + // TODO: This is only correct because activeData above returns all + // addresses ever created, including those that don't need to be watched + // anymore. This code should be updated when this assumption is no + // longer true, but worst case would result in an unnecessary rescan. + if len(addrs) == 0 && len(unspent) == 0 { + // TODO: It would be ideal if on initial sync wallet saved the + // last several recent blocks rather than just one. This would + // avoid a full rescan for a one block reorg of the current + // chain tip. + hash, height, err := chainClient.GetBestBlock() + if err != nil { + return err + } + return w.Manager.SetSyncedTo(&waddrmgr.BlockStamp{ + Hash: *hash, + Height: height, + }) + } + + // Compare previously-seen blocks against the chain server. If any of + // these blocks no longer exist, rollback all of the missing blocks + // before catching up with the rescan. + iter := w.Manager.NewIterateRecentBlocks() + rollback := iter == nil + syncBlock := waddrmgr.BlockStamp{ + Hash: *w.ChainParams().GenesisHash, + Height: 0, + } + for cont := iter != nil; cont; cont = iter.Prev() { + bs := iter.BlockStamp() + log.Debugf("Checking for previous saved block with height %v hash %v", + bs.Height, bs.Hash) + _, err = chainClient.GetBlock(&bs.Hash) + if err != nil { + rollback = true + continue + } + + log.Debug("Found matching block.") + syncBlock = bs + break + } + if rollback { + err = w.Manager.SetSyncedTo(&syncBlock) + if err != nil { + return err + } + // Rollback unconfirms transactions at and beyond the passed + // height, so add one to the new synced-to height to prevent + // unconfirming txs from the synced-to block. + err = w.TxStore.Rollback(syncBlock.Height + 1) + if err != nil { + return err + } + } + + return s.initialRescan(addrs, unspent, w.Manager.SyncedTo()) */ + return nil +}