diff --git a/lbry/lbry/dht/node.py b/lbry/lbry/dht/node.py index 83dd08704..f53a9a1a0 100644 --- a/lbry/lbry/dht/node.py +++ b/lbry/lbry/dht/node.py @@ -7,10 +7,10 @@ from lbry.dht import constants from lbry.dht.protocol.distance import Distance from lbry.dht.protocol.iterative_find import IterativeNodeFinder, IterativeValueFinder from lbry.dht.protocol.protocol import KademliaProtocol -from lbry.dht.peer import KademliaPeer if typing.TYPE_CHECKING: - from lbry.dht.peer import PeerManager + from lbry.dht.peer import PeerManager, get_kademlia_peer + from lbry.dht.peer import KademliaPeer log = logging.getLogger(__name__) @@ -141,7 +141,7 @@ class Node: if known_node_addresses: peers = [ - KademliaPeer(self.loop, address, udp_port=port) + get_kademlia_peer(None, address, port) for (address, port) in known_node_addresses ] while True: diff --git a/lbry/lbry/dht/peer.py b/lbry/lbry/dht/peer.py index 3bc2f7e75..24bcf24d7 100644 --- a/lbry/lbry/dht/peer.py +++ b/lbry/lbry/dht/peer.py @@ -3,12 +3,23 @@ import asyncio import logging import ipaddress from binascii import hexlify -from functools import lru_cache +import pylru from lbry.dht import constants from lbry.dht.serialization.datagram import make_compact_address, make_compact_ip, decode_compact_address log = logging.getLogger(__name__) +peer_cache = pylru.lrucache(size=512) + + +def get_kademlia_peer(node_id: typing.Optional[bytes], address: typing.Optional[str], + udp_port: typing.Optional[int], + tcp_port: typing.Optional[int] = None) -> 'KademliaPeer': + node = peer_cache.get(address, None) if address else None + if not node: + node = KademliaPeer(address, node_id, udp_port, tcp_port=tcp_port) + peer_cache.setdefault(address, node) + return node def is_valid_ipv4(address): @@ -81,10 +92,6 @@ class PeerManager: self._node_id_mapping[(address, udp_port)] = node_id self._node_id_reverse_mapping[node_id] = (address, udp_port) - @lru_cache(maxsize=400) - def get_kademlia_peer(self, node_id: bytes, address: str, udp_port: int) -> 'KademliaPeer': - return KademliaPeer(self._loop, address, node_id, udp_port) - def prune(self): # TODO: periodically call this now = self._loop.time() to_pop = [] @@ -135,7 +142,7 @@ class PeerManager: def decode_tcp_peer_from_compact_address(self, compact_address: bytes) -> 'KademliaPeer': node_id, address, tcp_port = decode_compact_address(compact_address) - return KademliaPeer(self._loop, address, node_id, tcp_port=tcp_port) + return get_kademlia_peer(node_id, address, udp_port=None, tcp_port=tcp_port) class KademliaPeer: @@ -148,7 +155,7 @@ class KademliaPeer: 'protocol_version', ] - def __init__(self, loop: asyncio.AbstractEventLoop, address: str, node_id: typing.Optional[bytes] = None, + def __init__(self, address: str, node_id: typing.Optional[bytes] = None, udp_port: typing.Optional[int] = None, tcp_port: typing.Optional[int] = None): if node_id is not None: if not len(node_id) == constants.hash_length: @@ -159,7 +166,6 @@ class KademliaPeer: raise ValueError("invalid tcp port") if not is_valid_ipv4(address): raise ValueError("invalid ip address") - self.loop = loop self._node_id = node_id self.address = address self.udp_port = udp_port diff --git a/lbry/lbry/stream/downloader.py b/lbry/lbry/stream/downloader.py index 17363b37e..f01a98d1d 100644 --- a/lbry/lbry/stream/downloader.py +++ b/lbry/lbry/stream/downloader.py @@ -2,11 +2,12 @@ import asyncio import typing import logging import binascii + +from lbry.dht.peer import get_kademlia_peer from lbry.error import DownloadSDTimeout from lbry.utils import resolve_host, lru_cache_concurrent from lbry.stream.descriptor import StreamDescriptor from lbry.blob_exchange.downloader import BlobDownloader -from lbry.dht.peer import KademliaPeer if typing.TYPE_CHECKING: from lbry.conf import Config from lbry.dht.node import Node @@ -50,7 +51,7 @@ class StreamDownloader: def _delayed_add_fixed_peers(): self.added_fixed_peers = True self.peer_queue.put_nowait([ - KademliaPeer(self.loop, address=address, tcp_port=port + 1) + get_kademlia_peer(None, address, None, tcp_port=port + 1) for address, port in addresses ]) diff --git a/lbry/tests/integration/test_dht.py b/lbry/tests/integration/test_dht.py index bdf4249db..2258babe2 100644 --- a/lbry/tests/integration/test_dht.py +++ b/lbry/tests/integration/test_dht.py @@ -3,7 +3,7 @@ from binascii import hexlify from lbry.dht import constants from lbry.dht.node import Node -from lbry.dht.peer import PeerManager, KademliaPeer +from lbry.dht.peer import PeerManager, get_kademlia_peer from torba.testcase import AsyncioTestCase @@ -39,7 +39,7 @@ class DHTIntegrationTest(AsyncioTestCase): bad_peers = [] for candidate in self.nodes[1:10]: address, port, node_id = candidate.protocol.external_ip, candidate.protocol.udp_port, candidate.protocol.node_id - peer = KademliaPeer(self.loop, address, node_id, port) + peer = get_kademlia_peer(node_id, address, udp_port=port) bad_peers.append(peer) node.protocol.add_peer(peer) candidate.stop() @@ -102,7 +102,7 @@ class DHTIntegrationTest(AsyncioTestCase): node2.stop() # forcefully make it a bad peer but don't remove it from routing table address, port, node_id = node2.protocol.external_ip, node2.protocol.udp_port, node2.protocol.node_id - peer = KademliaPeer(self.loop, address, node_id, port) + peer = get_kademlia_peer(node_id, address, udp_port=port) self.assertTrue(node1.protocol.peer_manager.peer_is_good(peer)) node1.protocol.peer_manager.report_failure(node2.protocol.external_ip, node2.protocol.udp_port) node1.protocol.peer_manager.report_failure(node2.protocol.external_ip, node2.protocol.udp_port) diff --git a/lbry/tests/unit/blob_exchange/test_transfer_blob.py b/lbry/tests/unit/blob_exchange/test_transfer_blob.py index 5c9a0c587..932f5056d 100644 --- a/lbry/tests/unit/blob_exchange/test_transfer_blob.py +++ b/lbry/tests/unit/blob_exchange/test_transfer_blob.py @@ -12,7 +12,8 @@ from lbry.extras.daemon.storage import SQLiteStorage from lbry.blob.blob_manager import BlobManager from lbry.blob_exchange.server import BlobServer, BlobServerProtocol from lbry.blob_exchange.client import request_blob -from lbry.dht.peer import KademliaPeer, PeerManager +from lbry.dht.peer import PeerManager, get_kademlia_peer + # import logging # logging.getLogger("lbry").setLevel(logging.DEBUG) @@ -43,7 +44,7 @@ class BlobExchangeTestBase(AsyncioTestCase): self.client_storage = SQLiteStorage(self.client_config, os.path.join(self.client_dir, "lbrynet.sqlite")) self.client_blob_manager = BlobManager(self.loop, self.client_dir, self.client_storage, self.client_config) self.client_peer_manager = PeerManager(self.loop) - self.server_from_client = KademliaPeer(self.loop, "127.0.0.1", b'1' * 48, tcp_port=33333) + self.server_from_client = get_kademlia_peer(b'1' * 48, "127.0.0.1", udp_port=33333) await self.client_storage.open() await self.server_storage.open() @@ -102,7 +103,7 @@ class TestBlobExchange(BlobExchangeTestBase): second_client_blob_manager = BlobManager( self.loop, second_client_dir, second_client_storage, second_client_conf ) - server_from_second_client = KademliaPeer(self.loop, "127.0.0.1", b'1' * 48, tcp_port=33333) + server_from_second_client = get_kademlia_peer(b'1' * 48, "127.0.0.1", udp_port=33333) await second_client_storage.open() await second_client_blob_manager.setup() @@ -193,7 +194,7 @@ class TestBlobExchange(BlobExchangeTestBase): second_client_blob_manager = BlobManager( self.loop, second_client_dir, second_client_storage, second_client_conf ) - server_from_second_client = KademliaPeer(self.loop, "127.0.0.1", b'1' * 48, tcp_port=33333) + server_from_second_client = get_kademlia_peer(b'1' * 48, "127.0.0.1", udp_port=33333) await second_client_storage.open() await second_client_blob_manager.setup() diff --git a/lbry/tests/unit/dht/protocol/test_kbucket.py b/lbry/tests/unit/dht/protocol/test_kbucket.py index eff936b42..f34c7c10f 100644 --- a/lbry/tests/unit/dht/protocol/test_kbucket.py +++ b/lbry/tests/unit/dht/protocol/test_kbucket.py @@ -2,7 +2,7 @@ import struct import asyncio from lbry.utils import generate_id from lbry.dht.protocol.routing_table import KBucket -from lbry.dht.peer import PeerManager, KademliaPeer +from lbry.dht.peer import PeerManager, KademliaPeer, get_kademlia_peer from lbry.dht import constants from torba.testcase import AsyncioTestCase @@ -29,8 +29,8 @@ class TestKBucket(AsyncioTestCase): self.kbucket = KBucket(self.peer_manager, 0, 2**constants.hash_bits, generate_id()) def test_add_peer(self): - peer = KademliaPeer(None, '1.2.3.4', constants.generate_id(2), udp_port=4444) - peer_update2 = KademliaPeer(None, '1.2.3.4', constants.generate_id(2), udp_port=4445) + peer = get_kademlia_peer(constants.generate_id(2), "1.2.3.4", udp_port=4444) + peer_update2 = get_kademlia_peer(constants.generate_id(2), "1.2.3.4", udp_port=4445) self.assertListEqual([], self.kbucket.peers) diff --git a/lbry/tests/unit/stream/test_managed_stream.py b/lbry/tests/unit/stream/test_managed_stream.py index 8a881d8d3..f475f1804 100644 --- a/lbry/tests/unit/stream/test_managed_stream.py +++ b/lbry/tests/unit/stream/test_managed_stream.py @@ -7,8 +7,7 @@ from lbry.blob.blob_file import MAX_BLOB_SIZE from lbry.blob_exchange.serialization import BlobResponse from lbry.blob_exchange.server import BlobServerProtocol from lbry.dht.node import Node -from lbry.dht.peer import KademliaPeer -from lbry.extras.daemon.storage import StoredStreamClaim +from lbry.dht.peer import get_kademlia_peer from lbry.stream.managed_stream import ManagedStream from lbry.stream.descriptor import StreamDescriptor from tests.unit.blob_exchange.test_transfer_blob import BlobExchangeTestBase @@ -95,7 +94,7 @@ class TestManagedStream(BlobExchangeTestBase): mock_node = mock.Mock(spec=Node) q = asyncio.Queue() - bad_peer = KademliaPeer(self.loop, "127.0.0.1", b'2' * 48, tcp_port=3334) + bad_peer = get_kademlia_peer(b'2' * 48, "127.0.0.1", tcp_port=3334) def _mock_accumulate_peers(q1, q2): async def _task():