From 20c46677d05c5807fc6e0972d7d0abd0975bd3f9 Mon Sep 17 00:00:00 2001 From: Victor Shyba Date: Fri, 3 Jan 2020 01:16:44 -0300 Subject: [PATCH] lint dht: datastore protocol --- lbry/dht/protocol/data_store.py | 4 +- lbry/dht/protocol/protocol.py | 70 ++++++++++++++++------------- lbry/dht/serialization/bencoding.py | 3 +- 3 files changed, 41 insertions(+), 36 deletions(-) diff --git a/lbry/dht/protocol/data_store.py b/lbry/dht/protocol/data_store.py index e6880a18f..6a614680f 100644 --- a/lbry/dht/protocol/data_store.py +++ b/lbry/dht/protocol/data_store.py @@ -53,7 +53,7 @@ class DictDataStore: now = self.loop.time() if key in self._data_store: current = list(filter(lambda x: x[0] == contact, self._data_store[key])) - if len(current): + if len(current) > 0: self._data_store[key][self._data_store[key].index(current[0])] = contact, now else: self._data_store[key].append((contact, now)) @@ -65,6 +65,6 @@ class DictDataStore: def get_storing_contacts(self) -> typing.List['KademliaPeer']: peers = set() - for key, stored in self._data_store.items(): + for _, stored in self._data_store.items(): peers.update(set(map(lambda tup: tup[0], stored))) return list(peers) diff --git a/lbry/dht/protocol/protocol.py b/lbry/dht/protocol/protocol.py index 472b26e3b..016ad50bf 100644 --- a/lbry/dht/protocol/protocol.py +++ b/lbry/dht/protocol/protocol.py @@ -23,7 +23,7 @@ if typing.TYPE_CHECKING: log = logging.getLogger(__name__) -old_protocol_errors = { +OLD_PROTOCOL_ERRORS = { "findNode() takes exactly 2 arguments (5 given)": "0.19.1", "findValue() takes exactly 2 arguments (5 given)": "0.19.1" } @@ -326,9 +326,9 @@ class KademliaProtocol(DatagramProtocol): if not peer.node_id: log.warning("Tried adding a peer with no node id!") return False - for p in self.routing_table.get_peers(): - if (p.address, p.udp_port) == (peer.address, peer.udp_port) and p.node_id != peer.node_id: - self.routing_table.remove_peer(p) + for my_peer in self.routing_table.get_peers(): + if (my_peer.address, my_peer.udp_port) == (peer.address, peer.udp_port) and my_peer.node_id != peer.node_id: + self.routing_table.remove_peer(my_peer) self.routing_table.join_buckets() bucket_index = self.routing_table.kbucket_index(peer.node_id) if self.routing_table.buckets[bucket_index].add_peer(peer): @@ -367,10 +367,10 @@ class KademliaProtocol(DatagramProtocol): not_good_contacts = self.routing_table.buckets[bucket_index].get_bad_or_unknown_peers() not_recently_replied = [] - for p in not_good_contacts: - last_replied = self.peer_manager.get_last_replied(p.address, p.udp_port) + for my_peer in not_good_contacts: + last_replied = self.peer_manager.get_last_replied(my_peer.address, my_peer.udp_port) if not last_replied or last_replied + 60 < self.loop.time(): - not_recently_replied.append(p) + not_recently_replied.append(my_peer) if not_recently_replied: to_replace = not_recently_replied[0] else: @@ -421,20 +421,20 @@ class KademliaProtocol(DatagramProtocol): raise AttributeError('Invalid method: %s' % message.method.decode()) if message.args and isinstance(message.args[-1], dict) and b'protocolVersion' in message.args[-1]: # args don't need reformatting - args, kw = tuple(message.args[:-1]), message.args[-1] + args, kwargs = tuple(message.args[:-1]), message.args[-1] else: - args, kw = self._migrate_incoming_rpc_args(sender_contact, message.method, *message.args) + args, kwargs = self._migrate_incoming_rpc_args(sender_contact, message.method, *message.args) log.debug("%s:%i RECV CALL %s %s:%i", self.external_ip, self.udp_port, message.method.decode(), sender_contact.address, sender_contact.udp_port) if method == b'ping': result = self.node_rpc.ping() elif method == b'store': - blob_hash, token, port, original_publisher_id, age = args[:5] + blob_hash, token, port, original_publisher_id, age = args[:5] # pylint: disable=unused-variable result = self.node_rpc.store(sender_contact, blob_hash, token, port) else: key = args[0] - page = kw.get(PAGE_KEY, 0) + page = kwargs.get(PAGE_KEY, 0) if method == b'findNode': result = self.node_rpc.find_node(sender_contact, key) else: @@ -484,25 +484,25 @@ class KademliaProtocol(DatagramProtocol): def handle_response_datagram(self, address: typing.Tuple[str, int], response_datagram: ResponseDatagram): # Find the message that triggered this response if response_datagram.rpc_id in self.sent_messages: - peer, df, request = self.sent_messages[response_datagram.rpc_id] + peer, future, _ = self.sent_messages[response_datagram.rpc_id] if peer.address != address[0]: - df.set_exception(RemoteException( - f"response from {address[0]}, expected {peer.address}") + future.set_exception( + RemoteException(f"response from {address[0]}, expected {peer.address}") ) return # We got a result from the RPC if peer.node_id == self.node_id: - df.set_exception(RemoteException("node has our node id")) + future.set_exception(RemoteException("node has our node id")) return elif response_datagram.node_id == self.node_id: - df.set_exception(RemoteException("incoming message is from our node id")) + future.set_exception(RemoteException("incoming message is from our node id")) return peer = make_kademlia_peer(response_datagram.node_id, address[0], address[1]) self.peer_manager.report_last_replied(address[0], address[1]) self.peer_manager.update_contact_triple(peer.node_id, address[0], address[1]) - if not df.cancelled(): - df.set_result(response_datagram) + if not future.cancelled(): + future.set_result(response_datagram) self.add_peer(peer) else: log.warning("%s:%i replied, but after we cancelled the request attempt", @@ -516,11 +516,13 @@ class KademliaProtocol(DatagramProtocol): # The RPC request raised a remote exception; raise it locally remote_exception = RemoteException(f"{error_datagram.exception_type}({error_datagram.response})") if error_datagram.rpc_id in self.sent_messages: - peer, df, request = self.sent_messages.pop(error_datagram.rpc_id) + peer, future, request = self.sent_messages.pop(error_datagram.rpc_id) if (peer.address, peer.udp_port) != address: - df.set_exception(RemoteException( - f"response from {address[0]}:{address[1]}, " - f"expected {peer.address}:{peer.udp_port}") + future.set_exception( + RemoteException( + f"response from {address[0]}:{address[1]}, " + f"expected {peer.address}:{peer.udp_port}" + ) ) return error_msg = f"" \ @@ -529,23 +531,27 @@ class KademliaProtocol(DatagramProtocol): f"Raised: {str(remote_exception)}" if 'Invalid token' in error_msg: log.debug(error_msg) - elif error_datagram.response not in old_protocol_errors: + elif error_datagram.response not in OLD_PROTOCOL_ERRORS: log.warning(error_msg) else: - log.debug("known dht protocol backwards compatibility error with %s:%i (lbrynet v%s)", - peer.address, peer.udp_port, old_protocol_errors[error_datagram.response]) - df.set_exception(remote_exception) + log.debug( + "known dht protocol backwards compatibility error with %s:%i (lbrynet v%s)", + peer.address, peer.udp_port, OLD_PROTOCOL_ERRORS[error_datagram.response] + ) + future.set_exception(remote_exception) return else: - if error_datagram.response not in old_protocol_errors: + if error_datagram.response not in OLD_PROTOCOL_ERRORS: msg = f"Received error from {address[0]}:{address[1]}, but it isn't in response to a " \ f"pending request: {str(remote_exception)}" log.warning(msg) else: - log.debug("known dht protocol backwards compatibility error with %s:%i (lbrynet v%s)", - address[0], address[1], old_protocol_errors[error_datagram.response]) + log.debug( + "known dht protocol backwards compatibility error with %s:%i (lbrynet v%s)", + address[0], address[1], OLD_PROTOCOL_ERRORS[error_datagram.response] + ) - def datagram_received(self, datagram: bytes, address: typing.Tuple[str, int]) -> None: + def datagram_received(self, datagram: bytes, address: typing.Tuple[str, int]) -> None: # pylint: disable=arguments-differ try: message = decode_datagram(datagram) except (ValueError, TypeError): @@ -646,7 +652,7 @@ class KademliaProtocol(DatagramProtocol): return False return True - async def store_to_peer(self, hash_value: bytes, peer: 'KademliaPeer', + async def store_to_peer(self, hash_value: bytes, peer: 'KademliaPeer', # pylint: disable=too-many-return-statements retry: bool = True) -> typing.Tuple[bytes, bool]: async def __store(): res = await self.get_rpc_peer(peer).store(hash_value) @@ -661,7 +667,7 @@ class KademliaProtocol(DatagramProtocol): log.debug("Timeout while storing blob_hash %s at %s", binascii.hexlify(hash_value).decode()[:8], peer) return peer.node_id, False except ValueError as err: - log.error("Unexpected response: %s" % err) + log.error("Unexpected response: %s", err) return peer.node_id, False except RemoteException as err: if 'findValue() takes exactly 2 arguments (5 given)' in str(err): diff --git a/lbry/dht/serialization/bencoding.py b/lbry/dht/serialization/bencoding.py index 4f4c952d5..8b06dac2a 100644 --- a/lbry/dht/serialization/bencoding.py +++ b/lbry/dht/serialization/bencoding.py @@ -52,8 +52,7 @@ def _bdecode(data: bytes, start_index: int = 0) -> typing.Tuple[typing.Union[int raise DecodeError(err) start_index = split_pos + 1 end_pos = start_index + length - b = data[start_index:end_pos] - return b, end_pos + return data[start_index:end_pos], end_pos def bencode(data: typing.Dict) -> bytes: