diff --git a/claimtrie/claimtrie.go b/claimtrie/claimtrie.go index 2bc0cdbf..1d848e13 100644 --- a/claimtrie/claimtrie.go +++ b/claimtrie/claimtrie.go @@ -4,9 +4,7 @@ import ( "bytes" "fmt" "path/filepath" - "runtime" "sort" - "sync" "github.com/pkg/errors" @@ -249,17 +247,17 @@ func (ct *ClaimTrie) AppendBlock(temporary bool) error { names = append(names, expirations...) names = removeDuplicates(names) - nhns := ct.makeNameHashNext(names, false, nil) - for nhn := range nhns { + for _, name := range names { - ct.merkleTrie.Update(nhn.Name, nhn.Hash, true) - if nhn.Next <= 0 { + hash, next := ct.nodeManager.Hash(name) + ct.merkleTrie.Update(name, hash, true) + if next <= 0 { continue } - newName := normalization.NormalizeIfNecessary(nhn.Name, nhn.Next) + newName := normalization.NormalizeIfNecessary(name, next) updateNames = append(updateNames, newName) - updateHeights = append(updateHeights, nhn.Next) + updateHeights = append(updateHeights, next) } if !temporary && len(updateNames) > 0 { err = ct.temporalRepo.SetNodesAt(updateNames, updateHeights) @@ -356,22 +354,29 @@ func (ct *ClaimTrie) ResetHeight(height int32) error { } func (ct *ClaimTrie) runFullTrieRebuild(names [][]byte, interrupt <-chan struct{}) { - var nhns chan NameHashNext if names == nil { node.Log("Building the entire claim trie in RAM...") ct.claimLogger = newClaimProgressLogger("Processed", node.GetLogger()) - nhns = ct.makeNameHashNext(nil, true, interrupt) - } else { - ct.claimLogger = nil - nhns = ct.makeNameHashNext(names, false, interrupt) - } - for nhn := range nhns { - ct.merkleTrie.Update(nhn.Name, nhn.Hash, false) - if ct.claimLogger != nil { - ct.claimLogger.LogName(nhn.Name) + ct.nodeManager.IterateNames(func(name []byte) bool { + if interruptRequested(interrupt) { + return false + } + clone := make([]byte, len(name)) + copy(clone, name) + hash, _ := ct.nodeManager.Hash(clone) + ct.merkleTrie.Update(clone, hash, false) + ct.claimLogger.LogName(name) + return true + }) + + } else { + for _, name := range names { + hash, _ := ct.nodeManager.Hash(name) + ct.merkleTrie.Update(name, hash, false) } } + } // MerkleHash returns the Merkle Hash of the claimTrie. @@ -437,12 +442,6 @@ func (ct *ClaimTrie) FlushToDisk() { } } -type NameHashNext struct { - Name []byte - Hash *chainhash.Hash - Next int32 -} - func interruptRequested(interrupted <-chan struct{}) bool { select { case <-interrupted: // should never block on nil @@ -452,53 +451,3 @@ func interruptRequested(interrupted <-chan struct{}) bool { return false } - -func (ct *ClaimTrie) makeNameHashNext(names [][]byte, all bool, interrupt <-chan struct{}) chan NameHashNext { - inputs := make(chan []byte, 512) - outputs := make(chan NameHashNext, 512) - - var wg sync.WaitGroup - hashComputationWorker := func() { - for name := range inputs { - hash, next := ct.nodeManager.Hash(name) - outputs <- NameHashNext{name, hash, next} - } - wg.Done() - } - - threads := int(0.8 * float32(runtime.GOMAXPROCS(0))) - if threads < 1 { - threads = 1 - } - for threads > 0 { - threads-- - wg.Add(1) - go hashComputationWorker() - } - go func() { - if all { - ct.nodeManager.IterateNames(func(name []byte) bool { - if interruptRequested(interrupt) { - return false - } - clone := make([]byte, len(name)) - copy(clone, name) // iteration name buffer is reused on future loops - inputs <- clone - return true - }) - } else { - for _, name := range names { - if interruptRequested(interrupt) { - break - } - inputs <- name - } - } - close(inputs) - }() - go func() { - wg.Wait() - close(outputs) - }() - return outputs -} diff --git a/claimtrie/node/cache.go b/claimtrie/node/cache.go new file mode 100644 index 00000000..905ecd1d --- /dev/null +++ b/claimtrie/node/cache.go @@ -0,0 +1,85 @@ +package node + +import ( + "container/list" + + "github.com/lbryio/lbcd/claimtrie/change" +) + +type cacheLeaf struct { + node *Node + element *list.Element + changes []change.Change + height int32 +} + +type Cache struct { + nodes map[string]*cacheLeaf + order *list.List + limit int +} + +func (nc *Cache) insert(name []byte, n *Node, height int32) { + key := string(name) + + existing := nc.nodes[key] + if existing != nil { + existing.node = n + existing.height = height + existing.changes = nil + nc.order.MoveToFront(existing.element) + return + } + + for nc.order.Len() >= nc.limit { + // TODO: maybe ensure that we don't remove nodes that have a lot of changes? + delete(nc.nodes, nc.order.Back().Value.(string)) + nc.order.Remove(nc.order.Back()) + } + + element := nc.order.PushFront(key) + nc.nodes[key] = &cacheLeaf{node: n, element: element, height: height} +} + +func (nc *Cache) fetch(name []byte, height int32) (*Node, []change.Change, int32) { + key := string(name) + + existing := nc.nodes[key] + if existing != nil && existing.height <= height { + nc.order.MoveToFront(existing.element) + return existing.node, existing.changes, existing.height + } + return nil, nil, -1 +} + +func (nc *Cache) addChanges(changes []change.Change, height int32) { + for _, c := range changes { + key := string(c.Name) + existing := nc.nodes[key] + if existing != nil && existing.height <= height { + existing.changes = append(existing.changes, c) + } + } +} + +func (nc *Cache) drop(names [][]byte) { + for _, name := range names { + key := string(name) + existing := nc.nodes[key] + if existing != nil { + // we can't roll it backwards because we don't know its previous height value; just toast it + delete(nc.nodes, key) + nc.order.Remove(existing.element) + } + } +} + +func (nc *Cache) clear() { + nc.nodes = map[string]*cacheLeaf{} + nc.order = list.New() + // we'll let the GC sort out the remains... +} + +func NewCache(limit int) *Cache { + return &Cache{limit: limit, nodes: map[string]*cacheLeaf{}, order: list.New()} +} diff --git a/claimtrie/node/manager.go b/claimtrie/node/manager.go index 814bfc80..7081ac25 100644 --- a/claimtrie/node/manager.go +++ b/claimtrie/node/manager.go @@ -21,6 +21,7 @@ type Manager interface { IterateNames(predicate func(name []byte) bool) Hash(name []byte) (*chainhash.Hash, int32) Flush() error + ClearCache() } type BaseManager struct { @@ -30,31 +31,62 @@ type BaseManager struct { changes []change.Change tempChanges map[string][]change.Change + + cache *Cache } func NewBaseManager(repo Repo) (*BaseManager, error) { nm := &BaseManager{ - repo: repo, + repo: repo, + cache: NewCache(10000), // TODO: how many should we cache? } return nm, nil } +func (nm *BaseManager) ClearCache() { + nm.cache.clear() +} + func (nm *BaseManager) NodeAt(height int32, name []byte) (*Node, error) { - changes, err := nm.repo.LoadChanges(name) - if err != nil { - return nil, errors.Wrap(err, "in load changes") - } + n, changes, oldHeight := nm.cache.fetch(name, height) + if n == nil { + changes, err := nm.repo.LoadChanges(name) + if err != nil { + return nil, errors.Wrap(err, "in load changes") + } - if nm.tempChanges != nil { // making an assumption that we only ever have tempChanges for a single block - changes = append(changes, nm.tempChanges[string(name)]...) - } + if nm.tempChanges != nil { // making an assumption that we only ever have tempChanges for a single block + changes = append(changes, nm.tempChanges[string(name)]...) + } - n, err := nm.newNodeFromChanges(changes, height) - if err != nil { - return nil, errors.Wrap(err, "in new node") + n, err = nm.newNodeFromChanges(changes, height) + if err != nil { + return nil, errors.Wrap(err, "in new node") + } + // TODO: how can we tell what needs to be cached? + if nm.tempChanges == nil && height == nm.height && n != nil && (len(changes) > 4 || len(name) < 12) { + nm.cache.insert(name, n, height) + } + } else { + if nm.tempChanges != nil { // making an assumption that we only ever have tempChanges for a single block + changes = append(changes, nm.tempChanges[string(name)]...) + n = n.Clone() + } else if height != nm.height { + n = n.Clone() + } + updated, err := nm.updateFromChanges(n, changes, height) + if err != nil { + return nil, errors.Wrap(err, "in update from changes") + } + if !updated { + n.AdjustTo(oldHeight, height, name) + } + if nm.tempChanges == nil && height == nm.height { + nm.cache.insert(name, n, height) + } } return n, nil @@ -66,17 +98,13 @@ func (nm *BaseManager) node(name []byte) (*Node, error) { return nm.NodeAt(nm.height, name) } -// newNodeFromChanges returns a new Node constructed from the changes. -// The changes must preserve their order received. -func (nm *BaseManager) newNodeFromChanges(changes []change.Change, height int32) (*Node, error) { +func (nm *BaseManager) updateFromChanges(n *Node, changes []change.Change, height int32) (bool, error) { - if len(changes) == 0 { - return nil, nil - } - - n := New() - previous := changes[0].Height count := len(changes) + if count == 0 { + return false, nil + } + previous := changes[0].Height for i, chg := range changes { if chg.Height < previous { @@ -95,15 +123,37 @@ func (nm *BaseManager) newNodeFromChanges(changes []change.Change, height int32) delay := nm.getDelayForName(n, chg) err := n.ApplyChange(chg, delay) if err != nil { - return nil, errors.Wrap(err, "in apply change") + return false, errors.Wrap(err, "in apply change") } } if count <= 0 { - return nil, nil + // we applied no changes, which means we shouldn't exist if we had all the changes + // or might mean nothing significant if we are applying a partial changeset + return false, nil } lastChange := changes[count-1] - return n.AdjustTo(lastChange.Height, height, lastChange.Name), nil + n.AdjustTo(lastChange.Height, height, lastChange.Name) + return true, nil +} + +// newNodeFromChanges returns a new Node constructed from the changes. +// The changes must preserve their order received. +func (nm *BaseManager) newNodeFromChanges(changes []change.Change, height int32) (*Node, error) { + + if len(changes) == 0 { + return nil, nil + } + + n := New() + updated, err := nm.updateFromChanges(n, changes, height) + if err != nil { + return nil, errors.Wrap(err, "in update from changes") + } + if updated { + return n, nil + } + return nil, nil } func (nm *BaseManager) AppendChange(chg change.Change) { @@ -220,6 +270,7 @@ func (nm *BaseManager) IncrementHeightTo(height int32, temporary bool) ([][]byte } if !temporary { + nm.cache.addChanges(nm.changes, height) if err := nm.repo.AppendChanges(nm.changes); err != nil { // destroys names return nil, errors.Wrap(err, "in append changes") } @@ -255,6 +306,8 @@ func (nm *BaseManager) DecrementHeightTo(affectedNames [][]byte, height int32) ( return affectedNames, errors.Wrap(err, "in drop changes") } } + + nm.cache.drop(affectedNames) } nm.height = height diff --git a/claimtrie/node/node.go b/claimtrie/node/node.go index fe6db947..ff45fc11 100644 --- a/claimtrie/node/node.go +++ b/claimtrie/node/node.go @@ -110,7 +110,7 @@ func (n *Node) ApplyChange(chg change.Change, delay int32) error { } // AdjustTo activates claims and computes takeovers until it reaches the specified height. -func (n *Node) AdjustTo(height, maxHeight int32, name []byte) *Node { +func (n *Node) AdjustTo(height, maxHeight int32, name []byte) { changed := n.handleExpiredAndActivated(height) > 0 n.updateTakeoverHeight(height, name, changed) if maxHeight > height { @@ -120,7 +120,6 @@ func (n *Node) AdjustTo(height, maxHeight int32, name []byte) *Node { height = h } } - return n } func (n *Node) updateTakeoverHeight(height int32, name []byte, refindBest bool) { @@ -340,3 +339,28 @@ func (n *Node) SortClaimsByBid() { return OutPointLess(n.Claims[j].OutPoint, n.Claims[i].OutPoint) }) } + +func (n *Node) Clone() *Node { + clone := New() + if n.SupportSums != nil { + clone.SupportSums = map[string]int64{} + for key, value := range n.SupportSums { + clone.SupportSums[key] = value + } + } + clone.Supports = make(ClaimList, len(n.Supports)) + for i, support := range n.Supports { + clone.Supports[i] = &Claim{} + *clone.Supports[i] = *support + } + clone.Claims = make(ClaimList, len(n.Claims)) + for i, claim := range n.Claims { + clone.Claims[i] = &Claim{} + *clone.Claims[i] = *claim + } + clone.TakenOverAt = n.TakenOverAt + if n.BestClaim != nil { + clone.BestClaim = clone.Claims.find(byID(n.BestClaim.ClaimID)) + } + return clone +} diff --git a/claimtrie/node/normalizing_manager.go b/claimtrie/node/normalizing_manager.go index 2f6c4cfe..604fa34d 100644 --- a/claimtrie/node/normalizing_manager.go +++ b/claimtrie/node/normalizing_manager.go @@ -34,6 +34,7 @@ func (nm *NormalizingManager) IncrementHeightTo(height int32, temporary bool) ([ func (nm *NormalizingManager) DecrementHeightTo(affectedNames [][]byte, height int32) ([][]byte, error) { if nm.normalizedAt > height { nm.normalizedAt = -1 + nm.ClearCache() } return nm.Manager.DecrementHeightTo(affectedNames, height) } @@ -110,5 +111,7 @@ func (nm *NormalizingManager) addNormalizationForkChangesIfNecessary(height int3 return true } + + nm.Manager.ClearCache() nm.Manager.IterateNames(predicate) }