diff --git a/blockmanager.go b/blockmanager.go index 311e4190..7e9173c0 100644 --- a/blockmanager.go +++ b/blockmanager.go @@ -1131,7 +1131,7 @@ func (b *blockManager) handleNotifyMsg(notification *btcchain.Notification) { // Generate the inventory vector and relay it. iv := btcwire.NewInvVect(btcwire.InvTypeBlock, hash) - b.server.RelayInventory(iv) + b.server.RelayInventory(iv, nil) // A block has been connected to the main block chain. case btcchain.NTBlockConnected: diff --git a/mempool.go b/mempool.go index 2929b2fe..c0575f65 100644 --- a/mempool.go +++ b/mempool.go @@ -1123,7 +1123,7 @@ func (mp *txMemPool) processOrphans(hash *btcwire.ShaHash) error { // Generate and relay the inventory vector for the // newly accepted transaction. iv := btcwire.NewInvVect(btcwire.InvTypeTx, tx.Sha()) - mp.server.RelayInventory(iv) + mp.server.RelayInventory(iv, tx) } else { // Transaction is still an orphan. // TODO(jrick): This removeOrphan call is @@ -1175,7 +1175,7 @@ func (mp *txMemPool) ProcessTransaction(tx *btcutil.Tx, allowOrphan, rateLimit b if len(missingParents) == 0 { // Generate the inventory vector and relay it. iv := btcwire.NewInvVect(btcwire.InvTypeTx, tx.Sha()) - mp.server.RelayInventory(iv) + mp.server.RelayInventory(iv, tx) // Accept any orphan transactions that depend on this // transaction (they may no longer be orphans if all inputs diff --git a/rpcserver.go b/rpcserver.go index 2c2bc4cf..668b29ce 100644 --- a/rpcserver.go +++ b/rpcserver.go @@ -2999,7 +2999,7 @@ func handleSendRawTransaction(s *rpcServer, cmd btcjson.Cmd, closeChan <-chan st // We keep track of all the sendrawtransaction request txs so that we // can rebroadcast them if they don't make their way into a block. iv := btcwire.NewInvVect(btcwire.InvTypeTx, tx.Sha()) - s.server.AddRebroadcastInventory(iv) + s.server.AddRebroadcastInventory(iv, tx) return tx.Sha().String(), nil } diff --git a/server.go b/server.go index 96a12b62..61a1d972 100644 --- a/server.go +++ b/server.go @@ -24,6 +24,7 @@ import ( "github.com/btcsuite/btcd/database" "github.com/btcsuite/btcjson" "github.com/btcsuite/btcnet" + "github.com/btcsuite/btcutil" "github.com/btcsuite/btcwire" ) @@ -56,12 +57,19 @@ type broadcastMsg struct { // broadcastInventoryAdd is a type used to declare that the InvVect it contains // needs to be added to the rebroadcast map -type broadcastInventoryAdd *btcwire.InvVect +type broadcastInventoryAdd relayMsg // broadcastInventoryDel is a type used to declare that the InvVect it contains // needs to be removed from the rebroadcast map type broadcastInventoryDel *btcwire.InvVect +// relayMsg packages an inventory vector along with the newly discovered +// inventory so the relay has access to that information. +type relayMsg struct { + invVect *btcwire.InvVect + data interface{} +} + // server provides a bitcoin server for handling communications to and from // bitcoin peers. type server struct { @@ -85,7 +93,7 @@ type server struct { banPeers chan *peer wakeup chan struct{} query chan interface{} - relayInv chan *btcwire.InvVect + relayInv chan relayMsg broadcast chan broadcastMsg wg sync.WaitGroup quit chan struct{} @@ -123,13 +131,13 @@ func randomUint16Number(max uint16) uint16 { // AddRebroadcastInventory adds 'iv' to the list of inventories to be // rebroadcasted at random intervals until they show up in a block. -func (s *server) AddRebroadcastInventory(iv *btcwire.InvVect) { +func (s *server) AddRebroadcastInventory(iv *btcwire.InvVect, data interface{}) { // Ignore if shutting down. if atomic.LoadInt32(&s.shutdown) != 0 { return } - s.modifyRebroadcastInv <- broadcastInventoryAdd(iv) + s.modifyRebroadcastInv <- broadcastInventoryAdd{invVect: iv, data: data} } // RemoveRebroadcastInventory removes 'iv' from the list of items to be @@ -287,13 +295,13 @@ func (s *server) handleBanPeerMsg(state *peerState, p *peer) { // handleRelayInvMsg deals with relaying inventory to peers that are not already // known to have it. It is invoked from the peerHandler goroutine. -func (s *server) handleRelayInvMsg(state *peerState, iv *btcwire.InvVect) { +func (s *server) handleRelayInvMsg(state *peerState, msg relayMsg) { state.forAllPeers(func(p *peer) { if !p.Connected() { return } - if iv.Type == btcwire.InvTypeTx { + if msg.invVect.Type == btcwire.InvTypeTx { // Don't relay the transaction to the peer when it has // transaction relaying disabled. if p.RelayTxDisabled() { @@ -303,11 +311,10 @@ func (s *server) handleRelayInvMsg(state *peerState, iv *btcwire.InvVect) { // Don't relay the transaction if there is a bloom // filter loaded and the transaction doesn't match it. if p.filter.IsLoaded() { - tx, err := s.txMemPool.FetchTransaction(&iv.Hash) - if err != nil { - peerLog.Warnf("Attempt to relay tx %s "+ - "that is not in the memory pool", - iv.Hash) + tx, ok := msg.data.(*btcutil.Tx) + if !ok { + peerLog.Warnf("Underlying data for tx" + + " inv relay is not a transaction") return } @@ -320,7 +327,7 @@ func (s *server) handleRelayInvMsg(state *peerState, iv *btcwire.InvVect) { // Queue the inventory to be relayed with the next batch. // It will be ignored if the peer is already known to // have the inventory. - p.QueueInventory(iv) + p.QueueInventory(msg.invVect) }) } @@ -717,8 +724,8 @@ func (s *server) BanPeer(p *peer) { // RelayInventory relays the passed inventory to all connected peers that are // not already known to have it. -func (s *server) RelayInventory(invVect *btcwire.InvVect) { - s.relayInv <- invVect +func (s *server) RelayInventory(invVect *btcwire.InvVect, data interface{}) { + s.relayInv <- relayMsg{invVect: invVect, data: data} } // BroadcastMessage sends msg to all peers currently connected to the server @@ -811,7 +818,7 @@ func (s *server) NetTotals() (uint64, uint64) { func (s *server) rebroadcastHandler() { // Wait 5 min before first tx rebroadcast. timer := time.NewTimer(5 * time.Minute) - pendingInvs := make(map[btcwire.InvVect]struct{}) + pendingInvs := make(map[btcwire.InvVect]interface{}) out: for { @@ -820,7 +827,7 @@ out: switch msg := riv.(type) { // Incoming InvVects are added to our map of RPC txs. case broadcastInventoryAdd: - pendingInvs[*msg] = struct{}{} + pendingInvs[*msg.invVect] = msg.data // When an InvVect has been added to a block, we can // now remove it, if it was present. @@ -833,9 +840,9 @@ out: case <-timer.C: // Any inventory we have has not made it into a block // yet. We periodically resubmit them until they have. - for iv := range pendingInvs { + for iv, data := range pendingInvs { ivCopy := iv - s.RelayInventory(&ivCopy) + s.RelayInventory(&ivCopy, data) } // Process at a random time up to 30mins (in seconds) @@ -1225,7 +1232,7 @@ func newServer(listenAddrs []string, db database.Db, netParams *btcnet.Params) ( banPeers: make(chan *peer, cfg.MaxPeers), wakeup: make(chan struct{}), query: make(chan interface{}), - relayInv: make(chan *btcwire.InvVect, cfg.MaxPeers), + relayInv: make(chan relayMsg, cfg.MaxPeers), broadcast: make(chan broadcastMsg, cfg.MaxPeers), quit: make(chan struct{}), modifyRebroadcastInv: make(chan interface{}),