From 5a7212aebba33fd51d2cd2d615b323e43da063e2 Mon Sep 17 00:00:00 2001 From: Victor Shyba Date: Wed, 24 Jul 2019 19:59:12 -0300 Subject: [PATCH 1/3] gotta probe 'em all --- lbry/lbry/blob_exchange/client.py | 5 +++-- lbry/lbry/blob_exchange/downloader.py | 15 +++++++++++---- 2 files changed, 14 insertions(+), 6 deletions(-) diff --git a/lbry/lbry/blob_exchange/client.py b/lbry/lbry/blob_exchange/client.py index cbf376946..78de6fa1f 100644 --- a/lbry/lbry/blob_exchange/client.py +++ b/lbry/lbry/blob_exchange/client.py @@ -220,7 +220,7 @@ class BlobExchangeClientProtocol(asyncio.Protocol): @cache_concurrent -async def request_blob(loop: asyncio.BaseEventLoop, blob: 'AbstractBlob', address: str, tcp_port: int, +async def request_blob(loop: asyncio.BaseEventLoop, blob: typing.Optional['AbstractBlob'], address: str, tcp_port: int, peer_connect_timeout: float, blob_download_timeout: float, connected_transport: asyncio.Transport = None, connection_id: int = 0, connection_manager: typing.Optional['ConnectionManager'] = None)\ @@ -242,7 +242,8 @@ async def request_blob(loop: asyncio.BaseEventLoop, blob: 'AbstractBlob', addres if not connected_transport: await asyncio.wait_for(loop.create_connection(lambda: protocol, address, tcp_port), peer_connect_timeout, loop=loop) - if blob.get_is_verified() or not blob.is_writeable(): + if blob is None or blob.get_is_verified() or not blob.is_writeable(): + # blob is None happens when we are just opening a connection # file exists but not verified means someone is writing right now, give it time, come back later return 0, connected_transport return await protocol.download_blob(blob) diff --git a/lbry/lbry/blob_exchange/downloader.py b/lbry/lbry/blob_exchange/downloader.py index 02943749b..c6fca2113 100644 --- a/lbry/lbry/blob_exchange/downloader.py +++ b/lbry/lbry/blob_exchange/downloader.py @@ -34,13 +34,14 @@ class BlobDownloader: return False return not (blob.get_is_verified() or not blob.is_writeable()) - async def request_blob_from_peer(self, blob: 'AbstractBlob', peer: 'KademliaPeer', connection_id: int = 0): + async def request_blob_from_peer(self, blob: 'AbstractBlob', peer: 'KademliaPeer', connection_id: int = 0, + just_probe: bool = False): if blob.get_is_verified(): return transport = self.connections.get(peer) start = self.loop.time() bytes_received, transport = await request_blob( - self.loop, blob, peer.address, peer.tcp_port, self.config.peer_connect_timeout, + self.loop, blob if not just_probe else None, peer.address, peer.tcp_port, self.config.peer_connect_timeout, self.config.blob_download_timeout, connected_transport=transport, connection_id=connection_id, connection_manager=self.blob_manager.connection_manager @@ -95,11 +96,17 @@ class BlobDownloader: len(batch), len(self.ignored), len(self.active_connections) ) for peer in sorted(batch, key=lambda peer: self.scores.get(peer, 0), reverse=True): + just_probe = False if not self.should_race_continue(blob): - break + if len(self.connections) >= self.config.max_connections_per_download * 10: + if peer in self.connections: + continue + just_probe = True + else: + break if peer not in self.active_connections and peer not in self.ignored: log.debug("request %s from %s:%i", blob_hash[:8], peer.address, peer.tcp_port) - t = self.loop.create_task(self.request_blob_from_peer(blob, peer, connection_id)) + t = self.loop.create_task(self.request_blob_from_peer(blob, peer, connection_id, just_probe)) self.active_connections[peer] = t await self.new_peer_or_finished() self.cleanup_active() From 2ac4883bda70e4a903825ca792416abd4761aca6 Mon Sep 17 00:00:00 2001 From: Victor Shyba Date: Wed, 24 Jul 2019 22:32:20 -0300 Subject: [PATCH 2/3] cleanup and fix prober --- lbry/lbry/blob_exchange/client.py | 1 + lbry/lbry/blob_exchange/downloader.py | 25 +++++++++++-------------- 2 files changed, 12 insertions(+), 14 deletions(-) diff --git a/lbry/lbry/blob_exchange/client.py b/lbry/lbry/blob_exchange/client.py index 78de6fa1f..37b07edbd 100644 --- a/lbry/lbry/blob_exchange/client.py +++ b/lbry/lbry/blob_exchange/client.py @@ -242,6 +242,7 @@ async def request_blob(loop: asyncio.BaseEventLoop, blob: typing.Optional['Abstr if not connected_transport: await asyncio.wait_for(loop.create_connection(lambda: protocol, address, tcp_port), peer_connect_timeout, loop=loop) + connected_transport = protocol.transport if blob is None or blob.get_is_verified() or not blob.is_writeable(): # blob is None happens when we are just opening a connection # file exists but not verified means someone is writing right now, give it time, come back later diff --git a/lbry/lbry/blob_exchange/downloader.py b/lbry/lbry/blob_exchange/downloader.py index c6fca2113..3c60e5db1 100644 --- a/lbry/lbry/blob_exchange/downloader.py +++ b/lbry/lbry/blob_exchange/downloader.py @@ -30,7 +30,8 @@ class BlobDownloader: self.is_running = asyncio.Event(loop=self.loop) def should_race_continue(self, blob: 'AbstractBlob'): - if len(self.active_connections) >= self.config.max_connections_per_download: + max_probes = self.config.max_connections_per_download * (1 if self.connections else 10) + if len(self.active_connections) >= max_probes: return False return not (blob.get_is_verified() or not blob.is_writeable()) @@ -92,22 +93,18 @@ class BlobDownloader: if batch: self.peer_queue.put_nowait(list(batch)) log.debug( - "running, %d peers, %d ignored, %d active", - len(batch), len(self.ignored), len(self.active_connections) + "running, %d peers, %d ignored, %d active, %s connections", + len(batch), len(self.ignored), len(self.active_connections), len(self.connections) ) for peer in sorted(batch, key=lambda peer: self.scores.get(peer, 0), reverse=True): - just_probe = False + if peer in self.ignored or peer in self.active_connections: + continue if not self.should_race_continue(blob): - if len(self.connections) >= self.config.max_connections_per_download * 10: - if peer in self.connections: - continue - just_probe = True - else: - break - if peer not in self.active_connections and peer not in self.ignored: - log.debug("request %s from %s:%i", blob_hash[:8], peer.address, peer.tcp_port) - t = self.loop.create_task(self.request_blob_from_peer(blob, peer, connection_id, just_probe)) - self.active_connections[peer] = t + break + log.debug("request %s from %s:%i", blob_hash[:8], peer.address, peer.tcp_port) + just_probe = len(self.connections) == 0 + t = self.loop.create_task(self.request_blob_from_peer(blob, peer, connection_id, just_probe)) + self.active_connections[peer] = t await self.new_peer_or_finished() self.cleanup_active() log.debug("downloaded %s", blob_hash[:8]) From 35e7cc685f3428c5c7b08c93e0fc7c83e19e80a7 Mon Sep 17 00:00:00 2001 From: Jack Robison Date: Wed, 24 Jul 2019 22:12:51 -0400 Subject: [PATCH 3/3] fix logging error --- lbry/lbry/blob_exchange/client.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lbry/lbry/blob_exchange/client.py b/lbry/lbry/blob_exchange/client.py index 37b07edbd..ff3516d32 100644 --- a/lbry/lbry/blob_exchange/client.py +++ b/lbry/lbry/blob_exchange/client.py @@ -187,7 +187,7 @@ class BlobExchangeClientProtocol(asyncio.Protocol): return await self._download_blob() except OSError as e: # i'm not sure how to fix this race condition - jack - log.warning("race happened downloading %s from %s:%i", blob_hash, self.peer_address, self.peer_port) + log.warning("race happened downloading %s from %s:%s", blob_hash, self.peer_address, self.peer_port) # return self._blob_bytes_received, self.transport raise except asyncio.TimeoutError: