diff --git a/lbry/stream/reflector/client.py b/lbry/stream/reflector/client.py index fbc084b9b..07544733e 100644 --- a/lbry/stream/reflector/client.py +++ b/lbry/stream/reflector/client.py @@ -60,10 +60,16 @@ class StreamReflectorClient(asyncio.Protocol): async def send_request(self, request_dict: typing.Dict, timeout: int = 180): msg = json.dumps(request_dict) - self.transport.write(msg.encode()) try: + self.transport.write(msg.encode()) self.pending_request = self.loop.create_task(asyncio.wait_for(self.response_queue.get(), timeout)) return await self.pending_request + except (AttributeError, asyncio.CancelledError): + # attribute error happens when we transport.write after disconnect + # cancelled error happens when the pending_request task is cancelled by a disconnect + if self.transport: + self.transport.close() + raise asyncio.TimeoutError() finally: self.pending_request = None diff --git a/lbry/stream/reflector/server.py b/lbry/stream/reflector/server.py index da11e5ec5..2ca76ca59 100644 --- a/lbry/stream/reflector/server.py +++ b/lbry/stream/reflector/server.py @@ -34,6 +34,7 @@ class ReflectorServerProtocol(asyncio.Protocol): self.not_incoming = not_incoming_event or asyncio.Event(loop=self.loop) self.stop_event = stop_event or asyncio.Event(loop=self.loop) self.chunk_size = response_chunk_size + self.wait_for_stop_task: typing.Optional[asyncio.Task] = None async def wait_for_stop(self): await self.stop_event.wait()