Merge pull request #2401 from lbryio/check-blob-peers-pingable

UDP ping blob peers to prioritize those who can be connected to
This commit is contained in:
Jack Robison 2019-08-19 13:01:40 -04:00 committed by GitHub
commit fde8c34088
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
6 changed files with 85 additions and 53 deletions

View file

@ -1,7 +1,9 @@
import asyncio import asyncio
import time
import logging import logging
import typing import typing
import binascii import binascii
from typing import Optional
from lbry.error import InvalidBlobHashError, InvalidDataError from lbry.error import InvalidBlobHashError, InvalidDataError
from lbry.blob_exchange.serialization import BlobResponse, BlobRequest from lbry.blob_exchange.serialization import BlobResponse, BlobRequest
from lbry.utils import cache_concurrent from lbry.utils import cache_concurrent
@ -65,7 +67,8 @@ class BlobExchangeClientProtocol(asyncio.Protocol):
self.blob.set_length(blob_response.length) self.blob.set_length(blob_response.length)
elif blob_response and not blob_response.error and self.blob.blob_hash != blob_response.blob_hash: elif blob_response and not blob_response.error and self.blob.blob_hash != blob_response.blob_hash:
# the server started sending a blob we didn't request # the server started sending a blob we didn't request
log.warning("mismatch with self.blob %s", self.blob.blob_hash) log.warning("%s started sending blob we didnt request %s instead of %s", self.peer_address,
blob_response.blob_hash, self.blob.blob_hash)
return return
if response.responses: if response.responses:
log.debug("got response from %s:%i <- %s", self.peer_address, self.peer_port, response.to_dict()) log.debug("got response from %s:%i <- %s", self.peer_address, self.peer_port, response.to_dict())
@ -94,10 +97,11 @@ class BlobExchangeClientProtocol(asyncio.Protocol):
if self._response_fut and not self._response_fut.done(): if self._response_fut and not self._response_fut.done():
self._response_fut.set_exception(err) self._response_fut.set_exception(err)
async def _download_blob(self) -> typing.Tuple[int, typing.Optional[asyncio.Transport]]: async def _download_blob(self) -> typing.Tuple[int, Optional['BlobExchangeClientProtocol']]:
""" """
:return: download success (bool), keep connection (bool) :return: download success (bool), connected protocol (BlobExchangeClientProtocol)
""" """
start_time = time.perf_counter()
request = BlobRequest.make_request_for_blob_hash(self.blob.blob_hash) request = BlobRequest.make_request_for_blob_hash(self.blob.blob_hash)
blob_hash = self.blob.blob_hash blob_hash = self.blob.blob_hash
if not self.peer_address: if not self.peer_address:
@ -147,12 +151,14 @@ class BlobExchangeClientProtocol(asyncio.Protocol):
return self._blob_bytes_received, self.close() return self._blob_bytes_received, self.close()
msg = f"downloading {self.blob.blob_hash[:8]} from {self.peer_address}:{self.peer_port}," \ msg = f"downloading {self.blob.blob_hash[:8]} from {self.peer_address}:{self.peer_port}," \
f" timeout in {self.peer_timeout}" f" timeout in {self.peer_timeout}"
log.debug(msg) log.info(msg)
msg = f"downloaded {self.blob.blob_hash[:8]} from {self.peer_address}:{self.peer_port}" msg = f"downloaded {self.blob.blob_hash[:8]} from {self.peer_address}:{self.peer_port}"
await asyncio.wait_for(self.writer.finished, self.peer_timeout, loop=self.loop) await asyncio.wait_for(self.writer.finished, self.peer_timeout, loop=self.loop)
log.info(msg) log.info("%s at %fMB/s", msg,
round((float(self._blob_bytes_received) /
float(time.perf_counter() - start_time)) / 1000000.0, 2))
# await self.blob.finished_writing.wait() not necessary, but a dangerous change. TODO: is it needed? # await self.blob.finished_writing.wait() not necessary, but a dangerous change. TODO: is it needed?
return self._blob_bytes_received, self.transport return self._blob_bytes_received, self
except asyncio.TimeoutError: except asyncio.TimeoutError:
return self._blob_bytes_received, self.close() return self._blob_bytes_received, self.close()
except (InvalidBlobHashError, InvalidDataError): except (InvalidBlobHashError, InvalidDataError):
@ -173,11 +179,11 @@ class BlobExchangeClientProtocol(asyncio.Protocol):
self.transport = None self.transport = None
self.buf = b'' self.buf = b''
async def download_blob(self, blob: 'AbstractBlob') -> typing.Tuple[int, typing.Optional[asyncio.Transport]]: async def download_blob(self, blob: 'AbstractBlob') -> typing.Tuple[int, Optional['BlobExchangeClientProtocol']]:
self.closed.clear() self.closed.clear()
blob_hash = blob.blob_hash blob_hash = blob.blob_hash
if blob.get_is_verified() or not blob.is_writeable(): if blob.get_is_verified() or not blob.is_writeable():
return 0, self.transport return 0, self
try: try:
self._blob_bytes_received = 0 self._blob_bytes_received = 0
self.blob, self.writer = blob, blob.get_blob_writer(self.peer_address, self.peer_port) self.blob, self.writer = blob, blob.get_blob_writer(self.peer_address, self.peer_port)
@ -218,33 +224,32 @@ class BlobExchangeClientProtocol(asyncio.Protocol):
@cache_concurrent @cache_concurrent
async def request_blob(loop: asyncio.AbstractEventLoop, blob: typing.Optional['AbstractBlob'], address: str, async def request_blob(loop: asyncio.AbstractEventLoop, blob: Optional['AbstractBlob'], address: str,
tcp_port: int, peer_connect_timeout: float, blob_download_timeout: float, tcp_port: int, peer_connect_timeout: float, blob_download_timeout: float,
connected_transport: asyncio.Transport = None, connection_id: int = 0, connected_protocol: Optional['BlobExchangeClientProtocol'] = None,
connection_manager: typing.Optional['ConnectionManager'] = None)\ connection_id: int = 0, connection_manager: Optional['ConnectionManager'] = None)\
-> typing.Tuple[int, typing.Optional[asyncio.Transport]]: -> typing.Tuple[int, Optional['BlobExchangeClientProtocol']]:
""" """
Returns [<downloaded blob>, <keep connection>] Returns [<amount of bytes received>, <client protocol if connected>]
""" """
protocol = BlobExchangeClientProtocol( protocol = connected_protocol
loop, blob_download_timeout, connection_manager if not connected_protocol or not connected_protocol.transport or connected_protocol.transport.is_closing():
) connected_protocol = None
if connected_transport and not connected_transport.is_closing(): protocol = BlobExchangeClientProtocol(
connected_transport.set_protocol(protocol) loop, blob_download_timeout, connection_manager
protocol.transport = connected_transport )
log.debug("reusing connection for %s:%d", address, tcp_port)
else: else:
connected_transport = None log.debug("reusing connection for %s:%d", address, tcp_port)
try: try:
if not connected_transport: if not connected_protocol:
await asyncio.wait_for(loop.create_connection(lambda: protocol, address, tcp_port), await asyncio.wait_for(loop.create_connection(lambda: protocol, address, tcp_port),
peer_connect_timeout, loop=loop) peer_connect_timeout, loop=loop)
connected_transport = protocol.transport connected_protocol = protocol
if blob is None or blob.get_is_verified() or not blob.is_writeable(): if blob is None or blob.get_is_verified() or not blob.is_writeable():
# blob is None happens when we are just opening a connection # blob is None happens when we are just opening a connection
# file exists but not verified means someone is writing right now, give it time, come back later # file exists but not verified means someone is writing right now, give it time, come back later
return 0, connected_transport return 0, connected_protocol
return await protocol.download_blob(blob) return await connected_protocol.download_blob(blob)
except (asyncio.TimeoutError, ConnectionRefusedError, ConnectionAbortedError, OSError): except (asyncio.TimeoutError, ConnectionRefusedError, ConnectionAbortedError, OSError):
return 0, None return 0, None

View file

@ -9,6 +9,7 @@ if typing.TYPE_CHECKING:
from lbry.dht.peer import KademliaPeer from lbry.dht.peer import KademliaPeer
from lbry.blob.blob_manager import BlobManager from lbry.blob.blob_manager import BlobManager
from lbry.blob.blob_file import AbstractBlob from lbry.blob.blob_file import AbstractBlob
from lbry.blob_exchange.client import BlobExchangeClientProtocol
log = logging.getLogger(__name__) log = logging.getLogger(__name__)
@ -27,7 +28,7 @@ class BlobDownloader:
self.scores: typing.Dict['KademliaPeer', int] = {} self.scores: typing.Dict['KademliaPeer', int] = {}
self.failures: typing.Dict['KademliaPeer', int] = {} self.failures: typing.Dict['KademliaPeer', int] = {}
self.connection_failures: typing.List['KademliaPeer'] = [] self.connection_failures: typing.List['KademliaPeer'] = []
self.connections: typing.Dict['KademliaPeer', asyncio.Transport] = {} self.connections: typing.Dict['KademliaPeer', 'BlobExchangeClientProtocol'] = {}
self.is_running = asyncio.Event(loop=self.loop) self.is_running = asyncio.Event(loop=self.loop)
def should_race_continue(self, blob: 'AbstractBlob'): def should_race_continue(self, blob: 'AbstractBlob'):
@ -40,26 +41,24 @@ class BlobDownloader:
just_probe: bool = False): just_probe: bool = False):
if blob.get_is_verified(): if blob.get_is_verified():
return return
transport = self.connections.get(peer)
start = self.loop.time() start = self.loop.time()
bytes_received, transport = await request_blob( bytes_received, protocol = await request_blob(
self.loop, blob if not just_probe else None, peer.address, peer.tcp_port, self.config.peer_connect_timeout, self.loop, blob if not just_probe else None, peer.address, peer.tcp_port, self.config.peer_connect_timeout,
self.config.blob_download_timeout, connected_transport=transport, connection_id=connection_id, self.config.blob_download_timeout, connected_protocol=self.connections.get(peer),
connection_manager=self.blob_manager.connection_manager connection_id=connection_id, connection_manager=self.blob_manager.connection_manager
) )
if not bytes_received and not transport and peer not in self.connection_failures: if not bytes_received and not protocol and peer not in self.connection_failures:
self.connection_failures.append(peer) self.connection_failures.append(peer)
if not transport and peer not in self.ignored: if not protocol and peer not in self.ignored:
self.ignored[peer] = self.loop.time() self.ignored[peer] = self.loop.time()
log.debug("drop peer %s:%i", peer.address, peer.tcp_port) log.debug("drop peer %s:%i", peer.address, peer.tcp_port)
self.failures[peer] = self.failures.get(peer, 0) + 1 self.failures[peer] = self.failures.get(peer, 0) + 1
if peer in self.connections: if peer in self.connections:
del self.connections[peer] del self.connections[peer]
elif transport: elif protocol:
log.debug("keep peer %s:%i", peer.address, peer.tcp_port) log.debug("keep peer %s:%i", peer.address, peer.tcp_port)
self.failures[peer] = 0 self.failures[peer] = 0
self.connections[peer] = transport self.connections[peer] = protocol
elapsed = self.loop.time() - start elapsed = self.loop.time() - start
self.scores[peer] = bytes_received / elapsed if bytes_received and elapsed else 1 self.scores[peer] = bytes_received / elapsed if bytes_received and elapsed else 1
@ -120,8 +119,8 @@ class BlobDownloader:
self.scores.clear() self.scores.clear()
self.ignored.clear() self.ignored.clear()
self.is_running.clear() self.is_running.clear()
for transport in self.connections.values(): for protocol in self.connections.values():
transport.close() protocol.close()
async def download_blob(loop, config: 'Config', blob_manager: 'BlobManager', node: 'Node', async def download_blob(loop, config: 'Config', blob_manager: 'BlobManager', node: 'Node',

View file

@ -102,7 +102,7 @@ class BlobServerProtocol(asyncio.Protocol):
try: try:
sent = await asyncio.wait_for(blob.sendfile(self), self.transfer_timeout, loop=self.loop) sent = await asyncio.wait_for(blob.sendfile(self), self.transfer_timeout, loop=self.loop)
self.blob_manager.connection_manager.sent_data(self.peer_address_and_port, sent) self.blob_manager.connection_manager.sent_data(self.peer_address_and_port, sent)
log.debug("sent %s (%i bytes) to %s:%i", bh, sent, peer_address, peer_port) log.info("sent %s (%i bytes) to %s:%i", bh, sent, peer_address, peer_port)
except (ConnectionResetError, BrokenPipeError, RuntimeError, OSError, asyncio.TimeoutError) as err: except (ConnectionResetError, BrokenPipeError, RuntimeError, OSError, asyncio.TimeoutError) as err:
if isinstance(err, asyncio.TimeoutError): if isinstance(err, asyncio.TimeoutError):
log.debug("timed out sending blob %s to %s", bh, peer_address) log.debug("timed out sending blob %s to %s", bh, peer_address)

View file

@ -208,8 +208,36 @@ class Node:
task.cancel() task.cancel()
async def _value_producer(self, blob_hash: str, result_queue: asyncio.Queue): async def _value_producer(self, blob_hash: str, result_queue: asyncio.Queue):
async def ping(_peer):
try:
await self.protocol.get_rpc_peer(_peer).ping()
result_queue.put_nowait([_peer])
except asyncio.TimeoutError:
pass
async for results in self.get_iterative_value_finder(binascii.unhexlify(blob_hash.encode())): async for results in self.get_iterative_value_finder(binascii.unhexlify(blob_hash.encode())):
result_queue.put_nowait(results) to_put = []
for peer in results:
if peer.address == self.protocol.external_ip and self.protocol.peer_port == peer.tcp_port:
continue
is_good = self.protocol.peer_manager.peer_is_good(peer)
if is_good:
to_put.append(peer)
elif is_good is None:
udp_port_to_try = 4444
if 3400 > peer.tcp_port > 3332:
if not peer.udp_port:
udp_port_to_try = (peer.tcp_port - 3333) + 4444
elif 4500 > peer.tcp_port > 4443:
if not peer.udp_port:
udp_port_to_try = peer.tcp_port
if not peer.udp_port:
peer.update_udp_port(udp_port_to_try)
self.loop.create_task(ping(peer))
else:
log.debug("skip bad peer %s:%i for %s", peer.address, peer.tcp_port, blob_hash)
if to_put:
result_queue.put_nowait(to_put)
def accumulate_peers(self, search_queue: asyncio.Queue, def accumulate_peers(self, search_queue: asyncio.Queue,
peer_queue: typing.Optional[asyncio.Queue] = None) -> typing.Tuple[ peer_queue: typing.Optional[asyncio.Queue] = None) -> typing.Tuple[

View file

@ -3169,9 +3169,10 @@ class Daemon(metaclass=JSONRPCServerType):
else: else:
search_bottom_out_limit = 4 search_bottom_out_limit = 4
peers = [] peers = []
async for new_peers in self.dht_node.get_iterative_value_finder(unhexlify(blob_hash.encode()), max_results=1, peer_q = asyncio.Queue(loop=self.component_manager.loop)
bottom_out_limit=search_bottom_out_limit): await self.dht_node._value_producer(blob_hash, peer_q)
peers.extend(new_peers) while not peer_q.empty():
peers.extend(peer_q.get_nowait())
results = [ results = [
{ {
"node_id": hexlify(peer.node_id).decode(), "node_id": hexlify(peer.node_id).decode(),

View file

@ -235,10 +235,10 @@ class TestBlobExchange(BlobExchangeTestBase):
client_blob = self.client_blob_manager.get_blob(blob_hash) client_blob = self.client_blob_manager.get_blob(blob_hash)
# download the blob # download the blob
downloaded, transport = await request_blob(self.loop, client_blob, self.server_from_client.address, downloaded, protocol = await request_blob(self.loop, client_blob, self.server_from_client.address,
self.server_from_client.tcp_port, 2, 3) self.server_from_client.tcp_port, 2, 3)
self.assertIsNotNone(transport) self.assertIsNotNone(protocol)
self.assertFalse(transport.is_closing()) self.assertFalse(protocol.transport.is_closing())
await client_blob.verified.wait() await client_blob.verified.wait()
self.assertTrue(client_blob.get_is_verified()) self.assertTrue(client_blob.get_is_verified())
self.assertTrue(downloaded) self.assertTrue(downloaded)
@ -248,11 +248,11 @@ class TestBlobExchange(BlobExchangeTestBase):
await asyncio.sleep(0.5, loop=self.loop) await asyncio.sleep(0.5, loop=self.loop)
# download the blob again # download the blob again
downloaded, transport2 = await request_blob(self.loop, client_blob, self.server_from_client.address, downloaded, protocol2 = await request_blob(self.loop, client_blob, self.server_from_client.address,
self.server_from_client.tcp_port, 2, 3, self.server_from_client.tcp_port, 2, 3,
connected_transport=transport) connected_protocol=protocol)
self.assertTrue(transport is transport2) self.assertTrue(protocol is protocol2)
self.assertFalse(transport.is_closing()) self.assertFalse(protocol.transport.is_closing())
await client_blob.verified.wait() await client_blob.verified.wait()
self.assertTrue(client_blob.get_is_verified()) self.assertTrue(client_blob.get_is_verified())
self.assertTrue(downloaded) self.assertTrue(downloaded)
@ -260,11 +260,10 @@ class TestBlobExchange(BlobExchangeTestBase):
# check that the connection times out from the server side # check that the connection times out from the server side
await asyncio.sleep(0.9, loop=self.loop) await asyncio.sleep(0.9, loop=self.loop)
self.assertFalse(transport.is_closing()) self.assertFalse(protocol.transport.is_closing())
self.assertIsNotNone(transport._sock) self.assertIsNotNone(protocol.transport._sock)
await asyncio.sleep(0.1, loop=self.loop) await asyncio.sleep(0.1, loop=self.loop)
self.assertIsNone(transport._sock) self.assertIsNone(protocol.transport)
self.assertTrue(transport.is_closing())
def test_max_request_size(self): def test_max_request_size(self):
protocol = BlobServerProtocol(self.loop, self.server_blob_manager, 'bQEaw42GXsgCAGio1nxFncJSyRmnztSCjP') protocol = BlobServerProtocol(self.loop, self.server_blob_manager, 'bQEaw42GXsgCAGio1nxFncJSyRmnztSCjP')