Add block inventory relay.
This commit adds support for relaying blocks between peers. It keeps track of inventory that has either already been advertised to remote peers or advertised by remote peers using a size-limited most recently used cache. This helps avoid relaying inventory the peer already knows as much as possible while not allowing rogue peers to eat up arbitrary amounts of memory with bogus inventory.
This commit is contained in:
parent
adf7149838
commit
121f7a47d4
4 changed files with 263 additions and 28 deletions
|
@ -285,9 +285,21 @@ func (b *blockManager) handleNotifyMsg(notification *btcchain.Notification) {
|
|||
break
|
||||
}
|
||||
|
||||
// A block has been accepted into the block chain.
|
||||
case btcchain.NTBlockAccepted:
|
||||
// TODO(davec): Relay inventory, but don't relay old inventory
|
||||
// during initial block download.
|
||||
block, ok := notification.Data.(*btcutil.Block)
|
||||
if !ok {
|
||||
log.Warnf("[BMGR] Chain notification type not a block.")
|
||||
break
|
||||
}
|
||||
|
||||
// It's ok to ignore the error here since the notification is
|
||||
// coming from the chain code which has already cached the hash.
|
||||
hash, _ := block.Sha()
|
||||
|
||||
// Generate the inventory vector and relay it.
|
||||
iv := btcwire.NewInvVect(btcwire.InvVect_Block, hash)
|
||||
b.server.RelayInventory(iv)
|
||||
}
|
||||
}
|
||||
|
||||
|
|
82
mruinvmap.go
Normal file
82
mruinvmap.go
Normal file
|
@ -0,0 +1,82 @@
|
|||
// Copyright (c) 2013 Conformal Systems LLC.
|
||||
// Use of this source code is governed by an ISC
|
||||
// license that can be found in the LICENSE file.
|
||||
|
||||
package main
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"github.com/conformal/btcwire"
|
||||
"time"
|
||||
)
|
||||
|
||||
// MruInventoryMap provides a map that is limited to a maximum number of items
|
||||
// with eviction for the oldest entry when the limit is exceeded.
|
||||
type MruInventoryMap struct {
|
||||
invMap map[btcwire.InvVect]int64 // Use int64 for time for less mem.
|
||||
limit uint
|
||||
}
|
||||
|
||||
// String returns the map as a human-readable string.
|
||||
func (m MruInventoryMap) String() string {
|
||||
return fmt.Sprintf("<%d>%v", m.limit, m.invMap)
|
||||
}
|
||||
|
||||
// Exists returns whether or not the passed inventory item is in the map.
|
||||
func (m *MruInventoryMap) Exists(iv *btcwire.InvVect) bool {
|
||||
if _, exists := m.invMap[*iv]; exists {
|
||||
return true
|
||||
}
|
||||
return false
|
||||
}
|
||||
|
||||
// Add adds the passed inventory to the map and handles eviction of the oldest
|
||||
// item if adding the new item would exceed the max limit.
|
||||
func (m *MruInventoryMap) Add(iv *btcwire.InvVect) {
|
||||
// When the limit is zero, nothing can be added to the map, so just
|
||||
// return
|
||||
if m.limit == 0 {
|
||||
return
|
||||
}
|
||||
|
||||
// When the entry already exists update its last seen time.
|
||||
if m.Exists(iv) {
|
||||
m.invMap[*iv] = time.Now().Unix()
|
||||
return
|
||||
}
|
||||
|
||||
// Evict the oldest entry if the the new entry would exceed the size
|
||||
// limit for the map.
|
||||
if uint(len(m.invMap))+1 > m.limit {
|
||||
var oldestEntry btcwire.InvVect
|
||||
var oldestTime int64
|
||||
for iv, lastUpdated := range m.invMap {
|
||||
if oldestTime == 0 || lastUpdated < oldestTime {
|
||||
oldestEntry = iv
|
||||
oldestTime = lastUpdated
|
||||
}
|
||||
}
|
||||
|
||||
m.Delete(&oldestEntry)
|
||||
}
|
||||
|
||||
m.invMap[*iv] = time.Now().Unix()
|
||||
return
|
||||
}
|
||||
|
||||
// Delete deletes the passed inventory item from the map (if it exists).
|
||||
func (m *MruInventoryMap) Delete(iv *btcwire.InvVect) {
|
||||
delete(m.invMap, *iv)
|
||||
}
|
||||
|
||||
// NewMruInventoryMap returns a new inventory map that is limited to the number
|
||||
// of entries specified by limit. When the number of entries exceeds the limit,
|
||||
// the oldest (least recently used) entry will be removed to make room for the
|
||||
// new entry..
|
||||
func NewMruInventoryMap(limit uint) *MruInventoryMap {
|
||||
m := MruInventoryMap{
|
||||
invMap: make(map[btcwire.InvVect]int64),
|
||||
limit: limit,
|
||||
}
|
||||
return &m
|
||||
}
|
162
peer.go
162
peer.go
|
@ -21,7 +21,17 @@ import (
|
|||
"time"
|
||||
)
|
||||
|
||||
const outputBufferSize = 50
|
||||
const (
|
||||
outputBufferSize = 50
|
||||
|
||||
// invTrickleSize is the maximum amount of inventory to send in a single
|
||||
// message when trickling inventory to remote peers.
|
||||
maxInvTrickleSize = 1000
|
||||
|
||||
// maxKnownInventory is the maximum number of items to keep in the known
|
||||
// inventory cache.
|
||||
maxKnownInventory = 20000
|
||||
)
|
||||
|
||||
// userAgent is the user agent string used to identify ourselves to other
|
||||
// bitcoin peers.
|
||||
|
@ -92,15 +102,40 @@ type peer struct {
|
|||
persistent bool
|
||||
versionKnown bool
|
||||
knownAddresses map[string]bool
|
||||
knownInventory *MruInventoryMap
|
||||
knownInvMutex sync.Mutex
|
||||
lastBlock int32
|
||||
requestQueue *list.List
|
||||
invSendQueue *list.List
|
||||
continueHash *btcwire.ShaHash
|
||||
wg sync.WaitGroup
|
||||
outputQueue chan btcwire.Message
|
||||
outputInvChan chan *btcwire.InvVect
|
||||
blockProcessed chan bool
|
||||
quit chan bool
|
||||
}
|
||||
|
||||
// isKnownInventory returns whether or not the peer is known to have the passed
|
||||
// inventory. It is safe for concurrent access.
|
||||
func (p *peer) isKnownInventory(invVect *btcwire.InvVect) bool {
|
||||
p.knownInvMutex.Lock()
|
||||
defer p.knownInvMutex.Unlock()
|
||||
|
||||
if p.knownInventory.Exists(invVect) {
|
||||
return true
|
||||
}
|
||||
return false
|
||||
}
|
||||
|
||||
// addKnownInventory adds the passed inventory to the cache of known inventory
|
||||
// for the peer. It is safe for concurrent access.
|
||||
func (p *peer) addKnownInventory(invVect *btcwire.InvVect) {
|
||||
p.knownInvMutex.Lock()
|
||||
defer p.knownInvMutex.Unlock()
|
||||
|
||||
p.knownInventory.Add(invVect)
|
||||
}
|
||||
|
||||
// pushVersionMsg sends a version message to the connected peer using the
|
||||
// current state.
|
||||
func (p *peer) pushVersionMsg() error {
|
||||
|
@ -289,7 +324,7 @@ func (p *peer) pushBlockMsg(sha btcwire.ShaHash) error {
|
|||
return nil
|
||||
}
|
||||
|
||||
// pushGetBlocksMsg send a getblocks message for the provided block locator
|
||||
// pushGetBlocksMsg sends a getblocks message for the provided block locator
|
||||
// and stop hash.
|
||||
func (p *peer) pushGetBlocksMsg(locator btcchain.BlockLocator, stopHash *btcwire.ShaHash) error {
|
||||
msg := btcwire.NewMsgGetBlocks(stopHash)
|
||||
|
@ -303,6 +338,38 @@ func (p *peer) pushGetBlocksMsg(locator btcchain.BlockLocator, stopHash *btcwire
|
|||
return nil
|
||||
}
|
||||
|
||||
// handleBlockMsg is invoked when a peer receives a block bitcoin message. It
|
||||
// blocks until the bitcoin block has been fully processed.
|
||||
func (p *peer) handleBlockMsg(msg *btcwire.MsgBlock, buf []byte) {
|
||||
// Convert the raw MsgBlock to a btcutil.Block which
|
||||
// provides some convience methods and things such as
|
||||
// hash caching.
|
||||
block := btcutil.NewBlockFromBlockAndBytes(msg, buf)
|
||||
|
||||
// Add the block to the known inventory for the peer.
|
||||
hash, err := block.Sha()
|
||||
if err != nil {
|
||||
log.Errorf("Unable to get block hash: %v", err)
|
||||
return
|
||||
}
|
||||
iv := btcwire.NewInvVect(btcwire.InvVect_Block, hash)
|
||||
p.addKnownInventory(iv)
|
||||
|
||||
// Queue the block up to be handled by the block
|
||||
// manager and intentionally block further receives
|
||||
// until the bitcoin block is fully processed and known
|
||||
// good or bad. This helps prevent a malicious peer
|
||||
// from queueing up a bunch of bad blocks before
|
||||
// disconnecting (or being disconnected) and wasting
|
||||
// memory. Additionally, this behavior is depended on
|
||||
// by at least the block acceptance test tool as the
|
||||
// reference implementation processes blocks in the same
|
||||
// thread and therefore blocks further messages until
|
||||
// the bitcoin block has been fully processed.
|
||||
p.server.blockManager.QueueBlock(block, p)
|
||||
<-p.blockProcessed
|
||||
}
|
||||
|
||||
// handleInvMsg is invoked when a peer receives an inv bitcoin message and is
|
||||
// used to examine the inventory being advertised by the remote peer and react
|
||||
// accordingly.
|
||||
|
@ -329,6 +396,11 @@ func (p *peer) handleInvMsg(msg *btcwire.MsgInv) {
|
|||
for i, iv := range invVects {
|
||||
switch iv.Type {
|
||||
case btcwire.InvVect_Block:
|
||||
// Add the inventory to the cache of known inventory
|
||||
// for the peer.
|
||||
p.addKnownInventory(iv)
|
||||
|
||||
// Request the inventory if we don't already have it.
|
||||
if !chain.HaveInventory(iv) {
|
||||
// Add it to the request queue.
|
||||
p.requestQueue.PushBack(iv)
|
||||
|
@ -722,7 +794,12 @@ func (p *peer) readMessage() (msg btcwire.Message, buf []byte, err error) {
|
|||
}
|
||||
|
||||
// writeMessage sends a bitcoin Message to the peer with logging.
|
||||
func (p *peer) writeMessage(msg btcwire.Message) error {
|
||||
func (p *peer) writeMessage(msg btcwire.Message) {
|
||||
// Don't do anything if we're disconnecting.
|
||||
if p.disconnect == true {
|
||||
return
|
||||
}
|
||||
|
||||
log.Debugf("[PEER] Sending command [%v] to %s", msg.Command(),
|
||||
p.conn.RemoteAddr())
|
||||
|
||||
|
@ -743,9 +820,10 @@ func (p *peer) writeMessage(msg btcwire.Message) error {
|
|||
// Write the message to the peer.
|
||||
err := btcwire.WriteMessage(p.conn, msg, p.protocolVersion, p.btcnet)
|
||||
if err != nil {
|
||||
return err
|
||||
p.Disconnect()
|
||||
log.Errorf("[PEER] %v", err)
|
||||
return
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// isAllowedByRegression returns whether or not the passed error is allowed by
|
||||
|
@ -828,20 +906,7 @@ out:
|
|||
p.server.BroadcastMessage(msg, p)
|
||||
|
||||
case *btcwire.MsgBlock:
|
||||
// Queue the block up to be handled by the block
|
||||
// manager and intentionally block further receives
|
||||
// until the bitcoin block is fully processed and known
|
||||
// good or bad. This helps prevent a malicious peer
|
||||
// from queueing up a bunch of bad blocks before
|
||||
// disconnecting (or being disconnected) and wasting
|
||||
// memory. Additionally, this behavior is depended on
|
||||
// by at least the block acceptance test tool as the
|
||||
// reference implementation processes blocks in the same
|
||||
// thread and therefore blocks further messages until
|
||||
// the bitcoin block has been fully processed.
|
||||
block := btcutil.NewBlockFromBlockAndBytes(msg, buf)
|
||||
p.server.blockManager.QueueBlock(block, p)
|
||||
<-p.blockProcessed
|
||||
p.handleBlockMsg(msg, buf)
|
||||
|
||||
case *btcwire.MsgInv:
|
||||
p.handleInvMsg(msg)
|
||||
|
@ -876,18 +941,48 @@ out:
|
|||
// goroutine. It uses a buffered channel to serialize output messages while
|
||||
// allowing the sender to continue running asynchronously.
|
||||
func (p *peer) outHandler() {
|
||||
trickleTicker := time.NewTicker(time.Second * 10)
|
||||
out:
|
||||
for {
|
||||
select {
|
||||
case msg := <-p.outputQueue:
|
||||
// Don't send anything if we're disconnected.
|
||||
if p.disconnect {
|
||||
p.writeMessage(msg)
|
||||
|
||||
case iv := <-p.outputInvChan:
|
||||
p.invSendQueue.PushBack(iv)
|
||||
|
||||
case <-trickleTicker.C:
|
||||
// Don't send anything if we're disconnecting or there
|
||||
// is no queued inventory.
|
||||
if p.disconnect || p.invSendQueue.Len() == 0 {
|
||||
continue
|
||||
}
|
||||
err := p.writeMessage(msg)
|
||||
if err != nil {
|
||||
p.Disconnect()
|
||||
log.Errorf("[PEER] %v", err)
|
||||
|
||||
// Create a new inventory message, populate it with as
|
||||
// much per
|
||||
invMsg := btcwire.NewMsgInv()
|
||||
for e := p.invSendQueue.Front(); e != nil; e = p.invSendQueue.Front() {
|
||||
iv := p.invSendQueue.Remove(e).(*btcwire.InvVect)
|
||||
|
||||
// Don't send inventory that became known after
|
||||
// the initial check.
|
||||
if p.isKnownInventory(iv) {
|
||||
continue
|
||||
}
|
||||
|
||||
invMsg.AddInvVect(iv)
|
||||
if len(invMsg.InvList) >= maxInvTrickleSize {
|
||||
p.writeMessage(invMsg)
|
||||
invMsg = btcwire.NewMsgInv()
|
||||
}
|
||||
|
||||
// Add the inventory that is being relayed to
|
||||
// the known inventory for the peer.
|
||||
p.addKnownInventory(iv)
|
||||
}
|
||||
if len(invMsg.InvList) > 0 {
|
||||
log.Infof("Relaying %d inv to %v", len(invMsg.InvList), p.conn.RemoteAddr())
|
||||
p.writeMessage(invMsg)
|
||||
}
|
||||
|
||||
case <-p.quit:
|
||||
|
@ -905,6 +1000,20 @@ func (p *peer) QueueMessage(msg btcwire.Message) {
|
|||
p.outputQueue <- msg
|
||||
}
|
||||
|
||||
// QueueInventory adds the passed inventory to the inventory send queue which
|
||||
// might not be sent right away, rather it is trickled to the peer in batches.
|
||||
// Inventory that the peer is already known to have is ignored. It is safe for
|
||||
// concurrent access.
|
||||
func (p *peer) QueueInventory(invVect *btcwire.InvVect) {
|
||||
// Don't add the inventory to the send queue if the peer is
|
||||
// already known to have it.
|
||||
if p.isKnownInventory(invVect) {
|
||||
return
|
||||
}
|
||||
|
||||
p.outputInvChan <- invVect
|
||||
}
|
||||
|
||||
// Start begins processing input and output messages. It also sends the initial
|
||||
// version message for outbound connections to start the negotiation process.
|
||||
func (p *peer) Start() error {
|
||||
|
@ -962,8 +1071,11 @@ func newPeer(s *server, conn net.Conn, inbound bool, persistent bool) *peer {
|
|||
inbound: inbound,
|
||||
persistent: persistent,
|
||||
knownAddresses: make(map[string]bool),
|
||||
knownInventory: NewMruInventoryMap(maxKnownInventory),
|
||||
requestQueue: list.New(),
|
||||
invSendQueue: list.New(),
|
||||
outputQueue: make(chan btcwire.Message, outputBufferSize),
|
||||
outputInvChan: make(chan *btcwire.InvVect, outputBufferSize),
|
||||
blockProcessed: make(chan bool, 1),
|
||||
quit: make(chan bool),
|
||||
}
|
||||
|
|
31
server.go
31
server.go
|
@ -53,6 +53,7 @@ type server struct {
|
|||
newPeers chan *peer
|
||||
donePeers chan *peer
|
||||
banPeers chan *peer
|
||||
relayInv chan *btcwire.InvVect
|
||||
broadcast chan broadcastMsg
|
||||
wg sync.WaitGroup
|
||||
quit chan bool
|
||||
|
@ -143,6 +144,24 @@ func (s *server) handleBanPeerMsg(banned map[string]time.Time, p *peer) {
|
|||
|
||||
}
|
||||
|
||||
// handleRelayInvMsg deals with relaying inventory to peer that are not already
|
||||
// known to have it. It is invoked from the peerHandler goroutine.
|
||||
func (s *server) handleRelayInvMsg(peers *list.List, iv *btcwire.InvVect) {
|
||||
// TODO(davec): Don't relay inventory during the initial block chain
|
||||
// download.
|
||||
|
||||
// Loop through all connected peers and relay the inventory to those
|
||||
// which are not already known to have it.
|
||||
for e := peers.Front(); e != nil; e = e.Next() {
|
||||
p := e.Value.(*peer)
|
||||
|
||||
// Queue the inventory to be relayed with the next batch. It
|
||||
// will be ignored if the peer is already known to have the
|
||||
// inventory.
|
||||
p.QueueInventory(iv)
|
||||
}
|
||||
}
|
||||
|
||||
// handleBroadcastMsg deals with broadcasting messages to peers. It is invoked
|
||||
// from the peerHandler goroutine.
|
||||
func (s *server) handleBroadcastMsg(peers *list.List, bmsg *broadcastMsg) {
|
||||
|
@ -158,7 +177,6 @@ func (s *server) handleBroadcastMsg(peers *list.List, bmsg *broadcastMsg) {
|
|||
p.QueueMessage(bmsg.message)
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
// listenHandler is the main listener which accepts incoming connections for the
|
||||
|
@ -211,6 +229,10 @@ func (s *server) peerHandler() {
|
|||
case p := <-s.banPeers:
|
||||
s.handleBanPeerMsg(bannedPeers, p)
|
||||
|
||||
// New inventory to potentially be relayed to other peers.
|
||||
case invMsg := <-s.relayInv:
|
||||
s.handleRelayInvMsg(peers, invMsg)
|
||||
|
||||
// Message to broadcast to all connected peers except those
|
||||
// which are excluded by the message.
|
||||
case bmsg := <-s.broadcast:
|
||||
|
@ -242,6 +264,12 @@ func (s *server) BanPeer(p *peer) {
|
|||
s.banPeers <- p
|
||||
}
|
||||
|
||||
// RelayInventory relays the passed inventory to all connected peers that are
|
||||
// not already known to have it.
|
||||
func (s *server) RelayInventory(invVect *btcwire.InvVect) {
|
||||
s.relayInv <- invVect
|
||||
}
|
||||
|
||||
// BroadcastMessage sends msg to all peers currently connected to the server
|
||||
// except those in the passed peers to exclude.
|
||||
func (s *server) BroadcastMessage(msg btcwire.Message, exclPeers ...*peer) {
|
||||
|
@ -437,6 +465,7 @@ func newServer(addr string, db btcdb.Db, btcnet btcwire.BitcoinNet) (*server, er
|
|||
newPeers: make(chan *peer, cfg.MaxPeers),
|
||||
donePeers: make(chan *peer, cfg.MaxPeers),
|
||||
banPeers: make(chan *peer, cfg.MaxPeers),
|
||||
relayInv: make(chan *btcwire.InvVect, cfg.MaxPeers),
|
||||
broadcast: make(chan broadcastMsg, cfg.MaxPeers),
|
||||
quit: make(chan bool),
|
||||
db: db,
|
||||
|
|
Loading…
Reference in a new issue