diff --git a/cmd/dht.go b/cmd/dht.go index 1043bdb..db28697 100644 --- a/cmd/dht.go +++ b/cmd/dht.go @@ -1,6 +1,8 @@ package cmd import ( + "encoding/json" + "io/ioutil" "math/rand" "os" "os/signal" @@ -11,6 +13,7 @@ import ( "github.com/davecgh/go-spew/spew" "github.com/lbryio/reflector.go/dht" + log "github.com/sirupsen/logrus" "github.com/spf13/cobra" ) @@ -24,6 +27,44 @@ func init() { } func dhtCmd(cmd *cobra.Command, args []string) { + type rtData struct { + BlobHashes []string `json:"blob_hashes"` + Buckets map[string][]struct { + Address string `json:"address"` + Blobs []string `json:"blobs"` + NodeID string `json:"node_id"` + } `json:"buckets"` + Contacts []string `json:"contacts"` + NodeID string `json:"node_id"` + } + + bytes, err := ioutil.ReadAll(os.Stdin) + checkErr(err) + + var data rtData + err = json.Unmarshal(bytes, &data) + checkErr(err) + + spew.Dump(data) + + d, err := dht.New(nil) + checkErr(err) + err = d.Start() + checkErr(err) + + for _, nodes := range data.Buckets { + for _, node := range nodes { + err = d.Ping(node.Address + ":4444") + if err != nil { + log.Errorf("no response from %s", node.Address) + } + } + } + + d.Shutdown() + + return + rand.Seed(time.Now().UnixNano()) //d, err := dht.New(&dht.Config{ diff --git a/dht/dht.go b/dht/dht.go index cd4a011..226fa46 100644 --- a/dht/dht.go +++ b/dht/dht.go @@ -2,6 +2,7 @@ package dht import ( "context" + "fmt" "net" "strconv" "strings" @@ -215,29 +216,31 @@ func (dht *DHT) listen() { // join makes current node join the dht network. func (dht *DHT) join() { - log.Debugf("[%s] joining network", dht.node.id.HexShort()) - // get real node IDs and add them to the routing table - for _, addr := range dht.conf.SeedNodes { - raddr, err := net.ResolveUDPAddr(network, addr) - if err != nil { - log.Errorln(err) - continue - } + defer close(dht.joined) // if anyone's waiting for join to finish, they'll know its done - tmpNode := Node{id: RandomBitmapP(), ip: raddr.IP, port: raddr.Port} - res := dht.tm.Send(tmpNode, Request{Method: pingMethod}) - if res == nil { - log.Errorf("[%s] join: no response from seed node %s", dht.node.id.HexShort(), addr) + log.Debugf("[%s] joining network", dht.node.id.HexShort()) + + // ping nodes, which gets their real node IDs and adds them to the routing table + atLeastOneNodeResponded := false + for _, addr := range dht.conf.SeedNodes { + err := dht.Ping(addr) + if err != nil { + log.Error(errors.Prefix(fmt.Sprintf("[%s] join", dht.node.id.HexShort()), err)) + } else { + atLeastOneNodeResponded = true } } + if !atLeastOneNodeResponded { + log.Errorf("[%s] join: no nodes responded to initial ping", dht.node.id.HexShort()) + return + } + // now call iterativeFind on yourself _, err := dht.Get(dht.node.id) if err != nil { log.Errorf("[%s] join: %s", dht.node.id.HexShort(), err.Error()) } - - close(dht.joined) // if anyone's waiting for join to finish, they'll know its done } func (dht *DHT) runHandler() { @@ -257,11 +260,10 @@ func (dht *DHT) runHandler() { } // Start starts the dht -func (dht *DHT) Start() { +func (dht *DHT) Start() error { err := dht.init() if err != nil { - log.Error(err) - return + return err } go dht.listen() @@ -269,6 +271,7 @@ func (dht *DHT) Start() { dht.join() log.Debugf("[%s] DHT ready on %s", dht.node.id.HexShort(), dht.node.Addr().String()) + return nil } func (dht *DHT) WaitUntilJoined() { @@ -288,6 +291,22 @@ func (dht *DHT) Shutdown() { log.Debugf("[%s] DHT stopped", dht.node.id.HexShort()) } +// Get returns the list of nodes that have the blob for the given hash +func (dht *DHT) Ping(addr string) error { + raddr, err := net.ResolveUDPAddr(network, addr) + if err != nil { + return err + } + + tmpNode := Node{id: RandomBitmapP(), ip: raddr.IP, port: raddr.Port} + res := dht.tm.Send(tmpNode, Request{Method: pingMethod}) + if res == nil { + return errors.Err("no response from node %s", addr) + } + + return nil +} + // Get returns the list of nodes that have the blob for the given hash func (dht *DHT) Get(hash Bitmap) ([]Node, error) { nf := newNodeFinder(dht, hash, true) diff --git a/dht/dht_test.go b/dht/dht_test.go index 7664920..e302926 100644 --- a/dht/dht_test.go +++ b/dht/dht_test.go @@ -10,6 +10,8 @@ import ( log "github.com/sirupsen/logrus" ) +// TODO: make a dht with X nodes, have them all join, then ensure that every node appears at least once in another node's routing table + func TestNodeFinder_FindNodes(t *testing.T) { dhts := MakeTestDHT(3) defer func() { diff --git a/dht/routing_table.go b/dht/routing_table.go index b8a0979..da8fb3a 100644 --- a/dht/routing_table.go +++ b/dht/routing_table.go @@ -162,6 +162,7 @@ func bucketContents(b *list.List) string { } } +// Update inserts or refreshes a node func (rt *routingTable) Update(node Node) { rt.lock.Lock() defer rt.lock.Unlock() @@ -179,6 +180,18 @@ func (rt *routingTable) Update(node Node) { } } +// UpdateIfExists refreshes a node if its already in the routing table +func (rt *routingTable) UpdateIfExists(node Node) { + rt.lock.Lock() + defer rt.lock.Unlock() + bucketNum := bucketFor(rt.node.id, node.id) + bucket := rt.buckets[bucketNum] + element := findInList(bucket, node.id) + if element != nil { + bucket.MoveToBack(element) + } +} + func (rt *routingTable) RemoveByID(id Bitmap) { rt.lock.Lock() defer rt.lock.Unlock() diff --git a/dht/rpc.go b/dht/rpc.go index a025311..b0f463d 100644 --- a/dht/rpc.go +++ b/dht/rpc.go @@ -115,8 +115,12 @@ func handleRequest(dht *DHT, addr *net.UDPAddr, request Request) { } } + // nodes that send us requests should not be inserted, only refreshed. + // the routing table must only contain "good" nodes, which are nodes that reply to our requests + // if a node is already good (aka in the table), its fine to refresh it + // http://www.bittorrent.org/beps/bep_0005.html#routing-table node := Node{id: request.NodeID, ip: addr.IP, port: addr.Port} - dht.rt.Update(node) + dht.rt.UpdateIfExists(node) } func getFindResponse(dht *DHT, request Request) Response { @@ -147,7 +151,7 @@ func handleResponse(dht *DHT, addr *net.UDPAddr, response Response) { func handleError(dht *DHT, addr *net.UDPAddr, e Error) { spew.Dump(e) node := Node{id: e.NodeID, ip: addr.IP, port: addr.Port} - dht.rt.Update(node) + dht.rt.UpdateIfExists(node) } // send sends data to a udp address diff --git a/dht/transaction_manager.go b/dht/transaction_manager.go index 8e85905..4141bbf 100644 --- a/dht/transaction_manager.go +++ b/dht/transaction_manager.go @@ -52,6 +52,8 @@ func (tm *transactionManager) Find(id messageID, addr *net.UDPAddr) *transaction tm.lock.RLock() defer tm.lock.RUnlock() + // TODO: also check that the response's nodeid matches the id you thought you sent to? + t, ok := tm.transactions[id] if !ok || (addr != nil && t.node.Addr().String() != addr.String()) { return nil