diff --git a/lbry/torrent/torrent_manager.py b/lbry/torrent/torrent_manager.py index cf9106731..208392b76 100644 --- a/lbry/torrent/torrent_manager.py +++ b/lbry/torrent/torrent_manager.py @@ -4,9 +4,11 @@ import logging import os import typing from typing import Optional -from aiohttp.web import Request +from aiohttp.web import Request, StreamResponse, HTTPRequestRangeNotSatisfiable + from lbry.file.source_manager import SourceManager from lbry.file.source import ManagedDownloadSource +from lbry.schema.mime_types import guess_media_type if typing.TYPE_CHECKING: from lbry.torrent.session import TorrentSession @@ -49,6 +51,10 @@ class TorrentSource(ManagedDownloadSource): self.download_directory = os.path.dirname(full_path) return full_path + @property + def mime_type(self) -> Optional[str]: + return guess_media_type(os.path.basename(self.full_path))[0] + async def start(self, timeout: Optional[float] = None, save_now: Optional[bool] = False): await self.torrent_session.add_torrent(self.identifier, self.download_directory) @@ -62,6 +68,10 @@ class TorrentSource(ManagedDownloadSource): def torrent_length(self): return self.torrent_session.get_size(self.identifier) + @property + def stream_length(self): + return os.path.getsize(self.full_path) + @property def written_bytes(self): return self.torrent_session.get_downloaded(self.identifier) @@ -81,6 +91,43 @@ class TorrentSource(ManagedDownloadSource): def completed(self): return self.torrent_session.is_completed(self.identifier) + async def stream_file(self, request): + log.info("stream torrent to browser for lbry://%s#%s (btih %s...)", self.claim_name, self.claim_id, + self.identifier[:6]) + headers, size, start, end = self._prepare_range_response_headers( + request.headers.get('range', 'bytes=0-') + ) + await self.start() + response = StreamResponse( + status=206, + headers=headers + ) + await response.prepare(request) + with open(self.full_path, 'rb') as infile: + infile.seek(start) + await response.write_eof(infile.read(size)) + + def _prepare_range_response_headers(self, get_range: str) -> typing.Tuple[typing.Dict[str, str], int, int, int]: + if '=' in get_range: + get_range = get_range.split('=')[1] + start, end = get_range.split('-') + size = self.stream_length + + start = int(start) + end = int(end) if end else size - 1 + + if end >= size or not 0 <= start < size: + raise HTTPRequestRangeNotSatisfiable() + + final_size = end - start + 1 + headers = { + 'Accept-Ranges': 'bytes', + 'Content-Range': f'bytes {start}-{end}/{size}', + 'Content-Length': str(final_size), + 'Content-Type': self.mime_type + } + return headers, final_size, start, end + class TorrentManager(SourceManager): _sources: typing.Dict[str, ManagedDownloadSource] @@ -136,5 +183,5 @@ class TorrentManager(SourceManager): # await self.blob_manager.delete_blobs(blob_hashes, delete_from_db=False) # await self.storage.delete_stream(source.descriptor) - async def stream_partial_content(self, request: Request, sd_hash: str): - raise NotImplementedError + async def stream_partial_content(self, request: Request, identifier: str): + return await self._sources[identifier].stream_file(request)