torrent test and misc fixes

This commit is contained in:
Victor Shyba 2020-02-10 21:50:16 -03:00
parent b930c3fc93
commit cf985486e5
6 changed files with 62 additions and 9 deletions

View file

@ -1,6 +1,7 @@
.PHONY: install tools lint test idea .PHONY: install tools lint test idea
install: install:
pip install -e git+https://github.com/shyba/libtorrent.git#egg=python-libtorrent
CFLAGS="-DSQLITE_MAX_VARIABLE_NUMBER=2500000" pip install -U https://github.com/rogerbinns/apsw/releases/download/3.30.1-r1/apsw-3.30.1-r1.zip \ CFLAGS="-DSQLITE_MAX_VARIABLE_NUMBER=2500000" pip install -U https://github.com/rogerbinns/apsw/releases/download/3.30.1-r1/apsw-3.30.1-r1.zip \
--global-option=fetch \ --global-option=fetch \
--global-option=--version --global-option=3.30.1 --global-option=--all \ --global-option=--version --global-option=3.30.1 --global-option=--all \

View file

@ -66,6 +66,7 @@ class FileManager:
start_time = self.loop.time() start_time = self.loop.time()
resolved_time = None resolved_time = None
stream = None stream = None
claim = None
error = None error = None
outpoint = None outpoint = None
if save_file is None: if save_file is None:
@ -203,6 +204,7 @@ class FileManager:
source_manager.add(stream) source_manager.add(stream)
if not claim.stream.source.bt_infohash:
await self.storage.save_content_claim(stream.stream_hash, outpoint) await self.storage.save_content_claim(stream.stream_hash, outpoint)
if save_file: if save_file:
await asyncio.wait_for(stream.save_file(), timeout - (self.loop.time() - before_download), await asyncio.wait_for(stream.save_file(), timeout - (self.loop.time() - before_download),
@ -226,7 +228,10 @@ class FileManager:
if payment is not None: if payment is not None:
# payment is set to None after broadcasting, if we're here an exception probably happened # payment is set to None after broadcasting, if we're here an exception probably happened
await self.wallet_manager.ledger.release_tx(payment) await self.wallet_manager.ledger.release_tx(payment)
if self.analytics_manager and (error or (stream and (stream.downloader.time_to_descriptor or if self.analytics_manager and claim and claim.stream.source.bt_infohash:
# TODO: analytics for torrents
pass
elif self.analytics_manager and (error or (stream and (stream.downloader.time_to_descriptor or
stream.downloader.time_to_first_bytes))): stream.downloader.time_to_first_bytes))):
server = self.wallet_manager.ledger.network.client.server server = self.wallet_manager.ledger.network.client.server
self.loop.create_task( self.loop.create_task(

View file

@ -37,6 +37,7 @@ NOTIFICATION_MASKS = [
DEFAULT_FLAGS = ( # fixme: somehow the logic here is inverted? DEFAULT_FLAGS = ( # fixme: somehow the logic here is inverted?
libtorrent.add_torrent_params_flags_t.flag_auto_managed libtorrent.add_torrent_params_flags_t.flag_auto_managed
| libtorrent.add_torrent_params_flags_t.flag_paused
| libtorrent.add_torrent_params_flags_t.flag_duplicate_is_error | libtorrent.add_torrent_params_flags_t.flag_duplicate_is_error
| libtorrent.add_torrent_params_flags_t.flag_update_subscribe | libtorrent.add_torrent_params_flags_t.flag_update_subscribe
) )
@ -76,9 +77,7 @@ class TorrentHandle:
async def status_loop(self): async def status_loop(self):
while True: while True:
await self._loop.run_in_executor( self._show_status()
self._executor, self._show_status
)
if self.finished.is_set(): if self.finished.is_set():
break break
await asyncio.sleep(0.1, loop=self._loop) await asyncio.sleep(0.1, loop=self._loop)
@ -100,6 +99,7 @@ class TorrentSession:
self._executor = executor self._executor = executor
self._session: Optional[libtorrent.session] = None self._session: Optional[libtorrent.session] = None
self._handles = {} self._handles = {}
self.tasks = []
async def add_fake_torrent(self): async def add_fake_torrent(self):
dir = mkdtemp() dir = mkdtemp()
@ -136,7 +136,18 @@ class TorrentSession:
self._session = await self._loop.run_in_executor( self._session = await self._loop.run_in_executor(
self._executor, libtorrent.session, settings # pylint: disable=c-extension-no-member self._executor, libtorrent.session, settings # pylint: disable=c-extension-no-member
) )
self._loop.create_task(self.process_alerts()) self.tasks.append(self._loop.create_task(self.process_alerts()))
def stop(self):
while self.tasks:
self.tasks.pop().cancel()
self._session.save_state()
self._session.pause()
self._session.stop_dht()
self._session.stop_lsd()
self._session.stop_natpmp()
self._session.stop_upnp()
self._session = None
def _pop_alerts(self): def _pop_alerts(self):
for alert in self._session.pop_alerts(): for alert in self._session.pop_alerts():
@ -167,7 +178,7 @@ class TorrentSession:
flags = DEFAULT_FLAGS flags = DEFAULT_FLAGS
print(bin(flags)) print(bin(flags))
flags ^= libtorrent.add_torrent_params_flags_t.flag_paused flags ^= libtorrent.add_torrent_params_flags_t.flag_paused
# flags ^= libtorrent.add_torrent_params_flags_t.flag_auto_managed flags ^= libtorrent.add_torrent_params_flags_t.flag_auto_managed
# flags ^= libtorrent.add_torrent_params_flags_t.flag_stop_when_ready # flags ^= libtorrent.add_torrent_params_flags_t.flag_stop_when_ready
print(bin(flags)) print(bin(flags))
# params['flags'] = flags # params['flags'] = flags
@ -189,6 +200,11 @@ class TorrentSession:
self._session.remove_torrent(handle, 1 if remove_files else 0) self._session.remove_torrent(handle, 1 if remove_files else 0)
self._handles.pop(btih) self._handles.pop(btih)
async def save_file(self, btih, download_directory):
return
handle = self._handles[btih]
handle._handle.move_storage(download_directory)
def get_magnet_uri(btih): def get_magnet_uri(btih):
return f"magnet:?xt=urn:btih:{btih}" return f"magnet:?xt=urn:btih:{btih}"

View file

@ -49,15 +49,16 @@ class TorrentSource(ManagedDownloadSource):
await self.torrent_session.remove_torrent(self.identifier) await self.torrent_session.remove_torrent(self.identifier)
async def save_file(self, file_name: Optional[str] = None, download_directory: Optional[str] = None): async def save_file(self, file_name: Optional[str] = None, download_directory: Optional[str] = None):
raise NotImplementedError() await self.torrent_session.save_file(self.identifier, download_directory)
def stop_tasks(self): def stop_tasks(self):
raise NotImplementedError() pass
@property @property
def completed(self): def completed(self):
raise NotImplementedError() raise NotImplementedError()
class TorrentManager(SourceManager): class TorrentManager(SourceManager):
_sources: typing.Dict[str, ManagedDownloadSource] _sources: typing.Dict[str, ManagedDownloadSource]

View file

@ -2,10 +2,38 @@ import asyncio
import os import os
from binascii import hexlify from binascii import hexlify
from lbry.schema import Claim
from lbry.testcase import CommandTestCase from lbry.testcase import CommandTestCase
from lbry.torrent.session import TorrentSession
from lbry.torrent.torrent_manager import TorrentManager
from lbry.wallet import Transaction
class FileCommands(CommandTestCase): class FileCommands(CommandTestCase):
async def initialize_torrent(self):
self.seeder_session = TorrentSession(self.loop, None)
self.addCleanup(self.seeder_session.stop)
await self.seeder_session.bind('localhost', 4040)
self.btih = await self.seeder_session.add_fake_torrent()
address = await self.account.receiving.get_or_create_usable_address()
claim = Claim()
claim.stream.update(bt_infohash=self.btih)
tx = await Transaction.claim_create(
'torrent', claim, 1, address, [self.account], self.account)
await tx.sign([self.account])
await self.broadcast(tx)
await self.confirm_tx(tx.id)
client_session = TorrentSession(self.loop, None)
self.daemon.file_manager.source_managers['torrent'] = TorrentManager(
self.loop, self.daemon.conf, client_session, self.daemon.storage, self.daemon.analytics_manager
)
await self.daemon.file_manager.source_managers['torrent'].start()
await client_session.bind('localhost', 4041)
client_session._session.add_dht_node(('localhost', 4040))
async def test_download_torrent(self):
await self.initialize_torrent()
await self.out(self.daemon.jsonrpc_get('torrent'))
async def create_streams_in_range(self, *args, **kwargs): async def create_streams_in_range(self, *args, **kwargs):
self.stream_claim_ids = [] self.stream_claim_ids = []

View file

@ -7,6 +7,8 @@ changedir = {toxinidir}/tests
setenv = setenv =
HOME=/tmp HOME=/tmp
commands = commands =
pip install -U pip
pip install -e 'git+https://github.com/shyba/libtorrent.git#egg=python-libtorrent'
pip install https://github.com/rogerbinns/apsw/releases/download/3.30.1-r1/apsw-3.30.1-r1.zip \ pip install https://github.com/rogerbinns/apsw/releases/download/3.30.1-r1/apsw-3.30.1-r1.zip \
--global-option=fetch \ --global-option=fetch \
--global-option=--version --global-option=3.30.1 --global-option=--all \ --global-option=--version --global-option=3.30.1 --global-option=--all \