diff --git a/lbry/lbry/blob_exchange/client.py b/lbry/lbry/blob_exchange/client.py index e73c35ace..8009a672f 100644 --- a/lbry/lbry/blob_exchange/client.py +++ b/lbry/lbry/blob_exchange/client.py @@ -1,7 +1,9 @@ import asyncio +import time import logging import typing import binascii +from typing import Optional from lbry.error import InvalidBlobHashError, InvalidDataError from lbry.blob_exchange.serialization import BlobResponse, BlobRequest from lbry.utils import cache_concurrent @@ -65,7 +67,8 @@ class BlobExchangeClientProtocol(asyncio.Protocol): self.blob.set_length(blob_response.length) elif blob_response and not blob_response.error and self.blob.blob_hash != blob_response.blob_hash: # the server started sending a blob we didn't request - log.warning("mismatch with self.blob %s", self.blob.blob_hash) + log.warning("%s started sending blob we didnt request %s instead of %s", self.peer_address, + blob_response.blob_hash, self.blob.blob_hash) return if response.responses: log.debug("got response from %s:%i <- %s", self.peer_address, self.peer_port, response.to_dict()) @@ -94,10 +97,11 @@ class BlobExchangeClientProtocol(asyncio.Protocol): if self._response_fut and not self._response_fut.done(): self._response_fut.set_exception(err) - async def _download_blob(self) -> typing.Tuple[int, typing.Optional[asyncio.Transport]]: + async def _download_blob(self) -> typing.Tuple[int, Optional['BlobExchangeClientProtocol']]: """ - :return: download success (bool), keep connection (bool) + :return: download success (bool), connected protocol (BlobExchangeClientProtocol) """ + start_time = time.perf_counter() request = BlobRequest.make_request_for_blob_hash(self.blob.blob_hash) blob_hash = self.blob.blob_hash if not self.peer_address: @@ -147,12 +151,12 @@ class BlobExchangeClientProtocol(asyncio.Protocol): return self._blob_bytes_received, self.close() msg = f"downloading {self.blob.blob_hash[:8]} from {self.peer_address}:{self.peer_port}," \ f" timeout in {self.peer_timeout}" - log.debug(msg) + log.info(msg) msg = f"downloaded {self.blob.blob_hash[:8]} from {self.peer_address}:{self.peer_port}" await asyncio.wait_for(self.writer.finished, self.peer_timeout, loop=self.loop) - log.info(msg) + log.info(msg + f" at {round((float(self._blob_bytes_received) / float(time.perf_counter() - start_time)) / 1000000.0, 2)}MB/s") # await self.blob.finished_writing.wait() not necessary, but a dangerous change. TODO: is it needed? - return self._blob_bytes_received, self.transport + return self._blob_bytes_received, self except asyncio.TimeoutError: return self._blob_bytes_received, self.close() except (InvalidBlobHashError, InvalidDataError): @@ -173,11 +177,11 @@ class BlobExchangeClientProtocol(asyncio.Protocol): self.transport = None self.buf = b'' - async def download_blob(self, blob: 'AbstractBlob') -> typing.Tuple[int, typing.Optional[asyncio.Transport]]: + async def download_blob(self, blob: 'AbstractBlob') -> typing.Tuple[int, Optional['BlobExchangeClientProtocol']]: self.closed.clear() blob_hash = blob.blob_hash if blob.get_is_verified() or not blob.is_writeable(): - return 0, self.transport + return 0, self try: self._blob_bytes_received = 0 self.blob, self.writer = blob, blob.get_blob_writer(self.peer_address, self.peer_port) @@ -218,33 +222,32 @@ class BlobExchangeClientProtocol(asyncio.Protocol): @cache_concurrent -async def request_blob(loop: asyncio.AbstractEventLoop, blob: typing.Optional['AbstractBlob'], address: str, +async def request_blob(loop: asyncio.AbstractEventLoop, blob: Optional['AbstractBlob'], address: str, tcp_port: int, peer_connect_timeout: float, blob_download_timeout: float, - connected_transport: asyncio.Transport = None, connection_id: int = 0, - connection_manager: typing.Optional['ConnectionManager'] = None)\ - -> typing.Tuple[int, typing.Optional[asyncio.Transport]]: + connected_protocol: Optional['BlobExchangeClientProtocol'] = None, + connection_id: int = 0, connection_manager: Optional['ConnectionManager'] = None)\ + -> typing.Tuple[int, Optional['BlobExchangeClientProtocol']]: """ - Returns [, ] + Returns [, ] """ - protocol = BlobExchangeClientProtocol( - loop, blob_download_timeout, connection_manager - ) - if connected_transport and not connected_transport.is_closing(): - connected_transport.set_protocol(protocol) - protocol.transport = connected_transport - log.debug("reusing connection for %s:%d", address, tcp_port) + protocol = connected_protocol + if not connected_protocol or not connected_protocol.transport or connected_protocol.transport.is_closing(): + connected_protocol = None + protocol = BlobExchangeClientProtocol( + loop, blob_download_timeout, connection_manager + ) else: - connected_transport = None + log.debug("reusing connection for %s:%d", address, tcp_port) try: - if not connected_transport: + if not connected_protocol: await asyncio.wait_for(loop.create_connection(lambda: protocol, address, tcp_port), peer_connect_timeout, loop=loop) - connected_transport = protocol.transport + connected_protocol = protocol if blob is None or blob.get_is_verified() or not blob.is_writeable(): # blob is None happens when we are just opening a connection # file exists but not verified means someone is writing right now, give it time, come back later - return 0, connected_transport - return await protocol.download_blob(blob) + return 0, connected_protocol + return await connected_protocol.download_blob(blob) except (asyncio.TimeoutError, ConnectionRefusedError, ConnectionAbortedError, OSError): return 0, None diff --git a/lbry/lbry/blob_exchange/downloader.py b/lbry/lbry/blob_exchange/downloader.py index 84e8414bc..153bb92a3 100644 --- a/lbry/lbry/blob_exchange/downloader.py +++ b/lbry/lbry/blob_exchange/downloader.py @@ -9,6 +9,7 @@ if typing.TYPE_CHECKING: from lbry.dht.peer import KademliaPeer from lbry.blob.blob_manager import BlobManager from lbry.blob.blob_file import AbstractBlob + from lbry.blob_exchange.client import BlobExchangeClientProtocol log = logging.getLogger(__name__) @@ -27,7 +28,7 @@ class BlobDownloader: self.scores: typing.Dict['KademliaPeer', int] = {} self.failures: typing.Dict['KademliaPeer', int] = {} self.connection_failures: typing.List['KademliaPeer'] = [] - self.connections: typing.Dict['KademliaPeer', asyncio.Transport] = {} + self.connections: typing.Dict['KademliaPeer', 'BlobExchangeClientProtocol'] = {} self.is_running = asyncio.Event(loop=self.loop) def should_race_continue(self, blob: 'AbstractBlob'): @@ -40,26 +41,24 @@ class BlobDownloader: just_probe: bool = False): if blob.get_is_verified(): return - transport = self.connections.get(peer) start = self.loop.time() - bytes_received, transport = await request_blob( + bytes_received, protocol = await request_blob( self.loop, blob if not just_probe else None, peer.address, peer.tcp_port, self.config.peer_connect_timeout, - self.config.blob_download_timeout, connected_transport=transport, connection_id=connection_id, - connection_manager=self.blob_manager.connection_manager - + self.config.blob_download_timeout, connected_protocol=self.connections.get(peer), + connection_id=connection_id, connection_manager=self.blob_manager.connection_manager ) - if not bytes_received and not transport and peer not in self.connection_failures: + if not bytes_received and not protocol and peer not in self.connection_failures: self.connection_failures.append(peer) - if not transport and peer not in self.ignored: + if not protocol and peer not in self.ignored: self.ignored[peer] = self.loop.time() log.debug("drop peer %s:%i", peer.address, peer.tcp_port) self.failures[peer] = self.failures.get(peer, 0) + 1 if peer in self.connections: del self.connections[peer] - elif transport: + elif protocol: log.debug("keep peer %s:%i", peer.address, peer.tcp_port) self.failures[peer] = 0 - self.connections[peer] = transport + self.connections[peer] = protocol elapsed = self.loop.time() - start self.scores[peer] = bytes_received / elapsed if bytes_received and elapsed else 1 @@ -120,8 +119,8 @@ class BlobDownloader: self.scores.clear() self.ignored.clear() self.is_running.clear() - for transport in self.connections.values(): - transport.close() + for protocol in self.connections.values(): + protocol.close() async def download_blob(loop, config: 'Config', blob_manager: 'BlobManager', node: 'Node', diff --git a/lbry/lbry/dht/node.py b/lbry/lbry/dht/node.py index d6d358db5..e09f45c10 100644 --- a/lbry/lbry/dht/node.py +++ b/lbry/lbry/dht/node.py @@ -208,8 +208,36 @@ class Node: task.cancel() async def _value_producer(self, blob_hash: str, result_queue: asyncio.Queue): + async def ping(_peer): + try: + await self.protocol.get_rpc_peer(_peer).ping() + result_queue.put_nowait([_peer]) + except asyncio.TimeoutError: + log.info("blob peer timed out udp") + pass + async for results in self.get_iterative_value_finder(binascii.unhexlify(blob_hash.encode())): - result_queue.put_nowait(results) + to_put = [] + for peer in results: + is_good = self.protocol.peer_manager.peer_is_good(peer) + if is_good: + to_put.append(peer) + elif is_good is None: + udp_port_to_try = 4444 + if 3400 > peer.tcp_port > 3332: + if not peer.udp_port: + udp_port_to_try = (peer.tcp_port - 3333) + 4444 + elif 4500 > peer.tcp_port > 4443: + if not peer.udp_port: + udp_port_to_try = peer.tcp_port + if not peer.udp_port: + log.info("guess %i for %s:%i", udp_port_to_try, peer.address, peer.tcp_port) + peer.update_udp_port(udp_port_to_try) + self.loop.create_task(ping(peer)) + else: + log.debug("skip bad peer %s:%i for %s", blob_hash) + if to_put: + result_queue.put_nowait(to_put) def accumulate_peers(self, search_queue: asyncio.Queue, peer_queue: typing.Optional[asyncio.Queue] = None) -> typing.Tuple[