diff --git a/dht/bitmap.go b/dht/bitmap.go index 0da55a0..8b9e7e2 100644 --- a/dht/bitmap.go +++ b/dht/bitmap.go @@ -6,15 +6,19 @@ import ( "encoding/hex" "strings" + "strconv" + "github.com/lbryio/lbry.go/errors" "github.com/lyoshenka/bencode" ) // TODO: http://roaringbitmap.org/ +// Bitmap is a generalized representation of an identifier or data that can be sorted, compared fast. Used by the DHT +// package as a way to handle the unique identifiers of a DHT node. type Bitmap [nodeIDLength]byte -func (b Bitmap) RawString() string { +func (b Bitmap) rawString() string { return string(b[:]) } @@ -31,14 +35,17 @@ func (b Bitmap) BString() string { return buf.String() } +// Hex returns a hexadecimal representation of the bitmap. func (b Bitmap) Hex() string { return hex.EncodeToString(b[:]) } +// HexShort returns a hexadecimal representation of the first 4 bytes. func (b Bitmap) HexShort() string { return hex.EncodeToString(b[:4]) } +// HexSimplified returns the hexadecimal representation with all leading 0's removed func (b Bitmap) HexSimplified() string { simple := strings.TrimLeft(b.Hex(), "0") if simple == "" { @@ -47,6 +54,7 @@ func (b Bitmap) HexSimplified() string { return simple } +// Equals returns T/F if every byte in bitmap are equal. func (b Bitmap) Equals(other Bitmap) bool { for k := range b { if b[k] != other[k] { @@ -56,6 +64,7 @@ func (b Bitmap) Equals(other Bitmap) bool { return true } +// Less returns T/F if there exists a byte pair that is not equal AND this bitmap is less than the other. func (b Bitmap) Less(other interface{}) bool { for k := range b { if b[k] != other.(Bitmap)[k] { @@ -65,6 +74,7 @@ func (b Bitmap) Less(other interface{}) bool { return false } +// LessOrEqual returns true if the bitmaps are equal, otherwise it checks if this bitmap is less than the other. func (b Bitmap) LessOrEqual(other interface{}) bool { if bm, ok := other.(Bitmap); ok && b.Equals(bm) { return true @@ -72,6 +82,7 @@ func (b Bitmap) LessOrEqual(other interface{}) bool { return b.Less(other) } +// Greater returns T/F if there exists a byte pair that is not equal AND this bitmap byte is greater than the other. func (b Bitmap) Greater(other interface{}) bool { for k := range b { if b[k] != other.(Bitmap)[k] { @@ -81,6 +92,7 @@ func (b Bitmap) Greater(other interface{}) bool { return false } +// GreaterOrEqual returns true if the bitmaps are equal, otherwise it checks if this bitmap is greater than the other. func (b Bitmap) GreaterOrEqual(other interface{}) bool { if bm, ok := other.(Bitmap); ok && b.Equals(bm) { return true @@ -88,12 +100,15 @@ func (b Bitmap) GreaterOrEqual(other interface{}) bool { return b.Greater(other) } +// Copy returns a duplicate value for the bitmap. func (b Bitmap) Copy() Bitmap { var ret Bitmap copy(ret[:], b[:]) return ret } +// Xor returns a diff bitmap. If they are equal, the returned bitmap will be all 0's. If 100% unique the returned +// bitmap will be all 1's. func (b Bitmap) Xor(other Bitmap) Bitmap { var ret Bitmap for k := range b { @@ -102,6 +117,7 @@ func (b Bitmap) Xor(other Bitmap) Bitmap { return ret } +// And returns a comparison bitmap, that for each byte returns the AND true table result func (b Bitmap) And(other Bitmap) Bitmap { var ret Bitmap for k := range b { @@ -110,6 +126,7 @@ func (b Bitmap) And(other Bitmap) Bitmap { return ret } +// Or returns a comparison bitmap, that for each byte returns the OR true table result func (b Bitmap) Or(other Bitmap) Bitmap { var ret Bitmap for k := range b { @@ -118,6 +135,7 @@ func (b Bitmap) Or(other Bitmap) Bitmap { return ret } +// Not returns a complimentary bitmap that is an inverse. So b.NOT.NOT = b func (b Bitmap) Not() Bitmap { var ret Bitmap for k := range b { @@ -138,16 +156,21 @@ func (b Bitmap) add(other Bitmap) (Bitmap, bool) { return ret, carry } +// Add returns a bitmap that treats both bitmaps as numbers and adding them together. Since the size of a bitmap is +// limited, an overflow is possible when adding bitmaps. func (b Bitmap) Add(other Bitmap) Bitmap { ret, carry := b.add(other) if carry { - panic("overflow in bitmap addition") + panic("overflow in bitmap addition. limited to " + strconv.Itoa(nodeIDBits) + " bits.") } return ret } +// Sub returns a bitmap that treats both bitmaps as numbers and subtracts then via the inverse of the other and adding +// then together a + (-b). Negative bitmaps are not supported so other must be greater than this. func (b Bitmap) Sub(other Bitmap) Bitmap { if b.Less(other) { + // ToDo: Why is this not supported? Should it say not implemented? BitMap might have a generic use case outside of dht. panic("negative bitmaps not supported") } complement, _ := other.Not().add(BitmapFromShortHexP("1")) @@ -155,10 +178,12 @@ func (b Bitmap) Sub(other Bitmap) Bitmap { return ret } +// Get returns the binary bit at the position passed. func (b Bitmap) Get(n int) bool { return getBit(b[:], n) } +// Set sets the binary bit at the position passed. func (b Bitmap) Set(n int, one bool) Bitmap { ret := b.Copy() setBit(ret[:], n, one) @@ -200,7 +225,7 @@ Outer: return ret } -// Syffix returns a copy of b with the last n bits set to 1 (if `one` is true) or 0 (if `one` is false) +// Suffix returns a copy of b with the last n bits set to 1 (if `one` is true) or 0 (if `one` is false) // https://stackoverflow.com/a/23192263/182709 func (b Bitmap) Suffix(n int, one bool) Bitmap { ret := b.Copy() @@ -223,11 +248,13 @@ Outer: return ret } +// MarshalBencode implements the Marshaller(bencode)/Message interface. func (b Bitmap) MarshalBencode() ([]byte, error) { str := string(b[:]) return bencode.EncodeBytes(str) } +// UnmarshalBencode implements the Marshaller(bencode)/Message interface. func (b *Bitmap) UnmarshalBencode(encoded []byte) error { var str string err := bencode.DecodeBytes(encoded, &str) @@ -241,6 +268,7 @@ func (b *Bitmap) UnmarshalBencode(encoded []byte) error { return nil } +// BitmapFromBytes returns a bitmap as long as the byte array is of a specific length specified in the parameters. func BitmapFromBytes(data []byte) (Bitmap, error) { var bmp Bitmap @@ -252,6 +280,8 @@ func BitmapFromBytes(data []byte) (Bitmap, error) { return bmp, nil } +// BitmapFromBytesP returns a bitmap as long as the byte array is of a specific length specified in the parameters +// otherwise it wil panic. func BitmapFromBytesP(data []byte) Bitmap { bmp, err := BitmapFromBytes(data) if err != nil { @@ -260,10 +290,14 @@ func BitmapFromBytesP(data []byte) Bitmap { return bmp } +//BitmapFromString returns a bitmap by converting the string to bytes and creating from bytes as long as the byte array +// is of a specific length specified in the parameters func BitmapFromString(data string) (Bitmap, error) { return BitmapFromBytes([]byte(data)) } +//BitmapFromStringP returns a bitmap by converting the string to bytes and creating from bytes as long as the byte array +// is of a specific length specified in the parameters otherwise it wil panic. func BitmapFromStringP(data string) Bitmap { bmp, err := BitmapFromString(data) if err != nil { @@ -272,6 +306,8 @@ func BitmapFromStringP(data string) Bitmap { return bmp } +//BitmapFromHex returns a bitmap by converting the hex string to bytes and creating from bytes as long as the byte array +// is of a specific length specified in the parameters func BitmapFromHex(hexStr string) (Bitmap, error) { decoded, err := hex.DecodeString(hexStr) if err != nil { @@ -280,6 +316,8 @@ func BitmapFromHex(hexStr string) (Bitmap, error) { return BitmapFromBytes(decoded) } +//BitmapFromHexP returns a bitmap by converting the hex string to bytes and creating from bytes as long as the byte array +// is of a specific length specified in the parameters otherwise it wil panic. func BitmapFromHexP(hexStr string) Bitmap { bmp, err := BitmapFromHex(hexStr) if err != nil { @@ -288,10 +326,15 @@ func BitmapFromHexP(hexStr string) Bitmap { return bmp } +//BitmapFromShortHex returns a bitmap by converting the hex string to bytes, adding the leading zeros prefix to the +// hex string and creating from bytes as long as the byte array is of a specific length specified in the parameters func BitmapFromShortHex(hexStr string) (Bitmap, error) { return BitmapFromHex(strings.Repeat("0", nodeIDLength*2-len(hexStr)) + hexStr) } +//BitmapFromShortHexP returns a bitmap by converting the hex string to bytes, adding the leading zeros prefix to the +// hex string and creating from bytes as long as the byte array is of a specific length specified in the parameters +// otherwise it wil panic. func BitmapFromShortHexP(hexStr string) Bitmap { bmp, err := BitmapFromShortHex(hexStr) if err != nil { @@ -300,6 +343,7 @@ func BitmapFromShortHexP(hexStr string) Bitmap { return bmp } +// RandomBitmapP generates a cryptographically random bitmap with the confines of the parameters specified. func RandomBitmapP() Bitmap { var id Bitmap _, err := rand.Read(id[:]) @@ -309,12 +353,16 @@ func RandomBitmapP() Bitmap { return id } +// RandomBitmapInRangeP generates a cryptographically random bitmap and while it is greater than the high threshold +// bitmap will subtract the diff between high and low until it is no longer greater that the high. func RandomBitmapInRangeP(low, high Bitmap) Bitmap { diff := high.Sub(low) r := RandomBitmapP() for r.Greater(diff) { r = r.Sub(diff) } + //ToDo - Adding the low at this point doesn't gurantee it will be within the range. Consider bitmaps as numbers and + // I have a range of 50-100. If get to say 60, and add 50, I would be at 110. Should protect against this? return r.Add(low) } diff --git a/dht/bitmap_test.go b/dht/bitmap_test.go index cd9f533..3b541a2 100644 --- a/dht/bitmap_test.go +++ b/dht/bitmap_test.go @@ -54,13 +54,10 @@ func TestBitmap(t *testing.T) { func TestBitmap_GetBit(t *testing.T) { tt := []struct { - hex string bit int expected bool panic bool }{ - //{hex: "0", bit: 385, one: true, expected: "1", panic:true}, // should error - //{hex: "0", bit: 384, one: true, expected: "1", panic:true}, {bit: 383, expected: false, panic: false}, {bit: 382, expected: true, panic: false}, {bit: 381, expected: false, panic: false}, diff --git a/dht/bootstrap.go b/dht/bootstrap.go index a4d629a..30077c2 100644 --- a/dht/bootstrap.go +++ b/dht/bootstrap.go @@ -13,6 +13,7 @@ const ( bootstrapDefaultRefreshDuration = 15 * time.Minute ) +// BootstrapNode is a configured node setup for testing. type BootstrapNode struct { Node @@ -24,7 +25,7 @@ type BootstrapNode struct { nodeKeys map[Bitmap]int } -// New returns a BootstrapNode pointer. +// NewBootstrapNode returns a BootstrapNode pointer. func NewBootstrapNode(id Bitmap, initialPingInterval, rePingInterval time.Duration) *BootstrapNode { b := &BootstrapNode{ Node: *NewNode(id), @@ -71,7 +72,7 @@ func (b *BootstrapNode) Connect(conn UDPConn) error { return nil } -// ypsert adds the contact to the list, or updates the lastPinged time +// upsert adds the contact to the list, or updates the lastPinged time func (b *BootstrapNode) upsert(c Contact) { b.nlock.Lock() defer b.nlock.Unlock() @@ -157,17 +158,21 @@ func (b *BootstrapNode) check() { func (b *BootstrapNode) handleRequest(addr *net.UDPAddr, request Request) { switch request.Method { case pingMethod: - b.sendMessage(addr, Response{ID: request.ID, NodeID: b.id, Data: pingSuccessResponse}) + if err := b.sendMessage(addr, Response{ID: request.ID, NodeID: b.id, Data: pingSuccessResponse}); err != nil { + log.Error("error sending response message - ", err) + } case findNodeMethod: if request.Arg == nil { log.Errorln("request is missing arg") return } - b.sendMessage(addr, Response{ + if err := b.sendMessage(addr, Response{ ID: request.ID, NodeID: b.id, Contacts: b.get(bucketSize), - }) + }); err != nil { + log.Error("error sending 'findnodemethod' response message - ", err) + } } go func() { diff --git a/dht/bootstrap_test.go b/dht/bootstrap_test.go index 8b45dee..e57fc83 100644 --- a/dht/bootstrap_test.go +++ b/dht/bootstrap_test.go @@ -13,7 +13,9 @@ func TestBootstrapPing(t *testing.T) { panic(err) } - b.Connect(listener.(*net.UDPConn)) + if err := b.Connect(listener.(*net.UDPConn)); err != nil { + t.Error(err) + } defer b.Shutdown() b.Shutdown() diff --git a/dht/dht.go b/dht/dht.go index 7595c84..250fcc1 100644 --- a/dht/dht.go +++ b/dht/dht.go @@ -35,8 +35,7 @@ const ( udpMaxMessageLength = 1024 // bytes. I think our longest message is ~676 bytes, so I rounded up maxPeerFails = 3 // after this many failures, a peer is considered bad and will be removed from the routing table - - tExpire = 60 * time.Minute // the time after which a key/value pair expires; this is a time-to-live (TTL) from the original publication date + //tExpire = 60 * time.Minute // the time after which a key/value pair expires; this is a time-to-live (TTL) from the original publication date tReannounce = 50 * time.Minute // the time after which the original publisher must republish a key/value pair tRefresh = 1 * time.Hour // the time after which an otherwise unaccessed bucket must be refreshed //tReplicate = 1 * time.Hour // the interval between Kademlia replication events, when a node is required to publish its entire database @@ -165,6 +164,7 @@ func (dht *DHT) Start() error { return nil } +// WaitUntilJoined blocks until the node joins the network. func (dht *DHT) WaitUntilJoined() { if dht.joined == nil { panic("dht not initialized") @@ -181,7 +181,8 @@ func (dht *DHT) Shutdown() { log.Debugf("[%s] DHT stopped", dht.node.id.HexShort()) } -// Get returns the list of nodes that have the blob for the given hash +// Ping pings a given address, creates a temporary contact for sending a message, and returns an error if communication +// fails. func (dht *DHT) Ping(addr string) error { raddr, err := net.ResolveUDPAddr(network, addr) if err != nil { @@ -254,7 +255,11 @@ func (dht *DHT) startReannouncer() { case <-tick.C: dht.lock.RLock() for h := range dht.announced { - go dht.Announce(h) + go func(bm Bitmap) { + if err := dht.Announce(bm); err != nil { + log.Error("error re-announcing bitmap - ", err) + } + }(h) } dht.lock.RUnlock() } @@ -310,6 +315,8 @@ func (dht *DHT) storeOnNode(hash Bitmap, c Contact) { }() } +// PrintState prints the current state of the DHT including address, nr outstanding transactions, stored hashes as well +// as current bucket information. func (dht *DHT) PrintState() { log.Printf("DHT node %s at %s", dht.contact.String(), time.Now().Format(time.RFC822Z)) log.Printf("Outstanding transactions: %d", dht.node.CountActiveTransactions()) diff --git a/dht/dht_test.go b/dht/dht_test.go index e702fa4..5b4bb00 100644 --- a/dht/dht_test.go +++ b/dht/dht_test.go @@ -8,7 +8,7 @@ import ( ) func TestNodeFinder_FindNodes(t *testing.T) { - bs, dhts := TestingCreateDHT(3, true, false) + bs, dhts := TestingCreateDHT(t, 3, true, false) defer func() { for i := range dhts { dhts[i].Shutdown() @@ -59,7 +59,7 @@ func TestNodeFinder_FindNodes(t *testing.T) { } func TestNodeFinder_FindNodes_NoBootstrap(t *testing.T) { - _, dhts := TestingCreateDHT(3, false, false) + _, dhts := TestingCreateDHT(t, 3, false, false) defer func() { for i := range dhts { dhts[i].Shutdown() @@ -74,7 +74,7 @@ func TestNodeFinder_FindNodes_NoBootstrap(t *testing.T) { } func TestNodeFinder_FindValue(t *testing.T) { - bs, dhts := TestingCreateDHT(3, true, false) + bs, dhts := TestingCreateDHT(t, 3, true, false) defer func() { for i := range dhts { dhts[i].Shutdown() @@ -108,7 +108,7 @@ func TestNodeFinder_FindValue(t *testing.T) { func TestDHT_LargeDHT(t *testing.T) { nodes := 100 - bs, dhts := TestingCreateDHT(nodes, true, true) + bs, dhts := TestingCreateDHT(t, nodes, true, true) defer func() { for _, d := range dhts { go d.Shutdown() @@ -121,10 +121,12 @@ func TestDHT_LargeDHT(t *testing.T) { ids := make([]Bitmap, nodes) for i := range ids { ids[i] = RandomBitmapP() - go func(i int) { - wg.Add(1) + wg.Add(1) + go func(index int) { defer wg.Done() - dhts[i].Announce(ids[i]) + if err := dhts[index].Announce(ids[index]); err != nil { + t.Error("error announcing random bitmap - ", err) + } }(i) } wg.Wait() diff --git a/dht/message.go b/dht/message.go index d858621..e67f42c 100644 --- a/dht/message.go +++ b/dht/message.go @@ -42,16 +42,19 @@ const ( tokenField = "token" ) +// Message is an extension of the bencode marshalling interface for serialized message passing. type Message interface { bencode.Marshaler } type messageID [messageIDLength]byte +// HexShort returns the first 8 hex characters of the hex encoded message id. func (m messageID) HexShort() string { return hex.EncodeToString(m[:])[:8] } +// UnmarshalBencode takes a byte slice and unmarshals the message id. func (m *messageID) UnmarshalBencode(encoded []byte) error { var str string err := bencode.DecodeBytes(encoded, &str) @@ -62,6 +65,7 @@ func (m *messageID) UnmarshalBencode(encoded []byte) error { return nil } +// MarshallBencode returns the encoded byte slice of the message id. func (m messageID) MarshalBencode() ([]byte, error) { str := string(m[:]) return bencode.EncodeBytes(str) @@ -76,6 +80,7 @@ func newMessageID() messageID { return m } +// Request represents the structured request from one node to another. type Request struct { ID messageID NodeID Bitmap @@ -84,6 +89,7 @@ type Request struct { StoreArgs *storeArgs } +// MarshalBencode returns the serialized byte slice representation of the request func (r Request) MarshalBencode() ([]byte, error) { var args interface{} if r.StoreArgs != nil { @@ -102,6 +108,7 @@ func (r Request) MarshalBencode() ([]byte, error) { }) } +// UnmarshalBencode unmarshals the serialized byte slice into the appropriate fields of the request. func (r *Request) UnmarshalBencode(b []byte) error { var raw struct { ID messageID `bencode:"1"` @@ -136,7 +143,7 @@ func (r *Request) UnmarshalBencode(b []byte) error { return nil } -func (r Request) ArgsDebug() string { +func (r Request) argsDebug() string { if r.StoreArgs != nil { return r.StoreArgs.BlobHash.HexShort() + ", " + r.StoreArgs.Value.LbryID.HexShort() + ":" + strconv.Itoa(r.StoreArgs.Value.Port) } else if r.Arg != nil { @@ -158,6 +165,7 @@ type storeArgs struct { SelfStore bool // this is an int on the wire } +// MarshalBencode returns the serialized byte slice representation of the storage arguments. func (s storeArgs) MarshalBencode() ([]byte, error) { encodedValue, err := bencode.EncodeString(s.Value) if err != nil { @@ -177,6 +185,7 @@ func (s storeArgs) MarshalBencode() ([]byte, error) { }) } +// UnmarshalBencode unmarshals the serialized byte slice into the appropriate fields of the store arguments. func (s *storeArgs) UnmarshalBencode(b []byte) error { var argsInt []bencode.RawMessage err := bencode.DecodeBytes(b, &argsInt) @@ -219,6 +228,7 @@ func (s *storeArgs) UnmarshalBencode(b []byte) error { return nil } +// Response represents the structured response one node returns to another. type Response struct { ID messageID NodeID Bitmap @@ -228,7 +238,7 @@ type Response struct { Token string } -func (r Response) ArgsDebug() string { +func (r Response) argsDebug() string { if r.Data != "" { return r.Data } @@ -251,6 +261,7 @@ func (r Response) ArgsDebug() string { return str } +// MarshalBencode returns the serialized byte slice representation of the response. func (r Response) MarshalBencode() ([]byte, error) { data := map[string]interface{}{ headerTypeField: responseType, @@ -293,6 +304,7 @@ func (r Response) MarshalBencode() ([]byte, error) { return bencode.EncodeBytes(data) } +// UnmarshalBencode unmarshals the serialized byte slice into the appropriate fields of the store arguments. func (r *Response) UnmarshalBencode(b []byte) error { var raw struct { ID messageID `bencode:"1"` @@ -362,6 +374,7 @@ func (r *Response) UnmarshalBencode(b []byte) error { return nil } +// Error represents an error message that is returned from one node to another in communication. type Error struct { ID messageID NodeID Bitmap @@ -369,6 +382,7 @@ type Error struct { Response []string } +// MarshalBencode returns the serialized byte slice representation of an error message. func (e Error) MarshalBencode() ([]byte, error) { return bencode.EncodeBytes(map[string]interface{}{ headerTypeField: errorType, @@ -379,6 +393,7 @@ func (e Error) MarshalBencode() ([]byte, error) { }) } +// UnmarshalBencode unmarshals the serialized byte slice into the appropriate fields of the error message. func (e *Error) UnmarshalBencode(b []byte) error { var raw struct { ID messageID `bencode:"1"` diff --git a/dht/message_test.go b/dht/message_test.go index 0eb0f52..1cac9e3 100644 --- a/dht/message_test.go +++ b/dht/message_test.go @@ -101,7 +101,7 @@ func TestBencodeFindValueResponse(t *testing.T) { res := Response{ ID: newMessageID(), NodeID: RandomBitmapP(), - FindValueKey: RandomBitmapP().RawString(), + FindValueKey: RandomBitmapP().rawString(), Token: "arst", Contacts: []Contact{ {ID: RandomBitmapP(), IP: net.IPv4(1, 2, 3, 4).To4(), Port: 5678}, diff --git a/dht/node.go b/dht/node.go index 4bcb4ba..556b30a 100644 --- a/dht/node.go +++ b/dht/node.go @@ -33,8 +33,10 @@ type UDPConn interface { Close() error } +// RequestHandlerFunc is exported handler for requests. type RequestHandlerFunc func(addr *net.UDPAddr, request Request) +// Node is a type representation of a node on the network. type Node struct { // the node's id id Bitmap @@ -61,7 +63,7 @@ type Node struct { stop *stopOnce.Stopper } -// New returns a Node pointer. +// NewNode returns an initialized Node's pointer. func NewNode(id Bitmap) *Node { return &Node{ id: id, @@ -87,13 +89,14 @@ func (n *Node) Connect(conn UDPConn) error { <-n.stop.Ch() n.tokens.Stop() n.connClosed = true - n.conn.Close() + if err := n.conn.Close(); err != nil { + log.Error("error closing node connection on shutdown - ", err) + } }() packets := make(chan packet) - + n.stop.Add(1) go func() { - n.stop.Add(1) defer n.stop.Done() buf := make([]byte, udpMaxMessageLength) @@ -121,9 +124,8 @@ func (n *Node) Connect(conn UDPConn) error { } } }() - + n.stop.Add(1) go func() { - n.stop.Add(1) defer n.stop.Done() var pkt packet @@ -171,7 +173,7 @@ func (n *Node) handlePacket(pkt packet) { log.Errorf("[%s] error decoding request from %s: %s: (%d bytes) %s", n.id.HexShort(), pkt.raddr.String(), err.Error(), len(pkt.data), hex.EncodeToString(pkt.data)) return } - log.Debugf("[%s] query %s: received request from %s: %s(%s)", n.id.HexShort(), request.ID.HexShort(), request.NodeID.HexShort(), request.Method, request.ArgsDebug()) + log.Debugf("[%s] query %s: received request from %s: %s(%s)", n.id.HexShort(), request.ID.HexShort(), request.NodeID.HexShort(), request.Method, request.argsDebug()) n.handleRequest(pkt.raddr, request) case '0' + responseType: @@ -181,7 +183,7 @@ func (n *Node) handlePacket(pkt packet) { log.Errorf("[%s] error decoding response from %s: %s: (%d bytes) %s", n.id.HexShort(), pkt.raddr.String(), err.Error(), len(pkt.data), hex.EncodeToString(pkt.data)) return } - log.Debugf("[%s] query %s: received response from %s: %s", n.id.HexShort(), response.ID.HexShort(), response.NodeID.HexShort(), response.ArgsDebug()) + log.Debugf("[%s] query %s: received response from %s: %s", n.id.HexShort(), response.ID.HexShort(), response.NodeID.HexShort(), response.argsDebug()) n.handleResponse(pkt.raddr, response) case '0' + errorType: @@ -219,26 +221,34 @@ func (n *Node) handleRequest(addr *net.UDPAddr, request Request) { log.Errorln("invalid request method") return case pingMethod: - n.sendMessage(addr, Response{ID: request.ID, NodeID: n.id, Data: pingSuccessResponse}) + if err := n.sendMessage(addr, Response{ID: request.ID, NodeID: n.id, Data: pingSuccessResponse}); err != nil { + log.Error("error sending 'pingmethod' response message - ", err) + } case storeMethod: // TODO: we should be sending the IP in the request, not just using the sender's IP // TODO: should we be using StoreArgs.NodeID or StoreArgs.Value.LbryID ??? if n.tokens.Verify(request.StoreArgs.Value.Token, request.NodeID, addr) { n.Store(request.StoreArgs.BlobHash, Contact{ID: request.StoreArgs.NodeID, IP: addr.IP, Port: request.StoreArgs.Value.Port}) - n.sendMessage(addr, Response{ID: request.ID, NodeID: n.id, Data: storeSuccessResponse}) + if err := n.sendMessage(addr, Response{ID: request.ID, NodeID: n.id, Data: storeSuccessResponse}); err != nil { + log.Error("error sending 'storemethod' response message - ", err) + } } else { - n.sendMessage(addr, Error{ID: request.ID, NodeID: n.id, ExceptionType: "invalid-token"}) + if err := n.sendMessage(addr, Error{ID: request.ID, NodeID: n.id, ExceptionType: "invalid-token"}); err != nil { + log.Error("error sending 'storemethod'response message for invalid-token - ", err) + } } case findNodeMethod: if request.Arg == nil { log.Errorln("request is missing arg") return } - n.sendMessage(addr, Response{ + if err := n.sendMessage(addr, Response{ ID: request.ID, NodeID: n.id, Contacts: n.rt.GetClosest(*request.Arg, bucketSize), - }) + }); err != nil { + log.Error("error sending 'findnodemethod' response message - ", err) + } case findValueMethod: if request.Arg == nil { @@ -253,13 +263,15 @@ func (n *Node) handleRequest(addr *net.UDPAddr, request Request) { } if contacts := n.store.Get(*request.Arg); len(contacts) > 0 { - res.FindValueKey = request.Arg.RawString() + res.FindValueKey = request.Arg.rawString() res.Contacts = contacts } else { res.Contacts = n.rt.GetClosest(*request.Arg, bucketSize) } - n.sendMessage(addr, res) + if err := n.sendMessage(addr, res); err != nil { + log.Error("error sending 'findvaluemethod' response message - ", err) + } } // nodes that send us requests should not be inserted, only refreshed. @@ -294,15 +306,17 @@ func (n *Node) sendMessage(addr *net.UDPAddr, data Message) error { if req, ok := data.(Request); ok { log.Debugf("[%s] query %s: sending request to %s (%d bytes) %s(%s)", - n.id.HexShort(), req.ID.HexShort(), addr.String(), len(encoded), req.Method, req.ArgsDebug()) + n.id.HexShort(), req.ID.HexShort(), addr.String(), len(encoded), req.Method, req.argsDebug()) } else if res, ok := data.(Response); ok { log.Debugf("[%s] query %s: sending response to %s (%d bytes) %s", - n.id.HexShort(), res.ID.HexShort(), addr.String(), len(encoded), res.ArgsDebug()) + n.id.HexShort(), res.ID.HexShort(), addr.String(), len(encoded), res.argsDebug()) } else { log.Debugf("[%s] (%d bytes) %s", n.id.HexShort(), len(encoded), spew.Sdump(data)) } - n.conn.SetWriteDeadline(time.Now().Add(5 * time.Second)) + if err := n.conn.SetWriteDeadline(time.Now().Add(5 * time.Second)); err != nil { + log.Error("error setting write deadline - ", err) + } _, err = n.conn.WriteToUDP(encoded, addr) return errors.Err(err) @@ -405,7 +419,7 @@ func (n *Node) SendCancelable(contact Contact, req Request) (<-chan *Response, c return n.SendAsync(ctx, contact, req), cancel } -// Count returns the number of transactions in the manager +// CountActiveTransactions returns the number of transactions in the manager func (n *Node) CountActiveTransactions() int { n.txLock.Lock() defer n.txLock.Unlock() @@ -428,6 +442,7 @@ func (n *Node) startRoutingTableGrooming() { }() } +// Store stores a node contact in the node's contact store. func (n *Node) Store(hash Bitmap, c Contact) { n.store.Upsert(hash, c) } diff --git a/dht/node_finder.go b/dht/node_finder.go index d38eec9..6fbd35f 100644 --- a/dht/node_finder.go +++ b/dht/node_finder.go @@ -195,6 +195,9 @@ func (cf *contactFinder) insertIntoActiveList(contact Contact) { inserted := false for i, n := range cf.activeContacts { + // 5000ft: insert contact into sorted active contacts list + // Detail: if diff between new contact id and the target id has fewer changes than the n contact from target + // it should be inserted in between the previous and current. if contact.ID.Xor(cf.target).Less(n.ID.Xor(cf.target)) { cf.activeContacts = append(cf.activeContacts[:i], append([]Contact{contact}, cf.activeContacts[i:]...)...) inserted = true diff --git a/dht/node_test.go b/dht/node_test.go index e0f2c88..f9fbdd5 100644 --- a/dht/node_test.go +++ b/dht/node_test.go @@ -30,7 +30,7 @@ func TestPing(t *testing.T) { data, err := bencode.EncodeBytes(map[string]interface{}{ headerTypeField: requestType, headerMessageIDField: messageID, - headerNodeIDField: testNodeID.RawString(), + headerNodeIDField: testNodeID.rawString(), headerPayloadField: "ping", headerArgsField: []string{}, }) @@ -86,7 +86,7 @@ func TestPing(t *testing.T) { rNodeID, ok := response[headerNodeIDField].(string) if !ok { t.Error("node ID is not a string") - } else if rNodeID != dhtNodeID.RawString() { + } else if rNodeID != dhtNodeID.rawString() { t.Error("unexpected node ID") } } @@ -176,7 +176,7 @@ func TestStore(t *testing.T) { } } - verifyResponse(t, response, messageID, dhtNodeID.RawString()) + verifyResponse(t, response, messageID, dhtNodeID.rawString()) _, ok := response[headerPayloadField] if !ok { @@ -257,7 +257,7 @@ func TestFindNode(t *testing.T) { } } - verifyResponse(t, response, messageID, dhtNodeID.RawString()) + verifyResponse(t, response, messageID, dhtNodeID.rawString()) _, ok := response[headerPayloadField] if !ok { @@ -290,10 +290,8 @@ func TestFindValueExisting(t *testing.T) { defer dht.Shutdown() nodesToInsert := 3 - var nodes []Contact for i := 0; i < nodesToInsert; i++ { n := Contact{ID: RandomBitmapP(), IP: net.ParseIP("127.0.0.1"), Port: 10000 + i} - nodes = append(nodes, n) dht.node.rt.Update(n) } @@ -333,7 +331,7 @@ func TestFindValueExisting(t *testing.T) { } } - verifyResponse(t, response, messageID, dhtNodeID.RawString()) + verifyResponse(t, response, messageID, dhtNodeID.rawString()) _, ok := response[headerPayloadField] if !ok { @@ -345,7 +343,7 @@ func TestFindValueExisting(t *testing.T) { t.Fatal("payload is not a dictionary") } - compactContacts, ok := payload[valueToFind.RawString()] + compactContacts, ok := payload[valueToFind.rawString()] if !ok { t.Fatal("payload is missing key for search value") } @@ -412,7 +410,7 @@ func TestFindValueFallbackToFindNode(t *testing.T) { } } - verifyResponse(t, response, messageID, dhtNodeID.RawString()) + verifyResponse(t, response, messageID, dhtNodeID.rawString()) _, ok := response[headerPayloadField] if !ok { diff --git a/dht/routing_table.go b/dht/routing_table.go index 29bbb81..6507f9d 100644 --- a/dht/routing_table.go +++ b/dht/routing_table.go @@ -14,6 +14,7 @@ import ( "github.com/lbryio/lbry.go/errors" "github.com/lyoshenka/bencode" + log "github.com/sirupsen/logrus" ) // TODO: if routing table is ever empty (aka the node is isolated), it should re-bootstrap @@ -21,24 +22,29 @@ import ( // TODO: use a tree with bucket splitting instead of a fixed bucket list. include jack's optimization (see link in commit mesg) // https://github.com/lbryio/lbry/pull/1211/commits/341b27b6d21ac027671d42458826d02735aaae41 +// Contact is a type representation of another node that a specific node is in communication with. type Contact struct { ID Bitmap IP net.IP Port int } +// Equals returns T/F if two contacts are the same. func (c Contact) Equals(other Contact) bool { return c.ID == other.ID } +// Addr returns the UPD Address of the contact. func (c Contact) Addr() *net.UDPAddr { return &net.UDPAddr{IP: c.IP, Port: c.Port} } +// String returns the concatenated short hex encoded string of its ID + @ + string represention of its UPD Address. func (c Contact) String() string { return c.ID.HexShort() + "@" + c.Addr().String() } +// MarshalCompact returns the compact byte slice representation of a contact. func (c Contact) MarshalCompact() ([]byte, error) { if c.IP.To4() == nil { return nil, errors.Err("ip not set") @@ -60,6 +66,7 @@ func (c Contact) MarshalCompact() ([]byte, error) { return buf.Bytes(), nil } +// UnmarshalCompact unmarshals the compact byte slice representation of a contact. func (c *Contact) UnmarshalCompact(b []byte) error { if len(b) != compactNodeInfoLength { return errors.Err("invalid compact length") @@ -70,10 +77,12 @@ func (c *Contact) UnmarshalCompact(b []byte) error { return nil } +// MarshalBencode returns the serialized byte slice representation of a contact. func (c Contact) MarshalBencode() ([]byte, error) { return bencode.EncodeBytes([]interface{}{c.ID, c.IP.String(), c.Port}) } +// UnmarshalBencode unmarshals the serialized byte slice into the appropriate fields of the contact. func (c *Contact) UnmarshalBencode(b []byte) error { var raw []bencode.RawMessage err := bencode.DecodeBytes(b, &raw) @@ -139,7 +148,7 @@ func (p *peer) Touch() { // ActiveSince returns whether a peer has responded in the last `d` duration // this is used to check if the peer is "good", meaning that we believe the peer will respond to our requests func (p *peer) ActiveInLast(d time.Duration) bool { - return time.Now().Sub(p.LastActivity) > d + return time.Since(p.LastActivity) > d } // IsBad returns whether a peer is "bad", meaning that it has failed to respond to multiple pings in a row @@ -236,7 +245,7 @@ func find(id Bitmap, peers []peer) int { func (b *bucket) NeedsRefresh(refreshInterval time.Duration) bool { b.lock.RLock() defer b.lock.RUnlock() - return time.Now().Sub(b.lastUpdate) > refreshInterval + return time.Since(b.lastUpdate) > refreshInterval } type routingTable struct { @@ -341,6 +350,7 @@ func (rt *routingTable) Count() int { return count } +// Range is a structure that holds a min and max bitmaps. The range is used in bucket sizing. type Range struct { start Bitmap end Bitmap @@ -457,7 +467,9 @@ func RoutingTableRefresh(n *Node, refreshInterval time.Duration, cancel <-chan s }() } - nf.Find() + if _, err := nf.Find(); err != nil { + log.Error("error finding contact during routing table refresh - ", err) + } }(id) } diff --git a/dht/testing.go b/dht/testing.go index 0a69439..b93d2a0 100644 --- a/dht/testing.go +++ b/dht/testing.go @@ -13,7 +13,8 @@ import ( var testingDHTIP = "127.0.0.1" var testingDHTFirstPort = 21000 -func TestingCreateDHT(numNodes int, bootstrap, concurrent bool) (*BootstrapNode, []*DHT) { +// TestingCreateDHT initializes a testable DHT network with a specific number of nodes, with bootstrap and concurrent options. +func TestingCreateDHT(t *testing.T, numNodes int, bootstrap, concurrent bool) (*BootstrapNode, []*DHT) { var bootstrapNode *BootstrapNode var seeds []string @@ -25,7 +26,9 @@ func TestingCreateDHT(numNodes int, bootstrap, concurrent bool) (*BootstrapNode, if err != nil { panic(err) } - bootstrapNode.Connect(listener.(*net.UDPConn)) + if err := bootstrapNode.Connect(listener.(*net.UDPConn)); err != nil { + t.Error("error connecting bootstrap node - ", err) + } } if numNodes < 1 { @@ -41,7 +44,11 @@ func TestingCreateDHT(numNodes int, bootstrap, concurrent bool) (*BootstrapNode, panic(err) } - go dht.Start() + go func() { + if err := dht.Start(); err != nil { + t.Error("error starting dht - ", err) + } + }() if !concurrent { dht.WaitUntilJoined() } @@ -103,7 +110,7 @@ func newTestUDPConn(addr string) *testUDPConn { func (t testUDPConn) ReadFromUDP(b []byte) (int, *net.UDPAddr, error) { var timeoutCh <-chan time.Time if !t.readDeadline.IsZero() { - timeoutCh = time.After(t.readDeadline.Sub(time.Now())) + timeoutCh = time.After(time.Until(t.readDeadline)) } select { @@ -218,7 +225,7 @@ func verifyContacts(t *testing.T, contacts []interface{}, nodes []Contact) { continue } for _, n := range nodes { - if n.ID.RawString() == id { + if n.ID.rawString() == id { currNode = n currNodeFound = true foundNodes[id] = true