[lbry] blockchain: connect to ClaimTrie
Co-authored-by: Brannon King <countprimes@gmail.com>
This commit is contained in:
parent
8e059c14d7
commit
0224bf295b
4 changed files with 307 additions and 2 deletions
|
@ -17,6 +17,8 @@ import (
|
||||||
"github.com/btcsuite/btcd/txscript"
|
"github.com/btcsuite/btcd/txscript"
|
||||||
"github.com/btcsuite/btcd/wire"
|
"github.com/btcsuite/btcd/wire"
|
||||||
"github.com/btcsuite/btcutil"
|
"github.com/btcsuite/btcutil"
|
||||||
|
|
||||||
|
"github.com/btcsuite/btcd/claimtrie"
|
||||||
)
|
)
|
||||||
|
|
||||||
const (
|
const (
|
||||||
|
@ -180,6 +182,8 @@ type BlockChain struct {
|
||||||
// certain blockchain events.
|
// certain blockchain events.
|
||||||
notificationsLock sync.RWMutex
|
notificationsLock sync.RWMutex
|
||||||
notifications []NotificationCallback
|
notifications []NotificationCallback
|
||||||
|
|
||||||
|
claimTrie *claimtrie.ClaimTrie
|
||||||
}
|
}
|
||||||
|
|
||||||
// HaveBlock returns whether or not the chain instance has the block represented
|
// HaveBlock returns whether or not the chain instance has the block represented
|
||||||
|
@ -571,7 +575,8 @@ func (b *BlockChain) connectBlock(node *blockNode, block *btcutil.Block,
|
||||||
}
|
}
|
||||||
|
|
||||||
// No warnings about unknown rules until the chain is current.
|
// No warnings about unknown rules until the chain is current.
|
||||||
if b.isCurrent() {
|
current := b.isCurrent()
|
||||||
|
if current {
|
||||||
// Warn if any unknown new rules are either about to activate or
|
// Warn if any unknown new rules are either about to activate or
|
||||||
// have already been activated.
|
// have already been activated.
|
||||||
if err := b.warnUnknownRuleActivations(node); err != nil {
|
if err := b.warnUnknownRuleActivations(node); err != nil {
|
||||||
|
@ -579,6 +584,13 @@ func (b *BlockChain) connectBlock(node *blockNode, block *btcutil.Block,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Handle LBRY Claim Scripts
|
||||||
|
if b.claimTrie != nil {
|
||||||
|
if err := b.ParseClaimScripts(block, node, view, false, current); err != nil {
|
||||||
|
return ruleError(ErrBadClaimTrie, err.Error())
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// Write any block status changes to DB before updating best state.
|
// Write any block status changes to DB before updating best state.
|
||||||
err := b.index.flushToDB()
|
err := b.index.flushToDB()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
@ -761,6 +773,10 @@ func (b *BlockChain) disconnectBlock(node *blockNode, block *btcutil.Block, view
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if err = b.claimTrie.ResetHeight(node.parent.height); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
// Prune fully spent entries and mark all entries in the view unmodified
|
// Prune fully spent entries and mark all entries in the view unmodified
|
||||||
// now that the modifications have been committed to the database.
|
// now that the modifications have been committed to the database.
|
||||||
view.commit()
|
view.commit()
|
||||||
|
@ -1614,6 +1630,11 @@ func (b *BlockChain) LocateHeaders(locator BlockLocator, hashStop *chainhash.Has
|
||||||
return headers
|
return headers
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// ClaimTrie returns the claimTrie associated wit hthe chain.
|
||||||
|
func (b *BlockChain) ClaimTrie() *claimtrie.ClaimTrie {
|
||||||
|
return b.claimTrie
|
||||||
|
}
|
||||||
|
|
||||||
// IndexManager provides a generic interface that the is called when blocks are
|
// IndexManager provides a generic interface that the is called when blocks are
|
||||||
// connected and disconnected to and from the tip of the main chain for the
|
// connected and disconnected to and from the tip of the main chain for the
|
||||||
// purpose of supporting optional indexes.
|
// purpose of supporting optional indexes.
|
||||||
|
@ -1700,6 +1721,8 @@ type Config struct {
|
||||||
// This field can be nil if the caller is not interested in using a
|
// This field can be nil if the caller is not interested in using a
|
||||||
// signature cache.
|
// signature cache.
|
||||||
HashCache *txscript.HashCache
|
HashCache *txscript.HashCache
|
||||||
|
|
||||||
|
ClaimTrie *claimtrie.ClaimTrie
|
||||||
}
|
}
|
||||||
|
|
||||||
// New returns a BlockChain instance using the provided configuration details.
|
// New returns a BlockChain instance using the provided configuration details.
|
||||||
|
@ -1754,6 +1777,7 @@ func New(config *Config) (*BlockChain, error) {
|
||||||
prevOrphans: make(map[chainhash.Hash][]*orphanBlock),
|
prevOrphans: make(map[chainhash.Hash][]*orphanBlock),
|
||||||
warningCaches: newThresholdCaches(vbNumBits),
|
warningCaches: newThresholdCaches(vbNumBits),
|
||||||
deploymentCaches: newThresholdCaches(chaincfg.DefinedDeployments),
|
deploymentCaches: newThresholdCaches(chaincfg.DefinedDeployments),
|
||||||
|
claimTrie: config.ClaimTrie,
|
||||||
}
|
}
|
||||||
|
|
||||||
// Initialize the chain state from the passed database. When the db
|
// Initialize the chain state from the passed database. When the db
|
||||||
|
@ -1796,6 +1820,14 @@ func New(config *Config) (*BlockChain, error) {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if b.claimTrie != nil {
|
||||||
|
err := rebuildMissingClaimTrieData(&b, config.Interrupt)
|
||||||
|
if err != nil {
|
||||||
|
b.claimTrie.Close()
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
bestNode := b.bestChain.Tip()
|
bestNode := b.bestChain.Tip()
|
||||||
log.Infof("Chain state (height %d, hash %v, totaltx %d, work %v)",
|
log.Infof("Chain state (height %d, hash %v, totaltx %d, work %v)",
|
||||||
bestNode.height, bestNode.hash, b.stateSnapshot.TotalTxns,
|
bestNode.height, bestNode.hash, b.stateSnapshot.TotalTxns,
|
||||||
|
@ -1803,3 +1835,63 @@ func New(config *Config) (*BlockChain, error) {
|
||||||
|
|
||||||
return &b, nil
|
return &b, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func rebuildMissingClaimTrieData(b *BlockChain, done <-chan struct{}) error {
|
||||||
|
target := b.bestChain.Height()
|
||||||
|
if b.claimTrie.Height() == target {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
if b.claimTrie.Height() > target {
|
||||||
|
return b.claimTrie.ResetHeight(target)
|
||||||
|
}
|
||||||
|
|
||||||
|
start := time.Now()
|
||||||
|
lastReport := time.Now()
|
||||||
|
// TODO: move this view inside the loop (or recreate it every 5 sec.)
|
||||||
|
// as accumulating all inputs has potential to use a huge amount of RAM
|
||||||
|
// but we need to get the spent inputs working for that to be possible
|
||||||
|
view := NewUtxoViewpoint()
|
||||||
|
for h := int32(0); h < target; h++ {
|
||||||
|
select {
|
||||||
|
case <-done:
|
||||||
|
return fmt.Errorf("rebuild unfinished at height %d", b.claimTrie.Height())
|
||||||
|
default:
|
||||||
|
}
|
||||||
|
|
||||||
|
n := b.bestChain.NodeByHeight(h + 1)
|
||||||
|
|
||||||
|
var block *btcutil.Block
|
||||||
|
err := b.db.View(func(dbTx database.Tx) error {
|
||||||
|
var err error
|
||||||
|
block, err = dbFetchBlockByNode(dbTx, n)
|
||||||
|
return err
|
||||||
|
})
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
err = view.fetchInputUtxos(b.db, block)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
err = view.connectTransactions(block, nil)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
if h >= b.claimTrie.Height() {
|
||||||
|
err = b.ParseClaimScripts(block, n, view, true, false)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if time.Since(lastReport) > time.Second*5 {
|
||||||
|
lastReport = time.Now()
|
||||||
|
log.Infof("Rebuilding claim trie data to %d. At: %d", target, h)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
log.Infof("Completed rebuilding claim trie data to %d. Took %s ",
|
||||||
|
b.claimTrie.Height(), time.Since(start))
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
168
blockchain/claimtrie.go
Normal file
168
blockchain/claimtrie.go
Normal file
|
@ -0,0 +1,168 @@
|
||||||
|
package blockchain
|
||||||
|
|
||||||
|
import (
|
||||||
|
"bytes"
|
||||||
|
"fmt"
|
||||||
|
|
||||||
|
"github.com/pkg/errors"
|
||||||
|
|
||||||
|
"github.com/btcsuite/btcd/txscript"
|
||||||
|
"github.com/btcsuite/btcd/wire"
|
||||||
|
"github.com/btcsuite/btcutil"
|
||||||
|
|
||||||
|
"github.com/btcsuite/btcd/claimtrie"
|
||||||
|
"github.com/btcsuite/btcd/claimtrie/change"
|
||||||
|
"github.com/btcsuite/btcd/claimtrie/node"
|
||||||
|
)
|
||||||
|
|
||||||
|
func (b *BlockChain) ParseClaimScripts(block *btcutil.Block, bn *blockNode, view *UtxoViewpoint,
|
||||||
|
failOnHashMiss bool, shouldFlush bool) error {
|
||||||
|
ht := block.Height()
|
||||||
|
|
||||||
|
for _, tx := range block.Transactions() {
|
||||||
|
h := handler{ht, tx, view, map[string][]byte{}}
|
||||||
|
if err := h.handleTxIns(b.claimTrie); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
if err := h.handleTxOuts(b.claimTrie); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
err := b.claimTrie.AppendBlock()
|
||||||
|
if err != nil {
|
||||||
|
return errors.Wrapf(err, "in append block")
|
||||||
|
}
|
||||||
|
|
||||||
|
if shouldFlush {
|
||||||
|
b.claimTrie.FlushToDisk()
|
||||||
|
}
|
||||||
|
|
||||||
|
hash := b.claimTrie.MerkleHash()
|
||||||
|
if bn.claimTrie != *hash {
|
||||||
|
if failOnHashMiss {
|
||||||
|
return errors.Errorf("height: %d, ct.MerkleHash: %s != node.ClaimTrie: %s", ht, *hash, bn.claimTrie)
|
||||||
|
}
|
||||||
|
node.LogOnce(fmt.Sprintf("\n\nHeight: %d, ct.MerkleHash: %s != node.ClaimTrie: %s, Error: %s", ht, *hash, bn.claimTrie, err))
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
type handler struct {
|
||||||
|
ht int32
|
||||||
|
tx *btcutil.Tx
|
||||||
|
view *UtxoViewpoint
|
||||||
|
spent map[string][]byte
|
||||||
|
}
|
||||||
|
|
||||||
|
func (h *handler) handleTxIns(ct *claimtrie.ClaimTrie) error {
|
||||||
|
if IsCoinBase(h.tx) {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
for _, txIn := range h.tx.MsgTx().TxIn {
|
||||||
|
op := txIn.PreviousOutPoint
|
||||||
|
e := h.view.LookupEntry(op)
|
||||||
|
if e == nil {
|
||||||
|
return errors.Errorf("missing input in view for %s", op.String())
|
||||||
|
}
|
||||||
|
cs, err := txscript.DecodeClaimScript(e.pkScript)
|
||||||
|
if err == txscript.ErrNotClaimScript {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
var id change.ClaimID
|
||||||
|
name := cs.Name() // name of the previous one (that we're now spending)
|
||||||
|
|
||||||
|
switch cs.Opcode() {
|
||||||
|
case txscript.OP_CLAIMNAME: // OP code from previous transaction
|
||||||
|
id = change.NewClaimID(op) // claimID of the previous item now being spent
|
||||||
|
h.spent[id.Key()] = node.NormalizeIfNecessary(name, ct.Height())
|
||||||
|
err = ct.SpendClaim(name, op, id)
|
||||||
|
case txscript.OP_UPDATECLAIM:
|
||||||
|
copy(id[:], cs.ClaimID())
|
||||||
|
h.spent[id.Key()] = node.NormalizeIfNecessary(name, ct.Height())
|
||||||
|
err = ct.SpendClaim(name, op, id)
|
||||||
|
case txscript.OP_SUPPORTCLAIM:
|
||||||
|
copy(id[:], cs.ClaimID())
|
||||||
|
err = ct.SpendSupport(name, op, id)
|
||||||
|
}
|
||||||
|
if err != nil {
|
||||||
|
return errors.Wrapf(err, "handleTxIns")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (h *handler) handleTxOuts(ct *claimtrie.ClaimTrie) error {
|
||||||
|
for i, txOut := range h.tx.MsgTx().TxOut {
|
||||||
|
op := *wire.NewOutPoint(h.tx.Hash(), uint32(i))
|
||||||
|
cs, err := txscript.DecodeClaimScript(txOut.PkScript)
|
||||||
|
if err == txscript.ErrNotClaimScript {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
var id change.ClaimID
|
||||||
|
name := cs.Name()
|
||||||
|
amt := txOut.Value
|
||||||
|
|
||||||
|
switch cs.Opcode() {
|
||||||
|
case txscript.OP_CLAIMNAME:
|
||||||
|
id = change.NewClaimID(op)
|
||||||
|
err = ct.AddClaim(name, op, id, amt)
|
||||||
|
case txscript.OP_SUPPORTCLAIM:
|
||||||
|
copy(id[:], cs.ClaimID())
|
||||||
|
err = ct.AddSupport(name, op, amt, id)
|
||||||
|
case txscript.OP_UPDATECLAIM:
|
||||||
|
// old code wouldn't run the update if name or claimID didn't match existing data
|
||||||
|
// that was a safety feature, but it should have rejected the transaction instead
|
||||||
|
// TODO: reject transactions with invalid update commands
|
||||||
|
copy(id[:], cs.ClaimID())
|
||||||
|
normName := node.NormalizeIfNecessary(name, ct.Height())
|
||||||
|
if !bytes.Equal(h.spent[id.Key()], normName) {
|
||||||
|
node.LogOnce(fmt.Sprintf("Invalid update operation: name or ID mismatch at %d for: %s, %s",
|
||||||
|
ct.Height(), normName, id.String()))
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
delete(h.spent, id.Key())
|
||||||
|
err = ct.UpdateClaim(name, op, amt, id)
|
||||||
|
}
|
||||||
|
if err != nil {
|
||||||
|
return errors.Wrapf(err, "handleTxOuts")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (b *BlockChain) GetNamesChangedInBlock(height int32) ([]string, error) {
|
||||||
|
b.chainLock.RLock()
|
||||||
|
defer b.chainLock.RUnlock()
|
||||||
|
|
||||||
|
return b.claimTrie.NamesChangedInBlock(height)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (b *BlockChain) GetClaimsForName(height int32, name string) (string, *node.Node, error) {
|
||||||
|
|
||||||
|
normalizedName := node.NormalizeIfNecessary([]byte(name), height)
|
||||||
|
|
||||||
|
b.chainLock.RLock()
|
||||||
|
defer b.chainLock.RUnlock()
|
||||||
|
|
||||||
|
n, err := b.claimTrie.NodeAt(height, normalizedName)
|
||||||
|
if err != nil {
|
||||||
|
return string(normalizedName), nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
if n == nil {
|
||||||
|
return string(normalizedName), nil, fmt.Errorf("name does not exist at height %d: %s", height, name)
|
||||||
|
}
|
||||||
|
|
||||||
|
n.SortClaimsByBid()
|
||||||
|
return string(normalizedName), n, nil
|
||||||
|
}
|
7
btcd.go
7
btcd.go
|
@ -16,6 +16,7 @@ import (
|
||||||
"runtime/pprof"
|
"runtime/pprof"
|
||||||
|
|
||||||
"github.com/btcsuite/btcd/blockchain/indexers"
|
"github.com/btcsuite/btcd/blockchain/indexers"
|
||||||
|
"github.com/btcsuite/btcd/claimtrie/param"
|
||||||
"github.com/btcsuite/btcd/database"
|
"github.com/btcsuite/btcd/database"
|
||||||
"github.com/btcsuite/btcd/limits"
|
"github.com/btcsuite/btcd/limits"
|
||||||
|
|
||||||
|
@ -147,6 +148,8 @@ func btcdMain(serverChan chan<- *server) error {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
param.SetNetwork(activeNetParams.Params.Net) // prep the claimtrie params
|
||||||
|
|
||||||
// Create server and start it.
|
// Create server and start it.
|
||||||
server, err := newServer(cfg.Listeners, cfg.AgentBlacklist,
|
server, err := newServer(cfg.Listeners, cfg.AgentBlacklist,
|
||||||
cfg.AgentWhitelist, db, activeNetParams.Params, interrupt)
|
cfg.AgentWhitelist, db, activeNetParams.Params, interrupt)
|
||||||
|
@ -161,6 +164,10 @@ func btcdMain(serverChan chan<- *server) error {
|
||||||
server.Stop()
|
server.Stop()
|
||||||
server.WaitForShutdown()
|
server.WaitForShutdown()
|
||||||
srvrLog.Infof("Server shutdown complete")
|
srvrLog.Infof("Server shutdown complete")
|
||||||
|
// TODO: tie into the sync manager for shutdown instead
|
||||||
|
if ct := server.chain.ClaimTrie(); ct != nil {
|
||||||
|
ct.Close()
|
||||||
|
}
|
||||||
}()
|
}()
|
||||||
server.Start()
|
server.Start()
|
||||||
if serverChan != nil {
|
if serverChan != nil {
|
||||||
|
|
40
server.go
40
server.go
|
@ -27,6 +27,8 @@ import (
|
||||||
"github.com/btcsuite/btcd/blockchain/indexers"
|
"github.com/btcsuite/btcd/blockchain/indexers"
|
||||||
"github.com/btcsuite/btcd/chaincfg"
|
"github.com/btcsuite/btcd/chaincfg"
|
||||||
"github.com/btcsuite/btcd/chaincfg/chainhash"
|
"github.com/btcsuite/btcd/chaincfg/chainhash"
|
||||||
|
"github.com/btcsuite/btcd/claimtrie"
|
||||||
|
claimtrieconfig "github.com/btcsuite/btcd/claimtrie/config"
|
||||||
"github.com/btcsuite/btcd/connmgr"
|
"github.com/btcsuite/btcd/connmgr"
|
||||||
"github.com/btcsuite/btcd/database"
|
"github.com/btcsuite/btcd/database"
|
||||||
"github.com/btcsuite/btcd/mempool"
|
"github.com/btcsuite/btcd/mempool"
|
||||||
|
@ -2721,8 +2723,43 @@ func newServer(listenAddrs, agentBlacklist, agentWhitelist []string,
|
||||||
checkpoints = mergeCheckpoints(s.chainParams.Checkpoints, cfg.addCheckpoints)
|
checkpoints = mergeCheckpoints(s.chainParams.Checkpoints, cfg.addCheckpoints)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Create a new block chain instance with the appropriate configuration.
|
|
||||||
var err error
|
var err error
|
||||||
|
|
||||||
|
claimTrieCfg := claimtrieconfig.DefaultConfig
|
||||||
|
claimTrieCfg.DataDir = cfg.DataDir
|
||||||
|
|
||||||
|
var ct *claimtrie.ClaimTrie
|
||||||
|
|
||||||
|
switch cfg.ClaimTrieImpl {
|
||||||
|
case "none":
|
||||||
|
// Disable ClaimTrie for development purpose.
|
||||||
|
lbryLog.Infof("ClaimTrie is disabled")
|
||||||
|
case "persistent":
|
||||||
|
claimTrieCfg.RamTrie = false
|
||||||
|
lbryLog.Infof("ClaimTrie uses Persistent implementation")
|
||||||
|
case "ram", "":
|
||||||
|
claimTrieCfg.RamTrie = true
|
||||||
|
lbryLog.Infof("ClaimTrie uses RamTrie implementation")
|
||||||
|
default:
|
||||||
|
lbryLog.Errorf("ClaimTrie uses Unknown implementation")
|
||||||
|
}
|
||||||
|
|
||||||
|
if cfg.ClaimTrieImpl != "none" {
|
||||||
|
ct, err = claimtrie.New(claimTrieCfg)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
if h := cfg.ClaimTrieHeight; h != 0 {
|
||||||
|
lbryLog.Infof("Reseting claim trie height to %d", h)
|
||||||
|
err := ct.ResetHeight(int32(h))
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
lbryLog.Infof("Claim trie height is reset to %d", h)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Create a new block chain instance with the appropriate configuration.
|
||||||
s.chain, err = blockchain.New(&blockchain.Config{
|
s.chain, err = blockchain.New(&blockchain.Config{
|
||||||
DB: s.db,
|
DB: s.db,
|
||||||
Interrupt: interrupt,
|
Interrupt: interrupt,
|
||||||
|
@ -2732,6 +2769,7 @@ func newServer(listenAddrs, agentBlacklist, agentWhitelist []string,
|
||||||
SigCache: s.sigCache,
|
SigCache: s.sigCache,
|
||||||
IndexManager: indexManager,
|
IndexManager: indexManager,
|
||||||
HashCache: s.hashCache,
|
HashCache: s.hashCache,
|
||||||
|
ClaimTrie: ct,
|
||||||
})
|
})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
|
|
Loading…
Reference in a new issue