reuse blob protocol object instead of transport

Jack Robison 2019-08-16 15:52:02 -04:00
parent 819a551b77
commit 1d9705fb17
3 changed files with 68 additions and 38 deletions
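The change, in short: callers that wanted to reuse a connection used to hold on to the bare asyncio.Transport returned by request_blob(); after this commit they hold the BlobExchangeClientProtocol itself, so the peer address, timeout, and buffer state travel with the cached connection. A minimal sketch of the reuse check, assuming only what the diff below shows and that the lbry package is importable (the connections dict and the peer key are illustrative stand-ins for the downloader's bookkeeping):

    import typing
    from lbry.blob_exchange.client import BlobExchangeClientProtocol

    # peer key -> cached client protocol, mirroring BlobDownloader.connections
    connections: typing.Dict[str, BlobExchangeClientProtocol] = {}

    def reusable(peer_key: str) -> typing.Optional[BlobExchangeClientProtocol]:
        protocol = connections.get(peer_key)
        # a cached protocol is only worth handing back while its transport is open,
        # the same check request_blob() makes before dialing a new connection
        if protocol is not None and protocol.transport and not protocol.transport.is_closing():
            return protocol
        connections.pop(peer_key, None)
        return None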


@@ -1,7 +1,9 @@
import asyncio
import time
import logging
import typing
import binascii
from typing import Optional
from lbry.error import InvalidBlobHashError, InvalidDataError
from lbry.blob_exchange.serialization import BlobResponse, BlobRequest
from lbry.utils import cache_concurrent
@@ -65,7 +67,8 @@ class BlobExchangeClientProtocol(asyncio.Protocol):
self.blob.set_length(blob_response.length)
elif blob_response and not blob_response.error and self.blob.blob_hash != blob_response.blob_hash:
# the server started sending a blob we didn't request
log.warning("mismatch with self.blob %s", self.blob.blob_hash)
log.warning("%s started sending blob we didnt request %s instead of %s", self.peer_address,
blob_response.blob_hash, self.blob.blob_hash)
return
if response.responses:
log.debug("got response from %s:%i <- %s", self.peer_address, self.peer_port, response.to_dict())
@@ -94,10 +97,11 @@ class BlobExchangeClientProtocol(asyncio.Protocol):
if self._response_fut and not self._response_fut.done():
self._response_fut.set_exception(err)
async def _download_blob(self) -> typing.Tuple[int, typing.Optional[asyncio.Transport]]:
async def _download_blob(self) -> typing.Tuple[int, Optional['BlobExchangeClientProtocol']]:
"""
:return: download success (bool), keep connection (bool)
:return: download success (bool), connected protocol (BlobExchangeClientProtocol)
"""
start_time = time.perf_counter()
request = BlobRequest.make_request_for_blob_hash(self.blob.blob_hash)
blob_hash = self.blob.blob_hash
if not self.peer_address:
@@ -147,12 +151,12 @@ class BlobExchangeClientProtocol(asyncio.Protocol):
return self._blob_bytes_received, self.close()
msg = f"downloading {self.blob.blob_hash[:8]} from {self.peer_address}:{self.peer_port}," \
f" timeout in {self.peer_timeout}"
log.debug(msg)
log.info(msg)
msg = f"downloaded {self.blob.blob_hash[:8]} from {self.peer_address}:{self.peer_port}"
await asyncio.wait_for(self.writer.finished, self.peer_timeout, loop=self.loop)
log.info(msg)
log.info(msg + f" at {round((float(self._blob_bytes_received) / float(time.perf_counter() - start_time)) / 1000000.0, 2)}MB/s")
# await self.blob.finished_writing.wait() not necessary, but a dangerous change. TODO: is it needed?
return self._blob_bytes_received, self.transport
return self._blob_bytes_received, self
except asyncio.TimeoutError:
return self._blob_bytes_received, self.close()
except (InvalidBlobHashError, InvalidDataError):
@@ -173,11 +177,11 @@ class BlobExchangeClientProtocol(asyncio.Protocol):
self.transport = None
self.buf = b''
async def download_blob(self, blob: 'AbstractBlob') -> typing.Tuple[int, typing.Optional[asyncio.Transport]]:
async def download_blob(self, blob: 'AbstractBlob') -> typing.Tuple[int, Optional['BlobExchangeClientProtocol']]:
self.closed.clear()
blob_hash = blob.blob_hash
if blob.get_is_verified() or not blob.is_writeable():
return 0, self.transport
return 0, self
try:
self._blob_bytes_received = 0
self.blob, self.writer = blob, blob.get_blob_writer(self.peer_address, self.peer_port)
@@ -218,33 +222,32 @@
@cache_concurrent
async def request_blob(loop: asyncio.AbstractEventLoop, blob: typing.Optional['AbstractBlob'], address: str,
async def request_blob(loop: asyncio.AbstractEventLoop, blob: Optional['AbstractBlob'], address: str,
tcp_port: int, peer_connect_timeout: float, blob_download_timeout: float,
connected_transport: asyncio.Transport = None, connection_id: int = 0,
connection_manager: typing.Optional['ConnectionManager'] = None)\
-> typing.Tuple[int, typing.Optional[asyncio.Transport]]:
connected_protocol: Optional['BlobExchangeClientProtocol'] = None,
connection_id: int = 0, connection_manager: Optional['ConnectionManager'] = None)\
-> typing.Tuple[int, Optional['BlobExchangeClientProtocol']]:
"""
Returns [<downloaded blob>, <keep connection>]
Returns [<amount of bytes received>, <client protocol if connected>]
"""
protocol = BlobExchangeClientProtocol(
loop, blob_download_timeout, connection_manager
)
if connected_transport and not connected_transport.is_closing():
connected_transport.set_protocol(protocol)
protocol.transport = connected_transport
log.debug("reusing connection for %s:%d", address, tcp_port)
protocol = connected_protocol
if not connected_protocol or not connected_protocol.transport or connected_protocol.transport.is_closing():
connected_protocol = None
protocol = BlobExchangeClientProtocol(
loop, blob_download_timeout, connection_manager
)
else:
connected_transport = None
log.debug("reusing connection for %s:%d", address, tcp_port)
try:
if not connected_transport:
if not connected_protocol:
await asyncio.wait_for(loop.create_connection(lambda: protocol, address, tcp_port),
peer_connect_timeout, loop=loop)
connected_transport = protocol.transport
connected_protocol = protocol
if blob is None or blob.get_is_verified() or not blob.is_writeable():
# blob is None happens when we are just opening a connection
# file exists but not verified means someone is writing right now, give it time, come back later
return 0, connected_transport
return await protocol.download_blob(blob)
return 0, connected_protocol
return await connected_protocol.download_blob(blob)
except (asyncio.TimeoutError, ConnectionRefusedError, ConnectionAbortedError, OSError):
return 0, None
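A hedged usage sketch of the new return value, based only on the signature shown above: the blobs, address, port, timeout values, and the fetch_two_blobs helper are placeholders, and the import path for request_blob is inferred from the downloader's import of BlobExchangeClientProtocol.

    from lbry.blob_exchange.client import request_blob

    async def fetch_two_blobs(loop, first_blob, second_blob, address, tcp_port):
        # first call dials the peer; passing blob=None instead would just probe it
        bytes_received, protocol = await request_blob(
            loop, first_blob, address, tcp_port,
            peer_connect_timeout=5.0, blob_download_timeout=30.0
        )
        if protocol is None:
            return bytes_received  # connect failed or timed out, nothing to reuse
        # second call hands the live protocol back in and skips create_connection()
        more_bytes, protocol = await request_blob(
            loop, second_blob, address, tcp_port,
            peer_connect_timeout=5.0, blob_download_timeout=30.0,
            connected_protocol=protocol
        )
        if protocol is not None:
            protocol.close()
        return bytes_received + more_bytes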


@@ -9,6 +9,7 @@ if typing.TYPE_CHECKING:
from lbry.dht.peer import KademliaPeer
from lbry.blob.blob_manager import BlobManager
from lbry.blob.blob_file import AbstractBlob
from lbry.blob_exchange.client import BlobExchangeClientProtocol
log = logging.getLogger(__name__)
@@ -27,7 +28,7 @@ class BlobDownloader:
self.scores: typing.Dict['KademliaPeer', int] = {}
self.failures: typing.Dict['KademliaPeer', int] = {}
self.connection_failures: typing.List['KademliaPeer'] = []
self.connections: typing.Dict['KademliaPeer', asyncio.Transport] = {}
self.connections: typing.Dict['KademliaPeer', 'BlobExchangeClientProtocol'] = {}
self.is_running = asyncio.Event(loop=self.loop)
def should_race_continue(self, blob: 'AbstractBlob'):
@@ -40,26 +41,24 @@ class BlobDownloader:
just_probe: bool = False):
if blob.get_is_verified():
return
transport = self.connections.get(peer)
start = self.loop.time()
bytes_received, transport = await request_blob(
bytes_received, protocol = await request_blob(
self.loop, blob if not just_probe else None, peer.address, peer.tcp_port, self.config.peer_connect_timeout,
self.config.blob_download_timeout, connected_transport=transport, connection_id=connection_id,
connection_manager=self.blob_manager.connection_manager
self.config.blob_download_timeout, connected_protocol=self.connections.get(peer),
connection_id=connection_id, connection_manager=self.blob_manager.connection_manager
)
if not bytes_received and not transport and peer not in self.connection_failures:
if not bytes_received and not protocol and peer not in self.connection_failures:
self.connection_failures.append(peer)
if not transport and peer not in self.ignored:
if not protocol and peer not in self.ignored:
self.ignored[peer] = self.loop.time()
log.debug("drop peer %s:%i", peer.address, peer.tcp_port)
self.failures[peer] = self.failures.get(peer, 0) + 1
if peer in self.connections:
del self.connections[peer]
elif transport:
elif protocol:
log.debug("keep peer %s:%i", peer.address, peer.tcp_port)
self.failures[peer] = 0
self.connections[peer] = transport
self.connections[peer] = protocol
elapsed = self.loop.time() - start
self.scores[peer] = bytes_received / elapsed if bytes_received and elapsed else 1
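To make the score line above concrete: a peer that delivered 1,048,576 bytes in 0.25 seconds scores 4,194,304 (bytes per second), while a peer that connected but sent nothing keeps the floor score of 1. The same formula as a standalone function, illustrative only, not project code:

    def peer_score(bytes_received: int, elapsed: float) -> float:
        # throughput in bytes per second, with a floor of 1 for empty transfers/probes
        return bytes_received / elapsed if bytes_received and elapsed else 1

    assert peer_score(1048576, 0.25) == 4194304.0
    assert peer_score(0, 0.25) == 1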
@@ -120,8 +119,8 @@ class BlobDownloader:
self.scores.clear()
self.ignored.clear()
self.is_running.clear()
for transport in self.connections.values():
transport.close()
for protocol in self.connections.values():
protocol.close()
async def download_blob(loop, config: 'Config', blob_manager: 'BlobManager', node: 'Node',


@@ -208,8 +208,36 @@ class Node:
task.cancel()
async def _value_producer(self, blob_hash: str, result_queue: asyncio.Queue):
async def ping(_peer):
try:
await self.protocol.get_rpc_peer(_peer).ping()
result_queue.put_nowait([_peer])
except asyncio.TimeoutError:
log.info("blob peer timed out udp")
pass
async for results in self.get_iterative_value_finder(binascii.unhexlify(blob_hash.encode())):
result_queue.put_nowait(results)
to_put = []
for peer in results:
is_good = self.protocol.peer_manager.peer_is_good(peer)
if is_good:
to_put.append(peer)
elif is_good is None:
udp_port_to_try = 4444
if 3400 > peer.tcp_port > 3332:
if not peer.udp_port:
udp_port_to_try = (peer.tcp_port - 3333) + 4444
elif 4500 > peer.tcp_port > 4443:
if not peer.udp_port:
udp_port_to_try = peer.tcp_port
if not peer.udp_port:
log.info("guess %i for %s:%i", udp_port_to_try, peer.address, peer.tcp_port)
peer.update_udp_port(udp_port_to_try)
self.loop.create_task(ping(peer))
else:
log.debug("skip bad peer %s:%i for %s", blob_hash)
if to_put:
result_queue.put_nowait(to_put)
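The port guessing above encodes these conventions: peers whose TCP port falls in 3333-3399 are assumed to listen for UDP at 4444 plus the same offset, peers already in 4444-4499 are assumed to use the same port for both, and anything else falls back to the default 4444. Restated as a pure function with a few worked values (the name guess_udp_port is illustrative; the guess only applies when peer.udp_port is unset, as in the added lines above):

    def guess_udp_port(tcp_port: int) -> int:
        if 3332 < tcp_port < 3400:            # conventional tcp range 3333-3399
            return (tcp_port - 3333) + 4444   # e.g. tcp 3335 -> udp 4446
        if 4443 < tcp_port < 4500:            # already in the 4444-4499 range
            return tcp_port                   # assume udp mirrors tcp
        return 4444                           # default udp port

    assert guess_udp_port(3333) == 4444
    assert guess_udp_port(3340) == 4451
    assert guess_udp_port(4445) == 4445
    assert guess_udp_port(9000) == 4444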
def accumulate_peers(self, search_queue: asyncio.Queue,
peer_queue: typing.Optional[asyncio.Queue] = None) -> typing.Tuple[