diff --git a/lbrynet/blob_exchange/downloader.py b/lbrynet/blob_exchange/downloader.py index 1805966ec..06c7c755c 100644 --- a/lbrynet/blob_exchange/downloader.py +++ b/lbrynet/blob_exchange/downloader.py @@ -62,10 +62,11 @@ class BlobDownloader: await asyncio.wait(active_tasks, loop=self.loop, return_when='FIRST_COMPLETED') def cleanup_active(self): + if not self.active_connections: + self.clearbanned() to_remove = [peer for (peer, task) in self.active_connections.items() if task.done()] for peer in to_remove: del self.active_connections[peer] - self.clearbanned() def clearbanned(self): now = self.loop.time() diff --git a/lbrynet/dht/protocol/iterative_find.py b/lbrynet/dht/protocol/iterative_find.py index e8d4a6286..b5671cc05 100644 --- a/lbrynet/dht/protocol/iterative_find.py +++ b/lbrynet/dht/protocol/iterative_find.py @@ -147,7 +147,7 @@ class IterativeFinder: self.prev_closest_peer = self.closest_peer self.closest_peer = peer else: - self.protocol.remove_peer(peer) + self.protocol.ping_queue.enqueue_maybe_ping(peer, 0.0) async def _handle_probe_result(self, peer: 'KademliaPeer', response: FindResponse): self._add_active(peer) diff --git a/lbrynet/stream/downloader.py b/lbrynet/stream/downloader.py index a6ffc569b..5b053082e 100644 --- a/lbrynet/stream/downloader.py +++ b/lbrynet/stream/downloader.py @@ -83,6 +83,8 @@ class StreamDownloader: # set up peer accumulation if node: self.node = node + if self.accumulate_task and not self.accumulate_task.done(): + self.accumulate_task.cancel() _, self.accumulate_task = self.node.accumulate_peers(self.search_queue, self.peer_queue) await self.add_fixed_peers() # start searching for peers for the sd hash diff --git a/tests/integration/test_dht.py b/tests/integration/test_dht.py index cf03231ca..33910d6df 100644 --- a/tests/integration/test_dht.py +++ b/tests/integration/test_dht.py @@ -27,7 +27,7 @@ class DHTIntegrationTest(AsyncioTestCase): await node.start_listening('127.0.0.1') self.addCleanup(node.stop) for node in self.nodes: - node.protocol.rpc_timeout = .2 + node.protocol.rpc_timeout = .5 node.protocol.ping_queue._default_delay = .5 node.start('127.0.0.1', self.known_node_addresses[:seed_nodes]) await asyncio.gather(*[node.joined.wait() for node in self.nodes]) @@ -109,6 +109,6 @@ class DHTIntegrationTest(AsyncioTestCase): self.assertFalse(node1.protocol.peer_manager.peer_is_good(peer)) # now a search happens, which removes bad peers while contacting them - self.assertNotIn(peer, node1.protocol._to_remove) + self.assertTrue(node1.protocol.routing_table.get_peers()) await node1.peer_search(node2.protocol.node_id) - self.assertIn(peer, node1.protocol._to_remove) + self.assertFalse(node1.protocol.routing_table.get_peers())