From 30e8728f7fd88ffb3ffc5805cd107ec7a8c57bb8 Mon Sep 17 00:00:00 2001 From: Victor Shyba Date: Sat, 5 Mar 2022 04:15:04 -0300 Subject: [PATCH] use tracker on download --- lbry/stream/downloader.py | 32 ++++++++++++++++++++++++++++++++ 1 file changed, 32 insertions(+) diff --git a/lbry/stream/downloader.py b/lbry/stream/downloader.py index 0ef627248..874715c6f 100644 --- a/lbry/stream/downloader.py +++ b/lbry/stream/downloader.py @@ -1,4 +1,6 @@ import asyncio +import ipaddress +import time import typing import logging import binascii @@ -8,6 +10,8 @@ from lbry.error import DownloadSDTimeoutError from lbry.utils import lru_cache_concurrent from lbry.stream.descriptor import StreamDescriptor from lbry.blob_exchange.downloader import BlobDownloader +from lbry.torrent.tracker import get_peer_list + if typing.TYPE_CHECKING: from lbry.conf import Config from lbry.dht.node import Node @@ -36,6 +40,7 @@ class StreamDownloader: self.added_fixed_peers = False self.time_to_descriptor: typing.Optional[float] = None self.time_to_first_bytes: typing.Optional[float] = None + self.next_tracker_announce_time = None async def cached_read_blob(blob_info: 'BlobInfo') -> bytes: return await self.read_blob(blob_info, 2) @@ -62,6 +67,32 @@ class StreamDownloader: fixed_peers = await get_kademlia_peers_from_hosts(self.config.fixed_peers) self.fixed_peers_handle = self.loop.call_later(self.fixed_peers_delay, _add_fixed_peers, fixed_peers) + async def refresh_from_trackers(self, save_peers=True): + if not self.config.tracker_servers: + return + node_id = self.node.protocol.node_id if self.node else None + port = self.config.tcp_port + for server in self.config.tracker_servers: + try: + announcement = await get_peer_list( + bytes.fromhex(self.sd_hash)[:20], node_id, port, server[0], server[1]) + self.next_tracker_announce_time = max(self.next_tracker_announce_time or 0, + time.time() + announcement.interval) + except asyncio.CancelledError: + raise + except asyncio.TimeoutError: + log.warning("Tracker timed out: %s", server) + return + except Exception: + log.exception("Unexpected error querying tracker %s", server) + return + if not save_peers: + return + peers = [(str(ipaddress.ip_address(peer.address)), peer.port) for peer in announcement.peers] + peers = await get_kademlia_peers_from_hosts(peers) + log.info("Found %d peers from tracker %s for %s", len(peers), server, self.sd_hash[:8]) + self.peer_queue.put_nowait(peers) + async def load_descriptor(self, connection_id: int = 0): # download or get the sd blob sd_blob = self.blob_manager.get_blob(self.sd_hash) @@ -91,6 +122,7 @@ class StreamDownloader: self.accumulate_task.cancel() _, self.accumulate_task = self.node.accumulate_peers(self.search_queue, self.peer_queue) await self.add_fixed_peers() + asyncio.ensure_future(self.refresh_from_trackers()) # start searching for peers for the sd hash self.search_queue.put_nowait(self.sd_hash) log.info("searching for peers for stream %s", self.sd_hash)