diff --git a/lbrynet/blob_exchange/client.py b/lbrynet/blob_exchange/client.py index fc972bc98..7ab36fd96 100644 --- a/lbrynet/blob_exchange/client.py +++ b/lbrynet/blob_exchange/client.py @@ -24,8 +24,11 @@ class BlobExchangeClientProtocol(asyncio.Protocol): self._blob_bytes_received = 0 self._response_fut: asyncio.Future = None + self.buf = b'' def data_received(self, data: bytes): + log.debug("%s:%d -- got %s bytes -- %s bytes on buffer -- %s blob bytes received", + self.peer_address, self.peer_port, len(data), len(self.buf), self._blob_bytes_received) if not self.transport or self.transport.is_closing(): log.warning("transport closing, but got more bytes from %s:%i\n%s", self.peer_address, self.peer_port, binascii.hexlify(data)) @@ -35,7 +38,12 @@ class BlobExchangeClientProtocol(asyncio.Protocol): if self._blob_bytes_received and not self.writer.closed(): return self._write(data) - response = BlobResponse.deserialize(data) + response = BlobResponse.deserialize(self.buf + data) + if not response.responses and not self._response_fut.done(): + self.buf += data + return + else: + self.buf = b'' if response.responses and self.blob: blob_response = response.get_blob_response() @@ -135,6 +143,7 @@ class BlobExchangeClientProtocol(asyncio.Protocol): if self.transport: self.transport.close() self.transport = None + self.buf = b'' async def download_blob(self, blob: 'BlobFile') -> typing.Tuple[int, typing.Optional[asyncio.Transport]]: if blob.get_is_verified() or blob.file_exists or blob.blob_write_lock.locked(): diff --git a/lbrynet/blob_exchange/server.py b/lbrynet/blob_exchange/server.py index 05614e1ce..2f47718a8 100644 --- a/lbrynet/blob_exchange/server.py +++ b/lbrynet/blob_exchange/server.py @@ -100,6 +100,7 @@ class BlobServer: self.server_task: asyncio.Task = None self.started_listening = asyncio.Event(loop=self.loop) self.lbrycrd_address = lbrycrd_address + self.server_protocol_class = BlobServerProtocol def start_server(self, port: int, interface: typing.Optional[str] = '0.0.0.0'): if self.server_task is not None: @@ -107,7 +108,7 @@ class BlobServer: async def _start_server(): server = await self.loop.create_server( - lambda: BlobServerProtocol(self.loop, self.blob_manager, self.lbrycrd_address), + lambda: self.server_protocol_class(self.loop, self.blob_manager, self.lbrycrd_address), interface, port ) self.started_listening.set()