diff --git a/lbry_comment_server/main.py b/lbry_comment_server/main.py index 1ab621c..c3e509a 100644 --- a/lbry_comment_server/main.py +++ b/lbry_comment_server/main.py @@ -1,101 +1,53 @@ import asyncio -import json - from aiohttp import web +import aiojobs.aiohttp -from settings import config -from lbry_comment_server import DATABASE -from lbry_comment_server.database import obtain_connection -from lbry_comment_server import api_endpoint +import schema.db_helpers as helpers +import lbry_comment_server.writes as writes +from lbry_comment_server.settings import config +from lbry_comment_server.handles import api_endpoint -def add_routes(app: web.Application): +async def setup_db_schema(app): + helpers.setup_database(app['db_path']) + + +async def close_comment_scheduler(app): + await app['comment_scheduler'].close() + + +def create_app(**kwargs): + app = web.Application() + app['config'] = config + app['db_path'] = config['PATH']['DEFAULT'] + app['backup'] = config['PATH']['BACKUP'] + app.on_startup.append(setup_db_schema) + app.on_shutdown.append(close_comment_scheduler) + aiojobs.aiohttp.setup(app, **kwargs) app.add_routes([web.post('/api', api_endpoint)]) + return app -class CommentServer: - def __init__(self, port=2903): - self.port = port - self.app = web.Application(debug=True) - self.app.add_routes([web.post('/api', self.api)]) - self.runner = None - self.server = None - self.db_conn = DatabaseConnection(database_dir) +async def stop_app(runner): + await runner.cleanup() - def ping(self): - return 'pong' - - methods = { - 'ping': ping, - 'get_claim_comments': None, - 'get_comment_ids': None, - 'get_comments_by_id': None, - 'create_comment': None - } - - __db_methods = { - 'get_claim_comments', - 'get_comment_ids', - 'get_comments_by_id', - 'create_comment' - } - - def process_json(self, body) -> dict: - response = {'jsonrpc': '2.0', 'id': body['id']} - if body['method'] in self.methods: - method = body['method'] - params = body.get('params', {}) - try: - if method in self.__db_methods: - result = self.db_conn.__getattribute__(method).__call__(**params) - else: - result = self.methods[method](self, **params) - response['result'] = result - except TypeError as te: - print(te) - response['error'] = ERRORS['INVALID_PARAMS'] - else: - response['error'] = ERRORS['UNKNOWN'] - return response - - async def _start(self): - self.db_conn.obtain_connection() - self.runner = web.AppRunner(self.app) - await self.runner.setup() - self.server = web.TCPSite(self.runner, 'localhost', self.port) - await self.server.start() - - async def _stop(self): - self.db_conn.connection.close() - await self.runner.cleanup() - - async def run(self, max_timeout=3600): - try: - await self._start() - await asyncio.sleep(max_timeout) - except asyncio.CancelledError: - pass - finally: - await self._stop() - - async def api(self, request): - try: - body = await request.json() - if type(body) is list or type(body) is dict: - if type(body) is list: # batch request - response = [self.process_json(part) for part in body] - else: # single rpc request - response = self.process_json(body) - return web.json_response(response) - else: - return web.json_response({'error': ERRORS['UNKNOWN']}) - except json.decoder.JSONDecodeError as jde: - return web.json_response({ - 'error': {'message': jde.msg, 'code': -1} - }) +async def run_app(app, duration=3600): + app['comment_scheduler'] = await writes.create_comment_scheduler() + app['writer'] = writes.DatabaseWriter(config['PATH']['DEFAULT']) + runner = None + try: + runner = web.AppRunner(app) + await runner.setup() + site = web.TCPSite(runner, config['HOST'], config['PORT']) + await site.start() + await asyncio.sleep(duration) + except asyncio.CancelledError: + pass + finally: + await stop_app(runner) if __name__ == '__main__': - app = CommentServer() - asyncio.run(app.run()) + appl = create_app(close_timeout=5.0) + asyncio.run(run_app(appl))