diff --git a/lbry/extras/daemon/storage.py b/lbry/extras/daemon/storage.py index 98698d276..861ec6067 100644 --- a/lbry/extras/daemon/storage.py +++ b/lbry/extras/daemon/storage.py @@ -136,18 +136,13 @@ def _get_lbry_file_stream_dict(rowid, added_on, stream_hash, file_name, download def get_all_lbry_files(transaction: sqlite3.Connection) -> typing.List[typing.Dict]: files = [] signed_claims = {} - stream_hashes_and_bt_infohashes = transaction.execute("select stream_hash, bt_infohash from file").fetchall() - stream_hashes = tuple( - stream_hash for stream_hash, _ in stream_hashes_and_bt_infohashes if stream_hash is not None - ) for (rowid, stream_hash, _, file_name, download_dir, data_rate, status, saved_file, raw_content_fee, - added_on, _, sd_hash, stream_key, stream_name, suggested_file_name, *claim_args) in _batched_select( - transaction, "select file.rowid, file.*, stream.*, c.* " + added_on, _, sd_hash, stream_key, stream_name, suggested_file_name, *claim_args) in transaction.execute( + "select file.rowid, file.*, stream.*, c.* " "from file inner join stream on file.stream_hash=stream.stream_hash " "inner join content_claim cc on file.stream_hash=cc.stream_hash " "inner join claim c on cc.claim_outpoint=c.claim_outpoint " - "where file.stream_hash in {} " - "order by c.rowid desc", stream_hashes): + "order by c.rowid desc").fetchall(): claim = StoredContentClaim(*claim_args) if claim.channel_claim_id: if claim.channel_claim_id not in signed_claims: @@ -539,22 +534,30 @@ class SQLiteStorage(SQLiteMixin): Update files that have been removed from the downloads directory since the last run """ def update_manually_removed_files(transaction: sqlite3.Connection): - removed = [] - for (stream_hash, download_directory, file_name) in transaction.execute( - "select stream_hash, download_directory, file_name from file where saved_file=1 " + files = {} + query = "select stream_hash, download_directory, file_name from file where saved_file=1 " \ "and stream_hash is not null" - ).fetchall(): - if download_directory and file_name and os.path.isfile( - os.path.join(binascii.unhexlify(download_directory).decode(), - binascii.unhexlify(file_name).decode())): - continue - removed.append((stream_hash,)) - if removed: - transaction.executemany( - "update file set file_name=null, download_directory=null, saved_file=0 where stream_hash=?", - removed - ).fetchall() - return await self.db.run(update_manually_removed_files) + for (stream_hash, download_directory, file_name) in transaction.execute(query).fetchall(): + if download_directory and file_name: + files[stream_hash] = download_directory, file_name + return files + + def detect_removed(files): + return [ + stream_hash for stream_hash, (download_directory, file_name) in files.items() + if not os.path.isfile(os.path.join(binascii.unhexlify(download_directory).decode(), + binascii.unhexlify(file_name).decode())) + ] + + def update_db_removed(transaction: sqlite3.Connection, removed): + query = "update file set file_name=null, download_directory=null, saved_file=0 where stream_hash in {}" + for cur in _batched_select(transaction, query, removed): + cur.fetchall() + + stream_and_file = await self.db.run(update_manually_removed_files) + removed = await self.loop.run_in_executor(None, detect_removed, stream_and_file) + if removed: + await self.db.run(update_db_removed, removed) def get_all_lbry_files(self) -> typing.Awaitable[typing.List[typing.Dict]]: return self.db.run(get_all_lbry_files) diff --git a/lbry/stream/stream_manager.py b/lbry/stream/stream_manager.py index a3e422817..40c42b0d9 100644 --- a/lbry/stream/stream_manager.py +++ b/lbry/stream/stream_manager.py @@ -144,6 +144,7 @@ class StreamManager: to_recover.append(file_info) to_start.append(file_info) if to_recover: + log.info("Recover %i files", len(to_recover)) await self.recover_streams(to_recover) log.info("Initializing %i files", len(to_start))