diff --git a/lbry/dht/protocol/data_store.py b/lbry/dht/protocol/data_store.py index 3d937e84e..b3fa6b946 100644 --- a/lbry/dht/protocol/data_store.py +++ b/lbry/dht/protocol/data_store.py @@ -1,5 +1,6 @@ import asyncio import typing +from collections import deque from lbry.dht import constants if typing.TYPE_CHECKING: @@ -15,6 +16,7 @@ class DictDataStore: self.loop = loop self._peer_manager = peer_manager self.completed_blobs: typing.Set[str] = set() + self.requested_blobs: typing.Deque = deque(maxlen=10) def __len__(self): return self._data_store.__len__() diff --git a/lbry/dht/protocol/protocol.py b/lbry/dht/protocol/protocol.py index 66165740b..0717e5826 100644 --- a/lbry/dht/protocol/protocol.py +++ b/lbry/dht/protocol/protocol.py @@ -95,6 +95,7 @@ class KademliaRPC: for peer in self.protocol.data_store.get_peers_for_blob(key) if not rpc_contact.tcp_port or peer.compact_address_tcp() != rpc_contact.compact_address_tcp() ] + self.protocol.data_store.requested_blobs.append(key.hex()) # if we don't have k storing peers to return and we have this hash locally, include our contact information if len(peers) < constants.K and key.hex() in self.protocol.data_store.completed_blobs: peers.append(self.compact_address()) diff --git a/lbry/extras/daemon/components.py b/lbry/extras/daemon/components.py index 8162322e1..4f3692519 100644 --- a/lbry/extras/daemon/components.py +++ b/lbry/extras/daemon/components.py @@ -4,6 +4,8 @@ import asyncio import logging import binascii import typing +from collections import deque + import base58 from aioupnp import __version__ as aioupnp_version @@ -385,21 +387,26 @@ class BackgroundDownloader(Component): def __init__(self, component_manager): super().__init__(component_manager) - self.status = {'pending': 0, 'ongoing': 0} self.task: typing.Optional[asyncio.Task] = None self.download_loop_delay_seconds = 60 self.finished_iteration = asyncio.Event() + self.requested_blobs = deque(maxlen=10) @property def component(self) -> 'BackgroundDownloader': return self async def get_status(self): - self.status['running'] = self.task is not None and not self.task.done() - return self.status + return {'running': self.task is not None and not self.task.done(), 'enqueued': len(self.requested_blobs)} async def loop(self): while True: + if self.component_manager.has_component(DHT_COMPONENT): + node = self.component_manager.get_component(DHT_COMPONENT) + self.requested_blobs = node.protocol.data_store.requested_blobs + if self.requested_blobs: + blob_hash = self.requested_blobs.pop() + await self.download_blobs(blob_hash) self.finished_iteration.set() self.finished_iteration.clear() await asyncio.sleep(self.download_loop_delay_seconds) diff --git a/tests/integration/datanetwork/test_file_commands.py b/tests/integration/datanetwork/test_file_commands.py index 3d0ad6166..5c2ee3e0b 100644 --- a/tests/integration/datanetwork/test_file_commands.py +++ b/tests/integration/datanetwork/test_file_commands.py @@ -618,3 +618,12 @@ class TestProactiveDownloaderComponent(CommandTestCase): await self.clear() await proactive_downloader.download_blobs(blobs[0].blob_hash) self.assertEqual({blobs[0].blob_hash}, self.daemon.blob_manager.completed_blob_hashes) + + # trigger from requested blobs + await self.clear() + await proactive_downloader.stop() + proactive_downloader.requested_blobs.append(content1) + finished = proactive_downloader.finished_iteration.wait() + await proactive_downloader.start() + await finished + await self.assertBlobs(content1)