Merge pull request #25 from lbryio/internal-api
Adds calls to internal-api when CrUD operations occur & fixes up erro…
This commit is contained in:
commit
114ba46298
6 changed files with 130 additions and 68 deletions
|
@ -13,5 +13,9 @@
|
|||
"host": "localhost",
|
||||
"port": 5921,
|
||||
"backup_int": 3600,
|
||||
"lbrynet": "http://localhost:5279"
|
||||
"lbrynet": "http://localhost:5279",
|
||||
"notifications": {
|
||||
"url": "https://api.lbry.com/event/comment",
|
||||
"auth_token": "token goes here"
|
||||
}
|
||||
}
|
|
@ -179,7 +179,7 @@ def get_comments_by_id(conn, comment_ids: typing.Union[list, tuple]) -> typing.U
|
|||
)]
|
||||
|
||||
|
||||
def delete_comment_by_id(conn: sqlite3.Connection, comment_id: str):
|
||||
def delete_comment_by_id(conn: sqlite3.Connection, comment_id: str) -> bool:
|
||||
with conn:
|
||||
curs = conn.execute("DELETE FROM COMMENT WHERE CommentId = ?", (comment_id,))
|
||||
return bool(curs.rowcount)
|
||||
|
@ -209,7 +209,7 @@ def get_claim_ids_from_comment_ids(conn: sqlite3.Connection, comment_ids: list):
|
|||
return {row['comment_id']: row['claim_id'] for row in cids.fetchall()}
|
||||
|
||||
|
||||
def hide_comments_by_id(conn: sqlite3.Connection, comment_ids: list):
|
||||
def hide_comments_by_id(conn: sqlite3.Connection, comment_ids: list) -> bool:
|
||||
with conn:
|
||||
curs = conn.cursor()
|
||||
curs.executemany(
|
||||
|
|
|
@ -3,13 +3,17 @@ import sqlite3
|
|||
|
||||
from asyncio import coroutine
|
||||
|
||||
from src.database.queries import delete_comment_by_id
|
||||
from src.database.queries import delete_comment_by_id, get_comments_by_id
|
||||
from src.database.queries import get_claim_ids_from_comment_ids
|
||||
from src.database.queries import get_comment_or_none
|
||||
from src.database.queries import hide_comments_by_id
|
||||
from src.database.queries import insert_channel
|
||||
from src.database.queries import insert_comment
|
||||
from src.server.misc import channel_matches_pattern_or_error
|
||||
from src.server.misc import channel_matches_pattern_or_error, create_notification_batch
|
||||
from src.server.misc import is_valid_base_comment
|
||||
from src.server.misc import is_valid_credential_input
|
||||
from src.server.misc import send_notification
|
||||
from src.server.misc import send_notifications
|
||||
from src.server.misc import get_claim_from_id
|
||||
from src.server.misc import validate_signature_from_claim
|
||||
|
||||
|
@ -41,28 +45,49 @@ def insert_channel_or_error(conn: sqlite3.Connection, channel_name: str, channel
|
|||
raise ValueError('Received invalid values for channel_id or channel_name')
|
||||
|
||||
|
||||
async def abandon_comment(app, comment_id):
|
||||
""" COROUTINE WRAPPERS """
|
||||
|
||||
|
||||
async def write_comment(app, params): # CREATE
|
||||
return await coroutine(create_comment_or_error)(app['writer'], **params)
|
||||
|
||||
|
||||
async def hide_comments(app, comment_ids): # UPDATE
|
||||
return await coroutine(hide_comments_by_id)(app['writer'], comment_ids)
|
||||
|
||||
|
||||
async def abandon_comment(app, comment_id): # DELETE
|
||||
return await coroutine(delete_comment_by_id)(app['writer'], comment_id)
|
||||
|
||||
|
||||
""" Core Functions called by request handlers """
|
||||
|
||||
|
||||
async def abandon_comment_if_authorized(app, comment_id, channel_id, signature, signing_ts, **kwargs):
|
||||
claim = await get_claim_from_id(app, channel_id)
|
||||
if not validate_signature_from_claim(claim, signature, signing_ts, comment_id):
|
||||
return False
|
||||
|
||||
comment = get_comment_or_none(app['reader'], comment_id)
|
||||
job = await app['comment_scheduler'].spawn(abandon_comment(app, comment_id))
|
||||
await app['webhooks'].spawn(send_notification(app, 'DELETE', comment))
|
||||
return await job.wait()
|
||||
|
||||
|
||||
async def write_comment(app, params):
|
||||
return await coroutine(create_comment_or_error)(app['writer'], **params)
|
||||
async def create_comment(app, params):
|
||||
if is_valid_base_comment(**params) and is_valid_credential_input(**params):
|
||||
job = await app['comment_scheduler'].spawn(write_comment(app, params))
|
||||
comment = await job.wait()
|
||||
if comment:
|
||||
await app['webhooks'].spawn(
|
||||
send_notification(app, 'CREATE', comment)
|
||||
)
|
||||
return comment
|
||||
else:
|
||||
raise ValueError('base comment is invalid')
|
||||
|
||||
|
||||
async def hide_comments(app, comment_ids):
|
||||
return await coroutine(hide_comments_by_id)(app['writer'], comment_ids)
|
||||
|
||||
|
||||
async def hide_comments_where_authorized(app, pieces: list):
|
||||
async def hide_comments_where_authorized(app, pieces: list) -> list:
|
||||
comment_cids = get_claim_ids_from_comment_ids(
|
||||
conn=app['reader'],
|
||||
comment_ids=[p['comment_id'] for p in pieces]
|
||||
|
@ -76,10 +101,15 @@ async def hide_comments_where_authorized(app, pieces: list):
|
|||
claims[claim_id] = await get_claim_from_id(app, claim_id, no_totals=True)
|
||||
channel = claims[claim_id].get('signing_channel')
|
||||
if validate_signature_from_claim(channel, p['signature'], p['signing_ts'], p['comment_id']):
|
||||
comments_to_hide.append(p['comment_id'])
|
||||
comments_to_hide.append(p)
|
||||
|
||||
if comments_to_hide:
|
||||
job = await app['comment_scheduler'].spawn(hide_comments(app, comments_to_hide))
|
||||
await job.wait()
|
||||
comment_ids = [c['comment_id'] for c in comments_to_hide]
|
||||
job = await app['comment_scheduler'].spawn(hide_comments(app, comment_ids))
|
||||
await app['webhooks'].spawn(
|
||||
send_notifications(
|
||||
app, 'UPDATE', get_comments_by_id(app['reader'], comment_ids)
|
||||
)
|
||||
)
|
||||
|
||||
return {'hidden': comments_to_hide}
|
||||
await job.wait()
|
||||
return comment_ids
|
||||
|
|
|
@ -38,33 +38,18 @@ async def database_backup_routine(app):
|
|||
pass
|
||||
|
||||
|
||||
async def post_errors_to_slack(client, app):
|
||||
while app['errors'].qsize() > 0:
|
||||
msg = app['errors'].get_nowait()
|
||||
if 'slack_webhook' in app['config']:
|
||||
await client.post(app['config']['slack_webhook'], json=msg)
|
||||
|
||||
|
||||
async def report_errors_to_slack_webhook(app):
|
||||
async with aiohttp.ClientSession() as client:
|
||||
try:
|
||||
while True:
|
||||
await asyncio.shield(post_errors_to_slack(client, app))
|
||||
await asyncio.sleep(10)
|
||||
|
||||
except asyncio.CancelledError:
|
||||
await asyncio.shield(post_errors_to_slack(client, app))
|
||||
|
||||
|
||||
async def start_background_tasks(app):
|
||||
# Reading the DB
|
||||
app['reader'] = obtain_connection(app['db_path'], True)
|
||||
app['waitful_backup'] = asyncio.create_task(database_backup_routine(app))
|
||||
|
||||
# Scheduler to prevent multiple threads from writing to DB simulataneously
|
||||
app['comment_scheduler'] = await aiojobs.create_scheduler(limit=1, pending_limit=0)
|
||||
app['db_writer'] = DatabaseWriter(app['db_path'])
|
||||
app['writer'] = app['db_writer'].connection
|
||||
|
||||
app['errors'] = queue.Queue()
|
||||
app['slack_webhook'] = asyncio.create_task(report_errors_to_slack_webhook(app))
|
||||
# for requesting to external and internal APIs
|
||||
app['webhooks'] = await aiojobs.create_scheduler(pending_limit=0)
|
||||
|
||||
|
||||
async def close_database_connections(app):
|
||||
|
@ -76,33 +61,38 @@ async def close_database_connections(app):
|
|||
app['db_writer'].cleanup()
|
||||
|
||||
|
||||
async def stop_reporting_errors(app):
|
||||
app['slack_webhook'].cancel()
|
||||
await app['slack_webhook']
|
||||
|
||||
|
||||
async def close_comment_scheduler(app):
|
||||
async def close_schedulers(app):
|
||||
logger.info('Closing comment_scheduler')
|
||||
await app['comment_scheduler'].close()
|
||||
|
||||
logger.info('Closing scheduler for webhook requests')
|
||||
await app['webhooks'].close()
|
||||
|
||||
|
||||
class CommentDaemon:
|
||||
def __init__(self, config, db_file=None, backup=None, **kwargs):
|
||||
app = web.Application()
|
||||
|
||||
# configure the config
|
||||
app['config'] = config
|
||||
self.config = app['config']
|
||||
|
||||
# configure the db file
|
||||
if db_file:
|
||||
app['db_path'] = db_file
|
||||
app['backup'] = backup
|
||||
else:
|
||||
app['db_path'] = config['path']['database']
|
||||
app['backup'] = backup or (app['db_path'] + '.backup')
|
||||
|
||||
# configure the order of tasks to run during app lifetime
|
||||
app.on_startup.append(setup_db_schema)
|
||||
app.on_startup.append(start_background_tasks)
|
||||
app.on_shutdown.append(close_comment_scheduler)
|
||||
app.on_shutdown.append(close_schedulers)
|
||||
app.on_cleanup.append(close_database_connections)
|
||||
app.on_cleanup.append(stop_reporting_errors)
|
||||
aiojobs.aiohttp.setup(app, **kwargs)
|
||||
|
||||
# Configure the routes
|
||||
app.add_routes([
|
||||
web.post('/api', api_endpoint),
|
||||
web.get('/', get_api_endpoint),
|
||||
|
|
|
@ -5,16 +5,13 @@ import asyncio
|
|||
from aiohttp import web
|
||||
from aiojobs.aiohttp import atomic
|
||||
|
||||
from src.server.misc import clean_input_params
|
||||
from src.server.misc import clean_input_params, report_error
|
||||
from src.database.queries import get_claim_comments
|
||||
from src.database.queries import get_comments_by_id, get_comment_ids
|
||||
from src.database.queries import get_channel_id_from_comment_id
|
||||
from src.database.queries import get_claim_hidden_comments
|
||||
from src.server.misc import is_valid_base_comment
|
||||
from src.server.misc import is_valid_credential_input
|
||||
from src.server.misc import make_error
|
||||
from src.database.writes import abandon_comment_if_authorized
|
||||
from src.database.writes import write_comment
|
||||
from src.database.writes import abandon_comment_if_authorized, create_comment
|
||||
from src.database.writes import hide_comments_where_authorized
|
||||
|
||||
|
||||
|
@ -47,11 +44,7 @@ def handle_get_claim_hidden_comments(app, kwargs):
|
|||
|
||||
|
||||
async def handle_create_comment(app, params):
|
||||
if is_valid_base_comment(**params) and is_valid_credential_input(**params):
|
||||
job = await app['comment_scheduler'].spawn(write_comment(app, params))
|
||||
return await job.wait()
|
||||
else:
|
||||
raise ValueError('base comment is invalid')
|
||||
return await create_comment(app, params)
|
||||
|
||||
|
||||
async def handle_abandon_comment(app, params):
|
||||
|
@ -59,7 +52,7 @@ async def handle_abandon_comment(app, params):
|
|||
|
||||
|
||||
async def handle_hide_comments(app, params):
|
||||
return await hide_comments_where_authorized(app, **params)
|
||||
return {'hidden': await hide_comments_where_authorized(app, **params)}
|
||||
|
||||
|
||||
METHODS = {
|
||||
|
@ -92,10 +85,11 @@ async def process_json(app, body: dict) -> dict:
|
|||
response['result'] = result
|
||||
except Exception as err:
|
||||
logger.exception(f'Got {type(err).__name__}:')
|
||||
if type(err) in (ValueError, TypeError):
|
||||
response['error'] = make_error('INVALID_PARAMS', err, app)
|
||||
if type(err) in (ValueError, TypeError): # param error, not too important
|
||||
response['error'] = make_error('INVALID_PARAMS', err)
|
||||
else:
|
||||
response['error'] = make_error('INTERNAL', err, app)
|
||||
response['error'] = make_error('INTERNAL', err)
|
||||
await app['webhooks'].spawn(report_error(app, err))
|
||||
|
||||
finally:
|
||||
end = time.time()
|
||||
|
@ -123,7 +117,7 @@ async def api_endpoint(request: web.Request):
|
|||
else:
|
||||
return web.json_response(await process_json(request.app, body))
|
||||
except Exception as e:
|
||||
return make_error('INVALID_REQUEST', e, request.app)
|
||||
return make_error('INVALID_REQUEST', e)
|
||||
|
||||
|
||||
async def get_api_endpoint(request: web.Request):
|
||||
|
|
|
@ -3,6 +3,7 @@ import hashlib
|
|||
import logging
|
||||
import re
|
||||
from json import JSONDecodeError
|
||||
from typing import List
|
||||
|
||||
import aiohttp
|
||||
import ecdsa
|
||||
|
@ -31,22 +32,65 @@ ERRORS = {
|
|||
}
|
||||
|
||||
|
||||
def make_error(error, exc=None, app=None) -> dict:
|
||||
def make_error(error, exc=None) -> dict:
|
||||
body = ERRORS[error] if error in ERRORS else ERRORS['INTERNAL']
|
||||
try:
|
||||
if exc:
|
||||
exc_name = type(exc).__name__
|
||||
body.update({exc_name: str(exc)})
|
||||
|
||||
if app:
|
||||
app['errors'].put_nowait({
|
||||
"text": f"Got `{exc_name}`: ```\n{exc}```"
|
||||
})
|
||||
|
||||
finally:
|
||||
return body
|
||||
|
||||
|
||||
async def report_error(app, exc, msg=''):
|
||||
try:
|
||||
if 'slack_webhook' in app['config']:
|
||||
if msg:
|
||||
msg = f'"{msg}"'
|
||||
body = {
|
||||
"text": f"Got `{type(exc).__name__}`: ```\n{str(exc)}```\n{msg}"
|
||||
}
|
||||
async with aiohttp.ClientSession() as sesh:
|
||||
async with sesh.post(app['config']['slack_webhook'], json=body) as resp:
|
||||
await resp.wait_for_close()
|
||||
|
||||
except Exception:
|
||||
logger.critical('Error while logging to slack webhook')
|
||||
|
||||
|
||||
async def send_notifications(app, action: str, comments: List[dict]):
|
||||
events = create_notification_batch(action, comments)
|
||||
async with aiohttp.ClientSession() as session:
|
||||
for event in events:
|
||||
event.update(auth_token=app['config']['notifications']['auth_token'])
|
||||
try:
|
||||
async with session.get(app['config']['notifications']['url'], params=event) as resp:
|
||||
logger.debug(f'Completed Notification: {await resp.text()}, HTTP Status: {resp.status}')
|
||||
except Exception:
|
||||
logger.exception(f'Error requesting internal API, Status {resp.status}: {resp.text()}, '
|
||||
f'comment_id: {event["comment_id"]}')
|
||||
|
||||
|
||||
async def send_notification(app, action: str, comment: dict):
|
||||
await send_notifications(app, action, [comment])
|
||||
|
||||
|
||||
def create_notification_batch(action: str, comments: List[dict]) -> List[dict]:
|
||||
action_type = action[0].capitalize() # to turn Create -> C, edit -> U, delete -> D
|
||||
events = []
|
||||
for comment in comments:
|
||||
event = {
|
||||
'action_type': action_type,
|
||||
'comment_id': comment['comment_id'],
|
||||
'claim_id': comment['claim_id']
|
||||
}
|
||||
if comment.get('channel_id'):
|
||||
event['channel_id'] = comment['channel_id']
|
||||
events.append(event)
|
||||
return events
|
||||
|
||||
|
||||
async def request_lbrynet(app, method, **params):
|
||||
body = {'method': method, 'params': {**params}}
|
||||
try:
|
||||
|
@ -66,7 +110,7 @@ async def request_lbrynet(app, method, **params):
|
|||
|
||||
|
||||
async def get_claim_from_id(app, claim_id, **kwargs):
|
||||
return (await request_lbrynet(app, 'claim_search', no_totals=True, claim_id=claim_id, **kwargs))['items'][0]
|
||||
return (await request_lbrynet(app, 'claim_search', claim_id=claim_id, **kwargs))['items'][0]
|
||||
|
||||
|
||||
def get_encoded_signature(signature):
|
||||
|
|
Loading…
Add table
Reference in a new issue