From 1ff914a6f46921ebe27aa3c8708cbe38537f469a Mon Sep 17 00:00:00 2001 From: Victor Shyba Date: Wed, 3 Nov 2021 11:43:42 -0300 Subject: [PATCH] download from stored announcements and dont reannounce --- lbry/dht/node.py | 5 ++--- lbry/dht/protocol/data_store.py | 4 +++- lbry/dht/protocol/protocol.py | 1 - lbry/extras/daemon/components.py | 8 +++++--- lbry/stream/background_downloader.py | 3 ++- tests/integration/datanetwork/test_file_commands.py | 5 +---- 6 files changed, 13 insertions(+), 13 deletions(-) diff --git a/lbry/dht/node.py b/lbry/dht/node.py index 4bfb3d478..300c1a774 100644 --- a/lbry/dht/node.py +++ b/lbry/dht/node.py @@ -32,9 +32,8 @@ class Node: self._storage = storage @property - def last_requested_blob_hash(self): - if len(self.protocol.data_store.requested_blobs) > 0: - return self.protocol.data_store.requested_blobs[-1] + def stored_blob_hashes(self): + return self.protocol.data_store.keys() async def refresh_node(self, force_once=False): while True: diff --git a/lbry/dht/protocol/data_store.py b/lbry/dht/protocol/data_store.py index b3fa6b946..a5434aaa2 100644 --- a/lbry/dht/protocol/data_store.py +++ b/lbry/dht/protocol/data_store.py @@ -16,7 +16,9 @@ class DictDataStore: self.loop = loop self._peer_manager = peer_manager self.completed_blobs: typing.Set[str] = set() - self.requested_blobs: typing.Deque = deque(maxlen=10) + + def keys(self): + return self._data_store.keys() def __len__(self): return self._data_store.__len__() diff --git a/lbry/dht/protocol/protocol.py b/lbry/dht/protocol/protocol.py index 0717e5826..66165740b 100644 --- a/lbry/dht/protocol/protocol.py +++ b/lbry/dht/protocol/protocol.py @@ -95,7 +95,6 @@ class KademliaRPC: for peer in self.protocol.data_store.get_peers_for_blob(key) if not rpc_contact.tcp_port or peer.compact_address_tcp() != rpc_contact.compact_address_tcp() ] - self.protocol.data_store.requested_blobs.append(key.hex()) # if we don't have k storing peers to return and we have this hash locally, include our 
contact information if len(peers) < constants.K and key.hex() in self.protocol.data_store.completed_blobs: peers.append(self.compact_address()) diff --git a/lbry/extras/daemon/components.py b/lbry/extras/daemon/components.py index 9ab9388a9..f771cd2f3 100644 --- a/lbry/extras/daemon/components.py +++ b/lbry/extras/daemon/components.py @@ -390,6 +390,7 @@ class BackgroundDownloaderComponent(Component): self.download_loop_delay_seconds = 60 self.ongoing_download: typing.Optional[asyncio.Task] = None self.space_manager: typing.Optional[DiskSpaceManager] = None + self.blob_manager: typing.Optional[BlobManager] = None self.background_downloader: typing.Optional[BackgroundDownloader] = None self.dht_node: typing.Optional[Node] = None @@ -409,7 +410,8 @@ class BackgroundDownloaderComponent(Component): async def loop(self): while True: if not self.is_busy and await self.space_manager.get_free_space_mb(True) > 10: - blob_hash = self.dht_node.last_requested_blob_hash + blob_hash = next((key.hex() for key in self.dht_node.stored_blob_hashes if + key.hex() not in self.blob_manager.completed_blob_hashes), None) if blob_hash: self.ongoing_download = asyncio.create_task(self.background_downloader.download_blobs(blob_hash)) await asyncio.sleep(self.download_loop_delay_seconds) @@ -419,9 +421,9 @@ class BackgroundDownloaderComponent(Component): if not self.component_manager.has_component(DHT_COMPONENT): return self.dht_node = self.component_manager.get_component(DHT_COMPONENT) - blob_manager = self.component_manager.get_component(BLOB_COMPONENT) + self.blob_manager = self.component_manager.get_component(BLOB_COMPONENT) storage = self.component_manager.get_component(DATABASE_COMPONENT) - self.background_downloader = BackgroundDownloader(self.conf, storage, blob_manager, self.dht_node) + self.background_downloader = BackgroundDownloader(self.conf, storage, self.blob_manager, self.dht_node) self.task = asyncio.create_task(self.loop()) async def stop(self): diff --git 
a/lbry/stream/background_downloader.py b/lbry/stream/background_downloader.py index d02a7c9c0..045a47824 100644 --- a/lbry/stream/background_downloader.py +++ b/lbry/stream/background_downloader.py @@ -18,4 +18,5 @@ class BackgroundDownloader: return for blob_info in downloader.descriptor.blobs[:-1]: await downloader.download_stream_blob(blob_info) - await self.storage.set_announce(sd_hash, downloader.descriptor.blobs[0].blob_hash) + # for now, announcing is unnecessary because the blobs we have were announced to us, so they will be queried + # await self.storage.set_announce(sd_hash, downloader.descriptor.blobs[0].blob_hash) diff --git a/tests/integration/datanetwork/test_file_commands.py b/tests/integration/datanetwork/test_file_commands.py index 23cb8f1ef..9443893f7 100644 --- a/tests/integration/datanetwork/test_file_commands.py +++ b/tests/integration/datanetwork/test_file_commands.py @@ -589,13 +589,10 @@ class TestBackgroundDownloaderComponent(CommandTestCase): async def assertBlobs(self, *sd_hashes, no_files=True): # checks that we have ony the finished blobs needed for the the referenced streams seen = set(sd_hashes) - to_announce = await self.daemon.storage.get_blobs_to_announce() for sd_hash in sd_hashes: - self.assertIn(sd_hash, to_announce) sd_blob = self.daemon.blob_manager.get_blob(sd_hash) self.assertTrue(sd_blob.get_is_verified()) blobs = await self.get_blobs_from_sd_blob(sd_blob) - self.assertIn(blobs[0].blob_hash, to_announce) for blob in blobs[:-1]: self.assertTrue(self.daemon.blob_manager.get_blob(blob.blob_hash).get_is_verified()) seen.update(blob.blob_hash for blob in blobs if blob.blob_hash) @@ -609,7 +606,7 @@ class TestBackgroundDownloaderComponent(CommandTestCase): await self.daemon.blob_manager.delete_blobs(list(self.daemon.blob_manager.completed_blob_hashes), True) self.assertEqual(0, len((await self.daemon.jsonrpc_blob_list())['items'])) - async def test_ensure_download(self): + async def test_download(self): content1 = await
self.stream_create('content1', '0.01', data=bytes([0] * 32 * 1024 * 1024)) content1 = content1['outputs'][0]['value']['source']['sd_hash'] content2 = await self.stream_create('content2', '0.01', data=bytes([0] * 16 * 1024 * 1024))