working file list after torrent get

This commit is contained in:
Victor Shyba 2020-02-10 23:15:18 -03:00
parent cf985486e5
commit 4d47873219
8 changed files with 77 additions and 32 deletions

View file

@ -7,6 +7,7 @@ from json import JSONEncoder
from google.protobuf.message import DecodeError from google.protobuf.message import DecodeError
from lbry.schema.claim import Claim from lbry.schema.claim import Claim
from lbry.torrent.torrent_manager import TorrentSource
from lbry.wallet import Wallet, Ledger, Account, Transaction, Output from lbry.wallet import Wallet, Ledger, Account, Transaction, Output
from lbry.wallet.bip32 import PubKey from lbry.wallet.bip32 import PubKey
from lbry.wallet.dewies import dewies_to_lbc from lbry.wallet.dewies import dewies_to_lbc
@ -128,6 +129,8 @@ class JSONResponseEncoder(JSONEncoder):
return self.encode_wallet(obj) return self.encode_wallet(obj)
if isinstance(obj, ManagedStream): if isinstance(obj, ManagedStream):
return self.encode_file(obj) return self.encode_file(obj)
if isinstance(obj, TorrentSource):
return self.encode_file(obj)
if isinstance(obj, Transaction): if isinstance(obj, Transaction):
return self.encode_transaction(obj) return self.encode_transaction(obj)
if isinstance(obj, Output): if isinstance(obj, Output):
@ -273,26 +276,27 @@ class JSONResponseEncoder(JSONEncoder):
output_exists = managed_stream.output_file_exists output_exists = managed_stream.output_file_exists
tx_height = managed_stream.stream_claim_info.height tx_height = managed_stream.stream_claim_info.height
best_height = self.ledger.headers.height best_height = self.ledger.headers.height
is_stream = hasattr(managed_stream, 'stream_hash')
return { return {
'streaming_url': managed_stream.stream_url, 'streaming_url': managed_stream.stream_url if is_stream else None,
'completed': managed_stream.completed, 'completed': managed_stream.completed,
'file_name': managed_stream.file_name if output_exists else None, 'file_name': managed_stream.file_name if output_exists else None,
'download_directory': managed_stream.download_directory if output_exists else None, 'download_directory': managed_stream.download_directory if output_exists else None,
'download_path': managed_stream.full_path if output_exists else None, 'download_path': managed_stream.full_path if output_exists else None,
'points_paid': 0.0, 'points_paid': 0.0,
'stopped': not managed_stream.running, 'stopped': not managed_stream.running,
'stream_hash': managed_stream.stream_hash, 'stream_hash': managed_stream.stream_hash if is_stream else None,
'stream_name': managed_stream.descriptor.stream_name, 'stream_name': managed_stream.descriptor.stream_name if is_stream else None,
'suggested_file_name': managed_stream.descriptor.suggested_file_name, 'suggested_file_name': managed_stream.descriptor.suggested_file_name if is_stream else None,
'sd_hash': managed_stream.descriptor.sd_hash, 'sd_hash': managed_stream.descriptor.sd_hash if is_stream else None,
'mime_type': managed_stream.mime_type, 'mime_type': managed_stream.mime_type if is_stream else None,
'key': managed_stream.descriptor.key, 'key': managed_stream.descriptor.key if is_stream else None,
'total_bytes_lower_bound': managed_stream.descriptor.lower_bound_decrypted_length(), 'total_bytes_lower_bound': managed_stream.descriptor.lower_bound_decrypted_length() if is_stream else None,
'total_bytes': managed_stream.descriptor.upper_bound_decrypted_length(), 'total_bytes': managed_stream.descriptor.upper_bound_decrypted_length() if is_stream else None,
'written_bytes': managed_stream.written_bytes, 'written_bytes': managed_stream.written_bytes if is_stream else None,
'blobs_completed': managed_stream.blobs_completed, 'blobs_completed': managed_stream.blobs_completed if is_stream else None,
'blobs_in_stream': managed_stream.blobs_in_stream, 'blobs_in_stream': managed_stream.blobs_in_stream if is_stream else None,
'blobs_remaining': managed_stream.blobs_remaining, 'blobs_remaining': managed_stream.blobs_remaining if is_stream else None,
'status': managed_stream.status, 'status': managed_stream.status,
'claim_id': managed_stream.claim_id, 'claim_id': managed_stream.claim_id,
'txid': managed_stream.txid, 'txid': managed_stream.txid,

View file

@ -753,7 +753,8 @@ class SQLiteStorage(SQLiteMixin):
return self.save_claims(to_save.values()) return self.save_claims(to_save.values())
@staticmethod @staticmethod
def _save_content_claim(transaction, claim_outpoint, stream_hash): def _save_content_claim(transaction, claim_outpoint, stream_hash=None, bt_infohash=None):
assert stream_hash or bt_infohash
# get the claim id and serialized metadata # get the claim id and serialized metadata
claim_info = transaction.execute( claim_info = transaction.execute(
"select claim_id, serialized_metadata from claim where claim_outpoint=?", (claim_outpoint,) "select claim_id, serialized_metadata from claim where claim_outpoint=?", (claim_outpoint,)
@ -801,6 +802,19 @@ class SQLiteStorage(SQLiteMixin):
if stream_hash in self.content_claim_callbacks: if stream_hash in self.content_claim_callbacks:
await self.content_claim_callbacks[stream_hash]() await self.content_claim_callbacks[stream_hash]()
async def save_torrent_content_claim(self, bt_infohash, claim_outpoint, length, name):
def _save_torrent(transaction):
transaction.execute(
"insert into torrent values (?, NULL, ?, ?)", (bt_infohash, length, name)
).fetchall()
transaction.execute(
"insert into content_claim values (NULL, ?, ?)", (bt_infohash, claim_outpoint)
).fetchall()
await self.db.run(_save_torrent)
# update corresponding ManagedEncryptedFileDownloader object
if bt_infohash in self.content_claim_callbacks:
await self.content_claim_callbacks[bt_infohash]()
async def get_content_claim(self, stream_hash: str, include_supports: typing.Optional[bool] = True) -> typing.Dict: async def get_content_claim(self, stream_hash: str, include_supports: typing.Optional[bool] = True) -> typing.Dict:
claims = await self.db.run(get_claims_from_stream_hashes, [stream_hash]) claims = await self.db.run(get_claims_from_stream_hashes, [stream_hash])
claim = None claim = None
@ -812,6 +826,10 @@ class SQLiteStorage(SQLiteMixin):
claim['effective_amount'] = calculate_effective_amount(claim['amount'], supports) claim['effective_amount'] = calculate_effective_amount(claim['amount'], supports)
return claim return claim
async def get_content_claim_for_torrent(self, bt_infohash):
claims = await self.db.run(get_claims_from_torrent_info_hashes, [bt_infohash])
return claims[bt_infohash].as_dict() if claims else None
# # # # # # # # # reflector functions # # # # # # # # # # # # # # # # # # reflector functions # # # # # # # # #
def update_reflected_stream(self, sd_hash, reflector_address, success=True): def update_reflected_stream(self, sd_hash, reflector_address, success=True):

View file

@ -16,7 +16,7 @@ if typing.TYPE_CHECKING:
from lbry.conf import Config from lbry.conf import Config
from lbry.extras.daemon.analytics import AnalyticsManager from lbry.extras.daemon.analytics import AnalyticsManager
from lbry.extras.daemon.storage import SQLiteStorage from lbry.extras.daemon.storage import SQLiteStorage
from lbry.wallet import WalletManager from lbry.wallet import WalletManager, Output
from lbry.extras.daemon.exchange_rate_manager import ExchangeRateManager from lbry.extras.daemon.exchange_rate_manager import ExchangeRateManager
log = logging.getLogger(__name__) log = logging.getLogger(__name__)
@ -206,6 +206,12 @@ class FileManager:
if not claim.stream.source.bt_infohash: if not claim.stream.source.bt_infohash:
await self.storage.save_content_claim(stream.stream_hash, outpoint) await self.storage.save_content_claim(stream.stream_hash, outpoint)
else:
await self.storage.save_torrent_content_claim(
stream.identifier, outpoint, stream.torrent_length, stream.torrent_name
)
claim_info = await self.storage.get_content_claim_for_torrent(stream.identifier)
stream.set_claim(claim_info, claim)
if save_file: if save_file:
await asyncio.wait_for(stream.save_file(), timeout - (self.loop.time() - before_download), await asyncio.wait_for(stream.save_file(), timeout - (self.loop.time() - before_download),
loop=self.loop) loop=self.loop)

View file

@ -69,14 +69,14 @@ class ManagedDownloadSource:
def stop_tasks(self): def stop_tasks(self):
raise NotImplementedError() raise NotImplementedError()
# def set_claim(self, claim_info: typing.Dict, claim: 'Claim'): def set_claim(self, claim_info: typing.Dict, claim: 'Claim'):
# self.stream_claim_info = StoredContentClaim( self.stream_claim_info = StoredContentClaim(
# f"{claim_info['txid']}:{claim_info['nout']}", claim_info['claim_id'], f"{claim_info['txid']}:{claim_info['nout']}", claim_info['claim_id'],
# claim_info['name'], claim_info['amount'], claim_info['height'], claim_info['name'], claim_info['amount'], claim_info['height'],
# binascii.hexlify(claim.to_bytes()).decode(), claim.signing_channel_id, claim_info['address'], binascii.hexlify(claim.to_bytes()).decode(), claim.signing_channel_id, claim_info['address'],
# claim_info['claim_sequence'], claim_info.get('channel_name') claim_info['claim_sequence'], claim_info.get('channel_name')
# ) )
#
# async def update_content_claim(self, claim_info: Optional[typing.Dict] = None): # async def update_content_claim(self, claim_info: Optional[typing.Dict] = None):
# if not claim_info: # if not claim_info:
# claim_info = await self.blob_manager.storage.get_content_claim(self.stream_hash) # claim_info = await self.blob_manager.storage.get_content_claim(self.stream_hash)

View file

@ -363,14 +363,6 @@ class ManagedStream(ManagedDownloadSource):
await self.blob_manager.storage.update_reflected_stream(self.sd_hash, f"{host}:{port}") await self.blob_manager.storage.update_reflected_stream(self.sd_hash, f"{host}:{port}")
return sent return sent
def set_claim(self, claim_info: typing.Dict, claim: 'Claim'):
self.stream_claim_info = StoredContentClaim(
f"{claim_info['txid']}:{claim_info['nout']}", claim_info['claim_id'],
claim_info['name'], claim_info['amount'], claim_info['height'],
binascii.hexlify(claim.to_bytes()).decode(), claim.signing_channel_id, claim_info['address'],
claim_info['claim_sequence'], claim_info.get('channel_name')
)
async def update_content_claim(self, claim_info: Optional[typing.Dict] = None): async def update_content_claim(self, claim_info: Optional[typing.Dict] = None):
if not claim_info: if not claim_info:
claim_info = await self.blob_manager.storage.get_content_claim(self.stream_hash) claim_info = await self.blob_manager.storage.get_content_claim(self.stream_hash)

View file

@ -57,12 +57,19 @@ class TorrentHandle:
self._handle: libtorrent.torrent_handle = handle self._handle: libtorrent.torrent_handle = handle
self.finished = asyncio.Event(loop=loop) self.finished = asyncio.Event(loop=loop)
self.metadata_completed = asyncio.Event(loop=loop) self.metadata_completed = asyncio.Event(loop=loop)
self.size = 0
self.total_wanted_done = 0
self.name = ''
def _show_status(self): def _show_status(self):
# fixme: cleanup # fixme: cleanup
status = self._handle.status() status = self._handle.status()
if status.has_metadata: if status.has_metadata:
self.metadata_completed.set() self.metadata_completed.set()
self._handle.pause()
self.size = status.total_wanted
self.total_wanted_done = status.total_wanted_done
self.name = status.name
# metadata: libtorrent.torrent_info = self._handle.get_torrent_info() # metadata: libtorrent.torrent_info = self._handle.get_torrent_info()
# print(metadata) # print(metadata)
# print(metadata.files()) # print(metadata.files())
@ -205,6 +212,15 @@ class TorrentSession:
handle = self._handles[btih] handle = self._handles[btih]
handle._handle.move_storage(download_directory) handle._handle.move_storage(download_directory)
def get_size(self, btih):
return self._handles[btih].size
def get_name(self, btih):
return self._handles[btih].name
def get_downloaded(self, btih):
return self._handles[btih].total_wanted_done
def get_magnet_uri(btih): def get_magnet_uri(btih):
return f"magnet:?xt=urn:btih:{btih}" return f"magnet:?xt=urn:btih:{btih}"

View file

@ -51,12 +51,20 @@ class TorrentSource(ManagedDownloadSource):
async def save_file(self, file_name: Optional[str] = None, download_directory: Optional[str] = None): async def save_file(self, file_name: Optional[str] = None, download_directory: Optional[str] = None):
await self.torrent_session.save_file(self.identifier, download_directory) await self.torrent_session.save_file(self.identifier, download_directory)
@property
def torrent_length(self):
return self.torrent_session.get_size(self.identifier)
@property
def torrent_name(self):
return self.torrent_session.get_name(self.identifier)
def stop_tasks(self): def stop_tasks(self):
pass pass
@property @property
def completed(self): def completed(self):
raise NotImplementedError() return self.torrent_session.get_downloaded(self.identifier) == self.torrent_length
class TorrentManager(SourceManager): class TorrentManager(SourceManager):

View file

@ -34,6 +34,7 @@ class FileCommands(CommandTestCase):
async def test_download_torrent(self): async def test_download_torrent(self):
await self.initialize_torrent() await self.initialize_torrent()
await self.out(self.daemon.jsonrpc_get('torrent')) await self.out(self.daemon.jsonrpc_get('torrent'))
self.assertItemCount(await self.daemon.jsonrpc_file_list(), 1)
async def create_streams_in_range(self, *args, **kwargs): async def create_streams_in_range(self, *args, **kwargs):
self.stream_claim_ids = [] self.stream_claim_ids = []