LBRYBlockProcessor -> BlockProcessor

- temporarily disable claim_search
This commit is contained in:
Jack Robison 2021-05-05 15:39:52 -04:00 committed by Victor Shyba
parent 4e58094e4b
commit 2e92f3acad
5 changed files with 22 additions and 139 deletions

View file

@ -11,7 +11,6 @@ import lbry
from lbry.schema.claim import Claim
from lbry.wallet.transaction import OutputScript, Output
from lbry.wallet.server.tx import Tx
from lbry.wallet.server.db.writer import SQLDB
from lbry.wallet.server.daemon import DaemonError
from lbry.wallet.server.hash import hash_to_hex_str, HASHX_LEN
from lbry.wallet.server.util import chunks, class_logger
@ -238,16 +237,19 @@ class BlockProcessor:
if hprevs == chain:
start = time.perf_counter()
try:
await self.run_in_thread_with_lock(self.advance_blocks, blocks)
for block in blocks:
await self.run_in_thread_with_lock(self.advance_block, block)
print("advanced\n")
except:
self.logger.exception("advance blocks failed")
raise
if self.sql:
await self.db.search_index.claim_consumer(self.sql.claim_producer())
# if self.sql:
# await self.db.search_index.claim_consumer(self.db.claim_producer())
for cache in self.search_cache.values():
cache.clear()
self.history_cache.clear() # TODO: is this needed?
self.notifications.notified_mempool_txs.clear()
processed_time = time.perf_counter() - start
self.block_count_metric.set(self.height)
self.block_update_time_metric.observe(processed_time)
@ -256,9 +258,9 @@ class BlockProcessor:
s = '' if len(blocks) == 1 else 's'
self.logger.info('processed {:,d} block{} in {:.1f}s'.format(len(blocks), s, processed_time))
if self._caught_up_event.is_set():
if self.sql:
await self.db.search_index.apply_filters(self.sql.blocked_streams, self.sql.blocked_channels,
self.sql.filtered_streams, self.sql.filtered_channels)
# if self.sql:
# await self.db.search_index.apply_filters(self.sql.blocked_streams, self.sql.blocked_channels,
# self.sql.filtered_streams, self.sql.filtered_channels)
await self.notifications.on_block(self.touched, self.height)
self.touched = set()
elif hprevs[0] != chain[0]:
@ -1122,36 +1124,3 @@ class Timer:
sub_timer.show(depth+1)
if depth == 0:
print('='*100)
class LBRYBlockProcessor(BlockProcessor):
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
if self.env.coin.NET == "regtest":
self.prefetcher.polling_delay = 0.5
self.should_validate_signatures = self.env.boolean('VALIDATE_CLAIM_SIGNATURES', False)
self.logger.info(f"LbryumX Block Processor - Validating signatures: {self.should_validate_signatures}")
self.sql: SQLDB = self.db.sql
self.timer = Timer('BlockProcessor')
def advance_blocks(self, blocks):
if self.sql:
self.sql.begin()
try:
self.timer.run(super().advance_blocks, blocks)
except:
self.logger.exception(f'Error while advancing transaction in new block.')
raise
finally:
if self.sql:
self.sql.commit()
def advance_txs(self, height, txs, header, block_hash):
timer = self.timer.sub_timers['advance_blocks']
undo = timer.run(super().advance_txs, height, txs, header, block_hash, timer_name='super().advance_txs')
if self.sql:
timer.run(self.sql.advance_txs, height, txs, header, self.daemon.cached_height(), forward_timer=True)
if (height % 10000 == 0 or not self.db.first_sync) and self.logger.isEnabledFor(10):
self.timer.show(height=height)
return undo

View file

@ -241,7 +241,6 @@ class Coin:
class LBC(Coin):
DAEMON = LBCDaemon
SESSIONCLS = LBRYElectrumX
BLOCK_PROCESSOR = LBRYBlockProcessor
SESSION_MANAGER = LBRYSessionManager
DESERIALIZER = DeserializerSegWit
DB = LevelDB

View file

@ -18,7 +18,6 @@ from lbry.wallet.server.db.canonical import register_canonical_functions
from lbry.wallet.server.db.trending import TRENDING_ALGORITHMS
from .common import CLAIM_TYPES, STREAM_TYPES, COMMON_TAGS, INDEXED_LANGUAGES
from lbry.wallet.server.db.elasticsearch import SearchIndex
ATTRIBUTE_ARRAY_MAX_LENGTH = 100
sqlite3.enable_callback_tracebacks(True)
@ -954,41 +953,3 @@ class SQLDB:
r(self.update_claimtrie, height, recalculate_claim_hashes, deleted_claim_names, forward_timer=True)
for algorithm in self.trending:
r(algorithm.run, self.db.cursor(), height, daemon_height, recalculate_claim_hashes)
class LBRYLevelDB(LevelDB):
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
path = os.path.join(self.env.db_dir, 'claims.db')
trending = []
for algorithm_name in self.env.trending_algorithms:
if algorithm_name in TRENDING_ALGORITHMS:
trending.append(TRENDING_ALGORITHMS[algorithm_name])
if self.env.es_mode == 'reader':
self.logger.info('Index mode: reader')
self.sql = None
else:
self.logger.info('Index mode: writer. Using SQLite db to sync ES')
self.sql = SQLDB(
self, path,
self.env.default('BLOCKING_CHANNEL_IDS', '').split(' '),
self.env.default('FILTERING_CHANNEL_IDS', '').split(' '),
trending
)
# Search index
self.search_index = SearchIndex(
self.env.es_index_prefix, self.env.database_query_timeout, self.env.elastic_host, self.env.elastic_port
)
def close(self):
super().close()
if self.sql:
self.sql.close()
async def _open_dbs(self, *args, **kwargs):
await self.search_index.start()
await super()._open_dbs(*args, **kwargs)
if self.sql:
self.sql.open()

View file

@ -37,6 +37,7 @@ from lbry.wallet.server.db import DB_PREFIXES
from lbry.wallet.server.db.prefixes import Prefixes
from lbry.wallet.server.db.claimtrie import StagedClaimtrieItem, get_update_effective_amount_ops, length_encoded_name
from lbry.wallet.server.db.claimtrie import get_expiration_height
from lbry.wallet.server.db.elasticsearch import SearchIndex
class UTXO(typing.NamedTuple):
@ -178,6 +179,9 @@ class LevelDB:
self.total_transactions = None
self.transaction_num_mapping = {}
# Search index
self.search_index = SearchIndex(self.env.es_index_prefix, self.env.database_query_timeout)
def claim_hash_and_name_from_txo(self, tx_num: int, tx_idx: int):
claim_hash_and_name = self.db.get(
DB_PREFIXES.txo_to_claim.value + TXO_STRUCT_pack(tx_num, tx_idx)
@ -558,6 +562,9 @@ class LevelDB:
await self._read_txids()
await self._read_headers()
# start search index
await self.search_index.start()
def close(self):
self.db.close()
self.executor.shutdown(wait=True)

View file

@ -24,7 +24,7 @@ import lbry
from lbry.error import TooManyClaimSearchParametersError
from lbry.build_info import BUILD, COMMIT_HASH, DOCKER_TAG
from lbry.schema.result import Outputs
from lbry.wallet.server.block_processor import LBRYBlockProcessor
from lbry.wallet.server.block_processor import BlockProcessor
from lbry.wallet.server.leveldb import LevelDB
from lbry.wallet.server.websocket import AdminWebSocket
from lbry.wallet.server.metrics import ServerLoadData, APICallMetrics
@ -176,7 +176,7 @@ class SessionManager:
namespace=NAMESPACE, buckets=HISTOGRAM_BUCKETS
)
def __init__(self, env: 'Env', db: LevelDB, bp: LBRYBlockProcessor, daemon: 'Daemon', mempool: 'MemPool',
def __init__(self, env: 'Env', db: LevelDB, bp: BlockProcessor, daemon: 'Daemon', mempool: 'MemPool',
shutdown_event: asyncio.Event):
env.max_send = max(350000, env.max_send)
self.env = env
@ -914,7 +914,7 @@ class LBRYElectrumX(SessionBase):
self.protocol_tuple = self.PROTOCOL_MIN
self.protocol_string = None
self.daemon = self.session_mgr.daemon
self.bp: LBRYBlockProcessor = self.session_mgr.bp
self.bp: BlockProcessor = self.session_mgr.bp
self.db: LevelDB = self.bp.db
@classmethod
@ -1019,14 +1019,9 @@ class LBRYElectrumX(SessionBase):
return self.mempool.compact_fee_histogram()
async def claimtrie_search(self, **kwargs):
if kwargs:
try:
return await self.run_and_cache_query('search', kwargs)
except TooManyClaimSearchParametersError as err:
await asyncio.sleep(2)
self.logger.warning("Got an invalid query from %s, for %s with more than %d elements.",
self.peer_address()[0], err.key, err.limit)
return RPCError(1, str(err))
raise NotImplementedError()
# if kwargs:
# return await self.run_and_cache_query('search', kwargs)
async def claimtrie_resolve(self, *urls):
rows, extra = [], []
@ -1068,54 +1063,6 @@ class LBRYElectrumX(SessionBase):
# print("claimtrie resolve %i rows %i extrat" % (len(rows), len(extra)))
return Outputs.to_base64(rows, extra, 0, None, None)
def format_claim_from_daemon(self, claim, name=None):
"""Changes the returned claim data to the format expected by lbry and adds missing fields."""
if not claim:
return {}
# this ISO-8859 nonsense stems from a nasty form of encoding extended characters in lbrycrd
# it will be fixed after the lbrycrd upstream merge to v17 is done
# it originated as a fear of terminals not supporting unicode. alas, they all do
if 'name' in claim:
name = claim['name'].encode('ISO-8859-1').decode()
info = self.db.sql.get_claims(claim_id=claim['claimId'])
if not info:
# raise RPCError("Lbrycrd has {} but not lbryumx, please submit a bug report.".format(claim_id))
return {}
address = info.address.decode()
# fixme: temporary
#supports = self.format_supports_from_daemon(claim.get('supports', []))
supports = []
amount = get_from_possible_keys(claim, 'amount', 'nAmount')
height = get_from_possible_keys(claim, 'height', 'nHeight')
effective_amount = get_from_possible_keys(claim, 'effective amount', 'nEffectiveAmount')
valid_at_height = get_from_possible_keys(claim, 'valid at height', 'nValidAtHeight')
result = {
"name": name,
"claim_id": claim['claimId'],
"txid": claim['txid'],
"nout": claim['n'],
"amount": amount,
"depth": self.db.db_height - height + 1,
"height": height,
"value": hexlify(claim['value'].encode('ISO-8859-1')).decode(),
"address": address, # from index
"supports": supports,
"effective_amount": effective_amount,
"valid_at_height": valid_at_height
}
if 'claim_sequence' in claim:
# TODO: ensure that lbrycrd #209 fills in this value
result['claim_sequence'] = claim['claim_sequence']
else:
result['claim_sequence'] = -1
if 'normalized_name' in claim:
result['normalized_name'] = claim['normalized_name'].encode('ISO-8859-1').decode()
return result
def assert_tx_hash(self, value):
'''Raise an RPCError if the value is not a valid transaction