From 4e78c08818f4b7e0e0a49fc5ac5270969eab5821 Mon Sep 17 00:00:00 2001 From: Alex Grintsvayg Date: Mon, 25 Jun 2018 16:49:40 -0400 Subject: [PATCH] partial switch to new stopgroup. need to refactor to take advantage of child cancelation --- dht/bootstrap.go | 8 ++++---- dht/dht.go | 39 +++++++++++++++++++++++---------------- dht/node.go | 28 ++++++++++++++-------------- dht/node_finder.go | 35 ++++++++++++----------------------- dht/routing_table.go | 8 ++++---- dht/token_cache.go | 4 ++-- dht/token_manager.go | 6 +++--- 7 files changed, 62 insertions(+), 66 deletions(-) diff --git a/dht/bootstrap.go b/dht/bootstrap.go index 2bb9230..f5bdd07 100644 --- a/dht/bootstrap.go +++ b/dht/bootstrap.go @@ -64,7 +64,7 @@ func (b *BootstrapNode) Connect(conn UDPConn) error { select { case <-t.C: b.check() - case <-b.stop.Ch(): + case <-b.grp.Ch(): return } } @@ -129,8 +129,8 @@ func (b *BootstrapNode) get(limit int) []Contact { // ping pings a node. if the node responds, it is added to the list. otherwise, it is removed func (b *BootstrapNode) ping(c Contact) { log.Debugf("[%s] bootstrap: pinging %s", b.id.HexShort(), c.ID.HexShort()) - b.stop.Add(1) - defer b.stop.Done() + b.grp.Add(1) + defer b.grp.Done() resCh := b.SendAsync(c, Request{Method: pingMethod}) @@ -138,7 +138,7 @@ func (b *BootstrapNode) ping(c Contact) { select { case res = <-resCh: - case <-b.stop.Ch(): + case <-b.grp.Ch(): return } diff --git a/dht/dht.go b/dht/dht.go index 37f16de..3990d32 100644 --- a/dht/dht.go +++ b/dht/dht.go @@ -12,13 +12,20 @@ import ( peerproto "github.com/lbryio/reflector.go/peer" "github.com/lbryio/lbry.go/errors" - "github.com/lbryio/lbry.go/stopOnce" + "github.com/lbryio/lbry.go/stop" - log "github.com/sirupsen/logrus" + "github.com/sirupsen/logrus" "github.com/spf13/cast" ) +var log *logrus.Logger + +func UseLogger(l *logrus.Logger) { + log = l +} + func init() { + log = logrus.StandardLogger() //log.SetFormatter(&log.TextFormatter{ForceColors: true}) //log.SetLevel(log.DebugLevel) } @@ -87,8 +94,8 @@ type DHT struct { contact Contact // node node *Node - // stopper to shut down DHT - stop *stopOnce.Stopper + // stopGroup to shut down DHT + grp *stop.Group // channel is closed when DHT joins network joined chan struct{} // lock for announced list @@ -107,7 +114,7 @@ func New(config *Config) *DHT { d := &DHT{ conf: config, - stop: stopOnce.New(), + grp: stop.New(), joined: make(chan struct{}), lock: &sync.RWMutex{}, announced: make(map[bits.Bitmap]bool), @@ -177,7 +184,7 @@ func (dht *DHT) join() { } // now call iterativeFind on yourself - _, _, err := FindContacts(dht.node, dht.node.id, false, dht.stop.Ch()) + _, _, err := FindContacts(dht.node, dht.node.id, false, dht.grp.Child()) if err != nil { log.Errorf("[%s] join: %s", dht.node.id.HexShort(), err.Error()) } @@ -197,7 +204,7 @@ func (dht *DHT) WaitUntilJoined() { // Shutdown shuts down the dht func (dht *DHT) Shutdown() { log.Debugf("[%s] DHT shutting down", dht.node.id.HexShort()) - dht.stop.StopAndWait() + dht.grp.StopAndWait() dht.node.Shutdown() log.Debugf("[%s] DHT stopped", dht.node.id.HexShort()) } @@ -221,7 +228,7 @@ func (dht *DHT) Ping(addr string) error { // Get returns the list of nodes that have the blob for the given hash func (dht *DHT) Get(hash bits.Bitmap) ([]Contact, error) { - contacts, found, err := FindContacts(dht.node, hash, true, dht.stop.Ch()) + contacts, found, err := FindContacts(dht.node, hash, true, dht.grp.Child()) if err != nil { return nil, err } @@ -242,9 +249,9 @@ func (dht *DHT) Add(hash bits.Bitmap) { return } - dht.stop.Add(1) + dht.grp.Add(1) go func() { - defer dht.stop.Done() + defer dht.grp.Done() err := dht.announce(hash) if err != nil { log.Error(errors.Prefix("error announcing bitmap", err)) @@ -254,7 +261,7 @@ func (dht *DHT) Add(hash bits.Bitmap) { // Announce announces to the DHT that this node has the blob for the given hash func (dht *DHT) announce(hash bits.Bitmap) error { - contacts, _, err := FindContacts(dht.node, hash, false, dht.stop.Ch()) + contacts, _, err := FindContacts(dht.node, hash, false, dht.grp.Child()) if err != nil { return err } @@ -290,14 +297,14 @@ func (dht *DHT) startReannouncer() { tick := time.NewTicker(tReannounce) for { select { - case <-dht.stop.Ch(): + case <-dht.grp.Ch(): return case <-tick.C: dht.lock.RLock() for h := range dht.announced { - dht.stop.Add(1) + dht.grp.Add(1) go func(bm bits.Bitmap) { - defer dht.stop.Done() + defer dht.grp.Done() err := dht.announce(bm) if err != nil { log.Error("error re-announcing bitmap - ", err) @@ -316,7 +323,7 @@ func (dht *DHT) storeOnNode(hash bits.Bitmap, c Contact) { return } - token := dht.tokenCache.Get(c, hash, dht.stop.Ch()) + token := dht.tokenCache.Get(c, hash, dht.grp.Ch()) resCh := dht.node.SendAsync(c, Request{ Method: storeMethod, @@ -333,7 +340,7 @@ func (dht *DHT) storeOnNode(hash bits.Bitmap, c Contact) { go func() { select { case <-resCh: - case <-dht.stop.Ch(): + case <-dht.grp.Ch(): } }() } diff --git a/dht/node.go b/dht/node.go index 7bc828a..0a5b5e2 100644 --- a/dht/node.go +++ b/dht/node.go @@ -8,7 +8,7 @@ import ( "time" "github.com/lbryio/errors.go" - "github.com/lbryio/lbry.go/stopOnce" + "github.com/lbryio/lbry.go/stop" "github.com/lbryio/lbry.go/util" "github.com/lbryio/reflector.go/dht/bits" @@ -60,7 +60,7 @@ type Node struct { requestHandler RequestHandlerFunc // stop the node neatly and clean up after itself - stop *stopOnce.Stopper + grp *stop.Group } // NewNode returns an initialized Node's pointer. @@ -73,7 +73,7 @@ func NewNode(id bits.Bitmap) *Node { txLock: &sync.RWMutex{}, transactions: make(map[messageID]*transaction), - stop: stopOnce.New(), + grp: stop.New(), tokens: &tokenManager{}, } } @@ -86,7 +86,7 @@ func (n *Node) Connect(conn UDPConn) error { go func() { // stop tokens and close the connection when we're shutting down - <-n.stop.Ch() + <-n.grp.Ch() n.tokens.Stop() n.connClosed = true err := n.conn.Close() @@ -97,9 +97,9 @@ func (n *Node) Connect(conn UDPConn) error { packets := make(chan packet) - n.stop.Add(1) + n.grp.Add(1) go func() { - defer n.stop.Done() + defer n.grp.Done() buf := make([]byte, udpMaxMessageLength) @@ -121,15 +121,15 @@ func (n *Node) Connect(conn UDPConn) error { select { // needs select here because packet consumer can quit and the packets channel gets filled up and blocks case packets <- packet{data: data, raddr: raddr}: - case <-n.stop.Ch(): + case <-n.grp.Ch(): return } } }() - n.stop.Add(1) + n.grp.Add(1) go func() { - defer n.stop.Done() + defer n.grp.Done() var pkt packet @@ -137,7 +137,7 @@ func (n *Node) Connect(conn UDPConn) error { select { case pkt = <-packets: n.handlePacket(pkt) - case <-n.stop.Ch(): + case <-n.grp.Ch(): return } } @@ -156,7 +156,7 @@ func (n *Node) Connect(conn UDPConn) error { // Shutdown shuts down the node func (n *Node) Shutdown() { log.Debugf("[%s] node shutting down", n.id.HexShort()) - n.stop.StopAndWait() + n.grp.StopAndWait() log.Debugf("[%s] node stopped", n.id.HexShort()) } @@ -426,7 +426,7 @@ func (n *Node) SendAsync(contact Contact, req Request, options ...SendOptions) < case res := <-tx.res: ch <- &res return - case <-n.stop.Ch(): + case <-n.grp.Ch(): return case <-time.After(udpTimeout): } @@ -457,8 +457,8 @@ func (n *Node) startRoutingTableGrooming() { for { select { case <-refreshTicker.C: - RoutingTableRefresh(n, tRefresh, n.stop.Ch()) - case <-n.stop.Ch(): + RoutingTableRefresh(n, tRefresh, n.grp.Child()) + case <-n.grp.Ch(): return } } diff --git a/dht/node_finder.go b/dht/node_finder.go index a455374..7375bd9 100644 --- a/dht/node_finder.go +++ b/dht/node_finder.go @@ -5,9 +5,9 @@ import ( "sync" "time" - "github.com/lbryio/internal-apis/app/crypto" + "github.com/lbryio/lbry.go/crypto" "github.com/lbryio/lbry.go/errors" - "github.com/lbryio/lbry.go/stopOnce" + "github.com/lbryio/lbry.go/stop" "github.com/lbryio/reflector.go/dht/bits" "github.com/sirupsen/logrus" @@ -23,7 +23,7 @@ func init() { cfLog = logrus.StandardLogger() } -func NodeFinderUserLogger(l *logrus.Logger) { +func NodeFinderUseLogger(l *logrus.Logger) { cfLog = l } @@ -32,7 +32,7 @@ type contactFinder struct { target bits.Bitmap node *Node - stop *stopOnce.Stopper + grp *stop.Group findValueMutex *sync.Mutex findValueResult []Contact @@ -49,7 +49,7 @@ type contactFinder struct { notGettingCloser *atomic.Bool } -func FindContacts(node *Node, target bits.Bitmap, findValue bool, upstreamStop stopOnce.Chan) ([]Contact, bool, error) { +func FindContacts(node *Node, target bits.Bitmap, findValue bool, parentGrp *stop.Group) ([]Contact, bool, error) { cf := &contactFinder{ node: node, target: target, @@ -58,27 +58,16 @@ func FindContacts(node *Node, target bits.Bitmap, findValue bool, upstreamStop s activeContactsMutex: &sync.Mutex{}, shortlistMutex: &sync.Mutex{}, shortlistAdded: make(map[bits.Bitmap]bool), - stop: stopOnce.New(), + grp: stop.New(parentGrp), closestContactMutex: &sync.RWMutex{}, notGettingCloser: atomic.NewBool(false), } - if upstreamStop != nil { - go func() { - select { - case <-upstreamStop: - cf.Stop() - case <-cf.stop.Ch(): - } - }() - } - return cf.Find() } func (cf *contactFinder) Stop() { - cf.stop.Stop() - cf.stop.Wait() + cf.grp.StopAndWait() } func (cf *contactFinder) Find() ([]Contact, bool, error) { @@ -100,7 +89,7 @@ CycleLoop: select { case <-time.After(timeout): go cf.cycle(false) - case <-cf.stop.Ch(): + case <-cf.grp.Ch(): break CycleLoop } } @@ -176,7 +165,7 @@ func (cf *contactFinder) cycle(bigCycle bool) { } if cf.isSearchFinished() { - cf.stop.Stop() + cf.grp.Stop() return } @@ -225,7 +214,7 @@ func (cf *contactFinder) probe(cycleID string) *Contact { resCh := cf.node.SendAsync(c, req) select { case res = <-resCh: - case <-cf.stop.Ch(): + case <-cf.grp.Ch(): cf.debug("|%s| probe %s: canceled", cycleID, c.ID.HexShort()) return nil } @@ -240,7 +229,7 @@ func (cf *contactFinder) probe(cycleID string) *Contact { cf.findValueMutex.Lock() cf.findValueResult = res.Contacts cf.findValueMutex.Unlock() - cf.stop.Stop() + cf.grp.Stop() return nil } @@ -321,7 +310,7 @@ func (cf *contactFinder) isSearchFinished() bool { } select { - case <-cf.stop.Ch(): + case <-cf.grp.Ch(): return true default: } diff --git a/dht/routing_table.go b/dht/routing_table.go index a467f61..ffc4eaf 100644 --- a/dht/routing_table.go +++ b/dht/routing_table.go @@ -11,7 +11,7 @@ import ( "time" "github.com/lbryio/lbry.go/errors" - "github.com/lbryio/lbry.go/stopOnce" + "github.com/lbryio/lbry.go/stop" "github.com/lbryio/reflector.go/dht/bits" log "github.com/sirupsen/logrus" @@ -336,14 +336,14 @@ func (rt *routingTable) UnmarshalJSON(b []byte) error { } // RoutingTableRefresh refreshes any buckets that need to be refreshed -func RoutingTableRefresh(n *Node, refreshInterval time.Duration, upstreamStop stopOnce.Chan) { - done := stopOnce.New() +func RoutingTableRefresh(n *Node, refreshInterval time.Duration, parentGrp *stop.Group) { + done := stop.New() for _, id := range n.rt.GetIDsForRefresh(refreshInterval) { done.Add(1) go func(id bits.Bitmap) { defer done.Done() - _, _, err := FindContacts(n, id, false, upstreamStop) + _, _, err := FindContacts(n, id, false, parentGrp) if err != nil { log.Error("error finding contact during routing table refresh - ", err) } diff --git a/dht/token_cache.go b/dht/token_cache.go index b10c514..da50da6 100644 --- a/dht/token_cache.go +++ b/dht/token_cache.go @@ -6,7 +6,7 @@ import ( "github.com/lbryio/reflector.go/dht/bits" - "github.com/lbryio/lbry.go/stopOnce" + "github.com/lbryio/lbry.go/stop" ) // TODO: this should be moved out of dht and into node, and it should be completely hidden inside node. dht should not need to know about tokens @@ -32,7 +32,7 @@ func newTokenCache(node *Node, expiration time.Duration) *tokenCache { return tc } -func (tc *tokenCache) Get(c Contact, hash bits.Bitmap, cancelCh stopOnce.Chan) string { +func (tc *tokenCache) Get(c Contact, hash bits.Bitmap, cancelCh stop.Chan) string { tc.lock.RLock() token, exists := tc.tokens[c.String()] tc.lock.RUnlock() diff --git a/dht/token_manager.go b/dht/token_manager.go index 718cad0..68b4169 100644 --- a/dht/token_manager.go +++ b/dht/token_manager.go @@ -9,7 +9,7 @@ import ( "sync" "time" - "github.com/lbryio/lbry.go/stopOnce" + "github.com/lbryio/lbry.go/stop" "github.com/lbryio/reflector.go/dht/bits" ) @@ -17,14 +17,14 @@ type tokenManager struct { secret []byte prevSecret []byte lock *sync.RWMutex - stop *stopOnce.Stopper + stop *stop.Group } func (tm *tokenManager) Start(interval time.Duration) { tm.secret = make([]byte, 64) tm.prevSecret = make([]byte, 64) tm.lock = &sync.RWMutex{} - tm.stop = stopOnce.New() + tm.stop = stop.New() tm.rotateSecret()