diff --git a/lbry/lbry/dht/node.py b/lbry/lbry/dht/node.py index f53a9a1a0..b08aacb5f 100644 --- a/lbry/lbry/dht/node.py +++ b/lbry/lbry/dht/node.py @@ -4,12 +4,13 @@ import typing import binascii from lbry.utils import resolve_host from lbry.dht import constants +from lbry.dht.peer import get_kademlia_peer from lbry.dht.protocol.distance import Distance from lbry.dht.protocol.iterative_find import IterativeNodeFinder, IterativeValueFinder from lbry.dht.protocol.protocol import KademliaProtocol if typing.TYPE_CHECKING: - from lbry.dht.peer import PeerManager, get_kademlia_peer + from lbry.dht.peer import PeerManager from lbry.dht.peer import KademliaPeer log = logging.getLogger(__name__) diff --git a/lbry/lbry/dht/peer.py b/lbry/lbry/dht/peer.py index 24bcf24d7..cf443283d 100644 --- a/lbry/lbry/dht/peer.py +++ b/lbry/lbry/dht/peer.py @@ -3,23 +3,20 @@ import asyncio import logging import ipaddress from binascii import hexlify +from dataclasses import dataclass, field +from functools import lru_cache -import pylru from lbry.dht import constants from lbry.dht.serialization.datagram import make_compact_address, make_compact_ip, decode_compact_address log = logging.getLogger(__name__) -peer_cache = pylru.lrucache(size=512) +@lru_cache(1024) def get_kademlia_peer(node_id: typing.Optional[bytes], address: typing.Optional[str], udp_port: typing.Optional[int], tcp_port: typing.Optional[int] = None) -> 'KademliaPeer': - node = peer_cache.get(address, None) if address else None - if not node: - node = KademliaPeer(address, node_id, udp_port, tcp_port=tcp_port) - peer_cache.setdefault(address, node) - return node + return KademliaPeer(address, node_id, udp_port, tcp_port=tcp_port) def is_valid_ipv4(address): @@ -145,32 +142,24 @@ class PeerManager: return get_kademlia_peer(node_id, address, udp_port=None, tcp_port=tcp_port) +@dataclass(unsafe_hash=True) class KademliaPeer: - __slots__ = [ - 'loop', - '_node_id', - 'address', - 'udp_port', - 'tcp_port', - 'protocol_version', - ] + address: str = field(hash=True) + _node_id: typing.Optional[bytes] = field(hash=True) + udp_port: typing.Optional[int] = field(hash=True) + tcp_port: typing.Optional[int] = field(compare=False) + protocol_version: typing.Optional[int] = field(default=1, compare=False) - def __init__(self, address: str, node_id: typing.Optional[bytes] = None, - udp_port: typing.Optional[int] = None, tcp_port: typing.Optional[int] = None): - if node_id is not None: - if not len(node_id) == constants.hash_length: - raise ValueError("invalid node_id: {}".format(hexlify(node_id).decode())) - if udp_port is not None and not 0 <= udp_port <= 65536: + def __post_init__(self): + if self._node_id is not None: + if not len(self._node_id) == constants.hash_length: + raise ValueError("invalid node_id: {}".format(hexlify(self._node_id).decode())) + if self.udp_port is not None and not 0 <= self.udp_port <= 65536: raise ValueError("invalid udp port") - if tcp_port and not 0 <= tcp_port <= 65536: + if self.tcp_port and not 0 <= self.tcp_port <= 65536: raise ValueError("invalid tcp port") - if not is_valid_ipv4(address): + if not is_valid_ipv4(self.address): raise ValueError("invalid ip address") - self._node_id = node_id - self.address = address - self.udp_port = udp_port - self.tcp_port = tcp_port - self.protocol_version = 1 def update_tcp_port(self, tcp_port: int): self.tcp_port = tcp_port @@ -194,11 +183,3 @@ class KademliaPeer: def compact_ip(self): return make_compact_ip(self.address) - - def __eq__(self, other): - if not isinstance(other, KademliaPeer): - raise TypeError("invalid type to compare with Peer: %s" % str(type(other))) - return (self.node_id, self.address, self.udp_port) == (other.node_id, other.address, other.udp_port) - - def __hash__(self): - return hash((self.node_id, self.address, self.udp_port)) diff --git a/lbry/lbry/dht/protocol/iterative_find.py b/lbry/lbry/dht/protocol/iterative_find.py index 8f0e26437..0520e1bec 100644 --- a/lbry/lbry/dht/protocol/iterative_find.py +++ b/lbry/lbry/dht/protocol/iterative_find.py @@ -7,6 +7,7 @@ import logging from lbry.dht import constants from lbry.dht.error import RemoteException, TransportNotConnected from lbry.dht.protocol.distance import Distance +from lbry.dht.peer import get_kademlia_peer from typing import TYPE_CHECKING if TYPE_CHECKING: @@ -150,7 +151,7 @@ class IterativeFinder: self._add_active(peer) for contact_triple in response.get_close_triples(): node_id, address, udp_port = contact_triple - self._add_active(self.peer_manager.get_kademlia_peer(node_id, address, udp_port)) + self._add_active(get_kademlia_peer(node_id, address, udp_port)) self.check_result_ready(response) async def _send_probe(self, peer: 'KademliaPeer'): diff --git a/lbry/lbry/dht/protocol/protocol.py b/lbry/lbry/dht/protocol/protocol.py index 68940f35b..d83001acb 100644 --- a/lbry/lbry/dht/protocol/protocol.py +++ b/lbry/lbry/dht/protocol/protocol.py @@ -15,6 +15,7 @@ from lbry.dht.serialization.datagram import RESPONSE_TYPE, ERROR_TYPE, PAGE_KEY from lbry.dht.error import RemoteException, TransportNotConnected from lbry.dht.protocol.routing_table import TreeRoutingTable from lbry.dht.protocol.data_store import DictDataStore +from lbry.dht.peer import get_kademlia_peer if typing.TYPE_CHECKING: from lbry.dht.peer import PeerManager, KademliaPeer @@ -447,7 +448,7 @@ class KademliaProtocol(DatagramProtocol): try: peer = self.routing_table.get_peer(request_datagram.node_id) except IndexError: - peer = self.peer_manager.get_kademlia_peer(request_datagram.node_id, address[0], address[1]) + peer = get_kademlia_peer(request_datagram.node_id, address[0], address[1]) try: self._handle_rpc(peer, request_datagram) # if the contact is not known to be bad (yet) and we haven't yet queried it, send it a ping so that it diff --git a/lbry/lbry/extras/daemon/Daemon.py b/lbry/lbry/extras/daemon/Daemon.py index 1399683e0..96b3f0fbb 100644 --- a/lbry/lbry/extras/daemon/Daemon.py +++ b/lbry/lbry/extras/daemon/Daemon.py @@ -23,6 +23,7 @@ from lbry import utils from lbry.conf import Config, Setting from lbry.blob.blob_file import is_valid_blobhash, BlobBuffer from lbry.blob_exchange.downloader import download_blob +from lbry.dht.peer import get_kademlia_peer from lbry.error import DownloadSDTimeout, ComponentsNotStarted from lbry.error import NullFundsError, NegativeFundsError, ComponentStartConditionNotMet from lbry.extras import system_info @@ -3655,8 +3656,7 @@ class Daemon(metaclass=JSONRPCServerType): """ peer = None if node_id and address and port: - peer = self.component_manager.peer_manager.get_kademlia_peer(unhexlify(node_id), address, - udp_port=int(port)) + peer = get_kademlia_peer(unhexlify(node_id), address, udp_port=int(port)) try: return await self.dht_node.protocol.get_rpc_peer(peer).ping() except asyncio.TimeoutError: diff --git a/lbry/tests/unit/dht/protocol/test_data_store.py b/lbry/tests/unit/dht/protocol/test_data_store.py index 97d61e7ec..2fb9ef37d 100644 --- a/lbry/tests/unit/dht/protocol/test_data_store.py +++ b/lbry/tests/unit/dht/protocol/test_data_store.py @@ -1,7 +1,7 @@ import asyncio from unittest import mock, TestCase from lbry.dht.protocol.data_store import DictDataStore -from lbry.dht.peer import PeerManager +from lbry.dht.peer import PeerManager, get_kademlia_peer class DataStoreTests(TestCase): @@ -13,7 +13,7 @@ class DataStoreTests(TestCase): def _test_add_peer_to_blob(self, blob=b'2' * 48, node_id=b'1' * 48, address='1.2.3.4', tcp_port=3333, udp_port=4444): - peer = self.peer_manager.get_kademlia_peer(node_id, address, udp_port) + peer = get_kademlia_peer(node_id, address, udp_port) peer.update_tcp_port(tcp_port) before = self.data_store.get_peers_for_blob(blob) self.data_store.add_peer_to_blob(peer, blob) diff --git a/lbry/tests/unit/dht/protocol/test_kbucket.py b/lbry/tests/unit/dht/protocol/test_kbucket.py index f34c7c10f..1fce59a05 100644 --- a/lbry/tests/unit/dht/protocol/test_kbucket.py +++ b/lbry/tests/unit/dht/protocol/test_kbucket.py @@ -2,7 +2,7 @@ import struct import asyncio from lbry.utils import generate_id from lbry.dht.protocol.routing_table import KBucket -from lbry.dht.peer import PeerManager, KademliaPeer, get_kademlia_peer +from lbry.dht.peer import PeerManager, get_kademlia_peer from lbry.dht import constants from torba.testcase import AsyncioTestCase @@ -59,12 +59,12 @@ class TestKBucket(AsyncioTestCase): # Test if contacts can be added to empty list # Add k contacts to bucket for i in range(constants.k): - peer = self.peer_manager.get_kademlia_peer(generate_id(), next(self.address_generator), 4444) + peer = get_kademlia_peer(generate_id(), next(self.address_generator), 4444) self.assertTrue(self.kbucket.add_peer(peer)) self.assertEqual(peer, self.kbucket.peers[i]) # Test if contact is not added to full list - peer = self.peer_manager.get_kademlia_peer(generate_id(), next(self.address_generator), 4444) + peer = get_kademlia_peer(generate_id(), next(self.address_generator), 4444) self.assertFalse(self.kbucket.add_peer(peer)) # Test if an existing contact is updated correctly if added again @@ -125,13 +125,13 @@ class TestKBucket(AsyncioTestCase): def test_remove_peer(self): # try remove contact from empty list - peer = self.peer_manager.get_kademlia_peer(generate_id(), next(self.address_generator), 4444) + peer = get_kademlia_peer(generate_id(), next(self.address_generator), 4444) self.assertRaises(ValueError, self.kbucket.remove_peer, peer) added = [] # Add couple contacts for i in range(constants.k-2): - peer = self.peer_manager.get_kademlia_peer(generate_id(), next(self.address_generator), 4444) + peer = get_kademlia_peer(generate_id(), next(self.address_generator), 4444) self.assertTrue(self.kbucket.add_peer(peer)) added.append(peer) diff --git a/lbry/tests/unit/dht/protocol/test_protocol.py b/lbry/tests/unit/dht/protocol/test_protocol.py index 00c0078f4..f2567d3e2 100644 --- a/lbry/tests/unit/dht/protocol/test_protocol.py +++ b/lbry/tests/unit/dht/protocol/test_protocol.py @@ -5,7 +5,7 @@ from tests import dht_mocks from lbry.dht.serialization.bencoding import bencode, bdecode from lbry.dht import constants from lbry.dht.protocol.protocol import KademliaProtocol -from lbry.dht.peer import PeerManager +from lbry.dht.peer import PeerManager, get_kademlia_peer class TestProtocol(AsyncioTestCase): @@ -22,7 +22,7 @@ class TestProtocol(AsyncioTestCase): await loop.create_datagram_endpoint(lambda: peer1, ('1.2.3.4', 4444)) await loop.create_datagram_endpoint(lambda: peer2, ('1.2.3.5', 4444)) - peer = peer2.peer_manager.get_kademlia_peer(node_id1, '1.2.3.4', udp_port=4444) + peer = get_kademlia_peer(node_id1, '1.2.3.4', udp_port=4444) result = await peer2.get_rpc_peer(peer).ping() self.assertEqual(result, b'pong') peer1.stop() @@ -43,7 +43,7 @@ class TestProtocol(AsyncioTestCase): await loop.create_datagram_endpoint(lambda: peer1, ('1.2.3.4', 4444)) await loop.create_datagram_endpoint(lambda: peer2, ('1.2.3.5', 4444)) - peer = peer2.peer_manager.get_kademlia_peer(node_id1, '1.2.3.4', udp_port=4444) + peer = get_kademlia_peer(node_id1, '1.2.3.4', udp_port=4444) self.assertEqual(None, peer2.peer_manager.get_node_token(peer.node_id)) await peer2.get_rpc_peer(peer).find_value(b'1' * 48) self.assertNotEqual(None, peer2.peer_manager.get_node_token(peer.node_id)) @@ -65,12 +65,12 @@ class TestProtocol(AsyncioTestCase): await loop.create_datagram_endpoint(lambda: peer1, ('1.2.3.4', 4444)) await loop.create_datagram_endpoint(lambda: peer2, ('1.2.3.5', 4444)) - peer = peer2.peer_manager.get_kademlia_peer(node_id1, '1.2.3.4', udp_port=4444) - peer2_from_peer1 = peer1.peer_manager.get_kademlia_peer( + peer = get_kademlia_peer(node_id1, '1.2.3.4', udp_port=4444) + peer2_from_peer1 = get_kademlia_peer( peer2.node_id, peer2.external_ip, udp_port=peer2.udp_port ) peer2_from_peer1.update_tcp_port(3333) - peer3 = peer1.peer_manager.get_kademlia_peer( + peer3 = get_kademlia_peer( constants.generate_id(), '1.2.3.6', udp_port=4444 ) store_result = await peer2.store_to_peer(b'2' * 48, peer) @@ -103,7 +103,7 @@ class TestProtocol(AsyncioTestCase): ) await self.loop.create_datagram_endpoint(lambda: proto, (address, 4444)) proto.start() - return proto, other_peer.peer_manager.get_kademlia_peer(node_id, address, udp_port=udp_port) + return proto, get_kademlia_peer(node_id, address, udp_port=udp_port) async def test_add_peer_after_handle_request(self): with dht_mocks.mock_network_loop(self.loop): @@ -129,7 +129,7 @@ class TestProtocol(AsyncioTestCase): peer1.routing_table.remove_peer(peer_2_from_peer_1) # peers not known by be good/bad should be enqueued to maybe-ping - peer1_from_peer3 = peer3.get_rpc_peer(peer3.peer_manager.get_kademlia_peer(node_id1, '1.2.3.4', 4444)) + peer1_from_peer3 = peer3.get_rpc_peer(get_kademlia_peer(node_id1, '1.2.3.4', 4444)) self.assertEqual(0, len(peer1.ping_queue._pending_contacts)) pong = await peer1_from_peer3.ping() self.assertEqual(b'pong', pong) @@ -137,7 +137,7 @@ class TestProtocol(AsyncioTestCase): peer1.ping_queue._pending_contacts.clear() # peers who are already good should be added - peer1_from_peer4 = peer4.get_rpc_peer(peer4.peer_manager.get_kademlia_peer(node_id1, '1.2.3.4', 4444)) + peer1_from_peer4 = peer4.get_rpc_peer(get_kademlia_peer(node_id1, '1.2.3.4', 4444)) peer1.peer_manager.update_contact_triple(node_id4,'1.2.3.7', 4444) peer1.peer_manager.report_last_replied('1.2.3.7', 4444) self.assertEqual(0, len(peer1.ping_queue._pending_contacts)) @@ -149,7 +149,7 @@ class TestProtocol(AsyncioTestCase): peer1.routing_table.buckets[0].peers.clear() # peers who are known to be bad recently should not be added or maybe-pinged - peer1_from_peer4 = peer4.get_rpc_peer(peer4.peer_manager.get_kademlia_peer(node_id1, '1.2.3.4', 4444)) + peer1_from_peer4 = peer4.get_rpc_peer(get_kademlia_peer(node_id1, '1.2.3.4', 4444)) peer1.peer_manager.update_contact_triple(node_id4,'1.2.3.7', 4444) peer1.peer_manager.report_failure('1.2.3.7', 4444) peer1.peer_manager.report_failure('1.2.3.7', 4444) diff --git a/lbry/tests/unit/dht/protocol/test_routing_table.py b/lbry/tests/unit/dht/protocol/test_routing_table.py index d380fdc90..236ecec24 100644 --- a/lbry/tests/unit/dht/protocol/test_routing_table.py +++ b/lbry/tests/unit/dht/protocol/test_routing_table.py @@ -3,8 +3,7 @@ from torba.testcase import AsyncioTestCase from tests import dht_mocks from lbry.dht import constants from lbry.dht.node import Node -from lbry.dht.peer import PeerManager - +from lbry.dht.peer import PeerManager, get_kademlia_peer expected_ranges = [ ( @@ -53,7 +52,7 @@ class TestRouting(AsyncioTestCase): for i in range(1, len(peer_addresses)): self.assertEqual(len(node_1.protocol.routing_table.get_peers()), contact_cnt) node = nodes[i] - peer = node_1.protocol.peer_manager.get_kademlia_peer( + peer = get_kademlia_peer( node.protocol.node_id, node.protocol.external_ip, udp_port=node.protocol.udp_port ) @@ -81,7 +80,7 @@ class TestRouting(AsyncioTestCase): node_1 = nodes[0] for i in range(1, len(peer_addresses)): node = nodes[i] - peer = node_1.protocol.peer_manager.get_kademlia_peer( + peer = get_kademlia_peer( node.protocol.node_id, node.protocol.external_ip, udp_port=node.protocol.udp_port ) diff --git a/lbry/tests/unit/dht/test_blob_announcer.py b/lbry/tests/unit/dht/test_blob_announcer.py index b3bdaaa41..521551168 100644 --- a/lbry/tests/unit/dht/test_blob_announcer.py +++ b/lbry/tests/unit/dht/test_blob_announcer.py @@ -8,7 +8,7 @@ from tests import dht_mocks from lbry.conf import Config from lbry.dht import constants from lbry.dht.node import Node -from lbry.dht.peer import PeerManager +from lbry.dht.peer import PeerManager, get_kademlia_peer from lbry.dht.blob_announcer import BlobAnnouncer from lbry.extras.daemon.storage import SQLiteStorage @@ -35,7 +35,7 @@ class TestBlobAnnouncer(AsyncioTestCase): self.nodes.update({len(self.nodes): n}) if add_to_routing_table: self.node.protocol.add_peer( - self.peer_manager.get_kademlia_peer( + get_kademlia_peer( n.protocol.node_id, n.protocol.external_ip, n.protocol.udp_port ) ) @@ -67,7 +67,7 @@ class TestBlobAnnouncer(AsyncioTestCase): await self.add_peer(node_id, address, False) last_node = self.nodes[len(self.nodes) - 1] peer = last_node.protocol.get_rpc_peer( - last_node.protocol.peer_manager.get_kademlia_peer( + get_kademlia_peer( previous_last_node.protocol.node_id, previous_last_node.protocol.external_ip, previous_last_node.protocol.udp_port ) @@ -129,12 +129,12 @@ class TestBlobAnnouncer(AsyncioTestCase): announced_to = self.nodes[0] for i in range(1, peer_count): node = self.nodes[i] - kad_peer = announced_to.protocol.peer_manager.get_kademlia_peer( + kad_peer = get_kademlia_peer( node.protocol.node_id, node.protocol.external_ip, node.protocol.udp_port ) await announced_to.protocol._add_peer(kad_peer) peer = node.protocol.get_rpc_peer( - node.protocol.peer_manager.get_kademlia_peer( + get_kademlia_peer( announced_to.protocol.node_id, announced_to.protocol.external_ip, announced_to.protocol.udp_port diff --git a/lbry/tests/unit/dht/test_node.py b/lbry/tests/unit/dht/test_node.py index 8828c63f0..ae44d57b5 100644 --- a/lbry/tests/unit/dht/test_node.py +++ b/lbry/tests/unit/dht/test_node.py @@ -4,7 +4,7 @@ from torba.testcase import AsyncioTestCase from tests import dht_mocks from lbry.dht import constants from lbry.dht.node import Node -from lbry.dht.peer import PeerManager +from lbry.dht.peer import PeerManager, get_kademlia_peer class TestNodePingQueueDiscover(AsyncioTestCase): @@ -42,7 +42,7 @@ class TestNodePingQueueDiscover(AsyncioTestCase): for i in range(1, len(peer_addresses)): node = nodes[i] assert node.protocol.node_id != node_1.protocol.node_id - peer = node_1.protocol.peer_manager.get_kademlia_peer( + peer = get_kademlia_peer( node.protocol.node_id, node.protocol.external_ip, udp_port=node.protocol.udp_port ) futs.append(node_1.protocol.get_rpc_peer(peer).ping()) diff --git a/lbry/tests/unit/dht/test_peer.py b/lbry/tests/unit/dht/test_peer.py index 51a0c15b7..e9ffe3352 100644 --- a/lbry/tests/unit/dht/test_peer.py +++ b/lbry/tests/unit/dht/test_peer.py @@ -1,7 +1,7 @@ import asyncio import unittest from lbry.utils import generate_id -from lbry.dht.peer import PeerManager +from lbry.dht.peer import PeerManager, get_kademlia_peer from torba.testcase import AsyncioTestCase @@ -10,20 +10,20 @@ class PeerTest(AsyncioTestCase): self.loop = asyncio.get_event_loop() self.peer_manager = PeerManager(self.loop) self.node_ids = [generate_id(), generate_id(), generate_id()] - self.first_contact = self.peer_manager.get_kademlia_peer(self.node_ids[1], '127.0.0.1', udp_port=1000) - self.second_contact = self.peer_manager.get_kademlia_peer(self.node_ids[0], '192.168.0.1', udp_port=1000) + self.first_contact = get_kademlia_peer(self.node_ids[1], '127.0.0.1', udp_port=1000) + self.second_contact = get_kademlia_peer(self.node_ids[0], '192.168.0.1', udp_port=1000) def test_make_contact_error_cases(self): - self.assertRaises(ValueError, self.peer_manager.get_kademlia_peer, self.node_ids[1], '192.168.1.20', 100000) - self.assertRaises(ValueError, self.peer_manager.get_kademlia_peer, self.node_ids[1], '192.168.1.20.1', 1000) - self.assertRaises(ValueError, self.peer_manager.get_kademlia_peer, self.node_ids[1], 'this is not an ip', 1000) - self.assertRaises(ValueError, self.peer_manager.get_kademlia_peer, self.node_ids[1], '192.168.1.20', -1000) - self.assertRaises(ValueError, self.peer_manager.get_kademlia_peer, b'not valid node id', '192.168.1.20', 1000) + self.assertRaises(ValueError, get_kademlia_peer, self.node_ids[1], '192.168.1.20', 100000) + self.assertRaises(ValueError, get_kademlia_peer, self.node_ids[1], '192.168.1.20.1', 1000) + self.assertRaises(ValueError, get_kademlia_peer, self.node_ids[1], 'this is not an ip', 1000) + self.assertRaises(ValueError, get_kademlia_peer, self.node_ids[1], '192.168.1.20', -1000) + self.assertRaises(ValueError, get_kademlia_peer, b'not valid node id', '192.168.1.20', 1000) def test_boolean(self): self.assertNotEqual(self.first_contact, self.second_contact) self.assertEqual( - self.second_contact, self.peer_manager.get_kademlia_peer(self.node_ids[0], '192.168.0.1', udp_port=1000) + self.second_contact, get_kademlia_peer(self.node_ids[0], '192.168.0.1', udp_port=1000) ) def test_compact_ip(self):