From d02ed29e505009e8d68345e41f9cc428cb1830e4 Mon Sep 17 00:00:00 2001 From: Jack Robison Date: Thu, 24 May 2018 10:23:22 -0400 Subject: [PATCH] add kademlia store and expiration test --- lbrynet/dht/datastore.py | 22 ++++----- lbrynet/dht/node.py | 20 +++++--- lbrynet/tests/functional/dht/test_store.py | 56 ++++++++++++++++++++++ 3 files changed, 80 insertions(+), 18 deletions(-) create mode 100644 lbrynet/tests/functional/dht/test_store.py diff --git a/lbrynet/dht/datastore.py b/lbrynet/dht/datastore.py index 57cdac9ae..edfa543fc 100644 --- a/lbrynet/dht/datastore.py +++ b/lbrynet/dht/datastore.py @@ -23,18 +23,16 @@ class DictDataStore(UserDict.DictMixin): def removeExpiredPeers(self): now = int(self._getTime()) - - def notExpired(peer): - if (now - peer[2]) > constants.dataExpireTimeout: - return False - return True - for key in self._dict.keys(): - unexpired_peers = filter(notExpired, self._dict[key]) - self._dict[key] = unexpired_peers + unexpired_peers = filter(lambda peer: now - peer[2] < constants.dataExpireTimeout, self._dict[key]) + if not unexpired_peers: + del self._dict[key] + else: + self._dict[key] = unexpired_peers def hasPeersForBlob(self, key): - if key in self._dict and len(self._dict[key]) > 0: + if key in self._dict and len(filter(lambda peer: self._getTime() - peer[2] < constants.dataExpireTimeout, + self._dict[key])): return True return False @@ -46,8 +44,10 @@ class DictDataStore(UserDict.DictMixin): self._dict[key] = [(value, lastPublished, originallyPublished, originalPublisherID)] def getPeersForBlob(self, key): - if key in self._dict: - return [val[0] for val in self._dict[key]] + return [] if key not in self._dict else [ + val[0] for val in filter(lambda peer: self._getTime() - peer[2] < constants.dataExpireTimeout, + self._dict[key]) + ] def removePeer(self, value): for key in self._dict: diff --git a/lbrynet/dht/node.py b/lbrynet/dht/node.py index eeaed7edd..aaca0a372 100644 --- a/lbrynet/dht/node.py +++ b/lbrynet/dht/node.py @@ -30,6 +30,14 @@ from iterativefind import iterativeFind log = logging.getLogger(__name__) +def expand_peer(compact_peer_info): + host = ".".join([str(ord(d)) for d in compact_peer_info[:4]]) + port, = struct.unpack('>H', compact_peer_info[4:6]) + peer_node_id = compact_peer_info[6:] + return (peer_node_id, host, port) + + + def rpcmethod(func): """ Decorator to expose Node methods as remote procedure calls @@ -142,7 +150,7 @@ class Node(MockKademliaHelper): self.old_token_secret = None self.externalIP = externalIP self.peerPort = peerPort - self._dataStore = dataStore or datastore.DictDataStore() + self._dataStore = dataStore or datastore.DictDataStore(self.clock.seconds) self.peer_manager = peer_manager or PeerManager() self.peer_finder = peer_finder or DHTPeerFinder(self, self.peer_manager) self._join_deferred = None @@ -428,11 +436,9 @@ class Node(MockKademliaHelper): if find_result: if key in find_result: for peer in find_result[key]: - host = ".".join([str(ord(d)) for d in peer[:4]]) - port, = struct.unpack('>H', peer[4:6]) - peer_node_id = peer[6:] - if (host, port, peer_node_id) not in expanded_peers: - expanded_peers.append((peer_node_id, host, port)) + expanded = expand_peer(peer) + if expanded not in expanded_peers: + expanded_peers.append(expanded) # TODO: get this working # if 'closestNodeNoValue' in find_result: # closest_node_without_value = find_result['closestNodeNoValue'] @@ -532,7 +538,7 @@ class Node(MockKademliaHelper): raise TypeError('Invalid port') compact_address = compact_ip + compact_port + rpc_contact.id - now = int(time.time()) + now = int(self.clock.seconds()) originallyPublished = now - age self._dataStore.addPeerToBlob(blob_hash, compact_address, now, originallyPublished, originalPublisherID) return 'OK' diff --git a/lbrynet/tests/functional/dht/test_store.py b/lbrynet/tests/functional/dht/test_store.py new file mode 100644 index 000000000..a33a28f77 --- /dev/null +++ b/lbrynet/tests/functional/dht/test_store.py @@ -0,0 +1,56 @@ +import struct +from twisted.internet import defer +from lbrynet.dht import constants +from lbrynet.core.utils import generate_id +from dht_test_environment import TestKademliaBase +import logging + +log = logging.getLogger() + + +class TestStore(TestKademliaBase): + network_size = 40 + + @defer.inlineCallbacks + def test_store_and_expire(self): + blob_hash = generate_id() + announcing_node = self.nodes[20] + # announce the blob + announce_d = announcing_node.announceHaveBlob(blob_hash) + self.pump_clock(5) + storing_node_ids = yield announce_d + all_nodes = set(self.nodes).union(set(self._seeds)) + + # verify the nodes we think stored it did actually store it + storing_nodes = [node for node in all_nodes if node.node_id.encode('hex') in storing_node_ids] + self.assertEquals(len(storing_nodes), len(storing_node_ids)) + self.assertEquals(len(storing_nodes), constants.k) + for node in storing_nodes: + self.assertTrue(node._dataStore.hasPeersForBlob(blob_hash)) + datastore_result = node._dataStore.getPeersForBlob(blob_hash) + self.assertEquals(len(datastore_result), 1) + expanded_peers = [] + for peer in datastore_result: + host = ".".join([str(ord(d)) for d in peer[:4]]) + port, = struct.unpack('>H', peer[4:6]) + peer_node_id = peer[6:] + if (host, port, peer_node_id) not in expanded_peers: + expanded_peers.append((peer_node_id, host, port)) + self.assertEquals(expanded_peers[0], + (announcing_node.node_id, announcing_node.externalIP, announcing_node.peerPort)) + + # verify the announced blob expires in the storing nodes datastores + + self.clock.advance(constants.dataExpireTimeout) # skip the clock directly ahead + for node in storing_nodes: + self.assertFalse(node._dataStore.hasPeersForBlob(blob_hash)) + datastore_result = node._dataStore.getPeersForBlob(blob_hash) + self.assertEquals(len(datastore_result), 0) + self.assertTrue(blob_hash in node._dataStore._dict) # the looping call shouldn't have removed it yet + + self.pump_clock(constants.checkRefreshInterval + 1) # tick the clock forward (so the nodes refresh) + for node in storing_nodes: + self.assertFalse(node._dataStore.hasPeersForBlob(blob_hash)) + datastore_result = node._dataStore.getPeersForBlob(blob_hash) + self.assertEquals(len(datastore_result), 0) + self.assertTrue(blob_hash not in node._dataStore._dict) # the looping call should have fired after