do not wait blob to be written, log errors from races

This commit is contained in:
Victor Shyba 2019-02-08 04:12:53 -03:00
parent e2b06677b5
commit 8b25acff6e
2 changed files with 8 additions and 3 deletions

View file

@ -116,7 +116,7 @@ class BlobExchangeClientProtocol(asyncio.Protocol):
msg = f"downloaded {self.blob.blob_hash[:8]} from {self.peer_address}:{self.peer_port}"
await asyncio.wait_for(self.writer.finished, self.peer_timeout, loop=self.loop)
log.info(msg)
await self.blob.finished_writing.wait()
# await self.blob.finished_writing.wait() not necessary, but a dangerous change. TODO: is it needed?
return self._blob_bytes_received, self.transport
except asyncio.TimeoutError:
return self._blob_bytes_received, self.close()
@ -143,9 +143,10 @@ class BlobExchangeClientProtocol(asyncio.Protocol):
self.blob, self.writer, self._blob_bytes_received = blob, blob.open_for_writing(), 0
self._response_fut = asyncio.Future(loop=self.loop)
return await self._download_blob()
except OSError:
except OSError as e:
log.error("race happened downloading from %s:%i", self.peer_address, self.peer_port)
# i'm not sure how to fix this race condition - jack
log.exception(e)
return self._blob_bytes_received, self.transport
except asyncio.TimeoutError:
if self._response_fut and not self._response_fut.done():
@ -182,6 +183,7 @@ async def request_blob(loop: asyncio.BaseEventLoop, blob: 'BlobFile', address: s
if connected_transport and not connected_transport.is_closing():
connected_transport.set_protocol(protocol)
protocol.connection_made(connected_transport)
log.debug("reusing connection for %s:%d", address, tcp_port)
else:
connected_transport = None
try:

View file

@ -70,6 +70,7 @@ class TestBlobExchange(BlobExchangeTestBase):
# download the blob
downloaded = await request_blob(self.loop, client_blob, self.server_from_client.address,
self.server_from_client.tcp_port, 2, 3)
await client_blob.finished_writing.wait()
self.assertEqual(client_blob.get_is_verified(), True)
self.assertTrue(downloaded)
@ -111,6 +112,7 @@ class TestBlobExchange(BlobExchangeTestBase):
),
self._test_transfer_blob(blob_hash)
)
await second_client_blob.finished_writing.wait()
self.assertEqual(second_client_blob.get_is_verified(), True)
async def test_host_different_blobs_to_multiple_peers_at_once(self):
@ -140,7 +142,8 @@ class TestBlobExchange(BlobExchangeTestBase):
self.loop, second_client_blob, server_from_second_client.address,
server_from_second_client.tcp_port, 2, 3
),
self._test_transfer_blob(sd_hash)
self._test_transfer_blob(sd_hash),
second_client_blob.finished_writing.wait()
)
self.assertEqual(second_client_blob.get_is_verified(), True)