netsync: Change name of blockManager to syncManager.

This commit is contained in:
Jim Posen 2017-08-24 14:46:59 -07:00 committed by Dave Collins
parent f2fc24d0fc
commit 46fd4ec358
7 changed files with 279 additions and 325 deletions

View file

@ -20,6 +20,13 @@ import (
"github.com/btcsuite/btcd/limits"
)
const (
// blockDbNamePrefix is the prefix for the block database name. The
// database type is appended to this value to form the full block
// database name.
blockDbNamePrefix = "blocks"
)
var (
cfg *config
)

View file

@ -7,12 +7,12 @@ netsync
## Overview
This package implements a concurrency safe block syncing protocol. The provided
implementation of SyncManager communicates with connected peers to perform an
initial block download, keep the chain in sync, and announce new blocks
connected to the chain. Currently the sync manager selects a single sync peer
that it downloads all blocks from until it is up to date with the longest chain
the sync peer is aware of.
This package implements a concurrency safe block syncing protocol. The
SyncManager communicates with connected peers to perform an initial block
download, keep the chain and unconfirmed transaction pool in sync, and announce
new blocks connected to the chain. Currently the sync manager selects a single
sync peer that it downloads all blocks from until it is up to date with the
longest chain the sync peer is aware of.
## Installation and Updating

View file

@ -3,11 +3,11 @@
// license that can be found in the LICENSE file.
/*
The netsync package implements a concurrency safe block syncing protocol. The
provided implementation of SyncManager communicates with connected peers to
perform an initial block download, keep the chain and mempool in sync, and
announce new blocks connected to the chain. Currently the block manager selects
a single sync peer that it downloads all blocks from until it is up to date with
the longest chain the sync peer is aware of.
Package netsync implements a concurrency safe block syncing protocol. The
SyncManager communicates with connected peers to perform an initial block
download, keep the chain and unconfirmed transaction pool in sync, and announce
new blocks connected to the chain. Currently the sync manager selects a single
sync peer that it downloads all blocks from until it is up to date with the
longest chain the sync peer is aware of.
*/
package netsync

View file

@ -37,58 +37,3 @@ type Config struct {
DisableCheckpoints bool
MaxPeers int
}
// SyncManager is the interface used to communicate block related messages with
// peers. The SyncManager is started as by executing Start() in a goroutine.
// Once started, it selects peers to sync from and starts the initial block
// download. Once the chain is in sync, the SyncManager handles incoming block
// and header notifications and relays announcements of new blocks to peers.
type SyncManager interface {
// NewPeer informs the SyncManager of a newly active peer.
NewPeer(p *peer.Peer)
// QueueTx adds the passed transaction message and peer to the block
// handling queue.
QueueTx(tx *btcutil.Tx, p *peer.Peer, done chan struct{})
// QueueBlock adds the passed block message and peer to the block handling
// queue.
QueueBlock(block *btcutil.Block, p *peer.Peer, done chan struct{})
// QueueInv adds the passed inv message and peer to the block handling
// queue.
QueueInv(inv *wire.MsgInv, p *peer.Peer)
// QueueHeaders adds the passed headers message and peer to the block
// handling queue.
QueueHeaders(headers *wire.MsgHeaders, p *peer.Peer)
// DonePeer informs the SyncManager that a peer has disconnected.
DonePeer(p *peer.Peer)
// Start begins the core block handler which processes block and inv
// messages.
Start()
// Stop gracefully shuts down the SyncManager by stopping all asynchronous
// handlers and waiting for them to finish.
Stop() error
// SyncPeerID returns the ID of the current sync peer, or 0 if there is
// none.
SyncPeerID() int32
// ProcessBlock makes use of ProcessBlock on an internal instance of a block
// chain.
ProcessBlock(block *btcutil.Block, flags blockchain.BehaviorFlags) (bool, error)
// IsCurrent returns whether or not the SyncManager believes it is synced
// with the connected peers.
IsCurrent() bool
// Pause pauses the SyncManager until the returned channel is closed.
//
// Note that while paused, all peer and block processing is halted. The
// message sender should avoid pausing the SyncManager for long durations.
Pause() chan<- struct{}
}

File diff suppressed because it is too large Load diff

View file

@ -10,6 +10,7 @@ import (
"github.com/btcsuite/btcd/blockchain"
"github.com/btcsuite/btcd/chaincfg/chainhash"
"github.com/btcsuite/btcd/mempool"
"github.com/btcsuite/btcd/netsync"
"github.com/btcsuite/btcd/peer"
"github.com/btcsuite/btcd/wire"
"github.com/btcsuite/btcutil"
@ -225,8 +226,8 @@ func (cm *rpcConnManager) RelayTransactions(txns []*mempool.TxDesc) {
// rpcSyncMgr provides a block manager for use with the RPC server and
// implements the rpcserverSyncManager interface.
type rpcSyncMgr struct {
server *server
blockMgr *blockManager
server *server
syncMgr *netsync.SyncManager
}
// Ensure rpcSyncMgr implements the rpcserverSyncManager interface.
@ -238,7 +239,7 @@ var _ rpcserverSyncManager = (*rpcSyncMgr)(nil)
// This function is safe for concurrent access and is part of the
// rpcserverSyncManager interface implementation.
func (b *rpcSyncMgr) IsCurrent() bool {
return b.blockMgr.IsCurrent()
return b.syncMgr.IsCurrent()
}
// SubmitBlock submits the provided block to the network after processing it
@ -247,7 +248,7 @@ func (b *rpcSyncMgr) IsCurrent() bool {
// This function is safe for concurrent access and is part of the
// rpcserverSyncManager interface implementation.
func (b *rpcSyncMgr) SubmitBlock(block *btcutil.Block, flags blockchain.BehaviorFlags) (bool, error) {
return b.blockMgr.ProcessBlock(block, flags)
return b.syncMgr.ProcessBlock(block, flags)
}
// Pause pauses the sync manager until the returned channel is closed.
@ -255,7 +256,7 @@ func (b *rpcSyncMgr) SubmitBlock(block *btcutil.Block, flags blockchain.Behavior
// This function is safe for concurrent access and is part of the
// rpcserverSyncManager interface implementation.
func (b *rpcSyncMgr) Pause() chan<- struct{} {
return b.blockMgr.Pause()
return b.syncMgr.Pause()
}
// SyncPeerID returns the peer that is currently the peer being used to sync
@ -264,7 +265,7 @@ func (b *rpcSyncMgr) Pause() chan<- struct{} {
// This function is safe for concurrent access and is part of the
// rpcserverSyncManager interface implementation.
func (b *rpcSyncMgr) SyncPeerID() int32 {
return b.blockMgr.SyncPeerID()
return b.syncMgr.SyncPeerID()
}
// LocateBlocks returns the hashes of the blocks after the first known block in

View file

@ -32,6 +32,7 @@ import (
"github.com/btcsuite/btcd/mempool"
"github.com/btcsuite/btcd/mining"
"github.com/btcsuite/btcd/mining/cpuminer"
"github.com/btcsuite/btcd/netsync"
"github.com/btcsuite/btcd/peer"
"github.com/btcsuite/btcd/txscript"
"github.com/btcsuite/btcd/wire"
@ -67,6 +68,9 @@ var (
userAgentVersion = fmt.Sprintf("%d.%d.%d", appMajor, appMinor, appPatch)
)
// zeroHash is the zero value hash (all zeros). It is defined as a convenience.
var zeroHash chainhash.Hash
// onionAddr implements the net.Addr interface and represents a tor address.
type onionAddr struct {
addr string
@ -177,7 +181,7 @@ type server struct {
sigCache *txscript.SigCache
hashCache *txscript.HashCache
rpcServer *rpcServer
blockManager *blockManager
syncManager *netsync.SyncManager
chain *blockchain.BlockChain
txMemPool *mempool.TxPool
cpuMiner *cpuminer.CPUMiner
@ -245,7 +249,7 @@ func newServerPeer(s *server, isPersistent bool) *serverPeer {
// newestBlock returns the current best block hash and height using the format
// required by the configuration for the peer package.
func (sp *serverPeer) newestBlock() (*chainhash.Hash, int32, error) {
best := sp.server.blockManager.chain.BestSnapshot()
best := sp.server.chain.BestSnapshot()
return &best.Hash, best.Height, nil
}
@ -343,8 +347,8 @@ func (sp *serverPeer) OnVersion(_ *peer.Peer, msg *wire.MsgVersion) {
// the local clock to keep the network time in sync.
sp.server.timeSource.AddTimeSample(sp.Addr(), msg.Timestamp)
// Signal the block manager this peer is a new sync candidate.
sp.server.blockManager.NewPeer(sp.Peer)
// Signal the sync manager this peer is a new sync candidate.
sp.server.syncManager.NewPeer(sp.Peer)
// Choose whether or not to relay transactions before a filter command
// is received.
@ -363,7 +367,7 @@ func (sp *serverPeer) OnVersion(_ *peer.Peer, msg *wire.MsgVersion) {
// After soft-fork activation, only make outbound
// connection to peers if they flag that they're segwit
// enabled.
chain := sp.server.blockManager.chain
chain := sp.server.chain
segwitActive, err := chain.IsDeploymentActive(chaincfg.DeploymentSegwit)
if err != nil {
peerLog.Errorf("Unable to query for segwit "+
@ -475,12 +479,12 @@ func (sp *serverPeer) OnTx(_ *peer.Peer, msg *wire.MsgTx) {
iv := wire.NewInvVect(wire.InvTypeTx, tx.Hash())
sp.AddKnownInventory(iv)
// Queue the transaction up to be handled by the block manager and
// Queue the transaction up to be handled by the sync manager and
// intentionally block further receives until the transaction is fully
// processed and known good or bad. This helps prevent a malicious peer
// from queuing up a bunch of bad transactions before disconnecting (or
// being disconnected) and wasting memory.
sp.server.blockManager.QueueTx(tx, sp.Peer, sp.txProcessed)
sp.server.syncManager.QueueTx(tx, sp.Peer, sp.txProcessed)
<-sp.txProcessed
}
@ -506,7 +510,7 @@ func (sp *serverPeer) OnBlock(_ *peer.Peer, msg *wire.MsgBlock, buf []byte) {
// reference implementation processes blocks in the same
// thread and therefore blocks further messages until
// the bitcoin block has been fully processed.
sp.server.blockManager.QueueBlock(block, sp.Peer, sp.blockProcessed)
sp.server.syncManager.QueueBlock(block, sp.Peer, sp.blockProcessed)
<-sp.blockProcessed
}
@ -517,7 +521,7 @@ func (sp *serverPeer) OnBlock(_ *peer.Peer, msg *wire.MsgBlock, buf []byte) {
func (sp *serverPeer) OnInv(_ *peer.Peer, msg *wire.MsgInv) {
if !cfg.BlocksOnly {
if len(msg.InvList) > 0 {
sp.server.blockManager.QueueInv(msg, sp.Peer)
sp.server.syncManager.QueueInv(msg, sp.Peer)
}
return
}
@ -543,14 +547,14 @@ func (sp *serverPeer) OnInv(_ *peer.Peer, msg *wire.MsgInv) {
}
if len(newInv.InvList) > 0 {
sp.server.blockManager.QueueInv(newInv, sp.Peer)
sp.server.syncManager.QueueInv(newInv, sp.Peer)
}
}
// OnHeaders is invoked when a peer receives a headers bitcoin
// message. The message is passed down to the block manager.
// message. The message is passed down to the sync manager.
func (sp *serverPeer) OnHeaders(_ *peer.Peer, msg *wire.MsgHeaders) {
sp.server.blockManager.QueueHeaders(msg, sp.Peer)
sp.server.syncManager.QueueHeaders(msg, sp.Peer)
}
// handleGetData is invoked when a peer receives a getdata bitcoin message and
@ -646,7 +650,7 @@ func (sp *serverPeer) OnGetBlocks(_ *peer.Peer, msg *wire.MsgGetBlocks) {
// over with the genesis block if unknown block locators are provided.
//
// This mirrors the behavior in the reference implementation.
chain := sp.server.blockManager.chain
chain := sp.server.chain
hashList := chain.LocateBlocks(msg.BlockLocatorHashes, &msg.HashStop,
wire.MaxBlocksPerMsg)
@ -676,7 +680,7 @@ func (sp *serverPeer) OnGetBlocks(_ *peer.Peer, msg *wire.MsgGetBlocks) {
// message.
func (sp *serverPeer) OnGetHeaders(_ *peer.Peer, msg *wire.MsgGetHeaders) {
// Ignore getheaders requests if not in sync.
if !sp.server.blockManager.IsCurrent() {
if !sp.server.syncManager.IsCurrent() {
return
}
@ -690,7 +694,7 @@ func (sp *serverPeer) OnGetHeaders(_ *peer.Peer, msg *wire.MsgGetHeaders) {
// over with the genesis block if unknown block locators are provided.
//
// This mirrors the behavior in the reference implementation.
chain := sp.server.blockManager.chain
chain := sp.server.chain
headers := chain.LocateHeaders(msg.BlockLocatorHashes, &msg.HashStop)
if len(headers) == 0 {
// Nothing to send.
@ -1075,7 +1079,7 @@ func (s *server) pushBlockMsg(sp *serverPeer, hash *chainhash.Hash, doneChan cha
// to trigger it to issue another getblocks message for the next
// batch of inventory.
if sendInv {
best := sp.server.blockManager.chain.BestSnapshot()
best := sp.server.chain.BestSnapshot()
invMsg := wire.NewMsgInvSizeHint(1)
iv := wire.NewInvVect(wire.InvTypeBlock, &best.Hash)
invMsg.AddInvVect(iv)
@ -1101,7 +1105,7 @@ func (s *server) pushMerkleBlockMsg(sp *serverPeer, hash *chainhash.Hash,
}
// Fetch the raw block bytes from the database.
blk, err := sp.server.blockManager.chain.BlockByHash(hash)
blk, err := sp.server.chain.BlockByHash(hash)
if err != nil {
peerLog.Tracef("Unable to fetch requested block hash %v: %v",
hash, err)
@ -1616,9 +1620,9 @@ func (s *server) peerDoneHandler(sp *serverPeer) {
sp.WaitForDisconnect()
s.donePeers <- sp
// Only tell block manager we are gone if we ever told it we existed.
// Only tell sync manager we are gone if we ever told it we existed.
if sp.VersionKnown() {
s.blockManager.DonePeer(sp.Peer)
s.syncManager.DonePeer(sp.Peer)
// Evict any remaining orphans that were sent by the peer.
numEvicted := s.txMemPool.RemoveOrphansByTag(mempool.Tag(sp.ID()))
@ -1635,13 +1639,13 @@ func (s *server) peerDoneHandler(sp *serverPeer) {
// peers to and from the server, banning peers, and broadcasting messages to
// peers. It must be run in a goroutine.
func (s *server) peerHandler() {
// Start the address manager and block manager, both of which are needed
// Start the address manager and sync manager, both of which are needed
// by peers. This is done here since their lifecycle is closely tied
// to this handler and rather than adding more channels to sychronize
// things, it's easier and slightly faster to simply start and stop them
// in this handler.
s.addrManager.Start()
s.blockManager.Start()
s.syncManager.Start()
srvrLog.Tracef("Starting peer handler")
@ -1709,7 +1713,7 @@ out:
}
s.connManager.Stop()
s.blockManager.Stop()
s.syncManager.Stop()
s.addrManager.Stop()
// Drain channels before exiting so nothing is left waiting around
@ -2366,7 +2370,7 @@ func newServer(listenAddrs []string, db database.DB, chainParams *chaincfg.Param
}
s.txMemPool = mempool.New(&txC)
s.blockManager, err = newBlockManager(&blockManagerConfig{
s.syncManager, err = netsync.New(&netsync.Config{
PeerNotifier: &s,
Chain: s.chain,
TxMemPool: s.txMemPool,
@ -2398,9 +2402,9 @@ func newServer(listenAddrs []string, db database.DB, chainParams *chaincfg.Param
ChainParams: chainParams,
BlockTemplateGenerator: blockTemplateGenerator,
MiningAddrs: cfg.miningAddrs,
ProcessBlock: s.blockManager.ProcessBlock,
ProcessBlock: s.syncManager.ProcessBlock,
ConnectedCount: s.ConnectedCount,
IsCurrent: s.blockManager.IsCurrent,
IsCurrent: s.syncManager.IsCurrent,
})
// Only setup a function to return new addresses to connect to when
@ -2500,9 +2504,9 @@ func newServer(listenAddrs []string, db database.DB, chainParams *chaincfg.Param
Listeners: rpcListeners,
StartupTime: s.startupTime,
ConnMgr: &rpcConnManager{&s},
SyncMgr: &rpcSyncMgr{&s, s.blockManager},
SyncMgr: &rpcSyncMgr{&s, s.syncManager},
TimeSource: s.timeSource,
Chain: s.blockManager.chain,
Chain: s.chain,
ChainParams: chainParams,
DB: db,
TxMemPool: s.txMemPool,