From 366b0d590c8a472d8633e47ad30e8571dc23271b Mon Sep 17 00:00:00 2001 From: Alex Grintsvayg <grin@lbry.com> Date: Wed, 20 Jan 2021 17:12:20 -0500 Subject: [PATCH] api to check seeder count, better saving of routing table --- scripts/tracker.py | 63 ++++++++++++++++++++++++++++++++++++++++++---- 1 file changed, 58 insertions(+), 5 deletions(-) diff --git a/scripts/tracker.py b/scripts/tracker.py index 457af42ed..ec61de24b 100644 --- a/scripts/tracker.py +++ b/scripts/tracker.py @@ -7,6 +7,8 @@ import sqlite3 import pickle from os import path from pprint import pprint +from aiohttp import web +import json from lbry.dht import node, peer @@ -39,6 +41,8 @@ async def main(): # for items in res: # print(items) + asyncio.create_task(run_web_api(loop, db)) + num_nodes = 16 start_port = 4444 known_node_urls = [("lbrynet1.lbry.com", 4444), ("lbrynet2.lbry.com", 4444), ("lbrynet3.lbry.com", 4444)] @@ -48,15 +52,19 @@ async def main(): try: for i in range(num_nodes): - assert i < 16 # my ghetto int -> node_id converter requires this + assert i < 16 # my ghetto int -> node_id converter requires this node_id = '0123456789abcdef'[i] + '0' * 95 # pprint(node_id) + port = start_port + i - await u.get_next_mapping(port, "UDP", "lbry dht tracker", port) + await u.get_next_mapping(port, "UDP", "lbry dht tracker") + # SOMETHING ABOUT THIS DOESNT WORK + # port = await u.get_next_mapping(start_port, "UDP", "lbry dht tracker") + n = node.Node(loop, peer_manager, node_id=bytes.fromhex(node_id), external_ip=external_ip, udp_port=port, internal_udp_port=port, peer_port=3333) - persisted_peers =[] + persisted_peers = [] if path.exists(state_dir + node_id): with open(state_dir + node_id, 'rb') as f: state = pickle.load(f) @@ -65,7 +73,9 @@ async def main(): print(f'{node_id[:8]}: loaded {len(state.routing_table_peers)} rt peers, {len(state.datastore)} in store') n.load_state(state) persisted_peers = state.routing_table_peers - + if len(persisted_peers) == 0 and len(state.datastore) > 0: + persisted_peers.extend(map(lambda x: (x[0], x[1], x[2], x[3]), state.datastore)) + print(f'{node_id[:8]}: rt is empty but we recovered {len(persisted_peers)} peers from the datastore') n.start("0.0.0.0", known_node_urls, persisted_peers) nodes.append(n) @@ -99,14 +109,23 @@ async def main(): for n in nodes: node_id = bytes.hex(n.protocol.node_id) n.stop() + print(f'deleting upnp port mapping {n.protocol.udp_port}') + await u.delete_port_mapping(n.protocol.udp_port, "UDP") + state = n.get_state() + # keep existing rt if there is one + if len(state.routing_table_peers) == 0 and path.exists(state_dir + node_id): + with open(state_dir + node_id, 'rb') as f: + existing_state = pickle.load(f) + if len(existing_state.routing_table_peers) > 0: + state.routing_table_peers = existing_state.routing_table_peers + print(f'rt empty on save, but old rt was recovered ({len(state.routing_table_peers)} peers)') with open(state_dir + node_id, 'wb') as f: # pprint(state.routing_table_peers) # pprint(state.datastore) print(f'{node_id[:8]}: saved {len(state.routing_table_peers)} rt peers, {len(state.datastore)} in store') pickle.dump(state, f) db.close() - await u.delete_port_mapping(n.protocol.udp_port, "UDP") class ShutdownErr(BaseException): @@ -127,6 +146,40 @@ async def drain(n, q): except asyncio.QueueFull: pass + +async def run_web_api(loop, db): + app = web.Application(loop=loop) + app['db'] = db + app.add_routes([ + web.get('/', api_handler), + web.get('/seeds/{hash}', seeds_handler), + ]) + # server = web.Server(api_handler, loop=loop) + # runner = web.ServerRunner(server) + runner = web.AppRunner(app) + await runner.setup() + site = web.TCPSite(runner, 'localhost', 8080) + await site.start() + # web.run_app(app) + + +async def seeds_handler(request): + blobhash = request.match_info['hash'] + db = request.app['db'] + try: + cur = db.cursor() + c = cur.execute(""" + select count(distinct(node_id)) from log where hash = ? and timestamp > strftime('%s','now','-1 day') + """, (blobhash,)).fetchone()[0] + cur.close() + return web.Response(text=json.dumps({'seeds': c})+"\n") + except Exception as err: + return web.Response(text=json.dumps({'error': err})+"\n") + + +async def api_handler(request): + return web.Response(text="tracker OK") + if __name__ == "__main__": try: asyncio.run(main())