forked from LBRYCommunity/lbry-sdk
Compare commits
14 commits
master
...
crazy_hub_
Author | SHA1 | Date | |
---|---|---|---|
|
1f64275da1 | ||
|
8c605864e7 | ||
|
6ec88e2101 | ||
|
17eeafd62c | ||
|
28e4b3eefd | ||
|
b2de89ca29 | ||
|
8bfff2d549 | ||
|
39d5078788 | ||
|
f8c46647d7 | ||
|
82de92c324 | ||
|
3dc3792478 | ||
|
ba1d0a12d1 | ||
|
e75047a0ab | ||
|
83a167bd37 |
12 changed files with 229 additions and 172 deletions
|
@ -424,10 +424,11 @@ class RPCSession(SessionBase):
|
||||||
self.max_errors = 0
|
self.max_errors = 0
|
||||||
self._bump_errors()
|
self._bump_errors()
|
||||||
else:
|
else:
|
||||||
for request in requests:
|
self.schedule_requests(requests)
|
||||||
await self._task_group.add(self._handle_request(request))
|
|
||||||
|
|
||||||
async def _handle_request(self, request):
|
async def _handle_request(self, request):
|
||||||
|
if self.is_closing():
|
||||||
|
return
|
||||||
start = time.perf_counter()
|
start = time.perf_counter()
|
||||||
try:
|
try:
|
||||||
result = await self.handle_request(request)
|
result = await self.handle_request(request)
|
||||||
|
@ -472,6 +473,10 @@ class RPCSession(SessionBase):
|
||||||
async def handle_request(self, request):
|
async def handle_request(self, request):
|
||||||
pass
|
pass
|
||||||
|
|
||||||
|
def schedule_requests(self, requests):
|
||||||
|
for request in requests:
|
||||||
|
self._task_group.add(self._handle_request(request))
|
||||||
|
|
||||||
async def send_request(self, method, args=()):
|
async def send_request(self, method, args=()):
|
||||||
"""Send an RPC request over the network."""
|
"""Send an RPC request over the network."""
|
||||||
if self.is_closing():
|
if self.is_closing():
|
||||||
|
|
|
@ -3,14 +3,16 @@ import asyncio
|
||||||
import typing
|
import typing
|
||||||
from bisect import bisect_right
|
from bisect import bisect_right
|
||||||
from struct import pack, unpack
|
from struct import pack, unpack
|
||||||
|
from concurrent.futures.thread import ThreadPoolExecutor
|
||||||
from typing import Optional, List, Tuple, Set, DefaultDict, Dict, NamedTuple
|
from typing import Optional, List, Tuple, Set, DefaultDict, Dict, NamedTuple
|
||||||
from prometheus_client import Gauge, Histogram
|
from prometheus_client import Gauge, Histogram
|
||||||
from collections import defaultdict
|
from collections import defaultdict
|
||||||
|
|
||||||
import lbry
|
import lbry
|
||||||
|
from lbry.schema.url import URL
|
||||||
from lbry.schema.claim import Claim
|
from lbry.schema.claim import Claim
|
||||||
from lbry.wallet.ledger import Ledger, TestNetLedger, RegTestLedger
|
from lbry.wallet.ledger import Ledger, TestNetLedger, RegTestLedger
|
||||||
|
from lbry.utils import LRUCache
|
||||||
from lbry.wallet.transaction import OutputScript, Output, Transaction
|
from lbry.wallet.transaction import OutputScript, Output, Transaction
|
||||||
from lbry.wallet.server.tx import Tx, TxOutput, TxInput
|
from lbry.wallet.server.tx import Tx, TxOutput, TxInput
|
||||||
from lbry.wallet.server.daemon import DaemonError
|
from lbry.wallet.server.daemon import DaemonError
|
||||||
|
@ -202,6 +204,8 @@ class BlockProcessor:
|
||||||
self.env = env
|
self.env = env
|
||||||
self.db = db
|
self.db = db
|
||||||
self.daemon = daemon
|
self.daemon = daemon
|
||||||
|
self._chain_executor = ThreadPoolExecutor(1, thread_name_prefix='block-processor')
|
||||||
|
self._sync_reader_executor = ThreadPoolExecutor(1, thread_name_prefix='hub-es-sync')
|
||||||
self.mempool = MemPool(env.coin, daemon, db, self.state_lock)
|
self.mempool = MemPool(env.coin, daemon, db, self.state_lock)
|
||||||
self.shutdown_event = shutdown_event
|
self.shutdown_event = shutdown_event
|
||||||
self.coin = env.coin
|
self.coin = env.coin
|
||||||
|
@ -231,6 +235,9 @@ class BlockProcessor:
|
||||||
self.db_op_stack: Optional[RevertableOpStack] = None
|
self.db_op_stack: Optional[RevertableOpStack] = None
|
||||||
|
|
||||||
# self.search_cache = {}
|
# self.search_cache = {}
|
||||||
|
self.resolve_cache = LRUCache(2**16)
|
||||||
|
self.resolve_outputs_cache = LRUCache(2 ** 16)
|
||||||
|
|
||||||
self.history_cache = {}
|
self.history_cache = {}
|
||||||
self.status_server = StatusServer()
|
self.status_server = StatusServer()
|
||||||
|
|
||||||
|
@ -297,7 +304,11 @@ class BlockProcessor:
|
||||||
|
|
||||||
for claim_hash in self.removed_claims_to_send_es:
|
for claim_hash in self.removed_claims_to_send_es:
|
||||||
yield 'delete', claim_hash.hex()
|
yield 'delete', claim_hash.hex()
|
||||||
async for claim in self.db.claims_producer(self.touched_claims_to_send_es):
|
|
||||||
|
to_update = await asyncio.get_event_loop().run_in_executor(
|
||||||
|
self._sync_reader_executor, self.db.claims_producer, self.touched_claims_to_send_es
|
||||||
|
)
|
||||||
|
for claim in to_update:
|
||||||
yield 'update', claim
|
yield 'update', claim
|
||||||
|
|
||||||
async def run_in_thread_with_lock(self, func, *args):
|
async def run_in_thread_with_lock(self, func, *args):
|
||||||
|
@ -308,13 +319,12 @@ class BlockProcessor:
|
||||||
# consistent and not being updated elsewhere.
|
# consistent and not being updated elsewhere.
|
||||||
async def run_in_thread_locked():
|
async def run_in_thread_locked():
|
||||||
async with self.state_lock:
|
async with self.state_lock:
|
||||||
return await asyncio.get_event_loop().run_in_executor(None, func, *args)
|
return await asyncio.get_event_loop().run_in_executor(self._chain_executor, func, *args)
|
||||||
return await asyncio.shield(run_in_thread_locked())
|
return await asyncio.shield(run_in_thread_locked())
|
||||||
|
|
||||||
@staticmethod
|
async def run_in_thread(self, func, *args):
|
||||||
async def run_in_thread(func, *args):
|
|
||||||
async def run_in_thread():
|
async def run_in_thread():
|
||||||
return await asyncio.get_event_loop().run_in_executor(None, func, *args)
|
return await asyncio.get_event_loop().run_in_executor(self._chain_executor, func, *args)
|
||||||
return await asyncio.shield(run_in_thread())
|
return await asyncio.shield(run_in_thread())
|
||||||
|
|
||||||
async def check_and_advance_blocks(self, raw_blocks):
|
async def check_and_advance_blocks(self, raw_blocks):
|
||||||
|
@ -1440,6 +1450,7 @@ class BlockProcessor:
|
||||||
|
|
||||||
self.db.prefix_db.block_hash.stage_put(key_args=(height,), value_args=(self.coin.header_hash(block.header),))
|
self.db.prefix_db.block_hash.stage_put(key_args=(height,), value_args=(self.coin.header_hash(block.header),))
|
||||||
self.db.prefix_db.header.stage_put(key_args=(height,), value_args=(block.header,))
|
self.db.prefix_db.header.stage_put(key_args=(height,), value_args=(block.header,))
|
||||||
|
self.db.prefix_db.block_txs.stage_put(key_args=(height,), value_args=([tx_hash for tx, tx_hash in txs],))
|
||||||
|
|
||||||
for tx, tx_hash in txs:
|
for tx, tx_hash in txs:
|
||||||
spent_claims = {}
|
spent_claims = {}
|
||||||
|
@ -1580,6 +1591,8 @@ class BlockProcessor:
|
||||||
self.pending_transaction_num_mapping.clear()
|
self.pending_transaction_num_mapping.clear()
|
||||||
self.pending_transactions.clear()
|
self.pending_transactions.clear()
|
||||||
self.pending_support_amount_change.clear()
|
self.pending_support_amount_change.clear()
|
||||||
|
self.resolve_cache.clear()
|
||||||
|
self.resolve_outputs_cache.clear()
|
||||||
|
|
||||||
async def backup_block(self):
|
async def backup_block(self):
|
||||||
assert len(self.db.prefix_db._op_stack) == 0
|
assert len(self.db.prefix_db._op_stack) == 0
|
||||||
|
@ -1742,5 +1755,6 @@ class BlockProcessor:
|
||||||
self.status_server.stop()
|
self.status_server.stop()
|
||||||
# Shut down block processing
|
# Shut down block processing
|
||||||
self.logger.info('closing the DB for a clean shutdown...')
|
self.logger.info('closing the DB for a clean shutdown...')
|
||||||
|
self._sync_reader_executor.shutdown(wait=True)
|
||||||
|
self._chain_executor.shutdown(wait=True)
|
||||||
self.db.close()
|
self.db.close()
|
||||||
# self.executor.shutdown(wait=True)
|
|
||||||
|
|
|
@ -55,7 +55,7 @@ class Daemon:
|
||||||
self.available_rpcs = {}
|
self.available_rpcs = {}
|
||||||
self.connector = aiohttp.TCPConnector()
|
self.connector = aiohttp.TCPConnector()
|
||||||
self._block_hash_cache = LRUCacheWithMetrics(100000)
|
self._block_hash_cache = LRUCacheWithMetrics(100000)
|
||||||
self._block_cache = LRUCacheWithMetrics(2 ** 16, metric_name='block', namespace=NAMESPACE)
|
self._block_cache = LRUCacheWithMetrics(2 ** 13, metric_name='block', namespace=NAMESPACE)
|
||||||
|
|
||||||
async def close(self):
|
async def close(self):
|
||||||
if self.connector:
|
if self.connector:
|
||||||
|
|
|
@ -39,3 +39,4 @@ class DB_PREFIXES(enum.Enum):
|
||||||
db_state = b's'
|
db_state = b's'
|
||||||
channel_count = b'Z'
|
channel_count = b'Z'
|
||||||
support_amount = b'a'
|
support_amount = b'a'
|
||||||
|
block_txs = b'b'
|
||||||
|
|
|
@ -51,9 +51,7 @@ class SearchIndex:
|
||||||
self.index = index_prefix + 'claims'
|
self.index = index_prefix + 'claims'
|
||||||
self.logger = class_logger(__name__, self.__class__.__name__)
|
self.logger = class_logger(__name__, self.__class__.__name__)
|
||||||
self.claim_cache = LRUCache(2 ** 15)
|
self.claim_cache = LRUCache(2 ** 15)
|
||||||
self.short_id_cache = LRUCache(2 ** 17)
|
|
||||||
self.search_cache = LRUCache(2 ** 17)
|
self.search_cache = LRUCache(2 ** 17)
|
||||||
self.resolution_cache = LRUCache(2 ** 17)
|
|
||||||
self._elastic_host = elastic_host
|
self._elastic_host = elastic_host
|
||||||
self._elastic_port = elastic_port
|
self._elastic_port = elastic_port
|
||||||
self._trending_half_life = half_life
|
self._trending_half_life = half_life
|
||||||
|
@ -260,9 +258,7 @@ class SearchIndex:
|
||||||
|
|
||||||
def clear_caches(self):
|
def clear_caches(self):
|
||||||
self.search_cache.clear()
|
self.search_cache.clear()
|
||||||
self.short_id_cache.clear()
|
|
||||||
self.claim_cache.clear()
|
self.claim_cache.clear()
|
||||||
self.resolution_cache.clear()
|
|
||||||
|
|
||||||
async def cached_search(self, kwargs):
|
async def cached_search(self, kwargs):
|
||||||
total_referenced = []
|
total_referenced = []
|
||||||
|
@ -354,21 +350,6 @@ class SearchIndex:
|
||||||
for result in expand_result(filter(lambda doc: doc['found'], results["docs"])):
|
for result in expand_result(filter(lambda doc: doc['found'], results["docs"])):
|
||||||
self.claim_cache.set(result['claim_id'], result)
|
self.claim_cache.set(result['claim_id'], result)
|
||||||
|
|
||||||
async def full_id_from_short_id(self, name, short_id, channel_id=None):
|
|
||||||
key = '#'.join((channel_id or '', name, short_id))
|
|
||||||
if key not in self.short_id_cache:
|
|
||||||
query = {'name': name, 'claim_id': short_id}
|
|
||||||
if channel_id:
|
|
||||||
query['channel_id'] = channel_id
|
|
||||||
query['order_by'] = ['^channel_join']
|
|
||||||
query['signature_valid'] = True
|
|
||||||
else:
|
|
||||||
query['order_by'] = '^creation_height'
|
|
||||||
result, _, _ = await self.search(**query, limit=1)
|
|
||||||
if len(result) == 1:
|
|
||||||
result = result[0]['claim_id']
|
|
||||||
self.short_id_cache[key] = result
|
|
||||||
return self.short_id_cache.get(key, None)
|
|
||||||
|
|
||||||
async def search(self, **kwargs):
|
async def search(self, **kwargs):
|
||||||
try:
|
try:
|
||||||
|
|
|
@ -169,6 +169,14 @@ class BlockHashValue(NamedTuple):
|
||||||
return f"{self.__class__.__name__}(block_hash={self.block_hash.hex()})"
|
return f"{self.__class__.__name__}(block_hash={self.block_hash.hex()})"
|
||||||
|
|
||||||
|
|
||||||
|
class BlockTxsKey(NamedTuple):
|
||||||
|
height: int
|
||||||
|
|
||||||
|
|
||||||
|
class BlockTxsValue(NamedTuple):
|
||||||
|
tx_hashes: typing.List[bytes]
|
||||||
|
|
||||||
|
|
||||||
class TxCountKey(NamedTuple):
|
class TxCountKey(NamedTuple):
|
||||||
height: int
|
height: int
|
||||||
|
|
||||||
|
@ -1540,6 +1548,36 @@ class DBStatePrefixRow(PrefixRow):
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
||||||
|
class BlockTxsPrefixRow(PrefixRow):
|
||||||
|
prefix = DB_PREFIXES.block_txs.value
|
||||||
|
key_struct = struct.Struct(b'>L')
|
||||||
|
key_part_lambdas = [
|
||||||
|
lambda: b'',
|
||||||
|
struct.Struct(b'>L').pack
|
||||||
|
]
|
||||||
|
|
||||||
|
@classmethod
|
||||||
|
def pack_key(cls, height: int):
|
||||||
|
return super().pack_key(height)
|
||||||
|
|
||||||
|
@classmethod
|
||||||
|
def unpack_key(cls, key: bytes) -> BlockTxsKey:
|
||||||
|
return BlockTxsKey(*super().unpack_key(key))
|
||||||
|
|
||||||
|
@classmethod
|
||||||
|
def pack_value(cls, tx_hashes: typing.List[bytes]) -> bytes:
|
||||||
|
assert all(len(tx_hash) == 32 for tx_hash in tx_hashes)
|
||||||
|
return b''.join(tx_hashes)
|
||||||
|
|
||||||
|
@classmethod
|
||||||
|
def unpack_value(cls, data: bytes) -> BlockTxsValue:
|
||||||
|
return BlockTxsValue([data[i*32:(i+1)*32] for i in range(len(data) // 32)])
|
||||||
|
|
||||||
|
@classmethod
|
||||||
|
def pack_item(cls, height, tx_hashes):
|
||||||
|
return cls.pack_key(height), cls.pack_value(tx_hashes)
|
||||||
|
|
||||||
|
|
||||||
class LevelDBStore(KeyValueStorage):
|
class LevelDBStore(KeyValueStorage):
|
||||||
def __init__(self, path: str, cache_mb: int, max_open_files: int):
|
def __init__(self, path: str, cache_mb: int, max_open_files: int):
|
||||||
import plyvel
|
import plyvel
|
||||||
|
@ -1604,6 +1642,7 @@ class HubDB(PrefixDB):
|
||||||
self.channel_count = ChannelCountPrefixRow(db, self._op_stack)
|
self.channel_count = ChannelCountPrefixRow(db, self._op_stack)
|
||||||
self.db_state = DBStatePrefixRow(db, self._op_stack)
|
self.db_state = DBStatePrefixRow(db, self._op_stack)
|
||||||
self.support_amount = SupportAmountPrefixRow(db, self._op_stack)
|
self.support_amount = SupportAmountPrefixRow(db, self._op_stack)
|
||||||
|
self.block_txs = BlockTxsPrefixRow(db, self._op_stack)
|
||||||
|
|
||||||
|
|
||||||
def auto_decode_item(key: bytes, value: bytes) -> Union[Tuple[NamedTuple, NamedTuple], Tuple[bytes, bytes]]:
|
def auto_decode_item(key: bytes, value: bytes) -> Union[Tuple[NamedTuple, NamedTuple], Tuple[bytes, bytes]]:
|
||||||
|
|
|
@ -83,11 +83,26 @@ class OpStackIntegrity(Exception):
|
||||||
|
|
||||||
class RevertableOpStack:
|
class RevertableOpStack:
|
||||||
def __init__(self, get_fn: Callable[[bytes], Optional[bytes]], unsafe_prefixes=None):
|
def __init__(self, get_fn: Callable[[bytes], Optional[bytes]], unsafe_prefixes=None):
|
||||||
|
"""
|
||||||
|
This represents a sequence of revertable puts and deletes to a key-value database that checks for integrity
|
||||||
|
violations when applying the puts and deletes. The integrity checks assure that keys that do not exist
|
||||||
|
are not deleted, and that when keys are deleted the current value is correctly known so that the delete
|
||||||
|
may be undone. When putting values, the integrity checks assure that existing values are not overwritten
|
||||||
|
without first being deleted. Updates are performed by applying a delete op for the old value and a put op
|
||||||
|
for the new value.
|
||||||
|
|
||||||
|
:param get_fn: getter function from an object implementing `KeyValueStorage`
|
||||||
|
:param unsafe_prefixes: optional set of prefixes to ignore integrity errors for, violations are still logged
|
||||||
|
"""
|
||||||
self._get = get_fn
|
self._get = get_fn
|
||||||
self._items = defaultdict(list)
|
self._items = defaultdict(list)
|
||||||
self._unsafe_prefixes = unsafe_prefixes or set()
|
self._unsafe_prefixes = unsafe_prefixes or set()
|
||||||
|
|
||||||
def append_op(self, op: RevertableOp):
|
def append_op(self, op: RevertableOp):
|
||||||
|
"""
|
||||||
|
Apply a put or delete op, checking that it introduces no integrity errors
|
||||||
|
"""
|
||||||
|
|
||||||
inverted = op.invert()
|
inverted = op.invert()
|
||||||
if self._items[op.key] and inverted == self._items[op.key][-1]:
|
if self._items[op.key] and inverted == self._items[op.key][-1]:
|
||||||
self._items[op.key].pop() # if the new op is the inverse of the last op, we can safely null both
|
self._items[op.key].pop() # if the new op is the inverse of the last op, we can safely null both
|
||||||
|
@ -119,6 +134,9 @@ class RevertableOpStack:
|
||||||
self._items[op.key].append(op)
|
self._items[op.key].append(op)
|
||||||
|
|
||||||
def extend_ops(self, ops: Iterable[RevertableOp]):
|
def extend_ops(self, ops: Iterable[RevertableOp]):
|
||||||
|
"""
|
||||||
|
Apply a sequence of put or delete ops, checking that they introduce no integrity errors
|
||||||
|
"""
|
||||||
for op in ops:
|
for op in ops:
|
||||||
self.append_op(op)
|
self.append_op(op)
|
||||||
|
|
||||||
|
@ -139,9 +157,15 @@ class RevertableOpStack:
|
||||||
yield op
|
yield op
|
||||||
|
|
||||||
def get_undo_ops(self) -> bytes:
|
def get_undo_ops(self) -> bytes:
|
||||||
|
"""
|
||||||
|
Get the serialized bytes to undo all of the changes made by the pending ops
|
||||||
|
"""
|
||||||
return b''.join(op.invert().pack() for op in reversed(self))
|
return b''.join(op.invert().pack() for op in reversed(self))
|
||||||
|
|
||||||
def apply_packed_undo_ops(self, packed: bytes):
|
def apply_packed_undo_ops(self, packed: bytes):
|
||||||
|
"""
|
||||||
|
Unpack and apply a sequence of undo ops from serialized undo bytes
|
||||||
|
"""
|
||||||
while packed:
|
while packed:
|
||||||
op, packed = RevertableOp.unpack(packed)
|
op, packed = RevertableOp.unpack(packed)
|
||||||
self.append_op(op)
|
self.append_op(op)
|
||||||
|
|
|
@ -112,7 +112,7 @@ class LevelDB:
|
||||||
self.merkle = Merkle()
|
self.merkle = Merkle()
|
||||||
self.header_mc = MerkleCache(self.merkle, self.fs_block_hashes)
|
self.header_mc = MerkleCache(self.merkle, self.fs_block_hashes)
|
||||||
|
|
||||||
self._tx_and_merkle_cache = LRUCacheWithMetrics(2 ** 17, metric_name='tx_and_merkle', namespace="wallet_server")
|
self._tx_and_merkle_cache = LRUCacheWithMetrics(2 ** 16, metric_name='tx_and_merkle', namespace="wallet_server")
|
||||||
|
|
||||||
self.claim_to_txo: Dict[bytes, ClaimToTXOValue] = {}
|
self.claim_to_txo: Dict[bytes, ClaimToTXOValue] = {}
|
||||||
self.txo_to_claim: DefaultDict[int, Dict[int, bytes]] = defaultdict(dict)
|
self.txo_to_claim: DefaultDict[int, Dict[int, bytes]] = defaultdict(dict)
|
||||||
|
@ -696,23 +696,21 @@ class LevelDB:
|
||||||
yield meta
|
yield meta
|
||||||
batch.clear()
|
batch.clear()
|
||||||
|
|
||||||
async def claims_producer(self, claim_hashes: Set[bytes]):
|
def claims_producer(self, claim_hashes: Set[bytes]):
|
||||||
batch = []
|
batch = []
|
||||||
results = []
|
results = []
|
||||||
|
|
||||||
loop = asyncio.get_event_loop()
|
for claim_hash in claim_hashes:
|
||||||
|
|
||||||
def produce_claim(claim_hash):
|
|
||||||
if claim_hash not in self.claim_to_txo:
|
if claim_hash not in self.claim_to_txo:
|
||||||
self.logger.warning("can't sync non existent claim to ES: %s", claim_hash.hex())
|
self.logger.warning("can't sync non existent claim to ES: %s", claim_hash.hex())
|
||||||
return
|
continue
|
||||||
name = self.claim_to_txo[claim_hash].normalized_name
|
name = self.claim_to_txo[claim_hash].normalized_name
|
||||||
if not self.prefix_db.claim_takeover.get(name):
|
if not self.prefix_db.claim_takeover.get(name):
|
||||||
self.logger.warning("can't sync non existent claim to ES: %s", claim_hash.hex())
|
self.logger.warning("can't sync non existent claim to ES: %s", claim_hash.hex())
|
||||||
return
|
continue
|
||||||
claim_txo = self.claim_to_txo.get(claim_hash)
|
claim_txo = self.claim_to_txo.get(claim_hash)
|
||||||
if not claim_txo:
|
if not claim_txo:
|
||||||
return
|
continue
|
||||||
activation = self.get_activation(claim_txo.tx_num, claim_txo.position)
|
activation = self.get_activation(claim_txo.tx_num, claim_txo.position)
|
||||||
claim = self._prepare_resolve_result(
|
claim = self._prepare_resolve_result(
|
||||||
claim_txo.tx_num, claim_txo.position, claim_hash, claim_txo.name, claim_txo.root_tx_num,
|
claim_txo.tx_num, claim_txo.position, claim_hash, claim_txo.name, claim_txo.root_tx_num,
|
||||||
|
@ -721,25 +719,13 @@ class LevelDB:
|
||||||
if claim:
|
if claim:
|
||||||
batch.append(claim)
|
batch.append(claim)
|
||||||
|
|
||||||
def get_metadata(claim):
|
|
||||||
meta = self._prepare_claim_metadata(claim.claim_hash, claim)
|
|
||||||
if meta:
|
|
||||||
results.append(meta)
|
|
||||||
|
|
||||||
if claim_hashes:
|
|
||||||
await asyncio.wait(
|
|
||||||
[loop.run_in_executor(None, produce_claim, claim_hash) for claim_hash in claim_hashes]
|
|
||||||
)
|
|
||||||
batch.sort(key=lambda x: x.tx_hash)
|
batch.sort(key=lambda x: x.tx_hash)
|
||||||
|
|
||||||
if batch:
|
for claim in batch:
|
||||||
await asyncio.wait(
|
_meta = self._prepare_claim_metadata(claim.claim_hash, claim)
|
||||||
[loop.run_in_executor(None, get_metadata, claim) for claim in batch]
|
if _meta:
|
||||||
)
|
results.append(_meta)
|
||||||
for meta in results:
|
return results
|
||||||
yield meta
|
|
||||||
|
|
||||||
batch.clear()
|
|
||||||
|
|
||||||
def get_activated_at_height(self, height: int) -> DefaultDict[PendingActivationValue, List[PendingActivationKey]]:
|
def get_activated_at_height(self, height: int) -> DefaultDict[PendingActivationValue, List[PendingActivationKey]]:
|
||||||
activated = defaultdict(list)
|
activated = defaultdict(list)
|
||||||
|
@ -923,57 +909,51 @@ class LevelDB:
|
||||||
return None, tx_height
|
return None, tx_height
|
||||||
|
|
||||||
def get_block_txs(self, height: int) -> List[bytes]:
|
def get_block_txs(self, height: int) -> List[bytes]:
|
||||||
return [
|
return self.prefix_db.block_txs.get(height).tx_hashes
|
||||||
tx_hash for tx_hash in self.prefix_db.tx_hash.iterate(
|
|
||||||
start=(self.tx_counts[height-1],), stop=(self.tx_counts[height],),
|
|
||||||
deserialize_value=False, include_key=False
|
|
||||||
)
|
|
||||||
]
|
|
||||||
|
|
||||||
def _fs_transactions(self, txids: Iterable[str]):
|
async def get_transactions_and_merkles(self, tx_hashes: Iterable[str]):
|
||||||
tx_counts = self.tx_counts
|
|
||||||
tx_db_get = self.prefix_db.tx.get
|
|
||||||
tx_cache = self._tx_and_merkle_cache
|
|
||||||
tx_infos = {}
|
tx_infos = {}
|
||||||
|
for tx_hash in tx_hashes:
|
||||||
for tx_hash in txids:
|
tx_infos[tx_hash] = await asyncio.get_event_loop().run_in_executor(
|
||||||
cached_tx = tx_cache.get(tx_hash)
|
None, self._get_transaction_and_merkle, tx_hash
|
||||||
if cached_tx:
|
)
|
||||||
tx, merkle = cached_tx
|
await asyncio.sleep(0)
|
||||||
else:
|
|
||||||
tx_hash_bytes = bytes.fromhex(tx_hash)[::-1]
|
|
||||||
tx_num = self.prefix_db.tx_num.get(tx_hash_bytes)
|
|
||||||
tx = None
|
|
||||||
tx_height = -1
|
|
||||||
tx_num = None if not tx_num else tx_num.tx_num
|
|
||||||
if tx_num is not None:
|
|
||||||
fill_cache = tx_num in self.txo_to_claim and len(self.txo_to_claim[tx_num]) > 0
|
|
||||||
tx_height = bisect_right(tx_counts, tx_num)
|
|
||||||
tx = tx_db_get(tx_hash_bytes, fill_cache=fill_cache, deserialize_value=False)
|
|
||||||
if tx_height == -1:
|
|
||||||
merkle = {
|
|
||||||
'block_height': -1
|
|
||||||
}
|
|
||||||
else:
|
|
||||||
tx_pos = tx_num - tx_counts[tx_height - 1]
|
|
||||||
branch, root = self.merkle.branch_and_root(
|
|
||||||
self.get_block_txs(tx_height), tx_pos
|
|
||||||
)
|
|
||||||
merkle = {
|
|
||||||
'block_height': tx_height,
|
|
||||||
'merkle': [
|
|
||||||
hash_to_hex_str(hash)
|
|
||||||
for hash in branch
|
|
||||||
],
|
|
||||||
'pos': tx_pos
|
|
||||||
}
|
|
||||||
if tx_height + 10 < self.db_height:
|
|
||||||
tx_cache[tx_hash] = tx, merkle
|
|
||||||
tx_infos[tx_hash] = (None if not tx else tx.hex(), merkle)
|
|
||||||
return tx_infos
|
return tx_infos
|
||||||
|
|
||||||
async def fs_transactions(self, txids):
|
def _get_transaction_and_merkle(self, tx_hash):
|
||||||
return await asyncio.get_event_loop().run_in_executor(None, self._fs_transactions, txids)
|
cached_tx = self._tx_and_merkle_cache.get(tx_hash)
|
||||||
|
if cached_tx:
|
||||||
|
tx, merkle = cached_tx
|
||||||
|
else:
|
||||||
|
tx_hash_bytes = bytes.fromhex(tx_hash)[::-1]
|
||||||
|
tx_num = self.prefix_db.tx_num.get(tx_hash_bytes)
|
||||||
|
tx = None
|
||||||
|
tx_height = -1
|
||||||
|
tx_num = None if not tx_num else tx_num.tx_num
|
||||||
|
if tx_num is not None:
|
||||||
|
fill_cache = tx_num in self.txo_to_claim and len(self.txo_to_claim[tx_num]) > 0
|
||||||
|
tx_height = bisect_right(self.tx_counts, tx_num)
|
||||||
|
tx = self.prefix_db.tx.get(tx_hash_bytes, fill_cache=fill_cache, deserialize_value=False)
|
||||||
|
if tx_height == -1:
|
||||||
|
merkle = {
|
||||||
|
'block_height': -1
|
||||||
|
}
|
||||||
|
else:
|
||||||
|
tx_pos = tx_num - self.tx_counts[tx_height - 1]
|
||||||
|
branch, root = self.merkle.branch_and_root(
|
||||||
|
self.get_block_txs(tx_height), tx_pos
|
||||||
|
)
|
||||||
|
merkle = {
|
||||||
|
'block_height': tx_height,
|
||||||
|
'merkle': [
|
||||||
|
hash_to_hex_str(hash)
|
||||||
|
for hash in branch
|
||||||
|
],
|
||||||
|
'pos': tx_pos
|
||||||
|
}
|
||||||
|
if tx_height + 10 < self.db_height:
|
||||||
|
self._tx_and_merkle_cache[tx_hash] = tx, merkle
|
||||||
|
return (None if not tx else tx.hex(), merkle)
|
||||||
|
|
||||||
async def fs_block_hashes(self, height, count):
|
async def fs_block_hashes(self, height, count):
|
||||||
if height + count > len(self.headers):
|
if height + count > len(self.headers):
|
||||||
|
|
|
@ -69,7 +69,7 @@ class Server:
|
||||||
|
|
||||||
def run(self):
|
def run(self):
|
||||||
loop = asyncio.get_event_loop()
|
loop = asyncio.get_event_loop()
|
||||||
executor = ThreadPoolExecutor(self.env.max_query_workers)
|
executor = ThreadPoolExecutor(self.env.max_query_workers, thread_name_prefix='hub-worker')
|
||||||
loop.set_default_executor(executor)
|
loop.set_default_executor(executor)
|
||||||
|
|
||||||
def __exit():
|
def __exit():
|
||||||
|
|
|
@ -2,7 +2,6 @@ import os
|
||||||
import ssl
|
import ssl
|
||||||
import math
|
import math
|
||||||
import time
|
import time
|
||||||
import json
|
|
||||||
import base64
|
import base64
|
||||||
import codecs
|
import codecs
|
||||||
import typing
|
import typing
|
||||||
|
@ -197,6 +196,8 @@ class SessionManager:
|
||||||
self.notified_height: typing.Optional[int] = None
|
self.notified_height: typing.Optional[int] = None
|
||||||
# Cache some idea of room to avoid recounting on each subscription
|
# Cache some idea of room to avoid recounting on each subscription
|
||||||
self.subs_room = 0
|
self.subs_room = 0
|
||||||
|
self.consumers = 32
|
||||||
|
self.priority_queue = asyncio.PriorityQueue()
|
||||||
|
|
||||||
self.session_event = Event()
|
self.session_event = Event()
|
||||||
|
|
||||||
|
@ -546,8 +547,13 @@ class SessionManager:
|
||||||
# because we connect to ourself
|
# because we connect to ourself
|
||||||
await asyncio.wait([
|
await asyncio.wait([
|
||||||
self._clear_stale_sessions(),
|
self._clear_stale_sessions(),
|
||||||
self._manage_servers()
|
self._manage_servers(),
|
||||||
|
self.serve_requests()
|
||||||
])
|
])
|
||||||
|
except Exception as err:
|
||||||
|
if not isinstance(err, asyncio.CancelledError):
|
||||||
|
log.exception("hub server died")
|
||||||
|
raise err
|
||||||
finally:
|
finally:
|
||||||
await self._close_servers(list(self.servers.keys()))
|
await self._close_servers(list(self.servers.keys()))
|
||||||
log.warning("disconnect %i sessions", len(self.sessions))
|
log.warning("disconnect %i sessions", len(self.sessions))
|
||||||
|
@ -557,6 +563,18 @@ class SessionManager:
|
||||||
])
|
])
|
||||||
await self.stop_other()
|
await self.stop_other()
|
||||||
|
|
||||||
|
async def serve_requests(self):
|
||||||
|
async def consumer():
|
||||||
|
while True:
|
||||||
|
_, _, fut = await self.priority_queue.get()
|
||||||
|
try:
|
||||||
|
await fut
|
||||||
|
except asyncio.CancelledError:
|
||||||
|
raise
|
||||||
|
except Exception as e:
|
||||||
|
log.exception("raised while serving a request. This should never happen.")
|
||||||
|
await asyncio.gather(*(consumer() for _ in range(self.consumers)))
|
||||||
|
|
||||||
async def start_other(self):
|
async def start_other(self):
|
||||||
pass
|
pass
|
||||||
|
|
||||||
|
@ -633,7 +651,7 @@ class SessionManager:
|
||||||
self.mempool_statuses.pop(hashX, None)
|
self.mempool_statuses.pop(hashX, None)
|
||||||
|
|
||||||
await asyncio.get_event_loop().run_in_executor(
|
await asyncio.get_event_loop().run_in_executor(
|
||||||
None, touched.intersection_update, self.hashx_subscriptions_by_session.keys()
|
self.bp._chain_executor, touched.intersection_update, self.hashx_subscriptions_by_session.keys()
|
||||||
)
|
)
|
||||||
|
|
||||||
if touched or new_touched or (height_changed and self.mempool_statuses):
|
if touched or new_touched or (height_changed and self.mempool_statuses):
|
||||||
|
@ -775,10 +793,9 @@ class LBRYSessionManager(SessionManager):
|
||||||
|
|
||||||
def __init__(self, *args, **kwargs):
|
def __init__(self, *args, **kwargs):
|
||||||
super().__init__(*args, **kwargs)
|
super().__init__(*args, **kwargs)
|
||||||
self.query_executor = None
|
|
||||||
self.websocket = None
|
self.websocket = None
|
||||||
# self.metrics = ServerLoadData()
|
# self.metrics = ServerLoadData()
|
||||||
self.metrics_loop = None
|
# self.metrics_loop = None
|
||||||
self.running = False
|
self.running = False
|
||||||
if self.env.websocket_host is not None and self.env.websocket_port is not None:
|
if self.env.websocket_host is not None and self.env.websocket_port is not None:
|
||||||
self.websocket = AdminWebSocket(self)
|
self.websocket = AdminWebSocket(self)
|
||||||
|
@ -795,12 +812,6 @@ class LBRYSessionManager(SessionManager):
|
||||||
|
|
||||||
async def start_other(self):
|
async def start_other(self):
|
||||||
self.running = True
|
self.running = True
|
||||||
if self.env.max_query_workers is not None and self.env.max_query_workers == 0:
|
|
||||||
self.query_executor = ThreadPoolExecutor(max_workers=1)
|
|
||||||
else:
|
|
||||||
self.query_executor = ProcessPoolExecutor(
|
|
||||||
max_workers=self.env.max_query_workers or max(os.cpu_count(), 4)
|
|
||||||
)
|
|
||||||
if self.websocket is not None:
|
if self.websocket is not None:
|
||||||
await self.websocket.start()
|
await self.websocket.start()
|
||||||
|
|
||||||
|
@ -808,7 +819,6 @@ class LBRYSessionManager(SessionManager):
|
||||||
self.running = False
|
self.running = False
|
||||||
if self.websocket is not None:
|
if self.websocket is not None:
|
||||||
await self.websocket.stop()
|
await self.websocket.stop()
|
||||||
self.query_executor.shutdown()
|
|
||||||
|
|
||||||
|
|
||||||
class LBRYElectrumX(SessionBase):
|
class LBRYElectrumX(SessionBase):
|
||||||
|
@ -880,6 +890,14 @@ class LBRYElectrumX(SessionBase):
|
||||||
self.daemon = self.session_mgr.daemon
|
self.daemon = self.session_mgr.daemon
|
||||||
self.bp: BlockProcessor = self.session_mgr.bp
|
self.bp: BlockProcessor = self.session_mgr.bp
|
||||||
self.db: LevelDB = self.bp.db
|
self.db: LevelDB = self.bp.db
|
||||||
|
self.last_request_received_at = 0
|
||||||
|
|
||||||
|
def schedule_requests(self, requests):
|
||||||
|
for request in requests:
|
||||||
|
current = time.perf_counter()
|
||||||
|
elapsed = (1 << 65) - current - self.last_request_received_at
|
||||||
|
self.last_request_received_at = current
|
||||||
|
self.session_mgr.priority_queue.put_nowait((elapsed, id(request), self._handle_request(request)))
|
||||||
|
|
||||||
@classmethod
|
@classmethod
|
||||||
def protocol_min_max_strings(cls):
|
def protocol_min_max_strings(cls):
|
||||||
|
@ -971,24 +989,6 @@ class LBRYElectrumX(SessionBase):
|
||||||
# else:
|
# else:
|
||||||
# return APICallMetrics(query_name)
|
# return APICallMetrics(query_name)
|
||||||
|
|
||||||
async def run_in_executor(self, query_name, func, kwargs):
|
|
||||||
start = time.perf_counter()
|
|
||||||
try:
|
|
||||||
self.session_mgr.pending_query_metric.inc()
|
|
||||||
result = await asyncio.get_running_loop().run_in_executor(
|
|
||||||
self.session_mgr.query_executor, func, kwargs
|
|
||||||
)
|
|
||||||
except asyncio.CancelledError:
|
|
||||||
raise
|
|
||||||
except Exception:
|
|
||||||
log.exception("dear devs, please handle this exception better")
|
|
||||||
self.session_mgr.db_error_metric.inc()
|
|
||||||
raise RPCError(JSONRPC.INTERNAL_ERROR, 'unknown server error')
|
|
||||||
else:
|
|
||||||
return base64.b64encode(result).decode()
|
|
||||||
finally:
|
|
||||||
self.session_mgr.pending_query_metric.dec()
|
|
||||||
self.session_mgr.executor_time_metric.observe(time.perf_counter() - start)
|
|
||||||
|
|
||||||
# async def run_and_cache_query(self, query_name, kwargs):
|
# async def run_and_cache_query(self, query_name, kwargs):
|
||||||
# start = time.perf_counter()
|
# start = time.perf_counter()
|
||||||
|
@ -1036,41 +1036,52 @@ class LBRYElectrumX(SessionBase):
|
||||||
self.session_mgr.pending_query_metric.dec()
|
self.session_mgr.pending_query_metric.dec()
|
||||||
self.session_mgr.executor_time_metric.observe(time.perf_counter() - start)
|
self.session_mgr.executor_time_metric.observe(time.perf_counter() - start)
|
||||||
|
|
||||||
def _claimtrie_resolve(self, *urls):
|
async def _cached_resolve_url(self, url):
|
||||||
rows, extra = [], []
|
if url not in self.bp.resolve_cache:
|
||||||
for url in urls:
|
self.bp.resolve_cache[url] = await self.loop.run_in_executor(None, self.db._resolve, url)
|
||||||
self.session_mgr.urls_to_resolve_count_metric.inc()
|
return self.bp.resolve_cache[url]
|
||||||
stream, channel, repost, reposted_channel = self.db._resolve(url)
|
|
||||||
if isinstance(channel, ResolveCensoredError):
|
|
||||||
rows.append(channel)
|
|
||||||
extra.append(channel.censor_row)
|
|
||||||
elif isinstance(stream, ResolveCensoredError):
|
|
||||||
rows.append(stream)
|
|
||||||
extra.append(stream.censor_row)
|
|
||||||
elif channel and not stream:
|
|
||||||
rows.append(channel)
|
|
||||||
# print("resolved channel", channel.name.decode())
|
|
||||||
if repost:
|
|
||||||
extra.append(repost)
|
|
||||||
if reposted_channel:
|
|
||||||
extra.append(reposted_channel)
|
|
||||||
elif stream:
|
|
||||||
# print("resolved stream", stream.name.decode())
|
|
||||||
rows.append(stream)
|
|
||||||
if channel:
|
|
||||||
# print("and channel", channel.name.decode())
|
|
||||||
extra.append(channel)
|
|
||||||
if repost:
|
|
||||||
extra.append(repost)
|
|
||||||
if reposted_channel:
|
|
||||||
extra.append(reposted_channel)
|
|
||||||
# print("claimtrie resolve %i rows %i extrat" % (len(rows), len(extra)))
|
|
||||||
return Outputs.to_base64(rows, extra, 0, None, None)
|
|
||||||
|
|
||||||
async def claimtrie_resolve(self, *urls):
|
async def claimtrie_resolve(self, *urls) -> str:
|
||||||
result = await self.loop.run_in_executor(None, self._claimtrie_resolve, *urls)
|
sorted_urls = tuple(sorted(urls))
|
||||||
self.session_mgr.resolved_url_count_metric.inc(len(urls))
|
self.session_mgr.urls_to_resolve_count_metric.inc(len(sorted_urls))
|
||||||
return result
|
try:
|
||||||
|
if sorted_urls in self.bp.resolve_outputs_cache:
|
||||||
|
return self.bp.resolve_outputs_cache[sorted_urls]
|
||||||
|
rows, extra = [], []
|
||||||
|
for url in urls:
|
||||||
|
if url not in self.bp.resolve_cache:
|
||||||
|
self.bp.resolve_cache[url] = await self._cached_resolve_url(url)
|
||||||
|
stream, channel, repost, reposted_channel = self.bp.resolve_cache[url]
|
||||||
|
if isinstance(channel, ResolveCensoredError):
|
||||||
|
rows.append(channel)
|
||||||
|
extra.append(channel.censor_row)
|
||||||
|
elif isinstance(stream, ResolveCensoredError):
|
||||||
|
rows.append(stream)
|
||||||
|
extra.append(stream.censor_row)
|
||||||
|
elif channel and not stream:
|
||||||
|
rows.append(channel)
|
||||||
|
# print("resolved channel", channel.name.decode())
|
||||||
|
if repost:
|
||||||
|
extra.append(repost)
|
||||||
|
if reposted_channel:
|
||||||
|
extra.append(reposted_channel)
|
||||||
|
elif stream:
|
||||||
|
# print("resolved stream", stream.name.decode())
|
||||||
|
rows.append(stream)
|
||||||
|
if channel:
|
||||||
|
# print("and channel", channel.name.decode())
|
||||||
|
extra.append(channel)
|
||||||
|
if repost:
|
||||||
|
extra.append(repost)
|
||||||
|
if reposted_channel:
|
||||||
|
extra.append(reposted_channel)
|
||||||
|
await asyncio.sleep(0)
|
||||||
|
self.bp.resolve_outputs_cache[sorted_urls] = result = await self.loop.run_in_executor(
|
||||||
|
None, Outputs.to_base64, rows, extra, 0, None, None
|
||||||
|
)
|
||||||
|
return result
|
||||||
|
finally:
|
||||||
|
self.session_mgr.resolved_url_count_metric.inc(len(sorted_urls))
|
||||||
|
|
||||||
async def get_server_height(self):
|
async def get_server_height(self):
|
||||||
return self.bp.height
|
return self.bp.height
|
||||||
|
@ -1221,9 +1232,11 @@ class LBRYElectrumX(SessionBase):
|
||||||
address: the address to subscribe to"""
|
address: the address to subscribe to"""
|
||||||
if len(addresses) > 1000:
|
if len(addresses) > 1000:
|
||||||
raise RPCError(BAD_REQUEST, f'too many addresses in subscription request: {len(addresses)}')
|
raise RPCError(BAD_REQUEST, f'too many addresses in subscription request: {len(addresses)}')
|
||||||
return [
|
results = []
|
||||||
await self.hashX_subscribe(self.address_to_hashX(address), address) for address in addresses
|
for address in addresses:
|
||||||
]
|
results.append(await self.hashX_subscribe(self.address_to_hashX(address), address))
|
||||||
|
await asyncio.sleep(0)
|
||||||
|
return results
|
||||||
|
|
||||||
async def address_unsubscribe(self, address):
|
async def address_unsubscribe(self, address):
|
||||||
"""Unsubscribe an address.
|
"""Unsubscribe an address.
|
||||||
|
@ -1472,7 +1485,7 @@ class LBRYElectrumX(SessionBase):
|
||||||
raise RPCError(BAD_REQUEST, f'too many tx hashes in request: {len(tx_hashes)}')
|
raise RPCError(BAD_REQUEST, f'too many tx hashes in request: {len(tx_hashes)}')
|
||||||
for tx_hash in tx_hashes:
|
for tx_hash in tx_hashes:
|
||||||
assert_tx_hash(tx_hash)
|
assert_tx_hash(tx_hash)
|
||||||
batch_result = await self.db.fs_transactions(tx_hashes)
|
batch_result = await self.db.get_transactions_and_merkles(tx_hashes)
|
||||||
needed_merkles = {}
|
needed_merkles = {}
|
||||||
|
|
||||||
for tx_hash in tx_hashes:
|
for tx_hash in tx_hashes:
|
||||||
|
|
|
@ -23,7 +23,7 @@ class BlockchainReorganizationTests(CommandTestCase):
|
||||||
self.assertEqual(block_hash, (await bp.db.fs_block_hashes(height, 1))[0][::-1].hex())
|
self.assertEqual(block_hash, (await bp.db.fs_block_hashes(height, 1))[0][::-1].hex())
|
||||||
|
|
||||||
txids = await asyncio.get_event_loop().run_in_executor(None, get_txids)
|
txids = await asyncio.get_event_loop().run_in_executor(None, get_txids)
|
||||||
txs = await bp.db.fs_transactions(txids)
|
txs = await bp.db.get_transactions_and_merkles(txids)
|
||||||
block_txs = (await bp.daemon.deserialised_block(block_hash))['tx']
|
block_txs = (await bp.daemon.deserialised_block(block_hash))['tx']
|
||||||
self.assertSetEqual(set(block_txs), set(txs.keys()), msg='leveldb/lbrycrd is missing transactions')
|
self.assertSetEqual(set(block_txs), set(txs.keys()), msg='leveldb/lbrycrd is missing transactions')
|
||||||
self.assertListEqual(block_txs, list(txs.keys()), msg='leveldb/lbrycrd transactions are of order')
|
self.assertListEqual(block_txs, list(txs.keys()), msg='leveldb/lbrycrd transactions are of order')
|
||||||
|
|
|
@ -1458,7 +1458,7 @@ class ResolveAfterReorg(BaseResolveTestCase):
|
||||||
txids = [
|
txids = [
|
||||||
tx_hash[::-1].hex() for tx_hash in bp.db.get_block_txs(height)
|
tx_hash[::-1].hex() for tx_hash in bp.db.get_block_txs(height)
|
||||||
]
|
]
|
||||||
txs = await bp.db.fs_transactions(txids)
|
txs = await bp.db.get_transactions_and_merkles(txids)
|
||||||
block_txs = (await bp.daemon.deserialised_block(block_hash))['tx']
|
block_txs = (await bp.daemon.deserialised_block(block_hash))['tx']
|
||||||
self.assertSetEqual(set(block_txs), set(txs.keys()), msg='leveldb/lbrycrd is missing transactions')
|
self.assertSetEqual(set(block_txs), set(txs.keys()), msg='leveldb/lbrycrd is missing transactions')
|
||||||
self.assertListEqual(block_txs, list(txs.keys()), msg='leveldb/lbrycrd transactions are of order')
|
self.assertListEqual(block_txs, list(txs.keys()), msg='leveldb/lbrycrd transactions are of order')
|
||||||
|
|
Loading…
Add table
Reference in a new issue