hash announcer / rate limiter

Alex Grintsvayg 2018-07-26 16:05:27 -04:00
parent 965bed9587
commit 5378fcbb94
6 changed files with 275 additions and 182 deletions

dht/config.go (new file, +76 lines)

@@ -0,0 +1,76 @@
package dht
import (
"strconv"
"time"
"github.com/lbryio/reflector.go/dht/bits"
peerproto "github.com/lbryio/reflector.go/peer"
)
const (
Network = "udp4"
DefaultPort = 4444
DefaultAnnounceRate = 10
DefaultAnnounceBurst = 1
DefaultReannounceTime = 50 * time.Minute
// TODO: all these constants should be defaults, and should be used to set values in the standard Config. then the code should use values in the config
// TODO: alternatively, have a global Config for constants. at least that way tests can modify the values
alpha = 3 // this is the constant alpha in the spec
bucketSize = 8 // this is the constant k in the spec
nodeIDLength = bits.NumBytes // bytes. this is the constant B in the spec
messageIDLength = 20 // bytes.
udpRetry = 1
udpTimeout = 5 * time.Second
udpMaxMessageLength = 4096 // bytes. the longest single message is ~676 bytes, but a findValue response can return more than k results when many nodes store a value, so leave a generous buffer
maxPeerFails = 3 // after this many failures, a peer is considered bad and will be removed from the routing table
//tExpire = 60 * time.Minute // the time after which a key/value pair expires; this is a time-to-live (TTL) from the original publication date
tRefresh = 1 * time.Hour // the time after which an otherwise unaccessed bucket must be refreshed
//tReplicate = 1 * time.Hour // the interval between Kademlia replication events, when a node is required to publish its entire database
//tNodeRefresh = 15 * time.Minute // the time after which a good node becomes questionable if it has not messaged us
compactNodeInfoLength = nodeIDLength + 6 // nodeID + 4 for IP + 2 for port
tokenSecretRotationInterval = 5 * time.Minute // how often the token-generating secret is rotated
)
// Config represents the configuration of the DHT node.
type Config struct {
// this node's address. format is `ip:port`
Address string
// the seed nodes through which we can join the DHT network
SeedNodes []string
// the hex-encoded node id for this node. if string is empty, a random id will be generated
NodeID string
// print the state of the DHT at this interval
PrintState time.Duration
// the port that clients can use to download blobs using the LBRY peer protocol
PeerProtocolPort int
// the time after which the original publisher must reannounce a key/value pair
ReannounceTime time.Duration
// send at most this many announces per second
AnnounceRate int
// allow bursts of up to this many times the announce rate
AnnounceBurst int
}
// NewStandardConfig returns a Config pointer with default values.
func NewStandardConfig() *Config {
return &Config{
Address: "0.0.0.0:" + strconv.Itoa(DefaultPort),
SeedNodes: []string{
"lbrynet1.lbry.io:4444",
"lbrynet2.lbry.io:4444",
"lbrynet3.lbry.io:4444",
},
PeerProtocolPort: peerproto.DefaultPort,
ReannounceTime: DefaultReannounceTime,
AnnounceRate: DefaultAnnounceRate,
AnnounceBurst: DefaultAnnounceBurst,
}
}
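
For a sense of how the new knobs are meant to be used, here is a hypothetical caller-side sketch; the rate values and the hash are made up for illustration, while NewStandardConfig, New, Start, Shutdown, Add, and bits.Rand all come from this diff:

    cfg := dht.NewStandardConfig()
    cfg.AnnounceRate = 20 // hypothetical: at most 20 announces per second
    cfg.AnnounceBurst = 2 // hypothetical: allow bursts of up to 2x that rate
    d := dht.New(cfg)
    if err := d.Start(); err != nil {
        return err // sketch only
    }
    defer d.Shutdown()
    d.Add(bits.Rand()) // in real use, the hash of a blob this node can serve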

dht/dht.go (modified)

@@ -3,13 +3,10 @@ package dht
import (
"fmt"
"net"
"strconv"
"strings"
"sync"
"time"
"github.com/lbryio/reflector.go/dht/bits"
peerproto "github.com/lbryio/reflector.go/peer"
"github.com/lbryio/lbry.go/errors"
"github.com/lbryio/lbry.go/stop"
@@ -30,61 +27,6 @@ func init() {
//log.SetLevel(log.DebugLevel)
}
-const (
-Network = "udp4"
-DefaultPort = 4444
-// TODO: all these constants should be defaults, and should be used to set values in the standard Config. then the code should use values in the config
-// TODO: alternatively, have a global Config for constants. at least that way tests can modify the values
-alpha = 3 // this is the constant alpha in the spec
-bucketSize = 8 // this is the constant k in the spec
-nodeIDLength = bits.NumBytes // bytes. this is the constant B in the spec
-messageIDLength = 20 // bytes.
-udpRetry = 1
-udpTimeout = 5 * time.Second
-udpMaxMessageLength = 4096 // bytes. I think our longest message is ~676 bytes, so I rounded up to 1024
-// scratch that. a findValue could return more than K results if a lot of nodes are storing that value, so we need more buffer
-maxPeerFails = 3 // after this many failures, a peer is considered bad and will be removed from the routing table
-//tExpire = 60 * time.Minute // the time after which a key/value pair expires; this is a time-to-live (TTL) from the original publication date
-tReannounce = 50 * time.Minute // the time after which the original publisher must republish a key/value pair
-tRefresh = 1 * time.Hour // the time after which an otherwise unaccessed bucket must be refreshed
-//tReplicate = 1 * time.Hour // the interval between Kademlia replication events, when a node is required to publish its entire database
-//tNodeRefresh = 15 * time.Minute // the time after which a good node becomes questionable if it has not messaged us
-compactNodeInfoLength = nodeIDLength + 6 // nodeID + 4 for IP + 2 for port
-tokenSecretRotationInterval = 5 * time.Minute // how often the token-generating secret is rotated
-)
-// Config represents the configure of dht.
-type Config struct {
-// this node's address. format is `ip:port`
-Address string
-// the seed nodes through which we can join in dht network
-SeedNodes []string
-// the hex-encoded node id for this node. if string is empty, a random id will be generated
-NodeID string
-// print the state of the dht every X time
-PrintState time.Duration
-// the port that clients can use to download blobs using the LBRY peer protocol
-PeerProtocolPort int
-}
-// NewStandardConfig returns a Config pointer with default values.
-func NewStandardConfig() *Config {
-return &Config{
-Address: "0.0.0.0:" + strconv.Itoa(DefaultPort),
-SeedNodes: []string{
-"lbrynet1.lbry.io:4444",
-"lbrynet2.lbry.io:4444",
-"lbrynet3.lbry.io:4444",
-},
-PeerProtocolPort: peerproto.DefaultPort,
-}
-}
// DHT represents a DHT node.
type DHT struct {
// config
@@ -97,12 +39,10 @@ type DHT struct {
grp *stop.Group
// channel is closed when DHT joins network
joined chan struct{}
-// lock for announced list
-lock *sync.RWMutex
-// list of bitmaps that need to be reannounced periodically
-announced map[bits.Bitmap]bool
// cache for store tokens
tokenCache *tokenCache
+// hashes that need to be put into the announce queue or removed from the queue
+announceAddRemove chan queueEdit
}
// New returns a DHT pointer. If config is nil, then config will be set to the default config.
@@ -112,11 +52,10 @@ func New(config *Config) *DHT {
}
d := &DHT{
-conf: config,
-grp: stop.New(),
-joined: make(chan struct{}),
-lock: &sync.RWMutex{},
-announced: make(map[bits.Bitmap]bool),
+conf: config,
+grp: stop.New(),
+joined: make(chan struct{}),
+announceAddRemove: make(chan queueEdit),
}
return d
}
@@ -155,7 +94,11 @@ func (dht *DHT) Start() error {
log.Infof("[%s] DHT ready on %s (%d nodes found during join)",
dht.node.id.HexShort(), dht.contact.Addr().String(), dht.node.rt.Count())
-go dht.startReannouncer()
+dht.grp.Add(1)
+go func() {
+dht.runAnnouncer()
+dht.grp.Done()
+}()
return nil
}
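
An aside on the grp.Add(1) / grp.Done() pairing above: it is the stop-group idiom this package uses so that Shutdown can wait for background goroutines to exit. A minimal sketch of the pattern, using only the group methods that appear elsewhere in this diff (Add, Done, Ch); the work channel and handler are hypothetical:

    dht.grp.Add(1)
    go func() {
        defer dht.grp.Done() // lets the group's shutdown wait on this goroutine
        for {
            select {
            case <-dht.grp.Ch(): // closed when the group is told to stop
                return
            case w := <-work: // hypothetical channel of work items
                handle(w)
            }
        }
    }()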
@@ -238,113 +181,6 @@ func (dht *DHT) Get(hash bits.Bitmap) ([]Contact, error) {
return nil, nil
}
-// Add adds the hash to the list of hashes this node has
-func (dht *DHT) Add(hash bits.Bitmap) {
-// TODO: calling Add several times quickly could cause it to be announced multiple times before dht.announced[hash] is set to true
-dht.lock.RLock()
-exists := dht.announced[hash]
-dht.lock.RUnlock()
-if exists {
-return
-}
-dht.grp.Add(1)
-go func() {
-defer dht.grp.Done()
-err := dht.announce(hash)
-if err != nil {
-log.Error(errors.Prefix("error announcing bitmap", err))
-}
-}()
-}
-// Announce announces to the DHT that this node has the blob for the given hash
-func (dht *DHT) announce(hash bits.Bitmap) error {
-contacts, _, err := FindContacts(dht.node, hash, false, dht.grp.Child())
-if err != nil {
-return err
-}
-// if we found less than K contacts, or current node is closer than farthest contact
-if len(contacts) < bucketSize {
-// append self to contacts, and self-store
-contacts = append(contacts, dht.contact)
-} else if hash.Closer(dht.node.id, contacts[bucketSize-1].ID) {
-// pop last contact, and self-store instead
-contacts[bucketSize-1] = dht.contact
-}
-wg := &sync.WaitGroup{}
-for _, c := range contacts {
-wg.Add(1)
-go func(c Contact) {
-dht.storeOnNode(hash, c)
-wg.Done()
-}(c)
-}
-wg.Wait()
-dht.lock.Lock()
-dht.announced[hash] = true
-dht.lock.Unlock()
-return nil
-}
-func (dht *DHT) startReannouncer() {
-tick := time.NewTicker(tReannounce)
-for {
-select {
-case <-dht.grp.Ch():
-return
-case <-tick.C:
-dht.lock.RLock()
-for h := range dht.announced {
-dht.grp.Add(1)
-go func(bm bits.Bitmap) {
-defer dht.grp.Done()
-err := dht.announce(bm)
-if err != nil {
-log.Error("error re-announcing bitmap - ", err)
-}
-}(h)
-}
-dht.lock.RUnlock()
-}
-}
-}
-func (dht *DHT) storeOnNode(hash bits.Bitmap, c Contact) {
-// self-store
-if dht.contact.ID == c.ID {
-c.PeerPort = dht.conf.PeerProtocolPort
-dht.node.Store(hash, c)
-return
-}
-token := dht.tokenCache.Get(c, hash, dht.grp.Ch())
-resCh := dht.node.SendAsync(c, Request{
-Method: storeMethod,
-StoreArgs: &storeArgs{
-BlobHash: hash,
-Value: storeArgsValue{
-Token: token,
-LbryID: dht.contact.ID,
-Port: dht.conf.PeerProtocolPort,
-},
-},
-})
-go func() {
-select {
-case <-resCh:
-case <-dht.grp.Ch():
-}
-}()
-}
// PrintState prints the current state of the DHT, including address, number of outstanding transactions, stored hashes,
// and current bucket information.
func (dht *DHT) PrintState() {

dht/dht_announce.go (new file, +148 lines)

@@ -0,0 +1,148 @@
package dht
import (
"container/ring"
"context"
"sync"
"time"
"github.com/lbryio/lbry.go/errors"
"github.com/lbryio/reflector.go/dht/bits"
"golang.org/x/time/rate"
)
type queueEdit struct {
hash bits.Bitmap
add bool
}
// Add adds the hash to the list of hashes this node is announcing
func (dht *DHT) Add(hash bits.Bitmap) {
dht.announceAddRemove <- queueEdit{hash: hash, add: true}
}
// Remove removes the hash from the list of hashes this node is announcing
func (dht *DHT) Remove(hash bits.Bitmap) {
dht.announceAddRemove <- queueEdit{hash: hash, add: false}
}
func (dht *DHT) runAnnouncer() {
type hashAndTime struct {
hash bits.Bitmap
lastAnnounce time.Time
}
queue := ring.New(0)
hashes := make(map[bits.Bitmap]*ring.Ring)
limiter := rate.NewLimiter(rate.Limit(dht.conf.AnnounceRate), dht.conf.AnnounceRate*dht.conf.AnnounceBurst)
var announceNextHash <-chan time.Time
timer := time.NewTimer(0)
if !timer.Stop() {
<-timer.C // drain the initial fire so the first Reset doesn't see a stale value
}
closedCh := make(chan time.Time)
close(closedCh)
for {
select {
case <-dht.grp.Ch():
return
case change := <-dht.announceAddRemove:
if change.add {
if _, exists := hashes[change.hash]; exists {
continue // already in the queue; adding twice would orphan a ring entry
}
r := ring.New(1)
r.Value = hashAndTime{hash: change.hash}
if queue.Len() == 0 { // ring.New(0) returns nil, so an empty queue can't be linked into
queue = r
} else {
queue.Prev().Link(r)
queue = r // new hashes jump to the front and get announced next
}
hashes[change.hash] = r
announceNextHash = closedCh // don't wait to announce next hash
} else {
if r, exists := hashes[change.hash]; exists {
delete(hashes, change.hash)
if len(hashes) == 0 {
queue = ring.New(0)
announceNextHash = make(chan time.Time) // no hashes to announce, wait indefinitely
} else {
if r == queue {
queue = queue.Next() // don't lose our pointer
}
r.Prev().Link(r.Next()) // linking r's neighbors to each other drops r from the ring
}
}
}
case <-announceNextHash:
limiter.Wait(context.Background()) // TODO: should use grp.ctx somehow
ht := queue.Value.(hashAndTime)
if !ht.lastAnnounce.IsZero() {
nextAnnounce := ht.lastAnnounce.Add(dht.conf.ReannounceTime)
if nextAnnounce.After(time.Now()) {
// the next hash is not due yet. stop and drain the timer before resetting it, then wait
if !timer.Stop() {
select {
case <-timer.C:
default:
}
}
timer.Reset(time.Until(nextAnnounce))
announceNextHash = timer.C // wait until the next hash is due
continue
}
}
dht.grp.Add(1) // below the requeue check so the `continue` path can't leak a group count
go func(hash bits.Bitmap) {
defer dht.grp.Done()
err := dht.announce(hash)
if err != nil {
log.Error(errors.Prefix("announce", err))
}
}(ht.hash)
queue.Value = hashAndTime{hash: ht.hash, lastAnnounce: time.Now()}
queue = queue.Next()
announceNextHash = closedCh // don't wait to announce next hash
}
}
}
// announce announces to the DHT that this node has the blob for the given hash
func (dht *DHT) announce(hash bits.Bitmap) error {
contacts, _, err := FindContacts(dht.node, hash, false, dht.grp.Child())
if err != nil {
return err
}
// self-store if we found less than K contacts, or we're closer than the farthest contact
if len(contacts) < bucketSize {
contacts = append(contacts, dht.contact)
} else if hash.Closer(dht.node.id, contacts[bucketSize-1].ID) {
contacts[bucketSize-1] = dht.contact
}
wg := &sync.WaitGroup{}
for _, c := range contacts {
wg.Add(1)
go func(c Contact) {
dht.store(hash, c)
wg.Done()
}(c)
}
wg.Wait()
return nil
}
func (dht *DHT) store(hash bits.Bitmap, c Contact) {
if dht.contact.ID == c.ID {
// self-store
c.PeerPort = dht.conf.PeerProtocolPort
dht.node.Store(hash, c)
return
}
dht.node.SendAsync(c, Request{
Method: storeMethod,
StoreArgs: &storeArgs{
BlobHash: hash,
Value: storeArgsValue{
Token: dht.tokenCache.Get(c, hash, dht.grp.Ch()),
LbryID: dht.contact.ID,
Port: dht.conf.PeerProtocolPort,
},
},
})
}
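
To see the two moving parts of runAnnouncer in isolation, here is a standalone sketch (not part of this commit) of a container/ring round-robin queue driven through a golang.org/x/time/rate token bucket, which is exactly the combination the announcer relies on:

    package main

    import (
        "container/ring"
        "context"
        "fmt"

        "golang.org/x/time/rate"
    )

    func main() {
        limiter := rate.NewLimiter(rate.Limit(10), 10) // 10 ops/sec, burst of 10
        queue := ring.New(3)
        for i := 0; i < 3; i++ {
            queue.Value = fmt.Sprintf("hash%d", i)
            queue = queue.Next()
        }
        for i := 0; i < 6; i++ {
            _ = limiter.Wait(context.Background()) // blocks until a token is free
            fmt.Println("announce", queue.Value)   // visits hash0, hash1, hash2, hash0, ...
            queue = queue.Next()
        }
    }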

dht/dht_announce_test.go (new file, +29 lines)

@@ -0,0 +1,29 @@
package dht
import (
"testing"
)
func TestDHT_Announce(t *testing.T) {
t.Skip("NEED SOME TESTS FOR ANNOUNCING")
// tests
// - max rate
// - new announces get ahead of old announces
// - announcer blocks correctly (when nothing to announce, when next announce time is in the future) and unblocks correctly (when waiting to announce next and a new hash is added)
// thought: what happens when you're waiting to announce a hash and it gets removed? probably nothing, since later hashes will be announced later. but still good to test this
//
//bs, dhts := TestingCreateNetwork(t, 2, true, true)
//defer func() {
// for _, d := range dhts {
// go d.Shutdown()
// }
// bs.Shutdown()
// time.Sleep(1 * time.Second)
//}()
//
//announcer := dhts[0]
//receiver := dhts[1]
}
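
One possible shape for the first case in that list, as a hypothetical sketch; TestingCreateNetwork, Shutdown, Add, DefaultAnnounceRate, and bits.Rand come from this diff, while the test itself is invented and would need a fake clock to be reliable:

    func TestDHT_AnnounceRate(t *testing.T) {
        t.Skip("sketch only - timing-based tests are flaky without a fake clock")
        bs, dhts := TestingCreateNetwork(t, 2, true, true)
        defer func() {
            for _, d := range dhts {
                go d.Shutdown()
            }
            bs.Shutdown()
        }()
        announcer, receiver := dhts[0], dhts[1]
        for i := 0; i < DefaultAnnounceRate*3; i++ {
            announcer.Add(bits.Rand()) // queue several seconds' worth of announces
        }
        // a real test would count store requests arriving at receiver and assert
        // the observed rate stays within AnnounceRate (plus AnnounceBurst slack)
        _ = receiver
    }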


@@ -14,7 +14,7 @@ func TestNodeFinder_FindNodes(t *testing.T) {
t.Skip("skipping slow nodeFinder test")
}
-bs, dhts := TestingCreateDHT(t, 3, true, false)
+bs, dhts := TestingCreateNetwork(t, 3, true, false)
defer func() {
for i := range dhts {
dhts[i].Shutdown()
@@ -63,7 +63,7 @@ func TestNodeFinder_FindNodes(t *testing.T) {
}
func TestNodeFinder_FindNodes_NoBootstrap(t *testing.T) {
-_, dhts := TestingCreateDHT(t, 3, false, false)
+_, dhts := TestingCreateNetwork(t, 3, false, false)
defer func() {
for i := range dhts {
dhts[i].Shutdown()
@@ -81,7 +81,7 @@ func TestNodeFinder_FindValue(t *testing.T) {
t.Skip("skipping slow nodeFinder test")
}
-bs, dhts := TestingCreateDHT(t, 3, true, false)
+bs, dhts := TestingCreateNetwork(t, 3, true, false)
defer func() {
for i := range dhts {
dhts[i].Shutdown()
@@ -117,7 +117,7 @@ func TestDHT_LargeDHT(t *testing.T) {
}
nodes := 100
-bs, dhts := TestingCreateDHT(t, nodes, true, true)
+bs, dhts := TestingCreateNetwork(t, nodes, true, true)
defer func() {
for _, d := range dhts {
go d.Shutdown()

dht/testing.go (modified)

@@ -14,8 +14,8 @@ import (
var testingDHTIP = "127.0.0.1"
var testingDHTFirstPort = 21000
-// TestingCreateDHT initializes a testable DHT network with a specific number of nodes, with bootstrap and concurrent options.
-func TestingCreateDHT(t *testing.T, numNodes int, bootstrap, concurrent bool) (*BootstrapNode, []*DHT) {
+// TestingCreateNetwork initializes a testable DHT network with a specific number of nodes, with bootstrap and concurrent options.
+func TestingCreateNetwork(t *testing.T, numNodes int, bootstrap, concurrent bool) (*BootstrapNode, []*DHT) {
var bootstrapNode *BootstrapNode
var seeds []string
@@ -42,7 +42,11 @@ func TestingCreateDHT(t *testing.T, numNodes int, bootstrap, concurrent bool) (*
dhts := make([]*DHT, numNodes)
for i := 0; i < numNodes; i++ {
-dht := New(&Config{Address: testingDHTIP + ":" + strconv.Itoa(firstPort+i), NodeID: bits.Rand().Hex(), SeedNodes: seeds})
+c := NewStandardConfig()
+c.NodeID = bits.Rand().Hex()
+c.Address = testingDHTIP + ":" + strconv.Itoa(firstPort+i)
+c.SeedNodes = seeds
+dht := New(c)
go func() {
err := dht.Start()