From d035c138837fb7691ad5e330bc042858c0f1720e Mon Sep 17 00:00:00 2001 From: Victor Shyba Date: Tue, 19 Feb 2019 21:44:31 -0300 Subject: [PATCH 1/2] file_path can be None, add helper to check if stream output file exists instead --- lbrynet/stream/managed_stream.py | 18 ++++++++++-------- lbrynet/stream/stream_manager.py | 9 +++------ 2 files changed, 13 insertions(+), 14 deletions(-) diff --git a/lbrynet/stream/managed_stream.py b/lbrynet/stream/managed_stream.py index 02d851af7..5e7404e08 100644 --- a/lbrynet/stream/managed_stream.py +++ b/lbrynet/stream/managed_stream.py @@ -22,7 +22,7 @@ class ManagedStream: STATUS_FINISHED = "finished" def __init__(self, loop: asyncio.BaseEventLoop, blob_manager: 'BlobFileManager', rowid: int, - descriptor: 'StreamDescriptor', download_directory: str, file_name: str, + descriptor: 'StreamDescriptor', download_directory: str, file_name: typing.Optional[str], downloader: typing.Optional[StreamDownloader] = None, status: typing.Optional[str] = STATUS_STOPPED, claim: typing.Optional[StoredStreamClaim] = None): self.loop = loop @@ -39,7 +39,7 @@ class ManagedStream: self.tx = None @property - def file_name(self): + def file_name(self) -> typing.Optional[str]: return self.downloader.output_file_name if self.downloader else self._file_name @property @@ -112,14 +112,16 @@ class ManagedStream: return self.blobs_in_stream - self.blobs_completed @property - def full_path(self) -> str: - return os.path.join(self.download_directory, os.path.basename(self.file_name)) + def full_path(self) -> typing.Optional[str]: + return os.path.join(self.download_directory, os.path.basename(self.file_name)) if self.file_name else None + + @property + def output_file_exists(self): + return os.path.isfile(self.full_path) if self.full_path else False def as_dict(self) -> typing.Dict: - full_path = self.full_path - if not os.path.isfile(full_path): - full_path = None - mime_type = guess_media_type(os.path.basename(self.file_name)) + full_path = self.full_path if self.output_file_exists else None + mime_type = guess_media_type(os.path.basename(self.descriptor.suggested_file_name)) if self.downloader and self.downloader.written_bytes: written_bytes = self.downloader.written_bytes diff --git a/lbrynet/stream/stream_manager.py b/lbrynet/stream/stream_manager.py index cca6cf078..ced6a303c 100644 --- a/lbrynet/stream/stream_manager.py +++ b/lbrynet/stream/stream_manager.py @@ -76,10 +76,7 @@ class StreamManager: """ Resume or rebuild a partial or completed stream """ - - path = os.path.join(stream.download_directory, stream.file_name) - - if not stream.running and not os.path.isfile(path): + if not stream.running and not stream.output_file_exists: if stream.downloader: stream.downloader.stop() stream.downloader = None @@ -117,7 +114,7 @@ class StreamManager: async def stop_stream(self, stream: ManagedStream): stream.stop_download() - if not stream.finished and os.path.isfile(stream.full_path): + if not stream.finished and stream.output_file_exists: try: os.remove(stream.full_path) except OSError as err: @@ -272,7 +269,7 @@ class StreamManager: blob_hashes = [stream.sd_hash] + [b.blob_hash for b in stream.descriptor.blobs[:-1]] await self.blob_manager.delete_blobs(blob_hashes, delete_from_db=False) await self.storage.delete_stream(stream.descriptor) - if delete_file and os.path.isfile(stream.full_path): + if delete_file and stream.output_file_exists: os.remove(stream.full_path) def wait_for_stream_finished(self, stream: ManagedStream): From 38ed212c01ef046151f5624102d6d9f35aacf7ab Mon Sep 17 00:00:00 2001 From: Victor Shyba Date: Tue, 19 Feb 2019 21:44:50 -0300 Subject: [PATCH 2/2] refactor, dont call storage from blob_manager + missing await --- lbrynet/stream/stream_manager.py | 27 +++++++++++++++------------ 1 file changed, 15 insertions(+), 12 deletions(-) diff --git a/lbrynet/stream/stream_manager.py b/lbrynet/stream/stream_manager.py index ced6a303c..436620fa4 100644 --- a/lbrynet/stream/stream_manager.py +++ b/lbrynet/stream/stream_manager.py @@ -302,18 +302,8 @@ class StreamManager: downloader.stop() log.info("stopped stream") raise DownloadSDTimeout(downloader.sd_hash) - file_name = os.path.basename(downloader.output_path) - download_directory = os.path.dirname(downloader.output_path) - if not await self.blob_manager.storage.stream_exists(downloader.sd_hash): - await self.blob_manager.storage.store_stream(downloader.sd_blob, downloader.descriptor) - if not await self.blob_manager.storage.file_exists(downloader.sd_hash): - rowid = await self.blob_manager.storage.save_downloaded_file( - downloader.descriptor.stream_hash, file_name, download_directory, - 0.0 - ) - else: - rowid = self.blob_manager.storage.rowid_for_stream(downloader.descriptor.stream_hash) - await self.blob_manager.storage.save_content_claim( + rowid = await self._store_stream(downloader) + await self.storage.save_content_claim( downloader.descriptor.stream_hash, f"{claim_info['txid']}:{claim_info['nout']}" ) stream = ManagedStream(self.loop, self.blob_manager, rowid, downloader.descriptor, download_directory, @@ -330,6 +320,19 @@ class StreamManager: await self.stop_stream(stream) raise DownloadDataTimeout(downloader.sd_hash) + async def _store_stream(self, downloader: StreamDownloader) -> int: + file_name = os.path.basename(downloader.output_path) + download_directory = os.path.dirname(downloader.output_path) + if not await self.storage.stream_exists(downloader.sd_hash): + await self.storage.store_stream(downloader.sd_blob, downloader.descriptor) + if not await self.storage.file_exists(downloader.sd_hash): + return await self.storage.save_downloaded_file( + downloader.descriptor.stream_hash, file_name, download_directory, + 0.0 + ) + else: + return await self.storage.rowid_for_stream(downloader.descriptor.stream_hash) + async def download_stream_from_claim(self, node: 'Node', claim_info: typing.Dict, file_name: typing.Optional[str] = None, timeout: typing.Optional[float] = 60,