diff --git a/lbry/stream/downloader.py b/lbry/stream/downloader.py index 0d9684443..1f78979b7 100644 --- a/lbry/stream/downloader.py +++ b/lbry/stream/downloader.py @@ -1,5 +1,4 @@ import asyncio -import ipaddress import typing import logging import binascii @@ -9,10 +8,9 @@ from lbry.error import DownloadSDTimeoutError from lbry.utils import lru_cache_concurrent from lbry.stream.descriptor import StreamDescriptor from lbry.blob_exchange.downloader import BlobDownloader -from lbry.torrent.tracker import subscribe_hash +from lbry.torrent.tracker import enqueue_tracker_search if typing.TYPE_CHECKING: - from lbry.torrent.tracker import AnnounceResponse from lbry.conf import Config from lbry.dht.node import Node from lbry.blob.blob_manager import BlobManager @@ -66,13 +64,6 @@ class StreamDownloader: fixed_peers = await get_kademlia_peers_from_hosts(self.config.fixed_peers) self.fixed_peers_handle = self.loop.call_later(self.fixed_peers_delay, _add_fixed_peers, fixed_peers) - async def _process_announcement(self, announcement: 'AnnounceResponse'): - peers = await get_kademlia_peers_from_hosts([ - (str(ipaddress.ip_address(peer.address)), peer.port) for peer in announcement.peers if peer.port > 1024 - ]) - log.info("Found %d peers from tracker for %s", len(peers), self.sd_hash[:8]) - self.peer_queue.put_nowait(peers) - async def load_descriptor(self, connection_id: int = 0): # download or get the sd blob sd_blob = self.blob_manager.get_blob(self.sd_hash) @@ -102,8 +93,7 @@ class StreamDownloader: self.accumulate_task.cancel() _, self.accumulate_task = self.node.accumulate_peers(self.search_queue, self.peer_queue) await self.add_fixed_peers() - subscribe_hash( - bytes.fromhex(self.sd_hash), lambda result: asyncio.ensure_future(self._process_announcement(result))) + enqueue_tracker_search(bytes.fromhex(self.sd_hash), self.peer_queue) # start searching for peers for the sd hash self.search_queue.put_nowait(self.sd_hash) log.info("searching for peers for stream %s", self.sd_hash) diff --git a/lbry/torrent/tracker.py b/lbry/torrent/tracker.py index 0eb64b793..14b782f62 100644 --- a/lbry/torrent/tracker.py +++ b/lbry/torrent/tracker.py @@ -199,11 +199,7 @@ class TrackerClient: async def get_kademlia_peer_list(self, info_hash): responses = await self.get_peer_list(info_hash, no_port=True) - peers = [ - (str(ipaddress.ip_address(peer.address)), peer.port) - for ann in responses for peer in ann.peers if peer.port > 1024 # filter out privileged and 0 - ] - return await get_kademlia_peers_from_hosts(peers) + return await announcement_to_kademlia_peers(*responses) async def _probe_server(self, info_hash, tracker_host, tracker_port, stopped=False, no_port=False): result = None @@ -229,8 +225,20 @@ class TrackerClient: return result -def subscribe_hash(info_hash: bytes, on_data): - TrackerClient.EVENT_CONTROLLER.add(('search', info_hash, on_data)) +def enqueue_tracker_search(info_hash: bytes, peer_q: asyncio.Queue): + async def on_announcement(announcement: AnnounceResponse): + peers = await announcement_to_kademlia_peers(announcement) + log.info("Found %d peers from tracker for %s", len(peers), info_hash.hex()[:8]) + peer_q.put_nowait(peers) + TrackerClient.EVENT_CONTROLLER.add(('search', info_hash, on_announcement)) + + +def announcement_to_kademlia_peers(*announcements: AnnounceResponse): + peers = [ + (str(ipaddress.ip_address(peer.address)), peer.port) + for announcement in announcements for peer in announcement.peers if peer.port > 1024 # no privileged or 0 + ] + return get_kademlia_peers_from_hosts(peers) class UDPTrackerServerProtocol(asyncio.DatagramProtocol): # for testing. Not suitable for production diff --git a/tests/unit/torrent/test_tracker.py b/tests/unit/torrent/test_tracker.py index a411cddb0..32e4846a1 100644 --- a/tests/unit/torrent/test_tracker.py +++ b/tests/unit/torrent/test_tracker.py @@ -2,7 +2,8 @@ import asyncio import random from lbry.testcase import AsyncioTestCase -from lbry.torrent.tracker import CompactIPv4Peer, TrackerClient, subscribe_hash, UDPTrackerServerProtocol, encode_peer +from lbry.dht.peer import KademliaPeer +from lbry.torrent.tracker import CompactIPv4Peer, TrackerClient, enqueue_tracker_search, UDPTrackerServerProtocol, encode_peer class UDPTrackerClientTestCase(AsyncioTestCase): @@ -46,10 +47,9 @@ class UDPTrackerClientTestCase(AsyncioTestCase): async def test_announce_using_helper_function(self): info_hash = random.getrandbits(160).to_bytes(20, "big", signed=False) queue = asyncio.Queue() - subscribe_hash(info_hash, queue.put) - announcement = await queue.get() - peers = announcement.peers - self.assertEqual(peers, [CompactIPv4Peer(int.from_bytes(bytes([127, 0, 0, 1]), "big", signed=False), 4444)]) + enqueue_tracker_search(info_hash, queue) + peers = await queue.get() + self.assertEqual(peers, [KademliaPeer('127.0.0.1', None, None, 4444, allow_localhost=True)]) async def test_error(self): info_hash = random.getrandbits(160).to_bytes(20, "big", signed=False) @@ -85,8 +85,8 @@ class UDPTrackerClientTestCase(AsyncioTestCase): peer = (f"127.0.0.{random.randint(1, 255)}", random.randint(2000, 65500)) fake_peers.append(peer) server.add_peer(info_hash, *peer) - response = [] - subscribe_hash(info_hash, response.append) + peer_q = asyncio.Queue() + enqueue_tracker_search(info_hash, peer_q) await asyncio.sleep(0) await asyncio.gather(*self.client.tasks.values()) - self.assertEqual(11, len(response)) + self.assertEqual(11, peer_q.qsize())