more work

This commit is contained in:
Alex Grintsvayg 2017-08-17 11:46:51 -04:00
parent c0290497de
commit 7f3e4460fd
4 changed files with 121 additions and 49 deletions

View file

@ -109,7 +109,8 @@ func New(config *Config) *DHT {
// init initializes global variables. // init initializes global variables.
func (dht *DHT) init() { func (dht *DHT) init() {
log.Info("Initializing DHT") log.Info("Initializing DHT on " + dht.Address)
log.Infof("Node ID is %s", dht.node.HexID())
listener, err := net.ListenPacket(dht.Network, dht.Address) listener, err := net.ListenPacket(dht.Network, dht.Address)
if err != nil { if err != nil {
panic(err) panic(err)
@ -133,8 +134,9 @@ func (dht *DHT) join() {
continue continue
} }
// NOTE: Temporary node has NO node id.
dht.transactionManager.findNode( dht.transactionManager.findNode(
&node{id: dht.node.id, addr: raddr}, &node{addr: raddr},
dht.node.id.RawString(), dht.node.id.RawString(),
) )
} }
@ -155,27 +157,21 @@ func (dht *DHT) listen() {
}() }()
} }
// id returns a id near to target if target is not null, otherwise it returns // FindNode returns peers who have announced having key.
// the dht's node id. func (dht *DHT) FindNode(key string) ([]*Peer, error) {
func (dht *DHT) id(target string) string {
return dht.node.id.RawString()
}
// GetPeers returns peers who have announced having infoHash.
func (dht *DHT) GetPeers(infoHash string) ([]*Peer, error) {
if !dht.Ready { if !dht.Ready {
return nil, errors.New("dht not ready") return nil, errors.New("dht not ready")
} }
if len(infoHash) == 40 { if len(key) == nodeIDLength*2 {
data, err := hex.DecodeString(infoHash) data, err := hex.DecodeString(key)
if err != nil { if err != nil {
return nil, err return nil, err
} }
infoHash = string(data) key = string(data)
} }
peers := dht.peersManager.GetPeers(infoHash, dht.K) peers := dht.peersManager.GetPeers(key, dht.K)
if len(peers) != 0 { if len(peers) != 0 {
return peers, nil return peers, nil
} }
@ -183,17 +179,16 @@ func (dht *DHT) GetPeers(infoHash string) ([]*Peer, error) {
ch := make(chan struct{}) ch := make(chan struct{})
go func() { go func() {
//neighbors := dht.routingTable.GetNeighbors( neighbors := dht.routingTable.GetNeighbors(newBitmapFromString(key), dht.K)
// newBitmapFromString(infoHash), dht.K)
// for _, no := range neighbors {
//for _, no := range neighbors { dht.transactionManager.findNode(no, key)
// dht.transactionManager.getPeers(no, infoHash) }
//}
i := 0 i := 0
for range time.Tick(time.Second * 1) { for range time.Tick(time.Second * 1) {
i++ i++
peers = dht.peersManager.GetPeers(infoHash, dht.K) peers = dht.peersManager.GetPeers(key, dht.K)
if len(peers) != 0 || i >= 30 { if len(peers) != 0 || i >= 30 {
break break
} }

View file

@ -65,20 +65,38 @@ func (r Request) Encode() ([]byte, error) {
}) })
} }
type findNodeDatum struct {
ID string
IP string
Port int
}
type Response struct { type Response struct {
ID string ID string
NodeID string NodeID string
Response string Data string
FindNodeData []findNodeDatum
} }
func (r Response) GetID() string { return r.ID } func (r Response) GetID() string { return r.ID }
func (r Response) Encode() ([]byte, error) { func (r Response) Encode() ([]byte, error) {
return bencode.EncodeBytes(map[string]interface{}{ data := map[string]interface{}{
headerTypeField: responseType, headerTypeField: responseType,
headerMessageIDField: r.ID, headerMessageIDField: r.ID,
headerNodeIDField: r.NodeID, headerNodeIDField: r.NodeID,
headerPayloadField: r.Response, }
}) if r.Data != "" {
data[headerPayloadField] = r.Data
} else {
var nodes []interface{}
for _, n := range r.FindNodeData {
nodes = append(nodes, []interface{}{n.ID, n.IP, n.Port})
}
data[headerPayloadField] = nodes
}
log.Info("Response data is ")
spew.Dump(data)
return bencode.EncodeBytes(data)
} }
type Error struct { type Error struct {
@ -379,7 +397,7 @@ func (tm *transactionManager) sendQuery(no *node, request Request) {
} }
request.ID = tm.genTransID() request.ID = tm.genTransID()
request.NodeID = tm.dht.id(no.id.RawString()) request.NodeID = tm.dht.node.id.RawString()
tm.queryChan <- &query{node: no, request: request} tm.queryChan <- &query{node: no, request: request}
} }
@ -433,10 +451,18 @@ func handle(dht *DHT, pkt packet) {
case responseType: case responseType:
response := Response{ response := Response{
ID: data[headerMessageIDField].(string), ID: data[headerMessageIDField].(string),
NodeID: data[headerNodeIDField].(string), NodeID: data[headerNodeIDField].(string),
Response: data[headerPayloadField].(string),
} }
if reflect.TypeOf(data[headerPayloadField]).Kind() == reflect.String {
response.Data = data[headerPayloadField].(string)
} else {
response.FindNodeData = getFindNodeResponse(data[headerPayloadField])
}
spew.Dump(response)
handleResponse(dht, pkt.raddr, response) handleResponse(dht, pkt.raddr, response)
case errorType: case errorType:
@ -455,6 +481,38 @@ func handle(dht *DHT, pkt packet) {
}() }()
} }
func getFindNodeResponse(i interface{}) (data []findNodeDatum) {
if reflect.TypeOf(i).Kind() != reflect.Slice {
return
}
v := reflect.ValueOf(i)
for i := 0; i < v.Len(); i++ {
if v.Index(i).Kind() != reflect.Interface {
continue
}
contact := v.Index(i).Elem()
if contact.Type().Kind() != reflect.Slice || contact.Len() != 3 {
continue
}
if contact.Index(0).Elem().Kind() != reflect.String ||
contact.Index(1).Elem().Kind() != reflect.String ||
!(contact.Index(2).Elem().Kind() == reflect.Int64 ||
contact.Index(2).Elem().Kind() == reflect.Int) {
continue
}
data = append(data, findNodeDatum{
ID: contact.Index(0).Elem().String(),
IP: contact.Index(1).Elem().String(),
Port: int(contact.Index(2).Elem().Int()),
})
}
return
}
func getArgs(argsInt interface{}) (args []string) { func getArgs(argsInt interface{}) (args []string) {
if reflect.TypeOf(argsInt).Kind() == reflect.Slice { if reflect.TypeOf(argsInt).Kind() == reflect.Slice {
v := reflect.ValueOf(argsInt) v := reflect.ValueOf(argsInt)
@ -484,7 +542,7 @@ func handleRequest(dht *DHT, addr *net.UDPAddr, request Request) (success bool)
switch request.Method { switch request.Method {
case pingMethod: case pingMethod:
send(dht, addr, Response{ID: request.ID, NodeID: dht.node.id.RawString(), Response: "pong"}) send(dht, addr, Response{ID: request.ID, NodeID: dht.node.id.RawString(), Data: "pong"})
case findNodeMethod: case findNodeMethod:
if len(request.Args) < 1 { if len(request.Args) < 1 {
send(dht, addr, Error{ID: request.ID, NodeID: dht.node.id.RawString(), Response: []string{"No target"}}) send(dht, addr, Error{ID: request.ID, NodeID: dht.node.id.RawString(), Response: []string{"No target"}})
@ -497,17 +555,20 @@ func handleRequest(dht *DHT, addr *net.UDPAddr, request Request) (success bool)
return return
} }
var nodes string nodes := []findNodeDatum{}
targetID := newBitmapFromString(target) targetID := newBitmapFromString(target)
no, _ := dht.routingTable.GetNodeKBucktByID(targetID) no, _ := dht.routingTable.GetNodeKBucktByID(targetID)
if no != nil { if no != nil {
nodes = no.CompactNodeInfo() nodes = []findNodeDatum{{ID: no.id.RawString(), IP: no.addr.IP.String(), Port: no.addr.Port}}
} else { } else {
nodes = strings.Join(dht.routingTable.GetNeighborCompactInfos(targetID, dht.K), "") neighbors := dht.routingTable.GetNeighbors(targetID, dht.K)
for _, n := range neighbors {
nodes = append(nodes, findNodeDatum{ID: n.id.RawString(), IP: n.addr.IP.String(), Port: n.addr.Port})
}
} }
send(dht, addr, Response{ID: request.ID, NodeID: dht.node.id.RawString(), Response: nodes}) send(dht, addr, Response{ID: request.ID, NodeID: dht.node.id.RawString(), FindNodeData: nodes})
default: default:
// send(dht, addr, makeError(t, protocolError, "invalid q")) // send(dht, addr, makeError(t, protocolError, "invalid q"))
@ -522,15 +583,13 @@ func handleRequest(dht *DHT, addr *net.UDPAddr, request Request) (success bool)
// findOn puts nodes in the response to the routingTable, then if target is in // findOn puts nodes in the response to the routingTable, then if target is in
// the nodes or all nodes are in the routingTable, it stops. Otherwise it // the nodes or all nodes are in the routingTable, it stops. Otherwise it
// continues to findNode or getPeers. // continues to findNode or getPeers.
func findOn(dht *DHT, nodes string, target *bitmap, queryType string) error { func findOn(dht *DHT, nodes []findNodeDatum, target *bitmap, queryType string) error {
if len(nodes)%compactNodeInfoLength != 0 {
return fmt.Errorf("the length of nodes should can be divided by %d", compactNodeInfoLength)
}
hasNew, found := false, false hasNew, found := false, false
for i := 0; i < len(nodes)/compactNodeInfoLength; i++ { for _, n := range nodes {
no, _ := newNodeFromCompactInfo( no, err := newNode(n.ID, dht.Network, fmt.Sprintf("%s:%d", n.IP, n.Port))
string(nodes[i*compactNodeInfoLength:(i+1)*compactNodeInfoLength]), dht.Network) if err != nil {
return err
}
if no.id.RawString() == target.RawString() { if no.id.RawString() == target.RawString() {
found = true found = true
@ -581,7 +640,7 @@ func handleResponse(dht *DHT, addr *net.UDPAddr, response Response) (success boo
case pingMethod: case pingMethod:
case findNodeMethod: case findNodeMethod:
target := trans.request.Args[0] target := trans.request.Args[0]
if findOn(dht, response.Response, newBitmapFromString(target), findNodeMethod) != nil { if findOn(dht, response.FindNodeData, newBitmapFromString(target), findNodeMethod) != nil {
return return
} }
default: default:

View file

@ -2,7 +2,9 @@ package dht
import ( import (
"container/heap" "container/heap"
"encoding/hex"
"fmt" "fmt"
log "github.com/sirupsen/logrus"
"net" "net"
"strings" "strings"
"sync" "sync"
@ -63,6 +65,13 @@ func (node *node) CompactNodeInfo() string {
}, "") }, "")
} }
func (node *node) HexID() string {
if node.id == nil {
return ""
}
return hex.EncodeToString([]byte(node.id.RawString()))
}
// Peer represents a peer contact. // Peer represents a peer contact.
type Peer struct { type Peer struct {
IP net.IP IP net.IP
@ -359,6 +368,8 @@ func (rt *routingTable) Insert(nd *node) bool {
rt.Lock() rt.Lock()
defer rt.Unlock() defer rt.Unlock()
log.Infof("Adding node to routing table: %s (%s:%d)", nd.id.RawString(), nd.addr.IP, nd.addr.Port)
var ( var (
next *routingTableNode next *routingTableNode
bucket *kbucket bucket *kbucket

15
main.go
View file

@ -2,15 +2,20 @@ package main
import ( import (
"fmt" "fmt"
"time"
"github.com/lbryio/lbry.go/dht" "github.com/lbryio/lbry.go/dht"
log "github.com/sirupsen/logrus" log "github.com/sirupsen/logrus"
"math/rand"
"strconv"
"time"
) )
func main() { func main() {
rand.Seed(time.Now().UnixNano())
port := 49449 // + (rand.Int() % 10)
config := dht.NewStandardConfig() config := dht.NewStandardConfig()
config.Address = ":49449" // dont pollute real port config.Address = "127.0.0.1:" + strconv.Itoa(port)
config.PrimeNodes = []string{ config.PrimeNodes = []string{
"127.0.0.1:10001", "127.0.0.1:10001",
} }
@ -22,7 +27,7 @@ func main() {
time.Sleep(5 * time.Second) time.Sleep(5 * time.Second)
for { for {
peers, err := d.GetPeers("012b66fc7052d9a0c8cb563b8ede7662003ba65f425c2661b5c6919d445deeb31469be8b842d6faeea3f2b3ebcaec845") peers, err := d.FindNode("012b66fc7052d9a0c8cb563b8ede7662003ba65f425c2661b5c6919d445deeb31469be8b842d6faeea3f2b3ebcaec845")
if err != nil { if err != nil {
time.Sleep(time.Second * 1) time.Sleep(time.Second * 1)
continue continue
@ -31,4 +36,6 @@ func main() {
fmt.Println("Found peers:", peers) fmt.Println("Found peers:", peers)
break break
} }
time.Sleep(1 * time.Hour)
} }