Merge pull request #3105 from lbryio/dont-rerequest-mempool
fix mempool txs being re-requested
This commit is contained in:
commit
9db3d01e09
3 changed files with 16 additions and 17 deletions
|
@ -513,7 +513,6 @@ class Ledger(metaclass=LedgerRegistry):
|
||||||
return True
|
return True
|
||||||
|
|
||||||
acquire_lock_tasks = []
|
acquire_lock_tasks = []
|
||||||
synced_txs = []
|
|
||||||
to_request = {}
|
to_request = {}
|
||||||
pending_synced_history = {}
|
pending_synced_history = {}
|
||||||
updated_cached_items = {}
|
updated_cached_items = {}
|
||||||
|
@ -531,9 +530,16 @@ class Ledger(metaclass=LedgerRegistry):
|
||||||
cache_item = TransactionCacheItem()
|
cache_item = TransactionCacheItem()
|
||||||
self._tx_cache[txid] = cache_item
|
self._tx_cache[txid] = cache_item
|
||||||
|
|
||||||
|
unsynced_offset = already_synced_offset
|
||||||
for txid, remote_height in remote_history[already_synced_offset:]:
|
for txid, remote_height in remote_history[already_synced_offset:]:
|
||||||
cache_item = self._tx_cache[txid]
|
cache_item = self._tx_cache[txid]
|
||||||
acquire_lock_tasks.append(asyncio.create_task(cache_item.lock.acquire()))
|
if cache_item.tx is not None and cache_item.tx.height >= remote_height \
|
||||||
|
and (not cache_item.tx.is_verified or remote_height < 1):
|
||||||
|
pending_synced_history[unsynced_offset] = f'{txid}:{cache_item.tx.height}:'
|
||||||
|
already_synced.add((txid, cache_item.tx.height))
|
||||||
|
else:
|
||||||
|
acquire_lock_tasks.append(asyncio.create_task(cache_item.lock.acquire()))
|
||||||
|
unsynced_offset += 1
|
||||||
|
|
||||||
if acquire_lock_tasks:
|
if acquire_lock_tasks:
|
||||||
await asyncio.wait(acquire_lock_tasks)
|
await asyncio.wait(acquire_lock_tasks)
|
||||||
|
@ -550,23 +556,16 @@ class Ledger(metaclass=LedgerRegistry):
|
||||||
|
|
||||||
assert cache_item is not None, 'cache item is none'
|
assert cache_item is not None, 'cache item is none'
|
||||||
assert cache_item.lock.locked(), 'cache lock is not held?'
|
assert cache_item.lock.locked(), 'cache lock is not held?'
|
||||||
# tx = cache_item.tx
|
|
||||||
# if cache_item.tx is not None and \
|
|
||||||
# cache_item.tx.height >= remote_height and \
|
|
||||||
# (cache_item.tx.is_verified or remote_height < 1):
|
|
||||||
# synced_txs.append(cache_item.tx) # cached tx is already up-to-date
|
|
||||||
# pending_synced_history[i] = f'{tx.id}:{tx.height}:'
|
|
||||||
# continue
|
|
||||||
to_request[i] = (txid, remote_height)
|
to_request[i] = (txid, remote_height)
|
||||||
|
|
||||||
log.debug(
|
log.debug(
|
||||||
"request %i transactions, %i/%i for %s are already synced", len(to_request), len(synced_txs),
|
"request %i transactions, %i/%i for %s are already synced", len(to_request),
|
||||||
len(remote_history), address
|
len(pending_synced_history), len(remote_history), address
|
||||||
)
|
)
|
||||||
requested_txes = await self._request_transaction_batch(to_request, len(remote_history), address)
|
requested_txes = await self._request_transaction_batch(to_request, len(remote_history), address)
|
||||||
for tx in requested_txes:
|
for tx in requested_txes:
|
||||||
pending_synced_history[tx_indexes[tx.id]] = f"{tx.id}:{tx.height}:"
|
pending_synced_history[tx_indexes[tx.id]] = f"{tx.id}:{tx.height}:"
|
||||||
synced_txs.append(tx)
|
|
||||||
|
|
||||||
assert len(pending_synced_history) == len(remote_history), \
|
assert len(pending_synced_history) == len(remote_history), \
|
||||||
f"{len(pending_synced_history)} vs {len(remote_history)}"
|
f"{len(pending_synced_history)} vs {len(remote_history)}"
|
||||||
|
|
|
@ -67,17 +67,17 @@ def hash160(x):
|
||||||
return ripemd160(sha256(x))
|
return ripemd160(sha256(x))
|
||||||
|
|
||||||
|
|
||||||
def hash_to_hex_str(x):
|
def hash_to_hex_str(x: bytes) -> str:
|
||||||
"""Convert a big-endian binary hash to displayed hex string.
|
"""Convert a big-endian binary hash to displayed hex string.
|
||||||
|
|
||||||
Display form of a binary hash is reversed and converted to hex.
|
Display form of a binary hash is reversed and converted to hex.
|
||||||
"""
|
"""
|
||||||
return bytes(reversed(x)).hex()
|
return x[::-1].hex()
|
||||||
|
|
||||||
|
|
||||||
def hex_str_to_hash(x):
|
def hex_str_to_hash(x: str) -> bytes:
|
||||||
"""Convert a displayed hex string to a binary hash."""
|
"""Convert a displayed hex string to a binary hash."""
|
||||||
return bytes(reversed(hex_to_bytes(x)))
|
return hex_to_bytes(x)[::-1]
|
||||||
|
|
||||||
|
|
||||||
class Base58Error(Exception):
|
class Base58Error(Exception):
|
||||||
|
|
|
@ -651,7 +651,7 @@ class SessionManager:
|
||||||
]
|
]
|
||||||
if header_tasks:
|
if header_tasks:
|
||||||
self.logger.info(f'notify {len(header_tasks)} sessions of new header')
|
self.logger.info(f'notify {len(header_tasks)} sessions of new header')
|
||||||
await asyncio.wait(header_tasks)
|
asyncio.create_task(asyncio.wait(header_tasks))
|
||||||
for hashX in touched.intersection(self.mempool_statuses.keys()):
|
for hashX in touched.intersection(self.mempool_statuses.keys()):
|
||||||
self.mempool_statuses.pop(hashX, None)
|
self.mempool_statuses.pop(hashX, None)
|
||||||
|
|
||||||
|
|
Loading…
Reference in a new issue