From 4e32b69d1d168863881ba3c26fef90eea061e740 Mon Sep 17 00:00:00 2001 From: Jack Robison Date: Fri, 3 May 2019 14:54:09 -0400 Subject: [PATCH] don't set running streams as stopped on startup --- lbrynet/stream/stream_manager.py | 39 ++++++++++---------------------- 1 file changed, 12 insertions(+), 27 deletions(-) diff --git a/lbrynet/stream/stream_manager.py b/lbrynet/stream/stream_manager.py index e1f00261b..e19f6c12e 100644 --- a/lbrynet/stream/stream_manager.py +++ b/lbrynet/stream/stream_manager.py @@ -7,7 +7,7 @@ import random from decimal import Decimal from aiohttp.web import Request from lbrynet.error import ResolveError, InvalidStreamDescriptorError, KeyFeeAboveMaxAllowed, InsufficientFundsError -from lbrynet.error import ResolveTimeout, DownloadDataTimeout +from lbrynet.error import ResolveTimeout, DownloadDataTimeout, DownloadSDTimeout from lbrynet.utils import cache_concurrent from lbrynet.stream.descriptor import StreamDescriptor from lbrynet.stream.managed_stream import ManagedStream @@ -124,29 +124,19 @@ class StreamManager: async def load_streams_from_database(self): to_recover = [] to_start = [] - await self.storage.sync_files_to_blobs() + + # this will set streams marked as finished and are missing blobs as being stopped + # await self.storage.sync_files_to_blobs() for file_info in await self.storage.get_all_lbry_files(): + # if the sd blob is not verified, try to reconstruct it from the database + # this could either be because the blob files were deleted manually or save_blobs was not true when + # the stream was downloaded if not self.blob_manager.is_blob_verified(file_info['sd_hash']): to_recover.append(file_info) to_start.append(file_info) if to_recover: - # if self.blob_manager._save_blobs: - # log.info("Attempting to recover %i streams", len(to_recover)) await self.recover_streams(to_recover) - if not self.config.save_files: - # set files that have been deleted manually to streaming mode - to_set_as_streaming = [] - for file_info in to_start: - file_name = path_or_none(file_info['file_name']) - download_dir = path_or_none(file_info['download_directory']) - if file_name and download_dir and not os.path.isfile(os.path.join(file_name, download_dir)): - file_info['file_name'], file_info['download_directory'] = '{stream}', '{stream}' - to_set_as_streaming.append(file_info['stream_hash']) - - if to_set_as_streaming: - await self.storage.set_files_as_streaming(to_set_as_streaming) - log.info("Initializing %i files", len(to_start)) if to_start: await asyncio.gather(*[ @@ -162,8 +152,9 @@ class StreamManager: if not self.node: log.warning("no DHT node given, resuming downloads trusting that we can contact reflector") t = [ - self.loop.create_task(stream.start(node=self.node)) for stream in self.streams.values() - if stream.running + self.loop.create_task( + stream.start(node=self.node, save_now=(stream.full_path is not None)) + ) for stream in self.streams.values() if stream.running ] if t: log.info("resuming %i downloads", len(t)) @@ -207,6 +198,7 @@ class StreamManager: while self.running_reflector_uploads: self.running_reflector_uploads.pop().cancel() self.started.clear() + log.info("finished stopping the stream manager") async def create_stream(self, file_path: str, key: typing.Optional[bytes] = None, iv_generator: typing.Optional[typing.Generator[bytes, None, None]] = None) -> ManagedStream: @@ -390,14 +382,9 @@ class StreamManager: stream.set_claim(resolved, claim) await self.storage.save_content_claim(stream.stream_hash, outpoint) return stream - except DownloadDataTimeout as err: # forgive data timeout, dont delete stream + except Exception as err: # forgive data timeout, dont delete stream error = err raise - except Exception as err: - error = err - if stream and stream.descriptor: - await self.storage.delete_stream(stream.descriptor) - await self.blob_manager.delete_blob(stream.sd_hash) finally: if self.analytics_manager and (error or (stream and (stream.downloader.time_to_descriptor or stream.downloader.time_to_first_bytes))): @@ -417,8 +404,6 @@ class StreamManager: None if not error else error.__class__.__name__ ) ) - if error: - raise error async def stream_partial_content(self, request: Request, sd_hash: str): return await self.streams[sd_hash].stream_file(request, self.node)