More updates on checkpoint, reorg, and sync handling and tests.

Alex 2017-03-24 18:11:50 -06:00 committed by Olaoluwa Osuntokun
parent 6ea7e6035d
commit b7c5bcbf45
4 changed files with 350 additions and 11 deletions


@@ -387,11 +387,11 @@ func (b *blockManager) findPreviousHeaderCheckpoint(height int32) *chaincfg.Chec
// Find the latest checkpoint lower than height or return genesis block
// if there are none.
checkpoints := b.server.chainParams.Checkpoints
for _, ckpt := range checkpoints {
if height <= ckpt.Height {
for i := 0; i < len(checkpoints); i++ {
if height <= checkpoints[i].Height {
break
}
prevCheckpoint = &ckpt
prevCheckpoint = &checkpoints[i]
}
return prevCheckpoint
}
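// Editor's note on the loop rewrite above: under the pre-Go 1.22 range
// semantics in effect when this commit was written, `ckpt` is a single
// reused loop variable, so the old `prevCheckpoint = &ckpt` left
// prevCheckpoint pointing at whatever that variable held when the loop
// exited -- the checkpoint the loop broke on, not the one before it.
// Indexing keeps a pointer to the actual slice element. A minimal
// illustration (hypothetical heights; chaincfg as imported in this file):
checkpoints := []chaincfg.Checkpoint{{Height: 100}, {Height: 200}, {Height: 300}}
height := int32(250)
var byRange, byIndex *chaincfg.Checkpoint
for _, ckpt := range checkpoints {
	if height <= ckpt.Height {
		break
	}
	byRange = &ckpt // aliases the shared loop variable
}
for i := 0; i < len(checkpoints); i++ {
	if height <= checkpoints[i].Height {
		break
	}
	byIndex = &checkpoints[i] // points at the real slice element
}
// byRange.Height is 300 here (the loop variable's final value), while
// byIndex.Height is 200, the checkpoint actually below `height`.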
@@ -716,6 +716,55 @@ func (b *blockManager) handleHeadersMsg(hmsg *headersMsg) {
b.startHeader = e
}
} else {
// The block doesn't connect to the last block we know.
// We will need to do some additional checks to process
// possible reorganizations or an incorrect chain on
// either our side or the peer's.
// If we got these headers from a peer that's not our
// sync peer, they might not be aligned correctly or
// even on the right chain. Just ignore the rest of the
// message.
if hmsg.peer != b.syncPeer {
return
}
// Check if this block is known. If so, we continue to
// the next one.
_, _, err := b.server.GetBlockByHash(
blockHeader.BlockHash())
if err == nil {
continue
}
// Check if the previous block is known. If it is, this
// is probably a reorg based on the estimated latest
// block that matches between us and the sync peer as
// derived from the block locator we sent to request
// these headers. Otherwise, the headers don't connect
// to anything we know and we should disconnect the
// peer.
_, backHeight, err := b.server.GetBlockByHash(
blockHeader.PrevBlock)
if err != nil {
log.Errorf("Couldn't get block by hash from "+
"the database (%v) -- disconnecting "+
"peer %s", err, hmsg.peer.Addr())
hmsg.peer.Disconnect()
return
}
// We've found a branch we weren't aware of. If the
// branch is earlier than the latest synchronized
// checkpoint, it's invalid and we need to disconnect
// the reporting peer.
prevCheckpoint := b.findPreviousHeaderCheckpoint(
prevNode.height)
if backHeight < uint32(prevCheckpoint.Height) {
log.Errorf("Attempt at a reorg earlier (%v) than a "+
"checkpoint (%v) past which we've already "+
"synchronized -- disconnecting peer "+
"%s", backHeight, prevCheckpoint.Height, hmsg.peer.Addr())
hmsg.peer.Disconnect()
return
}
// TODO: Add real reorg handling here
log.Warnf("Received block header that does not "+
"properly connect to the chain from peer %s "+
"-- disconnecting", hmsg.peer.Addr())
@@ -743,7 +792,7 @@ func (b *blockManager) handleHeadersMsg(hmsg *headersMsg) {
"checkpoint at height %d/hash %s",
prevCheckpoint.Height,
prevCheckpoint.Hash)
b.server.putMaxBlockHeight(uint32(
b.server.rollbackToHeight(uint32(
prevCheckpoint.Height))
hmsg.peer.Disconnect()
return
@@ -762,9 +811,9 @@ func (b *blockManager) handleHeadersMsg(hmsg *headersMsg) {
// the next header links properly, it must be removed before
// fetching the blocks.
b.headerList.Remove(b.headerList.Front())
log.Infof("Received %v block headers: Fetching blocks",
b.headerList.Len())
b.progressLogger.SetLastLogTime(time.Now())
//log.Infof("Received %v block headers: Fetching blocks",
// b.headerList.Len())
//b.progressLogger.SetLastLogTime(time.Now())
b.nextCheckpoint = b.findNextHeaderCheckpoint(finalHeight)
//b.fetchHeaderBlocks()
//return
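// A possible shape for the "real reorg handling" TODO left in
// handleHeadersMsg above -- an editor's illustrative sketch only, not
// part of this commit: unwind the header store to the fork point the
// peer's headers build on (backHeight, obtained from GetBlockByHash on
// PrevBlock), then reconnect the replacement headers. It uses
// rollbackToHeight, which this commit adds to ChainService; the
// in-memory headerList would also need rewinding, omitted here.
_, err = b.server.rollbackToHeight(backHeight)
if err != nil {
	log.Errorf("Rollback to fork point %d failed: %v -- "+
		"disconnecting peer %s", backHeight, err, hmsg.peer.Addr())
	hmsg.peer.Disconnect()
	return
}
// The headers past the fork point would then be validated and written
// to the store in place of the rolled-back branch.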


@@ -191,6 +191,29 @@ func putExtHeader(tx walletdb.Tx, blockHash chainhash.Hash,
return putHeader(tx, blockHash, extHeaderBucketName, filterTip)
}
// rollbackLastBlock rolls back the last known block and returns the BlockStamp
// representing the new last known block.
func rollbackLastBlock(tx walletdb.Tx) (*waddrmgr.BlockStamp, error) {
bs, err := SyncedTo(tx)
if err != nil {
return nil, err
}
bucket := tx.RootBucket().Bucket(spvBucketName).Bucket(blockHeaderBucketName)
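// The header bucket keys each block both by hash and by height, so both
// entries for the rolled-back block are removed below.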
err = bucket.Delete(bs.Hash[:])
if err != nil {
return nil, err
}
err = bucket.Delete(uint32ToBytes(uint32(bs.Height)))
if err != nil {
return nil, err
}
err = putMaxBlockHeight(tx, uint32(bs.Height-1))
if err != nil {
return nil, err
}
return SyncedTo(tx)
}
// GetBlockByHash retrieves the block header, filter, and filter tip, based on
// the provided block hash, from the database.
func GetBlockByHash(tx walletdb.Tx, blockHash chainhash.Hash) (wire.BlockHeader,


@@ -278,8 +278,8 @@ func (sp *serverPeer) OnBlock(_ *peer.Peer, msg *wire.MsgBlock, buf []byte) {
// used to examine the inventory being advertised by the remote peer and react
// accordingly. We pass the message down to blockmanager which will call
// QueueMessage with any appropriate responses.
func (sp *serverPeer) OnInv(_ *peer.Peer, msg *wire.MsgInv) {
log.Tracef("Got inv with %v items", len(msg.InvList))
func (sp *serverPeer) OnInv(p *peer.Peer, msg *wire.MsgInv) {
log.Tracef("Got inv with %d items from %s", len(msg.InvList), p.Addr())
newInv := wire.NewMsgInvSizeHint(uint(len(msg.InvList)))
for _, invVect := range msg.InvList {
if invVect.Type == wire.InvTypeTx {
@@ -307,8 +307,9 @@ func (sp *serverPeer) OnInv(_ *peer.Peer, msg *wire.MsgInv) {
// OnHeaders is invoked when a peer receives a headers bitcoin
// message. The message is passed down to the block manager.
func (sp *serverPeer) OnHeaders(_ *peer.Peer, msg *wire.MsgHeaders) {
log.Tracef("Got headers with %v items", len(msg.Headers))
func (sp *serverPeer) OnHeaders(p *peer.Peer, msg *wire.MsgHeaders) {
log.Tracef("Got headers with %d items from %s", len(msg.Headers),
p.Addr())
sp.server.blockManager.QueueHeaders(msg, sp)
}
@@ -1371,3 +1372,38 @@ func (s *ChainService) putMaxBlockHeight(maxBlockHeight uint32) error {
return putMaxBlockHeight(dbTx, maxBlockHeight)
})
}
func (s *ChainService) rollbackLastBlock() (*waddrmgr.BlockStamp, error) {
var bs *waddrmgr.BlockStamp
var err error
err = s.namespace.Update(func(dbTx walletdb.Tx) error {
bs, err = rollbackLastBlock(dbTx)
return err
})
return bs, err
}
func (s *ChainService) rollbackToHeight(height uint32) (*waddrmgr.BlockStamp, error) {
var bs *waddrmgr.BlockStamp
var err error
err = s.namespace.Update(func(dbTx walletdb.Tx) error {
bs, err = SyncedTo(dbTx)
if err != nil {
return err
}
for uint32(bs.Height) > height {
bs, err = rollbackLastBlock(dbTx)
if err != nil {
return err
}
}
return nil
})
return bs, err
}
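// Illustrative usage of rollbackToHeight (an editor's sketch, not code
// from this commit), mirroring how the block manager rolls back past an
// invalid branch: `s` is a *ChainService and `checkpoint` is assumed to
// be the last trusted chaincfg.Checkpoint.
bs, err := s.rollbackToHeight(uint32(checkpoint.Height))
if err != nil {
	return err
}
log.Infof("Rolled back to height %d (hash %s)", bs.Height, bs.Hash)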
// IsCurrent lets the caller know whether the chain service's block manager
// thinks its view of the network is current.
func (s *ChainService) IsCurrent() bool {
return s.blockManager.IsCurrent()
}


@@ -0,0 +1,231 @@
package spvchain_test
import (
"fmt"
"io/ioutil"
"os"
"testing"
"time"
"github.com/aakselrod/btctestlog"
"github.com/btcsuite/btcd/chaincfg"
"github.com/btcsuite/btcd/rpctest"
"github.com/btcsuite/btcd/wire"
"github.com/btcsuite/btclog"
"github.com/btcsuite/btcwallet/spvsvc/spvchain"
"github.com/btcsuite/btcwallet/waddrmgr"
"github.com/btcsuite/btcwallet/walletdb"
_ "github.com/btcsuite/btcwallet/walletdb/bdb"
)
func TestSetup(t *testing.T) {
// Create a btcd SimNet node and generate 500 blocks
h1, err := rpctest.New(&chaincfg.SimNetParams, nil, nil)
if err != nil {
t.Fatalf("Couldn't create harness: %v", err)
}
defer h1.TearDown()
err = h1.SetUp(false, 0)
if err != nil {
t.Fatalf("Couldn't set up harness: %v", err)
}
_, err = h1.Node.Generate(500)
if err != nil {
t.Fatalf("Couldn't generate blocks: %v", err)
}
// Create a second btcd SimNet node
h2, err := rpctest.New(&chaincfg.SimNetParams, nil, nil)
if err != nil {
t.Fatalf("Couldn't create harness: %v", err)
}
defer h2.TearDown()
err = h2.SetUp(false, 0)
if err != nil {
t.Fatalf("Couldn't set up harness: %v", err)
}
// Create a third btcd SimNet node and generate 900 blocks
h3, err := rpctest.New(&chaincfg.SimNetParams, nil, nil)
if err != nil {
t.Fatalf("Couldn't create harness: %v", err)
}
defer h3.TearDown()
err = h3.SetUp(false, 0)
if err != nil {
t.Fatalf("Couldn't set up harness: %v", err)
}
_, err = h3.Node.Generate(900)
if err != nil {
t.Fatalf("Couldn't generate blocks: %v", err)
}
// Connect, sync, and disconnect h1 and h2
err = csd([]*rpctest.Harness{h1, h2})
if err != nil {
t.Fatalf("Couldn't connect/sync/disconnect h1 and h2: %v", err)
}
// Generate 300 blocks on the first node and 350 on the second
_, err = h1.Node.Generate(300)
if err != nil {
t.Fatalf("Couldn't generate blocks: %v", err)
}
_, err = h2.Node.Generate(350)
if err != nil {
t.Fatalf("Couldn't generate blocks: %v", err)
}
// Now we have a node with 800 blocks (h1), 850 blocks (h2), and
// 900 blocks (h3). The chains of nodes h1 and h2 match up to block
// 500. By default, a synchronizing wallet connected to all three
// should synchronize to h3. However, we're going to take checkpoints
// from h1 at 111, 333, 555, and 777, and add those to the
// synchronizing wallet's chain parameters so that it should
// disconnect from h3 at block 111, and from h2 at block 555, and
// then synchronize to block 800 from h1. Order of connection is
// unfortunately not guaranteed, so the reorg may not happen with every
// test.
// Copy parameters and insert checkpoints
modParams := chaincfg.SimNetParams
for _, height := range []int64{111, 333, 555, 777} {
hash, err := h1.Node.GetBlockHash(height)
if err != nil {
t.Fatalf("Couldn't get block hash for height %v: %v",
height, err)
}
modParams.Checkpoints = append(modParams.Checkpoints,
chaincfg.Checkpoint{
Hash: hash,
Height: int32(height),
})
}
// Create a temporary directory, initialize an empty walletdb with an
// SPV chain namespace, and create a configuration for the ChainService.
tempDir, err := ioutil.TempDir("", "spvchain")
if err != nil {
t.Fatalf("Failed to create temporary directory: %v", err)
}
defer os.RemoveAll(tempDir)
db, err := walletdb.Create("bdb", tempDir+"/weks.db")
if err != nil {
t.Fatalf("Error opening DB: %v\n", err)
}
defer db.Close()
ns, err := db.Namespace([]byte("weks"))
if err != nil {
t.Fatalf("Error getting namespace: %v\n", err)
}
config := spvchain.Config{
DataDir: tempDir,
Namespace: ns,
ChainParams: modParams,
AddPeers: []string{
h3.P2PAddress(),
h2.P2PAddress(),
h1.P2PAddress(),
},
}
spvchain.Services = 0
spvchain.MaxPeers = 3
spvchain.RequiredServices = wire.SFNodeNetwork
logger, err := btctestlog.NewTestLogger(t)
if err != nil {
t.Fatalf("Could not set up logger: %v", err)
}
chainLogger := btclog.NewSubsystemLogger(logger, "CHAIN: ")
chainLogger.SetLevel(btclog.TraceLvl)
spvchain.UseLogger(chainLogger) //*/
svc, err := spvchain.NewChainService(config)
if err != nil {
t.Fatalf("Error creating ChainService: %v", err)
}
svc.Start()
defer svc.Stop()
// Make sure the client synchronizes with the correct node
err = waitForSync(t, svc, h1, time.Second, 30*time.Second)
if err != nil {
t.Fatalf("Couldn't sync ChainService: %v", err)
}
// Generate 150 blocks on h1 to make sure it reorgs the other nodes.
// Ensure the ChainService instance stays caught up.
_, err = h1.Node.Generate(150)
if err != nil {
t.Fatalf("Couldn't generate blocks: %v", err)
}
err = waitForSync(t, svc, h1, time.Second, 30*time.Second)
if err != nil {
t.Fatalf("Couldn't sync ChainService: %v", err)
}
// Connect/sync/disconnect the other nodes to make them reorg to the h1
// chain.
err = csd([]*rpctest.Harness{h1, h2, h3})
if err != nil {
t.Fatalf("Couldn't sync h2 and h3 to h1: %v", err)
}
}
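// To run this test on its own (editor's note; assumes the package lives
// under spvsvc/spvchain as its import path suggests, and that a btcd
// binary is available for rpctest to launch the SimNet harnesses):
//
//	go test -v -run TestSetup ./spvsvc/spvchain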
// csd does a connect-sync-disconnect between nodes in order to support
// reorg testing. It brings up and tears down a temporary node, since
// otherwise the nodes try to reconnect to each other, which results in
// unintended reorgs.
func csd(harnesses []*rpctest.Harness) error {
hTemp, err := rpctest.New(&chaincfg.SimNetParams, nil, nil)
if err != nil {
return err
}
// Tear down node at the end of the function.
defer hTemp.TearDown()
err = hTemp.SetUp(false, 0)
if err != nil {
return err
}
for _, harness := range harnesses {
err = rpctest.ConnectNode(hTemp, harness)
if err != nil {
return err
}
}
return rpctest.JoinNodes(harnesses, rpctest.Blocks)
}
// waitForSync waits for the ChainService to sync to the current chain state.
func waitForSync(t *testing.T, svc *spvchain.ChainService,
correctSyncNode *rpctest.Harness, checkInterval,
timeout time.Duration) error {
knownBestHash, knownBestHeight, err :=
correctSyncNode.Node.GetBestBlock()
if err != nil {
return err
}
t.Logf("Syncing to %v (%v)", knownBestHeight, knownBestHash)
var haveBest *waddrmgr.BlockStamp
haveBest, err = svc.BestSnapshot()
if err != nil {
return err
}
var total time.Duration
for haveBest.Hash != *knownBestHash {
if total > timeout {
return fmt.Errorf("Timed out after %v.", timeout)
}
if haveBest.Height > knownBestHeight {
return fmt.Errorf("Synchronized to the wrong chain.")
}
time.Sleep(checkInterval)
total += checkInterval
haveBest, err = svc.BestSnapshot()
if err != nil {
return fmt.Errorf("Couldn't get best snapshot from "+
"ChainService: %v", err)
}
t.Logf("Synced to %v (%v)", haveBest.Height, haveBest.Hash)
}
// Check if we're current
if !svc.IsCurrent() {
return fmt.Errorf("ChainService doesn't see itself as current!")
}
return nil
}