Move transaction handling into block manager.

The block manager handles inventory messges to know which inventory should
be requested based on what is already known and what is already in flight.
So, this commit adds logic to ask the transaction memory pool if the
transaction is already known before requesting it and tracks pending
requests into an in-flight transaction map owned by the block manager.

It also moves the transaction processing into the block manager so the
in-flight map can be properly cleaned.
This commit is contained in:
Dave Collins 2013-10-08 10:47:00 -05:00
parent e8d73d83ef
commit a0119b056e
3 changed files with 129 additions and 27 deletions

View file

@ -54,7 +54,7 @@ type donePeerMsg struct {
// txMsg packages a bitcoin tx message and the peer it came from together // txMsg packages a bitcoin tx message and the peer it came from together
// so the block handler has access to that information. // so the block handler has access to that information.
type txMsg struct { type txMsg struct {
msg *btcwire.MsgTx tx *btcwire.MsgTx
peer *peer peer *peer
} }
@ -66,6 +66,7 @@ type blockManager struct {
shutdown int32 shutdown int32
blockChain *btcchain.BlockChain blockChain *btcchain.BlockChain
blockPeer map[btcwire.ShaHash]*peer blockPeer map[btcwire.ShaHash]*peer
requestedTxns map[btcwire.ShaHash]bool
requestedBlocks map[btcwire.ShaHash]bool requestedBlocks map[btcwire.ShaHash]bool
receivedLogBlocks int64 receivedLogBlocks int64
receivedLogTx int64 receivedLogTx int64
@ -200,7 +201,13 @@ func (b *blockManager) handleDonePeerMsg(peers *list.List, p *peer) {
log.Infof("[BMGR] Lost peer %s", p) log.Infof("[BMGR] Lost peer %s", p)
// remove requested blocks from the global map so that they will be // Remove requested transactions from the global map so that they will
// be fetched from elsewhere next time we get an inv.
for k := range p.requestedTxns {
delete(b.requestedTxns, k)
}
// Remove requested blocks from the global map so that they will be
// fetched from elsewhere next time we get an inv. // fetched from elsewhere next time we get an inv.
// TODO(oga) we could possibly here check which peers have these blocks // TODO(oga) we could possibly here check which peers have these blocks
// and request them now to speed things up a little. // and request them now to speed things up a little.
@ -247,6 +254,44 @@ func (b *blockManager) logBlockHeight(numTx, height int64) {
b.lastBlockLogTime = now b.lastBlockLogTime = now
} }
// handleTxMsg handles transaction messages from all peers.
func (b *blockManager) handleTxMsg(tmsg *txMsg) {
// Keep track of which peer the tx was sent from.
txHash, _ := tmsg.tx.TxSha()
// If we didn't ask for this block then the peer is misbehaving.
if _, ok := tmsg.peer.requestedTxns[txHash]; !ok {
log.Warnf("[BMGR] Got unrequested transaction %v from %s -- "+
"disconnecting", &txHash, tmsg.peer.addr)
tmsg.peer.Disconnect()
return
}
// Process the transaction to include validation, insertion in the
// memory pool, orphan handling, etc.
err := tmsg.peer.server.txMemPool.ProcessTransaction(tmsg.tx)
// Remove transaction from request maps. Either the mempool/chain
// already knows about it and as such we shouldn't have any more
// instances of trying to fetch it, or we failed to insert and thus
// we'll retry next time we get an inv.
delete(tmsg.peer.requestedTxns, txHash)
delete(b.requestedTxns, txHash)
if err != nil {
// When the error is a rule error, it means the transaction was
// simply rejected as opposed to something actually going wrong,
// so log it as such. Otherwise, something really did go wrong,
// so log it as an actual error.
if _, ok := err.(TxRuleError); ok {
log.Debugf("Rejected transaction %v: %v", txHash, err)
} else {
log.Errorf("Failed to process transaction %v: %v", txHash, err)
}
return
}
}
// handleBlockMsg handles block messages from all peers. // handleBlockMsg handles block messages from all peers.
func (b *blockManager) handleBlockMsg(bmsg *blockMsg) { func (b *blockManager) handleBlockMsg(bmsg *blockMsg) {
// Keep track of which peer the block was sent from so the notification // Keep track of which peer the block was sent from so the notification
@ -312,6 +357,35 @@ func (b *blockManager) handleBlockMsg(bmsg *blockMsg) {
b.server.db.Sync() b.server.db.Sync()
} }
// haveInventory returns whether or not the inventory represented by the passed
// inventory vector is known. This includes checking all of the various places
// inventory can be when it is in different states such as blocks that are part
// of the main chain, on a side chain, in the orphan pool, and transactions that
// in the memory pool (either the main pool or orphan pool).
func (b *blockManager) haveInventory(invVect *btcwire.InvVect) bool {
switch invVect.Type {
case btcwire.InvVect_Block:
// Ask chain if the block is known to it in any form (main
// chain, side chain, or orphan).
return b.blockChain.HaveBlock(&invVect.Hash)
case btcwire.InvVect_Tx:
// Ask the transaction memory pool if the transaction is known
// to it in any form (main pool or orphan).
if b.server.txMemPool.IsTransactionInPool(&invVect.Hash) {
return true
}
// Check if the transaction exists from the point of view of the
// end of the main chain.
return b.server.db.ExistsTxSha(&invVect.Hash)
}
// The requested inventory is is an unsupported type, so just claim
// it is known to avoid requesting it.
return true
}
// handleInvMsg handles inv messages from all peers. // handleInvMsg handles inv messages from all peers.
// We examine the inventory advertised by the remote peer and act accordingly. // We examine the inventory advertised by the remote peer and act accordingly.
// //
@ -351,7 +425,7 @@ func (b *blockManager) handleInvMsg(imsg *invMsg) {
imsg.peer.addKnownInventory(iv) imsg.peer.addKnownInventory(iv)
// Request the inventory if we don't already have it. // Request the inventory if we don't already have it.
if !chain.HaveInventory(iv) { if !b.haveInventory(iv) {
// Add it to the request queue. // Add it to the request queue.
imsg.peer.requestQueue.PushBack(iv) imsg.peer.requestQueue.PushBack(iv)
continue continue
@ -406,13 +480,27 @@ func (b *blockManager) handleInvMsg(imsg *invMsg) {
for e := requestQueue.Front(); e != nil; e = requestQueue.Front() { for e := requestQueue.Front(); e != nil; e = requestQueue.Front() {
iv := e.Value.(*btcwire.InvVect) iv := e.Value.(*btcwire.InvVect)
imsg.peer.requestQueue.Remove(e) imsg.peer.requestQueue.Remove(e)
// check that no one else has asked for this. if so we don't
// need to ask. switch iv.Type {
if _, exists := b.requestedBlocks[iv.Hash]; !exists { case btcwire.InvVect_Block:
b.requestedBlocks[iv.Hash] = true // Request the block if there is not already a pending
imsg.peer.requestedBlocks[iv.Hash] = true // request.
gdmsg.AddInvVect(iv) if _, exists := b.requestedBlocks[iv.Hash]; !exists {
numRequested++ b.requestedBlocks[iv.Hash] = true
imsg.peer.requestedBlocks[iv.Hash] = true
gdmsg.AddInvVect(iv)
numRequested++
}
case btcwire.InvVect_Tx:
// Request the transaction if there is not already a
// pending request.
if _, exists := b.requestedTxns[iv.Hash]; !exists {
b.requestedTxns[iv.Hash] = true
imsg.peer.requestedTxns[iv.Hash] = true
gdmsg.AddInvVect(iv)
numRequested++
}
} }
if numRequested >= btcwire.MaxInvPerMsg { if numRequested >= btcwire.MaxInvPerMsg {
@ -440,6 +528,10 @@ out:
case *newPeerMsg: case *newPeerMsg:
b.handleNewPeerMsg(candidatePeers, msg.peer) b.handleNewPeerMsg(candidatePeers, msg.peer)
case *txMsg:
b.handleTxMsg(msg)
msg.peer.txProcessed <- true
case *blockMsg: case *blockMsg:
b.handleBlockMsg(msg) b.handleBlockMsg(msg)
msg.peer.blockProcessed <- true msg.peer.blockProcessed <- true
@ -554,6 +646,18 @@ func (b *blockManager) NewPeer(p *peer) {
b.msgChan <- &newPeerMsg{peer: p} b.msgChan <- &newPeerMsg{peer: p}
} }
// QueueTx adds the passed transaction message and peer to the block handling
// queue.
func (b *blockManager) QueueTx(tx *btcwire.MsgTx, p *peer) {
// Don't accept more transactions if we're shutting down.
if atomic.LoadInt32(&b.shutdown) != 0 {
p.txProcessed <- false
return
}
b.msgChan <- &txMsg{tx: tx, peer: p}
}
// QueueBlock adds the passed block message and peer to the block handling queue. // QueueBlock adds the passed block message and peer to the block handling queue.
func (b *blockManager) QueueBlock(block *btcutil.Block, p *peer) { func (b *blockManager) QueueBlock(block *btcutil.Block, p *peer) {
// Don't accept more blocks if we're shutting down. // Don't accept more blocks if we're shutting down.
@ -619,6 +723,7 @@ func newBlockManager(s *server) (*blockManager, error) {
bm := blockManager{ bm := blockManager{
server: s, server: s,
blockPeer: make(map[btcwire.ShaHash]*peer), blockPeer: make(map[btcwire.ShaHash]*peer),
requestedTxns: make(map[btcwire.ShaHash]bool),
requestedBlocks: make(map[btcwire.ShaHash]bool), requestedBlocks: make(map[btcwire.ShaHash]bool),
lastBlockLogTime: time.Now(), lastBlockLogTime: time.Now(),
msgChan: make(chan interface{}, cfg.MaxPeers*3), msgChan: make(chan interface{}, cfg.MaxPeers*3),

View file

@ -407,9 +407,9 @@ func (mp *txMemPool) maybeAddOrphan(tx *btcwire.MsgTx, txHash *btcwire.ShaHash)
return nil return nil
} }
// isTransactionInPool returns whether or not the passed transaction already // IsTransactionInPool returns whether or not the passed transaction already
// exists in the memory pool. // exists in the memory pool.
func (mp *txMemPool) isTransactionInPool(hash *btcwire.ShaHash) bool { func (mp *txMemPool) IsTransactionInPool(hash *btcwire.ShaHash) bool {
mp.lock.RLock() mp.lock.RLock()
defer mp.lock.RUnlock() defer mp.lock.RUnlock()
@ -529,7 +529,7 @@ func (mp *txMemPool) maybeAcceptTransaction(tx *btcwire.MsgTx, isOrphan *bool) e
// be a quick check to weed out duplicates. It is more expensive to // be a quick check to weed out duplicates. It is more expensive to
// detect a duplicate transaction in the main chain, so that is done // detect a duplicate transaction in the main chain, so that is done
// later. // later.
if mp.isTransactionInPool(&txHash) { if mp.IsTransactionInPool(&txHash) {
str := fmt.Sprintf("already have transaction %v", txHash) str := fmt.Sprintf("already have transaction %v", txHash)
return TxRuleError(str) return TxRuleError(str)
} }

25
peer.go
View file

@ -118,6 +118,7 @@ type peer struct {
knownAddresses map[string]bool knownAddresses map[string]bool
knownInventory *MruInventoryMap knownInventory *MruInventoryMap
knownInvMutex sync.Mutex knownInvMutex sync.Mutex
requestedTxns map[btcwire.ShaHash]bool // owned by blockmanager.
requestedBlocks map[btcwire.ShaHash]bool // owned by blockmanager. requestedBlocks map[btcwire.ShaHash]bool // owned by blockmanager.
lastBlock int32 lastBlock int32
retrycount int64 retrycount int64
@ -129,6 +130,7 @@ type peer struct {
continueHash *btcwire.ShaHash continueHash *btcwire.ShaHash
outputQueue chan btcwire.Message outputQueue chan btcwire.Message
outputInvChan chan *btcwire.InvVect outputInvChan chan *btcwire.InvVect
txProcessed chan bool
blockProcessed chan bool blockProcessed chan bool
quit chan bool quit chan bool
} }
@ -437,20 +439,13 @@ func (p *peer) handleTxMsg(msg *btcwire.MsgTx) {
iv := btcwire.NewInvVect(btcwire.InvTypeTx, &hash) iv := btcwire.NewInvVect(btcwire.InvTypeTx, &hash)
p.addKnownInventory(iv) p.addKnownInventory(iv)
// Process the transaction. // Queue the transaction up to be handled by the block manager and
err = p.server.txMemPool.ProcessTransaction(msg) // intentionally block further receives until the transaction is fully
if err != nil { // processed and known good or bad. This helps prevent a malicious peer
// When the error is a rule error, it means the transaction was // from queueing up a bunch of bad transactions before disconnecting (or
// simply rejected as opposed to something actually going wrong, // being disconnected) and wasting memory.
// so log it as such. Otherwise, something really did go wrong, p.server.blockManager.QueueTx(msg, p)
// so log it as an actual error. <-p.txProcessed
if _, ok := err.(TxRuleError); ok {
log.Debugf("Rejected transaction %v: %v", hash, err)
} else {
log.Errorf("Failed to process transaction %v: %v", hash, err)
}
return
}
} }
// handleBlockMsg is invoked when a peer receives a block bitcoin message. It // handleBlockMsg is invoked when a peer receives a block bitcoin message. It
@ -1129,11 +1124,13 @@ func newPeerBase(s *server, inbound bool) *peer {
inbound: inbound, inbound: inbound,
knownAddresses: make(map[string]bool), knownAddresses: make(map[string]bool),
knownInventory: NewMruInventoryMap(maxKnownInventory), knownInventory: NewMruInventoryMap(maxKnownInventory),
requestedTxns: make(map[btcwire.ShaHash]bool),
requestedBlocks: make(map[btcwire.ShaHash]bool), requestedBlocks: make(map[btcwire.ShaHash]bool),
requestQueue: list.New(), requestQueue: list.New(),
invSendQueue: list.New(), invSendQueue: list.New(),
outputQueue: make(chan btcwire.Message, outputBufferSize), outputQueue: make(chan btcwire.Message, outputBufferSize),
outputInvChan: make(chan *btcwire.InvVect, outputBufferSize), outputInvChan: make(chan *btcwire.InvVect, outputBufferSize),
txProcessed: make(chan bool, 1),
blockProcessed: make(chan bool, 1), blockProcessed: make(chan bool, 1),
quit: make(chan bool), quit: make(chan bool),
} }