cache bypass

This commit is contained in:
Victor Shyba 2020-12-21 22:37:48 -03:00
parent eb2a4aebba
commit 6a610187e0

View file

@ -606,12 +606,6 @@ class Ledger(metaclass=LedgerRegistry):
async def maybe_verify_transaction(self, tx, remote_height, merkle=None): async def maybe_verify_transaction(self, tx, remote_height, merkle=None):
tx.height = remote_height tx.height = remote_height
cached = self._tx_cache.get(tx.id)
if not cached:
# cache txs looked up by transaction_show too
cached = TransactionCacheItem()
self._tx_cache[tx.id] = cached
cached.tx = tx
if 0 < remote_height < len(self.headers): if 0 < remote_height < len(self.headers):
# can't be tx.pending_verifications == 1 because we have to handle the transaction_show case # can't be tx.pending_verifications == 1 because we have to handle the transaction_show case
if not merkle: if not merkle:
@ -623,11 +617,12 @@ class Ledger(metaclass=LedgerRegistry):
tx.position = merkle['pos'] tx.position = merkle['pos']
tx.is_verified = merkle_root == header['merkle_root'] tx.is_verified = merkle_root == header['merkle_root']
async def request_transactions(self, to_request: Tuple[Tuple[str, int], ...]): async def request_transactions(self, to_request: Tuple[Tuple[str, int], ...], cached=False):
batches = [[]] batches = [[]]
remote_heights = {} remote_heights = {}
for txid, height in sorted(to_request, key=lambda x: x[1]): for txid, height in sorted(to_request, key=lambda x: x[1]):
if cached:
if txid in self._tx_cache: if txid in self._tx_cache:
if self._tx_cache[txid].tx is not None and self._tx_cache[txid].tx.is_verified: if self._tx_cache[txid].tx is not None and self._tx_cache[txid].tx.is_verified:
yield self._tx_cache[txid].tx yield self._tx_cache[txid].tx
@ -643,14 +638,16 @@ class Ledger(metaclass=LedgerRegistry):
for batch in batches: for batch in batches:
async for tx in self._single_batch(batch, remote_heights): async for tx in self._single_batch(batch, remote_heights):
if cached:
self._tx_cache[tx.id].tx = tx
yield tx yield tx
async def request_synced_transactions(self, to_request, remote_history, address): async def request_synced_transactions(self, to_request, remote_history, address):
pending_sync = [] pending_sync = {}
async for tx in self.request_transactions(((txid, height) for txid, height in to_request.values())): async for tx in self.request_transactions(((txid, height) for txid, height in to_request.values())):
pending_sync.append(asyncio.ensure_future(self._sync(tx, remote_history))) pending_sync[tx.id] = tx
yield tx yield tx
await asyncio.gather(*pending_sync) await asyncio.gather(*(self._sync(tx, remote_history, pending_sync) for tx in pending_sync.values()))
async def _single_batch(self, batch, remote_heights): async def _single_batch(self, batch, remote_heights):
heights = {remote_heights[txid] for txid in batch} heights = {remote_heights[txid] for txid in batch}
@ -662,7 +659,7 @@ class Ledger(metaclass=LedgerRegistry):
await self.maybe_verify_transaction(tx, remote_height, merkle) await self.maybe_verify_transaction(tx, remote_height, merkle)
yield tx yield tx
async def _sync(self, tx, remote_history): async def _sync(self, tx, remote_history, pending_txs):
check_db_for_txos = {} check_db_for_txos = {}
for txi in tx.inputs: for txi in tx.inputs:
if txi.txo_ref.txo is not None: if txi.txo_ref.txo is not None:
@ -670,9 +667,8 @@ class Ledger(metaclass=LedgerRegistry):
wanted_txid = txi.txo_ref.tx_ref.id wanted_txid = txi.txo_ref.tx_ref.id
if wanted_txid not in remote_history: if wanted_txid not in remote_history:
continue continue
if wanted_txid in self._tx_cache: if wanted_txid in pending_txs:
await self._tx_cache[wanted_txid].has_tx.wait() tx = pending_txs[wanted_txid]
txi.txo_ref = self._tx_cache[wanted_txid].tx.outputs[txi.txo_ref.position].ref
else: else:
check_db_for_txos[txi] = txi.txo_ref.id check_db_for_txos[txi] = txi.txo_ref.id
@ -686,8 +682,11 @@ class Ledger(metaclass=LedgerRegistry):
if txi.txo_ref.id in referenced_txos: if txi.txo_ref.id in referenced_txos:
txi.txo_ref = referenced_txos[txi.txo_ref.id].ref txi.txo_ref = referenced_txos[txi.txo_ref.id].ref
else: else:
tx = await self.db.get_transaction(txid=wanted_txid)
if tx is None:
log.warning("%s not on db, not on cache, but on remote history!", txi.txo_ref.id) log.warning("%s not on db, not on cache, but on remote history!", txi.txo_ref.id)
return tx else:
txi.txo_ref = tx.outputs[txi.txo_ref.position].ref
async def get_address_manager_for_address(self, address) -> Optional[AddressManager]: async def get_address_manager_for_address(self, address) -> Optional[AddressManager]:
details = await self.db.get_address(address=address) details = await self.db.get_address(address=address)
@ -757,7 +756,7 @@ class Ledger(metaclass=LedgerRegistry):
outputs = Outputs.from_base64(encoded_outputs or b'') # TODO: why is the server returning None? outputs = Outputs.from_base64(encoded_outputs or b'') # TODO: why is the server returning None?
txs: List[Transaction] = [] txs: List[Transaction] = []
if len(outputs.txs) > 0: if len(outputs.txs) > 0:
async for tx in self.request_transactions(tuple(outputs.txs)): async for tx in self.request_transactions(tuple(outputs.txs), cached=True):
txs.append(tx) txs.append(tx)
_txos, blocked = outputs.inflate(txs) _txos, blocked = outputs.inflate(txs)