add tcp port mapping to data store
Commit a98d10fbd5 (parent a3d0a3543a)
3 changed files with 24 additions and 9 deletions
@@ -318,7 +318,7 @@ func (dht *DHT) startReannouncer() {
 func (dht *DHT) storeOnNode(hash bits.Bitmap, c Contact) {
 	// self-store
 	if dht.contact.ID == c.ID {
-		dht.node.Store(hash, c)
+		dht.node.Store(hash, c, dht.conf.PeerProtocolPort)
 		return
 	}
 
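An aside for orientation, not part of the commit: the Contact's Port stays the UDP port used for DHT messages, while dht.conf.PeerProtocolPort is the locally configured TCP port this node serves blobs on, and it now travels into the data store alongside the contact. Below is a minimal standalone sketch of the self-store shortcut, with stand-in types (nodeID, localStore, selfStoreIfLocal are hypothetical, in place of bits.Bitmap and the package's Contact and store).

package sketch

// nodeID and localStore are hypothetical stand-ins for bits.Bitmap
// and the node's contact store.
type nodeID string

type localStore interface {
	Store(hash string, id nodeID, tcpPort int)
}

// selfStoreIfLocal mirrors the branch above: when the announce target is this
// node itself, skip the network round trip and record the hash locally,
// together with the TCP port peers will later dial for the blob.
func selfStoreIfLocal(store localStore, self, target nodeID, hash string, peerProtocolPort int) bool {
	if self != target {
		return false // not us; the caller sends a store request over UDP instead
	}
	store.Store(hash, self, peerProtocolPort)
	return true
}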
dht/node.go (15 lines changed)
@@ -236,8 +236,11 @@ func (n *Node) handleRequest(addr *net.UDPAddr, request Request) {
 		// TODO: we should be sending the IP in the request, not just using the sender's IP
 		// TODO: should we be using StoreArgs.NodeID or StoreArgs.Value.LbryID ???
 		if n.tokens.Verify(request.StoreArgs.Value.Token, request.NodeID, addr) {
-			n.Store(request.StoreArgs.BlobHash, Contact{ID: request.StoreArgs.NodeID, IP: addr.IP, Port: request.StoreArgs.Value.Port})
+			n.Store(
+				request.StoreArgs.BlobHash,
+				Contact{ID: request.StoreArgs.NodeID, IP: addr.IP, Port: addr.Port},
+				request.StoreArgs.Value.Port,
+			)
 			err := n.sendMessage(addr, Response{ID: request.ID, NodeID: n.id, Data: storeSuccessResponse})
 			if err != nil {
 				log.Error("error sending 'storemethod' response message - ", err)
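An illustrative aside on what this hunk changes: the stored Contact now takes its Port from the UDP sender address (addr.Port), and the TCP port advertised in StoreArgs.Value.Port is handed to Store separately so the data store can track it per node. A small self-contained sketch of that split, with simplified names (peerEntry and entryFromStoreRequest are hypothetical, not the package's types):

package sketch

import "net"

// peerEntry keeps the two ports apart the way the updated handler does: the
// UDP port comes from the packet's sender address, the TCP port from the
// store request payload (StoreArgs.Value.Port in the diff above).
type peerEntry struct {
	IP      net.IP
	UDPPort int // DHT messaging port, from *net.UDPAddr
	TCPPort int // blob transfer port, from the request
}

func entryFromStoreRequest(sender *net.UDPAddr, advertisedTCPPort int) peerEntry {
	return peerEntry{
		IP:      sender.IP,
		UDPPort: sender.Port,       // the old code put the advertised TCP port here
		TCPPort: advertisedTCPPort, // now tracked separately
	}
}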
@@ -276,9 +279,9 @@ func (n *Node) handleRequest(addr *net.UDPAddr, request Request) {
 
 		if contacts := n.store.Get(*request.Arg); len(contacts) > 0 {
 			res.FindValueKey = request.Arg.RawString()
-			res.Contacts = contacts
+			res.Contacts = contacts // we are returning stored contacts with tcp ports for file transfer
 		} else {
-			res.Contacts = n.rt.GetClosest(*request.Arg, bucketSize)
+			res.Contacts = n.rt.GetClosest(*request.Arg, bucketSize) // these are normal dht contacts with udp ports
 		}
 
 		err := n.sendMessage(addr, res)
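As the new inline comments spell out, a findValue hit returns contacts whose Port is the TCP port recorded at store time, while a miss falls back to routing-table contacts whose Port is the usual DHT UDP port. A hedged sketch of that branch with placeholder types (contact and findValueContacts are illustrative only):

package sketch

// contact is a stand-in for the package's Contact; what Port means depends on
// which branch produced it, as noted below.
type contact struct {
	ID   string
	Port int
}

// findValueContacts mirrors the branch above: stored holds contacts whose
// Port is the TCP peer port saved by Upsert, closest supplies routing-table
// contacts whose Port is the UDP port used to keep the DHT lookup going.
func findValueContacts(stored []contact, closest func() []contact) ([]contact, bool) {
	if len(stored) > 0 {
		return stored, true // TCP ports, usable for blob/file transfer
	}
	return closest(), false // UDP ports, for iterating the lookup
}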
@@ -464,6 +467,6 @@ func (n *Node) startRoutingTableGrooming() {
 }
 
 // Store stores a node contact in the node's contact store.
-func (n *Node) Store(hash bits.Bitmap, c Contact) {
-	n.store.Upsert(hash, c)
+func (n *Node) Store(hash bits.Bitmap, c Contact, tcpPort int) {
+	n.store.Upsert(hash, c, tcpPort)
 }
dht/store.go (16 lines changed)
@@ -4,6 +4,7 @@ import (
 	"sync"
 
 	"github.com/lbryio/reflector.go/dht/bits"
+	"net"
 )
 
 // TODO: expire stored data after tExpire time
@@ -11,6 +12,8 @@ import (
 type contactStore struct {
 	// map of blob hashes to (map of node IDs to bools)
 	hashes map[bits.Bitmap]map[bits.Bitmap]bool
+	// map of blob hashes to (map of node ids to tcp ports)
+	ports map[bits.Bitmap]map[bits.Bitmap]int
 	// stores the peers themselves, so they can be updated in one place
 	contacts map[bits.Bitmap]Contact
 	lock     sync.RWMutex
@@ -19,18 +22,23 @@ type contactStore struct {
 func newStore() *contactStore {
 	return &contactStore{
 		hashes:   make(map[bits.Bitmap]map[bits.Bitmap]bool),
+		ports:    make(map[bits.Bitmap]map[bits.Bitmap]int),
 		contacts: make(map[bits.Bitmap]Contact),
 	}
 }
 
-func (s *contactStore) Upsert(blobHash bits.Bitmap, contact Contact) {
+func (s *contactStore) Upsert(blobHash bits.Bitmap, contact Contact, tcpPort int) {
 	s.lock.Lock()
 	defer s.lock.Unlock()
 
 	if _, ok := s.hashes[blobHash]; !ok {
 		s.hashes[blobHash] = make(map[bits.Bitmap]bool)
 	}
+	if _, ok := s.ports[blobHash]; !ok {
+		s.ports[blobHash] = make(map[bits.Bitmap]int)
+	}
 	s.hashes[blobHash][contact.ID] = true
+	s.ports[blobHash][contact.ID] = tcpPort
 	s.contacts[contact.ID] = contact
 }
 
@@ -45,7 +53,11 @@ func (s *contactStore) Get(blobHash bits.Bitmap) []Contact {
 			if !ok {
 				panic("node id in IDs list, but not in nodeInfo")
 			}
-			contacts = append(contacts, contact)
+			peerPort, ok := s.ports[blobHash][id]
+			if !ok {
+				panic("node id in IDs list, but missing peer port")
+			}
+			contacts = append(contacts, Contact{ID: contact.ID, IP: contact.IP, Port: peerPort})
 		}
 	}
 	return contacts
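Taken together, the store.go changes keep a parallel hash to node to TCP-port map and substitute that port into the contacts returned for a blob. Below is a compact, self-contained sketch of the same mechanism, under stated assumptions: string hashes and IDs instead of bits.Bitmap, no locking, and sketch names (contactStoreSketch, newStoreSketch) rather than the package's API.

package main

import "fmt"

type contact struct {
	ID   string
	IP   string
	Port int
}

// contactStoreSketch mirrors the diff: hashes tracks which nodes announced a
// blob, ports tracks the TCP port each node announced it on, and contacts
// keeps the latest contact per node so it can be updated in one place.
type contactStoreSketch struct {
	hashes   map[string]map[string]bool
	ports    map[string]map[string]int
	contacts map[string]contact
}

func newStoreSketch() *contactStoreSketch {
	return &contactStoreSketch{
		hashes:   make(map[string]map[string]bool),
		ports:    make(map[string]map[string]int),
		contacts: make(map[string]contact),
	}
}

func (s *contactStoreSketch) Upsert(blobHash string, c contact, tcpPort int) {
	if _, ok := s.hashes[blobHash]; !ok {
		s.hashes[blobHash] = make(map[string]bool)
	}
	if _, ok := s.ports[blobHash]; !ok {
		s.ports[blobHash] = make(map[string]int)
	}
	s.hashes[blobHash][c.ID] = true
	s.ports[blobHash][c.ID] = tcpPort
	s.contacts[c.ID] = c
}

// Get returns contacts with their Port swapped for the recorded TCP port,
// which is what a findValue response hands back for file transfer.
func (s *contactStoreSketch) Get(blobHash string) []contact {
	var out []contact
	for id := range s.hashes[blobHash] {
		c := s.contacts[id]
		out = append(out, contact{ID: c.ID, IP: c.IP, Port: s.ports[blobHash][id]})
	}
	return out
}

func main() {
	s := newStoreSketch()
	// node "abc" speaks DHT on UDP 4444 but serves blobs on TCP 3333
	s.Upsert("blobhash1", contact{ID: "abc", IP: "1.2.3.4", Port: 4444}, 3333)
	fmt.Println(s.Get("blobhash1")) // [{abc 1.2.3.4 3333}]
}

The real implementation also guards these maps with a sync.RWMutex and panics if the hash, port, and contact maps fall out of sync, which the sketch leaves out for brevity.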