update stopper

parent 1c31e54860
commit 6a0cab5f62

7 changed files with 35 additions and 35 deletions
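Every hunk below makes one of two changes: the stopper's channel accessor is renamed from stop.Chan() to stop.Ch(), and the separate *sync.WaitGroup fields (stopWG, doneWG) are folded into the stopper itself via stop.Add, stop.Done, and stop.Wait, so one object handles both signaling and waiting. A minimal sketch of what such a stopper could look like, assuming only the methods this diff uses; it is illustrative, not the actual stopOnce package:

package main

import "sync"

// Stopper is a hypothetical stand-in for stopOnce.Stopper, sketching only
// the methods this commit relies on: New, Ch, Stop, Add, Done, Wait.
type Stopper struct {
	ch   chan struct{}
	once sync.Once
	wg   sync.WaitGroup
}

// New returns a stopper whose channel is still open.
func New() *Stopper {
	return &Stopper{ch: make(chan struct{})}
}

// Ch returns the channel that is closed when Stop is called.
func (s *Stopper) Ch() <-chan struct{} { return s.ch }

// Stop closes the channel exactly once, unblocking every select on Ch().
func (s *Stopper) Stop() { s.once.Do(func() { close(s.ch) }) }

// Add, Done, and Wait expose the embedded WaitGroup, which is what lets the
// diff delete the separate stopWG and doneWG fields.
func (s *Stopper) Add(delta int) { s.wg.Add(delta) }
func (s *Stopper) Done()         { s.wg.Done() }
func (s *Stopper) Wait()         { s.wg.Wait() }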
@@ -10,6 +10,8 @@ import (
 	"github.com/lyoshenka/bencode"
 )
 
+// TODO: http://roaringbitmap.org/
+
 type Bitmap [nodeIDLength]byte
 
 func (b Bitmap) RawString() string {
@@ -62,7 +62,7 @@ func (b *BootstrapNode) Connect(conn UDPConn) error {
 		select {
 		case <-t.C:
 			b.check()
-		case <-b.stop.Chan():
+		case <-b.stop.Ch():
 			return
 		}
 	}
@@ -130,7 +130,7 @@ func (b *BootstrapNode) ping(c Contact) {
 
 	select {
 	case res = <-resCh:
-	case <-b.stop.Chan():
+	case <-b.stop.Ch():
 		cancel()
 		return
 	}
22	dht/dht.go
@@ -81,8 +81,6 @@ type DHT struct {
 	node *Node
 	// stopper to shut down DHT
 	stop *stopOnce.Stopper
-	// wait group for all the things that need to be stopped when DHT shuts down
-	stopWG *sync.WaitGroup
 	// channel is closed when DHT joins network
 	joined chan struct{}
 	// lock for announced list
@@ -107,7 +105,6 @@ func New(config *Config) (*DHT, error) {
 		contact:   contact,
 		node:      NewNode(contact.ID),
 		stop:      stopOnce.New(),
-		stopWG:    &sync.WaitGroup{},
 		joined:    make(chan struct{}),
 		lock:      &sync.RWMutex{},
 		announced: make(map[Bitmap]bool),
@@ -143,6 +140,9 @@ func (dht *DHT) join() {
 	if err != nil {
 		log.Errorf("[%s] join: %s", dht.node.id.HexShort(), err.Error())
 	}
+
+	// TODO: after joining, refresh all buckets further away than our closest neighbor
+	// http://xlattice.sourceforge.net/components/protocol/kademlia/specs.html#join
 }
 
 // Start starts the dht
@@ -176,7 +176,7 @@ func (dht *DHT) WaitUntilJoined() {
 func (dht *DHT) Shutdown() {
 	log.Debugf("[%s] DHT shutting down", dht.node.id.HexShort())
 	dht.stop.Stop()
-	dht.stopWG.Wait()
+	dht.stop.Wait()
 	dht.node.Shutdown()
 	log.Debugf("[%s] DHT stopped", dht.node.id.HexShort())
 }
@@ -244,12 +244,12 @@ func (dht *DHT) Announce(hash Bitmap) error {
 }
 
 func (dht *DHT) startReannouncer() {
-	dht.stopWG.Add(1)
-	defer dht.stopWG.Done()
+	dht.stop.Add(1)
+	defer dht.stop.Done()
 	tick := time.NewTicker(tReannounce)
 	for {
 		select {
-		case <-dht.stop.Chan():
+		case <-dht.stop.Ch():
 			return
 		case <-tick.C:
 			dht.lock.RLock()
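The reannouncer hunk above shows the worker pattern the commit standardizes on: the goroutine registers with stop.Add(1), defers stop.Done(), and selects on stop.Ch() inside its loop, so Shutdown can signal and then wait on the same object. A sketch of that pattern, assuming it sits in the same package as the hypothetical Stopper sketched after the commit header, with a made-up ticker worker standing in for the real reannounce logic:

package main

import (
	"fmt"
	"time"
)

// runWorker mirrors the startReannouncer pattern: register with the stopper,
// do periodic work, and exit as soon as the stop channel closes.
func runWorker(stop *Stopper, interval time.Duration) {
	stop.Add(1)
	defer stop.Done()

	tick := time.NewTicker(interval)
	defer tick.Stop()

	for {
		select {
		case <-stop.Ch():
			return
		case <-tick.C:
			fmt.Println("periodic pass") // stand-in for the reannounce work
		}
	}
}

func main() {
	stop := New()
	go runWorker(stop, 50*time.Millisecond)

	time.Sleep(200 * time.Millisecond)
	stop.Stop() // signal: the worker's select on stop.Ch() unblocks
	stop.Wait() // wait: returns once the worker's deferred Done has run
}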
@@ -262,8 +262,8 @@ func (dht *DHT) startReannouncer() {
 }
 
 func (dht *DHT) storeOnNode(hash Bitmap, c Contact) {
-	dht.stopWG.Add(1)
-	defer dht.stopWG.Done()
+	dht.stop.Add(1)
+	defer dht.stop.Done()
 
 	// self-store
 	if dht.contact.Equals(c) {
@@ -280,7 +280,7 @@ func (dht *DHT) storeOnNode(hash Bitmap, c Contact) {
 
 	select {
 	case res = <-resCh:
-	case <-dht.stop.Chan():
+	case <-dht.stop.Ch():
 		cancel()
 		return
 	}
@@ -304,7 +304,7 @@ func (dht *DHT) storeOnNode(hash Bitmap, c Contact) {
 	go func() {
 		select {
 		case <-resCh:
-		case <-dht.stop.Chan():
+		case <-dht.stop.Ch():
 			cancel()
 		}
 	}()
@@ -154,8 +154,8 @@ type storeArgsValue struct {
 type storeArgs struct {
 	BlobHash  Bitmap
 	Value     storeArgsValue
-	NodeID    Bitmap
-	SelfStore bool // this is an int on the wire
+	NodeID    Bitmap // original publisher id? I think this is getting fixed in the new dht stuff
+	SelfStore bool   // this is an int on the wire
 }
 
 func (s storeArgs) MarshalBencode() ([]byte, error) {
12	dht/node.go
@@ -87,7 +87,7 @@ func (n *Node) Connect(conn UDPConn) error {
 	//		dht.PrintState()
 	//		select {
 	//		case <-t.C:
-	//		case <-dht.stop.Chan():
+	//		case <-dht.stop.Ch():
 	//			return
 	//		}
 	// }
@@ -106,7 +106,7 @@ func (n *Node) Connect(conn UDPConn) error {
 
 		for {
 			select {
-			case <-n.stop.Chan():
+			case <-n.stop.Ch():
 				return
 			default:
 			}
@@ -128,7 +128,7 @@ func (n *Node) Connect(conn UDPConn) error {
 
 			select { // needs select here because packet consumer can quit and the packets channel gets filled up and blocks
 			case packets <- packet{data: data, raddr: raddr}:
-			case <-n.stop.Chan():
+			case <-n.stop.Ch():
 			}
 		}
 	}()
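The comment in the hunk above explains why the send needs a select: once the packet consumer exits, a bare packets <- ... send would block forever after the buffer fills. Pairing the send with a stop case gives the producer a way out. A small self-contained sketch of the idea with hypothetical names, not the node's actual read loop:

package main

import "fmt"

type packet struct{ data []byte }

// enqueue tries to hand a packet to the consumer, but gives up if the stop
// channel is closed, so a stopped consumer can never leave the producer
// blocked on a full channel.
func enqueue(packets chan<- packet, stop <-chan struct{}, p packet) bool {
	select {
	case packets <- p:
		return true
	case <-stop:
		return false
	}
}

func main() {
	packets := make(chan packet, 1)
	stop := make(chan struct{})

	fmt.Println(enqueue(packets, stop, packet{data: []byte("a")})) // true: buffer has room
	close(stop)                                                    // consumer is gone
	fmt.Println(enqueue(packets, stop, packet{data: []byte("b")})) // false: channel full, stop fired
}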
@@ -143,7 +143,7 @@ func (n *Node) Connect(conn UDPConn) error {
 		select {
 		case pkt = <-packets:
 			n.handlePacket(pkt)
-		case <-n.stop.Chan():
+		case <-n.stop.Ch():
 			return
 		}
 	}
@@ -434,8 +434,8 @@ func (n *Node) startRoutingTableGrooming() {
 	for {
 		select {
 		case <-refreshTicker.C:
-			RoutingTableRefresh(n, tRefresh, n.stop.Chan())
-		case <-n.stop.Chan():
+			RoutingTableRefresh(n, tRefresh, n.stop.Ch())
+		case <-n.stop.Ch():
			return
 		}
 	}
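This hunk also threads the stop channel into RoutingTableRefresh, so the refresh itself can be interrupted rather than only the loop around it. A hypothetical sketch of that idea; refresh and its parameters are invented for illustration and are not the real RoutingTableRefresh signature:

package main

import (
	"fmt"
	"time"
)

// refresh is a stand-in for a long-running operation that takes the stop
// channel as a parameter: each step checks the channel so a shutdown
// interrupts the work instead of waiting for it to finish.
func refresh(steps int, stop <-chan struct{}) int {
	done := 0
	for i := 0; i < steps; i++ {
		select {
		case <-stop:
			return done // abort mid-refresh on shutdown
		default:
		}
		time.Sleep(10 * time.Millisecond) // stand-in for refreshing one bucket
		done++
	}
	return done
}

func main() {
	stop := make(chan struct{})
	go func() {
		time.Sleep(25 * time.Millisecond)
		close(stop) // shutdown arrives while the refresh is still running
	}()
	fmt.Println("completed steps:", refresh(100, stop))
}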
@@ -19,8 +19,7 @@ type contactFinder struct {
 	target Bitmap
 	node   *Node
 
-	done   *stopOnce.Stopper
-	doneWG *sync.WaitGroup
+	stop *stopOnce.Stopper
 
 	findValueMutex  *sync.Mutex
 	findValueResult []Contact
@@ -50,15 +49,14 @@ func newContactFinder(node *Node, target Bitmap, findValue bool) *contactFinder
 		activeContactsMutex:      &sync.Mutex{},
 		shortlistMutex:           &sync.Mutex{},
 		shortlistAdded:           make(map[Bitmap]bool),
-		done:                     stopOnce.New(),
-		doneWG:                   &sync.WaitGroup{},
+		stop:                     stopOnce.New(),
 		outstandingRequestsMutex: &sync.RWMutex{},
 	}
 }
 
 func (cf *contactFinder) Cancel() {
-	cf.done.Stop()
-	cf.doneWG.Wait()
+	cf.stop.Stop()
+	cf.stop.Wait()
 }
 
 func (cf *contactFinder) Find() (findNodeResponse, error) {
@@ -73,14 +71,14 @@ func (cf *contactFinder) Find() (findNodeResponse, error) {
 	}
 
 	for i := 0; i < alpha; i++ {
-		cf.doneWG.Add(1)
+		cf.stop.Add(1)
 		go func(i int) {
-			defer cf.doneWG.Done()
+			defer cf.stop.Done()
 			cf.iterationWorker(i + 1)
 		}(i)
 	}
 
-	cf.doneWG.Wait()
+	cf.stop.Wait()
 
 	// TODO: what to do if we have less than K active contacts, shortlist is empty, but we
 	// TODO: have other contacts in our routing table whom we have not contacted. prolly contact them
@@ -131,7 +129,7 @@ func (cf *contactFinder) iterationWorker(num int) {
 		resCh, cancel := cf.node.SendCancelable(contact, req)
 		select {
 		case res = <-resCh:
-		case <-cf.done.Chan():
+		case <-cf.stop.Ch():
 			log.Debugf("[%s] worker %d: canceled", cf.node.id.HexShort(), num)
 			cancel()
 			return
@@ -145,7 +143,7 @@ func (cf *contactFinder) iterationWorker(num int) {
 			cf.findValueMutex.Lock()
 			cf.findValueResult = res.Contacts
 			cf.findValueMutex.Unlock()
-			cf.done.Stop()
+			cf.stop.Stop()
 			return
 		} else {
 			log.Debugf("[%s] worker %d: got contacts", cf.node.id.HexShort(), num)
@@ -158,7 +156,7 @@ func (cf *contactFinder) iterationWorker(num int) {
 
 		if cf.isSearchFinished() {
 			log.Debugf("[%s] worker %d: search is finished", cf.node.id.HexShort(), num)
-			cf.done.Stop()
+			cf.stop.Stop()
 			return
 		}
 	}
@@ -214,7 +212,7 @@ func (cf *contactFinder) isSearchFinished() bool {
 	}
 
 	select {
-	case <-cf.done.Chan():
+	case <-cf.stop.Ch():
 		return true
 	default:
 	}
@@ -37,7 +37,7 @@ func (tm *tokenManager) Start(interval time.Duration) {
 		select {
 		case <-tick.C:
 			tm.rotateSecret()
-		case <-tm.done.Chan():
+		case <-tm.done.Ch():
 			return
 		}
 	}