From f20ca70c013534266c9cbf6bcae4ce4d7b625fa0 Mon Sep 17 00:00:00 2001 From: Jack Robison Date: Mon, 11 May 2020 15:48:34 -0400 Subject: [PATCH 1/5] add `uploading_to_reflector` and `is_fully_reflected` filter arguments to `file_list` --- lbry/extras/daemon/daemon.py | 5 ++++- lbry/stream/managed_stream.py | 1 - lbry/stream/stream_manager.py | 4 +++- 3 files changed, 7 insertions(+), 3 deletions(-) diff --git a/lbry/extras/daemon/daemon.py b/lbry/extras/daemon/daemon.py index 67df0ae8d..c0b333fcd 100644 --- a/lbry/extras/daemon/daemon.py +++ b/lbry/extras/daemon/daemon.py @@ -1965,7 +1965,8 @@ class Daemon(metaclass=JSONRPCServerType): [--outpoint=] [--txid=] [--nout=] [--channel_claim_id=] [--channel_name=] [--claim_name=] [--blobs_in_stream=] - [--blobs_remaining=] [--sort=] + [--blobs_remaining=] [--uploading_to_reflector=] + [--is_fully_reflected=] [--sort=] [--comparison=] [--full_status=] [--reverse] [--page=] [--page_size=] [--wallet_id=] @@ -1984,6 +1985,8 @@ class Daemon(metaclass=JSONRPCServerType): --channel_name= : (str) get file with matching channel name --claim_name= : (str) get file with matching claim name --blobs_in_stream : (int) get file with matching blobs in stream + --uploading_to_reflector= : (bool) get files currently uploading to reflector + --is_fully_reflected= : (bool) get files that have been uploaded to reflector --blobs_remaining= : (int) amount of remaining blobs to download --sort= : (str) field to sort by (one of the above filter fields) --comparison= : (str) logical comparison, (eq | ne | g | ge | l | le | in) diff --git a/lbry/stream/managed_stream.py b/lbry/stream/managed_stream.py index 7d87577a8..debd987a4 100644 --- a/lbry/stream/managed_stream.py +++ b/lbry/stream/managed_stream.py @@ -57,7 +57,6 @@ class ManagedStream(ManagedDownloadSource): self.downloader = StreamDownloader(self.loop, self.config, self.blob_manager, sd_hash, descriptor) self.analytics_manager = analytics_manager - self.fully_reflected = asyncio.Event(loop=self.loop) self.reflector_progress = 0 self.uploading_to_reflector = False self.file_output_task: typing.Optional[asyncio.Task] = None diff --git a/lbry/stream/stream_manager.py b/lbry/stream/stream_manager.py index 8df388452..4d0d1093b 100644 --- a/lbry/stream/stream_manager.py +++ b/lbry/stream/stream_manager.py @@ -38,7 +38,9 @@ class StreamManager(SourceManager): 'stream_hash', 'full_status', # TODO: remove 'blobs_remaining', - 'blobs_in_stream' + 'blobs_in_stream', + 'uploading_to_reflector', + 'is_fully_reflected' }) def __init__(self, loop: asyncio.AbstractEventLoop, config: 'Config', blob_manager: 'BlobManager', From 78b8261a3a1216cccf775950cf92a354acd7fb71 Mon Sep 17 00:00:00 2001 From: Jack Robison Date: Mon, 11 May 2020 16:05:13 -0400 Subject: [PATCH 2/5] cancel pending reflector request when connection is lost -add 180s timeout --- lbry/stream/reflector/client.py | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) diff --git a/lbry/stream/reflector/client.py b/lbry/stream/reflector/client.py index 7a8032c99..fbc084b9b 100644 --- a/lbry/stream/reflector/client.py +++ b/lbry/stream/reflector/client.py @@ -35,6 +35,8 @@ class StreamReflectorClient(asyncio.Protocol): def connection_lost(self, exc: typing.Optional[Exception]): self.transport = None self.connected.clear() + if self.pending_request: + self.pending_request.cancel() if self.reflected_blobs: log.info("Finished sending reflector %i blobs", len(self.reflected_blobs)) @@ -56,11 +58,11 @@ class StreamReflectorClient(asyncio.Protocol): self.response_buff = b'' return - async def send_request(self, request_dict: typing.Dict): + async def send_request(self, request_dict: typing.Dict, timeout: int = 180): msg = json.dumps(request_dict) self.transport.write(msg.encode()) try: - self.pending_request = self.loop.create_task(self.response_queue.get()) + self.pending_request = self.loop.create_task(asyncio.wait_for(self.response_queue.get(), timeout)) return await self.pending_request finally: self.pending_request = None @@ -87,7 +89,7 @@ class StreamReflectorClient(asyncio.Protocol): sent_sd = False if response['send_sd_blob']: await sd_blob.sendfile(self) - received = await self.response_queue.get() + received = await asyncio.wait_for(self.response_queue.get(), 30) if received.get('received_sd_blob'): sent_sd = True if not needed: @@ -111,7 +113,7 @@ class StreamReflectorClient(asyncio.Protocol): raise ValueError("I don't know whether to send the blob or not!") if response['send_blob']: await blob.sendfile(self) - received = await self.response_queue.get() + received = await asyncio.wait_for(self.response_queue.get(), 30) if received.get('received_blob'): self.reflected_blobs.append(blob.blob_hash) log.info("Sent reflector blob %s", blob.blob_hash[:8]) From a469b8bc042f7d01a8bd80050049d671acf9e470 Mon Sep 17 00:00:00 2001 From: Jack Robison Date: Mon, 11 May 2020 18:43:47 -0400 Subject: [PATCH 3/5] return streams matching all file_list filters rather than those matching any -fix filter fields when using sets --- lbry/file/source_manager.py | 28 +++++++++++++++------------- 1 file changed, 15 insertions(+), 13 deletions(-) diff --git a/lbry/file/source_manager.py b/lbry/file/source_manager.py index 87f0a17f1..0fadeb346 100644 --- a/lbry/file/source_manager.py +++ b/lbry/file/source_manager.py @@ -37,6 +37,12 @@ class SourceManager: 'channel_name' } + set_filter_fields = { + "claim_ids": "claim_id", + "channel_claim_ids": "channel_claim_id", + "outpoints": "outpoint" + } + source_class = ManagedDownloadSource def __init__(self, loop: asyncio.AbstractEventLoop, config: 'Config', storage: 'SQLiteStorage', @@ -108,23 +114,19 @@ class SourceManager: if isinstance(search_by.get('channel_claim_id'), list): compare_sets['channel_claim_ids'] = search_by.pop('channel_claim_id') - if search_by: + if search_by or compare_sets: comparison = comparison or 'eq' streams = [] for stream in self._sources.values(): - matched = False - for set_search, val in compare_sets.items(): - if COMPARISON_OPERATORS[comparison](getattr(stream, self.filter_fields[set_search]), val): - streams.append(stream) - matched = True - break - if matched: + if compare_sets and not all( + getattr(stream, self.set_filter_fields[set_search]) in val + for set_search, val in compare_sets.items()): continue - for search, val in search_by.items(): - this_stream = getattr(stream, search) - if COMPARISON_OPERATORS[comparison](this_stream, val): - streams.append(stream) - break + if search_by and not all( + COMPARISON_OPERATORS[comparison](getattr(stream, search), val) + for search, val in search_by.items()): + continue + streams.append(stream) else: streams = list(self._sources.values()) if sort_by: From 3c85322523a2e4665b37c026c6af5e0aadda6edc Mon Sep 17 00:00:00 2001 From: Jack Robison Date: Mon, 11 May 2020 19:16:08 -0400 Subject: [PATCH 4/5] add `status` arg to `file_list` cli --- lbry/extras/daemon/daemon.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/lbry/extras/daemon/daemon.py b/lbry/extras/daemon/daemon.py index c0b333fcd..8352bf047 100644 --- a/lbry/extras/daemon/daemon.py +++ b/lbry/extras/daemon/daemon.py @@ -1966,7 +1966,7 @@ class Daemon(metaclass=JSONRPCServerType): [--channel_claim_id=] [--channel_name=] [--claim_name=] [--blobs_in_stream=] [--blobs_remaining=] [--uploading_to_reflector=] - [--is_fully_reflected=] [--sort=] + [--is_fully_reflected=] [--status=] [--sort=] [--comparison=] [--full_status=] [--reverse] [--page=] [--page_size=] [--wallet_id=] @@ -1987,6 +1987,7 @@ class Daemon(metaclass=JSONRPCServerType): --blobs_in_stream : (int) get file with matching blobs in stream --uploading_to_reflector= : (bool) get files currently uploading to reflector --is_fully_reflected= : (bool) get files that have been uploaded to reflector + --status= : (str) match by status, ( running | finished | stopped ) --blobs_remaining= : (int) amount of remaining blobs to download --sort= : (str) field to sort by (one of the above filter fields) --comparison= : (str) logical comparison, (eq | ne | g | ge | l | le | in) From b000a40f28e2101c16f1e3ff5ff745f669b005a5 Mon Sep 17 00:00:00 2001 From: Jack Robison Date: Mon, 11 May 2020 19:22:53 -0400 Subject: [PATCH 5/5] add `completed` filter arg to `file_list` --- lbry/extras/daemon/daemon.py | 5 +++-- lbry/file/source_manager.py | 3 ++- 2 files changed, 5 insertions(+), 3 deletions(-) diff --git a/lbry/extras/daemon/daemon.py b/lbry/extras/daemon/daemon.py index 8352bf047..fd4627987 100644 --- a/lbry/extras/daemon/daemon.py +++ b/lbry/extras/daemon/daemon.py @@ -1966,8 +1966,8 @@ class Daemon(metaclass=JSONRPCServerType): [--channel_claim_id=] [--channel_name=] [--claim_name=] [--blobs_in_stream=] [--blobs_remaining=] [--uploading_to_reflector=] - [--is_fully_reflected=] [--status=] [--sort=] - [--comparison=] [--full_status=] [--reverse] + [--is_fully_reflected=] [--status=] [--completed=] + [--sort=] [--comparison=] [--full_status=] [--reverse] [--page=] [--page_size=] [--wallet_id=] Options: @@ -1988,6 +1988,7 @@ class Daemon(metaclass=JSONRPCServerType): --uploading_to_reflector= : (bool) get files currently uploading to reflector --is_fully_reflected= : (bool) get files that have been uploaded to reflector --status= : (str) match by status, ( running | finished | stopped ) + --completed= : (bool) match only completed --blobs_remaining= : (int) amount of remaining blobs to download --sort= : (str) field to sort by (one of the above filter fields) --comparison= : (str) logical comparison, (eq | ne | g | ge | l | le | in) diff --git a/lbry/file/source_manager.py b/lbry/file/source_manager.py index 0fadeb346..bf2846a00 100644 --- a/lbry/file/source_manager.py +++ b/lbry/file/source_manager.py @@ -34,7 +34,8 @@ class SourceManager: 'txid', 'nout', 'channel_claim_id', - 'channel_name' + 'channel_name', + 'completed' } set_filter_fields = {