support streaming downloads / range requests

This commit is contained in:
Jack Robison 2019-03-30 21:08:34 -04:00
parent 93267efe0b
commit f0e17cff9a
No known key found for this signature in database
GPG key ID: DF25C68FE0239BB2

View file

@ -272,6 +272,8 @@ class Daemon(metaclass=JSONRPCServerType):
app = web.Application() app = web.Application()
app.router.add_get('/lbryapi', self.handle_old_jsonrpc) app.router.add_get('/lbryapi', self.handle_old_jsonrpc)
app.router.add_post('/lbryapi', self.handle_old_jsonrpc) app.router.add_post('/lbryapi', self.handle_old_jsonrpc)
app.router.add_get('/streams', self.handle_streams_index)
app.router.add_get('/stream/{sd_hash}', self.handle_stream_range_request)
app.router.add_post('/', self.handle_old_jsonrpc) app.router.add_post('/', self.handle_old_jsonrpc)
self.runner = web.AppRunner(app) self.runner = web.AppRunner(app)
@ -452,6 +454,57 @@ class Daemon(metaclass=JSONRPCServerType):
content_type='application/json' content_type='application/json'
) )
async def handle_streams_index(self, request: web.Request):
return web.Response(
body="<ul>" + "".join([
f'<li><a href="/stream/{sd_hash}">lbry://{stream.claim_name}</a></li>'
for sd_hash, stream in self.stream_manager.streams.items()
]) + "</ul>",
content_type='text/html'
)
async def handle_stream_range_request(self, request: web.Request):
sd_hash = request.path.split("/stream/")[1]
if sd_hash not in self.stream_manager.streams:
return web.HTTPNotFound()
stream = self.stream_manager.streams[sd_hash]
get_range = request.headers.get('range', 'bytes=0-')
if '=' in get_range:
get_range = get_range.split('=')[1]
start, end = get_range.split('-')
size = 0
await self.stream_manager.start_stream(stream)
for blob in stream.descriptor.blobs[:-1]:
size += 2097152 - 1 if blob.length == 2097152 else blob.length
size -= 15 # last padding is unguessable
start = int(start)
end = int(end) if end else size - 1
skip_blobs = start // 2097150
skip = skip_blobs * 2097151
start = skip
final_size = end - start + 1
headers = {
'Accept-Ranges': 'bytes',
'Content-Range': f'bytes {start}-{end}/{size}',
'Content-Length': str(final_size),
'Content-Type': stream.mime_type
}
if stream.delayed_stop:
stream.delayed_stop.cancel()
response = web.StreamResponse(
status=206,
headers=headers
)
await response.prepare(request)
async for blob_info, decrypted in stream.aiter_read_stream(skip_blobs):
await response.write(decrypted)
log.info("sent browser blob %i/%i", blob_info.blob_num + 1, len(stream.descriptor.blobs) - 1)
return response
async def _process_rpc_call(self, data): async def _process_rpc_call(self, data):
args = data.get('params', {}) args = data.get('params', {})
@ -844,7 +897,7 @@ class Daemon(metaclass=JSONRPCServerType):
""" """
try: try:
stream = await self.stream_manager.download_stream_from_uri( stream = await self.stream_manager.download_stream_from_uri(
uri, self.exchange_rate_manager, file_name, timeout uri, timeout, self.exchange_rate_manager, file_name
) )
if not stream: if not stream:
raise DownloadSDTimeout(uri) raise DownloadSDTimeout(uri)