forked from LBRYCommunity/lbry-sdk
change score calculation, wait for active peers too, simplify peer sorting/keeping
This commit is contained in:
parent
ad03f91d24
commit
2d7eb83518
1 changed file with 10 additions and 17 deletions
|
@@ -27,6 +27,7 @@ class BlobDownloader:
|
||||||
async def request_blob_from_peer(self, blob: 'BlobFile', peer: 'KademliaPeer'):
|
async def request_blob_from_peer(self, blob: 'BlobFile', peer: 'KademliaPeer'):
|
||||||
if blob.get_is_verified():
|
if blob.get_is_verified():
|
||||||
return
|
return
|
||||||
|
self.scores[peer] = self.scores.get(peer, 0) - 1 # starts losing score, to account for cancelled ones
|
||||||
success, keep_connection = await request_blob(
|
success, keep_connection = await request_blob(
|
||||||
self.loop, blob, peer.address, peer.tcp_port, self.config.peer_connect_timeout,
|
self.loop, blob, peer.address, peer.tcp_port, self.config.peer_connect_timeout,
|
||||||
self.config.blob_download_timeout
|
self.config.blob_download_timeout
|
||||||
|
@@ -36,22 +37,18 @@ class BlobDownloader:
|
||||||
log.debug("drop peer %s:%i", peer.address, peer.tcp_port)
|
log.debug("drop peer %s:%i", peer.address, peer.tcp_port)
|
||||||
elif keep_connection:
|
elif keep_connection:
|
||||||
log.debug("keep peer %s:%i", peer.address, peer.tcp_port)
|
log.debug("keep peer %s:%i", peer.address, peer.tcp_port)
|
||||||
if success:
|
self.scores[peer] = self.scores.get(peer, 0) + 2 if success else 0
|
||||||
self.scores[peer] = self.scores.get(peer, 0) + 2
|
|
||||||
else:
|
|
||||||
self.scores[peer] = self.scores.get(peer, 0) - 1
|
|
||||||
|
|
||||||
async def new_peer_or_finished(self, blob: 'BlobFile'):
|
async def new_peer_or_finished(self, blob: 'BlobFile'):
|
||||||
async def get_and_re_add_peers():
|
async def get_and_re_add_peers():
|
||||||
new_peers = await self.peer_queue.get()
|
new_peers = await self.peer_queue.get()
|
||||||
self.peer_queue.put_nowait(new_peers)
|
self.peer_queue.put_nowait(new_peers)
|
||||||
tasks = [self.loop.create_task(get_and_re_add_peers()), self.loop.create_task(blob.verified.wait())]
|
tasks = [self.loop.create_task(get_and_re_add_peers()), self.loop.create_task(blob.verified.wait())]
|
||||||
|
active_tasks = list(self.active_connections.values())
|
||||||
try:
|
try:
|
||||||
done, pending = await asyncio.wait(tasks, loop=self.loop, return_when='FIRST_COMPLETED')
|
await asyncio.wait(tasks + active_tasks, loop=self.loop, return_when='FIRST_COMPLETED')
|
||||||
drain_tasks(pending)
|
finally:
|
||||||
except asyncio.CancelledError:
|
|
||||||
drain_tasks(tasks)
|
drain_tasks(tasks)
|
||||||
raise
|
|
||||||
|
|
||||||
def cleanup_active(self):
|
def cleanup_active(self):
|
||||||
to_remove = [peer for (peer, task) in self.active_connections.items() if task.done()]
|
to_remove = [peer for (peer, task) in self.active_connections.items() if task.done()]
|
||||||
|
@@ -67,6 +64,7 @@ class BlobDownloader:
|
||||||
batch: typing.List['KademliaPeer'] = []
|
batch: typing.List['KademliaPeer'] = []
|
||||||
while not self.peer_queue.empty():
|
while not self.peer_queue.empty():
|
||||||
batch.extend(self.peer_queue.get_nowait())
|
batch.extend(self.peer_queue.get_nowait())
|
||||||
|
batch.sort(key=lambda peer: self.scores.get(peer, 0), reverse=True)
|
||||||
log.debug(
|
log.debug(
|
||||||
"running, %d peers, %d ignored, %d active",
|
"running, %d peers, %d ignored, %d active",
|
||||||
len(batch), len(self.ignored), len(self.active_connections)
|
len(batch), len(self.ignored), len(self.active_connections)
|
||||||
|
@@ -78,15 +76,10 @@ class BlobDownloader:
|
||||||
log.debug("request %s from %s:%i", blob_hash[:8], peer.address, peer.tcp_port)
|
log.debug("request %s from %s:%i", blob_hash[:8], peer.address, peer.tcp_port)
|
||||||
t = self.loop.create_task(self.request_blob_from_peer(blob, peer))
|
t = self.loop.create_task(self.request_blob_from_peer(blob, peer))
|
||||||
self.active_connections[peer] = t
|
self.active_connections[peer] = t
|
||||||
if self.active_connections:
|
|
||||||
await asyncio.wait(self.active_connections.values(), return_when='FIRST_COMPLETED')
|
|
||||||
self.cleanup_active()
|
|
||||||
else:
|
|
||||||
await self.new_peer_or_finished(blob)
|
await self.new_peer_or_finished(blob)
|
||||||
to_re_add = list(set(filter(lambda peer: peer not in self.ignored, batch)))
|
self.cleanup_active()
|
||||||
to_re_add.sort(key=lambda peer: self.scores.get(peer, 0), reverse=True)
|
if batch:
|
||||||
if to_re_add:
|
self.peer_queue.put_nowait(set(batch).difference(self.ignored))
|
||||||
self.peer_queue.put_nowait(to_re_add)
|
|
||||||
while self.active_connections:
|
while self.active_connections:
|
||||||
peer, task = self.active_connections.popitem()
|
peer, task = self.active_connections.popitem()
|
||||||
if task and not task.done():
|
if task and not task.done():
|
||||||
|
|
Loading…
Reference in a new issue