don't deserialize mempool in a thread
This commit is contained in:
parent
2318e6d8e9
commit
751cc4c44d
1 changed files with 23 additions and 35 deletions
|
@ -6,13 +6,11 @@
|
||||||
# and warranty status of this software.
|
# and warranty status of this software.
|
||||||
|
|
||||||
"""Mempool handling."""
|
"""Mempool handling."""
|
||||||
import os
|
|
||||||
import asyncio
|
import asyncio
|
||||||
import itertools
|
import itertools
|
||||||
import time
|
import time
|
||||||
from abc import ABC, abstractmethod
|
from abc import ABC, abstractmethod
|
||||||
from collections import defaultdict
|
from collections import defaultdict
|
||||||
from concurrent.futures.thread import ThreadPoolExecutor
|
|
||||||
from prometheus_client import Histogram
|
from prometheus_client import Histogram
|
||||||
|
|
||||||
import attr
|
import attr
|
||||||
|
@ -117,7 +115,6 @@ class MemPool:
|
||||||
# Prevents mempool refreshes during fee histogram calculation
|
# Prevents mempool refreshes during fee histogram calculation
|
||||||
self.lock = asyncio.Lock()
|
self.lock = asyncio.Lock()
|
||||||
self.wakeup = asyncio.Event()
|
self.wakeup = asyncio.Event()
|
||||||
self.executor = ThreadPoolExecutor(max(os.cpu_count() - 1, 1))
|
|
||||||
self.mempool_process_time_metric = mempool_process_time_metric
|
self.mempool_process_time_metric = mempool_process_time_metric
|
||||||
|
|
||||||
async def _logging(self, synchronized_event):
|
async def _logging(self, synchronized_event):
|
||||||
|
@ -135,15 +132,11 @@ class MemPool:
|
||||||
await synchronized_event.wait()
|
await synchronized_event.wait()
|
||||||
|
|
||||||
async def _refresh_histogram(self, synchronized_event):
|
async def _refresh_histogram(self, synchronized_event):
|
||||||
try:
|
while True:
|
||||||
while True:
|
await synchronized_event.wait()
|
||||||
await synchronized_event.wait()
|
async with self.lock:
|
||||||
async with self.lock:
|
self._update_histogram(100_000)
|
||||||
# Threaded as can be expensive
|
await asyncio.sleep(self.coin.MEMPOOL_HISTOGRAM_REFRESH_SECS)
|
||||||
await asyncio.get_event_loop().run_in_executor(self.executor, self._update_histogram, 100_000)
|
|
||||||
await asyncio.sleep(self.coin.MEMPOOL_HISTOGRAM_REFRESH_SECS)
|
|
||||||
finally:
|
|
||||||
self.executor.shutdown(wait=True)
|
|
||||||
|
|
||||||
def _update_histogram(self, bin_size):
|
def _update_histogram(self, bin_size):
|
||||||
# Build a histogram by fee rate
|
# Build a histogram by fee rate
|
||||||
|
@ -286,30 +279,25 @@ class MemPool:
|
||||||
hex_hashes_iter = (hash_to_hex_str(hash) for hash in hashes)
|
hex_hashes_iter = (hash_to_hex_str(hash) for hash in hashes)
|
||||||
raw_txs = await self.api.raw_transactions(hex_hashes_iter)
|
raw_txs = await self.api.raw_transactions(hex_hashes_iter)
|
||||||
|
|
||||||
def deserialize_txs(): # This function is pure
|
to_hashX = self.coin.hashX_from_script
|
||||||
to_hashX = self.coin.hashX_from_script
|
deserializer = self.coin.DESERIALIZER
|
||||||
deserializer = self.coin.DESERIALIZER
|
|
||||||
|
|
||||||
txs = {}
|
tx_map = {}
|
||||||
for hash, raw_tx in zip(hashes, raw_txs):
|
for hash, raw_tx in zip(hashes, raw_txs):
|
||||||
# The daemon may have evicted the tx from its
|
# The daemon may have evicted the tx from its
|
||||||
# mempool or it may have gotten in a block
|
# mempool or it may have gotten in a block
|
||||||
if not raw_tx:
|
if not raw_tx:
|
||||||
continue
|
continue
|
||||||
tx, tx_size = deserializer(raw_tx).read_tx_and_vsize()
|
tx, tx_size = deserializer(raw_tx).read_tx_and_vsize()
|
||||||
# Convert the inputs and outputs into (hashX, value) pairs
|
# Convert the inputs and outputs into (hashX, value) pairs
|
||||||
# Drop generation-like inputs from MemPoolTx.prevouts
|
# Drop generation-like inputs from MemPoolTx.prevouts
|
||||||
txin_pairs = tuple((txin.prev_hash, txin.prev_idx)
|
txin_pairs = tuple((txin.prev_hash, txin.prev_idx)
|
||||||
for txin in tx.inputs
|
for txin in tx.inputs
|
||||||
if not txin.is_generation())
|
if not txin.is_generation())
|
||||||
txout_pairs = tuple((to_hashX(txout.pk_script), txout.value)
|
txout_pairs = tuple((to_hashX(txout.pk_script), txout.value)
|
||||||
for txout in tx.outputs)
|
for txout in tx.outputs)
|
||||||
txs[hash] = MemPoolTx(txin_pairs, None, txout_pairs,
|
tx_map[hash] = MemPoolTx(txin_pairs, None, txout_pairs,
|
||||||
0, tx_size)
|
0, tx_size)
|
||||||
return txs
|
|
||||||
|
|
||||||
# Thread this potentially slow operation so as not to block
|
|
||||||
tx_map = await asyncio.get_event_loop().run_in_executor(self.executor, deserialize_txs)
|
|
||||||
|
|
||||||
# Determine all prevouts not in the mempool, and fetch the
|
# Determine all prevouts not in the mempool, and fetch the
|
||||||
# UTXO information from the database. Failed prevout lookups
|
# UTXO information from the database. Failed prevout lookups
|
||||||
|
|
Loading…
Reference in a new issue