lbcd/cmd/addblock/import.go

353 lines
9.9 KiB
Go

// Copyright (c) 2013-2016 The btcsuite developers
// Use of this source code is governed by an ISC
// license that can be found in the LICENSE file.
package main
import (
"encoding/binary"
"fmt"
"io"
"sync"
"time"
"github.com/lbryio/lbcd/blockchain"
"github.com/lbryio/lbcd/blockchain/indexers"
"github.com/lbryio/lbcd/chaincfg/chainhash"
"github.com/lbryio/lbcd/database"
"github.com/lbryio/lbcd/wire"
"github.com/btcsuite/btcutil"
)
var zeroHash = chainhash.Hash{}
// importResults houses the stats and result as an import operation.
type importResults struct {
blocksProcessed int64
blocksImported int64
err error
}
// blockImporter houses information about an ongoing import from a block data
// file to the block database.
type blockImporter struct {
db database.DB
chain *blockchain.BlockChain
r io.ReadSeeker
processQueue chan []byte
doneChan chan bool
errChan chan error
quit chan struct{}
wg sync.WaitGroup
blocksProcessed int64
blocksImported int64
receivedLogBlocks int64
receivedLogTx int64
lastHeight int64
lastBlockTime time.Time
lastLogTime time.Time
}
// readBlock reads the next block from the input file.
func (bi *blockImporter) readBlock() ([]byte, error) {
// The block file format is:
// <network> <block length> <serialized block>
var net uint32
err := binary.Read(bi.r, binary.LittleEndian, &net)
if err != nil {
if err != io.EOF {
return nil, err
}
// No block and no error means there are no more blocks to read.
return nil, nil
}
if net != uint32(activeNetParams.Net) {
return nil, fmt.Errorf("network mismatch -- got %x, want %x",
net, uint32(activeNetParams.Net))
}
// Read the block length and ensure it is sane.
var blockLen uint32
if err := binary.Read(bi.r, binary.LittleEndian, &blockLen); err != nil {
return nil, err
}
if blockLen > wire.MaxBlockPayload {
return nil, fmt.Errorf("block payload of %d bytes is larger "+
"than the max allowed %d bytes", blockLen,
wire.MaxBlockPayload)
}
serializedBlock := make([]byte, blockLen)
if _, err := io.ReadFull(bi.r, serializedBlock); err != nil {
return nil, err
}
return serializedBlock, nil
}
// processBlock potentially imports the block into the database. It first
// deserializes the raw block while checking for errors. Already known blocks
// are skipped and orphan blocks are considered errors. Finally, it runs the
// block through the chain rules to ensure it follows all rules and matches
// up to the known checkpoint. Returns whether the block was imported along
// with any potential errors.
func (bi *blockImporter) processBlock(serializedBlock []byte) (bool, error) {
// Deserialize the block which includes checks for malformed blocks.
block, err := btcutil.NewBlockFromBytes(serializedBlock)
if err != nil {
return false, err
}
// update progress statistics
bi.lastBlockTime = block.MsgBlock().Header.Timestamp
bi.receivedLogTx += int64(len(block.MsgBlock().Transactions))
// Skip blocks that already exist.
blockHash := block.Hash()
exists, err := bi.chain.HaveBlock(blockHash)
if err != nil {
return false, err
}
if exists {
return false, nil
}
// Don't bother trying to process orphans.
prevHash := &block.MsgBlock().Header.PrevBlock
if !prevHash.IsEqual(&zeroHash) {
exists, err := bi.chain.HaveBlock(prevHash)
if err != nil {
return false, err
}
if !exists {
return false, fmt.Errorf("import file contains block "+
"%v which does not link to the available "+
"block chain", prevHash)
}
}
// Ensure the blocks follows all of the chain rules and match up to the
// known checkpoints.
isMainChain, isOrphan, err := bi.chain.ProcessBlock(block,
blockchain.BFFastAdd)
if err != nil {
return false, err
}
if !isMainChain {
return false, fmt.Errorf("import file contains an block that "+
"does not extend the main chain: %v", blockHash)
}
if isOrphan {
return false, fmt.Errorf("import file contains an orphan "+
"block: %v", blockHash)
}
return true, nil
}
// readHandler is the main handler for reading blocks from the import file.
// This allows block processing to take place in parallel with block reads.
// It must be run as a goroutine.
func (bi *blockImporter) readHandler() {
out:
for {
// Read the next block from the file and if anything goes wrong
// notify the status handler with the error and bail.
serializedBlock, err := bi.readBlock()
if err != nil {
bi.errChan <- fmt.Errorf("Error reading from input "+
"file: %v", err.Error())
break out
}
// A nil block with no error means we're done.
if serializedBlock == nil {
break out
}
// Send the block or quit if we've been signalled to exit by
// the status handler due to an error elsewhere.
select {
case bi.processQueue <- serializedBlock:
case <-bi.quit:
break out
}
}
// Close the processing channel to signal no more blocks are coming.
close(bi.processQueue)
bi.wg.Done()
}
// logProgress logs block progress as an information message. In order to
// prevent spam, it limits logging to one message every cfg.Progress seconds
// with duration and totals included.
func (bi *blockImporter) logProgress() {
bi.receivedLogBlocks++
now := time.Now()
duration := now.Sub(bi.lastLogTime)
if duration < time.Second*time.Duration(cfg.Progress) {
return
}
// Truncate the duration to 10s of milliseconds.
durationMillis := int64(duration / time.Millisecond)
tDuration := 10 * time.Millisecond * time.Duration(durationMillis/10)
// Log information about new block height.
blockStr := "blocks"
if bi.receivedLogBlocks == 1 {
blockStr = "block"
}
txStr := "transactions"
if bi.receivedLogTx == 1 {
txStr = "transaction"
}
log.Infof("Processed %d %s in the last %s (%d %s, height %d, %s)",
bi.receivedLogBlocks, blockStr, tDuration, bi.receivedLogTx,
txStr, bi.lastHeight, bi.lastBlockTime)
bi.receivedLogBlocks = 0
bi.receivedLogTx = 0
bi.lastLogTime = now
}
// processHandler is the main handler for processing blocks. This allows block
// processing to take place in parallel with block reads from the import file.
// It must be run as a goroutine.
func (bi *blockImporter) processHandler() {
out:
for {
select {
case serializedBlock, ok := <-bi.processQueue:
// We're done when the channel is closed.
if !ok {
break out
}
bi.blocksProcessed++
bi.lastHeight++
imported, err := bi.processBlock(serializedBlock)
if err != nil {
bi.errChan <- err
break out
}
if imported {
bi.blocksImported++
}
bi.logProgress()
case <-bi.quit:
break out
}
}
bi.wg.Done()
}
// statusHandler waits for updates from the import operation and notifies
// the passed doneChan with the results of the import. It also causes all
// goroutines to exit if an error is reported from any of them.
func (bi *blockImporter) statusHandler(resultsChan chan *importResults) {
select {
// An error from either of the goroutines means we're done so signal
// caller with the error and signal all goroutines to quit.
case err := <-bi.errChan:
resultsChan <- &importResults{
blocksProcessed: bi.blocksProcessed,
blocksImported: bi.blocksImported,
err: err,
}
close(bi.quit)
// The import finished normally.
case <-bi.doneChan:
resultsChan <- &importResults{
blocksProcessed: bi.blocksProcessed,
blocksImported: bi.blocksImported,
err: nil,
}
}
}
// Import is the core function which handles importing the blocks from the file
// associated with the block importer to the database. It returns a channel
// on which the results will be returned when the operation has completed.
func (bi *blockImporter) Import() chan *importResults {
// Start up the read and process handling goroutines. This setup allows
// blocks to be read from disk in parallel while being processed.
bi.wg.Add(2)
go bi.readHandler()
go bi.processHandler()
// Wait for the import to finish in a separate goroutine and signal
// the status handler when done.
go func() {
bi.wg.Wait()
bi.doneChan <- true
}()
// Start the status handler and return the result channel that it will
// send the results on when the import is done.
resultChan := make(chan *importResults)
go bi.statusHandler(resultChan)
return resultChan
}
// newBlockImporter returns a new importer for the provided file reader seeker
// and database.
func newBlockImporter(db database.DB, r io.ReadSeeker) (*blockImporter, error) {
// Create the transaction and address indexes if needed.
//
// CAUTION: the txindex needs to be first in the indexes array because
// the addrindex uses data from the txindex during catchup. If the
// addrindex is run first, it may not have the transactions from the
// current block indexed.
var indexes []indexers.Indexer
if cfg.TxIndex || cfg.AddrIndex {
// Enable transaction index if address index is enabled since it
// requires it.
if !cfg.TxIndex {
log.Infof("Transaction index enabled because it is " +
"required by the address index")
cfg.TxIndex = true
} else {
log.Info("Transaction index is enabled")
}
indexes = append(indexes, indexers.NewTxIndex(db))
}
if cfg.AddrIndex {
log.Info("Address index is enabled")
indexes = append(indexes, indexers.NewAddrIndex(db, activeNetParams))
}
// Create an index manager if any of the optional indexes are enabled.
var indexManager blockchain.IndexManager
if len(indexes) > 0 {
indexManager = indexers.NewManager(db, indexes)
}
chain, err := blockchain.New(&blockchain.Config{
DB: db,
ChainParams: activeNetParams,
TimeSource: blockchain.NewMedianTime(),
IndexManager: indexManager,
})
if err != nil {
return nil, err
}
return &blockImporter{
db: db,
r: r,
processQueue: make(chan []byte, 2),
doneChan: make(chan bool),
errChan: make(chan error),
quit: make(chan struct{}),
chain: chain,
lastLogTime: time.Now(),
}, nil
}