stream torrent from file
This commit is contained in:
parent
63784622e9
commit
8212e73c2e
1 changed files with 50 additions and 3 deletions
|
@ -4,9 +4,11 @@ import logging
|
||||||
import os
|
import os
|
||||||
import typing
|
import typing
|
||||||
from typing import Optional
|
from typing import Optional
|
||||||
from aiohttp.web import Request
|
from aiohttp.web import Request, StreamResponse, HTTPRequestRangeNotSatisfiable
|
||||||
|
|
||||||
from lbry.file.source_manager import SourceManager
|
from lbry.file.source_manager import SourceManager
|
||||||
from lbry.file.source import ManagedDownloadSource
|
from lbry.file.source import ManagedDownloadSource
|
||||||
|
from lbry.schema.mime_types import guess_media_type
|
||||||
|
|
||||||
if typing.TYPE_CHECKING:
|
if typing.TYPE_CHECKING:
|
||||||
from lbry.torrent.session import TorrentSession
|
from lbry.torrent.session import TorrentSession
|
||||||
|
@ -49,6 +51,10 @@ class TorrentSource(ManagedDownloadSource):
|
||||||
self.download_directory = os.path.dirname(full_path)
|
self.download_directory = os.path.dirname(full_path)
|
||||||
return full_path
|
return full_path
|
||||||
|
|
||||||
|
@property
|
||||||
|
def mime_type(self) -> Optional[str]:
|
||||||
|
return guess_media_type(os.path.basename(self.full_path))[0]
|
||||||
|
|
||||||
async def start(self, timeout: Optional[float] = None, save_now: Optional[bool] = False):
|
async def start(self, timeout: Optional[float] = None, save_now: Optional[bool] = False):
|
||||||
await self.torrent_session.add_torrent(self.identifier, self.download_directory)
|
await self.torrent_session.add_torrent(self.identifier, self.download_directory)
|
||||||
|
|
||||||
|
@ -62,6 +68,10 @@ class TorrentSource(ManagedDownloadSource):
|
||||||
def torrent_length(self):
|
def torrent_length(self):
|
||||||
return self.torrent_session.get_size(self.identifier)
|
return self.torrent_session.get_size(self.identifier)
|
||||||
|
|
||||||
|
@property
|
||||||
|
def stream_length(self):
|
||||||
|
return os.path.getsize(self.full_path)
|
||||||
|
|
||||||
@property
|
@property
|
||||||
def written_bytes(self):
|
def written_bytes(self):
|
||||||
return self.torrent_session.get_downloaded(self.identifier)
|
return self.torrent_session.get_downloaded(self.identifier)
|
||||||
|
@ -81,6 +91,43 @@ class TorrentSource(ManagedDownloadSource):
|
||||||
def completed(self):
|
def completed(self):
|
||||||
return self.torrent_session.is_completed(self.identifier)
|
return self.torrent_session.is_completed(self.identifier)
|
||||||
|
|
||||||
|
async def stream_file(self, request):
|
||||||
|
log.info("stream torrent to browser for lbry://%s#%s (btih %s...)", self.claim_name, self.claim_id,
|
||||||
|
self.identifier[:6])
|
||||||
|
headers, size, start, end = self._prepare_range_response_headers(
|
||||||
|
request.headers.get('range', 'bytes=0-')
|
||||||
|
)
|
||||||
|
await self.start()
|
||||||
|
response = StreamResponse(
|
||||||
|
status=206,
|
||||||
|
headers=headers
|
||||||
|
)
|
||||||
|
await response.prepare(request)
|
||||||
|
with open(self.full_path, 'rb') as infile:
|
||||||
|
infile.seek(start)
|
||||||
|
await response.write_eof(infile.read(size))
|
||||||
|
|
||||||
|
def _prepare_range_response_headers(self, get_range: str) -> typing.Tuple[typing.Dict[str, str], int, int, int]:
|
||||||
|
if '=' in get_range:
|
||||||
|
get_range = get_range.split('=')[1]
|
||||||
|
start, end = get_range.split('-')
|
||||||
|
size = self.stream_length
|
||||||
|
|
||||||
|
start = int(start)
|
||||||
|
end = int(end) if end else size - 1
|
||||||
|
|
||||||
|
if end >= size or not 0 <= start < size:
|
||||||
|
raise HTTPRequestRangeNotSatisfiable()
|
||||||
|
|
||||||
|
final_size = end - start + 1
|
||||||
|
headers = {
|
||||||
|
'Accept-Ranges': 'bytes',
|
||||||
|
'Content-Range': f'bytes {start}-{end}/{size}',
|
||||||
|
'Content-Length': str(final_size),
|
||||||
|
'Content-Type': self.mime_type
|
||||||
|
}
|
||||||
|
return headers, final_size, start, end
|
||||||
|
|
||||||
|
|
||||||
class TorrentManager(SourceManager):
|
class TorrentManager(SourceManager):
|
||||||
_sources: typing.Dict[str, ManagedDownloadSource]
|
_sources: typing.Dict[str, ManagedDownloadSource]
|
||||||
|
@ -136,5 +183,5 @@ class TorrentManager(SourceManager):
|
||||||
# await self.blob_manager.delete_blobs(blob_hashes, delete_from_db=False)
|
# await self.blob_manager.delete_blobs(blob_hashes, delete_from_db=False)
|
||||||
# await self.storage.delete_stream(source.descriptor)
|
# await self.storage.delete_stream(source.descriptor)
|
||||||
|
|
||||||
async def stream_partial_content(self, request: Request, sd_hash: str):
|
async def stream_partial_content(self, request: Request, identifier: str):
|
||||||
raise NotImplementedError
|
return await self._sources[identifier].stream_file(request)
|
||||||
|
|
Loading…
Reference in a new issue