fix uncaught errors from broken reflector connections

This commit is contained in:
Jack Robison 2020-07-15 16:40:47 -04:00
parent b3b5e3d8f0
commit c2f5f84118
No known key found for this signature in database
GPG key ID: DF25C68FE0239BB2
2 changed files with 8 additions and 1 deletions

View file

@ -60,10 +60,16 @@ class StreamReflectorClient(asyncio.Protocol):
async def send_request(self, request_dict: typing.Dict, timeout: int = 180):
msg = json.dumps(request_dict)
self.transport.write(msg.encode())
try:
self.transport.write(msg.encode())
self.pending_request = self.loop.create_task(asyncio.wait_for(self.response_queue.get(), timeout))
return await self.pending_request
except (AttributeError, asyncio.CancelledError):
# attribute error happens when we transport.write after disconnect
# cancelled error happens when the pending_request task is cancelled by a disconnect
if self.transport:
self.transport.close()
raise asyncio.TimeoutError()
finally:
self.pending_request = None

View file

@ -34,6 +34,7 @@ class ReflectorServerProtocol(asyncio.Protocol):
self.not_incoming = not_incoming_event or asyncio.Event(loop=self.loop)
self.stop_event = stop_event or asyncio.Event(loop=self.loop)
self.chunk_size = response_chunk_size
self.wait_for_stop_task: typing.Optional[asyncio.Task] = None
async def wait_for_stop(self):
await self.stop_event.wait()