package dht

import (
	"container/ring"
	"context"
	"math"
	"sync"
	"time"

	"github.com/lbryio/lbry.go/dht/bits"
	"github.com/lbryio/lbry.go/extras/errors"

	"golang.org/x/time/rate"
)
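
// queueEdit is a request to start or stop announcing a hash.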
type queueEdit struct {
	hash bits.Bitmap
	add  bool
}

const (
	announceStarted  = "started"
	announceFinished = "finished"
)
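
// announceNotification is sent on conf.AnnounceNotificationCh when an announce starts or finishes.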
type announceNotification struct {
	hash   bits.Bitmap
	action string
	err    error
}

// Add adds the hash to the list of hashes this node is announcing
func (dht *DHT) Add(hash bits.Bitmap) {
	dht.announceAddRemove <- queueEdit{hash: hash, add: true}
}

// Remove removes the hash from the list of hashes this node is announcing
func (dht *DHT) Remove(hash bits.Bitmap) {
	dht.announceAddRemove <- queueEdit{hash: hash, add: false}
}
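
// runAnnouncer owns the announce queue. It walks the queued hashes in a ring, announcing each one
// and then re-announcing it once per ReannounceTime, rate-limited to AnnounceRate announces per second.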
func (dht *DHT) runAnnouncer() {
	type hashAndTime struct {
		hash         bits.Bitmap
		lastAnnounce time.Time
	}
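
	// queue is a ring of hashAndTime entries, cycled in announce order, and hashes indexes each ring
	// element by hash so entries can be added and removed in O(1). announceNextHash selects the next
	// trigger: nil while there is nothing to announce, limitCh to announce as soon as the rate limiter
	// allows, or timer.C to wait until the hash at the front of the queue is due for re-announcement.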
	var queue *ring.Ring
	hashes := make(map[bits.Bitmap]*ring.Ring)

	var announceNextHash <-chan time.Time
	timer := time.NewTimer(math.MaxInt64)
	timer.Stop()
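
	// limitCh delivers a tick each time the rate limiter permits another announce. A dedicated
	// goroutine blocks on the limiter so the select loop below never has to.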
	limitCh := make(chan time.Time)
	dht.grp.Add(1)
	go func() {
		defer dht.grp.Done()
		limiter := rate.NewLimiter(rate.Limit(dht.conf.AnnounceRate), dht.conf.AnnounceRate)
		for {
			err := limiter.Wait(context.Background()) // TODO: should use grp.ctx somehow? so when grp is closed, wait returns
			if err != nil {
				log.Error(errors.Prefix("rate limiter", err))
				continue
			}

			select {
			case limitCh <- time.Now():
			case <-dht.grp.Ch():
				return
			}
		}
	}()
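
	// the maintenance ticker periodically warns if more hashes are queued than can be re-announced
	// within the ReannounceTime window at the configured announce rate.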
	maintenance := time.NewTicker(1 * time.Minute)

	// TODO: work to space hash announces out so they aren't bunched up around the reannounce time. track time since last announce. if it's been more than the ideal time (reannounce time / numhashes), start announcing hashes early

	for {
		select {
		case <-dht.grp.Ch():
			return

		case <-maintenance.C:
			maxAnnounce := dht.conf.AnnounceRate * int(dht.conf.ReannounceTime.Seconds())
			if len(hashes) > maxAnnounce {
				// TODO: send this to slack
				log.Warnf("DHT has %d hashes, but can only announce %d hashes in the %s reannounce window. Raise the announce rate or spawn more nodes.",
					len(hashes), maxAnnounce, dht.conf.ReannounceTime.String())
			}

		case change := <-dht.announceAddRemove:
			if change.add {
				if _, exists := hashes[change.hash]; exists {
					continue
				}

				r := ring.New(1)
				r.Value = hashAndTime{hash: change.hash}
				if queue != nil {
					queue.Prev().Link(r)
				}
				queue = r
				hashes[change.hash] = r

				announceNextHash = limitCh // announce next hash ASAP
			} else {
				r, exists := hashes[change.hash]
				if !exists {
					continue
				}

				delete(hashes, change.hash)

				if len(hashes) == 0 {
					queue = ring.New(0)
					announceNextHash = nil // no hashes to announce, wait indefinitely
				} else {
					if r == queue {
						queue = queue.Next() // don't lose our pointer
					}
					r.Prev().Link(r.Next())
				}
			}
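
		// time to announce the hash at the front of the queue. if it was announced too recently,
		// wait until it is due instead of announcing it again now.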
		case <-announceNextHash:
			ht := queue.Value.(hashAndTime)

			if !ht.lastAnnounce.IsZero() {
				nextAnnounce := ht.lastAnnounce.Add(dht.conf.ReannounceTime)
				if nextAnnounce.After(time.Now()) {
					timer.Reset(time.Until(nextAnnounce))
					announceNextHash = timer.C // wait until next hash should be announced
					continue
				}
			}

			if dht.conf.AnnounceNotificationCh != nil {
				dht.conf.AnnounceNotificationCh <- announceNotification{
					hash:   ht.hash,
					action: announceStarted,
				}
			}

			dht.grp.Add(1) // matched by the deferred grp.Done() in the goroutine below
			go func(hash bits.Bitmap) {
				defer dht.grp.Done()
				err := dht.announce(hash)
				if err != nil {
					log.Error(errors.Prefix("announce", err))
				}
				if dht.conf.AnnounceNotificationCh != nil {
					dht.conf.AnnounceNotificationCh <- announceNotification{
						hash:   ht.hash,
						action: announceFinished,
						err:    err,
					}
				}
			}(ht.hash)
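
			// record the announce time and rotate to the next hash in the ring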
			queue.Value = hashAndTime{hash: ht.hash, lastAnnounce: time.Now()}
			queue = queue.Next()
			announceNextHash = limitCh // announce next hash ASAP
		}
	}
}

// announce announces to the DHT that this node has the blob for the given hash
func (dht *DHT) announce(hash bits.Bitmap) error {
	contacts, _, err := FindContacts(dht.node, hash, false, dht.grp.Child())
	if err != nil {
		return err
	}

	// self-store if we found less than K contacts, or we're closer than the farthest contact
	if len(contacts) < bucketSize {
		contacts = append(contacts, dht.contact)
	} else if hash.Closer(dht.node.id, contacts[bucketSize-1].ID) {
		contacts[bucketSize-1] = dht.contact
	}

	wg := &sync.WaitGroup{}
	for _, c := range contacts {
		wg.Add(1)
		go func(c Contact) {
			dht.store(hash, c)
			wg.Done()
		}(c)
	}
	wg.Wait()

	return nil
}
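
// store asks the contact to store this node as a peer for the hash. If the contact is this node
// itself, the peer is stored locally instead of sending a store request over the network.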
func (dht *DHT) store(hash bits.Bitmap, c Contact) {
	if dht.contact.ID == c.ID {
		// self-store
		c.PeerPort = dht.conf.PeerProtocolPort
		dht.node.Store(hash, c)
		return
	}

	dht.node.SendAsync(c, Request{
		Method: storeMethod,
		StoreArgs: &storeArgs{
			BlobHash: hash,
			Value: storeArgsValue{
				Token:  dht.tokenCache.Get(c, hash, dht.grp.Ch()),
				LbryID: dht.contact.ID,
				Port:   dht.conf.PeerProtocolPort,
			},
		},
	})
}