Merge pull request #2822 from lbryio/stream_controller_payment

use stream controller and documented errors for wallet server payments
This commit is contained in:
Lex Berezhny 2020-02-21 12:34:07 -05:00 committed by GitHub
commit 12a962f656
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
4 changed files with 53 additions and 25 deletions

View file

@ -57,6 +57,9 @@ Code | Name | Message
420 | KeyFeeAboveMaxAllowed | {message} 420 | KeyFeeAboveMaxAllowed | {message}
421 | InvalidPassword | Password is invalid. 421 | InvalidPassword | Password is invalid.
422 | IncompatibleWalletServer | '{server}:{port}' has an incompatibly old version. 422 | IncompatibleWalletServer | '{server}:{port}' has an incompatibly old version.
431 | ServerPaymentInvalidAddress | Invalid address from wallet server: '{address}' - skipping payment round.
432 | ServerPaymentWalletLocked | Cannot spend funds with locked wallet, skipping payment round.
433 | ServerPaymentFeeAboveMaxAllowed | Daily server fee of {daily_fee} exceeds maximum configured of {max_fee} LBC.
**5xx** | Blob | **Blobs** **5xx** | Blob | **Blobs**
500 | BlobNotFound | Blob not found. 500 | BlobNotFound | Blob not found.
501 | BlobPermissionDenied | Permission denied to read blob. 501 | BlobPermissionDenied | Permission denied to read blob.

View file

@ -242,6 +242,27 @@ class IncompatibleWalletServerError(WalletError):
super().__init__(f"'{server}:{port}' has an incompatibly old version.") super().__init__(f"'{server}:{port}' has an incompatibly old version.")
class ServerPaymentInvalidAddressError(WalletError):
def __init__(self, address):
self.address = address
super().__init__(f"Invalid address from wallet server: '{address}' - skipping payment round.")
class ServerPaymentWalletLockedError(WalletError):
def __init__(self):
super().__init__("Cannot spend funds with locked wallet, skipping payment round.")
class ServerPaymentFeeAboveMaxAllowedError(WalletError):
def __init__(self, daily_fee, max_fee):
self.daily_fee = daily_fee
self.max_fee = max_fee
super().__init__(f"Daily server fee of {daily_fee} exceeds maximum configured of {max_fee} LBC.")
class BlobError(BaseError): class BlobError(BaseError):
""" """
**Blobs** **Blobs**

View file

@ -1,7 +1,13 @@
import asyncio import asyncio
import logging import logging
from lbry.error import (
ServerPaymentFeeAboveMaxAllowedError,
ServerPaymentInvalidAddressError,
ServerPaymentWalletLockedError
)
from lbry.wallet.dewies import lbc_to_dewies from lbry.wallet.dewies import lbc_to_dewies
from lbry.wallet.stream import StreamController
from lbry.wallet.transaction import Output, Transaction from lbry.wallet.transaction import Output, Transaction
log = logging.getLogger(__name__) log = logging.getLogger(__name__)
@ -16,6 +22,9 @@ class WalletServerPayer:
self.payment_period = payment_period self.payment_period = payment_period
self.analytics_manager = analytics_manager self.analytics_manager = analytics_manager
self.max_fee = max_fee self.max_fee = max_fee
self._on_payment_controller = StreamController()
self.on_payment = self._on_payment_controller.stream
self.on_payment.listen(None, on_error=lambda e: logging.warning(e.args[0]))
async def pay(self): async def pay(self):
while self.running: while self.running:
@ -27,18 +36,18 @@ class WalletServerPayer:
continue continue
if not self.ledger.is_valid_address(address): if not self.ledger.is_valid_address(address):
log.warning("Invalid address from wallet server: '%s' - skipping payment round.", address) self._on_payment_controller.add_error(ServerPaymentInvalidAddressError(address))
continue continue
if self.wallet.is_locked: if self.wallet.is_locked:
log.warning("Cannot spend funds with locked wallet, skipping payment round.") self._on_payment_controller.add_error(ServerPaymentWalletLockedError())
continue continue
amount = lbc_to_dewies(features['daily_fee']) # check that this is in lbc and not dewies amount = lbc_to_dewies(features['daily_fee']) # check that this is in lbc and not dewies
limit = lbc_to_dewies(self.max_fee) limit = lbc_to_dewies(self.max_fee)
if amount > limit: if amount > limit:
log.warning( self._on_payment_controller.add_error(
"Server asked %s LBC as daily fee, but maximum allowed is %s LBC. Skipping payment round.", ServerPaymentFeeAboveMaxAllowedError(features['daily_fee'], self.max_fee)
features['daily_fee'], self.max_fee
) )
continue continue
@ -52,6 +61,7 @@ class WalletServerPayer:
await self.ledger.broadcast(tx) await self.ledger.broadcast(tx)
if self.analytics_manager: if self.analytics_manager:
await self.analytics_manager.send_credits_sent() await self.analytics_manager.send_credits_sent()
self._on_payment_controller.add(tx)
async def start(self, ledger=None, wallet=None): async def start(self, ledger=None, wallet=None):
self.ledger = ledger self.ledger = ledger

View file

@ -2,10 +2,10 @@ import asyncio
import lbry import lbry
import lbry.wallet import lbry.wallet
from lbry.error import ServerPaymentFeeAboveMaxAllowedError
from lbry.wallet.network import ClientSession from lbry.wallet.network import ClientSession
from lbry.testcase import IntegrationTestCase, CommandTestCase, AdvanceTimeTestCase from lbry.testcase import IntegrationTestCase, CommandTestCase
from lbry.wallet.orchstr8.node import SPVNode from lbry.wallet.orchstr8.node import SPVNode
from lbry.wallet.usage_payment import WalletServerPayer
class TestSessions(IntegrationTestCase): class TestSessions(IntegrationTestCase):
@ -66,33 +66,27 @@ class TestUsagePayment(CommandTestCase):
node = SPVNode(self.conductor.spv_module, node_number=2) node = SPVNode(self.conductor.spv_module, node_number=2)
await node.start(self.blockchain, extraconf={"PAYMENT_ADDRESS": address, "DAILY_FEE": "1.1"}) await node.start(self.blockchain, extraconf={"PAYMENT_ADDRESS": address, "DAILY_FEE": "1.1"})
self.addCleanup(node.stop)
self.daemon.jsonrpc_settings_set('lbryum_servers', [f"{node.hostname}:{node.port}"]) self.daemon.jsonrpc_settings_set('lbryum_servers', [f"{node.hostname}:{node.port}"])
with self.assertLogs(level='WARNING') as cm: await self.daemon.jsonrpc_wallet_reconnect()
await self.daemon.jsonrpc_wallet_reconnect()
features = await self.ledger.network.get_server_features()
self.assertEqual(features["payment_address"], address)
self.assertEqual(features["daily_fee"], "1.1")
with self.assertRaises(ServerPaymentFeeAboveMaxAllowedError):
await asyncio.wait_for(wallet_pay_service.on_payment.first, timeout=3)
features = await self.ledger.network.get_server_features()
self.assertEqual(features["payment_address"], address)
self.assertEqual(features["daily_fee"], "1.1")
elapsed = 0
while not cm.output:
await asyncio.sleep(0.1)
elapsed += 1
if elapsed > 30:
raise TimeoutError('Nothing logged for 3 seconds.')
self.assertEqual(
cm.output,
['WARNING:lbry.wallet.usage_payment:Server asked 1.1 LBC as daily fee, but '
'maximum allowed is 1.0 LBC. Skipping payment round.']
)
await node.stop(False) await node.stop(False)
await node.start(self.blockchain, extraconf={"PAYMENT_ADDRESS": address, "DAILY_FEE": "1.0"}) await node.start(self.blockchain, extraconf={"PAYMENT_ADDRESS": address, "DAILY_FEE": "1.0"})
self.addCleanup(node.stop)
self.daemon.jsonrpc_settings_set('lbryum_servers', [f"{node.hostname}:{node.port}"]) self.daemon.jsonrpc_settings_set('lbryum_servers', [f"{node.hostname}:{node.port}"])
await self.daemon.jsonrpc_wallet_reconnect() await self.daemon.jsonrpc_wallet_reconnect()
features = await self.ledger.network.get_server_features() features = await self.ledger.network.get_server_features()
self.assertEqual(features["payment_address"], address) self.assertEqual(features["payment_address"], address)
self.assertEqual(features["daily_fee"], "1.0") self.assertEqual(features["daily_fee"], "1.0")
await asyncio.wait_for(self.on_address_update(address), timeout=1) await asyncio.wait([
wallet_pay_service.on_payment.first,
self.on_address_update(address)
], timeout=3)
_, history = await self.ledger.get_local_status_and_history(address) _, history = await self.ledger.get_local_status_and_history(address)
txid, nout = history[0] txid, nout = history[0]
tx_details = await self.daemon.jsonrpc_transaction_show(txid) tx_details = await self.daemon.jsonrpc_transaction_show(txid)