forked from LBRYCommunity/lbry-sdk
add sd_hash prober
parent 13af7800c2
commit 0d6125de0b
4 changed files with 115 additions and 28 deletions
@@ -153,9 +153,10 @@ class PeerManager:
     def peer_is_good(self, peer: 'KademliaPeer'):
         return self.contact_triple_is_good(peer.node_id, peer.address, peer.udp_port)
 
-    def decode_tcp_peer_from_compact_address(self, compact_address: bytes) -> 'KademliaPeer':  # pylint: disable=no-self-use
-        node_id, address, tcp_port = decode_compact_address(compact_address)
-        return make_kademlia_peer(node_id, address, udp_port=None, tcp_port=tcp_port)
+
+def decode_tcp_peer_from_compact_address(compact_address: bytes) -> 'KademliaPeer':  # pylint: disable=no-self-use
+    node_id, address, tcp_port = decode_compact_address(compact_address)
+    return make_kademlia_peer(node_id, address, udp_port=None, tcp_port=tcp_port)
 
 
 @dataclass(unsafe_hash=True)
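
Note: moving the helper to module level means callers that hold no PeerManager (such as the crawler script changed further down) can decode the compact addresses carried by a find_value response directly. A minimal usage sketch, not part of the diff, with a hypothetical wrapper name:

from lbry.dht.peer import decode_tcp_peer_from_compact_address

def blob_peers_from(found_compact_addresses):
    # found_compact_addresses is the list carried by a find_value response
    peers = []
    for compact_addr in found_compact_addresses:
        try:
            peers.append(decode_tcp_peer_from_compact_address(compact_addr))
        except ValueError:  # malformed address or reserved IP
            continue
    return peers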
@@ -8,7 +8,7 @@ from typing import TYPE_CHECKING
 from lbry.dht import constants
 from lbry.dht.error import RemoteException, TransportNotConnected
 from lbry.dht.protocol.distance import Distance
-from lbry.dht.peer import make_kademlia_peer
+from lbry.dht.peer import make_kademlia_peer, decode_tcp_peer_from_compact_address
 from lbry.dht.serialization.datagram import PAGE_KEY
 
 if TYPE_CHECKING:
@@ -26,6 +26,15 @@ class FindResponse:
     def get_close_triples(self) -> typing.List[typing.Tuple[bytes, str, int]]:
         raise NotImplementedError()
 
+    def get_close_kademlia_peers(self, peer_info) -> typing.Generator[typing.Iterator['KademliaPeer'], None, None]:
+        for contact_triple in self.get_close_triples():
+            node_id, address, udp_port = contact_triple
+            try:
+                yield make_kademlia_peer(node_id, address, udp_port)
+            except ValueError:
+                log.warning("misbehaving peer %s:%i returned peer with reserved ip %s:%i", peer_info.address,
+                            peer_info.udp_port, address, udp_port)
+
 
 class FindNodeResponse(FindResponse):
     def __init__(self, key: bytes, close_triples: typing.List[typing.Tuple[bytes, str, int]]):
@@ -125,13 +134,8 @@ class IterativeFinder(AsyncIterator):
 
     async def _handle_probe_result(self, peer: 'KademliaPeer', response: FindResponse):
         self._add_active(peer)
-        for contact_triple in response.get_close_triples():
-            node_id, address, udp_port = contact_triple
-            try:
-                self._add_active(make_kademlia_peer(node_id, address, udp_port))
-            except ValueError:
-                log.warning("misbehaving peer %s:%i returned peer with reserved ip %s:%i", peer.address,
-                            peer.udp_port, address, udp_port)
+        for new_peer in response.get_close_kademlia_peers(peer):
+            self._add_active(new_peer)
         self.check_result_ready(response)
         self._log_state(reason="check result")
 
@@ -319,7 +323,7 @@ class IterativeValueFinder(IterativeFinder):
         decoded_peers = set()
         for compact_addr in parsed.found_compact_addresses:
             try:
-                decoded_peers.add(self.peer_manager.decode_tcp_peer_from_compact_address(compact_addr))
+                decoded_peers.add(decode_tcp_peer_from_compact_address(compact_addr))
             except ValueError:
                 log.warning("misbehaving peer %s:%i returned invalid peer for blob",
                             peer.address, peer.udp_port)
@@ -341,7 +345,7 @@ class IterativeValueFinder(IterativeFinder):
 
     def check_result_ready(self, response: FindValueResponse):
         if response.found:
-            blob_peers = [self.peer_manager.decode_tcp_peer_from_compact_address(compact_addr)
+            blob_peers = [decode_tcp_peer_from_compact_address(compact_addr)
                           for compact_addr in response.found_compact_addresses]
             to_yield = []
             for blob_peer in blob_peers:
@@ -1,5 +1,5 @@
-from lbry.conf import Config
 from lbry.extras.cli import execute_command
+from lbry.conf import Config
 
 
 def daemon_rpc(conf: Config, method: str, **kwargs):
@@ -1,18 +1,21 @@
 import datetime
 import logging
 import asyncio
+import os.path
+import random
 import time
 import typing
 from dataclasses import dataclass, astuple, replace
 
 from aiohttp import web
-from prometheus_client import Gauge, generate_latest as prom_generate_latest
+from prometheus_client import Gauge, generate_latest as prom_generate_latest, Counter
 
 import lbry.dht.error
 from lbry.dht.constants import generate_id
 from lbry.dht.node import Node
-from lbry.dht.peer import make_kademlia_peer, PeerManager
+from lbry.dht.peer import make_kademlia_peer, PeerManager, decode_tcp_peer_from_compact_address
 from lbry.dht.protocol.distance import Distance
+from lbry.dht.protocol.iterative_find import FindValueResponse, FindNodeResponse, FindResponse
 from lbry.extras.daemon.storage import SQLiteMixin
 from lbry.conf import Config
 from lbry.utils import resolve_host
@@ -22,6 +25,19 @@ logging.basicConfig(level=logging.INFO, format="%(asctime)s %(levelname)-4s %(na
 log = logging.getLogger(__name__)
 
 
+class SDHashSamples:
+    def __init__(self, samples_file_path):
+        with open(samples_file_path, "rb") as sample_file:
+            self._samples = sample_file.read()
+        assert len(self._samples) % 48 == 0
+        self.size = len(self._samples) // 48
+
+    def read_samples(self, count=1):
+        for _ in range(count):
+            offset = 48 * random.randrange(0, self.size)
+            yield self._samples[offset:offset + 48]
+
+
 class PeerStorage(SQLiteMixin):
     CREATE_TABLES_QUERY = """
         PRAGMA JOURNAL_MODE=WAL;
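
Note: SDHashSamples expects a flat file of concatenated 48-byte sd_hashes and samples records from it at random. A minimal sketch of producing a compatible file, not part of the diff; the writer function and the hex-string input are assumptions:

def write_sample_file(hex_sd_hashes, path="test.sample"):
    # pack hex sd_hashes into the fixed 48-byte records SDHashSamples expects
    with open(path, "wb") as sample_file:
        for hex_hash in hex_sd_hashes:
            raw = bytes.fromhex(hex_hash)
            assert len(raw) == 48  # an sd_hash is 48 bytes (96 hex characters)
            sample_file.write(raw)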
@@ -139,11 +155,25 @@ class Crawler:
         "host_latency", "Time spent on the last request, in nanoseconds.", namespace="dht_crawler_node",
         labelnames=("host", "port")
     )
+    probed_streams_metric = Counter(
+        "probed_streams", "Amount of streams probed.", namespace="dht_crawler_node",
+        labelnames=("sd_hash",)
+    )
+    announced_streams_metric = Counter(
+        "announced_streams", "Amount of streams where announcements were found.", namespace="dht_crawler_node",
+        labelnames=("sd_hash",)
+    )
+    working_streams_metric = Counter(
+        "working_streams", "Amount of streams with reachable hosts.", namespace="dht_crawler_node",
+        labelnames=("sd_hash",)
+    )
 
-    def __init__(self, db_path: str):
+    def __init__(self, db_path: str, sd_hash_samples: SDHashSamples):
         self.node = new_node()
         self.db = PeerStorage(db_path)
+        self.sd_hashes = sd_hash_samples
         self._memory_peers = {}
+        self._reachable_by_node_id = {}
         self._connections = {}
 
     async def open(self):
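
Note: each probed stream gets its own labelled time series through the sd_hash label. A standalone sketch, with illustrative names that are not from the diff, of how a prometheus_client Counter with a namespace and a label surfaces when scraped:

from prometheus_client import Counter, generate_latest

probed = Counter(
    "probed_streams", "Amount of streams probed.",
    namespace="dht_crawler_node", labelnames=("sd_hash",)
)
probed.labels("ab" * 48).inc()  # labels() takes the sd_hash positionally, as in the diff
print(generate_latest().decode())  # contains: dht_crawler_node_probed_streams_total{sd_hash="abab..."} 1.0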
@@ -151,6 +181,46 @@ class Crawler:
         self._memory_peers = {
             (peer.address, peer.udp_port): peer for peer in await self.db.all_peers()
         }
+        self.refresh_reachable_set()
+
+    def refresh_reachable_set(self):
+        self._reachable_by_node_id = {
+            bytes.fromhex(peer.node_id): peer for peer in self._memory_peers.values() if (peer.latency or 0) > 0
+        }
+
+    async def probe_files(self):
+        if not self.sd_hashes:
+            return
+        while True:
+            for sd_hash in self.sd_hashes.read_samples(10_000):
+                self.refresh_reachable_set()
+                print(sd_hash.hex())
+                distance = Distance(sd_hash)
+                node_ids = list(self._reachable_by_node_id.keys())
+                node_ids.sort(key=lambda node_id: distance(node_id))
+                k_closest = [self._reachable_by_node_id[node_id] for node_id in node_ids[:8]]
+                for response in asyncio.as_completed(
+                        [self.request_peers(peer.address, peer.udp_port, peer.node_id, sd_hash) for peer in k_closest]):
+                    response = await response
+                    self.probed_streams_metric.labels(sd_hash).inc()
+                    if response and response.found:
+                        self.announced_streams_metric.labels(sd_hash).inc()
+                        blob_peers = [decode_tcp_peer_from_compact_address(compact_addr)
+                                      for compact_addr in response.found_compact_addresses]
+                        print('FOUND', blob_peers, response.pages)
+                        for blob_peer in blob_peers:
+                            response = await self.request_peers(blob_peer.address, blob_peer.tcp_port, blob_peer.node_id, sd_hash)
+                            if response:
+                                self.working_streams_metric.labels(sd_hash).inc()
+                                print('ALIVE', blob_peer.address)
+                                if response.found:
+                                    blob_peers = [decode_tcp_peer_from_compact_address(compact_addr)
+                                                  for compact_addr in response.found_compact_addresses]
+                                    print('REPLIED+FOUND', blob_peers, response.pages)
+                            else:
+                                print('DEAD', blob_peer.address, blob_peer.tcp_port)
+                    else:
+                        print('NOT FOUND', response)
 
     @property
     def refresh_limit(self):
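
Note: the probe loop added above amounts to: refresh the reachable-host set, pick the eight hosts whose node ids are XOR-closest to the sd_hash, send them find_value, then contact every blob peer they return. A condensed sketch of the selection step, not part of the diff, assuming Distance(sd_hash)(node_id) returns the XOR distance as in lbry.dht.protocol.distance:

from lbry.dht.protocol.distance import Distance

def k_closest_reachable(reachable_by_node_id, sd_hash, k=8):
    # sort reachable node ids by XOR distance to the target sd_hash and keep the k nearest
    distance = Distance(sd_hash)
    node_ids = sorted(reachable_by_node_id, key=distance)
    return [reachable_by_node_id[node_id] for node_id in node_ids[:k]]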
@@ -206,7 +276,10 @@ class Crawler:
     def set_latency(self, peer, latency=None):
         if latency:
             self.host_latency_metric.labels(host=peer.address, port=peer.udp_port).set(latency)
-        db_peer = replace(self.get_from_peer(peer), latency=latency)
+        db_peer = self.get_from_peer(peer)
+        if not db_peer:
+            return
+        db_peer = replace(db_peer, latency=latency)
         if not db_peer.node_id and peer.node_id:
             db_peer = replace(db_peer, node_id=peer.node_id.hex())
         if db_peer.first_online and latency is None:
@@ -224,16 +297,22 @@ class Crawler:
         self._connections[self.get_from_peer(peer).peer_id] = [
             self.get_from_peer(other_peer).peer_id for other_peer in other_peers]
 
-    async def request_peers(self, host, port, key) -> typing.List['KademliaPeer']:
+    async def request_peers(self, host, port, node_id, key=None) -> typing.Optional[FindResponse]:
+        key = key or node_id
         peer = make_kademlia_peer(key, await resolve_host(host, port, 'udp'), port)
         for attempt in range(3):
             try:
                 req_start = time.perf_counter_ns()
-                response = await self.node.protocol.get_rpc_peer(peer).find_node(key)
+                if key == node_id:
+                    response = await self.node.protocol.get_rpc_peer(peer).find_node(key)
+                    response = FindNodeResponse(key, response)
+                else:
+                    response = await self.node.protocol.get_rpc_peer(peer).find_value(key)
+                    response = FindValueResponse(key, response)
                 await asyncio.sleep(0.05)
                 latency = time.perf_counter_ns() - req_start
                 self.set_latency(peer, latency)
-                return [make_kademlia_peer(*peer_tuple) for peer_tuple in response]
+                return response
             except asyncio.TimeoutError:
                 self.set_latency(peer, None)
                 continue
|
@ -243,11 +322,10 @@ class Crawler:
|
||||||
self.inc_errors(peer)
|
self.inc_errors(peer)
|
||||||
self.set_latency(peer, None)
|
self.set_latency(peer, None)
|
||||||
continue
|
continue
|
||||||
return []
|
|
||||||
|
|
||||||
async def crawl_routing_table(self, host, port, node_id=None):
|
async def crawl_routing_table(self, host, port, node_id=None):
|
||||||
start = time.time()
|
start = time.time()
|
||||||
log.info("querying %s:%d", host, port)
|
log.debug("querying %s:%d", host, port)
|
||||||
address = await resolve_host(host, port, 'udp')
|
address = await resolve_host(host, port, 'udp')
|
||||||
key = node_id or self.node.protocol.peer_manager.get_node_id_for_endpoint(address, port)
|
key = node_id or self.node.protocol.peer_manager.get_node_id_for_endpoint(address, port)
|
||||||
peer = make_kademlia_peer(key, address, port)
|
peer = make_kademlia_peer(key, address, port)
|
||||||
|
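
Note: with the trailing `return []` gone, request_peers now returns a FindNodeResponse or FindValueResponse on success and falls through to None once its three attempts time out or error, which is why callers guard with `if response`. A small illustrative sketch mirroring the diff's probe-mode call; the helper name is hypothetical:

async def announcements_for(crawler, peer, sd_hash):
    # passing the sd_hash as key makes request_peers send find_value instead of find_node
    response = await crawler.request_peers(peer.address, peer.udp_port, peer.node_id, sd_hash)
    # response is None when all three attempts timed out or errored
    if response and response.found:
        return response.found_compact_addresses
    return []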
@@ -278,7 +356,8 @@ class Crawler:
         peers = set()
         factor = 2048
         for i in range(200):
-            new_peers = await self.request_peers(address, port, key)
+            response = await self.request_peers(address, port, key)
+            new_peers = list(response.get_close_kademlia_peers(peer)) if response else None
             if not new_peers:
                 break
             new_peers.sort(key=lambda peer: distance(peer.node_id))
@@ -298,8 +377,9 @@ class Crawler:
             else:
                 key = far_key
                 factor = 2048
-        log.info("Done querying %s:%d in %.2f seconds: %d peers found over %d requests.",
-                 host, port, (time.time() - start), len(peers), i)
+        if peers:
+            log.info("Done querying %s:%d in %.2f seconds: %d peers found over %d requests.",
+                     host, port, (time.time() - start), len(peers), i)
         self.add_peers(*peers)
         if peers:
             self.connections_found_metric.labels(host=host, port=port).set(len(peers))
@@ -384,14 +464,16 @@ async def test():
     asyncio.get_event_loop().set_debug(True)
     metrics = SimpleMetrics('8080')
     await metrics.start()
-    crawler = Crawler("/tmp/a.db")
+    conf = Config()
+    hosting_samples = SDHashSamples("test.sample") if os.path.isfile("test.sample") else None
+    crawler = Crawler("/tmp/a.db", hosting_samples)
     await crawler.open()
     await crawler.flush_to_db()
     await crawler.node.start_listening()
-    conf = Config()
     if crawler.active_peers_count < 100:
         for (host, port) in conf.known_dht_nodes:
             await crawler.crawl_routing_table(host, port)
+    probe_task = asyncio.ensure_future(crawler.probe_files())
     await crawler.process()
 
 if __name__ == '__main__':