add streaming_server
and streaming_get
settings
-split rpc runner from range request runner
This commit is contained in:
parent
46ee65c8f2
commit
41abfbdd9b
5 changed files with 168 additions and 129 deletions
222
docs/api.json
222
docs/api.json
File diff suppressed because one or more lines are too long
|
@ -546,6 +546,21 @@ class Config(CLIConfig):
|
||||||
previous_names=['upload_log', 'upload_log', 'share_debug_info']
|
previous_names=['upload_log', 'upload_log', 'share_debug_info']
|
||||||
)
|
)
|
||||||
|
|
||||||
|
# media server
|
||||||
|
|
||||||
|
streaming_server = String('Host name and port to serve streaming media over range requests',
|
||||||
|
'localhost:5280', metavar='HOST:PORT')
|
||||||
|
streaming_get = Toggle("Enable the /get endpoint for the streaming media server. "
|
||||||
|
"Disable to prevent new streams from being added.", True)
|
||||||
|
|
||||||
|
@property
|
||||||
|
def streaming_host(self):
|
||||||
|
return self.streaming_server.split(':')[0]
|
||||||
|
|
||||||
|
@property
|
||||||
|
def streaming_port(self):
|
||||||
|
return int(self.streaming_server.split(':')[1])
|
||||||
|
|
||||||
def __init__(self, **kwargs):
|
def __init__(self, **kwargs):
|
||||||
super().__init__(**kwargs)
|
super().__init__(**kwargs)
|
||||||
self.set_default_paths()
|
self.set_default_paths()
|
||||||
|
|
|
@ -270,14 +270,17 @@ class Daemon(metaclass=JSONRPCServerType):
|
||||||
self.stop_event = asyncio.Event()
|
self.stop_event = asyncio.Event()
|
||||||
|
|
||||||
logging.getLogger('aiohttp.access').setLevel(logging.WARN)
|
logging.getLogger('aiohttp.access').setLevel(logging.WARN)
|
||||||
app = web.Application()
|
rpc_app = web.Application()
|
||||||
app.router.add_get('/lbryapi', self.handle_old_jsonrpc)
|
rpc_app.router.add_get('/lbryapi', self.handle_old_jsonrpc)
|
||||||
app.router.add_post('/lbryapi', self.handle_old_jsonrpc)
|
rpc_app.router.add_post('/lbryapi', self.handle_old_jsonrpc)
|
||||||
app.router.add_get('/get/{claim_name}', self.handle_stream_get_request)
|
rpc_app.router.add_post('/', self.handle_old_jsonrpc)
|
||||||
app.router.add_get('/get/{claim_name}/{claim_id}', self.handle_stream_get_request)
|
self.rpc_runner = web.AppRunner(rpc_app)
|
||||||
app.router.add_get('/stream/{sd_hash}', self.handle_stream_range_request)
|
|
||||||
app.router.add_post('/', self.handle_old_jsonrpc)
|
streaming_app = web.Application()
|
||||||
self.runner = web.AppRunner(app)
|
streaming_app.router.add_get('/get/{claim_name}', self.handle_stream_get_request)
|
||||||
|
streaming_app.router.add_get('/get/{claim_name}/{claim_id}', self.handle_stream_get_request)
|
||||||
|
streaming_app.router.add_get('/stream/{sd_hash}', self.handle_stream_range_request)
|
||||||
|
self.streaming_runner = web.AppRunner(streaming_app)
|
||||||
|
|
||||||
@property
|
@property
|
||||||
def dht_node(self) -> typing.Optional['Node']:
|
def dht_node(self) -> typing.Optional['Node']:
|
||||||
|
@ -400,12 +403,20 @@ class Daemon(metaclass=JSONRPCServerType):
|
||||||
log.debug("Settings: %s", json.dumps(self.conf.settings_dict, indent=2))
|
log.debug("Settings: %s", json.dumps(self.conf.settings_dict, indent=2))
|
||||||
log.info("Platform: %s", json.dumps(system_info.get_platform(), indent=2))
|
log.info("Platform: %s", json.dumps(system_info.get_platform(), indent=2))
|
||||||
await self.analytics_manager.send_server_startup()
|
await self.analytics_manager.send_server_startup()
|
||||||
await self.runner.setup()
|
await self.rpc_runner.setup()
|
||||||
|
await self.streaming_runner.setup()
|
||||||
|
|
||||||
try:
|
try:
|
||||||
site = web.TCPSite(self.runner, self.conf.api_host, self.conf.api_port, shutdown_timeout=.5)
|
rpc_site = web.TCPSite(self.rpc_runner, self.conf.api_host, self.conf.api_port, shutdown_timeout=.5)
|
||||||
await site.start()
|
await rpc_site.start()
|
||||||
log.info('lbrynet API listening on TCP %s:%i', *site._server.sockets[0].getsockname()[:2])
|
log.info('lbrynet API listening on TCP %s:%i', *rpc_site._server.sockets[0].getsockname()[:2])
|
||||||
|
|
||||||
|
streaming_site = web.TCPSite(self.streaming_runner, self.conf.streaming_host, self.conf.streaming_port,
|
||||||
|
shutdown_timeout=.5)
|
||||||
|
await streaming_site.start()
|
||||||
|
log.info('lbrynet media server listening on TCP %s:%i',
|
||||||
|
*streaming_site._server.sockets[0].getsockname()[:2])
|
||||||
|
|
||||||
except OSError as e:
|
except OSError as e:
|
||||||
log.error('lbrynet API failed to bind TCP %s for listening. Daemon is already running or this port is '
|
log.error('lbrynet API failed to bind TCP %s for listening. Daemon is already running or this port is '
|
||||||
'already in use by another application.', self.conf.api)
|
'already in use by another application.', self.conf.api)
|
||||||
|
@ -441,8 +452,10 @@ class Daemon(metaclass=JSONRPCServerType):
|
||||||
self.component_startup_task.cancel()
|
self.component_startup_task.cancel()
|
||||||
log.info("stopped api components")
|
log.info("stopped api components")
|
||||||
if shutdown_runner:
|
if shutdown_runner:
|
||||||
await self.runner.shutdown()
|
await self.rpc_runner.shutdown()
|
||||||
await self.runner.cleanup()
|
await self.streaming_runner.shutdown()
|
||||||
|
await self.rpc_runner.cleanup()
|
||||||
|
await self.streaming_runner.cleanup()
|
||||||
log.info("stopped api server")
|
log.info("stopped api server")
|
||||||
if self.analytics_manager.is_started:
|
if self.analytics_manager.is_started:
|
||||||
self.analytics_manager.stop()
|
self.analytics_manager.stop()
|
||||||
|
@ -471,6 +484,8 @@ class Daemon(metaclass=JSONRPCServerType):
|
||||||
)
|
)
|
||||||
|
|
||||||
async def handle_stream_get_request(self, request: web.Request):
|
async def handle_stream_get_request(self, request: web.Request):
|
||||||
|
if not self.conf.streaming_get:
|
||||||
|
raise web.HTTPForbidden()
|
||||||
name_and_claim_id = request.path.split("/get/")[1]
|
name_and_claim_id = request.path.split("/get/")[1]
|
||||||
if "/" not in name_and_claim_id:
|
if "/" not in name_and_claim_id:
|
||||||
uri = f"lbry://{name_and_claim_id}"
|
uri = f"lbry://{name_and_claim_id}"
|
||||||
|
|
|
@ -213,7 +213,7 @@ class ManagedStream:
|
||||||
download_directory = None
|
download_directory = None
|
||||||
written_bytes = None
|
written_bytes = None
|
||||||
return {
|
return {
|
||||||
'streaming_url': f"http://{self.config.api_host}:{self.config.api_port}/stream/{self.sd_hash}",
|
'streaming_url': f"http://{self.config.streaming_host}:{self.config.streaming_port}/stream/{self.sd_hash}",
|
||||||
'completed': (self.output_file_exists and self.status in ('stopped', 'finished')) or all(
|
'completed': (self.output_file_exists and self.status in ('stopped', 'finished')) or all(
|
||||||
self.blob_manager.is_blob_verified(b.blob_hash) for b in self.descriptor.blobs[:-1]),
|
self.blob_manager.is_blob_verified(b.blob_hash) for b in self.descriptor.blobs[:-1]),
|
||||||
'file_name': file_name,
|
'file_name': file_name,
|
||||||
|
|
|
@ -35,14 +35,15 @@ class RangeRequests(CommandTestCase):
|
||||||
await self.daemon.jsonrpc_file_delete(delete_from_download_dir=True, claim_name='foo')
|
await self.daemon.jsonrpc_file_delete(delete_from_download_dir=True, claim_name='foo')
|
||||||
self.assertEqual(0, len(os.listdir(self.daemon.blob_manager.blob_dir)))
|
self.assertEqual(0, len(os.listdir(self.daemon.blob_manager.blob_dir)))
|
||||||
# await self._restart_stream_manager()
|
# await self._restart_stream_manager()
|
||||||
await self.daemon.runner.setup()
|
await self.daemon.streaming_runner.setup()
|
||||||
site = aiohttp.web.TCPSite(self.daemon.runner, self.daemon.conf.api_host, self.daemon.conf.api_port)
|
site = aiohttp.web.TCPSite(self.daemon.streaming_runner, self.daemon.conf.streaming_host,
|
||||||
|
self.daemon.conf.streaming_port)
|
||||||
await site.start()
|
await site.start()
|
||||||
self.assertListEqual(self.daemon.jsonrpc_file_list(), [])
|
self.assertListEqual(self.daemon.jsonrpc_file_list(), [])
|
||||||
|
|
||||||
async def _test_range_requests(self):
|
async def _test_range_requests(self):
|
||||||
name = 'foo'
|
name = 'foo'
|
||||||
url = f'http://{self.daemon.conf.api_host}:{self.daemon.conf.api_port}/get/{name}'
|
url = f'http://{self.daemon.conf.streaming_host}:{self.daemon.conf.streaming_port}/get/{name}'
|
||||||
|
|
||||||
async with aiohttp_request('get', url) as req:
|
async with aiohttp_request('get', url) as req:
|
||||||
self.assertEqual(req.headers.get('Content-Type'), 'application/octet-stream')
|
self.assertEqual(req.headers.get('Content-Type'), 'application/octet-stream')
|
||||||
|
@ -104,6 +105,14 @@ class RangeRequests(CommandTestCase):
|
||||||
((MAX_BLOB_SIZE - 1) * 4) - 15, padding=b'\x00' * 15
|
((MAX_BLOB_SIZE - 1) * 4) - 15, padding=b'\x00' * 15
|
||||||
)
|
)
|
||||||
|
|
||||||
|
async def test_forbidden(self):
|
||||||
|
self.data = get_random_bytes(1000)
|
||||||
|
await self._setup_stream(self.data, file_size=1000)
|
||||||
|
url = f'http://{self.daemon.conf.streaming_host}:{self.daemon.conf.streaming_port}/get/foo'
|
||||||
|
self.daemon.conf.streaming_get = False
|
||||||
|
async with aiohttp_request('get', url) as req:
|
||||||
|
self.assertEqual(403, req.status)
|
||||||
|
|
||||||
async def test_range_requests_last_block_of_last_blob_padding(self):
|
async def test_range_requests_last_block_of_last_blob_padding(self):
|
||||||
self.data = get_random_bytes(((MAX_BLOB_SIZE - 1) * 4) - 16)
|
self.data = get_random_bytes(((MAX_BLOB_SIZE - 1) * 4) - 16)
|
||||||
await self._setup_stream(self.data)
|
await self._setup_stream(self.data)
|
||||||
|
|
Loading…
Reference in a new issue