From c1c6d5bc9911913d1a6c2352a3d25b06906bf660 Mon Sep 17 00:00:00 2001 From: Jack Robison Date: Wed, 6 Feb 2019 09:29:19 -0500 Subject: [PATCH] fix deleting partial downloads when stopped and previous streams when updating a publish --- lbrynet/blob/blob_manager.py | 38 ++++++++++++------------ lbrynet/extras/daemon/Daemon.py | 2 +- lbrynet/stream/assembler.py | 51 ++++++++++++++++++-------------- lbrynet/stream/downloader.py | 4 +++ lbrynet/stream/managed_stream.py | 13 ++++---- lbrynet/stream/stream_manager.py | 50 +++++++++++++++++++------------ 6 files changed, 90 insertions(+), 68 deletions(-) diff --git a/lbrynet/blob/blob_manager.py b/lbrynet/blob/blob_manager.py index 44d8fa8b5..f079ccb8f 100644 --- a/lbrynet/blob/blob_manager.py +++ b/lbrynet/blob/blob_manager.py @@ -63,23 +63,23 @@ class BlobFileManager: blob_hashes = await self.storage.get_all_blob_hashes() return self.check_completed_blobs(blob_hashes) - async def delete_blobs(self, blob_hashes: typing.List[str]): - bh_to_delete_from_db = [] - for blob_hash in blob_hashes: - if not blob_hash: - continue - try: - blob = self.get_blob(blob_hash) - await blob.delete() - bh_to_delete_from_db.append(blob_hash) - except Exception as e: - log.warning("Failed to delete blob file. Reason: %s", e) - if blob_hash in self.completed_blob_hashes: - self.completed_blob_hashes.remove(blob_hash) - if blob_hash in self.blobs: - del self.blobs[blob_hash] + async def delete_blob(self, blob_hash: str): try: - await self.storage.delete_blobs_from_db(bh_to_delete_from_db) - except IntegrityError as err: - if str(err) != "FOREIGN KEY constraint failed": - raise err + blob = self.get_blob(blob_hash) + await blob.delete() + except Exception as e: + log.warning("Failed to delete blob file. Reason: %s", e) + if blob_hash in self.completed_blob_hashes: + self.completed_blob_hashes.remove(blob_hash) + if blob_hash in self.blobs: + del self.blobs[blob_hash] + + async def delete_blobs(self, blob_hashes: typing.List[str], delete_from_db: typing.Optional[bool] = True): + bh_to_delete_from_db = [] + await asyncio.gather(*map(self.delete_blob, blob_hashes), loop=self.loop) + if delete_from_db: + try: + await self.storage.delete_blobs_from_db(bh_to_delete_from_db) + except IntegrityError as err: + if str(err) != "FOREIGN KEY constraint failed": + raise err diff --git a/lbrynet/extras/daemon/Daemon.py b/lbrynet/extras/daemon/Daemon.py index 7f19be3ee..f2f23625a 100644 --- a/lbrynet/extras/daemon/Daemon.py +++ b/lbrynet/extras/daemon/Daemon.py @@ -1614,7 +1614,7 @@ class Daemon(metaclass=JSONRPCServerType): await self.stream_manager.start_stream(stream) msg = "Resumed download" elif status == 'stop' and stream.running: - stream.stop_download() + await self.stream_manager.stop_stream(stream) msg = "Stopped download" else: msg = ( diff --git a/lbrynet/stream/assembler.py b/lbrynet/stream/assembler.py index 6b325d248..ae53dcc9a 100644 --- a/lbrynet/stream/assembler.py +++ b/lbrynet/stream/assembler.py @@ -43,7 +43,7 @@ class StreamAssembler: self.written_bytes: int = 0 async def _decrypt_blob(self, blob: 'BlobFile', blob_info: 'BlobInfo', key: str): - if not blob or self.stream_handle.closed: + if not blob or not self.stream_handle or self.stream_handle.closed: return False def _decrypt_and_write(): @@ -86,28 +86,35 @@ class StreamAssembler: self.sd_blob, self.descriptor ) await self.blob_manager.blob_completed(self.sd_blob) - with open(self.output_path, 'wb') as stream_handle: - self.stream_handle = stream_handle - for i, blob_info in enumerate(self.descriptor.blobs[:-1]): - if blob_info.blob_num != i: - log.error("sd blob %s is invalid, cannot assemble stream", self.descriptor.sd_hash) - return - while not stream_handle.closed: - try: - blob = await self.get_blob(blob_info.blob_hash, blob_info.length) - if await self._decrypt_blob(blob, blob_info, self.descriptor.key): - await self.blob_manager.blob_completed(blob) - break - except FileNotFoundError: - log.debug("stream assembler stopped") + written_blobs = None + try: + with open(self.output_path, 'wb') as stream_handle: + self.stream_handle = stream_handle + for i, blob_info in enumerate(self.descriptor.blobs[:-1]): + if blob_info.blob_num != i: + log.error("sd blob %s is invalid, cannot assemble stream", self.descriptor.sd_hash) return - except (ValueError, IOError, OSError): - log.warning("failed to decrypt blob %s for stream %s", blob_info.blob_hash, - self.descriptor.sd_hash) - continue - - self.stream_finished_event.set() - await self.after_finished() + while self.stream_handle and not self.stream_handle.closed: + try: + blob = await self.get_blob(blob_info.blob_hash, blob_info.length) + if await self._decrypt_blob(blob, blob_info, self.descriptor.key): + await self.blob_manager.blob_completed(blob) + written_blobs = i + break + except FileNotFoundError: + log.debug("stream assembler stopped") + return + except (ValueError, IOError, OSError): + log.warning("failed to decrypt blob %s for stream %s", blob_info.blob_hash, + self.descriptor.sd_hash) + continue + finally: + if written_blobs == len(self.descriptor.blobs) - 1: + log.debug("finished decrypting and assembling stream") + self.stream_finished_event.set() + await self.after_finished() + else: + log.debug("stream decryption and assembly did not finish") async def get_blob(self, blob_hash: str, length: typing.Optional[int] = None) -> 'BlobFile': return self.blob_manager.get_blob(blob_hash, length) diff --git a/lbrynet/stream/downloader.py b/lbrynet/stream/downloader.py index 3faedefd1..e9142c2c4 100644 --- a/lbrynet/stream/downloader.py +++ b/lbrynet/stream/downloader.py @@ -63,6 +63,10 @@ class StreamDownloader(StreamAssembler): self.fixed_peers_handle.cancel() self.fixed_peers_handle = None self.blob_downloader = None + if self.stream_handle: + if not self.stream_handle.closed: + self.stream_handle.close() + self.stream_handle = None async def get_blob(self, blob_hash: str, length: typing.Optional[int] = None) -> 'BlobFile': return await self.blob_downloader.download_blob(blob_hash, length) diff --git a/lbrynet/stream/managed_stream.py b/lbrynet/stream/managed_stream.py index e8965318a..ecfac63bd 100644 --- a/lbrynet/stream/managed_stream.py +++ b/lbrynet/stream/managed_stream.py @@ -104,8 +104,12 @@ class ManagedStream: def blobs_remaining(self) -> int: return self.blobs_in_stream - self.blobs_completed + @property + def full_path(self) -> str: + return os.path.join(self.download_directory, os.path.basename(self.file_name)) + def as_dict(self) -> typing.Dict: - full_path = os.path.join(self.download_directory, self.file_name) + full_path = self.full_path if not os.path.isfile(full_path): full_path = None mime_type = guess_media_type(os.path.basename(self.file_name)) @@ -170,12 +174,7 @@ class ManagedStream: def stop_download(self): if self.downloader: self.downloader.stop() - if not self.downloader.stream_finished_event.is_set() and self.downloader.wrote_bytes_event.is_set(): - path = os.path.join(self.download_directory, self.file_name) - if os.path.isfile(path): - os.remove(path) - if not self.finished: - self.update_status(self.STATUS_STOPPED) + self.downloader = None async def upload_to_reflector(self, host: str, port: int) -> typing.List[str]: sent = [] diff --git a/lbrynet/stream/stream_manager.py b/lbrynet/stream/stream_manager.py index 4adad9639..1b77d572c 100644 --- a/lbrynet/stream/stream_manager.py +++ b/lbrynet/stream/stream_manager.py @@ -4,7 +4,7 @@ import typing import binascii import logging import random -from lbrynet.error import ResolveError +from lbrynet.error import ResolveError, InvalidStreamDescriptorError from lbrynet.stream.downloader import StreamDownloader from lbrynet.stream.managed_stream import ManagedStream from lbrynet.schema.claim import ClaimDict @@ -97,8 +97,9 @@ class StreamManager: await asyncio.wait_for(self.loop.create_task(stream.downloader.got_descriptor.wait()), self.config.download_timeout) except asyncio.TimeoutError: - stream.stop_download() - stream.downloader = None + await self.stop_stream(stream) + if stream in self.streams: + self.streams.remove(stream) return False file_name = os.path.basename(stream.downloader.output_path) await self.storage.change_file_download_dir_and_file_name( @@ -108,6 +109,18 @@ class StreamManager: return True return True + async def stop_stream(self, stream: ManagedStream): + stream.stop_download() + if not stream.finished and os.path.isfile(stream.full_path): + try: + os.remove(stream.full_path) + except OSError as err: + log.warning("Failed to delete partial download %s from downloads directory: %s", stream.full_path, + str(err)) + if stream.running: + stream.update_status(ManagedStream.STATUS_STOPPED) + await self.storage.change_file_status(stream.stream_hash, ManagedStream.STATUS_STOPPED) + def make_downloader(self, sd_hash: str, download_directory: str, file_name: str): return StreamDownloader( self.loop, self.config, self.blob_manager, sd_hash, download_directory, file_name @@ -116,13 +129,15 @@ class StreamManager: async def add_stream(self, sd_hash: str, file_name: str, download_directory: str, status: str, claim): sd_blob = self.blob_manager.get_blob(sd_hash) if sd_blob.get_is_verified(): - descriptor = await self.blob_manager.get_stream_descriptor(sd_blob.blob_hash) + try: + descriptor = await self.blob_manager.get_stream_descriptor(sd_blob.blob_hash) + except InvalidStreamDescriptorError as err: + log.warning("Failed to start stream for sd %s - %s", sd_hash, str(err)) + return + downloader = self.make_downloader(descriptor.sd_hash, download_directory, file_name) stream = ManagedStream( - self.loop, self.blob_manager, descriptor, - download_directory, - file_name, - downloader, status, claim + self.loop, self.blob_manager, descriptor, download_directory, file_name, downloader, status, claim ) self.streams.add(stream) self.storage.content_claim_callbacks[stream.stream_hash] = lambda: self._update_content_claim(stream) @@ -194,18 +209,14 @@ class StreamManager: return stream async def delete_stream(self, stream: ManagedStream, delete_file: typing.Optional[bool] = False): - stream.stop_download() - self.streams.remove(stream) + await self.stop_stream(stream) + if stream in self.streams: + self.streams.remove(stream) + blob_hashes = [stream.sd_hash] + [b.blob_hash for b in stream.descriptor.blobs[:-1]] + await self.blob_manager.delete_blobs(blob_hashes, delete_from_db=False) await self.storage.delete_stream(stream.descriptor) - - blob_hashes = [stream.sd_hash] - for blob_info in stream.descriptor.blobs[:-1]: - blob_hashes.append(blob_info.blob_hash) - await self.blob_manager.delete_blobs(blob_hashes) - if delete_file: - path = os.path.join(stream.download_directory, stream.file_name) - if os.path.isfile(path): - os.remove(path) + if delete_file and os.path.isfile(stream.full_path): + os.remove(stream.full_path) def wait_for_stream_finished(self, stream: ManagedStream): async def _wait_for_stream_finished(): @@ -213,6 +224,7 @@ class StreamManager: try: await stream.downloader.stream_finished_event.wait() stream.update_status(ManagedStream.STATUS_FINISHED) + await self.storage.change_file_status(stream.stream_hash, ManagedStream.STATUS_FINISHED) except asyncio.CancelledError: pass task = self.loop.create_task(_wait_for_stream_finished())