diff --git a/lbrynet/blob_exchange/downloader.py b/lbrynet/blob_exchange/downloader.py index 1a653339e..c034e6d13 100644 --- a/lbrynet/blob_exchange/downloader.py +++ b/lbrynet/blob_exchange/downloader.py @@ -29,28 +29,22 @@ class BlobDownloader: self.ignored: typing.Set['KademliaPeer'] = set() self.scores: typing.Dict['KademliaPeer', int] = {} - def request_blob_from_peer(self, blob: 'BlobFile', peer: 'KademliaPeer'): - async def _request_blob(): - if blob.get_is_verified(): - return - try: - success, keep_connection = await request_blob( - self.loop, blob, peer.address, peer.tcp_port, self.config.peer_connect_timeout, - self.config.blob_download_timeout - ) - finally: - if peer in self.active_connections: - self.active_connections.pop(peer) - if not keep_connection and peer not in self.ignored: - self.ignored.add(peer) - log.debug("drop peer %s:%i", peer.address, peer.tcp_port) - elif keep_connection: - log.debug("keep peer %s:%i", peer.address, peer.tcp_port) - if success: - self.scores[peer] = self.scores.get(peer, 0) + 2 - else: - self.scores[peer] = self.scores.get(peer, 0) - 1 - return self.loop.create_task(_request_blob()) + async def request_blob_from_peer(self, blob: 'BlobFile', peer: 'KademliaPeer'): + if blob.get_is_verified(): + return + success, keep_connection = await request_blob( + self.loop, blob, peer.address, peer.tcp_port, self.config.peer_connect_timeout, + self.config.blob_download_timeout + ) + if not keep_connection and peer not in self.ignored: + self.ignored.add(peer) + log.debug("drop peer %s:%i", peer.address, peer.tcp_port) + elif keep_connection: + log.debug("keep peer %s:%i", peer.address, peer.tcp_port) + if success: + self.scores[peer] = self.scores.get(peer, 0) + 2 + else: + self.scores[peer] = self.scores.get(peer, 0) - 1 async def new_peer_or_finished(self, blob: 'BlobFile'): async def get_and_re_add_peers(): @@ -77,7 +71,12 @@ class BlobDownloader: break if peer not in self.active_connections and peer not in self.ignored: log.debug("request %s from %s:%i", blob_hash[:8], peer.address, peer.tcp_port) - self.active_connections[peer] = self.request_blob_from_peer(blob, peer) + t = self.loop.create_task(self.request_blob_from_peer(blob, peer)) + self.active_connections[peer] = t + t.add_done_callback( + lambda _: + None if peer not in self.active_connections else self.active_connections.pop(peer) + ) await self.new_peer_or_finished(blob) to_re_add = list(set(filter(lambda peer: peer not in self.ignored, batch))) to_re_add.sort(key=lambda peer: self.scores.get(peer, 0), reverse=True)