From af0ad417dfcba2fca98bdf9e37451c0ed1c4361d Mon Sep 17 00:00:00 2001 From: Victor Shyba Date: Tue, 11 Oct 2022 23:27:56 -0300 Subject: [PATCH] generalize DownloadSDTimeout to DownloadMetadata timeout + fix usages --- lbry/error/README.md | 4 ++-- lbry/error/__init__.py | 6 +++--- lbry/extras/daemon/daemon.py | 4 ++-- lbry/file/file_manager.py | 6 +++--- lbry/stream/downloader.py | 4 ++-- lbry/stream/managed_stream.py | 4 ++-- lbry/torrent/torrent_manager.py | 8 +++++++- tests/unit/stream/test_stream_manager.py | 10 +++++----- 8 files changed, 26 insertions(+), 20 deletions(-) diff --git a/lbry/error/README.md b/lbry/error/README.md index cc5ab7a3b..2c747408e 100644 --- a/lbry/error/README.md +++ b/lbry/error/README.md @@ -81,8 +81,8 @@ Code | Name | Message 511 | CorruptBlob | Blobs is corrupted. 520 | BlobFailedEncryption | Failed to encrypt blob. 531 | DownloadCancelled | Download was canceled. -532 | DownloadSDTimeout | Failed to download sd blob {download} within timeout. -533 | DownloadDataTimeout | Failed to download data blobs for sd hash {download} within timeout. +532 | DownloadMetadataTimeout | Failed to download metadata for {download} within timeout. +533 | DownloadDataTimeout | Failed to download data blobs for {download} within timeout. 534 | InvalidStreamDescriptor | {message} 535 | InvalidData | {message} 536 | InvalidBlobHash | {message} diff --git a/lbry/error/__init__.py b/lbry/error/__init__.py index 7e18f5bf9..88886487e 100644 --- a/lbry/error/__init__.py +++ b/lbry/error/__init__.py @@ -411,18 +411,18 @@ class DownloadCancelledError(BlobError): super().__init__("Download was canceled.") -class DownloadSDTimeoutError(BlobError): +class DownloadMetadataTimeoutError(BlobError): def __init__(self, download): self.download = download - super().__init__(f"Failed to download sd blob {download} within timeout.") + super().__init__(f"Failed to download metadata for {download} within timeout.") class DownloadDataTimeoutError(BlobError): def __init__(self, download): self.download = download - super().__init__(f"Failed to download data blobs for sd hash {download} within timeout.") + super().__init__(f"Failed to download data blobs for {download} within timeout.") class InvalidStreamDescriptorError(BlobError): diff --git a/lbry/extras/daemon/daemon.py b/lbry/extras/daemon/daemon.py index ac8782d7f..f6f725b32 100644 --- a/lbry/extras/daemon/daemon.py +++ b/lbry/extras/daemon/daemon.py @@ -36,7 +36,7 @@ from lbry.blob.blob_file import is_valid_blobhash, BlobBuffer from lbry.blob_exchange.downloader import download_blob from lbry.dht.peer import make_kademlia_peer from lbry.error import ( - DownloadSDTimeoutError, ComponentsNotStartedError, ComponentStartConditionNotMetError, + DownloadMetadataTimeoutError, ComponentsNotStartedError, ComponentStartConditionNotMetError, CommandDoesNotExistError, BaseError, WalletNotFoundError, WalletAlreadyLoadedError, WalletAlreadyExistsError, ConflictingInputValueError, AlreadyPurchasedError, PrivateKeyNotFoundError, InputStringIsBlankError, InputValueError @@ -1140,7 +1140,7 @@ class Daemon(metaclass=JSONRPCServerType): save_file=save_file, wallet=wallet ) if not stream: - raise DownloadSDTimeoutError(uri) + raise DownloadMetadataTimeoutError(uri) except Exception as e: # TODO: use error from lbry.error log.warning("Error downloading %s: %s", uri, str(e)) diff --git a/lbry/file/file_manager.py b/lbry/file/file_manager.py index 9cf666a81..9e2ae510f 100644 --- a/lbry/file/file_manager.py +++ b/lbry/file/file_manager.py @@ -3,7 +3,7 @@ import logging import typing from typing import Optional from aiohttp.web import Request -from lbry.error import ResolveError, DownloadSDTimeoutError, InsufficientFundsError +from lbry.error import ResolveError, DownloadMetadataTimeoutError, InsufficientFundsError from lbry.error import ResolveTimeoutError, DownloadDataTimeoutError, KeyFeeAboveMaxAllowedError from lbry.error import InvalidStreamURLError from lbry.stream.managed_stream import ManagedStream @@ -247,10 +247,10 @@ class FileManager: await asyncio.wait_for(stream.save_file(), timeout - (self.loop.time() - before_download)) return stream except asyncio.TimeoutError: - error = DownloadDataTimeoutError(stream.sd_hash) + error = DownloadDataTimeoutError(stream.identifier) raise error except Exception as err: # forgive data timeout, don't delete stream - expected = (DownloadSDTimeoutError, DownloadDataTimeoutError, InsufficientFundsError, + expected = (DownloadMetadataTimeoutError, DownloadDataTimeoutError, InsufficientFundsError, KeyFeeAboveMaxAllowedError, ResolveError, InvalidStreamURLError) if isinstance(err, expected): log.warning("Failed to download %s: %s", uri, str(err)) diff --git a/lbry/stream/downloader.py b/lbry/stream/downloader.py index 39e24b37e..cf7923912 100644 --- a/lbry/stream/downloader.py +++ b/lbry/stream/downloader.py @@ -4,7 +4,7 @@ import logging import binascii from lbry.dht.node import get_kademlia_peers_from_hosts -from lbry.error import DownloadSDTimeoutError +from lbry.error import DownloadMetadataTimeoutError from lbry.utils import lru_cache_concurrent from lbry.stream.descriptor import StreamDescriptor from lbry.blob_exchange.downloader import BlobDownloader @@ -77,7 +77,7 @@ class StreamDownloader: log.info("downloaded sd blob %s", self.sd_hash) self.time_to_descriptor = self.loop.time() - now except asyncio.TimeoutError: - raise DownloadSDTimeoutError(self.sd_hash) + raise DownloadMetadataTimeoutError(self.sd_hash) # parse the descriptor self.descriptor = await StreamDescriptor.from_stream_descriptor_blob( diff --git a/lbry/stream/managed_stream.py b/lbry/stream/managed_stream.py index b08962820..3c57cefd5 100644 --- a/lbry/stream/managed_stream.py +++ b/lbry/stream/managed_stream.py @@ -5,7 +5,7 @@ import typing import logging from typing import Optional from aiohttp.web import Request, StreamResponse, HTTPRequestRangeNotSatisfiable -from lbry.error import DownloadSDTimeoutError +from lbry.error import DownloadMetadataTimeoutError from lbry.schema.mime_types import guess_media_type from lbry.stream.downloader import StreamDownloader from lbry.stream.descriptor import StreamDescriptor, sanitize_file_name @@ -160,7 +160,7 @@ class ManagedStream(ManagedDownloadSource): await asyncio.wait_for(self.downloader.start(), timeout) except asyncio.TimeoutError: self._running.clear() - raise DownloadSDTimeoutError(self.sd_hash) + raise DownloadMetadataTimeoutError(self.identifier) if self.delayed_stop_task and not self.delayed_stop_task.done(): self.delayed_stop_task.cancel() diff --git a/lbry/torrent/torrent_manager.py b/lbry/torrent/torrent_manager.py index 35e59f5a1..b4b52bdb0 100644 --- a/lbry/torrent/torrent_manager.py +++ b/lbry/torrent/torrent_manager.py @@ -7,6 +7,7 @@ from pathlib import Path from typing import Optional from aiohttp.web import Request, StreamResponse, HTTPRequestRangeNotSatisfiable +from lbry.error import DownloadMetadataTimeoutError from lbry.file.source_manager import SourceManager from lbry.file.source import ManagedDownloadSource from lbry.schema.mime_types import guess_media_type @@ -57,7 +58,12 @@ class TorrentSource(ManagedDownloadSource): return guess_media_type(os.path.basename(self.full_path))[0] async def start(self, timeout: Optional[float] = None, save_now: Optional[bool] = False): - await self.torrent_session.add_torrent(self.identifier, self.download_directory) + try: + metadata_download = self.torrent_session.add_torrent(self.identifier, self.download_directory) + await asyncio.wait_for(metadata_download, timeout, loop=self.loop) + except asyncio.TimeoutError: + self.torrent_session.remove_torrent(btih=self.identifier) + raise DownloadMetadataTimeoutError(self.identifier) self.download_directory = self.torrent_session.save_path(self.identifier) self._file_name = Path(self.torrent_session.full_path(self.identifier)).name await self.storage.add_torrent(self.identifier, self.torrent_length, self.torrent_name) diff --git a/tests/unit/stream/test_stream_manager.py b/tests/unit/stream/test_stream_manager.py index ba6d8dbc8..26c961b62 100644 --- a/tests/unit/stream/test_stream_manager.py +++ b/tests/unit/stream/test_stream_manager.py @@ -11,7 +11,7 @@ from tests.unit.blob_exchange.test_transfer_blob import BlobExchangeTestBase from lbry.testcase import get_fake_exchange_rate_manager from lbry.utils import generate_id from lbry.error import InsufficientFundsError -from lbry.error import KeyFeeAboveMaxAllowedError, ResolveError, DownloadSDTimeoutError, DownloadDataTimeoutError +from lbry.error import KeyFeeAboveMaxAllowedError, ResolveError, DownloadMetadataTimeoutError, DownloadDataTimeoutError from lbry.wallet import WalletManager, Wallet, Ledger, Transaction, Input, Output, Database from lbry.wallet.constants import CENT, NULL_HASH32 from lbry.wallet.network import ClientSession @@ -232,7 +232,7 @@ class TestStreamManager(BlobExchangeTestBase): event['properties']['error_message'], f'Failed to download sd blob {self.sd_hash} within timeout.' ) - await self._test_time_to_first_bytes(check_post, DownloadSDTimeoutError, after_setup=after_setup) + await self._test_time_to_first_bytes(check_post, DownloadMetadataTimeoutError, after_setup=after_setup) async def test_override_fixed_peer_delay_dht_disabled(self): self.client_config.fixed_peers = [(self.server_from_client.address, self.server_from_client.tcp_port)] @@ -266,7 +266,7 @@ class TestStreamManager(BlobExchangeTestBase): def check_post(event): self.assertEqual(event['event'], 'Time To First Bytes') - self.assertEqual(event['properties']['error'], 'DownloadSDTimeoutError') + self.assertEqual(event['properties']['error'], 'DownloadMetadataTimeoutError') self.assertEqual(event['properties']['tried_peers_count'], 0) self.assertEqual(event['properties']['active_peer_count'], 0) self.assertFalse(event['properties']['use_fixed_peers']) @@ -277,7 +277,7 @@ class TestStreamManager(BlobExchangeTestBase): ) start = self.loop.time() - await self._test_time_to_first_bytes(check_post, DownloadSDTimeoutError) + await self._test_time_to_first_bytes(check_post, DownloadMetadataTimeoutError) duration = self.loop.time() - start self.assertLessEqual(duration, 5) self.assertGreaterEqual(duration, 3.0) @@ -387,7 +387,7 @@ class TestStreamManager(BlobExchangeTestBase): self.server.stop_server() await self.setup_stream_manager() await self._test_download_error_analytics_on_start( - DownloadSDTimeoutError, f'Failed to download sd blob {self.sd_hash} within timeout.', timeout=1 + DownloadMetadataTimeoutError, f'Failed to download sd blob {self.sd_hash} within timeout.', timeout=1 ) async def test_download_data_timeout(self):