Merge pull request #2835 from lbryio/wallet-server-threadpools

Improve wallet server performance with separate thread pools for leveldb, the block processor, and mempool
This commit is contained in:
Jack Robison 2020-03-01 15:15:20 -05:00 committed by GitHub
commit 31c141e757
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
3 changed files with 26 additions and 15 deletions

View file

@@ -1,6 +1,7 @@
 import time
 import asyncio
 from struct import pack, unpack
+from concurrent.futures.thread import ThreadPoolExecutor
 import lbry
 from lbry.schema.claim import Claim
@@ -145,6 +146,7 @@ class BlockProcessor:
         self.blocks_event = asyncio.Event()
         self.prefetcher = Prefetcher(daemon, env.coin, self.blocks_event)
         self.logger = class_logger(__name__, self.__class__.__name__)
+        self.executor = ThreadPoolExecutor(1)
         # Meta
         self.next_cache_check = 0
@@ -165,6 +167,7 @@ class BlockProcessor:
         self.state_lock = asyncio.Lock()
         self.search_cache = {}

     async def run_in_thread_with_lock(self, func, *args):
         # Run in a thread to prevent blocking.  Shielded so that
         # cancellations from shutdown don't lose work - when the task
@ -173,7 +176,7 @@ class BlockProcessor:
# consistent and not being updated elsewhere. # consistent and not being updated elsewhere.
async def run_in_thread_locked(): async def run_in_thread_locked():
async with self.state_lock: async with self.state_lock:
return await asyncio.get_event_loop().run_in_executor(None, func, *args) return await asyncio.get_event_loop().run_in_executor(self.executor, func, *args)
return await asyncio.shield(run_in_thread_locked()) return await asyncio.shield(run_in_thread_locked())
async def check_and_advance_blocks(self, raw_blocks): async def check_and_advance_blocks(self, raw_blocks):
@@ -657,6 +660,7 @@ class BlockProcessor:
         self.logger.info('flushing to DB for a clean shutdown...')
         await self.flush(True)
         self.db.close()
+        self.executor.shutdown(wait=True)

     def force_chain_reorg(self, count):
         """Force a reorg of the given number of blocks.

View file

@@ -20,7 +20,7 @@ from collections import namedtuple
 from functools import partial
 from glob import glob
 from struct import pack, unpack
+from concurrent.futures.thread import ThreadPoolExecutor
 import attr
 from lbry.wallet.server import util
@@ -63,6 +63,7 @@ class LevelDB:
         self.logger = util.class_logger(__name__, self.__class__.__name__)
         self.env = env
         self.coin = env.coin
+        self.executor = ThreadPoolExecutor(max(1, os.cpu_count() - 1))
         # Setup block header size handlers
         if self.coin.STATIC_BLOCK_HEADERS:
@@ -139,6 +140,7 @@ class LevelDB:
     def close(self):
         self.utxo_db.close()
         self.history.close_db()
+        self.executor.shutdown(wait=True)

     async def open_for_compacting(self):
         await self._open_dbs(True, True)
@@ -406,7 +408,7 @@ class LevelDB:
             return self.headers_file.read(offset, size), disk_count
         return b'', 0

-        return await asyncio.get_event_loop().run_in_executor(None, read_headers)
+        return await asyncio.get_event_loop().run_in_executor(self.executor, read_headers)

     def fs_tx_hash(self, tx_num):
         """Return a par (tx_hash, tx_height) for the given tx number.
@@ -445,7 +447,7 @@ class LevelDB:
             return [fs_tx_hash(tx_num) for tx_num in tx_nums]

         while True:
-            history = await asyncio.get_event_loop().run_in_executor(None, read_history)
+            history = await asyncio.get_event_loop().run_in_executor(self.executor, read_history)
             if all(hash is not None for hash, height in history):
                 return history
             self.logger.warning(f'limited_history: tx hash '
@@ -611,7 +613,7 @@ class LevelDB:
             return utxos

         while True:
-            utxos = await asyncio.get_event_loop().run_in_executor(None, read_utxos)
+            utxos = await asyncio.get_event_loop().run_in_executor(self.executor, read_utxos)
             if all(utxo.tx_hash is not None for utxo in utxos):
                 return utxos
             self.logger.warning(f'all_utxos: tx hash not '
@@ -664,5 +666,5 @@ class LevelDB:
                 return hashX, value
             return [lookup_utxo(*hashX_pair) for hashX_pair in hashX_pairs]

-        hashX_pairs = await asyncio.get_event_loop().run_in_executor(None, lookup_hashXs)
-        return await asyncio.get_event_loop().run_in_executor(None, lookup_utxos, hashX_pairs)
+        hashX_pairs = await asyncio.get_event_loop().run_in_executor(self.executor, lookup_hashXs)
+        return await asyncio.get_event_loop().run_in_executor(self.executor, lookup_utxos, hashX_pairs)

View file

@@ -6,12 +6,13 @@
 # and warranty status of this software.

 """Mempool handling."""
+import os
 import asyncio
 import itertools
 import time

 from abc import ABC, abstractmethod
 from collections import defaultdict
+from concurrent.futures.thread import ThreadPoolExecutor

 import attr
@@ -105,6 +106,7 @@ class MemPool:
         # Prevents mempool refreshes during fee histogram calculation
         self.lock = asyncio.Lock()
         self.wakeup = asyncio.Event()
+        self.executor = ThreadPoolExecutor(max(os.cpu_count() - 1, 1))

     async def _logging(self, synchronized_event):
         """Print regular logs of mempool stats."""
@ -121,12 +123,15 @@ class MemPool:
await synchronized_event.wait() await synchronized_event.wait()
async def _refresh_histogram(self, synchronized_event): async def _refresh_histogram(self, synchronized_event):
try:
while True: while True:
await synchronized_event.wait() await synchronized_event.wait()
async with self.lock: async with self.lock:
# Threaded as can be expensive # Threaded as can be expensive
await asyncio.get_event_loop().run_in_executor(None, self._update_histogram, 100_000) await asyncio.get_event_loop().run_in_executor(self.executor, self._update_histogram, 100_000)
await asyncio.sleep(self.coin.MEMPOOL_HISTOGRAM_REFRESH_SECS) await asyncio.sleep(self.coin.MEMPOOL_HISTOGRAM_REFRESH_SECS)
finally:
self.executor.shutdown(wait=True)
     def _update_histogram(self, bin_size):
         # Build a histogram by fee rate
@@ -289,7 +294,7 @@ class MemPool:
             return txs

         # Thread this potentially slow operation so as not to block
-        tx_map = await asyncio.get_event_loop().run_in_executor(None, deserialize_txs)
+        tx_map = await asyncio.get_event_loop().run_in_executor(self.executor, deserialize_txs)

         # Determine all prevouts not in the mempool, and fetch the
         # UTXO information from the database.  Failed prevout lookups