diff --git a/lbry/scripts/wallet_server_monitor.py b/lbry/scripts/wallet_server_monitor.py index cf7273386..53942b876 100644 --- a/lbry/scripts/wallet_server_monitor.py +++ b/lbry/scripts/wallet_server_monitor.py @@ -1,5 +1,6 @@ import sys import json +import random import asyncio import argparse from datetime import datetime @@ -7,10 +8,11 @@ from datetime import datetime try: import aiohttp import psycopg2 + import slack except ImportError: - print(f"To run {sys.argv[0]} you need to install aiohttp and psycopg2:") + print(f"To run {sys.argv[0]} you need to install aiohttp, psycopg2 and slackclient:") print(f"") - print(f" $ pip install aiohttp psycopg2") + print(f" $ pip install aiohttp psycopg2 slackclient") print("") sys.exit(1) @@ -19,58 +21,99 @@ if not sys.version_info >= (3, 7): sys.exit(1) -async def monitor(db, server): +async def handle_analytics_event(cursor, event, server): + cursor.execute(""" + INSERT INTO wallet_server_stats (server, sessions, event_time) VALUES (%s,%s,%s); + """, (server, event['status']['sessions'], datetime.now())) + + for command, stats in event["api"].items(): + data = { + 'server': server, + 'command': command, + 'event_time': datetime.now() + } + for key, value in stats.items(): + if key.endswith("_queries"): + continue + if isinstance(value, list): + data.update({ + key + '_avg': value[0], + key + '_min': value[1], + key + '_five': value[2], + key + '_twenty_five': value[3], + key + '_fifty': value[4], + key + '_seventy_five': value[5], + key + '_ninety_five': value[6], + key + '_max': value[7], + }) + else: + data[key] = value + + cursor.execute(f""" + INSERT INTO wallet_server_command_stats ({','.join(data)}) + VALUES ({','.join('%s' for _ in data)}); + """, list(data.values())) + + +async def boris_says(slackclient, what_boris_says): + if slackclient: + await slackclient.chat_postMessage( + username="boris the wallet monitor", + icon_emoji=":boris:", + channel='#tech-sdk', + text=what_boris_says + ) + + +async def monitor(db, server, slackclient): c = db.cursor() - async with aiohttp.ClientSession(timeout=aiohttp.ClientTimeout(10)) as session: + delay = 30 + first_attempt = True + while True: try: - ws = await session.ws_connect(server) + async with aiohttp.ClientSession(timeout=aiohttp.ClientTimeout(10)) as session: + try: + ws = await session.ws_connect(server) + except (aiohttp.ClientConnectorError, asyncio.TimeoutError): + if first_attempt: + print(f"failed connecting to {server}") + await boris_says(slackclient, random.choice([ + f"{server} is not responding, probably dead, will not connect again.", + ])) + return + raise + + if first_attempt: + await boris_says(slackclient, f"{server} is online") + else: + await boris_says(slackclient, f"{server} is back online") + + delay = 30 + first_attempt = False + print(f"connected to {server}") + + async for msg in ws: + await handle_analytics_event(c, json.loads(msg.data), server) + db.commit() + except (aiohttp.ClientConnectorError, asyncio.TimeoutError): - print(f"failed connecting to {server}") - return - print(f"connected to {server}") - - async for msg in ws: - r = json.loads(msg.data) - - c.execute(""" - INSERT INTO wallet_server_stats (server, sessions, event_time) VALUES (%s,%s,%s); - """, (server, r['status']['sessions'], datetime.now())) - - for command, stats in r["api"].items(): - data = { - 'server': server, - 'command': command, - 'event_time': datetime.now() - } - for key, value in stats.items(): - if key.endswith("_queries"): - continue - if isinstance(value, list): - data.update({ - key+'_avg': value[0], - key+'_min': value[1], - key+'_five': value[2], - key+'_twenty_five': value[3], - key+'_fifty': value[4], - key+'_seventy_five': value[5], - key+'_ninety_five': value[6], - key+'_max': value[7], - }) - else: - data[key] = value - - c.execute(f""" - INSERT INTO wallet_server_command_stats ({','.join(data)}) - VALUES ({','.join('%s' for _ in data)}); - """, list(data.values())) - - db.commit() + await boris_says(slackclient, random.choice([ + f"Guys, we have a problem! Nobody home at {server}. Will check on it again in {delay} seconds." + f"Something wrong with {server}. I think dead. Will poke it again in {delay} seconds.", + f"Don't hear anything from {server}, maybe dead. Will try it again in {delay} seconds." + ])) + await asyncio.sleep(delay) + delay += 30 -async def main(dsn, servers): +async def main(dsn, servers, slackclient): db = ensure_database(dsn) + await boris_says(slackclient, random.choice([ + "No fear, Boris is here! I will monitor the servers now and will try not to fall asleep again.", + "Comrad the Cat and Boris are here now, monitoring wallet servers.", + ])) await asyncio.gather(*( - asyncio.create_task(monitor(db, server)) + asyncio.create_task(monitor(db, server, slackclient)) for server in servers )) @@ -199,6 +242,11 @@ def get_servers(args): ] +def get_slack_client(args): + if args.slack_token: + return slack.WebClient(token=args.slack_token, run_async=True) + + def get_args(): parser = argparse.ArgumentParser(formatter_class=argparse.ArgumentDefaultsHelpFormatter) parser.add_argument("--pg-dbname", default="analytics", help="PostgreSQL database name") @@ -208,9 +256,21 @@ def get_args(): parser.add_argument("--pg-port", default="5432", help="PostgreSQL port") parser.add_argument("--server-url", default="http://spv{}.lbry.com:50005", help="URL with '{}' placeholder") parser.add_argument("--server-range", default="1..5", help="Range of numbers or single number to use in URL placeholder") + parser.add_argument("--slack-token") return parser.parse_args() if __name__ == "__main__": + loop = asyncio.get_event_loop() args = get_args() - asyncio.run(main(get_dsn(args), get_servers(args))) + slackclient = get_slack_client(args) + try: + loop.run_until_complete(main(get_dsn(args), get_servers(args), slackclient)) + finally: + loop.run_until_complete( + boris_says(slackclient, random.choice([ + "Wallet servers will have to watch themselves, I'm leaving now.", + "I'm going to go take a nap, hopefully nothing blows up while I'm gone.", + "Babushka is calling, I'll be back later, someone else watch the servers while I'm gone.", + ])) + )