diff --git a/lbry/wallet/server/block_processor.py b/lbry/wallet/server/block_processor.py index 7f5806553..672a04d49 100644 --- a/lbry/wallet/server/block_processor.py +++ b/lbry/wallet/server/block_processor.py @@ -11,7 +11,6 @@ import lbry from lbry.schema.claim import Claim from lbry.wallet.transaction import OutputScript, Output from lbry.wallet.server.tx import Tx -from lbry.wallet.server.db.writer import SQLDB from lbry.wallet.server.daemon import DaemonError from lbry.wallet.server.hash import hash_to_hex_str, HASHX_LEN from lbry.wallet.server.util import chunks, class_logger @@ -238,16 +237,19 @@ class BlockProcessor: if hprevs == chain: start = time.perf_counter() try: - await self.run_in_thread_with_lock(self.advance_blocks, blocks) + for block in blocks: + await self.run_in_thread_with_lock(self.advance_block, block) + print("advanced\n") except: self.logger.exception("advance blocks failed") raise - if self.sql: - await self.db.search_index.claim_consumer(self.sql.claim_producer()) + # if self.sql: + # await self.db.search_index.claim_consumer(self.db.claim_producer()) for cache in self.search_cache.values(): cache.clear() self.history_cache.clear() # TODO: is this needed? self.notifications.notified_mempool_txs.clear() + processed_time = time.perf_counter() - start self.block_count_metric.set(self.height) self.block_update_time_metric.observe(processed_time) @@ -256,9 +258,9 @@ class BlockProcessor: s = '' if len(blocks) == 1 else 's' self.logger.info('processed {:,d} block{} in {:.1f}s'.format(len(blocks), s, processed_time)) if self._caught_up_event.is_set(): - if self.sql: - await self.db.search_index.apply_filters(self.sql.blocked_streams, self.sql.blocked_channels, - self.sql.filtered_streams, self.sql.filtered_channels) + # if self.sql: + # await self.db.search_index.apply_filters(self.sql.blocked_streams, self.sql.blocked_channels, + # self.sql.filtered_streams, self.sql.filtered_channels) await self.notifications.on_block(self.touched, self.height) self.touched = set() elif hprevs[0] != chain[0]: @@ -1122,36 +1124,3 @@ class Timer: sub_timer.show(depth+1) if depth == 0: print('='*100) - - -class LBRYBlockProcessor(BlockProcessor): - - def __init__(self, *args, **kwargs): - super().__init__(*args, **kwargs) - if self.env.coin.NET == "regtest": - self.prefetcher.polling_delay = 0.5 - self.should_validate_signatures = self.env.boolean('VALIDATE_CLAIM_SIGNATURES', False) - self.logger.info(f"LbryumX Block Processor - Validating signatures: {self.should_validate_signatures}") - self.sql: SQLDB = self.db.sql - self.timer = Timer('BlockProcessor') - - def advance_blocks(self, blocks): - if self.sql: - self.sql.begin() - try: - self.timer.run(super().advance_blocks, blocks) - except: - self.logger.exception(f'Error while advancing transaction in new block.') - raise - finally: - if self.sql: - self.sql.commit() - - def advance_txs(self, height, txs, header, block_hash): - timer = self.timer.sub_timers['advance_blocks'] - undo = timer.run(super().advance_txs, height, txs, header, block_hash, timer_name='super().advance_txs') - if self.sql: - timer.run(self.sql.advance_txs, height, txs, header, self.daemon.cached_height(), forward_timer=True) - if (height % 10000 == 0 or not self.db.first_sync) and self.logger.isEnabledFor(10): - self.timer.show(height=height) - return undo diff --git a/lbry/wallet/server/coin.py b/lbry/wallet/server/coin.py index 2a75f994d..569cd50bd 100644 --- a/lbry/wallet/server/coin.py +++ b/lbry/wallet/server/coin.py @@ -241,7 +241,6 @@ class Coin: class LBC(Coin): DAEMON = LBCDaemon SESSIONCLS = LBRYElectrumX - BLOCK_PROCESSOR = LBRYBlockProcessor SESSION_MANAGER = LBRYSessionManager DESERIALIZER = DeserializerSegWit DB = LevelDB diff --git a/lbry/wallet/server/db/writer.py b/lbry/wallet/server/db/writer.py index 34e14ced1..4b4de924f 100644 --- a/lbry/wallet/server/db/writer.py +++ b/lbry/wallet/server/db/writer.py @@ -18,7 +18,6 @@ from lbry.wallet.server.db.canonical import register_canonical_functions from lbry.wallet.server.db.trending import TRENDING_ALGORITHMS from .common import CLAIM_TYPES, STREAM_TYPES, COMMON_TAGS, INDEXED_LANGUAGES -from lbry.wallet.server.db.elasticsearch import SearchIndex ATTRIBUTE_ARRAY_MAX_LENGTH = 100 sqlite3.enable_callback_tracebacks(True) @@ -954,41 +953,3 @@ class SQLDB: r(self.update_claimtrie, height, recalculate_claim_hashes, deleted_claim_names, forward_timer=True) for algorithm in self.trending: r(algorithm.run, self.db.cursor(), height, daemon_height, recalculate_claim_hashes) - - -class LBRYLevelDB(LevelDB): - - def __init__(self, *args, **kwargs): - super().__init__(*args, **kwargs) - path = os.path.join(self.env.db_dir, 'claims.db') - trending = [] - for algorithm_name in self.env.trending_algorithms: - if algorithm_name in TRENDING_ALGORITHMS: - trending.append(TRENDING_ALGORITHMS[algorithm_name]) - if self.env.es_mode == 'reader': - self.logger.info('Index mode: reader') - self.sql = None - else: - self.logger.info('Index mode: writer. Using SQLite db to sync ES') - self.sql = SQLDB( - self, path, - self.env.default('BLOCKING_CHANNEL_IDS', '').split(' '), - self.env.default('FILTERING_CHANNEL_IDS', '').split(' '), - trending - ) - - # Search index - self.search_index = SearchIndex( - self.env.es_index_prefix, self.env.database_query_timeout, self.env.elastic_host, self.env.elastic_port - ) - - def close(self): - super().close() - if self.sql: - self.sql.close() - - async def _open_dbs(self, *args, **kwargs): - await self.search_index.start() - await super()._open_dbs(*args, **kwargs) - if self.sql: - self.sql.open() diff --git a/lbry/wallet/server/leveldb.py b/lbry/wallet/server/leveldb.py index 9518310ad..0c0cb5c6e 100644 --- a/lbry/wallet/server/leveldb.py +++ b/lbry/wallet/server/leveldb.py @@ -37,6 +37,7 @@ from lbry.wallet.server.db import DB_PREFIXES from lbry.wallet.server.db.prefixes import Prefixes from lbry.wallet.server.db.claimtrie import StagedClaimtrieItem, get_update_effective_amount_ops, length_encoded_name from lbry.wallet.server.db.claimtrie import get_expiration_height +from lbry.wallet.server.db.elasticsearch import SearchIndex class UTXO(typing.NamedTuple): @@ -178,6 +179,9 @@ class LevelDB: self.total_transactions = None self.transaction_num_mapping = {} + # Search index + self.search_index = SearchIndex(self.env.es_index_prefix, self.env.database_query_timeout) + def claim_hash_and_name_from_txo(self, tx_num: int, tx_idx: int): claim_hash_and_name = self.db.get( DB_PREFIXES.txo_to_claim.value + TXO_STRUCT_pack(tx_num, tx_idx) @@ -558,6 +562,9 @@ class LevelDB: await self._read_txids() await self._read_headers() + # start search index + await self.search_index.start() + def close(self): self.db.close() self.executor.shutdown(wait=True) diff --git a/lbry/wallet/server/session.py b/lbry/wallet/server/session.py index d5b9ddd69..0c15651bf 100644 --- a/lbry/wallet/server/session.py +++ b/lbry/wallet/server/session.py @@ -24,7 +24,7 @@ import lbry from lbry.error import TooManyClaimSearchParametersError from lbry.build_info import BUILD, COMMIT_HASH, DOCKER_TAG from lbry.schema.result import Outputs -from lbry.wallet.server.block_processor import LBRYBlockProcessor +from lbry.wallet.server.block_processor import BlockProcessor from lbry.wallet.server.leveldb import LevelDB from lbry.wallet.server.websocket import AdminWebSocket from lbry.wallet.server.metrics import ServerLoadData, APICallMetrics @@ -176,7 +176,7 @@ class SessionManager: namespace=NAMESPACE, buckets=HISTOGRAM_BUCKETS ) - def __init__(self, env: 'Env', db: LevelDB, bp: LBRYBlockProcessor, daemon: 'Daemon', mempool: 'MemPool', + def __init__(self, env: 'Env', db: LevelDB, bp: BlockProcessor, daemon: 'Daemon', mempool: 'MemPool', shutdown_event: asyncio.Event): env.max_send = max(350000, env.max_send) self.env = env @@ -914,7 +914,7 @@ class LBRYElectrumX(SessionBase): self.protocol_tuple = self.PROTOCOL_MIN self.protocol_string = None self.daemon = self.session_mgr.daemon - self.bp: LBRYBlockProcessor = self.session_mgr.bp + self.bp: BlockProcessor = self.session_mgr.bp self.db: LevelDB = self.bp.db @classmethod @@ -1019,14 +1019,9 @@ class LBRYElectrumX(SessionBase): return self.mempool.compact_fee_histogram() async def claimtrie_search(self, **kwargs): - if kwargs: - try: - return await self.run_and_cache_query('search', kwargs) - except TooManyClaimSearchParametersError as err: - await asyncio.sleep(2) - self.logger.warning("Got an invalid query from %s, for %s with more than %d elements.", - self.peer_address()[0], err.key, err.limit) - return RPCError(1, str(err)) + raise NotImplementedError() + # if kwargs: + # return await self.run_and_cache_query('search', kwargs) async def claimtrie_resolve(self, *urls): rows, extra = [], [] @@ -1068,54 +1063,6 @@ class LBRYElectrumX(SessionBase): # print("claimtrie resolve %i rows %i extrat" % (len(rows), len(extra))) return Outputs.to_base64(rows, extra, 0, None, None) - def format_claim_from_daemon(self, claim, name=None): - """Changes the returned claim data to the format expected by lbry and adds missing fields.""" - - if not claim: - return {} - - # this ISO-8859 nonsense stems from a nasty form of encoding extended characters in lbrycrd - # it will be fixed after the lbrycrd upstream merge to v17 is done - # it originated as a fear of terminals not supporting unicode. alas, they all do - - if 'name' in claim: - name = claim['name'].encode('ISO-8859-1').decode() - info = self.db.sql.get_claims(claim_id=claim['claimId']) - if not info: - # raise RPCError("Lbrycrd has {} but not lbryumx, please submit a bug report.".format(claim_id)) - return {} - address = info.address.decode() - # fixme: temporary - #supports = self.format_supports_from_daemon(claim.get('supports', [])) - supports = [] - - amount = get_from_possible_keys(claim, 'amount', 'nAmount') - height = get_from_possible_keys(claim, 'height', 'nHeight') - effective_amount = get_from_possible_keys(claim, 'effective amount', 'nEffectiveAmount') - valid_at_height = get_from_possible_keys(claim, 'valid at height', 'nValidAtHeight') - - result = { - "name": name, - "claim_id": claim['claimId'], - "txid": claim['txid'], - "nout": claim['n'], - "amount": amount, - "depth": self.db.db_height - height + 1, - "height": height, - "value": hexlify(claim['value'].encode('ISO-8859-1')).decode(), - "address": address, # from index - "supports": supports, - "effective_amount": effective_amount, - "valid_at_height": valid_at_height - } - if 'claim_sequence' in claim: - # TODO: ensure that lbrycrd #209 fills in this value - result['claim_sequence'] = claim['claim_sequence'] - else: - result['claim_sequence'] = -1 - if 'normalized_name' in claim: - result['normalized_name'] = claim['normalized_name'].encode('ISO-8859-1').decode() - return result def assert_tx_hash(self, value): '''Raise an RPCError if the value is not a valid transaction