populate stream manager with a single batch query
parent 869a8b712b
commit 868110a6f2
2 changed files with 39 additions and 31 deletions
@@ -113,6 +113,41 @@ def _batched_select(transaction, query, parameters):
             yield result


+def get_all_lbry_files(transaction: sqlite3.Connection) -> typing.List[typing.Dict]:
+    return [
+        {
+            "row_id": rowid,
+            "stream_hash": stream_hash,
+            "file_name": file_name,  # hex
+            "download_directory": download_dir,  # hex
+            "blob_data_rate": data_rate,
+            "status": status,
+            "sd_hash": sd_hash,
+            "key": stream_key,
+            "stream_name": stream_name,  # hex
+            "suggested_file_name": suggested_file_name,  # hex
+            "claim": StoredStreamClaim(stream_hash, *claim_args)
+        } for (rowid, stream_hash, file_name, download_dir, data_rate, status, _, sd_hash, stream_key,
+               stream_name, suggested_file_name, *claim_args) in _batched_select(
+            transaction, "select file.rowid, file.*, stream.*, c.*, case when c.channel_claim_id is not null "
+                         "  then (select claim_name from claim where claim_id==c.channel_claim_id) "
+                         "  else null end as channel_name "
+                         "from file inner join stream on file.stream_hash=stream.stream_hash "
+                         "inner join content_claim cc on file.stream_hash=cc.stream_hash "
+                         "inner join claim c on cc.claim_outpoint=c.claim_outpoint "
+                         "where file.stream_hash in {} "
+                         "order by c.rowid desc",
+            [
+                stream_hash
+                for (stream_hash,) in transaction.execute("select stream_hash from file")
+            ]
+        )
+    ]
+
+
 class SQLiteStorage(SQLiteMixin):
     CREATE_TABLES_QUERY = """
         pragma foreign_keys=on;
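The new module-level get_all_lbry_files builds every file dict in one pass over a single joined select, handing _batched_select the full list of stream hashes. The body of _batched_select sits outside this diff, so the sketch below is an assumption about how such a helper typically works: it chunks the parameter list (SQLite caps bound variables at 999 per statement by default) and substitutes one "?" per value into the query's "{}" placeholder.

import sqlite3
import typing


def _batched_select(transaction: sqlite3.Connection, query: str,
                    parameters: typing.List, batch_size: int = 900) -> typing.Iterator:
    # hypothetical body -- not shown in this diff; batch_size stays under
    # SQLite's default 999-variable-per-statement limit
    for start in range(0, len(parameters), batch_size):
        batch = parameters[start:start + batch_size]
        bind = "({})".format(",".join("?" * len(batch)))  # fills the "{}" in the query
        for result in transaction.execute(query.format(bind), batch):
            yield result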
@@ -407,37 +442,8 @@ class SQLiteStorage(SQLiteMixin):
                  binascii.hexlify(download_directory.encode()).decode(), data_payment_rate, status)
         )

-    async def get_all_lbry_files(self) -> typing.List[typing.Dict]:
-        def _lbry_file_dict(rowid, stream_hash, file_name, download_dir, data_rate, status, _, sd_hash, stream_key,
-                            stream_name, suggested_file_name) -> typing.Dict:
-            return {
-                "row_id": rowid,
-                "stream_hash": stream_hash,
-                "file_name": file_name,
-                "download_directory": download_dir,
-                "blob_data_rate": data_rate,
-                "status": status,
-                "sd_hash": sd_hash,
-                "key": stream_key,
-                "stream_name": stream_name,
-                "suggested_file_name": suggested_file_name
-            }
-
-        def _get_all_files(transaction: sqlite3.Connection) -> typing.List[typing.Dict]:
-            file_infos = list(map(lambda a: _lbry_file_dict(*a), transaction.execute(
-                "select file.rowid, file.*, stream.* "
-                "from file inner join stream on file.stream_hash=stream.stream_hash"
-            ).fetchall()))
-            stream_hashes = [file_info['stream_hash'] for file_info in file_infos]
-            claim_infos = get_claims_from_stream_hashes(transaction, stream_hashes)
-            for index in range(len(file_infos)):  # pylint: disable=consider-using-enumerate
-                file_infos[index]['claim'] = claim_infos.get(file_infos[index]['stream_hash'])
-            return file_infos
-
-        results = await self.db.run(_get_all_files)
-        if results:
-            return results
-        return []
+    def get_all_lbry_files(self) -> typing.List[typing.Dict]:
+        return self.db.run(get_all_lbry_files)

     def change_file_status(self, stream_hash: str, new_status: str):
         log.info("update file status %s -> %s", stream_hash, new_status)
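SQLiteStorage.get_all_lbry_files collapses from roughly thirty lines (a row-to-dict helper, a second query through get_claims_from_stream_hashes, and a loop stitching claims onto files) into a single delegation. db.run is also not part of this diff; a common shape for such a wrapper, sketched here under the assumed name AIOSQLite, runs the blocking transaction function in an executor and returns an awaitable, which is why the method can drop `async` while the stream manager below still awaits it.

import asyncio
import sqlite3


class AIOSQLite:  # assumed name and shape; the real wrapper is not in this diff
    def __init__(self, path: str):
        # check_same_thread=False so the executor thread may use the connection
        self.connection = sqlite3.connect(path, check_same_thread=False)

    def run(self, fun, *args, **kwargs):
        def __run():
            with self.connection:  # commits on success, rolls back on exception
                return fun(self.connection, *args, **kwargs)
        # hand the blocking work to a thread so the event loop stays responsive
        return asyncio.get_event_loop().run_in_executor(None, __run)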
@@ -143,7 +143,9 @@ class StreamManager:
             self.storage.content_claim_callbacks[stream.stream_hash] = lambda: self._update_content_claim(stream)

     async def load_streams_from_database(self):
+        log.info("Initializing stream manager from %s", self.storage._db_path)
         file_infos = await self.storage.get_all_lbry_files()
+        log.info("Initializing %i files", len(file_infos))
         await asyncio.gather(*[
             self.add_stream(
                 file_info['sd_hash'], binascii.unhexlify(file_info['file_name']).decode(),
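The file_name field arrives hex-encoded (the "# hex" comments in the first hunk), matching the hexlify on the write path shown in the second hunk's context line and the unhexlify here in add_stream. A quick round trip of that encoding, using a made-up file name:

import binascii

file_name = "video.mp4"  # example value, not from the diff
stored = binascii.hexlify(file_name.encode()).decode()  # as written to the file table
assert stored == "766964656f2e6d7034"
assert binascii.unhexlify(stored).decode() == file_name  # as read back for add_stream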