Compare commits

...
Sign in to create a new pull request.

14 commits

Author SHA1 Message Date
Victor Shyba
1f64275da1
fix ordering 2021-10-14 19:45:41 -04:00
Victor Shyba
8c605864e7
ignore request if protocol closed 2021-10-14 19:45:41 -04:00
Victor Shyba
6ec88e2101
initialize new sessions with high priority 2021-10-14 19:45:41 -04:00
Victor Shyba
17eeafd62c
schedule_requests is not async 2021-10-14 19:45:41 -04:00
Victor Shyba
28e4b3eefd
request scheduler 2021-10-14 19:45:41 -04:00
Jack Robison
b2de89ca29
sleeps 2021-10-14 18:57:46 -04:00
Jack Robison
8bfff2d549
improve resolve caching 2021-10-14 16:08:28 -04:00
Jack Robison
39d5078788
threadpools for block processor and es sync reader 2021-10-14 15:40:20 -04:00
Jack Robison
f8c46647d7
doc strings 2021-10-14 13:17:16 -04:00
Jack Robison
82de92c324
claim producer fix 2021-10-13 15:24:25 -04:00
Jack Robison
3dc3792478
improve claims_producer performance 2021-10-13 14:09:16 -04:00
Jack Robison
ba1d0a12d1
resolve lru cache 2021-10-13 10:18:03 -04:00
Jack Robison
e75047a0ab
add block_txs index 2021-10-12 11:18:11 -04:00
Jack Robison
83a167bd37
smaller caches 2021-10-12 11:18:11 -04:00
12 changed files with 229 additions and 172 deletions

View file

@ -424,10 +424,11 @@ class RPCSession(SessionBase):
self.max_errors = 0 self.max_errors = 0
self._bump_errors() self._bump_errors()
else: else:
for request in requests: self.schedule_requests(requests)
await self._task_group.add(self._handle_request(request))
async def _handle_request(self, request): async def _handle_request(self, request):
if self.is_closing():
return
start = time.perf_counter() start = time.perf_counter()
try: try:
result = await self.handle_request(request) result = await self.handle_request(request)
@ -472,6 +473,10 @@ class RPCSession(SessionBase):
async def handle_request(self, request): async def handle_request(self, request):
pass pass
def schedule_requests(self, requests):
for request in requests:
self._task_group.add(self._handle_request(request))
async def send_request(self, method, args=()): async def send_request(self, method, args=()):
"""Send an RPC request over the network.""" """Send an RPC request over the network."""
if self.is_closing(): if self.is_closing():

View file

@ -3,14 +3,16 @@ import asyncio
import typing import typing
from bisect import bisect_right from bisect import bisect_right
from struct import pack, unpack from struct import pack, unpack
from concurrent.futures.thread import ThreadPoolExecutor
from typing import Optional, List, Tuple, Set, DefaultDict, Dict, NamedTuple from typing import Optional, List, Tuple, Set, DefaultDict, Dict, NamedTuple
from prometheus_client import Gauge, Histogram from prometheus_client import Gauge, Histogram
from collections import defaultdict from collections import defaultdict
import lbry import lbry
from lbry.schema.url import URL
from lbry.schema.claim import Claim from lbry.schema.claim import Claim
from lbry.wallet.ledger import Ledger, TestNetLedger, RegTestLedger from lbry.wallet.ledger import Ledger, TestNetLedger, RegTestLedger
from lbry.utils import LRUCache
from lbry.wallet.transaction import OutputScript, Output, Transaction from lbry.wallet.transaction import OutputScript, Output, Transaction
from lbry.wallet.server.tx import Tx, TxOutput, TxInput from lbry.wallet.server.tx import Tx, TxOutput, TxInput
from lbry.wallet.server.daemon import DaemonError from lbry.wallet.server.daemon import DaemonError
@ -202,6 +204,8 @@ class BlockProcessor:
self.env = env self.env = env
self.db = db self.db = db
self.daemon = daemon self.daemon = daemon
self._chain_executor = ThreadPoolExecutor(1, thread_name_prefix='block-processor')
self._sync_reader_executor = ThreadPoolExecutor(1, thread_name_prefix='hub-es-sync')
self.mempool = MemPool(env.coin, daemon, db, self.state_lock) self.mempool = MemPool(env.coin, daemon, db, self.state_lock)
self.shutdown_event = shutdown_event self.shutdown_event = shutdown_event
self.coin = env.coin self.coin = env.coin
@ -231,6 +235,9 @@ class BlockProcessor:
self.db_op_stack: Optional[RevertableOpStack] = None self.db_op_stack: Optional[RevertableOpStack] = None
# self.search_cache = {} # self.search_cache = {}
self.resolve_cache = LRUCache(2**16)
self.resolve_outputs_cache = LRUCache(2 ** 16)
self.history_cache = {} self.history_cache = {}
self.status_server = StatusServer() self.status_server = StatusServer()
@ -297,7 +304,11 @@ class BlockProcessor:
for claim_hash in self.removed_claims_to_send_es: for claim_hash in self.removed_claims_to_send_es:
yield 'delete', claim_hash.hex() yield 'delete', claim_hash.hex()
async for claim in self.db.claims_producer(self.touched_claims_to_send_es):
to_update = await asyncio.get_event_loop().run_in_executor(
self._sync_reader_executor, self.db.claims_producer, self.touched_claims_to_send_es
)
for claim in to_update:
yield 'update', claim yield 'update', claim
async def run_in_thread_with_lock(self, func, *args): async def run_in_thread_with_lock(self, func, *args):
@ -308,13 +319,12 @@ class BlockProcessor:
# consistent and not being updated elsewhere. # consistent and not being updated elsewhere.
async def run_in_thread_locked(): async def run_in_thread_locked():
async with self.state_lock: async with self.state_lock:
return await asyncio.get_event_loop().run_in_executor(None, func, *args) return await asyncio.get_event_loop().run_in_executor(self._chain_executor, func, *args)
return await asyncio.shield(run_in_thread_locked()) return await asyncio.shield(run_in_thread_locked())
@staticmethod async def run_in_thread(self, func, *args):
async def run_in_thread(func, *args):
async def run_in_thread(): async def run_in_thread():
return await asyncio.get_event_loop().run_in_executor(None, func, *args) return await asyncio.get_event_loop().run_in_executor(self._chain_executor, func, *args)
return await asyncio.shield(run_in_thread()) return await asyncio.shield(run_in_thread())
async def check_and_advance_blocks(self, raw_blocks): async def check_and_advance_blocks(self, raw_blocks):
@ -1440,6 +1450,7 @@ class BlockProcessor:
self.db.prefix_db.block_hash.stage_put(key_args=(height,), value_args=(self.coin.header_hash(block.header),)) self.db.prefix_db.block_hash.stage_put(key_args=(height,), value_args=(self.coin.header_hash(block.header),))
self.db.prefix_db.header.stage_put(key_args=(height,), value_args=(block.header,)) self.db.prefix_db.header.stage_put(key_args=(height,), value_args=(block.header,))
self.db.prefix_db.block_txs.stage_put(key_args=(height,), value_args=([tx_hash for tx, tx_hash in txs],))
for tx, tx_hash in txs: for tx, tx_hash in txs:
spent_claims = {} spent_claims = {}
@ -1580,6 +1591,8 @@ class BlockProcessor:
self.pending_transaction_num_mapping.clear() self.pending_transaction_num_mapping.clear()
self.pending_transactions.clear() self.pending_transactions.clear()
self.pending_support_amount_change.clear() self.pending_support_amount_change.clear()
self.resolve_cache.clear()
self.resolve_outputs_cache.clear()
async def backup_block(self): async def backup_block(self):
assert len(self.db.prefix_db._op_stack) == 0 assert len(self.db.prefix_db._op_stack) == 0
@ -1742,5 +1755,6 @@ class BlockProcessor:
self.status_server.stop() self.status_server.stop()
# Shut down block processing # Shut down block processing
self.logger.info('closing the DB for a clean shutdown...') self.logger.info('closing the DB for a clean shutdown...')
self._sync_reader_executor.shutdown(wait=True)
self._chain_executor.shutdown(wait=True)
self.db.close() self.db.close()
# self.executor.shutdown(wait=True)

View file

@ -55,7 +55,7 @@ class Daemon:
self.available_rpcs = {} self.available_rpcs = {}
self.connector = aiohttp.TCPConnector() self.connector = aiohttp.TCPConnector()
self._block_hash_cache = LRUCacheWithMetrics(100000) self._block_hash_cache = LRUCacheWithMetrics(100000)
self._block_cache = LRUCacheWithMetrics(2 ** 16, metric_name='block', namespace=NAMESPACE) self._block_cache = LRUCacheWithMetrics(2 ** 13, metric_name='block', namespace=NAMESPACE)
async def close(self): async def close(self):
if self.connector: if self.connector:

View file

@ -39,3 +39,4 @@ class DB_PREFIXES(enum.Enum):
db_state = b's' db_state = b's'
channel_count = b'Z' channel_count = b'Z'
support_amount = b'a' support_amount = b'a'
block_txs = b'b'

View file

@ -51,9 +51,7 @@ class SearchIndex:
self.index = index_prefix + 'claims' self.index = index_prefix + 'claims'
self.logger = class_logger(__name__, self.__class__.__name__) self.logger = class_logger(__name__, self.__class__.__name__)
self.claim_cache = LRUCache(2 ** 15) self.claim_cache = LRUCache(2 ** 15)
self.short_id_cache = LRUCache(2 ** 17)
self.search_cache = LRUCache(2 ** 17) self.search_cache = LRUCache(2 ** 17)
self.resolution_cache = LRUCache(2 ** 17)
self._elastic_host = elastic_host self._elastic_host = elastic_host
self._elastic_port = elastic_port self._elastic_port = elastic_port
self._trending_half_life = half_life self._trending_half_life = half_life
@ -260,9 +258,7 @@ class SearchIndex:
def clear_caches(self): def clear_caches(self):
self.search_cache.clear() self.search_cache.clear()
self.short_id_cache.clear()
self.claim_cache.clear() self.claim_cache.clear()
self.resolution_cache.clear()
async def cached_search(self, kwargs): async def cached_search(self, kwargs):
total_referenced = [] total_referenced = []
@ -354,21 +350,6 @@ class SearchIndex:
for result in expand_result(filter(lambda doc: doc['found'], results["docs"])): for result in expand_result(filter(lambda doc: doc['found'], results["docs"])):
self.claim_cache.set(result['claim_id'], result) self.claim_cache.set(result['claim_id'], result)
async def full_id_from_short_id(self, name, short_id, channel_id=None):
key = '#'.join((channel_id or '', name, short_id))
if key not in self.short_id_cache:
query = {'name': name, 'claim_id': short_id}
if channel_id:
query['channel_id'] = channel_id
query['order_by'] = ['^channel_join']
query['signature_valid'] = True
else:
query['order_by'] = '^creation_height'
result, _, _ = await self.search(**query, limit=1)
if len(result) == 1:
result = result[0]['claim_id']
self.short_id_cache[key] = result
return self.short_id_cache.get(key, None)
async def search(self, **kwargs): async def search(self, **kwargs):
try: try:

View file

@ -169,6 +169,14 @@ class BlockHashValue(NamedTuple):
return f"{self.__class__.__name__}(block_hash={self.block_hash.hex()})" return f"{self.__class__.__name__}(block_hash={self.block_hash.hex()})"
class BlockTxsKey(NamedTuple):
height: int
class BlockTxsValue(NamedTuple):
tx_hashes: typing.List[bytes]
class TxCountKey(NamedTuple): class TxCountKey(NamedTuple):
height: int height: int
@ -1540,6 +1548,36 @@ class DBStatePrefixRow(PrefixRow):
) )
class BlockTxsPrefixRow(PrefixRow):
prefix = DB_PREFIXES.block_txs.value
key_struct = struct.Struct(b'>L')
key_part_lambdas = [
lambda: b'',
struct.Struct(b'>L').pack
]
@classmethod
def pack_key(cls, height: int):
return super().pack_key(height)
@classmethod
def unpack_key(cls, key: bytes) -> BlockTxsKey:
return BlockTxsKey(*super().unpack_key(key))
@classmethod
def pack_value(cls, tx_hashes: typing.List[bytes]) -> bytes:
assert all(len(tx_hash) == 32 for tx_hash in tx_hashes)
return b''.join(tx_hashes)
@classmethod
def unpack_value(cls, data: bytes) -> BlockTxsValue:
return BlockTxsValue([data[i*32:(i+1)*32] for i in range(len(data) // 32)])
@classmethod
def pack_item(cls, height, tx_hashes):
return cls.pack_key(height), cls.pack_value(tx_hashes)
class LevelDBStore(KeyValueStorage): class LevelDBStore(KeyValueStorage):
def __init__(self, path: str, cache_mb: int, max_open_files: int): def __init__(self, path: str, cache_mb: int, max_open_files: int):
import plyvel import plyvel
@ -1604,6 +1642,7 @@ class HubDB(PrefixDB):
self.channel_count = ChannelCountPrefixRow(db, self._op_stack) self.channel_count = ChannelCountPrefixRow(db, self._op_stack)
self.db_state = DBStatePrefixRow(db, self._op_stack) self.db_state = DBStatePrefixRow(db, self._op_stack)
self.support_amount = SupportAmountPrefixRow(db, self._op_stack) self.support_amount = SupportAmountPrefixRow(db, self._op_stack)
self.block_txs = BlockTxsPrefixRow(db, self._op_stack)
def auto_decode_item(key: bytes, value: bytes) -> Union[Tuple[NamedTuple, NamedTuple], Tuple[bytes, bytes]]: def auto_decode_item(key: bytes, value: bytes) -> Union[Tuple[NamedTuple, NamedTuple], Tuple[bytes, bytes]]:

View file

@ -83,11 +83,26 @@ class OpStackIntegrity(Exception):
class RevertableOpStack: class RevertableOpStack:
def __init__(self, get_fn: Callable[[bytes], Optional[bytes]], unsafe_prefixes=None): def __init__(self, get_fn: Callable[[bytes], Optional[bytes]], unsafe_prefixes=None):
"""
This represents a sequence of revertable puts and deletes to a key-value database that checks for integrity
violations when applying the puts and deletes. The integrity checks assure that keys that do not exist
are not deleted, and that when keys are deleted the current value is correctly known so that the delete
may be undone. When putting values, the integrity checks assure that existing values are not overwritten
without first being deleted. Updates are performed by applying a delete op for the old value and a put op
for the new value.
:param get_fn: getter function from an object implementing `KeyValueStorage`
:param unsafe_prefixes: optional set of prefixes to ignore integrity errors for, violations are still logged
"""
self._get = get_fn self._get = get_fn
self._items = defaultdict(list) self._items = defaultdict(list)
self._unsafe_prefixes = unsafe_prefixes or set() self._unsafe_prefixes = unsafe_prefixes or set()
def append_op(self, op: RevertableOp): def append_op(self, op: RevertableOp):
"""
Apply a put or delete op, checking that it introduces no integrity errors
"""
inverted = op.invert() inverted = op.invert()
if self._items[op.key] and inverted == self._items[op.key][-1]: if self._items[op.key] and inverted == self._items[op.key][-1]:
self._items[op.key].pop() # if the new op is the inverse of the last op, we can safely null both self._items[op.key].pop() # if the new op is the inverse of the last op, we can safely null both
@ -119,6 +134,9 @@ class RevertableOpStack:
self._items[op.key].append(op) self._items[op.key].append(op)
def extend_ops(self, ops: Iterable[RevertableOp]): def extend_ops(self, ops: Iterable[RevertableOp]):
"""
Apply a sequence of put or delete ops, checking that they introduce no integrity errors
"""
for op in ops: for op in ops:
self.append_op(op) self.append_op(op)
@ -139,9 +157,15 @@ class RevertableOpStack:
yield op yield op
def get_undo_ops(self) -> bytes: def get_undo_ops(self) -> bytes:
"""
Get the serialized bytes to undo all of the changes made by the pending ops
"""
return b''.join(op.invert().pack() for op in reversed(self)) return b''.join(op.invert().pack() for op in reversed(self))
def apply_packed_undo_ops(self, packed: bytes): def apply_packed_undo_ops(self, packed: bytes):
"""
Unpack and apply a sequence of undo ops from serialized undo bytes
"""
while packed: while packed:
op, packed = RevertableOp.unpack(packed) op, packed = RevertableOp.unpack(packed)
self.append_op(op) self.append_op(op)

View file

@ -112,7 +112,7 @@ class LevelDB:
self.merkle = Merkle() self.merkle = Merkle()
self.header_mc = MerkleCache(self.merkle, self.fs_block_hashes) self.header_mc = MerkleCache(self.merkle, self.fs_block_hashes)
self._tx_and_merkle_cache = LRUCacheWithMetrics(2 ** 17, metric_name='tx_and_merkle', namespace="wallet_server") self._tx_and_merkle_cache = LRUCacheWithMetrics(2 ** 16, metric_name='tx_and_merkle', namespace="wallet_server")
self.claim_to_txo: Dict[bytes, ClaimToTXOValue] = {} self.claim_to_txo: Dict[bytes, ClaimToTXOValue] = {}
self.txo_to_claim: DefaultDict[int, Dict[int, bytes]] = defaultdict(dict) self.txo_to_claim: DefaultDict[int, Dict[int, bytes]] = defaultdict(dict)
@ -696,23 +696,21 @@ class LevelDB:
yield meta yield meta
batch.clear() batch.clear()
async def claims_producer(self, claim_hashes: Set[bytes]): def claims_producer(self, claim_hashes: Set[bytes]):
batch = [] batch = []
results = [] results = []
loop = asyncio.get_event_loop() for claim_hash in claim_hashes:
def produce_claim(claim_hash):
if claim_hash not in self.claim_to_txo: if claim_hash not in self.claim_to_txo:
self.logger.warning("can't sync non existent claim to ES: %s", claim_hash.hex()) self.logger.warning("can't sync non existent claim to ES: %s", claim_hash.hex())
return continue
name = self.claim_to_txo[claim_hash].normalized_name name = self.claim_to_txo[claim_hash].normalized_name
if not self.prefix_db.claim_takeover.get(name): if not self.prefix_db.claim_takeover.get(name):
self.logger.warning("can't sync non existent claim to ES: %s", claim_hash.hex()) self.logger.warning("can't sync non existent claim to ES: %s", claim_hash.hex())
return continue
claim_txo = self.claim_to_txo.get(claim_hash) claim_txo = self.claim_to_txo.get(claim_hash)
if not claim_txo: if not claim_txo:
return continue
activation = self.get_activation(claim_txo.tx_num, claim_txo.position) activation = self.get_activation(claim_txo.tx_num, claim_txo.position)
claim = self._prepare_resolve_result( claim = self._prepare_resolve_result(
claim_txo.tx_num, claim_txo.position, claim_hash, claim_txo.name, claim_txo.root_tx_num, claim_txo.tx_num, claim_txo.position, claim_hash, claim_txo.name, claim_txo.root_tx_num,
@ -721,25 +719,13 @@ class LevelDB:
if claim: if claim:
batch.append(claim) batch.append(claim)
def get_metadata(claim):
meta = self._prepare_claim_metadata(claim.claim_hash, claim)
if meta:
results.append(meta)
if claim_hashes:
await asyncio.wait(
[loop.run_in_executor(None, produce_claim, claim_hash) for claim_hash in claim_hashes]
)
batch.sort(key=lambda x: x.tx_hash) batch.sort(key=lambda x: x.tx_hash)
if batch: for claim in batch:
await asyncio.wait( _meta = self._prepare_claim_metadata(claim.claim_hash, claim)
[loop.run_in_executor(None, get_metadata, claim) for claim in batch] if _meta:
) results.append(_meta)
for meta in results: return results
yield meta
batch.clear()
def get_activated_at_height(self, height: int) -> DefaultDict[PendingActivationValue, List[PendingActivationKey]]: def get_activated_at_height(self, height: int) -> DefaultDict[PendingActivationValue, List[PendingActivationKey]]:
activated = defaultdict(list) activated = defaultdict(list)
@ -923,57 +909,51 @@ class LevelDB:
return None, tx_height return None, tx_height
def get_block_txs(self, height: int) -> List[bytes]: def get_block_txs(self, height: int) -> List[bytes]:
return [ return self.prefix_db.block_txs.get(height).tx_hashes
tx_hash for tx_hash in self.prefix_db.tx_hash.iterate(
start=(self.tx_counts[height-1],), stop=(self.tx_counts[height],),
deserialize_value=False, include_key=False
)
]
def _fs_transactions(self, txids: Iterable[str]): async def get_transactions_and_merkles(self, tx_hashes: Iterable[str]):
tx_counts = self.tx_counts
tx_db_get = self.prefix_db.tx.get
tx_cache = self._tx_and_merkle_cache
tx_infos = {} tx_infos = {}
for tx_hash in tx_hashes:
for tx_hash in txids: tx_infos[tx_hash] = await asyncio.get_event_loop().run_in_executor(
cached_tx = tx_cache.get(tx_hash) None, self._get_transaction_and_merkle, tx_hash
if cached_tx: )
tx, merkle = cached_tx await asyncio.sleep(0)
else:
tx_hash_bytes = bytes.fromhex(tx_hash)[::-1]
tx_num = self.prefix_db.tx_num.get(tx_hash_bytes)
tx = None
tx_height = -1
tx_num = None if not tx_num else tx_num.tx_num
if tx_num is not None:
fill_cache = tx_num in self.txo_to_claim and len(self.txo_to_claim[tx_num]) > 0
tx_height = bisect_right(tx_counts, tx_num)
tx = tx_db_get(tx_hash_bytes, fill_cache=fill_cache, deserialize_value=False)
if tx_height == -1:
merkle = {
'block_height': -1
}
else:
tx_pos = tx_num - tx_counts[tx_height - 1]
branch, root = self.merkle.branch_and_root(
self.get_block_txs(tx_height), tx_pos
)
merkle = {
'block_height': tx_height,
'merkle': [
hash_to_hex_str(hash)
for hash in branch
],
'pos': tx_pos
}
if tx_height + 10 < self.db_height:
tx_cache[tx_hash] = tx, merkle
tx_infos[tx_hash] = (None if not tx else tx.hex(), merkle)
return tx_infos return tx_infos
async def fs_transactions(self, txids): def _get_transaction_and_merkle(self, tx_hash):
return await asyncio.get_event_loop().run_in_executor(None, self._fs_transactions, txids) cached_tx = self._tx_and_merkle_cache.get(tx_hash)
if cached_tx:
tx, merkle = cached_tx
else:
tx_hash_bytes = bytes.fromhex(tx_hash)[::-1]
tx_num = self.prefix_db.tx_num.get(tx_hash_bytes)
tx = None
tx_height = -1
tx_num = None if not tx_num else tx_num.tx_num
if tx_num is not None:
fill_cache = tx_num in self.txo_to_claim and len(self.txo_to_claim[tx_num]) > 0
tx_height = bisect_right(self.tx_counts, tx_num)
tx = self.prefix_db.tx.get(tx_hash_bytes, fill_cache=fill_cache, deserialize_value=False)
if tx_height == -1:
merkle = {
'block_height': -1
}
else:
tx_pos = tx_num - self.tx_counts[tx_height - 1]
branch, root = self.merkle.branch_and_root(
self.get_block_txs(tx_height), tx_pos
)
merkle = {
'block_height': tx_height,
'merkle': [
hash_to_hex_str(hash)
for hash in branch
],
'pos': tx_pos
}
if tx_height + 10 < self.db_height:
self._tx_and_merkle_cache[tx_hash] = tx, merkle
return (None if not tx else tx.hex(), merkle)
async def fs_block_hashes(self, height, count): async def fs_block_hashes(self, height, count):
if height + count > len(self.headers): if height + count > len(self.headers):

View file

@ -69,7 +69,7 @@ class Server:
def run(self): def run(self):
loop = asyncio.get_event_loop() loop = asyncio.get_event_loop()
executor = ThreadPoolExecutor(self.env.max_query_workers) executor = ThreadPoolExecutor(self.env.max_query_workers, thread_name_prefix='hub-worker')
loop.set_default_executor(executor) loop.set_default_executor(executor)
def __exit(): def __exit():

View file

@ -2,7 +2,6 @@ import os
import ssl import ssl
import math import math
import time import time
import json
import base64 import base64
import codecs import codecs
import typing import typing
@ -197,6 +196,8 @@ class SessionManager:
self.notified_height: typing.Optional[int] = None self.notified_height: typing.Optional[int] = None
# Cache some idea of room to avoid recounting on each subscription # Cache some idea of room to avoid recounting on each subscription
self.subs_room = 0 self.subs_room = 0
self.consumers = 32
self.priority_queue = asyncio.PriorityQueue()
self.session_event = Event() self.session_event = Event()
@ -546,8 +547,13 @@ class SessionManager:
# because we connect to ourself # because we connect to ourself
await asyncio.wait([ await asyncio.wait([
self._clear_stale_sessions(), self._clear_stale_sessions(),
self._manage_servers() self._manage_servers(),
self.serve_requests()
]) ])
except Exception as err:
if not isinstance(err, asyncio.CancelledError):
log.exception("hub server died")
raise err
finally: finally:
await self._close_servers(list(self.servers.keys())) await self._close_servers(list(self.servers.keys()))
log.warning("disconnect %i sessions", len(self.sessions)) log.warning("disconnect %i sessions", len(self.sessions))
@ -557,6 +563,18 @@ class SessionManager:
]) ])
await self.stop_other() await self.stop_other()
async def serve_requests(self):
async def consumer():
while True:
_, _, fut = await self.priority_queue.get()
try:
await fut
except asyncio.CancelledError:
raise
except Exception as e:
log.exception("raised while serving a request. This should never happen.")
await asyncio.gather(*(consumer() for _ in range(self.consumers)))
async def start_other(self): async def start_other(self):
pass pass
@ -633,7 +651,7 @@ class SessionManager:
self.mempool_statuses.pop(hashX, None) self.mempool_statuses.pop(hashX, None)
await asyncio.get_event_loop().run_in_executor( await asyncio.get_event_loop().run_in_executor(
None, touched.intersection_update, self.hashx_subscriptions_by_session.keys() self.bp._chain_executor, touched.intersection_update, self.hashx_subscriptions_by_session.keys()
) )
if touched or new_touched or (height_changed and self.mempool_statuses): if touched or new_touched or (height_changed and self.mempool_statuses):
@ -775,10 +793,9 @@ class LBRYSessionManager(SessionManager):
def __init__(self, *args, **kwargs): def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs) super().__init__(*args, **kwargs)
self.query_executor = None
self.websocket = None self.websocket = None
# self.metrics = ServerLoadData() # self.metrics = ServerLoadData()
self.metrics_loop = None # self.metrics_loop = None
self.running = False self.running = False
if self.env.websocket_host is not None and self.env.websocket_port is not None: if self.env.websocket_host is not None and self.env.websocket_port is not None:
self.websocket = AdminWebSocket(self) self.websocket = AdminWebSocket(self)
@ -795,12 +812,6 @@ class LBRYSessionManager(SessionManager):
async def start_other(self): async def start_other(self):
self.running = True self.running = True
if self.env.max_query_workers is not None and self.env.max_query_workers == 0:
self.query_executor = ThreadPoolExecutor(max_workers=1)
else:
self.query_executor = ProcessPoolExecutor(
max_workers=self.env.max_query_workers or max(os.cpu_count(), 4)
)
if self.websocket is not None: if self.websocket is not None:
await self.websocket.start() await self.websocket.start()
@ -808,7 +819,6 @@ class LBRYSessionManager(SessionManager):
self.running = False self.running = False
if self.websocket is not None: if self.websocket is not None:
await self.websocket.stop() await self.websocket.stop()
self.query_executor.shutdown()
class LBRYElectrumX(SessionBase): class LBRYElectrumX(SessionBase):
@ -880,6 +890,14 @@ class LBRYElectrumX(SessionBase):
self.daemon = self.session_mgr.daemon self.daemon = self.session_mgr.daemon
self.bp: BlockProcessor = self.session_mgr.bp self.bp: BlockProcessor = self.session_mgr.bp
self.db: LevelDB = self.bp.db self.db: LevelDB = self.bp.db
self.last_request_received_at = 0
def schedule_requests(self, requests):
for request in requests:
current = time.perf_counter()
elapsed = (1 << 65) - current - self.last_request_received_at
self.last_request_received_at = current
self.session_mgr.priority_queue.put_nowait((elapsed, id(request), self._handle_request(request)))
@classmethod @classmethod
def protocol_min_max_strings(cls): def protocol_min_max_strings(cls):
@ -971,24 +989,6 @@ class LBRYElectrumX(SessionBase):
# else: # else:
# return APICallMetrics(query_name) # return APICallMetrics(query_name)
async def run_in_executor(self, query_name, func, kwargs):
start = time.perf_counter()
try:
self.session_mgr.pending_query_metric.inc()
result = await asyncio.get_running_loop().run_in_executor(
self.session_mgr.query_executor, func, kwargs
)
except asyncio.CancelledError:
raise
except Exception:
log.exception("dear devs, please handle this exception better")
self.session_mgr.db_error_metric.inc()
raise RPCError(JSONRPC.INTERNAL_ERROR, 'unknown server error')
else:
return base64.b64encode(result).decode()
finally:
self.session_mgr.pending_query_metric.dec()
self.session_mgr.executor_time_metric.observe(time.perf_counter() - start)
# async def run_and_cache_query(self, query_name, kwargs): # async def run_and_cache_query(self, query_name, kwargs):
# start = time.perf_counter() # start = time.perf_counter()
@ -1036,41 +1036,52 @@ class LBRYElectrumX(SessionBase):
self.session_mgr.pending_query_metric.dec() self.session_mgr.pending_query_metric.dec()
self.session_mgr.executor_time_metric.observe(time.perf_counter() - start) self.session_mgr.executor_time_metric.observe(time.perf_counter() - start)
def _claimtrie_resolve(self, *urls): async def _cached_resolve_url(self, url):
rows, extra = [], [] if url not in self.bp.resolve_cache:
for url in urls: self.bp.resolve_cache[url] = await self.loop.run_in_executor(None, self.db._resolve, url)
self.session_mgr.urls_to_resolve_count_metric.inc() return self.bp.resolve_cache[url]
stream, channel, repost, reposted_channel = self.db._resolve(url)
if isinstance(channel, ResolveCensoredError):
rows.append(channel)
extra.append(channel.censor_row)
elif isinstance(stream, ResolveCensoredError):
rows.append(stream)
extra.append(stream.censor_row)
elif channel and not stream:
rows.append(channel)
# print("resolved channel", channel.name.decode())
if repost:
extra.append(repost)
if reposted_channel:
extra.append(reposted_channel)
elif stream:
# print("resolved stream", stream.name.decode())
rows.append(stream)
if channel:
# print("and channel", channel.name.decode())
extra.append(channel)
if repost:
extra.append(repost)
if reposted_channel:
extra.append(reposted_channel)
# print("claimtrie resolve %i rows %i extrat" % (len(rows), len(extra)))
return Outputs.to_base64(rows, extra, 0, None, None)
async def claimtrie_resolve(self, *urls): async def claimtrie_resolve(self, *urls) -> str:
result = await self.loop.run_in_executor(None, self._claimtrie_resolve, *urls) sorted_urls = tuple(sorted(urls))
self.session_mgr.resolved_url_count_metric.inc(len(urls)) self.session_mgr.urls_to_resolve_count_metric.inc(len(sorted_urls))
return result try:
if sorted_urls in self.bp.resolve_outputs_cache:
return self.bp.resolve_outputs_cache[sorted_urls]
rows, extra = [], []
for url in urls:
if url not in self.bp.resolve_cache:
self.bp.resolve_cache[url] = await self._cached_resolve_url(url)
stream, channel, repost, reposted_channel = self.bp.resolve_cache[url]
if isinstance(channel, ResolveCensoredError):
rows.append(channel)
extra.append(channel.censor_row)
elif isinstance(stream, ResolveCensoredError):
rows.append(stream)
extra.append(stream.censor_row)
elif channel and not stream:
rows.append(channel)
# print("resolved channel", channel.name.decode())
if repost:
extra.append(repost)
if reposted_channel:
extra.append(reposted_channel)
elif stream:
# print("resolved stream", stream.name.decode())
rows.append(stream)
if channel:
# print("and channel", channel.name.decode())
extra.append(channel)
if repost:
extra.append(repost)
if reposted_channel:
extra.append(reposted_channel)
await asyncio.sleep(0)
self.bp.resolve_outputs_cache[sorted_urls] = result = await self.loop.run_in_executor(
None, Outputs.to_base64, rows, extra, 0, None, None
)
return result
finally:
self.session_mgr.resolved_url_count_metric.inc(len(sorted_urls))
async def get_server_height(self): async def get_server_height(self):
return self.bp.height return self.bp.height
@ -1221,9 +1232,11 @@ class LBRYElectrumX(SessionBase):
address: the address to subscribe to""" address: the address to subscribe to"""
if len(addresses) > 1000: if len(addresses) > 1000:
raise RPCError(BAD_REQUEST, f'too many addresses in subscription request: {len(addresses)}') raise RPCError(BAD_REQUEST, f'too many addresses in subscription request: {len(addresses)}')
return [ results = []
await self.hashX_subscribe(self.address_to_hashX(address), address) for address in addresses for address in addresses:
] results.append(await self.hashX_subscribe(self.address_to_hashX(address), address))
await asyncio.sleep(0)
return results
async def address_unsubscribe(self, address): async def address_unsubscribe(self, address):
"""Unsubscribe an address. """Unsubscribe an address.
@ -1472,7 +1485,7 @@ class LBRYElectrumX(SessionBase):
raise RPCError(BAD_REQUEST, f'too many tx hashes in request: {len(tx_hashes)}') raise RPCError(BAD_REQUEST, f'too many tx hashes in request: {len(tx_hashes)}')
for tx_hash in tx_hashes: for tx_hash in tx_hashes:
assert_tx_hash(tx_hash) assert_tx_hash(tx_hash)
batch_result = await self.db.fs_transactions(tx_hashes) batch_result = await self.db.get_transactions_and_merkles(tx_hashes)
needed_merkles = {} needed_merkles = {}
for tx_hash in tx_hashes: for tx_hash in tx_hashes:

View file

@ -23,7 +23,7 @@ class BlockchainReorganizationTests(CommandTestCase):
self.assertEqual(block_hash, (await bp.db.fs_block_hashes(height, 1))[0][::-1].hex()) self.assertEqual(block_hash, (await bp.db.fs_block_hashes(height, 1))[0][::-1].hex())
txids = await asyncio.get_event_loop().run_in_executor(None, get_txids) txids = await asyncio.get_event_loop().run_in_executor(None, get_txids)
txs = await bp.db.fs_transactions(txids) txs = await bp.db.get_transactions_and_merkles(txids)
block_txs = (await bp.daemon.deserialised_block(block_hash))['tx'] block_txs = (await bp.daemon.deserialised_block(block_hash))['tx']
self.assertSetEqual(set(block_txs), set(txs.keys()), msg='leveldb/lbrycrd is missing transactions') self.assertSetEqual(set(block_txs), set(txs.keys()), msg='leveldb/lbrycrd is missing transactions')
self.assertListEqual(block_txs, list(txs.keys()), msg='leveldb/lbrycrd transactions are of order') self.assertListEqual(block_txs, list(txs.keys()), msg='leveldb/lbrycrd transactions are of order')

View file

@ -1458,7 +1458,7 @@ class ResolveAfterReorg(BaseResolveTestCase):
txids = [ txids = [
tx_hash[::-1].hex() for tx_hash in bp.db.get_block_txs(height) tx_hash[::-1].hex() for tx_hash in bp.db.get_block_txs(height)
] ]
txs = await bp.db.fs_transactions(txids) txs = await bp.db.get_transactions_and_merkles(txids)
block_txs = (await bp.daemon.deserialised_block(block_hash))['tx'] block_txs = (await bp.daemon.deserialised_block(block_hash))['tx']
self.assertSetEqual(set(block_txs), set(txs.keys()), msg='leveldb/lbrycrd is missing transactions') self.assertSetEqual(set(block_txs), set(txs.keys()), msg='leveldb/lbrycrd is missing transactions')
self.assertListEqual(block_txs, list(txs.keys()), msg='leveldb/lbrycrd transactions are of order') self.assertListEqual(block_txs, list(txs.keys()), msg='leveldb/lbrycrd transactions are of order')