From 99c64021ebf8293476cc029dc0650d200ed3d6be Mon Sep 17 00:00:00 2001 From: Alex Grintsvayg Date: Fri, 22 Jan 2021 09:41:51 -0500 Subject: [PATCH] up to 128 nodes --- lbry/dht/node.py | 4 +++- scripts/tracker.py | 30 ++++++++++++++++++++++-------- 2 files changed, 25 insertions(+), 9 deletions(-) diff --git a/lbry/dht/node.py b/lbry/dht/node.py index 74adbc99f..6dd4d1a93 100644 --- a/lbry/dht/node.py +++ b/lbry/dht/node.py @@ -174,7 +174,9 @@ class Node: if not self.joined.is_set(): self.joined.set() log.info( - "joined dht, %i peers known in %i buckets", len(self.protocol.routing_table.get_peers()), + "%s: joined dht, %i peers known in %i buckets", + bytes.fromhex(self.protocol.node_id)[:8], + len(self.protocol.routing_table.get_peers()), self.protocol.routing_table.buckets_with_contacts() ) else: diff --git a/scripts/tracker.py b/scripts/tracker.py index ec61de24b..ba68bdd89 100644 --- a/scripts/tracker.py +++ b/scripts/tracker.py @@ -2,11 +2,11 @@ import asyncio import logging import signal import time -from aioupnp import upnp import sqlite3 import pickle from os import path from pprint import pprint +import aioupnp from aiohttp import web import json @@ -30,7 +30,7 @@ async def main(): pass # Not implemented on Windows peer_manager = peer.PeerManager(loop) - u = await upnp.UPnP.discover() + u = await aioupnp.upnp.UPnP.discover() db = sqlite3.connect(data_dir + "/tracker.sqlite3") db.execute( @@ -43,7 +43,7 @@ async def main(): asyncio.create_task(run_web_api(loop, db)) - num_nodes = 16 + num_nodes = 128 start_port = 4444 known_node_urls = [("lbrynet1.lbry.com", 4444), ("lbrynet2.lbry.com", 4444), ("lbrynet3.lbry.com", 4444)] external_ip = await u.get_external_ip() @@ -52,12 +52,11 @@ async def main(): try: for i in range(num_nodes): - assert i < 16 # my ghetto int -> node_id converter requires this - node_id = '0123456789abcdef'[i] + '0' * 95 + node_id = make_node_id(i, num_nodes) # pprint(node_id) port = start_port + i - await u.get_next_mapping(port, "UDP", "lbry dht tracker") + # await u.get_next_mapping(port, "UDP", "lbry dht tracker") # SOMETHING ABOUT THIS DOESNT WORK # port = await u.get_next_mapping(start_port, "UDP", "lbry dht tracker") @@ -109,8 +108,12 @@ async def main(): for n in nodes: node_id = bytes.hex(n.protocol.node_id) n.stop() - print(f'deleting upnp port mapping {n.protocol.udp_port}') - await u.delete_port_mapping(n.protocol.udp_port, "UDP") + # print(f'deleting upnp port mapping {n.protocol.udp_port}') + try: + await u.delete_port_mapping(n.protocol.udp_port, "UDP") + except aioupnp.fault.UPnPError: + pass + state = n.get_state() # keep existing rt if there is one @@ -137,6 +140,17 @@ def shutdown(): raise ShutdownErr() +def make_node_id(i: int, n: int) -> str: + """ + split dht address space into N chunks and return the first id of the i'th chunk + make_node_id(0,n) returns 000...000 for any n + """ + if not 0 <= i < n: + raise ValueError("i must be between 0 (inclusive) and n (exclusive)") + bytes_in_id = 48 + return "{0:0{1}x}".format(i * ((2**8)**bytes_in_id // n), bytes_in_id*2) + + async def drain(n, q): print(f'drain started on {bytes.hex(n.protocol.node_id)[:8]}') while True: