announce still needs tests, but i tested a lot by hand and its good

This commit is contained in:
Alex Grintsvayg 2018-07-26 21:30:22 -04:00
parent 5378fcbb94
commit 38eaa17a9b
3 changed files with 85 additions and 56 deletions

View file

@ -12,9 +12,8 @@ const (
Network = "udp4"
DefaultPort = 4444
DefaultAnnounceRate = 10
DefaultAnnounceBurst = 1
DefaultReannounceTime = 50 * time.Minute
DefaultAnnounceRate = 10 // send at most this many announces per second
DefaultReannounceTime = 50 * time.Minute // should be a bit less than hash expiration time
// TODO: all these constants should be defaults, and should be used to set values in the standard Config. then the code should use values in the config
// TODO: alternatively, have a global Config for constants. at least that way tests can modify the values
@ -55,8 +54,8 @@ type Config struct {
ReannounceTime time.Duration
// send at most this many announces per second
AnnounceRate int
// allow bursts of up to this many times the announce rate
AnnounceBurst int
// channel that will receive notifications about announcements
AnnounceNotificationCh chan announceNotification
}
// NewStandardConfig returns a Config pointer with default values.
@ -71,6 +70,5 @@ func NewStandardConfig() *Config {
PeerProtocolPort: peerproto.DefaultPort,
ReannounceTime: DefaultReannounceTime,
AnnounceRate: DefaultAnnounceRate,
AnnounceBurst: DefaultAnnounceBurst,
}
}

View file

@ -3,6 +3,7 @@ package dht
import (
"container/ring"
"context"
"math"
"sync"
"time"
@ -16,6 +17,17 @@ type queueEdit struct {
add bool
}
const (
announceStarted = "started"
announceFinishd = "finished"
)
type announceNotification struct {
hash bits.Bitmap
action string
err error
}
// Add adds the hash to the list of hashes this node is announcing
func (dht *DHT) Add(hash bits.Bitmap) {
dht.announceAddRemove <- queueEdit{hash: hash, add: true}
@ -32,68 +44,116 @@ func (dht *DHT) runAnnouncer() {
lastAnnounce time.Time
}
queue := ring.New(0)
var queue *ring.Ring
hashes := make(map[bits.Bitmap]*ring.Ring)
limiter := rate.NewLimiter(rate.Limit(dht.conf.AnnounceRate), dht.conf.AnnounceRate*dht.conf.AnnounceBurst)
var announceNextHash <-chan time.Time
timer := time.NewTimer(0)
closedCh := make(chan time.Time)
close(closedCh)
timer := time.NewTimer(math.MaxInt64)
timer.Stop()
limitCh := make(chan time.Time)
dht.grp.Add(1)
go func() {
defer dht.grp.Done()
limiter := rate.NewLimiter(rate.Limit(dht.conf.AnnounceRate), dht.conf.AnnounceRate)
for {
limiter.Wait(context.Background()) // TODO: should use grp.ctx somehow? so when grp is closed, wait returns
select {
case limitCh <- time.Now():
case <-dht.grp.Ch():
return
}
}
}()
maintenance := time.NewTicker(1 * time.Minute)
// TODO: work to space hash announces out so they aren't bunched up around the reannounce time. track time since last announce. if its been more than the ideal time (reannounce time / numhashes), start announcing hashes early
for {
select {
case <-dht.grp.Ch():
return
case <-maintenance.C:
maxAnnounce := dht.conf.AnnounceRate * int(dht.conf.ReannounceTime.Seconds())
if len(hashes) > maxAnnounce {
// TODO: send this to slack
log.Warnf("DHT has %d hashes, but can only announce %d hashes in the %s reannounce window. Raise the announce rate or spawn more nodes.",
len(hashes), maxAnnounce, dht.conf.ReannounceTime.String())
}
case change := <-dht.announceAddRemove:
if change.add {
if _, exists := hashes[change.hash]; exists {
continue
}
r := ring.New(1)
r.Value = hashAndTime{hash: change.hash}
queue.Prev().Link(r)
if queue != nil {
queue.Prev().Link(r)
}
queue = r
hashes[change.hash] = r
announceNextHash = closedCh // don't wait to announce next hash
announceNextHash = limitCh // announce next hash ASAP
} else {
if r, exists := hashes[change.hash]; exists {
delete(hashes, change.hash)
if len(hashes) == 0 {
queue = ring.New(0)
announceNextHash = make(chan time.Time) // no hashes to announce, wait indefinitely
} else {
if r == queue {
queue = queue.Next() // don't lose our pointer
}
r.Prev().Link(r.Next())
r, exists := hashes[change.hash]
if !exists {
continue
}
delete(hashes, change.hash)
if len(hashes) == 0 {
queue = ring.New(0)
announceNextHash = nil // no hashes to announce, wait indefinitely
} else {
if r == queue {
queue = queue.Next() // don't lose our pointer
}
r.Prev().Link(r.Next())
}
}
case <-announceNextHash:
limiter.Wait(context.Background()) // TODO: should use grp.ctx somehow
dht.grp.Add(1)
ht := queue.Value.(hashAndTime)
if !ht.lastAnnounce.IsZero() {
nextAnnounce := ht.lastAnnounce.Add(dht.conf.ReannounceTime)
if nextAnnounce.Before(time.Now()) {
if nextAnnounce.After(time.Now()) {
timer.Reset(time.Until(nextAnnounce))
announceNextHash = timer.C // wait until next hash should be announced
continue
}
}
if dht.conf.AnnounceNotificationCh != nil {
dht.conf.AnnounceNotificationCh <- announceNotification{
hash: ht.hash,
action: announceStarted,
}
}
go func(hash bits.Bitmap) {
defer dht.grp.Done()
err := dht.announce(hash)
if err != nil {
log.Error(errors.Prefix("announce", err))
}
if dht.conf.AnnounceNotificationCh != nil {
dht.conf.AnnounceNotificationCh <- announceNotification{
hash: ht.hash,
action: announceFinishd,
err: err,
}
}
}(ht.hash)
queue.Value = hashAndTime{hash: ht.hash, lastAnnounce: time.Now()}
queue = queue.Next()
announceNextHash = closedCh // don't wait to announce next hash
announceNextHash = limitCh // announce next hash ASAP
}
}
}

View file

@ -1,29 +0,0 @@
package dht
import (
"testing"
)
func TestDHT_Announce(t *testing.T) {
t.Skip("NEED SOME TESTS FOR ANNOUNCING")
// tests
// - max rate
// - new announces get ahead of old announces
// - announcer blocks correctly (when nothing to announce, when next announce time is in the future) and unblocks correctly (when waiting to announce next and a new hash is added)
// thought: what happens when you're waiting to announce a hash and it gets removed? probably nothing, since later hashes will be announced later. but still good to test this
//
//bs, dhts := TestingCreateNetwork(t, 2, true, true)
//defer func() {
// for _, d := range dhts {
// go d.Shutdown()
// }
// bs.Shutdown()
// time.Sleep(1 * time.Second)
//}()
//
//announcer := dhts[0]
//receiver := dhts[1]
}