fix rt updates, add Ping method
This commit is contained in:
parent
3419e396d1
commit
9979a70c61
6 changed files with 100 additions and 19 deletions
41
cmd/dht.go
41
cmd/dht.go
|
@ -1,6 +1,8 @@
|
||||||
package cmd
|
package cmd
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"encoding/json"
|
||||||
|
"io/ioutil"
|
||||||
"math/rand"
|
"math/rand"
|
||||||
"os"
|
"os"
|
||||||
"os/signal"
|
"os/signal"
|
||||||
|
@ -11,6 +13,7 @@ import (
|
||||||
"github.com/davecgh/go-spew/spew"
|
"github.com/davecgh/go-spew/spew"
|
||||||
"github.com/lbryio/reflector.go/dht"
|
"github.com/lbryio/reflector.go/dht"
|
||||||
|
|
||||||
|
log "github.com/sirupsen/logrus"
|
||||||
"github.com/spf13/cobra"
|
"github.com/spf13/cobra"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
@ -24,6 +27,44 @@ func init() {
|
||||||
}
|
}
|
||||||
|
|
||||||
func dhtCmd(cmd *cobra.Command, args []string) {
|
func dhtCmd(cmd *cobra.Command, args []string) {
|
||||||
|
type rtData struct {
|
||||||
|
BlobHashes []string `json:"blob_hashes"`
|
||||||
|
Buckets map[string][]struct {
|
||||||
|
Address string `json:"address"`
|
||||||
|
Blobs []string `json:"blobs"`
|
||||||
|
NodeID string `json:"node_id"`
|
||||||
|
} `json:"buckets"`
|
||||||
|
Contacts []string `json:"contacts"`
|
||||||
|
NodeID string `json:"node_id"`
|
||||||
|
}
|
||||||
|
|
||||||
|
bytes, err := ioutil.ReadAll(os.Stdin)
|
||||||
|
checkErr(err)
|
||||||
|
|
||||||
|
var data rtData
|
||||||
|
err = json.Unmarshal(bytes, &data)
|
||||||
|
checkErr(err)
|
||||||
|
|
||||||
|
spew.Dump(data)
|
||||||
|
|
||||||
|
d, err := dht.New(nil)
|
||||||
|
checkErr(err)
|
||||||
|
err = d.Start()
|
||||||
|
checkErr(err)
|
||||||
|
|
||||||
|
for _, nodes := range data.Buckets {
|
||||||
|
for _, node := range nodes {
|
||||||
|
err = d.Ping(node.Address + ":4444")
|
||||||
|
if err != nil {
|
||||||
|
log.Errorf("no response from %s", node.Address)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
d.Shutdown()
|
||||||
|
|
||||||
|
return
|
||||||
|
|
||||||
rand.Seed(time.Now().UnixNano())
|
rand.Seed(time.Now().UnixNano())
|
||||||
|
|
||||||
//d, err := dht.New(&dht.Config{
|
//d, err := dht.New(&dht.Config{
|
||||||
|
|
53
dht/dht.go
53
dht/dht.go
|
@ -2,6 +2,7 @@ package dht
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
|
"fmt"
|
||||||
"net"
|
"net"
|
||||||
"strconv"
|
"strconv"
|
||||||
"strings"
|
"strings"
|
||||||
|
@ -215,29 +216,31 @@ func (dht *DHT) listen() {
|
||||||
|
|
||||||
// join makes current node join the dht network.
|
// join makes current node join the dht network.
|
||||||
func (dht *DHT) join() {
|
func (dht *DHT) join() {
|
||||||
log.Debugf("[%s] joining network", dht.node.id.HexShort())
|
defer close(dht.joined) // if anyone's waiting for join to finish, they'll know its done
|
||||||
// get real node IDs and add them to the routing table
|
|
||||||
for _, addr := range dht.conf.SeedNodes {
|
|
||||||
raddr, err := net.ResolveUDPAddr(network, addr)
|
|
||||||
if err != nil {
|
|
||||||
log.Errorln(err)
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
|
|
||||||
tmpNode := Node{id: RandomBitmapP(), ip: raddr.IP, port: raddr.Port}
|
log.Debugf("[%s] joining network", dht.node.id.HexShort())
|
||||||
res := dht.tm.Send(tmpNode, Request{Method: pingMethod})
|
|
||||||
if res == nil {
|
// ping nodes, which gets their real node IDs and adds them to the routing table
|
||||||
log.Errorf("[%s] join: no response from seed node %s", dht.node.id.HexShort(), addr)
|
atLeastOneNodeResponded := false
|
||||||
|
for _, addr := range dht.conf.SeedNodes {
|
||||||
|
err := dht.Ping(addr)
|
||||||
|
if err != nil {
|
||||||
|
log.Error(errors.Prefix(fmt.Sprintf("[%s] join", dht.node.id.HexShort()), err))
|
||||||
|
} else {
|
||||||
|
atLeastOneNodeResponded = true
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if !atLeastOneNodeResponded {
|
||||||
|
log.Errorf("[%s] join: no nodes responded to initial ping", dht.node.id.HexShort())
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
// now call iterativeFind on yourself
|
// now call iterativeFind on yourself
|
||||||
_, err := dht.Get(dht.node.id)
|
_, err := dht.Get(dht.node.id)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Errorf("[%s] join: %s", dht.node.id.HexShort(), err.Error())
|
log.Errorf("[%s] join: %s", dht.node.id.HexShort(), err.Error())
|
||||||
}
|
}
|
||||||
|
|
||||||
close(dht.joined) // if anyone's waiting for join to finish, they'll know its done
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func (dht *DHT) runHandler() {
|
func (dht *DHT) runHandler() {
|
||||||
|
@ -257,11 +260,10 @@ func (dht *DHT) runHandler() {
|
||||||
}
|
}
|
||||||
|
|
||||||
// Start starts the dht
|
// Start starts the dht
|
||||||
func (dht *DHT) Start() {
|
func (dht *DHT) Start() error {
|
||||||
err := dht.init()
|
err := dht.init()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Error(err)
|
return err
|
||||||
return
|
|
||||||
}
|
}
|
||||||
|
|
||||||
go dht.listen()
|
go dht.listen()
|
||||||
|
@ -269,6 +271,7 @@ func (dht *DHT) Start() {
|
||||||
|
|
||||||
dht.join()
|
dht.join()
|
||||||
log.Debugf("[%s] DHT ready on %s", dht.node.id.HexShort(), dht.node.Addr().String())
|
log.Debugf("[%s] DHT ready on %s", dht.node.id.HexShort(), dht.node.Addr().String())
|
||||||
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (dht *DHT) WaitUntilJoined() {
|
func (dht *DHT) WaitUntilJoined() {
|
||||||
|
@ -288,6 +291,22 @@ func (dht *DHT) Shutdown() {
|
||||||
log.Debugf("[%s] DHT stopped", dht.node.id.HexShort())
|
log.Debugf("[%s] DHT stopped", dht.node.id.HexShort())
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Get returns the list of nodes that have the blob for the given hash
|
||||||
|
func (dht *DHT) Ping(addr string) error {
|
||||||
|
raddr, err := net.ResolveUDPAddr(network, addr)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
tmpNode := Node{id: RandomBitmapP(), ip: raddr.IP, port: raddr.Port}
|
||||||
|
res := dht.tm.Send(tmpNode, Request{Method: pingMethod})
|
||||||
|
if res == nil {
|
||||||
|
return errors.Err("no response from node %s", addr)
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
// Get returns the list of nodes that have the blob for the given hash
|
// Get returns the list of nodes that have the blob for the given hash
|
||||||
func (dht *DHT) Get(hash Bitmap) ([]Node, error) {
|
func (dht *DHT) Get(hash Bitmap) ([]Node, error) {
|
||||||
nf := newNodeFinder(dht, hash, true)
|
nf := newNodeFinder(dht, hash, true)
|
||||||
|
|
|
@ -10,6 +10,8 @@ import (
|
||||||
log "github.com/sirupsen/logrus"
|
log "github.com/sirupsen/logrus"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
// TODO: make a dht with X nodes, have them all join, then ensure that every node appears at least once in another node's routing table
|
||||||
|
|
||||||
func TestNodeFinder_FindNodes(t *testing.T) {
|
func TestNodeFinder_FindNodes(t *testing.T) {
|
||||||
dhts := MakeTestDHT(3)
|
dhts := MakeTestDHT(3)
|
||||||
defer func() {
|
defer func() {
|
||||||
|
|
|
@ -162,6 +162,7 @@ func bucketContents(b *list.List) string {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Update inserts or refreshes a node
|
||||||
func (rt *routingTable) Update(node Node) {
|
func (rt *routingTable) Update(node Node) {
|
||||||
rt.lock.Lock()
|
rt.lock.Lock()
|
||||||
defer rt.lock.Unlock()
|
defer rt.lock.Unlock()
|
||||||
|
@ -179,6 +180,18 @@ func (rt *routingTable) Update(node Node) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// UpdateIfExists refreshes a node if its already in the routing table
|
||||||
|
func (rt *routingTable) UpdateIfExists(node Node) {
|
||||||
|
rt.lock.Lock()
|
||||||
|
defer rt.lock.Unlock()
|
||||||
|
bucketNum := bucketFor(rt.node.id, node.id)
|
||||||
|
bucket := rt.buckets[bucketNum]
|
||||||
|
element := findInList(bucket, node.id)
|
||||||
|
if element != nil {
|
||||||
|
bucket.MoveToBack(element)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
func (rt *routingTable) RemoveByID(id Bitmap) {
|
func (rt *routingTable) RemoveByID(id Bitmap) {
|
||||||
rt.lock.Lock()
|
rt.lock.Lock()
|
||||||
defer rt.lock.Unlock()
|
defer rt.lock.Unlock()
|
||||||
|
|
|
@ -115,8 +115,12 @@ func handleRequest(dht *DHT, addr *net.UDPAddr, request Request) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// nodes that send us requests should not be inserted, only refreshed.
|
||||||
|
// the routing table must only contain "good" nodes, which are nodes that reply to our requests
|
||||||
|
// if a node is already good (aka in the table), its fine to refresh it
|
||||||
|
// http://www.bittorrent.org/beps/bep_0005.html#routing-table
|
||||||
node := Node{id: request.NodeID, ip: addr.IP, port: addr.Port}
|
node := Node{id: request.NodeID, ip: addr.IP, port: addr.Port}
|
||||||
dht.rt.Update(node)
|
dht.rt.UpdateIfExists(node)
|
||||||
}
|
}
|
||||||
|
|
||||||
func getFindResponse(dht *DHT, request Request) Response {
|
func getFindResponse(dht *DHT, request Request) Response {
|
||||||
|
@ -147,7 +151,7 @@ func handleResponse(dht *DHT, addr *net.UDPAddr, response Response) {
|
||||||
func handleError(dht *DHT, addr *net.UDPAddr, e Error) {
|
func handleError(dht *DHT, addr *net.UDPAddr, e Error) {
|
||||||
spew.Dump(e)
|
spew.Dump(e)
|
||||||
node := Node{id: e.NodeID, ip: addr.IP, port: addr.Port}
|
node := Node{id: e.NodeID, ip: addr.IP, port: addr.Port}
|
||||||
dht.rt.Update(node)
|
dht.rt.UpdateIfExists(node)
|
||||||
}
|
}
|
||||||
|
|
||||||
// send sends data to a udp address
|
// send sends data to a udp address
|
||||||
|
|
|
@ -52,6 +52,8 @@ func (tm *transactionManager) Find(id messageID, addr *net.UDPAddr) *transaction
|
||||||
tm.lock.RLock()
|
tm.lock.RLock()
|
||||||
defer tm.lock.RUnlock()
|
defer tm.lock.RUnlock()
|
||||||
|
|
||||||
|
// TODO: also check that the response's nodeid matches the id you thought you sent to?
|
||||||
|
|
||||||
t, ok := tm.transactions[id]
|
t, ok := tm.transactions[id]
|
||||||
if !ok || (addr != nil && t.node.Addr().String() != addr.String()) {
|
if !ok || (addr != nil && t.node.Addr().String() != addr.String()) {
|
||||||
return nil
|
return nil
|
||||||
|
|
Loading…
Reference in a new issue