diff --git a/lbry/stream/reflector/client.py b/lbry/stream/reflector/client.py index 7a8032c99..fbc084b9b 100644 --- a/lbry/stream/reflector/client.py +++ b/lbry/stream/reflector/client.py @@ -35,6 +35,8 @@ class StreamReflectorClient(asyncio.Protocol): def connection_lost(self, exc: typing.Optional[Exception]): self.transport = None self.connected.clear() + if self.pending_request: + self.pending_request.cancel() if self.reflected_blobs: log.info("Finished sending reflector %i blobs", len(self.reflected_blobs)) @@ -56,11 +58,11 @@ class StreamReflectorClient(asyncio.Protocol): self.response_buff = b'' return - async def send_request(self, request_dict: typing.Dict): + async def send_request(self, request_dict: typing.Dict, timeout: int = 180): msg = json.dumps(request_dict) self.transport.write(msg.encode()) try: - self.pending_request = self.loop.create_task(self.response_queue.get()) + self.pending_request = self.loop.create_task(asyncio.wait_for(self.response_queue.get(), timeout)) return await self.pending_request finally: self.pending_request = None @@ -87,7 +89,7 @@ class StreamReflectorClient(asyncio.Protocol): sent_sd = False if response['send_sd_blob']: await sd_blob.sendfile(self) - received = await self.response_queue.get() + received = await asyncio.wait_for(self.response_queue.get(), 30) if received.get('received_sd_blob'): sent_sd = True if not needed: @@ -111,7 +113,7 @@ class StreamReflectorClient(asyncio.Protocol): raise ValueError("I don't know whether to send the blob or not!") if response['send_blob']: await blob.sendfile(self) - received = await self.response_queue.get() + received = await asyncio.wait_for(self.response_queue.get(), 30) if received.get('received_blob'): self.reflected_blobs.append(blob.blob_hash) log.info("Sent reflector blob %s", blob.blob_hash[:8])