diff --git a/lbrynet/blob_exchange/client.py b/lbrynet/blob_exchange/client.py index 4270cbe24..b77bbce28 100644 --- a/lbrynet/blob_exchange/client.py +++ b/lbrynet/blob_exchange/client.py @@ -2,6 +2,7 @@ import asyncio import logging import typing import binascii +from lbrynet.error import InvalidBlobHashError, InvalidDataError from lbrynet.blob_exchange.serialization import BlobResponse, BlobRequest if typing.TYPE_CHECKING: from lbrynet.blob.blob_file import BlobFile @@ -104,13 +105,16 @@ class BlobExchangeClientProtocol(asyncio.Protocol): log.debug(msg) msg = f"downloaded {self.blob.blob_hash[:8]} from {self.peer_address}:{self.peer_port}" await asyncio.wait_for(self.writer.finished, self.peer_timeout, loop=self.loop) - await self.blob.finished_writing.wait() log.info(msg) + await self.blob.finished_writing.wait() return True, True except asyncio.CancelledError: return False, True except asyncio.TimeoutError: return False, False + except (InvalidBlobHashError, InvalidDataError): + log.warning("invalid blob from %s:%i", self.peer_address, self.peer_port) + return False, False finally: await self.close() diff --git a/lbrynet/blob_exchange/downloader.py b/lbrynet/blob_exchange/downloader.py index 1a653339e..ada08c471 100644 --- a/lbrynet/blob_exchange/downloader.py +++ b/lbrynet/blob_exchange/downloader.py @@ -29,28 +29,22 @@ class BlobDownloader: self.ignored: typing.Set['KademliaPeer'] = set() self.scores: typing.Dict['KademliaPeer', int] = {} - def request_blob_from_peer(self, blob: 'BlobFile', peer: 'KademliaPeer'): - async def _request_blob(): - if blob.get_is_verified(): - return - try: - success, keep_connection = await request_blob( - self.loop, blob, peer.address, peer.tcp_port, self.config.peer_connect_timeout, - self.config.blob_download_timeout - ) - finally: - if peer in self.active_connections: - self.active_connections.pop(peer) - if not keep_connection and peer not in self.ignored: - self.ignored.add(peer) - log.debug("drop peer %s:%i", peer.address, peer.tcp_port) - elif keep_connection: - log.debug("keep peer %s:%i", peer.address, peer.tcp_port) - if success: - self.scores[peer] = self.scores.get(peer, 0) + 2 - else: - self.scores[peer] = self.scores.get(peer, 0) - 1 - return self.loop.create_task(_request_blob()) + async def request_blob_from_peer(self, blob: 'BlobFile', peer: 'KademliaPeer'): + if blob.get_is_verified(): + return + success, keep_connection = await request_blob( + self.loop, blob, peer.address, peer.tcp_port, self.config.peer_connect_timeout, + self.config.blob_download_timeout + ) + if not keep_connection and peer not in self.ignored: + self.ignored.add(peer) + log.debug("drop peer %s:%i", peer.address, peer.tcp_port) + elif keep_connection: + log.debug("keep peer %s:%i", peer.address, peer.tcp_port) + if success: + self.scores[peer] = self.scores.get(peer, 0) + 2 + else: + self.scores[peer] = self.scores.get(peer, 0) - 1 async def new_peer_or_finished(self, blob: 'BlobFile'): async def get_and_re_add_peers(): @@ -77,7 +71,12 @@ class BlobDownloader: break if peer not in self.active_connections and peer not in self.ignored: log.debug("request %s from %s:%i", blob_hash[:8], peer.address, peer.tcp_port) - self.active_connections[peer] = self.request_blob_from_peer(blob, peer) + t = self.loop.create_task(self.request_blob_from_peer(blob, peer)) + self.active_connections[peer] = t + t.add_done_callback( + lambda _: + None if peer not in self.active_connections else self.active_connections.pop(peer) + ) await self.new_peer_or_finished(blob) to_re_add = list(set(filter(lambda peer: peer not in self.ignored, batch))) to_re_add.sort(key=lambda peer: self.scores.get(peer, 0), reverse=True) @@ -88,7 +87,7 @@ class BlobDownloader: if task and not task.done(): task.cancel() await blob.close() - log.info("downloaded %s", blob_hash[:8]) + log.debug("downloaded %s", blob_hash[:8]) return blob except asyncio.CancelledError: while self.active_connections: