diff --git a/blockmanager.go b/blockmanager.go index d7bf6d1f..59482c4c 100644 --- a/blockmanager.go +++ b/blockmanager.go @@ -6,6 +6,8 @@ package main import ( "container/list" + "crypto/rand" + "math/big" "net" "os" "path/filepath" @@ -32,6 +34,18 @@ const ( // database type is appended to this value to form the full block // database name. blockDbNamePrefix = "blocks" + + // maxRejectedTxns is the maximum number of rejected transactions + // shas to store in memory. + maxRejectedTxns = 1000 + + // maxRequestedBlocks is the maximum number of requested block + // shas to store in memory. + maxRequestedBlocks = wire.MaxInvPerMsg + + // maxRequestedTxns is the maximum number of requested transactions + // shas to store in memory. + maxRequestedTxns = wire.MaxInvPerMsg ) // zeroHash is the zero value hash (all zeros). It is defined as a convenience. @@ -189,6 +203,7 @@ type blockManager struct { started int32 shutdown int32 blockChain *blockchain.BlockChain + rejectedTxns map[wire.ShaHash]struct{} requestedTxns map[wire.ShaHash]struct{} requestedBlocks map[wire.ShaHash]struct{} progressLogger *blockProgressLogger @@ -316,6 +331,11 @@ func (b *blockManager) startSync(peers *list.List) { // Start syncing from the best peer if one was selected. if bestPeer != nil { + // Clear the requestedBlocks if the sync peer changes, otherwise + // we may ignore blocks we need that the last sync peer failed + // to send. + b.requestedBlocks = make(map[wire.ShaHash]struct{}) + locator, err := b.blockChain.LatestBlockLocator() if err != nil { bmgrLog.Errorf("Failed to get block locator for the "+ @@ -471,6 +491,16 @@ func (b *blockManager) handleTxMsg(tmsg *txMsg) { // spec to proliferate. While this is not ideal, there is no check here // to disconnect peers for sending unsolicited transactions to provide // interoperability. + txHash := tmsg.tx.Sha() + + // Ignore transactions that we have already rejected. Do not + // send a reject message here because if the transaction was already + // rejected, the transaction was unsolicited. + if _, exists := b.rejectedTxns[*txHash]; exists { + bmgrLog.Debugf("Ignoring unsolicited previously rejected "+ + "transaction %v from %s", txHash, tmsg.peer) + return + } // Process the transaction to include validation, insertion in the // memory pool, orphan handling, etc. @@ -482,11 +512,20 @@ func (b *blockManager) handleTxMsg(tmsg *txMsg) { // already knows about it and as such we shouldn't have any more // instances of trying to fetch it, or we failed to insert and thus // we'll retry next time we get an inv. - txHash := tmsg.tx.Sha() delete(tmsg.peer.requestedTxns, *txHash) delete(b.requestedTxns, *txHash) if err != nil { + // Do not request this transaction again until a new block + // has been processed. + b.rejectedTxns[*txHash] = struct{}{} + lerr := b.limitMap(b.rejectedTxns, maxRejectedTxns) + if lerr != nil { + bmgrLog.Warnf("Failed to limit the number of "+ + "rejected transactions: %v", lerr) + delete(b.rejectedTxns, *txHash) + } + // When the error is a rule error, it means the transaction was // simply rejected as opposed to something actually going wrong, // so log it as such. Otherwise, something really did go wrong, @@ -664,6 +703,9 @@ func (b *blockManager) handleBlockMsg(bmsg *blockMsg) { heightUpdate = int32(newestHeight) blkShaUpdate = newestSha + // Clear the rejected transactions. + b.rejectedTxns = make(map[wire.ShaHash]struct{}) + // Allow any clients performing long polling via the // getblocktemplate RPC to be notified when the new block causes // their old block template to become stale. @@ -984,6 +1026,14 @@ func (b *blockManager) handleInvMsg(imsg *invMsg) { continue } if !haveInv { + if iv.Type == wire.InvTypeTx { + // Skip the transaction if it has already been + // rejected. + if _, exists := b.rejectedTxns[iv.Hash]; exists { + continue + } + } + // Add it to the request queue. imsg.peer.requestQueue = append(imsg.peer.requestQueue, iv) continue @@ -1046,6 +1096,16 @@ func (b *blockManager) handleInvMsg(imsg *invMsg) { // request. if _, exists := b.requestedBlocks[iv.Hash]; !exists { b.requestedBlocks[iv.Hash] = struct{}{} + err := b.limitMap(b.requestedBlocks, + maxRequestedBlocks) + if err != nil { + bmgrLog.Warnf("Failed to limit the "+ + "number of requested "+ + "blocks: %v", err) + delete(b.requestedBlocks, iv.Hash) + continue + } + imsg.peer.requestedBlocks[iv.Hash] = struct{}{} gdmsg.AddInvVect(iv) numRequested++ @@ -1056,6 +1116,15 @@ func (b *blockManager) handleInvMsg(imsg *invMsg) { // pending request. if _, exists := b.requestedTxns[iv.Hash]; !exists { b.requestedTxns[iv.Hash] = struct{}{} + err := b.limitMap(b.requestedTxns, + maxRequestedTxns) + if err != nil { + bmgrLog.Warnf("Failed to limit the "+ + "number of requested "+ + "transactions: %v", err) + delete(b.requestedTxns, iv.Hash) + continue + } imsg.peer.requestedTxns[iv.Hash] = struct{}{} gdmsg.AddInvVect(iv) numRequested++ @@ -1072,6 +1141,40 @@ func (b *blockManager) handleInvMsg(imsg *invMsg) { } } +// limitMap is a helper function for maps that require a maximum limit by +// evicting a random rejected transaction if adding a new value would cause it +// to overflow the maximum allowed. +func (b *blockManager) limitMap(m map[wire.ShaHash]struct{}, limit int) error { + if len(m)+1 > limit { + // Generate a cryptographically random hash. + randHashBytes := make([]byte, wire.HashSize) + _, err := rand.Read(randHashBytes) + if err != nil { + return err + } + randHashNum := new(big.Int).SetBytes(randHashBytes) + + // Try to find the first entry that is greater than the random + // hash. Use the first entry (which is already pseudorandom due + // to Go's range statement over maps) as a fallback if none of + // the hashes in the map are larger than the random hash. + var foundHash *wire.ShaHash + for txHash := range m { + if foundHash == nil { + foundHash = &txHash + } + txHashNum := blockchain.ShaHashToBig(&txHash) + if txHashNum.Cmp(randHashNum) > 0 { + foundHash = &txHash + break + } + } + delete(m, *foundHash) + } + + return nil +} + // blockHandler is the main handler for the block manager. It must be run // as a goroutine. It processes block and inv messages in a separate goroutine // from the peer handlers so the block (MsgBlock) messages are handled by a @@ -1440,6 +1543,7 @@ func newBlockManager(s *server) (*blockManager, error) { bm := blockManager{ server: s, + rejectedTxns: make(map[wire.ShaHash]struct{}), requestedTxns: make(map[wire.ShaHash]struct{}), requestedBlocks: make(map[wire.ShaHash]struct{}), progressLogger: newBlockProgressLogger("Processed", bmgrLog),