cleanup and fix prober

This commit is contained in:
Victor Shyba 2019-07-24 22:32:20 -03:00
parent 5a7212aebb
commit 2ac4883bda
2 changed files with 12 additions and 14 deletions

View file

@ -242,6 +242,7 @@ async def request_blob(loop: asyncio.BaseEventLoop, blob: typing.Optional['Abstr
if not connected_transport:
await asyncio.wait_for(loop.create_connection(lambda: protocol, address, tcp_port),
peer_connect_timeout, loop=loop)
connected_transport = protocol.transport
if blob is None or blob.get_is_verified() or not blob.is_writeable():
# blob is None happens when we are just opening a connection
# file exists but not verified means someone is writing right now, give it time, come back later

View file

@ -30,7 +30,8 @@ class BlobDownloader:
self.is_running = asyncio.Event(loop=self.loop)
def should_race_continue(self, blob: 'AbstractBlob'):
if len(self.active_connections) >= self.config.max_connections_per_download:
max_probes = self.config.max_connections_per_download * (1 if self.connections else 10)
if len(self.active_connections) >= max_probes:
return False
return not (blob.get_is_verified() or not blob.is_writeable())
@ -92,22 +93,18 @@ class BlobDownloader:
if batch:
self.peer_queue.put_nowait(list(batch))
log.debug(
"running, %d peers, %d ignored, %d active",
len(batch), len(self.ignored), len(self.active_connections)
"running, %d peers, %d ignored, %d active, %s connections",
len(batch), len(self.ignored), len(self.active_connections), len(self.connections)
)
for peer in sorted(batch, key=lambda peer: self.scores.get(peer, 0), reverse=True):
just_probe = False
if peer in self.ignored or peer in self.active_connections:
continue
if not self.should_race_continue(blob):
if len(self.connections) >= self.config.max_connections_per_download * 10:
if peer in self.connections:
continue
just_probe = True
else:
break
if peer not in self.active_connections and peer not in self.ignored:
log.debug("request %s from %s:%i", blob_hash[:8], peer.address, peer.tcp_port)
t = self.loop.create_task(self.request_blob_from_peer(blob, peer, connection_id, just_probe))
self.active_connections[peer] = t
break
log.debug("request %s from %s:%i", blob_hash[:8], peer.address, peer.tcp_port)
just_probe = len(self.connections) == 0
t = self.loop.create_task(self.request_blob_from_peer(blob, peer, connection_id, just_probe))
self.active_connections[peer] = t
await self.new_peer_or_finished()
self.cleanup_active()
log.debug("downloaded %s", blob_hash[:8])