diff --git a/blockmanager.go b/blockmanager.go index fb066e76..67f1a80f 100644 --- a/blockmanager.go +++ b/blockmanager.go @@ -285,9 +285,21 @@ func (b *blockManager) handleNotifyMsg(notification *btcchain.Notification) { break } + // A block has been accepted into the block chain. case btcchain.NTBlockAccepted: - // TODO(davec): Relay inventory, but don't relay old inventory - // during initial block download. + block, ok := notification.Data.(*btcutil.Block) + if !ok { + log.Warnf("[BMGR] Chain notification type not a block.") + break + } + + // It's ok to ignore the error here since the notification is + // coming from the chain code which has already cached the hash. + hash, _ := block.Sha() + + // Generate the inventory vector and relay it. + iv := btcwire.NewInvVect(btcwire.InvVect_Block, hash) + b.server.RelayInventory(iv) } } diff --git a/mruinvmap.go b/mruinvmap.go new file mode 100644 index 00000000..f58648c1 --- /dev/null +++ b/mruinvmap.go @@ -0,0 +1,82 @@ +// Copyright (c) 2013 Conformal Systems LLC. +// Use of this source code is governed by an ISC +// license that can be found in the LICENSE file. + +package main + +import ( + "fmt" + "github.com/conformal/btcwire" + "time" +) + +// MruInventoryMap provides a map that is limited to a maximum number of items +// with eviction for the oldest entry when the limit is exceeded. +type MruInventoryMap struct { + invMap map[btcwire.InvVect]int64 // Use int64 for time for less mem. + limit uint +} + +// String returns the map as a human-readable string. +func (m MruInventoryMap) String() string { + return fmt.Sprintf("<%d>%v", m.limit, m.invMap) +} + +// Exists returns whether or not the passed inventory item is in the map. +func (m *MruInventoryMap) Exists(iv *btcwire.InvVect) bool { + if _, exists := m.invMap[*iv]; exists { + return true + } + return false +} + +// Add adds the passed inventory to the map and handles eviction of the oldest +// item if adding the new item would exceed the max limit. +func (m *MruInventoryMap) Add(iv *btcwire.InvVect) { + // When the limit is zero, nothing can be added to the map, so just + // return + if m.limit == 0 { + return + } + + // When the entry already exists update its last seen time. + if m.Exists(iv) { + m.invMap[*iv] = time.Now().Unix() + return + } + + // Evict the oldest entry if the the new entry would exceed the size + // limit for the map. + if uint(len(m.invMap))+1 > m.limit { + var oldestEntry btcwire.InvVect + var oldestTime int64 + for iv, lastUpdated := range m.invMap { + if oldestTime == 0 || lastUpdated < oldestTime { + oldestEntry = iv + oldestTime = lastUpdated + } + } + + m.Delete(&oldestEntry) + } + + m.invMap[*iv] = time.Now().Unix() + return +} + +// Delete deletes the passed inventory item from the map (if it exists). +func (m *MruInventoryMap) Delete(iv *btcwire.InvVect) { + delete(m.invMap, *iv) +} + +// NewMruInventoryMap returns a new inventory map that is limited to the number +// of entries specified by limit. When the number of entries exceeds the limit, +// the oldest (least recently used) entry will be removed to make room for the +// new entry.. +func NewMruInventoryMap(limit uint) *MruInventoryMap { + m := MruInventoryMap{ + invMap: make(map[btcwire.InvVect]int64), + limit: limit, + } + return &m +} diff --git a/peer.go b/peer.go index 5d0863c7..7a3aa30f 100644 --- a/peer.go +++ b/peer.go @@ -21,7 +21,17 @@ import ( "time" ) -const outputBufferSize = 50 +const ( + outputBufferSize = 50 + + // invTrickleSize is the maximum amount of inventory to send in a single + // message when trickling inventory to remote peers. + maxInvTrickleSize = 1000 + + // maxKnownInventory is the maximum number of items to keep in the known + // inventory cache. + maxKnownInventory = 20000 +) // userAgent is the user agent string used to identify ourselves to other // bitcoin peers. @@ -92,15 +102,40 @@ type peer struct { persistent bool versionKnown bool knownAddresses map[string]bool + knownInventory *MruInventoryMap + knownInvMutex sync.Mutex lastBlock int32 requestQueue *list.List + invSendQueue *list.List continueHash *btcwire.ShaHash wg sync.WaitGroup outputQueue chan btcwire.Message + outputInvChan chan *btcwire.InvVect blockProcessed chan bool quit chan bool } +// isKnownInventory returns whether or not the peer is known to have the passed +// inventory. It is safe for concurrent access. +func (p *peer) isKnownInventory(invVect *btcwire.InvVect) bool { + p.knownInvMutex.Lock() + defer p.knownInvMutex.Unlock() + + if p.knownInventory.Exists(invVect) { + return true + } + return false +} + +// addKnownInventory adds the passed inventory to the cache of known inventory +// for the peer. It is safe for concurrent access. +func (p *peer) addKnownInventory(invVect *btcwire.InvVect) { + p.knownInvMutex.Lock() + defer p.knownInvMutex.Unlock() + + p.knownInventory.Add(invVect) +} + // pushVersionMsg sends a version message to the connected peer using the // current state. func (p *peer) pushVersionMsg() error { @@ -289,7 +324,7 @@ func (p *peer) pushBlockMsg(sha btcwire.ShaHash) error { return nil } -// pushGetBlocksMsg send a getblocks message for the provided block locator +// pushGetBlocksMsg sends a getblocks message for the provided block locator // and stop hash. func (p *peer) pushGetBlocksMsg(locator btcchain.BlockLocator, stopHash *btcwire.ShaHash) error { msg := btcwire.NewMsgGetBlocks(stopHash) @@ -303,6 +338,38 @@ func (p *peer) pushGetBlocksMsg(locator btcchain.BlockLocator, stopHash *btcwire return nil } +// handleBlockMsg is invoked when a peer receives a block bitcoin message. It +// blocks until the bitcoin block has been fully processed. +func (p *peer) handleBlockMsg(msg *btcwire.MsgBlock, buf []byte) { + // Convert the raw MsgBlock to a btcutil.Block which + // provides some convience methods and things such as + // hash caching. + block := btcutil.NewBlockFromBlockAndBytes(msg, buf) + + // Add the block to the known inventory for the peer. + hash, err := block.Sha() + if err != nil { + log.Errorf("Unable to get block hash: %v", err) + return + } + iv := btcwire.NewInvVect(btcwire.InvVect_Block, hash) + p.addKnownInventory(iv) + + // Queue the block up to be handled by the block + // manager and intentionally block further receives + // until the bitcoin block is fully processed and known + // good or bad. This helps prevent a malicious peer + // from queueing up a bunch of bad blocks before + // disconnecting (or being disconnected) and wasting + // memory. Additionally, this behavior is depended on + // by at least the block acceptance test tool as the + // reference implementation processes blocks in the same + // thread and therefore blocks further messages until + // the bitcoin block has been fully processed. + p.server.blockManager.QueueBlock(block, p) + <-p.blockProcessed +} + // handleInvMsg is invoked when a peer receives an inv bitcoin message and is // used to examine the inventory being advertised by the remote peer and react // accordingly. @@ -329,6 +396,11 @@ func (p *peer) handleInvMsg(msg *btcwire.MsgInv) { for i, iv := range invVects { switch iv.Type { case btcwire.InvVect_Block: + // Add the inventory to the cache of known inventory + // for the peer. + p.addKnownInventory(iv) + + // Request the inventory if we don't already have it. if !chain.HaveInventory(iv) { // Add it to the request queue. p.requestQueue.PushBack(iv) @@ -722,7 +794,12 @@ func (p *peer) readMessage() (msg btcwire.Message, buf []byte, err error) { } // writeMessage sends a bitcoin Message to the peer with logging. -func (p *peer) writeMessage(msg btcwire.Message) error { +func (p *peer) writeMessage(msg btcwire.Message) { + // Don't do anything if we're disconnecting. + if p.disconnect == true { + return + } + log.Debugf("[PEER] Sending command [%v] to %s", msg.Command(), p.conn.RemoteAddr()) @@ -743,9 +820,10 @@ func (p *peer) writeMessage(msg btcwire.Message) error { // Write the message to the peer. err := btcwire.WriteMessage(p.conn, msg, p.protocolVersion, p.btcnet) if err != nil { - return err + p.Disconnect() + log.Errorf("[PEER] %v", err) + return } - return nil } // isAllowedByRegression returns whether or not the passed error is allowed by @@ -828,20 +906,7 @@ out: p.server.BroadcastMessage(msg, p) case *btcwire.MsgBlock: - // Queue the block up to be handled by the block - // manager and intentionally block further receives - // until the bitcoin block is fully processed and known - // good or bad. This helps prevent a malicious peer - // from queueing up a bunch of bad blocks before - // disconnecting (or being disconnected) and wasting - // memory. Additionally, this behavior is depended on - // by at least the block acceptance test tool as the - // reference implementation processes blocks in the same - // thread and therefore blocks further messages until - // the bitcoin block has been fully processed. - block := btcutil.NewBlockFromBlockAndBytes(msg, buf) - p.server.blockManager.QueueBlock(block, p) - <-p.blockProcessed + p.handleBlockMsg(msg, buf) case *btcwire.MsgInv: p.handleInvMsg(msg) @@ -876,18 +941,48 @@ out: // goroutine. It uses a buffered channel to serialize output messages while // allowing the sender to continue running asynchronously. func (p *peer) outHandler() { + trickleTicker := time.NewTicker(time.Second * 10) out: for { select { case msg := <-p.outputQueue: - // Don't send anything if we're disconnected. - if p.disconnect { + p.writeMessage(msg) + + case iv := <-p.outputInvChan: + p.invSendQueue.PushBack(iv) + + case <-trickleTicker.C: + // Don't send anything if we're disconnecting or there + // is no queued inventory. + if p.disconnect || p.invSendQueue.Len() == 0 { continue } - err := p.writeMessage(msg) - if err != nil { - p.Disconnect() - log.Errorf("[PEER] %v", err) + + // Create a new inventory message, populate it with as + // much per + invMsg := btcwire.NewMsgInv() + for e := p.invSendQueue.Front(); e != nil; e = p.invSendQueue.Front() { + iv := p.invSendQueue.Remove(e).(*btcwire.InvVect) + + // Don't send inventory that became known after + // the initial check. + if p.isKnownInventory(iv) { + continue + } + + invMsg.AddInvVect(iv) + if len(invMsg.InvList) >= maxInvTrickleSize { + p.writeMessage(invMsg) + invMsg = btcwire.NewMsgInv() + } + + // Add the inventory that is being relayed to + // the known inventory for the peer. + p.addKnownInventory(iv) + } + if len(invMsg.InvList) > 0 { + log.Infof("Relaying %d inv to %v", len(invMsg.InvList), p.conn.RemoteAddr()) + p.writeMessage(invMsg) } case <-p.quit: @@ -905,6 +1000,20 @@ func (p *peer) QueueMessage(msg btcwire.Message) { p.outputQueue <- msg } +// QueueInventory adds the passed inventory to the inventory send queue which +// might not be sent right away, rather it is trickled to the peer in batches. +// Inventory that the peer is already known to have is ignored. It is safe for +// concurrent access. +func (p *peer) QueueInventory(invVect *btcwire.InvVect) { + // Don't add the inventory to the send queue if the peer is + // already known to have it. + if p.isKnownInventory(invVect) { + return + } + + p.outputInvChan <- invVect +} + // Start begins processing input and output messages. It also sends the initial // version message for outbound connections to start the negotiation process. func (p *peer) Start() error { @@ -962,8 +1071,11 @@ func newPeer(s *server, conn net.Conn, inbound bool, persistent bool) *peer { inbound: inbound, persistent: persistent, knownAddresses: make(map[string]bool), + knownInventory: NewMruInventoryMap(maxKnownInventory), requestQueue: list.New(), + invSendQueue: list.New(), outputQueue: make(chan btcwire.Message, outputBufferSize), + outputInvChan: make(chan *btcwire.InvVect, outputBufferSize), blockProcessed: make(chan bool, 1), quit: make(chan bool), } diff --git a/server.go b/server.go index 0aaf4533..fbad5b84 100644 --- a/server.go +++ b/server.go @@ -53,6 +53,7 @@ type server struct { newPeers chan *peer donePeers chan *peer banPeers chan *peer + relayInv chan *btcwire.InvVect broadcast chan broadcastMsg wg sync.WaitGroup quit chan bool @@ -143,6 +144,24 @@ func (s *server) handleBanPeerMsg(banned map[string]time.Time, p *peer) { } +// handleRelayInvMsg deals with relaying inventory to peer that are not already +// known to have it. It is invoked from the peerHandler goroutine. +func (s *server) handleRelayInvMsg(peers *list.List, iv *btcwire.InvVect) { + // TODO(davec): Don't relay inventory during the initial block chain + // download. + + // Loop through all connected peers and relay the inventory to those + // which are not already known to have it. + for e := peers.Front(); e != nil; e = e.Next() { + p := e.Value.(*peer) + + // Queue the inventory to be relayed with the next batch. It + // will be ignored if the peer is already known to have the + // inventory. + p.QueueInventory(iv) + } +} + // handleBroadcastMsg deals with broadcasting messages to peers. It is invoked // from the peerHandler goroutine. func (s *server) handleBroadcastMsg(peers *list.List, bmsg *broadcastMsg) { @@ -158,7 +177,6 @@ func (s *server) handleBroadcastMsg(peers *list.List, bmsg *broadcastMsg) { p.QueueMessage(bmsg.message) } } - } // listenHandler is the main listener which accepts incoming connections for the @@ -211,6 +229,10 @@ func (s *server) peerHandler() { case p := <-s.banPeers: s.handleBanPeerMsg(bannedPeers, p) + // New inventory to potentially be relayed to other peers. + case invMsg := <-s.relayInv: + s.handleRelayInvMsg(peers, invMsg) + // Message to broadcast to all connected peers except those // which are excluded by the message. case bmsg := <-s.broadcast: @@ -242,6 +264,12 @@ func (s *server) BanPeer(p *peer) { s.banPeers <- p } +// RelayInventory relays the passed inventory to all connected peers that are +// not already known to have it. +func (s *server) RelayInventory(invVect *btcwire.InvVect) { + s.relayInv <- invVect +} + // BroadcastMessage sends msg to all peers currently connected to the server // except those in the passed peers to exclude. func (s *server) BroadcastMessage(msg btcwire.Message, exclPeers ...*peer) { @@ -437,6 +465,7 @@ func newServer(addr string, db btcdb.Db, btcnet btcwire.BitcoinNet) (*server, er newPeers: make(chan *peer, cfg.MaxPeers), donePeers: make(chan *peer, cfg.MaxPeers), banPeers: make(chan *peer, cfg.MaxPeers), + relayInv: make(chan *btcwire.InvVect, cfg.MaxPeers), broadcast: make(chan broadcastMsg, cfg.MaxPeers), quit: make(chan bool), db: db,