From 0da2827c78d880c8d3cbcfbc4ecc5ab91525b3a1 Mon Sep 17 00:00:00 2001 From: Victor Shyba Date: Fri, 10 May 2019 19:54:36 -0300 Subject: [PATCH 01/20] fix dht deadlock --- lbrynet/dht/node.py | 5 ++- lbrynet/dht/protocol/protocol.py | 37 ++++++++++++++----- tests/unit/dht/protocol/test_protocol.py | 4 ++ tests/unit/dht/protocol/test_routing_table.py | 4 +- tests/unit/dht/test_blob_announcer.py | 5 ++- 5 files changed, 42 insertions(+), 13 deletions(-) diff --git a/lbrynet/dht/node.py b/lbrynet/dht/node.py index 4db51a55a..10af7d1c2 100644 --- a/lbrynet/dht/node.py +++ b/lbrynet/dht/node.py @@ -101,6 +101,7 @@ class Node: self._refresh_task.cancel() if self.protocol and self.protocol.ping_queue.running: self.protocol.ping_queue.stop() + self.protocol.stop() if self.listening_port is not None: self.listening_port.close() self._join_task = None @@ -113,6 +114,7 @@ class Node: lambda: self.protocol, (interface, self.internal_udp_port) ) log.info("DHT node listening on UDP %s:%i", interface, self.internal_udp_port) + self.protocol.start() else: log.warning("Already bound to port %s", self.listening_port) @@ -130,7 +132,8 @@ class Node: if known_node_urls: for host, port in known_node_urls: address = await resolve_host(host, port, proto='udp') - if (address, port) not in known_node_addresses and address != self.protocol.external_ip: + if (address, port) not in known_node_addresses and\ + (address, port) != (self.protocol.external_ip, self.protocol.udp_port): known_node_addresses.append((address, port)) url_to_addr[address] = host diff --git a/lbrynet/dht/protocol/protocol.py b/lbrynet/dht/protocol/protocol.py index f1aab10ad..2c5f85a7f 100644 --- a/lbrynet/dht/protocol/protocol.py +++ b/lbrynet/dht/protocol/protocol.py @@ -207,7 +207,7 @@ class PingQueue: try: if self._protocol.peer_manager.peer_is_good(peer): if peer not in self._protocol.routing_table.get_peers(): - await self._protocol.add_peer(peer) + self._protocol.add_peer(peer) return await self._protocol.get_rpc_peer(peer).ping() except asyncio.TimeoutError: @@ -268,11 +268,19 @@ class KademliaProtocol(DatagramProtocol): self.node_rpc = KademliaRPC(self, self.loop, self.peer_port) self.rpc_timeout = rpc_timeout self._split_lock = asyncio.Lock(loop=self.loop) + self._to_remove: typing.Set['KademliaPeer'] = set() + self._to_add: typing.Set['KademliaPeer'] = set() + self.maintaing_routing_task: typing.Optional[asyncio.Task] = None def get_rpc_peer(self, peer: 'KademliaPeer') -> RemoteKademliaRPC: return RemoteKademliaRPC(self.loop, self.peer_manager, self, peer) + def start(self, force_delay=None): + self.maintaing_routing_task = asyncio.create_task(self.routing_table_task(force_delay)) + def stop(self): + if self.maintaing_routing_task: + self.maintaing_routing_task.cancel() if self.transport: self.disconnect() @@ -299,6 +307,7 @@ class KademliaProtocol(DatagramProtocol): return args, {} async def _add_peer(self, peer: 'KademliaPeer'): + log.debug("Trying to add %s:%d", peer.address, peer.udp_port) for p in self.routing_table.get_peers(): if (p.address, p.udp_port) == (peer.address, peer.udp_port) and p.node_id != peer.node_id: self.routing_table.remove_peer(p) @@ -363,11 +372,23 @@ class KademliaProtocol(DatagramProtocol): self.routing_table.buckets[bucket_index].remove_peer(to_replace) return await self._add_peer(peer) - async def add_peer(self, peer: 'KademliaPeer') -> bool: + def add_peer(self, peer: 'KademliaPeer') -> bool: if peer.node_id == self.node_id: return False - async with self._split_lock: - return await self._add_peer(peer) 
+ self._to_add.add(peer) + + async def routing_table_task(self, force_delay=None): + while True: + while self._to_remove: + async with self._split_lock: + peer = self._to_remove.pop() + log.debug("Trying to remove %s:%d", peer.address, peer.udp_port) + self.routing_table.remove_peer(peer) + self.routing_table.join_buckets() + while self._to_add: + async with self._split_lock: + await self._add_peer(self._to_add.pop()) + await asyncio.sleep(force_delay or constants.rpc_timeout) async def _handle_rpc(self, sender_contact: 'KademliaPeer', message: RequestDatagram): assert sender_contact.node_id != self.node_id, (binascii.hexlify(sender_contact.node_id)[:8].decode(), @@ -416,7 +437,7 @@ class KademliaProtocol(DatagramProtocol): self.ping_queue.enqueue_maybe_ping(peer) # only add a requesting contact to the routing table if it has replied to one of our requests elif is_good is True: - await self.add_peer(peer) + self.add_peer(peer) except ValueError as err: log.debug("error raised handling %s request from %s:%i - %s(%s)", request_datagram.method, peer.address, peer.udp_port, str(type(err)), @@ -459,7 +480,7 @@ class KademliaProtocol(DatagramProtocol): self.peer_manager.update_contact_triple(peer.node_id, address[0], address[1]) if not df.cancelled(): df.set_result(response_datagram) - await self.add_peer(peer) + self.add_peer(peer) else: log.warning("%s:%i replied, but after we cancelled the request attempt", peer.address, peer.udp_port) @@ -531,9 +552,7 @@ class KademliaProtocol(DatagramProtocol): except (asyncio.TimeoutError, RemoteException): self.peer_manager.report_failure(peer.address, peer.udp_port) if self.peer_manager.peer_is_good(peer) is False: - async with self._split_lock: - self.routing_table.remove_peer(peer) - self.routing_table.join_buckets() + self._to_remove.add(peer) raise async def send_response(self, peer: 'KademliaPeer', response: ResponseDatagram): diff --git a/tests/unit/dht/protocol/test_protocol.py b/tests/unit/dht/protocol/test_protocol.py index d5c4f2da8..e183a05bd 100644 --- a/tests/unit/dht/protocol/test_protocol.py +++ b/tests/unit/dht/protocol/test_protocol.py @@ -99,6 +99,7 @@ class TestProtocol(AsyncioTestCase): self.loop, PeerManager(self.loop), node_id, address, udp_port, tcp_port ) await self.loop.create_datagram_endpoint(lambda: proto, (address, 4444)) + proto.start(0.1) return proto, other_peer.peer_manager.get_kademlia_peer(node_id, address, udp_port=udp_port) async def test_add_peer_after_handle_request(self): @@ -112,6 +113,7 @@ class TestProtocol(AsyncioTestCase): self.loop, PeerManager(self.loop), node_id1, '1.2.3.4', 4444, 3333 ) await self.loop.create_datagram_endpoint(lambda: peer1, ('1.2.3.4', 4444)) + peer1.start(0.1) peer2, peer_2_from_peer_1 = await self._make_protocol(peer1, node_id2, '1.2.3.5', 4444, 3333) peer3, peer_3_from_peer_1 = await self._make_protocol(peer1, node_id3, '1.2.3.6', 4444, 3333) @@ -119,6 +121,7 @@ class TestProtocol(AsyncioTestCase): # peers who reply should be added await peer1.get_rpc_peer(peer_2_from_peer_1).ping() + await asyncio.sleep(0.5) self.assertListEqual([peer_2_from_peer_1], peer1.routing_table.get_peers()) peer1.routing_table.remove_peer(peer_2_from_peer_1) @@ -137,6 +140,7 @@ class TestProtocol(AsyncioTestCase): self.assertEqual(0, len(peer1.ping_queue._pending_contacts)) pong = await peer1_from_peer4.ping() self.assertEqual(b'pong', pong) + await asyncio.sleep(0.5) self.assertEqual(1, len(peer1.routing_table.get_peers())) self.assertEqual(0, len(peer1.ping_queue._pending_contacts)) 
peer1.routing_table.buckets[0].peers.clear() diff --git a/tests/unit/dht/protocol/test_routing_table.py b/tests/unit/dht/protocol/test_routing_table.py index a27b316e2..c5d832783 100644 --- a/tests/unit/dht/protocol/test_routing_table.py +++ b/tests/unit/dht/protocol/test_routing_table.py @@ -57,7 +57,7 @@ class TestRouting(AsyncioTestCase): node.protocol.node_id, node.protocol.external_ip, udp_port=node.protocol.udp_port ) - added = await node_1.protocol.add_peer(peer) + added = await node_1.protocol._add_peer(peer) self.assertEqual(True, added) contact_cnt += 1 @@ -88,7 +88,7 @@ class TestRouting(AsyncioTestCase): # set all of the peers to good (as to not attempt pinging stale ones during split) node_1.protocol.peer_manager.report_last_replied(peer.address, peer.udp_port) node_1.protocol.peer_manager.report_last_replied(peer.address, peer.udp_port) - await node_1.protocol.add_peer(peer) + await node_1.protocol._add_peer(peer) # check that bucket 0 is always the one covering the local node id self.assertEqual(True, node_1.protocol.routing_table.buckets[0].key_in_range(node_1.protocol.node_id)) self.assertEqual(40, len(node_1.protocol.routing_table.get_peers())) diff --git a/tests/unit/dht/test_blob_announcer.py b/tests/unit/dht/test_blob_announcer.py index f1fca0717..840e4ae75 100644 --- a/tests/unit/dht/test_blob_announcer.py +++ b/tests/unit/dht/test_blob_announcer.py @@ -21,6 +21,7 @@ class TestBlobAnnouncer(AsyncioTestCase): await self.storage.open() self.peer_manager = PeerManager(self.loop) self.node = Node(self.loop, self.peer_manager, node_id, 4444, 4444, 3333, address) + self.node.protocol.start(0.1) await self.node.start_listening(address) self.blob_announcer = BlobAnnouncer(self.loop, self.node, self.storage) for node_id, address in peer_addresses: @@ -30,9 +31,10 @@ class TestBlobAnnouncer(AsyncioTestCase): async def add_peer(self, node_id, address, add_to_routing_table=True): n = Node(self.loop, PeerManager(self.loop), node_id, 4444, 4444, 3333, address) await n.start_listening(address) + n.protocol.start(0.1) self.nodes.update({len(self.nodes): n}) if add_to_routing_table: - await self.node.protocol.add_peer( + self.node.protocol.add_peer( self.peer_manager.get_kademlia_peer( n.protocol.node_id, n.protocol.external_ip, n.protocol.udp_port ) @@ -98,6 +100,7 @@ class TestBlobAnnouncer(AsyncioTestCase): await self.chain_peer(constants.generate_id(12), '1.2.3.12') await self.chain_peer(constants.generate_id(13), '1.2.3.13') await self.chain_peer(constants.generate_id(14), '1.2.3.14') + await self.advance(61.0) last = self.nodes[len(self.nodes) - 1] search_q, peer_q = asyncio.Queue(loop=self.loop), asyncio.Queue(loop=self.loop) From e675f1387cd94550e5240e99cacdb1f5641e3d97 Mon Sep 17 00:00:00 2001 From: Victor Shyba Date: Sat, 11 May 2019 04:58:50 -0300 Subject: [PATCH 02/20] remove peer junction and some refactoring --- lbrynet/dht/node.py | 111 ++++++------------ .../dht/protocol/async_generator_junction.py | 94 --------------- lbrynet/dht/protocol/iterative_find.py | 58 +++++---- lbrynet/dht/protocol/protocol.py | 42 ++++--- .../dht/protocol/test_async_gen_junction.py | 102 ---------------- tests/unit/dht/protocol/test_protocol.py | 4 +- tests/unit/dht/test_blob_announcer.py | 4 +- 7 files changed, 89 insertions(+), 326 deletions(-) delete mode 100644 lbrynet/dht/protocol/async_generator_junction.py delete mode 100644 tests/unit/dht/protocol/test_async_gen_junction.py diff --git a/lbrynet/dht/node.py b/lbrynet/dht/node.py index 10af7d1c2..955798c8e 100644 --- 
a/lbrynet/dht/node.py +++ b/lbrynet/dht/node.py @@ -2,11 +2,8 @@ import logging import asyncio import typing import binascii -import contextlib from lbrynet.utils import resolve_host from lbrynet.dht import constants -from lbrynet.dht.error import RemoteException -from lbrynet.dht.protocol.async_generator_junction import AsyncGeneratorJunction from lbrynet.dht.protocol.distance import Distance from lbrynet.dht.protocol.iterative_find import IterativeNodeFinder, IterativeValueFinder from lbrynet.dht.protocol.protocol import KademliaProtocol @@ -138,28 +135,13 @@ class Node: url_to_addr[address] = host if known_node_addresses: - while not self.protocol.routing_table.get_peers(): - success = False - # ping the seed nodes, this will set their node ids (since we don't know them ahead of time) - for address, port in known_node_addresses: - peer = self.protocol.get_rpc_peer(KademliaPeer(self.loop, address, udp_port=port)) - try: - await peer.ping() - success = True - except asyncio.TimeoutError: - log.warning("seed node (%s:%i) timed out in %s", url_to_addr.get(address, address), port, - round(self.protocol.rpc_timeout, 2)) - if success: - break - # now that we have the seed nodes in routing, to an iterative lookup of our own id to populate the buckets - # in the routing table with good peers who are near us - async with self.peer_search_junction(self.protocol.node_id, max_results=16) as junction: - async for peers in junction: - for peer in peers: - try: - await self.protocol.get_rpc_peer(peer).ping() - except (asyncio.TimeoutError, RemoteException): - pass + peers = [ + KademliaPeer(self.loop, address, udp_port=port) + for (address, port) in known_node_addresses + ] + while not len(self.protocol.routing_table.get_peers()): + peers.extend(await self.peer_search(self.protocol.node_id, shortlist=peers, count=32)) + self.protocol.ping_queue.enqueue_maybe_ping(*peers, delay=0.0) log.info("Joined DHT, %i peers known in %i buckets", len(self.protocol.routing_table.get_peers()), self.protocol.routing_table.buckets_with_contacts()) @@ -186,61 +168,40 @@ class Node: return IterativeValueFinder(self.loop, self.protocol.peer_manager, self.protocol.routing_table, self.protocol, key, bottom_out_limit, max_results, None, shortlist) - @contextlib.asynccontextmanager - async def stream_peer_search_junction(self, hash_queue: asyncio.Queue, bottom_out_limit=20, - max_results=-1) -> AsyncGeneratorJunction: - peer_generator = AsyncGeneratorJunction(self.loop) - - async def _add_hashes_from_queue(): - while True: - blob_hash = await hash_queue.get() - peer_generator.add_generator( - self.get_iterative_value_finder( - binascii.unhexlify(blob_hash.encode()), bottom_out_limit=bottom_out_limit, - max_results=max_results - ) - ) - add_hashes_task = self.loop.create_task(_add_hashes_from_queue()) - try: - async with peer_generator as junction: - yield junction - finally: - if add_hashes_task and not (add_hashes_task.done() or add_hashes_task.cancelled()): - add_hashes_task.cancel() - - def peer_search_junction(self, node_id: bytes, max_results=constants.k*2, - bottom_out_limit=20) -> AsyncGeneratorJunction: - peer_generator = AsyncGeneratorJunction(self.loop) - peer_generator.add_generator( - self.get_iterative_node_finder( - node_id, bottom_out_limit=bottom_out_limit, max_results=max_results - ) - ) - return peer_generator - async def peer_search(self, node_id: bytes, count=constants.k, max_results=constants.k*2, - bottom_out_limit=20) -> typing.List['KademliaPeer']: - accumulated: typing.List['KademliaPeer'] = [] - 
async with self.peer_search_junction(node_id, max_results=max_results, - bottom_out_limit=bottom_out_limit) as junction: - async for peers in junction: - accumulated.extend(peers) + bottom_out_limit=20, shortlist: typing.Optional[typing.List] = None + ) -> typing.List['KademliaPeer']: + peers = [] + async for iteration_peers in self.get_iterative_node_finder( + node_id, shortlist=shortlist, bottom_out_limit=bottom_out_limit, max_results=max_results): + peers.extend(iteration_peers) distance = Distance(node_id) - accumulated.sort(key=lambda peer: distance(peer.node_id)) - return accumulated[:count] + peers.sort(key=lambda peer: distance(peer.node_id)) + return peers[:count] async def _accumulate_search_junction(self, search_queue: asyncio.Queue, result_queue: asyncio.Queue): - async with self.stream_peer_search_junction(search_queue) as search_junction: # pylint: disable=E1701 - async for peers in search_junction: - if peers: - result_queue.put_nowait([ - peer for peer in peers - if not ( - peer.address == self.protocol.external_ip - and peer.tcp_port == self.protocol.peer_port - ) - ]) + ongoing = {} + async def __start_producing_task(): + while True: + blob_hash = await search_queue.get() + ongoing[blob_hash] = asyncio.create_task(self._value_producer(blob_hash, result_queue)) + ongoing[''] = asyncio.create_task(__start_producing_task()) + try: + while True: + await asyncio.wait(ongoing.values(), return_when='FIRST_COMPLETED') + for key in list(ongoing.keys())[:]: + if key and ongoing[key].done(): + ongoing[key] = asyncio.create_task(self._value_producer(key, result_queue)) + finally: + for task in ongoing.values(): + task.cancel() + + async def _value_producer(self, blob_hash: str, result_queue: asyncio.Queue): + log.info("Searching %s", blob_hash[:8]) + async for results in self.get_iterative_value_finder(binascii.unhexlify(blob_hash.encode())): + result_queue.put_nowait(results) + log.info("Search expired %s", blob_hash[:8]) def accumulate_peers(self, search_queue: asyncio.Queue, peer_queue: typing.Optional[asyncio.Queue] = None) -> typing.Tuple[ diff --git a/lbrynet/dht/protocol/async_generator_junction.py b/lbrynet/dht/protocol/async_generator_junction.py deleted file mode 100644 index 4812ca522..000000000 --- a/lbrynet/dht/protocol/async_generator_junction.py +++ /dev/null @@ -1,94 +0,0 @@ -import asyncio -import typing -import logging -import traceback -if typing.TYPE_CHECKING: - from types import AsyncGeneratorType - -log = logging.getLogger(__name__) - - -def cancel_task(task: typing.Optional[asyncio.Task]): - if task and not (task.done() or task.cancelled()): - task.cancel() - - -def drain_tasks(tasks: typing.List[typing.Optional[asyncio.Task]]): - while tasks: - cancel_task(tasks.pop()) - - -class AsyncGeneratorJunction: - """ - A helper to interleave the results from multiple async generators into one - async generator. 
- """ - - def __init__(self, loop: asyncio.BaseEventLoop, queue: typing.Optional[asyncio.Queue] = None): - self.loop = loop - self.__iterator_queue = asyncio.Queue(loop=loop) - self.result_queue = queue or asyncio.Queue(loop=loop) - self.tasks: typing.List[asyncio.Task] = [] - self.running_iterators: typing.Dict[typing.AsyncGenerator, bool] = {} - self.generator_queue: asyncio.Queue = asyncio.Queue(loop=self.loop) - - @property - def running(self): - return any(self.running_iterators.values()) - - async def wait_for_generators(self): - async def iterate(iterator: typing.AsyncGenerator): - try: - async for item in iterator: - self.result_queue.put_nowait(item) - self.__iterator_queue.put_nowait(item) - finally: - self.running_iterators[iterator] = False - if not self.running: - self.__iterator_queue.put_nowait(StopAsyncIteration) - - while True: - async_gen: typing.Union[typing.AsyncGenerator, 'AsyncGeneratorType'] = await self.generator_queue.get() - self.running_iterators[async_gen] = True - self.tasks.append(self.loop.create_task(iterate(async_gen))) - - def add_generator(self, async_gen: typing.Union[typing.AsyncGenerator, 'AsyncGeneratorType']): - """ - Add an async generator. This can be called during an iteration of the generator junction. - """ - self.generator_queue.put_nowait(async_gen) - - def __aiter__(self): - return self - - async def __anext__(self): - result = await self.__iterator_queue.get() - if result is StopAsyncIteration: - raise result - return result - - def aclose(self): - async def _aclose(): - for iterator in list(self.running_iterators.keys()): - result = iterator.aclose() - if asyncio.iscoroutine(result): - await result - self.running_iterators[iterator] = False - drain_tasks(self.tasks) - raise StopAsyncIteration() - return self.loop.create_task(_aclose()) - - async def __aenter__(self): - self.tasks.append(self.loop.create_task(self.wait_for_generators())) - return self - - async def __aexit__(self, exc_type, exc, tb): - try: - await self.aclose() - except StopAsyncIteration: - pass - finally: - if exc_type: - if exc_type not in (asyncio.CancelledError, asyncio.TimeoutError, StopAsyncIteration, GeneratorExit): - err = traceback.format_exception(exc_type, exc, tb) - log.error(err) diff --git a/lbrynet/dht/protocol/iterative_find.py b/lbrynet/dht/protocol/iterative_find.py index 87472d87c..f60b3af6b 100644 --- a/lbrynet/dht/protocol/iterative_find.py +++ b/lbrynet/dht/protocol/iterative_find.py @@ -66,11 +66,7 @@ def get_shortlist(routing_table: 'TreeRoutingTable', key: bytes, """ if len(key) != constants.hash_length: raise ValueError("invalid key length: %i" % len(key)) - if not shortlist: - shortlist = routing_table.find_close_peers(key) - distance = Distance(key) - shortlist.sort(key=lambda peer: distance(peer.node_id), reverse=True) - return shortlist + return shortlist or routing_table.find_close_peers(key) class IterativeFinder: @@ -92,11 +88,11 @@ class IterativeFinder: self.exclude = exclude or [] self.shortlist: typing.List['KademliaPeer'] = get_shortlist(routing_table, key, shortlist) - self.active: typing.Set['KademliaPeer'] = set() + self.active: typing.List['KademliaPeer'] = [] self.contacted: typing.Set[typing.Tuple[str, int]] = set() self.distance = Distance(key) - self.closest_peer: typing.Optional['KademliaPeer'] = None if not self.shortlist else self.shortlist[0] + self.closest_peer: typing.Optional['KademliaPeer'] = None self.prev_closest_peer: typing.Optional['KademliaPeer'] = None self.iteration_queue = asyncio.Queue(loop=self.loop) @@ 
-139,23 +135,21 @@ class IterativeFinder: return not self.closest_peer or self.distance.is_closer(peer.node_id, self.closest_peer.node_id) def _update_closest(self): - self.shortlist.sort(key=lambda peer: self.distance(peer.node_id), reverse=True) - if self.closest_peer and self.closest_peer is not self.shortlist[-1]: - if self._is_closer(self.shortlist[-1]): + self.active.sort(key=lambda peer: self.distance(peer.node_id)) + if self.closest_peer and self.closest_peer is not self.active[0]: + if self._is_closer(self.active[0]): self.prev_closest_peer = self.closest_peer - self.closest_peer = self.shortlist[-1] + self.closest_peer = self.active[0] async def _handle_probe_result(self, peer: 'KademliaPeer', response: FindResponse): - if peer not in self.shortlist: - self.shortlist.append(peer) - if peer not in self.active: - self.active.add(peer) + if peer not in self.active and peer.node_id: + self.active.append(peer) for contact_triple in response.get_close_triples(): node_id, address, udp_port = contact_triple if (address, udp_port) not in self.contacted: # and not self.peer_manager.is_ignored(addr_tuple) found_peer = self.peer_manager.get_kademlia_peer(node_id, address, udp_port) - if found_peer not in self.shortlist and self.peer_manager.peer_is_good(peer) is not False: - self.shortlist.append(found_peer) + if found_peer not in self.active and self.peer_manager.peer_is_good(found_peer) is not False: + self.active.append(found_peer) self._update_closest() self.check_result_ready(response) @@ -163,11 +157,13 @@ class IterativeFinder: try: response = await self.send_probe(peer) except asyncio.TimeoutError: - self.active.discard(peer) + if peer in self.active: + self.active.remove(peer) return except ValueError as err: log.warning(str(err)) - self.active.discard(peer) + if peer in self.active: + self.active.remove(peer) return except TransportNotConnected: return self.aclose() @@ -181,18 +177,18 @@ class IterativeFinder: """ added = 0 - self.shortlist.sort(key=lambda p: self.distance(p.node_id), reverse=True) - while self.running and len(self.shortlist) and added < constants.alpha: - peer = self.shortlist.pop() + for peer in chain(self.active, self.shortlist): + if added >= constants.alpha: + break origin_address = (peer.address, peer.udp_port) if origin_address in self.exclude or self.peer_manager.peer_is_good(peer) is False: continue if peer.node_id == self.protocol.node_id: continue - if (peer.address, peer.udp_port) == (self.protocol.external_ip, self.protocol.udp_port): + if origin_address == (self.protocol.external_ip, self.protocol.udp_port): continue - if (peer.address, peer.udp_port) not in self.contacted: - self.contacted.add((peer.address, peer.udp_port)) + if origin_address not in self.contacted: + self.contacted.add(origin_address) t = self.loop.create_task(self._send_probe(peer)) @@ -200,7 +196,7 @@ class IterativeFinder: self.running_probes.difference_update({ probe for probe in self.running_probes if probe.done() or probe == t }) - if not self.running_probes and self.shortlist: + if not self.running_probes: self.tasks.append(self.loop.create_task(self._search_task(0.0))) t.add_done_callback(callback) @@ -266,6 +262,7 @@ class IterativeNodeFinder(IterativeFinder): self.yielded_peers: typing.Set['KademliaPeer'] = set() async def send_probe(self, peer: 'KademliaPeer') -> FindNodeResponse: + log.debug("probing %s:%d %s", peer.address, peer.udp_port, hexlify(peer.node_id)[:8] if peer.node_id else '') response = await self.protocol.get_rpc_peer(peer).find_node(self.key) return 
FindNodeResponse(self.key, response) @@ -273,7 +270,9 @@ class IterativeNodeFinder(IterativeFinder): self.put_result(self.active, finish=True) def put_result(self, from_iter: typing.Iterable['KademliaPeer'], finish=False): - not_yet_yielded = [peer for peer in from_iter if peer not in self.yielded_peers] + not_yet_yielded = [ + peer for peer in from_iter if peer not in self.yielded_peers and peer.node_id != self.protocol.node_id + ] not_yet_yielded.sort(key=lambda peer: self.distance(peer.node_id)) to_yield = not_yet_yielded[:min(constants.k, len(not_yet_yielded))] if to_yield: @@ -288,7 +287,7 @@ class IterativeNodeFinder(IterativeFinder): if found: log.debug("found") - return self.put_result(self.shortlist, finish=True) + return self.put_result(self.active, finish=True) if self.prev_closest_peer and self.closest_peer and not self._is_closer(self.prev_closest_peer): # log.info("improving, %i %i %i %i %i", len(self.shortlist), len(self.active), len(self.contacted), # self.bottom_out_count, self.iteration_count) @@ -300,9 +299,6 @@ class IterativeNodeFinder(IterativeFinder): if self.bottom_out_count >= self.bottom_out_limit or self.iteration_count >= self.bottom_out_limit: log.info("limit hit") self.put_result(self.active, True) - elif self.max_results and len(self.active) - len(self.yielded_peers) >= self.max_results: - log.debug("max results") - self.put_result(self.active, True) class IterativeValueFinder(IterativeFinder): diff --git a/lbrynet/dht/protocol/protocol.py b/lbrynet/dht/protocol/protocol.py index 2c5f85a7f..152965b3f 100644 --- a/lbrynet/dht/protocol/protocol.py +++ b/lbrynet/dht/protocol/protocol.py @@ -270,13 +270,14 @@ class KademliaProtocol(DatagramProtocol): self._split_lock = asyncio.Lock(loop=self.loop) self._to_remove: typing.Set['KademliaPeer'] = set() self._to_add: typing.Set['KademliaPeer'] = set() + self._wakeup_routing_task = asyncio.Event(loop=self.loop) self.maintaing_routing_task: typing.Optional[asyncio.Task] = None def get_rpc_peer(self, peer: 'KademliaPeer') -> RemoteKademliaRPC: return RemoteKademliaRPC(self.loop, self.peer_manager, self, peer) - def start(self, force_delay=None): - self.maintaing_routing_task = asyncio.create_task(self.routing_table_task(force_delay)) + def start(self): + self.maintaing_routing_task = asyncio.create_task(self.routing_table_task()) def stop(self): if self.maintaing_routing_task: @@ -376,8 +377,9 @@ class KademliaProtocol(DatagramProtocol): if peer.node_id == self.node_id: return False self._to_add.add(peer) + self._wakeup_routing_task.set() - async def routing_table_task(self, force_delay=None): + async def routing_table_task(self): while True: while self._to_remove: async with self._split_lock: @@ -388,9 +390,10 @@ class KademliaProtocol(DatagramProtocol): while self._to_add: async with self._split_lock: await self._add_peer(self._to_add.pop()) - await asyncio.sleep(force_delay or constants.rpc_timeout) + await asyncio.gather(self._wakeup_routing_task.wait(), asyncio.sleep(0.2)) + self._wakeup_routing_task.clear() - async def _handle_rpc(self, sender_contact: 'KademliaPeer', message: RequestDatagram): + def _handle_rpc(self, sender_contact: 'KademliaPeer', message: RequestDatagram): assert sender_contact.node_id != self.node_id, (binascii.hexlify(sender_contact.node_id)[:8].decode(), binascii.hexlify(self.node_id)[:8].decode()) method = message.method @@ -417,11 +420,11 @@ class KademliaProtocol(DatagramProtocol): key, = a result = self.node_rpc.find_value(sender_contact, key) - await self.send_response( + 
self.send_response( sender_contact, ResponseDatagram(RESPONSE_TYPE, message.rpc_id, self.node_id, result), ) - async def handle_request_datagram(self, address: typing.Tuple[str, int], request_datagram: RequestDatagram): + def handle_request_datagram(self, address: typing.Tuple[str, int], request_datagram: RequestDatagram): # This is an RPC method request self.peer_manager.report_last_requested(address[0], address[1]) try: @@ -429,7 +432,7 @@ class KademliaProtocol(DatagramProtocol): except IndexError: peer = self.peer_manager.get_kademlia_peer(request_datagram.node_id, address[0], address[1]) try: - await self._handle_rpc(peer, request_datagram) + self._handle_rpc(peer, request_datagram) # if the contact is not known to be bad (yet) and we haven't yet queried it, send it a ping so that it # will be added to our routing table if successful is_good = self.peer_manager.peer_is_good(peer) @@ -442,7 +445,7 @@ class KademliaProtocol(DatagramProtocol): log.debug("error raised handling %s request from %s:%i - %s(%s)", request_datagram.method, peer.address, peer.udp_port, str(type(err)), str(err)) - await self.send_error( + self.send_error( peer, ErrorDatagram(ERROR_TYPE, request_datagram.rpc_id, self.node_id, str(type(err)).encode(), str(err).encode()) @@ -451,13 +454,13 @@ class KademliaProtocol(DatagramProtocol): log.warning("error raised handling %s request from %s:%i - %s(%s)", request_datagram.method, peer.address, peer.udp_port, str(type(err)), str(err)) - await self.send_error( + self.send_error( peer, ErrorDatagram(ERROR_TYPE, request_datagram.rpc_id, self.node_id, str(type(err)).encode(), str(err).encode()) ) - async def handle_response_datagram(self, address: typing.Tuple[str, int], response_datagram: ResponseDatagram): + def handle_response_datagram(self, address: typing.Tuple[str, int], response_datagram: ResponseDatagram): # Find the message that triggered this response if response_datagram.rpc_id in self.sent_messages: peer, df, request = self.sent_messages[response_datagram.rpc_id] @@ -531,15 +534,15 @@ class KademliaProtocol(DatagramProtocol): return if isinstance(message, RequestDatagram): - self.loop.create_task(self.handle_request_datagram(address, message)) + self.handle_request_datagram(address, message) elif isinstance(message, ErrorDatagram): self.handle_error_datagram(address, message) else: assert isinstance(message, ResponseDatagram), "sanity" - self.loop.create_task(self.handle_response_datagram(address, message)) + self.handle_response_datagram(address, message) async def send_request(self, peer: 'KademliaPeer', request: RequestDatagram) -> ResponseDatagram: - await self._send(peer, request) + self._send(peer, request) response_fut = self.sent_messages[request.rpc_id][1] try: response = await asyncio.wait_for(response_fut, self.rpc_timeout) @@ -553,15 +556,16 @@ class KademliaProtocol(DatagramProtocol): self.peer_manager.report_failure(peer.address, peer.udp_port) if self.peer_manager.peer_is_good(peer) is False: self._to_remove.add(peer) + self._wakeup_routing_task.set() raise - async def send_response(self, peer: 'KademliaPeer', response: ResponseDatagram): - await self._send(peer, response) + def send_response(self, peer: 'KademliaPeer', response: ResponseDatagram): + self._send(peer, response) - async def send_error(self, peer: 'KademliaPeer', error: ErrorDatagram): - await self._send(peer, error) + def send_error(self, peer: 'KademliaPeer', error: ErrorDatagram): + self._send(peer, error) - async def _send(self, peer: 'KademliaPeer', message: 
typing.Union[RequestDatagram, ResponseDatagram, + def _send(self, peer: 'KademliaPeer', message: typing.Union[RequestDatagram, ResponseDatagram, ErrorDatagram]): if not self.transport or self.transport.is_closing(): raise TransportNotConnected() diff --git a/tests/unit/dht/protocol/test_async_gen_junction.py b/tests/unit/dht/protocol/test_async_gen_junction.py deleted file mode 100644 index 5dd9c8f29..000000000 --- a/tests/unit/dht/protocol/test_async_gen_junction.py +++ /dev/null @@ -1,102 +0,0 @@ -import unittest -import asyncio -from torba.testcase import AsyncioTestCase -from lbrynet.dht.protocol.async_generator_junction import AsyncGeneratorJunction - - -class MockAsyncGen: - def __init__(self, loop, result, delay, stop_cnt=10): - self.loop = loop - self.result = result - self.delay = delay - self.count = 0 - self.stop_cnt = stop_cnt - self.called_close = False - - def __aiter__(self): - return self - - async def __anext__(self): - await asyncio.sleep(self.delay, loop=self.loop) - if self.count > self.stop_cnt - 1: - raise StopAsyncIteration() - self.count += 1 - return self.result - - async def aclose(self): - self.called_close = True - - -class TestAsyncGeneratorJunction(AsyncioTestCase): - def setUp(self): - self.loop = asyncio.get_event_loop() - - async def _test_junction(self, expected, *generators): - order = [] - async with AsyncGeneratorJunction(self.loop) as junction: - for generator in generators: - junction.add_generator(generator) - async for item in junction: - order.append(item) - self.assertListEqual(order, expected) - - async def test_yield_order(self): - expected_order = [1, 2, 1, 1, 2, 1, 1, 2, 1, 1, 2, 1, 1, 2, 1, 2, 2, 2, 2, 2] - fast_gen = MockAsyncGen(self.loop, 1, 0.2) - slow_gen = MockAsyncGen(self.loop, 2, 0.4) - await self._test_junction(expected_order, fast_gen, slow_gen) - self.assertEqual(fast_gen.called_close, True) - self.assertEqual(slow_gen.called_close, True) - - async def test_nothing_to_yield(self): - async def __nothing(): - for _ in []: - yield self.fail("nada") - await self._test_junction([], __nothing()) - - async def test_fast_iteratiors(self): - async def __gotta_go_fast(): - for _ in range(10): - yield 0 - await self._test_junction([0]*40, __gotta_go_fast(), __gotta_go_fast(), __gotta_go_fast(), __gotta_go_fast()) - - @unittest.SkipTest - async def test_one_stopped_first(self): - expected_order = [1, 2, 1, 1, 2, 1, 1, 2, 2, 2, 2, 2, 2, 2, 2] - fast_gen = MockAsyncGen(self.loop, 1, 0.2, 5) - slow_gen = MockAsyncGen(self.loop, 2, 0.4) - await self._test_junction(expected_order, fast_gen, slow_gen) - self.assertEqual(fast_gen.called_close, True) - self.assertEqual(slow_gen.called_close, True) - - async def test_with_non_async_gen_class(self): - expected_order = [1, 2, 1, 1, 2, 1, 1, 2, 2, 2, 2, 2, 2, 2, 2] - - async def fast_gen(): - for i in range(10): - if i == 5: - return - await asyncio.sleep(0.2) - yield 1 - - slow_gen = MockAsyncGen(self.loop, 2, 0.4) - await self._test_junction(expected_order, fast_gen(), slow_gen) - self.assertEqual(slow_gen.called_close, True) - - async def test_stop_when_encapsulating_task_cancelled(self): - fast_gen = MockAsyncGen(self.loop, 1, 0.2) - slow_gen = MockAsyncGen(self.loop, 2, 0.4) - - async def _task(): - async with AsyncGeneratorJunction(self.loop) as junction: - junction.add_generator(fast_gen) - junction.add_generator(slow_gen) - async for _ in junction: - pass - - task = self.loop.create_task(_task()) - self.loop.call_later(1.0, task.cancel) - with self.assertRaises(asyncio.CancelledError): - await 
task - self.assertEqual(fast_gen.called_close, True) - self.assertEqual(slow_gen.called_close, True) diff --git a/tests/unit/dht/protocol/test_protocol.py b/tests/unit/dht/protocol/test_protocol.py index e183a05bd..bfad0165f 100644 --- a/tests/unit/dht/protocol/test_protocol.py +++ b/tests/unit/dht/protocol/test_protocol.py @@ -99,7 +99,7 @@ class TestProtocol(AsyncioTestCase): self.loop, PeerManager(self.loop), node_id, address, udp_port, tcp_port ) await self.loop.create_datagram_endpoint(lambda: proto, (address, 4444)) - proto.start(0.1) + proto.start() return proto, other_peer.peer_manager.get_kademlia_peer(node_id, address, udp_port=udp_port) async def test_add_peer_after_handle_request(self): @@ -113,7 +113,7 @@ class TestProtocol(AsyncioTestCase): self.loop, PeerManager(self.loop), node_id1, '1.2.3.4', 4444, 3333 ) await self.loop.create_datagram_endpoint(lambda: peer1, ('1.2.3.4', 4444)) - peer1.start(0.1) + peer1.start() peer2, peer_2_from_peer_1 = await self._make_protocol(peer1, node_id2, '1.2.3.5', 4444, 3333) peer3, peer_3_from_peer_1 = await self._make_protocol(peer1, node_id3, '1.2.3.6', 4444, 3333) diff --git a/tests/unit/dht/test_blob_announcer.py b/tests/unit/dht/test_blob_announcer.py index 840e4ae75..8e75e0e76 100644 --- a/tests/unit/dht/test_blob_announcer.py +++ b/tests/unit/dht/test_blob_announcer.py @@ -21,7 +21,6 @@ class TestBlobAnnouncer(AsyncioTestCase): await self.storage.open() self.peer_manager = PeerManager(self.loop) self.node = Node(self.loop, self.peer_manager, node_id, 4444, 4444, 3333, address) - self.node.protocol.start(0.1) await self.node.start_listening(address) self.blob_announcer = BlobAnnouncer(self.loop, self.node, self.storage) for node_id, address in peer_addresses: @@ -31,7 +30,6 @@ class TestBlobAnnouncer(AsyncioTestCase): async def add_peer(self, node_id, address, add_to_routing_table=True): n = Node(self.loop, PeerManager(self.loop), node_id, 4444, 4444, 3333, address) await n.start_listening(address) - n.protocol.start(0.1) self.nodes.update({len(self.nodes): n}) if add_to_routing_table: self.node.protocol.add_peer( @@ -108,7 +106,7 @@ class TestBlobAnnouncer(AsyncioTestCase): _, task = last.accumulate_peers(search_q, peer_q) found_peers = await peer_q.get() - await task + task.cancel() self.assertEqual(1, len(found_peers)) self.assertEqual(self.node.protocol.node_id, found_peers[0].node_id) From b7d76fd09fcab38d7326990b4a122edf58114592 Mon Sep 17 00:00:00 2001 From: Victor Shyba Date: Sat, 11 May 2019 23:30:05 -0300 Subject: [PATCH 03/20] add integration tests, fix bug that made refresh not exclude bad peers --- lbrynet/dht/node.py | 6 +++- lbrynet/dht/peer.py | 12 +++---- lbrynet/dht/protocol/protocol.py | 6 ++-- tests/integration/test_dht.py | 57 ++++++++++++++++++++++++++++++++ 4 files changed, 72 insertions(+), 9 deletions(-) create mode 100644 tests/integration/test_dht.py diff --git a/lbrynet/dht/node.py b/lbrynet/dht/node.py index 955798c8e..b10884644 100644 --- a/lbrynet/dht/node.py +++ b/lbrynet/dht/node.py @@ -28,7 +28,7 @@ class Node: self._join_task: asyncio.Task = None self._refresh_task: asyncio.Task = None - async def refresh_node(self): + async def refresh_node(self, force_once=False): while True: # remove peers with expired blob announcements from the datastore self.protocol.data_store.removed_expired_peers() @@ -55,6 +55,8 @@ class Node: peers = await self.peer_search(node_ids.pop()) total_peers.extend(peers) else: + if force_once: + break fut = asyncio.Future(loop=self.loop) 
self.loop.call_later(constants.refresh_interval // 4, fut.set_result, None) await fut @@ -64,6 +66,8 @@ class Node: to_ping = [peer for peer in set(total_peers) if self.protocol.peer_manager.peer_is_good(peer) is not True] if to_ping: self.protocol.ping_queue.enqueue_maybe_ping(*to_ping, delay=0) + if force_once: + break fut = asyncio.Future(loop=self.loop) self.loop.call_later(constants.refresh_interval, fut.set_result, None) diff --git a/lbrynet/dht/peer.py b/lbrynet/dht/peer.py index 23bc29635..8253d1332 100644 --- a/lbrynet/dht/peer.py +++ b/lbrynet/dht/peer.py @@ -31,7 +31,6 @@ class PeerManager: self._node_id_mapping: typing.Dict[typing.Tuple[str, int], bytes] = {} self._node_id_reverse_mapping: typing.Dict[bytes, typing.Tuple[str, int]] = {} self._node_tokens: typing.Dict[bytes, (float, bytes)] = {} - self._kademlia_peers: typing.Dict[typing.Tuple[bytes, str, int], 'KademliaPeer'] def report_failure(self, address: str, udp_port: int): now = self._loop.time() @@ -104,11 +103,12 @@ class PeerManager: delay = self._loop.time() - constants.check_refresh_interval - if node_id not in self._node_id_reverse_mapping or (address, udp_port) not in self._node_id_mapping: - return - addr_tup = (address, udp_port) - if self._node_id_reverse_mapping[node_id] != addr_tup or self._node_id_mapping[addr_tup] != node_id: - return + # fixme: find a way to re-enable that without breaking other parts + #if node_id not in self._node_id_reverse_mapping or (address, udp_port) not in self._node_id_mapping: + # return + #addr_tup = (address, udp_port) + #if self._node_id_reverse_mapping[node_id] != addr_tup or self._node_id_mapping[addr_tup] != node_id: + # return previous_failure, most_recent_failure = self._rpc_failures.get((address, udp_port), (None, None)) last_requested = self._last_requested.get((address, udp_port)) last_replied = self._last_replied.get((address, udp_port)) diff --git a/lbrynet/dht/protocol/protocol.py b/lbrynet/dht/protocol/protocol.py index 152965b3f..346c7c8b4 100644 --- a/lbrynet/dht/protocol/protocol.py +++ b/lbrynet/dht/protocol/protocol.py @@ -191,12 +191,14 @@ class PingQueue: self._process_task: asyncio.Task = None self._running = False self._running_pings: typing.Set[asyncio.Task] = set() + self._default_delay = constants.maybe_ping_delay @property def running(self): return self._running - def enqueue_maybe_ping(self, *peers: 'KademliaPeer', delay: float = constants.maybe_ping_delay): + def enqueue_maybe_ping(self, *peers: 'KademliaPeer', delay: typing.Optional[float] = None): + delay = delay if delay is not None else self._default_delay now = self._loop.time() for peer in peers: if peer not in self._pending_contacts or now + delay < self._pending_contacts[peer]: @@ -390,7 +392,7 @@ class KademliaProtocol(DatagramProtocol): while self._to_add: async with self._split_lock: await self._add_peer(self._to_add.pop()) - await asyncio.gather(self._wakeup_routing_task.wait(), asyncio.sleep(0.2)) + await asyncio.gather(self._wakeup_routing_task.wait(), asyncio.sleep(.1)) self._wakeup_routing_task.clear() def _handle_rpc(self, sender_contact: 'KademliaPeer', message: RequestDatagram): diff --git a/tests/integration/test_dht.py b/tests/integration/test_dht.py new file mode 100644 index 000000000..8092116cb --- /dev/null +++ b/tests/integration/test_dht.py @@ -0,0 +1,57 @@ +import asyncio + +from lbrynet.dht import constants +from lbrynet.dht.node import Node +from lbrynet.dht.peer import PeerManager, KademliaPeer +from torba.testcase import AsyncioTestCase + + +class 
CLIIntegrationTest(AsyncioTestCase): + + async def asyncSetUp(self): + import logging + logging.getLogger('asyncio').setLevel(logging.ERROR) + logging.getLogger('lbrynet.dht').setLevel(logging.DEBUG) + self.nodes = [] + self.known_node_addresses = [] + + async def setup_network(self, size: int, start_port=40000): + for i in range(size): + node_port = start_port + i + node = Node(self.loop, PeerManager(self.loop), node_id=constants.generate_id(i), + udp_port=node_port, internal_udp_port=node_port, + peer_port=3333, external_ip='127.0.0.1') + self.nodes.append(node) + self.known_node_addresses.append(('127.0.0.1', node_port)) + await node.start_listening('127.0.0.1') + for node in self.nodes: + node.protocol.rpc_timeout = .2 + node.protocol.ping_queue._default_delay = .5 + node.start('127.0.0.1', self.known_node_addresses[:1]) + await asyncio.gather(*[node.joined.wait() for node in self.nodes]) + + async def asyncTearDown(self): + for node in self.nodes: + node.stop() + + async def test_replace_bad_nodes(self): + await self.setup_network(20) + self.assertEquals(len(self.nodes), 20) + node = self.nodes[0] + bad_peers = [] + for candidate in self.nodes[1:10]: + address, port, node_id = candidate.protocol.external_ip, candidate.protocol.udp_port, candidate.protocol.node_id + peer = KademliaPeer(self.loop, address, node_id, port) + bad_peers.append(peer) + node.protocol.add_peer(peer) + candidate.stop() + await asyncio.sleep(.3) # let pending events settle + for bad_peer in bad_peers: + self.assertIn(bad_peer, node.protocol.routing_table.get_peers()) + await node.refresh_node(True) + await asyncio.sleep(.3) # let pending events settle + good_nodes = {good_node.protocol.node_id for good_node in self.nodes[10:]} + for peer in node.protocol.routing_table.get_peers(): + self.assertIn(peer.node_id, good_nodes) + + From f02df8670966517dcc4dbd93e1e230fb1d8d23df Mon Sep 17 00:00:00 2001 From: Victor Shyba Date: Sun, 12 May 2019 00:42:19 -0300 Subject: [PATCH 04/20] fix announce loop when there are no peers to announce --- lbrynet/dht/node.py | 33 +++++++++++++------------- lbrynet/dht/protocol/iterative_find.py | 3 +-- tests/integration/test_dht.py | 14 +++++++---- 3 files changed, 27 insertions(+), 23 deletions(-) diff --git a/lbrynet/dht/node.py b/lbrynet/dht/node.py index b10884644..aa52a9067 100644 --- a/lbrynet/dht/node.py +++ b/lbrynet/dht/node.py @@ -74,24 +74,25 @@ class Node: await fut async def announce_blob(self, blob_hash: str) -> typing.List[bytes]: - announced_to_node_ids = [] - while not announced_to_node_ids: - hash_value = binascii.unhexlify(blob_hash.encode()) - assert len(hash_value) == constants.hash_length - peers = await self.peer_search(hash_value) + hash_value = binascii.unhexlify(blob_hash.encode()) + assert len(hash_value) == constants.hash_length + peers = await self.peer_search(hash_value) - if not self.protocol.external_ip: - raise Exception("Cannot determine external IP") - log.debug("Store to %i peers", len(peers)) - for peer in peers: - log.debug("store to %s %s %s", peer.address, peer.udp_port, peer.tcp_port) - stored_to_tup = await asyncio.gather( - *(self.protocol.store_to_peer(hash_value, peer) for peer in peers), loop=self.loop - ) - announced_to_node_ids.extend([node_id for node_id, contacted in stored_to_tup if contacted]) + if not self.protocol.external_ip: + raise Exception("Cannot determine external IP") + log.debug("Store to %i peers", len(peers)) + for peer in peers: + log.debug("store to %s %s %s", peer.address, peer.udp_port, peer.tcp_port) + stored_to_tup = 
await asyncio.gather( + *(self.protocol.store_to_peer(hash_value, peer) for peer in peers), loop=self.loop + ) + stored_to = [node_id for node_id, contacted in stored_to_tup if contacted] + if stored_to: log.info("Stored %s to %i of %i attempted peers", binascii.hexlify(hash_value).decode()[:8], - len(announced_to_node_ids), len(peers)) - return announced_to_node_ids + len(stored_to), len(peers)) + else: + log.warning("Failed announcing %s, stored to 0 peers") + return stored_to def stop(self) -> None: if self.joined.is_set(): diff --git a/lbrynet/dht/protocol/iterative_find.py b/lbrynet/dht/protocol/iterative_find.py index f60b3af6b..ad92342cf 100644 --- a/lbrynet/dht/protocol/iterative_find.py +++ b/lbrynet/dht/protocol/iterative_find.py @@ -276,8 +276,7 @@ class IterativeNodeFinder(IterativeFinder): not_yet_yielded.sort(key=lambda peer: self.distance(peer.node_id)) to_yield = not_yet_yielded[:min(constants.k, len(not_yet_yielded))] if to_yield: - for peer in to_yield: - self.yielded_peers.add(peer) + self.yielded_peers.update(to_yield) self.iteration_queue.put_nowait(to_yield) if finish: self.iteration_queue.put_nowait(None) diff --git a/tests/integration/test_dht.py b/tests/integration/test_dht.py index 8092116cb..afa2f1341 100644 --- a/tests/integration/test_dht.py +++ b/tests/integration/test_dht.py @@ -1,4 +1,5 @@ import asyncio +from binascii import hexlify from lbrynet.dht import constants from lbrynet.dht.node import Node @@ -6,7 +7,7 @@ from lbrynet.dht.peer import PeerManager, KademliaPeer from torba.testcase import AsyncioTestCase -class CLIIntegrationTest(AsyncioTestCase): +class DHTIntegrationTest(AsyncioTestCase): async def asyncSetUp(self): import logging @@ -24,16 +25,13 @@ class CLIIntegrationTest(AsyncioTestCase): self.nodes.append(node) self.known_node_addresses.append(('127.0.0.1', node_port)) await node.start_listening('127.0.0.1') + self.addCleanup(node.stop) for node in self.nodes: node.protocol.rpc_timeout = .2 node.protocol.ping_queue._default_delay = .5 node.start('127.0.0.1', self.known_node_addresses[:1]) await asyncio.gather(*[node.joined.wait() for node in self.nodes]) - async def asyncTearDown(self): - for node in self.nodes: - node.stop() - async def test_replace_bad_nodes(self): await self.setup_network(20) self.assertEquals(len(self.nodes), 20) @@ -55,3 +53,9 @@ class CLIIntegrationTest(AsyncioTestCase): self.assertIn(peer.node_id, good_nodes) + async def test_announce_no_peers(self): + await self.setup_network(1) + node = self.nodes[0] + blob_hash = hexlify(constants.generate_id(1337)).decode() + peers = await node.announce_blob(blob_hash) + self.assertEqual(len(peers), 0) From 6f06026511dba886672f1d1df0fcfc74931469f1 Mon Sep 17 00:00:00 2001 From: Victor Shyba Date: Sun, 12 May 2019 00:52:46 -0300 Subject: [PATCH 05/20] announcer goes idle if no peers in routing table --- lbrynet/dht/blob_announcer.py | 5 ++++- lbrynet/dht/node.py | 2 +- 2 files changed, 5 insertions(+), 2 deletions(-) diff --git a/lbrynet/dht/blob_announcer.py b/lbrynet/dht/blob_announcer.py index 33ab4f70e..9321b8536 100644 --- a/lbrynet/dht/blob_announcer.py +++ b/lbrynet/dht/blob_announcer.py @@ -33,6 +33,10 @@ class BlobAnnouncer: while batch_size: if not self.node.joined.is_set(): await self.node.joined.wait() + await asyncio.sleep(60) + if not self.node.protocol.routing_table.get_peers(): + log.warning("No peers in DHT, announce round skipped") + continue self.announce_queue.extend(await self.storage.get_blobs_to_announce()) log.debug("announcer task wake up, %d blobs to 
announce", len(self.announce_queue)) while len(self.announce_queue): @@ -45,7 +49,6 @@ class BlobAnnouncer: if announced: await self.storage.update_last_announced_blobs(announced) log.info("announced %i blobs", len(announced)) - await asyncio.sleep(60) def start(self, batch_size: typing.Optional[int] = 10): assert not self.announce_task or self.announce_task.done(), "already running" diff --git a/lbrynet/dht/node.py b/lbrynet/dht/node.py index aa52a9067..1efe78037 100644 --- a/lbrynet/dht/node.py +++ b/lbrynet/dht/node.py @@ -91,7 +91,7 @@ class Node: log.info("Stored %s to %i of %i attempted peers", binascii.hexlify(hash_value).decode()[:8], len(stored_to), len(peers)) else: - log.warning("Failed announcing %s, stored to 0 peers") + log.warning("Failed announcing %s, stored to 0 peers", blob_hash[:8]) return stored_to def stop(self) -> None: From bac7d99b8ab9cd85e7912acb01ed584f6cb4b545 Mon Sep 17 00:00:00 2001 From: Victor Shyba Date: Sun, 12 May 2019 03:39:11 -0300 Subject: [PATCH 06/20] add ability to re-join network on disconnect + tests --- lbrynet/blob_exchange/client.py | 4 +-- lbrynet/dht/blob_announcer.py | 2 +- lbrynet/dht/node.py | 42 +++++++++++++++++---------- lbrynet/dht/peer.py | 4 +++ lbrynet/dht/protocol/protocol.py | 1 + tests/integration/test_dht.py | 22 ++++++++++++-- tests/unit/dht/test_blob_announcer.py | 3 +- 7 files changed, 56 insertions(+), 22 deletions(-) diff --git a/lbrynet/blob_exchange/client.py b/lbrynet/blob_exchange/client.py index 1b4c46f01..4051d8dc3 100644 --- a/lbrynet/blob_exchange/client.py +++ b/lbrynet/blob_exchange/client.py @@ -31,8 +31,8 @@ class BlobExchangeClientProtocol(asyncio.Protocol): self.closed = asyncio.Event(loop=self.loop) def data_received(self, data: bytes): - log.debug("%s:%d -- got %s bytes -- %s bytes on buffer -- %s blob bytes received", - self.peer_address, self.peer_port, len(data), len(self.buf), self._blob_bytes_received) + #log.debug("%s:%d -- got %s bytes -- %s bytes on buffer -- %s blob bytes received", + # self.peer_address, self.peer_port, len(data), len(self.buf), self._blob_bytes_received) if not self.transport or self.transport.is_closing(): log.warning("transport closing, but got more bytes from %s:%i\n%s", self.peer_address, self.peer_port, binascii.hexlify(data)) diff --git a/lbrynet/dht/blob_announcer.py b/lbrynet/dht/blob_announcer.py index 9321b8536..1fc3bc069 100644 --- a/lbrynet/dht/blob_announcer.py +++ b/lbrynet/dht/blob_announcer.py @@ -33,7 +33,7 @@ class BlobAnnouncer: while batch_size: if not self.node.joined.is_set(): await self.node.joined.wait() - await asyncio.sleep(60) + await asyncio.sleep(60, loop=self.loop) if not self.node.protocol.routing_table.get_peers(): log.warning("No peers in DHT, announce round skipped") continue diff --git a/lbrynet/dht/node.py b/lbrynet/dht/node.py index 1efe78037..62ec7f3d9 100644 --- a/lbrynet/dht/node.py +++ b/lbrynet/dht/node.py @@ -144,9 +144,21 @@ class Node: KademliaPeer(self.loop, address, udp_port=port) for (address, port) in known_node_addresses ] - while not len(self.protocol.routing_table.get_peers()): - peers.extend(await self.peer_search(self.protocol.node_id, shortlist=peers, count=32)) - self.protocol.ping_queue.enqueue_maybe_ping(*peers, delay=0.0) + while True: + if not self.protocol.routing_table.get_peers(): + if self.joined.is_set(): + self.joined.clear() + self.protocol.peer_manager.reset() + self.protocol.ping_queue.enqueue_maybe_ping(*peers, delay=0.0) + peers.extend(await self.peer_search(self.protocol.node_id, shortlist=peers, count=32)) + 
if self.protocol.routing_table.get_peers(): + self.joined.set() + log.info( + "Joined DHT, %i peers known in %i buckets", len(self.protocol.routing_table.get_peers()), + self.protocol.routing_table.buckets_with_contacts()) + else: + continue + await asyncio.sleep(1, loop=self.loop) log.info("Joined DHT, %i peers known in %i buckets", len(self.protocol.routing_table.get_peers()), self.protocol.routing_table.buckets_with_contacts()) @@ -186,27 +198,25 @@ class Node: async def _accumulate_search_junction(self, search_queue: asyncio.Queue, result_queue: asyncio.Queue): - ongoing = {} + tasks = [] async def __start_producing_task(): while True: blob_hash = await search_queue.get() - ongoing[blob_hash] = asyncio.create_task(self._value_producer(blob_hash, result_queue)) - ongoing[''] = asyncio.create_task(__start_producing_task()) + tasks.append(asyncio.create_task(self._value_producer(blob_hash, result_queue))) + tasks.append(asyncio.create_task(__start_producing_task())) try: - while True: - await asyncio.wait(ongoing.values(), return_when='FIRST_COMPLETED') - for key in list(ongoing.keys())[:]: - if key and ongoing[key].done(): - ongoing[key] = asyncio.create_task(self._value_producer(key, result_queue)) + await asyncio.wait(tasks) finally: - for task in ongoing.values(): + for task in tasks: task.cancel() async def _value_producer(self, blob_hash: str, result_queue: asyncio.Queue): - log.info("Searching %s", blob_hash[:8]) - async for results in self.get_iterative_value_finder(binascii.unhexlify(blob_hash.encode())): - result_queue.put_nowait(results) - log.info("Search expired %s", blob_hash[:8]) + for interval in range(1000): + log.info("Searching %s", blob_hash[:8]) + async for results in self.get_iterative_value_finder(binascii.unhexlify(blob_hash.encode())): + result_queue.put_nowait(results) + log.info("Search expired %s", blob_hash[:8]) + await asyncio.sleep(interval ** 2) def accumulate_peers(self, search_queue: asyncio.Queue, peer_queue: typing.Optional[asyncio.Queue] = None) -> typing.Tuple[ diff --git a/lbrynet/dht/peer.py b/lbrynet/dht/peer.py index 8253d1332..c4fb5a9ba 100644 --- a/lbrynet/dht/peer.py +++ b/lbrynet/dht/peer.py @@ -32,6 +32,10 @@ class PeerManager: self._node_id_reverse_mapping: typing.Dict[bytes, typing.Tuple[str, int]] = {} self._node_tokens: typing.Dict[bytes, (float, bytes)] = {} + def reset(self): + for statistic in (self._rpc_failures, self._last_replied, self._last_sent, self._last_requested): + statistic.clear() + def report_failure(self, address: str, udp_port: int): now = self._loop.time() _, previous = self._rpc_failures.pop((address, udp_port), (None, None)) diff --git a/lbrynet/dht/protocol/protocol.py b/lbrynet/dht/protocol/protocol.py index 346c7c8b4..03a8df7a8 100644 --- a/lbrynet/dht/protocol/protocol.py +++ b/lbrynet/dht/protocol/protocol.py @@ -275,6 +275,7 @@ class KademliaProtocol(DatagramProtocol): self._wakeup_routing_task = asyncio.Event(loop=self.loop) self.maintaing_routing_task: typing.Optional[asyncio.Task] = None + @functools.lru_cache(128) def get_rpc_peer(self, peer: 'KademliaPeer') -> RemoteKademliaRPC: return RemoteKademliaRPC(self.loop, self.peer_manager, self, peer) diff --git a/tests/integration/test_dht.py b/tests/integration/test_dht.py index afa2f1341..66f69d6a5 100644 --- a/tests/integration/test_dht.py +++ b/tests/integration/test_dht.py @@ -16,7 +16,7 @@ class DHTIntegrationTest(AsyncioTestCase): self.nodes = [] self.known_node_addresses = [] - async def setup_network(self, size: int, start_port=40000): + async def 
setup_network(self, size: int, start_port=40000, seed_nodes=1): for i in range(size): node_port = start_port + i node = Node(self.loop, PeerManager(self.loop), node_id=constants.generate_id(i), @@ -29,7 +29,7 @@ class DHTIntegrationTest(AsyncioTestCase): for node in self.nodes: node.protocol.rpc_timeout = .2 node.protocol.ping_queue._default_delay = .5 - node.start('127.0.0.1', self.known_node_addresses[:1]) + node.start('127.0.0.1', self.known_node_addresses[:seed_nodes]) await asyncio.gather(*[node.joined.wait() for node in self.nodes]) async def test_replace_bad_nodes(self): @@ -52,6 +52,24 @@ class DHTIntegrationTest(AsyncioTestCase): for peer in node.protocol.routing_table.get_peers(): self.assertIn(peer.node_id, good_nodes) + async def test_re_join(self): + await self.setup_network(20, seed_nodes=10) + node = self.nodes[-1] + self.assertTrue(node.joined.is_set()) + self.assertTrue(node.protocol.routing_table.get_peers()) + for network_node in self.nodes[:-1]: + network_node.stop() + await node.refresh_node(True) + self.assertFalse(node.protocol.routing_table.get_peers()) + for network_node in self.nodes[:-1]: + await network_node.start_listening('127.0.0.1') + self.assertFalse(node.protocol.routing_table.get_peers()) + timeout = 20 + while not node.protocol.routing_table.get_peers(): + await asyncio.sleep(.1) + timeout -= 1 + if not timeout: + self.fail("node didnt join back after 2 seconds") async def test_announce_no_peers(self): await self.setup_network(1) diff --git a/tests/unit/dht/test_blob_announcer.py b/tests/unit/dht/test_blob_announcer.py index 8e75e0e76..1f4c535a8 100644 --- a/tests/unit/dht/test_blob_announcer.py +++ b/tests/unit/dht/test_blob_announcer.py @@ -86,7 +86,8 @@ class TestBlobAnnouncer(AsyncioTestCase): to_announce = await self.storage.get_blobs_to_announce() self.assertEqual(2, len(to_announce)) self.blob_announcer.start(batch_size=1) # so it covers batching logic - await self.advance(61.0) + # takes 60 seconds to start, but we advance 120 to ensure it processed all batches + await self.advance(60.0 * 2) to_announce = await self.storage.get_blobs_to_announce() self.assertEqual(0, len(to_announce)) self.blob_announcer.stop() From f5cef14d8d3d595669aa31a07a6d941a9269e2b0 Mon Sep 17 00:00:00 2001 From: Victor Shyba Date: Sun, 12 May 2019 03:53:46 -0300 Subject: [PATCH 07/20] test get token on announce --- tests/integration/test_dht.py | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/tests/integration/test_dht.py b/tests/integration/test_dht.py index 66f69d6a5..32ac27757 100644 --- a/tests/integration/test_dht.py +++ b/tests/integration/test_dht.py @@ -60,6 +60,7 @@ class DHTIntegrationTest(AsyncioTestCase): for network_node in self.nodes[:-1]: network_node.stop() await node.refresh_node(True) + await asyncio.sleep(.3) # let pending events settle self.assertFalse(node.protocol.routing_table.get_peers()) for network_node in self.nodes[:-1]: await network_node.start_listening('127.0.0.1') @@ -77,3 +78,11 @@ class DHTIntegrationTest(AsyncioTestCase): blob_hash = hexlify(constants.generate_id(1337)).decode() peers = await node.announce_blob(blob_hash) self.assertEqual(len(peers), 0) + + async def test_get_token_on_announce(self): + await self.setup_network(2, seed_nodes=2) + node1, node2 = self.nodes + node1.protocol.peer_manager.clear_token(node2.protocol.node_id) + blob_hash = hexlify(constants.generate_id(1337)).decode() + node_ids = await node1.announce_blob(blob_hash) + self.assertIn(node2.protocol.node_id, node_ids) From 
32d15638856b2aef5b13994189f860c4820f4c4d Mon Sep 17 00:00:00 2001 From: Victor Shyba Date: Sun, 12 May 2019 04:12:12 -0300 Subject: [PATCH 08/20] fix handling of invalid token --- lbrynet/dht/protocol/protocol.py | 9 ++++++++- tests/integration/test_dht.py | 6 ++++++ 2 files changed, 14 insertions(+), 1 deletion(-) diff --git a/lbrynet/dht/protocol/protocol.py b/lbrynet/dht/protocol/protocol.py index 03a8df7a8..7474218e5 100644 --- a/lbrynet/dht/protocol/protocol.py +++ b/lbrynet/dht/protocol/protocol.py @@ -628,12 +628,15 @@ class KademliaProtocol(DatagramProtocol): return True async def store_to_peer(self, hash_value: bytes, peer: 'KademliaPeer') -> typing.Tuple[bytes, bool]: - try: + async def __store(): res = await self.get_rpc_peer(peer).store(hash_value) if res != b"OK": raise ValueError(res) log.debug("Stored %s to %s", binascii.hexlify(hash_value).decode()[:8], peer) return peer.node_id, True + + try: + return await __store() except asyncio.TimeoutError: log.debug("Timeout while storing blob_hash %s at %s", binascii.hexlify(hash_value).decode()[:8], peer) except ValueError as err: @@ -641,6 +644,10 @@ class KademliaProtocol(DatagramProtocol): except Exception as err: if 'Invalid token' in str(err): self.peer_manager.clear_token(peer.node_id) + try: + return await __store() + except: + return peer.node_id, False else: log.exception("Unexpected error while storing blob_hash") return peer.node_id, False diff --git a/tests/integration/test_dht.py b/tests/integration/test_dht.py index 32ac27757..5b714ef3d 100644 --- a/tests/integration/test_dht.py +++ b/tests/integration/test_dht.py @@ -86,3 +86,9 @@ class DHTIntegrationTest(AsyncioTestCase): blob_hash = hexlify(constants.generate_id(1337)).decode() node_ids = await node1.announce_blob(blob_hash) self.assertIn(node2.protocol.node_id, node_ids) + node2.protocol.node_rpc.refresh_token() + node_ids = await node1.announce_blob(blob_hash) + self.assertIn(node2.protocol.node_id, node_ids) + node2.protocol.node_rpc.refresh_token() + node_ids = await node1.announce_blob(blob_hash) + self.assertIn(node2.protocol.node_id, node_ids) From 20022d49c1b87e28065c1fe45d27a6d7f6798a2b Mon Sep 17 00:00:00 2001 From: Victor Shyba Date: Sun, 12 May 2019 04:13:58 -0300 Subject: [PATCH 09/20] less verbose dht tests --- tests/integration/test_dht.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/integration/test_dht.py b/tests/integration/test_dht.py index 5b714ef3d..8fdcdaf83 100644 --- a/tests/integration/test_dht.py +++ b/tests/integration/test_dht.py @@ -12,7 +12,7 @@ class DHTIntegrationTest(AsyncioTestCase): async def asyncSetUp(self): import logging logging.getLogger('asyncio').setLevel(logging.ERROR) - logging.getLogger('lbrynet.dht').setLevel(logging.DEBUG) + logging.getLogger('lbrynet.dht').setLevel(logging.WARN) self.nodes = [] self.known_node_addresses = [] From 6e327d729bb236116dd968effe3139af34953679 Mon Sep 17 00:00:00 2001 From: Victor Shyba Date: Sun, 12 May 2019 04:28:58 -0300 Subject: [PATCH 10/20] fix test --- tests/integration/test_dht.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/integration/test_dht.py b/tests/integration/test_dht.py index 8fdcdaf83..23d6c5491 100644 --- a/tests/integration/test_dht.py +++ b/tests/integration/test_dht.py @@ -63,7 +63,7 @@ class DHTIntegrationTest(AsyncioTestCase): await asyncio.sleep(.3) # let pending events settle self.assertFalse(node.protocol.routing_table.get_peers()) for network_node in self.nodes[:-1]: - await 
network_node.start_listening('127.0.0.1') + network_node.start('127.0.0.1', self.known_node_addresses) self.assertFalse(node.protocol.routing_table.get_peers()) timeout = 20 while not node.protocol.routing_table.get_peers(): From 0075dcc2c04b307159c8c57ea814b67935bb2db0 Mon Sep 17 00:00:00 2001 From: Victor Shyba Date: Mon, 13 May 2019 02:40:04 -0300 Subject: [PATCH 11/20] iterative find probes bad peers + test case for edge scenario Tom found --- lbrynet/dht/protocol/iterative_find.py | 2 +- tests/integration/test_dht.py | 20 ++++++++++++++++++++ 2 files changed, 21 insertions(+), 1 deletion(-) diff --git a/lbrynet/dht/protocol/iterative_find.py b/lbrynet/dht/protocol/iterative_find.py index ad92342cf..94cebb609 100644 --- a/lbrynet/dht/protocol/iterative_find.py +++ b/lbrynet/dht/protocol/iterative_find.py @@ -181,7 +181,7 @@ class IterativeFinder: if added >= constants.alpha: break origin_address = (peer.address, peer.udp_port) - if origin_address in self.exclude or self.peer_manager.peer_is_good(peer) is False: + if origin_address in self.exclude: continue if peer.node_id == self.protocol.node_id: continue diff --git a/tests/integration/test_dht.py b/tests/integration/test_dht.py index 23d6c5491..cccf2e8f9 100644 --- a/tests/integration/test_dht.py +++ b/tests/integration/test_dht.py @@ -92,3 +92,23 @@ class DHTIntegrationTest(AsyncioTestCase): node2.protocol.node_rpc.refresh_token() node_ids = await node1.announce_blob(blob_hash) self.assertIn(node2.protocol.node_id, node_ids) + + async def test_peer_search_removes_bad_peers(self): + # that's an edge case discovered by Tom, but an important one + # imagine that you only got bad peers and refresh will happen in one hour + # instead of failing for one hour we should be able to recover by scheduling pings to bad peers we find + await self.setup_network(2, seed_nodes=2) + node1, node2 = self.nodes + node2.stop() + # forcefully make it a bad peer but dont remove it from routing table + address, port, node_id = node2.protocol.external_ip, node2.protocol.udp_port, node2.protocol.node_id + peer = KademliaPeer(self.loop, address, node_id, port) + self.assertTrue(node1.protocol.peer_manager.peer_is_good(peer)) + node1.protocol.peer_manager.report_failure(node2.protocol.external_ip, node2.protocol.udp_port) + node1.protocol.peer_manager.report_failure(node2.protocol.external_ip, node2.protocol.udp_port) + self.assertFalse(node1.protocol.peer_manager.peer_is_good(peer)) + + # now a search happens, which removes bad peers while contacting them + self.assertTrue(node1.protocol.routing_table.get_peers()) + await node1.peer_search(node2.protocol.node_id) + self.assertFalse(node1.protocol.routing_table.get_peers()) From 18af2dcd4e9cf9bf9108a2e94aaae713654b3f9d Mon Sep 17 00:00:00 2001 From: Victor Shyba Date: Mon, 13 May 2019 02:52:24 -0300 Subject: [PATCH 12/20] remove peer immediately instead of probing when its bad during search --- lbrynet/dht/protocol/iterative_find.py | 3 +++ lbrynet/dht/protocol/protocol.py | 9 ++++++--- tests/integration/test_dht.py | 4 ++-- 3 files changed, 11 insertions(+), 5 deletions(-) diff --git a/lbrynet/dht/protocol/iterative_find.py b/lbrynet/dht/protocol/iterative_find.py index 94cebb609..6516d7114 100644 --- a/lbrynet/dht/protocol/iterative_find.py +++ b/lbrynet/dht/protocol/iterative_find.py @@ -181,6 +181,9 @@ class IterativeFinder: if added >= constants.alpha: break origin_address = (peer.address, peer.udp_port) + if self.peer_manager.peer_is_good(peer) is False: + self.protocol.remove_peer(peer) + continue if 
origin_address in self.exclude: continue if peer.node_id == self.protocol.node_id: diff --git a/lbrynet/dht/protocol/protocol.py b/lbrynet/dht/protocol/protocol.py index 7474218e5..c9c0a7c4f 100644 --- a/lbrynet/dht/protocol/protocol.py +++ b/lbrynet/dht/protocol/protocol.py @@ -376,12 +376,16 @@ class KademliaProtocol(DatagramProtocol): self.routing_table.buckets[bucket_index].remove_peer(to_replace) return await self._add_peer(peer) - def add_peer(self, peer: 'KademliaPeer') -> bool: + def add_peer(self, peer: 'KademliaPeer'): if peer.node_id == self.node_id: return False self._to_add.add(peer) self._wakeup_routing_task.set() + def remove_peer(self, peer: 'KademliaPeer'): + self._to_remove.add(peer) + self._wakeup_routing_task.set() + async def routing_table_task(self): while True: while self._to_remove: @@ -558,8 +562,7 @@ class KademliaProtocol(DatagramProtocol): except (asyncio.TimeoutError, RemoteException): self.peer_manager.report_failure(peer.address, peer.udp_port) if self.peer_manager.peer_is_good(peer) is False: - self._to_remove.add(peer) - self._wakeup_routing_task.set() + self.remove_peer(peer) raise def send_response(self, peer: 'KademliaPeer', response: ResponseDatagram): diff --git a/tests/integration/test_dht.py b/tests/integration/test_dht.py index cccf2e8f9..cf03231ca 100644 --- a/tests/integration/test_dht.py +++ b/tests/integration/test_dht.py @@ -109,6 +109,6 @@ class DHTIntegrationTest(AsyncioTestCase): self.assertFalse(node1.protocol.peer_manager.peer_is_good(peer)) # now a search happens, which removes bad peers while contacting them - self.assertTrue(node1.protocol.routing_table.get_peers()) + self.assertNotIn(peer, node1.protocol._to_remove) await node1.peer_search(node2.protocol.node_id) - self.assertFalse(node1.protocol.routing_table.get_peers()) + self.assertIn(peer, node1.protocol._to_remove) From fb457c820a88c7ddae2e2ea66a7e3cd7801eb5e9 Mon Sep 17 00:00:00 2001 From: Victor Shyba Date: Mon, 13 May 2019 04:34:39 -0300 Subject: [PATCH 13/20] reorganizing iterative find code --- lbrynet/dht/protocol/iterative_find.py | 85 +++++++++++++------------- 1 file changed, 43 insertions(+), 42 deletions(-) diff --git a/lbrynet/dht/protocol/iterative_find.py b/lbrynet/dht/protocol/iterative_find.py index 6516d7114..e8d4a6286 100644 --- a/lbrynet/dht/protocol/iterative_find.py +++ b/lbrynet/dht/protocol/iterative_find.py @@ -87,9 +87,8 @@ class IterativeFinder: self.max_results = max_results self.exclude = exclude or [] - self.shortlist: typing.List['KademliaPeer'] = get_shortlist(routing_table, key, shortlist) - self.active: typing.List['KademliaPeer'] = [] - self.contacted: typing.Set[typing.Tuple[str, int]] = set() + self.active: typing.Set['KademliaPeer'] = set() + self.contacted: typing.Set['KademliaPeer'] = set() self.distance = Distance(key) self.closest_peer: typing.Optional['KademliaPeer'] = None @@ -103,6 +102,12 @@ class IterativeFinder: self.running = False self.tasks: typing.List[asyncio.Task] = [] self.delayed_calls: typing.List[asyncio.Handle] = [] + for peer in get_shortlist(routing_table, key, shortlist): + if peer.node_id: + self._add_active(peer) + else: + # seed nodes + self._schedule_probe(peer) async def send_probe(self, peer: 'KademliaPeer') -> FindResponse: """ @@ -134,36 +139,32 @@ class IterativeFinder: def _is_closer(self, peer: 'KademliaPeer') -> bool: return not self.closest_peer or self.distance.is_closer(peer.node_id, self.closest_peer.node_id) - def _update_closest(self): - self.active.sort(key=lambda peer: self.distance(peer.node_id)) - 
if self.closest_peer and self.closest_peer is not self.active[0]: - if self._is_closer(self.active[0]): - self.prev_closest_peer = self.closest_peer - self.closest_peer = self.active[0] + def _add_active(self, peer): + if peer not in self.active and peer.node_id and peer.node_id != self.protocol.node_id: + if self.peer_manager.peer_is_good(peer) is not False: + self.active.add(peer) + if self._is_closer(peer): + self.prev_closest_peer = self.closest_peer + self.closest_peer = peer + else: + self.protocol.remove_peer(peer) async def _handle_probe_result(self, peer: 'KademliaPeer', response: FindResponse): - if peer not in self.active and peer.node_id: - self.active.append(peer) + self._add_active(peer) for contact_triple in response.get_close_triples(): node_id, address, udp_port = contact_triple - if (address, udp_port) not in self.contacted: # and not self.peer_manager.is_ignored(addr_tuple) - found_peer = self.peer_manager.get_kademlia_peer(node_id, address, udp_port) - if found_peer not in self.active and self.peer_manager.peer_is_good(found_peer) is not False: - self.active.append(found_peer) - self._update_closest() + self._add_active(self.peer_manager.get_kademlia_peer(node_id, address, udp_port)) self.check_result_ready(response) async def _send_probe(self, peer: 'KademliaPeer'): try: response = await self.send_probe(peer) except asyncio.TimeoutError: - if peer in self.active: - self.active.remove(peer) + self.active.discard(peer) return except ValueError as err: log.warning(str(err)) - if peer in self.active: - self.active.remove(peer) + self.active.discard(peer) return except TransportNotConnected: return self.aclose() @@ -173,43 +174,44 @@ class IterativeFinder: async def _search_round(self): """ - Send up to constants.alpha (5) probes to the closest peers in the shortlist + Send up to constants.alpha (5) probes to closest active peers """ added = 0 - for peer in chain(self.active, self.shortlist): + to_probe = list(self.active - self.contacted) + to_probe.sort(key=lambda peer: self.distance(self.key)) + for peer in to_probe: if added >= constants.alpha: break origin_address = (peer.address, peer.udp_port) - if self.peer_manager.peer_is_good(peer) is False: - self.protocol.remove_peer(peer) - continue if origin_address in self.exclude: continue if peer.node_id == self.protocol.node_id: continue if origin_address == (self.protocol.external_ip, self.protocol.udp_port): continue - if origin_address not in self.contacted: - self.contacted.add(origin_address) - - t = self.loop.create_task(self._send_probe(peer)) - - def callback(_): - self.running_probes.difference_update({ - probe for probe in self.running_probes if probe.done() or probe == t - }) - if not self.running_probes: - self.tasks.append(self.loop.create_task(self._search_task(0.0))) - - t.add_done_callback(callback) - self.running_probes.add(t) - added += 1 + self._schedule_probe(peer) + added += 1 log.debug("running %d probes", len(self.running_probes)) if not added and not self.running_probes: log.debug("search for %s exhausted", hexlify(self.key)[:8]) self.search_exhausted() + def _schedule_probe(self, peer: 'KademliaPeer'): + self.contacted.add(peer) + + t = self.loop.create_task(self._send_probe(peer)) + + def callback(_): + self.running_probes.difference_update({ + probe for probe in self.running_probes if probe.done() or probe == t + }) + if not self.running_probes: + self.tasks.append(self.loop.create_task(self._search_task(0.0))) + + t.add_done_callback(callback) + self.running_probes.add(t) + async def 
_search_task(self, delay: typing.Optional[float] = constants.iterative_lookup_delay): try: if self.running: @@ -296,8 +298,7 @@ class IterativeNodeFinder(IterativeFinder): self.bottom_out_count = 0 elif self.prev_closest_peer and self.closest_peer: self.bottom_out_count += 1 - log.info("bottom out %i %i %i %i", len(self.active), len(self.contacted), len(self.shortlist), - self.bottom_out_count) + log.info("bottom out %i %i %i", len(self.active), len(self.contacted), self.bottom_out_count) if self.bottom_out_count >= self.bottom_out_limit or self.iteration_count >= self.bottom_out_limit: log.info("limit hit") self.put_result(self.active, True) From 7c4eb416d6525102ea91390a8e311c25770859e8 Mon Sep 17 00:00:00 2001 From: Victor Shyba Date: Mon, 13 May 2019 13:14:08 -0300 Subject: [PATCH 14/20] dont clearbanned if there are working peers, dont remove node immediately during search --- lbrynet/blob_exchange/downloader.py | 3 ++- lbrynet/dht/protocol/iterative_find.py | 2 +- lbrynet/stream/downloader.py | 2 ++ tests/integration/test_dht.py | 6 +++--- 4 files changed, 8 insertions(+), 5 deletions(-) diff --git a/lbrynet/blob_exchange/downloader.py b/lbrynet/blob_exchange/downloader.py index 1805966ec..06c7c755c 100644 --- a/lbrynet/blob_exchange/downloader.py +++ b/lbrynet/blob_exchange/downloader.py @@ -62,10 +62,11 @@ class BlobDownloader: await asyncio.wait(active_tasks, loop=self.loop, return_when='FIRST_COMPLETED') def cleanup_active(self): + if not self.active_connections: + self.clearbanned() to_remove = [peer for (peer, task) in self.active_connections.items() if task.done()] for peer in to_remove: del self.active_connections[peer] - self.clearbanned() def clearbanned(self): now = self.loop.time() diff --git a/lbrynet/dht/protocol/iterative_find.py b/lbrynet/dht/protocol/iterative_find.py index e8d4a6286..b5671cc05 100644 --- a/lbrynet/dht/protocol/iterative_find.py +++ b/lbrynet/dht/protocol/iterative_find.py @@ -147,7 +147,7 @@ class IterativeFinder: self.prev_closest_peer = self.closest_peer self.closest_peer = peer else: - self.protocol.remove_peer(peer) + self.protocol.ping_queue.enqueue_maybe_ping(peer, 0.0) async def _handle_probe_result(self, peer: 'KademliaPeer', response: FindResponse): self._add_active(peer) diff --git a/lbrynet/stream/downloader.py b/lbrynet/stream/downloader.py index a6ffc569b..5b053082e 100644 --- a/lbrynet/stream/downloader.py +++ b/lbrynet/stream/downloader.py @@ -83,6 +83,8 @@ class StreamDownloader: # set up peer accumulation if node: self.node = node + if self.accumulate_task and not self.accumulate_task.done(): + self.accumulate_task.cancel() _, self.accumulate_task = self.node.accumulate_peers(self.search_queue, self.peer_queue) await self.add_fixed_peers() # start searching for peers for the sd hash diff --git a/tests/integration/test_dht.py b/tests/integration/test_dht.py index cf03231ca..33910d6df 100644 --- a/tests/integration/test_dht.py +++ b/tests/integration/test_dht.py @@ -27,7 +27,7 @@ class DHTIntegrationTest(AsyncioTestCase): await node.start_listening('127.0.0.1') self.addCleanup(node.stop) for node in self.nodes: - node.protocol.rpc_timeout = .2 + node.protocol.rpc_timeout = .5 node.protocol.ping_queue._default_delay = .5 node.start('127.0.0.1', self.known_node_addresses[:seed_nodes]) await asyncio.gather(*[node.joined.wait() for node in self.nodes]) @@ -109,6 +109,6 @@ class DHTIntegrationTest(AsyncioTestCase): self.assertFalse(node1.protocol.peer_manager.peer_is_good(peer)) # now a search happens, which removes bad peers while 
contacting them - self.assertNotIn(peer, node1.protocol._to_remove) + self.assertTrue(node1.protocol.routing_table.get_peers()) await node1.peer_search(node2.protocol.node_id) - self.assertIn(peer, node1.protocol._to_remove) + self.assertFalse(node1.protocol.routing_table.get_peers()) From d024433d1b947e000ee934c6e473191dfd0d0df9 Mon Sep 17 00:00:00 2001 From: Victor Shyba Date: Mon, 13 May 2019 13:34:49 -0300 Subject: [PATCH 15/20] simplify accumulate task --- lbrynet/dht/node.py | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/lbrynet/dht/node.py b/lbrynet/dht/node.py index 62ec7f3d9..f3e7c340e 100644 --- a/lbrynet/dht/node.py +++ b/lbrynet/dht/node.py @@ -199,13 +199,10 @@ class Node: async def _accumulate_search_junction(self, search_queue: asyncio.Queue, result_queue: asyncio.Queue): tasks = [] - async def __start_producing_task(): + try: while True: blob_hash = await search_queue.get() tasks.append(asyncio.create_task(self._value_producer(blob_hash, result_queue))) - tasks.append(asyncio.create_task(__start_producing_task())) - try: - await asyncio.wait(tasks) finally: for task in tasks: task.cancel() From b91d2190f44dfb3b049ccb9da288dddc6b66c4e2 Mon Sep 17 00:00:00 2001 From: Victor Shyba Date: Mon, 13 May 2019 14:24:13 -0300 Subject: [PATCH 16/20] disable infinite peer search, cleanup logging, tune scores to slow connections --- lbrynet/blob_exchange/downloader.py | 5 ++--- lbrynet/dht/node.py | 8 ++------ lbrynet/dht/protocol/protocol.py | 2 -- 3 files changed, 4 insertions(+), 11 deletions(-) diff --git a/lbrynet/blob_exchange/downloader.py b/lbrynet/blob_exchange/downloader.py index 06c7c755c..364a22f70 100644 --- a/lbrynet/blob_exchange/downloader.py +++ b/lbrynet/blob_exchange/downloader.py @@ -37,7 +37,6 @@ class BlobDownloader: async def request_blob_from_peer(self, blob: 'AbstractBlob', peer: 'KademliaPeer', connection_id: int = 0): if blob.get_is_verified(): return - self.scores[peer] = self.scores.get(peer, 0) - 1 # starts losing score, to account for cancelled ones transport = self.connections.get(peer) start = self.loop.time() bytes_received, transport = await request_blob( @@ -55,14 +54,14 @@ class BlobDownloader: self.failures[peer] = 0 self.connections[peer] = transport elapsed = self.loop.time() - start - self.scores[peer] = bytes_received / elapsed if bytes_received and elapsed else 0 + self.scores[peer] = bytes_received / elapsed if bytes_received and elapsed else 1 async def new_peer_or_finished(self): active_tasks = list(self.active_connections.values()) + [asyncio.sleep(1)] await asyncio.wait(active_tasks, loop=self.loop, return_when='FIRST_COMPLETED') def cleanup_active(self): - if not self.active_connections: + if not self.active_connections and not self.connections: self.clearbanned() to_remove = [peer for (peer, task) in self.active_connections.items() if task.done()] for peer in to_remove: diff --git a/lbrynet/dht/node.py b/lbrynet/dht/node.py index f3e7c340e..83f7f6f92 100644 --- a/lbrynet/dht/node.py +++ b/lbrynet/dht/node.py @@ -208,12 +208,8 @@ class Node: task.cancel() async def _value_producer(self, blob_hash: str, result_queue: asyncio.Queue): - for interval in range(1000): - log.info("Searching %s", blob_hash[:8]) - async for results in self.get_iterative_value_finder(binascii.unhexlify(blob_hash.encode())): - result_queue.put_nowait(results) - log.info("Search expired %s", blob_hash[:8]) - await asyncio.sleep(interval ** 2) + async for results in self.get_iterative_value_finder(binascii.unhexlify(blob_hash.encode())): + 
result_queue.put_nowait(results) def accumulate_peers(self, search_queue: asyncio.Queue, peer_queue: typing.Optional[asyncio.Queue] = None) -> typing.Tuple[ diff --git a/lbrynet/dht/protocol/protocol.py b/lbrynet/dht/protocol/protocol.py index c9c0a7c4f..9559f984c 100644 --- a/lbrynet/dht/protocol/protocol.py +++ b/lbrynet/dht/protocol/protocol.py @@ -311,7 +311,6 @@ class KademliaProtocol(DatagramProtocol): return args, {} async def _add_peer(self, peer: 'KademliaPeer'): - log.debug("Trying to add %s:%d", peer.address, peer.udp_port) for p in self.routing_table.get_peers(): if (p.address, p.udp_port) == (peer.address, peer.udp_port) and p.node_id != peer.node_id: self.routing_table.remove_peer(p) @@ -391,7 +390,6 @@ class KademliaProtocol(DatagramProtocol): while self._to_remove: async with self._split_lock: peer = self._to_remove.pop() - log.debug("Trying to remove %s:%d", peer.address, peer.udp_port) self.routing_table.remove_peer(peer) self.routing_table.join_buckets() while self._to_add: From 6f7987513501a9291855c417cb1a0deeed6ea9b4 Mon Sep 17 00:00:00 2001 From: Victor Shyba Date: Mon, 13 May 2019 14:27:49 -0300 Subject: [PATCH 17/20] fix test --- tests/integration/test_dht.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/integration/test_dht.py b/tests/integration/test_dht.py index 33910d6df..83a9dd459 100644 --- a/tests/integration/test_dht.py +++ b/tests/integration/test_dht.py @@ -111,4 +111,4 @@ class DHTIntegrationTest(AsyncioTestCase): # now a search happens, which removes bad peers while contacting them self.assertTrue(node1.protocol.routing_table.get_peers()) await node1.peer_search(node2.protocol.node_id) - self.assertFalse(node1.protocol.routing_table.get_peers()) + self.assertIn(peer, node1.protocol.ping_queue._pending_contacts) From 2439743804725085549a5fc6757643f49b8f8ab8 Mon Sep 17 00:00:00 2001 From: Victor Shyba Date: Mon, 13 May 2019 14:46:06 -0300 Subject: [PATCH 18/20] probe peer instead of scheduling ping for removal --- lbrynet/dht/protocol/iterative_find.py | 16 ++++++++-------- tests/integration/test_dht.py | 3 ++- 2 files changed, 10 insertions(+), 9 deletions(-) diff --git a/lbrynet/dht/protocol/iterative_find.py b/lbrynet/dht/protocol/iterative_find.py index b5671cc05..e5480887c 100644 --- a/lbrynet/dht/protocol/iterative_find.py +++ b/lbrynet/dht/protocol/iterative_find.py @@ -141,13 +141,10 @@ class IterativeFinder: def _add_active(self, peer): if peer not in self.active and peer.node_id and peer.node_id != self.protocol.node_id: - if self.peer_manager.peer_is_good(peer) is not False: - self.active.add(peer) - if self._is_closer(peer): - self.prev_closest_peer = self.closest_peer - self.closest_peer = peer - else: - self.protocol.ping_queue.enqueue_maybe_ping(peer, 0.0) + self.active.add(peer) + if self._is_closer(peer): + self.prev_closest_peer = self.closest_peer + self.closest_peer = peer async def _handle_probe_result(self, peer: 'KademliaPeer', response: FindResponse): self._add_active(peer) @@ -276,7 +273,10 @@ class IterativeNodeFinder(IterativeFinder): def put_result(self, from_iter: typing.Iterable['KademliaPeer'], finish=False): not_yet_yielded = [ - peer for peer in from_iter if peer not in self.yielded_peers and peer.node_id != self.protocol.node_id + peer for peer in from_iter + if peer not in self.yielded_peers + and peer.node_id != self.protocol.node_id + and self.peer_manager.peer_is_good(peer) is not False ] not_yet_yielded.sort(key=lambda peer: self.distance(peer.node_id)) to_yield = 
not_yet_yielded[:min(constants.k, len(not_yet_yielded))] diff --git a/tests/integration/test_dht.py b/tests/integration/test_dht.py index 83a9dd459..ae8ad111f 100644 --- a/tests/integration/test_dht.py +++ b/tests/integration/test_dht.py @@ -111,4 +111,5 @@ class DHTIntegrationTest(AsyncioTestCase): # now a search happens, which removes bad peers while contacting them self.assertTrue(node1.protocol.routing_table.get_peers()) await node1.peer_search(node2.protocol.node_id) - self.assertIn(peer, node1.protocol.ping_queue._pending_contacts) + await asyncio.sleep(.3) # let pending events settle + self.assertFalse(node1.protocol.routing_table.get_peers()) From 124fdb7d31be7a15053304b7aa007ff0724aea2a Mon Sep 17 00:00:00 2001 From: Victor Shyba Date: Mon, 13 May 2019 14:57:58 -0300 Subject: [PATCH 19/20] fixes from review --- lbrynet/dht/node.py | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/lbrynet/dht/node.py b/lbrynet/dht/node.py index 83f7f6f92..dfc60b0a6 100644 --- a/lbrynet/dht/node.py +++ b/lbrynet/dht/node.py @@ -171,14 +171,14 @@ class Node: ) ) - def get_iterative_node_finder(self, key: bytes, shortlist: typing.Optional[typing.List] = None, + def get_iterative_node_finder(self, key: bytes, shortlist: typing.Optional[typing.List['KademliaPeer']] = None, bottom_out_limit: int = constants.bottom_out_limit, max_results: int = constants.k) -> IterativeNodeFinder: return IterativeNodeFinder(self.loop, self.protocol.peer_manager, self.protocol.routing_table, self.protocol, key, bottom_out_limit, max_results, None, shortlist) - def get_iterative_value_finder(self, key: bytes, shortlist: typing.Optional[typing.List] = None, + def get_iterative_value_finder(self, key: bytes, shortlist: typing.Optional[typing.List['KademliaPeer']] = None, bottom_out_limit: int = 40, max_results: int = -1) -> IterativeValueFinder: @@ -186,7 +186,7 @@ class Node: key, bottom_out_limit, max_results, None, shortlist) async def peer_search(self, node_id: bytes, count=constants.k, max_results=constants.k*2, - bottom_out_limit=20, shortlist: typing.Optional[typing.List] = None + bottom_out_limit=20, shortlist: typing.Optional[typing.List['KademliaPeer']] = None ) -> typing.List['KademliaPeer']: peers = [] async for iteration_peers in self.get_iterative_node_finder( @@ -202,7 +202,7 @@ class Node: try: while True: blob_hash = await search_queue.get() - tasks.append(asyncio.create_task(self._value_producer(blob_hash, result_queue))) + tasks.append(self.loop.create_task(self._value_producer(blob_hash, result_queue))) finally: for task in tasks: task.cancel() @@ -214,5 +214,5 @@ class Node: def accumulate_peers(self, search_queue: asyncio.Queue, peer_queue: typing.Optional[asyncio.Queue] = None) -> typing.Tuple[ asyncio.Queue, asyncio.Task]: - q = peer_queue or asyncio.Queue() - return q, asyncio.create_task(self._accumulate_search_junction(search_queue, q)) + q = peer_queue or asyncio.Queue(loop=self.loop) + return q, self.loop.create_task(self._accumulate_search_junction(search_queue, q)) From 71a4be02dd820dde580b95531484c2d5f0841ecc Mon Sep 17 00:00:00 2001 From: Victor Shyba Date: Mon, 13 May 2019 15:56:21 -0300 Subject: [PATCH 20/20] use loop to create task --- lbrynet/dht/protocol/protocol.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/lbrynet/dht/protocol/protocol.py b/lbrynet/dht/protocol/protocol.py index 9559f984c..eff0211c2 100644 --- a/lbrynet/dht/protocol/protocol.py +++ b/lbrynet/dht/protocol/protocol.py @@ -280,7 +280,7 @@ class 
KademliaProtocol(DatagramProtocol): return RemoteKademliaRPC(self.loop, self.peer_manager, self, peer) def start(self): - self.maintaing_routing_task = asyncio.create_task(self.routing_table_task()) + self.maintaing_routing_task = self.loop.create_task(self.routing_table_task()) def stop(self): if self.maintaing_routing_task: @@ -395,7 +395,7 @@ class KademliaProtocol(DatagramProtocol): while self._to_add: async with self._split_lock: await self._add_peer(self._to_add.pop()) - await asyncio.gather(self._wakeup_routing_task.wait(), asyncio.sleep(.1)) + await asyncio.gather(self._wakeup_routing_task.wait(), asyncio.sleep(.1, loop=self.loop), loop=self.loop) self._wakeup_routing_task.clear() def _handle_rpc(self, sender_contact: 'KademliaPeer', message: RequestDatagram):
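
The add_peer/remove_peer changes in this series are easier to follow outside the diff context: callers enqueue peers synchronously into _to_add/_to_remove, and a single background task drains both sets under the split lock, waking on an event but running at most every 0.1 seconds. The sketch below is a minimal, self-contained illustration of that pattern; the RoutingTableMaintainer class and its string "peers" are invented for illustration, while the real logic lives in KademliaProtocol.routing_table_task.

    import asyncio
    import typing


    class RoutingTableMaintainer:
        """Toy model of the deferred add/remove pattern: synchronous enqueue,
        one background task applies the changes under a lock."""

        def __init__(self, loop: asyncio.AbstractEventLoop):
            self.loop = loop
            self._split_lock = asyncio.Lock()
            self._to_add: typing.Set[str] = set()
            self._to_remove: typing.Set[str] = set()
            self._wakeup = asyncio.Event()
            self._task: typing.Optional[asyncio.Task] = None
            self.peers: typing.Set[str] = set()

        def add_peer(self, peer: str):
            # synchronous: callers only enqueue and wake the maintenance task,
            # they never take the split lock themselves
            self._to_add.add(peer)
            self._wakeup.set()

        def remove_peer(self, peer: str):
            self._to_remove.add(peer)
            self._wakeup.set()

        async def _routing_table_task(self):
            while True:
                while self._to_remove:
                    async with self._split_lock:
                        self.peers.discard(self._to_remove.pop())
                while self._to_add:
                    async with self._split_lock:
                        self.peers.add(self._to_add.pop())
                # run again once we have been woken up and at least 0.1s passed
                await asyncio.gather(self._wakeup.wait(), asyncio.sleep(.1))
                self._wakeup.clear()

        def start(self):
            self._task = self.loop.create_task(self._routing_table_task())

        def stop(self):
            if self._task:
                self._task.cancel()


    async def _demo():
        maintainer = RoutingTableMaintainer(asyncio.get_running_loop())
        maintainer.start()
        maintainer.add_peer("127.0.0.1:4444")
        await asyncio.sleep(.2)
        assert "127.0.0.1:4444" in maintainer.peers
        maintainer.stop()


    if __name__ == "__main__":
        asyncio.run(_demo())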
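
Patch 08's invalid-token handling wraps the store call so that a rejected token is cleared and the store is retried exactly once. Stripped of the protocol plumbing, the control flow looks roughly like the sketch below; rpc_store and clear_token are placeholder callables standing in for the RPC wrapper and the peer manager, and the fake_store demo exists only to exercise the retry path.

    import asyncio
    import typing


    async def store_with_token_retry(
        rpc_store: typing.Callable[[bytes], typing.Awaitable[bytes]],
        clear_token: typing.Callable[[bytes], None],
        node_id: bytes,
        blob_hash: bytes,
    ) -> typing.Tuple[bytes, bool]:
        async def _store_once() -> typing.Tuple[bytes, bool]:
            res = await rpc_store(blob_hash)
            if res != b"OK":
                raise ValueError(res)
            return node_id, True

        try:
            return await _store_once()
        except asyncio.TimeoutError:
            return node_id, False                  # peer did not answer in time
        except ValueError:
            return node_id, False                  # peer answered, but refused the store
        except Exception as err:
            if 'Invalid token' in str(err):
                clear_token(node_id)               # cached token went stale
                try:
                    return await _store_once()     # one retry with a fresh token
                except Exception:
                    return node_id, False
            return node_id, False                  # any other unexpected failure


    async def _demo():
        calls = {"n": 0}

        async def fake_store(_blob_hash: bytes) -> bytes:
            calls["n"] += 1
            if calls["n"] == 1:
                raise Exception("Invalid token")   # simulate a stale-token rejection
            return b"OK"

        result = await store_with_token_retry(fake_store, lambda _node_id: None,
                                              b"peer-id", b"blob-hash")
        assert result == (b"peer-id", True) and calls["n"] == 2


    if __name__ == "__main__":
        asyncio.run(_demo())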
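
The iterative-find reorganization (patch 13) funnels probe bookkeeping through _schedule_probe: each probe becomes a task, its done-callback prunes the running set, and an empty set triggers the next search round. The stand-alone sketch below mirrors only that bookkeeping; ProbeScheduler, its string peers, and the print-based round trigger are illustrative stand-ins, not the IterativeFinder code.

    import asyncio
    import typing


    class ProbeScheduler:
        """Illustrative probe bookkeeping: tasks in flight, done-callbacks prune
        the set, and an empty set kicks off the next search round."""

        def __init__(self):
            self.running_probes: typing.Set[asyncio.Task] = set()
            self.contacted: typing.Set[str] = set()

        async def _send_probe(self, peer: str) -> str:
            await asyncio.sleep(0.01)  # stand-in for the UDP find_node/find_value round trip
            return peer

        def _start_next_round(self):
            # the real finder schedules another search round here
            print("no probes in flight, starting next round")

        def schedule_probe(self, peer: str):
            self.contacted.add(peer)
            task = asyncio.get_running_loop().create_task(self._send_probe(peer))

            def _on_done(_):
                # drop every finished probe (including this one), then check idleness
                self.running_probes.difference_update(
                    {probe for probe in self.running_probes if probe.done() or probe is task}
                )
                if not self.running_probes:
                    self._start_next_round()

            task.add_done_callback(_on_done)
            self.running_probes.add(task)


    async def _demo():
        scheduler = ProbeScheduler()
        for peer in ("10.0.0.1:4444", "10.0.0.2:4444"):
            scheduler.schedule_probe(peer)
        await asyncio.sleep(0.1)  # let both probes finish and the callbacks fire


    if __name__ == "__main__":
        asyncio.run(_demo())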
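
test_re_join waits for the node to reappear in the routing table by polling in 0.1-second steps and failing after roughly two seconds. That wait-for-condition idiom generalizes to a small helper like the one below; wait_until is a name chosen here for illustration and is not part of the test suite.

    import asyncio
    import typing


    async def wait_until(condition: typing.Callable[[], bool],
                         timeout: float = 2.0, interval: float = 0.1) -> bool:
        """Poll `condition` until it is truthy or `timeout` seconds elapse."""
        loop = asyncio.get_running_loop()
        deadline = loop.time() + timeout
        while not condition():
            if loop.time() >= deadline:
                return False
            await asyncio.sleep(interval)
        return True

With such a helper, the inline loop in test_re_join would read roughly: self.assertTrue(await wait_until(lambda: bool(node.protocol.routing_table.get_peers()))).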