multi resolve

This commit is contained in:
Jack Robison 2017-06-09 13:47:13 -04:00
parent 2cfd29564b
commit ea0c6eca32
4 changed files with 126 additions and 161 deletions

View file

@ -13,8 +13,8 @@ at anytime.
*
### Changed
*
*
* Support resolution of multiple uris with `resolve`, all results are keyed by uri
* Add `error` responses for failed resolves
### Fixed
* Race condition from improper initialization and shutdown of the blob manager database
@ -25,7 +25,7 @@ at anytime.
*
### Removed
*
* Remove `claims_in_channel` from `resolve` response
*
## [0.11.0] - 2017-06-09

View file

@ -684,18 +684,6 @@ class Wallet(object):
break
defer.returnValue(my_claim)
@defer.inlineCallbacks
def get_claim_info(self, name, txid=None, nout=None, claim_id=None, check_expire=True):
if claim_id is not None:
results = yield self.get_claim(claim_id, check_expire)
if results['name'] != name:
raise Exception("Name does not match claim referenced by id")
elif txid is None or nout is None:
results = yield self.get_claim_by_name(name)
else:
results = yield self.get_claim_by_outpoint(ClaimOutpoint(txid, nout), check_expire)
defer.returnValue(results)
@defer.inlineCallbacks
def _handle_claim_result(self, results, update_caches=True):
if not results:
@ -803,27 +791,39 @@ class Wallet(object):
defer.returnValue(results)
@defer.inlineCallbacks
def resolve_uri(self, uri, check_cache=True):
cached_claim = None
if check_cache:
cached_claim = yield self._storage.get_cached_claim_for_uri(uri, check_cache)
if cached_claim:
log.debug("Using cached results for %s", uri)
resolve_results = cached_claim
else:
log.info("Resolving %s", uri)
resolve_results = yield self._get_value_for_uri(uri)
def resolve(self, *uris, **kwargs):
check_cache = kwargs.get('check_cache', True)
page = kwargs.get('page', 0)
page_size = kwargs.get('page_size', 10)
claim_id = None
if resolve_results and 'claim' in resolve_results:
claim_id = resolve_results['claim']['claim_id']
certificate_id = None
if resolve_results and 'certificate' in resolve_results:
certificate_id = resolve_results['certificate']['claim_id']
result = {}
needed = []
for uri in uris:
cached_claim = None
if check_cache:
cached_claim = yield self._storage.get_cached_claim_for_uri(uri, check_cache)
if cached_claim:
log.debug("Using cached results for %s", uri)
result[uri] = yield self._handle_claim_result(cached_claim, update_caches=False)
else:
log.info("Resolving %s", uri)
needed.append(uri)
result = yield self._handle_claim_result(resolve_results, cached_claim is None)
if claim_id:
yield self._storage.save_claim_to_uri_cache(uri, claim_id, certificate_id)
batch_results = yield self._get_values_for_uris(page, page_size, *uris)
for uri, resolve_results in batch_results.iteritems():
claim_id = None
if resolve_results and 'claim' in resolve_results:
claim_id = resolve_results['claim']['claim_id']
certificate_id = None
if resolve_results and 'certificate' in resolve_results:
certificate_id = resolve_results['certificate']['claim_id']
try:
result[uri] = yield self._handle_claim_result(resolve_results, update_caches=True)
if claim_id:
yield self._storage.save_claim_to_uri_cache(uri, claim_id, certificate_id)
except (UnknownNameError, UnknownClaimID, UnknownURI) as err:
result[uri] = {'error': err.message}
defer.returnValue(result)
@ -1099,6 +1099,9 @@ class Wallet(object):
def _get_claim_by_claimid(self, claim_id):
return defer.fail(NotImplementedError())
def _get_values_for_uris(self, page, page_size, *uris):
return defer.fail(NotImplementedError())
def _start(self):
pass
@ -1386,6 +1389,10 @@ class LBRYumWallet(Wallet):
raise Exception("No uri given")
return self._run_cmd_as_defer_to_thread('getvalueforuri', uri)
def _get_values_for_uris(self, page, page_size, *uris):
return self._run_cmd_as_defer_to_thread('getvaluesforuris', False, page, page_size,
*uris)
def _claim_certificate(self, name, amount):
return self._run_cmd_as_defer_succeed('claimcertificate', name, amount)

View file

@ -727,6 +727,7 @@ class Daemon(AuthJSONRPCServer):
f.close()
return defer.succeed(True)
@defer.inlineCallbacks
def _resolve_name(self, name, force_refresh=False):
"""Resolves a name. Checks the cache first before going out to the blockchain.
@ -734,10 +735,12 @@ class Daemon(AuthJSONRPCServer):
name: the lbry://<name> to resolve
force_refresh: if True, always go out to the blockchain to resolve.
"""
if name.startswith('lbry://'):
raise ValueError('name {} should not start with lbry://'.format(name))
helper = _ResolveNameHelper(self, name, force_refresh)
return helper.get_deferred()
parsed = parse_lbry_uri(name)
resolution = yield self.session.wallet.resolve(parsed.name, check_cache=not force_refresh)
if parsed.name in resolution:
result = resolution[parsed.name]
defer.returnValue(result)
def _get_or_download_sd_blob(self, blob, sd_hash):
if blob:
@ -789,9 +792,10 @@ class Daemon(AuthJSONRPCServer):
cost = self._get_est_cost_from_stream_size(size)
resolved = yield self.session.wallet.resolve_uri(uri)
if 'claim' in resolved:
claim = ClaimDict.load_dict(resolved['claim']['value'])
resolved = yield self.session.wallet.resolve(uri)
if uri in resolved and 'claim' in resolved[uri]:
claim = ClaimDict.load_dict(resolved[uri]['claim']['value'])
final_fee = self._add_key_fee_to_est_data_cost(claim.source_fee, cost)
result = yield self._render_response(final_fee)
defer.returnValue(result)
@ -834,10 +838,11 @@ class Daemon(AuthJSONRPCServer):
"""
Resolve a name and return the estimated stream cost
"""
try:
claim_response = yield self.session.wallet.resolve_uri(uri)
# TODO: fix me, this is a hack
except Exception:
resolved = yield self.session.wallet.resolve(uri)
if resolved:
claim_response = resolved[uri]
else:
claim_response = None
result = None
@ -1324,7 +1329,9 @@ class Daemon(AuthJSONRPCServer):
outpoint = ClaimOutpoint(txid, nout)
claim_results = yield self.session.wallet.get_claim_by_outpoint(outpoint)
else:
claim_results = yield self.session.wallet.get_claim_by_name(name)
claim_results = yield self.session.wallet.resolve(name)
if claim_results:
claim_results = claim_results[name]
result = format_json_out_amount_as_float(claim_results)
except (TypeError, UnknownNameError, UnknownClaimID, UnknownURI):
result = False
@ -1334,41 +1341,24 @@ class Daemon(AuthJSONRPCServer):
@AuthJSONRPCServer.auth_required
@defer.inlineCallbacks
@AuthJSONRPCServer.flags(force='-f')
def jsonrpc_resolve(self, uri, force=False):
def jsonrpc_resolve(self, force=False, uri=None, uris=[]):
"""
Resolve a LBRY URI
Resolve given LBRY URIs
Usage:
resolve <uri> [-f]
resolve [-f] (<uri> | --uri=<uri>) [<uris>...]
Options:
-f : force refresh and ignore cache
Returns:
None if nothing can be resolved, otherwise:
If uri resolves to a channel or a claim in a channel:
'certificate': {
'address': (str) claim address,
'amount': (float) claim amount,
'effective_amount': (float) claim amount including supports,
'claim_id': (str) claim id,
'claim_sequence': (int) claim sequence number,
'decoded_claim': (bool) whether or not the claim value was decoded,
'height': (int) claim height,
'depth': (int) claim depth,
'has_signature': (bool) included if decoded_claim
'name': (str) claim name,
'supports: (list) list of supports [{'txid': txid,
'nout': nout,
'amount': amount}],
'txid': (str) claim txid,
'nout': (str) claim nout,
'signature_is_valid': (bool), included if has_signature,
'value': ClaimDict if decoded, otherwise hex string
}
If uri resolves to a channel:
'claims_in_channel': [
{
Dictionary of results, keyed by uri
'<uri>': {
If a resolution error occurs:
'error': Error message
If the uri resolves to a channel or a claim in a channel:
'certificate': {
'address': (str) claim address,
'amount': (float) claim amount,
'effective_amount': (float) claim amount including supports,
@ -1387,37 +1377,51 @@ class Daemon(AuthJSONRPCServer):
'signature_is_valid': (bool), included if has_signature,
'value': ClaimDict if decoded, otherwise hex string
}
]
If uri resolves to a claim:
'claim': {
'address': (str) claim address,
'amount': (float) claim amount,
'effective_amount': (float) claim amount including supports,
'claim_id': (str) claim id,
'claim_sequence': (int) claim sequence number,
'decoded_claim': (bool) whether or not the claim value was decoded,
'height': (int) claim height,
'depth': (int) claim depth,
'has_signature': (bool) included if decoded_claim
'name': (str) claim name,
'channel_name': (str) channel name if claim is in a channel
'supports: (list) list of supports [{'txid': txid,
'nout': nout,
'amount': amount}]
'txid': (str) claim txid,
'nout': (str) claim nout,
'signature_is_valid': (bool), included if has_signature,
'value': ClaimDict if decoded, otherwise hex string
}
If the uri resolves to a claim:
'claim': {
'address': (str) claim address,
'amount': (float) claim amount,
'effective_amount': (float) claim amount including supports,
'claim_id': (str) claim id,
'claim_sequence': (int) claim sequence number,
'decoded_claim': (bool) whether or not the claim value was decoded,
'height': (int) claim height,
'depth': (int) claim depth,
'has_signature': (bool) included if decoded_claim
'name': (str) claim name,
'channel_name': (str) channel name if claim is in a channel
'supports: (list) list of supports [{'txid': txid,
'nout': nout,
'amount': amount}]
'txid': (str) claim txid,
'nout': (str) claim nout,
'signature_is_valid': (bool), included if has_signature,
'value': ClaimDict if decoded, otherwise hex string
}
}
"""
try:
resolved = yield self.session.wallet.resolve_uri(uri, check_cache=not force)
except UnknownNameError:
resolved = None
results = yield self._render_response(resolved)
defer.returnValue(results)
uris = tuple(uris)
if uri is not None:
uris += (uri,)
results = {}
valid_uris = tuple()
for u in uris:
try:
parse_lbry_uri(u)
valid_uris += (u, )
except URIParseError:
results[u] = {"error": "%s is not a valid uri" % u}
resolved = yield self.session.wallet.resolve(*valid_uris, check_cache=not force)
for resolved_uri in resolved:
results[resolved_uri] = resolved[resolved_uri]
response = yield self._render_response(results)
defer.returnValue(response)
@AuthJSONRPCServer.auth_required
@defer.inlineCallbacks
@ -1462,9 +1466,13 @@ class Daemon(AuthJSONRPCServer):
timeout = timeout if timeout is not None else self.download_timeout
download_directory = download_directory or self.download_directory
resolved = yield self.session.wallet.resolve_uri(uri)
resolved_result = yield self.session.wallet.resolve(uri)
if resolved_result and uri in resolved_result:
resolved = resolved_result[uri]
else:
resolved = None
if 'value' not in resolved:
if not resolved or 'value' not in resolved:
if 'claim' not in resolved:
raise Exception("Nothing to download")
else:
@ -1932,7 +1940,7 @@ class Daemon(AuthJSONRPCServer):
@defer.inlineCallbacks
def jsonrpc_claim_list(self, name):
"""
Get claims for a name
List current claims and information about them for a given name
Usage:
claim_list (<name> | --name=<name>)
@ -2353,12 +2361,13 @@ class Daemon(AuthJSONRPCServer):
sd_blob.close_read_handle(sd_blob_file)
return decoded_sd_blob
try:
resolved = yield self.session.wallet.resolve_uri(uri)
except Exception:
resolved_result = yield self.session.wallet.resolve(uri)
if resolved_result and uri in resolved_result:
resolved = resolved_result[uri]
else:
defer.returnValue(None)
if resolved and 'claim' in resolved:
if 'claim' in resolved:
metadata = resolved['claim']['value']
else:
defer.returnValue(None)
@ -2416,58 +2425,6 @@ class Daemon(AuthJSONRPCServer):
defer.returnValue(response)
class _ResolveNameHelper(object):
def __init__(self, daemon, name, force_refresh):
self.daemon = daemon
self.name = name
self.force_refresh = force_refresh
def get_deferred(self):
if self.need_fresh_stream():
log.info("Resolving stream info for lbry://%s", self.name)
d = self.wallet.get_claim_by_name(self.name)
d.addCallback(self._cache_stream_info)
else:
log.debug("Returning cached stream info for lbry://%s", self.name)
d = defer.succeed(self.name_data['claim_metadata'])
return d
@property
def name_data(self):
return self.daemon.name_cache[self.name]
@property
def wallet(self):
return self.daemon.session.wallet
def now(self):
return self.daemon._get_long_count_timestamp()
def _add_txid(self, txid):
self.name_data['txid'] = txid
return defer.succeed(None)
def _cache_stream_info(self, stream_info):
self.daemon.name_cache[self.name] = {
'claim_metadata': stream_info['value'],
'timestamp': self.now()
}
d = self._add_txid(stream_info['txid'])
d.addCallback(lambda _: self.daemon._update_claim_cache())
d.addCallback(lambda _: self.name_data['claim_metadata'])
return d
def need_fresh_stream(self):
return self.force_refresh or not self.is_in_cache() or self.is_cached_name_expired()
def is_in_cache(self):
return self.name in self.daemon.name_cache
def is_cached_name_expired(self):
time_in_cache = self.now() - self.name_data['timestamp']
return time_in_cache >= self.daemon.cache_time
def loggly_time_string(dt):
formatted_dt = dt.strftime("%Y-%m-%dT%H:%M:%S")
milliseconds = str(round(dt.microsecond * (10.0 ** -5), 3))

View file

@ -33,6 +33,7 @@ def get_test_daemon(data_rate=None, generous=True, with_fee=False):
prm = PaymentRateManager.NegotiatedPaymentRateManager(base_prm, DummyBlobAvailabilityTracker(),
generous=generous)
daemon.session.payment_rate_manager = prm
metadata = {
"author": "fake author",
"language": "en",
@ -53,7 +54,7 @@ def get_test_daemon(data_rate=None, generous=True, with_fee=False):
{"fee": {"USD": {"address": "bQ6BGboPV2SpTMEP7wLNiAcnsZiH8ye6eA", "amount": 0.75}}})
daemon._resolve_name = lambda _: defer.succeed(metadata)
migrated = smart_decode(json.dumps(metadata))
daemon.session.wallet.resolve_uri = lambda _: defer.succeed({'claim': {'value': migrated.claim_dict}})
daemon.session.wallet.resolve = lambda *_: defer.succeed({"test": {'claim': {'value': migrated.claim_dict}}})
return daemon