Merge pull request #2725 from lbryio/stream-manager-startup
faster stream manager startup
This commit is contained in:
commit
9371122bed
2 changed files with 27 additions and 23 deletions
|
@ -136,18 +136,13 @@ def _get_lbry_file_stream_dict(rowid, added_on, stream_hash, file_name, download
|
||||||
def get_all_lbry_files(transaction: sqlite3.Connection) -> typing.List[typing.Dict]:
|
def get_all_lbry_files(transaction: sqlite3.Connection) -> typing.List[typing.Dict]:
|
||||||
files = []
|
files = []
|
||||||
signed_claims = {}
|
signed_claims = {}
|
||||||
stream_hashes_and_bt_infohashes = transaction.execute("select stream_hash, bt_infohash from file").fetchall()
|
|
||||||
stream_hashes = tuple(
|
|
||||||
stream_hash for stream_hash, _ in stream_hashes_and_bt_infohashes if stream_hash is not None
|
|
||||||
)
|
|
||||||
for (rowid, stream_hash, _, file_name, download_dir, data_rate, status, saved_file, raw_content_fee,
|
for (rowid, stream_hash, _, file_name, download_dir, data_rate, status, saved_file, raw_content_fee,
|
||||||
added_on, _, sd_hash, stream_key, stream_name, suggested_file_name, *claim_args) in _batched_select(
|
added_on, _, sd_hash, stream_key, stream_name, suggested_file_name, *claim_args) in transaction.execute(
|
||||||
transaction, "select file.rowid, file.*, stream.*, c.* "
|
"select file.rowid, file.*, stream.*, c.* "
|
||||||
"from file inner join stream on file.stream_hash=stream.stream_hash "
|
"from file inner join stream on file.stream_hash=stream.stream_hash "
|
||||||
"inner join content_claim cc on file.stream_hash=cc.stream_hash "
|
"inner join content_claim cc on file.stream_hash=cc.stream_hash "
|
||||||
"inner join claim c on cc.claim_outpoint=c.claim_outpoint "
|
"inner join claim c on cc.claim_outpoint=c.claim_outpoint "
|
||||||
"where file.stream_hash in {} "
|
"order by c.rowid desc").fetchall():
|
||||||
"order by c.rowid desc", stream_hashes):
|
|
||||||
claim = StoredContentClaim(*claim_args)
|
claim = StoredContentClaim(*claim_args)
|
||||||
if claim.channel_claim_id:
|
if claim.channel_claim_id:
|
||||||
if claim.channel_claim_id not in signed_claims:
|
if claim.channel_claim_id not in signed_claims:
|
||||||
|
@ -539,22 +534,30 @@ class SQLiteStorage(SQLiteMixin):
|
||||||
Update files that have been removed from the downloads directory since the last run
|
Update files that have been removed from the downloads directory since the last run
|
||||||
"""
|
"""
|
||||||
def update_manually_removed_files(transaction: sqlite3.Connection):
|
def update_manually_removed_files(transaction: sqlite3.Connection):
|
||||||
removed = []
|
files = {}
|
||||||
for (stream_hash, download_directory, file_name) in transaction.execute(
|
query = "select stream_hash, download_directory, file_name from file where saved_file=1 " \
|
||||||
"select stream_hash, download_directory, file_name from file where saved_file=1 "
|
|
||||||
"and stream_hash is not null"
|
"and stream_hash is not null"
|
||||||
).fetchall():
|
for (stream_hash, download_directory, file_name) in transaction.execute(query).fetchall():
|
||||||
if download_directory and file_name and os.path.isfile(
|
if download_directory and file_name:
|
||||||
os.path.join(binascii.unhexlify(download_directory).decode(),
|
files[stream_hash] = download_directory, file_name
|
||||||
binascii.unhexlify(file_name).decode())):
|
return files
|
||||||
continue
|
|
||||||
removed.append((stream_hash,))
|
def detect_removed(files):
|
||||||
if removed:
|
return [
|
||||||
transaction.executemany(
|
stream_hash for stream_hash, (download_directory, file_name) in files.items()
|
||||||
"update file set file_name=null, download_directory=null, saved_file=0 where stream_hash=?",
|
if not os.path.isfile(os.path.join(binascii.unhexlify(download_directory).decode(),
|
||||||
removed
|
binascii.unhexlify(file_name).decode()))
|
||||||
).fetchall()
|
]
|
||||||
return await self.db.run(update_manually_removed_files)
|
|
||||||
|
def update_db_removed(transaction: sqlite3.Connection, removed):
|
||||||
|
query = "update file set file_name=null, download_directory=null, saved_file=0 where stream_hash in {}"
|
||||||
|
for cur in _batched_select(transaction, query, removed):
|
||||||
|
cur.fetchall()
|
||||||
|
|
||||||
|
stream_and_file = await self.db.run(update_manually_removed_files)
|
||||||
|
removed = await self.loop.run_in_executor(None, detect_removed, stream_and_file)
|
||||||
|
if removed:
|
||||||
|
await self.db.run(update_db_removed, removed)
|
||||||
|
|
||||||
def get_all_lbry_files(self) -> typing.Awaitable[typing.List[typing.Dict]]:
|
def get_all_lbry_files(self) -> typing.Awaitable[typing.List[typing.Dict]]:
|
||||||
return self.db.run(get_all_lbry_files)
|
return self.db.run(get_all_lbry_files)
|
||||||
|
|
|
@ -144,6 +144,7 @@ class StreamManager:
|
||||||
to_recover.append(file_info)
|
to_recover.append(file_info)
|
||||||
to_start.append(file_info)
|
to_start.append(file_info)
|
||||||
if to_recover:
|
if to_recover:
|
||||||
|
log.info("Recover %i files", len(to_recover))
|
||||||
await self.recover_streams(to_recover)
|
await self.recover_streams(to_recover)
|
||||||
|
|
||||||
log.info("Initializing %i files", len(to_start))
|
log.info("Initializing %i files", len(to_start))
|
||||||
|
|
Loading…
Reference in a new issue