Incrementally back off when reconnecting to peers
This ensures we back off when reconnecting to peers whose replies we don't understand, just as we do for peers we fail to connect to. Closes #103
This commit is contained in:
parent
8e74343747
commit
ece3ed8443
2 changed files with 36 additions and 46 deletions
69
peer.go
69
peer.go
|
@ -1527,6 +1527,7 @@ out:
|
|||
// ok we got a message, reset the timer.
|
||||
// timer just calls p.Disconnect() after logging.
|
||||
idleTimer.Reset(idleTimeoutMinutes * time.Minute)
|
||||
p.retryCount = 0
|
||||
}
|
||||
|
||||
idleTimer.Stop()
|
||||
|
@ -1910,10 +1911,11 @@ func newInboundPeer(s *server, conn net.Conn) *peer {
|
|||
// newOutboundPeer returns a new outbound bitcoin peer for the provided server and
|
||||
// address and connects to it asynchronously. If the connection is successful
|
||||
// then the peer will also be started.
|
||||
func newOutboundPeer(s *server, addr string, persistent bool) *peer {
|
||||
func newOutboundPeer(s *server, addr string, persistent bool, retryCount int64) *peer {
|
||||
p := newPeerBase(s, false)
|
||||
p.addr = addr
|
||||
p.persistent = persistent
|
||||
p.retryCount = retryCount
|
||||
|
||||
// Setup p.na with a temporary address that we are connecting to with
|
||||
// faked up service flags. We will replace this with the real one after
|
||||
|
@ -1945,46 +1947,35 @@ func newOutboundPeer(s *server, addr string, persistent bool) *peer {
|
|||
}
|
||||
|
||||
go func() {
|
||||
// Attempt to connect to the peer. If the connection fails and
|
||||
// this is a persistent connection, retry after the retry
|
||||
// interval.
|
||||
for atomic.LoadInt32(&p.disconnect) == 0 {
|
||||
srvrLog.Debugf("Attempting to connect to %s", addr)
|
||||
conn, err := btcdDial("tcp", addr)
|
||||
if err != nil {
|
||||
p.retryCount++
|
||||
srvrLog.Debugf("Failed to connect to %s: %v",
|
||||
addr, err)
|
||||
if !persistent {
|
||||
p.server.donePeers <- p
|
||||
return
|
||||
}
|
||||
scaledInterval := connectionRetryInterval.Nanoseconds() * p.retryCount / 2
|
||||
scaledDuration := time.Duration(scaledInterval)
|
||||
srvrLog.Debugf("Retrying connection to %s in "+
|
||||
"%s", addr, scaledDuration)
|
||||
time.Sleep(scaledDuration)
|
||||
continue
|
||||
}
|
||||
|
||||
// While we were sleeping trying to connect, the server
|
||||
// may have scheduled a shutdown. In that case ditch
|
||||
// the peer immediately.
|
||||
if atomic.LoadInt32(&p.disconnect) == 0 {
|
||||
p.timeConnected = time.Now()
|
||||
p.server.addrManager.Attempt(p.na)
|
||||
|
||||
// Connection was successful so log it and start peer.
|
||||
srvrLog.Debugf("Connected to %s",
|
||||
conn.RemoteAddr())
|
||||
p.conn = conn
|
||||
atomic.AddInt32(&p.connected, 1)
|
||||
p.retryCount = 0
|
||||
p.Start()
|
||||
}
|
||||
|
||||
if atomic.LoadInt32(&p.disconnect) != 0 {
|
||||
return
|
||||
}
|
||||
if p.retryCount > 0 {
|
||||
scaledInterval := connectionRetryInterval.Nanoseconds() * p.retryCount / 2
|
||||
scaledDuration := time.Duration(scaledInterval)
|
||||
srvrLog.Debugf("Retrying connection to %s in %s", addr, scaledDuration)
|
||||
time.Sleep(scaledDuration)
|
||||
}
|
||||
srvrLog.Debugf("Attempting to connect to %s", addr)
|
||||
conn, err := btcdDial("tcp", addr)
|
||||
if err != nil {
|
||||
srvrLog.Debugf("Failed to connect to %s: %v", addr, err)
|
||||
p.server.donePeers <- p
|
||||
return
|
||||
}
|
||||
|
||||
// We may have slept and the server may have scheduled a shutdown. In that
|
||||
// case ditch the peer immediately.
|
||||
if atomic.LoadInt32(&p.disconnect) == 0 {
|
||||
p.timeConnected = time.Now()
|
||||
p.server.addrManager.Attempt(p.na)
|
||||
|
||||
// Connection was successful so log it and start peer.
|
||||
srvrLog.Debugf("Connected to %s", conn.RemoteAddr())
|
||||
p.conn = conn
|
||||
atomic.AddInt32(&p.connected, 1)
|
||||
p.Start()
|
||||
}
|
||||
}()
|
||||
return p
|
||||
}
|
||||
|
|
13
server.go
13
server.go
|
@ -252,9 +252,8 @@ func (s *server) handleDonePeerMsg(state *peerState, p *peer) {
|
|||
if e.Value == p {
|
||||
// Issue an asynchronous reconnect if the peer was a
|
||||
// persistent outbound connection.
|
||||
if !p.inbound && p.persistent &&
|
||||
atomic.LoadInt32(&s.shutdown) == 0 {
|
||||
e.Value = newOutboundPeer(s, p.addr, true)
|
||||
if !p.inbound && p.persistent && atomic.LoadInt32(&s.shutdown) == 0 {
|
||||
e.Value = newOutboundPeer(s, p.addr, true, p.retryCount+1)
|
||||
return
|
||||
}
|
||||
if !p.inbound {
|
||||
|
@ -431,7 +430,7 @@ func (s *server) handleQuery(querymsg interface{}, state *peerState) {
|
|||
}
|
||||
// TODO(oga) if too many, nuke a non-perm peer.
|
||||
if s.handleAddPeerMsg(state,
|
||||
newOutboundPeer(s, msg.addr, msg.permanent)) {
|
||||
newOutboundPeer(s, msg.addr, msg.permanent, 0)) {
|
||||
msg.reply <- nil
|
||||
} else {
|
||||
msg.reply <- errors.New("failed to add peer")
|
||||
|
@ -573,7 +572,7 @@ func (s *server) peerHandler() {
|
|||
permanentPeers = cfg.AddPeers
|
||||
}
|
||||
for _, addr := range permanentPeers {
|
||||
s.handleAddPeerMsg(state, newOutboundPeer(s, addr, true))
|
||||
s.handleAddPeerMsg(state, newOutboundPeer(s, addr, true, 0))
|
||||
}
|
||||
|
||||
// if nothing else happens, wake us up soon.
|
||||
|
@ -686,11 +685,11 @@ out:
|
|||
// any failure will be due to banned peers etc. we have
|
||||
// already checked that we have room for more peers.
|
||||
if s.handleAddPeerMsg(state,
|
||||
newOutboundPeer(s, addrStr, false)) {
|
||||
newOutboundPeer(s, addrStr, false, 0)) {
|
||||
}
|
||||
}
|
||||
|
||||
// We we need more peers, wake up in ten seconds and try again.
|
||||
// We need more peers, wake up in ten seconds and try again.
|
||||
if state.NeedMoreOutbound() {
|
||||
time.AfterFunc(10*time.Second, func() {
|
||||
s.wakeup <- struct{}{}
|
||||
|
|
Loading…
Add table
Reference in a new issue