main: Reduce shared state between server and blockManager.

Instead of having both server and blockManager be aware of the
txProcessed and blockProcessed channels, now the server passed them as
method arguments to blockProcessor.
This commit is contained in:
Jim Posen 2017-08-14 19:22:21 -07:00 committed by Dave Collins
parent 1b50c7300f
commit 28606122c3
2 changed files with 19 additions and 14 deletions

View file

@ -59,6 +59,7 @@ type newPeerMsg struct {
type blockMsg struct { type blockMsg struct {
block *btcutil.Block block *btcutil.Block
peer *serverPeer peer *serverPeer
reply chan struct{}
} }
// invMsg packages a bitcoin inv message and the peer it came from together // invMsg packages a bitcoin inv message and the peer it came from together
@ -83,8 +84,9 @@ type donePeerMsg struct {
// txMsg packages a bitcoin tx message and the peer it came from together // txMsg packages a bitcoin tx message and the peer it came from together
// so the block handler has access to that information. // so the block handler has access to that information.
type txMsg struct { type txMsg struct {
tx *btcutil.Tx tx *btcutil.Tx
peer *serverPeer peer *serverPeer
reply chan struct{}
} }
// getSyncPeerMsg is a message type to be sent across the message channel for // getSyncPeerMsg is a message type to be sent across the message channel for
@ -1126,11 +1128,11 @@ out:
case *txMsg: case *txMsg:
b.handleTxMsg(msg) b.handleTxMsg(msg)
msg.peer.txProcessed <- struct{}{} msg.reply <- struct{}{}
case *blockMsg: case *blockMsg:
b.handleBlockMsg(msg) b.handleBlockMsg(msg)
msg.peer.blockProcessed <- struct{}{} msg.reply <- struct{}{}
case *invMsg: case *invMsg:
b.handleInvMsg(msg) b.handleInvMsg(msg)
@ -1261,26 +1263,29 @@ func (b *blockManager) NewPeer(sp *serverPeer) {
} }
// QueueTx adds the passed transaction message and peer to the block handling // QueueTx adds the passed transaction message and peer to the block handling
// queue. // queue. Responds to the done channel argument after the tx message is
func (b *blockManager) QueueTx(tx *btcutil.Tx, sp *serverPeer) { // processed.
func (b *blockManager) QueueTx(tx *btcutil.Tx, sp *serverPeer, done chan struct{}) {
// Don't accept more transactions if we're shutting down. // Don't accept more transactions if we're shutting down.
if atomic.LoadInt32(&b.shutdown) != 0 { if atomic.LoadInt32(&b.shutdown) != 0 {
sp.txProcessed <- struct{}{} done <- struct{}{}
return return
} }
b.msgChan <- &txMsg{tx: tx, peer: sp} b.msgChan <- &txMsg{tx: tx, peer: sp, reply: done}
} }
// QueueBlock adds the passed block message and peer to the block handling queue. // QueueBlock adds the passed block message and peer to the block handling
func (b *blockManager) QueueBlock(block *btcutil.Block, sp *serverPeer) { // queue. Responds to the done channel argument after the block message is
// processed.
func (b *blockManager) QueueBlock(block *btcutil.Block, sp *serverPeer, done chan struct{}) {
// Don't accept more blocks if we're shutting down. // Don't accept more blocks if we're shutting down.
if atomic.LoadInt32(&b.shutdown) != 0 { if atomic.LoadInt32(&b.shutdown) != 0 {
sp.blockProcessed <- struct{}{} done <- struct{}{}
return return
} }
b.msgChan <- &blockMsg{block: block, peer: sp} b.msgChan <- &blockMsg{block: block, peer: sp, reply: done}
} }
// QueueInv adds the passed inv message and peer to the block handling queue. // QueueInv adds the passed inv message and peer to the block handling queue.

View file

@ -485,7 +485,7 @@ func (sp *serverPeer) OnTx(_ *peer.Peer, msg *wire.MsgTx) {
// processed and known good or bad. This helps prevent a malicious peer // processed and known good or bad. This helps prevent a malicious peer
// from queuing up a bunch of bad transactions before disconnecting (or // from queuing up a bunch of bad transactions before disconnecting (or
// being disconnected) and wasting memory. // being disconnected) and wasting memory.
sp.server.blockManager.QueueTx(tx, sp) sp.server.blockManager.QueueTx(tx, sp, sp.txProcessed)
<-sp.txProcessed <-sp.txProcessed
} }
@ -511,7 +511,7 @@ func (sp *serverPeer) OnBlock(_ *peer.Peer, msg *wire.MsgBlock, buf []byte) {
// reference implementation processes blocks in the same // reference implementation processes blocks in the same
// thread and therefore blocks further messages until // thread and therefore blocks further messages until
// the bitcoin block has been fully processed. // the bitcoin block has been fully processed.
sp.server.blockManager.QueueBlock(block, sp) sp.server.blockManager.QueueBlock(block, sp, sp.blockProcessed)
<-sp.blockProcessed <-sp.blockProcessed
} }