diff --git a/peer.go b/peer.go index 26e15e02..5f567fe1 100644 --- a/peer.go +++ b/peer.go @@ -18,6 +18,7 @@ import ( "github.com/conformal/btcd/addrmgr" "github.com/conformal/btcdb" "github.com/conformal/btcutil" + "github.com/conformal/btcutil/bloom" "github.com/conformal/btcwire" socks "github.com/conformal/go-socks" "github.com/davecgh/go-spew/spew" @@ -158,6 +159,9 @@ type peer struct { prevGetHdrsBegin *btcwire.ShaHash // owned by blockmanager prevGetHdrsStop *btcwire.ShaHash // owned by blockmanager requestQueue *list.List + filter *bloom.Filter + relayMtx sync.Mutex + disableRelayTx bool continueHash *btcwire.ShaHash outputQueue chan outMsg sendQueue chan outMsg @@ -228,6 +232,15 @@ func (p *peer) ProtocolVersion() uint32 { return p.protocolVersion } +// RelayTxDisabled returns whether or not relaying of transactions is disabled. +// It is safe for concurrent access. +func (p *peer) RelayTxDisabled() bool { + p.relayMtx.Lock() + defer p.relayMtx.Unlock() + + return p.disableRelayTx +} + // pushVersionMsg sends a version message to the connected peer using the // current state. func (p *peer) pushVersionMsg() error { @@ -364,6 +377,12 @@ func (p *peer) handleVersionMsg(msg *btcwire.MsgVersion) { p.StatsMtx.Unlock() + // Choose whether or not to relay transactions before a filter command + // is received. + p.relayMtx.Lock() + p.disableRelayTx = msg.DisableRelayTx + p.relayMtx.Unlock() + // Inbound connections. if p.inbound { // Set up a NetAddress for the peer to be used with AddrManager. @@ -480,6 +499,82 @@ func (p *peer) pushBlockMsg(sha *btcwire.ShaHash, doneChan, waitChan chan bool) return nil } +// pushMerkleBlockMsg sends a merkleblock message for the provided block hash to +// the connected peer. Since a merkle block requires the peer to have a filter +// loaded, this call will simply be ignored if there is no filter laoded. An +// error is returned if the block hash is not known. +func (p *peer) pushMerkleBlockMsg(sha *btcwire.ShaHash, doneChan, waitChan chan bool) error { + // Do not send a response if the peer doesn't have a filter loaded. + if !p.filter.IsLoaded() { + if doneChan != nil { + doneChan <- false + } + return nil + } + + blk, err := p.server.db.FetchBlockBySha(sha) + if err != nil { + peerLog.Tracef("Unable to fetch requested block sha %v: %v", + sha, err) + + if doneChan != nil { + doneChan <- false + } + return err + } + + // Generate a merkle block by filtering the requested block according + // to the filter for the peer and fetch any matched transactions from + // the database. + merkle, matchedHashes := bloom.NewMerkleBlock(blk, p.filter) + txList := p.server.db.FetchTxByShaList(matchedHashes) + + // Warn on any missing transactions which should not happen since the + // matched transactions come from an existing block. Also, find the + // final valid transaction index for later. + finalValidTxIndex := -1 + for i, txR := range txList { + if txR.Err != nil || txR.Tx == nil { + warnMsg := fmt.Sprintf("Failed to fetch transaction "+ + "%v which was matched by merkle block %v", + txR.Sha, sha) + if txR.Err != nil { + warnMsg += ": " + err.Error() + } + peerLog.Warnf(warnMsg) + continue + } + finalValidTxIndex = i + } + + // Once we have fetched data wait for any previous operation to finish. + if waitChan != nil { + <-waitChan + } + + // Send the merkleblock. Only send the done channel with this message + // if no transactions will be sent afterwards. + var dc chan bool + if finalValidTxIndex > -1 { + dc = doneChan + } + p.QueueMessage(merkle, dc) + + // Finally, send any matched transactions. + for i, txR := range txList { + // Only send the done channel on the final transaction. + var dc chan bool + if i == finalValidTxIndex { + dc = doneChan + } + if txR.Err == nil && txR.Tx != nil { + p.QueueMessage(txR.Tx, dc) + } + } + + return nil +} + // PushGetBlocksMsg sends a getblocks message for the provided block locator // and stop hash. It will ignore back-to-back duplicate requests. func (p *peer) PushGetBlocksMsg(locator btcchain.BlockLocator, stopHash *btcwire.ShaHash) error { @@ -558,26 +653,34 @@ func (p *peer) PushGetHeadersMsg(locator btcchain.BlockLocator, stopHash *btcwir // handleMemPoolMsg is invoked when a peer receives a mempool bitcoin message. // It creates and sends an inventory message with the contents of the memory -// pool up to the maximum inventory allowed per message. +// pool up to the maximum inventory allowed per message. When the peer has a +// bloom filter loaded, the contents are filtered accordingly. func (p *peer) handleMemPoolMsg(msg *btcwire.MsgMemPool) { // Generate inventory message with the available transactions in the // transaction memory pool. Limit it to the max allowed inventory // per message. The the NewMsgInvSizeHint function automatically limits // the passed hint to the maximum allowed, so it's safe to pass it // without double checking it here. - hashes := p.server.txMemPool.TxShas() - invMsg := btcwire.NewMsgInvSizeHint(uint(len(hashes))) - for i, hash := range hashes { + txDescs := p.server.txMemPool.TxDescs() + invMsg := btcwire.NewMsgInvSizeHint(uint(len(txDescs))) + + for i, txDesc := range txDescs { // Another thread might have removed the transaction from the // pool since the initial query. + hash := txDesc.Tx.Sha() if !p.server.txMemPool.IsTransactionInPool(hash) { continue } - iv := btcwire.NewInvVect(btcwire.InvTypeTx, hash) - invMsg.AddInvVect(iv) - if i+1 >= btcwire.MaxInvPerMsg { - break + // Either add all transactions when there is no bloom filter, + // or only the transactions that match the filter when there is + // one. + if !p.filter.IsLoaded() || p.filter.MatchTxAndUpdate(txDesc.Tx) { + iv := btcwire.NewInvVect(btcwire.InvTypeTx, hash) + invMsg.AddInvVect(iv) + if i+1 >= btcwire.MaxInvPerMsg { + break + } } } @@ -681,8 +784,8 @@ func (p *peer) handleGetDataMsg(msg *btcwire.MsgGetData) { err = p.pushTxMsg(&iv.Hash, c, waitChan) case btcwire.InvTypeBlock: err = p.pushBlockMsg(&iv.Hash, c, waitChan) - case btcwire.InvTypeFilteredBlock: // unhandled - continue + case btcwire.InvTypeFilteredBlock: + err = p.pushMerkleBlockMsg(&iv.Hash, c, waitChan) default: peerLog.Warnf("Unknown type in inventory request %d", iv.Type) @@ -717,7 +820,7 @@ func (p *peer) handleGetDataMsg(msg *btcwire.MsgGetData) { } } -// handleGetBlocksMsg is invoked when a peer receives a getdata bitcoin message. +// handleGetBlocksMsg is invoked when a peer receives a getblocks bitcoin message. func (p *peer) handleGetBlocksMsg(msg *btcwire.MsgGetBlocks) { // Return all block hashes to the latest one (up to max per message) if // no stop hash was specified. @@ -890,6 +993,48 @@ func (p *peer) handleGetHeadersMsg(msg *btcwire.MsgGetHeaders) { p.QueueMessage(headersMsg, nil) } +// handleFilterAddMsg is invoked when a peer receives a filteradd bitcoin +// message and is used by remote peers to add data to an already loaded bloom +// filter. The peer will be disonnected if a filter is not loaded when this +// message is received. +func (p *peer) handleFilterAddMsg(msg *btcwire.MsgFilterAdd) { + if !p.filter.IsLoaded() { + peerLog.Debugf("%s sent a filteradd request with no filter "+ + "loaded -- disconnecting", p) + p.Disconnect() + return + } + + p.filter.Add(msg.Data) +} + +// handleFilterClearMsg is invoked when a peer receives a filterclear bitcoin +// message and is used by remote peers to clear an already loaded bloom filter. +// The peer will be disonnected if a filter is not loaded when this message is +// received. +func (p *peer) handleFilterClearMsg(msg *btcwire.MsgFilterClear) { + if !p.filter.IsLoaded() { + peerLog.Debugf("%s sent a filterclear request with no "+ + "filter loaded -- disconnecting", p) + p.Disconnect() + return + } + p.filter.Unload() +} + +// handleFilterLoadMsg is invoked when a peer receives a filterload bitcoin +// message and it used to load a bloom filter that should be used for delivering +// merkle blocks and associated transactions that match the filter. +func (p *peer) handleFilterLoadMsg(msg *btcwire.MsgFilterLoad) { + // Transaction relay is no longer disabled once a filterload message is + // received regardless of its original state. + p.relayMtx.Lock() + p.disableRelayTx = false + p.relayMtx.Unlock() + + p.filter = bloom.LoadFilter(msg) +} + // handleGetAddrMsg is invoked when a peer receives a getaddr bitcoin message // and is used to provide the peer with known addresses from the address // manager. @@ -1261,6 +1406,15 @@ out: case *btcwire.MsgGetHeaders: p.handleGetHeadersMsg(msg) + case *btcwire.MsgFilterAdd: + p.handleFilterAddMsg(msg) + + case *btcwire.MsgFilterClear: + p.handleFilterClearMsg(msg) + + case *btcwire.MsgFilterLoad: + p.handleFilterLoadMsg(msg) + default: peerLog.Debugf("Received unhandled message of type %v: Fix Me", rmsg.Command()) @@ -1636,6 +1790,7 @@ func newPeerBase(s *server, inbound bool) *peer { requestedTxns: make(map[btcwire.ShaHash]struct{}), requestedBlocks: make(map[btcwire.ShaHash]struct{}), requestQueue: list.New(), + filter: bloom.LoadFilter(nil), outputQueue: make(chan outMsg, outputBufferSize), sendQueue: make(chan outMsg, 1), // nonblocking sync sendDoneQueue: make(chan struct{}, 1), // nonblocking sync diff --git a/server.go b/server.go index d638cee0..55b0cd3f 100644 --- a/server.go +++ b/server.go @@ -292,9 +292,33 @@ func (s *server) handleRelayInvMsg(state *peerState, iv *btcwire.InvVect) { return } - // Queue the inventory to be relayed with the next batch. It - // will be ignored if the peer is already known to have the - // inventory. + if iv.Type == btcwire.InvTypeTx { + // Don't relay the transaction to the peer when it has + // transaction relaying disabled. + if p.RelayTxDisabled() { + return + } + + // Don't relay the transaction if there is a bloom + // filter loaded and the transaction doesn't match it. + if p.filter.IsLoaded() { + tx, err := s.txMemPool.FetchTransaction(&iv.Hash) + if err != nil { + peerLog.Warn("Attempt to relay tx %s "+ + "that is not in the memory pool", + iv.Hash) + return + } + + if !p.filter.MatchTxAndUpdate(tx) { + return + } + } + } + + // Queue the inventory to be relayed with the next batch. + // It will be ignored if the peer is already known to + // have the inventory. p.QueueInventory(iv) }) }