working integration tests

This commit is contained in:
Lex Berezhny 2018-10-16 15:04:20 -04:00
parent 1ca82b9d52
commit 5eac4247e1
6 changed files with 122 additions and 103 deletions

View file

@ -53,17 +53,16 @@ class ComponentManager:
for component_class in self.component_classes.values():
self.components.add(component_class(self))
@defer.inlineCallbacks
def evaluate_condition(self, condition_name):
if condition_name not in RegisteredConditions.conditions:
raise NameError(condition_name)
condition = RegisteredConditions.conditions[condition_name]
try:
component = self.get_component(condition.component)
result = yield defer.maybeDeferred(condition.evaluate, component)
result = condition.evaluate(component)
except Exception as err:
result = False
defer.returnValue((result, "" if result else condition.message))
return result, "" if result else condition.message
def sort_components(self, reverse=False):
"""

View file

@ -83,8 +83,7 @@ DIRECTION_DESCENDING = 'desc'
DIRECTIONS = DIRECTION_ASCENDING, DIRECTION_DESCENDING
@defer.inlineCallbacks
def maybe_paginate(get_records: Callable, get_record_count: Callable,
async def maybe_paginate(get_records: Callable, get_record_count: Callable,
page: Optional[int], page_size: Optional[int], **constraints):
if None not in (page, page_size):
constraints.update({
@ -92,11 +91,11 @@ def maybe_paginate(get_records: Callable, get_record_count: Callable,
"limit": page_size
})
return {
"items": (yield get_records(**constraints)),
"total_pages": int(((yield get_record_count(**constraints)) + (page_size-1)) / page_size),
"items": await get_records(**constraints),
"total_pages": int(((await get_record_count(**constraints)) + (page_size-1)) / page_size),
"page": page, "page_size": page_size
}
return (yield get_records(**constraints))
return await get_records(**constraints)
class IterableContainer:
@ -200,9 +199,7 @@ class WalletIsUnlocked(RequiredCondition):
@staticmethod
def evaluate(component):
d = component.check_locked()
d.addCallback(lambda r: not r)
return d
return not component.check_locked()
class Daemon(AuthJSONRPCServer):
@ -403,8 +400,7 @@ class Daemon(AuthJSONRPCServer):
del self.streams[sd_hash]
defer.returnValue(result)
@defer.inlineCallbacks
def _publish_stream(self, name, bid, claim_dict, file_path=None, certificate=None,
async def _publish_stream(self, name, bid, claim_dict, file_path=None, certificate=None,
claim_address=None, change_address=None):
publisher = Publisher(
self.blob_manager, self.payment_rate_manager, self.storage,
@ -412,11 +408,11 @@ class Daemon(AuthJSONRPCServer):
)
parse_lbry_uri(name)
if not file_path:
stream_hash = yield self.storage.get_stream_hash_for_sd_hash(
claim_dict['stream']['source']['source'])
tx = yield publisher.publish_stream(name, bid, claim_dict, stream_hash, claim_address)
stream_hash = await d2f(self.storage.get_stream_hash_for_sd_hash(
claim_dict['stream']['source']['source']))
tx = await publisher.publish_stream(name, bid, claim_dict, stream_hash, claim_address)
else:
tx = yield publisher.create_and_publish_stream(name, bid, claim_dict, file_path, claim_address)
tx = await publisher.create_and_publish_stream(name, bid, claim_dict, file_path, claim_address)
if conf.settings['reflect_uploads']:
d = reupload.reflect_file(publisher.lbry_file)
d.addCallbacks(lambda _: log.info("Reflected new publication to lbry://%s", name),
@ -425,13 +421,13 @@ class Daemon(AuthJSONRPCServer):
nout = 0
txo = tx.outputs[nout]
log.info("Success! Published to lbry://%s txid: %s nout: %d", name, tx.id, nout)
defer.returnValue({
return {
"success": True,
"tx": tx,
"claim_id": txo.claim_id,
"claim_address": self.ledger.hash160_to_address(txo.script.values['pubkey_hash']),
"output": tx.outputs[nout]
})
}
def _get_or_download_sd_blob(self, blob, sd_hash):
if blob:
@ -1043,8 +1039,7 @@ class Daemon(AuthJSONRPCServer):
pass
@requires(WALLET_COMPONENT, conditions=[WALLET_IS_UNLOCKED])
@defer.inlineCallbacks
def jsonrpc_wallet_send(self, amount, address=None, claim_id=None, account_id=None):
async def jsonrpc_wallet_send(self, amount, address=None, claim_id=None, account_id=None):
"""
Send credits. If given an address, send credits to it. If given a claim id, send a tip
to the owner of a claim specified by uri. A tip is a claim support where the recipient
@ -1102,11 +1097,11 @@ class Daemon(AuthJSONRPCServer):
if reserved_points is None:
raise InsufficientFundsError()
account = self.get_account_or_default(account_id)
result = yield self.wallet_manager.send_points_to_address(reserved_points, amount, account)
result = await self.wallet_manager.send_points_to_address(reserved_points, amount, account)
self.analytics_manager.send_credits_sent()
else:
log.info("This command is deprecated for sending tips, please use the newer claim_tip command")
result = yield self.jsonrpc_claim_tip(claim_id=claim_id, amount=amount, account_id=account_id)
result = await self.jsonrpc_claim_tip(claim_id=claim_id, amount=amount, account_id=account_id)
return result
@requires(WALLET_COMPONENT, conditions=[WALLET_IS_UNLOCKED])
@ -1181,8 +1176,7 @@ class Daemon(AuthJSONRPCServer):
confirmations=confirmations, show_seed=show_seed)
@requires("wallet")
@defer.inlineCallbacks
def jsonrpc_account_balance(self, account_id=None, confirmations=0):
async def jsonrpc_account_balance(self, account_id=None, confirmations=0):
"""
Return the balance of an account
@ -1199,12 +1193,11 @@ class Daemon(AuthJSONRPCServer):
(decimal) amount of lbry credits in wallet
"""
account = self.get_account_or_default(account_id)
dewies = yield account.get_balance(confirmations=confirmations)
dewies = await account.get_balance(confirmations=confirmations)
return dewies_to_lbc(dewies)
@requires("wallet")
@defer.inlineCallbacks
def jsonrpc_account_add(
async def jsonrpc_account_add(
self, account_name, single_key=False, seed=None, private_key=None, public_key=None):
"""
Add a previously created account from a seed, private key or public key (read-only).
@ -1239,7 +1232,7 @@ class Daemon(AuthJSONRPCServer):
)
if self.ledger.network.is_connected:
yield self.ledger.update_account(account)
await self.ledger.update_account(account)
self.default_wallet.save()
@ -1251,8 +1244,7 @@ class Daemon(AuthJSONRPCServer):
return result
@requires("wallet")
@defer.inlineCallbacks
def jsonrpc_account_create(self, account_name, single_key=False):
async def jsonrpc_account_create(self, account_name, single_key=False):
"""
Create a new account. Specify --single_key if you want to use
the same address for all transactions (not recommended).
@ -1275,7 +1267,7 @@ class Daemon(AuthJSONRPCServer):
)
if self.ledger.network.is_connected:
yield self.ledger.update_account(account)
await self.ledger.update_account(account)
self.default_wallet.save()
@ -1710,8 +1702,7 @@ class Daemon(AuthJSONRPCServer):
defer.returnValue(response)
@requires(WALLET_COMPONENT)
@defer.inlineCallbacks
def jsonrpc_resolve(self, force=False, uri=None, uris=[]):
async def jsonrpc_resolve(self, force=False, uri=None, uris=None):
"""
Resolve given LBRY URIs
@ -1779,7 +1770,7 @@ class Daemon(AuthJSONRPCServer):
}
"""
uris = tuple(uris)
uris = tuple(uris or [])
if uri is not None:
uris += (uri,)
@ -1793,12 +1784,10 @@ class Daemon(AuthJSONRPCServer):
except URIParseError:
results[u] = {"error": "%s is not a valid uri" % u}
resolved = yield self.wallet_manager.resolve(*valid_uris, check_cache=not force)
resolved = await self.wallet_manager.resolve(*valid_uris, check_cache=not force)
for resolved_uri in resolved:
results[resolved_uri] = resolved[resolved_uri]
response = yield self._render_response(results)
defer.returnValue(response)
return results
@requires(STREAM_IDENTIFIER_COMPONENT, WALLET_COMPONENT, EXCHANGE_RATE_MANAGER_COMPONENT, BLOB_COMPONENT,
DHT_COMPONENT, RATE_LIMITER_COMPONENT, PAYMENT_RATE_COMPONENT, DATABASE_COMPONENT,
@ -2010,8 +1999,7 @@ class Daemon(AuthJSONRPCServer):
return self.get_est_cost(uri, size)
@requires(WALLET_COMPONENT, conditions=[WALLET_IS_UNLOCKED])
@defer.inlineCallbacks
def jsonrpc_channel_new(self, channel_name, amount):
async def jsonrpc_channel_new(self, channel_name, amount):
"""
Generate a publisher key and create a new '@' prefixed certificate claim
@ -2046,7 +2034,7 @@ class Daemon(AuthJSONRPCServer):
if amount <= 0:
raise Exception("Invalid amount")
tx = yield self.wallet_manager.claim_new_channel(channel_name, amount)
tx = await self.wallet_manager.claim_new_channel(channel_name, amount)
self.default_wallet.save()
self.analytics_manager.send_new_channel()
nout = 0
@ -2125,8 +2113,7 @@ class Daemon(AuthJSONRPCServer):
@requires(WALLET_COMPONENT, FILE_MANAGER_COMPONENT, BLOB_COMPONENT, PAYMENT_RATE_COMPONENT, DATABASE_COMPONENT,
conditions=[WALLET_IS_UNLOCKED])
@defer.inlineCallbacks
def jsonrpc_publish(self, name, bid, metadata=None, file_path=None, fee=None, title=None,
async def jsonrpc_publish(self, name, bid, metadata=None, file_path=None, fee=None, title=None,
description=None, author=None, language=None, license=None,
license_url=None, thumbnail=None, preview=None, nsfw=None, sources=None,
channel_name=None, channel_id=None,
@ -2221,7 +2208,7 @@ class Daemon(AuthJSONRPCServer):
# raises an error if the address is invalid
decode_address(address)
available = yield self.default_account.get_balance()
available = await self.default_account.get_balance()
if amount >= available:
# TODO: add check for existing claim balance
#balance = yield self.wallet.get_max_usable_balance_for_claim(name)
@ -2273,7 +2260,7 @@ class Daemon(AuthJSONRPCServer):
log.warning("Stripping empty fee from published metadata")
del metadata['fee']
elif 'address' not in metadata['fee']:
address = yield self.default_account.receiving.get_or_create_usable_address()
address = await self.default_account.receiving.get_or_create_usable_address()
metadata['fee']['address'] = address
if 'fee' in metadata and 'version' not in metadata['fee']:
metadata['fee']['version'] = '_0_0_1'
@ -2316,7 +2303,7 @@ class Daemon(AuthJSONRPCServer):
certificate = None
if channel_id or channel_name:
certificate = yield self.get_channel_or_error(channel_id, channel_name)
certificate = await self.get_channel_or_error(channel_id, channel_name)
log.info("Publish: %s", {
'name': name,
@ -2329,13 +2316,13 @@ class Daemon(AuthJSONRPCServer):
'channel_name': channel_name
})
result = yield self._publish_stream(name, amount, claim_dict, file_path, certificate,
claim_address, change_address)
return result
return await self._publish_stream(
name, amount, claim_dict, file_path, certificate,
claim_address, change_address
)
@requires(WALLET_COMPONENT, conditions=[WALLET_IS_UNLOCKED])
@defer.inlineCallbacks
def jsonrpc_claim_abandon(self, claim_id=None, txid=None, nout=None, account_id=None):
async def jsonrpc_claim_abandon(self, claim_id=None, txid=None, nout=None, account_id=None):
"""
Abandon a name and reclaim credits from the claim
@ -2366,16 +2353,12 @@ class Daemon(AuthJSONRPCServer):
if nout is None and txid is not None:
raise Exception('Must specify nout')
tx = yield self.wallet_manager.abandon_claim(claim_id, txid, nout, account)
tx = await self.wallet_manager.abandon_claim(claim_id, txid, nout, account)
self.analytics_manager.send_claim_action('abandon')
defer.returnValue({
"success": True,
"tx": tx,
})
return {"success": True, "tx": tx}
@requires(WALLET_COMPONENT, conditions=[WALLET_IS_UNLOCKED])
@defer.inlineCallbacks
def jsonrpc_claim_new_support(self, name, claim_id, amount, account_id=None):
async def jsonrpc_claim_new_support(self, name, claim_id, amount, account_id=None):
"""
Support a name claim
@ -2403,13 +2386,12 @@ class Daemon(AuthJSONRPCServer):
"""
account = self.get_account_or_default(account_id)
amount = self.get_dewies_or_error("amount", amount)
result = yield self.wallet_manager.support_claim(name, claim_id, amount, account)
result = await self.wallet_manager.support_claim(name, claim_id, amount, account)
self.analytics_manager.send_claim_action('new_support')
return result
@requires(WALLET_COMPONENT, conditions=[WALLET_IS_UNLOCKED])
@defer.inlineCallbacks
def jsonrpc_claim_tip(self, claim_id, amount, account_id=None):
async def jsonrpc_claim_tip(self, claim_id, amount, account_id=None):
"""
Tip the owner of the claim
@ -2437,7 +2419,7 @@ class Daemon(AuthJSONRPCServer):
account = self.get_account_or_default(account_id)
amount = self.get_dewies_or_error("amount", amount)
validate_claim_id(claim_id)
result = yield self.wallet_manager.tip_claim(amount, claim_id, account)
result = await self.wallet_manager.tip_claim(amount, claim_id, account)
self.analytics_manager.send_claim_action('new_support')
return result
@ -3294,16 +3276,15 @@ class Daemon(AuthJSONRPCServer):
response['head_blob_availability'].get('is_available')
defer.returnValue(response)
@defer.inlineCallbacks
def get_channel_or_error(self, channel_id: str = None, channel_name: str = None):
async def get_channel_or_error(self, channel_id: str = None, channel_name: str = None):
if channel_id is not None:
certificates = yield self.wallet_manager.get_certificates(
certificates = await self.wallet_manager.get_certificates(
private_key_accounts=[self.default_account], claim_id=channel_id)
if not certificates:
raise ValueError("Couldn't find channel with claim_id '{}'." .format(channel_id))
return certificates[0]
if channel_name is not None:
certificates = yield self.wallet_manager.get_certificates(
certificates = await self.wallet_manager.get_certificates(
private_key_accounts=[self.default_account], claim_name=channel_name)
if not certificates:
raise ValueError("Couldn't find channel with name '{}'.".format(channel_name))

View file

@ -1,14 +1,17 @@
import asyncio
import logging
import mimetypes
import os
from twisted.internet import defer
from lbrynet.file_manager.EncryptedFileCreator import create_lbry_file
log = logging.getLogger(__name__)
def d2f(d):
return d.asFuture(asyncio.get_event_loop())
class Publisher:
def __init__(self, blob_manager, payment_rate_manager, storage, lbry_file_manager, wallet, certificate):
self.blob_manager = blob_manager
@ -19,8 +22,7 @@ class Publisher:
self.certificate = certificate
self.lbry_file = None
@defer.inlineCallbacks
def create_and_publish_stream(self, name, bid, claim_dict, file_path, holding_address=None):
async def create_and_publish_stream(self, name, bid, claim_dict, file_path, holding_address=None):
"""Create lbry file and make claim"""
log.info('Starting publish for %s', name)
if not os.path.isfile(file_path):
@ -30,10 +32,10 @@ class Publisher:
file_name = os.path.basename(file_path)
with open(file_path, 'rb') as read_handle:
self.lbry_file = yield create_lbry_file(
self.lbry_file = await d2f(create_lbry_file(
self.blob_manager, self.storage, self.payment_rate_manager, self.lbry_file_manager, file_name,
read_handle
)
))
if 'source' not in claim_dict['stream']:
claim_dict['stream']['source'] = {}
@ -41,37 +43,36 @@ class Publisher:
claim_dict['stream']['source']['sourceType'] = 'lbry_sd_hash'
claim_dict['stream']['source']['contentType'] = get_content_type(file_path)
claim_dict['stream']['source']['version'] = "_0_0_1" # need current version here
tx = yield self.wallet.claim_name(
tx = await self.wallet.claim_name(
name, bid, claim_dict, self.certificate, holding_address
)
# check if we have a file already for this claim (if this is a publish update with a new stream)
old_stream_hashes = yield self.storage.get_old_stream_hashes_for_claim_id(
old_stream_hashes = await d2f(self.storage.get_old_stream_hashes_for_claim_id(
tx.outputs[0].claim_id, self.lbry_file.stream_hash
)
))
if old_stream_hashes:
for lbry_file in filter(lambda l: l.stream_hash in old_stream_hashes,
list(self.lbry_file_manager.lbry_files)):
yield self.lbry_file_manager.delete_lbry_file(lbry_file, delete_file=False)
await d2f(self.lbry_file_manager.delete_lbry_file(lbry_file, delete_file=False))
log.info("Removed old stream for claim update: %s", lbry_file.stream_hash)
yield self.storage.save_content_claim(
await d2f(self.storage.save_content_claim(
self.lbry_file.stream_hash, tx.outputs[0].id
)
defer.returnValue(tx)
))
return tx
@defer.inlineCallbacks
def publish_stream(self, name, bid, claim_dict, stream_hash, holding_address=None):
async def publish_stream(self, name, bid, claim_dict, stream_hash, holding_address=None):
"""Make a claim without creating a lbry file"""
tx = yield self.wallet.claim_name(
tx = await self.wallet.claim_name(
name, bid, claim_dict, self.certificate, holding_address
)
if stream_hash: # the stream_hash returned from the db will be None if this isn't a stream we have
yield self.storage.save_content_claim(
await d2f(self.storage.save_content_claim(
stream_hash, tx.outputs[0].id
)
))
self.lbry_file = [f for f in self.lbry_file_manager.lbry_files if f.stream_hash == stream_hash][0]
defer.returnValue(tx)
return tx
def get_content_type(filename):

View file

@ -1,3 +1,4 @@
import asyncio
import logging
from six.moves.urllib import parse as urlparse
import json
@ -7,6 +8,7 @@ import signal
from functools import wraps
from twisted.web import server
from twisted.internet import defer
from twisted.internet.defer import Deferred
from twisted.python.failure import Failure
from twisted.internet.error import ConnectionDone, ConnectionLost
from txjsonrpc import jsonrpclib
@ -141,19 +143,17 @@ class AuthorizedBase(metaclass=JSONRPCServerType):
condition_names = conditions.get("conditions", [])
def _wrap(fn):
@defer.inlineCallbacks
@wraps(fn)
def _inner(*args, **kwargs):
component_manager = args[0].component_manager
for condition_name in condition_names:
condition_result, err_msg = yield component_manager.evaluate_condition(condition_name)
condition_result, err_msg = component_manager.evaluate_condition(condition_name)
if not condition_result:
raise ComponentStartConditionNotMet(err_msg)
if not component_manager.all_components_running(*components):
raise ComponentsNotStarted("the following required components have not yet started: "
"%s" % json.dumps(components))
result = yield fn(*args, **kwargs)
defer.returnValue(result)
return fn(*args, **kwargs)
return _inner
return _wrap
@ -446,7 +446,18 @@ class AuthJSONRPCServer(AuthorizedBase):
)
return server.NOT_DONE_YET
d = defer.maybeDeferred(fn, self, *_args, **_kwargs)
try:
result = fn(self, *_args, **_kwargs)
if isinstance(result, Deferred):
d = result
elif isinstance(result, Failure):
d = defer.fail(result)
elif asyncio.iscoroutine(result):
d = Deferred.fromFuture(asyncio.ensure_future(result))
else:
d = defer.succeed(result)
except:
d = Failure(captureVars=Deferred.debug)
# finished_deferred will callback when the request is finished
# and errback if something went wrong. If the errback is

View file

@ -1,5 +1,6 @@
import os
import json
import asyncio
import logging
from binascii import unhexlify
@ -265,7 +266,8 @@ class LbryWalletManager(BaseWalletManager):
ledger: MainNetLedger = self.default_account.ledger
results = await ledger.resolve(page, page_size, *uris)
await self.old_db.save_claims_for_resolve(
(value for value in results.values() if 'error' not in value))
(value for value in results.values() if 'error' not in value)
).asFuture(asyncio.get_event_loop())
return results
def get_claims_for_name(self, name: str):
@ -362,7 +364,7 @@ class LbryWalletManager(BaseWalletManager):
await account.ledger.broadcast(tx)
await self.old_db.save_claims([self._old_get_temp_claim_info(
tx, tx.outputs[0], claim_address, claim_dict, name, amount
)])
)]).asFuture(asyncio.get_event_loop())
# TODO: release reserved tx outputs in case anything fails by this point
return tx

View file

@ -4,7 +4,11 @@ import tempfile
import logging
from types import SimpleNamespace
from orchstr8.testcase import IntegrationTestCase
from twisted.trial import unittest
from twisted.internet import utils, defer
from twisted.internet.utils import runWithWarningsSuppressed as originalRunWith
from orchstr8.testcase import IntegrationTestCase as BaseIntegrationTestCase
import lbryschema
lbryschema.BLOCKCHAIN_NAME = 'lbrycrd_regtest'
@ -86,13 +90,34 @@ class FakeAnalytics:
pass
class IntegrationTestCase(unittest.TestCase, BaseIntegrationTestCase):
async def setUp(self):
await self.asyncSetUp()
async def tearDown(self):
await self.asyncTearDown()
def run_with_async_support(suppress, f, *a, **kw):
if asyncio.iscoroutinefunction(f):
def test_method(*args, **kwargs):
return defer.Deferred.fromFuture(asyncio.ensure_future(f(*args, **kwargs)))
else:
test_method = f
return originalRunWith(suppress, test_method, *a, **kw)
utils.runWithWarningsSuppressed = run_with_async_support
class CommandTestCase(IntegrationTestCase):
timeout = 180
WALLET_MANAGER = LbryWalletManager
async def asyncSetUp(self):
await super().asyncSetUp()
async def setUp(self):
await super().setUp()
if self.VERBOSE:
log.setLevel(logging.DEBUG)
@ -139,8 +164,8 @@ class CommandTestCase(IntegrationTestCase):
self.daemon.wallet_manager = self.wallet_component.wallet_manager
self.manager.old_db = self.daemon.storage
async def asyncTearDown(self):
await super().asyncTearDown()
async def tearDown(self):
await super().tearDown()
self.wallet_component._running = False
await d2f(self.daemon._shutdown())
@ -300,7 +325,7 @@ class EpicAdventuresOfChris45(CommandTestCase):
file.write(b'Totally un-cliched ending')
file.write(b'**Audience Gasps**')
file.flush()
claim3 = yield self.out(self.daemon.jsonrpc_publish(
claim3 = await self.out(self.daemon.jsonrpc_publish(
'fresh-start', '1.0', file_path=file.name, channel_name='@spam'
))
self.assertTrue(claim3['success'])
@ -360,7 +385,7 @@ class EpicAdventuresOfChris45(CommandTestCase):
file.write(b'I know right? Totally a hit song')
file.write(b'That\'s what goes around for songs these days anyways')
file.flush()
claim4 = yield self.out(self.daemon.jsonrpc_publish(
claim4 = await self.out(self.daemon.jsonrpc_publish(
'hit-song', '1.0', file_path=file.name, channel_id=channel['claim_id']
))
self.assertTrue(claim4['success'])
@ -384,7 +409,7 @@ class EpicAdventuresOfChris45(CommandTestCase):
class AccountManagement(CommandTestCase):
VERBOSE = True
VERBOSE = False
async def test_performing_account_management_commands(self):
# check initial account
@ -393,7 +418,7 @@ class AccountManagement(CommandTestCase):
# change account name and gap
account_id = response['lbc_regtest'][0]['id']
await self.daemon.jsonrpc_account_set(
self.daemon.jsonrpc_account_set(
account_id=account_id, new_name='test account',
receiving_gap=95, receiving_max_uses=96,
change_gap=97, change_max_uses=98
@ -424,7 +449,7 @@ class AccountManagement(CommandTestCase):
account_seed = response['lbc_regtest'][1]['seed']
# remove account
await self.daemon.jsonrpc_account_remove(response['lbc_regtest'][1]['id'])
self.daemon.jsonrpc_account_remove(response['lbc_regtest'][1]['id'])
response = await self.daemon.jsonrpc_account_list()
self.assertEqual(len(response['lbc_regtest']), 1)