add tests for streaming, fix bugs
This commit is contained in:
parent
7746ded9b6
commit
37adc59b37
3 changed files with 31 additions and 6 deletions
|
@ -26,7 +26,7 @@ class TorrentHandle:
|
||||||
self.started = asyncio.Event(loop=loop)
|
self.started = asyncio.Event(loop=loop)
|
||||||
self.finished = asyncio.Event(loop=loop)
|
self.finished = asyncio.Event(loop=loop)
|
||||||
self.metadata_completed = asyncio.Event(loop=loop)
|
self.metadata_completed = asyncio.Event(loop=loop)
|
||||||
self.size = 0
|
self.size = handle.status().total_wanted
|
||||||
self.total_wanted_done = 0
|
self.total_wanted_done = 0
|
||||||
self.name = ''
|
self.name = ''
|
||||||
self.tasks = []
|
self.tasks = []
|
||||||
|
@ -70,10 +70,10 @@ class TorrentHandle:
|
||||||
|
|
||||||
async def stream_range_as_completed(self, file_index, start, end):
|
async def stream_range_as_completed(self, file_index, start, end):
|
||||||
first_piece, final_piece = self.byte_range_to_piece_range(file_index, start, end)
|
first_piece, final_piece = self.byte_range_to_piece_range(file_index, start, end)
|
||||||
start_piece_offset = final_piece.start
|
start_piece_offset = first_piece.start
|
||||||
piece_size = self._torrent_info.piece_length()
|
piece_size = self._torrent_info.piece_length()
|
||||||
log.info("Streaming torrent from piece %d to %d (bytes: %d -> %d): %s",
|
log.info("Streaming torrent from piece %d to %d (bytes: %d -> %d, piece size: %d): %s",
|
||||||
first_piece.piece, final_piece.piece, start, end, self.name)
|
first_piece.piece, final_piece.piece, start, end, piece_size, self.name)
|
||||||
self.prioritize(file_index, start, end)
|
self.prioritize(file_index, start, end)
|
||||||
await self.resume()
|
await self.resume()
|
||||||
for piece_index in range(first_piece.piece, final_piece.piece + 1):
|
for piece_index in range(first_piece.piece, final_piece.piece + 1):
|
||||||
|
|
|
@ -113,10 +113,11 @@ class TorrentSource(ManagedDownloadSource):
|
||||||
with open(self.full_path, 'rb') as infile:
|
with open(self.full_path, 'rb') as infile:
|
||||||
infile.seek(start)
|
infile.seek(start)
|
||||||
async for read_size in self.torrent_session.stream_largest_file(self.identifier, start, end):
|
async for read_size in self.torrent_session.stream_largest_file(self.identifier, start, end):
|
||||||
if start + read_size < end:
|
if infile.tell() + read_size < end:
|
||||||
await response.write(infile.read(read_size))
|
await response.write(infile.read(read_size))
|
||||||
else:
|
else:
|
||||||
await response.write_eof(infile.read(end - infile.tell()))
|
await response.write_eof(infile.read(end - infile.tell() + 1))
|
||||||
|
return response
|
||||||
|
|
||||||
def _prepare_range_response_headers(self, get_range: str) -> typing.Tuple[typing.Dict[str, str], int, int]:
|
def _prepare_range_response_headers(self, get_range: str) -> typing.Tuple[typing.Dict[str, str], int, int]:
|
||||||
if '=' in get_range:
|
if '=' in get_range:
|
||||||
|
|
|
@ -4,11 +4,14 @@ import asyncio
|
||||||
import os
|
import os
|
||||||
from binascii import hexlify
|
from binascii import hexlify
|
||||||
|
|
||||||
|
import aiohttp.web
|
||||||
|
|
||||||
from lbry.schema import Claim
|
from lbry.schema import Claim
|
||||||
from lbry.stream.background_downloader import BackgroundDownloader
|
from lbry.stream.background_downloader import BackgroundDownloader
|
||||||
from lbry.stream.descriptor import StreamDescriptor
|
from lbry.stream.descriptor import StreamDescriptor
|
||||||
from lbry.testcase import CommandTestCase
|
from lbry.testcase import CommandTestCase
|
||||||
from lbry.extras.daemon.components import TorrentSession, BACKGROUND_DOWNLOADER_COMPONENT
|
from lbry.extras.daemon.components import TorrentSession, BACKGROUND_DOWNLOADER_COMPONENT
|
||||||
|
from lbry.utils import aiohttp_request
|
||||||
from lbry.wallet import Transaction
|
from lbry.wallet import Transaction
|
||||||
from lbry.torrent.tracker import UDPTrackerServerProtocol
|
from lbry.torrent.tracker import UDPTrackerServerProtocol
|
||||||
|
|
||||||
|
@ -51,6 +54,23 @@ class FileCommands(CommandTestCase):
|
||||||
self.addCleanup(task.cancel)
|
self.addCleanup(task.cancel)
|
||||||
return tx, btih
|
return tx, btih
|
||||||
|
|
||||||
|
async def assert_torrent_streaming_works(self, btih):
|
||||||
|
url = f'http://{self.daemon.conf.streaming_host}:{self.daemon.conf.streaming_port}/get/torrent'
|
||||||
|
if self.daemon.streaming_runner.server is None:
|
||||||
|
await self.daemon.streaming_runner.setup()
|
||||||
|
site = aiohttp.web.TCPSite(self.daemon.streaming_runner, self.daemon.conf.streaming_host,
|
||||||
|
self.daemon.conf.streaming_port)
|
||||||
|
await site.start()
|
||||||
|
async with aiohttp_request('get', url) as req:
|
||||||
|
self.assertEqual(req.headers.get('Content-Type'), 'application/octet-stream')
|
||||||
|
content_range = req.headers.get('Content-Range')
|
||||||
|
content_length = int(req.headers.get('Content-Length'))
|
||||||
|
streamed_bytes = await req.content.read()
|
||||||
|
expected_size = self.seeder_session.get_size(btih)
|
||||||
|
self.assertEqual(expected_size, len(streamed_bytes))
|
||||||
|
self.assertEqual(content_length, len(streamed_bytes))
|
||||||
|
self.assertEqual(f"bytes 0-{expected_size - 1}/{expected_size}", content_range)
|
||||||
|
|
||||||
@skipIf(TorrentSession is None, "libtorrent not installed")
|
@skipIf(TorrentSession is None, "libtorrent not installed")
|
||||||
async def test_download_torrent(self):
|
async def test_download_torrent(self):
|
||||||
tx, btih = await self.initialize_torrent()
|
tx, btih = await self.initialize_torrent()
|
||||||
|
@ -61,6 +81,10 @@ class FileCommands(CommandTestCase):
|
||||||
self.assertItemCount(await self.daemon.jsonrpc_file_list(), 1)
|
self.assertItemCount(await self.daemon.jsonrpc_file_list(), 1)
|
||||||
self.assertEqual((await self.daemon.jsonrpc_file_list())['items'][0].identifier, btih)
|
self.assertEqual((await self.daemon.jsonrpc_file_list())['items'][0].identifier, btih)
|
||||||
self.assertIn(btih, self.client_session._handles)
|
self.assertIn(btih, self.client_session._handles)
|
||||||
|
|
||||||
|
# stream over streaming API (full range of the largest file)
|
||||||
|
await self.assert_torrent_streaming_works(btih)
|
||||||
|
|
||||||
tx, new_btih = await self.initialize_torrent(tx)
|
tx, new_btih = await self.initialize_torrent(tx)
|
||||||
self.assertNotEqual(btih, new_btih)
|
self.assertNotEqual(btih, new_btih)
|
||||||
# claim now points to another torrent, update to it
|
# claim now points to another torrent, update to it
|
||||||
|
|
Loading…
Add table
Reference in a new issue