From b91d2190f44dfb3b049ccb9da288dddc6b66c4e2 Mon Sep 17 00:00:00 2001 From: Victor Shyba Date: Mon, 13 May 2019 14:24:13 -0300 Subject: [PATCH] disable infinite peer search, cleanup logging, tune scores to slow connections --- lbrynet/blob_exchange/downloader.py | 5 ++--- lbrynet/dht/node.py | 8 ++------ lbrynet/dht/protocol/protocol.py | 2 -- 3 files changed, 4 insertions(+), 11 deletions(-) diff --git a/lbrynet/blob_exchange/downloader.py b/lbrynet/blob_exchange/downloader.py index 06c7c755c..364a22f70 100644 --- a/lbrynet/blob_exchange/downloader.py +++ b/lbrynet/blob_exchange/downloader.py @@ -37,7 +37,6 @@ class BlobDownloader: async def request_blob_from_peer(self, blob: 'AbstractBlob', peer: 'KademliaPeer', connection_id: int = 0): if blob.get_is_verified(): return - self.scores[peer] = self.scores.get(peer, 0) - 1 # starts losing score, to account for cancelled ones transport = self.connections.get(peer) start = self.loop.time() bytes_received, transport = await request_blob( @@ -55,14 +54,14 @@ class BlobDownloader: self.failures[peer] = 0 self.connections[peer] = transport elapsed = self.loop.time() - start - self.scores[peer] = bytes_received / elapsed if bytes_received and elapsed else 0 + self.scores[peer] = bytes_received / elapsed if bytes_received and elapsed else 1 async def new_peer_or_finished(self): active_tasks = list(self.active_connections.values()) + [asyncio.sleep(1)] await asyncio.wait(active_tasks, loop=self.loop, return_when='FIRST_COMPLETED') def cleanup_active(self): - if not self.active_connections: + if not self.active_connections and not self.connections: self.clearbanned() to_remove = [peer for (peer, task) in self.active_connections.items() if task.done()] for peer in to_remove: diff --git a/lbrynet/dht/node.py b/lbrynet/dht/node.py index f3e7c340e..83f7f6f92 100644 --- a/lbrynet/dht/node.py +++ b/lbrynet/dht/node.py @@ -208,12 +208,8 @@ class Node: task.cancel() async def _value_producer(self, blob_hash: str, result_queue: asyncio.Queue): - for interval in range(1000): - log.info("Searching %s", blob_hash[:8]) - async for results in self.get_iterative_value_finder(binascii.unhexlify(blob_hash.encode())): - result_queue.put_nowait(results) - log.info("Search expired %s", blob_hash[:8]) - await asyncio.sleep(interval ** 2) + async for results in self.get_iterative_value_finder(binascii.unhexlify(blob_hash.encode())): + result_queue.put_nowait(results) def accumulate_peers(self, search_queue: asyncio.Queue, peer_queue: typing.Optional[asyncio.Queue] = None) -> typing.Tuple[ diff --git a/lbrynet/dht/protocol/protocol.py b/lbrynet/dht/protocol/protocol.py index c9c0a7c4f..9559f984c 100644 --- a/lbrynet/dht/protocol/protocol.py +++ b/lbrynet/dht/protocol/protocol.py @@ -311,7 +311,6 @@ class KademliaProtocol(DatagramProtocol): return args, {} async def _add_peer(self, peer: 'KademliaPeer'): - log.debug("Trying to add %s:%d", peer.address, peer.udp_port) for p in self.routing_table.get_peers(): if (p.address, p.udp_port) == (peer.address, peer.udp_port) and p.node_id != peer.node_id: self.routing_table.remove_peer(p) @@ -391,7 +390,6 @@ class KademliaProtocol(DatagramProtocol): while self._to_remove: async with self._split_lock: peer = self._to_remove.pop() - log.debug("Trying to remove %s:%d", peer.address, peer.udp_port) self.routing_table.remove_peer(peer) self.routing_table.join_buckets() while self._to_add: