Compare commits

...

33 commits

Author SHA1 Message Date
Victor Shyba
f4fa217b71 Merge remote-tracking branch 'origin/dht_bug' into temp_dht_supermerge 2022-02-22 22:38:51 -03:00
Victor Shyba
f68ea01056 simplify, generalize to any size and fix tests 2022-02-22 22:38:04 -03:00
Victor Shyba
6fb1443e63 stop after finding what to download 2022-02-22 16:00:13 -03:00
Victor Shyba
af7574dc9d replace duplicated code 2022-02-22 16:00:13 -03:00
Victor Shyba
af9cc457ec add get_colliding_prefix_bits, docs and tests 2022-02-22 15:59:59 -03:00
Victor Shyba
2a1e1d542f extract method and avoid using hash builtin name 2022-02-22 14:25:52 -03:00
Victor Shyba
f5b3e9bacd implement announcer as a consumer task on gather 2022-02-21 10:55:41 -03:00
Victor Shyba
d00b5befbf make active an explicit ordered dict 2022-02-21 10:52:06 -03:00
Victor Shyba
c1e64df528 remove unused search rounds 2022-02-21 10:52:06 -03:00
Victor Shyba
3e79dcd179 timeout is now supported on dht tests 2022-02-21 10:52:06 -03:00
Victor Shyba
cc104369cb fix and enable test_blob_announcer 2022-02-21 10:52:06 -03:00
Victor Shyba
586b09c1bc simplify dht mock and restore clock after accelerating 2022-02-21 10:52:06 -03:00
Victor Shyba
b574fb7771 better representation of kademliapeer on debug logs 2022-02-21 10:52:06 -03:00
Victor Shyba
51be734a08 add a way to wait announcements to finish so tests are reliable 2022-02-21 10:52:06 -03:00
Victor Shyba
0120d989d8 make timeout handler immune to asyncio time tricks 2022-02-21 10:52:06 -03:00
Victor Shyba
12f156257e allow running some extra probes for k replacements 2022-02-21 10:52:06 -03:00
Victor Shyba
dcde0e78e3 remove all references to bottoming out 2022-02-21 10:52:06 -03:00
Victor Shyba
dc1c0e6851 no stop condition, let it exhaust 2022-02-21 10:52:06 -03:00
Victor Shyba
c45f27d5cc bottoming out is now warning and no results for peer search 2022-02-21 10:52:06 -03:00
Victor Shyba
6335590b65 don't probe peers too far from the top closest 2022-02-21 10:52:06 -03:00
Victor Shyba
b7b8831109 use a dict for the active queue 2022-02-21 10:52:06 -03:00
Victor Shyba
2ed23fbc4b log bottom out of peer search in debug, show short key id for find value 2022-02-21 10:52:06 -03:00
Victor Shyba
023cfb593a bump bottom out limit of peer search so people can use 100 concurrent announcers 2022-02-21 10:52:06 -03:00
Victor Shyba
2884dba52d wait until k peers are ready. do not double add peers 2022-02-21 10:52:06 -03:00
Victor Shyba
44c4b03d44 only return good (contacted) peers 2022-02-21 10:52:06 -03:00
Victor Shyba
6ba8f96511 reset closest peer on failure 2022-02-21 10:52:06 -03:00
Victor Shyba
4987f57944 add peers from shortlist regardless, but check from other nodes 2022-02-21 10:52:06 -03:00
Victor Shyba
f5bf8b8684 bump split index to 2 2022-02-21 10:52:06 -03:00
Victor Shyba
809a8c1226 fix distance sorting and improve logging 2022-02-21 10:52:06 -03:00
Victor Shyba
e319b55db5 closest peer is only ready when it was contacted and isn't known to be bad 2022-02-21 10:52:06 -03:00
Victor Shyba
f274562c92 dont probe and ignore bad peers 2022-02-21 10:52:06 -03:00
Victor Shyba
d1bc981b11 extract min_prefix_colliding_bits to a constant 2022-02-20 22:34:53 -03:00
Victor Shyba
53d78e9194 check that the stored blob is at least 1 prefix byte close to peer id 2022-02-20 22:34:53 -03:00
15 changed files with 221 additions and 207 deletions

View file

@ -622,7 +622,7 @@ class Config(CLIConfig):
"Routing table bucket index below which we always split the bucket if given a new key to add to it and " "Routing table bucket index below which we always split the bucket if given a new key to add to it and "
"the bucket is full. As this value is raised the depth of the routing table (and number of peers in it) " "the bucket is full. As this value is raised the depth of the routing table (and number of peers in it) "
"will increase. This setting is used by seed nodes, you probably don't want to change it during normal " "will increase. This setting is used by seed nodes, you probably don't want to change it during normal "
"use.", 1 "use.", 2
) )
# protocol timeouts # protocol timeouts

View file

@ -27,21 +27,24 @@ class BlobAnnouncer:
self.storage = storage self.storage = storage
self.announce_task: asyncio.Task = None self.announce_task: asyncio.Task = None
self.announce_queue: typing.List[str] = [] self.announce_queue: typing.List[str] = []
self._done = asyncio.Event()
self.announced = set()
async def _submit_announcement(self, blob_hash): async def _run_consumer(self):
try: while self.announce_queue:
try:
peers = len(await self.node.announce_blob(blob_hash)) blob_hash = self.announce_queue.pop()
self.announcements_sent_metric.labels(peers=peers, error=False).inc() peers = len(await self.node.announce_blob(blob_hash))
if peers > 4: self.announcements_sent_metric.labels(peers=peers, error=False).inc()
return blob_hash if peers > 4:
else: self.announced.add(blob_hash)
log.debug("failed to announce %s, could only find %d peers, retrying soon.", blob_hash[:8], peers) else:
except Exception as err: log.debug("failed to announce %s, could only find %d peers, retrying soon.", blob_hash[:8], peers)
self.announcements_sent_metric.labels(peers=0, error=True).inc() except Exception as err:
if isinstance(err, asyncio.CancelledError): # TODO: remove when updated to 3.8 self.announcements_sent_metric.labels(peers=0, error=True).inc()
raise err if isinstance(err, asyncio.CancelledError): # TODO: remove when updated to 3.8
log.warning("error announcing %s: %s", blob_hash[:8], str(err)) raise err
log.warning("error announcing %s: %s", blob_hash[:8], str(err))
async def _announce(self, batch_size: typing.Optional[int] = 10): async def _announce(self, batch_size: typing.Optional[int] = 10):
while batch_size: while batch_size:
@ -56,14 +59,14 @@ class BlobAnnouncer:
log.debug("announcer task wake up, %d blobs to announce", len(self.announce_queue)) log.debug("announcer task wake up, %d blobs to announce", len(self.announce_queue))
while len(self.announce_queue) > 0: while len(self.announce_queue) > 0:
log.info("%i blobs to announce", len(self.announce_queue)) log.info("%i blobs to announce", len(self.announce_queue))
announced = await asyncio.gather(*[ await asyncio.gather(*[self._run_consumer() for _ in range(batch_size)], loop=self.loop)
self._submit_announcement( announced = list(filter(None, self.announced))
self.announce_queue.pop()) for _ in range(batch_size) if self.announce_queue
], loop=self.loop)
announced = list(filter(None, announced))
if announced: if announced:
await self.storage.update_last_announced_blobs(announced) await self.storage.update_last_announced_blobs(announced)
log.info("announced %i blobs", len(announced)) log.info("announced %i blobs", len(announced))
self.announced.clear()
self._done.set()
self._done.clear()
def start(self, batch_size: typing.Optional[int] = 10): def start(self, batch_size: typing.Optional[int] = 10):
assert not self.announce_task or self.announce_task.done(), "already running" assert not self.announce_task or self.announce_task.done(), "already running"
@ -72,3 +75,6 @@ class BlobAnnouncer:
def stop(self): def stop(self):
if self.announce_task and not self.announce_task.done(): if self.announce_task and not self.announce_task.done():
self.announce_task.cancel() self.announce_task.cancel()
def wait(self):
return self._done.wait()

View file

@ -20,7 +20,6 @@ MAYBE_PING_DELAY = 300 # 5 minutes
CHECK_REFRESH_INTERVAL = REFRESH_INTERVAL / 5 CHECK_REFRESH_INTERVAL = REFRESH_INTERVAL / 5
RPC_ID_LENGTH = 20 RPC_ID_LENGTH = 20
PROTOCOL_VERSION = 1 PROTOCOL_VERSION = 1
BOTTOM_OUT_LIMIT = 3
MSG_SIZE_LIMIT = 1400 MSG_SIZE_LIMIT = 1400

View file

@ -202,25 +202,23 @@ class Node:
self._join_task = self.loop.create_task(self.join_network(interface, known_node_urls)) self._join_task = self.loop.create_task(self.join_network(interface, known_node_urls))
def get_iterative_node_finder(self, key: bytes, shortlist: typing.Optional[typing.List['KademliaPeer']] = None, def get_iterative_node_finder(self, key: bytes, shortlist: typing.Optional[typing.List['KademliaPeer']] = None,
bottom_out_limit: int = constants.BOTTOM_OUT_LIMIT,
max_results: int = constants.K) -> IterativeNodeFinder: max_results: int = constants.K) -> IterativeNodeFinder:
return IterativeNodeFinder(self.loop, self.protocol.peer_manager, self.protocol.routing_table, self.protocol, return IterativeNodeFinder(self.loop, self.protocol.peer_manager, self.protocol.routing_table, self.protocol,
key, bottom_out_limit, max_results, None, shortlist) key, max_results, None, shortlist)
def get_iterative_value_finder(self, key: bytes, shortlist: typing.Optional[typing.List['KademliaPeer']] = None, def get_iterative_value_finder(self, key: bytes, shortlist: typing.Optional[typing.List['KademliaPeer']] = None,
bottom_out_limit: int = 40,
max_results: int = -1) -> IterativeValueFinder: max_results: int = -1) -> IterativeValueFinder:
return IterativeValueFinder(self.loop, self.protocol.peer_manager, self.protocol.routing_table, self.protocol, return IterativeValueFinder(self.loop, self.protocol.peer_manager, self.protocol.routing_table, self.protocol,
key, bottom_out_limit, max_results, None, shortlist) key, max_results, None, shortlist)
async def peer_search(self, node_id: bytes, count=constants.K, max_results=constants.K * 2, async def peer_search(self, node_id: bytes, count=constants.K, max_results=constants.K * 2,
bottom_out_limit=20, shortlist: typing.Optional[typing.List['KademliaPeer']] = None shortlist: typing.Optional[typing.List['KademliaPeer']] = None
) -> typing.List['KademliaPeer']: ) -> typing.List['KademliaPeer']:
peers = [] peers = []
async for iteration_peers in self.get_iterative_node_finder( async for iteration_peers in self.get_iterative_node_finder(
node_id, shortlist=shortlist, bottom_out_limit=bottom_out_limit, max_results=max_results): node_id, shortlist=shortlist, max_results=max_results):
peers.extend(iteration_peers) peers.extend(iteration_peers)
distance = Distance(node_id) distance = Distance(node_id)
peers.sort(key=lambda peer: distance(peer.node_id)) peers.sort(key=lambda peer: distance(peer.node_id))

View file

@ -190,3 +190,6 @@ class KademliaPeer:
def compact_ip(self): def compact_ip(self):
return make_compact_ip(self.address) return make_compact_ip(self.address)
def __str__(self):
return f"{self.__class__.__name__}({self.node_id.hex()[:8]}@{self.address}:{self.udp_port}-{self.tcp_port})"

View file

@ -1,6 +1,6 @@
import asyncio import asyncio
from itertools import chain from itertools import chain
from collections import defaultdict from collections import defaultdict, OrderedDict
import typing import typing
import logging import logging
from typing import TYPE_CHECKING from typing import TYPE_CHECKING
@ -74,7 +74,7 @@ def get_shortlist(routing_table: 'TreeRoutingTable', key: bytes,
class IterativeFinder: class IterativeFinder:
def __init__(self, loop: asyncio.AbstractEventLoop, peer_manager: 'PeerManager', def __init__(self, loop: asyncio.AbstractEventLoop, peer_manager: 'PeerManager',
routing_table: 'TreeRoutingTable', protocol: 'KademliaProtocol', key: bytes, routing_table: 'TreeRoutingTable', protocol: 'KademliaProtocol', key: bytes,
bottom_out_limit: typing.Optional[int] = 2, max_results: typing.Optional[int] = constants.K, max_results: typing.Optional[int] = constants.K,
exclude: typing.Optional[typing.List[typing.Tuple[str, int]]] = None, exclude: typing.Optional[typing.List[typing.Tuple[str, int]]] = None,
shortlist: typing.Optional[typing.List['KademliaPeer']] = None): shortlist: typing.Optional[typing.List['KademliaPeer']] = None):
if len(key) != constants.HASH_LENGTH: if len(key) != constants.HASH_LENGTH:
@ -85,28 +85,22 @@ class IterativeFinder:
self.protocol = protocol self.protocol = protocol
self.key = key self.key = key
self.bottom_out_limit = bottom_out_limit self.max_results = max(constants.K, max_results)
self.max_results = max_results
self.exclude = exclude or [] self.exclude = exclude or []
self.active: typing.Set['KademliaPeer'] = set() self.active: typing.Dict['KademliaPeer', int] = OrderedDict() # peer: distance, sorted
self.contacted: typing.Set['KademliaPeer'] = set() self.contacted: typing.Set['KademliaPeer'] = set()
self.distance = Distance(key) self.distance = Distance(key)
self.closest_peer: typing.Optional['KademliaPeer'] = None
self.prev_closest_peer: typing.Optional['KademliaPeer'] = None
self.iteration_queue = asyncio.Queue(loop=self.loop) self.iteration_queue = asyncio.Queue(loop=self.loop)
self.running_probes: typing.Set[asyncio.Task] = set() self.running_probes: typing.Dict['KademliaPeer', asyncio.Task] = {}
self.iteration_count = 0 self.iteration_count = 0
self.bottom_out_count = 0
self.running = False self.running = False
self.tasks: typing.List[asyncio.Task] = [] self.tasks: typing.List[asyncio.Task] = []
self.delayed_calls: typing.List[asyncio.Handle] = []
for peer in get_shortlist(routing_table, key, shortlist): for peer in get_shortlist(routing_table, key, shortlist):
if peer.node_id: if peer.node_id:
self._add_active(peer) self._add_active(peer, force=True)
else: else:
# seed nodes # seed nodes
self._schedule_probe(peer) self._schedule_probe(peer)
@ -138,15 +132,14 @@ class IterativeFinder:
""" """
return [] return []
def _is_closer(self, peer: 'KademliaPeer') -> bool: def _add_active(self, peer, force=False):
return not self.closest_peer or self.distance.is_closer(peer.node_id, self.closest_peer.node_id) if not force and self.peer_manager.peer_is_good(peer) is False:
return
def _add_active(self, peer): if peer in self.contacted:
return
if peer not in self.active and peer.node_id and peer.node_id != self.protocol.node_id: if peer not in self.active and peer.node_id and peer.node_id != self.protocol.node_id:
self.active.add(peer) self.active[peer] = self.distance(peer.node_id)
if self._is_closer(peer): self.active = OrderedDict(sorted(self.active.items(), key=lambda item: item[1]))
self.prev_closest_peer = self.closest_peer
self.closest_peer = peer
async def _handle_probe_result(self, peer: 'KademliaPeer', response: FindResponse): async def _handle_probe_result(self, peer: 'KademliaPeer', response: FindResponse):
self._add_active(peer) self._add_active(peer)
@ -158,33 +151,43 @@ class IterativeFinder:
log.warning("misbehaving peer %s:%i returned peer with reserved ip %s:%i", peer.address, log.warning("misbehaving peer %s:%i returned peer with reserved ip %s:%i", peer.address,
peer.udp_port, address, udp_port) peer.udp_port, address, udp_port)
self.check_result_ready(response) self.check_result_ready(response)
self._log_state()
def _reset_closest(self, peer):
if peer in self.active:
del self.active[peer]
async def _send_probe(self, peer: 'KademliaPeer'): async def _send_probe(self, peer: 'KademliaPeer'):
try: try:
response = await self.send_probe(peer) response = await self.send_probe(peer)
except asyncio.TimeoutError: except asyncio.TimeoutError:
self.active.discard(peer) self._reset_closest(peer)
return return
except ValueError as err: except ValueError as err:
log.warning(str(err)) log.warning(str(err))
self.active.discard(peer) self._reset_closest(peer)
return return
except TransportNotConnected: except TransportNotConnected:
return self.aclose() return self.aclose()
except RemoteException: except RemoteException:
self._reset_closest(peer)
return return
return await self._handle_probe_result(peer, response) return await self._handle_probe_result(peer, response)
async def _search_round(self): def _search_round(self):
""" """
Send up to constants.alpha (5) probes to closest active peers Send up to constants.alpha (5) probes to closest active peers
""" """
added = 0 added = 0
to_probe = list(self.active - self.contacted) for index, peer in enumerate(self.active.keys()):
to_probe.sort(key=lambda peer: self.distance(self.key)) if index == 0:
for peer in to_probe: log.debug("closest to probe: %s", peer.node_id.hex()[:8])
if added >= constants.ALPHA: if peer in self.contacted:
continue
if len(self.running_probes) >= constants.ALPHA:
break
if index > (constants.K + len(self.running_probes)):
break break
origin_address = (peer.address, peer.udp_port) origin_address = (peer.address, peer.udp_port)
if origin_address in self.exclude: if origin_address in self.exclude:
@ -206,33 +209,22 @@ class IterativeFinder:
t = self.loop.create_task(self._send_probe(peer)) t = self.loop.create_task(self._send_probe(peer))
def callback(_): def callback(_):
self.running_probes.difference_update({ self.running_probes.pop(peer, None)
probe for probe in self.running_probes if probe.done() or probe == t if self.running:
}) self._search_round()
if not self.running_probes:
self.tasks.append(self.loop.create_task(self._search_task(0.0)))
t.add_done_callback(callback) t.add_done_callback(callback)
self.running_probes.add(t) self.running_probes[peer] = t
async def _search_task(self, delay: typing.Optional[float] = constants.ITERATIVE_LOOKUP_DELAY): def _log_state(self):
try: log.debug("[%s] check result: %i active nodes %i contacted",
if self.running: self.key.hex()[:8], len(self.active), len(self.contacted))
await self._search_round()
if self.running:
self.delayed_calls.append(self.loop.call_later(delay, self._search))
except (asyncio.CancelledError, StopAsyncIteration, TransportNotConnected):
if self.running:
self.loop.call_soon(self.aclose)
def _search(self):
self.tasks.append(self.loop.create_task(self._search_task()))
def __aiter__(self): def __aiter__(self):
if self.running: if self.running:
raise Exception("already running") raise Exception("already running")
self.running = True self.running = True
self._search() self.loop.call_soon(self._search_round)
return self return self
async def __anext__(self) -> typing.List['KademliaPeer']: async def __anext__(self) -> typing.List['KademliaPeer']:
@ -252,20 +244,19 @@ class IterativeFinder:
def aclose(self): def aclose(self):
self.running = False self.running = False
self.iteration_queue.put_nowait(None) self.iteration_queue.put_nowait(None)
for task in chain(self.tasks, self.running_probes, self.delayed_calls): for task in chain(self.tasks, self.running_probes.values()):
task.cancel() task.cancel()
self.tasks.clear() self.tasks.clear()
self.running_probes.clear() self.running_probes.clear()
self.delayed_calls.clear()
class IterativeNodeFinder(IterativeFinder): class IterativeNodeFinder(IterativeFinder):
def __init__(self, loop: asyncio.AbstractEventLoop, peer_manager: 'PeerManager', def __init__(self, loop: asyncio.AbstractEventLoop, peer_manager: 'PeerManager',
routing_table: 'TreeRoutingTable', protocol: 'KademliaProtocol', key: bytes, routing_table: 'TreeRoutingTable', protocol: 'KademliaProtocol', key: bytes,
bottom_out_limit: typing.Optional[int] = 2, max_results: typing.Optional[int] = constants.K, max_results: typing.Optional[int] = constants.K,
exclude: typing.Optional[typing.List[typing.Tuple[str, int]]] = None, exclude: typing.Optional[typing.List[typing.Tuple[str, int]]] = None,
shortlist: typing.Optional[typing.List['KademliaPeer']] = None): shortlist: typing.Optional[typing.List['KademliaPeer']] = None):
super().__init__(loop, peer_manager, routing_table, protocol, key, bottom_out_limit, max_results, exclude, super().__init__(loop, peer_manager, routing_table, protocol, key, max_results, exclude,
shortlist) shortlist)
self.yielded_peers: typing.Set['KademliaPeer'] = set() self.yielded_peers: typing.Set['KademliaPeer'] = set()
@ -276,14 +267,14 @@ class IterativeNodeFinder(IterativeFinder):
return FindNodeResponse(self.key, response) return FindNodeResponse(self.key, response)
def search_exhausted(self): def search_exhausted(self):
self.put_result(self.active, finish=True) self.put_result(self.active.keys(), finish=True)
def put_result(self, from_iter: typing.Iterable['KademliaPeer'], finish=False): def put_result(self, from_iter: typing.Iterable['KademliaPeer'], finish=False):
not_yet_yielded = [ not_yet_yielded = [
peer for peer in from_iter peer for peer in from_iter
if peer not in self.yielded_peers if peer not in self.yielded_peers
and peer.node_id != self.protocol.node_id and peer.node_id != self.protocol.node_id
and self.peer_manager.peer_is_good(peer) is not False and self.peer_manager.peer_is_good(peer) is True # return only peers who answered
] ]
not_yet_yielded.sort(key=lambda peer: self.distance(peer.node_id)) not_yet_yielded.sort(key=lambda peer: self.distance(peer.node_id))
to_yield = not_yet_yielded[:max(constants.K, self.max_results)] to_yield = not_yet_yielded[:max(constants.K, self.max_results)]
@ -298,26 +289,16 @@ class IterativeNodeFinder(IterativeFinder):
if found: if found:
log.debug("found") log.debug("found")
return self.put_result(self.active, finish=True) return self.put_result(self.active.keys(), finish=True)
if self.prev_closest_peer and self.closest_peer and not self._is_closer(self.prev_closest_peer):
# log.info("improving, %i %i %i %i %i", len(self.shortlist), len(self.active), len(self.contacted),
# self.bottom_out_count, self.iteration_count)
self.bottom_out_count = 0
elif self.prev_closest_peer and self.closest_peer:
self.bottom_out_count += 1
log.info("bottom out %i %i %i", len(self.active), len(self.contacted), self.bottom_out_count)
if self.bottom_out_count >= self.bottom_out_limit or self.iteration_count >= self.bottom_out_limit:
log.info("limit hit")
self.put_result(self.active, True)
class IterativeValueFinder(IterativeFinder): class IterativeValueFinder(IterativeFinder):
def __init__(self, loop: asyncio.AbstractEventLoop, peer_manager: 'PeerManager', def __init__(self, loop: asyncio.AbstractEventLoop, peer_manager: 'PeerManager',
routing_table: 'TreeRoutingTable', protocol: 'KademliaProtocol', key: bytes, routing_table: 'TreeRoutingTable', protocol: 'KademliaProtocol', key: bytes,
bottom_out_limit: typing.Optional[int] = 2, max_results: typing.Optional[int] = constants.K, max_results: typing.Optional[int] = constants.K,
exclude: typing.Optional[typing.List[typing.Tuple[str, int]]] = None, exclude: typing.Optional[typing.List[typing.Tuple[str, int]]] = None,
shortlist: typing.Optional[typing.List['KademliaPeer']] = None): shortlist: typing.Optional[typing.List['KademliaPeer']] = None):
super().__init__(loop, peer_manager, routing_table, protocol, key, bottom_out_limit, max_results, exclude, super().__init__(loop, peer_manager, routing_table, protocol, key, max_results, exclude,
shortlist) shortlist)
self.blob_peers: typing.Set['KademliaPeer'] = set() self.blob_peers: typing.Set['KademliaPeer'] = set()
# this tracks the index of the most recent page we requested from each peer # this tracks the index of the most recent page we requested from each peer
@ -362,23 +343,12 @@ class IterativeValueFinder(IterativeFinder):
blob_peers = [self.peer_manager.decode_tcp_peer_from_compact_address(compact_addr) blob_peers = [self.peer_manager.decode_tcp_peer_from_compact_address(compact_addr)
for compact_addr in response.found_compact_addresses] for compact_addr in response.found_compact_addresses]
to_yield = [] to_yield = []
self.bottom_out_count = 0
for blob_peer in blob_peers: for blob_peer in blob_peers:
if blob_peer not in self.blob_peers: if blob_peer not in self.blob_peers:
self.blob_peers.add(blob_peer) self.blob_peers.add(blob_peer)
to_yield.append(blob_peer) to_yield.append(blob_peer)
if to_yield: if to_yield:
# log.info("found %i new peers for blob", len(to_yield))
self.iteration_queue.put_nowait(to_yield) self.iteration_queue.put_nowait(to_yield)
# if self.max_results and len(self.blob_peers) >= self.max_results:
# log.info("enough blob peers found")
# if not self.finished.is_set():
# self.finished.set()
elif self.prev_closest_peer and self.closest_peer:
self.bottom_out_count += 1
if self.bottom_out_count >= self.bottom_out_limit:
log.info("blob peer search bottomed out")
self.iteration_queue.put_nowait(None)
def get_initial_result(self) -> typing.List['KademliaPeer']: def get_initial_result(self) -> typing.List['KademliaPeer']:
if self.protocol.data_store.has_peers_for_blob(self.key): if self.protocol.data_store.has_peers_for_blob(self.key):

View file

@ -6,6 +6,7 @@ import itertools
from prometheus_client import Gauge from prometheus_client import Gauge
from lbry import utils
from lbry.dht import constants from lbry.dht import constants
from lbry.dht.protocol.distance import Distance from lbry.dht.protocol.distance import Distance
if typing.TYPE_CHECKING: if typing.TYPE_CHECKING:
@ -70,9 +71,8 @@ class KBucket:
if len(self.peers) < constants.K: if len(self.peers) < constants.K:
self.peers.append(peer) self.peers.append(peer)
self.peer_in_routing_table_metric.labels("global").inc() self.peer_in_routing_table_metric.labels("global").inc()
if peer.node_id[0] == self._node_id[0]: bits_colliding = utils.get_colliding_prefix_bits(peer.node_id, self._node_id)
bits_colliding = 8 - (peer.node_id[1] ^ self._node_id[1]).bit_length() self.peer_with_x_bit_colliding_metric.labels(amount=bits_colliding).inc()
self.peer_with_x_bit_colliding_metric.labels(amount=(bits_colliding + 8)).inc()
return True return True
else: else:
return False return False
@ -140,9 +140,8 @@ class KBucket:
def remove_peer(self, peer: 'KademliaPeer') -> None: def remove_peer(self, peer: 'KademliaPeer') -> None:
self.peers.remove(peer) self.peers.remove(peer)
self.peer_in_routing_table_metric.labels("global").dec() self.peer_in_routing_table_metric.labels("global").dec()
if peer.node_id[0] == self._node_id[0]: bits_colliding = utils.get_colliding_prefix_bits(peer.node_id, self._node_id)
bits_colliding = 8 - (peer.node_id[1] ^ self._node_id[1]).bit_length() self.peer_with_x_bit_colliding_metric.labels(amount=bits_colliding).dec()
self.peer_with_x_bit_colliding_metric.labels(amount=(bits_colliding + 8)).dec()
def key_in_range(self, key: bytes) -> bool: def key_in_range(self, key: bytes) -> bool:
""" Tests whether the specified key (i.e. node ID) is in the range """ Tests whether the specified key (i.e. node ID) is in the range

View file

@ -381,6 +381,7 @@ class FileManagerComponent(Component):
class BackgroundDownloaderComponent(Component): class BackgroundDownloaderComponent(Component):
MIN_PREFIX_COLLIDING_BITS = 8
component_name = BACKGROUND_DOWNLOADER_COMPONENT component_name = BACKGROUND_DOWNLOADER_COMPONENT
depends_on = [DATABASE_COMPONENT, BLOB_COMPONENT, DISK_SPACE_COMPONENT] depends_on = [DATABASE_COMPONENT, BLOB_COMPONENT, DISK_SPACE_COMPONENT]
@ -412,12 +413,18 @@ class BackgroundDownloaderComponent(Component):
while True: while True:
self.space_available = await self.space_manager.get_free_space_mb(True) self.space_available = await self.space_manager.get_free_space_mb(True)
if not self.is_busy and self.space_available > 10: if not self.is_busy and self.space_available > 10:
blob_hash = next((key.hex() for key in self.dht_node.stored_blob_hashes if self._download_next_close_blob_hash()
key.hex() not in self.blob_manager.completed_blob_hashes), None)
if blob_hash:
self.ongoing_download = asyncio.create_task(self.background_downloader.download_blobs(blob_hash))
await asyncio.sleep(self.download_loop_delay_seconds) await asyncio.sleep(self.download_loop_delay_seconds)
def _download_next_close_blob_hash(self):
node_id = self.dht_node.protocol.node_id
for blob_hash in self.dht_node.stored_blob_hashes:
if blob_hash.hex() in self.blob_manager.completed_blob_hashes:
continue
if utils.get_colliding_prefix_bits(node_id, blob_hash) >= self.MIN_PREFIX_COLLIDING_BITS:
self.ongoing_download = asyncio.create_task(self.background_downloader.download_blobs(blob_hash.hex()))
return
async def start(self): async def start(self):
self.space_manager: DiskSpaceManager = self.component_manager.get_component(DISK_SPACE_COMPONENT) self.space_manager: DiskSpaceManager = self.component_manager.get_component(DISK_SPACE_COMPONENT)
if not self.component_manager.has_component(DHT_COMPONENT): if not self.component_manager.has_component(DHT_COMPONENT):

View file

@ -4885,20 +4885,16 @@ class Daemon(metaclass=JSONRPCServerType):
""" """
@requires(DHT_COMPONENT) @requires(DHT_COMPONENT)
async def jsonrpc_peer_list(self, blob_hash, search_bottom_out_limit=None, page=None, page_size=None): async def jsonrpc_peer_list(self, blob_hash, page=None, page_size=None):
""" """
Get peers for blob hash Get peers for blob hash
Usage: Usage:
peer_list (<blob_hash> | --blob_hash=<blob_hash>) peer_list (<blob_hash> | --blob_hash=<blob_hash>)
[<search_bottom_out_limit> | --search_bottom_out_limit=<search_bottom_out_limit>]
[--page=<page>] [--page_size=<page_size>] [--page=<page>] [--page_size=<page_size>]
Options: Options:
--blob_hash=<blob_hash> : (str) find available peers for this blob hash --blob_hash=<blob_hash> : (str) find available peers for this blob hash
--search_bottom_out_limit=<search_bottom_out_limit> : (int) the number of search probes in a row
that don't find any new peers
before giving up and returning
--page=<page> : (int) page to return during paginating --page=<page> : (int) page to return during paginating
--page_size=<page_size> : (int) number of items on page during pagination --page_size=<page_size> : (int) number of items on page during pagination
@ -4910,13 +4906,6 @@ class Daemon(metaclass=JSONRPCServerType):
if not is_valid_blobhash(blob_hash): if not is_valid_blobhash(blob_hash):
# TODO: use error from lbry.error # TODO: use error from lbry.error
raise Exception("invalid blob hash") raise Exception("invalid blob hash")
if search_bottom_out_limit is not None:
search_bottom_out_limit = int(search_bottom_out_limit)
if search_bottom_out_limit <= 0:
# TODO: use error from lbry.error
raise Exception("invalid bottom out limit")
else:
search_bottom_out_limit = 4
peers = [] peers = []
peer_q = asyncio.Queue(loop=self.component_manager.loop) peer_q = asyncio.Queue(loop=self.component_manager.loop)
await self.dht_node._peers_for_value_producer(blob_hash, peer_q) await self.dht_node._peers_for_value_producer(blob_hash, peer_q)

View file

@ -204,7 +204,13 @@ class AsyncioTestCase(unittest.TestCase):
def add_timeout(self): def add_timeout(self):
if self.TIMEOUT: if self.TIMEOUT:
self.loop.call_later(self.TIMEOUT, self.cancel) self.loop.call_later(self.TIMEOUT, self.check_timeout, time())
def check_timeout(self, started):
if time() - started >= self.TIMEOUT:
self.cancel()
else:
self.loop.call_later(self.TIMEOUT, self.check_timeout, started)
class AdvanceTimeTestCase(AsyncioTestCase): class AdvanceTimeTestCase(AsyncioTestCase):

View file

@ -474,3 +474,18 @@ class LockWithMetrics(asyncio.Lock):
return super().release() return super().release()
finally: finally:
self._lock_held_time_metric.observe(time.perf_counter() - self._lock_acquired_time) self._lock_held_time_metric.observe(time.perf_counter() - self._lock_acquired_time)
def get_colliding_prefix_bits(first_value: bytes, second_value: bytes):
"""
Calculates the amount of colliding prefix bits between <first_value> and <second_value>.
This is given by the amount of bits that are the same until the first different one (via XOR),
starting from the most significant bit to the least significant bit.
:param first_value: first value to compare, bigger than size.
:param second_value: second value to compare, bigger than size.
:return: amount of prefix colliding bits.
"""
assert len(first_value) == len(second_value), "length should be the same"
size = len(first_value) * 8
first_value, second_value = int.from_bytes(first_value, "big"), int.from_bytes(second_value, "big")
return size - (first_value ^ second_value).bit_length()

View file

@ -9,7 +9,7 @@ if typing.TYPE_CHECKING:
def get_time_accelerator(loop: asyncio.AbstractEventLoop, def get_time_accelerator(loop: asyncio.AbstractEventLoop,
now: typing.Optional[float] = None) -> typing.Callable[[float], typing.Awaitable[None]]: instant_step: bool = False) -> typing.Callable[[float], typing.Awaitable[None]]:
""" """
Returns an async advance() function Returns an async advance() function
@ -17,32 +17,22 @@ def get_time_accelerator(loop: asyncio.AbstractEventLoop,
made by call_later, call_at, and call_soon. made by call_later, call_at, and call_soon.
""" """
_time = now or loop.time() original = loop.time
loop.time = functools.wraps(loop.time)(lambda: _time) _drift = 0
loop.time = functools.wraps(loop.time)(lambda: original() + _drift)
async def accelerate_time(seconds: float) -> None: async def accelerate_time(seconds: float) -> None:
nonlocal _time nonlocal _drift
if seconds < 0: if seconds < 0:
raise ValueError(f'Cannot go back in time ({seconds} seconds)') raise ValueError(f'Cannot go back in time ({seconds} seconds)')
_time += seconds _drift += seconds
await past_events()
await asyncio.sleep(0) await asyncio.sleep(0)
async def past_events() -> None:
while loop._scheduled:
timer: asyncio.TimerHandle = loop._scheduled[0]
if timer not in loop._ready and timer._when <= _time:
loop._scheduled.remove(timer)
loop._ready.append(timer)
if timer._when > _time:
break
await asyncio.sleep(0)
async def accelerator(seconds: float): async def accelerator(seconds: float):
steps = seconds * 10.0 steps = seconds * 10.0 if not instant_step else 1
for _ in range(max(int(steps), 1)): for _ in range(max(int(steps), 1)):
await accelerate_time(0.1) await accelerate_time(seconds/steps)
return accelerator return accelerator

View file

@ -1,60 +1,68 @@
import contextlib import contextlib
import logging
import typing import typing
import binascii import binascii
import socket import socket
import asyncio import asyncio
from lbry.testcase import AsyncioTestCase from lbry.testcase import AsyncioTestCase
from tests import dht_mocks from tests import dht_mocks
from lbry.dht.protocol.distance import Distance
from lbry.conf import Config from lbry.conf import Config
from lbry.dht import constants from lbry.dht import constants
from lbry.dht.node import Node from lbry.dht.node import Node
from lbry.dht.peer import PeerManager, make_kademlia_peer from lbry.dht.peer import PeerManager, make_kademlia_peer
from lbry.dht.blob_announcer import BlobAnnouncer from lbry.dht.blob_announcer import BlobAnnouncer
from lbry.extras.daemon.storage import SQLiteStorage from lbry.extras.daemon.storage import SQLiteStorage
from unittest import skip
class TestBlobAnnouncer(AsyncioTestCase): class TestBlobAnnouncer(AsyncioTestCase):
async def setup_node(self, peer_addresses, address, node_id): async def setup_node(self, peer_addresses, address, node_id):
self.nodes: typing.Dict[int, Node] = {} self.nodes: typing.Dict[int, Node] = {}
self.advance = dht_mocks.get_time_accelerator(self.loop, self.loop.time()) self.advance = dht_mocks.get_time_accelerator(self.loop)
self.instant_advance = dht_mocks.get_time_accelerator(self.loop)
self.conf = Config() self.conf = Config()
self.storage = SQLiteStorage(self.conf, ":memory:", self.loop, self.loop.time)
await self.storage.open()
self.peer_manager = PeerManager(self.loop) self.peer_manager = PeerManager(self.loop)
self.node = Node(self.loop, self.peer_manager, node_id, 4444, 4444, 3333, address) self.node = Node(self.loop, self.peer_manager, node_id, 4444, 4444, 3333, address)
await self.node.start_listening(address) await self.node.start_listening(address)
self.blob_announcer = BlobAnnouncer(self.loop, self.node, self.storage) await asyncio.gather(*[self.add_peer(node_id, address) for node_id, address in peer_addresses])
for node_id, address in peer_addresses: for first_peer in self.nodes.values():
await self.add_peer(node_id, address) for second_peer in self.nodes.values():
if first_peer == second_peer:
continue
self.add_peer_to_routing_table(first_peer, second_peer)
self.add_peer_to_routing_table(second_peer, first_peer)
await self.advance(0.1) # just to make pings go through
self.node.joined.set() self.node.joined.set()
self.node._refresh_task = self.loop.create_task(self.node.refresh_node()) self.node._refresh_task = self.loop.create_task(self.node.refresh_node())
self.storage = SQLiteStorage(self.conf, ":memory:", self.loop, self.loop.time)
await self.storage.open()
self.blob_announcer = BlobAnnouncer(self.loop, self.node, self.storage)
async def add_peer(self, node_id, address, add_to_routing_table=True): async def add_peer(self, node_id, address, add_to_routing_table=True):
#print('add', node_id.hex()[:8], address)
n = Node(self.loop, PeerManager(self.loop), node_id, 4444, 4444, 3333, address) n = Node(self.loop, PeerManager(self.loop), node_id, 4444, 4444, 3333, address)
await n.start_listening(address) await n.start_listening(address)
self.nodes.update({len(self.nodes): n}) self.nodes.update({len(self.nodes): n})
if add_to_routing_table: if add_to_routing_table:
self.node.protocol.add_peer( self.add_peer_to_routing_table(self.node, n)
make_kademlia_peer(
n.protocol.node_id, n.protocol.external_ip, n.protocol.udp_port def add_peer_to_routing_table(self, adder, being_added):
) adder.protocol.add_peer(
make_kademlia_peer(
being_added.protocol.node_id, being_added.protocol.external_ip, being_added.protocol.udp_port
) )
)
@contextlib.asynccontextmanager @contextlib.asynccontextmanager
async def _test_network_context(self, peer_addresses=None): async def _test_network_context(self, peer_count=200):
self.peer_addresses = peer_addresses or [ self.peer_addresses = [
(constants.generate_id(2), '1.2.3.2'), (constants.generate_id(i), socket.inet_ntoa(int(i + 0x01000001).to_bytes(length=4, byteorder='big')))
(constants.generate_id(3), '1.2.3.3'), for i in range(1, peer_count + 1)
(constants.generate_id(4), '1.2.3.4'),
(constants.generate_id(5), '1.2.3.5'),
(constants.generate_id(6), '1.2.3.6'),
(constants.generate_id(7), '1.2.3.7'),
(constants.generate_id(8), '1.2.3.8'),
(constants.generate_id(9), '1.2.3.9'),
] ]
try: try:
with dht_mocks.mock_network_loop(self.loop): with dht_mocks.mock_network_loop(self.loop):
await self.setup_node(self.peer_addresses, '1.2.3.1', constants.generate_id(1)) await self.setup_node(self.peer_addresses, '1.2.3.1', constants.generate_id(1000))
yield yield
finally: finally:
self.blob_announcer.stop() self.blob_announcer.stop()
@ -73,43 +81,58 @@ class TestBlobAnnouncer(AsyncioTestCase):
) )
) )
await peer.ping() await peer.ping()
return peer return last_node
@skip("Something from a previous test is leaking into this test and causing it to fail intermittently")
async def test_announce_blobs(self): async def test_announce_blobs(self):
blob1 = binascii.hexlify(b'1' * 48).decode() blob1 = binascii.hexlify(b'1' * 48).decode()
blob2 = binascii.hexlify(b'2' * 48).decode() blob2 = binascii.hexlify(b'2' * 48).decode()
async with self._test_network_context(): async with self._test_network_context(peer_count=100):
await self.storage.add_blobs((blob1, 1024), (blob2, 1024), finished=True) await self.storage.add_blobs((blob1, 1024, 0, True), (blob2, 1024, 0, True), finished=True)
await self.storage.db.execute( await self.storage.add_blobs(
"update blob set next_announce_time=0, should_announce=1 where blob_hash in (?, ?)", *((constants.generate_id(value).hex(), 1024, 0, True) for value in range(1000, 1090)),
(blob1, blob2) finished=True)
) await self.storage.db.execute("update blob set next_announce_time=0, should_announce=1")
to_announce = await self.storage.get_blobs_to_announce() to_announce = await self.storage.get_blobs_to_announce()
self.assertEqual(2, len(to_announce)) self.assertEqual(92, len(to_announce))
self.blob_announcer.start(batch_size=1) # so it covers batching logic self.blob_announcer.start(batch_size=10) # so it covers batching logic
# takes 60 seconds to start, but we advance 120 to ensure it processed all batches # takes 60 seconds to start, but we advance 120 to ensure it processed all batches
await self.advance(60.0 * 2) ongoing_announcements = asyncio.ensure_future(self.blob_announcer.wait())
await self.instant_advance(60.0)
await ongoing_announcements
to_announce = await self.storage.get_blobs_to_announce() to_announce = await self.storage.get_blobs_to_announce()
self.assertEqual(0, len(to_announce)) self.assertEqual(0, len(to_announce))
self.blob_announcer.stop() self.blob_announcer.stop()
# as routing table pollution will cause some peers to be hard to reach, we add a tolerance for CI
tolerance = 0.8 # at least 80% of the announcements are within the top K
for blob in await self.storage.get_all_blob_hashes():
distance = Distance(bytes.fromhex(blob))
candidates = list(self.nodes.values())
candidates.sort(key=lambda sorting_node: distance(sorting_node.protocol.node_id))
has_it = 0
for index, node in enumerate(candidates[:constants.K], start=1):
if node.protocol.data_store.get_peers_for_blob(bytes.fromhex(blob)):
has_it += 1
else:
logging.warning("blob %s wasnt found between the best K (%s)", blob[:8], node.protocol.node_id.hex()[:8])
self.assertGreaterEqual(has_it, int(tolerance * constants.K))
# test that we can route from a poorly connected peer all the way to the announced blob # test that we can route from a poorly connected peer all the way to the announced blob
await self.chain_peer(constants.generate_id(10), '1.2.3.10') current = len(self.nodes)
await self.chain_peer(constants.generate_id(11), '1.2.3.11') await self.chain_peer(constants.generate_id(current + 1), '1.2.3.10')
await self.chain_peer(constants.generate_id(12), '1.2.3.12') await self.chain_peer(constants.generate_id(current + 2), '1.2.3.11')
await self.chain_peer(constants.generate_id(13), '1.2.3.13') await self.chain_peer(constants.generate_id(current + 3), '1.2.3.12')
await self.chain_peer(constants.generate_id(14), '1.2.3.14') await self.chain_peer(constants.generate_id(current + 4), '1.2.3.13')
await self.advance(61.0) last = await self.chain_peer(constants.generate_id(current + 5), '1.2.3.14')
last = self.nodes[len(self.nodes) - 1]
search_q, peer_q = asyncio.Queue(loop=self.loop), asyncio.Queue(loop=self.loop) search_q, peer_q = asyncio.Queue(loop=self.loop), asyncio.Queue(loop=self.loop)
search_q.put_nowait(blob1) search_q.put_nowait(blob1)
_, task = last.accumulate_peers(search_q, peer_q) _, task = last.accumulate_peers(search_q, peer_q)
found_peers = await peer_q.get() found_peers = await asyncio.wait_for(peer_q.get(), 1.0)
task.cancel() task.cancel()
self.assertEqual(1, len(found_peers)) self.assertEqual(1, len(found_peers))
@ -119,21 +142,13 @@ class TestBlobAnnouncer(AsyncioTestCase):
async def test_popular_blob(self): async def test_popular_blob(self):
peer_count = 150 peer_count = 150
addresses = [ blob_hash = constants.generate_id(99999)
(constants.generate_id(i + 1), socket.inet_ntoa(int(i + 0x01000001).to_bytes(length=4, byteorder='big')))
for i in range(peer_count)
]
blob_hash = b'1' * 48
async with self._test_network_context(peer_addresses=addresses): async with self._test_network_context(peer_count=peer_count):
total_seen = set() total_seen = set()
announced_to = self.nodes[0] announced_to = self.nodes.pop(0)
for i in range(1, peer_count): for i, node in enumerate(self.nodes.values()):
node = self.nodes[i] self.add_peer_to_routing_table(announced_to, node)
kad_peer = make_kademlia_peer(
node.protocol.node_id, node.protocol.external_ip, node.protocol.udp_port
)
await announced_to.protocol._add_peer(kad_peer)
peer = node.protocol.get_rpc_peer( peer = node.protocol.get_rpc_peer(
make_kademlia_peer( make_kademlia_peer(
announced_to.protocol.node_id, announced_to.protocol.node_id,
@ -144,15 +159,15 @@ class TestBlobAnnouncer(AsyncioTestCase):
response = await peer.store(blob_hash) response = await peer.store(blob_hash)
self.assertEqual(response, b'OK') self.assertEqual(response, b'OK')
peers_for_blob = await peer.find_value(blob_hash, 0) peers_for_blob = await peer.find_value(blob_hash, 0)
if i == 1: if i == 0:
self.assertNotIn(blob_hash, peers_for_blob) self.assertNotIn(blob_hash, peers_for_blob)
self.assertEqual(peers_for_blob[b'p'], 0) self.assertEqual(peers_for_blob[b'p'], 0)
else: else:
self.assertEqual(len(peers_for_blob[blob_hash]), min(i - 1, constants.K)) self.assertEqual(len(peers_for_blob[blob_hash]), min(i, constants.K))
self.assertEqual(len(announced_to.protocol.data_store.get_peers_for_blob(blob_hash)), i) self.assertEqual(len(announced_to.protocol.data_store.get_peers_for_blob(blob_hash)), i + 1)
if i - 1 > constants.K: if i - 1 > constants.K:
self.assertEqual(len(peers_for_blob[b'contacts']), constants.K) self.assertEqual(len(peers_for_blob[b'contacts']), constants.K)
self.assertEqual(peers_for_blob[b'p'], ((i - 1) // (constants.K + 1)) + 1) self.assertEqual(peers_for_blob[b'p'], (i // (constants.K + 1)) + 1)
seen = set(peers_for_blob[blob_hash]) seen = set(peers_for_blob[blob_hash])
self.assertEqual(len(seen), constants.K) self.assertEqual(len(seen), constants.K)
self.assertEqual(len(peers_for_blob[blob_hash]), len(seen)) self.assertEqual(len(peers_for_blob[blob_hash]), len(seen))
@ -167,5 +182,5 @@ class TestBlobAnnouncer(AsyncioTestCase):
seen.intersection_update(page_x_set) seen.intersection_update(page_x_set)
total_seen.update(page_x_set) total_seen.update(page_x_set)
else: else:
self.assertEqual(len(peers_for_blob[b'contacts']), i - 1) self.assertEqual(len(peers_for_blob[b'contacts']), 8) # we always add 8 on first page
self.assertEqual(len(total_seen), peer_count - 2) self.assertEqual(len(total_seen), peer_count - 2)

View file

@ -12,7 +12,6 @@ from lbry.extras.daemon.storage import SQLiteStorage
class TestNodePingQueueDiscover(AsyncioTestCase): class TestNodePingQueueDiscover(AsyncioTestCase):
TIMEOUT = None # not supported as it advances time
async def test_ping_queue_discover(self): async def test_ping_queue_discover(self):
loop = asyncio.get_event_loop() loop = asyncio.get_event_loop()
loop.set_debug(False) loop.set_debug(False)
@ -29,7 +28,7 @@ class TestNodePingQueueDiscover(AsyncioTestCase):
(constants.generate_id(9), '1.2.3.9'), (constants.generate_id(9), '1.2.3.9'),
] ]
with dht_mocks.mock_network_loop(loop): with dht_mocks.mock_network_loop(loop):
advance = dht_mocks.get_time_accelerator(loop, loop.time()) advance = dht_mocks.get_time_accelerator(loop)
# start the nodes # start the nodes
nodes: typing.Dict[int, Node] = { nodes: typing.Dict[int, Node] = {
i: Node(loop, PeerManager(loop), node_id, 4444, 4444, 3333, address) i: Node(loop, PeerManager(loop), node_id, 4444, 4444, 3333, address)
@ -93,7 +92,6 @@ class TestNodePingQueueDiscover(AsyncioTestCase):
class TestTemporarilyLosingConnection(AsyncioTestCase): class TestTemporarilyLosingConnection(AsyncioTestCase):
TIMEOUT = None # not supported as it advances time
@unittest.SkipTest @unittest.SkipTest
async def test_losing_connection(self): async def test_losing_connection(self):
async def wait_for(check_ok, insist, timeout=20): async def wait_for(check_ok, insist, timeout=20):
@ -131,7 +129,7 @@ class TestTemporarilyLosingConnection(AsyncioTestCase):
await asyncio.gather(*[n.joined.wait() for n in nodes]) await asyncio.gather(*[n.joined.wait() for n in nodes])
node = nodes[-1] node = nodes[-1]
advance = dht_mocks.get_time_accelerator(loop, loop.time()) advance = dht_mocks.get_time_accelerator(loop)
await advance(500) await advance(500)
# Join the network, assert that at least the known peers are in RT # Join the network, assert that at least the known peers are in RT

19
tests/unit/test_utils.py Normal file
View file

@ -0,0 +1,19 @@
import unittest
from lbry import utils
class UtilsTestCase(unittest.TestCase):
def test_get_colliding_prefix_bits(self):
self.assertEqual(
0, utils.get_colliding_prefix_bits(0xffffffff.to_bytes(4, "big"), 0x0000000000.to_bytes(4, "big")))
self.assertEqual(
1, utils.get_colliding_prefix_bits(0x7fffffff.to_bytes(4, "big"), 0x0000000000.to_bytes(4, "big")))
self.assertEqual(
8, utils.get_colliding_prefix_bits(0x00ffffff.to_bytes(4, "big"), 0x0000000000.to_bytes(4, "big")))
self.assertEqual(
8, utils.get_colliding_prefix_bits(0x00ffffff.to_bytes(4, "big"), 0x0000000000.to_bytes(4, "big")))
self.assertEqual(
1, utils.get_colliding_prefix_bits(0x7fffffff.to_bytes(4, "big"), 0x0000000000.to_bytes(4, "big")))
self.assertEqual(
1, utils.get_colliding_prefix_bits(0x7fffffff.to_bytes(4, "big"), 0x0000000000.to_bytes(4, "big")))