Adds calls to internal-api when CrUD operations occur & fixes up error logging

This commit is contained in:
Oleg Silkin 2019-12-26 22:44:37 -05:00
parent 1fafeac393
commit caef26ed4b
6 changed files with 130 additions and 68 deletions

View file

@ -13,5 +13,9 @@
"host": "localhost",
"port": 5921,
"backup_int": 3600,
"lbrynet": "http://localhost:5279"
"lbrynet": "http://localhost:5279",
"notifications": {
"url": "https://api.lbry.com/event/comment",
"auth_token": "token goes here"
}
}

View file

@ -179,7 +179,7 @@ def get_comments_by_id(conn, comment_ids: typing.Union[list, tuple]) -> typing.U
)]
def delete_comment_by_id(conn: sqlite3.Connection, comment_id: str):
def delete_comment_by_id(conn: sqlite3.Connection, comment_id: str) -> bool:
with conn:
curs = conn.execute("DELETE FROM COMMENT WHERE CommentId = ?", (comment_id,))
return bool(curs.rowcount)
@ -209,7 +209,7 @@ def get_claim_ids_from_comment_ids(conn: sqlite3.Connection, comment_ids: list):
return {row['comment_id']: row['claim_id'] for row in cids.fetchall()}
def hide_comments_by_id(conn: sqlite3.Connection, comment_ids: list):
def hide_comments_by_id(conn: sqlite3.Connection, comment_ids: list) -> bool:
with conn:
curs = conn.cursor()
curs.executemany(

View file

@ -3,13 +3,17 @@ import sqlite3
from asyncio import coroutine
from src.database.queries import delete_comment_by_id
from src.database.queries import delete_comment_by_id, get_comments_by_id
from src.database.queries import get_claim_ids_from_comment_ids
from src.database.queries import get_comment_or_none
from src.database.queries import hide_comments_by_id
from src.database.queries import insert_channel
from src.database.queries import insert_comment
from src.server.misc import channel_matches_pattern_or_error
from src.server.misc import channel_matches_pattern_or_error, create_notification_batch
from src.server.misc import is_valid_base_comment
from src.server.misc import is_valid_credential_input
from src.server.misc import send_notification
from src.server.misc import send_notifications
from src.server.misc import get_claim_from_id
from src.server.misc import validate_signature_from_claim
@ -41,28 +45,49 @@ def insert_channel_or_error(conn: sqlite3.Connection, channel_name: str, channel
raise ValueError('Received invalid values for channel_id or channel_name')
async def abandon_comment(app, comment_id):
""" COROUTINE WRAPPERS """
async def write_comment(app, params): # CREATE
return await coroutine(create_comment_or_error)(app['writer'], **params)
async def hide_comments(app, comment_ids): # UPDATE
return await coroutine(hide_comments_by_id)(app['writer'], comment_ids)
async def abandon_comment(app, comment_id): # DELETE
return await coroutine(delete_comment_by_id)(app['writer'], comment_id)
""" Core Functions called by request handlers """
async def abandon_comment_if_authorized(app, comment_id, channel_id, signature, signing_ts, **kwargs):
claim = await get_claim_from_id(app, channel_id)
if not validate_signature_from_claim(claim, signature, signing_ts, comment_id):
return False
comment = get_comment_or_none(app['reader'], comment_id)
job = await app['comment_scheduler'].spawn(abandon_comment(app, comment_id))
await app['webhooks'].spawn(send_notification(app, 'DELETE', comment))
return await job.wait()
async def write_comment(app, params):
return await coroutine(create_comment_or_error)(app['writer'], **params)
async def create_comment(app, params):
if is_valid_base_comment(**params) and is_valid_credential_input(**params):
job = await app['comment_scheduler'].spawn(write_comment(app, params))
comment = await job.wait()
if comment:
await app['webhooks'].spawn(
send_notification(app, 'CREATE', comment)
)
return comment
else:
raise ValueError('base comment is invalid')
async def hide_comments(app, comment_ids):
return await coroutine(hide_comments_by_id)(app['writer'], comment_ids)
async def hide_comments_where_authorized(app, pieces: list):
async def hide_comments_where_authorized(app, pieces: list) -> list:
comment_cids = get_claim_ids_from_comment_ids(
conn=app['reader'],
comment_ids=[p['comment_id'] for p in pieces]
@ -76,10 +101,15 @@ async def hide_comments_where_authorized(app, pieces: list):
claims[claim_id] = await get_claim_from_id(app, claim_id, no_totals=True)
channel = claims[claim_id].get('signing_channel')
if validate_signature_from_claim(channel, p['signature'], p['signing_ts'], p['comment_id']):
comments_to_hide.append(p['comment_id'])
comments_to_hide.append(p)
if comments_to_hide:
job = await app['comment_scheduler'].spawn(hide_comments(app, comments_to_hide))
await job.wait()
comment_ids = [c['comment_id'] for c in comments_to_hide]
job = await app['comment_scheduler'].spawn(hide_comments(app, comment_ids))
await app['webhooks'].spawn(
send_notifications(
app, 'UPDATE', get_comments_by_id(app['reader'], comment_ids)
)
)
return {'hidden': comments_to_hide}
await job.wait()
return comment_ids

View file

@ -38,33 +38,18 @@ async def database_backup_routine(app):
pass
async def post_errors_to_slack(client, app):
while app['errors'].qsize() > 0:
msg = app['errors'].get_nowait()
if 'slack_webhook' in app['config']:
await client.post(app['config']['slack_webhook'], json=msg)
async def report_errors_to_slack_webhook(app):
async with aiohttp.ClientSession() as client:
try:
while True:
await asyncio.shield(post_errors_to_slack(client, app))
await asyncio.sleep(10)
except asyncio.CancelledError:
await asyncio.shield(post_errors_to_slack(client, app))
async def start_background_tasks(app):
# Reading the DB
app['reader'] = obtain_connection(app['db_path'], True)
app['waitful_backup'] = asyncio.create_task(database_backup_routine(app))
# Scheduler to prevent multiple threads from writing to DB simulataneously
app['comment_scheduler'] = await aiojobs.create_scheduler(limit=1, pending_limit=0)
app['db_writer'] = DatabaseWriter(app['db_path'])
app['writer'] = app['db_writer'].connection
app['errors'] = queue.Queue()
app['slack_webhook'] = asyncio.create_task(report_errors_to_slack_webhook(app))
# for requesting to external and internal APIs
app['webhooks'] = await aiojobs.create_scheduler(pending_limit=0)
async def close_database_connections(app):
@ -76,33 +61,38 @@ async def close_database_connections(app):
app['db_writer'].cleanup()
async def stop_reporting_errors(app):
app['slack_webhook'].cancel()
await app['slack_webhook']
async def close_comment_scheduler(app):
async def close_schedulers(app):
logger.info('Closing comment_scheduler')
await app['comment_scheduler'].close()
logger.info('Closing scheduler for webhook requests')
await app['webhooks'].close()
class CommentDaemon:
def __init__(self, config, db_file=None, backup=None, **kwargs):
app = web.Application()
# configure the config
app['config'] = config
self.config = app['config']
# configure the db file
if db_file:
app['db_path'] = db_file
app['backup'] = backup
else:
app['db_path'] = config['path']['database']
app['backup'] = backup or (app['db_path'] + '.backup')
# configure the order of tasks to run during app lifetime
app.on_startup.append(setup_db_schema)
app.on_startup.append(start_background_tasks)
app.on_shutdown.append(close_comment_scheduler)
app.on_shutdown.append(close_schedulers)
app.on_cleanup.append(close_database_connections)
app.on_cleanup.append(stop_reporting_errors)
aiojobs.aiohttp.setup(app, **kwargs)
# Configure the routes
app.add_routes([
web.post('/api', api_endpoint),
web.get('/', get_api_endpoint),

View file

@ -5,16 +5,13 @@ import asyncio
from aiohttp import web
from aiojobs.aiohttp import atomic
from src.server.misc import clean_input_params
from src.server.misc import clean_input_params, report_error
from src.database.queries import get_claim_comments
from src.database.queries import get_comments_by_id, get_comment_ids
from src.database.queries import get_channel_id_from_comment_id
from src.database.queries import get_claim_hidden_comments
from src.server.misc import is_valid_base_comment
from src.server.misc import is_valid_credential_input
from src.server.misc import make_error
from src.database.writes import abandon_comment_if_authorized
from src.database.writes import write_comment
from src.database.writes import abandon_comment_if_authorized, create_comment
from src.database.writes import hide_comments_where_authorized
@ -47,11 +44,7 @@ def handle_get_claim_hidden_comments(app, kwargs):
async def handle_create_comment(app, params):
if is_valid_base_comment(**params) and is_valid_credential_input(**params):
job = await app['comment_scheduler'].spawn(write_comment(app, params))
return await job.wait()
else:
raise ValueError('base comment is invalid')
return await create_comment(app, params)
async def handle_abandon_comment(app, params):
@ -59,7 +52,7 @@ async def handle_abandon_comment(app, params):
async def handle_hide_comments(app, params):
return await hide_comments_where_authorized(app, **params)
return {'hidden': await hide_comments_where_authorized(app, **params)}
METHODS = {
@ -92,10 +85,11 @@ async def process_json(app, body: dict) -> dict:
response['result'] = result
except Exception as err:
logger.exception(f'Got {type(err).__name__}:')
if type(err) in (ValueError, TypeError):
response['error'] = make_error('INVALID_PARAMS', err, app)
if type(err) in (ValueError, TypeError): # param error, not too important
response['error'] = make_error('INVALID_PARAMS', err)
else:
response['error'] = make_error('INTERNAL', err, app)
response['error'] = make_error('INTERNAL', err)
await app['webhooks'].spawn(report_error(app, err))
finally:
end = time.time()
@ -123,7 +117,7 @@ async def api_endpoint(request: web.Request):
else:
return web.json_response(await process_json(request.app, body))
except Exception as e:
return make_error('INVALID_REQUEST', e, request.app)
return make_error('INVALID_REQUEST', e)
async def get_api_endpoint(request: web.Request):

View file

@ -3,6 +3,7 @@ import hashlib
import logging
import re
from json import JSONDecodeError
from typing import List
import aiohttp
import ecdsa
@ -31,22 +32,65 @@ ERRORS = {
}
def make_error(error, exc=None, app=None) -> dict:
def make_error(error, exc=None) -> dict:
body = ERRORS[error] if error in ERRORS else ERRORS['INTERNAL']
try:
if exc:
exc_name = type(exc).__name__
body.update({exc_name: str(exc)})
if app:
app['errors'].put_nowait({
"text": f"Got `{exc_name}`: ```\n{exc}```"
})
finally:
return body
async def report_error(app, exc, msg=''):
try:
if 'slack_webhook' in app['config']:
if msg:
msg = f'"{msg}"'
body = {
"text": f"Got `{type(exc).__name__}`: ```\n{str(exc)}```\n{msg}"
}
async with aiohttp.ClientSession() as sesh:
async with sesh.post(app['config']['slack_webhook'], json=body) as resp:
await resp.wait_for_close()
except Exception:
logger.critical('Error while logging to slack webhook')
async def send_notifications(app, action: str, comments: List[dict]):
events = create_notification_batch(action, comments)
async with aiohttp.ClientSession() as session:
for event in events:
event.update(auth_token=app['config']['notifications']['auth_token'])
try:
async with session.get(app['config']['notifications']['url'], params=event) as resp:
logger.debug(f'Completed Notification: {await resp.text()}, HTTP Status: {resp.status}')
except Exception:
logger.exception(f'Error requesting internal API, Status {resp.status}: {resp.text()}, '
f'comment_id: {event["comment_id"]}')
async def send_notification(app, action: str, comment: dict):
await send_notifications(app, action, [comment])
def create_notification_batch(action: str, comments: List[dict]) -> List[dict]:
action_type = action[0].capitalize() # to turn Create -> C, edit -> U, delete -> D
events = []
for comment in comments:
event = {
'action_type': action_type,
'comment_id': comment['comment_id'],
'claim_id': comment['claim_id']
}
if comment.get('channel_id'):
event['channel_id'] = comment['channel_id']
events.append(event)
return events
async def request_lbrynet(app, method, **params):
body = {'method': method, 'params': {**params}}
try:
@ -66,7 +110,7 @@ async def request_lbrynet(app, method, **params):
async def get_claim_from_id(app, claim_id, **kwargs):
return (await request_lbrynet(app, 'claim_search', no_totals=True, claim_id=claim_id, **kwargs))['items'][0]
return (await request_lbrynet(app, 'claim_search', claim_id=claim_id, **kwargs))['items'][0]
def get_encoded_signature(signature):