diff --git a/claimtrie/node/cache.go b/claimtrie/node/cache.go index 0f556af1..1ae9caa7 100644 --- a/claimtrie/node/cache.go +++ b/claimtrie/node/cache.go @@ -3,6 +3,7 @@ package node import ( "container/list" "sync" + "sync/atomic" "github.com/lbryio/lbcd/claimtrie/change" ) @@ -27,8 +28,11 @@ func (nc *Cache) insert(name []byte, n *Node, height int32) { nc.mtx.Lock() defer nc.mtx.Unlock() + atomic.AddInt32(&n.refcnt, 1) + existing := nc.nodes[key] if existing != nil { + existing.node.Close() existing.node = n existing.height = height existing.changes = nil @@ -38,8 +42,11 @@ func (nc *Cache) insert(name []byte, n *Node, height int32) { for nc.order.Len() >= nc.limit { // TODO: maybe ensure that we don't remove nodes that have a lot of changes? - delete(nc.nodes, nc.order.Back().Value.(string)) + exp := nc.order.Back().Value.(string) + expired := nc.nodes[exp] + delete(nc.nodes, exp) nc.order.Remove(nc.order.Back()) + expired.node.Close() } element := nc.order.PushFront(key) @@ -55,6 +62,7 @@ func (nc *Cache) fetch(name []byte, height int32) (*Node, []change.Change, int32 existing := nc.nodes[key] if existing != nil && existing.height <= height { nc.order.MoveToFront(existing.element) + atomic.AddInt32(&existing.node.refcnt, 1) return existing.node, existing.changes, existing.height } return nil, nil, -1 @@ -84,6 +92,7 @@ func (nc *Cache) drop(names [][]byte) { // we can't roll it backwards because we don't know its previous height value; just toast it delete(nc.nodes, key) nc.order.Remove(existing.element) + existing.node.Close() } } } @@ -91,6 +100,9 @@ func (nc *Cache) drop(names [][]byte) { func (nc *Cache) clear() { nc.mtx.Lock() defer nc.mtx.Unlock() + for _, existing := range nc.nodes { + existing.node.Close() + } nc.nodes = map[string]*cacheLeaf{} nc.order = list.New() // we'll let the GC sort out the remains... diff --git a/claimtrie/node/claim.go b/claimtrie/node/claim.go index 09a7ed08..87838f0f 100644 --- a/claimtrie/node/claim.go +++ b/claimtrie/node/claim.go @@ -4,6 +4,7 @@ import ( "bytes" "strconv" "strings" + "sync" "github.com/lbryio/lbcd/chaincfg/chainhash" "github.com/lbryio/lbcd/claimtrie/change" @@ -32,6 +33,12 @@ type Claim struct { Sequence int32 `msgpack:",omitempty"` } +func newClaim() interface{} { + return &Claim{} +} + +var claimPool = sync.Pool{New: newClaim} + func (c *Claim) setOutPoint(op wire.OutPoint) *Claim { c.OutPoint = op return c diff --git a/claimtrie/node/hashfork_manager.go b/claimtrie/node/hashfork_manager.go index bbd814ee..f43a190c 100644 --- a/claimtrie/node/hashfork_manager.go +++ b/claimtrie/node/hashfork_manager.go @@ -15,6 +15,7 @@ func (nm *HashV2Manager) computeClaimHashes(name []byte) (*chainhash.Hash, int32 if err != nil || n == nil { return nil, 0 } + defer n.Close() n.SortClaimsByBid() claimHashes := make([]*chainhash.Hash, 0, len(n.Claims)) diff --git a/claimtrie/node/manager.go b/claimtrie/node/manager.go index a0667b45..924dacb2 100644 --- a/claimtrie/node/manager.go +++ b/claimtrie/node/manager.go @@ -78,7 +78,9 @@ func (nm *BaseManager) NodeAt(height int32, name []byte) (*Node, error) { if nm.tempChanges != nil { // making an assumption that we only ever have tempChanges for a single block changes = append(changes, nm.tempChanges[string(name)]...) } + old := n n = n.Clone() + old.Close() updated, err := nm.updateFromChanges(n, changes, height) if err != nil { if n != nil { diff --git a/claimtrie/node/node.go b/claimtrie/node/node.go index 1eaccbe9..4568cb61 100644 --- a/claimtrie/node/node.go +++ b/claimtrie/node/node.go @@ -4,7 +4,7 @@ import ( "fmt" "math" "sort" - "sync" + "sync/atomic" "github.com/lbryio/lbcd/claimtrie/change" "github.com/lbryio/lbcd/claimtrie/param" @@ -16,24 +16,19 @@ type Node struct { Claims ClaimList // List of all Claims. Supports ClaimList // List of all Supports, including orphaned ones. SupportSums map[change.ClaimID]int64 + refcnt int32 } // New returns a new node. func New() *Node { - return &Node{SupportSums: map[change.ClaimID]int64{}} + return &Node{SupportSums: map[change.ClaimID]int64{}, refcnt: 1} } func (n *Node) HasActiveBestClaim() bool { return n.BestClaim != nil && n.BestClaim.Status == Activated } -var claimPool = sync.Pool{ - New: func() interface{} { - return &Claim{} - }, -} - -func (n *Node) Close() { +func (n *Node) close() { n.BestClaim = nil n.SupportSums = nil @@ -48,6 +43,17 @@ func (n *Node) Close() { n.Supports = nil } +func (n *Node) Close() { + new := atomic.AddInt32(&n.refcnt, -1) + if new < 0 { + panic("node refcnt underflow") + } + if new > 0 { + return + } + n.close() +} + func (n *Node) ApplyChange(chg change.Change, delay int32) error { visibleAt := chg.VisibleHeight @@ -377,12 +383,12 @@ func (n *Node) Clone() *Node { } clone.Supports = make(ClaimList, len(n.Supports)) for i, support := range n.Supports { - clone.Supports[i] = &Claim{} + clone.Supports[i] = claimPool.Get().(*Claim) *clone.Supports[i] = *support } clone.Claims = make(ClaimList, len(n.Claims)) for i, claim := range n.Claims { - clone.Claims[i] = &Claim{} + clone.Claims[i] = claimPool.Get().(*Claim) *clone.Claims[i] = *claim } clone.TakenOverAt = n.TakenOverAt