more dht fixes and most of functional tests

This commit is contained in:
Victor Shyba 2018-07-26 01:20:34 -03:00 committed by Jack Robison
parent 875edb5f76
commit b25d592d99
No known key found for this signature in database
GPG key ID: DF25C68FE0239BB2
6 changed files with 41 additions and 27 deletions

View file

@ -114,7 +114,7 @@ class _Contact:
return True return True
def __hash__(self): def __hash__(self):
return self.id.__hash__() return int(hexlify(self.id), 16) if self.id else -1
def compact_ip(self): def compact_ip(self):
compact_ip = reduce( compact_ip = reduce(

View file

@ -33,12 +33,15 @@ class DictDataStore(UserDict):
return filter(lambda peer: self._getTime() - peer[3] < constants.dataExpireTimeout, self[key]) return filter(lambda peer: self._getTime() - peer[3] < constants.dataExpireTimeout, self[key])
def removeExpiredPeers(self): def removeExpiredPeers(self):
expired_keys = []
for key in self.keys(): for key in self.keys():
unexpired_peers = self.filter_expired_peers(key) unexpired_peers = list(self.filter_expired_peers(key))
if not unexpired_peers: if not unexpired_peers:
del self[key] expired_keys.append(key)
else: else:
self[key] = list(unexpired_peers) self[key] = unexpired_peers
for key in expired_keys:
del self[key]
def hasPeersForBlob(self, key): def hasPeersForBlob(self, key):
return True if key in self and len(tuple(self.filter_bad_and_expired_peers(key))) else False return True if key in self and len(tuple(self.filter_bad_and_expired_peers(key))) else False

View file

@ -140,7 +140,6 @@ class KBucket:
if not. if not.
@rtype: bool @rtype: bool
""" """
assert type(key) in [long, bytes], "{} is {}".format(key, type(key)) # fixme: _maybe_ remove this after porting
if isinstance(key, bytes): if isinstance(key, bytes):
key = long(hexlify(key), 16) key = long(hexlify(key), 16)
return self.rangeMin <= key < self.rangeMax return self.rangeMin <= key < self.rangeMax

View file

@ -1,5 +1,4 @@
import logging import logging
from binascii import hexlify
from twisted.trial import unittest from twisted.trial import unittest
from twisted.internet import defer, task from twisted.internet import defer, task
@ -25,9 +24,9 @@ class TestKademliaBase(unittest.TestCase):
return node return node
@defer.inlineCallbacks @defer.inlineCallbacks
def add_node(self): def add_node(self, known_addresses):
node = self._add_next_node() node = self._add_next_node()
yield node.start([(seed_name, 4444) for seed_name in sorted(self.seed_dns.keys())]) yield node.start(known_addresses)
defer.returnValue(node) defer.returnValue(node)
def get_node(self, node_id): def get_node(self, node_id):
@ -41,13 +40,24 @@ class TestKademliaBase(unittest.TestCase):
node = self.nodes.pop() node = self.nodes.pop()
yield node.stop() yield node.stop()
def pump_clock(self, n, step=0.1, tick_callback=None): def pump_clock(self, n, step=None, tick_callback=None):
""" """
:param n: seconds to run the reactor for :param n: seconds to run the reactor for
:param step: reactor tick rate (in seconds) :param step: reactor tick rate (in seconds)
""" """
for _ in range(int(n * (1.0 / float(step)))): advanced = 0.0
self.clock.advance(step) while advanced < n:
self.clock._sortCalls()
if step:
next_step = step
elif self.clock.getDelayedCalls():
next_call = self.clock.getDelayedCalls()[0].getTime()
next_step = min(n - advanced, max(next_call - self.clock.rightNow, .000000000001))
else:
next_step = n - advanced
assert next_step > 0
self.clock.advance(next_step)
advanced += float(next_step)
if tick_callback and callable(tick_callback): if tick_callback and callable(tick_callback):
tick_callback(self.clock.seconds()) tick_callback(self.clock.seconds())
@ -116,8 +126,10 @@ class TestKademliaBase(unittest.TestCase):
yield self.run_reactor(constants.checkRefreshInterval+1, seed_dl) yield self.run_reactor(constants.checkRefreshInterval+1, seed_dl)
while len(self.nodes + self._seeds) < self.network_size: while len(self.nodes + self._seeds) < self.network_size:
network_dl = [] network_dl = []
for i in range(min(10, self.network_size - len(self._seeds) - len(self.nodes))): # fixme: We are starting one by one to reduce flakiness on time advance.
network_dl.append(self.add_node()) # fixme: When that improves, get back to 10+!
for i in range(min(1, self.network_size - len(self._seeds) - len(self.nodes))):
network_dl.append(self.add_node(known_addresses))
yield self.run_reactor(constants.checkRefreshInterval*2+1, network_dl) yield self.run_reactor(constants.checkRefreshInterval*2+1, network_dl)
self.assertEqual(len(self.nodes + self._seeds), self.network_size) self.assertEqual(len(self.nodes + self._seeds), self.network_size)
self.pump_clock(3600) self.pump_clock(3600)

View file

@ -143,7 +143,7 @@ def debug_kademlia_packet(data, source, destination, node):
log.debug("request %s --> %s %s (node time %s)", source[0], destination[0], packet.request, log.debug("request %s --> %s %s (node time %s)", source[0], destination[0], packet.request,
node.clock.seconds()) node.clock.seconds())
elif isinstance(packet, ResponseMessage): elif isinstance(packet, ResponseMessage):
if isinstance(packet.response, (str, unicode)): if isinstance(packet.response, bytes):
log.debug("response %s <-- %s %s (node time %s)", destination[0], source[0], packet.response, log.debug("response %s <-- %s %s (node time %s)", destination[0], source[0], packet.response,
node.clock.seconds()) node.clock.seconds())
else: else:

View file

@ -19,7 +19,7 @@ class TestStoreExpiration(TestKademliaBase):
announcing_node = self.nodes[20] announcing_node = self.nodes[20]
# announce the blob # announce the blob
announce_d = announcing_node.announceHaveBlob(blob_hash) announce_d = announcing_node.announceHaveBlob(blob_hash)
self.pump_clock(5) self.pump_clock(5+1)
storing_node_ids = yield announce_d storing_node_ids = yield announce_d
all_nodes = set(self.nodes).union(set(self._seeds)) all_nodes = set(self.nodes).union(set(self._seeds))
@ -30,8 +30,8 @@ class TestStoreExpiration(TestKademliaBase):
for node in storing_nodes: for node in storing_nodes:
self.assertTrue(node._dataStore.hasPeersForBlob(blob_hash)) self.assertTrue(node._dataStore.hasPeersForBlob(blob_hash))
datastore_result = node._dataStore.getPeersForBlob(blob_hash) datastore_result = node._dataStore.getPeersForBlob(blob_hash)
self.assertEqual(map(lambda contact: (contact.id, contact.address, contact.port), self.assertEqual(list(map(lambda contact: (contact.id, contact.address, contact.port),
node._dataStore.getStoringContacts()), [(announcing_node.node_id, node._dataStore.getStoringContacts())), [(announcing_node.node_id,
announcing_node.externalIP, announcing_node.externalIP,
announcing_node.port)]) announcing_node.port)])
self.assertEqual(len(datastore_result), 1) self.assertEqual(len(datastore_result), 1)
@ -52,7 +52,7 @@ class TestStoreExpiration(TestKademliaBase):
self.assertFalse(node._dataStore.hasPeersForBlob(blob_hash)) self.assertFalse(node._dataStore.hasPeersForBlob(blob_hash))
datastore_result = node._dataStore.getPeersForBlob(blob_hash) datastore_result = node._dataStore.getPeersForBlob(blob_hash)
self.assertEqual(len(datastore_result), 0) self.assertEqual(len(datastore_result), 0)
self.assertTrue(blob_hash in node._dataStore._dict) # the looping call shouldn't have removed it yet self.assertTrue(blob_hash in node._dataStore) # the looping call shouldn't have removed it yet
self.assertEqual(len(node._dataStore.getStoringContacts()), 1) self.assertEqual(len(node._dataStore.getStoringContacts()), 1)
self.pump_clock(constants.checkRefreshInterval + 1) # tick the clock forward (so the nodes refresh) self.pump_clock(constants.checkRefreshInterval + 1) # tick the clock forward (so the nodes refresh)
@ -61,7 +61,7 @@ class TestStoreExpiration(TestKademliaBase):
datastore_result = node._dataStore.getPeersForBlob(blob_hash) datastore_result = node._dataStore.getPeersForBlob(blob_hash)
self.assertEqual(len(datastore_result), 0) self.assertEqual(len(datastore_result), 0)
self.assertEqual(len(node._dataStore.getStoringContacts()), 0) self.assertEqual(len(node._dataStore.getStoringContacts()), 0)
self.assertTrue(blob_hash not in node._dataStore._dict) # the looping call should have fired self.assertTrue(blob_hash not in node._dataStore.keys()) # the looping call should have fired
@defer.inlineCallbacks @defer.inlineCallbacks
def test_storing_node_went_stale_then_came_back(self): def test_storing_node_went_stale_then_came_back(self):
@ -69,19 +69,19 @@ class TestStoreExpiration(TestKademliaBase):
announcing_node = self.nodes[20] announcing_node = self.nodes[20]
# announce the blob # announce the blob
announce_d = announcing_node.announceHaveBlob(blob_hash) announce_d = announcing_node.announceHaveBlob(blob_hash)
self.pump_clock(5) self.pump_clock(5+1)
storing_node_ids = yield announce_d storing_node_ids = yield announce_d
all_nodes = set(self.nodes).union(set(self._seeds)) all_nodes = set(self.nodes).union(set(self._seeds))
# verify the nodes we think stored it did actually store it # verify the nodes we think stored it did actually store it
storing_nodes = [node for node in all_nodes if node.node_id.encode('hex') in storing_node_ids] storing_nodes = [node for node in all_nodes if hexlify(node.node_id) in storing_node_ids]
self.assertEqual(len(storing_nodes), len(storing_node_ids)) self.assertEqual(len(storing_nodes), len(storing_node_ids))
self.assertEqual(len(storing_nodes), constants.k) self.assertEqual(len(storing_nodes), constants.k)
for node in storing_nodes: for node in storing_nodes:
self.assertTrue(node._dataStore.hasPeersForBlob(blob_hash)) self.assertTrue(node._dataStore.hasPeersForBlob(blob_hash))
datastore_result = node._dataStore.getPeersForBlob(blob_hash) datastore_result = node._dataStore.getPeersForBlob(blob_hash)
self.assertEqual(map(lambda contact: (contact.id, contact.address, contact.port), self.assertEqual(list(map(lambda contact: (contact.id, contact.address, contact.port),
node._dataStore.getStoringContacts()), [(announcing_node.node_id, node._dataStore.getStoringContacts())), [(announcing_node.node_id,
announcing_node.externalIP, announcing_node.externalIP,
announcing_node.port)]) announcing_node.port)])
self.assertEqual(len(datastore_result), 1) self.assertEqual(len(datastore_result), 1)
@ -111,7 +111,7 @@ class TestStoreExpiration(TestKademliaBase):
datastore_result = node._dataStore.getPeersForBlob(blob_hash) datastore_result = node._dataStore.getPeersForBlob(blob_hash)
self.assertEqual(len(datastore_result), 0) self.assertEqual(len(datastore_result), 0)
self.assertEqual(len(node._dataStore.getStoringContacts()), 1) self.assertEqual(len(node._dataStore.getStoringContacts()), 1)
self.assertTrue(blob_hash in node._dataStore._dict) self.assertTrue(blob_hash in node._dataStore)
# # bring the announcing node back online # # bring the announcing node back online
self.nodes.append(announcing_node) self.nodes.append(announcing_node)
@ -127,7 +127,7 @@ class TestStoreExpiration(TestKademliaBase):
datastore_result = node._dataStore.getPeersForBlob(blob_hash) datastore_result = node._dataStore.getPeersForBlob(blob_hash)
self.assertEqual(len(datastore_result), 1) self.assertEqual(len(datastore_result), 1)
self.assertEqual(len(node._dataStore.getStoringContacts()), 1) self.assertEqual(len(node._dataStore.getStoringContacts()), 1)
self.assertTrue(blob_hash in node._dataStore._dict) self.assertTrue(blob_hash in node._dataStore)
# verify the announced blob expires in the storing nodes datastores # verify the announced blob expires in the storing nodes datastores
self.clock.advance(constants.dataExpireTimeout) # skip the clock directly ahead self.clock.advance(constants.dataExpireTimeout) # skip the clock directly ahead
@ -135,7 +135,7 @@ class TestStoreExpiration(TestKademliaBase):
self.assertFalse(node._dataStore.hasPeersForBlob(blob_hash)) self.assertFalse(node._dataStore.hasPeersForBlob(blob_hash))
datastore_result = node._dataStore.getPeersForBlob(blob_hash) datastore_result = node._dataStore.getPeersForBlob(blob_hash)
self.assertEqual(len(datastore_result), 0) self.assertEqual(len(datastore_result), 0)
self.assertTrue(blob_hash in node._dataStore._dict) # the looping call shouldn't have removed it yet self.assertTrue(blob_hash in node._dataStore) # the looping call shouldn't have removed it yet
self.assertEqual(len(node._dataStore.getStoringContacts()), 1) self.assertEqual(len(node._dataStore.getStoringContacts()), 1)
self.pump_clock(constants.checkRefreshInterval + 1) # tick the clock forward (so the nodes refresh) self.pump_clock(constants.checkRefreshInterval + 1) # tick the clock forward (so the nodes refresh)
@ -144,4 +144,4 @@ class TestStoreExpiration(TestKademliaBase):
datastore_result = node._dataStore.getPeersForBlob(blob_hash) datastore_result = node._dataStore.getPeersForBlob(blob_hash)
self.assertEqual(len(datastore_result), 0) self.assertEqual(len(datastore_result), 0)
self.assertEqual(len(node._dataStore.getStoringContacts()), 0) self.assertEqual(len(node._dataStore.getStoringContacts()), 0)
self.assertTrue(blob_hash not in node._dataStore._dict) # the looping call should have fired self.assertTrue(blob_hash not in node._dataStore) # the looping call should have fired