Keep track of recently rejected transactions. (#484)
This prevents the node from repeatedly requesting and rejecting the same transaction as different peers inv the same transaction. Idea from Bitcoin Core commit 0847d9cb5fcd2fdd5a21bde699944d966cf5add9 Also, limit the number of both requested blocks and transactions.
This commit is contained in:
parent
d1e493f4ee
commit
cab74feb59
1 changed files with 105 additions and 1 deletions
106
blockmanager.go
106
blockmanager.go
|
@ -6,6 +6,8 @@ package main
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"container/list"
|
"container/list"
|
||||||
|
"crypto/rand"
|
||||||
|
"math/big"
|
||||||
"net"
|
"net"
|
||||||
"os"
|
"os"
|
||||||
"path/filepath"
|
"path/filepath"
|
||||||
|
@ -32,6 +34,18 @@ const (
|
||||||
// database type is appended to this value to form the full block
|
// database type is appended to this value to form the full block
|
||||||
// database name.
|
// database name.
|
||||||
blockDbNamePrefix = "blocks"
|
blockDbNamePrefix = "blocks"
|
||||||
|
|
||||||
|
// maxRejectedTxns is the maximum number of rejected transactions
|
||||||
|
// shas to store in memory.
|
||||||
|
maxRejectedTxns = 1000
|
||||||
|
|
||||||
|
// maxRequestedBlocks is the maximum number of requested block
|
||||||
|
// shas to store in memory.
|
||||||
|
maxRequestedBlocks = wire.MaxInvPerMsg
|
||||||
|
|
||||||
|
// maxRequestedTxns is the maximum number of requested transactions
|
||||||
|
// shas to store in memory.
|
||||||
|
maxRequestedTxns = wire.MaxInvPerMsg
|
||||||
)
|
)
|
||||||
|
|
||||||
// zeroHash is the zero value hash (all zeros). It is defined as a convenience.
|
// zeroHash is the zero value hash (all zeros). It is defined as a convenience.
|
||||||
|
@ -189,6 +203,7 @@ type blockManager struct {
|
||||||
started int32
|
started int32
|
||||||
shutdown int32
|
shutdown int32
|
||||||
blockChain *blockchain.BlockChain
|
blockChain *blockchain.BlockChain
|
||||||
|
rejectedTxns map[wire.ShaHash]struct{}
|
||||||
requestedTxns map[wire.ShaHash]struct{}
|
requestedTxns map[wire.ShaHash]struct{}
|
||||||
requestedBlocks map[wire.ShaHash]struct{}
|
requestedBlocks map[wire.ShaHash]struct{}
|
||||||
progressLogger *blockProgressLogger
|
progressLogger *blockProgressLogger
|
||||||
|
@ -316,6 +331,11 @@ func (b *blockManager) startSync(peers *list.List) {
|
||||||
|
|
||||||
// Start syncing from the best peer if one was selected.
|
// Start syncing from the best peer if one was selected.
|
||||||
if bestPeer != nil {
|
if bestPeer != nil {
|
||||||
|
// Clear the requestedBlocks if the sync peer changes, otherwise
|
||||||
|
// we may ignore blocks we need that the last sync peer failed
|
||||||
|
// to send.
|
||||||
|
b.requestedBlocks = make(map[wire.ShaHash]struct{})
|
||||||
|
|
||||||
locator, err := b.blockChain.LatestBlockLocator()
|
locator, err := b.blockChain.LatestBlockLocator()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
bmgrLog.Errorf("Failed to get block locator for the "+
|
bmgrLog.Errorf("Failed to get block locator for the "+
|
||||||
|
@ -471,6 +491,16 @@ func (b *blockManager) handleTxMsg(tmsg *txMsg) {
|
||||||
// spec to proliferate. While this is not ideal, there is no check here
|
// spec to proliferate. While this is not ideal, there is no check here
|
||||||
// to disconnect peers for sending unsolicited transactions to provide
|
// to disconnect peers for sending unsolicited transactions to provide
|
||||||
// interoperability.
|
// interoperability.
|
||||||
|
txHash := tmsg.tx.Sha()
|
||||||
|
|
||||||
|
// Ignore transactions that we have already rejected. Do not
|
||||||
|
// send a reject message here because if the transaction was already
|
||||||
|
// rejected, the transaction was unsolicited.
|
||||||
|
if _, exists := b.rejectedTxns[*txHash]; exists {
|
||||||
|
bmgrLog.Debugf("Ignoring unsolicited previously rejected "+
|
||||||
|
"transaction %v from %s", txHash, tmsg.peer)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
// Process the transaction to include validation, insertion in the
|
// Process the transaction to include validation, insertion in the
|
||||||
// memory pool, orphan handling, etc.
|
// memory pool, orphan handling, etc.
|
||||||
|
@ -482,11 +512,20 @@ func (b *blockManager) handleTxMsg(tmsg *txMsg) {
|
||||||
// already knows about it and as such we shouldn't have any more
|
// already knows about it and as such we shouldn't have any more
|
||||||
// instances of trying to fetch it, or we failed to insert and thus
|
// instances of trying to fetch it, or we failed to insert and thus
|
||||||
// we'll retry next time we get an inv.
|
// we'll retry next time we get an inv.
|
||||||
txHash := tmsg.tx.Sha()
|
|
||||||
delete(tmsg.peer.requestedTxns, *txHash)
|
delete(tmsg.peer.requestedTxns, *txHash)
|
||||||
delete(b.requestedTxns, *txHash)
|
delete(b.requestedTxns, *txHash)
|
||||||
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
// Do not request this transaction again until a new block
|
||||||
|
// has been processed.
|
||||||
|
b.rejectedTxns[*txHash] = struct{}{}
|
||||||
|
lerr := b.limitMap(b.rejectedTxns, maxRejectedTxns)
|
||||||
|
if lerr != nil {
|
||||||
|
bmgrLog.Warnf("Failed to limit the number of "+
|
||||||
|
"rejected transactions: %v", lerr)
|
||||||
|
delete(b.rejectedTxns, *txHash)
|
||||||
|
}
|
||||||
|
|
||||||
// When the error is a rule error, it means the transaction was
|
// When the error is a rule error, it means the transaction was
|
||||||
// simply rejected as opposed to something actually going wrong,
|
// simply rejected as opposed to something actually going wrong,
|
||||||
// so log it as such. Otherwise, something really did go wrong,
|
// so log it as such. Otherwise, something really did go wrong,
|
||||||
|
@ -664,6 +703,9 @@ func (b *blockManager) handleBlockMsg(bmsg *blockMsg) {
|
||||||
heightUpdate = int32(newestHeight)
|
heightUpdate = int32(newestHeight)
|
||||||
blkShaUpdate = newestSha
|
blkShaUpdate = newestSha
|
||||||
|
|
||||||
|
// Clear the rejected transactions.
|
||||||
|
b.rejectedTxns = make(map[wire.ShaHash]struct{})
|
||||||
|
|
||||||
// Allow any clients performing long polling via the
|
// Allow any clients performing long polling via the
|
||||||
// getblocktemplate RPC to be notified when the new block causes
|
// getblocktemplate RPC to be notified when the new block causes
|
||||||
// their old block template to become stale.
|
// their old block template to become stale.
|
||||||
|
@ -984,6 +1026,14 @@ func (b *blockManager) handleInvMsg(imsg *invMsg) {
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
if !haveInv {
|
if !haveInv {
|
||||||
|
if iv.Type == wire.InvTypeTx {
|
||||||
|
// Skip the transaction if it has already been
|
||||||
|
// rejected.
|
||||||
|
if _, exists := b.rejectedTxns[iv.Hash]; exists {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// Add it to the request queue.
|
// Add it to the request queue.
|
||||||
imsg.peer.requestQueue = append(imsg.peer.requestQueue, iv)
|
imsg.peer.requestQueue = append(imsg.peer.requestQueue, iv)
|
||||||
continue
|
continue
|
||||||
|
@ -1046,6 +1096,16 @@ func (b *blockManager) handleInvMsg(imsg *invMsg) {
|
||||||
// request.
|
// request.
|
||||||
if _, exists := b.requestedBlocks[iv.Hash]; !exists {
|
if _, exists := b.requestedBlocks[iv.Hash]; !exists {
|
||||||
b.requestedBlocks[iv.Hash] = struct{}{}
|
b.requestedBlocks[iv.Hash] = struct{}{}
|
||||||
|
err := b.limitMap(b.requestedBlocks,
|
||||||
|
maxRequestedBlocks)
|
||||||
|
if err != nil {
|
||||||
|
bmgrLog.Warnf("Failed to limit the "+
|
||||||
|
"number of requested "+
|
||||||
|
"blocks: %v", err)
|
||||||
|
delete(b.requestedBlocks, iv.Hash)
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
imsg.peer.requestedBlocks[iv.Hash] = struct{}{}
|
imsg.peer.requestedBlocks[iv.Hash] = struct{}{}
|
||||||
gdmsg.AddInvVect(iv)
|
gdmsg.AddInvVect(iv)
|
||||||
numRequested++
|
numRequested++
|
||||||
|
@ -1056,6 +1116,15 @@ func (b *blockManager) handleInvMsg(imsg *invMsg) {
|
||||||
// pending request.
|
// pending request.
|
||||||
if _, exists := b.requestedTxns[iv.Hash]; !exists {
|
if _, exists := b.requestedTxns[iv.Hash]; !exists {
|
||||||
b.requestedTxns[iv.Hash] = struct{}{}
|
b.requestedTxns[iv.Hash] = struct{}{}
|
||||||
|
err := b.limitMap(b.requestedTxns,
|
||||||
|
maxRequestedTxns)
|
||||||
|
if err != nil {
|
||||||
|
bmgrLog.Warnf("Failed to limit the "+
|
||||||
|
"number of requested "+
|
||||||
|
"transactions: %v", err)
|
||||||
|
delete(b.requestedTxns, iv.Hash)
|
||||||
|
continue
|
||||||
|
}
|
||||||
imsg.peer.requestedTxns[iv.Hash] = struct{}{}
|
imsg.peer.requestedTxns[iv.Hash] = struct{}{}
|
||||||
gdmsg.AddInvVect(iv)
|
gdmsg.AddInvVect(iv)
|
||||||
numRequested++
|
numRequested++
|
||||||
|
@ -1072,6 +1141,40 @@ func (b *blockManager) handleInvMsg(imsg *invMsg) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// limitMap is a helper function for maps that require a maximum limit by
|
||||||
|
// evicting a random rejected transaction if adding a new value would cause it
|
||||||
|
// to overflow the maximum allowed.
|
||||||
|
func (b *blockManager) limitMap(m map[wire.ShaHash]struct{}, limit int) error {
|
||||||
|
if len(m)+1 > limit {
|
||||||
|
// Generate a cryptographically random hash.
|
||||||
|
randHashBytes := make([]byte, wire.HashSize)
|
||||||
|
_, err := rand.Read(randHashBytes)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
randHashNum := new(big.Int).SetBytes(randHashBytes)
|
||||||
|
|
||||||
|
// Try to find the first entry that is greater than the random
|
||||||
|
// hash. Use the first entry (which is already pseudorandom due
|
||||||
|
// to Go's range statement over maps) as a fallback if none of
|
||||||
|
// the hashes in the map are larger than the random hash.
|
||||||
|
var foundHash *wire.ShaHash
|
||||||
|
for txHash := range m {
|
||||||
|
if foundHash == nil {
|
||||||
|
foundHash = &txHash
|
||||||
|
}
|
||||||
|
txHashNum := blockchain.ShaHashToBig(&txHash)
|
||||||
|
if txHashNum.Cmp(randHashNum) > 0 {
|
||||||
|
foundHash = &txHash
|
||||||
|
break
|
||||||
|
}
|
||||||
|
}
|
||||||
|
delete(m, *foundHash)
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
// blockHandler is the main handler for the block manager. It must be run
|
// blockHandler is the main handler for the block manager. It must be run
|
||||||
// as a goroutine. It processes block and inv messages in a separate goroutine
|
// as a goroutine. It processes block and inv messages in a separate goroutine
|
||||||
// from the peer handlers so the block (MsgBlock) messages are handled by a
|
// from the peer handlers so the block (MsgBlock) messages are handled by a
|
||||||
|
@ -1440,6 +1543,7 @@ func newBlockManager(s *server) (*blockManager, error) {
|
||||||
|
|
||||||
bm := blockManager{
|
bm := blockManager{
|
||||||
server: s,
|
server: s,
|
||||||
|
rejectedTxns: make(map[wire.ShaHash]struct{}),
|
||||||
requestedTxns: make(map[wire.ShaHash]struct{}),
|
requestedTxns: make(map[wire.ShaHash]struct{}),
|
||||||
requestedBlocks: make(map[wire.ShaHash]struct{}),
|
requestedBlocks: make(map[wire.ShaHash]struct{}),
|
||||||
progressLogger: newBlockProgressLogger("Processed", bmgrLog),
|
progressLogger: newBlockProgressLogger("Processed", bmgrLog),
|
||||||
|
|
Loading…
Reference in a new issue