diff --git a/lbry/lbry/blob_exchange/client.py b/lbry/lbry/blob_exchange/client.py index e73c35ace..c4c8bde30 100644 --- a/lbry/lbry/blob_exchange/client.py +++ b/lbry/lbry/blob_exchange/client.py @@ -1,7 +1,9 @@ import asyncio +import time import logging import typing import binascii +from typing import Optional from lbry.error import InvalidBlobHashError, InvalidDataError from lbry.blob_exchange.serialization import BlobResponse, BlobRequest from lbry.utils import cache_concurrent @@ -65,7 +67,8 @@ class BlobExchangeClientProtocol(asyncio.Protocol): self.blob.set_length(blob_response.length) elif blob_response and not blob_response.error and self.blob.blob_hash != blob_response.blob_hash: # the server started sending a blob we didn't request - log.warning("mismatch with self.blob %s", self.blob.blob_hash) + log.warning("%s started sending blob we didnt request %s instead of %s", self.peer_address, + blob_response.blob_hash, self.blob.blob_hash) return if response.responses: log.debug("got response from %s:%i <- %s", self.peer_address, self.peer_port, response.to_dict()) @@ -94,10 +97,11 @@ class BlobExchangeClientProtocol(asyncio.Protocol): if self._response_fut and not self._response_fut.done(): self._response_fut.set_exception(err) - async def _download_blob(self) -> typing.Tuple[int, typing.Optional[asyncio.Transport]]: + async def _download_blob(self) -> typing.Tuple[int, Optional['BlobExchangeClientProtocol']]: """ - :return: download success (bool), keep connection (bool) + :return: download success (bool), connected protocol (BlobExchangeClientProtocol) """ + start_time = time.perf_counter() request = BlobRequest.make_request_for_blob_hash(self.blob.blob_hash) blob_hash = self.blob.blob_hash if not self.peer_address: @@ -147,12 +151,14 @@ class BlobExchangeClientProtocol(asyncio.Protocol): return self._blob_bytes_received, self.close() msg = f"downloading {self.blob.blob_hash[:8]} from {self.peer_address}:{self.peer_port}," \ f" timeout in {self.peer_timeout}" - log.debug(msg) + log.info(msg) msg = f"downloaded {self.blob.blob_hash[:8]} from {self.peer_address}:{self.peer_port}" await asyncio.wait_for(self.writer.finished, self.peer_timeout, loop=self.loop) - log.info(msg) + log.info("%s at %fMB/s", msg, + round((float(self._blob_bytes_received) / + float(time.perf_counter() - start_time)) / 1000000.0, 2)) # await self.blob.finished_writing.wait() not necessary, but a dangerous change. TODO: is it needed? - return self._blob_bytes_received, self.transport + return self._blob_bytes_received, self except asyncio.TimeoutError: return self._blob_bytes_received, self.close() except (InvalidBlobHashError, InvalidDataError): @@ -173,11 +179,11 @@ class BlobExchangeClientProtocol(asyncio.Protocol): self.transport = None self.buf = b'' - async def download_blob(self, blob: 'AbstractBlob') -> typing.Tuple[int, typing.Optional[asyncio.Transport]]: + async def download_blob(self, blob: 'AbstractBlob') -> typing.Tuple[int, Optional['BlobExchangeClientProtocol']]: self.closed.clear() blob_hash = blob.blob_hash if blob.get_is_verified() or not blob.is_writeable(): - return 0, self.transport + return 0, self try: self._blob_bytes_received = 0 self.blob, self.writer = blob, blob.get_blob_writer(self.peer_address, self.peer_port) @@ -218,33 +224,32 @@ class BlobExchangeClientProtocol(asyncio.Protocol): @cache_concurrent -async def request_blob(loop: asyncio.AbstractEventLoop, blob: typing.Optional['AbstractBlob'], address: str, +async def request_blob(loop: asyncio.AbstractEventLoop, blob: Optional['AbstractBlob'], address: str, tcp_port: int, peer_connect_timeout: float, blob_download_timeout: float, - connected_transport: asyncio.Transport = None, connection_id: int = 0, - connection_manager: typing.Optional['ConnectionManager'] = None)\ - -> typing.Tuple[int, typing.Optional[asyncio.Transport]]: + connected_protocol: Optional['BlobExchangeClientProtocol'] = None, + connection_id: int = 0, connection_manager: Optional['ConnectionManager'] = None)\ + -> typing.Tuple[int, Optional['BlobExchangeClientProtocol']]: """ - Returns [, ] + Returns [, ] """ - protocol = BlobExchangeClientProtocol( - loop, blob_download_timeout, connection_manager - ) - if connected_transport and not connected_transport.is_closing(): - connected_transport.set_protocol(protocol) - protocol.transport = connected_transport - log.debug("reusing connection for %s:%d", address, tcp_port) + protocol = connected_protocol + if not connected_protocol or not connected_protocol.transport or connected_protocol.transport.is_closing(): + connected_protocol = None + protocol = BlobExchangeClientProtocol( + loop, blob_download_timeout, connection_manager + ) else: - connected_transport = None + log.debug("reusing connection for %s:%d", address, tcp_port) try: - if not connected_transport: + if not connected_protocol: await asyncio.wait_for(loop.create_connection(lambda: protocol, address, tcp_port), peer_connect_timeout, loop=loop) - connected_transport = protocol.transport + connected_protocol = protocol if blob is None or blob.get_is_verified() or not blob.is_writeable(): # blob is None happens when we are just opening a connection # file exists but not verified means someone is writing right now, give it time, come back later - return 0, connected_transport - return await protocol.download_blob(blob) + return 0, connected_protocol + return await connected_protocol.download_blob(blob) except (asyncio.TimeoutError, ConnectionRefusedError, ConnectionAbortedError, OSError): return 0, None diff --git a/lbry/lbry/blob_exchange/downloader.py b/lbry/lbry/blob_exchange/downloader.py index 84e8414bc..153bb92a3 100644 --- a/lbry/lbry/blob_exchange/downloader.py +++ b/lbry/lbry/blob_exchange/downloader.py @@ -9,6 +9,7 @@ if typing.TYPE_CHECKING: from lbry.dht.peer import KademliaPeer from lbry.blob.blob_manager import BlobManager from lbry.blob.blob_file import AbstractBlob + from lbry.blob_exchange.client import BlobExchangeClientProtocol log = logging.getLogger(__name__) @@ -27,7 +28,7 @@ class BlobDownloader: self.scores: typing.Dict['KademliaPeer', int] = {} self.failures: typing.Dict['KademliaPeer', int] = {} self.connection_failures: typing.List['KademliaPeer'] = [] - self.connections: typing.Dict['KademliaPeer', asyncio.Transport] = {} + self.connections: typing.Dict['KademliaPeer', 'BlobExchangeClientProtocol'] = {} self.is_running = asyncio.Event(loop=self.loop) def should_race_continue(self, blob: 'AbstractBlob'): @@ -40,26 +41,24 @@ class BlobDownloader: just_probe: bool = False): if blob.get_is_verified(): return - transport = self.connections.get(peer) start = self.loop.time() - bytes_received, transport = await request_blob( + bytes_received, protocol = await request_blob( self.loop, blob if not just_probe else None, peer.address, peer.tcp_port, self.config.peer_connect_timeout, - self.config.blob_download_timeout, connected_transport=transport, connection_id=connection_id, - connection_manager=self.blob_manager.connection_manager - + self.config.blob_download_timeout, connected_protocol=self.connections.get(peer), + connection_id=connection_id, connection_manager=self.blob_manager.connection_manager ) - if not bytes_received and not transport and peer not in self.connection_failures: + if not bytes_received and not protocol and peer not in self.connection_failures: self.connection_failures.append(peer) - if not transport and peer not in self.ignored: + if not protocol and peer not in self.ignored: self.ignored[peer] = self.loop.time() log.debug("drop peer %s:%i", peer.address, peer.tcp_port) self.failures[peer] = self.failures.get(peer, 0) + 1 if peer in self.connections: del self.connections[peer] - elif transport: + elif protocol: log.debug("keep peer %s:%i", peer.address, peer.tcp_port) self.failures[peer] = 0 - self.connections[peer] = transport + self.connections[peer] = protocol elapsed = self.loop.time() - start self.scores[peer] = bytes_received / elapsed if bytes_received and elapsed else 1 @@ -120,8 +119,8 @@ class BlobDownloader: self.scores.clear() self.ignored.clear() self.is_running.clear() - for transport in self.connections.values(): - transport.close() + for protocol in self.connections.values(): + protocol.close() async def download_blob(loop, config: 'Config', blob_manager: 'BlobManager', node: 'Node', diff --git a/lbry/lbry/blob_exchange/server.py b/lbry/lbry/blob_exchange/server.py index 93867e2e7..eeabe538e 100644 --- a/lbry/lbry/blob_exchange/server.py +++ b/lbry/lbry/blob_exchange/server.py @@ -102,7 +102,7 @@ class BlobServerProtocol(asyncio.Protocol): try: sent = await asyncio.wait_for(blob.sendfile(self), self.transfer_timeout, loop=self.loop) self.blob_manager.connection_manager.sent_data(self.peer_address_and_port, sent) - log.debug("sent %s (%i bytes) to %s:%i", bh, sent, peer_address, peer_port) + log.info("sent %s (%i bytes) to %s:%i", bh, sent, peer_address, peer_port) except (ConnectionResetError, BrokenPipeError, RuntimeError, OSError, asyncio.TimeoutError) as err: if isinstance(err, asyncio.TimeoutError): log.debug("timed out sending blob %s to %s", bh, peer_address) diff --git a/lbry/lbry/dht/node.py b/lbry/lbry/dht/node.py index d6d358db5..625db46b5 100644 --- a/lbry/lbry/dht/node.py +++ b/lbry/lbry/dht/node.py @@ -208,8 +208,36 @@ class Node: task.cancel() async def _value_producer(self, blob_hash: str, result_queue: asyncio.Queue): + async def ping(_peer): + try: + await self.protocol.get_rpc_peer(_peer).ping() + result_queue.put_nowait([_peer]) + except asyncio.TimeoutError: + pass + async for results in self.get_iterative_value_finder(binascii.unhexlify(blob_hash.encode())): - result_queue.put_nowait(results) + to_put = [] + for peer in results: + if peer.address == self.protocol.external_ip and self.protocol.peer_port == peer.tcp_port: + continue + is_good = self.protocol.peer_manager.peer_is_good(peer) + if is_good: + to_put.append(peer) + elif is_good is None: + udp_port_to_try = 4444 + if 3400 > peer.tcp_port > 3332: + if not peer.udp_port: + udp_port_to_try = (peer.tcp_port - 3333) + 4444 + elif 4500 > peer.tcp_port > 4443: + if not peer.udp_port: + udp_port_to_try = peer.tcp_port + if not peer.udp_port: + peer.update_udp_port(udp_port_to_try) + self.loop.create_task(ping(peer)) + else: + log.debug("skip bad peer %s:%i for %s", peer.address, peer.tcp_port, blob_hash) + if to_put: + result_queue.put_nowait(to_put) def accumulate_peers(self, search_queue: asyncio.Queue, peer_queue: typing.Optional[asyncio.Queue] = None) -> typing.Tuple[ diff --git a/lbry/lbry/extras/daemon/Daemon.py b/lbry/lbry/extras/daemon/Daemon.py index 179bd0be2..5c12ddebf 100644 --- a/lbry/lbry/extras/daemon/Daemon.py +++ b/lbry/lbry/extras/daemon/Daemon.py @@ -3169,9 +3169,10 @@ class Daemon(metaclass=JSONRPCServerType): else: search_bottom_out_limit = 4 peers = [] - async for new_peers in self.dht_node.get_iterative_value_finder(unhexlify(blob_hash.encode()), max_results=1, - bottom_out_limit=search_bottom_out_limit): - peers.extend(new_peers) + peer_q = asyncio.Queue(loop=self.component_manager.loop) + await self.dht_node._value_producer(blob_hash, peer_q) + while not peer_q.empty(): + peers.extend(peer_q.get_nowait()) results = [ { "node_id": hexlify(peer.node_id).decode(), diff --git a/lbry/tests/unit/blob_exchange/test_transfer_blob.py b/lbry/tests/unit/blob_exchange/test_transfer_blob.py index ae9764939..5c9a0c587 100644 --- a/lbry/tests/unit/blob_exchange/test_transfer_blob.py +++ b/lbry/tests/unit/blob_exchange/test_transfer_blob.py @@ -235,10 +235,10 @@ class TestBlobExchange(BlobExchangeTestBase): client_blob = self.client_blob_manager.get_blob(blob_hash) # download the blob - downloaded, transport = await request_blob(self.loop, client_blob, self.server_from_client.address, + downloaded, protocol = await request_blob(self.loop, client_blob, self.server_from_client.address, self.server_from_client.tcp_port, 2, 3) - self.assertIsNotNone(transport) - self.assertFalse(transport.is_closing()) + self.assertIsNotNone(protocol) + self.assertFalse(protocol.transport.is_closing()) await client_blob.verified.wait() self.assertTrue(client_blob.get_is_verified()) self.assertTrue(downloaded) @@ -248,11 +248,11 @@ class TestBlobExchange(BlobExchangeTestBase): await asyncio.sleep(0.5, loop=self.loop) # download the blob again - downloaded, transport2 = await request_blob(self.loop, client_blob, self.server_from_client.address, + downloaded, protocol2 = await request_blob(self.loop, client_blob, self.server_from_client.address, self.server_from_client.tcp_port, 2, 3, - connected_transport=transport) - self.assertTrue(transport is transport2) - self.assertFalse(transport.is_closing()) + connected_protocol=protocol) + self.assertTrue(protocol is protocol2) + self.assertFalse(protocol.transport.is_closing()) await client_blob.verified.wait() self.assertTrue(client_blob.get_is_verified()) self.assertTrue(downloaded) @@ -260,11 +260,10 @@ class TestBlobExchange(BlobExchangeTestBase): # check that the connection times out from the server side await asyncio.sleep(0.9, loop=self.loop) - self.assertFalse(transport.is_closing()) - self.assertIsNotNone(transport._sock) + self.assertFalse(protocol.transport.is_closing()) + self.assertIsNotNone(protocol.transport._sock) await asyncio.sleep(0.1, loop=self.loop) - self.assertIsNone(transport._sock) - self.assertTrue(transport.is_closing()) + self.assertIsNone(protocol.transport) def test_max_request_size(self): protocol = BlobServerProtocol(self.loop, self.server_blob_manager, 'bQEaw42GXsgCAGio1nxFncJSyRmnztSCjP')