more efficient mempool tx refresh
-hopefully safer from the rare RocksIOError without a bare iterate()
parent c6e5c92765
commit 32b8afe154
3 changed files with 12 additions and 5 deletions
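The core idea, sketched below with the names used in this diff: give the mempool_tx prefix row explicit MIN/MAX key sentinels so the mempool scan can pass start/stop bounds instead of using a bare iterate(). This is an illustrative sketch only, not code from the repository; it assumes a PrefixRow-style iterate() that accepts start and stop key tuples, as shown in the hunks below.

# Illustrative sketch: old unbounded scan vs. new bounded scan.
# `mempool_prefix` stands in for the MempoolTXPrefixRow instance from the diff;
# the iterate(start=..., stop=...) signature is taken from the hunks below.

MIN_TX_HASH = b'\x00' * 32   # lexicographically smallest 32-byte tx hash
MAX_TX_HASH = b'\xff' * 32   # lexicographically largest 32-byte tx hash

def fetch_mempool_unbounded(mempool_prefix):
    # old form: walks the iterator with no explicit key bounds
    return {k.tx_hash: v.raw_tx for (k, v) in mempool_prefix.iterate()}

def fetch_mempool_bounded(mempool_prefix):
    # new form: constrains the scan to the 32-byte tx hash keyspace
    return {
        k.tx_hash: v.raw_tx
        for (k, v) in mempool_prefix.iterate(start=(MIN_TX_HASH,), stop=(MAX_TX_HASH,))
    }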
@@ -169,8 +169,9 @@ class BlockchainProcessorService(BlockchainService):
 
     async def refresh_mempool(self):
         def fetch_mempool(mempool_prefix):
+            lower, upper = mempool_prefix.MIN_TX_HASH, mempool_prefix.MAX_TX_HASH
             return {
-                k.tx_hash: v.raw_tx for (k, v) in mempool_prefix.iterate()
+                k.tx_hash: v.raw_tx for (k, v) in mempool_prefix.iterate(start=(lower,), stop=(upper,))
             }
 
         def update_mempool(unsafe_commit, mempool_prefix, to_put, to_delete):
@@ -1513,6 +1513,9 @@ class MempoolTXPrefixRow(PrefixRow):
     prefix = DB_PREFIXES.mempool_tx.value
     key_struct = struct.Struct(b'>32s')
 
+    MAX_TX_HASH = b'\xff' * 32
+    MIN_TX_HASH = b'\x00' * 32
+
     key_part_lambdas = [
         lambda: b'',
         struct.Struct(b'>32s').pack
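A quick, self-contained check of what the new sentinels provide: packed through the row's big-endian 32-byte key_struct, MIN_TX_HASH and MAX_TX_HASH bracket every possible tx hash key, so they are usable as start/stop bounds for the scan. The snippet is only an illustration of that property, not code from the repository.

import os
import struct

key_struct = struct.Struct(b'>32s')      # same key layout as MempoolTXPrefixRow
MIN_TX_HASH = b'\x00' * 32               # sentinel added by this commit
MAX_TX_HASH = b'\xff' * 32               # sentinel added by this commit

some_tx_hash = os.urandom(32)            # any 32-byte tx hash
packed = key_struct.pack(some_tx_hash)

# Every packed tx hash key sorts between the packed sentinels, so the pair can
# serve as lower/upper bounds for the mempool_tx iteration.
assert key_struct.pack(MIN_TX_HASH) <= packed <= key_struct.pack(MAX_TX_HASH)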
@@ -55,17 +55,20 @@ class MemPool:
 
     def refresh(self) -> typing.Set[bytes]:  # returns list of new touched hashXs
         prefix_db = self._db.prefix_db
+        mempool_tx_hashes = set()
         try:
-            new_mempool = {k.tx_hash: v.raw_tx for k, v in prefix_db.mempool_tx.iterate()}  # TODO: make this more efficient
+            lower, upper = prefix_db.mempool_tx.MIN_TX_HASH, prefix_db.mempool_tx.MAX_TX_HASH
+            for k, v in prefix_db.mempool_tx.iterate(start=(lower,), stop=(upper,)):
+                self.raw_mempool[k.tx_hash] = v.raw_tx
+                mempool_tx_hashes.add(k.tx_hash)
+            for removed_mempool_tx in set(self.raw_mempool.keys()).difference(mempool_tx_hashes):
+                self.raw_mempool.pop(removed_mempool_tx)
         except rocksdb.errors.RocksIOError as err:
             # FIXME: why does this happen? can it happen elsewhere?
             if err.args[0].startswith(b'IO error: No such file or directory: While open a file for random read:'):
                 self.logger.error("failed to process mempool, retrying later")
                 return set()
             raise err
-        else:
-            self.raw_mempool.clear()
-            self.raw_mempool.update(new_mempool)
 
         # hashXs = self.hashXs  # hashX: [tx_hash, ...]
         touched_hashXs = set()
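For reference, the new refresh loop boils down to the following dict-maintenance pattern (a standalone sketch, with the prefix-row iterator replaced by a plain iterable of (tx_hash, raw_tx) pairs): update entries in place, remember which hashes were seen, then evict anything no longer present, instead of clearing and rebuilding the whole dict.

from typing import Dict, Iterable, Set, Tuple

def refresh_in_place(raw_mempool: Dict[bytes, bytes],
                     pairs: Iterable[Tuple[bytes, bytes]]) -> Set[bytes]:
    """Update raw_mempool from (tx_hash, raw_tx) pairs and drop stale entries."""
    seen: Set[bytes] = set()
    for tx_hash, raw_tx in pairs:
        raw_mempool[tx_hash] = raw_tx          # insert or refresh the entry
        seen.add(tx_hash)
    for stale in set(raw_mempool).difference(seen):
        raw_mempool.pop(stale)                 # tx left the mempool since last refresh
    return seen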