From 78c8c8e64d4043c40f0c5123449ad3bc6ec0005c Mon Sep 17 00:00:00 2001 From: Victor Shyba Date: Tue, 24 Jul 2018 18:51:41 -0300 Subject: [PATCH] more porting, plus some functional tests working --- lbrynet/dht/contact.py | 1 - lbrynet/dht/datastore.py | 2 +- lbrynet/dht/encoding.py | 2 ++ lbrynet/dht/iterativefind.py | 12 ++++----- lbrynet/dht/node.py | 25 +++++++++++-------- lbrynet/dht/protocol.py | 12 ++++----- lbrynet/dht/routingtable.py | 6 +++-- tests/functional/dht/dht_test_environment.py | 2 +- .../functional/dht/test_contact_expiration.py | 6 ++--- 9 files changed, 38 insertions(+), 30 deletions(-) diff --git a/lbrynet/dht/contact.py b/lbrynet/dht/contact.py index 9497cfa70..dfb24cc01 100644 --- a/lbrynet/dht/contact.py +++ b/lbrynet/dht/contact.py @@ -182,7 +182,6 @@ class ContactManager: return contact def make_contact(self, id, ipAddress, udpPort, networkProtocol, firstComm=0): - ipAddress = str(ipAddress) contact = self.get_contact(id, ipAddress, udpPort) if contact: return contact diff --git a/lbrynet/dht/datastore.py b/lbrynet/dht/datastore.py index 916f6013c..b6d990ca0 100644 --- a/lbrynet/dht/datastore.py +++ b/lbrynet/dht/datastore.py @@ -38,7 +38,7 @@ class DictDataStore(UserDict): if not unexpired_peers: del self[key] else: - self[key] = unexpired_peers + self[key] = list(unexpired_peers) def hasPeersForBlob(self, key): return True if key in self and len(tuple(self.filter_bad_and_expired_peers(key))) else False diff --git a/lbrynet/dht/encoding.py b/lbrynet/dht/encoding.py index e868ca582..631c65e36 100644 --- a/lbrynet/dht/encoding.py +++ b/lbrynet/dht/encoding.py @@ -60,6 +60,8 @@ class Bencode(Encoding): return b'i%de' % data elif isinstance(data, bytes): return b'%d:%s' % (len(data), data) + elif isinstance(data, str): + return b'%d:' % (len(data)) + data.encode() elif isinstance(data, (list, tuple)): encodedListItems = b'' for item in data: diff --git a/lbrynet/dht/iterativefind.py b/lbrynet/dht/iterativefind.py index d6039d5dd..d8136a8ac 100644 --- a/lbrynet/dht/iterativefind.py +++ b/lbrynet/dht/iterativefind.py @@ -38,7 +38,7 @@ class _IterativeFind: # Shortlist of contact objects (the k closest known contacts to the key from the routing table) self.shortlist = shortlist # The search key - self.key = str(key) + self.key = key # The rpc method name (findValue or findNode) self.rpc = rpc # List of active queries; len() indicates number of active probes @@ -74,22 +74,22 @@ class _IterativeFind: for contact_tup in contact_triples: if not isinstance(contact_tup, (list, tuple)) or len(contact_tup) != 3: raise ValueError("invalid contact triple") + contact_tup[1] = contact_tup[1].decode() # ips are strings return contact_triples def sortByDistance(self, contact_list): """Sort the list of contacts in order by distance from key""" contact_list.sort(key=lambda c: self.distance(c.id)) - @defer.inlineCallbacks def extendShortlist(self, contact, result): # The "raw response" tuple contains the response message and the originating address info originAddress = (contact.address, contact.port) if self.finished_deferred.called: - defer.returnValue(contact.id) + return contact.id if self.node.contact_manager.is_ignored(originAddress): raise ValueError("contact is ignored") if contact.id == self.node.node_id: - defer.returnValue(contact.id) + return contact.id if contact not in self.active_contacts: self.active_contacts.append(contact) @@ -134,14 +134,14 @@ class _IterativeFind: self.sortByDistance(self.active_contacts) self.finished_deferred.callback(self.active_contacts[:min(constants.k, len(self.active_contacts))]) - defer.returnValue(contact.id) + return contact.id @defer.inlineCallbacks def probeContact(self, contact): fn = getattr(contact, self.rpc) try: response = yield fn(self.key) - result = yield self.extendShortlist(contact, response) + result = self.extendShortlist(contact, response) defer.returnValue(result) except (TimeoutError, defer.CancelledError, ValueError, IndexError): defer.returnValue(contact.id) diff --git a/lbrynet/dht/node.py b/lbrynet/dht/node.py index 118872844..517abd3a4 100644 --- a/lbrynet/dht/node.py +++ b/lbrynet/dht/node.py @@ -162,6 +162,13 @@ class Node(MockKademliaHelper): # if hasattr(self, "_listeningPort") and self._listeningPort is not None: # self._listeningPort.stopListening() + def __str__(self): + return '<%s.%s object; ID: %s, IP address: %s, UDP port: %d>' % ( + self.__module__, self.__class__.__name__, binascii.hexlify(self.node_id), self.externalIP, self.port) + + def __hash__(self): + return self.node_id.__hash__() + @defer.inlineCallbacks def stop(self): # stop LoopingCalls: @@ -315,10 +322,10 @@ class Node(MockKademliaHelper): token = contact.token if not token: find_value_response = yield contact.findValue(blob_hash) - token = find_value_response['token'] + token = find_value_response[b'token'] contact.update_token(token) res = yield contact.store(blob_hash, token, self.peerPort, self.node_id, 0) - if res != "OK": + if res != b"OK": raise ValueError(res) defer.returnValue(True) log.debug("Stored %s to %s (%s)", binascii.hexlify(blob_hash), contact.log_id(), contact.address) @@ -326,7 +333,7 @@ class Node(MockKademliaHelper): log.debug("Timeout while storing blob_hash %s at %s", binascii.hexlify(blob_hash), contact.log_id()) except ValueError as err: - log.error("Unexpected response: %s" % err.message) + log.error("Unexpected response: %s" % err) except Exception as err: log.error("Unexpected error while storing blob_hash %s at %s: %s", binascii.hexlify(blob_hash), contact, err) @@ -339,9 +346,7 @@ class Node(MockKademliaHelper): if not self.externalIP: raise Exception("Cannot determine external IP: %s" % self.externalIP) stored_to = yield DeferredDict({contact: self.storeToContact(blob_hash, contact) for contact in contacts}) - contacted_node_ids = map( - lambda contact: contact.id.encode('hex'), filter(lambda contact: stored_to[contact], stored_to.keys()) - ) + contacted_node_ids = [binascii.hexlify(contact.id) for contact in stored_to.keys() if stored_to[contact]] log.debug("Stored %s to %i of %i attempted peers", binascii.hexlify(blob_hash), len(contacted_node_ids), len(contacts)) defer.returnValue(contacted_node_ids) @@ -403,7 +408,7 @@ class Node(MockKademliaHelper): @rtype: twisted.internet.defer.Deferred """ - if len(key) != constants.key_bits / 8: + if len(key) != constants.key_bits // 8: raise ValueError("invalid key length!") # Execute the search @@ -554,7 +559,7 @@ class Node(MockKademliaHelper): node is returning all of the contacts that it knows of. @rtype: list """ - if len(key) != constants.key_bits / 8: + if len(key) != constants.key_bits // 8: raise ValueError("invalid contact id length: %i" % len(key)) contacts = self._routingTable.findCloseNodes(key, sender_node_id=rpc_contact.id) @@ -576,7 +581,7 @@ class Node(MockKademliaHelper): @rtype: dict or list """ - if len(key) != constants.key_bits / 8: + if len(key) != constants.key_bits // 8: raise ValueError("invalid blob hash length: %i" % len(key)) response = { @@ -645,7 +650,7 @@ class Node(MockKademliaHelper): @rtype: twisted.internet.defer.Deferred """ - if len(key) != constants.key_bits / 8: + if len(key) != constants.key_bits // 8: raise ValueError("invalid key length: %i" % len(key)) if startupShortlist is None: diff --git a/lbrynet/dht/protocol.py b/lbrynet/dht/protocol.py index 3817486e6..d52baad04 100644 --- a/lbrynet/dht/protocol.py +++ b/lbrynet/dht/protocol.py @@ -220,17 +220,17 @@ class KademliaProtocol(protocol.DatagramProtocol): @note: This is automatically called by Twisted when the protocol receives a UDP datagram """ - if datagram[0] == b'\x00' and datagram[25] == b'\x00': - totalPackets = (ord(datagram[1]) << 8) | ord(datagram[2]) + if chr(datagram[0]) == '\x00' and chr(datagram[25]) == '\x00': + totalPackets = (datagram[1] << 8) | datagram[2] msgID = datagram[5:25] - seqNumber = (ord(datagram[3]) << 8) | ord(datagram[4]) + seqNumber = (datagram[3] << 8) | datagram[4] if msgID not in self._partialMessages: self._partialMessages[msgID] = {} self._partialMessages[msgID][seqNumber] = datagram[26:] if len(self._partialMessages[msgID]) == totalPackets: keys = self._partialMessages[msgID].keys() keys.sort() - data = '' + data = b'' for key in keys: data += self._partialMessages[msgID][key] datagram = data @@ -350,7 +350,7 @@ class KademliaProtocol(protocol.DatagramProtocol): # 1st byte is transmission type id, bytes 2 & 3 are the # total number of packets in this transmission, bytes 4 & # 5 are the sequence number for this specific packet - totalPackets = len(data) / self.msgSizeLimit + totalPackets = len(data) // self.msgSizeLimit if len(data) % self.msgSizeLimit > 0: totalPackets += 1 encTotalPackets = chr(totalPackets >> 8) + chr(totalPackets & 0xff) @@ -375,7 +375,7 @@ class KademliaProtocol(protocol.DatagramProtocol): if self.transport: try: self.transport.write(txData, address) - except socket.error as err: + except OSError as err: if err.errno == errno.EWOULDBLOCK: # i'm scared this may swallow important errors, but i get a million of these # on Linux and it doesnt seem to affect anything -grin diff --git a/lbrynet/dht/routingtable.py b/lbrynet/dht/routingtable.py index 579763139..0d5deaa0a 100644 --- a/lbrynet/dht/routingtable.py +++ b/lbrynet/dht/routingtable.py @@ -6,6 +6,8 @@ # may be created by processing this file with epydoc: http://epydoc.sf.net import random +from binascii import unhexlify + from twisted.internet import defer from . import constants from . import kbucket @@ -267,8 +269,8 @@ class TreeRoutingTable: randomID = randomID[:-1] if len(randomID) % 2 != 0: randomID = '0' + randomID - randomID = randomID.decode('hex') - randomID = (constants.key_bits / 8 - len(randomID)) * '\x00' + randomID + randomID = unhexlify(randomID) + randomID = ((constants.key_bits // 8) - len(randomID)) * b'\x00' + randomID return randomID def _splitBucket(self, oldBucketIndex): diff --git a/tests/functional/dht/dht_test_environment.py b/tests/functional/dht/dht_test_environment.py index 3bda2a981..233d2403e 100644 --- a/tests/functional/dht/dht_test_environment.py +++ b/tests/functional/dht/dht_test_environment.py @@ -173,5 +173,5 @@ class TestKademliaBase(unittest.TestCase): yield self.run_reactor(2, ping_dl) node_addresses = {node.externalIP for node in self.nodes}.union({seed.externalIP for seed in self._seeds}) self.assertSetEqual(node_addresses, contacted) - expected = {node: "pong" for node in contacted} + expected = {node: b"pong" for node in contacted} self.assertDictEqual(ping_replies, expected) diff --git a/tests/functional/dht/test_contact_expiration.py b/tests/functional/dht/test_contact_expiration.py index c62cac866..50156849d 100644 --- a/tests/functional/dht/test_contact_expiration.py +++ b/tests/functional/dht/test_contact_expiration.py @@ -25,9 +25,9 @@ class TestPeerExpiration(TestKademliaBase): offline_addresses = self.get_routable_addresses().difference(self.get_online_addresses()) self.assertSetEqual(offline_addresses, removed_addresses) - get_nodes_with_stale_contacts = lambda: filter(lambda node: any(contact.address in offline_addresses - for contact in node.contacts), - self.nodes + self._seeds) + get_nodes_with_stale_contacts = lambda: list(filter(lambda node: any(contact.address in offline_addresses + for contact in node.contacts), + self.nodes + self._seeds)) self.assertRaises(AssertionError, self.verify_all_nodes_are_routable) self.assertTrue(len(get_nodes_with_stale_contacts()) > 1)