[lbry] many methods now use errors.Wrap, others use node.log

added hasChildren test
This commit is contained in:
Brannon King 2021-07-27 09:34:15 -04:00
parent d691ab7a9e
commit ab852a6e9f
22 changed files with 248 additions and 332 deletions

View file

@ -15,10 +15,7 @@ import (
"github.com/btcsuite/btcd/claimtrie/node"
)
// Hack: print which block mismatches happened, but keep recording.
var mismatchedPrinted bool
func (b *BlockChain) ParseClaimScripts(block *btcutil.Block, node *blockNode, view *UtxoViewpoint, failOnHashMiss bool) error {
func (b *BlockChain) ParseClaimScripts(block *btcutil.Block, bn *blockNode, view *UtxoViewpoint, failOnHashMiss bool) error {
ht := block.Height()
for _, tx := range block.Transactions() {
@ -32,22 +29,22 @@ func (b *BlockChain) ParseClaimScripts(block *btcutil.Block, node *blockNode, vi
}
// Hack: let the claimtrie know the expected Hash.
b.claimTrie.ReportHash(ht, node.claimTrie)
err := b.claimTrie.AppendBlock()
err := b.claimTrie.ReportHash(ht, bn.claimTrie)
if err != nil {
return err
return errors.Wrapf(err, "in report hash")
}
err = b.claimTrie.AppendBlock()
if err != nil {
return errors.Wrapf(err, "in append block")
}
hash := b.claimTrie.MerkleHash()
if node.claimTrie != *hash {
if bn.claimTrie != *hash {
if failOnHashMiss {
return fmt.Errorf("height: %d, ct.MerkleHash: %s != node.ClaimTrie: %s", ht, *hash, node.claimTrie)
}
if !mismatchedPrinted {
fmt.Printf("\n\nHeight: %d, ct.MerkleHash: %s != node.ClaimTrie: %s, Error: %s\n", ht, *hash, node.claimTrie, err)
mismatchedPrinted = true
return errors.Errorf("height: %d, ct.MerkleHash: %s != node.ClaimTrie: %s", ht, *hash, bn.claimTrie)
}
node.LogOnce(fmt.Sprintf("\n\nHeight: %d, ct.MerkleHash: %s != node.ClaimTrie: %s, Error: %s", ht, *hash, bn.claimTrie, err))
}
return nil
}
@ -67,7 +64,7 @@ func (h *handler) handleTxIns(ct *claimtrie.ClaimTrie) error {
op := txIn.PreviousOutPoint
e := h.view.LookupEntry(op)
if e == nil {
return fmt.Errorf("missing input in view for %s", op.String())
return errors.Errorf("missing input in view for %s", op.String())
}
cs, err := txscript.DecodeClaimScript(e.pkScript)
if err == txscript.ErrNotClaimScript {
@ -129,7 +126,8 @@ func (h *handler) handleTxOuts(ct *claimtrie.ClaimTrie) error {
copy(id[:], cs.ClaimID())
normName := node.NormalizeIfNecessary(name, ct.Height())
if !bytes.Equal(h.spent[id.Key()], normName) {
fmt.Printf("Invalid update operation: name or ID mismatch for %s, %s\n", normName, id.String())
node.LogOnce(fmt.Sprintf("Invalid update operation: name or ID mismatch at %d for: %s, %s",
ct.Height(), normName, id.String()))
continue
}

View file

@ -2,7 +2,7 @@ package blockrepo
import (
"encoding/binary"
"fmt"
"github.com/pkg/errors"
"github.com/btcsuite/btcd/chaincfg/chainhash"
@ -16,31 +16,22 @@ type Pebble struct {
func NewPebble(path string) (*Pebble, error) {
db, err := pebble.Open(path, nil)
if err != nil {
return nil, fmt.Errorf("pebble open %s, %w", path, err)
}
repo := &Pebble{db: db}
return repo, nil
return repo, errors.Wrapf(err, "unable to open %s", path)
}
func (repo *Pebble) Load() (int32, error) {
iter := repo.db.NewIter(nil)
if !iter.Last() {
if err := iter.Close(); err != nil {
return 0, fmt.Errorf("close iter: %w", err)
}
return 0, nil
err := iter.Close()
return 0, errors.Wrap(err, "closing iterator with no last")
}
height := int32(binary.BigEndian.Uint32(iter.Key()))
if err := iter.Close(); err != nil {
return height, fmt.Errorf("close iter: %w", err)
}
return height, nil
err := iter.Close()
return height, errors.Wrap(err, "closing iterator")
}
func (repo *Pebble) Get(height int32) (*chainhash.Hash, error) {
@ -49,14 +40,14 @@ func (repo *Pebble) Get(height int32) (*chainhash.Hash, error) {
binary.BigEndian.PutUint32(key, uint32(height))
b, closer, err := repo.db.Get(key)
if err != nil {
return nil, err
if closer != nil {
defer closer.Close()
}
if err != nil {
return nil, errors.Wrap(err, "in get")
}
defer closer.Close()
hash, err := chainhash.NewHash(b)
return hash, err
return hash, errors.Wrap(err, "creating hash")
}
func (repo *Pebble) Set(height int32, hash *chainhash.Hash) error {
@ -64,20 +55,17 @@ func (repo *Pebble) Set(height int32, hash *chainhash.Hash) error {
key := make([]byte, 4)
binary.BigEndian.PutUint32(key, uint32(height))
return repo.db.Set(key, hash[:], pebble.NoSync)
return errors.WithStack(repo.db.Set(key, hash[:], pebble.NoSync))
}
func (repo *Pebble) Close() error {
err := repo.db.Flush()
if err != nil {
return fmt.Errorf("pebble fludh: %w", err)
// if we fail to close are we going to try again later?
return errors.Wrap(err, "on flush")
}
err = repo.db.Close()
if err != nil {
return fmt.Errorf("pebble close: %w", err)
}
return nil
return errors.Wrap(err, "on close")
}

View file

@ -1,9 +1,8 @@
package chainrepo
import (
"bytes"
"encoding/binary"
"fmt"
"github.com/pkg/errors"
"github.com/btcsuite/btcd/claimtrie/change"
"github.com/vmihailenco/msgpack/v5"
@ -18,13 +17,9 @@ type Pebble struct {
func NewPebble(path string) (*Pebble, error) {
db, err := pebble.Open(path, nil)
if err != nil {
return nil, fmt.Errorf("pebble open %s, %w", path, err)
}
repo := &Pebble{db: db}
return repo, nil
return repo, errors.Wrapf(err, "unable to open %s", path)
}
func (repo *Pebble) Save(height int32, changes []change.Change) error {
@ -33,59 +28,44 @@ func (repo *Pebble) Save(height int32, changes []change.Change) error {
return nil
}
key := bytes.NewBuffer(nil)
err := binary.Write(key, binary.BigEndian, height)
if err != nil {
return fmt.Errorf("pebble prepare key: %w", err)
}
var key [4]byte
binary.BigEndian.PutUint32(key[:], uint32(height))
value, err := msgpack.Marshal(changes)
if err != nil {
return fmt.Errorf("pebble msgpack marshal: %w", err)
return errors.Wrap(err, "in marshaller")
}
err = repo.db.Set(key.Bytes(), value, pebble.NoSync)
if err != nil {
return fmt.Errorf("pebble set: %w", err)
}
return nil
err = repo.db.Set(key[:], value, pebble.NoSync)
return errors.Wrap(err, "in set")
}
func (repo *Pebble) Load(height int32) ([]change.Change, error) {
key := bytes.NewBuffer(nil)
err := binary.Write(key, binary.BigEndian, height)
if err != nil {
return nil, fmt.Errorf("pebble prepare key: %w", err)
}
var key [4]byte
binary.BigEndian.PutUint32(key[:], uint32(height))
b, closer, err := repo.db.Get(key.Bytes())
if err != nil {
return nil, err
b, closer, err := repo.db.Get(key[:])
if closer != nil {
defer closer.Close()
}
if err != nil {
return nil, errors.Wrap(err, "in get")
}
defer closer.Close()
var changes []change.Change
err = msgpack.Unmarshal(b, &changes)
if err != nil {
return nil, fmt.Errorf("pebble msgpack marshal: %w", err)
}
return changes, nil
return changes, errors.Wrap(err, "in unmarshaller")
}
func (repo *Pebble) Close() error {
err := repo.db.Flush()
if err != nil {
return fmt.Errorf("pebble fludh: %w", err)
// if we fail to close are we going to try again later?
return errors.Wrap(err, "on flush")
}
err = repo.db.Close()
if err != nil {
return fmt.Errorf("pebble close: %w", err)
}
return nil
return errors.Wrap(err, "on close")
}

View file

@ -3,6 +3,7 @@ package claimtrie
import (
"bytes"
"fmt"
"github.com/pkg/errors"
"path/filepath"
"sort"
@ -64,13 +65,13 @@ func New(cfg config.Config) (*ClaimTrie, error) {
blockRepo, err := blockrepo.NewPebble(filepath.Join(cfg.DataDir, cfg.BlockRepoPebble.Path))
if err != nil {
return nil, fmt.Errorf("new block repo: %w", err)
return nil, errors.Wrap(err, "creating block repo")
}
cleanups = append(cleanups, blockRepo.Close)
temporalRepo, err := temporalrepo.NewPebble(filepath.Join(cfg.DataDir, cfg.TemporalRepoPebble.Path))
if err != nil {
return nil, fmt.Errorf("new temporal repo: %w", err)
return nil, errors.Wrap(err, "creating temporal repo")
}
cleanups = append(cleanups, temporalRepo.Close)
@ -78,12 +79,12 @@ func New(cfg config.Config) (*ClaimTrie, error) {
// The cleanup is delegated to the Node Manager.
nodeRepo, err := noderepo.NewPebble(filepath.Join(cfg.DataDir, cfg.NodeRepoPebble.Path))
if err != nil {
return nil, fmt.Errorf("new node repo: %w", err)
return nil, errors.Wrap(err, "creating node repo")
}
baseManager, err := node.NewBaseManager(nodeRepo)
if err != nil {
return nil, fmt.Errorf("new node manager: %w", err)
return nil, errors.Wrap(err, "creating node base manager")
}
nodeManager := node.NewNormalizingManager(baseManager)
cleanups = append(cleanups, nodeManager.Close)
@ -96,7 +97,7 @@ func New(cfg config.Config) (*ClaimTrie, error) {
// Initialize repository for MerkleTrie. The cleanup is delegated to MerkleTrie.
trieRepo, err := merkletrierepo.NewPebble(filepath.Join(cfg.DataDir, cfg.MerkleTrieRepoPebble.Path))
if err != nil {
return nil, fmt.Errorf("new trie repo: %w", err)
return nil, errors.Wrap(err, "creating trie repo")
}
persistentTrie := merkletrie.NewPersistentTrie(nodeManager, trieRepo)
@ -107,7 +108,7 @@ func New(cfg config.Config) (*ClaimTrie, error) {
// Restore the last height.
previousHeight, err := blockRepo.Load()
if err != nil {
return nil, fmt.Errorf("load blocks: %w", err)
return nil, errors.Wrap(err, "load block tip")
}
ct := &ClaimTrie{
@ -123,14 +124,14 @@ func New(cfg config.Config) (*ClaimTrie, error) {
if cfg.Record {
chainRepo, err := chainrepo.NewPebble(filepath.Join(cfg.DataDir, cfg.ChainRepoPebble.Path))
if err != nil {
return nil, fmt.Errorf("new change change repo: %w", err)
return nil, errors.Wrap(err, "creating chain repo")
}
cleanups = append(cleanups, chainRepo.Close)
ct.chainRepo = chainRepo
reportedBlockRepo, err := blockrepo.NewPebble(filepath.Join(cfg.DataDir, cfg.ReportedBlockRepoPebble.Path))
if err != nil {
return nil, fmt.Errorf("new reported block repo: %w", err)
return nil, errors.Wrap(err, "creating reported block repo")
}
cleanups = append(cleanups, reportedBlockRepo.Close)
ct.reportedBlockRepo = reportedBlockRepo
@ -141,19 +142,19 @@ func New(cfg config.Config) (*ClaimTrie, error) {
hash, err := blockRepo.Get(previousHeight)
if err != nil {
ct.Close() // TODO: the cleanups aren't run when we exit with an err above here (but should be)
return nil, fmt.Errorf("get hash: %w", err)
return nil, errors.Wrap(err, "block repo get")
}
_, err = nodeManager.IncrementHeightTo(previousHeight)
if err != nil {
ct.Close()
return nil, fmt.Errorf("node manager init: %w", err)
return nil, errors.Wrap(err, "increment height to")
}
// TODO: pass in the interrupt signal here:
trie.SetRoot(hash, nil) // keep this after IncrementHeightTo
if !ct.MerkleHash().IsEqual(hash) {
ct.Close()
return nil, fmt.Errorf("unable to restore the claim hash to %s at height %d", hash.String(), previousHeight)
return nil, errors.Errorf("unable to restore the claim hash to %s at height %d", hash.String(), previousHeight)
}
}
@ -236,19 +237,19 @@ func (ct *ClaimTrie) AppendBlock() error {
if len(ct.changes) > 0 && ct.chainRepo != nil {
err := ct.chainRepo.Save(ct.height, ct.changes)
if err != nil {
return fmt.Errorf("chain change repo save: %w", err)
return errors.Wrap(err, "chain change repo save")
}
ct.changes = ct.changes[:0]
}
names, err := ct.nodeManager.IncrementHeightTo(ct.height)
if err != nil {
return fmt.Errorf("node mgr increment: %w", err)
return errors.Wrap(err, "node manager increment")
}
expirations, err := ct.temporalRepo.NodesAt(ct.height)
if err != nil {
return fmt.Errorf("temporal repo nodes at: %w", err)
return errors.Wrap(err, "temporal repo get")
}
names = removeDuplicates(names) // comes out sorted
@ -275,7 +276,7 @@ func (ct *ClaimTrie) AppendBlock() error {
}
err = ct.temporalRepo.SetNodesAt(updateNames, updateHeights)
if err != nil {
return fmt.Errorf("temporal repo set at: %w", err)
return errors.Wrap(err, "temporal repo set")
}
hitFork := ct.updateTrieForHashForkIfNecessary()
@ -294,14 +295,16 @@ func (ct *ClaimTrie) updateTrieForHashForkIfNecessary() bool {
if ct.height != param.AllClaimsInMerkleForkHeight {
return false
}
fmt.Printf("Marking all trie nodes as dirty for the hash fork...")
node.LogOnce("Marking all trie nodes as dirty for the hash fork...")
// invalidate all names because we have to recompute the hash on everything
// requires its own 8GB of RAM in current trie impl.
ct.nodeManager.IterateNames(func(name []byte) bool {
ct.merkleTrie.Update(name, false)
return true
})
fmt.Printf(" Done. Now recomputing all hashes...\n")
node.LogOnce("Done. Now recomputing all hashes...")
return true
}
@ -358,7 +361,7 @@ func (ct *ClaimTrie) ResetHeight(height int32) error {
ct.merkleTrie.SetRoot(hash, names)
if !ct.MerkleHash().IsEqual(hash) {
return fmt.Errorf("unable to restore the hash at height %d", height)
return errors.Errorf("unable to restore the hash at height %d", height)
}
return nil
}
@ -378,17 +381,15 @@ func (ct *ClaimTrie) Height() int32 {
// Close persists states.
// Any calls to the ClaimTrie after Close() being called results undefined behaviour.
func (ct *ClaimTrie) Close() error {
func (ct *ClaimTrie) Close() {
for i := len(ct.cleanups) - 1; i >= 0; i-- {
cleanup := ct.cleanups[i]
err := cleanup()
if err != nil { // TODO: it would be better to cleanup what we can than exit
return fmt.Errorf("cleanup: %w", err)
if err != nil { // it would be better to cleanup what we can than exit early
node.LogOnce("On cleanup: " + err.Error())
}
}
return nil
}
func (ct *ClaimTrie) forwardNodeChange(chg change.Change) error {

View file

@ -40,10 +40,7 @@ func TestFixedHashes(t *testing.T) {
setup(t)
ct, err := New(cfg)
r.NoError(err)
defer func() {
err = ct.Close()
r.NoError(err)
}()
defer ct.Close()
r.Equal(merkletrie.EmptyTrieHash[:], ct.MerkleHash()[:])
@ -81,10 +78,7 @@ func TestNormalizationFork(t *testing.T) {
ct, err := New(cfg)
r.NoError(err)
r.NotNil(ct)
defer func() {
err = ct.Close()
r.NoError(err)
}()
defer ct.Close()
hash := chainhash.HashH([]byte{1, 2, 3})
@ -145,10 +139,7 @@ func TestActivationsOnNormalizationFork(t *testing.T) {
ct, err := New(cfg)
r.NoError(err)
r.NotNil(ct)
defer func() {
err = ct.Close()
r.NoError(err)
}()
defer ct.Close()
hash := chainhash.HashH([]byte{1, 2, 3})
@ -191,10 +182,7 @@ func TestNormalizationSortOrder(t *testing.T) {
ct, err := New(cfg)
r.NoError(err)
r.NotNil(ct)
defer func() {
err := ct.Close()
r.NoError(err)
}()
defer ct.Close()
hash := chainhash.HashH([]byte{1, 2, 3})
@ -238,10 +226,7 @@ func TestRebuild(t *testing.T) {
ct, err := New(cfg)
r.NoError(err)
r.NotNil(ct)
defer func() {
err := ct.Close()
r.NoError(err)
}()
defer ct.Close()
hash := chainhash.HashH([]byte{1, 2, 3})

View file

@ -9,7 +9,6 @@ import (
"github.com/btcsuite/btcd/claimtrie/block/blockrepo"
"github.com/btcsuite/btcd/claimtrie/merkletrie"
"github.com/btcsuite/btcd/claimtrie/merkletrie/merkletrierepo"
"github.com/btcsuite/btcd/claimtrie/param"
"github.com/btcsuite/btcd/claimtrie/temporal/temporalrepo"
"github.com/spf13/cobra"
@ -139,7 +138,7 @@ var blockNameCmd = &cobra.Command{
defer trie.Close()
trie.SetRoot(hash, nil)
if len(args) > 1 {
trie.Dump(args[1], param.AllClaimsInMerkleForkHeight >= int32(height))
trie.Dump(args[1])
} else {
tmpRepo, err := temporalrepo.NewPebble(filepath.Join(cfg.DataDir, cfg.TemporalRepoPebble.Path))
if err != nil {
@ -151,7 +150,7 @@ var blockNameCmd = &cobra.Command{
}
for _, name := range nodes {
fmt.Printf("Name: %s, ", string(name))
trie.Dump(string(name), param.AllClaimsInMerkleForkHeight >= int32(height))
trie.Dump(string(name))
}
}
return nil

View file

@ -84,11 +84,11 @@ var nodeReplayCmd = &cobra.Command{
}
}
nm, err := node.NewBaseManager(repo)
bm, err := node.NewBaseManager(repo)
if err != nil {
return fmt.Errorf("create node manager: %w", err)
}
nm = node.NewNormalizingManager(nm)
nm := node.NewNormalizingManager(bm)
_, err = nm.IncrementHeightTo(int32(height))
if err != nil {

View file

@ -3,14 +3,13 @@ package merkletrie
import (
"bytes"
"fmt"
"github.com/pkg/errors"
"runtime"
"sort"
"sync"
"github.com/btcsuite/btcd/chaincfg/chainhash"
"github.com/btcsuite/btcd/claimtrie/node"
"github.com/cockroachdb/pebble"
)
var (
@ -97,7 +96,7 @@ func (t *PersistentTrie) resolveChildLinks(n *vertex, key []byte) {
b.Write(n.merkleHash[:])
result, closer, err := t.repo.Get(b.Bytes())
if err == pebble.ErrNotFound { // TODO: leaky abstraction
if result == nil {
return
} else if err != nil {
panic(err)
@ -247,10 +246,13 @@ func (t *PersistentTrie) merkleAllClaims(prefix []byte, v *vertex) *chainhash.Ha
}
func (t *PersistentTrie) Close() error {
return t.repo.Close()
return errors.WithStack(t.repo.Close())
}
func (t *PersistentTrie) Dump(s string, allClaims bool) {
func (t *PersistentTrie) Dump(s string) {
// TODO: this function is in the wrong spot; either it goes with its caller or it needs to be a generic iterator
// we don't want fmt used in here either way
v := t.root
for i := 0; i < len(s); i++ {

View file

@ -1,12 +1,9 @@
package merkletrierepo
import (
"fmt"
"io"
"time"
"github.com/cockroachdb/pebble"
humanize "github.com/dustin/go-humanize"
"github.com/pkg/errors"
"io"
)
type Pebble struct {
@ -16,37 +13,35 @@ type Pebble struct {
func NewPebble(path string) (*Pebble, error) {
cache := pebble.NewCache(512 << 20)
defer cache.Unref()
go func() {
tick := time.NewTicker(60 * time.Second)
for range tick.C {
m := cache.Metrics()
fmt.Printf("cnt: %s, objs: %s, hits: %s, miss: %s, hitrate: %.2f\n",
humanize.Bytes(uint64(m.Size)),
humanize.Comma(m.Count),
humanize.Comma(m.Hits),
humanize.Comma(m.Misses),
float64(m.Hits)/float64(m.Hits+m.Misses))
}
}()
//defer cache.Unref()
//
//go func() {
// tick := time.NewTicker(60 * time.Second)
// for range tick.C {
//
// m := cache.Metrics()
// fmt.Printf("cnt: %s, objs: %s, hits: %s, miss: %s, hitrate: %.2f\n",
// humanize.Bytes(uint64(m.Size)),
// humanize.Comma(m.Count),
// humanize.Comma(m.Hits),
// humanize.Comma(m.Misses),
// float64(m.Hits)/float64(m.Hits+m.Misses))
//
// }
//}()
db, err := pebble.Open(path, &pebble.Options{Cache: cache, BytesPerSync: 32 << 20})
if err != nil {
return nil, fmt.Errorf("pebble open %s, %w", path, err)
}
repo := &Pebble{db: db}
repo := &Pebble{
db: db,
}
return repo, nil
return repo, errors.Wrapf(err, "unable to open %s", path)
}
func (repo *Pebble) Get(key []byte) ([]byte, io.Closer, error) {
return repo.db.Get(key)
d, c, e := repo.db.Get(key)
if e == pebble.ErrNotFound {
return nil, c, nil
}
return d, c, e
}
func (repo *Pebble) Set(key, value []byte) error {
@ -57,13 +52,10 @@ func (repo *Pebble) Close() error {
err := repo.db.Flush()
if err != nil {
return fmt.Errorf("pebble fludh: %w", err)
// if we fail to close are we going to try again later?
return errors.Wrap(err, "on flush")
}
err = repo.db.Close()
if err != nil {
return fmt.Errorf("pebble close: %w", err)
}
return nil
return errors.Wrap(err, "on close")
}

View file

@ -2,7 +2,6 @@ package merkletrie
import (
"bytes"
"fmt"
"github.com/btcsuite/btcd/chaincfg/chainhash"
"github.com/btcsuite/btcd/claimtrie/node"
"runtime"
@ -42,8 +41,15 @@ func (rt *RamTrie) SetRoot(h *chainhash.Hash, names [][]byte) {
// if names is nil then we need to query all names
if names == nil {
fmt.Printf("Building the entire claim trie in RAM...\n")
// TODO: should technically clear the old trie first
node.LogOnce("Building the entire claim trie in RAM...") // could put this in claimtrie.go
//should technically clear the old trie first:
if rt.Nodes > 1 {
rt.Root = &collapsedVertex{key: make(KeyType, 0)}
rt.Nodes = 1
runtime.GC()
}
rt.store.IterateNames(func(name []byte) bool {
rt.Update(name, false)
return true
@ -134,6 +140,7 @@ func (rt *RamTrie) merkleHashAllClaims(v *collapsedVertex) *chainhash.Hash {
childHash := NoChildrenHash
if len(childHashes) > 0 {
// this shouldn't be referencing node; where else can we put this merkle root func?
childHash = node.ComputeMerkleRoot(childHashes)
}

View file

@ -1,4 +1,4 @@
package claimtrie
package node
import (
"github.com/btcsuite/btclog"
@ -26,3 +26,12 @@ func DisableLog() {
func UseLogger(logger btclog.Logger) {
log = logger
}
// loggedStrings records every message already emitted through LogOnce so
// that repeated events (e.g. a hash mismatch reported on every block) are
// logged exactly once per process.
// NOTE(review): the set grows without bound for the life of the process and
// access is unsynchronized — confirm all callers run on a single goroutine.
var loggedStrings = make(map[string]bool)

// LogOnce writes s to the package logger at Info level, suppressing any
// message (by exact string match) that has been logged before.
func LogOnce(s string) {
	if _, seen := loggedStrings[s]; seen {
		return
	}
	loggedStrings[s] = true
	log.Info(s)
}

View file

@ -6,6 +6,7 @@ import (
"crypto/sha256"
"encoding/binary"
"fmt"
"github.com/pkg/errors"
"strconv"
"github.com/btcsuite/btcd/chaincfg/chainhash"
@ -85,7 +86,7 @@ type BaseManager struct {
changes []change.Change
}
func NewBaseManager(repo Repo) (Manager, error) {
func NewBaseManager(repo Repo) (*BaseManager, error) {
nm := &BaseManager{
repo: repo,
@ -107,12 +108,12 @@ func (nm *BaseManager) Node(name []byte) (*Node, error) {
changes, err := nm.repo.LoadChanges(name)
if err != nil {
return nil, fmt.Errorf("load changes from node repo: %w", err)
return nil, errors.Wrap(err, "in load changes")
}
n, err = nm.newNodeFromChanges(changes, nm.height)
if err != nil {
return nil, fmt.Errorf("create node from changes: %w", err)
return nil, errors.Wrap(err, "in new node")
}
if n == nil { // they've requested a nonexistent or expired name
@ -137,7 +138,7 @@ func (nm *BaseManager) newNodeFromChanges(changes []change.Change, height int32)
for i, chg := range changes {
if chg.Height < previous {
return nil, fmt.Errorf("expected the changes to be in order by height")
panic("expected the changes to be in order by height")
}
if chg.Height > height {
count = i
@ -152,7 +153,7 @@ func (nm *BaseManager) newNodeFromChanges(changes []change.Change, height int32)
delay := nm.getDelayForName(n, chg)
err := n.ApplyChange(chg, delay)
if err != nil {
return nil, fmt.Errorf("append change: %w", err)
return nil, errors.Wrap(err, "in apply change")
}
}
@ -168,6 +169,9 @@ func (nm *BaseManager) AppendChange(chg change.Change) error {
nm.cache.Delete(string(chg.Name))
nm.changes = append(nm.changes, chg)
// worth putting in this kind of thing pre-emptively?
// log.Debugf("CHG: %d, %s, %v, %s, %d", chg.Height, chg.Name, chg.Type, chg.ClaimID, chg.Amount)
return nil
}
@ -225,7 +229,7 @@ func (nm *BaseManager) IncrementHeightTo(height int32) ([][]byte, error) {
}
if err := nm.repo.AppendChanges(nm.changes); err != nil { // destroys names
return nil, fmt.Errorf("save changes to node repo: %w", err)
return nil, errors.Wrap(err, "in append changes")
}
// Truncate the buffer size to zero.
@ -241,13 +245,13 @@ func (nm *BaseManager) IncrementHeightTo(height int32) ([][]byte, error) {
func (nm *BaseManager) DecrementHeightTo(affectedNames [][]byte, height int32) error {
if height >= nm.height {
return fmt.Errorf("invalid height")
return errors.Errorf("invalid height of %d for %d", height, nm.height)
}
for _, name := range affectedNames {
nm.cache.Delete(string(name))
if err := nm.repo.DropChanges(name, height); err != nil {
return err
return errors.Wrap(err, "in drop changes")
}
}
@ -275,25 +279,15 @@ func (nm *BaseManager) getDelayForName(n *Node, chg change.Change) int32 {
delay := calculateDelay(chg.Height, n.TakenOverAt)
if delay > 0 && nm.aWorkaroundIsNeeded(n, chg) {
// TODO: log this (but only once per name-height combo)
//fmt.Printf("Delay workaround applies to %s at %d\n", chg.Name, chg.Height)
if chg.Height >= nm.height {
LogOnce(fmt.Sprintf("Delay workaround applies to %s at %d, ClaimID: %s",
chg.Name, chg.Height, chg.ClaimID))
}
return 0
}
return delay
}
func isInDelayPart2(chg change.Change) bool {
heights, ok := param.DelayWorkaroundsPart2[string(chg.Name)]
if ok {
for _, h := range heights {
if h == chg.Height {
return true
}
}
}
return false
}
func hasZeroActiveClaims(n *Node) bool {
// this isn't quite the same as having an active best (since that is only updated after all changes are processed)
for _, c := range n.Claims {
@ -318,19 +312,7 @@ func (nm *BaseManager) aWorkaroundIsNeeded(n *Node, chg change.Change) bool {
// auto it = nodesToAddOrUpdate.find(name); // nodesToAddOrUpdate is the working changes, base is previous block
// auto answer = (it || (it = base->find(name))) && !it->empty() ? nNextHeight - it->nHeightOfLastTakeover : 0;
needed := hasZeroActiveClaims(n) && nm.hasChildren(chg.Name, chg.Height, chg.SpentChildren, 2)
if chg.Height <= 933294 {
w := isInDelayPart2(chg)
if w {
if !needed {
fmt.Printf("DELAY WORKAROUND FALSE NEGATIVE! %d: %s: %t\n", chg.Height, chg.Name, needed)
}
} else if needed {
fmt.Printf("DELAY WORKAROUND FALSE POSITIVE! %d: %s: %t\n", chg.Height, chg.Name, needed)
}
// return w // if you want to sync to 933294+
}
return needed
return hasZeroActiveClaims(n) && nm.hasChildren(chg.Name, chg.Height, chg.SpentChildren, 2)
} else if len(n.Claims) > 0 {
// NOTE: old code had a bug in it where nodes with no claims but with children would get left in the cache after removal.
// This would cause the getNumBlocksOfContinuousOwnership to return zero (causing incorrect takeover height calc).
@ -371,13 +353,7 @@ func (nm *BaseManager) Height() int32 {
}
func (nm *BaseManager) Close() error {
err := nm.repo.Close()
if err != nil {
return fmt.Errorf("close repo: %w", err)
}
return nil
return errors.WithStack(nm.repo.Close())
}
func (nm *BaseManager) hasChildren(name []byte, height int32, spentChildren map[string]bool, required int) bool {
@ -386,7 +362,7 @@ func (nm *BaseManager) hasChildren(name []byte, height int32, spentChildren map[
spentChildren = map[string]bool{}
}
nm.repo.IterateChildren(name, func(changes []change.Change) bool {
err := nm.repo.IterateChildren(name, func(changes []change.Change) bool {
// if the key is unseen, generate a node for it to height
// if that node is active then increase the count
if len(changes) == 0 {
@ -407,7 +383,7 @@ func (nm *BaseManager) hasChildren(name []byte, height int32, spentChildren map[
}
return true
})
return len(c) >= required
return err == nil && len(c) >= required
}
func (nm *BaseManager) IterateNames(predicate func(name []byte) bool) {

View file

@ -185,3 +185,44 @@ func TestClaimSort(t *testing.T) {
r.Equal(int64(2), n.Claims[2].Amount)
r.Equal(int32(4), n.Claims[3].AcceptedAt)
}
// TestHasChildren exercises BaseManager.hasChildren: it adds claims for the
// names "a", "ab", "abc", and "ac" at successive heights and checks that the
// required-child-count threshold for the prefix "a" is evaluated correctly
// ("ab"/"abc" share the child edge 'b', so only "ac" adds a second child).
func TestHasChildren(t *testing.T) {
	r := require.New(t)
	param.SetNetwork(wire.TestNet)
	repo, err := noderepo.NewPebble(t.TempDir())
	r.NoError(err)

	m, err := NewBaseManager(repo)
	r.NoError(err)
	defer m.Close()

	// Height 1: only "a" exists — it has no children at all.
	chg := change.NewChange(change.AddClaim).SetName([]byte("a")).SetOutPoint(out1).SetHeight(1).SetAmount(2)
	chg.ClaimID = change.NewClaimID(*out1)
	r.NoError(m.AppendChange(chg))
	_, err = m.IncrementHeightTo(1)
	r.NoError(err)
	r.False(m.hasChildren([]byte("a"), 1, nil, 1))

	// Height 2: "ab" gives "a" exactly one child ('b').
	chg = change.NewChange(change.AddClaim).SetName([]byte("ab")).SetOutPoint(out2).SetHeight(2).SetAmount(2)
	chg.ClaimID = change.NewClaimID(*out2)
	r.NoError(m.AppendChange(chg))
	_, err = m.IncrementHeightTo(2)
	r.NoError(err)
	r.False(m.hasChildren([]byte("a"), 2, nil, 2))
	r.True(m.hasChildren([]byte("a"), 2, nil, 1))

	// Height 3: "abc" extends the existing 'b' branch — still one child of "a".
	chg = change.NewChange(change.AddClaim).SetName([]byte("abc")).SetOutPoint(out3).SetHeight(3).SetAmount(2)
	chg.ClaimID = change.NewClaimID(*out3)
	r.NoError(m.AppendChange(chg))
	_, err = m.IncrementHeightTo(3)
	r.NoError(err)
	r.False(m.hasChildren([]byte("a"), 3, nil, 2))

	// Height 4: "ac" adds a second distinct child ('c').
	// Fix: the original used SetOutPoint(out1) while deriving the ClaimID
	// from *out4 — the claim's outpoint and its ID must come from the same
	// outpoint (and out1 was already consumed by the claim on "a").
	chg = change.NewChange(change.AddClaim).SetName([]byte("ac")).SetOutPoint(out4).SetHeight(4).SetAmount(2)
	chg.ClaimID = change.NewClaimID(*out4)
	r.NoError(m.AppendChange(chg))
	_, err = m.IncrementHeightTo(4)
	r.NoError(err)
	r.True(m.hasChildren([]byte("a"), 4, nil, 2))
}

View file

@ -2,6 +2,7 @@ package node
import (
"fmt"
"github.com/pkg/errors"
"math"
"sort"
@ -9,9 +10,6 @@ import (
"github.com/btcsuite/btcd/claimtrie/param"
)
// ErrNotFound is returned when a claim or support is not found.
var mispents = map[string]bool{}
type Node struct {
BestClaim *Claim // The claim that has most effective amount at the current height.
TakenOverAt int32 // The height at when the current BestClaim took over.
@ -48,7 +46,7 @@ func (n *Node) ApplyChange(chg change.Change, delay int32) error {
}
old := n.Claims.find(byOut(chg.OutPoint)) // TODO: remove this after proving ResetHeight works
if old != nil {
fmt.Printf("CONFLICT WITH EXISTING TXO! Name: %s, Height: %d\n", chg.Name, chg.Height)
return errors.Errorf("CONFLICT WITH EXISTING TXO! Name: %s, Height: %d", chg.Name, chg.Height)
}
n.Claims = append(n.Claims, c)
@ -56,10 +54,9 @@ func (n *Node) ApplyChange(chg change.Change, delay int32) error {
c := n.Claims.find(byOut(chg.OutPoint))
if c != nil {
c.setStatus(Deactivated)
} else if !mispents[fmt.Sprintf("%d_%s", chg.Height, chg.ClaimID)] {
mispents[fmt.Sprintf("%d_%s", chg.Height, chg.ClaimID)] = true
fmt.Printf("Spending claim but missing existing claim with TXO %s\n "+
"Name: %s, ID: %s\n", chg.OutPoint, chg.Name, chg.ClaimID)
} else {
LogOnce(fmt.Sprintf("Spending claim but missing existing claim with TXO %s, "+
"Name: %s, ID: %s", chg.OutPoint, chg.Name, chg.ClaimID))
}
// apparently it's legit to be absent in the map:
// 'two' at 481100, 36a719a156a1df178531f3c712b8b37f8e7cc3b36eea532df961229d936272a1:0
@ -81,7 +78,7 @@ func (n *Node) ApplyChange(chg change.Change, delay int32) error {
c.setActiveAt(chg.Height + delay) // TODO: Fork this out
} else {
fmt.Printf("Updating claim but missing existing claim with ID %s", chg.ClaimID)
LogOnce(fmt.Sprintf("Updating claim but missing existing claim with ID %s", chg.ClaimID))
}
case change.AddSupport:
n.Supports = append(n.Supports, &Claim{
@ -104,8 +101,8 @@ func (n *Node) ApplyChange(chg change.Change, delay int32) error {
// We would also need to track the update situation, though, but that could be done locally.
s.setStatus(Deactivated)
} else {
fmt.Printf("Spending support but missing existing support with TXO %s\n "+
"Name: %s, ID: %s\n", chg.OutPoint, chg.Name, chg.ClaimID)
LogOnce(fmt.Sprintf("Spending support but missing existing claim with TXO %s, "+
"Name: %s, ID: %s", chg.OutPoint, chg.Name, chg.ClaimID))
}
}
return nil

View file

@ -2,7 +2,6 @@ package noderepo
import (
"bytes"
"fmt"
"reflect"
"sort"
@ -12,6 +11,7 @@ import (
"github.com/btcsuite/btcd/wire"
"github.com/cockroachdb/pebble"
"github.com/pkg/errors"
"github.com/vmihailenco/msgpack/v5"
)
@ -80,19 +80,16 @@ func init() {
func NewPebble(path string) (*Pebble, error) {
db, err := pebble.Open(path, &pebble.Options{Cache: pebble.NewCache(256 << 20), BytesPerSync: 16 << 20})
if err != nil {
return nil, fmt.Errorf("pebble open %s, %w", path, err)
}
repo := &Pebble{db: db}
return repo, nil
return repo, errors.Wrapf(err, "unable to open %s", path)
}
// AppendChanges makes an assumption that anything you pass to it is newer than what was saved before.
func (repo *Pebble) AppendChanges(changes []change.Change) error {
batch := repo.db.NewBatch()
defer batch.Close()
// TODO: switch to buffer pool and reuse encoder
for _, chg := range changes {
@ -100,27 +97,22 @@ func (repo *Pebble) AppendChanges(changes []change.Change) error {
chg.Name = nil // don't waste the storage space on this (annotation a better approach?)
value, err := msgpack.Marshal(chg)
if err != nil {
return fmt.Errorf("msgpack marshal value: %w", err)
return errors.Wrap(err, "in marshaller")
}
err = batch.Merge(name, value, pebble.NoSync)
if err != nil {
return fmt.Errorf("pebble set: %w", err)
return errors.Wrap(err, "in merge")
}
}
err := batch.Commit(pebble.NoSync)
if err != nil {
return fmt.Errorf("pebble save commit: %w", err)
}
batch.Close()
return err
return errors.Wrap(batch.Commit(pebble.NoSync), "in commit")
}
func (repo *Pebble) LoadChanges(name []byte) ([]change.Change, error) {
data, closer, err := repo.db.Get(name)
if err != nil && err != pebble.ErrNotFound {
return nil, fmt.Errorf("pebble get: %w", err)
return nil, errors.Wrapf(err, "in get %s", name) // does returning a name in an error expose too much?
}
if closer != nil {
defer closer.Close()
@ -140,7 +132,7 @@ func unmarshalChanges(name, data []byte) ([]change.Change, error) {
var chg change.Change
err := dec.Decode(&chg)
if err != nil {
return nil, fmt.Errorf("msgpack unmarshal: %w", err)
return nil, errors.Wrap(err, "in decode")
}
chg.Name = name
changes = append(changes, chg)
@ -156,24 +148,24 @@ func unmarshalChanges(name, data []byte) ([]change.Change, error) {
func (repo *Pebble) DropChanges(name []byte, finalHeight int32) error {
changes, err := repo.LoadChanges(name)
if err != nil {
return errors.Wrapf(err, "in load changes for %s", name)
}
i := 0
for ; i < len(changes); i++ {
if changes[i].Height > finalHeight {
break
}
}
if err != nil {
return fmt.Errorf("pebble drop: %w", err)
}
// making a performance assumption that DropChanges won't happen often:
err = repo.db.Set(name, []byte{}, pebble.NoSync)
if err != nil {
return fmt.Errorf("pebble drop: %w", err)
return errors.Wrapf(err, "in set at %s", name)
}
return repo.AppendChanges(changes[:i])
}
func (repo *Pebble) IterateChildren(name []byte, f func(changes []change.Change) bool) {
func (repo *Pebble) IterateChildren(name []byte, f func(changes []change.Change) bool) error {
start := make([]byte, len(name)+1) // zeros that last byte; need a constant len for stack alloc?
copy(start, name)
@ -195,12 +187,13 @@ func (repo *Pebble) IterateChildren(name []byte, f func(changes []change.Change)
// NOTE! iter.Key() is ephemeral!
changes, err := unmarshalChanges(iter.Key(), iter.Value())
if err != nil {
panic(err)
return errors.Wrapf(err, "from unmarshaller at %s", iter.Key())
}
if !f(changes) {
return
break
}
}
return nil
}
func (repo *Pebble) IterateAll(predicate func(name []byte) bool) {
@ -218,13 +211,10 @@ func (repo *Pebble) Close() error {
err := repo.db.Flush()
if err != nil {
return fmt.Errorf("pebble flush: %w", err)
// if we fail to close are we going to try again later?
return errors.Wrap(err, "on flush")
}
err = repo.db.Close()
if err != nil {
return fmt.Errorf("pebble close: %w", err)
}
return nil
return errors.Wrap(err, "on close")
}

View file

@ -2,8 +2,6 @@ package node
import (
"bytes"
"fmt"
"github.com/btcsuite/btcd/claimtrie/change"
"github.com/btcsuite/btcd/claimtrie/param"
)
@ -58,7 +56,7 @@ func (nm *NormalizingManager) addNormalizationForkChangesIfNecessary(height int3
return
}
nm.normalizedAt = height
fmt.Printf("Generating necessary changes for the normalization fork...\n")
log.Info("Generating necessary changes for the normalization fork...")
// the original code had an unfortunate bug where many unnecessary takeovers
// were triggered at the normalization fork

View file

@ -22,7 +22,7 @@ type Repo interface {
// IterateChildren returns change sets for each of name.+
// Return false on f to stop the iteration.
IterateChildren(name []byte, f func(changes []change.Change) bool)
IterateChildren(name []byte, f func(changes []change.Change) bool) error
// IterateAll iterates keys until the predicate function returns false
IterateAll(predicate func(name []byte) bool)

View file

@ -283,37 +283,3 @@ func generateDelayWorkarounds() map[string][]int32 {
"yay-nc-bob-afet-kamera-arkas": {657957},
}
}
var DelayWorkaroundsPart2 = generateDelayWorkaroundsPart2()
func generateDelayWorkaroundsPart2() map[string][]int32 {
return map[string][]int32{
"en-vivo-hablando-de-bitcoin-y-3": {664642},
"en-vivo-hablando-de-bitcoin-y-4": {664642},
"@gn": {752630, 755269},
"putalocura": {809590},
"@isc": {813832},
"@pnl": {864618},
"@dreamr": {875433},
"2019-10-30": {878258},
"papi-16": {884431},
"papi-4": {884431},
"papi-18": {884431},
"papi-17": {884431},
"papi-7": {884431},
"papi-3": {884431},
"papi-30": {884431},
"papi": {884431},
"papi-9": {884431},
"papi-19": {884431},
"papi-papi-2": {884431},
"papi-6": {884431},
"viaje-a-la-luna-": {887018, 887591, 888024},
"fortnite1": {900015},
"who-is-the-master-": {900787},
"thp": {923634},
"thm": {923635},
"el-presidente": {923766},
"@erikh526": {933294},
}
}

View file

@ -3,7 +3,7 @@ package temporalrepo
import (
"bytes"
"encoding/binary"
"fmt"
"github.com/pkg/errors"
"github.com/cockroachdb/pebble"
)
@ -15,13 +15,9 @@ type Pebble struct {
func NewPebble(path string) (*Pebble, error) {
db, err := pebble.Open(path, &pebble.Options{Cache: pebble.NewCache(128 << 20)})
if err != nil {
return nil, fmt.Errorf("pebble open %s, %w", path, err)
}
repo := &Pebble{db: db}
return repo, nil
return repo, errors.Wrapf(err, "unable to open %s", path)
}
func (repo *Pebble) SetNodesAt(name [][]byte, heights []int32) error {
@ -38,10 +34,10 @@ func (repo *Pebble) SetNodesAt(name [][]byte, heights []int32) error {
err := batch.Set(key.Bytes(), nil, pebble.NoSync)
if err != nil {
return fmt.Errorf("pebble set: %w", err)
return errors.Wrap(err, "in set")
}
}
return batch.Commit(pebble.NoSync)
return errors.Wrap(batch.Commit(pebble.NoSync), "in commit")
}
func (repo *Pebble) NodesAt(height int32) ([][]byte, error) {
@ -69,25 +65,17 @@ func (repo *Pebble) NodesAt(height int32) ([][]byte, error) {
names = append(names, name)
}
err := iter.Close()
if err != nil {
return nil, fmt.Errorf("pebble get: %w", err)
}
return names, nil
return names, errors.Wrap(iter.Close(), "in close")
}
func (repo *Pebble) Close() error {
err := repo.db.Flush()
if err != nil {
return fmt.Errorf("pebble fludh: %w", err)
// if we fail to close are we going to try again later?
return errors.Wrap(err, "on flush")
}
err = repo.db.Close()
if err != nil {
return fmt.Errorf("pebble close: %w", err)
}
return nil
return errors.Wrap(err, "on close")
}

14
log.go
View file

@ -13,7 +13,7 @@ import (
"github.com/btcsuite/btcd/addrmgr"
"github.com/btcsuite/btcd/blockchain"
"github.com/btcsuite/btcd/blockchain/indexers"
"github.com/btcsuite/btcd/claimtrie"
"github.com/btcsuite/btcd/claimtrie/node"
"github.com/btcsuite/btcd/connmgr"
"github.com/btcsuite/btcd/database"
"github.com/btcsuite/btcd/mempool"
@ -59,9 +59,9 @@ var (
amgrLog = backendLog.Logger("AMGR")
cmgrLog = backendLog.Logger("CMGR")
bcdbLog = backendLog.Logger("BCDB")
btcdLog = backendLog.Logger("BTCD")
btcdLog = backendLog.Logger("MAIN")
chanLog = backendLog.Logger("CHAN")
clmtLog = backendLog.Logger("CLMT")
lbryLog = backendLog.Logger("LBRY")
discLog = backendLog.Logger("DISC")
indxLog = backendLog.Logger("INDX")
minrLog = backendLog.Logger("MINR")
@ -79,7 +79,7 @@ func init() {
connmgr.UseLogger(cmgrLog)
database.UseLogger(bcdbLog)
blockchain.UseLogger(chanLog)
claimtrie.UseLogger(clmtLog)
node.UseLogger(lbryLog)
indexers.UseLogger(indxLog)
mining.UseLogger(minrLog)
cpuminer.UseLogger(minrLog)
@ -95,9 +95,9 @@ var subsystemLoggers = map[string]btclog.Logger{
"AMGR": amgrLog,
"CMGR": cmgrLog,
"BCDB": bcdbLog,
"BTCD": btcdLog,
"MAIN": btcdLog,
"CHAN": chanLog,
"CLMT": clmtLog,
"LBRY": lbryLog,
"DISC": discLog,
"INDX": indxLog,
"MINR": minrLog,
@ -119,7 +119,7 @@ func initLogRotator(logFile string) {
fmt.Fprintf(os.Stderr, "failed to create log directory: %v\n", err)
os.Exit(1)
}
r, err := rotator.New(logFile, 10*1024, false, 3)
r, err := rotator.New(logFile, 40*1024, false, 3)
if err != nil {
fmt.Fprintf(os.Stderr, "failed to create file rotator: %v\n", err)
os.Exit(1)

View file

@ -59,16 +59,15 @@ func logMemoryUsage() {
ds, err := dirSize(cfg.DataDir)
if err != nil {
btcdLog.Warnf("When reading directory: %s", err.Error())
btcdLog.Debugf("When reading directory: %s", err.Error())
continue
}
cur := fmt.Sprintf("RAM: using %.1f GB with %.1f available, DISK: using %.1f GB with %.1f available",
toGB(m2.RSS), toGB(m.Free), toGB(uint64(ds)), toGB(d.Free))
toGB(m2.RSS), toGB(m.Available), toGB(uint64(ds)), toGB(d.Free))
if cur != last {
btcdLog.Infof(cur)
last = cur
}
}
}

View file

@ -2735,19 +2735,19 @@ func newServer(listenAddrs, agentBlacklist, agentWhitelist []string,
switch cfg.ClaimTrieImpl {
case "none":
// Disable ClaimTrie for development purpose.
clmtLog.Infof("ClaimTrie is disabled")
lbryLog.Infof("ClaimTrie is disabled")
default:
ct, err = claimtrie.New(claimTrieCfg)
if err != nil {
return nil, err
}
if h := cfg.ClaimTrieHeight; h != 0 {
clmtLog.Infof("Reseting height to %d", h)
lbryLog.Infof("Reseting height to %d", h)
err := ct.ResetHeight(int32(h))
if err != nil {
return nil, err
}
clmtLog.Infof("Height is reset to %d", h)
lbryLog.Infof("Height is reset to %d", h)
}
}