From 46c237dbcfa048f5b6ccdf1af357afdb7085051a Mon Sep 17 00:00:00 2001 From: Roy Lee Date: Sun, 5 Aug 2018 13:59:25 -0700 Subject: [PATCH] [lbry] blockchain: connect to ClaimTrie Co-authored-by: Brannon King --- blockchain/chain.go | 96 ++++++++++++++++++++- blockchain/claimtrie.go | 183 ++++++++++++++++++++++++++++++++++++++++ btcd.go | 7 ++ server.go | 40 ++++++++- 4 files changed, 324 insertions(+), 2 deletions(-) create mode 100644 blockchain/claimtrie.go diff --git a/blockchain/chain.go b/blockchain/chain.go index b4a871b9..06cc7a76 100644 --- a/blockchain/chain.go +++ b/blockchain/chain.go @@ -17,6 +17,8 @@ import ( "github.com/btcsuite/btcd/txscript" "github.com/btcsuite/btcd/wire" "github.com/btcsuite/btcutil" + + "github.com/btcsuite/btcd/claimtrie" ) const ( @@ -180,6 +182,8 @@ type BlockChain struct { // certain blockchain events. notificationsLock sync.RWMutex notifications []NotificationCallback + + claimTrie *claimtrie.ClaimTrie } // HaveBlock returns whether or not the chain instance has the block represented @@ -571,7 +575,8 @@ func (b *BlockChain) connectBlock(node *blockNode, block *btcutil.Block, } // No warnings about unknown rules until the chain is current. - if b.isCurrent() { + current := b.isCurrent() + if current { // Warn if any unknown new rules are either about to activate or // have already been activated. if err := b.warnUnknownRuleActivations(node); err != nil { @@ -579,6 +584,13 @@ func (b *BlockChain) connectBlock(node *blockNode, block *btcutil.Block, } } + // Handle LBRY Claim Scripts + if b.claimTrie != nil { + if err := b.ParseClaimScripts(block, node, view, current); err != nil { + return ruleError(ErrBadClaimTrie, err.Error()) + } + } + // Write any block status changes to DB before updating best state. err := b.index.flushToDB() if err != nil { @@ -761,6 +773,12 @@ func (b *BlockChain) disconnectBlock(node *blockNode, block *btcutil.Block, view return err } + if b.claimTrie != nil { + if err = b.claimTrie.ResetHeight(node.parent.height); err != nil { + return err + } + } + // Prune fully spent entries and mark all entries in the view unmodified // now that the modifications have been committed to the database. view.commit() @@ -1614,6 +1632,11 @@ func (b *BlockChain) LocateHeaders(locator BlockLocator, hashStop *chainhash.Has return headers } +// ClaimTrie returns the claimTrie associated wit hthe chain. +func (b *BlockChain) ClaimTrie() *claimtrie.ClaimTrie { + return b.claimTrie +} + // IndexManager provides a generic interface that the is called when blocks are // connected and disconnected to and from the tip of the main chain for the // purpose of supporting optional indexes. @@ -1700,6 +1723,8 @@ type Config struct { // This field can be nil if the caller is not interested in using a // signature cache. HashCache *txscript.HashCache + + ClaimTrie *claimtrie.ClaimTrie } // New returns a BlockChain instance using the provided configuration details. @@ -1754,6 +1779,7 @@ func New(config *Config) (*BlockChain, error) { prevOrphans: make(map[chainhash.Hash][]*orphanBlock), warningCaches: newThresholdCaches(vbNumBits), deploymentCaches: newThresholdCaches(chaincfg.DefinedDeployments), + claimTrie: config.ClaimTrie, } // Initialize the chain state from the passed database. When the db @@ -1796,6 +1822,14 @@ func New(config *Config) (*BlockChain, error) { return nil, err } + if b.claimTrie != nil { + err := rebuildMissingClaimTrieData(&b, config.Interrupt) + if err != nil { + b.claimTrie.Close() + return nil, err + } + } + bestNode := b.bestChain.Tip() log.Infof("Chain state (height %d, hash %v, totaltx %d, work %v)", bestNode.height, bestNode.hash, b.stateSnapshot.TotalTxns, @@ -1803,3 +1837,63 @@ func New(config *Config) (*BlockChain, error) { return &b, nil } + +func rebuildMissingClaimTrieData(b *BlockChain, done <-chan struct{}) error { + target := b.bestChain.Height() + if b.claimTrie.Height() == target { + return nil + } + if b.claimTrie.Height() > target { + return b.claimTrie.ResetHeight(target) + } + + start := time.Now() + lastReport := time.Now() + // TODO: move this view inside the loop (or recreate it every 5 sec.) + // as accumulating all inputs has potential to use a huge amount of RAM + // but we need to get the spent inputs working for that to be possible + view := NewUtxoViewpoint() + for h := int32(0); h < target; h++ { + select { + case <-done: + return fmt.Errorf("rebuild unfinished at height %d", b.claimTrie.Height()) + default: + } + + n := b.bestChain.NodeByHeight(h + 1) + + var block *btcutil.Block + err := b.db.View(func(dbTx database.Tx) error { + var err error + block, err = dbFetchBlockByNode(dbTx, n) + return err + }) + if err != nil { + return err + } + + err = view.fetchInputUtxos(b.db, block) + if err != nil { + return err + } + + err = view.connectTransactions(block, nil) + if err != nil { + return err + } + + if h >= b.claimTrie.Height() { + err = b.ParseClaimScripts(block, n, view, false) + if err != nil { + return err + } + } + if time.Since(lastReport) > time.Second*5 { + lastReport = time.Now() + log.Infof("Rebuilding claim trie data to %d. At: %d", target, h) + } + } + log.Infof("Completed rebuilding claim trie data to %d. Took %s ", + b.claimTrie.Height(), time.Since(start)) + return nil +} diff --git a/blockchain/claimtrie.go b/blockchain/claimtrie.go new file mode 100644 index 00000000..1032860d --- /dev/null +++ b/blockchain/claimtrie.go @@ -0,0 +1,183 @@ +package blockchain + +import ( + "bytes" + "fmt" + + "github.com/pkg/errors" + + "github.com/btcsuite/btcd/txscript" + "github.com/btcsuite/btcd/wire" + "github.com/btcsuite/btcutil" + + "github.com/btcsuite/btcd/claimtrie" + "github.com/btcsuite/btcd/claimtrie/change" + "github.com/btcsuite/btcd/claimtrie/node" + "github.com/btcsuite/btcd/claimtrie/normalization" +) + +func (b *BlockChain) SetClaimtrieHeader(block *btcutil.Block, view *UtxoViewpoint) error { + b.chainLock.Lock() + defer b.chainLock.Unlock() + + err := b.ParseClaimScripts(block, nil, view, false) + if err != nil { + return errors.Wrapf(err, "in parse claim scripts") + } + + block.MsgBlock().Header.ClaimTrie = *b.claimTrie.MerkleHash() + err = b.claimTrie.ResetHeight(b.claimTrie.Height() - 1) + + return errors.Wrapf(err, "in reset height") +} + +func (b *BlockChain) ParseClaimScripts(block *btcutil.Block, bn *blockNode, view *UtxoViewpoint, shouldFlush bool) error { + ht := block.Height() + + for _, tx := range block.Transactions() { + h := handler{ht, tx, view, map[string][]byte{}} + if err := h.handleTxIns(b.claimTrie); err != nil { + return err + } + if err := h.handleTxOuts(b.claimTrie); err != nil { + return err + } + } + + err := b.claimTrie.AppendBlock() + if err != nil { + return errors.Wrapf(err, "in append block") + } + + if shouldFlush { + b.claimTrie.FlushToDisk() + } + + hash := b.claimTrie.MerkleHash() + if bn != nil && bn.claimTrie != *hash { + // undo our AppendBlock call as we've decided that our interpretation of the block data is incorrect, + // or that the person who made the block assembled the pieces incorrectly. + _ = b.claimTrie.ResetHeight(b.claimTrie.Height() - 1) + return errors.Errorf("height: %d, computed hash: %s != header's ClaimTrie: %s", ht, *hash, bn.claimTrie) + } + return nil +} + +type handler struct { + ht int32 + tx *btcutil.Tx + view *UtxoViewpoint + spent map[string][]byte +} + +func (h *handler) handleTxIns(ct *claimtrie.ClaimTrie) error { + if IsCoinBase(h.tx) { + return nil + } + for _, txIn := range h.tx.MsgTx().TxIn { + op := txIn.PreviousOutPoint + e := h.view.LookupEntry(op) + if e == nil { + return errors.Errorf("missing input in view for %s", op.String()) + } + cs, err := txscript.DecodeClaimScript(e.pkScript) + if err == txscript.ErrNotClaimScript { + continue + } + if err != nil { + return err + } + + var id change.ClaimID + name := cs.Name() // name of the previous one (that we're now spending) + + switch cs.Opcode() { + case txscript.OP_CLAIMNAME: // OP code from previous transaction + id = change.NewClaimID(op) // claimID of the previous item now being spent + h.spent[id.Key()] = normalization.NormalizeIfNecessary(name, ct.Height()) + err = ct.SpendClaim(name, op, id) + case txscript.OP_UPDATECLAIM: + copy(id[:], cs.ClaimID()) + h.spent[id.Key()] = normalization.NormalizeIfNecessary(name, ct.Height()) + err = ct.SpendClaim(name, op, id) + case txscript.OP_SUPPORTCLAIM: + copy(id[:], cs.ClaimID()) + err = ct.SpendSupport(name, op, id) + } + if err != nil { + return errors.Wrapf(err, "handleTxIns") + } + } + return nil +} + +func (h *handler) handleTxOuts(ct *claimtrie.ClaimTrie) error { + for i, txOut := range h.tx.MsgTx().TxOut { + op := *wire.NewOutPoint(h.tx.Hash(), uint32(i)) + cs, err := txscript.DecodeClaimScript(txOut.PkScript) + if err == txscript.ErrNotClaimScript { + continue + } + if err != nil { + return err + } + + var id change.ClaimID + name := cs.Name() + amt := txOut.Value + + switch cs.Opcode() { + case txscript.OP_CLAIMNAME: + id = change.NewClaimID(op) + err = ct.AddClaim(name, op, id, amt) + case txscript.OP_SUPPORTCLAIM: + copy(id[:], cs.ClaimID()) + err = ct.AddSupport(name, op, amt, id) + case txscript.OP_UPDATECLAIM: + // old code wouldn't run the update if name or claimID didn't match existing data + // that was a safety feature, but it should have rejected the transaction instead + // TODO: reject transactions with invalid update commands + copy(id[:], cs.ClaimID()) + normName := normalization.NormalizeIfNecessary(name, ct.Height()) + if !bytes.Equal(h.spent[id.Key()], normName) { + node.LogOnce(fmt.Sprintf("Invalid update operation: name or ID mismatch at %d for: %s, %s", + ct.Height(), normName, id.String())) + continue + } + + delete(h.spent, id.Key()) + err = ct.UpdateClaim(name, op, amt, id) + } + if err != nil { + return errors.Wrapf(err, "handleTxOuts") + } + } + return nil +} + +func (b *BlockChain) GetNamesChangedInBlock(height int32) ([]string, error) { + b.chainLock.RLock() + defer b.chainLock.RUnlock() + + return b.claimTrie.NamesChangedInBlock(height) +} + +func (b *BlockChain) GetClaimsForName(height int32, name string) (string, *node.Node, error) { + + normalizedName := normalization.NormalizeIfNecessary([]byte(name), height) + + b.chainLock.RLock() + defer b.chainLock.RUnlock() + + n, err := b.claimTrie.NodeAt(height, normalizedName) + if err != nil { + return string(normalizedName), nil, err + } + + if n == nil { + return string(normalizedName), nil, fmt.Errorf("name does not exist at height %d: %s", height, name) + } + + n.SortClaimsByBid() + return string(normalizedName), n, nil +} diff --git a/btcd.go b/btcd.go index b93851ba..4f04657b 100644 --- a/btcd.go +++ b/btcd.go @@ -16,6 +16,7 @@ import ( "runtime/pprof" "github.com/btcsuite/btcd/blockchain/indexers" + "github.com/btcsuite/btcd/claimtrie/param" "github.com/btcsuite/btcd/database" "github.com/btcsuite/btcd/limits" @@ -147,6 +148,8 @@ func btcdMain(serverChan chan<- *server) error { return nil } + param.SetNetwork(activeNetParams.Params.Net) // prep the claimtrie params + // Create server and start it. server, err := newServer(cfg.Listeners, cfg.AgentBlacklist, cfg.AgentWhitelist, db, activeNetParams.Params, interrupt) @@ -161,6 +164,10 @@ func btcdMain(serverChan chan<- *server) error { server.Stop() server.WaitForShutdown() srvrLog.Infof("Server shutdown complete") + // TODO: tie into the sync manager for shutdown instead + if ct := server.chain.ClaimTrie(); ct != nil { + ct.Close() + } }() server.Start() if serverChan != nil { diff --git a/server.go b/server.go index 7512b989..5a55a2d4 100644 --- a/server.go +++ b/server.go @@ -27,6 +27,8 @@ import ( "github.com/btcsuite/btcd/blockchain/indexers" "github.com/btcsuite/btcd/chaincfg" "github.com/btcsuite/btcd/chaincfg/chainhash" + "github.com/btcsuite/btcd/claimtrie" + claimtrieconfig "github.com/btcsuite/btcd/claimtrie/config" "github.com/btcsuite/btcd/connmgr" "github.com/btcsuite/btcd/database" "github.com/btcsuite/btcd/mempool" @@ -2721,8 +2723,43 @@ func newServer(listenAddrs, agentBlacklist, agentWhitelist []string, checkpoints = mergeCheckpoints(s.chainParams.Checkpoints, cfg.addCheckpoints) } - // Create a new block chain instance with the appropriate configuration. var err error + + claimTrieCfg := claimtrieconfig.DefaultConfig + claimTrieCfg.DataDir = cfg.DataDir + + var ct *claimtrie.ClaimTrie + + switch cfg.ClaimTrieImpl { + case "none": + // Disable ClaimTrie for development purpose. + lbryLog.Infof("ClaimTrie is disabled") + case "persistent": + claimTrieCfg.RamTrie = false + lbryLog.Infof("ClaimTrie uses Persistent implementation") + case "ram", "": + claimTrieCfg.RamTrie = true + lbryLog.Infof("ClaimTrie uses RamTrie implementation") + default: + lbryLog.Errorf("ClaimTrie uses Unknown implementation") + } + + if cfg.ClaimTrieImpl != "none" { + ct, err = claimtrie.New(claimTrieCfg) + if err != nil { + return nil, err + } + if h := cfg.ClaimTrieHeight; h != 0 { + lbryLog.Infof("Reseting claim trie height to %d", h) + err := ct.ResetHeight(int32(h)) + if err != nil { + return nil, err + } + lbryLog.Infof("Claim trie height is reset to %d", h) + } + } + + // Create a new block chain instance with the appropriate configuration. s.chain, err = blockchain.New(&blockchain.Config{ DB: s.db, Interrupt: interrupt, @@ -2732,6 +2769,7 @@ func newServer(listenAddrs, agentBlacklist, agentWhitelist []string, SigCache: s.sigCache, IndexManager: indexManager, HashCache: s.hashCache, + ClaimTrie: ct, }) if err != nil { return nil, err