forked from LBRYCommunity/lbry-sdk
Compare commits
14 commits
master
...
crazy_hub_
Author | SHA1 | Date | |
---|---|---|---|
|
1f64275da1 | ||
|
8c605864e7 | ||
|
6ec88e2101 | ||
|
17eeafd62c | ||
|
28e4b3eefd | ||
|
b2de89ca29 | ||
|
8bfff2d549 | ||
|
39d5078788 | ||
|
f8c46647d7 | ||
|
82de92c324 | ||
|
3dc3792478 | ||
|
ba1d0a12d1 | ||
|
e75047a0ab | ||
|
83a167bd37 |
12 changed files with 229 additions and 172 deletions
|
@ -424,10 +424,11 @@ class RPCSession(SessionBase):
|
|||
self.max_errors = 0
|
||||
self._bump_errors()
|
||||
else:
|
||||
for request in requests:
|
||||
await self._task_group.add(self._handle_request(request))
|
||||
self.schedule_requests(requests)
|
||||
|
||||
async def _handle_request(self, request):
|
||||
if self.is_closing():
|
||||
return
|
||||
start = time.perf_counter()
|
||||
try:
|
||||
result = await self.handle_request(request)
|
||||
|
@ -472,6 +473,10 @@ class RPCSession(SessionBase):
|
|||
async def handle_request(self, request):
|
||||
pass
|
||||
|
||||
def schedule_requests(self, requests):
|
||||
for request in requests:
|
||||
self._task_group.add(self._handle_request(request))
|
||||
|
||||
async def send_request(self, method, args=()):
|
||||
"""Send an RPC request over the network."""
|
||||
if self.is_closing():
|
||||
|
|
|
@ -3,14 +3,16 @@ import asyncio
|
|||
import typing
|
||||
from bisect import bisect_right
|
||||
from struct import pack, unpack
|
||||
from concurrent.futures.thread import ThreadPoolExecutor
|
||||
from typing import Optional, List, Tuple, Set, DefaultDict, Dict, NamedTuple
|
||||
from prometheus_client import Gauge, Histogram
|
||||
from collections import defaultdict
|
||||
|
||||
import lbry
|
||||
from lbry.schema.url import URL
|
||||
from lbry.schema.claim import Claim
|
||||
from lbry.wallet.ledger import Ledger, TestNetLedger, RegTestLedger
|
||||
|
||||
from lbry.utils import LRUCache
|
||||
from lbry.wallet.transaction import OutputScript, Output, Transaction
|
||||
from lbry.wallet.server.tx import Tx, TxOutput, TxInput
|
||||
from lbry.wallet.server.daemon import DaemonError
|
||||
|
@ -202,6 +204,8 @@ class BlockProcessor:
|
|||
self.env = env
|
||||
self.db = db
|
||||
self.daemon = daemon
|
||||
self._chain_executor = ThreadPoolExecutor(1, thread_name_prefix='block-processor')
|
||||
self._sync_reader_executor = ThreadPoolExecutor(1, thread_name_prefix='hub-es-sync')
|
||||
self.mempool = MemPool(env.coin, daemon, db, self.state_lock)
|
||||
self.shutdown_event = shutdown_event
|
||||
self.coin = env.coin
|
||||
|
@ -231,6 +235,9 @@ class BlockProcessor:
|
|||
self.db_op_stack: Optional[RevertableOpStack] = None
|
||||
|
||||
# self.search_cache = {}
|
||||
self.resolve_cache = LRUCache(2**16)
|
||||
self.resolve_outputs_cache = LRUCache(2 ** 16)
|
||||
|
||||
self.history_cache = {}
|
||||
self.status_server = StatusServer()
|
||||
|
||||
|
@ -297,7 +304,11 @@ class BlockProcessor:
|
|||
|
||||
for claim_hash in self.removed_claims_to_send_es:
|
||||
yield 'delete', claim_hash.hex()
|
||||
async for claim in self.db.claims_producer(self.touched_claims_to_send_es):
|
||||
|
||||
to_update = await asyncio.get_event_loop().run_in_executor(
|
||||
self._sync_reader_executor, self.db.claims_producer, self.touched_claims_to_send_es
|
||||
)
|
||||
for claim in to_update:
|
||||
yield 'update', claim
|
||||
|
||||
async def run_in_thread_with_lock(self, func, *args):
|
||||
|
@ -308,13 +319,12 @@ class BlockProcessor:
|
|||
# consistent and not being updated elsewhere.
|
||||
async def run_in_thread_locked():
|
||||
async with self.state_lock:
|
||||
return await asyncio.get_event_loop().run_in_executor(None, func, *args)
|
||||
return await asyncio.get_event_loop().run_in_executor(self._chain_executor, func, *args)
|
||||
return await asyncio.shield(run_in_thread_locked())
|
||||
|
||||
@staticmethod
|
||||
async def run_in_thread(func, *args):
|
||||
async def run_in_thread(self, func, *args):
|
||||
async def run_in_thread():
|
||||
return await asyncio.get_event_loop().run_in_executor(None, func, *args)
|
||||
return await asyncio.get_event_loop().run_in_executor(self._chain_executor, func, *args)
|
||||
return await asyncio.shield(run_in_thread())
|
||||
|
||||
async def check_and_advance_blocks(self, raw_blocks):
|
||||
|
@ -1440,6 +1450,7 @@ class BlockProcessor:
|
|||
|
||||
self.db.prefix_db.block_hash.stage_put(key_args=(height,), value_args=(self.coin.header_hash(block.header),))
|
||||
self.db.prefix_db.header.stage_put(key_args=(height,), value_args=(block.header,))
|
||||
self.db.prefix_db.block_txs.stage_put(key_args=(height,), value_args=([tx_hash for tx, tx_hash in txs],))
|
||||
|
||||
for tx, tx_hash in txs:
|
||||
spent_claims = {}
|
||||
|
@ -1580,6 +1591,8 @@ class BlockProcessor:
|
|||
self.pending_transaction_num_mapping.clear()
|
||||
self.pending_transactions.clear()
|
||||
self.pending_support_amount_change.clear()
|
||||
self.resolve_cache.clear()
|
||||
self.resolve_outputs_cache.clear()
|
||||
|
||||
async def backup_block(self):
|
||||
assert len(self.db.prefix_db._op_stack) == 0
|
||||
|
@ -1742,5 +1755,6 @@ class BlockProcessor:
|
|||
self.status_server.stop()
|
||||
# Shut down block processing
|
||||
self.logger.info('closing the DB for a clean shutdown...')
|
||||
self._sync_reader_executor.shutdown(wait=True)
|
||||
self._chain_executor.shutdown(wait=True)
|
||||
self.db.close()
|
||||
# self.executor.shutdown(wait=True)
|
||||
|
|
|
@ -55,7 +55,7 @@ class Daemon:
|
|||
self.available_rpcs = {}
|
||||
self.connector = aiohttp.TCPConnector()
|
||||
self._block_hash_cache = LRUCacheWithMetrics(100000)
|
||||
self._block_cache = LRUCacheWithMetrics(2 ** 16, metric_name='block', namespace=NAMESPACE)
|
||||
self._block_cache = LRUCacheWithMetrics(2 ** 13, metric_name='block', namespace=NAMESPACE)
|
||||
|
||||
async def close(self):
|
||||
if self.connector:
|
||||
|
|
|
@ -39,3 +39,4 @@ class DB_PREFIXES(enum.Enum):
|
|||
db_state = b's'
|
||||
channel_count = b'Z'
|
||||
support_amount = b'a'
|
||||
block_txs = b'b'
|
||||
|
|
|
@ -51,9 +51,7 @@ class SearchIndex:
|
|||
self.index = index_prefix + 'claims'
|
||||
self.logger = class_logger(__name__, self.__class__.__name__)
|
||||
self.claim_cache = LRUCache(2 ** 15)
|
||||
self.short_id_cache = LRUCache(2 ** 17)
|
||||
self.search_cache = LRUCache(2 ** 17)
|
||||
self.resolution_cache = LRUCache(2 ** 17)
|
||||
self._elastic_host = elastic_host
|
||||
self._elastic_port = elastic_port
|
||||
self._trending_half_life = half_life
|
||||
|
@ -260,9 +258,7 @@ class SearchIndex:
|
|||
|
||||
def clear_caches(self):
|
||||
self.search_cache.clear()
|
||||
self.short_id_cache.clear()
|
||||
self.claim_cache.clear()
|
||||
self.resolution_cache.clear()
|
||||
|
||||
async def cached_search(self, kwargs):
|
||||
total_referenced = []
|
||||
|
@ -354,21 +350,6 @@ class SearchIndex:
|
|||
for result in expand_result(filter(lambda doc: doc['found'], results["docs"])):
|
||||
self.claim_cache.set(result['claim_id'], result)
|
||||
|
||||
async def full_id_from_short_id(self, name, short_id, channel_id=None):
|
||||
key = '#'.join((channel_id or '', name, short_id))
|
||||
if key not in self.short_id_cache:
|
||||
query = {'name': name, 'claim_id': short_id}
|
||||
if channel_id:
|
||||
query['channel_id'] = channel_id
|
||||
query['order_by'] = ['^channel_join']
|
||||
query['signature_valid'] = True
|
||||
else:
|
||||
query['order_by'] = '^creation_height'
|
||||
result, _, _ = await self.search(**query, limit=1)
|
||||
if len(result) == 1:
|
||||
result = result[0]['claim_id']
|
||||
self.short_id_cache[key] = result
|
||||
return self.short_id_cache.get(key, None)
|
||||
|
||||
async def search(self, **kwargs):
|
||||
try:
|
||||
|
|
|
@ -169,6 +169,14 @@ class BlockHashValue(NamedTuple):
|
|||
return f"{self.__class__.__name__}(block_hash={self.block_hash.hex()})"
|
||||
|
||||
|
||||
class BlockTxsKey(NamedTuple):
|
||||
height: int
|
||||
|
||||
|
||||
class BlockTxsValue(NamedTuple):
|
||||
tx_hashes: typing.List[bytes]
|
||||
|
||||
|
||||
class TxCountKey(NamedTuple):
|
||||
height: int
|
||||
|
||||
|
@ -1540,6 +1548,36 @@ class DBStatePrefixRow(PrefixRow):
|
|||
)
|
||||
|
||||
|
||||
class BlockTxsPrefixRow(PrefixRow):
|
||||
prefix = DB_PREFIXES.block_txs.value
|
||||
key_struct = struct.Struct(b'>L')
|
||||
key_part_lambdas = [
|
||||
lambda: b'',
|
||||
struct.Struct(b'>L').pack
|
||||
]
|
||||
|
||||
@classmethod
|
||||
def pack_key(cls, height: int):
|
||||
return super().pack_key(height)
|
||||
|
||||
@classmethod
|
||||
def unpack_key(cls, key: bytes) -> BlockTxsKey:
|
||||
return BlockTxsKey(*super().unpack_key(key))
|
||||
|
||||
@classmethod
|
||||
def pack_value(cls, tx_hashes: typing.List[bytes]) -> bytes:
|
||||
assert all(len(tx_hash) == 32 for tx_hash in tx_hashes)
|
||||
return b''.join(tx_hashes)
|
||||
|
||||
@classmethod
|
||||
def unpack_value(cls, data: bytes) -> BlockTxsValue:
|
||||
return BlockTxsValue([data[i*32:(i+1)*32] for i in range(len(data) // 32)])
|
||||
|
||||
@classmethod
|
||||
def pack_item(cls, height, tx_hashes):
|
||||
return cls.pack_key(height), cls.pack_value(tx_hashes)
|
||||
|
||||
|
||||
class LevelDBStore(KeyValueStorage):
|
||||
def __init__(self, path: str, cache_mb: int, max_open_files: int):
|
||||
import plyvel
|
||||
|
@ -1604,6 +1642,7 @@ class HubDB(PrefixDB):
|
|||
self.channel_count = ChannelCountPrefixRow(db, self._op_stack)
|
||||
self.db_state = DBStatePrefixRow(db, self._op_stack)
|
||||
self.support_amount = SupportAmountPrefixRow(db, self._op_stack)
|
||||
self.block_txs = BlockTxsPrefixRow(db, self._op_stack)
|
||||
|
||||
|
||||
def auto_decode_item(key: bytes, value: bytes) -> Union[Tuple[NamedTuple, NamedTuple], Tuple[bytes, bytes]]:
|
||||
|
|
|
@ -83,11 +83,26 @@ class OpStackIntegrity(Exception):
|
|||
|
||||
class RevertableOpStack:
|
||||
def __init__(self, get_fn: Callable[[bytes], Optional[bytes]], unsafe_prefixes=None):
|
||||
"""
|
||||
This represents a sequence of revertable puts and deletes to a key-value database that checks for integrity
|
||||
violations when applying the puts and deletes. The integrity checks assure that keys that do not exist
|
||||
are not deleted, and that when keys are deleted the current value is correctly known so that the delete
|
||||
may be undone. When putting values, the integrity checks assure that existing values are not overwritten
|
||||
without first being deleted. Updates are performed by applying a delete op for the old value and a put op
|
||||
for the new value.
|
||||
|
||||
:param get_fn: getter function from an object implementing `KeyValueStorage`
|
||||
:param unsafe_prefixes: optional set of prefixes to ignore integrity errors for, violations are still logged
|
||||
"""
|
||||
self._get = get_fn
|
||||
self._items = defaultdict(list)
|
||||
self._unsafe_prefixes = unsafe_prefixes or set()
|
||||
|
||||
def append_op(self, op: RevertableOp):
|
||||
"""
|
||||
Apply a put or delete op, checking that it introduces no integrity errors
|
||||
"""
|
||||
|
||||
inverted = op.invert()
|
||||
if self._items[op.key] and inverted == self._items[op.key][-1]:
|
||||
self._items[op.key].pop() # if the new op is the inverse of the last op, we can safely null both
|
||||
|
@ -119,6 +134,9 @@ class RevertableOpStack:
|
|||
self._items[op.key].append(op)
|
||||
|
||||
def extend_ops(self, ops: Iterable[RevertableOp]):
|
||||
"""
|
||||
Apply a sequence of put or delete ops, checking that they introduce no integrity errors
|
||||
"""
|
||||
for op in ops:
|
||||
self.append_op(op)
|
||||
|
||||
|
@ -139,9 +157,15 @@ class RevertableOpStack:
|
|||
yield op
|
||||
|
||||
def get_undo_ops(self) -> bytes:
|
||||
"""
|
||||
Get the serialized bytes to undo all of the changes made by the pending ops
|
||||
"""
|
||||
return b''.join(op.invert().pack() for op in reversed(self))
|
||||
|
||||
def apply_packed_undo_ops(self, packed: bytes):
|
||||
"""
|
||||
Unpack and apply a sequence of undo ops from serialized undo bytes
|
||||
"""
|
||||
while packed:
|
||||
op, packed = RevertableOp.unpack(packed)
|
||||
self.append_op(op)
|
||||
|
|
|
@ -112,7 +112,7 @@ class LevelDB:
|
|||
self.merkle = Merkle()
|
||||
self.header_mc = MerkleCache(self.merkle, self.fs_block_hashes)
|
||||
|
||||
self._tx_and_merkle_cache = LRUCacheWithMetrics(2 ** 17, metric_name='tx_and_merkle', namespace="wallet_server")
|
||||
self._tx_and_merkle_cache = LRUCacheWithMetrics(2 ** 16, metric_name='tx_and_merkle', namespace="wallet_server")
|
||||
|
||||
self.claim_to_txo: Dict[bytes, ClaimToTXOValue] = {}
|
||||
self.txo_to_claim: DefaultDict[int, Dict[int, bytes]] = defaultdict(dict)
|
||||
|
@ -696,23 +696,21 @@ class LevelDB:
|
|||
yield meta
|
||||
batch.clear()
|
||||
|
||||
async def claims_producer(self, claim_hashes: Set[bytes]):
|
||||
def claims_producer(self, claim_hashes: Set[bytes]):
|
||||
batch = []
|
||||
results = []
|
||||
|
||||
loop = asyncio.get_event_loop()
|
||||
|
||||
def produce_claim(claim_hash):
|
||||
for claim_hash in claim_hashes:
|
||||
if claim_hash not in self.claim_to_txo:
|
||||
self.logger.warning("can't sync non existent claim to ES: %s", claim_hash.hex())
|
||||
return
|
||||
continue
|
||||
name = self.claim_to_txo[claim_hash].normalized_name
|
||||
if not self.prefix_db.claim_takeover.get(name):
|
||||
self.logger.warning("can't sync non existent claim to ES: %s", claim_hash.hex())
|
||||
return
|
||||
continue
|
||||
claim_txo = self.claim_to_txo.get(claim_hash)
|
||||
if not claim_txo:
|
||||
return
|
||||
continue
|
||||
activation = self.get_activation(claim_txo.tx_num, claim_txo.position)
|
||||
claim = self._prepare_resolve_result(
|
||||
claim_txo.tx_num, claim_txo.position, claim_hash, claim_txo.name, claim_txo.root_tx_num,
|
||||
|
@ -721,25 +719,13 @@ class LevelDB:
|
|||
if claim:
|
||||
batch.append(claim)
|
||||
|
||||
def get_metadata(claim):
|
||||
meta = self._prepare_claim_metadata(claim.claim_hash, claim)
|
||||
if meta:
|
||||
results.append(meta)
|
||||
|
||||
if claim_hashes:
|
||||
await asyncio.wait(
|
||||
[loop.run_in_executor(None, produce_claim, claim_hash) for claim_hash in claim_hashes]
|
||||
)
|
||||
batch.sort(key=lambda x: x.tx_hash)
|
||||
|
||||
if batch:
|
||||
await asyncio.wait(
|
||||
[loop.run_in_executor(None, get_metadata, claim) for claim in batch]
|
||||
)
|
||||
for meta in results:
|
||||
yield meta
|
||||
|
||||
batch.clear()
|
||||
for claim in batch:
|
||||
_meta = self._prepare_claim_metadata(claim.claim_hash, claim)
|
||||
if _meta:
|
||||
results.append(_meta)
|
||||
return results
|
||||
|
||||
def get_activated_at_height(self, height: int) -> DefaultDict[PendingActivationValue, List[PendingActivationKey]]:
|
||||
activated = defaultdict(list)
|
||||
|
@ -923,57 +909,51 @@ class LevelDB:
|
|||
return None, tx_height
|
||||
|
||||
def get_block_txs(self, height: int) -> List[bytes]:
|
||||
return [
|
||||
tx_hash for tx_hash in self.prefix_db.tx_hash.iterate(
|
||||
start=(self.tx_counts[height-1],), stop=(self.tx_counts[height],),
|
||||
deserialize_value=False, include_key=False
|
||||
)
|
||||
]
|
||||
return self.prefix_db.block_txs.get(height).tx_hashes
|
||||
|
||||
def _fs_transactions(self, txids: Iterable[str]):
|
||||
tx_counts = self.tx_counts
|
||||
tx_db_get = self.prefix_db.tx.get
|
||||
tx_cache = self._tx_and_merkle_cache
|
||||
async def get_transactions_and_merkles(self, tx_hashes: Iterable[str]):
|
||||
tx_infos = {}
|
||||
|
||||
for tx_hash in txids:
|
||||
cached_tx = tx_cache.get(tx_hash)
|
||||
if cached_tx:
|
||||
tx, merkle = cached_tx
|
||||
else:
|
||||
tx_hash_bytes = bytes.fromhex(tx_hash)[::-1]
|
||||
tx_num = self.prefix_db.tx_num.get(tx_hash_bytes)
|
||||
tx = None
|
||||
tx_height = -1
|
||||
tx_num = None if not tx_num else tx_num.tx_num
|
||||
if tx_num is not None:
|
||||
fill_cache = tx_num in self.txo_to_claim and len(self.txo_to_claim[tx_num]) > 0
|
||||
tx_height = bisect_right(tx_counts, tx_num)
|
||||
tx = tx_db_get(tx_hash_bytes, fill_cache=fill_cache, deserialize_value=False)
|
||||
if tx_height == -1:
|
||||
merkle = {
|
||||
'block_height': -1
|
||||
}
|
||||
else:
|
||||
tx_pos = tx_num - tx_counts[tx_height - 1]
|
||||
branch, root = self.merkle.branch_and_root(
|
||||
self.get_block_txs(tx_height), tx_pos
|
||||
)
|
||||
merkle = {
|
||||
'block_height': tx_height,
|
||||
'merkle': [
|
||||
hash_to_hex_str(hash)
|
||||
for hash in branch
|
||||
],
|
||||
'pos': tx_pos
|
||||
}
|
||||
if tx_height + 10 < self.db_height:
|
||||
tx_cache[tx_hash] = tx, merkle
|
||||
tx_infos[tx_hash] = (None if not tx else tx.hex(), merkle)
|
||||
for tx_hash in tx_hashes:
|
||||
tx_infos[tx_hash] = await asyncio.get_event_loop().run_in_executor(
|
||||
None, self._get_transaction_and_merkle, tx_hash
|
||||
)
|
||||
await asyncio.sleep(0)
|
||||
return tx_infos
|
||||
|
||||
async def fs_transactions(self, txids):
|
||||
return await asyncio.get_event_loop().run_in_executor(None, self._fs_transactions, txids)
|
||||
def _get_transaction_and_merkle(self, tx_hash):
|
||||
cached_tx = self._tx_and_merkle_cache.get(tx_hash)
|
||||
if cached_tx:
|
||||
tx, merkle = cached_tx
|
||||
else:
|
||||
tx_hash_bytes = bytes.fromhex(tx_hash)[::-1]
|
||||
tx_num = self.prefix_db.tx_num.get(tx_hash_bytes)
|
||||
tx = None
|
||||
tx_height = -1
|
||||
tx_num = None if not tx_num else tx_num.tx_num
|
||||
if tx_num is not None:
|
||||
fill_cache = tx_num in self.txo_to_claim and len(self.txo_to_claim[tx_num]) > 0
|
||||
tx_height = bisect_right(self.tx_counts, tx_num)
|
||||
tx = self.prefix_db.tx.get(tx_hash_bytes, fill_cache=fill_cache, deserialize_value=False)
|
||||
if tx_height == -1:
|
||||
merkle = {
|
||||
'block_height': -1
|
||||
}
|
||||
else:
|
||||
tx_pos = tx_num - self.tx_counts[tx_height - 1]
|
||||
branch, root = self.merkle.branch_and_root(
|
||||
self.get_block_txs(tx_height), tx_pos
|
||||
)
|
||||
merkle = {
|
||||
'block_height': tx_height,
|
||||
'merkle': [
|
||||
hash_to_hex_str(hash)
|
||||
for hash in branch
|
||||
],
|
||||
'pos': tx_pos
|
||||
}
|
||||
if tx_height + 10 < self.db_height:
|
||||
self._tx_and_merkle_cache[tx_hash] = tx, merkle
|
||||
return (None if not tx else tx.hex(), merkle)
|
||||
|
||||
async def fs_block_hashes(self, height, count):
|
||||
if height + count > len(self.headers):
|
||||
|
|
|
@ -69,7 +69,7 @@ class Server:
|
|||
|
||||
def run(self):
|
||||
loop = asyncio.get_event_loop()
|
||||
executor = ThreadPoolExecutor(self.env.max_query_workers)
|
||||
executor = ThreadPoolExecutor(self.env.max_query_workers, thread_name_prefix='hub-worker')
|
||||
loop.set_default_executor(executor)
|
||||
|
||||
def __exit():
|
||||
|
|
|
@ -2,7 +2,6 @@ import os
|
|||
import ssl
|
||||
import math
|
||||
import time
|
||||
import json
|
||||
import base64
|
||||
import codecs
|
||||
import typing
|
||||
|
@ -197,6 +196,8 @@ class SessionManager:
|
|||
self.notified_height: typing.Optional[int] = None
|
||||
# Cache some idea of room to avoid recounting on each subscription
|
||||
self.subs_room = 0
|
||||
self.consumers = 32
|
||||
self.priority_queue = asyncio.PriorityQueue()
|
||||
|
||||
self.session_event = Event()
|
||||
|
||||
|
@ -546,8 +547,13 @@ class SessionManager:
|
|||
# because we connect to ourself
|
||||
await asyncio.wait([
|
||||
self._clear_stale_sessions(),
|
||||
self._manage_servers()
|
||||
self._manage_servers(),
|
||||
self.serve_requests()
|
||||
])
|
||||
except Exception as err:
|
||||
if not isinstance(err, asyncio.CancelledError):
|
||||
log.exception("hub server died")
|
||||
raise err
|
||||
finally:
|
||||
await self._close_servers(list(self.servers.keys()))
|
||||
log.warning("disconnect %i sessions", len(self.sessions))
|
||||
|
@ -557,6 +563,18 @@ class SessionManager:
|
|||
])
|
||||
await self.stop_other()
|
||||
|
||||
async def serve_requests(self):
|
||||
async def consumer():
|
||||
while True:
|
||||
_, _, fut = await self.priority_queue.get()
|
||||
try:
|
||||
await fut
|
||||
except asyncio.CancelledError:
|
||||
raise
|
||||
except Exception as e:
|
||||
log.exception("raised while serving a request. This should never happen.")
|
||||
await asyncio.gather(*(consumer() for _ in range(self.consumers)))
|
||||
|
||||
async def start_other(self):
|
||||
pass
|
||||
|
||||
|
@ -633,7 +651,7 @@ class SessionManager:
|
|||
self.mempool_statuses.pop(hashX, None)
|
||||
|
||||
await asyncio.get_event_loop().run_in_executor(
|
||||
None, touched.intersection_update, self.hashx_subscriptions_by_session.keys()
|
||||
self.bp._chain_executor, touched.intersection_update, self.hashx_subscriptions_by_session.keys()
|
||||
)
|
||||
|
||||
if touched or new_touched or (height_changed and self.mempool_statuses):
|
||||
|
@ -775,10 +793,9 @@ class LBRYSessionManager(SessionManager):
|
|||
|
||||
def __init__(self, *args, **kwargs):
|
||||
super().__init__(*args, **kwargs)
|
||||
self.query_executor = None
|
||||
self.websocket = None
|
||||
# self.metrics = ServerLoadData()
|
||||
self.metrics_loop = None
|
||||
# self.metrics_loop = None
|
||||
self.running = False
|
||||
if self.env.websocket_host is not None and self.env.websocket_port is not None:
|
||||
self.websocket = AdminWebSocket(self)
|
||||
|
@ -795,12 +812,6 @@ class LBRYSessionManager(SessionManager):
|
|||
|
||||
async def start_other(self):
|
||||
self.running = True
|
||||
if self.env.max_query_workers is not None and self.env.max_query_workers == 0:
|
||||
self.query_executor = ThreadPoolExecutor(max_workers=1)
|
||||
else:
|
||||
self.query_executor = ProcessPoolExecutor(
|
||||
max_workers=self.env.max_query_workers or max(os.cpu_count(), 4)
|
||||
)
|
||||
if self.websocket is not None:
|
||||
await self.websocket.start()
|
||||
|
||||
|
@ -808,7 +819,6 @@ class LBRYSessionManager(SessionManager):
|
|||
self.running = False
|
||||
if self.websocket is not None:
|
||||
await self.websocket.stop()
|
||||
self.query_executor.shutdown()
|
||||
|
||||
|
||||
class LBRYElectrumX(SessionBase):
|
||||
|
@ -880,6 +890,14 @@ class LBRYElectrumX(SessionBase):
|
|||
self.daemon = self.session_mgr.daemon
|
||||
self.bp: BlockProcessor = self.session_mgr.bp
|
||||
self.db: LevelDB = self.bp.db
|
||||
self.last_request_received_at = 0
|
||||
|
||||
def schedule_requests(self, requests):
|
||||
for request in requests:
|
||||
current = time.perf_counter()
|
||||
elapsed = (1 << 65) - current - self.last_request_received_at
|
||||
self.last_request_received_at = current
|
||||
self.session_mgr.priority_queue.put_nowait((elapsed, id(request), self._handle_request(request)))
|
||||
|
||||
@classmethod
|
||||
def protocol_min_max_strings(cls):
|
||||
|
@ -971,24 +989,6 @@ class LBRYElectrumX(SessionBase):
|
|||
# else:
|
||||
# return APICallMetrics(query_name)
|
||||
|
||||
async def run_in_executor(self, query_name, func, kwargs):
|
||||
start = time.perf_counter()
|
||||
try:
|
||||
self.session_mgr.pending_query_metric.inc()
|
||||
result = await asyncio.get_running_loop().run_in_executor(
|
||||
self.session_mgr.query_executor, func, kwargs
|
||||
)
|
||||
except asyncio.CancelledError:
|
||||
raise
|
||||
except Exception:
|
||||
log.exception("dear devs, please handle this exception better")
|
||||
self.session_mgr.db_error_metric.inc()
|
||||
raise RPCError(JSONRPC.INTERNAL_ERROR, 'unknown server error')
|
||||
else:
|
||||
return base64.b64encode(result).decode()
|
||||
finally:
|
||||
self.session_mgr.pending_query_metric.dec()
|
||||
self.session_mgr.executor_time_metric.observe(time.perf_counter() - start)
|
||||
|
||||
# async def run_and_cache_query(self, query_name, kwargs):
|
||||
# start = time.perf_counter()
|
||||
|
@ -1036,41 +1036,52 @@ class LBRYElectrumX(SessionBase):
|
|||
self.session_mgr.pending_query_metric.dec()
|
||||
self.session_mgr.executor_time_metric.observe(time.perf_counter() - start)
|
||||
|
||||
def _claimtrie_resolve(self, *urls):
|
||||
rows, extra = [], []
|
||||
for url in urls:
|
||||
self.session_mgr.urls_to_resolve_count_metric.inc()
|
||||
stream, channel, repost, reposted_channel = self.db._resolve(url)
|
||||
if isinstance(channel, ResolveCensoredError):
|
||||
rows.append(channel)
|
||||
extra.append(channel.censor_row)
|
||||
elif isinstance(stream, ResolveCensoredError):
|
||||
rows.append(stream)
|
||||
extra.append(stream.censor_row)
|
||||
elif channel and not stream:
|
||||
rows.append(channel)
|
||||
# print("resolved channel", channel.name.decode())
|
||||
if repost:
|
||||
extra.append(repost)
|
||||
if reposted_channel:
|
||||
extra.append(reposted_channel)
|
||||
elif stream:
|
||||
# print("resolved stream", stream.name.decode())
|
||||
rows.append(stream)
|
||||
if channel:
|
||||
# print("and channel", channel.name.decode())
|
||||
extra.append(channel)
|
||||
if repost:
|
||||
extra.append(repost)
|
||||
if reposted_channel:
|
||||
extra.append(reposted_channel)
|
||||
# print("claimtrie resolve %i rows %i extrat" % (len(rows), len(extra)))
|
||||
return Outputs.to_base64(rows, extra, 0, None, None)
|
||||
async def _cached_resolve_url(self, url):
|
||||
if url not in self.bp.resolve_cache:
|
||||
self.bp.resolve_cache[url] = await self.loop.run_in_executor(None, self.db._resolve, url)
|
||||
return self.bp.resolve_cache[url]
|
||||
|
||||
async def claimtrie_resolve(self, *urls):
|
||||
result = await self.loop.run_in_executor(None, self._claimtrie_resolve, *urls)
|
||||
self.session_mgr.resolved_url_count_metric.inc(len(urls))
|
||||
return result
|
||||
async def claimtrie_resolve(self, *urls) -> str:
|
||||
sorted_urls = tuple(sorted(urls))
|
||||
self.session_mgr.urls_to_resolve_count_metric.inc(len(sorted_urls))
|
||||
try:
|
||||
if sorted_urls in self.bp.resolve_outputs_cache:
|
||||
return self.bp.resolve_outputs_cache[sorted_urls]
|
||||
rows, extra = [], []
|
||||
for url in urls:
|
||||
if url not in self.bp.resolve_cache:
|
||||
self.bp.resolve_cache[url] = await self._cached_resolve_url(url)
|
||||
stream, channel, repost, reposted_channel = self.bp.resolve_cache[url]
|
||||
if isinstance(channel, ResolveCensoredError):
|
||||
rows.append(channel)
|
||||
extra.append(channel.censor_row)
|
||||
elif isinstance(stream, ResolveCensoredError):
|
||||
rows.append(stream)
|
||||
extra.append(stream.censor_row)
|
||||
elif channel and not stream:
|
||||
rows.append(channel)
|
||||
# print("resolved channel", channel.name.decode())
|
||||
if repost:
|
||||
extra.append(repost)
|
||||
if reposted_channel:
|
||||
extra.append(reposted_channel)
|
||||
elif stream:
|
||||
# print("resolved stream", stream.name.decode())
|
||||
rows.append(stream)
|
||||
if channel:
|
||||
# print("and channel", channel.name.decode())
|
||||
extra.append(channel)
|
||||
if repost:
|
||||
extra.append(repost)
|
||||
if reposted_channel:
|
||||
extra.append(reposted_channel)
|
||||
await asyncio.sleep(0)
|
||||
self.bp.resolve_outputs_cache[sorted_urls] = result = await self.loop.run_in_executor(
|
||||
None, Outputs.to_base64, rows, extra, 0, None, None
|
||||
)
|
||||
return result
|
||||
finally:
|
||||
self.session_mgr.resolved_url_count_metric.inc(len(sorted_urls))
|
||||
|
||||
async def get_server_height(self):
|
||||
return self.bp.height
|
||||
|
@ -1221,9 +1232,11 @@ class LBRYElectrumX(SessionBase):
|
|||
address: the address to subscribe to"""
|
||||
if len(addresses) > 1000:
|
||||
raise RPCError(BAD_REQUEST, f'too many addresses in subscription request: {len(addresses)}')
|
||||
return [
|
||||
await self.hashX_subscribe(self.address_to_hashX(address), address) for address in addresses
|
||||
]
|
||||
results = []
|
||||
for address in addresses:
|
||||
results.append(await self.hashX_subscribe(self.address_to_hashX(address), address))
|
||||
await asyncio.sleep(0)
|
||||
return results
|
||||
|
||||
async def address_unsubscribe(self, address):
|
||||
"""Unsubscribe an address.
|
||||
|
@ -1472,7 +1485,7 @@ class LBRYElectrumX(SessionBase):
|
|||
raise RPCError(BAD_REQUEST, f'too many tx hashes in request: {len(tx_hashes)}')
|
||||
for tx_hash in tx_hashes:
|
||||
assert_tx_hash(tx_hash)
|
||||
batch_result = await self.db.fs_transactions(tx_hashes)
|
||||
batch_result = await self.db.get_transactions_and_merkles(tx_hashes)
|
||||
needed_merkles = {}
|
||||
|
||||
for tx_hash in tx_hashes:
|
||||
|
|
|
@ -23,7 +23,7 @@ class BlockchainReorganizationTests(CommandTestCase):
|
|||
self.assertEqual(block_hash, (await bp.db.fs_block_hashes(height, 1))[0][::-1].hex())
|
||||
|
||||
txids = await asyncio.get_event_loop().run_in_executor(None, get_txids)
|
||||
txs = await bp.db.fs_transactions(txids)
|
||||
txs = await bp.db.get_transactions_and_merkles(txids)
|
||||
block_txs = (await bp.daemon.deserialised_block(block_hash))['tx']
|
||||
self.assertSetEqual(set(block_txs), set(txs.keys()), msg='leveldb/lbrycrd is missing transactions')
|
||||
self.assertListEqual(block_txs, list(txs.keys()), msg='leveldb/lbrycrd transactions are of order')
|
||||
|
|
|
@ -1458,7 +1458,7 @@ class ResolveAfterReorg(BaseResolveTestCase):
|
|||
txids = [
|
||||
tx_hash[::-1].hex() for tx_hash in bp.db.get_block_txs(height)
|
||||
]
|
||||
txs = await bp.db.fs_transactions(txids)
|
||||
txs = await bp.db.get_transactions_and_merkles(txids)
|
||||
block_txs = (await bp.daemon.deserialised_block(block_hash))['tx']
|
||||
self.assertSetEqual(set(block_txs), set(txs.keys()), msg='leveldb/lbrycrd is missing transactions')
|
||||
self.assertListEqual(block_txs, list(txs.keys()), msg='leveldb/lbrycrd transactions are of order')
|
||||
|
|
Loading…
Reference in a new issue