diff --git a/lbry/extras/daemon/json_response_encoder.py b/lbry/extras/daemon/json_response_encoder.py index b7702f541..641a7148f 100644 --- a/lbry/extras/daemon/json_response_encoder.py +++ b/lbry/extras/daemon/json_response_encoder.py @@ -7,6 +7,7 @@ from json import JSONEncoder from google.protobuf.message import DecodeError from lbry.schema.claim import Claim +from lbry.torrent.torrent_manager import TorrentSource from lbry.wallet import Wallet, Ledger, Account, Transaction, Output from lbry.wallet.bip32 import PubKey from lbry.wallet.dewies import dewies_to_lbc @@ -128,6 +129,8 @@ class JSONResponseEncoder(JSONEncoder): return self.encode_wallet(obj) if isinstance(obj, ManagedStream): return self.encode_file(obj) + if isinstance(obj, TorrentSource): + return self.encode_file(obj) if isinstance(obj, Transaction): return self.encode_transaction(obj) if isinstance(obj, Output): @@ -273,26 +276,27 @@ class JSONResponseEncoder(JSONEncoder): output_exists = managed_stream.output_file_exists tx_height = managed_stream.stream_claim_info.height best_height = self.ledger.headers.height + is_stream = hasattr(managed_stream, 'stream_hash') return { - 'streaming_url': managed_stream.stream_url, + 'streaming_url': managed_stream.stream_url if is_stream else None, 'completed': managed_stream.completed, 'file_name': managed_stream.file_name if output_exists else None, 'download_directory': managed_stream.download_directory if output_exists else None, 'download_path': managed_stream.full_path if output_exists else None, 'points_paid': 0.0, 'stopped': not managed_stream.running, - 'stream_hash': managed_stream.stream_hash, - 'stream_name': managed_stream.descriptor.stream_name, - 'suggested_file_name': managed_stream.descriptor.suggested_file_name, - 'sd_hash': managed_stream.descriptor.sd_hash, - 'mime_type': managed_stream.mime_type, - 'key': managed_stream.descriptor.key, - 'total_bytes_lower_bound': managed_stream.descriptor.lower_bound_decrypted_length(), - 'total_bytes': managed_stream.descriptor.upper_bound_decrypted_length(), - 'written_bytes': managed_stream.written_bytes, - 'blobs_completed': managed_stream.blobs_completed, - 'blobs_in_stream': managed_stream.blobs_in_stream, - 'blobs_remaining': managed_stream.blobs_remaining, + 'stream_hash': managed_stream.stream_hash if is_stream else None, + 'stream_name': managed_stream.descriptor.stream_name if is_stream else None, + 'suggested_file_name': managed_stream.descriptor.suggested_file_name if is_stream else None, + 'sd_hash': managed_stream.descriptor.sd_hash if is_stream else None, + 'mime_type': managed_stream.mime_type if is_stream else None, + 'key': managed_stream.descriptor.key if is_stream else None, + 'total_bytes_lower_bound': managed_stream.descriptor.lower_bound_decrypted_length() if is_stream else None, + 'total_bytes': managed_stream.descriptor.upper_bound_decrypted_length() if is_stream else None, + 'written_bytes': managed_stream.written_bytes if is_stream else None, + 'blobs_completed': managed_stream.blobs_completed if is_stream else None, + 'blobs_in_stream': managed_stream.blobs_in_stream if is_stream else None, + 'blobs_remaining': managed_stream.blobs_remaining if is_stream else None, 'status': managed_stream.status, 'claim_id': managed_stream.claim_id, 'txid': managed_stream.txid, diff --git a/lbry/extras/daemon/storage.py b/lbry/extras/daemon/storage.py index 8a03a456c..ce242bfe0 100644 --- a/lbry/extras/daemon/storage.py +++ b/lbry/extras/daemon/storage.py @@ -753,7 +753,8 @@ class SQLiteStorage(SQLiteMixin): return self.save_claims(to_save.values()) @staticmethod - def _save_content_claim(transaction, claim_outpoint, stream_hash): + def _save_content_claim(transaction, claim_outpoint, stream_hash=None, bt_infohash=None): + assert stream_hash or bt_infohash # get the claim id and serialized metadata claim_info = transaction.execute( "select claim_id, serialized_metadata from claim where claim_outpoint=?", (claim_outpoint,) @@ -801,6 +802,19 @@ class SQLiteStorage(SQLiteMixin): if stream_hash in self.content_claim_callbacks: await self.content_claim_callbacks[stream_hash]() + async def save_torrent_content_claim(self, bt_infohash, claim_outpoint, length, name): + def _save_torrent(transaction): + transaction.execute( + "insert into torrent values (?, NULL, ?, ?)", (bt_infohash, length, name) + ).fetchall() + transaction.execute( + "insert into content_claim values (NULL, ?, ?)", (bt_infohash, claim_outpoint) + ).fetchall() + await self.db.run(_save_torrent) + # update corresponding ManagedEncryptedFileDownloader object + if bt_infohash in self.content_claim_callbacks: + await self.content_claim_callbacks[bt_infohash]() + async def get_content_claim(self, stream_hash: str, include_supports: typing.Optional[bool] = True) -> typing.Dict: claims = await self.db.run(get_claims_from_stream_hashes, [stream_hash]) claim = None @@ -812,6 +826,10 @@ class SQLiteStorage(SQLiteMixin): claim['effective_amount'] = calculate_effective_amount(claim['amount'], supports) return claim + async def get_content_claim_for_torrent(self, bt_infohash): + claims = await self.db.run(get_claims_from_torrent_info_hashes, [bt_infohash]) + return claims[bt_infohash].as_dict() if claims else None + # # # # # # # # # reflector functions # # # # # # # # # def update_reflected_stream(self, sd_hash, reflector_address, success=True): diff --git a/lbry/file/file_manager.py b/lbry/file/file_manager.py index 926296f6e..c8ef53be7 100644 --- a/lbry/file/file_manager.py +++ b/lbry/file/file_manager.py @@ -16,7 +16,7 @@ if typing.TYPE_CHECKING: from lbry.conf import Config from lbry.extras.daemon.analytics import AnalyticsManager from lbry.extras.daemon.storage import SQLiteStorage - from lbry.wallet import WalletManager + from lbry.wallet import WalletManager, Output from lbry.extras.daemon.exchange_rate_manager import ExchangeRateManager log = logging.getLogger(__name__) @@ -206,6 +206,12 @@ class FileManager: if not claim.stream.source.bt_infohash: await self.storage.save_content_claim(stream.stream_hash, outpoint) + else: + await self.storage.save_torrent_content_claim( + stream.identifier, outpoint, stream.torrent_length, stream.torrent_name + ) + claim_info = await self.storage.get_content_claim_for_torrent(stream.identifier) + stream.set_claim(claim_info, claim) if save_file: await asyncio.wait_for(stream.save_file(), timeout - (self.loop.time() - before_download), loop=self.loop) diff --git a/lbry/file/source.py b/lbry/file/source.py index 81ab0ee82..b661eb594 100644 --- a/lbry/file/source.py +++ b/lbry/file/source.py @@ -69,14 +69,14 @@ class ManagedDownloadSource: def stop_tasks(self): raise NotImplementedError() - # def set_claim(self, claim_info: typing.Dict, claim: 'Claim'): - # self.stream_claim_info = StoredContentClaim( - # f"{claim_info['txid']}:{claim_info['nout']}", claim_info['claim_id'], - # claim_info['name'], claim_info['amount'], claim_info['height'], - # binascii.hexlify(claim.to_bytes()).decode(), claim.signing_channel_id, claim_info['address'], - # claim_info['claim_sequence'], claim_info.get('channel_name') - # ) - # + def set_claim(self, claim_info: typing.Dict, claim: 'Claim'): + self.stream_claim_info = StoredContentClaim( + f"{claim_info['txid']}:{claim_info['nout']}", claim_info['claim_id'], + claim_info['name'], claim_info['amount'], claim_info['height'], + binascii.hexlify(claim.to_bytes()).decode(), claim.signing_channel_id, claim_info['address'], + claim_info['claim_sequence'], claim_info.get('channel_name') + ) + # async def update_content_claim(self, claim_info: Optional[typing.Dict] = None): # if not claim_info: # claim_info = await self.blob_manager.storage.get_content_claim(self.stream_hash) diff --git a/lbry/stream/managed_stream.py b/lbry/stream/managed_stream.py index 2e00a8d65..8147b5a7a 100644 --- a/lbry/stream/managed_stream.py +++ b/lbry/stream/managed_stream.py @@ -363,14 +363,6 @@ class ManagedStream(ManagedDownloadSource): await self.blob_manager.storage.update_reflected_stream(self.sd_hash, f"{host}:{port}") return sent - def set_claim(self, claim_info: typing.Dict, claim: 'Claim'): - self.stream_claim_info = StoredContentClaim( - f"{claim_info['txid']}:{claim_info['nout']}", claim_info['claim_id'], - claim_info['name'], claim_info['amount'], claim_info['height'], - binascii.hexlify(claim.to_bytes()).decode(), claim.signing_channel_id, claim_info['address'], - claim_info['claim_sequence'], claim_info.get('channel_name') - ) - async def update_content_claim(self, claim_info: Optional[typing.Dict] = None): if not claim_info: claim_info = await self.blob_manager.storage.get_content_claim(self.stream_hash) diff --git a/lbry/torrent/session.py b/lbry/torrent/session.py index 2e4c6a72e..e37b2e68f 100644 --- a/lbry/torrent/session.py +++ b/lbry/torrent/session.py @@ -57,12 +57,19 @@ class TorrentHandle: self._handle: libtorrent.torrent_handle = handle self.finished = asyncio.Event(loop=loop) self.metadata_completed = asyncio.Event(loop=loop) + self.size = 0 + self.total_wanted_done = 0 + self.name = '' def _show_status(self): # fixme: cleanup status = self._handle.status() if status.has_metadata: self.metadata_completed.set() + self._handle.pause() + self.size = status.total_wanted + self.total_wanted_done = status.total_wanted_done + self.name = status.name # metadata: libtorrent.torrent_info = self._handle.get_torrent_info() # print(metadata) # print(metadata.files()) @@ -205,6 +212,15 @@ class TorrentSession: handle = self._handles[btih] handle._handle.move_storage(download_directory) + def get_size(self, btih): + return self._handles[btih].size + + def get_name(self, btih): + return self._handles[btih].name + + def get_downloaded(self, btih): + return self._handles[btih].total_wanted_done + def get_magnet_uri(btih): return f"magnet:?xt=urn:btih:{btih}" diff --git a/lbry/torrent/torrent_manager.py b/lbry/torrent/torrent_manager.py index 118c5d897..c8f0f0b36 100644 --- a/lbry/torrent/torrent_manager.py +++ b/lbry/torrent/torrent_manager.py @@ -51,12 +51,20 @@ class TorrentSource(ManagedDownloadSource): async def save_file(self, file_name: Optional[str] = None, download_directory: Optional[str] = None): await self.torrent_session.save_file(self.identifier, download_directory) + @property + def torrent_length(self): + return self.torrent_session.get_size(self.identifier) + + @property + def torrent_name(self): + return self.torrent_session.get_name(self.identifier) + def stop_tasks(self): pass @property def completed(self): - raise NotImplementedError() + return self.torrent_session.get_downloaded(self.identifier) == self.torrent_length class TorrentManager(SourceManager): diff --git a/tests/integration/datanetwork/test_file_commands.py b/tests/integration/datanetwork/test_file_commands.py index 0d8ac3a41..b5a7ba405 100644 --- a/tests/integration/datanetwork/test_file_commands.py +++ b/tests/integration/datanetwork/test_file_commands.py @@ -34,6 +34,7 @@ class FileCommands(CommandTestCase): async def test_download_torrent(self): await self.initialize_torrent() await self.out(self.daemon.jsonrpc_get('torrent')) + self.assertItemCount(await self.daemon.jsonrpc_file_list(), 1) async def create_streams_in_range(self, *args, **kwargs): self.stream_claim_ids = []