netsync: handle notfound messages from peers
backport from https://github.com/decred/dcrd/pull/2253 When a peer sends a notfound message, remove the hash from requested map. Also increase notfound ban score and return early if it disconnects the peer.
This commit is contained in:
parent
69773a7b41
commit
24db7d7c0c
2 changed files with 99 additions and 6 deletions
|
@ -79,6 +79,13 @@ type headersMsg struct {
|
||||||
peer *peerpkg.Peer
|
peer *peerpkg.Peer
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// notFoundMsg packages a bitcoin notfound message and the peer it came from
|
||||||
|
// together so the block handler has access to that information.
|
||||||
|
type notFoundMsg struct {
|
||||||
|
notFound *wire.MsgNotFound
|
||||||
|
peer *peerpkg.Peer
|
||||||
|
}
|
||||||
|
|
||||||
// donePeerMsg signifies a newly disconnected peer to the block handler.
|
// donePeerMsg signifies a newly disconnected peer to the block handler.
|
||||||
type donePeerMsg struct {
|
type donePeerMsg struct {
|
||||||
peer *peerpkg.Peer
|
peer *peerpkg.Peer
|
||||||
|
@ -1012,6 +1019,32 @@ func (sm *SyncManager) handleHeadersMsg(hmsg *headersMsg) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// handleNotFoundMsg handles notfound messages from all peers.
|
||||||
|
func (sm *SyncManager) handleNotFoundMsg(nfmsg *notFoundMsg) {
|
||||||
|
peer := nfmsg.peer
|
||||||
|
state, exists := sm.peerStates[peer]
|
||||||
|
if !exists {
|
||||||
|
log.Warnf("Received notfound message from unknown peer %s", peer)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
for _, inv := range nfmsg.notFound.InvList {
|
||||||
|
// verify the hash was actually announced by the peer
|
||||||
|
// before deleting from the global requested maps.
|
||||||
|
switch inv.Type {
|
||||||
|
case wire.InvTypeBlock:
|
||||||
|
if _, exists := state.requestedBlocks[inv.Hash]; exists {
|
||||||
|
delete(state.requestedBlocks, inv.Hash)
|
||||||
|
delete(sm.requestedBlocks, inv.Hash)
|
||||||
|
}
|
||||||
|
case wire.InvTypeTx:
|
||||||
|
if _, exists := state.requestedTxns[inv.Hash]; exists {
|
||||||
|
delete(state.requestedTxns, inv.Hash)
|
||||||
|
delete(sm.requestedTxns, inv.Hash)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// haveInventory returns whether or not the inventory represented by the passed
|
// haveInventory returns whether or not the inventory represented by the passed
|
||||||
// inventory vector is known. This includes checking all of the various places
|
// inventory vector is known. This includes checking all of the various places
|
||||||
// inventory can be when it is in different states such as blocks that are part
|
// inventory can be when it is in different states such as blocks that are part
|
||||||
|
@ -1293,6 +1326,9 @@ out:
|
||||||
case *headersMsg:
|
case *headersMsg:
|
||||||
sm.handleHeadersMsg(msg)
|
sm.handleHeadersMsg(msg)
|
||||||
|
|
||||||
|
case *notFoundMsg:
|
||||||
|
sm.handleNotFoundMsg(msg)
|
||||||
|
|
||||||
case *donePeerMsg:
|
case *donePeerMsg:
|
||||||
sm.handleDonePeerMsg(msg.peer)
|
sm.handleDonePeerMsg(msg.peer)
|
||||||
|
|
||||||
|
@ -1490,6 +1526,18 @@ func (sm *SyncManager) QueueHeaders(headers *wire.MsgHeaders, peer *peerpkg.Peer
|
||||||
sm.msgChan <- &headersMsg{headers: headers, peer: peer}
|
sm.msgChan <- &headersMsg{headers: headers, peer: peer}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// QueueNotFound adds the passed notfound message and peer to the block handling
|
||||||
|
// queue.
|
||||||
|
func (sm *SyncManager) QueueNotFound(notFound *wire.MsgNotFound, peer *peerpkg.Peer) {
|
||||||
|
// No channel handling here because peers do not need to block on
|
||||||
|
// reject messages.
|
||||||
|
if atomic.LoadInt32(&sm.shutdown) != 0 {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
sm.msgChan <- ¬FoundMsg{notFound: notFound, peer: peer}
|
||||||
|
}
|
||||||
|
|
||||||
// DonePeer informs the blockmanager that a peer has disconnected.
|
// DonePeer informs the blockmanager that a peer has disconnected.
|
||||||
func (sm *SyncManager) DonePeer(peer *peerpkg.Peer) {
|
func (sm *SyncManager) DonePeer(peer *peerpkg.Peer) {
|
||||||
// Ignore if we are shutting down.
|
// Ignore if we are shutting down.
|
||||||
|
|
57
server.go
57
server.go
|
@ -364,14 +364,14 @@ func (sp *serverPeer) pushAddrMsg(addresses []*wire.NetAddress) {
|
||||||
// threshold, a warning is logged including the reason provided. Further, if
|
// threshold, a warning is logged including the reason provided. Further, if
|
||||||
// the score is above the ban threshold, the peer will be banned and
|
// the score is above the ban threshold, the peer will be banned and
|
||||||
// disconnected.
|
// disconnected.
|
||||||
func (sp *serverPeer) addBanScore(persistent, transient uint32, reason string) {
|
func (sp *serverPeer) addBanScore(persistent, transient uint32, reason string) bool {
|
||||||
// No warning is logged and no score is calculated if banning is disabled.
|
// No warning is logged and no score is calculated if banning is disabled.
|
||||||
if cfg.DisableBanning {
|
if cfg.DisableBanning {
|
||||||
return
|
return false
|
||||||
}
|
}
|
||||||
if sp.isWhitelisted {
|
if sp.isWhitelisted {
|
||||||
peerLog.Debugf("Misbehaving whitelisted peer %s: %s", sp, reason)
|
peerLog.Debugf("Misbehaving whitelisted peer %s: %s", sp, reason)
|
||||||
return
|
return false
|
||||||
}
|
}
|
||||||
|
|
||||||
warnThreshold := cfg.BanThreshold >> 1
|
warnThreshold := cfg.BanThreshold >> 1
|
||||||
|
@ -383,7 +383,7 @@ func (sp *serverPeer) addBanScore(persistent, transient uint32, reason string) {
|
||||||
peerLog.Warnf("Misbehaving peer %s: %s -- ban score is %d, "+
|
peerLog.Warnf("Misbehaving peer %s: %s -- ban score is %d, "+
|
||||||
"it was not increased this time", sp, reason, score)
|
"it was not increased this time", sp, reason, score)
|
||||||
}
|
}
|
||||||
return
|
return false
|
||||||
}
|
}
|
||||||
score := sp.banScore.Increase(persistent, transient)
|
score := sp.banScore.Increase(persistent, transient)
|
||||||
if score > warnThreshold {
|
if score > warnThreshold {
|
||||||
|
@ -394,8 +394,10 @@ func (sp *serverPeer) addBanScore(persistent, transient uint32, reason string) {
|
||||||
sp)
|
sp)
|
||||||
sp.server.BanPeer(sp)
|
sp.server.BanPeer(sp)
|
||||||
sp.Disconnect()
|
sp.Disconnect()
|
||||||
|
return true
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
return false
|
||||||
}
|
}
|
||||||
|
|
||||||
// hasServices returns whether or not the provided advertised service flags have
|
// hasServices returns whether or not the provided advertised service flags have
|
||||||
|
@ -498,7 +500,9 @@ func (sp *serverPeer) OnMemPool(_ *peer.Peer, msg *wire.MsgMemPool) {
|
||||||
// The ban score accumulates and passes the ban threshold if a burst of
|
// The ban score accumulates and passes the ban threshold if a burst of
|
||||||
// mempool messages comes from a peer. The score decays each minute to
|
// mempool messages comes from a peer. The score decays each minute to
|
||||||
// half of its value.
|
// half of its value.
|
||||||
sp.addBanScore(0, 33, "mempool")
|
if sp.addBanScore(0, 33, "mempool") {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
// Generate inventory message with the available transactions in the
|
// Generate inventory message with the available transactions in the
|
||||||
// transaction memory pool. Limit it to the max allowed inventory
|
// transaction memory pool. Limit it to the max allowed inventory
|
||||||
|
@ -638,7 +642,9 @@ func (sp *serverPeer) OnGetData(_ *peer.Peer, msg *wire.MsgGetData) {
|
||||||
// bursts of small requests are not penalized as that would potentially ban
|
// bursts of small requests are not penalized as that would potentially ban
|
||||||
// peers performing IBD.
|
// peers performing IBD.
|
||||||
// This incremental score decays each minute to half of its value.
|
// This incremental score decays each minute to half of its value.
|
||||||
sp.addBanScore(0, uint32(length)*99/wire.MaxInvPerMsg, "getdata")
|
if sp.addBanScore(0, uint32(length)*99/wire.MaxInvPerMsg, "getdata") {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
// We wait on this wait channel periodically to prevent queuing
|
// We wait on this wait channel periodically to prevent queuing
|
||||||
// far more data than we can send in a reasonable time, wasting memory.
|
// far more data than we can send in a reasonable time, wasting memory.
|
||||||
|
@ -1304,6 +1310,44 @@ func (sp *serverPeer) OnWrite(_ *peer.Peer, bytesWritten int, msg wire.Message,
|
||||||
sp.server.AddBytesSent(uint64(bytesWritten))
|
sp.server.AddBytesSent(uint64(bytesWritten))
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// OnNotFound is invoked when a peer sends a notfound message.
|
||||||
|
func (sp *serverPeer) OnNotFound(p *peer.Peer, msg *wire.MsgNotFound) {
|
||||||
|
if !sp.Connected() {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
var numBlocks, numTxns uint32
|
||||||
|
for _, inv := range msg.InvList {
|
||||||
|
switch inv.Type {
|
||||||
|
case wire.InvTypeBlock:
|
||||||
|
numBlocks++
|
||||||
|
case wire.InvTypeTx:
|
||||||
|
numTxns++
|
||||||
|
default:
|
||||||
|
peerLog.Debugf("Invalid inv type '%d' in notfound message from %s",
|
||||||
|
inv.Type, sp)
|
||||||
|
sp.Disconnect()
|
||||||
|
return
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if numBlocks > 0 {
|
||||||
|
blockStr := pickNoun(uint64(numBlocks), "block", "blocks")
|
||||||
|
reason := fmt.Sprintf("%d %v not found", numBlocks, blockStr)
|
||||||
|
if sp.addBanScore(20*numBlocks, 0, reason) {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if numTxns > 0 {
|
||||||
|
txStr := pickNoun(uint64(numTxns), "transaction", "transactions")
|
||||||
|
reason := fmt.Sprintf("%d %v not found", numBlocks, txStr)
|
||||||
|
if sp.addBanScore(0, 10*numTxns, reason) {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
sp.server.syncManager.QueueNotFound(msg, p)
|
||||||
|
}
|
||||||
|
|
||||||
// randomUint16Number returns a random uint16 in a specified input range. Note
|
// randomUint16Number returns a random uint16 in a specified input range. Note
|
||||||
// that the range is in zeroth ordering; if you pass it 1800, you will get
|
// that the range is in zeroth ordering; if you pass it 1800, you will get
|
||||||
// values from 0 to 1800.
|
// values from 0 to 1800.
|
||||||
|
@ -1998,6 +2042,7 @@ func newPeerConfig(sp *serverPeer) *peer.Config {
|
||||||
OnAddr: sp.OnAddr,
|
OnAddr: sp.OnAddr,
|
||||||
OnRead: sp.OnRead,
|
OnRead: sp.OnRead,
|
||||||
OnWrite: sp.OnWrite,
|
OnWrite: sp.OnWrite,
|
||||||
|
OnNotFound: sp.OnNotFound,
|
||||||
|
|
||||||
// Note: The reference client currently bans peers that send alerts
|
// Note: The reference client currently bans peers that send alerts
|
||||||
// not signed with its key. We could verify against their key, but
|
// not signed with its key. We could verify against their key, but
|
||||||
|
|
Loading…
Reference in a new issue