diff --git a/nodemgr/nm.go b/nodemgr/nm.go index 318f0d0..3131aae 100644 --- a/nodemgr/nm.go +++ b/nodemgr/nm.go @@ -5,6 +5,7 @@ import ( "encoding/gob" "fmt" "sort" + "sync" "github.com/lbryio/claimtrie/change" "github.com/lbryio/claimtrie/claim" @@ -17,8 +18,11 @@ import ( // NodeMgr ... type NodeMgr struct { height claim.Height + db *leveldb.DB - db *leveldb.DB + // cachemu synchronizes the access to the map itself, but not the + // values of the node. + cachemu sync.RWMutex cache map[string]*claim.Node nextUpdates todos } @@ -35,12 +39,16 @@ func New(db *leveldb.DB) *NodeMgr { // Load loads the nodes from the database up to height ht. func (nm *NodeMgr) Load(ht claim.Height) { + nm.height = ht iter := nm.db.NewIterator(nil, nil) + nm.cachemu.Lock() for iter.Next() { name := string(iter.Key()) nm.cache[name] = nm.load(name, ht) } + nm.cachemu.Unlock() + data, err := nm.db.Get([]byte("nextUpdates"), nil) if err == leveldb.ErrNotFound { return @@ -71,6 +79,8 @@ func (nm *NodeMgr) Get(key []byte) trie.Value { // Reset resets all nodes to specified height. func (nm *NodeMgr) Reset(ht claim.Height) { + nm.cachemu.Lock() + defer nm.cachemu.Unlock() nm.height = ht for name, n := range nm.cache { if n.Height() >= ht { @@ -81,21 +91,25 @@ func (nm *NodeMgr) Reset(ht claim.Height) { // Size returns the number of nodes loaded into the cache. func (nm *NodeMgr) Size() int { + nm.cachemu.RLock() + defer nm.cachemu.RUnlock() return len(nm.cache) } func (nm *NodeMgr) load(name string, ht claim.Height) *claim.Node { c := change.NewChangeList(nm.db, name).Load().Truncate(ht).Changes() - return NewFromChanges(name, c, ht) + return replay(name, c).AdjustTo(ht) } // NodeAt returns the node adjusted to specified height. func (nm *NodeMgr) NodeAt(name string, ht claim.Height) *claim.Node { + nm.cachemu.Lock() n, ok := nm.cache[name] if !ok { n = claim.NewNode(name) nm.cache[name] = n } + nm.cachemu.Unlock() // Cached version is too new. if n.Height() > nm.height || n.Height() > ht { @@ -112,7 +126,9 @@ func (nm *NodeMgr) ModifyNode(name string, chg *change.Change) error { if err := execute(n, chg); err != nil { return errors.Wrapf(err, "claim.execute(n,chg)") } + nm.cachemu.Lock() nm.cache[name] = n + nm.cachemu.Unlock() nm.nextUpdates.set(name, ht+1) change.NewChangeList(nm.db, name).Load().Append(chg).Save() return nil @@ -129,11 +145,14 @@ func (nm *NodeMgr) CatchUp(ht claim.Height, notifier func(key []byte)) { } } -// VisitFunc ... +// VisitFunc visit each node in read-only manner. type VisitFunc func(n *claim.Node) (stop bool) -// Visit visits every node in the cache with VisiFunc. +// Visit visits every node in the cache with VisitFunc. +// If the VisitFunc returns true, the iteration ends immediately. func (nm *NodeMgr) Visit(v VisitFunc) { + nm.cachemu.RLock() + defer nm.cachemu.RUnlock() for _, n := range nm.cache { if v(n) { return @@ -141,7 +160,7 @@ func (nm *NodeMgr) Visit(v VisitFunc) { } } -// Show is a conevenient function for debugging and velopment purpose. +// Show is a conevenient function for debugging purpose. // The proper way to handle user request would be a query function with filters specified. func (nm *NodeMgr) Show(name string, ht claim.Height, dump bool) error { names := []string{} @@ -166,11 +185,6 @@ func (nm *NodeMgr) Show(name string, ht claim.Height, dump bool) error { return nil } -// NewFromChanges ... -func NewFromChanges(name string, chgs []*change.Change, ht claim.Height) *claim.Node { - return replay(name, chgs).AdjustTo(ht) -} - func replay(name string, chgs []*change.Change) *claim.Node { n := claim.NewNode(name) for _, chg := range chgs {