cancel pending reflector request when connection is lost

-add 180s timeout
This commit is contained in:
Jack Robison 2020-05-11 16:05:13 -04:00
parent f20ca70c01
commit 78b8261a3a
No known key found for this signature in database
GPG key ID: DF25C68FE0239BB2

View file

@ -35,6 +35,8 @@ class StreamReflectorClient(asyncio.Protocol):
def connection_lost(self, exc: typing.Optional[Exception]): def connection_lost(self, exc: typing.Optional[Exception]):
self.transport = None self.transport = None
self.connected.clear() self.connected.clear()
if self.pending_request:
self.pending_request.cancel()
if self.reflected_blobs: if self.reflected_blobs:
log.info("Finished sending reflector %i blobs", len(self.reflected_blobs)) log.info("Finished sending reflector %i blobs", len(self.reflected_blobs))
@ -56,11 +58,11 @@ class StreamReflectorClient(asyncio.Protocol):
self.response_buff = b'' self.response_buff = b''
return return
async def send_request(self, request_dict: typing.Dict): async def send_request(self, request_dict: typing.Dict, timeout: int = 180):
msg = json.dumps(request_dict) msg = json.dumps(request_dict)
self.transport.write(msg.encode()) self.transport.write(msg.encode())
try: try:
self.pending_request = self.loop.create_task(self.response_queue.get()) self.pending_request = self.loop.create_task(asyncio.wait_for(self.response_queue.get(), timeout))
return await self.pending_request return await self.pending_request
finally: finally:
self.pending_request = None self.pending_request = None
@ -87,7 +89,7 @@ class StreamReflectorClient(asyncio.Protocol):
sent_sd = False sent_sd = False
if response['send_sd_blob']: if response['send_sd_blob']:
await sd_blob.sendfile(self) await sd_blob.sendfile(self)
received = await self.response_queue.get() received = await asyncio.wait_for(self.response_queue.get(), 30)
if received.get('received_sd_blob'): if received.get('received_sd_blob'):
sent_sd = True sent_sd = True
if not needed: if not needed:
@ -111,7 +113,7 @@ class StreamReflectorClient(asyncio.Protocol):
raise ValueError("I don't know whether to send the blob or not!") raise ValueError("I don't know whether to send the blob or not!")
if response['send_blob']: if response['send_blob']:
await blob.sendfile(self) await blob.sendfile(self)
received = await self.response_queue.get() received = await asyncio.wait_for(self.response_queue.get(), 30)
if received.get('received_blob'): if received.get('received_blob'):
self.reflected_blobs.append(blob.blob_hash) self.reflected_blobs.append(blob.blob_hash)
log.info("Sent reflector blob %s", blob.blob_hash[:8]) log.info("Sent reflector blob %s", blob.blob_hash[:8])