blockmanager: Remove serverPeer from blockmanager completely.
The purpose is to remove blockmanager's dependency on serverPeer, which is defined in the main package. Instead, we split some of the fields out of serverPeer into a separate struct called peerSyncState in blockmanager.go. While the two are still in the same package, this change makes it easier to later move blockManager into its own package along with peerSyncState. The blockManager now keeps a map from Peer pointers to per-peer sync state and updates it as peers connect and disconnect.
This commit is contained in:
parent 088ccfd828
commit 08955805d5
4 changed files with 218 additions and 173 deletions
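Before the diff, here is a minimal, self-contained Go sketch of the pattern the message above describes, using hypothetical stand-in types rather than the commit's real ones (the real peerSyncState and handlers appear in blockmanager.go below): the manager keys a map by peer pointer, adds an entry on connect, deletes it on disconnect, and every message handler first looks the sender up so messages from unknown peers are ignored.

package main

import "fmt"

// Peer stands in for *peerpkg.Peer; peerSyncState is trimmed to one field.
type Peer struct{ id int32 }

type peerSyncState struct {
    syncCandidate bool
}

type blockManager struct {
    // Touched only from the single block-handler goroutine, so no lock is
    // needed (mirrors the "blockHandler thread" comment in the diff below).
    peerStates map[*Peer]*peerSyncState
}

// handleNewPeerMsg registers per-peer state when a peer connects.
func (b *blockManager) handleNewPeerMsg(p *Peer) {
    b.peerStates[p] = &peerSyncState{syncCandidate: true}
}

// handleDonePeerMsg forgets the peer when it disconnects.
func (b *blockManager) handleDonePeerMsg(p *Peer) {
    delete(b.peerStates, p)
}

// handleTxMsg shows the lookup every message handler now performs first.
func (b *blockManager) handleTxMsg(p *Peer) {
    state, exists := b.peerStates[p]
    if !exists {
        fmt.Printf("tx message from unknown peer %d, ignoring\n", p.id)
        return
    }
    fmt.Printf("peer %d known, syncCandidate=%v\n", p.id, state.syncCandidate)
}

func main() {
    b := &blockManager{peerStates: make(map[*Peer]*peerSyncState)}
    p := &Peer{id: 1}
    b.handleNewPeerMsg(p)
    b.handleTxMsg(p) // known: handled
    b.handleDonePeerMsg(p)
    b.handleTxMsg(p) // unknown: ignored
}

Because the real map is only ever accessed from the blockHandler goroutine, a plain unsynchronized map suffices; the channel-based Queue/NewPeer/DonePeer API in the diff is what funnels all updates onto that goroutine.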
blockmanager.go (313 changed lines)

@@ -16,6 +16,7 @@ import (
     "github.com/btcsuite/btcd/chaincfg/chainhash"
     "github.com/btcsuite/btcd/database"
     "github.com/btcsuite/btcd/mempool"
+    peerpkg "github.com/btcsuite/btcd/peer"
     "github.com/btcsuite/btcd/wire"
     "github.com/btcsuite/btcutil"
 )
@@ -49,14 +50,14 @@ var zeroHash chainhash.Hash
 
 // newPeerMsg signifies a newly connected peer to the block handler.
 type newPeerMsg struct {
-    peer *serverPeer
+    peer *peerpkg.Peer
 }
 
 // blockMsg packages a bitcoin block message and the peer it came from together
 // so the block handler has access to that information.
 type blockMsg struct {
     block *btcutil.Block
-    peer  *serverPeer
+    peer  *peerpkg.Peer
     reply chan struct{}
 }
 
@@ -64,33 +65,33 @@ type blockMsg struct {
 // so the block handler has access to that information.
 type invMsg struct {
     inv  *wire.MsgInv
-    peer *serverPeer
+    peer *peerpkg.Peer
 }
 
 // headersMsg packages a bitcoin headers message and the peer it came from
 // together so the block handler has access to that information.
 type headersMsg struct {
     headers *wire.MsgHeaders
-    peer    *serverPeer
+    peer    *peerpkg.Peer
 }
 
 // donePeerMsg signifies a newly disconnected peer to the block handler.
 type donePeerMsg struct {
-    peer *serverPeer
+    peer *peerpkg.Peer
 }
 
 // txMsg packages a bitcoin tx message and the peer it came from together
 // so the block handler has access to that information.
 type txMsg struct {
     tx    *btcutil.Tx
-    peer  *serverPeer
+    peer  *peerpkg.Peer
     reply chan struct{}
 }
 
 // getSyncPeerMsg is a message type to be sent across the message channel for
 // retrieving the current sync peer.
 type getSyncPeerMsg struct {
-    reply chan *serverPeer
+    reply chan int32
 }
 
 // processBlockResponse is a response sent to the reply channel of a
@@ -138,7 +139,7 @@ type headerNode struct {
 type PeerNotifier interface {
     AnnounceNewTransactions(newTxs []*mempool.TxDesc)
 
-    UpdatePeerHeights(latestBlkHash *chainhash.Hash, latestHeight int32, updateSource *serverPeer)
+    UpdatePeerHeights(latestBlkHash *chainhash.Hash, latestHeight int32, updateSource *peerpkg.Peer)
 
     RelayInventory(invVect *wire.InvVect, data interface{})
 
@@ -157,23 +158,35 @@ type blockManagerConfig struct {
     MaxPeers int
 }
 
+// peerSyncState stores additional information that the blockManager tracks
+// about a peer.
+type peerSyncState struct {
+    syncCandidate   bool
+    requestQueue    []*wire.InvVect
+    requestedTxns   map[chainhash.Hash]struct{}
+    requestedBlocks map[chainhash.Hash]struct{}
+}
+
 // blockManager provides a concurrency safe block manager for handling all
 // incoming blocks.
 type blockManager struct {
-    peerNotifier    PeerNotifier
-    started         int32
-    shutdown        int32
-    chain           *blockchain.BlockChain
-    txMemPool       *mempool.TxPool
-    chainParams     *chaincfg.Params
+    peerNotifier   PeerNotifier
+    started        int32
+    shutdown       int32
+    chain          *blockchain.BlockChain
+    txMemPool      *mempool.TxPool
+    chainParams    *chaincfg.Params
+    progressLogger *blockProgressLogger
+    msgChan        chan interface{}
+    wg             sync.WaitGroup
+    quit           chan struct{}
+
+    // These fields should only be accessed from the blockHandler thread
     rejectedTxns    map[chainhash.Hash]struct{}
     requestedTxns   map[chainhash.Hash]struct{}
     requestedBlocks map[chainhash.Hash]struct{}
-    progressLogger  *blockProgressLogger
-    syncPeer        *serverPeer
-    msgChan         chan interface{}
-    wg              sync.WaitGroup
-    quit            chan struct{}
+    syncPeer        *peerpkg.Peer
+    peerStates      map[*peerpkg.Peer]*peerSyncState
 
     // The following fields are used for headers-first mode.
     headersFirstMode bool
@@ -230,30 +243,30 @@ func (b *blockManager) findNextHeaderCheckpoint(height int32) *chaincfg.Checkpoint {
 // download/sync the blockchain from. When syncing is already running, it
 // simply returns. It also examines the candidates for any which are no longer
 // candidates and removes them as needed.
-func (b *blockManager) startSync(peers *list.List) {
+func (b *blockManager) startSync() {
     // Return now if we're already syncing.
     if b.syncPeer != nil {
         return
     }
 
-    best := b.chain.BestSnapshot()
-    var bestPeer *serverPeer
-    var enext *list.Element
-    for e := peers.Front(); e != nil; e = enext {
-        enext = e.Next()
-        sp := e.Value.(*serverPeer)
-
-        // Once the segwit soft-fork package has activated, we only
-        // want to sync from peers which are witness enabled to ensure
-        // that we fully validate all blockchain data.
-        segwitActive, err := b.chain.IsDeploymentActive(chaincfg.DeploymentSegwit)
-        if err != nil {
-            bmgrLog.Errorf("Unable to query for segwit soft-fork state: %v", err)
-            return
-        }
+    // Once the segwit soft-fork package has activated, we only
+    // want to sync from peers which are witness enabled to ensure
+    // that we fully validate all blockchain data.
+    segwitActive, err := b.chain.IsDeploymentActive(chaincfg.DeploymentSegwit)
+    if err != nil {
+        bmgrLog.Errorf("Unable to query for segwit "+
+            "soft-fork state: %v", err)
+        return
+    }
 
-        if segwitActive && !sp.IsWitnessEnabled() {
-            bmgrLog.Infof("peer %v not witness enabled, skipping", sp)
+    best := b.chain.BestSnapshot()
+    var bestPeer *peerpkg.Peer
+    for peer, state := range b.peerStates {
+        if !state.syncCandidate {
+            continue
+        }
+
+        if segwitActive && !peer.IsWitnessEnabled() {
+            bmgrLog.Debugf("peer %v not witness enabled, skipping", peer)
             continue
         }
@@ -263,14 +276,14 @@ func (b *blockManager) startSync(peers *list.List) {
         // doesn't have a later block when it's equal, it will likely
         // have one soon so it is a reasonable choice. It also allows
         // the case where both are at 0 such as during regression test.
-        if sp.LastBlock() < best.Height {
-            peers.Remove(e)
+        if peer.LastBlock() < best.Height {
+            state.syncCandidate = false
             continue
         }
 
         // TODO(davec): Use a better algorithm to choose the best peer.
         // For now, just pick the first available candidate.
-        bestPeer = sp
+        bestPeer = peer
     }
 
     // Start syncing from the best peer if one was selected.
@@ -327,14 +340,14 @@ func (b *blockManager) startSync(peers *list.List) {
 
 // isSyncCandidate returns whether or not the peer is a candidate to consider
 // syncing from.
-func (b *blockManager) isSyncCandidate(sp *serverPeer) bool {
+func (b *blockManager) isSyncCandidate(peer *peerpkg.Peer) bool {
     // Typically a peer is not a candidate for sync if it's not a full node,
     // however regression test is special in that the regression tool is
     // not a full node and still needs to be considered a sync candidate.
     if b.chainParams == &chaincfg.RegressionNetParams {
         // The peer is not a candidate if it's not coming from localhost
         // or the hostname can't be determined for some reason.
-        host, _, err := net.SplitHostPort(sp.Addr())
+        host, _, err := net.SplitHostPort(peer.Addr())
         if err != nil {
             return false
         }
@@ -351,9 +364,9 @@ func (b *blockManager) isSyncCandidate(sp *serverPeer) bool {
             bmgrLog.Errorf("Unable to query for segwit "+
                 "soft-fork state: %v", err)
         }
-        nodeServices := sp.Services()
+        nodeServices := peer.Services()
         if nodeServices&wire.SFNodeNetwork != wire.SFNodeNetwork ||
-            (segwitActive && !sp.IsWitnessEnabled()) {
+            (segwitActive && !peer.IsWitnessEnabled()) {
             return false
         }
     }
@@ -365,70 +378,80 @@ func (b *blockManager) isSyncCandidate(sp *serverPeer) bool {
 // handleNewPeerMsg deals with new peers that have signalled they may
 // be considered as a sync peer (they have already successfully negotiated). It
 // also starts syncing if needed. It is invoked from the syncHandler goroutine.
-func (b *blockManager) handleNewPeerMsg(peers *list.List, sp *serverPeer) {
+func (b *blockManager) handleNewPeerMsg(peer *peerpkg.Peer) {
     // Ignore if in the process of shutting down.
     if atomic.LoadInt32(&b.shutdown) != 0 {
         return
     }
 
-    bmgrLog.Infof("New valid peer %s (%s)", sp, sp.UserAgent())
+    bmgrLog.Infof("New valid peer %s (%s)", peer, peer.UserAgent())
 
-    // Ignore the peer if it's not a sync candidate.
-    if !b.isSyncCandidate(sp) {
-        return
+    // Initialize the peer state
+    isSyncCandidate := b.isSyncCandidate(peer)
+    b.peerStates[peer] = &peerSyncState{
+        syncCandidate:   isSyncCandidate,
+        requestedTxns:   make(map[chainhash.Hash]struct{}),
+        requestedBlocks: make(map[chainhash.Hash]struct{}),
     }
 
-    // Add the peer as a candidate to sync from.
-    peers.PushBack(sp)
-
     // Start syncing by choosing the best candidate if needed.
-    b.startSync(peers)
+    if isSyncCandidate && b.syncPeer == nil {
+        b.startSync()
+    }
 }
 
 // handleDonePeerMsg deals with peers that have signalled they are done. It
 // removes the peer as a candidate for syncing and in the case where it was
 // the current sync peer, attempts to select a new best peer to sync from. It
 // is invoked from the syncHandler goroutine.
-func (b *blockManager) handleDonePeerMsg(peers *list.List, sp *serverPeer) {
-    // Remove the peer from the list of candidate peers.
-    for e := peers.Front(); e != nil; e = e.Next() {
-        if e.Value == sp {
-            peers.Remove(e)
-            break
-        }
+func (b *blockManager) handleDonePeerMsg(peer *peerpkg.Peer) {
+    state, exists := b.peerStates[peer]
+    if !exists {
+        bmgrLog.Warnf("Received done peer message for unknown peer %s", peer)
+        return
     }
 
-    bmgrLog.Infof("Lost peer %s", sp)
+    // Remove the peer from the list of candidate peers.
+    delete(b.peerStates, peer)
+
+    bmgrLog.Infof("Lost peer %s", peer)
 
     // Remove requested transactions from the global map so that they will
     // be fetched from elsewhere next time we get an inv.
-    for k := range sp.requestedTxns {
-        delete(b.requestedTxns, k)
+    for txHash := range state.requestedTxns {
+        delete(b.requestedTxns, txHash)
     }
 
     // Remove requested blocks from the global map so that they will be
     // fetched from elsewhere next time we get an inv.
    // TODO: we could possibly here check which peers have these blocks
     // and request them now to speed things up a little.
-    for k := range sp.requestedBlocks {
-        delete(b.requestedBlocks, k)
+    for blockHash := range state.requestedBlocks {
+        delete(b.requestedBlocks, blockHash)
     }
 
     // Attempt to find a new peer to sync from if the quitting peer is the
     // sync peer. Also, reset the headers-first state if in headers-first
     // mode so
-    if b.syncPeer != nil && b.syncPeer == sp {
+    if b.syncPeer == peer {
         b.syncPeer = nil
         if b.headersFirstMode {
             best := b.chain.BestSnapshot()
             b.resetHeaderState(&best.Hash, best.Height)
         }
-        b.startSync(peers)
+        b.startSync()
     }
 }
 
 // handleTxMsg handles transaction messages from all peers.
 func (b *blockManager) handleTxMsg(tmsg *txMsg) {
+    peer := tmsg.peer
+    state, exists := b.peerStates[peer]
+    if !exists {
+        bmgrLog.Warnf("Received tx message from unknown peer %s", peer)
+        return
+    }
+
     // NOTE: BitcoinJ, and possibly other wallets, don't follow the spec of
     // sending an inventory message and allowing the remote peer to decide
     // whether or not they want to request the transaction via a getdata
@@ -442,22 +465,22 @@ func (b *blockManager) handleTxMsg(tmsg *txMsg) {
     // Ignore transactions that we have already rejected. Do not
     // send a reject message here because if the transaction was already
     // rejected, the transaction was unsolicited.
-    if _, exists := b.rejectedTxns[*txHash]; exists {
+    if _, exists = b.rejectedTxns[*txHash]; exists {
         bmgrLog.Debugf("Ignoring unsolicited previously rejected "+
-            "transaction %v from %s", txHash, tmsg.peer)
+            "transaction %v from %s", txHash, peer)
         return
     }
 
     // Process the transaction to include validation, insertion in the
     // memory pool, orphan handling, etc.
     acceptedTxs, err := b.txMemPool.ProcessTransaction(tmsg.tx,
-        true, true, mempool.Tag(tmsg.peer.ID()))
+        true, true, mempool.Tag(peer.ID()))
 
     // Remove transaction from request maps. Either the mempool/chain
     // already knows about it and as such we shouldn't have any more
     // instances of trying to fetch it, or we failed to insert and thus
     // we'll retry next time we get an inv.
-    delete(tmsg.peer.requestedTxns, *txHash)
+    delete(state.requestedTxns, *txHash)
     delete(b.requestedTxns, *txHash)
 
     if err != nil {
@@ -472,7 +495,7 @@ func (b *blockManager) handleTxMsg(tmsg *txMsg) {
         // so log it as an actual error.
         if _, ok := err.(mempool.RuleError); ok {
             bmgrLog.Debugf("Rejected transaction %v from %s: %v",
-                txHash, tmsg.peer, err)
+                txHash, peer, err)
         } else {
             bmgrLog.Errorf("Failed to process transaction %v: %v",
                 txHash, err)
@@ -481,8 +504,7 @@ func (b *blockManager) handleTxMsg(tmsg *txMsg) {
         // Convert the error into an appropriate reject message and
         // send it.
         code, reason := mempool.ErrToRejectErr(err)
-        tmsg.peer.PushRejectMsg(wire.CmdTx, code, reason, txHash,
-            false)
+        peer.PushRejectMsg(wire.CmdTx, code, reason, txHash, false)
         return
     }
 
@@ -512,9 +534,16 @@ func (b *blockManager) current() bool {
 
 // handleBlockMsg handles block messages from all peers.
 func (b *blockManager) handleBlockMsg(bmsg *blockMsg) {
+    peer := bmsg.peer
+    state, exists := b.peerStates[peer]
+    if !exists {
+        bmgrLog.Warnf("Received block message from unknown peer %s", peer)
+        return
+    }
+
     // If we didn't ask for this block then the peer is misbehaving.
     blockHash := bmsg.block.Hash()
-    if _, exists := bmsg.peer.requestedBlocks[*blockHash]; !exists {
+    if _, exists = state.requestedBlocks[*blockHash]; !exists {
         // The regression test intentionally sends some blocks twice
         // to test duplicate block insertion fails. Don't disconnect
         // the peer or ignore the block when we're in regression test
@@ -522,8 +551,8 @@ func (b *blockManager) handleBlockMsg(bmsg *blockMsg) {
         // duplicate blocks.
         if b.chainParams != &chaincfg.RegressionNetParams {
             bmgrLog.Warnf("Got unrequested block %v from %s -- "+
-                "disconnecting", blockHash, bmsg.peer.Addr())
-            bmsg.peer.Disconnect()
+                "disconnecting", blockHash, peer.Addr())
+            peer.Disconnect()
             return
         }
     }
@@ -555,7 +584,7 @@ func (b *blockManager) handleBlockMsg(bmsg *blockMsg) {
     // Remove block from request maps. Either chain will know about it and
     // so we shouldn't have any more instances of trying to fetch it, or we
     // will fail the insert and thus we'll retry next time we get an inv.
-    delete(bmsg.peer.requestedBlocks, *blockHash)
+    delete(state.requestedBlocks, *blockHash)
     delete(b.requestedBlocks, *blockHash)
 
     // Process the block to include validation, best chain selection, orphan
@@ -568,7 +597,7 @@ func (b *blockManager) handleBlockMsg(bmsg *blockMsg) {
         // it as an actual error.
         if _, ok := err.(blockchain.RuleError); ok {
             bmgrLog.Infof("Rejected block %v from %s: %v", blockHash,
-                bmsg.peer, err)
+                peer, err)
         } else {
             bmgrLog.Errorf("Failed to process block %v: %v",
                 blockHash, err)
@@ -581,8 +610,7 @@ func (b *blockManager) handleBlockMsg(bmsg *blockMsg) {
         // Convert the error into an appropriate reject message and
         // send it.
         code, reason := mempool.ErrToRejectErr(err)
-        bmsg.peer.PushRejectMsg(wire.CmdBlock, code, reason,
-            blockHash, false)
+        peer.PushRejectMsg(wire.CmdBlock, code, reason, blockHash, false)
         return
     }
 
@@ -626,7 +654,7 @@ func (b *blockManager) handleBlockMsg(bmsg *blockMsg) {
             bmgrLog.Warnf("Failed to get block locator for the "+
                 "latest block: %v", err)
         } else {
-            bmsg.peer.PushGetBlocksMsg(locator, orphanRoot)
+            peer.PushGetBlocksMsg(locator, orphanRoot)
         }
     } else {
         // When the block is not an orphan, log information about it and
@@ -648,9 +676,10 @@ func (b *blockManager) handleBlockMsg(bmsg *blockMsg) {
     // chain is "current". This avoids sending a spammy amount of messages
     // if we're syncing the chain from scratch.
     if blkHashUpdate != nil && heightUpdate != 0 {
-        bmsg.peer.UpdateLastBlockHeight(heightUpdate)
+        peer.UpdateLastBlockHeight(heightUpdate)
         if isOrphan || b.current() {
-            go b.peerNotifier.UpdatePeerHeights(blkHashUpdate, heightUpdate, bmsg.peer)
+            go b.peerNotifier.UpdatePeerHeights(blkHashUpdate, heightUpdate,
+                peer)
         }
     }
 
@@ -664,7 +693,7 @@ func (b *blockManager) handleBlockMsg(bmsg *blockMsg) {
     // getting short.
     if !isCheckpointBlock {
         if b.startHeader != nil &&
-            len(bmsg.peer.requestedBlocks) < minInFlightBlocks {
+            len(state.requestedBlocks) < minInFlightBlocks {
             b.fetchHeaderBlocks()
         }
         return
@@ -679,10 +708,10 @@ func (b *blockManager) handleBlockMsg(bmsg *blockMsg) {
     b.nextCheckpoint = b.findNextHeaderCheckpoint(prevHeight)
     if b.nextCheckpoint != nil {
         locator := blockchain.BlockLocator([]*chainhash.Hash{prevHash})
-        err := bmsg.peer.PushGetHeadersMsg(locator, b.nextCheckpoint.Hash)
+        err := peer.PushGetHeadersMsg(locator, b.nextCheckpoint.Hash)
         if err != nil {
             bmgrLog.Warnf("Failed to send getheaders message to "+
-                "peer %s: %v", bmsg.peer.Addr(), err)
+                "peer %s: %v", peer.Addr(), err)
             return
         }
         bmgrLog.Infof("Downloading headers for blocks %d to %d from "+
@@ -698,10 +727,10 @@ func (b *blockManager) handleBlockMsg(bmsg *blockMsg) {
     b.headerList.Init()
     bmgrLog.Infof("Reached the final checkpoint -- switching to normal mode")
     locator := blockchain.BlockLocator([]*chainhash.Hash{blockHash})
-    err = bmsg.peer.PushGetBlocksMsg(locator, &zeroHash)
+    err = peer.PushGetBlocksMsg(locator, &zeroHash)
     if err != nil {
         bmgrLog.Warnf("Failed to send getblocks message to peer %s: %v",
-            bmsg.peer.Addr(), err)
+            peer.Addr(), err)
         return
     }
 }
@@ -735,8 +764,10 @@ func (b *blockManager) fetchHeaderBlocks() {
                 "fetch: %v", err)
         }
         if !haveInv {
+            syncPeerState := b.peerStates[b.syncPeer]
+
             b.requestedBlocks[*node.hash] = struct{}{}
-            b.syncPeer.requestedBlocks[*node.hash] = struct{}{}
+            syncPeerState.requestedBlocks[*node.hash] = struct{}{}
 
             // If we're fetching from a witness enabled peer
             // post-fork, then ensure that we receive all the
@@ -761,13 +792,20 @@ func (b *blockManager) fetchHeaderBlocks() {
 // handleHeadersMsg handles block header messages from all peers. Headers are
 // requested when performing a headers-first sync.
 func (b *blockManager) handleHeadersMsg(hmsg *headersMsg) {
+    peer := hmsg.peer
+    _, exists := b.peerStates[peer]
+    if !exists {
+        bmgrLog.Warnf("Received headers message from unknown peer %s", peer)
+        return
+    }
+
     // The remote peer is misbehaving if we didn't request headers.
     msg := hmsg.headers
     numHeaders := len(msg.Headers)
     if !b.headersFirstMode {
         bmgrLog.Warnf("Got %d unrequested headers from %s -- "+
-            "disconnecting", numHeaders, hmsg.peer.Addr())
-        hmsg.peer.Disconnect()
+            "disconnecting", numHeaders, peer.Addr())
+        peer.Disconnect()
         return
     }
 
@@ -789,7 +827,7 @@ func (b *blockManager) handleHeadersMsg(hmsg *headersMsg) {
         if prevNodeEl == nil {
             bmgrLog.Warnf("Header list does not contain a previous" +
                 "element as expected -- disconnecting peer")
-            hmsg.peer.Disconnect()
+            peer.Disconnect()
             return
         }
 
@@ -806,8 +844,8 @@ func (b *blockManager) handleHeadersMsg(hmsg *headersMsg) {
         } else {
             bmgrLog.Warnf("Received block header that does not "+
                 "properly connect to the chain from peer %s "+
-                "-- disconnecting", hmsg.peer.Addr())
-            hmsg.peer.Disconnect()
+                "-- disconnecting", peer.Addr())
+            peer.Disconnect()
             return
         }
 
@@ -823,9 +861,9 @@ func (b *blockManager) handleHeadersMsg(hmsg *headersMsg) {
                     "%s from peer %s does NOT match "+
                     "expected checkpoint hash of %s -- "+
                     "disconnecting", node.height,
-                    node.hash, hmsg.peer.Addr(),
+                    node.hash, peer.Addr(),
                     b.nextCheckpoint.Hash)
-                hmsg.peer.Disconnect()
+                peer.Disconnect()
                 return
             }
             break
@@ -851,10 +889,10 @@ func (b *blockManager) handleHeadersMsg(hmsg *headersMsg) {
     // headers starting from the latest known header and ending with the
     // next checkpoint.
     locator := blockchain.BlockLocator([]*chainhash.Hash{finalHash})
-    err := hmsg.peer.PushGetHeadersMsg(locator, b.nextCheckpoint.Hash)
+    err := peer.PushGetHeadersMsg(locator, b.nextCheckpoint.Hash)
     if err != nil {
         bmgrLog.Warnf("Failed to send getheaders message to "+
-            "peer %s: %v", hmsg.peer.Addr(), err)
+            "peer %s: %v", peer.Addr(), err)
         return
     }
 }
@@ -899,6 +937,13 @@ func (b *blockManager) haveInventory(invVect *wire.InvVect) (bool, error) {
 // handleInvMsg handles inv messages from all peers.
 // We examine the inventory advertised by the remote peer and act accordingly.
 func (b *blockManager) handleInvMsg(imsg *invMsg) {
+    peer := imsg.peer
+    state, exists := b.peerStates[peer]
+    if !exists {
+        bmgrLog.Warnf("Received inv message from unknown peer %s", peer)
+        return
+    }
+
     // Attempt to find the final block in the inventory list. There may
     // not be one.
     lastBlock := -1
@@ -915,13 +960,13 @@ func (b *blockManager) handleInvMsg(imsg *invMsg) {
     // announced block for this peer. We'll use this information later to
     // update the heights of peers based on blocks we've accepted that they
     // previously announced.
-    if lastBlock != -1 && (imsg.peer != b.syncPeer || b.current()) {
-        imsg.peer.UpdateLastAnnouncedBlock(&invVects[lastBlock].Hash)
+    if lastBlock != -1 && (peer != b.syncPeer || b.current()) {
+        peer.UpdateLastAnnouncedBlock(&invVects[lastBlock].Hash)
     }
 
     // Ignore invs from peers that aren't the sync if we are not current.
     // Helps prevent fetching a mass of orphans.
-    if imsg.peer != b.syncPeer && !b.current() {
+    if peer != b.syncPeer && !b.current() {
         return
     }
 
@@ -930,7 +975,7 @@ func (b *blockManager) handleInvMsg(imsg *invMsg) {
     if lastBlock != -1 && b.current() {
         blkHeight, err := b.chain.BlockHeightByHash(&invVects[lastBlock].Hash)
         if err == nil {
-            imsg.peer.UpdateLastBlockHeight(blkHeight)
+            peer.UpdateLastBlockHeight(blkHeight)
         }
     }
 
@@ -951,7 +996,7 @@ func (b *blockManager) handleInvMsg(imsg *invMsg) {
 
         // Add the inventory to the cache of known inventory
         // for the peer.
-        imsg.peer.AddKnownInventory(iv)
+        peer.AddKnownInventory(iv)
 
         // Ignore inventory when we're in headers-first mode.
         if b.headersFirstMode {
@@ -979,12 +1024,12 @@ func (b *blockManager) handleInvMsg(imsg *invMsg) {
             // peers, as after segwit activation we only want to
             // download from peers that can provide us full witness
             // data for blocks.
-            if !imsg.peer.IsWitnessEnabled() && iv.Type == wire.InvTypeBlock {
+            if !peer.IsWitnessEnabled() && iv.Type == wire.InvTypeBlock {
                 continue
             }
 
             // Add it to the request queue.
-            imsg.peer.requestQueue = append(imsg.peer.requestQueue, iv)
+            state.requestQueue = append(state.requestQueue, iv)
             continue
         }
 
@@ -1011,7 +1056,7 @@ func (b *blockManager) handleInvMsg(imsg *invMsg) {
                     "%v", err)
                 continue
             }
-            imsg.peer.PushGetBlocksMsg(locator, orphanRoot)
+            peer.PushGetBlocksMsg(locator, orphanRoot)
             continue
         }
 
@@ -1024,7 +1069,7 @@ func (b *blockManager) handleInvMsg(imsg *invMsg) {
                 // final one the remote peer knows about (zero
                 // stop hash).
                 locator := b.chain.BlockLocatorFromHash(&iv.Hash)
-                imsg.peer.PushGetBlocksMsg(locator, &zeroHash)
+                peer.PushGetBlocksMsg(locator, &zeroHash)
             }
         }
     }
@@ -1033,7 +1078,7 @@ func (b *blockManager) handleInvMsg(imsg *invMsg) {
     // the request will be requested on the next inv message.
     numRequested := 0
     gdmsg := wire.NewMsgGetData()
-    requestQueue := imsg.peer.requestQueue
+    requestQueue := state.requestQueue
     for len(requestQueue) != 0 {
         iv := requestQueue[0]
         requestQueue[0] = nil
@@ -1048,9 +1093,9 @@ func (b *blockManager) handleInvMsg(imsg *invMsg) {
             if _, exists := b.requestedBlocks[iv.Hash]; !exists {
                 b.requestedBlocks[iv.Hash] = struct{}{}
                 b.limitMap(b.requestedBlocks, maxRequestedBlocks)
-                imsg.peer.requestedBlocks[iv.Hash] = struct{}{}
+                state.requestedBlocks[iv.Hash] = struct{}{}
 
-                if imsg.peer.IsWitnessEnabled() {
+                if peer.IsWitnessEnabled() {
                     iv.Type = wire.InvTypeWitnessBlock
                 }
 
@@ -1066,11 +1111,11 @@ func (b *blockManager) handleInvMsg(imsg *invMsg) {
             if _, exists := b.requestedTxns[iv.Hash]; !exists {
                 b.requestedTxns[iv.Hash] = struct{}{}
                 b.limitMap(b.requestedTxns, maxRequestedTxns)
-                imsg.peer.requestedTxns[iv.Hash] = struct{}{}
+                state.requestedTxns[iv.Hash] = struct{}{}
 
                 // If the peer is capable, request the txn
                 // including all witness data.
-                if imsg.peer.IsWitnessEnabled() {
+                if peer.IsWitnessEnabled() {
                     iv.Type = wire.InvTypeWitnessTx
                 }
 
@@ -1083,9 +1128,9 @@ func (b *blockManager) handleInvMsg(imsg *invMsg) {
             break
         }
     }
-    imsg.peer.requestQueue = requestQueue
+    state.requestQueue = requestQueue
     if len(gdmsg.InvList) > 0 {
-        imsg.peer.QueueMessage(gdmsg, nil)
+        peer.QueueMessage(gdmsg, nil)
     }
 }
 
@@ -1114,14 +1159,13 @@ func (b *blockManager) limitMap(m map[chainhash.Hash]struct{}, limit int) {
 // important because the block manager controls which blocks are needed and how
 // the fetching should proceed.
 func (b *blockManager) blockHandler() {
-    candidatePeers := list.New()
 out:
     for {
         select {
         case m := <-b.msgChan:
             switch msg := m.(type) {
             case *newPeerMsg:
-                b.handleNewPeerMsg(candidatePeers, msg.peer)
+                b.handleNewPeerMsg(msg.peer)
 
             case *txMsg:
                 b.handleTxMsg(msg)
@@ -1138,10 +1182,14 @@ out:
                 b.handleHeadersMsg(msg)
 
             case *donePeerMsg:
-                b.handleDonePeerMsg(candidatePeers, msg.peer)
+                b.handleDonePeerMsg(msg.peer)
 
             case getSyncPeerMsg:
-                msg.reply <- b.syncPeer
+                var peerID int32
+                if b.syncPeer != nil {
+                    peerID = b.syncPeer.ID()
+                }
+                msg.reply <- peerID
 
             case processBlockMsg:
                 _, isOrphan, err := b.chain.ProcessBlock(
@@ -1251,71 +1299,71 @@ func (b *blockManager) handleBlockchainNotification(notification *blockchain.Notification) {
 }
 
 // NewPeer informs the block manager of a newly active peer.
-func (b *blockManager) NewPeer(sp *serverPeer) {
+func (b *blockManager) NewPeer(peer *peerpkg.Peer) {
     // Ignore if we are shutting down.
     if atomic.LoadInt32(&b.shutdown) != 0 {
         return
     }
-    b.msgChan <- &newPeerMsg{peer: sp}
+    b.msgChan <- &newPeerMsg{peer: peer}
 }
 
 // QueueTx adds the passed transaction message and peer to the block handling
 // queue. Responds to the done channel argument after the tx message is
 // processed.
-func (b *blockManager) QueueTx(tx *btcutil.Tx, sp *serverPeer, done chan struct{}) {
+func (b *blockManager) QueueTx(tx *btcutil.Tx, peer *peerpkg.Peer, done chan struct{}) {
     // Don't accept more transactions if we're shutting down.
     if atomic.LoadInt32(&b.shutdown) != 0 {
         done <- struct{}{}
         return
     }
 
-    b.msgChan <- &txMsg{tx: tx, peer: sp, reply: done}
+    b.msgChan <- &txMsg{tx: tx, peer: peer, reply: done}
 }
 
 // QueueBlock adds the passed block message and peer to the block handling
 // queue. Responds to the done channel argument after the block message is
 // processed.
-func (b *blockManager) QueueBlock(block *btcutil.Block, sp *serverPeer, done chan struct{}) {
+func (b *blockManager) QueueBlock(block *btcutil.Block, peer *peerpkg.Peer, done chan struct{}) {
     // Don't accept more blocks if we're shutting down.
     if atomic.LoadInt32(&b.shutdown) != 0 {
        done <- struct{}{}
         return
     }
 
-    b.msgChan <- &blockMsg{block: block, peer: sp, reply: done}
+    b.msgChan <- &blockMsg{block: block, peer: peer, reply: done}
 }
 
 // QueueInv adds the passed inv message and peer to the block handling queue.
-func (b *blockManager) QueueInv(inv *wire.MsgInv, sp *serverPeer) {
+func (b *blockManager) QueueInv(inv *wire.MsgInv, peer *peerpkg.Peer) {
     // No channel handling here because peers do not need to block on inv
     // messages.
     if atomic.LoadInt32(&b.shutdown) != 0 {
         return
     }
 
-    b.msgChan <- &invMsg{inv: inv, peer: sp}
+    b.msgChan <- &invMsg{inv: inv, peer: peer}
 }
 
 // QueueHeaders adds the passed headers message and peer to the block handling
 // queue.
-func (b *blockManager) QueueHeaders(headers *wire.MsgHeaders, sp *serverPeer) {
+func (b *blockManager) QueueHeaders(headers *wire.MsgHeaders, peer *peerpkg.Peer) {
     // No channel handling here because peers do not need to block on
     // headers messages.
     if atomic.LoadInt32(&b.shutdown) != 0 {
         return
     }
 
-    b.msgChan <- &headersMsg{headers: headers, peer: sp}
+    b.msgChan <- &headersMsg{headers: headers, peer: peer}
 }
 
 // DonePeer informs the blockmanager that a peer has disconnected.
-func (b *blockManager) DonePeer(sp *serverPeer) {
+func (b *blockManager) DonePeer(peer *peerpkg.Peer) {
     // Ignore if we are shutting down.
     if atomic.LoadInt32(&b.shutdown) != 0 {
         return
     }
 
-    b.msgChan <- &donePeerMsg{peer: sp}
+    b.msgChan <- &donePeerMsg{peer: peer}
 }
 
 // Start begins the core block handler which processes block and inv messages.
@@ -1345,9 +1393,9 @@ func (b *blockManager) Stop() error {
     return nil
 }
 
-// SyncPeer returns the current sync peer.
-func (b *blockManager) SyncPeer() *serverPeer {
-    reply := make(chan *serverPeer)
+// SyncPeerID returns the ID of the current sync peer, or 0 if there is none.
+func (b *blockManager) SyncPeerID() int32 {
+    reply := make(chan int32)
     b.msgChan <- getSyncPeerMsg{reply: reply}
     return <-reply
 }
@@ -1391,6 +1439,7 @@ func newBlockManager(config *blockManagerConfig) (*blockManager, error) {
         rejectedTxns:    make(map[chainhash.Hash]struct{}),
         requestedTxns:   make(map[chainhash.Hash]struct{}),
         requestedBlocks: make(map[chainhash.Hash]struct{}),
+        peerStates:      make(map[*peerpkg.Peer]*peerSyncState),
         progressLogger:  newBlockProgressLogger("Processed", bmgrLog),
         msgChan:         make(chan interface{}, config.MaxPeers*3),
         headerList:      list.New(),

rpcadapters.go (7 changed lines)
@@ -258,12 +258,13 @@ func (b *rpcSyncMgr) Pause() chan<- struct{} {
     return b.blockMgr.Pause()
 }
 
-// SyncPeer returns the peer that is currently the peer being used to sync from.
+// SyncPeerID returns the peer that is currently the peer being used to sync
+// from.
 //
 // This function is safe for concurrent access and is part of the
 // rpcserverSyncManager interface implementation.
-func (b *rpcSyncMgr) SyncPeer() rpcserverPeer {
-    return (*rpcPeer)(b.blockMgr.SyncPeer())
+func (b *rpcSyncMgr) SyncPeerID() int32 {
+    return b.blockMgr.SyncPeerID()
 }
 
 // LocateBlocks returns the hashes of the blocks after the first known block in

rpcserver.go (10 changed lines)
@@ -2402,7 +2402,7 @@ func handleGetNetworkHashPS(s *rpcServer, cmd interface{}, closeChan <-chan struct{}) (interface{}, error) {
 // handleGetPeerInfo implements the getpeerinfo command.
 func handleGetPeerInfo(s *rpcServer, cmd interface{}, closeChan <-chan struct{}) (interface{}, error) {
     peers := s.cfg.ConnMgr.ConnectedPeers()
-    syncPeer := s.cfg.SyncMgr.SyncPeer().ToPeer()
+    syncPeerID := s.cfg.SyncMgr.SyncPeerID()
     infos := make([]*btcjson.GetPeerInfoResult, 0, len(peers))
     for _, p := range peers {
         statsSnap := p.ToPeer().StatsSnapshot()
@@ -2426,7 +2426,7 @@ func handleGetPeerInfo(s *rpcServer, cmd interface{}, closeChan <-chan struct{})
             CurrentHeight: statsSnap.LastBlock,
             BanScore:      int32(p.BanScore()),
             FeeFilter:     p.FeeFilter(),
-            SyncNode:      p.ToPeer() == syncPeer,
+            SyncNode:      statsSnap.ID == syncPeerID,
         }
         if p.ToPeer().LastPingNonce() != 0 {
             wait := float64(time.Since(statsSnap.LastPingTime).Nanoseconds())
@@ -4139,9 +4139,9 @@ type rpcserverSyncManager interface {
     // Pause pauses the sync manager until the returned channel is closed.
     Pause() chan<- struct{}
 
-    // SyncPeer returns the peer that is currently the peer being used to
-    // sync from.
-    SyncPeer() rpcserverPeer
+    // SyncPeerID returns the ID of the peer that is currently the peer being
+    // used to sync from or 0 if there is none.
+    SyncPeerID() int32
 
     // LocateBlocks returns the hashes of the blocks after the first known
     // block in the provided locators until the provided stop hash or the

server.go (61 changed lines)
@@ -120,7 +120,7 @@ type relayMsg struct {
 type updatePeerHeightsMsg struct {
     newHash    *chainhash.Hash
     newHeight  int32
-    originPeer *serverPeer
+    originPeer *peer.Peer
 }
 
 // peerState maintains state of inbound, persistent, outbound peers as well
@@ -212,20 +212,17 @@ type serverPeer struct {
 
     *peer.Peer
 
-    connReq         *connmgr.ConnReq
-    server          *server
-    persistent      bool
-    continueHash    *chainhash.Hash
-    relayMtx        sync.Mutex
-    disableRelayTx  bool
-    sentAddrs       bool
-    requestQueue    []*wire.InvVect
-    requestedTxns   map[chainhash.Hash]struct{}
-    requestedBlocks map[chainhash.Hash]struct{}
-    filter          *bloom.Filter
-    knownAddresses  map[string]struct{}
-    banScore        connmgr.DynamicBanScore
-    quit            chan struct{}
+    connReq        *connmgr.ConnReq
+    server         *server
+    persistent     bool
+    continueHash   *chainhash.Hash
+    relayMtx       sync.Mutex
+    disableRelayTx bool
+    sentAddrs      bool
+    filter         *bloom.Filter
+    knownAddresses map[string]struct{}
+    banScore       connmgr.DynamicBanScore
+    quit           chan struct{}
     // The following chans are used to sync blockmanager and server.
     txProcessed    chan struct{}
     blockProcessed chan struct{}
@@ -235,15 +232,13 @@ type serverPeer struct {
 // the caller.
 func newServerPeer(s *server, isPersistent bool) *serverPeer {
     return &serverPeer{
-        server:          s,
-        persistent:      isPersistent,
-        requestedTxns:   make(map[chainhash.Hash]struct{}),
-        requestedBlocks: make(map[chainhash.Hash]struct{}),
-        filter:          bloom.LoadFilter(nil),
-        knownAddresses:  make(map[string]struct{}),
-        quit:            make(chan struct{}),
-        txProcessed:     make(chan struct{}, 1),
-        blockProcessed:  make(chan struct{}, 1),
+        server:         s,
+        persistent:     isPersistent,
+        filter:         bloom.LoadFilter(nil),
+        knownAddresses: make(map[string]struct{}),
+        quit:           make(chan struct{}),
+        txProcessed:    make(chan struct{}, 1),
+        blockProcessed: make(chan struct{}, 1),
     }
 }
 
@@ -349,7 +344,7 @@ func (sp *serverPeer) OnVersion(_ *peer.Peer, msg *wire.MsgVersion) {
     sp.server.timeSource.AddTimeSample(sp.Addr(), msg.Timestamp)
 
     // Signal the block manager this peer is a new sync candidate.
-    sp.server.blockManager.NewPeer(sp)
+    sp.server.blockManager.NewPeer(sp.Peer)
 
     // Choose whether or not to relay transactions before a filter command
     // is received.
@@ -485,7 +480,7 @@ func (sp *serverPeer) OnTx(_ *peer.Peer, msg *wire.MsgTx) {
     // processed and known good or bad. This helps prevent a malicious peer
     // from queuing up a bunch of bad transactions before disconnecting (or
     // being disconnected) and wasting memory.
-    sp.server.blockManager.QueueTx(tx, sp, sp.txProcessed)
+    sp.server.blockManager.QueueTx(tx, sp.Peer, sp.txProcessed)
     <-sp.txProcessed
 }
 
@@ -511,7 +506,7 @@ func (sp *serverPeer) OnBlock(_ *peer.Peer, msg *wire.MsgBlock, buf []byte) {
     // reference implementation processes blocks in the same
     // thread and therefore blocks further messages until
     // the bitcoin block has been fully processed.
-    sp.server.blockManager.QueueBlock(block, sp, sp.blockProcessed)
+    sp.server.blockManager.QueueBlock(block, sp.Peer, sp.blockProcessed)
     <-sp.blockProcessed
 }
 
@@ -522,7 +517,7 @@ func (sp *serverPeer) OnBlock(_ *peer.Peer, msg *wire.MsgBlock, buf []byte) {
 func (sp *serverPeer) OnInv(_ *peer.Peer, msg *wire.MsgInv) {
     if !cfg.BlocksOnly {
         if len(msg.InvList) > 0 {
-            sp.server.blockManager.QueueInv(msg, sp)
+            sp.server.blockManager.QueueInv(msg, sp.Peer)
         }
         return
     }
@@ -548,14 +543,14 @@ func (sp *serverPeer) OnInv(_ *peer.Peer, msg *wire.MsgInv) {
     }
 
     if len(newInv.InvList) > 0 {
-        sp.server.blockManager.QueueInv(newInv, sp)
+        sp.server.blockManager.QueueInv(newInv, sp.Peer)
     }
 }
 
 // OnHeaders is invoked when a peer receives a headers bitcoin
 // message. The message is passed down to the block manager.
 func (sp *serverPeer) OnHeaders(_ *peer.Peer, msg *wire.MsgHeaders) {
-    sp.server.blockManager.QueueHeaders(msg, sp)
+    sp.server.blockManager.QueueHeaders(msg, sp.Peer)
 }
 
 // handleGetData is invoked when a peer receives a getdata bitcoin message and
@@ -1250,7 +1245,7 @@ func (s *server) pushMerkleBlockMsg(sp *serverPeer, hash *chainhash.Hash,
 func (s *server) handleUpdatePeerHeights(state *peerState, umsg updatePeerHeightsMsg) {
     state.forAllPeers(func(sp *serverPeer) {
         // The origin peer should already have the updated height.
-        if sp == umsg.originPeer {
+        if sp.Peer == umsg.originPeer {
             return
         }
 
@@ -1717,7 +1712,7 @@ func (s *server) peerDoneHandler(sp *serverPeer) {
 
     // Only tell block manager we are gone if we ever told it we existed.
     if sp.VersionKnown() {
-        s.blockManager.DonePeer(sp)
+        s.blockManager.DonePeer(sp.Peer)
 
         // Evict any remaining orphans that were sent by the peer.
         numEvicted := s.txMemPool.RemoveOrphansByTag(mempool.Tag(sp.ID()))
@@ -1895,7 +1890,7 @@ func (s *server) NetTotals() (uint64, uint64) {
 // the latest connected main chain block, or a recognized orphan. These height
 // updates allow us to dynamically refresh peer heights, ensuring sync peer
 // selection has access to the latest block heights for each peer.
-func (s *server) UpdatePeerHeights(latestBlkHash *chainhash.Hash, latestHeight int32, updateSource *serverPeer) {
+func (s *server) UpdatePeerHeights(latestBlkHash *chainhash.Hash, latestHeight int32, updateSource *peer.Peer) {
     s.peerHeightsUpdate <- updatePeerHeightsMsg{
         newHash:   latestBlkHash,
         newHeight: latestHeight,