BIP0144: properly fetch witness data from witness-enabled peers

This commit modifies the logic within the block manager and service to
preferentially fetch transactions and blocks which include witness data
from fully upgraded peers.

Once the initial version handshake has completed, the server now tracks
which of the connected peers are witness enabled (they advertise
SFNodeWitness). From then on, if a peer is witness enabled, then btcd
will always request full witness data when fetching
transactions/blocks.
This commit is contained in:
Olaoluwa Osuntokun 2016-10-18 16:54:55 -07:00 committed by Dave Collins
parent 7a1456aae5
commit 0db14c740b
4 changed files with 149 additions and 15 deletions

View file

@ -1073,7 +1073,8 @@ func (a *AddrManager) GetBestLocalAddress(remoteAddr *wire.NetAddress) *wire.Net
} else { } else {
ip = net.IPv4zero ip = net.IPv4zero
} }
bestAddress = wire.NewNetAddressIPPort(ip, 0, wire.SFNodeNetwork) services := wire.SFNodeNetwork | wire.SFNodeWitness | wire.SFNodeBloom
bestAddress = wire.NewNetAddressIPPort(ip, 0, services)
} }
return bestAddress return bestAddress

View file

@ -316,6 +316,21 @@ func (b *BlockChain) ThresholdState(deploymentID uint32) (ThresholdState, error)
return state, err return state, err
} }
// IsDeploymentActive returns true if the target deploymentID is active, and
// false otherwise.
//
// This function is safe for concurrent access.
func (b *BlockChain) IsDeploymentActive(deploymentID uint32) (bool, error) {
b.chainLock.Lock()
state, err := b.deploymentState(b.bestNode, deploymentID)
b.chainLock.Unlock()
if err != nil {
return false, err
}
return state == ThresholdActive, nil
}
// deploymentState returns the current rule change threshold for a given // deploymentState returns the current rule change threshold for a given
// deploymentID. The threshold is evaluated from the point of view of the block // deploymentID. The threshold is evaluated from the point of view of the block
// node passed in as the first argument to this method. // node passed in as the first argument to this method.

View file

@ -223,6 +223,20 @@ func (b *blockManager) startSync(peers *list.List) {
enext = e.Next() enext = e.Next()
sp := e.Value.(*serverPeer) sp := e.Value.(*serverPeer)
// Once the segwit soft-fork package has activated, we only
// want to sync from peers which are witness enabled to ensure
// that we fully validate all blockchain data.
segwitActive, err := b.chain.IsDeploymentActive(chaincfg.DeploymentSegwit)
if err != nil {
bmgrLog.Errorf("Unable to query for segwit "+
"soft-fork state: %v", err)
continue
}
if segwitActive && !sp.IsWitnessEnabled() {
bmgrLog.Infof("peer %v not witness enabled, skipping", sp)
continue
}
// Remove sync candidate peers that are no longer candidates due // Remove sync candidate peers that are no longer candidates due
// to passing their latest known block. NOTE: The < is // to passing their latest known block. NOTE: The < is
// intentional as opposed to <=. While technically the peer // intentional as opposed to <=. While technically the peer
@ -309,8 +323,17 @@ func (b *blockManager) isSyncCandidate(sp *serverPeer) bool {
return false return false
} }
} else { } else {
// The peer is not a candidate for sync if it's not a full node. // The peer is not a candidate for sync if it's not a full
if sp.Services()&wire.SFNodeNetwork != wire.SFNodeNetwork { // node. Additionally, if the segwit soft-fork package has
// activated, then the peer must also be upgraded.
segwitActive, err := b.chain.IsDeploymentActive(chaincfg.DeploymentSegwit)
if err != nil {
bmgrLog.Errorf("Unable to query for segwit "+
"soft-fork state: %v", err)
}
nodeServices := sp.Services()
if nodeServices&wire.SFNodeNetwork != wire.SFNodeNetwork ||
(segwitActive && !sp.IsWitnessEnabled()) {
return false return false
} }
} }
@ -703,6 +726,14 @@ func (b *blockManager) fetchHeaderBlocks() {
if !haveInv { if !haveInv {
b.requestedBlocks[*node.hash] = struct{}{} b.requestedBlocks[*node.hash] = struct{}{}
b.syncPeer.requestedBlocks[*node.hash] = struct{}{} b.syncPeer.requestedBlocks[*node.hash] = struct{}{}
// If we're fetching from a witness enabled peer
// post-fork, then ensure that we receive all the
// witness data in the blocks.
if b.syncPeer.IsWitnessEnabled() {
iv.Type = wire.InvTypeWitnessBlock
}
gdmsg.AddInvVect(iv) gdmsg.AddInvVect(iv)
numRequested++ numRequested++
} }
@ -824,11 +855,15 @@ func (b *blockManager) handleHeadersMsg(hmsg *headersMsg) {
// are in the memory pool (either the main pool or orphan pool). // are in the memory pool (either the main pool or orphan pool).
func (b *blockManager) haveInventory(invVect *wire.InvVect) (bool, error) { func (b *blockManager) haveInventory(invVect *wire.InvVect) (bool, error) {
switch invVect.Type { switch invVect.Type {
case wire.InvTypeWitnessBlock:
fallthrough
case wire.InvTypeBlock: case wire.InvTypeBlock:
// Ask chain if the block is known to it in any form (main // Ask chain if the block is known to it in any form (main
// chain, side chain, or orphan). // chain, side chain, or orphan).
return b.chain.HaveBlock(&invVect.Hash) return b.chain.HaveBlock(&invVect.Hash)
case wire.InvTypeWitnessTx:
fallthrough
case wire.InvTypeTx: case wire.InvTypeTx:
// Ask the transaction memory pool if the transaction is known // Ask the transaction memory pool if the transaction is known
// to it in any form (main pool or orphan). // to it in any form (main pool or orphan).
@ -894,7 +929,12 @@ func (b *blockManager) handleInvMsg(imsg *invMsg) {
// we already have and request more blocks to prevent them. // we already have and request more blocks to prevent them.
for i, iv := range invVects { for i, iv := range invVects {
// Ignore unsupported inventory types. // Ignore unsupported inventory types.
if iv.Type != wire.InvTypeBlock && iv.Type != wire.InvTypeTx { switch iv.Type {
case wire.InvTypeBlock:
case wire.InvTypeTx:
case wire.InvTypeWitnessBlock:
case wire.InvTypeWitnessTx:
default:
continue continue
} }
@ -924,6 +964,14 @@ func (b *blockManager) handleInvMsg(imsg *invMsg) {
} }
} }
// Ignore invs block invs from non-witness enabled
// peers, as after segwit activation we only want to
// download from peers that can provide us full witness
// data for blocks.
if !imsg.peer.IsWitnessEnabled() && iv.Type == wire.InvTypeBlock {
continue
}
// Add it to the request queue. // Add it to the request queue.
imsg.peer.requestQueue = append(imsg.peer.requestQueue, iv) imsg.peer.requestQueue = append(imsg.peer.requestQueue, iv)
continue continue
@ -981,6 +1029,8 @@ func (b *blockManager) handleInvMsg(imsg *invMsg) {
requestQueue = requestQueue[1:] requestQueue = requestQueue[1:]
switch iv.Type { switch iv.Type {
case wire.InvTypeWitnessBlock:
fallthrough
case wire.InvTypeBlock: case wire.InvTypeBlock:
// Request the block if there is not already a pending // Request the block if there is not already a pending
// request. // request.
@ -988,10 +1038,17 @@ func (b *blockManager) handleInvMsg(imsg *invMsg) {
b.requestedBlocks[iv.Hash] = struct{}{} b.requestedBlocks[iv.Hash] = struct{}{}
b.limitMap(b.requestedBlocks, maxRequestedBlocks) b.limitMap(b.requestedBlocks, maxRequestedBlocks)
imsg.peer.requestedBlocks[iv.Hash] = struct{}{} imsg.peer.requestedBlocks[iv.Hash] = struct{}{}
if imsg.peer.IsWitnessEnabled() {
iv.Type = wire.InvTypeWitnessBlock
}
gdmsg.AddInvVect(iv) gdmsg.AddInvVect(iv)
numRequested++ numRequested++
} }
case wire.InvTypeWitnessTx:
fallthrough
case wire.InvTypeTx: case wire.InvTypeTx:
// Request the transaction if there is not already a // Request the transaction if there is not already a
// pending request. // pending request.
@ -999,6 +1056,13 @@ func (b *blockManager) handleInvMsg(imsg *invMsg) {
b.requestedTxns[iv.Hash] = struct{}{} b.requestedTxns[iv.Hash] = struct{}{}
b.limitMap(b.requestedTxns, maxRequestedTxns) b.limitMap(b.requestedTxns, maxRequestedTxns)
imsg.peer.requestedTxns[iv.Hash] = struct{}{} imsg.peer.requestedTxns[iv.Hash] = struct{}{}
// If the peer is capable, request the txn
// including all witness data.
if imsg.peer.IsWitnessEnabled() {
iv.Type = wire.InvTypeWitnessTx
}
gdmsg.AddInvVect(iv) gdmsg.AddInvVect(iv)
numRequested++ numRequested++
} }

View file

@ -40,7 +40,7 @@ import (
const ( const (
// defaultServices describes the default services that are supported by // defaultServices describes the default services that are supported by
// the server. // the server.
defaultServices = wire.SFNodeNetwork | wire.SFNodeBloom defaultServices = wire.SFNodeNetwork | wire.SFNodeBloom | wire.SFNodeWitness
// defaultRequiredServices describes the default services that are // defaultRequiredServices describes the default services that are
// required to be supported by outbound peers. // required to be supported by outbound peers.
@ -210,6 +210,8 @@ type serverPeer struct {
connReq *connmgr.ConnReq connReq *connmgr.ConnReq
server *server server *server
witnessMtx sync.Mutex
witnessEnabled bool
persistent bool persistent bool
continueHash *chainhash.Hash continueHash *chainhash.Hash
relayMtx sync.Mutex relayMtx sync.Mutex
@ -243,6 +245,17 @@ func newServerPeer(s *server, isPersistent bool) *serverPeer {
} }
} }
// IsWitnessEnabled returns true if the target serverPeer has signalled that it
// supports segregated witness.
//
// This function is safe for concurrent access.
func (sp *serverPeer) IsWitnessEnabled() bool {
sp.witnessMtx.Lock()
enabled := sp.witnessEnabled
sp.witnessMtx.Unlock()
return enabled
}
// newestBlock returns the current best block hash and height using the format // newestBlock returns the current best block hash and height using the format
// required by the configuration for the peer package. // required by the configuration for the peer package.
func (sp *serverPeer) newestBlock() (*chainhash.Hash, int32, error) { func (sp *serverPeer) newestBlock() (*chainhash.Hash, int32, error) {
@ -351,6 +364,14 @@ func (sp *serverPeer) OnVersion(_ *peer.Peer, msg *wire.MsgVersion) {
// is received. // is received.
sp.setDisableRelayTx(msg.DisableRelayTx) sp.setDisableRelayTx(msg.DisableRelayTx)
// Determine if the peer would like to receive witness data with
// transactions, or not.
if sp.Services()&wire.SFNodeWitness == wire.SFNodeWitness {
sp.witnessMtx.Lock()
sp.witnessEnabled = true
sp.witnessMtx.Unlock()
}
// Update the address manager and request known addresses from the // Update the address manager and request known addresses from the
// remote peer for outbound connections. This is skipped when running // remote peer for outbound connections. This is skipped when running
// on the simulation test network since it is only intended to connect // on the simulation test network since it is only intended to connect
@ -358,8 +379,28 @@ func (sp *serverPeer) OnVersion(_ *peer.Peer, msg *wire.MsgVersion) {
// discovered peers. // discovered peers.
if !cfg.SimNet { if !cfg.SimNet {
addrManager := sp.server.addrManager addrManager := sp.server.addrManager
// Outbound connections. // Outbound connections.
if !sp.Inbound() { if !sp.Inbound() {
// After soft-fork activation, only make outbound
// connection to peers if they flag that they're segwit
// enabled.
chain := sp.server.blockManager.chain
segwitActive, err := chain.IsDeploymentActive(chaincfg.DeploymentSegwit)
if err != nil {
peerLog.Errorf("Unable to query for segwit "+
"soft-fork state: %v", err)
return
}
if segwitActive && !sp.IsWitnessEnabled() {
peerLog.Infof("Disconnecting non-segwit "+
"peer %v, isn't segwit segwit enabled and "+
"we need more segwit enabled peers", sp)
sp.Disconnect()
return
}
// TODO(davec): Only do this if not doing the initial block // TODO(davec): Only do this if not doing the initial block
// download and the local address is routable. // download and the local address is routable.
if !cfg.DisableListen /* && isCurrent? */ { if !cfg.DisableListen /* && isCurrent? */ {
@ -568,12 +609,18 @@ func (sp *serverPeer) OnGetData(_ *peer.Peer, msg *wire.MsgGetData) {
} }
var err error var err error
switch iv.Type { switch iv.Type {
case wire.InvTypeWitnessTx:
err = sp.server.pushTxMsg(sp, &iv.Hash, c, waitChan, wire.WitnessEncoding)
case wire.InvTypeTx: case wire.InvTypeTx:
err = sp.server.pushTxMsg(sp, &iv.Hash, c, waitChan) err = sp.server.pushTxMsg(sp, &iv.Hash, c, waitChan, wire.BaseEncoding)
case wire.InvTypeWitnessBlock:
err = sp.server.pushBlockMsg(sp, &iv.Hash, c, waitChan, wire.WitnessEncoding)
case wire.InvTypeBlock: case wire.InvTypeBlock:
err = sp.server.pushBlockMsg(sp, &iv.Hash, c, waitChan) err = sp.server.pushBlockMsg(sp, &iv.Hash, c, waitChan, wire.BaseEncoding)
case wire.InvTypeFilteredWitnessBlock:
err = sp.server.pushMerkleBlockMsg(sp, &iv.Hash, c, waitChan, wire.WitnessEncoding)
case wire.InvTypeFilteredBlock: case wire.InvTypeFilteredBlock:
err = sp.server.pushMerkleBlockMsg(sp, &iv.Hash, c, waitChan) err = sp.server.pushMerkleBlockMsg(sp, &iv.Hash, c, waitChan, wire.BaseEncoding)
default: default:
peerLog.Warnf("Unknown type in inventory request %d", peerLog.Warnf("Unknown type in inventory request %d",
iv.Type) iv.Type)
@ -1050,7 +1097,9 @@ func (s *server) AnnounceNewTransactions(newTxs []*mempool.TxDesc) {
// pushTxMsg sends a tx message for the provided transaction hash to the // pushTxMsg sends a tx message for the provided transaction hash to the
// connected peer. An error is returned if the transaction hash is not known. // connected peer. An error is returned if the transaction hash is not known.
func (s *server) pushTxMsg(sp *serverPeer, hash *chainhash.Hash, doneChan chan<- struct{}, waitChan <-chan struct{}) error { func (s *server) pushTxMsg(sp *serverPeer, hash *chainhash.Hash, doneChan chan<- struct{},
waitChan <-chan struct{}, encoding wire.MessageEncoding) error {
// Attempt to fetch the requested transaction from the pool. A // Attempt to fetch the requested transaction from the pool. A
// call could be made to check for existence first, but simply trying // call could be made to check for existence first, but simply trying
// to fetch a missing transaction results in the same behavior. // to fetch a missing transaction results in the same behavior.
@ -1070,14 +1119,16 @@ func (s *server) pushTxMsg(sp *serverPeer, hash *chainhash.Hash, doneChan chan<-
<-waitChan <-waitChan
} }
sp.QueueMessage(tx.MsgTx(), doneChan) sp.QueueMessageWithEncoding(tx.MsgTx(), doneChan, encoding)
return nil return nil
} }
// pushBlockMsg sends a block message for the provided block hash to the // pushBlockMsg sends a block message for the provided block hash to the
// connected peer. An error is returned if the block hash is not known. // connected peer. An error is returned if the block hash is not known.
func (s *server) pushBlockMsg(sp *serverPeer, hash *chainhash.Hash, doneChan chan<- struct{}, waitChan <-chan struct{}) error { func (s *server) pushBlockMsg(sp *serverPeer, hash *chainhash.Hash, doneChan chan<- struct{},
waitChan <-chan struct{}, encoding wire.MessageEncoding) error {
// Fetch the raw block bytes from the database. // Fetch the raw block bytes from the database.
var blockBytes []byte var blockBytes []byte
err := sp.server.db.View(func(dbTx database.Tx) error { err := sp.server.db.View(func(dbTx database.Tx) error {
@ -1121,7 +1172,7 @@ func (s *server) pushBlockMsg(sp *serverPeer, hash *chainhash.Hash, doneChan cha
if !sendInv { if !sendInv {
dc = doneChan dc = doneChan
} }
sp.QueueMessage(&msgBlock, dc) sp.QueueMessageWithEncoding(&msgBlock, dc, encoding)
// When the peer requests the final block that was advertised in // When the peer requests the final block that was advertised in
// response to a getblocks message which requested more blocks than // response to a getblocks message which requested more blocks than
@ -1143,7 +1194,9 @@ func (s *server) pushBlockMsg(sp *serverPeer, hash *chainhash.Hash, doneChan cha
// the connected peer. Since a merkle block requires the peer to have a filter // the connected peer. Since a merkle block requires the peer to have a filter
// loaded, this call will simply be ignored if there is no filter loaded. An // loaded, this call will simply be ignored if there is no filter loaded. An
// error is returned if the block hash is not known. // error is returned if the block hash is not known.
func (s *server) pushMerkleBlockMsg(sp *serverPeer, hash *chainhash.Hash, doneChan chan<- struct{}, waitChan <-chan struct{}) error { func (s *server) pushMerkleBlockMsg(sp *serverPeer, hash *chainhash.Hash,
doneChan chan<- struct{}, waitChan <-chan struct{}, encoding wire.MessageEncoding) error {
// Do not send a response if the peer doesn't have a filter loaded. // Do not send a response if the peer doesn't have a filter loaded.
if !sp.filter.IsLoaded() { if !sp.filter.IsLoaded() {
if doneChan != nil { if doneChan != nil {
@ -1190,7 +1243,8 @@ func (s *server) pushMerkleBlockMsg(sp *serverPeer, hash *chainhash.Hash, doneCh
dc = doneChan dc = doneChan
} }
if txIndex < uint32(len(blkTransactions)) { if txIndex < uint32(len(blkTransactions)) {
sp.QueueMessage(blkTransactions[txIndex], dc) sp.QueueMessageWithEncoding(blkTransactions[txIndex], dc,
encoding)
} }
} }
@ -1627,7 +1681,7 @@ func newPeerConfig(sp *serverPeer) *peer.Config {
ChainParams: sp.server.chainParams, ChainParams: sp.server.chainParams,
Services: sp.server.services, Services: sp.server.services,
DisableRelayTx: cfg.BlocksOnly, DisableRelayTx: cfg.BlocksOnly,
ProtocolVersion: wire.FeeFilterVersion, ProtocolVersion: peer.MaxProtocolVersion,
} }
} }