handle chunked responses from reflector server

This commit is contained in:
Jack Robison 2019-06-07 13:54:31 -04:00
parent adb7edd3b2
commit a30e2daf05
No known key found for this signature in database
GPG key ID: DF25C68FE0239BB2

View file

@ -10,13 +10,15 @@ if typing.TYPE_CHECKING:
REFLECTOR_V1 = 0 REFLECTOR_V1 = 0
REFLECTOR_V2 = 1 REFLECTOR_V2 = 1
MAX_RESPONSE_SIZE = 2000000
log = logging.getLogger(__name__) log = logging.getLogger(__name__)
class StreamReflectorClient(asyncio.Protocol): class StreamReflectorClient(asyncio.Protocol):
def __init__(self, blob_manager: 'BlobManager', descriptor: 'StreamDescriptor'): def __init__(self, blob_manager: 'BlobManager', descriptor: 'StreamDescriptor'):
self.loop = asyncio.get_event_loop() self.loop = asyncio.get_event_loop()
self.transport: asyncio.StreamWriter = None self.transport: typing.Optional[asyncio.WriteTransport] = None
self.blob_manager = blob_manager self.blob_manager = blob_manager
self.descriptor = descriptor self.descriptor = descriptor
self.response_buff = b'' self.response_buff = b''
@ -37,11 +39,21 @@ class StreamReflectorClient(asyncio.Protocol):
log.info("Finished sending reflector %i blobs", len(self.reflected_blobs)) log.info("Finished sending reflector %i blobs", len(self.reflected_blobs))
def data_received(self, data): def data_received(self, data):
if len(self.response_buff + (data or b'')) > MAX_RESPONSE_SIZE:
log.warning("response message to large from reflector server: %i bytes",
len(self.response_buff + (data or b'')))
self.response_buff = b''
self.transport.close()
return
self.response_buff += (data or b'')
try: try:
response = json.loads(data.decode()) response = json.loads(self.response_buff.decode())
self.response_buff = b''
self.response_queue.put_nowait(response) self.response_queue.put_nowait(response)
except ValueError: except ValueError:
self.transport.close() if not data:
log.warning("got undecodable response from reflector server")
self.response_buff = b''
return return
async def send_request(self, request_dict: typing.Dict): async def send_request(self, request_dict: typing.Dict):