update mempool in thread

This commit is contained in:
Jack Robison 2022-01-21 11:34:01 -05:00
parent b82dc8e45f
commit 876a72f18d
No known key found for this signature in database
GPG key ID: DF25C68FE0239BB2

View file

@ -201,18 +201,28 @@ class BlockProcessor:
return await asyncio.shield(run_in_thread()) return await asyncio.shield(run_in_thread())
async def check_mempool(self): async def check_mempool(self):
current_mempool = { def fetch_mempool(mempool_prefix):
k.tx_hash: v.raw_tx for (k, v) in self.db.prefix_db.mempool_tx.iterate() return {
k.tx_hash: v.raw_tx for (k, v) in mempool_prefix.iterate()
} }
def update_mempool(mempool_prefix, to_put, to_delete):
for tx_hash, raw_tx in to_put:
mempool_prefix.stage_put((tx_hash,), (raw_tx,))
for tx_hash, raw_tx in to_delete.items():
mempool_prefix.stage_delete((tx_hash,), (raw_tx,))
current_mempool = await self.run_in_thread_with_lock(fetch_mempool, self.db.prefix_db.mempool_tx)
_to_put = []
for hh in await self.daemon.mempool_hashes(): for hh in await self.daemon.mempool_hashes():
tx_hash = bytes.fromhex(hh)[::-1] tx_hash = bytes.fromhex(hh)[::-1]
if tx_hash in current_mempool: if tx_hash in current_mempool:
current_mempool.pop(tx_hash) current_mempool.pop(tx_hash)
else: else:
raw_tx = bytes.fromhex(await self.daemon.getrawtransaction(hh)) _to_put.append((tx_hash, bytes.fromhex(await self.daemon.getrawtransaction(hh))))
self.db.prefix_db.mempool_tx.stage_put((tx_hash,), (raw_tx,))
for tx_hash, raw_tx in current_mempool.items(): await self.run_in_thread_with_lock(update_mempool, self.db.prefix_db.mempool_tx, _to_put, current_mempool)
self.db.prefix_db.mempool_tx.stage_delete((tx_hash,), (raw_tx,))
async def check_and_advance_blocks(self, raw_blocks): async def check_and_advance_blocks(self, raw_blocks):
"""Process the list of raw blocks passed. Detects and handles """Process the list of raw blocks passed. Detects and handles
@ -1571,7 +1581,7 @@ class BlockProcessor:
if not blocks: if not blocks:
try: try:
await self.check_mempool() await self.check_mempool()
self.db.prefix_db.unsafe_commit() await self.run_in_thread_with_lock(self.db.prefix_db.unsafe_commit)
except Exception: except Exception:
self.logger.exception("error while updating mempool txs") self.logger.exception("error while updating mempool txs")
raise raise