Add full support for BIP0037 bloom filters.
This commit implements full support for filtering based on the filterload, filteradd, filterclear, and merkleblock messages introduced by BIP0037. This allows btcd to work seamlessly with SPV wallets such as BitcoinJ. Original code by @dajohi. Cleanup, bug fixes, and polish by @davecgh.
This commit is contained in:
parent
307c52f25b
commit
e68d46f556
2 changed files with 193 additions and 14 deletions
169
peer.go
169
peer.go
|
@ -18,6 +18,7 @@ import (
|
|||
"github.com/conformal/btcd/addrmgr"
|
||||
"github.com/conformal/btcdb"
|
||||
"github.com/conformal/btcutil"
|
||||
"github.com/conformal/btcutil/bloom"
|
||||
"github.com/conformal/btcwire"
|
||||
socks "github.com/conformal/go-socks"
|
||||
"github.com/davecgh/go-spew/spew"
|
||||
|
@ -158,6 +159,9 @@ type peer struct {
|
|||
prevGetHdrsBegin *btcwire.ShaHash // owned by blockmanager
|
||||
prevGetHdrsStop *btcwire.ShaHash // owned by blockmanager
|
||||
requestQueue *list.List
|
||||
filter *bloom.Filter
|
||||
relayMtx sync.Mutex
|
||||
disableRelayTx bool
|
||||
continueHash *btcwire.ShaHash
|
||||
outputQueue chan outMsg
|
||||
sendQueue chan outMsg
|
||||
|
@ -228,6 +232,15 @@ func (p *peer) ProtocolVersion() uint32 {
|
|||
return p.protocolVersion
|
||||
}
|
||||
|
||||
// RelayTxDisabled returns whether or not relaying of transactions is disabled.
|
||||
// It is safe for concurrent access.
|
||||
func (p *peer) RelayTxDisabled() bool {
|
||||
p.relayMtx.Lock()
|
||||
defer p.relayMtx.Unlock()
|
||||
|
||||
return p.disableRelayTx
|
||||
}
|
||||
|
||||
// pushVersionMsg sends a version message to the connected peer using the
|
||||
// current state.
|
||||
func (p *peer) pushVersionMsg() error {
|
||||
|
@ -364,6 +377,12 @@ func (p *peer) handleVersionMsg(msg *btcwire.MsgVersion) {
|
|||
|
||||
p.StatsMtx.Unlock()
|
||||
|
||||
// Choose whether or not to relay transactions before a filter command
|
||||
// is received.
|
||||
p.relayMtx.Lock()
|
||||
p.disableRelayTx = msg.DisableRelayTx
|
||||
p.relayMtx.Unlock()
|
||||
|
||||
// Inbound connections.
|
||||
if p.inbound {
|
||||
// Set up a NetAddress for the peer to be used with AddrManager.
|
||||
|
@ -480,6 +499,82 @@ func (p *peer) pushBlockMsg(sha *btcwire.ShaHash, doneChan, waitChan chan bool)
|
|||
return nil
|
||||
}
|
||||
|
||||
// pushMerkleBlockMsg sends a merkleblock message for the provided block hash to
|
||||
// the connected peer. Since a merkle block requires the peer to have a filter
|
||||
// loaded, this call will simply be ignored if there is no filter laoded. An
|
||||
// error is returned if the block hash is not known.
|
||||
func (p *peer) pushMerkleBlockMsg(sha *btcwire.ShaHash, doneChan, waitChan chan bool) error {
|
||||
// Do not send a response if the peer doesn't have a filter loaded.
|
||||
if !p.filter.IsLoaded() {
|
||||
if doneChan != nil {
|
||||
doneChan <- false
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
blk, err := p.server.db.FetchBlockBySha(sha)
|
||||
if err != nil {
|
||||
peerLog.Tracef("Unable to fetch requested block sha %v: %v",
|
||||
sha, err)
|
||||
|
||||
if doneChan != nil {
|
||||
doneChan <- false
|
||||
}
|
||||
return err
|
||||
}
|
||||
|
||||
// Generate a merkle block by filtering the requested block according
|
||||
// to the filter for the peer and fetch any matched transactions from
|
||||
// the database.
|
||||
merkle, matchedHashes := bloom.NewMerkleBlock(blk, p.filter)
|
||||
txList := p.server.db.FetchTxByShaList(matchedHashes)
|
||||
|
||||
// Warn on any missing transactions which should not happen since the
|
||||
// matched transactions come from an existing block. Also, find the
|
||||
// final valid transaction index for later.
|
||||
finalValidTxIndex := -1
|
||||
for i, txR := range txList {
|
||||
if txR.Err != nil || txR.Tx == nil {
|
||||
warnMsg := fmt.Sprintf("Failed to fetch transaction "+
|
||||
"%v which was matched by merkle block %v",
|
||||
txR.Sha, sha)
|
||||
if txR.Err != nil {
|
||||
warnMsg += ": " + err.Error()
|
||||
}
|
||||
peerLog.Warnf(warnMsg)
|
||||
continue
|
||||
}
|
||||
finalValidTxIndex = i
|
||||
}
|
||||
|
||||
// Once we have fetched data wait for any previous operation to finish.
|
||||
if waitChan != nil {
|
||||
<-waitChan
|
||||
}
|
||||
|
||||
// Send the merkleblock. Only send the done channel with this message
|
||||
// if no transactions will be sent afterwards.
|
||||
var dc chan bool
|
||||
if finalValidTxIndex > -1 {
|
||||
dc = doneChan
|
||||
}
|
||||
p.QueueMessage(merkle, dc)
|
||||
|
||||
// Finally, send any matched transactions.
|
||||
for i, txR := range txList {
|
||||
// Only send the done channel on the final transaction.
|
||||
var dc chan bool
|
||||
if i == finalValidTxIndex {
|
||||
dc = doneChan
|
||||
}
|
||||
if txR.Err == nil && txR.Tx != nil {
|
||||
p.QueueMessage(txR.Tx, dc)
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// PushGetBlocksMsg sends a getblocks message for the provided block locator
|
||||
// and stop hash. It will ignore back-to-back duplicate requests.
|
||||
func (p *peer) PushGetBlocksMsg(locator btcchain.BlockLocator, stopHash *btcwire.ShaHash) error {
|
||||
|
@ -558,28 +653,36 @@ func (p *peer) PushGetHeadersMsg(locator btcchain.BlockLocator, stopHash *btcwir
|
|||
|
||||
// handleMemPoolMsg is invoked when a peer receives a mempool bitcoin message.
|
||||
// It creates and sends an inventory message with the contents of the memory
|
||||
// pool up to the maximum inventory allowed per message.
|
||||
// pool up to the maximum inventory allowed per message. When the peer has a
|
||||
// bloom filter loaded, the contents are filtered accordingly.
|
||||
func (p *peer) handleMemPoolMsg(msg *btcwire.MsgMemPool) {
|
||||
// Generate inventory message with the available transactions in the
|
||||
// transaction memory pool. Limit it to the max allowed inventory
|
||||
// per message. The the NewMsgInvSizeHint function automatically limits
|
||||
// the passed hint to the maximum allowed, so it's safe to pass it
|
||||
// without double checking it here.
|
||||
hashes := p.server.txMemPool.TxShas()
|
||||
invMsg := btcwire.NewMsgInvSizeHint(uint(len(hashes)))
|
||||
for i, hash := range hashes {
|
||||
txDescs := p.server.txMemPool.TxDescs()
|
||||
invMsg := btcwire.NewMsgInvSizeHint(uint(len(txDescs)))
|
||||
|
||||
for i, txDesc := range txDescs {
|
||||
// Another thread might have removed the transaction from the
|
||||
// pool since the initial query.
|
||||
hash := txDesc.Tx.Sha()
|
||||
if !p.server.txMemPool.IsTransactionInPool(hash) {
|
||||
continue
|
||||
}
|
||||
|
||||
// Either add all transactions when there is no bloom filter,
|
||||
// or only the transactions that match the filter when there is
|
||||
// one.
|
||||
if !p.filter.IsLoaded() || p.filter.MatchTxAndUpdate(txDesc.Tx) {
|
||||
iv := btcwire.NewInvVect(btcwire.InvTypeTx, hash)
|
||||
invMsg.AddInvVect(iv)
|
||||
if i+1 >= btcwire.MaxInvPerMsg {
|
||||
break
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Send the inventory message if there is anything to send.
|
||||
if len(invMsg.InvList) > 0 {
|
||||
|
@ -681,8 +784,8 @@ func (p *peer) handleGetDataMsg(msg *btcwire.MsgGetData) {
|
|||
err = p.pushTxMsg(&iv.Hash, c, waitChan)
|
||||
case btcwire.InvTypeBlock:
|
||||
err = p.pushBlockMsg(&iv.Hash, c, waitChan)
|
||||
case btcwire.InvTypeFilteredBlock: // unhandled
|
||||
continue
|
||||
case btcwire.InvTypeFilteredBlock:
|
||||
err = p.pushMerkleBlockMsg(&iv.Hash, c, waitChan)
|
||||
default:
|
||||
peerLog.Warnf("Unknown type in inventory request %d",
|
||||
iv.Type)
|
||||
|
@ -717,7 +820,7 @@ func (p *peer) handleGetDataMsg(msg *btcwire.MsgGetData) {
|
|||
}
|
||||
}
|
||||
|
||||
// handleGetBlocksMsg is invoked when a peer receives a getdata bitcoin message.
|
||||
// handleGetBlocksMsg is invoked when a peer receives a getblocks bitcoin message.
|
||||
func (p *peer) handleGetBlocksMsg(msg *btcwire.MsgGetBlocks) {
|
||||
// Return all block hashes to the latest one (up to max per message) if
|
||||
// no stop hash was specified.
|
||||
|
@ -890,6 +993,48 @@ func (p *peer) handleGetHeadersMsg(msg *btcwire.MsgGetHeaders) {
|
|||
p.QueueMessage(headersMsg, nil)
|
||||
}
|
||||
|
||||
// handleFilterAddMsg is invoked when a peer receives a filteradd bitcoin
|
||||
// message and is used by remote peers to add data to an already loaded bloom
|
||||
// filter. The peer will be disonnected if a filter is not loaded when this
|
||||
// message is received.
|
||||
func (p *peer) handleFilterAddMsg(msg *btcwire.MsgFilterAdd) {
|
||||
if !p.filter.IsLoaded() {
|
||||
peerLog.Debugf("%s sent a filteradd request with no filter "+
|
||||
"loaded -- disconnecting", p)
|
||||
p.Disconnect()
|
||||
return
|
||||
}
|
||||
|
||||
p.filter.Add(msg.Data)
|
||||
}
|
||||
|
||||
// handleFilterClearMsg is invoked when a peer receives a filterclear bitcoin
|
||||
// message and is used by remote peers to clear an already loaded bloom filter.
|
||||
// The peer will be disonnected if a filter is not loaded when this message is
|
||||
// received.
|
||||
func (p *peer) handleFilterClearMsg(msg *btcwire.MsgFilterClear) {
|
||||
if !p.filter.IsLoaded() {
|
||||
peerLog.Debugf("%s sent a filterclear request with no "+
|
||||
"filter loaded -- disconnecting", p)
|
||||
p.Disconnect()
|
||||
return
|
||||
}
|
||||
p.filter.Unload()
|
||||
}
|
||||
|
||||
// handleFilterLoadMsg is invoked when a peer receives a filterload bitcoin
|
||||
// message and it used to load a bloom filter that should be used for delivering
|
||||
// merkle blocks and associated transactions that match the filter.
|
||||
func (p *peer) handleFilterLoadMsg(msg *btcwire.MsgFilterLoad) {
|
||||
// Transaction relay is no longer disabled once a filterload message is
|
||||
// received regardless of its original state.
|
||||
p.relayMtx.Lock()
|
||||
p.disableRelayTx = false
|
||||
p.relayMtx.Unlock()
|
||||
|
||||
p.filter = bloom.LoadFilter(msg)
|
||||
}
|
||||
|
||||
// handleGetAddrMsg is invoked when a peer receives a getaddr bitcoin message
|
||||
// and is used to provide the peer with known addresses from the address
|
||||
// manager.
|
||||
|
@ -1261,6 +1406,15 @@ out:
|
|||
case *btcwire.MsgGetHeaders:
|
||||
p.handleGetHeadersMsg(msg)
|
||||
|
||||
case *btcwire.MsgFilterAdd:
|
||||
p.handleFilterAddMsg(msg)
|
||||
|
||||
case *btcwire.MsgFilterClear:
|
||||
p.handleFilterClearMsg(msg)
|
||||
|
||||
case *btcwire.MsgFilterLoad:
|
||||
p.handleFilterLoadMsg(msg)
|
||||
|
||||
default:
|
||||
peerLog.Debugf("Received unhandled message of type %v: Fix Me",
|
||||
rmsg.Command())
|
||||
|
@ -1636,6 +1790,7 @@ func newPeerBase(s *server, inbound bool) *peer {
|
|||
requestedTxns: make(map[btcwire.ShaHash]struct{}),
|
||||
requestedBlocks: make(map[btcwire.ShaHash]struct{}),
|
||||
requestQueue: list.New(),
|
||||
filter: bloom.LoadFilter(nil),
|
||||
outputQueue: make(chan outMsg, outputBufferSize),
|
||||
sendQueue: make(chan outMsg, 1), // nonblocking sync
|
||||
sendDoneQueue: make(chan struct{}, 1), // nonblocking sync
|
||||
|
|
30
server.go
30
server.go
|
@ -292,9 +292,33 @@ func (s *server) handleRelayInvMsg(state *peerState, iv *btcwire.InvVect) {
|
|||
return
|
||||
}
|
||||
|
||||
// Queue the inventory to be relayed with the next batch. It
|
||||
// will be ignored if the peer is already known to have the
|
||||
// inventory.
|
||||
if iv.Type == btcwire.InvTypeTx {
|
||||
// Don't relay the transaction to the peer when it has
|
||||
// transaction relaying disabled.
|
||||
if p.RelayTxDisabled() {
|
||||
return
|
||||
}
|
||||
|
||||
// Don't relay the transaction if there is a bloom
|
||||
// filter loaded and the transaction doesn't match it.
|
||||
if p.filter.IsLoaded() {
|
||||
tx, err := s.txMemPool.FetchTransaction(&iv.Hash)
|
||||
if err != nil {
|
||||
peerLog.Warn("Attempt to relay tx %s "+
|
||||
"that is not in the memory pool",
|
||||
iv.Hash)
|
||||
return
|
||||
}
|
||||
|
||||
if !p.filter.MatchTxAndUpdate(tx) {
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Queue the inventory to be relayed with the next batch.
|
||||
// It will be ignored if the peer is already known to
|
||||
// have the inventory.
|
||||
p.QueueInventory(iv)
|
||||
})
|
||||
}
|
||||
|
|
Loading…
Reference in a new issue