Merge pull request #2954 from lbryio/file-list-reflector-args

add `uploading_to_reflector`,  `is_fully_reflected`, `completed`, and `status` filter arguments to `file_list`, update to match all filters rather than any filters
This commit is contained in:
Jack Robison 2020-05-11 19:45:51 -04:00 committed by GitHub
commit af66b31a44
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
5 changed files with 33 additions and 22 deletions

View file

@ -1965,8 +1965,9 @@ class Daemon(metaclass=JSONRPCServerType):
[--outpoint=<outpoint>] [--txid=<txid>] [--nout=<nout>]
[--channel_claim_id=<channel_claim_id>] [--channel_name=<channel_name>]
[--claim_name=<claim_name>] [--blobs_in_stream=<blobs_in_stream>]
[--blobs_remaining=<blobs_remaining>] [--sort=<sort_by>]
[--comparison=<comparison>] [--full_status=<full_status>] [--reverse]
[--blobs_remaining=<blobs_remaining>] [--uploading_to_reflector=<uploading_to_reflector>]
[--is_fully_reflected=<is_fully_reflected>] [--status=<status>] [--completed=<completed>]
[--sort=<sort_by>] [--comparison=<comparison>] [--full_status=<full_status>] [--reverse]
[--page=<page>] [--page_size=<page_size>] [--wallet_id=<wallet_id>]
Options:
@ -1984,6 +1985,10 @@ class Daemon(metaclass=JSONRPCServerType):
--channel_name=<channel_name> : (str) get file with matching channel name
--claim_name=<claim_name> : (str) get file with matching claim name
--blobs_in_stream<blobs_in_stream> : (int) get file with matching blobs in stream
--uploading_to_reflector=<uploading_to_reflector> : (bool) get files currently uploading to reflector
--is_fully_reflected=<is_fully_reflected> : (bool) get files that have been uploaded to reflector
--status=<status> : (str) match by status, ( running | finished | stopped )
--completed=<completed> : (bool) match only completed
--blobs_remaining=<blobs_remaining> : (int) amount of remaining blobs to download
--sort=<sort_by> : (str) field to sort by (one of the above filter fields)
--comparison=<comparison> : (str) logical comparison, (eq | ne | g | ge | l | le | in)

View file

@ -34,7 +34,14 @@ class SourceManager:
'txid',
'nout',
'channel_claim_id',
'channel_name'
'channel_name',
'completed'
}
set_filter_fields = {
"claim_ids": "claim_id",
"channel_claim_ids": "channel_claim_id",
"outpoints": "outpoint"
}
source_class = ManagedDownloadSource
@ -108,23 +115,19 @@ class SourceManager:
if isinstance(search_by.get('channel_claim_id'), list):
compare_sets['channel_claim_ids'] = search_by.pop('channel_claim_id')
if search_by:
if search_by or compare_sets:
comparison = comparison or 'eq'
streams = []
for stream in self._sources.values():
matched = False
for set_search, val in compare_sets.items():
if COMPARISON_OPERATORS[comparison](getattr(stream, self.filter_fields[set_search]), val):
streams.append(stream)
matched = True
break
if matched:
if compare_sets and not all(
getattr(stream, self.set_filter_fields[set_search]) in val
for set_search, val in compare_sets.items()):
continue
for search, val in search_by.items():
this_stream = getattr(stream, search)
if COMPARISON_OPERATORS[comparison](this_stream, val):
streams.append(stream)
break
if search_by and not all(
COMPARISON_OPERATORS[comparison](getattr(stream, search), val)
for search, val in search_by.items()):
continue
streams.append(stream)
else:
streams = list(self._sources.values())
if sort_by:

View file

@ -57,7 +57,6 @@ class ManagedStream(ManagedDownloadSource):
self.downloader = StreamDownloader(self.loop, self.config, self.blob_manager, sd_hash, descriptor)
self.analytics_manager = analytics_manager
self.fully_reflected = asyncio.Event(loop=self.loop)
self.reflector_progress = 0
self.uploading_to_reflector = False
self.file_output_task: typing.Optional[asyncio.Task] = None

View file

@ -35,6 +35,8 @@ class StreamReflectorClient(asyncio.Protocol):
def connection_lost(self, exc: typing.Optional[Exception]):
self.transport = None
self.connected.clear()
if self.pending_request:
self.pending_request.cancel()
if self.reflected_blobs:
log.info("Finished sending reflector %i blobs", len(self.reflected_blobs))
@ -56,11 +58,11 @@ class StreamReflectorClient(asyncio.Protocol):
self.response_buff = b''
return
async def send_request(self, request_dict: typing.Dict):
async def send_request(self, request_dict: typing.Dict, timeout: int = 180):
msg = json.dumps(request_dict)
self.transport.write(msg.encode())
try:
self.pending_request = self.loop.create_task(self.response_queue.get())
self.pending_request = self.loop.create_task(asyncio.wait_for(self.response_queue.get(), timeout))
return await self.pending_request
finally:
self.pending_request = None
@ -87,7 +89,7 @@ class StreamReflectorClient(asyncio.Protocol):
sent_sd = False
if response['send_sd_blob']:
await sd_blob.sendfile(self)
received = await self.response_queue.get()
received = await asyncio.wait_for(self.response_queue.get(), 30)
if received.get('received_sd_blob'):
sent_sd = True
if not needed:
@ -111,7 +113,7 @@ class StreamReflectorClient(asyncio.Protocol):
raise ValueError("I don't know whether to send the blob or not!")
if response['send_blob']:
await blob.sendfile(self)
received = await self.response_queue.get()
received = await asyncio.wait_for(self.response_queue.get(), 30)
if received.get('received_blob'):
self.reflected_blobs.append(blob.blob_hash)
log.info("Sent reflector blob %s", blob.blob_hash[:8])

View file

@ -38,7 +38,9 @@ class StreamManager(SourceManager):
'stream_hash',
'full_status', # TODO: remove
'blobs_remaining',
'blobs_in_stream'
'blobs_in_stream',
'uploading_to_reflector',
'is_fully_reflected'
})
def __init__(self, loop: asyncio.AbstractEventLoop, config: 'Config', blob_manager: 'BlobManager',