diff --git a/config/conf.json b/config/conf.json index 75776dd..dcecbe0 100644 --- a/config/conf.json +++ b/config/conf.json @@ -13,5 +13,9 @@ "host": "localhost", "port": 5921, "backup_int": 3600, - "lbrynet": "http://localhost:5279" + "lbrynet": "http://localhost:5279", + "notifications": { + "url": "https://api.lbry.com/event/comment", + "auth_token": "token goes here" + } } \ No newline at end of file diff --git a/src/database/queries.py b/src/database/queries.py index 7a12793..4d16f80 100644 --- a/src/database/queries.py +++ b/src/database/queries.py @@ -179,7 +179,7 @@ def get_comments_by_id(conn, comment_ids: typing.Union[list, tuple]) -> typing.U )] -def delete_comment_by_id(conn: sqlite3.Connection, comment_id: str): +def delete_comment_by_id(conn: sqlite3.Connection, comment_id: str) -> bool: with conn: curs = conn.execute("DELETE FROM COMMENT WHERE CommentId = ?", (comment_id,)) return bool(curs.rowcount) @@ -209,7 +209,7 @@ def get_claim_ids_from_comment_ids(conn: sqlite3.Connection, comment_ids: list): return {row['comment_id']: row['claim_id'] for row in cids.fetchall()} -def hide_comments_by_id(conn: sqlite3.Connection, comment_ids: list): +def hide_comments_by_id(conn: sqlite3.Connection, comment_ids: list) -> bool: with conn: curs = conn.cursor() curs.executemany( diff --git a/src/database/writes.py b/src/database/writes.py index 82b57fe..611874a 100644 --- a/src/database/writes.py +++ b/src/database/writes.py @@ -3,13 +3,17 @@ import sqlite3 from asyncio import coroutine -from src.database.queries import delete_comment_by_id +from src.database.queries import delete_comment_by_id, get_comments_by_id from src.database.queries import get_claim_ids_from_comment_ids from src.database.queries import get_comment_or_none from src.database.queries import hide_comments_by_id from src.database.queries import insert_channel from src.database.queries import insert_comment -from src.server.misc import channel_matches_pattern_or_error +from src.server.misc import channel_matches_pattern_or_error, create_notification_batch +from src.server.misc import is_valid_base_comment +from src.server.misc import is_valid_credential_input +from src.server.misc import send_notification +from src.server.misc import send_notifications from src.server.misc import get_claim_from_id from src.server.misc import validate_signature_from_claim @@ -41,28 +45,49 @@ def insert_channel_or_error(conn: sqlite3.Connection, channel_name: str, channel raise ValueError('Received invalid values for channel_id or channel_name') -async def abandon_comment(app, comment_id): +""" COROUTINE WRAPPERS """ + + +async def write_comment(app, params): # CREATE + return await coroutine(create_comment_or_error)(app['writer'], **params) + + +async def hide_comments(app, comment_ids): # UPDATE + return await coroutine(hide_comments_by_id)(app['writer'], comment_ids) + + +async def abandon_comment(app, comment_id): # DELETE return await coroutine(delete_comment_by_id)(app['writer'], comment_id) +""" Core Functions called by request handlers """ + + async def abandon_comment_if_authorized(app, comment_id, channel_id, signature, signing_ts, **kwargs): claim = await get_claim_from_id(app, channel_id) if not validate_signature_from_claim(claim, signature, signing_ts, comment_id): return False + comment = get_comment_or_none(app['reader'], comment_id) job = await app['comment_scheduler'].spawn(abandon_comment(app, comment_id)) + await app['webhooks'].spawn(send_notification(app, 'DELETE', comment)) return await job.wait() -async def write_comment(app, params): - return await coroutine(create_comment_or_error)(app['writer'], **params) +async def create_comment(app, params): + if is_valid_base_comment(**params) and is_valid_credential_input(**params): + job = await app['comment_scheduler'].spawn(write_comment(app, params)) + comment = await job.wait() + if comment: + await app['webhooks'].spawn( + send_notification(app, 'CREATE', comment) + ) + return comment + else: + raise ValueError('base comment is invalid') -async def hide_comments(app, comment_ids): - return await coroutine(hide_comments_by_id)(app['writer'], comment_ids) - - -async def hide_comments_where_authorized(app, pieces: list): +async def hide_comments_where_authorized(app, pieces: list) -> list: comment_cids = get_claim_ids_from_comment_ids( conn=app['reader'], comment_ids=[p['comment_id'] for p in pieces] @@ -76,10 +101,15 @@ async def hide_comments_where_authorized(app, pieces: list): claims[claim_id] = await get_claim_from_id(app, claim_id, no_totals=True) channel = claims[claim_id].get('signing_channel') if validate_signature_from_claim(channel, p['signature'], p['signing_ts'], p['comment_id']): - comments_to_hide.append(p['comment_id']) + comments_to_hide.append(p) - if comments_to_hide: - job = await app['comment_scheduler'].spawn(hide_comments(app, comments_to_hide)) - await job.wait() + comment_ids = [c['comment_id'] for c in comments_to_hide] + job = await app['comment_scheduler'].spawn(hide_comments(app, comment_ids)) + await app['webhooks'].spawn( + send_notifications( + app, 'UPDATE', get_comments_by_id(app['reader'], comment_ids) + ) + ) - return {'hidden': comments_to_hide} + await job.wait() + return comment_ids diff --git a/src/server/app.py b/src/server/app.py index 87d20db..79b4cbd 100644 --- a/src/server/app.py +++ b/src/server/app.py @@ -38,33 +38,18 @@ async def database_backup_routine(app): pass -async def post_errors_to_slack(client, app): - while app['errors'].qsize() > 0: - msg = app['errors'].get_nowait() - if 'slack_webhook' in app['config']: - await client.post(app['config']['slack_webhook'], json=msg) - - -async def report_errors_to_slack_webhook(app): - async with aiohttp.ClientSession() as client: - try: - while True: - await asyncio.shield(post_errors_to_slack(client, app)) - await asyncio.sleep(10) - - except asyncio.CancelledError: - await asyncio.shield(post_errors_to_slack(client, app)) - - async def start_background_tasks(app): + # Reading the DB app['reader'] = obtain_connection(app['db_path'], True) app['waitful_backup'] = asyncio.create_task(database_backup_routine(app)) + + # Scheduler to prevent multiple threads from writing to DB simulataneously app['comment_scheduler'] = await aiojobs.create_scheduler(limit=1, pending_limit=0) app['db_writer'] = DatabaseWriter(app['db_path']) app['writer'] = app['db_writer'].connection - app['errors'] = queue.Queue() - app['slack_webhook'] = asyncio.create_task(report_errors_to_slack_webhook(app)) + # for requesting to external and internal APIs + app['webhooks'] = await aiojobs.create_scheduler(pending_limit=0) async def close_database_connections(app): @@ -76,33 +61,38 @@ async def close_database_connections(app): app['db_writer'].cleanup() -async def stop_reporting_errors(app): - app['slack_webhook'].cancel() - await app['slack_webhook'] - - -async def close_comment_scheduler(app): +async def close_schedulers(app): logger.info('Closing comment_scheduler') await app['comment_scheduler'].close() + logger.info('Closing scheduler for webhook requests') + await app['webhooks'].close() + class CommentDaemon: def __init__(self, config, db_file=None, backup=None, **kwargs): app = web.Application() + + # configure the config app['config'] = config self.config = app['config'] + + # configure the db file if db_file: app['db_path'] = db_file app['backup'] = backup else: app['db_path'] = config['path']['database'] app['backup'] = backup or (app['db_path'] + '.backup') + + # configure the order of tasks to run during app lifetime app.on_startup.append(setup_db_schema) app.on_startup.append(start_background_tasks) - app.on_shutdown.append(close_comment_scheduler) + app.on_shutdown.append(close_schedulers) app.on_cleanup.append(close_database_connections) - app.on_cleanup.append(stop_reporting_errors) aiojobs.aiohttp.setup(app, **kwargs) + + # Configure the routes app.add_routes([ web.post('/api', api_endpoint), web.get('/', get_api_endpoint), diff --git a/src/server/handles.py b/src/server/handles.py index b769178..222c69a 100644 --- a/src/server/handles.py +++ b/src/server/handles.py @@ -5,16 +5,13 @@ import asyncio from aiohttp import web from aiojobs.aiohttp import atomic -from src.server.misc import clean_input_params +from src.server.misc import clean_input_params, report_error from src.database.queries import get_claim_comments from src.database.queries import get_comments_by_id, get_comment_ids from src.database.queries import get_channel_id_from_comment_id from src.database.queries import get_claim_hidden_comments -from src.server.misc import is_valid_base_comment -from src.server.misc import is_valid_credential_input from src.server.misc import make_error -from src.database.writes import abandon_comment_if_authorized -from src.database.writes import write_comment +from src.database.writes import abandon_comment_if_authorized, create_comment from src.database.writes import hide_comments_where_authorized @@ -47,11 +44,7 @@ def handle_get_claim_hidden_comments(app, kwargs): async def handle_create_comment(app, params): - if is_valid_base_comment(**params) and is_valid_credential_input(**params): - job = await app['comment_scheduler'].spawn(write_comment(app, params)) - return await job.wait() - else: - raise ValueError('base comment is invalid') + return await create_comment(app, params) async def handle_abandon_comment(app, params): @@ -59,7 +52,7 @@ async def handle_abandon_comment(app, params): async def handle_hide_comments(app, params): - return await hide_comments_where_authorized(app, **params) + return {'hidden': await hide_comments_where_authorized(app, **params)} METHODS = { @@ -92,10 +85,11 @@ async def process_json(app, body: dict) -> dict: response['result'] = result except Exception as err: logger.exception(f'Got {type(err).__name__}:') - if type(err) in (ValueError, TypeError): - response['error'] = make_error('INVALID_PARAMS', err, app) + if type(err) in (ValueError, TypeError): # param error, not too important + response['error'] = make_error('INVALID_PARAMS', err) else: - response['error'] = make_error('INTERNAL', err, app) + response['error'] = make_error('INTERNAL', err) + await app['webhooks'].spawn(report_error(app, err)) finally: end = time.time() @@ -123,7 +117,7 @@ async def api_endpoint(request: web.Request): else: return web.json_response(await process_json(request.app, body)) except Exception as e: - return make_error('INVALID_REQUEST', e, request.app) + return make_error('INVALID_REQUEST', e) async def get_api_endpoint(request: web.Request): diff --git a/src/server/misc.py b/src/server/misc.py index 5c181ac..76a3bd2 100644 --- a/src/server/misc.py +++ b/src/server/misc.py @@ -3,6 +3,7 @@ import hashlib import logging import re from json import JSONDecodeError +from typing import List import aiohttp import ecdsa @@ -31,22 +32,65 @@ ERRORS = { } -def make_error(error, exc=None, app=None) -> dict: +def make_error(error, exc=None) -> dict: body = ERRORS[error] if error in ERRORS else ERRORS['INTERNAL'] try: if exc: exc_name = type(exc).__name__ body.update({exc_name: str(exc)}) - if app: - app['errors'].put_nowait({ - "text": f"Got `{exc_name}`: ```\n{exc}```" - }) - finally: return body +async def report_error(app, exc, msg=''): + try: + if 'slack_webhook' in app['config']: + if msg: + msg = f'"{msg}"' + body = { + "text": f"Got `{type(exc).__name__}`: ```\n{str(exc)}```\n{msg}" + } + async with aiohttp.ClientSession() as sesh: + async with sesh.post(app['config']['slack_webhook'], json=body) as resp: + await resp.wait_for_close() + + except Exception: + logger.critical('Error while logging to slack webhook') + + +async def send_notifications(app, action: str, comments: List[dict]): + events = create_notification_batch(action, comments) + async with aiohttp.ClientSession() as session: + for event in events: + event.update(auth_token=app['config']['notifications']['auth_token']) + try: + async with session.get(app['config']['notifications']['url'], params=event) as resp: + logger.debug(f'Completed Notification: {await resp.text()}, HTTP Status: {resp.status}') + except Exception: + logger.exception(f'Error requesting internal API, Status {resp.status}: {resp.text()}, ' + f'comment_id: {event["comment_id"]}') + + +async def send_notification(app, action: str, comment: dict): + await send_notifications(app, action, [comment]) + + +def create_notification_batch(action: str, comments: List[dict]) -> List[dict]: + action_type = action[0].capitalize() # to turn Create -> C, edit -> U, delete -> D + events = [] + for comment in comments: + event = { + 'action_type': action_type, + 'comment_id': comment['comment_id'], + 'claim_id': comment['claim_id'] + } + if comment.get('channel_id'): + event['channel_id'] = comment['channel_id'] + events.append(event) + return events + + async def request_lbrynet(app, method, **params): body = {'method': method, 'params': {**params}} try: @@ -66,7 +110,7 @@ async def request_lbrynet(app, method, **params): async def get_claim_from_id(app, claim_id, **kwargs): - return (await request_lbrynet(app, 'claim_search', no_totals=True, claim_id=claim_id, **kwargs))['items'][0] + return (await request_lbrynet(app, 'claim_search', claim_id=claim_id, **kwargs))['items'][0] def get_encoded_signature(signature):