forked from LBRYCommunity/lbry-sdk
updated wallet_server_monitor to update slack
This commit is contained in:
parent
ee10ebd5d7
commit
1c12247b71
1 changed file with 108 additions and 48 deletions
|
@ -1,5 +1,6 @@
|
||||||
import sys
|
import sys
|
||||||
import json
|
import json
|
||||||
|
import random
|
||||||
import asyncio
|
import asyncio
|
||||||
import argparse
|
import argparse
|
||||||
from datetime import datetime
|
from datetime import datetime
|
||||||
|
@ -7,10 +8,11 @@ from datetime import datetime
|
||||||
try:
|
try:
|
||||||
import aiohttp
|
import aiohttp
|
||||||
import psycopg2
|
import psycopg2
|
||||||
|
import slack
|
||||||
except ImportError:
|
except ImportError:
|
||||||
print(f"To run {sys.argv[0]} you need to install aiohttp and psycopg2:")
|
print(f"To run {sys.argv[0]} you need to install aiohttp, psycopg2 and slackclient:")
|
||||||
print(f"")
|
print(f"")
|
||||||
print(f" $ pip install aiohttp psycopg2")
|
print(f" $ pip install aiohttp psycopg2 slackclient")
|
||||||
print("")
|
print("")
|
||||||
sys.exit(1)
|
sys.exit(1)
|
||||||
|
|
||||||
|
@ -19,58 +21,99 @@ if not sys.version_info >= (3, 7):
|
||||||
sys.exit(1)
|
sys.exit(1)
|
||||||
|
|
||||||
|
|
||||||
async def monitor(db, server):
|
async def handle_analytics_event(cursor, event, server):
    """Store one analytics event received from a wallet server.

    One row goes into ``wallet_server_stats`` carrying the session count read
    from ``event['status']['sessions']``. After that, one row per command
    found under ``event['api']`` goes into ``wallet_server_command_stats``.

    For every command, stats whose key ends with ``_queries`` are skipped.
    A stat whose value is a list is spread over eight columns named after the
    key plus a suffix, read from list positions 0 through 7. Any other value
    is stored under the key unchanged.

    Only ``cursor.execute`` is called here; committing is up to the caller.
    """
    # Column suffixes, in the order their values appear in a list-valued stat.
    list_suffixes = (
        '_avg',
        '_min',
        '_five',
        '_twenty_five',
        '_fifty',
        '_seventy_five',
        '_ninety_five',
        '_max',
    )

    session_count = event['status']['sessions']
    cursor.execute("""
        INSERT INTO wallet_server_stats (server, sessions, event_time) VALUES (%s,%s,%s);
    """, (server, session_count, datetime.now()))

    for command, stats in event["api"].items():
        row = {
            'server': server,
            'command': command,
            'event_time': datetime.now()
        }
        for name, value in stats.items():
            if name.endswith("_queries"):
                continue
            if not isinstance(value, list):
                row[name] = value
                continue
            # Indexing (rather than zip) keeps the IndexError on short lists.
            row.update({
                name + suffix: value[position]
                for position, suffix in enumerate(list_suffixes)
            })

        # NOTE(review): column names come straight from the event keys and are
        # interpolated into the statement; only the values are parameterized.
        # Presumably the event source is trusted - confirm.
        cursor.execute(f"""
            INSERT INTO wallet_server_command_stats ({','.join(row)})
            VALUES ({','.join('%s' for _ in row)});
        """, list(row.values()))
|
||||||
|
|
||||||
|
|
||||||
|
async def boris_says(slackclient, what_boris_says):
    """Post ``what_boris_says`` to Slack as "boris the wallet monitor".

    Does nothing when ``slackclient`` is falsy, so callers can pass ``None``
    when no Slack client was configured.
    """
    if not slackclient:
        return

    message = {
        'username': "boris the wallet monitor",
        'icon_emoji': ":boris:",
        'channel': '#tech-sdk',
        'text': what_boris_says,
    }
    await slackclient.chat_postMessage(**message)
|
||||||
|
|
||||||
|
|
||||||
|
async def monitor(db, server, slackclient):
    """Stream analytics events from one wallet server into the database.

    Opens a websocket to ``server``, hands every received message (decoded as
    JSON) to ``handle_analytics_event`` and commits ``db`` after each one.
    Connection state changes are announced through ``boris_says``.

    If the very first connection attempt fails, the failure is reported and
    the function returns. After a successful connection, later connection
    errors and timeouts are reported and retried after ``delay`` seconds; the
    delay grows by 30 seconds per failure and resets to 30 once connected.

    Args:
        db: Database connection; ``cursor()`` and ``commit()`` are used.
        server: URL passed to ``session.ws_connect``.
        slackclient: Slack client forwarded to ``boris_says`` (may be falsy).
    """
    c = db.cursor()
    delay = 30
    first_attempt = True
    while True:
        try:
            async with aiohttp.ClientSession(timeout=aiohttp.ClientTimeout(10)) as session:
                try:
                    ws = await session.ws_connect(server)
                except (aiohttp.ClientConnectorError, asyncio.TimeoutError):
                    if first_attempt:
                        print(f"failed connecting to {server}")
                        await boris_says(slackclient, random.choice([
                            f"{server} is not responding, probably dead, will not connect again.",
                        ]))
                        return
                    # Not the first attempt: let the outer handler report the
                    # outage and schedule the retry.
                    raise

                if first_attempt:
                    await boris_says(slackclient, f"{server} is online")
                else:
                    await boris_says(slackclient, f"{server} is back online")

                delay = 30
                first_attempt = False
                print(f"connected to {server}")

                async for msg in ws:
                    await handle_analytics_event(c, json.loads(msg.data), server)
                    db.commit()

        except (aiohttp.ClientConnectorError, asyncio.TimeoutError):
            # Every entry needs its trailing comma: without it adjacent
            # f-strings are concatenated into a single, doubled message.
            await boris_says(slackclient, random.choice([
                f"Guys, we have a problem! Nobody home at {server}. Will check on it again in {delay} seconds.",
                f"Something wrong with {server}. I think dead. Will poke it again in {delay} seconds.",
                f"Don't hear anything from {server}, maybe dead. Will try it again in {delay} seconds.",
            ]))
            await asyncio.sleep(delay)
            delay += 30
|
|
||||||
|
|
||||||
|
|
||||||
async def main(dsn, servers):
|
async def main(dsn, servers, slackclient):
    """Announce startup on Slack, then monitor every server concurrently.

    Obtains the database connection from ``ensure_database(dsn)``, posts a
    randomly chosen greeting through ``boris_says`` and then waits on one
    ``monitor`` task per entry in ``servers``.
    """
    db = ensure_database(dsn)

    greeting = random.choice([
        "No fear, Boris is here! I will monitor the servers now and will try not to fall asleep again.",
        "Comrad the Cat and Boris are here now, monitoring wallet servers.",
    ])
    await boris_says(slackclient, greeting)

    monitor_tasks = [
        asyncio.create_task(monitor(db, server, slackclient))
        for server in servers
    ]
    await asyncio.gather(*monitor_tasks)
|
||||||
|
|
||||||
|
@ -199,6 +242,11 @@ def get_servers(args):
|
||||||
]
|
]
|
||||||
|
|
||||||
|
|
||||||
|
def get_slack_client(args):
    """Build the Slack client used for status messages.

    Returns a ``slack.WebClient`` created with ``run_async=True`` when
    ``args.slack_token`` is set, and ``None`` otherwise.
    """
    token = args.slack_token
    if not token:
        return None
    return slack.WebClient(token=token, run_async=True)
|
||||||
|
|
||||||
|
|
||||||
def get_args():
|
def get_args():
|
||||||
parser = argparse.ArgumentParser(formatter_class=argparse.ArgumentDefaultsHelpFormatter)
|
parser = argparse.ArgumentParser(formatter_class=argparse.ArgumentDefaultsHelpFormatter)
|
||||||
parser.add_argument("--pg-dbname", default="analytics", help="PostgreSQL database name")
|
parser.add_argument("--pg-dbname", default="analytics", help="PostgreSQL database name")
|
||||||
|
@ -208,9 +256,21 @@ def get_args():
|
||||||
parser.add_argument("--pg-port", default="5432", help="PostgreSQL port")
|
parser.add_argument("--pg-port", default="5432", help="PostgreSQL port")
|
||||||
parser.add_argument("--server-url", default="http://spv{}.lbry.com:50005", help="URL with '{}' placeholder")
|
parser.add_argument("--server-url", default="http://spv{}.lbry.com:50005", help="URL with '{}' placeholder")
|
||||||
parser.add_argument("--server-range", default="1..5", help="Range of numbers or single number to use in URL placeholder")
|
parser.add_argument("--server-range", default="1..5", help="Range of numbers or single number to use in URL placeholder")
|
||||||
|
parser.add_argument("--slack-token")
|
||||||
return parser.parse_args()
|
return parser.parse_args()
|
||||||
|
|
||||||
|
|
||||||
if __name__ == "__main__":
    # Create and install the loop explicitly: asyncio.get_event_loop() is
    # deprecated when no loop is running and fails on the newest Python
    # releases. The same loop is reused for the farewell message below.
    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)
    args = get_args()
    slackclient = get_slack_client(args)
    try:
        loop.run_until_complete(main(get_dsn(args), get_servers(args), slackclient))
    finally:
        try:
            # Say goodbye on Slack however main() ended (including Ctrl-C).
            loop.run_until_complete(
                boris_says(slackclient, random.choice([
                    "Wallet servers will have to watch themselves, I'm leaving now.",
                    "I'm going to go take a nap, hopefully nothing blows up while I'm gone.",
                    "Babushka is calling, I'll be back later, someone else watch the servers while I'm gone.",
                ]))
            )
        finally:
            # Release the loop's resources; the original never closed it.
            loop.close()
|
||||||
|
|
Loading…
Reference in a new issue