partial switch to new stopgroup. need to refactor to take advantage of child cancelation

This commit is contained in:
Alex Grintsvayg 2018-06-25 16:49:40 -04:00
parent 531933761a
commit 34cc92678b
14 changed files with 118 additions and 118 deletions

6
Gopkg.lock generated
View file

@ -155,10 +155,10 @@
"errors", "errors",
"null", "null",
"querytools", "querytools",
"stopOnce", "stop",
"util" "util"
] ]
revision = "f0762e9c57d41be10cb83edc4e9b7a7ce0891519" revision = "f14ff7ba4f2c420108302f651ce85df1e45477ad"
[[projects]] [[projects]]
branch = "master" branch = "master"
@ -268,6 +268,6 @@
[solve-meta] [solve-meta]
analyzer-name = "dep" analyzer-name = "dep"
analyzer-version = 1 analyzer-version = 1
inputs-digest = "e24df8daf9e27895cd79820c74d57f7cc554a84cd1d268b3cd9e58acf03c0a55" inputs-digest = "4dc432f7df1c1d59d5ee47417ab4f0fe187d26eb9e1f53fecdb6396b3bd1e6e0"
solver-name = "gps-cdcl" solver-name = "gps-cdcl"
solver-version = 1 solver-version = 1

View file

@ -8,7 +8,7 @@ import (
"github.com/lbryio/lbry.go/crypto" "github.com/lbryio/lbry.go/crypto"
"github.com/lbryio/lbry.go/errors" "github.com/lbryio/lbry.go/errors"
"github.com/lbryio/lbry.go/stopOnce" "github.com/lbryio/lbry.go/stop"
"github.com/hashicorp/serf/serf" "github.com/hashicorp/serf/serf"
log "github.com/sirupsen/logrus" log "github.com/sirupsen/logrus"
@ -29,7 +29,7 @@ type Cluster struct {
s *serf.Serf s *serf.Serf
eventCh chan serf.Event eventCh chan serf.Event
stop *stopOnce.Stopper stop *stop.Group
} }
// New returns a new Cluster instance that is not connected. // New returns a new Cluster instance that is not connected.
@ -38,7 +38,7 @@ func New(port int, seedAddr string) *Cluster {
name: crypto.RandString(12), name: crypto.RandString(12),
port: port, port: port,
seedAddr: seedAddr, seedAddr: seedAddr,
stop: stopOnce.New(), stop: stop.New(),
} }
} }

View file

@ -6,9 +6,10 @@ import (
"os" "os"
"github.com/lbryio/lbry.go/errors" "github.com/lbryio/lbry.go/errors"
"github.com/lbryio/lbry.go/util"
"github.com/lbryio/reflector.go/dht" "github.com/lbryio/reflector.go/dht"
log "github.com/sirupsen/logrus" "github.com/sirupsen/logrus"
"github.com/spf13/cobra" "github.com/spf13/cobra"
) )
@ -21,13 +22,14 @@ type Config struct {
DBConn string `json:"db_conn"` DBConn string `json:"db_conn"`
} }
var debug []string var verbose []string
const ( const (
debugNodeFinder = "nodefinder" verboseAll = "all"
verboseDHT = "dht"
verboseNodeFinder = "nodefinder"
) )
var verbose bool
var conf string var conf string
var globalConfig Config var globalConfig Config
@ -35,27 +37,30 @@ var rootCmd = &cobra.Command{
Use: "prism", Use: "prism",
Short: "Prism is a single entry point application with multiple sub modules which can be leveraged individually or together", Short: "Prism is a single entry point application with multiple sub modules which can be leveraged individually or together",
PersistentPreRun: func(cmd *cobra.Command, args []string) { PersistentPreRun: func(cmd *cobra.Command, args []string) {
if verbose { debugLogger := logrus.New()
log.SetLevel(log.DebugLevel) debugLogger.SetLevel(logrus.DebugLevel)
if util.InSlice(verboseAll, verbose) {
verbose = []string{verboseDHT, verboseNodeFinder}
} }
debugLogger := log.New() for _, debugType := range verbose {
debugLogger.SetLevel(log.DebugLevel)
for _, debugType := range debug {
switch debugType { switch debugType {
case debugNodeFinder: case verboseDHT:
dht.NodeFinderUserLogger(debugLogger) dht.UseLogger(debugLogger)
case verboseNodeFinder:
dht.NodeFinderUseLogger(debugLogger)
} }
} }
var err error var err error
if conf == "" { if conf == "" {
log.Errorln("--conf flag required") logrus.Errorln("--conf flag required")
os.Exit(1) os.Exit(1)
} else { } else {
globalConfig, err = loadConfig(conf) globalConfig, err = loadConfig(conf)
if err != nil { if err != nil {
log.Error(err) logrus.Error(err)
os.Exit(1) os.Exit(1)
} }
} }
@ -67,8 +72,7 @@ var rootCmd = &cobra.Command{
} }
func init() { func init() {
rootCmd.PersistentFlags().BoolVarP(&verbose, "verbose", "v", false, "Enable verbose logging") rootCmd.PersistentFlags().StringSliceVarP(&verbose, "verbose", "v", []string{}, "Verbose logging for specific components")
rootCmd.PersistentFlags().StringSliceVar(&debug, "debug", []string{}, "Debug loggin for specific components")
rootCmd.PersistentFlags().StringVar(&conf, "conf", "config.json", "Path to config") rootCmd.PersistentFlags().StringVar(&conf, "conf", "config.json", "Path to config")
} }
@ -77,7 +81,7 @@ func init() {
func Execute() { func Execute() {
err := rootCmd.Execute() err := rootCmd.Execute()
if err != nil { if err != nil {
log.Errorln(err) logrus.Errorln(err)
os.Exit(1) os.Exit(1)
} }
} }

View file

@ -9,7 +9,7 @@ import (
"syscall" "syscall"
"time" "time"
"github.com/lbryio/lbry.go/stopOnce" "github.com/lbryio/lbry.go/stop"
"github.com/lbryio/reflector.go/db" "github.com/lbryio/reflector.go/db"
"github.com/lbryio/reflector.go/peer" "github.com/lbryio/reflector.go/peer"
"github.com/lbryio/reflector.go/store" "github.com/lbryio/reflector.go/store"
@ -29,7 +29,7 @@ const (
type uploaderParams struct { type uploaderParams struct {
workerWG *sync.WaitGroup workerWG *sync.WaitGroup
counterWG *sync.WaitGroup counterWG *sync.WaitGroup
stopper *stopOnce.Stopper stopper *stop.Group
filenameChan chan string filenameChan chan string
countChan chan int countChan chan int
sdCount int sdCount int
@ -59,7 +59,7 @@ func uploadCmd(cmd *cobra.Command, args []string) {
counterWG: &sync.WaitGroup{}, counterWG: &sync.WaitGroup{},
filenameChan: make(chan string), filenameChan: make(chan string),
countChan: make(chan int), countChan: make(chan int),
stopper: stopOnce.New()} stopper: stop.New()}
setInterrupt(params.stopper) setInterrupt(params.stopper)
@ -125,7 +125,7 @@ func newBlobStore() *store.DBBackedS3Store {
return store.NewDBBackedS3Store(s3, db) return store.NewDBBackedS3Store(s3, db)
} }
func setInterrupt(stopper *stopOnce.Stopper) { func setInterrupt(stopper *stop.Group) {
interruptChan := make(chan os.Signal, 1) interruptChan := make(chan os.Signal, 1)
signal.Notify(interruptChan, os.Interrupt, syscall.SIGTERM) signal.Notify(interruptChan, os.Interrupt, syscall.SIGTERM)
go func() { go func() {

View file

@ -64,7 +64,7 @@ func (b *BootstrapNode) Connect(conn UDPConn) error {
select { select {
case <-t.C: case <-t.C:
b.check() b.check()
case <-b.stop.Ch(): case <-b.grp.Ch():
return return
} }
} }
@ -129,8 +129,8 @@ func (b *BootstrapNode) get(limit int) []Contact {
// ping pings a node. if the node responds, it is added to the list. otherwise, it is removed // ping pings a node. if the node responds, it is added to the list. otherwise, it is removed
func (b *BootstrapNode) ping(c Contact) { func (b *BootstrapNode) ping(c Contact) {
log.Debugf("[%s] bootstrap: pinging %s", b.id.HexShort(), c.ID.HexShort()) log.Debugf("[%s] bootstrap: pinging %s", b.id.HexShort(), c.ID.HexShort())
b.stop.Add(1) b.grp.Add(1)
defer b.stop.Done() defer b.grp.Done()
resCh := b.SendAsync(c, Request{Method: pingMethod}) resCh := b.SendAsync(c, Request{Method: pingMethod})
@ -138,7 +138,7 @@ func (b *BootstrapNode) ping(c Contact) {
select { select {
case res = <-resCh: case res = <-resCh:
case <-b.stop.Ch(): case <-b.grp.Ch():
return return
} }

View file

@ -12,13 +12,20 @@ import (
peerproto "github.com/lbryio/reflector.go/peer" peerproto "github.com/lbryio/reflector.go/peer"
"github.com/lbryio/lbry.go/errors" "github.com/lbryio/lbry.go/errors"
"github.com/lbryio/lbry.go/stopOnce" "github.com/lbryio/lbry.go/stop"
log "github.com/sirupsen/logrus" "github.com/sirupsen/logrus"
"github.com/spf13/cast" "github.com/spf13/cast"
) )
var log *logrus.Logger
func UseLogger(l *logrus.Logger) {
log = l
}
func init() { func init() {
log = logrus.StandardLogger()
//log.SetFormatter(&log.TextFormatter{ForceColors: true}) //log.SetFormatter(&log.TextFormatter{ForceColors: true})
//log.SetLevel(log.DebugLevel) //log.SetLevel(log.DebugLevel)
} }
@ -87,8 +94,8 @@ type DHT struct {
contact Contact contact Contact
// node // node
node *Node node *Node
// stopper to shut down DHT // stopGroup to shut down DHT
stop *stopOnce.Stopper grp *stop.Group
// channel is closed when DHT joins network // channel is closed when DHT joins network
joined chan struct{} joined chan struct{}
// lock for announced list // lock for announced list
@ -107,7 +114,7 @@ func New(config *Config) *DHT {
d := &DHT{ d := &DHT{
conf: config, conf: config,
stop: stopOnce.New(), grp: stop.New(),
joined: make(chan struct{}), joined: make(chan struct{}),
lock: &sync.RWMutex{}, lock: &sync.RWMutex{},
announced: make(map[bits.Bitmap]bool), announced: make(map[bits.Bitmap]bool),
@ -177,7 +184,7 @@ func (dht *DHT) join() {
} }
// now call iterativeFind on yourself // now call iterativeFind on yourself
_, _, err := FindContacts(dht.node, dht.node.id, false, dht.stop.Ch()) _, _, err := FindContacts(dht.node, dht.node.id, false, dht.grp.Child())
if err != nil { if err != nil {
log.Errorf("[%s] join: %s", dht.node.id.HexShort(), err.Error()) log.Errorf("[%s] join: %s", dht.node.id.HexShort(), err.Error())
} }
@ -197,7 +204,7 @@ func (dht *DHT) WaitUntilJoined() {
// Shutdown shuts down the dht // Shutdown shuts down the dht
func (dht *DHT) Shutdown() { func (dht *DHT) Shutdown() {
log.Debugf("[%s] DHT shutting down", dht.node.id.HexShort()) log.Debugf("[%s] DHT shutting down", dht.node.id.HexShort())
dht.stop.StopAndWait() dht.grp.StopAndWait()
dht.node.Shutdown() dht.node.Shutdown()
log.Debugf("[%s] DHT stopped", dht.node.id.HexShort()) log.Debugf("[%s] DHT stopped", dht.node.id.HexShort())
} }
@ -221,7 +228,7 @@ func (dht *DHT) Ping(addr string) error {
// Get returns the list of nodes that have the blob for the given hash // Get returns the list of nodes that have the blob for the given hash
func (dht *DHT) Get(hash bits.Bitmap) ([]Contact, error) { func (dht *DHT) Get(hash bits.Bitmap) ([]Contact, error) {
contacts, found, err := FindContacts(dht.node, hash, true, dht.stop.Ch()) contacts, found, err := FindContacts(dht.node, hash, true, dht.grp.Child())
if err != nil { if err != nil {
return nil, err return nil, err
} }
@ -242,9 +249,9 @@ func (dht *DHT) Add(hash bits.Bitmap) {
return return
} }
dht.stop.Add(1) dht.grp.Add(1)
go func() { go func() {
defer dht.stop.Done() defer dht.grp.Done()
err := dht.announce(hash) err := dht.announce(hash)
if err != nil { if err != nil {
log.Error(errors.Prefix("error announcing bitmap", err)) log.Error(errors.Prefix("error announcing bitmap", err))
@ -254,7 +261,7 @@ func (dht *DHT) Add(hash bits.Bitmap) {
// Announce announces to the DHT that this node has the blob for the given hash // Announce announces to the DHT that this node has the blob for the given hash
func (dht *DHT) announce(hash bits.Bitmap) error { func (dht *DHT) announce(hash bits.Bitmap) error {
contacts, _, err := FindContacts(dht.node, hash, false, dht.stop.Ch()) contacts, _, err := FindContacts(dht.node, hash, false, dht.grp.Child())
if err != nil { if err != nil {
return err return err
} }
@ -290,14 +297,14 @@ func (dht *DHT) startReannouncer() {
tick := time.NewTicker(tReannounce) tick := time.NewTicker(tReannounce)
for { for {
select { select {
case <-dht.stop.Ch(): case <-dht.grp.Ch():
return return
case <-tick.C: case <-tick.C:
dht.lock.RLock() dht.lock.RLock()
for h := range dht.announced { for h := range dht.announced {
dht.stop.Add(1) dht.grp.Add(1)
go func(bm bits.Bitmap) { go func(bm bits.Bitmap) {
defer dht.stop.Done() defer dht.grp.Done()
err := dht.announce(bm) err := dht.announce(bm)
if err != nil { if err != nil {
log.Error("error re-announcing bitmap - ", err) log.Error("error re-announcing bitmap - ", err)
@ -316,7 +323,7 @@ func (dht *DHT) storeOnNode(hash bits.Bitmap, c Contact) {
return return
} }
token := dht.tokenCache.Get(c, hash, dht.stop.Ch()) token := dht.tokenCache.Get(c, hash, dht.grp.Ch())
resCh := dht.node.SendAsync(c, Request{ resCh := dht.node.SendAsync(c, Request{
Method: storeMethod, Method: storeMethod,
@ -333,7 +340,7 @@ func (dht *DHT) storeOnNode(hash bits.Bitmap, c Contact) {
go func() { go func() {
select { select {
case <-resCh: case <-resCh:
case <-dht.stop.Ch(): case <-dht.grp.Ch():
} }
}() }()
} }

View file

@ -8,7 +8,7 @@ import (
"time" "time"
"github.com/lbryio/errors.go" "github.com/lbryio/errors.go"
"github.com/lbryio/lbry.go/stopOnce" "github.com/lbryio/lbry.go/stop"
"github.com/lbryio/lbry.go/util" "github.com/lbryio/lbry.go/util"
"github.com/lbryio/reflector.go/dht/bits" "github.com/lbryio/reflector.go/dht/bits"
@ -60,7 +60,7 @@ type Node struct {
requestHandler RequestHandlerFunc requestHandler RequestHandlerFunc
// stop the node neatly and clean up after itself // stop the node neatly and clean up after itself
stop *stopOnce.Stopper grp *stop.Group
} }
// NewNode returns an initialized Node's pointer. // NewNode returns an initialized Node's pointer.
@ -73,7 +73,7 @@ func NewNode(id bits.Bitmap) *Node {
txLock: &sync.RWMutex{}, txLock: &sync.RWMutex{},
transactions: make(map[messageID]*transaction), transactions: make(map[messageID]*transaction),
stop: stopOnce.New(), grp: stop.New(),
tokens: &tokenManager{}, tokens: &tokenManager{},
} }
} }
@ -86,7 +86,7 @@ func (n *Node) Connect(conn UDPConn) error {
go func() { go func() {
// stop tokens and close the connection when we're shutting down // stop tokens and close the connection when we're shutting down
<-n.stop.Ch() <-n.grp.Ch()
n.tokens.Stop() n.tokens.Stop()
n.connClosed = true n.connClosed = true
err := n.conn.Close() err := n.conn.Close()
@ -97,9 +97,9 @@ func (n *Node) Connect(conn UDPConn) error {
packets := make(chan packet) packets := make(chan packet)
n.stop.Add(1) n.grp.Add(1)
go func() { go func() {
defer n.stop.Done() defer n.grp.Done()
buf := make([]byte, udpMaxMessageLength) buf := make([]byte, udpMaxMessageLength)
@ -121,15 +121,15 @@ func (n *Node) Connect(conn UDPConn) error {
select { // needs select here because packet consumer can quit and the packets channel gets filled up and blocks select { // needs select here because packet consumer can quit and the packets channel gets filled up and blocks
case packets <- packet{data: data, raddr: raddr}: case packets <- packet{data: data, raddr: raddr}:
case <-n.stop.Ch(): case <-n.grp.Ch():
return return
} }
} }
}() }()
n.stop.Add(1) n.grp.Add(1)
go func() { go func() {
defer n.stop.Done() defer n.grp.Done()
var pkt packet var pkt packet
@ -137,7 +137,7 @@ func (n *Node) Connect(conn UDPConn) error {
select { select {
case pkt = <-packets: case pkt = <-packets:
n.handlePacket(pkt) n.handlePacket(pkt)
case <-n.stop.Ch(): case <-n.grp.Ch():
return return
} }
} }
@ -156,7 +156,7 @@ func (n *Node) Connect(conn UDPConn) error {
// Shutdown shuts down the node // Shutdown shuts down the node
func (n *Node) Shutdown() { func (n *Node) Shutdown() {
log.Debugf("[%s] node shutting down", n.id.HexShort()) log.Debugf("[%s] node shutting down", n.id.HexShort())
n.stop.StopAndWait() n.grp.StopAndWait()
log.Debugf("[%s] node stopped", n.id.HexShort()) log.Debugf("[%s] node stopped", n.id.HexShort())
} }
@ -426,7 +426,7 @@ func (n *Node) SendAsync(contact Contact, req Request, options ...SendOptions) <
case res := <-tx.res: case res := <-tx.res:
ch <- &res ch <- &res
return return
case <-n.stop.Ch(): case <-n.grp.Ch():
return return
case <-time.After(udpTimeout): case <-time.After(udpTimeout):
} }
@ -457,8 +457,8 @@ func (n *Node) startRoutingTableGrooming() {
for { for {
select { select {
case <-refreshTicker.C: case <-refreshTicker.C:
RoutingTableRefresh(n, tRefresh, n.stop.Ch()) RoutingTableRefresh(n, tRefresh, n.grp.Child())
case <-n.stop.Ch(): case <-n.grp.Ch():
return return
} }
} }

View file

@ -5,9 +5,9 @@ import (
"sync" "sync"
"time" "time"
"github.com/lbryio/internal-apis/app/crypto" "github.com/lbryio/lbry.go/crypto"
"github.com/lbryio/lbry.go/errors" "github.com/lbryio/lbry.go/errors"
"github.com/lbryio/lbry.go/stopOnce" "github.com/lbryio/lbry.go/stop"
"github.com/lbryio/reflector.go/dht/bits" "github.com/lbryio/reflector.go/dht/bits"
"github.com/sirupsen/logrus" "github.com/sirupsen/logrus"
@ -23,7 +23,7 @@ func init() {
cfLog = logrus.StandardLogger() cfLog = logrus.StandardLogger()
} }
func NodeFinderUserLogger(l *logrus.Logger) { func NodeFinderUseLogger(l *logrus.Logger) {
cfLog = l cfLog = l
} }
@ -32,7 +32,7 @@ type contactFinder struct {
target bits.Bitmap target bits.Bitmap
node *Node node *Node
stop *stopOnce.Stopper grp *stop.Group
findValueMutex *sync.Mutex findValueMutex *sync.Mutex
findValueResult []Contact findValueResult []Contact
@ -49,7 +49,7 @@ type contactFinder struct {
notGettingCloser *atomic.Bool notGettingCloser *atomic.Bool
} }
func FindContacts(node *Node, target bits.Bitmap, findValue bool, upstreamStop stopOnce.Chan) ([]Contact, bool, error) { func FindContacts(node *Node, target bits.Bitmap, findValue bool, parentGrp *stop.Group) ([]Contact, bool, error) {
cf := &contactFinder{ cf := &contactFinder{
node: node, node: node,
target: target, target: target,
@ -58,27 +58,16 @@ func FindContacts(node *Node, target bits.Bitmap, findValue bool, upstreamStop s
activeContactsMutex: &sync.Mutex{}, activeContactsMutex: &sync.Mutex{},
shortlistMutex: &sync.Mutex{}, shortlistMutex: &sync.Mutex{},
shortlistAdded: make(map[bits.Bitmap]bool), shortlistAdded: make(map[bits.Bitmap]bool),
stop: stopOnce.New(), grp: stop.New(parentGrp),
closestContactMutex: &sync.RWMutex{}, closestContactMutex: &sync.RWMutex{},
notGettingCloser: atomic.NewBool(false), notGettingCloser: atomic.NewBool(false),
} }
if upstreamStop != nil {
go func() {
select {
case <-upstreamStop:
cf.Stop()
case <-cf.stop.Ch():
}
}()
}
return cf.Find() return cf.Find()
} }
func (cf *contactFinder) Stop() { func (cf *contactFinder) Stop() {
cf.stop.Stop() cf.grp.StopAndWait()
cf.stop.Wait()
} }
func (cf *contactFinder) Find() ([]Contact, bool, error) { func (cf *contactFinder) Find() ([]Contact, bool, error) {
@ -100,7 +89,7 @@ CycleLoop:
select { select {
case <-time.After(timeout): case <-time.After(timeout):
go cf.cycle(false) go cf.cycle(false)
case <-cf.stop.Ch(): case <-cf.grp.Ch():
break CycleLoop break CycleLoop
} }
} }
@ -176,7 +165,7 @@ func (cf *contactFinder) cycle(bigCycle bool) {
} }
if cf.isSearchFinished() { if cf.isSearchFinished() {
cf.stop.Stop() cf.grp.Stop()
return return
} }
@ -225,7 +214,7 @@ func (cf *contactFinder) probe(cycleID string) *Contact {
resCh := cf.node.SendAsync(c, req) resCh := cf.node.SendAsync(c, req)
select { select {
case res = <-resCh: case res = <-resCh:
case <-cf.stop.Ch(): case <-cf.grp.Ch():
cf.debug("|%s| probe %s: canceled", cycleID, c.ID.HexShort()) cf.debug("|%s| probe %s: canceled", cycleID, c.ID.HexShort())
return nil return nil
} }
@ -240,7 +229,7 @@ func (cf *contactFinder) probe(cycleID string) *Contact {
cf.findValueMutex.Lock() cf.findValueMutex.Lock()
cf.findValueResult = res.Contacts cf.findValueResult = res.Contacts
cf.findValueMutex.Unlock() cf.findValueMutex.Unlock()
cf.stop.Stop() cf.grp.Stop()
return nil return nil
} }
@ -321,7 +310,7 @@ func (cf *contactFinder) isSearchFinished() bool {
} }
select { select {
case <-cf.stop.Ch(): case <-cf.grp.Ch():
return true return true
default: default:
} }

View file

@ -11,7 +11,7 @@ import (
"time" "time"
"github.com/lbryio/lbry.go/errors" "github.com/lbryio/lbry.go/errors"
"github.com/lbryio/lbry.go/stopOnce" "github.com/lbryio/lbry.go/stop"
"github.com/lbryio/reflector.go/dht/bits" "github.com/lbryio/reflector.go/dht/bits"
log "github.com/sirupsen/logrus" log "github.com/sirupsen/logrus"
@ -336,14 +336,14 @@ func (rt *routingTable) UnmarshalJSON(b []byte) error {
} }
// RoutingTableRefresh refreshes any buckets that need to be refreshed // RoutingTableRefresh refreshes any buckets that need to be refreshed
func RoutingTableRefresh(n *Node, refreshInterval time.Duration, upstreamStop stopOnce.Chan) { func RoutingTableRefresh(n *Node, refreshInterval time.Duration, parentGrp *stop.Group) {
done := stopOnce.New() done := stop.New()
for _, id := range n.rt.GetIDsForRefresh(refreshInterval) { for _, id := range n.rt.GetIDsForRefresh(refreshInterval) {
done.Add(1) done.Add(1)
go func(id bits.Bitmap) { go func(id bits.Bitmap) {
defer done.Done() defer done.Done()
_, _, err := FindContacts(n, id, false, upstreamStop) _, _, err := FindContacts(n, id, false, parentGrp)
if err != nil { if err != nil {
log.Error("error finding contact during routing table refresh - ", err) log.Error("error finding contact during routing table refresh - ", err)
} }

View file

@ -6,7 +6,7 @@ import (
"github.com/lbryio/reflector.go/dht/bits" "github.com/lbryio/reflector.go/dht/bits"
"github.com/lbryio/lbry.go/stopOnce" "github.com/lbryio/lbry.go/stop"
) )
// TODO: this should be moved out of dht and into node, and it should be completely hidden inside node. dht should not need to know about tokens // TODO: this should be moved out of dht and into node, and it should be completely hidden inside node. dht should not need to know about tokens
@ -32,7 +32,7 @@ func newTokenCache(node *Node, expiration time.Duration) *tokenCache {
return tc return tc
} }
func (tc *tokenCache) Get(c Contact, hash bits.Bitmap, cancelCh stopOnce.Chan) string { func (tc *tokenCache) Get(c Contact, hash bits.Bitmap, cancelCh stop.Chan) string {
tc.lock.RLock() tc.lock.RLock()
token, exists := tc.tokens[c.String()] token, exists := tc.tokens[c.String()]
tc.lock.RUnlock() tc.lock.RUnlock()

View file

@ -9,7 +9,7 @@ import (
"sync" "sync"
"time" "time"
"github.com/lbryio/lbry.go/stopOnce" "github.com/lbryio/lbry.go/stop"
"github.com/lbryio/reflector.go/dht/bits" "github.com/lbryio/reflector.go/dht/bits"
) )
@ -17,14 +17,14 @@ type tokenManager struct {
secret []byte secret []byte
prevSecret []byte prevSecret []byte
lock *sync.RWMutex lock *sync.RWMutex
stop *stopOnce.Stopper stop *stop.Group
} }
func (tm *tokenManager) Start(interval time.Duration) { func (tm *tokenManager) Start(interval time.Duration) {
tm.secret = make([]byte, 64) tm.secret = make([]byte, 64)
tm.prevSecret = make([]byte, 64) tm.prevSecret = make([]byte, 64)
tm.lock = &sync.RWMutex{} tm.lock = &sync.RWMutex{}
tm.stop = stopOnce.New() tm.stop = stop.New()
tm.rotateSecret() tm.rotateSecret()

View file

@ -11,7 +11,7 @@ import (
"time" "time"
"github.com/lbryio/lbry.go/errors" "github.com/lbryio/lbry.go/errors"
"github.com/lbryio/lbry.go/stopOnce" "github.com/lbryio/lbry.go/stop"
"github.com/lbryio/reflector.go/store" "github.com/lbryio/reflector.go/store"
"github.com/davecgh/go-spew/spew" "github.com/davecgh/go-spew/spew"
@ -30,21 +30,21 @@ type Server struct {
store store.BlobStore store store.BlobStore
closed bool closed bool
stop *stopOnce.Stopper grp *stop.Group
} }
// NewServer returns an initialized Server pointer. // NewServer returns an initialized Server pointer.
func NewServer(store store.BlobStore) *Server { func NewServer(store store.BlobStore) *Server {
return &Server{ return &Server{
store: store, store: store,
stop: stopOnce.New(), grp: stop.New(),
} }
} }
// Shutdown gracefully shuts down the peer server. // Shutdown gracefully shuts down the peer server.
func (s *Server) Shutdown() { func (s *Server) Shutdown() {
log.Debug("shutting down peer server...") log.Debug("shutting down peer server...")
s.stop.StopAndWait() s.grp.StopAndWait()
log.Debug("peer server stopped") log.Debug("peer server stopped")
} }
@ -57,17 +57,17 @@ func (s *Server) Start(address string) error {
} }
go s.listenForShutdown(l) go s.listenForShutdown(l)
s.stop.Add(1) s.grp.Add(1)
go func() { go func() {
s.listenAndServe(l) s.listenAndServe(l)
s.stop.Done() s.grp.Done()
}() }()
return nil return nil
} }
func (s *Server) listenForShutdown(listener net.Listener) { func (s *Server) listenForShutdown(listener net.Listener) {
<-s.stop.Ch() <-s.grp.Ch()
s.closed = true s.closed = true
err := listener.Close() err := listener.Close()
if err != nil { if err != nil {
@ -84,10 +84,10 @@ func (s *Server) listenAndServe(listener net.Listener) {
} }
log.Error(err) log.Error(err)
} else { } else {
s.stop.Add(1) s.grp.Add(1)
go func() { go func() {
s.handleConnection(conn) s.handleConnection(conn)
s.stop.Done() s.grp.Done()
}() }()
} }
} }

View file

@ -14,7 +14,7 @@ import (
"github.com/lbryio/reflector.go/store" "github.com/lbryio/reflector.go/store"
"github.com/lbryio/lbry.go/errors" "github.com/lbryio/lbry.go/errors"
"github.com/lbryio/lbry.go/stopOnce" "github.com/lbryio/lbry.go/stop"
log "github.com/sirupsen/logrus" log "github.com/sirupsen/logrus"
) )
@ -53,7 +53,7 @@ type Prism struct {
reflector *reflector.Server reflector *reflector.Server
cluster *cluster.Cluster cluster *cluster.Cluster
stop *stopOnce.Stopper grp *stop.Group
} }
// New returns an initialized Prism instance // New returns an initialized Prism instance
@ -81,14 +81,14 @@ func New(conf *Config) *Prism {
peer: peer.NewServer(conf.Blobs), peer: peer.NewServer(conf.Blobs),
reflector: reflector.NewServer(conf.Blobs), reflector: reflector.NewServer(conf.Blobs),
stop: stopOnce.New(), grp: stop.New(),
} }
c.OnMembershipChange = func(n, total int) { c.OnMembershipChange = func(n, total int) {
p.stop.Add(1) p.grp.Add(1)
go func() { go func() {
p.AnnounceRange(n, total) p.AnnounceRange(n, total)
p.stop.Done() p.grp.Done()
}() }()
} }
@ -132,7 +132,7 @@ func (p *Prism) Start() error {
// Shutdown gracefully shuts down the different prism components before exiting. // Shutdown gracefully shuts down the different prism components before exiting.
func (p *Prism) Shutdown() { func (p *Prism) Shutdown() {
p.stop.StopAndWait() p.grp.StopAndWait()
p.cluster.Shutdown() p.cluster.Shutdown()
p.dht.Shutdown() p.dht.Shutdown()
p.reflector.Shutdown() p.reflector.Shutdown()
@ -174,7 +174,7 @@ func (p *Prism) AnnounceRange(n, total int) {
go func() { go func() {
defer wg.Done() defer wg.Done()
select { select {
case <-p.stop.Ch(): case <-p.grp.Ch():
return return
case err, more := <-errCh: case err, more := <-errCh:
if more && err != nil { if more && err != nil {
@ -188,7 +188,7 @@ func (p *Prism) AnnounceRange(n, total int) {
defer wg.Done() defer wg.Done()
for { for {
select { select {
case <-p.stop.Ch(): case <-p.grp.Ch():
cancel() cancel()
return return
case hash, more := <-hashCh: case hash, more := <-hashCh:

View file

@ -8,7 +8,7 @@ import (
"strconv" "strconv"
"github.com/lbryio/lbry.go/errors" "github.com/lbryio/lbry.go/errors"
"github.com/lbryio/lbry.go/stopOnce" "github.com/lbryio/lbry.go/stop"
"github.com/lbryio/reflector.go/store" "github.com/lbryio/reflector.go/store"
log "github.com/sirupsen/logrus" log "github.com/sirupsen/logrus"
@ -19,21 +19,21 @@ type Server struct {
store store.BlobStore store store.BlobStore
closed bool closed bool
stop *stopOnce.Stopper grp *stop.Group
} }
// NewServer returns an initialized reflector server pointer. // NewServer returns an initialized reflector server pointer.
func NewServer(store store.BlobStore) *Server { func NewServer(store store.BlobStore) *Server {
return &Server{ return &Server{
store: store, store: store,
stop: stopOnce.New(), grp: stop.New(),
} }
} }
// Shutdown shuts down the reflector server gracefully. // Shutdown shuts down the reflector server gracefully.
func (s *Server) Shutdown() { func (s *Server) Shutdown() {
log.Debug("shutting down reflector server...") log.Debug("shutting down reflector server...")
s.stop.StopAndWait() s.grp.StopAndWait()
log.Debug("reflector server stopped") log.Debug("reflector server stopped")
} }
@ -48,17 +48,17 @@ func (s *Server) Start(address string) error {
go s.listenForShutdown(l) go s.listenForShutdown(l)
s.stop.Add(1) s.grp.Add(1)
go func() { go func() {
s.listenAndServe(l) s.listenAndServe(l)
s.stop.Done() s.grp.Done()
}() }()
return nil return nil
} }
func (s *Server) listenForShutdown(listener net.Listener) { func (s *Server) listenForShutdown(listener net.Listener) {
<-s.stop.Ch() <-s.grp.Ch()
s.closed = true s.closed = true
err := listener.Close() err := listener.Close()
if err != nil { if err != nil {
@ -75,10 +75,10 @@ func (s *Server) listenAndServe(listener net.Listener) {
} }
log.Error(err) log.Error(err)
} else { } else {
s.stop.Add(1) s.grp.Add(1)
go func() { go func() {
s.handleConn(conn) s.handleConn(conn)
s.stop.Done() s.grp.Done()
}() }()
} }
} }