From 38eaa17a9b75dc890d18ac9949be06934017ff44 Mon Sep 17 00:00:00 2001 From: Alex Grintsvayg Date: Thu, 26 Jul 2018 21:30:22 -0400 Subject: [PATCH] announce still needs tests, but i tested a lot by hand and its good --- dht/config.go | 10 ++-- dht/dht_announce.go | 102 +++++++++++++++++++++++++++++++-------- dht/dht_announce_test.go | 29 ----------- 3 files changed, 85 insertions(+), 56 deletions(-) delete mode 100644 dht/dht_announce_test.go diff --git a/dht/config.go b/dht/config.go index 294a5bc..3e4a49c 100644 --- a/dht/config.go +++ b/dht/config.go @@ -12,9 +12,8 @@ const ( Network = "udp4" DefaultPort = 4444 - DefaultAnnounceRate = 10 - DefaultAnnounceBurst = 1 - DefaultReannounceTime = 50 * time.Minute + DefaultAnnounceRate = 10 // send at most this many announces per second + DefaultReannounceTime = 50 * time.Minute // should be a bit less than hash expiration time // TODO: all these constants should be defaults, and should be used to set values in the standard Config. then the code should use values in the config // TODO: alternatively, have a global Config for constants. at least that way tests can modify the values @@ -55,8 +54,8 @@ type Config struct { ReannounceTime time.Duration // send at most this many announces per second AnnounceRate int - // allow bursts of up to this many times the announce rate - AnnounceBurst int + // channel that will receive notifications about announcements + AnnounceNotificationCh chan announceNotification } // NewStandardConfig returns a Config pointer with default values. @@ -71,6 +70,5 @@ func NewStandardConfig() *Config { PeerProtocolPort: peerproto.DefaultPort, ReannounceTime: DefaultReannounceTime, AnnounceRate: DefaultAnnounceRate, - AnnounceBurst: DefaultAnnounceBurst, } } diff --git a/dht/dht_announce.go b/dht/dht_announce.go index 456b3d1..067071d 100644 --- a/dht/dht_announce.go +++ b/dht/dht_announce.go @@ -3,6 +3,7 @@ package dht import ( "container/ring" "context" + "math" "sync" "time" @@ -16,6 +17,17 @@ type queueEdit struct { add bool } +const ( + announceStarted = "started" + announceFinishd = "finished" +) + +type announceNotification struct { + hash bits.Bitmap + action string + err error +} + // Add adds the hash to the list of hashes this node is announcing func (dht *DHT) Add(hash bits.Bitmap) { dht.announceAddRemove <- queueEdit{hash: hash, add: true} @@ -32,68 +44,116 @@ func (dht *DHT) runAnnouncer() { lastAnnounce time.Time } - queue := ring.New(0) + var queue *ring.Ring hashes := make(map[bits.Bitmap]*ring.Ring) - limiter := rate.NewLimiter(rate.Limit(dht.conf.AnnounceRate), dht.conf.AnnounceRate*dht.conf.AnnounceBurst) var announceNextHash <-chan time.Time - timer := time.NewTimer(0) - closedCh := make(chan time.Time) - close(closedCh) + timer := time.NewTimer(math.MaxInt64) + timer.Stop() + + limitCh := make(chan time.Time) + dht.grp.Add(1) + go func() { + defer dht.grp.Done() + limiter := rate.NewLimiter(rate.Limit(dht.conf.AnnounceRate), dht.conf.AnnounceRate) + for { + limiter.Wait(context.Background()) // TODO: should use grp.ctx somehow? so when grp is closed, wait returns + select { + case limitCh <- time.Now(): + case <-dht.grp.Ch(): + return + } + } + }() + + maintenance := time.NewTicker(1 * time.Minute) + + // TODO: work to space hash announces out so they aren't bunched up around the reannounce time. track time since last announce. if its been more than the ideal time (reannounce time / numhashes), start announcing hashes early for { select { case <-dht.grp.Ch(): - return + + case <-maintenance.C: + maxAnnounce := dht.conf.AnnounceRate * int(dht.conf.ReannounceTime.Seconds()) + if len(hashes) > maxAnnounce { + // TODO: send this to slack + log.Warnf("DHT has %d hashes, but can only announce %d hashes in the %s reannounce window. Raise the announce rate or spawn more nodes.", + len(hashes), maxAnnounce, dht.conf.ReannounceTime.String()) + } case change := <-dht.announceAddRemove: if change.add { + if _, exists := hashes[change.hash]; exists { + continue + } + r := ring.New(1) r.Value = hashAndTime{hash: change.hash} - queue.Prev().Link(r) + if queue != nil { + queue.Prev().Link(r) + } queue = r hashes[change.hash] = r - announceNextHash = closedCh // don't wait to announce next hash + announceNextHash = limitCh // announce next hash ASAP } else { - if r, exists := hashes[change.hash]; exists { - delete(hashes, change.hash) - if len(hashes) == 0 { - queue = ring.New(0) - announceNextHash = make(chan time.Time) // no hashes to announce, wait indefinitely - } else { - if r == queue { - queue = queue.Next() // don't lose our pointer - } - r.Prev().Link(r.Next()) + r, exists := hashes[change.hash] + if !exists { + continue + } + + delete(hashes, change.hash) + + if len(hashes) == 0 { + queue = ring.New(0) + announceNextHash = nil // no hashes to announce, wait indefinitely + } else { + if r == queue { + queue = queue.Next() // don't lose our pointer } + r.Prev().Link(r.Next()) } } case <-announceNextHash: - limiter.Wait(context.Background()) // TODO: should use grp.ctx somehow dht.grp.Add(1) ht := queue.Value.(hashAndTime) if !ht.lastAnnounce.IsZero() { nextAnnounce := ht.lastAnnounce.Add(dht.conf.ReannounceTime) - if nextAnnounce.Before(time.Now()) { + if nextAnnounce.After(time.Now()) { timer.Reset(time.Until(nextAnnounce)) announceNextHash = timer.C // wait until next hash should be announced continue } } + if dht.conf.AnnounceNotificationCh != nil { + dht.conf.AnnounceNotificationCh <- announceNotification{ + hash: ht.hash, + action: announceStarted, + } + } + go func(hash bits.Bitmap) { defer dht.grp.Done() err := dht.announce(hash) if err != nil { log.Error(errors.Prefix("announce", err)) } + + if dht.conf.AnnounceNotificationCh != nil { + dht.conf.AnnounceNotificationCh <- announceNotification{ + hash: ht.hash, + action: announceFinishd, + err: err, + } + } }(ht.hash) queue.Value = hashAndTime{hash: ht.hash, lastAnnounce: time.Now()} queue = queue.Next() - announceNextHash = closedCh // don't wait to announce next hash + announceNextHash = limitCh // announce next hash ASAP } } } diff --git a/dht/dht_announce_test.go b/dht/dht_announce_test.go deleted file mode 100644 index a238af3..0000000 --- a/dht/dht_announce_test.go +++ /dev/null @@ -1,29 +0,0 @@ -package dht - -import ( - "testing" -) - -func TestDHT_Announce(t *testing.T) { - t.Skip("NEED SOME TESTS FOR ANNOUNCING") - - // tests - // - max rate - // - new announces get ahead of old announces - // - announcer blocks correctly (when nothing to announce, when next announce time is in the future) and unblocks correctly (when waiting to announce next and a new hash is added) - // thought: what happens when you're waiting to announce a hash and it gets removed? probably nothing, since later hashes will be announced later. but still good to test this - // - - //bs, dhts := TestingCreateNetwork(t, 2, true, true) - //defer func() { - // for _, d := range dhts { - // go d.Shutdown() - // } - // bs.Shutdown() - // time.Sleep(1 * time.Second) - //}() - // - //announcer := dhts[0] - //receiver := dhts[1] - -}