forked from LBRYCommunity/lbry-sdk
Merge pull request #3625 from lbryio/dht_crawler
Add script to collect DHT metrics
This commit is contained in:
commit
f7455600cc
6 changed files with 591 additions and 14 deletions
|
@ -2,6 +2,7 @@ FROM debian:10-slim
|
||||||
|
|
||||||
ARG user=lbry
|
ARG user=lbry
|
||||||
ARG projects_dir=/home/$user
|
ARG projects_dir=/home/$user
|
||||||
|
ARG db_dir=/database
|
||||||
|
|
||||||
ARG DOCKER_TAG
|
ARG DOCKER_TAG
|
||||||
ARG DOCKER_COMMIT=docker
|
ARG DOCKER_COMMIT=docker
|
||||||
|
@ -27,12 +28,16 @@ RUN groupadd -g 999 $user && useradd -m -u 999 -g $user $user
|
||||||
|
|
||||||
COPY . $projects_dir
|
COPY . $projects_dir
|
||||||
RUN chown -R $user:$user $projects_dir
|
RUN chown -R $user:$user $projects_dir
|
||||||
|
RUN mkdir -p $db_dir
|
||||||
|
RUN chown -R $user:$user $db_dir
|
||||||
|
|
||||||
USER $user
|
USER $user
|
||||||
WORKDIR $projects_dir
|
WORKDIR $projects_dir
|
||||||
|
|
||||||
|
RUN python3 -m pip install -U setuptools pip
|
||||||
RUN make install
|
RUN make install
|
||||||
RUN python3 docker/set_build.py
|
RUN python3 docker/set_build.py
|
||||||
RUN rm ~/.cache -rf
|
RUN rm ~/.cache -rf
|
||||||
|
VOLUME $db_dir
|
||||||
ENTRYPOINT ["python3", "scripts/dht_node.py"]
|
ENTRYPOINT ["python3", "scripts/dht_node.py"]
|
||||||
|
|
||||||
|
|
|
@ -100,6 +100,9 @@ class PeerManager:
|
||||||
self._node_id_reverse_mapping[node_id] = (address, udp_port)
|
self._node_id_reverse_mapping[node_id] = (address, udp_port)
|
||||||
self.peer_manager_keys_metric.labels("global").set(self.count_cache_keys())
|
self.peer_manager_keys_metric.labels("global").set(self.count_cache_keys())
|
||||||
|
|
||||||
|
def get_node_id_for_endpoint(self, address, port):
|
||||||
|
return self._node_id_mapping.get((address, port))
|
||||||
|
|
||||||
def prune(self): # TODO: periodically call this
|
def prune(self): # TODO: periodically call this
|
||||||
now = self._loop.time()
|
now = self._loop.time()
|
||||||
to_pop = []
|
to_pop = []
|
||||||
|
@ -150,9 +153,10 @@ class PeerManager:
|
||||||
def peer_is_good(self, peer: 'KademliaPeer'):
|
def peer_is_good(self, peer: 'KademliaPeer'):
|
||||||
return self.contact_triple_is_good(peer.node_id, peer.address, peer.udp_port)
|
return self.contact_triple_is_good(peer.node_id, peer.address, peer.udp_port)
|
||||||
|
|
||||||
def decode_tcp_peer_from_compact_address(self, compact_address: bytes) -> 'KademliaPeer': # pylint: disable=no-self-use
|
|
||||||
node_id, address, tcp_port = decode_compact_address(compact_address)
|
def decode_tcp_peer_from_compact_address(compact_address: bytes) -> 'KademliaPeer': # pylint: disable=no-self-use
|
||||||
return make_kademlia_peer(node_id, address, udp_port=None, tcp_port=tcp_port)
|
node_id, address, tcp_port = decode_compact_address(compact_address)
|
||||||
|
return make_kademlia_peer(node_id, address, udp_port=None, tcp_port=tcp_port)
|
||||||
|
|
||||||
|
|
||||||
@dataclass(unsafe_hash=True)
|
@dataclass(unsafe_hash=True)
|
||||||
|
|
|
@ -8,7 +8,7 @@ from typing import TYPE_CHECKING
|
||||||
from lbry.dht import constants
|
from lbry.dht import constants
|
||||||
from lbry.dht.error import RemoteException, TransportNotConnected
|
from lbry.dht.error import RemoteException, TransportNotConnected
|
||||||
from lbry.dht.protocol.distance import Distance
|
from lbry.dht.protocol.distance import Distance
|
||||||
from lbry.dht.peer import make_kademlia_peer
|
from lbry.dht.peer import make_kademlia_peer, decode_tcp_peer_from_compact_address
|
||||||
from lbry.dht.serialization.datagram import PAGE_KEY
|
from lbry.dht.serialization.datagram import PAGE_KEY
|
||||||
|
|
||||||
if TYPE_CHECKING:
|
if TYPE_CHECKING:
|
||||||
|
@ -26,6 +26,15 @@ class FindResponse:
|
||||||
def get_close_triples(self) -> typing.List[typing.Tuple[bytes, str, int]]:
|
def get_close_triples(self) -> typing.List[typing.Tuple[bytes, str, int]]:
|
||||||
raise NotImplementedError()
|
raise NotImplementedError()
|
||||||
|
|
||||||
|
def get_close_kademlia_peers(self, peer_info) -> typing.Generator[typing.Iterator['KademliaPeer'], None, None]:
|
||||||
|
for contact_triple in self.get_close_triples():
|
||||||
|
node_id, address, udp_port = contact_triple
|
||||||
|
try:
|
||||||
|
yield make_kademlia_peer(node_id, address, udp_port)
|
||||||
|
except ValueError:
|
||||||
|
log.warning("misbehaving peer %s:%i returned peer with reserved ip %s:%i", peer_info.address,
|
||||||
|
peer_info.udp_port, address, udp_port)
|
||||||
|
|
||||||
|
|
||||||
class FindNodeResponse(FindResponse):
|
class FindNodeResponse(FindResponse):
|
||||||
def __init__(self, key: bytes, close_triples: typing.List[typing.Tuple[bytes, str, int]]):
|
def __init__(self, key: bytes, close_triples: typing.List[typing.Tuple[bytes, str, int]]):
|
||||||
|
@ -125,13 +134,8 @@ class IterativeFinder(AsyncIterator):
|
||||||
|
|
||||||
async def _handle_probe_result(self, peer: 'KademliaPeer', response: FindResponse):
|
async def _handle_probe_result(self, peer: 'KademliaPeer', response: FindResponse):
|
||||||
self._add_active(peer)
|
self._add_active(peer)
|
||||||
for contact_triple in response.get_close_triples():
|
for new_peer in response.get_close_kademlia_peers(peer):
|
||||||
node_id, address, udp_port = contact_triple
|
self._add_active(new_peer)
|
||||||
try:
|
|
||||||
self._add_active(make_kademlia_peer(node_id, address, udp_port))
|
|
||||||
except ValueError:
|
|
||||||
log.warning("misbehaving peer %s:%i returned peer with reserved ip %s:%i", peer.address,
|
|
||||||
peer.udp_port, address, udp_port)
|
|
||||||
self.check_result_ready(response)
|
self.check_result_ready(response)
|
||||||
self._log_state(reason="check result")
|
self._log_state(reason="check result")
|
||||||
|
|
||||||
|
@ -319,7 +323,7 @@ class IterativeValueFinder(IterativeFinder):
|
||||||
decoded_peers = set()
|
decoded_peers = set()
|
||||||
for compact_addr in parsed.found_compact_addresses:
|
for compact_addr in parsed.found_compact_addresses:
|
||||||
try:
|
try:
|
||||||
decoded_peers.add(self.peer_manager.decode_tcp_peer_from_compact_address(compact_addr))
|
decoded_peers.add(decode_tcp_peer_from_compact_address(compact_addr))
|
||||||
except ValueError:
|
except ValueError:
|
||||||
log.warning("misbehaving peer %s:%i returned invalid peer for blob",
|
log.warning("misbehaving peer %s:%i returned invalid peer for blob",
|
||||||
peer.address, peer.udp_port)
|
peer.address, peer.udp_port)
|
||||||
|
@ -341,7 +345,7 @@ class IterativeValueFinder(IterativeFinder):
|
||||||
|
|
||||||
def check_result_ready(self, response: FindValueResponse):
|
def check_result_ready(self, response: FindValueResponse):
|
||||||
if response.found:
|
if response.found:
|
||||||
blob_peers = [self.peer_manager.decode_tcp_peer_from_compact_address(compact_addr)
|
blob_peers = [decode_tcp_peer_from_compact_address(compact_addr)
|
||||||
for compact_addr in response.found_compact_addresses]
|
for compact_addr in response.found_compact_addresses]
|
||||||
to_yield = []
|
to_yield = []
|
||||||
for blob_peer in blob_peers:
|
for blob_peer in blob_peers:
|
||||||
|
|
|
@ -1,5 +1,5 @@
|
||||||
from lbry.conf import Config
|
|
||||||
from lbry.extras.cli import execute_command
|
from lbry.extras.cli import execute_command
|
||||||
|
from lbry.conf import Config
|
||||||
|
|
||||||
|
|
||||||
def daemon_rpc(conf: Config, method: str, **kwargs):
|
def daemon_rpc(conf: Config, method: str, **kwargs):
|
||||||
|
|
520
scripts/dht_crawler.py
Normal file
520
scripts/dht_crawler.py
Normal file
|
@ -0,0 +1,520 @@
|
||||||
|
import sys
|
||||||
|
import datetime
|
||||||
|
import logging
|
||||||
|
import asyncio
|
||||||
|
import os.path
|
||||||
|
import random
|
||||||
|
import time
|
||||||
|
import typing
|
||||||
|
from dataclasses import dataclass, astuple, replace
|
||||||
|
|
||||||
|
from aiohttp import web
|
||||||
|
from prometheus_client import Gauge, generate_latest as prom_generate_latest, Counter, Histogram
|
||||||
|
|
||||||
|
import lbry.dht.error
|
||||||
|
from lbry.dht.constants import generate_id
|
||||||
|
from lbry.dht.node import Node
|
||||||
|
from lbry.dht.peer import make_kademlia_peer, PeerManager, decode_tcp_peer_from_compact_address
|
||||||
|
from lbry.dht.protocol.distance import Distance
|
||||||
|
from lbry.dht.protocol.iterative_find import FindValueResponse, FindNodeResponse, FindResponse
|
||||||
|
from lbry.extras.daemon.storage import SQLiteMixin
|
||||||
|
from lbry.conf import Config
|
||||||
|
from lbry.utils import resolve_host
|
||||||
|
|
||||||
|
|
||||||
|
logging.basicConfig(level=logging.INFO, format="%(asctime)s %(levelname)-4s %(name)s:%(lineno)d: %(message)s")
|
||||||
|
log = logging.getLogger(__name__)
|
||||||
|
|
||||||
|
|
||||||
|
class SDHashSamples:
    """In-memory pool of fixed-size (48-byte) sd_hash samples loaded from a binary file."""

    def __init__(self, samples_file_path):
        """Read the whole sample file; its length must be an exact multiple of 48 bytes."""
        with open(samples_file_path, "rb") as handle:
            raw = handle.read()
        assert len(raw) % 48 == 0
        self._samples = raw
        # number of 48-byte samples available
        self.size = len(raw) // 48

    def read_samples(self, count=1):
        """Yield *count* uniformly random 48-byte samples (with replacement)."""
        for _ in range(count):
            start = random.randrange(0, self.size) * 48
            yield self._samples[start:start + 48]
|
||||||
|
|
||||||
|
|
||||||
|
class PeerStorage(SQLiteMixin):
    """SQLite persistence for the crawled peer graph: peers (nodes) and connections (edges)."""

    CREATE_TABLES_QUERY = """
        PRAGMA JOURNAL_MODE=WAL;
        CREATE TABLE IF NOT EXISTS peer (
            peer_id INTEGER NOT NULL,
            node_id VARCHAR(96),
            address VARCHAR,
            udp_port INTEGER,
            tcp_port INTEGER,
            first_online DATETIME,
            errors INTEGER,
            last_churn INTEGER,
            added_on DATETIME NOT NULL,
            last_check DATETIME,
            last_seen DATETIME,
            latency INTEGER,
            PRIMARY KEY (peer_id)
        );
        CREATE TABLE IF NOT EXISTS connection (
            from_peer_id INTEGER NOT NULL,
            to_peer_id INTEGER NOT NULL,
            PRIMARY KEY (from_peer_id, to_peer_id),
            FOREIGN KEY(from_peer_id) REFERENCES peer (peer_id),
            FOREIGN KEY(to_peer_id) REFERENCES peer (peer_id)
        );
    """

    async def open(self):
        """Open the database and make the writer connection return dict rows."""
        await super().open()
        self.db.writer_connection.row_factory = dict_row_factory

    async def all_peers(self):
        """Return every stored peer that ever responded or was seen within the last hour."""
        rows = await self.db.execute_fetchall(
            "select * from peer where latency > 0 or last_seen > datetime('now', '-1 hour')")
        return [DHTPeer(**row) for row in rows]

    async def save_peers(self, *peers):
        """Upsert the given DHTPeer rows in one batch."""
        log.info("Saving graph nodes (peers) to DB")
        await self.db.executemany(
            "INSERT OR REPLACE INTO peer("
            "node_id, address, udp_port, tcp_port, first_online, errors, last_churn,"
            "added_on, last_check, last_seen, latency, peer_id) VALUES (?,?,?,?,?,?,?,?,?,?,?,?)",
            [astuple(row) for row in peers]
        )
        log.info("Finished saving graph nodes (peers) to DB")

    async def save_connections(self, connections_map):
        """Replace the outgoing edges of every from-peer present in *connections_map*."""
        log.info("Saving graph edges (connections) to DB")
        # wipe existing edges for the sources we are about to rewrite
        await self.db.executemany(
            "DELETE FROM connection WHERE from_peer_id = ?", [(key,) for key in connections_map])
        for source_id, destination_ids in connections_map.items():
            await self.db.executemany(
                "INSERT INTO connection(from_peer_id, to_peer_id) VALUES(?,?)",
                [(source_id, dest_id) for dest_id in destination_ids])
        log.info("Finished saving graph edges (connections) to DB")
|
||||||
|
|
||||||
|
|
||||||
|
@dataclass(frozen=True)
class DHTPeer:
    """Immutable snapshot of a DHT peer as stored in the `peer` table.

    Field order matters: `astuple(DHTPeer(...))` must line up with the column
    order used by the INSERT statement in `PeerStorage.save_peers`.
    """
    node_id: str
    address: str
    udp_port: int
    tcp_port: int = None
    first_online: datetime.datetime = None
    errors: int = None
    last_churn: int = None
    added_on: datetime.datetime = None
    last_check: datetime.datetime = None
    last_seen: datetime.datetime = None
    latency: int = None
    peer_id: int = None

    @classmethod
    def from_kad_peer(cls, peer, peer_id):
        """Build a row from a kademlia peer object, stamping added_on with the current UTC time."""
        hex_node_id = peer.node_id.hex() if peer.node_id else None
        return cls(
            node_id=hex_node_id, address=peer.address, udp_port=peer.udp_port, tcp_port=peer.tcp_port,
            peer_id=peer_id, added_on=datetime.datetime.utcnow())

    def to_kad_peer(self):
        """Convert this row back into a kademlia peer object."""
        raw_node_id = bytes.fromhex(self.node_id) if self.node_id else None
        return make_kademlia_peer(raw_node_id, self.address, self.udp_port, self.tcp_port)
|
||||||
|
|
||||||
|
|
||||||
|
def new_node(address="0.0.0.0", udp_port=0, node_id=None):
    """Create a standalone DHT Node (with its own PeerManager) on the current event loop.

    A random node id is generated when none is given; 0 as udp_port lets the OS pick one.
    """
    loop = asyncio.get_event_loop()
    chosen_id = node_id or generate_id()
    return Node(loop, PeerManager(loop), chosen_id, udp_port, udp_port, 3333, address)
|
||||||
|
|
||||||
|
|
||||||
|
class Crawler:
    """Walks the LBRY DHT: repeatedly crawls peers' routing tables, probes stream
    availability for sampled sd_hashes, exports Prometheus metrics and periodically
    flushes the in-memory peer graph to SQLite via PeerStorage."""

    unique_total_hosts_metric = Gauge(
        "unique_total_hosts", "Number of unique hosts seen in the last interval", namespace="dht_crawler_node",
    )
    reachable_hosts_metric = Gauge(
        "reachable_hosts", "Number of hosts that replied in the last interval", namespace="dht_crawler_node",
    )
    total_historic_hosts_metric = Gauge(
        "history_total_hosts", "Number of hosts seen since first run.", namespace="dht_crawler_node",
    )
    pending_check_hosts_metric = Gauge(
        "pending_hosts", "Number of hosts on queue to be checked.", namespace="dht_crawler_node",
    )
    hosts_with_errors_metric = Gauge(
        "error_hosts", "Number of hosts that raised errors during contact.", namespace="dht_crawler_node",
    )
    ROUTING_TABLE_SIZE_HISTOGRAM_BUCKETS = tuple(map(float, range(100))) + (
        500., 1000., 2000., float('inf')
    )
    connections_found_metric = Histogram(
        "connections_found", "Number of hosts returned by the last successful contact.", namespace="dht_crawler_node",
        buckets=ROUTING_TABLE_SIZE_HISTOGRAM_BUCKETS
    )
    known_connections_found_metric = Histogram(
        "known_connections_found", "Number of already known hosts returned by last contact.",
        namespace="dht_crawler_node", buckets=ROUTING_TABLE_SIZE_HISTOGRAM_BUCKETS
    )
    reachable_connections_found_metric = Histogram(
        "reachable_connections_found", "Number of reachable known hosts returned by last contact.",
        namespace="dht_crawler_node", buckets=ROUTING_TABLE_SIZE_HISTOGRAM_BUCKETS
    )
    LATENCY_HISTOGRAM_BUCKETS = (
        0., 5., 10., 15., 30., 60., 120., 180., 240., 300., 600., 1200., 1800., 4000., 6000., float('inf')
    )
    host_latency_metric = Histogram(
        "host_latency", "Time spent on the last request, in milliseconds.", namespace="dht_crawler_node",
        buckets=LATENCY_HISTOGRAM_BUCKETS
    )
    probed_streams_metric = Counter(
        "probed_streams", "Amount of streams probed.", namespace="dht_crawler_node",
    )
    announced_streams_metric = Counter(
        "announced_streams", "Amount of streams where announcements were found.", namespace="dht_crawler_node",
    )
    working_streams_metric = Counter(
        "working_streams", "Amount of streams with reachable hosts.", namespace="dht_crawler_node",
    )

    def __init__(self, db_path: str, sd_hash_samples: SDHashSamples):
        self.node = new_node()
        self.db = PeerStorage(db_path)
        self.sd_hashes = sd_hash_samples
        # (address, udp_port) -> DHTPeer; source of truth between DB flushes
        self._memory_peers = {}
        # node_id bytes -> DHTPeer, only peers with a positive measured latency
        self._reachable_by_node_id = {}
        # from_peer_id -> [to_peer_id, ...]; pending graph edges
        self._connections = {}

    async def open(self):
        """Open the DB and hydrate the in-memory peer map from stored rows."""
        await self.db.open()
        self._memory_peers = {
            (peer.address, peer.udp_port): peer for peer in await self.db.all_peers()
        }
        self.refresh_reachable_set()

    def refresh_reachable_set(self):
        """Rebuild the node_id -> peer index of peers that have responded at least once."""
        self._reachable_by_node_id = {
            bytes.fromhex(peer.node_id): peer for peer in self._memory_peers.values() if (peer.latency or 0) > 0
        }

    async def probe_files(self):
        """Endlessly sample sd_hashes and ask the 8 closest reachable peers whether
        the stream is announced, then check each returned blob peer for liveness."""
        if not self.sd_hashes:
            return
        while True:
            for sd_hash in self.sd_hashes.read_samples(10_000):
                self.refresh_reachable_set()
                distance = Distance(sd_hash)
                node_ids = list(self._reachable_by_node_id.keys())
                node_ids.sort(key=lambda node_id: distance(node_id))
                k_closest = [self._reachable_by_node_id[node_id] for node_id in node_ids[:8]]
                found = False
                working = False
                for response in asyncio.as_completed(
                        [self.request_peers(peer.address, peer.udp_port, peer.node_id, sd_hash)
                         for peer in k_closest]):
                    response = await response
                    if response and response.found:
                        found = True
                        blob_peers = []
                        for compact_addr in response.found_compact_addresses:
                            try:
                                blob_peers.append(decode_tcp_peer_from_compact_address(compact_addr))
                            except ValueError as e:
                                log.error("Error decoding compact peers: %s", e)
                        for blob_peer in blob_peers:
                            # NOTE(review): this probes the blob peer's tcp_port over the UDP
                            # protocol, mirroring how peers announce — confirm intended.
                            response = await self.request_peers(
                                blob_peer.address, blob_peer.tcp_port, blob_peer.node_id, sd_hash)
                            if response:
                                working = True
                                log.info("Found responsive peer for %s: %s:%d(%d)",
                                         sd_hash.hex()[:8], blob_peer.address,
                                         blob_peer.udp_port or -1, blob_peer.tcp_port or -1)
                            else:
                                log.info("Found dead peer for %s: %s:%d(%d)",
                                         sd_hash.hex()[:8], blob_peer.address,
                                         blob_peer.udp_port or -1, blob_peer.tcp_port or -1)
                self.probed_streams_metric.inc()
                if found:
                    self.announced_streams_metric.inc()
                if working:
                    self.working_streams_metric.inc()
                log.info("Done querying stream %s for peers. Found: %s, working: %s",
                         sd_hash.hex()[:8], found, working)
                await asyncio.sleep(.5)

    @property
    def refresh_limit(self):
        """Cutoff timestamp: anything last checked/seen before this is considered stale."""
        return datetime.datetime.utcnow() - datetime.timedelta(hours=1)

    @property
    def all_peers(self):
        """Peers seen in the last hour or that have ever responded."""
        return [
            peer for peer in self._memory_peers.values()
            if (peer.last_seen and peer.last_seen > self.refresh_limit) or (peer.latency or 0) > 0
        ]

    @property
    def active_peers_count(self):
        return len(self.all_peers)

    @property
    def checked_peers_count(self):
        return len([peer for peer in self.all_peers if peer.last_check and peer.last_check > self.refresh_limit])

    @property
    def unreachable_peers_count(self):
        return len([peer for peer in self.all_peers
                    if peer.last_check and peer.last_check > self.refresh_limit and not peer.latency])

    @property
    def peers_with_errors_count(self):
        return len([peer for peer in self.all_peers if (peer.errors or 0) > 0])

    def get_peers_needing_check(self):
        """Peers never checked, or whose last check is older than the refresh limit."""
        to_check = [peer for peer in self.all_peers if peer.last_check is None or peer.last_check < self.refresh_limit]
        return to_check

    def remove_expired_peers(self):
        """Drop peers that never responded and have not been seen within the refresh window."""
        for key, peer in list(self._memory_peers.items()):
            # NOTE(review): assumes last_seen is always set for tracked peers
            # (add_peers stamps it) — a NULL last_seen row would raise here.
            if (peer.latency or 0) < 1 and peer.last_seen < self.refresh_limit:
                del self._memory_peers[key]

    def add_peers(self, *peers):
        """Track kademlia peers, merging node ids into existing rows and stamping last_seen."""
        for peer in peers:
            db_peer = self.get_from_peer(peer)
            if db_peer and db_peer.node_id is None and peer.node_id is not None:
                db_peer = replace(db_peer, node_id=peer.node_id.hex())
            elif not db_peer:
                db_peer = DHTPeer.from_kad_peer(peer, len(self._memory_peers) + 1)
            db_peer = replace(db_peer, last_seen=datetime.datetime.utcnow())
            self._memory_peers[(peer.address, peer.udp_port)] = db_peer

    async def flush_to_db(self):
        """Persist peers, reset the pending edge map and evict expired entries."""
        await self.db.save_peers(*self._memory_peers.values())
        connections_to_save = self._connections
        self._connections = {}
        # await self.db.save_connections(connections_to_save) heavy call
        self.remove_expired_peers()

    def get_from_peer(self, peer):
        """Look up the stored row for a kademlia peer by (address, udp_port)."""
        return self._memory_peers.get((peer.address, peer.udp_port), None)

    def set_latency(self, peer, latency=None):
        """Record the outcome of a contact attempt: latency (ns) on success, None on failure.

        Also back-fills node_id, first_online and churn, and stamps last_check.
        """
        if latency:
            self.host_latency_metric.observe(latency / 1_000_000.0)  # ns -> ms
        db_peer = self.get_from_peer(peer)
        if not db_peer:
            return
        db_peer = replace(db_peer, latency=latency)
        if not db_peer.node_id and peer.node_id:
            db_peer = replace(db_peer, node_id=peer.node_id.hex())
        if db_peer.first_online and latency is None:
            # previously-online peer failed: record how long it had been up
            db_peer = replace(db_peer, last_churn=(datetime.datetime.utcnow() - db_peer.first_online).seconds)
        elif latency is not None and db_peer.first_online is None:
            db_peer = replace(db_peer, first_online=datetime.datetime.utcnow())
        db_peer = replace(db_peer, last_check=datetime.datetime.utcnow())
        self._memory_peers[(db_peer.address, db_peer.udp_port)] = db_peer

    def inc_errors(self, peer):
        """Increment the error counter of the tracked peer matching *peer*'s endpoint."""
        db_peer = self.get_from_peer(peer)
        if not db_peer:
            # mirror set_latency: nothing tracked for this endpoint yet
            return
        # FIX: key by (address, udp_port) like every other _memory_peers access —
        # previously keyed by (address, node_id), writing to a phantom entry.
        self._memory_peers[(peer.address, peer.udp_port)] = replace(db_peer, errors=(db_peer.errors or 0) + 1)

    def associate_peers(self, peer, other_peers):
        """Record directed edges from *peer* to each of *other_peers*."""
        self._connections[self.get_from_peer(peer).peer_id] = [
            self.get_from_peer(other_peer).peer_id for other_peer in other_peers]

    async def request_peers(self, host, port, node_id, key=None) -> typing.Optional[FindResponse]:
        """Send a find_node (key == node_id) or find_value request with 3 retries.

        Latency/error bookkeeping only applies to find_node probes. Returns None
        when all attempts time out or error.
        """
        key = key or node_id
        peer = make_kademlia_peer(key, await resolve_host(host, port, 'udp'), port)
        for attempt in range(3):
            try:
                req_start = time.perf_counter_ns()
                if key == node_id:
                    response = await self.node.protocol.get_rpc_peer(peer).find_node(key)
                    response = FindNodeResponse(key, response)
                    latency = time.perf_counter_ns() - req_start
                    self.set_latency(peer, latency)
                else:
                    response = await self.node.protocol.get_rpc_peer(peer).find_value(key)
                    response = FindValueResponse(key, response)
                await asyncio.sleep(0.05)
                return response
            except asyncio.TimeoutError:
                if key == node_id:
                    self.set_latency(peer, None)
                continue
            except lbry.dht.error.TransportNotConnected:
                log.info("Transport unavailable, waiting 1s to retry")
                await asyncio.sleep(1)
            except lbry.dht.error.RemoteException as e:
                log.info('Peer errored: %s:%d attempt #%d - %s',
                         host, port, (attempt + 1), str(e))
                if key == node_id:
                    self.inc_errors(peer)
                    self.set_latency(peer, None)
                continue

    async def crawl_routing_table(self, host, port, node_id=None):
        """Exhaustively walk one peer's routing table via repeated find_node requests,
        jumping the target key progressively closer when responses stop making progress.
        Returns the set of kademlia peers discovered."""
        start = time.time()
        log.debug("querying %s:%d", host, port)
        address = await resolve_host(host, port, 'udp')
        key = node_id or self.node.protocol.peer_manager.get_node_id_for_endpoint(address, port)
        peer = make_kademlia_peer(key, address, port)
        self.add_peers(peer)
        if not key:
            # unknown node id: ping first so the peer manager can learn it
            latency = None
            for _ in range(3):
                try:
                    ping_start = time.perf_counter_ns()
                    await self.node.protocol.get_rpc_peer(peer).ping()
                    await asyncio.sleep(0.05)
                    key = key or self.node.protocol.peer_manager.get_node_id_for_endpoint(address, port)
                    peer = make_kademlia_peer(key, address, port)
                    latency = time.perf_counter_ns() - ping_start
                    break
                except asyncio.TimeoutError:
                    pass
                except lbry.dht.error.RemoteException:
                    self.inc_errors(peer)
            self.set_latency(peer, latency if peer.node_id else None)
            if not latency or not peer.node_id:
                if latency and not peer.node_id:
                    log.warning("No node id from %s:%d", host, port)
                return set()
        distance = Distance(key)
        max_distance = int.from_bytes(bytes([0xff] * 48), 'big')
        peers = set()
        factor = 2048
        for i in range(1000):
            response = await self.request_peers(address, port, key)
            new_peers = list(response.get_close_kademlia_peers(peer)) if response else None
            if not new_peers:
                break
            new_peers.sort(key=lambda candidate: distance(candidate.node_id))
            peers.update(new_peers)
            far_key = new_peers[-1].node_id
            if distance(far_key) <= distance(key):
                # no progress: jump the target closer to the crawled node's id
                current_distance = distance(key)
                next_jump = current_distance + int(max_distance // factor)  # jump closer
                factor /= 2
                if factor > 8 and next_jump < max_distance:
                    key = int.from_bytes(peer.node_id, 'big') ^ next_jump
                    if key.bit_length() > 384:
                        break
                    key = key.to_bytes(48, 'big')
                else:
                    break
            else:
                key = far_key
                factor = 2048
        if peers:
            log.info("Done querying %s:%d in %.2f seconds: %d peers found over %d requests.",
                     host, port, (time.time() - start), len(peers), i)
            self.connections_found_metric.observe(len(peers))
            known_peers = 0
            reachable_connections = 0
            # FIX: use a distinct loop variable — the original reused `peer`, so the
            # associate_peers call below attributed edges to an arbitrary found peer
            # instead of the crawled host.
            for found_peer in peers:
                known_peer = self.get_from_peer(found_peer)
                known_peers += 1 if known_peer else 0
                reachable_connections += 1 if known_peer and (known_peer.latency or 0) > 0 else 0
            self.known_connections_found_metric.observe(known_peers)
            self.reachable_connections_found_metric.observe(reachable_connections)
        self.add_peers(*peers)
        self.associate_peers(peer, peers)
        return peers

    async def process(self):
        """Main crawl loop: keep up to 200 concurrent routing-table crawls going,
        update gauges, flush to DB every minute, and rotate the listening port when idle."""
        to_process = {}

        def submit(_peer):
            f = asyncio.ensure_future(
                self.crawl_routing_table(_peer.address, _peer.udp_port, bytes.fromhex(_peer.node_id)))
            to_process[_peer.peer_id] = f
            f.add_done_callback(lambda _: to_process.pop(_peer.peer_id))

        to_check = self.get_peers_needing_check()
        last_flush = datetime.datetime.utcnow()
        while True:
            for peer in to_check[:200]:
                if peer.peer_id not in to_process:
                    submit(peer)
                    await asyncio.sleep(.05)
            await asyncio.sleep(0)
            self.unique_total_hosts_metric.set(self.checked_peers_count)
            self.reachable_hosts_metric.set(self.checked_peers_count - self.unreachable_peers_count)
            self.total_historic_hosts_metric.set(len(self._memory_peers))
            self.pending_check_hosts_metric.set(len(to_check))
            self.hosts_with_errors_metric.set(self.peers_with_errors_count)
            log.info("%d known, %d contacted recently, %d unreachable, %d error, %d processing, %d on queue",
                     self.active_peers_count, self.checked_peers_count, self.unreachable_peers_count,
                     self.peers_with_errors_count, len(to_process), len(to_check))
            if to_process:
                await asyncio.wait(to_process.values(), return_when=asyncio.FIRST_COMPLETED)
            to_check = self.get_peers_needing_check()
            if (datetime.datetime.utcnow() - last_flush).seconds > 60:
                log.info("flushing to db")
                await self.flush_to_db()
                last_flush = datetime.datetime.utcnow()
            while not to_check and not to_process:
                # idle: restart listening on a fresh port so NATs don't pin us
                port = self.node.listening_port.get_extra_info('socket').getsockname()[1]
                self.node.stop()
                await self.node.start_listening()
                log.info("Idle, sleeping a minute. Port changed to %d", port)
                await asyncio.sleep(60.0)
                to_check = self.get_peers_needing_check()
|
||||||
|
|
||||||
|
|
||||||
|
class SimpleMetrics:
    """Minimal aiohttp server exposing the Prometheus registry on GET /metrics."""

    def __init__(self, port):
        # port the metrics HTTP server will bind to
        self.prometheus_port = port

    async def handle_metrics_get_request(self, _):
        """Render the current Prometheus registry as plain text."""
        try:
            return web.Response(
                text=prom_generate_latest().decode(),
                content_type='text/plain; version=0.0.4'
            )
        except Exception:
            log.exception('could not generate prometheus data')
            raise

    async def start(self):
        """Bind and start serving /metrics on 0.0.0.0:<prometheus_port>."""
        application = web.Application()
        application.router.add_get('/metrics', self.handle_metrics_get_request)
        runner = web.AppRunner(application)
        await runner.setup()
        site = web.TCPSite(runner, "0.0.0.0", self.prometheus_port)
        await site.start()
|
||||||
|
|
||||||
|
|
||||||
|
def dict_row_factory(cursor, row):
    """sqlite3 row factory: map column names to values, parsing known datetime
    columns from ISO format (falsy values become None)."""
    result = {}
    for index, column in enumerate(cursor.description):
        name, value = column[0], row[index]
        if name in ('added_on', 'first_online', 'last_seen', 'last_check'):
            result[name] = datetime.datetime.fromisoformat(value) if value else None
        else:
            result[name] = value
    return result
|
||||||
|
|
||||||
|
|
||||||
|
async def test():
    """Entry point: start the metrics server, open the crawler DB, bootstrap
    from the configured DHT nodes if needed, then run the crawl/probe loops."""
    db_path = "/tmp/peers.db" if len(sys.argv) == 1 else sys.argv[-1]
    asyncio.get_event_loop().set_debug(True)
    metrics = SimpleMetrics('8080')
    await metrics.start()
    conf = Config()
    hosting_samples = SDHashSamples("test.sample") if os.path.isfile("test.sample") else None
    crawler = Crawler(db_path, hosting_samples)
    await crawler.open()
    await crawler.flush_to_db()
    await crawler.node.start_listening()
    if crawler.active_peers_count < 100:
        # too few stored peers: seed the crawl from the well-known bootstrap nodes
        bootstrap_probes = [
            asyncio.create_task(crawler.crawl_routing_table(host, port))
            for host, port in conf.known_dht_nodes
        ]
        await asyncio.gather(*bootstrap_probes)
        await crawler.flush_to_db()
    await asyncio.gather(crawler.process(), crawler.probe_files())


if __name__ == '__main__':
    asyncio.run(test())
|
44
scripts/sd_hash_sampler.py
Normal file
44
scripts/sd_hash_sampler.py
Normal file
|
@ -0,0 +1,44 @@
|
||||||
|
import asyncio
|
||||||
|
from typing import Iterable
|
||||||
|
|
||||||
|
from lbry.extras.daemon.client import daemon_rpc
|
||||||
|
from lbry.conf import Config
|
||||||
|
conf = Config()
|
||||||
|
|
||||||
|
|
||||||
|
async def sample_prefix(prefix: bytes):
    """Page through claim_search results for sd_hashes starting with *prefix*
    and return the set of sd_hashes found."""
    result = await daemon_rpc(conf, "claim_search", sd_hash=prefix.hex(), page_size=50)
    total_pages = result['total_pages']
    print(total_pages)
    collected = set()
    for page in range(1, total_pages + 1):
        if page > 1:
            # first page was already fetched above to learn total_pages
            result = await daemon_rpc(conf, "claim_search", sd_hash=prefix.hex(), page=page, page_size=50)
        for item in result['items']:
            sd_hash = item.get('value', {}).get('source', {}).get('sd_hash')
            if not sd_hash:
                print('err', item)
                continue
            collected.add(sd_hash)
        print('page', page, len(collected))
    return collected
|
||||||
|
|
||||||
|
|
||||||
|
def save_sample(name: str, samples: Iterable[str]):
    """Decode each hex sample and append its raw bytes to file *name*;
    prints the resulting file size in bytes."""
    with open(name, 'wb') as outfile:
        outfile.writelines(bytes.fromhex(sample) for sample in samples)
        outfile.flush()
        print(outfile.tell())
|
||||||
|
|
||||||
|
|
||||||
|
async def main():
    """Sample sd_hashes for every possible one-byte prefix concurrently and
    save the union to test.sample."""
    collected = set()
    futs = [asyncio.ensure_future(sample_prefix(bytes([prefix]))) for prefix in range(256)]
    for done_count, finished in enumerate(asyncio.as_completed(futs)):
        collected.update(await finished)
        print(done_count, len(collected))
    print(save_sample("test.sample", collected))


if __name__ == "__main__":
    asyncio.run(main())
|
Loading…
Reference in a new issue