From 2d7eb835180159f4239f2401c8014df8f79cd472 Mon Sep 17 00:00:00 2001 From: Victor Shyba Date: Fri, 8 Feb 2019 01:04:38 -0300 Subject: [PATCH] change score calculation, wait for active peers too, simplify peer sorting/keeping --- lbrynet/blob_exchange/downloader.py | 27 ++++++++++----------------- 1 file changed, 10 insertions(+), 17 deletions(-) diff --git a/lbrynet/blob_exchange/downloader.py b/lbrynet/blob_exchange/downloader.py index 5b9467742..9f6b5ab99 100644 --- a/lbrynet/blob_exchange/downloader.py +++ b/lbrynet/blob_exchange/downloader.py @@ -27,6 +27,7 @@ class BlobDownloader: async def request_blob_from_peer(self, blob: 'BlobFile', peer: 'KademliaPeer'): if blob.get_is_verified(): return + self.scores[peer] = self.scores.get(peer, 0) - 1 # starts losing score, to account for cancelled ones success, keep_connection = await request_blob( self.loop, blob, peer.address, peer.tcp_port, self.config.peer_connect_timeout, self.config.blob_download_timeout @@ -36,22 +37,18 @@ class BlobDownloader: log.debug("drop peer %s:%i", peer.address, peer.tcp_port) elif keep_connection: log.debug("keep peer %s:%i", peer.address, peer.tcp_port) - if success: - self.scores[peer] = self.scores.get(peer, 0) + 2 - else: - self.scores[peer] = self.scores.get(peer, 0) - 1 + self.scores[peer] = self.scores.get(peer, 0) + 2 if success else 0 async def new_peer_or_finished(self, blob: 'BlobFile'): async def get_and_re_add_peers(): new_peers = await self.peer_queue.get() self.peer_queue.put_nowait(new_peers) tasks = [self.loop.create_task(get_and_re_add_peers()), self.loop.create_task(blob.verified.wait())] + active_tasks = list(self.active_connections.values()) try: - done, pending = await asyncio.wait(tasks, loop=self.loop, return_when='FIRST_COMPLETED') - drain_tasks(pending) - except asyncio.CancelledError: + await asyncio.wait(tasks + active_tasks, loop=self.loop, return_when='FIRST_COMPLETED') + finally: drain_tasks(tasks) - raise def cleanup_active(self): to_remove = [peer for (peer, task) in self.active_connections.items() if task.done()] @@ -67,6 +64,7 @@ class BlobDownloader: batch: typing.List['KademliaPeer'] = [] while not self.peer_queue.empty(): batch.extend(self.peer_queue.get_nowait()) + batch.sort(key=lambda peer: self.scores.get(peer, 0), reverse=True) log.debug( "running, %d peers, %d ignored, %d active", len(batch), len(self.ignored), len(self.active_connections) @@ -78,15 +76,10 @@ class BlobDownloader: log.debug("request %s from %s:%i", blob_hash[:8], peer.address, peer.tcp_port) t = self.loop.create_task(self.request_blob_from_peer(blob, peer)) self.active_connections[peer] = t - if self.active_connections: - await asyncio.wait(self.active_connections.values(), return_when='FIRST_COMPLETED') - self.cleanup_active() - else: - await self.new_peer_or_finished(blob) - to_re_add = list(set(filter(lambda peer: peer not in self.ignored, batch))) - to_re_add.sort(key=lambda peer: self.scores.get(peer, 0), reverse=True) - if to_re_add: - self.peer_queue.put_nowait(to_re_add) + await self.new_peer_or_finished(blob) + self.cleanup_active() + if batch: + self.peer_queue.put_nowait(set(batch).difference(self.ignored)) while self.active_connections: peer, task = self.active_connections.popitem() if task and not task.done():