Merge remote-tracking branch 'origin/dht_bug' into temp_dht_supermerge

This commit is contained in:
Victor Shyba 2022-02-22 22:38:51 -03:00
commit f4fa217b71
11 changed files with 171 additions and 197 deletions

View file

@ -622,7 +622,7 @@ class Config(CLIConfig):
"Routing table bucket index below which we always split the bucket if given a new key to add to it and " "Routing table bucket index below which we always split the bucket if given a new key to add to it and "
"the bucket is full. As this value is raised the depth of the routing table (and number of peers in it) " "the bucket is full. As this value is raised the depth of the routing table (and number of peers in it) "
"will increase. This setting is used by seed nodes, you probably don't want to change it during normal " "will increase. This setting is used by seed nodes, you probably don't want to change it during normal "
"use.", 1 "use.", 2
) )
# protocol timeouts # protocol timeouts

View file

@ -27,21 +27,24 @@ class BlobAnnouncer:
self.storage = storage self.storage = storage
self.announce_task: asyncio.Task = None self.announce_task: asyncio.Task = None
self.announce_queue: typing.List[str] = [] self.announce_queue: typing.List[str] = []
self._done = asyncio.Event()
self.announced = set()
async def _submit_announcement(self, blob_hash): async def _run_consumer(self):
try: while self.announce_queue:
try:
peers = len(await self.node.announce_blob(blob_hash)) blob_hash = self.announce_queue.pop()
self.announcements_sent_metric.labels(peers=peers, error=False).inc() peers = len(await self.node.announce_blob(blob_hash))
if peers > 4: self.announcements_sent_metric.labels(peers=peers, error=False).inc()
return blob_hash if peers > 4:
else: self.announced.add(blob_hash)
log.debug("failed to announce %s, could only find %d peers, retrying soon.", blob_hash[:8], peers) else:
except Exception as err: log.debug("failed to announce %s, could only find %d peers, retrying soon.", blob_hash[:8], peers)
self.announcements_sent_metric.labels(peers=0, error=True).inc() except Exception as err:
if isinstance(err, asyncio.CancelledError): # TODO: remove when updated to 3.8 self.announcements_sent_metric.labels(peers=0, error=True).inc()
raise err if isinstance(err, asyncio.CancelledError): # TODO: remove when updated to 3.8
log.warning("error announcing %s: %s", blob_hash[:8], str(err)) raise err
log.warning("error announcing %s: %s", blob_hash[:8], str(err))
async def _announce(self, batch_size: typing.Optional[int] = 10): async def _announce(self, batch_size: typing.Optional[int] = 10):
while batch_size: while batch_size:
@ -56,14 +59,14 @@ class BlobAnnouncer:
log.debug("announcer task wake up, %d blobs to announce", len(self.announce_queue)) log.debug("announcer task wake up, %d blobs to announce", len(self.announce_queue))
while len(self.announce_queue) > 0: while len(self.announce_queue) > 0:
log.info("%i blobs to announce", len(self.announce_queue)) log.info("%i blobs to announce", len(self.announce_queue))
announced = await asyncio.gather(*[ await asyncio.gather(*[self._run_consumer() for _ in range(batch_size)], loop=self.loop)
self._submit_announcement( announced = list(filter(None, self.announced))
self.announce_queue.pop()) for _ in range(batch_size) if self.announce_queue
], loop=self.loop)
announced = list(filter(None, announced))
if announced: if announced:
await self.storage.update_last_announced_blobs(announced) await self.storage.update_last_announced_blobs(announced)
log.info("announced %i blobs", len(announced)) log.info("announced %i blobs", len(announced))
self.announced.clear()
self._done.set()
self._done.clear()
def start(self, batch_size: typing.Optional[int] = 10): def start(self, batch_size: typing.Optional[int] = 10):
assert not self.announce_task or self.announce_task.done(), "already running" assert not self.announce_task or self.announce_task.done(), "already running"
@ -72,3 +75,6 @@ class BlobAnnouncer:
def stop(self): def stop(self):
if self.announce_task and not self.announce_task.done(): if self.announce_task and not self.announce_task.done():
self.announce_task.cancel() self.announce_task.cancel()
def wait(self):
return self._done.wait()

View file

@ -20,7 +20,6 @@ MAYBE_PING_DELAY = 300 # 5 minutes
CHECK_REFRESH_INTERVAL = REFRESH_INTERVAL / 5 CHECK_REFRESH_INTERVAL = REFRESH_INTERVAL / 5
RPC_ID_LENGTH = 20 RPC_ID_LENGTH = 20
PROTOCOL_VERSION = 1 PROTOCOL_VERSION = 1
BOTTOM_OUT_LIMIT = 3
MSG_SIZE_LIMIT = 1400 MSG_SIZE_LIMIT = 1400

View file

@ -202,25 +202,23 @@ class Node:
self._join_task = self.loop.create_task(self.join_network(interface, known_node_urls)) self._join_task = self.loop.create_task(self.join_network(interface, known_node_urls))
def get_iterative_node_finder(self, key: bytes, shortlist: typing.Optional[typing.List['KademliaPeer']] = None, def get_iterative_node_finder(self, key: bytes, shortlist: typing.Optional[typing.List['KademliaPeer']] = None,
bottom_out_limit: int = constants.BOTTOM_OUT_LIMIT,
max_results: int = constants.K) -> IterativeNodeFinder: max_results: int = constants.K) -> IterativeNodeFinder:
return IterativeNodeFinder(self.loop, self.protocol.peer_manager, self.protocol.routing_table, self.protocol, return IterativeNodeFinder(self.loop, self.protocol.peer_manager, self.protocol.routing_table, self.protocol,
key, bottom_out_limit, max_results, None, shortlist) key, max_results, None, shortlist)
def get_iterative_value_finder(self, key: bytes, shortlist: typing.Optional[typing.List['KademliaPeer']] = None, def get_iterative_value_finder(self, key: bytes, shortlist: typing.Optional[typing.List['KademliaPeer']] = None,
bottom_out_limit: int = 40,
max_results: int = -1) -> IterativeValueFinder: max_results: int = -1) -> IterativeValueFinder:
return IterativeValueFinder(self.loop, self.protocol.peer_manager, self.protocol.routing_table, self.protocol, return IterativeValueFinder(self.loop, self.protocol.peer_manager, self.protocol.routing_table, self.protocol,
key, bottom_out_limit, max_results, None, shortlist) key, max_results, None, shortlist)
async def peer_search(self, node_id: bytes, count=constants.K, max_results=constants.K * 2, async def peer_search(self, node_id: bytes, count=constants.K, max_results=constants.K * 2,
bottom_out_limit=20, shortlist: typing.Optional[typing.List['KademliaPeer']] = None shortlist: typing.Optional[typing.List['KademliaPeer']] = None
) -> typing.List['KademliaPeer']: ) -> typing.List['KademliaPeer']:
peers = [] peers = []
async for iteration_peers in self.get_iterative_node_finder( async for iteration_peers in self.get_iterative_node_finder(
node_id, shortlist=shortlist, bottom_out_limit=bottom_out_limit, max_results=max_results): node_id, shortlist=shortlist, max_results=max_results):
peers.extend(iteration_peers) peers.extend(iteration_peers)
distance = Distance(node_id) distance = Distance(node_id)
peers.sort(key=lambda peer: distance(peer.node_id)) peers.sort(key=lambda peer: distance(peer.node_id))

View file

@ -190,3 +190,6 @@ class KademliaPeer:
def compact_ip(self): def compact_ip(self):
return make_compact_ip(self.address) return make_compact_ip(self.address)
def __str__(self):
return f"{self.__class__.__name__}({self.node_id.hex()[:8]}@{self.address}:{self.udp_port}-{self.tcp_port})"

View file

@ -1,6 +1,6 @@
import asyncio import asyncio
from itertools import chain from itertools import chain
from collections import defaultdict from collections import defaultdict, OrderedDict
import typing import typing
import logging import logging
from typing import TYPE_CHECKING from typing import TYPE_CHECKING
@ -74,7 +74,7 @@ def get_shortlist(routing_table: 'TreeRoutingTable', key: bytes,
class IterativeFinder: class IterativeFinder:
def __init__(self, loop: asyncio.AbstractEventLoop, peer_manager: 'PeerManager', def __init__(self, loop: asyncio.AbstractEventLoop, peer_manager: 'PeerManager',
routing_table: 'TreeRoutingTable', protocol: 'KademliaProtocol', key: bytes, routing_table: 'TreeRoutingTable', protocol: 'KademliaProtocol', key: bytes,
bottom_out_limit: typing.Optional[int] = 2, max_results: typing.Optional[int] = constants.K, max_results: typing.Optional[int] = constants.K,
exclude: typing.Optional[typing.List[typing.Tuple[str, int]]] = None, exclude: typing.Optional[typing.List[typing.Tuple[str, int]]] = None,
shortlist: typing.Optional[typing.List['KademliaPeer']] = None): shortlist: typing.Optional[typing.List['KademliaPeer']] = None):
if len(key) != constants.HASH_LENGTH: if len(key) != constants.HASH_LENGTH:
@ -85,28 +85,22 @@ class IterativeFinder:
self.protocol = protocol self.protocol = protocol
self.key = key self.key = key
self.bottom_out_limit = bottom_out_limit self.max_results = max(constants.K, max_results)
self.max_results = max_results
self.exclude = exclude or [] self.exclude = exclude or []
self.active: typing.Set['KademliaPeer'] = set() self.active: typing.Dict['KademliaPeer', int] = OrderedDict() # peer: distance, sorted
self.contacted: typing.Set['KademliaPeer'] = set() self.contacted: typing.Set['KademliaPeer'] = set()
self.distance = Distance(key) self.distance = Distance(key)
self.closest_peer: typing.Optional['KademliaPeer'] = None
self.prev_closest_peer: typing.Optional['KademliaPeer'] = None
self.iteration_queue = asyncio.Queue(loop=self.loop) self.iteration_queue = asyncio.Queue(loop=self.loop)
self.running_probes: typing.Set[asyncio.Task] = set() self.running_probes: typing.Dict['KademliaPeer', asyncio.Task] = {}
self.iteration_count = 0 self.iteration_count = 0
self.bottom_out_count = 0
self.running = False self.running = False
self.tasks: typing.List[asyncio.Task] = [] self.tasks: typing.List[asyncio.Task] = []
self.delayed_calls: typing.List[asyncio.Handle] = []
for peer in get_shortlist(routing_table, key, shortlist): for peer in get_shortlist(routing_table, key, shortlist):
if peer.node_id: if peer.node_id:
self._add_active(peer) self._add_active(peer, force=True)
else: else:
# seed nodes # seed nodes
self._schedule_probe(peer) self._schedule_probe(peer)
@ -138,15 +132,14 @@ class IterativeFinder:
""" """
return [] return []
def _is_closer(self, peer: 'KademliaPeer') -> bool: def _add_active(self, peer, force=False):
return not self.closest_peer or self.distance.is_closer(peer.node_id, self.closest_peer.node_id) if not force and self.peer_manager.peer_is_good(peer) is False:
return
def _add_active(self, peer): if peer in self.contacted:
return
if peer not in self.active and peer.node_id and peer.node_id != self.protocol.node_id: if peer not in self.active and peer.node_id and peer.node_id != self.protocol.node_id:
self.active.add(peer) self.active[peer] = self.distance(peer.node_id)
if self._is_closer(peer): self.active = OrderedDict(sorted(self.active.items(), key=lambda item: item[1]))
self.prev_closest_peer = self.closest_peer
self.closest_peer = peer
async def _handle_probe_result(self, peer: 'KademliaPeer', response: FindResponse): async def _handle_probe_result(self, peer: 'KademliaPeer', response: FindResponse):
self._add_active(peer) self._add_active(peer)
@ -158,33 +151,43 @@ class IterativeFinder:
log.warning("misbehaving peer %s:%i returned peer with reserved ip %s:%i", peer.address, log.warning("misbehaving peer %s:%i returned peer with reserved ip %s:%i", peer.address,
peer.udp_port, address, udp_port) peer.udp_port, address, udp_port)
self.check_result_ready(response) self.check_result_ready(response)
self._log_state()
def _reset_closest(self, peer):
if peer in self.active:
del self.active[peer]
async def _send_probe(self, peer: 'KademliaPeer'): async def _send_probe(self, peer: 'KademliaPeer'):
try: try:
response = await self.send_probe(peer) response = await self.send_probe(peer)
except asyncio.TimeoutError: except asyncio.TimeoutError:
self.active.discard(peer) self._reset_closest(peer)
return return
except ValueError as err: except ValueError as err:
log.warning(str(err)) log.warning(str(err))
self.active.discard(peer) self._reset_closest(peer)
return return
except TransportNotConnected: except TransportNotConnected:
return self.aclose() return self.aclose()
except RemoteException: except RemoteException:
self._reset_closest(peer)
return return
return await self._handle_probe_result(peer, response) return await self._handle_probe_result(peer, response)
async def _search_round(self): def _search_round(self):
""" """
Send up to constants.alpha (5) probes to closest active peers Send up to constants.alpha (5) probes to closest active peers
""" """
added = 0 added = 0
to_probe = list(self.active - self.contacted) for index, peer in enumerate(self.active.keys()):
to_probe.sort(key=lambda peer: self.distance(self.key)) if index == 0:
for peer in to_probe: log.debug("closest to probe: %s", peer.node_id.hex()[:8])
if added >= constants.ALPHA: if peer in self.contacted:
continue
if len(self.running_probes) >= constants.ALPHA:
break
if index > (constants.K + len(self.running_probes)):
break break
origin_address = (peer.address, peer.udp_port) origin_address = (peer.address, peer.udp_port)
if origin_address in self.exclude: if origin_address in self.exclude:
@ -206,33 +209,22 @@ class IterativeFinder:
t = self.loop.create_task(self._send_probe(peer)) t = self.loop.create_task(self._send_probe(peer))
def callback(_): def callback(_):
self.running_probes.difference_update({ self.running_probes.pop(peer, None)
probe for probe in self.running_probes if probe.done() or probe == t if self.running:
}) self._search_round()
if not self.running_probes:
self.tasks.append(self.loop.create_task(self._search_task(0.0)))
t.add_done_callback(callback) t.add_done_callback(callback)
self.running_probes.add(t) self.running_probes[peer] = t
async def _search_task(self, delay: typing.Optional[float] = constants.ITERATIVE_LOOKUP_DELAY): def _log_state(self):
try: log.debug("[%s] check result: %i active nodes %i contacted",
if self.running: self.key.hex()[:8], len(self.active), len(self.contacted))
await self._search_round()
if self.running:
self.delayed_calls.append(self.loop.call_later(delay, self._search))
except (asyncio.CancelledError, StopAsyncIteration, TransportNotConnected):
if self.running:
self.loop.call_soon(self.aclose)
def _search(self):
self.tasks.append(self.loop.create_task(self._search_task()))
def __aiter__(self): def __aiter__(self):
if self.running: if self.running:
raise Exception("already running") raise Exception("already running")
self.running = True self.running = True
self._search() self.loop.call_soon(self._search_round)
return self return self
async def __anext__(self) -> typing.List['KademliaPeer']: async def __anext__(self) -> typing.List['KademliaPeer']:
@ -252,20 +244,19 @@ class IterativeFinder:
def aclose(self): def aclose(self):
self.running = False self.running = False
self.iteration_queue.put_nowait(None) self.iteration_queue.put_nowait(None)
for task in chain(self.tasks, self.running_probes, self.delayed_calls): for task in chain(self.tasks, self.running_probes.values()):
task.cancel() task.cancel()
self.tasks.clear() self.tasks.clear()
self.running_probes.clear() self.running_probes.clear()
self.delayed_calls.clear()
class IterativeNodeFinder(IterativeFinder): class IterativeNodeFinder(IterativeFinder):
def __init__(self, loop: asyncio.AbstractEventLoop, peer_manager: 'PeerManager', def __init__(self, loop: asyncio.AbstractEventLoop, peer_manager: 'PeerManager',
routing_table: 'TreeRoutingTable', protocol: 'KademliaProtocol', key: bytes, routing_table: 'TreeRoutingTable', protocol: 'KademliaProtocol', key: bytes,
bottom_out_limit: typing.Optional[int] = 2, max_results: typing.Optional[int] = constants.K, max_results: typing.Optional[int] = constants.K,
exclude: typing.Optional[typing.List[typing.Tuple[str, int]]] = None, exclude: typing.Optional[typing.List[typing.Tuple[str, int]]] = None,
shortlist: typing.Optional[typing.List['KademliaPeer']] = None): shortlist: typing.Optional[typing.List['KademliaPeer']] = None):
super().__init__(loop, peer_manager, routing_table, protocol, key, bottom_out_limit, max_results, exclude, super().__init__(loop, peer_manager, routing_table, protocol, key, max_results, exclude,
shortlist) shortlist)
self.yielded_peers: typing.Set['KademliaPeer'] = set() self.yielded_peers: typing.Set['KademliaPeer'] = set()
@ -276,14 +267,14 @@ class IterativeNodeFinder(IterativeFinder):
return FindNodeResponse(self.key, response) return FindNodeResponse(self.key, response)
def search_exhausted(self): def search_exhausted(self):
self.put_result(self.active, finish=True) self.put_result(self.active.keys(), finish=True)
def put_result(self, from_iter: typing.Iterable['KademliaPeer'], finish=False): def put_result(self, from_iter: typing.Iterable['KademliaPeer'], finish=False):
not_yet_yielded = [ not_yet_yielded = [
peer for peer in from_iter peer for peer in from_iter
if peer not in self.yielded_peers if peer not in self.yielded_peers
and peer.node_id != self.protocol.node_id and peer.node_id != self.protocol.node_id
and self.peer_manager.peer_is_good(peer) is not False and self.peer_manager.peer_is_good(peer) is True # return only peers who answered
] ]
not_yet_yielded.sort(key=lambda peer: self.distance(peer.node_id)) not_yet_yielded.sort(key=lambda peer: self.distance(peer.node_id))
to_yield = not_yet_yielded[:max(constants.K, self.max_results)] to_yield = not_yet_yielded[:max(constants.K, self.max_results)]
@ -298,26 +289,16 @@ class IterativeNodeFinder(IterativeFinder):
if found: if found:
log.debug("found") log.debug("found")
return self.put_result(self.active, finish=True) return self.put_result(self.active.keys(), finish=True)
if self.prev_closest_peer and self.closest_peer and not self._is_closer(self.prev_closest_peer):
# log.info("improving, %i %i %i %i %i", len(self.shortlist), len(self.active), len(self.contacted),
# self.bottom_out_count, self.iteration_count)
self.bottom_out_count = 0
elif self.prev_closest_peer and self.closest_peer:
self.bottom_out_count += 1
log.info("bottom out %i %i %i", len(self.active), len(self.contacted), self.bottom_out_count)
if self.bottom_out_count >= self.bottom_out_limit or self.iteration_count >= self.bottom_out_limit:
log.info("limit hit")
self.put_result(self.active, True)
class IterativeValueFinder(IterativeFinder): class IterativeValueFinder(IterativeFinder):
def __init__(self, loop: asyncio.AbstractEventLoop, peer_manager: 'PeerManager', def __init__(self, loop: asyncio.AbstractEventLoop, peer_manager: 'PeerManager',
routing_table: 'TreeRoutingTable', protocol: 'KademliaProtocol', key: bytes, routing_table: 'TreeRoutingTable', protocol: 'KademliaProtocol', key: bytes,
bottom_out_limit: typing.Optional[int] = 2, max_results: typing.Optional[int] = constants.K, max_results: typing.Optional[int] = constants.K,
exclude: typing.Optional[typing.List[typing.Tuple[str, int]]] = None, exclude: typing.Optional[typing.List[typing.Tuple[str, int]]] = None,
shortlist: typing.Optional[typing.List['KademliaPeer']] = None): shortlist: typing.Optional[typing.List['KademliaPeer']] = None):
super().__init__(loop, peer_manager, routing_table, protocol, key, bottom_out_limit, max_results, exclude, super().__init__(loop, peer_manager, routing_table, protocol, key, max_results, exclude,
shortlist) shortlist)
self.blob_peers: typing.Set['KademliaPeer'] = set() self.blob_peers: typing.Set['KademliaPeer'] = set()
# this tracks the index of the most recent page we requested from each peer # this tracks the index of the most recent page we requested from each peer
@ -362,23 +343,12 @@ class IterativeValueFinder(IterativeFinder):
blob_peers = [self.peer_manager.decode_tcp_peer_from_compact_address(compact_addr) blob_peers = [self.peer_manager.decode_tcp_peer_from_compact_address(compact_addr)
for compact_addr in response.found_compact_addresses] for compact_addr in response.found_compact_addresses]
to_yield = [] to_yield = []
self.bottom_out_count = 0
for blob_peer in blob_peers: for blob_peer in blob_peers:
if blob_peer not in self.blob_peers: if blob_peer not in self.blob_peers:
self.blob_peers.add(blob_peer) self.blob_peers.add(blob_peer)
to_yield.append(blob_peer) to_yield.append(blob_peer)
if to_yield: if to_yield:
# log.info("found %i new peers for blob", len(to_yield))
self.iteration_queue.put_nowait(to_yield) self.iteration_queue.put_nowait(to_yield)
# if self.max_results and len(self.blob_peers) >= self.max_results:
# log.info("enough blob peers found")
# if not self.finished.is_set():
# self.finished.set()
elif self.prev_closest_peer and self.closest_peer:
self.bottom_out_count += 1
if self.bottom_out_count >= self.bottom_out_limit:
log.info("blob peer search bottomed out")
self.iteration_queue.put_nowait(None)
def get_initial_result(self) -> typing.List['KademliaPeer']: def get_initial_result(self) -> typing.List['KademliaPeer']:
if self.protocol.data_store.has_peers_for_blob(self.key): if self.protocol.data_store.has_peers_for_blob(self.key):

View file

@ -4885,20 +4885,16 @@ class Daemon(metaclass=JSONRPCServerType):
""" """
@requires(DHT_COMPONENT) @requires(DHT_COMPONENT)
async def jsonrpc_peer_list(self, blob_hash, search_bottom_out_limit=None, page=None, page_size=None): async def jsonrpc_peer_list(self, blob_hash, page=None, page_size=None):
""" """
Get peers for blob hash Get peers for blob hash
Usage: Usage:
peer_list (<blob_hash> | --blob_hash=<blob_hash>) peer_list (<blob_hash> | --blob_hash=<blob_hash>)
[<search_bottom_out_limit> | --search_bottom_out_limit=<search_bottom_out_limit>]
[--page=<page>] [--page_size=<page_size>] [--page=<page>] [--page_size=<page_size>]
Options: Options:
--blob_hash=<blob_hash> : (str) find available peers for this blob hash --blob_hash=<blob_hash> : (str) find available peers for this blob hash
--search_bottom_out_limit=<search_bottom_out_limit> : (int) the number of search probes in a row
that don't find any new peers
before giving up and returning
--page=<page> : (int) page to return during paginating --page=<page> : (int) page to return during paginating
--page_size=<page_size> : (int) number of items on page during pagination --page_size=<page_size> : (int) number of items on page during pagination
@ -4910,13 +4906,6 @@ class Daemon(metaclass=JSONRPCServerType):
if not is_valid_blobhash(blob_hash): if not is_valid_blobhash(blob_hash):
# TODO: use error from lbry.error # TODO: use error from lbry.error
raise Exception("invalid blob hash") raise Exception("invalid blob hash")
if search_bottom_out_limit is not None:
search_bottom_out_limit = int(search_bottom_out_limit)
if search_bottom_out_limit <= 0:
# TODO: use error from lbry.error
raise Exception("invalid bottom out limit")
else:
search_bottom_out_limit = 4
peers = [] peers = []
peer_q = asyncio.Queue(loop=self.component_manager.loop) peer_q = asyncio.Queue(loop=self.component_manager.loop)
await self.dht_node._peers_for_value_producer(blob_hash, peer_q) await self.dht_node._peers_for_value_producer(blob_hash, peer_q)

View file

@ -204,7 +204,13 @@ class AsyncioTestCase(unittest.TestCase):
def add_timeout(self): def add_timeout(self):
if self.TIMEOUT: if self.TIMEOUT:
self.loop.call_later(self.TIMEOUT, self.cancel) self.loop.call_later(self.TIMEOUT, self.check_timeout, time())
def check_timeout(self, started):
if time() - started >= self.TIMEOUT:
self.cancel()
else:
self.loop.call_later(self.TIMEOUT, self.check_timeout, started)
class AdvanceTimeTestCase(AsyncioTestCase): class AdvanceTimeTestCase(AsyncioTestCase):

View file

@ -9,7 +9,7 @@ if typing.TYPE_CHECKING:
def get_time_accelerator(loop: asyncio.AbstractEventLoop, def get_time_accelerator(loop: asyncio.AbstractEventLoop,
now: typing.Optional[float] = None) -> typing.Callable[[float], typing.Awaitable[None]]: instant_step: bool = False) -> typing.Callable[[float], typing.Awaitable[None]]:
""" """
Returns an async advance() function Returns an async advance() function
@ -17,32 +17,22 @@ def get_time_accelerator(loop: asyncio.AbstractEventLoop,
made by call_later, call_at, and call_soon. made by call_later, call_at, and call_soon.
""" """
_time = now or loop.time() original = loop.time
loop.time = functools.wraps(loop.time)(lambda: _time) _drift = 0
loop.time = functools.wraps(loop.time)(lambda: original() + _drift)
async def accelerate_time(seconds: float) -> None: async def accelerate_time(seconds: float) -> None:
nonlocal _time nonlocal _drift
if seconds < 0: if seconds < 0:
raise ValueError(f'Cannot go back in time ({seconds} seconds)') raise ValueError(f'Cannot go back in time ({seconds} seconds)')
_time += seconds _drift += seconds
await past_events()
await asyncio.sleep(0) await asyncio.sleep(0)
async def past_events() -> None:
while loop._scheduled:
timer: asyncio.TimerHandle = loop._scheduled[0]
if timer not in loop._ready and timer._when <= _time:
loop._scheduled.remove(timer)
loop._ready.append(timer)
if timer._when > _time:
break
await asyncio.sleep(0)
async def accelerator(seconds: float): async def accelerator(seconds: float):
steps = seconds * 10.0 steps = seconds * 10.0 if not instant_step else 1
for _ in range(max(int(steps), 1)): for _ in range(max(int(steps), 1)):
await accelerate_time(0.1) await accelerate_time(seconds/steps)
return accelerator return accelerator

View file

@ -1,60 +1,68 @@
import contextlib import contextlib
import logging
import typing import typing
import binascii import binascii
import socket import socket
import asyncio import asyncio
from lbry.testcase import AsyncioTestCase from lbry.testcase import AsyncioTestCase
from tests import dht_mocks from tests import dht_mocks
from lbry.dht.protocol.distance import Distance
from lbry.conf import Config from lbry.conf import Config
from lbry.dht import constants from lbry.dht import constants
from lbry.dht.node import Node from lbry.dht.node import Node
from lbry.dht.peer import PeerManager, make_kademlia_peer from lbry.dht.peer import PeerManager, make_kademlia_peer
from lbry.dht.blob_announcer import BlobAnnouncer from lbry.dht.blob_announcer import BlobAnnouncer
from lbry.extras.daemon.storage import SQLiteStorage from lbry.extras.daemon.storage import SQLiteStorage
from unittest import skip
class TestBlobAnnouncer(AsyncioTestCase): class TestBlobAnnouncer(AsyncioTestCase):
async def setup_node(self, peer_addresses, address, node_id): async def setup_node(self, peer_addresses, address, node_id):
self.nodes: typing.Dict[int, Node] = {} self.nodes: typing.Dict[int, Node] = {}
self.advance = dht_mocks.get_time_accelerator(self.loop, self.loop.time()) self.advance = dht_mocks.get_time_accelerator(self.loop)
self.instant_advance = dht_mocks.get_time_accelerator(self.loop)
self.conf = Config() self.conf = Config()
self.storage = SQLiteStorage(self.conf, ":memory:", self.loop, self.loop.time)
await self.storage.open()
self.peer_manager = PeerManager(self.loop) self.peer_manager = PeerManager(self.loop)
self.node = Node(self.loop, self.peer_manager, node_id, 4444, 4444, 3333, address) self.node = Node(self.loop, self.peer_manager, node_id, 4444, 4444, 3333, address)
await self.node.start_listening(address) await self.node.start_listening(address)
self.blob_announcer = BlobAnnouncer(self.loop, self.node, self.storage) await asyncio.gather(*[self.add_peer(node_id, address) for node_id, address in peer_addresses])
for node_id, address in peer_addresses: for first_peer in self.nodes.values():
await self.add_peer(node_id, address) for second_peer in self.nodes.values():
if first_peer == second_peer:
continue
self.add_peer_to_routing_table(first_peer, second_peer)
self.add_peer_to_routing_table(second_peer, first_peer)
await self.advance(0.1) # just to make pings go through
self.node.joined.set() self.node.joined.set()
self.node._refresh_task = self.loop.create_task(self.node.refresh_node()) self.node._refresh_task = self.loop.create_task(self.node.refresh_node())
self.storage = SQLiteStorage(self.conf, ":memory:", self.loop, self.loop.time)
await self.storage.open()
self.blob_announcer = BlobAnnouncer(self.loop, self.node, self.storage)
async def add_peer(self, node_id, address, add_to_routing_table=True): async def add_peer(self, node_id, address, add_to_routing_table=True):
#print('add', node_id.hex()[:8], address)
n = Node(self.loop, PeerManager(self.loop), node_id, 4444, 4444, 3333, address) n = Node(self.loop, PeerManager(self.loop), node_id, 4444, 4444, 3333, address)
await n.start_listening(address) await n.start_listening(address)
self.nodes.update({len(self.nodes): n}) self.nodes.update({len(self.nodes): n})
if add_to_routing_table: if add_to_routing_table:
self.node.protocol.add_peer( self.add_peer_to_routing_table(self.node, n)
make_kademlia_peer(
n.protocol.node_id, n.protocol.external_ip, n.protocol.udp_port def add_peer_to_routing_table(self, adder, being_added):
) adder.protocol.add_peer(
make_kademlia_peer(
being_added.protocol.node_id, being_added.protocol.external_ip, being_added.protocol.udp_port
) )
)
@contextlib.asynccontextmanager @contextlib.asynccontextmanager
async def _test_network_context(self, peer_addresses=None): async def _test_network_context(self, peer_count=200):
self.peer_addresses = peer_addresses or [ self.peer_addresses = [
(constants.generate_id(2), '1.2.3.2'), (constants.generate_id(i), socket.inet_ntoa(int(i + 0x01000001).to_bytes(length=4, byteorder='big')))
(constants.generate_id(3), '1.2.3.3'), for i in range(1, peer_count + 1)
(constants.generate_id(4), '1.2.3.4'),
(constants.generate_id(5), '1.2.3.5'),
(constants.generate_id(6), '1.2.3.6'),
(constants.generate_id(7), '1.2.3.7'),
(constants.generate_id(8), '1.2.3.8'),
(constants.generate_id(9), '1.2.3.9'),
] ]
try: try:
with dht_mocks.mock_network_loop(self.loop): with dht_mocks.mock_network_loop(self.loop):
await self.setup_node(self.peer_addresses, '1.2.3.1', constants.generate_id(1)) await self.setup_node(self.peer_addresses, '1.2.3.1', constants.generate_id(1000))
yield yield
finally: finally:
self.blob_announcer.stop() self.blob_announcer.stop()
@ -73,43 +81,58 @@ class TestBlobAnnouncer(AsyncioTestCase):
) )
) )
await peer.ping() await peer.ping()
return peer return last_node
@skip("Something from a previous test is leaking into this test and causing it to fail intermittently")
async def test_announce_blobs(self): async def test_announce_blobs(self):
blob1 = binascii.hexlify(b'1' * 48).decode() blob1 = binascii.hexlify(b'1' * 48).decode()
blob2 = binascii.hexlify(b'2' * 48).decode() blob2 = binascii.hexlify(b'2' * 48).decode()
async with self._test_network_context(): async with self._test_network_context(peer_count=100):
await self.storage.add_blobs((blob1, 1024), (blob2, 1024), finished=True) await self.storage.add_blobs((blob1, 1024, 0, True), (blob2, 1024, 0, True), finished=True)
await self.storage.db.execute( await self.storage.add_blobs(
"update blob set next_announce_time=0, should_announce=1 where blob_hash in (?, ?)", *((constants.generate_id(value).hex(), 1024, 0, True) for value in range(1000, 1090)),
(blob1, blob2) finished=True)
) await self.storage.db.execute("update blob set next_announce_time=0, should_announce=1")
to_announce = await self.storage.get_blobs_to_announce() to_announce = await self.storage.get_blobs_to_announce()
self.assertEqual(2, len(to_announce)) self.assertEqual(92, len(to_announce))
self.blob_announcer.start(batch_size=1) # so it covers batching logic self.blob_announcer.start(batch_size=10) # so it covers batching logic
# takes 60 seconds to start, but we advance 120 to ensure it processed all batches # takes 60 seconds to start, but we advance 120 to ensure it processed all batches
await self.advance(60.0 * 2) ongoing_announcements = asyncio.ensure_future(self.blob_announcer.wait())
await self.instant_advance(60.0)
await ongoing_announcements
to_announce = await self.storage.get_blobs_to_announce() to_announce = await self.storage.get_blobs_to_announce()
self.assertEqual(0, len(to_announce)) self.assertEqual(0, len(to_announce))
self.blob_announcer.stop() self.blob_announcer.stop()
# as routing table pollution will cause some peers to be hard to reach, we add a tolerance for CI
tolerance = 0.8 # at least 80% of the announcements are within the top K
for blob in await self.storage.get_all_blob_hashes():
distance = Distance(bytes.fromhex(blob))
candidates = list(self.nodes.values())
candidates.sort(key=lambda sorting_node: distance(sorting_node.protocol.node_id))
has_it = 0
for index, node in enumerate(candidates[:constants.K], start=1):
if node.protocol.data_store.get_peers_for_blob(bytes.fromhex(blob)):
has_it += 1
else:
logging.warning("blob %s wasnt found between the best K (%s)", blob[:8], node.protocol.node_id.hex()[:8])
self.assertGreaterEqual(has_it, int(tolerance * constants.K))
# test that we can route from a poorly connected peer all the way to the announced blob # test that we can route from a poorly connected peer all the way to the announced blob
await self.chain_peer(constants.generate_id(10), '1.2.3.10') current = len(self.nodes)
await self.chain_peer(constants.generate_id(11), '1.2.3.11') await self.chain_peer(constants.generate_id(current + 1), '1.2.3.10')
await self.chain_peer(constants.generate_id(12), '1.2.3.12') await self.chain_peer(constants.generate_id(current + 2), '1.2.3.11')
await self.chain_peer(constants.generate_id(13), '1.2.3.13') await self.chain_peer(constants.generate_id(current + 3), '1.2.3.12')
await self.chain_peer(constants.generate_id(14), '1.2.3.14') await self.chain_peer(constants.generate_id(current + 4), '1.2.3.13')
await self.advance(61.0) last = await self.chain_peer(constants.generate_id(current + 5), '1.2.3.14')
last = self.nodes[len(self.nodes) - 1]
search_q, peer_q = asyncio.Queue(loop=self.loop), asyncio.Queue(loop=self.loop) search_q, peer_q = asyncio.Queue(loop=self.loop), asyncio.Queue(loop=self.loop)
search_q.put_nowait(blob1) search_q.put_nowait(blob1)
_, task = last.accumulate_peers(search_q, peer_q) _, task = last.accumulate_peers(search_q, peer_q)
found_peers = await peer_q.get() found_peers = await asyncio.wait_for(peer_q.get(), 1.0)
task.cancel() task.cancel()
self.assertEqual(1, len(found_peers)) self.assertEqual(1, len(found_peers))
@ -119,21 +142,13 @@ class TestBlobAnnouncer(AsyncioTestCase):
async def test_popular_blob(self): async def test_popular_blob(self):
peer_count = 150 peer_count = 150
addresses = [ blob_hash = constants.generate_id(99999)
(constants.generate_id(i + 1), socket.inet_ntoa(int(i + 0x01000001).to_bytes(length=4, byteorder='big')))
for i in range(peer_count)
]
blob_hash = b'1' * 48
async with self._test_network_context(peer_addresses=addresses): async with self._test_network_context(peer_count=peer_count):
total_seen = set() total_seen = set()
announced_to = self.nodes[0] announced_to = self.nodes.pop(0)
for i in range(1, peer_count): for i, node in enumerate(self.nodes.values()):
node = self.nodes[i] self.add_peer_to_routing_table(announced_to, node)
kad_peer = make_kademlia_peer(
node.protocol.node_id, node.protocol.external_ip, node.protocol.udp_port
)
await announced_to.protocol._add_peer(kad_peer)
peer = node.protocol.get_rpc_peer( peer = node.protocol.get_rpc_peer(
make_kademlia_peer( make_kademlia_peer(
announced_to.protocol.node_id, announced_to.protocol.node_id,
@ -144,15 +159,15 @@ class TestBlobAnnouncer(AsyncioTestCase):
response = await peer.store(blob_hash) response = await peer.store(blob_hash)
self.assertEqual(response, b'OK') self.assertEqual(response, b'OK')
peers_for_blob = await peer.find_value(blob_hash, 0) peers_for_blob = await peer.find_value(blob_hash, 0)
if i == 1: if i == 0:
self.assertNotIn(blob_hash, peers_for_blob) self.assertNotIn(blob_hash, peers_for_blob)
self.assertEqual(peers_for_blob[b'p'], 0) self.assertEqual(peers_for_blob[b'p'], 0)
else: else:
self.assertEqual(len(peers_for_blob[blob_hash]), min(i - 1, constants.K)) self.assertEqual(len(peers_for_blob[blob_hash]), min(i, constants.K))
self.assertEqual(len(announced_to.protocol.data_store.get_peers_for_blob(blob_hash)), i) self.assertEqual(len(announced_to.protocol.data_store.get_peers_for_blob(blob_hash)), i + 1)
if i - 1 > constants.K: if i - 1 > constants.K:
self.assertEqual(len(peers_for_blob[b'contacts']), constants.K) self.assertEqual(len(peers_for_blob[b'contacts']), constants.K)
self.assertEqual(peers_for_blob[b'p'], ((i - 1) // (constants.K + 1)) + 1) self.assertEqual(peers_for_blob[b'p'], (i // (constants.K + 1)) + 1)
seen = set(peers_for_blob[blob_hash]) seen = set(peers_for_blob[blob_hash])
self.assertEqual(len(seen), constants.K) self.assertEqual(len(seen), constants.K)
self.assertEqual(len(peers_for_blob[blob_hash]), len(seen)) self.assertEqual(len(peers_for_blob[blob_hash]), len(seen))
@ -167,5 +182,5 @@ class TestBlobAnnouncer(AsyncioTestCase):
seen.intersection_update(page_x_set) seen.intersection_update(page_x_set)
total_seen.update(page_x_set) total_seen.update(page_x_set)
else: else:
self.assertEqual(len(peers_for_blob[b'contacts']), i - 1) self.assertEqual(len(peers_for_blob[b'contacts']), 8) # we always add 8 on first page
self.assertEqual(len(total_seen), peer_count - 2) self.assertEqual(len(total_seen), peer_count - 2)

View file

@ -12,7 +12,6 @@ from lbry.extras.daemon.storage import SQLiteStorage
class TestNodePingQueueDiscover(AsyncioTestCase): class TestNodePingQueueDiscover(AsyncioTestCase):
TIMEOUT = None # not supported as it advances time
async def test_ping_queue_discover(self): async def test_ping_queue_discover(self):
loop = asyncio.get_event_loop() loop = asyncio.get_event_loop()
loop.set_debug(False) loop.set_debug(False)
@ -29,7 +28,7 @@ class TestNodePingQueueDiscover(AsyncioTestCase):
(constants.generate_id(9), '1.2.3.9'), (constants.generate_id(9), '1.2.3.9'),
] ]
with dht_mocks.mock_network_loop(loop): with dht_mocks.mock_network_loop(loop):
advance = dht_mocks.get_time_accelerator(loop, loop.time()) advance = dht_mocks.get_time_accelerator(loop)
# start the nodes # start the nodes
nodes: typing.Dict[int, Node] = { nodes: typing.Dict[int, Node] = {
i: Node(loop, PeerManager(loop), node_id, 4444, 4444, 3333, address) i: Node(loop, PeerManager(loop), node_id, 4444, 4444, 3333, address)
@ -93,7 +92,6 @@ class TestNodePingQueueDiscover(AsyncioTestCase):
class TestTemporarilyLosingConnection(AsyncioTestCase): class TestTemporarilyLosingConnection(AsyncioTestCase):
TIMEOUT = None # not supported as it advances time
@unittest.SkipTest @unittest.SkipTest
async def test_losing_connection(self): async def test_losing_connection(self):
async def wait_for(check_ok, insist, timeout=20): async def wait_for(check_ok, insist, timeout=20):
@ -131,7 +129,7 @@ class TestTemporarilyLosingConnection(AsyncioTestCase):
await asyncio.gather(*[n.joined.wait() for n in nodes]) await asyncio.gather(*[n.joined.wait() for n in nodes])
node = nodes[-1] node = nodes[-1]
advance = dht_mocks.get_time_accelerator(loop, loop.time()) advance = dht_mocks.get_time_accelerator(loop)
await advance(500) await advance(500)
# Join the network, assert that at least the known peers are in RT # Join the network, assert that at least the known peers are in RT