2018-03-07 02:15:44 +01:00
package dht
import (
2018-04-25 00:12:17 +02:00
"fmt"
2018-03-07 02:15:44 +01:00
"net"
2018-06-21 17:26:48 +02:00
"strconv"
2018-04-05 17:35:57 +02:00
"strings"
2018-03-29 03:05:27 +02:00
"sync"
2018-03-07 02:15:44 +01:00
"time"
2018-06-21 17:26:48 +02:00
"github.com/lbryio/reflector.go/dht/bits"
peerproto "github.com/lbryio/reflector.go/peer"
2018-05-24 23:49:43 +02:00
"github.com/lbryio/lbry.go/errors"
2018-06-25 22:49:40 +02:00
"github.com/lbryio/lbry.go/stop"
2018-03-24 00:18:00 +01:00
2018-06-25 22:49:40 +02:00
"github.com/sirupsen/logrus"
2018-06-21 17:26:48 +02:00
"github.com/spf13/cast"
2018-03-07 02:15:44 +01:00
)
2018-06-25 22:49:40 +02:00
// log is the logger used by this package. It is assigned in init and can be
// replaced with UseLogger.
var log *logrus.Logger

// UseLogger makes the package log through l instead of its current logger.
func UseLogger(l *logrus.Logger) {
	log = l
}
2018-04-03 18:14:04 +02:00
func init ( ) {
2018-06-25 22:49:40 +02:00
log = logrus . StandardLogger ( )
2018-04-03 18:14:04 +02:00
//log.SetFormatter(&log.TextFormatter{ForceColors: true})
//log.SetLevel(log.DebugLevel)
}
2018-05-01 22:18:38 +02:00
const (
2018-06-21 17:26:48 +02:00
Network = "udp4"
DefaultPort = 4444
2018-03-09 01:50:18 +01:00
2018-05-01 22:18:38 +02:00
// TODO: all these constants should be defaults, and should be used to set values in the standard Config. then the code should use values in the config
// TODO: alternatively, have a global Config for constants. at least that way tests can modify the values
2018-06-14 17:48:02 +02:00
alpha = 3 // this is the constant alpha in the spec
bucketSize = 8 // this is the constant k in the spec
nodeIDLength = bits . NumBytes // bytes. this is the constant B in the spec
messageIDLength = 20 // bytes.
2018-03-24 00:18:00 +01:00
2018-06-25 21:48:57 +02:00
udpRetry = 1
2018-05-01 22:18:38 +02:00
udpTimeout = 5 * time . Second
2018-06-21 21:05:45 +02:00
udpMaxMessageLength = 4096 // bytes. I think our longest message is ~676 bytes, so I rounded up to 1024
// scratch that. a findValue could return more than K results if a lot of nodes are storing that value, so we need more buffer
2018-03-09 01:50:18 +01:00
2018-05-13 22:02:46 +02:00
maxPeerFails = 3 // after this many failures, a peer is considered bad and will be removed from the routing table
2018-05-30 03:38:55 +02:00
//tExpire = 60 * time.Minute // the time after which a key/value pair expires; this is a time-to-live (TTL) from the original publication date
2018-05-22 18:16:01 +02:00
tReannounce = 50 * time . Minute // the time after which the original publisher must republish a key/value pair
tRefresh = 1 * time . Hour // the time after which an otherwise unaccessed bucket must be refreshed
//tReplicate = 1 * time.Hour // the interval between Kademlia replication events, when a node is required to publish its entire database
//tNodeRefresh = 15 * time.Minute // the time after which a good node becomes questionable if it has not messaged us
2018-03-09 01:50:18 +01:00
2018-05-01 22:18:38 +02:00
compactNodeInfoLength = nodeIDLength + 6 // nodeID + 4 for IP + 2 for port
2018-03-07 02:15:44 +01:00
2018-05-01 22:18:38 +02:00
tokenSecretRotationInterval = 5 * time . Minute // how often the token-generating secret is rotated
)
2018-04-05 22:05:28 +02:00
2018-03-07 02:15:44 +01:00
// Config holds the settings for a DHT node.
type Config struct {
	// Address is this node's address, formatted as `ip:port`.
	Address string
	// SeedNodes are the nodes through which this node joins the DHT network.
	SeedNodes []string
	// NodeID is the hex-encoded ID for this node. If it is empty, a random ID is generated.
	NodeID string
	// PrintState is the interval at which the state of the DHT is printed.
	PrintState time.Duration
	// PeerProtocolPort is the port that clients can use to download blobs using the LBRY peer protocol.
	PeerProtocolPort int
}
// NewStandardConfig returns a Config pointer with default values.
func NewStandardConfig ( ) * Config {
return & Config {
2018-06-21 17:26:48 +02:00
Address : "0.0.0.0:" + strconv . Itoa ( DefaultPort ) ,
2018-03-07 02:15:44 +01:00
SeedNodes : [ ] string {
"lbrynet1.lbry.io:4444" ,
"lbrynet2.lbry.io:4444" ,
"lbrynet3.lbry.io:4444" ,
} ,
2018-06-21 17:26:48 +02:00
PeerProtocolPort : peerproto . DefaultPort ,
2018-03-07 02:15:44 +01:00
}
}
// DHT represents a DHT node.
type DHT struct {
2018-04-05 17:35:57 +02:00
// config
conf * Config
2018-04-28 02:16:12 +02:00
// local contact
contact Contact
// node
2018-04-05 17:35:57 +02:00
node * Node
2018-06-25 22:49:40 +02:00
// stopGroup to shut down DHT
grp * stop . Group
2018-04-05 17:35:57 +02:00
// channel is closed when DHT joins network
joined chan struct { }
2018-05-22 18:16:01 +02:00
// lock for announced list
lock * sync . RWMutex
// list of bitmaps that need to be reannounced periodically
2018-06-14 17:48:02 +02:00
announced map [ bits . Bitmap ] bool
2018-06-21 19:40:22 +02:00
// cache for store tokens
tokenCache * tokenCache
2018-03-07 02:15:44 +01:00
}
// New returns a DHT pointer. If config is nil, then config will be set to the default config.
2018-06-15 04:30:37 +02:00
func New ( config * Config ) * DHT {
2018-03-07 02:15:44 +01:00
if config == nil {
config = NewStandardConfig ( )
}
2018-03-24 00:18:00 +01:00
d := & DHT {
2018-05-22 18:16:01 +02:00
conf : config ,
2018-06-25 22:49:40 +02:00
grp : stop . New ( ) ,
2018-05-22 18:16:01 +02:00
joined : make ( chan struct { } ) ,
lock : & sync . RWMutex { } ,
2018-06-14 17:48:02 +02:00
announced : make ( map [ bits . Bitmap ] bool ) ,
2018-03-07 02:15:44 +01:00
}
2018-06-15 04:30:37 +02:00
return d
}
func ( dht * DHT ) connect ( conn UDPConn ) error {
contact , err := getContact ( dht . conf . NodeID , dht . conf . Address )
if err != nil {
return err
}
dht . contact = contact
dht . node = NewNode ( contact . ID )
2018-06-21 19:40:22 +02:00
dht . tokenCache = newTokenCache ( dht . node , tokenSecretRotationInterval )
2018-06-15 04:30:37 +02:00
err = dht . node . Connect ( conn )
if err != nil {
return err
}
return nil
}
// Start starts the dht
func ( dht * DHT ) Start ( ) error {
listener , err := net . ListenPacket ( Network , dht . conf . Address )
if err != nil {
return errors . Err ( err )
}
conn := listener . ( * net . UDPConn )
err = dht . connect ( conn )
if err != nil {
return err
}
dht . join ( )
2018-06-21 19:40:22 +02:00
log . Infof ( "[%s] DHT ready on %s (%d nodes found during join)" ,
2018-06-15 04:30:37 +02:00
dht . node . id . HexShort ( ) , dht . contact . Addr ( ) . String ( ) , dht . node . rt . Count ( ) )
go dht . startReannouncer ( )
return nil
2018-03-07 02:15:44 +01:00
}
// join makes current node join the dht network.
func ( dht * DHT ) join ( ) {
2018-04-25 00:12:17 +02:00
defer close ( dht . joined ) // if anyone's waiting for join to finish, they'll know its done
2018-06-21 19:40:22 +02:00
log . Infof ( "[%s] joining DHT network" , dht . node . id . HexShort ( ) )
2018-04-25 00:12:17 +02:00
// ping nodes, which gets their real node IDs and adds them to the routing table
atLeastOneNodeResponded := false
2018-03-07 02:15:44 +01:00
for _ , addr := range dht . conf . SeedNodes {
2018-04-25 00:12:17 +02:00
err := dht . Ping ( addr )
2018-03-07 02:15:44 +01:00
if err != nil {
2018-04-25 00:12:17 +02:00
log . Error ( errors . Prefix ( fmt . Sprintf ( "[%s] join" , dht . node . id . HexShort ( ) ) , err ) )
} else {
atLeastOneNodeResponded = true
2018-03-07 02:15:44 +01:00
}
2018-04-25 00:12:17 +02:00
}
2018-03-07 02:15:44 +01:00
2018-04-25 00:12:17 +02:00
if ! atLeastOneNodeResponded {
log . Errorf ( "[%s] join: no nodes responded to initial ping" , dht . node . id . HexShort ( ) )
return
2018-03-29 03:05:27 +02:00
}
2018-03-07 02:15:44 +01:00
2018-03-29 03:05:27 +02:00
// now call iterativeFind on yourself
2018-06-25 22:49:40 +02:00
_ , _ , err := FindContacts ( dht . node , dht . node . id , false , dht . grp . Child ( ) )
2018-03-29 03:05:27 +02:00
if err != nil {
2018-04-03 18:14:04 +02:00
log . Errorf ( "[%s] join: %s" , dht . node . id . HexShort ( ) , err . Error ( ) )
2018-03-07 02:15:44 +01:00
}
2018-05-24 19:05:05 +02:00
2018-06-13 18:45:47 +02:00
// TODO: after joining, refresh all buckets further away than our closest neighbor
2018-05-24 19:05:05 +02:00
// http://xlattice.sourceforge.net/components/protocol/kademlia/specs.html#join
2018-03-07 02:15:44 +01:00
}
2018-05-30 03:38:55 +02:00
// WaitUntilJoined blocks until the node joins the network.
2018-04-05 17:35:57 +02:00
func ( dht * DHT ) WaitUntilJoined ( ) {
if dht . joined == nil {
panic ( "dht not initialized" )
}
<- dht . joined
2018-03-07 02:15:44 +01:00
}
2018-03-29 03:05:27 +02:00
// Shutdown shuts down the dht
func ( dht * DHT ) Shutdown ( ) {
2018-04-03 18:14:04 +02:00
log . Debugf ( "[%s] DHT shutting down" , dht . node . id . HexShort ( ) )
2018-06-25 22:49:40 +02:00
dht . grp . StopAndWait ( )
2018-04-28 02:16:12 +02:00
dht . node . Shutdown ( )
2018-04-05 17:35:57 +02:00
log . Debugf ( "[%s] DHT stopped" , dht . node . id . HexShort ( ) )
2018-03-29 03:05:27 +02:00
}
2018-05-30 03:38:55 +02:00
// Ping pings a given address, creates a temporary contact for sending a message, and returns an error if communication
// fails.
2018-04-25 00:12:17 +02:00
func ( dht * DHT ) Ping ( addr string ) error {
2018-06-15 04:30:37 +02:00
raddr , err := net . ResolveUDPAddr ( Network , addr )
2018-04-25 00:12:17 +02:00
if err != nil {
return err
}
2018-06-14 17:48:02 +02:00
tmpNode := Contact { ID : bits . Rand ( ) , IP : raddr . IP , Port : raddr . Port }
2018-06-19 19:47:13 +02:00
res := dht . node . Send ( tmpNode , Request { Method : pingMethod } , SendOptions { skipIDCheck : true } )
2018-04-25 00:12:17 +02:00
if res == nil {
return errors . Err ( "no response from node %s" , addr )
}
return nil
}
2018-04-03 20:00:35 +02:00
// Get returns the list of nodes that have the blob for the given hash
2018-06-14 17:48:02 +02:00
func ( dht * DHT ) Get ( hash bits . Bitmap ) ( [ ] Contact , error ) {
2018-06-25 22:49:40 +02:00
contacts , found , err := FindContacts ( dht . node , hash , true , dht . grp . Child ( ) )
2018-03-29 03:05:27 +02:00
if err != nil {
2018-04-03 20:00:35 +02:00
return nil , err
2018-03-29 03:05:27 +02:00
}
2018-04-03 20:00:35 +02:00
2018-06-13 18:45:47 +02:00
if found {
return contacts , nil
2018-04-03 20:00:35 +02:00
}
return nil , nil
2018-03-29 03:05:27 +02:00
}
2018-06-15 04:30:37 +02:00
// Add adds the hash to the list of hashes this node has
2018-06-19 19:47:13 +02:00
func ( dht * DHT ) Add ( hash bits . Bitmap ) {
2018-06-15 04:30:37 +02:00
// TODO: calling Add several times quickly could cause it to be announced multiple times before dht.announced[hash] is set to true
dht . lock . RLock ( )
exists := dht . announced [ hash ]
dht . lock . RUnlock ( )
if exists {
2018-06-19 19:47:13 +02:00
return
2018-06-15 04:30:37 +02:00
}
2018-06-19 19:47:13 +02:00
2018-06-25 22:49:40 +02:00
dht . grp . Add ( 1 )
2018-06-19 19:47:13 +02:00
go func ( ) {
2018-06-25 22:49:40 +02:00
defer dht . grp . Done ( )
2018-06-19 19:47:13 +02:00
err := dht . announce ( hash )
if err != nil {
log . Error ( errors . Prefix ( "error announcing bitmap" , err ) )
}
} ( )
2018-06-15 04:30:37 +02:00
}
2018-04-04 17:43:27 +02:00
// Announce announces to the DHT that this node has the blob for the given hash
2018-06-15 04:30:37 +02:00
func ( dht * DHT ) announce ( hash bits . Bitmap ) error {
2018-06-25 22:49:40 +02:00
contacts , _ , err := FindContacts ( dht . node , hash , false , dht . grp . Child ( ) )
2018-03-29 03:05:27 +02:00
if err != nil {
2018-04-03 19:38:01 +02:00
return err
2018-03-29 03:05:27 +02:00
}
2018-04-03 19:38:01 +02:00
2018-05-22 18:27:49 +02:00
// if we found less than K contacts, or current node is closer than farthest contact
2018-06-19 19:47:13 +02:00
if len ( contacts ) < bucketSize {
// append self to contacts, and self-store
contacts = append ( contacts , dht . contact )
2018-06-25 21:48:57 +02:00
} else if hash . Closer ( dht . node . id , contacts [ bucketSize - 1 ] . ID ) {
2018-05-22 18:27:49 +02:00
// pop last contact, and self-store instead
2018-06-13 18:45:47 +02:00
contacts [ bucketSize - 1 ] = dht . contact
2018-05-22 18:27:49 +02:00
}
2018-04-05 22:05:28 +02:00
2018-05-19 19:05:30 +02:00
wg := & sync . WaitGroup { }
2018-06-13 18:45:47 +02:00
for _ , c := range contacts {
2018-05-19 19:05:30 +02:00
wg . Add ( 1 )
go func ( c Contact ) {
dht . storeOnNode ( hash , c )
wg . Done ( )
} ( c )
2018-04-03 19:38:01 +02:00
}
2018-05-19 19:05:30 +02:00
wg . Wait ( )
2018-05-22 18:16:01 +02:00
dht . lock . Lock ( )
dht . announced [ hash ] = true
dht . lock . Unlock ( )
2018-04-03 19:38:01 +02:00
return nil
2018-03-29 03:05:27 +02:00
}
2018-05-22 18:16:01 +02:00
func ( dht * DHT ) startReannouncer ( ) {
tick := time . NewTicker ( tReannounce )
for {
select {
2018-06-25 22:49:40 +02:00
case <- dht . grp . Ch ( ) :
2018-05-22 18:16:01 +02:00
return
case <- tick . C :
dht . lock . RLock ( )
for h := range dht . announced {
2018-06-25 22:49:40 +02:00
dht . grp . Add ( 1 )
2018-06-14 17:48:02 +02:00
go func ( bm bits . Bitmap ) {
2018-06-25 22:49:40 +02:00
defer dht . grp . Done ( )
2018-06-15 04:30:37 +02:00
err := dht . announce ( bm )
2018-06-13 18:45:47 +02:00
if err != nil {
2018-05-30 03:38:55 +02:00
log . Error ( "error re-announcing bitmap - " , err )
}
} ( h )
2018-05-22 18:16:01 +02:00
}
dht . lock . RUnlock ( )
}
}
}
2018-06-14 17:48:02 +02:00
func ( dht * DHT ) storeOnNode ( hash bits . Bitmap , c Contact ) {
2018-05-22 18:27:49 +02:00
// self-store
2018-06-19 19:47:13 +02:00
if dht . contact . ID == c . ID {
2018-05-22 18:27:49 +02:00
dht . node . Store ( hash , c )
return
}
2018-06-25 22:49:40 +02:00
token := dht . tokenCache . Get ( c , hash , dht . grp . Ch ( ) )
2018-04-05 22:05:28 +02:00
2018-06-25 21:56:45 +02:00
resCh := dht . node . SendAsync ( c , Request {
2018-04-05 22:05:28 +02:00
Method : storeMethod ,
StoreArgs : & storeArgs {
BlobHash : hash ,
Value : storeArgsValue {
2018-06-21 19:40:22 +02:00
Token : token ,
2018-05-19 19:05:30 +02:00
LbryID : dht . contact . ID ,
2018-06-21 17:26:48 +02:00
Port : dht . conf . PeerProtocolPort ,
2018-04-05 22:05:28 +02:00
} ,
} ,
} )
2018-05-13 22:02:46 +02:00
go func ( ) {
select {
case <- resCh :
2018-06-25 22:49:40 +02:00
case <- dht . grp . Ch ( ) :
2018-05-13 22:02:46 +02:00
}
} ( )
2018-04-05 22:05:28 +02:00
}
2018-05-30 03:38:55 +02:00
// PrintState prints the current state of the DHT including address, nr outstanding transactions, stored hashes as well
// as current bucket information.
2018-04-05 17:35:57 +02:00
func ( dht * DHT ) PrintState ( ) {
2018-04-28 02:16:12 +02:00
log . Printf ( "DHT node %s at %s" , dht . contact . String ( ) , time . Now ( ) . Format ( time . RFC822Z ) )
log . Printf ( "Outstanding transactions: %d" , dht . node . CountActiveTransactions ( ) )
log . Printf ( "Stored hashes: %d" , dht . node . store . CountStoredHashes ( ) )
2018-04-05 17:35:57 +02:00
log . Printf ( "Buckets:" )
2018-04-28 02:16:12 +02:00
for _ , line := range strings . Split ( dht . node . rt . BucketInfo ( ) , "\n" ) {
2018-04-05 17:35:57 +02:00
log . Println ( line )
}
}
2018-06-14 17:48:02 +02:00
// ID returns the ID of this DHT's local contact.
func (dht DHT) ID() bits.Bitmap {
	self := dht.contact
	return self.ID
}
2018-04-28 02:16:12 +02:00
func getContact ( nodeID , addr string ) ( Contact , error ) {
var c Contact
if nodeID == "" {
2018-06-14 17:48:02 +02:00
c . ID = bits . Rand ( )
2018-04-28 02:16:12 +02:00
} else {
2018-06-14 17:48:02 +02:00
c . ID = bits . FromHexP ( nodeID )
2018-04-28 02:16:12 +02:00
}
ip , port , err := net . SplitHostPort ( addr )
if err != nil {
return c , errors . Err ( err )
} else if ip == "" {
return c , errors . Err ( "address does not contain an IP" )
} else if port == "" {
return c , errors . Err ( "address does not contain a port" )
}
2018-05-19 19:05:30 +02:00
c . IP = net . ParseIP ( ip )
if c . IP == nil {
2018-04-28 02:16:12 +02:00
return c , errors . Err ( "invalid ip" )
}
2018-05-19 19:05:30 +02:00
c . Port , err = cast . ToIntE ( port )
2018-04-28 02:16:12 +02:00
if err != nil {
return c , errors . Err ( err )
}
return c , nil
}