diff --git a/dht/dht.go b/dht/dht.go index f8e328c..ee757d0 100644 --- a/dht/dht.go +++ b/dht/dht.go @@ -318,7 +318,7 @@ func (dht *DHT) startReannouncer() { func (dht *DHT) storeOnNode(hash bits.Bitmap, c Contact) { // self-store if dht.contact.ID == c.ID { - dht.node.Store(hash, c) + dht.node.Store(hash, c, dht.conf.PeerProtocolPort) return } diff --git a/dht/node.go b/dht/node.go index 0c8f95a..c1577af 100644 --- a/dht/node.go +++ b/dht/node.go @@ -236,8 +236,11 @@ func (n *Node) handleRequest(addr *net.UDPAddr, request Request) { // TODO: we should be sending the IP in the request, not just using the sender's IP // TODO: should we be using StoreArgs.NodeID or StoreArgs.Value.LbryID ??? if n.tokens.Verify(request.StoreArgs.Value.Token, request.NodeID, addr) { - n.Store(request.StoreArgs.BlobHash, Contact{ID: request.StoreArgs.NodeID, IP: addr.IP, Port: request.StoreArgs.Value.Port}) - + n.Store( + request.StoreArgs.BlobHash, + Contact{ID: request.StoreArgs.NodeID, IP: addr.IP, Port: addr.Port}, + request.StoreArgs.Value.Port, + ) err := n.sendMessage(addr, Response{ID: request.ID, NodeID: n.id, Data: storeSuccessResponse}) if err != nil { log.Error("error sending 'storemethod' response message - ", err) @@ -276,9 +279,9 @@ func (n *Node) handleRequest(addr *net.UDPAddr, request Request) { if contacts := n.store.Get(*request.Arg); len(contacts) > 0 { res.FindValueKey = request.Arg.RawString() - res.Contacts = contacts + res.Contacts = contacts // we are returning stored contacts with tcp ports for file transfer } else { - res.Contacts = n.rt.GetClosest(*request.Arg, bucketSize) + res.Contacts = n.rt.GetClosest(*request.Arg, bucketSize) // these are normal dht contacts with udp ports } err := n.sendMessage(addr, res) @@ -464,6 +467,6 @@ func (n *Node) startRoutingTableGrooming() { } // Store stores a node contact in the node's contact store. -func (n *Node) Store(hash bits.Bitmap, c Contact) { - n.store.Upsert(hash, c) +func (n *Node) Store(hash bits.Bitmap, c Contact, tcpPort int) { + n.store.Upsert(hash, c, tcpPort) } diff --git a/dht/store.go b/dht/store.go index bc77e53..55f444b 100644 --- a/dht/store.go +++ b/dht/store.go @@ -4,6 +4,7 @@ import ( "sync" "github.com/lbryio/reflector.go/dht/bits" + "net" ) // TODO: expire stored data after tExpire time @@ -11,6 +12,8 @@ import ( type contactStore struct { // map of blob hashes to (map of node IDs to bools) hashes map[bits.Bitmap]map[bits.Bitmap]bool + // map of blob hashes to (map of node ids to tcp ports) + ports map[bits.Bitmap]map[bits.Bitmap]int // stores the peers themselves, so they can be updated in one place contacts map[bits.Bitmap]Contact lock sync.RWMutex @@ -19,18 +22,23 @@ type contactStore struct { func newStore() *contactStore { return &contactStore{ hashes: make(map[bits.Bitmap]map[bits.Bitmap]bool), + ports: make(map[bits.Bitmap]map[bits.Bitmap]int), contacts: make(map[bits.Bitmap]Contact), } } -func (s *contactStore) Upsert(blobHash bits.Bitmap, contact Contact) { +func (s *contactStore) Upsert(blobHash bits.Bitmap, contact Contact, tcpPort int) { s.lock.Lock() defer s.lock.Unlock() if _, ok := s.hashes[blobHash]; !ok { s.hashes[blobHash] = make(map[bits.Bitmap]bool) } + if _, ok := s.ports[blobHash]; !ok { + s.ports[blobHash] = make(map[bits.Bitmap]int) + } s.hashes[blobHash][contact.ID] = true + s.ports[blobHash][contact.ID] = tcpPort s.contacts[contact.ID] = contact } @@ -45,7 +53,11 @@ func (s *contactStore) Get(blobHash bits.Bitmap) []Contact { if !ok { panic("node id in IDs list, but not in nodeInfo") } - contacts = append(contacts, contact) + peerPort, ok := s.ports[blobHash][id] + if !ok { + panic("node id in IDs list, but missing peer port") + } + contacts = append(contacts, Contact{ID: contact.ID, IP: contact.IP, Port: peerPort}) } } return contacts