From 5c44ca40c283b1f9c789d9545ef0bfbcacd1e506 Mon Sep 17 00:00:00 2001 From: Alex Grintsvayg Date: Wed, 7 Mar 2018 19:49:33 -0500 Subject: [PATCH] store works. fixed some bencode bugs --- dht/bitmap.go | 17 ++++++++ dht/bitmap_test.go | 52 +++++++++++++++++++++++- dht/dht.go | 10 +++-- dht/dht_test.go | 96 ++++++++++++++++++++++++++------------------- dht/message.go | 30 +++++++------- dht/message_test.go | 2 +- dht/store.go | 24 +++++------- 7 files changed, 153 insertions(+), 78 deletions(-) diff --git a/dht/bitmap.go b/dht/bitmap.go index 74224b8..3125d6c 100644 --- a/dht/bitmap.go +++ b/dht/bitmap.go @@ -4,6 +4,8 @@ import ( "encoding/hex" "math/rand" "strconv" + + "github.com/zeebo/bencode" ) type bitmap [nodeIDLength]byte @@ -54,6 +56,21 @@ func (b bitmap) PrefixLen() (ret int) { return nodeIDLength*8 - 1 } +func (b *bitmap) UnmarshalBencode(encoded []byte) error { + var str string + err := bencode.DecodeBytes(encoded, &str) + if err != nil { + return err + } + copy(b[:], str) + return nil +} + +func (b bitmap) MarshalBencode() ([]byte, error) { + str := string(b[:]) + return bencode.EncodeBytes(str) +} + func newBitmapFromBytes(data []byte) bitmap { if len(data) != nodeIDLength { panic("invalid bitmap of length " + strconv.Itoa(len(data))) diff --git a/dht/bitmap_test.go b/dht/bitmap_test.go index 325f573..8b237ce 100644 --- a/dht/bitmap_test.go +++ b/dht/bitmap_test.go @@ -1,6 +1,10 @@ package dht -import "testing" +import ( + "testing" + + "github.com/zeebo/bencode" +) func TestBitmap(t *testing.T) { a := bitmap{ @@ -46,3 +50,49 @@ func TestBitmap(t *testing.T) { t.Error(newBitmapFromHex(id).Hex()) } } + +func TestBitmapMarshal(t *testing.T) { + b := newBitmapFromString("123456789012345678901234567890123456789012345678") + encoded, err := bencode.EncodeBytes(b) + if err != nil { + t.Error(err) + } + + if string(encoded) != "48:123456789012345678901234567890123456789012345678" { + t.Error("encoding does not match expected") + } +} + +func TestBitmapMarshalEmbedded(t *testing.T) { + e := struct { + A string + B bitmap + C int + }{ + A: "1", + B: newBitmapFromString("222222222222222222222222222222222222222222222222"), + C: 3, + } + + encoded, err := bencode.EncodeBytes(e) + if err != nil { + t.Error(err) + } + + if string(encoded) != "d1:A1:11:B48:2222222222222222222222222222222222222222222222221:Ci3ee" { + t.Error("encoding does not match expected") + } +} + +func TestBitmapMarshalEmbedded2(t *testing.T) { + encoded, err := bencode.EncodeBytes([]interface{}{ + newBitmapFromString("333333333333333333333333333333333333333333333333"), + }) + if err != nil { + t.Error(err) + } + + if string(encoded) != "l48:333333333333333333333333333333333333333333333333e" { + t.Error("encoding does not match expected") + } +} diff --git a/dht/dht.go b/dht/dht.go index 600f327..369dad7 100644 --- a/dht/dht.go +++ b/dht/dht.go @@ -167,6 +167,7 @@ func handle(dht *DHT, pkt packet) { request := Request{} err = bencode.DecodeBytes(pkt.data, &request) if err != nil { + log.Errorln(err) return } log.Debugf("[%s] query %s: received request from %s: %s(%s)", dht.node.id.Hex()[:8], hex.EncodeToString([]byte(request.ID))[:8], hex.EncodeToString([]byte(request.NodeID))[:8], request.Method, argsToString(request.Args)) @@ -206,12 +207,13 @@ func handleRequest(dht *DHT, addr *net.UDPAddr, request Request) (success bool) switch request.Method { case pingMethod: - log.Println("ping") send(dht, addr, Response{ID: request.ID, NodeID: dht.node.id.RawString(), Data: pingSuccessResponse}) case storeMethod: - log.Println("store") - node := &Node{id: newBitmapFromHex(request.StoreArgs.Value.LbryID), addr: request.StoreArgs.Value.Port} - dht.store.Insert(newBitmapFromHex(request.StoreArgs.BlobHash)) + if request.StoreArgs.BlobHash == "" { + log.Errorln("blobhash is empty") + return // nothing to store + } + dht.store.Insert(request.StoreArgs.BlobHash, request.StoreArgs.NodeID) send(dht, addr, Response{ID: request.ID, NodeID: dht.node.id.RawString(), Data: storeSuccessResponse}) case findNodeMethod: log.Println("findnode") diff --git a/dht/dht_test.go b/dht/dht_test.go index a3589b2..95fd3c0 100644 --- a/dht/dht_test.go +++ b/dht/dht_test.go @@ -123,6 +123,7 @@ func TestStore(t *testing.T) { Method: storeMethod, StoreArgs: &storeArgs{ BlobHash: blobHashToStore, + NodeID: testNodeID, }, } storeRequest.StoreArgs.Value.Token = "arst" @@ -155,69 +156,82 @@ func TestStore(t *testing.T) { conn.toRead <- testUDPPacket{addr: conn.addr, data: data} timer := time.NewTimer(3 * time.Second) + var response map[string]interface{} select { case <-timer.C: t.Error("timeout") + return case resp := <-conn.writes: - var response map[string]interface{} err := bencode.DecodeBytes(resp.data, &response) if err != nil { t.Error(err) return } + } - if len(response) != 4 { - t.Errorf("expected 4 response fields, got %d", len(response)) - } + if len(response) != 4 { + t.Errorf("expected 4 response fields, got %d", len(response)) + } - _, ok := response[headerTypeField] + _, ok := response[headerTypeField] + if !ok { + t.Error("missing type field") + } else { + rType, ok := response[headerTypeField].(int64) if !ok { - t.Error("missing type field") - } else { - rType, ok := response[headerTypeField].(int64) - if !ok { - t.Error("type is not an integer") - } else if rType != responseType { - t.Error("unexpected response type") - } + t.Error("type is not an integer") + } else if rType != responseType { + t.Error("unexpected response type") } + } - _, ok = response[headerMessageIDField] + _, ok = response[headerMessageIDField] + if !ok { + t.Error("missing message id field") + } else { + rMessageID, ok := response[headerMessageIDField].(string) if !ok { - t.Error("missing message id field") - } else { - rMessageID, ok := response[headerMessageIDField].(string) - if !ok { - t.Error("message ID is not a string") - } else if rMessageID != messageID { - t.Error("unexpected message ID") - } + t.Error("message ID is not a string") + } else if rMessageID != messageID { + t.Error("unexpected message ID") } + } - _, ok = response[headerNodeIDField] + _, ok = response[headerNodeIDField] + if !ok { + t.Error("missing node id field") + } else { + rNodeID, ok := response[headerNodeIDField].(string) if !ok { - t.Error("missing node id field") - } else { - rNodeID, ok := response[headerNodeIDField].(string) - if !ok { - t.Error("node ID is not a string") - } else if rNodeID != dhtNodeID.RawString() { - t.Error("unexpected node ID") - } + t.Error("node ID is not a string") + } else if rNodeID != dhtNodeID.RawString() { + t.Error("unexpected node ID") } + } - _, ok = response[headerPayloadField] + _, ok = response[headerPayloadField] + if !ok { + t.Error("missing payload field") + } else { + rNodeID, ok := response[headerPayloadField].(string) if !ok { - t.Error("missing payload field") - } else { - rNodeID, ok := response[headerPayloadField].(string) - if !ok { - t.Error("payload is not a string") - } else if rNodeID != storeSuccessResponse { - t.Error("did not return OK") - } + t.Error("payload is not a string") + } else if rNodeID != storeSuccessResponse { + t.Error("did not return OK") } } + + if len(dht.store.data) != 1 { + t.Error("dht store has wrong number of items") + } + + items := dht.store.Get(blobHashToStore) + if len(items) != 1 { + t.Error("list created in store, but nothing in list") + } + if !items[0].Equals(testNodeID) { + t.Error("wrong value stored") + } } func TestFindNode(t *testing.T) { @@ -260,7 +274,7 @@ func TestFindValue(t *testing.T) { dht.listen() go dht.runHandler() - data, _ := hex.DecodeString("6469306569306569316532303a7de8e57d34e316abbb5a8a8da50dcd1ad4c80e0f69326534383a7ce1b831dec8689e44f80f547d2dea171f6a625e1a4ff6c6165e645f953103dabeb068a622203f859c6c64658fd3aa3b693365393a66696e6456616c75656934656c34383aa47624b8e7ee1e54df0c45e2eb858feb0b705bd2a78d8b739be31ba188f4bd6f56b371c51fecc5280d5fd26ba4168e966565") + data, _ := hex.DecodeString("64313a30693065313a3132303a7de8e57d34e316abbb5a8a8da50dcd1ad4c80e0f313a3234383a7ce1b831dec8689e44f80f547d2dea171f6a625e1a4ff6c6165e645f953103dabeb068a622203f859c6c64658fd3aa3b313a33393a66696e6456616c7565313a346c34383aa47624b8e7ee1e54df0c45e2eb858feb0b705bd2a78d8b739be31ba188f4bd6f56b371c51fecc5280d5fd26ba4168e966565") conn.toRead <- testUDPPacket{addr: conn.addr, data: data} timer := time.NewTimer(3 * time.Second) diff --git a/dht/message.go b/dht/message.go index 120872e..c490286 100644 --- a/dht/message.go +++ b/dht/message.go @@ -36,7 +36,6 @@ const ( type Message interface { bencode.Marshaler - GetID() string } type Request struct { @@ -47,7 +46,6 @@ type Request struct { StoreArgs *storeArgs } -func (r Request) GetID() string { return r.ID } func (r Request) MarshalBencode() ([]byte, error) { var args interface{} if r.StoreArgs != nil { @@ -73,7 +71,7 @@ func (r *Request) UnmarshalBencode(b []byte) error { } err := bencode.DecodeBytes(b, &raw) if err != nil { - return err + return errors.Prefix("request unmarshal", err) } r.ID = raw.ID @@ -81,29 +79,30 @@ func (r *Request) UnmarshalBencode(b []byte) error { r.Method = raw.Method if r.Method == storeMethod { + r.StoreArgs = &storeArgs{} // bencode wont find the unmarshaler on a null pointer. need to fix it. err = bencode.DecodeBytes(raw.Args, &r.StoreArgs) } else { err = bencode.DecodeBytes(raw.Args, &r.Args) } if err != nil { - return err + return errors.Prefix("request unmarshal", err) } return nil } type storeArgs struct { - BlobHash string // 48 bytes + BlobHash string Value struct { Token string `bencode:"token"` LbryID string `bencode:"lbryid"` Port int `bencode:"port"` } - NodeID string // 48 bytes - SelfStore bool // this is an int on the wire + NodeID bitmap + SelfStore bool // this is an int on the wire } -func (s *storeArgs) MarshalBencode() ([]byte, error) { +func (s storeArgs) MarshalBencode() ([]byte, error) { encodedValue, err := bencode.EncodeString(s.Value) if err != nil { return nil, err @@ -126,7 +125,7 @@ func (s *storeArgs) UnmarshalBencode(b []byte) error { var argsInt []bencode.RawMessage err := bencode.DecodeBytes(b, &argsInt) if err != nil { - return err + return errors.Prefix("storeArgs unmarshal", err) } if len(argsInt) != 4 { @@ -135,23 +134,23 @@ func (s *storeArgs) UnmarshalBencode(b []byte) error { err = bencode.DecodeBytes(argsInt[0], &s.BlobHash) if err != nil { - return errors.Err(err) + return errors.Prefix("storeArgs unmarshal", err) } err = bencode.DecodeBytes(argsInt[1], &s.Value) if err != nil { - return errors.Err(err) + return errors.Prefix("storeArgs unmarshal", err) } err = bencode.DecodeBytes(argsInt[2], &s.NodeID) if err != nil { - return errors.Err(err) + return errors.Prefix("storeArgs unmarshal", err) } var selfStore int err = bencode.DecodeBytes(argsInt[3], &selfStore) if err != nil { - return errors.Err(err) + return errors.Prefix("storeArgs unmarshal", err) } if selfStore == 0 { s.SelfStore = false @@ -204,8 +203,6 @@ type Response struct { FindNodeData []findNodeDatum } -func (r Response) GetID() string { return r.ID } - func (r Response) MarshalBencode() ([]byte, error) { data := map[string]interface{}{ headerTypeField: responseType, @@ -246,6 +243,8 @@ func (r *Response) UnmarshalBencode(b []byte) error { return err } } + + return nil } type Error struct { @@ -255,7 +254,6 @@ type Error struct { ExceptionType string } -func (e Error) GetID() string { return e.ID } func (e Error) MarshalBencode() ([]byte, error) { return bencode.EncodeBytes(map[string]interface{}{ headerTypeField: errorType, diff --git a/dht/message_test.go b/dht/message_test.go index f2ec466..b03b46d 100644 --- a/dht/message_test.go +++ b/dht/message_test.go @@ -58,7 +58,7 @@ func TestBencodeDecodeStoreArgs(t *testing.T) { if hex.EncodeToString([]byte(storeArgs.Value.Token)) != strings.ToLower(token) { t.Error("token mismatch") } - if hex.EncodeToString([]byte(storeArgs.NodeID)) != strings.ToLower(nodeID) { + if storeArgs.NodeID.Hex() != strings.ToLower(nodeID) { t.Error("node id mismatch") } if !storeArgs.SelfStore { diff --git a/dht/store.go b/dht/store.go index c7fbeea..1da7fc5 100644 --- a/dht/store.go +++ b/dht/store.go @@ -1,32 +1,26 @@ package dht -import ( - "sync" - "time" -) +import "sync" type peer struct { - node *Node - lastPublished time.Time - originallyPublished time.Time - originalPublisherID bitmap + nodeID bitmap } type peerStore struct { - data map[bitmap][]peer + data map[string][]peer lock sync.RWMutex } func newPeerStore() *peerStore { return &peerStore{ - data: map[bitmap][]peer{}, + data: map[string][]peer{}, } } -func (s *peerStore) Insert(key bitmap, node *Node, lastPublished, originallyPublished time.Time, originaPublisherID bitmap) { +func (s *peerStore) Insert(key string, nodeId bitmap) { s.lock.Lock() defer s.lock.Unlock() - newPeer := peer{node: node, lastPublished: lastPublished, originallyPublished: originallyPublished, originalPublisherID: originaPublisherID} + newPeer := peer{nodeID: nodeId} _, ok := s.data[key] if !ok { s.data[key] = []peer{newPeer} @@ -35,13 +29,13 @@ func (s *peerStore) Insert(key bitmap, node *Node, lastPublished, originallyPubl } } -func (s *peerStore) GetNodes(key bitmap) []*Node { +func (s *peerStore) Get(key string) []bitmap { s.lock.RLock() defer s.lock.RUnlock() - nodes := []*Node{} + nodes := []bitmap{} if peers, ok := s.data[key]; ok { for _, p := range peers { - nodes = append(nodes, p.node) + nodes = append(nodes, p.nodeID) } } return nodes