diff --git a/addrmgr/addrmanager.go b/addrmgr/addrmanager.go index d2d4446d..4dc8bdbd 100644 --- a/addrmgr/addrmanager.go +++ b/addrmgr/addrmanager.go @@ -1073,7 +1073,8 @@ func (a *AddrManager) GetBestLocalAddress(remoteAddr *wire.NetAddress) *wire.Net } else { ip = net.IPv4zero } - bestAddress = wire.NewNetAddressIPPort(ip, 0, wire.SFNodeNetwork) + services := wire.SFNodeNetwork | wire.SFNodeWitness | wire.SFNodeBloom + bestAddress = wire.NewNetAddressIPPort(ip, 0, services) } return bestAddress diff --git a/blockchain/thresholdstate.go b/blockchain/thresholdstate.go index 8f0f0893..5007a043 100644 --- a/blockchain/thresholdstate.go +++ b/blockchain/thresholdstate.go @@ -316,6 +316,21 @@ func (b *BlockChain) ThresholdState(deploymentID uint32) (ThresholdState, error) return state, err } +// IsDeploymentActive returns true if the target deploymentID is active, and +// false otherwise. +// +// This function is safe for concurrent access. +func (b *BlockChain) IsDeploymentActive(deploymentID uint32) (bool, error) { + b.chainLock.Lock() + state, err := b.deploymentState(b.bestNode, deploymentID) + b.chainLock.Unlock() + if err != nil { + return false, err + } + + return state == ThresholdActive, nil +} + // deploymentState returns the current rule change threshold for a given // deploymentID. The threshold is evaluated from the point of view of the block // node passed in as the first argument to this method. diff --git a/blockmanager.go b/blockmanager.go index a4ea1dc3..7b1ff6d1 100644 --- a/blockmanager.go +++ b/blockmanager.go @@ -223,6 +223,20 @@ func (b *blockManager) startSync(peers *list.List) { enext = e.Next() sp := e.Value.(*serverPeer) + // Once the segwit soft-fork package has activated, we only + // want to sync from peers which are witness enabled to ensure + // that we fully validate all blockchain data. + segwitActive, err := b.chain.IsDeploymentActive(chaincfg.DeploymentSegwit) + if err != nil { + bmgrLog.Errorf("Unable to query for segwit "+ + "soft-fork state: %v", err) + continue + } + if segwitActive && !sp.IsWitnessEnabled() { + bmgrLog.Infof("peer %v not witness enabled, skipping", sp) + continue + } + // Remove sync candidate peers that are no longer candidates due // to passing their latest known block. NOTE: The < is // intentional as opposed to <=. While technically the peer @@ -309,8 +323,17 @@ func (b *blockManager) isSyncCandidate(sp *serverPeer) bool { return false } } else { - // The peer is not a candidate for sync if it's not a full node. - if sp.Services()&wire.SFNodeNetwork != wire.SFNodeNetwork { + // The peer is not a candidate for sync if it's not a full + // node. Additionally, if the segwit soft-fork package has + // activated, then the peer must also be upgraded. + segwitActive, err := b.chain.IsDeploymentActive(chaincfg.DeploymentSegwit) + if err != nil { + bmgrLog.Errorf("Unable to query for segwit "+ + "soft-fork state: %v", err) + } + nodeServices := sp.Services() + if nodeServices&wire.SFNodeNetwork != wire.SFNodeNetwork || + (segwitActive && !sp.IsWitnessEnabled()) { return false } } @@ -703,6 +726,14 @@ func (b *blockManager) fetchHeaderBlocks() { if !haveInv { b.requestedBlocks[*node.hash] = struct{}{} b.syncPeer.requestedBlocks[*node.hash] = struct{}{} + + // If we're fetching from a witness enabled peer + // post-fork, then ensure that we receive all the + // witness data in the blocks. + if b.syncPeer.IsWitnessEnabled() { + iv.Type = wire.InvTypeWitnessBlock + } + gdmsg.AddInvVect(iv) numRequested++ } @@ -824,11 +855,15 @@ func (b *blockManager) handleHeadersMsg(hmsg *headersMsg) { // are in the memory pool (either the main pool or orphan pool). func (b *blockManager) haveInventory(invVect *wire.InvVect) (bool, error) { switch invVect.Type { + case wire.InvTypeWitnessBlock: + fallthrough case wire.InvTypeBlock: // Ask chain if the block is known to it in any form (main // chain, side chain, or orphan). return b.chain.HaveBlock(&invVect.Hash) + case wire.InvTypeWitnessTx: + fallthrough case wire.InvTypeTx: // Ask the transaction memory pool if the transaction is known // to it in any form (main pool or orphan). @@ -894,7 +929,12 @@ func (b *blockManager) handleInvMsg(imsg *invMsg) { // we already have and request more blocks to prevent them. for i, iv := range invVects { // Ignore unsupported inventory types. - if iv.Type != wire.InvTypeBlock && iv.Type != wire.InvTypeTx { + switch iv.Type { + case wire.InvTypeBlock: + case wire.InvTypeTx: + case wire.InvTypeWitnessBlock: + case wire.InvTypeWitnessTx: + default: continue } @@ -924,6 +964,14 @@ func (b *blockManager) handleInvMsg(imsg *invMsg) { } } + // Ignore invs block invs from non-witness enabled + // peers, as after segwit activation we only want to + // download from peers that can provide us full witness + // data for blocks. + if !imsg.peer.IsWitnessEnabled() && iv.Type == wire.InvTypeBlock { + continue + } + // Add it to the request queue. imsg.peer.requestQueue = append(imsg.peer.requestQueue, iv) continue @@ -981,6 +1029,8 @@ func (b *blockManager) handleInvMsg(imsg *invMsg) { requestQueue = requestQueue[1:] switch iv.Type { + case wire.InvTypeWitnessBlock: + fallthrough case wire.InvTypeBlock: // Request the block if there is not already a pending // request. @@ -988,10 +1038,17 @@ func (b *blockManager) handleInvMsg(imsg *invMsg) { b.requestedBlocks[iv.Hash] = struct{}{} b.limitMap(b.requestedBlocks, maxRequestedBlocks) imsg.peer.requestedBlocks[iv.Hash] = struct{}{} + + if imsg.peer.IsWitnessEnabled() { + iv.Type = wire.InvTypeWitnessBlock + } + gdmsg.AddInvVect(iv) numRequested++ } + case wire.InvTypeWitnessTx: + fallthrough case wire.InvTypeTx: // Request the transaction if there is not already a // pending request. @@ -999,6 +1056,13 @@ func (b *blockManager) handleInvMsg(imsg *invMsg) { b.requestedTxns[iv.Hash] = struct{}{} b.limitMap(b.requestedTxns, maxRequestedTxns) imsg.peer.requestedTxns[iv.Hash] = struct{}{} + + // If the peer is capable, request the txn + // including all witness data. + if imsg.peer.IsWitnessEnabled() { + iv.Type = wire.InvTypeWitnessTx + } + gdmsg.AddInvVect(iv) numRequested++ } diff --git a/server.go b/server.go index 97c20bac..60dbeeb0 100644 --- a/server.go +++ b/server.go @@ -40,7 +40,7 @@ import ( const ( // defaultServices describes the default services that are supported by // the server. - defaultServices = wire.SFNodeNetwork | wire.SFNodeBloom + defaultServices = wire.SFNodeNetwork | wire.SFNodeBloom | wire.SFNodeWitness // defaultRequiredServices describes the default services that are // required to be supported by outbound peers. @@ -210,6 +210,8 @@ type serverPeer struct { connReq *connmgr.ConnReq server *server + witnessMtx sync.Mutex + witnessEnabled bool persistent bool continueHash *chainhash.Hash relayMtx sync.Mutex @@ -243,6 +245,17 @@ func newServerPeer(s *server, isPersistent bool) *serverPeer { } } +// IsWitnessEnabled returns true if the target serverPeer has signalled that it +// supports segregated witness. +// +// This function is safe for concurrent access. +func (sp *serverPeer) IsWitnessEnabled() bool { + sp.witnessMtx.Lock() + enabled := sp.witnessEnabled + sp.witnessMtx.Unlock() + return enabled +} + // newestBlock returns the current best block hash and height using the format // required by the configuration for the peer package. func (sp *serverPeer) newestBlock() (*chainhash.Hash, int32, error) { @@ -351,6 +364,14 @@ func (sp *serverPeer) OnVersion(_ *peer.Peer, msg *wire.MsgVersion) { // is received. sp.setDisableRelayTx(msg.DisableRelayTx) + // Determine if the peer would like to receive witness data with + // transactions, or not. + if sp.Services()&wire.SFNodeWitness == wire.SFNodeWitness { + sp.witnessMtx.Lock() + sp.witnessEnabled = true + sp.witnessMtx.Unlock() + } + // Update the address manager and request known addresses from the // remote peer for outbound connections. This is skipped when running // on the simulation test network since it is only intended to connect @@ -358,8 +379,28 @@ func (sp *serverPeer) OnVersion(_ *peer.Peer, msg *wire.MsgVersion) { // discovered peers. if !cfg.SimNet { addrManager := sp.server.addrManager + // Outbound connections. if !sp.Inbound() { + // After soft-fork activation, only make outbound + // connection to peers if they flag that they're segwit + // enabled. + chain := sp.server.blockManager.chain + segwitActive, err := chain.IsDeploymentActive(chaincfg.DeploymentSegwit) + if err != nil { + peerLog.Errorf("Unable to query for segwit "+ + "soft-fork state: %v", err) + return + } + + if segwitActive && !sp.IsWitnessEnabled() { + peerLog.Infof("Disconnecting non-segwit "+ + "peer %v, isn't segwit segwit enabled and "+ + "we need more segwit enabled peers", sp) + sp.Disconnect() + return + } + // TODO(davec): Only do this if not doing the initial block // download and the local address is routable. if !cfg.DisableListen /* && isCurrent? */ { @@ -568,12 +609,18 @@ func (sp *serverPeer) OnGetData(_ *peer.Peer, msg *wire.MsgGetData) { } var err error switch iv.Type { + case wire.InvTypeWitnessTx: + err = sp.server.pushTxMsg(sp, &iv.Hash, c, waitChan, wire.WitnessEncoding) case wire.InvTypeTx: - err = sp.server.pushTxMsg(sp, &iv.Hash, c, waitChan) + err = sp.server.pushTxMsg(sp, &iv.Hash, c, waitChan, wire.BaseEncoding) + case wire.InvTypeWitnessBlock: + err = sp.server.pushBlockMsg(sp, &iv.Hash, c, waitChan, wire.WitnessEncoding) case wire.InvTypeBlock: - err = sp.server.pushBlockMsg(sp, &iv.Hash, c, waitChan) + err = sp.server.pushBlockMsg(sp, &iv.Hash, c, waitChan, wire.BaseEncoding) + case wire.InvTypeFilteredWitnessBlock: + err = sp.server.pushMerkleBlockMsg(sp, &iv.Hash, c, waitChan, wire.WitnessEncoding) case wire.InvTypeFilteredBlock: - err = sp.server.pushMerkleBlockMsg(sp, &iv.Hash, c, waitChan) + err = sp.server.pushMerkleBlockMsg(sp, &iv.Hash, c, waitChan, wire.BaseEncoding) default: peerLog.Warnf("Unknown type in inventory request %d", iv.Type) @@ -1050,7 +1097,9 @@ func (s *server) AnnounceNewTransactions(newTxs []*mempool.TxDesc) { // pushTxMsg sends a tx message for the provided transaction hash to the // connected peer. An error is returned if the transaction hash is not known. -func (s *server) pushTxMsg(sp *serverPeer, hash *chainhash.Hash, doneChan chan<- struct{}, waitChan <-chan struct{}) error { +func (s *server) pushTxMsg(sp *serverPeer, hash *chainhash.Hash, doneChan chan<- struct{}, + waitChan <-chan struct{}, encoding wire.MessageEncoding) error { + // Attempt to fetch the requested transaction from the pool. A // call could be made to check for existence first, but simply trying // to fetch a missing transaction results in the same behavior. @@ -1070,14 +1119,16 @@ func (s *server) pushTxMsg(sp *serverPeer, hash *chainhash.Hash, doneChan chan<- <-waitChan } - sp.QueueMessage(tx.MsgTx(), doneChan) + sp.QueueMessageWithEncoding(tx.MsgTx(), doneChan, encoding) return nil } // pushBlockMsg sends a block message for the provided block hash to the // connected peer. An error is returned if the block hash is not known. -func (s *server) pushBlockMsg(sp *serverPeer, hash *chainhash.Hash, doneChan chan<- struct{}, waitChan <-chan struct{}) error { +func (s *server) pushBlockMsg(sp *serverPeer, hash *chainhash.Hash, doneChan chan<- struct{}, + waitChan <-chan struct{}, encoding wire.MessageEncoding) error { + // Fetch the raw block bytes from the database. var blockBytes []byte err := sp.server.db.View(func(dbTx database.Tx) error { @@ -1121,7 +1172,7 @@ func (s *server) pushBlockMsg(sp *serverPeer, hash *chainhash.Hash, doneChan cha if !sendInv { dc = doneChan } - sp.QueueMessage(&msgBlock, dc) + sp.QueueMessageWithEncoding(&msgBlock, dc, encoding) // When the peer requests the final block that was advertised in // response to a getblocks message which requested more blocks than @@ -1143,7 +1194,9 @@ func (s *server) pushBlockMsg(sp *serverPeer, hash *chainhash.Hash, doneChan cha // the connected peer. Since a merkle block requires the peer to have a filter // loaded, this call will simply be ignored if there is no filter loaded. An // error is returned if the block hash is not known. -func (s *server) pushMerkleBlockMsg(sp *serverPeer, hash *chainhash.Hash, doneChan chan<- struct{}, waitChan <-chan struct{}) error { +func (s *server) pushMerkleBlockMsg(sp *serverPeer, hash *chainhash.Hash, + doneChan chan<- struct{}, waitChan <-chan struct{}, encoding wire.MessageEncoding) error { + // Do not send a response if the peer doesn't have a filter loaded. if !sp.filter.IsLoaded() { if doneChan != nil { @@ -1190,7 +1243,8 @@ func (s *server) pushMerkleBlockMsg(sp *serverPeer, hash *chainhash.Hash, doneCh dc = doneChan } if txIndex < uint32(len(blkTransactions)) { - sp.QueueMessage(blkTransactions[txIndex], dc) + sp.QueueMessageWithEncoding(blkTransactions[txIndex], dc, + encoding) } } @@ -1627,7 +1681,7 @@ func newPeerConfig(sp *serverPeer) *peer.Config { ChainParams: sp.server.chainParams, Services: sp.server.services, DisableRelayTx: cfg.BlocksOnly, - ProtocolVersion: wire.FeeFilterVersion, + ProtocolVersion: peer.MaxProtocolVersion, } }