From 0973ac753ffa4277584b2f978d740ebfb5d1e024 Mon Sep 17 00:00:00 2001 From: Jack Robison Date: Fri, 7 Feb 2020 10:34:47 -0500 Subject: [PATCH] add is_fully_reflected to file_list response --- lbry/extras/daemon/json_response_encoder.py | 3 ++- lbry/extras/daemon/storage.py | 12 ++++++++---- lbry/stream/managed_stream.py | 8 ++++++-- lbry/stream/stream_manager.py | 6 ++++-- 4 files changed, 20 insertions(+), 9 deletions(-) diff --git a/lbry/extras/daemon/json_response_encoder.py b/lbry/extras/daemon/json_response_encoder.py index 6799ebd57..248474417 100644 --- a/lbry/extras/daemon/json_response_encoder.py +++ b/lbry/extras/daemon/json_response_encoder.py @@ -294,7 +294,8 @@ class JSONResponseEncoder(JSONEncoder): 'added_on': managed_stream.added_on, 'height': tx_height, 'confirmations': (best_height + 1) - tx_height if tx_height > 0 else tx_height, - 'timestamp': self.ledger.headers[tx_height]['timestamp'] if 0 < tx_height <= best_height else None + 'timestamp': self.ledger.headers[tx_height]['timestamp'] if 0 < tx_height <= best_height else None, + 'is_fully_reflected': managed_stream.is_fully_reflected } def encode_claim(self, claim): diff --git a/lbry/extras/daemon/storage.py b/lbry/extras/daemon/storage.py index 861ec6067..11a61e45e 100644 --- a/lbry/extras/daemon/storage.py +++ b/lbry/extras/daemon/storage.py @@ -112,7 +112,7 @@ def _batched_select(transaction, query, parameters, batch_size=900): def _get_lbry_file_stream_dict(rowid, added_on, stream_hash, file_name, download_dir, data_rate, status, sd_hash, stream_key, stream_name, suggested_file_name, claim, saved_file, - raw_content_fee): + raw_content_fee, fully_reflected): return { "rowid": rowid, "added_on": added_on, @@ -129,7 +129,8 @@ def _get_lbry_file_stream_dict(rowid, added_on, stream_hash, file_name, download "saved_file": bool(saved_file), "content_fee": None if not raw_content_fee else Transaction( binascii.unhexlify(raw_content_fee) - ) + ), + "fully_reflected": fully_reflected } @@ -138,11 +139,14 @@ def get_all_lbry_files(transaction: sqlite3.Connection) -> typing.List[typing.Di signed_claims = {} for (rowid, stream_hash, _, file_name, download_dir, data_rate, status, saved_file, raw_content_fee, added_on, _, sd_hash, stream_key, stream_name, suggested_file_name, *claim_args) in transaction.execute( - "select file.rowid, file.*, stream.*, c.* " + "select file.rowid, file.*, stream.*, c.*, " + " case when (SELECT 1 FROM reflected_stream r WHERE r.sd_hash=stream.sd_hash) " + " is null then 0 else 1 end as fully_reflected " "from file inner join stream on file.stream_hash=stream.stream_hash " "inner join content_claim cc on file.stream_hash=cc.stream_hash " "inner join claim c on cc.claim_outpoint=c.claim_outpoint " "order by c.rowid desc").fetchall(): + claim_args, fully_reflected = tuple(claim_args[:-1]), claim_args[-1] claim = StoredContentClaim(*claim_args) if claim.channel_claim_id: if claim.channel_claim_id not in signed_claims: @@ -152,7 +156,7 @@ def get_all_lbry_files(transaction: sqlite3.Connection) -> typing.List[typing.Di _get_lbry_file_stream_dict( rowid, added_on, stream_hash, file_name, download_dir, data_rate, status, sd_hash, stream_key, stream_name, suggested_file_name, claim, saved_file, - raw_content_fee + raw_content_fee, fully_reflected ) ) for claim_name, claim_id in _batched_select( diff --git a/lbry/stream/managed_stream.py b/lbry/stream/managed_stream.py index 7c5f4e5d9..bf6955a9b 100644 --- a/lbry/stream/managed_stream.py +++ b/lbry/stream/managed_stream.py @@ -111,6 +111,10 @@ class ManagedStream: self.started_writing = asyncio.Event(loop=self.loop) self.finished_write_attempt = asyncio.Event(loop=self.loop) + @property + def is_fully_reflected(self) -> bool: + return self.fully_reflected.is_set() + @property def descriptor(self) -> StreamDescriptor: return self.downloader.descriptor @@ -429,9 +433,9 @@ class ManagedStream: await self.loop.create_connection(lambda: protocol, host, port) await protocol.send_handshake() sent_sd, needed = await protocol.send_descriptor() - if sent_sd: + if sent_sd: # reflector needed the sd blob sent.append(self.sd_hash) - if not sent_sd and not needed: + if not sent_sd and not needed: # reflector already has the stream if not self.fully_reflected.is_set(): self.fully_reflected.set() await self.blob_manager.storage.update_reflected_stream(self.sd_hash, f"{host}:{port}") diff --git a/lbry/stream/stream_manager.py b/lbry/stream/stream_manager.py index 119f7f01e..18e8b5ddc 100644 --- a/lbry/stream/stream_manager.py +++ b/lbry/stream/stream_manager.py @@ -116,7 +116,7 @@ class StreamManager: async def add_stream(self, rowid: int, sd_hash: str, file_name: Optional[str], download_directory: Optional[str], status: str, claim: Optional['StoredContentClaim'], content_fee: Optional['Transaction'], - added_on: Optional[int]): + added_on: Optional[int], fully_reflected: bool): try: descriptor = await self.blob_manager.get_stream_descriptor(sd_hash) except InvalidStreamDescriptorError as err: @@ -127,6 +127,8 @@ class StreamManager: claim, content_fee=content_fee, rowid=rowid, descriptor=descriptor, analytics_manager=self.analytics_manager, added_on=added_on ) + if fully_reflected: + stream.fully_reflected.set() self.streams[sd_hash] = stream self.storage.content_claim_callbacks[stream.stream_hash] = lambda: self._update_content_claim(stream) @@ -159,7 +161,7 @@ class StreamManager: file_info['rowid'], file_info['sd_hash'], file_name, download_directory, file_info['status'], file_info['claim'], file_info['content_fee'], - file_info['added_on'] + file_info['added_on'], file_info['fully_reflected'] ))) if add_stream_tasks: await asyncio.gather(*add_stream_tasks, loop=self.loop)