diff --git a/lbry/extras/daemon/daemon.py b/lbry/extras/daemon/daemon.py index 67df0ae8d..fd4627987 100644 --- a/lbry/extras/daemon/daemon.py +++ b/lbry/extras/daemon/daemon.py @@ -1965,8 +1965,9 @@ class Daemon(metaclass=JSONRPCServerType): [--outpoint=] [--txid=] [--nout=] [--channel_claim_id=] [--channel_name=] [--claim_name=] [--blobs_in_stream=] - [--blobs_remaining=] [--sort=] - [--comparison=] [--full_status=] [--reverse] + [--blobs_remaining=] [--uploading_to_reflector=] + [--is_fully_reflected=] [--status=] [--completed=] + [--sort=] [--comparison=] [--full_status=] [--reverse] [--page=] [--page_size=] [--wallet_id=] Options: @@ -1984,6 +1985,10 @@ class Daemon(metaclass=JSONRPCServerType): --channel_name= : (str) get file with matching channel name --claim_name= : (str) get file with matching claim name --blobs_in_stream : (int) get file with matching blobs in stream + --uploading_to_reflector= : (bool) get files currently uploading to reflector + --is_fully_reflected= : (bool) get files that have been uploaded to reflector + --status= : (str) match by status, ( running | finished | stopped ) + --completed= : (bool) match only completed --blobs_remaining= : (int) amount of remaining blobs to download --sort= : (str) field to sort by (one of the above filter fields) --comparison= : (str) logical comparison, (eq | ne | g | ge | l | le | in) diff --git a/lbry/file/source_manager.py b/lbry/file/source_manager.py index 87f0a17f1..bf2846a00 100644 --- a/lbry/file/source_manager.py +++ b/lbry/file/source_manager.py @@ -34,7 +34,14 @@ class SourceManager: 'txid', 'nout', 'channel_claim_id', - 'channel_name' + 'channel_name', + 'completed' + } + + set_filter_fields = { + "claim_ids": "claim_id", + "channel_claim_ids": "channel_claim_id", + "outpoints": "outpoint" } source_class = ManagedDownloadSource @@ -108,23 +115,19 @@ class SourceManager: if isinstance(search_by.get('channel_claim_id'), list): compare_sets['channel_claim_ids'] = search_by.pop('channel_claim_id') - if search_by: + if search_by or compare_sets: comparison = comparison or 'eq' streams = [] for stream in self._sources.values(): - matched = False - for set_search, val in compare_sets.items(): - if COMPARISON_OPERATORS[comparison](getattr(stream, self.filter_fields[set_search]), val): - streams.append(stream) - matched = True - break - if matched: + if compare_sets and not all( + getattr(stream, self.set_filter_fields[set_search]) in val + for set_search, val in compare_sets.items()): continue - for search, val in search_by.items(): - this_stream = getattr(stream, search) - if COMPARISON_OPERATORS[comparison](this_stream, val): - streams.append(stream) - break + if search_by and not all( + COMPARISON_OPERATORS[comparison](getattr(stream, search), val) + for search, val in search_by.items()): + continue + streams.append(stream) else: streams = list(self._sources.values()) if sort_by: diff --git a/lbry/stream/managed_stream.py b/lbry/stream/managed_stream.py index 7d87577a8..debd987a4 100644 --- a/lbry/stream/managed_stream.py +++ b/lbry/stream/managed_stream.py @@ -57,7 +57,6 @@ class ManagedStream(ManagedDownloadSource): self.downloader = StreamDownloader(self.loop, self.config, self.blob_manager, sd_hash, descriptor) self.analytics_manager = analytics_manager - self.fully_reflected = asyncio.Event(loop=self.loop) self.reflector_progress = 0 self.uploading_to_reflector = False self.file_output_task: typing.Optional[asyncio.Task] = None diff --git a/lbry/stream/reflector/client.py b/lbry/stream/reflector/client.py index 7a8032c99..fbc084b9b 100644 --- a/lbry/stream/reflector/client.py +++ b/lbry/stream/reflector/client.py @@ -35,6 +35,8 @@ class StreamReflectorClient(asyncio.Protocol): def connection_lost(self, exc: typing.Optional[Exception]): self.transport = None self.connected.clear() + if self.pending_request: + self.pending_request.cancel() if self.reflected_blobs: log.info("Finished sending reflector %i blobs", len(self.reflected_blobs)) @@ -56,11 +58,11 @@ class StreamReflectorClient(asyncio.Protocol): self.response_buff = b'' return - async def send_request(self, request_dict: typing.Dict): + async def send_request(self, request_dict: typing.Dict, timeout: int = 180): msg = json.dumps(request_dict) self.transport.write(msg.encode()) try: - self.pending_request = self.loop.create_task(self.response_queue.get()) + self.pending_request = self.loop.create_task(asyncio.wait_for(self.response_queue.get(), timeout)) return await self.pending_request finally: self.pending_request = None @@ -87,7 +89,7 @@ class StreamReflectorClient(asyncio.Protocol): sent_sd = False if response['send_sd_blob']: await sd_blob.sendfile(self) - received = await self.response_queue.get() + received = await asyncio.wait_for(self.response_queue.get(), 30) if received.get('received_sd_blob'): sent_sd = True if not needed: @@ -111,7 +113,7 @@ class StreamReflectorClient(asyncio.Protocol): raise ValueError("I don't know whether to send the blob or not!") if response['send_blob']: await blob.sendfile(self) - received = await self.response_queue.get() + received = await asyncio.wait_for(self.response_queue.get(), 30) if received.get('received_blob'): self.reflected_blobs.append(blob.blob_hash) log.info("Sent reflector blob %s", blob.blob_hash[:8]) diff --git a/lbry/stream/stream_manager.py b/lbry/stream/stream_manager.py index 8df388452..4d0d1093b 100644 --- a/lbry/stream/stream_manager.py +++ b/lbry/stream/stream_manager.py @@ -38,7 +38,9 @@ class StreamManager(SourceManager): 'stream_hash', 'full_status', # TODO: remove 'blobs_remaining', - 'blobs_in_stream' + 'blobs_in_stream', + 'uploading_to_reflector', + 'is_fully_reflected' }) def __init__(self, loop: asyncio.AbstractEventLoop, config: 'Config', blob_manager: 'BlobManager',