forked from LBRYCommunity/lbry-sdk
dht_crawler: skip ping if known node_id
This commit is contained in:
parent
508bdb8e94
commit
0497698c5b
1 changed files with 31 additions and 24 deletions
|
@ -136,7 +136,7 @@ class Crawler:
|
||||||
def set_latency(self, peer, latency=None):
|
def set_latency(self, peer, latency=None):
|
||||||
db_peer = self.get_from_peer(peer)
|
db_peer = self.get_from_peer(peer)
|
||||||
db_peer.latency = latency
|
db_peer.latency = latency
|
||||||
if not db_peer.node_id:
|
if not db_peer.node_id and peer.node_id:
|
||||||
db_peer.node_id = peer.node_id.hex()
|
db_peer.node_id = peer.node_id.hex()
|
||||||
if db_peer.first_online and latency is None:
|
if db_peer.first_online and latency is None:
|
||||||
db_peer.last_churn = (datetime.datetime.utcnow() - db_peer.first_online).seconds
|
db_peer.last_churn = (datetime.datetime.utcnow() - db_peer.first_online).seconds
|
||||||
|
@ -156,42 +156,48 @@ class Crawler:
|
||||||
peer = make_kademlia_peer(None, await resolve_host(host, port, 'udp'), port)
|
peer = make_kademlia_peer(None, await resolve_host(host, port, 'udp'), port)
|
||||||
for attempt in range(3):
|
for attempt in range(3):
|
||||||
try:
|
try:
|
||||||
|
req_start = time.perf_counter_ns()
|
||||||
response = await self.node.protocol.get_rpc_peer(peer).find_node(key)
|
response = await self.node.protocol.get_rpc_peer(peer).find_node(key)
|
||||||
|
latency = time.perf_counter_ns() - req_start
|
||||||
|
self.set_latency(make_kademlia_peer(key, host, port), latency)
|
||||||
return [make_kademlia_peer(*peer_tuple) for peer_tuple in response]
|
return [make_kademlia_peer(*peer_tuple) for peer_tuple in response]
|
||||||
except asyncio.TimeoutError:
|
except asyncio.TimeoutError:
|
||||||
log.info('Previously responding peer timed out: %s:%d attempt #%d', host, port, (attempt + 1))
|
self.set_latency(make_kademlia_peer(key, host, port), None)
|
||||||
continue
|
continue
|
||||||
except lbry.dht.error.RemoteException as e:
|
except lbry.dht.error.RemoteException as e:
|
||||||
log.info('Previously responding peer errored: %s:%d attempt #%d - %s',
|
log.info('Peer errored: %s:%d attempt #%d - %s',
|
||||||
host, port, (attempt + 1), str(e))
|
host, port, (attempt + 1), str(e))
|
||||||
self.inc_errors(peer)
|
self.inc_errors(peer)
|
||||||
|
self.set_latency(make_kademlia_peer(key, host, port), None)
|
||||||
continue
|
continue
|
||||||
return []
|
return []
|
||||||
|
|
||||||
async def crawl_routing_table(self, host, port):
|
async def crawl_routing_table(self, host, port, node_id=None):
|
||||||
start = time.time()
|
start = time.time()
|
||||||
log.info("querying %s:%d", host, port)
|
log.info("querying %s:%d", host, port)
|
||||||
address = await resolve_host(host, port, 'udp')
|
address = await resolve_host(host, port, 'udp')
|
||||||
self.add_peers(make_kademlia_peer(None, address, port))
|
self.add_peers(make_kademlia_peer(None, address, port))
|
||||||
key = self.node.protocol.peer_manager.get_node_id_for_endpoint(address, port)
|
key = node_id or self.node.protocol.peer_manager.get_node_id_for_endpoint(address, port)
|
||||||
latency = None
|
if not key:
|
||||||
for _ in range(3):
|
latency = None
|
||||||
try:
|
for _ in range(3):
|
||||||
ping_start = time.perf_counter_ns()
|
try:
|
||||||
async with self.semaphore:
|
ping_start = time.perf_counter_ns()
|
||||||
await self.node.protocol.get_rpc_peer(make_kademlia_peer(None, address, port)).ping()
|
async with self.semaphore:
|
||||||
key = key or self.node.protocol.peer_manager.get_node_id_for_endpoint(address, port)
|
await self.node.protocol.get_rpc_peer(make_kademlia_peer(None, address, port)).ping()
|
||||||
latency = time.perf_counter_ns() - ping_start
|
key = key or self.node.protocol.peer_manager.get_node_id_for_endpoint(address, port)
|
||||||
except asyncio.TimeoutError:
|
latency = time.perf_counter_ns() - ping_start
|
||||||
pass
|
break
|
||||||
except lbry.dht.error.RemoteException:
|
except asyncio.TimeoutError:
|
||||||
self.inc_errors(make_kademlia_peer(None, address, port))
|
pass
|
||||||
pass
|
except lbry.dht.error.RemoteException:
|
||||||
self.set_latency(make_kademlia_peer(key, address, port), latency if key else None)
|
self.inc_errors(make_kademlia_peer(None, address, port))
|
||||||
if not latency or not key:
|
pass
|
||||||
if latency and not key:
|
self.set_latency(make_kademlia_peer(key, address, port), latency if key else None)
|
||||||
log.warning("No node id from %s:%d", host, port)
|
if not latency or not key:
|
||||||
return set()
|
if latency and not key:
|
||||||
|
log.warning("No node id from %s:%d", host, port)
|
||||||
|
return set()
|
||||||
node_id = key
|
node_id = key
|
||||||
distance = Distance(key)
|
distance = Distance(key)
|
||||||
max_distance = int.from_bytes(bytes([0xff] * 48), 'big')
|
max_distance = int.from_bytes(bytes([0xff] * 48), 'big')
|
||||||
|
@ -229,7 +235,8 @@ class Crawler:
|
||||||
to_process = {}
|
to_process = {}
|
||||||
|
|
||||||
def submit(_peer):
|
def submit(_peer):
|
||||||
f = asyncio.ensure_future(self.crawl_routing_table(_peer.address, peer.udp_port))
|
f = asyncio.ensure_future(
|
||||||
|
self.crawl_routing_table(_peer.address, peer.udp_port, bytes.fromhex(peer.node_id)))
|
||||||
to_process[_peer] = f
|
to_process[_peer] = f
|
||||||
f.add_done_callback(lambda _: to_process.pop(_peer))
|
f.add_done_callback(lambda _: to_process.pop(_peer))
|
||||||
|
|
||||||
|
|
Loading…
Reference in a new issue