From 53382b7e15c2f058108467a84bb06b904a3136cf Mon Sep 17 00:00:00 2001 From: Victor Shyba Date: Fri, 28 Feb 2020 14:58:59 -0300 Subject: [PATCH] wait started event --- lbry/extras/daemon/json_response_encoder.py | 2 +- lbry/torrent/session.py | 37 ++++++++++++++++--- lbry/torrent/torrent_manager.py | 7 +++- .../datanetwork/test_file_commands.py | 2 +- 4 files changed, 38 insertions(+), 10 deletions(-) diff --git a/lbry/extras/daemon/json_response_encoder.py b/lbry/extras/daemon/json_response_encoder.py index 6545a34d5..6bb1ed0b2 100644 --- a/lbry/extras/daemon/json_response_encoder.py +++ b/lbry/extras/daemon/json_response_encoder.py @@ -278,7 +278,7 @@ class JSONResponseEncoder(JSONEncoder): best_height = self.ledger.headers.height is_stream = hasattr(managed_stream, 'stream_hash') return { - 'streaming_url': managed_stream.stream_url if is_stream else None, + 'streaming_url': managed_stream.stream_url if is_stream else f'file://{managed_stream.full_path}', 'completed': managed_stream.completed, 'file_name': managed_stream.file_name if output_exists else None, 'download_directory': managed_stream.download_directory if output_exists else None, diff --git a/lbry/torrent/session.py b/lbry/torrent/session.py index 6f559386c..b4b47bcdb 100644 --- a/lbry/torrent/session.py +++ b/lbry/torrent/session.py @@ -58,25 +58,32 @@ class TorrentHandle: self._loop = loop self._executor = executor self._handle: libtorrent.torrent_handle = handle + self.started = asyncio.Event(loop=loop) self.finished = asyncio.Event(loop=loop) self.metadata_completed = asyncio.Event(loop=loop) self.size = 0 self.total_wanted_done = 0 self.name = '' self.tasks = [] - self.torrent_file: Optional[libtorrent.torrent_info] = None + self.torrent_file: Optional[libtorrent.file_storage] = None self._base_path = None + self._handle.set_sequential_download(1) @property def largest_file(self) -> Optional[str]: if not self.torrent_file: return None - largest_size, path = 0, None + index = self.largest_file_index + return os.path.join(self._base_path, self.torrent_file.at(index).path) + + @property + def largest_file_index(self): + largest_size, index = 0, 0 for file_num in range(self.torrent_file.num_files()): if self.torrent_file.file_size(file_num) > largest_size: largest_size = self.torrent_file.file_size(file_num) - path = self.torrent_file.at(file_num).path - return os.path.join(self._base_path, path) + index = file_num + return index def stop_tasks(self): while self.tasks: @@ -84,6 +91,8 @@ class TorrentHandle: def _show_status(self): # fixme: cleanup + if not self._handle.is_valid(): + return status = self._handle.status() if status.has_metadata: self.size = status.total_wanted @@ -94,6 +103,14 @@ class TorrentHandle: log.info("Metadata completed for btih:%s - %s", status.info_hash, self.name) self.torrent_file = self._handle.get_torrent_info().files() self._base_path = status.save_path + first_piece = self.torrent_file.at(self.largest_file_index).offset + self._handle.read_piece(first_piece) + if not self.started.is_set(): + if self._handle.have_piece(first_piece): + self.started.set() + else: + # prioritize it + self._handle.set_piece_deadline(first_piece, 100) if not status.is_seeding: log.debug('%.2f%% complete (down: %.1f kB/s up: %.1f kB/s peers: %d seeds: %d) %s - %s', status.progress * 100, status.download_rate / 1000, status.upload_rate / 1000, @@ -127,6 +144,7 @@ class TorrentSession: self._session: Optional[libtorrent.session] = None self._handles = {} self.tasks = [] + self.wait_start = True async def add_fake_torrent(self): tmpdir = mkdtemp() @@ -190,8 +208,9 @@ class TorrentSession: params = {'info_hash': binascii.unhexlify(btih.encode())} if download_directory: params['save_path'] = download_directory - handle = self._handles[btih] = TorrentHandle(self._loop, self._executor, self._session.add_torrent(params)) - handle._handle.force_dht_announce() + handle = self._session.add_torrent(params) + handle.force_dht_announce() + self._handles[btih] = TorrentHandle(self._loop, self._executor, handle) def full_path(self, btih): return self._handles[btih].largest_file @@ -202,6 +221,9 @@ class TorrentSession: ) self._handles[btih].tasks.append(self._loop.create_task(self._handles[btih].status_loop())) await self._handles[btih].metadata_completed.wait() + if self.wait_start: + # fixme: temporary until we add streaming support, otherwise playback fails! + await self._handles[btih].started.wait() def remove_torrent(self, btih, remove_files=False): if btih in self._handles: @@ -223,6 +245,9 @@ class TorrentSession: def get_downloaded(self, btih): return self._handles[btih].total_wanted_done + def is_completed(self, btih): + return self._handles[btih].finished.is_set() + def get_magnet_uri(btih): return f"magnet:?xt=urn:btih:{btih}" diff --git a/lbry/torrent/torrent_manager.py b/lbry/torrent/torrent_manager.py index ff38e51c6..a2e1edbe4 100644 --- a/lbry/torrent/torrent_manager.py +++ b/lbry/torrent/torrent_manager.py @@ -1,6 +1,7 @@ import asyncio import binascii import logging +import os import typing from typing import Optional from aiohttp.web import Request @@ -44,7 +45,9 @@ class TorrentSource(ManagedDownloadSource): @property def full_path(self) -> Optional[str]: - return self.torrent_session.full_path(self.identifier) + full_path = self.torrent_session.full_path(self.identifier) + self.download_directory = os.path.dirname(full_path) + return full_path async def start(self, timeout: Optional[float] = None, save_now: Optional[bool] = False): await self.torrent_session.add_torrent(self.identifier, self.download_directory) @@ -72,7 +75,7 @@ class TorrentSource(ManagedDownloadSource): @property def completed(self): - return self.torrent_session.get_downloaded(self.identifier) == self.torrent_length + return self.torrent_session.is_completed(self.identifier) class TorrentManager(SourceManager): diff --git a/tests/integration/datanetwork/test_file_commands.py b/tests/integration/datanetwork/test_file_commands.py index bbc0dca55..df46c6fab 100644 --- a/tests/integration/datanetwork/test_file_commands.py +++ b/tests/integration/datanetwork/test_file_commands.py @@ -5,7 +5,6 @@ from binascii import hexlify from lbry.schema import Claim from lbry.testcase import CommandTestCase from lbry.torrent.session import TorrentSession -from lbry.torrent.torrent_manager import TorrentManager from lbry.wallet import Transaction @@ -34,6 +33,7 @@ class FileCommands(CommandTestCase): await self.confirm_tx(tx.id) self.client_session = self.daemon.file_manager.source_managers['torrent'].torrent_session self.client_session._session.add_dht_node(('localhost', 4040)) + self.client_session.wait_start = False # fixme: this is super slow on tests return tx, btih async def test_download_torrent(self):