Merge pull request #2179 from lbryio/fix-range-request-start-offset
start returning bytes for a range request at the requested starting position
This commit is contained in:
commit
3ed72a215e
2 changed files with 36 additions and 7 deletions
|
@ -486,6 +486,7 @@ class Daemon(metaclass=JSONRPCServerType):
|
||||||
|
|
||||||
async def handle_stream_get_request(self, request: web.Request):
|
async def handle_stream_get_request(self, request: web.Request):
|
||||||
if not self.conf.streaming_get:
|
if not self.conf.streaming_get:
|
||||||
|
log.warning("streaming_get is disabled, rejecting request")
|
||||||
raise web.HTTPForbidden()
|
raise web.HTTPForbidden()
|
||||||
name_and_claim_id = request.path.split("/get/")[1]
|
name_and_claim_id = request.path.split("/get/")[1]
|
||||||
if "/" not in name_and_claim_id:
|
if "/" not in name_and_claim_id:
|
||||||
|
@ -501,6 +502,20 @@ class Daemon(metaclass=JSONRPCServerType):
|
||||||
raise web.HTTPFound(f"/stream/{stream.sd_hash}")
|
raise web.HTTPFound(f"/stream/{stream.sd_hash}")
|
||||||
|
|
||||||
async def handle_stream_range_request(self, request: web.Request):
|
async def handle_stream_range_request(self, request: web.Request):
|
||||||
|
try:
|
||||||
|
return await self._handle_stream_range_request(request)
|
||||||
|
except web.HTTPException as err:
|
||||||
|
log.warning("http code during /stream range request: %s", err)
|
||||||
|
raise err
|
||||||
|
except asyncio.CancelledError:
|
||||||
|
log.debug("/stream range request cancelled")
|
||||||
|
except Exception:
|
||||||
|
log.exception("error handling /stream range request")
|
||||||
|
raise
|
||||||
|
finally:
|
||||||
|
log.debug("finished handling /stream range request")
|
||||||
|
|
||||||
|
async def _handle_stream_range_request(self, request: web.Request):
|
||||||
sd_hash = request.path.split("/stream/")[1]
|
sd_hash = request.path.split("/stream/")[1]
|
||||||
if not self.stream_manager.started.is_set():
|
if not self.stream_manager.started.is_set():
|
||||||
await self.stream_manager.started.wait()
|
await self.stream_manager.started.wait()
|
||||||
|
|
|
@ -3,7 +3,7 @@ import asyncio
|
||||||
import typing
|
import typing
|
||||||
import logging
|
import logging
|
||||||
import binascii
|
import binascii
|
||||||
from aiohttp.web import Request, StreamResponse
|
from aiohttp.web import Request, StreamResponse, HTTPRequestRangeNotSatisfiable
|
||||||
from lbrynet.utils import generate_id
|
from lbrynet.utils import generate_id
|
||||||
from lbrynet.error import DownloadSDTimeout
|
from lbrynet.error import DownloadSDTimeout
|
||||||
from lbrynet.schema.mime_types import guess_media_type
|
from lbrynet.schema.mime_types import guess_media_type
|
||||||
|
@ -316,8 +316,10 @@ class ManagedStream:
|
||||||
async def stream_file(self, request: Request, node: typing.Optional['Node'] = None) -> StreamResponse:
|
async def stream_file(self, request: Request, node: typing.Optional['Node'] = None) -> StreamResponse:
|
||||||
log.info("stream file to browser for lbry://%s#%s (sd hash %s...)", self.claim_name, self.claim_id,
|
log.info("stream file to browser for lbry://%s#%s (sd hash %s...)", self.claim_name, self.claim_id,
|
||||||
self.sd_hash[:6])
|
self.sd_hash[:6])
|
||||||
|
headers, size, skip_blobs, first_blob_start_offset = self._prepare_range_response_headers(
|
||||||
|
request.headers.get('range', 'bytes=0-')
|
||||||
|
)
|
||||||
await self.start(node)
|
await self.start(node)
|
||||||
headers, size, skip_blobs = self._prepare_range_response_headers(request.headers.get('range', 'bytes=0-'))
|
|
||||||
response = StreamResponse(
|
response = StreamResponse(
|
||||||
status=206,
|
status=206,
|
||||||
headers=headers
|
headers=headers
|
||||||
|
@ -327,11 +329,16 @@ class ManagedStream:
|
||||||
self.streaming.set()
|
self.streaming.set()
|
||||||
try:
|
try:
|
||||||
wrote = 0
|
wrote = 0
|
||||||
async for blob_info, decrypted in self._aiter_read_stream(skip_blobs, connection_id=2):
|
async for blob_info, decrypted in self._aiter_read_stream(skip_blobs, connection_id=self.STREAMING_ID):
|
||||||
|
if not wrote:
|
||||||
|
decrypted = decrypted[first_blob_start_offset:]
|
||||||
if (blob_info.blob_num == len(self.descriptor.blobs) - 2) or (len(decrypted) + wrote >= size):
|
if (blob_info.blob_num == len(self.descriptor.blobs) - 2) or (len(decrypted) + wrote >= size):
|
||||||
decrypted += (b'\x00' * (size - len(decrypted) - wrote - (skip_blobs * 2097151)))
|
decrypted += (b'\x00' * (size - len(decrypted) - wrote - (skip_blobs * 2097151)))
|
||||||
|
log.debug("sending browser final blob (%i/%i)", blob_info.blob_num + 1,
|
||||||
|
len(self.descriptor.blobs) - 1)
|
||||||
await response.write_eof(decrypted)
|
await response.write_eof(decrypted)
|
||||||
else:
|
else:
|
||||||
|
log.debug("sending browser blob (%i/%i)", blob_info.blob_num + 1, len(self.descriptor.blobs) - 1)
|
||||||
await response.write(decrypted)
|
await response.write(decrypted)
|
||||||
wrote += len(decrypted)
|
wrote += len(decrypted)
|
||||||
log.info("sent browser %sblob %i/%i", "(final) " if response._eof_sent else "",
|
log.info("sent browser %sblob %i/%i", "(final) " if response._eof_sent else "",
|
||||||
|
@ -491,7 +498,7 @@ class ManagedStream:
|
||||||
return
|
return
|
||||||
await asyncio.sleep(1, loop=self.loop)
|
await asyncio.sleep(1, loop=self.loop)
|
||||||
|
|
||||||
def _prepare_range_response_headers(self, get_range: str) -> typing.Tuple[typing.Dict[str, str], int, int]:
|
def _prepare_range_response_headers(self, get_range: str) -> typing.Tuple[typing.Dict[str, str], int, int, int]:
|
||||||
if '=' in get_range:
|
if '=' in get_range:
|
||||||
get_range = get_range.split('=')[1]
|
get_range = get_range.split('=')[1]
|
||||||
start, end = get_range.split('-')
|
start, end = get_range.split('-')
|
||||||
|
@ -509,16 +516,23 @@ class ManagedStream:
|
||||||
log.debug("estimating stream size")
|
log.debug("estimating stream size")
|
||||||
|
|
||||||
start = int(start)
|
start = int(start)
|
||||||
|
if not 0 <= start < size:
|
||||||
|
raise HTTPRequestRangeNotSatisfiable()
|
||||||
|
|
||||||
end = int(end) if end else size - 1
|
end = int(end) if end else size - 1
|
||||||
|
|
||||||
|
if end >= size:
|
||||||
|
raise HTTPRequestRangeNotSatisfiable()
|
||||||
|
|
||||||
skip_blobs = start // 2097150
|
skip_blobs = start // 2097150
|
||||||
skip = skip_blobs * 2097151
|
skip = skip_blobs * 2097151
|
||||||
start = skip
|
skip_first_blob = start - skip
|
||||||
|
start = skip_first_blob + skip
|
||||||
final_size = end - start + 1
|
final_size = end - start + 1
|
||||||
|
|
||||||
headers = {
|
headers = {
|
||||||
'Accept-Ranges': 'bytes',
|
'Accept-Ranges': 'bytes',
|
||||||
'Content-Range': f'bytes {start}-{end}/{size}',
|
'Content-Range': f'bytes {start}-{end}/{size}',
|
||||||
'Content-Length': str(final_size),
|
'Content-Length': str(final_size),
|
||||||
'Content-Type': self.mime_type
|
'Content-Type': self.mime_type
|
||||||
}
|
}
|
||||||
return headers, size, skip_blobs
|
return headers, size, skip_blobs, skip_first_blob
|
||||||
|
|
Loading…
Add table
Reference in a new issue