From cf985486e52ffde83ebeeaf04c0f13414bca5ec5 Mon Sep 17 00:00:00 2001 From: Victor Shyba Date: Mon, 10 Feb 2020 21:50:16 -0300 Subject: [PATCH] torrent test and misc fixes --- Makefile | 1 + lbry/file/file_manager.py | 9 ++++-- lbry/torrent/session.py | 26 +++++++++++++---- lbry/torrent/torrent_manager.py | 5 ++-- .../datanetwork/test_file_commands.py | 28 +++++++++++++++++++ tox.ini | 2 ++ 6 files changed, 62 insertions(+), 9 deletions(-) diff --git a/Makefile b/Makefile index 23b3f84e8..ab1f20ac4 100644 --- a/Makefile +++ b/Makefile @@ -1,6 +1,7 @@ .PHONY: install tools lint test idea install: + pip install -e git+https://github.com/shyba/libtorrent.git#egg=python-libtorrent CFLAGS="-DSQLITE_MAX_VARIABLE_NUMBER=2500000" pip install -U https://github.com/rogerbinns/apsw/releases/download/3.30.1-r1/apsw-3.30.1-r1.zip \ --global-option=fetch \ --global-option=--version --global-option=3.30.1 --global-option=--all \ diff --git a/lbry/file/file_manager.py b/lbry/file/file_manager.py index 1ab72f7c1..926296f6e 100644 --- a/lbry/file/file_manager.py +++ b/lbry/file/file_manager.py @@ -66,6 +66,7 @@ class FileManager: start_time = self.loop.time() resolved_time = None stream = None + claim = None error = None outpoint = None if save_file is None: @@ -203,7 +204,8 @@ class FileManager: source_manager.add(stream) - await self.storage.save_content_claim(stream.stream_hash, outpoint) + if not claim.stream.source.bt_infohash: + await self.storage.save_content_claim(stream.stream_hash, outpoint) if save_file: await asyncio.wait_for(stream.save_file(), timeout - (self.loop.time() - before_download), loop=self.loop) @@ -226,7 +228,10 @@ class FileManager: if payment is not None: # payment is set to None after broadcasting, if we're here an exception probably happened await self.wallet_manager.ledger.release_tx(payment) - if self.analytics_manager and (error or (stream and (stream.downloader.time_to_descriptor or + if self.analytics_manager and claim and claim.stream.source.bt_infohash: + # TODO: analytics for torrents + pass + elif self.analytics_manager and (error or (stream and (stream.downloader.time_to_descriptor or stream.downloader.time_to_first_bytes))): server = self.wallet_manager.ledger.network.client.server self.loop.create_task( diff --git a/lbry/torrent/session.py b/lbry/torrent/session.py index af2663e64..2e4c6a72e 100644 --- a/lbry/torrent/session.py +++ b/lbry/torrent/session.py @@ -37,6 +37,7 @@ NOTIFICATION_MASKS = [ DEFAULT_FLAGS = ( # fixme: somehow the logic here is inverted? libtorrent.add_torrent_params_flags_t.flag_auto_managed + | libtorrent.add_torrent_params_flags_t.flag_paused | libtorrent.add_torrent_params_flags_t.flag_duplicate_is_error | libtorrent.add_torrent_params_flags_t.flag_update_subscribe ) @@ -76,9 +77,7 @@ class TorrentHandle: async def status_loop(self): while True: - await self._loop.run_in_executor( - self._executor, self._show_status - ) + self._show_status() if self.finished.is_set(): break await asyncio.sleep(0.1, loop=self._loop) @@ -100,6 +99,7 @@ class TorrentSession: self._executor = executor self._session: Optional[libtorrent.session] = None self._handles = {} + self.tasks = [] async def add_fake_torrent(self): dir = mkdtemp() @@ -136,7 +136,18 @@ class TorrentSession: self._session = await self._loop.run_in_executor( self._executor, libtorrent.session, settings # pylint: disable=c-extension-no-member ) - self._loop.create_task(self.process_alerts()) + self.tasks.append(self._loop.create_task(self.process_alerts())) + + def stop(self): + while self.tasks: + self.tasks.pop().cancel() + self._session.save_state() + self._session.pause() + self._session.stop_dht() + self._session.stop_lsd() + self._session.stop_natpmp() + self._session.stop_upnp() + self._session = None def _pop_alerts(self): for alert in self._session.pop_alerts(): @@ -167,7 +178,7 @@ class TorrentSession: flags = DEFAULT_FLAGS print(bin(flags)) flags ^= libtorrent.add_torrent_params_flags_t.flag_paused - # flags ^= libtorrent.add_torrent_params_flags_t.flag_auto_managed + flags ^= libtorrent.add_torrent_params_flags_t.flag_auto_managed # flags ^= libtorrent.add_torrent_params_flags_t.flag_stop_when_ready print(bin(flags)) # params['flags'] = flags @@ -189,6 +200,11 @@ class TorrentSession: self._session.remove_torrent(handle, 1 if remove_files else 0) self._handles.pop(btih) + async def save_file(self, btih, download_directory): + return + handle = self._handles[btih] + handle._handle.move_storage(download_directory) + def get_magnet_uri(btih): return f"magnet:?xt=urn:btih:{btih}" diff --git a/lbry/torrent/torrent_manager.py b/lbry/torrent/torrent_manager.py index 24ed651be..118c5d897 100644 --- a/lbry/torrent/torrent_manager.py +++ b/lbry/torrent/torrent_manager.py @@ -49,15 +49,16 @@ class TorrentSource(ManagedDownloadSource): await self.torrent_session.remove_torrent(self.identifier) async def save_file(self, file_name: Optional[str] = None, download_directory: Optional[str] = None): - raise NotImplementedError() + await self.torrent_session.save_file(self.identifier, download_directory) def stop_tasks(self): - raise NotImplementedError() + pass @property def completed(self): raise NotImplementedError() + class TorrentManager(SourceManager): _sources: typing.Dict[str, ManagedDownloadSource] diff --git a/tests/integration/datanetwork/test_file_commands.py b/tests/integration/datanetwork/test_file_commands.py index 542be228c..0d8ac3a41 100644 --- a/tests/integration/datanetwork/test_file_commands.py +++ b/tests/integration/datanetwork/test_file_commands.py @@ -2,10 +2,38 @@ import asyncio import os from binascii import hexlify +from lbry.schema import Claim from lbry.testcase import CommandTestCase +from lbry.torrent.session import TorrentSession +from lbry.torrent.torrent_manager import TorrentManager +from lbry.wallet import Transaction class FileCommands(CommandTestCase): + async def initialize_torrent(self): + self.seeder_session = TorrentSession(self.loop, None) + self.addCleanup(self.seeder_session.stop) + await self.seeder_session.bind('localhost', 4040) + self.btih = await self.seeder_session.add_fake_torrent() + address = await self.account.receiving.get_or_create_usable_address() + claim = Claim() + claim.stream.update(bt_infohash=self.btih) + tx = await Transaction.claim_create( + 'torrent', claim, 1, address, [self.account], self.account) + await tx.sign([self.account]) + await self.broadcast(tx) + await self.confirm_tx(tx.id) + client_session = TorrentSession(self.loop, None) + self.daemon.file_manager.source_managers['torrent'] = TorrentManager( + self.loop, self.daemon.conf, client_session, self.daemon.storage, self.daemon.analytics_manager + ) + await self.daemon.file_manager.source_managers['torrent'].start() + await client_session.bind('localhost', 4041) + client_session._session.add_dht_node(('localhost', 4040)) + + async def test_download_torrent(self): + await self.initialize_torrent() + await self.out(self.daemon.jsonrpc_get('torrent')) async def create_streams_in_range(self, *args, **kwargs): self.stream_claim_ids = [] diff --git a/tox.ini b/tox.ini index 3b446a241..697c76ed2 100644 --- a/tox.ini +++ b/tox.ini @@ -7,6 +7,8 @@ changedir = {toxinidir}/tests setenv = HOME=/tmp commands = + pip install -U pip + pip install -e 'git+https://github.com/shyba/libtorrent.git#egg=python-libtorrent' pip install https://github.com/rogerbinns/apsw/releases/download/3.30.1-r1/apsw-3.30.1-r1.zip \ --global-option=fetch \ --global-option=--version --global-option=3.30.1 --global-option=--all \