cleanup downloader code
This commit is contained in:
parent
e43b29fcd1
commit
609cf42868
1 changed files with 10 additions and 32 deletions
|
@ -31,12 +31,6 @@ class BlobDownloader:
|
||||||
def should_race_continue(self, blob: 'AbstractBlob'):
|
def should_race_continue(self, blob: 'AbstractBlob'):
|
||||||
if len(self.active_connections) >= self.config.max_connections_per_download:
|
if len(self.active_connections) >= self.config.max_connections_per_download:
|
||||||
return False
|
return False
|
||||||
# if a peer won 3 or more blob races and is active as a downloader, stop the race so bandwidth improves
|
|
||||||
# the safe net side is that any failure will reset the peer score, triggering the race back
|
|
||||||
# TODO: this is a good idea for low bandwidth, but doesnt play nice on high bandwidth
|
|
||||||
# for peer, task in self.active_connections.items():
|
|
||||||
# if self.scores.get(peer, 0) >= 0 and self.rounds_won.get(peer, 0) >= 3 and not task.done():
|
|
||||||
# return False
|
|
||||||
return not (blob.get_is_verified() or not blob.is_writeable())
|
return not (blob.get_is_verified() or not blob.is_writeable())
|
||||||
|
|
||||||
async def request_blob_from_peer(self, blob: 'AbstractBlob', peer: 'KademliaPeer'):
|
async def request_blob_from_peer(self, blob: 'AbstractBlob', peer: 'KademliaPeer'):
|
||||||
|
@ -70,14 +64,14 @@ class BlobDownloader:
|
||||||
to_remove = [peer for (peer, task) in self.active_connections.items() if task.done()]
|
to_remove = [peer for (peer, task) in self.active_connections.items() if task.done()]
|
||||||
for peer in to_remove:
|
for peer in to_remove:
|
||||||
del self.active_connections[peer]
|
del self.active_connections[peer]
|
||||||
|
self.clearbanned()
|
||||||
|
|
||||||
def clearbanned(self):
|
def clearbanned(self):
|
||||||
now = self.loop.time()
|
now = self.loop.time()
|
||||||
timeout_for = lambda peer: min(30.0, (self.failures.get(peer, 0) ** self.BAN_FACTOR) - 1.0)
|
self.ignored = dict((
|
||||||
forgiven = [banned_peer for banned_peer, when in self.ignored.items() if now - when > timeout_for(banned_peer)]
|
(peer, when) for (peer, when) in self.ignored.items()
|
||||||
self.peer_queue.put_nowait(forgiven)
|
if (now - when) < min(30.0, (self.failures.get(peer, 0) ** self.BAN_FACTOR))
|
||||||
for banned_peer in forgiven:
|
))
|
||||||
self.ignored.pop(banned_peer)
|
|
||||||
|
|
||||||
@cache_concurrent
|
@cache_concurrent
|
||||||
async def download_blob(self, blob_hash: str, length: typing.Optional[int] = None) -> 'AbstractBlob':
|
async def download_blob(self, blob_hash: str, length: typing.Optional[int] = None) -> 'AbstractBlob':
|
||||||
|
@ -86,15 +80,16 @@ class BlobDownloader:
|
||||||
return blob
|
return blob
|
||||||
try:
|
try:
|
||||||
while not blob.get_is_verified():
|
while not blob.get_is_verified():
|
||||||
batch: typing.List['KademliaPeer'] = []
|
batch: typing.Set['KademliaPeer'] = set()
|
||||||
while not self.peer_queue.empty():
|
while not self.peer_queue.empty():
|
||||||
batch.extend(self.peer_queue.get_nowait())
|
batch.update(self.peer_queue.get_nowait())
|
||||||
batch.sort(key=lambda p: self.scores.get(p, 0), reverse=True)
|
if batch:
|
||||||
|
self.peer_queue.put_nowait(list(batch))
|
||||||
log.debug(
|
log.debug(
|
||||||
"running, %d peers, %d ignored, %d active",
|
"running, %d peers, %d ignored, %d active",
|
||||||
len(batch), len(self.ignored), len(self.active_connections)
|
len(batch), len(self.ignored), len(self.active_connections)
|
||||||
)
|
)
|
||||||
for peer in batch:
|
for peer in sorted(batch, key=lambda peer: self.scores.get(peer, 0), reverse=True):
|
||||||
if not self.should_race_continue(blob):
|
if not self.should_race_continue(blob):
|
||||||
break
|
break
|
||||||
if peer not in self.active_connections and peer not in self.ignored:
|
if peer not in self.active_connections and peer not in self.ignored:
|
||||||
|
@ -103,26 +98,9 @@ class BlobDownloader:
|
||||||
self.active_connections[peer] = t
|
self.active_connections[peer] = t
|
||||||
await self.new_peer_or_finished()
|
await self.new_peer_or_finished()
|
||||||
self.cleanup_active()
|
self.cleanup_active()
|
||||||
if batch:
|
|
||||||
to_re_add = list(set(batch).difference(self.ignored))
|
|
||||||
if to_re_add:
|
|
||||||
self.peer_queue.put_nowait(to_re_add)
|
|
||||||
else:
|
|
||||||
self.clearbanned()
|
|
||||||
else:
|
|
||||||
self.clearbanned()
|
|
||||||
blob.close()
|
|
||||||
log.debug("downloaded %s", blob_hash[:8])
|
log.debug("downloaded %s", blob_hash[:8])
|
||||||
return blob
|
return blob
|
||||||
finally:
|
finally:
|
||||||
re_add = set()
|
|
||||||
while self.active_connections:
|
|
||||||
peer, t = self.active_connections.popitem()
|
|
||||||
t.cancel()
|
|
||||||
re_add.add(peer)
|
|
||||||
re_add = re_add.difference(self.ignored)
|
|
||||||
if re_add:
|
|
||||||
self.peer_queue.put_nowait(list(re_add))
|
|
||||||
blob.close()
|
blob.close()
|
||||||
|
|
||||||
def close(self):
|
def close(self):
|
||||||
|
|
Loading…
Reference in a new issue