forked from LBRYCommunity/lbry-sdk
set deleted downloads as streaming mode on startup
This commit is contained in:
parent
3234d70270
commit
832537a5cf
2 changed files with 32 additions and 12 deletions
|
@ -425,7 +425,7 @@ class SQLiteStorage(SQLiteMixin):
|
|||
return self.db.run(_sync_blobs)
|
||||
|
||||
def sync_files_to_blobs(self):
|
||||
def _sync_blobs(transaction: sqlite3.Connection) -> typing.Set[str]:
|
||||
def _sync_blobs(transaction: sqlite3.Connection):
|
||||
transaction.executemany(
|
||||
"update file set status='stopped' where stream_hash=?",
|
||||
transaction.execute(
|
||||
|
@ -435,6 +435,15 @@ class SQLiteStorage(SQLiteMixin):
|
|||
)
|
||||
return self.db.run(_sync_blobs)
|
||||
|
||||
def set_files_as_streaming(self, stream_hashes: typing.List[str]):
|
||||
def _set_streaming(transaction: sqlite3.Connection):
|
||||
transaction.executemany(
|
||||
"update file set file_name='{stream}', download_directory='{stream}' where stream_hash=?",
|
||||
[(stream_hash, ) for stream_hash in stream_hashes]
|
||||
)
|
||||
|
||||
return self.db.run(_set_streaming)
|
||||
|
||||
# # # # # # # # # stream functions # # # # # # # # #
|
||||
|
||||
async def stream_exists(self, sd_hash: str) -> bool:
|
||||
|
@ -526,10 +535,15 @@ class SQLiteStorage(SQLiteMixin):
|
|||
log.debug("update file status %s -> %s", stream_hash, new_status)
|
||||
return self.db.execute("update file set status=? where stream_hash=?", (new_status, stream_hash))
|
||||
|
||||
def change_file_download_dir_and_file_name(self, stream_hash: str, download_dir: str, file_name: str):
|
||||
return self.db.execute("update file set download_directory=?, file_name=? where stream_hash=?", (
|
||||
binascii.hexlify(download_dir.encode()).decode(), binascii.hexlify(file_name.encode()).decode(),
|
||||
stream_hash
|
||||
async def change_file_download_dir_and_file_name(self, stream_hash: str, download_dir: typing.Optional[str],
|
||||
file_name: typing.Optional[str]):
|
||||
if not file_name or not download_dir:
|
||||
encoded_file_name, encoded_download_dir = "{stream}", "{stream}"
|
||||
else:
|
||||
encoded_file_name = binascii.hexlify(file_name.encode()).decode()
|
||||
encoded_download_dir = binascii.hexlify(download_dir.encode()).decode()
|
||||
return await self.db.execute("update file set download_directory=?, file_name=? where stream_hash=?", (
|
||||
encoded_download_dir, encoded_file_name, stream_hash,
|
||||
))
|
||||
|
||||
async def recover_streams(self, descriptors_and_sds: typing.List[typing.Tuple['StreamDescriptor', 'BlobFile']],
|
||||
|
|
|
@ -54,13 +54,7 @@ comparison_operators = {
|
|||
|
||||
|
||||
def path_or_none(p) -> typing.Optional[str]:
|
||||
try:
|
||||
return binascii.unhexlify(p).decode()
|
||||
except binascii.Error as err:
|
||||
if p == '{stream}':
|
||||
return None
|
||||
raise err
|
||||
|
||||
return None if p == '{stream}' else binascii.unhexlify(p).decode()
|
||||
|
||||
class StreamManager:
|
||||
def __init__(self, loop: asyncio.BaseEventLoop, config: 'Config', blob_manager: 'BlobManager',
|
||||
|
@ -155,6 +149,18 @@ class StreamManager:
|
|||
# log.info("Attempting to recover %i streams", len(to_recover))
|
||||
await self.recover_streams(to_recover)
|
||||
|
||||
if self.config.streaming_only:
|
||||
to_set_as_streaming = []
|
||||
for index in range(len(to_start)):
|
||||
file_name = path_or_none(to_start[index]['file_name'])
|
||||
download_dir = path_or_none(to_start[index]['download_directory'])
|
||||
if file_name and download_dir and not os.path.isfile(os.path.join(file_name, download_dir)):
|
||||
to_start[index]['file_name'], to_start[index]['download_directory'] = '{stream}', '{stream}'
|
||||
to_set_as_streaming.append(to_start[index]['stream_hash'])
|
||||
|
||||
if to_set_as_streaming:
|
||||
await self.storage.set_files_as_streaming(to_set_as_streaming)
|
||||
|
||||
log.info("Initializing %i files", len(to_start))
|
||||
if to_start:
|
||||
await asyncio.gather(*[
|
||||
|
|
Loading…
Reference in a new issue