forked from LBRYCommunity/lbry-sdk
download only blobs
This commit is contained in:
parent
32a7d1a4a0
commit
9d3c401abb
2 changed files with 40 additions and 21 deletions
|
@ -17,7 +17,7 @@ from lbry.dht.blob_announcer import BlobAnnouncer
|
||||||
from lbry.blob.blob_manager import BlobManager
|
from lbry.blob.blob_manager import BlobManager
|
||||||
from lbry.blob.disk_space_manager import DiskSpaceManager
|
from lbry.blob.disk_space_manager import DiskSpaceManager
|
||||||
from lbry.blob_exchange.server import BlobServer
|
from lbry.blob_exchange.server import BlobServer
|
||||||
from lbry.stream.managed_stream import ManagedStream
|
from lbry.stream.downloader import StreamDownloader
|
||||||
from lbry.stream.stream_manager import StreamManager
|
from lbry.stream.stream_manager import StreamManager
|
||||||
from lbry.file.file_manager import FileManager
|
from lbry.file.file_manager import FileManager
|
||||||
from lbry.extras.daemon.component import Component
|
from lbry.extras.daemon.component import Component
|
||||||
|
@ -381,7 +381,7 @@ class FileManagerComponent(Component):
|
||||||
|
|
||||||
class BackgroundDownloader(Component):
|
class BackgroundDownloader(Component):
|
||||||
component_name = BACKGROUND_DOWNLOADER_COMPONENT
|
component_name = BACKGROUND_DOWNLOADER_COMPONENT
|
||||||
depends_on = [FILE_MANAGER_COMPONENT, DATABASE_COMPONENT, WALLET_COMPONENT]
|
depends_on = [DATABASE_COMPONENT, WALLET_COMPONENT]
|
||||||
|
|
||||||
def __init__(self, component_manager):
|
def __init__(self, component_manager):
|
||||||
super().__init__(component_manager)
|
super().__init__(component_manager)
|
||||||
|
@ -411,7 +411,6 @@ class BackgroundDownloader(Component):
|
||||||
await asyncio.sleep(self.download_loop_delay_seconds)
|
await asyncio.sleep(self.download_loop_delay_seconds)
|
||||||
|
|
||||||
async def ensure_download(self, channel_id, amount):
|
async def ensure_download(self, channel_id, amount):
|
||||||
file_manager = self.component_manager.get_component(FILE_MANAGER_COMPONENT)
|
|
||||||
wallet = self.component_manager.get_component(WALLET_COMPONENT)
|
wallet = self.component_manager.get_component(WALLET_COMPONENT)
|
||||||
ledger = wallet.ledger
|
ledger = wallet.ledger
|
||||||
claims, _, _, _ = await ledger.claim_search(
|
claims, _, _, _ = await ledger.claim_search(
|
||||||
|
@ -422,17 +421,24 @@ class BackgroundDownloader(Component):
|
||||||
offset += 1
|
offset += 1
|
||||||
if not claim.script.source or claim.has_price:
|
if not claim.script.source or claim.has_price:
|
||||||
continue
|
continue
|
||||||
stream = await file_manager.download_from_uri(
|
await self.download_blobs(claim.claim.stream.source.sd_hash)
|
||||||
claim.permanent_url, None, 60.0, save_file=False, wallet=wallet
|
|
||||||
)
|
|
||||||
if isinstance(stream, ManagedStream):
|
|
||||||
await stream.save_blobs()
|
|
||||||
amount -= 1
|
amount -= 1
|
||||||
if amount == 0:
|
if amount == 0:
|
||||||
break
|
break
|
||||||
claims, _, _, _ = await ledger.claim_search(
|
claims, _, _, _ = await ledger.claim_search(
|
||||||
ledger.accounts, channel_id=channel_id, order_by=['release_time', '^height'], offset=offset)
|
ledger.accounts, channel_id=channel_id, order_by=['release_time', '^height'], offset=offset)
|
||||||
|
|
||||||
|
async def download_blobs(self, sd_hash):
|
||||||
|
blob_manager = self.component_manager.get_component(BLOB_COMPONENT)
|
||||||
|
downloader = StreamDownloader(asyncio.get_running_loop(), self.conf, blob_manager, sd_hash)
|
||||||
|
node = None
|
||||||
|
if self.component_manager.has_component(DHT_COMPONENT):
|
||||||
|
node = self.component_manager.get_component(DHT_COMPONENT)
|
||||||
|
await downloader.start(node)
|
||||||
|
await downloader.load_descriptor()
|
||||||
|
for blob_info in downloader.descriptor.blobs[:-1]:
|
||||||
|
await downloader.download_stream_blob(blob_info)
|
||||||
|
|
||||||
async def start(self):
|
async def start(self):
|
||||||
self.task = asyncio.create_task(self.loop())
|
self.task = asyncio.create_task(self.loop())
|
||||||
|
|
||||||
|
|
|
@ -574,50 +574,63 @@ class DiskSpaceManagement(CommandTestCase):
|
||||||
|
|
||||||
|
|
||||||
class TestProactiveDownloaderComponent(CommandTestCase):
|
class TestProactiveDownloaderComponent(CommandTestCase):
|
||||||
async def assertFileList(self, *txos):
|
async def assertBlobs(self, *sd_hashes):
|
||||||
txos_names = {txo['outputs'][0]['name'] for txo in txos}
|
# checks that we have ony the finished blobs needed for the the referenced streams
|
||||||
files = await self.file_list(blobs_remaining=0)
|
seen = set(sd_hashes)
|
||||||
self.assertEqual(len(txos), len(files))
|
for sd_hash in sd_hashes:
|
||||||
file_claim_names = {file['claim_name'] for file in files}
|
self.assertTrue(self.daemon.blob_manager.get_blob(sd_hash).get_is_verified())
|
||||||
self.assertSetEqual(txos_names, file_claim_names)
|
blobs = await self.daemon.storage.get_blobs_for_stream(
|
||||||
|
await self.daemon.storage.get_stream_hash_for_sd_hash(sd_hash)
|
||||||
|
)
|
||||||
|
for blob in blobs[:-1]:
|
||||||
|
self.assertTrue(self.daemon.blob_manager.get_blob(blob.blob_hash).get_is_verified())
|
||||||
|
seen.update(blob.blob_hash for blob in blobs if blob.blob_hash)
|
||||||
|
self.assertEqual(seen, self.daemon.blob_manager.completed_blob_hashes)
|
||||||
|
|
||||||
async def test_ensure_download(self):
|
async def test_ensure_download(self):
|
||||||
unrelated_claim_id = self.get_claim_id(await self.stream_create('something_else', '0.01'))
|
unrelated_claim_id = self.get_claim_id(await self.stream_create('something_else', '0.01'))
|
||||||
channel_id = self.get_claim_id(await self.channel_create('@cool'))
|
channel_id = self.get_claim_id(await self.channel_create('@cool'))
|
||||||
content1 = await self.stream_create('content1', '0.01', channel_id=channel_id)
|
content1 = await self.stream_create('content1', '0.01', channel_id=channel_id)
|
||||||
|
content1 = content1['outputs'][0]['value']['source']['sd_hash']
|
||||||
content2 = await self.stream_create('content2', '0.01', channel_id=channel_id)
|
content2 = await self.stream_create('content2', '0.01', channel_id=channel_id)
|
||||||
|
content2 = content2['outputs'][0]['value']['source']['sd_hash']
|
||||||
await self.stream_create('paid', '0.01', channel_id=channel_id, fee_amount=42, fee_currency='USD')
|
await self.stream_create('paid', '0.01', channel_id=channel_id, fee_amount=42, fee_currency='USD')
|
||||||
await self.stream_repost(unrelated_claim_id, 'repost')
|
await self.stream_repost(unrelated_claim_id, 'repost')
|
||||||
await self.daemon.jsonrpc_file_delete(delete_all=True)
|
await self.daemon.jsonrpc_file_delete(delete_all=True)
|
||||||
self.assertEqual(0, len(await self.file_list()))
|
self.assertEqual(0, len(await self.file_list()))
|
||||||
|
self.assertEqual(0, len((await self.daemon.jsonrpc_blob_list())['items']))
|
||||||
|
|
||||||
proactive_downloader = self.daemon.component_manager.get_component(BACKGROUND_DOWNLOADER_COMPONENT)
|
proactive_downloader = self.daemon.component_manager.get_component(BACKGROUND_DOWNLOADER_COMPONENT)
|
||||||
await self.assertFileList()
|
|
||||||
await proactive_downloader.ensure_download(channel_id, 1)
|
await proactive_downloader.ensure_download(channel_id, 1)
|
||||||
await self.assertFileList(content1)
|
await self.assertBlobs(content1)
|
||||||
await proactive_downloader.ensure_download(channel_id, 2)
|
await proactive_downloader.ensure_download(channel_id, 2)
|
||||||
await self.assertFileList(content1, content2)
|
await self.assertBlobs(content1, content2)
|
||||||
# ignores paid content
|
# ignores paid content
|
||||||
await proactive_downloader.ensure_download(channel_id, 3)
|
await proactive_downloader.ensure_download(channel_id, 3)
|
||||||
await self.assertFileList(content1, content2)
|
await self.assertBlobs(content1, content2)
|
||||||
# ignores reposts
|
# ignores reposts
|
||||||
await proactive_downloader.ensure_download(channel_id, 4)
|
await proactive_downloader.ensure_download(channel_id, 4)
|
||||||
await self.assertFileList(content1, content2)
|
await self.assertBlobs(content1, content2)
|
||||||
|
|
||||||
await self.daemon.jsonrpc_file_delete(delete_all=True)
|
await self.daemon.jsonrpc_file_delete(delete_all=True)
|
||||||
self.assertEqual(0, len(await self.file_list()))
|
self.assertEqual(0, len(await self.file_list()))
|
||||||
|
await self.daemon.blob_manager.delete_blobs(list(self.daemon.blob_manager.completed_blob_hashes), True)
|
||||||
|
self.assertEqual(0, len((await self.daemon.jsonrpc_blob_list())['items']))
|
||||||
|
|
||||||
await proactive_downloader.stop()
|
await proactive_downloader.stop()
|
||||||
await self.daemon.jsonrpc_channel_subscribe(channel_id, 1)
|
await self.daemon.jsonrpc_channel_subscribe(channel_id, 1)
|
||||||
await proactive_downloader.start()
|
await proactive_downloader.start()
|
||||||
await proactive_downloader.finished_iteration.wait()
|
await proactive_downloader.finished_iteration.wait()
|
||||||
await self.assertFileList(content1)
|
await self.assertBlobs(content1)
|
||||||
await self.daemon.jsonrpc_file_delete(delete_all=True)
|
await self.daemon.jsonrpc_file_delete(delete_all=True)
|
||||||
|
|
||||||
await self.daemon.jsonrpc_channel_subscribe(channel_id, download_all=True)
|
await self.daemon.jsonrpc_channel_subscribe(channel_id, download_all=True)
|
||||||
await proactive_downloader.stop()
|
await proactive_downloader.stop()
|
||||||
await proactive_downloader.start()
|
await proactive_downloader.start()
|
||||||
await proactive_downloader.finished_iteration.wait()
|
await proactive_downloader.finished_iteration.wait()
|
||||||
await self.assertFileList(content1, content2)
|
await self.assertBlobs(content1, content2)
|
||||||
|
|
||||||
|
self.assertEqual(0, len(await self.file_list()))
|
||||||
|
|
||||||
self.assertEqual([(channel_id, 0, 1)], await self.daemon.jsonrpc_channel_subscription_list())
|
self.assertEqual([(channel_id, 0, 1)], await self.daemon.jsonrpc_channel_subscription_list())
|
||||||
await self.daemon.jsonrpc_channel_unsubscribe(channel_id)
|
await self.daemon.jsonrpc_channel_unsubscribe(channel_id)
|
||||||
|
|
Loading…
Reference in a new issue