From 22db04068e0cb13d8ca5fcd0a2c6fca4e0f7c49c Mon Sep 17 00:00:00 2001 From: Jack Robison Date: Sun, 11 Aug 2019 22:29:04 -0400 Subject: [PATCH] add server side timeouts for idle blob exchange connections and slow transfers --- lbry/lbry/blob_exchange/server.py | 73 ++++++++++++++++++++++--------- 1 file changed, 53 insertions(+), 20 deletions(-) diff --git a/lbry/lbry/blob_exchange/server.py b/lbry/lbry/blob_exchange/server.py index 0dc6036c7..691ea98eb 100644 --- a/lbry/lbry/blob_exchange/server.py +++ b/lbry/lbry/blob_exchange/server.py @@ -17,15 +17,32 @@ MAX_REQUEST_SIZE = 1200 class BlobServerProtocol(asyncio.Protocol): - def __init__(self, loop: asyncio.AbstractEventLoop, blob_manager: 'BlobManager', lbrycrd_address: str): + def __init__(self, loop: asyncio.AbstractEventLoop, blob_manager: 'BlobManager', lbrycrd_address: str, + idle_timeout: float = 30.0, transfer_timeout: float = 60.0): self.loop = loop self.blob_manager = blob_manager - self.server_task: asyncio.Task = None + self.idle_timeout = idle_timeout + self.transfer_timeout = transfer_timeout + self.server_task: typing.Optional[asyncio.Task] = None self.started_listening = asyncio.Event(loop=self.loop) self.buf = b'' - self.transport = None + self.transport: typing.Optional[asyncio.Transport] = None self.lbrycrd_address = lbrycrd_address self.peer_address_and_port: typing.Optional[str] = None + self.started_transfer = asyncio.Event(loop=self.loop) + self.transfer_finished = asyncio.Event(loop=self.loop) + self.close_on_idle_task: typing.Optional[asyncio.Task] = None + + async def close_on_idle(self): + while self.transport: + try: + await asyncio.wait_for(self.started_transfer.wait(), self.idle_timeout, loop=self.loop) + except asyncio.TimeoutError: + log.debug("closing idle connection from %s", self.peer_address_and_port) + return self.close() + self.started_transfer.clear() + await self.transfer_finished.wait() + self.transfer_finished.clear() def close(self): if self.transport: @@ -33,11 +50,18 @@ class BlobServerProtocol(asyncio.Protocol): def connection_made(self, transport): self.transport = transport + self.close_on_idle_task = self.loop.create_task(self.close_on_idle()) self.peer_address_and_port = "%s:%i" % self.transport.get_extra_info('peername') self.blob_manager.connection_manager.connection_received(self.peer_address_and_port) + log.debug("received connection from %s", self.peer_address_and_port) def connection_lost(self, exc: typing.Optional[Exception]) -> None: + log.debug("lost connection from %s", self.peer_address_and_port) self.blob_manager.connection_manager.incoming_connection_lost(self.peer_address_and_port) + self.transport = None + if self.close_on_idle_task and not self.close_on_idle_task.done(): + self.close_on_idle_task.cancel() + self.close_on_idle_task = None def send_response(self, responses: typing.List[blob_response_types]): to_send = [] @@ -72,17 +96,23 @@ class BlobServerProtocol(asyncio.Protocol): incoming_blob = {'blob_hash': blob.blob_hash, 'length': blob.length} responses.append(BlobDownloadResponse(incoming_blob=incoming_blob)) self.send_response(responses) - log.debug("send %s to %s:%i", blob.blob_hash[:8], peer_address, peer_port) + bh = blob.blob_hash[:8] + log.debug("send %s to %s:%i", bh, peer_address, peer_port) + self.started_transfer.set() try: - sent = await blob.sendfile(self) - except (ConnectionResetError, BrokenPipeError, RuntimeError, OSError): - if self.transport: - self.transport.close() - return - log.info("sent %s (%i bytes) to %s:%i", blob.blob_hash[:8], sent, peer_address, peer_port) + sent = await asyncio.wait_for(blob.sendfile(self), self.transfer_timeout, loop=self.loop) + self.blob_manager.connection_manager.sent_data(self.peer_address_and_port, sent) + log.debug("sent %s (%i bytes) to %s:%i", bh, sent, peer_address, peer_port) + except (ConnectionResetError, BrokenPipeError, RuntimeError, OSError, asyncio.TimeoutError) as err: + if isinstance(err, asyncio.TimeoutError): + log.debug("timed out sending blob %s to %s", bh, peer_address) + else: + log.debug("stopped sending %s to %s:%i", bh, peer_address, peer_port) + self.close() + finally: + self.transfer_finished.set() if responses: self.send_response(responses) - # self.transport.close() def data_received(self, data): request = None @@ -100,26 +130,28 @@ class BlobServerProtocol(asyncio.Protocol): request = BlobRequest.deserialize(self.buf + data) self.buf = remainder except JSONDecodeError: - addr = self.transport.get_extra_info('peername') - peer_address, peer_port = addr - log.error("failed to decode blob request from %s:%i (%i bytes): %s", peer_address, peer_port, - len(data), '' if not data else binascii.hexlify(data).decode()) + log.error("request from %s is not valid json (%i bytes): %s", self.peer_address_and_port, + len(self.buf + data), '' if not data else binascii.hexlify(self.buf + data).decode()) + self.transport.close() + return if not request: - addr = self.transport.get_extra_info('peername') - peer_address, peer_port = addr - log.warning("failed to decode blob request from %s:%i", peer_address, peer_port) + log.error("failed to decode request from %s (%i bytes): %s", self.peer_address_and_port, + len(self.buf + data), '' if not data else binascii.hexlify(self.buf + data).decode()) self.transport.close() return self.loop.create_task(self.handle_request(request)) class BlobServer: - def __init__(self, loop: asyncio.AbstractEventLoop, blob_manager: 'BlobManager', lbrycrd_address: str): + def __init__(self, loop: asyncio.AbstractEventLoop, blob_manager: 'BlobManager', lbrycrd_address: str, + idle_timeout: float = 30.0, transfer_timeout: float = 60.0): self.loop = loop self.blob_manager = blob_manager self.server_task: typing.Optional[asyncio.Task] = None self.started_listening = asyncio.Event(loop=self.loop) self.lbrycrd_address = lbrycrd_address + self.idle_timeout = idle_timeout + self.transfer_timeout = transfer_timeout self.server_protocol_class = BlobServerProtocol def start_server(self, port: int, interface: typing.Optional[str] = '0.0.0.0'): @@ -128,7 +160,8 @@ class BlobServer: async def _start_server(): server = await self.loop.create_server( - lambda: self.server_protocol_class(self.loop, self.blob_manager, self.lbrycrd_address), + lambda: self.server_protocol_class(self.loop, self.blob_manager, self.lbrycrd_address, + self.idle_timeout, self.transfer_timeout), interface, port ) self.started_listening.set()