Compare commits

...
Sign in to create a new pull request.

33 commits

Author SHA1 Message Date
Victor Shyba
f4fa217b71 Merge remote-tracking branch 'origin/dht_bug' into temp_dht_supermerge 2022-02-22 22:38:51 -03:00
Victor Shyba
f68ea01056 simplify, genaralize to any size and fix tests 2022-02-22 22:38:04 -03:00
Victor Shyba
6fb1443e63 stop after finding what to download 2022-02-22 16:00:13 -03:00
Victor Shyba
af7574dc9d replace duplicated code 2022-02-22 16:00:13 -03:00
Victor Shyba
af9cc457ec add get_colliding_prefix_bits, docs and tests 2022-02-22 15:59:59 -03:00
Victor Shyba
2a1e1d542f extract method and avoid using hash builtin name 2022-02-22 14:25:52 -03:00
Victor Shyba
f5b3e9bacd implement announcer as a consumer task on gather 2022-02-21 10:55:41 -03:00
Victor Shyba
d00b5befbf make active an explicit ordered dict 2022-02-21 10:52:06 -03:00
Victor Shyba
c1e64df528 remove unused search rounds 2022-02-21 10:52:06 -03:00
Victor Shyba
3e79dcd179 timeout is now supported on dht tests 2022-02-21 10:52:06 -03:00
Victor Shyba
cc104369cb fix and enable test_blob_announcer 2022-02-21 10:52:06 -03:00
Victor Shyba
586b09c1bc simplify dht mock and restore clock after accelerating 2022-02-21 10:52:06 -03:00
Victor Shyba
b574fb7771 better representation of kademliapeer on debug logs 2022-02-21 10:52:06 -03:00
Victor Shyba
51be734a08 add a way to wait announcements to finish so tests are reliable 2022-02-21 10:52:06 -03:00
Victor Shyba
0120d989d8 make timeout handler immune to asyncio time tricks 2022-02-21 10:52:06 -03:00
Victor Shyba
12f156257e allow running some extra probes for k replacements 2022-02-21 10:52:06 -03:00
Victor Shyba
dcde0e78e3 remove all references to bottoming out 2022-02-21 10:52:06 -03:00
Victor Shyba
dc1c0e6851 no stop condition, let it exhaust 2022-02-21 10:52:06 -03:00
Victor Shyba
c45f27d5cc bottoming out is now warning and no results for peer search 2022-02-21 10:52:06 -03:00
Victor Shyba
6335590b65 don't probe peers too far from the top closest 2022-02-21 10:52:06 -03:00
Victor Shyba
b7b8831109 use a dict for the active queue 2022-02-21 10:52:06 -03:00
Victor Shyba
2ed23fbc4b log bottom out of peer search in debug, show short key id for find value 2022-02-21 10:52:06 -03:00
Victor Shyba
023cfb593a bump bottom out limit of peer search so people can use 100 concurrent announcers 2022-02-21 10:52:06 -03:00
Victor Shyba
2884dba52d wait until k peers are ready. do not double add peers 2022-02-21 10:52:06 -03:00
Victor Shyba
44c4b03d44 only return good (contacted) peers 2022-02-21 10:52:06 -03:00
Victor Shyba
6ba8f96511 reset closest peer on failure 2022-02-21 10:52:06 -03:00
Victor Shyba
4987f57944 add peers from shortlist regardless, but check from other nodes 2022-02-21 10:52:06 -03:00
Victor Shyba
f5bf8b8684 bump split index to 2 2022-02-21 10:52:06 -03:00
Victor Shyba
809a8c1226 fix distance sorting and improve logging 2022-02-21 10:52:06 -03:00
Victor Shyba
e319b55db5 closest peer is only ready when it was contacted and isn't known to be bad 2022-02-21 10:52:06 -03:00
Victor Shyba
f274562c92 dont probe and ignore bad peers 2022-02-21 10:52:06 -03:00
Victor Shyba
d1bc981b11 extract min_prefix_colliding_bits to a contanst 2022-02-20 22:34:53 -03:00
Victor Shyba
53d78e9194 check that the stored blob is at least 1 prefix byte close to peer id 2022-02-20 22:34:53 -03:00
15 changed files with 221 additions and 207 deletions

View file

@ -622,7 +622,7 @@ class Config(CLIConfig):
"Routing table bucket index below which we always split the bucket if given a new key to add to it and "
"the bucket is full. As this value is raised the depth of the routing table (and number of peers in it) "
"will increase. This setting is used by seed nodes, you probably don't want to change it during normal "
"use.", 1
"use.", 2
)
# protocol timeouts

View file

@ -27,21 +27,24 @@ class BlobAnnouncer:
self.storage = storage
self.announce_task: asyncio.Task = None
self.announce_queue: typing.List[str] = []
self._done = asyncio.Event()
self.announced = set()
async def _submit_announcement(self, blob_hash):
try:
peers = len(await self.node.announce_blob(blob_hash))
self.announcements_sent_metric.labels(peers=peers, error=False).inc()
if peers > 4:
return blob_hash
else:
log.debug("failed to announce %s, could only find %d peers, retrying soon.", blob_hash[:8], peers)
except Exception as err:
self.announcements_sent_metric.labels(peers=0, error=True).inc()
if isinstance(err, asyncio.CancelledError): # TODO: remove when updated to 3.8
raise err
log.warning("error announcing %s: %s", blob_hash[:8], str(err))
async def _run_consumer(self):
    """
    Drain ``self.announce_queue``: pop one blob hash at a time, announce it through
    ``self.node.announce_blob`` and record the outcome in ``announcements_sent_metric``.

    A blob hash is added to ``self.announced`` only when more than 4 peers were returned;
    otherwise a debug message is logged. Any error is counted in the metric and logged as a
    warning before moving on to the next hash, except cancellation, which is re-raised.
    """
    while self.announce_queue:
        try:
            blob_hash = self.announce_queue.pop()
            result = await self.node.announce_blob(blob_hash)
            peers = len(result)
            self.announcements_sent_metric.labels(peers=peers, error=False).inc()
            if peers <= 4:
                log.debug("failed to announce %s, could only find %d peers, retrying soon.", blob_hash[:8], peers)
            else:
                self.announced.add(blob_hash)
        except Exception as err:
            self.announcements_sent_metric.labels(peers=0, error=True).inc()
            if isinstance(err, asyncio.CancelledError):  # TODO: remove when updated to 3.8
                raise err
            log.warning("error announcing %s: %s", blob_hash[:8], str(err))
async def _announce(self, batch_size: typing.Optional[int] = 10):
while batch_size:
@ -56,14 +59,14 @@ class BlobAnnouncer:
log.debug("announcer task wake up, %d blobs to announce", len(self.announce_queue))
while len(self.announce_queue) > 0:
log.info("%i blobs to announce", len(self.announce_queue))
announced = await asyncio.gather(*[
self._submit_announcement(
self.announce_queue.pop()) for _ in range(batch_size) if self.announce_queue
], loop=self.loop)
announced = list(filter(None, announced))
await asyncio.gather(*[self._run_consumer() for _ in range(batch_size)], loop=self.loop)
announced = list(filter(None, self.announced))
if announced:
await self.storage.update_last_announced_blobs(announced)
log.info("announced %i blobs", len(announced))
self.announced.clear()
self._done.set()
self._done.clear()
def start(self, batch_size: typing.Optional[int] = 10):
assert not self.announce_task or self.announce_task.done(), "already running"
@ -72,3 +75,6 @@ class BlobAnnouncer:
def stop(self):
    """Cancel the announce task, if there is one and it has not finished yet."""
    task = self.announce_task
    if not task or task.done():
        return
    task.cancel()
def wait(self):
    """Return the awaitable produced by ``self._done.wait()`` so callers can await it."""
    done_event = self._done
    return done_event.wait()

View file

@ -20,7 +20,6 @@ MAYBE_PING_DELAY = 300 # 5 minutes
CHECK_REFRESH_INTERVAL = REFRESH_INTERVAL / 5
RPC_ID_LENGTH = 20
PROTOCOL_VERSION = 1
BOTTOM_OUT_LIMIT = 3
MSG_SIZE_LIMIT = 1400

View file

@ -202,25 +202,23 @@ class Node:
self._join_task = self.loop.create_task(self.join_network(interface, known_node_urls))
def get_iterative_node_finder(self, key: bytes, shortlist: typing.Optional[typing.List['KademliaPeer']] = None,
bottom_out_limit: int = constants.BOTTOM_OUT_LIMIT,
max_results: int = constants.K) -> IterativeNodeFinder:
return IterativeNodeFinder(self.loop, self.protocol.peer_manager, self.protocol.routing_table, self.protocol,
key, bottom_out_limit, max_results, None, shortlist)
key, max_results, None, shortlist)
def get_iterative_value_finder(self, key: bytes, shortlist: typing.Optional[typing.List['KademliaPeer']] = None,
bottom_out_limit: int = 40,
max_results: int = -1) -> IterativeValueFinder:
return IterativeValueFinder(self.loop, self.protocol.peer_manager, self.protocol.routing_table, self.protocol,
key, bottom_out_limit, max_results, None, shortlist)
key, max_results, None, shortlist)
async def peer_search(self, node_id: bytes, count=constants.K, max_results=constants.K * 2,
bottom_out_limit=20, shortlist: typing.Optional[typing.List['KademliaPeer']] = None
shortlist: typing.Optional[typing.List['KademliaPeer']] = None
) -> typing.List['KademliaPeer']:
peers = []
async for iteration_peers in self.get_iterative_node_finder(
node_id, shortlist=shortlist, bottom_out_limit=bottom_out_limit, max_results=max_results):
node_id, shortlist=shortlist, max_results=max_results):
peers.extend(iteration_peers)
distance = Distance(node_id)
peers.sort(key=lambda peer: distance(peer.node_id))

View file

@ -190,3 +190,6 @@ class KademliaPeer:
def compact_ip(self):
return make_compact_ip(self.address)
def __str__(self):
    """
    Short representation in the form
    ``ClassName(<first 8 hex chars of node id>@<address>:<udp_port>-<tcp_port>)``.

    A peer whose ``node_id`` is ``None`` is rendered with ``None`` in place of the short id
    instead of raising ``AttributeError`` on ``None.hex()``.
    """
    # node_id may be missing, so only call .hex() when there is something to format
    short_id = self.node_id.hex()[:8] if self.node_id is not None else None
    return f"{self.__class__.__name__}({short_id}@{self.address}:{self.udp_port}-{self.tcp_port})"

View file

@ -1,6 +1,6 @@
import asyncio
from itertools import chain
from collections import defaultdict
from collections import defaultdict, OrderedDict
import typing
import logging
from typing import TYPE_CHECKING
@ -74,7 +74,7 @@ def get_shortlist(routing_table: 'TreeRoutingTable', key: bytes,
class IterativeFinder:
def __init__(self, loop: asyncio.AbstractEventLoop, peer_manager: 'PeerManager',
routing_table: 'TreeRoutingTable', protocol: 'KademliaProtocol', key: bytes,
bottom_out_limit: typing.Optional[int] = 2, max_results: typing.Optional[int] = constants.K,
max_results: typing.Optional[int] = constants.K,
exclude: typing.Optional[typing.List[typing.Tuple[str, int]]] = None,
shortlist: typing.Optional[typing.List['KademliaPeer']] = None):
if len(key) != constants.HASH_LENGTH:
@ -85,28 +85,22 @@ class IterativeFinder:
self.protocol = protocol
self.key = key
self.bottom_out_limit = bottom_out_limit
self.max_results = max_results
self.max_results = max(constants.K, max_results)
self.exclude = exclude or []
self.active: typing.Set['KademliaPeer'] = set()
self.active: typing.Dict['KademliaPeer', int] = OrderedDict() # peer: distance, sorted
self.contacted: typing.Set['KademliaPeer'] = set()
self.distance = Distance(key)
self.closest_peer: typing.Optional['KademliaPeer'] = None
self.prev_closest_peer: typing.Optional['KademliaPeer'] = None
self.iteration_queue = asyncio.Queue(loop=self.loop)
self.running_probes: typing.Set[asyncio.Task] = set()
self.running_probes: typing.Dict['KademliaPeer', asyncio.Task] = {}
self.iteration_count = 0
self.bottom_out_count = 0
self.running = False
self.tasks: typing.List[asyncio.Task] = []
self.delayed_calls: typing.List[asyncio.Handle] = []
for peer in get_shortlist(routing_table, key, shortlist):
if peer.node_id:
self._add_active(peer)
self._add_active(peer, force=True)
else:
# seed nodes
self._schedule_probe(peer)
@ -138,15 +132,14 @@ class IterativeFinder:
"""
return []
def _is_closer(self, peer: 'KademliaPeer') -> bool:
return not self.closest_peer or self.distance.is_closer(peer.node_id, self.closest_peer.node_id)
def _add_active(self, peer):
def _add_active(self, peer, force=False):
if not force and self.peer_manager.peer_is_good(peer) is False:
return
if peer in self.contacted:
return
if peer not in self.active and peer.node_id and peer.node_id != self.protocol.node_id:
self.active.add(peer)
if self._is_closer(peer):
self.prev_closest_peer = self.closest_peer
self.closest_peer = peer
self.active[peer] = self.distance(peer.node_id)
self.active = OrderedDict(sorted(self.active.items(), key=lambda item: item[1]))
async def _handle_probe_result(self, peer: 'KademliaPeer', response: FindResponse):
self._add_active(peer)
@ -158,33 +151,43 @@ class IterativeFinder:
log.warning("misbehaving peer %s:%i returned peer with reserved ip %s:%i", peer.address,
peer.udp_port, address, udp_port)
self.check_result_ready(response)
self._log_state()
def _reset_closest(self, peer):
    """Remove ``peer`` from ``self.active``; a peer that is not active is ignored."""
    self.active.pop(peer, None)
async def _send_probe(self, peer: 'KademliaPeer'):
try:
response = await self.send_probe(peer)
except asyncio.TimeoutError:
self.active.discard(peer)
self._reset_closest(peer)
return
except ValueError as err:
log.warning(str(err))
self.active.discard(peer)
self._reset_closest(peer)
return
except TransportNotConnected:
return self.aclose()
except RemoteException:
self._reset_closest(peer)
return
return await self._handle_probe_result(peer, response)
async def _search_round(self):
def _search_round(self):
"""
Send up to constants.alpha (5) probes to closest active peers
"""
added = 0
to_probe = list(self.active - self.contacted)
to_probe.sort(key=lambda peer: self.distance(self.key))
for peer in to_probe:
if added >= constants.ALPHA:
for index, peer in enumerate(self.active.keys()):
if index == 0:
log.debug("closest to probe: %s", peer.node_id.hex()[:8])
if peer in self.contacted:
continue
if len(self.running_probes) >= constants.ALPHA:
break
if index > (constants.K + len(self.running_probes)):
break
origin_address = (peer.address, peer.udp_port)
if origin_address in self.exclude:
@ -206,33 +209,22 @@ class IterativeFinder:
t = self.loop.create_task(self._send_probe(peer))
def callback(_):
self.running_probes.difference_update({
probe for probe in self.running_probes if probe.done() or probe == t
})
if not self.running_probes:
self.tasks.append(self.loop.create_task(self._search_task(0.0)))
self.running_probes.pop(peer, None)
if self.running:
self._search_round()
t.add_done_callback(callback)
self.running_probes.add(t)
self.running_probes[peer] = t
async def _search_task(self, delay: typing.Optional[float] = constants.ITERATIVE_LOOKUP_DELAY):
try:
if self.running:
await self._search_round()
if self.running:
self.delayed_calls.append(self.loop.call_later(delay, self._search))
except (asyncio.CancelledError, StopAsyncIteration, TransportNotConnected):
if self.running:
self.loop.call_soon(self.aclose)
def _search(self):
self.tasks.append(self.loop.create_task(self._search_task()))
def _log_state(self):
    """Log, at debug level, the short key id together with the sizes of ``active`` and ``contacted``."""
    short_key = self.key.hex()[:8]
    active_count, contacted_count = len(self.active), len(self.contacted)
    log.debug("[%s] check result: %i active nodes %i contacted", short_key, active_count, contacted_count)
def __aiter__(self):
if self.running:
raise Exception("already running")
self.running = True
self._search()
self.loop.call_soon(self._search_round)
return self
async def __anext__(self) -> typing.List['KademliaPeer']:
@ -252,20 +244,19 @@ class IterativeFinder:
def aclose(self):
self.running = False
self.iteration_queue.put_nowait(None)
for task in chain(self.tasks, self.running_probes, self.delayed_calls):
for task in chain(self.tasks, self.running_probes.values()):
task.cancel()
self.tasks.clear()
self.running_probes.clear()
self.delayed_calls.clear()
class IterativeNodeFinder(IterativeFinder):
def __init__(self, loop: asyncio.AbstractEventLoop, peer_manager: 'PeerManager',
routing_table: 'TreeRoutingTable', protocol: 'KademliaProtocol', key: bytes,
bottom_out_limit: typing.Optional[int] = 2, max_results: typing.Optional[int] = constants.K,
max_results: typing.Optional[int] = constants.K,
exclude: typing.Optional[typing.List[typing.Tuple[str, int]]] = None,
shortlist: typing.Optional[typing.List['KademliaPeer']] = None):
super().__init__(loop, peer_manager, routing_table, protocol, key, bottom_out_limit, max_results, exclude,
super().__init__(loop, peer_manager, routing_table, protocol, key, max_results, exclude,
shortlist)
self.yielded_peers: typing.Set['KademliaPeer'] = set()
@ -276,14 +267,14 @@ class IterativeNodeFinder(IterativeFinder):
return FindNodeResponse(self.key, response)
def search_exhausted(self):
self.put_result(self.active, finish=True)
self.put_result(self.active.keys(), finish=True)
def put_result(self, from_iter: typing.Iterable['KademliaPeer'], finish=False):
not_yet_yielded = [
peer for peer in from_iter
if peer not in self.yielded_peers
and peer.node_id != self.protocol.node_id
and self.peer_manager.peer_is_good(peer) is not False
and self.peer_manager.peer_is_good(peer) is True # return only peers who answered
]
not_yet_yielded.sort(key=lambda peer: self.distance(peer.node_id))
to_yield = not_yet_yielded[:max(constants.K, self.max_results)]
@ -298,26 +289,16 @@ class IterativeNodeFinder(IterativeFinder):
if found:
log.debug("found")
return self.put_result(self.active, finish=True)
if self.prev_closest_peer and self.closest_peer and not self._is_closer(self.prev_closest_peer):
# log.info("improving, %i %i %i %i %i", len(self.shortlist), len(self.active), len(self.contacted),
# self.bottom_out_count, self.iteration_count)
self.bottom_out_count = 0
elif self.prev_closest_peer and self.closest_peer:
self.bottom_out_count += 1
log.info("bottom out %i %i %i", len(self.active), len(self.contacted), self.bottom_out_count)
if self.bottom_out_count >= self.bottom_out_limit or self.iteration_count >= self.bottom_out_limit:
log.info("limit hit")
self.put_result(self.active, True)
return self.put_result(self.active.keys(), finish=True)
class IterativeValueFinder(IterativeFinder):
def __init__(self, loop: asyncio.AbstractEventLoop, peer_manager: 'PeerManager',
routing_table: 'TreeRoutingTable', protocol: 'KademliaProtocol', key: bytes,
bottom_out_limit: typing.Optional[int] = 2, max_results: typing.Optional[int] = constants.K,
max_results: typing.Optional[int] = constants.K,
exclude: typing.Optional[typing.List[typing.Tuple[str, int]]] = None,
shortlist: typing.Optional[typing.List['KademliaPeer']] = None):
super().__init__(loop, peer_manager, routing_table, protocol, key, bottom_out_limit, max_results, exclude,
super().__init__(loop, peer_manager, routing_table, protocol, key, max_results, exclude,
shortlist)
self.blob_peers: typing.Set['KademliaPeer'] = set()
# this tracks the index of the most recent page we requested from each peer
@ -362,23 +343,12 @@ class IterativeValueFinder(IterativeFinder):
blob_peers = [self.peer_manager.decode_tcp_peer_from_compact_address(compact_addr)
for compact_addr in response.found_compact_addresses]
to_yield = []
self.bottom_out_count = 0
for blob_peer in blob_peers:
if blob_peer not in self.blob_peers:
self.blob_peers.add(blob_peer)
to_yield.append(blob_peer)
if to_yield:
# log.info("found %i new peers for blob", len(to_yield))
self.iteration_queue.put_nowait(to_yield)
# if self.max_results and len(self.blob_peers) >= self.max_results:
# log.info("enough blob peers found")
# if not self.finished.is_set():
# self.finished.set()
elif self.prev_closest_peer and self.closest_peer:
self.bottom_out_count += 1
if self.bottom_out_count >= self.bottom_out_limit:
log.info("blob peer search bottomed out")
self.iteration_queue.put_nowait(None)
def get_initial_result(self) -> typing.List['KademliaPeer']:
if self.protocol.data_store.has_peers_for_blob(self.key):

View file

@ -6,6 +6,7 @@ import itertools
from prometheus_client import Gauge
from lbry import utils
from lbry.dht import constants
from lbry.dht.protocol.distance import Distance
if typing.TYPE_CHECKING:
@ -70,9 +71,8 @@ class KBucket:
if len(self.peers) < constants.K:
self.peers.append(peer)
self.peer_in_routing_table_metric.labels("global").inc()
if peer.node_id[0] == self._node_id[0]:
bits_colliding = 8 - (peer.node_id[1] ^ self._node_id[1]).bit_length()
self.peer_with_x_bit_colliding_metric.labels(amount=(bits_colliding + 8)).inc()
bits_colliding = utils.get_colliding_prefix_bits(peer.node_id, self._node_id)
self.peer_with_x_bit_colliding_metric.labels(amount=bits_colliding).inc()
return True
else:
return False
@ -140,9 +140,8 @@ class KBucket:
def remove_peer(self, peer: 'KademliaPeer') -> None:
self.peers.remove(peer)
self.peer_in_routing_table_metric.labels("global").dec()
if peer.node_id[0] == self._node_id[0]:
bits_colliding = 8 - (peer.node_id[1] ^ self._node_id[1]).bit_length()
self.peer_with_x_bit_colliding_metric.labels(amount=(bits_colliding + 8)).dec()
bits_colliding = utils.get_colliding_prefix_bits(peer.node_id, self._node_id)
self.peer_with_x_bit_colliding_metric.labels(amount=bits_colliding).dec()
def key_in_range(self, key: bytes) -> bool:
""" Tests whether the specified key (i.e. node ID) is in the range

View file

@ -381,6 +381,7 @@ class FileManagerComponent(Component):
class BackgroundDownloaderComponent(Component):
MIN_PREFIX_COLLIDING_BITS = 8
component_name = BACKGROUND_DOWNLOADER_COMPONENT
depends_on = [DATABASE_COMPONENT, BLOB_COMPONENT, DISK_SPACE_COMPONENT]
@ -412,12 +413,18 @@ class BackgroundDownloaderComponent(Component):
while True:
self.space_available = await self.space_manager.get_free_space_mb(True)
if not self.is_busy and self.space_available > 10:
blob_hash = next((key.hex() for key in self.dht_node.stored_blob_hashes if
key.hex() not in self.blob_manager.completed_blob_hashes), None)
if blob_hash:
self.ongoing_download = asyncio.create_task(self.background_downloader.download_blobs(blob_hash))
self._download_next_close_blob_hash()
await asyncio.sleep(self.download_loop_delay_seconds)
def _download_next_close_blob_hash(self):
    """
    Start a background download for the first stored blob hash that is not yet completed
    and shares at least ``MIN_PREFIX_COLLIDING_BITS`` prefix bits with this node's id.

    Only one download is started per call; when no stored hash qualifies nothing happens.
    """
    own_id = self.dht_node.protocol.node_id
    # lazily walk the stored hashes so the search stops at the first match
    candidates = (
        stored.hex() for stored in self.dht_node.stored_blob_hashes
        if stored.hex() not in self.blob_manager.completed_blob_hashes
        and utils.get_colliding_prefix_bits(own_id, stored) >= self.MIN_PREFIX_COLLIDING_BITS
    )
    chosen = next(candidates, None)
    if chosen is not None:
        self.ongoing_download = asyncio.create_task(self.background_downloader.download_blobs(chosen))
async def start(self):
self.space_manager: DiskSpaceManager = self.component_manager.get_component(DISK_SPACE_COMPONENT)
if not self.component_manager.has_component(DHT_COMPONENT):

View file

@ -4885,20 +4885,16 @@ class Daemon(metaclass=JSONRPCServerType):
"""
@requires(DHT_COMPONENT)
async def jsonrpc_peer_list(self, blob_hash, search_bottom_out_limit=None, page=None, page_size=None):
async def jsonrpc_peer_list(self, blob_hash, page=None, page_size=None):
"""
Get peers for blob hash
Usage:
peer_list (<blob_hash> | --blob_hash=<blob_hash>)
[<search_bottom_out_limit> | --search_bottom_out_limit=<search_bottom_out_limit>]
[--page=<page>] [--page_size=<page_size>]
Options:
--blob_hash=<blob_hash> : (str) find available peers for this blob hash
--search_bottom_out_limit=<search_bottom_out_limit> : (int) the number of search probes in a row
that don't find any new peers
before giving up and returning
--page=<page> : (int) page to return during paginating
--page_size=<page_size> : (int) number of items on page during pagination
@ -4910,13 +4906,6 @@ class Daemon(metaclass=JSONRPCServerType):
if not is_valid_blobhash(blob_hash):
# TODO: use error from lbry.error
raise Exception("invalid blob hash")
if search_bottom_out_limit is not None:
search_bottom_out_limit = int(search_bottom_out_limit)
if search_bottom_out_limit <= 0:
# TODO: use error from lbry.error
raise Exception("invalid bottom out limit")
else:
search_bottom_out_limit = 4
peers = []
peer_q = asyncio.Queue(loop=self.component_manager.loop)
await self.dht_node._peers_for_value_producer(blob_hash, peer_q)

View file

@ -204,7 +204,13 @@ class AsyncioTestCase(unittest.TestCase):
def add_timeout(self):
if self.TIMEOUT:
self.loop.call_later(self.TIMEOUT, self.cancel)
self.loop.call_later(self.TIMEOUT, self.check_timeout, time())
def check_timeout(self, started):
    """
    Cancel the test once ``TIMEOUT`` seconds have elapsed since ``started`` according to ``time()``.

    When the deadline has not been reached yet, the check is scheduled again on the loop
    after another ``TIMEOUT`` seconds with the same ``started`` value.

    :param started: the ``time()`` reading taken when the timeout was armed.
    """
    timed_out = time() - started >= self.TIMEOUT
    if not timed_out:
        self.loop.call_later(self.TIMEOUT, self.check_timeout, started)
        return
    self.cancel()
class AdvanceTimeTestCase(AsyncioTestCase):

View file

@ -474,3 +474,18 @@ class LockWithMetrics(asyncio.Lock):
return super().release()
finally:
self._lock_held_time_metric.observe(time.perf_counter() - self._lock_acquired_time)
def get_colliding_prefix_bits(first_value: bytes, second_value: bytes) -> int:
    """
    Calculates the amount of colliding prefix bits between <first_value> and <second_value>.
    This is given by the amount of bits that are the same until the first different one (via XOR),
    starting from the most significant bit to the least significant bit.

    :param first_value: first value to compare, any length.
    :param second_value: second value to compare, must have the same length as <first_value>.
    :return: amount of prefix colliding bits, from 0 up to the bit length of the inputs.
    :raises ValueError: if the two values have different lengths.
    """
    # an explicit check (instead of assert) keeps the validation when running with -O,
    # where unequal lengths would otherwise silently produce a meaningless count
    if len(first_value) != len(second_value):
        raise ValueError("length should be the same")
    size = len(first_value) * 8
    difference = int.from_bytes(first_value, "big") ^ int.from_bytes(second_value, "big")
    # the highest set bit of the XOR marks the first position where the values differ
    return size - difference.bit_length()

View file

@ -9,7 +9,7 @@ if typing.TYPE_CHECKING:
def get_time_accelerator(loop: asyncio.AbstractEventLoop,
now: typing.Optional[float] = None) -> typing.Callable[[float], typing.Awaitable[None]]:
instant_step: bool = False) -> typing.Callable[[float], typing.Awaitable[None]]:
"""
Returns an async advance() function
@ -17,32 +17,22 @@ def get_time_accelerator(loop: asyncio.AbstractEventLoop,
made by call_later, call_at, and call_soon.
"""
_time = now or loop.time()
loop.time = functools.wraps(loop.time)(lambda: _time)
original = loop.time
_drift = 0
loop.time = functools.wraps(loop.time)(lambda: original() + _drift)
async def accelerate_time(seconds: float) -> None:
nonlocal _time
nonlocal _drift
if seconds < 0:
raise ValueError(f'Cannot go back in time ({seconds} seconds)')
_time += seconds
await past_events()
_drift += seconds
await asyncio.sleep(0)
async def past_events() -> None:
while loop._scheduled:
timer: asyncio.TimerHandle = loop._scheduled[0]
if timer not in loop._ready and timer._when <= _time:
loop._scheduled.remove(timer)
loop._ready.append(timer)
if timer._when > _time:
break
await asyncio.sleep(0)
async def accelerator(seconds: float):
steps = seconds * 10.0
steps = seconds * 10.0 if not instant_step else 1
for _ in range(max(int(steps), 1)):
await accelerate_time(0.1)
await accelerate_time(seconds/steps)
return accelerator

View file

@ -1,60 +1,68 @@
import contextlib
import logging
import typing
import binascii
import socket
import asyncio
from lbry.testcase import AsyncioTestCase
from tests import dht_mocks
from lbry.dht.protocol.distance import Distance
from lbry.conf import Config
from lbry.dht import constants
from lbry.dht.node import Node
from lbry.dht.peer import PeerManager, make_kademlia_peer
from lbry.dht.blob_announcer import BlobAnnouncer
from lbry.extras.daemon.storage import SQLiteStorage
from unittest import skip
class TestBlobAnnouncer(AsyncioTestCase):
async def setup_node(self, peer_addresses, address, node_id):
self.nodes: typing.Dict[int, Node] = {}
self.advance = dht_mocks.get_time_accelerator(self.loop, self.loop.time())
self.advance = dht_mocks.get_time_accelerator(self.loop)
self.instant_advance = dht_mocks.get_time_accelerator(self.loop)
self.conf = Config()
self.storage = SQLiteStorage(self.conf, ":memory:", self.loop, self.loop.time)
await self.storage.open()
self.peer_manager = PeerManager(self.loop)
self.node = Node(self.loop, self.peer_manager, node_id, 4444, 4444, 3333, address)
await self.node.start_listening(address)
self.blob_announcer = BlobAnnouncer(self.loop, self.node, self.storage)
for node_id, address in peer_addresses:
await self.add_peer(node_id, address)
await asyncio.gather(*[self.add_peer(node_id, address) for node_id, address in peer_addresses])
for first_peer in self.nodes.values():
for second_peer in self.nodes.values():
if first_peer == second_peer:
continue
self.add_peer_to_routing_table(first_peer, second_peer)
self.add_peer_to_routing_table(second_peer, first_peer)
await self.advance(0.1) # just to make pings go through
self.node.joined.set()
self.node._refresh_task = self.loop.create_task(self.node.refresh_node())
self.storage = SQLiteStorage(self.conf, ":memory:", self.loop, self.loop.time)
await self.storage.open()
self.blob_announcer = BlobAnnouncer(self.loop, self.node, self.storage)
async def add_peer(self, node_id, address, add_to_routing_table=True):
#print('add', node_id.hex()[:8], address)
n = Node(self.loop, PeerManager(self.loop), node_id, 4444, 4444, 3333, address)
await n.start_listening(address)
self.nodes.update({len(self.nodes): n})
if add_to_routing_table:
self.node.protocol.add_peer(
make_kademlia_peer(
n.protocol.node_id, n.protocol.external_ip, n.protocol.udp_port
)
self.add_peer_to_routing_table(self.node, n)
def add_peer_to_routing_table(self, adder, being_added):
    """
    Add ``being_added`` as a peer of ``adder``: a kademlia peer is built from the node id,
    external ip and udp port of ``being_added.protocol`` and passed to ``adder.protocol.add_peer``.
    """
    register = adder.protocol.add_peer
    source = being_added.protocol
    kademlia_peer = make_kademlia_peer(source.node_id, source.external_ip, source.udp_port)
    register(kademlia_peer)
@contextlib.asynccontextmanager
async def _test_network_context(self, peer_addresses=None):
self.peer_addresses = peer_addresses or [
(constants.generate_id(2), '1.2.3.2'),
(constants.generate_id(3), '1.2.3.3'),
(constants.generate_id(4), '1.2.3.4'),
(constants.generate_id(5), '1.2.3.5'),
(constants.generate_id(6), '1.2.3.6'),
(constants.generate_id(7), '1.2.3.7'),
(constants.generate_id(8), '1.2.3.8'),
(constants.generate_id(9), '1.2.3.9'),
async def _test_network_context(self, peer_count=200):
self.peer_addresses = [
(constants.generate_id(i), socket.inet_ntoa(int(i + 0x01000001).to_bytes(length=4, byteorder='big')))
for i in range(1, peer_count + 1)
]
try:
with dht_mocks.mock_network_loop(self.loop):
await self.setup_node(self.peer_addresses, '1.2.3.1', constants.generate_id(1))
await self.setup_node(self.peer_addresses, '1.2.3.1', constants.generate_id(1000))
yield
finally:
self.blob_announcer.stop()
@ -73,43 +81,58 @@ class TestBlobAnnouncer(AsyncioTestCase):
)
)
await peer.ping()
return peer
return last_node
@skip("Something from a previous test is leaking into this test and causing it to fail intermittently")
async def test_announce_blobs(self):
blob1 = binascii.hexlify(b'1' * 48).decode()
blob2 = binascii.hexlify(b'2' * 48).decode()
async with self._test_network_context():
await self.storage.add_blobs((blob1, 1024), (blob2, 1024), finished=True)
await self.storage.db.execute(
"update blob set next_announce_time=0, should_announce=1 where blob_hash in (?, ?)",
(blob1, blob2)
)
async with self._test_network_context(peer_count=100):
await self.storage.add_blobs((blob1, 1024, 0, True), (blob2, 1024, 0, True), finished=True)
await self.storage.add_blobs(
*((constants.generate_id(value).hex(), 1024, 0, True) for value in range(1000, 1090)),
finished=True)
await self.storage.db.execute("update blob set next_announce_time=0, should_announce=1")
to_announce = await self.storage.get_blobs_to_announce()
self.assertEqual(2, len(to_announce))
self.blob_announcer.start(batch_size=1) # so it covers batching logic
self.assertEqual(92, len(to_announce))
self.blob_announcer.start(batch_size=10) # so it covers batching logic
# takes 60 seconds to start, but we advance 120 to ensure it processed all batches
await self.advance(60.0 * 2)
ongoing_announcements = asyncio.ensure_future(self.blob_announcer.wait())
await self.instant_advance(60.0)
await ongoing_announcements
to_announce = await self.storage.get_blobs_to_announce()
self.assertEqual(0, len(to_announce))
self.blob_announcer.stop()
# as routing table pollution will cause some peers to be hard to reach, we add a tolerance for CI
tolerance = 0.8 # at least 80% of the announcements are within the top K
for blob in await self.storage.get_all_blob_hashes():
distance = Distance(bytes.fromhex(blob))
candidates = list(self.nodes.values())
candidates.sort(key=lambda sorting_node: distance(sorting_node.protocol.node_id))
has_it = 0
for index, node in enumerate(candidates[:constants.K], start=1):
if node.protocol.data_store.get_peers_for_blob(bytes.fromhex(blob)):
has_it += 1
else:
logging.warning("blob %s wasnt found between the best K (%s)", blob[:8], node.protocol.node_id.hex()[:8])
self.assertGreaterEqual(has_it, int(tolerance * constants.K))
# test that we can route from a poorly connected peer all the way to the announced blob
await self.chain_peer(constants.generate_id(10), '1.2.3.10')
await self.chain_peer(constants.generate_id(11), '1.2.3.11')
await self.chain_peer(constants.generate_id(12), '1.2.3.12')
await self.chain_peer(constants.generate_id(13), '1.2.3.13')
await self.chain_peer(constants.generate_id(14), '1.2.3.14')
await self.advance(61.0)
current = len(self.nodes)
await self.chain_peer(constants.generate_id(current + 1), '1.2.3.10')
await self.chain_peer(constants.generate_id(current + 2), '1.2.3.11')
await self.chain_peer(constants.generate_id(current + 3), '1.2.3.12')
await self.chain_peer(constants.generate_id(current + 4), '1.2.3.13')
last = await self.chain_peer(constants.generate_id(current + 5), '1.2.3.14')
last = self.nodes[len(self.nodes) - 1]
search_q, peer_q = asyncio.Queue(loop=self.loop), asyncio.Queue(loop=self.loop)
search_q.put_nowait(blob1)
_, task = last.accumulate_peers(search_q, peer_q)
found_peers = await peer_q.get()
found_peers = await asyncio.wait_for(peer_q.get(), 1.0)
task.cancel()
self.assertEqual(1, len(found_peers))
@ -119,21 +142,13 @@ class TestBlobAnnouncer(AsyncioTestCase):
async def test_popular_blob(self):
peer_count = 150
addresses = [
(constants.generate_id(i + 1), socket.inet_ntoa(int(i + 0x01000001).to_bytes(length=4, byteorder='big')))
for i in range(peer_count)
]
blob_hash = b'1' * 48
blob_hash = constants.generate_id(99999)
async with self._test_network_context(peer_addresses=addresses):
async with self._test_network_context(peer_count=peer_count):
total_seen = set()
announced_to = self.nodes[0]
for i in range(1, peer_count):
node = self.nodes[i]
kad_peer = make_kademlia_peer(
node.protocol.node_id, node.protocol.external_ip, node.protocol.udp_port
)
await announced_to.protocol._add_peer(kad_peer)
announced_to = self.nodes.pop(0)
for i, node in enumerate(self.nodes.values()):
self.add_peer_to_routing_table(announced_to, node)
peer = node.protocol.get_rpc_peer(
make_kademlia_peer(
announced_to.protocol.node_id,
@ -144,15 +159,15 @@ class TestBlobAnnouncer(AsyncioTestCase):
response = await peer.store(blob_hash)
self.assertEqual(response, b'OK')
peers_for_blob = await peer.find_value(blob_hash, 0)
if i == 1:
if i == 0:
self.assertNotIn(blob_hash, peers_for_blob)
self.assertEqual(peers_for_blob[b'p'], 0)
else:
self.assertEqual(len(peers_for_blob[blob_hash]), min(i - 1, constants.K))
self.assertEqual(len(announced_to.protocol.data_store.get_peers_for_blob(blob_hash)), i)
self.assertEqual(len(peers_for_blob[blob_hash]), min(i, constants.K))
self.assertEqual(len(announced_to.protocol.data_store.get_peers_for_blob(blob_hash)), i + 1)
if i - 1 > constants.K:
self.assertEqual(len(peers_for_blob[b'contacts']), constants.K)
self.assertEqual(peers_for_blob[b'p'], ((i - 1) // (constants.K + 1)) + 1)
self.assertEqual(peers_for_blob[b'p'], (i // (constants.K + 1)) + 1)
seen = set(peers_for_blob[blob_hash])
self.assertEqual(len(seen), constants.K)
self.assertEqual(len(peers_for_blob[blob_hash]), len(seen))
@ -167,5 +182,5 @@ class TestBlobAnnouncer(AsyncioTestCase):
seen.intersection_update(page_x_set)
total_seen.update(page_x_set)
else:
self.assertEqual(len(peers_for_blob[b'contacts']), i - 1)
self.assertEqual(len(peers_for_blob[b'contacts']), 8) # we always add 8 on first page
self.assertEqual(len(total_seen), peer_count - 2)

View file

@ -12,7 +12,6 @@ from lbry.extras.daemon.storage import SQLiteStorage
class TestNodePingQueueDiscover(AsyncioTestCase):
TIMEOUT = None # not supported as it advances time
async def test_ping_queue_discover(self):
loop = asyncio.get_event_loop()
loop.set_debug(False)
@ -29,7 +28,7 @@ class TestNodePingQueueDiscover(AsyncioTestCase):
(constants.generate_id(9), '1.2.3.9'),
]
with dht_mocks.mock_network_loop(loop):
advance = dht_mocks.get_time_accelerator(loop, loop.time())
advance = dht_mocks.get_time_accelerator(loop)
# start the nodes
nodes: typing.Dict[int, Node] = {
i: Node(loop, PeerManager(loop), node_id, 4444, 4444, 3333, address)
@ -93,7 +92,6 @@ class TestNodePingQueueDiscover(AsyncioTestCase):
class TestTemporarilyLosingConnection(AsyncioTestCase):
TIMEOUT = None # not supported as it advances time
@unittest.SkipTest
async def test_losing_connection(self):
async def wait_for(check_ok, insist, timeout=20):
@ -131,7 +129,7 @@ class TestTemporarilyLosingConnection(AsyncioTestCase):
await asyncio.gather(*[n.joined.wait() for n in nodes])
node = nodes[-1]
advance = dht_mocks.get_time_accelerator(loop, loop.time())
advance = dht_mocks.get_time_accelerator(loop)
await advance(500)
# Join the network, assert that at least the known peers are in RT

19
tests/unit/test_utils.py Normal file
View file

@ -0,0 +1,19 @@
import unittest
from lbry import utils
class UtilsTestCase(unittest.TestCase):
    """Unit tests for helpers exposed by ``lbry.utils``."""

    def test_get_colliding_prefix_bits(self):
        """Check ``get_colliding_prefix_bits`` for 4-byte values against zero.

        Every case compares a 4-byte big-endian value with four zero bytes
        and pairs it with the expected return value. Each case runs in its
        own ``subTest`` so one failing value does not hide the others.
        """
        zero = bytes(4)  # identical to (0).to_bytes(4, "big")
        cases = (
            # (expected result, value compared against ``zero``)
            (0, 0xffffffff),  # most significant bit already differs
            (1, 0x7fffffff),  # only the most significant bit matches
            (8, 0x00ffffff),  # the entire first byte matches
        )
        for expected, value in cases:
            with self.subTest(value=hex(value)):
                self.assertEqual(
                    expected,
                    utils.get_colliding_prefix_bits(value.to_bytes(4, "big"), zero),
                )