forked from LBRYCommunity/lbry-sdk
16-node tracker works
This commit is contained in:
parent
411698f306
commit
53e683d307
2 changed files with 68 additions and 57 deletions
|
@ -429,8 +429,10 @@ class KademliaProtocol(DatagramProtocol):
|
|||
log.debug("%s:%i RECV CALL %s %s:%i", self.external_ip, self.udp_port, message.method.decode(),
|
||||
sender_contact.address, sender_contact.udp_port)
|
||||
|
||||
if not self.event_queue.full():
|
||||
try:
|
||||
self.event_queue.put_nowait((sender_contact.node_id, sender_contact.address, method, args))
|
||||
except asyncio.QueueFull:
|
||||
pass
|
||||
|
||||
if method == b'ping':
|
||||
result = self.node_rpc.ping()
|
||||
|
|
|
@ -18,7 +18,7 @@ log.setLevel(logging.INFO)
|
|||
|
||||
async def main():
|
||||
data_dir = "/home/grin/code/lbry/sdk"
|
||||
state_file = data_dir + '/nodestate'
|
||||
state_dir = data_dir + '/nodestate/'
|
||||
loop = asyncio.get_event_loop()
|
||||
|
||||
try:
|
||||
|
@ -29,84 +29,84 @@ async def main():
|
|||
|
||||
peer_manager = peer.PeerManager(loop)
|
||||
u = await upnp.UPnP.discover()
|
||||
await u.get_next_mapping(4444, "UDP", "lbry dht tracker", 4444)
|
||||
my_node_id = "38b060a751ac96384cd9327eb1b1e36a21fdb71114be07434c0cc7bf63f6e1da274edebfe76f65fbd51ad2f14898b95b"
|
||||
n = node.Node(loop, peer_manager, node_id=bytes.fromhex(my_node_id), external_ip=(await u.get_external_ip()),
|
||||
udp_port=4444, internal_udp_port=4444, peer_port=4444)
|
||||
|
||||
db = sqlite3.connect(data_dir + "/tracker.sqlite3")
|
||||
db.execute(
|
||||
'''CREATE TABLE IF NOT EXISTS log (hash TEXT, node_id TEXT, ip TEXT, port INT, timestamp INT)'''
|
||||
'CREATE TABLE IF NOT EXISTS log (local_id TEXT, hash TEXT, node_id TEXT, ip TEXT, port INT, timestamp INT)'
|
||||
)
|
||||
# curr = db.cursor()
|
||||
# res = curr.execute("SELECT 1, 2, 3")
|
||||
# for items in res:
|
||||
# print(items)
|
||||
|
||||
num_nodes = 16
|
||||
start_port = 4444
|
||||
known_node_urls = [("lbrynet1.lbry.com", 4444), ("lbrynet2.lbry.com", 4444), ("lbrynet3.lbry.com", 4444)]
|
||||
external_ip = await u.get_external_ip()
|
||||
|
||||
nodes = []
|
||||
|
||||
try:
|
||||
known_node_urls=[("lbrynet1.lbry.com", 4444), ("lbrynet2.lbry.com", 4444), ("lbrynet3.lbry.com", 4444)]
|
||||
persisted_peers =[]
|
||||
if path.exists(state_file):
|
||||
with open(state_file, 'rb') as f:
|
||||
state = pickle.load(f)
|
||||
# pprint(state.routing_table_peers)
|
||||
# pprint(state.datastore)
|
||||
print(f'loaded {len(state.routing_table_peers)} rt peers, {len(state.datastore)} in store')
|
||||
n.load_state(state)
|
||||
persisted_peers = state.routing_table_peers
|
||||
for i in range(num_nodes):
|
||||
assert i < 16 # my ghetto int -> node_id converter requires this
|
||||
node_id = '0123456789abcdef'[i] + '0' * 95
|
||||
# pprint(node_id)
|
||||
port = start_port + i
|
||||
await u.get_next_mapping(port, "UDP", "lbry dht tracker", port)
|
||||
n = node.Node(loop, peer_manager, node_id=bytes.fromhex(node_id), external_ip=external_ip,
|
||||
udp_port=port, internal_udp_port=port, peer_port=3333)
|
||||
|
||||
n.start("0.0.0.0", known_node_urls, persisted_peers)
|
||||
await n.started_listening.wait()
|
||||
persisted_peers =[]
|
||||
if path.exists(state_dir + node_id):
|
||||
with open(state_dir + node_id, 'rb') as f:
|
||||
state = pickle.load(f)
|
||||
# pprint(state.routing_table_peers)
|
||||
# pprint(state.datastore)
|
||||
print(f'{node_id[:8]}: loaded {len(state.routing_table_peers)} rt peers, {len(state.datastore)} in store')
|
||||
n.load_state(state)
|
||||
persisted_peers = state.routing_table_peers
|
||||
|
||||
n.start("0.0.0.0", known_node_urls, persisted_peers)
|
||||
nodes.append(n)
|
||||
|
||||
await asyncio.gather(*map(lambda n: n.started_listening.wait(), nodes), loop=loop)
|
||||
print("joined")
|
||||
# jack = peer.make_kademlia_peer(
|
||||
# bytes.fromhex("38b060a751ac96384cd9327eb1b1e36a21fdb71114be07434c0cc7bf63f6e1da274edebfe76f65fbd51ad2f14898b95c"),
|
||||
# "216.19.244.226", udp_port=4444,
|
||||
# )
|
||||
# print(await n.protocol.get_rpc_peer(jack).ping())
|
||||
|
||||
await dostuff(n, db)
|
||||
finally:
|
||||
print("shutting down")
|
||||
n.stop()
|
||||
state = n.get_state()
|
||||
with open(state_file, 'wb') as f:
|
||||
# pprint(state.routing_table_peers)
|
||||
# pprint(state.datastore)
|
||||
print(f'saved {len(state.routing_table_peers)} rt peers, {len(state.datastore)} in store')
|
||||
pickle.dump(state, f)
|
||||
db.close()
|
||||
await u.delete_port_mapping(4444, "UDP")
|
||||
queue = asyncio.Queue(maxsize=100*num_nodes)
|
||||
for n in nodes:
|
||||
asyncio.create_task(drain(n, queue))
|
||||
|
||||
while True:
|
||||
(n, node_id, ip, method, args) = await queue.get()
|
||||
local_node_id = bytes.hex(n.protocol.node_id)
|
||||
if method != b'store':
|
||||
# print(f"{local_node_id[:8]}: {method} from {bytes.hex(node_id)} ({ip})")
|
||||
continue
|
||||
|
||||
async def dostuff(n, db):
|
||||
# gather
|
||||
# as_completed
|
||||
# wait
|
||||
# wait_for
|
||||
|
||||
# make a task to loop over the things in the node. those tasks drain into one combined queue
|
||||
# t = asyncio.create_task for each node
|
||||
# keep the t
|
||||
# handle teardown at the end
|
||||
#
|
||||
|
||||
while True:
|
||||
(node_id, ip, method, args) = await n.protocol.event_queue.get()
|
||||
if method == b'store':
|
||||
blob_hash, token, port, original_publisher_id, age = args[:5]
|
||||
print(f"STORE from {bytes.hex(node_id)} ({ip}) for blob {bytes.hex(blob_hash)}")
|
||||
print(f"STORE to {local_node_id[:8]} from {bytes.hex(node_id)[:8]} ({ip}) for blob {bytes.hex(blob_hash)[:8]}")
|
||||
|
||||
try:
|
||||
cur = db.cursor()
|
||||
cur.execute('INSERT INTO log (hash, node_id, ip, port, timestamp) VALUES (?,?,?,?,?)',
|
||||
(bytes.hex(blob_hash), bytes.hex(node_id), ip, port, int(time.time())))
|
||||
cur.execute('INSERT INTO log (local_id, hash, node_id, ip, port, timestamp) VALUES (?,?,?,?,?,?)',
|
||||
(local_node_id, bytes.hex(blob_hash), bytes.hex(node_id), ip, port, int(time.time())))
|
||||
db.commit()
|
||||
cur.close()
|
||||
except sqlite3.Error as err:
|
||||
print("failed insert", err)
|
||||
else:
|
||||
pass
|
||||
# print(f"{method} from {bytes.hex(node_id)} ({ip})")
|
||||
finally:
|
||||
print("shutting down")
|
||||
for n in nodes:
|
||||
node_id = bytes.hex(n.protocol.node_id)
|
||||
n.stop()
|
||||
state = n.get_state()
|
||||
with open(state_dir + node_id, 'wb') as f:
|
||||
# pprint(state.routing_table_peers)
|
||||
# pprint(state.datastore)
|
||||
print(f'{node_id[:8]}: saved {len(state.routing_table_peers)} rt peers, {len(state.datastore)} in store')
|
||||
pickle.dump(state, f)
|
||||
db.close()
|
||||
await u.delete_port_mapping(n.protocol.udp_port, "UDP")
|
||||
|
||||
|
||||
class ShutdownErr(BaseException):
|
||||
|
@ -118,6 +118,15 @@ def shutdown():
|
|||
raise ShutdownErr()
|
||||
|
||||
|
||||
async def drain(n, q):
|
||||
print(f'drain started on {bytes.hex(n.protocol.node_id)[:8]}')
|
||||
while True:
|
||||
(node_id, ip, method, args) = await n.protocol.event_queue.get()
|
||||
try:
|
||||
q.put_nowait((n, node_id, ip, method, args))
|
||||
except asyncio.QueueFull:
|
||||
pass
|
||||
|
||||
if __name__ == "__main__":
|
||||
try:
|
||||
asyncio.run(main())
|
||||
|
|
Loading…
Reference in a new issue