forked from LBRYCommunity/lbry-sdk
wait on connection tasks
This commit is contained in:
parent
43ac928f0b
commit
a616582733
1 changed files with 20 additions and 12 deletions
|
@ -13,11 +13,6 @@ if typing.TYPE_CHECKING:
|
||||||
log = logging.getLogger(__name__)
|
log = logging.getLogger(__name__)
|
||||||
|
|
||||||
|
|
||||||
def drain_into(a: list, b: list):
|
|
||||||
while a:
|
|
||||||
b.append(a.pop())
|
|
||||||
|
|
||||||
|
|
||||||
class BlobDownloader:
|
class BlobDownloader:
|
||||||
def __init__(self, loop: asyncio.BaseEventLoop, config: 'Config', blob_manager: 'BlobFileManager',
|
def __init__(self, loop: asyncio.BaseEventLoop, config: 'Config', blob_manager: 'BlobFileManager',
|
||||||
peer_queue: asyncio.Queue):
|
peer_queue: asyncio.Queue):
|
||||||
|
@ -52,11 +47,17 @@ class BlobDownloader:
|
||||||
self.peer_queue.put_nowait(new_peers)
|
self.peer_queue.put_nowait(new_peers)
|
||||||
tasks = [self.loop.create_task(get_and_re_add_peers()), self.loop.create_task(blob.verified.wait())]
|
tasks = [self.loop.create_task(get_and_re_add_peers()), self.loop.create_task(blob.verified.wait())]
|
||||||
try:
|
try:
|
||||||
await asyncio.wait(tasks, loop=self.loop, return_when='FIRST_COMPLETED')
|
done, pending = await asyncio.wait(tasks, loop=self.loop, return_when='FIRST_COMPLETED')
|
||||||
|
drain_tasks(pending)
|
||||||
except asyncio.CancelledError:
|
except asyncio.CancelledError:
|
||||||
drain_tasks(tasks)
|
drain_tasks(tasks)
|
||||||
raise
|
raise
|
||||||
|
|
||||||
|
def cleanup_active(self):
|
||||||
|
to_remove = [peer for (peer, task) in self.active_connections.items() if task.done()]
|
||||||
|
for peer in to_remove:
|
||||||
|
del self.active_connections[peer]
|
||||||
|
|
||||||
async def download_blob(self, blob_hash: str, length: typing.Optional[int] = None) -> 'BlobFile':
|
async def download_blob(self, blob_hash: str, length: typing.Optional[int] = None) -> 'BlobFile':
|
||||||
blob = self.blob_manager.get_blob(blob_hash, length)
|
blob = self.blob_manager.get_blob(blob_hash, length)
|
||||||
if blob.get_is_verified():
|
if blob.get_is_verified():
|
||||||
|
@ -65,7 +66,11 @@ class BlobDownloader:
|
||||||
while not blob.get_is_verified():
|
while not blob.get_is_verified():
|
||||||
batch: typing.List['KademliaPeer'] = []
|
batch: typing.List['KademliaPeer'] = []
|
||||||
while not self.peer_queue.empty():
|
while not self.peer_queue.empty():
|
||||||
batch.extend(await self.peer_queue.get())
|
batch.extend(self.peer_queue.get_nowait())
|
||||||
|
log.debug(
|
||||||
|
"running, %d peers, %d ignored, %d active",
|
||||||
|
len(batch), len(self.ignored), len(self.active_connections)
|
||||||
|
)
|
||||||
for peer in batch:
|
for peer in batch:
|
||||||
if len(self.active_connections) >= self.config.max_connections_per_download:
|
if len(self.active_connections) >= self.config.max_connections_per_download:
|
||||||
break
|
break
|
||||||
|
@ -73,10 +78,10 @@ class BlobDownloader:
|
||||||
log.debug("request %s from %s:%i", blob_hash[:8], peer.address, peer.tcp_port)
|
log.debug("request %s from %s:%i", blob_hash[:8], peer.address, peer.tcp_port)
|
||||||
t = self.loop.create_task(self.request_blob_from_peer(blob, peer))
|
t = self.loop.create_task(self.request_blob_from_peer(blob, peer))
|
||||||
self.active_connections[peer] = t
|
self.active_connections[peer] = t
|
||||||
t.add_done_callback(
|
if self.active_connections:
|
||||||
lambda _:
|
await asyncio.wait(self.active_connections.values(), return_when='FIRST_COMPLETED')
|
||||||
None if peer not in self.active_connections else self.active_connections.pop(peer)
|
self.cleanup_active()
|
||||||
)
|
else:
|
||||||
await self.new_peer_or_finished(blob)
|
await self.new_peer_or_finished(blob)
|
||||||
to_re_add = list(set(filter(lambda peer: peer not in self.ignored, batch)))
|
to_re_add = list(set(filter(lambda peer: peer not in self.ignored, batch)))
|
||||||
to_re_add.sort(key=lambda peer: self.scores.get(peer, 0), reverse=True)
|
to_re_add.sort(key=lambda peer: self.scores.get(peer, 0), reverse=True)
|
||||||
|
@ -95,6 +100,9 @@ class BlobDownloader:
|
||||||
if task and not task.done():
|
if task and not task.done():
|
||||||
task.cancel()
|
task.cancel()
|
||||||
raise
|
raise
|
||||||
|
except (OSError, Exception) as e:
|
||||||
|
log.exception(e)
|
||||||
|
raise e
|
||||||
|
|
||||||
|
|
||||||
async def download_blob(loop, config: 'Config', blob_manager: 'BlobFileManager', node: 'Node',
|
async def download_blob(loop, config: 'Config', blob_manager: 'BlobFileManager', node: 'Node',
|
||||||
|
|
Loading…
Reference in a new issue