diff --git a/lbrynet/extras/daemon/Daemon.py b/lbrynet/extras/daemon/Daemon.py index ff5500b2a..181214e95 100644 --- a/lbrynet/extras/daemon/Daemon.py +++ b/lbrynet/extras/daemon/Daemon.py @@ -1382,7 +1382,7 @@ class Daemon(metaclass=JSONRPCServerType): ] } """ - sort = sort or 'status' + sort = sort or 'rowid' comparison = comparison or 'eq' return [ stream.as_dict() for stream in self.stream_manager.get_filtered_streams( @@ -2023,8 +2023,6 @@ class Daemon(metaclass=JSONRPCServerType): if file_path: stream = await self.stream_manager.create_stream(file_path) - await self.storage.save_published_file(stream.stream_hash, os.path.basename(file_path), - os.path.dirname(file_path), 0) claim_dict['stream']['source']['source'] = stream.sd_hash claim_dict['stream']['source']['contentType'] = guess_media_type(file_path) diff --git a/lbrynet/extras/daemon/storage.py b/lbrynet/extras/daemon/storage.py index 7830a4a77..d3826530b 100644 --- a/lbrynet/extras/daemon/storage.py +++ b/lbrynet/extras/daemon/storage.py @@ -134,7 +134,7 @@ def get_all_lbry_files(transaction: sqlite3.Connection) -> typing.List[typing.Di signed_claims[claim.channel_claim_id].append(claim) files.append( { - "row_id": rowid, + "rowid": rowid, "stream_hash": stream_hash, "file_name": file_name, # hex "download_directory": download_dir, # hex @@ -195,12 +195,13 @@ def delete_stream(transaction: sqlite3.Connection, descriptor: 'StreamDescriptor def store_file(transaction: sqlite3.Connection, stream_hash: str, file_name: str, download_directory: str, - data_payment_rate: float, status: str): + data_payment_rate: float, status: str) -> int: transaction.execute( "insert or replace into file values (?, ?, ?, ?, ?)", (stream_hash, binascii.hexlify(file_name.encode()).decode(), binascii.hexlify(download_directory.encode()).decode(), data_payment_rate, status) ) + return transaction.execute("select rowid from file where stream_hash=?", (stream_hash, )).fetchone()[0] class SQLiteStorage(SQLiteMixin): @@ -429,6 +430,11 @@ class SQLiteStorage(SQLiteMixin): "s.stream_hash=f.stream_hash and s.sd_hash=?", sd_hash) return streams is not None + def rowid_for_stream(self, stream_hash: str) -> typing.Awaitable[typing.Optional[int]]: + return self.run_and_return_one_or_none( + "select rowid from file where stream_hash=?", stream_hash + ) + def store_stream(self, sd_blob: 'BlobFile', descriptor: 'StreamDescriptor'): return self.db.run(store_stream, sd_blob, descriptor) @@ -479,13 +485,14 @@ class SQLiteStorage(SQLiteMixin): # # # # # # # # # file stuff # # # # # # # # # - def save_downloaded_file(self, stream_hash, file_name, download_directory, data_payment_rate): + def save_downloaded_file(self, stream_hash, file_name, download_directory, + data_payment_rate) -> typing.Awaitable[int]: return self.save_published_file( stream_hash, file_name, download_directory, data_payment_rate, status="running" ) def save_published_file(self, stream_hash: str, file_name: str, download_directory: str, data_payment_rate: float, - status="finished"): + status="finished") -> typing.Awaitable[int]: return self.db.run(store_file, stream_hash, file_name, download_directory, data_payment_rate, status) def get_all_lbry_files(self) -> typing.Awaitable[typing.List[typing.Dict]]: diff --git a/lbrynet/stream/managed_stream.py b/lbrynet/stream/managed_stream.py index 343f1647a..054269423 100644 --- a/lbrynet/stream/managed_stream.py +++ b/lbrynet/stream/managed_stream.py @@ -21,11 +21,13 @@ class ManagedStream: STATUS_STOPPED = "stopped" STATUS_FINISHED = "finished" - def __init__(self, loop: asyncio.BaseEventLoop, blob_manager: 'BlobFileManager', descriptor: 'StreamDescriptor', - download_directory: str, file_name: str, downloader: typing.Optional[StreamDownloader] = None, + def __init__(self, loop: asyncio.BaseEventLoop, blob_manager: 'BlobFileManager', rowid: int, + descriptor: 'StreamDescriptor', download_directory: str, file_name: str, + downloader: typing.Optional[StreamDownloader] = None, status: typing.Optional[str] = STATUS_STOPPED, claim: typing.Optional[StoredStreamClaim] = None): self.loop = loop self.blob_manager = blob_manager + self.rowid = rowid self.download_directory = download_directory self._file_name = file_name self.descriptor = descriptor @@ -168,7 +170,9 @@ class ManagedStream: await blob_manager.blob_completed(sd_blob) for blob in descriptor.blobs[:-1]: await blob_manager.blob_completed(blob_manager.get_blob(blob.blob_hash, blob.length)) - return cls(loop, blob_manager, descriptor, os.path.dirname(file_path), os.path.basename(file_path), + row_id = await blob_manager.storage.save_published_file(descriptor.stream_hash, os.path.basename(file_path), + os.path.dirname(file_path), 0) + return cls(loop, blob_manager, row_id, descriptor, os.path.dirname(file_path), os.path.basename(file_path), status=cls.STATUS_FINISHED) def start_download(self, node: typing.Optional['Node']): diff --git a/lbrynet/stream/stream_manager.py b/lbrynet/stream/stream_manager.py index 719068982..14478ce25 100644 --- a/lbrynet/stream/stream_manager.py +++ b/lbrynet/stream/stream_manager.py @@ -24,6 +24,7 @@ if typing.TYPE_CHECKING: log = logging.getLogger(__name__) filter_fields = [ + 'rowid', 'status', 'file_name', 'sd_hash', @@ -156,7 +157,7 @@ class StreamManager: await self.storage.recover_streams(to_restore, self.config.download_dir) log.info("Recovered %i/%i attempted streams", len(to_restore), len(file_infos)) - async def add_stream(self, sd_hash: str, file_name: str, download_directory: str, status: str, + async def add_stream(self, rowid: int, sd_hash: str, file_name: str, download_directory: str, status: str, claim: typing.Optional['StoredStreamClaim']): sd_blob = self.blob_manager.get_blob(sd_hash) if not sd_blob.get_is_verified(): @@ -171,7 +172,7 @@ class StreamManager: else: downloader = None stream = ManagedStream( - self.loop, self.blob_manager, descriptor, download_directory, file_name, downloader, status, claim + self.loop, self.blob_manager, rowid, descriptor, download_directory, file_name, downloader, status, claim ) self.streams.add(stream) self.storage.content_claim_callbacks[stream.stream_hash] = lambda: self._update_content_claim(stream) @@ -194,7 +195,7 @@ class StreamManager: await asyncio.gather(*[ self.add_stream( - file_info['sd_hash'], binascii.unhexlify(file_info['file_name']).decode(), + file_info['rowid'], file_info['sd_hash'], binascii.unhexlify(file_info['file_name']).decode(), binascii.unhexlify(file_info['download_directory']).decode(), file_info['status'], file_info['claim'] ) for file_info in to_start @@ -309,14 +310,16 @@ class StreamManager: if not await self.blob_manager.storage.stream_exists(downloader.sd_hash): await self.blob_manager.storage.store_stream(downloader.sd_blob, downloader.descriptor) if not await self.blob_manager.storage.file_exists(downloader.sd_hash): - await self.blob_manager.storage.save_downloaded_file( + rowid = await self.blob_manager.storage.save_downloaded_file( downloader.descriptor.stream_hash, file_name, download_directory, 0.0 ) + else: + rowid = self.blob_manager.storage.rowid_for_stream(downloader.descriptor.stream_hash) await self.blob_manager.storage.save_content_claim( downloader.descriptor.stream_hash, f"{claim_info['txid']}:{claim_info['nout']}" ) - stream = ManagedStream(self.loop, self.blob_manager, downloader.descriptor, download_directory, + stream = ManagedStream(self.loop, self.blob_manager, rowid, downloader.descriptor, download_directory, file_name, downloader, ManagedStream.STATUS_RUNNING) stream.set_claim(claim_info, claim) self.streams.add(stream)