diff --git a/dht/dht.go b/dht/dht.go index e92a70c..76877ef 100644 --- a/dht/dht.go +++ b/dht/dht.go @@ -36,11 +36,11 @@ const ( maxPeerFails = 3 // after this many failures, a peer is considered bad and will be removed from the routing table - tExpire = 24 * time.Hour // the time after which a key/value pair expires; this is a time-to-live (TTL) from the original publication date - tRefresh = 1 * time.Hour // the time after which an otherwise unaccessed bucket must be refreshed - tReplicate = 1 * time.Hour // the interval between Kademlia replication events, when a node is required to publish its entire database - tRepublish = 24 * time.Hour // the time after which the original publisher must republish a key/value pair - tNodeRefresh = 15 * time.Minute // the time after which a good node becomes questionable if it has not messaged us + tExpire = 60 * time.Minute // the time after which a key/value pair expires; this is a time-to-live (TTL) from the original publication date + tReannounce = 50 * time.Minute // the time after which the original publisher must republish a key/value pair + tRefresh = 1 * time.Hour // the time after which an otherwise unaccessed bucket must be refreshed + //tReplicate = 1 * time.Hour // the interval between Kademlia replication events, when a node is required to publish its entire database + //tNodeRefresh = 15 * time.Minute // the time after which a good node becomes questionable if it has not messaged us compactNodeInfoLength = nodeIDLength + 6 // nodeID + 4 for IP + 2 for port @@ -85,6 +85,10 @@ type DHT struct { stopWG *sync.WaitGroup // channel is closed when DHT joins network joined chan struct{} + // lock for announced list + lock *sync.RWMutex + // list of bitmaps that need to be reannounced periodically + announced map[Bitmap]bool } // New returns a DHT pointer. If config is nil, then config will be set to the default config. @@ -99,12 +103,14 @@ func New(config *Config) (*DHT, error) { } d := &DHT{ - conf: config, - contact: contact, - node: NewNode(contact.ID), - stop: stopOnce.New(), - stopWG: &sync.WaitGroup{}, - joined: make(chan struct{}), + conf: config, + contact: contact, + node: NewNode(contact.ID), + stop: stopOnce.New(), + stopWG: &sync.WaitGroup{}, + joined: make(chan struct{}), + lock: &sync.RWMutex{}, + announced: make(map[Bitmap]bool), } return d, nil } @@ -153,6 +159,8 @@ func (dht *DHT) Start() error { } dht.join() + dht.startReannouncer() + log.Debugf("[%s] DHT ready on %s (%d nodes found during join)", dht.node.id.HexShort(), dht.contact.Addr().String(), dht.node.rt.Count()) return nil } @@ -224,9 +232,31 @@ func (dht *DHT) Announce(hash Bitmap) error { wg.Wait() + dht.lock.Lock() + dht.announced[hash] = true + dht.lock.Unlock() + return nil } +func (dht *DHT) startReannouncer() { + dht.stopWG.Add(1) + defer dht.stopWG.Done() + tick := time.NewTicker(tReannounce) + for { + select { + case <-dht.stop.Chan(): + return + case <-tick.C: + dht.lock.RLock() + for h := range dht.announced { + go dht.Announce(h) + } + dht.lock.RUnlock() + } + } +} + func (dht *DHT) storeOnNode(hash Bitmap, c Contact) { dht.stopWG.Add(1) defer dht.stopWG.Done() diff --git a/dht/routing_table.go b/dht/routing_table.go index 83f98a1..6b18fd6 100644 --- a/dht/routing_table.go +++ b/dht/routing_table.go @@ -16,6 +16,11 @@ import ( "github.com/lyoshenka/bencode" ) +// TODO: if routing table is ever empty (aka the node is isolated), it should re-bootstrap + +// TODO: use a tree with bucket splitting instead of a fixed bucket list. include jack's optimization (see link in commit mesg) +// https://github.com/lbryio/lbry/pull/1211/commits/341b27b6d21ac027671d42458826d02735aaae41 + type Contact struct { ID Bitmap IP net.IP diff --git a/dht/store.go b/dht/store.go index 9cae103..25a85d8 100644 --- a/dht/store.go +++ b/dht/store.go @@ -2,6 +2,8 @@ package dht import "sync" +// TODO: expire stored data after tExpire time + type contactStore struct { // map of blob hashes to (map of node IDs to bools) hashes map[Bitmap]map[Bitmap]bool