reannounce

This commit is contained in:
Alex Grintsvayg 2018-05-22 12:16:01 -04:00
parent ea67bb93d8
commit 08d2991244
3 changed files with 48 additions and 11 deletions

View file

@ -36,11 +36,11 @@ const (
maxPeerFails = 3 // after this many failures, a peer is considered bad and will be removed from the routing table maxPeerFails = 3 // after this many failures, a peer is considered bad and will be removed from the routing table
tExpire = 24 * time.Hour // the time after which a key/value pair expires; this is a time-to-live (TTL) from the original publication date tExpire = 60 * time.Minute // the time after which a key/value pair expires; this is a time-to-live (TTL) from the original publication date
tRefresh = 1 * time.Hour // the time after which an otherwise unaccessed bucket must be refreshed tReannounce = 50 * time.Minute // the time after which the original publisher must republish a key/value pair
tReplicate = 1 * time.Hour // the interval between Kademlia replication events, when a node is required to publish its entire database tRefresh = 1 * time.Hour // the time after which an otherwise unaccessed bucket must be refreshed
tRepublish = 24 * time.Hour // the time after which the original publisher must republish a key/value pair //tReplicate = 1 * time.Hour // the interval between Kademlia replication events, when a node is required to publish its entire database
tNodeRefresh = 15 * time.Minute // the time after which a good node becomes questionable if it has not messaged us //tNodeRefresh = 15 * time.Minute // the time after which a good node becomes questionable if it has not messaged us
compactNodeInfoLength = nodeIDLength + 6 // nodeID + 4 for IP + 2 for port compactNodeInfoLength = nodeIDLength + 6 // nodeID + 4 for IP + 2 for port
@ -85,6 +85,10 @@ type DHT struct {
stopWG *sync.WaitGroup stopWG *sync.WaitGroup
// channel is closed when DHT joins network // channel is closed when DHT joins network
joined chan struct{} joined chan struct{}
// lock for announced list
lock *sync.RWMutex
// list of bitmaps that need to be reannounced periodically
announced map[Bitmap]bool
} }
// New returns a DHT pointer. If config is nil, then config will be set to the default config. // New returns a DHT pointer. If config is nil, then config will be set to the default config.
@ -99,12 +103,14 @@ func New(config *Config) (*DHT, error) {
} }
d := &DHT{ d := &DHT{
conf: config, conf: config,
contact: contact, contact: contact,
node: NewNode(contact.ID), node: NewNode(contact.ID),
stop: stopOnce.New(), stop: stopOnce.New(),
stopWG: &sync.WaitGroup{}, stopWG: &sync.WaitGroup{},
joined: make(chan struct{}), joined: make(chan struct{}),
lock: &sync.RWMutex{},
announced: make(map[Bitmap]bool),
} }
return d, nil return d, nil
} }
@ -153,6 +159,8 @@ func (dht *DHT) Start() error {
} }
dht.join() dht.join()
dht.startReannouncer()
log.Debugf("[%s] DHT ready on %s (%d nodes found during join)", dht.node.id.HexShort(), dht.contact.Addr().String(), dht.node.rt.Count()) log.Debugf("[%s] DHT ready on %s (%d nodes found during join)", dht.node.id.HexShort(), dht.contact.Addr().String(), dht.node.rt.Count())
return nil return nil
} }
@ -224,9 +232,31 @@ func (dht *DHT) Announce(hash Bitmap) error {
wg.Wait() wg.Wait()
dht.lock.Lock()
dht.announced[hash] = true
dht.lock.Unlock()
return nil return nil
} }
func (dht *DHT) startReannouncer() {
dht.stopWG.Add(1)
defer dht.stopWG.Done()
tick := time.NewTicker(tReannounce)
for {
select {
case <-dht.stop.Chan():
return
case <-tick.C:
dht.lock.RLock()
for h := range dht.announced {
go dht.Announce(h)
}
dht.lock.RUnlock()
}
}
}
func (dht *DHT) storeOnNode(hash Bitmap, c Contact) { func (dht *DHT) storeOnNode(hash Bitmap, c Contact) {
dht.stopWG.Add(1) dht.stopWG.Add(1)
defer dht.stopWG.Done() defer dht.stopWG.Done()

View file

@ -16,6 +16,11 @@ import (
"github.com/lyoshenka/bencode" "github.com/lyoshenka/bencode"
) )
// TODO: if routing table is ever empty (aka the node is isolated), it should re-bootstrap
// TODO: use a tree with bucket splitting instead of a fixed bucket list. include jack's optimization (see link in commit mesg)
// https://github.com/lbryio/lbry/pull/1211/commits/341b27b6d21ac027671d42458826d02735aaae41
type Contact struct { type Contact struct {
ID Bitmap ID Bitmap
IP net.IP IP net.IP

View file

@ -2,6 +2,8 @@ package dht
import "sync" import "sync"
// TODO: expire stored data after tExpire time
type contactStore struct { type contactStore struct {
// map of blob hashes to (map of node IDs to bools) // map of blob hashes to (map of node IDs to bools)
hashes map[Bitmap]map[Bitmap]bool hashes map[Bitmap]map[Bitmap]bool