wait started event

This commit is contained in:
Victor Shyba 2020-02-28 14:58:59 -03:00
parent 6ad0242617
commit 53382b7e15
4 changed files with 38 additions and 10 deletions

View file

@ -278,7 +278,7 @@ class JSONResponseEncoder(JSONEncoder):
best_height = self.ledger.headers.height best_height = self.ledger.headers.height
is_stream = hasattr(managed_stream, 'stream_hash') is_stream = hasattr(managed_stream, 'stream_hash')
return { return {
'streaming_url': managed_stream.stream_url if is_stream else None, 'streaming_url': managed_stream.stream_url if is_stream else f'file://{managed_stream.full_path}',
'completed': managed_stream.completed, 'completed': managed_stream.completed,
'file_name': managed_stream.file_name if output_exists else None, 'file_name': managed_stream.file_name if output_exists else None,
'download_directory': managed_stream.download_directory if output_exists else None, 'download_directory': managed_stream.download_directory if output_exists else None,

View file

@ -58,25 +58,32 @@ class TorrentHandle:
self._loop = loop self._loop = loop
self._executor = executor self._executor = executor
self._handle: libtorrent.torrent_handle = handle self._handle: libtorrent.torrent_handle = handle
self.started = asyncio.Event(loop=loop)
self.finished = asyncio.Event(loop=loop) self.finished = asyncio.Event(loop=loop)
self.metadata_completed = asyncio.Event(loop=loop) self.metadata_completed = asyncio.Event(loop=loop)
self.size = 0 self.size = 0
self.total_wanted_done = 0 self.total_wanted_done = 0
self.name = '' self.name = ''
self.tasks = [] self.tasks = []
self.torrent_file: Optional[libtorrent.torrent_info] = None self.torrent_file: Optional[libtorrent.file_storage] = None
self._base_path = None self._base_path = None
self._handle.set_sequential_download(1)
@property @property
def largest_file(self) -> Optional[str]: def largest_file(self) -> Optional[str]:
if not self.torrent_file: if not self.torrent_file:
return None return None
largest_size, path = 0, None index = self.largest_file_index
return os.path.join(self._base_path, self.torrent_file.at(index).path)
@property
def largest_file_index(self):
largest_size, index = 0, 0
for file_num in range(self.torrent_file.num_files()): for file_num in range(self.torrent_file.num_files()):
if self.torrent_file.file_size(file_num) > largest_size: if self.torrent_file.file_size(file_num) > largest_size:
largest_size = self.torrent_file.file_size(file_num) largest_size = self.torrent_file.file_size(file_num)
path = self.torrent_file.at(file_num).path index = file_num
return os.path.join(self._base_path, path) return index
def stop_tasks(self): def stop_tasks(self):
while self.tasks: while self.tasks:
@ -84,6 +91,8 @@ class TorrentHandle:
def _show_status(self): def _show_status(self):
# fixme: cleanup # fixme: cleanup
if not self._handle.is_valid():
return
status = self._handle.status() status = self._handle.status()
if status.has_metadata: if status.has_metadata:
self.size = status.total_wanted self.size = status.total_wanted
@ -94,6 +103,14 @@ class TorrentHandle:
log.info("Metadata completed for btih:%s - %s", status.info_hash, self.name) log.info("Metadata completed for btih:%s - %s", status.info_hash, self.name)
self.torrent_file = self._handle.get_torrent_info().files() self.torrent_file = self._handle.get_torrent_info().files()
self._base_path = status.save_path self._base_path = status.save_path
first_piece = self.torrent_file.at(self.largest_file_index).offset
self._handle.read_piece(first_piece)
if not self.started.is_set():
if self._handle.have_piece(first_piece):
self.started.set()
else:
# prioritize it
self._handle.set_piece_deadline(first_piece, 100)
if not status.is_seeding: if not status.is_seeding:
log.debug('%.2f%% complete (down: %.1f kB/s up: %.1f kB/s peers: %d seeds: %d) %s - %s', log.debug('%.2f%% complete (down: %.1f kB/s up: %.1f kB/s peers: %d seeds: %d) %s - %s',
status.progress * 100, status.download_rate / 1000, status.upload_rate / 1000, status.progress * 100, status.download_rate / 1000, status.upload_rate / 1000,
@ -127,6 +144,7 @@ class TorrentSession:
self._session: Optional[libtorrent.session] = None self._session: Optional[libtorrent.session] = None
self._handles = {} self._handles = {}
self.tasks = [] self.tasks = []
self.wait_start = True
async def add_fake_torrent(self): async def add_fake_torrent(self):
tmpdir = mkdtemp() tmpdir = mkdtemp()
@ -190,8 +208,9 @@ class TorrentSession:
params = {'info_hash': binascii.unhexlify(btih.encode())} params = {'info_hash': binascii.unhexlify(btih.encode())}
if download_directory: if download_directory:
params['save_path'] = download_directory params['save_path'] = download_directory
handle = self._handles[btih] = TorrentHandle(self._loop, self._executor, self._session.add_torrent(params)) handle = self._session.add_torrent(params)
handle._handle.force_dht_announce() handle.force_dht_announce()
self._handles[btih] = TorrentHandle(self._loop, self._executor, handle)
def full_path(self, btih): def full_path(self, btih):
return self._handles[btih].largest_file return self._handles[btih].largest_file
@ -202,6 +221,9 @@ class TorrentSession:
) )
self._handles[btih].tasks.append(self._loop.create_task(self._handles[btih].status_loop())) self._handles[btih].tasks.append(self._loop.create_task(self._handles[btih].status_loop()))
await self._handles[btih].metadata_completed.wait() await self._handles[btih].metadata_completed.wait()
if self.wait_start:
# fixme: temporary until we add streaming support, otherwise playback fails!
await self._handles[btih].started.wait()
def remove_torrent(self, btih, remove_files=False): def remove_torrent(self, btih, remove_files=False):
if btih in self._handles: if btih in self._handles:
@ -223,6 +245,9 @@ class TorrentSession:
def get_downloaded(self, btih): def get_downloaded(self, btih):
return self._handles[btih].total_wanted_done return self._handles[btih].total_wanted_done
def is_completed(self, btih):
return self._handles[btih].finished.is_set()
def get_magnet_uri(btih): def get_magnet_uri(btih):
return f"magnet:?xt=urn:btih:{btih}" return f"magnet:?xt=urn:btih:{btih}"

View file

@ -1,6 +1,7 @@
import asyncio import asyncio
import binascii import binascii
import logging import logging
import os
import typing import typing
from typing import Optional from typing import Optional
from aiohttp.web import Request from aiohttp.web import Request
@ -44,7 +45,9 @@ class TorrentSource(ManagedDownloadSource):
@property @property
def full_path(self) -> Optional[str]: def full_path(self) -> Optional[str]:
return self.torrent_session.full_path(self.identifier) full_path = self.torrent_session.full_path(self.identifier)
self.download_directory = os.path.dirname(full_path)
return full_path
async def start(self, timeout: Optional[float] = None, save_now: Optional[bool] = False): async def start(self, timeout: Optional[float] = None, save_now: Optional[bool] = False):
await self.torrent_session.add_torrent(self.identifier, self.download_directory) await self.torrent_session.add_torrent(self.identifier, self.download_directory)
@ -72,7 +75,7 @@ class TorrentSource(ManagedDownloadSource):
@property @property
def completed(self): def completed(self):
return self.torrent_session.get_downloaded(self.identifier) == self.torrent_length return self.torrent_session.is_completed(self.identifier)
class TorrentManager(SourceManager): class TorrentManager(SourceManager):

View file

@ -5,7 +5,6 @@ from binascii import hexlify
from lbry.schema import Claim from lbry.schema import Claim
from lbry.testcase import CommandTestCase from lbry.testcase import CommandTestCase
from lbry.torrent.session import TorrentSession from lbry.torrent.session import TorrentSession
from lbry.torrent.torrent_manager import TorrentManager
from lbry.wallet import Transaction from lbry.wallet import Transaction
@ -34,6 +33,7 @@ class FileCommands(CommandTestCase):
await self.confirm_tx(tx.id) await self.confirm_tx(tx.id)
self.client_session = self.daemon.file_manager.source_managers['torrent'].torrent_session self.client_session = self.daemon.file_manager.source_managers['torrent'].torrent_session
self.client_session._session.add_dht_node(('localhost', 4040)) self.client_session._session.add_dht_node(('localhost', 4040))
self.client_session.wait_start = False # fixme: this is super slow on tests
return tx, btih return tx, btih
async def test_download_torrent(self): async def test_download_torrent(self):