import logging import socket import functools import hashlib import asyncio import typing import random from asyncio.protocols import DatagramProtocol from asyncio.transports import DatagramTransport from lbry.dht import constants from lbry.dht.serialization.bencoding import DecodeError from lbry.dht.serialization.datagram import decode_datagram, ErrorDatagram, ResponseDatagram, RequestDatagram from lbry.dht.serialization.datagram import RESPONSE_TYPE, ERROR_TYPE, PAGE_KEY from lbry.dht.error import RemoteException, TransportNotConnected from lbry.dht.protocol.routing_table import TreeRoutingTable from lbry.dht.protocol.data_store import DictDataStore from lbry.dht.peer import make_kademlia_peer if typing.TYPE_CHECKING: from lbry.dht.peer import PeerManager, KademliaPeer log = logging.getLogger(__name__) OLD_PROTOCOL_ERRORS = { "findNode() takes exactly 2 arguments (5 given)": "0.19.1", "findValue() takes exactly 2 arguments (5 given)": "0.19.1" } class KademliaRPC: def __init__(self, protocol: 'KademliaProtocol', loop: asyncio.AbstractEventLoop, peer_port: int = 3333): self.protocol = protocol self.loop = loop self.peer_port = peer_port self.old_token_secret: bytes = None self.token_secret = constants.generate_id() def compact_address(self): compact_ip = functools.reduce(lambda buff, x: buff + bytearray([int(x)]), self.protocol.external_ip.split('.'), bytearray()) compact_port = self.peer_port.to_bytes(2, 'big') return compact_ip + compact_port + self.protocol.node_id @staticmethod def ping(): return b'pong' def store(self, rpc_contact: 'KademliaPeer', blob_hash: bytes, token: bytes, port: int) -> bytes: if len(blob_hash) != constants.HASH_BITS // 8: raise ValueError(f"invalid length of blob hash: {len(blob_hash)}") if not 0 < port < 65535: raise ValueError(f"invalid tcp port: {port}") rpc_contact.update_tcp_port(port) if not self.verify_token(token, rpc_contact.compact_ip()): if self.loop.time() - self.protocol.started_listening_time < constants.TOKEN_SECRET_REFRESH_INTERVAL: pass else: raise ValueError("Invalid token") self.protocol.data_store.add_peer_to_blob( rpc_contact, blob_hash ) return b'OK' def find_node(self, rpc_contact: 'KademliaPeer', key: bytes) -> typing.List[typing.Tuple[bytes, str, int]]: if len(key) != constants.HASH_LENGTH: raise ValueError("invalid contact node_id length: %i" % len(key)) contacts = self.protocol.routing_table.find_close_peers(key, sender_node_id=rpc_contact.node_id) contact_triples = [] for contact in contacts[:constants.K * 2]: contact_triples.append((contact.node_id, contact.address, contact.udp_port)) return contact_triples def find_value(self, rpc_contact: 'KademliaPeer', key: bytes, page: int = 0): page = page if page > 0 else 0 if len(key) != constants.HASH_LENGTH: raise ValueError("invalid blob_exchange hash length: %i" % len(key)) response = { b'token': self.make_token(rpc_contact.compact_ip()), } if not page: response[b'contacts'] = self.find_node(rpc_contact, key)[:constants.K] if self.protocol.protocol_version: response[b'protocolVersion'] = self.protocol.protocol_version # get peers we have stored for this blob_exchange peers = [ peer.compact_address_tcp() for peer in self.protocol.data_store.get_peers_for_blob(key) if not rpc_contact.tcp_port or peer.compact_address_tcp() != rpc_contact.compact_address_tcp() ] # if we don't have k storing peers to return and we have this hash locally, include our contact information if len(peers) < constants.K and key.hex() in self.protocol.data_store.completed_blobs: peers.append(self.compact_address()) if not peers: response[PAGE_KEY] = 0 else: response[PAGE_KEY] = (len(peers) // (constants.K + 1)) + 1 # how many pages of peers we have for the blob if len(peers) > constants.K: random.Random(self.protocol.node_id).shuffle(peers) if page * constants.K < len(peers): response[key] = peers[page * constants.K:page * constants.K + constants.K] return response def refresh_token(self): # TODO: this needs to be called periodically self.old_token_secret = self.token_secret self.token_secret = constants.generate_id() def make_token(self, compact_ip): h = hashlib.new('sha384') h.update(self.token_secret + compact_ip) return h.digest() def verify_token(self, token, compact_ip): h = hashlib.new('sha384') h.update(self.token_secret + compact_ip) if self.old_token_secret and not token == h.digest(): # TODO: why should we be accepting the previous token? h = hashlib.new('sha384') h.update(self.old_token_secret + compact_ip) if not token == h.digest(): return False return True class RemoteKademliaRPC: """ Encapsulates RPC calls to remote Peers """ def __init__(self, loop: asyncio.AbstractEventLoop, peer_tracker: 'PeerManager', protocol: 'KademliaProtocol', peer: 'KademliaPeer'): self.loop = loop self.peer_tracker = peer_tracker self.protocol = protocol self.peer = peer async def ping(self) -> bytes: """ :return: b'pong' """ response = await self.protocol.send_request( self.peer, RequestDatagram.make_ping(self.protocol.node_id) ) return response.response async def store(self, blob_hash: bytes) -> bytes: """ :param blob_hash: blob hash as bytes :return: b'OK' """ if len(blob_hash) != constants.HASH_BITS // 8: raise ValueError(f"invalid length of blob hash: {len(blob_hash)}") if not self.protocol.peer_port or not 0 < self.protocol.peer_port < 65535: raise ValueError(f"invalid tcp port: {self.protocol.peer_port}") token = self.peer_tracker.get_node_token(self.peer.node_id) if not token: find_value_resp = await self.find_value(blob_hash) token = find_value_resp[b'token'] response = await self.protocol.send_request( self.peer, RequestDatagram.make_store(self.protocol.node_id, blob_hash, token, self.protocol.peer_port) ) return response.response async def find_node(self, key: bytes) -> typing.List[typing.Tuple[bytes, str, int]]: """ :return: [(node_id, address, udp_port), ...] """ if len(key) != constants.HASH_BITS // 8: raise ValueError(f"invalid length of find node key: {len(key)}") response = await self.protocol.send_request( self.peer, RequestDatagram.make_find_node(self.protocol.node_id, key) ) return [(node_id, address.decode(), udp_port) for node_id, address, udp_port in response.response] async def find_value(self, key: bytes, page: int = 0) -> typing.Union[typing.Dict]: """ :return: { b'token': , b'contacts': [(node_id, address, udp_port), ...] : [ RemoteKademliaRPC: return RemoteKademliaRPC(self.loop, self.peer_manager, self, peer) def start(self): self.maintaing_routing_task = self.loop.create_task(self.routing_table_task()) def stop(self): if self.maintaing_routing_task: self.maintaing_routing_task.cancel() if self.transport: self.disconnect() def disconnect(self): self.transport.close() def connection_made(self, transport: DatagramTransport): self.transport = transport def connection_lost(self, exc): self.stop() @staticmethod def _migrate_incoming_rpc_args(peer: 'KademliaPeer', method: bytes, *args) -> typing.Tuple[typing.Tuple, typing.Dict]: if method == b'store' and peer.protocol_version == 0: if isinstance(args[1], dict): blob_hash = args[0] token = args[1].pop(b'token', None) port = args[1].pop(b'port', -1) original_publisher_id = args[1].pop(b'lbryid', None) age = 0 return (blob_hash, token, port, original_publisher_id, age), {} return args, {} async def _add_peer(self, peer: 'KademliaPeer'): if not peer.node_id: log.warning("Tried adding a peer with no node id!") return False for my_peer in self.routing_table.get_peers(): if (my_peer.address, my_peer.udp_port) == (peer.address, peer.udp_port) and my_peer.node_id != peer.node_id: self.routing_table.remove_peer(my_peer) self.routing_table.join_buckets() bucket_index = self.routing_table.kbucket_index(peer.node_id) if self.routing_table.buckets[bucket_index].add_peer(peer): return True # The bucket is full; see if it can be split (by checking if its range includes the host node's node_id) if self.routing_table.should_split(bucket_index, peer.node_id): self.routing_table.split_bucket(bucket_index) # Retry the insertion attempt result = await self._add_peer(peer) self.routing_table.join_buckets() return result else: # We can't split the k-bucket # # The 13 page kademlia paper specifies that the least recently contacted node in the bucket # shall be pinged. If it fails to reply it is replaced with the new contact. If the ping is successful # the new contact is ignored and not added to the bucket (sections 2.2 and 2.4). # # A reasonable extension to this is BEP 0005, which extends the above: # # Not all nodes that we learn about are equal. Some are "good" and some are not. # Many nodes using the DHT are able to send queries and receive responses, # but are not able to respond to queries from other nodes. It is important that # each node's routing table must contain only known good nodes. A good node is # a node has responded to one of our queries within the last 15 minutes. A node # is also good if it has ever responded to one of our queries and has sent us a # query within the last 15 minutes. After 15 minutes of inactivity, a node becomes # questionable. Nodes become bad when they fail to respond to multiple queries # in a row. Nodes that we know are good are given priority over nodes with unknown status. # # When there are bad or questionable nodes in the bucket, the least recent is selected for # potential replacement (BEP 0005). When all nodes in the bucket are fresh, the head (least recent) # contact is selected as described in section 2.2 of the kademlia paper. In both cases the new contact # is ignored if the pinged node replies. not_good_contacts = self.routing_table.buckets[bucket_index].get_bad_or_unknown_peers() not_recently_replied = [] for my_peer in not_good_contacts: last_replied = self.peer_manager.get_last_replied(my_peer.address, my_peer.udp_port) if not last_replied or last_replied + 60 < self.loop.time(): not_recently_replied.append(my_peer) if not_recently_replied: to_replace = not_recently_replied[0] else: to_replace = self.routing_table.buckets[bucket_index].peers[0] last_replied = self.peer_manager.get_last_replied(to_replace.address, to_replace.udp_port) if last_replied and last_replied + 60 > self.loop.time(): return False log.debug("pinging %s:%s", to_replace.address, to_replace.udp_port) try: to_replace_rpc = self.get_rpc_peer(to_replace) await to_replace_rpc.ping() return False except asyncio.TimeoutError: log.debug("Replacing dead contact in bucket %i: %s:%i with %s:%i ", bucket_index, to_replace.address, to_replace.udp_port, peer.address, peer.udp_port) if to_replace in self.routing_table.buckets[bucket_index]: self.routing_table.buckets[bucket_index].remove_peer(to_replace) return await self._add_peer(peer) def add_peer(self, peer: 'KademliaPeer'): if peer.node_id == self.node_id: return False self._to_add.add(peer) self._wakeup_routing_task.set() def remove_peer(self, peer: 'KademliaPeer'): self._to_remove.add(peer) self._wakeup_routing_task.set() async def routing_table_task(self): while True: while self._to_remove: async with self._split_lock: peer = self._to_remove.pop() self.routing_table.remove_peer(peer) self.routing_table.join_buckets() while self._to_add: async with self._split_lock: await self._add_peer(self._to_add.pop()) await asyncio.gather(self._wakeup_routing_task.wait(), asyncio.sleep(.1, loop=self.loop), loop=self.loop) self._wakeup_routing_task.clear() def _handle_rpc(self, sender_contact: 'KademliaPeer', message: RequestDatagram): assert sender_contact.node_id != self.node_id, (sender_contact.node_id.hex()[:8], self.node_id.hex()[:8]) method = message.method if method not in [b'ping', b'store', b'findNode', b'findValue']: raise AttributeError('Invalid method: %s' % message.method.decode()) if message.args and isinstance(message.args[-1], dict) and b'protocolVersion' in message.args[-1]: # args don't need reformatting args, kwargs = tuple(message.args[:-1]), message.args[-1] else: args, kwargs = self._migrate_incoming_rpc_args(sender_contact, message.method, *message.args) log.debug("%s:%i RECV CALL %s %s:%i", self.external_ip, self.udp_port, message.method.decode(), sender_contact.address, sender_contact.udp_port) if method == b'ping': result = self.node_rpc.ping() elif method == b'store': blob_hash, token, port, original_publisher_id, age = args[:5] # pylint: disable=unused-variable result = self.node_rpc.store(sender_contact, blob_hash, token, port) else: key = args[0] page = kwargs.get(PAGE_KEY, 0) if method == b'findNode': result = self.node_rpc.find_node(sender_contact, key) else: assert method == b'findValue' result = self.node_rpc.find_value(sender_contact, key, page) self.send_response( sender_contact, ResponseDatagram(RESPONSE_TYPE, message.rpc_id, self.node_id, result), ) def handle_request_datagram(self, address: typing.Tuple[str, int], request_datagram: RequestDatagram): # This is an RPC method request self.peer_manager.report_last_requested(address[0], address[1]) try: peer = self.routing_table.get_peer(request_datagram.node_id) except IndexError: try: peer = make_kademlia_peer(request_datagram.node_id, address[0], address[1]) except ValueError as err: log.warning("error replying to %s: %s", address[0], str(err)) return try: self._handle_rpc(peer, request_datagram) # if the contact is not known to be bad (yet) and we haven't yet queried it, send it a ping so that it # will be added to our routing table if successful is_good = self.peer_manager.peer_is_good(peer) if is_good is None: self.ping_queue.enqueue_maybe_ping(peer) # only add a requesting contact to the routing table if it has replied to one of our requests elif is_good is True: self.add_peer(peer) except ValueError as err: log.debug("error raised handling %s request from %s:%i - %s(%s)", request_datagram.method, peer.address, peer.udp_port, str(type(err)), str(err)) self.send_error( peer, ErrorDatagram(ERROR_TYPE, request_datagram.rpc_id, self.node_id, str(type(err)).encode(), str(err).encode()) ) except Exception as err: log.warning("error raised handling %s request from %s:%i - %s(%s)", request_datagram.method, peer.address, peer.udp_port, str(type(err)), str(err)) self.send_error( peer, ErrorDatagram(ERROR_TYPE, request_datagram.rpc_id, self.node_id, str(type(err)).encode(), str(err).encode()) ) def handle_response_datagram(self, address: typing.Tuple[str, int], response_datagram: ResponseDatagram): # Find the message that triggered this response if response_datagram.rpc_id in self.sent_messages: peer, future, _ = self.sent_messages[response_datagram.rpc_id] if peer.address != address[0]: future.set_exception( RemoteException(f"response from {address[0]}, expected {peer.address}") ) return # We got a result from the RPC if peer.node_id == self.node_id: future.set_exception(RemoteException("node has our node id")) return elif response_datagram.node_id == self.node_id: future.set_exception(RemoteException("incoming message is from our node id")) return peer = make_kademlia_peer(response_datagram.node_id, address[0], address[1]) self.peer_manager.report_last_replied(address[0], address[1]) self.peer_manager.update_contact_triple(peer.node_id, address[0], address[1]) if not future.cancelled(): future.set_result(response_datagram) self.add_peer(peer) else: log.warning("%s:%i replied, but after we cancelled the request attempt", peer.address, peer.udp_port) else: # If the original message isn't found, it must have timed out # TODO: we should probably do something with this... pass def handle_error_datagram(self, address, error_datagram: ErrorDatagram): # The RPC request raised a remote exception; raise it locally remote_exception = RemoteException(f"{error_datagram.exception_type}({error_datagram.response})") if error_datagram.rpc_id in self.sent_messages: peer, future, request = self.sent_messages.pop(error_datagram.rpc_id) if (peer.address, peer.udp_port) != address: future.set_exception( RemoteException( f"response from {address[0]}:{address[1]}, " f"expected {peer.address}:{peer.udp_port}" ) ) return error_msg = f"" \ f"Error sending '{request.method}' to {peer.address}:{peer.udp_port}\n" \ f"Args: {request.args}\n" \ f"Raised: {str(remote_exception)}" if 'Invalid token' in error_msg: log.debug(error_msg) elif error_datagram.response not in OLD_PROTOCOL_ERRORS: log.warning(error_msg) else: log.debug( "known dht protocol backwards compatibility error with %s:%i (lbrynet v%s)", peer.address, peer.udp_port, OLD_PROTOCOL_ERRORS[error_datagram.response] ) future.set_exception(remote_exception) return else: if error_datagram.response not in OLD_PROTOCOL_ERRORS: msg = f"Received error from {address[0]}:{address[1]}, but it isn't in response to a " \ f"pending request: {str(remote_exception)}" log.warning(msg) else: log.debug( "known dht protocol backwards compatibility error with %s:%i (lbrynet v%s)", address[0], address[1], OLD_PROTOCOL_ERRORS[error_datagram.response] ) def datagram_received(self, datagram: bytes, address: typing.Tuple[str, int]) -> None: # pylint: disable=arguments-renamed try: message = decode_datagram(datagram) except (ValueError, TypeError, DecodeError): self.peer_manager.report_failure(address[0], address[1]) log.warning("Couldn't decode dht datagram from %s: %s", address, datagram.hex()) return if isinstance(message, RequestDatagram): self.handle_request_datagram(address, message) elif isinstance(message, ErrorDatagram): self.handle_error_datagram(address, message) else: assert isinstance(message, ResponseDatagram), "sanity" self.handle_response_datagram(address, message) async def send_request(self, peer: 'KademliaPeer', request: RequestDatagram) -> ResponseDatagram: self._send(peer, request) response_fut = self.sent_messages[request.rpc_id][1] try: response = await asyncio.wait_for(response_fut, self.rpc_timeout) self.peer_manager.report_last_replied(peer.address, peer.udp_port) return response except asyncio.CancelledError: if not response_fut.done(): response_fut.cancel() raise except (asyncio.TimeoutError, RemoteException): self.peer_manager.report_failure(peer.address, peer.udp_port) if self.peer_manager.peer_is_good(peer) is False: self.remove_peer(peer) raise def send_response(self, peer: 'KademliaPeer', response: ResponseDatagram): self._send(peer, response) def send_error(self, peer: 'KademliaPeer', error: ErrorDatagram): self._send(peer, error) def _send(self, peer: 'KademliaPeer', message: typing.Union[RequestDatagram, ResponseDatagram, ErrorDatagram]): if not self.transport or self.transport.is_closing(): raise TransportNotConnected() data = message.bencode() if len(data) > constants.MSG_SIZE_LIMIT: log.warning("cannot send datagram larger than %i bytes (packet is %i bytes)", constants.MSG_SIZE_LIMIT, len(data)) log.debug("Packet is too large to send: %s", data[:3500].hex()) raise ValueError( f"cannot send datagram larger than {constants.MSG_SIZE_LIMIT} bytes (packet is {len(data)} bytes)" ) if isinstance(message, (RequestDatagram, ResponseDatagram)): assert message.node_id == self.node_id, message if isinstance(message, RequestDatagram): assert self.node_id != peer.node_id def pop_from_sent_messages(_): if message.rpc_id in self.sent_messages: self.sent_messages.pop(message.rpc_id) if isinstance(message, RequestDatagram): response_fut = self.loop.create_future() response_fut.add_done_callback(pop_from_sent_messages) self.sent_messages[message.rpc_id] = (peer, response_fut, message) try: self.transport.sendto(data, (peer.address, peer.udp_port)) except OSError as err: # TODO: handle ENETUNREACH if err.errno == socket.EWOULDBLOCK: # i'm scared this may swallow important errors, but i get a million of these # on Linux and it doesn't seem to affect anything -grin log.warning("Can't send data to dht: EWOULDBLOCK") else: log.error("DHT socket error sending %i bytes to %s:%i - %s (code %i)", len(data), peer.address, peer.udp_port, str(err), err.errno) if isinstance(message, RequestDatagram): self.sent_messages[message.rpc_id][1].set_exception(err) else: raise err if isinstance(message, RequestDatagram): self.peer_manager.report_last_sent(peer.address, peer.udp_port) elif isinstance(message, ErrorDatagram): self.peer_manager.report_failure(peer.address, peer.udp_port) def change_token(self): self.old_token_secret = self.token_secret self.token_secret = constants.generate_id() def make_token(self, compact_ip): return constants.digest(self.token_secret + compact_ip) def verify_token(self, token, compact_ip): h = constants.HASH_CLASS() h.update(self.token_secret + compact_ip) if self.old_token_secret and not token == h.digest(): # TODO: why should we be accepting the previous token? h = constants.HASH_CLASS() h.update(self.old_token_secret + compact_ip) if not token == h.digest(): return False return True async def store_to_peer(self, hash_value: bytes, peer: 'KademliaPeer', # pylint: disable=too-many-return-statements retry: bool = True) -> typing.Tuple[bytes, bool]: async def __store(): res = await self.get_rpc_peer(peer).store(hash_value) if res != b"OK": raise ValueError(res) log.debug("Stored %s to %s", hash_value.hex()[:8], peer) return peer.node_id, True try: return await __store() except asyncio.TimeoutError: log.debug("Timeout while storing blob_hash %s at %s", hash_value.hex()[:8], peer) return peer.node_id, False except ValueError as err: log.error("Unexpected response: %s", err) return peer.node_id, False except RemoteException as err: if 'findValue() takes exactly 2 arguments (5 given)' in str(err): log.debug("peer %s:%i is running an incompatible version of lbrynet", peer.address, peer.udp_port) return peer.node_id, False if 'Invalid token' not in str(err): log.warning("Unexpected error while storing blob_hash: %s", err) return peer.node_id, False self.peer_manager.clear_token(peer.node_id) if not retry: return peer.node_id, False return await self.store_to_peer(hash_value, peer, retry=False)