don't set running streams as stopped on startup

This commit is contained in:
Jack Robison 2019-05-03 14:54:09 -04:00
parent 1116c7f29e
commit 4e32b69d1d
No known key found for this signature in database
GPG key ID: DF25C68FE0239BB2

View file

@ -7,7 +7,7 @@ import random
from decimal import Decimal from decimal import Decimal
from aiohttp.web import Request from aiohttp.web import Request
from lbrynet.error import ResolveError, InvalidStreamDescriptorError, KeyFeeAboveMaxAllowed, InsufficientFundsError from lbrynet.error import ResolveError, InvalidStreamDescriptorError, KeyFeeAboveMaxAllowed, InsufficientFundsError
from lbrynet.error import ResolveTimeout, DownloadDataTimeout from lbrynet.error import ResolveTimeout, DownloadDataTimeout, DownloadSDTimeout
from lbrynet.utils import cache_concurrent from lbrynet.utils import cache_concurrent
from lbrynet.stream.descriptor import StreamDescriptor from lbrynet.stream.descriptor import StreamDescriptor
from lbrynet.stream.managed_stream import ManagedStream from lbrynet.stream.managed_stream import ManagedStream
@ -124,29 +124,19 @@ class StreamManager:
async def load_streams_from_database(self): async def load_streams_from_database(self):
to_recover = [] to_recover = []
to_start = [] to_start = []
await self.storage.sync_files_to_blobs()
# this will set streams marked as finished and are missing blobs as being stopped
# await self.storage.sync_files_to_blobs()
for file_info in await self.storage.get_all_lbry_files(): for file_info in await self.storage.get_all_lbry_files():
# if the sd blob is not verified, try to reconstruct it from the database
# this could either be because the blob files were deleted manually or save_blobs was not true when
# the stream was downloaded
if not self.blob_manager.is_blob_verified(file_info['sd_hash']): if not self.blob_manager.is_blob_verified(file_info['sd_hash']):
to_recover.append(file_info) to_recover.append(file_info)
to_start.append(file_info) to_start.append(file_info)
if to_recover: if to_recover:
# if self.blob_manager._save_blobs:
# log.info("Attempting to recover %i streams", len(to_recover))
await self.recover_streams(to_recover) await self.recover_streams(to_recover)
if not self.config.save_files:
# set files that have been deleted manually to streaming mode
to_set_as_streaming = []
for file_info in to_start:
file_name = path_or_none(file_info['file_name'])
download_dir = path_or_none(file_info['download_directory'])
if file_name and download_dir and not os.path.isfile(os.path.join(file_name, download_dir)):
file_info['file_name'], file_info['download_directory'] = '{stream}', '{stream}'
to_set_as_streaming.append(file_info['stream_hash'])
if to_set_as_streaming:
await self.storage.set_files_as_streaming(to_set_as_streaming)
log.info("Initializing %i files", len(to_start)) log.info("Initializing %i files", len(to_start))
if to_start: if to_start:
await asyncio.gather(*[ await asyncio.gather(*[
@ -162,8 +152,9 @@ class StreamManager:
if not self.node: if not self.node:
log.warning("no DHT node given, resuming downloads trusting that we can contact reflector") log.warning("no DHT node given, resuming downloads trusting that we can contact reflector")
t = [ t = [
self.loop.create_task(stream.start(node=self.node)) for stream in self.streams.values() self.loop.create_task(
if stream.running stream.start(node=self.node, save_now=(stream.full_path is not None))
) for stream in self.streams.values() if stream.running
] ]
if t: if t:
log.info("resuming %i downloads", len(t)) log.info("resuming %i downloads", len(t))
@ -207,6 +198,7 @@ class StreamManager:
while self.running_reflector_uploads: while self.running_reflector_uploads:
self.running_reflector_uploads.pop().cancel() self.running_reflector_uploads.pop().cancel()
self.started.clear() self.started.clear()
log.info("finished stopping the stream manager")
async def create_stream(self, file_path: str, key: typing.Optional[bytes] = None, async def create_stream(self, file_path: str, key: typing.Optional[bytes] = None,
iv_generator: typing.Optional[typing.Generator[bytes, None, None]] = None) -> ManagedStream: iv_generator: typing.Optional[typing.Generator[bytes, None, None]] = None) -> ManagedStream:
@ -390,14 +382,9 @@ class StreamManager:
stream.set_claim(resolved, claim) stream.set_claim(resolved, claim)
await self.storage.save_content_claim(stream.stream_hash, outpoint) await self.storage.save_content_claim(stream.stream_hash, outpoint)
return stream return stream
except DownloadDataTimeout as err: # forgive data timeout, dont delete stream except Exception as err: # forgive data timeout, dont delete stream
error = err error = err
raise raise
except Exception as err:
error = err
if stream and stream.descriptor:
await self.storage.delete_stream(stream.descriptor)
await self.blob_manager.delete_blob(stream.sd_hash)
finally: finally:
if self.analytics_manager and (error or (stream and (stream.downloader.time_to_descriptor or if self.analytics_manager and (error or (stream and (stream.downloader.time_to_descriptor or
stream.downloader.time_to_first_bytes))): stream.downloader.time_to_first_bytes))):
@ -417,8 +404,6 @@ class StreamManager:
None if not error else error.__class__.__name__ None if not error else error.__class__.__name__
) )
) )
if error:
raise error
async def stream_partial_content(self, request: Request, sd_hash: str): async def stream_partial_content(self, request: Request, sd_hash: str):
return await self.streams[sd_hash].stream_file(request, self.node) return await self.streams[sd_hash].stream_file(request, self.node)