diff --git a/spvsvc/spvchain/blockmanager.go b/spvsvc/spvchain/blockmanager.go index 20b0b63..ea110d2 100644 --- a/spvsvc/spvchain/blockmanager.go +++ b/spvsvc/spvchain/blockmanager.go @@ -1,3 +1,5 @@ +// NOTE: THIS API IS UNSTABLE RIGHT NOW AND WILL GO MOSTLY PRIVATE SOON. + package spvchain import ( @@ -1109,6 +1111,7 @@ func (b *blockManager) QueueCFHeaders(cfheaders *wire.MsgCFHeaders, } // handleCFHeadersMsg handles cfheaders messages from all peers. +// TODO: Refactor this using query API. func (b *blockManager) handleCFHeadersMsg(cfhmsg *cfheadersMsg) { // Grab the matching request we sent, as this message should correspond // to that, and delete it from the map on return as we're now handling @@ -1172,6 +1175,7 @@ func (b *blockManager) handleCFHeadersMsg(cfhmsg *cfheadersMsg) { // a decision about what the correct headers are, makes that decision if // possible, and downloads any cfilters and blocks necessary to make that // decision. +// TODO: Refactor this using query API. func (b *blockManager) handleProcessCFHeadersMsg(msg *processCFHeadersMsg) { // Assume we aren't ready to make a decision about correct headers yet. ready := false @@ -1240,7 +1244,8 @@ func (b *blockManager) handleProcessCFHeadersMsg(msg *processCFHeadersMsg) { el := b.headerList.Front() for el != nil { node := el.Value.(*headerNode) - hash := node.header.BlockHash() + header := node.header + hash := header.BlockHash() if node.height > *lastCFHeaderHeight { b.mapMutex.Lock() blockMap := headerMap[hash] @@ -1268,6 +1273,27 @@ func (b *blockManager) handleProcessCFHeadersMsg(msg *processCFHeadersMsg) { "extended: %t", node.height, len(blockMap[headerHash]), msg.extended) + // Notify subscribers of a connected + // block. + // TODO: Rethink this so we're not + // interrupting block processing for + // notifications if the client messes + // up channel handling. + b.server.mtxSubscribers.RLock() + for sub := range b.server.blockSubscribers { + channel := sub.onConnectBasic + if msg.extended { + channel = + sub.onConnectExt + } + if channel != nil { + select { + case channel <- *header: + case <-sub.quit: + } + } + } + b.server.mtxSubscribers.RUnlock() } *lastCFHeaderHeight = node.height // This is when we have conflicting information from @@ -1472,598 +1498,3 @@ func (b *blockManager) findPrevTestNetDifficulty(hList *list.List) (uint32, erro } return lastBits, nil } - -/* -import ( - "os" - "path/filepath" - "sort" - - "github.com/btcsuite/btcd/database" -) - -// handleBlockMsg handles block messages from all peers. -func (b *blockManager) handleBlockMsg(bmsg *blockMsg) { - // If we didn't ask for this block then the peer is misbehaving. - blockHash := bmsg.block.Hash() - if _, exists := bmsg.peer.requestedBlocks[*blockHash]; !exists { - log.Warnf("Got unrequested block %v from %s -- "+ - "disconnecting", blockHash, bmsg.peer.Addr()) - bmsg.peer.Disconnect() - return - } - - // When in headers-first mode, if the block matches the hash of the - // first header in the list of headers that are being fetched, it's - // eligible for less validation since the headers have already been - // verified to link together and are valid up to the next checkpoint. - // Also, remove the list entry for all blocks except the checkpoint - // since it is needed to verify the next round of headers links - // properly. - isCheckpointBlock := false - behaviorFlags := blockchain.BFNone - firstNodeEl := b.headerList.Front() - if firstNodeEl != nil { - firstNode := firstNodeEl.Value.(*headerNode) - if blockHash.IsEqual(firstNode.hash) { - behaviorFlags |= blockchain.BFFastAdd - if firstNode.hash.IsEqual(b.nextCheckpoint.Hash) { - isCheckpointBlock = true - } else { - b.headerList.Remove(firstNodeEl) - } - } - } - - // Remove block from request maps. Either chain will know about it and - // so we shouldn't have any more instances of trying to fetch it, or we - // will fail the insert and thus we'll retry next time we get an inv. - delete(bmsg.peer.requestedBlocks, *blockHash) - delete(b.requestedBlocks, *blockHash) - - // Process the block to include validation, best chain selection, orphan - // handling, etc. - - _, isOrphan, err := b.chain.ProcessBlock(bmsg.block, behaviorFlags) - if err != nil { - // When the error is a rule error, it means the block was simply - // rejected as opposed to something actually going wrong, so log - // it as such. Otherwise, something really did go wrong, so log - // it as an actual error. - if _, ok := err.(blockchain.RuleError); ok { - log.Infof("Rejected block %v from %s: %v", blockHash, - bmsg.peer, err) - } else { - log.Errorf("Failed to process block %v: %v", - blockHash, err) - } - if dbErr, ok := err.(database.Error); ok && dbErr.ErrorCode == - database.ErrCorruption { - panic(dbErr) - } - - // Convert the error into an appropriate reject message and - // send it. - code, reason := mempool.ErrToRejectErr(err) - bmsg.peer.PushRejectMsg(wire.CmdBlock, code, reason, - blockHash, false) - return - } - - // Meta-data about the new block this peer is reporting. We use this - // below to update this peer's lastest block height and the heights of - // other peers based on their last announced block hash. This allows us - // to dynamically update the block heights of peers, avoiding stale - // heights when looking for a new sync peer. Upon acceptance of a block - // or recognition of an orphan, we also use this information to update - // the block heights over other peers who's invs may have been ignored - // if we are actively syncing while the chain is not yet current or - // who may have lost the lock announcment race. - var heightUpdate int32 - var blkHashUpdate *chainhash.Hash - - // Request the parents for the orphan block from the peer that sent it. - if isOrphan { - // We've just received an orphan block from a peer. In order - // to update the height of the peer, we try to extract the - // block height from the scriptSig of the coinbase transaction. - // Extraction is only attempted if the block's version is - // high enough (ver 2+). - header := &bmsg.block.MsgBlock().Header - if blockchain.ShouldHaveSerializedBlockHeight(header) { - coinbaseTx := bmsg.block.Transactions()[0] - cbHeight, err := blockchain.ExtractCoinbaseHeight(coinbaseTx) - if err != nil { - log.Warnf("Unable to extract height from "+ - "coinbase tx: %v", err) - } else { - log.Debugf("Extracted height of %v from "+ - "orphan block", cbHeight) - heightUpdate = cbHeight - blkHashUpdate = blockHash - } - } - - orphanRoot := b.chain.GetOrphanRoot(blockHash) - locator, err := b.chain.LatestBlockLocator() - if err != nil { - log.Warnf("Failed to get block locator for the "+ - "latest block: %v", err) - } else { - bmsg.peer.PushGetBlocksMsg(locator, orphanRoot) - } - } else { - // When the block is not an orphan, log information about it and - // update the chain state. - b.progressLogger.LogBlockHeight(bmsg.block) - - // Update this peer's latest block height, for future - // potential sync node candidacy. - best := b.chain.BestSnapshot() - heightUpdate = best.Height - blkHashUpdate = &best.Hash - - // Clear the rejected transactions. - b.rejectedTxns = make(map[chainhash.Hash]struct{}) - - // Allow any clients performing long polling via the - // getblocktemplate RPC to be notified when the new block causes - // their old block template to become stale. - rpcServer := b.server.rpcServer - if rpcServer != nil { - rpcServer.gbtWorkState.NotifyBlockConnected(blockHash) - } - } - - // Update the block height for this peer. But only send a message to - // the server for updating peer heights if this is an orphan or our - // chain is "current". This avoids sending a spammy amount of messages - // if we're syncing the chain from scratch. - if blkHashUpdate != nil && heightUpdate != 0 { - bmsg.peer.UpdateLastBlockHeight(heightUpdate) - if isOrphan || b.current() { - go b.server.UpdatePeerHeights(blkHashUpdate, heightUpdate, bmsg.peer) - } - } - - // Nothing more to do if we aren't in headers-first mode. - if !b.headersFirstMode { - return - } - - // This is headers-first mode, so if the block is not a checkpoint - // request more blocks using the header list when the request queue is - // getting short. - if !isCheckpointBlock { - if b.startHeader != nil && - len(bmsg.peer.requestedBlocks) < minInFlightBlocks { - b.fetchHeaderBlocks() - } - return - } - - // This is headers-first mode and the block is a checkpoint. When - // there is a next checkpoint, get the next round of headers by asking - // for headers starting from the block after this one up to the next - // checkpoint. - prevHeight := b.nextCheckpoint.Height - prevHash := b.nextCheckpoint.Hash - b.nextCheckpoint = b.findNextHeaderCheckpoint(prevHeight) - if b.nextCheckpoint != nil { - locator := blockchain.BlockLocator([]*chainhash.Hash{prevHash}) - err := bmsg.peer.PushGetHeadersMsg(locator, b.nextCheckpoint.Hash) - if err != nil { - log.Warnf("Failed to send getheaders message to "+ - "peer %s: %v", bmsg.peer.Addr(), err) - return - } - log.Infof("Downloading headers for blocks %d to %d from "+ - "peer %s", prevHeight+1, b.nextCheckpoint.Height, - b.syncPeer.Addr()) - return - } - - // This is headers-first mode, the block is a checkpoint, and there are - // no more checkpoints, so switch to normal mode by requesting blocks - // from the block after this one up to the end of the chain (zero hash). - b.headersFirstMode = false - b.headerList.Init() - log.Infof("Reached the final checkpoint -- switching to normal mode") - locator := blockchain.BlockLocator([]*chainhash.Hash{blockHash}) - err = bmsg.peer.PushGetBlocksMsg(locator, &zeroHash) - if err != nil { - log.Warnf("Failed to send getblocks message to peer %s: %v", - bmsg.peer.Addr(), err) - return - } -} - -// fetchHeaderBlocks creates and sends a request to the syncPeer for the next -// list of blocks to be downloaded based on the current list of headers. -func (b *blockManager) fetchHeaderBlocks() { - // Nothing to do if there is no start header. - if b.startHeader == nil { - log.Warnf("fetchHeaderBlocks called with no start header") - return - } - - // Build up a getdata request for the list of blocks the headers - // describe. The size hint will be limited to wire.MaxInvPerMsg by - // the function, so no need to double check it here. - gdmsg := wire.NewMsgGetDataSizeHint(uint(b.headerList.Len())) - numRequested := 0 - for e := b.startHeader; e != nil; e = e.Next() { - node, ok := e.Value.(*headerNode) - if !ok { - log.Warn("Header list node type is not a headerNode") - continue - } - - iv := wire.NewInvVect(wire.InvTypeBlock, node.hash) - haveInv, err := b.haveInventory(iv) - if err != nil { - log.Warnf("Unexpected failure when checking for "+ - "existing inventory during header block "+ - "fetch: %v", err) - } - if !haveInv { - b.requestedBlocks[*node.hash] = struct{}{} - b.syncPeer.requestedBlocks[*node.hash] = struct{}{} - gdmsg.AddInvVect(iv) - numRequested++ - } - b.startHeader = e.Next() - if numRequested >= wire.MaxInvPerMsg { - break - } - } - if len(gdmsg.InvList) > 0 { - b.syncPeer.QueueMessage(gdmsg, nil) - } -} - -// haveInventory returns whether or not the inventory represented by the passed -// inventory vector is known. This includes checking all of the various places -// inventory can be when it is in different states such as blocks that are part -// of the main chain, on a side chain, in the orphan pool, and transactions that -// are in the memory pool (either the main pool or orphan pool). -func (b *blockManager) haveInventory(invVect *wire.InvVect) (bool, error) { - switch invVect.Type { - case wire.InvTypeBlock: - // Ask chain if the block is known to it in any form (main - // chain, side chain, or orphan). - return b.chain.HaveBlock(&invVect.Hash) - - case wire.InvTypeTx: - // Ask the transaction memory pool if the transaction is known - // to it in any form (main pool or orphan). - if b.server.txMemPool.HaveTransaction(&invVect.Hash) { - return true, nil - } - - // Check if the transaction exists from the point of view of the - // end of the main chain. - entry, err := b.chain.FetchUtxoEntry(&invVect.Hash) - if err != nil { - return false, err - } - return entry != nil && !entry.IsFullySpent(), nil - } - - // The requested inventory is is an unsupported type, so just claim - // it is known to avoid requesting it. - return true, nil -} - -// limitMap is a helper function for maps that require a maximum limit by -// evicting a random transaction if adding a new value would cause it to -// overflow the maximum allowed. -func (b *blockManager) limitMap(m map[chainhash.Hash]struct{}, limit int) { - if len(m)+1 > limit { - // Remove a random entry from the map. For most compilers, Go's - // range statement iterates starting at a random item although - // that is not 100% guaranteed by the spec. The iteration order - // is not important here because an adversary would have to be - // able to pull off preimage attacks on the hashing function in - // order to target eviction of specific entries anyways. - for txHash := range m { - delete(m, txHash) - return - } - } -} - -// handleNotifyMsg handles notifications from blockchain. It does things such -// as request orphan block parents and relay accepted blocks to connected peers. -func (b *blockManager) handleNotifyMsg(notification *blockchain.Notification) { - switch notification.Type { - // A block has been accepted into the block chain. Relay it to other - // peers. - case blockchain.NTBlockAccepted: - // Don't relay if we are not current. Other peers that are - // current should already know about it. - if !b.current() { - return - } - - block, ok := notification.Data.(*btcutil.Block) - if !ok { - log.Warnf("Chain accepted notification is not a block.") - break - } - - // Generate the inventory vector and relay it. - //iv := wire.NewInvVect(wire.InvTypeBlock, block.Hash()) - //b.server.RelayInventory(iv, block.MsgBlock().Header) - - // A block has been connected to the main block chain. - case blockchain.NTBlockConnected: - block, ok := notification.Data.(*btcutil.Block) - if !ok { - log.Warnf("Chain connected notification is not a block.") - break - } - - // Remove all of the transactions (except the coinbase) in the - // connected block from the transaction pool. Secondly, remove any - // transactions which are now double spends as a result of these - // new transactions. Finally, remove any transaction that is - // no longer an orphan. Transactions which depend on a confirmed - // transaction are NOT removed recursively because they are still - // valid. - for _, tx := range block.Transactions()[1:] { - b.server.txMemPool.RemoveTransaction(tx, false) - b.server.txMemPool.RemoveDoubleSpends(tx) - b.server.txMemPool.RemoveOrphan(tx) - acceptedTxs := b.server.txMemPool.ProcessOrphans(tx) - b.server.AnnounceNewTransactions(acceptedTxs) - } - - if r := b.server.rpcServer; r != nil { - // Now that this block is in the blockchain we can mark - // all the transactions (except the coinbase) as no - // longer needing rebroadcasting. - for _, tx := range block.Transactions()[1:] { - iv := wire.NewInvVect(wire.InvTypeTx, tx.Hash()) - b.server.RemoveRebroadcastInventory(iv) - } - - // Notify registered websocket clients of incoming block. - r.ntfnMgr.NotifyBlockConnected(block) - } - - // A block has been disconnected from the main block chain. - case blockchain.NTBlockDisconnected: - block, ok := notification.Data.(*btcutil.Block) - if !ok { - log.Warnf("Chain disconnected notification is not a block.") - break - } - - // Reinsert all of the transactions (except the coinbase) into - // the transaction pool. - for _, tx := range block.Transactions()[1:] { - _, _, err := b.server.txMemPool.MaybeAcceptTransaction(tx, - false, false) - if err != nil { - // Remove the transaction and all transactions - // that depend on it if it wasn't accepted into - // the transaction pool. - b.server.txMemPool.RemoveTransaction(tx, true) - } - } - - // Notify registered websocket clients. - if r := b.server.rpcServer; r != nil { - r.ntfnMgr.NotifyBlockDisconnected(block) - } - } -} - -// QueueBlock adds the passed block message and peer to the block handling queue. -func (b *blockManager) QueueBlock(block *btcutil.Block, sp *serverPeer) { - // Don't accept more blocks if we're shutting down. - if atomic.LoadInt32(&b.shutdown) != 0 { - sp.blockProcessed <- struct{}{} - return - } - - b.msgChan <- &blockMsg{block: block, peer: sp} -} - -// SyncPeer returns the current sync peer. -func (b *blockManager) SyncPeer() *serverPeer { - reply := make(chan *serverPeer) - b.msgChan <- getSyncPeerMsg{reply: reply} - return <-reply -} - -// ProcessBlock makes use of ProcessBlock on an internal instance of a block -// chain. It is funneled through the block manager since btcchain is not safe -// for concurrent access. -func (b *blockManager) ProcessBlock(block *btcutil.Block, flags blockchain.BehaviorFlags) (bool, error) { - reply := make(chan processBlockResponse, 1) - b.msgChan <- processBlockMsg{block: block, flags: flags, reply: reply} - response := <-reply - return response.isOrphan, response.err -} - - -// checkpointSorter implements sort.Interface to allow a slice of checkpoints to -// be sorted. -type checkpointSorter []chaincfg.Checkpoint - -// Len returns the number of checkpoints in the slice. It is part of the -// sort.Interface implementation. -func (s checkpointSorter) Len() int { - return len(s) -} - -// Swap swaps the checkpoints at the passed indices. It is part of the -// sort.Interface implementation. -func (s checkpointSorter) Swap(i, j int) { - s[i], s[j] = s[j], s[i] -} - -// Less returns whether the checkpoint with index i should sort before the -// checkpoint with index j. It is part of the sort.Interface implementation. -func (s checkpointSorter) Less(i, j int) bool { - return s[i].Height < s[j].Height -} - -// mergeCheckpoints returns two slices of checkpoints merged into one slice -// such that the checkpoints are sorted by height. In the case the additional -// checkpoints contain a checkpoint with the same height as a checkpoint in the -// default checkpoints, the additional checkpoint will take precedence and -// overwrite the default one. -func mergeCheckpoints(defaultCheckpoints, additional []chaincfg.Checkpoint) []chaincfg.Checkpoint { - // Create a map of the additional checkpoints to remove duplicates while - // leaving the most recently-specified checkpoint. - extra := make(map[int32]chaincfg.Checkpoint) - for _, checkpoint := range additional { - extra[checkpoint.Height] = checkpoint - } - - // Add all default checkpoints that do not have an override in the - // additional checkpoints. - numDefault := len(defaultCheckpoints) - checkpoints := make([]chaincfg.Checkpoint, 0, numDefault+len(extra)) - for _, checkpoint := range defaultCheckpoints { - if _, exists := extra[checkpoint.Height]; !exists { - checkpoints = append(checkpoints, checkpoint) - } - } - - // Append the additional checkpoints and return the sorted results. - for _, checkpoint := range extra { - checkpoints = append(checkpoints, checkpoint) - } - sort.Sort(checkpointSorter(checkpoints)) - return checkpoints -} - -// removeRegressionDB removes the existing regression test database if running -// in regression test mode and it already exists. -func removeRegressionDB(dbPath string) error { - // Don't do anything if not in regression test mode. - if !cfg.RegressionTest { - return nil - } - - // Remove the old regression test database if it already exists. - fi, err := os.Stat(dbPath) - if err == nil { - btcdLog.Infof("Removing regression test database from '%s'", dbPath) - if fi.IsDir() { - err := os.RemoveAll(dbPath) - if err != nil { - return err - } - } else { - err := os.Remove(dbPath) - if err != nil { - return err - } - } - } - - return nil -} - -// dbPath returns the path to the block database given a database type. -func blockDbPath(dbType string) string { - // The database name is based on the database type. - dbName := blockDbNamePrefix + "_" + dbType - if dbType == "sqlite" { - dbName = dbName + ".db" - } - dbPath := filepath.Join(cfg.DataDir, dbName) - return dbPath -} - -// warnMultipeDBs shows a warning if multiple block database types are detected. -// This is not a situation most users want. It is handy for development however -// to support multiple side-by-side databases. -func warnMultipeDBs() { - // This is intentionally not using the known db types which depend - // on the database types compiled into the binary since we want to - // detect legacy db types as well. - dbTypes := []string{"ffldb", "leveldb", "sqlite"} - duplicateDbPaths := make([]string, 0, len(dbTypes)-1) - for _, dbType := range dbTypes { - if dbType == cfg.DbType { - continue - } - - // Store db path as a duplicate db if it exists. - dbPath := blockDbPath(dbType) - if fileExists(dbPath) { - duplicateDbPaths = append(duplicateDbPaths, dbPath) - } - } - - // Warn if there are extra databases. - if len(duplicateDbPaths) > 0 { - selectedDbPath := blockDbPath(cfg.DbType) - btcdLog.Warnf("WARNING: There are multiple block chain databases "+ - "using different database types.\nYou probably don't "+ - "want to waste disk space by having more than one.\n"+ - "Your current database is located at [%v].\nThe "+ - "additional database is located at %v", selectedDbPath, - duplicateDbPaths) - } -} - -// loadBlockDB loads (or creates when needed) the block database taking into -// account the selected database backend and returns a handle to it. It also -// contains additional logic such warning the user if there are multiple -// databases which consume space on the file system and ensuring the regression -// test database is clean when in regression test mode. -func loadBlockDB() (database.DB, error) { - // The memdb backend does not have a file path associated with it, so - // handle it uniquely. We also don't want to worry about the multiple - // database type warnings when running with the memory database. - if cfg.DbType == "memdb" { - btcdLog.Infof("Creating block database in memory.") - db, err := database.Create(cfg.DbType) - if err != nil { - return nil, err - } - return db, nil - } - - warnMultipeDBs() - - // The database name is based on the database type. - dbPath := blockDbPath(cfg.DbType) - - // The regression test is special in that it needs a clean database for - // each run, so remove it now if it already exists. - removeRegressionDB(dbPath) - - btcdLog.Infof("Loading block database from '%s'", dbPath) - db, err := database.Open(cfg.DbType, dbPath, activeNetParams.Net) - if err != nil { - // Return the error if it's not because the database doesn't - // exist. - if dbErr, ok := err.(database.Error); !ok || dbErr.ErrorCode != - database.ErrDbDoesNotExist { - - return nil, err - } - - // Create the db if it does not exist. - err = os.MkdirAll(cfg.DataDir, 0700) - if err != nil { - return nil, err - } - db, err = database.Create(cfg.DbType, dbPath, activeNetParams.Net) - if err != nil { - return nil, err - } - } - - btcdLog.Info("Block database loaded") - return db, nil -} -*/ diff --git a/spvsvc/spvchain/db.go b/spvsvc/spvchain/db.go index 945a051..114ad31 100644 --- a/spvsvc/spvchain/db.go +++ b/spvsvc/spvchain/db.go @@ -1,3 +1,5 @@ +// NOTE: THIS API IS UNSTABLE RIGHT NOW. + package spvchain import ( @@ -381,29 +383,6 @@ func rollBackLastBlock(bs *waddrmgr.BlockStamp) dbUpdateOption { } } -// rollBackToHeight rolls back all blocks until it hits the specified height. -func (s *ChainService) rollBackToHeight(height uint32) (*waddrmgr.BlockStamp, error) { - var bs waddrmgr.BlockStamp - err := s.dbUpdate(rollBackToHeight(height, &bs)) - return &bs, err -} - -func rollBackToHeight(height uint32, bs *waddrmgr.BlockStamp) dbUpdateOption { - return func(bucket walletdb.ReadWriteBucket) error { - err := syncedTo(bs)(bucket) - if err != nil { - return err - } - for uint32(bs.Height) > height { - err = rollBackLastBlock(bs)(bucket) - if err != nil { - return err - } - } - return nil - } -} - // GetBlockByHash retrieves the block header, filter, and filter tip, based on // the provided block hash, from the database. func (s *ChainService) GetBlockByHash(blockHash chainhash.Hash) ( diff --git a/spvsvc/spvchain/notifications.go b/spvsvc/spvchain/notifications.go index d887a4d..7c6af69 100644 --- a/spvsvc/spvchain/notifications.go +++ b/spvsvc/spvchain/notifications.go @@ -2,6 +2,8 @@ // Use of this source code is governed by an ISC // license that can be found in the LICENSE file. +// NOTE: THIS API IS UNSTABLE RIGHT NOW. + package spvchain import ( diff --git a/spvsvc/spvchain/query.go b/spvsvc/spvchain/query.go index 4f8d68b..41ee87f 100644 --- a/spvsvc/spvchain/query.go +++ b/spvsvc/spvchain/query.go @@ -1,3 +1,5 @@ +// NOTE: THIS API IS UNSTABLE RIGHT NOW. + package spvchain import ( @@ -41,7 +43,8 @@ type queryOptions struct { // QueryOption is a functional option argument to any of the network query // methods, such as GetBlockFromNetwork and GetCFilter (when that resorts to a -// network query). +// network query). These are always processed in order, with later options +// overriding earlier ones. type QueryOption func(*queryOptions) // defaultQueryOptions returns a queryOptions set to package-level defaults. diff --git a/spvsvc/spvchain/rescan.go b/spvsvc/spvchain/rescan.go new file mode 100644 index 0000000..db94df9 --- /dev/null +++ b/spvsvc/spvchain/rescan.go @@ -0,0 +1,415 @@ +// NOTE: THIS API IS UNSTABLE RIGHT NOW. + +package spvchain + +import ( + "bytes" + "fmt" + + "github.com/btcsuite/btcd/btcjson" + "github.com/btcsuite/btcd/chaincfg/chainhash" + "github.com/btcsuite/btcd/txscript" + "github.com/btcsuite/btcd/wire" + "github.com/btcsuite/btcrpcclient" + "github.com/btcsuite/btcutil" + "github.com/btcsuite/btcutil/gcs" + "github.com/btcsuite/btcutil/gcs/builder" + "github.com/btcsuite/btcwallet/waddrmgr" +) + +// Relevant package-level variables live here +var () + +// Functional parameters for Rescan +type rescanOptions struct { + queryOptions []QueryOption + ntfn btcrpcclient.NotificationHandlers + startBlock *waddrmgr.BlockStamp + endBlock *waddrmgr.BlockStamp + watchAddrs []btcutil.Address + watchOutPoints []wire.OutPoint + quit <-chan struct{} +} + +// RescanOption is a functional option argument to any of the rescan and +// notification subscription methods. These are always processed in order, with +// later options overriding earlier ones. +type RescanOption func(ro *rescanOptions) + +func defaultRescanOptions() *rescanOptions { + return &rescanOptions{} +} + +// QueryOptions pass onto the underlying queries. +func QueryOptions(options ...QueryOption) RescanOption { + return func(ro *rescanOptions) { + ro.queryOptions = options + } +} + +// NotificationHandlers specifies notification handlers for the rescan. These +// will always run in the same goroutine as the caller. +func NotificationHandlers(ntfn btcrpcclient.NotificationHandlers) RescanOption { + return func(ro *rescanOptions) { + ro.ntfn = ntfn + } +} + +// StartBlock specifies the start block. The hash is checked first; if there's +// no such hash (zero hash avoids lookup), the height is checked next. If +// the height is 0 or the start block isn't specified, starts from the genesis +// block. This block is assumed to already be known, and no notifications will +// be sent for this block. +func StartBlock(startBlock *waddrmgr.BlockStamp) RescanOption { + return func(ro *rescanOptions) { + ro.startBlock = startBlock + } +} + +// EndBlock specifies the end block. The hash is checked first; if there's no +// such hash (zero hash avoids lookup), the height is checked next. If the +// height is 0 or the end block isn't specified, the quit channel MUST be +// specified as Rescan will sync to the tip of the blockchain and continue to +// stay in sync and pass notifications. This is enforced at runtime. +func EndBlock(startBlock *waddrmgr.BlockStamp) RescanOption { + return func(ro *rescanOptions) { + ro.startBlock = startBlock + } +} + +// WatchAddrs specifies the addresses to watch/filter for. Each call to this +// function adds to the list of addresses being watched rather than replacing +// the list. Each time a transaction spends to the specified address, the +// outpoint is added to the WatchOutPoints list. +func WatchAddrs(watchAddrs ...btcutil.Address) RescanOption { + return func(ro *rescanOptions) { + ro.watchAddrs = append(ro.watchAddrs, watchAddrs...) + } +} + +// WatchOutPoints specifies the outpoints to watch for on-chain spends. Each +// call to this function adds to the list of outpoints being watched rather +// than replacing the list. +func WatchOutPoints(watchOutPoints ...wire.OutPoint) RescanOption { + return func(ro *rescanOptions) { + ro.watchOutPoints = append(ro.watchOutPoints, watchOutPoints...) + } +} + +// QuitChan specifies the quit channel. This can be used by the caller to let +// an indefinite rescan (one with no EndBlock set) know it should gracefully +// shut down. If this isn't specified, an end block MUST be specified as Rescan +// must know when to stop. This is enforced at runtime. +func QuitChan(quit <-chan struct{}) RescanOption { + return func(ro *rescanOptions) { + ro.quit = quit + } +} + +// Rescan is a single-threaded function that uses headers from the database and +// functional options as arguments. +func (s *ChainService) Rescan(options ...RescanOption) error { + ro := defaultRescanOptions() + ro.endBlock = &waddrmgr.BlockStamp{ + Hash: *s.chainParams.GenesisHash, + Height: 0, + } + for _, option := range options { + option(ro) + } + + var watchList [][]byte + // If we have something to watch, create a watch list. + if len(ro.watchAddrs) != 0 || len(ro.watchOutPoints) != 0 { + for _, addr := range ro.watchAddrs { + watchList = append(watchList, addr.ScriptAddress()) + } + for _, op := range ro.watchOutPoints { + watchList = append(watchList, + builder.OutPointToFilterEntry(op)) + } + } else { + return fmt.Errorf("Rescan must specify addresses and/or " + + "outpoints to watch") + } + + // Check that we have either an end block or a quit channel. + if ro.endBlock != nil { + if (ro.endBlock.Hash == chainhash.Hash{}) { + ro.endBlock.Height = 0 + } else { + _, height, err := s.GetBlockByHash( + ro.endBlock.Hash) + if err != nil { + ro.endBlock.Height = int32(height) + } else { + if height == 0 { + ro.endBlock.Hash = chainhash.Hash{} + } else { + header, err := + s.GetBlockByHeight(height) + if err == nil { + ro.endBlock.Hash = + header.BlockHash() + } else { + ro.endBlock = + &waddrmgr.BlockStamp{} + } + } + } + } + } else { + ro.endBlock = &waddrmgr.BlockStamp{} + } + if ro.quit == nil && ro.endBlock.Height == 0 { + return fmt.Errorf("Rescan request must specify a quit channel" + + " or valid end block") + } + + // Track our position in the chain. + var curHeader wire.BlockHeader + curStamp := *ro.startBlock + + // Find our starting block. + if (curStamp.Hash != chainhash.Hash{}) { + header, height, err := s.GetBlockByHash(curStamp.Hash) + if err == nil { + curHeader = header + curStamp.Height = int32(height) + } else { + curStamp.Hash = chainhash.Hash{} + } + } + if (curStamp.Hash == chainhash.Hash{}) { + if curStamp.Height == 0 { + curStamp.Hash = *s.chainParams.GenesisHash + } else { + header, err := s.GetBlockByHeight( + uint32(curStamp.Height)) + if err == nil { + curHeader = header + curStamp.Hash = curHeader.BlockHash() + } else { + curHeader = + s.chainParams.GenesisBlock.Header + curStamp.Hash = + *s.chainParams.GenesisHash + curStamp.Height = 0 + } + } + } + + // Listen for notifications. + blockConnected := make(chan wire.BlockHeader) + blockDisconnected := make(chan wire.BlockHeader) + subscription := blockSubscription{ + onConnectBasic: blockConnected, + onDisconnect: blockDisconnected, + quit: ro.quit, + } + + // Loop through blocks, one at a time. This relies on the underlying + // ChainService API to send blockConnected and blockDisconnected + // notifications in the correct order. + current := false +rescanLoop: + for { + // If we're current, we wait for notifications. + if current { + // Wait for a signal that we have a newly connected + // header and cfheader, or a newly disconnected header; + // alternatively, forward ourselves to the next block + // if possible. + select { + case <-ro.quit: + s.unsubscribeBlockMsgs(subscription) + return nil + case header := <-blockConnected: + // Only deal with the next block from what we + // know about. Otherwise, it's in the future. + if header.PrevBlock != curStamp.Hash { + continue rescanLoop + } + curHeader = header + curStamp.Hash = header.BlockHash() + curStamp.Height++ + case header := <-blockDisconnected: + // Only deal with it if it's the current block + // we know about. Otherwise, it's in the future. + if header.BlockHash() == curStamp.Hash { + // Run through notifications. This is + // all single-threaded. We include + // deprecated calls as they're still + // used, for now. + if ro.ntfn. + OnFilteredBlockDisconnected != + nil { + ro.ntfn. + OnFilteredBlockDisconnected( + curStamp.Height, + &curHeader) + } + if ro.ntfn.OnBlockDisconnected != nil { + ro.ntfn.OnBlockDisconnected( + &curStamp.Hash, + curStamp.Height, + curHeader.Timestamp) + } + header, _, err := s.GetBlockByHash( + header.PrevBlock) + if err != nil { + return err + } + curHeader = header + curStamp.Hash = header.BlockHash() + curStamp.Height-- + } + continue rescanLoop + } + } else { + // Since we're not current, we try to manually advance + // the block. If we fail, we mark outselves as current + // and follow notifications. + header, err := s.GetBlockByHeight(uint32( + curStamp.Height + 1)) + if err != nil { + current = true + // Subscribe to block notifications. + s.subscribeBlockMsg(subscription) + continue rescanLoop + } + curHeader = header + curStamp.Height++ + curStamp.Hash = header.BlockHash() + } + + // At this point, we've found the block header that's next in + // our rescan. First, if we're sending out BlockConnected + // notifications, do that. + if ro.ntfn.OnBlockConnected != nil { + ro.ntfn.OnBlockConnected(&curStamp.Hash, + curStamp.Height, curHeader.Timestamp) + } + // Now we need to see if it matches the rescan's filters, so we + // get the basic filter from the DB or network. + var block *btcutil.Block + var relevantTxs []*btcutil.Tx + filter := s.GetCFilter(curStamp.Hash, false) + // If we have no transactions, we send a notification + if filter != nil && filter.N() != 0 { + // We see if any relevant transactions match. + key := builder.DeriveKey(&curStamp.Hash) + matched, err := filter.MatchAny(key, watchList) + if err != nil { + return err + } + if matched { + // We've matched. Now we actually get the block + // and cycle through the transactions to see + // which ones are relevant. + block = s.GetBlockFromNetwork( + curStamp.Hash, ro.queryOptions...) + if block == nil { + return fmt.Errorf("Couldn't get block "+ + "%d (%s)", curStamp.Height, + curStamp.Hash) + } + relevantTxs, err = notifyBlock(block, filter, + &ro.watchOutPoints, ro.watchAddrs, + ro.ntfn) + if err != nil { + return err + } + } + } + if ro.ntfn.OnFilteredBlockConnected != nil { + ro.ntfn.OnFilteredBlockConnected( + block.Height(), + &(block.MsgBlock().Header), + relevantTxs) + } + } +} + +// notifyBlock notifies listeners based on the block filter. It writes back to +// the outPoints argument the updated list of outpoints to monitor based on +// matched addresses. +func notifyBlock(block *btcutil.Block, filter *gcs.Filter, + outPoints *[]wire.OutPoint, addrs []btcutil.Address, + ntfn btcrpcclient.NotificationHandlers) ([]*btcutil.Tx, error) { + var relevantTxs []*btcutil.Tx + blockHeader := block.MsgBlock().Header + details := btcjson.BlockDetails{ + Height: block.Height(), + Hash: block.Hash().String(), + Time: blockHeader.Timestamp.Unix(), + } + for txIdx, tx := range block.Transactions() { + relevant := false + txDetails := details + txDetails.Index = txIdx + for _, in := range tx.MsgTx().TxIn { + if relevant { + break + } + for _, op := range *outPoints { + if in.PreviousOutPoint == + op { + relevant = true + if ntfn.OnRedeemingTx != nil { + ntfn.OnRedeemingTx( + tx, + &txDetails, + ) + } + break + } + } + } + for outIdx, out := range tx.MsgTx().TxOut { + if relevant { + break + } + pushedData, err := + txscript.PushedData( + out.PkScript) + if err != nil { + continue + } + for _, addr := range addrs { + if relevant { + break + } + for _, data := range pushedData { + if bytes.Equal(data, + addr.ScriptAddress()) { + relevant = true + hash := + tx.Hash() + outPoint := wire.OutPoint{ + Hash: *hash, + Index: uint32(outIdx), + } + *outPoints = + append( + *outPoints, + outPoint, + ) + if ntfn.OnRecvTx != nil { + ntfn.OnRecvTx( + tx, + &txDetails, + ) + } + break + } + } + + } + } + if relevant { + relevantTxs = append(relevantTxs, tx) + } + } + return relevantTxs, nil +} diff --git a/spvsvc/spvchain/spvchain.go b/spvsvc/spvchain/spvchain.go index b6f73ce..2f575f7 100644 --- a/spvsvc/spvchain/spvchain.go +++ b/spvsvc/spvchain/spvchain.go @@ -1,3 +1,6 @@ +// NOTE: THIS API IS UNSTABLE RIGHT NOW. +// TODO: Add functional options to ChainService instantiation. + package spvchain import ( @@ -17,6 +20,7 @@ import ( "github.com/btcsuite/btcd/peer" "github.com/btcsuite/btcd/wire" "github.com/btcsuite/btcutil" + "github.com/btcsuite/btcwallet/waddrmgr" "github.com/btcsuite/btcwallet/wallet" "github.com/btcsuite/btcwallet/walletdb" ) @@ -517,6 +521,15 @@ func (sp *serverPeer) OnWrite(_ *peer.Peer, bytesWritten int, msg wire.Message, sp.server.AddBytesSent(uint64(bytesWritten)) } +// blockSubscription allows a client to subscribe to and unsubscribe from block +// connect and disconnect notifications. +type blockSubscription struct { + onConnectBasic chan<- wire.BlockHeader + onConnectExt chan<- wire.BlockHeader + onDisconnect chan<- wire.BlockHeader + quit <-chan struct{} +} + // ChainService is instantiated with functional options type ChainService struct { // The following variables must only be used atomically. @@ -540,6 +553,8 @@ type ChainService struct { quit chan struct{} timeSource blockchain.MedianTimeSource services wire.ServiceFlag + blockSubscribers map[blockSubscription]struct{} + mtxSubscribers sync.RWMutex userAgentName string userAgentVersion string @@ -601,6 +616,43 @@ func (s *ChainService) pushTxMsg(sp *serverPeer, hash *chainhash.Hash, doneChan return nil } +// rollBackToHeight rolls back all blocks until it hits the specified height. +// It sends notifications along the way. +func (s *ChainService) rollBackToHeight(height uint32) (*waddrmgr.BlockStamp, + error) { + bs, err := s.SyncedTo() + if err != nil { + return nil, err + } + for uint32(bs.Height) > height { + header, _, err := s.GetBlockByHash(bs.Hash) + if err != nil { + return nil, err + } + bs, err = s.rollBackLastBlock() + if err != nil { + return nil, err + } + // Now we send the block disconnected notifications. + // TODO: Rethink this so we don't send notifications + // outside the package directly from here, and so we + // don't end up halting in the middle of processing + // blocks if a client mishandles a channel while still + // guaranteeing in-order delivery. + s.mtxSubscribers.RLock() + for sub := range s.blockSubscribers { + if sub.onDisconnect != nil { + select { + case sub.onDisconnect <- header: + case <-sub.quit: + } + } + } + s.mtxSubscribers.RUnlock() + } + return bs, nil +} + // peerHandler is used to handle peer operations such as adding and removing // peers to and from the server, banning peers, and broadcasting messages to // peers. It must be run in a goroutine. @@ -1194,6 +1246,11 @@ out: s.wg.Done() } +// ChainParams returns a copy of the ChainService's chaincfg.Params. +func (s *ChainService) ChainParams() chaincfg.Params { + return s.chainParams +} + // Start begins connecting to peers and syncing the blockchain. func (s *ChainService) Start() { // Already started? @@ -1228,3 +1285,19 @@ func (s *ChainService) Stop() error { func (s *ChainService) IsCurrent() bool { return s.blockManager.IsCurrent() } + +// subscribeBlockMsg handles adding block subscriptions to the ChainService. +// TODO: Rethink this. +func (s *ChainService) subscribeBlockMsg(subscription blockSubscription) { + s.mtxSubscribers.Lock() + defer s.mtxSubscribers.Unlock() + s.blockSubscribers[subscription] = struct{}{} +} + +// unsubscribeBlockMsgs handles removing block subscriptions from the +// ChainService. +func (s *ChainService) unsubscribeBlockMsgs(subscription blockSubscription) { + s.mtxSubscribers.Lock() + defer s.mtxSubscribers.Unlock() + delete(s.blockSubscribers, subscription) +} diff --git a/spvsvc/spvchain/sync_test.go b/spvsvc/spvchain/sync_test.go index a3a549c..9067288 100644 --- a/spvsvc/spvchain/sync_test.go +++ b/spvsvc/spvchain/sync_test.go @@ -1,3 +1,5 @@ +// TODO: Break up tests into bite-sized pieces. + package spvchain_test import ( @@ -11,12 +13,15 @@ import ( "time" "github.com/aakselrod/btctestlog" + "github.com/btcsuite/btcd/btcec" "github.com/btcsuite/btcd/chaincfg" "github.com/btcsuite/btcd/chaincfg/chainhash" "github.com/btcsuite/btcd/rpctest" + "github.com/btcsuite/btcd/txscript" "github.com/btcsuite/btcd/wire" "github.com/btcsuite/btclog" "github.com/btcsuite/btcrpcclient" + "github.com/btcsuite/btcutil" "github.com/btcsuite/btcutil/gcs/builder" "github.com/btcsuite/btcwallet/spvsvc/spvchain" "github.com/btcsuite/btcwallet/waddrmgr" @@ -31,10 +36,43 @@ var ( // Don't set this too high for your platform, or the tests will miss // messages. // TODO: Make this a benchmark instead. + // TODO: Implement load limiting for both outgoing and incoming + // messages. numQueryThreads = 50 queryOptions = []spvchain.QueryOption{ //spvchain.NumRetries(5), } + // The sequence of connecting blocks. + conn = func() []int32 { + blocks := []int32{} + for i := 801; i <= 928; i++ { + blocks = append(blocks, int32(i)) + } + for i := 926; i <= 930; i++ { + blocks = append(blocks, int32(i)) + } + for i := 926; i <= 935; i++ { + blocks = append(blocks, int32(i)) + } + return blocks + } + // The sequence of disconnecting blocks. + dconn = func() []int32 { + blocks := []int32{} + for i := 928; i >= 926; i-- { + blocks = append(blocks, int32(i)) + } + for i := 930; i >= 926; i-- { + blocks = append(blocks, int32(i)) + } + return blocks + } + // Blocks with relevant transactions + relevant = []int32{801, 929, 930} + // Blocks with receive transactions + receive = []int32{801} + // Blocks with redeeming transactions + redeem = []int32{929, 930} ) func TestSetup(t *testing.T) { @@ -184,9 +222,49 @@ func TestSetup(t *testing.T) { t.Fatalf("Testing blocks and cfilters failed: %s", err) } - // Generate 125 blocks on h1 to make sure it reorgs the other nodes. + // Generate an address and send it some coins on the h1 chain. We use + // this to test rescans and notifications. + privKey, err := btcec.NewPrivateKey(btcec.S256()) + if err != nil { + t.Fatalf("Couldn't generate private key: %s", err) + } + pubKeyHash := btcutil.Hash160(privKey.PubKey().SerializeCompressed()) + addr, err := btcutil.NewAddressWitnessPubKeyHash(pubKeyHash, &modParams) + if err != nil { + t.Fatalf("Couldn't create address from key: %s", err) + } + script, err := txscript.PayToAddrScript(addr) + if err != nil { + t.Fatalf("Couldn't create script from address: %s", err) + } + out := wire.TxOut{ + PkScript: script, + Value: 1000000000, + } + tx1, err := h1.CreateTransaction([]*wire.TxOut{&out}, 1000) + if err != nil { + t.Fatalf("Couldn't create transaction from script: %s", err) + } + utx1 := btcutil.NewTx(tx1) + utx1.SetIndex(1) + tx2, err := h1.CreateTransaction([]*wire.TxOut{&out}, 1000) + if err != nil { + t.Fatalf("Couldn't create transaction from script: %s", err) + } + utx2 := btcutil.NewTx(tx2) + utx2.SetIndex(2) + if tx1.TxHash() == tx2.TxHash() { + t.Fatalf("Created two identical transactions") + } + _, err = h1.GenerateAndSubmitBlock([]*btcutil.Tx{utx1, utx2}, + -1, time.Time{}) + if err != nil { + t.Fatalf("Couldn't generate/submit block: %s") + } + + // Generate 124 blocks on h1 to make sure it reorgs the other nodes. // Ensure the ChainService instance stays caught up. - h1.Node.Generate(125) + h1.Node.Generate(124) err = waitForSync(t, svc, h1) if err != nil { t.Fatalf("Couldn't sync ChainService: %s", err) @@ -198,6 +276,41 @@ func TestSetup(t *testing.T) { t.Fatalf("Couldn't sync h2 to h1: %s", err) } + // Spend the outputs we sent ourselves. + _ = func(tx wire.MsgTx) func(target btcutil.Amount) ( + total btcutil.Amount, inputs []*wire.TxIn, + inputValues []btcutil.Amount, scripts [][]byte, err error) { + ourIndex := 1 << 30 // Should work on 32-bit systems + for i, txo := range tx.TxOut { + if bytes.Equal(txo.PkScript, script) { + ourIndex = i + } + } + return func(target btcutil.Amount) (total btcutil.Amount, + inputs []*wire.TxIn, inputValues []btcutil.Amount, + scripts [][]byte, err error) { + if ourIndex == 1<<30 { + err = fmt.Errorf("Couldn't find our address " + + "in the passed transaction's outputs.") + return + } + total = target + inputs = []*wire.TxIn{ + &wire.TxIn{ + PreviousOutPoint: wire.OutPoint{ + Hash: tx.TxHash(), + Index: uint32(ourIndex), + }, + }, + } + inputValues = []btcutil.Amount{ + btcutil.Amount(tx.TxOut[ourIndex].Value)} + scripts = [][]byte{tx.TxOut[ourIndex].PkScript} + err = nil + return + } + } + // Generate 3 blocks on h1, one at a time, to make sure the // ChainService instance stays caught up. for i := 0; i < 3; i++ { diff --git a/spvsvc/spvsvc.go b/spvsvc/spvsvc.go index a3de2d3..b321d10 100644 --- a/spvsvc/spvsvc.go +++ b/spvsvc/spvsvc.go @@ -1,22 +1,11 @@ package spvsvc -import ( - "fmt" - "net" - "time" - - "github.com/btcsuite/btcd/addrmgr" - "github.com/btcsuite/btcd/connmgr" - "github.com/btcsuite/btcd/wire" - "github.com/btcsuite/btcwallet/spvsvc/spvchain" - "github.com/btcsuite/btcwallet/wallet" -) +import "github.com/btcsuite/btcwallet/spvsvc/spvchain" // SynchronizationService provides an SPV, p2p-based backend for a wallet to // synchronize it with the network and send transactions it signs. type SynchronizationService struct { - wallet *wallet.Wallet - chainService spvchain.ChainService + chain spvchain.ChainService } // SynchronizationServiceOpt is the return type of functional options for @@ -27,8 +16,8 @@ type SynchronizationServiceOpt func(*SynchronizationService) error // functional options. func NewSynchronizationService(opts ...SynchronizationServiceOpt) (*SynchronizationService, error) { s := SynchronizationService{ - userAgentName: defaultUserAgentName, - userAgentVersion: defaultUserAgentVersion, + //userAgentName: defaultUserAgentName, + //userAgentVersion: defaultUserAgentVersion, } for _, opt := range opts { err := opt(&s) @@ -43,235 +32,8 @@ func NewSynchronizationService(opts ...SynchronizationServiceOpt) (*Synchronizat // appears to other nodes. func UserAgent(agentName, agentVersion string) SynchronizationServiceOpt { return func(s *SynchronizationService) error { - s.userAgentName = agentName - s.userAgentVersion = agentVersion + //s.userAgentName = agentName + //s.userAgentVersion = agentVersion return nil } } - -// AddrManager is a functional option to create an address manager for the -// synchronization service. It takes a string as an argument to specify the -// directory in which to store addresses. -func AddrManager(dir string) SynchronizationServiceOpt { - return func(s *SynchronizationService) error { - m := addrmgr.New(dir, spvLookup) - s.addrManager = m - return nil - } -} - -// ConnManagerOpt is the return type of functional options to create a -// connection manager for the synchronization service. -type ConnManagerOpt func(*connmgr.Config) error - -// ConnManager is a functional option to create a connection manager for the -// synchronization service. -func ConnManager(opts ...ConnManagerOpt) SynchronizationServiceOpt { - return func(s *SynchronizationService) error { - c := connmgr.Config{ - TargetOutbound: defaultTargetOutbound, - RetryDuration: connectionRetryInterval, - GetNewAddress: s.getNewAddress, - } - for _, opt := range opts { - err := opt(&c) - if err != nil { - return err - } - } - connManager, err := connmgr.New(&c) - if err != nil { - return err - } - s.connManager = connManager - return nil - } -} - -// TargetOutbound is a functional option to specify how many outbound -// connections should be made by the ConnManager to peers. Defaults to 8. -func TargetOutbound(target uint32) ConnManagerOpt { - return func(c *connmgr.Config) error { - c.TargetOutbound = target - return nil - } -} - -// RetryDuration is a functional option to specify how long to wait before -// retrying a connection request. Defaults to 5s. -func RetryDuration(duration time.Duration) ConnManagerOpt { - return func(c *connmgr.Config) error { - c.RetryDuration = duration - return nil - } -} - -func (s *SynchronizationService) getNewAddress() (net.Addr, error) { - if s.addrManager == nil { - return nil, log.Error("Couldn't get address for new " + - "connection: address manager is nil.") - } - s.addrManager.Start() - for tries := 0; tries < 100; tries++ { - addr := s.addrManager.GetAddress() - if addr == nil { - break - } - // If we already have peers in this group, skip this address - key := addrmgr.GroupKey(addr.NetAddress()) - if s.outboundGroupCount(key) != 0 { - continue - } - if tries < 30 && time.Since(addr.LastAttempt()) < 10*time.Minute { - continue - } - if tries < 50 && fmt.Sprintf("%d", addr.NetAddress().Port) != - s.wallet.ChainParams().DefaultPort { - continue - } - addrString := addrmgr.NetAddressKey(addr.NetAddress()) - return addrStringToNetAddr(addrString) - } - return nil, log.Error("Couldn't get address for new connection: no " + - "valid addresses known.") -} - -func (s *SynchronizationService) outboundGroupCount(key string) int { - replyChan := make(chan int) - s.query <- getOutboundGroup{key: key, reply: replyChan} - return <-replyChan -} - -// SynchronizeWallet associates a wallet with the consensus RPC client, -// synchronizes the wallet with the latest changes to the blockchain, and -// continuously updates the wallet through RPC notifications. -// -// This function does not return without error until the wallet is synchronized -// to the current chain state. -func (s *SynchronizationService) SynchronizeWallet(w *wallet.Wallet) error { - s.wallet = w - - s.wg.Add(3) - go s.notificationQueueHandler() - go s.processQueuedNotifications() - go s.queryHandler() - - return s.syncWithNetwork(w) -} - -func (s *SynchronizationService) queryHandler() { - -} - -func (s *SynchronizationService) processQueuedNotifications() { - for n := range s.dequeueNotification { - var err error - notificationSwitch: - switch n := n.(type) { - case *wire.MsgBlock: - if n.BlockHash().String() != "" { - break notificationSwitch - } - case *wire.MsgHeaders: - case *wire.MsgInv: - case *wire.MsgReject: - } - - if err != nil { - log.Errorf("Cannot handle peer notification: %v", err) - } - } - s.wg.Done() -} - -// syncWithNetwork brings the wallet up to date with the current chain server -// connection. It creates a rescan request and blocks until the rescan has -// finished. -func (s *SynchronizationService) syncWithNetwork(w *wallet.Wallet) error { - /*chainClient := s.rpcClient - - // Request notifications for connected and disconnected blocks. - // - // TODO(jrick): Either request this notification only once, or when - // btcrpcclient is modified to allow some notification request to not - // automatically resent on reconnect, include the notifyblocks request - // as well. I am leaning towards allowing off all btcrpcclient - // notification re-registrations, in which case the code here should be - // left as is. - err := chainClient.NotifyBlocks() - if err != nil { - return err - } - - // Request notifications for transactions sending to all wallet - // addresses. - addrs, unspent, err := w.ActiveData() - if err != nil { - return err - } - - // TODO(jrick): How should this handle a synced height earlier than - // the chain server best block? - - // When no addresses have been generated for the wallet, the rescan can - // be skipped. - // - // TODO: This is only correct because activeData above returns all - // addresses ever created, including those that don't need to be watched - // anymore. This code should be updated when this assumption is no - // longer true, but worst case would result in an unnecessary rescan. - if len(addrs) == 0 && len(unspent) == 0 { - // TODO: It would be ideal if on initial sync wallet saved the - // last several recent blocks rather than just one. This would - // avoid a full rescan for a one block reorg of the current - // chain tip. - hash, height, err := chainClient.GetBestBlock() - if err != nil { - return err - } - return w.Manager.SetSyncedTo(&waddrmgr.BlockStamp{ - Hash: *hash, - Height: height, - }) - } - - // Compare previously-seen blocks against the chain server. If any of - // these blocks no longer exist, rollback all of the missing blocks - // before catching up with the rescan. - iter := w.Manager.NewIterateRecentBlocks() - rollback := iter == nil - syncBlock := waddrmgr.BlockStamp{ - Hash: *w.ChainParams().GenesisHash, - Height: 0, - } - for cont := iter != nil; cont; cont = iter.Prev() { - bs := iter.BlockStamp() - log.Debugf("Checking for previous saved block with height %v hash %v", - bs.Height, bs.Hash) - _, err = chainClient.GetBlock(&bs.Hash) - if err != nil { - rollback = true - continue - } - - log.Debug("Found matching block.") - syncBlock = bs - break - } - if rollback { - err = w.Manager.SetSyncedTo(&syncBlock) - if err != nil { - return err - } - // Rollback unconfirms transactions at and beyond the passed - // height, so add one to the new synced-to height to prevent - // unconfirming txs from the synced-to block. - err = w.TxStore.Rollback(syncBlock.Height + 1) - if err != nil { - return err - } - } - - return s.initialRescan(addrs, unspent, w.Manager.SyncedTo()) */ - return nil -}