From 2ac4883bda70e4a903825ca792416abd4761aca6 Mon Sep 17 00:00:00 2001 From: Victor Shyba Date: Wed, 24 Jul 2019 22:32:20 -0300 Subject: [PATCH] cleanup and fix prober --- lbry/lbry/blob_exchange/client.py | 1 + lbry/lbry/blob_exchange/downloader.py | 25 +++++++++++-------------- 2 files changed, 12 insertions(+), 14 deletions(-) diff --git a/lbry/lbry/blob_exchange/client.py b/lbry/lbry/blob_exchange/client.py index 78de6fa1f..37b07edbd 100644 --- a/lbry/lbry/blob_exchange/client.py +++ b/lbry/lbry/blob_exchange/client.py @@ -242,6 +242,7 @@ async def request_blob(loop: asyncio.BaseEventLoop, blob: typing.Optional['Abstr if not connected_transport: await asyncio.wait_for(loop.create_connection(lambda: protocol, address, tcp_port), peer_connect_timeout, loop=loop) + connected_transport = protocol.transport if blob is None or blob.get_is_verified() or not blob.is_writeable(): # blob is None happens when we are just opening a connection # file exists but not verified means someone is writing right now, give it time, come back later diff --git a/lbry/lbry/blob_exchange/downloader.py b/lbry/lbry/blob_exchange/downloader.py index c6fca2113..3c60e5db1 100644 --- a/lbry/lbry/blob_exchange/downloader.py +++ b/lbry/lbry/blob_exchange/downloader.py @@ -30,7 +30,8 @@ class BlobDownloader: self.is_running = asyncio.Event(loop=self.loop) def should_race_continue(self, blob: 'AbstractBlob'): - if len(self.active_connections) >= self.config.max_connections_per_download: + max_probes = self.config.max_connections_per_download * (1 if self.connections else 10) + if len(self.active_connections) >= max_probes: return False return not (blob.get_is_verified() or not blob.is_writeable()) @@ -92,22 +93,18 @@ class BlobDownloader: if batch: self.peer_queue.put_nowait(list(batch)) log.debug( - "running, %d peers, %d ignored, %d active", - len(batch), len(self.ignored), len(self.active_connections) + "running, %d peers, %d ignored, %d active, %s connections", + len(batch), len(self.ignored), len(self.active_connections), len(self.connections) ) for peer in sorted(batch, key=lambda peer: self.scores.get(peer, 0), reverse=True): - just_probe = False + if peer in self.ignored or peer in self.active_connections: + continue if not self.should_race_continue(blob): - if len(self.connections) >= self.config.max_connections_per_download * 10: - if peer in self.connections: - continue - just_probe = True - else: - break - if peer not in self.active_connections and peer not in self.ignored: - log.debug("request %s from %s:%i", blob_hash[:8], peer.address, peer.tcp_port) - t = self.loop.create_task(self.request_blob_from_peer(blob, peer, connection_id, just_probe)) - self.active_connections[peer] = t + break + log.debug("request %s from %s:%i", blob_hash[:8], peer.address, peer.tcp_port) + just_probe = len(self.connections) == 0 + t = self.loop.create_task(self.request_blob_from_peer(blob, peer, connection_id, just_probe)) + self.active_connections[peer] = t await self.new_peer_or_finished() self.cleanup_active() log.debug("downloaded %s", blob_hash[:8])