netsync/manager: add syncPeer stall detector
Adds stall detection when no blocks have been received from the sync peer for at least 3 minutes. The change backports parts of https://github.com/gcash/bchd/pull/105, though it has different behavior, such as: - Rotating the sync peer at tip. - Only disconnecting the sync peer if their height exceeds our own. Since we will instead rotate the sync peer at tip, this prevents us from disconnecting good peers while waiting for new blocks. - Not resetting the progress time when blocks are submitted via rpc.
This commit is contained in:
parent
a015d8231e
commit
e7f9935099
1 changed files with 83 additions and 6 deletions
|
@ -39,6 +39,14 @@ const (
|
|||
// maxRequestedTxns is the maximum number of requested transactions
|
||||
// hashes to store in memory.
|
||||
maxRequestedTxns = wire.MaxInvPerMsg
|
||||
|
||||
// maxStallDuration is the time after which we will disconnect our
|
||||
// current sync peer if we haven't made progress.
|
||||
maxStallDuration = 3 * time.Minute
|
||||
|
||||
// stallSampleInterval is the interval at which we will check to see if our
|
||||
// sync has stalled.
|
||||
stallSampleInterval = 30 * time.Second
|
||||
)
|
||||
|
||||
// zeroHash is the zero value hash (all zeros). It is defined as a convenience.
|
||||
|
@ -157,11 +165,12 @@ type SyncManager struct {
|
|||
quit chan struct{}
|
||||
|
||||
// These fields should only be accessed from the blockHandler thread
|
||||
rejectedTxns map[chainhash.Hash]struct{}
|
||||
requestedTxns map[chainhash.Hash]struct{}
|
||||
requestedBlocks map[chainhash.Hash]struct{}
|
||||
syncPeer *peerpkg.Peer
|
||||
peerStates map[*peerpkg.Peer]*peerSyncState
|
||||
rejectedTxns map[chainhash.Hash]struct{}
|
||||
requestedTxns map[chainhash.Hash]struct{}
|
||||
requestedBlocks map[chainhash.Hash]struct{}
|
||||
syncPeer *peerpkg.Peer
|
||||
peerStates map[*peerpkg.Peer]*peerSyncState
|
||||
lastProgressTime time.Time
|
||||
|
||||
// The following fields are used for headers-first mode.
|
||||
headersFirstMode bool
|
||||
|
@ -335,6 +344,11 @@ func (sm *SyncManager) startSync() {
|
|||
bestPeer.PushGetBlocksMsg(locator, &zeroHash)
|
||||
}
|
||||
sm.syncPeer = bestPeer
|
||||
|
||||
// Reset the last progress time now that we have a non-nil
|
||||
// syncPeer to avoid instantly detecting it as stalled in the
|
||||
// event the progress time hasn't been updated recently.
|
||||
sm.lastProgressTime = time.Now()
|
||||
} else {
|
||||
log.Warnf("No sync peer candidates available")
|
||||
}
|
||||
|
@ -402,6 +416,59 @@ func (sm *SyncManager) handleNewPeerMsg(peer *peerpkg.Peer) {
|
|||
}
|
||||
}
|
||||
|
||||
// handleStallSample will switch to a new sync peer if the current one has
|
||||
// stalled. This is detected when by comparing the last progress timestamp with
|
||||
// the current time, and disconnecting the peer if we stalled before reaching
|
||||
// their highest advertised block.
|
||||
func (sm *SyncManager) handleStallSample() {
|
||||
if atomic.LoadInt32(&sm.shutdown) != 0 {
|
||||
return
|
||||
}
|
||||
|
||||
// If we don't have an active sync peer, exit early.
|
||||
if sm.syncPeer == nil {
|
||||
return
|
||||
}
|
||||
|
||||
// If the stall timeout has not elapsed, exit early.
|
||||
if time.Since(sm.lastProgressTime) <= maxStallDuration {
|
||||
return
|
||||
}
|
||||
|
||||
// Check to see that the peer's sync state exists.
|
||||
state, exists := sm.peerStates[sm.syncPeer]
|
||||
if !exists {
|
||||
return
|
||||
}
|
||||
|
||||
sm.clearRequestedState(state)
|
||||
|
||||
disconnectSyncPeer := sm.shouldDCStalledSyncPeer()
|
||||
sm.updateSyncPeer(disconnectSyncPeer)
|
||||
}
|
||||
|
||||
// shouldDCStalledSyncPeer determines whether or not we should disconnect a
|
||||
// stalled sync peer. If the peer has stalled and its reported height is greater
|
||||
// than our own best height, we will disconnect it. Otherwise, we will keep the
|
||||
// peer connected in case we are already at tip.
|
||||
func (sm *SyncManager) shouldDCStalledSyncPeer() bool {
|
||||
lastBlock := sm.syncPeer.LastBlock()
|
||||
startHeight := sm.syncPeer.StartingHeight()
|
||||
|
||||
var peerHeight int32
|
||||
if lastBlock > startHeight {
|
||||
peerHeight = lastBlock
|
||||
} else {
|
||||
peerHeight = startHeight
|
||||
}
|
||||
|
||||
// If we've stalled out yet the sync peer reports having more blocks for
|
||||
// us we will disconnect them. This allows us at tip to not disconnect
|
||||
// peers when we are equal or they temporarily lag behind us.
|
||||
best := sm.chain.BestSnapshot()
|
||||
return peerHeight > best.Height
|
||||
}
|
||||
|
||||
// handleDonePeerMsg deals with peers that have signalled they are done. It
|
||||
// removes the peer as a candidate for syncing and in the case where it was
|
||||
// the current sync peer, attempts to select a new best peer to sync from. It
|
||||
|
@ -451,7 +518,7 @@ func (sm *SyncManager) clearRequestedState(state *peerSyncState) {
|
|||
// If we are in header first mode, any header state related to prefetching is
|
||||
// also reset in preparation for the next sync peer.
|
||||
func (sm *SyncManager) updateSyncPeer(dcSyncPeer bool) {
|
||||
log.Infof("Updating sync peer, no progress since: %v",
|
||||
log.Debugf("Updating sync peer, no progress for: %v",
|
||||
time.Since(sm.lastProgressTime))
|
||||
|
||||
// First, disconnect the current sync peer if requested.
|
||||
|
@ -683,6 +750,10 @@ func (sm *SyncManager) handleBlockMsg(bmsg *blockMsg) {
|
|||
peer.PushGetBlocksMsg(locator, orphanRoot)
|
||||
}
|
||||
} else {
|
||||
if peer == sm.syncPeer {
|
||||
sm.lastProgressTime = time.Now()
|
||||
}
|
||||
|
||||
// When the block is not an orphan, log information about it and
|
||||
// update the chain state.
|
||||
sm.progressLogger.LogBlockHeight(bmsg.block)
|
||||
|
@ -1199,6 +1270,9 @@ func (sm *SyncManager) limitMap(m map[chainhash.Hash]struct{}, limit int) {
|
|||
// important because the sync manager controls which blocks are needed and how
|
||||
// the fetching should proceed.
|
||||
func (sm *SyncManager) blockHandler() {
|
||||
stallTicker := time.NewTicker(stallSampleInterval)
|
||||
defer stallTicker.Stop()
|
||||
|
||||
out:
|
||||
for {
|
||||
select {
|
||||
|
@ -1258,6 +1332,9 @@ out:
|
|||
"handler: %T", msg)
|
||||
}
|
||||
|
||||
case <-stallTicker.C:
|
||||
sm.handleStallSample()
|
||||
|
||||
case <-sm.quit:
|
||||
break out
|
||||
}
|
||||
|
|
Loading…
Add table
Reference in a new issue