monitor wallet server block transitions and report if new block is excessively delayed
This commit is contained in:
parent
ff73418fc1
commit
956e21269e
2 changed files with 49 additions and 20 deletions
|
@ -54,7 +54,10 @@ class LBRYSessionManager(SessionManager):
|
|||
|
||||
async def process_metrics(self):
|
||||
while self.running:
|
||||
data = self.metrics.to_json_and_reset({'sessions': self.session_count()})
|
||||
data = self.metrics.to_json_and_reset({
|
||||
'sessions': self.session_count(),
|
||||
'height': self.db.db_height,
|
||||
})
|
||||
if self.websocket is not None:
|
||||
self.websocket.send_message(data)
|
||||
await asyncio.sleep(1)
|
||||
|
|
|
@ -3,6 +3,8 @@ import json
|
|||
import random
|
||||
import asyncio
|
||||
import argparse
|
||||
import traceback
|
||||
from time import time
|
||||
from datetime import datetime
|
||||
|
||||
try:
|
||||
|
@ -64,19 +66,26 @@ async def handle_analytics_event(cursor, event, server):
|
|||
""", list(data.values()))
|
||||
|
||||
|
||||
async def boris_says(slackclient, what_boris_says):
|
||||
if slackclient:
|
||||
await slackclient.chat_postMessage(
|
||||
SLACKCLIENT = None
|
||||
|
||||
|
||||
async def boris_says(what_boris_says):
|
||||
if SLACKCLIENT:
|
||||
await SLACKCLIENT.chat_postMessage(
|
||||
username="boris the wallet monitor",
|
||||
icon_emoji=":boris:",
|
||||
channel='#tech-sdk',
|
||||
text=what_boris_says
|
||||
)
|
||||
else:
|
||||
print(what_boris_says)
|
||||
|
||||
|
||||
async def monitor(db, server, slackclient):
|
||||
async def monitor(db, server):
|
||||
c = db.cursor()
|
||||
delay = 30
|
||||
height_changed = None, time()
|
||||
height_change_reported = False
|
||||
first_attempt = True
|
||||
while True:
|
||||
try:
|
||||
|
@ -86,43 +95,59 @@ async def monitor(db, server, slackclient):
|
|||
except (aiohttp.ClientConnectorError, asyncio.TimeoutError):
|
||||
if first_attempt:
|
||||
print(f"failed connecting to {server}")
|
||||
await boris_says(slackclient, random.choice([
|
||||
await boris_says(random.choice([
|
||||
f"{server} is not responding, probably dead, will not connect again.",
|
||||
]))
|
||||
return
|
||||
raise
|
||||
|
||||
if first_attempt:
|
||||
await boris_says(slackclient, f"{server} is online")
|
||||
await boris_says(f"{server} is online")
|
||||
else:
|
||||
await boris_says(slackclient, f"{server} is back online")
|
||||
await boris_says(f"{server} is back online")
|
||||
|
||||
delay = 30
|
||||
first_attempt = False
|
||||
print(f"connected to {server}")
|
||||
|
||||
async for msg in ws:
|
||||
await handle_analytics_event(c, json.loads(msg.data), server)
|
||||
event = json.loads(msg.data)
|
||||
height = event['status']['height']
|
||||
height_change_time = int(time()-height_changed[1])
|
||||
if height_changed[0] != height:
|
||||
height_changed = (height, time())
|
||||
if height_change_reported:
|
||||
await boris_says(
|
||||
f"Server {server} received new block after {height_change_time / 60:.1f} minutes.",
|
||||
)
|
||||
height_change_reported = False
|
||||
elif height_change_time > 10*60:
|
||||
if not height_change_reported or height_change_time % (2*60) == 0:
|
||||
await boris_says(
|
||||
f"It's been {height_change_time/60:.1f} minutes since {server} received a new block.",
|
||||
)
|
||||
height_change_reported = True
|
||||
await handle_analytics_event(c, event, server)
|
||||
db.commit()
|
||||
|
||||
except (aiohttp.ClientConnectorError, asyncio.TimeoutError):
|
||||
await boris_says(slackclient, random.choice([
|
||||
f"Guys, we have a problem! Nobody home at {server}. Will check on it again in {delay} seconds.",
|
||||
f"Something wrong with {server}. I think dead. Will poke it again in {delay} seconds.",
|
||||
f"Don't hear anything from {server}, maybe dead. Will try it again in {delay} seconds.",
|
||||
await boris_says(random.choice([
|
||||
f"<!channel> Guys, we have a problem! Nobody home at {server}. Will check on it again in {delay} seconds.",
|
||||
f"<!channel> Something wrong with {server}. I think dead. Will poke it again in {delay} seconds.",
|
||||
f"<!channel> Don't hear anything from {server}, maybe dead. Will try it again in {delay} seconds.",
|
||||
]))
|
||||
await asyncio.sleep(delay)
|
||||
delay += 30
|
||||
|
||||
|
||||
async def main(dsn, servers, slackclient):
|
||||
async def main(dsn, servers):
|
||||
db = ensure_database(dsn)
|
||||
await boris_says(slackclient, random.choice([
|
||||
await boris_says(random.choice([
|
||||
"No fear, Boris is here! I will monitor the servers now and will try not to fall asleep again.",
|
||||
"Comrad the Cat and Boris are here now, monitoring wallet servers.",
|
||||
]))
|
||||
await asyncio.gather(*(
|
||||
asyncio.create_task(monitor(db, server, slackclient))
|
||||
asyncio.create_task(monitor(db, server))
|
||||
for server in servers
|
||||
))
|
||||
|
||||
|
@ -288,16 +313,17 @@ def get_args():
|
|||
if __name__ == "__main__":
|
||||
loop = asyncio.get_event_loop()
|
||||
args = get_args()
|
||||
slackclient = get_slack_client(args)
|
||||
SLACKCLIENT = get_slack_client(args)
|
||||
try:
|
||||
loop.run_until_complete(main(get_dsn(args), get_servers(args), slackclient))
|
||||
loop.run_until_complete(main(get_dsn(args), get_servers(args)))
|
||||
except KeyboardInterrupt as e:
|
||||
pass
|
||||
except Exception as e:
|
||||
loop.run_until_complete(boris_says(slackclient, repr(e)))
|
||||
loop.run_until_complete(boris_says("<!channel> I crashed with the following exception:"))
|
||||
loop.run_until_complete(boris_says(traceback.format_exc()))
|
||||
finally:
|
||||
loop.run_until_complete(
|
||||
boris_says(slackclient, random.choice([
|
||||
boris_says(random.choice([
|
||||
"Wallet servers will have to watch themselves, I'm leaving now.",
|
||||
"I'm going to go take a nap, hopefully nothing blows up while I'm gone.",
|
||||
"Babushka is calling, I'll be back later, someone else watch the servers while I'm gone.",
|
||||
|
|
Loading…
Reference in a new issue