From 1f7841e4d00c92db7d601151bfedf86219d6e68e Mon Sep 17 00:00:00 2001 From: Alex Grintsvayg Date: Tue, 1 May 2018 16:18:38 -0400 Subject: [PATCH] minor refactor --- dht/dht.go | 64 ++++++++++++++------------------------------ dht/dht_test.go | 6 ++--- dht/message.go | 2 +- dht/node.go | 21 ++++++++------- dht/node_test.go | 2 ++ dht/routing_table.go | 42 ++++++++++++++++------------- dht/store.go | 21 +++++---------- dht/testing.go | 31 +++++++++++++++++++++ 8 files changed, 98 insertions(+), 91 deletions(-) create mode 100644 dht/testing.go diff --git a/dht/dht.go b/dht/dht.go index aef320d..9561fe3 100644 --- a/dht/dht.go +++ b/dht/dht.go @@ -4,7 +4,6 @@ import ( "context" "fmt" "net" - "strconv" "strings" "sync" "time" @@ -21,26 +20,31 @@ func init() { //log.SetLevel(log.DebugLevel) } -const network = "udp4" +const ( + network = "udp4" -const alpha = 3 // this is the constant alpha in the spec -const nodeIDLength = 48 // bytes. this is the constant B in the spec -const messageIDLength = 20 // bytes. -const bucketSize = 8 // this is the constant k in the spec + // TODO: all these constants should be defaults, and should be used to set values in the standard Config. then the code should use values in the config + // TODO: alternatively, have a global Config for constants. at least that way tests can modify the values + alpha = 3 // this is the constant alpha in the spec + bucketSize = 8 // this is the constant k in the spec + nodeIDLength = 48 // bytes. this is the constant B in the spec + messageIDLength = 20 // bytes. -const udpRetry = 3 -const udpTimeout = 10 * time.Second -const udpMaxMessageLength = 1024 // I think our longest message is ~676 bytes, so I rounded up + udpRetry = 3 + udpTimeout = 5 * time.Second + udpMaxMessageLength = 1024 // bytes. I think our longest message is ~676 bytes, so I rounded up -const tExpire = 86400 * time.Second // the time after which a key/value pair expires; this is a time-to-live (TTL) from the original publication date -const tRefresh = 3600 * time.Second // the time after which an otherwise unaccessed bucket must be refreshed -const tReplicate = 3600 * time.Second // the interval between Kademlia replication events, when a node is required to publish its entire database -const tRepublish = 86400 * time.Second // the time after which the original publisher must republish a key/value pair + tExpire = 24 * time.Hour // the time after which a key/value pair expires; this is a time-to-live (TTL) from the original publication date + tRefresh = 1 * time.Hour // the time after which an otherwise unaccessed bucket must be refreshed + tReplicate = 1 * time.Hour // the interval between Kademlia replication events, when a node is required to publish its entire database + tRepublish = 24 * time.Hour // the time after which the original publisher must republish a key/value pair + tNodeRefresh = 15 * time.Minute // the time after which a good node becomes questionable if it has not messaged us -const numBuckets = nodeIDLength * 8 -const compactNodeInfoLength = nodeIDLength + 6 + numBuckets = nodeIDLength * 8 + compactNodeInfoLength = nodeIDLength + 6 // nodeID + 4 for IP + 2 for port -const tokenSecretRotationInterval = 5 * time.Minute // how often the token-generating secret is rotated + tokenSecretRotationInterval = 5 * time.Minute // how often the token-generating secret is rotated +) // Config represents the configure of dht. type Config struct { @@ -268,34 +272,6 @@ func printNodeList(list []Contact) { } } -func MakeTestDHT(numNodes int) []*DHT { - if numNodes < 1 { - return nil - } - - ip := "127.0.0.1" - firstPort := 21000 - dhts := make([]*DHT, numNodes) - - for i := 0; i < numNodes; i++ { - seeds := []string{} - if i > 0 { - seeds = []string{ip + ":" + strconv.Itoa(firstPort)} - } - - dht, err := New(&Config{Address: ip + ":" + strconv.Itoa(firstPort+i), NodeID: RandomBitmapP().Hex(), SeedNodes: seeds}) - if err != nil { - panic(err) - } - - go dht.Start() - dht.WaitUntilJoined() - dhts[i] = dht - } - - return dhts -} - func getContact(nodeID, addr string) (Contact, error) { var c Contact if nodeID == "" { diff --git a/dht/dht_test.go b/dht/dht_test.go index 7892a58..12c4425 100644 --- a/dht/dht_test.go +++ b/dht/dht_test.go @@ -13,7 +13,7 @@ import ( // TODO: make a dht with X nodes, have them all join, then ensure that every node appears at least once in another node's routing table func TestNodeFinder_FindNodes(t *testing.T) { - dhts := MakeTestDHT(3) + dhts := TestingCreateDHT(3) defer func() { for i := range dhts { dhts[i].Shutdown() @@ -56,7 +56,7 @@ func TestNodeFinder_FindNodes(t *testing.T) { } func TestNodeFinder_FindValue(t *testing.T) { - dhts := MakeTestDHT(3) + dhts := TestingCreateDHT(3) defer func() { for i := range dhts { dhts[i].Shutdown() @@ -91,7 +91,7 @@ func TestDHT_LargeDHT(t *testing.T) { rand.Seed(time.Now().UnixNano()) log.Println("if this takes longer than 20 seconds, its stuck. idk why it gets stuck sometimes, but its a bug.") nodes := 100 - dhts := MakeTestDHT(nodes) + dhts := TestingCreateDHT(nodes) defer func() { for _, d := range dhts { go d.Shutdown() diff --git a/dht/message.go b/dht/message.go index baefa34..edb81d2 100644 --- a/dht/message.go +++ b/dht/message.go @@ -313,7 +313,7 @@ func (r *Response) UnmarshalBencode(b []byte) error { return nil } - // maybe data is a list of nodes (response to findNode)? + // maybe data is a list of contacts (response to findNode)? err = bencode.DecodeBytes(raw.Data, &r.Contacts) if err == nil { return nil diff --git a/dht/node.go b/dht/node.go index 0fd4bc5..0bdef9e 100644 --- a/dht/node.go +++ b/dht/node.go @@ -33,13 +33,14 @@ type UDPConn interface { } type Node struct { - // TODO: replace Contact with id. ip and port aren't used except when connecting + // the node's id id Bitmap // UDP connection for sending and receiving data conn UDPConn // token manager tokens *tokenManager + // map of outstanding transactions + mutex txLock *sync.RWMutex transactions map[messageID]*transaction @@ -215,22 +216,22 @@ func (n *Node) handleRequest(addr *net.UDPAddr, request Request) { log.Errorln("invalid request method") return case pingMethod: - n.send(addr, Response{ID: request.ID, NodeID: n.id, Data: pingSuccessResponse}) + n.sendMessage(addr, Response{ID: request.ID, NodeID: n.id, Data: pingSuccessResponse}) case storeMethod: // TODO: we should be sending the IP in the request, not just using the sender's IP // TODO: should we be using StoreArgs.NodeID or StoreArgs.Value.LbryID ??? if n.tokens.Verify(request.StoreArgs.Value.Token, request.NodeID, addr) { n.store.Upsert(request.StoreArgs.BlobHash, Contact{id: request.StoreArgs.NodeID, ip: addr.IP, port: request.StoreArgs.Value.Port}) - n.send(addr, Response{ID: request.ID, NodeID: n.id, Data: storeSuccessResponse}) + n.sendMessage(addr, Response{ID: request.ID, NodeID: n.id, Data: storeSuccessResponse}) } else { - n.send(addr, Error{ID: request.ID, NodeID: n.id, ExceptionType: "invalid-token"}) + n.sendMessage(addr, Error{ID: request.ID, NodeID: n.id, ExceptionType: "invalid-token"}) } case findNodeMethod: if request.Arg == nil { log.Errorln("request is missing arg") return } - n.send(addr, Response{ + n.sendMessage(addr, Response{ ID: request.ID, NodeID: n.id, Contacts: n.rt.GetClosest(*request.Arg, bucketSize), @@ -255,7 +256,7 @@ func (n *Node) handleRequest(addr *net.UDPAddr, request Request) { res.Contacts = n.rt.GetClosest(*request.Arg, bucketSize) } - n.send(addr, res) + n.sendMessage(addr, res) } // nodes that send us requests should not be inserted, only refreshed. @@ -282,7 +283,7 @@ func (n *Node) handleError(addr *net.UDPAddr, e Error) { } // send sends data to a udp address -func (n *Node) send(addr *net.UDPAddr, data Message) error { +func (n *Node) sendMessage(addr *net.UDPAddr, data Message) error { encoded, err := bencode.EncodeBytes(data) if err != nil { return errors.Err(err) @@ -365,11 +366,11 @@ func (n *Node) SendAsync(ctx context.Context, contact Contact, req Request) <-ch defer n.txDelete(tx.req.ID) for i := 0; i < udpRetry; i++ { - if err := n.send(contact.Addr(), tx.req); err != nil { + if err := n.sendMessage(contact.Addr(), tx.req); err != nil { if !strings.Contains(err.Error(), "use of closed network connection") { // this only happens on localhost. real UDP has no connections log.Error("send error: ", err) } - continue // try again? return? + continue } select { @@ -383,7 +384,7 @@ func (n *Node) SendAsync(ctx context.Context, contact Contact, req Request) <-ch } // if request timed out each time - n.rt.RemoveByID(tx.contact.id) + n.rt.Remove(tx.contact.id) }() return ch diff --git a/dht/node_test.go b/dht/node_test.go index 64e42a8..e58223b 100644 --- a/dht/node_test.go +++ b/dht/node_test.go @@ -23,6 +23,8 @@ func (t timeoutErr) Temporary() bool { return true } +// TODO: just use a normal net.Conn instead of this mock conn + type testUDPPacket struct { data []byte addr *net.UDPAddr diff --git a/dht/routing_table.go b/dht/routing_table.go index bf4beba..81eb6dd 100644 --- a/dht/routing_table.go +++ b/dht/routing_table.go @@ -165,7 +165,7 @@ func bucketContents(b *list.List) string { func (rt *routingTable) Update(c Contact) { rt.lock.Lock() defer rt.lock.Unlock() - bucketNum := bucketFor(rt.id, c.id) + bucketNum := rt.bucketFor(c.id) bucket := rt.buckets[bucketNum] element := findInList(bucket, c.id) if element == nil { @@ -183,7 +183,7 @@ func (rt *routingTable) Update(c Contact) { func (rt *routingTable) UpdateIfExists(c Contact) { rt.lock.Lock() defer rt.lock.Unlock() - bucketNum := bucketFor(rt.id, c.id) + bucketNum := rt.bucketFor(c.id) bucket := rt.buckets[bucketNum] element := findInList(bucket, c.id) if element != nil { @@ -191,10 +191,10 @@ func (rt *routingTable) UpdateIfExists(c Contact) { } } -func (rt *routingTable) RemoveByID(id Bitmap) { +func (rt *routingTable) Remove(id Bitmap) { rt.lock.Lock() defer rt.lock.Unlock() - bucketNum := bucketFor(rt.id, id) + bucketNum := rt.bucketFor(id) bucket := rt.buckets[bucketNum] element := findInList(bucket, rt.id) if element != nil { @@ -212,7 +212,7 @@ func (rt *routingTable) GetClosest(target Bitmap, limit int) []Contact { if rt.id.Equals(target) { bucketNum = 0 } else { - bucketNum = bucketFor(rt.id, target) + bucketNum = rt.bucketFor(target) } bucket := rt.buckets[bucketNum] @@ -242,6 +242,14 @@ func (rt *routingTable) GetClosest(target Bitmap, limit int) []Contact { return contacts } +func appendContacts(contacts []sortedContact, start *list.Element, target Bitmap) []sortedContact { + for curr := start; curr != nil; curr = curr.Next() { + c := toContact(curr) + contacts = append(contacts, sortedContact{c, c.id.Xor(target)}) + } + return contacts +} + // Count returns the number of contacts in the routing table func (rt *routingTable) Count() int { rt.lock.RLock() @@ -255,26 +263,22 @@ func (rt *routingTable) Count() int { return count } +func (rt *routingTable) bucketFor(target Bitmap) int { + if rt.id.Equals(target) { + panic("routing table does not have a bucket for its own id") + } + return numBuckets - 1 - target.Xor(rt.id).PrefixLen() +} + func findInList(bucket *list.List, value Bitmap) *list.Element { for curr := bucket.Front(); curr != nil; curr = curr.Next() { - if curr.Value.(Contact).id.Equals(value) { + if toContact(curr).id.Equals(value) { return curr } } return nil } -func appendContacts(contacts []sortedContact, start *list.Element, target Bitmap) []sortedContact { - for curr := start; curr != nil; curr = curr.Next() { - c := curr.Value.(Contact) - contacts = append(contacts, sortedContact{c, c.id.Xor(target)}) - } - return contacts -} - -func bucketFor(id Bitmap, target Bitmap) int { - if id.Equals(target) { - panic("routing table does not have a bucket for its own id") - } - return numBuckets - 1 - target.Xor(id).PrefixLen() +func toContact(el *list.Element) Contact { + return el.Value.(Contact) } diff --git a/dht/store.go b/dht/store.go index 76f33fb..70a4056 100644 --- a/dht/store.go +++ b/dht/store.go @@ -2,25 +2,18 @@ package dht import "sync" -type peer struct { - contact Contact - //, - // - // -} - type peerStore struct { // map of blob hashes to (map of node IDs to bools) hashes map[Bitmap]map[Bitmap]bool // stores the peers themselves, so they can be updated in one place - peers map[Bitmap]peer - lock sync.RWMutex + contacts map[Bitmap]Contact + lock sync.RWMutex } func newPeerStore() *peerStore { return &peerStore{ - hashes: make(map[Bitmap]map[Bitmap]bool), - peers: make(map[Bitmap]peer), + hashes: make(map[Bitmap]map[Bitmap]bool), + contacts: make(map[Bitmap]Contact), } } @@ -32,7 +25,7 @@ func (s *peerStore) Upsert(blobHash Bitmap, contact Contact) { s.hashes[blobHash] = make(map[Bitmap]bool) } s.hashes[blobHash][contact.id] = true - s.peers[contact.id] = peer{contact: contact} + s.contacts[contact.id] = contact } func (s *peerStore) Get(blobHash Bitmap) []Contact { @@ -42,11 +35,11 @@ func (s *peerStore) Get(blobHash Bitmap) []Contact { var contacts []Contact if ids, ok := s.hashes[blobHash]; ok { for id := range ids { - peer, ok := s.peers[id] + contact, ok := s.contacts[id] if !ok { panic("node id in IDs list, but not in nodeInfo") } - contacts = append(contacts, peer.contact) + contacts = append(contacts, contact) } } return contacts diff --git a/dht/testing.go b/dht/testing.go new file mode 100644 index 0000000..a22bd71 --- /dev/null +++ b/dht/testing.go @@ -0,0 +1,31 @@ +package dht + +import "strconv" + +func TestingCreateDHT(numNodes int) []*DHT { + if numNodes < 1 { + return nil + } + + ip := "127.0.0.1" + firstPort := 21000 + dhts := make([]*DHT, numNodes) + + for i := 0; i < numNodes; i++ { + seeds := []string{} + if i > 0 { + seeds = []string{ip + ":" + strconv.Itoa(firstPort)} + } + + dht, err := New(&Config{Address: ip + ":" + strconv.Itoa(firstPort+i), NodeID: RandomBitmapP().Hex(), SeedNodes: seeds}) + if err != nil { + panic(err) + } + + go dht.Start() + dht.WaitUntilJoined() + dhts[i] = dht + } + + return dhts +}