From c2d717aba5315d3033fb58aa68bfece6f1dc421a Mon Sep 17 00:00:00 2001 From: Victor Shyba Date: Mon, 6 Jan 2020 01:29:48 -0300 Subject: [PATCH 01/10] old code from #2683 --- lbry/wallet/server/env.py | 2 +- lbry/wallet/server/session.py | 10 ++-- lbry/wallet/usage_payment.py | 50 +++++++++++++++++++ tests/integration/blockchain/test_network.py | 2 +- .../blockchain/test_wallet_server_sessions.py | 40 ++++++++++++++- 5 files changed, 96 insertions(+), 8 deletions(-) create mode 100644 lbry/wallet/usage_payment.py diff --git a/lbry/wallet/server/env.py b/lbry/wallet/server/env.py index 857b70803..9a06c8145 100644 --- a/lbry/wallet/server/env.py +++ b/lbry/wallet/server/env.py @@ -72,7 +72,7 @@ class Env: self.tor_proxy_host = self.default('TOR_PROXY_HOST', 'localhost') self.tor_proxy_port = self.integer('TOR_PROXY_PORT', None) # The electrum client takes the empty string as unspecified - self.donation_address = self.default('DONATION_ADDRESS', '') + self.payment_address = self.default('PAYMENT_ADDRESS', '') # Server limits to help prevent DoS self.max_send = self.integer('MAX_SEND', 1000000) self.max_subs = self.integer('MAX_SUBS', 250000) diff --git a/lbry/wallet/server/session.py b/lbry/wallet/server/session.py index dcc05f8e9..e8aca5cc9 100644 --- a/lbry/wallet/server/session.py +++ b/lbry/wallet/server/session.py @@ -854,7 +854,7 @@ class LBRYElectrumX(SessionBase): 'protocol_max': max_str, 'genesis_hash': env.coin.GENESIS_HASH, 'description': env.description, - 'payment_address': env.donation_address, + 'payment_address': env.payment_address, 'daily_fee': env.daily_fee, 'hash_function': 'sha256', } @@ -1362,14 +1362,14 @@ class LBRYElectrumX(SessionBase): ('$SERVER_VERSION', self.version), ('$DAEMON_VERSION', daemon_version), ('$DAEMON_SUBVERSION', network_info['subversion']), - ('$DONATION_ADDRESS', self.env.donation_address), + ('$PAYMENT_ADDRESS', self.env.payment_address), ]: banner = banner.replace(*pair) return banner - async def donation_address(self): - """Return the donation address as a string, empty if there is none.""" - return self.env.donation_address + async def payment_address(self): + """Return the payment address as a string, empty if there is none.""" + return self.env.payment_address async def banner(self): """Return the server banner text.""" diff --git a/lbry/wallet/usage_payment.py b/lbry/wallet/usage_payment.py new file mode 100644 index 000000000..563a0c5ea --- /dev/null +++ b/lbry/wallet/usage_payment.py @@ -0,0 +1,50 @@ +import asyncio +import logging + +from lbry.wallet import Wallet +from lbry.wallet.ledger import Ledger +from lbry.wallet.dewies import lbc_to_dewies +from lbry.wallet.transaction import Output, Transaction + +log = logging.getLogger(__name__) + + +class WalletServerPayer: + + def __init__(self, ledger: Ledger, wallet: Wallet): + self.ledger = ledger + self.wallet = wallet + self.running = False + self.task = None + + async def pay(self): + while self.running: + await asyncio.sleep(24 * 60 * 60) + features = await self.ledger.network.get_server_features() + address = features['payment_address'] + + if not self.ledger.is_valid_address(address): + raise Exception(f"Invalid address: {address}") + if self.wallet.is_locked: + raise Exception("Cannot spend funds with locked wallet") + + amount = lbc_to_dewies(features['daily_fee']) # check that this is in lbc and not dewies + # todo: check that amount is less than our max + + tx = await Transaction.create([], + [Output.pay_pubkey_hash(amount, self.ledger.address_to_hash160(address))], + self.wallet.get_accounts_or_all(None), + self.wallet.get_account_or_default(None)) + + await self.ledger.broadcast(tx) + await self.analytics_manager.send_credits_sent() + + async def start(self): + self.running = True + self.task = asyncio.ensure_future(self.pay()) + self.task.add_done_callback(lambda _: log.info("Stopping wallet server payments.")) + + async def stop(self): + if self.running: + self.running = False + self.task.cancel() diff --git a/tests/integration/blockchain/test_network.py b/tests/integration/blockchain/test_network.py index 2dea647ef..a4148a996 100644 --- a/tests/integration/blockchain/test_network.py +++ b/tests/integration/blockchain/test_network.py @@ -35,7 +35,7 @@ class NetworkTests(IntegrationTestCase): address = (await self.account.get_addresses(limit=1))[0] os.environ.update({ 'DESCRIPTION': 'Fastest server in the west.', - 'DONATION_ADDRESS': address, + 'PAYMENT_ADDRESS': address, 'DAILY_FEE': '42'}) await self.conductor.spv_node.start(self.conductor.blockchain_node) await self.ledger.network.on_connected.first diff --git a/tests/integration/blockchain/test_wallet_server_sessions.py b/tests/integration/blockchain/test_wallet_server_sessions.py index 59b44a450..8ed002165 100644 --- a/tests/integration/blockchain/test_wallet_server_sessions.py +++ b/tests/integration/blockchain/test_wallet_server_sessions.py @@ -3,7 +3,8 @@ import asyncio import lbry import lbry.wallet from lbry.wallet.network import ClientSession -from lbry.testcase import IntegrationTestCase +from lbry.testcase import IntegrationTestCase, CommandTestCase +from lbry.wallet.orchstr8.node import SPVNode class TestSessions(IntegrationTestCase): @@ -49,3 +50,40 @@ class TestSegwitServer(IntegrationTestCase): async def test_at_least_it_starts(self): await asyncio.wait_for(self.ledger.network.get_headers(0, 1), 1.0) + + +class TestUsagePayment(CommandTestCase): + + async def test_single_server_payment(self): + # create wallet server + # set payment address and fee rate on server + # connect to server + # fast forward 24 hours + # check that payment was sent to server + + address = (await self.account.receiving.get_addresses(limit=1, only_usable=True))[0] + + node = SPVNode(self.conductor.spv_module, node_number=2) + await node.start(self.blockchain, extraconf={"PAYMENT_ADDRESS": address, "DAILY_FEE": "1"}) + + self.ledger.network.config['default_servers'] = [(node.hostname, node.port)] + await self.ledger.stop() + await self.ledger.start() + + features = await self.ledger.network.get_server_features() + + + + pass + + # async def test_daily_payment(self): + # node2 = SPVNode(self.conductor.spv_module, node_number=2) + # self.ledger.network.config['default_servers'].append((node2.hostname, node2.port)) + # await asyncio.wait_for(self.ledger.stop(), timeout=1) + # await asyncio.wait_for(self.ledger.start(), timeout=1) + # self.ledger.network.session_pool.new_connection_event.clear() + # await node2.start(self.blockchain) + # # this is only to speed up the test as retrying would take 4+ seconds + # for session in self.ledger.network.session_pool.sessions: + # session.trigger_urgent_reconnect.set() + # await asyncio.wait_for(self.ledger.network.session_pool.new_connection_event.wait(), timeout=1) \ No newline at end of file From d317a4042c967aea44abf153ce58ab4f240073cb Mon Sep 17 00:00:00 2001 From: Victor Shyba Date: Mon, 6 Jan 2020 01:53:51 -0300 Subject: [PATCH 02/10] change daily_fee to be lbc instead of dewies --- lbry/wallet/server/env.py | 10 +++++++++- tests/integration/blockchain/test_network.py | 4 ++-- 2 files changed, 11 insertions(+), 3 deletions(-) diff --git a/lbry/wallet/server/env.py b/lbry/wallet/server/env.py index 9a06c8145..a8c2a9ae8 100644 --- a/lbry/wallet/server/env.py +++ b/lbry/wallet/server/env.py @@ -81,7 +81,7 @@ class Env: self.session_timeout = self.integer('SESSION_TIMEOUT', 600) self.drop_client = self.custom("DROP_CLIENT", None, re.compile) self.description = self.default('DESCRIPTION', '') - self.daily_fee = self.integer('DAILY_FEE', 0) + self.daily_fee = self.string_amount('DAILY_FEE', '0') # Identities clearnet_identity = self.clearnet_identity() @@ -107,6 +107,14 @@ class Env: raise cls.Error(f'required envvar {envvar} not set') return value + @classmethod + def string_amount(cls, envvar, default): + value = environ.get(envvar, default) + amount_pattern = re.compile("[0-9]{0,10}(\.[0-9]{1,8})?") + if len(value) > 0 and not amount_pattern.fullmatch(value): + raise cls.Error(f'{value} is not a valid amount for {envvar}') + return value + @classmethod def integer(cls, envvar, default): value = environ.get(envvar) diff --git a/tests/integration/blockchain/test_network.py b/tests/integration/blockchain/test_network.py index a4148a996..113c9a474 100644 --- a/tests/integration/blockchain/test_network.py +++ b/tests/integration/blockchain/test_network.py @@ -29,7 +29,7 @@ class NetworkTests(IntegrationTestCase): 'pruning': None, 'description': '', 'payment_address': '', - 'daily_fee': 0, + 'daily_fee': '0', 'server_version': lbry.__version__}, await self.ledger.network.get_server_features()) await self.conductor.spv_node.stop() address = (await self.account.get_addresses(limit=1))[0] @@ -48,7 +48,7 @@ class NetworkTests(IntegrationTestCase): 'pruning': None, 'description': 'Fastest server in the west.', 'payment_address': address, - 'daily_fee': 42, + 'daily_fee': '42', 'server_version': lbry.__version__}, await self.ledger.network.get_server_features()) From 15a2f048acd96f845367ba58a007b9288ac45fa8 Mon Sep 17 00:00:00 2001 From: Victor Shyba Date: Mon, 6 Jan 2020 05:07:37 -0300 Subject: [PATCH 03/10] pay wallet server test and initial prototype --- lbry/wallet/manager.py | 5 ++ lbry/wallet/usage_payment.py | 8 ++- .../blockchain/test_wallet_server_sessions.py | 50 +++++++++---------- 3 files changed, 35 insertions(+), 28 deletions(-) diff --git a/lbry/wallet/manager.py b/lbry/wallet/manager.py index 6e8cc5db1..ca3dff4fc 100644 --- a/lbry/wallet/manager.py +++ b/lbry/wallet/manager.py @@ -15,6 +15,7 @@ from .account import Account from .ledger import Ledger, LedgerRegistry from .transaction import Transaction, Output from .database import Database +from .usage_payment import WalletServerPayer from .wallet import Wallet, WalletStorage, ENCRYPT_ON_DISK from .rpc.jsonrpc import CodeMessageError @@ -33,6 +34,7 @@ class WalletManager: self.ledgers = ledgers or {} self.running = False self.config: Optional[Config] = None + self.usage_payment_service: Optional[WalletManager] = None @classmethod def from_config(cls, config: dict) -> 'WalletManager': @@ -79,11 +81,14 @@ class WalletManager: await asyncio.gather(*( l.start() for l in self.ledgers.values() )) + self.usage_payment_service = WalletServerPayer(self.ledger, self.default_wallet) + await self.usage_payment_service.start() async def stop(self): await asyncio.gather(*( l.stop() for l in self.ledgers.values() )) + await self.usage_payment_service.stop() self.running = False def get_wallet_or_default(self, wallet_id: Optional[str]) -> Wallet: diff --git a/lbry/wallet/usage_payment.py b/lbry/wallet/usage_payment.py index 563a0c5ea..3a4f25fec 100644 --- a/lbry/wallet/usage_payment.py +++ b/lbry/wallet/usage_payment.py @@ -10,6 +10,7 @@ log = logging.getLogger(__name__) class WalletServerPayer: + PAYMENT_PERIOD = 24 * 60 * 60 def __init__(self, ledger: Ledger, wallet: Wallet): self.ledger = ledger @@ -19,9 +20,12 @@ class WalletServerPayer: async def pay(self): while self.running: - await asyncio.sleep(24 * 60 * 60) + await asyncio.sleep(self.PAYMENT_PERIOD) features = await self.ledger.network.get_server_features() address = features['payment_address'] + amount = str(features['daily_fee']) + if not address or not amount: + continue if not self.ledger.is_valid_address(address): raise Exception(f"Invalid address: {address}") @@ -37,7 +41,7 @@ class WalletServerPayer: self.wallet.get_account_or_default(None)) await self.ledger.broadcast(tx) - await self.analytics_manager.send_credits_sent() + # await self.analytics_manager.send_credits_sent() fixme: handle that async def start(self): self.running = True diff --git a/tests/integration/blockchain/test_wallet_server_sessions.py b/tests/integration/blockchain/test_wallet_server_sessions.py index 8ed002165..8b3c0025e 100644 --- a/tests/integration/blockchain/test_wallet_server_sessions.py +++ b/tests/integration/blockchain/test_wallet_server_sessions.py @@ -3,8 +3,9 @@ import asyncio import lbry import lbry.wallet from lbry.wallet.network import ClientSession -from lbry.testcase import IntegrationTestCase, CommandTestCase +from lbry.testcase import IntegrationTestCase, CommandTestCase, AdvanceTimeTestCase from lbry.wallet.orchstr8.node import SPVNode +from lbry.wallet.usage_payment import WalletServerPayer class TestSessions(IntegrationTestCase): @@ -53,37 +54,34 @@ class TestSegwitServer(IntegrationTestCase): class TestUsagePayment(CommandTestCase): + VERBOSITY = 'DEBUG' + LEDGER = lbry.wallet + + def setUp(self) -> None: + WalletServerPayer.PAYMENT_PERIOD = 1 + + def tearDown(self) -> None: + WalletServerPayer.PAYMENT_PERIOD = 24 * 60 * 60 async def test_single_server_payment(self): - # create wallet server - # set payment address and fee rate on server - # connect to server - # fast forward 24 hours - # check that payment was sent to server - address = (await self.account.receiving.get_addresses(limit=1, only_usable=True))[0] + _, history = await self.ledger.get_local_status_and_history(address) + self.assertEqual(history, []) node = SPVNode(self.conductor.spv_module, node_number=2) - await node.start(self.blockchain, extraconf={"PAYMENT_ADDRESS": address, "DAILY_FEE": "1"}) - - self.ledger.network.config['default_servers'] = [(node.hostname, node.port)] - await self.ledger.stop() - await self.ledger.start() + await node.start(self.blockchain, extraconf={"PAYMENT_ADDRESS": address, "DAILY_FEE": "1.1"}) + self.daemon.jsonrpc_settings_set('lbryum_servers', [f"{node.hostname}:{node.port}"]) + await self.daemon.jsonrpc_wallet_reconnect() features = await self.ledger.network.get_server_features() + self.assertEqual(features["payment_address"], address) + self.assertEqual(features["daily_fee"], "1.1") + await asyncio.sleep(1) # fixme: wait on something better - - pass - - # async def test_daily_payment(self): - # node2 = SPVNode(self.conductor.spv_module, node_number=2) - # self.ledger.network.config['default_servers'].append((node2.hostname, node2.port)) - # await asyncio.wait_for(self.ledger.stop(), timeout=1) - # await asyncio.wait_for(self.ledger.start(), timeout=1) - # self.ledger.network.session_pool.new_connection_event.clear() - # await node2.start(self.blockchain) - # # this is only to speed up the test as retrying would take 4+ seconds - # for session in self.ledger.network.session_pool.sessions: - # session.trigger_urgent_reconnect.set() - # await asyncio.wait_for(self.ledger.network.session_pool.new_connection_event.wait(), timeout=1) \ No newline at end of file + _, history = await self.ledger.get_local_status_and_history(address) + self.assertNotEqual(history, []) + txid, nout = history[0] + tx_details = await self.daemon.jsonrpc_transaction_show(txid) + self.assertEqual(tx_details.outputs[nout].amount, 110000000) + self.assertEqual(tx_details.outputs[nout].get_address(self.ledger), address) From 200531dd96a84e79f84a628041dcbf4c75b860d4 Mon Sep 17 00:00:00 2001 From: Victor Shyba Date: Wed, 8 Jan 2020 13:09:16 -0300 Subject: [PATCH 04/10] wait for tx instead --- lbry/testcase.py | 5 +++++ tests/integration/blockchain/test_wallet_server_sessions.py | 4 +--- 2 files changed, 6 insertions(+), 3 deletions(-) diff --git a/lbry/testcase.py b/lbry/testcase.py index 2f9989076..39118c953 100644 --- a/lbry/testcase.py +++ b/lbry/testcase.py @@ -253,6 +253,11 @@ class IntegrationTestCase(AsyncioTestCase): lambda e: e.tx.id == txid ) + def on_address_update(self, address): + return self.ledger.on_transaction.where( + lambda e: e.address == address + ) + def on_transaction_address(self, tx, address): return self.ledger.on_transaction.where( lambda e: e.tx.id == tx.id and e.address == address diff --git a/tests/integration/blockchain/test_wallet_server_sessions.py b/tests/integration/blockchain/test_wallet_server_sessions.py index 8b3c0025e..a39ac916f 100644 --- a/tests/integration/blockchain/test_wallet_server_sessions.py +++ b/tests/integration/blockchain/test_wallet_server_sessions.py @@ -77,10 +77,8 @@ class TestUsagePayment(CommandTestCase): self.assertEqual(features["payment_address"], address) self.assertEqual(features["daily_fee"], "1.1") - await asyncio.sleep(1) # fixme: wait on something better - + await self.on_address_update(address) _, history = await self.ledger.get_local_status_and_history(address) - self.assertNotEqual(history, []) txid, nout = history[0] tx_details = await self.daemon.jsonrpc_transaction_show(txid) self.assertEqual(tx_details.outputs[nout].amount, 110000000) From 5a6218eeca8900d56c373fc988891acaed7aa2bf Mon Sep 17 00:00:00 2001 From: Victor Shyba Date: Fri, 10 Jan 2020 12:32:32 -0300 Subject: [PATCH 05/10] improve tests, fix types --- lbry/wallet/manager.py | 2 +- tests/integration/blockchain/test_wallet_server_sessions.py | 6 +++--- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/lbry/wallet/manager.py b/lbry/wallet/manager.py index ca3dff4fc..d833884ee 100644 --- a/lbry/wallet/manager.py +++ b/lbry/wallet/manager.py @@ -34,7 +34,7 @@ class WalletManager: self.ledgers = ledgers or {} self.running = False self.config: Optional[Config] = None - self.usage_payment_service: Optional[WalletManager] = None + self.usage_payment_service: Optional[WalletServerPayer] = None @classmethod def from_config(cls, config: dict) -> 'WalletManager': diff --git a/tests/integration/blockchain/test_wallet_server_sessions.py b/tests/integration/blockchain/test_wallet_server_sessions.py index a39ac916f..9c1a3a513 100644 --- a/tests/integration/blockchain/test_wallet_server_sessions.py +++ b/tests/integration/blockchain/test_wallet_server_sessions.py @@ -54,7 +54,6 @@ class TestSegwitServer(IntegrationTestCase): class TestUsagePayment(CommandTestCase): - VERBOSITY = 'DEBUG' LEDGER = lbry.wallet def setUp(self) -> None: @@ -77,8 +76,9 @@ class TestUsagePayment(CommandTestCase): self.assertEqual(features["payment_address"], address) self.assertEqual(features["daily_fee"], "1.1") - await self.on_address_update(address) - _, history = await self.ledger.get_local_status_and_history(address) + if len(history) == 0: + await self.on_address_update(address) + _, history = await self.ledger.get_local_status_and_history(address) txid, nout = history[0] tx_details = await self.daemon.jsonrpc_transaction_show(txid) self.assertEqual(tx_details.outputs[nout].amount, 110000000) From ffa5c20c8872d2df5dbbdbc4b1c43a7ca157a8df Mon Sep 17 00:00:00 2001 From: Victor Shyba Date: Sun, 12 Jan 2020 00:48:46 -0300 Subject: [PATCH 06/10] fix hanging test --- lbry/wallet/orchstr8/node.py | 3 +++ tests/integration/blockchain/test_network.py | 14 ++++++++------ .../blockchain/test_wallet_server_sessions.py | 1 + 3 files changed, 12 insertions(+), 6 deletions(-) diff --git a/lbry/wallet/orchstr8/node.py b/lbry/wallet/orchstr8/node.py index 44a190814..a94a7bc0a 100644 --- a/lbry/wallet/orchstr8/node.py +++ b/lbry/wallet/orchstr8/node.py @@ -173,6 +173,9 @@ class SPVNode: async def start(self, blockchain_node: 'BlockchainNode', extraconf=None): self.data_path = tempfile.mkdtemp() conf = { + 'DESCRIPTION': '', + 'PAYMENT_ADDRESS': '', + 'DAILY_FEE': '0', 'DB_DIRECTORY': self.data_path, 'DAEMON_URL': blockchain_node.rpc_url, 'REORG_LIMIT': '100', diff --git a/tests/integration/blockchain/test_network.py b/tests/integration/blockchain/test_network.py index 113c9a474..fb119bf5e 100644 --- a/tests/integration/blockchain/test_network.py +++ b/tests/integration/blockchain/test_network.py @@ -1,5 +1,4 @@ import logging -import os import asyncio import lbry @@ -33,11 +32,14 @@ class NetworkTests(IntegrationTestCase): 'server_version': lbry.__version__}, await self.ledger.network.get_server_features()) await self.conductor.spv_node.stop() address = (await self.account.get_addresses(limit=1))[0] - os.environ.update({ - 'DESCRIPTION': 'Fastest server in the west.', - 'PAYMENT_ADDRESS': address, - 'DAILY_FEE': '42'}) - await self.conductor.spv_node.start(self.conductor.blockchain_node) + await self.conductor.spv_node.start( + self.conductor.blockchain_node, + extraconf={ + 'DESCRIPTION': 'Fastest server in the west.', + 'PAYMENT_ADDRESS': address, + 'DAILY_FEE': '42' + } + ) await self.ledger.network.on_connected.first self.assertDictEqual({ 'genesis_hash': self.conductor.spv_node.coin_class.GENESIS_HASH, diff --git a/tests/integration/blockchain/test_wallet_server_sessions.py b/tests/integration/blockchain/test_wallet_server_sessions.py index 9c1a3a513..18c7b9504 100644 --- a/tests/integration/blockchain/test_wallet_server_sessions.py +++ b/tests/integration/blockchain/test_wallet_server_sessions.py @@ -69,6 +69,7 @@ class TestUsagePayment(CommandTestCase): node = SPVNode(self.conductor.spv_module, node_number=2) await node.start(self.blockchain, extraconf={"PAYMENT_ADDRESS": address, "DAILY_FEE": "1.1"}) + self.addCleanup(node.stop) self.daemon.jsonrpc_settings_set('lbryum_servers', [f"{node.hostname}:{node.port}"]) await self.daemon.jsonrpc_wallet_reconnect() From ad6c6fbe354209d2f38c89ec21bcb45637df66d4 Mon Sep 17 00:00:00 2001 From: Victor Shyba Date: Tue, 11 Feb 2020 22:05:49 -0300 Subject: [PATCH 07/10] fixes from review and add analytics --- lbry/extras/daemon/components.py | 1 + lbry/wallet/manager.py | 5 +-- lbry/wallet/server/session.py | 2 +- lbry/wallet/usage_payment.py | 37 +++++++++++-------- .../blockchain/test_wallet_server_sessions.py | 14 +++---- 5 files changed, 30 insertions(+), 29 deletions(-) diff --git a/lbry/extras/daemon/components.py b/lbry/extras/daemon/components.py index 2af376904..55e2c30bd 100644 --- a/lbry/extras/daemon/components.py +++ b/lbry/extras/daemon/components.py @@ -154,6 +154,7 @@ class WalletComponent(Component): log.info("Starting wallet") self.wallet_manager = await WalletManager.from_lbrynet_config(self.conf) await self.wallet_manager.start() + self.wallet_manager.usage_payment_service.analytics_manager = self.component_manager.analytics_manager async def stop(self): await self.wallet_manager.stop() diff --git a/lbry/wallet/manager.py b/lbry/wallet/manager.py index d833884ee..5005f3732 100644 --- a/lbry/wallet/manager.py +++ b/lbry/wallet/manager.py @@ -34,7 +34,7 @@ class WalletManager: self.ledgers = ledgers or {} self.running = False self.config: Optional[Config] = None - self.usage_payment_service: Optional[WalletServerPayer] = None + self.usage_payment_service = WalletServerPayer() @classmethod def from_config(cls, config: dict) -> 'WalletManager': @@ -81,8 +81,7 @@ class WalletManager: await asyncio.gather(*( l.start() for l in self.ledgers.values() )) - self.usage_payment_service = WalletServerPayer(self.ledger, self.default_wallet) - await self.usage_payment_service.start() + await self.usage_payment_service.start(self.ledger, self.default_wallet) async def stop(self): await asyncio.gather(*( diff --git a/lbry/wallet/server/session.py b/lbry/wallet/server/session.py index e8aca5cc9..40e2fc032 100644 --- a/lbry/wallet/server/session.py +++ b/lbry/wallet/server/session.py @@ -800,7 +800,7 @@ class LBRYElectrumX(SessionBase): 'blockchain.transaction.get_merkle': cls.transaction_merkle, 'server.add_peer': cls.add_peer, 'server.banner': cls.banner, - 'server.donation_address': cls.donation_address, + 'server.payment_address': cls.payment_address, 'server.features': cls.server_features_async, 'server.peers.subscribe': cls.peers_subscribe, 'server.version': cls.server_version, diff --git a/lbry/wallet/usage_payment.py b/lbry/wallet/usage_payment.py index 3a4f25fec..d480fd566 100644 --- a/lbry/wallet/usage_payment.py +++ b/lbry/wallet/usage_payment.py @@ -1,8 +1,6 @@ import asyncio import logging -from lbry.wallet import Wallet -from lbry.wallet.ledger import Ledger from lbry.wallet.dewies import lbc_to_dewies from lbry.wallet.transaction import Output, Transaction @@ -10,17 +8,17 @@ log = logging.getLogger(__name__) class WalletServerPayer: - PAYMENT_PERIOD = 24 * 60 * 60 - - def __init__(self, ledger: Ledger, wallet: Wallet): - self.ledger = ledger - self.wallet = wallet + def __init__(self, payment_period=24 * 60 * 60): + self.ledger = None + self.wallet = None self.running = False self.task = None + self.payment_period = payment_period + self.analytics_manager = None async def pay(self): while self.running: - await asyncio.sleep(self.PAYMENT_PERIOD) + await asyncio.sleep(self.payment_period) features = await self.ledger.network.get_server_features() address = features['payment_address'] amount = str(features['daily_fee']) @@ -28,22 +26,29 @@ class WalletServerPayer: continue if not self.ledger.is_valid_address(address): - raise Exception(f"Invalid address: {address}") + log.warning("Invalid address from wallet server: '%s' - skipping payment round.", address) + continue if self.wallet.is_locked: - raise Exception("Cannot spend funds with locked wallet") + log.warning("Cannot spend funds with locked wallet, skipping payment round.") + continue amount = lbc_to_dewies(features['daily_fee']) # check that this is in lbc and not dewies # todo: check that amount is less than our max - tx = await Transaction.create([], - [Output.pay_pubkey_hash(amount, self.ledger.address_to_hash160(address))], - self.wallet.get_accounts_or_all(None), - self.wallet.get_account_or_default(None)) + tx = await Transaction.create( + [], + [Output.pay_pubkey_hash(amount, self.ledger.address_to_hash160(address))], + self.wallet.get_accounts_or_all(None), + self.wallet.get_account_or_default(None) + ) await self.ledger.broadcast(tx) - # await self.analytics_manager.send_credits_sent() fixme: handle that + if self.analytics_manager: + await self.analytics_manager.send_credits_sent() - async def start(self): + async def start(self, ledger, default_wallet): + self.ledger = ledger + self.wallet = default_wallet self.running = True self.task = asyncio.ensure_future(self.pay()) self.task.add_done_callback(lambda _: log.info("Stopping wallet server payments.")) diff --git a/tests/integration/blockchain/test_wallet_server_sessions.py b/tests/integration/blockchain/test_wallet_server_sessions.py index 18c7b9504..460fac052 100644 --- a/tests/integration/blockchain/test_wallet_server_sessions.py +++ b/tests/integration/blockchain/test_wallet_server_sessions.py @@ -54,15 +54,11 @@ class TestSegwitServer(IntegrationTestCase): class TestUsagePayment(CommandTestCase): - LEDGER = lbry.wallet - - def setUp(self) -> None: - WalletServerPayer.PAYMENT_PERIOD = 1 - - def tearDown(self) -> None: - WalletServerPayer.PAYMENT_PERIOD = 24 * 60 * 60 - async def test_single_server_payment(self): + self.manager.usage_payment_service.payment_period = 1 + await self.manager.usage_payment_service.stop() + await self.manager.usage_payment_service.start(self.ledger, self.wallet) + address = (await self.account.receiving.get_addresses(limit=1, only_usable=True))[0] _, history = await self.ledger.get_local_status_and_history(address) self.assertEqual(history, []) @@ -78,7 +74,7 @@ class TestUsagePayment(CommandTestCase): self.assertEqual(features["daily_fee"], "1.1") if len(history) == 0: - await self.on_address_update(address) + await asyncio.wait_for(self.on_address_update(address), timeout=1) _, history = await self.ledger.get_local_status_and_history(address) txid, nout = history[0] tx_details = await self.daemon.jsonrpc_transaction_show(txid) From ae9ba14b595c728b72bf2da129604a986d114004 Mon Sep 17 00:00:00 2001 From: Victor Shyba Date: Tue, 11 Feb 2020 22:19:13 -0300 Subject: [PATCH 08/10] use both donation and payment addresses separately --- lbry/wallet/server/env.py | 1 + lbry/wallet/server/session.py | 7 +++++++ tests/integration/blockchain/test_network.py | 9 ++++++--- 3 files changed, 14 insertions(+), 3 deletions(-) diff --git a/lbry/wallet/server/env.py b/lbry/wallet/server/env.py index a8c2a9ae8..80b4c3e1f 100644 --- a/lbry/wallet/server/env.py +++ b/lbry/wallet/server/env.py @@ -73,6 +73,7 @@ class Env: self.tor_proxy_port = self.integer('TOR_PROXY_PORT', None) # The electrum client takes the empty string as unspecified self.payment_address = self.default('PAYMENT_ADDRESS', '') + self.donation_address = self.default('DONATION_ADDRESS', '') # Server limits to help prevent DoS self.max_send = self.integer('MAX_SEND', 1000000) self.max_subs = self.integer('MAX_SUBS', 250000) diff --git a/lbry/wallet/server/session.py b/lbry/wallet/server/session.py index 40e2fc032..3eee3f0f5 100644 --- a/lbry/wallet/server/session.py +++ b/lbry/wallet/server/session.py @@ -801,6 +801,7 @@ class LBRYElectrumX(SessionBase): 'server.add_peer': cls.add_peer, 'server.banner': cls.banner, 'server.payment_address': cls.payment_address, + 'server.donation_address': cls.donation_address, 'server.features': cls.server_features_async, 'server.peers.subscribe': cls.peers_subscribe, 'server.version': cls.server_version, @@ -855,6 +856,7 @@ class LBRYElectrumX(SessionBase): 'genesis_hash': env.coin.GENESIS_HASH, 'description': env.description, 'payment_address': env.payment_address, + 'donation_address': env.donation_address, 'daily_fee': env.daily_fee, 'hash_function': 'sha256', } @@ -1363,6 +1365,7 @@ class LBRYElectrumX(SessionBase): ('$DAEMON_VERSION', daemon_version), ('$DAEMON_SUBVERSION', network_info['subversion']), ('$PAYMENT_ADDRESS', self.env.payment_address), + ('$DONATION_ADDRESS', self.env.donation_address), ]: banner = banner.replace(*pair) return banner @@ -1371,6 +1374,10 @@ class LBRYElectrumX(SessionBase): """Return the payment address as a string, empty if there is none.""" return self.env.payment_address + async def donation_address(self): + """Return the donation address as a string, empty if there is none.""" + return self.env.donation_address + async def banner(self): """Return the server banner text.""" banner = f'You are connected to an {self.version} server.' diff --git a/tests/integration/blockchain/test_network.py b/tests/integration/blockchain/test_network.py index fb119bf5e..6f87cdc37 100644 --- a/tests/integration/blockchain/test_network.py +++ b/tests/integration/blockchain/test_network.py @@ -28,15 +28,17 @@ class NetworkTests(IntegrationTestCase): 'pruning': None, 'description': '', 'payment_address': '', + 'donation_address': '', 'daily_fee': '0', 'server_version': lbry.__version__}, await self.ledger.network.get_server_features()) await self.conductor.spv_node.stop() - address = (await self.account.get_addresses(limit=1))[0] + payment_address, donation_address = await self.account.get_addresses(limit=2) await self.conductor.spv_node.start( self.conductor.blockchain_node, extraconf={ 'DESCRIPTION': 'Fastest server in the west.', - 'PAYMENT_ADDRESS': address, + 'PAYMENT_ADDRESS': payment_address, + 'DONATION_ADDRESS': donation_address, 'DAILY_FEE': '42' } ) @@ -49,7 +51,8 @@ class NetworkTests(IntegrationTestCase): 'protocol_min': '0.54.0', 'pruning': None, 'description': 'Fastest server in the west.', - 'payment_address': address, + 'payment_address': payment_address, + 'donation_address': donation_address, 'daily_fee': '42', 'server_version': lbry.__version__}, await self.ledger.network.get_server_features()) From 3950715237fa4434b3c77db2bfcc2e90c500838a Mon Sep 17 00:00:00 2001 From: Victor Shyba Date: Mon, 17 Feb 2020 13:35:46 -0300 Subject: [PATCH 09/10] add max_wallet_server_fee conf to limit daily wallet server payments --- lbry/conf.py | 1 + lbry/wallet/manager.py | 4 ++- lbry/wallet/usage_payment.py | 14 +++++++-- .../blockchain/test_wallet_server_sessions.py | 31 ++++++++++++++----- 4 files changed, 39 insertions(+), 11 deletions(-) diff --git a/lbry/conf.py b/lbry/conf.py index da2f45f85..c204a49f5 100644 --- a/lbry/conf.py +++ b/lbry/conf.py @@ -558,6 +558,7 @@ class Config(CLIConfig): "Don't download streams with fees exceeding this amount. When set to " "null, the amount is unbounded.", {'currency': 'USD', 'amount': 50.0} ) + max_wallet_server_fee = String("Maximum daily LBC amount allowed as payment for wallet servers.", "1.0") # reflector settings reflect_streams = Toggle( diff --git a/lbry/wallet/manager.py b/lbry/wallet/manager.py index 5005f3732..5e374051c 100644 --- a/lbry/wallet/manager.py +++ b/lbry/wallet/manager.py @@ -81,7 +81,9 @@ class WalletManager: await asyncio.gather(*( l.start() for l in self.ledgers.values() )) - await self.usage_payment_service.start(self.ledger, self.default_wallet) + await self.usage_payment_service.start( + self.ledger, self.default_wallet, self.config.max_wallet_server_fee if self.config else None + ) async def stop(self): await asyncio.gather(*( diff --git a/lbry/wallet/usage_payment.py b/lbry/wallet/usage_payment.py index d480fd566..b5f7130bf 100644 --- a/lbry/wallet/usage_payment.py +++ b/lbry/wallet/usage_payment.py @@ -8,13 +8,14 @@ log = logging.getLogger(__name__) class WalletServerPayer: - def __init__(self, payment_period=24 * 60 * 60): + def __init__(self, payment_period=24 * 60 * 60, max_fee='1.0'): self.ledger = None self.wallet = None self.running = False self.task = None self.payment_period = payment_period self.analytics_manager = None + self.max_fee = max_fee async def pay(self): while self.running: @@ -33,7 +34,13 @@ class WalletServerPayer: continue amount = lbc_to_dewies(features['daily_fee']) # check that this is in lbc and not dewies - # todo: check that amount is less than our max + limit = lbc_to_dewies(self.max_fee) + if amount > limit: + log.warning( + "Server asked %s LBC as daily fee, but maximum allowed is %s LBC. Skipping payment round.", + features['daily_fee'], self.max_fee + ) + continue tx = await Transaction.create( [], @@ -46,7 +53,8 @@ class WalletServerPayer: if self.analytics_manager: await self.analytics_manager.send_credits_sent() - async def start(self, ledger, default_wallet): + async def start(self, ledger, default_wallet, max_fee=None): + self.max_fee = max_fee or self.max_fee self.ledger = ledger self.wallet = default_wallet self.running = True diff --git a/tests/integration/blockchain/test_wallet_server_sessions.py b/tests/integration/blockchain/test_wallet_server_sessions.py index 460fac052..233cbd2b5 100644 --- a/tests/integration/blockchain/test_wallet_server_sessions.py +++ b/tests/integration/blockchain/test_wallet_server_sessions.py @@ -65,18 +65,35 @@ class TestUsagePayment(CommandTestCase): node = SPVNode(self.conductor.spv_module, node_number=2) await node.start(self.blockchain, extraconf={"PAYMENT_ADDRESS": address, "DAILY_FEE": "1.1"}) + self.daemon.jsonrpc_settings_set('lbryum_servers', [f"{node.hostname}:{node.port}"]) + with self.assertLogs(level='WARNING') as cm: + await self.daemon.jsonrpc_wallet_reconnect() + + features = await self.ledger.network.get_server_features() + self.assertEqual(features["payment_address"], address) + self.assertEqual(features["daily_fee"], "1.1") + elapsed = 0 + while not cm.output: + await asyncio.sleep(0.1) + elapsed += 1 + if elapsed > 30: + raise TimeoutError('Nothing logged for 3 seconds.') + self.assertEqual( + cm.output, + ['WARNING:lbry.wallet.usage_payment:Server asked 1.1 LBC as daily fee, but ' + 'maximum allowed is 1.0 LBC. Skipping payment round.'] + ) + await node.stop(False) + await node.start(self.blockchain, extraconf={"PAYMENT_ADDRESS": address, "DAILY_FEE": "1.0"}) self.addCleanup(node.stop) self.daemon.jsonrpc_settings_set('lbryum_servers', [f"{node.hostname}:{node.port}"]) await self.daemon.jsonrpc_wallet_reconnect() - features = await self.ledger.network.get_server_features() self.assertEqual(features["payment_address"], address) - self.assertEqual(features["daily_fee"], "1.1") - - if len(history) == 0: - await asyncio.wait_for(self.on_address_update(address), timeout=1) - _, history = await self.ledger.get_local_status_and_history(address) + self.assertEqual(features["daily_fee"], "1.0") + await asyncio.wait_for(self.on_address_update(address), timeout=1) + _, history = await self.ledger.get_local_status_and_history(address) txid, nout = history[0] tx_details = await self.daemon.jsonrpc_transaction_show(txid) - self.assertEqual(tx_details.outputs[nout].amount, 110000000) + self.assertEqual(tx_details.outputs[nout].amount, 100000000) self.assertEqual(tx_details.outputs[nout].get_address(self.ledger), address) From f0e1db319ce52f9aab0237e85e6ad18e3737764c Mon Sep 17 00:00:00 2001 From: Victor Shyba Date: Tue, 18 Feb 2020 13:50:41 -0300 Subject: [PATCH 10/10] make wallet server payments a component --- lbry/extras/daemon/components.py | 31 ++++++++++++++++++- lbry/wallet/manager.py | 6 ---- lbry/wallet/usage_payment.py | 9 +++--- .../blockchain/test_wallet_server_sessions.py | 7 +++-- tests/integration/other/test_cli.py | 4 +-- .../unit/components/test_component_manager.py | 1 + 6 files changed, 41 insertions(+), 17 deletions(-) diff --git a/lbry/extras/daemon/components.py b/lbry/extras/daemon/components.py index 55e2c30bd..fe045a6c2 100644 --- a/lbry/extras/daemon/components.py +++ b/lbry/extras/daemon/components.py @@ -21,6 +21,7 @@ from lbry.extras.daemon.component import Component from lbry.extras.daemon.exchange_rate_manager import ExchangeRateManager from lbry.extras.daemon.storage import SQLiteStorage from lbry.wallet import WalletManager +from lbry.wallet.usage_payment import WalletServerPayer log = logging.getLogger(__name__) @@ -29,6 +30,7 @@ log = logging.getLogger(__name__) DATABASE_COMPONENT = "database" BLOB_COMPONENT = "blob_manager" WALLET_COMPONENT = "wallet" +WALLET_SERVER_PAYMENTS_COMPONENT = "wallet_server_payments" DHT_COMPONENT = "dht" HASH_ANNOUNCER_COMPONENT = "hash_announcer" STREAM_MANAGER_COMPONENT = "stream_manager" @@ -154,13 +156,40 @@ class WalletComponent(Component): log.info("Starting wallet") self.wallet_manager = await WalletManager.from_lbrynet_config(self.conf) await self.wallet_manager.start() - self.wallet_manager.usage_payment_service.analytics_manager = self.component_manager.analytics_manager async def stop(self): await self.wallet_manager.stop() self.wallet_manager = None +class WalletServerPaymentsComponent(Component): + component_name = WALLET_SERVER_PAYMENTS_COMPONENT + depends_on = [WALLET_COMPONENT] + + def __init__(self, component_manager): + super().__init__(component_manager) + self.usage_payment_service = WalletServerPayer( + max_fee=self.conf.max_wallet_server_fee, analytics_manager=self.component_manager.analytics_manager, + ) + + @property + def component(self) -> typing.Optional[WalletServerPayer]: + return self.usage_payment_service + + async def start(self): + wallet_manager = self.component_manager.get_component(WALLET_COMPONENT) + await self.usage_payment_service.start(wallet_manager.ledger, wallet_manager.default_wallet) + + async def stop(self): + await self.usage_payment_service.stop() + + async def get_status(self): + return { + 'max_fee': self.usage_payment_service.max_fee, + 'running': self.usage_payment_service.running + } + + class BlobComponent(Component): component_name = BLOB_COMPONENT depends_on = [DATABASE_COMPONENT] diff --git a/lbry/wallet/manager.py b/lbry/wallet/manager.py index 5e374051c..6e8cc5db1 100644 --- a/lbry/wallet/manager.py +++ b/lbry/wallet/manager.py @@ -15,7 +15,6 @@ from .account import Account from .ledger import Ledger, LedgerRegistry from .transaction import Transaction, Output from .database import Database -from .usage_payment import WalletServerPayer from .wallet import Wallet, WalletStorage, ENCRYPT_ON_DISK from .rpc.jsonrpc import CodeMessageError @@ -34,7 +33,6 @@ class WalletManager: self.ledgers = ledgers or {} self.running = False self.config: Optional[Config] = None - self.usage_payment_service = WalletServerPayer() @classmethod def from_config(cls, config: dict) -> 'WalletManager': @@ -81,15 +79,11 @@ class WalletManager: await asyncio.gather(*( l.start() for l in self.ledgers.values() )) - await self.usage_payment_service.start( - self.ledger, self.default_wallet, self.config.max_wallet_server_fee if self.config else None - ) async def stop(self): await asyncio.gather(*( l.stop() for l in self.ledgers.values() )) - await self.usage_payment_service.stop() self.running = False def get_wallet_or_default(self, wallet_id: Optional[str]) -> Wallet: diff --git a/lbry/wallet/usage_payment.py b/lbry/wallet/usage_payment.py index b5f7130bf..167f10ed6 100644 --- a/lbry/wallet/usage_payment.py +++ b/lbry/wallet/usage_payment.py @@ -8,13 +8,13 @@ log = logging.getLogger(__name__) class WalletServerPayer: - def __init__(self, payment_period=24 * 60 * 60, max_fee='1.0'): + def __init__(self, payment_period=24 * 60 * 60, max_fee='1.0', analytics_manager=None): self.ledger = None self.wallet = None self.running = False self.task = None self.payment_period = payment_period - self.analytics_manager = None + self.analytics_manager = analytics_manager self.max_fee = max_fee async def pay(self): @@ -53,10 +53,9 @@ class WalletServerPayer: if self.analytics_manager: await self.analytics_manager.send_credits_sent() - async def start(self, ledger, default_wallet, max_fee=None): - self.max_fee = max_fee or self.max_fee + async def start(self, ledger=None, wallet=None): self.ledger = ledger - self.wallet = default_wallet + self.wallet = wallet self.running = True self.task = asyncio.ensure_future(self.pay()) self.task.add_done_callback(lambda _: log.info("Stopping wallet server payments.")) diff --git a/tests/integration/blockchain/test_wallet_server_sessions.py b/tests/integration/blockchain/test_wallet_server_sessions.py index 233cbd2b5..1dbb7735f 100644 --- a/tests/integration/blockchain/test_wallet_server_sessions.py +++ b/tests/integration/blockchain/test_wallet_server_sessions.py @@ -55,9 +55,10 @@ class TestSegwitServer(IntegrationTestCase): class TestUsagePayment(CommandTestCase): async def test_single_server_payment(self): - self.manager.usage_payment_service.payment_period = 1 - await self.manager.usage_payment_service.stop() - await self.manager.usage_payment_service.start(self.ledger, self.wallet) + wallet_pay_service = self.daemon.component_manager.get_component('wallet_server_payments') + wallet_pay_service.payment_period = 1 + await wallet_pay_service.stop() + await wallet_pay_service.start(ledger=self.ledger, wallet=self.wallet) address = (await self.account.receiving.get_addresses(limit=1, only_usable=True))[0] _, history = await self.ledger.get_local_status_and_history(address) diff --git a/tests/integration/other/test_cli.py b/tests/integration/other/test_cli.py index b9e6c3d28..59b629747 100644 --- a/tests/integration/other/test_cli.py +++ b/tests/integration/other/test_cli.py @@ -7,7 +7,7 @@ from lbry.extras import cli from lbry.extras.daemon.components import ( DATABASE_COMPONENT, BLOB_COMPONENT, WALLET_COMPONENT, DHT_COMPONENT, HASH_ANNOUNCER_COMPONENT, STREAM_MANAGER_COMPONENT, PEER_PROTOCOL_SERVER_COMPONENT, - UPNP_COMPONENT, EXCHANGE_RATE_MANAGER_COMPONENT + UPNP_COMPONENT, EXCHANGE_RATE_MANAGER_COMPONENT, WALLET_SERVER_PAYMENTS_COMPONENT ) from lbry.extras.daemon.daemon import Daemon @@ -22,7 +22,7 @@ class CLIIntegrationTest(AsyncioTestCase): conf.components_to_skip = ( DATABASE_COMPONENT, BLOB_COMPONENT, WALLET_COMPONENT, DHT_COMPONENT, HASH_ANNOUNCER_COMPONENT, STREAM_MANAGER_COMPONENT, PEER_PROTOCOL_SERVER_COMPONENT, - UPNP_COMPONENT, EXCHANGE_RATE_MANAGER_COMPONENT + UPNP_COMPONENT, EXCHANGE_RATE_MANAGER_COMPONENT, WALLET_SERVER_PAYMENTS_COMPONENT ) Daemon.component_attributes = {} self.daemon = Daemon(conf) diff --git a/tests/unit/components/test_component_manager.py b/tests/unit/components/test_component_manager.py index f9d2463f5..d8d2ed5a9 100644 --- a/tests/unit/components/test_component_manager.py +++ b/tests/unit/components/test_component_manager.py @@ -27,6 +27,7 @@ class TestComponentManager(AsyncioTestCase): components.HashAnnouncerComponent, components.PeerProtocolServerComponent, components.StreamManagerComponent, + components.WalletServerPaymentsComponent ] ] self.component_manager = ComponentManager(Config())