From a480b2d25fd92713a6120b2beffea739dedb7f56 Mon Sep 17 00:00:00 2001 From: Jack Robison Date: Wed, 8 May 2019 12:21:11 -0400 Subject: [PATCH] fix race when closing blob client protocol after getting a response --- lbrynet/blob_exchange/client.py | 12 +++++++++++- 1 file changed, 11 insertions(+), 1 deletion(-) diff --git a/lbrynet/blob_exchange/client.py b/lbrynet/blob_exchange/client.py index 65af4f4cc..1b4c46f01 100644 --- a/lbrynet/blob_exchange/client.py +++ b/lbrynet/blob_exchange/client.py @@ -24,9 +24,12 @@ class BlobExchangeClientProtocol(asyncio.Protocol): self.blob: typing.Optional['AbstractBlob'] = None self._blob_bytes_received = 0 - self._response_fut: asyncio.Future = None + self._response_fut: typing.Optional[asyncio.Future] = None self.buf = b'' + # this is here to handle the race when the downloader is closed right as response_fut gets a result + self.closed = asyncio.Event(loop=self.loop) + def data_received(self, data: bytes): log.debug("%s:%d -- got %s bytes -- %s bytes on buffer -- %s blob bytes received", self.peer_address, self.peer_port, len(data), len(self.buf), self._blob_bytes_received) @@ -90,6 +93,7 @@ class BlobExchangeClientProtocol(asyncio.Protocol): :return: download success (bool), keep connection (bool) """ request = BlobRequest.make_request_for_blob_hash(self.blob.blob_hash) + blob_hash = self.blob.blob_hash try: msg = request.serialize() log.debug("send request to %s:%i -> %s", self.peer_address, self.peer_port, msg.decode()) @@ -98,6 +102,10 @@ class BlobExchangeClientProtocol(asyncio.Protocol): availability_response = response.get_availability_response() price_response = response.get_price_response() blob_response = response.get_blob_response() + if self.closed.is_set(): + msg = f"cancelled blob request for {blob_hash} immediately after we got a response" + log.warning(msg) + raise asyncio.CancelledError(msg) if (not blob_response or blob_response.error) and\ (not availability_response or not availability_response.available_blobs): log.warning("%s not in availability response from %s:%i", self.blob.blob_hash, self.peer_address, @@ -136,6 +144,7 @@ class BlobExchangeClientProtocol(asyncio.Protocol): return self._blob_bytes_received, self.close() def close(self): + self.closed.set() if self._response_fut and not self._response_fut.done(): self._response_fut.cancel() if self.writer and not self.writer.closed(): @@ -149,6 +158,7 @@ class BlobExchangeClientProtocol(asyncio.Protocol): self.buf = b'' async def download_blob(self, blob: 'AbstractBlob') -> typing.Tuple[int, typing.Optional[asyncio.Transport]]: + self.closed.clear() blob_hash = blob.blob_hash if blob.get_is_verified() or not blob.is_writeable(): return 0, self.transport