diff --git a/blockmanager.go b/blockmanager.go index 6cecf0ef..c00003f0 100644 --- a/blockmanager.go +++ b/blockmanager.go @@ -54,7 +54,7 @@ type donePeerMsg struct { // txMsg packages a bitcoin tx message and the peer it came from together // so the block handler has access to that information. type txMsg struct { - msg *btcwire.MsgTx + tx *btcwire.MsgTx peer *peer } @@ -66,6 +66,7 @@ type blockManager struct { shutdown int32 blockChain *btcchain.BlockChain blockPeer map[btcwire.ShaHash]*peer + requestedTxns map[btcwire.ShaHash]bool requestedBlocks map[btcwire.ShaHash]bool receivedLogBlocks int64 receivedLogTx int64 @@ -200,7 +201,13 @@ func (b *blockManager) handleDonePeerMsg(peers *list.List, p *peer) { log.Infof("[BMGR] Lost peer %s", p) - // remove requested blocks from the global map so that they will be + // Remove requested transactions from the global map so that they will + // be fetched from elsewhere next time we get an inv. + for k := range p.requestedTxns { + delete(b.requestedTxns, k) + } + + // Remove requested blocks from the global map so that they will be // fetched from elsewhere next time we get an inv. // TODO(oga) we could possibly here check which peers have these blocks // and request them now to speed things up a little. @@ -247,6 +254,44 @@ func (b *blockManager) logBlockHeight(numTx, height int64) { b.lastBlockLogTime = now } +// handleTxMsg handles transaction messages from all peers. +func (b *blockManager) handleTxMsg(tmsg *txMsg) { + // Keep track of which peer the tx was sent from. + txHash, _ := tmsg.tx.TxSha() + + // If we didn't ask for this block then the peer is misbehaving. + if _, ok := tmsg.peer.requestedTxns[txHash]; !ok { + log.Warnf("[BMGR] Got unrequested transaction %v from %s -- "+ + "disconnecting", &txHash, tmsg.peer.addr) + tmsg.peer.Disconnect() + return + } + + // Process the transaction to include validation, insertion in the + // memory pool, orphan handling, etc. + err := tmsg.peer.server.txMemPool.ProcessTransaction(tmsg.tx) + + // Remove transaction from request maps. Either the mempool/chain + // already knows about it and as such we shouldn't have any more + // instances of trying to fetch it, or we failed to insert and thus + // we'll retry next time we get an inv. + delete(tmsg.peer.requestedTxns, txHash) + delete(b.requestedTxns, txHash) + + if err != nil { + // When the error is a rule error, it means the transaction was + // simply rejected as opposed to something actually going wrong, + // so log it as such. Otherwise, something really did go wrong, + // so log it as an actual error. + if _, ok := err.(TxRuleError); ok { + log.Debugf("Rejected transaction %v: %v", txHash, err) + } else { + log.Errorf("Failed to process transaction %v: %v", txHash, err) + } + return + } +} + // handleBlockMsg handles block messages from all peers. func (b *blockManager) handleBlockMsg(bmsg *blockMsg) { // Keep track of which peer the block was sent from so the notification @@ -312,6 +357,35 @@ func (b *blockManager) handleBlockMsg(bmsg *blockMsg) { b.server.db.Sync() } +// haveInventory returns whether or not the inventory represented by the passed +// inventory vector is known. This includes checking all of the various places +// inventory can be when it is in different states such as blocks that are part +// of the main chain, on a side chain, in the orphan pool, and transactions that +// in the memory pool (either the main pool or orphan pool). +func (b *blockManager) haveInventory(invVect *btcwire.InvVect) bool { + switch invVect.Type { + case btcwire.InvVect_Block: + // Ask chain if the block is known to it in any form (main + // chain, side chain, or orphan). + return b.blockChain.HaveBlock(&invVect.Hash) + + case btcwire.InvVect_Tx: + // Ask the transaction memory pool if the transaction is known + // to it in any form (main pool or orphan). + if b.server.txMemPool.IsTransactionInPool(&invVect.Hash) { + return true + } + + // Check if the transaction exists from the point of view of the + // end of the main chain. + return b.server.db.ExistsTxSha(&invVect.Hash) + } + + // The requested inventory is is an unsupported type, so just claim + // it is known to avoid requesting it. + return true +} + // handleInvMsg handles inv messages from all peers. // We examine the inventory advertised by the remote peer and act accordingly. // @@ -351,7 +425,7 @@ func (b *blockManager) handleInvMsg(imsg *invMsg) { imsg.peer.addKnownInventory(iv) // Request the inventory if we don't already have it. - if !chain.HaveInventory(iv) { + if !b.haveInventory(iv) { // Add it to the request queue. imsg.peer.requestQueue.PushBack(iv) continue @@ -406,13 +480,27 @@ func (b *blockManager) handleInvMsg(imsg *invMsg) { for e := requestQueue.Front(); e != nil; e = requestQueue.Front() { iv := e.Value.(*btcwire.InvVect) imsg.peer.requestQueue.Remove(e) - // check that no one else has asked for this. if so we don't - // need to ask. - if _, exists := b.requestedBlocks[iv.Hash]; !exists { - b.requestedBlocks[iv.Hash] = true - imsg.peer.requestedBlocks[iv.Hash] = true - gdmsg.AddInvVect(iv) - numRequested++ + + switch iv.Type { + case btcwire.InvVect_Block: + // Request the block if there is not already a pending + // request. + if _, exists := b.requestedBlocks[iv.Hash]; !exists { + b.requestedBlocks[iv.Hash] = true + imsg.peer.requestedBlocks[iv.Hash] = true + gdmsg.AddInvVect(iv) + numRequested++ + } + + case btcwire.InvVect_Tx: + // Request the transaction if there is not already a + // pending request. + if _, exists := b.requestedTxns[iv.Hash]; !exists { + b.requestedTxns[iv.Hash] = true + imsg.peer.requestedTxns[iv.Hash] = true + gdmsg.AddInvVect(iv) + numRequested++ + } } if numRequested >= btcwire.MaxInvPerMsg { @@ -440,6 +528,10 @@ out: case *newPeerMsg: b.handleNewPeerMsg(candidatePeers, msg.peer) + case *txMsg: + b.handleTxMsg(msg) + msg.peer.txProcessed <- true + case *blockMsg: b.handleBlockMsg(msg) msg.peer.blockProcessed <- true @@ -554,6 +646,18 @@ func (b *blockManager) NewPeer(p *peer) { b.msgChan <- &newPeerMsg{peer: p} } +// QueueTx adds the passed transaction message and peer to the block handling +// queue. +func (b *blockManager) QueueTx(tx *btcwire.MsgTx, p *peer) { + // Don't accept more transactions if we're shutting down. + if atomic.LoadInt32(&b.shutdown) != 0 { + p.txProcessed <- false + return + } + + b.msgChan <- &txMsg{tx: tx, peer: p} +} + // QueueBlock adds the passed block message and peer to the block handling queue. func (b *blockManager) QueueBlock(block *btcutil.Block, p *peer) { // Don't accept more blocks if we're shutting down. @@ -619,6 +723,7 @@ func newBlockManager(s *server) (*blockManager, error) { bm := blockManager{ server: s, blockPeer: make(map[btcwire.ShaHash]*peer), + requestedTxns: make(map[btcwire.ShaHash]bool), requestedBlocks: make(map[btcwire.ShaHash]bool), lastBlockLogTime: time.Now(), msgChan: make(chan interface{}, cfg.MaxPeers*3), diff --git a/mempool.go b/mempool.go index 92cd9471..ddd710d0 100644 --- a/mempool.go +++ b/mempool.go @@ -407,9 +407,9 @@ func (mp *txMemPool) maybeAddOrphan(tx *btcwire.MsgTx, txHash *btcwire.ShaHash) return nil } -// isTransactionInPool returns whether or not the passed transaction already +// IsTransactionInPool returns whether or not the passed transaction already // exists in the memory pool. -func (mp *txMemPool) isTransactionInPool(hash *btcwire.ShaHash) bool { +func (mp *txMemPool) IsTransactionInPool(hash *btcwire.ShaHash) bool { mp.lock.RLock() defer mp.lock.RUnlock() @@ -529,7 +529,7 @@ func (mp *txMemPool) maybeAcceptTransaction(tx *btcwire.MsgTx, isOrphan *bool) e // be a quick check to weed out duplicates. It is more expensive to // detect a duplicate transaction in the main chain, so that is done // later. - if mp.isTransactionInPool(&txHash) { + if mp.IsTransactionInPool(&txHash) { str := fmt.Sprintf("already have transaction %v", txHash) return TxRuleError(str) } diff --git a/peer.go b/peer.go index 2f90512e..46c987aa 100644 --- a/peer.go +++ b/peer.go @@ -118,6 +118,7 @@ type peer struct { knownAddresses map[string]bool knownInventory *MruInventoryMap knownInvMutex sync.Mutex + requestedTxns map[btcwire.ShaHash]bool // owned by blockmanager. requestedBlocks map[btcwire.ShaHash]bool // owned by blockmanager. lastBlock int32 retrycount int64 @@ -129,6 +130,7 @@ type peer struct { continueHash *btcwire.ShaHash outputQueue chan btcwire.Message outputInvChan chan *btcwire.InvVect + txProcessed chan bool blockProcessed chan bool quit chan bool } @@ -437,20 +439,13 @@ func (p *peer) handleTxMsg(msg *btcwire.MsgTx) { iv := btcwire.NewInvVect(btcwire.InvTypeTx, &hash) p.addKnownInventory(iv) - // Process the transaction. - err = p.server.txMemPool.ProcessTransaction(msg) - if err != nil { - // When the error is a rule error, it means the transaction was - // simply rejected as opposed to something actually going wrong, - // so log it as such. Otherwise, something really did go wrong, - // so log it as an actual error. - if _, ok := err.(TxRuleError); ok { - log.Debugf("Rejected transaction %v: %v", hash, err) - } else { - log.Errorf("Failed to process transaction %v: %v", hash, err) - } - return - } + // Queue the transaction up to be handled by the block manager and + // intentionally block further receives until the transaction is fully + // processed and known good or bad. This helps prevent a malicious peer + // from queueing up a bunch of bad transactions before disconnecting (or + // being disconnected) and wasting memory. + p.server.blockManager.QueueTx(msg, p) + <-p.txProcessed } // handleBlockMsg is invoked when a peer receives a block bitcoin message. It @@ -1129,11 +1124,13 @@ func newPeerBase(s *server, inbound bool) *peer { inbound: inbound, knownAddresses: make(map[string]bool), knownInventory: NewMruInventoryMap(maxKnownInventory), + requestedTxns: make(map[btcwire.ShaHash]bool), requestedBlocks: make(map[btcwire.ShaHash]bool), requestQueue: list.New(), invSendQueue: list.New(), outputQueue: make(chan btcwire.Message, outputBufferSize), outputInvChan: make(chan *btcwire.InvVect, outputBufferSize), + txProcessed: make(chan bool, 1), blockProcessed: make(chan bool, 1), quit: make(chan bool), }