reflector.go/dht/dht.go

417 lines
9.6 KiB
Go
Raw Normal View History

2018-03-07 02:15:44 +01:00
package dht
import (
2018-04-05 17:35:57 +02:00
"context"
2018-04-25 00:12:17 +02:00
"fmt"
2018-03-07 02:15:44 +01:00
"net"
"strconv"
2018-04-05 17:35:57 +02:00
"strings"
2018-03-29 03:05:27 +02:00
"sync"
2018-03-07 02:15:44 +01:00
"time"
"github.com/lbryio/errors.go"
2018-03-29 03:05:27 +02:00
"github.com/lbryio/lbry.go/stopOnce"
2018-03-07 02:15:44 +01:00
log "github.com/sirupsen/logrus"
"github.com/spf13/cast"
)
2018-04-03 18:14:04 +02:00
func init() {
	// Uncomment while debugging to get colored, verbose log output:
	//log.SetFormatter(&log.TextFormatter{ForceColors: true})
	//log.SetLevel(log.DebugLevel)
}

// Protocol parameters. Where noted, names correspond to constants in the Kademlia spec.
const (
	network         = "udp4"
	alpha           = 3  // the spec's alpha
	nodeIDLength    = 48 // in bytes; the spec's B
	messageIDLength = 20 // in bytes
	bucketSize      = 8  // the spec's k

	udpRetry            = 3
	udpTimeout          = 10 * time.Second
	udpMaxMessageLength = 1024 // the longest message is believed to be ~676 bytes; rounded up for headroom

	// tExpire is the TTL of a key/value pair, measured from its original publication date.
	tExpire = 86400 * time.Second
	// tRefresh is how long a bucket may go unaccessed before it must be refreshed.
	tRefresh = 3600 * time.Second
	// tReplicate is the interval between Kademlia replication events, when a node
	// must publish its entire database.
	tReplicate = 3600 * time.Second
	// tRepublish is how long the original publisher waits before republishing a key/value pair.
	tRepublish = 86400 * time.Second

	numBuckets            = nodeIDLength * 8 // one bucket per bit of node ID
	compactNodeInfoLength = nodeIDLength + 6 // presumably ID + 4-byte IPv4 + 2-byte port — TODO confirm

	// tokenSecretRotationInterval is how often the token-generating secret is rotated.
	tokenSecretRotationInterval = 5 * time.Minute
)
2018-03-07 02:15:44 +01:00
// packet is one datagram received from the UDP socket, together with the
// address of the peer that sent it.
type packet struct {
	data  []byte       // raw datagram bytes
	raddr *net.UDPAddr // sender's address
}
// Config holds the settings used to construct a DHT.
type Config struct {
	// Address is this node's address, in `ip:port` form.
	Address string
	// SeedNodes are the nodes through which this node joins the DHT network.
	SeedNodes []string
	// NodeID is this node's hex-encoded ID. If empty, a random ID is generated.
	NodeID string
	// PrintState, when positive, is the interval at which the DHT state is printed.
	PrintState time.Duration
}

// NewStandardConfig returns a *Config that listens on 0.0.0.0:4444 and uses
// the public lbrynet seed nodes. NodeID is left empty and PrintState is zero.
func NewStandardConfig() *Config {
	defaultSeeds := []string{
		"lbrynet1.lbry.io:4444",
		"lbrynet2.lbry.io:4444",
		"lbrynet3.lbry.io:4444",
	}
	cfg := &Config{Address: "0.0.0.0:4444"}
	cfg.SeedNodes = defaultSeeds
	return cfg
}
2018-04-24 23:20:03 +02:00
// UDPConn is the subset of *net.UDPConn that the DHT uses. It allows a mocked
// connection to be substituted in tests that send and receive data.
type UDPConn interface {
	ReadFromUDP([]byte) (int, *net.UDPAddr, error)
	WriteToUDP([]byte, *net.UDPAddr) (int, error)

	SetReadDeadline(time.Time) error
	SetWriteDeadline(time.Time) error

	Close() error
}

// Compile-time check that the real connection type satisfies UDPConn;
// (*DHT).init stores a *net.UDPConn in a UDPConn field.
var _ UDPConn = (*net.UDPConn)(nil)
2018-03-07 02:15:44 +01:00
// DHT represents a DHT node.
type DHT struct {
2018-04-05 17:35:57 +02:00
// config
conf *Config
// UDP connection for sending and receiving data
conn UDPConn
// the local dht node
node *Node
// routing table
rt *routingTable
// channel of incoming packets
packets chan packet
2018-04-05 17:35:57 +02:00
// data store
store *peerStore
// transaction manager
tm *transactionManager
// token manager
tokens *tokenManager
2018-04-05 17:35:57 +02:00
// stopper to shut down DHT
stop *stopOnce.Stopper
// wait group for all the things that need to be stopped when DHT shuts down
stopWG *sync.WaitGroup
// channel is closed when DHT joins network
joined chan struct{}
2018-03-07 02:15:44 +01:00
}
// New returns a DHT pointer. If config is nil, then config will be set to the default config.
func New(config *Config) (*DHT, error) {
2018-03-07 02:15:44 +01:00
if config == nil {
config = NewStandardConfig()
}
var id Bitmap
2018-03-07 02:15:44 +01:00
if config.NodeID == "" {
id = RandomBitmapP()
2018-03-07 02:15:44 +01:00
} else {
id = BitmapFromHexP(config.NodeID)
2018-03-07 02:15:44 +01:00
}
ip, port, err := net.SplitHostPort(config.Address)
if err != nil {
return nil, errors.Err(err)
2018-03-09 22:43:30 +01:00
} else if ip == "" {
return nil, errors.Err("address does not contain an IP")
2018-03-09 22:43:30 +01:00
} else if port == "" {
return nil, errors.Err("address does not contain a port")
}
2018-03-09 22:43:30 +01:00
portInt, err := cast.ToIntE(port)
if err != nil {
return nil, errors.Err(err)
}
2018-03-09 22:43:30 +01:00
node := &Node{id: id, ip: net.ParseIP(ip), port: portInt}
if node.ip == nil {
return nil, errors.Err("invalid ip")
2018-03-09 22:43:30 +01:00
}
d := &DHT{
conf: config,
node: node,
rt: newRoutingTable(node),
packets: make(chan packet),
store: newPeerStore(),
2018-03-29 03:05:27 +02:00
stop: stopOnce.New(),
2018-04-03 18:14:04 +02:00
stopWG: &sync.WaitGroup{},
2018-04-05 17:35:57 +02:00
joined: make(chan struct{}),
tokens: &tokenManager{},
2018-03-07 02:15:44 +01:00
}
d.tm = newTransactionManager(d)
d.tokens.Start(tokenSecretRotationInterval)
return d, nil
2018-03-07 02:15:44 +01:00
}
// init initializes global variables.
func (dht *DHT) init() error {
2018-03-07 02:15:44 +01:00
listener, err := net.ListenPacket(network, dht.conf.Address)
if err != nil {
return errors.Err(err)
2018-03-07 02:15:44 +01:00
}
dht.conn = listener.(*net.UDPConn)
2018-04-05 17:35:57 +02:00
if dht.conf.PrintState > 0 {
go func() {
t := time.NewTicker(dht.conf.PrintState)
for {
dht.PrintState()
2018-04-05 22:39:05 +02:00
select {
case <-t.C:
case <-dht.stop.Chan():
return
}
2018-04-05 17:35:57 +02:00
}
}()
}
return nil
2018-03-07 02:15:44 +01:00
}
// listen receives message from udp.
func (dht *DHT) listen() {
2018-04-03 18:14:04 +02:00
dht.stopWG.Add(1)
defer dht.stopWG.Done()
buf := make([]byte, udpMaxMessageLength)
2018-04-03 18:14:04 +02:00
2018-03-29 03:05:27 +02:00
for {
select {
case <-dht.stop.Chan():
return
default:
}
2018-04-05 17:35:57 +02:00
dht.conn.SetReadDeadline(time.Now().Add(200 * time.Millisecond)) // need this to periodically check shutdown chan
2018-03-29 03:05:27 +02:00
n, raddr, err := dht.conn.ReadFromUDP(buf)
if err != nil {
if e, ok := err.(net.Error); !ok || !e.Timeout() {
2018-03-07 02:15:44 +01:00
log.Errorf("udp read error: %v", err)
}
2018-03-29 03:05:27 +02:00
continue
} else if raddr == nil {
log.Errorf("udp read with no raddr")
continue
2018-03-07 02:15:44 +01:00
}
2018-03-29 03:05:27 +02:00
2018-04-03 18:14:04 +02:00
data := make([]byte, n)
copy(data, buf[:n]) // slices use the same underlying array, so we need a new one for each packet
dht.packets <- packet{data: data, raddr: raddr}
2018-03-29 03:05:27 +02:00
}
2018-03-07 02:15:44 +01:00
}
// join makes current node join the dht network.
func (dht *DHT) join() {
2018-04-25 00:12:17 +02:00
defer close(dht.joined) // if anyone's waiting for join to finish, they'll know its done
2018-04-03 18:14:04 +02:00
log.Debugf("[%s] joining network", dht.node.id.HexShort())
2018-04-25 00:12:17 +02:00
// ping nodes, which gets their real node IDs and adds them to the routing table
atLeastOneNodeResponded := false
2018-03-07 02:15:44 +01:00
for _, addr := range dht.conf.SeedNodes {
2018-04-25 00:12:17 +02:00
err := dht.Ping(addr)
2018-03-07 02:15:44 +01:00
if err != nil {
2018-04-25 00:12:17 +02:00
log.Error(errors.Prefix(fmt.Sprintf("[%s] join", dht.node.id.HexShort()), err))
} else {
atLeastOneNodeResponded = true
2018-03-07 02:15:44 +01:00
}
2018-04-25 00:12:17 +02:00
}
2018-03-07 02:15:44 +01:00
2018-04-25 00:12:17 +02:00
if !atLeastOneNodeResponded {
log.Errorf("[%s] join: no nodes responded to initial ping", dht.node.id.HexShort())
return
2018-03-29 03:05:27 +02:00
}
2018-03-07 02:15:44 +01:00
2018-03-29 03:05:27 +02:00
// now call iterativeFind on yourself
_, err := dht.Get(dht.node.id)
2018-03-29 03:05:27 +02:00
if err != nil {
2018-04-03 18:14:04 +02:00
log.Errorf("[%s] join: %s", dht.node.id.HexShort(), err.Error())
2018-03-07 02:15:44 +01:00
}
}
func (dht *DHT) runHandler() {
2018-04-03 18:14:04 +02:00
dht.stopWG.Add(1)
defer dht.stopWG.Done()
2018-03-07 02:15:44 +01:00
var pkt packet
for {
select {
case pkt = <-dht.packets:
handlePacket(dht, pkt)
2018-03-29 03:05:27 +02:00
case <-dht.stop.Chan():
return
2018-03-07 02:15:44 +01:00
}
}
}
2018-03-29 03:05:27 +02:00
// Start starts the dht
2018-04-25 00:12:17 +02:00
func (dht *DHT) Start() error {
err := dht.init()
2018-03-07 02:15:44 +01:00
if err != nil {
2018-04-25 00:12:17 +02:00
return err
2018-03-07 02:15:44 +01:00
}
2018-03-29 03:05:27 +02:00
go dht.listen()
go dht.runHandler()
dht.join()
log.Debugf("[%s] DHT ready on %s (%d nodes found during join)", dht.node.id.HexShort(), dht.node.Addr().String(), dht.rt.Count())
2018-04-25 00:12:17 +02:00
return nil
2018-04-05 17:35:57 +02:00
}
func (dht *DHT) WaitUntilJoined() {
if dht.joined == nil {
panic("dht not initialized")
}
<-dht.joined
2018-03-07 02:15:44 +01:00
}
2018-03-29 03:05:27 +02:00
// Shutdown shuts down the dht
func (dht *DHT) Shutdown() {
2018-04-03 18:14:04 +02:00
log.Debugf("[%s] DHT shutting down", dht.node.id.HexShort())
2018-03-29 03:05:27 +02:00
dht.stop.Stop()
2018-04-03 18:14:04 +02:00
dht.stopWG.Wait()
dht.tokens.Stop()
2018-04-03 18:14:04 +02:00
dht.conn.Close()
2018-04-05 17:35:57 +02:00
log.Debugf("[%s] DHT stopped", dht.node.id.HexShort())
2018-03-29 03:05:27 +02:00
}
2018-04-25 00:12:17 +02:00
// Ping sends a ping request to the node at addr (a `host:port` string). It
// returns an error if addr cannot be resolved or the node does not respond.
//
// The target's real node ID is not known in advance, so the request is
// addressed to a placeholder node with a random ID at the resolved address.
func (dht *DHT) Ping(addr string) error {
	raddr, err := net.ResolveUDPAddr(network, addr)
	if err != nil {
		return errors.Err(err)
	}

	tmpNode := Node{id: RandomBitmapP(), ip: raddr.IP, port: raddr.Port}
	if res := dht.tm.Send(tmpNode, Request{Method: pingMethod}); res == nil {
		return errors.Err("no response from node %s", addr)
	}
	return nil
}
// Get returns the list of nodes that have the blob for the given hash
func (dht *DHT) Get(hash Bitmap) ([]Node, error) {
2018-04-03 19:38:01 +02:00
nf := newNodeFinder(dht, hash, true)
2018-03-29 03:05:27 +02:00
res, err := nf.Find()
if err != nil {
return nil, err
2018-03-29 03:05:27 +02:00
}
if res.Found {
return res.Nodes, nil
}
return nil, nil
2018-03-29 03:05:27 +02:00
}
2018-04-04 17:43:27 +02:00
// Announce announces to the DHT that this node has the blob for the given hash
func (dht *DHT) Announce(hash Bitmap) error {
2018-04-03 19:38:01 +02:00
nf := newNodeFinder(dht, hash, false)
2018-03-29 03:05:27 +02:00
res, err := nf.Find()
if err != nil {
2018-04-03 19:38:01 +02:00
return err
2018-03-29 03:05:27 +02:00
}
2018-04-03 19:38:01 +02:00
// TODO: if this node is closer than farthest peer, store locally and pop farthest peer
2018-04-03 19:38:01 +02:00
for _, node := range res.Nodes {
go dht.storeOnNode(hash, node)
2018-04-03 19:38:01 +02:00
}
return nil
2018-03-29 03:05:27 +02:00
}
// storeOnNode asks node to record this node as a holder of hash. It sends a
// findValue request for hash and uses the token from the response in a store
// request carrying this node's ID and port. It gives up silently if the DHT
// stops or the first request times out. The store request's response is not
// awaited.
func (dht *DHT) storeOnNode(hash Bitmap, node Node) {
	dht.stopWG.Add(1)
	defer dht.stopWG.Done()

	tokenCh := dht.tm.SendAsync(context.Background(), node, Request{
		Method: findValueMethod,
		Arg:    &hash,
	})

	var tokenRes *Response
	select {
	case <-dht.stop.Chan():
		return
	case tokenRes = <-tokenCh:
	}
	if tokenRes == nil {
		return // the findValue request timed out
	}

	storeReq := Request{
		Method: storeMethod,
		StoreArgs: &storeArgs{
			BlobHash: hash,
			Value: storeArgsValue{
				Token:  tokenRes.Token,
				LbryID: dht.node.id,
				Port:   dht.node.port,
			},
		},
	}
	dht.tm.SendAsync(context.Background(), node, storeReq)
}
2018-04-05 17:35:57 +02:00
func (dht *DHT) PrintState() {
log.Printf("DHT node %s at %s", dht.node.String(), time.Now().Format(time.RFC822Z))
2018-04-05 17:35:57 +02:00
log.Printf("Outstanding transactions: %d", dht.tm.Count())
log.Printf("Stored hashes: %d", dht.store.CountStoredHashes())
log.Printf("Buckets:")
for _, line := range strings.Split(dht.rt.BucketInfo(), "\n") {
log.Println(line)
}
}
// printNodeList logs each node in list on its own line, prefixed by its
// zero-based position.
func printNodeList(list []Node) {
	for idx := 0; idx < len(list); idx++ {
		log.Printf("%d) %s", idx, list[idx].String())
	}
}
func MakeTestDHT(numNodes int) []*DHT {
if numNodes < 1 {
return nil
}
ip := "127.0.0.1"
firstPort := 21000
dhts := make([]*DHT, numNodes)
for i := 0; i < numNodes; i++ {
seeds := []string{}
if i > 0 {
seeds = []string{ip + ":" + strconv.Itoa(firstPort)}
}
dht, err := New(&Config{Address: ip + ":" + strconv.Itoa(firstPort+i), NodeID: RandomBitmapP().Hex(), SeedNodes: seeds})
if err != nil {
panic(err)
}
go dht.Start()
dht.WaitUntilJoined()
dhts[i] = dht
2018-03-29 03:05:27 +02:00
}
return dhts
2018-03-29 03:05:27 +02:00
}