Relay inv msgs now include underlying data object

* When an inv is to be sent to the server for relaying, the sender
already has access to the underlying data. So
instead of requiring the relay to look up the data by
hash, the data is now coupled in the request message.
This commit is contained in:
Olaoluwa Osuntokun 2015-01-28 21:22:27 -08:00
parent 3b1a15d0d5
commit c01d175fde
4 changed files with 30 additions and 23 deletions

View file

@ -1131,7 +1131,7 @@ func (b *blockManager) handleNotifyMsg(notification *btcchain.Notification) {
// Generate the inventory vector and relay it.
iv := btcwire.NewInvVect(btcwire.InvTypeBlock, hash)
b.server.RelayInventory(iv)
b.server.RelayInventory(iv, nil)
// A block has been connected to the main block chain.
case btcchain.NTBlockConnected:

View file

@ -1123,7 +1123,7 @@ func (mp *txMemPool) processOrphans(hash *btcwire.ShaHash) error {
// Generate and relay the inventory vector for the
// newly accepted transaction.
iv := btcwire.NewInvVect(btcwire.InvTypeTx, tx.Sha())
mp.server.RelayInventory(iv)
mp.server.RelayInventory(iv, tx)
} else {
// Transaction is still an orphan.
// TODO(jrick): This removeOrphan call is
@ -1175,7 +1175,7 @@ func (mp *txMemPool) ProcessTransaction(tx *btcutil.Tx, allowOrphan, rateLimit b
if len(missingParents) == 0 {
// Generate the inventory vector and relay it.
iv := btcwire.NewInvVect(btcwire.InvTypeTx, tx.Sha())
mp.server.RelayInventory(iv)
mp.server.RelayInventory(iv, tx)
// Accept any orphan transactions that depend on this
// transaction (they may no longer be orphans if all inputs

View file

@ -2999,7 +2999,7 @@ func handleSendRawTransaction(s *rpcServer, cmd btcjson.Cmd, closeChan <-chan st
// We keep track of all the sendrawtransaction request txs so that we
// can rebroadcast them if they don't make their way into a block.
iv := btcwire.NewInvVect(btcwire.InvTypeTx, tx.Sha())
s.server.AddRebroadcastInventory(iv)
s.server.AddRebroadcastInventory(iv, tx)
return tx.Sha().String(), nil
}

View file

@ -24,6 +24,7 @@ import (
"github.com/btcsuite/btcd/database"
"github.com/btcsuite/btcjson"
"github.com/btcsuite/btcnet"
"github.com/btcsuite/btcutil"
"github.com/btcsuite/btcwire"
)
@ -56,12 +57,19 @@ type broadcastMsg struct {
// broadcastInventoryAdd is a type used to declare that the InvVect it contains
// needs to be added to the rebroadcast map
type broadcastInventoryAdd *btcwire.InvVect
type broadcastInventoryAdd relayMsg
// broadcastInventoryDel is a type used to declare that the InvVect it contains
// needs to be removed from the rebroadcast map
type broadcastInventoryDel *btcwire.InvVect
// relayMsg packages an inventory vector along with the newly discovered
// inventory so the relay has access to that information.
type relayMsg struct {
invVect *btcwire.InvVect
data interface{}
}
// server provides a bitcoin server for handling communications to and from
// bitcoin peers.
type server struct {
@ -85,7 +93,7 @@ type server struct {
banPeers chan *peer
wakeup chan struct{}
query chan interface{}
relayInv chan *btcwire.InvVect
relayInv chan relayMsg
broadcast chan broadcastMsg
wg sync.WaitGroup
quit chan struct{}
@ -123,13 +131,13 @@ func randomUint16Number(max uint16) uint16 {
// AddRebroadcastInventory adds 'iv' to the list of inventories to be
// rebroadcasted at random intervals until they show up in a block.
func (s *server) AddRebroadcastInventory(iv *btcwire.InvVect) {
func (s *server) AddRebroadcastInventory(iv *btcwire.InvVect, data interface{}) {
// Ignore if shutting down.
if atomic.LoadInt32(&s.shutdown) != 0 {
return
}
s.modifyRebroadcastInv <- broadcastInventoryAdd(iv)
s.modifyRebroadcastInv <- broadcastInventoryAdd{invVect: iv, data: data}
}
// RemoveRebroadcastInventory removes 'iv' from the list of items to be
@ -287,13 +295,13 @@ func (s *server) handleBanPeerMsg(state *peerState, p *peer) {
// handleRelayInvMsg deals with relaying inventory to peers that are not already
// known to have it. It is invoked from the peerHandler goroutine.
func (s *server) handleRelayInvMsg(state *peerState, iv *btcwire.InvVect) {
func (s *server) handleRelayInvMsg(state *peerState, msg relayMsg) {
state.forAllPeers(func(p *peer) {
if !p.Connected() {
return
}
if iv.Type == btcwire.InvTypeTx {
if msg.invVect.Type == btcwire.InvTypeTx {
// Don't relay the transaction to the peer when it has
// transaction relaying disabled.
if p.RelayTxDisabled() {
@ -303,11 +311,10 @@ func (s *server) handleRelayInvMsg(state *peerState, iv *btcwire.InvVect) {
// Don't relay the transaction if there is a bloom
// filter loaded and the transaction doesn't match it.
if p.filter.IsLoaded() {
tx, err := s.txMemPool.FetchTransaction(&iv.Hash)
if err != nil {
peerLog.Warnf("Attempt to relay tx %s "+
"that is not in the memory pool",
iv.Hash)
tx, ok := msg.data.(*btcutil.Tx)
if !ok {
peerLog.Warnf("Underlying data for tx" +
" inv relay is not a transaction")
return
}
@ -320,7 +327,7 @@ func (s *server) handleRelayInvMsg(state *peerState, iv *btcwire.InvVect) {
// Queue the inventory to be relayed with the next batch.
// It will be ignored if the peer is already known to
// have the inventory.
p.QueueInventory(iv)
p.QueueInventory(msg.invVect)
})
}
@ -717,8 +724,8 @@ func (s *server) BanPeer(p *peer) {
// RelayInventory relays the passed inventory to all connected peers that are
// not already known to have it.
func (s *server) RelayInventory(invVect *btcwire.InvVect) {
s.relayInv <- invVect
func (s *server) RelayInventory(invVect *btcwire.InvVect, data interface{}) {
s.relayInv <- relayMsg{invVect: invVect, data: data}
}
// BroadcastMessage sends msg to all peers currently connected to the server
@ -811,7 +818,7 @@ func (s *server) NetTotals() (uint64, uint64) {
func (s *server) rebroadcastHandler() {
// Wait 5 min before first tx rebroadcast.
timer := time.NewTimer(5 * time.Minute)
pendingInvs := make(map[btcwire.InvVect]struct{})
pendingInvs := make(map[btcwire.InvVect]interface{})
out:
for {
@ -820,7 +827,7 @@ out:
switch msg := riv.(type) {
// Incoming InvVects are added to our map of RPC txs.
case broadcastInventoryAdd:
pendingInvs[*msg] = struct{}{}
pendingInvs[*msg.invVect] = msg.data
// When an InvVect has been added to a block, we can
// now remove it, if it was present.
@ -833,9 +840,9 @@ out:
case <-timer.C:
// Any inventory we have has not made it into a block
// yet. We periodically resubmit them until they have.
for iv := range pendingInvs {
for iv, data := range pendingInvs {
ivCopy := iv
s.RelayInventory(&ivCopy)
s.RelayInventory(&ivCopy, data)
}
// Process at a random time up to 30mins (in seconds)
@ -1225,7 +1232,7 @@ func newServer(listenAddrs []string, db database.Db, netParams *btcnet.Params) (
banPeers: make(chan *peer, cfg.MaxPeers),
wakeup: make(chan struct{}),
query: make(chan interface{}),
relayInv: make(chan *btcwire.InvVect, cfg.MaxPeers),
relayInv: make(chan relayMsg, cfg.MaxPeers),
broadcast: make(chan broadcastMsg, cfg.MaxPeers),
quit: make(chan struct{}),
modifyRebroadcastInv: make(chan interface{}),