Merge pull request #15 from lbryio/webhook

Adds slack webhook for error logging
This commit is contained in:
Oleg Silkin 2019-09-06 20:32:30 -04:00 committed by GitHub
commit bf2bdcd7a4
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
4 changed files with 48 additions and 6 deletions

View file

@ -3,10 +3,13 @@ import logging
import pathlib import pathlib
import signal import signal
import time import time
import queue
import aiojobs import aiojobs
import aiojobs.aiohttp import aiojobs.aiohttp
import asyncio import asyncio
import aiohttp
from aiohttp import web from aiohttp import web
from src.database.queries import setup_database, backup_database from src.database.queries import setup_database, backup_database
@ -35,6 +38,24 @@ async def database_backup_routine(app):
pass pass
async def post_errors_to_slack(client, app):
while app['errors'].qsize() > 0:
msg = app['errors'].get_nowait()
if 'slack_webhook' in app['config']:
await client.post(app['config']['slack_webhook'], json=msg)
async def report_errors_to_slack_webhook(app):
async with aiohttp.ClientSession() as client:
try:
while True:
await asyncio.shield(post_errors_to_slack(client, app))
await asyncio.sleep(300)
except asyncio.CancelledError:
await asyncio.shield(post_errors_to_slack(client, app))
async def start_background_tasks(app): async def start_background_tasks(app):
app['reader'] = obtain_connection(app['db_path'], True) app['reader'] = obtain_connection(app['db_path'], True)
app['waitful_backup'] = asyncio.create_task(database_backup_routine(app)) app['waitful_backup'] = asyncio.create_task(database_backup_routine(app))
@ -42,6 +63,9 @@ async def start_background_tasks(app):
app['db_writer'] = DatabaseWriter(app['db_path']) app['db_writer'] = DatabaseWriter(app['db_path'])
app['writer'] = app['db_writer'].connection app['writer'] = app['db_writer'].connection
app['errors'] = queue.Queue()
app['slack_webhook'] = asyncio.create_task(report_errors_to_slack_webhook(app))
async def close_database_connections(app): async def close_database_connections(app):
logger.info('Ending background backup loop') logger.info('Ending background backup loop')
@ -52,6 +76,11 @@ async def close_database_connections(app):
app['db_writer'].cleanup() app['db_writer'].cleanup()
async def stop_reporting_errors(app):
app['slack_webhook'].cancel()
await app['slack_webhook']
async def close_comment_scheduler(app): async def close_comment_scheduler(app):
logger.info('Closing comment_scheduler') logger.info('Closing comment_scheduler')
await app['comment_scheduler'].close() await app['comment_scheduler'].close()
@ -72,6 +101,7 @@ class CommentDaemon:
app.on_startup.append(start_background_tasks) app.on_startup.append(start_background_tasks)
app.on_shutdown.append(close_comment_scheduler) app.on_shutdown.append(close_comment_scheduler)
app.on_cleanup.append(close_database_connections) app.on_cleanup.append(close_database_connections)
app.on_cleanup.append(stop_reporting_errors)
aiojobs.aiohttp.setup(app, **kwargs) aiojobs.aiohttp.setup(app, **kwargs)
app.add_routes([ app.add_routes([
web.post('/api', api_endpoint), web.post('/api', api_endpoint),

View file

@ -91,11 +91,12 @@ async def process_json(app, body: dict) -> dict:
result = METHODS[method](app, params) result = METHODS[method](app, params)
response['result'] = result response['result'] = result
except Exception as err: except Exception as err:
logger.exception(f'Got {type(err).__name__}: {err}') logger.exception(f'Got {type(err).__name__}:')
if type(err) in (ValueError, TypeError): if type(err) in (ValueError, TypeError):
response['error'] = make_error('INVALID_PARAMS', err) response['error'] = make_error('INVALID_PARAMS', err, app)
else: else:
response['error'] = make_error('INTERNAL', err) response['error'] = make_error('INTERNAL', err, app)
finally: finally:
end = time.time() end = time.time()
logger.debug(f'Time taken to process {method}: {end - start} secs') logger.debug(f'Time taken to process {method}: {end - start} secs')
@ -122,7 +123,7 @@ async def api_endpoint(request: web.Request):
else: else:
return web.json_response(await process_json(request.app, body)) return web.json_response(await process_json(request.app, body))
except Exception as e: except Exception as e:
return make_error('INVALID_REQUEST', e) return make_error('INVALID_REQUEST', e, request.app)
async def get_api_endpoint(request: web.Request): async def get_api_endpoint(request: web.Request):

View file

@ -31,11 +31,18 @@ ERRORS = {
} }
def make_error(error, exc=None) -> dict: def make_error(error, exc=None, app=None) -> dict:
body = ERRORS[error] if error in ERRORS else ERRORS['INTERNAL'] body = ERRORS[error] if error in ERRORS else ERRORS['INTERNAL']
try: try:
if exc: if exc:
body.update({type(exc).__name__: str(exc)}) exc_name = type(exc).__name__
body.update({exc_name: str(exc)})
if app:
app['errors'].put_nowait({
"text": f"Got `{exc_name}`: ```\n{exc}```"
})
finally: finally:
return body return body

View file

@ -13,6 +13,10 @@ from src.server import app
from test.testcase import AsyncioTestCase from test.testcase import AsyncioTestCase
if 'slack_webhook' in config:
config.pop('slack_webhook')
fake = faker.Faker() fake = faker.Faker()
fake.add_provider(internet) fake.add_provider(internet)
fake.add_provider(lorem) fake.add_provider(lorem)