diff --git a/blockmanager.go b/blockmanager.go index 02d6e71c..ff3e1418 100644 --- a/blockmanager.go +++ b/blockmanager.go @@ -5,6 +5,7 @@ package main import ( + "container/list" "github.com/conformal/btcchain" "github.com/conformal/btcdb" _ "github.com/conformal/btcdb/sqlite3" @@ -46,13 +47,131 @@ type blockManager struct { receivedLogTx int64 lastBlockLogTime time.Time processingReqs bool + syncPeer *peer newBlocks chan bool + newCandidates chan *peer + donePeers chan *peer blockQueue chan *blockMsg chainNotify chan *btcchain.Notification wg sync.WaitGroup quit chan bool } +// startSync will choose the best peer among the available candidate peers to +// download/sync the blockchain from. When syncing is already running, it +// simply returns. It also examines the candidates for any which are no longer +// candidates and removes them as needed. +func (b *blockManager) startSync(peers *list.List) { + // Return now if we're already syncing. + if b.syncPeer != nil { + return + } + + // Find the height of the current known best block. + _, height, err := b.server.db.NewestSha() + if err != nil { + log.Errorf("[BMGR] %v", err) + return + } + + var bestPeer *peer + for e := peers.Front(); e != nil; e = e.Next() { + p := e.Value.(*peer) + + // Remove sync candidate peers that are no longer candidates due + // to passing their latest known block. + if p.lastBlock <= int32(height) { + peers.Remove(e) + continue + } + + // TODO(davec): Use a better algorithm to choose the best peer. + // For now, just pick the first available candidate. + bestPeer = p + } + + // Start syncing from the best peer if one was selected. + if bestPeer != nil { + locator, err := b.blockChain.LatestBlockLocator() + if err != nil { + log.Error("[BMGR] Failed to get block locator for the "+ + "latest block: %v", err) + return + } + + log.Infof("[BMGR] Syncing to block height %d from peer %v", + bestPeer.lastBlock, bestPeer.conn.RemoteAddr()) + bestPeer.pushGetBlocksMsg(locator, &zeroHash) + b.syncPeer = bestPeer + } + +} + +// handleNewCandidateMsg deals with new peers that have signalled they may +// be considered as a sync peer (they have already successfully negotiated). It +// also start syncing if needed. It is invoked from the syncHandler goroutine. +func (b *blockManager) handleNewCandidateMsg(peers *list.List, p *peer) { + // Ignore if in the process of shutting down. + if b.shutdown { + return + } + + // The peer is not a candidate for sync if it's not a full node. + if p.services&btcwire.SFNodeNetwork != btcwire.SFNodeNetwork { + return + } + + // Add the peer as a candidate to sync from. + peers.PushBack(p) + + // Start syncing by choosing the best candidate if needed. + b.startSync(peers) +} + +// handleDonePeerMsg deals with peers that have signalled they are done. It +// removes the peer as a candidate for syncing and in the case where it was +// the current sync peer, attempts to select a new best peer to sync from. It +// is invoked from the syncHandler goroutine. +func (b *blockManager) handleDonePeerMsg(peers *list.List, p *peer) { + // Remove the peer from the list of candidate peers. + for e := peers.Front(); e != nil; e = e.Next() { + if e.Value == p { + peers.Remove(e) + break + } + } + + // Attempt to find a new peer to sync from if the quitting peer is the + // sync peer. + if b.syncPeer != nil && b.syncPeer == p { + b.syncPeer = nil + b.startSync(peers) + } +} + +// syncHandler deals with handling downloading (syncing) the block chain from +// other peers as they connect and disconnect. It must be run as a goroutine. +func (b *blockManager) syncHandler() { + log.Tracef("[BMGR] Starting sync handler") + candidatePeers := list.New() +out: + // Live while we're not shutting down. + for !b.shutdown { + select { + case peer := <-b.newCandidates: + b.handleNewCandidateMsg(candidatePeers, peer) + + case peer := <-b.donePeers: + b.handleDonePeerMsg(candidatePeers, peer) + + case <-b.quit: + break out + } + } + b.wg.Done() + log.Trace("[BMGR] Sync handler done") +} + // logBlockHeight logs a new block height as an information message to show // progress to the user. In order to prevent spam, it limits logging to one // message every 10 seconds with duration and totals included. @@ -207,9 +326,10 @@ func (b *blockManager) Start() { } log.Trace("[BMGR] Starting block manager") + go b.syncHandler() go b.blockHandler() go b.chainNotificationHandler() - b.wg.Add(2) + b.wg.Add(3) b.started = true } @@ -239,6 +359,8 @@ func newBlockManager(s *server) *blockManager { blockPeer: make(map[btcwire.ShaHash]*peer), lastBlockLogTime: time.Now(), newBlocks: make(chan bool, 1), + newCandidates: make(chan *peer, cfg.MaxPeers), + donePeers: make(chan *peer, cfg.MaxPeers), blockQueue: make(chan *blockMsg, chanBufferSize), chainNotify: chainNotify, quit: make(chan bool), diff --git a/peer.go b/peer.go index 6e8136de..f7fb334e 100644 --- a/peer.go +++ b/peer.go @@ -178,8 +178,8 @@ func (p *peer) handleVersionMsg(msg *btcwire.MsgVersion) { p.protocolVersion, p.conn.RemoteAddr()) p.lastBlock = msg.LastBlock - // Set the supported services for the peer to what the remote - // peer advertised. + // Set the supported services for the peer to what the remote peer + // advertised. p.services = msg.Services // Inbound connections. @@ -232,22 +232,8 @@ func (p *peer) handleVersionMsg(msg *btcwire.MsgVersion) { } } - // Request latest blocks if the peer has blocks we're interested in. - _, lastBlock, err := p.server.db.NewestSha() - if err != nil { - log.Errorf("[PEER] %v", err) - p.Disconnect() - } - // If the peer has blocks we're interested in. - if p.lastBlock > int32(lastBlock) { - locator, err := p.server.blockManager.blockChain.LatestBlockLocator() - if err != nil { - log.Error("[PEER] Failed to get block locator for the "+ - "latest block: %v", err) - p.Disconnect() - } - p.pushGetBlocksMsg(locator, &zeroHash) - } + // Signal the block manager this peer is a new sync candidate. + p.server.blockManager.newCandidates <- p // TODO: Relay alerts. } @@ -808,9 +794,11 @@ out: } } - // Ensure connection is closed and notify server that the peer is done. + // Ensure connection is closed and notify server and block manager that + // the peer is done. p.Disconnect() p.server.donePeers <- p + p.server.blockManager.donePeers <- p p.quit <- true p.wg.Done()