Modify syncing code to support multiple peers.
Previously the code was only designed to work with a single peer. This commit modifies the syncing code to deal with multiple peers.
This commit is contained in:
parent
a69ba92006
commit
cbfee93b74
2 changed files with 130 additions and 20 deletions
124
blockmanager.go
124
blockmanager.go
|
@ -5,6 +5,7 @@
|
||||||
package main
|
package main
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"container/list"
|
||||||
"github.com/conformal/btcchain"
|
"github.com/conformal/btcchain"
|
||||||
"github.com/conformal/btcdb"
|
"github.com/conformal/btcdb"
|
||||||
_ "github.com/conformal/btcdb/sqlite3"
|
_ "github.com/conformal/btcdb/sqlite3"
|
||||||
|
@ -46,13 +47,131 @@ type blockManager struct {
|
||||||
receivedLogTx int64
|
receivedLogTx int64
|
||||||
lastBlockLogTime time.Time
|
lastBlockLogTime time.Time
|
||||||
processingReqs bool
|
processingReqs bool
|
||||||
|
syncPeer *peer
|
||||||
newBlocks chan bool
|
newBlocks chan bool
|
||||||
|
newCandidates chan *peer
|
||||||
|
donePeers chan *peer
|
||||||
blockQueue chan *blockMsg
|
blockQueue chan *blockMsg
|
||||||
chainNotify chan *btcchain.Notification
|
chainNotify chan *btcchain.Notification
|
||||||
wg sync.WaitGroup
|
wg sync.WaitGroup
|
||||||
quit chan bool
|
quit chan bool
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// startSync will choose the best peer among the available candidate peers to
|
||||||
|
// download/sync the blockchain from. When syncing is already running, it
|
||||||
|
// simply returns. It also examines the candidates for any which are no longer
|
||||||
|
// candidates and removes them as needed.
|
||||||
|
func (b *blockManager) startSync(peers *list.List) {
|
||||||
|
// Return now if we're already syncing.
|
||||||
|
if b.syncPeer != nil {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
// Find the height of the current known best block.
|
||||||
|
_, height, err := b.server.db.NewestSha()
|
||||||
|
if err != nil {
|
||||||
|
log.Errorf("[BMGR] %v", err)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
var bestPeer *peer
|
||||||
|
for e := peers.Front(); e != nil; e = e.Next() {
|
||||||
|
p := e.Value.(*peer)
|
||||||
|
|
||||||
|
// Remove sync candidate peers that are no longer candidates due
|
||||||
|
// to passing their latest known block.
|
||||||
|
if p.lastBlock <= int32(height) {
|
||||||
|
peers.Remove(e)
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
// TODO(davec): Use a better algorithm to choose the best peer.
|
||||||
|
// For now, just pick the first available candidate.
|
||||||
|
bestPeer = p
|
||||||
|
}
|
||||||
|
|
||||||
|
// Start syncing from the best peer if one was selected.
|
||||||
|
if bestPeer != nil {
|
||||||
|
locator, err := b.blockChain.LatestBlockLocator()
|
||||||
|
if err != nil {
|
||||||
|
log.Error("[BMGR] Failed to get block locator for the "+
|
||||||
|
"latest block: %v", err)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
log.Infof("[BMGR] Syncing to block height %d from peer %v",
|
||||||
|
bestPeer.lastBlock, bestPeer.conn.RemoteAddr())
|
||||||
|
bestPeer.pushGetBlocksMsg(locator, &zeroHash)
|
||||||
|
b.syncPeer = bestPeer
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
// handleNewCandidateMsg deals with new peers that have signalled they may
|
||||||
|
// be considered as a sync peer (they have already successfully negotiated). It
|
||||||
|
// also start syncing if needed. It is invoked from the syncHandler goroutine.
|
||||||
|
func (b *blockManager) handleNewCandidateMsg(peers *list.List, p *peer) {
|
||||||
|
// Ignore if in the process of shutting down.
|
||||||
|
if b.shutdown {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
// The peer is not a candidate for sync if it's not a full node.
|
||||||
|
if p.services&btcwire.SFNodeNetwork != btcwire.SFNodeNetwork {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
// Add the peer as a candidate to sync from.
|
||||||
|
peers.PushBack(p)
|
||||||
|
|
||||||
|
// Start syncing by choosing the best candidate if needed.
|
||||||
|
b.startSync(peers)
|
||||||
|
}
|
||||||
|
|
||||||
|
// handleDonePeerMsg deals with peers that have signalled they are done. It
|
||||||
|
// removes the peer as a candidate for syncing and in the case where it was
|
||||||
|
// the current sync peer, attempts to select a new best peer to sync from. It
|
||||||
|
// is invoked from the syncHandler goroutine.
|
||||||
|
func (b *blockManager) handleDonePeerMsg(peers *list.List, p *peer) {
|
||||||
|
// Remove the peer from the list of candidate peers.
|
||||||
|
for e := peers.Front(); e != nil; e = e.Next() {
|
||||||
|
if e.Value == p {
|
||||||
|
peers.Remove(e)
|
||||||
|
break
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Attempt to find a new peer to sync from if the quitting peer is the
|
||||||
|
// sync peer.
|
||||||
|
if b.syncPeer != nil && b.syncPeer == p {
|
||||||
|
b.syncPeer = nil
|
||||||
|
b.startSync(peers)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// syncHandler deals with handling downloading (syncing) the block chain from
|
||||||
|
// other peers as they connect and disconnect. It must be run as a goroutine.
|
||||||
|
func (b *blockManager) syncHandler() {
|
||||||
|
log.Tracef("[BMGR] Starting sync handler")
|
||||||
|
candidatePeers := list.New()
|
||||||
|
out:
|
||||||
|
// Live while we're not shutting down.
|
||||||
|
for !b.shutdown {
|
||||||
|
select {
|
||||||
|
case peer := <-b.newCandidates:
|
||||||
|
b.handleNewCandidateMsg(candidatePeers, peer)
|
||||||
|
|
||||||
|
case peer := <-b.donePeers:
|
||||||
|
b.handleDonePeerMsg(candidatePeers, peer)
|
||||||
|
|
||||||
|
case <-b.quit:
|
||||||
|
break out
|
||||||
|
}
|
||||||
|
}
|
||||||
|
b.wg.Done()
|
||||||
|
log.Trace("[BMGR] Sync handler done")
|
||||||
|
}
|
||||||
|
|
||||||
// logBlockHeight logs a new block height as an information message to show
|
// logBlockHeight logs a new block height as an information message to show
|
||||||
// progress to the user. In order to prevent spam, it limits logging to one
|
// progress to the user. In order to prevent spam, it limits logging to one
|
||||||
// message every 10 seconds with duration and totals included.
|
// message every 10 seconds with duration and totals included.
|
||||||
|
@ -207,9 +326,10 @@ func (b *blockManager) Start() {
|
||||||
}
|
}
|
||||||
|
|
||||||
log.Trace("[BMGR] Starting block manager")
|
log.Trace("[BMGR] Starting block manager")
|
||||||
|
go b.syncHandler()
|
||||||
go b.blockHandler()
|
go b.blockHandler()
|
||||||
go b.chainNotificationHandler()
|
go b.chainNotificationHandler()
|
||||||
b.wg.Add(2)
|
b.wg.Add(3)
|
||||||
b.started = true
|
b.started = true
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -239,6 +359,8 @@ func newBlockManager(s *server) *blockManager {
|
||||||
blockPeer: make(map[btcwire.ShaHash]*peer),
|
blockPeer: make(map[btcwire.ShaHash]*peer),
|
||||||
lastBlockLogTime: time.Now(),
|
lastBlockLogTime: time.Now(),
|
||||||
newBlocks: make(chan bool, 1),
|
newBlocks: make(chan bool, 1),
|
||||||
|
newCandidates: make(chan *peer, cfg.MaxPeers),
|
||||||
|
donePeers: make(chan *peer, cfg.MaxPeers),
|
||||||
blockQueue: make(chan *blockMsg, chanBufferSize),
|
blockQueue: make(chan *blockMsg, chanBufferSize),
|
||||||
chainNotify: chainNotify,
|
chainNotify: chainNotify,
|
||||||
quit: make(chan bool),
|
quit: make(chan bool),
|
||||||
|
|
26
peer.go
26
peer.go
|
@ -178,8 +178,8 @@ func (p *peer) handleVersionMsg(msg *btcwire.MsgVersion) {
|
||||||
p.protocolVersion, p.conn.RemoteAddr())
|
p.protocolVersion, p.conn.RemoteAddr())
|
||||||
p.lastBlock = msg.LastBlock
|
p.lastBlock = msg.LastBlock
|
||||||
|
|
||||||
// Set the supported services for the peer to what the remote
|
// Set the supported services for the peer to what the remote peer
|
||||||
// peer advertised.
|
// advertised.
|
||||||
p.services = msg.Services
|
p.services = msg.Services
|
||||||
|
|
||||||
// Inbound connections.
|
// Inbound connections.
|
||||||
|
@ -232,22 +232,8 @@ func (p *peer) handleVersionMsg(msg *btcwire.MsgVersion) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Request latest blocks if the peer has blocks we're interested in.
|
// Signal the block manager this peer is a new sync candidate.
|
||||||
_, lastBlock, err := p.server.db.NewestSha()
|
p.server.blockManager.newCandidates <- p
|
||||||
if err != nil {
|
|
||||||
log.Errorf("[PEER] %v", err)
|
|
||||||
p.Disconnect()
|
|
||||||
}
|
|
||||||
// If the peer has blocks we're interested in.
|
|
||||||
if p.lastBlock > int32(lastBlock) {
|
|
||||||
locator, err := p.server.blockManager.blockChain.LatestBlockLocator()
|
|
||||||
if err != nil {
|
|
||||||
log.Error("[PEER] Failed to get block locator for the "+
|
|
||||||
"latest block: %v", err)
|
|
||||||
p.Disconnect()
|
|
||||||
}
|
|
||||||
p.pushGetBlocksMsg(locator, &zeroHash)
|
|
||||||
}
|
|
||||||
|
|
||||||
// TODO: Relay alerts.
|
// TODO: Relay alerts.
|
||||||
}
|
}
|
||||||
|
@ -808,9 +794,11 @@ out:
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Ensure connection is closed and notify server that the peer is done.
|
// Ensure connection is closed and notify server and block manager that
|
||||||
|
// the peer is done.
|
||||||
p.Disconnect()
|
p.Disconnect()
|
||||||
p.server.donePeers <- p
|
p.server.donePeers <- p
|
||||||
|
p.server.blockManager.donePeers <- p
|
||||||
p.quit <- true
|
p.quit <- true
|
||||||
|
|
||||||
p.wg.Done()
|
p.wg.Done()
|
||||||
|
|
Loading…
Reference in a new issue