diff --git a/lbrynet/wallet/server/block_processor.py b/lbrynet/wallet/server/block_processor.py index 14025830a..d73c0e846 100644 --- a/lbrynet/wallet/server/block_processor.py +++ b/lbrynet/wallet/server/block_processor.py @@ -1,9 +1,63 @@ +import time + from torba.server.block_processor import BlockProcessor from lbrynet.schema.claim import Claim from lbrynet.wallet.server.db import SQLDB +class Timer: + + def __init__(self, name): + self.name = name + self.total = 0 + self.count = 0 + self.sub_timers = {} + self._last_start = None + + def add_timer(self, name): + if name not in self.sub_timers: + self.sub_timers[name] = Timer(name) + return self.sub_timers[name] + + def run(self, func, *args, forward_timer=False, timer_name=None, **kwargs): + t = self.add_timer(timer_name or func.__name__) + t.start() + try: + if forward_timer: + return func(*args, **kwargs, timer=t) + else: + return func(*args, **kwargs) + finally: + t.stop() + + def start(self): + self._last_start = time.time() + return self + + def stop(self): + self.total += (time.time() - self._last_start) + self.count += 1 + self._last_start = None + return self + + def show(self, depth=0, height=None): + if depth == 0: + print('='*100) + if height is not None: + print(f'STATISTICS AT HEIGHT {height}') + print('='*100) + else: + print( + f"{' '*depth} {self.total/60:.2f}mins {self.name}" + # f"{self.total/self.count:.5f}sec/call, " + ) + for sub_timer in self.sub_timers.values(): + sub_timer.show(depth+1) + if depth == 0: + print('='*100) + + class LBRYBlockProcessor(BlockProcessor): def __init__(self, *args, **kwargs): @@ -13,11 +67,12 @@ class LBRYBlockProcessor(BlockProcessor): self.should_validate_signatures = self.env.boolean('VALIDATE_CLAIM_SIGNATURES', False) self.logger.info(f"LbryumX Block Processor - Validating signatures: {self.should_validate_signatures}") self.sql: SQLDB = self.db.sql + self.timer = Timer('BlockProcessor') def advance_blocks(self, blocks): self.sql.begin() try: - super().advance_blocks(blocks) + self.timer.run(super().advance_blocks, blocks) except: self.logger.exception(f'Error while advancing transaction in new block.') raise @@ -25,8 +80,11 @@ class LBRYBlockProcessor(BlockProcessor): self.sql.commit() def advance_txs(self, height, txs): - undo = super().advance_txs(height, txs) - self.sql.advance_txs(height, txs) + timer = self.timer.sub_timers['advance_blocks'] + undo = timer.run(super().advance_txs, height, txs, timer_name='super().advance_txs') + timer.run(self.sql.advance_txs, height, txs, forward_timer=True) + if height % 20000 == 0: + self.timer.show(height=height) return undo def _checksig(self, value, address): diff --git a/lbrynet/wallet/server/db.py b/lbrynet/wallet/server/db.py index 40a305bcf..5f1c6daa5 100644 --- a/lbrynet/wallet/server/db.py +++ b/lbrynet/wallet/server/db.py @@ -181,8 +181,8 @@ class SQLDB: try: assert txo.claim_name assert txo.normalized_name - except (AssertionError, UnicodeDecodeError): - self.logger.exception(f"Could not decode claim name for {tx.id}:{txo.position}.") + except: + #self.logger.exception(f"Could not decode claim name for {tx.id}:{txo.position}.") continue txo_hash = sqlite3.Binary(txo.ref.hash) @@ -201,8 +201,8 @@ class SQLDB: try: claim = txo.claim - except DecodeError: - self.logger.exception(f"Could not parse claim protobuf for {tx.id}:{txo.position}.") + except: + #self.logger.exception(f"Could not parse claim protobuf for {tx.id}:{txo.position}.") continue claim_record['is_channel'] = claim.is_channel @@ -348,18 +348,19 @@ class SQLDB: (overtake['normalized'],) ) self.execute( - f"UPDATE claimtrie SET claim_hash = ?, last_take_over_height = {height}", - (sqlite3.Binary(overtake['claim_hash']),) + f"UPDATE claimtrie SET claim_hash = ?, last_take_over_height = {height} WHERE normalized = ?", + (sqlite3.Binary(overtake['claim_hash']), overtake['normalized']) ) - def update_claimtrie(self, height): - self._make_claims_without_competition_become_controlling(height) - self._update_trending_amount(height) - self._update_effective_amount(height) - self._set_activation_height(height) - self._perform_overtake(height) - self._update_effective_amount(height) - self._perform_overtake(height) + def update_claimtrie(self, height, timer): + r = timer.run + r(self._make_claims_without_competition_become_controlling, height) + r(self._update_trending_amount, height) + r(self._update_effective_amount, height) + r(self._set_activation_height, height) + r(self._perform_overtake, height) + r(self._update_effective_amount, height) + r(self._perform_overtake, height) def get_claims(self, cols, **constraints): if 'is_controlling' in constraints: @@ -487,14 +488,18 @@ class SQLDB: result.append(channel) return result - def advance_txs(self, height, all_txs): + def advance_txs(self, height, all_txs, timer): sql, txs = self, set() abandon_claim_hashes, stale_claim_metadata_txo_hashes = set(), set() insert_claims, update_claims = set(), set() delete_txo_hashes, insert_supports = set(), set() for position, (etx, txid) in enumerate(all_txs): - tx = Transaction(etx.serialize(), height=height, position=position) - claim_abandon_map, delete_txo_hashes = sql.split_inputs_into_claims_and_other(tx.inputs) + tx = timer.run( + Transaction, etx.serialize(), height=height, position=position + ) + claim_abandon_map, delete_txo_hashes = timer.run( + sql.split_inputs_into_claims_and_other, tx.inputs + ) stale_claim_metadata_txo_hashes.update(claim_abandon_map) for output in tx.outputs: if output.is_support: @@ -512,13 +517,14 @@ class SQLDB: del claim_abandon_map[txo_hash] break abandon_claim_hashes.update(claim_abandon_map.values()) - sql.abandon_claims(abandon_claim_hashes) - sql.clear_claim_metadata(stale_claim_metadata_txo_hashes) - sql.delete_other_txos(delete_txo_hashes) - sql.insert_claims(insert_claims) - sql.update_claims(update_claims) - sql.insert_supports(insert_supports) - sql.update_claimtrie(height) + r = timer.run + r(sql.abandon_claims, abandon_claim_hashes) + r(sql.clear_claim_metadata, stale_claim_metadata_txo_hashes) + r(sql.delete_other_txos, delete_txo_hashes) + r(sql.insert_claims, insert_claims) + r(sql.update_claims, update_claims) + r(sql.insert_supports, insert_supports) + r(sql.update_claimtrie, height, forward_timer=True) class LBRYDB(DB):