create downloader component and initial tests

This commit is contained in:
Victor Shyba 2021-10-15 03:09:38 -03:00
parent a1866c40f5
commit fcbe8cf00b
4 changed files with 150 additions and 1 deletions

View file

@ -42,6 +42,7 @@ DHT_COMPONENT = "dht"
HASH_ANNOUNCER_COMPONENT = "hash_announcer" HASH_ANNOUNCER_COMPONENT = "hash_announcer"
FILE_MANAGER_COMPONENT = "file_manager" FILE_MANAGER_COMPONENT = "file_manager"
DISK_SPACE_COMPONENT = "disk_space" DISK_SPACE_COMPONENT = "disk_space"
BACKGROUND_DOWNLOADER_COMPONENT = "background_downloader"
PEER_PROTOCOL_SERVER_COMPONENT = "peer_protocol_server" PEER_PROTOCOL_SERVER_COMPONENT = "peer_protocol_server"
UPNP_COMPONENT = "upnp" UPNP_COMPONENT = "upnp"
EXCHANGE_RATE_MANAGER_COMPONENT = "exchange_rate_manager" EXCHANGE_RATE_MANAGER_COMPONENT = "exchange_rate_manager"
@ -377,6 +378,63 @@ class FileManagerComponent(Component):
self.file_manager.stop() self.file_manager.stop()
class BackgroundDownloader(Component):
component_name = BACKGROUND_DOWNLOADER_COMPONENT
depends_on = [FILE_MANAGER_COMPONENT, DATABASE_COMPONENT, WALLET_COMPONENT]
def __init__(self, component_manager):
super().__init__(component_manager)
self.status = {'pending': 0, 'ongoing': 0}
self.task: typing.Optional[asyncio.Task] = None
self.download_loop_delay_seconds = 60
@property
def component(self) -> 'BackgroundDownloader':
return self
async def get_status(self):
self.status['running'] = self.task is not None and not self.task.done()
return self.status
async def loop(self):
return
db: SQLiteStorage = self.component_manager.get_component(DATABASE_COMPONENT)
while True:
for channel_id, download_latest, download_all in await db.get_subscriptions():
amount = 1_000_000 if download_all else download_latest
if not amount:
continue
await self.ensure_download(channel_id, amount)
await asyncio.sleep(self.download_loop_delay_seconds)
async def ensure_download(self, channel_id, amount):
file_manager = self.component_manager.get_component(FILE_MANAGER_COMPONENT)
wallet = self.component_manager.get_component(WALLET_COMPONENT)
ledger = wallet.ledger
claims, _, _, _ = await ledger.claim_search(
ledger.accounts, channel_id=channel_id, order_by=['release_time', '^height'])
page = 0
while claims and amount > 0:
for claim in claims:
if not claim.script.source or claim.has_price:
continue
stream = await file_manager.download_from_uri(
claim.permanent_url, None, 60.0, save_file=False, wallet=wallet
)
amount -= 1
if amount == 0:
break
page += 1
claims, _, _, _ = await ledger.claim_search(
ledger.accounts, channel_id=channel_id, order_by=['release_time', '^height'], page=page)
async def start(self):
self.task = asyncio.create_task(self.loop())
async def stop(self):
self.task.cancel()
class DiskSpaceComponent(Component): class DiskSpaceComponent(Component):
component_name = DISK_SPACE_COMPONENT component_name = DISK_SPACE_COMPONENT
depends_on = [DATABASE_COMPONENT, BLOB_COMPONENT] depends_on = [DATABASE_COMPONENT, BLOB_COMPONENT]

View file

@ -3030,6 +3030,46 @@ class Daemon(metaclass=JSONRPCServerType):
} }
return base58.b58encode(json.dumps(export, separators=(',', ':'))) return base58.b58encode(json.dumps(export, separators=(',', ':')))
@requires(WALLET_COMPONENT)
def jsonrpc_channel_subscribe(self, channel_id, download_latest=None, download_all=False):
"""
Subscribe to a channel and optionally start downloading streams proactively.
Usage:
channel_subscribe (<channel_id> | --channel_id=<channel_id>) [--download_latest=<download_latest>]
[--download_all]
Options:
--channel_id=<channel_id> : (str) claim id of channel to subscribe.
--download_latest=<download_latest> : (int) amount of newest streams to ensure download.
--download_all : (bool) download all streams from the channel.
Returns:
(bool) Subscription successful? (False only if channel doesn't exist)
"""
if download_all and download_latest is not None:
raise ConflictingInputValueError("Please set either download_latest or download_all, not both.")
return self.storage.add_subscription(channel_id, download_latest, download_all)
@requires(WALLET_COMPONENT)
def jsonrpc_channel_unsubscribe(self, channel_id):
"""
Subscribe to a channel and optionally start downloading streams proactively.
Usage:
channel_subscribe (<channel_id> | --channel_id=<channel_id>) [--download=<download>]
Options:
--channel_id=<channel_id> : (str) claim id of channel to subscribe
--download=<download> : (str) which strategy to use for downloads: 'all' for everything.
'latest-X' for the latest X streams. None (default) for nothing.
Returns:
(bool) Subscription successful? (False only if channel doesn't exist)
"""
return self.storage.remove_subscription(channel_id)
@requires(WALLET_COMPONENT) @requires(WALLET_COMPONENT)
async def jsonrpc_channel_import(self, channel_data, wallet_id=None): async def jsonrpc_channel_import(self, channel_data, wallet_id=None):
""" """

View file

@ -235,6 +235,12 @@ class SQLiteStorage(SQLiteMixin):
pragma foreign_keys=on; pragma foreign_keys=on;
pragma journal_mode=WAL; pragma journal_mode=WAL;
create table if not exists subscription (
channel_id char(40) primary key not null,
download_latest integer not null default 0,
download_all integer not null default 0
);
create table if not exists blob ( create table if not exists blob (
blob_hash char(96) primary key not null, blob_hash char(96) primary key not null,
blob_length integer not null, blob_length integer not null,
@ -539,6 +545,19 @@ class SQLiteStorage(SQLiteMixin):
async def delete_torrent(self, bt_infohash: str): async def delete_torrent(self, bt_infohash: str):
return await self.db.run(delete_torrent, bt_infohash) return await self.db.run(delete_torrent, bt_infohash)
# # # # # # # # # subscriptions # # # # # # # # #
def add_subscription(self, channel_id, download_latest=None, download_all=None):
return self.db.execute_fetchall(
"insert or replace into subscription(channel_id, download_latest, download_all) values (?, ?, ?)",
(channel_id, download_latest or 0, 1 if download_all else 0))
def remove_subscription(self, channel_id):
return self.db.execute_fetchall("delete from subscriptions where channel_id=?", (channel_id,))
def get_subscriptions(self):
return self.db.execute_fetchall("select channel_id, download_latest, download_all from subscription")
# # # # # # # # # file stuff # # # # # # # # # # # # # # # # # # file stuff # # # # # # # # #
def save_downloaded_file(self, stream_hash: str, file_name: typing.Optional[str], def save_downloaded_file(self, stream_hash: str, file_name: typing.Optional[str],

View file

@ -5,7 +5,7 @@ from binascii import hexlify
from lbry.schema import Claim from lbry.schema import Claim
from lbry.testcase import CommandTestCase from lbry.testcase import CommandTestCase
from lbry.extras.daemon.components import TorrentSession from lbry.extras.daemon.components import TorrentSession, BACKGROUND_DOWNLOADER_COMPONENT
from lbry.wallet import Transaction from lbry.wallet import Transaction
@ -571,3 +571,35 @@ class DiskSpaceManagement(CommandTestCase):
self.assertTrue(blobs2.issubset(blobs)) self.assertTrue(blobs2.issubset(blobs))
self.assertFalse(blobs3.issubset(blobs)) self.assertFalse(blobs3.issubset(blobs))
self.assertTrue(blobs4.issubset(blobs)) self.assertTrue(blobs4.issubset(blobs))
class TestProactiveDownloaderComponent(CommandTestCase):
async def assertFileList(self, *txos):
txos_names = {txo['outputs'][0]['name'] for txo in txos}
files = await self.file_list()
self.assertEqual(len(txos), len(files))
file_claim_names = {file['claim_name'] for file in files}
self.assertSetEqual(txos_names, file_claim_names)
async def test_ensure_download(self):
unrelated_claim_id = self.get_claim_id(await self.stream_create('something_else', '0.01'))
channel_id = self.get_claim_id(await self.channel_create('@cool'))
content1 = await self.stream_create('content1', '0.01', channel_id=channel_id)
content2 = await self.stream_create('content2', '0.01', channel_id=channel_id)
await self.stream_create('paid', '0.01', channel_id=channel_id, fee_amount=42, fee_currency='USD')
await self.stream_repost(unrelated_claim_id, 'repost')
await self.daemon.jsonrpc_file_delete(delete_all=True)
self.assertEqual(0, len(await self.file_list()))
proactive_downloader = self.daemon.component_manager.get_component(BACKGROUND_DOWNLOADER_COMPONENT)
await self.assertFileList()
await proactive_downloader.ensure_download(channel_id, 1)
await self.assertFileList(content1)
await proactive_downloader.ensure_download(channel_id, 2)
await self.assertFileList(content1, content2)
# ignores paid content
await proactive_downloader.ensure_download(channel_id, 3)
await self.assertFileList(content1, content2)
# ignores reposts
await proactive_downloader.ensure_download(channel_id, 4)
await self.assertFileList(content1, content2)