diff --git a/lbry/wallet/server/block_processor.py b/lbry/wallet/server/block_processor.py index 7e469bd74..3661bb977 100644 --- a/lbry/wallet/server/block_processor.py +++ b/lbry/wallet/server/block_processor.py @@ -201,18 +201,28 @@ class BlockProcessor: return await asyncio.shield(run_in_thread()) async def check_mempool(self): - current_mempool = { - k.tx_hash: v.raw_tx for (k, v) in self.db.prefix_db.mempool_tx.iterate() - } + def fetch_mempool(mempool_prefix): + return { + k.tx_hash: v.raw_tx for (k, v) in mempool_prefix.iterate() + } + + def update_mempool(mempool_prefix, to_put, to_delete): + for tx_hash, raw_tx in to_put: + mempool_prefix.stage_put((tx_hash,), (raw_tx,)) + for tx_hash, raw_tx in to_delete.items(): + mempool_prefix.stage_delete((tx_hash,), (raw_tx,)) + + current_mempool = await self.run_in_thread_with_lock(fetch_mempool, self.db.prefix_db.mempool_tx) + + _to_put = [] for hh in await self.daemon.mempool_hashes(): tx_hash = bytes.fromhex(hh)[::-1] if tx_hash in current_mempool: current_mempool.pop(tx_hash) else: - raw_tx = bytes.fromhex(await self.daemon.getrawtransaction(hh)) - self.db.prefix_db.mempool_tx.stage_put((tx_hash,), (raw_tx,)) - for tx_hash, raw_tx in current_mempool.items(): - self.db.prefix_db.mempool_tx.stage_delete((tx_hash,), (raw_tx,)) + _to_put.append((tx_hash, bytes.fromhex(await self.daemon.getrawtransaction(hh)))) + + await self.run_in_thread_with_lock(update_mempool, self.db.prefix_db.mempool_tx, _to_put, current_mempool) async def check_and_advance_blocks(self, raw_blocks): """Process the list of raw blocks passed. Detects and handles @@ -1571,7 +1581,7 @@ class BlockProcessor: if not blocks: try: await self.check_mempool() - self.db.prefix_db.unsafe_commit() + await self.run_in_thread_with_lock(self.db.prefix_db.unsafe_commit) except Exception: self.logger.exception("error while updating mempool txs") raise