start returning range request bytes at the requested offset

This commit is contained in:
Jack Robison 2019-05-23 22:40:59 -04:00
parent b922ea8e7e
commit 85d94d4ca3
No known key found for this signature in database
GPG key ID: DF25C68FE0239BB2
2 changed files with 36 additions and 7 deletions

View file

@ -486,6 +486,7 @@ class Daemon(metaclass=JSONRPCServerType):
async def handle_stream_get_request(self, request: web.Request): async def handle_stream_get_request(self, request: web.Request):
if not self.conf.streaming_get: if not self.conf.streaming_get:
log.warning("streaming_get is disabled, rejecting request")
raise web.HTTPForbidden() raise web.HTTPForbidden()
name_and_claim_id = request.path.split("/get/")[1] name_and_claim_id = request.path.split("/get/")[1]
if "/" not in name_and_claim_id: if "/" not in name_and_claim_id:
@ -501,6 +502,20 @@ class Daemon(metaclass=JSONRPCServerType):
raise web.HTTPFound(f"/stream/{stream.sd_hash}") raise web.HTTPFound(f"/stream/{stream.sd_hash}")
async def handle_stream_range_request(self, request: web.Request): async def handle_stream_range_request(self, request: web.Request):
try:
return await self._handle_stream_range_request(request)
except web.HTTPException as err:
log.warning("http code during /stream range request: %s", err)
raise err
except asyncio.CancelledError:
log.debug("/stream range request cancelled")
except Exception:
log.exception("error handling /stream range request")
raise
finally:
log.debug("finished handling /stream range request")
async def _handle_stream_range_request(self, request: web.Request):
sd_hash = request.path.split("/stream/")[1] sd_hash = request.path.split("/stream/")[1]
if not self.stream_manager.started.is_set(): if not self.stream_manager.started.is_set():
await self.stream_manager.started.wait() await self.stream_manager.started.wait()

View file

@ -3,7 +3,7 @@ import asyncio
import typing import typing
import logging import logging
import binascii import binascii
from aiohttp.web import Request, StreamResponse from aiohttp.web import Request, StreamResponse, HTTPRequestRangeNotSatisfiable
from lbrynet.utils import generate_id from lbrynet.utils import generate_id
from lbrynet.error import DownloadSDTimeout from lbrynet.error import DownloadSDTimeout
from lbrynet.schema.mime_types import guess_media_type from lbrynet.schema.mime_types import guess_media_type
@ -316,8 +316,10 @@ class ManagedStream:
async def stream_file(self, request: Request, node: typing.Optional['Node'] = None) -> StreamResponse: async def stream_file(self, request: Request, node: typing.Optional['Node'] = None) -> StreamResponse:
log.info("stream file to browser for lbry://%s#%s (sd hash %s...)", self.claim_name, self.claim_id, log.info("stream file to browser for lbry://%s#%s (sd hash %s...)", self.claim_name, self.claim_id,
self.sd_hash[:6]) self.sd_hash[:6])
headers, size, skip_blobs, first_blob_start_offset = self._prepare_range_response_headers(
request.headers.get('range', 'bytes=0-')
)
await self.start(node) await self.start(node)
headers, size, skip_blobs = self._prepare_range_response_headers(request.headers.get('range', 'bytes=0-'))
response = StreamResponse( response = StreamResponse(
status=206, status=206,
headers=headers headers=headers
@ -327,11 +329,16 @@ class ManagedStream:
self.streaming.set() self.streaming.set()
try: try:
wrote = 0 wrote = 0
async for blob_info, decrypted in self._aiter_read_stream(skip_blobs, connection_id=2): async for blob_info, decrypted in self._aiter_read_stream(skip_blobs, connection_id=self.STREAMING_ID):
if not wrote:
decrypted = decrypted[first_blob_start_offset:]
if (blob_info.blob_num == len(self.descriptor.blobs) - 2) or (len(decrypted) + wrote >= size): if (blob_info.blob_num == len(self.descriptor.blobs) - 2) or (len(decrypted) + wrote >= size):
decrypted += (b'\x00' * (size - len(decrypted) - wrote - (skip_blobs * 2097151))) decrypted += (b'\x00' * (size - len(decrypted) - wrote - (skip_blobs * 2097151)))
log.debug("sending browser final blob (%i/%i)", blob_info.blob_num + 1,
len(self.descriptor.blobs) - 1)
await response.write_eof(decrypted) await response.write_eof(decrypted)
else: else:
log.debug("sending browser blob (%i/%i)", blob_info.blob_num + 1, len(self.descriptor.blobs) - 1)
await response.write(decrypted) await response.write(decrypted)
wrote += len(decrypted) wrote += len(decrypted)
log.info("sent browser %sblob %i/%i", "(final) " if response._eof_sent else "", log.info("sent browser %sblob %i/%i", "(final) " if response._eof_sent else "",
@ -491,7 +498,7 @@ class ManagedStream:
return return
await asyncio.sleep(1, loop=self.loop) await asyncio.sleep(1, loop=self.loop)
def _prepare_range_response_headers(self, get_range: str) -> typing.Tuple[typing.Dict[str, str], int, int]: def _prepare_range_response_headers(self, get_range: str) -> typing.Tuple[typing.Dict[str, str], int, int, int]:
if '=' in get_range: if '=' in get_range:
get_range = get_range.split('=')[1] get_range = get_range.split('=')[1]
start, end = get_range.split('-') start, end = get_range.split('-')
@ -509,16 +516,23 @@ class ManagedStream:
log.debug("estimating stream size") log.debug("estimating stream size")
start = int(start) start = int(start)
if not 0 <= start < size:
raise HTTPRequestRangeNotSatisfiable()
end = int(end) if end else size - 1 end = int(end) if end else size - 1
if end >= size:
raise HTTPRequestRangeNotSatisfiable()
skip_blobs = start // 2097150 skip_blobs = start // 2097150
skip = skip_blobs * 2097151 skip = skip_blobs * 2097151
start = skip skip_first_blob = start - skip
start = skip_first_blob + skip
final_size = end - start + 1 final_size = end - start + 1
headers = { headers = {
'Accept-Ranges': 'bytes', 'Accept-Ranges': 'bytes',
'Content-Range': f'bytes {start}-{end}/{size}', 'Content-Range': f'bytes {start}-{end}/{size}',
'Content-Length': str(final_size), 'Content-Length': str(final_size),
'Content-Type': self.mime_type 'Content-Type': self.mime_type
} }
return headers, size, skip_blobs return headers, size, skip_blobs, skip_first_blob