From a30e2daf0590f6b8f487eabf702468e2cf588ec0 Mon Sep 17 00:00:00 2001 From: Jack Robison Date: Fri, 7 Jun 2019 13:54:31 -0400 Subject: [PATCH] handle chunked responses from reflector server --- lbrynet/stream/reflector/client.py | 18 +++++++++++++++--- 1 file changed, 15 insertions(+), 3 deletions(-) diff --git a/lbrynet/stream/reflector/client.py b/lbrynet/stream/reflector/client.py index bd00ed412..364011058 100644 --- a/lbrynet/stream/reflector/client.py +++ b/lbrynet/stream/reflector/client.py @@ -10,13 +10,15 @@ if typing.TYPE_CHECKING: REFLECTOR_V1 = 0 REFLECTOR_V2 = 1 +MAX_RESPONSE_SIZE = 2000000 + log = logging.getLogger(__name__) class StreamReflectorClient(asyncio.Protocol): def __init__(self, blob_manager: 'BlobManager', descriptor: 'StreamDescriptor'): self.loop = asyncio.get_event_loop() - self.transport: asyncio.StreamWriter = None + self.transport: typing.Optional[asyncio.WriteTransport] = None self.blob_manager = blob_manager self.descriptor = descriptor self.response_buff = b'' @@ -37,11 +39,21 @@ class StreamReflectorClient(asyncio.Protocol): log.info("Finished sending reflector %i blobs", len(self.reflected_blobs)) def data_received(self, data): + if len(self.response_buff + (data or b'')) > MAX_RESPONSE_SIZE: + log.warning("response message to large from reflector server: %i bytes", + len(self.response_buff + (data or b''))) + self.response_buff = b'' + self.transport.close() + return + self.response_buff += (data or b'') try: - response = json.loads(data.decode()) + response = json.loads(self.response_buff.decode()) + self.response_buff = b'' self.response_queue.put_nowait(response) except ValueError: - self.transport.close() + if not data: + log.warning("got undecodable response from reflector server") + self.response_buff = b'' return async def send_request(self, request_dict: typing.Dict):