From 41175a814b2181a38a975581e8ba339534dc3bb8 Mon Sep 17 00:00:00 2001 From: Victor Shyba Date: Tue, 28 Jan 2020 21:24:05 -0300 Subject: [PATCH] fix save from resolve --- lbry/extras/daemon/daemon.py | 24 +- lbry/extras/daemon/storage.py | 6 +- lbry/file/file_manager.py | 205 +++--------------- lbry/stream/managed_stream.py | 11 +- lbry/stream/stream_manager.py | 4 +- lbry/torrent/session.py | 7 +- .../datanetwork/test_file_commands.py | 2 +- tests/unit/stream/test_managed_stream.py | 6 +- 8 files changed, 62 insertions(+), 203 deletions(-) diff --git a/lbry/extras/daemon/daemon.py b/lbry/extras/daemon/daemon.py index 83299af91..01aad4ef9 100644 --- a/lbry/extras/daemon/daemon.py +++ b/lbry/extras/daemon/daemon.py @@ -1921,7 +1921,7 @@ class Daemon(metaclass=JSONRPCServerType): sort = sort or 'rowid' comparison = comparison or 'eq' paginated = paginate_list( - self.file_manager.get_filtered_streams(sort, reverse, comparison, **kwargs), page, page_size + self.file_manager.get_filtered(sort, reverse, comparison, **kwargs), page, page_size ) if paginated['items']: receipts = { @@ -1959,12 +1959,12 @@ class Daemon(metaclass=JSONRPCServerType): if status not in ['start', 'stop']: raise Exception('Status must be "start" or "stop".') - streams = self.file_manager.get_filtered_streams(**kwargs) + streams = self.file_manager.get_filtered(**kwargs) if not streams: raise Exception(f'Unable to find a file for {kwargs}') stream = streams[0] if status == 'start' and not stream.running: - await stream.save_file(node=self.file_manager.node) + await stream.save_file() msg = "Resumed download" elif status == 'stop' and stream.running: await stream.stop() @@ -2008,7 +2008,7 @@ class Daemon(metaclass=JSONRPCServerType): (bool) true if deletion was successful """ - streams = self.file_manager.get_filtered_streams(**kwargs) + streams = self.file_manager.get_filtered(**kwargs) if len(streams) > 1: if not delete_all: @@ -2025,7 +2025,7 @@ class Daemon(metaclass=JSONRPCServerType): else: for stream in streams: message = f"Deleted file {stream.file_name}" - await self.file_manager.delete_stream(stream, delete_file=delete_from_download_dir) + await self.file_manager.delete(stream, delete_file=delete_from_download_dir) log.info(message) result = True return result @@ -2057,7 +2057,7 @@ class Daemon(metaclass=JSONRPCServerType): Returns: {File} """ - streams = self.file_manager.get_filtered_streams(**kwargs) + streams = self.file_manager.get_filtered(**kwargs) if len(streams) > 1: log.warning("There are %i matching files, use narrower filters to select one", len(streams)) @@ -4249,9 +4249,9 @@ class Daemon(metaclass=JSONRPCServerType): """ if not blob_hash or not is_valid_blobhash(blob_hash): return f"Invalid blob hash to delete '{blob_hash}'" - streams = self.file_manager.get_filtered_streams(sd_hash=blob_hash) + streams = self.file_manager.get_filtered(sd_hash=blob_hash) if streams: - await self.file_manager.delete_stream(streams[0]) + await self.file_manager.delete(streams[0]) else: await self.blob_manager.delete_blobs([blob_hash]) return "Deleted %s" % blob_hash @@ -4919,10 +4919,10 @@ class Daemon(metaclass=JSONRPCServerType): results = await self.ledger.resolve(accounts, urls) if self.conf.save_resolved_claims and results: try: - claims = self.file_manager._convert_to_old_resolve_output(self.wallet_manager, results) - await self.storage.save_claims_for_resolve([ - value for value in claims.values() if 'error' not in value - ]) + await self.storage.save_claim_from_output( + self.ledger, + *(result for result in results.values() if isinstance(result, Output)) + ) except DecodeError: pass return results diff --git a/lbry/extras/daemon/storage.py b/lbry/extras/daemon/storage.py index 426985a48..8a03a456c 100644 --- a/lbry/extras/daemon/storage.py +++ b/lbry/extras/daemon/storage.py @@ -727,7 +727,7 @@ class SQLiteStorage(SQLiteMixin): if claim_id_to_supports: await self.save_supports(claim_id_to_supports) - def save_claim_from_output(self, ledger, output: Output): + def save_claim_from_output(self, ledger, *outputs: Output): return self.save_claims([{ "claim_id": output.claim_id, "name": output.claim_name, @@ -736,9 +736,9 @@ class SQLiteStorage(SQLiteMixin): "txid": output.tx_ref.id, "nout": output.position, "value": output.claim, - "height": -1, + "height": output.tx_ref.height, "claim_sequence": -1, - }]) + } for output in outputs]) def save_claims_for_resolve(self, claim_infos): to_save = {} diff --git a/lbry/file/file_manager.py b/lbry/file/file_manager.py index 443c94a69..871b94d47 100644 --- a/lbry/file/file_manager.py +++ b/lbry/file/file_manager.py @@ -40,14 +40,28 @@ class FileManager: self.storage = storage self.analytics_manager = analytics_manager self.source_managers: typing.Dict[str, SourceManager] = {} + self.started = asyncio.Event() + + @property + def streams(self): + return self.source_managers['stream']._sources + + async def create_stream(self, file_path: str, key: Optional[bytes] = None, **kwargs) -> ManagedDownloadSource: + if 'stream' in self.source_managers: + return await self.source_managers['stream'].create(file_path, key, **kwargs) + raise NotImplementedError async def start(self): await asyncio.gather(*(source_manager.start() for source_manager in self.source_managers.values())) + for manager in self.source_managers.values(): + await manager.started.wait() + self.started.set() def stop(self): - while self.source_managers: - _, source_manager = self.source_managers.popitem() - source_manager.stop() + for manager in self.source_managers.values(): + # fixme: pop or not? + manager.stop() + self.started.clear() @cache_concurrent async def download_from_uri(self, uri, exchange_rate_manager: 'ExchangeRateManager', @@ -130,7 +144,7 @@ class FileManager: await existing_for_claim_id[0].start(node=self.node, timeout=timeout, save_now=save_file) if not existing_for_claim_id[0].output_file_exists and (save_file or file_name or download_directory): await existing_for_claim_id[0].save_file( - file_name=file_name, download_directory=download_directory, node=self.node + file_name=file_name, download_directory=download_directory ) to_replace = existing_for_claim_id[0] @@ -139,10 +153,10 @@ class FileManager: log.info("already have stream for %s", uri) if save_file and updated_stream.output_file_exists: save_file = False - await updated_stream.start(node=self.node, timeout=timeout, save_now=save_file) + await updated_stream.start(timeout=timeout, save_now=save_file) if not updated_stream.output_file_exists and (save_file or file_name or download_directory): await updated_stream.save_file( - file_name=file_name, download_directory=download_directory, node=self.node + file_name=file_name, download_directory=download_directory ) return updated_stream @@ -152,7 +166,7 @@ class FileManager: #################### if not to_replace and txo.has_price and not txo.purchase_receipt: - payment = await manager.create_purchase_transaction( + payment = await self.wallet_manager.create_purchase_transaction( wallet.accounts, txo, exchange_rate_manager ) @@ -171,7 +185,7 @@ class FileManager: log.info("starting download for %s", uri) before_download = self.loop.time() - await stream.start(source_manager.node, timeout) + await stream.start(timeout, save_file) #################### # success case: delete to_replace if applicable, broadcast fee payment @@ -190,7 +204,7 @@ class FileManager: await self.storage.save_content_claim(stream.stream_hash, outpoint) if save_file: - await asyncio.wait_for(stream.save_file(node=source_manager.node), timeout - (self.loop.time() - before_download), + await asyncio.wait_for(stream.save_file(), timeout - (self.loop.time() - before_download), loop=self.loop) return stream except asyncio.TimeoutError: @@ -235,7 +249,7 @@ class FileManager: ) async def stream_partial_content(self, request: Request, sd_hash: str): - return await self._sources[sd_hash].stream_file(request, self.node) + return await self.source_managers['stream'].stream_partial_content(request, sd_hash) def get_filtered(self, *args, **kwargs) -> typing.List[ManagedDownloadSource]: """ @@ -246,7 +260,7 @@ class FileManager: :param comparison: comparison operator used for filtering :param search_by: fields and values to filter by """ - return sum(*(manager.get_filtered(*args, **kwargs) for manager in self.source_managers.values()), []) + return sum((manager.get_filtered(*args, **kwargs) for manager in self.source_managers.values()), []) async def _check_update_or_replace( self, outpoint: str, claim_id: str, claim: Claim @@ -271,169 +285,6 @@ class FileManager: return None, existing_for_claim_id[0] return None, None - - - # @cache_concurrent - # async def download_from_uri(self, uri, exchange_rate_manager: 'ExchangeRateManager', - # timeout: Optional[float] = None, file_name: Optional[str] = None, - # download_directory: Optional[str] = None, - # save_file: Optional[bool] = None, resolve_timeout: float = 3.0, - # wallet: Optional['Wallet'] = None) -> ManagedDownloadSource: - # wallet = wallet or self.wallet_manager.default_wallet - # timeout = timeout or self.config.download_timeout - # start_time = self.loop.time() - # resolved_time = None - # stream = None - # txo: Optional[Output] = None - # error = None - # outpoint = None - # if save_file is None: - # save_file = self.config.save_files - # if file_name and not save_file: - # save_file = True - # if save_file: - # download_directory = download_directory or self.config.download_dir - # else: - # download_directory = None - # - # payment = None - # try: - # # resolve the claim - # if not URL.parse(uri).has_stream: - # raise ResolveError("cannot download a channel claim, specify a /path") - # try: - # response = await asyncio.wait_for( - # self.wallet_manager.ledger.resolve(wallet.accounts, [uri]), - # resolve_timeout - # ) - # resolved_result = {} - # for url, txo in response.items(): - # if isinstance(txo, Output): - # tx_height = txo.tx_ref.height - # best_height = self.wallet_manager.ledger.headers.height - # resolved_result[url] = { - # 'name': txo.claim_name, - # 'value': txo.claim, - # 'protobuf': binascii.hexlify(txo.claim.to_bytes()), - # 'claim_id': txo.claim_id, - # 'txid': txo.tx_ref.id, - # 'nout': txo.position, - # 'amount': dewies_to_lbc(txo.amount), - # 'effective_amount': txo.meta.get('effective_amount', 0), - # 'height': tx_height, - # 'confirmations': (best_height + 1) - tx_height if tx_height > 0 else tx_height, - # 'claim_sequence': -1, - # 'address': txo.get_address(self.wallet_manager.ledger), - # 'valid_at_height': txo.meta.get('activation_height', None), - # 'timestamp': self.wallet_manager.ledger.headers[tx_height]['timestamp'], - # 'supports': [] - # } - # else: - # resolved_result[url] = txo - # except asyncio.TimeoutError: - # raise ResolveTimeoutError(uri) - # except Exception as err: - # if isinstance(err, asyncio.CancelledError): - # raise - # log.exception("Unexpected error resolving stream:") - # raise ResolveError(f"Unexpected error resolving stream: {str(err)}") - # await self.storage.save_claims_for_resolve([ - # value for value in resolved_result.values() if 'error' not in value - # ]) - # - # resolved = resolved_result.get(uri, {}) - # resolved = resolved if 'value' in resolved else resolved.get('claim') - # if not resolved: - # raise ResolveError(f"Failed to resolve stream at '{uri}'") - # if 'error' in resolved: - # raise ResolveError(f"error resolving stream: {resolved['error']}") - # txo = response[uri] - # - # claim = Claim.from_bytes(binascii.unhexlify(resolved['protobuf'])) - # outpoint = f"{resolved['txid']}:{resolved['nout']}" - # resolved_time = self.loop.time() - start_time - # - # # resume or update an existing stream, if the stream changed: download it and delete the old one after - # updated_stream, to_replace = await self._check_update_or_replace(outpoint, resolved['claim_id'], claim) - # if updated_stream: - # log.info("already have stream for %s", uri) - # if save_file and updated_stream.output_file_exists: - # save_file = False - # await updated_stream.start(node=self.node, timeout=timeout, save_now=save_file) - # if not updated_stream.output_file_exists and (save_file or file_name or download_directory): - # await updated_stream.save_file( - # file_name=file_name, download_directory=download_directory, node=self.node - # ) - # return updated_stream - # - # if not to_replace and txo.has_price and not txo.purchase_receipt: - # payment = await manager.create_purchase_transaction( - # wallet.accounts, txo, exchange_rate_manager - # ) - # - # stream = ManagedStream( - # self.loop, self.config, self.blob_manager, claim.stream.source.sd_hash, download_directory, - # file_name, ManagedStream.STATUS_RUNNING, content_fee=payment, - # analytics_manager=self.analytics_manager - # ) - # log.info("starting download for %s", uri) - # - # before_download = self.loop.time() - # await stream.start(self.node, timeout) - # stream.set_claim(resolved, claim) - # if to_replace: # delete old stream now that the replacement has started downloading - # await self.delete(to_replace) - # - # if payment is not None: - # await manager.broadcast_or_release(payment) - # payment = None # to avoid releasing in `finally` later - # log.info("paid fee of %s for %s", dewies_to_lbc(stream.content_fee.outputs[0].amount), uri) - # await self.storage.save_content_fee(stream.stream_hash, stream.content_fee) - # - # self._sources[stream.sd_hash] = stream - # self.storage.content_claim_callbacks[stream.stream_hash] = lambda: self._update_content_claim(stream) - # await self.storage.save_content_claim(stream.stream_hash, outpoint) - # if save_file: - # await asyncio.wait_for(stream.save_file(node=self.node), timeout - (self.loop.time() - before_download), - # loop=self.loop) - # return stream - # except asyncio.TimeoutError: - # error = DownloadDataTimeoutError(stream.sd_hash) - # raise error - # except Exception as err: # forgive data timeout, don't delete stream - # expected = (DownloadSDTimeoutError, DownloadDataTimeoutError, InsufficientFundsError, - # KeyFeeAboveMaxAllowedError) - # if isinstance(err, expected): - # log.warning("Failed to download %s: %s", uri, str(err)) - # elif isinstance(err, asyncio.CancelledError): - # pass - # else: - # log.exception("Unexpected error downloading stream:") - # error = err - # raise - # finally: - # if payment is not None: - # # payment is set to None after broadcasting, if we're here an exception probably happened - # await manager.ledger.release_tx(payment) - # if self.analytics_manager and (error or (stream and (stream.downloader.time_to_descriptor or - # stream.downloader.time_to_first_bytes))): - # server = self.wallet_manager.ledger.network.client.server - # self.loop.create_task( - # self.analytics_manager.send_time_to_first_bytes( - # resolved_time, self.loop.time() - start_time, None if not stream else stream.download_id, - # uri, outpoint, - # None if not stream else len(stream.downloader.blob_downloader.active_connections), - # None if not stream else len(stream.downloader.blob_downloader.scores), - # None if not stream else len(stream.downloader.blob_downloader.connection_failures), - # False if not stream else stream.downloader.added_fixed_peers, - # self.config.fixed_peer_delay if not stream else stream.downloader.fixed_peers_delay, - # None if not stream else stream.sd_hash, - # None if not stream else stream.downloader.time_to_descriptor, - # None if not (stream and stream.descriptor) else stream.descriptor.blobs[0].blob_hash, - # None if not (stream and stream.descriptor) else stream.descriptor.blobs[0].length, - # None if not stream else stream.downloader.time_to_first_bytes, - # None if not error else error.__class__.__name__, - # None if not error else str(error), - # None if not server else f"{server[0]}:{server[1]}" - # ) - # ) + async def delete(self, source: ManagedDownloadSource, delete_file=False): + for manager in self.source_managers.values(): + return await manager.delete(source, delete_file) diff --git a/lbry/stream/managed_stream.py b/lbry/stream/managed_stream.py index 23aae8dbd..eeefbb283 100644 --- a/lbry/stream/managed_stream.py +++ b/lbry/stream/managed_stream.py @@ -139,7 +139,7 @@ class ManagedStream(ManagedDownloadSource): # return cls(loop, config, blob_manager, descriptor.sd_hash, os.path.dirname(file_path), # os.path.basename(file_path), status=cls.STATUS_FINISHED, rowid=row_id, descriptor=descriptor) - async def start(self, node: Optional['Node'] = None, timeout: Optional[float] = None, + async def start(self, timeout: Optional[float] = None, save_now: bool = False): timeout = timeout or self.config.download_timeout if self._running.is_set(): @@ -147,7 +147,7 @@ class ManagedStream(ManagedDownloadSource): log.info("start downloader for stream (sd hash: %s)", self.sd_hash) self._running.set() try: - await asyncio.wait_for(self.downloader.start(node), timeout, loop=self.loop) + await asyncio.wait_for(self.downloader.start(), timeout, loop=self.loop) except asyncio.TimeoutError: self._running.clear() raise DownloadSDTimeoutError(self.sd_hash) @@ -157,6 +157,11 @@ class ManagedStream(ManagedDownloadSource): self.delayed_stop_task = self.loop.create_task(self._delayed_stop()) if not await self.blob_manager.storage.file_exists(self.sd_hash): if save_now: + if not self._file_name: + self._file_name = await get_next_available_file_name( + self.loop, self.download_directory, + self._file_name or sanitize_file_name(self.descriptor.suggested_file_name) + ) file_name, download_dir = self._file_name, self.download_directory else: file_name, download_dir = None, None @@ -281,7 +286,7 @@ class ManagedStream(ManagedDownloadSource): async def save_file(self, file_name: Optional[str] = None, download_directory: Optional[str] = None, node: Optional['Node'] = None): - await self.start(node) + await self.start() if self.file_output_task and not self.file_output_task.done(): # cancel an already running save task self.file_output_task.cancel() self.download_directory = download_directory or self.download_directory or self.config.download_dir diff --git a/lbry/stream/stream_manager.py b/lbry/stream/stream_manager.py index 491b9998c..4bc83a87f 100644 --- a/lbry/stream/stream_manager.py +++ b/lbry/stream/stream_manager.py @@ -146,8 +146,8 @@ class StreamManager(SourceManager): log.info("no DHT node given, resuming downloads trusting that we can contact reflector") if to_resume_saving: log.info("Resuming saving %i files", len(to_resume_saving)) - self.resume_saving_task = self.loop.create_task(asyncio.gather( - *(self._sources[sd_hash].save_file(file_name, download_directory, node=self.node) + self.resume_saving_task = asyncio.ensure_future(asyncio.gather( + *(self._sources[sd_hash].save_file(file_name, download_directory) for (file_name, download_directory, sd_hash) in to_resume_saving), loop=self.loop )) diff --git a/lbry/torrent/session.py b/lbry/torrent/session.py index 01e4cafa3..5214042e7 100644 --- a/lbry/torrent/session.py +++ b/lbry/torrent/session.py @@ -93,7 +93,8 @@ class TorrentSession: self._executor, libtorrent.session, settings ) await self._loop.run_in_executor( - self._executor, self._session.add_dht_router, "router.utorrent.com", 6881 + self._executor, + lambda: self._session.add_dht_router("router.utorrent.com", 6881) ) self._loop.create_task(self.process_alerts()) @@ -110,11 +111,11 @@ class TorrentSession: async def pause(self): state = await self._loop.run_in_executor( - self._executor, self._session.save_state + self._executor, lambda: self._session.save_state() ) # print(f"state:\n{state}") await self._loop.run_in_executor( - self._executor, self._session.pause + self._executor, lambda: self._session.pause() ) async def resume(self): diff --git a/tests/integration/datanetwork/test_file_commands.py b/tests/integration/datanetwork/test_file_commands.py index 128415df4..5bd338aec 100644 --- a/tests/integration/datanetwork/test_file_commands.py +++ b/tests/integration/datanetwork/test_file_commands.py @@ -227,7 +227,7 @@ class FileCommands(CommandTestCase): await self.daemon.file_manager.start() await asyncio.wait_for(self.wait_files_to_complete(), timeout=5) # if this hangs, file didn't get set completed # check that internal state got through up to the file list API - stream = self.daemon.file_manager.get_stream_by_stream_hash(file_info['stream_hash']) + stream = self.daemon.file_manager.get_filtered(stream_hash=file_info['stream_hash'])[0] file_info = (await self.file_list())[0] self.assertEqual(stream.file_name, file_info['file_name']) # checks if what the API shows is what he have at the very internal level. diff --git a/tests/unit/stream/test_managed_stream.py b/tests/unit/stream/test_managed_stream.py index dbdfa5157..64e3e3ea2 100644 --- a/tests/unit/stream/test_managed_stream.py +++ b/tests/unit/stream/test_managed_stream.py @@ -76,7 +76,8 @@ class TestManagedStream(BlobExchangeTestBase): return q2, self.loop.create_task(_task()) mock_node.accumulate_peers = mock_accumulate_peers or _mock_accumulate_peers - await self.stream.save_file(node=mock_node) + self.stream.node = mock_node + await self.stream.save_file() await self.stream.finished_write_attempt.wait() self.assertTrue(os.path.isfile(self.stream.full_path)) if stop_when_done: @@ -123,7 +124,8 @@ class TestManagedStream(BlobExchangeTestBase): mock_node.accumulate_peers = _mock_accumulate_peers - await self.stream.save_file(node=mock_node) + self.stream.node = mock_node + await self.stream.save_file() await self.stream.finished_writing.wait() self.assertTrue(os.path.isfile(self.stream.full_path)) with open(self.stream.full_path, 'rb') as f: