Merge pull request #1417 from cfromknecht/detect-sync-stall

netsync/manager: detect stalled sync peer
This commit is contained in:
Olaoluwa Osuntokun 2019-04-26 17:42:31 -07:00 committed by GitHub
commit 96897255fd
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23

View file

@ -39,6 +39,14 @@ const (
// maxRequestedTxns is the maximum number of requested transactions
// hashes to store in memory.
maxRequestedTxns = wire.MaxInvPerMsg
// maxStallDuration is the time after which we will disconnect our
// current sync peer if we haven't made progress.
maxStallDuration = 3 * time.Minute
// stallSampleInterval the interval at which we will check to see if our
// sync has stalled.
stallSampleInterval = 30 * time.Second
)
// zeroHash is the zero value hash (all zeros). It is defined as a convenience.
@ -157,11 +165,12 @@ type SyncManager struct {
quit chan struct{}
// These fields should only be accessed from the blockHandler thread
rejectedTxns map[chainhash.Hash]struct{}
requestedTxns map[chainhash.Hash]struct{}
requestedBlocks map[chainhash.Hash]struct{}
syncPeer *peerpkg.Peer
peerStates map[*peerpkg.Peer]*peerSyncState
rejectedTxns map[chainhash.Hash]struct{}
requestedTxns map[chainhash.Hash]struct{}
requestedBlocks map[chainhash.Hash]struct{}
syncPeer *peerpkg.Peer
peerStates map[*peerpkg.Peer]*peerSyncState
lastProgressTime time.Time
// The following fields are used for headers-first mode.
headersFirstMode bool
@ -335,6 +344,11 @@ func (sm *SyncManager) startSync() {
bestPeer.PushGetBlocksMsg(locator, &zeroHash)
}
sm.syncPeer = bestPeer
// Reset the last progress time now that we have a non-nil
// syncPeer to avoid instantly detecting it as stalled in the
// event the progress time hasn't been updated recently.
sm.lastProgressTime = time.Now()
} else {
log.Warnf("No sync peer candidates available")
}
@ -402,6 +416,59 @@ func (sm *SyncManager) handleNewPeerMsg(peer *peerpkg.Peer) {
}
}
// handleStallSample will switch to a new sync peer if the current one has
// stalled. This is detected when by comparing the last progress timestamp with
// the current time, and disconnecting the peer if we stalled before reaching
// their highest advertised block.
func (sm *SyncManager) handleStallSample() {
if atomic.LoadInt32(&sm.shutdown) != 0 {
return
}
// If we don't have an active sync peer, exit early.
if sm.syncPeer == nil {
return
}
// If the stall timeout has not elapsed, exit early.
if time.Since(sm.lastProgressTime) <= maxStallDuration {
return
}
// Check to see that the peer's sync state exists.
state, exists := sm.peerStates[sm.syncPeer]
if !exists {
return
}
sm.clearRequestedState(state)
disconnectSyncPeer := sm.shouldDCStalledSyncPeer()
sm.updateSyncPeer(disconnectSyncPeer)
}
// shouldDCStalledSyncPeer determines whether or not we should disconnect a
// stalled sync peer. If the peer has stalled and its reported height is greater
// than our own best height, we will disconnect it. Otherwise, we will keep the
// peer connected in case we are already at tip.
func (sm *SyncManager) shouldDCStalledSyncPeer() bool {
lastBlock := sm.syncPeer.LastBlock()
startHeight := sm.syncPeer.StartingHeight()
var peerHeight int32
if lastBlock > startHeight {
peerHeight = lastBlock
} else {
peerHeight = startHeight
}
// If we've stalled out yet the sync peer reports having more blocks for
// us we will disconnect them. This allows us at tip to not disconnect
// peers when we are equal or they temporarily lag behind us.
best := sm.chain.BestSnapshot()
return peerHeight > best.Height
}
// handleDonePeerMsg deals with peers that have signalled they are done. It
// removes the peer as a candidate for syncing and in the case where it was
// the current sync peer, attempts to select a new best peer to sync from. It
@ -418,6 +485,19 @@ func (sm *SyncManager) handleDonePeerMsg(peer *peerpkg.Peer) {
log.Infof("Lost peer %s", peer)
sm.clearRequestedState(state)
if peer == sm.syncPeer {
// Update the sync peer. The server has already disconnected the
// peer before signaling to the sync manager.
sm.updateSyncPeer(false)
}
}
// clearRequestedState wipes all expected transactions and blocks from the sync
// manager's requested maps that were requested under a peer's sync state, This
// allows them to be rerequested by a subsequent sync peer.
func (sm *SyncManager) clearRequestedState(state *peerSyncState) {
// Remove requested transactions from the global map so that they will
// be fetched from elsewhere next time we get an inv.
for txHash := range state.requestedTxns {
@ -431,18 +511,29 @@ func (sm *SyncManager) handleDonePeerMsg(peer *peerpkg.Peer) {
for blockHash := range state.requestedBlocks {
delete(sm.requestedBlocks, blockHash)
}
}
// Attempt to find a new peer to sync from if the quitting peer is the
// sync peer. Also, reset the headers-first state if in headers-first
// mode so
if sm.syncPeer == peer {
sm.syncPeer = nil
if sm.headersFirstMode {
best := sm.chain.BestSnapshot()
sm.resetHeaderState(&best.Hash, best.Height)
}
sm.startSync()
// updateSyncPeer choose a new sync peer to replace the current one. If
// dcSyncPeer is true, this method will also disconnect the current sync peer.
// If we are in header first mode, any header state related to prefetching is
// also reset in preparation for the next sync peer.
func (sm *SyncManager) updateSyncPeer(dcSyncPeer bool) {
log.Debugf("Updating sync peer, no progress for: %v",
time.Since(sm.lastProgressTime))
// First, disconnect the current sync peer if requested.
if dcSyncPeer {
sm.syncPeer.Disconnect()
}
// Reset any header state before we choose our next active sync peer.
if sm.headersFirstMode {
best := sm.chain.BestSnapshot()
sm.resetHeaderState(&best.Hash, best.Height)
}
sm.syncPeer = nil
sm.startSync()
}
// handleTxMsg handles transaction messages from all peers.
@ -659,6 +750,10 @@ func (sm *SyncManager) handleBlockMsg(bmsg *blockMsg) {
peer.PushGetBlocksMsg(locator, orphanRoot)
}
} else {
if peer == sm.syncPeer {
sm.lastProgressTime = time.Now()
}
// When the block is not an orphan, log information about it and
// update the chain state.
sm.progressLogger.LogBlockHeight(bmsg.block)
@ -1175,6 +1270,9 @@ func (sm *SyncManager) limitMap(m map[chainhash.Hash]struct{}, limit int) {
// important because the sync manager controls which blocks are needed and how
// the fetching should proceed.
func (sm *SyncManager) blockHandler() {
stallTicker := time.NewTicker(stallSampleInterval)
defer stallTicker.Stop()
out:
for {
select {
@ -1234,6 +1332,9 @@ out:
"handler: %T", msg)
}
case <-stallTicker.C:
sm.handleStallSample()
case <-sm.quit:
break out
}