Compare commits
33 commits
master
...
temp_dht_s
Author | SHA1 | Date | |
---|---|---|---|
|
f4fa217b71 | ||
|
f68ea01056 | ||
|
6fb1443e63 | ||
|
af7574dc9d | ||
|
af9cc457ec | ||
|
2a1e1d542f | ||
|
f5b3e9bacd | ||
|
d00b5befbf | ||
|
c1e64df528 | ||
|
3e79dcd179 | ||
|
cc104369cb | ||
|
586b09c1bc | ||
|
b574fb7771 | ||
|
51be734a08 | ||
|
0120d989d8 | ||
|
12f156257e | ||
|
dcde0e78e3 | ||
|
dc1c0e6851 | ||
|
c45f27d5cc | ||
|
6335590b65 | ||
|
b7b8831109 | ||
|
2ed23fbc4b | ||
|
023cfb593a | ||
|
2884dba52d | ||
|
44c4b03d44 | ||
|
6ba8f96511 | ||
|
4987f57944 | ||
|
f5bf8b8684 | ||
|
809a8c1226 | ||
|
e319b55db5 | ||
|
f274562c92 | ||
|
d1bc981b11 | ||
|
53d78e9194 |
15 changed files with 221 additions and 207 deletions
|
@ -622,7 +622,7 @@ class Config(CLIConfig):
|
|||
"Routing table bucket index below which we always split the bucket if given a new key to add to it and "
|
||||
"the bucket is full. As this value is raised the depth of the routing table (and number of peers in it) "
|
||||
"will increase. This setting is used by seed nodes, you probably don't want to change it during normal "
|
||||
"use.", 1
|
||||
"use.", 2
|
||||
)
|
||||
|
||||
# protocol timeouts
|
||||
|
|
|
@ -27,21 +27,24 @@ class BlobAnnouncer:
|
|||
self.storage = storage
|
||||
self.announce_task: asyncio.Task = None
|
||||
self.announce_queue: typing.List[str] = []
|
||||
self._done = asyncio.Event()
|
||||
self.announced = set()
|
||||
|
||||
async def _submit_announcement(self, blob_hash):
|
||||
try:
|
||||
|
||||
peers = len(await self.node.announce_blob(blob_hash))
|
||||
self.announcements_sent_metric.labels(peers=peers, error=False).inc()
|
||||
if peers > 4:
|
||||
return blob_hash
|
||||
else:
|
||||
log.debug("failed to announce %s, could only find %d peers, retrying soon.", blob_hash[:8], peers)
|
||||
except Exception as err:
|
||||
self.announcements_sent_metric.labels(peers=0, error=True).inc()
|
||||
if isinstance(err, asyncio.CancelledError): # TODO: remove when updated to 3.8
|
||||
raise err
|
||||
log.warning("error announcing %s: %s", blob_hash[:8], str(err))
|
||||
async def _run_consumer(self):
|
||||
while self.announce_queue:
|
||||
try:
|
||||
blob_hash = self.announce_queue.pop()
|
||||
peers = len(await self.node.announce_blob(blob_hash))
|
||||
self.announcements_sent_metric.labels(peers=peers, error=False).inc()
|
||||
if peers > 4:
|
||||
self.announced.add(blob_hash)
|
||||
else:
|
||||
log.debug("failed to announce %s, could only find %d peers, retrying soon.", blob_hash[:8], peers)
|
||||
except Exception as err:
|
||||
self.announcements_sent_metric.labels(peers=0, error=True).inc()
|
||||
if isinstance(err, asyncio.CancelledError): # TODO: remove when updated to 3.8
|
||||
raise err
|
||||
log.warning("error announcing %s: %s", blob_hash[:8], str(err))
|
||||
|
||||
async def _announce(self, batch_size: typing.Optional[int] = 10):
|
||||
while batch_size:
|
||||
|
@ -56,14 +59,14 @@ class BlobAnnouncer:
|
|||
log.debug("announcer task wake up, %d blobs to announce", len(self.announce_queue))
|
||||
while len(self.announce_queue) > 0:
|
||||
log.info("%i blobs to announce", len(self.announce_queue))
|
||||
announced = await asyncio.gather(*[
|
||||
self._submit_announcement(
|
||||
self.announce_queue.pop()) for _ in range(batch_size) if self.announce_queue
|
||||
], loop=self.loop)
|
||||
announced = list(filter(None, announced))
|
||||
await asyncio.gather(*[self._run_consumer() for _ in range(batch_size)], loop=self.loop)
|
||||
announced = list(filter(None, self.announced))
|
||||
if announced:
|
||||
await self.storage.update_last_announced_blobs(announced)
|
||||
log.info("announced %i blobs", len(announced))
|
||||
self.announced.clear()
|
||||
self._done.set()
|
||||
self._done.clear()
|
||||
|
||||
def start(self, batch_size: typing.Optional[int] = 10):
|
||||
assert not self.announce_task or self.announce_task.done(), "already running"
|
||||
|
@ -72,3 +75,6 @@ class BlobAnnouncer:
|
|||
def stop(self):
|
||||
if self.announce_task and not self.announce_task.done():
|
||||
self.announce_task.cancel()
|
||||
|
||||
def wait(self):
|
||||
return self._done.wait()
|
||||
|
|
|
@ -20,7 +20,6 @@ MAYBE_PING_DELAY = 300 # 5 minutes
|
|||
CHECK_REFRESH_INTERVAL = REFRESH_INTERVAL / 5
|
||||
RPC_ID_LENGTH = 20
|
||||
PROTOCOL_VERSION = 1
|
||||
BOTTOM_OUT_LIMIT = 3
|
||||
MSG_SIZE_LIMIT = 1400
|
||||
|
||||
|
||||
|
|
|
@ -202,25 +202,23 @@ class Node:
|
|||
self._join_task = self.loop.create_task(self.join_network(interface, known_node_urls))
|
||||
|
||||
def get_iterative_node_finder(self, key: bytes, shortlist: typing.Optional[typing.List['KademliaPeer']] = None,
|
||||
bottom_out_limit: int = constants.BOTTOM_OUT_LIMIT,
|
||||
max_results: int = constants.K) -> IterativeNodeFinder:
|
||||
|
||||
return IterativeNodeFinder(self.loop, self.protocol.peer_manager, self.protocol.routing_table, self.protocol,
|
||||
key, bottom_out_limit, max_results, None, shortlist)
|
||||
key, max_results, None, shortlist)
|
||||
|
||||
def get_iterative_value_finder(self, key: bytes, shortlist: typing.Optional[typing.List['KademliaPeer']] = None,
|
||||
bottom_out_limit: int = 40,
|
||||
max_results: int = -1) -> IterativeValueFinder:
|
||||
|
||||
return IterativeValueFinder(self.loop, self.protocol.peer_manager, self.protocol.routing_table, self.protocol,
|
||||
key, bottom_out_limit, max_results, None, shortlist)
|
||||
key, max_results, None, shortlist)
|
||||
|
||||
async def peer_search(self, node_id: bytes, count=constants.K, max_results=constants.K * 2,
|
||||
bottom_out_limit=20, shortlist: typing.Optional[typing.List['KademliaPeer']] = None
|
||||
shortlist: typing.Optional[typing.List['KademliaPeer']] = None
|
||||
) -> typing.List['KademliaPeer']:
|
||||
peers = []
|
||||
async for iteration_peers in self.get_iterative_node_finder(
|
||||
node_id, shortlist=shortlist, bottom_out_limit=bottom_out_limit, max_results=max_results):
|
||||
node_id, shortlist=shortlist, max_results=max_results):
|
||||
peers.extend(iteration_peers)
|
||||
distance = Distance(node_id)
|
||||
peers.sort(key=lambda peer: distance(peer.node_id))
|
||||
|
|
|
@ -190,3 +190,6 @@ class KademliaPeer:
|
|||
|
||||
def compact_ip(self):
|
||||
return make_compact_ip(self.address)
|
||||
|
||||
def __str__(self):
|
||||
return f"{self.__class__.__name__}({self.node_id.hex()[:8]}@{self.address}:{self.udp_port}-{self.tcp_port})"
|
||||
|
|
|
@ -1,6 +1,6 @@
|
|||
import asyncio
|
||||
from itertools import chain
|
||||
from collections import defaultdict
|
||||
from collections import defaultdict, OrderedDict
|
||||
import typing
|
||||
import logging
|
||||
from typing import TYPE_CHECKING
|
||||
|
@ -74,7 +74,7 @@ def get_shortlist(routing_table: 'TreeRoutingTable', key: bytes,
|
|||
class IterativeFinder:
|
||||
def __init__(self, loop: asyncio.AbstractEventLoop, peer_manager: 'PeerManager',
|
||||
routing_table: 'TreeRoutingTable', protocol: 'KademliaProtocol', key: bytes,
|
||||
bottom_out_limit: typing.Optional[int] = 2, max_results: typing.Optional[int] = constants.K,
|
||||
max_results: typing.Optional[int] = constants.K,
|
||||
exclude: typing.Optional[typing.List[typing.Tuple[str, int]]] = None,
|
||||
shortlist: typing.Optional[typing.List['KademliaPeer']] = None):
|
||||
if len(key) != constants.HASH_LENGTH:
|
||||
|
@ -85,28 +85,22 @@ class IterativeFinder:
|
|||
self.protocol = protocol
|
||||
|
||||
self.key = key
|
||||
self.bottom_out_limit = bottom_out_limit
|
||||
self.max_results = max_results
|
||||
self.max_results = max(constants.K, max_results)
|
||||
self.exclude = exclude or []
|
||||
|
||||
self.active: typing.Set['KademliaPeer'] = set()
|
||||
self.active: typing.Dict['KademliaPeer', int] = OrderedDict() # peer: distance, sorted
|
||||
self.contacted: typing.Set['KademliaPeer'] = set()
|
||||
self.distance = Distance(key)
|
||||
|
||||
self.closest_peer: typing.Optional['KademliaPeer'] = None
|
||||
self.prev_closest_peer: typing.Optional['KademliaPeer'] = None
|
||||
|
||||
self.iteration_queue = asyncio.Queue(loop=self.loop)
|
||||
|
||||
self.running_probes: typing.Set[asyncio.Task] = set()
|
||||
self.running_probes: typing.Dict['KademliaPeer', asyncio.Task] = {}
|
||||
self.iteration_count = 0
|
||||
self.bottom_out_count = 0
|
||||
self.running = False
|
||||
self.tasks: typing.List[asyncio.Task] = []
|
||||
self.delayed_calls: typing.List[asyncio.Handle] = []
|
||||
for peer in get_shortlist(routing_table, key, shortlist):
|
||||
if peer.node_id:
|
||||
self._add_active(peer)
|
||||
self._add_active(peer, force=True)
|
||||
else:
|
||||
# seed nodes
|
||||
self._schedule_probe(peer)
|
||||
|
@ -138,15 +132,14 @@ class IterativeFinder:
|
|||
"""
|
||||
return []
|
||||
|
||||
def _is_closer(self, peer: 'KademliaPeer') -> bool:
|
||||
return not self.closest_peer or self.distance.is_closer(peer.node_id, self.closest_peer.node_id)
|
||||
|
||||
def _add_active(self, peer):
|
||||
def _add_active(self, peer, force=False):
|
||||
if not force and self.peer_manager.peer_is_good(peer) is False:
|
||||
return
|
||||
if peer in self.contacted:
|
||||
return
|
||||
if peer not in self.active and peer.node_id and peer.node_id != self.protocol.node_id:
|
||||
self.active.add(peer)
|
||||
if self._is_closer(peer):
|
||||
self.prev_closest_peer = self.closest_peer
|
||||
self.closest_peer = peer
|
||||
self.active[peer] = self.distance(peer.node_id)
|
||||
self.active = OrderedDict(sorted(self.active.items(), key=lambda item: item[1]))
|
||||
|
||||
async def _handle_probe_result(self, peer: 'KademliaPeer', response: FindResponse):
|
||||
self._add_active(peer)
|
||||
|
@ -158,33 +151,43 @@ class IterativeFinder:
|
|||
log.warning("misbehaving peer %s:%i returned peer with reserved ip %s:%i", peer.address,
|
||||
peer.udp_port, address, udp_port)
|
||||
self.check_result_ready(response)
|
||||
self._log_state()
|
||||
|
||||
def _reset_closest(self, peer):
|
||||
if peer in self.active:
|
||||
del self.active[peer]
|
||||
|
||||
async def _send_probe(self, peer: 'KademliaPeer'):
|
||||
try:
|
||||
response = await self.send_probe(peer)
|
||||
except asyncio.TimeoutError:
|
||||
self.active.discard(peer)
|
||||
self._reset_closest(peer)
|
||||
return
|
||||
except ValueError as err:
|
||||
log.warning(str(err))
|
||||
self.active.discard(peer)
|
||||
self._reset_closest(peer)
|
||||
return
|
||||
except TransportNotConnected:
|
||||
return self.aclose()
|
||||
except RemoteException:
|
||||
self._reset_closest(peer)
|
||||
return
|
||||
return await self._handle_probe_result(peer, response)
|
||||
|
||||
async def _search_round(self):
|
||||
def _search_round(self):
|
||||
"""
|
||||
Send up to constants.alpha (5) probes to closest active peers
|
||||
"""
|
||||
|
||||
added = 0
|
||||
to_probe = list(self.active - self.contacted)
|
||||
to_probe.sort(key=lambda peer: self.distance(self.key))
|
||||
for peer in to_probe:
|
||||
if added >= constants.ALPHA:
|
||||
for index, peer in enumerate(self.active.keys()):
|
||||
if index == 0:
|
||||
log.debug("closest to probe: %s", peer.node_id.hex()[:8])
|
||||
if peer in self.contacted:
|
||||
continue
|
||||
if len(self.running_probes) >= constants.ALPHA:
|
||||
break
|
||||
if index > (constants.K + len(self.running_probes)):
|
||||
break
|
||||
origin_address = (peer.address, peer.udp_port)
|
||||
if origin_address in self.exclude:
|
||||
|
@ -206,33 +209,22 @@ class IterativeFinder:
|
|||
t = self.loop.create_task(self._send_probe(peer))
|
||||
|
||||
def callback(_):
|
||||
self.running_probes.difference_update({
|
||||
probe for probe in self.running_probes if probe.done() or probe == t
|
||||
})
|
||||
if not self.running_probes:
|
||||
self.tasks.append(self.loop.create_task(self._search_task(0.0)))
|
||||
self.running_probes.pop(peer, None)
|
||||
if self.running:
|
||||
self._search_round()
|
||||
|
||||
t.add_done_callback(callback)
|
||||
self.running_probes.add(t)
|
||||
self.running_probes[peer] = t
|
||||
|
||||
async def _search_task(self, delay: typing.Optional[float] = constants.ITERATIVE_LOOKUP_DELAY):
|
||||
try:
|
||||
if self.running:
|
||||
await self._search_round()
|
||||
if self.running:
|
||||
self.delayed_calls.append(self.loop.call_later(delay, self._search))
|
||||
except (asyncio.CancelledError, StopAsyncIteration, TransportNotConnected):
|
||||
if self.running:
|
||||
self.loop.call_soon(self.aclose)
|
||||
|
||||
def _search(self):
|
||||
self.tasks.append(self.loop.create_task(self._search_task()))
|
||||
def _log_state(self):
|
||||
log.debug("[%s] check result: %i active nodes %i contacted",
|
||||
self.key.hex()[:8], len(self.active), len(self.contacted))
|
||||
|
||||
def __aiter__(self):
|
||||
if self.running:
|
||||
raise Exception("already running")
|
||||
self.running = True
|
||||
self._search()
|
||||
self.loop.call_soon(self._search_round)
|
||||
return self
|
||||
|
||||
async def __anext__(self) -> typing.List['KademliaPeer']:
|
||||
|
@ -252,20 +244,19 @@ class IterativeFinder:
|
|||
def aclose(self):
|
||||
self.running = False
|
||||
self.iteration_queue.put_nowait(None)
|
||||
for task in chain(self.tasks, self.running_probes, self.delayed_calls):
|
||||
for task in chain(self.tasks, self.running_probes.values()):
|
||||
task.cancel()
|
||||
self.tasks.clear()
|
||||
self.running_probes.clear()
|
||||
self.delayed_calls.clear()
|
||||
|
||||
|
||||
class IterativeNodeFinder(IterativeFinder):
|
||||
def __init__(self, loop: asyncio.AbstractEventLoop, peer_manager: 'PeerManager',
|
||||
routing_table: 'TreeRoutingTable', protocol: 'KademliaProtocol', key: bytes,
|
||||
bottom_out_limit: typing.Optional[int] = 2, max_results: typing.Optional[int] = constants.K,
|
||||
max_results: typing.Optional[int] = constants.K,
|
||||
exclude: typing.Optional[typing.List[typing.Tuple[str, int]]] = None,
|
||||
shortlist: typing.Optional[typing.List['KademliaPeer']] = None):
|
||||
super().__init__(loop, peer_manager, routing_table, protocol, key, bottom_out_limit, max_results, exclude,
|
||||
super().__init__(loop, peer_manager, routing_table, protocol, key, max_results, exclude,
|
||||
shortlist)
|
||||
self.yielded_peers: typing.Set['KademliaPeer'] = set()
|
||||
|
||||
|
@ -276,14 +267,14 @@ class IterativeNodeFinder(IterativeFinder):
|
|||
return FindNodeResponse(self.key, response)
|
||||
|
||||
def search_exhausted(self):
|
||||
self.put_result(self.active, finish=True)
|
||||
self.put_result(self.active.keys(), finish=True)
|
||||
|
||||
def put_result(self, from_iter: typing.Iterable['KademliaPeer'], finish=False):
|
||||
not_yet_yielded = [
|
||||
peer for peer in from_iter
|
||||
if peer not in self.yielded_peers
|
||||
and peer.node_id != self.protocol.node_id
|
||||
and self.peer_manager.peer_is_good(peer) is not False
|
||||
and self.peer_manager.peer_is_good(peer) is True # return only peers who answered
|
||||
]
|
||||
not_yet_yielded.sort(key=lambda peer: self.distance(peer.node_id))
|
||||
to_yield = not_yet_yielded[:max(constants.K, self.max_results)]
|
||||
|
@ -298,26 +289,16 @@ class IterativeNodeFinder(IterativeFinder):
|
|||
|
||||
if found:
|
||||
log.debug("found")
|
||||
return self.put_result(self.active, finish=True)
|
||||
if self.prev_closest_peer and self.closest_peer and not self._is_closer(self.prev_closest_peer):
|
||||
# log.info("improving, %i %i %i %i %i", len(self.shortlist), len(self.active), len(self.contacted),
|
||||
# self.bottom_out_count, self.iteration_count)
|
||||
self.bottom_out_count = 0
|
||||
elif self.prev_closest_peer and self.closest_peer:
|
||||
self.bottom_out_count += 1
|
||||
log.info("bottom out %i %i %i", len(self.active), len(self.contacted), self.bottom_out_count)
|
||||
if self.bottom_out_count >= self.bottom_out_limit or self.iteration_count >= self.bottom_out_limit:
|
||||
log.info("limit hit")
|
||||
self.put_result(self.active, True)
|
||||
return self.put_result(self.active.keys(), finish=True)
|
||||
|
||||
|
||||
class IterativeValueFinder(IterativeFinder):
|
||||
def __init__(self, loop: asyncio.AbstractEventLoop, peer_manager: 'PeerManager',
|
||||
routing_table: 'TreeRoutingTable', protocol: 'KademliaProtocol', key: bytes,
|
||||
bottom_out_limit: typing.Optional[int] = 2, max_results: typing.Optional[int] = constants.K,
|
||||
max_results: typing.Optional[int] = constants.K,
|
||||
exclude: typing.Optional[typing.List[typing.Tuple[str, int]]] = None,
|
||||
shortlist: typing.Optional[typing.List['KademliaPeer']] = None):
|
||||
super().__init__(loop, peer_manager, routing_table, protocol, key, bottom_out_limit, max_results, exclude,
|
||||
super().__init__(loop, peer_manager, routing_table, protocol, key, max_results, exclude,
|
||||
shortlist)
|
||||
self.blob_peers: typing.Set['KademliaPeer'] = set()
|
||||
# this tracks the index of the most recent page we requested from each peer
|
||||
|
@ -362,23 +343,12 @@ class IterativeValueFinder(IterativeFinder):
|
|||
blob_peers = [self.peer_manager.decode_tcp_peer_from_compact_address(compact_addr)
|
||||
for compact_addr in response.found_compact_addresses]
|
||||
to_yield = []
|
||||
self.bottom_out_count = 0
|
||||
for blob_peer in blob_peers:
|
||||
if blob_peer not in self.blob_peers:
|
||||
self.blob_peers.add(blob_peer)
|
||||
to_yield.append(blob_peer)
|
||||
if to_yield:
|
||||
# log.info("found %i new peers for blob", len(to_yield))
|
||||
self.iteration_queue.put_nowait(to_yield)
|
||||
# if self.max_results and len(self.blob_peers) >= self.max_results:
|
||||
# log.info("enough blob peers found")
|
||||
# if not self.finished.is_set():
|
||||
# self.finished.set()
|
||||
elif self.prev_closest_peer and self.closest_peer:
|
||||
self.bottom_out_count += 1
|
||||
if self.bottom_out_count >= self.bottom_out_limit:
|
||||
log.info("blob peer search bottomed out")
|
||||
self.iteration_queue.put_nowait(None)
|
||||
|
||||
def get_initial_result(self) -> typing.List['KademliaPeer']:
|
||||
if self.protocol.data_store.has_peers_for_blob(self.key):
|
||||
|
|
|
@ -6,6 +6,7 @@ import itertools
|
|||
|
||||
from prometheus_client import Gauge
|
||||
|
||||
from lbry import utils
|
||||
from lbry.dht import constants
|
||||
from lbry.dht.protocol.distance import Distance
|
||||
if typing.TYPE_CHECKING:
|
||||
|
@ -70,9 +71,8 @@ class KBucket:
|
|||
if len(self.peers) < constants.K:
|
||||
self.peers.append(peer)
|
||||
self.peer_in_routing_table_metric.labels("global").inc()
|
||||
if peer.node_id[0] == self._node_id[0]:
|
||||
bits_colliding = 8 - (peer.node_id[1] ^ self._node_id[1]).bit_length()
|
||||
self.peer_with_x_bit_colliding_metric.labels(amount=(bits_colliding + 8)).inc()
|
||||
bits_colliding = utils.get_colliding_prefix_bits(peer.node_id, self._node_id)
|
||||
self.peer_with_x_bit_colliding_metric.labels(amount=bits_colliding).inc()
|
||||
return True
|
||||
else:
|
||||
return False
|
||||
|
@ -140,9 +140,8 @@ class KBucket:
|
|||
def remove_peer(self, peer: 'KademliaPeer') -> None:
|
||||
self.peers.remove(peer)
|
||||
self.peer_in_routing_table_metric.labels("global").dec()
|
||||
if peer.node_id[0] == self._node_id[0]:
|
||||
bits_colliding = 8 - (peer.node_id[1] ^ self._node_id[1]).bit_length()
|
||||
self.peer_with_x_bit_colliding_metric.labels(amount=(bits_colliding + 8)).dec()
|
||||
bits_colliding = utils.get_colliding_prefix_bits(peer.node_id, self._node_id)
|
||||
self.peer_with_x_bit_colliding_metric.labels(amount=bits_colliding).dec()
|
||||
|
||||
def key_in_range(self, key: bytes) -> bool:
|
||||
""" Tests whether the specified key (i.e. node ID) is in the range
|
||||
|
|
|
@ -381,6 +381,7 @@ class FileManagerComponent(Component):
|
|||
|
||||
|
||||
class BackgroundDownloaderComponent(Component):
|
||||
MIN_PREFIX_COLLIDING_BITS = 8
|
||||
component_name = BACKGROUND_DOWNLOADER_COMPONENT
|
||||
depends_on = [DATABASE_COMPONENT, BLOB_COMPONENT, DISK_SPACE_COMPONENT]
|
||||
|
||||
|
@ -412,12 +413,18 @@ class BackgroundDownloaderComponent(Component):
|
|||
while True:
|
||||
self.space_available = await self.space_manager.get_free_space_mb(True)
|
||||
if not self.is_busy and self.space_available > 10:
|
||||
blob_hash = next((key.hex() for key in self.dht_node.stored_blob_hashes if
|
||||
key.hex() not in self.blob_manager.completed_blob_hashes), None)
|
||||
if blob_hash:
|
||||
self.ongoing_download = asyncio.create_task(self.background_downloader.download_blobs(blob_hash))
|
||||
self._download_next_close_blob_hash()
|
||||
await asyncio.sleep(self.download_loop_delay_seconds)
|
||||
|
||||
def _download_next_close_blob_hash(self):
|
||||
node_id = self.dht_node.protocol.node_id
|
||||
for blob_hash in self.dht_node.stored_blob_hashes:
|
||||
if blob_hash.hex() in self.blob_manager.completed_blob_hashes:
|
||||
continue
|
||||
if utils.get_colliding_prefix_bits(node_id, blob_hash) >= self.MIN_PREFIX_COLLIDING_BITS:
|
||||
self.ongoing_download = asyncio.create_task(self.background_downloader.download_blobs(blob_hash.hex()))
|
||||
return
|
||||
|
||||
async def start(self):
|
||||
self.space_manager: DiskSpaceManager = self.component_manager.get_component(DISK_SPACE_COMPONENT)
|
||||
if not self.component_manager.has_component(DHT_COMPONENT):
|
||||
|
|
|
@ -4885,20 +4885,16 @@ class Daemon(metaclass=JSONRPCServerType):
|
|||
"""
|
||||
|
||||
@requires(DHT_COMPONENT)
|
||||
async def jsonrpc_peer_list(self, blob_hash, search_bottom_out_limit=None, page=None, page_size=None):
|
||||
async def jsonrpc_peer_list(self, blob_hash, page=None, page_size=None):
|
||||
"""
|
||||
Get peers for blob hash
|
||||
|
||||
Usage:
|
||||
peer_list (<blob_hash> | --blob_hash=<blob_hash>)
|
||||
[<search_bottom_out_limit> | --search_bottom_out_limit=<search_bottom_out_limit>]
|
||||
[--page=<page>] [--page_size=<page_size>]
|
||||
|
||||
Options:
|
||||
--blob_hash=<blob_hash> : (str) find available peers for this blob hash
|
||||
--search_bottom_out_limit=<search_bottom_out_limit> : (int) the number of search probes in a row
|
||||
that don't find any new peers
|
||||
before giving up and returning
|
||||
--page=<page> : (int) page to return during paginating
|
||||
--page_size=<page_size> : (int) number of items on page during pagination
|
||||
|
||||
|
@ -4910,13 +4906,6 @@ class Daemon(metaclass=JSONRPCServerType):
|
|||
if not is_valid_blobhash(blob_hash):
|
||||
# TODO: use error from lbry.error
|
||||
raise Exception("invalid blob hash")
|
||||
if search_bottom_out_limit is not None:
|
||||
search_bottom_out_limit = int(search_bottom_out_limit)
|
||||
if search_bottom_out_limit <= 0:
|
||||
# TODO: use error from lbry.error
|
||||
raise Exception("invalid bottom out limit")
|
||||
else:
|
||||
search_bottom_out_limit = 4
|
||||
peers = []
|
||||
peer_q = asyncio.Queue(loop=self.component_manager.loop)
|
||||
await self.dht_node._peers_for_value_producer(blob_hash, peer_q)
|
||||
|
|
|
@ -204,7 +204,13 @@ class AsyncioTestCase(unittest.TestCase):
|
|||
|
||||
def add_timeout(self):
|
||||
if self.TIMEOUT:
|
||||
self.loop.call_later(self.TIMEOUT, self.cancel)
|
||||
self.loop.call_later(self.TIMEOUT, self.check_timeout, time())
|
||||
|
||||
def check_timeout(self, started):
|
||||
if time() - started >= self.TIMEOUT:
|
||||
self.cancel()
|
||||
else:
|
||||
self.loop.call_later(self.TIMEOUT, self.check_timeout, started)
|
||||
|
||||
|
||||
class AdvanceTimeTestCase(AsyncioTestCase):
|
||||
|
|
|
@ -474,3 +474,18 @@ class LockWithMetrics(asyncio.Lock):
|
|||
return super().release()
|
||||
finally:
|
||||
self._lock_held_time_metric.observe(time.perf_counter() - self._lock_acquired_time)
|
||||
|
||||
|
||||
def get_colliding_prefix_bits(first_value: bytes, second_value: bytes):
|
||||
"""
|
||||
Calculates the amount of colliding prefix bits between <first_value> and <second_value>.
|
||||
This is given by the amount of bits that are the same until the first different one (via XOR),
|
||||
starting from the most significant bit to the least significant bit.
|
||||
:param first_value: first value to compare, bigger than size.
|
||||
:param second_value: second value to compare, bigger than size.
|
||||
:return: amount of prefix colliding bits.
|
||||
"""
|
||||
assert len(first_value) == len(second_value), "length should be the same"
|
||||
size = len(first_value) * 8
|
||||
first_value, second_value = int.from_bytes(first_value, "big"), int.from_bytes(second_value, "big")
|
||||
return size - (first_value ^ second_value).bit_length()
|
||||
|
|
|
@ -9,7 +9,7 @@ if typing.TYPE_CHECKING:
|
|||
|
||||
|
||||
def get_time_accelerator(loop: asyncio.AbstractEventLoop,
|
||||
now: typing.Optional[float] = None) -> typing.Callable[[float], typing.Awaitable[None]]:
|
||||
instant_step: bool = False) -> typing.Callable[[float], typing.Awaitable[None]]:
|
||||
"""
|
||||
Returns an async advance() function
|
||||
|
||||
|
@ -17,32 +17,22 @@ def get_time_accelerator(loop: asyncio.AbstractEventLoop,
|
|||
made by call_later, call_at, and call_soon.
|
||||
"""
|
||||
|
||||
_time = now or loop.time()
|
||||
loop.time = functools.wraps(loop.time)(lambda: _time)
|
||||
original = loop.time
|
||||
_drift = 0
|
||||
loop.time = functools.wraps(loop.time)(lambda: original() + _drift)
|
||||
|
||||
async def accelerate_time(seconds: float) -> None:
|
||||
nonlocal _time
|
||||
nonlocal _drift
|
||||
if seconds < 0:
|
||||
raise ValueError(f'Cannot go back in time ({seconds} seconds)')
|
||||
_time += seconds
|
||||
await past_events()
|
||||
_drift += seconds
|
||||
await asyncio.sleep(0)
|
||||
|
||||
async def past_events() -> None:
|
||||
while loop._scheduled:
|
||||
timer: asyncio.TimerHandle = loop._scheduled[0]
|
||||
if timer not in loop._ready and timer._when <= _time:
|
||||
loop._scheduled.remove(timer)
|
||||
loop._ready.append(timer)
|
||||
if timer._when > _time:
|
||||
break
|
||||
await asyncio.sleep(0)
|
||||
|
||||
async def accelerator(seconds: float):
|
||||
steps = seconds * 10.0
|
||||
steps = seconds * 10.0 if not instant_step else 1
|
||||
|
||||
for _ in range(max(int(steps), 1)):
|
||||
await accelerate_time(0.1)
|
||||
await accelerate_time(seconds/steps)
|
||||
|
||||
return accelerator
|
||||
|
||||
|
|
|
@ -1,60 +1,68 @@
|
|||
import contextlib
|
||||
import logging
|
||||
import typing
|
||||
import binascii
|
||||
import socket
|
||||
import asyncio
|
||||
|
||||
from lbry.testcase import AsyncioTestCase
|
||||
from tests import dht_mocks
|
||||
from lbry.dht.protocol.distance import Distance
|
||||
from lbry.conf import Config
|
||||
from lbry.dht import constants
|
||||
from lbry.dht.node import Node
|
||||
from lbry.dht.peer import PeerManager, make_kademlia_peer
|
||||
from lbry.dht.blob_announcer import BlobAnnouncer
|
||||
from lbry.extras.daemon.storage import SQLiteStorage
|
||||
from unittest import skip
|
||||
|
||||
|
||||
class TestBlobAnnouncer(AsyncioTestCase):
|
||||
async def setup_node(self, peer_addresses, address, node_id):
|
||||
self.nodes: typing.Dict[int, Node] = {}
|
||||
self.advance = dht_mocks.get_time_accelerator(self.loop, self.loop.time())
|
||||
self.advance = dht_mocks.get_time_accelerator(self.loop)
|
||||
self.instant_advance = dht_mocks.get_time_accelerator(self.loop)
|
||||
self.conf = Config()
|
||||
self.storage = SQLiteStorage(self.conf, ":memory:", self.loop, self.loop.time)
|
||||
await self.storage.open()
|
||||
self.peer_manager = PeerManager(self.loop)
|
||||
self.node = Node(self.loop, self.peer_manager, node_id, 4444, 4444, 3333, address)
|
||||
await self.node.start_listening(address)
|
||||
self.blob_announcer = BlobAnnouncer(self.loop, self.node, self.storage)
|
||||
for node_id, address in peer_addresses:
|
||||
await self.add_peer(node_id, address)
|
||||
await asyncio.gather(*[self.add_peer(node_id, address) for node_id, address in peer_addresses])
|
||||
for first_peer in self.nodes.values():
|
||||
for second_peer in self.nodes.values():
|
||||
if first_peer == second_peer:
|
||||
continue
|
||||
self.add_peer_to_routing_table(first_peer, second_peer)
|
||||
self.add_peer_to_routing_table(second_peer, first_peer)
|
||||
await self.advance(0.1) # just to make pings go through
|
||||
self.node.joined.set()
|
||||
self.node._refresh_task = self.loop.create_task(self.node.refresh_node())
|
||||
self.storage = SQLiteStorage(self.conf, ":memory:", self.loop, self.loop.time)
|
||||
await self.storage.open()
|
||||
self.blob_announcer = BlobAnnouncer(self.loop, self.node, self.storage)
|
||||
|
||||
async def add_peer(self, node_id, address, add_to_routing_table=True):
|
||||
#print('add', node_id.hex()[:8], address)
|
||||
n = Node(self.loop, PeerManager(self.loop), node_id, 4444, 4444, 3333, address)
|
||||
await n.start_listening(address)
|
||||
self.nodes.update({len(self.nodes): n})
|
||||
if add_to_routing_table:
|
||||
self.node.protocol.add_peer(
|
||||
make_kademlia_peer(
|
||||
n.protocol.node_id, n.protocol.external_ip, n.protocol.udp_port
|
||||
)
|
||||
self.add_peer_to_routing_table(self.node, n)
|
||||
|
||||
def add_peer_to_routing_table(self, adder, being_added):
|
||||
adder.protocol.add_peer(
|
||||
make_kademlia_peer(
|
||||
being_added.protocol.node_id, being_added.protocol.external_ip, being_added.protocol.udp_port
|
||||
)
|
||||
)
|
||||
|
||||
@contextlib.asynccontextmanager
|
||||
async def _test_network_context(self, peer_addresses=None):
|
||||
self.peer_addresses = peer_addresses or [
|
||||
(constants.generate_id(2), '1.2.3.2'),
|
||||
(constants.generate_id(3), '1.2.3.3'),
|
||||
(constants.generate_id(4), '1.2.3.4'),
|
||||
(constants.generate_id(5), '1.2.3.5'),
|
||||
(constants.generate_id(6), '1.2.3.6'),
|
||||
(constants.generate_id(7), '1.2.3.7'),
|
||||
(constants.generate_id(8), '1.2.3.8'),
|
||||
(constants.generate_id(9), '1.2.3.9'),
|
||||
async def _test_network_context(self, peer_count=200):
|
||||
self.peer_addresses = [
|
||||
(constants.generate_id(i), socket.inet_ntoa(int(i + 0x01000001).to_bytes(length=4, byteorder='big')))
|
||||
for i in range(1, peer_count + 1)
|
||||
]
|
||||
try:
|
||||
with dht_mocks.mock_network_loop(self.loop):
|
||||
await self.setup_node(self.peer_addresses, '1.2.3.1', constants.generate_id(1))
|
||||
await self.setup_node(self.peer_addresses, '1.2.3.1', constants.generate_id(1000))
|
||||
yield
|
||||
finally:
|
||||
self.blob_announcer.stop()
|
||||
|
@ -73,43 +81,58 @@ class TestBlobAnnouncer(AsyncioTestCase):
|
|||
)
|
||||
)
|
||||
await peer.ping()
|
||||
return peer
|
||||
return last_node
|
||||
|
||||
@skip("Something from a previous test is leaking into this test and causing it to fail intermittently")
|
||||
async def test_announce_blobs(self):
|
||||
blob1 = binascii.hexlify(b'1' * 48).decode()
|
||||
blob2 = binascii.hexlify(b'2' * 48).decode()
|
||||
|
||||
async with self._test_network_context():
|
||||
await self.storage.add_blobs((blob1, 1024), (blob2, 1024), finished=True)
|
||||
await self.storage.db.execute(
|
||||
"update blob set next_announce_time=0, should_announce=1 where blob_hash in (?, ?)",
|
||||
(blob1, blob2)
|
||||
)
|
||||
async with self._test_network_context(peer_count=100):
|
||||
await self.storage.add_blobs((blob1, 1024, 0, True), (blob2, 1024, 0, True), finished=True)
|
||||
await self.storage.add_blobs(
|
||||
*((constants.generate_id(value).hex(), 1024, 0, True) for value in range(1000, 1090)),
|
||||
finished=True)
|
||||
await self.storage.db.execute("update blob set next_announce_time=0, should_announce=1")
|
||||
to_announce = await self.storage.get_blobs_to_announce()
|
||||
self.assertEqual(2, len(to_announce))
|
||||
self.blob_announcer.start(batch_size=1) # so it covers batching logic
|
||||
self.assertEqual(92, len(to_announce))
|
||||
self.blob_announcer.start(batch_size=10) # so it covers batching logic
|
||||
# takes 60 seconds to start, but we advance 120 to ensure it processed all batches
|
||||
await self.advance(60.0 * 2)
|
||||
ongoing_announcements = asyncio.ensure_future(self.blob_announcer.wait())
|
||||
await self.instant_advance(60.0)
|
||||
await ongoing_announcements
|
||||
to_announce = await self.storage.get_blobs_to_announce()
|
||||
self.assertEqual(0, len(to_announce))
|
||||
self.blob_announcer.stop()
|
||||
|
||||
# as routing table pollution will cause some peers to be hard to reach, we add a tolerance for CI
|
||||
tolerance = 0.8 # at least 80% of the announcements are within the top K
|
||||
for blob in await self.storage.get_all_blob_hashes():
|
||||
distance = Distance(bytes.fromhex(blob))
|
||||
candidates = list(self.nodes.values())
|
||||
candidates.sort(key=lambda sorting_node: distance(sorting_node.protocol.node_id))
|
||||
has_it = 0
|
||||
for index, node in enumerate(candidates[:constants.K], start=1):
|
||||
if node.protocol.data_store.get_peers_for_blob(bytes.fromhex(blob)):
|
||||
has_it += 1
|
||||
else:
|
||||
logging.warning("blob %s wasnt found between the best K (%s)", blob[:8], node.protocol.node_id.hex()[:8])
|
||||
self.assertGreaterEqual(has_it, int(tolerance * constants.K))
|
||||
|
||||
|
||||
# test that we can route from a poorly connected peer all the way to the announced blob
|
||||
|
||||
await self.chain_peer(constants.generate_id(10), '1.2.3.10')
|
||||
await self.chain_peer(constants.generate_id(11), '1.2.3.11')
|
||||
await self.chain_peer(constants.generate_id(12), '1.2.3.12')
|
||||
await self.chain_peer(constants.generate_id(13), '1.2.3.13')
|
||||
await self.chain_peer(constants.generate_id(14), '1.2.3.14')
|
||||
await self.advance(61.0)
|
||||
current = len(self.nodes)
|
||||
await self.chain_peer(constants.generate_id(current + 1), '1.2.3.10')
|
||||
await self.chain_peer(constants.generate_id(current + 2), '1.2.3.11')
|
||||
await self.chain_peer(constants.generate_id(current + 3), '1.2.3.12')
|
||||
await self.chain_peer(constants.generate_id(current + 4), '1.2.3.13')
|
||||
last = await self.chain_peer(constants.generate_id(current + 5), '1.2.3.14')
|
||||
|
||||
last = self.nodes[len(self.nodes) - 1]
|
||||
search_q, peer_q = asyncio.Queue(loop=self.loop), asyncio.Queue(loop=self.loop)
|
||||
search_q.put_nowait(blob1)
|
||||
|
||||
_, task = last.accumulate_peers(search_q, peer_q)
|
||||
found_peers = await peer_q.get()
|
||||
found_peers = await asyncio.wait_for(peer_q.get(), 1.0)
|
||||
task.cancel()
|
||||
|
||||
self.assertEqual(1, len(found_peers))
|
||||
|
@ -119,21 +142,13 @@ class TestBlobAnnouncer(AsyncioTestCase):
|
|||
|
||||
async def test_popular_blob(self):
|
||||
peer_count = 150
|
||||
addresses = [
|
||||
(constants.generate_id(i + 1), socket.inet_ntoa(int(i + 0x01000001).to_bytes(length=4, byteorder='big')))
|
||||
for i in range(peer_count)
|
||||
]
|
||||
blob_hash = b'1' * 48
|
||||
blob_hash = constants.generate_id(99999)
|
||||
|
||||
async with self._test_network_context(peer_addresses=addresses):
|
||||
async with self._test_network_context(peer_count=peer_count):
|
||||
total_seen = set()
|
||||
announced_to = self.nodes[0]
|
||||
for i in range(1, peer_count):
|
||||
node = self.nodes[i]
|
||||
kad_peer = make_kademlia_peer(
|
||||
node.protocol.node_id, node.protocol.external_ip, node.protocol.udp_port
|
||||
)
|
||||
await announced_to.protocol._add_peer(kad_peer)
|
||||
announced_to = self.nodes.pop(0)
|
||||
for i, node in enumerate(self.nodes.values()):
|
||||
self.add_peer_to_routing_table(announced_to, node)
|
||||
peer = node.protocol.get_rpc_peer(
|
||||
make_kademlia_peer(
|
||||
announced_to.protocol.node_id,
|
||||
|
@ -144,15 +159,15 @@ class TestBlobAnnouncer(AsyncioTestCase):
|
|||
response = await peer.store(blob_hash)
|
||||
self.assertEqual(response, b'OK')
|
||||
peers_for_blob = await peer.find_value(blob_hash, 0)
|
||||
if i == 1:
|
||||
if i == 0:
|
||||
self.assertNotIn(blob_hash, peers_for_blob)
|
||||
self.assertEqual(peers_for_blob[b'p'], 0)
|
||||
else:
|
||||
self.assertEqual(len(peers_for_blob[blob_hash]), min(i - 1, constants.K))
|
||||
self.assertEqual(len(announced_to.protocol.data_store.get_peers_for_blob(blob_hash)), i)
|
||||
self.assertEqual(len(peers_for_blob[blob_hash]), min(i, constants.K))
|
||||
self.assertEqual(len(announced_to.protocol.data_store.get_peers_for_blob(blob_hash)), i + 1)
|
||||
if i - 1 > constants.K:
|
||||
self.assertEqual(len(peers_for_blob[b'contacts']), constants.K)
|
||||
self.assertEqual(peers_for_blob[b'p'], ((i - 1) // (constants.K + 1)) + 1)
|
||||
self.assertEqual(peers_for_blob[b'p'], (i // (constants.K + 1)) + 1)
|
||||
seen = set(peers_for_blob[blob_hash])
|
||||
self.assertEqual(len(seen), constants.K)
|
||||
self.assertEqual(len(peers_for_blob[blob_hash]), len(seen))
|
||||
|
@ -167,5 +182,5 @@ class TestBlobAnnouncer(AsyncioTestCase):
|
|||
seen.intersection_update(page_x_set)
|
||||
total_seen.update(page_x_set)
|
||||
else:
|
||||
self.assertEqual(len(peers_for_blob[b'contacts']), i - 1)
|
||||
self.assertEqual(len(peers_for_blob[b'contacts']), 8) # we always add 8 on first page
|
||||
self.assertEqual(len(total_seen), peer_count - 2)
|
||||
|
|
|
@ -12,7 +12,6 @@ from lbry.extras.daemon.storage import SQLiteStorage
|
|||
|
||||
|
||||
class TestNodePingQueueDiscover(AsyncioTestCase):
|
||||
TIMEOUT = None # not supported as it advances time
|
||||
async def test_ping_queue_discover(self):
|
||||
loop = asyncio.get_event_loop()
|
||||
loop.set_debug(False)
|
||||
|
@ -29,7 +28,7 @@ class TestNodePingQueueDiscover(AsyncioTestCase):
|
|||
(constants.generate_id(9), '1.2.3.9'),
|
||||
]
|
||||
with dht_mocks.mock_network_loop(loop):
|
||||
advance = dht_mocks.get_time_accelerator(loop, loop.time())
|
||||
advance = dht_mocks.get_time_accelerator(loop)
|
||||
# start the nodes
|
||||
nodes: typing.Dict[int, Node] = {
|
||||
i: Node(loop, PeerManager(loop), node_id, 4444, 4444, 3333, address)
|
||||
|
@ -93,7 +92,6 @@ class TestNodePingQueueDiscover(AsyncioTestCase):
|
|||
|
||||
|
||||
class TestTemporarilyLosingConnection(AsyncioTestCase):
|
||||
TIMEOUT = None # not supported as it advances time
|
||||
@unittest.SkipTest
|
||||
async def test_losing_connection(self):
|
||||
async def wait_for(check_ok, insist, timeout=20):
|
||||
|
@ -131,7 +129,7 @@ class TestTemporarilyLosingConnection(AsyncioTestCase):
|
|||
await asyncio.gather(*[n.joined.wait() for n in nodes])
|
||||
|
||||
node = nodes[-1]
|
||||
advance = dht_mocks.get_time_accelerator(loop, loop.time())
|
||||
advance = dht_mocks.get_time_accelerator(loop)
|
||||
await advance(500)
|
||||
|
||||
# Join the network, assert that at least the known peers are in RT
|
||||
|
|
19
tests/unit/test_utils.py
Normal file
19
tests/unit/test_utils.py
Normal file
|
@ -0,0 +1,19 @@
|
|||
import unittest
|
||||
from lbry import utils
|
||||
|
||||
|
||||
class UtilsTestCase(unittest.TestCase):
|
||||
|
||||
def test_get_colliding_prefix_bits(self):
|
||||
self.assertEqual(
|
||||
0, utils.get_colliding_prefix_bits(0xffffffff.to_bytes(4, "big"), 0x0000000000.to_bytes(4, "big")))
|
||||
self.assertEqual(
|
||||
1, utils.get_colliding_prefix_bits(0x7fffffff.to_bytes(4, "big"), 0x0000000000.to_bytes(4, "big")))
|
||||
self.assertEqual(
|
||||
8, utils.get_colliding_prefix_bits(0x00ffffff.to_bytes(4, "big"), 0x0000000000.to_bytes(4, "big")))
|
||||
self.assertEqual(
|
||||
8, utils.get_colliding_prefix_bits(0x00ffffff.to_bytes(4, "big"), 0x0000000000.to_bytes(4, "big")))
|
||||
self.assertEqual(
|
||||
1, utils.get_colliding_prefix_bits(0x7fffffff.to_bytes(4, "big"), 0x0000000000.to_bytes(4, "big")))
|
||||
self.assertEqual(
|
||||
1, utils.get_colliding_prefix_bits(0x7fffffff.to_bytes(4, "big"), 0x0000000000.to_bytes(4, "big")))
|
Loading…
Reference in a new issue