lbry.go/dht/dht.go

232 lines
5.6 KiB
Go
Raw Normal View History

2018-03-07 02:15:44 +01:00
package dht
import (
2018-04-25 00:12:17 +02:00
"fmt"
2018-03-07 02:15:44 +01:00
"net"
2018-04-05 17:35:57 +02:00
"strings"
2018-03-07 02:15:44 +01:00
"time"
"github.com/lbryio/lbry.go/v3/dht/bits"
"github.com/lbryio/lbry.go/v3/extras/stop"
"github.com/cockroachdb/errors"
"github.com/sirupsen/logrus"
"github.com/spf13/cast"
2018-03-07 02:15:44 +01:00
)
var log *logrus.Logger
func UseLogger(l *logrus.Logger) {
log = l
}
2018-04-03 18:14:04 +02:00
func init() {
log = logrus.StandardLogger()
2018-04-03 18:14:04 +02:00
//log.SetFormatter(&log.TextFormatter{ForceColors: true})
//log.SetLevel(log.DebugLevel)
}
2018-03-07 02:15:44 +01:00
// DHT represents a DHT node.
type DHT struct {
2018-04-05 17:35:57 +02:00
// config
conf *Config
2018-04-28 02:16:12 +02:00
// local contact
contact Contact
// node
2018-04-05 17:35:57 +02:00
node *Node
// stopGroup to shut down DHT
grp *stop.Group
2018-04-05 17:35:57 +02:00
// channel is closed when DHT joins network
joined chan struct{}
2018-06-21 19:40:22 +02:00
// cache for store tokens
tokenCache *tokenCache
2018-07-26 22:05:27 +02:00
// hashes that need to be put into the announce queue or removed from the queue
announceAddRemove chan queueEdit
2018-03-07 02:15:44 +01:00
}
// New returns a DHT pointer. If config is nil, then config will be set to the default config.
func New(config *Config) *DHT {
2018-03-07 02:15:44 +01:00
if config == nil {
config = NewStandardConfig()
}
d := &DHT{
2018-07-26 22:05:27 +02:00
conf: config,
grp: stop.New(),
joined: make(chan struct{}),
announceAddRemove: make(chan queueEdit),
2018-03-07 02:15:44 +01:00
}
return d
}
func (dht *DHT) connect(conn UDPConn) error {
contact, err := getContact(dht.conf.NodeID, dht.conf.Address)
if err != nil {
return err
}
dht.contact = contact
dht.node = NewNode(contact.ID)
2018-06-21 19:40:22 +02:00
dht.tokenCache = newTokenCache(dht.node, tokenSecretRotationInterval)
return dht.node.Connect(conn)
}
// Start starts the dht
func (dht *DHT) Start() error {
listener, err := net.ListenPacket(Network, dht.conf.Address)
if err != nil {
return errors.WithStack(err)
}
conn := listener.(*net.UDPConn)
err = dht.connect(conn)
if err != nil {
return err
}
dht.join()
2018-06-21 19:40:22 +02:00
log.Infof("[%s] DHT ready on %s (%d nodes found during join)",
dht.node.id.HexShort(), dht.contact.Addr().String(), dht.node.rt.Count())
2018-07-26 22:05:27 +02:00
dht.grp.Add(1)
go func() {
dht.runAnnouncer()
dht.grp.Done()
}()
2018-08-07 17:00:04 +02:00
if dht.conf.RPCPort > 0 {
dht.grp.Add(1)
go func() {
dht.runRPCServer(dht.conf.RPCPort)
dht.grp.Done()
}()
}
return nil
2018-03-07 02:15:44 +01:00
}
// join makes current node join the dht network.
func (dht *DHT) join() {
2018-04-25 00:12:17 +02:00
defer close(dht.joined) // if anyone's waiting for join to finish, they'll know its done
2018-06-21 19:40:22 +02:00
log.Infof("[%s] joining DHT network", dht.node.id.HexShort())
2018-04-25 00:12:17 +02:00
// ping nodes, which gets their real node IDs and adds them to the routing table
atLeastOneNodeResponded := false
2018-03-07 02:15:44 +01:00
for _, addr := range dht.conf.SeedNodes {
2018-04-25 00:12:17 +02:00
err := dht.Ping(addr)
2018-03-07 02:15:44 +01:00
if err != nil {
log.Error(errors.WithMessage(err, fmt.Sprintf("[%s] join", dht.node.id.HexShort())))
2018-04-25 00:12:17 +02:00
} else {
atLeastOneNodeResponded = true
2018-03-07 02:15:44 +01:00
}
2018-04-25 00:12:17 +02:00
}
2018-03-07 02:15:44 +01:00
2018-04-25 00:12:17 +02:00
if !atLeastOneNodeResponded {
log.Errorf("[%s] join: no nodes responded to initial ping", dht.node.id.HexShort())
return
2018-03-29 03:05:27 +02:00
}
2018-03-07 02:15:44 +01:00
2018-03-29 03:05:27 +02:00
// now call iterativeFind on yourself
_, _, err := FindContacts(dht.node, dht.node.id, false, dht.grp.Child())
2018-03-29 03:05:27 +02:00
if err != nil {
2018-04-03 18:14:04 +02:00
log.Errorf("[%s] join: %s", dht.node.id.HexShort(), err.Error())
2018-03-07 02:15:44 +01:00
}
2018-05-24 19:05:05 +02:00
2018-06-13 18:45:47 +02:00
// TODO: after joining, refresh all buckets further away than our closest neighbor
2018-05-24 19:05:05 +02:00
// http://xlattice.sourceforge.net/components/protocol/kademlia/specs.html#join
2018-03-07 02:15:44 +01:00
}
// WaitUntilJoined blocks until the node joins the network.
2018-04-05 17:35:57 +02:00
func (dht *DHT) WaitUntilJoined() {
if dht.joined == nil {
panic("dht not initialized")
}
<-dht.joined
2018-03-07 02:15:44 +01:00
}
2018-03-29 03:05:27 +02:00
// Shutdown shuts down the dht
func (dht *DHT) Shutdown() {
2018-04-03 18:14:04 +02:00
log.Debugf("[%s] DHT shutting down", dht.node.id.HexShort())
dht.grp.StopAndWait()
2018-04-28 02:16:12 +02:00
dht.node.Shutdown()
2018-04-05 17:35:57 +02:00
log.Debugf("[%s] DHT stopped", dht.node.id.HexShort())
2018-03-29 03:05:27 +02:00
}
// Ping pings a given address, creates a temporary contact for sending a message, and returns an error if communication
// fails.
2018-04-25 00:12:17 +02:00
func (dht *DHT) Ping(addr string) error {
raddr, err := net.ResolveUDPAddr(Network, addr)
2018-04-25 00:12:17 +02:00
if err != nil {
return err
}
2018-06-14 17:48:02 +02:00
tmpNode := Contact{ID: bits.Rand(), IP: raddr.IP, Port: raddr.Port}
res := dht.node.Send(tmpNode, Request{Method: pingMethod}, SendOptions{skipIDCheck: true})
2018-04-25 00:12:17 +02:00
if res == nil {
return errors.WithStack(errors.Newf("no response from node %s", addr))
2018-04-25 00:12:17 +02:00
}
return nil
}
// Get returns the list of nodes that have the blob for the given hash
2018-06-14 17:48:02 +02:00
func (dht *DHT) Get(hash bits.Bitmap) ([]Contact, error) {
contacts, found, err := FindContacts(dht.node, hash, true, dht.grp.Child())
2018-03-29 03:05:27 +02:00
if err != nil {
return nil, err
2018-03-29 03:05:27 +02:00
}
2018-06-13 18:45:47 +02:00
if found {
return contacts, nil
}
return nil, nil
2018-03-29 03:05:27 +02:00
}
// PrintState prints the current state of the DHT including address, nr outstanding transactions, stored hashes as well
// as current bucket information.
2018-04-05 17:35:57 +02:00
func (dht *DHT) PrintState() {
2018-04-28 02:16:12 +02:00
log.Printf("DHT node %s at %s", dht.contact.String(), time.Now().Format(time.RFC822Z))
log.Printf("Outstanding transactions: %d", dht.node.CountActiveTransactions())
log.Printf("Stored hashes: %d", dht.node.store.CountStoredHashes())
2018-04-05 17:35:57 +02:00
log.Printf("Buckets:")
2018-04-28 02:16:12 +02:00
for _, line := range strings.Split(dht.node.rt.BucketInfo(), "\n") {
2018-04-05 17:35:57 +02:00
log.Println(line)
}
}
2018-06-14 17:48:02 +02:00
func (dht DHT) ID() bits.Bitmap {
return dht.contact.ID
}
2018-04-28 02:16:12 +02:00
func getContact(nodeID, addr string) (Contact, error) {
var c Contact
if nodeID == "" {
2018-06-14 17:48:02 +02:00
c.ID = bits.Rand()
2018-04-28 02:16:12 +02:00
} else {
2018-06-14 17:48:02 +02:00
c.ID = bits.FromHexP(nodeID)
2018-04-28 02:16:12 +02:00
}
ip, port, err := net.SplitHostPort(addr)
if err != nil {
return c, errors.WithStack(err)
2018-04-28 02:16:12 +02:00
} else if ip == "" {
return c, errors.WithStack(errors.New("address does not contain an IP"))
2018-04-28 02:16:12 +02:00
} else if port == "" {
return c, errors.WithStack(errors.New("address does not contain a port"))
2018-04-28 02:16:12 +02:00
}
c.IP = net.ParseIP(ip)
if c.IP == nil {
return c, errors.WithStack(errors.New("invalid ip"))
2018-04-28 02:16:12 +02:00
}
c.Port, err = cast.ToIntE(port)
2018-04-28 02:16:12 +02:00
if err != nil {
return c, errors.WithStack(err)
2018-04-28 02:16:12 +02:00
}
return c, nil
}