449 lines
13 KiB
Go
449 lines
13 KiB
Go
// Copyright (c) 2013 Conformal Systems LLC.
|
|
// Use of this source code is governed by an ISC
|
|
// license that can be found in the LICENSE file.
|
|
|
|
package main
|
|
|
|
import (
|
|
"container/list"
|
|
"github.com/conformal/btcchain"
|
|
"github.com/conformal/btcdb"
|
|
_ "github.com/conformal/btcdb/sqlite3"
|
|
"github.com/conformal/btcutil"
|
|
"github.com/conformal/btcwire"
|
|
"os"
|
|
"path/filepath"
|
|
"sync"
|
|
"time"
|
|
)
|
|
|
|
// chanBufferSize is the capacity given to the buffered block, inventory, and
// chain notification channels created for a block manager.
const chanBufferSize = 50
|
|
|
|
// inventoryItem is used to track known and requested inventory items.
|
|
type inventoryItem struct {
|
|
invVect *btcwire.InvVect
|
|
peers []*peer
|
|
}
|
|
|
|
// blockMsg packages a bitcoin block message and the peer it came from together
|
|
// so the block handler has access to that information.
|
|
type blockMsg struct {
|
|
block *btcutil.Block
|
|
}
|
|
|
|
// invMsg packages a bitcoin inv message and the peer it came from together
|
|
// so the block handler has access to that information.
|
|
type invMsg struct {
|
|
msg *btcwire.MsgInv
|
|
peer *peer
|
|
}
|
|
|
|
// txMsg packages a bitcoin tx message and the peer it came from together
|
|
// so the block handler has access to that information.
|
|
type txMsg struct {
|
|
msg *btcwire.MsgTx
|
|
peer *peer
|
|
}
|
|
|
|
// blockManager provides a concurrency safe block manager for handling all
|
|
// incoming block inventory advertisement as well as issuing requests to
|
|
// download needed blocks of the block chain from other peers. It works by
|
|
// forcing all incoming block inventory advertisements through a single
|
|
// goroutine which then determines whether the block is needed and how the
|
|
// requests should be made amongst multiple peers.
|
|
type blockManager struct {
|
|
server *server
|
|
started bool
|
|
shutdown bool
|
|
blockChain *btcchain.BlockChain
|
|
requestQueue *list.List
|
|
requestMap map[string]*inventoryItem
|
|
outstandingBlocks int
|
|
receivedLogBlocks int64
|
|
receivedLogTx int64
|
|
lastBlockLogTime time.Time
|
|
processingReqs bool
|
|
newBlocks chan bool
|
|
blockQueue chan *blockMsg
|
|
invQueue chan *invMsg
|
|
chainNotify chan *btcchain.Notification
|
|
wg sync.WaitGroup
|
|
quit chan bool
|
|
}
|
|
|
|
// logBlockHeight logs a new block height as an information message to show
|
|
// progress to the user. In order to prevent spam, it limits logging to one
|
|
// message every 10 seconds with duration and totals included.
|
|
func (b *blockManager) logBlockHeight(numTx, height int64) {
|
|
b.receivedLogBlocks++
|
|
b.receivedLogTx += numTx
|
|
|
|
now := time.Now()
|
|
duration := now.Sub(b.lastBlockLogTime)
|
|
if b.outstandingBlocks != 0 && duration < time.Second*10 {
|
|
return
|
|
}
|
|
|
|
// Log information about new block height.
|
|
blockStr := "blocks"
|
|
if b.receivedLogBlocks == 1 {
|
|
blockStr = "block"
|
|
}
|
|
txStr := "transactions"
|
|
if b.receivedLogTx == 1 {
|
|
txStr = "transaction"
|
|
}
|
|
log.Infof("[BMGR] Processed %d %s (%d %s) in the last %s - Block "+
|
|
"height %d", b.receivedLogBlocks, blockStr, b.receivedLogTx,
|
|
txStr, duration, height)
|
|
|
|
b.receivedLogBlocks = 0
|
|
b.receivedLogTx = 0
|
|
b.lastBlockLogTime = now
|
|
}
|
|
|
|
// handleInvMsg handles inventory messages for all peers. It adds blocks that
|
|
// we need along with which peers know about each block to a request queue
|
|
// based upon the advertised inventory. It also attempts to strike a balance
|
|
// between the number of in-flight blocks and keeping the request queue full
|
|
// by issuing more getblocks (MsgGetBlocks) requests as needed.
|
|
func (b *blockManager) handleInvMsg(msg *btcwire.MsgInv, p *peer) {
|
|
// Find the last block in the inventory list.
|
|
invVects := msg.InvList
|
|
var lastHash *btcwire.ShaHash
|
|
for i := len(invVects) - 1; i >= 0; i-- {
|
|
if invVects[i].Type == btcwire.InvVect_Block {
|
|
lastHash = &invVects[i].Hash
|
|
break
|
|
}
|
|
}
|
|
|
|
for _, iv := range invVects {
|
|
switch iv.Type {
|
|
case btcwire.InvVect_Block:
|
|
// Ignore this block if we already have it.
|
|
// TODO(davec): Need to check orphans too.
|
|
if b.server.db.ExistsSha(&iv.Hash) {
|
|
log.Tracef("[BMGR] Ignoring known block %v.", &iv.Hash)
|
|
continue
|
|
}
|
|
|
|
// Add the peer to the list of peers which can serve the block if
|
|
// it's already queued to be fetched.
|
|
if item, ok := b.requestMap[iv.Hash.String()]; ok {
|
|
item.peers = append(item.peers, p)
|
|
continue
|
|
}
|
|
|
|
// Add the item to the end of the request queue.
|
|
item := &inventoryItem{
|
|
invVect: iv,
|
|
peers: []*peer{p},
|
|
}
|
|
b.requestMap[item.invVect.Hash.String()] = item
|
|
b.requestQueue.PushBack(item)
|
|
b.outstandingBlocks++
|
|
|
|
case btcwire.InvVect_Tx:
|
|
// XXX: Handle transactions here.
|
|
}
|
|
}
|
|
|
|
// Request more blocks if there aren't enough in-flight blocks.
|
|
if lastHash != nil && b.outstandingBlocks < btcwire.MaxBlocksPerMsg*5 {
|
|
stopHash := btcwire.ShaHash{}
|
|
gbmsg := btcwire.NewMsgGetBlocks(&stopHash)
|
|
gbmsg.AddBlockLocatorHash(lastHash)
|
|
p.QueueMessage(gbmsg)
|
|
}
|
|
}
|
|
|
|
// handleBlockMsg handles block messages from all peers. It is currently
|
|
// very simple. It doesn't validate the block or handle orphans and side
|
|
// chains. It simply inserts the block into the database after ensuring the
|
|
// previous block is already inserted.
|
|
func (b *blockManager) handleBlockMsg(block *btcutil.Block) {
|
|
b.outstandingBlocks--
|
|
msg := block.MsgBlock()
|
|
|
|
// Process the block to include validation, best chain selection, orphan
|
|
// handling, etc.
|
|
err := b.blockChain.ProcessBlock(block)
|
|
if err != nil {
|
|
blockSha, err2 := block.Sha()
|
|
if err2 != nil {
|
|
log.Errorf("[BMGR] %v", err2)
|
|
}
|
|
log.Warnf("[BMGR] Failed to process block %v: %v", blockSha, err)
|
|
return
|
|
}
|
|
|
|
// Log info about the new block height.
|
|
_, height, err := b.server.db.NewestSha()
|
|
if err != nil {
|
|
log.Warnf("[BMGR] Failed to obtain latest sha - %v", err)
|
|
return
|
|
}
|
|
b.logBlockHeight(int64(len(msg.Transactions)), height)
|
|
|
|
// Sync the db to disk when there are no more outstanding blocks.
|
|
// NOTE: Periodic syncs happen as new data is requested as well.
|
|
if b.outstandingBlocks <= 0 {
|
|
b.server.db.Sync()
|
|
}
|
|
}
|
|
|
|
// blockHandler is the main handler for the block manager. It must be run as a
|
|
// goroutine. It processes block and inv messages in a separate goroutine from
|
|
// the peer handlers so the block (MsgBlock) and tx (MsgTx) messages are handled
|
|
// by a single thread without needing to lock memory data structures. This is
|
|
// important because the block manager controls which blocks are needed and how
|
|
// the fetching should proceed.
|
|
//
|
|
// NOTE: Tx messages need to be handled here too.
|
|
// (either that or block and tx need to be handled in separate threads)
|
|
func (b *blockManager) blockHandler() {
|
|
out:
|
|
for !b.shutdown {
|
|
select {
|
|
// Handle new block messages.
|
|
case msg := <-b.blockQueue:
|
|
b.handleBlockMsg(msg.block)
|
|
|
|
// Handle new inventory messages.
|
|
case msg := <-b.invQueue:
|
|
b.handleInvMsg(msg.msg, msg.peer)
|
|
// Request the blocks.
|
|
if b.requestQueue.Len() > 0 && !b.processingReqs {
|
|
b.processingReqs = true
|
|
b.newBlocks <- true
|
|
}
|
|
|
|
case <-b.newBlocks:
|
|
numRequested := 0
|
|
gdmsg := btcwire.NewMsgGetData()
|
|
var p *peer
|
|
for e := b.requestQueue.Front(); e != nil; e = b.requestQueue.Front() {
|
|
item := e.Value.(*inventoryItem)
|
|
p = item.peers[0]
|
|
gdmsg.AddInvVect(item.invVect)
|
|
delete(b.requestMap, item.invVect.Hash.String())
|
|
b.requestQueue.Remove(e)
|
|
|
|
numRequested++
|
|
if numRequested >= btcwire.MaxInvPerMsg {
|
|
break
|
|
}
|
|
}
|
|
b.server.db.Sync()
|
|
if len(gdmsg.InvList) > 0 && p != nil {
|
|
p.QueueMessage(gdmsg)
|
|
}
|
|
b.processingReqs = false
|
|
|
|
case <-b.quit:
|
|
break out
|
|
}
|
|
}
|
|
b.wg.Done()
|
|
log.Trace("[BMGR] Block handler done")
|
|
}
|
|
|
|
// handleNotifyMsg handles notifications from btcchain. Currently it doesn't
|
|
// respond to any notifications, but the idea is that it requests missing blocks
|
|
// in response to orphan notifications and updates the wallet for blocks
|
|
// connected and disconnected to the main chain.
|
|
func (b *blockManager) handleNotifyMsg(notification *btcchain.Notification) {
|
|
switch notification.Type {
|
|
case btcchain.NTOrphanBlock:
|
|
// TODO(davec): Ask the peer to fill in the missing blocks for the
|
|
// orphan root if it's not nil.
|
|
orphanRoot := notification.Data.(*btcwire.ShaHash)
|
|
_ = orphanRoot
|
|
|
|
case btcchain.NTBlockAccepted:
|
|
// TODO(davec): Relay inventory, but don't relay old inventory
|
|
// during initial block download.
|
|
}
|
|
}
|
|
|
|
// chainNotificationHandler is the handler for asynchronous notifications from
|
|
// btcchain. It must be run as a goroutine.
|
|
func (b *blockManager) chainNotificationHandler() {
|
|
out:
|
|
for !b.shutdown {
|
|
select {
|
|
case notification := <-b.chainNotify:
|
|
b.handleNotifyMsg(notification)
|
|
|
|
case <-b.quit:
|
|
break out
|
|
}
|
|
}
|
|
b.wg.Done()
|
|
log.Trace("[BMGR] Chain notification handler done")
|
|
}
|
|
|
|
// QueueBlock adds the passed block message and peer to the block handling queue.
|
|
func (b *blockManager) QueueBlock(block *btcutil.Block) {
|
|
// Don't accept more blocks if we're shutting down.
|
|
if b.shutdown {
|
|
return
|
|
}
|
|
|
|
bmsg := blockMsg{block: block}
|
|
b.blockQueue <- &bmsg
|
|
}
|
|
|
|
// QueueInv adds the passed inventory message and peer to the inventory handling
|
|
// queue.
|
|
func (b *blockManager) QueueInv(msg *btcwire.MsgInv, p *peer) {
|
|
// Don't accept more inventory if we're shutting down.
|
|
if b.shutdown {
|
|
return
|
|
}
|
|
|
|
imsg := invMsg{msg: msg, peer: p}
|
|
b.invQueue <- &imsg
|
|
}
|
|
|
|
// Start begins the core block handler which processes block and inv messages.
|
|
func (b *blockManager) Start() {
|
|
// Already started?
|
|
if b.started {
|
|
return
|
|
}
|
|
|
|
log.Trace("[BMGR] Starting block manager")
|
|
go b.blockHandler()
|
|
go b.chainNotificationHandler()
|
|
b.wg.Add(2)
|
|
b.started = true
|
|
}
|
|
|
|
// Stop gracefully shuts down the block manager by stopping all asynchronous
|
|
// handlers and waiting for them to finish.
|
|
func (b *blockManager) Stop() error {
|
|
if b.shutdown {
|
|
log.Warnf("[BMGR] Block manager is already in the process of " +
|
|
"shutting down")
|
|
return nil
|
|
}
|
|
|
|
log.Infof("[BMGR] Block manager shutting down")
|
|
b.shutdown = true
|
|
close(b.quit)
|
|
b.wg.Wait()
|
|
return nil
|
|
}
|
|
|
|
// AddBlockLocators adds block locators to a getblocks message starting with
|
|
// the passed hash back to the genesis block hash. In order to keep the list
|
|
// of locator hashes to a reasonable number of entries, first it adds the
|
|
// most recent 10 block hashes (starting with the passed hash), then doubles the
|
|
// step each loop iteration to exponentially decrease the number of hashes the
|
|
// further away from head and closer to the genesis block it gets.
|
|
func (b *blockManager) AddBlockLocators(hash *btcwire.ShaHash, msg *btcwire.MsgGetBlocks) error {
|
|
// XXX(davec): This is fetching the block data too.
|
|
block, err := b.server.db.FetchBlockBySha(hash)
|
|
if err != nil {
|
|
log.Warnf("[BMGR] Lookup of known valid index failed %v", hash)
|
|
return err
|
|
}
|
|
blockIndex := block.Height()
|
|
|
|
// We want inventory after the passed hash.
|
|
msg.AddBlockLocatorHash(hash)
|
|
|
|
// Generate the block locators according to the algorithm described in
|
|
// in the function comment and make sure to leave room for the already
|
|
// added hash and final genesis hash.
|
|
increment := int64(1)
|
|
for i := 1; i < btcwire.MaxBlockLocatorsPerMsg-2; i++ {
|
|
if i > 10 {
|
|
increment *= 2
|
|
}
|
|
blockIndex -= increment
|
|
if blockIndex <= 1 {
|
|
break
|
|
}
|
|
|
|
h, err := b.server.db.FetchBlockShaByHeight(blockIndex)
|
|
if err != nil {
|
|
// This shouldn't happen and it's ok to ignore, so just
|
|
// continue to the next.
|
|
log.Warnf("[BMGR] Lookup of known valid index failed %v",
|
|
blockIndex)
|
|
continue
|
|
}
|
|
msg.AddBlockLocatorHash(h)
|
|
}
|
|
msg.AddBlockLocatorHash(&btcwire.GenesisHash)
|
|
return nil
|
|
}
|
|
|
|
// newBlockManager returns a new bitcoin block manager.
|
|
// Use Start to begin processing asynchronous block and inv updates.
|
|
func newBlockManager(s *server) *blockManager {
|
|
chainNotify := make(chan *btcchain.Notification, chanBufferSize)
|
|
bm := blockManager{
|
|
server: s,
|
|
blockChain: btcchain.New(s.db, s.btcnet, chainNotify),
|
|
requestQueue: list.New(),
|
|
requestMap: make(map[string]*inventoryItem),
|
|
lastBlockLogTime: time.Now(),
|
|
newBlocks: make(chan bool, 1),
|
|
blockQueue: make(chan *blockMsg, chanBufferSize),
|
|
invQueue: make(chan *invMsg, chanBufferSize),
|
|
chainNotify: chainNotify,
|
|
quit: make(chan bool),
|
|
}
|
|
bm.blockChain.DisableVerify(cfg.VerifyDisabled)
|
|
return &bm
|
|
}
|
|
|
|
// loadBlockDB opens the block database and returns a handle to it.
|
|
func loadBlockDB() (btcdb.Db, error) {
|
|
dbPath := filepath.Join(cfg.DbDir, activeNetParams.dbName)
|
|
log.Infof("[BMGR] Loading block database from '%s'", dbPath)
|
|
db, err := btcdb.OpenDB("sqlite", dbPath)
|
|
if err != nil {
|
|
// Return the error if it's not because the database doesn't
|
|
// exist.
|
|
if err != btcdb.DbDoesNotExist {
|
|
return nil, err
|
|
}
|
|
|
|
// Create the db if it does not exist.
|
|
err = os.MkdirAll(cfg.DbDir, 0700)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
db, err = btcdb.CreateDB("sqlite", dbPath)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
// Insert the appropriate genesis block for the bitcoin network
|
|
// being connected to.
|
|
genesis := btcutil.NewBlock(activeNetParams.genesisBlock)
|
|
_, err := db.InsertBlock(genesis)
|
|
if err != nil {
|
|
db.Close()
|
|
return nil, err
|
|
}
|
|
log.Infof("[BMGR] Inserted genesis block %v",
|
|
activeNetParams.genesisHash)
|
|
}
|
|
|
|
// Get the latest block height from the database.
|
|
_, height, err := db.NewestSha()
|
|
if err != nil {
|
|
db.Close()
|
|
return nil, err
|
|
}
|
|
log.Infof("[BMGR] Block database loaded with block height %d", height)
|
|
return db, nil
|
|
}
|