From 744375b2c0373395f7d295d7a8bf73f10b5a02ca Mon Sep 17 00:00:00 2001
From: Jack Robison
Date: Fri, 1 Feb 2019 15:46:31 -0500
Subject: [PATCH] re-assemble file / resume downloads

---
 lbrynet/extras/daemon/Daemon.py  |  5 +++-
 lbrynet/extras/daemon/storage.py |  6 +++++
 lbrynet/stream/managed_stream.py |  5 ++++
 lbrynet/stream/stream_manager.py | 39 +++++++++++++++++++++++---------
 4 files changed, 43 insertions(+), 12 deletions(-)

diff --git a/lbrynet/extras/daemon/Daemon.py b/lbrynet/extras/daemon/Daemon.py
index a7d4b167b..c0566765f 100644
--- a/lbrynet/extras/daemon/Daemon.py
+++ b/lbrynet/extras/daemon/Daemon.py
@@ -1581,6 +1581,9 @@ class Daemon(metaclass=JSONRPCServerType):
         if existing:
             log.info("already have matching stream for %s", uri)
             stream = existing[0]
+            if not stream.running:
+                log.info("resuming download")
+                await self.stream_manager.start_stream(stream)
         else:
             stream = await self.stream_manager.download_stream_from_claim(
                 self.dht_node, resolved, file_name, timeout, fee_amount, fee_address
@@ -1618,7 +1621,7 @@
             raise Exception(f'Unable to find a file for {kwargs}')
         stream = streams[0]
         if status == 'start' and not stream.running and not stream.finished:
-            stream.downloader.download(self.dht_node)
+            await self.stream_manager.start_stream(stream)
             msg = "Resumed download"
         elif status == 'stop' and stream.running:
             stream.stop_download()
diff --git a/lbrynet/extras/daemon/storage.py b/lbrynet/extras/daemon/storage.py
index 084b0c11d..650054e4a 100644
--- a/lbrynet/extras/daemon/storage.py
+++ b/lbrynet/extras/daemon/storage.py
@@ -447,6 +447,12 @@ class SQLiteStorage(SQLiteMixin):
         log.info("update file status %s -> %s", stream_hash, new_status)
         return self.db.execute("update file set status=? where stream_hash=?", (new_status, stream_hash))
 
+    def change_file_download_dir(self, stream_hash: str, download_dir: str):
+        log.info("update file download directory %s -> %s", stream_hash, download_dir)
+        return self.db.execute("update file set download_directory=? where stream_hash=?", (
+            binascii.hexlify(download_dir.encode()).decode(), stream_hash
+        ))
+
     def get_all_stream_hashes(self):
         return self.run_and_return_list("select stream_hash from stream")
 
diff --git a/lbrynet/stream/managed_stream.py b/lbrynet/stream/managed_stream.py
index f5c4f957b..6ca488031 100644
--- a/lbrynet/stream/managed_stream.py
+++ b/lbrynet/stream/managed_stream.py
@@ -11,6 +11,7 @@ from lbrynet.extras.daemon.storage import StoredStreamClaim
 if typing.TYPE_CHECKING:
     from lbrynet.schema.claim import ClaimDict
     from lbrynet.blob.blob_manager import BlobFileManager
+    from lbrynet.dht.node import Node
 
 log = logging.getLogger(__name__)
 
@@ -155,6 +156,10 @@ class ManagedStream:
         return cls(loop, blob_manager, descriptor, os.path.dirname(file_path), os.path.basename(file_path),
                    status=cls.STATUS_FINISHED)
 
+    def start_download(self, node: typing.Optional['Node']):
+        self.downloader.download(node)
+        self.update_status(self.STATUS_RUNNING)
+
     def stop_download(self):
         if self.downloader:
             self.downloader.stop()
diff --git a/lbrynet/stream/stream_manager.py b/lbrynet/stream/stream_manager.py
index 164b1cda6..b38a0dfc5 100644
--- a/lbrynet/stream/stream_manager.py
+++ b/lbrynet/stream/stream_manager.py
@@ -63,15 +63,35 @@ class StreamManager:
         claim_info = await self.storage.get_content_claim(stream.stream_hash)
         stream.set_claim(claim_info, smart_decode(claim_info['value']))
 
+    async def start_stream(self, stream: ManagedStream):
+        path = os.path.join(stream.download_directory, stream.file_name)
+
+        if not stream.running or not os.path.isfile(path):
+            if stream.downloader:
+                stream.downloader.stop()
+                stream.downloader = None
+            if not os.path.isfile(path) and not os.path.isfile(
+                    os.path.join(self.config.download_dir, stream.file_name)):
+                await self.storage.change_file_download_dir(stream.stream_hash, self.config.download_dir)
+                stream.download_directory = self.config.download_dir
+            stream.downloader = self.make_downloader(
+                stream.sd_hash, stream.download_directory, stream.file_name
+            )
+            stream.start_download(self.node)
+            await self.storage.change_file_status(stream.stream_hash, 'running')
+            stream.update_status('running')
+            self.wait_for_stream_finished(stream)
+
+    def make_downloader(self, sd_hash: str, download_directory: str, file_name: str):
+        return StreamDownloader(
+            self.loop, self.config, self.blob_manager, sd_hash, download_directory, file_name
+        )
+
     async def add_stream(self, sd_hash: str, file_name: str, download_directory: str, status: str, claim):
         sd_blob = self.blob_manager.get_blob(sd_hash)
         if sd_blob.get_is_verified():
             descriptor = await self.blob_manager.get_stream_descriptor(sd_blob.blob_hash)
-            downloader = StreamDownloader(
-                self.loop, self.config, self.blob_manager, descriptor.sd_hash,
-                download_directory,
-                file_name
-            )
+            downloader = self.make_downloader(descriptor.sd_hash, download_directory, file_name)
             stream = ManagedStream(
                 self.loop, self.blob_manager, descriptor, download_directory,
@@ -96,13 +116,9 @@
             return
         await self.node.joined.wait()
-        resumed = 0
-        for stream in self.streams:
-            if stream.status == ManagedStream.STATUS_RUNNING:
-                resumed += 1
-                stream.downloader.download(self.node)
-                self.wait_for_stream_finished(stream)
-        if resumed:
-            log.info("resuming %i downloads", resumed)
+        t = [self.start_stream(stream) for stream in self.streams if stream.status == ManagedStream.STATUS_RUNNING]
+        if t:
+            log.info("resuming %i downloads", len(t))
+            await asyncio.gather(*t, loop=self.loop)
 
     async def reflect_streams(self):
         streams = list(self.streams)
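
The core of the patch is the resume decision in StreamManager.start_stream(): any stale downloader is torn down, and only if the assembled file is missing from both the directory recorded in the database and the configured download directory is the stream repointed at the configured directory (persisted via change_file_download_dir) before re-downloading. The sketch below restates that decision in isolation; the helper name resolve_resume_directory() and its return convention are illustrative assumptions for this note, not part of the lbrynet API.

import os
import typing


def resolve_resume_directory(recorded_dir: str, file_name: str,
                             configured_dir: str) -> typing.Tuple[str, bool]:
    """Return (directory to resume into, whether the stored location must change).

    Mirrors start_stream() above: the stream is repointed at the configured
    download directory only when the file can be found in neither location.
    """
    recorded_path = os.path.join(recorded_dir, file_name)
    fallback_path = os.path.join(configured_dir, file_name)
    if not os.path.isfile(recorded_path) and not os.path.isfile(fallback_path):
        # no copy of the file exists on disk any more: re-assemble it in the
        # configured download directory and persist the new location
        return configured_dir, True
    # at least one known copy still exists; keep the directory recorded in the
    # database, as the patch does
    return recorded_dir, False

For example, resolve_resume_directory('/old/downloads', 'video.mp4', '/home/user/Downloads') returns ('/home/user/Downloads', True) when neither path exists on disk, which corresponds to the branch in start_stream() that calls change_file_download_dir() and rebuilds the downloader.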