Fix reorg handling; improve string formatting, error handling, and logging.

Authored by Alex on 2017-03-27 22:45:02 -06:00; committed by Olaoluwa Osuntokun
parent f2aceaf363
commit 0a8de495cc
4 changed files with 237 additions and 145 deletions


@ -2,6 +2,7 @@ package spvchain
import ( import (
"container/list" "container/list"
"fmt"
"math/big" "math/big"
"sync" "sync"
"sync/atomic" "sync/atomic"
@ -127,6 +128,7 @@ type blockManager struct {
quit chan struct{} quit chan struct{}
headerList *list.List headerList *list.List
reorgList *list.List
startHeader *list.Element startHeader *list.Element
nextCheckpoint *chaincfg.Checkpoint nextCheckpoint *chaincfg.Checkpoint
@ -148,6 +150,7 @@ func newBlockManager(s *ChainService) (*blockManager, error) {
progressLogger: newBlockProgressLogger("Processed", log), progressLogger: newBlockProgressLogger("Processed", log),
msgChan: make(chan interface{}, MaxPeers*3), msgChan: make(chan interface{}, MaxPeers*3),
headerList: list.New(), headerList: list.New(),
reorgList: list.New(),
quit: make(chan struct{}), quit: make(chan struct{}),
blocksPerRetarget: int32(targetTimespan / targetTimePerBlock), blocksPerRetarget: int32(targetTimespan / targetTimePerBlock),
minRetargetTimespan: targetTimespan / adjustmentFactor, minRetargetTimespan: targetTimespan / adjustmentFactor,
@ -258,8 +261,7 @@ func (b *blockManager) handleDonePeerMsg(peers *list.List, sp *serverPeer) {
} }
// Attempt to find a new peer to sync from if the quitting peer is the // Attempt to find a new peer to sync from if the quitting peer is the
// sync peer. Also, reset the headers-first state if in headers-first // sync peer. Also, reset the header state.
// mode so
if b.syncPeer != nil && b.syncPeer == sp { if b.syncPeer != nil && b.syncPeer == sp {
b.syncPeer = nil b.syncPeer = nil
header, height, err := b.server.LatestBlock() header, height, err := b.server.LatestBlock()
@ -423,7 +425,7 @@ func (b *blockManager) startSync(peers *list.List) {
best, err := b.server.BestSnapshot() best, err := b.server.BestSnapshot()
if err != nil { if err != nil {
log.Errorf("Failed to get hash and height for the "+ log.Errorf("Failed to get hash and height for the "+
"latest block: %v", err) "latest block: %s", err)
return return
} }
var bestPeer *serverPeer var bestPeer *serverPeer
@ -460,11 +462,11 @@ func (b *blockManager) startSync(peers *list.List) {
locator, err := b.server.LatestBlockLocator() locator, err := b.server.LatestBlockLocator()
if err != nil { if err != nil {
log.Errorf("Failed to get block locator for the "+ log.Errorf("Failed to get block locator for the "+
"latest block: %v", err) "latest block: %s", err)
return return
} }
log.Infof("Syncing to block height %d from peer %v", log.Infof("Syncing to block height %d from peer %s",
bestPeer.LastBlock(), bestPeer.Addr()) bestPeer.LastBlock(), bestPeer.Addr())
// When the current height is less than a known checkpoint we // When the current height is less than a known checkpoint we
@ -599,9 +601,9 @@ func (b *blockManager) handleInvMsg(imsg *invMsg) {
} }
} }
// If this is the sync peer and we're not current, get the headers // If this is the sync peer or we're current, get the headers
// for the announced blocks and update the last announced block. // for the announced blocks and update the last announced block.
if lastBlock != -1 && imsg.peer == b.syncPeer /*&& !b.current()*/ { if lastBlock != -1 && (imsg.peer == b.syncPeer || b.current()) {
// Make a locator starting from the latest known header we've // Make a locator starting from the latest known header we've
// processed. // processed.
locator := make(blockchain.BlockLocator, 0, locator := make(blockchain.BlockLocator, 0,
@ -614,7 +616,7 @@ func (b *blockManager) handleInvMsg(imsg *invMsg) {
locator = append(locator, knownLocator...) locator = append(locator, knownLocator...)
} }
// Get headers based on locator. // Get headers based on locator.
b.syncPeer.PushGetHeadersMsg(locator, &invVects[lastBlock].Hash) imsg.peer.PushGetHeadersMsg(locator, &invVects[lastBlock].Hash)
} }
} }
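Note on the handleInvMsg change above: headers are now requested from whichever peer announced the inventory once the chain is current, not only from the sync peer, and the locator is built from the most recently processed headers. A minimal sketch of that locator construction, assuming a headerNode shaped like the one in this package (the helper name and max parameter are illustrative, not the package's API):

package sketch

import (
	"container/list"

	"github.com/btcsuite/btcd/blockchain"
	"github.com/btcsuite/btcd/wire"
)

// headerNode mirrors the blockManager's header list entries for
// illustration; it is not the package's own type.
type headerNode struct {
	header *wire.BlockHeader
	height int32
}

// locatorFromHeaderList walks the in-memory header list from newest to
// oldest and collects up to max hashes for a getheaders request. The real
// handleInvMsg also appends the server's known locator when the in-memory
// list runs short.
func locatorFromHeaderList(headers *list.List, max int) blockchain.BlockLocator {
	locator := make(blockchain.BlockLocator, 0, max)
	for e := headers.Back(); e != nil && len(locator) < max; e = e.Prev() {
		hash := e.Value.(*headerNode).header.BlockHash()
		locator = append(locator, &hash)
	}
	return locator
}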
@ -650,7 +652,7 @@ func (b *blockManager) handleHeadersMsg(hmsg *headersMsg) {
receivedCheckpoint := false receivedCheckpoint := false
var finalHash *chainhash.Hash var finalHash *chainhash.Hash
var finalHeight int32 var finalHeight int32
for _, blockHeader := range msg.Headers { for i, blockHeader := range msg.Headers {
blockHash := blockHeader.BlockHash() blockHash := blockHeader.BlockHash()
finalHash = &blockHash finalHash = &blockHash
@ -671,30 +673,11 @@ func (b *blockManager) handleHeadersMsg(hmsg *headersMsg) {
prevNode := prevNodeEl.Value.(*headerNode) prevNode := prevNodeEl.Value.(*headerNode)
prevHash := prevNode.header.BlockHash() prevHash := prevNode.header.BlockHash()
if prevHash.IsEqual(&blockHeader.PrevBlock) { if prevHash.IsEqual(&blockHeader.PrevBlock) {
diff, err := b.calcNextRequiredDifficulty( err := b.checkHeaderSanity(blockHeader, maxTimestamp,
blockHeader.Timestamp) false)
if err != nil { if err != nil {
log.Warnf("Unable to calculate next difficulty"+ log.Warnf("Header doesn't pass sanity check: "+
": %v -- disconnecting peer", err) "%s -- disconnecting peer", err)
hmsg.peer.Disconnect()
return
}
stubBlock := btcutil.NewBlock(&wire.MsgBlock{
Header: *blockHeader,
})
err = blockchain.CheckProofOfWork(stubBlock,
blockchain.CompactToBig(diff))
if err != nil {
log.Warnf("Received header doesn't match "+
"required difficulty: %v -- "+
"disconnecting peer", err)
hmsg.peer.Disconnect()
return
}
// Ensure the block time is not too far in the future.
if blockHeader.Timestamp.After(maxTimestamp) {
log.Warnf("block timestamp of %v is too far in"+
" the future", blockHeader.Timestamp)
hmsg.peer.Disconnect() hmsg.peer.Disconnect()
return return
} }
@ -704,13 +687,16 @@ func (b *blockManager) handleHeadersMsg(hmsg *headersMsg) {
uint32(node.height)) uint32(node.height))
if err != nil { if err != nil {
log.Criticalf("Couldn't write block to "+ log.Criticalf("Couldn't write block to "+
"database: %v", err) "database: %s", err)
// Should we panic here?
} }
err = b.server.putMaxBlockHeight(uint32(node.height)) err = b.server.putMaxBlockHeight(uint32(node.height))
if err != nil { if err != nil {
log.Criticalf("Couldn't write max block height"+ log.Criticalf("Couldn't write max block height"+
" to database: %v", err) " to database: %s", err)
// Should we panic here?
} }
hmsg.peer.UpdateLastBlockHeight(node.height)
e := b.headerList.PushBack(&node) e := b.headerList.PushBack(&node)
if b.startHeader == nil { if b.startHeader == nil {
b.startHeader = e b.startHeader = e
@ -723,30 +709,33 @@ func (b *blockManager) handleHeadersMsg(hmsg *headersMsg) {
// If we got these headers from a peer that's not our // If we got these headers from a peer that's not our
// sync peer, they might not be aligned correctly or // sync peer, they might not be aligned correctly or
// even on the right chain. Just ignore the rest of the // even on the right chain. Just ignore the rest of the
// message. // message. However, if we're current, this might be a
if hmsg.peer != b.syncPeer { // reorg, in which case we'll either change our sync
// peer or disconnect the peer that sent us these
// bad headers.
if hmsg.peer != b.syncPeer && !b.current() {
return return
} }
// Check if this block is known. If so, we continue to // Check if this block is known. If so, we continue to
// the next one. // the next one.
_, _, err := b.server.GetBlockByHash( _, _, err := b.server.GetBlockByHash(blockHash)
blockHeader.BlockHash())
if err == nil { if err == nil {
continue continue
} }
// Check if the previous block is known. If it is, this // Check if the previous block is known. If it is, this
// is probably a reorg based on the estimated latest // is probably a reorg based on the estimated latest
// block that matches between us and the sync peer as // block that matches between us and the peer as
// derived from the block locator we sent to request // derived from the block locator we sent to request
// these headers. Otherwise, the headers don't connect // these headers. Otherwise, the headers don't connect
// to anything we know and we should disconnect the // to anything we know and we should disconnect the
// peer. // peer.
_, backHeight, err := b.server.GetBlockByHash( backHead, backHeight, err := b.server.GetBlockByHash(
blockHeader.PrevBlock) blockHeader.PrevBlock)
if err != nil { if err != nil {
log.Errorf("Couldn't get block by hash from "+ log.Warnf("Received block header that does not"+
"the database (%v) -- disconnecting "+ " properly connect to the chain from"+
"peer %s", err, hmsg.peer.Addr()) " peer %s (%s) -- disconnecting",
hmsg.peer.Addr(), err)
hmsg.peer.Disconnect() hmsg.peer.Disconnect()
return return
} }
@ -757,20 +746,91 @@ func (b *blockManager) handleHeadersMsg(hmsg *headersMsg) {
prevCheckpoint := b.findPreviousHeaderCheckpoint( prevCheckpoint := b.findPreviousHeaderCheckpoint(
prevNode.height) prevNode.height)
if backHeight < uint32(prevCheckpoint.Height) { if backHeight < uint32(prevCheckpoint.Height) {
log.Errorf("Attempt at a reorg earlier (%v) than a "+ log.Errorf("Attempt at a reorg earlier than a "+
"checkpoint (%v) past which we've already "+ "checkpoint past which we've already "+
"synchronized -- disconnecting peer "+ "synchronized -- disconnecting peer "+
"%s", backHeight, prevCheckpoint.Height, hmsg.peer.Addr()) "%s", hmsg.peer.Addr())
hmsg.peer.Disconnect() hmsg.peer.Disconnect()
return return
} }
// TODO: Add real reorg handling here // Check the sanity of the new branch. If any of the
log.Warnf("Received block header that does not "+ // blocks don't pass sanity checks, disconnect the peer.
"properly connect to the chain from peer %s "+ // We also keep track of the work represented by these
"-- disconnecting", hmsg.peer.Addr()) // headers so we can compare it to the work in the known
// good chain.
b.reorgList.Init()
b.reorgList.PushBack(&headerNode{
header: &backHead,
height: int32(backHeight),
})
totalWork := big.NewInt(0)
for _, reorgHeader := range msg.Headers[i:] {
err = b.checkHeaderSanity(reorgHeader,
maxTimestamp, true)
if err != nil {
log.Warnf("Header doesn't pass sanity"+
" check: %s -- disconnecting "+
"peer", err)
hmsg.peer.Disconnect() hmsg.peer.Disconnect()
return return
} }
totalWork.Add(totalWork,
blockchain.CalcWork(reorgHeader.Bits))
}
log.Tracef("Sane reorg attempted. Total work from "+
"reorg chain: %v", totalWork)
// All the headers pass sanity checks. Now we calculate
// the total work for the known chain.
knownWork := big.NewInt(0)
// This should NEVER be nil because the most recent
// block is always pushed back by resetHeaderState
knownEl := b.headerList.Back()
var knownHead wire.BlockHeader
for j := uint32(prevNode.height); j > backHeight; j-- {
if knownEl != nil {
knownHead = *knownEl.Value.(*headerNode).header
knownEl = knownEl.Prev()
} else {
knownHead, _, err = b.server.GetBlockByHash(
knownHead.PrevBlock)
if err != nil {
log.Criticalf("Can't get block"+
"header for hash %s: "+
"%v",
knownHead.PrevBlock,
err)
// Should we panic here?
}
}
knownWork.Add(knownWork,
blockchain.CalcWork(knownHead.Bits))
}
log.Tracef("Total work from known chain: %v", knownWork)
// Compare the two work totals and reject the new chain
// if it doesn't have more work than the previously
// known chain.
if knownWork.Cmp(totalWork) >= 0 {
log.Warnf("Reorg attempt that does not have "+
"more work than known chain from peer "+
"%s -- disconnecting", hmsg.peer.Addr())
hmsg.peer.Disconnect()
return
}
// At this point, we have a valid reorg, so we roll
// back the existing chain and add the new block header.
// We also change the sync peer. Then we can continue
// with the rest of the headers in the message as if
// nothing has happened.
b.syncPeer = hmsg.peer
b.server.rollbackToHeight(backHeight)
b.server.putBlock(*blockHeader, backHeight+1)
b.server.putMaxBlockHeight(backHeight + 1)
b.resetHeaderState(&backHead, int32(backHeight))
b.headerList.PushBack(&headerNode{
header: blockHeader,
height: int32(backHeight + 1),
})
}
// Verify the header at the next checkpoint height matches. // Verify the header at the next checkpoint height matches.
if b.nextCheckpoint != nil && node.height == b.nextCheckpoint.Height { if b.nextCheckpoint != nil && node.height == b.nextCheckpoint.Height {
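The reorg path above boils down to a cumulative-work comparison: sum blockchain.CalcWork over the candidate branch and over the known branch back to the fork point, and switch only if the candidate carries strictly more work. A compact sketch of that rule (branchWork and acceptReorg are illustrative names, not part of the package):

package sketch

import (
	"math/big"

	"github.com/btcsuite/btcd/blockchain"
	"github.com/btcsuite/btcd/wire"
)

// branchWork sums the proof-of-work represented by a run of headers,
// mirroring the totalWork/knownWork loops in handleHeadersMsg.
func branchWork(headers []wire.BlockHeader) *big.Int {
	work := big.NewInt(0)
	for _, h := range headers {
		work.Add(work, blockchain.CalcWork(h.Bits))
	}
	return work
}

// acceptReorg returns true only when the candidate branch has strictly
// more total work than the known branch from the same fork point; equal
// work keeps the current chain and the peer is disconnected.
func acceptReorg(known, candidate []wire.BlockHeader) bool {
	return branchWork(candidate).Cmp(branchWork(known)) > 0
}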
@ -792,8 +852,13 @@ func (b *blockManager) handleHeadersMsg(hmsg *headersMsg) {
"checkpoint at height %d/hash %s", "checkpoint at height %d/hash %s",
prevCheckpoint.Height, prevCheckpoint.Height,
prevCheckpoint.Hash) prevCheckpoint.Hash)
b.server.rollbackToHeight(uint32( _, err := b.server.rollbackToHeight(uint32(
prevCheckpoint.Height)) prevCheckpoint.Height))
if err != nil {
log.Criticalf("Rollback failed: %s",
err)
// Should we panic here?
}
hmsg.peer.Disconnect() hmsg.peer.Disconnect()
return return
} }
@ -829,16 +894,46 @@ func (b *blockManager) handleHeadersMsg(hmsg *headersMsg) {
err := hmsg.peer.PushGetHeadersMsg(locator, &nextHash) err := hmsg.peer.PushGetHeadersMsg(locator, &nextHash)
if err != nil { if err != nil {
log.Warnf("Failed to send getheaders message to "+ log.Warnf("Failed to send getheaders message to "+
"peer %s: %v", hmsg.peer.Addr(), err) "peer %s: %s", hmsg.peer.Addr(), err)
return return
} }
} }
// checkHeaderSanity checks the PoW and timestamp of a block header.
func (b *blockManager) checkHeaderSanity(blockHeader *wire.BlockHeader,
maxTimestamp time.Time, reorgAttempt bool) error {
diff, err := b.calcNextRequiredDifficulty(
blockHeader.Timestamp, reorgAttempt)
if err != nil {
return err
}
stubBlock := btcutil.NewBlock(&wire.MsgBlock{
Header: *blockHeader,
})
err = blockchain.CheckProofOfWork(stubBlock,
blockchain.CompactToBig(diff))
if err != nil {
return err
}
// Ensure the block time is not too far in the future.
if blockHeader.Timestamp.After(maxTimestamp) {
return fmt.Errorf("block timestamp of %v is too far in the "+
"future", blockHeader.Timestamp)
}
return nil
}
// calcNextRequiredDifficulty calculates the required difficulty for the block // calcNextRequiredDifficulty calculates the required difficulty for the block
// after the passed previous block node based on the difficulty retarget rules. // after the passed previous block node based on the difficulty retarget rules.
func (b *blockManager) calcNextRequiredDifficulty(newBlockTime time.Time) (uint32, error) { func (b *blockManager) calcNextRequiredDifficulty(newBlockTime time.Time,
reorgAttempt bool) (uint32, error) {
lastNodeEl := b.headerList.Back() hList := b.headerList
if reorgAttempt {
hList = b.reorgList
}
lastNodeEl := hList.Back()
// Genesis block. // Genesis block.
if lastNodeEl == nil { if lastNodeEl == nil {
@ -868,7 +963,7 @@ func (b *blockManager) calcNextRequiredDifficulty(newBlockTime time.Time) (uint3
// The block was mined within the desired timeframe, so // The block was mined within the desired timeframe, so
// return the difficulty for the last block which did // return the difficulty for the last block which did
// not have the special minimum difficulty rule applied. // not have the special minimum difficulty rule applied.
prevBits, err := b.findPrevTestNetDifficulty() prevBits, err := b.findPrevTestNetDifficulty(hList)
if err != nil { if err != nil {
return 0, err return 0, err
} }
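The retarget arithmetic inside calcNextRequiredDifficulty falls outside the hunks shown. For reference, the standard rule it implements clamps the measured timespan and scales the previous target; a rough sketch under that assumption (parameter names mirror the chain params already used by blockManager, timespans in seconds, helper name illustrative):

package sketch

import (
	"math/big"

	"github.com/btcsuite/btcd/blockchain"
)

// retarget sketches the adjustment applied at each retarget boundary:
// newTarget = oldTarget * actualTimespan / targetTimespan, with the
// timespan clamped so one adjustment can move difficulty by at most
// adjustmentFactor, and the result capped at the proof-of-work limit.
func retarget(oldBits uint32, actualTimespan, targetTimespan,
	adjustmentFactor int64, powLimit *big.Int) uint32 {

	minTimespan := targetTimespan / adjustmentFactor
	maxTimespan := targetTimespan * adjustmentFactor
	if actualTimespan < minTimespan {
		actualTimespan = minTimespan
	} else if actualTimespan > maxTimespan {
		actualTimespan = maxTimespan
	}

	newTarget := blockchain.CompactToBig(oldBits)
	newTarget.Mul(newTarget, big.NewInt(actualTimespan))
	newTarget.Div(newTarget, big.NewInt(targetTimespan))
	if newTarget.Cmp(powLimit) > 0 {
		newTarget.Set(powLimit)
	}
	return blockchain.BigToCompact(newTarget)
}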
@ -934,8 +1029,8 @@ func (b *blockManager) calcNextRequiredDifficulty(newBlockTime time.Time) (uint3
// findPrevTestNetDifficulty returns the difficulty of the previous block which // findPrevTestNetDifficulty returns the difficulty of the previous block which
// did not have the special testnet minimum difficulty rule applied. // did not have the special testnet minimum difficulty rule applied.
func (b *blockManager) findPrevTestNetDifficulty() (uint32, error) { func (b *blockManager) findPrevTestNetDifficulty(hList *list.List) (uint32, error) {
startNodeEl := b.headerList.Back() startNodeEl := hList.Back()
// Genesis block. // Genesis block.
if startNodeEl == nil { if startNodeEl == nil {
@ -964,7 +1059,7 @@ func (b *blockManager) findPrevTestNetDifficulty() (uint32, error) {
} else { } else {
node, _, err := b.server.GetBlockByHeight(uint32(iterHeight)) node, _, err := b.server.GetBlockByHeight(uint32(iterHeight))
if err != nil { if err != nil {
log.Errorf("GetBlockByHeight: %v", err) log.Errorf("GetBlockByHeight: %s", err)
return 0, err return 0, err
} }
iterNode = &node iterNode = &node
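findPrevTestNetDifficulty exists because of the testnet minimum-difficulty rule: a block arriving more than twice the target spacing after its parent may use the minimum difficulty, so retargeting must look back to the last "real" bits. A simplified sketch of that rule over a slice of headers (the real code also stops at retarget boundaries; the function name and parameters are illustrative):

package sketch

import (
	"time"

	"github.com/btcsuite/btcd/wire"
)

// nextBitsWithMinDiffRule returns powLimitBits when the new block arrives
// more than twice the target spacing after the tip, and otherwise the bits
// of the most recent ancestor that did not use the special rule. headers
// are ordered oldest to newest.
func nextBitsWithMinDiffRule(headers []wire.BlockHeader, newBlockTime time.Time,
	targetTimePerBlock time.Duration, powLimitBits uint32) uint32 {

	if len(headers) == 0 {
		return powLimitBits // genesis
	}
	tip := headers[len(headers)-1]
	if newBlockTime.After(tip.Timestamp.Add(2 * targetTimePerBlock)) {
		return powLimitBits
	}
	// Walk back past any minimum-difficulty blocks, as
	// findPrevTestNetDifficulty does over the header list.
	for i := len(headers) - 1; i >= 0; i-- {
		if headers[i].Bits != powLimitBits {
			return headers[i].Bits
		}
	}
	return powLimitBits
}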


@ -3,6 +3,7 @@ package spvchain
import ( import (
"bytes" "bytes"
"encoding/binary" "encoding/binary"
"fmt"
"time" "time"
"github.com/btcsuite/btcd/blockchain" "github.com/btcsuite/btcd/blockchain"
@ -62,8 +63,8 @@ func fetchDBVersion(tx walletdb.Tx) (uint32, error) {
bucket := tx.RootBucket().Bucket(spvBucketName) bucket := tx.RootBucket().Bucket(spvBucketName)
verBytes := bucket.Get(dbVersionName) verBytes := bucket.Get(dbVersionName)
if verBytes == nil { if verBytes == nil {
str := "required version number not stored in database" return 0, fmt.Errorf("required version number not stored in " +
return 0, log.Error(str) "database")
} }
version := binary.LittleEndian.Uint32(verBytes) version := binary.LittleEndian.Uint32(verBytes)
return version, nil return version, nil
@ -74,12 +75,7 @@ func putDBVersion(tx walletdb.Tx, version uint32) error {
bucket := tx.RootBucket().Bucket(spvBucketName) bucket := tx.RootBucket().Bucket(spvBucketName)
verBytes := uint32ToBytes(version) verBytes := uint32ToBytes(version)
err := bucket.Put(dbVersionName, verBytes) return bucket.Put(dbVersionName, verBytes)
if err != nil {
str := "failed to store version: %v"
return log.Errorf(str, err)
}
return nil
} }
// putMaxBlockHeight stores the max block height to the database. // putMaxBlockHeight stores the max block height to the database.
@ -89,8 +85,7 @@ func putMaxBlockHeight(tx walletdb.Tx, maxBlockHeight uint32) error {
maxBlockHeightBytes := uint32ToBytes(maxBlockHeight) maxBlockHeightBytes := uint32ToBytes(maxBlockHeight)
err := bucket.Put(maxBlockHeightName, maxBlockHeightBytes) err := bucket.Put(maxBlockHeightName, maxBlockHeightBytes)
if err != nil { if err != nil {
str := "failed to store max block height: %v" return fmt.Errorf("failed to store max block height: %s", err)
return log.Errorf(str, err)
} }
return nil return nil
} }
@ -113,14 +108,12 @@ func putBlock(tx walletdb.Tx, header wire.BlockHeader, height uint32) error {
err = bucket.Put(blockHash[:], buf.Bytes()) err = bucket.Put(blockHash[:], buf.Bytes())
if err != nil { if err != nil {
str := "failed to store SPV block info: %v" return fmt.Errorf("failed to store SPV block info: %s", err)
return log.Errorf(str, err)
} }
err = bucket.Put(uint32ToBytes(height), blockHash[:]) err = bucket.Put(uint32ToBytes(height), blockHash[:])
if err != nil { if err != nil {
str := "failed to store block height info: %v" return fmt.Errorf("failed to store block height info: %s", err)
return log.Errorf(str, err)
} }
return nil return nil
@ -140,8 +133,7 @@ func putFilter(tx walletdb.Tx, blockHash chainhash.Hash, bucketName []byte,
err = bucket.Put(blockHash[:], buf.Bytes()) err = bucket.Put(blockHash[:], buf.Bytes())
if err != nil { if err != nil {
str := "failed to store filter: %v" return fmt.Errorf("failed to store filter: %s", err)
return log.Errorf(str, err)
} }
return nil return nil
@ -170,8 +162,7 @@ func putHeader(tx walletdb.Tx, blockHash chainhash.Hash, bucketName []byte,
err := bucket.Put(blockHash[:], filterTip[:]) err := bucket.Put(blockHash[:], filterTip[:])
if err != nil { if err != nil {
str := "failed to store filter header: %v" return fmt.Errorf("failed to store filter header: %s", err)
return log.Errorf(str, err)
} }
return nil return nil
@ -222,19 +213,22 @@ func GetBlockByHash(tx walletdb.Tx, blockHash chainhash.Hash) (wire.BlockHeader,
bucket := tx.RootBucket().Bucket(spvBucketName).Bucket(blockHeaderBucketName) bucket := tx.RootBucket().Bucket(spvBucketName).Bucket(blockHeaderBucketName)
blockBytes := bucket.Get(blockHash[:]) blockBytes := bucket.Get(blockHash[:])
if len(blockBytes) == 0 { if len(blockBytes) == 0 {
str := "failed to retrieve block info for hash: %s" return wire.BlockHeader{}, 0,
return wire.BlockHeader{}, 0, log.Errorf(str, blockHash) fmt.Errorf("failed to retrieve block info for hash: %s",
blockHash)
} }
buf := bytes.NewReader(blockBytes[:wire.MaxBlockHeaderPayload]) buf := bytes.NewReader(blockBytes[:wire.MaxBlockHeaderPayload])
var header wire.BlockHeader var header wire.BlockHeader
err := header.Deserialize(buf) err := header.Deserialize(buf)
if err != nil { if err != nil {
str := "failed to deserialize block header for hash: %s" return wire.BlockHeader{}, 0,
return wire.BlockHeader{}, 0, log.Errorf(str, blockHash) fmt.Errorf("failed to deserialize block header for "+
"hash: %s", blockHash)
} }
height := binary.LittleEndian.Uint32(blockBytes[wire.MaxBlockHeaderPayload : wire.MaxBlockHeaderPayload+4]) height := binary.LittleEndian.Uint32(
blockBytes[wire.MaxBlockHeaderPayload : wire.MaxBlockHeaderPayload+4])
return header, height, nil return header, height, nil
} }
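The read path above implies the per-block storage layout: the value keyed by block hash is the serialized header (wire.MaxBlockHeaderPayload bytes) followed by a 4-byte little-endian height, with a separate height-to-hash index alongside. A minimal encode/decode sketch of that layout (helper names are illustrative, not the package's API):

package sketch

import (
	"bytes"
	"encoding/binary"
	"fmt"

	"github.com/btcsuite/btcd/wire"
)

// encodeBlockEntry builds the value stored under a block-hash key: the
// serialized header followed by the height as a 4-byte little-endian
// integer.
func encodeBlockEntry(header *wire.BlockHeader, height uint32) ([]byte, error) {
	var buf bytes.Buffer
	if err := header.Serialize(&buf); err != nil {
		return nil, err
	}
	var h [4]byte
	binary.LittleEndian.PutUint32(h[:], height)
	return append(buf.Bytes(), h[:]...), nil
}

// decodeBlockEntry reverses encodeBlockEntry, mirroring the slicing done
// by GetBlockByHash.
func decodeBlockEntry(value []byte) (wire.BlockHeader, uint32, error) {
	var header wire.BlockHeader
	if len(value) < wire.MaxBlockHeaderPayload+4 {
		return header, 0, fmt.Errorf("short block entry: %d bytes", len(value))
	}
	err := header.Deserialize(bytes.NewReader(value[:wire.MaxBlockHeaderPayload]))
	if err != nil {
		return header, 0, err
	}
	height := binary.LittleEndian.Uint32(
		value[wire.MaxBlockHeaderPayload : wire.MaxBlockHeaderPayload+4])
	return header, height, nil
}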
@ -246,8 +240,7 @@ func GetBlockHashByHeight(tx walletdb.Tx, height uint32) (chainhash.Hash,
var hash chainhash.Hash var hash chainhash.Hash
hashBytes := bucket.Get(uint32ToBytes(height)) hashBytes := bucket.Get(uint32ToBytes(height))
if hashBytes == nil { if hashBytes == nil {
str := "no block hash for height %v" return hash, fmt.Errorf("no block hash for height %d", height)
return hash, log.Errorf(str, height)
} }
hash.SetBytes(hashBytes) hash.SetBytes(hashBytes)
return hash, nil return hash, nil
@ -283,8 +276,8 @@ func LatestBlock(tx walletdb.Tx) (wire.BlockHeader, uint32, error) {
maxBlockHeightBytes := bucket.Get(maxBlockHeightName) maxBlockHeightBytes := bucket.Get(maxBlockHeightName)
if maxBlockHeightBytes == nil { if maxBlockHeightBytes == nil {
str := "no max block height stored" return wire.BlockHeader{}, 0,
return wire.BlockHeader{}, 0, log.Error(str) fmt.Errorf("no max block height stored")
} }
maxBlockHeight := binary.LittleEndian.Uint32(maxBlockHeightBytes) maxBlockHeight := binary.LittleEndian.Uint32(maxBlockHeightBytes)
@ -293,8 +286,8 @@ func LatestBlock(tx walletdb.Tx) (wire.BlockHeader, uint32, error) {
return wire.BlockHeader{}, 0, err return wire.BlockHeader{}, 0, err
} }
if height != maxBlockHeight { if height != maxBlockHeight {
str := "max block height inconsistent" return wire.BlockHeader{}, 0,
return wire.BlockHeader{}, 0, log.Error(str) fmt.Errorf("max block height inconsistent")
} }
return header, height, nil return header, height, nil
} }
@ -341,47 +334,47 @@ func createSPVNS(namespace walletdb.Namespace, params *chaincfg.Params) error {
rootBucket := tx.RootBucket() rootBucket := tx.RootBucket()
spvBucket, err := rootBucket.CreateBucketIfNotExists(spvBucketName) spvBucket, err := rootBucket.CreateBucketIfNotExists(spvBucketName)
if err != nil { if err != nil {
str := "failed to create main bucket: %v" return fmt.Errorf("failed to create main bucket: %s",
return log.Errorf(str, err) err)
} }
_, err = spvBucket.CreateBucketIfNotExists(blockHeaderBucketName) _, err = spvBucket.CreateBucketIfNotExists(blockHeaderBucketName)
if err != nil { if err != nil {
str := "failed to create block header bucket: %v" return fmt.Errorf("failed to create block header "+
return log.Errorf(str, err) "bucket: %s", err)
} }
_, err = spvBucket.CreateBucketIfNotExists(basicFilterBucketName) _, err = spvBucket.CreateBucketIfNotExists(basicFilterBucketName)
if err != nil { if err != nil {
str := "failed to create basic filter bucket: %v" return fmt.Errorf("failed to create basic filter "+
return log.Errorf(str, err) "bucket: %s", err)
} }
_, err = spvBucket.CreateBucketIfNotExists(basicHeaderBucketName) _, err = spvBucket.CreateBucketIfNotExists(basicHeaderBucketName)
if err != nil { if err != nil {
str := "failed to create basic header bucket: %v" return fmt.Errorf("failed to create basic header "+
return log.Errorf(str, err) "bucket: %s", err)
} }
_, err = spvBucket.CreateBucketIfNotExists(extFilterBucketName) _, err = spvBucket.CreateBucketIfNotExists(extFilterBucketName)
if err != nil { if err != nil {
str := "failed to create extended filter bucket: %v" return fmt.Errorf("failed to create extended filter "+
return log.Errorf(str, err) "bucket: %s", err)
} }
_, err = spvBucket.CreateBucketIfNotExists(extHeaderBucketName) _, err = spvBucket.CreateBucketIfNotExists(extHeaderBucketName)
if err != nil { if err != nil {
str := "failed to create extended header bucket: %v" return fmt.Errorf("failed to create extended header "+
return log.Errorf(str, err) "bucket: %s", err)
} }
createDate := spvBucket.Get(dbCreateDateName) createDate := spvBucket.Get(dbCreateDateName)
if createDate != nil { if createDate != nil {
log.Infof("Wallet SPV namespace already created.") log.Info("Wallet SPV namespace already created.")
return nil return nil
} }
log.Infof("Creating wallet SPV namespace.") log.Info("Creating wallet SPV namespace.")
basicFilter, err := buildBasicFilter(params.GenesisBlock) basicFilter, err := buildBasicFilter(params.GenesisBlock)
if err != nil { if err != nil {
@ -437,15 +430,14 @@ func createSPVNS(namespace walletdb.Namespace, params *chaincfg.Params) error {
err = spvBucket.Put(dbCreateDateName, err = spvBucket.Put(dbCreateDateName,
uint64ToBytes(uint64(time.Now().Unix()))) uint64ToBytes(uint64(time.Now().Unix())))
if err != nil { if err != nil {
str := "failed to store database creation time: %v" return fmt.Errorf("failed to store database creation "+
return log.Errorf(str, err) "time: %s", err)
} }
return nil return nil
}) })
if err != nil { if err != nil {
str := "failed to update database: %v" return fmt.Errorf("failed to update database: %s", err)
return log.Errorf(str, err)
} }
return nil return nil


@ -250,7 +250,7 @@ func (sp *serverPeer) OnVersion(_ *peer.Peer, msg *wire.MsgVersion) {
// OnBlock is invoked when a peer receives a block bitcoin message. It // OnBlock is invoked when a peer receives a block bitcoin message. It
// blocks until the bitcoin block has been fully processed. // blocks until the bitcoin block has been fully processed.
func (sp *serverPeer) OnBlock(_ *peer.Peer, msg *wire.MsgBlock, buf []byte) { func (sp *serverPeer) OnBlock(_ *peer.Peer, msg *wire.MsgBlock, buf []byte) {
log.Tracef("got block %v", msg.BlockHash()) log.Tracef("got block %s", msg.BlockHash())
// Convert the raw MsgBlock to a btcutil.Block which provides some // Convert the raw MsgBlock to a btcutil.Block which provides some
// convenience methods and things such as hash caching. // convenience methods and things such as hash caching.
block := btcutil.NewBlockFromBlockAndBytes(msg, buf) block := btcutil.NewBlockFromBlockAndBytes(msg, buf)
@ -283,7 +283,7 @@ func (sp *serverPeer) OnInv(p *peer.Peer, msg *wire.MsgInv) {
newInv := wire.NewMsgInvSizeHint(uint(len(msg.InvList))) newInv := wire.NewMsgInvSizeHint(uint(len(msg.InvList)))
for _, invVect := range msg.InvList { for _, invVect := range msg.InvList {
if invVect.Type == wire.InvTypeTx { if invVect.Type == wire.InvTypeTx {
log.Tracef("Ignoring tx %v in inv from %v -- "+ log.Tracef("Ignoring tx %s in inv from %v -- "+
"SPV mode", invVect.Hash, sp) "SPV mode", invVect.Hash, sp)
if sp.ProtocolVersion() >= wire.BIP0037Version { if sp.ProtocolVersion() >= wire.BIP0037Version {
log.Infof("Peer %v is announcing "+ log.Infof("Peer %v is announcing "+
@ -295,7 +295,7 @@ func (sp *serverPeer) OnInv(p *peer.Peer, msg *wire.MsgInv) {
} }
err := newInv.AddInvVect(invVect) err := newInv.AddInvVect(invVect)
if err != nil { if err != nil {
log.Errorf("Failed to add inventory vector: %v", err) log.Errorf("Failed to add inventory vector: %s", err)
break break
} }
} }
@ -882,7 +882,7 @@ func (s *ChainService) handleAddPeerMsg(state *peerState, sp *serverPeer) bool {
// Disconnect banned peers. // Disconnect banned peers.
host, _, err := net.SplitHostPort(sp.Addr()) host, _, err := net.SplitHostPort(sp.Addr())
if err != nil { if err != nil {
log.Debugf("can't split hostport %v", err) log.Debugf("can't split host/port: %s", err)
sp.Disconnect() sp.Disconnect()
return false return false
} }
@ -962,7 +962,7 @@ func (s *ChainService) handleDonePeerMsg(state *peerState, sp *serverPeer) {
func (s *ChainService) handleBanPeerMsg(state *peerState, sp *serverPeer) { func (s *ChainService) handleBanPeerMsg(state *peerState, sp *serverPeer) {
host, _, err := net.SplitHostPort(sp.Addr()) host, _, err := net.SplitHostPort(sp.Addr())
if err != nil { if err != nil {
log.Debugf("can't split ban peer %s %v", sp.Addr(), err) log.Debugf("can't split ban peer %s: %s", sp.Addr(), err)
return return
} }
log.Infof("Banned peer %s for %v", host, BanDuration) log.Infof("Banned peer %s for %v", host, BanDuration)
@ -1096,7 +1096,7 @@ func (s *ChainService) outboundPeerConnected(c *connmgr.ConnReq, conn net.Conn)
sp := newServerPeer(s, c.Permanent) sp := newServerPeer(s, c.Permanent)
p, err := peer.NewOutboundPeer(newPeerConfig(sp), c.Addr.String()) p, err := peer.NewOutboundPeer(newPeerConfig(sp), c.Addr.String())
if err != nil { if err != nil {
log.Debugf("Cannot create outbound peer %s: %v", c.Addr, err) log.Debugf("Cannot create outbound peer %s: %s", c.Addr, err)
s.connManager.Disconnect(c.ID()) s.connManager.Disconnect(c.ID())
} }
sp.Peer = p sp.Peer = p


@ -7,11 +7,9 @@ import (
"testing" "testing"
"time" "time"
"github.com/aakselrod/btctestlog"
"github.com/btcsuite/btcd/chaincfg" "github.com/btcsuite/btcd/chaincfg"
"github.com/btcsuite/btcd/rpctest" "github.com/btcsuite/btcd/rpctest"
"github.com/btcsuite/btcd/wire" "github.com/btcsuite/btcd/wire"
"github.com/btcsuite/btclog"
"github.com/btcsuite/btcwallet/spvsvc/spvchain" "github.com/btcsuite/btcwallet/spvsvc/spvchain"
"github.com/btcsuite/btcwallet/waddrmgr" "github.com/btcsuite/btcwallet/waddrmgr"
"github.com/btcsuite/btcwallet/walletdb" "github.com/btcsuite/btcwallet/walletdb"
@ -22,58 +20,58 @@ func TestSetup(t *testing.T) {
// Create a btcd SimNet node and generate 500 blocks // Create a btcd SimNet node and generate 500 blocks
h1, err := rpctest.New(&chaincfg.SimNetParams, nil, nil) h1, err := rpctest.New(&chaincfg.SimNetParams, nil, nil)
if err != nil { if err != nil {
t.Fatalf("Couldn't create harness: %v", err) t.Fatalf("Couldn't create harness: %s", err)
} }
defer h1.TearDown() defer h1.TearDown()
err = h1.SetUp(false, 0) err = h1.SetUp(false, 0)
if err != nil { if err != nil {
t.Fatalf("Couldn't set up harness: %v", err) t.Fatalf("Couldn't set up harness: %s", err)
} }
_, err = h1.Node.Generate(500) _, err = h1.Node.Generate(500)
if err != nil { if err != nil {
t.Fatalf("Couldn't generate blocks: %v", err) t.Fatalf("Couldn't generate blocks: %s", err)
} }
// Create a second btcd SimNet node // Create a second btcd SimNet node
h2, err := rpctest.New(&chaincfg.SimNetParams, nil, nil) h2, err := rpctest.New(&chaincfg.SimNetParams, nil, nil)
if err != nil { if err != nil {
t.Fatalf("Couldn't create harness: %v", err) t.Fatalf("Couldn't create harness: %s", err)
} }
defer h2.TearDown() defer h2.TearDown()
err = h2.SetUp(false, 0) err = h2.SetUp(false, 0)
if err != nil { if err != nil {
t.Fatalf("Couldn't set up harness: %v", err) t.Fatalf("Couldn't set up harness: %s", err)
} }
// Create a third btcd SimNet node and generate 900 blocks // Create a third btcd SimNet node and generate 900 blocks
h3, err := rpctest.New(&chaincfg.SimNetParams, nil, nil) h3, err := rpctest.New(&chaincfg.SimNetParams, nil, nil)
if err != nil { if err != nil {
t.Fatalf("Couldn't create harness: %v", err) t.Fatalf("Couldn't create harness: %s", err)
} }
defer h3.TearDown() defer h3.TearDown()
err = h3.SetUp(false, 0) err = h3.SetUp(false, 0)
if err != nil { if err != nil {
t.Fatalf("Couldn't set up harness: %v", err) t.Fatalf("Couldn't set up harness: %s", err)
} }
_, err = h3.Node.Generate(900) _, err = h3.Node.Generate(900)
if err != nil { if err != nil {
t.Fatalf("Couldn't generate blocks: %v", err) t.Fatalf("Couldn't generate blocks: %s", err)
} }
// Connect, sync, and disconnect h1 and h2 // Connect, sync, and disconnect h1 and h2
err = csd([]*rpctest.Harness{h1, h2}) err = csd([]*rpctest.Harness{h1, h2})
if err != nil { if err != nil {
t.Fatalf("Couldn't connect/sync/disconnect h1 and h2: %v", err) t.Fatalf("Couldn't connect/sync/disconnect h1 and h2: %s", err)
} }
// Generate 300 blocks on the first node and 350 on the second // Generate 300 blocks on the first node and 350 on the second
_, err = h1.Node.Generate(300) _, err = h1.Node.Generate(300)
if err != nil { if err != nil {
t.Fatalf("Couldn't generate blocks: %v", err) t.Fatalf("Couldn't generate blocks: %s", err)
} }
_, err = h2.Node.Generate(350) _, err = h2.Node.Generate(350)
if err != nil { if err != nil {
t.Fatalf("Couldn't generate blocks: %v", err) t.Fatalf("Couldn't generate blocks: %s", err)
} }
// Now we have a node with 800 blocks (h1), 850 blocks (h2), and // Now we have a node with 800 blocks (h1), 850 blocks (h2), and
@ -92,7 +90,7 @@ func TestSetup(t *testing.T) {
for _, height := range []int64{111, 333, 555, 777} { for _, height := range []int64{111, 333, 555, 777} {
hash, err := h1.Node.GetBlockHash(height) hash, err := h1.Node.GetBlockHash(height)
if err != nil { if err != nil {
t.Fatalf("Couldn't get block hash for height %v: %v", t.Fatalf("Couldn't get block hash for height %d: %s",
height, err) height, err)
} }
modParams.Checkpoints = append(modParams.Checkpoints, modParams.Checkpoints = append(modParams.Checkpoints,
@ -106,17 +104,17 @@ func TestSetup(t *testing.T) {
// SPV chain namespace, and create a configuration for the ChainService. // SPV chain namespace, and create a configuration for the ChainService.
tempDir, err := ioutil.TempDir("", "spvchain") tempDir, err := ioutil.TempDir("", "spvchain")
if err != nil { if err != nil {
t.Fatalf("Failed to create temporary directory: %v", err) t.Fatalf("Failed to create temporary directory: %s", err)
} }
defer os.RemoveAll(tempDir) defer os.RemoveAll(tempDir)
db, err := walletdb.Create("bdb", tempDir+"/weks.db") db, err := walletdb.Create("bdb", tempDir+"/weks.db")
defer db.Close() defer db.Close()
if err != nil { if err != nil {
t.Fatalf("Error opening DB: %v\n", err) t.Fatalf("Error opening DB: %s\n", err)
} }
ns, err := db.Namespace([]byte("weks")) ns, err := db.Namespace([]byte("weks"))
if err != nil { if err != nil {
t.Fatalf("Error geting namespace: %v\n", err) t.Fatalf("Error geting namespace: %s\n", err)
} }
config := spvchain.Config{ config := spvchain.Config{
DataDir: tempDir, DataDir: tempDir,
@ -132,16 +130,16 @@ func TestSetup(t *testing.T) {
spvchain.Services = 0 spvchain.Services = 0
spvchain.MaxPeers = 3 spvchain.MaxPeers = 3
spvchain.RequiredServices = wire.SFNodeNetwork spvchain.RequiredServices = wire.SFNodeNetwork
logger, err := btctestlog.NewTestLogger(t) /*logger, err := btctestlog.NewTestLogger(t)
if err != nil { if err != nil {
t.Fatalf("Could not set up logger: %v", err) t.Fatalf("Could not set up logger: %s", err)
} }
chainLogger := btclog.NewSubsystemLogger(logger, "CHAIN: ") chainLogger := btclog.NewSubsystemLogger(logger, "CHAIN: ")
chainLogger.SetLevel(btclog.TraceLvl) chainLogger.SetLevel(btclog.InfoLvl)
spvchain.UseLogger(chainLogger) //*/ spvchain.UseLogger(chainLogger) //*/
svc, err := spvchain.NewChainService(config) svc, err := spvchain.NewChainService(config)
if err != nil { if err != nil {
t.Fatalf("Error creating ChainService: %v", err) t.Fatalf("Error creating ChainService: %s", err)
} }
svc.Start() svc.Start()
defer svc.Stop() defer svc.Stop()
@ -149,7 +147,7 @@ func TestSetup(t *testing.T) {
// Make sure the client synchronizes with the correct node // Make sure the client synchronizes with the correct node
err = waitForSync(t, svc, h1, time.Second, 30*time.Second) err = waitForSync(t, svc, h1, time.Second, 30*time.Second)
if err != nil { if err != nil {
t.Fatalf("Couldn't sync ChainService: %v", err) t.Fatalf("Couldn't sync ChainService: %s", err)
} }
// Generate 125 blocks on h1 to make sure it reorgs the other nodes. // Generate 125 blocks on h1 to make sure it reorgs the other nodes.
@ -157,13 +155,13 @@ func TestSetup(t *testing.T) {
h1.Node.Generate(125) h1.Node.Generate(125)
err = waitForSync(t, svc, h1, time.Second, 30*time.Second) err = waitForSync(t, svc, h1, time.Second, 30*time.Second)
if err != nil { if err != nil {
t.Fatalf("Couldn't sync ChainService: %v", err) t.Fatalf("Couldn't sync ChainService: %s", err)
} }
// Connect/sync/disconnect h2 to make it reorg to the h1 chain. // Connect/sync/disconnect h2 to make it reorg to the h1 chain.
err = csd([]*rpctest.Harness{h1, h2}) err = csd([]*rpctest.Harness{h1, h2})
if err != nil { if err != nil {
t.Fatalf("Couldn't sync h2 to h1: %v", err) t.Fatalf("Couldn't sync h2 to h1: %s", err)
} }
// Generate 3 blocks on h1, one at a time, to make sure the // Generate 3 blocks on h1, one at a time, to make sure the
@ -172,17 +170,24 @@ func TestSetup(t *testing.T) {
h1.Node.Generate(1) h1.Node.Generate(1)
err = waitForSync(t, svc, h1, time.Second, 30*time.Second) err = waitForSync(t, svc, h1, time.Second, 30*time.Second)
if err != nil { if err != nil {
t.Fatalf("Couldn't sync ChainService: %v", err) t.Fatalf("Couldn't sync ChainService: %s", err)
} }
} }
// Generate 5 blocks on h2. // Generate 5 blocks on h2 and wait for ChainService to sync to the
// newly-best chain on h2.
h2.Node.Generate(5) h2.Node.Generate(5)
// Wait for ChainService to sync to the newly-best chain on h12
err = waitForSync(t, svc, h2, time.Second, 30*time.Second) err = waitForSync(t, svc, h2, time.Second, 30*time.Second)
if err != nil { if err != nil {
t.Fatalf("Couldn't sync ChainService: %v", err) t.Fatalf("Couldn't sync ChainService: %s", err)
}
// Generate 7 blocks on h1 and wait for ChainService to sync to the
// newly-best chain on h1.
h1.Node.Generate(7)
err = waitForSync(t, svc, h1, time.Second, 30*time.Second)
if err != nil {
t.Fatalf("Couldn't sync ChainService: %s", err)
} }
} }
@ -218,7 +223,7 @@ func waitForSync(t *testing.T, svc *spvchain.ChainService,
if err != nil { if err != nil {
return err return err
} }
t.Logf("Syncing to %v (%v)", knownBestHeight, knownBestHash) //t.Logf("Syncing to %d (%s)", knownBestHeight, knownBestHash)
var haveBest *waddrmgr.BlockStamp var haveBest *waddrmgr.BlockStamp
haveBest, err = svc.BestSnapshot() haveBest, err = svc.BestSnapshot()
if err != nil { if err != nil {
@ -237,9 +242,9 @@ func waitForSync(t *testing.T, svc *spvchain.ChainService,
haveBest, err = svc.BestSnapshot() haveBest, err = svc.BestSnapshot()
if err != nil { if err != nil {
return fmt.Errorf("Couldn't get best snapshot from "+ return fmt.Errorf("Couldn't get best snapshot from "+
"ChainService: %v", err) "ChainService: %s", err)
} }
t.Logf("Synced to %v (%v)", haveBest.Height, haveBest.Hash) //t.Logf("Synced to %d (%s)", haveBest.Height, haveBest.Hash)
} }
// Check if we're current // Check if we're current
if !svc.IsCurrent() { if !svc.IsCurrent() {