From 377442d9f08ddd988bf56bde8388978ca16afd6f Mon Sep 17 00:00:00 2001 From: Jack Robison Date: Mon, 4 Feb 2019 16:03:08 -0500 Subject: [PATCH 1/3] fix request_blob cancellation --- lbrynet/blob_exchange/downloader.py | 45 ++++++++++++++--------------- 1 file changed, 22 insertions(+), 23 deletions(-) diff --git a/lbrynet/blob_exchange/downloader.py b/lbrynet/blob_exchange/downloader.py index 1a653339e..c034e6d13 100644 --- a/lbrynet/blob_exchange/downloader.py +++ b/lbrynet/blob_exchange/downloader.py @@ -29,28 +29,22 @@ class BlobDownloader: self.ignored: typing.Set['KademliaPeer'] = set() self.scores: typing.Dict['KademliaPeer', int] = {} - def request_blob_from_peer(self, blob: 'BlobFile', peer: 'KademliaPeer'): - async def _request_blob(): - if blob.get_is_verified(): - return - try: - success, keep_connection = await request_blob( - self.loop, blob, peer.address, peer.tcp_port, self.config.peer_connect_timeout, - self.config.blob_download_timeout - ) - finally: - if peer in self.active_connections: - self.active_connections.pop(peer) - if not keep_connection and peer not in self.ignored: - self.ignored.add(peer) - log.debug("drop peer %s:%i", peer.address, peer.tcp_port) - elif keep_connection: - log.debug("keep peer %s:%i", peer.address, peer.tcp_port) - if success: - self.scores[peer] = self.scores.get(peer, 0) + 2 - else: - self.scores[peer] = self.scores.get(peer, 0) - 1 - return self.loop.create_task(_request_blob()) + async def request_blob_from_peer(self, blob: 'BlobFile', peer: 'KademliaPeer'): + if blob.get_is_verified(): + return + success, keep_connection = await request_blob( + self.loop, blob, peer.address, peer.tcp_port, self.config.peer_connect_timeout, + self.config.blob_download_timeout + ) + if not keep_connection and peer not in self.ignored: + self.ignored.add(peer) + log.debug("drop peer %s:%i", peer.address, peer.tcp_port) + elif keep_connection: + log.debug("keep peer %s:%i", peer.address, peer.tcp_port) + if success: + self.scores[peer] = self.scores.get(peer, 0) + 2 + else: + self.scores[peer] = self.scores.get(peer, 0) - 1 async def new_peer_or_finished(self, blob: 'BlobFile'): async def get_and_re_add_peers(): @@ -77,7 +71,12 @@ class BlobDownloader: break if peer not in self.active_connections and peer not in self.ignored: log.debug("request %s from %s:%i", blob_hash[:8], peer.address, peer.tcp_port) - self.active_connections[peer] = self.request_blob_from_peer(blob, peer) + t = self.loop.create_task(self.request_blob_from_peer(blob, peer)) + self.active_connections[peer] = t + t.add_done_callback( + lambda _: + None if peer not in self.active_connections else self.active_connections.pop(peer) + ) await self.new_peer_or_finished(blob) to_re_add = list(set(filter(lambda peer: peer not in self.ignored, batch))) to_re_add.sort(key=lambda peer: self.scores.get(peer, 0), reverse=True) From c85991704d26b17bd7625ac48e273a330616469e Mon Sep 17 00:00:00 2001 From: Jack Robison Date: Mon, 4 Feb 2019 16:03:22 -0500 Subject: [PATCH 2/3] catch invalid data error --- lbrynet/blob_exchange/client.py | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/lbrynet/blob_exchange/client.py b/lbrynet/blob_exchange/client.py index 4270cbe24..05e0d9d9d 100644 --- a/lbrynet/blob_exchange/client.py +++ b/lbrynet/blob_exchange/client.py @@ -2,6 +2,7 @@ import asyncio import logging import typing import binascii +from lbrynet.error import InvalidBlobHashError, InvalidDataError from lbrynet.blob_exchange.serialization import BlobResponse, BlobRequest if typing.TYPE_CHECKING: from lbrynet.blob.blob_file import BlobFile @@ -111,6 +112,9 @@ class BlobExchangeClientProtocol(asyncio.Protocol): return False, True except asyncio.TimeoutError: return False, False + except (InvalidBlobHashError, InvalidDataError): + log.warning("invalid blob from %s:%i", self.peer_address, self.peer_port) + return False, False finally: await self.close() From 8883587cc7fe64b2e88162c79da60ace7c452179 Mon Sep 17 00:00:00 2001 From: Jack Robison Date: Mon, 4 Feb 2019 16:09:50 -0500 Subject: [PATCH 3/3] logging --- lbrynet/blob_exchange/client.py | 2 +- lbrynet/blob_exchange/downloader.py | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/lbrynet/blob_exchange/client.py b/lbrynet/blob_exchange/client.py index 05e0d9d9d..b77bbce28 100644 --- a/lbrynet/blob_exchange/client.py +++ b/lbrynet/blob_exchange/client.py @@ -105,8 +105,8 @@ class BlobExchangeClientProtocol(asyncio.Protocol): log.debug(msg) msg = f"downloaded {self.blob.blob_hash[:8]} from {self.peer_address}:{self.peer_port}" await asyncio.wait_for(self.writer.finished, self.peer_timeout, loop=self.loop) - await self.blob.finished_writing.wait() log.info(msg) + await self.blob.finished_writing.wait() return True, True except asyncio.CancelledError: return False, True diff --git a/lbrynet/blob_exchange/downloader.py b/lbrynet/blob_exchange/downloader.py index c034e6d13..ada08c471 100644 --- a/lbrynet/blob_exchange/downloader.py +++ b/lbrynet/blob_exchange/downloader.py @@ -87,7 +87,7 @@ class BlobDownloader: if task and not task.done(): task.cancel() await blob.close() - log.info("downloaded %s", blob_hash[:8]) + log.debug("downloaded %s", blob_hash[:8]) return blob except asyncio.CancelledError: while self.active_connections: