lbcd/blockmanager.go

454 lines
13 KiB
Go
Raw Normal View History

2013-08-06 23:55:22 +02:00
// Copyright (c) 2013 Conformal Systems LLC.
// Use of this source code is governed by an ISC
// license that can be found in the LICENSE file.
package main
import (
"container/list"
"github.com/conformal/btcchain"
"github.com/conformal/btcdb"
_ "github.com/conformal/btcdb/sqlite3"
"github.com/conformal/btcutil"
"github.com/conformal/btcwire"
"os"
"path/filepath"
"sync"
"time"
)
const (
chanBufferSize = 50
)
// inventoryItem is used to track known and requested inventory items.
type inventoryItem struct {
invVect *btcwire.InvVect
peers []*peer
}
// blockMsg packages a bitcoin block message and the peer it came from together
// so the block handler has access to that information.
type blockMsg struct {
block *btcutil.Block
}
// invMsg packages a bitcoin inv message and the peer it came from together
// so the block handler has access to that information.
type invMsg struct {
msg *btcwire.MsgInv
peer *peer
}
// txMsg packages a bitcoin tx message and the peer it came from together
// so the block handler has access to that information.
type txMsg struct {
msg *btcwire.MsgTx
peer *peer
}
// blockManager provides a concurrency safe block manager for handling all
// incoming block inventory advertisement as well as issuing requests to
// download needed blocks of the block chain from other peers. It works by
// forcing all incoming block inventory advertisements through a single
// goroutine which then determines whether the block is needed and how the
// requests should be made amongst multiple peers.
type blockManager struct {
server *server
started bool
shutdown bool
blockChain *btcchain.BlockChain
requestQueue *list.List
requestMap map[string]*inventoryItem
outstandingBlocks int
receivedLogBlocks int64
receivedLogTx int64
lastBlockLogTime time.Time
processingReqs bool
newBlocks chan bool
blockQueue chan *blockMsg
invQueue chan *invMsg
chainNotify chan *btcchain.Notification
wg sync.WaitGroup
quit chan bool
}
// logBlockHeight logs a new block height as an information message to show
// progress to the user. In order to prevent spam, it limits logging to one
// message every 10 seconds with duration and totals included.
func (b *blockManager) logBlockHeight(numTx, height int64) {
b.receivedLogBlocks++
b.receivedLogTx += numTx
now := time.Now()
duration := now.Sub(b.lastBlockLogTime)
if b.outstandingBlocks != 0 && duration < time.Second*10 {
return
}
// Log information about new block height.
blockStr := "blocks"
if b.receivedLogBlocks == 1 {
blockStr = "block"
}
txStr := "transactions"
if b.receivedLogTx == 1 {
txStr = "transaction"
}
log.Infof("[BMGR] Processed %d %s (%d %s) in the last %s - Block "+
"height %d", b.receivedLogBlocks, blockStr, b.receivedLogTx,
txStr, duration, height)
b.receivedLogBlocks = 0
b.receivedLogTx = 0
b.lastBlockLogTime = now
}
// handleInvMsg handles inventory messages for all peers. It adds blocks that
// we need along with which peers know about each block to a request queue
// based upon the advertised inventory. It also attempts to strike a balance
// between the number of in-flight blocks and keeping the request queue full
// by issuing more getblocks (MsgGetBlocks) requests as needed.
func (b *blockManager) handleInvMsg(msg *btcwire.MsgInv, p *peer) {
// Find the last block in the inventory list.
invVects := msg.InvList
var lastHash *btcwire.ShaHash
for i := len(invVects) - 1; i >= 0; i-- {
if invVects[i].Type == btcwire.InvVect_Block {
lastHash = &invVects[i].Hash
break
}
}
for _, iv := range invVects {
switch iv.Type {
case btcwire.InvVect_Block:
// Ignore this block if we already have it.
// TODO(davec): Need to check orphans too.
if b.server.db.ExistsSha(&iv.Hash) {
log.Tracef("[BMGR] Ignoring known block %v.", &iv.Hash)
continue
}
// Add the peer to the list of peers which can serve the block if
// it's already queued to be fetched.
if item, ok := b.requestMap[iv.Hash.String()]; ok {
item.peers = append(item.peers, p)
continue
}
// Add the item to the end of the request queue.
item := &inventoryItem{
invVect: iv,
peers: []*peer{p},
}
b.requestMap[item.invVect.Hash.String()] = item
b.requestQueue.PushBack(item)
b.outstandingBlocks++
case btcwire.InvVect_Tx:
// XXX: Handle transactions here.
}
}
// Request more blocks if there aren't enough in-flight blocks.
if lastHash != nil && b.outstandingBlocks < btcwire.MaxBlocksPerMsg*5 {
stopHash := btcwire.ShaHash{}
gbmsg := btcwire.NewMsgGetBlocks(&stopHash)
gbmsg.AddBlockLocatorHash(lastHash)
p.QueueMessage(gbmsg)
}
}
// handleBlockMsg handles block messages from all peers. It is currently
// very simple. It doesn't validate the block or handle orphans and side
// chains. It simply inserts the block into the database after ensuring the
// previous block is already inserted.
func (b *blockManager) handleBlockMsg(block *btcutil.Block) {
b.outstandingBlocks--
msg := block.MsgBlock()
// Process the block to include validation, best chain selection, orphan
// handling, etc.
err := b.blockChain.ProcessBlock(block)
if err != nil {
blockSha, err2 := block.Sha()
if err2 != nil {
log.Errorf("[BMGR] %v", err2)
}
log.Warnf("[BMGR] Failed to process block %v: %v", blockSha, err)
return
}
// Log info about the new block height.
_, height, err := b.server.db.NewestSha()
if err != nil {
log.Warnf("[BMGR] Failed to obtain latest sha - %v", err)
return
}
b.logBlockHeight(int64(len(msg.Transactions)), height)
// Sync the db to disk when there are no more outstanding blocks.
// NOTE: Periodic syncs happen as new data is requested as well.
if b.outstandingBlocks <= 0 {
b.server.db.Sync()
}
}
// blockHandler is the main handler for the block manager. It must be run as a
// goroutine. It processes block and inv messages in a separate goroutine from
// the peer handlers so the block (MsgBlock) and tx (MsgTx) messages are handled
// by a single thread without needing to lock memory data structures. This is
// important because the block manager controls which blocks are needed and how
// the fetching should proceed.
//
// NOTE: Tx messages need to be handled here too.
// (either that or block and tx need to be handled in separate threads)
func (b *blockManager) blockHandler() {
out:
for !b.shutdown {
select {
// Handle new block messages.
case msg := <-b.blockQueue:
b.handleBlockMsg(msg.block)
// Handle new inventory messages.
case msg := <-b.invQueue:
b.handleInvMsg(msg.msg, msg.peer)
// Request the blocks.
if b.requestQueue.Len() > 0 && !b.processingReqs {
b.processingReqs = true
b.newBlocks <- true
}
case <-b.newBlocks:
numRequested := 0
gdmsg := btcwire.NewMsgGetData()
var p *peer
for e := b.requestQueue.Front(); e != nil; e = b.requestQueue.Front() {
item := e.Value.(*inventoryItem)
p = item.peers[0]
gdmsg.AddInvVect(item.invVect)
delete(b.requestMap, item.invVect.Hash.String())
b.requestQueue.Remove(e)
numRequested++
if numRequested >= btcwire.MaxInvPerMsg {
break
}
}
b.server.db.Sync()
if len(gdmsg.InvList) > 0 && p != nil {
p.QueueMessage(gdmsg)
}
b.processingReqs = false
case <-b.quit:
break out
}
}
b.wg.Done()
log.Trace("[BMGR] Block handler done")
}
// handleNotifyMsg handles notifications from btcchain. Currently it doesn't
// respond to any notifications, but the idea is that it requests missing blocks
// in response to orphan notifications and updates the wallet for blocks
// connected and disconnected to the main chain.
func (b *blockManager) handleNotifyMsg(notification *btcchain.Notification) {
switch notification.Type {
case btcchain.NTOrphanBlock:
// TODO(davec): Ask the peer to fill in the missing blocks for the
// orphan root if it's not nil.
orphanRoot := notification.Data.(*btcwire.ShaHash)
_ = orphanRoot
case btcchain.NTBlockAccepted:
// TODO(davec): Relay inventory, but don't relay old inventory
// during initial block download.
}
}
// chainNotificationHandler is the handler for asynchronous notifications from
// btcchain. It must be run as a goroutine.
func (b *blockManager) chainNotificationHandler() {
out:
for !b.shutdown {
select {
case notification := <-b.chainNotify:
b.handleNotifyMsg(notification)
case <-b.quit:
break out
}
}
b.wg.Done()
log.Trace("[BMGR] Chain notification handler done")
}
// QueueBlock adds the passed block message and peer to the block handling queue.
func (b *blockManager) QueueBlock(block *btcutil.Block) {
// Don't accept more blocks if we're shutting down.
if b.shutdown {
return
}
bmsg := blockMsg{block: block}
b.blockQueue <- &bmsg
}
// QueueInv adds the passed inventory message and peer to the inventory handling
// queue.
func (b *blockManager) QueueInv(msg *btcwire.MsgInv, p *peer) {
// Don't accept more inventory if we're shutting down.
if b.shutdown {
return
}
imsg := invMsg{msg: msg, peer: p}
b.invQueue <- &imsg
}
// Start begins the core block handler which processes block and inv messages.
func (b *blockManager) Start() {
// Already started?
if b.started {
return
}
log.Trace("[BMGR] Starting block manager")
go b.blockHandler()
go b.chainNotificationHandler()
b.wg.Add(2)
b.started = true
}
// Stop gracefully shuts down the block manager by stopping all asynchronous
// handlers and waiting for them to finish.
func (b *blockManager) Stop() error {
if b.shutdown {
log.Warnf("[BMGR] Block manager is already in the process of " +
"shutting down")
return nil
}
log.Infof("[BMGR] Block manager shutting down")
b.shutdown = true
close(b.quit)
b.wg.Wait()
return nil
}
// AddBlockLocators adds block locators to a getblocks message starting with
// the passed hash back to the genesis block hash. In order to keep the list
// of locator hashes to a reasonable number of entries, first it adds the
// most recent 10 block hashes (starting with the passed hash), then doubles the
// step each loop iteration to exponentially decrease the number of hashes the
// further away from head and closer to the genesis block it gets.
func (b *blockManager) AddBlockLocators(hash *btcwire.ShaHash, msg *btcwire.MsgGetBlocks) error {
// XXX(davec): This is fetching the block data too.
block, err := b.server.db.FetchBlockBySha(hash)
if err != nil {
log.Warnf("[BMGR] Lookup of known valid index failed %v", hash)
return err
}
blockIndex := block.Height()
// We want inventory after the passed hash.
msg.AddBlockLocatorHash(hash)
// Generate the block locators according to the algorithm described in
// in the function comment and make sure to leave room for the already
// added hash and final genesis hash.
increment := int64(1)
for i := 1; i < btcwire.MaxBlockLocatorsPerMsg-2; i++ {
if i > 10 {
increment *= 2
}
blockIndex -= increment
if blockIndex <= 1 {
break
}
h, err := b.server.db.FetchBlockShaByHeight(blockIndex)
if err != nil {
// This shouldn't happen and it's ok to ignore, so just
// continue to the next.
log.Warnf("[BMGR] Lookup of known valid index failed %v",
blockIndex)
continue
}
msg.AddBlockLocatorHash(h)
}
msg.AddBlockLocatorHash(&btcwire.GenesisHash)
return nil
}
// newBlockManager returns a new bitcoin block manager.
// Use Start to begin processing asynchronous block and inv updates.
func newBlockManager(s *server) *blockManager {
chainNotify := make(chan *btcchain.Notification, chanBufferSize)
bm := blockManager{
server: s,
blockChain: btcchain.New(s.db, s.btcnet, chainNotify),
requestQueue: list.New(),
requestMap: make(map[string]*inventoryItem),
lastBlockLogTime: time.Now(),
newBlocks: make(chan bool, 1),
blockQueue: make(chan *blockMsg, chanBufferSize),
invQueue: make(chan *invMsg, chanBufferSize),
chainNotify: chainNotify,
quit: make(chan bool),
}
bm.blockChain.DisableVerify(cfg.VerifyDisabled)
return &bm
}
// loadBlockDB opens the block database and returns a handle to it.
func loadBlockDB() (btcdb.Db, error) {
dbPath := filepath.Join(cfg.DbDir, activeNetParams.dbName)
log.Infof("[BMGR] Loading block database from '%s'", dbPath)
db, err := btcdb.OpenDB("sqlite", dbPath)
if err != nil {
// Return the error if it's not because the database doesn't
// exist.
if err != btcdb.DbDoesNotExist {
return nil, err
}
// Create the db if it does not exist.
err = os.MkdirAll(cfg.DbDir, 0700)
if err != nil {
return nil, err
}
db, err = btcdb.CreateDB("sqlite", dbPath)
if err != nil {
return nil, err
}
}
// Get the latest block height from the database.
_, height, err := db.NewestSha()
if err != nil {
db.Close()
return nil, err
}
2013-08-06 23:55:22 +02:00
// Insert the appropriate genesis block for the bitcoin network being
// connected to if needed.
if height == -1 {
2013-08-06 23:55:22 +02:00
genesis := btcutil.NewBlock(activeNetParams.genesisBlock)
_, err := db.InsertBlock(genesis)
if err != nil {
db.Close()
return nil, err
}
log.Infof("[BMGR] Inserted genesis block %v",
activeNetParams.genesisHash)
height = 0
2013-08-06 23:55:22 +02:00
}
log.Infof("[BMGR] Block database loaded with block height %d", height)
return db, nil
}