From b09e1f8825b1210b5bec07003938761114c6c735 Mon Sep 17 00:00:00 2001
From: Jack Robison
Date: Sun, 28 Apr 2019 17:04:52 -0400
Subject: [PATCH 01/24] fix written_bytes never being 0

---
 lbrynet/stream/managed_stream.py | 6 ++----
 1 file changed, 2 insertions(+), 4 deletions(-)

diff --git a/lbrynet/stream/managed_stream.py b/lbrynet/stream/managed_stream.py
index 1f86e5638..a0fdf4250 100644
--- a/lbrynet/stream/managed_stream.py
+++ b/lbrynet/stream/managed_stream.py
@@ -164,12 +164,10 @@ class ManagedStream:
         return guess_media_type(os.path.basename(self.descriptor.suggested_file_name))[0]
 
     def as_dict(self) -> typing.Dict:
-        if self.written_bytes:
-            written_bytes = self.written_bytes
-        elif self.output_file_exists:
+        if not self.written_bytes and self.output_file_exists:
             written_bytes = os.stat(self.full_path).st_size
         else:
-            written_bytes = None
+            written_bytes = self.written_bytes
         return {
             'completed': self.finished,
             'file_name': self.file_name,

From 1f266ebdad42b9105508f4097e0e369590443d3d Mon Sep 17 00:00:00 2001
From: Jack Robison
Date: Sun, 28 Apr 2019 17:05:15 -0400
Subject: [PATCH 02/24] add logging for streams we already have

---
 lbrynet/stream/stream_manager.py | 1 +
 1 file changed, 1 insertion(+)

diff --git a/lbrynet/stream/stream_manager.py b/lbrynet/stream/stream_manager.py
index 6fe849df5..9a4c47581 100644
--- a/lbrynet/stream/stream_manager.py
+++ b/lbrynet/stream/stream_manager.py
@@ -351,6 +351,7 @@ class StreamManager:
             # resume or update an existing stream, if the stream changed download it and delete the old one after
             updated_stream, to_replace = await self._check_update_or_replace(outpoint, resolved['claim_id'], claim)
             if updated_stream:
+                log.info("already have stream for %s", uri)
                 return updated_stream
 
             content_fee = None

From f8c0e80cfc4011071ad4f3e8a7b590cf8520e85f Mon Sep 17 00:00:00 2001
From: Jack Robison
Date: Wed, 1 May 2019 14:23:16 -0400
Subject: [PATCH 03/24] add __slots__ to BlobInfo, StreamDescriptor, and
 KademliaPeer

---
 lbrynet/blob/blob_info.py    |  7 +++++++
 lbrynet/dht/peer.py          |  9 +++++++++
 lbrynet/stream/descriptor.py | 11 +++++++++++
 3 files changed, 27 insertions(+)

diff --git a/lbrynet/blob/blob_info.py b/lbrynet/blob/blob_info.py
index d300fd905..febfdcb65 100644
--- a/lbrynet/blob/blob_info.py
+++ b/lbrynet/blob/blob_info.py
@@ -2,6 +2,13 @@ import typing
 
 
 class BlobInfo:
+    __slots__ = [
+        'blob_hash',
+        'blob_num',
+        'length',
+        'iv',
+    ]
+
     def __init__(self, blob_num: int, length: int, iv: str, blob_hash: typing.Optional[str] = None):
         self.blob_hash = blob_hash
         self.blob_num = blob_num
diff --git a/lbrynet/dht/peer.py b/lbrynet/dht/peer.py
index 9c8176eb5..23bc29635 100644
--- a/lbrynet/dht/peer.py
+++ b/lbrynet/dht/peer.py
@@ -135,6 +135,15 @@ class PeerManager:
 
 
 class KademliaPeer:
+    __slots__ = [
+        'loop',
+        '_node_id',
+        'address',
+        'udp_port',
+        'tcp_port',
+        'protocol_version',
+    ]
+
     def __init__(self, loop: asyncio.BaseEventLoop, address: str, node_id: typing.Optional[bytes] = None,
                  udp_port: typing.Optional[int] = None, tcp_port: typing.Optional[int] = None):
         if node_id is not None:
diff --git a/lbrynet/stream/descriptor.py b/lbrynet/stream/descriptor.py
index a2f4d1da2..2b0527d35 100644
--- a/lbrynet/stream/descriptor.py
+++ b/lbrynet/stream/descriptor.py
@@ -47,6 +47,17 @@ def file_reader(file_path: str):
 
 
 class StreamDescriptor:
+    __slots__ = [
+        'loop',
+        'blob_dir',
+        'stream_name',
+        'key',
+        'suggested_file_name',
+        'blobs',
+        'stream_hash',
+        'sd_hash'
+    ]
+
     def __init__(self, loop: asyncio.BaseEventLoop, blob_dir: str, stream_name: str, key: str,
                  suggested_file_name: str, blobs: typing.List[BlobInfo],
                  stream_hash: typing.Optional[str] = None, sd_hash: typing.Optional[str] = None):

From b134e0c9c906de8defabcff940cfe2796d9a59d4 Mon Sep 17 00:00:00 2001
From: Jack Robison
Date: Wed, 1 May 2019 14:24:19 -0400
Subject: [PATCH 04/24] fix blob_get

---
 lbrynet/extras/daemon/Daemon.py | 8 +++++---
 1 file changed, 5 insertions(+), 3 deletions(-)

diff --git a/lbrynet/extras/daemon/Daemon.py b/lbrynet/extras/daemon/Daemon.py
index 59e1b72e5..fb2d5b79d 100644
--- a/lbrynet/extras/daemon/Daemon.py
+++ b/lbrynet/extras/daemon/Daemon.py
@@ -2927,10 +2927,12 @@ class Daemon(metaclass=JSONRPCServerType):
         blob = await download_blob(asyncio.get_event_loop(), self.conf, self.blob_manager, self.dht_node, blob_hash)
         if read:
-            with open(blob.file_path, 'rb') as handle:
+            with blob.reader_context() as handle:
                 return handle.read().decode()
-        else:
-            return "Downloaded blob %s" % blob_hash
+        elif isinstance(blob, BlobBuffer):
+            log.warning("manually downloaded blob buffer could have missed garbage collection, clearing it")
+            blob.delete()
+        return "Downloaded blob %s" % blob_hash
 
     @requires(BLOB_COMPONENT, DATABASE_COMPONENT)
     async def jsonrpc_blob_delete(self, blob_hash):
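The __slots__ additions in patch 03 target the classes the client allocates in bulk: one BlobInfo per ~2 MB blob and one KademliaPeer per DHT contact, so a long stream or a busy peer search means tens of thousands of instances. Declaring __slots__ drops the per-instance __dict__, which is where most of the memory goes for small attribute-only objects. A minimal way to see the effect — plain Python, not part of the patch series; the exact byte counts vary by interpreter version:

    import sys

    class PlainBlobInfo:
        def __init__(self, blob_num, length, iv, blob_hash=None):
            self.blob_num, self.length, self.iv, self.blob_hash = blob_num, length, iv, blob_hash

    class SlottedBlobInfo:
        __slots__ = ['blob_num', 'length', 'iv', 'blob_hash']

        def __init__(self, blob_num, length, iv, blob_hash=None):
            self.blob_num, self.length, self.iv, self.blob_hash = blob_num, length, iv, blob_hash

    plain = PlainBlobInfo(0, 2097151, 'ff' * 16, 'aa' * 48)
    slotted = SlottedBlobInfo(0, 2097151, 'ff' * 16, 'aa' * 48)

    # the plain instance pays for the object plus its attribute dict
    print(sys.getsizeof(plain) + sys.getsizeof(plain.__dict__))
    # the slotted instance stores attributes at fixed offsets, no dict at all
    print(sys.getsizeof(slotted))

The trade-off is that a slotted class no longer accepts ad-hoc attributes: assigning a name not listed in __slots__ raises AttributeError, which is why every attribute set in __init__ has to appear in the list.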
From 9099ee2e8e4d6a941bb46d31e25ea233593c7b2b Mon Sep 17 00:00:00 2001
From: Jack Robison
Date: Wed, 1 May 2019 17:09:50 -0400
Subject: [PATCH 05/24] fix/refactor starting and stopping files

-move partial content handling into ManagedStream
-add delayed stop test
---
 lbrynet/extras/daemon/Daemon.py          |  55 +-----
 lbrynet/stream/managed_stream.py         | 226 +++++++++++++++++------
 lbrynet/stream/stream_manager.py         |  78 ++++----
 tests/unit/stream/test_managed_stream.py |  24 ++-
 tests/unit/stream/test_stream_manager.py |   4 +-
 5 files changed, 222 insertions(+), 165 deletions(-)

diff --git a/lbrynet/extras/daemon/Daemon.py b/lbrynet/extras/daemon/Daemon.py
index fb2d5b79d..36c39b5ca 100644
--- a/lbrynet/extras/daemon/Daemon.py
+++ b/lbrynet/extras/daemon/Daemon.py
@@ -18,7 +18,7 @@ from torba.client.baseaccount import SingleKey, HierarchicalDeterministic
 
 from lbrynet import utils
 from lbrynet.conf import Config, Setting
-from lbrynet.blob.blob_file import is_valid_blobhash
+from lbrynet.blob.blob_file import is_valid_blobhash, BlobBuffer
 from lbrynet.blob_exchange.downloader import download_blob
 from lbrynet.error import DownloadSDTimeout, ComponentsNotStarted
 from lbrynet.error import NullFundsError, NegativeFundsError, ComponentStartConditionNotMet
@@ -477,59 +477,11 @@ class Daemon(metaclass=JSONRPCServerType):
             raise web.HTTPServerError(text=stream['error'])
         raise web.HTTPFound(f"/stream/{stream.sd_hash}")
 
-    @staticmethod
-    def prepare_range_response_headers(get_range: str, stream: 'ManagedStream') -> typing.Tuple[typing.Dict[str, str],
-                                                                                                 int, int]:
-        if '=' in get_range:
-            get_range = get_range.split('=')[1]
-        start, end = get_range.split('-')
-        size = 0
-        for blob in stream.descriptor.blobs[:-1]:
-            size += blob.length - 1
-        start = int(start)
-        end = int(end) if end else size - 1
-        skip_blobs = start // 2097150
-        skip = skip_blobs * 2097151
-        start = skip
-        final_size = end - start + 1
-
-        headers = {
-            'Accept-Ranges': 'bytes',
-            'Content-Range': f'bytes {start}-{end}/{size}',
-            'Content-Length': str(final_size),
-            'Content-Type': stream.mime_type
-        }
-        return headers, size, skip_blobs
-
     async def handle_stream_range_request(self, request: web.Request):
         sd_hash = request.path.split("/stream/")[1]
         if sd_hash not in self.stream_manager.streams:
             return web.HTTPNotFound()
-        stream = self.stream_manager.streams[sd_hash]
-        if stream.status == 'stopped':
-            await self.stream_manager.start_stream(stream)
-        if stream.delayed_stop:
-            stream.delayed_stop.cancel()
-        headers, size, skip_blobs = self.prepare_range_response_headers(
-            request.headers.get('range', 'bytes=0-'), stream
-        )
-        response = web.StreamResponse(
-            status=206,
-            headers=headers
-        )
-        await response.prepare(request)
-        wrote = 0
-        async for blob_info, decrypted in stream.aiter_read_stream(skip_blobs):
-            log.info("streamed blob %i/%i", blob_info.blob_num + 1, len(stream.descriptor.blobs) - 1)
-            if (blob_info.blob_num == len(stream.descriptor.blobs) - 2) or (len(decrypted) + wrote >= size):
-                decrypted += b'\x00' * (size - len(decrypted) - wrote)
-                await response.write_eof(decrypted)
-                break
-            else:
-                await response.write(decrypted)
-                wrote += len(decrypted)
-        response.force_close()
-        return response
+        return await self.stream_manager.stream_partial_content(request, sd_hash)
 
     async def _process_rpc_call(self, data):
         args = data.get('params', {})
@@ -924,7 +876,6 @@ class Daemon(metaclass=JSONRPCServerType):
 
         Returns: {File}
         """
-        save_file = save_file if save_file is not None else self.conf.save_files
         try:
             stream = await self.stream_manager.download_stream_from_uri(
                 uri, self.exchange_rate_manager, timeout, file_name, save_file=save_file
@@ -1554,7 +1505,7 @@ class Daemon(metaclass=JSONRPCServerType):
             await self.stream_manager.start_stream(stream)
             msg = "Resumed download"
         elif status == 'stop' and stream.running:
-            await self.stream_manager.stop_stream(stream)
+            await stream.stop()
             msg = "Stopped download"
         else:
             msg = (
diff --git a/lbrynet/stream/managed_stream.py b/lbrynet/stream/managed_stream.py
index a0fdf4250..6535c665c 100644
--- a/lbrynet/stream/managed_stream.py
+++ b/lbrynet/stream/managed_stream.py
@@ -3,7 +3,9 @@ import asyncio
 import typing
 import logging
 import binascii
+from aiohttp.web import Request, StreamResponse
 from lbrynet.utils import generate_id
+from lbrynet.error import DownloadSDTimeout, DownloadDataTimeout
 from lbrynet.schema.mime_types import guess_media_type
 from lbrynet.stream.downloader import StreamDownloader
 from lbrynet.stream.descriptor import StreamDescriptor
@@ -40,6 +42,33 @@ class ManagedStream:
     STATUS_STOPPED = "stopped"
     STATUS_FINISHED = "finished"
 
+    __slots__ = [
+        'loop',
+        'config',
+        'blob_manager',
+        'sd_hash',
+        'download_directory',
+        '_file_name',
+        '_status',
+        'stream_claim_info',
+        'download_id',
+        'rowid',
+        'written_bytes',
+        'content_fee',
+        'downloader',
+        'analytics_manager',
+        'fully_reflected',
+        'file_output_task',
+        'delayed_stop_task',
+        'streaming_responses',
+        'streaming',
+        '_running',
+        'saving',
+        'finished_writing',
+        'started_writing',
+
+    ]
+
     def __init__(self, loop: asyncio.BaseEventLoop, config: 'Config', blob_manager: 'BlobManager',
                  sd_hash: str, download_directory: typing.Optional[str] = None, file_name: typing.Optional[str] = None,
                  status: typing.Optional[str] = STATUS_STOPPED, claim: typing.Optional[StoredStreamClaim] = None,
@@ -61,9 +90,13 @@ class ManagedStream:
         self.content_fee = content_fee
         self.downloader = StreamDownloader(self.loop, self.config, self.blob_manager, sd_hash, descriptor)
         self.analytics_manager = analytics_manager
+        self.fully_reflected = asyncio.Event(loop=self.loop)
         self.file_output_task: typing.Optional[asyncio.Task] = None
-        self.delayed_stop: typing.Optional[asyncio.Handle] = None
+        self.delayed_stop_task: typing.Optional[asyncio.Task] = None
+        self.streaming_responses: typing.List[StreamResponse] = []
+        self.streaming = asyncio.Event(loop=self.loop)
+        self._running = asyncio.Event(loop=self.loop)
         self.saving = asyncio.Event(loop=self.loop)
         self.finished_writing = asyncio.Event(loop=self.loop)
         self.started_writing = asyncio.Event(loop=self.loop)
@@ -84,9 +117,10 @@ class ManagedStream:
     def status(self) -> str:
         return self._status
 
-    def update_status(self, status: str):
+    async def update_status(self, status: str):
         assert status in [self.STATUS_RUNNING, self.STATUS_STOPPED, self.STATUS_FINISHED]
         self._status = status
+        await self.blob_manager.storage.change_file_status(self.stream_hash, status)
 
     @property
     def finished(self) -> bool:
@@ -216,47 +250,85 @@ class ManagedStream:
         return cls(loop, config, blob_manager, descriptor.sd_hash, os.path.dirname(file_path),
                    os.path.basename(file_path), status=cls.STATUS_FINISHED, rowid=row_id, descriptor=descriptor)
 
-    async def setup(self, node: typing.Optional['Node'] = None, save_file: typing.Optional[bool] = True,
-                    file_name: typing.Optional[str] = None, download_directory: typing.Optional[str] = None):
-        await self.downloader.start(node)
-        if not save_file and not file_name:
-            if not await self.blob_manager.storage.file_exists(self.sd_hash):
-                self.rowid = await self.blob_manager.storage.save_downloaded_file(
-                    self.stream_hash, None, None, 0.0
-                )
-            self.download_directory = None
-            self._file_name = None
-            self.update_status(ManagedStream.STATUS_RUNNING)
-            await self.blob_manager.storage.change_file_status(self.stream_hash, ManagedStream.STATUS_RUNNING)
-            self.update_delayed_stop()
-        else:
-            await self.save_file(file_name, download_directory)
-            await self.started_writing.wait()
+    async def start(self, node: typing.Optional['Node'] = None, timeout: typing.Optional[float] = None,
+                    save_now: bool = False):
+        timeout = timeout or self.config.download_timeout
+        if self._running.is_set():
+            return
+        self._running.set()
+        start_time = self.loop.time()
+        try:
+            await asyncio.wait_for(self.downloader.start(node), timeout, loop=self.loop)
+            if save_now:
+                await asyncio.wait_for(self.save_file(node=node), timeout - (self.loop.time() - start_time),
+                                       loop=self.loop)
+        except asyncio.TimeoutError:
+            self._running.clear()
+            if not self.descriptor:
+                raise DownloadSDTimeout(self.sd_hash)
+            raise DownloadDataTimeout(self.sd_hash)
 
-    def update_delayed_stop(self):
-        def _delayed_stop():
-            log.info("Stopping inactive download for stream %s", self.sd_hash)
-            self.stop_download()
+        if self.delayed_stop_task and not self.delayed_stop_task.done():
+            self.delayed_stop_task.cancel()
+        self.delayed_stop_task = self.loop.create_task(self._delayed_stop())
+        if not await self.blob_manager.storage.file_exists(self.sd_hash):
+            if save_now:
+                file_name, download_dir = self._file_name, self.download_directory
+            else:
+                file_name, download_dir = None, None
+            self.rowid = await self.blob_manager.storage.save_downloaded_file(
+                self.stream_hash, file_name, download_dir, 0.0
+            )
+        if self.status != self.STATUS_RUNNING:
+            await self.update_status(self.STATUS_RUNNING)
 
-        if self.delayed_stop:
-            self.delayed_stop.cancel()
-        self.delayed_stop = self.loop.call_later(60, _delayed_stop)
+    async def stop(self, finished: bool = False):
+        """
+        Stop any running save/stream tasks as well as the downloader and update the status in the database
+        """
+
+        self.stop_tasks()
+        if (finished and self.status != self.STATUS_FINISHED) or self.status == self.STATUS_RUNNING:
+            await self.update_status(self.STATUS_FINISHED if finished else self.STATUS_STOPPED)
 
-    async def aiter_read_stream(self, start_blob_num: typing.Optional[int] = 0) -> typing.AsyncIterator[
-            typing.Tuple['BlobInfo', bytes]]:
+    async def _aiter_read_stream(self, start_blob_num: typing.Optional[int] = 0)\
+            -> typing.AsyncIterator[typing.Tuple['BlobInfo', bytes]]:
         if start_blob_num >= len(self.descriptor.blobs[:-1]):
             raise IndexError(start_blob_num)
         for i, blob_info in enumerate(self.descriptor.blobs[start_blob_num:-1]):
             assert i + start_blob_num == blob_info.blob_num
-            if self.delayed_stop:
-                self.delayed_stop.cancel()
-            try:
-                decrypted = await self.downloader.read_blob(blob_info)
-                yield (blob_info, decrypted)
-            except asyncio.CancelledError:
-                if not self.saving.is_set() and not self.finished_writing.is_set():
-                    self.update_delayed_stop()
-                raise
+            decrypted = await self.downloader.read_blob(blob_info)
+            yield (blob_info, decrypted)
+
+    async def stream_file(self, request: Request, node: typing.Optional['Node'] = None) -> StreamResponse:
+        await self.start(node)
+        headers, size, skip_blobs = self._prepare_range_response_headers(request.headers.get('range', 'bytes=0-'))
+        response = StreamResponse(
+            status=206,
+            headers=headers
+        )
+        await response.prepare(request)
+        self.streaming_responses.append(response)
+        self.streaming.set()
+        try:
+            wrote = 0
+            async for blob_info, decrypted in self._aiter_read_stream(skip_blobs):
+                if (blob_info.blob_num == len(self.descriptor.blobs) - 2) or (len(decrypted) + wrote >= size):
+                    decrypted += b'\x00' * (size - len(decrypted) - wrote)
+                    await response.write_eof(decrypted)
+                else:
+                    await response.write(decrypted)
+                    wrote += len(decrypted)
+                log.info("streamed %sblob %i/%i", "(closing stream) " if response._eof_sent else "",
+                         blob_info.blob_num + 1, len(self.descriptor.blobs) - 1)
+                if response._eof_sent:
+                    break
+            return response
+        finally:
+            response.force_close()
+            if response in self.streaming_responses:
+                self.streaming_responses.remove(response)
+            self.streaming.clear()
 
     async def _save_file(self, output_path: str):
         log.debug("save file %s -> %s", self.sd_hash, output_path)
         self.saving.set()
         self.finished_writing.clear()
         self.started_writing.clear()
         try:
             with open(output_path, 'wb') as file_write_handle:
-                async for blob_info, decrypted in self.aiter_read_stream():
+                async for blob_info, decrypted in self._aiter_read_stream():
                     log.info("write blob %i/%i", blob_info.blob_num + 1, len(self.descriptor.blobs) - 1)
                     file_write_handle.write(decrypted)
                     file_write_handle.flush()
                     self.written_bytes += len(decrypted)
                     if not self.started_writing.is_set():
                         self.started_writing.set()
-            self.update_status(ManagedStream.STATUS_FINISHED)
-            await self.blob_manager.storage.change_file_status(self.stream_hash, ManagedStream.STATUS_FINISHED)
+            await self.update_status(ManagedStream.STATUS_FINISHED)
             if self.analytics_manager:
                 self.loop.create_task(self.analytics_manager.send_download_finished(
                     self.download_id, self.claim_name, self.sd_hash
@@ -289,12 +360,11 @@ class ManagedStream:
         finally:
             self.saving.clear()
 
-    async def save_file(self, file_name: typing.Optional[str] = None, download_directory: typing.Optional[str] = None):
-        if self.file_output_task and not self.file_output_task.done():
+    async def save_file(self, file_name: typing.Optional[str] = None, download_directory: typing.Optional[str] = None,
+                        node: typing.Optional['Node'] = None):
+        await self.start(node)
+        if self.file_output_task and not self.file_output_task.done():  # cancel an already running save task
             self.file_output_task.cancel()
-        if self.delayed_stop:
-            self.delayed_stop.cancel()
-            self.delayed_stop = None
         self.download_directory = download_directory or self.download_directory or self.config.download_dir
         if not self.download_directory:
             raise ValueError("no directory to download to")
@@ -303,28 +373,26 @@ class ManagedStream:
         if not os.path.isdir(self.download_directory):
             log.warning("download directory '%s' does not exist, attempting to make it", self.download_directory)
             os.mkdir(self.download_directory)
-        if not await self.blob_manager.storage.file_exists(self.sd_hash):
-            self._file_name = await get_next_available_file_name(
-                self.loop, self.download_directory,
-                file_name or self._file_name or self.descriptor.suggested_file_name
-            )
-            self.rowid = self.blob_manager.storage.save_downloaded_file(
-                self.stream_hash, self.file_name, self.download_directory, 0.0
-            )
-        else:
-            await self.blob_manager.storage.change_file_download_dir_and_file_name(
-                self.stream_hash, self.download_directory, self.file_name
-            )
-        self.update_status(ManagedStream.STATUS_RUNNING)
-        await self.blob_manager.storage.change_file_status(self.stream_hash, ManagedStream.STATUS_RUNNING)
+        self._file_name = await get_next_available_file_name(
+            self.loop, self.download_directory,
+            file_name or self._file_name or self.descriptor.suggested_file_name
+        )
+        await self.blob_manager.storage.change_file_download_dir_and_file_name(
+            self.stream_hash, self.download_directory, self.file_name
+        )
+        await self.update_status(ManagedStream.STATUS_RUNNING)
         self.written_bytes = 0
         self.file_output_task = self.loop.create_task(self._save_file(self.full_path))
+        await self.started_writing.wait()
 
-    def stop_download(self):
+    def stop_tasks(self):
         if self.file_output_task and not self.file_output_task.done():
             self.file_output_task.cancel()
         self.file_output_task = None
+        while self.streaming_responses:
+            self.streaming_responses.pop().force_close()
         self.downloader.stop()
+        self._running.clear()
 
     async def upload_to_reflector(self, host: str, port: int) -> typing.List[str]:
         sent = []
@@ -365,3 +433,43 @@ class ManagedStream:
             binascii.hexlify(claim.to_bytes()).decode(), claim.signing_channel_id, claim_info['address'],
             claim_info['claim_sequence'], claim_info.get('channel_name')
         )
+
+    async def update_content_claim(self, claim_info: typing.Optional[typing.Dict] = None):
+        if not claim_info:
+            claim_info = await self.blob_manager.storage.get_content_claim(self.stream_hash)
+        self.set_claim(claim_info, claim_info['value'])
+
+    async def _delayed_stop(self):
+        stalled_count = 0
+        while self._running.is_set():
+            if self.saving.is_set() or self.streaming.is_set():
+                stalled_count = 0
+            else:
+                stalled_count += 1
+            if stalled_count > 1:
+                log.info("Stopping inactive download for stream %s", self.sd_hash)
+                await self.stop()
+                return
+            await asyncio.sleep(1, loop=self.loop)
+
+    def _prepare_range_response_headers(self, get_range: str) -> typing.Tuple[typing.Dict[str, str], int, int]:
+        if '=' in get_range:
+            get_range = get_range.split('=')[1]
+        start, end = get_range.split('-')
+        size = 0
+        for blob in self.descriptor.blobs[:-1]:
+            size += blob.length - 1
+        start = int(start)
+        end = int(end) if end else size - 1
+        skip_blobs = start // 2097150
+        skip = skip_blobs * 2097151
+        start = skip
+        final_size = end - start + 1
+
+        headers = {
+            'Accept-Ranges': 'bytes',
+            'Content-Range': f'bytes {start}-{end}/{size}',
+            'Content-Length': str(final_size),
+            'Content-Type': self.mime_type
+        }
+        return headers, size, skip_blobs
diff --git a/lbrynet/stream/stream_manager.py b/lbrynet/stream/stream_manager.py
index 9a4c47581..2f5470c63 100644
--- a/lbrynet/stream/stream_manager.py
+++ b/lbrynet/stream/stream_manager.py
@@ -5,8 +5,9 @@ import binascii
 import logging
 import random
 from decimal import Decimal
+from aiohttp.web import Request
 from lbrynet.error import ResolveError, InvalidStreamDescriptorError, KeyFeeAboveMaxAllowed, InsufficientFundsError
-from lbrynet.error import DownloadSDTimeout, DownloadDataTimeout, ResolveTimeout
+from lbrynet.error import ResolveTimeout, DownloadDataTimeout
 from lbrynet.utils import cache_concurrent
 from lbrynet.stream.descriptor import StreamDescriptor
 from lbrynet.stream.managed_stream import ManagedStream
@@ -56,6 +57,7 @@ comparison_operators = {
 def path_or_none(p) -> typing.Optional[str]:
     return None if p == '{stream}' else binascii.unhexlify(p).decode()
 
+
 class StreamManager:
     def __init__(self, loop: asyncio.BaseEventLoop, config: 'Config', blob_manager: 'BlobManager',
                  wallet: 'LbryWalletManager', storage: 'SQLiteStorage', node: typing.Optional['Node'],
@@ -77,24 +79,6 @@ class StreamManager:
         claim_info = await self.storage.get_content_claim(stream.stream_hash)
         self.streams.setdefault(stream.sd_hash, stream).set_claim(claim_info, claim_info['value'])
 
-    async def stop_stream(self, stream: ManagedStream):
-        stream.stop_download()
-        if not stream.finished and stream.output_file_exists:
-            try:
-                os.remove(stream.full_path)
-            except OSError as err:
-                log.warning("Failed to delete partial download %s from downloads directory: %s", stream.full_path,
-                            str(err))
-        if stream.running:
-            stream.update_status(ManagedStream.STATUS_STOPPED)
-            await self.storage.change_file_status(stream.stream_hash, ManagedStream.STATUS_STOPPED)
-
-    async def start_stream(self, stream: ManagedStream):
-        stream.update_status(ManagedStream.STATUS_RUNNING)
-        await self.storage.change_file_status(stream.stream_hash, ManagedStream.STATUS_RUNNING)
-        await stream.setup(self.node, save_file=self.config.save_files)
-        self.storage.content_claim_callbacks[stream.stream_hash] = lambda: self._update_content_claim(stream)
-
     async def recover_streams(self, file_infos: typing.List[typing.Dict]):
         to_restore = []
 
@@ -150,6 +134,7 @@ class StreamManager:
             await self.recover_streams(to_recover)
 
         if not self.config.save_files:
+            # set files that have been deleted manually to streaming mode
             to_set_as_streaming = []
             for file_info in to_start:
                 file_name = path_or_none(file_info['file_name'])
@@ -176,7 +161,7 @@ class StreamManager:
         if not self.node:
             log.warning("no DHT node given, resuming downloads trusting that we can contact reflector")
         t = [
-            self.loop.create_task(self.start_stream(stream)) for stream in self.streams.values()
+            self.loop.create_task(stream.start(node=self.node)) for stream in self.streams.values()
             if stream.running
         ]
         if t:
@@ -214,7 +199,7 @@ class StreamManager:
             self.re_reflect_task.cancel()
         while self.streams:
             _, stream = self.streams.popitem()
-            stream.stop_download()
+            stream.stop_tasks()
         while self.update_stream_finished_futs:
             self.update_stream_finished_futs.pop().cancel()
         while self.running_reflector_uploads:
             self.running_reflector_uploads.pop().cancel()
@@ -236,7 +221,7 @@ class StreamManager:
         return stream
 
     async def delete_stream(self, stream: ManagedStream, delete_file: typing.Optional[bool] = False):
-        await self.stop_stream(stream)
+        stream.stop_tasks()
         if stream.sd_hash in self.streams:
             del self.streams[stream.sd_hash]
         blob_hashes = [stream.sd_hash] + [b.blob_hash for b in stream.descriptor.blobs[:-1]]
@@ -290,21 +275,16 @@ class StreamManager:
                 typing.Optional[ManagedStream], typing.Optional[ManagedStream]]:
         existing = self.get_filtered_streams(outpoint=outpoint)
         if existing:
-            if existing[0].status == ManagedStream.STATUS_STOPPED:
-                await self.start_stream(existing[0])
             return existing[0], None
         existing = self.get_filtered_streams(sd_hash=claim.stream.source.sd_hash)
         if existing and existing[0].claim_id != claim_id:
-            raise ResolveError(f"stream for {existing[0].claim_id} collides with existing "
-                               f"download {claim_id}")
+            raise ResolveError(f"stream for {existing[0].claim_id} collides with existing download {claim_id}")
         if existing:
             log.info("claim contains a metadata only update to a stream we have")
             await self.storage.save_content_claim(
                 existing[0].stream_hash, outpoint
             )
             await self._update_content_claim(existing[0])
-            if not existing[0].running:
-                await self.start_stream(existing[0])
             return existing[0], None
         else:
             existing_for_claim_id = self.get_filtered_streams(claim_id=claim_id)
@@ -318,13 +298,23 @@ class StreamManager:
                                        timeout: typing.Optional[float] = None,
                                        file_name: typing.Optional[str] = None,
                                        download_directory: typing.Optional[str] = None,
-                                       save_file: bool = True, resolve_timeout: float = 3.0) -> ManagedStream:
+                                       save_file: typing.Optional[bool] = None,
+                                       resolve_timeout: float = 3.0) -> ManagedStream:
         timeout = timeout or self.config.download_timeout
         start_time = self.loop.time()
         resolved_time = None
         stream = None
         error = None
         outpoint = None
+        if save_file is None:
+            save_file = self.config.save_files
+        if file_name and not save_file:
+            save_file = True
+        if save_file:
+            download_directory = download_directory or self.config.download_dir
+        else:
+            download_directory = None
+
         try:
             # resolve the claim
             parsed_uri = parse_lbry_uri(uri)
@@ -352,6 +342,9 @@ class StreamManager:
             updated_stream, to_replace = await self._check_update_or_replace(outpoint, resolved['claim_id'], claim)
             if updated_stream:
                 log.info("already have stream for %s", uri)
+                if save_file and updated_stream.output_file_exists:
+                    save_file = False
+                await updated_stream.start(node=self.node, timeout=timeout, save_now=save_file)
                 return updated_stream
 
             content_fee = None
@@ -381,30 +374,18 @@ class StreamManager:
 
                 log.info("paid fee of %s for %s", fee_amount, uri)
 
-            download_directory = download_directory or self.config.download_dir
-            if not file_name and (not self.config.save_files or not save_file):
-                download_dir, file_name = None, None
             stream = ManagedStream(
                 self.loop, self.config, self.blob_manager, claim.stream.source.sd_hash, download_directory,
                 file_name, ManagedStream.STATUS_RUNNING, content_fee=content_fee,
                 analytics_manager=self.analytics_manager
             )
             log.info("starting download for %s", uri)
-            try:
-                await asyncio.wait_for(stream.setup(
-                    self.node, save_file=save_file, file_name=file_name, download_directory=download_directory
-                ), timeout, loop=self.loop)
-            except asyncio.TimeoutError:
-                if not stream.descriptor:
-                    raise DownloadSDTimeout(stream.sd_hash)
-                raise DownloadDataTimeout(stream.sd_hash)
-            finally:
-                if stream.descriptor:
-                    if to_replace:  # delete old stream now that the replacement has started downloading
-                        await self.delete_stream(to_replace)
-                    stream.set_claim(resolved, claim)
-                    await self.storage.save_content_claim(stream.stream_hash, outpoint)
-                    self.streams[stream.sd_hash] = stream
+            await stream.start(self.node, timeout, save_now=save_file)
+            if to_replace:  # delete old stream now that the replacement has started downloading
+                await self.delete_stream(to_replace)
+            self.streams[stream.sd_hash] = stream
+            stream.set_claim(resolved, claim)
+            await self.storage.save_content_claim(stream.stream_hash, outpoint)
             return stream
         except DownloadDataTimeout as err:  # forgive data timeout, dont delete stream
             error = err
@@ -435,3 +416,6 @@ class StreamManager:
         )
         if error:
             raise error
+
+    async def stream_partial_content(self, request: Request, sd_hash: str):
+        return await self.streams[sd_hash].stream_file(request, self.node)
diff --git a/tests/unit/stream/test_managed_stream.py b/tests/unit/stream/test_managed_stream.py
index 372e5766d..0db4af1fe 100644
--- a/tests/unit/stream/test_managed_stream.py
+++ b/tests/unit/stream/test_managed_stream.py
@@ -40,7 +40,7 @@ class TestManagedStream(BlobExchangeTestBase):
             self.loop, self.client_config, self.client_blob_manager, self.sd_hash, self.client_dir
         )
 
-    async def _test_transfer_stream(self, blob_count: int, mock_accumulate_peers=None):
+    async def _test_transfer_stream(self, blob_count: int, mock_accumulate_peers=None, stop_when_done=True):
         await self.setup_stream(blob_count)
         mock_node = mock.Mock(spec=Node)
 
@@ -51,10 +51,11 @@ class TestManagedStream(BlobExchangeTestBase):
             return q2, self.loop.create_task(_task())
 
         mock_node.accumulate_peers = mock_accumulate_peers or _mock_accumulate_peers
-        await self.stream.setup(mock_node, save_file=True)
+        await self.stream.save_file(node=mock_node)
         await self.stream.finished_writing.wait()
         self.assertTrue(os.path.isfile(self.stream.full_path))
-        self.stream.stop_download()
+        if stop_when_done:
+            await self.stream.stop()
         self.assertTrue(os.path.isfile(self.stream.full_path))
         with open(self.stream.full_path, 'rb') as f:
             self.assertEqual(f.read(), self.stream_bytes)
@@ -62,6 +63,18 @@ class TestManagedStream(BlobExchangeTestBase):
 
     async def test_transfer_stream(self):
         await self._test_transfer_stream(10)
+        self.assertEqual(self.stream.status, "finished")
+        self.assertFalse(self.stream._running.is_set())
+
+    async def test_delayed_stop(self):
+        await self._test_transfer_stream(10, stop_when_done=False)
+        self.assertEqual(self.stream.status, "finished")
+        self.assertTrue(self.stream._running.is_set())
+        await asyncio.sleep(0.5, loop=self.loop)
+        self.assertTrue(self.stream._running.is_set())
+        await asyncio.sleep(0.6, loop=self.loop)
+        self.assertEqual(self.stream.status, "finished")
+        self.assertFalse(self.stream._running.is_set())
 
     @unittest.SkipTest
     async def test_transfer_hundred_blob_stream(self):
@@ -85,11 +98,12 @@ class TestManagedStream(BlobExchangeTestBase):
 
         mock_node.accumulate_peers = _mock_accumulate_peers
 
-        await self.stream.setup(mock_node, save_file=True)
+        await self.stream.save_file(node=mock_node)
         await self.stream.finished_writing.wait()
         self.assertTrue(os.path.isfile(self.stream.full_path))
         with open(self.stream.full_path, 'rb') as f:
             self.assertEqual(f.read(), self.stream_bytes)
+        await self.stream.stop()
         # self.assertIs(self.server_from_client.tcp_last_down, None)
         # self.assertIsNot(bad_peer.tcp_last_down, None)
 
@@ -125,7 +139,7 @@ class TestManagedStream(BlobExchangeTestBase):
             with open(os.path.join(self.client_blob_manager.blob_dir, blob_info.blob_hash), "rb+") as handle:
                 handle.truncate()
                 handle.flush()
-        await self.stream.setup()
+        await self.stream.save_file()
         await self.stream.finished_writing.wait()
         if corrupt:
             return self.assertFalse(os.path.isfile(os.path.join(self.client_dir, "test_file")))
diff --git a/tests/unit/stream/test_stream_manager.py b/tests/unit/stream/test_stream_manager.py
index 761154b42..6ca453501 100644
--- a/tests/unit/stream/test_stream_manager.py
+++ b/tests/unit/stream/test_stream_manager.py
@@ -225,7 +225,7 @@ class TestStreamManager(BlobExchangeTestBase):
         )
         self.assertEqual(stored_status, "running")
 
-        await self.stream_manager.stop_stream(stream)
+        await stream.stop()
         self.assertFalse(stream.finished)
         self.assertFalse(stream.running)
@@ -235,7 +235,7 @@ class TestStreamManager(BlobExchangeTestBase):
         )
         self.assertEqual(stored_status, "stopped")
 
-        await self.stream_manager.start_stream(stream)
+        await stream.save_file(node=self.stream_manager.node)
         await stream.finished_writing.wait()
         await asyncio.sleep(0, loop=self.loop)
         self.assertTrue(stream.finished)
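The core move in patch 05 is that range requests are now served by the stream itself (ManagedStream.stream_file) instead of header-juggling code in the daemon, with the new _delayed_stop task polling the saving/streaming events once a second and stopping the download after two consecutive idle checks. A quick way to exercise the new endpoint from outside — a hedged sketch, assuming a daemon running locally on its default port with the stream already in stream_manager.streams; the 2097150 figure comes straight from the header math above, where each full blob contributes blob.length - 1 decrypted bytes:

    import asyncio
    import aiohttp

    async def fetch_first_blob(sd_hash: str, port: int = 5279):
        # ask for exactly one blob's worth of decrypted bytes; the server
        # replies 206 Partial Content with Content-Range and Content-Length
        # computed by _prepare_range_response_headers
        url = f'http://localhost:{port}/stream/{sd_hash}'
        async with aiohttp.ClientSession() as session:
            async with session.get(url, headers={'Range': 'bytes=0-2097149'}) as resp:
                assert resp.status == 206
                print(resp.headers['Content-Range'], resp.headers['Content-Length'])
                return await resp.read()

    # asyncio.get_event_loop().run_until_complete(fetch_first_blob('<sd hash>'))

While a response like this is in flight, stream_file keeps self.streaming set, which is exactly what holds the inactivity watchdog at bay; once the last response closes, the sixty-second call_later handle from the old code is replaced by the watchdog letting the stream wind itself down.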
-> %s", self.claim_name, self.claim_id, self.sd_hash[:6], + output_path) self.saving.set() self.finished_writing.clear() self.started_writing.clear() @@ -352,7 +356,7 @@ class ManagedStream: self.finished_writing.set() except Exception as err: if os.path.isfile(output_path): - log.info("removing incomplete download %s for %s", output_path, self.sd_hash) + log.warning("removing incomplete download %s for %s", output_path, self.sd_hash) os.remove(output_path) if not isinstance(err, asyncio.CancelledError): log.exception("unexpected error encountered writing file for stream %s", self.sd_hash) @@ -447,7 +451,8 @@ class ManagedStream: else: stalled_count += 1 if stalled_count > 1: - log.info("Stopping inactive download for stream %s", self.sd_hash) + log.info("stopping inactive download for lbry://%s#%s (%s...)", self.claim_name, self.claim_id, + self.sd_hash[:6]) await self.stop() return await asyncio.sleep(1, loop=self.loop) From 1f7feafb67a345a603a6d92180f4e518befe90a4 Mon Sep 17 00:00:00 2001 From: Jack Robison Date: Thu, 2 May 2019 16:56:29 -0400 Subject: [PATCH 07/24] force close open streaming requests and api calls on shutdown --- lbrynet/extras/daemon/Daemon.py | 1 + lbrynet/stream/managed_stream.py | 13 ++++++++----- 2 files changed, 9 insertions(+), 5 deletions(-) diff --git a/lbrynet/extras/daemon/Daemon.py b/lbrynet/extras/daemon/Daemon.py index 36c39b5ca..cf9cc2c4e 100644 --- a/lbrynet/extras/daemon/Daemon.py +++ b/lbrynet/extras/daemon/Daemon.py @@ -439,6 +439,7 @@ class Daemon(metaclass=JSONRPCServerType): await self.component_manager.stop() else: self.component_startup_task.cancel() + await self.runner.shutdown() await self.runner.cleanup() if self.analytics_manager.is_started: self.analytics_manager.stop() diff --git a/lbrynet/stream/managed_stream.py b/lbrynet/stream/managed_stream.py index ef80fc692..b8e4f1eb9 100644 --- a/lbrynet/stream/managed_stream.py +++ b/lbrynet/stream/managed_stream.py @@ -94,7 +94,7 @@ class ManagedStream: self.fully_reflected = asyncio.Event(loop=self.loop) self.file_output_task: typing.Optional[asyncio.Task] = None self.delayed_stop_task: typing.Optional[asyncio.Task] = None - self.streaming_responses: typing.List[StreamResponse] = [] + self.streaming_responses: typing.List[typing.Tuple[Request, StreamResponse]] = [] self.streaming = asyncio.Event(loop=self.loop) self._running = asyncio.Event(loop=self.loop) self.saving = asyncio.Event(loop=self.loop) @@ -311,7 +311,7 @@ class ManagedStream: headers=headers ) await response.prepare(request) - self.streaming_responses.append(response) + self.streaming_responses.append((request, response)) self.streaming.set() try: wrote = 0 @@ -329,8 +329,9 @@ class ManagedStream: return response finally: response.force_close() - if response in self.streaming_responses: - self.streaming_responses.remove(response) + if (request, response) in self.streaming_responses: + self.streaming_responses.remove((request, response)) + if not self.streaming_responses: self.streaming.clear() async def _save_file(self, output_path: str): @@ -394,7 +395,9 @@ class ManagedStream: self.file_output_task.cancel() self.file_output_task = None while self.streaming_responses: - self.streaming_responses.pop().force_close() + req, response = self.streaming_responses.pop() + response.force_close() + req.transport.close() self.downloader.stop() self._running.clear() From 14b12cbea2bcfc8a595c0401e82c30c4077eada1 Mon Sep 17 00:00:00 2001 From: Jack Robison Date: Thu, 2 May 2019 16:56:49 -0400 Subject: [PATCH 08/24] block /stream and /get 
requests on the stream manager starting --- lbrynet/extras/daemon/Daemon.py | 4 ++++ lbrynet/stream/stream_manager.py | 3 +++ 2 files changed, 7 insertions(+) diff --git a/lbrynet/extras/daemon/Daemon.py b/lbrynet/extras/daemon/Daemon.py index cf9cc2c4e..fbfedf927 100644 --- a/lbrynet/extras/daemon/Daemon.py +++ b/lbrynet/extras/daemon/Daemon.py @@ -473,6 +473,8 @@ class Daemon(metaclass=JSONRPCServerType): else: name, claim_id = name_and_claim_id.split("/") uri = f"lbry://{name}#{claim_id}" + if not self.stream_manager.started.is_set(): + await self.stream_manager.started.wait() stream = await self.jsonrpc_get(uri) if isinstance(stream, dict): raise web.HTTPServerError(text=stream['error']) @@ -480,6 +482,8 @@ class Daemon(metaclass=JSONRPCServerType): async def handle_stream_range_request(self, request: web.Request): sd_hash = request.path.split("/stream/")[1] + if not self.stream_manager.started.is_set(): + await self.stream_manager.started.wait() if sd_hash not in self.stream_manager.streams: return web.HTTPNotFound() return await self.stream_manager.stream_partial_content(request, sd_hash) diff --git a/lbrynet/stream/stream_manager.py b/lbrynet/stream/stream_manager.py index 2f5470c63..e1f00261b 100644 --- a/lbrynet/stream/stream_manager.py +++ b/lbrynet/stream/stream_manager.py @@ -74,6 +74,7 @@ class StreamManager: self.re_reflect_task: asyncio.Task = None self.update_stream_finished_futs: typing.List[asyncio.Future] = [] self.running_reflector_uploads: typing.List[asyncio.Task] = [] + self.started = asyncio.Event(loop=self.loop) async def _update_content_claim(self, stream: ManagedStream): claim_info = await self.storage.get_content_claim(stream.stream_hash) @@ -191,6 +192,7 @@ class StreamManager: await self.load_streams_from_database() self.resume_downloading_task = self.loop.create_task(self.resume()) self.re_reflect_task = self.loop.create_task(self.reflect_streams()) + self.started.set() def stop(self): if self.resume_downloading_task and not self.resume_downloading_task.done(): @@ -204,6 +206,7 @@ class StreamManager: self.update_stream_finished_futs.pop().cancel() while self.running_reflector_uploads: self.running_reflector_uploads.pop().cancel() + self.started.clear() async def create_stream(self, file_path: str, key: typing.Optional[bytes] = None, iv_generator: typing.Optional[typing.Generator[bytes, None, None]] = None) -> ManagedStream: From d80c671cf2c25cd152da8ccb5a3a069f0f885336 Mon Sep 17 00:00:00 2001 From: Jack Robison Date: Fri, 3 May 2019 14:52:06 -0400 Subject: [PATCH 09/24] look victor --- lbrynet/blob_exchange/client.py | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/lbrynet/blob_exchange/client.py b/lbrynet/blob_exchange/client.py index 67ca75cae..1506af864 100644 --- a/lbrynet/blob_exchange/client.py +++ b/lbrynet/blob_exchange/client.py @@ -4,6 +4,7 @@ import typing import binascii from lbrynet.error import InvalidBlobHashError, InvalidDataError from lbrynet.blob_exchange.serialization import BlobResponse, BlobRequest +from lbrynet.utils import cache_concurrent if typing.TYPE_CHECKING: from lbrynet.blob.blob_file import AbstractBlob from lbrynet.blob.writer import HashBlobWriter @@ -158,8 +159,9 @@ class BlobExchangeClientProtocol(asyncio.Protocol): return await self._download_blob() except OSError as e: # i'm not sure how to fix this race condition - jack - log.exception("race happened downloading %s from %s:%i", blob_hash, self.peer_address, self.peer_port) - return self._blob_bytes_received, self.transport + log.warning("race 
happened downloading %s from %s:%i", blob_hash, self.peer_address, self.peer_port) + # return self._blob_bytes_received, self.transport + raise except asyncio.TimeoutError: if self._response_fut and not self._response_fut.done(): self._response_fut.cancel() @@ -184,6 +186,7 @@ class BlobExchangeClientProtocol(asyncio.Protocol): self.close() +@cache_concurrent async def request_blob(loop: asyncio.BaseEventLoop, blob: 'AbstractBlob', address: str, tcp_port: int, peer_connect_timeout: float, blob_download_timeout: float, connected_transport: asyncio.Transport = None)\ From 1116c7f29e181f69da6aebde4e8d43943bcc6177 Mon Sep 17 00:00:00 2001 From: Jack Robison Date: Fri, 3 May 2019 14:53:23 -0400 Subject: [PATCH 10/24] logging --- lbrynet/extras/daemon/Daemon.py | 3 +++ lbrynet/stream/managed_stream.py | 2 ++ 2 files changed, 5 insertions(+) diff --git a/lbrynet/extras/daemon/Daemon.py b/lbrynet/extras/daemon/Daemon.py index fbfedf927..b8d506378 100644 --- a/lbrynet/extras/daemon/Daemon.py +++ b/lbrynet/extras/daemon/Daemon.py @@ -439,10 +439,13 @@ class Daemon(metaclass=JSONRPCServerType): await self.component_manager.stop() else: self.component_startup_task.cancel() + log.info("stopped api components") await self.runner.shutdown() await self.runner.cleanup() + log.info("stopped api server") if self.analytics_manager.is_started: self.analytics_manager.stop() + log.info("finished shutting down") async def handle_old_jsonrpc(self, request): data = await request.json() diff --git a/lbrynet/stream/managed_stream.py b/lbrynet/stream/managed_stream.py index b8e4f1eb9..2bb119b72 100644 --- a/lbrynet/stream/managed_stream.py +++ b/lbrynet/stream/managed_stream.py @@ -355,6 +355,8 @@ class ManagedStream: self.download_id, self.claim_name, self.sd_hash )) self.finished_writing.set() + log.info("finished saving file for lbry://%s#%s (sd hash %s...) 
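The @cache_concurrent decorator that patch 09 puts on request_blob (and that already guards BlobDownloader.download_blob) coalesces overlapping calls: while a request for a given argument tuple is in flight, identical callers await the same result instead of dialing the same peer twice. The real decorator lives in lbrynet.utils; a minimal sketch of the idea, not the actual implementation:

    import asyncio
    import functools

    def coalesce_concurrent(fn):
        # while a call with a given key is in flight, later identical calls
        # share its task; shield() keeps one cancelled caller from killing
        # the shared task out from under the others
        in_flight = {}

        @functools.wraps(fn)
        async def wrapper(*args, **kwargs):
            key = (args, tuple(sorted(kwargs.items())))
            if key not in in_flight:
                task = asyncio.ensure_future(fn(*args, **kwargs))
                task.add_done_callback(lambda _: in_flight.pop(key, None))
                in_flight[key] = task
            return await asyncio.shield(in_flight[key])

        return wrapper

Keying on the full argument tuple is also what makes patch 16 below work: adding an otherwise-unused connection_id parameter changes the key, so separate consumers of the same blob are no longer collapsed into one request.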
-> %s", self.claim_name, self.claim_id, + self.sd_hash[:6]) except Exception as err: if os.path.isfile(output_path): log.warning("removing incomplete download %s for %s", output_path, self.sd_hash) From 4e32b69d1d168863881ba3c26fef90eea061e740 Mon Sep 17 00:00:00 2001 From: Jack Robison Date: Fri, 3 May 2019 14:54:09 -0400 Subject: [PATCH 11/24] don't set running streams as stopped on startup --- lbrynet/stream/stream_manager.py | 39 ++++++++++---------------------- 1 file changed, 12 insertions(+), 27 deletions(-) diff --git a/lbrynet/stream/stream_manager.py b/lbrynet/stream/stream_manager.py index e1f00261b..e19f6c12e 100644 --- a/lbrynet/stream/stream_manager.py +++ b/lbrynet/stream/stream_manager.py @@ -7,7 +7,7 @@ import random from decimal import Decimal from aiohttp.web import Request from lbrynet.error import ResolveError, InvalidStreamDescriptorError, KeyFeeAboveMaxAllowed, InsufficientFundsError -from lbrynet.error import ResolveTimeout, DownloadDataTimeout +from lbrynet.error import ResolveTimeout, DownloadDataTimeout, DownloadSDTimeout from lbrynet.utils import cache_concurrent from lbrynet.stream.descriptor import StreamDescriptor from lbrynet.stream.managed_stream import ManagedStream @@ -124,29 +124,19 @@ class StreamManager: async def load_streams_from_database(self): to_recover = [] to_start = [] - await self.storage.sync_files_to_blobs() + + # this will set streams marked as finished and are missing blobs as being stopped + # await self.storage.sync_files_to_blobs() for file_info in await self.storage.get_all_lbry_files(): + # if the sd blob is not verified, try to reconstruct it from the database + # this could either be because the blob files were deleted manually or save_blobs was not true when + # the stream was downloaded if not self.blob_manager.is_blob_verified(file_info['sd_hash']): to_recover.append(file_info) to_start.append(file_info) if to_recover: - # if self.blob_manager._save_blobs: - # log.info("Attempting to recover %i streams", len(to_recover)) await self.recover_streams(to_recover) - if not self.config.save_files: - # set files that have been deleted manually to streaming mode - to_set_as_streaming = [] - for file_info in to_start: - file_name = path_or_none(file_info['file_name']) - download_dir = path_or_none(file_info['download_directory']) - if file_name and download_dir and not os.path.isfile(os.path.join(file_name, download_dir)): - file_info['file_name'], file_info['download_directory'] = '{stream}', '{stream}' - to_set_as_streaming.append(file_info['stream_hash']) - - if to_set_as_streaming: - await self.storage.set_files_as_streaming(to_set_as_streaming) - log.info("Initializing %i files", len(to_start)) if to_start: await asyncio.gather(*[ @@ -162,8 +152,9 @@ class StreamManager: if not self.node: log.warning("no DHT node given, resuming downloads trusting that we can contact reflector") t = [ - self.loop.create_task(stream.start(node=self.node)) for stream in self.streams.values() - if stream.running + self.loop.create_task( + stream.start(node=self.node, save_now=(stream.full_path is not None)) + ) for stream in self.streams.values() if stream.running ] if t: log.info("resuming %i downloads", len(t)) @@ -207,6 +198,7 @@ class StreamManager: while self.running_reflector_uploads: self.running_reflector_uploads.pop().cancel() self.started.clear() + log.info("finished stopping the stream manager") async def create_stream(self, file_path: str, key: typing.Optional[bytes] = None, iv_generator: typing.Optional[typing.Generator[bytes, None, None]] 
= None) -> ManagedStream: @@ -390,14 +382,9 @@ class StreamManager: stream.set_claim(resolved, claim) await self.storage.save_content_claim(stream.stream_hash, outpoint) return stream - except DownloadDataTimeout as err: # forgive data timeout, dont delete stream + except Exception as err: # forgive data timeout, dont delete stream error = err raise - except Exception as err: - error = err - if stream and stream.descriptor: - await self.storage.delete_stream(stream.descriptor) - await self.blob_manager.delete_blob(stream.sd_hash) finally: if self.analytics_manager and (error or (stream and (stream.downloader.time_to_descriptor or stream.downloader.time_to_first_bytes))): @@ -417,8 +404,6 @@ class StreamManager: None if not error else error.__class__.__name__ ) ) - if error: - raise error async def stream_partial_content(self, request: Request, sd_hash: str): return await self.streams[sd_hash].stream_file(request, self.node) From bd6a609b30c5998c05691668c00ba7b1eda78474 Mon Sep 17 00:00:00 2001 From: Jack Robison Date: Fri, 3 May 2019 16:36:24 -0400 Subject: [PATCH 12/24] fix logging --- lbrynet/stream/managed_stream.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lbrynet/stream/managed_stream.py b/lbrynet/stream/managed_stream.py index 2bb119b72..188c9d5c1 100644 --- a/lbrynet/stream/managed_stream.py +++ b/lbrynet/stream/managed_stream.py @@ -356,7 +356,7 @@ class ManagedStream: )) self.finished_writing.set() log.info("finished saving file for lbry://%s#%s (sd hash %s...) -> %s", self.claim_name, self.claim_id, - self.sd_hash[:6]) + self.sd_hash[:6], self.full_path) except Exception as err: if os.path.isfile(output_path): log.warning("removing incomplete download %s for %s", output_path, self.sd_hash) From f7412514b36e6facd935fdd2e75cb93c66d6016d Mon Sep 17 00:00:00 2001 From: Jack Robison Date: Fri, 3 May 2019 20:38:33 -0400 Subject: [PATCH 13/24] fix partial content 0 padding --- lbrynet/stream/managed_stream.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lbrynet/stream/managed_stream.py b/lbrynet/stream/managed_stream.py index 188c9d5c1..21e6d0368 100644 --- a/lbrynet/stream/managed_stream.py +++ b/lbrynet/stream/managed_stream.py @@ -317,7 +317,7 @@ class ManagedStream: wrote = 0 async for blob_info, decrypted in self._aiter_read_stream(skip_blobs): if (blob_info.blob_num == len(self.descriptor.blobs) - 2) or (len(decrypted) + wrote >= size): - decrypted += b'\x00' * (size - len(decrypted) - wrote) + decrypted += (b'\x00' * (size - len(decrypted) - wrote - (skip_blobs * 2097151))) await response.write_eof(decrypted) else: await response.write(decrypted) From f506b3e6d43ad019a58e2728ee837d95284e1283 Mon Sep 17 00:00:00 2001 From: Jack Robison Date: Fri, 3 May 2019 20:50:11 -0400 Subject: [PATCH 14/24] tests --- lbrynet/extras/daemon/Daemon.py | 5 +++-- lbrynet/stream/stream_manager.py | 2 +- lbrynet/testcase.py | 2 +- tests/integration/test_cli.py | 2 +- tests/integration/test_streaming.py | 6 ++++-- tests/integration/test_sync.py | 2 +- tests/unit/stream/test_stream_manager.py | 2 +- 7 files changed, 12 insertions(+), 9 deletions(-) diff --git a/lbrynet/extras/daemon/Daemon.py b/lbrynet/extras/daemon/Daemon.py index b8d506378..3b42e7164 100644 --- a/lbrynet/extras/daemon/Daemon.py +++ b/lbrynet/extras/daemon/Daemon.py @@ -433,14 +433,15 @@ class Daemon(metaclass=JSONRPCServerType): self.component_startup_task = asyncio.create_task(self.component_manager.start()) await self.component_startup_task - async def stop(self): + async def stop(self, 
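The constants in patch 13 are easier to check with numbers. _prepare_range_response_headers counts blob.length - 1 = 2097150 payload bytes per full blob when computing size, but snaps the start offset back to a blob boundary with a 2097151-byte stride; the final chunk was then zero-padded out to size as counted from byte 0, so whenever blobs were skipped the response was over-padded by exactly skip_blobs * 2097151 bytes — the amount the fix subtracts. Worked through with the code's own arithmetic:

    # suppose the browser asks to resume at byte 5,000,000
    get_range = 'bytes=5000000-'
    start, end = get_range.split('=')[1].split('-')
    start = int(start)                # 5000000
    skip_blobs = start // 2097150     # 2 -> streaming resumes at the third blob
    start = skip_blobs * 2097151      # 4194302, the snapped-back boundary
    print(skip_blobs, start)
    # pre-fix padding:  size - wrote - len(decrypted)
    # post-fix padding: size - wrote - len(decrypted) - (skip_blobs * 2097151)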
From f506b3e6d43ad019a58e2728ee837d95284e1283 Mon Sep 17 00:00:00 2001
From: Jack Robison
Date: Fri, 3 May 2019 20:50:11 -0400
Subject: [PATCH 14/24] tests

---
 lbrynet/extras/daemon/Daemon.py          | 5 +++--
 lbrynet/stream/stream_manager.py         | 2 +-
 lbrynet/testcase.py                      | 2 +-
 tests/integration/test_cli.py            | 2 +-
 tests/integration/test_streaming.py      | 6 ++++--
 tests/integration/test_sync.py           | 2 +-
 tests/unit/stream/test_stream_manager.py | 2 +-
 7 files changed, 12 insertions(+), 9 deletions(-)

diff --git a/lbrynet/extras/daemon/Daemon.py b/lbrynet/extras/daemon/Daemon.py
index b8d506378..3b42e7164 100644
--- a/lbrynet/extras/daemon/Daemon.py
+++ b/lbrynet/extras/daemon/Daemon.py
@@ -433,14 +433,15 @@ class Daemon(metaclass=JSONRPCServerType):
             self.component_startup_task = asyncio.create_task(self.component_manager.start())
         await self.component_startup_task
 
-    async def stop(self):
+    async def stop(self, shutdown_runner=True):
         if self.component_startup_task is not None:
             if self.component_startup_task.done():
                 await self.component_manager.stop()
             else:
                 self.component_startup_task.cancel()
         log.info("stopped api components")
-        await self.runner.shutdown()
+        if shutdown_runner:
+            await self.runner.shutdown()
         await self.runner.cleanup()
         log.info("stopped api server")
         if self.analytics_manager.is_started:
diff --git a/lbrynet/stream/stream_manager.py b/lbrynet/stream/stream_manager.py
index e19f6c12e..ac1566d1c 100644
--- a/lbrynet/stream/stream_manager.py
+++ b/lbrynet/stream/stream_manager.py
@@ -7,7 +7,7 @@ import random
 from decimal import Decimal
 from aiohttp.web import Request
 from lbrynet.error import ResolveError, InvalidStreamDescriptorError, KeyFeeAboveMaxAllowed, InsufficientFundsError
-from lbrynet.error import ResolveTimeout, DownloadDataTimeout, DownloadSDTimeout
+from lbrynet.error import ResolveTimeout
 from lbrynet.utils import cache_concurrent
 from lbrynet.stream.descriptor import StreamDescriptor
 from lbrynet.stream.managed_stream import ManagedStream
diff --git a/lbrynet/testcase.py b/lbrynet/testcase.py
index 75854015e..29a487cf2 100644
--- a/lbrynet/testcase.py
+++ b/lbrynet/testcase.py
@@ -124,7 +124,7 @@ class CommandTestCase(IntegrationTestCase):
     async def asyncTearDown(self):
         await super().asyncTearDown()
         self.wallet_component._running = False
-        await self.daemon.stop()
+        await self.daemon.stop(shutdown_runner=False)
 
     async def confirm_tx(self, txid):
         """ Wait for tx to be in mempool, then generate a block, wait for tx to be in a block. """
diff --git a/tests/integration/test_cli.py b/tests/integration/test_cli.py
index a6a4c97e6..b01cf54c0 100644
--- a/tests/integration/test_cli.py
+++ b/tests/integration/test_cli.py
@@ -29,7 +29,7 @@ class CLIIntegrationTest(AsyncioTestCase):
         await self.daemon.start()
 
     async def asyncTearDown(self):
-        await self.daemon.stop()
+        await self.daemon.stop(shutdown_runner=False)
 
     def test_cli_status_command_with_auth(self):
         actual_output = StringIO()
diff --git a/tests/integration/test_streaming.py b/tests/integration/test_streaming.py
index 1c4bdc8df..832e8fed2 100644
--- a/tests/integration/test_streaming.py
+++ b/tests/integration/test_streaming.py
@@ -340,9 +340,11 @@ class RangeRequests(CommandTestCase):
         self.assertTrue(os.path.isfile(path))
         await self._restart_stream_manager()
         stream = self.daemon.jsonrpc_file_list()[0]
-
-        self.assertIsNone(stream.full_path)
+        self.assertIsNotNone(stream.full_path)
         self.assertFalse(os.path.isfile(path))
+        if wait_for_start_writing:
+            await stream.started_writing.wait()
+            self.assertTrue(os.path.isfile(path))
 
     async def test_file_save_stop_before_finished_streaming_only_wait_for_start(self):
         return await self.test_file_save_stop_before_finished_streaming_only(wait_for_start_writing=True)
diff --git a/tests/integration/test_sync.py b/tests/integration/test_sync.py
index 829b47d27..f59dd72bd 100644
--- a/tests/integration/test_sync.py
+++ b/tests/integration/test_sync.py
@@ -54,7 +54,7 @@ class AccountSynchronization(AsyncioTestCase):
 
     async def asyncTearDown(self):
         self.wallet_component._running = False
-        await self.daemon.stop()
+        await self.daemon.stop(shutdown_runner=False)
 
     @mock.patch('time.time', mock.Mock(return_value=12345))
     def test_sync(self):
diff --git a/tests/unit/stream/test_stream_manager.py b/tests/unit/stream/test_stream_manager.py
index 6ca453501..b6df1d237 100644
--- a/tests/unit/stream/test_stream_manager.py
+++ b/tests/unit/stream/test_stream_manager.py
@@ -337,7 +337,7 @@ class TestStreamManager(BlobExchangeTestBase):
         for blob_hash in [stream.sd_hash] + [b.blob_hash for b in stream.descriptor.blobs[:-1]]:
             blob_status = await self.client_storage.get_blob_status(blob_hash)
             self.assertEqual('pending', blob_status)
-        self.assertEqual('stopped', self.stream_manager.streams[self.sd_hash].status)
+        self.assertEqual('finished', self.stream_manager.streams[self.sd_hash].status)
 
         sd_blob = self.client_blob_manager.get_blob(stream.sd_hash)
         self.assertTrue(sd_blob.file_exists)

From b2f63a1545b8b86a7f646eb1e26c5cf164b87baf Mon Sep 17 00:00:00 2001
From: Jack Robison
Date: Sun, 5 May 2019 19:41:35 -0400
Subject: [PATCH 15/24] fix tests

---
 lbrynet/extras/daemon/Daemon.py  |  2 +-
 lbrynet/stream/managed_stream.py | 10 ++--------
 lbrynet/stream/stream_manager.py | 16 +++++++++++++---
 3 files changed, 16 insertions(+), 12 deletions(-)

diff --git a/lbrynet/extras/daemon/Daemon.py b/lbrynet/extras/daemon/Daemon.py
index 3b42e7164..aa4c3606e 100644
--- a/lbrynet/extras/daemon/Daemon.py
+++ b/lbrynet/extras/daemon/Daemon.py
@@ -1511,7 +1511,7 @@ class Daemon(metaclass=JSONRPCServerType):
             raise Exception(f'Unable to find a file for {kwargs}')
         stream = streams[0]
         if status == 'start' and not stream.running:
-            await self.stream_manager.start_stream(stream)
+            await stream.save_file(node=self.stream_manager.node)
             msg = "Resumed download"
         elif status == 'stop' and stream.running:
             await stream.stop()
diff --git a/lbrynet/stream/managed_stream.py b/lbrynet/stream/managed_stream.py
index 21e6d0368..1a1221546 100644
--- a/lbrynet/stream/managed_stream.py
+++ b/lbrynet/stream/managed_stream.py
@@ -5,7 +5,7 @@ import logging
 import binascii
 from aiohttp.web import Request, StreamResponse
 from lbrynet.utils import generate_id
-from lbrynet.error import DownloadSDTimeout, DownloadDataTimeout
+from lbrynet.error import DownloadSDTimeout
 from lbrynet.schema.mime_types import guess_media_type
 from lbrynet.stream.downloader import StreamDownloader
 from lbrynet.stream.descriptor import StreamDescriptor
@@ -257,17 +257,11 @@ class ManagedStream:
             return
         log.info("start downloader for lbry://%s#%s (sd hash %s...)", self.claim_name, self.claim_id, self.sd_hash[:6])
         self._running.set()
-        start_time = self.loop.time()
         try:
             await asyncio.wait_for(self.downloader.start(node), timeout, loop=self.loop)
-            if save_now:
-                await asyncio.wait_for(self.save_file(node=node), timeout - (self.loop.time() - start_time),
-                                       loop=self.loop)
         except asyncio.TimeoutError:
             self._running.clear()
-            if not self.descriptor:
-                raise DownloadSDTimeout(self.sd_hash)
-            raise DownloadDataTimeout(self.sd_hash)
+            raise DownloadSDTimeout(self.sd_hash)
 
         if self.delayed_stop_task and not self.delayed_stop_task.done():
             self.delayed_stop_task.cancel()
diff --git a/lbrynet/stream/stream_manager.py b/lbrynet/stream/stream_manager.py
index ac1566d1c..bdf63f137 100644
--- a/lbrynet/stream/stream_manager.py
+++ b/lbrynet/stream/stream_manager.py
@@ -7,7 +7,7 @@ import random
 from decimal import Decimal
 from aiohttp.web import Request
 from lbrynet.error import ResolveError, InvalidStreamDescriptorError, KeyFeeAboveMaxAllowed, InsufficientFundsError
-from lbrynet.error import ResolveTimeout
+from lbrynet.error import ResolveTimeout, DownloadDataTimeout
 from lbrynet.utils import cache_concurrent
 from lbrynet.stream.descriptor import StreamDescriptor
 from lbrynet.stream.managed_stream import ManagedStream
@@ -375,13 +375,23 @@ class StreamManager:
                 analytics_manager=self.analytics_manager
             )
             log.info("starting download for %s", uri)
-            await stream.start(self.node, timeout, save_now=save_file)
+
+            before_download = self.loop.time()
+            await stream.start(self.node, timeout)
+            stream.set_claim(resolved, claim)
             if to_replace:  # delete old stream now that the replacement has started downloading
                 await self.delete_stream(to_replace)
             self.streams[stream.sd_hash] = stream
-            stream.set_claim(resolved, claim)
+
+            self.storage.content_claim_callbacks[stream.stream_hash] = lambda: self._update_content_claim(stream)
             await self.storage.save_content_claim(stream.stream_hash, outpoint)
+            if save_file:
+                await asyncio.wait_for(stream.save_file(node=self.node), timeout - (self.loop.time() - before_download),
+                                       loop=self.loop)
             return stream
+        except asyncio.TimeoutError:
+            error = DownloadDataTimeout(stream.sd_hash)
+            raise error
         except Exception as err:  # forgive data timeout, dont delete stream
             error = err
             raise

From 24e073680b5510dab50002b0754c7898b4f6e898 Mon Sep 17 00:00:00 2001
From: Jack Robison
Date: Sun, 5 May 2019 20:22:10 -0400
Subject: [PATCH 16/24] add connection id workaround

---
 lbrynet/blob_exchange/client.py     |  2 +-
 lbrynet/blob_exchange/downloader.py |  9 +++++----
 lbrynet/stream/downloader.py        | 16 ++++++++--------
 lbrynet/stream/managed_stream.py    | 10 +++++-----
 4 files changed, 19 insertions(+), 18 deletions(-)

diff --git a/lbrynet/blob_exchange/client.py b/lbrynet/blob_exchange/client.py
index 1506af864..65af4f4cc 100644
--- a/lbrynet/blob_exchange/client.py
+++ b/lbrynet/blob_exchange/client.py
@@ -189,7 +189,7 @@ class BlobExchangeClientProtocol(asyncio.Protocol):
 @cache_concurrent
 async def request_blob(loop: asyncio.BaseEventLoop, blob: 'AbstractBlob', address: str, tcp_port: int,
                        peer_connect_timeout: float, blob_download_timeout: float,
-                       connected_transport: asyncio.Transport = None)\
+                       connected_transport: asyncio.Transport = None, connection_id: int = 0)\
         -> typing.Tuple[int, typing.Optional[asyncio.Transport]]:
     """
     Returns [<bytes received>, <transport>]
diff --git a/lbrynet/blob_exchange/downloader.py b/lbrynet/blob_exchange/downloader.py
index b8b854a49..fad75253a 100644
--- a/lbrynet/blob_exchange/downloader.py
+++ b/lbrynet/blob_exchange/downloader.py
@@ -33,7 +33,7 @@ class BlobDownloader:
                 return False
         return not (blob.get_is_verified() or not blob.is_writeable())
 
-    async def request_blob_from_peer(self, blob: 'AbstractBlob', peer: 'KademliaPeer'):
+    async def request_blob_from_peer(self, blob: 'AbstractBlob', peer: 'KademliaPeer', connection_id: int = 0):
         if blob.get_is_verified():
             return
         self.scores[peer] = self.scores.get(peer, 0) - 1  # starts losing score, to account for cancelled ones
@@ -41,7 +41,7 @@ class BlobDownloader:
             start = self.loop.time()
             bytes_received, transport = await request_blob(
                 self.loop, blob, peer.address, peer.tcp_port, self.config.peer_connect_timeout,
-                self.config.blob_download_timeout, connected_transport=transport
+                self.config.blob_download_timeout, connected_transport=transport, connection_id=connection_id
            )
             if not transport and peer not in self.ignored:
                 self.ignored[peer] = self.loop.time()
@@ -74,7 +74,8 @@ class BlobDownloader:
         ))
 
     @cache_concurrent
-    async def download_blob(self, blob_hash: str, length: typing.Optional[int] = None) -> 'AbstractBlob':
+    async def download_blob(self, blob_hash: str, length: typing.Optional[int] = None,
+                            connection_id: int = 0) -> 'AbstractBlob':
         blob = self.blob_manager.get_blob(blob_hash, length)
         if blob.get_is_verified():
             return blob
@@ -94,7 +95,7 @@ class BlobDownloader:
                         break
                 if peer not in self.active_connections and peer not in self.ignored:
                     log.debug("request %s from %s:%i", blob_hash[:8], peer.address, peer.tcp_port)
-                    t = self.loop.create_task(self.request_blob_from_peer(blob, peer))
+                    t = self.loop.create_task(self.request_blob_from_peer(blob, peer, connection_id))
                     self.active_connections[peer] = t
             await self.new_peer_or_finished()
             self.cleanup_active()
diff --git a/lbrynet/stream/downloader.py b/lbrynet/stream/downloader.py
index 8c0653d4d..b152f6e57 100644
--- a/lbrynet/stream/downloader.py
+++ b/lbrynet/stream/downloader.py
@@ -58,14 +58,14 @@ class StreamDownloader:
 
         self.fixed_peers_handle = self.loop.call_later(self.fixed_peers_delay, _delayed_add_fixed_peers)
 
-    async def load_descriptor(self):
+    async def load_descriptor(self, connection_id: int = 0):
         # download or get the sd blob
         sd_blob = self.blob_manager.get_blob(self.sd_hash)
         if not sd_blob.get_is_verified():
             try:
                 now = self.loop.time()
                 sd_blob = await asyncio.wait_for(
-                    self.blob_downloader.download_blob(self.sd_hash),
+                    self.blob_downloader.download_blob(self.sd_hash, connection_id),
                     self.config.blob_download_timeout, loop=self.loop
                 )
                 log.info("downloaded sd blob %s", self.sd_hash)
@@ -79,7 +79,7 @@ class StreamDownloader:
         )
         log.info("loaded stream manifest %s", self.sd_hash)
 
-    async def start(self, node: typing.Optional['Node'] = None):
+    async def start(self, node: typing.Optional['Node'] = None, connection_id: int = 0):
         # set up peer accumulation
         if node:
             self.node = node
@@ -90,7 +90,7 @@ class StreamDownloader:
             log.info("searching for peers for stream %s", self.sd_hash)
 
         if not self.descriptor:
-            await self.load_descriptor()
+            await self.load_descriptor(connection_id)
 
         # add the head blob to the peer search
         self.search_queue.put_nowait(self.descriptor.blobs[0].blob_hash)
@@ -101,10 +101,10 @@ class StreamDownloader:
             self.blob_manager.get_blob(self.sd_hash, length=self.descriptor.length), self.descriptor
         )
 
-    async def download_stream_blob(self, blob_info: 'BlobInfo') -> 'AbstractBlob':
+    async def download_stream_blob(self, blob_info: 'BlobInfo', connection_id: int = 0) -> 'AbstractBlob':
         if not filter(lambda blob: blob.blob_hash == blob_info.blob_hash, self.descriptor.blobs[:-1]):
             raise ValueError(f"blob {blob_info.blob_hash} is not part of stream with sd hash {self.sd_hash}")
-        blob = await self.blob_downloader.download_blob(blob_info.blob_hash, blob_info.length)
+        blob = await self.blob_downloader.download_blob(blob_info.blob_hash, blob_info.length, connection_id)
         return blob
 
     def decrypt_blob(self, blob_info: 'BlobInfo', blob: 'AbstractBlob') -> bytes:
@@ -112,11 +112,11 @@ class StreamDownloader:
             binascii.unhexlify(self.descriptor.key.encode()), binascii.unhexlify(blob_info.iv.encode())
         )
 
-    async def read_blob(self, blob_info: 'BlobInfo') -> bytes:
+    async def read_blob(self, blob_info: 'BlobInfo', connection_id: int = 0) -> bytes:
         start = None
         if self.time_to_first_bytes is None:
             start = self.loop.time()
-        blob = await self.download_stream_blob(blob_info)
+        blob = await self.download_stream_blob(blob_info, connection_id)
         decrypted = self.decrypt_blob(blob_info, blob)
         if start:
             self.time_to_first_bytes = self.loop.time() - start
diff --git a/lbrynet/stream/managed_stream.py b/lbrynet/stream/managed_stream.py
index 1a1221546..1d75a7d57 100644
--- a/lbrynet/stream/managed_stream.py
+++ b/lbrynet/stream/managed_stream.py
@@ -255,7 +255,7 @@ class ManagedStream:
         timeout = timeout or self.config.download_timeout
         if self._running.is_set():
             return
         log.info("start downloader for lbry://%s#%s (sd hash %s...)", self.claim_name, self.claim_id, self.sd_hash[:6])
downloader for stream (sd hash: %s)", self.sd_hash) self._running.set() try: await asyncio.wait_for(self.downloader.start(node), timeout, loop=self.loop) @@ -286,13 +286,13 @@ class ManagedStream: if (finished and self.status != self.STATUS_FINISHED) or self.status == self.STATUS_RUNNING: await self.update_status(self.STATUS_FINISHED if finished else self.STATUS_STOPPED) - async def _aiter_read_stream(self, start_blob_num: typing.Optional[int] = 0)\ + async def _aiter_read_stream(self, start_blob_num: typing.Optional[int] = 0, connection_id: int = 0)\ -> typing.AsyncIterator[typing.Tuple['BlobInfo', bytes]]: if start_blob_num >= len(self.descriptor.blobs[:-1]): raise IndexError(start_blob_num) for i, blob_info in enumerate(self.descriptor.blobs[start_blob_num:-1]): assert i + start_blob_num == blob_info.blob_num - decrypted = await self.downloader.read_blob(blob_info) + decrypted = await self.downloader.read_blob(blob_info, connection_id) yield (blob_info, decrypted) async def stream_file(self, request: Request, node: typing.Optional['Node'] = None) -> StreamResponse: @@ -309,7 +309,7 @@ class ManagedStream: self.streaming.set() try: wrote = 0 - async for blob_info, decrypted in self._aiter_read_stream(skip_blobs): + async for blob_info, decrypted in self._aiter_read_stream(skip_blobs, connection_id=2): if (blob_info.blob_num == len(self.descriptor.blobs) - 2) or (len(decrypted) + wrote >= size): decrypted += (b'\x00' * (size - len(decrypted) - wrote - (skip_blobs * 2097151))) await response.write_eof(decrypted) @@ -336,7 +336,7 @@ class ManagedStream: self.started_writing.clear() try: with open(output_path, 'wb') as file_write_handle: - async for blob_info, decrypted in self._aiter_read_stream(): + async for blob_info, decrypted in self._aiter_read_stream(connection_id=1): log.info("write blob %i/%i", blob_info.blob_num + 1, len(self.descriptor.blobs) - 1) file_write_handle.write(decrypted) file_write_handle.flush() From f642cfe9ddc60c8792c3e88480f52846ac0cb0e3 Mon Sep 17 00:00:00 2001 From: Jack Robison Date: Mon, 6 May 2019 12:51:51 -0400 Subject: [PATCH 17/24] fix resuming save tasks on startup --- lbrynet/stream/stream_manager.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/lbrynet/stream/stream_manager.py b/lbrynet/stream/stream_manager.py index bdf63f137..262565c70 100644 --- a/lbrynet/stream/stream_manager.py +++ b/lbrynet/stream/stream_manager.py @@ -154,6 +154,8 @@ class StreamManager: t = [ self.loop.create_task( stream.start(node=self.node, save_now=(stream.full_path is not None)) + if not stream.full_path else + stream.save_file(node=self.node) ) for stream in self.streams.values() if stream.running ] if t: From 4f0e8fce6e125709ccc2e864de9c4a7b6fc88bad Mon Sep 17 00:00:00 2001 From: Jack Robison Date: Mon, 6 May 2019 13:20:03 -0400 Subject: [PATCH 18/24] add optional download_directory argument to get --- lbrynet/extras/daemon/Daemon.py | 12 ++++++++---- 1 file changed, 8 insertions(+), 4 deletions(-) diff --git a/lbrynet/extras/daemon/Daemon.py b/lbrynet/extras/daemon/Daemon.py index aa4c3606e..faafe7304 100644 --- a/lbrynet/extras/daemon/Daemon.py +++ b/lbrynet/extras/daemon/Daemon.py @@ -868,26 +868,30 @@ class Daemon(metaclass=JSONRPCServerType): @requires(WALLET_COMPONENT, EXCHANGE_RATE_MANAGER_COMPONENT, BLOB_COMPONENT, DATABASE_COMPONENT, STREAM_MANAGER_COMPONENT, conditions=[WALLET_IS_UNLOCKED]) - async def jsonrpc_get(self, uri, file_name=None, timeout=None, save_file=None): + async def jsonrpc_get(self, uri, file_name=None, download_directory=None, 
timeout=None, save_file=None): """ Download stream from a LBRY name. Usage: - get [ | --file_name=] [ | --timeout=] - [--save_file=] + get [ | --file_name=] + [ | --download_directory=] [ | --timeout=] + [--save_file=] Options: --uri= : (str) uri of the content to download --file_name= : (str) specified name for the downloaded file, overrides the stream file name + --download_directory= : (str) full path to the directory to download into --timeout= : (int) download timeout in number of seconds --save_file= : (bool) save the file to the downloads directory Returns: {File} """ + if download_directory and not os.path.isdir(download_directory): + return {"error": f"specified download directory \"{download_directory}\" does not exist"} try: stream = await self.stream_manager.download_stream_from_uri( - uri, self.exchange_rate_manager, timeout, file_name, save_file=save_file + uri, self.exchange_rate_manager, timeout, file_name, download_directory, save_file=save_file ) if not stream: raise DownloadSDTimeout(uri) From 22c701fd504ba8c346e38c9ded67683ac8bf8478 Mon Sep 17 00:00:00 2001 From: Jack Robison Date: Mon, 6 May 2019 13:39:51 -0400 Subject: [PATCH 19/24] fix completed field for an item in file list --- lbrynet/stream/managed_stream.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lbrynet/stream/managed_stream.py b/lbrynet/stream/managed_stream.py index 1d75a7d57..511b42738 100644 --- a/lbrynet/stream/managed_stream.py +++ b/lbrynet/stream/managed_stream.py @@ -203,7 +203,7 @@ class ManagedStream: else: written_bytes = self.written_bytes return { - 'completed': self.finished, + 'completed': self.output_file_exists and self.status in ('stopped', 'finished'), 'file_name': self.file_name, 'download_directory': self.download_directory, 'points_paid': 0.0, From bc060f137803edf93fa3ec5f002d85a2e8466cbf Mon Sep 17 00:00:00 2001 From: Jack Robison Date: Mon, 6 May 2019 13:50:34 -0400 Subject: [PATCH 20/24] pay fee after downloading the sd blob/adding to the db --- lbrynet/stream/stream_manager.py | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/lbrynet/stream/stream_manager.py b/lbrynet/stream/stream_manager.py index 262565c70..3a99f3de9 100644 --- a/lbrynet/stream/stream_manager.py +++ b/lbrynet/stream/stream_manager.py @@ -345,6 +345,7 @@ class StreamManager: return updated_stream content_fee = None + fee_amount, fee_address = None, None # check that the fee is payable if not to_replace and claim.stream.has_fee: @@ -365,12 +366,6 @@ class StreamManager: raise InsufficientFundsError(msg) fee_address = claim.stream.fee.address - content_fee = await self.wallet.send_amount_to_address( - lbc_to_dewies(str(fee_amount)), fee_address.encode('latin1') - ) - - log.info("paid fee of %s for %s", fee_amount, uri) - stream = ManagedStream( self.loop, self.config, self.blob_manager, claim.stream.source.sd_hash, download_directory, file_name, ManagedStream.STATUS_RUNNING, content_fee=content_fee, @@ -383,8 +378,13 @@ class StreamManager: stream.set_claim(resolved, claim) if to_replace: # delete old stream now that the replacement has started downloading await self.delete_stream(to_replace) - self.streams[stream.sd_hash] = stream + elif fee_address: + content_fee = await self.wallet.send_amount_to_address( + lbc_to_dewies(str(fee_amount)), fee_address.encode('latin1') + ) + log.info("paid fee of %s for %s", fee_amount, uri) + self.streams[stream.sd_hash] = stream self.storage.content_claim_callbacks[stream.stream_hash] = lambda: 
self._update_content_claim(stream) await self.storage.save_content_claim(stream.stream_hash, outpoint) if save_file: From a212cf6ba45a56a140b893209a11f47fd5ddaccf Mon Sep 17 00:00:00 2001 From: Jack Robison Date: Mon, 6 May 2019 13:53:30 -0400 Subject: [PATCH 21/24] set content_fee attribute --- lbrynet/stream/stream_manager.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/lbrynet/stream/stream_manager.py b/lbrynet/stream/stream_manager.py index 3a99f3de9..df226e460 100644 --- a/lbrynet/stream/stream_manager.py +++ b/lbrynet/stream/stream_manager.py @@ -379,9 +379,10 @@ class StreamManager: if to_replace: # delete old stream now that the replacement has started downloading await self.delete_stream(to_replace) elif fee_address: - content_fee = await self.wallet.send_amount_to_address( + stream.content_fee = await self.wallet.send_amount_to_address( lbc_to_dewies(str(fee_amount)), fee_address.encode('latin1') ) + log.info("paid fee of %s for %s", fee_amount, uri) self.streams[stream.sd_hash] = stream From 3c231d621564ed95f447744e287255816c506494 Mon Sep 17 00:00:00 2001 From: Jack Robison Date: Mon, 6 May 2019 13:58:57 -0400 Subject: [PATCH 22/24] handle get with file_name/download_directory/save_file arg for a stream that is already running --- lbrynet/stream/stream_manager.py | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/lbrynet/stream/stream_manager.py b/lbrynet/stream/stream_manager.py index df226e460..242ff99ed 100644 --- a/lbrynet/stream/stream_manager.py +++ b/lbrynet/stream/stream_manager.py @@ -342,6 +342,10 @@ class StreamManager: if save_file and updated_stream.output_file_exists: save_file = False await updated_stream.start(node=self.node, timeout=timeout, save_now=save_file) + if save_file or file_name or download_directory: + await updated_stream.save_file( + file_name=file_name, download_directory=download_directory, node=self.node + ) return updated_stream content_fee = None @@ -382,7 +386,6 @@ class StreamManager: stream.content_fee = await self.wallet.send_amount_to_address( lbc_to_dewies(str(fee_amount)), fee_address.encode('latin1') ) - log.info("paid fee of %s for %s", fee_amount, uri) self.streams[stream.sd_hash] = stream From bef244ba772fffc6ec70c159c5fd38a52b7bfcd7 Mon Sep 17 00:00:00 2001 From: Jack Robison Date: Mon, 6 May 2019 14:28:12 -0400 Subject: [PATCH 23/24] dont make duplicate files --- lbrynet/stream/stream_manager.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lbrynet/stream/stream_manager.py b/lbrynet/stream/stream_manager.py index 242ff99ed..aa2f93610 100644 --- a/lbrynet/stream/stream_manager.py +++ b/lbrynet/stream/stream_manager.py @@ -342,7 +342,7 @@ class StreamManager: if save_file and updated_stream.output_file_exists: save_file = False await updated_stream.start(node=self.node, timeout=timeout, save_now=save_file) - if save_file or file_name or download_directory: + if not updated_stream.output_file_exists and (save_file or file_name or download_directory): await updated_stream.save_file( file_name=file_name, download_directory=download_directory, node=self.node ) From 32dc1297ecb1e844652641ae7f89c24591d00497 Mon Sep 17 00:00:00 2001 From: Jack Robison Date: Mon, 6 May 2019 15:04:38 -0400 Subject: [PATCH 24/24] fix infinite loop --- lbrynet/blob_exchange/downloader.py | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/lbrynet/blob_exchange/downloader.py b/lbrynet/blob_exchange/downloader.py index fad75253a..1805966ec 100644 --- 
a/lbrynet/blob_exchange/downloader.py +++ b/lbrynet/blob_exchange/downloader.py @@ -27,6 +27,7 @@ class BlobDownloader: self.scores: typing.Dict['KademliaPeer', int] = {} self.failures: typing.Dict['KademliaPeer', int] = {} self.connections: typing.Dict['KademliaPeer', asyncio.Transport] = {} + self.is_running = asyncio.Event(loop=self.loop) def should_race_continue(self, blob: 'AbstractBlob'): if len(self.active_connections) >= self.config.max_connections_per_download: @@ -79,8 +80,9 @@ class BlobDownloader: blob = self.blob_manager.get_blob(blob_hash, length) if blob.get_is_verified(): return blob + self.is_running.set() try: - while not blob.get_is_verified(): + while not blob.get_is_verified() and self.is_running.is_set(): batch: typing.Set['KademliaPeer'] = set() while not self.peer_queue.empty(): batch.update(self.peer_queue.get_nowait()) @@ -107,6 +109,7 @@ class BlobDownloader: def close(self): self.scores.clear() self.ignored.clear() + self.is_running.clear() for transport in self.connections.values(): transport.close()