partial switch to new stopgroup. need to refactor to take advantage of child cancelation
parent 66ca77b690
commit 4e78c08818

7 changed files with 62 additions and 66 deletions
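The change is mostly mechanical: every stopOnce.Stopper becomes a stop.Group. Based only on the calls that appear in this diff (stop.New with an optional parent, Add/Done, Ch, Child, Stop/StopAndWait), a minimal sketch of the pattern the commit moves toward might look like the following; runWorker and the timings are hypothetical, not code from this commit.

// A minimal sketch of the stop.Group pattern, assuming the semantics this
// diff implies: stopping a group closes its Ch(), and stopping a parent
// also cancels any group created as its child.
package main

import (
	"fmt"
	"time"

	"github.com/lbryio/lbry.go/stop"
)

// runWorker is a hypothetical helper, not part of this commit.
func runWorker(grp *stop.Group, name string) {
	grp.Add(1) // register before spawning so a later Wait accounts for us
	go func() {
		defer grp.Done()
		for {
			select {
			case <-time.After(50 * time.Millisecond):
				fmt.Println(name, "working")
			case <-grp.Ch(): // closes when this group or an ancestor stops
				return
			}
		}
	}()
}

func main() {
	parent := stop.New()
	child := parent.Child() // presumably equivalent to stop.New(parent)

	runWorker(parent, "parent-worker")
	runWorker(child, "child-worker")

	time.Sleep(200 * time.Millisecond)
	parent.StopAndWait() // stops the parent and cancels the child too
	child.Wait()         // assuming Group exposes the embedded WaitGroup's Wait
}

The "child cancelation" of the commit message is the last two lines of main: stopping the parent is enough to unblock everything downstream, with no relay goroutines.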
dht/bootstrap.go

@@ -64,7 +64,7 @@ func (b *BootstrapNode) Connect(conn UDPConn) error {
 			select {
 			case <-t.C:
 				b.check()
-			case <-b.stop.Ch():
+			case <-b.grp.Ch():
 				return
 			}
 		}

@@ -129,8 +129,8 @@ func (b *BootstrapNode) get(limit int) []Contact {
 // ping pings a node. if the node responds, it is added to the list. otherwise, it is removed
 func (b *BootstrapNode) ping(c Contact) {
 	log.Debugf("[%s] bootstrap: pinging %s", b.id.HexShort(), c.ID.HexShort())
-	b.stop.Add(1)
-	defer b.stop.Done()
+	b.grp.Add(1)
+	defer b.grp.Done()

 	resCh := b.SendAsync(c, Request{Method: pingMethod})

@@ -138,7 +138,7 @@ func (b *BootstrapNode) ping(c Contact) {

 	select {
 	case res = <-resCh:
-	case <-b.stop.Ch():
+	case <-b.grp.Ch():
 		return
 	}
dht/dht.go (39 changed lines)
@@ -12,13 +12,20 @@ import (
 	peerproto "github.com/lbryio/reflector.go/peer"

 	"github.com/lbryio/lbry.go/errors"
-	"github.com/lbryio/lbry.go/stopOnce"
+	"github.com/lbryio/lbry.go/stop"

-	log "github.com/sirupsen/logrus"
+	"github.com/sirupsen/logrus"
 	"github.com/spf13/cast"
 )

+var log *logrus.Logger
+
+func UseLogger(l *logrus.Logger) {
+	log = l
+}
+
+func init() {
+	log = logrus.StandardLogger()
+	//log.SetFormatter(&log.TextFormatter{ForceColors: true})
+	//log.SetLevel(log.DebugLevel)
+}
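The UseLogger hook added here lets the embedding application swap in its own logrus instance; by default the package logs through logrus.StandardLogger(). A hypothetical caller, assuming this repo's import path:

package main

import (
	"github.com/lbryio/reflector.go/dht"
	"github.com/sirupsen/logrus"
)

func main() {
	l := logrus.New()
	l.SetLevel(logrus.DebugLevel) // surface the package's Debugf output
	dht.UseLogger(l)              // route dht logging through our logger
}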
@@ -87,8 +94,8 @@ type DHT struct {
 	contact Contact
 	// node
 	node *Node
-	// stopper to shut down DHT
-	stop *stopOnce.Stopper
+	// stopGroup to shut down DHT
+	grp *stop.Group
 	// channel is closed when DHT joins network
 	joined chan struct{}
 	// lock for announced list

@@ -107,7 +114,7 @@ func New(config *Config) *DHT {

 	d := &DHT{
 		conf:      config,
-		stop:      stopOnce.New(),
+		grp:       stop.New(),
 		joined:    make(chan struct{}),
 		lock:      &sync.RWMutex{},
 		announced: make(map[bits.Bitmap]bool),

@@ -177,7 +184,7 @@ func (dht *DHT) join() {
 	}

 	// now call iterativeFind on yourself
-	_, _, err := FindContacts(dht.node, dht.node.id, false, dht.stop.Ch())
+	_, _, err := FindContacts(dht.node, dht.node.id, false, dht.grp.Child())
 	if err != nil {
 		log.Errorf("[%s] join: %s", dht.node.id.HexShort(), err.Error())
 	}

@@ -197,7 +204,7 @@ func (dht *DHT) WaitUntilJoined() {
 // Shutdown shuts down the dht
 func (dht *DHT) Shutdown() {
 	log.Debugf("[%s] DHT shutting down", dht.node.id.HexShort())
-	dht.stop.StopAndWait()
+	dht.grp.StopAndWait()
 	dht.node.Shutdown()
 	log.Debugf("[%s] DHT stopped", dht.node.id.HexShort())
 }

@@ -221,7 +228,7 @@ func (dht *DHT) Ping(addr string) error {

 // Get returns the list of nodes that have the blob for the given hash
 func (dht *DHT) Get(hash bits.Bitmap) ([]Contact, error) {
-	contacts, found, err := FindContacts(dht.node, hash, true, dht.stop.Ch())
+	contacts, found, err := FindContacts(dht.node, hash, true, dht.grp.Child())
 	if err != nil {
 		return nil, err
 	}

@@ -242,9 +249,9 @@ func (dht *DHT) Add(hash bits.Bitmap) {
 		return
 	}

-	dht.stop.Add(1)
+	dht.grp.Add(1)
 	go func() {
-		defer dht.stop.Done()
+		defer dht.grp.Done()
 		err := dht.announce(hash)
 		if err != nil {
 			log.Error(errors.Prefix("error announcing bitmap", err))

@@ -254,7 +261,7 @@ func (dht *DHT) Add(hash bits.Bitmap) {

 // Announce announces to the DHT that this node has the blob for the given hash
 func (dht *DHT) announce(hash bits.Bitmap) error {
-	contacts, _, err := FindContacts(dht.node, hash, false, dht.stop.Ch())
+	contacts, _, err := FindContacts(dht.node, hash, false, dht.grp.Child())
 	if err != nil {
 		return err
 	}

@@ -290,14 +297,14 @@ func (dht *DHT) startReannouncer() {
 	tick := time.NewTicker(tReannounce)
 	for {
 		select {
-		case <-dht.stop.Ch():
+		case <-dht.grp.Ch():
 			return
 		case <-tick.C:
 			dht.lock.RLock()
 			for h := range dht.announced {
-				dht.stop.Add(1)
+				dht.grp.Add(1)
 				go func(bm bits.Bitmap) {
-					defer dht.stop.Done()
+					defer dht.grp.Done()
 					err := dht.announce(bm)
 					if err != nil {
 						log.Error("error re-announcing bitmap - ", err)

@@ -316,7 +323,7 @@ func (dht *DHT) storeOnNode(hash bits.Bitmap, c Contact) {
 		return
 	}

-	token := dht.tokenCache.Get(c, hash, dht.stop.Ch())
+	token := dht.tokenCache.Get(c, hash, dht.grp.Ch())

 	resCh := dht.node.SendAsync(c, Request{
 		Method: storeMethod,

@@ -333,7 +340,7 @@ func (dht *DHT) storeOnNode(hash bits.Bitmap, c Contact) {
 	go func() {
 		select {
 		case <-resCh:
-		case <-dht.stop.Ch():
+		case <-dht.grp.Ch():
 		}
 	}()
 }
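Note the two cancelation shapes in the dht.go changes above: FindContacts now receives dht.grp.Child(), a full group the callee can hang its own goroutines off, while tokenCache.Get still receives the bare channel dht.grp.Ch() and supports only select-based cancelation (the "need to refactor" in the commit message). A sketch of the contrast; fetchToken and doFind are hypothetical stand-ins:

package main

import (
	"time"

	"github.com/lbryio/lbry.go/stop"
)

// fetchToken stands in for tokenCache.Get: a bare stop.Chan can only be
// selected on, so the callee cannot spawn tracked goroutines of its own.
func fetchToken(cancel stop.Chan) string {
	select {
	case <-time.After(time.Second):
		return "token"
	case <-cancel:
		return ""
	}
}

// doFind stands in for FindContacts: a *stop.Group parent lets the callee
// build a child group, run tracked goroutines, and still be canceled
// transitively when the parent stops.
func doFind(parent *stop.Group) {
	grp := stop.New(parent)
	grp.Add(1)
	go func() {
		defer grp.Done()
		<-grp.Ch() // exits on local Stop or parent cancelation
	}()
	grp.StopAndWait()
}

func main() {
	grp := stop.New()
	defer grp.StopAndWait()
	fetchToken(grp.Ch())
	doFind(grp)
}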
dht/node.go (28 changed lines)
@@ -8,7 +8,7 @@ import (
 	"time"

 	"github.com/lbryio/errors.go"
-	"github.com/lbryio/lbry.go/stopOnce"
+	"github.com/lbryio/lbry.go/stop"
 	"github.com/lbryio/lbry.go/util"
 	"github.com/lbryio/reflector.go/dht/bits"

@@ -60,7 +60,7 @@ type Node struct {
 	requestHandler RequestHandlerFunc

 	// stop the node neatly and clean up after itself
-	stop *stopOnce.Stopper
+	grp *stop.Group
 }

 // NewNode returns an initialized Node's pointer.

@@ -73,7 +73,7 @@ func NewNode(id bits.Bitmap) *Node {
 		txLock:       &sync.RWMutex{},
 		transactions: make(map[messageID]*transaction),

-		stop:   stopOnce.New(),
+		grp:    stop.New(),
 		tokens: &tokenManager{},
 	}
 }

@@ -86,7 +86,7 @@ func (n *Node) Connect(conn UDPConn) error {

 	go func() {
 		// stop tokens and close the connection when we're shutting down
-		<-n.stop.Ch()
+		<-n.grp.Ch()
 		n.tokens.Stop()
 		n.connClosed = true
 		err := n.conn.Close()

@@ -97,9 +97,9 @@ func (n *Node) Connect(conn UDPConn) error {

 	packets := make(chan packet)

-	n.stop.Add(1)
+	n.grp.Add(1)
 	go func() {
-		defer n.stop.Done()
+		defer n.grp.Done()

 		buf := make([]byte, udpMaxMessageLength)

@@ -121,15 +121,15 @@ func (n *Node) Connect(conn UDPConn) error {

 			select { // needs select here because packet consumer can quit and the packets channel gets filled up and blocks
 			case packets <- packet{data: data, raddr: raddr}:
-			case <-n.stop.Ch():
+			case <-n.grp.Ch():
 				return
 			}
 		}
 	}()

-	n.stop.Add(1)
+	n.grp.Add(1)
 	go func() {
-		defer n.stop.Done()
+		defer n.grp.Done()

 		var pkt packet

@@ -137,7 +137,7 @@ func (n *Node) Connect(conn UDPConn) error {
 		select {
 		case pkt = <-packets:
 			n.handlePacket(pkt)
-		case <-n.stop.Ch():
+		case <-n.grp.Ch():
 			return
 		}
 	}

@@ -156,7 +156,7 @@ func (n *Node) Connect(conn UDPConn) error {
 // Shutdown shuts down the node
 func (n *Node) Shutdown() {
 	log.Debugf("[%s] node shutting down", n.id.HexShort())
-	n.stop.StopAndWait()
+	n.grp.StopAndWait()
 	log.Debugf("[%s] node stopped", n.id.HexShort())
 }

@@ -426,7 +426,7 @@ func (n *Node) SendAsync(contact Contact, req Request, options ...SendOptions) <-chan *Response {
 		case res := <-tx.res:
 			ch <- &res
 			return
-		case <-n.stop.Ch():
+		case <-n.grp.Ch():
 			return
 		case <-time.After(udpTimeout):
 		}

@@ -457,8 +457,8 @@ func (n *Node) startRoutingTableGrooming() {
 	for {
 		select {
 		case <-refreshTicker.C:
-			RoutingTableRefresh(n, tRefresh, n.stop.Ch())
-		case <-n.stop.Ch():
+			RoutingTableRefresh(n, tRefresh, n.grp.Child())
+		case <-n.grp.Ch():
 			return
 		}
 	}
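The select around the packets send (see the comment in the -121,15 hunk above) is the guard that keeps the UDP reader from blocking forever on a channel whose consumer has already exited during shutdown. The same pattern in isolation; produce and this packet type are hypothetical reductions of the node's code:

package main

import "github.com/lbryio/lbry.go/stop"

type packet struct {
	data []byte
}

// produce forwards a packet to a consumer, but never blocks past shutdown:
// once grp stops, the blocked send falls through to the grp.Ch() case.
func produce(grp *stop.Group, packets chan<- packet, data []byte) bool {
	select {
	case packets <- packet{data: data}:
		return true
	case <-grp.Ch(): // consumer may have quit; don't block forever
		return false
	}
}

func main() {
	grp := stop.New()
	packets := make(chan packet) // unbuffered, like the node's channel
	grp.Stop()                   // simulate shutdown with no consumer left
	produce(grp, packets, []byte("datagram")) // returns false instead of hanging
}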
dht/node_finder.go
@@ -5,9 +5,9 @@ import (
 	"sync"
 	"time"

-	"github.com/lbryio/internal-apis/app/crypto"
+	"github.com/lbryio/lbry.go/crypto"
 	"github.com/lbryio/lbry.go/errors"
-	"github.com/lbryio/lbry.go/stopOnce"
+	"github.com/lbryio/lbry.go/stop"
 	"github.com/lbryio/reflector.go/dht/bits"

 	"github.com/sirupsen/logrus"

@@ -23,7 +23,7 @@ func init() {
 	cfLog = logrus.StandardLogger()
 }

-func NodeFinderUserLogger(l *logrus.Logger) {
+func NodeFinderUseLogger(l *logrus.Logger) {
 	cfLog = l
 }

@@ -32,7 +32,7 @@ type contactFinder struct {
 	target bits.Bitmap
 	node   *Node

-	stop *stopOnce.Stopper
+	grp *stop.Group

 	findValueMutex  *sync.Mutex
 	findValueResult []Contact

@@ -49,7 +49,7 @@ type contactFinder struct {
 	notGettingCloser *atomic.Bool
 }

-func FindContacts(node *Node, target bits.Bitmap, findValue bool, upstreamStop stopOnce.Chan) ([]Contact, bool, error) {
+func FindContacts(node *Node, target bits.Bitmap, findValue bool, parentGrp *stop.Group) ([]Contact, bool, error) {
 	cf := &contactFinder{
 		node:   node,
 		target: target,

@@ -58,27 +58,16 @@ func FindContacts(node *Node, target bits.Bitmap, findValue bool, upstreamStop stopOnce.Chan) ([]Contact, bool, error) {
 		activeContactsMutex: &sync.Mutex{},
 		shortlistMutex:      &sync.Mutex{},
 		shortlistAdded:      make(map[bits.Bitmap]bool),
-		stop:                stopOnce.New(),
+		grp:                 stop.New(parentGrp),
 		closestContactMutex: &sync.RWMutex{},
 		notGettingCloser:    atomic.NewBool(false),
 	}

-	if upstreamStop != nil {
-		go func() {
-			select {
-			case <-upstreamStop:
-				cf.Stop()
-			case <-cf.stop.Ch():
-			}
-		}()
-	}
-
 	return cf.Find()
 }

 func (cf *contactFinder) Stop() {
-	cf.stop.Stop()
-	cf.stop.Wait()
+	cf.grp.StopAndWait()
 }

 func (cf *contactFinder) Find() ([]Contact, bool, error) {
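The -58,27 hunk above is the heart of the commit: the relay goroutine that watched upstreamStop and manually called cf.Stop() is deleted, because stop.New(parentGrp) wires the finder's group to its parent at construction, and cf.grp.Ch() then closes as soon as any ancestor stops. A reduced, runnable version of the new shape; find is a hypothetical stand-in for FindContacts:

package main

import (
	"fmt"

	"github.com/lbryio/lbry.go/stop"
)

// find builds its own child group from the caller's, as FindContacts now
// does with stop.New(parentGrp). No forwarding goroutine is needed: the
// child's Ch() closes when the parent stops.
func find(parentGrp *stop.Group) {
	grp := stop.New(parentGrp)
	<-grp.Ch()
	fmt.Println("canceled via parent")
}

func main() {
	parent := stop.New()
	go parent.Stop() // an upstream shutdown, e.g. dht.Shutdown()
	find(parent)
}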
@@ -100,7 +89,7 @@ CycleLoop:
 		select {
 		case <-time.After(timeout):
 			go cf.cycle(false)
-		case <-cf.stop.Ch():
+		case <-cf.grp.Ch():
 			break CycleLoop
 		}
 	}

@@ -176,7 +165,7 @@ func (cf *contactFinder) cycle(bigCycle bool) {
 	}

 	if cf.isSearchFinished() {
-		cf.stop.Stop()
+		cf.grp.Stop()
 		return
 	}

@@ -225,7 +214,7 @@ func (cf *contactFinder) probe(cycleID string) *Contact {
 	resCh := cf.node.SendAsync(c, req)
 	select {
 	case res = <-resCh:
-	case <-cf.stop.Ch():
+	case <-cf.grp.Ch():
 		cf.debug("|%s| probe %s: canceled", cycleID, c.ID.HexShort())
 		return nil
 	}

@@ -240,7 +229,7 @@ func (cf *contactFinder) probe(cycleID string) *Contact {
 		cf.findValueMutex.Lock()
 		cf.findValueResult = res.Contacts
 		cf.findValueMutex.Unlock()
-		cf.stop.Stop()
+		cf.grp.Stop()
 		return nil
 	}

@@ -321,7 +310,7 @@ func (cf *contactFinder) isSearchFinished() bool {
 	}

 	select {
-	case <-cf.stop.Ch():
+	case <-cf.grp.Ch():
 		return true
 	default:
 	}
dht/routing_table.go
@@ -11,7 +11,7 @@ import (
 	"time"

 	"github.com/lbryio/lbry.go/errors"
-	"github.com/lbryio/lbry.go/stopOnce"
+	"github.com/lbryio/lbry.go/stop"
 	"github.com/lbryio/reflector.go/dht/bits"

 	log "github.com/sirupsen/logrus"

@@ -336,14 +336,14 @@ func (rt *routingTable) UnmarshalJSON(b []byte) error {
 }

 // RoutingTableRefresh refreshes any buckets that need to be refreshed
-func RoutingTableRefresh(n *Node, refreshInterval time.Duration, upstreamStop stopOnce.Chan) {
-	done := stopOnce.New()
+func RoutingTableRefresh(n *Node, refreshInterval time.Duration, parentGrp *stop.Group) {
+	done := stop.New()

 	for _, id := range n.rt.GetIDsForRefresh(refreshInterval) {
 		done.Add(1)
 		go func(id bits.Bitmap) {
 			defer done.Done()
-			_, _, err := FindContacts(n, id, false, upstreamStop)
+			_, _, err := FindContacts(n, id, false, parentGrp)
 			if err != nil {
 				log.Error("error finding contact during routing table refresh - ", err)
 			}
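Here a parentless stop.New() serves purely as a wait group scoped to one refresh pass: each lookup registers with done before its goroutine starts, and the function presumably waits on done just past the end of this hunk, as the previous stopOnce version did. Assuming stop.Group exposes the embedded WaitGroup's Wait (as StopAndWait and the Add/Done calls suggest), the fan-out-and-wait shape looks like this; refreshBucket is hypothetical:

package main

import (
	"fmt"

	"github.com/lbryio/lbry.go/stop"
)

// refreshBucket is a hypothetical stand-in for one FindContacts call.
func refreshBucket(id int) {
	fmt.Println("refreshing bucket", id)
}

func main() {
	done := stop.New() // no parent: used only as a wait group here
	for _, id := range []int{1, 2, 3} {
		done.Add(1)
		go func(id int) {
			defer done.Done()
			refreshBucket(id)
		}(id)
	}
	done.Wait() // block until every refresh goroutine has finished
}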
dht/token_cache.go
@@ -6,7 +6,7 @@ import (

 	"github.com/lbryio/reflector.go/dht/bits"

-	"github.com/lbryio/lbry.go/stopOnce"
+	"github.com/lbryio/lbry.go/stop"
 )

 // TODO: this should be moved out of dht and into node, and it should be completely hidden inside node. dht should not need to know about tokens

@@ -32,7 +32,7 @@ func newTokenCache(node *Node, expiration time.Duration) *tokenCache {
 	return tc
 }

-func (tc *tokenCache) Get(c Contact, hash bits.Bitmap, cancelCh stopOnce.Chan) string {
+func (tc *tokenCache) Get(c Contact, hash bits.Bitmap, cancelCh stop.Chan) string {
 	tc.lock.RLock()
 	token, exists := tc.tokens[c.String()]
 	tc.lock.RUnlock()
dht/token_manager.go
@@ -9,7 +9,7 @@ import (
 	"sync"
 	"time"

-	"github.com/lbryio/lbry.go/stopOnce"
+	"github.com/lbryio/lbry.go/stop"
 	"github.com/lbryio/reflector.go/dht/bits"
 )

@@ -17,14 +17,14 @@ type tokenManager struct {
 	secret     []byte
 	prevSecret []byte
 	lock       *sync.RWMutex
-	stop       *stopOnce.Stopper
+	stop       *stop.Group
 }

 func (tm *tokenManager) Start(interval time.Duration) {
 	tm.secret = make([]byte, 64)
 	tm.prevSecret = make([]byte, 64)
 	tm.lock = &sync.RWMutex{}
-	tm.stop = stopOnce.New()
+	tm.stop = stop.New()

 	tm.rotateSecret()