diff --git a/netsync/manager.go b/netsync/manager.go index 37c26707..32715b8e 100644 --- a/netsync/manager.go +++ b/netsync/manager.go @@ -39,6 +39,14 @@ const ( // maxRequestedTxns is the maximum number of requested transactions // hashes to store in memory. maxRequestedTxns = wire.MaxInvPerMsg + + // maxStallDuration is the time after which we will disconnect our + // current sync peer if we haven't made progress. + maxStallDuration = 3 * time.Minute + + // stallSampleInterval the interval at which we will check to see if our + // sync has stalled. + stallSampleInterval = 30 * time.Second ) // zeroHash is the zero value hash (all zeros). It is defined as a convenience. @@ -157,11 +165,12 @@ type SyncManager struct { quit chan struct{} // These fields should only be accessed from the blockHandler thread - rejectedTxns map[chainhash.Hash]struct{} - requestedTxns map[chainhash.Hash]struct{} - requestedBlocks map[chainhash.Hash]struct{} - syncPeer *peerpkg.Peer - peerStates map[*peerpkg.Peer]*peerSyncState + rejectedTxns map[chainhash.Hash]struct{} + requestedTxns map[chainhash.Hash]struct{} + requestedBlocks map[chainhash.Hash]struct{} + syncPeer *peerpkg.Peer + peerStates map[*peerpkg.Peer]*peerSyncState + lastProgressTime time.Time // The following fields are used for headers-first mode. headersFirstMode bool @@ -335,6 +344,11 @@ func (sm *SyncManager) startSync() { bestPeer.PushGetBlocksMsg(locator, &zeroHash) } sm.syncPeer = bestPeer + + // Reset the last progress time now that we have a non-nil + // syncPeer to avoid instantly detecting it as stalled in the + // event the progress time hasn't been updated recently. + sm.lastProgressTime = time.Now() } else { log.Warnf("No sync peer candidates available") } @@ -402,6 +416,59 @@ func (sm *SyncManager) handleNewPeerMsg(peer *peerpkg.Peer) { } } +// handleStallSample will switch to a new sync peer if the current one has +// stalled. This is detected when by comparing the last progress timestamp with +// the current time, and disconnecting the peer if we stalled before reaching +// their highest advertised block. +func (sm *SyncManager) handleStallSample() { + if atomic.LoadInt32(&sm.shutdown) != 0 { + return + } + + // If we don't have an active sync peer, exit early. + if sm.syncPeer == nil { + return + } + + // If the stall timeout has not elapsed, exit early. + if time.Since(sm.lastProgressTime) <= maxStallDuration { + return + } + + // Check to see that the peer's sync state exists. + state, exists := sm.peerStates[sm.syncPeer] + if !exists { + return + } + + sm.clearRequestedState(state) + + disconnectSyncPeer := sm.shouldDCStalledSyncPeer() + sm.updateSyncPeer(disconnectSyncPeer) +} + +// shouldDCStalledSyncPeer determines whether or not we should disconnect a +// stalled sync peer. If the peer has stalled and its reported height is greater +// than our own best height, we will disconnect it. Otherwise, we will keep the +// peer connected in case we are already at tip. +func (sm *SyncManager) shouldDCStalledSyncPeer() bool { + lastBlock := sm.syncPeer.LastBlock() + startHeight := sm.syncPeer.StartingHeight() + + var peerHeight int32 + if lastBlock > startHeight { + peerHeight = lastBlock + } else { + peerHeight = startHeight + } + + // If we've stalled out yet the sync peer reports having more blocks for + // us we will disconnect them. This allows us at tip to not disconnect + // peers when we are equal or they temporarily lag behind us. + best := sm.chain.BestSnapshot() + return peerHeight > best.Height +} + // handleDonePeerMsg deals with peers that have signalled they are done. It // removes the peer as a candidate for syncing and in the case where it was // the current sync peer, attempts to select a new best peer to sync from. It @@ -451,7 +518,7 @@ func (sm *SyncManager) clearRequestedState(state *peerSyncState) { // If we are in header first mode, any header state related to prefetching is // also reset in preparation for the next sync peer. func (sm *SyncManager) updateSyncPeer(dcSyncPeer bool) { - log.Infof("Updating sync peer, no progress since: %v", + log.Debugf("Updating sync peer, no progress for: %v", time.Since(sm.lastProgressTime)) // First, disconnect the current sync peer if requested. @@ -683,6 +750,10 @@ func (sm *SyncManager) handleBlockMsg(bmsg *blockMsg) { peer.PushGetBlocksMsg(locator, orphanRoot) } } else { + if peer == sm.syncPeer { + sm.lastProgressTime = time.Now() + } + // When the block is not an orphan, log information about it and // update the chain state. sm.progressLogger.LogBlockHeight(bmsg.block) @@ -1199,6 +1270,9 @@ func (sm *SyncManager) limitMap(m map[chainhash.Hash]struct{}, limit int) { // important because the sync manager controls which blocks are needed and how // the fetching should proceed. func (sm *SyncManager) blockHandler() { + stallTicker := time.NewTicker(stallSampleInterval) + defer stallTicker.Stop() + out: for { select { @@ -1258,6 +1332,9 @@ out: "handler: %T", msg) } + case <-stallTicker.C: + sm.handleStallSample() + case <-sm.quit: break out }