code cleanup

-Added travis support
-updated travis to analyze code beneath the root.
-refactored upload.go to fix travis errors.
-gocyclo should ignore test files. $GOFILES needed to be adjusted.
-fix rows.Close() ignoring error. Created func to handle so defer can be used when needed also.
-fixed ignored errors.
-fixed unit test that was not passing correctly to anonymous function.
-fixed govet error for passing param inside go func.
-removed returned error, in favor of logging instead.
-added error logging for ignored error.
-fixed potential race conditions.
-removed unused append
-fixed time usage to align with go standards.
-removed unused variables
-made changes for code review.
-code comments for exported functions.
-Documented bitmap.go and insert into contact list.
-Documented dht, message, bootstrap
-Fixed comment typos
-Documented message,node, routing_table, testing in DHT package.
-Documented server, client, prism, server and shared in peer and reflector packages.
-Documented the stores in Store package.
-made defer adjustments inline and deleted the separate function.
-adjusted method in upload to take the only parameter it requires.
This commit is contained in:
Mark Beamer Jr 2018-05-29 21:38:55 -04:00 committed by Alex Grintsvayg
parent 79527da8a9
commit 8100010220
13 changed files with 173 additions and 62 deletions

View file

@ -6,15 +6,19 @@ import (
"encoding/hex"
"strings"
"strconv"
"github.com/lbryio/lbry.go/errors"
"github.com/lyoshenka/bencode"
)
// TODO: http://roaringbitmap.org/
// Bitmap is a generalized representation of an identifier or data that can be sorted, compared fast. Used by the DHT
// package as a way to handle the unique identifiers of a DHT node.
type Bitmap [nodeIDLength]byte
func (b Bitmap) RawString() string {
func (b Bitmap) rawString() string {
return string(b[:])
}
@ -31,14 +35,17 @@ func (b Bitmap) BString() string {
return buf.String()
}
// Hex returns a hexadecimal representation of the bitmap.
func (b Bitmap) Hex() string {
return hex.EncodeToString(b[:])
}
// HexShort returns a hexadecimal representation of the first 4 bytes.
func (b Bitmap) HexShort() string {
return hex.EncodeToString(b[:4])
}
// HexSimplified returns the hexadecimal representation with all leading 0's removed
func (b Bitmap) HexSimplified() string {
simple := strings.TrimLeft(b.Hex(), "0")
if simple == "" {
@ -47,6 +54,7 @@ func (b Bitmap) HexSimplified() string {
return simple
}
// Equals returns T/F if every byte in bitmap are equal.
func (b Bitmap) Equals(other Bitmap) bool {
for k := range b {
if b[k] != other[k] {
@ -56,6 +64,7 @@ func (b Bitmap) Equals(other Bitmap) bool {
return true
}
// Less returns T/F if there exists a byte pair that is not equal AND this bitmap is less than the other.
func (b Bitmap) Less(other interface{}) bool {
for k := range b {
if b[k] != other.(Bitmap)[k] {
@ -65,6 +74,7 @@ func (b Bitmap) Less(other interface{}) bool {
return false
}
// LessOrEqual returns true if the bitmaps are equal, otherwise it checks if this bitmap is less than the other.
func (b Bitmap) LessOrEqual(other interface{}) bool {
if bm, ok := other.(Bitmap); ok && b.Equals(bm) {
return true
@ -72,6 +82,7 @@ func (b Bitmap) LessOrEqual(other interface{}) bool {
return b.Less(other)
}
// Greater returns T/F if there exists a byte pair that is not equal AND this bitmap byte is greater than the other.
func (b Bitmap) Greater(other interface{}) bool {
for k := range b {
if b[k] != other.(Bitmap)[k] {
@ -81,6 +92,7 @@ func (b Bitmap) Greater(other interface{}) bool {
return false
}
// GreaterOrEqual returns true if the bitmaps are equal, otherwise it checks if this bitmap is greater than the other.
func (b Bitmap) GreaterOrEqual(other interface{}) bool {
if bm, ok := other.(Bitmap); ok && b.Equals(bm) {
return true
@ -88,12 +100,15 @@ func (b Bitmap) GreaterOrEqual(other interface{}) bool {
return b.Greater(other)
}
// Copy returns a duplicate value for the bitmap.
func (b Bitmap) Copy() Bitmap {
var ret Bitmap
copy(ret[:], b[:])
return ret
}
// Xor returns a diff bitmap. If they are equal, the returned bitmap will be all 0's. If 100% unique the returned
// bitmap will be all 1's.
func (b Bitmap) Xor(other Bitmap) Bitmap {
var ret Bitmap
for k := range b {
@ -102,6 +117,7 @@ func (b Bitmap) Xor(other Bitmap) Bitmap {
return ret
}
// And returns a comparison bitmap, that for each byte returns the AND true table result
func (b Bitmap) And(other Bitmap) Bitmap {
var ret Bitmap
for k := range b {
@ -110,6 +126,7 @@ func (b Bitmap) And(other Bitmap) Bitmap {
return ret
}
// Or returns a comparison bitmap, that for each byte returns the OR true table result
func (b Bitmap) Or(other Bitmap) Bitmap {
var ret Bitmap
for k := range b {
@ -118,6 +135,7 @@ func (b Bitmap) Or(other Bitmap) Bitmap {
return ret
}
// Not returns a complimentary bitmap that is an inverse. So b.NOT.NOT = b
func (b Bitmap) Not() Bitmap {
var ret Bitmap
for k := range b {
@ -138,16 +156,21 @@ func (b Bitmap) add(other Bitmap) (Bitmap, bool) {
return ret, carry
}
// Add returns a bitmap that treats both bitmaps as numbers and adding them together. Since the size of a bitmap is
// limited, an overflow is possible when adding bitmaps.
func (b Bitmap) Add(other Bitmap) Bitmap {
ret, carry := b.add(other)
if carry {
panic("overflow in bitmap addition")
panic("overflow in bitmap addition. limited to " + strconv.Itoa(nodeIDBits) + " bits.")
}
return ret
}
// Sub returns a bitmap that treats both bitmaps as numbers and subtracts then via the inverse of the other and adding
// then together a + (-b). Negative bitmaps are not supported so other must be greater than this.
func (b Bitmap) Sub(other Bitmap) Bitmap {
if b.Less(other) {
// ToDo: Why is this not supported? Should it say not implemented? BitMap might have a generic use case outside of dht.
panic("negative bitmaps not supported")
}
complement, _ := other.Not().add(BitmapFromShortHexP("1"))
@ -155,10 +178,12 @@ func (b Bitmap) Sub(other Bitmap) Bitmap {
return ret
}
// Get returns the binary bit at the position passed.
func (b Bitmap) Get(n int) bool {
return getBit(b[:], n)
}
// Set sets the binary bit at the position passed.
func (b Bitmap) Set(n int, one bool) Bitmap {
ret := b.Copy()
setBit(ret[:], n, one)
@ -200,7 +225,7 @@ Outer:
return ret
}
// Syffix returns a copy of b with the last n bits set to 1 (if `one` is true) or 0 (if `one` is false)
// Suffix returns a copy of b with the last n bits set to 1 (if `one` is true) or 0 (if `one` is false)
// https://stackoverflow.com/a/23192263/182709
func (b Bitmap) Suffix(n int, one bool) Bitmap {
ret := b.Copy()
@ -223,11 +248,13 @@ Outer:
return ret
}
// MarshalBencode implements the Marshaller(bencode)/Message interface.
func (b Bitmap) MarshalBencode() ([]byte, error) {
str := string(b[:])
return bencode.EncodeBytes(str)
}
// UnmarshalBencode implements the Marshaller(bencode)/Message interface.
func (b *Bitmap) UnmarshalBencode(encoded []byte) error {
var str string
err := bencode.DecodeBytes(encoded, &str)
@ -241,6 +268,7 @@ func (b *Bitmap) UnmarshalBencode(encoded []byte) error {
return nil
}
// BitmapFromBytes returns a bitmap as long as the byte array is of a specific length specified in the parameters.
func BitmapFromBytes(data []byte) (Bitmap, error) {
var bmp Bitmap
@ -252,6 +280,8 @@ func BitmapFromBytes(data []byte) (Bitmap, error) {
return bmp, nil
}
// BitmapFromBytesP returns a bitmap as long as the byte array is of a specific length specified in the parameters
// otherwise it wil panic.
func BitmapFromBytesP(data []byte) Bitmap {
bmp, err := BitmapFromBytes(data)
if err != nil {
@ -260,10 +290,14 @@ func BitmapFromBytesP(data []byte) Bitmap {
return bmp
}
//BitmapFromString returns a bitmap by converting the string to bytes and creating from bytes as long as the byte array
// is of a specific length specified in the parameters
func BitmapFromString(data string) (Bitmap, error) {
return BitmapFromBytes([]byte(data))
}
//BitmapFromStringP returns a bitmap by converting the string to bytes and creating from bytes as long as the byte array
// is of a specific length specified in the parameters otherwise it wil panic.
func BitmapFromStringP(data string) Bitmap {
bmp, err := BitmapFromString(data)
if err != nil {
@ -272,6 +306,8 @@ func BitmapFromStringP(data string) Bitmap {
return bmp
}
//BitmapFromHex returns a bitmap by converting the hex string to bytes and creating from bytes as long as the byte array
// is of a specific length specified in the parameters
func BitmapFromHex(hexStr string) (Bitmap, error) {
decoded, err := hex.DecodeString(hexStr)
if err != nil {
@ -280,6 +316,8 @@ func BitmapFromHex(hexStr string) (Bitmap, error) {
return BitmapFromBytes(decoded)
}
//BitmapFromHexP returns a bitmap by converting the hex string to bytes and creating from bytes as long as the byte array
// is of a specific length specified in the parameters otherwise it wil panic.
func BitmapFromHexP(hexStr string) Bitmap {
bmp, err := BitmapFromHex(hexStr)
if err != nil {
@ -288,10 +326,15 @@ func BitmapFromHexP(hexStr string) Bitmap {
return bmp
}
//BitmapFromShortHex returns a bitmap by converting the hex string to bytes, adding the leading zeros prefix to the
// hex string and creating from bytes as long as the byte array is of a specific length specified in the parameters
func BitmapFromShortHex(hexStr string) (Bitmap, error) {
return BitmapFromHex(strings.Repeat("0", nodeIDLength*2-len(hexStr)) + hexStr)
}
//BitmapFromShortHexP returns a bitmap by converting the hex string to bytes, adding the leading zeros prefix to the
// hex string and creating from bytes as long as the byte array is of a specific length specified in the parameters
// otherwise it wil panic.
func BitmapFromShortHexP(hexStr string) Bitmap {
bmp, err := BitmapFromShortHex(hexStr)
if err != nil {
@ -300,6 +343,7 @@ func BitmapFromShortHexP(hexStr string) Bitmap {
return bmp
}
// RandomBitmapP generates a cryptographically random bitmap with the confines of the parameters specified.
func RandomBitmapP() Bitmap {
var id Bitmap
_, err := rand.Read(id[:])
@ -309,12 +353,16 @@ func RandomBitmapP() Bitmap {
return id
}
// RandomBitmapInRangeP generates a cryptographically random bitmap and while it is greater than the high threshold
// bitmap will subtract the diff between high and low until it is no longer greater that the high.
func RandomBitmapInRangeP(low, high Bitmap) Bitmap {
diff := high.Sub(low)
r := RandomBitmapP()
for r.Greater(diff) {
r = r.Sub(diff)
}
//ToDo - Adding the low at this point doesn't gurantee it will be within the range. Consider bitmaps as numbers and
// I have a range of 50-100. If get to say 60, and add 50, I would be at 110. Should protect against this?
return r.Add(low)
}

View file

@ -54,13 +54,10 @@ func TestBitmap(t *testing.T) {
func TestBitmap_GetBit(t *testing.T) {
tt := []struct {
hex string
bit int
expected bool
panic bool
}{
//{hex: "0", bit: 385, one: true, expected: "1", panic:true}, // should error
//{hex: "0", bit: 384, one: true, expected: "1", panic:true},
{bit: 383, expected: false, panic: false},
{bit: 382, expected: true, panic: false},
{bit: 381, expected: false, panic: false},

View file

@ -13,6 +13,7 @@ const (
bootstrapDefaultRefreshDuration = 15 * time.Minute
)
// BootstrapNode is a configured node setup for testing.
type BootstrapNode struct {
Node
@ -24,7 +25,7 @@ type BootstrapNode struct {
nodeKeys map[Bitmap]int
}
// New returns a BootstrapNode pointer.
// NewBootstrapNode returns a BootstrapNode pointer.
func NewBootstrapNode(id Bitmap, initialPingInterval, rePingInterval time.Duration) *BootstrapNode {
b := &BootstrapNode{
Node: *NewNode(id),
@ -71,7 +72,7 @@ func (b *BootstrapNode) Connect(conn UDPConn) error {
return nil
}
// ypsert adds the contact to the list, or updates the lastPinged time
// upsert adds the contact to the list, or updates the lastPinged time
func (b *BootstrapNode) upsert(c Contact) {
b.nlock.Lock()
defer b.nlock.Unlock()
@ -157,17 +158,21 @@ func (b *BootstrapNode) check() {
func (b *BootstrapNode) handleRequest(addr *net.UDPAddr, request Request) {
switch request.Method {
case pingMethod:
b.sendMessage(addr, Response{ID: request.ID, NodeID: b.id, Data: pingSuccessResponse})
if err := b.sendMessage(addr, Response{ID: request.ID, NodeID: b.id, Data: pingSuccessResponse}); err != nil {
log.Error("error sending response message - ", err)
}
case findNodeMethod:
if request.Arg == nil {
log.Errorln("request is missing arg")
return
}
b.sendMessage(addr, Response{
if err := b.sendMessage(addr, Response{
ID: request.ID,
NodeID: b.id,
Contacts: b.get(bucketSize),
})
}); err != nil {
log.Error("error sending 'findnodemethod' response message - ", err)
}
}
go func() {

View file

@ -13,7 +13,9 @@ func TestBootstrapPing(t *testing.T) {
panic(err)
}
b.Connect(listener.(*net.UDPConn))
if err := b.Connect(listener.(*net.UDPConn)); err != nil {
t.Error(err)
}
defer b.Shutdown()
b.Shutdown()

View file

@ -35,8 +35,7 @@ const (
udpMaxMessageLength = 1024 // bytes. I think our longest message is ~676 bytes, so I rounded up
maxPeerFails = 3 // after this many failures, a peer is considered bad and will be removed from the routing table
tExpire = 60 * time.Minute // the time after which a key/value pair expires; this is a time-to-live (TTL) from the original publication date
//tExpire = 60 * time.Minute // the time after which a key/value pair expires; this is a time-to-live (TTL) from the original publication date
tReannounce = 50 * time.Minute // the time after which the original publisher must republish a key/value pair
tRefresh = 1 * time.Hour // the time after which an otherwise unaccessed bucket must be refreshed
//tReplicate = 1 * time.Hour // the interval between Kademlia replication events, when a node is required to publish its entire database
@ -165,6 +164,7 @@ func (dht *DHT) Start() error {
return nil
}
// WaitUntilJoined blocks until the node joins the network.
func (dht *DHT) WaitUntilJoined() {
if dht.joined == nil {
panic("dht not initialized")
@ -181,7 +181,8 @@ func (dht *DHT) Shutdown() {
log.Debugf("[%s] DHT stopped", dht.node.id.HexShort())
}
// Get returns the list of nodes that have the blob for the given hash
// Ping pings a given address, creates a temporary contact for sending a message, and returns an error if communication
// fails.
func (dht *DHT) Ping(addr string) error {
raddr, err := net.ResolveUDPAddr(network, addr)
if err != nil {
@ -254,7 +255,11 @@ func (dht *DHT) startReannouncer() {
case <-tick.C:
dht.lock.RLock()
for h := range dht.announced {
go dht.Announce(h)
go func(bm Bitmap) {
if err := dht.Announce(bm); err != nil {
log.Error("error re-announcing bitmap - ", err)
}
}(h)
}
dht.lock.RUnlock()
}
@ -310,6 +315,8 @@ func (dht *DHT) storeOnNode(hash Bitmap, c Contact) {
}()
}
// PrintState prints the current state of the DHT including address, nr outstanding transactions, stored hashes as well
// as current bucket information.
func (dht *DHT) PrintState() {
log.Printf("DHT node %s at %s", dht.contact.String(), time.Now().Format(time.RFC822Z))
log.Printf("Outstanding transactions: %d", dht.node.CountActiveTransactions())

View file

@ -8,7 +8,7 @@ import (
)
func TestNodeFinder_FindNodes(t *testing.T) {
bs, dhts := TestingCreateDHT(3, true, false)
bs, dhts := TestingCreateDHT(t, 3, true, false)
defer func() {
for i := range dhts {
dhts[i].Shutdown()
@ -59,7 +59,7 @@ func TestNodeFinder_FindNodes(t *testing.T) {
}
func TestNodeFinder_FindNodes_NoBootstrap(t *testing.T) {
_, dhts := TestingCreateDHT(3, false, false)
_, dhts := TestingCreateDHT(t, 3, false, false)
defer func() {
for i := range dhts {
dhts[i].Shutdown()
@ -74,7 +74,7 @@ func TestNodeFinder_FindNodes_NoBootstrap(t *testing.T) {
}
func TestNodeFinder_FindValue(t *testing.T) {
bs, dhts := TestingCreateDHT(3, true, false)
bs, dhts := TestingCreateDHT(t, 3, true, false)
defer func() {
for i := range dhts {
dhts[i].Shutdown()
@ -108,7 +108,7 @@ func TestNodeFinder_FindValue(t *testing.T) {
func TestDHT_LargeDHT(t *testing.T) {
nodes := 100
bs, dhts := TestingCreateDHT(nodes, true, true)
bs, dhts := TestingCreateDHT(t, nodes, true, true)
defer func() {
for _, d := range dhts {
go d.Shutdown()
@ -121,10 +121,12 @@ func TestDHT_LargeDHT(t *testing.T) {
ids := make([]Bitmap, nodes)
for i := range ids {
ids[i] = RandomBitmapP()
go func(i int) {
wg.Add(1)
go func(index int) {
defer wg.Done()
dhts[i].Announce(ids[i])
if err := dhts[index].Announce(ids[index]); err != nil {
t.Error("error announcing random bitmap - ", err)
}
}(i)
}
wg.Wait()

View file

@ -42,16 +42,19 @@ const (
tokenField = "token"
)
// Message is an extension of the bencode marshalling interface for serialized message passing.
type Message interface {
bencode.Marshaler
}
type messageID [messageIDLength]byte
// HexShort returns the first 8 hex characters of the hex encoded message id.
func (m messageID) HexShort() string {
return hex.EncodeToString(m[:])[:8]
}
// UnmarshalBencode takes a byte slice and unmarshals the message id.
func (m *messageID) UnmarshalBencode(encoded []byte) error {
var str string
err := bencode.DecodeBytes(encoded, &str)
@ -62,6 +65,7 @@ func (m *messageID) UnmarshalBencode(encoded []byte) error {
return nil
}
// MarshallBencode returns the encoded byte slice of the message id.
func (m messageID) MarshalBencode() ([]byte, error) {
str := string(m[:])
return bencode.EncodeBytes(str)
@ -76,6 +80,7 @@ func newMessageID() messageID {
return m
}
// Request represents the structured request from one node to another.
type Request struct {
ID messageID
NodeID Bitmap
@ -84,6 +89,7 @@ type Request struct {
StoreArgs *storeArgs
}
// MarshalBencode returns the serialized byte slice representation of the request
func (r Request) MarshalBencode() ([]byte, error) {
var args interface{}
if r.StoreArgs != nil {
@ -102,6 +108,7 @@ func (r Request) MarshalBencode() ([]byte, error) {
})
}
// UnmarshalBencode unmarshals the serialized byte slice into the appropriate fields of the request.
func (r *Request) UnmarshalBencode(b []byte) error {
var raw struct {
ID messageID `bencode:"1"`
@ -136,7 +143,7 @@ func (r *Request) UnmarshalBencode(b []byte) error {
return nil
}
func (r Request) ArgsDebug() string {
func (r Request) argsDebug() string {
if r.StoreArgs != nil {
return r.StoreArgs.BlobHash.HexShort() + ", " + r.StoreArgs.Value.LbryID.HexShort() + ":" + strconv.Itoa(r.StoreArgs.Value.Port)
} else if r.Arg != nil {
@ -158,6 +165,7 @@ type storeArgs struct {
SelfStore bool // this is an int on the wire
}
// MarshalBencode returns the serialized byte slice representation of the storage arguments.
func (s storeArgs) MarshalBencode() ([]byte, error) {
encodedValue, err := bencode.EncodeString(s.Value)
if err != nil {
@ -177,6 +185,7 @@ func (s storeArgs) MarshalBencode() ([]byte, error) {
})
}
// UnmarshalBencode unmarshals the serialized byte slice into the appropriate fields of the store arguments.
func (s *storeArgs) UnmarshalBencode(b []byte) error {
var argsInt []bencode.RawMessage
err := bencode.DecodeBytes(b, &argsInt)
@ -219,6 +228,7 @@ func (s *storeArgs) UnmarshalBencode(b []byte) error {
return nil
}
// Response represents the structured response one node returns to another.
type Response struct {
ID messageID
NodeID Bitmap
@ -228,7 +238,7 @@ type Response struct {
Token string
}
func (r Response) ArgsDebug() string {
func (r Response) argsDebug() string {
if r.Data != "" {
return r.Data
}
@ -251,6 +261,7 @@ func (r Response) ArgsDebug() string {
return str
}
// MarshalBencode returns the serialized byte slice representation of the response.
func (r Response) MarshalBencode() ([]byte, error) {
data := map[string]interface{}{
headerTypeField: responseType,
@ -293,6 +304,7 @@ func (r Response) MarshalBencode() ([]byte, error) {
return bencode.EncodeBytes(data)
}
// UnmarshalBencode unmarshals the serialized byte slice into the appropriate fields of the store arguments.
func (r *Response) UnmarshalBencode(b []byte) error {
var raw struct {
ID messageID `bencode:"1"`
@ -362,6 +374,7 @@ func (r *Response) UnmarshalBencode(b []byte) error {
return nil
}
// Error represents an error message that is returned from one node to another in communication.
type Error struct {
ID messageID
NodeID Bitmap
@ -369,6 +382,7 @@ type Error struct {
Response []string
}
// MarshalBencode returns the serialized byte slice representation of an error message.
func (e Error) MarshalBencode() ([]byte, error) {
return bencode.EncodeBytes(map[string]interface{}{
headerTypeField: errorType,
@ -379,6 +393,7 @@ func (e Error) MarshalBencode() ([]byte, error) {
})
}
// UnmarshalBencode unmarshals the serialized byte slice into the appropriate fields of the error message.
func (e *Error) UnmarshalBencode(b []byte) error {
var raw struct {
ID messageID `bencode:"1"`

View file

@ -101,7 +101,7 @@ func TestBencodeFindValueResponse(t *testing.T) {
res := Response{
ID: newMessageID(),
NodeID: RandomBitmapP(),
FindValueKey: RandomBitmapP().RawString(),
FindValueKey: RandomBitmapP().rawString(),
Token: "arst",
Contacts: []Contact{
{ID: RandomBitmapP(), IP: net.IPv4(1, 2, 3, 4).To4(), Port: 5678},

View file

@ -33,8 +33,10 @@ type UDPConn interface {
Close() error
}
// RequestHandlerFunc is exported handler for requests.
type RequestHandlerFunc func(addr *net.UDPAddr, request Request)
// Node is a type representation of a node on the network.
type Node struct {
// the node's id
id Bitmap
@ -61,7 +63,7 @@ type Node struct {
stop *stopOnce.Stopper
}
// New returns a Node pointer.
// NewNode returns an initialized Node's pointer.
func NewNode(id Bitmap) *Node {
return &Node{
id: id,
@ -87,13 +89,14 @@ func (n *Node) Connect(conn UDPConn) error {
<-n.stop.Ch()
n.tokens.Stop()
n.connClosed = true
n.conn.Close()
if err := n.conn.Close(); err != nil {
log.Error("error closing node connection on shutdown - ", err)
}
}()
packets := make(chan packet)
go func() {
n.stop.Add(1)
go func() {
defer n.stop.Done()
buf := make([]byte, udpMaxMessageLength)
@ -121,9 +124,8 @@ func (n *Node) Connect(conn UDPConn) error {
}
}
}()
go func() {
n.stop.Add(1)
go func() {
defer n.stop.Done()
var pkt packet
@ -171,7 +173,7 @@ func (n *Node) handlePacket(pkt packet) {
log.Errorf("[%s] error decoding request from %s: %s: (%d bytes) %s", n.id.HexShort(), pkt.raddr.String(), err.Error(), len(pkt.data), hex.EncodeToString(pkt.data))
return
}
log.Debugf("[%s] query %s: received request from %s: %s(%s)", n.id.HexShort(), request.ID.HexShort(), request.NodeID.HexShort(), request.Method, request.ArgsDebug())
log.Debugf("[%s] query %s: received request from %s: %s(%s)", n.id.HexShort(), request.ID.HexShort(), request.NodeID.HexShort(), request.Method, request.argsDebug())
n.handleRequest(pkt.raddr, request)
case '0' + responseType:
@ -181,7 +183,7 @@ func (n *Node) handlePacket(pkt packet) {
log.Errorf("[%s] error decoding response from %s: %s: (%d bytes) %s", n.id.HexShort(), pkt.raddr.String(), err.Error(), len(pkt.data), hex.EncodeToString(pkt.data))
return
}
log.Debugf("[%s] query %s: received response from %s: %s", n.id.HexShort(), response.ID.HexShort(), response.NodeID.HexShort(), response.ArgsDebug())
log.Debugf("[%s] query %s: received response from %s: %s", n.id.HexShort(), response.ID.HexShort(), response.NodeID.HexShort(), response.argsDebug())
n.handleResponse(pkt.raddr, response)
case '0' + errorType:
@ -219,26 +221,34 @@ func (n *Node) handleRequest(addr *net.UDPAddr, request Request) {
log.Errorln("invalid request method")
return
case pingMethod:
n.sendMessage(addr, Response{ID: request.ID, NodeID: n.id, Data: pingSuccessResponse})
if err := n.sendMessage(addr, Response{ID: request.ID, NodeID: n.id, Data: pingSuccessResponse}); err != nil {
log.Error("error sending 'pingmethod' response message - ", err)
}
case storeMethod:
// TODO: we should be sending the IP in the request, not just using the sender's IP
// TODO: should we be using StoreArgs.NodeID or StoreArgs.Value.LbryID ???
if n.tokens.Verify(request.StoreArgs.Value.Token, request.NodeID, addr) {
n.Store(request.StoreArgs.BlobHash, Contact{ID: request.StoreArgs.NodeID, IP: addr.IP, Port: request.StoreArgs.Value.Port})
n.sendMessage(addr, Response{ID: request.ID, NodeID: n.id, Data: storeSuccessResponse})
if err := n.sendMessage(addr, Response{ID: request.ID, NodeID: n.id, Data: storeSuccessResponse}); err != nil {
log.Error("error sending 'storemethod' response message - ", err)
}
} else {
n.sendMessage(addr, Error{ID: request.ID, NodeID: n.id, ExceptionType: "invalid-token"})
if err := n.sendMessage(addr, Error{ID: request.ID, NodeID: n.id, ExceptionType: "invalid-token"}); err != nil {
log.Error("error sending 'storemethod'response message for invalid-token - ", err)
}
}
case findNodeMethod:
if request.Arg == nil {
log.Errorln("request is missing arg")
return
}
n.sendMessage(addr, Response{
if err := n.sendMessage(addr, Response{
ID: request.ID,
NodeID: n.id,
Contacts: n.rt.GetClosest(*request.Arg, bucketSize),
})
}); err != nil {
log.Error("error sending 'findnodemethod' response message - ", err)
}
case findValueMethod:
if request.Arg == nil {
@ -253,13 +263,15 @@ func (n *Node) handleRequest(addr *net.UDPAddr, request Request) {
}
if contacts := n.store.Get(*request.Arg); len(contacts) > 0 {
res.FindValueKey = request.Arg.RawString()
res.FindValueKey = request.Arg.rawString()
res.Contacts = contacts
} else {
res.Contacts = n.rt.GetClosest(*request.Arg, bucketSize)
}
n.sendMessage(addr, res)
if err := n.sendMessage(addr, res); err != nil {
log.Error("error sending 'findvaluemethod' response message - ", err)
}
}
// nodes that send us requests should not be inserted, only refreshed.
@ -294,15 +306,17 @@ func (n *Node) sendMessage(addr *net.UDPAddr, data Message) error {
if req, ok := data.(Request); ok {
log.Debugf("[%s] query %s: sending request to %s (%d bytes) %s(%s)",
n.id.HexShort(), req.ID.HexShort(), addr.String(), len(encoded), req.Method, req.ArgsDebug())
n.id.HexShort(), req.ID.HexShort(), addr.String(), len(encoded), req.Method, req.argsDebug())
} else if res, ok := data.(Response); ok {
log.Debugf("[%s] query %s: sending response to %s (%d bytes) %s",
n.id.HexShort(), res.ID.HexShort(), addr.String(), len(encoded), res.ArgsDebug())
n.id.HexShort(), res.ID.HexShort(), addr.String(), len(encoded), res.argsDebug())
} else {
log.Debugf("[%s] (%d bytes) %s", n.id.HexShort(), len(encoded), spew.Sdump(data))
}
n.conn.SetWriteDeadline(time.Now().Add(5 * time.Second))
if err := n.conn.SetWriteDeadline(time.Now().Add(5 * time.Second)); err != nil {
log.Error("error setting write deadline - ", err)
}
_, err = n.conn.WriteToUDP(encoded, addr)
return errors.Err(err)
@ -405,7 +419,7 @@ func (n *Node) SendCancelable(contact Contact, req Request) (<-chan *Response, c
return n.SendAsync(ctx, contact, req), cancel
}
// Count returns the number of transactions in the manager
// CountActiveTransactions returns the number of transactions in the manager
func (n *Node) CountActiveTransactions() int {
n.txLock.Lock()
defer n.txLock.Unlock()
@ -428,6 +442,7 @@ func (n *Node) startRoutingTableGrooming() {
}()
}
// Store stores a node contact in the node's contact store.
func (n *Node) Store(hash Bitmap, c Contact) {
n.store.Upsert(hash, c)
}

View file

@ -195,6 +195,9 @@ func (cf *contactFinder) insertIntoActiveList(contact Contact) {
inserted := false
for i, n := range cf.activeContacts {
// 5000ft: insert contact into sorted active contacts list
// Detail: if diff between new contact id and the target id has fewer changes than the n contact from target
// it should be inserted in between the previous and current.
if contact.ID.Xor(cf.target).Less(n.ID.Xor(cf.target)) {
cf.activeContacts = append(cf.activeContacts[:i], append([]Contact{contact}, cf.activeContacts[i:]...)...)
inserted = true

View file

@ -30,7 +30,7 @@ func TestPing(t *testing.T) {
data, err := bencode.EncodeBytes(map[string]interface{}{
headerTypeField: requestType,
headerMessageIDField: messageID,
headerNodeIDField: testNodeID.RawString(),
headerNodeIDField: testNodeID.rawString(),
headerPayloadField: "ping",
headerArgsField: []string{},
})
@ -86,7 +86,7 @@ func TestPing(t *testing.T) {
rNodeID, ok := response[headerNodeIDField].(string)
if !ok {
t.Error("node ID is not a string")
} else if rNodeID != dhtNodeID.RawString() {
} else if rNodeID != dhtNodeID.rawString() {
t.Error("unexpected node ID")
}
}
@ -176,7 +176,7 @@ func TestStore(t *testing.T) {
}
}
verifyResponse(t, response, messageID, dhtNodeID.RawString())
verifyResponse(t, response, messageID, dhtNodeID.rawString())
_, ok := response[headerPayloadField]
if !ok {
@ -257,7 +257,7 @@ func TestFindNode(t *testing.T) {
}
}
verifyResponse(t, response, messageID, dhtNodeID.RawString())
verifyResponse(t, response, messageID, dhtNodeID.rawString())
_, ok := response[headerPayloadField]
if !ok {
@ -290,10 +290,8 @@ func TestFindValueExisting(t *testing.T) {
defer dht.Shutdown()
nodesToInsert := 3
var nodes []Contact
for i := 0; i < nodesToInsert; i++ {
n := Contact{ID: RandomBitmapP(), IP: net.ParseIP("127.0.0.1"), Port: 10000 + i}
nodes = append(nodes, n)
dht.node.rt.Update(n)
}
@ -333,7 +331,7 @@ func TestFindValueExisting(t *testing.T) {
}
}
verifyResponse(t, response, messageID, dhtNodeID.RawString())
verifyResponse(t, response, messageID, dhtNodeID.rawString())
_, ok := response[headerPayloadField]
if !ok {
@ -345,7 +343,7 @@ func TestFindValueExisting(t *testing.T) {
t.Fatal("payload is not a dictionary")
}
compactContacts, ok := payload[valueToFind.RawString()]
compactContacts, ok := payload[valueToFind.rawString()]
if !ok {
t.Fatal("payload is missing key for search value")
}
@ -412,7 +410,7 @@ func TestFindValueFallbackToFindNode(t *testing.T) {
}
}
verifyResponse(t, response, messageID, dhtNodeID.RawString())
verifyResponse(t, response, messageID, dhtNodeID.rawString())
_, ok := response[headerPayloadField]
if !ok {

View file

@ -14,6 +14,7 @@ import (
"github.com/lbryio/lbry.go/errors"
"github.com/lyoshenka/bencode"
log "github.com/sirupsen/logrus"
)
// TODO: if routing table is ever empty (aka the node is isolated), it should re-bootstrap
@ -21,24 +22,29 @@ import (
// TODO: use a tree with bucket splitting instead of a fixed bucket list. include jack's optimization (see link in commit mesg)
// https://github.com/lbryio/lbry/pull/1211/commits/341b27b6d21ac027671d42458826d02735aaae41
// Contact is a type representation of another node that a specific node is in communication with.
type Contact struct {
ID Bitmap
IP net.IP
Port int
}
// Equals returns T/F if two contacts are the same.
func (c Contact) Equals(other Contact) bool {
return c.ID == other.ID
}
// Addr returns the UPD Address of the contact.
func (c Contact) Addr() *net.UDPAddr {
return &net.UDPAddr{IP: c.IP, Port: c.Port}
}
// String returns the concatenated short hex encoded string of its ID + @ + string represention of its UPD Address.
func (c Contact) String() string {
return c.ID.HexShort() + "@" + c.Addr().String()
}
// MarshalCompact returns the compact byte slice representation of a contact.
func (c Contact) MarshalCompact() ([]byte, error) {
if c.IP.To4() == nil {
return nil, errors.Err("ip not set")
@ -60,6 +66,7 @@ func (c Contact) MarshalCompact() ([]byte, error) {
return buf.Bytes(), nil
}
// UnmarshalCompact unmarshals the compact byte slice representation of a contact.
func (c *Contact) UnmarshalCompact(b []byte) error {
if len(b) != compactNodeInfoLength {
return errors.Err("invalid compact length")
@ -70,10 +77,12 @@ func (c *Contact) UnmarshalCompact(b []byte) error {
return nil
}
// MarshalBencode returns the serialized byte slice representation of a contact.
func (c Contact) MarshalBencode() ([]byte, error) {
return bencode.EncodeBytes([]interface{}{c.ID, c.IP.String(), c.Port})
}
// UnmarshalBencode unmarshals the serialized byte slice into the appropriate fields of the contact.
func (c *Contact) UnmarshalBencode(b []byte) error {
var raw []bencode.RawMessage
err := bencode.DecodeBytes(b, &raw)
@ -139,7 +148,7 @@ func (p *peer) Touch() {
// ActiveSince returns whether a peer has responded in the last `d` duration
// this is used to check if the peer is "good", meaning that we believe the peer will respond to our requests
func (p *peer) ActiveInLast(d time.Duration) bool {
return time.Now().Sub(p.LastActivity) > d
return time.Since(p.LastActivity) > d
}
// IsBad returns whether a peer is "bad", meaning that it has failed to respond to multiple pings in a row
@ -236,7 +245,7 @@ func find(id Bitmap, peers []peer) int {
func (b *bucket) NeedsRefresh(refreshInterval time.Duration) bool {
b.lock.RLock()
defer b.lock.RUnlock()
return time.Now().Sub(b.lastUpdate) > refreshInterval
return time.Since(b.lastUpdate) > refreshInterval
}
type routingTable struct {
@ -341,6 +350,7 @@ func (rt *routingTable) Count() int {
return count
}
// Range is a structure that holds a min and max bitmaps. The range is used in bucket sizing.
type Range struct {
start Bitmap
end Bitmap
@ -457,7 +467,9 @@ func RoutingTableRefresh(n *Node, refreshInterval time.Duration, cancel <-chan s
}()
}
nf.Find()
if _, err := nf.Find(); err != nil {
log.Error("error finding contact during routing table refresh - ", err)
}
}(id)
}

View file

@ -13,7 +13,8 @@ import (
var testingDHTIP = "127.0.0.1"
var testingDHTFirstPort = 21000
func TestingCreateDHT(numNodes int, bootstrap, concurrent bool) (*BootstrapNode, []*DHT) {
// TestingCreateDHT initializes a testable DHT network with a specific number of nodes, with bootstrap and concurrent options.
func TestingCreateDHT(t *testing.T, numNodes int, bootstrap, concurrent bool) (*BootstrapNode, []*DHT) {
var bootstrapNode *BootstrapNode
var seeds []string
@ -25,7 +26,9 @@ func TestingCreateDHT(numNodes int, bootstrap, concurrent bool) (*BootstrapNode,
if err != nil {
panic(err)
}
bootstrapNode.Connect(listener.(*net.UDPConn))
if err := bootstrapNode.Connect(listener.(*net.UDPConn)); err != nil {
t.Error("error connecting bootstrap node - ", err)
}
}
if numNodes < 1 {
@ -41,7 +44,11 @@ func TestingCreateDHT(numNodes int, bootstrap, concurrent bool) (*BootstrapNode,
panic(err)
}
go dht.Start()
go func() {
if err := dht.Start(); err != nil {
t.Error("error starting dht - ", err)
}
}()
if !concurrent {
dht.WaitUntilJoined()
}
@ -103,7 +110,7 @@ func newTestUDPConn(addr string) *testUDPConn {
func (t testUDPConn) ReadFromUDP(b []byte) (int, *net.UDPAddr, error) {
var timeoutCh <-chan time.Time
if !t.readDeadline.IsZero() {
timeoutCh = time.After(t.readDeadline.Sub(time.Now()))
timeoutCh = time.After(time.Until(t.readDeadline))
}
select {
@ -218,7 +225,7 @@ func verifyContacts(t *testing.T, contacts []interface{}, nodes []Contact) {
continue
}
for _, n := range nodes {
if n.ID.RawString() == id {
if n.ID.rawString() == id {
currNode = n
currNodeFound = true
foundNodes[id] = true