diff --git a/claimtrie/node/cache.go b/claimtrie/node/cache.go new file mode 100644 index 00000000..0f556af1 --- /dev/null +++ b/claimtrie/node/cache.go @@ -0,0 +1,101 @@ +package node + +import ( + "container/list" + "sync" + + "github.com/lbryio/lbcd/claimtrie/change" +) + +type cacheLeaf struct { + node *Node + element *list.Element + changes []change.Change + height int32 +} + +type Cache struct { + nodes map[string]*cacheLeaf + order *list.List + mtx sync.Mutex + limit int +} + +func (nc *Cache) insert(name []byte, n *Node, height int32) { + key := string(name) + + nc.mtx.Lock() + defer nc.mtx.Unlock() + + existing := nc.nodes[key] + if existing != nil { + existing.node = n + existing.height = height + existing.changes = nil + nc.order.MoveToFront(existing.element) + return + } + + for nc.order.Len() >= nc.limit { + // TODO: maybe ensure that we don't remove nodes that have a lot of changes? + delete(nc.nodes, nc.order.Back().Value.(string)) + nc.order.Remove(nc.order.Back()) + } + + element := nc.order.PushFront(key) + nc.nodes[key] = &cacheLeaf{node: n, element: element, height: height} +} + +func (nc *Cache) fetch(name []byte, height int32) (*Node, []change.Change, int32) { + key := string(name) + + nc.mtx.Lock() + defer nc.mtx.Unlock() + + existing := nc.nodes[key] + if existing != nil && existing.height <= height { + nc.order.MoveToFront(existing.element) + return existing.node, existing.changes, existing.height + } + return nil, nil, -1 +} + +func (nc *Cache) addChanges(changes []change.Change, height int32) { + nc.mtx.Lock() + defer nc.mtx.Unlock() + + for _, c := range changes { + key := string(c.Name) + existing := nc.nodes[key] + if existing != nil && existing.height <= height { + existing.changes = append(existing.changes, c) + } + } +} + +func (nc *Cache) drop(names [][]byte) { + nc.mtx.Lock() + defer nc.mtx.Unlock() + + for _, name := range names { + key := string(name) + existing := nc.nodes[key] + if existing != nil { + // we can't roll it backwards because we don't know its previous height value; just toast it + delete(nc.nodes, key) + nc.order.Remove(existing.element) + } + } +} + +func (nc *Cache) clear() { + nc.mtx.Lock() + defer nc.mtx.Unlock() + nc.nodes = map[string]*cacheLeaf{} + nc.order = list.New() + // we'll let the GC sort out the remains... +} + +func NewCache(limit int) *Cache { + return &Cache{limit: limit, nodes: map[string]*cacheLeaf{}, order: list.New()} +} diff --git a/claimtrie/node/manager.go b/claimtrie/node/manager.go index 814bfc80..31ba0f1a 100644 --- a/claimtrie/node/manager.go +++ b/claimtrie/node/manager.go @@ -21,6 +21,7 @@ type Manager interface { IterateNames(predicate func(name []byte) bool) Hash(name []byte) (*chainhash.Hash, int32) Flush() error + ClearCache() } type BaseManager struct { @@ -30,31 +31,60 @@ type BaseManager struct { changes []change.Change tempChanges map[string][]change.Change + + cache *Cache } func NewBaseManager(repo Repo) (*BaseManager, error) { nm := &BaseManager{ - repo: repo, + repo: repo, + cache: NewCache(10000), // TODO: how many should we cache? } return nm, nil } +func (nm *BaseManager) ClearCache() { + nm.cache.clear() +} + func (nm *BaseManager) NodeAt(height int32, name []byte) (*Node, error) { - changes, err := nm.repo.LoadChanges(name) - if err != nil { - return nil, errors.Wrap(err, "in load changes") - } + n, changes, oldHeight := nm.cache.fetch(name, height) + if n == nil { + changes, err := nm.repo.LoadChanges(name) + if err != nil { + return nil, errors.Wrap(err, "in load changes") + } - if nm.tempChanges != nil { // making an assumption that we only ever have tempChanges for a single block - changes = append(changes, nm.tempChanges[string(name)]...) - } + if nm.tempChanges != nil { // making an assumption that we only ever have tempChanges for a single block + changes = append(changes, nm.tempChanges[string(name)]...) + } - n, err := nm.newNodeFromChanges(changes, height) - if err != nil { - return nil, errors.Wrap(err, "in new node") + n, err = nm.newNodeFromChanges(changes, height) + if err != nil { + return nil, errors.Wrap(err, "in new node") + } + // TODO: how can we tell what needs to be cached? + if nm.tempChanges == nil && height == nm.height && n != nil && (len(changes) > 7 || len(name) < 12) { + nm.cache.insert(name, n, height) + } + } else { + if nm.tempChanges != nil { // making an assumption that we only ever have tempChanges for a single block + changes = append(changes, nm.tempChanges[string(name)]...) + } + n = n.Clone() + updated, err := nm.updateFromChanges(n, changes, height) + if err != nil { + return nil, errors.Wrap(err, "in update from changes") + } + if !updated { + n.AdjustTo(oldHeight, height, name) + } + if nm.tempChanges == nil && height == nm.height { // TODO: how many changes before we update the cache? + nm.cache.insert(name, n, height) + } } return n, nil @@ -66,17 +96,13 @@ func (nm *BaseManager) node(name []byte) (*Node, error) { return nm.NodeAt(nm.height, name) } -// newNodeFromChanges returns a new Node constructed from the changes. -// The changes must preserve their order received. -func (nm *BaseManager) newNodeFromChanges(changes []change.Change, height int32) (*Node, error) { +func (nm *BaseManager) updateFromChanges(n *Node, changes []change.Change, height int32) (bool, error) { - if len(changes) == 0 { - return nil, nil - } - - n := New() - previous := changes[0].Height count := len(changes) + if count == 0 { + return false, nil + } + previous := changes[0].Height for i, chg := range changes { if chg.Height < previous { @@ -95,15 +121,37 @@ func (nm *BaseManager) newNodeFromChanges(changes []change.Change, height int32) delay := nm.getDelayForName(n, chg) err := n.ApplyChange(chg, delay) if err != nil { - return nil, errors.Wrap(err, "in apply change") + return false, errors.Wrap(err, "in apply change") } } if count <= 0 { - return nil, nil + // we applied no changes, which means we shouldn't exist if we had all the changes + // or might mean nothing significant if we are applying a partial changeset + return false, nil } lastChange := changes[count-1] - return n.AdjustTo(lastChange.Height, height, lastChange.Name), nil + n.AdjustTo(lastChange.Height, height, lastChange.Name) + return true, nil +} + +// newNodeFromChanges returns a new Node constructed from the changes. +// The changes must preserve their order received. +func (nm *BaseManager) newNodeFromChanges(changes []change.Change, height int32) (*Node, error) { + + if len(changes) == 0 { + return nil, nil + } + + n := New() + updated, err := nm.updateFromChanges(n, changes, height) + if err != nil { + return nil, errors.Wrap(err, "in update from changes") + } + if updated { + return n, nil + } + return nil, nil } func (nm *BaseManager) AppendChange(chg change.Change) { @@ -220,6 +268,7 @@ func (nm *BaseManager) IncrementHeightTo(height int32, temporary bool) ([][]byte } if !temporary { + nm.cache.addChanges(nm.changes, height) if err := nm.repo.AppendChanges(nm.changes); err != nil { // destroys names return nil, errors.Wrap(err, "in append changes") } @@ -255,6 +304,8 @@ func (nm *BaseManager) DecrementHeightTo(affectedNames [][]byte, height int32) ( return affectedNames, errors.Wrap(err, "in drop changes") } } + + nm.cache.drop(affectedNames) } nm.height = height diff --git a/claimtrie/node/node.go b/claimtrie/node/node.go index fe6db947..ff45fc11 100644 --- a/claimtrie/node/node.go +++ b/claimtrie/node/node.go @@ -110,7 +110,7 @@ func (n *Node) ApplyChange(chg change.Change, delay int32) error { } // AdjustTo activates claims and computes takeovers until it reaches the specified height. -func (n *Node) AdjustTo(height, maxHeight int32, name []byte) *Node { +func (n *Node) AdjustTo(height, maxHeight int32, name []byte) { changed := n.handleExpiredAndActivated(height) > 0 n.updateTakeoverHeight(height, name, changed) if maxHeight > height { @@ -120,7 +120,6 @@ func (n *Node) AdjustTo(height, maxHeight int32, name []byte) *Node { height = h } } - return n } func (n *Node) updateTakeoverHeight(height int32, name []byte, refindBest bool) { @@ -340,3 +339,28 @@ func (n *Node) SortClaimsByBid() { return OutPointLess(n.Claims[j].OutPoint, n.Claims[i].OutPoint) }) } + +func (n *Node) Clone() *Node { + clone := New() + if n.SupportSums != nil { + clone.SupportSums = map[string]int64{} + for key, value := range n.SupportSums { + clone.SupportSums[key] = value + } + } + clone.Supports = make(ClaimList, len(n.Supports)) + for i, support := range n.Supports { + clone.Supports[i] = &Claim{} + *clone.Supports[i] = *support + } + clone.Claims = make(ClaimList, len(n.Claims)) + for i, claim := range n.Claims { + clone.Claims[i] = &Claim{} + *clone.Claims[i] = *claim + } + clone.TakenOverAt = n.TakenOverAt + if n.BestClaim != nil { + clone.BestClaim = clone.Claims.find(byID(n.BestClaim.ClaimID)) + } + return clone +} diff --git a/claimtrie/node/normalizing_manager.go b/claimtrie/node/normalizing_manager.go index 2f6c4cfe..604fa34d 100644 --- a/claimtrie/node/normalizing_manager.go +++ b/claimtrie/node/normalizing_manager.go @@ -34,6 +34,7 @@ func (nm *NormalizingManager) IncrementHeightTo(height int32, temporary bool) ([ func (nm *NormalizingManager) DecrementHeightTo(affectedNames [][]byte, height int32) ([][]byte, error) { if nm.normalizedAt > height { nm.normalizedAt = -1 + nm.ClearCache() } return nm.Manager.DecrementHeightTo(affectedNames, height) } @@ -110,5 +111,7 @@ func (nm *NormalizingManager) addNormalizationForkChangesIfNecessary(height int3 return true } + + nm.Manager.ClearCache() nm.Manager.IterateNames(predicate) }