This commit is contained in:
Jack Robison 2020-07-09 13:34:08 -04:00 committed by Lex Berezhny
parent fc1a06bc45
commit 9d5370be5f
2 changed files with 40 additions and 22 deletions

View file

@ -502,6 +502,12 @@ class Ledger(metaclass=LedgerRegistry):
remote_history = list(map(itemgetter('tx_hash', 'height'), remote_history)) remote_history = list(map(itemgetter('tx_hash', 'height'), remote_history))
we_need = set(remote_history) - set(local_history) we_need = set(remote_history) - set(local_history)
if not we_need: if not we_need:
remote_missing = set(local_history) - set(remote_history)
if remote_missing:
log.warning(
"%i transactions we have for %s are not in the remote address history",
len(remote_missing), address
)
return True return True
acquire_lock_tasks = [] acquire_lock_tasks = []
@ -511,18 +517,20 @@ class Ledger(metaclass=LedgerRegistry):
updated_cached_items = {} updated_cached_items = {}
already_synced = set() already_synced = set()
already_synced_offset = 0
for i, (txid, remote_height) in enumerate(remote_history): for i, (txid, remote_height) in enumerate(remote_history):
if not acquire_lock_tasks and i < len(local_history) and local_history[i] == (txid, remote_height): if i == already_synced_offset and i < len(local_history) and local_history[i] == (txid, remote_height):
pending_synced_history[i] = f'{txid}:{remote_height}:' pending_synced_history[i] = f'{txid}:{remote_height}:'
already_synced.add((txid, remote_height)) already_synced.add((txid, remote_height))
already_synced_offset += 1
continue continue
cache_item = self._tx_cache.get(txid) cache_item = self._tx_cache.get(txid)
if cache_item is None: if cache_item is None:
cache_item = TransactionCacheItem() cache_item = TransactionCacheItem()
self._tx_cache[txid] = cache_item self._tx_cache[txid] = cache_item
if len(acquire_lock_tasks) > 10000:
await asyncio.wait(acquire_lock_tasks) for txid, remote_height in remote_history[already_synced_offset:]:
acquire_lock_tasks.clear() cache_item = self._tx_cache[txid]
acquire_lock_tasks.append(asyncio.create_task(cache_item.lock.acquire())) acquire_lock_tasks.append(asyncio.create_task(cache_item.lock.acquire()))
if acquire_lock_tasks: if acquire_lock_tasks:
@ -576,11 +584,9 @@ class Ledger(metaclass=LedgerRegistry):
log.warning("lock was already released?") log.warning("lock was already released?")
pass pass
log.info("updating address count and status")
await self.db.save_transaction_io_batch( await self.db.save_transaction_io_batch(
[], address, self.address_to_hash160(address), synced_history [], address, self.address_to_hash160(address), synced_history
) )
log.info("updated address count and status")
if address_manager is None: if address_manager is None:
address_manager = await self.get_address_manager_for_address(address) address_manager = await self.get_address_manager_for_address(address)
@ -623,7 +629,7 @@ class Ledger(metaclass=LedgerRegistry):
self._known_addresses_out_of_sync.add(address) self._known_addresses_out_of_sync.add(address)
return False return False
else: else:
log.info("synced %s", address) log.info("finished syncing transaction history for %s, %i known txs", address, len(local_history))
return True return True
async def cache_transaction(self, txid, remote_height, check_local=True): async def cache_transaction(self, txid, remote_height, check_local=True):
@ -682,14 +688,16 @@ class Ledger(metaclass=LedgerRegistry):
batches = [[]] batches = [[]]
remote_heights = {} remote_heights = {}
synced_txs = [] synced_txs = []
heights_in_batch = 1 heights_in_batch = 0
last_height = 0
for idx in sorted(to_request): for idx in sorted(to_request):
txid = to_request[idx][0] txid = to_request[idx][0]
height = to_request[idx][1] height = to_request[idx][1]
remote_heights[txid] = height remote_heights[txid] = height
if idx > 1 and height != remote_heights[batches[-1][-1]]: if height != last_height:
heights_in_batch += 1 heights_in_batch += 1
if len(batches[-1]) == 100 or heights_in_batch == 10: last_height = height
if len(batches[-1]) == 100 or heights_in_batch == 20:
batches.append([]) batches.append([])
heights_in_batch = 1 heights_in_batch = 1
batches[-1].append(txid) batches[-1].append(txid)
@ -728,10 +736,8 @@ class Ledger(metaclass=LedgerRegistry):
continue continue
cache_item = self._tx_cache.get(txi.txo_ref.tx_ref.id) cache_item = self._tx_cache.get(txi.txo_ref.tx_ref.id)
if cache_item is not None: if cache_item is not None:
if cache_item.tx is None: if cache_item.tx is not None:
await cache_item.has_tx.wait() txi.txo_ref = cache_item.tx.outputs[txi.txo_ref.position].ref
assert cache_item.tx is not None
txi.txo_ref = cache_item.tx.outputs[txi.txo_ref.position].ref
else: else:
check_db_for_txos.append(txi.txo_ref.id) check_db_for_txos.append(txi.txo_ref.id)
@ -740,12 +746,20 @@ class Ledger(metaclass=LedgerRegistry):
txoid__in=check_db_for_txos, order_by='txo.txoid', no_tx=True txoid__in=check_db_for_txos, order_by='txo.txoid', no_tx=True
) )
} }
for txi in tx.inputs: for txi in tx.inputs:
if txi.txo_ref.txo is not None: if txi.txo_ref.txo is not None:
continue continue
referenced_txo = referenced_txos.get(txi.txo_ref.id) referenced_txo = referenced_txos.get(txi.txo_ref.id)
if referenced_txo is not None: if referenced_txo is not None:
txi.txo_ref = referenced_txo.ref txi.txo_ref = referenced_txo.ref
continue
cache_item = self._tx_cache.get(txi.txo_ref.id)
if cache_item is None:
cache_item = self._tx_cache[txi.txo_ref.id] = TransactionCacheItem()
if cache_item.tx is not None:
txi.txo_ref = cache_item.tx.ref
synced_txs.append(tx) synced_txs.append(tx)
this_batch_synced.append(tx) this_batch_synced.append(tx)
await self.db.save_transaction_io_batch( await self.db.save_transaction_io_batch(
@ -759,11 +773,8 @@ class Ledger(metaclass=LedgerRegistry):
if last_showed_synced_count + 100 < len(synced_txs): if last_showed_synced_count + 100 < len(synced_txs):
log.info("synced %i/%i transactions for %s", len(synced_txs), remote_history_size, address) log.info("synced %i/%i transactions for %s", len(synced_txs), remote_history_size, address)
last_showed_synced_count = len(synced_txs) last_showed_synced_count = len(synced_txs)
for batch in batches:
await asyncio.wait( await _single_batch(batch)
[_single_batch(batch) for batch in batches]
)
log.info("finished syncing history for %s", address)
return synced_txs return synced_txs
async def get_address_manager_for_address(self, address) -> Optional[AddressManager]: async def get_address_manager_for_address(self, address) -> Optional[AddressManager]:

View file

@ -35,11 +35,17 @@ class MockNetwork:
async def get_transaction_and_merkle(self, tx_hash, known_height=None): async def get_transaction_and_merkle(self, tx_hash, known_height=None):
tx = await self.get_transaction(tx_hash) tx = await self.get_transaction(tx_hash)
merkle = {} merkle = {'block_height': -1}
if known_height: if known_height:
merkle = await self.get_merkle(tx_hash, known_height) merkle = await self.get_merkle(tx_hash, known_height)
return tx, merkle return tx, merkle
async def get_transaction_batch(self, txids):
return {
txid: await self.get_transaction_and_merkle(txid)
for txid in txids
}
class LedgerTestCase(AsyncioTestCase): class LedgerTestCase(AsyncioTestCase):
@ -120,8 +126,9 @@ class TestSynchronization(LedgerTestCase):
self.ledger.network.get_history_called = [] self.ledger.network.get_history_called = []
self.ledger.network.get_transaction_called = [] self.ledger.network.get_transaction_called = []
for cache_item in self.ledger._tx_cache.values(): self.assertFalse(self.ledger._tx_cache[txid1].tx.is_verified)
cache_item.tx.is_verified = True self.assertFalse(self.ledger._tx_cache[txid2].tx.is_verified)
self.assertFalse(self.ledger._tx_cache[txid3].tx.is_verified)
await self.ledger.update_history(address, '') await self.ledger.update_history(address, '')
self.assertListEqual(self.ledger.network.get_history_called, [address]) self.assertListEqual(self.ledger.network.get_history_called, [address])
self.assertListEqual(self.ledger.network.get_transaction_called, []) self.assertListEqual(self.ledger.network.get_transaction_called, [])