insane working version of paged resolve, to be refactored

This commit is contained in:
Victor Shyba 2018-07-12 00:46:01 -03:00 committed by Jack Robison
parent e0230864ce
commit 9e8cb17ecb
No known key found for this signature in database
GPG key ID: DF25C68FE0239BB2
3 changed files with 27 additions and 18 deletions

View file

@ -152,7 +152,7 @@ class MainNetLedger(BaseLedger):
resolutions = yield self.network.get_values_for_uris(self.headers.hash(), *uris) resolutions = yield self.network.get_values_for_uris(self.headers.hash(), *uris)
resolver = Resolver(self.headers.claim_trie_root, self.headers.height, self.transaction_class, resolver = Resolver(self.headers.claim_trie_root, self.headers.height, self.transaction_class,
hash160_to_address=lambda x: self.hash160_to_address(x), network=self.network) hash160_to_address=lambda x: self.hash160_to_address(x), network=self.network)
defer.returnValue(resolver._handle_resolutions(resolutions, uris, page, page_size)) defer.returnValue((yield resolver._handle_resolutions(resolutions, uris, page, page_size)))
@defer.inlineCallbacks @defer.inlineCallbacks
def start(self): def start(self):

View file

@ -7,4 +7,4 @@ class Network(BaseNetwork):
return self.rpc('blockchain.claimtrie.getvaluesforuris', block_hash, *uris) return self.rpc('blockchain.claimtrie.getvaluesforuris', block_hash, *uris)
def get_claims_by_ids(self, *claim_ids): def get_claims_by_ids(self, *claim_ids):
return self.rpc("blockchain.claimtrie.getclaimsbyids", *claim_ids) return self.rpc('blockchain.claimtrie.getclaimsbyids', *claim_ids)

View file

@ -3,6 +3,8 @@ import logging
from ecdsa import BadSignatureError from ecdsa import BadSignatureError
from binascii import unhexlify from binascii import unhexlify
from twisted.internet import defer
from lbrynet.core.Error import UnknownNameError, UnknownClaimID, UnknownURI, UnknownOutpoint from lbrynet.core.Error import UnknownNameError, UnknownClaimID, UnknownURI, UnknownOutpoint
from lbryschema.address import is_address from lbryschema.address import is_address
from lbryschema.claim import ClaimDict from lbryschema.claim import ClaimDict
@ -23,6 +25,7 @@ class Resolver:
self.hash160_to_address = hash160_to_address self.hash160_to_address = hash160_to_address
self.network = network self.network = network
@defer.inlineCallbacks
def _handle_resolutions(self, resolutions, requested_uris, page, page_size): def _handle_resolutions(self, resolutions, requested_uris, page, page_size):
results = {} results = {}
for uri in requested_uris: for uri in requested_uris:
@ -30,12 +33,13 @@ class Resolver:
if resolution: if resolution:
try: try:
results[uri] = _handle_claim_result( results[uri] = _handle_claim_result(
self._handle_resolve_uri_response(uri, resolution, page, page_size) (yield self._handle_resolve_uri_response(uri, resolution, page, page_size))
) )
except (UnknownNameError, UnknownClaimID, UnknownURI) as err: except (UnknownNameError, UnknownClaimID, UnknownURI) as err:
results[uri] = {'error': err.message} results[uri] = {'error': err.message}
return results defer.returnValue(results)
@defer.inlineCallbacks
def _handle_resolve_uri_response(self, uri, resolution, page=0, page_size=10, raw=False): def _handle_resolve_uri_response(self, uri, resolution, page=0, page_size=10, raw=False):
result = {} result = {}
claim_trie_root = self.claim_trie_root claim_trie_root = self.claim_trie_root
@ -114,8 +118,8 @@ class Resolver:
elif 'unverified_claims_for_name' in resolution and 'certificate' in result: elif 'unverified_claims_for_name' in resolution and 'certificate' in result:
unverified_claims_for_name = resolution['unverified_claims_for_name'] unverified_claims_for_name = resolution['unverified_claims_for_name']
channel_info = self.get_channel_claims_page(unverified_claims_for_name, channel_info = yield self.get_channel_claims_page(unverified_claims_for_name,
result['certificate'], page=1) result['certificate'], page=1)
claims_in_channel, upper_bound = channel_info claims_in_channel, upper_bound = channel_info
if len(claims_in_channel) > 1: if len(claims_in_channel) > 1:
@ -128,8 +132,8 @@ class Resolver:
# parse and validate claims in a channel iteratively into pages of results # parse and validate claims in a channel iteratively into pages of results
elif 'unverified_claims_in_channel' in resolution and 'certificate' in result: elif 'unverified_claims_in_channel' in resolution and 'certificate' in result:
ids_to_check = resolution['unverified_claims_in_channel'] ids_to_check = resolution['unverified_claims_in_channel']
channel_info = self.get_channel_claims_page(ids_to_check, result['certificate'], channel_info = yield self.get_channel_claims_page(ids_to_check, result['certificate'],
page=page, page_size=page_size) page=page, page_size=page_size)
claims_in_channel, upper_bound = channel_info claims_in_channel, upper_bound = channel_info
if claims_in_channel: if claims_in_channel:
@ -139,7 +143,7 @@ class Resolver:
result['success'] = False result['success'] = False
result['uri'] = str(parsed_uri) result['uri'] = str(parsed_uri)
return result defer.returnValue(result)
def parse_and_validate_claim_result(self, claim_result, certificate=None, raw=False): def parse_and_validate_claim_result(self, claim_result, certificate=None, raw=False):
if not claim_result or 'value' not in claim_result: if not claim_result or 'value' not in claim_result:
@ -210,6 +214,7 @@ class Resolver:
abs_position += 1 abs_position += 1
return queries, names, absolute_position_index return queries, names, absolute_position_index
@defer.inlineCallbacks
def iter_channel_claims_pages(self, queries, claim_positions, claim_names, certificate, def iter_channel_claims_pages(self, queries, claim_positions, claim_names, certificate,
page_size=10): page_size=10):
# lbryum server returns a dict of {claim_id: (name, claim_height)} # lbryum server returns a dict of {claim_id: (name, claim_height)}
@ -225,9 +230,10 @@ class Resolver:
# processed them. # processed them.
# TODO: fix ^ in lbryschema # TODO: fix ^ in lbryschema
@defer.inlineCallbacks
def iter_validate_channel_claims(): def iter_validate_channel_claims():
formatted_claims = []
for claim_ids in queries: for claim_ids in queries:
log.info(claim_ids)
batch_result = yield self.network.get_claims_by_ids(*claim_ids) batch_result = yield self.network.get_claims_by_ids(*claim_ids)
for claim_id in claim_ids: for claim_id in claim_ids:
claim = batch_result[claim_id] claim = batch_result[claim_id]
@ -235,25 +241,28 @@ class Resolver:
formatted_claim = self.parse_and_validate_claim_result(claim, certificate) formatted_claim = self.parse_and_validate_claim_result(claim, certificate)
formatted_claim['absolute_channel_position'] = claim_positions[ formatted_claim['absolute_channel_position'] = claim_positions[
claim['claim_id']] claim['claim_id']]
yield formatted_claim formatted_claims.append(formatted_claim)
else: else:
log.warning("ignoring claim with name mismatch %s %s", claim['name'], log.warning("ignoring claim with name mismatch %s %s", claim['name'],
claim['claim_id']) claim['claim_id'])
defer.returnValue(formatted_claims)
yielded_page = False yielded_page = False
results = [] results = []
for claim in iter_validate_channel_claims():
for claim in (yield iter_validate_channel_claims()):
results.append(claim) results.append(claim)
# if there is a full page of results, yield it # if there is a full page of results, yield it
if len(results) and len(results) % page_size == 0: if len(results) and len(results) % page_size == 0:
yield results[-page_size:] defer.returnValue(results[-page_size:])
yielded_page = True yielded_page = True
# if we didn't get a full page of results, yield what results we did get # if we didn't get a full page of results, yield what results we did get
if not yielded_page: if not yielded_page:
yield results defer.returnValue(results)
@defer.inlineCallbacks
def get_channel_claims_page(self, channel_claim_infos, certificate, page, page_size=10): def get_channel_claims_page(self, channel_claim_infos, certificate, page, page_size=10):
page = page or 0 page = page or 0
page_size = max(page_size, 1) page_size = max(page_size, 1)
@ -262,14 +271,14 @@ class Resolver:
start_position = (page - 1) * page_size start_position = (page - 1) * page_size
queries, names, claim_positions = self.prepare_claim_queries(start_position, page_size, queries, names, claim_positions = self.prepare_claim_queries(start_position, page_size,
channel_claim_infos) channel_claim_infos)
page_generator = self.iter_channel_claims_pages(queries, claim_positions, names, page_generator = yield self.iter_channel_claims_pages(queries, claim_positions, names,
certificate, page_size=page_size) certificate, page_size=page_size)
upper_bound = len(claim_positions) upper_bound = len(claim_positions)
if not page: if not page:
return None, upper_bound defer.returnValue((None, upper_bound))
if start_position > upper_bound: if start_position > upper_bound:
raise IndexError("claim %i greater than max %i" % (start_position, upper_bound)) raise IndexError("claim %i greater than max %i" % (start_position, upper_bound))
return next(page_generator), upper_bound defer.returnValue((page_generator, upper_bound))
# Format amount to be decimal encoded string # Format amount to be decimal encoded string