From ea0c6eca3286d21de3998d955402337a0d8ebf75 Mon Sep 17 00:00:00 2001 From: Jack Robison Date: Fri, 9 Jun 2017 13:47:13 -0400 Subject: [PATCH] multi resolve --- CHANGELOG.md | 6 +- lbrynet/core/Wallet.py | 69 ++++---- lbrynet/lbrynet_daemon/Daemon.py | 209 +++++++++-------------- tests/unit/lbrynet_daemon/test_Daemon.py | 3 +- 4 files changed, 126 insertions(+), 161 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index d85a4bded..e312a0993 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -13,8 +13,8 @@ at anytime. * ### Changed - * - * + * Support resolution of multiple uris with `resolve`, all results are keyed by uri + * Add `error` responses for failed resolves ### Fixed * Race condition from improper initialization and shutdown of the blob manager database @@ -25,7 +25,7 @@ at anytime. * ### Removed - * + * Remove `claims_in_channel` from `resolve` response * ## [0.11.0] - 2017-06-09 diff --git a/lbrynet/core/Wallet.py b/lbrynet/core/Wallet.py index 793a9fd5c..6a7526857 100644 --- a/lbrynet/core/Wallet.py +++ b/lbrynet/core/Wallet.py @@ -684,18 +684,6 @@ class Wallet(object): break defer.returnValue(my_claim) - @defer.inlineCallbacks - def get_claim_info(self, name, txid=None, nout=None, claim_id=None, check_expire=True): - if claim_id is not None: - results = yield self.get_claim(claim_id, check_expire) - if results['name'] != name: - raise Exception("Name does not match claim referenced by id") - elif txid is None or nout is None: - results = yield self.get_claim_by_name(name) - else: - results = yield self.get_claim_by_outpoint(ClaimOutpoint(txid, nout), check_expire) - defer.returnValue(results) - @defer.inlineCallbacks def _handle_claim_result(self, results, update_caches=True): if not results: @@ -803,27 +791,39 @@ class Wallet(object): defer.returnValue(results) @defer.inlineCallbacks - def resolve_uri(self, uri, check_cache=True): - cached_claim = None - if check_cache: - cached_claim = yield self._storage.get_cached_claim_for_uri(uri, check_cache) - if cached_claim: - log.debug("Using cached results for %s", uri) - resolve_results = cached_claim - else: - log.info("Resolving %s", uri) - resolve_results = yield self._get_value_for_uri(uri) + def resolve(self, *uris, **kwargs): + check_cache = kwargs.get('check_cache', True) + page = kwargs.get('page', 0) + page_size = kwargs.get('page_size', 10) - claim_id = None - if resolve_results and 'claim' in resolve_results: - claim_id = resolve_results['claim']['claim_id'] - certificate_id = None - if resolve_results and 'certificate' in resolve_results: - certificate_id = resolve_results['certificate']['claim_id'] + result = {} + needed = [] + for uri in uris: + cached_claim = None + if check_cache: + cached_claim = yield self._storage.get_cached_claim_for_uri(uri, check_cache) + if cached_claim: + log.debug("Using cached results for %s", uri) + result[uri] = yield self._handle_claim_result(cached_claim, update_caches=False) + else: + log.info("Resolving %s", uri) + needed.append(uri) - result = yield self._handle_claim_result(resolve_results, cached_claim is None) - if claim_id: - yield self._storage.save_claim_to_uri_cache(uri, claim_id, certificate_id) + batch_results = yield self._get_values_for_uris(page, page_size, *uris) + + for uri, resolve_results in batch_results.iteritems(): + claim_id = None + if resolve_results and 'claim' in resolve_results: + claim_id = resolve_results['claim']['claim_id'] + certificate_id = None + if resolve_results and 'certificate' in resolve_results: + certificate_id = resolve_results['certificate']['claim_id'] + try: + result[uri] = yield self._handle_claim_result(resolve_results, update_caches=True) + if claim_id: + yield self._storage.save_claim_to_uri_cache(uri, claim_id, certificate_id) + except (UnknownNameError, UnknownClaimID, UnknownURI) as err: + result[uri] = {'error': err.message} defer.returnValue(result) @@ -1099,6 +1099,9 @@ class Wallet(object): def _get_claim_by_claimid(self, claim_id): return defer.fail(NotImplementedError()) + def _get_values_for_uris(self, page, page_size, *uris): + return defer.fail(NotImplementedError()) + def _start(self): pass @@ -1386,6 +1389,10 @@ class LBRYumWallet(Wallet): raise Exception("No uri given") return self._run_cmd_as_defer_to_thread('getvalueforuri', uri) + def _get_values_for_uris(self, page, page_size, *uris): + return self._run_cmd_as_defer_to_thread('getvaluesforuris', False, page, page_size, + *uris) + def _claim_certificate(self, name, amount): return self._run_cmd_as_defer_succeed('claimcertificate', name, amount) diff --git a/lbrynet/lbrynet_daemon/Daemon.py b/lbrynet/lbrynet_daemon/Daemon.py index b0258aa11..47f72f16c 100644 --- a/lbrynet/lbrynet_daemon/Daemon.py +++ b/lbrynet/lbrynet_daemon/Daemon.py @@ -727,6 +727,7 @@ class Daemon(AuthJSONRPCServer): f.close() return defer.succeed(True) + @defer.inlineCallbacks def _resolve_name(self, name, force_refresh=False): """Resolves a name. Checks the cache first before going out to the blockchain. @@ -734,10 +735,12 @@ class Daemon(AuthJSONRPCServer): name: the lbry:// to resolve force_refresh: if True, always go out to the blockchain to resolve. """ - if name.startswith('lbry://'): - raise ValueError('name {} should not start with lbry://'.format(name)) - helper = _ResolveNameHelper(self, name, force_refresh) - return helper.get_deferred() + + parsed = parse_lbry_uri(name) + resolution = yield self.session.wallet.resolve(parsed.name, check_cache=not force_refresh) + if parsed.name in resolution: + result = resolution[parsed.name] + defer.returnValue(result) def _get_or_download_sd_blob(self, blob, sd_hash): if blob: @@ -789,9 +792,10 @@ class Daemon(AuthJSONRPCServer): cost = self._get_est_cost_from_stream_size(size) - resolved = yield self.session.wallet.resolve_uri(uri) - if 'claim' in resolved: - claim = ClaimDict.load_dict(resolved['claim']['value']) + resolved = yield self.session.wallet.resolve(uri) + + if uri in resolved and 'claim' in resolved[uri]: + claim = ClaimDict.load_dict(resolved[uri]['claim']['value']) final_fee = self._add_key_fee_to_est_data_cost(claim.source_fee, cost) result = yield self._render_response(final_fee) defer.returnValue(result) @@ -834,10 +838,11 @@ class Daemon(AuthJSONRPCServer): """ Resolve a name and return the estimated stream cost """ - try: - claim_response = yield self.session.wallet.resolve_uri(uri) - # TODO: fix me, this is a hack - except Exception: + + resolved = yield self.session.wallet.resolve(uri) + if resolved: + claim_response = resolved[uri] + else: claim_response = None result = None @@ -1324,7 +1329,9 @@ class Daemon(AuthJSONRPCServer): outpoint = ClaimOutpoint(txid, nout) claim_results = yield self.session.wallet.get_claim_by_outpoint(outpoint) else: - claim_results = yield self.session.wallet.get_claim_by_name(name) + claim_results = yield self.session.wallet.resolve(name) + if claim_results: + claim_results = claim_results[name] result = format_json_out_amount_as_float(claim_results) except (TypeError, UnknownNameError, UnknownClaimID, UnknownURI): result = False @@ -1334,41 +1341,24 @@ class Daemon(AuthJSONRPCServer): @AuthJSONRPCServer.auth_required @defer.inlineCallbacks @AuthJSONRPCServer.flags(force='-f') - def jsonrpc_resolve(self, uri, force=False): + def jsonrpc_resolve(self, force=False, uri=None, uris=[]): """ - Resolve a LBRY URI + Resolve given LBRY URIs Usage: - resolve [-f] + resolve [-f] ( | --uri=) [...] Options: -f : force refresh and ignore cache Returns: - None if nothing can be resolved, otherwise: - If uri resolves to a channel or a claim in a channel: - 'certificate': { - 'address': (str) claim address, - 'amount': (float) claim amount, - 'effective_amount': (float) claim amount including supports, - 'claim_id': (str) claim id, - 'claim_sequence': (int) claim sequence number, - 'decoded_claim': (bool) whether or not the claim value was decoded, - 'height': (int) claim height, - 'depth': (int) claim depth, - 'has_signature': (bool) included if decoded_claim - 'name': (str) claim name, - 'supports: (list) list of supports [{'txid': txid, - 'nout': nout, - 'amount': amount}], - 'txid': (str) claim txid, - 'nout': (str) claim nout, - 'signature_is_valid': (bool), included if has_signature, - 'value': ClaimDict if decoded, otherwise hex string - } - If uri resolves to a channel: - 'claims_in_channel': [ - { + Dictionary of results, keyed by uri + '': { + If a resolution error occurs: + 'error': Error message + + If the uri resolves to a channel or a claim in a channel: + 'certificate': { 'address': (str) claim address, 'amount': (float) claim amount, 'effective_amount': (float) claim amount including supports, @@ -1387,37 +1377,51 @@ class Daemon(AuthJSONRPCServer): 'signature_is_valid': (bool), included if has_signature, 'value': ClaimDict if decoded, otherwise hex string } - ] - If uri resolves to a claim: - 'claim': { - 'address': (str) claim address, - 'amount': (float) claim amount, - 'effective_amount': (float) claim amount including supports, - 'claim_id': (str) claim id, - 'claim_sequence': (int) claim sequence number, - 'decoded_claim': (bool) whether or not the claim value was decoded, - 'height': (int) claim height, - 'depth': (int) claim depth, - 'has_signature': (bool) included if decoded_claim - 'name': (str) claim name, - 'channel_name': (str) channel name if claim is in a channel - 'supports: (list) list of supports [{'txid': txid, - 'nout': nout, - 'amount': amount}] - 'txid': (str) claim txid, - 'nout': (str) claim nout, - 'signature_is_valid': (bool), included if has_signature, - 'value': ClaimDict if decoded, otherwise hex string - } + + If the uri resolves to a claim: + 'claim': { + 'address': (str) claim address, + 'amount': (float) claim amount, + 'effective_amount': (float) claim amount including supports, + 'claim_id': (str) claim id, + 'claim_sequence': (int) claim sequence number, + 'decoded_claim': (bool) whether or not the claim value was decoded, + 'height': (int) claim height, + 'depth': (int) claim depth, + 'has_signature': (bool) included if decoded_claim + 'name': (str) claim name, + 'channel_name': (str) channel name if claim is in a channel + 'supports: (list) list of supports [{'txid': txid, + 'nout': nout, + 'amount': amount}] + 'txid': (str) claim txid, + 'nout': (str) claim nout, + 'signature_is_valid': (bool), included if has_signature, + 'value': ClaimDict if decoded, otherwise hex string + } } """ - try: - resolved = yield self.session.wallet.resolve_uri(uri, check_cache=not force) - except UnknownNameError: - resolved = None - results = yield self._render_response(resolved) - defer.returnValue(results) + uris = tuple(uris) + if uri is not None: + uris += (uri,) + + results = {} + + valid_uris = tuple() + for u in uris: + try: + parse_lbry_uri(u) + valid_uris += (u, ) + except URIParseError: + results[u] = {"error": "%s is not a valid uri" % u} + + resolved = yield self.session.wallet.resolve(*valid_uris, check_cache=not force) + + for resolved_uri in resolved: + results[resolved_uri] = resolved[resolved_uri] + response = yield self._render_response(results) + defer.returnValue(response) @AuthJSONRPCServer.auth_required @defer.inlineCallbacks @@ -1462,9 +1466,13 @@ class Daemon(AuthJSONRPCServer): timeout = timeout if timeout is not None else self.download_timeout download_directory = download_directory or self.download_directory - resolved = yield self.session.wallet.resolve_uri(uri) + resolved_result = yield self.session.wallet.resolve(uri) + if resolved_result and uri in resolved_result: + resolved = resolved_result[uri] + else: + resolved = None - if 'value' not in resolved: + if not resolved or 'value' not in resolved: if 'claim' not in resolved: raise Exception("Nothing to download") else: @@ -1932,7 +1940,7 @@ class Daemon(AuthJSONRPCServer): @defer.inlineCallbacks def jsonrpc_claim_list(self, name): """ - Get claims for a name + List current claims and information about them for a given name Usage: claim_list ( | --name=) @@ -2353,12 +2361,13 @@ class Daemon(AuthJSONRPCServer): sd_blob.close_read_handle(sd_blob_file) return decoded_sd_blob - try: - resolved = yield self.session.wallet.resolve_uri(uri) - except Exception: + resolved_result = yield self.session.wallet.resolve(uri) + if resolved_result and uri in resolved_result: + resolved = resolved_result[uri] + else: defer.returnValue(None) - if resolved and 'claim' in resolved: + if 'claim' in resolved: metadata = resolved['claim']['value'] else: defer.returnValue(None) @@ -2416,58 +2425,6 @@ class Daemon(AuthJSONRPCServer): defer.returnValue(response) -class _ResolveNameHelper(object): - def __init__(self, daemon, name, force_refresh): - self.daemon = daemon - self.name = name - self.force_refresh = force_refresh - - def get_deferred(self): - if self.need_fresh_stream(): - log.info("Resolving stream info for lbry://%s", self.name) - d = self.wallet.get_claim_by_name(self.name) - d.addCallback(self._cache_stream_info) - else: - log.debug("Returning cached stream info for lbry://%s", self.name) - d = defer.succeed(self.name_data['claim_metadata']) - return d - - @property - def name_data(self): - return self.daemon.name_cache[self.name] - - @property - def wallet(self): - return self.daemon.session.wallet - - def now(self): - return self.daemon._get_long_count_timestamp() - - def _add_txid(self, txid): - self.name_data['txid'] = txid - return defer.succeed(None) - - def _cache_stream_info(self, stream_info): - self.daemon.name_cache[self.name] = { - 'claim_metadata': stream_info['value'], - 'timestamp': self.now() - } - d = self._add_txid(stream_info['txid']) - d.addCallback(lambda _: self.daemon._update_claim_cache()) - d.addCallback(lambda _: self.name_data['claim_metadata']) - return d - - def need_fresh_stream(self): - return self.force_refresh or not self.is_in_cache() or self.is_cached_name_expired() - - def is_in_cache(self): - return self.name in self.daemon.name_cache - - def is_cached_name_expired(self): - time_in_cache = self.now() - self.name_data['timestamp'] - return time_in_cache >= self.daemon.cache_time - - def loggly_time_string(dt): formatted_dt = dt.strftime("%Y-%m-%dT%H:%M:%S") milliseconds = str(round(dt.microsecond * (10.0 ** -5), 3)) diff --git a/tests/unit/lbrynet_daemon/test_Daemon.py b/tests/unit/lbrynet_daemon/test_Daemon.py index c3a3c6cfd..3e4a741b5 100644 --- a/tests/unit/lbrynet_daemon/test_Daemon.py +++ b/tests/unit/lbrynet_daemon/test_Daemon.py @@ -33,6 +33,7 @@ def get_test_daemon(data_rate=None, generous=True, with_fee=False): prm = PaymentRateManager.NegotiatedPaymentRateManager(base_prm, DummyBlobAvailabilityTracker(), generous=generous) daemon.session.payment_rate_manager = prm + metadata = { "author": "fake author", "language": "en", @@ -53,7 +54,7 @@ def get_test_daemon(data_rate=None, generous=True, with_fee=False): {"fee": {"USD": {"address": "bQ6BGboPV2SpTMEP7wLNiAcnsZiH8ye6eA", "amount": 0.75}}}) daemon._resolve_name = lambda _: defer.succeed(metadata) migrated = smart_decode(json.dumps(metadata)) - daemon.session.wallet.resolve_uri = lambda _: defer.succeed({'claim': {'value': migrated.claim_dict}}) + daemon.session.wallet.resolve = lambda *_: defer.succeed({"test": {'claim': {'value': migrated.claim_dict}}}) return daemon