[lbry] claimtrie: support replay of chain changes

This commit is contained in:
Roy Lee 2021-07-10 16:21:20 -07:00
parent 3c85e6e56a
commit 328705f579
5 changed files with 47 additions and 17 deletions

View file

@ -1814,7 +1814,7 @@ func New(config *Config) (*BlockChain, error) {
return nil, err return nil, err
} }
ct, err := claimtrie.New() ct, err := claimtrie.New(true)
if err != nil { if err != nil {
return nil, err return nil, err
} }

View file

@ -6,9 +6,10 @@ import (
"runtime" "runtime"
"sort" "sort"
"github.com/btcsuite/btcd/chaincfg/chainhash"
"github.com/btcsuite/btcd/claimtrie/block" "github.com/btcsuite/btcd/claimtrie/block"
"github.com/btcsuite/btcd/claimtrie/block/blockrepo" "github.com/btcsuite/btcd/claimtrie/block/blockrepo"
"github.com/btcsuite/btcd/claimtrie/chain"
"github.com/btcsuite/btcd/claimtrie/chain/chainrepo"
"github.com/btcsuite/btcd/claimtrie/change" "github.com/btcsuite/btcd/claimtrie/change"
"github.com/btcsuite/btcd/claimtrie/config" "github.com/btcsuite/btcd/claimtrie/config"
"github.com/btcsuite/btcd/claimtrie/merkletrie" "github.com/btcsuite/btcd/claimtrie/merkletrie"
@ -18,6 +19,8 @@ import (
"github.com/btcsuite/btcd/claimtrie/param" "github.com/btcsuite/btcd/claimtrie/param"
"github.com/btcsuite/btcd/claimtrie/temporal" "github.com/btcsuite/btcd/claimtrie/temporal"
"github.com/btcsuite/btcd/claimtrie/temporal/temporalrepo" "github.com/btcsuite/btcd/claimtrie/temporal/temporalrepo"
"github.com/btcsuite/btcd/chaincfg/chainhash"
"github.com/btcsuite/btcd/wire" "github.com/btcsuite/btcd/wire"
) )
@ -27,6 +30,9 @@ type ClaimTrie struct {
// Repository for reported block hashes (debugging purpose). // Repository for reported block hashes (debugging purpose).
reportedBlockRepo block.Repo reportedBlockRepo block.Repo
// Repository for raw changes recieved from chain.
chainRepo chain.Repo
// Repository for calculated block hashes. // Repository for calculated block hashes.
blockRepo block.Repo blockRepo block.Repo
@ -46,13 +52,13 @@ type ClaimTrie struct {
// Write buffer for batching changes written to repo. // Write buffer for batching changes written to repo.
// flushed before block is appended. // flushed before block is appended.
// changes []change.Change changes []change.Change
// Registrered cleanup functions which are invoked in the Close() in reverse order. // Registrered cleanup functions which are invoked in the Close() in reverse order.
cleanups []func() error cleanups []func() error
} }
func New() (*ClaimTrie, error) { func New(record bool) (*ClaimTrie, error) {
cfg := config.GenerateConfig(param.ClaimtrieDataFolder) cfg := config.GenerateConfig(param.ClaimtrieDataFolder)
var cleanups []func() error var cleanups []func() error
@ -126,12 +132,25 @@ func New() (*ClaimTrie, error) {
merkleTrie: trie, merkleTrie: trie,
height: previousHeight, height: previousHeight,
reportedBlockRepo: reportedBlockRepo,
cleanups: cleanups,
} }
if record {
chainRepo, err := chainrepo.NewPebble(cfg.ChainRepoPebble.Path)
if err != nil {
return nil, fmt.Errorf("new change change repo: %w", err)
}
cleanups = append(cleanups, chainRepo.Close)
ct.chainRepo = chainRepo
reportedBlockRepo, err := blockrepo.NewPebble(cfg.ReportedBlockRepoPebble.Path)
if err != nil {
return nil, fmt.Errorf("new reported block repo: %w", err)
}
cleanups = append(cleanups, reportedBlockRepo.Close)
ct.reportedBlockRepo = reportedBlockRepo
}
ct.cleanups = cleanups
return ct, nil return ct, nil
} }
@ -210,6 +229,15 @@ func (ct *ClaimTrie) SpendSupport(name []byte, op wire.OutPoint, id node.ClaimID
func (ct *ClaimTrie) AppendBlock() error { func (ct *ClaimTrie) AppendBlock() error {
ct.height++ ct.height++
if len(ct.changes) > 0 && ct.chainRepo != nil {
err := ct.chainRepo.Save(ct.height, ct.changes)
if err != nil {
return fmt.Errorf("chain change repo save: %w", err)
}
ct.changes = ct.changes[:0]
}
names, err := ct.nodeManager.IncrementHeightTo(ct.height) names, err := ct.nodeManager.IncrementHeightTo(ct.height)
if err != nil { if err != nil {
return fmt.Errorf("node mgr increment: %w", err) return fmt.Errorf("node mgr increment: %w", err)
@ -348,7 +376,6 @@ func (ct *ClaimTrie) Close() error {
return fmt.Errorf("cleanup: %w", err) return fmt.Errorf("cleanup: %w", err)
} }
} }
ct.cleanups = nil
return nil return nil
} }
@ -362,7 +389,7 @@ func (ct *ClaimTrie) forwardNodeChange(chg change.Change) error {
return fmt.Errorf("node manager handle change: %w", err) return fmt.Errorf("node manager handle change: %w", err)
} }
//ct.changes = append(ct.changes, chg) ct.changes = append(ct.changes, chg)
return nil return nil
} }

View file

@ -3,10 +3,11 @@ package claimtrie
import ( import (
"testing" "testing"
"github.com/btcsuite/btcd/chaincfg/chainhash"
"github.com/btcsuite/btcd/claimtrie/merkletrie" "github.com/btcsuite/btcd/claimtrie/merkletrie"
"github.com/btcsuite/btcd/claimtrie/node" "github.com/btcsuite/btcd/claimtrie/node"
"github.com/btcsuite/btcd/claimtrie/param" "github.com/btcsuite/btcd/claimtrie/param"
"github.com/btcsuite/btcd/chaincfg/chainhash"
"github.com/btcsuite/btcd/wire" "github.com/btcsuite/btcd/wire"
"github.com/stretchr/testify/require" "github.com/stretchr/testify/require"
@ -34,7 +35,7 @@ func TestFixedHashes(t *testing.T) {
r := require.New(t) r := require.New(t)
setup(t) setup(t)
ct, err := New() ct, err := New(false)
r.NoError(err) r.NoError(err)
defer func() { defer func() {
err = ct.Close() err = ct.Close()
@ -74,7 +75,7 @@ func TestNormalizationFork(t *testing.T) {
setup(t) setup(t)
param.NormalizedNameForkHeight = 2 param.NormalizedNameForkHeight = 2
ct, err := New() ct, err := New(false)
r.NoError(err) r.NoError(err)
r.NotNil(ct) r.NotNil(ct)
defer func() { defer func() {
@ -138,7 +139,7 @@ func TestActivationsOnNormalizationFork(t *testing.T) {
setup(t) setup(t)
param.NormalizedNameForkHeight = 4 param.NormalizedNameForkHeight = 4
ct, err := New() ct, err := New(false)
r.NoError(err) r.NoError(err)
r.NotNil(ct) r.NotNil(ct)
defer func() { defer func() {
@ -184,7 +185,7 @@ func TestNormalizationSortOrder(t *testing.T) {
// alas, it's now part of our history; we hereby test it to keep it that way // alas, it's now part of our history; we hereby test it to keep it that way
setup(t) setup(t)
param.NormalizedNameForkHeight = 2 param.NormalizedNameForkHeight = 2
ct, err := New() ct, err := New(false)
r.NoError(err) r.NoError(err)
r.NotNil(ct) r.NotNil(ct)
defer func() { defer func() {

View file

@ -110,8 +110,7 @@ var chainReplayCmd = &cobra.Command{
return fmt.Errorf("open block repo: %w", err) return fmt.Errorf("open block repo: %w", err)
} }
// FIXME: pass record flag into claimtrie ct, err := claimtrie.New(false)
ct, err := claimtrie.New()
if err != nil { if err != nil {
return fmt.Errorf("create claimtrie: %w", err) return fmt.Errorf("create claimtrie: %w", err)
} }

View file

@ -18,6 +18,9 @@ func GenerateConfig(folder string) *DBConfig {
MerkleTrieRepoPebble: pebbleConfig{ MerkleTrieRepoPebble: pebbleConfig{
Path: filepath.Join(folder, "merkletrie_pebble_db"), Path: filepath.Join(folder, "merkletrie_pebble_db"),
}, },
ChainRepoPebble: pebbleConfig{
Path: filepath.Join(folder, "chain_pebble_db"),
},
ReportedBlockRepoPebble: pebbleConfig{ ReportedBlockRepoPebble: pebbleConfig{
Path: filepath.Join(folder, "reported_blocks_pebble_db"), Path: filepath.Join(folder, "reported_blocks_pebble_db"),
}, },