From 9ed985d6ec5dcb6ea0c0c858e0c5326fb316f73b Mon Sep 17 00:00:00 2001 From: Alex Grintsvayg Date: Mon, 25 Jan 2021 12:39:26 -0500 Subject: [PATCH] prune stale peers when loading node state --- lbry/dht/node.py | 10 ++-- lbry/dht/protocol/data_store.py | 4 +- lbry/dht/protocol/routing_table.py | 2 +- scripts/tracker.py | 82 +++++++++++++++--------------- 4 files changed, 52 insertions(+), 46 deletions(-) mode change 100644 => 100755 scripts/tracker.py diff --git a/lbry/dht/node.py b/lbry/dht/node.py index 6dd4d1a93..3bdef15ab 100644 --- a/lbry/dht/node.py +++ b/lbry/dht/node.py @@ -3,6 +3,7 @@ import asyncio import typing import binascii import socket +import time from lbry.utils import resolve_host from lbry.dht import constants from lbry.dht.peer import make_kademlia_peer @@ -20,10 +21,10 @@ log = logging.getLogger(__name__) class NodeState: def __init__(self, routing_table_peers: typing.List[typing.Tuple[bytes, str, int, int]], - datastore: typing.List[typing.Tuple[bytes, str, int, int, bytes]]): + datastore: typing.List[typing.Tuple[bytes, str, int, int, bytes, float]]): # List[Tuple[node_id, address, udp_port, tcp_port]] self.routing_table_peers = routing_table_peers - # List[Tuple[node_id, address, udp_port, tcp_port, blob_hash]] + # List[Tuple[node_id, address, udp_port, tcp_port, blob_hash, added_at]] self.datastore = datastore @@ -138,7 +139,10 @@ class Node: ) def load_state(self, state: NodeState): - for node_id, address, udp_port, tcp_port, blob_hash in state.datastore: + now = self.loop.time() + for node_id, address, udp_port, tcp_port, blob_hash, added_at in state.datastore: + if added_at + constants.DATA_EXPIRATION < now: + continue p = make_kademlia_peer(node_id, address, udp_port, tcp_port) self.protocol.data_store.add_peer_to_blob(p, blob_hash) diff --git a/lbry/dht/protocol/data_store.py b/lbry/dht/protocol/data_store.py index 62673f547..000883206 100644 --- a/lbry/dht/protocol/data_store.py +++ b/lbry/dht/protocol/data_store.py @@ -69,8 +69,8 @@ class DictDataStore: peers.update(set(map(lambda tup: tup[0], stored))) return list(peers) - def dump(self) -> typing.List[typing.Tuple[bytes, str, int, int, bytes]]: + def dump(self) -> typing.List[typing.Tuple[bytes, str, int, int, bytes, float]]: data = [] for k, peers in self._data_store.items(): - data.extend([(p.node_id, p.address, p.udp_port, p.tcp_port, k) for p in map(lambda t: t[0], peers)]) + data.extend([(p.node_id, p.address, p.udp_port, p.tcp_port, k, added_at) for (p, added_at) in peers]) return data diff --git a/lbry/dht/protocol/routing_table.py b/lbry/dht/protocol/routing_table.py index 1cf18d778..815ea7b98 100644 --- a/lbry/dht/protocol/routing_table.py +++ b/lbry/dht/protocol/routing_table.py @@ -286,7 +286,7 @@ class TreeRoutingTable: to_pop = [i for i, bucket in enumerate(self.buckets) if len(bucket) == 0] if not to_pop: return - log.info("join buckets %i", len(to_pop)) + log.info("%s: join buckets %i", bytes.hex(self._parent_node_id)[:8], len(to_pop)) bucket_index_to_pop = to_pop[0] assert len(self.buckets[bucket_index_to_pop]) == 0 can_go_lower = bucket_index_to_pop - 1 >= 0 diff --git a/scripts/tracker.py b/scripts/tracker.py old mode 100644 new mode 100755 index 35d19d87b..4f77582da --- a/scripts/tracker.py +++ b/scripts/tracker.py @@ -1,3 +1,5 @@ +#!/usr/bin/env python + import asyncio import logging import signal @@ -29,61 +31,25 @@ async def main(): except NotImplementedError: pass # Not implemented on Windows - peer_manager = peer.PeerManager(loop) - u = await upnp.UPnP.discover() - db = sqlite3.connect(data_dir + "/tracker.sqlite3") db.execute('CREATE TABLE IF NOT EXISTS announce (local_id TEXT, hash TEXT, node_id TEXT, ip TEXT, port INT, timestamp INT)') db.execute('CREATE UNIQUE INDEX IF NOT EXISTS node_id_hash_idx ON announce (node_id, hash)') - # curr = db.cursor() - # res = curr.execute("SELECT 1, 2, 3") - # for items in res: - # print(items) - asyncio.create_task(run_web_api(loop, db)) num_nodes = 128 - start_port = 4444 - known_node_urls = [("lbrynet1.lbry.com", 4444), ("lbrynet2.lbry.com", 4444), ("lbrynet3.lbry.com", 4444)] + u = await upnp.UPnP.discover() external_ip = await u.get_external_ip() - nodes = [] - try: - for i in range(num_nodes): - node_id = make_node_id(i, num_nodes) - # pprint(node_id) - - port = start_port + i - # await u.get_next_mapping(port, "UDP", "lbry dht tracker") - # SOMETHING ABOUT THIS DOESNT WORK - # port = await u.get_next_mapping(start_port, "UDP", "lbry dht tracker") - - n = node.Node(loop, peer_manager, node_id=bytes.fromhex(node_id), external_ip=external_ip, - udp_port=port, internal_udp_port=port, peer_port=3333) - - persisted_peers = [] - if path.exists(state_dir + node_id): - with open(state_dir + node_id, 'rb') as f: - state = pickle.load(f) - # pprint(state.routing_table_peers) - # pprint(state.datastore) - print(f'{node_id[:8]}: loaded {len(state.routing_table_peers)} rt peers, {len(state.datastore)} in store') - n.load_state(state) - persisted_peers = state.routing_table_peers - if len(persisted_peers) == 0 and len(state.datastore) > 0: - persisted_peers.extend(map(lambda x: (x[0], x[1], x[2], x[3]), state.datastore)) - print(f'{node_id[:8]}: rt is empty but we recovered {len(persisted_peers)} peers from the datastore') - n.start("0.0.0.0", known_node_urls, persisted_peers) - nodes.append(n) + nodes = await start_nodes(loop, num_nodes, external_ip, state_dir) await asyncio.gather(*map(lambda n: n.started_listening.wait(), nodes), loop=loop) print("joined") queue = asyncio.Queue(maxsize=100*num_nodes) for n in nodes: - asyncio.create_task(drain(n, queue)) + asyncio.create_task(drain_events(n, queue)) while True: (n, node_id, ip, method, args) = await queue.get() @@ -156,7 +122,43 @@ def make_node_id(i: int, n: int) -> str: return "{0:0{1}x}".format(i * ((2**8)**bytes_in_id // n), bytes_in_id*2) -async def drain(n, q): +async def start_nodes(loop, num_nodes, external_ip, state_dir): + start_port = 4445 + known_node_urls = [("lbrynet1.lbry.com", 4444), ("lbrynet2.lbry.com", 4444), ("lbrynet3.lbry.com", 4444)] + peer_manager = peer.PeerManager(loop) + + nodes = [] + for i in range(num_nodes): + node_id = make_node_id(i, num_nodes) + # pprint(node_id) + + port = start_port + i + # await u.get_next_mapping(port, "UDP", "lbry dht tracker") # not necessary, i just opened ports in router + + n = node.Node(loop, peer_manager, node_id=bytes.fromhex(node_id), external_ip=external_ip, + udp_port=port, internal_udp_port=port, peer_port=3333) + + persisted_peers = [] + if path.exists(state_dir + node_id): + with open(state_dir + node_id, 'rb') as f: + state = pickle.load(f) + # pprint(state.routing_table_peers) + # pprint(state.datastore) + print(f'{node_id[:8]}: loaded {len(state.routing_table_peers)} rt peers, {len(state.datastore)} in store') + n.load_state(state) + persisted_peers = state.routing_table_peers + # if len(persisted_peers) == 0 and len(state.datastore) > 0: + # peers_to_import = map(lambda p: (p[0], p[1], p[2], p[3]), n.get_state().datastore) + # persisted_peers.extend(peers_to_import) + # print(f'{node_id[:8]}: rt is empty but we recovered {len(state.datastore)} ' + # f'peers from the datastore. {len(peers_to_import)} of those were recent enough to import') + + n.start("0.0.0.0", known_node_urls, persisted_peers) + nodes.append(n) + return nodes + + +async def drain_events(n, q): print(f'drain started on {bytes.hex(n.protocol.node_id)[:8]}') while True: (node_id, ip, method, args) = await n.protocol.event_queue.get()