From 956e21269ebaf49868e3d80e2c561114b8689d32 Mon Sep 17 00:00:00 2001 From: Lex Berezhny Date: Mon, 19 Aug 2019 09:17:23 -0400 Subject: [PATCH] monitor wallet server block transitions and report if new block is excessively delayed --- lbry/lbry/wallet/server/session.py | 5 ++- lbry/scripts/wallet_server_monitor.py | 64 +++++++++++++++++++-------- 2 files changed, 49 insertions(+), 20 deletions(-) diff --git a/lbry/lbry/wallet/server/session.py b/lbry/lbry/wallet/server/session.py index 7b861c934..2f3c29314 100644 --- a/lbry/lbry/wallet/server/session.py +++ b/lbry/lbry/wallet/server/session.py @@ -54,7 +54,10 @@ class LBRYSessionManager(SessionManager): async def process_metrics(self): while self.running: - data = self.metrics.to_json_and_reset({'sessions': self.session_count()}) + data = self.metrics.to_json_and_reset({ + 'sessions': self.session_count(), + 'height': self.db.db_height, + }) if self.websocket is not None: self.websocket.send_message(data) await asyncio.sleep(1) diff --git a/lbry/scripts/wallet_server_monitor.py b/lbry/scripts/wallet_server_monitor.py index 3e7240f80..4b98814ef 100644 --- a/lbry/scripts/wallet_server_monitor.py +++ b/lbry/scripts/wallet_server_monitor.py @@ -3,6 +3,8 @@ import json import random import asyncio import argparse +import traceback +from time import time from datetime import datetime try: @@ -64,19 +66,26 @@ async def handle_analytics_event(cursor, event, server): """, list(data.values())) -async def boris_says(slackclient, what_boris_says): - if slackclient: - await slackclient.chat_postMessage( +SLACKCLIENT = None + + +async def boris_says(what_boris_says): + if SLACKCLIENT: + await SLACKCLIENT.chat_postMessage( username="boris the wallet monitor", icon_emoji=":boris:", channel='#tech-sdk', text=what_boris_says ) + else: + print(what_boris_says) -async def monitor(db, server, slackclient): +async def monitor(db, server): c = db.cursor() delay = 30 + height_changed = None, time() + height_change_reported = False first_attempt = True while True: try: @@ -86,43 +95,59 @@ async def monitor(db, server, slackclient): except (aiohttp.ClientConnectorError, asyncio.TimeoutError): if first_attempt: print(f"failed connecting to {server}") - await boris_says(slackclient, random.choice([ + await boris_says(random.choice([ f"{server} is not responding, probably dead, will not connect again.", ])) return raise if first_attempt: - await boris_says(slackclient, f"{server} is online") + await boris_says(f"{server} is online") else: - await boris_says(slackclient, f"{server} is back online") + await boris_says(f"{server} is back online") delay = 30 first_attempt = False print(f"connected to {server}") async for msg in ws: - await handle_analytics_event(c, json.loads(msg.data), server) + event = json.loads(msg.data) + height = event['status']['height'] + height_change_time = int(time()-height_changed[1]) + if height_changed[0] != height: + height_changed = (height, time()) + if height_change_reported: + await boris_says( + f"Server {server} received new block after {height_change_time / 60:.1f} minutes.", + ) + height_change_reported = False + elif height_change_time > 10*60: + if not height_change_reported or height_change_time % (2*60) == 0: + await boris_says( + f"It's been {height_change_time/60:.1f} minutes since {server} received a new block.", + ) + height_change_reported = True + await handle_analytics_event(c, event, server) db.commit() except (aiohttp.ClientConnectorError, asyncio.TimeoutError): - await boris_says(slackclient, random.choice([ - f"Guys, we have a problem! Nobody home at {server}. Will check on it again in {delay} seconds.", - f"Something wrong with {server}. I think dead. Will poke it again in {delay} seconds.", - f"Don't hear anything from {server}, maybe dead. Will try it again in {delay} seconds.", + await boris_says(random.choice([ + f" Guys, we have a problem! Nobody home at {server}. Will check on it again in {delay} seconds.", + f" Something wrong with {server}. I think dead. Will poke it again in {delay} seconds.", + f" Don't hear anything from {server}, maybe dead. Will try it again in {delay} seconds.", ])) await asyncio.sleep(delay) delay += 30 -async def main(dsn, servers, slackclient): +async def main(dsn, servers): db = ensure_database(dsn) - await boris_says(slackclient, random.choice([ + await boris_says(random.choice([ "No fear, Boris is here! I will monitor the servers now and will try not to fall asleep again.", "Comrad the Cat and Boris are here now, monitoring wallet servers.", ])) await asyncio.gather(*( - asyncio.create_task(monitor(db, server, slackclient)) + asyncio.create_task(monitor(db, server)) for server in servers )) @@ -288,16 +313,17 @@ def get_args(): if __name__ == "__main__": loop = asyncio.get_event_loop() args = get_args() - slackclient = get_slack_client(args) + SLACKCLIENT = get_slack_client(args) try: - loop.run_until_complete(main(get_dsn(args), get_servers(args), slackclient)) + loop.run_until_complete(main(get_dsn(args), get_servers(args))) except KeyboardInterrupt as e: pass except Exception as e: - loop.run_until_complete(boris_says(slackclient, repr(e))) + loop.run_until_complete(boris_says(" I crashed with the following exception:")) + loop.run_until_complete(boris_says(traceback.format_exc())) finally: loop.run_until_complete( - boris_says(slackclient, random.choice([ + boris_says(random.choice([ "Wallet servers will have to watch themselves, I'm leaving now.", "I'm going to go take a nap, hopefully nothing blows up while I'm gone.", "Babushka is calling, I'll be back later, someone else watch the servers while I'm gone.",