implemented stopper pattern

-made defer adjustments inline and deleted the separate function.
-adjusted method in upload to take the only parameter it requires.
-Implemented stopper param for reflector server
-Aligned Cluster New to NewCluster
-Adjusted DHT to use StopAndWait
-Removed blocking waitgroup add
-Unified all components under prism.
-Moved defer done outside of functions.
-renamed NewCluster to New
-fixed travis errors.
This commit is contained in:
Mark Beamer Jr 2018-06-06 23:48:07 -04:00 committed by Alex Grintsvayg
parent 8100010220
commit 470e3721d0

View file

@ -135,6 +135,11 @@ func (dht *DHT) join() {
// now call iterativeFind on yourself // now call iterativeFind on yourself
nf := newContactFinder(dht.node, dht.node.id, false) nf := newContactFinder(dht.node, dht.node.id, false)
// stop if dht is stopped
go func(finder *contactFinder) {
<-dht.stop.Ch()
nf.Cancel()
}(nf)
_, err := nf.Find() _, err := nf.Find()
if err != nil { if err != nil {
log.Errorf("[%s] join: %s", dht.node.id.HexShort(), err.Error()) log.Errorf("[%s] join: %s", dht.node.id.HexShort(), err.Error())
@ -151,14 +156,20 @@ func (dht *DHT) Start() error {
return errors.Err(err) return errors.Err(err)
} }
conn := listener.(*net.UDPConn) conn := listener.(*net.UDPConn)
err = dht.node.Connect(conn) err = dht.node.Connect(conn)
if err != nil { if err != nil {
return err return err
} }
dht.stop.Add(1)
go func() {
defer dht.stop.Done()
dht.join() dht.join()
}()
dht.stop.Add(1)
go func() {
defer dht.stop.Done()
dht.startReannouncer() dht.startReannouncer()
}()
log.Debugf("[%s] DHT ready on %s (%d nodes found during join)", dht.node.id.HexShort(), dht.contact.Addr().String(), dht.node.rt.Count()) log.Debugf("[%s] DHT ready on %s (%d nodes found during join)", dht.node.id.HexShort(), dht.contact.Addr().String(), dht.node.rt.Count())
return nil return nil
@ -175,8 +186,7 @@ func (dht *DHT) WaitUntilJoined() {
// Shutdown shuts down the dht // Shutdown shuts down the dht
func (dht *DHT) Shutdown() { func (dht *DHT) Shutdown() {
log.Debugf("[%s] DHT shutting down", dht.node.id.HexShort()) log.Debugf("[%s] DHT shutting down", dht.node.id.HexShort())
dht.stop.Stop() dht.stop.StopAndWait()
dht.stop.Wait()
dht.node.Shutdown() dht.node.Shutdown()
log.Debugf("[%s] DHT stopped", dht.node.id.HexShort()) log.Debugf("[%s] DHT stopped", dht.node.id.HexShort())
} }
@ -245,8 +255,6 @@ func (dht *DHT) Announce(hash Bitmap) error {
} }
func (dht *DHT) startReannouncer() { func (dht *DHT) startReannouncer() {
dht.stop.Add(1)
defer dht.stop.Done()
tick := time.NewTicker(tReannounce) tick := time.NewTicker(tReannounce)
for { for {
select { select {
@ -255,7 +263,9 @@ func (dht *DHT) startReannouncer() {
case <-tick.C: case <-tick.C:
dht.lock.RLock() dht.lock.RLock()
for h := range dht.announced { for h := range dht.announced {
dht.stop.Add(1)
go func(bm Bitmap) { go func(bm Bitmap) {
defer dht.stop.Done()
if err := dht.Announce(bm); err != nil { if err := dht.Announce(bm); err != nil {
log.Error("error re-announcing bitmap - ", err) log.Error("error re-announcing bitmap - ", err)
} }