import logging import socket import functools import hashlib import asyncio import time import typing import random from asyncio.protocols import DatagramProtocol from asyncio.transports import DatagramTransport from prometheus_client import Gauge, Counter, Histogram from lbry.dht import constants from lbry.dht.serialization.bencoding import DecodeError from lbry.dht.serialization.datagram import decode_datagram, ErrorDatagram, ResponseDatagram, RequestDatagram from lbry.dht.serialization.datagram import RESPONSE_TYPE, ERROR_TYPE, PAGE_KEY from lbry.dht.error import RemoteException, TransportNotConnected from lbry.dht.protocol.routing_table import TreeRoutingTable from lbry.dht.protocol.data_store import DictDataStore from lbry.dht.peer import make_kademlia_peer if typing.TYPE_CHECKING: from lbry.dht.peer import PeerManager, KademliaPeer log = logging.getLogger(__name__) OLD_PROTOCOL_ERRORS = { "findNode() takes exactly 2 arguments (5 given)": "0.19.1", "findValue() takes exactly 2 arguments (5 given)": "0.19.1" } class KademliaRPC: stored_blobs_metric = Gauge( "stored_blobs", "Number of blobs announced by other peers", namespace="dht_node", labelnames=("scope",), ) def __init__(self, protocol: 'KademliaProtocol', loop: asyncio.AbstractEventLoop, peer_port: int = 3333): self.protocol = protocol self.loop = loop self.peer_port = peer_port self.old_token_secret: bytes = None self.token_secret = constants.generate_id() def compact_address(self): compact_ip = functools.reduce(lambda buff, x: buff + bytearray([int(x)]), self.protocol.external_ip.split('.'), bytearray()) compact_port = self.peer_port.to_bytes(2, 'big') return compact_ip + compact_port + self.protocol.node_id @staticmethod def ping(): return b'pong' def store(self, rpc_contact: 'KademliaPeer', blob_hash: bytes, token: bytes, port: int) -> bytes: if len(blob_hash) != constants.HASH_BITS // 8: raise ValueError(f"invalid length of blob hash: {len(blob_hash)}") if not 0 < port < 65535: raise ValueError(f"invalid tcp port: {port}") rpc_contact.update_tcp_port(port) if not self.verify_token(token, rpc_contact.compact_ip()): if self.loop.time() - self.protocol.started_listening_time < constants.TOKEN_SECRET_REFRESH_INTERVAL: pass else: raise ValueError("Invalid token") self.protocol.data_store.add_peer_to_blob( rpc_contact, blob_hash ) self.stored_blobs_metric.labels("global").set(len(self.protocol.data_store)) return b'OK' def find_node(self, rpc_contact: 'KademliaPeer', key: bytes) -> typing.List[typing.Tuple[bytes, str, int]]: if len(key) != constants.HASH_LENGTH: raise ValueError("invalid contact node_id length: %i" % len(key)) contacts = self.protocol.routing_table.find_close_peers(key, sender_node_id=rpc_contact.node_id) contact_triples = [] for contact in contacts[:constants.K * 2]: contact_triples.append((contact.node_id, contact.address, contact.udp_port)) return contact_triples def find_value(self, rpc_contact: 'KademliaPeer', key: bytes, page: int = 0): page = page if page > 0 else 0 if len(key) != constants.HASH_LENGTH: raise ValueError("invalid blob_exchange hash length: %i" % len(key)) response = { b'token': self.make_token(rpc_contact.compact_ip()), } if not page: response[b'contacts'] = self.find_node(rpc_contact, key)[:constants.K] if self.protocol.protocol_version: response[b'protocolVersion'] = self.protocol.protocol_version # get peers we have stored for this blob_exchange peers = [ peer.compact_address_tcp() for peer in self.protocol.data_store.get_peers_for_blob(key) if not rpc_contact.tcp_port or peer.compact_address_tcp() != rpc_contact.compact_address_tcp() ] # if we don't have k storing peers to return and we have this hash locally, include our contact information if len(peers) < constants.K and key.hex() in self.protocol.data_store.completed_blobs: peers.append(self.compact_address()) if not peers: response[PAGE_KEY] = 0 else: response[PAGE_KEY] = (len(peers) // (constants.K + 1)) + 1 # how many pages of peers we have for the blob if len(peers) > constants.K: random.Random(self.protocol.node_id).shuffle(peers) if page * constants.K < len(peers): response[key] = peers[page * constants.K:page * constants.K + constants.K] return response def refresh_token(self): # TODO: this needs to be called periodically self.old_token_secret = self.token_secret self.token_secret = constants.generate_id() def make_token(self, compact_ip): h = hashlib.new('sha384') h.update(self.token_secret + compact_ip) return h.digest() def verify_token(self, token, compact_ip): h = hashlib.new('sha384') h.update(self.token_secret + compact_ip) if self.old_token_secret and not token == h.digest(): # TODO: why should we be accepting the previous token? h = hashlib.new('sha384') h.update(self.old_token_secret + compact_ip) if not token == h.digest(): return False return True class RemoteKademliaRPC: """ Encapsulates RPC calls to remote Peers """ def __init__(self, loop: asyncio.AbstractEventLoop, peer_tracker: 'PeerManager', protocol: 'KademliaProtocol', peer: 'KademliaPeer'): self.loop = loop self.peer_tracker = peer_tracker self.protocol = protocol self.peer = peer async def ping(self) -> bytes: """ :return: b'pong' """ response = await self.protocol.send_request( self.peer, RequestDatagram.make_ping(self.protocol.node_id) ) return response.response async def store(self, blob_hash: bytes) -> bytes: """ :param blob_hash: blob hash as bytes :return: b'OK' """ if len(blob_hash) != constants.HASH_BITS // 8: raise ValueError(f"invalid length of blob hash: {len(blob_hash)}") if not self.protocol.peer_port or not 0 < self.protocol.peer_port < 65535: raise ValueError(f"invalid tcp port: {self.protocol.peer_port}") token = self.peer_tracker.get_node_token(self.peer.node_id) if not token: find_value_resp = await self.find_value(blob_hash) token = find_value_resp[b'token'] response = await self.protocol.send_request( self.peer, RequestDatagram.make_store(self.protocol.node_id, blob_hash, token, self.protocol.peer_port) ) return response.response async def find_node(self, key: bytes) -> typing.List[typing.Tuple[bytes, str, int]]: """ :return: [(node_id, address, udp_port), ...] """ if len(key) != constants.HASH_BITS // 8: raise ValueError(f"invalid length of find node key: {len(key)}") response = await self.protocol.send_request( self.peer, RequestDatagram.make_find_node(self.protocol.node_id, key) ) return [(node_id, address.decode(), udp_port) for node_id, address, udp_port in response.response] async def find_value(self, key: bytes, page: int = 0) -> typing.Union[typing.Dict]: """ :return: { b'token': , b'contacts': [(node_id, address, udp_port), ...] : [ RemoteKademliaRPC: return RemoteKademliaRPC(self.loop, self.peer_manager, self, peer) def start(self): self.maintaing_routing_task = self.loop.create_task(self.routing_table_task()) def stop(self): if self.maintaing_routing_task: self.maintaing_routing_task.cancel() if self.transport: self.disconnect() def disconnect(self): self.transport.close() def connection_made(self, transport: DatagramTransport): self.transport = transport def connection_lost(self, exc): self.stop() @staticmethod def _migrate_incoming_rpc_args(peer: 'KademliaPeer', method: bytes, *args) -> typing.Tuple[typing.Tuple, typing.Dict]: if method == b'store' and peer.protocol_version == 0: if isinstance(args[1], dict): blob_hash = args[0] token = args[1].pop(b'token', None) port = args[1].pop(b'port', -1) original_publisher_id = args[1].pop(b'lbryid', None) age = 0 return (blob_hash, token, port, original_publisher_id, age), {} return args, {} async def _add_peer(self, peer: 'KademliaPeer'): if not peer.node_id: log.warning("Tried adding a peer with no node id!") return False for my_peer in self.routing_table.get_peers(): if (my_peer.address, my_peer.udp_port) == (peer.address, peer.udp_port) and my_peer.node_id != peer.node_id: self.routing_table.remove_peer(my_peer) self.routing_table.join_buckets() bucket_index = self.routing_table.kbucket_index(peer.node_id) if self.routing_table.buckets[bucket_index].add_peer(peer): return True # The bucket is full; see if it can be split (by checking if its range includes the host node's node_id) if self.routing_table.should_split(bucket_index, peer.node_id): self.routing_table.split_bucket(bucket_index) # Retry the insertion attempt result = await self._add_peer(peer) self.routing_table.join_buckets() return result else: # We can't split the k-bucket # # The 13 page kademlia paper specifies that the least recently contacted node in the bucket # shall be pinged. If it fails to reply it is replaced with the new contact. If the ping is successful # the new contact is ignored and not added to the bucket (sections 2.2 and 2.4). # # A reasonable extension to this is BEP 0005, which extends the above: # # Not all nodes that we learn about are equal. Some are "good" and some are not. # Many nodes using the DHT are able to send queries and receive responses, # but are not able to respond to queries from other nodes. It is important that # each node's routing table must contain only known good nodes. A good node is # a node has responded to one of our queries within the last 15 minutes. A node # is also good if it has ever responded to one of our queries and has sent us a # query within the last 15 minutes. After 15 minutes of inactivity, a node becomes # questionable. Nodes become bad when they fail to respond to multiple queries # in a row. Nodes that we know are good are given priority over nodes with unknown status. # # When there are bad or questionable nodes in the bucket, the least recent is selected for # potential replacement (BEP 0005). When all nodes in the bucket are fresh, the head (least recent) # contact is selected as described in section 2.2 of the kademlia paper. In both cases the new contact # is ignored if the pinged node replies. not_good_contacts = self.routing_table.buckets[bucket_index].get_bad_or_unknown_peers() not_recently_replied = [] for my_peer in not_good_contacts: last_replied = self.peer_manager.get_last_replied(my_peer.address, my_peer.udp_port) if not last_replied or last_replied + 60 < self.loop.time(): not_recently_replied.append(my_peer) if not_recently_replied: to_replace = not_recently_replied[0] else: to_replace = self.routing_table.buckets[bucket_index].peers[0] last_replied = self.peer_manager.get_last_replied(to_replace.address, to_replace.udp_port) if last_replied and last_replied + 60 > self.loop.time(): return False log.debug("pinging %s:%s", to_replace.address, to_replace.udp_port) try: to_replace_rpc = self.get_rpc_peer(to_replace) await to_replace_rpc.ping() return False except asyncio.TimeoutError: log.debug("Replacing dead contact in bucket %i: %s:%i with %s:%i ", bucket_index, to_replace.address, to_replace.udp_port, peer.address, peer.udp_port) if to_replace in self.routing_table.buckets[bucket_index]: self.routing_table.buckets[bucket_index].remove_peer(to_replace) return await self._add_peer(peer) def add_peer(self, peer: 'KademliaPeer'): if peer.node_id == self.node_id: return False self._to_add.add(peer) self._wakeup_routing_task.set() def remove_peer(self, peer: 'KademliaPeer'): self._to_remove.add(peer) self._wakeup_routing_task.set() async def routing_table_task(self): while True: while self._to_remove: async with self._split_lock: peer = self._to_remove.pop() self.routing_table.remove_peer(peer) self.routing_table.join_buckets() while self._to_add: async with self._split_lock: await self._add_peer(self._to_add.pop()) await asyncio.gather(self._wakeup_routing_task.wait(), asyncio.sleep(.1, loop=self.loop), loop=self.loop) self._wakeup_routing_task.clear() def _handle_rpc(self, sender_contact: 'KademliaPeer', message: RequestDatagram): assert sender_contact.node_id != self.node_id, (sender_contact.node_id.hex()[:8], self.node_id.hex()[:8]) method = message.method if method not in [b'ping', b'store', b'findNode', b'findValue']: raise AttributeError('Invalid method: %s' % message.method.decode()) if message.args and isinstance(message.args[-1], dict) and b'protocolVersion' in message.args[-1]: # args don't need reformatting args, kwargs = tuple(message.args[:-1]), message.args[-1] else: args, kwargs = self._migrate_incoming_rpc_args(sender_contact, message.method, *message.args) log.debug("%s:%i RECV CALL %s %s:%i", self.external_ip, self.udp_port, message.method.decode(), sender_contact.address, sender_contact.udp_port) if method == b'ping': result = self.node_rpc.ping() elif method == b'store': blob_hash, token, port, original_publisher_id, age = args[:5] # pylint: disable=unused-variable result = self.node_rpc.store(sender_contact, blob_hash, token, port) else: key = args[0] page = kwargs.get(PAGE_KEY, 0) if method == b'findNode': result = self.node_rpc.find_node(sender_contact, key) else: assert method == b'findValue' result = self.node_rpc.find_value(sender_contact, key, page) self.send_response( sender_contact, ResponseDatagram(RESPONSE_TYPE, message.rpc_id, self.node_id, result), ) def handle_request_datagram(self, address: typing.Tuple[str, int], request_datagram: RequestDatagram): # This is an RPC method request self.peer_manager.report_last_requested(address[0], address[1]) try: peer = self.routing_table.get_peer(request_datagram.node_id) except IndexError: try: peer = make_kademlia_peer(request_datagram.node_id, address[0], address[1]) except ValueError as err: log.warning("error replying to %s: %s", address[0], str(err)) return try: self._handle_rpc(peer, request_datagram) # if the contact is not known to be bad (yet) and we haven't yet queried it, send it a ping so that it # will be added to our routing table if successful is_good = self.peer_manager.peer_is_good(peer) if is_good is None: self.ping_queue.enqueue_maybe_ping(peer) # only add a requesting contact to the routing table if it has replied to one of our requests elif is_good is True: self.add_peer(peer) except ValueError as err: log.debug("error raised handling %s request from %s:%i - %s(%s)", request_datagram.method, peer.address, peer.udp_port, str(type(err)), str(err)) self.send_error( peer, ErrorDatagram(ERROR_TYPE, request_datagram.rpc_id, self.node_id, str(type(err)).encode(), str(err).encode()) ) except Exception as err: log.warning("error raised handling %s request from %s:%i - %s(%s)", request_datagram.method, peer.address, peer.udp_port, str(type(err)), str(err)) self.send_error( peer, ErrorDatagram(ERROR_TYPE, request_datagram.rpc_id, self.node_id, str(type(err)).encode(), str(err).encode()) ) def handle_response_datagram(self, address: typing.Tuple[str, int], response_datagram: ResponseDatagram): # Find the message that triggered this response if response_datagram.rpc_id in self.sent_messages: peer, future, _ = self.sent_messages[response_datagram.rpc_id] if peer.address != address[0]: future.set_exception( RemoteException(f"response from {address[0]}, expected {peer.address}") ) return # We got a result from the RPC if peer.node_id == self.node_id: future.set_exception(RemoteException("node has our node id")) return elif response_datagram.node_id == self.node_id: future.set_exception(RemoteException("incoming message is from our node id")) return peer = make_kademlia_peer(response_datagram.node_id, address[0], address[1]) self.peer_manager.report_last_replied(address[0], address[1]) self.peer_manager.update_contact_triple(peer.node_id, address[0], address[1]) if not future.cancelled(): future.set_result(response_datagram) self.add_peer(peer) else: log.warning("%s:%i replied, but after we cancelled the request attempt", peer.address, peer.udp_port) else: # If the original message isn't found, it must have timed out # TODO: we should probably do something with this... pass def handle_error_datagram(self, address, error_datagram: ErrorDatagram): # The RPC request raised a remote exception; raise it locally remote_exception = RemoteException(f"{error_datagram.exception_type}({error_datagram.response})") if error_datagram.rpc_id in self.sent_messages: peer, future, request = self.sent_messages.pop(error_datagram.rpc_id) if (peer.address, peer.udp_port) != address: future.set_exception( RemoteException( f"response from {address[0]}:{address[1]}, " f"expected {peer.address}:{peer.udp_port}" ) ) return error_msg = f"" \ f"Error sending '{request.method}' to {peer.address}:{peer.udp_port}\n" \ f"Args: {request.args}\n" \ f"Raised: {str(remote_exception)}" if 'Invalid token' in error_msg: log.debug(error_msg) elif error_datagram.response not in OLD_PROTOCOL_ERRORS: log.warning(error_msg) else: log.debug( "known dht protocol backwards compatibility error with %s:%i (lbrynet v%s)", peer.address, peer.udp_port, OLD_PROTOCOL_ERRORS[error_datagram.response] ) future.set_exception(remote_exception) return else: if error_datagram.response not in OLD_PROTOCOL_ERRORS: msg = f"Received error from {address[0]}:{address[1]}, but it isn't in response to a " \ f"pending request: {str(remote_exception)}" log.warning(msg) else: log.debug( "known dht protocol backwards compatibility error with %s:%i (lbrynet v%s)", address[0], address[1], OLD_PROTOCOL_ERRORS[error_datagram.response] ) def datagram_received(self, datagram: bytes, address: typing.Tuple[str, int]) -> None: # pylint: disable=arguments-renamed try: message = decode_datagram(datagram) except (ValueError, TypeError, DecodeError): self.peer_manager.report_failure(address[0], address[1]) log.warning("Couldn't decode dht datagram from %s: %s", address, datagram.hex()) return if isinstance(message, RequestDatagram): self.handle_request_datagram(address, message) elif isinstance(message, ErrorDatagram): self.handle_error_datagram(address, message) else: assert isinstance(message, ResponseDatagram), "sanity" self.handle_response_datagram(address, message) async def send_request(self, peer: 'KademliaPeer', request: RequestDatagram) -> ResponseDatagram: self._send(peer, request) response_fut = self.sent_messages[request.rpc_id][1] try: self.requests_sent_metric.labels(method=request.method).inc() start = time.perf_counter() response = await asyncio.wait_for(response_fut, self.rpc_timeout) self.response_time_metric.labels(method=request.method).observe(time.perf_counter() - start) self.peer_manager.report_last_replied(peer.address, peer.udp_port) return response except asyncio.CancelledError: if not response_fut.done(): response_fut.cancel() raise except (asyncio.TimeoutError, RemoteException): self.peer_manager.report_failure(peer.address, peer.udp_port) if self.peer_manager.peer_is_good(peer) is False: self.remove_peer(peer) raise def send_response(self, peer: 'KademliaPeer', response: ResponseDatagram): self._send(peer, response) def send_error(self, peer: 'KademliaPeer', error: ErrorDatagram): self._send(peer, error) def _send(self, peer: 'KademliaPeer', message: typing.Union[RequestDatagram, ResponseDatagram, ErrorDatagram]): if not self.transport or self.transport.is_closing(): raise TransportNotConnected() data = message.bencode() if len(data) > constants.MSG_SIZE_LIMIT: log.warning("cannot send datagram larger than %i bytes (packet is %i bytes)", constants.MSG_SIZE_LIMIT, len(data)) log.debug("Packet is too large to send: %s", data[:3500].hex()) raise ValueError( f"cannot send datagram larger than {constants.MSG_SIZE_LIMIT} bytes (packet is {len(data)} bytes)" ) if isinstance(message, (RequestDatagram, ResponseDatagram)): assert message.node_id == self.node_id, message if isinstance(message, RequestDatagram): assert self.node_id != peer.node_id def pop_from_sent_messages(_): if message.rpc_id in self.sent_messages: self.sent_messages.pop(message.rpc_id) if isinstance(message, RequestDatagram): response_fut = self.loop.create_future() response_fut.add_done_callback(pop_from_sent_messages) self.sent_messages[message.rpc_id] = (peer, response_fut, message) try: self.transport.sendto(data, (peer.address, peer.udp_port)) except OSError as err: # TODO: handle ENETUNREACH if err.errno == socket.EWOULDBLOCK: # i'm scared this may swallow important errors, but i get a million of these # on Linux and it doesn't seem to affect anything -grin log.warning("Can't send data to dht: EWOULDBLOCK") else: log.error("DHT socket error sending %i bytes to %s:%i - %s (code %i)", len(data), peer.address, peer.udp_port, str(err), err.errno) if isinstance(message, RequestDatagram): self.sent_messages[message.rpc_id][1].set_exception(err) else: raise err if isinstance(message, RequestDatagram): self.peer_manager.report_last_sent(peer.address, peer.udp_port) elif isinstance(message, ErrorDatagram): self.peer_manager.report_failure(peer.address, peer.udp_port) def change_token(self): self.old_token_secret = self.token_secret self.token_secret = constants.generate_id() def make_token(self, compact_ip): return constants.digest(self.token_secret + compact_ip) def verify_token(self, token, compact_ip): h = constants.HASH_CLASS() h.update(self.token_secret + compact_ip) if self.old_token_secret and not token == h.digest(): # TODO: why should we be accepting the previous token? h = constants.HASH_CLASS() h.update(self.old_token_secret + compact_ip) if not token == h.digest(): return False return True async def store_to_peer(self, hash_value: bytes, peer: 'KademliaPeer', # pylint: disable=too-many-return-statements retry: bool = True) -> typing.Tuple[bytes, bool]: async def __store(): res = await self.get_rpc_peer(peer).store(hash_value) if res != b"OK": raise ValueError(res) log.debug("Stored %s to %s", hash_value.hex()[:8], peer) return peer.node_id, True try: return await __store() except asyncio.TimeoutError: log.debug("Timeout while storing blob_hash %s at %s", hash_value.hex()[:8], peer) return peer.node_id, False except ValueError as err: log.error("Unexpected response: %s", err) return peer.node_id, False except RemoteException as err: if 'findValue() takes exactly 2 arguments (5 given)' in str(err): log.debug("peer %s:%i is running an incompatible version of lbrynet", peer.address, peer.udp_port) return peer.node_id, False if 'Invalid token' not in str(err): log.warning("Unexpected error while storing blob_hash: %s", err) return peer.node_id, False self.peer_manager.clear_token(peer.node_id) if not retry: return peer.node_id, False return await self.store_to_peer(hash_value, peer, retry=False)