diff --git a/lbry/torrent/session.py b/lbry/torrent/session.py index 0bd058529..2df515d94 100644 --- a/lbry/torrent/session.py +++ b/lbry/torrent/session.py @@ -5,7 +5,7 @@ import logging import random from hashlib import sha1 from tempfile import mkdtemp -from typing import Optional +from typing import Optional, Tuple import libtorrent @@ -31,9 +31,13 @@ class TorrentHandle: self.total_wanted_done = 0 self.name = '' self.tasks = [] - self.torrent_file: Optional[libtorrent.file_storage] = None + self._torrent_info: libtorrent.torrent_info = handle.torrent_file() self._base_path = None + @property + def torrent_file(self) -> Optional[libtorrent.file_storage]: + return self._torrent_info.files() + @property def largest_file(self) -> Optional[str]: if self.torrent_file is None: @@ -58,6 +62,25 @@ class TorrentHandle: while self.tasks: self.tasks.pop().cancel() + def byte_range_to_piece_range( + self, file_index, start_offset, end_offset) -> Tuple[libtorrent.peer_request, libtorrent.peer_request]: + start_piece = self._torrent_info.map_file(file_index, start_offset, 0) + end_piece = self._torrent_info.map_file(file_index, end_offset, 0) + return start_piece, end_piece + + async def stream_range_as_completed(self, file_index, start, end): + first_piece, final_piece = self.byte_range_to_piece_range(file_index, start, end) + start_piece_offset = final_piece.start + piece_size = self._torrent_info.piece_length() + log.info("Streaming torrent from piece %d to %d (bytes: %d -> %d): %s", + first_piece.piece, final_piece.piece, start, end, self.name) + for piece_index in range(first_piece.piece, final_piece.piece + 1): + while not self._handle.have_piece(piece_index): + log.info("Waiting for piece %d: %s", piece_index, self.name) + await asyncio.sleep(0.2) + log.info("Streaming piece offset %d / %d for torrent %s", piece_index, final_piece.piece, self.name) + yield piece_size - start_piece_offset + def _show_status(self): # fixme: cleanup if not self._handle.is_valid(): @@ -69,8 +92,8 @@ class TorrentHandle: self.name = status.name if not self.metadata_completed.is_set(): self.metadata_completed.set() + self._torrent_info = self._handle.torrent_file() log.info("Metadata completed for btih:%s - %s", status.info_hash, self.name) - self.torrent_file = self._handle.torrent_file().files() self._base_path = status.save_path first_piece = self.torrent_file.piece_index_at_file(self.largest_file_index) if not self.started.is_set(): @@ -220,6 +243,10 @@ class TorrentSession: def is_completed(self, btih): return self._handles[btih].finished.is_set() + def stream_largest_file(self, btih, start, end): + handle = self._handles[btih] + return handle.stream_range_as_completed(handle.largest_file_index, start, end) + def get_magnet_uri(btih): return f"magnet:?xt=urn:btih:{btih}" diff --git a/lbry/torrent/torrent_manager.py b/lbry/torrent/torrent_manager.py index a884caf9a..da839850c 100644 --- a/lbry/torrent/torrent_manager.py +++ b/lbry/torrent/torrent_manager.py @@ -94,7 +94,7 @@ class TorrentSource(ManagedDownloadSource): async def stream_file(self, request): log.info("stream torrent to browser for lbry://%s#%s (btih %s...)", self.claim_name, self.claim_id, self.identifier[:6]) - headers, size, start, end = self._prepare_range_response_headers( + headers, start, end = self._prepare_range_response_headers( request.headers.get('range', 'bytes=0-') ) await self.start() @@ -105,9 +105,13 @@ class TorrentSource(ManagedDownloadSource): await response.prepare(request) with open(self.full_path, 'rb') as infile: infile.seek(start) - await response.write_eof(infile.read(size)) + async for read_size in self.torrent_session.stream_largest_file(self.identifier, start, end): + if start + read_size < end: + await response.write(infile.read(read_size)) + else: + await response.write_eof(infile.read(end - infile.tell())) - def _prepare_range_response_headers(self, get_range: str) -> typing.Tuple[typing.Dict[str, str], int, int, int]: + def _prepare_range_response_headers(self, get_range: str) -> typing.Tuple[typing.Dict[str, str], int, int]: if '=' in get_range: get_range = get_range.split('=')[1] start, end = get_range.split('-') @@ -126,7 +130,7 @@ class TorrentSource(ManagedDownloadSource): 'Content-Length': str(final_size), 'Content-Type': self.mime_type } - return headers, final_size, start, end + return headers, start, end class TorrentManager(SourceManager):