fix race when closing blob client protocol after getting a response
This commit is contained in:
parent
d7032b12d7
commit
a480b2d25f
1 changed files with 11 additions and 1 deletions
|
@ -24,9 +24,12 @@ class BlobExchangeClientProtocol(asyncio.Protocol):
|
||||||
self.blob: typing.Optional['AbstractBlob'] = None
|
self.blob: typing.Optional['AbstractBlob'] = None
|
||||||
|
|
||||||
self._blob_bytes_received = 0
|
self._blob_bytes_received = 0
|
||||||
self._response_fut: asyncio.Future = None
|
self._response_fut: typing.Optional[asyncio.Future] = None
|
||||||
self.buf = b''
|
self.buf = b''
|
||||||
|
|
||||||
|
# this is here to handle the race when the downloader is closed right as response_fut gets a result
|
||||||
|
self.closed = asyncio.Event(loop=self.loop)
|
||||||
|
|
||||||
def data_received(self, data: bytes):
|
def data_received(self, data: bytes):
|
||||||
log.debug("%s:%d -- got %s bytes -- %s bytes on buffer -- %s blob bytes received",
|
log.debug("%s:%d -- got %s bytes -- %s bytes on buffer -- %s blob bytes received",
|
||||||
self.peer_address, self.peer_port, len(data), len(self.buf), self._blob_bytes_received)
|
self.peer_address, self.peer_port, len(data), len(self.buf), self._blob_bytes_received)
|
||||||
|
@ -90,6 +93,7 @@ class BlobExchangeClientProtocol(asyncio.Protocol):
|
||||||
:return: download success (bool), keep connection (bool)
|
:return: download success (bool), keep connection (bool)
|
||||||
"""
|
"""
|
||||||
request = BlobRequest.make_request_for_blob_hash(self.blob.blob_hash)
|
request = BlobRequest.make_request_for_blob_hash(self.blob.blob_hash)
|
||||||
|
blob_hash = self.blob.blob_hash
|
||||||
try:
|
try:
|
||||||
msg = request.serialize()
|
msg = request.serialize()
|
||||||
log.debug("send request to %s:%i -> %s", self.peer_address, self.peer_port, msg.decode())
|
log.debug("send request to %s:%i -> %s", self.peer_address, self.peer_port, msg.decode())
|
||||||
|
@ -98,6 +102,10 @@ class BlobExchangeClientProtocol(asyncio.Protocol):
|
||||||
availability_response = response.get_availability_response()
|
availability_response = response.get_availability_response()
|
||||||
price_response = response.get_price_response()
|
price_response = response.get_price_response()
|
||||||
blob_response = response.get_blob_response()
|
blob_response = response.get_blob_response()
|
||||||
|
if self.closed.is_set():
|
||||||
|
msg = f"cancelled blob request for {blob_hash} immediately after we got a response"
|
||||||
|
log.warning(msg)
|
||||||
|
raise asyncio.CancelledError(msg)
|
||||||
if (not blob_response or blob_response.error) and\
|
if (not blob_response or blob_response.error) and\
|
||||||
(not availability_response or not availability_response.available_blobs):
|
(not availability_response or not availability_response.available_blobs):
|
||||||
log.warning("%s not in availability response from %s:%i", self.blob.blob_hash, self.peer_address,
|
log.warning("%s not in availability response from %s:%i", self.blob.blob_hash, self.peer_address,
|
||||||
|
@ -136,6 +144,7 @@ class BlobExchangeClientProtocol(asyncio.Protocol):
|
||||||
return self._blob_bytes_received, self.close()
|
return self._blob_bytes_received, self.close()
|
||||||
|
|
||||||
def close(self):
|
def close(self):
|
||||||
|
self.closed.set()
|
||||||
if self._response_fut and not self._response_fut.done():
|
if self._response_fut and not self._response_fut.done():
|
||||||
self._response_fut.cancel()
|
self._response_fut.cancel()
|
||||||
if self.writer and not self.writer.closed():
|
if self.writer and not self.writer.closed():
|
||||||
|
@ -149,6 +158,7 @@ class BlobExchangeClientProtocol(asyncio.Protocol):
|
||||||
self.buf = b''
|
self.buf = b''
|
||||||
|
|
||||||
async def download_blob(self, blob: 'AbstractBlob') -> typing.Tuple[int, typing.Optional[asyncio.Transport]]:
|
async def download_blob(self, blob: 'AbstractBlob') -> typing.Tuple[int, typing.Optional[asyncio.Transport]]:
|
||||||
|
self.closed.clear()
|
||||||
blob_hash = blob.blob_hash
|
blob_hash = blob.blob_hash
|
||||||
if blob.get_is_verified() or not blob.is_writeable():
|
if blob.get_is_verified() or not blob.is_writeable():
|
||||||
return 0, self.transport
|
return 0, self.transport
|
||||||
|
|
Loading…
Add table
Reference in a new issue