lbry-sdk/lbry/testcase.py
2022-01-17 15:07:14 -05:00

711 lines
27 KiB
Python

import os
import sys
import json
import shutil
import logging
import tempfile
import functools
import asyncio
from asyncio.runners import _cancel_all_tasks # type: ignore
import unittest
from unittest.case import _Outcome
from typing import Optional
from time import time
from binascii import unhexlify
from functools import partial
from lbry.wallet import WalletManager, Wallet, Ledger, Account, Transaction
from lbry.conf import Config
from lbry.wallet.util import satoshis_to_coins
from lbry.wallet.dewies import lbc_to_dewies
from lbry.wallet.orchstr8 import Conductor
from lbry.wallet.orchstr8.node import LBCWalletNode, WalletNode, HubNode
from lbry.schema.claim import Claim
from lbry.extras.daemon.daemon import Daemon, jsonrpc_dumps_pretty
from lbry.extras.daemon.components import Component, WalletComponent
from lbry.extras.daemon.components import (
DHT_COMPONENT,
HASH_ANNOUNCER_COMPONENT, PEER_PROTOCOL_SERVER_COMPONENT,
UPNP_COMPONENT, EXCHANGE_RATE_MANAGER_COMPONENT, LIBTORRENT_COMPONENT
)
from lbry.extras.daemon.componentmanager import ComponentManager
from lbry.extras.daemon.exchange_rate_manager import (
ExchangeRateManager, ExchangeRate, BittrexBTCFeed, BittrexUSDFeed
)
from lbry.extras.daemon.storage import SQLiteStorage
from lbry.blob.blob_manager import BlobManager
from lbry.stream.reflector.server import ReflectorServer
from lbry.blob_exchange.server import BlobServer
class ColorHandler(logging.StreamHandler):
level_color = {
logging.DEBUG: "black",
logging.INFO: "light_gray",
logging.WARNING: "yellow",
logging.ERROR: "red"
}
color_code = dict(
black=30,
red=31,
green=32,
yellow=33,
blue=34,
magenta=35,
cyan=36,
white=37,
light_gray='0;37',
dark_gray='1;30'
)
def emit(self, record):
try:
msg = self.format(record)
color_name = self.level_color.get(record.levelno, "black")
color_code = self.color_code[color_name]
stream = self.stream
stream.write(f'\x1b[{color_code}m{msg}\x1b[0m')
stream.write(self.terminator)
self.flush()
except Exception:
self.handleError(record)
HANDLER = ColorHandler(sys.stdout)
HANDLER.setFormatter(
logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
)
logging.getLogger().addHandler(HANDLER)
class AsyncioTestCase(unittest.TestCase):
# Implementation inspired by discussion:
# https://bugs.python.org/issue32972
LOOP_SLOW_CALLBACK_DURATION = 0.2
TIMEOUT = 120.0
maxDiff = None
async def asyncSetUp(self): # pylint: disable=C0103
pass
async def asyncTearDown(self): # pylint: disable=C0103
pass
def run(self, result=None): # pylint: disable=R0915
orig_result = result
if result is None:
result = self.defaultTestResult()
startTestRun = getattr(result, 'startTestRun', None) # pylint: disable=C0103
if startTestRun is not None:
startTestRun()
result.startTest(self)
testMethod = getattr(self, self._testMethodName) # pylint: disable=C0103
if (getattr(self.__class__, "__unittest_skip__", False) or
getattr(testMethod, "__unittest_skip__", False)):
# If the class or method was skipped.
try:
skip_why = (getattr(self.__class__, '__unittest_skip_why__', '')
or getattr(testMethod, '__unittest_skip_why__', ''))
self._addSkip(result, self, skip_why)
finally:
result.stopTest(self)
return
expecting_failure_method = getattr(testMethod,
"__unittest_expecting_failure__", False)
expecting_failure_class = getattr(self,
"__unittest_expecting_failure__", False)
expecting_failure = expecting_failure_class or expecting_failure_method
outcome = _Outcome(result)
self.loop = asyncio.new_event_loop() # pylint: disable=W0201
asyncio.set_event_loop(self.loop)
self.loop.set_debug(True)
self.loop.slow_callback_duration = self.LOOP_SLOW_CALLBACK_DURATION
try:
self._outcome = outcome
with outcome.testPartExecutor(self):
self.setUp()
self.add_timeout()
self.loop.run_until_complete(self.asyncSetUp())
if outcome.success:
outcome.expecting_failure = expecting_failure
with outcome.testPartExecutor(self, isTest=True):
maybe_coroutine = testMethod()
if asyncio.iscoroutine(maybe_coroutine):
self.add_timeout()
self.loop.run_until_complete(maybe_coroutine)
outcome.expecting_failure = False
with outcome.testPartExecutor(self):
self.add_timeout()
self.loop.run_until_complete(self.asyncTearDown())
self.tearDown()
self.doAsyncCleanups()
try:
_cancel_all_tasks(self.loop)
self.loop.run_until_complete(self.loop.shutdown_asyncgens())
finally:
asyncio.set_event_loop(None)
self.loop.close()
for test, reason in outcome.skipped:
self._addSkip(result, test, reason)
self._feedErrorsToResult(result, outcome.errors)
if outcome.success:
if expecting_failure:
if outcome.expectedFailure:
self._addExpectedFailure(result, outcome.expectedFailure)
else:
self._addUnexpectedSuccess(result)
else:
result.addSuccess(self)
return result
finally:
result.stopTest(self)
if orig_result is None:
stopTestRun = getattr(result, 'stopTestRun', None) # pylint: disable=C0103
if stopTestRun is not None:
stopTestRun() # pylint: disable=E1102
# explicitly break reference cycles:
# outcome.errors -> frame -> outcome -> outcome.errors
# outcome.expectedFailure -> frame -> outcome -> outcome.expectedFailure
outcome.errors.clear()
outcome.expectedFailure = None
# clear the outcome, no more needed
self._outcome = None
def doAsyncCleanups(self): # pylint: disable=C0103
outcome = self._outcome or _Outcome()
while self._cleanups:
function, args, kwargs = self._cleanups.pop()
with outcome.testPartExecutor(self):
maybe_coroutine = function(*args, **kwargs)
if asyncio.iscoroutine(maybe_coroutine):
self.add_timeout()
self.loop.run_until_complete(maybe_coroutine)
def cancel(self):
for task in asyncio.all_tasks(self.loop):
if not task.done():
task.print_stack()
task.cancel()
def add_timeout(self):
if self.TIMEOUT:
self.loop.call_later(self.TIMEOUT, self.cancel)
class AdvanceTimeTestCase(AsyncioTestCase):
async def asyncSetUp(self):
self._time = 0 # pylint: disable=W0201
self.loop.time = functools.wraps(self.loop.time)(lambda: self._time)
await super().asyncSetUp()
async def advance(self, seconds):
while self.loop._ready:
await asyncio.sleep(0)
self._time += seconds
await asyncio.sleep(0)
while self.loop._ready:
await asyncio.sleep(0)
class IntegrationTestCase(AsyncioTestCase):
SEED = None
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self.conductor: Optional[Conductor] = None
self.blockchain: Optional[LBCWalletNode] = None
self.hub: Optional[HubNode] = None
self.wallet_node: Optional[WalletNode] = None
self.manager: Optional[WalletManager] = None
self.ledger: Optional[Ledger] = None
self.wallet: Optional[Wallet] = None
self.account: Optional[Account] = None
async def asyncSetUp(self):
self.conductor = Conductor(seed=self.SEED)
await self.conductor.start_lbcd()
self.addCleanup(self.conductor.stop_lbcd)
await self.conductor.start_lbcwallet()
self.addCleanup(self.conductor.stop_lbcwallet)
await self.conductor.start_spv()
self.addCleanup(self.conductor.stop_spv)
await self.conductor.start_wallet()
self.addCleanup(self.conductor.stop_wallet)
await self.conductor.start_hub()
self.addCleanup(self.conductor.stop_hub)
self.blockchain = self.conductor.lbcwallet_node
self.hub = self.conductor.hub_node
self.wallet_node = self.conductor.wallet_node
self.manager = self.wallet_node.manager
self.ledger = self.wallet_node.ledger
self.wallet = self.wallet_node.wallet
self.account = self.wallet_node.wallet.default_account
async def assertBalance(self, account, expected_balance: str): # pylint: disable=C0103
balance = await account.get_balance()
self.assertEqual(satoshis_to_coins(balance), expected_balance)
def broadcast(self, tx):
return self.ledger.broadcast(tx)
async def broadcast_and_confirm(self, tx, ledger=None):
ledger = ledger or self.ledger
notifications = asyncio.create_task(ledger.wait(tx))
await ledger.broadcast(tx)
await notifications
await self.generate_and_wait(1, [tx.id], ledger)
async def on_header(self, height):
if self.ledger.headers.height < height:
await self.ledger.on_header.where(
lambda e: e.height == height
)
return True
async def send_to_address_and_wait(self, address, amount, blocks_to_generate=0, ledger=None):
tx_watch = []
txid = None
done = False
watcher = (ledger or self.ledger).on_transaction.where(
lambda e: e.tx.id == txid or done or tx_watch.append(e.tx.id)
)
txid = await self.blockchain.send_to_address(address, amount)
done = txid in tx_watch
await watcher
await self.generate_and_wait(blocks_to_generate, [txid], ledger)
return txid
async def generate_and_wait(self, blocks_to_generate, txids, ledger=None):
if blocks_to_generate > 0:
watcher = (ledger or self.ledger).on_transaction.where(
lambda e: ((e.tx.id in txids and txids.remove(e.tx.id)), len(txids) <= 0)[-1] # multi-statement lambda
)
await self.blockchain.generate(blocks_to_generate)
await watcher
def on_address_update(self, address):
return self.ledger.on_transaction.where(
lambda e: e.address == address
)
def on_transaction_address(self, tx, address):
return self.ledger.on_transaction.where(
lambda e: e.tx.id == tx.id and e.address == address
)
class FakeExchangeRateManager(ExchangeRateManager):
def __init__(self, market_feeds, rates): # pylint: disable=super-init-not-called
self.market_feeds = market_feeds
for feed in self.market_feeds:
feed.last_check = time()
feed.rate = ExchangeRate(feed.market, rates[feed.market], time())
def start(self):
pass
def stop(self):
pass
def get_fake_exchange_rate_manager(rates=None):
return FakeExchangeRateManager(
[BittrexBTCFeed(), BittrexUSDFeed()],
rates or {'BTCLBC': 3.0, 'USDLBC': 2.0}
)
class ExchangeRateManagerComponent(Component):
component_name = EXCHANGE_RATE_MANAGER_COMPONENT
def __init__(self, component_manager, rates=None):
super().__init__(component_manager)
self.exchange_rate_manager = get_fake_exchange_rate_manager(rates)
@property
def component(self) -> ExchangeRateManager:
return self.exchange_rate_manager
async def start(self):
self.exchange_rate_manager.start()
async def stop(self):
self.exchange_rate_manager.stop()
class CommandTestCase(IntegrationTestCase):
VERBOSITY = logging.WARN
blob_lru_cache_size = 0
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self.daemon = None
self.daemons = []
self.server_config = None
self.server_storage = None
self.extra_wallet_nodes = []
self.extra_wallet_node_port = 5280
self.server_blob_manager = None
self.server = None
self.reflector = None
self.skip_libtorrent = True
async def asyncSetUp(self):
logging.getLogger('lbry.blob_exchange').setLevel(self.VERBOSITY)
logging.getLogger('lbry.daemon').setLevel(self.VERBOSITY)
logging.getLogger('lbry.stream').setLevel(self.VERBOSITY)
logging.getLogger('lbry.wallet').setLevel(self.VERBOSITY)
await super().asyncSetUp()
self.daemon = await self.add_daemon(self.wallet_node)
await self.account.ensure_address_gap()
address = (await self.account.receiving.get_addresses(limit=1, only_usable=True))[0]
await self.send_to_address_and_wait(address, 10, 6)
server_tmp_dir = tempfile.mkdtemp()
self.addCleanup(shutil.rmtree, server_tmp_dir)
self.server_config = Config(
data_dir=server_tmp_dir,
wallet_dir=server_tmp_dir,
save_files=True,
download_dir=server_tmp_dir
)
self.server_config.transaction_cache_size = 10000
self.server_storage = SQLiteStorage(self.server_config, ':memory:')
await self.server_storage.open()
self.server_blob_manager = BlobManager(self.loop, server_tmp_dir, self.server_storage, self.server_config)
self.server = BlobServer(self.loop, self.server_blob_manager, 'bQEaw42GXsgCAGio1nxFncJSyRmnztSCjP')
self.server.start_server(5567, '127.0.0.1')
await self.server.started_listening.wait()
self.reflector = ReflectorServer(self.server_blob_manager)
self.reflector.start_server(5566, '127.0.0.1')
await self.reflector.started_listening.wait()
self.addCleanup(self.reflector.stop_server)
async def asyncTearDown(self):
await super().asyncTearDown()
for wallet_node in self.extra_wallet_nodes:
await wallet_node.stop(cleanup=True)
for daemon in self.daemons:
daemon.component_manager.get_component('wallet')._running = False
await daemon.stop()
async def add_daemon(self, wallet_node=None, seed=None):
start_wallet_node = False
if wallet_node is None:
wallet_node = WalletNode(
self.wallet_node.manager_class,
self.wallet_node.ledger_class,
port=self.extra_wallet_node_port
)
self.extra_wallet_node_port += 1
start_wallet_node = True
upload_dir = os.path.join(wallet_node.data_path, 'uploads')
os.mkdir(upload_dir)
conf = Config(
# needed during instantiation to access known_hubs path
data_dir=wallet_node.data_path,
wallet_dir=wallet_node.data_path,
save_files=True,
download_dir=wallet_node.data_path
)
conf.upload_dir = upload_dir # not a real conf setting
conf.share_usage_data = False
conf.use_upnp = False
conf.reflect_streams = True
conf.blockchain_name = 'lbrycrd_regtest'
conf.lbryum_servers = [(self.conductor.spv_node.hostname, self.conductor.spv_node.port)]
conf.reflector_servers = [('127.0.0.1', 5566)]
conf.fixed_peers = [('127.0.0.1', 5567)]
conf.known_dht_nodes = []
conf.blob_lru_cache_size = self.blob_lru_cache_size
conf.transaction_cache_size = 10000
conf.components_to_skip = [
DHT_COMPONENT, UPNP_COMPONENT, HASH_ANNOUNCER_COMPONENT,
PEER_PROTOCOL_SERVER_COMPONENT
]
if self.skip_libtorrent:
conf.components_to_skip.append(LIBTORRENT_COMPONENT)
if start_wallet_node:
await wallet_node.start(self.conductor.spv_node, seed=seed, config=conf)
self.extra_wallet_nodes.append(wallet_node)
else:
wallet_node.manager.config = conf
wallet_node.manager.ledger.config['known_hubs'] = conf.known_hubs
def wallet_maker(component_manager):
wallet_component = WalletComponent(component_manager)
wallet_component.wallet_manager = wallet_node.manager
wallet_component._running = True
return wallet_component
daemon = Daemon(conf, ComponentManager(
conf, skip_components=conf.components_to_skip, wallet=wallet_maker,
exchange_rate_manager=partial(ExchangeRateManagerComponent, rates={
'BTCLBC': 1.0, 'USDLBC': 2.0
})
))
await daemon.initialize()
self.daemons.append(daemon)
wallet_node.manager.old_db = daemon.storage
return daemon
async def confirm_tx(self, txid, ledger=None):
""" Wait for tx to be in mempool, then generate a block, wait for tx to be in a block. """
# actually, if it's in the mempool or in the block we're fine
await self.generate_and_wait(1, [txid], ledger=ledger)
return txid
async def on_transaction_dict(self, tx):
await self.ledger.wait(Transaction(unhexlify(tx['hex'])))
@staticmethod
def get_all_addresses(tx):
addresses = set()
for txi in tx['inputs']:
addresses.add(txi['address'])
for txo in tx['outputs']:
addresses.add(txo['address'])
return list(addresses)
async def generate(self, blocks):
""" Ask lbrycrd to generate some blocks and wait until ledger has them. """
prepare = self.ledger.on_header.where(self.blockchain.is_expected_block)
await self.blockchain.generate(blocks)
await prepare # no guarantee that it didn't happen already, so start waiting from before calling generate
async def blockchain_claim_name(self, name: str, value: str, amount: str, confirm=True):
txid = await self.blockchain._cli_cmnd('claimname', name, value, amount)
if confirm:
await self.generate(1)
return txid
async def blockchain_update_name(self, txid: str, value: str, amount: str, confirm=True):
txid = await self.blockchain._cli_cmnd('updateclaim', txid, value, amount)
if confirm:
await self.generate(1)
return txid
async def out(self, awaitable):
""" Serializes lbrynet API results to JSON then loads and returns it as dictionary. """
return json.loads(jsonrpc_dumps_pretty(await awaitable, ledger=self.ledger))['result']
def sout(self, value):
""" Synchronous version of `out` method. """
return json.loads(jsonrpc_dumps_pretty(value, ledger=self.ledger))['result']
async def confirm_and_render(self, awaitable, confirm, return_tx=False) -> Transaction:
tx = await awaitable
if confirm:
await self.ledger.wait(tx)
await self.generate(1)
await self.ledger.wait(tx, self.blockchain.block_expected)
if not return_tx:
return self.sout(tx)
return tx
async def create_nondeterministic_channel(self, name, price, pubkey_bytes, daemon=None, blocking=False):
account = (daemon or self.daemon).wallet_manager.default_account
claim_address = await account.receiving.get_or_create_usable_address()
claim = Claim()
claim.channel.public_key_bytes = pubkey_bytes
tx = await Transaction.claim_create(
name, claim, lbc_to_dewies(price),
claim_address, [self.account], self.account
)
await tx.sign([self.account])
await (daemon or self.daemon).broadcast_or_release(tx, blocking)
return self.sout(tx)
def create_upload_file(self, data, prefix=None, suffix=None):
file_path = tempfile.mktemp(prefix=prefix or "tmp", suffix=suffix or "", dir=self.daemon.conf.upload_dir)
with open(file_path, 'w+b') as file:
file.write(data)
file.flush()
return file.name
async def stream_create(
self, name='hovercraft', bid='1.0', file_path=None,
data=b'hi!', confirm=True, prefix=None, suffix=None, return_tx=False, **kwargs):
if file_path is None and data is not None:
file_path = self.create_upload_file(data=data, prefix=prefix, suffix=suffix)
return await self.confirm_and_render(
self.daemon.jsonrpc_stream_create(name, bid, file_path=file_path, **kwargs), confirm, return_tx
)
async def stream_update(
self, claim_id, data=None, prefix=None, suffix=None, confirm=True, return_tx=False, **kwargs):
if data is not None:
file_path = self.create_upload_file(data=data, prefix=prefix, suffix=suffix)
return await self.confirm_and_render(
self.daemon.jsonrpc_stream_update(claim_id, file_path=file_path, **kwargs), confirm, return_tx
)
return await self.confirm_and_render(
self.daemon.jsonrpc_stream_update(claim_id, **kwargs), confirm
)
async def stream_repost(self, claim_id, name='repost', bid='1.0', confirm=True, **kwargs):
return await self.confirm_and_render(
self.daemon.jsonrpc_stream_repost(claim_id=claim_id, name=name, bid=bid, **kwargs), confirm
)
async def stream_abandon(self, *args, confirm=True, **kwargs):
if 'blocking' not in kwargs:
kwargs['blocking'] = False
return await self.confirm_and_render(
self.daemon.jsonrpc_stream_abandon(*args, **kwargs), confirm
)
async def purchase_create(self, *args, confirm=True, **kwargs):
return await self.confirm_and_render(
self.daemon.jsonrpc_purchase_create(*args, **kwargs), confirm
)
async def publish(self, name, *args, confirm=True, **kwargs):
return await self.confirm_and_render(
self.daemon.jsonrpc_publish(name, *args, **kwargs), confirm
)
async def channel_create(self, name='@arena', bid='1.0', confirm=True, **kwargs):
return await self.confirm_and_render(
self.daemon.jsonrpc_channel_create(name, bid, **kwargs), confirm
)
async def channel_update(self, claim_id, confirm=True, **kwargs):
return await self.confirm_and_render(
self.daemon.jsonrpc_channel_update(claim_id, **kwargs), confirm
)
async def channel_abandon(self, *args, confirm=True, **kwargs):
if 'blocking' not in kwargs:
kwargs['blocking'] = False
return await self.confirm_and_render(
self.daemon.jsonrpc_channel_abandon(*args, **kwargs), confirm
)
async def collection_create(
self, name='firstcollection', bid='1.0', confirm=True, **kwargs):
return await self.confirm_and_render(
self.daemon.jsonrpc_collection_create(name, bid, **kwargs), confirm
)
async def collection_update(
self, claim_id, confirm=True, **kwargs):
return await self.confirm_and_render(
self.daemon.jsonrpc_collection_update(claim_id, **kwargs), confirm
)
async def collection_abandon(self, *args, confirm=True, **kwargs):
if 'blocking' not in kwargs:
kwargs['blocking'] = False
return await self.confirm_and_render(
self.daemon.jsonrpc_stream_abandon(*args, **kwargs), confirm
)
async def support_create(self, claim_id, bid='1.0', confirm=True, **kwargs):
return await self.confirm_and_render(
self.daemon.jsonrpc_support_create(claim_id, bid, **kwargs), confirm
)
async def support_abandon(self, *args, confirm=True, **kwargs):
if 'blocking' not in kwargs:
kwargs['blocking'] = False
return await self.confirm_and_render(
self.daemon.jsonrpc_support_abandon(*args, **kwargs), confirm
)
async def account_send(self, *args, confirm=True, **kwargs):
return await self.confirm_and_render(
self.daemon.jsonrpc_account_send(*args, **kwargs), confirm
)
async def wallet_send(self, *args, confirm=True, **kwargs):
return await self.confirm_and_render(
self.daemon.jsonrpc_wallet_send(*args, **kwargs), confirm
)
async def txo_spend(self, *args, confirm=True, **kwargs):
txs = await self.daemon.jsonrpc_txo_spend(*args, **kwargs)
if confirm:
await asyncio.wait([self.ledger.wait(tx) for tx in txs])
await self.generate(1)
await asyncio.wait([self.ledger.wait(tx, self.blockchain.block_expected) for tx in txs])
return self.sout(txs)
async def blob_clean(self):
return await self.out(self.daemon.jsonrpc_blob_clean())
async def status(self):
return await self.out(self.daemon.jsonrpc_status())
async def resolve(self, uri, **kwargs):
return (await self.out(self.daemon.jsonrpc_resolve(uri, **kwargs)))[uri]
async def claim_search(self, **kwargs):
return (await self.out(self.daemon.jsonrpc_claim_search(**kwargs)))['items']
async def get_claim_by_claim_id(self, claim_id):
return await self.out(self.ledger.get_claim_by_claim_id(claim_id))
async def file_list(self, *args, **kwargs):
return (await self.out(self.daemon.jsonrpc_file_list(*args, **kwargs)))['items']
async def txo_list(self, *args, **kwargs):
return (await self.out(self.daemon.jsonrpc_txo_list(*args, **kwargs)))['items']
async def txo_sum(self, *args, **kwargs):
return await self.out(self.daemon.jsonrpc_txo_sum(*args, **kwargs))
async def txo_plot(self, *args, **kwargs):
return await self.out(self.daemon.jsonrpc_txo_plot(*args, **kwargs))
async def claim_list(self, *args, **kwargs):
return (await self.out(self.daemon.jsonrpc_claim_list(*args, **kwargs)))['items']
async def stream_list(self, *args, **kwargs):
return (await self.out(self.daemon.jsonrpc_stream_list(*args, **kwargs)))['items']
async def channel_list(self, *args, **kwargs):
return (await self.out(self.daemon.jsonrpc_channel_list(*args, **kwargs)))['items']
async def transaction_list(self, *args, **kwargs):
return (await self.out(self.daemon.jsonrpc_transaction_list(*args, **kwargs)))['items']
async def blob_list(self, *args, **kwargs):
return (await self.out(self.daemon.jsonrpc_blob_list(*args, **kwargs)))['items']
@staticmethod
def get_claim_id(tx):
return tx['outputs'][0]['claim_id']
def assertItemCount(self, result, count): # pylint: disable=invalid-name
self.assertEqual(count, result['total_items'])