diff --git a/lbrynet/blob/blob_info.py b/lbrynet/blob/blob_info.py
index d300fd905..febfdcb65 100644
--- a/lbrynet/blob/blob_info.py
+++ b/lbrynet/blob/blob_info.py
@@ -2,6 +2,13 @@ import typing


 class BlobInfo:
+    __slots__ = [
+        'blob_hash',
+        'blob_num',
+        'length',
+        'iv',
+    ]
+
     def __init__(self, blob_num: int, length: int, iv: str, blob_hash: typing.Optional[str] = None):
         self.blob_hash = blob_hash
         self.blob_num = blob_num
diff --git a/lbrynet/blob_exchange/client.py b/lbrynet/blob_exchange/client.py
index 67ca75cae..65af4f4cc 100644
--- a/lbrynet/blob_exchange/client.py
+++ b/lbrynet/blob_exchange/client.py
@@ -4,6 +4,7 @@ import typing
 import binascii
 from lbrynet.error import InvalidBlobHashError, InvalidDataError
 from lbrynet.blob_exchange.serialization import BlobResponse, BlobRequest
+from lbrynet.utils import cache_concurrent
 if typing.TYPE_CHECKING:
     from lbrynet.blob.blob_file import AbstractBlob
     from lbrynet.blob.writer import HashBlobWriter
@@ -158,8 +159,9 @@ class BlobExchangeClientProtocol(asyncio.Protocol):
             return await self._download_blob()
         except OSError as e:
             # i'm not sure how to fix this race condition - jack
-            log.exception("race happened downloading %s from %s:%i", blob_hash, self.peer_address, self.peer_port)
-            return self._blob_bytes_received, self.transport
+            log.warning("race happened downloading %s from %s:%i", blob_hash, self.peer_address, self.peer_port)
+            # return self._blob_bytes_received, self.transport
+            raise
         except asyncio.TimeoutError:
             if self._response_fut and not self._response_fut.done():
                 self._response_fut.cancel()
@@ -184,9 +186,10 @@ class BlobExchangeClientProtocol(asyncio.Protocol):
         self.close()


+@cache_concurrent
 async def request_blob(loop: asyncio.BaseEventLoop, blob: 'AbstractBlob', address: str, tcp_port: int,
                        peer_connect_timeout: float, blob_download_timeout: float,
-                       connected_transport: asyncio.Transport = None)\
+                       connected_transport: asyncio.Transport = None, connection_id: int = 0)\
         -> typing.Tuple[int, typing.Optional[asyncio.Transport]]:
     """
     Returns [<downloaded blob>, <keep connection>]
diff --git a/lbrynet/blob_exchange/downloader.py b/lbrynet/blob_exchange/downloader.py
index b8b854a49..1805966ec 100644
--- a/lbrynet/blob_exchange/downloader.py
+++ b/lbrynet/blob_exchange/downloader.py
@@ -27,13 +27,14 @@ class BlobDownloader:
         self.scores: typing.Dict['KademliaPeer', int] = {}
         self.failures: typing.Dict['KademliaPeer', int] = {}
         self.connections: typing.Dict['KademliaPeer', asyncio.Transport] = {}
+        self.is_running = asyncio.Event(loop=self.loop)

     def should_race_continue(self, blob: 'AbstractBlob'):
         if len(self.active_connections) >= self.config.max_connections_per_download:
             return False
         return not (blob.get_is_verified() or not blob.is_writeable())

-    async def request_blob_from_peer(self, blob: 'AbstractBlob', peer: 'KademliaPeer'):
+    async def request_blob_from_peer(self, blob: 'AbstractBlob', peer: 'KademliaPeer', connection_id: int = 0):
         if blob.get_is_verified():
             return
         self.scores[peer] = self.scores.get(peer, 0) - 1  # starts losing score, to account for cancelled ones
@@ -41,7 +42,7 @@ class BlobDownloader:
             start = self.loop.time()
             bytes_received, transport = await request_blob(
                 self.loop, blob, peer.address, peer.tcp_port, self.config.peer_connect_timeout,
-                self.config.blob_download_timeout, connected_transport=transport
+                self.config.blob_download_timeout, connected_transport=transport, connection_id=connection_id
             )
             if not transport and peer not in self.ignored:
                 self.ignored[peer] = self.loop.time()
@@ -74,12 +75,14 @@ class BlobDownloader:
             ))

     @cache_concurrent
-    async def download_blob(self, blob_hash: str, length: typing.Optional[int] = None) -> 'AbstractBlob':
+    async def download_blob(self, blob_hash: str, length: typing.Optional[int] = None,
+                            connection_id: int = 0) -> 'AbstractBlob':
         blob = self.blob_manager.get_blob(blob_hash, length)
         if blob.get_is_verified():
             return blob
+        self.is_running.set()
         try:
-            while not blob.get_is_verified():
+            while not blob.get_is_verified() and self.is_running.is_set():
                 batch: typing.Set['KademliaPeer'] = set()
                 while not self.peer_queue.empty():
                     batch.update(self.peer_queue.get_nowait())
@@ -94,7 +97,7 @@ class BlobDownloader:
                         break
                     if peer not in self.active_connections and peer not in self.ignored:
                         log.debug("request %s from %s:%i", blob_hash[:8], peer.address, peer.tcp_port)
-                        t = self.loop.create_task(self.request_blob_from_peer(blob, peer))
+                        t = self.loop.create_task(self.request_blob_from_peer(blob, peer, connection_id))
                         self.active_connections[peer] = t
                 await self.new_peer_or_finished()
                 self.cleanup_active()
@@ -106,6 +109,7 @@ class BlobDownloader:
     def close(self):
         self.scores.clear()
         self.ignored.clear()
+        self.is_running.clear()
         for transport in self.connections.values():
             transport.close()
diff --git a/lbrynet/dht/peer.py b/lbrynet/dht/peer.py
index 9c8176eb5..23bc29635 100644
--- a/lbrynet/dht/peer.py
+++ b/lbrynet/dht/peer.py
@@ -135,6 +135,15 @@ class PeerManager:


 class KademliaPeer:
+    __slots__ = [
+        'loop',
+        '_node_id',
+        'address',
+        'udp_port',
+        'tcp_port',
+        'protocol_version',
+    ]
+
     def __init__(self, loop: asyncio.BaseEventLoop, address: str, node_id: typing.Optional[bytes] = None,
                  udp_port: typing.Optional[int] = None, tcp_port: typing.Optional[int] = None):
         if node_id is not None:
diff --git a/lbrynet/extras/daemon/Daemon.py b/lbrynet/extras/daemon/Daemon.py
index 59e1b72e5..faafe7304 100644
--- a/lbrynet/extras/daemon/Daemon.py
+++ b/lbrynet/extras/daemon/Daemon.py
@@ -18,7 +18,7 @@ from torba.client.baseaccount import SingleKey, HierarchicalDeterministic

 from lbrynet import utils
 from lbrynet.conf import Config, Setting
-from lbrynet.blob.blob_file import is_valid_blobhash
+from lbrynet.blob.blob_file import is_valid_blobhash, BlobBuffer
 from lbrynet.blob_exchange.downloader import download_blob
 from lbrynet.error import DownloadSDTimeout, ComponentsNotStarted
 from lbrynet.error import NullFundsError, NegativeFundsError, ComponentStartConditionNotMet
@@ -433,15 +433,20 @@ class Daemon(metaclass=JSONRPCServerType):
             self.component_startup_task = asyncio.create_task(self.component_manager.start())
             await self.component_startup_task

-    async def stop(self):
+    async def stop(self, shutdown_runner=True):
         if self.component_startup_task is not None:
             if self.component_startup_task.done():
                 await self.component_manager.stop()
             else:
                 self.component_startup_task.cancel()
+        log.info("stopped api components")
+        if shutdown_runner:
+            await self.runner.shutdown()
         await self.runner.cleanup()
+        log.info("stopped api server")
         if self.analytics_manager.is_started:
             self.analytics_manager.stop()
+        log.info("finished shutting down")

     async def handle_old_jsonrpc(self, request):
         data = await request.json()
@@ -472,64 +477,20 @@ class Daemon(metaclass=JSONRPCServerType):
         else:
             name, claim_id = name_and_claim_id.split("/")
             uri = f"lbry://{name}#{claim_id}"
+        if not self.stream_manager.started.is_set():
+            await self.stream_manager.started.wait()
         stream = await self.jsonrpc_get(uri)
         if isinstance(stream, dict):
             raise web.HTTPServerError(text=stream['error'])
         raise web.HTTPFound(f"/stream/{stream.sd_hash}")

-    @staticmethod
-    def prepare_range_response_headers(get_range: str, stream: 'ManagedStream') -> typing.Tuple[typing.Dict[str, str],
-                                                                                                int, int]:
-        if '=' in get_range:
-            get_range = get_range.split('=')[1]
-        start, end = get_range.split('-')
-        size = 0
-        for blob in stream.descriptor.blobs[:-1]:
-            size += blob.length - 1
-        start = int(start)
-        end = int(end) if end else size - 1
-        skip_blobs = start // 2097150
-        skip = skip_blobs * 2097151
-        start = skip
-        final_size = end - start + 1
-
-        headers = {
-            'Accept-Ranges': 'bytes',
-            'Content-Range': f'bytes {start}-{end}/{size}',
-            'Content-Length': str(final_size),
-            'Content-Type': stream.mime_type
-        }
-        return headers, size, skip_blobs
-
     async def handle_stream_range_request(self, request: web.Request):
         sd_hash = request.path.split("/stream/")[1]
+        if not self.stream_manager.started.is_set():
+            await self.stream_manager.started.wait()
         if sd_hash not in self.stream_manager.streams:
             return web.HTTPNotFound()
-        stream = self.stream_manager.streams[sd_hash]
-        if stream.status == 'stopped':
-            await self.stream_manager.start_stream(stream)
-        if stream.delayed_stop:
-            stream.delayed_stop.cancel()
-        headers, size, skip_blobs = self.prepare_range_response_headers(
-            request.headers.get('range', 'bytes=0-'), stream
-        )
-        response = web.StreamResponse(
-            status=206,
-            headers=headers
-        )
-        await response.prepare(request)
-        wrote = 0
-        async for blob_info, decrypted in stream.aiter_read_stream(skip_blobs):
-            log.info("streamed blob %i/%i", blob_info.blob_num + 1, len(stream.descriptor.blobs) - 1)
-            if (blob_info.blob_num == len(stream.descriptor.blobs) - 2) or (len(decrypted) + wrote >= size):
-                decrypted += b'\x00' * (size - len(decrypted) - wrote)
-                await response.write_eof(decrypted)
-                break
-            else:
-                await response.write(decrypted)
-                wrote += len(decrypted)
-        response.force_close()
-        return response
+        return await self.stream_manager.stream_partial_content(request, sd_hash)

     async def _process_rpc_call(self, data):
         args = data.get('params', {})
@@ -907,27 +868,30 @@ class Daemon(metaclass=JSONRPCServerType):

     @requires(WALLET_COMPONENT, EXCHANGE_RATE_MANAGER_COMPONENT, BLOB_COMPONENT, DATABASE_COMPONENT,
               STREAM_MANAGER_COMPONENT, conditions=[WALLET_IS_UNLOCKED])
-    async def jsonrpc_get(self, uri, file_name=None, timeout=None, save_file=None):
+    async def jsonrpc_get(self, uri, file_name=None, download_directory=None, timeout=None, save_file=None):
         """
         Download stream from a LBRY name.

         Usage:
-            get <uri> [<file_name> | --file_name=<file_name>] [<timeout> | --timeout=<timeout>]
-                [--save_file=<save_file>]
+            get <uri> [<file_name> | --file_name=<file_name>]
+                [<download_directory> | --download_directory=<download_directory>] [<timeout> | --timeout=<timeout>]
+                [--save_file=<save_file>]

         Options:
             --uri=<uri>                                 : (str) uri of the content to download
             --file_name=<file_name>                     : (str) specified name for the downloaded file, overrides the stream file name
+            --download_directory=<download_directory>   : (str) full path to the directory to download into
             --timeout=<timeout>                         : (int) download timeout in number of seconds
             --save_file=<save_file>                     : (bool) save the file to the downloads directory

         Returns: {File}
         """
-        save_file = save_file if save_file is not None else self.conf.save_files
+        if download_directory and not os.path.isdir(download_directory):
+            return {"error": f"specified download directory \"{download_directory}\" does not exist"}
         try:
             stream = await self.stream_manager.download_stream_from_uri(
-                uri, self.exchange_rate_manager, timeout, file_name, save_file=save_file
+                uri, self.exchange_rate_manager, timeout, file_name, download_directory, save_file=save_file
             )
             if not stream:
                 raise DownloadSDTimeout(uri)
@@ -1551,10 +1515,10 @@ class Daemon(metaclass=JSONRPCServerType):
             raise Exception(f'Unable to find a file for {kwargs}')
         stream = streams[0]
         if status == 'start' and not stream.running:
-            await self.stream_manager.start_stream(stream)
+            await stream.save_file(node=self.stream_manager.node)
             msg = "Resumed download"
         elif status == 'stop' and stream.running:
-            await self.stream_manager.stop_stream(stream)
+            await stream.stop()
             msg = "Stopped download"
         else:
             msg = (
@@ -2927,10 +2891,12 @@ class Daemon(metaclass=JSONRPCServerType):
         blob = await download_blob(asyncio.get_event_loop(), self.conf, self.blob_manager, self.dht_node,
                                    blob_hash)
         if read:
-            with open(blob.file_path, 'rb') as handle:
+            with blob.reader_context() as handle:
                 return handle.read().decode()
-        else:
-            return "Downloaded blob %s" % blob_hash
+        elif isinstance(blob, BlobBuffer):
+            log.warning("manually downloaded blob buffer may have missed garbage collection, deleting it")
+            blob.delete()
+        return "Downloaded blob %s" % blob_hash

     @requires(BLOB_COMPONENT, DATABASE_COMPONENT)
     async def jsonrpc_blob_delete(self, blob_hash):
diff --git a/lbrynet/stream/descriptor.py b/lbrynet/stream/descriptor.py
index a2f4d1da2..2b0527d35 100644
--- a/lbrynet/stream/descriptor.py
+++ b/lbrynet/stream/descriptor.py
@@ -47,6 +47,17 @@ def file_reader(file_path: str):


 class StreamDescriptor:
+    __slots__ = [
+        'loop',
+        'blob_dir',
+        'stream_name',
+        'key',
+        'suggested_file_name',
+        'blobs',
+        'stream_hash',
+        'sd_hash'
+    ]
+
     def __init__(self, loop: asyncio.BaseEventLoop, blob_dir: str, stream_name: str, key: str,
                  suggested_file_name: str, blobs: typing.List[BlobInfo],
                  stream_hash: typing.Optional[str] = None, sd_hash: typing.Optional[str] = None):
diff --git a/lbrynet/stream/downloader.py b/lbrynet/stream/downloader.py
index 8c0653d4d..b152f6e57 100644
--- a/lbrynet/stream/downloader.py
+++ b/lbrynet/stream/downloader.py
@@ -58,14 +58,14 @@ class StreamDownloader:

         self.fixed_peers_handle = self.loop.call_later(self.fixed_peers_delay, _delayed_add_fixed_peers)

-    async def load_descriptor(self):
+    async def load_descriptor(self, connection_id: int = 0):
         # download or get the sd blob
         sd_blob = self.blob_manager.get_blob(self.sd_hash)
         if not sd_blob.get_is_verified():
             try:
                 now = self.loop.time()
                 sd_blob = await asyncio.wait_for(
-                    self.blob_downloader.download_blob(self.sd_hash),
+                    self.blob_downloader.download_blob(self.sd_hash, connection_id),
                     self.config.blob_download_timeout, loop=self.loop
                 )
                 log.info("downloaded sd blob %s", self.sd_hash)
@@ -79,7 +79,7 @@ class StreamDownloader:
         )
         log.info("loaded stream manifest %s", self.sd_hash)

-    async def start(self, node: typing.Optional['Node'] = None):
+    async def start(self, node: typing.Optional['Node'] = None, connection_id: int = 0):
         # set up peer accumulation
         if node:
             self.node = node
@@ -90,7 +90,7 @@ class StreamDownloader:
         log.info("searching for peers for stream %s", self.sd_hash)

         if not self.descriptor:
-            await self.load_descriptor()
+            await self.load_descriptor(connection_id)

         # add the head blob to the peer search
         self.search_queue.put_nowait(self.descriptor.blobs[0].blob_hash)
@@ -101,10 +101,10 @@ class StreamDownloader:
             self.blob_manager.get_blob(self.sd_hash, length=self.descriptor.length), self.descriptor
         )

-    async def download_stream_blob(self, blob_info: 'BlobInfo') -> 'AbstractBlob':
+    async def download_stream_blob(self, blob_info: 'BlobInfo', connection_id: int = 0) -> 'AbstractBlob':
         if not filter(lambda blob: blob.blob_hash == blob_info.blob_hash, self.descriptor.blobs[:-1]):
             raise ValueError(f"blob {blob_info.blob_hash} is not part of stream with sd hash {self.sd_hash}")
-        blob = await self.blob_downloader.download_blob(blob_info.blob_hash, blob_info.length)
+        blob = await self.blob_downloader.download_blob(blob_info.blob_hash, blob_info.length, connection_id)
         return blob

     def decrypt_blob(self, blob_info: 'BlobInfo', blob: 'AbstractBlob') -> bytes:
@@ -112,11 +112,11 @@ class StreamDownloader:
             binascii.unhexlify(self.descriptor.key.encode()), binascii.unhexlify(blob_info.iv.encode())
         )

-    async def read_blob(self, blob_info: 'BlobInfo') -> bytes:
+    async def read_blob(self, blob_info: 'BlobInfo', connection_id: int = 0) -> bytes:
         start = None
         if self.time_to_first_bytes is None:
             start = self.loop.time()
-        blob = await self.download_stream_blob(blob_info)
+        blob = await self.download_stream_blob(blob_info, connection_id)
         decrypted = self.decrypt_blob(blob_info, blob)
         if start:
             self.time_to_first_bytes = self.loop.time() - start
diff --git a/lbrynet/stream/managed_stream.py b/lbrynet/stream/managed_stream.py
index 1f86e5638..511b42738 100644
--- a/lbrynet/stream/managed_stream.py
+++ b/lbrynet/stream/managed_stream.py
@@ -3,7 +3,9 @@ import asyncio
 import typing
 import logging
 import binascii
+from aiohttp.web import Request, StreamResponse
 from lbrynet.utils import generate_id
+from lbrynet.error import DownloadSDTimeout
 from lbrynet.schema.mime_types import guess_media_type
 from lbrynet.stream.downloader import StreamDownloader
 from lbrynet.stream.descriptor import StreamDescriptor
@@ -40,6 +42,33 @@ class ManagedStream:
     STATUS_STOPPED = "stopped"
     STATUS_FINISHED = "finished"

+    __slots__ = [
+        'loop',
+        'config',
+        'blob_manager',
+        'sd_hash',
+        'download_directory',
+        '_file_name',
+        '_status',
+        'stream_claim_info',
+        'download_id',
+        'rowid',
+        'written_bytes',
+        'content_fee',
+        'downloader',
+        'analytics_manager',
+        'fully_reflected',
+        'file_output_task',
+        'delayed_stop_task',
+        'streaming_responses',
+        'streaming',
+        '_running',
+        'saving',
+        'finished_writing',
+        'started_writing',
+    ]
+
+
     def __init__(self, loop: asyncio.BaseEventLoop, config: 'Config', blob_manager: 'BlobManager', sd_hash: str,
                  download_directory: typing.Optional[str] = None, file_name: typing.Optional[str] = None,
                  status: typing.Optional[str] = STATUS_STOPPED, claim: typing.Optional[StoredStreamClaim] = None,
@@ -61,9 +90,13 @@ class ManagedStream:
         self.content_fee = content_fee
         self.downloader = StreamDownloader(self.loop, self.config, self.blob_manager, sd_hash,
                                            descriptor)
         self.analytics_manager = analytics_manager
+        self.fully_reflected = asyncio.Event(loop=self.loop)
         self.file_output_task: typing.Optional[asyncio.Task] = None
-        self.delayed_stop: typing.Optional[asyncio.Handle] = None
+        self.delayed_stop_task: typing.Optional[asyncio.Task] = None
+        self.streaming_responses: typing.List[typing.Tuple[Request, StreamResponse]] = []
+        self.streaming = asyncio.Event(loop=self.loop)
+        self._running = asyncio.Event(loop=self.loop)
         self.saving = asyncio.Event(loop=self.loop)
         self.finished_writing = asyncio.Event(loop=self.loop)
         self.started_writing = asyncio.Event(loop=self.loop)
@@ -84,9 +117,10 @@ class ManagedStream:
     def status(self) -> str:
         return self._status

-    def update_status(self, status: str):
+    async def update_status(self, status: str):
         assert status in [self.STATUS_RUNNING, self.STATUS_STOPPED, self.STATUS_FINISHED]
         self._status = status
+        await self.blob_manager.storage.change_file_status(self.stream_hash, status)

     @property
     def finished(self) -> bool:
@@ -164,14 +198,12 @@ class ManagedStream:
         return guess_media_type(os.path.basename(self.descriptor.suggested_file_name))[0]

     def as_dict(self) -> typing.Dict:
-        if self.written_bytes:
-            written_bytes = self.written_bytes
-        elif self.output_file_exists:
+        if not self.written_bytes and self.output_file_exists:
             written_bytes = os.stat(self.full_path).st_size
         else:
-            written_bytes = None
+            written_bytes = self.written_bytes
         return {
-            'completed': self.finished,
+            'completed': self.output_file_exists and self.status in ('stopped', 'finished'),
             'file_name': self.file_name,
             'download_directory': self.download_directory,
             'points_paid': 0.0,
@@ -218,72 +250,110 @@ class ManagedStream:
         return cls(loop, config, blob_manager, descriptor.sd_hash, os.path.dirname(file_path),
                    os.path.basename(file_path), status=cls.STATUS_FINISHED, rowid=row_id, descriptor=descriptor)

-    async def setup(self, node: typing.Optional['Node'] = None, save_file: typing.Optional[bool] = True,
-                    file_name: typing.Optional[str] = None, download_directory: typing.Optional[str] = None):
-        await self.downloader.start(node)
-        if not save_file and not file_name:
-            if not await self.blob_manager.storage.file_exists(self.sd_hash):
-                self.rowid = await self.blob_manager.storage.save_downloaded_file(
-                    self.stream_hash, None, None, 0.0
-                )
-            self.download_directory = None
-            self._file_name = None
-            self.update_status(ManagedStream.STATUS_RUNNING)
-            await self.blob_manager.storage.change_file_status(self.stream_hash, ManagedStream.STATUS_RUNNING)
-            self.update_delayed_stop()
-        else:
-            await self.save_file(file_name, download_directory)
-            await self.started_writing.wait()
+    async def start(self, node: typing.Optional['Node'] = None, timeout: typing.Optional[float] = None,
+                    save_now: bool = False):
+        timeout = timeout or self.config.download_timeout
+        if self._running.is_set():
+            return
+        log.info("start downloader for stream (sd hash: %s)", self.sd_hash)
+        self._running.set()
+        try:
+            await asyncio.wait_for(self.downloader.start(node), timeout, loop=self.loop)
+        except asyncio.TimeoutError:
+            self._running.clear()
+            raise DownloadSDTimeout(self.sd_hash)

-    def update_delayed_stop(self):
-        def _delayed_stop():
-            log.info("Stopping inactive download for stream %s", self.sd_hash)
-            self.stop_download()
+        if self.delayed_stop_task and not self.delayed_stop_task.done():
+            self.delayed_stop_task.cancel()
+        self.delayed_stop_task = self.loop.create_task(self._delayed_stop())
+        if not await self.blob_manager.storage.file_exists(self.sd_hash):
+            if save_now:
+                file_name, download_dir = self._file_name, self.download_directory
+            else:
+                file_name, download_dir = None, None
+            self.rowid = await self.blob_manager.storage.save_downloaded_file(
+                self.stream_hash, file_name, download_dir, 0.0
+            )
+        if self.status != self.STATUS_RUNNING:
+            await self.update_status(self.STATUS_RUNNING)

-        if self.delayed_stop:
-            self.delayed_stop.cancel()
-        self.delayed_stop = self.loop.call_later(60, _delayed_stop)
+    async def stop(self, finished: bool = False):
+        """
+        Stop any running save/stream tasks as well as the downloader and update the status in the database
+        """

-    async def aiter_read_stream(self, start_blob_num: typing.Optional[int] = 0) -> typing.AsyncIterator[
-            typing.Tuple['BlobInfo', bytes]]:
+        self.stop_tasks()
+        if (finished and self.status != self.STATUS_FINISHED) or self.status == self.STATUS_RUNNING:
+            await self.update_status(self.STATUS_FINISHED if finished else self.STATUS_STOPPED)
+
+    async def _aiter_read_stream(self, start_blob_num: typing.Optional[int] = 0, connection_id: int = 0)\
+            -> typing.AsyncIterator[typing.Tuple['BlobInfo', bytes]]:
         if start_blob_num >= len(self.descriptor.blobs[:-1]):
             raise IndexError(start_blob_num)
         for i, blob_info in enumerate(self.descriptor.blobs[start_blob_num:-1]):
             assert i + start_blob_num == blob_info.blob_num
-            if self.delayed_stop:
-                self.delayed_stop.cancel()
-            try:
-                decrypted = await self.downloader.read_blob(blob_info)
-                yield (blob_info, decrypted)
-            except asyncio.CancelledError:
-                if not self.saving.is_set() and not self.finished_writing.is_set():
-                    self.update_delayed_stop()
-                raise
+            decrypted = await self.downloader.read_blob(blob_info, connection_id)
+            yield (blob_info, decrypted)
+
+    async def stream_file(self, request: Request, node: typing.Optional['Node'] = None) -> StreamResponse:
+        log.info("stream file to browser for lbry://%s#%s (sd hash %s...)", self.claim_name, self.claim_id,
+                 self.sd_hash[:6])
+        await self.start(node)
+        headers, size, skip_blobs = self._prepare_range_response_headers(request.headers.get('range', 'bytes=0-'))
+        response = StreamResponse(
+            status=206,
+            headers=headers
+        )
+        await response.prepare(request)
+        self.streaming_responses.append((request, response))
+        self.streaming.set()
+        try:
+            wrote = 0
+            async for blob_info, decrypted in self._aiter_read_stream(skip_blobs, connection_id=2):
+                if (blob_info.blob_num == len(self.descriptor.blobs) - 2) or (len(decrypted) + wrote >= size):
+                    decrypted += (b'\x00' * (size - len(decrypted) - wrote - (skip_blobs * 2097151)))
+                    await response.write_eof(decrypted)
+                else:
+                    await response.write(decrypted)
+                wrote += len(decrypted)
+                log.info("sent browser %sblob %i/%i", "(final) " if response._eof_sent else "",
+                         blob_info.blob_num + 1, len(self.descriptor.blobs) - 1)
+                if response._eof_sent:
+                    break
+            return response
+        finally:
+            response.force_close()
+            if (request, response) in self.streaming_responses:
+                self.streaming_responses.remove((request, response))
+            if not self.streaming_responses:
+                self.streaming.clear()

     async def _save_file(self, output_path: str):
-        log.debug("save file %s -> %s", self.sd_hash, output_path)
+        log.info("save file for lbry://%s#%s (sd hash %s...) -> %s", self.claim_name, self.claim_id, self.sd_hash[:6],
+                 output_path)
         self.saving.set()
         self.finished_writing.clear()
         self.started_writing.clear()
         try:
             with open(output_path, 'wb') as file_write_handle:
-                async for blob_info, decrypted in self.aiter_read_stream():
+                async for blob_info, decrypted in self._aiter_read_stream(connection_id=1):
                     log.info("write blob %i/%i", blob_info.blob_num + 1, len(self.descriptor.blobs) - 1)
                     file_write_handle.write(decrypted)
                     file_write_handle.flush()
                     self.written_bytes += len(decrypted)
                     if not self.started_writing.is_set():
                         self.started_writing.set()
-            self.update_status(ManagedStream.STATUS_FINISHED)
-            await self.blob_manager.storage.change_file_status(self.stream_hash, ManagedStream.STATUS_FINISHED)
+            await self.update_status(ManagedStream.STATUS_FINISHED)
             if self.analytics_manager:
                 self.loop.create_task(self.analytics_manager.send_download_finished(
                     self.download_id, self.claim_name, self.sd_hash
                 ))
             self.finished_writing.set()
+            log.info("finished saving file for lbry://%s#%s (sd hash %s...) -> %s", self.claim_name, self.claim_id,
+                     self.sd_hash[:6], self.full_path)
         except Exception as err:
             if os.path.isfile(output_path):
-                log.info("removing incomplete download %s for %s", output_path, self.sd_hash)
+                log.warning("removing incomplete download %s for %s", output_path, self.sd_hash)
                 os.remove(output_path)
             if not isinstance(err, asyncio.CancelledError):
                 log.exception("unexpected error encountered writing file for stream %s", self.sd_hash)
@@ -291,12 +361,11 @@ class ManagedStream:
         finally:
             self.saving.clear()

-    async def save_file(self, file_name: typing.Optional[str] = None, download_directory: typing.Optional[str] = None):
-        if self.file_output_task and not self.file_output_task.done():
+    async def save_file(self, file_name: typing.Optional[str] = None, download_directory: typing.Optional[str] = None,
+                        node: typing.Optional['Node'] = None):
+        await self.start(node)
+        if self.file_output_task and not self.file_output_task.done():  # cancel an already running save task
             self.file_output_task.cancel()
-        if self.delayed_stop:
-            self.delayed_stop.cancel()
-            self.delayed_stop = None
         self.download_directory = download_directory or self.download_directory or self.config.download_dir
         if not self.download_directory:
             raise ValueError("no directory to download to")
@@ -305,28 +374,28 @@ class ManagedStream:
         if not os.path.isdir(self.download_directory):
             log.warning("download directory '%s' does not exist, attempting to make it", self.download_directory)
             os.mkdir(self.download_directory)
-        if not await self.blob_manager.storage.file_exists(self.sd_hash):
-            self._file_name = await get_next_available_file_name(
-                self.loop, self.download_directory,
-                file_name or self._file_name or self.descriptor.suggested_file_name
-            )
-            self.rowid = self.blob_manager.storage.save_downloaded_file(
-                self.stream_hash, self.file_name, self.download_directory, 0.0
-            )
-        else:
-            await self.blob_manager.storage.change_file_download_dir_and_file_name(
-                self.stream_hash, self.download_directory, self.file_name
-            )
-        self.update_status(ManagedStream.STATUS_RUNNING)
-        await self.blob_manager.storage.change_file_status(self.stream_hash, ManagedStream.STATUS_RUNNING)
+        self._file_name = await get_next_available_file_name(
+            self.loop, self.download_directory,
+            file_name or self._file_name or self.descriptor.suggested_file_name
+        )
+        await self.blob_manager.storage.change_file_download_dir_and_file_name(
+            self.stream_hash, self.download_directory, self.file_name
+        )
+        await self.update_status(ManagedStream.STATUS_RUNNING)
         self.written_bytes = 0
         self.file_output_task = self.loop.create_task(self._save_file(self.full_path))
+        await self.started_writing.wait()

-    def stop_download(self):
+    def stop_tasks(self):
         if self.file_output_task and not self.file_output_task.done():
             self.file_output_task.cancel()
         self.file_output_task = None
+        while self.streaming_responses:
+            req, response = self.streaming_responses.pop()
+            response.force_close()
+            req.transport.close()
         self.downloader.stop()
+        self._running.clear()

     async def upload_to_reflector(self, host: str, port: int) -> typing.List[str]:
         sent = []
@@ -367,3 +436,44 @@ class ManagedStream:
             binascii.hexlify(claim.to_bytes()).decode(), claim.signing_channel_id, claim_info['address'],
             claim_info['claim_sequence'], claim_info.get('channel_name')
         )
+
+    async def update_content_claim(self, claim_info: typing.Optional[typing.Dict] = None):
+        if not claim_info:
+            claim_info = await self.blob_manager.storage.get_content_claim(self.stream_hash)
+        self.set_claim(claim_info, claim_info['value'])
+
+    async def _delayed_stop(self):
+        stalled_count = 0
+        while self._running.is_set():
+            if self.saving.is_set() or self.streaming.is_set():
+                stalled_count = 0
+            else:
+                stalled_count += 1
+            if stalled_count > 1:
+                log.info("stopping inactive download for lbry://%s#%s (%s...)", self.claim_name, self.claim_id,
+                         self.sd_hash[:6])
+                await self.stop()
+                return
+            await asyncio.sleep(1, loop=self.loop)
+
+    def _prepare_range_response_headers(self, get_range: str) -> typing.Tuple[typing.Dict[str, str], int, int]:
+        if '=' in get_range:
+            get_range = get_range.split('=')[1]
+        start, end = get_range.split('-')
+        size = 0
+        for blob in self.descriptor.blobs[:-1]:
+            size += blob.length - 1
+        start = int(start)
+        end = int(end) if end else size - 1
+        skip_blobs = start // 2097150
+        skip = skip_blobs * 2097151
+        start = skip
+        final_size = end - start + 1
+
+        headers = {
+            'Accept-Ranges': 'bytes',
+            'Content-Range': f'bytes {start}-{end}/{size}',
+            'Content-Length': str(final_size),
+            'Content-Type': self.mime_type
+        }
+        return headers, size, skip_blobs
diff --git a/lbrynet/stream/stream_manager.py b/lbrynet/stream/stream_manager.py
index 6fe849df5..aa2f93610 100644
--- a/lbrynet/stream/stream_manager.py
+++ b/lbrynet/stream/stream_manager.py
@@ -5,8 +5,9 @@ import binascii
 import logging
 import random
 from decimal import Decimal
+from aiohttp.web import Request
 from lbrynet.error import ResolveError, InvalidStreamDescriptorError, KeyFeeAboveMaxAllowed, InsufficientFundsError
-from lbrynet.error import DownloadSDTimeout, DownloadDataTimeout, ResolveTimeout
+from lbrynet.error import ResolveTimeout, DownloadDataTimeout
 from lbrynet.utils import cache_concurrent
 from lbrynet.stream.descriptor import StreamDescriptor
 from lbrynet.stream.managed_stream import ManagedStream
@@ -56,6 +57,7 @@ comparison_operators = {
 def path_or_none(p) -> typing.Optional[str]:
     return None if p == '{stream}' else binascii.unhexlify(p).decode()

+
 class StreamManager:
     def __init__(self, loop: asyncio.BaseEventLoop, config: 'Config', blob_manager: 'BlobManager',
                  wallet: 'LbryWalletManager', storage: 'SQLiteStorage', node: typing.Optional['Node'],
@@ -72,29 +74,12 @@ class StreamManager:
         self.re_reflect_task: asyncio.Task = None
         self.update_stream_finished_futs: typing.List[asyncio.Future] = []
         self.running_reflector_uploads: typing.List[asyncio.Task] = []
+        self.started = asyncio.Event(loop=self.loop)

     async def _update_content_claim(self, stream: ManagedStream):
         claim_info = await self.storage.get_content_claim(stream.stream_hash)
         self.streams.setdefault(stream.sd_hash, stream).set_claim(claim_info, claim_info['value'])

-    async def stop_stream(self, stream: ManagedStream):
-        stream.stop_download()
-        if not stream.finished and stream.output_file_exists:
-            try:
-                os.remove(stream.full_path)
-            except OSError as err:
-                log.warning("Failed to delete partial download %s from downloads directory: %s", stream.full_path,
-                            str(err))
-        if stream.running:
-            stream.update_status(ManagedStream.STATUS_STOPPED)
-            await self.storage.change_file_status(stream.stream_hash, ManagedStream.STATUS_STOPPED)
-
-    async def start_stream(self, stream: ManagedStream):
-        stream.update_status(ManagedStream.STATUS_RUNNING)
-        await self.storage.change_file_status(stream.stream_hash, ManagedStream.STATUS_RUNNING)
-        await stream.setup(self.node, save_file=self.config.save_files)
-        self.storage.content_claim_callbacks[stream.stream_hash] = lambda: self._update_content_claim(stream)
-
     async def recover_streams(self, file_infos: typing.List[typing.Dict]):
         to_restore = []
@@ -139,28 +124,19 @@ class StreamManager:
     async def load_streams_from_database(self):
         to_recover = []
         to_start = []
-        await self.storage.sync_files_to_blobs()
+
+        # this would set streams that are marked as finished but are missing blobs as being stopped
+        # await self.storage.sync_files_to_blobs()
         for file_info in await self.storage.get_all_lbry_files():
+            # if the sd blob is not verified, try to reconstruct it from the database
+            # this could either be because the blob files were deleted manually or save_blobs was not true when
+            # the stream was downloaded
             if not self.blob_manager.is_blob_verified(file_info['sd_hash']):
                 to_recover.append(file_info)
             to_start.append(file_info)
         if to_recover:
-            # if self.blob_manager._save_blobs:
-            #     log.info("Attempting to recover %i streams", len(to_recover))
             await self.recover_streams(to_recover)

-        if not self.config.save_files:
-            to_set_as_streaming = []
-            for file_info in to_start:
-                file_name = path_or_none(file_info['file_name'])
-                download_dir = path_or_none(file_info['download_directory'])
-                if file_name and download_dir and not os.path.isfile(os.path.join(file_name, download_dir)):
-                    file_info['file_name'], file_info['download_directory'] = '{stream}', '{stream}'
-                    to_set_as_streaming.append(file_info['stream_hash'])
-
-            if to_set_as_streaming:
-                await self.storage.set_files_as_streaming(to_set_as_streaming)
-
         log.info("Initializing %i files", len(to_start))
         if to_start:
             await asyncio.gather(*[
@@ -176,8 +152,11 @@ class StreamManager:
         if not self.node:
             log.warning("no DHT node given, resuming downloads trusting that we can contact reflector")
         t = [
-            self.loop.create_task(self.start_stream(stream)) for stream in self.streams.values()
-            if stream.running
+            self.loop.create_task(
+                stream.start(node=self.node, save_now=(stream.full_path is not None))
+                if not stream.full_path else
+                stream.save_file(node=self.node)
+            ) for stream in self.streams.values() if stream.running
         ]
         if t:
             log.info("resuming %i downloads", len(t))
@@ -206,6 +185,7 @@ class StreamManager:
         await self.load_streams_from_database()
         self.resume_downloading_task = self.loop.create_task(self.resume())
         self.re_reflect_task = self.loop.create_task(self.reflect_streams())
+        self.started.set()

     def stop(self):
         if self.resume_downloading_task and not self.resume_downloading_task.done():
@@ -214,11 +194,13 @@ class StreamManager:
             self.re_reflect_task.cancel()
         while self.streams:
             _, stream = self.streams.popitem()
-            stream.stop_download()
+            stream.stop_tasks()
         while self.update_stream_finished_futs:
             self.update_stream_finished_futs.pop().cancel()
         while self.running_reflector_uploads:
             self.running_reflector_uploads.pop().cancel()
+        self.started.clear()
+        log.info("finished stopping the stream manager")

     async def create_stream(self, file_path: str, key: typing.Optional[bytes] = None,
                             iv_generator: typing.Optional[typing.Generator[bytes, None, None]] = None) -> ManagedStream:
@@ -236,7 +218,7 @@ class StreamManager:
         return stream

     async def delete_stream(self, stream: ManagedStream, delete_file: typing.Optional[bool] = False):
-        await self.stop_stream(stream)
+        stream.stop_tasks()
         if stream.sd_hash in self.streams:
             del self.streams[stream.sd_hash]
         blob_hashes = [stream.sd_hash] + [b.blob_hash for b in stream.descriptor.blobs[:-1]]
@@ -290,21 +272,16 @@ class StreamManager:
             typing.Optional[ManagedStream], typing.Optional[ManagedStream]]:
         existing = self.get_filtered_streams(outpoint=outpoint)
         if existing:
-            if existing[0].status == ManagedStream.STATUS_STOPPED:
-                await self.start_stream(existing[0])
             return existing[0], None
         existing = self.get_filtered_streams(sd_hash=claim.stream.source.sd_hash)
         if existing and existing[0].claim_id != claim_id:
-            raise ResolveError(f"stream for {existing[0].claim_id} collides with existing "
-                               f"download {claim_id}")
+            raise ResolveError(f"stream for {existing[0].claim_id} collides with existing download {claim_id}")
         if existing:
             log.info("claim contains a metadata only update to a stream we have")
             await self.storage.save_content_claim(
                 existing[0].stream_hash, outpoint
             )
             await self._update_content_claim(existing[0])
-            if not existing[0].running:
-                await self.start_stream(existing[0])
             return existing[0], None
         else:
             existing_for_claim_id = self.get_filtered_streams(claim_id=claim_id)
@@ -318,13 +295,23 @@ class StreamManager:
                                        timeout: typing.Optional[float] = None,
                                        file_name: typing.Optional[str] = None,
                                        download_directory: typing.Optional[str] = None,
-                                       save_file: bool = True, resolve_timeout: float = 3.0) -> ManagedStream:
+                                       save_file: typing.Optional[bool] = None,
+                                       resolve_timeout: float = 3.0) -> ManagedStream:
         timeout = timeout or self.config.download_timeout
         start_time = self.loop.time()
         resolved_time = None
         stream = None
         error = None
         outpoint = None
+        if save_file is None:
+            save_file = self.config.save_files
+        if file_name and not save_file:
+            save_file = True
+        if save_file:
+            download_directory = download_directory or self.config.download_dir
+        else:
+            download_directory = None
+
         try:
             # resolve the claim
             parsed_uri = parse_lbry_uri(uri)
@@ -351,9 +338,18 @@ class StreamManager:
             # resume or update an existing stream, if the stream changed download it and delete the old one after
             updated_stream, to_replace = await self._check_update_or_replace(outpoint, resolved['claim_id'], claim)
             if updated_stream:
+                log.info("already have stream for %s", uri)
+                if save_file and updated_stream.output_file_exists:
+                    save_file = False
+                await updated_stream.start(node=self.node, timeout=timeout, save_now=save_file)
+                if not updated_stream.output_file_exists and (save_file or file_name or download_directory):
+                    await updated_stream.save_file(
+                        file_name=file_name, download_directory=download_directory, node=self.node
+                    )
                 return updated_stream

             content_fee = None
+            fee_amount, fee_address = None, None

             # check that the fee is payable
             if not to_replace and claim.stream.has_fee:
@@ -374,45 +370,37 @@ class StreamManager:
                     raise InsufficientFundsError(msg)
                 fee_address = claim.stream.fee.address

-                content_fee = await self.wallet.send_amount_to_address(
-                    lbc_to_dewies(str(fee_amount)), fee_address.encode('latin1')
-                )
-
-                log.info("paid fee of %s for %s", fee_amount, uri)
-
-            download_directory = download_directory or self.config.download_dir
-            if not file_name and (not self.config.save_files or not save_file):
-                download_dir, file_name = None, None
             stream = ManagedStream(
                 self.loop, self.config, self.blob_manager, claim.stream.source.sd_hash, download_directory,
                 file_name, ManagedStream.STATUS_RUNNING, content_fee=content_fee,
                 analytics_manager=self.analytics_manager
             )
             log.info("starting download for %s", uri)
-            try:
-                await asyncio.wait_for(stream.setup(
-                    self.node, save_file=save_file, file_name=file_name, download_directory=download_directory
-                ), timeout, loop=self.loop)
-            except asyncio.TimeoutError:
-                if not stream.descriptor:
-                    raise DownloadSDTimeout(stream.sd_hash)
-                raise DownloadDataTimeout(stream.sd_hash)
-            finally:
-                if stream.descriptor:
-                    if to_replace:  # delete old stream now that the replacement has started downloading
-                        await self.delete_stream(to_replace)
-                    stream.set_claim(resolved, claim)
-                    await self.storage.save_content_claim(stream.stream_hash, outpoint)
-                    self.streams[stream.sd_hash] = stream
+
+            before_download = self.loop.time()
+            await stream.start(self.node, timeout)
+            stream.set_claim(resolved, claim)
+            if to_replace:  # delete old stream now that the replacement has started downloading
+                await self.delete_stream(to_replace)
+            elif fee_address:
+                stream.content_fee = await self.wallet.send_amount_to_address(
+                    lbc_to_dewies(str(fee_amount)), fee_address.encode('latin1')
+                )
+                log.info("paid fee of %s for %s", fee_amount, uri)
+
+            self.streams[stream.sd_hash] = stream
+            self.storage.content_claim_callbacks[stream.stream_hash] = lambda: self._update_content_claim(stream)
+            await self.storage.save_content_claim(stream.stream_hash, outpoint)
+            if save_file:
+                await asyncio.wait_for(stream.save_file(node=self.node), timeout - (self.loop.time() - before_download),
+                                       loop=self.loop)
             return stream
-        except DownloadDataTimeout as err:  # forgive data timeout, dont delete stream
+        except asyncio.TimeoutError:
+            error = DownloadDataTimeout(stream.sd_hash)
+            raise error
+        except Exception as err:  # don't delete the stream on errors
            error = err
            raise
-        except Exception as err:
-            error = err
-            if stream and stream.descriptor:
-                await self.storage.delete_stream(stream.descriptor)
-                await self.blob_manager.delete_blob(stream.sd_hash)
         finally:
             if self.analytics_manager and (error or (stream and (stream.downloader.time_to_descriptor or
                                                                  stream.downloader.time_to_first_bytes))):
@@ -432,5 +420,6 @@ class StreamManager:
                     None if not error else error.__class__.__name__
                 )
             )
-        if error:
-            raise error
+
+    async def stream_partial_content(self, request: Request, sd_hash: str):
+        return await self.streams[sd_hash].stream_file(request, self.node)
diff --git a/lbrynet/testcase.py b/lbrynet/testcase.py
index 75854015e..29a487cf2 100644
--- a/lbrynet/testcase.py
+++ b/lbrynet/testcase.py
@@ -124,7 +124,7 @@ class CommandTestCase(IntegrationTestCase):
     async def asyncTearDown(self):
         await super().asyncTearDown()
         self.wallet_component._running = False
-        await self.daemon.stop()
+        await self.daemon.stop(shutdown_runner=False)

     async def confirm_tx(self, txid):
         """ Wait for tx to be in mempool, then generate a block, wait for tx to be in a block. """
diff --git a/tests/integration/test_cli.py b/tests/integration/test_cli.py
index a6a4c97e6..b01cf54c0 100644
--- a/tests/integration/test_cli.py
+++ b/tests/integration/test_cli.py
@@ -29,7 +29,7 @@ class CLIIntegrationTest(AsyncioTestCase):
         await self.daemon.start()

     async def asyncTearDown(self):
-        await self.daemon.stop()
+        await self.daemon.stop(shutdown_runner=False)

     def test_cli_status_command_with_auth(self):
         actual_output = StringIO()
diff --git a/tests/integration/test_streaming.py b/tests/integration/test_streaming.py
index 1c4bdc8df..832e8fed2 100644
--- a/tests/integration/test_streaming.py
+++ b/tests/integration/test_streaming.py
@@ -340,9 +340,11 @@ class RangeRequests(CommandTestCase):
         self.assertTrue(os.path.isfile(path))
         await self._restart_stream_manager()
         stream = self.daemon.jsonrpc_file_list()[0]
-
-        self.assertIsNone(stream.full_path)
+        self.assertIsNotNone(stream.full_path)
         self.assertFalse(os.path.isfile(path))
+        if wait_for_start_writing:
+            await stream.started_writing.wait()
+            self.assertTrue(os.path.isfile(path))

     async def test_file_save_stop_before_finished_streaming_only_wait_for_start(self):
         return await self.test_file_save_stop_before_finished_streaming_only(wait_for_start_writing=True)
diff --git a/tests/integration/test_sync.py b/tests/integration/test_sync.py
index 829b47d27..f59dd72bd 100644
--- a/tests/integration/test_sync.py
+++ b/tests/integration/test_sync.py
@@ -54,7 +54,7 @@ class AccountSynchronization(AsyncioTestCase):

     async def asyncTearDown(self):
         self.wallet_component._running = False
-        await self.daemon.stop()
+        await self.daemon.stop(shutdown_runner=False)

     @mock.patch('time.time', mock.Mock(return_value=12345))
     def test_sync(self):
diff --git a/tests/unit/stream/test_managed_stream.py b/tests/unit/stream/test_managed_stream.py
index 372e5766d..0db4af1fe 100644
--- a/tests/unit/stream/test_managed_stream.py
+++ b/tests/unit/stream/test_managed_stream.py
@@ -40,7 +40,7 @@ class TestManagedStream(BlobExchangeTestBase):
             self.loop, self.client_config, self.client_blob_manager, self.sd_hash, self.client_dir
         )

-    async def _test_transfer_stream(self, blob_count: int, mock_accumulate_peers=None):
+    async def _test_transfer_stream(self, blob_count: int, mock_accumulate_peers=None, stop_when_done=True):
         await self.setup_stream(blob_count)
         mock_node = mock.Mock(spec=Node)

@@ -51,10 +51,11 @@ class TestManagedStream(BlobExchangeTestBase):
             return q2, self.loop.create_task(_task())

         mock_node.accumulate_peers = mock_accumulate_peers or _mock_accumulate_peers
-        await self.stream.setup(mock_node, save_file=True)
+        await self.stream.save_file(node=mock_node)
         await self.stream.finished_writing.wait()
         self.assertTrue(os.path.isfile(self.stream.full_path))
-        self.stream.stop_download()
+        if stop_when_done:
+            await self.stream.stop()
         self.assertTrue(os.path.isfile(self.stream.full_path))
         with open(self.stream.full_path, 'rb') as f:
             self.assertEqual(f.read(), self.stream_bytes)
@@ -62,6 +63,18 @@ class TestManagedStream(BlobExchangeTestBase):

     async def test_transfer_stream(self):
         await self._test_transfer_stream(10)
+        self.assertEqual(self.stream.status, "finished")
+        self.assertFalse(self.stream._running.is_set())
+
+    async def test_delayed_stop(self):
+        await self._test_transfer_stream(10, stop_when_done=False)
+        self.assertEqual(self.stream.status, "finished")
+        self.assertTrue(self.stream._running.is_set())
+        await asyncio.sleep(0.5, loop=self.loop)
+        self.assertTrue(self.stream._running.is_set())
+        await asyncio.sleep(0.6, loop=self.loop)
+        self.assertEqual(self.stream.status, "finished")
+        self.assertFalse(self.stream._running.is_set())

     @unittest.SkipTest
     async def test_transfer_hundred_blob_stream(self):
@@ -85,11 +98,12 @@ class TestManagedStream(BlobExchangeTestBase):

         mock_node.accumulate_peers = _mock_accumulate_peers

-        await self.stream.setup(mock_node, save_file=True)
+        await self.stream.save_file(node=mock_node)
         await self.stream.finished_writing.wait()
         self.assertTrue(os.path.isfile(self.stream.full_path))
         with open(self.stream.full_path, 'rb') as f:
             self.assertEqual(f.read(), self.stream_bytes)
+        await self.stream.stop()
         # self.assertIs(self.server_from_client.tcp_last_down, None)
         # self.assertIsNot(bad_peer.tcp_last_down, None)

@@ -125,7 +139,7 @@ class TestManagedStream(BlobExchangeTestBase):
             with open(os.path.join(self.client_blob_manager.blob_dir, blob_info.blob_hash), "rb+") as handle:
                 handle.truncate()
                 handle.flush()
-        await self.stream.setup()
+        await self.stream.save_file()
         await self.stream.finished_writing.wait()
         if corrupt:
             return self.assertFalse(os.path.isfile(os.path.join(self.client_dir, "test_file")))
diff --git a/tests/unit/stream/test_stream_manager.py b/tests/unit/stream/test_stream_manager.py
index 761154b42..b6df1d237 100644
--- a/tests/unit/stream/test_stream_manager.py
+++ b/tests/unit/stream/test_stream_manager.py
@@ -225,7 +225,7 @@ class TestStreamManager(BlobExchangeTestBase):
         )
         self.assertEqual(stored_status, "running")

-        await self.stream_manager.stop_stream(stream)
+        await stream.stop()
         self.assertFalse(stream.finished)
         self.assertFalse(stream.running)

@@ -235,7 +235,7 @@ class TestStreamManager(BlobExchangeTestBase):
         )
         self.assertEqual(stored_status, "stopped")

-        await self.stream_manager.start_stream(stream)
+        await stream.save_file(node=self.stream_manager.node)
         await stream.finished_writing.wait()
         await asyncio.sleep(0, loop=self.loop)
         self.assertTrue(stream.finished)
@@ -337,7 +337,7 @@ class TestStreamManager(BlobExchangeTestBase):
         for blob_hash in [stream.sd_hash] + [b.blob_hash for b in stream.descriptor.blobs[:-1]]:
             blob_status = await self.client_storage.get_blob_status(blob_hash)
             self.assertEqual('pending', blob_status)
-        self.assertEqual('stopped', self.stream_manager.streams[self.sd_hash].status)
+        self.assertEqual('finished', self.stream_manager.streams[self.sd_hash].status)

         sd_blob = self.client_blob_manager.get_blob(stream.sd_hash)
         self.assertTrue(sd_blob.file_exists)