From f46d9330b0809d8b06f13053ac7f60d3d849323a Mon Sep 17 00:00:00 2001 From: Jack Robison Date: Sat, 9 Oct 2021 14:32:30 -0400 Subject: [PATCH 01/19] smaller caches --- lbry/wallet/server/daemon.py | 2 +- lbry/wallet/server/leveldb.py | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/lbry/wallet/server/daemon.py b/lbry/wallet/server/daemon.py index 123f17f3b..c487de0c7 100644 --- a/lbry/wallet/server/daemon.py +++ b/lbry/wallet/server/daemon.py @@ -55,7 +55,7 @@ class Daemon: self.available_rpcs = {} self.connector = aiohttp.TCPConnector() self._block_hash_cache = LRUCacheWithMetrics(100000) - self._block_cache = LRUCacheWithMetrics(2 ** 16, metric_name='block', namespace=NAMESPACE) + self._block_cache = LRUCacheWithMetrics(2 ** 13, metric_name='block', namespace=NAMESPACE) async def close(self): if self.connector: diff --git a/lbry/wallet/server/leveldb.py b/lbry/wallet/server/leveldb.py index 44b38fced..53fbe21e3 100644 --- a/lbry/wallet/server/leveldb.py +++ b/lbry/wallet/server/leveldb.py @@ -112,7 +112,7 @@ class LevelDB: self.merkle = Merkle() self.header_mc = MerkleCache(self.merkle, self.fs_block_hashes) - self._tx_and_merkle_cache = LRUCacheWithMetrics(2 ** 17, metric_name='tx_and_merkle', namespace="wallet_server") + self._tx_and_merkle_cache = LRUCacheWithMetrics(2 ** 16, metric_name='tx_and_merkle', namespace="wallet_server") self.claim_to_txo: Dict[bytes, ClaimToTXOValue] = {} self.txo_to_claim: DefaultDict[int, Dict[int, bytes]] = defaultdict(dict) From f5e0ef522318c8b4e0749ff370bc68104641fca3 Mon Sep 17 00:00:00 2001 From: Jack Robison Date: Tue, 12 Oct 2021 00:03:11 -0400 Subject: [PATCH 02/19] add block_txs index --- lbry/wallet/server/block_processor.py | 1 + lbry/wallet/server/db/__init__.py | 1 + lbry/wallet/server/db/prefixes.py | 39 +++++++++++++++++++++++++++ lbry/wallet/server/leveldb.py | 7 +---- 4 files changed, 42 insertions(+), 6 deletions(-) diff --git a/lbry/wallet/server/block_processor.py b/lbry/wallet/server/block_processor.py index 9be9cf758..e59b470db 100644 --- a/lbry/wallet/server/block_processor.py +++ b/lbry/wallet/server/block_processor.py @@ -1440,6 +1440,7 @@ class BlockProcessor: self.db.prefix_db.block_hash.stage_put(key_args=(height,), value_args=(self.coin.header_hash(block.header),)) self.db.prefix_db.header.stage_put(key_args=(height,), value_args=(block.header,)) + self.db.prefix_db.block_txs.stage_put(key_args=(height,), value_args=([tx_hash for tx, tx_hash in txs],)) for tx, tx_hash in txs: spent_claims = {} diff --git a/lbry/wallet/server/db/__init__.py b/lbry/wallet/server/db/__init__.py index b3201dc79..7da046edc 100644 --- a/lbry/wallet/server/db/__init__.py +++ b/lbry/wallet/server/db/__init__.py @@ -39,3 +39,4 @@ class DB_PREFIXES(enum.Enum): db_state = b's' channel_count = b'Z' support_amount = b'a' + block_txs = b'b' diff --git a/lbry/wallet/server/db/prefixes.py b/lbry/wallet/server/db/prefixes.py index 204babe1e..8b1603312 100644 --- a/lbry/wallet/server/db/prefixes.py +++ b/lbry/wallet/server/db/prefixes.py @@ -169,6 +169,14 @@ class BlockHashValue(NamedTuple): return f"{self.__class__.__name__}(block_hash={self.block_hash.hex()})" +class BlockTxsKey(NamedTuple): + height: int + + +class BlockTxsValue(NamedTuple): + tx_hashes: typing.List[bytes] + + class TxCountKey(NamedTuple): height: int @@ -1540,6 +1548,36 @@ class DBStatePrefixRow(PrefixRow): ) +class BlockTxsPrefixRow(PrefixRow): + prefix = DB_PREFIXES.block_txs.value + key_struct = struct.Struct(b'>L') + key_part_lambdas = [ + lambda: b'', + 
struct.Struct(b'>L').pack + ] + + @classmethod + def pack_key(cls, height: int): + return super().pack_key(height) + + @classmethod + def unpack_key(cls, key: bytes) -> BlockTxsKey: + return BlockTxsKey(*super().unpack_key(key)) + + @classmethod + def pack_value(cls, tx_hashes: typing.List[bytes]) -> bytes: + assert all(len(tx_hash) == 32 for tx_hash in tx_hashes) + return b''.join(tx_hashes) + + @classmethod + def unpack_value(cls, data: bytes) -> BlockTxsValue: + return BlockTxsValue([data[i*32:(i+1)*32] for i in range(len(data) // 32)]) + + @classmethod + def pack_item(cls, height, tx_hashes): + return cls.pack_key(height), cls.pack_value(tx_hashes) + + class LevelDBStore(KeyValueStorage): def __init__(self, path: str, cache_mb: int, max_open_files: int): import plyvel @@ -1604,6 +1642,7 @@ class HubDB(PrefixDB): self.channel_count = ChannelCountPrefixRow(db, self._op_stack) self.db_state = DBStatePrefixRow(db, self._op_stack) self.support_amount = SupportAmountPrefixRow(db, self._op_stack) + self.block_txs = BlockTxsPrefixRow(db, self._op_stack) def auto_decode_item(key: bytes, value: bytes) -> Union[Tuple[NamedTuple, NamedTuple], Tuple[bytes, bytes]]: diff --git a/lbry/wallet/server/leveldb.py b/lbry/wallet/server/leveldb.py index 53fbe21e3..5de62bce6 100644 --- a/lbry/wallet/server/leveldb.py +++ b/lbry/wallet/server/leveldb.py @@ -923,12 +923,7 @@ class LevelDB: return None, tx_height def get_block_txs(self, height: int) -> List[bytes]: - return [ - tx_hash for tx_hash in self.prefix_db.tx_hash.iterate( - start=(self.tx_counts[height-1],), stop=(self.tx_counts[height],), - deserialize_value=False, include_key=False - ) - ] + return self.prefix_db.block_txs.get(height).tx_hashes def _fs_transactions(self, txids: Iterable[str]): tx_counts = self.tx_counts From eabcc303678736c7dd91f90c654b9dcb148c1c49 Mon Sep 17 00:00:00 2001 From: Jack Robison Date: Wed, 13 Oct 2021 10:18:03 -0400 Subject: [PATCH 03/19] resolve lru cache --- lbry/wallet/server/block_processor.py | 5 ++++- lbry/wallet/server/db/elasticsearch/search.py | 19 ------------------- lbry/wallet/server/session.py | 14 +++++++------- 3 files changed, 11 insertions(+), 27 deletions(-) diff --git a/lbry/wallet/server/block_processor.py b/lbry/wallet/server/block_processor.py index e59b470db..f2c0bcf9a 100644 --- a/lbry/wallet/server/block_processor.py +++ b/lbry/wallet/server/block_processor.py @@ -8,9 +8,10 @@ from prometheus_client import Gauge, Histogram from collections import defaultdict import lbry +from lbry.schema.url import URL from lbry.schema.claim import Claim from lbry.wallet.ledger import Ledger, TestNetLedger, RegTestLedger - +from lbry.utils import LRUCache from lbry.wallet.transaction import OutputScript, Output, Transaction from lbry.wallet.server.tx import Tx, TxOutput, TxInput from lbry.wallet.server.daemon import DaemonError @@ -231,6 +232,7 @@ class BlockProcessor: self.db_op_stack: Optional[RevertableOpStack] = None # self.search_cache = {} + self.resolve_cache = LRUCache(2**16) self.history_cache = {} self.status_server = StatusServer() @@ -1581,6 +1583,7 @@ class BlockProcessor: self.pending_transaction_num_mapping.clear() self.pending_transactions.clear() self.pending_support_amount_change.clear() + self.resolve_cache.clear() async def backup_block(self): assert len(self.db.prefix_db._op_stack) == 0 diff --git a/lbry/wallet/server/db/elasticsearch/search.py b/lbry/wallet/server/db/elasticsearch/search.py index 14b47677b..e7a8b58af 100644 --- a/lbry/wallet/server/db/elasticsearch/search.py +++ 
b/lbry/wallet/server/db/elasticsearch/search.py @@ -51,9 +51,7 @@ class SearchIndex: self.index = index_prefix + 'claims' self.logger = class_logger(__name__, self.__class__.__name__) self.claim_cache = LRUCache(2 ** 15) - self.short_id_cache = LRUCache(2 ** 17) self.search_cache = LRUCache(2 ** 17) - self.resolution_cache = LRUCache(2 ** 17) self._elastic_host = elastic_host self._elastic_port = elastic_port self._trending_half_life = half_life @@ -260,9 +258,7 @@ class SearchIndex: def clear_caches(self): self.search_cache.clear() - self.short_id_cache.clear() self.claim_cache.clear() - self.resolution_cache.clear() async def cached_search(self, kwargs): total_referenced = [] @@ -354,21 +350,6 @@ class SearchIndex: for result in expand_result(filter(lambda doc: doc['found'], results["docs"])): self.claim_cache.set(result['claim_id'], result) - async def full_id_from_short_id(self, name, short_id, channel_id=None): - key = '#'.join((channel_id or '', name, short_id)) - if key not in self.short_id_cache: - query = {'name': name, 'claim_id': short_id} - if channel_id: - query['channel_id'] = channel_id - query['order_by'] = ['^channel_join'] - query['signature_valid'] = True - else: - query['order_by'] = '^creation_height' - result, _, _ = await self.search(**query, limit=1) - if len(result) == 1: - result = result[0]['claim_id'] - self.short_id_cache[key] = result - return self.short_id_cache.get(key, None) async def search(self, **kwargs): try: diff --git a/lbry/wallet/server/session.py b/lbry/wallet/server/session.py index c51fc76e4..f49ea5225 100644 --- a/lbry/wallet/server/session.py +++ b/lbry/wallet/server/session.py @@ -1036,11 +1036,16 @@ class LBRYElectrumX(SessionBase): self.session_mgr.pending_query_metric.dec() self.session_mgr.executor_time_metric.observe(time.perf_counter() - start) - def _claimtrie_resolve(self, *urls): + async def _cached_resolve_url(self, url): + if url not in self.bp.resolve_cache: + self.bp.resolve_cache[url] = await self.loop.run_in_executor(None, self.db._resolve, url) + return self.bp.resolve_cache[url] + + async def claimtrie_resolve(self, *urls): rows, extra = [], [] for url in urls: self.session_mgr.urls_to_resolve_count_metric.inc() - stream, channel, repost, reposted_channel = self.db._resolve(url) + stream, channel, repost, reposted_channel = await self._cached_resolve_url(url) if isinstance(channel, ResolveCensoredError): rows.append(channel) extra.append(channel.censor_row) @@ -1067,11 +1072,6 @@ class LBRYElectrumX(SessionBase): # print("claimtrie resolve %i rows %i extrat" % (len(rows), len(extra))) return Outputs.to_base64(rows, extra, 0, None, None) - async def claimtrie_resolve(self, *urls): - result = await self.loop.run_in_executor(None, self._claimtrie_resolve, *urls) - self.session_mgr.resolved_url_count_metric.inc(len(urls)) - return result - async def get_server_height(self): return self.bp.height From 05e5d24c5e3e1aa7f2e90c22ce25c70ffaecec7c Mon Sep 17 00:00:00 2001 From: Jack Robison Date: Wed, 13 Oct 2021 14:09:16 -0400 Subject: [PATCH 04/19] improve claims_producer performance --- lbry/wallet/server/leveldb.py | 69 ++++++++++++++++------------------- 1 file changed, 32 insertions(+), 37 deletions(-) diff --git a/lbry/wallet/server/leveldb.py b/lbry/wallet/server/leveldb.py index 5de62bce6..16176b3b5 100644 --- a/lbry/wallet/server/leveldb.py +++ b/lbry/wallet/server/leveldb.py @@ -697,49 +697,44 @@ class LevelDB: batch.clear() async def claims_producer(self, claim_hashes: Set[bytes]): - batch = [] - results = [] - loop = 
asyncio.get_event_loop() - def produce_claim(claim_hash): - if claim_hash not in self.claim_to_txo: - self.logger.warning("can't sync non existent claim to ES: %s", claim_hash.hex()) - return - name = self.claim_to_txo[claim_hash].normalized_name - if not self.prefix_db.claim_takeover.get(name): - self.logger.warning("can't sync non existent claim to ES: %s", claim_hash.hex()) - return - claim_txo = self.claim_to_txo.get(claim_hash) - if not claim_txo: - return - activation = self.get_activation(claim_txo.tx_num, claim_txo.position) - claim = self._prepare_resolve_result( - claim_txo.tx_num, claim_txo.position, claim_hash, claim_txo.name, claim_txo.root_tx_num, - claim_txo.root_position, activation, claim_txo.channel_signature_is_valid - ) - if claim: - batch.append(claim) + def produce_claims(claims): + batch = [] + _results = [] - def get_metadata(claim): - meta = self._prepare_claim_metadata(claim.claim_hash, claim) - if meta: - results.append(meta) + for claim_hash in claims: + if claim_hash not in self.claim_to_txo: + self.logger.warning("can't sync non existent claim to ES: %s", claim_hash.hex()) + continue + name = self.claim_to_txo[claim_hash].normalized_name + if not self.prefix_db.claim_takeover.get(name): + self.logger.warning("can't sync non existent claim to ES: %s", claim_hash.hex()) + continue + claim_txo = self.claim_to_txo.get(claim_hash) + if not claim_txo: + continue + activation = self.get_activation(claim_txo.tx_num, claim_txo.position) + claim = self._prepare_resolve_result( + claim_txo.tx_num, claim_txo.position, claim_hash, claim_txo.name, claim_txo.root_tx_num, + claim_txo.root_position, activation, claim_txo.channel_signature_is_valid + ) + if claim: + batch.append(claim) + + batch.sort(key=lambda x: x.tx_hash) + + for claim in batch: + _meta = self._prepare_claim_metadata(claim.claim_hash, claim) + if _meta: + _results.append(_meta) + return _results if claim_hashes: - await asyncio.wait( - [loop.run_in_executor(None, produce_claim, claim_hash) for claim_hash in claim_hashes] - ) - batch.sort(key=lambda x: x.tx_hash) + results = await loop.run_in_executor(None, produce_claims, claim_hashes) - if batch: - await asyncio.wait( - [loop.run_in_executor(None, get_metadata, claim) for claim in batch] - ) - for meta in results: - yield meta - - batch.clear() + for meta in results: + yield meta def get_activated_at_height(self, height: int) -> DefaultDict[PendingActivationValue, List[PendingActivationKey]]: activated = defaultdict(list) From 22b43a2b0103dadc0c951f9f21981b78c71c68ef Mon Sep 17 00:00:00 2001 From: Jack Robison Date: Thu, 14 Oct 2021 13:17:16 -0400 Subject: [PATCH 05/19] doc strings --- lbry/wallet/server/db/revertable.py | 24 ++++++++++++++++++++++++ 1 file changed, 24 insertions(+) diff --git a/lbry/wallet/server/db/revertable.py b/lbry/wallet/server/db/revertable.py index e82c36f12..2a05f2a7d 100644 --- a/lbry/wallet/server/db/revertable.py +++ b/lbry/wallet/server/db/revertable.py @@ -83,11 +83,26 @@ class OpStackIntegrity(Exception): class RevertableOpStack: def __init__(self, get_fn: Callable[[bytes], Optional[bytes]], unsafe_prefixes=None): + """ + This represents a sequence of revertable puts and deletes to a key-value database that checks for integrity + violations when applying the puts and deletes. The integrity checks assure that keys that do not exist + are not deleted, and that when keys are deleted the current value is correctly known so that the delete + may be undone. 
When putting values, the integrity checks assure that existing values are not overwritten + without first being deleted. Updates are performed by applying a delete op for the old value and a put op + for the new value. + + :param get_fn: getter function from an object implementing `KeyValueStorage` + :param unsafe_prefixes: optional set of prefixes to ignore integrity errors for, violations are still logged + """ self._get = get_fn self._items = defaultdict(list) self._unsafe_prefixes = unsafe_prefixes or set() def append_op(self, op: RevertableOp): + """ + Apply a put or delete op, checking that it introduces no integrity errors + """ + inverted = op.invert() if self._items[op.key] and inverted == self._items[op.key][-1]: self._items[op.key].pop() # if the new op is the inverse of the last op, we can safely null both @@ -119,6 +134,9 @@ class RevertableOpStack: self._items[op.key].append(op) def extend_ops(self, ops: Iterable[RevertableOp]): + """ + Apply a sequence of put or delete ops, checking that they introduce no integrity errors + """ for op in ops: self.append_op(op) @@ -139,9 +157,15 @@ class RevertableOpStack: yield op def get_undo_ops(self) -> bytes: + """ + Get the serialized bytes to undo all of the changes made by the pending ops + """ return b''.join(op.invert().pack() for op in reversed(self)) def apply_packed_undo_ops(self, packed: bytes): + """ + Unpack and apply a sequence of undo ops from serialized undo bytes + """ while packed: op, packed = RevertableOp.unpack(packed) self.append_op(op) From 6416d8ce9c3903287ddfabc10ce00c958d144d81 Mon Sep 17 00:00:00 2001 From: Jack Robison Date: Thu, 14 Oct 2021 15:40:20 -0400 Subject: [PATCH 06/19] threadpools for block processor and es sync reader --- lbry/wallet/server/block_processor.py | 19 +++++--- lbry/wallet/server/leveldb.py | 63 ++++++++++++--------------- lbry/wallet/server/server.py | 2 +- lbry/wallet/server/session.py | 30 +------------ 4 files changed, 43 insertions(+), 71 deletions(-) diff --git a/lbry/wallet/server/block_processor.py b/lbry/wallet/server/block_processor.py index f2c0bcf9a..523386951 100644 --- a/lbry/wallet/server/block_processor.py +++ b/lbry/wallet/server/block_processor.py @@ -3,6 +3,7 @@ import asyncio import typing from bisect import bisect_right from struct import pack, unpack +from concurrent.futures.thread import ThreadPoolExecutor from typing import Optional, List, Tuple, Set, DefaultDict, Dict, NamedTuple from prometheus_client import Gauge, Histogram from collections import defaultdict @@ -203,6 +204,8 @@ class BlockProcessor: self.env = env self.db = db self.daemon = daemon + self._chain_executor = ThreadPoolExecutor(1, thread_name_prefix='block-processor') + self._sync_reader_executor = ThreadPoolExecutor(1, thread_name_prefix='hub-es-sync') self.mempool = MemPool(env.coin, daemon, db, self.state_lock) self.shutdown_event = shutdown_event self.coin = env.coin @@ -299,7 +302,11 @@ class BlockProcessor: for claim_hash in self.removed_claims_to_send_es: yield 'delete', claim_hash.hex() - async for claim in self.db.claims_producer(self.touched_claims_to_send_es): + + to_update = await asyncio.get_event_loop().run_in_executor( + self._sync_reader_executor, self.db.claims_producer, self.touched_claims_to_send_es + ) + for claim in to_update: yield 'update', claim async def run_in_thread_with_lock(self, func, *args): @@ -310,13 +317,12 @@ class BlockProcessor: # consistent and not being updated elsewhere. 
async def run_in_thread_locked(): async with self.state_lock: - return await asyncio.get_event_loop().run_in_executor(None, func, *args) + return await asyncio.get_event_loop().run_in_executor(self._chain_executor, func, *args) return await asyncio.shield(run_in_thread_locked()) - @staticmethod - async def run_in_thread(func, *args): + async def run_in_thread(self, func, *args): async def run_in_thread(): - return await asyncio.get_event_loop().run_in_executor(None, func, *args) + return await asyncio.get_event_loop().run_in_executor(self._chain_executor, func, *args) return await asyncio.shield(run_in_thread()) async def check_and_advance_blocks(self, raw_blocks): @@ -1746,5 +1752,6 @@ class BlockProcessor: self.status_server.stop() # Shut down block processing self.logger.info('closing the DB for a clean shutdown...') + self._sync_reader_executor.shutdown(wait=True) + self._chain_executor.shutdown(wait=True) self.db.close() - # self.executor.shutdown(wait=True) diff --git a/lbry/wallet/server/leveldb.py b/lbry/wallet/server/leveldb.py index 16176b3b5..0d02f9cbb 100644 --- a/lbry/wallet/server/leveldb.py +++ b/lbry/wallet/server/leveldb.py @@ -696,45 +696,36 @@ class LevelDB: yield meta batch.clear() - async def claims_producer(self, claim_hashes: Set[bytes]): - loop = asyncio.get_event_loop() + def claims_producer(self, claim_hashes: Set[bytes]): + batch = [] + results = [] - def produce_claims(claims): - batch = [] - _results = [] + for claim_hash in claim_hashes: + if claim_hash not in self.claim_to_txo: + self.logger.warning("can't sync non existent claim to ES: %s", claim_hash.hex()) + continue + name = self.claim_to_txo[claim_hash].normalized_name + if not self.prefix_db.claim_takeover.get(name): + self.logger.warning("can't sync non existent claim to ES: %s", claim_hash.hex()) + continue + claim_txo = self.claim_to_txo.get(claim_hash) + if not claim_txo: + continue + activation = self.get_activation(claim_txo.tx_num, claim_txo.position) + claim = self._prepare_resolve_result( + claim_txo.tx_num, claim_txo.position, claim_hash, claim_txo.name, claim_txo.root_tx_num, + claim_txo.root_position, activation, claim_txo.channel_signature_is_valid + ) + if claim: + batch.append(claim) - for claim_hash in claims: - if claim_hash not in self.claim_to_txo: - self.logger.warning("can't sync non existent claim to ES: %s", claim_hash.hex()) - continue - name = self.claim_to_txo[claim_hash].normalized_name - if not self.prefix_db.claim_takeover.get(name): - self.logger.warning("can't sync non existent claim to ES: %s", claim_hash.hex()) - continue - claim_txo = self.claim_to_txo.get(claim_hash) - if not claim_txo: - continue - activation = self.get_activation(claim_txo.tx_num, claim_txo.position) - claim = self._prepare_resolve_result( - claim_txo.tx_num, claim_txo.position, claim_hash, claim_txo.name, claim_txo.root_tx_num, - claim_txo.root_position, activation, claim_txo.channel_signature_is_valid - ) - if claim: - batch.append(claim) + batch.sort(key=lambda x: x.tx_hash) - batch.sort(key=lambda x: x.tx_hash) - - for claim in batch: - _meta = self._prepare_claim_metadata(claim.claim_hash, claim) - if _meta: - _results.append(_meta) - return _results - - if claim_hashes: - results = await loop.run_in_executor(None, produce_claims, claim_hashes) - - for meta in results: - yield meta + for claim in batch: + _meta = self._prepare_claim_metadata(claim.claim_hash, claim) + if _meta: + results.append(_meta) + return results def get_activated_at_height(self, height: int) -> 
DefaultDict[PendingActivationValue, List[PendingActivationKey]]: activated = defaultdict(list) diff --git a/lbry/wallet/server/server.py b/lbry/wallet/server/server.py index 2a0a2111e..966d5c31e 100644 --- a/lbry/wallet/server/server.py +++ b/lbry/wallet/server/server.py @@ -69,7 +69,7 @@ class Server: def run(self): loop = asyncio.get_event_loop() - executor = ThreadPoolExecutor(self.env.max_query_workers) + executor = ThreadPoolExecutor(self.env.max_query_workers, thread_name_prefix='hub-worker') loop.set_default_executor(executor) def __exit(): diff --git a/lbry/wallet/server/session.py b/lbry/wallet/server/session.py index f49ea5225..dfd81f761 100644 --- a/lbry/wallet/server/session.py +++ b/lbry/wallet/server/session.py @@ -633,7 +633,7 @@ class SessionManager: self.mempool_statuses.pop(hashX, None) await asyncio.get_event_loop().run_in_executor( - None, touched.intersection_update, self.hashx_subscriptions_by_session.keys() + self.bp._chain_executor, touched.intersection_update, self.hashx_subscriptions_by_session.keys() ) if touched or new_touched or (height_changed and self.mempool_statuses): @@ -775,10 +775,9 @@ class LBRYSessionManager(SessionManager): def __init__(self, *args, **kwargs): super().__init__(*args, **kwargs) - self.query_executor = None self.websocket = None # self.metrics = ServerLoadData() - self.metrics_loop = None + # self.metrics_loop = None self.running = False if self.env.websocket_host is not None and self.env.websocket_port is not None: self.websocket = AdminWebSocket(self) @@ -795,12 +794,6 @@ class LBRYSessionManager(SessionManager): async def start_other(self): self.running = True - if self.env.max_query_workers is not None and self.env.max_query_workers == 0: - self.query_executor = ThreadPoolExecutor(max_workers=1) - else: - self.query_executor = ProcessPoolExecutor( - max_workers=self.env.max_query_workers or max(os.cpu_count(), 4) - ) if self.websocket is not None: await self.websocket.start() @@ -808,7 +801,6 @@ class LBRYSessionManager(SessionManager): self.running = False if self.websocket is not None: await self.websocket.stop() - self.query_executor.shutdown() class LBRYElectrumX(SessionBase): @@ -971,24 +963,6 @@ class LBRYElectrumX(SessionBase): # else: # return APICallMetrics(query_name) - async def run_in_executor(self, query_name, func, kwargs): - start = time.perf_counter() - try: - self.session_mgr.pending_query_metric.inc() - result = await asyncio.get_running_loop().run_in_executor( - self.session_mgr.query_executor, func, kwargs - ) - except asyncio.CancelledError: - raise - except Exception: - log.exception("dear devs, please handle this exception better") - self.session_mgr.db_error_metric.inc() - raise RPCError(JSONRPC.INTERNAL_ERROR, 'unknown server error') - else: - return base64.b64encode(result).decode() - finally: - self.session_mgr.pending_query_metric.dec() - self.session_mgr.executor_time_metric.observe(time.perf_counter() - start) # async def run_and_cache_query(self, query_name, kwargs): # start = time.perf_counter() From 99df418f1de3434161f6a8ae38337a9722949615 Mon Sep 17 00:00:00 2001 From: Jack Robison Date: Thu, 14 Oct 2021 16:08:28 -0400 Subject: [PATCH 07/19] improve resolve caching --- lbry/wallet/server/block_processor.py | 3 ++ lbry/wallet/server/session.py | 76 +++++++++++++++++---------- 2 files changed, 50 insertions(+), 29 deletions(-) diff --git a/lbry/wallet/server/block_processor.py b/lbry/wallet/server/block_processor.py index 523386951..f541ed0d1 100644 --- a/lbry/wallet/server/block_processor.py +++ 
b/lbry/wallet/server/block_processor.py @@ -236,6 +236,8 @@ class BlockProcessor: # self.search_cache = {} self.resolve_cache = LRUCache(2**16) + self.resolve_outputs_cache = LRUCache(2 ** 16) + self.history_cache = {} self.status_server = StatusServer() @@ -1590,6 +1592,7 @@ class BlockProcessor: self.pending_transactions.clear() self.pending_support_amount_change.clear() self.resolve_cache.clear() + self.resolve_outputs_cache.clear() async def backup_block(self): assert len(self.db.prefix_db._op_stack) == 0 diff --git a/lbry/wallet/server/session.py b/lbry/wallet/server/session.py index dfd81f761..9a9346880 100644 --- a/lbry/wallet/server/session.py +++ b/lbry/wallet/server/session.py @@ -1016,35 +1016,53 @@ class LBRYElectrumX(SessionBase): return self.bp.resolve_cache[url] async def claimtrie_resolve(self, *urls): - rows, extra = [], [] - for url in urls: - self.session_mgr.urls_to_resolve_count_metric.inc() - stream, channel, repost, reposted_channel = await self._cached_resolve_url(url) - if isinstance(channel, ResolveCensoredError): - rows.append(channel) - extra.append(channel.censor_row) - elif isinstance(stream, ResolveCensoredError): - rows.append(stream) - extra.append(stream.censor_row) - elif channel and not stream: - rows.append(channel) - # print("resolved channel", channel.name.decode()) - if repost: - extra.append(repost) - if reposted_channel: - extra.append(reposted_channel) - elif stream: - # print("resolved stream", stream.name.decode()) - rows.append(stream) - if channel: - # print("and channel", channel.name.decode()) - extra.append(channel) - if repost: - extra.append(repost) - if reposted_channel: - extra.append(reposted_channel) - # print("claimtrie resolve %i rows %i extrat" % (len(rows), len(extra))) - return Outputs.to_base64(rows, extra, 0, None, None) + sorted_urls = tuple(sorted(urls)) + self.session_mgr.urls_to_resolve_count_metric.inc(len(sorted_urls)) + + def _cached_resolve(): + rows, extra = [], [] + + for url in urls: + if url not in self.bp.resolve_cache: + self.bp.resolve_cache[url] = self.db._resolve(url) + stream, channel, repost, reposted_channel = self.bp.resolve_cache[url] + if isinstance(channel, ResolveCensoredError): + rows.append(channel) + extra.append(channel.censor_row) + elif isinstance(stream, ResolveCensoredError): + rows.append(stream) + extra.append(stream.censor_row) + elif channel and not stream: + rows.append(channel) + # print("resolved channel", channel.name.decode()) + if repost: + extra.append(repost) + if reposted_channel: + extra.append(reposted_channel) + elif stream: + # print("resolved stream", stream.name.decode()) + rows.append(stream) + if channel: + # print("and channel", channel.name.decode()) + extra.append(channel) + if repost: + extra.append(repost) + if reposted_channel: + extra.append(reposted_channel) + # print("claimtrie resolve %i rows %i extrat" % (len(rows), len(extra))) + self.bp.resolve_outputs_cache[sorted_urls] = serialized_outputs = Outputs.to_base64( + rows, extra, 0, None, None + ) + return serialized_outputs + + try: + if sorted_urls in self.bp.resolve_outputs_cache: + return self.bp.resolve_outputs_cache[sorted_urls] + else: + + return await self.loop.run_in_executor(None, _cached_resolve) + finally: + self.session_mgr.resolved_url_count_metric.inc(len(sorted_urls)) async def get_server_height(self): return self.bp.height From 7ea1a2b361c262eb25dcb55b3d8c3067d926ffea Mon Sep 17 00:00:00 2001 From: Jack Robison Date: Thu, 14 Oct 2021 18:57:46 -0400 Subject: [PATCH 08/19] sleeps --- 
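This patch breaks up long-running request handlers (transaction/merkle
lookups, batched address subscriptions, cached resolve) with
`await asyncio.sleep(0)` so they yield to the event loop between items
instead of blocking it for a whole batch. A minimal sketch of the
pattern follows; the names (`process_batch`, `handle_item`) are
illustrative only and not part of this patch:

    import asyncio

    async def process_batch(items, handle_item):
        # Handle one item at a time, yielding to the event loop after each
        # so concurrent coroutines (other RPC handlers, notifications) are
        # not starved while a large batch is processed.
        results = []
        for item in items:
            results.append(handle_item(item))
            await asyncio.sleep(0)  # cooperative yield; adds no delay
        return results
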
lbry/wallet/server/leveldb.py | 81 +++++++++---------- lbry/wallet/server/session.py | 39 +++++---- .../test_blockchain_reorganization.py | 2 +- .../takeovers/test_resolve_command.py | 2 +- 4 files changed, 61 insertions(+), 63 deletions(-) diff --git a/lbry/wallet/server/leveldb.py b/lbry/wallet/server/leveldb.py index 0d02f9cbb..b5fbdc313 100644 --- a/lbry/wallet/server/leveldb.py +++ b/lbry/wallet/server/leveldb.py @@ -911,50 +911,49 @@ class LevelDB: def get_block_txs(self, height: int) -> List[bytes]: return self.prefix_db.block_txs.get(height).tx_hashes - def _fs_transactions(self, txids: Iterable[str]): - tx_counts = self.tx_counts - tx_db_get = self.prefix_db.tx.get - tx_cache = self._tx_and_merkle_cache + async def get_transactions_and_merkles(self, tx_hashes: Iterable[str]): tx_infos = {} - - for tx_hash in txids: - cached_tx = tx_cache.get(tx_hash) - if cached_tx: - tx, merkle = cached_tx - else: - tx_hash_bytes = bytes.fromhex(tx_hash)[::-1] - tx_num = self.prefix_db.tx_num.get(tx_hash_bytes) - tx = None - tx_height = -1 - tx_num = None if not tx_num else tx_num.tx_num - if tx_num is not None: - fill_cache = tx_num in self.txo_to_claim and len(self.txo_to_claim[tx_num]) > 0 - tx_height = bisect_right(tx_counts, tx_num) - tx = tx_db_get(tx_hash_bytes, fill_cache=fill_cache, deserialize_value=False) - if tx_height == -1: - merkle = { - 'block_height': -1 - } - else: - tx_pos = tx_num - tx_counts[tx_height - 1] - branch, root = self.merkle.branch_and_root( - self.get_block_txs(tx_height), tx_pos - ) - merkle = { - 'block_height': tx_height, - 'merkle': [ - hash_to_hex_str(hash) - for hash in branch - ], - 'pos': tx_pos - } - if tx_height + 10 < self.db_height: - tx_cache[tx_hash] = tx, merkle - tx_infos[tx_hash] = (None if not tx else tx.hex(), merkle) + for tx_hash in tx_hashes: + tx_infos[tx_hash] = await asyncio.get_event_loop().run_in_executor( + None, self._get_transaction_and_merkle, tx_hash + ) + await asyncio.sleep(0) return tx_infos - async def fs_transactions(self, txids): - return await asyncio.get_event_loop().run_in_executor(None, self._fs_transactions, txids) + def _get_transaction_and_merkle(self, tx_hash): + cached_tx = self._tx_and_merkle_cache.get(tx_hash) + if cached_tx: + tx, merkle = cached_tx + else: + tx_hash_bytes = bytes.fromhex(tx_hash)[::-1] + tx_num = self.prefix_db.tx_num.get(tx_hash_bytes) + tx = None + tx_height = -1 + tx_num = None if not tx_num else tx_num.tx_num + if tx_num is not None: + fill_cache = tx_num in self.txo_to_claim and len(self.txo_to_claim[tx_num]) > 0 + tx_height = bisect_right(self.tx_counts, tx_num) + tx = self.prefix_db.tx.get(tx_hash_bytes, fill_cache=fill_cache, deserialize_value=False) + if tx_height == -1: + merkle = { + 'block_height': -1 + } + else: + tx_pos = tx_num - self.tx_counts[tx_height - 1] + branch, root = self.merkle.branch_and_root( + self.get_block_txs(tx_height), tx_pos + ) + merkle = { + 'block_height': tx_height, + 'merkle': [ + hash_to_hex_str(hash) + for hash in branch + ], + 'pos': tx_pos + } + if tx_height + 10 < self.db_height: + self._tx_and_merkle_cache[tx_hash] = tx, merkle + return (None if not tx else tx.hex(), merkle) async def fs_block_hashes(self, height, count): if height + count > len(self.headers): diff --git a/lbry/wallet/server/session.py b/lbry/wallet/server/session.py index 9a9346880..ca4a0142f 100644 --- a/lbry/wallet/server/session.py +++ b/lbry/wallet/server/session.py @@ -548,6 +548,10 @@ class SessionManager: self._clear_stale_sessions(), self._manage_servers() ]) + except Exception 
as err: + if not isinstance(err, asyncio.CancelledError): + log.exception("hub server died") + raise err finally: await self._close_servers(list(self.servers.keys())) log.warning("disconnect %i sessions", len(self.sessions)) @@ -1015,16 +1019,16 @@ class LBRYElectrumX(SessionBase): self.bp.resolve_cache[url] = await self.loop.run_in_executor(None, self.db._resolve, url) return self.bp.resolve_cache[url] - async def claimtrie_resolve(self, *urls): + async def claimtrie_resolve(self, *urls) -> str: sorted_urls = tuple(sorted(urls)) self.session_mgr.urls_to_resolve_count_metric.inc(len(sorted_urls)) - - def _cached_resolve(): + try: + if sorted_urls in self.bp.resolve_outputs_cache: + return self.bp.resolve_outputs_cache[sorted_urls] rows, extra = [], [] - for url in urls: if url not in self.bp.resolve_cache: - self.bp.resolve_cache[url] = self.db._resolve(url) + self.bp.resolve_cache[url] = await self._cached_resolve_url(url) stream, channel, repost, reposted_channel = self.bp.resolve_cache[url] if isinstance(channel, ResolveCensoredError): rows.append(channel) @@ -1049,18 +1053,11 @@ class LBRYElectrumX(SessionBase): extra.append(repost) if reposted_channel: extra.append(reposted_channel) - # print("claimtrie resolve %i rows %i extrat" % (len(rows), len(extra))) - self.bp.resolve_outputs_cache[sorted_urls] = serialized_outputs = Outputs.to_base64( - rows, extra, 0, None, None + await asyncio.sleep(0) + self.bp.resolve_outputs_cache[sorted_urls] = result = await self.loop.run_in_executor( + None, Outputs.to_base64, rows, extra, 0, None, None ) - return serialized_outputs - - try: - if sorted_urls in self.bp.resolve_outputs_cache: - return self.bp.resolve_outputs_cache[sorted_urls] - else: - - return await self.loop.run_in_executor(None, _cached_resolve) + return result finally: self.session_mgr.resolved_url_count_metric.inc(len(sorted_urls)) @@ -1213,9 +1210,11 @@ class LBRYElectrumX(SessionBase): address: the address to subscribe to""" if len(addresses) > 1000: raise RPCError(BAD_REQUEST, f'too many addresses in subscription request: {len(addresses)}') - return [ - await self.hashX_subscribe(self.address_to_hashX(address), address) for address in addresses - ] + results = [] + for address in addresses: + results.append(await self.hashX_subscribe(self.address_to_hashX(address), address)) + await asyncio.sleep(0) + return results async def address_unsubscribe(self, address): """Unsubscribe an address. 
@@ -1464,7 +1463,7 @@ class LBRYElectrumX(SessionBase): raise RPCError(BAD_REQUEST, f'too many tx hashes in request: {len(tx_hashes)}') for tx_hash in tx_hashes: assert_tx_hash(tx_hash) - batch_result = await self.db.fs_transactions(tx_hashes) + batch_result = await self.db.get_transactions_and_merkles(tx_hashes) needed_merkles = {} for tx_hash in tx_hashes: diff --git a/tests/integration/blockchain/test_blockchain_reorganization.py b/tests/integration/blockchain/test_blockchain_reorganization.py index 72724a68e..621655add 100644 --- a/tests/integration/blockchain/test_blockchain_reorganization.py +++ b/tests/integration/blockchain/test_blockchain_reorganization.py @@ -23,7 +23,7 @@ class BlockchainReorganizationTests(CommandTestCase): self.assertEqual(block_hash, (await bp.db.fs_block_hashes(height, 1))[0][::-1].hex()) txids = await asyncio.get_event_loop().run_in_executor(None, get_txids) - txs = await bp.db.fs_transactions(txids) + txs = await bp.db.get_transactions_and_merkles(txids) block_txs = (await bp.daemon.deserialised_block(block_hash))['tx'] self.assertSetEqual(set(block_txs), set(txs.keys()), msg='leveldb/lbrycrd is missing transactions') self.assertListEqual(block_txs, list(txs.keys()), msg='leveldb/lbrycrd transactions are of order') diff --git a/tests/integration/takeovers/test_resolve_command.py b/tests/integration/takeovers/test_resolve_command.py index e1e1d18f7..b5ec87fc8 100644 --- a/tests/integration/takeovers/test_resolve_command.py +++ b/tests/integration/takeovers/test_resolve_command.py @@ -1458,7 +1458,7 @@ class ResolveAfterReorg(BaseResolveTestCase): txids = [ tx_hash[::-1].hex() for tx_hash in bp.db.get_block_txs(height) ] - txs = await bp.db.fs_transactions(txids) + txs = await bp.db.get_transactions_and_merkles(txids) block_txs = (await bp.daemon.deserialised_block(block_hash))['tx'] self.assertSetEqual(set(block_txs), set(txs.keys()), msg='leveldb/lbrycrd is missing transactions') self.assertListEqual(block_txs, list(txs.keys()), msg='leveldb/lbrycrd transactions are of order') From bfac02ccab1d4cc071f7d8eb250298b258a905d7 Mon Sep 17 00:00:00 2001 From: Jack Robison Date: Fri, 15 Oct 2021 20:32:11 -0400 Subject: [PATCH 09/19] add `CACHE_ALL_TX_HASHES` setting to optionally use more memory to save i/o --- lbry/wallet/server/block_processor.py | 16 ++++++--- lbry/wallet/server/env.py | 1 + lbry/wallet/server/leveldb.py | 49 +++++++++++++++++++++------ 3 files changed, 51 insertions(+), 15 deletions(-) diff --git a/lbry/wallet/server/block_processor.py b/lbry/wallet/server/block_processor.py index f541ed0d1..c24efe6f6 100644 --- a/lbry/wallet/server/block_processor.py +++ b/lbry/wallet/server/block_processor.py @@ -490,9 +490,7 @@ class BlockProcessor: if signing_channel: raw_channel_tx = self.db.prefix_db.tx.get( - self.db.prefix_db.tx_hash.get( - signing_channel.tx_num, deserialize_value=False - ), deserialize_value=False + self.db.get_tx_hash(signing_channel.tx_num), deserialize_value=False ) channel_pub_key_bytes = None try: @@ -1501,6 +1499,9 @@ class BlockProcessor: self._abandon_claim(abandoned_claim_hash, tx_num, nout, normalized_name) self.pending_transactions[tx_count] = tx_hash self.pending_transaction_num_mapping[tx_hash] = tx_count + if self.env.cache_all_tx_hashes: + self.db.total_transactions.append(tx_hash) + self.db.tx_num_mapping[tx_hash] = tx_count tx_count += 1 # handle expired claims @@ -1608,7 +1609,12 @@ class BlockProcessor: self.db.headers.pop() self.db.tx_counts.pop() self.tip = self.coin.header_hash(self.db.headers[-1]) - 
self.tx_count = self.db.tx_counts[-1] + if self.env.cache_all_tx_hashes: + while len(self.db.total_transactions) > self.db.tx_counts[-1]: + self.db.tx_num_mapping.pop(self.db.total_transactions.pop()) + self.tx_count -= 1 + else: + self.tx_count = self.db.tx_counts[-1] self.height -= 1 # self.touched can include other addresses which is # harmless, but remove None. @@ -1659,7 +1665,7 @@ class BlockProcessor: if tx_hash in self.pending_transaction_num_mapping: return self.pending_transaction_num_mapping[tx_hash] else: - return self.db.prefix_db.tx_num.get(tx_hash).tx_num + return self.db.get_tx_num(tx_hash) def spend_utxo(self, tx_hash: bytes, nout: int): hashX, amount = self.utxo_cache.pop((tx_hash, nout), (None, None)) diff --git a/lbry/wallet/server/env.py b/lbry/wallet/server/env.py index 2b4c489b3..82ce3d7fc 100644 --- a/lbry/wallet/server/env.py +++ b/lbry/wallet/server/env.py @@ -78,6 +78,7 @@ class Env: self.anon_logs = self.boolean('ANON_LOGS', False) self.log_sessions = self.integer('LOG_SESSIONS', 3600) self.allow_lan_udp = self.boolean('ALLOW_LAN_UDP', False) + self.cache_all_tx_hashes = self.boolean('CACHE_ALL_TX_HASHES', False) self.country = self.default('COUNTRY', 'US') # Peer discovery self.peer_discovery = self.peer_discovery_enum() diff --git a/lbry/wallet/server/leveldb.py b/lbry/wallet/server/leveldb.py index b5fbdc313..7348dc086 100644 --- a/lbry/wallet/server/leveldb.py +++ b/lbry/wallet/server/leveldb.py @@ -113,6 +113,8 @@ class LevelDB: self.header_mc = MerkleCache(self.merkle, self.fs_block_hashes) self._tx_and_merkle_cache = LRUCacheWithMetrics(2 ** 16, metric_name='tx_and_merkle', namespace="wallet_server") + self.total_transactions: List[bytes] = [] + self.tx_num_mapping: Dict[bytes, int] = {} self.claim_to_txo: Dict[bytes, ClaimToTXOValue] = {} self.txo_to_claim: DefaultDict[int, Dict[int, bytes]] = defaultdict(dict) @@ -201,7 +203,7 @@ class LevelDB: normalized_name = name controlling_claim = self.get_controlling_claim(normalized_name) - tx_hash = self.prefix_db.tx_hash.get(tx_num, deserialize_value=False) + tx_hash = self.get_tx_hash(tx_num) height = bisect_right(self.tx_counts, tx_num) created_height = bisect_right(self.tx_counts, root_tx_num) last_take_over_height = controlling_claim.height @@ -462,7 +464,7 @@ class LevelDB: def get_expired_by_height(self, height: int) -> Dict[bytes, Tuple[int, int, str, TxInput]]: expired = {} for k, v in self.prefix_db.claim_expiration.iterate(prefix=(height,)): - tx_hash = self.prefix_db.tx_hash.get(k.tx_num, deserialize_value=False) + tx_hash = self.get_tx_hash(k.tx_num) tx = self.coin.transaction(self.prefix_db.tx.get(tx_hash, deserialize_value=False)) # treat it like a claim spend so it will delete/abandon properly # the _spend_claim function this result is fed to expects a txi, so make a mock one @@ -527,7 +529,7 @@ class LevelDB: if not reposted_claim: return reposted_metadata = self.get_claim_metadata( - self.prefix_db.tx_hash.get(reposted_claim.tx_num, deserialize_value=False), reposted_claim.position + self.get_tx_hash(reposted_claim.tx_num), reposted_claim.position ) if not reposted_metadata: return @@ -541,7 +543,7 @@ class LevelDB: reposted_fee_currency = None reposted_duration = None if reposted_claim: - reposted_tx_hash = self.prefix_db.tx_hash.get(reposted_claim.tx_num, deserialize_value=False) + reposted_tx_hash = self.get_tx_hash(reposted_claim.tx_num) raw_reposted_claim_tx = self.prefix_db.tx.get(reposted_tx_hash, deserialize_value=False) try: reposted_claim_txo = self.coin.transaction( @@ -793,6 
+795,21 @@ class LevelDB: assert len(headers) - 1 == self.db_height, f"{len(headers)} vs {self.db_height}" self.headers = headers + async def _read_tx_hashes(self): + def _read_tx_hashes(): + return list(self.prefix_db.tx_hash.iterate(include_key=False, fill_cache=False, deserialize_value=False)) + + self.logger.info("loading tx hashes") + self.total_transactions.clear() + self.tx_num_mapping.clear() + start = time.perf_counter() + self.total_transactions.extend(await asyncio.get_event_loop().run_in_executor(None, _read_tx_hashes)) + self.tx_num_mapping = { + tx_hash: tx_num for tx_num, tx_hash in enumerate(self.total_transactions) + } + ts = time.perf_counter() - start + self.logger.info("loaded %i tx hashes in %ss", len(self.total_transactions), round(ts, 4)) + def estimate_timestamp(self, height: int) -> int: if height < len(self.headers): return struct.unpack(' bytes: + if self.env.cache_all_tx_hashes: + return self.total_transactions[tx_num] + return self.prefix_db.tx_hash.get(tx_num, deserialize_value=False) + + def get_tx_num(self, tx_hash: bytes) -> int: + if self.env.cache_all_tx_hashes: + return self.tx_num_mapping[tx_hash] + return self.prefix_db.tx_num.get(tx_hash).tx_num + # Header merkle cache async def populate_header_merkle_cache(self): @@ -900,7 +929,7 @@ class LevelDB: if tx_height > self.db_height: return None, tx_height try: - return self.prefix_db.tx_hash.get(tx_num, deserialize_value=False), tx_height + return self.get_tx_hash(tx_num), tx_height except IndexError: self.logger.exception( "Failed to access a cached transaction, known bug #3142 " @@ -964,13 +993,13 @@ class LevelDB: txs = [] txs_extend = txs.extend for hist in self.prefix_db.hashX_history.iterate(prefix=(hashX,), include_key=False): - txs_extend([ - (self.prefix_db.tx_hash.get(tx_num, deserialize_value=False), bisect_right(self.tx_counts, tx_num)) - for tx_num in hist - ]) + txs_extend(hist) if len(txs) >= limit: break - return txs + return [ + (self.get_tx_hash(tx_num), bisect_right(self.tx_counts, tx_num)) + for tx_num in txs + ] async def limited_history(self, hashX, *, limit=1000): """Return an unpruned, sorted list of (tx_hash, height) tuples of From 0e548b381235c3e5b600560e0a56e1b70bda4763 Mon Sep 17 00:00:00 2001 From: Jack Robison Date: Sat, 16 Oct 2021 17:22:33 -0400 Subject: [PATCH 10/19] remove dead code --- lbry/wallet/server/session.py | 8 ---- lbry/wallet/server/text.py | 82 ----------------------------------- 2 files changed, 90 deletions(-) delete mode 100644 lbry/wallet/server/text.py diff --git a/lbry/wallet/server/session.py b/lbry/wallet/server/session.py index ca4a0142f..2d564cf29 100644 --- a/lbry/wallet/server/session.py +++ b/lbry/wallet/server/session.py @@ -2,8 +2,6 @@ import os import ssl import math import time -import json -import base64 import codecs import typing import asyncio @@ -15,8 +13,6 @@ from asyncio import Event, sleep from collections import defaultdict from functools import partial -from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutor - from elasticsearch import ConnectionTimeout from prometheus_client import Counter, Info, Histogram, Gauge @@ -27,7 +23,6 @@ from lbry.schema.result import Outputs from lbry.wallet.server.block_processor import BlockProcessor from lbry.wallet.server.leveldb import LevelDB from lbry.wallet.server.websocket import AdminWebSocket -from lbry.wallet.server.metrics import ServerLoadData, APICallMetrics from lbry.wallet.rpc.framing import NewlineFramer import lbry.wallet.server.version as VERSION @@ -36,13 +31,11 @@ from 
lbry.wallet.rpc import ( RPCSession, JSONRPCAutoDetect, JSONRPCConnection, handler_invocation, RPCError, Request, JSONRPC, Notification, Batch ) -from lbry.wallet.server import text from lbry.wallet.server import util from lbry.wallet.server.hash import sha256, hash_to_hex_str, hex_str_to_hash, HASHX_LEN, Base58Error from lbry.wallet.server.daemon import DaemonError if typing.TYPE_CHECKING: from lbry.wallet.server.env import Env - from lbry.wallet.server.mempool import MemPool from lbry.wallet.server.daemon import Daemon BAD_REQUEST = 1 @@ -264,7 +257,6 @@ class SessionManager: await self._start_external_servers() paused = False - def _group_map(self): group_map = defaultdict(list) for session in self.sessions.values(): diff --git a/lbry/wallet/server/text.py b/lbry/wallet/server/text.py deleted file mode 100644 index 4919b0c01..000000000 --- a/lbry/wallet/server/text.py +++ /dev/null @@ -1,82 +0,0 @@ -import time - -from lbry.wallet.server import util - - -def sessions_lines(data): - """A generator returning lines for a list of sessions. - - data is the return value of rpc_sessions().""" - fmt = ('{:<6} {:<5} {:>17} {:>5} {:>5} {:>5} ' - '{:>7} {:>7} {:>7} {:>7} {:>7} {:>9} {:>21}') - yield fmt.format('ID', 'Flags', 'Client', 'Proto', - 'Reqs', 'Txs', 'Subs', - 'Recv', 'Recv KB', 'Sent', 'Sent KB', 'Time', 'Peer') - for (id_, flags, peer, client, proto, reqs, txs_sent, subs, - recv_count, recv_size, send_count, send_size, time) in data: - yield fmt.format(id_, flags, client, proto, - f'{reqs:,d}', - f'{txs_sent:,d}', - f'{subs:,d}', - f'{recv_count:,d}', - '{:,d}'.format(recv_size // 1024), - f'{send_count:,d}', - '{:,d}'.format(send_size // 1024), - util.formatted_time(time, sep=''), peer) - - -def groups_lines(data): - """A generator returning lines for a list of groups. - - data is the return value of rpc_groups().""" - - fmt = ('{:<6} {:>9} {:>9} {:>6} {:>6} {:>8}' - '{:>7} {:>9} {:>7} {:>9}') - yield fmt.format('ID', 'Sessions', 'Bwidth KB', 'Reqs', 'Txs', 'Subs', - 'Recv', 'Recv KB', 'Sent', 'Sent KB') - for (id_, session_count, bandwidth, reqs, txs_sent, subs, - recv_count, recv_size, send_count, send_size) in data: - yield fmt.format(id_, - f'{session_count:,d}', - '{:,d}'.format(bandwidth // 1024), - f'{reqs:,d}', - f'{txs_sent:,d}', - f'{subs:,d}', - f'{recv_count:,d}', - '{:,d}'.format(recv_size // 1024), - f'{send_count:,d}', - '{:,d}'.format(send_size // 1024)) - - -def peers_lines(data): - """A generator returning lines for a list of peers. 
- - data is the return value of rpc_peers().""" - def time_fmt(t): - if not t: - return 'Never' - return util.formatted_time(now - t) - - now = time.time() - fmt = ('{:<30} {:<6} {:>5} {:>5} {:<17} {:>4} ' - '{:>4} {:>8} {:>11} {:>11} {:>5} {:>20} {:<15}') - yield fmt.format('Host', 'Status', 'TCP', 'SSL', 'Server', 'Min', - 'Max', 'Pruning', 'Last Good', 'Last Try', - 'Tries', 'Source', 'IP Address') - for item in data: - features = item['features'] - hostname = item['host'] - host = features['hosts'][hostname] - yield fmt.format(hostname[:30], - item['status'], - host.get('tcp_port') or '', - host.get('ssl_port') or '', - features['server_version'] or 'unknown', - features['protocol_min'], - features['protocol_max'], - features['pruning'] or '', - time_fmt(item['last_good']), - time_fmt(item['last_try']), - item['try_count'], - item['source'][:20], - item['ip_addr'] or '') From e35319e5a2d0c0955a1a3e66ed3fa5c4d426cc50 Mon Sep 17 00:00:00 2001 From: Jack Robison Date: Tue, 19 Oct 2021 13:29:13 -0400 Subject: [PATCH 11/19] add `CACHE_ALL_CLAIM_TXOS` hub setting --- lbry/wallet/server/block_processor.py | 28 ++++---- lbry/wallet/server/db/prefixes.py | 12 ++++ lbry/wallet/server/db/revertable.py | 4 ++ lbry/wallet/server/env.py | 1 + lbry/wallet/server/leveldb.py | 76 +++++++++++++++------ tests/unit/wallet/server/test_revertable.py | 3 + 6 files changed, 90 insertions(+), 34 deletions(-) diff --git a/lbry/wallet/server/block_processor.py b/lbry/wallet/server/block_processor.py index c24efe6f6..66cb6af02 100644 --- a/lbry/wallet/server/block_processor.py +++ b/lbry/wallet/server/block_processor.py @@ -404,7 +404,8 @@ class BlockProcessor: await self.backup_block() self.logger.info(f'backed up to height {self.height:,d}') - await self.db._read_claim_txos() # TODO: don't do this + if self.env.cache_all_claim_txos: + await self.db._read_claim_txos() # TODO: don't do this for touched in self.touched_claims_to_send_es: if not self.db.get_claim_txo(touched): self.removed_claims_to_send_es.add(touched) @@ -545,10 +546,11 @@ class BlockProcessor: previous_amount = previous_claim.amount self.updated_claims.add(claim_hash) - self.db.claim_to_txo[claim_hash] = ClaimToTXOValue( - tx_num, nout, root_tx_num, root_idx, txo.amount, channel_signature_is_valid, claim_name - ) - self.db.txo_to_claim[tx_num][nout] = claim_hash + if self.env.cache_all_claim_txos: + self.db.claim_to_txo[claim_hash] = ClaimToTXOValue( + tx_num, nout, root_tx_num, root_idx, txo.amount, channel_signature_is_valid, claim_name + ) + self.db.txo_to_claim[tx_num][nout] = claim_hash pending = StagedClaimtrieItem( claim_name, normalized_name, claim_hash, txo.amount, self.coin.get_expiration_height(height), tx_num, nout, @@ -693,7 +695,7 @@ class BlockProcessor: if (txin_num, txin.prev_idx) in self.txo_to_claim: spent = self.txo_to_claim[(txin_num, txin.prev_idx)] else: - if txin_num not in self.db.txo_to_claim or txin.prev_idx not in self.db.txo_to_claim[txin_num]: + if not self.db.get_cached_claim_exists(txin_num, txin.prev_idx): # txo is not a claim return False spent_claim_hash_and_name = self.db.get_claim_from_txo( @@ -701,10 +703,12 @@ class BlockProcessor: ) assert spent_claim_hash_and_name is not None spent = self._make_pending_claim_txo(spent_claim_hash_and_name.claim_hash) - claim_hash = self.db.txo_to_claim[txin_num].pop(txin.prev_idx) - if not self.db.txo_to_claim[txin_num]: - self.db.txo_to_claim.pop(txin_num) - self.db.claim_to_txo.pop(claim_hash) + + if self.env.cache_all_claim_txos: + claim_hash = 
self.db.txo_to_claim[txin_num].pop(txin.prev_idx) + if not self.db.txo_to_claim[txin_num]: + self.db.txo_to_claim.pop(txin_num) + self.db.claim_to_txo.pop(claim_hash) if spent.reposted_claim_hash: self.pending_reposted.add(spent.reposted_claim_hash) if spent.signing_hash and spent.channel_signature_is_valid and spent.signing_hash not in self.abandoned_claims: @@ -1022,8 +1026,8 @@ class BlockProcessor: # prepare to activate or delay activation of the pending claims being added this block for (tx_num, nout), staged in self.txo_to_claim.items(): is_delayed = not staged.is_update - if staged.claim_hash in self.db.claim_to_txo: - prev_txo = self.db.claim_to_txo[staged.claim_hash] + prev_txo = self.db.get_cached_claim_txo(staged.claim_hash) + if prev_txo: prev_activation = self.db.get_activation(prev_txo.tx_num, prev_txo.position) if height < prev_activation or prev_activation < 0: is_delayed = True diff --git a/lbry/wallet/server/db/prefixes.py b/lbry/wallet/server/db/prefixes.py index 8b1603312..d71b941b4 100644 --- a/lbry/wallet/server/db/prefixes.py +++ b/lbry/wallet/server/db/prefixes.py @@ -85,6 +85,18 @@ class PrefixRow(metaclass=PrefixRowType): if v: return v if not deserialize_value else self.unpack_value(v) + def get_pending(self, *key_args, fill_cache=True, deserialize_value=True): + packed_key = self.pack_key(*key_args) + last_op = self._op_stack.get_last_op_for_key(packed_key) + if last_op: + if last_op.is_put: + return last_op.value if not deserialize_value else self.unpack_value(last_op.value) + else: # it's a delete + return + v = self._db.get(packed_key, fill_cache=fill_cache) + if v: + return v if not deserialize_value else self.unpack_value(v) + def stage_put(self, key_args=(), value_args=()): self._op_stack.append_op(RevertablePut(self.pack_key(*key_args), self.pack_value(*value_args))) diff --git a/lbry/wallet/server/db/revertable.py b/lbry/wallet/server/db/revertable.py index 2a05f2a7d..099c2b48e 100644 --- a/lbry/wallet/server/db/revertable.py +++ b/lbry/wallet/server/db/revertable.py @@ -169,3 +169,7 @@ class RevertableOpStack: while packed: op, packed = RevertableOp.unpack(packed) self.append_op(op) + + def get_last_op_for_key(self, key: bytes) -> Optional[RevertableOp]: + if key in self._items and self._items[key]: + return self._items[key][-1] diff --git a/lbry/wallet/server/env.py b/lbry/wallet/server/env.py index 82ce3d7fc..ff9e2a316 100644 --- a/lbry/wallet/server/env.py +++ b/lbry/wallet/server/env.py @@ -79,6 +79,7 @@ class Env: self.log_sessions = self.integer('LOG_SESSIONS', 3600) self.allow_lan_udp = self.boolean('ALLOW_LAN_UDP', False) self.cache_all_tx_hashes = self.boolean('CACHE_ALL_TX_HASHES', False) + self.cache_all_claim_txos = self.boolean('CACHE_ALL_CLAIM_TXOS', False) self.country = self.default('COUNTRY', 'US') # Peer discovery self.peer_discovery = self.peer_discovery_enum() diff --git a/lbry/wallet/server/leveldb.py b/lbry/wallet/server/leveldb.py index 7348dc086..3839c1ee3 100644 --- a/lbry/wallet/server/leveldb.py +++ b/lbry/wallet/server/leveldb.py @@ -113,9 +113,12 @@ class LevelDB: self.header_mc = MerkleCache(self.merkle, self.fs_block_hashes) self._tx_and_merkle_cache = LRUCacheWithMetrics(2 ** 16, metric_name='tx_and_merkle', namespace="wallet_server") + + # these are only used if the cache_all_tx_hashes setting is on self.total_transactions: List[bytes] = [] self.tx_num_mapping: Dict[bytes, int] = {} + # these are only used if the cache_all_claim_txos setting is on self.claim_to_txo: Dict[bytes, ClaimToTXOValue] = {} self.txo_to_claim: 
DefaultDict[int, Dict[int, bytes]] = defaultdict(dict) @@ -210,7 +213,7 @@ class LevelDB: expiration_height = self.coin.get_expiration_height(height) support_amount = self.get_support_amount(claim_hash) - claim_amount = self.claim_to_txo[claim_hash].amount + claim_amount = self.get_cached_claim_txo(claim_hash).amount effective_amount = support_amount + claim_amount channel_hash = self.get_channel_for_claim(claim_hash, tx_num, position) @@ -219,7 +222,7 @@ class LevelDB: canonical_url = short_url claims_in_channel = self.get_claims_in_channel_count(claim_hash) if channel_hash: - channel_vals = self.claim_to_txo.get(channel_hash) + channel_vals = self.get_cached_claim_txo(channel_hash) if channel_vals: channel_short_url = self.get_short_claim_id_url( channel_vals.name, channel_vals.normalized_name, channel_hash, channel_vals.root_tx_num, @@ -271,11 +274,13 @@ class LevelDB: ) # resolve by partial/complete claim id for key, claim_txo in self.prefix_db.claim_short_id.iterate(prefix=(normalized_name, claim_id[:10])): - claim_hash = self.txo_to_claim[claim_txo.tx_num][claim_txo.position] - non_normalized_name = self.claim_to_txo.get(claim_hash).name - signature_is_valid = self.claim_to_txo.get(claim_hash).channel_signature_is_valid + full_claim_hash = self.get_cached_claim_hash(claim_txo.tx_num, claim_txo.position) + c = self.get_cached_claim_txo(full_claim_hash) + + non_normalized_name = c.name + signature_is_valid = c.channel_signature_is_valid return self._prepare_resolve_result( - claim_txo.tx_num, claim_txo.position, claim_hash, non_normalized_name, key.root_tx_num, + claim_txo.tx_num, claim_txo.position, full_claim_hash, non_normalized_name, key.root_tx_num, key.root_position, self.get_activation(claim_txo.tx_num, claim_txo.position), signature_is_valid ) @@ -285,7 +290,7 @@ class LevelDB: for idx, (key, claim_val) in enumerate(self.prefix_db.effective_amount.iterate(prefix=(normalized_name,))): if amount_order > idx + 1: continue - claim_txo = self.claim_to_txo.get(claim_val.claim_hash) + claim_txo = self.get_cached_claim_txo(claim_val.claim_hash) activation = self.get_activation(key.tx_num, key.position) return self._prepare_resolve_result( key.tx_num, key.position, claim_val.claim_hash, key.normalized_name, claim_txo.root_tx_num, @@ -360,7 +365,7 @@ class LevelDB: return await asyncio.get_event_loop().run_in_executor(None, self._resolve, url) def _fs_get_claim_by_hash(self, claim_hash): - claim = self.claim_to_txo.get(claim_hash) + claim = self.get_cached_claim_txo(claim_hash) if claim: activation = self.get_activation(claim.tx_num, claim.position) return self._prepare_resolve_result( @@ -525,7 +530,7 @@ class LevelDB: reposted_claim = None reposted_metadata = None if reposted_claim_hash: - reposted_claim = self.claim_to_txo.get(reposted_claim_hash) + reposted_claim = self.get_cached_claim_txo(reposted_claim_hash) if not reposted_claim: return reposted_metadata = self.get_claim_metadata( @@ -677,11 +682,21 @@ class LevelDB: async def all_claims_producer(self, batch_size=500_000): batch = [] - for claim_hash, claim_txo in self.claim_to_txo.items(): + if self.env.cache_all_claim_txos: + claim_iterator = self.claim_to_txo.items() + else: + claim_iterator = map(lambda item: (item[0].claim_hash, item[1]), self.prefix_db.claim_to_txo.iterate()) + + for claim_hash, claim_txo in claim_iterator: # TODO: fix the couple of claim txos that dont have controlling names if not self.prefix_db.claim_takeover.get(claim_txo.normalized_name): continue - claim = self._fs_get_claim_by_hash(claim_hash) + 
activation = self.get_activation(claim_txo.tx_num, claim_txo.position) + claim = self._prepare_resolve_result( + claim_txo.tx_num, claim_txo.position, claim_hash, claim_txo.name, claim_txo.root_tx_num, + claim_txo.root_position, activation, claim_txo.channel_signature_is_valid + ) + if claim: batch.append(claim) if len(batch) == batch_size: @@ -703,16 +718,14 @@ class LevelDB: results = [] for claim_hash in claim_hashes: - if claim_hash not in self.claim_to_txo: - self.logger.warning("can't sync non existent claim to ES: %s", claim_hash.hex()) - continue - name = self.claim_to_txo[claim_hash].normalized_name - if not self.prefix_db.claim_takeover.get(name): - self.logger.warning("can't sync non existent claim to ES: %s", claim_hash.hex()) - continue - claim_txo = self.claim_to_txo.get(claim_hash) + claim_txo = self.get_cached_claim_txo(claim_hash) if not claim_txo: + self.logger.warning("can't sync non existent claim to ES: %s", claim_hash.hex()) continue + if not self.prefix_db.claim_takeover.get(claim_txo.normalized_name): + self.logger.warning("can't sync non existent claim to ES: %s", claim_hash.hex()) + continue + activation = self.get_activation(claim_txo.tx_num, claim_txo.position) claim = self._prepare_resolve_result( claim_txo.tx_num, claim_txo.position, claim_hash, claim_txo.name, claim_txo.root_tx_num, @@ -764,7 +777,6 @@ class LevelDB: else: assert self.db_tx_count == 0 - async def _read_claim_txos(self): def read_claim_txos(): set_claim_to_txo = self.claim_to_txo.__setitem__ @@ -853,7 +865,8 @@ class LevelDB: # Read TX counts (requires meta directory) await self._read_tx_counts() await self._read_headers() - await self._read_claim_txos() + if self.env.cache_all_claim_txos: + await self._read_claim_txos() if self.env.cache_all_tx_hashes: await self._read_tx_hashes() @@ -873,6 +886,22 @@ class LevelDB: return self.tx_num_mapping[tx_hash] return self.prefix_db.tx_num.get(tx_hash).tx_num + def get_cached_claim_txo(self, claim_hash: bytes) -> Optional[ClaimToTXOValue]: + if self.env.cache_all_claim_txos: + return self.claim_to_txo.get(claim_hash) + return self.prefix_db.claim_to_txo.get_pending(claim_hash) + + def get_cached_claim_hash(self, tx_num: int, position: int) -> Optional[bytes]: + if self.env.cache_all_claim_txos: + if tx_num not in self.txo_to_claim: + return + return self.txo_to_claim[tx_num].get(position, None) + v = self.prefix_db.txo_to_claim.get_pending(tx_num, position) + return None if not v else v.claim_hash + + def get_cached_claim_exists(self, tx_num: int, position: int) -> bool: + return self.get_cached_claim_hash(tx_num, position) is not None + # Header merkle cache async def populate_header_merkle_cache(self): @@ -960,7 +989,10 @@ class LevelDB: tx_height = -1 tx_num = None if not tx_num else tx_num.tx_num if tx_num is not None: - fill_cache = tx_num in self.txo_to_claim and len(self.txo_to_claim[tx_num]) > 0 + if self.env.cache_all_claim_txos: + fill_cache = tx_num in self.txo_to_claim and len(self.txo_to_claim[tx_num]) > 0 + else: + fill_cache = False tx_height = bisect_right(self.tx_counts, tx_num) tx = self.prefix_db.tx.get(tx_hash_bytes, fill_cache=fill_cache, deserialize_value=False) if tx_height == -1: diff --git a/tests/unit/wallet/server/test_revertable.py b/tests/unit/wallet/server/test_revertable.py index f5729689a..79b4cdb0c 100644 --- a/tests/unit/wallet/server/test_revertable.py +++ b/tests/unit/wallet/server/test_revertable.py @@ -123,6 +123,9 @@ class TestRevertablePrefixDB(unittest.TestCase): 
self.assertIsNone(self.db.claim_takeover.get(name)) self.db.claim_takeover.stage_put((name,), (claim_hash1, takeover_height)) + self.assertIsNone(self.db.claim_takeover.get(name)) + self.assertEqual(10000000, self.db.claim_takeover.get_pending(name).height) + self.db.commit(10000000) self.assertEqual(10000000, self.db.claim_takeover.get(name).height) From 6bef09a3b1444c659f14b627fb38988f31357f25 Mon Sep 17 00:00:00 2001 From: Jack Robison Date: Tue, 19 Oct 2021 14:00:39 -0400 Subject: [PATCH 12/19] update lbry-hub-elastic-sync to support resyncing recent blocks --- lbry/wallet/server/db/elasticsearch/sync.py | 62 +++++++++++++++++++-- lbry/wallet/server/leveldb.py | 17 ++++++ 2 files changed, 73 insertions(+), 6 deletions(-) diff --git a/lbry/wallet/server/db/elasticsearch/sync.py b/lbry/wallet/server/db/elasticsearch/sync.py index d990c96cc..7fd76e64b 100644 --- a/lbry/wallet/server/db/elasticsearch/sync.py +++ b/lbry/wallet/server/db/elasticsearch/sync.py @@ -2,7 +2,7 @@ import argparse import asyncio import logging from elasticsearch import AsyncElasticsearch -from elasticsearch.helpers import async_bulk +from elasticsearch.helpers import async_streaming_bulk from lbry.wallet.server.env import Env from lbry.wallet.server.coin import LBC from lbry.wallet.server.leveldb import LevelDB @@ -10,6 +10,50 @@ from lbry.wallet.server.db.elasticsearch.search import SearchIndex, IndexVersion from lbry.wallet.server.db.elasticsearch.constants import ALL_FIELDS +async def get_recent_claims(blocks: int, index_name='claims', db=None): + env = Env(LBC) + need_open = db is None + db = db or LevelDB(env) + if need_open: + await db.open_dbs() + try: + cnt = 0 + state = db.prefix_db.db_state.get() + touched_claims = set() + deleted_claims = set() + for height in range(state.height - blocks + 1, state.height + 1): + touched_or_deleted = db.prefix_db.touched_or_deleted.get(height) + touched_claims.update(touched_or_deleted.touched_claims) + deleted_claims.update(touched_or_deleted.deleted_claims) + touched_claims.difference_update(deleted_claims) + + for deleted in deleted_claims: + yield { + '_index': index_name, + '_op_type': 'delete', + '_id': deleted.hex() + } + for touched in touched_claims: + claim = db.claim_producer(touched) + if claim: + yield { + 'doc': {key: value for key, value in claim.items() if key in ALL_FIELDS}, + '_id': claim['claim_id'], + '_index': index_name, + '_op_type': 'update', + 'doc_as_upsert': True + } + cnt += 1 + else: + logging.warning("could not sync claim %s", touched.hex()) + if cnt % 10000 == 0: + print(f"{cnt} claims sent") + print("sent %i claims, deleted %i" % (len(touched_claims), len(deleted_claims))) + finally: + if need_open: + db.close() + + async def get_all_claims(index_name='claims', db=None): env = Env(LBC) need_open = db is None @@ -52,14 +96,20 @@ async def make_es_index(index=None): index.stop() -async def run_sync(index_name='claims', db=None, clients=32): +async def run_sync(index_name='claims', db=None, clients=32, blocks=0): env = Env(LBC) logging.info("ES sync host: %s:%i", env.elastic_host, env.elastic_port) es = AsyncElasticsearch([{'host': env.elastic_host, 'port': env.elastic_port}]) - claim_generator = get_all_claims(index_name=index_name, db=db) - + if blocks > 0: + blocks = min(blocks, 200) + logging.info("Resyncing last %i blocks", blocks) + claim_generator = get_recent_claims(blocks, index_name=index_name, db=db) + else: + claim_generator = get_all_claims(index_name=index_name, db=db) try: - await async_bulk(es, claim_generator, 
request_timeout=600) + async for ok, item in async_streaming_bulk(es, claim_generator, request_timeout=600, raise_on_error=False): + if not ok: + logging.warning("indexing failed for an item: %s", item) await es.indices.refresh(index=index_name) finally: await es.close() @@ -85,4 +135,4 @@ def run_elastic_sync(): if not args.force and not asyncio.run(make_es_index()): logging.info("ES is already initialized") return - asyncio.run(run_sync(clients=args.clients)) + asyncio.run(run_sync(clients=args.clients, blocks=args.blocks)) diff --git a/lbry/wallet/server/leveldb.py b/lbry/wallet/server/leveldb.py index 3839c1ee3..d33b1016b 100644 --- a/lbry/wallet/server/leveldb.py +++ b/lbry/wallet/server/leveldb.py @@ -713,6 +713,23 @@ class LevelDB: yield meta batch.clear() + def claim_producer(self, claim_hash: bytes) -> Optional[Dict]: + claim_txo = self.get_cached_claim_txo(claim_hash) + if not claim_txo: + self.logger.warning("can't sync non existent claim to ES: %s", claim_hash.hex()) + return + if not self.prefix_db.claim_takeover.get(claim_txo.normalized_name): + self.logger.warning("can't sync non existent claim to ES: %s", claim_hash.hex()) + return + activation = self.get_activation(claim_txo.tx_num, claim_txo.position) + claim = self._prepare_resolve_result( + claim_txo.tx_num, claim_txo.position, claim_hash, claim_txo.name, claim_txo.root_tx_num, + claim_txo.root_position, activation, claim_txo.channel_signature_is_valid + ) + if not claim: + return + return self._prepare_claim_metadata(claim.claim_hash, claim) + def claims_producer(self, claim_hashes: Set[bytes]): batch = [] results = [] From 1facc0cd019684cdb8c6fa86480685682abf8ab6 Mon Sep 17 00:00:00 2001 From: Jack Robison Date: Tue, 19 Oct 2021 16:17:43 -0400 Subject: [PATCH 13/19] remove unused hub env settings --- lbry/wallet/server/db/elasticsearch/search.py | 6 +----- lbry/wallet/server/env.py | 19 ++++--------------- lbry/wallet/server/leveldb.py | 6 +----- 3 files changed, 6 insertions(+), 25 deletions(-) diff --git a/lbry/wallet/server/db/elasticsearch/search.py b/lbry/wallet/server/db/elasticsearch/search.py index e7a8b58af..c762920ef 100644 --- a/lbry/wallet/server/db/elasticsearch/search.py +++ b/lbry/wallet/server/db/elasticsearch/search.py @@ -42,8 +42,7 @@ class IndexVersionMismatch(Exception): class SearchIndex: VERSION = 1 - def __init__(self, index_prefix: str, search_timeout=3.0, elastic_host='localhost', elastic_port=9200, - half_life=0.4, whale_threshold=10000, whale_half_life=0.99): + def __init__(self, index_prefix: str, search_timeout=3.0, elastic_host='localhost', elastic_port=9200): self.search_timeout = search_timeout self.sync_timeout = 600 # wont hit that 99% of the time, but can hit on a fresh import self.search_client: Optional[AsyncElasticsearch] = None @@ -54,9 +53,6 @@ class SearchIndex: self.search_cache = LRUCache(2 ** 17) self._elastic_host = elastic_host self._elastic_port = elastic_port - self._trending_half_life = half_life - self._trending_whale_threshold = whale_threshold - self._trending_whale_half_life = whale_half_life async def get_index_version(self) -> int: try: diff --git a/lbry/wallet/server/env.py b/lbry/wallet/server/env.py index ff9e2a316..16490b0d4 100644 --- a/lbry/wallet/server/env.py +++ b/lbry/wallet/server/env.py @@ -30,7 +30,6 @@ class Env: def __init__(self, coin=None): self.logger = class_logger(__name__, self.__class__.__name__) - self.allow_root = self.boolean('ALLOW_ROOT', False) self.host = self.default('HOST', 'localhost') self.rpc_host = self.default('RPC_HOST', 
'localhost') self.elastic_host = self.default('ELASTIC_HOST', 'localhost') @@ -38,17 +37,8 @@ class Env: self.loop_policy = self.set_event_loop_policy() self.obsolete(['UTXO_MB', 'HIST_MB', 'NETWORK']) self.db_dir = self.required('DB_DIRECTORY') - self.db_engine = self.default('DB_ENGINE', 'leveldb') - # self.trending_algorithms = [ - # trending for trending in set(self.default('TRENDING_ALGORITHMS', 'zscore').split(' ')) if trending - # ] - self.trending_half_life = math.log2(0.1 ** (1 / (3 + self.integer('TRENDING_DECAY_RATE', 48)))) + 1 - self.trending_whale_half_life = math.log2(0.1 ** (1 / (3 + self.integer('TRENDING_WHALE_DECAY_RATE', 24)))) + 1 - self.trending_whale_threshold = float(self.integer('TRENDING_WHALE_THRESHOLD', 10000)) * 1E8 self.max_query_workers = self.integer('MAX_QUERY_WORKERS', 4) - self.individual_tag_indexes = self.boolean('INDIVIDUAL_TAG_INDEXES', True) - self.track_metrics = self.boolean('TRACK_METRICS', False) self.websocket_host = self.default('WEBSOCKET_HOST', self.host) self.websocket_port = self.integer('WEBSOCKET_PORT', None) self.daemon_url = self.required('DAEMON_URL') @@ -85,18 +75,17 @@ class Env: self.peer_discovery = self.peer_discovery_enum() self.peer_announce = self.boolean('PEER_ANNOUNCE', True) self.peer_hubs = self.extract_peer_hubs() - self.force_proxy = self.boolean('FORCE_PROXY', False) - self.tor_proxy_host = self.default('TOR_PROXY_HOST', 'localhost') - self.tor_proxy_port = self.integer('TOR_PROXY_PORT', None) + # self.tor_proxy_host = self.default('TOR_PROXY_HOST', 'localhost') + # self.tor_proxy_port = self.integer('TOR_PROXY_PORT', None) # The electrum client takes the empty string as unspecified self.payment_address = self.default('PAYMENT_ADDRESS', '') self.donation_address = self.default('DONATION_ADDRESS', '') # Server limits to help prevent DoS self.max_send = self.integer('MAX_SEND', 1000000) self.max_receive = self.integer('MAX_RECEIVE', 1000000) - self.max_subs = self.integer('MAX_SUBS', 250000) + # self.max_subs = self.integer('MAX_SUBS', 250000) self.max_sessions = self.sane_max_sessions() - self.max_session_subs = self.integer('MAX_SESSION_SUBS', 50000) + # self.max_session_subs = self.integer('MAX_SESSION_SUBS', 50000) self.session_timeout = self.integer('SESSION_TIMEOUT', 600) self.drop_client = self.custom("DROP_CLIENT", None, re.compile) self.description = self.default('DESCRIPTION', '') diff --git a/lbry/wallet/server/leveldb.py b/lbry/wallet/server/leveldb.py index d33b1016b..dbf9b089c 100644 --- a/lbry/wallet/server/leveldb.py +++ b/lbry/wallet/server/leveldb.py @@ -106,8 +106,6 @@ class LevelDB: self.encoded_headers = LRUCacheWithMetrics(1 << 21, metric_name='encoded_headers', namespace='wallet_server') self.last_flush = time.time() - self.logger.info(f'using {self.env.db_engine} for DB backend') - # Header merkle cache self.merkle = Merkle() self.header_mc = MerkleCache(self.merkle, self.fs_block_hashes) @@ -125,9 +123,7 @@ class LevelDB: # Search index self.search_index = SearchIndex( self.env.es_index_prefix, self.env.database_query_timeout, - elastic_host=env.elastic_host, elastic_port=env.elastic_port, - half_life=self.env.trending_half_life, whale_threshold=self.env.trending_whale_threshold, - whale_half_life=self.env.trending_whale_half_life + elastic_host=env.elastic_host, elastic_port=env.elastic_port ) self.genesis_bytes = bytes.fromhex(self.coin.GENESIS_HASH) From a27d3b9689414fc89741cb6cdeeca86ecff9616f Mon Sep 17 00:00:00 2001 From: Jack Robison Date: Tue, 19 Oct 2021 16:18:28 -0400 Subject: [PATCH 
14/19] set default `CACHE_MB` to 1024mb and the default `QUERY_TIMEOUT_MS` to 10s --- lbry/wallet/server/env.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/lbry/wallet/server/env.py b/lbry/wallet/server/env.py index 16490b0d4..d081ab8c8 100644 --- a/lbry/wallet/server/env.py +++ b/lbry/wallet/server/env.py @@ -51,7 +51,7 @@ class Env: self.coin = Coin.lookup_coin_class(coin_name, network) self.es_index_prefix = self.default('ES_INDEX_PREFIX', '') self.es_mode = self.default('ES_MODE', 'writer') - self.cache_MB = self.integer('CACHE_MB', 4096) + self.cache_MB = self.integer('CACHE_MB', 1024) self.reorg_limit = self.integer('REORG_LIMIT', self.coin.REORG_LIMIT) # Server stuff self.tcp_port = self.integer('TCP_PORT', None) @@ -97,7 +97,7 @@ class Env: self.identities = [identity for identity in (clearnet_identity, tor_identity) if identity is not None] - self.database_query_timeout = float(self.integer('QUERY_TIMEOUT_MS', 3000)) / 1000.0 + self.database_query_timeout = float(self.integer('QUERY_TIMEOUT_MS', 10000)) / 1000.0 @classmethod def default(cls, envvar, default): From b05d071a1cc830adf423fa42860532f821d27b5a Mon Sep 17 00:00:00 2001 From: Jack Robison Date: Tue, 19 Oct 2021 18:16:36 -0400 Subject: [PATCH 15/19] update Env to accept parameters from cli args --- lbry/wallet/server/cli.py | 16 +-- lbry/wallet/server/env.py | 211 ++++++++++++++++++++++++++-------- lbry/wallet/server/leveldb.py | 2 +- 3 files changed, 167 insertions(+), 62 deletions(-) diff --git a/lbry/wallet/server/cli.py b/lbry/wallet/server/cli.py index d99512b93..af80aeffa 100644 --- a/lbry/wallet/server/cli.py +++ b/lbry/wallet/server/cli.py @@ -1,7 +1,6 @@ import logging import traceback import argparse -import importlib from lbry.wallet.server.env import Env from lbry.wallet.server.server import Server @@ -10,27 +9,22 @@ def get_argument_parser(): parser = argparse.ArgumentParser( prog="lbry-hub" ) - parser.add_argument("spvserver", type=str, help="Python class path to SPV server implementation.", - nargs="?", default="lbry.wallet.server.coin.LBC") + Env.contribute_to_arg_parser(parser) + sub = parser.add_subparsers(metavar='COMMAND') + start = sub.add_parser('start', help='Start LBRY Network interface.') + return parser -def get_coin_class(spvserver): - spvserver_path, coin_class_name = spvserver.rsplit('.', 1) - spvserver_module = importlib.import_module(spvserver_path) - return getattr(spvserver_module, coin_class_name) - - def main(): parser = get_argument_parser() args = parser.parse_args() - coin_class = get_coin_class(args.spvserver) logging.basicConfig(level=logging.INFO, format="%(asctime)s %(levelname)-4s %(name)s:%(lineno)d: %(message)s") logging.info('lbry.server starting') logging.getLogger('aiohttp').setLevel(logging.WARNING) logging.getLogger('elasticsearch').setLevel(logging.WARNING) try: - server = Server(Env(coin_class)) + server = Server(Env.from_arg_parser(args)) server.run() except Exception: traceback.print_exc() diff --git a/lbry/wallet/server/env.py b/lbry/wallet/server/env.py index d081ab8c8..13956d525 100644 --- a/lbry/wallet/server/env.py +++ b/lbry/wallet/server/env.py @@ -13,7 +13,7 @@ from collections import namedtuple from ipaddress import ip_address from lbry.wallet.server.util import class_logger -from lbry.wallet.server.coin import Coin +from lbry.wallet.server.coin import Coin, LBC, LBCTestNet, LBCRegTest import lbry.wallet.server.util as lib_util @@ -28,49 +28,65 @@ class Env: class Error(Exception): pass - def __init__(self, coin=None): + def 
__init__(self, coin=None, db_dir=None, daemon_url=None, host=None, rpc_host=None, elastic_host=None, + elastic_port=None, loop_policy=None, max_query_workers=None, websocket_host=None, websocket_port=None, + chain=None, es_index_prefix=None, es_mode=None, cache_MB=None, reorg_limit=None, tcp_port=None, + udp_port=None, ssl_port=None, ssl_certfile=None, ssl_keyfile=None, rpc_port=None, + prometheus_port=None, max_subscriptions=None, banner_file=None, anon_logs=None, log_sessions=None, + allow_lan_udp=None, cache_all_tx_hashes=None, cache_all_claim_txos=None, country=None, + payment_address=None, donation_address=None, max_send=None, max_receive=None, max_sessions=None, + session_timeout=None, drop_client=None, description=None, daily_fee=None, + database_query_timeout=None, db_max_open_files=512): self.logger = class_logger(__name__, self.__class__.__name__) - self.host = self.default('HOST', 'localhost') - self.rpc_host = self.default('RPC_HOST', 'localhost') - self.elastic_host = self.default('ELASTIC_HOST', 'localhost') - self.elastic_port = self.integer('ELASTIC_PORT', 9200) - self.loop_policy = self.set_event_loop_policy() - self.obsolete(['UTXO_MB', 'HIST_MB', 'NETWORK']) - self.db_dir = self.required('DB_DIRECTORY') - self.max_query_workers = self.integer('MAX_QUERY_WORKERS', 4) - self.websocket_host = self.default('WEBSOCKET_HOST', self.host) - self.websocket_port = self.integer('WEBSOCKET_PORT', None) - self.daemon_url = self.required('DAEMON_URL') + self.db_dir = db_dir if db_dir is not None else self.required('DB_DIRECTORY') + self.daemon_url = daemon_url if daemon_url is not None else self.required('DAEMON_URL') + self.db_max_open_files = db_max_open_files + + self.host = host if host is not None else self.default('HOST', 'localhost') + self.rpc_host = rpc_host if rpc_host is not None else self.default('RPC_HOST', 'localhost') + self.elastic_host = elastic_host if elastic_host is not None else self.default('ELASTIC_HOST', 'localhost') + self.elastic_port = elastic_port if elastic_port is not None else self.integer('ELASTIC_PORT', 9200) + self.loop_policy = self.set_event_loop_policy( + loop_policy if loop_policy is not None else self.default('EVENT_LOOP_POLICY', None) + ) + self.obsolete(['UTXO_MB', 'HIST_MB', 'NETWORK']) + self.max_query_workers = max_query_workers if max_query_workers is not None else self.integer('MAX_QUERY_WORKERS', 4) + self.websocket_host = websocket_host if websocket_host is not None else self.default('WEBSOCKET_HOST', self.host) + self.websocket_port = websocket_port if websocket_port is not None else self.integer('WEBSOCKET_PORT', None) if coin is not None: assert issubclass(coin, Coin) self.coin = coin else: - coin_name = self.required('COIN').strip() - network = self.default('NET', 'mainnet').strip() - self.coin = Coin.lookup_coin_class(coin_name, network) - self.es_index_prefix = self.default('ES_INDEX_PREFIX', '') - self.es_mode = self.default('ES_MODE', 'writer') - self.cache_MB = self.integer('CACHE_MB', 1024) - self.reorg_limit = self.integer('REORG_LIMIT', self.coin.REORG_LIMIT) + chain = chain if chain is not None else self.default('NET', 'mainnet').strip().lower() + if chain == 'mainnet': + self.coin = LBC + elif chain == 'testnet': + self.coin = LBCTestNet + else: + self.coin = LBCRegTest + self.es_index_prefix = es_index_prefix if es_index_prefix is not None else self.default('ES_INDEX_PREFIX', '') + self.es_mode = es_mode if es_mode is not None else self.default('ES_MODE', 'writer') + self.cache_MB = cache_MB if cache_MB is not None else 
self.integer('CACHE_MB', 1024) + self.reorg_limit = reorg_limit if reorg_limit is not None else self.integer('REORG_LIMIT', self.coin.REORG_LIMIT) # Server stuff - self.tcp_port = self.integer('TCP_PORT', None) - self.udp_port = self.integer('UDP_PORT', self.tcp_port) - self.ssl_port = self.integer('SSL_PORT', None) + self.tcp_port = tcp_port if tcp_port is not None else self.integer('TCP_PORT', None) + self.udp_port = udp_port if udp_port is not None else self.integer('UDP_PORT', self.tcp_port) + self.ssl_port = ssl_port if ssl_port is not None else self.integer('SSL_PORT', None) if self.ssl_port: - self.ssl_certfile = self.required('SSL_CERTFILE') - self.ssl_keyfile = self.required('SSL_KEYFILE') - self.rpc_port = self.integer('RPC_PORT', 8000) - self.prometheus_port = self.integer('PROMETHEUS_PORT', 0) - self.max_subscriptions = self.integer('MAX_SUBSCRIPTIONS', 10000) - self.banner_file = self.default('BANNER_FILE', None) - self.tor_banner_file = self.default('TOR_BANNER_FILE', self.banner_file) - self.anon_logs = self.boolean('ANON_LOGS', False) - self.log_sessions = self.integer('LOG_SESSIONS', 3600) - self.allow_lan_udp = self.boolean('ALLOW_LAN_UDP', False) - self.cache_all_tx_hashes = self.boolean('CACHE_ALL_TX_HASHES', False) - self.cache_all_claim_txos = self.boolean('CACHE_ALL_CLAIM_TXOS', False) - self.country = self.default('COUNTRY', 'US') + self.ssl_certfile = ssl_certfile if ssl_certfile is not None else self.required('SSL_CERTFILE') + self.ssl_keyfile = ssl_keyfile if ssl_keyfile is not None else self.required('SSL_KEYFILE') + self.rpc_port = rpc_port if rpc_port is not None else self.integer('RPC_PORT', 8000) + self.prometheus_port = prometheus_port if prometheus_port is not None else self.integer('PROMETHEUS_PORT', 0) + self.max_subscriptions = max_subscriptions if max_subscriptions is not None else self.integer('MAX_SUBSCRIPTIONS', 10000) + self.banner_file = banner_file if banner_file is not None else self.default('BANNER_FILE', None) + # self.tor_banner_file = self.default('TOR_BANNER_FILE', self.banner_file) + self.anon_logs = anon_logs if anon_logs is not None else self.boolean('ANON_LOGS', False) + self.log_sessions = log_sessions if log_sessions is not None else self.integer('LOG_SESSIONS', 3600) + self.allow_lan_udp = allow_lan_udp if allow_lan_udp is not None else self.boolean('ALLOW_LAN_UDP', False) + self.cache_all_tx_hashes = cache_all_tx_hashes if cache_all_tx_hashes is not None else self.boolean('CACHE_ALL_TX_HASHES', False) + self.cache_all_claim_txos = cache_all_claim_txos if cache_all_claim_txos is not None else self.boolean('CACHE_ALL_CLAIM_TXOS', False) + self.country = country if country is not None else self.default('COUNTRY', 'US') # Peer discovery self.peer_discovery = self.peer_discovery_enum() self.peer_announce = self.boolean('PEER_ANNOUNCE', True) @@ -78,18 +94,18 @@ class Env: # self.tor_proxy_host = self.default('TOR_PROXY_HOST', 'localhost') # self.tor_proxy_port = self.integer('TOR_PROXY_PORT', None) # The electrum client takes the empty string as unspecified - self.payment_address = self.default('PAYMENT_ADDRESS', '') - self.donation_address = self.default('DONATION_ADDRESS', '') + self.payment_address = payment_address if payment_address is not None else self.default('PAYMENT_ADDRESS', '') + self.donation_address = donation_address if donation_address is not None else self.default('DONATION_ADDRESS', '') # Server limits to help prevent DoS - self.max_send = self.integer('MAX_SEND', 1000000) - self.max_receive = 
self.integer('MAX_RECEIVE', 1000000)
+        self.max_send = max_send if max_send is not None else self.integer('MAX_SEND', 1000000)
+        self.max_receive = max_receive if max_receive is not None else self.integer('MAX_RECEIVE', 1000000)
         # self.max_subs = self.integer('MAX_SUBS', 250000)
-        self.max_sessions = self.sane_max_sessions()
+        self.max_sessions = max_sessions if max_sessions is not None else self.sane_max_sessions()
         # self.max_session_subs = self.integer('MAX_SESSION_SUBS', 50000)
-        self.session_timeout = self.integer('SESSION_TIMEOUT', 600)
-        self.drop_client = self.custom("DROP_CLIENT", None, re.compile)
-        self.description = self.default('DESCRIPTION', '')
-        self.daily_fee = self.string_amount('DAILY_FEE', '0')
+        self.session_timeout = session_timeout if session_timeout is not None else self.integer('SESSION_TIMEOUT', 600)
+        self.drop_client = drop_client if drop_client is not None else self.custom("DROP_CLIENT", None, re.compile)
+        self.description = description if description is not None else self.default('DESCRIPTION', '')
+        self.daily_fee = daily_fee if daily_fee is not None else self.string_amount('DAILY_FEE', '0')

         # Identities
         clearnet_identity = self.clearnet_identity()
@@ -97,7 +113,8 @@ class Env:
         self.identities = [identity
                            for identity in (clearnet_identity, tor_identity)
                            if identity is not None]
-        self.database_query_timeout = float(self.integer('QUERY_TIMEOUT_MS', 10000)) / 1000.0
+        self.database_query_timeout = database_query_timeout if database_query_timeout is not None else \
+            (float(self.integer('QUERY_TIMEOUT_MS', 10000)) / 1000.0)

     @classmethod
     def default(cls, envvar, default):
@@ -149,9 +166,9 @@ class Env:
         if bad:
             raise cls.Error(f'remove obsolete environment variables {bad}')

-    def set_event_loop_policy(self):
-        policy_name = self.default('EVENT_LOOP_POLICY', None)
-        if not policy_name:
+    @classmethod
+    def set_event_loop_policy(cls, policy_name: str = None):
+        if not policy_name or policy_name == 'default':
             import asyncio
             return asyncio.get_event_loop_policy()
         elif policy_name == 'uvloop':
@@ -160,7 +177,7 @@ class Env:
             loop_policy = uvloop.EventLoopPolicy()
             asyncio.set_event_loop_policy(loop_policy)
             return loop_policy
-        raise self.Error(f'unknown event loop policy "{policy_name}"')
+        raise cls.Error(f'unknown event loop policy "{policy_name}"')

     def cs_host(self, *, for_rpc):
         """Returns the 'host' argument to pass to asyncio's create_server
@@ -269,3 +286,97 @@ class Env:

     def extract_peer_hubs(self):
         return [hub.strip() for hub in self.default('PEER_HUBS', '').split(',') if hub.strip()]
+
+    @classmethod
+    def contribute_to_arg_parser(cls, parser):
+        parser.add_argument('--db_dir', type=str, help='path of the directory containing lbry-leveldb')
+        parser.add_argument('--daemon_url',
+                            help='URL for rpc from lbrycrd, <rpcuser>:<rpcpassword>@<lbrycrd rpc ip>')
+        parser.add_argument('--db_max_open_files', type=int, default=512,
+                            help='number of files leveldb can have open at a time')
+        parser.add_argument('--host', type=str, default=cls.default('HOST', 'localhost'),
+                            help='Interface for hub server to listen on')
+        parser.add_argument('--tcp_port', type=int, default=cls.integer('TCP_PORT', 50001),
+                            help='TCP port to listen on for hub server')
+        parser.add_argument('--udp_port', type=int, default=cls.integer('UDP_PORT', 50001),
+                            help='UDP port to listen on for hub server')
+        parser.add_argument('--rpc_host', default=cls.default('RPC_HOST', 'localhost'), type=str,
+                            help='Listening interface for admin rpc')
+        parser.add_argument('--rpc_port', default=cls.integer('RPC_PORT', 8000), type=int,
help='Listening port for admin rpc') + parser.add_argument('--websocket_host', default=cls.default('WEBSOCKET_HOST', 'localhost'), type=str, + help='Listening interface for websocket') + parser.add_argument('--websocket_port', default=cls.integer('WEBSOCKET_PORT', None), type=int, + help='Listening port for websocket') + + parser.add_argument('--ssl_port', default=cls.integer('SSL_PORT', None), type=int, + help='SSL port to listen on for hub server') + parser.add_argument('--ssl_certfile', default=cls.default('SSL_CERTFILE', None), type=str, + help='Path to SSL cert file') + parser.add_argument('--ssl_keyfile', default=cls.default('SSL_KEYFILE', None), type=str, + help='Path to SSL key file') + parser.add_argument('--reorg_limit', default=cls.integer('REORG_LIMIT', 200), type=int, help='Max reorg depth') + parser.add_argument('--elastic_host', default=cls.default('ELASTIC_HOST', 'localhost'), type=str, + help='elasticsearch host') + parser.add_argument('--elastic_port', default=cls.integer('ELASTIC_PORT', 9200), type=int, + help='elasticsearch port') + parser.add_argument('--es_mode', default=cls.default('ES_MODE', 'writer'), type=str, + choices=['reader', 'writer']) + parser.add_argument('--es_index_prefix', default=cls.default('ES_INDEX_PREFIX', ''), type=str) + parser.add_argument('--loop_policy', default=cls.default('EVENT_LOOP_POLICY', 'default'), type=str, + choices=['default', 'uvloop']) + parser.add_argument('--max_query_workers', type=int, default=cls.integer('MAX_QUERY_WORKERS', 4), + help='number of threads used by the request handler to read the database') + parser.add_argument('--cache_MB', type=int, default=cls.integer('CACHE_MB', 1024), + help='size of the leveldb lru cache, in megabytes') + parser.add_argument('--cache_all_tx_hashes', type=bool, + help='Load all tx hashes into memory. This will make address subscriptions and sync, ' + 'resolve, transaction fetching, and block sync all faster at the expense of higher ' + 'memory usage') + parser.add_argument('--cache_all_claim_txos', type=bool, + help='Load all claim txos into memory. 
This will make address subscriptions and sync, ' + 'resolve, transaction fetching, and block sync all faster at the expense of higher ' + 'memory usage') + parser.add_argument('--prometheus_port', type=int, default=cls.integer('PROMETHEUS_PORT', 0), + help='port for hub prometheus metrics to listen on, disabled by default') + parser.add_argument('--max_subscriptions', type=int, default=cls.integer('MAX_SUBSCRIPTIONS', 10000), + help='max subscriptions per connection') + parser.add_argument('--banner_file', type=str, default=cls.default('BANNER_FILE', None), + help='path to file containing banner text') + parser.add_argument('--anon_logs', type=bool, default=cls.boolean('ANON_LOGS', False), + help="don't log ip addresses") + parser.add_argument('--allow_lan_udp', type=bool, default=cls.boolean('ALLOW_LAN_UDP', False), + help='reply to hub UDP ping messages from LAN ip addresses') + parser.add_argument('--country', type=str, default=cls.default('COUNTRY', 'US'), help='') + parser.add_argument('--max_send', type=int, default=cls.default('MAX_SEND', 1000000), help='') + parser.add_argument('--max_receive', type=int, default=cls.default('MAX_RECEIVE', 1000000), help='') + parser.add_argument('--max_sessions', type=int, default=cls.default('MAX_SESSIONS', 1000), help='') + parser.add_argument('--session_timeout', type=int, default=cls.default('SESSION_TIMEOUT', 600), help='') + parser.add_argument('--drop_client', type=str, default=cls.default('DROP_CLIENT', None), help='') + parser.add_argument('--description', type=str, default=cls.default('DESCRIPTION', ''), help='') + parser.add_argument('--daily_fee', type=float, default=cls.default('DAILY_FEE', 0.0), help='') + parser.add_argument('--payment_address', type=str, default=cls.default('PAYMENT_ADDRESS', ''), help='') + parser.add_argument('--donation_address', type=str, default=cls.default('DONATION_ADDRESS', ''), help='') + parser.add_argument('--chain', type=str, default=cls.default('NET', 'mainnet'), + help="Which chain to use, default is mainnet") + parser.add_argument('--query_timeout_ms', type=int, default=cls.integer('QUERY_TIMEOUT_MS', 10000), + help="elasticsearch query timeout") + + @classmethod + def from_arg_parser(cls, args): + return cls( + db_dir=args.db_dir, daemon_url=args.daemon_url, db_max_open_files=args.db_max_open_files, + host=args.host, rpc_host=args.rpc_host, elastic_host=args.elastic_host, elastic_port=args.elastic_port, + loop_policy=args.loop_policy, max_query_workers=args.max_query_workers, websocket_host=args.websocket_host, + websocket_port=args.websocket_port, chain=args.chain, es_index_prefix=args.es_index_prefix, + es_mode=args.es_mode, cache_MB=args.cache_MB, reorg_limit=args.reorg_limit, tcp_port=args.tcp_port, + udp_port=args.udp_port, ssl_port=args.ssl_port, ssl_certfile=args.ssl_certfile, + ssl_keyfile=args.ssl_keyfile, rpc_port=args.rpc_port, prometheus_port=args.prometheus_port, + max_subscriptions=args.max_subscriptions, banner_file=args.banner_file, anon_logs=args.anon_logs, + log_sessions=None, allow_lan_udp=args.allow_lan_udp, + cache_all_tx_hashes=args.cache_all_tx_hashes, cache_all_claim_txos=args.cache_all_claim_txos, + country=args.country, payment_address=args.payment_address, donation_address=args.donation_address, + max_send=args.max_send, max_receive=args.max_receive, max_sessions=args.max_sessions, + session_timeout=args.session_timeout, drop_client=args.drop_client, description=args.description, + daily_fee=args.daily_fee, database_query_timeout=(args.query_timeout_ms / 1000) + ) diff 
--git a/lbry/wallet/server/leveldb.py b/lbry/wallet/server/leveldb.py
index dbf9b089c..6fe42fd13 100644
--- a/lbry/wallet/server/leveldb.py
+++ b/lbry/wallet/server/leveldb.py
@@ -846,7 +846,7 @@ class LevelDB:
         self.prefix_db = HubDB(
             os.path.join(self.env.db_dir, 'lbry-leveldb'), cache_mb=self.env.cache_MB,
-            reorg_limit=self.env.reorg_limit, max_open_files=512
+            reorg_limit=self.env.reorg_limit, max_open_files=self.env.db_max_open_files
         )
         self.logger.info(f'opened db: lbry-leveldb')

From 88fd41e597956a7d3f93ce8cd2e14fc1365ad27b Mon Sep 17 00:00:00 2001
From: Jack Robison
Date: Tue, 19 Oct 2021 18:16:56 -0400
Subject: [PATCH 16/19] update docker

---
 docker/Dockerfile.wallet_server         |  1 +
 docker/docker-compose-wallet-server.yml | 14 +++++++++-----
 docker/wallet_server_entrypoint.sh      |  4 ++--
 3 files changed, 12 insertions(+), 7 deletions(-)

diff --git a/docker/Dockerfile.wallet_server b/docker/Dockerfile.wallet_server
index a3ed8b60e..185a184a8 100644
--- a/docker/Dockerfile.wallet_server
+++ b/docker/Dockerfile.wallet_server
@@ -20,6 +20,7 @@ RUN apt-get update && \
       python3-dev \
       python3-pip \
       python3-wheel \
+      python3-cffi \
       python3-setuptools && \
     update-alternatives --install /usr/bin/pip pip /usr/bin/pip3 1 && \
     rm -rf /var/lib/apt/lists/*
diff --git a/docker/docker-compose-wallet-server.yml b/docker/docker-compose-wallet-server.yml
index 548386a2d..0c58b8e25 100644
--- a/docker/docker-compose-wallet-server.yml
+++ b/docker/docker-compose-wallet-server.yml
@@ -18,14 +18,18 @@ services:
       - "wallet_server:/database"
     environment:
       - DAEMON_URL=http://lbry:lbry@127.0.0.1:9245
+      - MAX_QUERY_WORKERS=4
+      - CACHE_MB=1024
+      - CACHE_ALL_TX_HASHES=
+      - CACHE_ALL_CLAIM_TXOS=
+      - MAX_SEND=1000000000000000000
+      - MAX_RECEIVE=1000000000000000000
+      - MAX_SESSIONS=100000
+      - HOST=0.0.0.0
       - TCP_PORT=50001
       - PROMETHEUS_PORT=2112
-      - QUERY_TIMEOUT_MS=3000 # how long search queries allowed to run before cancelling, in milliseconds
-      - TRENDING_ALGORITHMS=variable_decay
-      - MAX_SEND=10000000000000 # deprecated. leave it high until its removed
-      - MAX_SUBS=1000000000000 # deprecated. leave it high until its removed
      - FILTERING_CHANNEL_IDS=770bd7ecba84fd2f7607fb15aedd2b172c2e153f 95e5db68a3101df19763f3a5182e4b12ba393ee8
-      - BLOCKING_CHANNEL_IDS=dd687b357950f6f271999971f43c785e8067c3a9 06871aa438032244202840ec59a469b303257cad b4a2528f436eca1bf3bf3e10ff3f98c57bd6c4c6 e4e230b131082f6b10c8f7994bbb83f29e8e6fb9
+      - BLOCKING_CHANNEL_IDS=dd687b357950f6f271999971f43c785e8067c3a9 06871aa438032244202840ec59a469b303257cad b4a2528f436eca1bf3bf3e10ff3f98c57bd6c4c6
   es01:
     image: docker.elastic.co/elasticsearch/elasticsearch:7.11.0
     container_name: es01
diff --git a/docker/wallet_server_entrypoint.sh b/docker/wallet_server_entrypoint.sh
index 1f87927ed..86cd60dd1 100755
--- a/docker/wallet_server_entrypoint.sh
+++ b/docker/wallet_server_entrypoint.sh
@@ -6,7 +6,7 @@ set -euo pipefail

 SNAPSHOT_URL="${SNAPSHOT_URL:-}" #off by default. latest snapshot at https://lbry.com/snapshot/wallet

-if [[ -n "$SNAPSHOT_URL" ]] && [[ ! -f /database/claims.db ]]; then
+if [[ -n "$SNAPSHOT_URL" ]] && [[ ! -d /database/lbry-leveldb ]]; then
   files="$(ls)"
   echo "Downloading wallet snapshot from $SNAPSHOT_URL"
   wget --no-verbose --trust-server-names --content-disposition "$SNAPSHOT_URL"
@@ -20,6 +20,6 @@ if [[ -n "$SNAPSHOT_URL" ]] && [[ ! -f /database/claims.db ]]; then
   rm "$filename"
 fi

-/home/lbry/.local/bin/lbry-hub-elastic-sync /database/claims.db
+/home/lbry/.local/bin/lbry-hub-elastic-sync
 echo 'starting server'
 /home/lbry/.local/bin/lbry-hub "$@"

From 3dec697816fee42cc3cee614d1247a06ebedd692 Mon Sep 17 00:00:00 2001
From: Jack Robison
Date: Wed, 20 Oct 2021 11:40:15 -0400
Subject: [PATCH 17/19] logging

---
 lbry/wallet/server/db/elasticsearch/sync.py |  7 +++---
 lbry/wallet/server/leveldb.py               | 25 +++------------------
 2 files changed, 7 insertions(+), 25 deletions(-)

diff --git a/lbry/wallet/server/db/elasticsearch/sync.py b/lbry/wallet/server/db/elasticsearch/sync.py
index 7fd76e64b..86614ecd6 100644
--- a/lbry/wallet/server/db/elasticsearch/sync.py
+++ b/lbry/wallet/server/db/elasticsearch/sync.py
@@ -47,8 +47,8 @@ async def get_recent_claims(blocks: int, index_name='claims', db=None):
         else:
             logging.warning("could not sync claim %s", touched.hex())
         if cnt % 10000 == 0:
-            print(f"{cnt} claims sent")
-        print("sent %i claims, deleted %i" % (len(touched_claims), len(deleted_claims)))
+            logging.info("%i claims sent to ES", cnt)
+        logging.info("finished sending %i claims to ES, deleted %i", cnt, len(deleted_claims))
     finally:
         if need_open:
             db.close()
@@ -60,6 +60,7 @@ async def get_all_claims(index_name='claims', db=None):
     db = db or LevelDB(env)
     if need_open:
         await db.open_dbs()
+    logging.info("Fetching claims to send ES from leveldb")
     try:
         cnt = 0
         async for claim in db.all_claims_producer():
@@ -72,7 +73,7 @@ async def get_all_claims(index_name='claims', db=None):
             }
             cnt += 1
             if cnt % 10000 == 0:
-                print(f"{cnt} claims sent")
+                logging.info("sent %i claims to ES", cnt)
     finally:
         if need_open:
             db.close()
diff --git a/lbry/wallet/server/leveldb.py b/lbry/wallet/server/leveldb.py
index 6fe42fd13..24a742a24 100644
--- a/lbry/wallet/server/leveldb.py
+++ b/lbry/wallet/server/leveldb.py
@@ -498,19 +498,9 @@ class LevelDB:
                 script.parse()
                 return Claim.from_bytes(script.values['claim'])
             except:
-                self.logger.error(
-                    "tx parsing for ES went boom %s %s", tx_hash[::-1].hex(),
-                    (raw or b'').hex()
-                )
+                self.logger.error("claim parsing for ES failed with tx: %s", tx_hash[::-1].hex())
                 return

-    def _prepare_claim_for_sync(self, claim_hash: bytes):
-        claim = self._fs_get_claim_by_hash(claim_hash)
-        if not claim:
-            print("wat")
-            return
-        return self._prepare_claim_metadata(claim_hash, claim)
-
     def _prepare_claim_metadata(self, claim_hash: bytes, claim: ResolveResult):
         metadata = self.get_claim_metadata(claim.tx_hash, claim.position)
         if not metadata:
@@ -552,19 +542,10 @@ class LevelDB:
                 ).outputs[reposted_claim.position]
                 reposted_script = OutputScript(reposted_claim_txo.pk_script)
                 reposted_script.parse()
-            except:
-                self.logger.error(
-                    "repost tx parsing for ES went boom %s %s", reposted_tx_hash[::-1].hex(),
-                    raw_reposted_claim_tx.hex()
-                )
-                return
-            try:
                 reposted_metadata = Claim.from_bytes(reposted_script.values['claim'])
             except:
-                self.logger.error(
-                    "reposted claim parsing for ES went boom %s %s", reposted_tx_hash[::-1].hex(),
-                    raw_reposted_claim_tx.hex()
-                )
+                self.logger.error("failed to parse reposted claim in tx %s that was reposted by %s",
+                                  reposted_tx_hash[::-1].hex(), claim_hash.hex())
                 return
         if reposted_metadata:
             if reposted_metadata.is_stream:

From a98ea1e66a59c4f8151e4e08e38b250426f35bc4 Mon Sep 17 00:00:00 2001
From: Jack Robison
Date: Wed, 20 Oct 2021 19:34:55 -0400
Subject: [PATCH 18/19] update sync script to handle ES falling behind leveldb
 on shutdown

---
 lbry/wallet/server/block_processor.py         | 
22 ++++++- lbry/wallet/server/cli.py | 3 - lbry/wallet/server/db/elasticsearch/sync.py | 63 +++++++++---------- lbry/wallet/server/db/prefixes.py | 11 +++- lbry/wallet/server/db/revertable.py | 4 +- lbry/wallet/server/env.py | 6 +- lbry/wallet/server/leveldb.py | 21 ++++--- .../blockchain/test_wallet_server_sessions.py | 12 ++-- 8 files changed, 85 insertions(+), 57 deletions(-) diff --git a/lbry/wallet/server/block_processor.py b/lbry/wallet/server/block_processor.py index 66cb6af02..fa25006a4 100644 --- a/lbry/wallet/server/block_processor.py +++ b/lbry/wallet/server/block_processor.py @@ -367,6 +367,7 @@ class BlockProcessor: await self.db.search_index.apply_filters(self.db.blocked_streams, self.db.blocked_channels, self.db.filtered_streams, self.db.filtered_channels) await self.db.search_index.update_trending_score(self.activation_info_to_send_es) + await self._es_caught_up() self.db.search_index.clear_caches() self.touched_claims_to_send_es.clear() self.removed_claims_to_send_es.clear() @@ -1620,6 +1621,7 @@ class BlockProcessor: else: self.tx_count = self.db.tx_counts[-1] self.height -= 1 + # self.touched can include other addresses which is # harmless, but remove None. self.touched_hashXs.discard(None) @@ -1649,8 +1651,15 @@ class BlockProcessor: self.db.last_flush = now self.db.last_flush_tx_count = self.db.fs_tx_count - await self.run_in_thread_with_lock(self.db.prefix_db.rollback, self.height + 1) + def rollback(): + self.db.prefix_db.rollback(self.height + 1) + self.db.es_sync_height = self.height + self.db.write_db_state() + self.db.prefix_db.unsafe_commit() + + await self.run_in_thread_with_lock(rollback) self.clear_after_advance_or_reorg() + self.db.assert_db_state() elapsed = self.db.last_flush - start_time self.logger.warning(f'backup flush #{self.db.hist_flush_count:,d} took {elapsed:.1f}s. ' @@ -1713,6 +1722,17 @@ class BlockProcessor: self.logger.exception("error while processing txs") raise + async def _es_caught_up(self): + self.db.es_sync_height = self.height + + def flush(): + assert len(self.db.prefix_db._op_stack) == 0 + self.db.write_db_state() + self.db.prefix_db.unsafe_commit() + self.db.assert_db_state() + + await self.run_in_thread_with_lock(flush) + async def _first_caught_up(self): self.logger.info(f'caught up to height {self.height}') # Flush everything but with first_sync->False state. 
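
The block_processor changes above and the sync-script changes below implement one idea: the db state now records an es_sync_height next to the tip height, and catching ES up after a crash or shutdown means folding every per-block touched_or_deleted record between those two heights into a single touched/deleted pair. A minimal, self-contained sketch of that fold, with shapes that only mirror the prefix rows in this series (read_changes is a hypothetical stand-in for prefix_db.touched_or_deleted.get, not an API this patch adds):

    from typing import Callable, NamedTuple, Set, Tuple

    class DBState(NamedTuple):
        height: int          # tip height flushed to leveldb
        es_sync_height: int  # tip height whose claims have reached ES

    class TouchedOrDeleted(NamedTuple):
        touched_claims: Set[bytes]
        deleted_claims: Set[bytes]

    def collect_recent_changes(
        state: DBState,
        read_changes: Callable[[int], TouchedOrDeleted],
    ) -> Tuple[Set[bytes], Set[bytes]]:
        """Fold per-block change records into one (touched, deleted) pair."""
        touched: Set[bytes] = set()
        deleted: Set[bytes] = set()
        for height in range(state.es_sync_height, state.height + 1):
            changes = read_changes(height)
            touched.update(changes.touched_claims)
            deleted.update(changes.deleted_claims)
        # a claim deleted after being touched must not be re-indexed
        touched.difference_update(deleted)
        return touched, deleted

From there the deleted set maps to ES delete actions and the touched set to upserts; once the batch is generated, the new es_sync_height is committed back into the db state, so the next run replays only the blocks ES may have missed.
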
diff --git a/lbry/wallet/server/cli.py b/lbry/wallet/server/cli.py
index af80aeffa..74a3d092a 100644
--- a/lbry/wallet/server/cli.py
+++ b/lbry/wallet/server/cli.py
@@ -10,9 +10,6 @@ def get_argument_parser():
         prog="lbry-hub"
     )
     Env.contribute_to_arg_parser(parser)
-    sub = parser.add_subparsers(metavar='COMMAND')
-    start = sub.add_parser('start', help='Start LBRY Network interface.')
-
     return parser


diff --git a/lbry/wallet/server/db/elasticsearch/sync.py b/lbry/wallet/server/db/elasticsearch/sync.py
index 86614ecd6..35941b61a 100644
--- a/lbry/wallet/server/db/elasticsearch/sync.py
+++ b/lbry/wallet/server/db/elasticsearch/sync.py
@@ -1,27 +1,28 @@
+import os
 import argparse
 import asyncio
 import logging
 from elasticsearch import AsyncElasticsearch
 from elasticsearch.helpers import async_streaming_bulk
 from lbry.wallet.server.env import Env
-from lbry.wallet.server.coin import LBC
 from lbry.wallet.server.leveldb import LevelDB
 from lbry.wallet.server.db.elasticsearch.search import SearchIndex, IndexVersionMismatch
 from lbry.wallet.server.db.elasticsearch.constants import ALL_FIELDS


-async def get_recent_claims(blocks: int, index_name='claims', db=None):
-    env = Env(LBC)
+async def get_recent_claims(env, index_name='claims', db=None):
     need_open = db is None
     db = db or LevelDB(env)
-    if need_open:
-        await db.open_dbs()
     try:
+        if need_open:
+            await db.open_dbs()
+        db_state = db.prefix_db.db_state.get()
+        if db_state.es_sync_height == db_state.height:
+            return
         cnt = 0
-        state = db.prefix_db.db_state.get()
         touched_claims = set()
         deleted_claims = set()
-        for height in range(state.height - blocks + 1, state.height + 1):
+        for height in range(db_state.es_sync_height, db_state.height + 1):
             touched_or_deleted = db.prefix_db.touched_or_deleted.get(height)
             touched_claims.update(touched_or_deleted.touched_claims)
             deleted_claims.update(touched_or_deleted.deleted_claims)
@@ -48,14 +49,19 @@ async def get_recent_claims(blocks: int, index_name='claims', db=None):
                 logging.warning("could not sync claim %s", touched.hex())
             if cnt % 10000 == 0:
                 logging.info("%i claims sent to ES", cnt)
+
+        db.es_sync_height = db.db_height
+        db.write_db_state()
+        db.prefix_db.unsafe_commit()
+        db.assert_db_state()
+
         logging.info("finished sending %i claims to ES, deleted %i", cnt, len(deleted_claims))
     finally:
         if need_open:
             db.close()


-async def get_all_claims(index_name='claims', db=None):
-    env = Env(LBC)
+async def get_all_claims(env, index_name='claims', db=None):
     need_open = db is None
     db = db or LevelDB(env)
     if need_open:
@@ -79,34 +85,26 @@ async def get_all_claims(index_name='claims', db=None):
         db.close()


-async def make_es_index(index=None):
-    env = Env(LBC)
-    if index is None:
-        index = SearchIndex('', elastic_host=env.elastic_host, elastic_port=env.elastic_port)
-
+async def make_es_index_and_run_sync(env: Env, clients=32, force=False, db=None, index_name='claims'):
+    index = SearchIndex(env.es_index_prefix, elastic_host=env.elastic_host, elastic_port=env.elastic_port)
+    logging.info("ES sync host: %s:%i", env.elastic_host, env.elastic_port)
     try:
-        return await index.start()
+        created = await index.start()
     except IndexVersionMismatch as err:
         logging.info(
             "dropping ES search index (version %s) for upgrade to version %s", err.got_version,
             err.expected_version
         )
         await index.delete_index()
         await index.stop()
-        return await index.start()
+        created = await index.start()
     finally:
         index.stop()
-
-async def run_sync(index_name='claims', db=None, clients=32, blocks=0):
-    env = Env(LBC)
-    logging.info("ES 
sync host: %s:%i", env.elastic_host, env.elastic_port) es = AsyncElasticsearch([{'host': env.elastic_host, 'port': env.elastic_port}]) - if blocks > 0: - blocks = min(blocks, 200) - logging.info("Resyncing last %i blocks", blocks) - claim_generator = get_recent_claims(blocks, index_name=index_name, db=db) + if force or created: + claim_generator = get_all_claims(env, index_name=index_name, db=db) else: - claim_generator = get_all_claims(index_name=index_name, db=db) + claim_generator = get_recent_claims(env, index_name=index_name, db=db) try: async for ok, item in async_streaming_bulk(es, claim_generator, request_timeout=600, raise_on_error=False): if not ok: @@ -123,17 +121,14 @@ def run_elastic_sync(): logging.info('lbry.server starting') parser = argparse.ArgumentParser(prog="lbry-hub-elastic-sync") - # parser.add_argument("db_path", type=str) parser.add_argument("-c", "--clients", type=int, default=32) - parser.add_argument("-b", "--blocks", type=int, default=0) parser.add_argument("-f", "--force", default=False, action='store_true') + Env.contribute_to_arg_parser(parser) args = parser.parse_args() + env = Env.from_arg_parser(args) - # if not args.force and not os.path.exists(args.db_path): - # logging.info("DB path doesnt exist") - # return - - if not args.force and not asyncio.run(make_es_index()): - logging.info("ES is already initialized") + if not os.path.exists(os.path.join(args.db_dir, 'lbry-leveldb')): + logging.info("DB path doesnt exist, nothing to sync to ES") return - asyncio.run(run_sync(clients=args.clients, blocks=args.blocks)) + + asyncio.run(make_es_index_and_run_sync(env, clients=args.clients, force=args.force)) diff --git a/lbry/wallet/server/db/prefixes.py b/lbry/wallet/server/db/prefixes.py index d71b941b4..c264016fb 100644 --- a/lbry/wallet/server/db/prefixes.py +++ b/lbry/wallet/server/db/prefixes.py @@ -534,6 +534,7 @@ class DBState(typing.NamedTuple): hist_flush_count: int comp_flush_count: int comp_cursor: int + es_sync_height: int class ActiveAmountPrefixRow(PrefixRow): @@ -1521,7 +1522,7 @@ class SupportAmountPrefixRow(PrefixRow): class DBStatePrefixRow(PrefixRow): prefix = DB_PREFIXES.db_state.value - value_struct = struct.Struct(b'>32sLL32sLLBBlll') + value_struct = struct.Struct(b'>32sLL32sLLBBlllL') key_struct = struct.Struct(b'') key_part_lambdas = [ @@ -1539,15 +1540,19 @@ class DBStatePrefixRow(PrefixRow): @classmethod def pack_value(cls, genesis: bytes, height: int, tx_count: int, tip: bytes, utxo_flush_count: int, wall_time: int, first_sync: bool, db_version: int, hist_flush_count: int, comp_flush_count: int, - comp_cursor: int) -> bytes: + comp_cursor: int, es_sync_height: int) -> bytes: return super().pack_value( genesis, height, tx_count, tip, utxo_flush_count, wall_time, 1 if first_sync else 0, db_version, hist_flush_count, - comp_flush_count, comp_cursor + comp_flush_count, comp_cursor, es_sync_height ) @classmethod def unpack_value(cls, data: bytes) -> DBState: + if len(data) == 94: + # TODO: delete this after making a new snapshot - 10/20/21 + # migrate in the es_sync_height if it doesnt exist + data += data[32:36] return DBState(*super().unpack_value(data)) @classmethod diff --git a/lbry/wallet/server/db/revertable.py b/lbry/wallet/server/db/revertable.py index 099c2b48e..e59bbcdf3 100644 --- a/lbry/wallet/server/db/revertable.py +++ b/lbry/wallet/server/db/revertable.py @@ -121,14 +121,14 @@ class RevertableOpStack: elif op.is_delete and has_stored_val and stored_val != op.value and not will_delete_existing_stored: # there is a value and 
we're not deleting it in this op
                     # check that a delete for the stored value is in the stack
-                    raise OpStackIntegrity(f"delete {op}")
+                    raise OpStackIntegrity(f"db op tries to delete with incorrect existing value {op}")
                 elif op.is_delete and not has_stored_val:
                     raise OpStackIntegrity(f"db op tries to delete nonexistent key: {op}")
                 elif op.is_delete and stored_val != op.value:
                     raise OpStackIntegrity(f"db op tries to delete with incorrect value: {op}")
             except OpStackIntegrity as err:
                 if op.key[:1] in self._unsafe_prefixes:
-                    log.error(f"skipping over integrity error: {err}")
+                    log.debug(f"skipping over integrity error: {err}")
                 else:
                     raise err
             self._items[op.key].append(op)
diff --git a/lbry/wallet/server/env.py b/lbry/wallet/server/env.py
index 13956d525..a109abf76 100644
--- a/lbry/wallet/server/env.py
+++ b/lbry/wallet/server/env.py
@@ -289,9 +289,11 @@ class Env:

     @classmethod
     def contribute_to_arg_parser(cls, parser):
-        parser.add_argument('--db_dir', type=str, help='path of the directory containing lbry-leveldb')
+        parser.add_argument('--db_dir', type=str, help='path of the directory containing lbry-leveldb',
+                            default=cls.default('DB_DIRECTORY', None))
         parser.add_argument('--daemon_url',
-                            help='URL for rpc from lbrycrd, <rpcuser>:<rpcpassword>@<lbrycrd rpc ip>')
+                            help='URL for rpc from lbrycrd, <rpcuser>:<rpcpassword>@<lbrycrd rpc ip>',
+                            default=cls.default('DAEMON_URL', None))
         parser.add_argument('--db_max_open_files', type=int, default=512,
                             help='number of files leveldb can have open at a time')
         parser.add_argument('--host', type=str, default=cls.default('HOST', 'localhost'),
diff --git a/lbry/wallet/server/leveldb.py b/lbry/wallet/server/leveldb.py
index 24a742a24..dddf3f1fb 100644
--- a/lbry/wallet/server/leveldb.py
+++ b/lbry/wallet/server/leveldb.py
@@ -33,7 +33,7 @@ from lbry.wallet.server.merkle import Merkle, MerkleCache
 from lbry.wallet.server.db.common import ResolveResult, STREAM_TYPES, CLAIM_TYPES
 from lbry.wallet.server.db.prefixes import PendingActivationValue, ClaimTakeoverValue, ClaimToTXOValue, HubDB
 from lbry.wallet.server.db.prefixes import ACTIVATED_CLAIM_TXO_TYPE, ACTIVATED_SUPPORT_TXO_TYPE
-from lbry.wallet.server.db.prefixes import PendingActivationKey, TXOToClaimValue
+from lbry.wallet.server.db.prefixes import PendingActivationKey, TXOToClaimValue, DBStatePrefixRow
 from lbry.wallet.transaction import OutputScript
 from lbry.schema.claim import Claim, guess_stream_type
 from lbry.wallet.ledger import Ledger, RegTestLedger, TestNetLedger
@@ -87,6 +87,8 @@ class LevelDB:
         self.hist_comp_flush_count = -1
         self.hist_comp_cursor = -1

+        self.es_sync_height = 0
+
         # blocking/filtering dicts
         blocking_channels = self.env.default('BLOCKING_CHANNEL_IDS', '').split(' ')
         filtering_channels = self.env.default('FILTERING_CHANNEL_IDS', '').split(' ')
@@ -827,7 +829,8 @@ class LevelDB:
         self.prefix_db = HubDB(
             os.path.join(self.env.db_dir, 'lbry-leveldb'), cache_mb=self.env.cache_MB,
-            reorg_limit=self.env.reorg_limit, max_open_files=self.env.db_max_open_files
+            reorg_limit=self.env.reorg_limit, max_open_files=self.env.db_max_open_files,
+            unsafe_prefixes={DBStatePrefixRow.prefix}
         )
         self.logger.info(f'opened db: lbry-leveldb')

@@ -1059,7 +1062,8 @@ class LevelDB:
         self.prefix_db.db_state.stage_put((), (
             self.genesis_bytes, self.db_height, self.db_tx_count, self.db_tip,
             self.utxo_flush_count, int(self.wall_time), self.first_sync, self.db_version,
-            self.hist_flush_count, self.hist_comp_flush_count, self.hist_comp_cursor
+            self.hist_flush_count, self.hist_comp_flush_count, self.hist_comp_cursor,
+            self.es_sync_height
             )
         )

@@ -1101,11 +1105,12 @@
     
def assert_db_state(self): state = self.prefix_db.db_state.get() - assert self.db_version == state.db_version - assert self.db_height == state.height - assert self.db_tx_count == state.tx_count - assert self.db_tip == state.tip - assert self.first_sync == state.first_sync + assert self.db_version == state.db_version, f"{self.db_version} != {state.db_version}" + assert self.db_height == state.height, f"{self.db_height} != {state.height}" + assert self.db_tx_count == state.tx_count, f"{self.db_tx_count} != {state.tx_count}" + assert self.db_tip == state.tip, f"{self.db_tip} != {state.tip}" + assert self.first_sync == state.first_sync, f"{self.first_sync} != {state.first_sync}" + assert self.es_sync_height == state.es_sync_height, f"{self.es_sync_height} != {state.es_sync_height}" async def all_utxos(self, hashX): """Return all UTXOs for an address sorted in no particular order.""" diff --git a/tests/integration/blockchain/test_wallet_server_sessions.py b/tests/integration/blockchain/test_wallet_server_sessions.py index 4f7930c05..139a0bf0b 100644 --- a/tests/integration/blockchain/test_wallet_server_sessions.py +++ b/tests/integration/blockchain/test_wallet_server_sessions.py @@ -5,7 +5,7 @@ import lbry.wallet from lbry.error import ServerPaymentFeeAboveMaxAllowedError from lbry.wallet.network import ClientSession from lbry.wallet.rpc import RPCError -from lbry.wallet.server.db.elasticsearch.sync import run_sync, make_es_index +from lbry.wallet.server.db.elasticsearch.sync import make_es_index_and_run_sync from lbry.wallet.server.session import LBRYElectrumX from lbry.testcase import IntegrationTestCase, CommandTestCase from lbry.wallet.orchstr8.node import SPVNode @@ -95,16 +95,17 @@ class TestESSync(CommandTestCase): await self.generate(1) self.assertEqual(10, len(await self.claim_search(order_by=['height']))) db = self.conductor.spv_node.server.db + env = self.conductor.spv_node.server.env + await db.search_index.delete_index() db.search_index.clear_caches() self.assertEqual(0, len(await self.claim_search(order_by=['height']))) await db.search_index.stop() - self.assertTrue(await make_es_index(db.search_index)) async def resync(): await db.search_index.start() db.search_index.clear_caches() - await run_sync(index_name=db.search_index.index, db=db) + await make_es_index_and_run_sync(env, db=db, index_name=db.search_index.index, force=True) self.assertEqual(10, len(await self.claim_search(order_by=['height']))) self.assertEqual(0, len(await self.claim_search(order_by=['height']))) @@ -114,9 +115,12 @@ class TestESSync(CommandTestCase): # this time we will test a migration from unversioned to v1 await db.search_index.sync_client.indices.delete_template(db.search_index.index) await db.search_index.stop() - self.assertTrue(await make_es_index(db.search_index)) + + await make_es_index_and_run_sync(env, db=db, index_name=db.search_index.index, force=True) await db.search_index.start() + await resync() + self.assertEqual(10, len(await self.claim_search(order_by=['height']))) class TestHubDiscovery(CommandTestCase): From 48505c2968d2c2542d900ec8ef3e5aefaa13cd4c Mon Sep 17 00:00:00 2001 From: Jack Robison Date: Wed, 20 Oct 2021 23:43:10 -0400 Subject: [PATCH 19/19] update trending with help from @eggplantbren --- lbry/wallet/server/block_processor.py | 23 +--- lbry/wallet/server/db/elasticsearch/search.py | 118 ++++++++++++++---- .../takeovers/test_resolve_command.py | 49 +++----- 3 files changed, 117 insertions(+), 73 deletions(-) diff --git a/lbry/wallet/server/block_processor.py 
From 48505c2968d2c2542d900ec8ef3e5aefaa13cd4c Mon Sep 17 00:00:00 2001
From: Jack Robison
Date: Wed, 20 Oct 2021 23:43:10 -0400
Subject: [PATCH 19/19] update trending with help from @eggplantbren

---
 lbry/wallet/server/block_processor.py         |  23 +---
 lbry/wallet/server/db/elasticsearch/search.py | 118 ++++++++++++++----
 .../takeovers/test_resolve_command.py         |  49 +++-----
 3 files changed, 117 insertions(+), 73 deletions(-)

diff --git a/lbry/wallet/server/block_processor.py b/lbry/wallet/server/block_processor.py
index fa25006a4..0e06aebd7 100644
--- a/lbry/wallet/server/block_processor.py
+++ b/lbry/wallet/server/block_processor.py
@@ -30,7 +30,6 @@ if typing.TYPE_CHECKING:
 
 class TrendingNotification(NamedTuple):
     height: int
-    added: bool
     prev_amount: int
     new_amount: int
 
@@ -1322,9 +1321,9 @@ class BlockProcessor:
             self.touched_claim_hashes.add(controlling.claim_hash)
             self.touched_claim_hashes.add(winning)
 
-    def _add_claim_activation_change_notification(self, claim_id: str, height: int, added: bool, prev_amount: int,
+    def _add_claim_activation_change_notification(self, claim_id: str, height: int, prev_amount: int,
                                                   new_amount: int):
-        self.activation_info_to_send_es[claim_id].append(TrendingNotification(height, added, prev_amount, new_amount))
+        self.activation_info_to_send_es[claim_id].append(TrendingNotification(height, prev_amount, new_amount))
 
     def _get_cumulative_update_ops(self, height: int):
         # update the last takeover height for names with takeovers
@@ -1402,25 +1401,13 @@ class BlockProcessor:
                     (name, prev_effective_amount, amt.tx_num, amt.position), (touched,)
                 )
-                if (name, touched) in self.activated_claim_amount_by_name_and_hash:
-                    self._add_claim_activation_change_notification(
-                        touched.hex(), height, True, prev_effective_amount,
-                        self.activated_claim_amount_by_name_and_hash[(name, touched)]
-                    )
-                if touched in self.activated_support_amount_by_claim:
-                    for support_amount in self.activated_support_amount_by_claim[touched]:
-                        self._add_claim_activation_change_notification(
-                            touched.hex(), height, True, prev_effective_amount, support_amount
-                        )
-                if touched in self.removed_active_support_amount_by_claim:
-                    for support_amount in self.removed_active_support_amount_by_claim[touched]:
-                        self._add_claim_activation_change_notification(
-                            touched.hex(), height, False, prev_effective_amount, support_amount
-                        )
                 new_effective_amount = self._get_pending_effective_amount(name, touched)
                 self.db.prefix_db.effective_amount.stage_put(
                     (name, new_effective_amount, tx_num, position), (touched,)
                 )
+                self._add_claim_activation_change_notification(
+                    touched.hex(), height, prev_effective_amount, new_effective_amount
+                )
 
         for channel_hash, count in self.pending_channel_counts.items():
             if count != 0:
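With the added flag gone from TrendingNotification, the processor no longer emits separate events for activated claim amounts, activated supports, and removed supports; it records one before/after snapshot of the effective amount per touched claim per height. A minimal sketch of the accumulation pattern, with the class scaffolding reduced to module level for illustration:

from collections import defaultdict
from typing import Dict, List, NamedTuple


class TrendingNotification(NamedTuple):
    height: int
    prev_amount: int
    new_amount: int


# one notification per touched claim per height, carrying the effective
# amount before and after the block's updates
activation_info_to_send_es: Dict[str, List[TrendingNotification]] = defaultdict(list)


def add_claim_activation_change_notification(claim_id: str, height: int,
                                             prev_amount: int, new_amount: int):
    activation_info_to_send_es[claim_id].append(
        TrendingNotification(height, prev_amount, new_amount)
    )


add_claim_activation_change_notification("deadbeef" * 5, 99000, 0, 10 * 10**8)

The amounts stay in base units here; they are only divided by 1E8 when handed to the Elasticsearch update script, as the next diff shows.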
diff --git a/lbry/wallet/server/db/elasticsearch/search.py b/lbry/wallet/server/db/elasticsearch/search.py
index c762920ef..68383959a 100644
--- a/lbry/wallet/server/db/elasticsearch/search.py
+++ b/lbry/wallet/server/db/elasticsearch/search.py
@@ -154,21 +154,102 @@ class SearchIndex:
 
     async def update_trending_score(self, params):
         update_trending_score_script = """
-        double softenLBC(double lbc) { Math.pow(lbc, 1.0f / 3.0f) }
-        double inflateUnits(int height) {
-            int renormalizationPeriod = 100000;
-            double doublingRate = 400.0f;
-            Math.pow(2.0, (height % renormalizationPeriod) / doublingRate)
+        double softenLBC(double lbc) { return (Math.pow(lbc, 1.0 / 3.0)); }
+
+        double logsumexp(double x, double y)
+        {
+            double top;
+            if(x > y)
+                top = x;
+            else
+                top = y;
+            double result = top + Math.log(Math.exp(x-top) + Math.exp(y-top));
+            return(result);
         }
+
+        double logdiffexp(double big, double small)
+        {
+            return big + Math.log(1.0 - Math.exp(small - big));
+        }
+
+        double squash(double x)
+        {
+            if(x < 0.0)
+                return -Math.log(1.0 - x);
+            else
+                return Math.log(x + 1.0);
+        }
+
+        double unsquash(double x)
+        {
+            if(x < 0.0)
+                return 1.0 - Math.exp(-x);
+            else
+                return Math.exp(x) - 1.0;
+        }
+
+        double log_to_squash(double x)
+        {
+            return logsumexp(x, 0.0);
+        }
+
+        double squash_to_log(double x)
+        {
+            //assert x > 0.0;
+            return logdiffexp(x, 0.0);
+        }
+
+        double squashed_add(double x, double y)
+        {
+            // squash(unsquash(x) + unsquash(y)) but avoiding overflow.
+            // Cases where the signs are the same
+            if (x < 0.0 && y < 0.0)
+                return -logsumexp(-x, logdiffexp(-y, 0.0));
+            if (x >= 0.0 && y >= 0.0)
+                return logsumexp(x, logdiffexp(y, 0.0));
+            // Where the signs differ
+            if (x >= 0.0 && y < 0.0)
+                if (Math.abs(x) >= Math.abs(y))
+                    return logsumexp(0.0, logdiffexp(x, -y));
+                else
+                    return -logsumexp(0.0, logdiffexp(-y, x));
+            if (x < 0.0 && y >= 0.0)
+            {
+                // Addition is commutative, hooray for new math
+                return squashed_add(y, x);
+            }
+            return 0.0;
+        }
+
+        double squashed_multiply(double x, double y)
+        {
+            // squash(unsquash(x)*unsquash(y)) but avoiding overflow.
+            int sign;
+            if(x*y >= 0.0)
+                sign = 1;
+            else
+                sign = -1;
+            return sign*logsumexp(squash_to_log(Math.abs(x)) +
+                        squash_to_log(Math.abs(y)), 0.0);
+        }
+
+        // Squashed inflated units
+        double inflateUnits(int height) {
+            double timescale = 576.0; // Half life of 400 = e-folding time of a day
+                                      // by coincidence, so may as well go with it
+            return log_to_squash(height / timescale);
+        }
+
         double spikePower(double newAmount) {
             if (newAmount < 50.0) {
-                0.5
+                return(0.5);
             } else if (newAmount < 85.0) {
-                newAmount / 100.0
+                return(newAmount / 100.0);
             } else {
-                0.85
+                return(0.85);
             }
         }
+
         double spikeMass(double oldAmount, double newAmount) {
             double softenedChange = softenLBC(Math.abs(newAmount - oldAmount));
             double changeInSoftened = Math.abs(softenLBC(newAmount) - softenLBC(oldAmount));
@@ -181,19 +262,11 @@ class SearchIndex:
         }
         for (i in params.src.changes) {
             double units = inflateUnits(i.height);
-            if (i.added) {
-                if (ctx._source.trending_score == null) {
-                    ctx._source.trending_score = (units * spikeMass(i.prev_amount, i.prev_amount + i.new_amount));
-                } else {
-                    ctx._source.trending_score += (units * spikeMass(i.prev_amount, i.prev_amount + i.new_amount));
-                }
-            } else {
-                if (ctx._source.trending_score == null) {
-                    ctx._source.trending_score = (units * spikeMass(i.prev_amount, i.prev_amount - i.new_amount));
-                } else {
-                    ctx._source.trending_score += (units * spikeMass(i.prev_amount, i.prev_amount - i.new_amount));
-                }
+            if (ctx._source.trending_score == null) {
+                ctx._source.trending_score = 0.0;
             }
+            double bigSpike = squashed_multiply(units, squash(spikeMass(i.prev_amount, i.new_amount)));
+            ctx._source.trending_score = squashed_add(ctx._source.trending_score, bigSpike);
         }
         """
         start = time.perf_counter()
@@ -211,9 +284,8 @@ class SearchIndex:
                         'changes': [
                             {
                                 'height': p.height,
-                                'added': p.added,
-                                'prev_amount': p.prev_amount * 1E-9,
-                                'new_amount': p.new_amount * 1E-9,
+                                'prev_amount': p.prev_amount / 1E8,
+                                'new_amount': p.new_amount / 1E8,
                             } for p in claim_updates
                         ]
                    }}
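The Painless helpers above are pure math, so the squashed arithmetic can be sanity-checked offline. Below is a direct Python port, a verification sketch rather than part of the patch, checking the overflow-safe squashed_add against the naive identity squash(unsquash(x) + unsquash(y)):

import math

def logsumexp(x, y):
    top = max(x, y)
    return top + math.log(math.exp(x - top) + math.exp(y - top))

def logdiffexp(big, small):
    return big + math.log(1.0 - math.exp(small - big))

def squash(x):
    return -math.log(1.0 - x) if x < 0.0 else math.log(x + 1.0)

def unsquash(x):
    return 1.0 - math.exp(-x) if x < 0.0 else math.exp(x) - 1.0

def squashed_add(x, y):
    # overflow-safe squash(unsquash(x) + unsquash(y)), mirroring the script
    if x < 0.0 and y < 0.0:
        return -logsumexp(-x, logdiffexp(-y, 0.0))
    if x >= 0.0 and y >= 0.0:
        return logsumexp(x, logdiffexp(y, 0.0))
    if x >= 0.0 and y < 0.0:
        if abs(x) >= abs(y):
            return logsumexp(0.0, logdiffexp(x, -y))
        return -logsumexp(0.0, logdiffexp(-y, x))
    return squashed_add(y, x)  # x < 0.0 <= y: addition is commutative

# the safe form should agree with the naive identity away from the zero edges
for a, b in [(1.5, 2.5), (-1.5, -2.5), (3.0, -1.0), (-3.0, 1.0)]:
    naive = squash(unsquash(a) + unsquash(b))
    assert abs(squashed_add(a, b) - naive) < 1e-9, (a, b)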
diff --git a/tests/integration/takeovers/test_resolve_command.py b/tests/integration/takeovers/test_resolve_command.py
index b5ec87fc8..45d8dc6b1 100644
--- a/tests/integration/takeovers/test_resolve_command.py
+++ b/tests/integration/takeovers/test_resolve_command.py
@@ -1397,47 +1397,32 @@ class ResolveClaimTakeovers(BaseResolveTestCase):
             ))[0][0]['trending_score']
 
         claim_id1 = (await self.stream_create('derp', '1.0'))['outputs'][0]['claim_id']
-        claim_id2 = (await self.stream_create('derp', '1.0', allow_duplicate_name=True))['outputs'][0]['claim_id']
-        claim_id3 = (await self.stream_create('derp', '1.0', allow_duplicate_name=True))['outputs'][0]['claim_id']
-        claim_id4 = (await self.stream_create('derp', '1.0', allow_duplicate_name=True))['outputs'][0]['claim_id']
-        claim_id5 = (await self.stream_create('derp', '1.0', allow_duplicate_name=True))['outputs'][0]['claim_id']
-
-        COIN = 1E9
+        COIN = 1E8
 
         height = 99000
         self.conductor.spv_node.server.bp._add_claim_activation_change_notification(
-            claim_id1, height, True, 1 * COIN, 1_000_000 * COIN
-        )
-        self.conductor.spv_node.server.bp._add_claim_activation_change_notification(
-            claim_id2, height, True, 1 * COIN, 100_000 * COIN
-        )
-        self.conductor.spv_node.server.bp._add_claim_activation_change_notification(
-            claim_id2, height + 1, False, 100_001 * COIN, 100_000 * COIN
-        )
-        self.conductor.spv_node.server.bp._add_claim_activation_change_notification(
-            claim_id3, height, True, 1 * COIN, 1_000 * COIN
-        )
-        self.conductor.spv_node.server.bp._add_claim_activation_change_notification(
-            claim_id4, height, True, 1 * COIN, 10 * COIN
+            claim_id1, height, 0, 10 * COIN
         )
         await self.generate(1)
-
-        self.assertEqual(3.1711298570548195e+76, await get_trending_score(claim_id1))
-        self.assertEqual(-1.369652719234026e+74, await get_trending_score(claim_id2))
-        self.assertEqual(2.925275298842502e+75, await get_trending_score(claim_id3))
-        self.assertEqual(5.193711055804491e+74, await get_trending_score(claim_id4))
-        self.assertEqual(0.6690521635580086, await get_trending_score(claim_id5))
-
+        self.assertEqual(172.64252836433135, await get_trending_score(claim_id1))
         self.conductor.spv_node.server.bp._add_claim_activation_change_notification(
-            claim_id5, height + 100, True, 2 * COIN, 10 * COIN
+            claim_id1, height + 1, 10 * COIN, 100 * COIN
         )
         await self.generate(1)
-        self.assertEqual(5.664516565750028e+74, await get_trending_score(claim_id5))
-
+        self.assertEqual(173.45931832928875, await get_trending_score(claim_id1))
+        self.conductor.spv_node.server.bp._add_claim_activation_change_notification(
+            claim_id1, height + 100, 100 * COIN, 1000000 * COIN
+        )
+        await self.generate(1)
+        self.assertEqual(176.65517070393514, await get_trending_score(claim_id1))
+        self.conductor.spv_node.server.bp._add_claim_activation_change_notification(
+            claim_id1, height + 200, 1000000 * COIN, 1 * COIN
+        )
+        await self.generate(1)
+        self.assertEqual(-174.951347102643, await get_trending_score(claim_id1))
         search_results = (await self.conductor.spv_node.server.bp.db.search_index.search(claim_name="derp"))[0]
-        self.assertEqual(5, len(search_results))
-        self.assertListEqual([claim_id1, claim_id3, claim_id4, claim_id2, claim_id5], [c['claim_id'] for c in search_results])
+        self.assertEqual(1, len(search_results))
+        self.assertListEqual([claim_id1], [c['claim_id'] for c in search_results])
 
 
 class ResolveAfterReorg(BaseResolveTestCase):