lbry-sdk/lbry/service/daemon.py

190 lines
6.7 KiB
Python
Raw Normal View History

2020-05-01 15:33:58 +02:00
import json
2020-08-20 16:43:44 +02:00
import signal
2020-05-01 15:33:58 +02:00
import asyncio
import logging
from weakref import WeakSet
2020-10-16 18:52:57 +02:00
from functools import partial
2020-08-20 16:43:44 +02:00
from asyncio.runners import _cancel_all_tasks
2020-10-16 18:52:57 +02:00
from typing import Type, List, Dict, Tuple
2020-08-20 16:43:44 +02:00
2020-05-01 15:33:58 +02:00
from aiohttp.web import Application, AppRunner, WebSocketResponse, TCPSite, Response
from aiohttp.http_websocket import WSMsgType, WSCloseCode
2020-09-17 01:50:51 +02:00
from lbry.conf import Config
from lbry.console import Console, console_class_from_name
from lbry.service import API, Service
2020-06-05 06:35:22 +02:00
from lbry.service.json_encoder import JSONResponseEncoder
2020-09-17 01:50:51 +02:00
from lbry.blockchain.ledger import ledger_class_from_name
2020-10-16 18:52:57 +02:00
from lbry.event import BroadcastSubscription
2020-05-01 15:33:58 +02:00
2020-08-20 16:43:44 +02:00
log = logging.getLogger(__name__)
def jsonrpc_dumps_pretty(obj, message_id=None, **kwargs):
2020-06-05 06:35:22 +02:00
#if not isinstance(obj, dict):
# data = {"jsonrpc": "2.0", "error": obj.to_dict()}
#else:
data = {"jsonrpc": "2.0", "result": obj}
if message_id is not None:
data["id"] = message_id
2020-05-01 15:33:58 +02:00
return json.dumps(data, cls=JSONResponseEncoder, sort_keys=True, indent=2, **kwargs) + "\n"
class WebSocketLogHandler(logging.Handler):
def __init__(self, send_message):
super().__init__()
self.send_message = send_message
def emit(self, record):
try:
self.send_message({
'type': 'log',
'name': record.name,
'message': self.format(record)
})
2020-06-05 06:35:22 +02:00
except Exception:
2020-05-01 15:33:58 +02:00
self.handleError(record)
class WebSocketManager(WebSocketResponse):
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
class Daemon:
2020-09-11 20:08:06 +02:00
"""
Mostly connects API to aiohttp stuff.
Handles starting and stopping API
"""
2020-09-17 01:50:51 +02:00
def __init__(self, service: Service, console: Console):
self._loop = asyncio.get_running_loop()
2020-05-01 15:33:58 +02:00
self.service = service
self.conf = service.conf
2020-05-21 00:05:13 +02:00
self.console = console
2020-05-01 15:33:58 +02:00
self.api = API(service)
self.app = Application()
self.app['websockets'] = WeakSet()
2020-10-16 18:52:57 +02:00
self.app['subscriptions']: Dict[str, Tuple[BroadcastSubscription, WeakSet]] = {}
2020-05-01 15:33:58 +02:00
self.app.router.add_get('/ws', self.on_connect)
2020-07-21 05:48:27 +02:00
self.app.router.add_post('/api', self.on_rpc)
2020-05-01 15:33:58 +02:00
self.app.on_shutdown.append(self.on_shutdown)
self.runner = AppRunner(self.app)
2020-09-16 22:40:05 +02:00
@classmethod
2020-09-17 01:50:51 +02:00
def from_config(cls, service_class: Type[Service], conf: Config, ) -> 'Daemon':
async def setup():
ledger_class = ledger_class_from_name(conf.blockchain)
ledger = ledger_class(conf)
service = service_class(ledger)
console_class = console_class_from_name(conf.console)
console = console_class(service)
return cls(service, console)
return asyncio.new_event_loop().run_until_complete(setup())
2020-09-16 22:40:05 +02:00
def run(self):
2020-08-20 16:43:44 +02:00
for sig in (signal.SIGHUP, signal.SIGTERM, signal.SIGINT):
2020-09-16 22:40:05 +02:00
self._loop.add_signal_handler(sig, self._loop.stop)
2020-05-21 00:05:13 +02:00
try:
2020-09-16 22:40:05 +02:00
self._loop.run_until_complete(self.start())
self._loop.run_forever()
2020-05-21 00:05:13 +02:00
finally:
2020-08-20 16:43:44 +02:00
try:
2020-09-16 22:40:05 +02:00
self._loop.run_until_complete(self.stop())
2020-08-20 16:43:44 +02:00
finally:
try:
2020-09-16 22:40:05 +02:00
_cancel_all_tasks(self._loop)
self._loop.run_until_complete(self._loop.shutdown_asyncgens())
2020-08-20 16:43:44 +02:00
finally:
2020-09-16 22:40:05 +02:00
self._loop.close()
2020-05-21 00:05:13 +02:00
2020-05-01 15:33:58 +02:00
async def start(self):
2020-05-21 00:05:13 +02:00
self.console.starting()
2020-05-01 15:33:58 +02:00
await self.runner.setup()
site = TCPSite(self.runner, self.conf.api_host, self.conf.api_port)
2020-05-01 15:33:58 +02:00
await site.start()
await self.service.start()
async def stop(self):
await self.service.stop()
await self.runner.cleanup()
async def on_rpc(self, request):
data = await request.json()
params = data.get('params', {})
method = getattr(self.api, data['method'])
2020-09-19 01:18:48 +02:00
try:
result = await method(**params)
encoded_result = jsonrpc_dumps_pretty(result, service=self.service)
return Response(
text=encoded_result,
content_type='application/json'
)
except Exception as e:
log.exception("RPC error")
raise e
2020-05-01 15:33:58 +02:00
async def on_connect(self, request):
web_socket = WebSocketManager()
await web_socket.prepare(request)
self.app['websockets'].add(web_socket)
try:
async for msg in web_socket:
if msg.type == WSMsgType.TEXT:
asyncio.create_task(self.on_message(web_socket, msg.json()))
elif msg.type == WSMsgType.ERROR:
print('web socket connection closed with exception %s' %
web_socket.exception())
finally:
self.app['websockets'].discard(web_socket)
return web_socket
async def on_message(self, web_socket: WebSocketManager, msg: dict):
2020-07-07 16:52:41 +02:00
if msg['method'] == 'subscribe':
streams = msg['params']
2020-05-01 15:33:58 +02:00
if isinstance(streams, str):
streams = [streams]
2020-10-16 18:52:57 +02:00
await self.on_subscribe(web_socket, streams)
2020-07-07 16:52:41 +02:00
else:
params = msg.get('params', {})
method = getattr(self.api, msg['method'])
2020-09-03 01:38:30 +02:00
try:
result = await method(**params)
encoded_result = jsonrpc_dumps_pretty(
result, message_id=msg.get('id', ''), service=self.service
)
await web_socket.send_str(encoded_result)
2020-09-03 01:38:30 +02:00
except Exception as e:
log.exception("RPC error")
await web_socket.send_json({'id': msg.get('id', ''), 'result': "unexpected error: " + str(e)})
raise e
2020-05-01 15:33:58 +02:00
2020-10-16 18:52:57 +02:00
async def on_subscribe(self, web_socket: WebSocketManager, events: List[str]):
for event_name in events:
if event_name not in self.app["subscriptions"]:
event_stream = self.conf.events.get(event_name)
subscribers = WeakSet()
event_stream.listen(partial(self.broadcast_event, event_name, subscribers))
self.app["subscriptions"][event_name] = {
"stream": event_stream,
"subscribers": subscribers
}
else:
subscribers = self.app["subscriptions"][event_name]["subscribers"]
subscribers.add(web_socket)
def broadcast_event(self, event_name, subscribers, payload):
for web_socket in subscribers:
asyncio.create_task(web_socket.send_json({
'event': event_name, 'payload': payload
}))
2020-05-01 15:33:58 +02:00
@staticmethod
async def on_shutdown(app):
for web_socket in set(app['websockets']):
await web_socket.close(code=WSCloseCode.GOING_AWAY, message='Server shutdown')