From 53e683d307acb026ab3efba3a8a406bb1255f9e4 Mon Sep 17 00:00:00 2001 From: Alex Grintsvayg Date: Tue, 19 Jan 2021 16:04:04 -0500 Subject: [PATCH] 16-node tracker works --- lbry/dht/protocol/protocol.py | 4 +- scripts/tracker.py | 121 ++++++++++++++++++---------------- 2 files changed, 68 insertions(+), 57 deletions(-) diff --git a/lbry/dht/protocol/protocol.py b/lbry/dht/protocol/protocol.py index 4f5f94e00..d71ebc258 100644 --- a/lbry/dht/protocol/protocol.py +++ b/lbry/dht/protocol/protocol.py @@ -429,8 +429,10 @@ class KademliaProtocol(DatagramProtocol): log.debug("%s:%i RECV CALL %s %s:%i", self.external_ip, self.udp_port, message.method.decode(), sender_contact.address, sender_contact.udp_port) - if not self.event_queue.full(): + try: self.event_queue.put_nowait((sender_contact.node_id, sender_contact.address, method, args)) + except asyncio.QueueFull: + pass if method == b'ping': result = self.node_rpc.ping() diff --git a/scripts/tracker.py b/scripts/tracker.py index 3df2da8d6..457af42ed 100644 --- a/scripts/tracker.py +++ b/scripts/tracker.py @@ -18,7 +18,7 @@ log.setLevel(logging.INFO) async def main(): data_dir = "/home/grin/code/lbry/sdk" - state_file = data_dir + '/nodestate' + state_dir = data_dir + '/nodestate/' loop = asyncio.get_event_loop() try: @@ -29,84 +29,84 @@ async def main(): peer_manager = peer.PeerManager(loop) u = await upnp.UPnP.discover() - await u.get_next_mapping(4444, "UDP", "lbry dht tracker", 4444) - my_node_id = "38b060a751ac96384cd9327eb1b1e36a21fdb71114be07434c0cc7bf63f6e1da274edebfe76f65fbd51ad2f14898b95b" - n = node.Node(loop, peer_manager, node_id=bytes.fromhex(my_node_id), external_ip=(await u.get_external_ip()), - udp_port=4444, internal_udp_port=4444, peer_port=4444) db = sqlite3.connect(data_dir + "/tracker.sqlite3") db.execute( - '''CREATE TABLE IF NOT EXISTS log (hash TEXT, node_id TEXT, ip TEXT, port INT, timestamp INT)''' + 'CREATE TABLE IF NOT EXISTS log (local_id TEXT, hash TEXT, node_id TEXT, ip TEXT, port INT, timestamp INT)' ) # curr = db.cursor() # res = curr.execute("SELECT 1, 2, 3") # for items in res: # print(items) + num_nodes = 16 + start_port = 4444 + known_node_urls = [("lbrynet1.lbry.com", 4444), ("lbrynet2.lbry.com", 4444), ("lbrynet3.lbry.com", 4444)] + external_ip = await u.get_external_ip() + + nodes = [] + try: - known_node_urls=[("lbrynet1.lbry.com", 4444), ("lbrynet2.lbry.com", 4444), ("lbrynet3.lbry.com", 4444)] - persisted_peers =[] - if path.exists(state_file): - with open(state_file, 'rb') as f: - state = pickle.load(f) - # pprint(state.routing_table_peers) - # pprint(state.datastore) - print(f'loaded {len(state.routing_table_peers)} rt peers, {len(state.datastore)} in store') - n.load_state(state) - persisted_peers = state.routing_table_peers + for i in range(num_nodes): + assert i < 16 # my ghetto int -> node_id converter requires this + node_id = '0123456789abcdef'[i] + '0' * 95 + # pprint(node_id) + port = start_port + i + await u.get_next_mapping(port, "UDP", "lbry dht tracker", port) + n = node.Node(loop, peer_manager, node_id=bytes.fromhex(node_id), external_ip=external_ip, + udp_port=port, internal_udp_port=port, peer_port=3333) - n.start("0.0.0.0", known_node_urls, persisted_peers) - await n.started_listening.wait() + persisted_peers =[] + if path.exists(state_dir + node_id): + with open(state_dir + node_id, 'rb') as f: + state = pickle.load(f) + # pprint(state.routing_table_peers) + # pprint(state.datastore) + print(f'{node_id[:8]}: loaded {len(state.routing_table_peers)} rt peers, {len(state.datastore)} in store') + n.load_state(state) + persisted_peers = state.routing_table_peers + + n.start("0.0.0.0", known_node_urls, persisted_peers) + nodes.append(n) + + await asyncio.gather(*map(lambda n: n.started_listening.wait(), nodes), loop=loop) print("joined") - # jack = peer.make_kademlia_peer( - # bytes.fromhex("38b060a751ac96384cd9327eb1b1e36a21fdb71114be07434c0cc7bf63f6e1da274edebfe76f65fbd51ad2f14898b95c"), - # "216.19.244.226", udp_port=4444, - # ) - # print(await n.protocol.get_rpc_peer(jack).ping()) - await dostuff(n, db) - finally: - print("shutting down") - n.stop() - state = n.get_state() - with open(state_file, 'wb') as f: - # pprint(state.routing_table_peers) - # pprint(state.datastore) - print(f'saved {len(state.routing_table_peers)} rt peers, {len(state.datastore)} in store') - pickle.dump(state, f) - db.close() - await u.delete_port_mapping(4444, "UDP") + queue = asyncio.Queue(maxsize=100*num_nodes) + for n in nodes: + asyncio.create_task(drain(n, queue)) + while True: + (n, node_id, ip, method, args) = await queue.get() + local_node_id = bytes.hex(n.protocol.node_id) + if method != b'store': + # print(f"{local_node_id[:8]}: {method} from {bytes.hex(node_id)} ({ip})") + continue -async def dostuff(n, db): - # gather - # as_completed - # wait - # wait_for - - # make a task to loop over the things in the node. those tasks drain into one combined queue - # t = asyncio.create_task for each node - # keep the t - # handle teardown at the end - # - - while True: - (node_id, ip, method, args) = await n.protocol.event_queue.get() - if method == b'store': blob_hash, token, port, original_publisher_id, age = args[:5] - print(f"STORE from {bytes.hex(node_id)} ({ip}) for blob {bytes.hex(blob_hash)}") + print(f"STORE to {local_node_id[:8]} from {bytes.hex(node_id)[:8]} ({ip}) for blob {bytes.hex(blob_hash)[:8]}") try: cur = db.cursor() - cur.execute('INSERT INTO log (hash, node_id, ip, port, timestamp) VALUES (?,?,?,?,?)', - (bytes.hex(blob_hash), bytes.hex(node_id), ip, port, int(time.time()))) + cur.execute('INSERT INTO log (local_id, hash, node_id, ip, port, timestamp) VALUES (?,?,?,?,?,?)', + (local_node_id, bytes.hex(blob_hash), bytes.hex(node_id), ip, port, int(time.time()))) db.commit() cur.close() except sqlite3.Error as err: print("failed insert", err) - else: - pass - # print(f"{method} from {bytes.hex(node_id)} ({ip})") + finally: + print("shutting down") + for n in nodes: + node_id = bytes.hex(n.protocol.node_id) + n.stop() + state = n.get_state() + with open(state_dir + node_id, 'wb') as f: + # pprint(state.routing_table_peers) + # pprint(state.datastore) + print(f'{node_id[:8]}: saved {len(state.routing_table_peers)} rt peers, {len(state.datastore)} in store') + pickle.dump(state, f) + db.close() + await u.delete_port_mapping(n.protocol.udp_port, "UDP") class ShutdownErr(BaseException): @@ -118,6 +118,15 @@ def shutdown(): raise ShutdownErr() +async def drain(n, q): + print(f'drain started on {bytes.hex(n.protocol.node_id)[:8]}') + while True: + (node_id, ip, method, args) = await n.protocol.event_queue.get() + try: + q.put_nowait((n, node_id, ip, method, args)) + except asyncio.QueueFull: + pass + if __name__ == "__main__": try: asyncio.run(main())