Integrate "reuse claim memory" with node cache. Return claims to claimPool when Node refcnt drops to zero.

This commit is contained in:
Jonathan Moody 2022-06-06 15:01:21 -04:00
parent 8482890759
commit 787098e2a2
5 changed files with 40 additions and 12 deletions

View file

@ -3,6 +3,7 @@ package node
import (
"container/list"
"sync"
"sync/atomic"
"github.com/lbryio/lbcd/claimtrie/change"
)
@ -27,8 +28,11 @@ func (nc *Cache) insert(name []byte, n *Node, height int32) {
nc.mtx.Lock()
defer nc.mtx.Unlock()
atomic.AddInt32(&n.refcnt, 1)
existing := nc.nodes[key]
if existing != nil {
existing.node.Close()
existing.node = n
existing.height = height
existing.changes = nil
@ -38,8 +42,11 @@ func (nc *Cache) insert(name []byte, n *Node, height int32) {
for nc.order.Len() >= nc.limit {
// TODO: maybe ensure that we don't remove nodes that have a lot of changes?
delete(nc.nodes, nc.order.Back().Value.(string))
exp := nc.order.Back().Value.(string)
expired := nc.nodes[exp]
delete(nc.nodes, exp)
nc.order.Remove(nc.order.Back())
expired.node.Close()
}
element := nc.order.PushFront(key)
@ -55,6 +62,7 @@ func (nc *Cache) fetch(name []byte, height int32) (*Node, []change.Change, int32
existing := nc.nodes[key]
if existing != nil && existing.height <= height {
nc.order.MoveToFront(existing.element)
atomic.AddInt32(&existing.node.refcnt, 1)
return existing.node, existing.changes, existing.height
}
return nil, nil, -1
@ -84,6 +92,7 @@ func (nc *Cache) drop(names [][]byte) {
// we can't roll it backwards because we don't know its previous height value; just toast it
delete(nc.nodes, key)
nc.order.Remove(existing.element)
existing.node.Close()
}
}
}
@ -91,6 +100,9 @@ func (nc *Cache) drop(names [][]byte) {
func (nc *Cache) clear() {
nc.mtx.Lock()
defer nc.mtx.Unlock()
for _, existing := range nc.nodes {
existing.node.Close()
}
nc.nodes = map[string]*cacheLeaf{}
nc.order = list.New()
// we'll let the GC sort out the remains...

View file

@ -4,6 +4,7 @@ import (
"bytes"
"strconv"
"strings"
"sync"
"github.com/lbryio/lbcd/chaincfg/chainhash"
"github.com/lbryio/lbcd/claimtrie/change"
@ -32,6 +33,12 @@ type Claim struct {
Sequence int32 `msgpack:",omitempty"`
}
func newClaim() interface{} {
return &Claim{}
}
var claimPool = sync.Pool{New: newClaim}
func (c *Claim) setOutPoint(op wire.OutPoint) *Claim {
c.OutPoint = op
return c

View file

@ -15,6 +15,7 @@ func (nm *HashV2Manager) computeClaimHashes(name []byte) (*chainhash.Hash, int32
if err != nil || n == nil {
return nil, 0
}
defer n.Close()
n.SortClaimsByBid()
claimHashes := make([]*chainhash.Hash, 0, len(n.Claims))

View file

@ -78,7 +78,9 @@ func (nm *BaseManager) NodeAt(height int32, name []byte) (*Node, error) {
if nm.tempChanges != nil { // making an assumption that we only ever have tempChanges for a single block
changes = append(changes, nm.tempChanges[string(name)]...)
}
old := n
n = n.Clone()
old.Close()
updated, err := nm.updateFromChanges(n, changes, height)
if err != nil {
if n != nil {

View file

@ -4,7 +4,7 @@ import (
"fmt"
"math"
"sort"
"sync"
"sync/atomic"
"github.com/lbryio/lbcd/claimtrie/change"
"github.com/lbryio/lbcd/claimtrie/param"
@ -16,24 +16,19 @@ type Node struct {
Claims ClaimList // List of all Claims.
Supports ClaimList // List of all Supports, including orphaned ones.
SupportSums map[change.ClaimID]int64
refcnt int32
}
// New returns a new node.
func New() *Node {
return &Node{SupportSums: map[change.ClaimID]int64{}}
return &Node{SupportSums: map[change.ClaimID]int64{}, refcnt: 1}
}
func (n *Node) HasActiveBestClaim() bool {
return n.BestClaim != nil && n.BestClaim.Status == Activated
}
var claimPool = sync.Pool{
New: func() interface{} {
return &Claim{}
},
}
func (n *Node) Close() {
func (n *Node) close() {
n.BestClaim = nil
n.SupportSums = nil
@ -48,6 +43,17 @@ func (n *Node) Close() {
n.Supports = nil
}
func (n *Node) Close() {
new := atomic.AddInt32(&n.refcnt, -1)
if new < 0 {
panic("node refcnt underflow")
}
if new > 0 {
return
}
n.close()
}
func (n *Node) ApplyChange(chg change.Change, delay int32) error {
visibleAt := chg.VisibleHeight
@ -377,12 +383,12 @@ func (n *Node) Clone() *Node {
}
clone.Supports = make(ClaimList, len(n.Supports))
for i, support := range n.Supports {
clone.Supports[i] = &Claim{}
clone.Supports[i] = claimPool.Get().(*Claim)
*clone.Supports[i] = *support
}
clone.Claims = make(ClaimList, len(n.Claims))
for i, claim := range n.Claims {
clone.Claims[i] = &Claim{}
clone.Claims[i] = claimPool.Get().(*Claim)
*clone.Claims[i] = *claim
}
clone.TakenOverAt = n.TakenOverAt