forked from LBRYCommunity/lbry-sdk
drop channel support, prepare to hook into DHT
This commit is contained in:
parent
02b2103d94
commit
abbd8473bb
2 changed files with 19 additions and 47 deletions
|
@ -381,7 +381,7 @@ class FileManagerComponent(Component):
|
|||
|
||||
class BackgroundDownloader(Component):
|
||||
component_name = BACKGROUND_DOWNLOADER_COMPONENT
|
||||
depends_on = [DATABASE_COMPONENT, WALLET_COMPONENT]
|
||||
depends_on = [DATABASE_COMPONENT, BLOB_COMPONENT]
|
||||
|
||||
def __init__(self, component_manager):
|
||||
super().__init__(component_manager)
|
||||
|
@ -399,35 +399,11 @@ class BackgroundDownloader(Component):
|
|||
return self.status
|
||||
|
||||
async def loop(self):
|
||||
db: SQLiteStorage = self.component_manager.get_component(DATABASE_COMPONENT)
|
||||
while True:
|
||||
for channel_id, download_latest, download_all in await db.get_subscriptions():
|
||||
amount = 1_000_000 if download_all else download_latest
|
||||
if not amount:
|
||||
continue
|
||||
await self.ensure_download(channel_id, amount)
|
||||
self.finished_iteration.set()
|
||||
self.finished_iteration.clear()
|
||||
await asyncio.sleep(self.download_loop_delay_seconds)
|
||||
|
||||
async def ensure_download(self, channel_id, amount):
|
||||
wallet = self.component_manager.get_component(WALLET_COMPONENT)
|
||||
ledger = wallet.ledger
|
||||
claims, _, _, _ = await ledger.claim_search(
|
||||
ledger.accounts, channel_id=channel_id, order_by=['release_time', '^height'])
|
||||
offset = 0
|
||||
while claims and amount > 0:
|
||||
for claim in claims:
|
||||
offset += 1
|
||||
if not claim.script.source or claim.has_price:
|
||||
continue
|
||||
await self.download_blobs(claim.claim.stream.source.sd_hash)
|
||||
amount -= 1
|
||||
if amount == 0:
|
||||
break
|
||||
claims, _, _, _ = await ledger.claim_search(
|
||||
ledger.accounts, channel_id=channel_id, order_by=['release_time', '^height'], offset=offset)
|
||||
|
||||
async def download_blobs(self, sd_hash):
|
||||
blob_manager = self.component_manager.get_component(BLOB_COMPONENT)
|
||||
downloader = StreamDownloader(asyncio.get_running_loop(), self.conf, blob_manager, sd_hash)
|
||||
|
|
|
@ -574,7 +574,7 @@ class DiskSpaceManagement(CommandTestCase):
|
|||
|
||||
|
||||
class TestProactiveDownloaderComponent(CommandTestCase):
|
||||
async def assertBlobs(self, *sd_hashes):
|
||||
async def assertBlobs(self, *sd_hashes, no_files=True):
|
||||
# checks that we have ony the finished blobs needed for the the referenced streams
|
||||
seen = set(sd_hashes)
|
||||
for sd_hash in sd_hashes:
|
||||
|
@ -586,39 +586,35 @@ class TestProactiveDownloaderComponent(CommandTestCase):
|
|||
self.assertTrue(self.daemon.blob_manager.get_blob(blob.blob_hash).get_is_verified())
|
||||
seen.update(blob.blob_hash for blob in blobs if blob.blob_hash)
|
||||
self.assertEqual(seen, self.daemon.blob_manager.completed_blob_hashes)
|
||||
if no_files:
|
||||
self.assertEqual(0, len(await self.file_list()))
|
||||
|
||||
async def test_ensure_download(self):
|
||||
unrelated_claim_id = self.get_claim_id(await self.stream_create('something_else', '0.01'))
|
||||
channel_id = self.get_claim_id(await self.channel_create('@cool'))
|
||||
content1 = await self.stream_create('content1', '0.01', channel_id=channel_id, data=bytes([0] * (2 << 23)))
|
||||
content1 = content1['outputs'][0]['value']['source']['sd_hash']
|
||||
content2 = await self.stream_create('content2', '0.01', channel_id=channel_id, data=bytes([0] * (2 << 23)))
|
||||
content2 = content2['outputs'][0]['value']['source']['sd_hash']
|
||||
await self.stream_create('paid', '0.01', channel_id=channel_id, fee_amount=42, fee_currency='USD')
|
||||
await self.stream_repost(unrelated_claim_id, 'repost')
|
||||
async def clear(self):
|
||||
await self.daemon.jsonrpc_file_delete(delete_all=True)
|
||||
self.assertEqual(0, len(await self.file_list()))
|
||||
await self.daemon.blob_manager.delete_blobs(list(self.daemon.blob_manager.completed_blob_hashes), True)
|
||||
self.assertEqual(0, len((await self.daemon.jsonrpc_blob_list())['items']))
|
||||
|
||||
async def test_ensure_download(self):
|
||||
content1 = await self.stream_create('content1', '0.01', data=bytes([0] * (2 << 23)))
|
||||
content1 = content1['outputs'][0]['value']['source']['sd_hash']
|
||||
content2 = await self.stream_create('content2', '0.01', data=bytes([0] * (2 << 23)))
|
||||
content2 = content2['outputs'][0]['value']['source']['sd_hash']
|
||||
|
||||
proactive_downloader = self.daemon.component_manager.get_component(BACKGROUND_DOWNLOADER_COMPONENT)
|
||||
await proactive_downloader.ensure_download(channel_id, 1)
|
||||
await self.clear()
|
||||
await proactive_downloader.download_blobs(content1)
|
||||
await self.assertBlobs(content1)
|
||||
await proactive_downloader.ensure_download(channel_id, 2)
|
||||
await self.assertBlobs(content1, content2)
|
||||
# ignores paid content
|
||||
await proactive_downloader.ensure_download(channel_id, 3)
|
||||
await self.assertBlobs(content1, content2)
|
||||
# ignores reposts
|
||||
await proactive_downloader.ensure_download(channel_id, 4)
|
||||
await proactive_downloader.download_blobs(content2)
|
||||
await self.assertBlobs(content1, content2)
|
||||
await self.clear()
|
||||
await proactive_downloader.download_blobs(content2)
|
||||
await self.assertBlobs(content2)
|
||||
|
||||
# tests that an attempt to download something that isn't a sd blob will download the single blob and stop
|
||||
blobs = await self.daemon.storage.get_blobs_for_stream(
|
||||
await self.daemon.storage.get_stream_hash_for_sd_hash(content1)
|
||||
)
|
||||
await self.daemon.jsonrpc_file_delete(delete_all=True)
|
||||
self.assertEqual(0, len(await self.file_list()))
|
||||
await self.daemon.blob_manager.delete_blobs(list(self.daemon.blob_manager.completed_blob_hashes), True)
|
||||
self.assertEqual(0, len((await self.daemon.jsonrpc_blob_list())['items']))
|
||||
await self.clear()
|
||||
await proactive_downloader.download_blobs(blobs[0].blob_hash)
|
||||
self.assertEqual({blobs[0].blob_hash}, self.daemon.blob_manager.completed_blob_hashes)
|
||||
|
|
Loading…
Add table
Reference in a new issue