From 78b8261a3a1216cccf775950cf92a354acd7fb71 Mon Sep 17 00:00:00 2001 From: Jack Robison Date: Mon, 11 May 2020 16:05:13 -0400 Subject: [PATCH] cancel pending reflector request when connection is lost -add 180s timeout --- lbry/stream/reflector/client.py | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) diff --git a/lbry/stream/reflector/client.py b/lbry/stream/reflector/client.py index 7a8032c99..fbc084b9b 100644 --- a/lbry/stream/reflector/client.py +++ b/lbry/stream/reflector/client.py @@ -35,6 +35,8 @@ class StreamReflectorClient(asyncio.Protocol): def connection_lost(self, exc: typing.Optional[Exception]): self.transport = None self.connected.clear() + if self.pending_request: + self.pending_request.cancel() if self.reflected_blobs: log.info("Finished sending reflector %i blobs", len(self.reflected_blobs)) @@ -56,11 +58,11 @@ class StreamReflectorClient(asyncio.Protocol): self.response_buff = b'' return - async def send_request(self, request_dict: typing.Dict): + async def send_request(self, request_dict: typing.Dict, timeout: int = 180): msg = json.dumps(request_dict) self.transport.write(msg.encode()) try: - self.pending_request = self.loop.create_task(self.response_queue.get()) + self.pending_request = self.loop.create_task(asyncio.wait_for(self.response_queue.get(), timeout)) return await self.pending_request finally: self.pending_request = None @@ -87,7 +89,7 @@ class StreamReflectorClient(asyncio.Protocol): sent_sd = False if response['send_sd_blob']: await sd_blob.sendfile(self) - received = await self.response_queue.get() + received = await asyncio.wait_for(self.response_queue.get(), 30) if received.get('received_sd_blob'): sent_sd = True if not needed: @@ -111,7 +113,7 @@ class StreamReflectorClient(asyncio.Protocol): raise ValueError("I don't know whether to send the blob or not!") if response['send_blob']: await blob.sendfile(self) - received = await self.response_queue.get() + received = await asyncio.wait_for(self.response_queue.get(), 30) if received.get('received_blob'): self.reflected_blobs.append(blob.blob_hash) log.info("Sent reflector blob %s", blob.blob_hash[:8])