forked from LBRYCommunity/lbry-sdk
Catch and log exceptions coming from the pay() task.
Change test to reproduce failure.
This commit is contained in:
parent
718d046833
commit
fd69401791
2 changed files with 31 additions and 5 deletions
|
@ -1,5 +1,7 @@
|
||||||
import asyncio
|
import asyncio
|
||||||
import logging
|
import logging
|
||||||
|
import sys
|
||||||
|
import traceback
|
||||||
|
|
||||||
from lbry.error import (
|
from lbry.error import (
|
||||||
ServerPaymentFeeAboveMaxAllowedError,
|
ServerPaymentFeeAboveMaxAllowedError,
|
||||||
|
@ -27,6 +29,20 @@ class WalletServerPayer:
|
||||||
self.on_payment.listen(None, on_error=lambda e: logging.warning(e.args[0]))
|
self.on_payment.listen(None, on_error=lambda e: logging.warning(e.args[0]))
|
||||||
|
|
||||||
async def pay(self):
|
async def pay(self):
|
||||||
|
while self.running:
|
||||||
|
try:
|
||||||
|
await self._pay()
|
||||||
|
except BaseException:
|
||||||
|
if self.running:
|
||||||
|
traceback.print_exception(*sys.exc_info())
|
||||||
|
log.warning("Caught exception: %s", sys.exc_info()[0].__name__)
|
||||||
|
else:
|
||||||
|
log.warning("Caught exception: %s", sys.exc_info()[0].__name__)
|
||||||
|
raise
|
||||||
|
#if not self.running:
|
||||||
|
# raise
|
||||||
|
|
||||||
|
async def _pay(self):
|
||||||
while self.running:
|
while self.running:
|
||||||
log.info("pay loop: before sleep")
|
log.info("pay loop: before sleep")
|
||||||
await asyncio.sleep(self.payment_period)
|
await asyncio.sleep(self.payment_period)
|
||||||
|
@ -80,7 +96,16 @@ class WalletServerPayer:
|
||||||
self.wallet = wallet
|
self.wallet = wallet
|
||||||
self.running = True
|
self.running = True
|
||||||
self.task = asyncio.ensure_future(self.pay())
|
self.task = asyncio.ensure_future(self.pay())
|
||||||
self.task.add_done_callback(lambda _: log.info("Stopping wallet server payments."))
|
self.task.add_done_callback(self._done_callback)
|
||||||
|
|
||||||
|
def _done_callback(self, f):
|
||||||
|
if f.cancelled():
|
||||||
|
reason = "Cancelled"
|
||||||
|
elif not self.running:
|
||||||
|
reason = "Stopped"
|
||||||
|
else:
|
||||||
|
reason = f'Exception: {f.exception()}'
|
||||||
|
log.info("Stopping wallet server payments. %s", reason)
|
||||||
|
|
||||||
async def stop(self):
|
async def stop(self):
|
||||||
if self.running:
|
if self.running:
|
||||||
|
|
|
@ -47,7 +47,8 @@ class TestSessions(IntegrationTestCase):
|
||||||
class TestUsagePayment(CommandTestCase):
|
class TestUsagePayment(CommandTestCase):
|
||||||
async def test_single_server_payment(self):
|
async def test_single_server_payment(self):
|
||||||
wallet_pay_service = self.daemon.component_manager.get_component('wallet_server_payments')
|
wallet_pay_service = self.daemon.component_manager.get_component('wallet_server_payments')
|
||||||
wallet_pay_service.payment_period = 1
|
self.assertFalse(wallet_pay_service.running)
|
||||||
|
wallet_pay_service.payment_period = 0.1
|
||||||
# only starts with a positive max key fee
|
# only starts with a positive max key fee
|
||||||
wallet_pay_service.max_fee = "0.0"
|
wallet_pay_service.max_fee = "0.0"
|
||||||
await wallet_pay_service.start(ledger=self.ledger, wallet=self.wallet)
|
await wallet_pay_service.start(ledger=self.ledger, wallet=self.wallet)
|
||||||
|
@ -73,17 +74,17 @@ class TestUsagePayment(CommandTestCase):
|
||||||
self.assertEqual(features["daily_fee"], "1.1")
|
self.assertEqual(features["daily_fee"], "1.1")
|
||||||
with self.assertRaises(ServerPaymentFeeAboveMaxAllowedError):
|
with self.assertRaises(ServerPaymentFeeAboveMaxAllowedError):
|
||||||
await asyncio.wait_for(wallet_pay_service.on_payment.first, timeout=30)
|
await asyncio.wait_for(wallet_pay_service.on_payment.first, timeout=30)
|
||||||
node.server.env.daily_fee = "1.0"
|
node.server.env.daily_fee = "0.1"
|
||||||
node.server.env.payment_address = address
|
node.server.env.payment_address = address
|
||||||
LBRYElectrumX.set_server_features(node.server.env)
|
LBRYElectrumX.set_server_features(node.server.env)
|
||||||
# self.daemon.jsonrpc_settings_set('lbryum_servers', [f"{node.hostname}:{node.port}"])
|
# self.daemon.jsonrpc_settings_set('lbryum_servers', [f"{node.hostname}:{node.port}"])
|
||||||
await self.daemon.jsonrpc_wallet_reconnect()
|
await self.daemon.jsonrpc_wallet_reconnect()
|
||||||
features = await self.ledger.network.get_server_features()
|
features = await self.ledger.network.get_server_features()
|
||||||
self.assertEqual(features["payment_address"], address)
|
self.assertEqual(features["payment_address"], address)
|
||||||
self.assertEqual(features["daily_fee"], "1.0")
|
self.assertEqual(features["daily_fee"], "0.1")
|
||||||
tx = await asyncio.wait_for(wallet_pay_service.on_payment.first, timeout=30)
|
tx = await asyncio.wait_for(wallet_pay_service.on_payment.first, timeout=30)
|
||||||
self.assertIsNotNone(await self.blockchain.get_raw_transaction(tx.id)) # verify its broadcasted
|
self.assertIsNotNone(await self.blockchain.get_raw_transaction(tx.id)) # verify its broadcasted
|
||||||
self.assertEqual(tx.outputs[0].amount, 100000000)
|
self.assertEqual(tx.outputs[0].amount, 10000000)
|
||||||
self.assertEqual(tx.outputs[0].get_address(self.ledger), address)
|
self.assertEqual(tx.outputs[0].get_address(self.ledger), address)
|
||||||
|
|
||||||
|
|
||||||
|
|
Loading…
Reference in a new issue