api to check seeder count, better saving of routing table

This commit is contained in:
Alex Grintsvayg 2021-01-20 17:12:20 -05:00
parent 53e683d307
commit 366b0d590c
No known key found for this signature in database
GPG key ID: AEB3F089F86A22B5

View file

@ -7,6 +7,8 @@ import sqlite3
import pickle import pickle
from os import path from os import path
from pprint import pprint from pprint import pprint
from aiohttp import web
import json
from lbry.dht import node, peer from lbry.dht import node, peer
@ -39,6 +41,8 @@ async def main():
# for items in res: # for items in res:
# print(items) # print(items)
asyncio.create_task(run_web_api(loop, db))
num_nodes = 16 num_nodes = 16
start_port = 4444 start_port = 4444
known_node_urls = [("lbrynet1.lbry.com", 4444), ("lbrynet2.lbry.com", 4444), ("lbrynet3.lbry.com", 4444)] known_node_urls = [("lbrynet1.lbry.com", 4444), ("lbrynet2.lbry.com", 4444), ("lbrynet3.lbry.com", 4444)]
@ -48,15 +52,19 @@ async def main():
try: try:
for i in range(num_nodes): for i in range(num_nodes):
assert i < 16 # my ghetto int -> node_id converter requires this assert i < 16 # my ghetto int -> node_id converter requires this
node_id = '0123456789abcdef'[i] + '0' * 95 node_id = '0123456789abcdef'[i] + '0' * 95
# pprint(node_id) # pprint(node_id)
port = start_port + i port = start_port + i
await u.get_next_mapping(port, "UDP", "lbry dht tracker", port) await u.get_next_mapping(port, "UDP", "lbry dht tracker")
# SOMETHING ABOUT THIS DOESNT WORK
# port = await u.get_next_mapping(start_port, "UDP", "lbry dht tracker")
n = node.Node(loop, peer_manager, node_id=bytes.fromhex(node_id), external_ip=external_ip, n = node.Node(loop, peer_manager, node_id=bytes.fromhex(node_id), external_ip=external_ip,
udp_port=port, internal_udp_port=port, peer_port=3333) udp_port=port, internal_udp_port=port, peer_port=3333)
persisted_peers =[] persisted_peers = []
if path.exists(state_dir + node_id): if path.exists(state_dir + node_id):
with open(state_dir + node_id, 'rb') as f: with open(state_dir + node_id, 'rb') as f:
state = pickle.load(f) state = pickle.load(f)
@ -65,7 +73,9 @@ async def main():
print(f'{node_id[:8]}: loaded {len(state.routing_table_peers)} rt peers, {len(state.datastore)} in store') print(f'{node_id[:8]}: loaded {len(state.routing_table_peers)} rt peers, {len(state.datastore)} in store')
n.load_state(state) n.load_state(state)
persisted_peers = state.routing_table_peers persisted_peers = state.routing_table_peers
if len(persisted_peers) == 0 and len(state.datastore) > 0:
persisted_peers.extend(map(lambda x: (x[0], x[1], x[2], x[3]), state.datastore))
print(f'{node_id[:8]}: rt is empty but we recovered {len(persisted_peers)} peers from the datastore')
n.start("0.0.0.0", known_node_urls, persisted_peers) n.start("0.0.0.0", known_node_urls, persisted_peers)
nodes.append(n) nodes.append(n)
@ -99,14 +109,23 @@ async def main():
for n in nodes: for n in nodes:
node_id = bytes.hex(n.protocol.node_id) node_id = bytes.hex(n.protocol.node_id)
n.stop() n.stop()
print(f'deleting upnp port mapping {n.protocol.udp_port}')
await u.delete_port_mapping(n.protocol.udp_port, "UDP")
state = n.get_state() state = n.get_state()
# keep existing rt if there is one
if len(state.routing_table_peers) == 0 and path.exists(state_dir + node_id):
with open(state_dir + node_id, 'rb') as f:
existing_state = pickle.load(f)
if len(existing_state.routing_table_peers) > 0:
state.routing_table_peers = existing_state.routing_table_peers
print(f'rt empty on save, but old rt was recovered ({len(state.routing_table_peers)} peers)')
with open(state_dir + node_id, 'wb') as f: with open(state_dir + node_id, 'wb') as f:
# pprint(state.routing_table_peers) # pprint(state.routing_table_peers)
# pprint(state.datastore) # pprint(state.datastore)
print(f'{node_id[:8]}: saved {len(state.routing_table_peers)} rt peers, {len(state.datastore)} in store') print(f'{node_id[:8]}: saved {len(state.routing_table_peers)} rt peers, {len(state.datastore)} in store')
pickle.dump(state, f) pickle.dump(state, f)
db.close() db.close()
await u.delete_port_mapping(n.protocol.udp_port, "UDP")
class ShutdownErr(BaseException): class ShutdownErr(BaseException):
@ -127,6 +146,40 @@ async def drain(n, q):
except asyncio.QueueFull: except asyncio.QueueFull:
pass pass
async def run_web_api(loop, db):
app = web.Application(loop=loop)
app['db'] = db
app.add_routes([
web.get('/', api_handler),
web.get('/seeds/{hash}', seeds_handler),
])
# server = web.Server(api_handler, loop=loop)
# runner = web.ServerRunner(server)
runner = web.AppRunner(app)
await runner.setup()
site = web.TCPSite(runner, 'localhost', 8080)
await site.start()
# web.run_app(app)
async def seeds_handler(request):
blobhash = request.match_info['hash']
db = request.app['db']
try:
cur = db.cursor()
c = cur.execute("""
select count(distinct(node_id)) from log where hash = ? and timestamp > strftime('%s','now','-1 day')
""", (blobhash,)).fetchone()[0]
cur.close()
return web.Response(text=json.dumps({'seeds': c})+"\n")
except Exception as err:
return web.Response(text=json.dumps({'error': err})+"\n")
async def api_handler(request):
return web.Response(text="tracker OK")
if __name__ == "__main__": if __name__ == "__main__":
try: try:
asyncio.run(main()) asyncio.run(main())