From 28606122c3fc8aff9dc04fd1ba682df80e436491 Mon Sep 17 00:00:00 2001 From: Jim Posen Date: Mon, 14 Aug 2017 19:22:21 -0700 Subject: [PATCH] main: Reduce shared state between server and blockManager. Instead of having both server and blockManager be aware of the txProcessed and blockProcessed channels, now the server passed them as method arguments to blockProcessor. --- blockmanager.go | 29 +++++++++++++++++------------ server.go | 4 ++-- 2 files changed, 19 insertions(+), 14 deletions(-) diff --git a/blockmanager.go b/blockmanager.go index 2557dc37..f7ab2a85 100644 --- a/blockmanager.go +++ b/blockmanager.go @@ -59,6 +59,7 @@ type newPeerMsg struct { type blockMsg struct { block *btcutil.Block peer *serverPeer + reply chan struct{} } // invMsg packages a bitcoin inv message and the peer it came from together @@ -83,8 +84,9 @@ type donePeerMsg struct { // txMsg packages a bitcoin tx message and the peer it came from together // so the block handler has access to that information. type txMsg struct { - tx *btcutil.Tx - peer *serverPeer + tx *btcutil.Tx + peer *serverPeer + reply chan struct{} } // getSyncPeerMsg is a message type to be sent across the message channel for @@ -1126,11 +1128,11 @@ out: case *txMsg: b.handleTxMsg(msg) - msg.peer.txProcessed <- struct{}{} + msg.reply <- struct{}{} case *blockMsg: b.handleBlockMsg(msg) - msg.peer.blockProcessed <- struct{}{} + msg.reply <- struct{}{} case *invMsg: b.handleInvMsg(msg) @@ -1261,26 +1263,29 @@ func (b *blockManager) NewPeer(sp *serverPeer) { } // QueueTx adds the passed transaction message and peer to the block handling -// queue. -func (b *blockManager) QueueTx(tx *btcutil.Tx, sp *serverPeer) { +// queue. Responds to the done channel argument after the tx message is +// processed. +func (b *blockManager) QueueTx(tx *btcutil.Tx, sp *serverPeer, done chan struct{}) { // Don't accept more transactions if we're shutting down. if atomic.LoadInt32(&b.shutdown) != 0 { - sp.txProcessed <- struct{}{} + done <- struct{}{} return } - b.msgChan <- &txMsg{tx: tx, peer: sp} + b.msgChan <- &txMsg{tx: tx, peer: sp, reply: done} } -// QueueBlock adds the passed block message and peer to the block handling queue. -func (b *blockManager) QueueBlock(block *btcutil.Block, sp *serverPeer) { +// QueueBlock adds the passed block message and peer to the block handling +// queue. Responds to the done channel argument after the block message is +// processed. +func (b *blockManager) QueueBlock(block *btcutil.Block, sp *serverPeer, done chan struct{}) { // Don't accept more blocks if we're shutting down. if atomic.LoadInt32(&b.shutdown) != 0 { - sp.blockProcessed <- struct{}{} + done <- struct{}{} return } - b.msgChan <- &blockMsg{block: block, peer: sp} + b.msgChan <- &blockMsg{block: block, peer: sp, reply: done} } // QueueInv adds the passed inv message and peer to the block handling queue. diff --git a/server.go b/server.go index 945d9777..c17f79f0 100644 --- a/server.go +++ b/server.go @@ -485,7 +485,7 @@ func (sp *serverPeer) OnTx(_ *peer.Peer, msg *wire.MsgTx) { // processed and known good or bad. This helps prevent a malicious peer // from queuing up a bunch of bad transactions before disconnecting (or // being disconnected) and wasting memory. - sp.server.blockManager.QueueTx(tx, sp) + sp.server.blockManager.QueueTx(tx, sp, sp.txProcessed) <-sp.txProcessed } @@ -511,7 +511,7 @@ func (sp *serverPeer) OnBlock(_ *peer.Peer, msg *wire.MsgBlock, buf []byte) { // reference implementation processes blocks in the same // thread and therefore blocks further messages until // the bitcoin block has been fully processed. - sp.server.blockManager.QueueBlock(block, sp) + sp.server.blockManager.QueueBlock(block, sp, sp.blockProcessed) <-sp.blockProcessed }