From 07f92014d7229c7665cba2f88fb320181261b326 Mon Sep 17 00:00:00 2001
From: Jack Robison
Date: Thu, 24 May 2018 15:52:37 -0400
Subject: [PATCH] omit bad contacts from getPeersForBlob

-refresh stored contacts to detect when they go offline or come back
---
 lbrynet/dht/contact.py                     |  17 ++-
 lbrynet/dht/datastore.py                   |  44 ++++---
 lbrynet/dht/node.py                        |  34 ++++--
 lbrynet/tests/functional/dht/test_store.py | 130 +++++++++++++++++++++
 4 files changed, 198 insertions(+), 27 deletions(-)

diff --git a/lbrynet/dht/contact.py b/lbrynet/dht/contact.py
index 9cf6eb67a..b8dac520f 100644
--- a/lbrynet/dht/contact.py
+++ b/lbrynet/dht/contact.py
@@ -1,6 +1,15 @@
+import ipaddress
 from lbrynet.dht import constants


+def is_valid_ipv4(address):
+    try:
+        ip = ipaddress.ip_address(address.decode())  # this needs to be unicode, thus the decode()
+        return ip.version == 4
+    except (ipaddress.AddressValueError, ValueError):  # ip_address() raises a plain ValueError for malformed input
+        return False
+
+
 class _Contact(object):
     """ Encapsulation for remote contact

@@ -9,11 +18,15 @@ class _Contact(object):
     """

     def __init__(self, contactManager, id, ipAddress, udpPort, networkProtocol, firstComm):
-        self._contactManager = contactManager
-        self._id = id
         if id is not None:
             if not len(id) == constants.key_bits / 8:
                 raise ValueError("invalid node id: %s" % id.encode('hex'))
+        if not 0 <= udpPort <= 65535:
+            raise ValueError("invalid port")
+        if not is_valid_ipv4(ipAddress):
+            raise ValueError("invalid ip address")
+        self._contactManager = contactManager
+        self._id = id
         self.address = ipAddress
         self.port = udpPort
         self._networkProtocol = networkProtocol
diff --git a/lbrynet/dht/datastore.py b/lbrynet/dht/datastore.py
index edfa543fc..f9e06923c 100644
--- a/lbrynet/dht/datastore.py
+++ b/lbrynet/dht/datastore.py
@@ -21,36 +21,54 @@ class DictDataStore(UserDict.DictMixin):
         """ Return a list of the keys in this data store """
         return self._dict.keys()

+    def filter_bad_and_expired_peers(self, key):
+        """
+        Returns only non-expired and unknown/good peers
+        """
+        return filter(
+            lambda peer:
+            self._getTime() - peer[3] < constants.dataExpireTimeout and peer[0].contact_is_good is not False,
+            self._dict[key]
+        )
+
+    def filter_expired_peers(self, key):
+        """
+        Returns only non-expired peers
+        """
+        return filter(lambda peer: self._getTime() - peer[3] < constants.dataExpireTimeout, self._dict[key])
+
     def removeExpiredPeers(self):
-        now = int(self._getTime())
         for key in self._dict.keys():
-            unexpired_peers = filter(lambda peer: now - peer[2] < constants.dataExpireTimeout, self._dict[key])
+            unexpired_peers = self.filter_expired_peers(key)
             if not unexpired_peers:
                 del self._dict[key]
             else:
                 self._dict[key] = unexpired_peers

     def hasPeersForBlob(self, key):
-        if key in self._dict and len(filter(lambda peer: self._getTime() - peer[2] < constants.dataExpireTimeout,
-                                            self._dict[key])):
+        if key in self._dict and len(self.filter_bad_and_expired_peers(key)):
             return True
         return False

-    def addPeerToBlob(self, key, value, lastPublished, originallyPublished, originalPublisherID):
+    def addPeerToBlob(self, contact, key, compact_address, lastPublished, originallyPublished, originalPublisherID):
         if key in self._dict:
-            if value not in map(lambda store_tuple: store_tuple[0], self._dict[key]):
-                self._dict[key].append((value, lastPublished, originallyPublished, originalPublisherID))
+            if compact_address not in map(lambda store_tuple: store_tuple[1], self._dict[key]):
+                self._dict[key].append((contact, compact_address, lastPublished, originallyPublished, originalPublisherID))
         else:
-            self._dict[key] = [(value, lastPublished, originallyPublished, originalPublisherID)]
+            self._dict[key] = [(contact, compact_address, lastPublished, originallyPublished, originalPublisherID)]

     def getPeersForBlob(self, key):
-        return [] if key not in self._dict else [
-            val[0] for val in filter(lambda peer: self._getTime() - peer[2] < constants.dataExpireTimeout,
-                                     self._dict[key])
-        ]
+        return [] if key not in self._dict else [val[1] for val in self.filter_bad_and_expired_peers(key)]

     def removePeer(self, value):
         for key in self._dict:
-            self._dict[key] = [val for val in self._dict[key] if val[0] != value]
+            self._dict[key] = [val for val in self._dict[key] if val[1] != value]
             if not self._dict[key]:
                 del self._dict[key]
+
+    def getStoringContacts(self):
+        contacts = set()
+        for key in self._dict:
+            for values in self._dict[key]:
+                contacts.add(values[0])
+        return list(contacts)
diff --git a/lbrynet/dht/node.py b/lbrynet/dht/node.py
index aaca0a372..2601248e5 100644
--- a/lbrynet/dht/node.py
+++ b/lbrynet/dht/node.py
@@ -506,15 +506,20 @@ class Node(MockKademliaHelper):

     @rpcmethod
     def store(self, rpc_contact, blob_hash, token, port, originalPublisherID=None, age=0):
-        """ Store the received data in this node's local hash table
+        """ Store the received data in this node's local datastore

-        @param blob_hash: The hashtable key of the data
+        @param blob_hash: The hash of the data
         @type blob_hash: str
-        @param value: The actual data (the value associated with C{key})
-        @type value: str
-        @param originalPublisherID: The node ID of the node that is the
-                                    B{original} publisher of the data
+
+        @param token: The token we previously returned when this contact sent us a findValue
+        @type token: str
+
+        @param port: The TCP port the contact is listening on for requests for this blob (the peerPort)
+        @type port: int
+
+        @param originalPublisherID: The node ID of the node that is the publisher of the data
         @type originalPublisherID: str
+
         @param age: The relative age of the data (time in seconds since it was
                     originally published). Note that the original publish time
                     isn't actually given, to compensate for clock skew between
@@ -522,11 +527,8 @@ class Node(MockKademliaHelper):
         @type age: int

         @rtype: str
-
-        @todo: Since the data (value) may be large, passing it around as a buffer
-               (which is the case currently) might not be a good idea... will have
-               to fix this (perhaps use a stream from the Protocol class?)
""" + if originalPublisherID is None: originalPublisherID = rpc_contact.id compact_ip = rpc_contact.compact_ip() @@ -536,11 +538,11 @@ class Node(MockKademliaHelper): compact_port = str(struct.pack('>H', port)) else: raise TypeError('Invalid port') - compact_address = compact_ip + compact_port + rpc_contact.id now = int(self.clock.seconds()) originallyPublished = now - age - self._dataStore.addPeerToBlob(blob_hash, compact_address, now, originallyPublished, originalPublisherID) + self._dataStore.addPeerToBlob(rpc_contact, blob_hash, compact_address, now, originallyPublished, + originalPublisherID) return 'OK' @rpcmethod @@ -658,6 +660,7 @@ class Node(MockKademliaHelper): replication/republishing as necessary """ yield self._refreshRoutingTable() self._dataStore.removeExpiredPeers() + yield self._refreshStoringPeers() defer.returnValue(None) def _refreshContacts(self): @@ -665,6 +668,13 @@ class Node(MockKademliaHelper): [contact.ping() for contact in self.contacts], consumeErrors=True ) + def _refreshStoringPeers(self): + storing_contacts = self._dataStore.getStoringContacts() + return defer.DeferredList( + [self._protocol._ping_queue.enqueue_maybe_ping(contact) for contact in storing_contacts], + consumeErrors=True + ) + @defer.inlineCallbacks def _refreshRoutingTable(self): nodeIDs = self._routingTable.getRefreshList(0, True) diff --git a/lbrynet/tests/functional/dht/test_store.py b/lbrynet/tests/functional/dht/test_store.py index a33a28f77..911ea2563 100644 --- a/lbrynet/tests/functional/dht/test_store.py +++ b/lbrynet/tests/functional/dht/test_store.py @@ -28,6 +28,10 @@ class TestStore(TestKademliaBase): for node in storing_nodes: self.assertTrue(node._dataStore.hasPeersForBlob(blob_hash)) datastore_result = node._dataStore.getPeersForBlob(blob_hash) + self.assertEquals(map(lambda contact: (contact.id, contact.address, contact.port), + node._dataStore.getStoringContacts()), [(announcing_node.node_id, + announcing_node.externalIP, + announcing_node.port)]) self.assertEquals(len(datastore_result), 1) expanded_peers = [] for peer in datastore_result: @@ -47,10 +51,136 @@ class TestStore(TestKademliaBase): datastore_result = node._dataStore.getPeersForBlob(blob_hash) self.assertEquals(len(datastore_result), 0) self.assertTrue(blob_hash in node._dataStore._dict) # the looping call shouldn't have removed it yet + self.assertEquals(len(node._dataStore.getStoringContacts()), 1) self.pump_clock(constants.checkRefreshInterval + 1) # tick the clock forward (so the nodes refresh) for node in storing_nodes: self.assertFalse(node._dataStore.hasPeersForBlob(blob_hash)) datastore_result = node._dataStore.getPeersForBlob(blob_hash) self.assertEquals(len(datastore_result), 0) + self.assertEquals(len(node._dataStore.getStoringContacts()), 0) + self.assertTrue(blob_hash not in node._dataStore._dict) # the looping call should have fired + + @defer.inlineCallbacks + def test_refresh_storing_peers(self): + blob_hash = generate_id() + announcing_node = self.nodes[20] + # announce the blob + announce_d = announcing_node.announceHaveBlob(blob_hash) + self.pump_clock(5) + storing_node_ids = yield announce_d + all_nodes = set(self.nodes).union(set(self._seeds)) + + # verify the nodes we think stored it did actually store it + storing_nodes = [node for node in all_nodes if node.node_id.encode('hex') in storing_node_ids] + self.assertEquals(len(storing_nodes), len(storing_node_ids)) + self.assertEquals(len(storing_nodes), constants.k) + for node in storing_nodes: + 
+            self.assertTrue(node._dataStore.hasPeersForBlob(blob_hash))
+            datastore_result = node._dataStore.getPeersForBlob(blob_hash)
+            self.assertEquals(map(lambda contact: (contact.id, contact.address, contact.port),
+                                  node._dataStore.getStoringContacts()), [(announcing_node.node_id,
+                                                                           announcing_node.externalIP,
+                                                                           announcing_node.port)])
+            self.assertEquals(len(datastore_result), 1)
+            expanded_peers = []
+            for peer in datastore_result:
+                host = ".".join([str(ord(d)) for d in peer[:4]])
+                port, = struct.unpack('>H', peer[4:6])
+                peer_node_id = peer[6:]
+                if (host, port, peer_node_id) not in expanded_peers:
+                    expanded_peers.append((peer_node_id, host, port))
+            self.assertEquals(expanded_peers[0],
+                              (announcing_node.node_id, announcing_node.externalIP, announcing_node.peerPort))
+
+        self.pump_clock(constants.checkRefreshInterval + 1)  # tick the clock forward (so the nodes refresh)
+
+        # verify the announced blob expires in the storing nodes' datastores
+
+        self.clock.advance(constants.dataExpireTimeout)  # skip the clock directly ahead
+        for node in storing_nodes:
+            self.assertFalse(node._dataStore.hasPeersForBlob(blob_hash))
+            datastore_result = node._dataStore.getPeersForBlob(blob_hash)
+            self.assertEquals(len(datastore_result), 0)
+            self.assertTrue(blob_hash in node._dataStore._dict)  # the looping call shouldn't have removed it yet
+            self.assertEquals(len(node._dataStore.getStoringContacts()), 1)
+
+        self.pump_clock(constants.checkRefreshInterval + 1)  # tick the clock forward (so the nodes refresh)
+        for node in storing_nodes:
+            self.assertFalse(node._dataStore.hasPeersForBlob(blob_hash))
+            datastore_result = node._dataStore.getPeersForBlob(blob_hash)
+            self.assertEquals(len(datastore_result), 0)
+            self.assertEquals(len(node._dataStore.getStoringContacts()), 0)
             self.assertTrue(blob_hash not in node._dataStore._dict)  # the looping call should have fired after
+
+
+class TestStoringNodeWentStale(TestKademliaBase):
+    network_size = 40
+
+    @defer.inlineCallbacks
+    def test_storing_node_went_stale_then_came_back(self):
+        blob_hash = generate_id()
+        announcing_node = self.nodes[20]
+        # announce the blob
+        announce_d = announcing_node.announceHaveBlob(blob_hash)
+        announce_time = self.clock.seconds()
+        self.pump_clock(5)
+        storing_node_ids = yield announce_d
+        all_nodes = set(self.nodes).union(set(self._seeds))
+
+        # verify the nodes we think stored it did actually store it
+        storing_nodes = [node for node in all_nodes if node.node_id.encode('hex') in storing_node_ids]
+        self.assertEquals(len(storing_nodes), len(storing_node_ids))
+        self.assertEquals(len(storing_nodes), constants.k)
+        for node in storing_nodes:
+            self.assertTrue(node._dataStore.hasPeersForBlob(blob_hash))
+            datastore_result = node._dataStore.getPeersForBlob(blob_hash)
+            self.assertEquals(map(lambda contact: (contact.id, contact.address, contact.port),
+                                  node._dataStore.getStoringContacts()), [(announcing_node.node_id,
+                                                                           announcing_node.externalIP,
+                                                                           announcing_node.port)])
+            self.assertEquals(len(datastore_result), 1)
+            expanded_peers = []
+            for peer in datastore_result:
+                host = ".".join([str(ord(d)) for d in peer[:4]])
+                port, = struct.unpack('>H', peer[4:6])
+                peer_node_id = peer[6:]
+                if (host, port, peer_node_id) not in expanded_peers:
+                    expanded_peers.append((peer_node_id, host, port))
+            self.assertEquals(expanded_peers[0],
+                              (announcing_node.node_id, announcing_node.externalIP, announcing_node.peerPort))
+
+        self.nodes.remove(announcing_node)
+        yield self.run_reactor(1, [announcing_node.stop()])
+
+        # run the network for an hour, which should expire the removed node and the announced value
+        self.pump_clock(3600)
+        self.verify_all_nodes_are_routable()
+        self.verify_all_nodes_are_pingable()
+
+        for node in storing_nodes:  # make sure the contact isn't returned as a peer for the blob, but that
+                                    # we still have the entry in the datastore in case the node returns
+            self.assertFalse(node._dataStore.hasPeersForBlob(blob_hash))
+            datastore_result = node._dataStore.getPeersForBlob(blob_hash)
+            self.assertEquals(len(datastore_result), 0)
+            self.assertEquals(len(node._dataStore.getStoringContacts()), 1)
+            self.assertTrue(blob_hash in node._dataStore._dict)
+
+        # bring the announcing node back online
+        self.nodes.append(announcing_node)
+        yield self.run_reactor(
+            31, [announcing_node.start([(seed_name, 4444) for seed_name in sorted(self.seed_dns.keys())])]
+        )
+        self.pump_clock(24*60+1)  # FIXME: this should work after 12 minutes + 1 second, yet it doesn't
+        self.verify_all_nodes_are_routable()
+        self.verify_all_nodes_are_pingable()
+
+        # now the announcing node should once again be returned as a peer for the blob
+        for node in storing_nodes:
+            self.assertTrue(node._dataStore.hasPeersForBlob(blob_hash))
+            datastore_result = node._dataStore.getPeersForBlob(blob_hash)
+            self.assertEquals(len(datastore_result), 1)
+            self.assertEquals(len(node._dataStore.getStoringContacts()), 1)
+            self.assertTrue(blob_hash in node._dataStore._dict)
+
+        # TODO: handle the case where the announcing node rejoins with a different address from what is stored
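
Illustration, separate from the patch itself: the behavioral core of the change is that DictDataStore now keeps the storing contact object in each stored tuple, and getPeersForBlob/hasPeersForBlob only count entries whose contact is not known to be bad, on top of the existing expiration check. The standalone sketch below models just that filtering rule; Peer, FakeContact, DATA_EXPIRE_TIMEOUT and the local filter_bad_and_expired_peers function are illustrative stand-ins, not the lbrynet implementation.

# Standalone sketch of the peer-filtering rule added in datastore.py.
import time
from collections import namedtuple

DATA_EXPIRE_TIMEOUT = 86400  # stand-in for constants.dataExpireTimeout (24 hours)

# mirrors the 5-tuple the patch stores per blob:
# (contact, compact_address, lastPublished, originallyPublished, originalPublisherID)
Peer = namedtuple("Peer", "contact compact_address last_published originally_published publisher_id")


class FakeContact(object):
    """Illustrative contact; contact_is_good is True, False, or None (unknown)."""
    def __init__(self, node_id, contact_is_good=None):
        self.id = node_id
        self.contact_is_good = contact_is_good


def filter_bad_and_expired_peers(peers, now=None):
    """Keep peers that have not expired and whose contact is good or unknown."""
    now = time.time() if now is None else now
    return [
        peer for peer in peers
        if now - peer.originally_published < DATA_EXPIRE_TIMEOUT
        and peer.contact.contact_is_good is not False  # only an explicit False (known bad) is dropped
    ]


if __name__ == "__main__":
    now = time.time()
    peers = [
        Peer(FakeContact("a", True), "addr-a", now, now, "a"),   # known good -> kept
        Peer(FakeContact("b", None), "addr-b", now, now, "b"),   # unknown -> kept
        Peer(FakeContact("c", False), "addr-c", now, now, "c"),  # known bad -> dropped
        Peer(FakeContact("d", True), "addr-d", now - 2 * DATA_EXPIRE_TIMEOUT,
             now - 2 * DATA_EXPIRE_TIMEOUT, "d"),                # expired -> dropped
    ]
    print([p.contact.id for p in filter_bad_and_expired_peers(peers, now)])  # prints ['a', 'b']

The refresh side of the patch relies on the same flag: _refreshStoringPeers enqueues a maybe-ping for every contact returned by getStoringContacts, so a storing contact that stops answering is eventually marked bad and its entries stop being returned by getPeersForBlob, and they are served again once the contact comes back, which is what the new TestStoringNodeWentStale test exercises.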