_single_batch

This commit is contained in:
Jack Robison 2020-12-03 18:31:23 -05:00
parent 2d1c6a5402
commit d02ff232e5
No known key found for this signature in database
GPG key ID: DF25C68FE0239BB2

View file

@ -654,6 +654,33 @@ class Ledger(metaclass=LedgerRegistry):
tx.position = merkle['pos'] tx.position = merkle['pos']
tx.is_verified = merkle_root == header['merkle_root'] tx.is_verified = merkle_root == header['merkle_root']
async def _single_batch(self, batch, remote_heights, header_cache, transactions):
batch_result = await self.network.retriable_call(
self.network.get_transaction_batch, batch
)
for txid, (raw, merkle) in batch_result.items():
remote_height = remote_heights[txid]
merkle_height = merkle['block_height']
cache_item = self._tx_cache.get(txid)
if cache_item is None:
cache_item = TransactionCacheItem()
self._tx_cache[txid] = cache_item
tx = cache_item.tx or Transaction(bytes.fromhex(raw.decode()), height=remote_height)
tx.height = remote_height
cache_item.tx = tx
if 'merkle' in merkle and remote_heights[txid] > 0:
merkle_root = self.get_root_of_merkle_tree(merkle['merkle'], merkle['pos'], tx.hash)
try:
header = header_cache.get(remote_heights[txid]) or (await self.headers.get(merkle_height))
except IndexError:
log.warning("failed to verify %s at height %i", tx.id, merkle_height)
else:
header_cache[remote_heights[txid]] = header
tx.position = merkle['pos']
tx.is_verified = merkle_root == header['merkle_root']
transactions.append(tx)
return transactions
async def request_transactions_for_inflate(self, to_request: Tuple[Tuple[str, int], ...]): async def request_transactions_for_inflate(self, to_request: Tuple[Tuple[str, int], ...]):
header_cache = {} header_cache = {}
batches = [[]] batches = [[]]
@ -674,34 +701,8 @@ class Ledger(metaclass=LedgerRegistry):
if not batches[-1]: if not batches[-1]:
batches.pop() batches.pop()
async def _single_batch(batch):
batch_result = await self.network.retriable_call(
self.network.get_transaction_batch, batch, restricted=False
)
for txid, (raw, merkle) in batch_result.items():
remote_height = remote_heights[txid]
merkle_height = merkle['block_height']
cache_item = self._tx_cache.get(txid)
if cache_item is None:
cache_item = TransactionCacheItem()
self._tx_cache[txid] = cache_item
tx = cache_item.tx or Transaction(bytes.fromhex(raw), height=remote_height)
tx.height = remote_height
cache_item.tx = tx
if 'merkle' in merkle and remote_heights[txid] > 0:
merkle_root = self.get_root_of_merkle_tree(merkle['merkle'], merkle['pos'], tx.hash)
try:
header = header_cache.get(remote_heights[txid]) or (await self.headers.get(merkle_height))
except IndexError:
log.warning("failed to verify %s at height %i", tx.id, merkle_height)
else:
header_cache[remote_heights[txid]] = header
tx.position = merkle['pos']
tx.is_verified = merkle_root == header['merkle_root']
transactions.append(tx)
for batch in batches: for batch in batches:
await _single_batch(batch) await self._single_batch(batch, remote_heights, header_cache, transactions)
return transactions return transactions
async def _request_transaction_batch(self, to_request, remote_history_size, address): async def _request_transaction_batch(self, to_request, remote_history_size, address):
@ -728,28 +729,10 @@ class Ledger(metaclass=LedgerRegistry):
last_showed_synced_count = 0 last_showed_synced_count = 0
async def _single_batch(batch): async def _single_batch(batch):
transactions = await self._single_batch(batch, remote_heights, header_cache, [])
this_batch_synced = [] this_batch_synced = []
batch_result = await self.network.retriable_call(self.network.get_transaction_batch, batch)
for txid, (raw, merkle) in batch_result.items(): for tx in transactions:
remote_height = remote_heights[txid]
merkle_height = merkle['block_height']
cache_item = self._tx_cache.get(txid)
if cache_item is None:
cache_item = TransactionCacheItem()
self._tx_cache[txid] = cache_item
tx = cache_item.tx or Transaction(unhexlify(raw), height=remote_height)
tx.height = remote_height
cache_item.tx = tx
if 'merkle' in merkle and remote_heights[txid] > 0:
merkle_root = self.get_root_of_merkle_tree(merkle['merkle'], merkle['pos'], tx.hash)
try:
header = header_cache.get(remote_heights[txid]) or (await self.headers.get(merkle_height))
except IndexError:
log.warning("failed to verify %s at height %i", tx.id, merkle_height)
else:
header_cache[remote_heights[txid]] = header
tx.position = merkle['pos']
tx.is_verified = merkle_root == header['merkle_root']
check_db_for_txos = [] check_db_for_txos = []
for txi in tx.inputs: for txi in tx.inputs:
@ -783,6 +766,7 @@ class Ledger(metaclass=LedgerRegistry):
synced_txs.append(tx) synced_txs.append(tx)
this_batch_synced.append(tx) this_batch_synced.append(tx)
await self.db.save_transaction_io_batch( await self.db.save_transaction_io_batch(
this_batch_synced, address, self.address_to_hash160(address), "" this_batch_synced, address, self.address_to_hash160(address), ""
) )
@ -794,8 +778,10 @@ class Ledger(metaclass=LedgerRegistry):
if last_showed_synced_count + 100 < len(synced_txs): if last_showed_synced_count + 100 < len(synced_txs):
log.info("synced %i/%i transactions for %s", len(synced_txs), remote_history_size, address) log.info("synced %i/%i transactions for %s", len(synced_txs), remote_history_size, address)
last_showed_synced_count = len(synced_txs) last_showed_synced_count = len(synced_txs)
for batch in batches: for batch in batches:
await _single_batch(batch) await _single_batch(batch)
return synced_txs return synced_txs
async def get_address_manager_for_address(self, address) -> Optional[AddressManager]: async def get_address_manager_for_address(self, address) -> Optional[AddressManager]: