prune stale peers when loading node state

Alex Grintsvayg 2021-01-25 12:39:26 -05:00
parent 2d898f802d
commit 9ed985d6ec
4 changed files with 52 additions and 46 deletions
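
In short: DictDataStore.dump() now persists when each (peer, blob) entry was added, and Node.load_state() skips entries whose age exceeds the DHT data expiration window instead of blindly re-adding them. A minimal sketch of the rule being applied (illustrative only; the constant mirrors lbry.dht.constants.DATA_EXPIRATION, and the real code is in the diffs below):

# Sketch, not part of the diff: prune persisted datastore tuples of the new
# shape (node_id, address, udp_port, tcp_port, blob_hash, added_at).
DATA_EXPIRATION = 86400  # seconds; mirrors lbry.dht.constants.DATA_EXPIRATION

def prune_stale(entries, now):
    # keep an entry only while added_at + DATA_EXPIRATION has not passed
    return [e for e in entries if e[5] + DATA_EXPIRATION >= now]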


@@ -3,6 +3,7 @@ import asyncio
 import typing
 import binascii
 import socket
+import time
 from lbry.utils import resolve_host
 from lbry.dht import constants
 from lbry.dht.peer import make_kademlia_peer
@@ -20,10 +21,10 @@ log = logging.getLogger(__name__)

 class NodeState:
     def __init__(self,
                  routing_table_peers: typing.List[typing.Tuple[bytes, str, int, int]],
-                 datastore: typing.List[typing.Tuple[bytes, str, int, int, bytes]]):
+                 datastore: typing.List[typing.Tuple[bytes, str, int, int, bytes, float]]):
         # List[Tuple[node_id, address, udp_port, tcp_port]]
         self.routing_table_peers = routing_table_peers
-        # List[Tuple[node_id, address, udp_port, tcp_port, blob_hash]]
+        # List[Tuple[node_id, address, udp_port, tcp_port, blob_hash, added_at]]
         self.datastore = datastore
@@ -138,7 +139,10 @@ class Node:
         )

     def load_state(self, state: NodeState):
-        for node_id, address, udp_port, tcp_port, blob_hash in state.datastore:
+        now = self.loop.time()
+        for node_id, address, udp_port, tcp_port, blob_hash, added_at in state.datastore:
+            if added_at + constants.DATA_EXPIRATION < now:
+                continue
             p = make_kademlia_peer(node_id, address, udp_port, tcp_port)
             self.protocol.data_store.add_peer_to_blob(p, blob_hash)
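
For context, a hypothetical save/restore round-trip under the new format (old_node, new_node and state_path are illustrative names; get_state() and the pickle persistence mirror scripts/tracker.py below):

import pickle

# Hypothetical round-trip, not part of the diff:
state = old_node.get_state()             # NodeState; datastore now holds 6-tuples ending in added_at
with open(state_path, 'wb') as f:
    pickle.dump(state, f)

with open(state_path, 'rb') as f:
    new_node.load_state(pickle.load(f))  # entries past DATA_EXPIRATION are skipped on load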


@@ -69,8 +69,8 @@ class DictDataStore:
         peers.update(set(map(lambda tup: tup[0], stored)))
         return list(peers)

-    def dump(self) -> typing.List[typing.Tuple[bytes, str, int, int, bytes]]:
+    def dump(self) -> typing.List[typing.Tuple[bytes, str, int, int, bytes, float]]:
         data = []
         for k, peers in self._data_store.items():
-            data.extend([(p.node_id, p.address, p.udp_port, p.tcp_port, k) for p in map(lambda t: t[0], peers)])
+            data.extend([(p.node_id, p.address, p.udp_port, p.tcp_port, k, added_at) for (p, added_at) in peers])
         return data
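
The change to dump() follows from the store's shape: each blob hash maps to (peer, added_at) pairs, and the old comprehension discarded the timestamp. A sketch of the structure being walked (illustrative values; peer_a and peer_b stand in for KademliaPeer objects):

# _data_store: Dict[bytes, List[Tuple[KademliaPeer, float]]]  (sketch, not from the diff)
_data_store = {
    b'blob1': [(peer_a, 1000.0), (peer_b, 1060.5)],
}
# old: ... for p in map(lambda t: t[0], peers)   -> added_at dropped
# new: ... for (p, added_at) in peers            -> added_at carried into the dump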


@@ -286,7 +286,7 @@ class TreeRoutingTable:
         to_pop = [i for i, bucket in enumerate(self.buckets) if len(bucket) == 0]
         if not to_pop:
             return
-        log.info("join buckets %i", len(to_pop))
+        log.info("%s: join buckets %i", bytes.hex(self._parent_node_id)[:8], len(to_pop))
         bucket_index_to_pop = to_pop[0]
         assert len(self.buckets[bucket_index_to_pop]) == 0
         can_go_lower = bucket_index_to_pop - 1 >= 0

scripts/tracker.py Normal file → Executable file

@@ -1,3 +1,5 @@
+#!/usr/bin/env python
+
 import asyncio
 import logging
 import signal
@@ -29,61 +31,25 @@ async def main():
     except NotImplementedError:
         pass  # Not implemented on Windows

-    peer_manager = peer.PeerManager(loop)
-    u = await upnp.UPnP.discover()
     db = sqlite3.connect(data_dir + "/tracker.sqlite3")
     db.execute('CREATE TABLE IF NOT EXISTS announce (local_id TEXT, hash TEXT, node_id TEXT, ip TEXT, port INT, timestamp INT)')
     db.execute('CREATE UNIQUE INDEX IF NOT EXISTS node_id_hash_idx ON announce (node_id, hash)')
-    # curr = db.cursor()
-    # res = curr.execute("SELECT 1, 2, 3")
-    # for items in res:
-    #     print(items)
     asyncio.create_task(run_web_api(loop, db))

     num_nodes = 128
-    start_port = 4444
-    known_node_urls = [("lbrynet1.lbry.com", 4444), ("lbrynet2.lbry.com", 4444), ("lbrynet3.lbry.com", 4444)]
+    u = await upnp.UPnP.discover()
     external_ip = await u.get_external_ip()
-    nodes = []
     try:
-        for i in range(num_nodes):
-            node_id = make_node_id(i, num_nodes)
-            # pprint(node_id)
-            port = start_port + i
-            # await u.get_next_mapping(port, "UDP", "lbry dht tracker")
-            # SOMETHING ABOUT THIS DOESNT WORK
-            # port = await u.get_next_mapping(start_port, "UDP", "lbry dht tracker")
-            n = node.Node(loop, peer_manager, node_id=bytes.fromhex(node_id), external_ip=external_ip,
-                          udp_port=port, internal_udp_port=port, peer_port=3333)
-            persisted_peers = []
-            if path.exists(state_dir + node_id):
-                with open(state_dir + node_id, 'rb') as f:
-                    state = pickle.load(f)
-                    # pprint(state.routing_table_peers)
-                    # pprint(state.datastore)
-                    print(f'{node_id[:8]}: loaded {len(state.routing_table_peers)} rt peers, {len(state.datastore)} in store')
-                    n.load_state(state)
-                    persisted_peers = state.routing_table_peers
-                    if len(persisted_peers) == 0 and len(state.datastore) > 0:
-                        persisted_peers.extend(map(lambda x: (x[0], x[1], x[2], x[3]), state.datastore))
-                        print(f'{node_id[:8]}: rt is empty but we recovered {len(persisted_peers)} peers from the datastore')
-            n.start("0.0.0.0", known_node_urls, persisted_peers)
-            nodes.append(n)
+        nodes = await start_nodes(loop, num_nodes, external_ip, state_dir)
         await asyncio.gather(*map(lambda n: n.started_listening.wait(), nodes), loop=loop)
         print("joined")

         queue = asyncio.Queue(maxsize=100*num_nodes)
         for n in nodes:
-            asyncio.create_task(drain(n, queue))
+            asyncio.create_task(drain_events(n, queue))

         while True:
             (n, node_id, ip, method, args) = await queue.get()
@@ -156,7 +122,43 @@ def make_node_id(i: int, n: int) -> str:
     return "{0:0{1}x}".format(i * ((2**8)**bytes_in_id // n), bytes_in_id*2)


-async def drain(n, q):
+async def start_nodes(loop, num_nodes, external_ip, state_dir):
+    start_port = 4445
+    known_node_urls = [("lbrynet1.lbry.com", 4444), ("lbrynet2.lbry.com", 4444), ("lbrynet3.lbry.com", 4444)]
+    peer_manager = peer.PeerManager(loop)
+    nodes = []
+    for i in range(num_nodes):
+        node_id = make_node_id(i, num_nodes)
+        # pprint(node_id)
+        port = start_port + i
+        # await u.get_next_mapping(port, "UDP", "lbry dht tracker")  # not necessary, i just opened ports in router
+        n = node.Node(loop, peer_manager, node_id=bytes.fromhex(node_id), external_ip=external_ip,
+                      udp_port=port, internal_udp_port=port, peer_port=3333)
+        persisted_peers = []
+        if path.exists(state_dir + node_id):
+            with open(state_dir + node_id, 'rb') as f:
+                state = pickle.load(f)
+                # pprint(state.routing_table_peers)
+                # pprint(state.datastore)
+                print(f'{node_id[:8]}: loaded {len(state.routing_table_peers)} rt peers, {len(state.datastore)} in store')
+                n.load_state(state)
+                persisted_peers = state.routing_table_peers
+                # if len(persisted_peers) == 0 and len(state.datastore) > 0:
+                #     peers_to_import = map(lambda p: (p[0], p[1], p[2], p[3]), n.get_state().datastore)
+                #     persisted_peers.extend(peers_to_import)
+                #     print(f'{node_id[:8]}: rt is empty but we recovered {len(state.datastore)} '
+                #           f'peers from the datastore. {len(peers_to_import)} of those were recent enough to import')
+        n.start("0.0.0.0", known_node_urls, persisted_peers)
+        nodes.append(n)
+    return nodes
+
+
+async def drain_events(n, q):
     print(f'drain started on {bytes.hex(n.protocol.node_id)[:8]}')
     while True:
         (node_id, ip, method, args) = await n.protocol.event_queue.get()
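
As an aside, a worked example of the id spacing make_node_id produces (assuming bytes_in_id is 48, the length of an lbry DHT node id; not part of the diff):

# node i of n gets id i * (2**384 // n), spacing the tracker's virtual nodes
# evenly across the keyspace so it observes announce traffic from every region.
assert make_node_id(0, 128) == '00' * 48
assert make_node_id(1, 128) == '02' + '00' * 47  # 1 * (2**384 // 128) = 2**377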