From 2089059792149b9be0a45fae9e432fa832ec7275 Mon Sep 17 00:00:00 2001 From: Victor Shyba Date: Tue, 28 Jan 2020 22:37:52 -0300 Subject: [PATCH] pylint --- lbry/extras/daemon/components.py | 6 +- lbry/file/file_manager.py | 48 +++------------ lbry/file/source.py | 16 +---- lbry/file/source_manager.py | 20 +++--- lbry/stream/managed_stream.py | 5 +- lbry/stream/stream_manager.py | 78 +++--------------------- lbry/torrent/session.py | 60 +++++++++--------- lbry/torrent/torrent.py | 16 ++--- tests/unit/stream/test_stream_manager.py | 3 +- 9 files changed, 67 insertions(+), 185 deletions(-) diff --git a/lbry/extras/daemon/components.py b/lbry/extras/daemon/components.py index ff6de3c61..046faca2a 100644 --- a/lbry/extras/daemon/components.py +++ b/lbry/extras/daemon/components.py @@ -24,10 +24,8 @@ from lbry.extras.daemon.storage import SQLiteStorage from lbry.wallet import WalletManager from lbry.wallet.usage_payment import WalletServerPayer try: - import libtorrent from lbry.torrent.session import TorrentSession except ImportError: - libtorrent = None TorrentSession = None log = logging.getLogger(__name__) @@ -343,7 +341,7 @@ class FileManagerComponent(Component): if not self.file_manager: return return { - 'managed_files': len(self.file_manager._sources), + 'managed_files': len(self.file_manager.get_filtered()), } async def start(self): @@ -386,7 +384,7 @@ class TorrentComponent(Component): } async def start(self): - if libtorrent: + if TorrentSession: self.torrent_session = TorrentSession(asyncio.get_event_loop(), None) await self.torrent_session.bind() # TODO: specify host/port diff --git a/lbry/file/file_manager.py b/lbry/file/file_manager.py index 871b94d47..8da9a5619 100644 --- a/lbry/file/file_manager.py +++ b/lbry/file/file_manager.py @@ -1,38 +1,28 @@ -import time import asyncio -import binascii import logging import typing from typing import Optional from aiohttp.web import Request -from lbry.error import ResolveError, InvalidStreamDescriptorError, DownloadSDTimeoutError, InsufficientFundsError +from lbry.error import ResolveError, DownloadSDTimeoutError, InsufficientFundsError from lbry.error import ResolveTimeoutError, DownloadDataTimeoutError, KeyFeeAboveMaxAllowedError from lbry.stream.managed_stream import ManagedStream from lbry.utils import cache_concurrent -from lbry.schema.claim import Claim from lbry.schema.url import URL from lbry.wallet.dewies import dewies_to_lbc -from lbry.wallet.transaction import Output from lbry.file.source_manager import SourceManager from lbry.file.source import ManagedDownloadSource if typing.TYPE_CHECKING: from lbry.conf import Config from lbry.extras.daemon.analytics import AnalyticsManager from lbry.extras.daemon.storage import SQLiteStorage - from lbry.wallet import LbryWalletManager + from lbry.wallet import WalletManager from lbry.extras.daemon.exchange_rate_manager import ExchangeRateManager log = logging.getLogger(__name__) -def path_or_none(p) -> Optional[str]: - if not p: - return - return binascii.unhexlify(p).decode() - - class FileManager: - def __init__(self, loop: asyncio.AbstractEventLoop, config: 'Config', wallet_manager: 'LbryWalletManager', + def __init__(self, loop: asyncio.AbstractEventLoop, config: 'Config', wallet_manager: 'WalletManager', storage: 'SQLiteStorage', analytics_manager: Optional['AnalyticsManager'] = None): self.loop = loop self.config = config @@ -141,8 +131,9 @@ class FileManager: log.info("claim contains an update to a stream we have, downloading it") if save_file and existing_for_claim_id[0].output_file_exists: save_file = False - await existing_for_claim_id[0].start(node=self.node, timeout=timeout, save_now=save_file) - if not existing_for_claim_id[0].output_file_exists and (save_file or file_name or download_directory): + await existing_for_claim_id[0].start(timeout=timeout, save_now=save_file) + if not existing_for_claim_id[0].output_file_exists and ( + save_file or file_name or download_directory): await existing_for_claim_id[0].save_file( file_name=file_name, download_directory=download_directory ) @@ -176,8 +167,8 @@ class FileManager: if not claim.stream.source.bt_infohash: stream = ManagedStream( - self.loop, self.config, source_manager.blob_manager, claim.stream.source.sd_hash, download_directory, - file_name, ManagedStream.STATUS_RUNNING, content_fee=payment, + self.loop, self.config, source_manager.blob_manager, claim.stream.source.sd_hash, + download_directory, file_name, ManagedStream.STATUS_RUNNING, content_fee=payment, analytics_manager=self.analytics_manager ) else: @@ -262,29 +253,6 @@ class FileManager: """ return sum((manager.get_filtered(*args, **kwargs) for manager in self.source_managers.values()), []) - async def _check_update_or_replace( - self, outpoint: str, claim_id: str, claim: Claim - ) -> typing.Tuple[Optional[ManagedDownloadSource], Optional[ManagedDownloadSource]]: - existing = self.get_filtered(outpoint=outpoint) - if existing: - return existing[0], None - existing = self.get_filtered(sd_hash=claim.stream.source.sd_hash) - if existing and existing[0].claim_id != claim_id: - raise ResolveError(f"stream for {existing[0].claim_id} collides with existing download {claim_id}") - if existing: - log.info("claim contains a metadata only update to a stream we have") - await self.storage.save_content_claim( - existing[0].stream_hash, outpoint - ) - await self._update_content_claim(existing[0]) - return existing[0], None - else: - existing_for_claim_id = self.get_filtered(claim_id=claim_id) - if existing_for_claim_id: - log.info("claim contains an update to a stream we have, downloading it") - return None, existing_for_claim_id[0] - return None, None - async def delete(self, source: ManagedDownloadSource, delete_file=False): for manager in self.source_managers.values(): return await manager.delete(source, delete_file) diff --git a/lbry/file/source.py b/lbry/file/source.py index 05d8fb35d..81ab0ee82 100644 --- a/lbry/file/source.py +++ b/lbry/file/source.py @@ -16,20 +16,6 @@ if typing.TYPE_CHECKING: log = logging.getLogger(__name__) -# def _get_next_available_file_name(download_directory: str, file_name: str) -> str: -# base_name, ext = os.path.splitext(os.path.basename(file_name)) -# i = 0 -# while os.path.isfile(os.path.join(download_directory, file_name)): -# i += 1 -# file_name = "%s_%i%s" % (base_name, i, ext) -# -# return file_name -# -# -# async def get_next_available_file_name(loop: asyncio.AbstractEventLoop, download_directory: str, file_name: str) -> str: -# return await loop.run_in_executor(None, _get_next_available_file_name, download_directory, file_name) - - class ManagedDownloadSource: STATUS_RUNNING = "running" STATUS_STOPPED = "stopped" @@ -71,7 +57,7 @@ class ManagedDownloadSource: # iv_generator: Optional[typing.Generator[bytes, None, None]] = None) -> 'ManagedDownloadSource': # raise NotImplementedError() - async def start(self, timeout: Optional[float] = None): + async def start(self, timeout: Optional[float] = None, save_now: Optional[bool] = False): raise NotImplementedError() async def stop(self, finished: bool = False): diff --git a/lbry/file/source_manager.py b/lbry/file/source_manager.py index 56ba5fd5f..9eada3cca 100644 --- a/lbry/file/source_manager.py +++ b/lbry/file/source_manager.py @@ -1,6 +1,5 @@ import os import asyncio -import binascii import logging import typing from typing import Optional @@ -12,7 +11,7 @@ if typing.TYPE_CHECKING: log = logging.getLogger(__name__) -comparison_operators = { +COMPARISON_OPERATORS = { 'eq': lambda a, b: a == b, 'ne': lambda a, b: a != b, 'g': lambda a, b: a > b, @@ -22,12 +21,6 @@ comparison_operators = { } -def path_or_none(p) -> Optional[str]: - if not p: - return - return binascii.unhexlify(p).decode() - - class SourceManager: filter_fields = { 'rowid', @@ -77,10 +70,11 @@ class SourceManager: source.stop_tasks() self.started.clear() - async def create(self, file_path: str, key: Optional[bytes] = None, **kw) -> ManagedDownloadSource: + async def create(self, file_path: str, key: Optional[bytes] = None, + iv_generator: Optional[typing.Generator[bytes, None, None]] = None) -> ManagedDownloadSource: raise NotImplementedError() - async def _delete(self, source: ManagedDownloadSource): + async def _delete(self, source: ManagedDownloadSource, delete_file: Optional[bool] = False): raise NotImplementedError() async def delete(self, source: ManagedDownloadSource, delete_file: Optional[bool] = False): @@ -101,11 +95,11 @@ class SourceManager: """ if sort_by and sort_by not in self.filter_fields: raise ValueError(f"'{sort_by}' is not a valid field to sort by") - if comparison and comparison not in comparison_operators: + if comparison and comparison not in COMPARISON_OPERATORS: raise ValueError(f"'{comparison}' is not a valid comparison") if 'full_status' in search_by: del search_by['full_status'] - for search in search_by.keys(): + for search in search_by: if search not in self.filter_fields: raise ValueError(f"'{search}' is not a valid search operation") if search_by: @@ -113,7 +107,7 @@ class SourceManager: sources = [] for stream in self._sources.values(): for search, val in search_by.items(): - if comparison_operators[comparison](getattr(stream, search), val): + if COMPARISON_OPERATORS[comparison](getattr(stream, search), val): sources.append(stream) break else: diff --git a/lbry/stream/managed_stream.py b/lbry/stream/managed_stream.py index 34696c442..2e00a8d65 100644 --- a/lbry/stream/managed_stream.py +++ b/lbry/stream/managed_stream.py @@ -6,7 +6,6 @@ import logging import binascii from typing import Optional from aiohttp.web import Request, StreamResponse, HTTPRequestRangeNotSatisfiable -from lbry.utils import generate_id from lbry.error import DownloadSDTimeoutError from lbry.schema.mime_types import guess_media_type from lbry.stream.downloader import StreamDownloader @@ -21,7 +20,6 @@ if typing.TYPE_CHECKING: from lbry.schema.claim import Claim from lbry.blob.blob_manager import BlobManager from lbry.blob.blob_info import BlobInfo - from lbry.extras.daemon.storage import SQLiteStorage from lbry.dht.node import Node from lbry.extras.daemon.analytics import AnalyticsManager from lbry.wallet.transaction import Transaction @@ -289,8 +287,7 @@ class ManagedStream(ManagedDownloadSource): self.saving.clear() self.finished_write_attempt.set() - async def save_file(self, file_name: Optional[str] = None, download_directory: Optional[str] = None, - node: Optional['Node'] = None): + async def save_file(self, file_name: Optional[str] = None, download_directory: Optional[str] = None): await self.start() if self.file_output_task and not self.file_output_task.done(): # cancel an already running save task self.file_output_task.cancel() diff --git a/lbry/stream/stream_manager.py b/lbry/stream/stream_manager.py index caee74b73..ad232b28b 100644 --- a/lbry/stream/stream_manager.py +++ b/lbry/stream/stream_manager.py @@ -23,24 +23,10 @@ if typing.TYPE_CHECKING: log = logging.getLogger(__name__) -SET_FILTER_FIELDS = { - "claim_ids": "claim_id", - "channel_claim_ids": "channel_claim_id", - "outpoints": "outpoint" -} -COMPARISON_OPERATORS = { - 'eq': lambda a, b: a == b, - 'ne': lambda a, b: a != b, - 'g': lambda a, b: a > b, - 'l': lambda a, b: a < b, - 'ge': lambda a, b: a >= b, - 'le': lambda a, b: a <= b, - 'in': lambda a, b: a in b -} -def path_or_none(p) -> Optional[str]: - if not p: +def path_or_none(encoded_path) -> Optional[str]: + if not encoded_path: return - return binascii.unhexlify(p).decode() + return binascii.unhexlify(encoded_path).decode() class StreamManager(SourceManager): @@ -235,60 +221,10 @@ class StreamManager(SourceManager): await self.blob_manager.delete_blobs(blob_hashes, delete_from_db=False) await self.storage.delete(stream.descriptor) - def get_filtered_streams(self, sort_by: Optional[str] = None, reverse: Optional[bool] = False, - comparison: Optional[str] = None, - **search_by) -> typing.List[ManagedStream]: - """ - Get a list of filtered and sorted ManagedStream objects - - :param sort_by: field to sort by - :param reverse: reverse sorting - :param comparison: comparison operator used for filtering - :param search_by: fields and values to filter by - """ - if sort_by and sort_by not in FILTER_FIELDS: - raise ValueError(f"'{sort_by}' is not a valid field to sort by") - if comparison and comparison not in COMPARISON_OPERATORS: - raise ValueError(f"'{comparison}' is not a valid comparison") - if 'full_status' in search_by: - del search_by['full_status'] - - for search in search_by: - if search not in FILTER_FIELDS: - raise ValueError(f"'{search}' is not a valid search operation") - - compare_sets = {} - if isinstance(search_by.get('claim_id'), list): - compare_sets['claim_ids'] = search_by.pop('claim_id') - if isinstance(search_by.get('outpoint'), list): - compare_sets['outpoints'] = search_by.pop('outpoint') - if isinstance(search_by.get('channel_claim_id'), list): - compare_sets['channel_claim_ids'] = search_by.pop('channel_claim_id') - - if search_by: - comparison = comparison or 'eq' - streams = [] - for stream in self.streams.values(): - matched = False - for set_search, val in compare_sets.items(): - if COMPARISON_OPERATORS[comparison](getattr(stream, SET_FILTER_FIELDS[set_search]), val): - streams.append(stream) - matched = True - break - if matched: - continue - for search, val in search_by.items(): - this_stream = getattr(stream, search) - if COMPARISON_OPERATORS[comparison](this_stream, val): - streams.append(stream) - break - else: - streams = list(self.streams.values()) - if sort_by: - streams.sort(key=lambda s: getattr(s, sort_by)) - if reverse: - streams.reverse() - return streams + async def _delete(self, source: ManagedStream, delete_file: Optional[bool] = False): + blob_hashes = [source.sd_hash] + [b.blob_hash for b in source.descriptor.blobs[:-1]] + await self.blob_manager.delete_blobs(blob_hashes, delete_from_db=False) + await self.storage.delete_stream(source.descriptor) async def stream_partial_content(self, request: Request, sd_hash: str): return await self._sources[sd_hash].stream_file(request, self.node) diff --git a/lbry/torrent/session.py b/lbry/torrent/session.py index 5214042e7..294a2cba5 100644 --- a/lbry/torrent/session.py +++ b/lbry/torrent/session.py @@ -4,29 +4,29 @@ import libtorrent NOTIFICATION_MASKS = [ - "error", - "peer", - "port_mapping", - "storage", - "tracker", - "debug", - "status", - "progress", - "ip_block", - "dht", - "stats", - "session_log", - "torrent_log", - "peer_log", - "incoming_request", - "dht_log", - "dht_operation", - "port_mapping_log", - "picker_log", - "file_progress", - "piece_progress", - "upload", - "block_progress" + "error", + "peer", + "port_mapping", + "storage", + "tracker", + "debug", + "status", + "progress", + "ip_block", + "dht", + "stats", + "session_log", + "torrent_log", + "peer_log", + "incoming_request", + "dht_log", + "dht_operation", + "port_mapping_log", + "picker_log", + "file_progress", + "piece_progress", + "upload", + "block_progress" ] @@ -90,11 +90,12 @@ class TorrentSession: 'enable_incoming_tcp': True } self._session = await self._loop.run_in_executor( - self._executor, libtorrent.session, settings + self._executor, libtorrent.session, settings # pylint: disable=c-extension-no-member ) await self._loop.run_in_executor( self._executor, - lambda: self._session.add_dht_router("router.utorrent.com", 6881) + # lambda necessary due boost functions raising errors when asyncio inspects them. try removing later + lambda: self._session.add_dht_router("router.utorrent.com", 6881) # pylint: disable=unnecessary-lambda ) self._loop.create_task(self.process_alerts()) @@ -110,12 +111,11 @@ class TorrentSession: await asyncio.sleep(1, loop=self._loop) async def pause(self): - state = await self._loop.run_in_executor( - self._executor, lambda: self._session.save_state() - ) - # print(f"state:\n{state}") await self._loop.run_in_executor( - self._executor, lambda: self._session.pause() + self._executor, lambda: self._session.save_state() # pylint: disable=unnecessary-lambda + ) + await self._loop.run_in_executor( + self._executor, lambda: self._session.pause() # pylint: disable=unnecessary-lambda ) async def resume(self): diff --git a/lbry/torrent/torrent.py b/lbry/torrent/torrent.py index bbbc487bf..04a8544c7 100644 --- a/lbry/torrent/torrent.py +++ b/lbry/torrent/torrent.py @@ -18,17 +18,17 @@ class TorrentInfo: self.total_size = total_size @classmethod - def from_libtorrent_info(cls, ti): + def from_libtorrent_info(cls, torrent_info): return cls( - ti.nodes(), tuple( + torrent_info.nodes(), tuple( { 'url': web_seed['url'], 'type': web_seed['type'], 'auth': web_seed['auth'] - } for web_seed in ti.web_seeds() + } for web_seed in torrent_info.web_seeds() ), tuple( - (tracker.url, tracker.tier) for tracker in ti.trackers() - ), ti.total_size() + (tracker.url, tracker.tier) for tracker in torrent_info.trackers() + ), torrent_info.total_size() ) @@ -41,9 +41,11 @@ class Torrent: def _threaded_update_status(self): status = self._handle.status() if not status.is_seeding: - log.info('%.2f%% complete (down: %.1f kB/s up: %.1f kB/s peers: %d) %s' % ( + log.info( + '%.2f%% complete (down: %.1f kB/s up: %.1f kB/s peers: %d) %s', status.progress * 100, status.download_rate / 1000, status.upload_rate / 1000, - status.num_peers, status.state)) + status.num_peers, status.state + ) elif not self.finished.is_set(): self.finished.set() diff --git a/tests/unit/stream/test_stream_manager.py b/tests/unit/stream/test_stream_manager.py index e33064503..b14f63de6 100644 --- a/tests/unit/stream/test_stream_manager.py +++ b/tests/unit/stream/test_stream_manager.py @@ -302,7 +302,8 @@ class TestStreamManager(BlobExchangeTestBase): ) self.assertEqual(stored_status, "stopped") - await stream.save_file(node=self.stream_manager.node) + stream.node = self.stream_manager.node + await stream.save_file() await stream.finished_writing.wait() await asyncio.sleep(0, loop=self.loop) self.assertTrue(stream.finished)