diff --git a/lbry/lbry/dht/node.py b/lbry/lbry/dht/node.py index 83dd08704..152ff0b57 100644 --- a/lbry/lbry/dht/node.py +++ b/lbry/lbry/dht/node.py @@ -4,13 +4,14 @@ import typing import binascii from lbry.utils import resolve_host from lbry.dht import constants +from lbry.dht.peer import make_kademlia_peer from lbry.dht.protocol.distance import Distance from lbry.dht.protocol.iterative_find import IterativeNodeFinder, IterativeValueFinder from lbry.dht.protocol.protocol import KademliaProtocol -from lbry.dht.peer import KademliaPeer if typing.TYPE_CHECKING: from lbry.dht.peer import PeerManager + from lbry.dht.peer import KademliaPeer log = logging.getLogger(__name__) @@ -141,7 +142,7 @@ class Node: if known_node_addresses: peers = [ - KademliaPeer(self.loop, address, udp_port=port) + make_kademlia_peer(None, address, port) for (address, port) in known_node_addresses ] while True: @@ -232,7 +233,7 @@ class Node: if not peer.udp_port: udp_port_to_try = peer.tcp_port if not peer.udp_port: - peer.update_udp_port(udp_port_to_try) + peer = make_kademlia_peer(peer.node_id, peer.address, udp_port_to_try, peer.tcp_port) self.loop.create_task(ping(peer)) else: log.debug("skip bad peer %s:%i for %s", peer.address, peer.tcp_port, blob_hash) diff --git a/lbry/lbry/dht/peer.py b/lbry/lbry/dht/peer.py index 3bc2f7e75..83a26f65e 100644 --- a/lbry/lbry/dht/peer.py +++ b/lbry/lbry/dht/peer.py @@ -3,6 +3,7 @@ import asyncio import logging import ipaddress from binascii import hexlify +from dataclasses import dataclass, field from functools import lru_cache from lbry.dht import constants @@ -11,6 +12,13 @@ from lbry.dht.serialization.datagram import make_compact_address, make_compact_i log = logging.getLogger(__name__) +@lru_cache(1024) +def make_kademlia_peer(node_id: typing.Optional[bytes], address: typing.Optional[str], + udp_port: typing.Optional[int] = None, + tcp_port: typing.Optional[int] = None) -> 'KademliaPeer': + return KademliaPeer(address, node_id, udp_port, tcp_port=tcp_port) + + def is_valid_ipv4(address): try: ip = ipaddress.ip_address(address) @@ -81,10 +89,6 @@ class PeerManager: self._node_id_mapping[(address, udp_port)] = node_id self._node_id_reverse_mapping[node_id] = (address, udp_port) - @lru_cache(maxsize=400) - def get_kademlia_peer(self, node_id: bytes, address: str, udp_port: int) -> 'KademliaPeer': - return KademliaPeer(self._loop, address, node_id, udp_port) - def prune(self): # TODO: periodically call this now = self._loop.time() to_pop = [] @@ -116,6 +120,8 @@ class PeerManager: previous_failure, most_recent_failure = self._rpc_failures.get((address, udp_port), (None, None)) last_requested = self._last_requested.get((address, udp_port)) last_replied = self._last_replied.get((address, udp_port)) + if node_id is None: + return None if most_recent_failure and last_replied: if delay < last_replied > most_recent_failure: return True @@ -135,47 +141,31 @@ class PeerManager: def decode_tcp_peer_from_compact_address(self, compact_address: bytes) -> 'KademliaPeer': node_id, address, tcp_port = decode_compact_address(compact_address) - return KademliaPeer(self._loop, address, node_id, tcp_port=tcp_port) + return make_kademlia_peer(node_id, address, udp_port=None, tcp_port=tcp_port) +@dataclass(unsafe_hash=True) class KademliaPeer: - __slots__ = [ - 'loop', - '_node_id', - 'address', - 'udp_port', - 'tcp_port', - 'protocol_version', - ] + address: str = field(hash=True) + _node_id: typing.Optional[bytes] = field(hash=True) + udp_port: typing.Optional[int] = field(hash=True) + tcp_port: typing.Optional[int] = field(compare=False, hash=False) + protocol_version: typing.Optional[int] = field(default=1, compare=False, hash=False) - def __init__(self, loop: asyncio.AbstractEventLoop, address: str, node_id: typing.Optional[bytes] = None, - udp_port: typing.Optional[int] = None, tcp_port: typing.Optional[int] = None): - if node_id is not None: - if not len(node_id) == constants.hash_length: - raise ValueError("invalid node_id: {}".format(hexlify(node_id).decode())) - if udp_port is not None and not 0 <= udp_port <= 65536: + def __post_init__(self): + if self._node_id is not None: + if not len(self._node_id) == constants.hash_length: + raise ValueError("invalid node_id: {}".format(hexlify(self._node_id).decode())) + if self.udp_port is not None and not 1 <= self.udp_port <= 65535: raise ValueError("invalid udp port") - if tcp_port and not 0 <= tcp_port <= 65536: + if self.tcp_port is not None and not 1 <= self.tcp_port <= 65535: raise ValueError("invalid tcp port") - if not is_valid_ipv4(address): + if not is_valid_ipv4(self.address): raise ValueError("invalid ip address") - self.loop = loop - self._node_id = node_id - self.address = address - self.udp_port = udp_port - self.tcp_port = tcp_port - self.protocol_version = 1 def update_tcp_port(self, tcp_port: int): self.tcp_port = tcp_port - def update_udp_port(self, udp_port: int): - self.udp_port = udp_port - - def set_id(self, node_id): - if not self._node_id: - self._node_id = node_id - @property def node_id(self) -> bytes: return self._node_id @@ -188,11 +178,3 @@ class KademliaPeer: def compact_ip(self): return make_compact_ip(self.address) - - def __eq__(self, other): - if not isinstance(other, KademliaPeer): - raise TypeError("invalid type to compare with Peer: %s" % str(type(other))) - return (self.node_id, self.address, self.udp_port) == (other.node_id, other.address, other.udp_port) - - def __hash__(self): - return hash((self.node_id, self.address, self.udp_port)) diff --git a/lbry/lbry/dht/protocol/iterative_find.py b/lbry/lbry/dht/protocol/iterative_find.py index 8f0e26437..748324ad4 100644 --- a/lbry/lbry/dht/protocol/iterative_find.py +++ b/lbry/lbry/dht/protocol/iterative_find.py @@ -7,6 +7,7 @@ import logging from lbry.dht import constants from lbry.dht.error import RemoteException, TransportNotConnected from lbry.dht.protocol.distance import Distance +from lbry.dht.peer import make_kademlia_peer from typing import TYPE_CHECKING if TYPE_CHECKING: @@ -150,7 +151,7 @@ class IterativeFinder: self._add_active(peer) for contact_triple in response.get_close_triples(): node_id, address, udp_port = contact_triple - self._add_active(self.peer_manager.get_kademlia_peer(node_id, address, udp_port)) + self._add_active(make_kademlia_peer(node_id, address, udp_port)) self.check_result_ready(response) async def _send_probe(self, peer: 'KademliaPeer'): diff --git a/lbry/lbry/dht/protocol/protocol.py b/lbry/lbry/dht/protocol/protocol.py index 68940f35b..838afb2db 100644 --- a/lbry/lbry/dht/protocol/protocol.py +++ b/lbry/lbry/dht/protocol/protocol.py @@ -15,6 +15,7 @@ from lbry.dht.serialization.datagram import RESPONSE_TYPE, ERROR_TYPE, PAGE_KEY from lbry.dht.error import RemoteException, TransportNotConnected from lbry.dht.protocol.routing_table import TreeRoutingTable from lbry.dht.protocol.data_store import DictDataStore +from lbry.dht.peer import make_kademlia_peer if typing.TYPE_CHECKING: from lbry.dht.peer import PeerManager, KademliaPeer @@ -322,6 +323,9 @@ class KademliaProtocol(DatagramProtocol): return args, {} async def _add_peer(self, peer: 'KademliaPeer'): + if not peer.node_id: + log.warning("Tried adding a peer with no node id!") + return False for p in self.routing_table.get_peers(): if (p.address, p.udp_port) == (peer.address, peer.udp_port) and p.node_id != peer.node_id: self.routing_table.remove_peer(p) @@ -447,7 +451,7 @@ class KademliaProtocol(DatagramProtocol): try: peer = self.routing_table.get_peer(request_datagram.node_id) except IndexError: - peer = self.peer_manager.get_kademlia_peer(request_datagram.node_id, address[0], address[1]) + peer = make_kademlia_peer(request_datagram.node_id, address[0], address[1]) try: self._handle_rpc(peer, request_datagram) # if the contact is not known to be bad (yet) and we haven't yet queried it, send it a ping so that it @@ -494,8 +498,7 @@ class KademliaProtocol(DatagramProtocol): elif response_datagram.node_id == self.node_id: df.set_exception(RemoteException("incoming message is from our node id")) return - peer.set_id(response_datagram.node_id) - peer.update_udp_port(address[1]) + peer = make_kademlia_peer(response_datagram.node_id, address[0], address[1]) self.peer_manager.report_last_replied(address[0], address[1]) self.peer_manager.update_contact_triple(peer.node_id, address[0], address[1]) if not df.cancelled(): diff --git a/lbry/lbry/extras/daemon/Daemon.py b/lbry/lbry/extras/daemon/Daemon.py index 1399683e0..dc1a10f06 100644 --- a/lbry/lbry/extras/daemon/Daemon.py +++ b/lbry/lbry/extras/daemon/Daemon.py @@ -23,6 +23,7 @@ from lbry import utils from lbry.conf import Config, Setting from lbry.blob.blob_file import is_valid_blobhash, BlobBuffer from lbry.blob_exchange.downloader import download_blob +from lbry.dht.peer import make_kademlia_peer from lbry.error import DownloadSDTimeout, ComponentsNotStarted from lbry.error import NullFundsError, NegativeFundsError, ComponentStartConditionNotMet from lbry.extras import system_info @@ -3655,8 +3656,7 @@ class Daemon(metaclass=JSONRPCServerType): """ peer = None if node_id and address and port: - peer = self.component_manager.peer_manager.get_kademlia_peer(unhexlify(node_id), address, - udp_port=int(port)) + peer = make_kademlia_peer(unhexlify(node_id), address, udp_port=int(port)) try: return await self.dht_node.protocol.get_rpc_peer(peer).ping() except asyncio.TimeoutError: diff --git a/lbry/lbry/stream/downloader.py b/lbry/lbry/stream/downloader.py index 17363b37e..fae884f6f 100644 --- a/lbry/lbry/stream/downloader.py +++ b/lbry/lbry/stream/downloader.py @@ -2,11 +2,12 @@ import asyncio import typing import logging import binascii + +from lbry.dht.peer import make_kademlia_peer from lbry.error import DownloadSDTimeout from lbry.utils import resolve_host, lru_cache_concurrent from lbry.stream.descriptor import StreamDescriptor from lbry.blob_exchange.downloader import BlobDownloader -from lbry.dht.peer import KademliaPeer if typing.TYPE_CHECKING: from lbry.conf import Config from lbry.dht.node import Node @@ -50,7 +51,7 @@ class StreamDownloader: def _delayed_add_fixed_peers(): self.added_fixed_peers = True self.peer_queue.put_nowait([ - KademliaPeer(self.loop, address=address, tcp_port=port + 1) + make_kademlia_peer(None, address, None, tcp_port=port + 1) for address, port in addresses ]) diff --git a/lbry/tests/integration/test_dht.py b/lbry/tests/integration/test_dht.py index bdf4249db..d8d4c2bd8 100644 --- a/lbry/tests/integration/test_dht.py +++ b/lbry/tests/integration/test_dht.py @@ -3,7 +3,7 @@ from binascii import hexlify from lbry.dht import constants from lbry.dht.node import Node -from lbry.dht.peer import PeerManager, KademliaPeer +from lbry.dht.peer import PeerManager, make_kademlia_peer from torba.testcase import AsyncioTestCase @@ -39,7 +39,7 @@ class DHTIntegrationTest(AsyncioTestCase): bad_peers = [] for candidate in self.nodes[1:10]: address, port, node_id = candidate.protocol.external_ip, candidate.protocol.udp_port, candidate.protocol.node_id - peer = KademliaPeer(self.loop, address, node_id, port) + peer = make_kademlia_peer(node_id, address, udp_port=port) bad_peers.append(peer) node.protocol.add_peer(peer) candidate.stop() @@ -102,7 +102,7 @@ class DHTIntegrationTest(AsyncioTestCase): node2.stop() # forcefully make it a bad peer but don't remove it from routing table address, port, node_id = node2.protocol.external_ip, node2.protocol.udp_port, node2.protocol.node_id - peer = KademliaPeer(self.loop, address, node_id, port) + peer = make_kademlia_peer(node_id, address, udp_port=port) self.assertTrue(node1.protocol.peer_manager.peer_is_good(peer)) node1.protocol.peer_manager.report_failure(node2.protocol.external_ip, node2.protocol.udp_port) node1.protocol.peer_manager.report_failure(node2.protocol.external_ip, node2.protocol.udp_port) diff --git a/lbry/tests/unit/blob_exchange/test_transfer_blob.py b/lbry/tests/unit/blob_exchange/test_transfer_blob.py index 5c9a0c587..cf9945d3a 100644 --- a/lbry/tests/unit/blob_exchange/test_transfer_blob.py +++ b/lbry/tests/unit/blob_exchange/test_transfer_blob.py @@ -12,7 +12,8 @@ from lbry.extras.daemon.storage import SQLiteStorage from lbry.blob.blob_manager import BlobManager from lbry.blob_exchange.server import BlobServer, BlobServerProtocol from lbry.blob_exchange.client import request_blob -from lbry.dht.peer import KademliaPeer, PeerManager +from lbry.dht.peer import PeerManager, make_kademlia_peer + # import logging # logging.getLogger("lbry").setLevel(logging.DEBUG) @@ -43,7 +44,7 @@ class BlobExchangeTestBase(AsyncioTestCase): self.client_storage = SQLiteStorage(self.client_config, os.path.join(self.client_dir, "lbrynet.sqlite")) self.client_blob_manager = BlobManager(self.loop, self.client_dir, self.client_storage, self.client_config) self.client_peer_manager = PeerManager(self.loop) - self.server_from_client = KademliaPeer(self.loop, "127.0.0.1", b'1' * 48, tcp_port=33333) + self.server_from_client = make_kademlia_peer(b'1' * 48, "127.0.0.1", tcp_port=33333) await self.client_storage.open() await self.server_storage.open() @@ -102,7 +103,7 @@ class TestBlobExchange(BlobExchangeTestBase): second_client_blob_manager = BlobManager( self.loop, second_client_dir, second_client_storage, second_client_conf ) - server_from_second_client = KademliaPeer(self.loop, "127.0.0.1", b'1' * 48, tcp_port=33333) + server_from_second_client = make_kademlia_peer(b'1' * 48, "127.0.0.1", tcp_port=33333) await second_client_storage.open() await second_client_blob_manager.setup() @@ -193,7 +194,7 @@ class TestBlobExchange(BlobExchangeTestBase): second_client_blob_manager = BlobManager( self.loop, second_client_dir, second_client_storage, second_client_conf ) - server_from_second_client = KademliaPeer(self.loop, "127.0.0.1", b'1' * 48, tcp_port=33333) + server_from_second_client = make_kademlia_peer(b'1' * 48, "127.0.0.1", tcp_port=33333) await second_client_storage.open() await second_client_blob_manager.setup() diff --git a/lbry/tests/unit/dht/protocol/test_data_store.py b/lbry/tests/unit/dht/protocol/test_data_store.py index 97d61e7ec..fe383b7b7 100644 --- a/lbry/tests/unit/dht/protocol/test_data_store.py +++ b/lbry/tests/unit/dht/protocol/test_data_store.py @@ -1,7 +1,7 @@ import asyncio from unittest import mock, TestCase from lbry.dht.protocol.data_store import DictDataStore -from lbry.dht.peer import PeerManager +from lbry.dht.peer import PeerManager, make_kademlia_peer class DataStoreTests(TestCase): @@ -13,7 +13,7 @@ class DataStoreTests(TestCase): def _test_add_peer_to_blob(self, blob=b'2' * 48, node_id=b'1' * 48, address='1.2.3.4', tcp_port=3333, udp_port=4444): - peer = self.peer_manager.get_kademlia_peer(node_id, address, udp_port) + peer = make_kademlia_peer(node_id, address, udp_port) peer.update_tcp_port(tcp_port) before = self.data_store.get_peers_for_blob(blob) self.data_store.add_peer_to_blob(peer, blob) diff --git a/lbry/tests/unit/dht/protocol/test_kbucket.py b/lbry/tests/unit/dht/protocol/test_kbucket.py index eff936b42..9a5ecc15d 100644 --- a/lbry/tests/unit/dht/protocol/test_kbucket.py +++ b/lbry/tests/unit/dht/protocol/test_kbucket.py @@ -2,7 +2,7 @@ import struct import asyncio from lbry.utils import generate_id from lbry.dht.protocol.routing_table import KBucket -from lbry.dht.peer import PeerManager, KademliaPeer +from lbry.dht.peer import PeerManager, make_kademlia_peer from lbry.dht import constants from torba.testcase import AsyncioTestCase @@ -29,8 +29,8 @@ class TestKBucket(AsyncioTestCase): self.kbucket = KBucket(self.peer_manager, 0, 2**constants.hash_bits, generate_id()) def test_add_peer(self): - peer = KademliaPeer(None, '1.2.3.4', constants.generate_id(2), udp_port=4444) - peer_update2 = KademliaPeer(None, '1.2.3.4', constants.generate_id(2), udp_port=4445) + peer = make_kademlia_peer(constants.generate_id(2), "1.2.3.4", udp_port=4444) + peer_update2 = make_kademlia_peer(constants.generate_id(2), "1.2.3.4", udp_port=4445) self.assertListEqual([], self.kbucket.peers) @@ -59,12 +59,12 @@ class TestKBucket(AsyncioTestCase): # Test if contacts can be added to empty list # Add k contacts to bucket for i in range(constants.k): - peer = self.peer_manager.get_kademlia_peer(generate_id(), next(self.address_generator), 4444) + peer = make_kademlia_peer(generate_id(), next(self.address_generator), 4444) self.assertTrue(self.kbucket.add_peer(peer)) self.assertEqual(peer, self.kbucket.peers[i]) # Test if contact is not added to full list - peer = self.peer_manager.get_kademlia_peer(generate_id(), next(self.address_generator), 4444) + peer = make_kademlia_peer(generate_id(), next(self.address_generator), 4444) self.assertFalse(self.kbucket.add_peer(peer)) # Test if an existing contact is updated correctly if added again @@ -125,13 +125,13 @@ class TestKBucket(AsyncioTestCase): def test_remove_peer(self): # try remove contact from empty list - peer = self.peer_manager.get_kademlia_peer(generate_id(), next(self.address_generator), 4444) + peer = make_kademlia_peer(generate_id(), next(self.address_generator), 4444) self.assertRaises(ValueError, self.kbucket.remove_peer, peer) added = [] # Add couple contacts for i in range(constants.k-2): - peer = self.peer_manager.get_kademlia_peer(generate_id(), next(self.address_generator), 4444) + peer = make_kademlia_peer(generate_id(), next(self.address_generator), 4444) self.assertTrue(self.kbucket.add_peer(peer)) added.append(peer) diff --git a/lbry/tests/unit/dht/protocol/test_protocol.py b/lbry/tests/unit/dht/protocol/test_protocol.py index 00c0078f4..407fc6c7d 100644 --- a/lbry/tests/unit/dht/protocol/test_protocol.py +++ b/lbry/tests/unit/dht/protocol/test_protocol.py @@ -5,7 +5,7 @@ from tests import dht_mocks from lbry.dht.serialization.bencoding import bencode, bdecode from lbry.dht import constants from lbry.dht.protocol.protocol import KademliaProtocol -from lbry.dht.peer import PeerManager +from lbry.dht.peer import PeerManager, make_kademlia_peer class TestProtocol(AsyncioTestCase): @@ -22,7 +22,7 @@ class TestProtocol(AsyncioTestCase): await loop.create_datagram_endpoint(lambda: peer1, ('1.2.3.4', 4444)) await loop.create_datagram_endpoint(lambda: peer2, ('1.2.3.5', 4444)) - peer = peer2.peer_manager.get_kademlia_peer(node_id1, '1.2.3.4', udp_port=4444) + peer = make_kademlia_peer(node_id1, '1.2.3.4', udp_port=4444) result = await peer2.get_rpc_peer(peer).ping() self.assertEqual(result, b'pong') peer1.stop() @@ -43,7 +43,7 @@ class TestProtocol(AsyncioTestCase): await loop.create_datagram_endpoint(lambda: peer1, ('1.2.3.4', 4444)) await loop.create_datagram_endpoint(lambda: peer2, ('1.2.3.5', 4444)) - peer = peer2.peer_manager.get_kademlia_peer(node_id1, '1.2.3.4', udp_port=4444) + peer = make_kademlia_peer(node_id1, '1.2.3.4', udp_port=4444) self.assertEqual(None, peer2.peer_manager.get_node_token(peer.node_id)) await peer2.get_rpc_peer(peer).find_value(b'1' * 48) self.assertNotEqual(None, peer2.peer_manager.get_node_token(peer.node_id)) @@ -65,12 +65,12 @@ class TestProtocol(AsyncioTestCase): await loop.create_datagram_endpoint(lambda: peer1, ('1.2.3.4', 4444)) await loop.create_datagram_endpoint(lambda: peer2, ('1.2.3.5', 4444)) - peer = peer2.peer_manager.get_kademlia_peer(node_id1, '1.2.3.4', udp_port=4444) - peer2_from_peer1 = peer1.peer_manager.get_kademlia_peer( + peer = make_kademlia_peer(node_id1, '1.2.3.4', udp_port=4444) + peer2_from_peer1 = make_kademlia_peer( peer2.node_id, peer2.external_ip, udp_port=peer2.udp_port ) peer2_from_peer1.update_tcp_port(3333) - peer3 = peer1.peer_manager.get_kademlia_peer( + peer3 = make_kademlia_peer( constants.generate_id(), '1.2.3.6', udp_port=4444 ) store_result = await peer2.store_to_peer(b'2' * 48, peer) @@ -103,7 +103,7 @@ class TestProtocol(AsyncioTestCase): ) await self.loop.create_datagram_endpoint(lambda: proto, (address, 4444)) proto.start() - return proto, other_peer.peer_manager.get_kademlia_peer(node_id, address, udp_port=udp_port) + return proto, make_kademlia_peer(node_id, address, udp_port=udp_port) async def test_add_peer_after_handle_request(self): with dht_mocks.mock_network_loop(self.loop): @@ -129,7 +129,7 @@ class TestProtocol(AsyncioTestCase): peer1.routing_table.remove_peer(peer_2_from_peer_1) # peers not known by be good/bad should be enqueued to maybe-ping - peer1_from_peer3 = peer3.get_rpc_peer(peer3.peer_manager.get_kademlia_peer(node_id1, '1.2.3.4', 4444)) + peer1_from_peer3 = peer3.get_rpc_peer(make_kademlia_peer(node_id1, '1.2.3.4', 4444)) self.assertEqual(0, len(peer1.ping_queue._pending_contacts)) pong = await peer1_from_peer3.ping() self.assertEqual(b'pong', pong) @@ -137,7 +137,7 @@ class TestProtocol(AsyncioTestCase): peer1.ping_queue._pending_contacts.clear() # peers who are already good should be added - peer1_from_peer4 = peer4.get_rpc_peer(peer4.peer_manager.get_kademlia_peer(node_id1, '1.2.3.4', 4444)) + peer1_from_peer4 = peer4.get_rpc_peer(make_kademlia_peer(node_id1, '1.2.3.4', 4444)) peer1.peer_manager.update_contact_triple(node_id4,'1.2.3.7', 4444) peer1.peer_manager.report_last_replied('1.2.3.7', 4444) self.assertEqual(0, len(peer1.ping_queue._pending_contacts)) @@ -149,7 +149,7 @@ class TestProtocol(AsyncioTestCase): peer1.routing_table.buckets[0].peers.clear() # peers who are known to be bad recently should not be added or maybe-pinged - peer1_from_peer4 = peer4.get_rpc_peer(peer4.peer_manager.get_kademlia_peer(node_id1, '1.2.3.4', 4444)) + peer1_from_peer4 = peer4.get_rpc_peer(make_kademlia_peer(node_id1, '1.2.3.4', 4444)) peer1.peer_manager.update_contact_triple(node_id4,'1.2.3.7', 4444) peer1.peer_manager.report_failure('1.2.3.7', 4444) peer1.peer_manager.report_failure('1.2.3.7', 4444) diff --git a/lbry/tests/unit/dht/protocol/test_routing_table.py b/lbry/tests/unit/dht/protocol/test_routing_table.py index d380fdc90..c2d017302 100644 --- a/lbry/tests/unit/dht/protocol/test_routing_table.py +++ b/lbry/tests/unit/dht/protocol/test_routing_table.py @@ -3,8 +3,7 @@ from torba.testcase import AsyncioTestCase from tests import dht_mocks from lbry.dht import constants from lbry.dht.node import Node -from lbry.dht.peer import PeerManager - +from lbry.dht.peer import PeerManager, make_kademlia_peer expected_ranges = [ ( @@ -29,6 +28,7 @@ expected_ranges = [ ) ] + class TestRouting(AsyncioTestCase): async def test_fill_one_bucket(self): loop = asyncio.get_event_loop() @@ -53,7 +53,7 @@ class TestRouting(AsyncioTestCase): for i in range(1, len(peer_addresses)): self.assertEqual(len(node_1.protocol.routing_table.get_peers()), contact_cnt) node = nodes[i] - peer = node_1.protocol.peer_manager.get_kademlia_peer( + peer = make_kademlia_peer( node.protocol.node_id, node.protocol.external_ip, udp_port=node.protocol.udp_port ) @@ -66,6 +66,16 @@ class TestRouting(AsyncioTestCase): for node in nodes.values(): node.protocol.stop() + async def test_cant_add_peer_without_a_node_id_gracefully(self): + loop = asyncio.get_event_loop() + node = Node(loop, PeerManager(loop), constants.generate_id(), 4444, 4444, 3333, '1.2.3.4') + bad_peer = make_kademlia_peer(None, '1.2.3.4', 5555) + with self.assertLogs(level='WARNING') as logged: + self.assertFalse(await node.protocol._add_peer(bad_peer)) + self.assertEqual(1, len(logged.output)) + self.assertTrue(logged.output[0].endswith('Tried adding a peer with no node id!')) + + async def test_split_buckets(self): loop = asyncio.get_event_loop() peer_addresses = [ @@ -81,7 +91,7 @@ class TestRouting(AsyncioTestCase): node_1 = nodes[0] for i in range(1, len(peer_addresses)): node = nodes[i] - peer = node_1.protocol.peer_manager.get_kademlia_peer( + peer = make_kademlia_peer( node.protocol.node_id, node.protocol.external_ip, udp_port=node.protocol.udp_port ) diff --git a/lbry/tests/unit/dht/test_blob_announcer.py b/lbry/tests/unit/dht/test_blob_announcer.py index b3bdaaa41..12fb0455f 100644 --- a/lbry/tests/unit/dht/test_blob_announcer.py +++ b/lbry/tests/unit/dht/test_blob_announcer.py @@ -8,7 +8,7 @@ from tests import dht_mocks from lbry.conf import Config from lbry.dht import constants from lbry.dht.node import Node -from lbry.dht.peer import PeerManager +from lbry.dht.peer import PeerManager, make_kademlia_peer from lbry.dht.blob_announcer import BlobAnnouncer from lbry.extras.daemon.storage import SQLiteStorage @@ -35,7 +35,7 @@ class TestBlobAnnouncer(AsyncioTestCase): self.nodes.update({len(self.nodes): n}) if add_to_routing_table: self.node.protocol.add_peer( - self.peer_manager.get_kademlia_peer( + make_kademlia_peer( n.protocol.node_id, n.protocol.external_ip, n.protocol.udp_port ) ) @@ -67,7 +67,7 @@ class TestBlobAnnouncer(AsyncioTestCase): await self.add_peer(node_id, address, False) last_node = self.nodes[len(self.nodes) - 1] peer = last_node.protocol.get_rpc_peer( - last_node.protocol.peer_manager.get_kademlia_peer( + make_kademlia_peer( previous_last_node.protocol.node_id, previous_last_node.protocol.external_ip, previous_last_node.protocol.udp_port ) @@ -129,12 +129,12 @@ class TestBlobAnnouncer(AsyncioTestCase): announced_to = self.nodes[0] for i in range(1, peer_count): node = self.nodes[i] - kad_peer = announced_to.protocol.peer_manager.get_kademlia_peer( + kad_peer = make_kademlia_peer( node.protocol.node_id, node.protocol.external_ip, node.protocol.udp_port ) await announced_to.protocol._add_peer(kad_peer) peer = node.protocol.get_rpc_peer( - node.protocol.peer_manager.get_kademlia_peer( + make_kademlia_peer( announced_to.protocol.node_id, announced_to.protocol.external_ip, announced_to.protocol.udp_port diff --git a/lbry/tests/unit/dht/test_node.py b/lbry/tests/unit/dht/test_node.py index 8828c63f0..03902b553 100644 --- a/lbry/tests/unit/dht/test_node.py +++ b/lbry/tests/unit/dht/test_node.py @@ -4,7 +4,7 @@ from torba.testcase import AsyncioTestCase from tests import dht_mocks from lbry.dht import constants from lbry.dht.node import Node -from lbry.dht.peer import PeerManager +from lbry.dht.peer import PeerManager, make_kademlia_peer class TestNodePingQueueDiscover(AsyncioTestCase): @@ -42,7 +42,7 @@ class TestNodePingQueueDiscover(AsyncioTestCase): for i in range(1, len(peer_addresses)): node = nodes[i] assert node.protocol.node_id != node_1.protocol.node_id - peer = node_1.protocol.peer_manager.get_kademlia_peer( + peer = make_kademlia_peer( node.protocol.node_id, node.protocol.external_ip, udp_port=node.protocol.udp_port ) futs.append(node_1.protocol.get_rpc_peer(peer).ping()) diff --git a/lbry/tests/unit/dht/test_peer.py b/lbry/tests/unit/dht/test_peer.py index 51a0c15b7..a82a1a484 100644 --- a/lbry/tests/unit/dht/test_peer.py +++ b/lbry/tests/unit/dht/test_peer.py @@ -1,7 +1,7 @@ import asyncio import unittest from lbry.utils import generate_id -from lbry.dht.peer import PeerManager +from lbry.dht.peer import PeerManager, make_kademlia_peer from torba.testcase import AsyncioTestCase @@ -10,20 +10,30 @@ class PeerTest(AsyncioTestCase): self.loop = asyncio.get_event_loop() self.peer_manager = PeerManager(self.loop) self.node_ids = [generate_id(), generate_id(), generate_id()] - self.first_contact = self.peer_manager.get_kademlia_peer(self.node_ids[1], '127.0.0.1', udp_port=1000) - self.second_contact = self.peer_manager.get_kademlia_peer(self.node_ids[0], '192.168.0.1', udp_port=1000) + self.first_contact = make_kademlia_peer(self.node_ids[1], '127.0.0.1', udp_port=1000) + self.second_contact = make_kademlia_peer(self.node_ids[0], '192.168.0.1', udp_port=1000) + + def test_peer_is_good_unknown_peer(self): + # Scenario: peer replied, but caller doesn't know the node_id. + # Outcome: We can't say it's good or bad. + # (yes, we COULD tell the node id, but not here. It would be + # a side effect and the caller is responsible to discover it) + peer = make_kademlia_peer(None, '1.2.3.4', 4444) + self.peer_manager.report_last_requested('1.2.3.4', 4444) + self.peer_manager.report_last_replied('1.2.3.4', 4444) + self.assertIsNone(self.peer_manager.peer_is_good(peer)) def test_make_contact_error_cases(self): - self.assertRaises(ValueError, self.peer_manager.get_kademlia_peer, self.node_ids[1], '192.168.1.20', 100000) - self.assertRaises(ValueError, self.peer_manager.get_kademlia_peer, self.node_ids[1], '192.168.1.20.1', 1000) - self.assertRaises(ValueError, self.peer_manager.get_kademlia_peer, self.node_ids[1], 'this is not an ip', 1000) - self.assertRaises(ValueError, self.peer_manager.get_kademlia_peer, self.node_ids[1], '192.168.1.20', -1000) - self.assertRaises(ValueError, self.peer_manager.get_kademlia_peer, b'not valid node id', '192.168.1.20', 1000) + self.assertRaises(ValueError, make_kademlia_peer, self.node_ids[1], '192.168.1.20', 100000) + self.assertRaises(ValueError, make_kademlia_peer, self.node_ids[1], '192.168.1.20.1', 1000) + self.assertRaises(ValueError, make_kademlia_peer, self.node_ids[1], 'this is not an ip', 1000) + self.assertRaises(ValueError, make_kademlia_peer, self.node_ids[1], '192.168.1.20', -1000) + self.assertRaises(ValueError, make_kademlia_peer, b'not valid node id', '192.168.1.20', 1000) def test_boolean(self): self.assertNotEqual(self.first_contact, self.second_contact) self.assertEqual( - self.second_contact, self.peer_manager.get_kademlia_peer(self.node_ids[0], '192.168.0.1', udp_port=1000) + self.second_contact, make_kademlia_peer(self.node_ids[0], '192.168.0.1', udp_port=1000) ) def test_compact_ip(self): diff --git a/lbry/tests/unit/stream/test_managed_stream.py b/lbry/tests/unit/stream/test_managed_stream.py index 8a881d8d3..fb58a2168 100644 --- a/lbry/tests/unit/stream/test_managed_stream.py +++ b/lbry/tests/unit/stream/test_managed_stream.py @@ -7,8 +7,7 @@ from lbry.blob.blob_file import MAX_BLOB_SIZE from lbry.blob_exchange.serialization import BlobResponse from lbry.blob_exchange.server import BlobServerProtocol from lbry.dht.node import Node -from lbry.dht.peer import KademliaPeer -from lbry.extras.daemon.storage import StoredStreamClaim +from lbry.dht.peer import make_kademlia_peer from lbry.stream.managed_stream import ManagedStream from lbry.stream.descriptor import StreamDescriptor from tests.unit.blob_exchange.test_transfer_blob import BlobExchangeTestBase @@ -95,7 +94,7 @@ class TestManagedStream(BlobExchangeTestBase): mock_node = mock.Mock(spec=Node) q = asyncio.Queue() - bad_peer = KademliaPeer(self.loop, "127.0.0.1", b'2' * 48, tcp_port=3334) + bad_peer = make_kademlia_peer(b'2' * 48, "127.0.0.1", tcp_port=3334) def _mock_accumulate_peers(q1, q2): async def _task():