use stream controller and documented errors

This commit is contained in:
Victor Shyba 2020-02-21 00:04:37 -03:00
parent f40a61cf9a
commit d2560d260c
4 changed files with 52 additions and 23 deletions

View file

@ -78,3 +78,7 @@ Code | Name | Message
701 | InvalidExchangeRateResponse | Failed to get exchange rate from {source}: {reason}
702 | CurrencyConversion | {message}
703 | InvalidCurrency | Invalid currency: {currency} is not a supported currency.
**8xx** | WalletServerPayment | **Wallet Server Payment**
801 | InvalidAddressForServerPayment | Invalid address from wallet server: '{address}' - skipping payment round.
802 | WalletLockedDuringServerPayment | Cannot spend funds with locked wallet, skipping payment round.
803 | ServerFeeHigherThanAllowedServerPayment | Server asked {daily_fee} LBC as daily fee, but maximum allowed is {max_fee} LBC. Skipping payment round.

View file

@ -377,3 +377,31 @@ class InvalidCurrencyError(CurrencyExchangeError):
def __init__(self, currency):
self.currency = currency
super().__init__(f"Invalid currency: {currency} is not a supported currency.")
class WalletServerPaymentError(BaseError):
"""
**Wallet Server Payment**
"""
class InvalidAddressForServerPaymentError(WalletServerPaymentError):
def __init__(self, address):
self.address = address
super().__init__(f"Invalid address from wallet server: '{address}' - skipping payment round.")
class WalletLockedDuringServerPaymentError(WalletServerPaymentError):
def __init__(self):
super().__init__("Cannot spend funds with locked wallet, skipping payment round.")
class ServerFeeHigherThanAllowedServerPaymentError(WalletServerPaymentError):
def __init__(self, daily_fee, max_fee):
self.daily_fee = daily_fee
self.max_fee = max_fee
super().__init__(f"Server asked {daily_fee} LBC as daily fee,"
f" but maximum allowed is {max_fee} LBC. Skipping payment round.")

View file

@ -1,7 +1,10 @@
import asyncio
import logging
from lbry.error import InvalidAddressForServerPaymentError, WalletLockedDuringServerPaymentError, \
ServerFeeHigherThanAllowedServerPaymentError
from lbry.wallet.dewies import lbc_to_dewies
from lbry.wallet.stream import StreamController
from lbry.wallet.transaction import Output, Transaction
log = logging.getLogger(__name__)
@ -16,6 +19,9 @@ class WalletServerPayer:
self.payment_period = payment_period
self.analytics_manager = analytics_manager
self.max_fee = max_fee
self._on_payment_controller = StreamController()
self.on_payment = self._on_payment_controller.stream
self.on_payment.listen(None, on_error=lambda e: logging.warning(e.args[0]))
async def pay(self):
while self.running:
@ -27,18 +33,17 @@ class WalletServerPayer:
continue
if not self.ledger.is_valid_address(address):
log.warning("Invalid address from wallet server: '%s' - skipping payment round.", address)
self._on_payment_controller.add_error(InvalidAddressForServerPaymentError(address))
continue
if self.wallet.is_locked:
log.warning("Cannot spend funds with locked wallet, skipping payment round.")
self._on_payment_controller.add_error(WalletLockedDuringServerPaymentError())
continue
amount = lbc_to_dewies(features['daily_fee']) # check that this is in lbc and not dewies
limit = lbc_to_dewies(self.max_fee)
if amount > limit:
log.warning(
"Server asked %s LBC as daily fee, but maximum allowed is %s LBC. Skipping payment round.",
features['daily_fee'], self.max_fee
self._on_payment_controller.add_error(
ServerFeeHigherThanAllowedServerPaymentError(features['daily_fee'], self.max_fee)
)
continue
@ -52,6 +57,7 @@ class WalletServerPayer:
await self.ledger.broadcast(tx)
if self.analytics_manager:
await self.analytics_manager.send_credits_sent()
self._on_payment_controller.add(tx)
async def start(self, ledger=None, wallet=None):
self.ledger = ledger

View file

@ -2,10 +2,10 @@ import asyncio
import lbry
import lbry.wallet
from lbry.error import ServerFeeHigherThanAllowedServerPaymentError
from lbry.wallet.network import ClientSession
from lbry.testcase import IntegrationTestCase, CommandTestCase, AdvanceTimeTestCase
from lbry.testcase import IntegrationTestCase, CommandTestCase
from lbry.wallet.orchstr8.node import SPVNode
from lbry.wallet.usage_payment import WalletServerPayer
class TestSessions(IntegrationTestCase):
@ -67,23 +67,14 @@ class TestUsagePayment(CommandTestCase):
node = SPVNode(self.conductor.spv_module, node_number=2)
await node.start(self.blockchain, extraconf={"PAYMENT_ADDRESS": address, "DAILY_FEE": "1.1"})
self.daemon.jsonrpc_settings_set('lbryum_servers', [f"{node.hostname}:{node.port}"])
with self.assertLogs(level='WARNING') as cm:
await self.daemon.jsonrpc_wallet_reconnect()
on_error = wallet_pay_service.on_payment.where(lambda e: isinstance(e, ServerFeeHigherThanAllowedServerPaymentError))
await self.daemon.jsonrpc_wallet_reconnect()
features = await self.ledger.network.get_server_features()
self.assertEqual(features["payment_address"], address)
self.assertEqual(features["daily_fee"], "1.1")
elapsed = 0
while not cm.output:
await asyncio.sleep(0.1)
elapsed += 1
if elapsed > 30:
raise TimeoutError('Nothing logged for 3 seconds.')
self.assertEqual(
cm.output,
['WARNING:lbry.wallet.usage_payment:Server asked 1.1 LBC as daily fee, but '
'maximum allowed is 1.0 LBC. Skipping payment round.']
)
features = await self.ledger.network.get_server_features()
self.assertEqual(features["payment_address"], address)
self.assertEqual(features["daily_fee"], "1.1")
with self.assertRaises(ServerFeeHigherThanAllowedServerPaymentError):
await asyncio.wait_for(on_error, timeout=3)
await node.stop(False)
await node.start(self.blockchain, extraconf={"PAYMENT_ADDRESS": address, "DAILY_FEE": "1.0"})
self.addCleanup(node.stop)