From b25d592d9954d6eea1c8e23e1385659c9fe58494 Mon Sep 17 00:00:00 2001 From: Victor Shyba Date: Thu, 26 Jul 2018 01:20:34 -0300 Subject: [PATCH] more dht fixes and most of functional tests --- lbrynet/dht/contact.py | 2 +- lbrynet/dht/datastore.py | 9 ++++--- lbrynet/dht/kbucket.py | 1 - tests/functional/dht/dht_test_environment.py | 28 ++++++++++++++------ tests/functional/dht/mock_transport.py | 2 +- tests/functional/dht/test_store.py | 26 +++++++++--------- 6 files changed, 41 insertions(+), 27 deletions(-) diff --git a/lbrynet/dht/contact.py b/lbrynet/dht/contact.py index dfb24cc01..e17c8b28b 100644 --- a/lbrynet/dht/contact.py +++ b/lbrynet/dht/contact.py @@ -114,7 +114,7 @@ class _Contact: return True def __hash__(self): - return self.id.__hash__() + return int(hexlify(self.id), 16) if self.id else -1 def compact_ip(self): compact_ip = reduce( diff --git a/lbrynet/dht/datastore.py b/lbrynet/dht/datastore.py index b6d990ca0..2ae0f393d 100644 --- a/lbrynet/dht/datastore.py +++ b/lbrynet/dht/datastore.py @@ -33,12 +33,15 @@ class DictDataStore(UserDict): return filter(lambda peer: self._getTime() - peer[3] < constants.dataExpireTimeout, self[key]) def removeExpiredPeers(self): + expired_keys = [] for key in self.keys(): - unexpired_peers = self.filter_expired_peers(key) + unexpired_peers = list(self.filter_expired_peers(key)) if not unexpired_peers: - del self[key] + expired_keys.append(key) else: - self[key] = list(unexpired_peers) + self[key] = unexpired_peers + for key in expired_keys: + del self[key] def hasPeersForBlob(self, key): return True if key in self and len(tuple(self.filter_bad_and_expired_peers(key))) else False diff --git a/lbrynet/dht/kbucket.py b/lbrynet/dht/kbucket.py index 4fe424bee..64027fe1e 100644 --- a/lbrynet/dht/kbucket.py +++ b/lbrynet/dht/kbucket.py @@ -140,7 +140,6 @@ class KBucket: if not. @rtype: bool """ - assert type(key) in [long, bytes], "{} is {}".format(key, type(key)) # fixme: _maybe_ remove this after porting if isinstance(key, bytes): key = long(hexlify(key), 16) return self.rangeMin <= key < self.rangeMax diff --git a/tests/functional/dht/dht_test_environment.py b/tests/functional/dht/dht_test_environment.py index 233d2403e..6f18c1c57 100644 --- a/tests/functional/dht/dht_test_environment.py +++ b/tests/functional/dht/dht_test_environment.py @@ -1,5 +1,4 @@ import logging -from binascii import hexlify from twisted.trial import unittest from twisted.internet import defer, task @@ -25,9 +24,9 @@ class TestKademliaBase(unittest.TestCase): return node @defer.inlineCallbacks - def add_node(self): + def add_node(self, known_addresses): node = self._add_next_node() - yield node.start([(seed_name, 4444) for seed_name in sorted(self.seed_dns.keys())]) + yield node.start(known_addresses) defer.returnValue(node) def get_node(self, node_id): @@ -41,13 +40,24 @@ class TestKademliaBase(unittest.TestCase): node = self.nodes.pop() yield node.stop() - def pump_clock(self, n, step=0.1, tick_callback=None): + def pump_clock(self, n, step=None, tick_callback=None): """ :param n: seconds to run the reactor for :param step: reactor tick rate (in seconds) """ - for _ in range(int(n * (1.0 / float(step)))): - self.clock.advance(step) + advanced = 0.0 + while advanced < n: + self.clock._sortCalls() + if step: + next_step = step + elif self.clock.getDelayedCalls(): + next_call = self.clock.getDelayedCalls()[0].getTime() + next_step = min(n - advanced, max(next_call - self.clock.rightNow, .000000000001)) + else: + next_step = n - advanced + assert next_step > 0 + self.clock.advance(next_step) + advanced += float(next_step) if tick_callback and callable(tick_callback): tick_callback(self.clock.seconds()) @@ -116,8 +126,10 @@ class TestKademliaBase(unittest.TestCase): yield self.run_reactor(constants.checkRefreshInterval+1, seed_dl) while len(self.nodes + self._seeds) < self.network_size: network_dl = [] - for i in range(min(10, self.network_size - len(self._seeds) - len(self.nodes))): - network_dl.append(self.add_node()) + # fixme: We are starting one by one to reduce flakiness on time advance. + # fixme: When that improves, get back to 10+! + for i in range(min(1, self.network_size - len(self._seeds) - len(self.nodes))): + network_dl.append(self.add_node(known_addresses)) yield self.run_reactor(constants.checkRefreshInterval*2+1, network_dl) self.assertEqual(len(self.nodes + self._seeds), self.network_size) self.pump_clock(3600) diff --git a/tests/functional/dht/mock_transport.py b/tests/functional/dht/mock_transport.py index f02efb1aa..a000b1773 100644 --- a/tests/functional/dht/mock_transport.py +++ b/tests/functional/dht/mock_transport.py @@ -143,7 +143,7 @@ def debug_kademlia_packet(data, source, destination, node): log.debug("request %s --> %s %s (node time %s)", source[0], destination[0], packet.request, node.clock.seconds()) elif isinstance(packet, ResponseMessage): - if isinstance(packet.response, (str, unicode)): + if isinstance(packet.response, bytes): log.debug("response %s <-- %s %s (node time %s)", destination[0], source[0], packet.response, node.clock.seconds()) else: diff --git a/tests/functional/dht/test_store.py b/tests/functional/dht/test_store.py index 00cdbb443..f5dc8a648 100644 --- a/tests/functional/dht/test_store.py +++ b/tests/functional/dht/test_store.py @@ -19,7 +19,7 @@ class TestStoreExpiration(TestKademliaBase): announcing_node = self.nodes[20] # announce the blob announce_d = announcing_node.announceHaveBlob(blob_hash) - self.pump_clock(5) + self.pump_clock(5+1) storing_node_ids = yield announce_d all_nodes = set(self.nodes).union(set(self._seeds)) @@ -30,8 +30,8 @@ class TestStoreExpiration(TestKademliaBase): for node in storing_nodes: self.assertTrue(node._dataStore.hasPeersForBlob(blob_hash)) datastore_result = node._dataStore.getPeersForBlob(blob_hash) - self.assertEqual(map(lambda contact: (contact.id, contact.address, contact.port), - node._dataStore.getStoringContacts()), [(announcing_node.node_id, + self.assertEqual(list(map(lambda contact: (contact.id, contact.address, contact.port), + node._dataStore.getStoringContacts())), [(announcing_node.node_id, announcing_node.externalIP, announcing_node.port)]) self.assertEqual(len(datastore_result), 1) @@ -52,7 +52,7 @@ class TestStoreExpiration(TestKademliaBase): self.assertFalse(node._dataStore.hasPeersForBlob(blob_hash)) datastore_result = node._dataStore.getPeersForBlob(blob_hash) self.assertEqual(len(datastore_result), 0) - self.assertTrue(blob_hash in node._dataStore._dict) # the looping call shouldn't have removed it yet + self.assertTrue(blob_hash in node._dataStore) # the looping call shouldn't have removed it yet self.assertEqual(len(node._dataStore.getStoringContacts()), 1) self.pump_clock(constants.checkRefreshInterval + 1) # tick the clock forward (so the nodes refresh) @@ -61,7 +61,7 @@ class TestStoreExpiration(TestKademliaBase): datastore_result = node._dataStore.getPeersForBlob(blob_hash) self.assertEqual(len(datastore_result), 0) self.assertEqual(len(node._dataStore.getStoringContacts()), 0) - self.assertTrue(blob_hash not in node._dataStore._dict) # the looping call should have fired + self.assertTrue(blob_hash not in node._dataStore.keys()) # the looping call should have fired @defer.inlineCallbacks def test_storing_node_went_stale_then_came_back(self): @@ -69,19 +69,19 @@ class TestStoreExpiration(TestKademliaBase): announcing_node = self.nodes[20] # announce the blob announce_d = announcing_node.announceHaveBlob(blob_hash) - self.pump_clock(5) + self.pump_clock(5+1) storing_node_ids = yield announce_d all_nodes = set(self.nodes).union(set(self._seeds)) # verify the nodes we think stored it did actually store it - storing_nodes = [node for node in all_nodes if node.node_id.encode('hex') in storing_node_ids] + storing_nodes = [node for node in all_nodes if hexlify(node.node_id) in storing_node_ids] self.assertEqual(len(storing_nodes), len(storing_node_ids)) self.assertEqual(len(storing_nodes), constants.k) for node in storing_nodes: self.assertTrue(node._dataStore.hasPeersForBlob(blob_hash)) datastore_result = node._dataStore.getPeersForBlob(blob_hash) - self.assertEqual(map(lambda contact: (contact.id, contact.address, contact.port), - node._dataStore.getStoringContacts()), [(announcing_node.node_id, + self.assertEqual(list(map(lambda contact: (contact.id, contact.address, contact.port), + node._dataStore.getStoringContacts())), [(announcing_node.node_id, announcing_node.externalIP, announcing_node.port)]) self.assertEqual(len(datastore_result), 1) @@ -111,7 +111,7 @@ class TestStoreExpiration(TestKademliaBase): datastore_result = node._dataStore.getPeersForBlob(blob_hash) self.assertEqual(len(datastore_result), 0) self.assertEqual(len(node._dataStore.getStoringContacts()), 1) - self.assertTrue(blob_hash in node._dataStore._dict) + self.assertTrue(blob_hash in node._dataStore) # # bring the announcing node back online self.nodes.append(announcing_node) @@ -127,7 +127,7 @@ class TestStoreExpiration(TestKademliaBase): datastore_result = node._dataStore.getPeersForBlob(blob_hash) self.assertEqual(len(datastore_result), 1) self.assertEqual(len(node._dataStore.getStoringContacts()), 1) - self.assertTrue(blob_hash in node._dataStore._dict) + self.assertTrue(blob_hash in node._dataStore) # verify the announced blob expires in the storing nodes datastores self.clock.advance(constants.dataExpireTimeout) # skip the clock directly ahead @@ -135,7 +135,7 @@ class TestStoreExpiration(TestKademliaBase): self.assertFalse(node._dataStore.hasPeersForBlob(blob_hash)) datastore_result = node._dataStore.getPeersForBlob(blob_hash) self.assertEqual(len(datastore_result), 0) - self.assertTrue(blob_hash in node._dataStore._dict) # the looping call shouldn't have removed it yet + self.assertTrue(blob_hash in node._dataStore) # the looping call shouldn't have removed it yet self.assertEqual(len(node._dataStore.getStoringContacts()), 1) self.pump_clock(constants.checkRefreshInterval + 1) # tick the clock forward (so the nodes refresh) @@ -144,4 +144,4 @@ class TestStoreExpiration(TestKademliaBase): datastore_result = node._dataStore.getPeersForBlob(blob_hash) self.assertEqual(len(datastore_result), 0) self.assertEqual(len(node._dataStore.getStoringContacts()), 0) - self.assertTrue(blob_hash not in node._dataStore._dict) # the looping call should have fired + self.assertTrue(blob_hash not in node._dataStore) # the looping call should have fired