From 17a9b41bef9ed73569680a29fffd0357f0d50dab Mon Sep 17 00:00:00 2001 From: Dave Collins Date: Tue, 24 Dec 2013 10:32:20 -0600 Subject: [PATCH] Cleanup peer.go. This commit does some housekeeping on peer.go to make the code more consistent, correct a few comments, and add new comments to explain the peer data flow. A couple of examples are variables not using the standard Go style (camelCase) and comments that don't match the style of other comments. --- blockmanager.go | 2 +- peer.go | 99 ++++++++++++++++++++++++++++++------------------- 2 files changed, 62 insertions(+), 39 deletions(-) diff --git a/blockmanager.go b/blockmanager.go index a285a776..8150241e 100644 --- a/blockmanager.go +++ b/blockmanager.go @@ -528,7 +528,7 @@ func (b *blockManager) handleInvMsg(imsg *invMsg) { // Add the inventory to the cache of known inventory // for the peer. - imsg.peer.addKnownInventory(iv) + imsg.peer.AddKnownInventory(iv) if b.fetchingHeaders { // if we are fetching headers and already know diff --git a/peer.go b/peer.go index 68d8adfd..25a6b38a 100644 --- a/peer.go +++ b/peer.go @@ -97,12 +97,29 @@ func newNetAddress(addr net.Addr, services btcwire.ServiceFlag) (*btcwire.NetAdd return na, nil } +// TODO(davec): Rename and comment this type outMsg struct { msg btcwire.Message doneChan chan bool } -// peer provides a bitcoin peer for handling bitcoin communications. +// peer provides a bitcoin peer for handling bitcoin communications. The +// overall data flow is split into 3 goroutines and a separate block manager. +// Inbound messages are read via the inHandler goroutine and generally +// dispatched to their own handler. For inbound data-related messages such as +// blocks, transactions, and inventory, the data is pased on to the block +// manager to handle it. Outbound messages are queued via QueueMessage or +// QueueInventory. QueueMessage is intended for all messages, including +// responses to data such as blocks and transactions. QueueInventory, on the +// other hand, is only intended for relaying inventory as it employs a trickling +// mechanism to batch the inventory together. The data flow for outbound +// messages uses two goroutines, queueHandler and outHandler. The first, +// queueHandler, is used as a way for external entities (mainly block manager) +// to queue messages quickly regardless of whether the peer is currently +// sending or not. It acts as the traffic cop between the external world and +// the actual goroutine which writes to the network socket. In addition, the +// peer contains several functions which are of the form pushX, that are used +// to push messages to the peer. Internally they use QueueMessage. type peer struct { server *server protocolVersion uint32 @@ -123,23 +140,23 @@ type peer struct { knownAddresses map[string]bool knownInventory *MruInventoryMap knownInvMutex sync.Mutex - requestedTxns map[btcwire.ShaHash]bool // owned by blockmanager. - requestedBlocks map[btcwire.ShaHash]bool // owned by blockmanager. + requestedTxns map[btcwire.ShaHash]bool // owned by blockmanager + requestedBlocks map[btcwire.ShaHash]bool // owned by blockmanager lastBlock int32 - retrycount int64 - prevGetBlocksBegin *btcwire.ShaHash // owned by blockmanager. - prevGetBlocksStop *btcwire.ShaHash // owned by blockmaanger. + retryCount int64 + prevGetBlocksBegin *btcwire.ShaHash // owned by blockmanager + prevGetBlocksStop *btcwire.ShaHash // owned by blockmanager requestQueue *list.List continueHash *btcwire.ShaHash outputQueue chan outMsg sendQueue chan outMsg sendDoneQueue chan bool queueWg sync.WaitGroup // TODO(oga) wg -> single use channel? - outputInvChan chan *btcwire.InvVect - txProcessed chan bool - blockProcessed chan bool - quit chan bool - userAgent string + outputInvChan chan *btcwire.InvVect + txProcessed chan bool + blockProcessed chan bool + quit chan bool + userAgent string } // String returns the peer's address and directionality as a human-readable @@ -160,9 +177,9 @@ func (p *peer) isKnownInventory(invVect *btcwire.InvVect) bool { return false } -// addKnownInventory adds the passed inventory to the cache of known inventory +// AddKnownInventory adds the passed inventory to the cache of known inventory // for the peer. It is safe for concurrent access. -func (p *peer) addKnownInventory(invVect *btcwire.InvVect) { +func (p *peer) AddKnownInventory(invVect *btcwire.InvVect) { p.knownInvMutex.Lock() defer p.knownInvMutex.Unlock() @@ -506,7 +523,7 @@ func (p *peer) handleTxMsg(msg *btcwire.MsgTx) { // methods and things such as hash caching. tx := btcutil.NewTx(msg) iv := btcwire.NewInvVect(btcwire.InvTypeTx, tx.Sha()) - p.addKnownInventory(iv) + p.AddKnownInventory(iv) // Queue the transaction up to be handled by the block manager and // intentionally block further receives until the transaction is fully @@ -531,7 +548,7 @@ func (p *peer) handleBlockMsg(msg *btcwire.MsgBlock, buf []byte) { return } iv := btcwire.NewInvVect(btcwire.InvTypeBlock, hash) - p.addKnownInventory(iv) + p.AddKnownInventory(iv) // Queue the block up to be handled by the block // manager and intentionally block further receives @@ -1122,12 +1139,12 @@ out: idleTimer.Stop() - // Ensure connection is closed and notify server and block manager that - // the peer is done. + // Ensure connection is closed and notify the server that the peer is + // done. p.Disconnect() - p.server.donePeers <- p - // Only tell blockmanager we are gone if we ever told it we existed. + + // Only tell block manager we are gone if we ever told it we existed. if p.versionKnown { p.server.blockManager.DonePeer(p) } @@ -1145,10 +1162,10 @@ func (p *peer) queueHandler() { trickleTicker := time.NewTicker(time.Second * 10) // We keep the waiting flag so that we know if we have a message queued - // to the outHandler or not. We could use the presence of a head - // of the list for this but then we have rather racy concerns about - // whether it has gotten it at cleanup time - and thus who sends on the - // message's done channel. To avoid such confusion we keep a different + // to the outHandler or not. We could use the presence of a head of + // the list for this but then we have rather racy concerns about whether + // it has gotten it at cleanup time - and thus who sends on the + // message's done channel. To avoid such confusion we keep a different // flag and pendingMsgs only contains messages that we have not yet // passed to outHandler. waiting := false @@ -1171,22 +1188,28 @@ out: case msg := <-p.outputQueue: waiting = queuePacket(msg, pendingMsgs, waiting) + // This channel is notified when a message has been sent across + // the network socket. case <-p.sendDoneQueue: peerLog.Tracef("%s: acked by outhandler", p) - // one message on sendChan means one message on - // txDoneChan when it has been sent. + + // No longer waiting if there are no more messages + // in the pending messages queue. next := pendingMsgs.Front() - if next != nil { - val := pendingMsgs.Remove(next) - peerLog.Tracef("%s: sending to outHandler", p) - p.sendQueue <- val.(outMsg) - peerLog.Tracef("%s: sent to outHandler", p) - } else { + if next == nil { waiting = false + continue } + // Notify the outHandler about the next item to + // asynchronously send. + val := pendingMsgs.Remove(next) + peerLog.Tracef("%s: sending to outHandler", p) + p.sendQueue <- val.(outMsg) + peerLog.Tracef("%s: sent to outHandler", p) + case iv := <-p.outputInvChan: - // No handshake? They'll find out soon enough. + // No handshake? They'll find out soon enough. if p.versionKnown { invSendQueue.PushBack(iv) } @@ -1222,7 +1245,7 @@ out: // Add the inventory that is being relayed to // the known inventory for the peer. - p.addKnownInventory(iv) + p.AddKnownInventory(iv) } if len(invMsg.InvList) > 0 { waiting = queuePacket(outMsg{msg: invMsg}, @@ -1462,8 +1485,8 @@ func newPeerBase(s *server, inbound bool) *peer { requestedBlocks: make(map[btcwire.ShaHash]bool), requestQueue: list.New(), outputQueue: make(chan outMsg, outputBufferSize), - sendQueue: make(chan outMsg, 1), // nonblocking sync. - sendDoneQueue: make(chan bool, 1), // nonblocking sync. + sendQueue: make(chan outMsg, 1), // nonblocking sync + sendDoneQueue: make(chan bool, 1), // nonblocking sync outputInvChan: make(chan *btcwire.InvVect, outputBufferSize), txProcessed: make(chan bool, 1), blockProcessed: make(chan bool, 1), @@ -1543,14 +1566,14 @@ func newOutboundPeer(s *server, addr string, persistent bool) *peer { srvrLog.Debugf("Attempting to connect to %s", faddr) conn, err := dial("tcp", addr) if err != nil { - p.retrycount += 1 + p.retryCount += 1 srvrLog.Debugf("Failed to connect to %s: %v", faddr, err) if !persistent { p.server.donePeers <- p return } - scaledInterval := connectionRetryInterval.Nanoseconds() * p.retrycount / 2 + scaledInterval := connectionRetryInterval.Nanoseconds() * p.retryCount / 2 scaledDuration := time.Duration(scaledInterval) srvrLog.Debugf("Retrying connection to %s in "+ "%s", faddr, scaledDuration) @@ -1570,7 +1593,7 @@ func newOutboundPeer(s *server, addr string, persistent bool) *peer { conn.RemoteAddr()) p.conn = conn atomic.AddInt32(&p.connected, 1) - p.retrycount = 0 + p.retryCount = 0 p.Start() }