[lbry] claimtrie: created node cache
This commit is contained in:
parent
5d5f53c8d8
commit
8f95946b17
4 changed files with 204 additions and 25 deletions
101
claimtrie/node/cache.go
Normal file
101
claimtrie/node/cache.go
Normal file
|
@ -0,0 +1,101 @@
|
||||||
|
package node
|
||||||
|
|
||||||
|
import (
|
||||||
|
"container/list"
|
||||||
|
"sync"
|
||||||
|
|
||||||
|
"github.com/lbryio/lbcd/claimtrie/change"
|
||||||
|
)
|
||||||
|
|
||||||
|
type cacheLeaf struct {
|
||||||
|
node *Node
|
||||||
|
element *list.Element
|
||||||
|
changes []change.Change
|
||||||
|
height int32
|
||||||
|
}
|
||||||
|
|
||||||
|
type Cache struct {
|
||||||
|
nodes map[string]*cacheLeaf
|
||||||
|
order *list.List
|
||||||
|
mtx sync.Mutex
|
||||||
|
limit int
|
||||||
|
}
|
||||||
|
|
||||||
|
func (nc *Cache) insert(name []byte, n *Node, height int32) {
|
||||||
|
key := string(name)
|
||||||
|
|
||||||
|
nc.mtx.Lock()
|
||||||
|
defer nc.mtx.Unlock()
|
||||||
|
|
||||||
|
existing := nc.nodes[key]
|
||||||
|
if existing != nil {
|
||||||
|
existing.node = n
|
||||||
|
existing.height = height
|
||||||
|
existing.changes = nil
|
||||||
|
nc.order.MoveToFront(existing.element)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
for nc.order.Len() >= nc.limit {
|
||||||
|
// TODO: maybe ensure that we don't remove nodes that have a lot of changes?
|
||||||
|
delete(nc.nodes, nc.order.Back().Value.(string))
|
||||||
|
nc.order.Remove(nc.order.Back())
|
||||||
|
}
|
||||||
|
|
||||||
|
element := nc.order.PushFront(key)
|
||||||
|
nc.nodes[key] = &cacheLeaf{node: n, element: element, height: height}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (nc *Cache) fetch(name []byte, height int32) (*Node, []change.Change, int32) {
|
||||||
|
key := string(name)
|
||||||
|
|
||||||
|
nc.mtx.Lock()
|
||||||
|
defer nc.mtx.Unlock()
|
||||||
|
|
||||||
|
existing := nc.nodes[key]
|
||||||
|
if existing != nil && existing.height <= height {
|
||||||
|
nc.order.MoveToFront(existing.element)
|
||||||
|
return existing.node, existing.changes, existing.height
|
||||||
|
}
|
||||||
|
return nil, nil, -1
|
||||||
|
}
|
||||||
|
|
||||||
|
func (nc *Cache) addChanges(changes []change.Change, height int32) {
|
||||||
|
nc.mtx.Lock()
|
||||||
|
defer nc.mtx.Unlock()
|
||||||
|
|
||||||
|
for _, c := range changes {
|
||||||
|
key := string(c.Name)
|
||||||
|
existing := nc.nodes[key]
|
||||||
|
if existing != nil && existing.height <= height {
|
||||||
|
existing.changes = append(existing.changes, c)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (nc *Cache) drop(names [][]byte) {
|
||||||
|
nc.mtx.Lock()
|
||||||
|
defer nc.mtx.Unlock()
|
||||||
|
|
||||||
|
for _, name := range names {
|
||||||
|
key := string(name)
|
||||||
|
existing := nc.nodes[key]
|
||||||
|
if existing != nil {
|
||||||
|
// we can't roll it backwards because we don't know its previous height value; just toast it
|
||||||
|
delete(nc.nodes, key)
|
||||||
|
nc.order.Remove(existing.element)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (nc *Cache) clear() {
|
||||||
|
nc.mtx.Lock()
|
||||||
|
defer nc.mtx.Unlock()
|
||||||
|
nc.nodes = map[string]*cacheLeaf{}
|
||||||
|
nc.order = list.New()
|
||||||
|
// we'll let the GC sort out the remains...
|
||||||
|
}
|
||||||
|
|
||||||
|
func NewCache(limit int) *Cache {
|
||||||
|
return &Cache{limit: limit, nodes: map[string]*cacheLeaf{}, order: list.New()}
|
||||||
|
}
|
|
@ -21,6 +21,7 @@ type Manager interface {
|
||||||
IterateNames(predicate func(name []byte) bool)
|
IterateNames(predicate func(name []byte) bool)
|
||||||
Hash(name []byte) (*chainhash.Hash, int32)
|
Hash(name []byte) (*chainhash.Hash, int32)
|
||||||
Flush() error
|
Flush() error
|
||||||
|
ClearCache()
|
||||||
}
|
}
|
||||||
|
|
||||||
type BaseManager struct {
|
type BaseManager struct {
|
||||||
|
@ -30,31 +31,60 @@ type BaseManager struct {
|
||||||
changes []change.Change
|
changes []change.Change
|
||||||
|
|
||||||
tempChanges map[string][]change.Change
|
tempChanges map[string][]change.Change
|
||||||
|
|
||||||
|
cache *Cache
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewBaseManager(repo Repo) (*BaseManager, error) {
|
func NewBaseManager(repo Repo) (*BaseManager, error) {
|
||||||
|
|
||||||
nm := &BaseManager{
|
nm := &BaseManager{
|
||||||
repo: repo,
|
repo: repo,
|
||||||
|
cache: NewCache(10000), // TODO: how many should we cache?
|
||||||
}
|
}
|
||||||
|
|
||||||
return nm, nil
|
return nm, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (nm *BaseManager) ClearCache() {
|
||||||
|
nm.cache.clear()
|
||||||
|
}
|
||||||
|
|
||||||
func (nm *BaseManager) NodeAt(height int32, name []byte) (*Node, error) {
|
func (nm *BaseManager) NodeAt(height int32, name []byte) (*Node, error) {
|
||||||
|
|
||||||
changes, err := nm.repo.LoadChanges(name)
|
n, changes, oldHeight := nm.cache.fetch(name, height)
|
||||||
if err != nil {
|
if n == nil {
|
||||||
return nil, errors.Wrap(err, "in load changes")
|
changes, err := nm.repo.LoadChanges(name)
|
||||||
}
|
if err != nil {
|
||||||
|
return nil, errors.Wrap(err, "in load changes")
|
||||||
|
}
|
||||||
|
|
||||||
if nm.tempChanges != nil { // making an assumption that we only ever have tempChanges for a single block
|
if nm.tempChanges != nil { // making an assumption that we only ever have tempChanges for a single block
|
||||||
changes = append(changes, nm.tempChanges[string(name)]...)
|
changes = append(changes, nm.tempChanges[string(name)]...)
|
||||||
}
|
}
|
||||||
|
|
||||||
n, err := nm.newNodeFromChanges(changes, height)
|
n, err = nm.newNodeFromChanges(changes, height)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, errors.Wrap(err, "in new node")
|
return nil, errors.Wrap(err, "in new node")
|
||||||
|
}
|
||||||
|
// TODO: how can we tell what needs to be cached?
|
||||||
|
if nm.tempChanges == nil && height == nm.height && n != nil && (len(changes) > 7 || len(name) < 12) {
|
||||||
|
nm.cache.insert(name, n, height)
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
if nm.tempChanges != nil { // making an assumption that we only ever have tempChanges for a single block
|
||||||
|
changes = append(changes, nm.tempChanges[string(name)]...)
|
||||||
|
}
|
||||||
|
n = n.Clone()
|
||||||
|
updated, err := nm.updateFromChanges(n, changes, height)
|
||||||
|
if err != nil {
|
||||||
|
return nil, errors.Wrap(err, "in update from changes")
|
||||||
|
}
|
||||||
|
if !updated {
|
||||||
|
n.AdjustTo(oldHeight, height, name)
|
||||||
|
}
|
||||||
|
if nm.tempChanges == nil && height == nm.height { // TODO: how many changes before we update the cache?
|
||||||
|
nm.cache.insert(name, n, height)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
return n, nil
|
return n, nil
|
||||||
|
@ -66,17 +96,13 @@ func (nm *BaseManager) node(name []byte) (*Node, error) {
|
||||||
return nm.NodeAt(nm.height, name)
|
return nm.NodeAt(nm.height, name)
|
||||||
}
|
}
|
||||||
|
|
||||||
// newNodeFromChanges returns a new Node constructed from the changes.
|
func (nm *BaseManager) updateFromChanges(n *Node, changes []change.Change, height int32) (bool, error) {
|
||||||
// The changes must preserve their order received.
|
|
||||||
func (nm *BaseManager) newNodeFromChanges(changes []change.Change, height int32) (*Node, error) {
|
|
||||||
|
|
||||||
if len(changes) == 0 {
|
|
||||||
return nil, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
n := New()
|
|
||||||
previous := changes[0].Height
|
|
||||||
count := len(changes)
|
count := len(changes)
|
||||||
|
if count == 0 {
|
||||||
|
return false, nil
|
||||||
|
}
|
||||||
|
previous := changes[0].Height
|
||||||
|
|
||||||
for i, chg := range changes {
|
for i, chg := range changes {
|
||||||
if chg.Height < previous {
|
if chg.Height < previous {
|
||||||
|
@ -95,15 +121,37 @@ func (nm *BaseManager) newNodeFromChanges(changes []change.Change, height int32)
|
||||||
delay := nm.getDelayForName(n, chg)
|
delay := nm.getDelayForName(n, chg)
|
||||||
err := n.ApplyChange(chg, delay)
|
err := n.ApplyChange(chg, delay)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, errors.Wrap(err, "in apply change")
|
return false, errors.Wrap(err, "in apply change")
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if count <= 0 {
|
if count <= 0 {
|
||||||
return nil, nil
|
// we applied no changes, which means we shouldn't exist if we had all the changes
|
||||||
|
// or might mean nothing significant if we are applying a partial changeset
|
||||||
|
return false, nil
|
||||||
}
|
}
|
||||||
lastChange := changes[count-1]
|
lastChange := changes[count-1]
|
||||||
return n.AdjustTo(lastChange.Height, height, lastChange.Name), nil
|
n.AdjustTo(lastChange.Height, height, lastChange.Name)
|
||||||
|
return true, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// newNodeFromChanges returns a new Node constructed from the changes.
|
||||||
|
// The changes must preserve their order received.
|
||||||
|
func (nm *BaseManager) newNodeFromChanges(changes []change.Change, height int32) (*Node, error) {
|
||||||
|
|
||||||
|
if len(changes) == 0 {
|
||||||
|
return nil, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
n := New()
|
||||||
|
updated, err := nm.updateFromChanges(n, changes, height)
|
||||||
|
if err != nil {
|
||||||
|
return nil, errors.Wrap(err, "in update from changes")
|
||||||
|
}
|
||||||
|
if updated {
|
||||||
|
return n, nil
|
||||||
|
}
|
||||||
|
return nil, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (nm *BaseManager) AppendChange(chg change.Change) {
|
func (nm *BaseManager) AppendChange(chg change.Change) {
|
||||||
|
@ -220,6 +268,7 @@ func (nm *BaseManager) IncrementHeightTo(height int32, temporary bool) ([][]byte
|
||||||
}
|
}
|
||||||
|
|
||||||
if !temporary {
|
if !temporary {
|
||||||
|
nm.cache.addChanges(nm.changes, height)
|
||||||
if err := nm.repo.AppendChanges(nm.changes); err != nil { // destroys names
|
if err := nm.repo.AppendChanges(nm.changes); err != nil { // destroys names
|
||||||
return nil, errors.Wrap(err, "in append changes")
|
return nil, errors.Wrap(err, "in append changes")
|
||||||
}
|
}
|
||||||
|
@ -255,6 +304,8 @@ func (nm *BaseManager) DecrementHeightTo(affectedNames [][]byte, height int32) (
|
||||||
return affectedNames, errors.Wrap(err, "in drop changes")
|
return affectedNames, errors.Wrap(err, "in drop changes")
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
nm.cache.drop(affectedNames)
|
||||||
}
|
}
|
||||||
nm.height = height
|
nm.height = height
|
||||||
|
|
||||||
|
|
|
@ -110,7 +110,7 @@ func (n *Node) ApplyChange(chg change.Change, delay int32) error {
|
||||||
}
|
}
|
||||||
|
|
||||||
// AdjustTo activates claims and computes takeovers until it reaches the specified height.
|
// AdjustTo activates claims and computes takeovers until it reaches the specified height.
|
||||||
func (n *Node) AdjustTo(height, maxHeight int32, name []byte) *Node {
|
func (n *Node) AdjustTo(height, maxHeight int32, name []byte) {
|
||||||
changed := n.handleExpiredAndActivated(height) > 0
|
changed := n.handleExpiredAndActivated(height) > 0
|
||||||
n.updateTakeoverHeight(height, name, changed)
|
n.updateTakeoverHeight(height, name, changed)
|
||||||
if maxHeight > height {
|
if maxHeight > height {
|
||||||
|
@ -120,7 +120,6 @@ func (n *Node) AdjustTo(height, maxHeight int32, name []byte) *Node {
|
||||||
height = h
|
height = h
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
return n
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func (n *Node) updateTakeoverHeight(height int32, name []byte, refindBest bool) {
|
func (n *Node) updateTakeoverHeight(height int32, name []byte, refindBest bool) {
|
||||||
|
@ -340,3 +339,28 @@ func (n *Node) SortClaimsByBid() {
|
||||||
return OutPointLess(n.Claims[j].OutPoint, n.Claims[i].OutPoint)
|
return OutPointLess(n.Claims[j].OutPoint, n.Claims[i].OutPoint)
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (n *Node) Clone() *Node {
|
||||||
|
clone := New()
|
||||||
|
if n.SupportSums != nil {
|
||||||
|
clone.SupportSums = map[string]int64{}
|
||||||
|
for key, value := range n.SupportSums {
|
||||||
|
clone.SupportSums[key] = value
|
||||||
|
}
|
||||||
|
}
|
||||||
|
clone.Supports = make(ClaimList, len(n.Supports))
|
||||||
|
for i, support := range n.Supports {
|
||||||
|
clone.Supports[i] = &Claim{}
|
||||||
|
*clone.Supports[i] = *support
|
||||||
|
}
|
||||||
|
clone.Claims = make(ClaimList, len(n.Claims))
|
||||||
|
for i, claim := range n.Claims {
|
||||||
|
clone.Claims[i] = &Claim{}
|
||||||
|
*clone.Claims[i] = *claim
|
||||||
|
}
|
||||||
|
clone.TakenOverAt = n.TakenOverAt
|
||||||
|
if n.BestClaim != nil {
|
||||||
|
clone.BestClaim = clone.Claims.find(byID(n.BestClaim.ClaimID))
|
||||||
|
}
|
||||||
|
return clone
|
||||||
|
}
|
||||||
|
|
|
@ -34,6 +34,7 @@ func (nm *NormalizingManager) IncrementHeightTo(height int32, temporary bool) ([
|
||||||
func (nm *NormalizingManager) DecrementHeightTo(affectedNames [][]byte, height int32) ([][]byte, error) {
|
func (nm *NormalizingManager) DecrementHeightTo(affectedNames [][]byte, height int32) ([][]byte, error) {
|
||||||
if nm.normalizedAt > height {
|
if nm.normalizedAt > height {
|
||||||
nm.normalizedAt = -1
|
nm.normalizedAt = -1
|
||||||
|
nm.ClearCache()
|
||||||
}
|
}
|
||||||
return nm.Manager.DecrementHeightTo(affectedNames, height)
|
return nm.Manager.DecrementHeightTo(affectedNames, height)
|
||||||
}
|
}
|
||||||
|
@ -110,5 +111,7 @@ func (nm *NormalizingManager) addNormalizationForkChangesIfNecessary(height int3
|
||||||
|
|
||||||
return true
|
return true
|
||||||
}
|
}
|
||||||
|
|
||||||
|
nm.Manager.ClearCache()
|
||||||
nm.Manager.IterateNames(predicate)
|
nm.Manager.IterateNames(predicate)
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in a new issue