From 669f3394c7c0aa3629dddd86cf6228a7a0e787fa Mon Sep 17 00:00:00 2001 From: Jack Robison Date: Tue, 26 Nov 2019 14:53:14 -0500 Subject: [PATCH] fix requesting same blob over and over if only peer(s) say they don't have it --- lbry/lbry/blob_exchange/downloader.py | 28 ++++++++++++++++++--------- 1 file changed, 19 insertions(+), 9 deletions(-) diff --git a/lbry/lbry/blob_exchange/downloader.py b/lbry/lbry/blob_exchange/downloader.py index 55b9a305c..8584bb464 100644 --- a/lbry/lbry/blob_exchange/downloader.py +++ b/lbry/lbry/blob_exchange/downloader.py @@ -27,7 +27,7 @@ class BlobDownloader: self.ignored: typing.Dict['KademliaPeer', int] = {} self.scores: typing.Dict['KademliaPeer', int] = {} self.failures: typing.Dict['KademliaPeer', int] = {} - self.connection_failures: typing.List['KademliaPeer'] = [] + self.connection_failures: typing.Set['KademliaPeer'] = set() self.connections: typing.Dict['KademliaPeer', 'BlobExchangeClientProtocol'] = {} self.is_running = asyncio.Event(loop=self.loop) @@ -48,7 +48,7 @@ class BlobDownloader: connection_id=connection_id, connection_manager=self.blob_manager.connection_manager ) if not bytes_received and not protocol and peer not in self.connection_failures: - self.connection_failures.append(peer) + self.connection_failures.add(peer) if not protocol and peer not in self.ignored: self.ignored[peer] = self.loop.time() log.debug("drop peer %s:%i", peer.address, peer.tcp_port) @@ -87,32 +87,42 @@ class BlobDownloader: if blob.get_is_verified(): return blob self.is_running.set() + tried_for_this_blob: typing.Set['KademliaPeer'] = set() try: while not blob.get_is_verified() and self.is_running.is_set(): - batch: typing.Set['KademliaPeer'] = set() + batch: typing.Set['KademliaPeer'] = set(self.connections.keys()) while not self.peer_queue.empty(): batch.update(self.peer_queue.get_nowait()) - if batch: - self.peer_queue.put_nowait(list(batch)) log.debug( - "running, %d peers, %d ignored, %d active, %s connections", + "%s running, %d peers, %d ignored, %d active, %s connections", blob_hash[:6], len(batch), len(self.ignored), len(self.active_connections), len(self.connections) ) + re_add: typing.Set['KademliaPeer'] = set() for peer in sorted(batch, key=lambda peer: self.scores.get(peer, 0), reverse=True): - if peer in self.ignored or peer in self.active_connections: + if peer in self.ignored: + continue + if peer in tried_for_this_blob: + continue + if peer in self.active_connections: + if peer not in re_add: + re_add.add(peer) continue if not self.should_race_continue(blob): break log.debug("request %s from %s:%i", blob_hash[:8], peer.address, peer.tcp_port) - just_probe = len(self.connections) == 0 - t = self.loop.create_task(self.request_blob_from_peer(blob, peer, connection_id, just_probe)) + t = self.loop.create_task(self.request_blob_from_peer(blob, peer, connection_id)) self.active_connections[peer] = t + tried_for_this_blob.add(peer) + if not re_add: + self.peer_queue.put_nowait(list(batch)) await self.new_peer_or_finished() self.cleanup_active() log.debug("downloaded %s", blob_hash[:8]) return blob finally: blob.close() + if self.loop.is_running(): + self.loop.call_soon(self.cleanup_active) def close(self): self.connection_failures.clear()