From f0e17cff9a96af73591dce8f72dd81fb2a55a1d4 Mon Sep 17 00:00:00 2001 From: Jack Robison Date: Sat, 30 Mar 2019 21:08:34 -0400 Subject: [PATCH] support streaming downloads / range requests --- lbrynet/extras/daemon/Daemon.py | 55 ++++++++++++++++++++++++++++++++- 1 file changed, 54 insertions(+), 1 deletion(-) diff --git a/lbrynet/extras/daemon/Daemon.py b/lbrynet/extras/daemon/Daemon.py index 22b184b2b..7af1ad21f 100644 --- a/lbrynet/extras/daemon/Daemon.py +++ b/lbrynet/extras/daemon/Daemon.py @@ -272,6 +272,8 @@ class Daemon(metaclass=JSONRPCServerType): app = web.Application() app.router.add_get('/lbryapi', self.handle_old_jsonrpc) app.router.add_post('/lbryapi', self.handle_old_jsonrpc) + app.router.add_get('/streams', self.handle_streams_index) + app.router.add_get('/stream/{sd_hash}', self.handle_stream_range_request) app.router.add_post('/', self.handle_old_jsonrpc) self.runner = web.AppRunner(app) @@ -452,6 +454,57 @@ class Daemon(metaclass=JSONRPCServerType): content_type='application/json' ) + async def handle_streams_index(self, request: web.Request): + return web.Response( + body="", + content_type='text/html' + ) + + async def handle_stream_range_request(self, request: web.Request): + sd_hash = request.path.split("/stream/")[1] + if sd_hash not in self.stream_manager.streams: + return web.HTTPNotFound() + stream = self.stream_manager.streams[sd_hash] + + get_range = request.headers.get('range', 'bytes=0-') + if '=' in get_range: + get_range = get_range.split('=')[1] + start, end = get_range.split('-') + size = 0 + await self.stream_manager.start_stream(stream) + for blob in stream.descriptor.blobs[:-1]: + size += 2097152 - 1 if blob.length == 2097152 else blob.length + size -= 15 # last padding is unguessable + + start = int(start) + end = int(end) if end else size - 1 + skip_blobs = start // 2097150 + skip = skip_blobs * 2097151 + start = skip + final_size = end - start + 1 + + headers = { + 'Accept-Ranges': 'bytes', + 'Content-Range': f'bytes {start}-{end}/{size}', + 'Content-Length': str(final_size), + 'Content-Type': stream.mime_type + } + + if stream.delayed_stop: + stream.delayed_stop.cancel() + response = web.StreamResponse( + status=206, + headers=headers + ) + await response.prepare(request) + async for blob_info, decrypted in stream.aiter_read_stream(skip_blobs): + await response.write(decrypted) + log.info("sent browser blob %i/%i", blob_info.blob_num + 1, len(stream.descriptor.blobs) - 1) + return response + async def _process_rpc_call(self, data): args = data.get('params', {}) @@ -844,7 +897,7 @@ class Daemon(metaclass=JSONRPCServerType): """ try: stream = await self.stream_manager.download_stream_from_uri( - uri, self.exchange_rate_manager, file_name, timeout + uri, timeout, self.exchange_rate_manager, file_name ) if not stream: raise DownloadSDTimeout(uri)