Merge pull request #2707 from lbryio/pay_wallet_servers

wallet servers requiring a daily fee will automatically get paid by client
This commit is contained in:
Lex Berezhny 2020-02-18 17:40:23 -05:00 committed by GitHub
commit 4b87cb45ee
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
11 changed files with 191 additions and 15 deletions

View file

@ -558,6 +558,7 @@ class Config(CLIConfig):
"Don't download streams with fees exceeding this amount. When set to " "Don't download streams with fees exceeding this amount. When set to "
"null, the amount is unbounded.", {'currency': 'USD', 'amount': 50.0} "null, the amount is unbounded.", {'currency': 'USD', 'amount': 50.0}
) )
max_wallet_server_fee = String("Maximum daily LBC amount allowed as payment for wallet servers.", "1.0")
# reflector settings # reflector settings
reflect_streams = Toggle( reflect_streams = Toggle(

View file

@ -21,6 +21,7 @@ from lbry.extras.daemon.component import Component
from lbry.extras.daemon.exchange_rate_manager import ExchangeRateManager from lbry.extras.daemon.exchange_rate_manager import ExchangeRateManager
from lbry.extras.daemon.storage import SQLiteStorage from lbry.extras.daemon.storage import SQLiteStorage
from lbry.wallet import WalletManager from lbry.wallet import WalletManager
from lbry.wallet.usage_payment import WalletServerPayer
log = logging.getLogger(__name__) log = logging.getLogger(__name__)
@ -29,6 +30,7 @@ log = logging.getLogger(__name__)
DATABASE_COMPONENT = "database" DATABASE_COMPONENT = "database"
BLOB_COMPONENT = "blob_manager" BLOB_COMPONENT = "blob_manager"
WALLET_COMPONENT = "wallet" WALLET_COMPONENT = "wallet"
WALLET_SERVER_PAYMENTS_COMPONENT = "wallet_server_payments"
DHT_COMPONENT = "dht" DHT_COMPONENT = "dht"
HASH_ANNOUNCER_COMPONENT = "hash_announcer" HASH_ANNOUNCER_COMPONENT = "hash_announcer"
STREAM_MANAGER_COMPONENT = "stream_manager" STREAM_MANAGER_COMPONENT = "stream_manager"
@ -160,6 +162,34 @@ class WalletComponent(Component):
self.wallet_manager = None self.wallet_manager = None
class WalletServerPaymentsComponent(Component):
component_name = WALLET_SERVER_PAYMENTS_COMPONENT
depends_on = [WALLET_COMPONENT]
def __init__(self, component_manager):
super().__init__(component_manager)
self.usage_payment_service = WalletServerPayer(
max_fee=self.conf.max_wallet_server_fee, analytics_manager=self.component_manager.analytics_manager,
)
@property
def component(self) -> typing.Optional[WalletServerPayer]:
return self.usage_payment_service
async def start(self):
wallet_manager = self.component_manager.get_component(WALLET_COMPONENT)
await self.usage_payment_service.start(wallet_manager.ledger, wallet_manager.default_wallet)
async def stop(self):
await self.usage_payment_service.stop()
async def get_status(self):
return {
'max_fee': self.usage_payment_service.max_fee,
'running': self.usage_payment_service.running
}
class BlobComponent(Component): class BlobComponent(Component):
component_name = BLOB_COMPONENT component_name = BLOB_COMPONENT
depends_on = [DATABASE_COMPONENT] depends_on = [DATABASE_COMPONENT]

View file

@ -253,6 +253,11 @@ class IntegrationTestCase(AsyncioTestCase):
lambda e: e.tx.id == txid lambda e: e.tx.id == txid
) )
def on_address_update(self, address):
return self.ledger.on_transaction.where(
lambda e: e.address == address
)
def on_transaction_address(self, tx, address): def on_transaction_address(self, tx, address):
return self.ledger.on_transaction.where( return self.ledger.on_transaction.where(
lambda e: e.tx.id == tx.id and e.address == address lambda e: e.tx.id == tx.id and e.address == address

View file

@ -173,6 +173,9 @@ class SPVNode:
async def start(self, blockchain_node: 'BlockchainNode', extraconf=None): async def start(self, blockchain_node: 'BlockchainNode', extraconf=None):
self.data_path = tempfile.mkdtemp() self.data_path = tempfile.mkdtemp()
conf = { conf = {
'DESCRIPTION': '',
'PAYMENT_ADDRESS': '',
'DAILY_FEE': '0',
'DB_DIRECTORY': self.data_path, 'DB_DIRECTORY': self.data_path,
'DAEMON_URL': blockchain_node.rpc_url, 'DAEMON_URL': blockchain_node.rpc_url,
'REORG_LIMIT': '100', 'REORG_LIMIT': '100',

View file

@ -72,6 +72,7 @@ class Env:
self.tor_proxy_host = self.default('TOR_PROXY_HOST', 'localhost') self.tor_proxy_host = self.default('TOR_PROXY_HOST', 'localhost')
self.tor_proxy_port = self.integer('TOR_PROXY_PORT', None) self.tor_proxy_port = self.integer('TOR_PROXY_PORT', None)
# The electrum client takes the empty string as unspecified # The electrum client takes the empty string as unspecified
self.payment_address = self.default('PAYMENT_ADDRESS', '')
self.donation_address = self.default('DONATION_ADDRESS', '') self.donation_address = self.default('DONATION_ADDRESS', '')
# Server limits to help prevent DoS # Server limits to help prevent DoS
self.max_send = self.integer('MAX_SEND', 1000000) self.max_send = self.integer('MAX_SEND', 1000000)
@ -81,7 +82,7 @@ class Env:
self.session_timeout = self.integer('SESSION_TIMEOUT', 600) self.session_timeout = self.integer('SESSION_TIMEOUT', 600)
self.drop_client = self.custom("DROP_CLIENT", None, re.compile) self.drop_client = self.custom("DROP_CLIENT", None, re.compile)
self.description = self.default('DESCRIPTION', '') self.description = self.default('DESCRIPTION', '')
self.daily_fee = self.integer('DAILY_FEE', 0) self.daily_fee = self.string_amount('DAILY_FEE', '0')
# Identities # Identities
clearnet_identity = self.clearnet_identity() clearnet_identity = self.clearnet_identity()
@ -107,6 +108,14 @@ class Env:
raise cls.Error(f'required envvar {envvar} not set') raise cls.Error(f'required envvar {envvar} not set')
return value return value
@classmethod
def string_amount(cls, envvar, default):
value = environ.get(envvar, default)
amount_pattern = re.compile("[0-9]{0,10}(\.[0-9]{1,8})?")
if len(value) > 0 and not amount_pattern.fullmatch(value):
raise cls.Error(f'{value} is not a valid amount for {envvar}')
return value
@classmethod @classmethod
def integer(cls, envvar, default): def integer(cls, envvar, default):
value = environ.get(envvar) value = environ.get(envvar)

View file

@ -800,6 +800,7 @@ class LBRYElectrumX(SessionBase):
'blockchain.transaction.get_merkle': cls.transaction_merkle, 'blockchain.transaction.get_merkle': cls.transaction_merkle,
'server.add_peer': cls.add_peer, 'server.add_peer': cls.add_peer,
'server.banner': cls.banner, 'server.banner': cls.banner,
'server.payment_address': cls.payment_address,
'server.donation_address': cls.donation_address, 'server.donation_address': cls.donation_address,
'server.features': cls.server_features_async, 'server.features': cls.server_features_async,
'server.peers.subscribe': cls.peers_subscribe, 'server.peers.subscribe': cls.peers_subscribe,
@ -854,7 +855,8 @@ class LBRYElectrumX(SessionBase):
'protocol_max': max_str, 'protocol_max': max_str,
'genesis_hash': env.coin.GENESIS_HASH, 'genesis_hash': env.coin.GENESIS_HASH,
'description': env.description, 'description': env.description,
'payment_address': env.donation_address, 'payment_address': env.payment_address,
'donation_address': env.donation_address,
'daily_fee': env.daily_fee, 'daily_fee': env.daily_fee,
'hash_function': 'sha256', 'hash_function': 'sha256',
} }
@ -1362,11 +1364,16 @@ class LBRYElectrumX(SessionBase):
('$SERVER_VERSION', self.version), ('$SERVER_VERSION', self.version),
('$DAEMON_VERSION', daemon_version), ('$DAEMON_VERSION', daemon_version),
('$DAEMON_SUBVERSION', network_info['subversion']), ('$DAEMON_SUBVERSION', network_info['subversion']),
('$PAYMENT_ADDRESS', self.env.payment_address),
('$DONATION_ADDRESS', self.env.donation_address), ('$DONATION_ADDRESS', self.env.donation_address),
]: ]:
banner = banner.replace(*pair) banner = banner.replace(*pair)
return banner return banner
async def payment_address(self):
"""Return the payment address as a string, empty if there is none."""
return self.env.payment_address
async def donation_address(self): async def donation_address(self):
"""Return the donation address as a string, empty if there is none.""" """Return the donation address as a string, empty if there is none."""
return self.env.donation_address return self.env.donation_address

View file

@ -0,0 +1,66 @@
import asyncio
import logging
from lbry.wallet.dewies import lbc_to_dewies
from lbry.wallet.transaction import Output, Transaction
log = logging.getLogger(__name__)
class WalletServerPayer:
def __init__(self, payment_period=24 * 60 * 60, max_fee='1.0', analytics_manager=None):
self.ledger = None
self.wallet = None
self.running = False
self.task = None
self.payment_period = payment_period
self.analytics_manager = analytics_manager
self.max_fee = max_fee
async def pay(self):
while self.running:
await asyncio.sleep(self.payment_period)
features = await self.ledger.network.get_server_features()
address = features['payment_address']
amount = str(features['daily_fee'])
if not address or not amount:
continue
if not self.ledger.is_valid_address(address):
log.warning("Invalid address from wallet server: '%s' - skipping payment round.", address)
continue
if self.wallet.is_locked:
log.warning("Cannot spend funds with locked wallet, skipping payment round.")
continue
amount = lbc_to_dewies(features['daily_fee']) # check that this is in lbc and not dewies
limit = lbc_to_dewies(self.max_fee)
if amount > limit:
log.warning(
"Server asked %s LBC as daily fee, but maximum allowed is %s LBC. Skipping payment round.",
features['daily_fee'], self.max_fee
)
continue
tx = await Transaction.create(
[],
[Output.pay_pubkey_hash(amount, self.ledger.address_to_hash160(address))],
self.wallet.get_accounts_or_all(None),
self.wallet.get_account_or_default(None)
)
await self.ledger.broadcast(tx)
if self.analytics_manager:
await self.analytics_manager.send_credits_sent()
async def start(self, ledger=None, wallet=None):
self.ledger = ledger
self.wallet = wallet
self.running = True
self.task = asyncio.ensure_future(self.pay())
self.task.add_done_callback(lambda _: log.info("Stopping wallet server payments."))
async def stop(self):
if self.running:
self.running = False
self.task.cancel()

View file

@ -1,5 +1,4 @@
import logging import logging
import os
import asyncio import asyncio
import lbry import lbry
@ -29,15 +28,20 @@ class NetworkTests(IntegrationTestCase):
'pruning': None, 'pruning': None,
'description': '', 'description': '',
'payment_address': '', 'payment_address': '',
'daily_fee': 0, 'donation_address': '',
'daily_fee': '0',
'server_version': lbry.__version__}, await self.ledger.network.get_server_features()) 'server_version': lbry.__version__}, await self.ledger.network.get_server_features())
await self.conductor.spv_node.stop() await self.conductor.spv_node.stop()
address = (await self.account.get_addresses(limit=1))[0] payment_address, donation_address = await self.account.get_addresses(limit=2)
os.environ.update({ await self.conductor.spv_node.start(
self.conductor.blockchain_node,
extraconf={
'DESCRIPTION': 'Fastest server in the west.', 'DESCRIPTION': 'Fastest server in the west.',
'DONATION_ADDRESS': address, 'PAYMENT_ADDRESS': payment_address,
'DAILY_FEE': '42'}) 'DONATION_ADDRESS': donation_address,
await self.conductor.spv_node.start(self.conductor.blockchain_node) 'DAILY_FEE': '42'
}
)
await self.ledger.network.on_connected.first await self.ledger.network.on_connected.first
self.assertDictEqual({ self.assertDictEqual({
'genesis_hash': self.conductor.spv_node.coin_class.GENESIS_HASH, 'genesis_hash': self.conductor.spv_node.coin_class.GENESIS_HASH,
@ -47,8 +51,9 @@ class NetworkTests(IntegrationTestCase):
'protocol_min': '0.54.0', 'protocol_min': '0.54.0',
'pruning': None, 'pruning': None,
'description': 'Fastest server in the west.', 'description': 'Fastest server in the west.',
'payment_address': address, 'payment_address': payment_address,
'daily_fee': 42, 'donation_address': donation_address,
'daily_fee': '42',
'server_version': lbry.__version__}, await self.ledger.network.get_server_features()) 'server_version': lbry.__version__}, await self.ledger.network.get_server_features())

View file

@ -3,7 +3,9 @@ import asyncio
import lbry import lbry
import lbry.wallet import lbry.wallet
from lbry.wallet.network import ClientSession from lbry.wallet.network import ClientSession
from lbry.testcase import IntegrationTestCase from lbry.testcase import IntegrationTestCase, CommandTestCase, AdvanceTimeTestCase
from lbry.wallet.orchstr8.node import SPVNode
from lbry.wallet.usage_payment import WalletServerPayer
class TestSessions(IntegrationTestCase): class TestSessions(IntegrationTestCase):
@ -49,3 +51,50 @@ class TestSegwitServer(IntegrationTestCase):
async def test_at_least_it_starts(self): async def test_at_least_it_starts(self):
await asyncio.wait_for(self.ledger.network.get_headers(0, 1), 1.0) await asyncio.wait_for(self.ledger.network.get_headers(0, 1), 1.0)
class TestUsagePayment(CommandTestCase):
async def test_single_server_payment(self):
wallet_pay_service = self.daemon.component_manager.get_component('wallet_server_payments')
wallet_pay_service.payment_period = 1
await wallet_pay_service.stop()
await wallet_pay_service.start(ledger=self.ledger, wallet=self.wallet)
address = (await self.account.receiving.get_addresses(limit=1, only_usable=True))[0]
_, history = await self.ledger.get_local_status_and_history(address)
self.assertEqual(history, [])
node = SPVNode(self.conductor.spv_module, node_number=2)
await node.start(self.blockchain, extraconf={"PAYMENT_ADDRESS": address, "DAILY_FEE": "1.1"})
self.daemon.jsonrpc_settings_set('lbryum_servers', [f"{node.hostname}:{node.port}"])
with self.assertLogs(level='WARNING') as cm:
await self.daemon.jsonrpc_wallet_reconnect()
features = await self.ledger.network.get_server_features()
self.assertEqual(features["payment_address"], address)
self.assertEqual(features["daily_fee"], "1.1")
elapsed = 0
while not cm.output:
await asyncio.sleep(0.1)
elapsed += 1
if elapsed > 30:
raise TimeoutError('Nothing logged for 3 seconds.')
self.assertEqual(
cm.output,
['WARNING:lbry.wallet.usage_payment:Server asked 1.1 LBC as daily fee, but '
'maximum allowed is 1.0 LBC. Skipping payment round.']
)
await node.stop(False)
await node.start(self.blockchain, extraconf={"PAYMENT_ADDRESS": address, "DAILY_FEE": "1.0"})
self.addCleanup(node.stop)
self.daemon.jsonrpc_settings_set('lbryum_servers', [f"{node.hostname}:{node.port}"])
await self.daemon.jsonrpc_wallet_reconnect()
features = await self.ledger.network.get_server_features()
self.assertEqual(features["payment_address"], address)
self.assertEqual(features["daily_fee"], "1.0")
await asyncio.wait_for(self.on_address_update(address), timeout=1)
_, history = await self.ledger.get_local_status_and_history(address)
txid, nout = history[0]
tx_details = await self.daemon.jsonrpc_transaction_show(txid)
self.assertEqual(tx_details.outputs[nout].amount, 100000000)
self.assertEqual(tx_details.outputs[nout].get_address(self.ledger), address)

View file

@ -7,7 +7,7 @@ from lbry.extras import cli
from lbry.extras.daemon.components import ( from lbry.extras.daemon.components import (
DATABASE_COMPONENT, BLOB_COMPONENT, WALLET_COMPONENT, DHT_COMPONENT, DATABASE_COMPONENT, BLOB_COMPONENT, WALLET_COMPONENT, DHT_COMPONENT,
HASH_ANNOUNCER_COMPONENT, STREAM_MANAGER_COMPONENT, PEER_PROTOCOL_SERVER_COMPONENT, HASH_ANNOUNCER_COMPONENT, STREAM_MANAGER_COMPONENT, PEER_PROTOCOL_SERVER_COMPONENT,
UPNP_COMPONENT, EXCHANGE_RATE_MANAGER_COMPONENT UPNP_COMPONENT, EXCHANGE_RATE_MANAGER_COMPONENT, WALLET_SERVER_PAYMENTS_COMPONENT
) )
from lbry.extras.daemon.daemon import Daemon from lbry.extras.daemon.daemon import Daemon
@ -22,7 +22,7 @@ class CLIIntegrationTest(AsyncioTestCase):
conf.components_to_skip = ( conf.components_to_skip = (
DATABASE_COMPONENT, BLOB_COMPONENT, WALLET_COMPONENT, DHT_COMPONENT, DATABASE_COMPONENT, BLOB_COMPONENT, WALLET_COMPONENT, DHT_COMPONENT,
HASH_ANNOUNCER_COMPONENT, STREAM_MANAGER_COMPONENT, PEER_PROTOCOL_SERVER_COMPONENT, HASH_ANNOUNCER_COMPONENT, STREAM_MANAGER_COMPONENT, PEER_PROTOCOL_SERVER_COMPONENT,
UPNP_COMPONENT, EXCHANGE_RATE_MANAGER_COMPONENT UPNP_COMPONENT, EXCHANGE_RATE_MANAGER_COMPONENT, WALLET_SERVER_PAYMENTS_COMPONENT
) )
Daemon.component_attributes = {} Daemon.component_attributes = {}
self.daemon = Daemon(conf) self.daemon = Daemon(conf)

View file

@ -27,6 +27,7 @@ class TestComponentManager(AsyncioTestCase):
components.HashAnnouncerComponent, components.HashAnnouncerComponent,
components.PeerProtocolServerComponent, components.PeerProtocolServerComponent,
components.StreamManagerComponent, components.StreamManagerComponent,
components.WalletServerPaymentsComponent
] ]
] ]
self.component_manager = ComponentManager(Config()) self.component_manager = ComponentManager(Config())