From fcbe8cf00b4a43fe39b58b375d71afab06a08706 Mon Sep 17 00:00:00 2001 From: Victor Shyba Date: Fri, 15 Oct 2021 03:09:38 -0300 Subject: [PATCH] create downloader component and initial tests --- lbry/extras/daemon/components.py | 58 +++++++++++++++++++ lbry/extras/daemon/daemon.py | 40 +++++++++++++ lbry/extras/daemon/storage.py | 19 ++++++ .../datanetwork/test_file_commands.py | 34 ++++++++++- 4 files changed, 150 insertions(+), 1 deletion(-) diff --git a/lbry/extras/daemon/components.py b/lbry/extras/daemon/components.py index 8e32cf6c1..a5b23d743 100644 --- a/lbry/extras/daemon/components.py +++ b/lbry/extras/daemon/components.py @@ -42,6 +42,7 @@ DHT_COMPONENT = "dht" HASH_ANNOUNCER_COMPONENT = "hash_announcer" FILE_MANAGER_COMPONENT = "file_manager" DISK_SPACE_COMPONENT = "disk_space" +BACKGROUND_DOWNLOADER_COMPONENT = "background_downloader" PEER_PROTOCOL_SERVER_COMPONENT = "peer_protocol_server" UPNP_COMPONENT = "upnp" EXCHANGE_RATE_MANAGER_COMPONENT = "exchange_rate_manager" @@ -377,6 +378,63 @@ class FileManagerComponent(Component): self.file_manager.stop() +class BackgroundDownloader(Component): + component_name = BACKGROUND_DOWNLOADER_COMPONENT + depends_on = [FILE_MANAGER_COMPONENT, DATABASE_COMPONENT, WALLET_COMPONENT] + + def __init__(self, component_manager): + super().__init__(component_manager) + self.status = {'pending': 0, 'ongoing': 0} + self.task: typing.Optional[asyncio.Task] = None + self.download_loop_delay_seconds = 60 + + @property + def component(self) -> 'BackgroundDownloader': + return self + + async def get_status(self): + self.status['running'] = self.task is not None and not self.task.done() + return self.status + + async def loop(self): + return + db: SQLiteStorage = self.component_manager.get_component(DATABASE_COMPONENT) + while True: + for channel_id, download_latest, download_all in await db.get_subscriptions(): + amount = 1_000_000 if download_all else download_latest + if not amount: + continue + await self.ensure_download(channel_id, amount) + await asyncio.sleep(self.download_loop_delay_seconds) + + async def ensure_download(self, channel_id, amount): + file_manager = self.component_manager.get_component(FILE_MANAGER_COMPONENT) + wallet = self.component_manager.get_component(WALLET_COMPONENT) + ledger = wallet.ledger + claims, _, _, _ = await ledger.claim_search( + ledger.accounts, channel_id=channel_id, order_by=['release_time', '^height']) + page = 0 + while claims and amount > 0: + for claim in claims: + if not claim.script.source or claim.has_price: + continue + stream = await file_manager.download_from_uri( + claim.permanent_url, None, 60.0, save_file=False, wallet=wallet + ) + amount -= 1 + if amount == 0: + break + page += 1 + claims, _, _, _ = await ledger.claim_search( + ledger.accounts, channel_id=channel_id, order_by=['release_time', '^height'], page=page) + + async def start(self): + self.task = asyncio.create_task(self.loop()) + + async def stop(self): + self.task.cancel() + + class DiskSpaceComponent(Component): component_name = DISK_SPACE_COMPONENT depends_on = [DATABASE_COMPONENT, BLOB_COMPONENT] diff --git a/lbry/extras/daemon/daemon.py b/lbry/extras/daemon/daemon.py index 89e56dd8f..c9060eaa7 100644 --- a/lbry/extras/daemon/daemon.py +++ b/lbry/extras/daemon/daemon.py @@ -3030,6 +3030,46 @@ class Daemon(metaclass=JSONRPCServerType): } return base58.b58encode(json.dumps(export, separators=(',', ':'))) + @requires(WALLET_COMPONENT) + def jsonrpc_channel_subscribe(self, channel_id, download_latest=None, download_all=False): + """ + Subscribe to a channel and optionally start downloading streams proactively. + + Usage: + channel_subscribe ( | --channel_id=) [--download_latest=] + [--download_all] + + Options: + --channel_id= : (str) claim id of channel to subscribe. + --download_latest= : (int) amount of newest streams to ensure download. + --download_all : (bool) download all streams from the channel. + + Returns: + (bool) Subscription successful? (False only if channel doesn't exist) + """ + if download_all and download_latest is not None: + raise ConflictingInputValueError("Please set either download_latest or download_all, not both.") + return self.storage.add_subscription(channel_id, download_latest, download_all) + + @requires(WALLET_COMPONENT) + def jsonrpc_channel_unsubscribe(self, channel_id): + """ + Subscribe to a channel and optionally start downloading streams proactively. + + Usage: + channel_subscribe ( | --channel_id=) [--download=] + + Options: + --channel_id= : (str) claim id of channel to subscribe + --download= : (str) which strategy to use for downloads: 'all' for everything. + 'latest-X' for the latest X streams. None (default) for nothing. + + Returns: + (bool) Subscription successful? (False only if channel doesn't exist) + """ + return self.storage.remove_subscription(channel_id) + + @requires(WALLET_COMPONENT) async def jsonrpc_channel_import(self, channel_data, wallet_id=None): """ diff --git a/lbry/extras/daemon/storage.py b/lbry/extras/daemon/storage.py index 758c25970..6fc25313f 100644 --- a/lbry/extras/daemon/storage.py +++ b/lbry/extras/daemon/storage.py @@ -235,6 +235,12 @@ class SQLiteStorage(SQLiteMixin): pragma foreign_keys=on; pragma journal_mode=WAL; + create table if not exists subscription ( + channel_id char(40) primary key not null, + download_latest integer not null default 0, + download_all integer not null default 0 + ); + create table if not exists blob ( blob_hash char(96) primary key not null, blob_length integer not null, @@ -539,6 +545,19 @@ class SQLiteStorage(SQLiteMixin): async def delete_torrent(self, bt_infohash: str): return await self.db.run(delete_torrent, bt_infohash) + # # # # # # # # # subscriptions # # # # # # # # # + + def add_subscription(self, channel_id, download_latest=None, download_all=None): + return self.db.execute_fetchall( + "insert or replace into subscription(channel_id, download_latest, download_all) values (?, ?, ?)", + (channel_id, download_latest or 0, 1 if download_all else 0)) + + def remove_subscription(self, channel_id): + return self.db.execute_fetchall("delete from subscriptions where channel_id=?", (channel_id,)) + + def get_subscriptions(self): + return self.db.execute_fetchall("select channel_id, download_latest, download_all from subscription") + # # # # # # # # # file stuff # # # # # # # # # def save_downloaded_file(self, stream_hash: str, file_name: typing.Optional[str], diff --git a/tests/integration/datanetwork/test_file_commands.py b/tests/integration/datanetwork/test_file_commands.py index 8edd2cc6a..424bb0f2b 100644 --- a/tests/integration/datanetwork/test_file_commands.py +++ b/tests/integration/datanetwork/test_file_commands.py @@ -5,7 +5,7 @@ from binascii import hexlify from lbry.schema import Claim from lbry.testcase import CommandTestCase -from lbry.extras.daemon.components import TorrentSession +from lbry.extras.daemon.components import TorrentSession, BACKGROUND_DOWNLOADER_COMPONENT from lbry.wallet import Transaction @@ -571,3 +571,35 @@ class DiskSpaceManagement(CommandTestCase): self.assertTrue(blobs2.issubset(blobs)) self.assertFalse(blobs3.issubset(blobs)) self.assertTrue(blobs4.issubset(blobs)) + + +class TestProactiveDownloaderComponent(CommandTestCase): + async def assertFileList(self, *txos): + txos_names = {txo['outputs'][0]['name'] for txo in txos} + files = await self.file_list() + self.assertEqual(len(txos), len(files)) + file_claim_names = {file['claim_name'] for file in files} + self.assertSetEqual(txos_names, file_claim_names) + + async def test_ensure_download(self): + unrelated_claim_id = self.get_claim_id(await self.stream_create('something_else', '0.01')) + channel_id = self.get_claim_id(await self.channel_create('@cool')) + content1 = await self.stream_create('content1', '0.01', channel_id=channel_id) + content2 = await self.stream_create('content2', '0.01', channel_id=channel_id) + await self.stream_create('paid', '0.01', channel_id=channel_id, fee_amount=42, fee_currency='USD') + await self.stream_repost(unrelated_claim_id, 'repost') + await self.daemon.jsonrpc_file_delete(delete_all=True) + self.assertEqual(0, len(await self.file_list())) + + proactive_downloader = self.daemon.component_manager.get_component(BACKGROUND_DOWNLOADER_COMPONENT) + await self.assertFileList() + await proactive_downloader.ensure_download(channel_id, 1) + await self.assertFileList(content1) + await proactive_downloader.ensure_download(channel_id, 2) + await self.assertFileList(content1, content2) + # ignores paid content + await proactive_downloader.ensure_download(channel_id, 3) + await self.assertFileList(content1, content2) + # ignores reposts + await proactive_downloader.ensure_download(channel_id, 4) + await self.assertFileList(content1, content2)