download from stored announcements and dont reannounce
This commit is contained in:
parent
5959b1be72
commit
1ff914a6f4
6 changed files with 13 additions and 13 deletions
|
@ -32,9 +32,8 @@ class Node:
|
||||||
self._storage = storage
|
self._storage = storage
|
||||||
|
|
||||||
@property
|
@property
|
||||||
def last_requested_blob_hash(self):
|
def stored_blob_hashes(self):
|
||||||
if len(self.protocol.data_store.requested_blobs) > 0:
|
return self.protocol.data_store.keys()
|
||||||
return self.protocol.data_store.requested_blobs[-1]
|
|
||||||
|
|
||||||
async def refresh_node(self, force_once=False):
|
async def refresh_node(self, force_once=False):
|
||||||
while True:
|
while True:
|
||||||
|
|
|
@ -16,7 +16,9 @@ class DictDataStore:
|
||||||
self.loop = loop
|
self.loop = loop
|
||||||
self._peer_manager = peer_manager
|
self._peer_manager = peer_manager
|
||||||
self.completed_blobs: typing.Set[str] = set()
|
self.completed_blobs: typing.Set[str] = set()
|
||||||
self.requested_blobs: typing.Deque = deque(maxlen=10)
|
|
||||||
|
def keys(self):
|
||||||
|
return self._data_store.keys()
|
||||||
|
|
||||||
def __len__(self):
|
def __len__(self):
|
||||||
return self._data_store.__len__()
|
return self._data_store.__len__()
|
||||||
|
|
|
@ -95,7 +95,6 @@ class KademliaRPC:
|
||||||
for peer in self.protocol.data_store.get_peers_for_blob(key)
|
for peer in self.protocol.data_store.get_peers_for_blob(key)
|
||||||
if not rpc_contact.tcp_port or peer.compact_address_tcp() != rpc_contact.compact_address_tcp()
|
if not rpc_contact.tcp_port or peer.compact_address_tcp() != rpc_contact.compact_address_tcp()
|
||||||
]
|
]
|
||||||
self.protocol.data_store.requested_blobs.append(key.hex())
|
|
||||||
# if we don't have k storing peers to return and we have this hash locally, include our contact information
|
# if we don't have k storing peers to return and we have this hash locally, include our contact information
|
||||||
if len(peers) < constants.K and key.hex() in self.protocol.data_store.completed_blobs:
|
if len(peers) < constants.K and key.hex() in self.protocol.data_store.completed_blobs:
|
||||||
peers.append(self.compact_address())
|
peers.append(self.compact_address())
|
||||||
|
|
|
@ -390,6 +390,7 @@ class BackgroundDownloaderComponent(Component):
|
||||||
self.download_loop_delay_seconds = 60
|
self.download_loop_delay_seconds = 60
|
||||||
self.ongoing_download: typing.Optional[asyncio.Task] = None
|
self.ongoing_download: typing.Optional[asyncio.Task] = None
|
||||||
self.space_manager: typing.Optional[DiskSpaceManager] = None
|
self.space_manager: typing.Optional[DiskSpaceManager] = None
|
||||||
|
self.blob_manager: typing.Optional[BlobManager] = None
|
||||||
self.background_downloader: typing.Optional[BackgroundDownloader] = None
|
self.background_downloader: typing.Optional[BackgroundDownloader] = None
|
||||||
self.dht_node: typing.Optional[Node] = None
|
self.dht_node: typing.Optional[Node] = None
|
||||||
|
|
||||||
|
@ -409,7 +410,8 @@ class BackgroundDownloaderComponent(Component):
|
||||||
async def loop(self):
|
async def loop(self):
|
||||||
while True:
|
while True:
|
||||||
if not self.is_busy and await self.space_manager.get_free_space_mb(True) > 10:
|
if not self.is_busy and await self.space_manager.get_free_space_mb(True) > 10:
|
||||||
blob_hash = self.dht_node.last_requested_blob_hash
|
blob_hash = next((key.hex() for key in self.dht_node.stored_blob_hashes if
|
||||||
|
key.hex() not in self.blob_manager.completed_blob_hashes), None)
|
||||||
if blob_hash:
|
if blob_hash:
|
||||||
self.ongoing_download = asyncio.create_task(self.background_downloader.download_blobs(blob_hash))
|
self.ongoing_download = asyncio.create_task(self.background_downloader.download_blobs(blob_hash))
|
||||||
await asyncio.sleep(self.download_loop_delay_seconds)
|
await asyncio.sleep(self.download_loop_delay_seconds)
|
||||||
|
@ -419,9 +421,9 @@ class BackgroundDownloaderComponent(Component):
|
||||||
if not self.component_manager.has_component(DHT_COMPONENT):
|
if not self.component_manager.has_component(DHT_COMPONENT):
|
||||||
return
|
return
|
||||||
self.dht_node = self.component_manager.get_component(DHT_COMPONENT)
|
self.dht_node = self.component_manager.get_component(DHT_COMPONENT)
|
||||||
blob_manager = self.component_manager.get_component(BLOB_COMPONENT)
|
self.blob_manager = self.component_manager.get_component(BLOB_COMPONENT)
|
||||||
storage = self.component_manager.get_component(DATABASE_COMPONENT)
|
storage = self.component_manager.get_component(DATABASE_COMPONENT)
|
||||||
self.background_downloader = BackgroundDownloader(self.conf, storage, blob_manager, self.dht_node)
|
self.background_downloader = BackgroundDownloader(self.conf, storage, self.blob_manager, self.dht_node)
|
||||||
self.task = asyncio.create_task(self.loop())
|
self.task = asyncio.create_task(self.loop())
|
||||||
|
|
||||||
async def stop(self):
|
async def stop(self):
|
||||||
|
|
|
@ -18,4 +18,5 @@ class BackgroundDownloader:
|
||||||
return
|
return
|
||||||
for blob_info in downloader.descriptor.blobs[:-1]:
|
for blob_info in downloader.descriptor.blobs[:-1]:
|
||||||
await downloader.download_stream_blob(blob_info)
|
await downloader.download_stream_blob(blob_info)
|
||||||
await self.storage.set_announce(sd_hash, downloader.descriptor.blobs[0].blob_hash)
|
# for now, announcing is unnecessary because the blobs we have were announced to us, se they will be queried
|
||||||
|
# await self.storage.set_announce(sd_hash, downloader.descriptor.blobs[0].blob_hash)
|
||||||
|
|
|
@ -589,13 +589,10 @@ class TestBackgroundDownloaderComponent(CommandTestCase):
|
||||||
async def assertBlobs(self, *sd_hashes, no_files=True):
|
async def assertBlobs(self, *sd_hashes, no_files=True):
|
||||||
# checks that we have ony the finished blobs needed for the the referenced streams
|
# checks that we have ony the finished blobs needed for the the referenced streams
|
||||||
seen = set(sd_hashes)
|
seen = set(sd_hashes)
|
||||||
to_announce = await self.daemon.storage.get_blobs_to_announce()
|
|
||||||
for sd_hash in sd_hashes:
|
for sd_hash in sd_hashes:
|
||||||
self.assertIn(sd_hash, to_announce)
|
|
||||||
sd_blob = self.daemon.blob_manager.get_blob(sd_hash)
|
sd_blob = self.daemon.blob_manager.get_blob(sd_hash)
|
||||||
self.assertTrue(sd_blob.get_is_verified())
|
self.assertTrue(sd_blob.get_is_verified())
|
||||||
blobs = await self.get_blobs_from_sd_blob(sd_blob)
|
blobs = await self.get_blobs_from_sd_blob(sd_blob)
|
||||||
self.assertIn(blobs[0].blob_hash, to_announce)
|
|
||||||
for blob in blobs[:-1]:
|
for blob in blobs[:-1]:
|
||||||
self.assertTrue(self.daemon.blob_manager.get_blob(blob.blob_hash).get_is_verified())
|
self.assertTrue(self.daemon.blob_manager.get_blob(blob.blob_hash).get_is_verified())
|
||||||
seen.update(blob.blob_hash for blob in blobs if blob.blob_hash)
|
seen.update(blob.blob_hash for blob in blobs if blob.blob_hash)
|
||||||
|
@ -609,7 +606,7 @@ class TestBackgroundDownloaderComponent(CommandTestCase):
|
||||||
await self.daemon.blob_manager.delete_blobs(list(self.daemon.blob_manager.completed_blob_hashes), True)
|
await self.daemon.blob_manager.delete_blobs(list(self.daemon.blob_manager.completed_blob_hashes), True)
|
||||||
self.assertEqual(0, len((await self.daemon.jsonrpc_blob_list())['items']))
|
self.assertEqual(0, len((await self.daemon.jsonrpc_blob_list())['items']))
|
||||||
|
|
||||||
async def test_ensure_download(self):
|
async def test_download(self):
|
||||||
content1 = await self.stream_create('content1', '0.01', data=bytes([0] * 32 * 1024 * 1024))
|
content1 = await self.stream_create('content1', '0.01', data=bytes([0] * 32 * 1024 * 1024))
|
||||||
content1 = content1['outputs'][0]['value']['source']['sd_hash']
|
content1 = content1['outputs'][0]['value']['source']['sd_hash']
|
||||||
content2 = await self.stream_create('content2', '0.01', data=bytes([0] * 16 * 1024 * 1024))
|
content2 = await self.stream_create('content2', '0.01', data=bytes([0] * 16 * 1024 * 1024))
|
||||||
|
|
Loading…
Add table
Reference in a new issue