diff --git a/lbrynet/extras/daemon/storage.py b/lbrynet/extras/daemon/storage.py index c808ab046..084b0c11d 100644 --- a/lbrynet/extras/daemon/storage.py +++ b/lbrynet/extras/daemon/storage.py @@ -400,16 +400,15 @@ class SQLiteStorage(SQLiteMixin): def save_downloaded_file(self, stream_hash, file_name, download_directory, data_payment_rate): return self.save_published_file( - stream_hash, binascii.hexlify(file_name.encode()).decode(), - binascii.hexlify(download_directory.encode()).decode(), data_payment_rate, - status="running" + stream_hash, file_name, download_directory, data_payment_rate, status="running" ) def save_published_file(self, stream_hash: str, file_name: str, download_directory: str, data_payment_rate: float, status="finished"): return self.db.execute( "insert into file values (?, ?, ?, ?, ?)", - (stream_hash, file_name, download_directory, data_payment_rate, status) + (stream_hash, binascii.hexlify(file_name.encode()).decode(), + binascii.hexlify(download_directory.encode()).decode(), data_payment_rate, status) ) async def get_all_lbry_files(self) -> typing.List[typing.Dict]: diff --git a/lbrynet/stream/managed_stream.py b/lbrynet/stream/managed_stream.py index 216f3186e..a3c75a7d7 100644 --- a/lbrynet/stream/managed_stream.py +++ b/lbrynet/stream/managed_stream.py @@ -6,6 +6,7 @@ from lbrynet.extras.daemon.mime_types import guess_media_type from lbrynet.stream.downloader import StreamDownloader from lbrynet.stream.descriptor import StreamDescriptor from lbrynet.stream.reflector.client import StreamReflectorClient +from lbrynet.schema.claim import ClaimDict if typing.TYPE_CHECKING: from lbrynet.extras.daemon.storage import StoredStreamClaim from lbrynet.blob.blob_manager import BlobFileManager @@ -99,11 +100,11 @@ class ManagedStream: def as_dict(self) -> typing.Dict: full_path = os.path.join(self.download_directory, self.file_name) - if not os.path.exists(full_path): + if not os.path.isfile(full_path): full_path = None mime_type = guess_media_type(os.path.basename(self.file_name)) - if self.downloader: + if self.downloader and self.downloader.written_bytes: written_bytes = self.downloader.written_bytes elif full_path: written_bytes = os.stat(full_path).st_size @@ -199,3 +200,11 @@ class ManagedStream: self.fully_reflected.set() await self.blob_manager.storage.update_reflected_stream(self.sd_hash, f"{host}:{port}") return sent + + def set_claim(self, claim_info: typing.Dict, claim: ClaimDict): + self.stream_claim_info = StoredStreamClaim( + self.stream_hash, f"{claim_info['txid']}:{claim_info['nout']}", claim_info['claim_id'], + claim_info['name'], claim_info['amount'], claim_info['height'], claim_info['hex'], + claim.certificate_id, claim_info['address'], claim_info['claim_sequence'], + claim_info.get('channel_name') + ) diff --git a/lbrynet/stream/stream_manager.py b/lbrynet/stream/stream_manager.py index 4de483a3e..1ac5e2ccb 100644 --- a/lbrynet/stream/stream_manager.py +++ b/lbrynet/stream/stream_manager.py @@ -59,6 +59,10 @@ class StreamManager: self.resume_downloading_task: asyncio.Task = None self.update_stream_finished_futs: typing.List[asyncio.Future] = [] + async def _update_content_claim(self, stream: ManagedStream): + claim_info = await self.storage.get_content_claim(stream.stream_hash) + stream.set_claim(claim_info, ClaimDict.load_dict(claim_info['value'])) + async def load_streams_from_database(self): infos = await self.storage.get_all_lbry_files() for file_info in infos: @@ -77,6 +81,7 @@ class StreamManager: downloader, file_info['status'], file_info['claim'] ) self.streams.add(stream) + self.storage.content_claim_callbacks[stream.stream_hash] = lambda: self._update_content_claim(stream) async def resume(self): if not self.node: @@ -123,6 +128,7 @@ class StreamManager: iv_generator: typing.Optional[typing.Generator[bytes, None, None]] = None) -> ManagedStream: stream = await ManagedStream.create(self.loop, self.blob_manager, file_path, key, iv_generator) self.streams.add(stream) + self.storage.content_claim_callbacks[stream.stream_hash] = lambda: self._update_content_claim(stream) if self.config.reflector_servers: host, port = random.choice(self.config.reflector_servers) self.loop.create_task(stream.upload_to_reflector(host, port)) @@ -182,16 +188,9 @@ class StreamManager: await self.blob_manager.storage.save_content_claim( downloader.descriptor.stream_hash, f"{claim_info['txid']}:{claim_info['nout']}" ) - - stored_claim = StoredStreamClaim( - downloader.descriptor.stream_hash, f"{claim_info['txid']}:{claim_info['nout']}", claim_info['claim_id'], - claim_info['name'], claim_info['amount'], claim_info['height'], claim_info['hex'], - claim.certificate_id, claim_info['address'], claim_info['claim_sequence'], - claim_info.get('channel_name') - ) stream = ManagedStream(self.loop, self.blob_manager, downloader.descriptor, download_directory, - os.path.basename(downloader.output_path), downloader, ManagedStream.STATUS_RUNNING, - stored_claim) + os.path.basename(downloader.output_path), downloader, ManagedStream.STATUS_RUNNING) + stream.set_claim(claim_info, claim) self.streams.add(stream) try: await stream.downloader.wrote_bytes_event.wait() diff --git a/scripts/repair_0_31_1_db.py b/scripts/repair_0_31_1_db.py new file mode 100644 index 000000000..5f05ca83f --- /dev/null +++ b/scripts/repair_0_31_1_db.py @@ -0,0 +1,33 @@ +import os +import binascii +import sqlite3 +from lbrynet.conf import Config + + +def main(): + conf = Config() + db = sqlite3.connect(os.path.join(conf.data_dir, 'lbrynet.sqlite')) + cur = db.cursor() + files = cur.execute("select stream_hash, file_name, download_directory from file").fetchall() + update = {} + for stream_hash, file_name, download_directory in files: + try: + binascii.unhexlify(file_name) + except binascii.Error: + try: + binascii.unhexlify(download_directory) + except binascii.Error: + update[stream_hash] = ( + binascii.hexlify(file_name.encode()).decode(), binascii.hexlify(download_directory.encode()).decode() + ) + if update: + print(f"repair {len(update)} streams") + for stream_hash, (file_name, download_directory) in update.items(): + cur.execute('update file set file_name=?, download_directory=? where stream_hash=?', + (file_name, download_directory, stream_hash)) + db.commit() + db.close() + + +if __name__ == "__main__": + main()