diff --git a/lbrynet/extras/daemon/storage.py b/lbrynet/extras/daemon/storage.py index 4ba896783..b37909240 100644 --- a/lbrynet/extras/daemon/storage.py +++ b/lbrynet/extras/daemon/storage.py @@ -105,44 +105,53 @@ def get_content_claim_from_outpoint(transaction: sqlite3.Connection, return StoredStreamClaim(*claim_fields) -def _batched_select(transaction, query, parameters): - for start_index in range(0, len(parameters), 900): - current_batch = parameters[start_index:start_index+900] +def _batched_select(transaction, query, parameters, batch_size=900): + for start_index in range(0, len(parameters), batch_size): + current_batch = parameters[start_index:start_index+batch_size] bind = "({})".format(','.join(['?'] * len(current_batch))) for result in transaction.execute(query.format(bind), current_batch): yield result def get_all_lbry_files(transaction: sqlite3.Connection) -> typing.List[typing.Dict]: - return [ - { - "row_id": rowid, - "stream_hash": stream_hash, - "file_name": file_name, # hex - "download_directory": download_dir, # hex - "blob_data_rate": data_rate, - "status": status, - "sd_hash": sd_hash, - "key": stream_key, - "stream_name": stream_name, # hex - "suggested_file_name": suggested_file_name, # hex - "claim": StoredStreamClaim(stream_hash, *claim_args) - } for (rowid, stream_hash, file_name, download_dir, data_rate, status, _, sd_hash, stream_key, - stream_name, suggested_file_name, *claim_args) in _batched_select( - transaction, "select file.rowid, file.*, stream.*, c.*, case when c.channel_claim_id is not null " - " then (select claim_name from claim where claim_id==c.channel_claim_id) " - " else null end as channel_name " + files = [] + signed_claims = {} + for (rowid, stream_hash, file_name, download_dir, data_rate, status, _, sd_hash, stream_key, + stream_name, suggested_file_name, *claim_args) in _batched_select( + transaction, "select file.rowid, file.*, stream.*, c.* " "from file inner join stream on file.stream_hash=stream.stream_hash " "inner join content_claim cc on file.stream_hash=cc.stream_hash " "inner join claim c on cc.claim_outpoint=c.claim_outpoint " "where file.stream_hash in {} " - "order by c.rowid desc", - [ - stream_hash - for (stream_hash,) in transaction.execute("select stream_hash from file") - ] + "order by c.rowid desc", [ + stream_hash for (stream_hash,) in transaction.execute("select stream_hash from file")]): + + claim = StoredStreamClaim(stream_hash, *claim_args) + if claim.channel_claim_id: + if claim.channel_claim_id not in signed_claims: + signed_claims[claim.channel_claim_id] = [] + signed_claims[claim.channel_claim_id].append(claim) + files.append( + { + "row_id": rowid, + "stream_hash": stream_hash, + "file_name": file_name, # hex + "download_directory": download_dir, # hex + "blob_data_rate": data_rate, + "status": status, + "sd_hash": sd_hash, + "key": stream_key, + "stream_name": stream_name, # hex + "suggested_file_name": suggested_file_name, # hex + "claim": claim + } ) - ] + for claim_id in signed_claims.keys(): + channel_name = transaction.execute("select claim_name from claim where claim_id=?", (claim_id, )).fetchone() + if channel_name: + for claim in signed_claims[claim_id]: + claim.channel_name = channel_name[0] + return files class SQLiteStorage(SQLiteMixin): @@ -441,7 +450,7 @@ class SQLiteStorage(SQLiteMixin): binascii.hexlify(download_directory.encode()).decode(), data_payment_rate, status) ) - def get_all_lbry_files(self) -> typing.List[typing.Dict]: + def get_all_lbry_files(self) -> typing.Awaitable[typing.List[typing.Dict]]: return self.db.run(get_all_lbry_files) def change_file_status(self, stream_hash: str, new_status: str):