working file list after torrent get
This commit is contained in:
parent
a1090679a5
commit
e9623fae13
8 changed files with 77 additions and 32 deletions
|
@ -7,6 +7,7 @@ from json import JSONEncoder
|
||||||
from google.protobuf.message import DecodeError
|
from google.protobuf.message import DecodeError
|
||||||
|
|
||||||
from lbry.schema.claim import Claim
|
from lbry.schema.claim import Claim
|
||||||
|
from lbry.torrent.torrent_manager import TorrentSource
|
||||||
from lbry.wallet import Wallet, Ledger, Account, Transaction, Output
|
from lbry.wallet import Wallet, Ledger, Account, Transaction, Output
|
||||||
from lbry.wallet.bip32 import PubKey
|
from lbry.wallet.bip32 import PubKey
|
||||||
from lbry.wallet.dewies import dewies_to_lbc
|
from lbry.wallet.dewies import dewies_to_lbc
|
||||||
|
@ -126,6 +127,8 @@ class JSONResponseEncoder(JSONEncoder):
|
||||||
return self.encode_wallet(obj)
|
return self.encode_wallet(obj)
|
||||||
if isinstance(obj, ManagedStream):
|
if isinstance(obj, ManagedStream):
|
||||||
return self.encode_file(obj)
|
return self.encode_file(obj)
|
||||||
|
if isinstance(obj, TorrentSource):
|
||||||
|
return self.encode_file(obj)
|
||||||
if isinstance(obj, Transaction):
|
if isinstance(obj, Transaction):
|
||||||
return self.encode_transaction(obj)
|
return self.encode_transaction(obj)
|
||||||
if isinstance(obj, Output):
|
if isinstance(obj, Output):
|
||||||
|
@ -265,26 +268,27 @@ class JSONResponseEncoder(JSONEncoder):
|
||||||
output_exists = managed_stream.output_file_exists
|
output_exists = managed_stream.output_file_exists
|
||||||
tx_height = managed_stream.stream_claim_info.height
|
tx_height = managed_stream.stream_claim_info.height
|
||||||
best_height = self.ledger.headers.height
|
best_height = self.ledger.headers.height
|
||||||
|
is_stream = hasattr(managed_stream, 'stream_hash')
|
||||||
return {
|
return {
|
||||||
'streaming_url': managed_stream.stream_url,
|
'streaming_url': managed_stream.stream_url if is_stream else None,
|
||||||
'completed': managed_stream.completed,
|
'completed': managed_stream.completed,
|
||||||
'file_name': managed_stream.file_name if output_exists else None,
|
'file_name': managed_stream.file_name if output_exists else None,
|
||||||
'download_directory': managed_stream.download_directory if output_exists else None,
|
'download_directory': managed_stream.download_directory if output_exists else None,
|
||||||
'download_path': managed_stream.full_path if output_exists else None,
|
'download_path': managed_stream.full_path if output_exists else None,
|
||||||
'points_paid': 0.0,
|
'points_paid': 0.0,
|
||||||
'stopped': not managed_stream.running,
|
'stopped': not managed_stream.running,
|
||||||
'stream_hash': managed_stream.stream_hash,
|
'stream_hash': managed_stream.stream_hash if is_stream else None,
|
||||||
'stream_name': managed_stream.descriptor.stream_name,
|
'stream_name': managed_stream.descriptor.stream_name if is_stream else None,
|
||||||
'suggested_file_name': managed_stream.descriptor.suggested_file_name,
|
'suggested_file_name': managed_stream.descriptor.suggested_file_name if is_stream else None,
|
||||||
'sd_hash': managed_stream.descriptor.sd_hash,
|
'sd_hash': managed_stream.descriptor.sd_hash if is_stream else None,
|
||||||
'mime_type': managed_stream.mime_type,
|
'mime_type': managed_stream.mime_type if is_stream else None,
|
||||||
'key': managed_stream.descriptor.key,
|
'key': managed_stream.descriptor.key if is_stream else None,
|
||||||
'total_bytes_lower_bound': managed_stream.descriptor.lower_bound_decrypted_length(),
|
'total_bytes_lower_bound': managed_stream.descriptor.lower_bound_decrypted_length() if is_stream else None,
|
||||||
'total_bytes': managed_stream.descriptor.upper_bound_decrypted_length(),
|
'total_bytes': managed_stream.descriptor.upper_bound_decrypted_length() if is_stream else None,
|
||||||
'written_bytes': managed_stream.written_bytes,
|
'written_bytes': managed_stream.written_bytes if is_stream else None,
|
||||||
'blobs_completed': managed_stream.blobs_completed,
|
'blobs_completed': managed_stream.blobs_completed if is_stream else None,
|
||||||
'blobs_in_stream': managed_stream.blobs_in_stream,
|
'blobs_in_stream': managed_stream.blobs_in_stream if is_stream else None,
|
||||||
'blobs_remaining': managed_stream.blobs_remaining,
|
'blobs_remaining': managed_stream.blobs_remaining if is_stream else None,
|
||||||
'status': managed_stream.status,
|
'status': managed_stream.status,
|
||||||
'claim_id': managed_stream.claim_id,
|
'claim_id': managed_stream.claim_id,
|
||||||
'txid': managed_stream.txid,
|
'txid': managed_stream.txid,
|
||||||
|
|
|
@ -753,7 +753,8 @@ class SQLiteStorage(SQLiteMixin):
|
||||||
return self.save_claims(to_save.values())
|
return self.save_claims(to_save.values())
|
||||||
|
|
||||||
@staticmethod
|
@staticmethod
|
||||||
def _save_content_claim(transaction, claim_outpoint, stream_hash):
|
def _save_content_claim(transaction, claim_outpoint, stream_hash=None, bt_infohash=None):
|
||||||
|
assert stream_hash or bt_infohash
|
||||||
# get the claim id and serialized metadata
|
# get the claim id and serialized metadata
|
||||||
claim_info = transaction.execute(
|
claim_info = transaction.execute(
|
||||||
"select claim_id, serialized_metadata from claim where claim_outpoint=?", (claim_outpoint,)
|
"select claim_id, serialized_metadata from claim where claim_outpoint=?", (claim_outpoint,)
|
||||||
|
@ -801,6 +802,19 @@ class SQLiteStorage(SQLiteMixin):
|
||||||
if stream_hash in self.content_claim_callbacks:
|
if stream_hash in self.content_claim_callbacks:
|
||||||
await self.content_claim_callbacks[stream_hash]()
|
await self.content_claim_callbacks[stream_hash]()
|
||||||
|
|
||||||
|
async def save_torrent_content_claim(self, bt_infohash, claim_outpoint, length, name):
|
||||||
|
def _save_torrent(transaction):
|
||||||
|
transaction.execute(
|
||||||
|
"insert into torrent values (?, NULL, ?, ?)", (bt_infohash, length, name)
|
||||||
|
).fetchall()
|
||||||
|
transaction.execute(
|
||||||
|
"insert into content_claim values (NULL, ?, ?)", (bt_infohash, claim_outpoint)
|
||||||
|
).fetchall()
|
||||||
|
await self.db.run(_save_torrent)
|
||||||
|
# update corresponding ManagedEncryptedFileDownloader object
|
||||||
|
if bt_infohash in self.content_claim_callbacks:
|
||||||
|
await self.content_claim_callbacks[bt_infohash]()
|
||||||
|
|
||||||
async def get_content_claim(self, stream_hash: str, include_supports: typing.Optional[bool] = True) -> typing.Dict:
|
async def get_content_claim(self, stream_hash: str, include_supports: typing.Optional[bool] = True) -> typing.Dict:
|
||||||
claims = await self.db.run(get_claims_from_stream_hashes, [stream_hash])
|
claims = await self.db.run(get_claims_from_stream_hashes, [stream_hash])
|
||||||
claim = None
|
claim = None
|
||||||
|
@ -812,6 +826,10 @@ class SQLiteStorage(SQLiteMixin):
|
||||||
claim['effective_amount'] = calculate_effective_amount(claim['amount'], supports)
|
claim['effective_amount'] = calculate_effective_amount(claim['amount'], supports)
|
||||||
return claim
|
return claim
|
||||||
|
|
||||||
|
async def get_content_claim_for_torrent(self, bt_infohash):
|
||||||
|
claims = await self.db.run(get_claims_from_torrent_info_hashes, [bt_infohash])
|
||||||
|
return claims[bt_infohash].as_dict() if claims else None
|
||||||
|
|
||||||
# # # # # # # # # reflector functions # # # # # # # # #
|
# # # # # # # # # reflector functions # # # # # # # # #
|
||||||
|
|
||||||
def update_reflected_stream(self, sd_hash, reflector_address, success=True):
|
def update_reflected_stream(self, sd_hash, reflector_address, success=True):
|
||||||
|
|
|
@ -16,7 +16,7 @@ if typing.TYPE_CHECKING:
|
||||||
from lbry.conf import Config
|
from lbry.conf import Config
|
||||||
from lbry.extras.daemon.analytics import AnalyticsManager
|
from lbry.extras.daemon.analytics import AnalyticsManager
|
||||||
from lbry.extras.daemon.storage import SQLiteStorage
|
from lbry.extras.daemon.storage import SQLiteStorage
|
||||||
from lbry.wallet import WalletManager
|
from lbry.wallet import WalletManager, Output
|
||||||
from lbry.extras.daemon.exchange_rate_manager import ExchangeRateManager
|
from lbry.extras.daemon.exchange_rate_manager import ExchangeRateManager
|
||||||
|
|
||||||
log = logging.getLogger(__name__)
|
log = logging.getLogger(__name__)
|
||||||
|
@ -206,6 +206,12 @@ class FileManager:
|
||||||
|
|
||||||
if not claim.stream.source.bt_infohash:
|
if not claim.stream.source.bt_infohash:
|
||||||
await self.storage.save_content_claim(stream.stream_hash, outpoint)
|
await self.storage.save_content_claim(stream.stream_hash, outpoint)
|
||||||
|
else:
|
||||||
|
await self.storage.save_torrent_content_claim(
|
||||||
|
stream.identifier, outpoint, stream.torrent_length, stream.torrent_name
|
||||||
|
)
|
||||||
|
claim_info = await self.storage.get_content_claim_for_torrent(stream.identifier)
|
||||||
|
stream.set_claim(claim_info, claim)
|
||||||
if save_file:
|
if save_file:
|
||||||
await asyncio.wait_for(stream.save_file(), timeout - (self.loop.time() - before_download),
|
await asyncio.wait_for(stream.save_file(), timeout - (self.loop.time() - before_download),
|
||||||
loop=self.loop)
|
loop=self.loop)
|
||||||
|
|
|
@ -69,14 +69,14 @@ class ManagedDownloadSource:
|
||||||
def stop_tasks(self):
|
def stop_tasks(self):
|
||||||
raise NotImplementedError()
|
raise NotImplementedError()
|
||||||
|
|
||||||
# def set_claim(self, claim_info: typing.Dict, claim: 'Claim'):
|
def set_claim(self, claim_info: typing.Dict, claim: 'Claim'):
|
||||||
# self.stream_claim_info = StoredContentClaim(
|
self.stream_claim_info = StoredContentClaim(
|
||||||
# f"{claim_info['txid']}:{claim_info['nout']}", claim_info['claim_id'],
|
f"{claim_info['txid']}:{claim_info['nout']}", claim_info['claim_id'],
|
||||||
# claim_info['name'], claim_info['amount'], claim_info['height'],
|
claim_info['name'], claim_info['amount'], claim_info['height'],
|
||||||
# binascii.hexlify(claim.to_bytes()).decode(), claim.signing_channel_id, claim_info['address'],
|
binascii.hexlify(claim.to_bytes()).decode(), claim.signing_channel_id, claim_info['address'],
|
||||||
# claim_info['claim_sequence'], claim_info.get('channel_name')
|
claim_info['claim_sequence'], claim_info.get('channel_name')
|
||||||
# )
|
)
|
||||||
#
|
|
||||||
# async def update_content_claim(self, claim_info: Optional[typing.Dict] = None):
|
# async def update_content_claim(self, claim_info: Optional[typing.Dict] = None):
|
||||||
# if not claim_info:
|
# if not claim_info:
|
||||||
# claim_info = await self.blob_manager.storage.get_content_claim(self.stream_hash)
|
# claim_info = await self.blob_manager.storage.get_content_claim(self.stream_hash)
|
||||||
|
|
|
@ -355,14 +355,6 @@ class ManagedStream(ManagedDownloadSource):
|
||||||
await self.blob_manager.storage.update_reflected_stream(self.sd_hash, f"{host}:{port}")
|
await self.blob_manager.storage.update_reflected_stream(self.sd_hash, f"{host}:{port}")
|
||||||
return sent
|
return sent
|
||||||
|
|
||||||
def set_claim(self, claim_info: typing.Dict, claim: 'Claim'):
|
|
||||||
self.stream_claim_info = StoredContentClaim(
|
|
||||||
f"{claim_info['txid']}:{claim_info['nout']}", claim_info['claim_id'],
|
|
||||||
claim_info['name'], claim_info['amount'], claim_info['height'],
|
|
||||||
binascii.hexlify(claim.to_bytes()).decode(), claim.signing_channel_id, claim_info['address'],
|
|
||||||
claim_info['claim_sequence'], claim_info.get('channel_name')
|
|
||||||
)
|
|
||||||
|
|
||||||
async def update_content_claim(self, claim_info: Optional[typing.Dict] = None):
|
async def update_content_claim(self, claim_info: Optional[typing.Dict] = None):
|
||||||
if not claim_info:
|
if not claim_info:
|
||||||
claim_info = await self.blob_manager.storage.get_content_claim(self.stream_hash)
|
claim_info = await self.blob_manager.storage.get_content_claim(self.stream_hash)
|
||||||
|
|
|
@ -57,12 +57,19 @@ class TorrentHandle:
|
||||||
self._handle: libtorrent.torrent_handle = handle
|
self._handle: libtorrent.torrent_handle = handle
|
||||||
self.finished = asyncio.Event(loop=loop)
|
self.finished = asyncio.Event(loop=loop)
|
||||||
self.metadata_completed = asyncio.Event(loop=loop)
|
self.metadata_completed = asyncio.Event(loop=loop)
|
||||||
|
self.size = 0
|
||||||
|
self.total_wanted_done = 0
|
||||||
|
self.name = ''
|
||||||
|
|
||||||
def _show_status(self):
|
def _show_status(self):
|
||||||
# fixme: cleanup
|
# fixme: cleanup
|
||||||
status = self._handle.status()
|
status = self._handle.status()
|
||||||
if status.has_metadata:
|
if status.has_metadata:
|
||||||
self.metadata_completed.set()
|
self.metadata_completed.set()
|
||||||
|
self._handle.pause()
|
||||||
|
self.size = status.total_wanted
|
||||||
|
self.total_wanted_done = status.total_wanted_done
|
||||||
|
self.name = status.name
|
||||||
# metadata: libtorrent.torrent_info = self._handle.get_torrent_info()
|
# metadata: libtorrent.torrent_info = self._handle.get_torrent_info()
|
||||||
# print(metadata)
|
# print(metadata)
|
||||||
# print(metadata.files())
|
# print(metadata.files())
|
||||||
|
@ -205,6 +212,15 @@ class TorrentSession:
|
||||||
handle = self._handles[btih]
|
handle = self._handles[btih]
|
||||||
handle._handle.move_storage(download_directory)
|
handle._handle.move_storage(download_directory)
|
||||||
|
|
||||||
|
def get_size(self, btih):
|
||||||
|
return self._handles[btih].size
|
||||||
|
|
||||||
|
def get_name(self, btih):
|
||||||
|
return self._handles[btih].name
|
||||||
|
|
||||||
|
def get_downloaded(self, btih):
|
||||||
|
return self._handles[btih].total_wanted_done
|
||||||
|
|
||||||
|
|
||||||
def get_magnet_uri(btih):
|
def get_magnet_uri(btih):
|
||||||
return f"magnet:?xt=urn:btih:{btih}"
|
return f"magnet:?xt=urn:btih:{btih}"
|
||||||
|
|
|
@ -51,12 +51,20 @@ class TorrentSource(ManagedDownloadSource):
|
||||||
async def save_file(self, file_name: Optional[str] = None, download_directory: Optional[str] = None):
|
async def save_file(self, file_name: Optional[str] = None, download_directory: Optional[str] = None):
|
||||||
await self.torrent_session.save_file(self.identifier, download_directory)
|
await self.torrent_session.save_file(self.identifier, download_directory)
|
||||||
|
|
||||||
|
@property
|
||||||
|
def torrent_length(self):
|
||||||
|
return self.torrent_session.get_size(self.identifier)
|
||||||
|
|
||||||
|
@property
|
||||||
|
def torrent_name(self):
|
||||||
|
return self.torrent_session.get_name(self.identifier)
|
||||||
|
|
||||||
def stop_tasks(self):
|
def stop_tasks(self):
|
||||||
pass
|
pass
|
||||||
|
|
||||||
@property
|
@property
|
||||||
def completed(self):
|
def completed(self):
|
||||||
raise NotImplementedError()
|
return self.torrent_session.get_downloaded(self.identifier) == self.torrent_length
|
||||||
|
|
||||||
|
|
||||||
class TorrentManager(SourceManager):
|
class TorrentManager(SourceManager):
|
||||||
|
|
|
@ -34,6 +34,7 @@ class FileCommands(CommandTestCase):
|
||||||
async def test_download_torrent(self):
|
async def test_download_torrent(self):
|
||||||
await self.initialize_torrent()
|
await self.initialize_torrent()
|
||||||
await self.out(self.daemon.jsonrpc_get('torrent'))
|
await self.out(self.daemon.jsonrpc_get('torrent'))
|
||||||
|
self.assertItemCount(await self.daemon.jsonrpc_file_list(), 1)
|
||||||
|
|
||||||
async def create_streams_in_range(self, *args, **kwargs):
|
async def create_streams_in_range(self, *args, **kwargs):
|
||||||
self.stream_claim_ids = []
|
self.stream_claim_ids = []
|
||||||
|
|
Loading…
Add table
Reference in a new issue