do not keep trying when blob is being written
This commit is contained in:
parent
5c217f1544
commit
576da1bc30
2 changed files with 6 additions and 6 deletions
|
@ -185,9 +185,6 @@ async def request_blob(loop: asyncio.BaseEventLoop, blob: 'BlobFile', address: s
|
||||||
Returns [<downloaded blob>, <keep connection>]
|
Returns [<downloaded blob>, <keep connection>]
|
||||||
"""
|
"""
|
||||||
|
|
||||||
if blob.get_is_verified() or blob.file_exists:
|
|
||||||
# file exists but not verified means someone is writing right now, give it time, come back later
|
|
||||||
return 0, connected_transport
|
|
||||||
protocol = BlobExchangeClientProtocol(loop, blob_download_timeout)
|
protocol = BlobExchangeClientProtocol(loop, blob_download_timeout)
|
||||||
if connected_transport and not connected_transport.is_closing():
|
if connected_transport and not connected_transport.is_closing():
|
||||||
connected_transport.set_protocol(protocol)
|
connected_transport.set_protocol(protocol)
|
||||||
|
@ -199,6 +196,9 @@ async def request_blob(loop: asyncio.BaseEventLoop, blob: 'BlobFile', address: s
|
||||||
if not connected_transport:
|
if not connected_transport:
|
||||||
await asyncio.wait_for(loop.create_connection(lambda: protocol, address, tcp_port),
|
await asyncio.wait_for(loop.create_connection(lambda: protocol, address, tcp_port),
|
||||||
peer_connect_timeout, loop=loop)
|
peer_connect_timeout, loop=loop)
|
||||||
|
if blob.get_is_verified() or blob.file_exists:
|
||||||
|
# file exists but not verified means someone is writing right now, give it time, come back later
|
||||||
|
return 0, connected_transport
|
||||||
return await protocol.download_blob(blob)
|
return await protocol.download_blob(blob)
|
||||||
except (asyncio.TimeoutError, ConnectionRefusedError, ConnectionAbortedError, OSError):
|
except (asyncio.TimeoutError, ConnectionRefusedError, ConnectionAbortedError, OSError):
|
||||||
return 0, None
|
return 0, None
|
||||||
|
|
|
@ -26,7 +26,7 @@ class BlobDownloader:
|
||||||
self.connections: typing.Dict['KademliaPeer', asyncio.Transport] = {}
|
self.connections: typing.Dict['KademliaPeer', asyncio.Transport] = {}
|
||||||
self.rounds_won: typing.Dict['KademliaPeer', int] = {}
|
self.rounds_won: typing.Dict['KademliaPeer', int] = {}
|
||||||
|
|
||||||
def should_race_continue(self):
|
def should_race_continue(self, blob: 'BlobFile'):
|
||||||
if len(self.active_connections) >= self.config.max_connections_per_download:
|
if len(self.active_connections) >= self.config.max_connections_per_download:
|
||||||
return False
|
return False
|
||||||
# if a peer won 3 or more blob races and is active as a downloader, stop the race so bandwidth improves
|
# if a peer won 3 or more blob races and is active as a downloader, stop the race so bandwidth improves
|
||||||
|
@ -35,7 +35,7 @@ class BlobDownloader:
|
||||||
# for peer, task in self.active_connections.items():
|
# for peer, task in self.active_connections.items():
|
||||||
# if self.scores.get(peer, 0) >= 0 and self.rounds_won.get(peer, 0) >= 3 and not task.done():
|
# if self.scores.get(peer, 0) >= 0 and self.rounds_won.get(peer, 0) >= 3 and not task.done():
|
||||||
# return False
|
# return False
|
||||||
return True
|
return not (blob.get_is_verified() or blob.file_exists)
|
||||||
|
|
||||||
async def request_blob_from_peer(self, blob: 'BlobFile', peer: 'KademliaPeer'):
|
async def request_blob_from_peer(self, blob: 'BlobFile', peer: 'KademliaPeer'):
|
||||||
if blob.get_is_verified():
|
if blob.get_is_verified():
|
||||||
|
@ -91,7 +91,7 @@ class BlobDownloader:
|
||||||
len(batch), len(self.ignored), len(self.active_connections)
|
len(batch), len(self.ignored), len(self.active_connections)
|
||||||
)
|
)
|
||||||
for peer in batch:
|
for peer in batch:
|
||||||
if not self.should_race_continue():
|
if not self.should_race_continue(blob):
|
||||||
break
|
break
|
||||||
if peer not in self.active_connections and peer not in self.ignored:
|
if peer not in self.active_connections and peer not in self.ignored:
|
||||||
log.debug("request %s from %s:%i", blob_hash[:8], peer.address, peer.tcp_port)
|
log.debug("request %s from %s:%i", blob_hash[:8], peer.address, peer.tcp_port)
|
||||||
|
|
Loading…
Reference in a new issue