diff --git a/lbry/conf.py b/lbry/conf.py index da2f45f85..c204a49f5 100644 --- a/lbry/conf.py +++ b/lbry/conf.py @@ -558,6 +558,7 @@ class Config(CLIConfig): "Don't download streams with fees exceeding this amount. When set to " "null, the amount is unbounded.", {'currency': 'USD', 'amount': 50.0} ) + max_wallet_server_fee = String("Maximum daily LBC amount allowed as payment for wallet servers.", "1.0") # reflector settings reflect_streams = Toggle( diff --git a/lbry/extras/daemon/components.py b/lbry/extras/daemon/components.py index 2af376904..fe045a6c2 100644 --- a/lbry/extras/daemon/components.py +++ b/lbry/extras/daemon/components.py @@ -21,6 +21,7 @@ from lbry.extras.daemon.component import Component from lbry.extras.daemon.exchange_rate_manager import ExchangeRateManager from lbry.extras.daemon.storage import SQLiteStorage from lbry.wallet import WalletManager +from lbry.wallet.usage_payment import WalletServerPayer log = logging.getLogger(__name__) @@ -29,6 +30,7 @@ log = logging.getLogger(__name__) DATABASE_COMPONENT = "database" BLOB_COMPONENT = "blob_manager" WALLET_COMPONENT = "wallet" +WALLET_SERVER_PAYMENTS_COMPONENT = "wallet_server_payments" DHT_COMPONENT = "dht" HASH_ANNOUNCER_COMPONENT = "hash_announcer" STREAM_MANAGER_COMPONENT = "stream_manager" @@ -160,6 +162,34 @@ class WalletComponent(Component): self.wallet_manager = None +class WalletServerPaymentsComponent(Component): + component_name = WALLET_SERVER_PAYMENTS_COMPONENT + depends_on = [WALLET_COMPONENT] + + def __init__(self, component_manager): + super().__init__(component_manager) + self.usage_payment_service = WalletServerPayer( + max_fee=self.conf.max_wallet_server_fee, analytics_manager=self.component_manager.analytics_manager, + ) + + @property + def component(self) -> typing.Optional[WalletServerPayer]: + return self.usage_payment_service + + async def start(self): + wallet_manager = self.component_manager.get_component(WALLET_COMPONENT) + await self.usage_payment_service.start(wallet_manager.ledger, wallet_manager.default_wallet) + + async def stop(self): + await self.usage_payment_service.stop() + + async def get_status(self): + return { + 'max_fee': self.usage_payment_service.max_fee, + 'running': self.usage_payment_service.running + } + + class BlobComponent(Component): component_name = BLOB_COMPONENT depends_on = [DATABASE_COMPONENT] diff --git a/lbry/testcase.py b/lbry/testcase.py index 2f9989076..39118c953 100644 --- a/lbry/testcase.py +++ b/lbry/testcase.py @@ -253,6 +253,11 @@ class IntegrationTestCase(AsyncioTestCase): lambda e: e.tx.id == txid ) + def on_address_update(self, address): + return self.ledger.on_transaction.where( + lambda e: e.address == address + ) + def on_transaction_address(self, tx, address): return self.ledger.on_transaction.where( lambda e: e.tx.id == tx.id and e.address == address diff --git a/lbry/wallet/orchstr8/node.py b/lbry/wallet/orchstr8/node.py index 44a190814..a94a7bc0a 100644 --- a/lbry/wallet/orchstr8/node.py +++ b/lbry/wallet/orchstr8/node.py @@ -173,6 +173,9 @@ class SPVNode: async def start(self, blockchain_node: 'BlockchainNode', extraconf=None): self.data_path = tempfile.mkdtemp() conf = { + 'DESCRIPTION': '', + 'PAYMENT_ADDRESS': '', + 'DAILY_FEE': '0', 'DB_DIRECTORY': self.data_path, 'DAEMON_URL': blockchain_node.rpc_url, 'REORG_LIMIT': '100', diff --git a/lbry/wallet/server/env.py b/lbry/wallet/server/env.py index 857b70803..80b4c3e1f 100644 --- a/lbry/wallet/server/env.py +++ b/lbry/wallet/server/env.py @@ -72,6 +72,7 @@ class Env: self.tor_proxy_host = self.default('TOR_PROXY_HOST', 'localhost') self.tor_proxy_port = self.integer('TOR_PROXY_PORT', None) # The electrum client takes the empty string as unspecified + self.payment_address = self.default('PAYMENT_ADDRESS', '') self.donation_address = self.default('DONATION_ADDRESS', '') # Server limits to help prevent DoS self.max_send = self.integer('MAX_SEND', 1000000) @@ -81,7 +82,7 @@ class Env: self.session_timeout = self.integer('SESSION_TIMEOUT', 600) self.drop_client = self.custom("DROP_CLIENT", None, re.compile) self.description = self.default('DESCRIPTION', '') - self.daily_fee = self.integer('DAILY_FEE', 0) + self.daily_fee = self.string_amount('DAILY_FEE', '0') # Identities clearnet_identity = self.clearnet_identity() @@ -107,6 +108,14 @@ class Env: raise cls.Error(f'required envvar {envvar} not set') return value + @classmethod + def string_amount(cls, envvar, default): + value = environ.get(envvar, default) + amount_pattern = re.compile("[0-9]{0,10}(\.[0-9]{1,8})?") + if len(value) > 0 and not amount_pattern.fullmatch(value): + raise cls.Error(f'{value} is not a valid amount for {envvar}') + return value + @classmethod def integer(cls, envvar, default): value = environ.get(envvar) diff --git a/lbry/wallet/server/session.py b/lbry/wallet/server/session.py index dcc05f8e9..3eee3f0f5 100644 --- a/lbry/wallet/server/session.py +++ b/lbry/wallet/server/session.py @@ -800,6 +800,7 @@ class LBRYElectrumX(SessionBase): 'blockchain.transaction.get_merkle': cls.transaction_merkle, 'server.add_peer': cls.add_peer, 'server.banner': cls.banner, + 'server.payment_address': cls.payment_address, 'server.donation_address': cls.donation_address, 'server.features': cls.server_features_async, 'server.peers.subscribe': cls.peers_subscribe, @@ -854,7 +855,8 @@ class LBRYElectrumX(SessionBase): 'protocol_max': max_str, 'genesis_hash': env.coin.GENESIS_HASH, 'description': env.description, - 'payment_address': env.donation_address, + 'payment_address': env.payment_address, + 'donation_address': env.donation_address, 'daily_fee': env.daily_fee, 'hash_function': 'sha256', } @@ -1362,11 +1364,16 @@ class LBRYElectrumX(SessionBase): ('$SERVER_VERSION', self.version), ('$DAEMON_VERSION', daemon_version), ('$DAEMON_SUBVERSION', network_info['subversion']), + ('$PAYMENT_ADDRESS', self.env.payment_address), ('$DONATION_ADDRESS', self.env.donation_address), ]: banner = banner.replace(*pair) return banner + async def payment_address(self): + """Return the payment address as a string, empty if there is none.""" + return self.env.payment_address + async def donation_address(self): """Return the donation address as a string, empty if there is none.""" return self.env.donation_address diff --git a/lbry/wallet/usage_payment.py b/lbry/wallet/usage_payment.py new file mode 100644 index 000000000..167f10ed6 --- /dev/null +++ b/lbry/wallet/usage_payment.py @@ -0,0 +1,66 @@ +import asyncio +import logging + +from lbry.wallet.dewies import lbc_to_dewies +from lbry.wallet.transaction import Output, Transaction + +log = logging.getLogger(__name__) + + +class WalletServerPayer: + def __init__(self, payment_period=24 * 60 * 60, max_fee='1.0', analytics_manager=None): + self.ledger = None + self.wallet = None + self.running = False + self.task = None + self.payment_period = payment_period + self.analytics_manager = analytics_manager + self.max_fee = max_fee + + async def pay(self): + while self.running: + await asyncio.sleep(self.payment_period) + features = await self.ledger.network.get_server_features() + address = features['payment_address'] + amount = str(features['daily_fee']) + if not address or not amount: + continue + + if not self.ledger.is_valid_address(address): + log.warning("Invalid address from wallet server: '%s' - skipping payment round.", address) + continue + if self.wallet.is_locked: + log.warning("Cannot spend funds with locked wallet, skipping payment round.") + continue + + amount = lbc_to_dewies(features['daily_fee']) # check that this is in lbc and not dewies + limit = lbc_to_dewies(self.max_fee) + if amount > limit: + log.warning( + "Server asked %s LBC as daily fee, but maximum allowed is %s LBC. Skipping payment round.", + features['daily_fee'], self.max_fee + ) + continue + + tx = await Transaction.create( + [], + [Output.pay_pubkey_hash(amount, self.ledger.address_to_hash160(address))], + self.wallet.get_accounts_or_all(None), + self.wallet.get_account_or_default(None) + ) + + await self.ledger.broadcast(tx) + if self.analytics_manager: + await self.analytics_manager.send_credits_sent() + + async def start(self, ledger=None, wallet=None): + self.ledger = ledger + self.wallet = wallet + self.running = True + self.task = asyncio.ensure_future(self.pay()) + self.task.add_done_callback(lambda _: log.info("Stopping wallet server payments.")) + + async def stop(self): + if self.running: + self.running = False + self.task.cancel() diff --git a/tests/integration/blockchain/test_network.py b/tests/integration/blockchain/test_network.py index 2dea647ef..6f87cdc37 100644 --- a/tests/integration/blockchain/test_network.py +++ b/tests/integration/blockchain/test_network.py @@ -1,5 +1,4 @@ import logging -import os import asyncio import lbry @@ -29,15 +28,20 @@ class NetworkTests(IntegrationTestCase): 'pruning': None, 'description': '', 'payment_address': '', - 'daily_fee': 0, + 'donation_address': '', + 'daily_fee': '0', 'server_version': lbry.__version__}, await self.ledger.network.get_server_features()) await self.conductor.spv_node.stop() - address = (await self.account.get_addresses(limit=1))[0] - os.environ.update({ - 'DESCRIPTION': 'Fastest server in the west.', - 'DONATION_ADDRESS': address, - 'DAILY_FEE': '42'}) - await self.conductor.spv_node.start(self.conductor.blockchain_node) + payment_address, donation_address = await self.account.get_addresses(limit=2) + await self.conductor.spv_node.start( + self.conductor.blockchain_node, + extraconf={ + 'DESCRIPTION': 'Fastest server in the west.', + 'PAYMENT_ADDRESS': payment_address, + 'DONATION_ADDRESS': donation_address, + 'DAILY_FEE': '42' + } + ) await self.ledger.network.on_connected.first self.assertDictEqual({ 'genesis_hash': self.conductor.spv_node.coin_class.GENESIS_HASH, @@ -47,8 +51,9 @@ class NetworkTests(IntegrationTestCase): 'protocol_min': '0.54.0', 'pruning': None, 'description': 'Fastest server in the west.', - 'payment_address': address, - 'daily_fee': 42, + 'payment_address': payment_address, + 'donation_address': donation_address, + 'daily_fee': '42', 'server_version': lbry.__version__}, await self.ledger.network.get_server_features()) diff --git a/tests/integration/blockchain/test_wallet_server_sessions.py b/tests/integration/blockchain/test_wallet_server_sessions.py index 59b44a450..1dbb7735f 100644 --- a/tests/integration/blockchain/test_wallet_server_sessions.py +++ b/tests/integration/blockchain/test_wallet_server_sessions.py @@ -3,7 +3,9 @@ import asyncio import lbry import lbry.wallet from lbry.wallet.network import ClientSession -from lbry.testcase import IntegrationTestCase +from lbry.testcase import IntegrationTestCase, CommandTestCase, AdvanceTimeTestCase +from lbry.wallet.orchstr8.node import SPVNode +from lbry.wallet.usage_payment import WalletServerPayer class TestSessions(IntegrationTestCase): @@ -49,3 +51,50 @@ class TestSegwitServer(IntegrationTestCase): async def test_at_least_it_starts(self): await asyncio.wait_for(self.ledger.network.get_headers(0, 1), 1.0) + + +class TestUsagePayment(CommandTestCase): + async def test_single_server_payment(self): + wallet_pay_service = self.daemon.component_manager.get_component('wallet_server_payments') + wallet_pay_service.payment_period = 1 + await wallet_pay_service.stop() + await wallet_pay_service.start(ledger=self.ledger, wallet=self.wallet) + + address = (await self.account.receiving.get_addresses(limit=1, only_usable=True))[0] + _, history = await self.ledger.get_local_status_and_history(address) + self.assertEqual(history, []) + + node = SPVNode(self.conductor.spv_module, node_number=2) + await node.start(self.blockchain, extraconf={"PAYMENT_ADDRESS": address, "DAILY_FEE": "1.1"}) + self.daemon.jsonrpc_settings_set('lbryum_servers', [f"{node.hostname}:{node.port}"]) + with self.assertLogs(level='WARNING') as cm: + await self.daemon.jsonrpc_wallet_reconnect() + + features = await self.ledger.network.get_server_features() + self.assertEqual(features["payment_address"], address) + self.assertEqual(features["daily_fee"], "1.1") + elapsed = 0 + while not cm.output: + await asyncio.sleep(0.1) + elapsed += 1 + if elapsed > 30: + raise TimeoutError('Nothing logged for 3 seconds.') + self.assertEqual( + cm.output, + ['WARNING:lbry.wallet.usage_payment:Server asked 1.1 LBC as daily fee, but ' + 'maximum allowed is 1.0 LBC. Skipping payment round.'] + ) + await node.stop(False) + await node.start(self.blockchain, extraconf={"PAYMENT_ADDRESS": address, "DAILY_FEE": "1.0"}) + self.addCleanup(node.stop) + self.daemon.jsonrpc_settings_set('lbryum_servers', [f"{node.hostname}:{node.port}"]) + await self.daemon.jsonrpc_wallet_reconnect() + features = await self.ledger.network.get_server_features() + self.assertEqual(features["payment_address"], address) + self.assertEqual(features["daily_fee"], "1.0") + await asyncio.wait_for(self.on_address_update(address), timeout=1) + _, history = await self.ledger.get_local_status_and_history(address) + txid, nout = history[0] + tx_details = await self.daemon.jsonrpc_transaction_show(txid) + self.assertEqual(tx_details.outputs[nout].amount, 100000000) + self.assertEqual(tx_details.outputs[nout].get_address(self.ledger), address) diff --git a/tests/integration/other/test_cli.py b/tests/integration/other/test_cli.py index b9e6c3d28..59b629747 100644 --- a/tests/integration/other/test_cli.py +++ b/tests/integration/other/test_cli.py @@ -7,7 +7,7 @@ from lbry.extras import cli from lbry.extras.daemon.components import ( DATABASE_COMPONENT, BLOB_COMPONENT, WALLET_COMPONENT, DHT_COMPONENT, HASH_ANNOUNCER_COMPONENT, STREAM_MANAGER_COMPONENT, PEER_PROTOCOL_SERVER_COMPONENT, - UPNP_COMPONENT, EXCHANGE_RATE_MANAGER_COMPONENT + UPNP_COMPONENT, EXCHANGE_RATE_MANAGER_COMPONENT, WALLET_SERVER_PAYMENTS_COMPONENT ) from lbry.extras.daemon.daemon import Daemon @@ -22,7 +22,7 @@ class CLIIntegrationTest(AsyncioTestCase): conf.components_to_skip = ( DATABASE_COMPONENT, BLOB_COMPONENT, WALLET_COMPONENT, DHT_COMPONENT, HASH_ANNOUNCER_COMPONENT, STREAM_MANAGER_COMPONENT, PEER_PROTOCOL_SERVER_COMPONENT, - UPNP_COMPONENT, EXCHANGE_RATE_MANAGER_COMPONENT + UPNP_COMPONENT, EXCHANGE_RATE_MANAGER_COMPONENT, WALLET_SERVER_PAYMENTS_COMPONENT ) Daemon.component_attributes = {} self.daemon = Daemon(conf) diff --git a/tests/unit/components/test_component_manager.py b/tests/unit/components/test_component_manager.py index f9d2463f5..d8d2ed5a9 100644 --- a/tests/unit/components/test_component_manager.py +++ b/tests/unit/components/test_component_manager.py @@ -27,6 +27,7 @@ class TestComponentManager(AsyncioTestCase): components.HashAnnouncerComponent, components.PeerProtocolServerComponent, components.StreamManagerComponent, + components.WalletServerPaymentsComponent ] ] self.component_manager = ComponentManager(Config())