fix mempool txs being re-requested with each address notification

This commit is contained in:
Jack Robison 2020-12-08 12:45:45 -05:00
parent 429c0951f3
commit b8c16d8ac5
No known key found for this signature in database
GPG key ID: DF25C68FE0239BB2
2 changed files with 12 additions and 13 deletions

View file

@ -513,7 +513,6 @@ class Ledger(metaclass=LedgerRegistry):
return True
acquire_lock_tasks = []
synced_txs = []
to_request = {}
pending_synced_history = {}
updated_cached_items = {}
@ -531,9 +530,16 @@ class Ledger(metaclass=LedgerRegistry):
cache_item = TransactionCacheItem()
self._tx_cache[txid] = cache_item
unsynced_offset = already_synced_offset
for txid, remote_height in remote_history[already_synced_offset:]:
cache_item = self._tx_cache[txid]
acquire_lock_tasks.append(asyncio.create_task(cache_item.lock.acquire()))
if cache_item.tx is not None and cache_item.tx.height >= remote_height \
and (not cache_item.tx.is_verified or remote_height < 1):
pending_synced_history[unsynced_offset] = f'{txid}:{cache_item.tx.height}:'
already_synced.add((txid, cache_item.tx.height))
else:
acquire_lock_tasks.append(asyncio.create_task(cache_item.lock.acquire()))
unsynced_offset += 1
if acquire_lock_tasks:
await asyncio.wait(acquire_lock_tasks)
@ -550,23 +556,16 @@ class Ledger(metaclass=LedgerRegistry):
assert cache_item is not None, 'cache item is none'
assert cache_item.lock.locked(), 'cache lock is not held?'
# tx = cache_item.tx
# if cache_item.tx is not None and \
# cache_item.tx.height >= remote_height and \
# (cache_item.tx.is_verified or remote_height < 1):
# synced_txs.append(cache_item.tx) # cached tx is already up-to-date
# pending_synced_history[i] = f'{tx.id}:{tx.height}:'
# continue
to_request[i] = (txid, remote_height)
log.debug(
"request %i transactions, %i/%i for %s are already synced", len(to_request), len(synced_txs),
len(remote_history), address
"request %i transactions, %i/%i for %s are already synced", len(to_request),
len(pending_synced_history), len(remote_history), address
)
requested_txes = await self._request_transaction_batch(to_request, len(remote_history), address)
for tx in requested_txes:
pending_synced_history[tx_indexes[tx.id]] = f"{tx.id}:{tx.height}:"
synced_txs.append(tx)
assert len(pending_synced_history) == len(remote_history), \
f"{len(pending_synced_history)} vs {len(remote_history)}"

View file

@ -651,7 +651,7 @@ class SessionManager:
]
if header_tasks:
self.logger.info(f'notify {len(header_tasks)} sessions of new header')
await asyncio.wait(header_tasks)
asyncio.create_task(asyncio.wait(header_tasks))
for hashX in touched.intersection(self.mempool_statuses.keys()):
self.mempool_statuses.pop(hashX, None)