2021-09-27 03:26:34 -03:00
|
|
|
import asyncio
|
|
|
|
import argparse
|
|
|
|
import logging
|
2021-11-17 03:58:27 -03:00
|
|
|
import csv
|
|
|
|
from io import StringIO
|
2021-09-28 18:52:23 -03:00
|
|
|
from typing import Optional
|
2021-10-22 03:39:46 -03:00
|
|
|
from aiohttp import web
|
|
|
|
from prometheus_client import generate_latest as prom_generate_latest, Gauge
|
2021-09-27 03:26:34 -03:00
|
|
|
|
|
|
|
from lbry.dht.constants import generate_id
|
|
|
|
from lbry.dht.node import Node
|
|
|
|
from lbry.dht.peer import PeerManager
|
|
|
|
from lbry.extras.daemon.storage import SQLiteStorage
|
|
|
|
from lbry.conf import Config
|
|
|
|
|
|
|
|
# Configure the root logger once at import time so that every record carries a
# timestamp, its level, and the emitting logger name and line number.
logging.basicConfig(
    format="%(asctime)s %(levelname)-4s %(name)s:%(lineno)d: %(message)s",
    level=logging.INFO,
)

# Module-level logger used by everything in this script.
log = logging.getLogger(__name__)
|
2021-10-22 03:39:46 -03:00
|
|
|
# Prometheus gauges published by this script. Both live in the "dht_node"
# namespace and carry a single "method" label.
BLOBS_STORED = Gauge(
    "blobs_stored",
    "Number of blob info received",
    labelnames=("method",),
    namespace="dht_node",
)
PEERS = Gauge(
    "known_peers",
    "Number of peers on routing table",
    labelnames=("method",),
    namespace="dht_node",
)
|
2021-09-27 03:26:34 -03:00
|
|
|
|
|
|
|
|
2021-10-22 03:39:46 -03:00
|
|
|
class SimpleMetrics:
    """HTTP endpoint exposing Prometheus metrics and raw state of a DHT node.

    Routes registered by :meth:`start`:

    * ``/metrics``   - output of the Prometheus client's ``generate_latest``
    * ``/peers.csv`` - one CSV row per peer in the routing table
    * ``/blobs.csv`` - one CSV row per key in the node's data store
    * ``/estimate``  - counts from a peer search for the node's own id
    * ``/count``     - routing table counts plus a network size estimate
    """

    def __init__(self, port, node):
        """Remember the TCP port to serve on and the DHT node to inspect."""
        self.prometheus_port = port
        self.dht_node: Node = node

    def _peers_sharing_first_byte(self, peers):
        """Return the peers whose node id starts with the same byte as ours."""
        own_first_byte = self.dht_node.protocol.node_id[0]
        return [peer for peer in peers if peer.node_id[0] == own_first_byte]

    async def handle_metrics_get_request(self, request: web.Request):
        """Serve the current Prometheus metrics as plain text.

        Any failure is logged with its traceback and then re-raised so the
        web framework still produces an error response.
        """
        try:
            return web.Response(
                text=prom_generate_latest().decode(),
                content_type='text/plain; version=0.0.4'
            )
        except Exception:
            log.exception('could not generate prometheus data')
            raise

    async def handle_peers_csv(self, request: web.Request):
        """Serve the routing table as CSV with columns ip, port and dht_id."""
        out = StringIO()
        writer = csv.DictWriter(out, fieldnames=["ip", "port", "dht_id"])
        writer.writeheader()
        for peer in self.dht_node.protocol.routing_table.get_peers():
            writer.writerow({"ip": peer.address, "port": peer.udp_port, "dht_id": peer.node_id.hex()})
        return web.Response(text=out.getvalue(), content_type='text/csv')

    async def handle_blobs_csv(self, request: web.Request):
        """Serve the hex-encoded keys of the data store as a one-column CSV."""
        out = StringIO()
        writer = csv.DictWriter(out, fieldnames=["blob_hash"])
        writer.writeheader()
        for blob in self.dht_node.protocol.data_store.keys():
            writer.writerow({"blob_hash": blob.hex()})
        return web.Response(text=out.getvalue(), content_type='text/csv')

    async def estimate_peers(self, request: web.Request):
        """Search the DHT for our own id and report how many peers came back.

        The JSON response holds ``total`` (all peers returned by the search)
        and ``close`` (those sharing the first id byte with this node).
        """
        amount = 2000
        own_id = self.dht_node.protocol.node_id
        peers = await self.dht_node.peer_search(own_id, count=amount, max_results=amount)
        close_ids = self._peers_sharing_first_byte(peers)
        # Diagnostics go through the logger rather than stdout; the guard
        # avoids hex-encoding every id when debug logging is switched off.
        if log.isEnabledFor(logging.DEBUG):
            log.debug(
                "peer estimate for %s, close ids: %s",
                own_id.hex(), [cid.node_id.hex() for cid in close_ids]
            )
        return web.json_response({"total": len(peers), "close": len(close_ids)})

    async def peers_in_routing_table(self, request: web.Request):
        """Report routing table size and a rough network size estimate.

        ``estimated_network_size`` is the number of peers sharing our first
        id byte multiplied by 256, the number of values that byte can take.
        """
        total_peers = self.dht_node.protocol.routing_table.get_peers()
        close_ids = self._peers_sharing_first_byte(total_peers)
        return web.json_response({
            "total": len(total_peers),
            "close": len(close_ids),
            'estimated_network_size': len(close_ids) * 256,
        })

    async def start(self):
        """Register all routes and start listening on 0.0.0.0:<prometheus_port>."""
        prom_app = web.Application()
        prom_app.router.add_get('/metrics', self.handle_metrics_get_request)
        prom_app.router.add_get('/peers.csv', self.handle_peers_csv)
        prom_app.router.add_get('/blobs.csv', self.handle_blobs_csv)
        prom_app.router.add_get('/estimate', self.estimate_peers)
        prom_app.router.add_get('/count', self.peers_in_routing_table)
        metrics_runner = web.AppRunner(prom_app)
        await metrics_runner.setup()
        prom_site = web.TCPSite(metrics_runner, "0.0.0.0", self.prometheus_port)
        await prom_site.start()
|
|
|
|
|
|
|
|
|
|
|
|
def _parse_bootstrap_node(bootstrap_node: str) -> list:
    """Turn a ``host:port`` string into a single-entry ``[(host, port)]`` list.

    The split happens on the last colon, so anything before it is the host.

    Raises:
        ValueError: if the host is empty or the port is not a decimal number.
    """
    host, _, port = bootstrap_node.rpartition(':')
    if not host or not port.isdecimal():
        raise ValueError(
            f"invalid bootstrap node {bootstrap_node!r}, expected format host:port"
        )
    return [(host, int(port))]


async def main(host: str, port: int, db_file_path: str, bootstrap_node: Optional[str], prometheus_port: int):
    """Run a single DHT node until the task is cancelled.

    Args:
        host: address handed to ``Node.start``.
        port: passed to ``Node`` twice (its fourth and fifth positional arguments).
        db_file_path: path given to ``SQLiteStorage``.
        bootstrap_node: optional ``host:port`` of the node to bootstrap from;
            when empty or ``None`` the ``known_dht_nodes`` of the default
            ``Config`` are used instead.
        prometheus_port: port for the metrics HTTP server; it is only started
            when this is greater than zero.

    Raises:
        ValueError: if ``bootstrap_node`` is set but is not a valid ``host:port``.
    """
    # Validate user input first so a typo fails before any resource is opened.
    nodes = _parse_bootstrap_node(bootstrap_node) if bootstrap_node else None

    loop = asyncio.get_running_loop()
    conf = Config()
    storage = SQLiteStorage(conf, db_file_path, loop, loop.time)
    if nodes is None:
        nodes = conf.known_dht_nodes
    await storage.open()
    # NOTE(review): 3333 is presumably the peer (TCP) port and None the
    # external ip -- confirm against the Node constructor.
    node = Node(
        loop, PeerManager(loop), generate_id(), port, port, 3333, None,
        storage=storage
    )
    if prometheus_port > 0:
        metrics = SimpleMetrics(prometheus_port, node)
        await metrics.start()
    node.start(host, nodes)
    while True:
        await asyncio.sleep(10)
        known_peers = len(node.protocol.routing_table.get_peers())
        stored_blobs = len(node.protocol.data_store)
        storing_peers = len(node.protocol.data_store.get_storing_contacts())
        PEERS.labels('main').set(known_peers)
        # The gauge counts blobs, so it takes the data store size; the number
        # of storing contacts is a peer count and is only logged below.
        BLOBS_STORED.labels('main').set(stored_blobs)
        log.info("Known peers: %d. Storing contact information for %d blobs from %d peers.",
                 known_peers, stored_blobs, storing_peers)
|
|
|
|
|
|
|
|
|
|
|
|
if __name__ == '__main__':
    # Command line entry point: collect the options, then run the node.
    cli = argparse.ArgumentParser(
        description="Starts a single DHT node, which then can be used as a seed node or just a contributing node."
    )
    cli.add_argument(
        "--host", type=str, default="0.0.0.0",
        help="Host to listen for requests. Default: 0.0.0.0",
    )
    cli.add_argument(
        "--port", type=int, default=4444,
        help="Port to listen for requests. Default: 4444",
    )
    cli.add_argument(
        "--db_file", type=str, default="/tmp/dht.db",
        help="DB file to save peers. Default: /tmp/dht.db",
    )
    cli.add_argument(
        "--bootstrap_node", type=str, default=None,
        help=(
            "Node to connect for bootstraping this node. Leave unset to use the default ones. "
            "Format: host:port Example: lbrynet1.lbry.com:4444"
        ),
    )
    cli.add_argument(
        "--metrics_port", type=int, default=0,
        help="Port for Prometheus and raw CSV metrics. 0 to disable. Default: 0",
    )
    options = cli.parse_args()
    asyncio.run(
        main(
            host=options.host,
            port=options.port,
            db_file_path=options.db_file,
            bootstrap_node=options.bootstrap_node,
            prometheus_port=options.metrics_port,
        )
    )
|