forked from LBRYCommunity/lbry-sdk
216 lines
6.9 KiB
Python
216 lines
6.9 KiB
Python
import sys
|
|
import json
|
|
import asyncio
|
|
import argparse
|
|
from datetime import datetime
|
|
|
|
# Third-party dependencies are imported behind a guard so a missing package
# produces an install hint instead of a bare traceback.
try:
    import aiohttp
    import psycopg2
except ImportError:
    for hint_line in (
        f"To run {sys.argv[0]} you need to install aiohttp and psycopg2:",
        "",
        " $ pip install aiohttp psycopg2",
        "",
    ):
        print(hint_line)
    sys.exit(1)
|
|
|
|
# Refuse to run on interpreters older than 3.7: the script builds SQL column
# lists from dict iteration order and relies on that order being stable.
if sys.version_info < (3, 7):
    print("Please use Python 3.7 or higher, this script expects that dictionary keys preserve order.")
    sys.exit(1)
|
|
|
|
|
|
# Column suffixes for the eight entries of a timing list, in list order.
_TIMING_SUFFIXES = (
    '_avg', '_min', '_five', '_twenty_five',
    '_fifty', '_seventy_five', '_ninety_five', '_max',
)


def _flatten_command_stats(server, command, stats):
    """Turn one command's stats dict into a flat column-name -> value row."""
    row = {
        'server': server,
        'command': command,
        'event_time': datetime.now(),
    }
    for key, value in stats.items():
        # Keys ending in "_queries" have no column in the stats table.
        if key.endswith("_queries"):
            continue
        if isinstance(value, list):
            # Index explicitly (rather than zip) so a list shorter than eight
            # entries raises IndexError instead of silently dropping columns.
            for position, suffix in enumerate(_TIMING_SUFFIXES):
                row[key + suffix] = value[position]
        else:
            row[key] = value
    return row


async def monitor(db, server):
    """Stream stats from one server's websocket into the database.

    Connects to ``server``, then for every message received inserts one row
    into ``wallet_server_stats`` (taken from ``status.sessions``) and one row
    per entry of ``api`` into ``wallet_server_command_stats``, committing
    after each message.  Returns when the websocket iteration ends, or
    immediately (after printing a notice) if the connection cannot be made.

    Args:
        db: open psycopg2 connection; shared with the other monitor tasks.
        server: URL passed to ``session.ws_connect``.
    """
    # Explicit submodule import, kept local so the file-level import block
    # does not need to change.
    from psycopg2 import sql

    async with aiohttp.ClientSession(timeout=aiohttp.ClientTimeout(10)) as session:
        try:
            ws = await session.ws_connect(server)
        except (aiohttp.ClientConnectorError, asyncio.TimeoutError):
            print(f"failed connecting to {server}")
            return
        print(f"connected to {server}")

        # The context manager closes the cursor when monitoring stops,
        # including when the task is cancelled or an insert fails.
        with db.cursor() as c:
            async for msg in ws:
                # Only text/binary frames can carry a JSON payload; anything
                # else (e.g. an error frame) would make json.loads() raise.
                if msg.type not in (aiohttp.WSMsgType.TEXT, aiohttp.WSMsgType.BINARY):
                    continue
                r = json.loads(msg.data)

                c.execute("""
                INSERT INTO wallet_server_stats (server, sessions, event_time) VALUES (%s,%s,%s);
                """, (server, r['status']['sessions'], datetime.now()))

                for command, stats in r["api"].items():
                    data = _flatten_command_stats(server, command, stats)
                    # Column names originate from the remote JSON, so they are
                    # composed as quoted identifiers rather than spliced into
                    # the statement text.  NOTE: quoted identifiers are
                    # case-sensitive; the table's columns are all lowercase.
                    insert = sql.SQL(
                        "INSERT INTO wallet_server_command_stats ({}) VALUES ({});"
                    ).format(
                        sql.SQL(',').join(map(sql.Identifier, data)),
                        sql.SQL(',').join(sql.Placeholder() * len(data)),
                    )
                    c.execute(insert, list(data.values()))

                db.commit()
|
|
|
|
|
|
async def main(dsn, servers):
    """Prepare the database and monitor every server concurrently.

    Args:
        dsn: keyword arguments for the database connection, as produced by
            ``get_dsn``.
        servers: iterable of server URLs; one ``monitor`` task is started
            for each.

    The connection is closed once all monitors have finished, or as soon as
    one of them raises (the exception still propagates to the caller).
    """
    db = ensure_database(dsn)
    try:
        await asyncio.gather(*(
            asyncio.create_task(monitor(db, server))
            for server in servers
        ))
    finally:
        # Previously the connection was left open for the interpreter to
        # reclaim; release it explicitly on every exit path.
        db.close()
|
|
|
|
|
|
# DDL for every table the monitor writes to, keyed by table name.
# Insertion order is the creation order.
_TABLE_DEFINITIONS = {
    'wallet_server_stats': """
    CREATE TABLE wallet_server_stats (
        server text,
        sessions integer,
        event_time timestamp
    );
    """,
    'wallet_server_command_stats': """
    CREATE TABLE wallet_server_command_stats (
        server text,
        command text,
        event_time timestamp,

        -- total requests received during event window
        receive_count integer,

        -- sum of these is total responses made
        cache_response_count integer,
        query_response_count integer,
        intrp_response_count integer,
        error_response_count integer,

        -- millisecond timings for non-cache responses (response_*, interrupt_*, error_*)

        response_avg float,
        response_min float,
        response_five float,
        response_twenty_five float,
        response_fifty float,
        response_seventy_five float,
        response_ninety_five float,
        response_max float,

        interrupt_avg float,
        interrupt_min float,
        interrupt_five float,
        interrupt_twenty_five float,
        interrupt_fifty float,
        interrupt_seventy_five float,
        interrupt_ninety_five float,
        interrupt_max float,

        error_avg float,
        error_min float,
        error_five float,
        error_twenty_five float,
        error_fifty float,
        error_seventy_five float,
        error_ninety_five float,
        error_max float,

        -- response, interrupt and error each also report the python, wait and sql stats

        python_avg float,
        python_min float,
        python_five float,
        python_twenty_five float,
        python_fifty float,
        python_seventy_five float,
        python_ninety_five float,
        python_max float,

        wait_avg float,
        wait_min float,
        wait_five float,
        wait_twenty_five float,
        wait_fifty float,
        wait_seventy_five float,
        wait_ninety_five float,
        wait_max float,

        sql_avg float,
        sql_min float,
        sql_five float,
        sql_twenty_five float,
        sql_fifty float,
        sql_seventy_five float,
        sql_ninety_five float,
        sql_max float,

        -- extended timings for individual sql executions
        individual_sql_avg float,
        individual_sql_min float,
        individual_sql_five float,
        individual_sql_twenty_five float,
        individual_sql_fifty float,
        individual_sql_seventy_five float,
        individual_sql_ninety_five float,
        individual_sql_max float,

        individual_sql_count integer
    );
    """,
}


def ensure_database(dsn):
    """Connect to PostgreSQL and create any missing stats tables.

    Each table is probed with ``to_regclass`` and created only if absent.
    Earlier, only ``wallet_server_stats`` was probed and both tables were
    created together, so a database holding just one of the two tables was
    either left incomplete or failed on a duplicate CREATE.

    Args:
        dsn: keyword arguments forwarded to ``psycopg2.connect``.

    Returns:
        The open connection, with any table creation already committed.

    Raises:
        Whatever ``psycopg2`` raises for connection or DDL failures; the
        connection is closed before the exception propagates.
    """
    db = psycopg2.connect(**dsn)
    try:
        # The context manager closes the cursor once setup is done.
        with db.cursor() as c:
            for table, ddl in _TABLE_DEFINITIONS.items():
                c.execute("SELECT to_regclass(%s);", (table,))
                if c.fetchone()[0] is None:
                    print(f"creating table '{table}'...")
                    c.execute(ddl)
        db.commit()
    except Exception:
        # Do not leak the connection when setup fails part-way.
        db.close()
        raise
    return db
|
|
|
|
|
|
def get_dsn(args):
    """Collect the PostgreSQL connection settings from parsed arguments.

    Reads ``args.pg_dbname``, ``pg_user``, ``pg_password``, ``pg_host`` and
    ``pg_port`` and returns a dict keyed by the name without the ``pg_``
    prefix.  Settings whose value is falsy (``None``, empty string, ...) are
    left out.
    """
    candidates = (
        (field, getattr(args, f'pg_{field}'))
        for field in ('dbname', 'user', 'password', 'host', 'port')
    )
    return {field: setting for field, setting in candidates if setting}
|
|
|
|
|
|
def get_servers(args):
    """Expand ``args.server_range`` into a list of server URLs.

    ``args.server_range`` is either a single number (``"3"``) or an
    inclusive range written ``"first..last"``.  Each number is substituted
    into ``args.server_url`` via ``str.format``.  Non-numeric bounds raise
    ``ValueError``.
    """
    first, separator, last = args.server_range.partition('..')
    if not separator:
        # A lone number is a range of exactly one server.
        last = first
    server_numbers = range(int(first), int(last) + 1)
    return [args.server_url.format(number) for number in server_numbers]
|
|
|
|
|
|
def get_args():
    """Parse the command line and return the resulting namespace.

    Options cover the PostgreSQL connection (``--pg-*``) and the set of
    servers to monitor (``--server-url`` template plus ``--server-range``).
    Defaults are shown in ``--help`` via ``ArgumentDefaultsHelpFormatter``.
    """
    # (flag, add_argument keyword arguments), in the order shown by --help.
    option_specs = (
        ("--pg-dbname", {"default": "analytics", "help": "PostgreSQL database name"}),
        ("--pg-user", {"help": "PostgreSQL username"}),
        ("--pg-password", {"help": "PostgreSQL password"}),
        ("--pg-host", {"default": "localhost", "help": "PostgreSQL host"}),
        ("--pg-port", {"default": "5432", "help": "PostgreSQL port"}),
        ("--server-url", {"default": "http://spv{}.lbry.com:50005", "help": "URL with '{}' placeholder"}),
        ("--server-range", {"default": "1..5", "help": "Range of numbers or single number to use in URL placeholder"}),
    )
    cli = argparse.ArgumentParser(formatter_class=argparse.ArgumentDefaultsHelpFormatter)
    for flag, settings in option_specs:
        cli.add_argument(flag, **settings)
    return cli.parse_args()
|
|
|
|
|
|
if __name__ == "__main__":
    # Script entry point: parse the command line once, derive the database
    # settings and server list from it, then run the monitors to completion.
    cli_args = get_args()
    asyncio.run(main(dsn=get_dsn(cli_args), servers=get_servers(cli_args)))
|