From a58ed6f331bf5c217f03f171b0079b0542e51e4c Mon Sep 17 00:00:00 2001 From: Lex Berezhny Date: Mon, 5 Aug 2019 09:23:47 -0400 Subject: [PATCH] wallet server metabase analytics script --- .../wallet_server_metabase_analytics.py | 216 ++++++++++++++++++ 1 file changed, 216 insertions(+) create mode 100644 lbry/scripts/wallet_server_metabase_analytics.py diff --git a/lbry/scripts/wallet_server_metabase_analytics.py b/lbry/scripts/wallet_server_metabase_analytics.py new file mode 100644 index 000000000..cf7273386 --- /dev/null +++ b/lbry/scripts/wallet_server_metabase_analytics.py @@ -0,0 +1,216 @@ +import sys +import json +import asyncio +import argparse +from datetime import datetime + +try: + import aiohttp + import psycopg2 +except ImportError: + print(f"To run {sys.argv[0]} you need to install aiohttp and psycopg2:") + print(f"") + print(f" $ pip install aiohttp psycopg2") + print("") + sys.exit(1) + +if not sys.version_info >= (3, 7): + print("Please use Python 3.7 or higher, this script expects that dictionary keys preserve order.") + sys.exit(1) + + +async def monitor(db, server): + c = db.cursor() + async with aiohttp.ClientSession(timeout=aiohttp.ClientTimeout(10)) as session: + try: + ws = await session.ws_connect(server) + except (aiohttp.ClientConnectorError, asyncio.TimeoutError): + print(f"failed connecting to {server}") + return + print(f"connected to {server}") + + async for msg in ws: + r = json.loads(msg.data) + + c.execute(""" + INSERT INTO wallet_server_stats (server, sessions, event_time) VALUES (%s,%s,%s); + """, (server, r['status']['sessions'], datetime.now())) + + for command, stats in r["api"].items(): + data = { + 'server': server, + 'command': command, + 'event_time': datetime.now() + } + for key, value in stats.items(): + if key.endswith("_queries"): + continue + if isinstance(value, list): + data.update({ + key+'_avg': value[0], + key+'_min': value[1], + key+'_five': value[2], + key+'_twenty_five': value[3], + key+'_fifty': value[4], + key+'_seventy_five': value[5], + key+'_ninety_five': value[6], + key+'_max': value[7], + }) + else: + data[key] = value + + c.execute(f""" + INSERT INTO wallet_server_command_stats ({','.join(data)}) + VALUES ({','.join('%s' for _ in data)}); + """, list(data.values())) + + db.commit() + + +async def main(dsn, servers): + db = ensure_database(dsn) + await asyncio.gather(*( + asyncio.create_task(monitor(db, server)) + for server in servers + )) + + +def ensure_database(dsn): + db = psycopg2.connect(**dsn) + c = db.cursor() + c.execute("SELECT to_regclass('wallet_server_stats');") + if c.fetchone()[0] is None: + print("creating table 'wallet_server_stats'...") + c.execute(""" + CREATE TABLE wallet_server_stats ( + server text, + sessions integer, + event_time timestamp + ); + """) + print("creating table 'wallet_server_command_stats'...") + c.execute(""" + CREATE TABLE wallet_server_command_stats ( + server text, + command text, + event_time timestamp, + + -- total requests received during event window + receive_count integer, + + -- sum of these is total responses made + cache_response_count integer, + query_response_count integer, + intrp_response_count integer, + error_response_count integer, + + -- millisecond timings for non-cache responses (response_*, interrupt_*, error_*) + + response_avg float, + response_min float, + response_five float, + response_twenty_five float, + response_fifty float, + response_seventy_five float, + response_ninety_five float, + response_max float, + + interrupt_avg float, + interrupt_min float, + interrupt_five float, + interrupt_twenty_five float, + interrupt_fifty float, + interrupt_seventy_five float, + interrupt_ninety_five float, + interrupt_max float, + + error_avg float, + error_min float, + error_five float, + error_twenty_five float, + error_fifty float, + error_seventy_five float, + error_ninety_five float, + error_max float, + + -- response, interrupt and error each also report the python, wait and sql stats + + python_avg float, + python_min float, + python_five float, + python_twenty_five float, + python_fifty float, + python_seventy_five float, + python_ninety_five float, + python_max float, + + wait_avg float, + wait_min float, + wait_five float, + wait_twenty_five float, + wait_fifty float, + wait_seventy_five float, + wait_ninety_five float, + wait_max float, + + sql_avg float, + sql_min float, + sql_five float, + sql_twenty_five float, + sql_fifty float, + sql_seventy_five float, + sql_ninety_five float, + sql_max float, + + -- extended timings for individual sql executions + individual_sql_avg float, + individual_sql_min float, + individual_sql_five float, + individual_sql_twenty_five float, + individual_sql_fifty float, + individual_sql_seventy_five float, + individual_sql_ninety_five float, + individual_sql_max float, + + individual_sql_count integer + ); + """) + db.commit() + return db + + +def get_dsn(args): + dsn = {} + for attr in ('dbname', 'user', 'password', 'host', 'port'): + value = getattr(args, f'pg_{attr}') + if value: + dsn[attr] = value + return dsn + + +def get_servers(args): + if '..' in args.server_range: + start, end = args.server_range.split('..') + else: + start = end = args.server_range + return [ + args.server_url.format(i) + for i in range(int(start), int(end)+1) + ] + + +def get_args(): + parser = argparse.ArgumentParser(formatter_class=argparse.ArgumentDefaultsHelpFormatter) + parser.add_argument("--pg-dbname", default="analytics", help="PostgreSQL database name") + parser.add_argument("--pg-user", help="PostgreSQL username") + parser.add_argument("--pg-password", help="PostgreSQL password") + parser.add_argument("--pg-host", default="localhost", help="PostgreSQL host") + parser.add_argument("--pg-port", default="5432", help="PostgreSQL port") + parser.add_argument("--server-url", default="http://spv{}.lbry.com:50005", help="URL with '{}' placeholder") + parser.add_argument("--server-range", default="1..5", help="Range of numbers or single number to use in URL placeholder") + return parser.parse_args() + + +if __name__ == "__main__": + args = get_args() + asyncio.run(main(get_dsn(args), get_servers(args)))