Merge pull request #643 from lbryio/batched-resolve

Batched uri resolution
This commit is contained in:
Jack Robison 2017-06-09 16:01:57 -04:00 committed by GitHub
commit 9a3c6ea18b
7 changed files with 243 additions and 169 deletions

View file

@ -13,8 +13,9 @@ at anytime.
* *
### Changed ### Changed
* * Support resolution of multiple uris with `resolve`, all results are keyed by uri
* * Add `error` responses for failed resolves
* Add `claim_list_by_channel`, supports multiple channel resolution
### Fixed ### Fixed
* Race condition from improper initialization and shutdown of the blob manager database * Race condition from improper initialization and shutdown of the blob manager database
@ -25,7 +26,7 @@ at anytime.
* *
### Removed ### Removed
* * Remove `claims_in_channel` from `resolve` response
* *
## [0.11.0] - 2017-06-09 ## [0.11.0] - 2017-06-09

View file

@ -39,6 +39,18 @@ class UnknownNameError(Exception):
self.name = name self.name = name
class UnknownClaimID(Exception):
def __init__(self, claim_id):
Exception.__init__(self, 'Claim {} is unknown'.format(claim_id))
self.claim_id = claim_id
class UnknownURI(Exception):
def __init__(self, uri):
Exception.__init__(self, 'URI {} cannot be resolved'.format(uri))
self.name = uri
class InvalidName(Exception): class InvalidName(Exception):
def __init__(self, name, invalid_characters): def __init__(self, name, invalid_characters):
self.name = name self.name = name

View file

@ -26,6 +26,7 @@ from lbrynet.core.sqlite_helpers import rerun_if_locked
from lbrynet.interfaces import IRequestCreator, IQueryHandlerFactory, IQueryHandler, IWallet from lbrynet.interfaces import IRequestCreator, IQueryHandlerFactory, IQueryHandler, IWallet
from lbrynet.core.client.ClientRequest import ClientRequest from lbrynet.core.client.ClientRequest import ClientRequest
from lbrynet.core.Error import RequestCanceledError, InsufficientFundsError, UnknownNameError from lbrynet.core.Error import RequestCanceledError, InsufficientFundsError, UnknownNameError
from lbrynet.core.Error import UnknownClaimID, UnknownURI
log = logging.getLogger(__name__) log = logging.getLogger(__name__)
@ -683,18 +684,6 @@ class Wallet(object):
break break
defer.returnValue(my_claim) defer.returnValue(my_claim)
@defer.inlineCallbacks
def get_claim_info(self, name, txid=None, nout=None, claim_id=None, check_expire=True):
if claim_id is not None:
results = yield self.get_claim(claim_id, check_expire)
if results['name'] != name:
raise Exception("Name does not match claim referenced by id")
elif txid is None or nout is None:
results = yield self.get_claim_by_name(name)
else:
results = yield self.get_claim_by_outpoint(ClaimOutpoint(txid, nout), check_expire)
defer.returnValue(results)
@defer.inlineCallbacks @defer.inlineCallbacks
def _handle_claim_result(self, results, update_caches=True): def _handle_claim_result(self, results, update_caches=True):
if not results: if not results:
@ -702,8 +691,12 @@ class Wallet(object):
if 'error' in results: if 'error' in results:
if results['error'] in ['name is not claimed', 'claim not found']: if results['error'] in ['name is not claimed', 'claim not found']:
raise UnknownNameError(results['error']) if 'claim_id' in results:
else: raise UnknownClaimID(results['claim_id'])
elif 'name' in results:
raise UnknownNameError(results['name'])
elif 'uri' in results:
raise UnknownURI(results['uri'])
raise Exception(results['error']) raise Exception(results['error'])
if 'certificate' in results: if 'certificate' in results:
@ -798,27 +791,39 @@ class Wallet(object):
defer.returnValue(results) defer.returnValue(results)
@defer.inlineCallbacks @defer.inlineCallbacks
def resolve_uri(self, uri, check_cache=True): def resolve(self, *uris, **kwargs):
check_cache = kwargs.get('check_cache', True)
page = kwargs.get('page', 0)
page_size = kwargs.get('page_size', 10)
result = {}
needed = []
for uri in uris:
cached_claim = None cached_claim = None
if check_cache: if check_cache:
cached_claim = yield self._storage.get_cached_claim_for_uri(uri, check_cache) cached_claim = yield self._storage.get_cached_claim_for_uri(uri, check_cache)
if cached_claim: if cached_claim:
log.debug("Using cached results for %s", uri) log.debug("Using cached results for %s", uri)
resolve_results = cached_claim result[uri] = yield self._handle_claim_result(cached_claim, update_caches=False)
else: else:
log.info("Resolving %s", uri) log.info("Resolving %s", uri)
resolve_results = yield self._get_value_for_uri(uri) needed.append(uri)
batch_results = yield self._get_values_for_uris(page, page_size, *uris)
for uri, resolve_results in batch_results.iteritems():
claim_id = None claim_id = None
if resolve_results and 'claim' in resolve_results: if resolve_results and 'claim' in resolve_results:
claim_id = resolve_results['claim']['claim_id'] claim_id = resolve_results['claim']['claim_id']
certificate_id = None certificate_id = None
if resolve_results and 'certificate' in resolve_results: if resolve_results and 'certificate' in resolve_results:
certificate_id = resolve_results['certificate']['claim_id'] certificate_id = resolve_results['certificate']['claim_id']
try:
result = yield self._handle_claim_result(resolve_results, cached_claim is None) result[uri] = yield self._handle_claim_result(resolve_results, update_caches=True)
if claim_id: if claim_id:
yield self._storage.save_claim_to_uri_cache(uri, claim_id, certificate_id) yield self._storage.save_claim_to_uri_cache(uri, claim_id, certificate_id)
except (UnknownNameError, UnknownClaimID, UnknownURI) as err:
result[uri] = {'error': err.message}
defer.returnValue(result) defer.returnValue(result)
@ -832,7 +837,10 @@ class Wallet(object):
cached_claim = None cached_claim = None
if not cached_claim: if not cached_claim:
claim = yield self._get_claim_by_outpoint(txid, nout) claim = yield self._get_claim_by_outpoint(txid, nout)
try:
result = yield self._handle_claim_result(claim) result = yield self._handle_claim_result(claim)
except (UnknownNameError, UnknownClaimID, UnknownURI) as err:
result = {'error': err.message}
else: else:
result = cached_claim result = cached_claim
defer.returnValue(result) defer.returnValue(result)
@ -1091,6 +1099,9 @@ class Wallet(object):
def _get_claim_by_claimid(self, claim_id): def _get_claim_by_claimid(self, claim_id):
return defer.fail(NotImplementedError()) return defer.fail(NotImplementedError())
def _get_values_for_uris(self, page, page_size, *uris):
return defer.fail(NotImplementedError())
def _start(self): def _start(self):
pass pass
@ -1378,6 +1389,10 @@ class LBRYumWallet(Wallet):
raise Exception("No uri given") raise Exception("No uri given")
return self._run_cmd_as_defer_to_thread('getvalueforuri', uri) return self._run_cmd_as_defer_to_thread('getvalueforuri', uri)
def _get_values_for_uris(self, page, page_size, *uris):
return self._run_cmd_as_defer_to_thread('getvaluesforuris', False, page, page_size,
*uris)
def _claim_certificate(self, name, amount): def _claim_certificate(self, name, amount):
return self._run_cmd_as_defer_succeed('claimcertificate', name, amount) return self._run_cmd_as_defer_succeed('claimcertificate', name, amount)

View file

@ -46,7 +46,7 @@ from lbrynet.core.looping_call_manager import LoopingCallManager
from lbrynet.core.server.BlobRequestHandler import BlobRequestHandlerFactory from lbrynet.core.server.BlobRequestHandler import BlobRequestHandlerFactory
from lbrynet.core.server.ServerProtocol import ServerProtocolFactory from lbrynet.core.server.ServerProtocol import ServerProtocolFactory
from lbrynet.core.Error import InsufficientFundsError, UnknownNameError, NoSuchSDHash from lbrynet.core.Error import InsufficientFundsError, UnknownNameError, NoSuchSDHash
from lbrynet.core.Error import NoSuchStreamHash from lbrynet.core.Error import NoSuchStreamHash, UnknownClaimID, UnknownURI
log = logging.getLogger(__name__) log = logging.getLogger(__name__)
@ -727,6 +727,7 @@ class Daemon(AuthJSONRPCServer):
f.close() f.close()
return defer.succeed(True) return defer.succeed(True)
@defer.inlineCallbacks
def _resolve_name(self, name, force_refresh=False): def _resolve_name(self, name, force_refresh=False):
"""Resolves a name. Checks the cache first before going out to the blockchain. """Resolves a name. Checks the cache first before going out to the blockchain.
@ -734,10 +735,12 @@ class Daemon(AuthJSONRPCServer):
name: the lbry://<name> to resolve name: the lbry://<name> to resolve
force_refresh: if True, always go out to the blockchain to resolve. force_refresh: if True, always go out to the blockchain to resolve.
""" """
if name.startswith('lbry://'):
raise ValueError('name {} should not start with lbry://'.format(name)) parsed = parse_lbry_uri(name)
helper = _ResolveNameHelper(self, name, force_refresh) resolution = yield self.session.wallet.resolve(parsed.name, check_cache=not force_refresh)
return helper.get_deferred() if parsed.name in resolution:
result = resolution[parsed.name]
defer.returnValue(result)
def _get_or_download_sd_blob(self, blob, sd_hash): def _get_or_download_sd_blob(self, blob, sd_hash):
if blob: if blob:
@ -789,9 +792,10 @@ class Daemon(AuthJSONRPCServer):
cost = self._get_est_cost_from_stream_size(size) cost = self._get_est_cost_from_stream_size(size)
resolved = yield self.session.wallet.resolve_uri(uri) resolved = yield self.session.wallet.resolve(uri)
if 'claim' in resolved:
claim = ClaimDict.load_dict(resolved['claim']['value']) if uri in resolved and 'claim' in resolved[uri]:
claim = ClaimDict.load_dict(resolved[uri]['claim']['value'])
final_fee = self._add_key_fee_to_est_data_cost(claim.source_fee, cost) final_fee = self._add_key_fee_to_est_data_cost(claim.source_fee, cost)
result = yield self._render_response(final_fee) result = yield self._render_response(final_fee)
defer.returnValue(result) defer.returnValue(result)
@ -834,10 +838,11 @@ class Daemon(AuthJSONRPCServer):
""" """
Resolve a name and return the estimated stream cost Resolve a name and return the estimated stream cost
""" """
try:
claim_response = yield self.session.wallet.resolve_uri(uri) resolved = yield self.session.wallet.resolve(uri)
# TODO: fix me, this is a hack if resolved:
except Exception: claim_response = resolved[uri]
else:
claim_response = None claim_response = None
result = None result = None
@ -1324,9 +1329,11 @@ class Daemon(AuthJSONRPCServer):
outpoint = ClaimOutpoint(txid, nout) outpoint = ClaimOutpoint(txid, nout)
claim_results = yield self.session.wallet.get_claim_by_outpoint(outpoint) claim_results = yield self.session.wallet.get_claim_by_outpoint(outpoint)
else: else:
claim_results = yield self.session.wallet.get_claim_by_name(name) claim_results = yield self.session.wallet.resolve(name)
if claim_results:
claim_results = claim_results[name]
result = format_json_out_amount_as_float(claim_results) result = format_json_out_amount_as_float(claim_results)
except (TypeError, UnknownNameError): except (TypeError, UnknownNameError, UnknownClaimID, UnknownURI):
result = False result = False
response = yield self._render_response(result) response = yield self._render_response(result)
defer.returnValue(response) defer.returnValue(response)
@ -1334,19 +1341,23 @@ class Daemon(AuthJSONRPCServer):
@AuthJSONRPCServer.auth_required @AuthJSONRPCServer.auth_required
@defer.inlineCallbacks @defer.inlineCallbacks
@AuthJSONRPCServer.flags(force='-f') @AuthJSONRPCServer.flags(force='-f')
def jsonrpc_resolve(self, uri, force=False): def jsonrpc_resolve(self, force=False, uri=None, uris=[]):
""" """
Resolve a LBRY URI Resolve given LBRY URIs
Usage: Usage:
resolve <uri> [-f] resolve [-f] (<uri> | --uri=<uri>) [<uris>...]
Options: Options:
-f : force refresh and ignore cache -f : force refresh and ignore cache
Returns: Returns:
None if nothing can be resolved, otherwise: Dictionary of results, keyed by uri
If uri resolves to a channel or a claim in a channel: '<uri>': {
If a resolution error occurs:
'error': Error message
If the uri resolves to a channel or a claim in a channel:
'certificate': { 'certificate': {
'address': (str) claim address, 'address': (str) claim address,
'amount': (float) claim amount, 'amount': (float) claim amount,
@ -1366,29 +1377,8 @@ class Daemon(AuthJSONRPCServer):
'signature_is_valid': (bool), included if has_signature, 'signature_is_valid': (bool), included if has_signature,
'value': ClaimDict if decoded, otherwise hex string 'value': ClaimDict if decoded, otherwise hex string
} }
If uri resolves to a channel:
'claims_in_channel': [ If the uri resolves to a claim:
{
'address': (str) claim address,
'amount': (float) claim amount,
'effective_amount': (float) claim amount including supports,
'claim_id': (str) claim id,
'claim_sequence': (int) claim sequence number,
'decoded_claim': (bool) whether or not the claim value was decoded,
'height': (int) claim height,
'depth': (int) claim depth,
'has_signature': (bool) included if decoded_claim
'name': (str) claim name,
'supports: (list) list of supports [{'txid': txid,
'nout': nout,
'amount': amount}],
'txid': (str) claim txid,
'nout': (str) claim nout,
'signature_is_valid': (bool), included if has_signature,
'value': ClaimDict if decoded, otherwise hex string
}
]
If uri resolves to a claim:
'claim': { 'claim': {
'address': (str) claim address, 'address': (str) claim address,
'amount': (float) claim amount, 'amount': (float) claim amount,
@ -1412,12 +1402,26 @@ class Daemon(AuthJSONRPCServer):
} }
""" """
uris = tuple(uris)
if uri is not None:
uris += (uri,)
results = {}
valid_uris = tuple()
for u in uris:
try: try:
resolved = yield self.session.wallet.resolve_uri(uri, check_cache=not force) parse_lbry_uri(u)
except UnknownNameError: valid_uris += (u, )
resolved = None except URIParseError:
results = yield self._render_response(resolved) results[u] = {"error": "%s is not a valid uri" % u}
defer.returnValue(results)
resolved = yield self.session.wallet.resolve(*valid_uris, check_cache=not force)
for resolved_uri in resolved:
results[resolved_uri] = resolved[resolved_uri]
response = yield self._render_response(results)
defer.returnValue(response)
@AuthJSONRPCServer.auth_required @AuthJSONRPCServer.auth_required
@defer.inlineCallbacks @defer.inlineCallbacks
@ -1462,9 +1466,13 @@ class Daemon(AuthJSONRPCServer):
timeout = timeout if timeout is not None else self.download_timeout timeout = timeout if timeout is not None else self.download_timeout
download_directory = download_directory or self.download_directory download_directory = download_directory or self.download_directory
resolved = yield self.session.wallet.resolve_uri(uri) resolved_result = yield self.session.wallet.resolve(uri)
if resolved_result and uri in resolved_result:
resolved = resolved_result[uri]
else:
resolved = None
if 'value' not in resolved: if not resolved or 'value' not in resolved:
if 'claim' not in resolved: if 'claim' not in resolved:
raise Exception("Nothing to download") raise Exception("Nothing to download")
else: else:
@ -1932,7 +1940,7 @@ class Daemon(AuthJSONRPCServer):
@defer.inlineCallbacks @defer.inlineCallbacks
def jsonrpc_claim_list(self, name): def jsonrpc_claim_list(self, name):
""" """
Get claims for a name List current claims and information about them for a given name
Usage: Usage:
claim_list (<name> | --name=<name>) claim_list (<name> | --name=<name>)
@ -1962,6 +1970,94 @@ class Daemon(AuthJSONRPCServer):
claims = yield self.session.wallet.get_claims_for_name(name) claims = yield self.session.wallet.get_claims_for_name(name)
defer.returnValue(claims) defer.returnValue(claims)
@AuthJSONRPCServer.auth_required
@defer.inlineCallbacks
def jsonrpc_claim_list_by_channel(self, page=0, page_size=10, uri=None, uris=[]):
"""
Get paginated claims in a channel specified by a channel uri
Usage:
claim_list_by_channel (<uri> | --uri=<uri>) [<uris>...] [--page=<page>]
[--page_size=<page_size>]
Options:
--page=<page> : which page of results to return where page 1 is the first
page, defaults to no pages
--page_size=<page_size> : number of results in a page, default of 10
Returns:
{
resolved channel uri: {
If there was an error:
'error': (str) error message
'claims_in_channel_pages': total number of pages with <page_size> results,
If a page of results was requested:
'returned_page': page number returned,
'claims_in_channel': [
{
'absolute_channel_position': (int) claim index number in sorted list of
claims which assert to be part of the
channel
'address': (str) claim address,
'amount': (float) claim amount,
'effective_amount': (float) claim amount including supports,
'claim_id': (str) claim id,
'claim_sequence': (int) claim sequence number,
'decoded_claim': (bool) whether or not the claim value was decoded,
'height': (int) claim height,
'depth': (int) claim depth,
'has_signature': (bool) included if decoded_claim
'name': (str) claim name,
'supports: (list) list of supports [{'txid': txid,
'nout': nout,
'amount': amount}],
'txid': (str) claim txid,
'nout': (str) claim nout,
'signature_is_valid': (bool), included if has_signature,
'value': ClaimDict if decoded, otherwise hex string
}
],
}
}
"""
uris = tuple(uris)
if uri is not None:
uris += (uri, )
results = {}
valid_uris = tuple()
for chan_uri in uris:
try:
parsed = parse_lbry_uri(chan_uri)
if not parsed.is_channel:
results[chan_uri] = {"error": "%s is not a channel uri" % parsed.name}
elif parsed.path:
results[chan_uri] = {"error": "%s is a claim in a channel" % parsed.path}
else:
valid_uris += (chan_uri, )
except URIParseError:
results[chan_uri] = {"error": "%s is not a valid uri" % chan_uri}
resolved = yield self.session.wallet.resolve(*valid_uris, check_cache=False, page=page,
page_size=page_size)
for u in resolved:
if 'error' in resolved[u]:
results[u] = resolved[u]
else:
results[u] = {
'claims_in_channel_pages': resolved[u]['claims_in_channel_pages']
}
if page:
results[u]['returned_page'] = page
results[u]['claims_in_channel'] = resolved[u].get('claims_in_channel', [])
response = yield self._render_response(results)
defer.returnValue(response)
@AuthJSONRPCServer.auth_required @AuthJSONRPCServer.auth_required
def jsonrpc_transaction_list(self): def jsonrpc_transaction_list(self):
""" """
@ -2353,12 +2449,13 @@ class Daemon(AuthJSONRPCServer):
sd_blob.close_read_handle(sd_blob_file) sd_blob.close_read_handle(sd_blob_file)
return decoded_sd_blob return decoded_sd_blob
try: resolved_result = yield self.session.wallet.resolve(uri)
resolved = yield self.session.wallet.resolve_uri(uri) if resolved_result and uri in resolved_result:
except Exception: resolved = resolved_result[uri]
else:
defer.returnValue(None) defer.returnValue(None)
if resolved and 'claim' in resolved: if 'claim' in resolved:
metadata = resolved['claim']['value'] metadata = resolved['claim']['value']
else: else:
defer.returnValue(None) defer.returnValue(None)
@ -2416,58 +2513,6 @@ class Daemon(AuthJSONRPCServer):
defer.returnValue(response) defer.returnValue(response)
class _ResolveNameHelper(object):
def __init__(self, daemon, name, force_refresh):
self.daemon = daemon
self.name = name
self.force_refresh = force_refresh
def get_deferred(self):
if self.need_fresh_stream():
log.info("Resolving stream info for lbry://%s", self.name)
d = self.wallet.get_claim_by_name(self.name)
d.addCallback(self._cache_stream_info)
else:
log.debug("Returning cached stream info for lbry://%s", self.name)
d = defer.succeed(self.name_data['claim_metadata'])
return d
@property
def name_data(self):
return self.daemon.name_cache[self.name]
@property
def wallet(self):
return self.daemon.session.wallet
def now(self):
return self.daemon._get_long_count_timestamp()
def _add_txid(self, txid):
self.name_data['txid'] = txid
return defer.succeed(None)
def _cache_stream_info(self, stream_info):
self.daemon.name_cache[self.name] = {
'claim_metadata': stream_info['value'],
'timestamp': self.now()
}
d = self._add_txid(stream_info['txid'])
d.addCallback(lambda _: self.daemon._update_claim_cache())
d.addCallback(lambda _: self.name_data['claim_metadata'])
return d
def need_fresh_stream(self):
return self.force_refresh or not self.is_in_cache() or self.is_cached_name_expired()
def is_in_cache(self):
return self.name in self.daemon.name_cache
def is_cached_name_expired(self):
time_in_cache = self.now() - self.name_data['timestamp']
return time_in_cache >= self.daemon.cache_time
def loggly_time_string(dt): def loggly_time_string(dt):
formatted_dt = dt.strftime("%Y-%m-%dT%H:%M:%S") formatted_dt = dt.strftime("%Y-%m-%dT%H:%M:%S")
milliseconds = str(round(dt.microsecond * (10.0 ** -5), 3)) milliseconds = str(round(dt.microsecond * (10.0 ** -5), 3))

View file

@ -14,7 +14,7 @@ jsonrpc==1.2
jsonrpclib==0.1.7 jsonrpclib==0.1.7
jsonschema==2.5.1 jsonschema==2.5.1
git+https://github.com/lbryio/lbryschema.git@v0.0.7#egg=lbryschema git+https://github.com/lbryio/lbryschema.git@v0.0.7#egg=lbryschema
git+https://github.com/lbryio/lbryum.git@v2.7.25#egg=lbryum git+https://github.com/lbryio/lbryum.git@v2.8.0#egg=lbryum
miniupnpc==1.9 miniupnpc==1.9
pbkdf2==1.3 pbkdf2==1.3
pycrypto==2.6.1 pycrypto==2.6.1

View file

@ -21,7 +21,7 @@ requires = [
'envparse', 'envparse',
'jsonrpc', 'jsonrpc',
'jsonschema', 'jsonschema',
'lbryum==2.7.25', 'lbryum==2.8.0',
'lbryschema==0.0.7', 'lbryschema==0.0.7',
'miniupnpc', 'miniupnpc',
'pycrypto', 'pycrypto',

View file

@ -33,6 +33,7 @@ def get_test_daemon(data_rate=None, generous=True, with_fee=False):
prm = PaymentRateManager.NegotiatedPaymentRateManager(base_prm, DummyBlobAvailabilityTracker(), prm = PaymentRateManager.NegotiatedPaymentRateManager(base_prm, DummyBlobAvailabilityTracker(),
generous=generous) generous=generous)
daemon.session.payment_rate_manager = prm daemon.session.payment_rate_manager = prm
metadata = { metadata = {
"author": "fake author", "author": "fake author",
"language": "en", "language": "en",
@ -53,7 +54,7 @@ def get_test_daemon(data_rate=None, generous=True, with_fee=False):
{"fee": {"USD": {"address": "bQ6BGboPV2SpTMEP7wLNiAcnsZiH8ye6eA", "amount": 0.75}}}) {"fee": {"USD": {"address": "bQ6BGboPV2SpTMEP7wLNiAcnsZiH8ye6eA", "amount": 0.75}}})
daemon._resolve_name = lambda _: defer.succeed(metadata) daemon._resolve_name = lambda _: defer.succeed(metadata)
migrated = smart_decode(json.dumps(metadata)) migrated = smart_decode(json.dumps(metadata))
daemon.session.wallet.resolve_uri = lambda _: defer.succeed({'claim': {'value': migrated.claim_dict}}) daemon.session.wallet.resolve = lambda *_: defer.succeed({"test": {'claim': {'value': migrated.claim_dict}}})
return daemon return daemon