diff --git a/lbry/conf.py b/lbry/conf.py index 234a1709e..15fe5f8b6 100644 --- a/lbry/conf.py +++ b/lbry/conf.py @@ -622,7 +622,7 @@ class Config(CLIConfig): "Routing table bucket index below which we always split the bucket if given a new key to add to it and " "the bucket is full. As this value is raised the depth of the routing table (and number of peers in it) " "will increase. This setting is used by seed nodes, you probably don't want to change it during normal " - "use.", 1 + "use.", 2 ) # protocol timeouts diff --git a/lbry/dht/blob_announcer.py b/lbry/dht/blob_announcer.py index 24cf18bbe..9629e06b6 100644 --- a/lbry/dht/blob_announcer.py +++ b/lbry/dht/blob_announcer.py @@ -27,21 +27,24 @@ class BlobAnnouncer: self.storage = storage self.announce_task: asyncio.Task = None self.announce_queue: typing.List[str] = [] + self._done = asyncio.Event() + self.announced = set() - async def _submit_announcement(self, blob_hash): - try: - - peers = len(await self.node.announce_blob(blob_hash)) - self.announcements_sent_metric.labels(peers=peers, error=False).inc() - if peers > 4: - return blob_hash - else: - log.debug("failed to announce %s, could only find %d peers, retrying soon.", blob_hash[:8], peers) - except Exception as err: - self.announcements_sent_metric.labels(peers=0, error=True).inc() - if isinstance(err, asyncio.CancelledError): # TODO: remove when updated to 3.8 - raise err - log.warning("error announcing %s: %s", blob_hash[:8], str(err)) + async def _run_consumer(self): + while self.announce_queue: + try: + blob_hash = self.announce_queue.pop() + peers = len(await self.node.announce_blob(blob_hash)) + self.announcements_sent_metric.labels(peers=peers, error=False).inc() + if peers > 4: + self.announced.add(blob_hash) + else: + log.debug("failed to announce %s, could only find %d peers, retrying soon.", blob_hash[:8], peers) + except Exception as err: + self.announcements_sent_metric.labels(peers=0, error=True).inc() + if isinstance(err, asyncio.CancelledError): # TODO: remove when updated to 3.8 + raise err + log.warning("error announcing %s: %s", blob_hash[:8], str(err)) async def _announce(self, batch_size: typing.Optional[int] = 10): while batch_size: @@ -56,14 +59,14 @@ class BlobAnnouncer: log.debug("announcer task wake up, %d blobs to announce", len(self.announce_queue)) while len(self.announce_queue) > 0: log.info("%i blobs to announce", len(self.announce_queue)) - announced = await asyncio.gather(*[ - self._submit_announcement( - self.announce_queue.pop()) for _ in range(batch_size) if self.announce_queue - ], loop=self.loop) - announced = list(filter(None, announced)) + await asyncio.gather(*[self._run_consumer() for _ in range(batch_size)], loop=self.loop) + announced = list(filter(None, self.announced)) if announced: await self.storage.update_last_announced_blobs(announced) log.info("announced %i blobs", len(announced)) + self.announced.clear() + self._done.set() + self._done.clear() def start(self, batch_size: typing.Optional[int] = 10): assert not self.announce_task or self.announce_task.done(), "already running" @@ -72,3 +75,6 @@ class BlobAnnouncer: def stop(self): if self.announce_task and not self.announce_task.done(): self.announce_task.cancel() + + def wait(self): + return self._done.wait() diff --git a/lbry/dht/constants.py b/lbry/dht/constants.py index 07dcec18a..7380ce60a 100644 --- a/lbry/dht/constants.py +++ b/lbry/dht/constants.py @@ -20,7 +20,6 @@ MAYBE_PING_DELAY = 300 # 5 minutes CHECK_REFRESH_INTERVAL = REFRESH_INTERVAL / 5 RPC_ID_LENGTH = 20 PROTOCOL_VERSION = 1 -BOTTOM_OUT_LIMIT = 3 MSG_SIZE_LIMIT = 1400 diff --git a/lbry/dht/node.py b/lbry/dht/node.py index 345662460..864edc077 100644 --- a/lbry/dht/node.py +++ b/lbry/dht/node.py @@ -202,25 +202,23 @@ class Node: self._join_task = self.loop.create_task(self.join_network(interface, known_node_urls)) def get_iterative_node_finder(self, key: bytes, shortlist: typing.Optional[typing.List['KademliaPeer']] = None, - bottom_out_limit: int = constants.BOTTOM_OUT_LIMIT, max_results: int = constants.K) -> IterativeNodeFinder: return IterativeNodeFinder(self.loop, self.protocol.peer_manager, self.protocol.routing_table, self.protocol, - key, bottom_out_limit, max_results, None, shortlist) + key, max_results, None, shortlist) def get_iterative_value_finder(self, key: bytes, shortlist: typing.Optional[typing.List['KademliaPeer']] = None, - bottom_out_limit: int = 40, max_results: int = -1) -> IterativeValueFinder: return IterativeValueFinder(self.loop, self.protocol.peer_manager, self.protocol.routing_table, self.protocol, - key, bottom_out_limit, max_results, None, shortlist) + key, max_results, None, shortlist) async def peer_search(self, node_id: bytes, count=constants.K, max_results=constants.K * 2, - bottom_out_limit=20, shortlist: typing.Optional[typing.List['KademliaPeer']] = None + shortlist: typing.Optional[typing.List['KademliaPeer']] = None ) -> typing.List['KademliaPeer']: peers = [] async for iteration_peers in self.get_iterative_node_finder( - node_id, shortlist=shortlist, bottom_out_limit=bottom_out_limit, max_results=max_results): + node_id, shortlist=shortlist, max_results=max_results): peers.extend(iteration_peers) distance = Distance(node_id) peers.sort(key=lambda peer: distance(peer.node_id)) diff --git a/lbry/dht/peer.py b/lbry/dht/peer.py index 495f71eff..c5a9c9e84 100644 --- a/lbry/dht/peer.py +++ b/lbry/dht/peer.py @@ -190,3 +190,6 @@ class KademliaPeer: def compact_ip(self): return make_compact_ip(self.address) + + def __str__(self): + return f"{self.__class__.__name__}({self.node_id.hex()[:8]}@{self.address}:{self.udp_port}-{self.tcp_port})" diff --git a/lbry/dht/protocol/iterative_find.py b/lbry/dht/protocol/iterative_find.py index f8bd2ba6b..ab89edddc 100644 --- a/lbry/dht/protocol/iterative_find.py +++ b/lbry/dht/protocol/iterative_find.py @@ -1,6 +1,6 @@ import asyncio from itertools import chain -from collections import defaultdict +from collections import defaultdict, OrderedDict import typing import logging from typing import TYPE_CHECKING @@ -74,7 +74,7 @@ def get_shortlist(routing_table: 'TreeRoutingTable', key: bytes, class IterativeFinder: def __init__(self, loop: asyncio.AbstractEventLoop, peer_manager: 'PeerManager', routing_table: 'TreeRoutingTable', protocol: 'KademliaProtocol', key: bytes, - bottom_out_limit: typing.Optional[int] = 2, max_results: typing.Optional[int] = constants.K, + max_results: typing.Optional[int] = constants.K, exclude: typing.Optional[typing.List[typing.Tuple[str, int]]] = None, shortlist: typing.Optional[typing.List['KademliaPeer']] = None): if len(key) != constants.HASH_LENGTH: @@ -85,28 +85,22 @@ class IterativeFinder: self.protocol = protocol self.key = key - self.bottom_out_limit = bottom_out_limit - self.max_results = max_results + self.max_results = max(constants.K, max_results) self.exclude = exclude or [] - self.active: typing.Set['KademliaPeer'] = set() + self.active: typing.Dict['KademliaPeer', int] = OrderedDict() # peer: distance, sorted self.contacted: typing.Set['KademliaPeer'] = set() self.distance = Distance(key) - self.closest_peer: typing.Optional['KademliaPeer'] = None - self.prev_closest_peer: typing.Optional['KademliaPeer'] = None - self.iteration_queue = asyncio.Queue(loop=self.loop) - self.running_probes: typing.Set[asyncio.Task] = set() + self.running_probes: typing.Dict['KademliaPeer', asyncio.Task] = {} self.iteration_count = 0 - self.bottom_out_count = 0 self.running = False self.tasks: typing.List[asyncio.Task] = [] - self.delayed_calls: typing.List[asyncio.Handle] = [] for peer in get_shortlist(routing_table, key, shortlist): if peer.node_id: - self._add_active(peer) + self._add_active(peer, force=True) else: # seed nodes self._schedule_probe(peer) @@ -138,15 +132,14 @@ class IterativeFinder: """ return [] - def _is_closer(self, peer: 'KademliaPeer') -> bool: - return not self.closest_peer or self.distance.is_closer(peer.node_id, self.closest_peer.node_id) - - def _add_active(self, peer): + def _add_active(self, peer, force=False): + if not force and self.peer_manager.peer_is_good(peer) is False: + return + if peer in self.contacted: + return if peer not in self.active and peer.node_id and peer.node_id != self.protocol.node_id: - self.active.add(peer) - if self._is_closer(peer): - self.prev_closest_peer = self.closest_peer - self.closest_peer = peer + self.active[peer] = self.distance(peer.node_id) + self.active = OrderedDict(sorted(self.active.items(), key=lambda item: item[1])) async def _handle_probe_result(self, peer: 'KademliaPeer', response: FindResponse): self._add_active(peer) @@ -158,33 +151,43 @@ class IterativeFinder: log.warning("misbehaving peer %s:%i returned peer with reserved ip %s:%i", peer.address, peer.udp_port, address, udp_port) self.check_result_ready(response) + self._log_state() + + def _reset_closest(self, peer): + if peer in self.active: + del self.active[peer] async def _send_probe(self, peer: 'KademliaPeer'): try: response = await self.send_probe(peer) except asyncio.TimeoutError: - self.active.discard(peer) + self._reset_closest(peer) return except ValueError as err: log.warning(str(err)) - self.active.discard(peer) + self._reset_closest(peer) return except TransportNotConnected: return self.aclose() except RemoteException: + self._reset_closest(peer) return return await self._handle_probe_result(peer, response) - async def _search_round(self): + def _search_round(self): """ Send up to constants.alpha (5) probes to closest active peers """ added = 0 - to_probe = list(self.active - self.contacted) - to_probe.sort(key=lambda peer: self.distance(self.key)) - for peer in to_probe: - if added >= constants.ALPHA: + for index, peer in enumerate(self.active.keys()): + if index == 0: + log.debug("closest to probe: %s", peer.node_id.hex()[:8]) + if peer in self.contacted: + continue + if len(self.running_probes) >= constants.ALPHA: + break + if index > (constants.K + len(self.running_probes)): break origin_address = (peer.address, peer.udp_port) if origin_address in self.exclude: @@ -206,33 +209,22 @@ class IterativeFinder: t = self.loop.create_task(self._send_probe(peer)) def callback(_): - self.running_probes.difference_update({ - probe for probe in self.running_probes if probe.done() or probe == t - }) - if not self.running_probes: - self.tasks.append(self.loop.create_task(self._search_task(0.0))) + self.running_probes.pop(peer, None) + if self.running: + self._search_round() t.add_done_callback(callback) - self.running_probes.add(t) + self.running_probes[peer] = t - async def _search_task(self, delay: typing.Optional[float] = constants.ITERATIVE_LOOKUP_DELAY): - try: - if self.running: - await self._search_round() - if self.running: - self.delayed_calls.append(self.loop.call_later(delay, self._search)) - except (asyncio.CancelledError, StopAsyncIteration, TransportNotConnected): - if self.running: - self.loop.call_soon(self.aclose) - - def _search(self): - self.tasks.append(self.loop.create_task(self._search_task())) + def _log_state(self): + log.debug("[%s] check result: %i active nodes %i contacted", + self.key.hex()[:8], len(self.active), len(self.contacted)) def __aiter__(self): if self.running: raise Exception("already running") self.running = True - self._search() + self.loop.call_soon(self._search_round) return self async def __anext__(self) -> typing.List['KademliaPeer']: @@ -252,20 +244,19 @@ class IterativeFinder: def aclose(self): self.running = False self.iteration_queue.put_nowait(None) - for task in chain(self.tasks, self.running_probes, self.delayed_calls): + for task in chain(self.tasks, self.running_probes.values()): task.cancel() self.tasks.clear() self.running_probes.clear() - self.delayed_calls.clear() class IterativeNodeFinder(IterativeFinder): def __init__(self, loop: asyncio.AbstractEventLoop, peer_manager: 'PeerManager', routing_table: 'TreeRoutingTable', protocol: 'KademliaProtocol', key: bytes, - bottom_out_limit: typing.Optional[int] = 2, max_results: typing.Optional[int] = constants.K, + max_results: typing.Optional[int] = constants.K, exclude: typing.Optional[typing.List[typing.Tuple[str, int]]] = None, shortlist: typing.Optional[typing.List['KademliaPeer']] = None): - super().__init__(loop, peer_manager, routing_table, protocol, key, bottom_out_limit, max_results, exclude, + super().__init__(loop, peer_manager, routing_table, protocol, key, max_results, exclude, shortlist) self.yielded_peers: typing.Set['KademliaPeer'] = set() @@ -276,14 +267,14 @@ class IterativeNodeFinder(IterativeFinder): return FindNodeResponse(self.key, response) def search_exhausted(self): - self.put_result(self.active, finish=True) + self.put_result(self.active.keys(), finish=True) def put_result(self, from_iter: typing.Iterable['KademliaPeer'], finish=False): not_yet_yielded = [ peer for peer in from_iter if peer not in self.yielded_peers and peer.node_id != self.protocol.node_id - and self.peer_manager.peer_is_good(peer) is not False + and self.peer_manager.peer_is_good(peer) is True # return only peers who answered ] not_yet_yielded.sort(key=lambda peer: self.distance(peer.node_id)) to_yield = not_yet_yielded[:max(constants.K, self.max_results)] @@ -298,26 +289,16 @@ class IterativeNodeFinder(IterativeFinder): if found: log.debug("found") - return self.put_result(self.active, finish=True) - if self.prev_closest_peer and self.closest_peer and not self._is_closer(self.prev_closest_peer): - # log.info("improving, %i %i %i %i %i", len(self.shortlist), len(self.active), len(self.contacted), - # self.bottom_out_count, self.iteration_count) - self.bottom_out_count = 0 - elif self.prev_closest_peer and self.closest_peer: - self.bottom_out_count += 1 - log.info("bottom out %i %i %i", len(self.active), len(self.contacted), self.bottom_out_count) - if self.bottom_out_count >= self.bottom_out_limit or self.iteration_count >= self.bottom_out_limit: - log.info("limit hit") - self.put_result(self.active, True) + return self.put_result(self.active.keys(), finish=True) class IterativeValueFinder(IterativeFinder): def __init__(self, loop: asyncio.AbstractEventLoop, peer_manager: 'PeerManager', routing_table: 'TreeRoutingTable', protocol: 'KademliaProtocol', key: bytes, - bottom_out_limit: typing.Optional[int] = 2, max_results: typing.Optional[int] = constants.K, + max_results: typing.Optional[int] = constants.K, exclude: typing.Optional[typing.List[typing.Tuple[str, int]]] = None, shortlist: typing.Optional[typing.List['KademliaPeer']] = None): - super().__init__(loop, peer_manager, routing_table, protocol, key, bottom_out_limit, max_results, exclude, + super().__init__(loop, peer_manager, routing_table, protocol, key, max_results, exclude, shortlist) self.blob_peers: typing.Set['KademliaPeer'] = set() # this tracks the index of the most recent page we requested from each peer @@ -362,23 +343,12 @@ class IterativeValueFinder(IterativeFinder): blob_peers = [self.peer_manager.decode_tcp_peer_from_compact_address(compact_addr) for compact_addr in response.found_compact_addresses] to_yield = [] - self.bottom_out_count = 0 for blob_peer in blob_peers: if blob_peer not in self.blob_peers: self.blob_peers.add(blob_peer) to_yield.append(blob_peer) if to_yield: - # log.info("found %i new peers for blob", len(to_yield)) self.iteration_queue.put_nowait(to_yield) - # if self.max_results and len(self.blob_peers) >= self.max_results: - # log.info("enough blob peers found") - # if not self.finished.is_set(): - # self.finished.set() - elif self.prev_closest_peer and self.closest_peer: - self.bottom_out_count += 1 - if self.bottom_out_count >= self.bottom_out_limit: - log.info("blob peer search bottomed out") - self.iteration_queue.put_nowait(None) def get_initial_result(self) -> typing.List['KademliaPeer']: if self.protocol.data_store.has_peers_for_blob(self.key): diff --git a/lbry/extras/daemon/daemon.py b/lbry/extras/daemon/daemon.py index 003ea9cf7..fb25207f7 100644 --- a/lbry/extras/daemon/daemon.py +++ b/lbry/extras/daemon/daemon.py @@ -4885,20 +4885,16 @@ class Daemon(metaclass=JSONRPCServerType): """ @requires(DHT_COMPONENT) - async def jsonrpc_peer_list(self, blob_hash, search_bottom_out_limit=None, page=None, page_size=None): + async def jsonrpc_peer_list(self, blob_hash, page=None, page_size=None): """ Get peers for blob hash Usage: peer_list ( | --blob_hash=) - [ | --search_bottom_out_limit=] [--page=] [--page_size=] Options: --blob_hash= : (str) find available peers for this blob hash - --search_bottom_out_limit= : (int) the number of search probes in a row - that don't find any new peers - before giving up and returning --page= : (int) page to return during paginating --page_size= : (int) number of items on page during pagination @@ -4910,13 +4906,6 @@ class Daemon(metaclass=JSONRPCServerType): if not is_valid_blobhash(blob_hash): # TODO: use error from lbry.error raise Exception("invalid blob hash") - if search_bottom_out_limit is not None: - search_bottom_out_limit = int(search_bottom_out_limit) - if search_bottom_out_limit <= 0: - # TODO: use error from lbry.error - raise Exception("invalid bottom out limit") - else: - search_bottom_out_limit = 4 peers = [] peer_q = asyncio.Queue(loop=self.component_manager.loop) await self.dht_node._peers_for_value_producer(blob_hash, peer_q) diff --git a/lbry/testcase.py b/lbry/testcase.py index b10ea9b27..6214553e5 100644 --- a/lbry/testcase.py +++ b/lbry/testcase.py @@ -204,7 +204,13 @@ class AsyncioTestCase(unittest.TestCase): def add_timeout(self): if self.TIMEOUT: - self.loop.call_later(self.TIMEOUT, self.cancel) + self.loop.call_later(self.TIMEOUT, self.check_timeout, time()) + + def check_timeout(self, started): + if time() - started >= self.TIMEOUT: + self.cancel() + else: + self.loop.call_later(self.TIMEOUT, self.check_timeout, started) class AdvanceTimeTestCase(AsyncioTestCase): diff --git a/tests/dht_mocks.py b/tests/dht_mocks.py index 2e01986c0..4bebcfaf1 100644 --- a/tests/dht_mocks.py +++ b/tests/dht_mocks.py @@ -9,7 +9,7 @@ if typing.TYPE_CHECKING: def get_time_accelerator(loop: asyncio.AbstractEventLoop, - now: typing.Optional[float] = None) -> typing.Callable[[float], typing.Awaitable[None]]: + instant_step: bool = False) -> typing.Callable[[float], typing.Awaitable[None]]: """ Returns an async advance() function @@ -17,32 +17,22 @@ def get_time_accelerator(loop: asyncio.AbstractEventLoop, made by call_later, call_at, and call_soon. """ - _time = now or loop.time() - loop.time = functools.wraps(loop.time)(lambda: _time) + original = loop.time + _drift = 0 + loop.time = functools.wraps(loop.time)(lambda: original() + _drift) async def accelerate_time(seconds: float) -> None: - nonlocal _time + nonlocal _drift if seconds < 0: raise ValueError(f'Cannot go back in time ({seconds} seconds)') - _time += seconds - await past_events() + _drift += seconds await asyncio.sleep(0) - async def past_events() -> None: - while loop._scheduled: - timer: asyncio.TimerHandle = loop._scheduled[0] - if timer not in loop._ready and timer._when <= _time: - loop._scheduled.remove(timer) - loop._ready.append(timer) - if timer._when > _time: - break - await asyncio.sleep(0) - async def accelerator(seconds: float): - steps = seconds * 10.0 + steps = seconds * 10.0 if not instant_step else 1 for _ in range(max(int(steps), 1)): - await accelerate_time(0.1) + await accelerate_time(seconds/steps) return accelerator diff --git a/tests/unit/dht/test_blob_announcer.py b/tests/unit/dht/test_blob_announcer.py index 5c7d921aa..be445aae7 100644 --- a/tests/unit/dht/test_blob_announcer.py +++ b/tests/unit/dht/test_blob_announcer.py @@ -1,60 +1,68 @@ import contextlib +import logging import typing import binascii import socket import asyncio + from lbry.testcase import AsyncioTestCase from tests import dht_mocks +from lbry.dht.protocol.distance import Distance from lbry.conf import Config from lbry.dht import constants from lbry.dht.node import Node from lbry.dht.peer import PeerManager, make_kademlia_peer from lbry.dht.blob_announcer import BlobAnnouncer from lbry.extras.daemon.storage import SQLiteStorage -from unittest import skip + class TestBlobAnnouncer(AsyncioTestCase): async def setup_node(self, peer_addresses, address, node_id): self.nodes: typing.Dict[int, Node] = {} - self.advance = dht_mocks.get_time_accelerator(self.loop, self.loop.time()) + self.advance = dht_mocks.get_time_accelerator(self.loop) + self.instant_advance = dht_mocks.get_time_accelerator(self.loop) self.conf = Config() - self.storage = SQLiteStorage(self.conf, ":memory:", self.loop, self.loop.time) - await self.storage.open() self.peer_manager = PeerManager(self.loop) self.node = Node(self.loop, self.peer_manager, node_id, 4444, 4444, 3333, address) await self.node.start_listening(address) - self.blob_announcer = BlobAnnouncer(self.loop, self.node, self.storage) - for node_id, address in peer_addresses: - await self.add_peer(node_id, address) + await asyncio.gather(*[self.add_peer(node_id, address) for node_id, address in peer_addresses]) + for first_peer in self.nodes.values(): + for second_peer in self.nodes.values(): + if first_peer == second_peer: + continue + self.add_peer_to_routing_table(first_peer, second_peer) + self.add_peer_to_routing_table(second_peer, first_peer) + await self.advance(0.1) # just to make pings go through self.node.joined.set() self.node._refresh_task = self.loop.create_task(self.node.refresh_node()) + self.storage = SQLiteStorage(self.conf, ":memory:", self.loop, self.loop.time) + await self.storage.open() + self.blob_announcer = BlobAnnouncer(self.loop, self.node, self.storage) async def add_peer(self, node_id, address, add_to_routing_table=True): + #print('add', node_id.hex()[:8], address) n = Node(self.loop, PeerManager(self.loop), node_id, 4444, 4444, 3333, address) await n.start_listening(address) self.nodes.update({len(self.nodes): n}) if add_to_routing_table: - self.node.protocol.add_peer( - make_kademlia_peer( - n.protocol.node_id, n.protocol.external_ip, n.protocol.udp_port - ) + self.add_peer_to_routing_table(self.node, n) + + def add_peer_to_routing_table(self, adder, being_added): + adder.protocol.add_peer( + make_kademlia_peer( + being_added.protocol.node_id, being_added.protocol.external_ip, being_added.protocol.udp_port ) + ) @contextlib.asynccontextmanager - async def _test_network_context(self, peer_addresses=None): - self.peer_addresses = peer_addresses or [ - (constants.generate_id(2), '1.2.3.2'), - (constants.generate_id(3), '1.2.3.3'), - (constants.generate_id(4), '1.2.3.4'), - (constants.generate_id(5), '1.2.3.5'), - (constants.generate_id(6), '1.2.3.6'), - (constants.generate_id(7), '1.2.3.7'), - (constants.generate_id(8), '1.2.3.8'), - (constants.generate_id(9), '1.2.3.9'), + async def _test_network_context(self, peer_count=200): + self.peer_addresses = [ + (constants.generate_id(i), socket.inet_ntoa(int(i + 0x01000001).to_bytes(length=4, byteorder='big'))) + for i in range(1, peer_count + 1) ] try: with dht_mocks.mock_network_loop(self.loop): - await self.setup_node(self.peer_addresses, '1.2.3.1', constants.generate_id(1)) + await self.setup_node(self.peer_addresses, '1.2.3.1', constants.generate_id(1000)) yield finally: self.blob_announcer.stop() @@ -73,43 +81,58 @@ class TestBlobAnnouncer(AsyncioTestCase): ) ) await peer.ping() - return peer + return last_node - @skip("Something from a previous test is leaking into this test and causing it to fail intermittently") async def test_announce_blobs(self): blob1 = binascii.hexlify(b'1' * 48).decode() blob2 = binascii.hexlify(b'2' * 48).decode() - async with self._test_network_context(): - await self.storage.add_blobs((blob1, 1024), (blob2, 1024), finished=True) - await self.storage.db.execute( - "update blob set next_announce_time=0, should_announce=1 where blob_hash in (?, ?)", - (blob1, blob2) - ) + async with self._test_network_context(peer_count=100): + await self.storage.add_blobs((blob1, 1024, 0, True), (blob2, 1024, 0, True), finished=True) + await self.storage.add_blobs( + *((constants.generate_id(value).hex(), 1024, 0, True) for value in range(1000, 1090)), + finished=True) + await self.storage.db.execute("update blob set next_announce_time=0, should_announce=1") to_announce = await self.storage.get_blobs_to_announce() - self.assertEqual(2, len(to_announce)) - self.blob_announcer.start(batch_size=1) # so it covers batching logic + self.assertEqual(92, len(to_announce)) + self.blob_announcer.start(batch_size=10) # so it covers batching logic # takes 60 seconds to start, but we advance 120 to ensure it processed all batches - await self.advance(60.0 * 2) + ongoing_announcements = asyncio.ensure_future(self.blob_announcer.wait()) + await self.instant_advance(60.0) + await ongoing_announcements to_announce = await self.storage.get_blobs_to_announce() self.assertEqual(0, len(to_announce)) self.blob_announcer.stop() + # as routing table pollution will cause some peers to be hard to reach, we add a tolerance for CI + tolerance = 0.8 # at least 80% of the announcements are within the top K + for blob in await self.storage.get_all_blob_hashes(): + distance = Distance(bytes.fromhex(blob)) + candidates = list(self.nodes.values()) + candidates.sort(key=lambda sorting_node: distance(sorting_node.protocol.node_id)) + has_it = 0 + for index, node in enumerate(candidates[:constants.K], start=1): + if node.protocol.data_store.get_peers_for_blob(bytes.fromhex(blob)): + has_it += 1 + else: + logging.warning("blob %s wasnt found between the best K (%s)", blob[:8], node.protocol.node_id.hex()[:8]) + self.assertGreaterEqual(has_it, int(tolerance * constants.K)) + + # test that we can route from a poorly connected peer all the way to the announced blob - await self.chain_peer(constants.generate_id(10), '1.2.3.10') - await self.chain_peer(constants.generate_id(11), '1.2.3.11') - await self.chain_peer(constants.generate_id(12), '1.2.3.12') - await self.chain_peer(constants.generate_id(13), '1.2.3.13') - await self.chain_peer(constants.generate_id(14), '1.2.3.14') - await self.advance(61.0) + current = len(self.nodes) + await self.chain_peer(constants.generate_id(current + 1), '1.2.3.10') + await self.chain_peer(constants.generate_id(current + 2), '1.2.3.11') + await self.chain_peer(constants.generate_id(current + 3), '1.2.3.12') + await self.chain_peer(constants.generate_id(current + 4), '1.2.3.13') + last = await self.chain_peer(constants.generate_id(current + 5), '1.2.3.14') - last = self.nodes[len(self.nodes) - 1] search_q, peer_q = asyncio.Queue(loop=self.loop), asyncio.Queue(loop=self.loop) search_q.put_nowait(blob1) _, task = last.accumulate_peers(search_q, peer_q) - found_peers = await peer_q.get() + found_peers = await asyncio.wait_for(peer_q.get(), 1.0) task.cancel() self.assertEqual(1, len(found_peers)) @@ -119,21 +142,13 @@ class TestBlobAnnouncer(AsyncioTestCase): async def test_popular_blob(self): peer_count = 150 - addresses = [ - (constants.generate_id(i + 1), socket.inet_ntoa(int(i + 0x01000001).to_bytes(length=4, byteorder='big'))) - for i in range(peer_count) - ] - blob_hash = b'1' * 48 + blob_hash = constants.generate_id(99999) - async with self._test_network_context(peer_addresses=addresses): + async with self._test_network_context(peer_count=peer_count): total_seen = set() - announced_to = self.nodes[0] - for i in range(1, peer_count): - node = self.nodes[i] - kad_peer = make_kademlia_peer( - node.protocol.node_id, node.protocol.external_ip, node.protocol.udp_port - ) - await announced_to.protocol._add_peer(kad_peer) + announced_to = self.nodes.pop(0) + for i, node in enumerate(self.nodes.values()): + self.add_peer_to_routing_table(announced_to, node) peer = node.protocol.get_rpc_peer( make_kademlia_peer( announced_to.protocol.node_id, @@ -144,15 +159,15 @@ class TestBlobAnnouncer(AsyncioTestCase): response = await peer.store(blob_hash) self.assertEqual(response, b'OK') peers_for_blob = await peer.find_value(blob_hash, 0) - if i == 1: + if i == 0: self.assertNotIn(blob_hash, peers_for_blob) self.assertEqual(peers_for_blob[b'p'], 0) else: - self.assertEqual(len(peers_for_blob[blob_hash]), min(i - 1, constants.K)) - self.assertEqual(len(announced_to.protocol.data_store.get_peers_for_blob(blob_hash)), i) + self.assertEqual(len(peers_for_blob[blob_hash]), min(i, constants.K)) + self.assertEqual(len(announced_to.protocol.data_store.get_peers_for_blob(blob_hash)), i + 1) if i - 1 > constants.K: self.assertEqual(len(peers_for_blob[b'contacts']), constants.K) - self.assertEqual(peers_for_blob[b'p'], ((i - 1) // (constants.K + 1)) + 1) + self.assertEqual(peers_for_blob[b'p'], (i // (constants.K + 1)) + 1) seen = set(peers_for_blob[blob_hash]) self.assertEqual(len(seen), constants.K) self.assertEqual(len(peers_for_blob[blob_hash]), len(seen)) @@ -167,5 +182,5 @@ class TestBlobAnnouncer(AsyncioTestCase): seen.intersection_update(page_x_set) total_seen.update(page_x_set) else: - self.assertEqual(len(peers_for_blob[b'contacts']), i - 1) + self.assertEqual(len(peers_for_blob[b'contacts']), 8) # we always add 8 on first page self.assertEqual(len(total_seen), peer_count - 2) diff --git a/tests/unit/dht/test_node.py b/tests/unit/dht/test_node.py index c862305ec..5ecad5181 100644 --- a/tests/unit/dht/test_node.py +++ b/tests/unit/dht/test_node.py @@ -12,7 +12,6 @@ from lbry.extras.daemon.storage import SQLiteStorage class TestNodePingQueueDiscover(AsyncioTestCase): - TIMEOUT = None # not supported as it advances time async def test_ping_queue_discover(self): loop = asyncio.get_event_loop() loop.set_debug(False) @@ -29,7 +28,7 @@ class TestNodePingQueueDiscover(AsyncioTestCase): (constants.generate_id(9), '1.2.3.9'), ] with dht_mocks.mock_network_loop(loop): - advance = dht_mocks.get_time_accelerator(loop, loop.time()) + advance = dht_mocks.get_time_accelerator(loop) # start the nodes nodes: typing.Dict[int, Node] = { i: Node(loop, PeerManager(loop), node_id, 4444, 4444, 3333, address) @@ -93,7 +92,6 @@ class TestNodePingQueueDiscover(AsyncioTestCase): class TestTemporarilyLosingConnection(AsyncioTestCase): - TIMEOUT = None # not supported as it advances time @unittest.SkipTest async def test_losing_connection(self): async def wait_for(check_ok, insist, timeout=20): @@ -131,7 +129,7 @@ class TestTemporarilyLosingConnection(AsyncioTestCase): await asyncio.gather(*[n.joined.wait() for n in nodes]) node = nodes[-1] - advance = dht_mocks.get_time_accelerator(loop, loop.time()) + advance = dht_mocks.get_time_accelerator(loop) await advance(500) # Join the network, assert that at least the known peers are in RT