wip: commits, nodes, and trie are all backed with storage.

This commit is contained in:
Tzu-Jung Lee 2018-08-05 17:43:38 -07:00
parent 4651558b98
commit a7fe59dd49
14 changed files with 729 additions and 637 deletions

41
cfg/cfg.go Normal file
View file

@ -0,0 +1,41 @@
package cfg
import (
"path/filepath"
"github.com/btcsuite/btcutil"
)
// Index ...
type Index int
// ...
const (
TrieDB Index = 1 << iota
CommitDB
NodeDB
ClaimScriptDB
)
var (
defaultHomeDir = btcutil.AppDataDir("lbrycrd.go", false)
defaultDataDir = filepath.Join(defaultHomeDir, "data")
)
// Config ...
type Config struct {
path string
}
var datastores = map[Index]string{
ClaimScriptDB: "cs.db", // Exported from BTCD
CommitDB: "commit.db",
TrieDB: "trie.db",
NodeDB: "nm.db",
}
// DefaultConfig ...
func DefaultConfig(idx Index) string {
return filepath.Join(defaultDataDir, datastores[idx])
}

65
change/chg.go Normal file
View file

@ -0,0 +1,65 @@
package change
import (
"fmt"
"github.com/lbryio/claimtrie/claim"
)
// Cmd defines the type of Change.
type Cmd int
// The list of command currently supported.
const (
AddClaim Cmd = 1 << iota
SpendClaim
UpdateClaim
AddSupport
SpendSupport
)
var names = map[Cmd]string{
AddClaim: "+C",
SpendClaim: "-C",
UpdateClaim: "+U",
AddSupport: "+S",
SpendSupport: "-S",
}
// Change represent a record of changes to the node of Name at Height.
type Change struct {
Height claim.Height
Cmd Cmd
Name string
OP claim.OutPoint
Amt claim.Amount
ID claim.ID
Value []byte
}
func (c Change) String() string {
return fmt.Sprintf("%6d %s %s %s %12d [%s]", c.Height, names[c.Cmd], c.OP, c.ID, c.Amt, c.Name)
}
// New returns a Change initialized with Cmd.
func New(cmd Cmd) *Change {
return &Change{Cmd: cmd}
}
// SetName sets name to the Change.
func (c *Change) SetName(name string) *Change { c.Name = name; return c }
// SetHeight sets height to the Change.
func (c *Change) SetHeight(h claim.Height) *Change { c.Height = h; return c }
// SetOP sets OP to the Change.
func (c *Change) SetOP(op claim.OutPoint) *Change { c.OP = op; return c }
// SetAmt sets amt to the Change.
func (c *Change) SetAmt(amt claim.Amount) *Change { c.Amt = amt; return c }
// SetID sets id to the Change.
func (c *Change) SetID(id claim.ID) *Change { c.ID = id; return c }
// SetValue sets value to the Change.
func (c *Change) SetValue(v []byte) *Change { c.Value = v; return c }

93
change/list.go Normal file
View file

@ -0,0 +1,93 @@
package change
import (
"bytes"
"encoding/gob"
"fmt"
"github.com/lbryio/claimtrie/claim"
"github.com/pkg/errors"
"github.com/syndtr/goleveldb/leveldb"
)
// List ...
type List struct {
db *leveldb.DB
name string
chgs []*Change
err error
}
// NewChangeList ...
func NewChangeList(db *leveldb.DB, name string) *List {
return &List{db: db, name: name}
}
// Changes returns the Changes in the list.
func (cl *List) Changes() []*Change {
return cl.chgs
}
// Load loads Changes from database.
func (cl *List) Load() *List {
if cl.err == nil {
cl.chgs, cl.err = loadChanges(cl.db, cl.name)
}
return cl
}
// Save saves Changes to database.
func (cl *List) Save() *List {
if cl.err == nil {
cl.err = saveChanges(cl.db, cl.name, cl.chgs)
}
return cl
}
// Append appenda a Change to the Changes in the list.
func (cl *List) Append(chg *Change) *List {
cl.chgs = append(cl.chgs, chg)
return cl
}
// Truncate truncates Changes that has Heiht larger than ht.
func (cl *List) Truncate(ht claim.Height) *List {
for i, chg := range cl.chgs {
if chg.Height > ht {
cl.chgs = cl.chgs[:i]
break
}
}
return cl
}
// Dump prints out the Changes in the list. (Debugging only.)
func (cl *List) Dump() *List {
for i, chg := range cl.chgs {
fmt.Printf("chgs[%d] %s\n", i, chg)
}
return cl
}
func loadChanges(db *leveldb.DB, name string) ([]*Change, error) {
data, err := db.Get([]byte(name), nil)
if err == leveldb.ErrNotFound {
return nil, nil
} else if err != nil {
return nil, errors.Wrapf(err, "db.Get(%s)", name)
}
var chgs []*Change
if err = gob.NewDecoder(bytes.NewBuffer(data)).Decode(&chgs); err != nil {
return nil, errors.Wrapf(err, "gob.Decode(&blk)")
}
return chgs, nil
}
func saveChanges(db *leveldb.DB, name string, chgs []*Change) error {
buf := bytes.NewBuffer(nil)
if err := gob.NewEncoder(buf).Encode(&chgs); err != nil {
return errors.Wrapf(err, "gob.Decode(&blk)")
}
return errors.Wrapf(db.Put([]byte(name), buf.Bytes(), nil), "db.put(%s, buf)", name)
}

View file

@ -35,8 +35,8 @@ type Claim struct {
func (c *Claim) setOutPoint(op OutPoint) *Claim { c.OutPoint = op; return c } func (c *Claim) setOutPoint(op OutPoint) *Claim { c.OutPoint = op; return c }
func (c *Claim) setID(id ID) *Claim { c.ID = id; return c } func (c *Claim) setID(id ID) *Claim { c.ID = id; return c }
func (c *Claim) setAmt(amt Amount) *Claim { c.Amt = amt; return c } func (c *Claim) setAmt(amt Amount) *Claim { c.Amt = amt; return c }
func (c *Claim) setAccepted(h Height) *Claim { c.Accepted = h; return c } func (c *Claim) setAccepted(ht Height) *Claim { c.Accepted = ht; return c }
func (c *Claim) setActiveAt(h Height) *Claim { c.ActiveAt = h; return c } func (c *Claim) setActiveAt(ht Height) *Claim { c.ActiveAt = ht; return c }
func (c *Claim) String() string { return claimToString(c) } func (c *Claim) String() string { return claimToString(c) }
func (c *Claim) expireAt() Height { func (c *Claim) expireAt() Height {
@ -46,8 +46,8 @@ func (c *Claim) expireAt() Height {
return c.Accepted + paramOriginalClaimExpirationTime return c.Accepted + paramOriginalClaimExpirationTime
} }
func isActiveAt(c *Claim, h Height) bool { func isActiveAt(c *Claim, ht Height) bool {
return c != nil && c.ActiveAt <= h && c.expireAt() > h return c != nil && c.ActiveAt <= ht && c.expireAt() > ht
} }
func equal(a, b *Claim) bool { func equal(a, b *Claim) bool {

View file

@ -1,7 +1,6 @@
package claim package claim
import ( import (
"fmt"
"math" "math"
"github.com/btcsuite/btcd/chaincfg/chainhash" "github.com/btcsuite/btcd/chaincfg/chainhash"
@ -22,8 +21,6 @@ type Node struct {
// refer to updateClaim. // refer to updateClaim.
removed list removed list
records []*Cmd
} }
// NewNode returns a new Node. // NewNode returns a new Node.
@ -41,36 +38,11 @@ func (n *Node) BestClaim() *Claim {
return n.best return n.best
} }
// AddClaim adds a claim to the node. // AddClaim adds a Claim to the Node.
func (n *Node) AddClaim(op OutPoint, amt Amount) error { func (n *Node) AddClaim(op OutPoint, amt Amount) error {
return n.execute(n.record(CmdAddClaim, op, amt, ID{}))
}
// SpendClaim spends a claim in the node.
func (n *Node) SpendClaim(op OutPoint) error {
return n.execute(n.record(CmdSpendClaim, op, 0, ID{}))
}
// UpdateClaim updates a claim in the node.
func (n *Node) UpdateClaim(op OutPoint, amt Amount, id ID) error {
return n.execute(n.record(CmdUpdateClaim, op, amt, id))
}
// AddSupport adds a support in the node.
func (n *Node) AddSupport(op OutPoint, amt Amount, id ID) error {
return n.execute(n.record(CmdAddSupport, op, amt, id))
}
// SpendSupport spends a spport in the node.
func (n *Node) SpendSupport(op OutPoint) error {
return n.execute(n.record(CmdSpendSupport, op, 0, ID{}))
}
func (n *Node) addClaim(op OutPoint, amt Amount) error {
if find(byOP(op), n.claims, n.supports) != nil { if find(byOP(op), n.claims, n.supports) != nil {
return ErrDuplicate return ErrDuplicate
} }
accepted := n.height + 1 accepted := n.height + 1
c := New(op, amt).setID(NewID(op)).setAccepted(accepted) c := New(op, amt).setID(NewID(op)).setAccepted(accepted)
c.setActiveAt(accepted + calDelay(accepted, n.tookover)) c.setActiveAt(accepted + calDelay(accepted, n.tookover))
@ -82,7 +54,8 @@ func (n *Node) addClaim(op OutPoint, amt Amount) error {
return nil return nil
} }
func (n *Node) spendClaim(op OutPoint) error { // SpendClaim spends a Claim in the Node.
func (n *Node) SpendClaim(op OutPoint) error {
var c *Claim var c *Claim
if n.claims, c = remove(n.claims, byOP(op)); c == nil { if n.claims, c = remove(n.claims, byOP(op)); c == nil {
return ErrNotFound return ErrNotFound
@ -91,6 +64,7 @@ func (n *Node) spendClaim(op OutPoint) error {
return nil return nil
} }
// UpdateClaim updates a Claim in the Node.
// A claim update is composed of two separate commands (2 & 3 below). // A claim update is composed of two separate commands (2 & 3 below).
// //
// (1) blk 500: Add Claim (opA, amtA, NewID(opA) // (1) blk 500: Add Claim (opA, amtA, NewID(opA)
@ -100,7 +74,7 @@ func (n *Node) spendClaim(op OutPoint) error {
// //
// For each block, all the spent claims are kept in n.removed until committed. // For each block, all the spent claims are kept in n.removed until committed.
// The paired (spend, update) commands has to happen in the same trasaction. // The paired (spend, update) commands has to happen in the same trasaction.
func (n *Node) updateClaim(op OutPoint, amt Amount, id ID) error { func (n *Node) UpdateClaim(op OutPoint, amt Amount, id ID) error {
if find(byOP(op), n.claims, n.supports) != nil { if find(byOP(op), n.claims, n.supports) != nil {
return ErrDuplicate return ErrDuplicate
} }
@ -119,14 +93,15 @@ func (n *Node) updateClaim(op OutPoint, amt Amount, id ID) error {
return nil return nil
} }
func (n *Node) addSupport(op OutPoint, amt Amount, id ID) error { // AddSupport adds a Support to the Node.
func (n *Node) AddSupport(op OutPoint, amt Amount, id ID) error {
if find(byOP(op), n.claims, n.supports) != nil { if find(byOP(op), n.claims, n.supports) != nil {
return ErrDuplicate return ErrDuplicate
} }
// Accepted by rules. No effects on bidding result though. // Accepted by rules. No effects on bidding result though.
// It may be spent later. // It may be spent later.
if find(byID(id), n.claims, n.removed) == nil { if find(byID(id), n.claims, n.removed) == nil {
fmt.Printf("INFO: can't find suooported claim ID: %s\n", id) // fmt.Printf("INFO: can't find suooported claim ID: %s for %s\n", id, n.name)
} }
accepted := n.height + 1 accepted := n.height + 1
@ -139,7 +114,8 @@ func (n *Node) addSupport(op OutPoint, amt Amount, id ID) error {
return nil return nil
} }
func (n *Node) spendSupport(op OutPoint) error { // SpendSupport spends a support in the Node.
func (n *Node) SpendSupport(op OutPoint) error {
var s *Claim var s *Claim
if n.supports, s = remove(n.supports, byOP(op)); s != nil { if n.supports, s = remove(n.supports, byOP(op)); s != nil {
return nil return nil
@ -147,6 +123,26 @@ func (n *Node) spendSupport(op OutPoint) error {
return ErrNotFound return ErrNotFound
} }
// AdjustTo increments current height until it reaches the specific height.
func (n *Node) AdjustTo(ht Height) *Node {
if ht <= n.height {
return n
}
for n.height < ht {
n.height++
n.bid()
next := n.NextUpdate()
if next > ht || next == n.height {
n.height = ht
break
}
n.height = next
n.bid()
}
n.bid()
return n
}
// NextUpdate returns the height at which pending updates should happen. // NextUpdate returns the height at which pending updates should happen.
// When no pending updates exist, current height is returned. // When no pending updates exist, current height is returned.
func (n *Node) NextUpdate() Height { func (n *Node) NextUpdate() Height {
@ -191,15 +187,15 @@ func (n *Node) bid() {
n.removed = nil n.removed = nil
} }
func updateEffectiveAmounts(h Height, claims, supports list) { func updateEffectiveAmounts(ht Height, claims, supports list) {
for _, c := range claims { for _, c := range claims {
c.EffAmt = 0 c.EffAmt = 0
if !isActiveAt(c, h) { if !isActiveAt(c, ht) {
continue continue
} }
c.EffAmt = c.Amt c.EffAmt = c.Amt
for _, s := range supports { for _, s := range supports {
if !isActiveAt(s, h) || s.ID != c.ID { if !isActiveAt(s, ht) || s.ID != c.ID {
continue continue
} }
c.EffAmt += s.Amt c.EffAmt += s.Amt
@ -215,11 +211,11 @@ func updateActiveHeights(n *Node, lists ...list) {
} }
} }
func findCandiadte(h Height, claims list) *Claim { func findCandiadte(ht Height, claims list) *Claim {
var c *Claim var c *Claim
for _, v := range claims { for _, v := range claims {
switch { switch {
case !isActiveAt(v, h): case !isActiveAt(v, ht):
continue continue
case c == nil: case c == nil:
c = v c = v

View file

@ -1,135 +0,0 @@
package claim
import (
"fmt"
"github.com/pkg/errors"
)
type cmd int
// ...
const (
CmdAddClaim cmd = 1 << iota
CmdSpendClaim
CmdUpdateClaim
CmdAddSupport
CmdSpendSupport
)
var cmdName = map[cmd]string{
CmdAddClaim: "+C",
CmdSpendClaim: "-C",
CmdUpdateClaim: "+U",
CmdAddSupport: "+S",
CmdSpendSupport: "-S",
}
// Cmd ...
type Cmd struct {
Height Height
Cmd cmd
Name string
OP OutPoint
Amt Amount
ID ID
Value []byte
}
func (c Cmd) String() string {
return fmt.Sprintf("%6d %s %s %s %12d [%s]", c.Height, cmdName[c.Cmd], c.OP, c.ID, c.Amt, c.Name)
}
func (n *Node) record(c cmd, op OutPoint, amt Amount, id ID) *Cmd {
r := &Cmd{Height: n.height + 1, Name: n.name, Cmd: c, OP: op, Amt: amt, ID: id}
n.records = append(n.records, r)
return r
}
// AdjustTo increments current height until it reaches the specific height.
func (n *Node) AdjustTo(h Height) error {
if h < n.height {
return errors.Wrapf(ErrInvalidHeight, "adjust n.height: %d > %d", n.height, h)
}
if h == n.height {
return nil
}
for n.height < h {
n.height++
n.bid()
next := n.NextUpdate()
if next > h {
n.height = h
break
}
n.height = next
}
n.bid()
return nil
}
// Recall ...
func (n *Node) Recall(h Height) error {
if h >= n.height {
return errors.Wrapf(ErrInvalidHeight, "h: %d >= n.height: %d", h, n.height)
}
fmt.Printf("n.Recall from %d to %d\n", n.height, h)
err := n.replay(h, false)
return errors.Wrapf(err, "reply(%d, false)", h)
}
// Reset rests ...
func (n *Node) Reset(h Height) error {
if h > n.height {
return nil
}
fmt.Printf("n.Reset from %d to %d\n", n.height, h)
err := n.replay(h, true)
return errors.Wrapf(err, "reply(%d, true)", h)
}
func (n *Node) replay(h Height, truncate bool) error {
fmt.Printf("replay %s from %d to %d:\n", n.name, n.height, h)
backup := n.records
*n = *NewNode(n.name)
n.records = backup
var i int
var r *Cmd
for i < len(n.records) {
r = n.records[i]
if n.height == r.Height-1 {
if err := n.execute(r); err != nil {
return err
}
i++
continue
}
n.height++
n.bid()
if n.height == h {
break
}
}
if truncate {
n.records = n.records[:i]
}
return nil
}
func (n *Node) execute(c *Cmd) error {
var err error
switch c.Cmd {
case CmdAddClaim:
err = n.addClaim(c.OP, c.Amt)
case CmdSpendClaim:
err = n.spendClaim(c.OP)
case CmdUpdateClaim:
err = n.updateClaim(c.OP, c.Amt, c.ID)
case CmdAddSupport:
err = n.addSupport(c.OP, c.Amt, c.ID)
case CmdSpendSupport:
err = n.spendSupport(c.OP)
}
return errors.Wrapf(err, "cmd %s", c)
}

View file

@ -22,8 +22,8 @@ func export(n *Node) interface{} {
}{ }{
Height: n.height, Height: n.height,
Hash: hash, Hash: hash,
NextUpdate: n.NextUpdate(),
Tookover: n.tookover, Tookover: n.tookover,
NextUpdate: n.NextUpdate(),
BestClaim: n.best, BestClaim: n.best,
Claims: n.claims, Claims: n.claims,
Supports: n.supports, Supports: n.supports,
@ -39,7 +39,7 @@ func nodeToString(n *Node) string {
{{end}} {{end}}
{{- end}} {{- end}}
{{- if .Supports}} {{- if .Supports}}
S {{range .Supports}}{{.}} {{range .Supports}}S {{.}}
{{end}} {{end}}
{{- end}}` {{- end}}`
@ -54,6 +54,6 @@ func nodeToString(n *Node) string {
} }
func claimToString(c *Claim) string { func claimToString(c *Claim) string {
return fmt.Sprintf("%-68s id: %s accepted: %3d active: %3d, amt: %12d effamt: %3d", return fmt.Sprintf("%-68s id: %s accepted: %6d active: %6d, amt: %12d effamt: %12d",
c.OutPoint, c.ID, c.Accepted, c.ActiveAt, c.Amt, c.EffAmt) c.OutPoint, c.ID, c.Accepted, c.ActiveAt, c.Amt, c.EffAmt)
} }

View file

@ -3,6 +3,7 @@ package claimtrie
import ( import (
"fmt" "fmt"
"github.com/lbryio/claimtrie/change"
"github.com/lbryio/claimtrie/claim" "github.com/lbryio/claimtrie/claim"
"github.com/lbryio/claimtrie/nodemgr" "github.com/lbryio/claimtrie/nodemgr"
"github.com/lbryio/claimtrie/trie" "github.com/lbryio/claimtrie/trie"
@ -14,140 +15,135 @@ import (
// ClaimTrie implements a Merkle Trie supporting linear history of commits. // ClaimTrie implements a Merkle Trie supporting linear history of commits.
type ClaimTrie struct { type ClaimTrie struct {
height claim.Height cm *CommitMgr
head *Commit
stg *trie.Trie
nm *nodemgr.NodeMgr nm *nodemgr.NodeMgr
tr *trie.Trie
} }
// New returns a ClaimTrie. // New returns a ClaimTrie.
func New(dbTrie, dbNodeMgr *leveldb.DB) *ClaimTrie { func New(dbCommit, dbTrie, dbNodeMgr *leveldb.DB) *ClaimTrie {
nm := nodemgr.New(dbNodeMgr) nm := nodemgr.New(dbNodeMgr)
cm := NewCommitMgr(dbCommit)
return &ClaimTrie{ return &ClaimTrie{
head: newCommit(nil, CommitMeta{0}, trie.EmptyTrieHash), cm: cm,
nm: nm, nm: nm,
stg: trie.New(nm, dbTrie), tr: trie.New(nm, dbTrie),
} }
} }
// Load loads ClaimTrie, NodeManager, Trie from databases.
func (ct *ClaimTrie) Load() error {
if err := ct.cm.Load(); err != nil {
return errors.Wrapf(err, "cm.Load()")
}
fmt.Printf("%d of commits loaded. Head: %d\n", len(ct.cm.commits), ct.cm.head.Meta.Height)
ct.nm.Load(ct.Height())
fmt.Printf("%d of nodes loaded.\n", ct.nm.Size())
ct.tr.SetRoot(ct.cm.Head().MerkleRoot)
fmt.Printf("Trie root: %s.\n", ct.MerkleHash())
return nil
}
// Save saves ClaimTrie state to database.
func (ct *ClaimTrie) Save() error {
if err := ct.cm.Save(); err != nil {
return errors.Wrapf(err, "cm.Save()")
}
return nil
}
// Height returns the highest height of blocks commited to the ClaimTrie. // Height returns the highest height of blocks commited to the ClaimTrie.
func (ct *ClaimTrie) Height() claim.Height { func (ct *ClaimTrie) Height() claim.Height {
return ct.height return ct.cm.Head().Meta.Height
} }
// Head returns the tip commit in the commit database. // Head returns the tip commit in the commit database.
func (ct *ClaimTrie) Head() *Commit { func (ct *ClaimTrie) Head() *Commit {
return ct.head return ct.cm.Head()
} }
// Trie returns the Stage of the claimtrie . // Trie returns the MerkleTrie of the ClaimTrie .
func (ct *ClaimTrie) Trie() *trie.Trie { func (ct *ClaimTrie) Trie() *trie.Trie {
return ct.stg return ct.tr
} }
// NodeMgr returns the Node Manager of the claimtrie . // NodeMgr returns the Node Manager of the ClaimTrie .
func (ct *ClaimTrie) NodeMgr() *nodemgr.NodeMgr { func (ct *ClaimTrie) NodeMgr() *nodemgr.NodeMgr {
return ct.nm return ct.nm
} }
// AddClaim adds a Claim to the Stage. // CommitMgr returns the Commit Manager of the ClaimTrie .
func (ct *ClaimTrie) CommitMgr() *CommitMgr {
return ct.cm
}
// AddClaim adds a Claim to the ClaimTrie.
func (ct *ClaimTrie) AddClaim(name string, op claim.OutPoint, amt claim.Amount) error { func (ct *ClaimTrie) AddClaim(name string, op claim.OutPoint, amt claim.Amount) error {
modifier := func(n *claim.Node) error { c := change.New(change.AddClaim).SetOP(op).SetAmt(amt)
return n.AddClaim(op, amt) return ct.modify(name, c)
}
return ct.updateNode(name, modifier)
} }
// SpendClaim spend a Claim in the Stage. // SpendClaim spend a Claim in the ClaimTrie.
func (ct *ClaimTrie) SpendClaim(name string, op claim.OutPoint) error { func (ct *ClaimTrie) SpendClaim(name string, op claim.OutPoint) error {
modifier := func(n *claim.Node) error { c := change.New(change.SpendClaim).SetOP(op)
return n.SpendClaim(op) return ct.modify(name, c)
}
return ct.updateNode(name, modifier)
} }
// UpdateClaim updates a Claim in the Stage. // UpdateClaim updates a Claim in the ClaimTrie.
func (ct *ClaimTrie) UpdateClaim(name string, op claim.OutPoint, amt claim.Amount, id claim.ID) error { func (ct *ClaimTrie) UpdateClaim(name string, op claim.OutPoint, amt claim.Amount, id claim.ID) error {
modifier := func(n *claim.Node) error { c := change.New(change.UpdateClaim).SetOP(op).SetAmt(amt).SetID(id)
return n.UpdateClaim(op, amt, id) return ct.modify(name, c)
}
return ct.updateNode(name, modifier)
} }
// AddSupport adds a Support to the Stage. // AddSupport adds a Support to the ClaimTrie.
func (ct *ClaimTrie) AddSupport(name string, op claim.OutPoint, amt claim.Amount, id claim.ID) error { func (ct *ClaimTrie) AddSupport(name string, op claim.OutPoint, amt claim.Amount, id claim.ID) error {
modifier := func(n *claim.Node) error { c := change.New(change.AddSupport).SetOP(op).SetAmt(amt).SetID(id)
return n.AddSupport(op, amt, id) return ct.modify(name, c)
}
return ct.updateNode(name, modifier)
} }
// SpendSupport spend a support in the Stage. // SpendSupport spend a support in the ClaimTrie.
func (ct *ClaimTrie) SpendSupport(name string, op claim.OutPoint) error { func (ct *ClaimTrie) SpendSupport(name string, op claim.OutPoint) error {
modifier := func(n *claim.Node) error { c := change.New(change.SpendSupport).SetOP(op)
return n.SpendSupport(op) return ct.modify(name, c)
}
return ct.updateNode(name, modifier)
} }
// Traverse visits Nodes in the Stage. func (ct *ClaimTrie) modify(name string, c *change.Change) error {
func (ct *ClaimTrie) Traverse(visit trie.Visit) error { c.SetHeight(ct.Height() + 1).SetName(name)
return ct.stg.Traverse(visit) if err := ct.nm.ModifyNode(name, c); err != nil {
} return err
// MerkleHash returns the Merkle Hash of the Stage.
func (ct *ClaimTrie) MerkleHash() (*chainhash.Hash, error) {
// ct.nm.UpdateAll(ct.stg.Update)
return ct.stg.MerkleHash()
}
// Commit commits the current Stage into database.
func (ct *ClaimTrie) Commit(h claim.Height) error {
if h < ct.height {
return errors.Wrapf(ErrInvalidHeight, "%d < ct.height %d", h, ct.height)
} }
ct.tr.Update([]byte(name))
for i := ct.height + 1; i <= h; i++ {
if err := ct.nm.CatchUp(i, ct.stg.Update); err != nil {
return errors.Wrapf(err, "nm.CatchUp(%d, stg.Update)", i)
}
}
hash, err := ct.MerkleHash()
if err != nil {
return errors.Wrapf(err, "MerkleHash()")
}
commit := newCommit(ct.head, CommitMeta{Height: h}, hash)
ct.head = commit
ct.height = h
ct.stg.SetRoot(hash)
return nil return nil
} }
// Reset reverts the Stage to the current or previous height specified. // MerkleHash returns the Merkle Hash of the ClaimTrie.
func (ct *ClaimTrie) Reset(h claim.Height) error { func (ct *ClaimTrie) MerkleHash() *chainhash.Hash {
if h > ct.height { return ct.tr.MerkleHash()
return errors.Wrapf(ErrInvalidHeight, "%d > ct.height %d", h, ct.height)
}
fmt.Printf("ct.Reset from %d to %d\n", ct.height, h)
commit := ct.head
for commit.Meta.Height > h {
commit = commit.Prev
}
if err := ct.nm.Reset(h); err != nil {
return errors.Wrapf(err, "nm.Reset(%d)", h)
}
ct.head = commit
ct.height = h
ct.stg.SetRoot(commit.MerkleRoot)
return nil
} }
func (ct *ClaimTrie) updateNode(name string, modifier func(n *claim.Node) error) error { // Commit commits the current changes into database.
if err := ct.nm.ModifyNode(name, ct.height, modifier); err != nil { func (ct *ClaimTrie) Commit(ht claim.Height) {
return errors.Wrapf(err, "nm.ModifyNode(%s, %d)", name, ct.height) if ht < ct.Height() {
return
} }
if err := ct.stg.Update(trie.Key(name)); err != nil { for i := ct.Height() + 1; i <= ht; i++ {
return errors.Wrapf(err, "stg.Update(%s)", name) ct.nm.CatchUp(i, ct.tr.Update)
} }
h := ct.MerkleHash()
ct.cm.Commit(ht, h)
ct.tr.SetRoot(h)
}
// Reset resets the tip commit to a previous height specified.
func (ct *ClaimTrie) Reset(ht claim.Height) error {
if ht > ct.Height() {
return ErrInvalidHeight
}
ct.cm.Reset(ht)
ct.nm.Reset(ht)
ct.tr.SetRoot(ct.Head().MerkleRoot)
return nil return nil
} }

View file

@ -3,36 +3,33 @@ package main
import ( import (
"bufio" "bufio"
"crypto/rand" "crypto/rand"
"errors"
"fmt" "fmt"
"log" "log"
"math/big" "math/big"
"os" "os"
"os/signal" "os/signal"
"path/filepath"
"strings" "strings"
"syscall"
"github.com/lbryio/claimtrie" "github.com/lbryio/claimtrie"
"github.com/lbryio/claimtrie/cfg"
"github.com/lbryio/claimtrie/claim" "github.com/lbryio/claimtrie/claim"
"github.com/lbryio/claimtrie/trie"
"github.com/btcsuite/btcd/chaincfg/chainhash" "github.com/btcsuite/btcd/chaincfg/chainhash"
"github.com/btcsuite/btcutil" "github.com/pkg/errors"
"github.com/syndtr/goleveldb/leveldb" "github.com/syndtr/goleveldb/leveldb"
"github.com/urfave/cli" "github.com/urfave/cli"
) )
var ( var (
ct *claimtrie.ClaimTrie ct *claimtrie.ClaimTrie
defaultHomeDir = btcutil.AppDataDir("lbrycrd.go", false)
defaultDataDir = filepath.Join(defaultHomeDir, "data")
dbTriePath = filepath.Join(defaultDataDir, "dbTrie")
) )
var ( var (
all bool all bool
chk bool chk bool
dump bool
verbose bool
name string name string
height claim.Height height claim.Height
amt claim.Amount amt claim.Amount
@ -42,7 +39,9 @@ var (
var ( var (
flagAll = cli.BoolFlag{Name: "all, a", Usage: "Show all nodes", Destination: &all} flagAll = cli.BoolFlag{Name: "all, a", Usage: "Show all nodes", Destination: &all}
flagCheck = cli.BoolFlag{Name: "chk, c", Usage: "Check Merkle Hash during importing", Destination: &chk} flagCheck = cli.BoolTFlag{Name: "chk, c", Usage: "Check Merkle Hash during importing", Destination: &chk}
flagDump = cli.BoolFlag{Name: "dump, d", Usage: "Dump cmds", Destination: &dump}
flagVerbose = cli.BoolFlag{Name: "verbose, v", Usage: "Verbose (will be replaced by loglevel)", Destination: &verbose}
flagAmount = cli.Int64Flag{Name: "amount, a", Usage: "Amount", Destination: (*int64)(&amt)} flagAmount = cli.Int64Flag{Name: "amount, a", Usage: "Amount", Destination: (*int64)(&amt)}
flagHeight = cli.Int64Flag{Name: "height, ht", Usage: "Height"} flagHeight = cli.Int64Flag{Name: "height, ht", Usage: "Height"}
flagName = cli.StringFlag{Name: "name, n", Value: "Hello", Usage: "Name", Destination: &name} flagName = cli.StringFlag{Name: "name, n", Value: "Hello", Usage: "Name", Destination: &name}
@ -105,22 +104,22 @@ func main() {
{ {
Name: "show", Name: "show",
Aliases: []string{"s"}, Aliases: []string{"s"},
Usage: "Show the status of Stage)", Usage: "Show the status of nodes)",
Before: parseArgs, Before: parseArgs,
Action: cmdShow, Action: cmdShow,
Flags: []cli.Flag{flagAll, flagName, flagHeight}, Flags: []cli.Flag{flagAll, flagName, flagHeight, flagDump},
}, },
{ {
Name: "merkle", Name: "merkle",
Aliases: []string{"m"}, Aliases: []string{"m"},
Usage: "Show the Merkle Hash of the Stage.", Usage: "Show the Merkle Hash of the ClaimTrie.",
Before: parseArgs, Before: parseArgs,
Action: cmdMerkle, Action: cmdMerkle,
}, },
{ {
Name: "commit", Name: "commit",
Aliases: []string{"c"}, Aliases: []string{"c"},
Usage: "Commit the current Stage to database.", Usage: "Commit the current changes to database.",
Before: parseArgs, Before: parseArgs,
Action: cmdCommit, Action: cmdCommit,
Flags: []cli.Flag{flagHeight}, Flags: []cli.Flag{flagHeight},
@ -128,7 +127,7 @@ func main() {
{ {
Name: "reset", Name: "reset",
Aliases: []string{"r"}, Aliases: []string{"r"},
Usage: "Reset the Head commit and Stage to a specified commit.", Usage: "Reset the Head commit and a specified commit (by Height).",
Before: parseArgs, Before: parseArgs,
Action: cmdReset, Action: cmdReset,
Flags: []cli.Flag{flagHeight}, Flags: []cli.Flag{flagHeight},
@ -140,13 +139,36 @@ func main() {
Before: parseArgs, Before: parseArgs,
Action: cmdLog, Action: cmdLog,
}, },
{
Name: "ipmort",
Aliases: []string{"i"},
Usage: "Import changes from datbase.",
Before: parseArgs,
Action: cmdImport,
Flags: []cli.Flag{flagHeight, flagCheck, flagVerbose},
},
{ {
Name: "load", Name: "load",
Aliases: []string{"ld"}, Aliases: []string{"ld"},
Usage: "Load prerecorded command from datbase.", Usage: "Load nodes from datbase.",
Before: parseArgs, Before: parseArgs,
Action: cmdLoad, Action: cmdLoad,
Flags: []cli.Flag{flagHeight, flagCheck}, Flags: []cli.Flag{},
},
{
Name: "save",
Aliases: []string{"sv"},
Usage: "Save nodes to datbase.",
Before: parseArgs,
Action: cmdSave,
Flags: []cli.Flag{},
},
{
Name: "erase",
Usage: "Erase datbase",
Before: parseArgs,
Action: cmdErase,
Flags: []cli.Flag{},
}, },
{ {
Name: "shell", Name: "shell",
@ -157,12 +179,28 @@ func main() {
}, },
} }
dbTrie, err := leveldb.OpenFile(dbTriePath, nil) path := cfg.DefaultConfig(cfg.TrieDB)
dbTrie, err := leveldb.OpenFile(path, nil)
if err != nil { if err != nil {
log.Fatalf("can't open dbTrie at %s, err: %s\n", dbTriePath, err) log.Fatalf("can't open %s, err: %s\n", path, err)
} }
fmt.Printf("dbTriePath: %q\n", dbTriePath) fmt.Printf("opened %q\n", path)
ct = claimtrie.New(dbTrie, nil)
path = cfg.DefaultConfig(cfg.NodeDB)
dbNodeMgr, err := leveldb.OpenFile(path, nil)
if err != nil {
log.Fatalf("can't open %s, err: %s\n", path, err)
}
fmt.Printf("opened %q\n", path)
path = cfg.DefaultConfig(cfg.CommitDB)
dbCommit, err := leveldb.OpenFile(path, nil)
if err != nil {
log.Fatalf("can't open %s, err: %s\n", path, err)
}
fmt.Printf("opened %q\n", path)
ct = claimtrie.New(dbCommit, dbTrie, dbNodeMgr)
if err := app.Run(os.Args); err != nil { if err := app.Run(os.Args); err != nil {
fmt.Printf("error: %s\n", err) fmt.Printf("error: %s\n", err)
} }
@ -195,22 +233,18 @@ func cmdSpendSupport(c *cli.Context) error {
} }
func cmdShow(c *cli.Context) error { func cmdShow(c *cli.Context) error {
fmt.Printf("\n<ClaimTrie Height %d (Nodes) >\n\n", ct.Height()) fmt.Printf("\n<ClaimTrie Height %d >\n\n", ct.Height())
if all { if all {
name = "" name = ""
} }
return ct.NodeMgr().Show(name) if !c.IsSet("height") {
height = ct.Height()
// fmt.Printf("\n<ClaimTrie Height %d (Stage) >\n\n", ct.Height()) }
// return ct.Traverse(showNode()) return ct.NodeMgr().Show(name, height, dump)
} }
func cmdMerkle(c *cli.Context) error { func cmdMerkle(c *cli.Context) error {
h, err := ct.MerkleHash() fmt.Printf("%s at %d\n", ct.MerkleHash(), ct.Height())
if err != nil {
return err
}
fmt.Printf("%s at %d\n", h, ct.Height())
return nil return nil
} }
@ -218,7 +252,8 @@ func cmdCommit(c *cli.Context) error {
if !c.IsSet("height") { if !c.IsSet("height") {
height = ct.Height() + 1 height = ct.Height() + 1
} }
return ct.Commit(height) ct.Commit(height)
return nil
} }
func cmdReset(c *cli.Context) error { func cmdReset(c *cli.Context) error {
@ -227,14 +262,46 @@ func cmdReset(c *cli.Context) error {
func cmdLog(c *cli.Context) error { func cmdLog(c *cli.Context) error {
visit := func(c *claimtrie.Commit) { visit := func(c *claimtrie.Commit) {
meta := c.Meta fmt.Printf("%s at %d\n", c.MerkleRoot, c.Meta.Height)
fmt.Printf("%s at %d\n", c.MerkleRoot, meta.Height)
} }
return claimtrie.Log(ct.Head(), visit) ct.CommitMgr().Log(ct.Height(), visit)
return nil
}
func cmdImport(c *cli.Context) error {
path := cfg.DefaultConfig(cfg.ClaimScriptDB)
db, err := leveldb.OpenFile(path, nil)
if err != nil {
return errors.Wrapf(err, "path %s", path)
}
defer db.Close()
if err = claimtrie.Load(db, ct, height, verbose, chk); err != nil {
return err
}
return nil
} }
func cmdLoad(c *cli.Context) error { func cmdLoad(c *cli.Context) error {
return claimtrie.Load(ct, height, chk) return ct.Load()
}
func cmdSave(c *cli.Context) error {
return ct.Save()
}
func cmdErase(c *cli.Context) error {
if err := os.RemoveAll(cfg.DefaultConfig(cfg.CommitDB)); err != nil {
return err
}
if err := os.RemoveAll(cfg.DefaultConfig(cfg.NodeDB)); err != nil {
return err
}
if err := os.RemoveAll(cfg.DefaultConfig(cfg.TrieDB)); err != nil {
return err
}
fmt.Printf("Databses erased. Exiting...\n")
os.Exit(0)
return nil
} }
func cmdShell(app *cli.App) { func cmdShell(app *cli.App) {
@ -248,7 +315,7 @@ func cmdShell(app *cli.App) {
} }
}() }()
defer close(sigs) defer close(sigs)
// signal.Notify(sigs, syscall.SIGINT, syscall.SIGTERM) signal.Notify(sigs, syscall.SIGINT, syscall.SIGTERM)
for { for {
fmt.Printf("%s > ", app.Name) fmt.Printf("%s > ", app.Name)
text, err := reader.ReadString('\n') text, err := reader.ReadString('\n')
@ -260,6 +327,9 @@ func cmdShell(app *cli.App) {
continue continue
} }
if text == "quit" || text == "q" { if text == "quit" || text == "q" {
if err = ct.Save(); err != nil {
fmt.Printf("ct.Save() failed, err: %s\n", err)
}
break break
} }
err = app.Run(append(os.Args[1:], strings.Split(text, " ")...)) err = app.Run(append(os.Args[1:], strings.Split(text, " ")...))
@ -335,29 +405,3 @@ func parseID(c *cli.Context) error {
id, err = claim.NewIDFromString(c.String("id")) id, err = claim.NewIDFromString(c.String("id"))
return err return err
} }
var showNode = func() trie.Visit {
return func(prefix trie.Key, val trie.Value) error {
if val == nil || val.Hash() == nil {
fmt.Printf("%-8s:\n", prefix)
return nil
}
fmt.Printf("%-8s: %v\n", prefix, val)
return nil
}
}
var recall = func(h claim.Height, visit trie.Visit) trie.Visit {
return func(prefix trie.Key, val trie.Value) error {
n := val.(*claim.Node)
old := n.Height()
err := n.Recall(h)
if err == nil {
err = visit(prefix, val)
}
if err == nil {
err = n.AdjustTo(old)
}
return err
}
}

112
commit.go
View file

@ -1,11 +1,20 @@
package claimtrie package claimtrie
import ( import (
"bytes"
"encoding/gob"
"github.com/lbryio/claimtrie/claim" "github.com/lbryio/claimtrie/claim"
"github.com/lbryio/claimtrie/trie"
"github.com/btcsuite/btcd/chaincfg/chainhash" "github.com/btcsuite/btcd/chaincfg/chainhash"
"github.com/pkg/errors"
"github.com/syndtr/goleveldb/leveldb"
) )
// CommitVisit ...
type CommitVisit func(c *Commit)
// CommitMeta represent the meta associated with each commit. // CommitMeta represent the meta associated with each commit.
type CommitMeta struct { type CommitMeta struct {
Height claim.Height Height claim.Height
@ -13,7 +22,6 @@ type CommitMeta struct {
func newCommit(head *Commit, meta CommitMeta, h *chainhash.Hash) *Commit { func newCommit(head *Commit, meta CommitMeta, h *chainhash.Hash) *Commit {
return &Commit{ return &Commit{
Prev: head,
MerkleRoot: h, MerkleRoot: h,
Meta: meta, Meta: meta,
} }
@ -21,19 +29,105 @@ func newCommit(head *Commit, meta CommitMeta, h *chainhash.Hash) *Commit {
// Commit ... // Commit ...
type Commit struct { type Commit struct {
Prev *Commit
MerkleRoot *chainhash.Hash MerkleRoot *chainhash.Hash
Meta CommitMeta Meta CommitMeta
} }
// CommitVisit ... // CommitMgr ...
type CommitVisit func(c *Commit) type CommitMgr struct {
db *leveldb.DB
commits []*Commit
head *Commit
}
// Log ... // NewCommitMgr ...
func Log(commit *Commit, visit CommitVisit) error { func NewCommitMgr(db *leveldb.DB) *CommitMgr {
for commit != nil { head := newCommit(nil, CommitMeta{0}, trie.EmptyTrieHash)
visit(commit) cm := CommitMgr{
commit = commit.Prev db: db,
head: head,
}
cm.commits = append(cm.commits, head)
return &cm
}
// Head ...
func (cm *CommitMgr) Head() *Commit {
return cm.head
}
// Commit ...
func (cm *CommitMgr) Commit(ht claim.Height, merkle *chainhash.Hash) {
if ht == 0 {
return
}
c := newCommit(cm.head, CommitMeta{ht}, merkle)
cm.commits = append(cm.commits, c)
cm.head = c
}
// Reset ...
func (cm *CommitMgr) Reset(ht claim.Height) {
for i := len(cm.commits) - 1; i >= 0; i-- {
c := cm.commits[i]
if c.Meta.Height <= ht {
cm.head = c
cm.commits = cm.commits[:i+1]
break
}
}
if cm.head.Meta.Height == ht {
return
}
cm.Commit(ht, cm.head.MerkleRoot)
}
// Save ...
func (cm *CommitMgr) Save() error {
exported := struct {
Commits []*Commit
Head *Commit
}{
Commits: cm.commits,
Head: cm.head,
}
buf := bytes.NewBuffer(nil)
if err := gob.NewEncoder(buf).Encode(exported); err != nil {
return errors.Wrapf(err, "gob.Encode()", err)
}
if err := cm.db.Put([]byte("CommitMgr"), buf.Bytes(), nil); err != nil {
return errors.Wrapf(err, "db.Put(CommitMgr)")
} }
return nil return nil
} }
// Load ...
func (cm *CommitMgr) Load() error {
exported := struct {
Commits []*Commit
Head *Commit
}{}
data, err := cm.db.Get([]byte("CommitMgr"), nil)
if err != nil {
return errors.Wrapf(err, "db.Get(CommitMgr)")
}
if err := gob.NewDecoder(bytes.NewBuffer(data)).Decode(&exported); err != nil {
return errors.Wrapf(err, "gob.Encode()", err)
}
cm.commits = exported.Commits
cm.head = exported.Head
return nil
}
// Log ...
func (cm *CommitMgr) Log(ht claim.Height, visit CommitVisit) {
for i := len(cm.commits) - 1; i >= 0; i-- {
c := cm.commits[i]
if c.Meta.Height > ht {
continue
}
visit(c)
}
}

156
import.go
View file

@ -5,130 +5,76 @@ import (
"encoding/gob" "encoding/gob"
"fmt" "fmt"
"log" "log"
"path/filepath"
"strconv" "strconv"
"github.com/lbryio/claimtrie/change"
"github.com/lbryio/claimtrie/claim" "github.com/lbryio/claimtrie/claim"
"github.com/btcsuite/btcd/chaincfg/chainhash" "github.com/pkg/errors"
"github.com/btcsuite/btcutil"
"github.com/syndtr/goleveldb/leveldb" "github.com/syndtr/goleveldb/leveldb"
) )
var (
defaultHomeDir = btcutil.AppDataDir("lbrycrd.go", false)
defaultDataDir = filepath.Join(defaultHomeDir, "data")
dbCmdPath = filepath.Join(defaultDataDir, "dbCmd")
)
type block struct {
Hash chainhash.Hash
Cmds []claim.Cmd
}
// Load ... // Load ...
func Load(ct *ClaimTrie, h claim.Height, chk bool) error { func Load(db *leveldb.DB, ct *ClaimTrie, ht claim.Height, verbose, chk bool) error {
db := DefaultRecorder() for i := ct.Height() + 1; i <= ht; i++ {
defer db.Close() blk, err := getBlock(db, i)
if err != nil {
for i := ct.height + 1; i <= h; i++ { return errors.Wrapf(err, "getBlock(db, %d)", i)
key := strconv.Itoa(int(i)) }
data, err := db.Get([]byte(key), nil) if blk == nil {
if err == leveldb.ErrNotFound {
continue continue
} else if err != nil {
return err
} }
var blk block ct.Commit(i - 1)
if err = gob.NewDecoder(bytes.NewBuffer(data)).Decode(&blk); err != nil { for _, chg := range blk.Changes {
return err if err = apply(ct, chg, verbose); err != nil {
return errors.Wrapf(err, "apply(%s)", chg)
} }
}
ct.Commit(i)
if err = ct.Commit(i - 1); err != nil {
return err
}
for _, cmd := range blk.Cmds {
if err = execute(ct, &cmd); err != nil {
fmt.Printf("execute faile: err %s\n", err)
return err
}
}
if err = ct.Commit(i); err != nil {
return err
}
if !chk { if !chk {
continue continue
} }
hash, err := ct.MerkleHash() if hash := ct.MerkleHash(); *hash != blk.Hash {
if err != nil { return fmt.Errorf("blk %d hash: got %s, want %s", i, *hash, blk.Hash)
return err
}
if *hash != blk.Hash {
return fmt.Errorf("block %d hash: got %s, want %s", i, *hash, blk.Hash)
} }
} }
return ct.Commit(h) ct.Commit(ht)
return nil
} }
func execute(ct *ClaimTrie, c *claim.Cmd) error { func getBlock(db *leveldb.DB, ht claim.Height) (*change.Block, error) {
// Value []byte key := strconv.Itoa(int(ht))
fmt.Printf("%s\n", c) data, err := db.Get([]byte(key), nil)
if err == leveldb.ErrNotFound {
return nil, nil
} else if err != nil {
return nil, errors.Wrapf(err, "db.Get(%s)", key)
}
var blk change.Block
if err = gob.NewDecoder(bytes.NewBuffer(data)).Decode(&blk); err != nil {
return nil, errors.Wrapf(err, "gob.Decode(&blk)")
}
return &blk, nil
}
func apply(ct *ClaimTrie, c *change.Change, verbose bool) error {
if verbose {
log.Printf("%s", c)
}
var err error
switch c.Cmd { switch c.Cmd {
case claim.CmdAddClaim: case change.AddClaim:
return ct.AddClaim(c.Name, c.OP, c.Amt) err = ct.AddClaim(c.Name, c.OP, c.Amt)
case claim.CmdSpendClaim: case change.SpendClaim:
return ct.SpendClaim(c.Name, c.OP) err = ct.SpendClaim(c.Name, c.OP)
case claim.CmdUpdateClaim: case change.UpdateClaim:
return ct.UpdateClaim(c.Name, c.OP, c.Amt, c.ID) err = ct.UpdateClaim(c.Name, c.OP, c.Amt, c.ID)
case claim.CmdAddSupport: case change.AddSupport:
return ct.AddSupport(c.Name, c.OP, c.Amt, c.ID) err = ct.AddSupport(c.Name, c.OP, c.Amt, c.ID)
case claim.CmdSpendSupport: case change.SpendSupport:
return ct.SpendSupport(c.Name, c.OP) err = ct.SpendSupport(c.Name, c.OP)
} }
return nil return errors.Wrapf(err, "exec %s", c)
}
// Recorder ..
type Recorder struct {
db *leveldb.DB
}
// Put sets the value for the given key. It overwrites any previous value for that key.
func (r *Recorder) Put(key []byte, data interface{}) error {
buf := bytes.NewBuffer(nil)
if err := gob.NewEncoder(buf).Encode(data); err != nil {
return fmt.Errorf("can't encode cmds, err: %s", err)
}
if err := r.db.Put(key, buf.Bytes(), nil); err != nil {
return fmt.Errorf("can't put to db, err: %s", err)
}
return nil
}
// Get ...
func (r *Recorder) Get(key []byte, data interface{}) ([]byte, error) {
return r.db.Get(key, nil)
}
// Close ...
func (r *Recorder) Close() error {
err := r.db.Close()
r.db = nil
return err
}
var recorder Recorder
// DefaultRecorder ...
func DefaultRecorder() *Recorder {
if recorder.db == nil {
db, err := leveldb.OpenFile(dbCmdPath, nil)
if err != nil {
log.Fatalf("can't open :%s, err: %s\n", dbCmdPath, err)
}
fmt.Printf("dbCmds %s opened\n", dbCmdPath)
recorder.db = db
}
return &recorder
} }

View file

@ -3,20 +3,21 @@ package nodemgr
import ( import (
"fmt" "fmt"
"sort" "sort"
"sync"
"github.com/lbryio/claimtrie/change"
"github.com/lbryio/claimtrie/claim" "github.com/lbryio/claimtrie/claim"
"github.com/lbryio/claimtrie/trie" "github.com/lbryio/claimtrie/trie"
"github.com/pkg/errors"
"github.com/syndtr/goleveldb/leveldb" "github.com/syndtr/goleveldb/leveldb"
) )
// NodeMgr ... // NodeMgr ...
type NodeMgr struct { type NodeMgr struct {
sync.RWMutex height claim.Height
db *leveldb.DB db *leveldb.DB
nodes map[string]*claim.Node cache map[string]*claim.Node
dirty map[string]bool
nextUpdates todos nextUpdates todos
} }
@ -24,132 +25,154 @@ type NodeMgr struct {
func New(db *leveldb.DB) *NodeMgr { func New(db *leveldb.DB) *NodeMgr {
nm := &NodeMgr{ nm := &NodeMgr{
db: db, db: db,
nodes: map[string]*claim.Node{}, cache: map[string]*claim.Node{},
dirty: map[string]bool{},
nextUpdates: todos{}, nextUpdates: todos{},
} }
return nm return nm
} }
// Get ... // Load loads the nodes from the database up to height ht.
func (nm *NodeMgr) Get(key trie.Key) (trie.Value, error) { func (nm *NodeMgr) Load(ht claim.Height) {
nm.Lock() nm.height = ht
defer nm.Unlock() iter := nm.db.NewIterator(nil, nil)
for iter.Next() {
if n, ok := nm.nodes[string(key)]; ok { name := string(iter.Key())
return n, nil nm.cache[name] = nm.load(name, ht)
} }
if nm.db != nil {
b, err := nm.db.Get(key, nil)
if err == nil {
_ = b // TODO: Loaded. Deserialize it.
} else if err != leveldb.ErrNotFound {
// DB error. Propagated.
return nil, err
}
}
// New node.
n := claim.NewNode(string(key))
nm.nodes[string(key)] = n
return n, nil
} }
// Set ... // Get returns the latest node with name specified by key.
func (nm *NodeMgr) Set(key trie.Key, val trie.Value) { func (nm *NodeMgr) Get(key []byte) trie.Value {
n := val.(*claim.Node) return nm.nodeAt(string(key), nm.height)
nm.Lock()
defer nm.Unlock()
nm.nodes[string(key)] = n
nm.dirty[string(key)] = true
// TODO: flush to disk.
} }
// Reset resets all nodes to specified height. // Reset resets all nodes to specified height.
func (nm *NodeMgr) Reset(h claim.Height) error { func (nm *NodeMgr) Reset(ht claim.Height) {
for _, n := range nm.nodes { nm.height = ht
if err := n.Reset(h); err != nil { for name, n := range nm.cache {
return err if n.Height() >= ht {
nm.cache[name] = nm.load(name, ht)
} }
} }
return nil
} }
// NodeAt returns the node adjusted to specified height. // Size returns the number of nodes loaded into the cache.
func (nm *NodeMgr) NodeAt(name string, h claim.Height) (*claim.Node, error) { func (nm *NodeMgr) Size() int {
v, err := nm.Get(trie.Key(name)) return len(nm.cache)
if err != nil { }
return nil, err
func (nm *NodeMgr) load(name string, ht claim.Height) *claim.Node {
c := change.NewChangeList(nm.db, name).Load().Truncate(ht).Changes()
return NewFromChanges(name, c, ht)
}
// nodeAt returns the node adjusted to specified height.
func (nm *NodeMgr) nodeAt(name string, ht claim.Height) *claim.Node {
n, ok := nm.cache[name]
if !ok {
n = claim.NewNode(name)
nm.cache[name] = n
} }
n := v.(*claim.Node)
if err = n.AdjustTo(h); err != nil { // Cached version is too new.
return nil, err if n.Height() > nm.height || n.Height() > ht {
n = nm.load(name, ht)
} }
return n, nil return n.AdjustTo(ht)
} }
// ModifyNode returns the node adjusted to specified height. // ModifyNode returns the node adjusted to specified height.
func (nm *NodeMgr) ModifyNode(name string, h claim.Height, modifier func(*claim.Node) error) error { func (nm *NodeMgr) ModifyNode(name string, chg *change.Change) error {
n, err := nm.NodeAt(name, h) ht := nm.height
if err != nil { n := nm.nodeAt(name, ht)
return err n.AdjustTo(ht)
if err := execute(n, chg); err != nil {
return errors.Wrapf(err, "claim.execute(n,chg)")
} }
if err = modifier(n); err != nil { nm.cache[name] = n
return err nm.nextUpdates.set(name, ht+1)
} change.NewChangeList(nm.db, name).Load().Append(chg).Save()
nm.nextUpdates.set(name, h+1)
return nil return nil
} }
// CatchUp ... // CatchUp ...
func (nm *NodeMgr) CatchUp(h claim.Height, notifier func(key trie.Key) error) error { func (nm *NodeMgr) CatchUp(ht claim.Height, notifier func(key []byte)) {
for name := range nm.nextUpdates[h] { nm.height = ht
n, err := nm.NodeAt(name, h) for name := range nm.nextUpdates[ht] {
if err != nil { notifier([]byte(name))
return err if next := nm.nodeAt(name, ht).NextUpdate(); next > ht {
}
if err = notifier(trie.Key(name)); err != nil {
return err
}
if next := n.NextUpdate(); next > h {
nm.nextUpdates.set(name, next) nm.nextUpdates.set(name, next)
} }
} }
return nil
} }
// Show ... // Show is a conevenient function for debugging and velopment purpose.
func (nm *NodeMgr) Show(name string) error { // The proper way to handle user request would be a query function with filters specified.
if len(name) != 0 { func (nm *NodeMgr) Show(name string, ht claim.Height, dump bool) error {
fmt.Printf("[%s] %s\n", name, nm.nodes[name])
return nil
}
names := []string{} names := []string{}
for name := range nm.nodes { if len(name) != 0 {
names = append(names, name) names = append(names, name)
} else {
for name := range nm.cache {
names = append(names, name)
}
} }
sort.Strings(names) sort.Strings(names)
for _, name := range names { for _, name := range names {
fmt.Printf("[%s] %s\n", name, nm.nodes[name]) n := nm.nodeAt(name, ht)
if n.BestClaim() == nil {
continue
}
fmt.Printf("[%s] %s\n", name, n)
if dump {
change.NewChangeList(nm.db, name).Load().Dump()
}
} }
return nil return nil
} }
// UpdateAll ... // NewFromChanges ...
func (nm *NodeMgr) UpdateAll(m func(key trie.Key) error) error { func NewFromChanges(name string, chgs []*change.Change, ht claim.Height) *claim.Node {
for name := range nm.nodes { return replay(name, chgs).AdjustTo(ht)
m(trie.Key(name)) }
func replay(name string, chgs []*change.Change) *claim.Node {
n := claim.NewNode(name)
for _, chg := range chgs {
if n.Height() < chg.Height-1 {
n.AdjustTo(chg.Height - 1)
} }
return nil if n.Height() == chg.Height-1 {
if err := execute(n, chg); err != nil {
panic(err)
}
}
}
return n
}
func execute(n *claim.Node, c *change.Change) error {
var err error
switch c.Cmd {
case change.AddClaim:
err = n.AddClaim(c.OP, c.Amt)
case change.SpendClaim:
err = n.SpendClaim(c.OP)
case change.UpdateClaim:
err = n.UpdateClaim(c.OP, c.Amt, c.ID)
case change.AddSupport:
err = n.AddSupport(c.OP, c.Amt, c.ID)
case change.SpendSupport:
err = n.SpendSupport(c.OP)
}
return errors.Wrapf(err, "chg %s", c)
} }
type todos map[claim.Height]map[string]bool type todos map[claim.Height]map[string]bool
func (t todos) set(name string, h claim.Height) { func (t todos) set(name string, ht claim.Height) {
if t[h] == nil { if t[ht] == nil {
t[h] = map[string]bool{} t[ht] = map[string]bool{}
} }
t[h][name] = true t[ht][name] = true
} }

View file

@ -4,9 +4,6 @@ import (
"github.com/btcsuite/btcd/chaincfg/chainhash" "github.com/btcsuite/btcd/chaincfg/chainhash"
) )
// Key defines the key type of the MerkleTrie.
type Key []byte
// Value defines value for the MerkleTrie. // Value defines value for the MerkleTrie.
type Value interface { type Value interface {
Hash() *chainhash.Hash Hash() *chainhash.Hash
@ -14,6 +11,5 @@ type Value interface {
// KeyValue ... // KeyValue ...
type KeyValue interface { type KeyValue interface {
Get(k Key) (Value, error) Get(key []byte) Value
Set(k Key, v Value)
} }

View file

@ -2,18 +2,12 @@ package trie
import ( import (
"bytes" "bytes"
"fmt"
"sync" "sync"
"github.com/btcsuite/btcd/chaincfg/chainhash" "github.com/btcsuite/btcd/chaincfg/chainhash"
"github.com/pkg/errors"
"github.com/syndtr/goleveldb/leveldb" "github.com/syndtr/goleveldb/leveldb"
) )
var (
// ErrResolve is returned when an error occured during resolve.
ErrResolve = fmt.Errorf("can't resolve")
)
var ( var (
// EmptyTrieHash represents the Merkle Hash of an empty Trie. // EmptyTrieHash represents the Merkle Hash of an empty Trie.
// "0000000000000000000000000000000000000000000000000000000000000001" // "0000000000000000000000000000000000000000000000000000000000000001"
@ -52,35 +46,30 @@ func (t *Trie) SetRoot(h *chainhash.Hash) {
// Update updates the nodes along the path to the key. // Update updates the nodes along the path to the key.
// Each node is resolved or created with their Hash cleared. // Each node is resolved or created with their Hash cleared.
func (t *Trie) Update(key Key) error { func (t *Trie) Update(key []byte) {
n := t.root n := t.root
for _, ch := range key { for _, ch := range key {
if err := t.resolve(n); err != nil { t.resolve(n)
return ErrResolve
}
if n.links[ch] == nil { if n.links[ch] == nil {
n.links[ch] = newNode() n.links[ch] = newNode()
} }
n.hash = nil n.hash = nil
n = n.links[ch] n = n.links[ch]
} }
if err := t.resolve(n); err != nil { t.resolve(n)
return ErrResolve
}
n.hasValue = true n.hasValue = true
n.hash = nil n.hash = nil
return nil
} }
func (t *Trie) resolve(n *node) error { func (t *Trie) resolve(n *node) {
if n.hash == nil { if n.hash == nil {
return nil return
} }
b, err := t.db.Get(n.hash[:], nil) b, err := t.db.Get(n.hash[:], nil)
if err == leveldb.ErrNotFound { if err == leveldb.ErrNotFound {
return nil return
} else if err != nil { } else if err != nil {
return errors.Wrapf(err, "db.Get(%s)", n.hash) panic(err)
} }
nb := nbuf(b) nb := nbuf(b)
@ -90,68 +79,29 @@ func (t *Trie) resolve(n *node) error {
n.links[p] = newNode() n.links[p] = newNode()
n.links[p].hash = h n.links[p].hash = h
} }
return nil
}
// Visit implements callback function invoked when the Value is visited.
// During the traversal, if a non-nil error is returned, the traversal ends early.
type Visit func(prefix Key, val Value) error
// Traverse implements preorder traversal visiting each Value node.
func (t *Trie) Traverse(visit Visit) error {
var traverse func(prefix Key, n *node) error
traverse = func(prefix Key, n *node) error {
if n == nil {
return nil
}
for ch, n := range n.links {
if n == nil || !n.hasValue {
continue
}
p := append(prefix, byte(ch))
val, err := t.kv.Get(p)
if err != nil {
return errors.Wrapf(err, "kv.Get(%s)", p)
}
if err := visit(p, val); err != nil {
return err
}
if err := traverse(p, n); err != nil {
return err
}
}
return nil
}
buf := make([]byte, 0, 4096)
return traverse(buf, t.root)
} }
// MerkleHash returns the Merkle Hash of the Trie. // MerkleHash returns the Merkle Hash of the Trie.
// All nodes must have been resolved before calling this function. // All nodes must have been resolved before calling this function.
func (t *Trie) MerkleHash() (*chainhash.Hash, error) { func (t *Trie) MerkleHash() *chainhash.Hash {
t.batch = &leveldb.Batch{} t.batch = &leveldb.Batch{}
buf := make([]byte, 0, 4096) buf := make([]byte, 0, 4096)
if err := t.merkle(buf, t.root); err != nil { if h := t.merkle(buf, t.root); h == nil {
return nil, err return EmptyTrieHash
} }
if t.root.hash == nil { if t.batch.Len() != 0 {
return EmptyTrieHash, nil
}
if t.db != nil && t.batch.Len() != 0 {
if err := t.db.Write(t.batch, nil); err != nil { if err := t.db.Write(t.batch, nil); err != nil {
return nil, errors.Wrapf(err, "db.Write(t.batch, nil)") panic(err)
} }
} }
return t.root.hash, nil return t.root.hash
} }
// merkle recursively resolves the hashes of the node. // merkle recursively resolves the hashes of the node.
// All nodes must have been resolved before calling this function. // All nodes must have been resolved before calling this function.
func (t *Trie) merkle(prefix Key, n *node) error { func (t *Trie) merkle(prefix []byte, n *node) *chainhash.Hash {
if n.hash != nil { if n.hash != nil {
return nil return n.hash
} }
b := t.bufs.Get().(*bytes.Buffer) b := t.bufs.Get().(*bytes.Buffer)
defer t.bufs.Put(b) defer t.bufs.Put(b)
@ -162,39 +112,22 @@ func (t *Trie) merkle(prefix Key, n *node) error {
continue continue
} }
p := append(prefix, byte(ch)) p := append(prefix, byte(ch))
if err := t.merkle(p, n); err != nil { if h := t.merkle(p, n); h != nil {
return err b.WriteByte(byte(ch)) // nolint : errchk
} b.Write(h[:]) // nolint : errchk
if n.hash == nil {
continue
}
if err := b.WriteByte(byte(ch)); err != nil {
panic(err) // Can't happen. Kepp linter happy.
}
if _, err := b.Write(n.hash[:]); err != nil {
panic(err) // Can't happen. Kepp linter happy.
} }
} }
if n.hasValue { if n.hasValue {
val, err := t.kv.Get(prefix) if h := t.kv.Get(prefix).Hash(); h != nil {
if err != nil { b.Write(h[:]) // nolint : errchk
return errors.Wrapf(err, "t.kv.get(%s)", prefix)
}
if h := val.Hash(); h != nil {
if _, err = b.Write(h[:]); err != nil {
panic(err) // Can't happen. Kepp linter happy.
}
} }
} }
if b.Len() == 0 { if b.Len() != 0 {
return nil
}
h := chainhash.DoubleHashH(b.Bytes()) h := chainhash.DoubleHashH(b.Bytes())
n.hash = &h n.hash = &h
if t.db != nil { t.batch.Put(h[:], b.Bytes())
t.batch.Put(n.hash[:], b.Bytes())
} }
return nil return n.hash
} }