Merge pull request #2772 from lbryio/reflector-status

Add `is_fully_reflected` to `file_list` response
This commit is contained in:
Jack Robison 2020-02-21 13:21:36 -05:00 committed by GitHub
commit 277b243f52
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
4 changed files with 20 additions and 9 deletions

View file

@ -294,7 +294,8 @@ class JSONResponseEncoder(JSONEncoder):
'added_on': managed_stream.added_on,
'height': tx_height,
'confirmations': (best_height + 1) - tx_height if tx_height > 0 else tx_height,
'timestamp': self.ledger.headers[tx_height]['timestamp'] if 0 < tx_height <= best_height else None
'timestamp': self.ledger.headers[tx_height]['timestamp'] if 0 < tx_height <= best_height else None,
'is_fully_reflected': managed_stream.is_fully_reflected
}
def encode_claim(self, claim):

View file

@ -112,7 +112,7 @@ def _batched_select(transaction, query, parameters, batch_size=900):
def _get_lbry_file_stream_dict(rowid, added_on, stream_hash, file_name, download_dir, data_rate, status,
sd_hash, stream_key, stream_name, suggested_file_name, claim, saved_file,
raw_content_fee):
raw_content_fee, fully_reflected):
return {
"rowid": rowid,
"added_on": added_on,
@ -129,7 +129,8 @@ def _get_lbry_file_stream_dict(rowid, added_on, stream_hash, file_name, download
"saved_file": bool(saved_file),
"content_fee": None if not raw_content_fee else Transaction(
binascii.unhexlify(raw_content_fee)
)
),
"fully_reflected": fully_reflected
}
@ -138,11 +139,14 @@ def get_all_lbry_files(transaction: sqlite3.Connection) -> typing.List[typing.Di
signed_claims = {}
for (rowid, stream_hash, _, file_name, download_dir, data_rate, status, saved_file, raw_content_fee,
added_on, _, sd_hash, stream_key, stream_name, suggested_file_name, *claim_args) in transaction.execute(
"select file.rowid, file.*, stream.*, c.* "
"select file.rowid, file.*, stream.*, c.*, "
" case when (SELECT 1 FROM reflected_stream r WHERE r.sd_hash=stream.sd_hash) "
" is null then 0 else 1 end as fully_reflected "
"from file inner join stream on file.stream_hash=stream.stream_hash "
"inner join content_claim cc on file.stream_hash=cc.stream_hash "
"inner join claim c on cc.claim_outpoint=c.claim_outpoint "
"order by c.rowid desc").fetchall():
claim_args, fully_reflected = tuple(claim_args[:-1]), claim_args[-1]
claim = StoredContentClaim(*claim_args)
if claim.channel_claim_id:
if claim.channel_claim_id not in signed_claims:
@ -152,7 +156,7 @@ def get_all_lbry_files(transaction: sqlite3.Connection) -> typing.List[typing.Di
_get_lbry_file_stream_dict(
rowid, added_on, stream_hash, file_name, download_dir, data_rate, status,
sd_hash, stream_key, stream_name, suggested_file_name, claim, saved_file,
raw_content_fee
raw_content_fee, fully_reflected
)
)
for claim_name, claim_id in _batched_select(

View file

@ -111,6 +111,10 @@ class ManagedStream:
self.started_writing = asyncio.Event(loop=self.loop)
self.finished_write_attempt = asyncio.Event(loop=self.loop)
@property
def is_fully_reflected(self) -> bool:
return self.fully_reflected.is_set()
@property
def descriptor(self) -> StreamDescriptor:
return self.downloader.descriptor
@ -429,9 +433,9 @@ class ManagedStream:
await self.loop.create_connection(lambda: protocol, host, port)
await protocol.send_handshake()
sent_sd, needed = await protocol.send_descriptor()
if sent_sd:
if sent_sd: # reflector needed the sd blob
sent.append(self.sd_hash)
if not sent_sd and not needed:
if not sent_sd and not needed: # reflector already has the stream
if not self.fully_reflected.is_set():
self.fully_reflected.set()
await self.blob_manager.storage.update_reflected_stream(self.sd_hash, f"{host}:{port}")

View file

@ -116,7 +116,7 @@ class StreamManager:
async def add_stream(self, rowid: int, sd_hash: str, file_name: Optional[str],
download_directory: Optional[str], status: str,
claim: Optional['StoredContentClaim'], content_fee: Optional['Transaction'],
added_on: Optional[int]):
added_on: Optional[int], fully_reflected: bool):
try:
descriptor = await self.blob_manager.get_stream_descriptor(sd_hash)
except InvalidStreamDescriptorError as err:
@ -127,6 +127,8 @@ class StreamManager:
claim, content_fee=content_fee, rowid=rowid, descriptor=descriptor,
analytics_manager=self.analytics_manager, added_on=added_on
)
if fully_reflected:
stream.fully_reflected.set()
self.streams[sd_hash] = stream
self.storage.content_claim_callbacks[stream.stream_hash] = lambda: self._update_content_claim(stream)
@ -159,7 +161,7 @@ class StreamManager:
file_info['rowid'], file_info['sd_hash'], file_name,
download_directory, file_info['status'],
file_info['claim'], file_info['content_fee'],
file_info['added_on']
file_info['added_on'], file_info['fully_reflected']
)))
if add_stream_tasks:
await asyncio.gather(*add_stream_tasks, loop=self.loop)