add is_fully_reflected to file_list response

This commit is contained in:
Jack Robison 2020-02-07 10:34:47 -05:00
parent a70980c81a
commit 0973ac753f
No known key found for this signature in database
GPG key ID: DF25C68FE0239BB2
4 changed files with 20 additions and 9 deletions

View file

@ -294,7 +294,8 @@ class JSONResponseEncoder(JSONEncoder):
'added_on': managed_stream.added_on, 'added_on': managed_stream.added_on,
'height': tx_height, 'height': tx_height,
'confirmations': (best_height + 1) - tx_height if tx_height > 0 else tx_height, 'confirmations': (best_height + 1) - tx_height if tx_height > 0 else tx_height,
'timestamp': self.ledger.headers[tx_height]['timestamp'] if 0 < tx_height <= best_height else None 'timestamp': self.ledger.headers[tx_height]['timestamp'] if 0 < tx_height <= best_height else None,
'is_fully_reflected': managed_stream.is_fully_reflected
} }
def encode_claim(self, claim): def encode_claim(self, claim):

View file

@ -112,7 +112,7 @@ def _batched_select(transaction, query, parameters, batch_size=900):
def _get_lbry_file_stream_dict(rowid, added_on, stream_hash, file_name, download_dir, data_rate, status, def _get_lbry_file_stream_dict(rowid, added_on, stream_hash, file_name, download_dir, data_rate, status,
sd_hash, stream_key, stream_name, suggested_file_name, claim, saved_file, sd_hash, stream_key, stream_name, suggested_file_name, claim, saved_file,
raw_content_fee): raw_content_fee, fully_reflected):
return { return {
"rowid": rowid, "rowid": rowid,
"added_on": added_on, "added_on": added_on,
@ -129,7 +129,8 @@ def _get_lbry_file_stream_dict(rowid, added_on, stream_hash, file_name, download
"saved_file": bool(saved_file), "saved_file": bool(saved_file),
"content_fee": None if not raw_content_fee else Transaction( "content_fee": None if not raw_content_fee else Transaction(
binascii.unhexlify(raw_content_fee) binascii.unhexlify(raw_content_fee)
) ),
"fully_reflected": fully_reflected
} }
@ -138,11 +139,14 @@ def get_all_lbry_files(transaction: sqlite3.Connection) -> typing.List[typing.Di
signed_claims = {} signed_claims = {}
for (rowid, stream_hash, _, file_name, download_dir, data_rate, status, saved_file, raw_content_fee, for (rowid, stream_hash, _, file_name, download_dir, data_rate, status, saved_file, raw_content_fee,
added_on, _, sd_hash, stream_key, stream_name, suggested_file_name, *claim_args) in transaction.execute( added_on, _, sd_hash, stream_key, stream_name, suggested_file_name, *claim_args) in transaction.execute(
"select file.rowid, file.*, stream.*, c.* " "select file.rowid, file.*, stream.*, c.*, "
" case when (SELECT 1 FROM reflected_stream r WHERE r.sd_hash=stream.sd_hash) "
" is null then 0 else 1 end as fully_reflected "
"from file inner join stream on file.stream_hash=stream.stream_hash " "from file inner join stream on file.stream_hash=stream.stream_hash "
"inner join content_claim cc on file.stream_hash=cc.stream_hash " "inner join content_claim cc on file.stream_hash=cc.stream_hash "
"inner join claim c on cc.claim_outpoint=c.claim_outpoint " "inner join claim c on cc.claim_outpoint=c.claim_outpoint "
"order by c.rowid desc").fetchall(): "order by c.rowid desc").fetchall():
claim_args, fully_reflected = tuple(claim_args[:-1]), claim_args[-1]
claim = StoredContentClaim(*claim_args) claim = StoredContentClaim(*claim_args)
if claim.channel_claim_id: if claim.channel_claim_id:
if claim.channel_claim_id not in signed_claims: if claim.channel_claim_id not in signed_claims:
@ -152,7 +156,7 @@ def get_all_lbry_files(transaction: sqlite3.Connection) -> typing.List[typing.Di
_get_lbry_file_stream_dict( _get_lbry_file_stream_dict(
rowid, added_on, stream_hash, file_name, download_dir, data_rate, status, rowid, added_on, stream_hash, file_name, download_dir, data_rate, status,
sd_hash, stream_key, stream_name, suggested_file_name, claim, saved_file, sd_hash, stream_key, stream_name, suggested_file_name, claim, saved_file,
raw_content_fee raw_content_fee, fully_reflected
) )
) )
for claim_name, claim_id in _batched_select( for claim_name, claim_id in _batched_select(

View file

@ -111,6 +111,10 @@ class ManagedStream:
self.started_writing = asyncio.Event(loop=self.loop) self.started_writing = asyncio.Event(loop=self.loop)
self.finished_write_attempt = asyncio.Event(loop=self.loop) self.finished_write_attempt = asyncio.Event(loop=self.loop)
@property
def is_fully_reflected(self) -> bool:
return self.fully_reflected.is_set()
@property @property
def descriptor(self) -> StreamDescriptor: def descriptor(self) -> StreamDescriptor:
return self.downloader.descriptor return self.downloader.descriptor
@ -429,9 +433,9 @@ class ManagedStream:
await self.loop.create_connection(lambda: protocol, host, port) await self.loop.create_connection(lambda: protocol, host, port)
await protocol.send_handshake() await protocol.send_handshake()
sent_sd, needed = await protocol.send_descriptor() sent_sd, needed = await protocol.send_descriptor()
if sent_sd: if sent_sd: # reflector needed the sd blob
sent.append(self.sd_hash) sent.append(self.sd_hash)
if not sent_sd and not needed: if not sent_sd and not needed: # reflector already has the stream
if not self.fully_reflected.is_set(): if not self.fully_reflected.is_set():
self.fully_reflected.set() self.fully_reflected.set()
await self.blob_manager.storage.update_reflected_stream(self.sd_hash, f"{host}:{port}") await self.blob_manager.storage.update_reflected_stream(self.sd_hash, f"{host}:{port}")

View file

@ -116,7 +116,7 @@ class StreamManager:
async def add_stream(self, rowid: int, sd_hash: str, file_name: Optional[str], async def add_stream(self, rowid: int, sd_hash: str, file_name: Optional[str],
download_directory: Optional[str], status: str, download_directory: Optional[str], status: str,
claim: Optional['StoredContentClaim'], content_fee: Optional['Transaction'], claim: Optional['StoredContentClaim'], content_fee: Optional['Transaction'],
added_on: Optional[int]): added_on: Optional[int], fully_reflected: bool):
try: try:
descriptor = await self.blob_manager.get_stream_descriptor(sd_hash) descriptor = await self.blob_manager.get_stream_descriptor(sd_hash)
except InvalidStreamDescriptorError as err: except InvalidStreamDescriptorError as err:
@ -127,6 +127,8 @@ class StreamManager:
claim, content_fee=content_fee, rowid=rowid, descriptor=descriptor, claim, content_fee=content_fee, rowid=rowid, descriptor=descriptor,
analytics_manager=self.analytics_manager, added_on=added_on analytics_manager=self.analytics_manager, added_on=added_on
) )
if fully_reflected:
stream.fully_reflected.set()
self.streams[sd_hash] = stream self.streams[sd_hash] = stream
self.storage.content_claim_callbacks[stream.stream_hash] = lambda: self._update_content_claim(stream) self.storage.content_claim_callbacks[stream.stream_hash] = lambda: self._update_content_claim(stream)
@ -159,7 +161,7 @@ class StreamManager:
file_info['rowid'], file_info['sd_hash'], file_name, file_info['rowid'], file_info['sd_hash'], file_name,
download_directory, file_info['status'], download_directory, file_info['status'],
file_info['claim'], file_info['content_fee'], file_info['claim'], file_info['content_fee'],
file_info['added_on'] file_info['added_on'], file_info['fully_reflected']
))) )))
if add_stream_tasks: if add_stream_tasks:
await asyncio.gather(*add_stream_tasks, loop=self.loop) await asyncio.gather(*add_stream_tasks, loop=self.loop)