minor refactor

This commit is contained in:
Alex Grintsvayg 2018-05-01 16:18:38 -04:00
parent 079a6bf610
commit 1f7841e4d0
8 changed files with 98 additions and 91 deletions

View file

@ -4,7 +4,6 @@ import (
"context" "context"
"fmt" "fmt"
"net" "net"
"strconv"
"strings" "strings"
"sync" "sync"
"time" "time"
@ -21,26 +20,31 @@ func init() {
//log.SetLevel(log.DebugLevel) //log.SetLevel(log.DebugLevel)
} }
const network = "udp4" const (
network = "udp4"
const alpha = 3 // this is the constant alpha in the spec // TODO: all these constants should be defaults, and should be used to set values in the standard Config. then the code should use values in the config
const nodeIDLength = 48 // bytes. this is the constant B in the spec // TODO: alternatively, have a global Config for constants. at least that way tests can modify the values
const messageIDLength = 20 // bytes. alpha = 3 // this is the constant alpha in the spec
const bucketSize = 8 // this is the constant k in the spec bucketSize = 8 // this is the constant k in the spec
nodeIDLength = 48 // bytes. this is the constant B in the spec
messageIDLength = 20 // bytes.
const udpRetry = 3 udpRetry = 3
const udpTimeout = 10 * time.Second udpTimeout = 5 * time.Second
const udpMaxMessageLength = 1024 // I think our longest message is ~676 bytes, so I rounded up udpMaxMessageLength = 1024 // bytes. I think our longest message is ~676 bytes, so I rounded up
const tExpire = 86400 * time.Second // the time after which a key/value pair expires; this is a time-to-live (TTL) from the original publication date tExpire = 24 * time.Hour // the time after which a key/value pair expires; this is a time-to-live (TTL) from the original publication date
const tRefresh = 3600 * time.Second // the time after which an otherwise unaccessed bucket must be refreshed tRefresh = 1 * time.Hour // the time after which an otherwise unaccessed bucket must be refreshed
const tReplicate = 3600 * time.Second // the interval between Kademlia replication events, when a node is required to publish its entire database tReplicate = 1 * time.Hour // the interval between Kademlia replication events, when a node is required to publish its entire database
const tRepublish = 86400 * time.Second // the time after which the original publisher must republish a key/value pair tRepublish = 24 * time.Hour // the time after which the original publisher must republish a key/value pair
tNodeRefresh = 15 * time.Minute // the time after which a good node becomes questionable if it has not messaged us
const numBuckets = nodeIDLength * 8 numBuckets = nodeIDLength * 8
const compactNodeInfoLength = nodeIDLength + 6 compactNodeInfoLength = nodeIDLength + 6 // nodeID + 4 for IP + 2 for port
const tokenSecretRotationInterval = 5 * time.Minute // how often the token-generating secret is rotated tokenSecretRotationInterval = 5 * time.Minute // how often the token-generating secret is rotated
)
// Config represents the configure of dht. // Config represents the configure of dht.
type Config struct { type Config struct {
@ -268,34 +272,6 @@ func printNodeList(list []Contact) {
} }
} }
func MakeTestDHT(numNodes int) []*DHT {
if numNodes < 1 {
return nil
}
ip := "127.0.0.1"
firstPort := 21000
dhts := make([]*DHT, numNodes)
for i := 0; i < numNodes; i++ {
seeds := []string{}
if i > 0 {
seeds = []string{ip + ":" + strconv.Itoa(firstPort)}
}
dht, err := New(&Config{Address: ip + ":" + strconv.Itoa(firstPort+i), NodeID: RandomBitmapP().Hex(), SeedNodes: seeds})
if err != nil {
panic(err)
}
go dht.Start()
dht.WaitUntilJoined()
dhts[i] = dht
}
return dhts
}
func getContact(nodeID, addr string) (Contact, error) { func getContact(nodeID, addr string) (Contact, error) {
var c Contact var c Contact
if nodeID == "" { if nodeID == "" {

View file

@ -13,7 +13,7 @@ import (
// TODO: make a dht with X nodes, have them all join, then ensure that every node appears at least once in another node's routing table // TODO: make a dht with X nodes, have them all join, then ensure that every node appears at least once in another node's routing table
func TestNodeFinder_FindNodes(t *testing.T) { func TestNodeFinder_FindNodes(t *testing.T) {
dhts := MakeTestDHT(3) dhts := TestingCreateDHT(3)
defer func() { defer func() {
for i := range dhts { for i := range dhts {
dhts[i].Shutdown() dhts[i].Shutdown()
@ -56,7 +56,7 @@ func TestNodeFinder_FindNodes(t *testing.T) {
} }
func TestNodeFinder_FindValue(t *testing.T) { func TestNodeFinder_FindValue(t *testing.T) {
dhts := MakeTestDHT(3) dhts := TestingCreateDHT(3)
defer func() { defer func() {
for i := range dhts { for i := range dhts {
dhts[i].Shutdown() dhts[i].Shutdown()
@ -91,7 +91,7 @@ func TestDHT_LargeDHT(t *testing.T) {
rand.Seed(time.Now().UnixNano()) rand.Seed(time.Now().UnixNano())
log.Println("if this takes longer than 20 seconds, its stuck. idk why it gets stuck sometimes, but its a bug.") log.Println("if this takes longer than 20 seconds, its stuck. idk why it gets stuck sometimes, but its a bug.")
nodes := 100 nodes := 100
dhts := MakeTestDHT(nodes) dhts := TestingCreateDHT(nodes)
defer func() { defer func() {
for _, d := range dhts { for _, d := range dhts {
go d.Shutdown() go d.Shutdown()

View file

@ -313,7 +313,7 @@ func (r *Response) UnmarshalBencode(b []byte) error {
return nil return nil
} }
// maybe data is a list of nodes (response to findNode)? // maybe data is a list of contacts (response to findNode)?
err = bencode.DecodeBytes(raw.Data, &r.Contacts) err = bencode.DecodeBytes(raw.Data, &r.Contacts)
if err == nil { if err == nil {
return nil return nil

View file

@ -33,13 +33,14 @@ type UDPConn interface {
} }
type Node struct { type Node struct {
// TODO: replace Contact with id. ip and port aren't used except when connecting // the node's id
id Bitmap id Bitmap
// UDP connection for sending and receiving data // UDP connection for sending and receiving data
conn UDPConn conn UDPConn
// token manager // token manager
tokens *tokenManager tokens *tokenManager
// map of outstanding transactions + mutex
txLock *sync.RWMutex txLock *sync.RWMutex
transactions map[messageID]*transaction transactions map[messageID]*transaction
@ -215,22 +216,22 @@ func (n *Node) handleRequest(addr *net.UDPAddr, request Request) {
log.Errorln("invalid request method") log.Errorln("invalid request method")
return return
case pingMethod: case pingMethod:
n.send(addr, Response{ID: request.ID, NodeID: n.id, Data: pingSuccessResponse}) n.sendMessage(addr, Response{ID: request.ID, NodeID: n.id, Data: pingSuccessResponse})
case storeMethod: case storeMethod:
// TODO: we should be sending the IP in the request, not just using the sender's IP // TODO: we should be sending the IP in the request, not just using the sender's IP
// TODO: should we be using StoreArgs.NodeID or StoreArgs.Value.LbryID ??? // TODO: should we be using StoreArgs.NodeID or StoreArgs.Value.LbryID ???
if n.tokens.Verify(request.StoreArgs.Value.Token, request.NodeID, addr) { if n.tokens.Verify(request.StoreArgs.Value.Token, request.NodeID, addr) {
n.store.Upsert(request.StoreArgs.BlobHash, Contact{id: request.StoreArgs.NodeID, ip: addr.IP, port: request.StoreArgs.Value.Port}) n.store.Upsert(request.StoreArgs.BlobHash, Contact{id: request.StoreArgs.NodeID, ip: addr.IP, port: request.StoreArgs.Value.Port})
n.send(addr, Response{ID: request.ID, NodeID: n.id, Data: storeSuccessResponse}) n.sendMessage(addr, Response{ID: request.ID, NodeID: n.id, Data: storeSuccessResponse})
} else { } else {
n.send(addr, Error{ID: request.ID, NodeID: n.id, ExceptionType: "invalid-token"}) n.sendMessage(addr, Error{ID: request.ID, NodeID: n.id, ExceptionType: "invalid-token"})
} }
case findNodeMethod: case findNodeMethod:
if request.Arg == nil { if request.Arg == nil {
log.Errorln("request is missing arg") log.Errorln("request is missing arg")
return return
} }
n.send(addr, Response{ n.sendMessage(addr, Response{
ID: request.ID, ID: request.ID,
NodeID: n.id, NodeID: n.id,
Contacts: n.rt.GetClosest(*request.Arg, bucketSize), Contacts: n.rt.GetClosest(*request.Arg, bucketSize),
@ -255,7 +256,7 @@ func (n *Node) handleRequest(addr *net.UDPAddr, request Request) {
res.Contacts = n.rt.GetClosest(*request.Arg, bucketSize) res.Contacts = n.rt.GetClosest(*request.Arg, bucketSize)
} }
n.send(addr, res) n.sendMessage(addr, res)
} }
// nodes that send us requests should not be inserted, only refreshed. // nodes that send us requests should not be inserted, only refreshed.
@ -282,7 +283,7 @@ func (n *Node) handleError(addr *net.UDPAddr, e Error) {
} }
// send sends data to a udp address // send sends data to a udp address
func (n *Node) send(addr *net.UDPAddr, data Message) error { func (n *Node) sendMessage(addr *net.UDPAddr, data Message) error {
encoded, err := bencode.EncodeBytes(data) encoded, err := bencode.EncodeBytes(data)
if err != nil { if err != nil {
return errors.Err(err) return errors.Err(err)
@ -365,11 +366,11 @@ func (n *Node) SendAsync(ctx context.Context, contact Contact, req Request) <-ch
defer n.txDelete(tx.req.ID) defer n.txDelete(tx.req.ID)
for i := 0; i < udpRetry; i++ { for i := 0; i < udpRetry; i++ {
if err := n.send(contact.Addr(), tx.req); err != nil { if err := n.sendMessage(contact.Addr(), tx.req); err != nil {
if !strings.Contains(err.Error(), "use of closed network connection") { // this only happens on localhost. real UDP has no connections if !strings.Contains(err.Error(), "use of closed network connection") { // this only happens on localhost. real UDP has no connections
log.Error("send error: ", err) log.Error("send error: ", err)
} }
continue // try again? return? continue
} }
select { select {
@ -383,7 +384,7 @@ func (n *Node) SendAsync(ctx context.Context, contact Contact, req Request) <-ch
} }
// if request timed out each time // if request timed out each time
n.rt.RemoveByID(tx.contact.id) n.rt.Remove(tx.contact.id)
}() }()
return ch return ch

View file

@ -23,6 +23,8 @@ func (t timeoutErr) Temporary() bool {
return true return true
} }
// TODO: just use a normal net.Conn instead of this mock conn
type testUDPPacket struct { type testUDPPacket struct {
data []byte data []byte
addr *net.UDPAddr addr *net.UDPAddr

View file

@ -165,7 +165,7 @@ func bucketContents(b *list.List) string {
func (rt *routingTable) Update(c Contact) { func (rt *routingTable) Update(c Contact) {
rt.lock.Lock() rt.lock.Lock()
defer rt.lock.Unlock() defer rt.lock.Unlock()
bucketNum := bucketFor(rt.id, c.id) bucketNum := rt.bucketFor(c.id)
bucket := rt.buckets[bucketNum] bucket := rt.buckets[bucketNum]
element := findInList(bucket, c.id) element := findInList(bucket, c.id)
if element == nil { if element == nil {
@ -183,7 +183,7 @@ func (rt *routingTable) Update(c Contact) {
func (rt *routingTable) UpdateIfExists(c Contact) { func (rt *routingTable) UpdateIfExists(c Contact) {
rt.lock.Lock() rt.lock.Lock()
defer rt.lock.Unlock() defer rt.lock.Unlock()
bucketNum := bucketFor(rt.id, c.id) bucketNum := rt.bucketFor(c.id)
bucket := rt.buckets[bucketNum] bucket := rt.buckets[bucketNum]
element := findInList(bucket, c.id) element := findInList(bucket, c.id)
if element != nil { if element != nil {
@ -191,10 +191,10 @@ func (rt *routingTable) UpdateIfExists(c Contact) {
} }
} }
func (rt *routingTable) RemoveByID(id Bitmap) { func (rt *routingTable) Remove(id Bitmap) {
rt.lock.Lock() rt.lock.Lock()
defer rt.lock.Unlock() defer rt.lock.Unlock()
bucketNum := bucketFor(rt.id, id) bucketNum := rt.bucketFor(id)
bucket := rt.buckets[bucketNum] bucket := rt.buckets[bucketNum]
element := findInList(bucket, rt.id) element := findInList(bucket, rt.id)
if element != nil { if element != nil {
@ -212,7 +212,7 @@ func (rt *routingTable) GetClosest(target Bitmap, limit int) []Contact {
if rt.id.Equals(target) { if rt.id.Equals(target) {
bucketNum = 0 bucketNum = 0
} else { } else {
bucketNum = bucketFor(rt.id, target) bucketNum = rt.bucketFor(target)
} }
bucket := rt.buckets[bucketNum] bucket := rt.buckets[bucketNum]
@ -242,6 +242,14 @@ func (rt *routingTable) GetClosest(target Bitmap, limit int) []Contact {
return contacts return contacts
} }
func appendContacts(contacts []sortedContact, start *list.Element, target Bitmap) []sortedContact {
for curr := start; curr != nil; curr = curr.Next() {
c := toContact(curr)
contacts = append(contacts, sortedContact{c, c.id.Xor(target)})
}
return contacts
}
// Count returns the number of contacts in the routing table // Count returns the number of contacts in the routing table
func (rt *routingTable) Count() int { func (rt *routingTable) Count() int {
rt.lock.RLock() rt.lock.RLock()
@ -255,26 +263,22 @@ func (rt *routingTable) Count() int {
return count return count
} }
func (rt *routingTable) bucketFor(target Bitmap) int {
if rt.id.Equals(target) {
panic("routing table does not have a bucket for its own id")
}
return numBuckets - 1 - target.Xor(rt.id).PrefixLen()
}
func findInList(bucket *list.List, value Bitmap) *list.Element { func findInList(bucket *list.List, value Bitmap) *list.Element {
for curr := bucket.Front(); curr != nil; curr = curr.Next() { for curr := bucket.Front(); curr != nil; curr = curr.Next() {
if curr.Value.(Contact).id.Equals(value) { if toContact(curr).id.Equals(value) {
return curr return curr
} }
} }
return nil return nil
} }
func appendContacts(contacts []sortedContact, start *list.Element, target Bitmap) []sortedContact { func toContact(el *list.Element) Contact {
for curr := start; curr != nil; curr = curr.Next() { return el.Value.(Contact)
c := curr.Value.(Contact)
contacts = append(contacts, sortedContact{c, c.id.Xor(target)})
}
return contacts
}
func bucketFor(id Bitmap, target Bitmap) int {
if id.Equals(target) {
panic("routing table does not have a bucket for its own id")
}
return numBuckets - 1 - target.Xor(id).PrefixLen()
} }

View file

@ -2,25 +2,18 @@ package dht
import "sync" import "sync"
type peer struct {
contact Contact
//<lastPublished>,
//<originallyPublished>
// <originalPublisherID>
}
type peerStore struct { type peerStore struct {
// map of blob hashes to (map of node IDs to bools) // map of blob hashes to (map of node IDs to bools)
hashes map[Bitmap]map[Bitmap]bool hashes map[Bitmap]map[Bitmap]bool
// stores the peers themselves, so they can be updated in one place // stores the peers themselves, so they can be updated in one place
peers map[Bitmap]peer contacts map[Bitmap]Contact
lock sync.RWMutex lock sync.RWMutex
} }
func newPeerStore() *peerStore { func newPeerStore() *peerStore {
return &peerStore{ return &peerStore{
hashes: make(map[Bitmap]map[Bitmap]bool), hashes: make(map[Bitmap]map[Bitmap]bool),
peers: make(map[Bitmap]peer), contacts: make(map[Bitmap]Contact),
} }
} }
@ -32,7 +25,7 @@ func (s *peerStore) Upsert(blobHash Bitmap, contact Contact) {
s.hashes[blobHash] = make(map[Bitmap]bool) s.hashes[blobHash] = make(map[Bitmap]bool)
} }
s.hashes[blobHash][contact.id] = true s.hashes[blobHash][contact.id] = true
s.peers[contact.id] = peer{contact: contact} s.contacts[contact.id] = contact
} }
func (s *peerStore) Get(blobHash Bitmap) []Contact { func (s *peerStore) Get(blobHash Bitmap) []Contact {
@ -42,11 +35,11 @@ func (s *peerStore) Get(blobHash Bitmap) []Contact {
var contacts []Contact var contacts []Contact
if ids, ok := s.hashes[blobHash]; ok { if ids, ok := s.hashes[blobHash]; ok {
for id := range ids { for id := range ids {
peer, ok := s.peers[id] contact, ok := s.contacts[id]
if !ok { if !ok {
panic("node id in IDs list, but not in nodeInfo") panic("node id in IDs list, but not in nodeInfo")
} }
contacts = append(contacts, peer.contact) contacts = append(contacts, contact)
} }
} }
return contacts return contacts

31
dht/testing.go Normal file
View file

@ -0,0 +1,31 @@
package dht
import "strconv"
func TestingCreateDHT(numNodes int) []*DHT {
if numNodes < 1 {
return nil
}
ip := "127.0.0.1"
firstPort := 21000
dhts := make([]*DHT, numNodes)
for i := 0; i < numNodes; i++ {
seeds := []string{}
if i > 0 {
seeds = []string{ip + ":" + strconv.Itoa(firstPort)}
}
dht, err := New(&Config{Address: ip + ":" + strconv.Itoa(firstPort+i), NodeID: RandomBitmapP().Hex(), SeedNodes: seeds})
if err != nil {
panic(err)
}
go dht.Start()
dht.WaitUntilJoined()
dhts[i] = dht
}
return dhts
}