lbry-sdk/scripts/dht_monitor.py
2020-01-01 15:57:56 -05:00

64 lines
1.4 KiB
Python

import curses
import time
import asyncio
from lbry.conf import Config
from lbry.extras.daemon.client import daemon_rpc
# Shared curses window object used by every function in this script.
# NOTE: initscr() runs at import time, so merely importing this module
# initializes curses on the controlling terminal.
stdscr = curses.initscr()
def init_curses():
    """Configure the terminal and the shared ``stdscr`` window for the monitor.

    Turns off key echo, enables cbreak mode, then sets the ``nodelay`` and
    ``keypad`` flags on ``stdscr``.
    """
    # Terminal-wide input modes.
    curses.noecho()
    curses.cbreak()
    # Per-window flags: nodelay first, then keypad.
    for enable_flag in (stdscr.nodelay, stdscr.keypad):
        enable_flag(1)
def teardown_curses():
    """Revert the settings applied by ``init_curses`` and leave curses mode.

    Runs, in order: ``nocbreak``, ``stdscr.keypad(0)``, ``echo``, ``endwin``.
    """
    restore_steps = (
        curses.nocbreak,
        lambda: stdscr.keypad(0),
        curses.echo,
        curses.endwin,
    )
    for step in restore_steps:
        step()
def refresh(routing_table_info):
    """Redraw the screen from a routing table snapshot.

    Layout, top to bottom: the node id, the bucket count, a blank row, then
    for each bucket a ``bucket N`` header followed by one row per peer and a
    blank separator, and finally the current timestamp.

    Rows that do not fit in the window are dropped and long rows are
    truncated, so an oversized routing table (or a small terminal) no longer
    makes curses raise ``curses.error`` for writing outside the window.

    Args:
        routing_table_info: mapping with a ``'node_id'`` entry and a
            ``'buckets'`` mapping keyed by the bucket index as a string
            (``"0"``, ``"1"``, ...); each bucket is an iterable of peer
            mappings with ``'node_id'``, ``'address'`` and ``'udp_port'``.
    """
    height, width = stdscr.getmaxyx()
    buckets = routing_table_info['buckets']

    # Build the whole frame first so it can be clipped to the window size.
    lines = [
        f"node id: {routing_table_info['node_id']}",
        f"{len(buckets)} buckets",
        "",
    ]
    # Buckets are keyed by stringified index, so walk the indices numerically
    # rather than relying on the mapping's iteration order.
    for index in range(len(buckets)):
        lines.append(f"bucket {index}")
        lines.extend(
            f"{peer['node_id'][:8]} ({peer['address']}:{peer['udp_port']})"
            for peer in buckets[str(index)]
        )
        lines.append("")
    lines.append("")
    lines.append(str(time.time()))

    stdscr.erase()
    # Stop one column short of the right edge: writing into the bottom-right
    # cell makes curses report an error.
    max_cols = width - 1
    if max_cols > 0:
        for row, text in enumerate(lines[:height]):
            if text:
                stdscr.addnstr(row, 0, text, max_cols)
    stdscr.refresh()
async def main():
    """Poll the daemon's routing table and redraw it until the user quits.

    Each iteration calls the ``routing_table_get`` RPC through ``daemon_rpc``,
    passes the result to ``refresh`` and reads one key from ``stdscr``. The
    loop ends when that key is ``q`` or ``Q``. The curses settings are always
    reverted through ``teardown_curses``, even if an exception propagates.
    """
    conf = Config()
    quit_keys = (ord('q'), ord('Q'))
    try:
        init_curses()
        key = None
        while key not in quit_keys:
            routing_info = await daemon_rpc(conf, 'routing_table_get')
            refresh(routing_info)
            # init_curses() puts stdscr in nodelay mode, so this does not
            # block waiting for input.
            key = stdscr.getch()
            # Pause between polls without blocking the event loop
            # (time.sleep would stall every other task on the loop).
            await asyncio.sleep(0.1)
    finally:
        teardown_curses()
# Start the monitor loop only when this file is executed as a script.
if __name__ == "__main__":
    asyncio.run(main())