fix chunked response parsing

This commit is contained in:
Victor Shyba 2019-02-12 03:08:34 -03:00 committed by Lex Berezhny
parent 1193d2cea4
commit 0a068011d8
2 changed files with 12 additions and 2 deletions

View file

@ -24,8 +24,11 @@ class BlobExchangeClientProtocol(asyncio.Protocol):
self._blob_bytes_received = 0
self._response_fut: asyncio.Future = None
self.buf = b''
def data_received(self, data: bytes):
log.debug("%s:%d -- got %s bytes -- %s bytes on buffer -- %s blob bytes received",
self.peer_address, self.peer_port, len(data), len(self.buf), self._blob_bytes_received)
if not self.transport or self.transport.is_closing():
log.warning("transport closing, but got more bytes from %s:%i\n%s", self.peer_address, self.peer_port,
binascii.hexlify(data))
@ -35,7 +38,12 @@ class BlobExchangeClientProtocol(asyncio.Protocol):
if self._blob_bytes_received and not self.writer.closed():
return self._write(data)
response = BlobResponse.deserialize(data)
response = BlobResponse.deserialize(self.buf + data)
if not response.responses and not self._response_fut.done():
self.buf += data
return
else:
self.buf = b''
if response.responses and self.blob:
blob_response = response.get_blob_response()
@ -135,6 +143,7 @@ class BlobExchangeClientProtocol(asyncio.Protocol):
if self.transport:
self.transport.close()
self.transport = None
self.buf = b''
async def download_blob(self, blob: 'BlobFile') -> typing.Tuple[int, typing.Optional[asyncio.Transport]]:
if blob.get_is_verified() or blob.file_exists or blob.blob_write_lock.locked():

View file

@ -100,6 +100,7 @@ class BlobServer:
self.server_task: asyncio.Task = None
self.started_listening = asyncio.Event(loop=self.loop)
self.lbrycrd_address = lbrycrd_address
self.server_protocol_class = BlobServerProtocol
def start_server(self, port: int, interface: typing.Optional[str] = '0.0.0.0'):
if self.server_task is not None:
@ -107,7 +108,7 @@ class BlobServer:
async def _start_server():
server = await self.loop.create_server(
lambda: BlobServerProtocol(self.loop, self.blob_manager, self.lbrycrd_address),
lambda: self.server_protocol_class(self.loop, self.blob_manager, self.lbrycrd_address),
interface, port
)
self.started_listening.set()