cache and share txs fetches while validating concurrently on batches
This commit is contained in:
parent
7f5deaf6c8
commit
911ca8c37e
1 changed files with 18 additions and 3 deletions
|
@ -1,5 +1,6 @@
|
|||
import logging
|
||||
|
||||
import asyncio
|
||||
from cryptography.exceptions import InvalidSignature
|
||||
from binascii import unhexlify, hexlify
|
||||
|
||||
|
@ -24,14 +25,18 @@ class Resolver:
|
|||
self.transaction_class = ledger.transaction_class
|
||||
self.network = ledger.network
|
||||
self.ledger = ledger
|
||||
self._tx_cache = {}
|
||||
|
||||
async def resolve(self, page, page_size, *uris):
|
||||
uris = set(uris)
|
||||
try:
|
||||
for uri in uris:
|
||||
parsed_uri = parse_lbry_uri(uri)
|
||||
if parsed_uri.claim_id:
|
||||
validate_claim_id(parsed_uri.claim_id)
|
||||
resolutions = await self.network.get_values_for_uris(self.header_hash, *uris)
|
||||
if len(uris) > 1:
|
||||
return await self._batch_handle(resolutions, uris, page, page_size)
|
||||
return await self._handle_resolutions(resolutions, uris, page, page_size)
|
||||
except URIParseError as err:
|
||||
return {'error': err.args[0]}
|
||||
|
@ -39,6 +44,17 @@ class Resolver:
|
|||
log.exception(e)
|
||||
return {'error': str(e)}
|
||||
|
||||
async def _batch_handle(self, resolutions, uris, page, page_size):
|
||||
futs = []
|
||||
for uri in uris:
|
||||
futs.append(asyncio.ensure_future(self._handle_resolutions(resolutions, [uri], page, page_size)))
|
||||
results = await asyncio.gather(*futs)
|
||||
return {uri: results[uri] for uri in results if uri in results}
|
||||
|
||||
def _fetch_tx(self, txid):
|
||||
self._tx_cache[txid] = self._tx_cache.get(txid) or asyncio.ensure_future(self.network.get_transaction(txid))
|
||||
return self._tx_cache[txid]
|
||||
|
||||
async def _handle_resolutions(self, resolutions, requested_uris, page, page_size):
|
||||
results = {}
|
||||
for uri in requested_uris:
|
||||
|
@ -135,13 +151,12 @@ class Resolver:
|
|||
if not claim_result or 'value' not in claim_result:
|
||||
return claim_result
|
||||
claim_result = _decode_claim_result(claim_result)
|
||||
channel_id = None
|
||||
|
||||
if claim_result['value']:
|
||||
claim_result['has_signature'] = False
|
||||
if claim_result['value'].is_signed:
|
||||
claim_result['has_signature'] = True
|
||||
claim_tx = await self.network.get_transaction(claim_result['txid'])
|
||||
claim_tx = await self._fetch_tx(claim_result['txid'])
|
||||
if certificate is None:
|
||||
log.info("fetching certificate to check claim signature")
|
||||
channel_id = claim_result['value'].signing_channel_id
|
||||
|
@ -149,7 +164,7 @@ class Resolver:
|
|||
if not certificate:
|
||||
log.warning('Certificate %s not found', channel_id)
|
||||
claim_result['channel_name'] = certificate['name'] if certificate else None
|
||||
cert_tx = await self.network.get_transaction(certificate['txid']) if certificate else None
|
||||
cert_tx = await self._fetch_tx(certificate['txid']) if certificate else None
|
||||
claim_result['signature_is_valid'] = validate_claim_signature_and_get_channel_name(
|
||||
claim_result, certificate, self.ledger, claim_tx=claim_tx, cert_tx=cert_tx
|
||||
)
|
||||
|
|
Loading…
Add table
Reference in a new issue