instrumented most wallet server claim related functions with timer
This commit is contained in:
parent
82c739daa6
commit
0e1705d19e
2 changed files with 91 additions and 27 deletions
|
@ -1,9 +1,63 @@
|
||||||
|
import time
|
||||||
|
|
||||||
from torba.server.block_processor import BlockProcessor
|
from torba.server.block_processor import BlockProcessor
|
||||||
|
|
||||||
from lbrynet.schema.claim import Claim
|
from lbrynet.schema.claim import Claim
|
||||||
from lbrynet.wallet.server.db import SQLDB
|
from lbrynet.wallet.server.db import SQLDB
|
||||||
|
|
||||||
|
|
||||||
|
class Timer:
|
||||||
|
|
||||||
|
def __init__(self, name):
|
||||||
|
self.name = name
|
||||||
|
self.total = 0
|
||||||
|
self.count = 0
|
||||||
|
self.sub_timers = {}
|
||||||
|
self._last_start = None
|
||||||
|
|
||||||
|
def add_timer(self, name):
|
||||||
|
if name not in self.sub_timers:
|
||||||
|
self.sub_timers[name] = Timer(name)
|
||||||
|
return self.sub_timers[name]
|
||||||
|
|
||||||
|
def run(self, func, *args, forward_timer=False, timer_name=None, **kwargs):
|
||||||
|
t = self.add_timer(timer_name or func.__name__)
|
||||||
|
t.start()
|
||||||
|
try:
|
||||||
|
if forward_timer:
|
||||||
|
return func(*args, **kwargs, timer=t)
|
||||||
|
else:
|
||||||
|
return func(*args, **kwargs)
|
||||||
|
finally:
|
||||||
|
t.stop()
|
||||||
|
|
||||||
|
def start(self):
|
||||||
|
self._last_start = time.time()
|
||||||
|
return self
|
||||||
|
|
||||||
|
def stop(self):
|
||||||
|
self.total += (time.time() - self._last_start)
|
||||||
|
self.count += 1
|
||||||
|
self._last_start = None
|
||||||
|
return self
|
||||||
|
|
||||||
|
def show(self, depth=0, height=None):
|
||||||
|
if depth == 0:
|
||||||
|
print('='*100)
|
||||||
|
if height is not None:
|
||||||
|
print(f'STATISTICS AT HEIGHT {height}')
|
||||||
|
print('='*100)
|
||||||
|
else:
|
||||||
|
print(
|
||||||
|
f"{' '*depth} {self.total/60:.2f}mins {self.name}"
|
||||||
|
# f"{self.total/self.count:.5f}sec/call, "
|
||||||
|
)
|
||||||
|
for sub_timer in self.sub_timers.values():
|
||||||
|
sub_timer.show(depth+1)
|
||||||
|
if depth == 0:
|
||||||
|
print('='*100)
|
||||||
|
|
||||||
|
|
||||||
class LBRYBlockProcessor(BlockProcessor):
|
class LBRYBlockProcessor(BlockProcessor):
|
||||||
|
|
||||||
def __init__(self, *args, **kwargs):
|
def __init__(self, *args, **kwargs):
|
||||||
|
@ -13,11 +67,12 @@ class LBRYBlockProcessor(BlockProcessor):
|
||||||
self.should_validate_signatures = self.env.boolean('VALIDATE_CLAIM_SIGNATURES', False)
|
self.should_validate_signatures = self.env.boolean('VALIDATE_CLAIM_SIGNATURES', False)
|
||||||
self.logger.info(f"LbryumX Block Processor - Validating signatures: {self.should_validate_signatures}")
|
self.logger.info(f"LbryumX Block Processor - Validating signatures: {self.should_validate_signatures}")
|
||||||
self.sql: SQLDB = self.db.sql
|
self.sql: SQLDB = self.db.sql
|
||||||
|
self.timer = Timer('BlockProcessor')
|
||||||
|
|
||||||
def advance_blocks(self, blocks):
|
def advance_blocks(self, blocks):
|
||||||
self.sql.begin()
|
self.sql.begin()
|
||||||
try:
|
try:
|
||||||
super().advance_blocks(blocks)
|
self.timer.run(super().advance_blocks, blocks)
|
||||||
except:
|
except:
|
||||||
self.logger.exception(f'Error while advancing transaction in new block.')
|
self.logger.exception(f'Error while advancing transaction in new block.')
|
||||||
raise
|
raise
|
||||||
|
@ -25,8 +80,11 @@ class LBRYBlockProcessor(BlockProcessor):
|
||||||
self.sql.commit()
|
self.sql.commit()
|
||||||
|
|
||||||
def advance_txs(self, height, txs):
|
def advance_txs(self, height, txs):
|
||||||
undo = super().advance_txs(height, txs)
|
timer = self.timer.sub_timers['advance_blocks']
|
||||||
self.sql.advance_txs(height, txs)
|
undo = timer.run(super().advance_txs, height, txs, timer_name='super().advance_txs')
|
||||||
|
timer.run(self.sql.advance_txs, height, txs, forward_timer=True)
|
||||||
|
if height % 20000 == 0:
|
||||||
|
self.timer.show(height=height)
|
||||||
return undo
|
return undo
|
||||||
|
|
||||||
def _checksig(self, value, address):
|
def _checksig(self, value, address):
|
||||||
|
|
|
@ -181,8 +181,8 @@ class SQLDB:
|
||||||
try:
|
try:
|
||||||
assert txo.claim_name
|
assert txo.claim_name
|
||||||
assert txo.normalized_name
|
assert txo.normalized_name
|
||||||
except (AssertionError, UnicodeDecodeError):
|
except:
|
||||||
self.logger.exception(f"Could not decode claim name for {tx.id}:{txo.position}.")
|
#self.logger.exception(f"Could not decode claim name for {tx.id}:{txo.position}.")
|
||||||
continue
|
continue
|
||||||
|
|
||||||
txo_hash = sqlite3.Binary(txo.ref.hash)
|
txo_hash = sqlite3.Binary(txo.ref.hash)
|
||||||
|
@ -201,8 +201,8 @@ class SQLDB:
|
||||||
|
|
||||||
try:
|
try:
|
||||||
claim = txo.claim
|
claim = txo.claim
|
||||||
except DecodeError:
|
except:
|
||||||
self.logger.exception(f"Could not parse claim protobuf for {tx.id}:{txo.position}.")
|
#self.logger.exception(f"Could not parse claim protobuf for {tx.id}:{txo.position}.")
|
||||||
continue
|
continue
|
||||||
|
|
||||||
claim_record['is_channel'] = claim.is_channel
|
claim_record['is_channel'] = claim.is_channel
|
||||||
|
@ -348,18 +348,19 @@ class SQLDB:
|
||||||
(overtake['normalized'],)
|
(overtake['normalized'],)
|
||||||
)
|
)
|
||||||
self.execute(
|
self.execute(
|
||||||
f"UPDATE claimtrie SET claim_hash = ?, last_take_over_height = {height}",
|
f"UPDATE claimtrie SET claim_hash = ?, last_take_over_height = {height} WHERE normalized = ?",
|
||||||
(sqlite3.Binary(overtake['claim_hash']),)
|
(sqlite3.Binary(overtake['claim_hash']), overtake['normalized'])
|
||||||
)
|
)
|
||||||
|
|
||||||
def update_claimtrie(self, height):
|
def update_claimtrie(self, height, timer):
|
||||||
self._make_claims_without_competition_become_controlling(height)
|
r = timer.run
|
||||||
self._update_trending_amount(height)
|
r(self._make_claims_without_competition_become_controlling, height)
|
||||||
self._update_effective_amount(height)
|
r(self._update_trending_amount, height)
|
||||||
self._set_activation_height(height)
|
r(self._update_effective_amount, height)
|
||||||
self._perform_overtake(height)
|
r(self._set_activation_height, height)
|
||||||
self._update_effective_amount(height)
|
r(self._perform_overtake, height)
|
||||||
self._perform_overtake(height)
|
r(self._update_effective_amount, height)
|
||||||
|
r(self._perform_overtake, height)
|
||||||
|
|
||||||
def get_claims(self, cols, **constraints):
|
def get_claims(self, cols, **constraints):
|
||||||
if 'is_controlling' in constraints:
|
if 'is_controlling' in constraints:
|
||||||
|
@ -487,14 +488,18 @@ class SQLDB:
|
||||||
result.append(channel)
|
result.append(channel)
|
||||||
return result
|
return result
|
||||||
|
|
||||||
def advance_txs(self, height, all_txs):
|
def advance_txs(self, height, all_txs, timer):
|
||||||
sql, txs = self, set()
|
sql, txs = self, set()
|
||||||
abandon_claim_hashes, stale_claim_metadata_txo_hashes = set(), set()
|
abandon_claim_hashes, stale_claim_metadata_txo_hashes = set(), set()
|
||||||
insert_claims, update_claims = set(), set()
|
insert_claims, update_claims = set(), set()
|
||||||
delete_txo_hashes, insert_supports = set(), set()
|
delete_txo_hashes, insert_supports = set(), set()
|
||||||
for position, (etx, txid) in enumerate(all_txs):
|
for position, (etx, txid) in enumerate(all_txs):
|
||||||
tx = Transaction(etx.serialize(), height=height, position=position)
|
tx = timer.run(
|
||||||
claim_abandon_map, delete_txo_hashes = sql.split_inputs_into_claims_and_other(tx.inputs)
|
Transaction, etx.serialize(), height=height, position=position
|
||||||
|
)
|
||||||
|
claim_abandon_map, delete_txo_hashes = timer.run(
|
||||||
|
sql.split_inputs_into_claims_and_other, tx.inputs
|
||||||
|
)
|
||||||
stale_claim_metadata_txo_hashes.update(claim_abandon_map)
|
stale_claim_metadata_txo_hashes.update(claim_abandon_map)
|
||||||
for output in tx.outputs:
|
for output in tx.outputs:
|
||||||
if output.is_support:
|
if output.is_support:
|
||||||
|
@ -512,13 +517,14 @@ class SQLDB:
|
||||||
del claim_abandon_map[txo_hash]
|
del claim_abandon_map[txo_hash]
|
||||||
break
|
break
|
||||||
abandon_claim_hashes.update(claim_abandon_map.values())
|
abandon_claim_hashes.update(claim_abandon_map.values())
|
||||||
sql.abandon_claims(abandon_claim_hashes)
|
r = timer.run
|
||||||
sql.clear_claim_metadata(stale_claim_metadata_txo_hashes)
|
r(sql.abandon_claims, abandon_claim_hashes)
|
||||||
sql.delete_other_txos(delete_txo_hashes)
|
r(sql.clear_claim_metadata, stale_claim_metadata_txo_hashes)
|
||||||
sql.insert_claims(insert_claims)
|
r(sql.delete_other_txos, delete_txo_hashes)
|
||||||
sql.update_claims(update_claims)
|
r(sql.insert_claims, insert_claims)
|
||||||
sql.insert_supports(insert_supports)
|
r(sql.update_claims, update_claims)
|
||||||
sql.update_claimtrie(height)
|
r(sql.insert_supports, insert_supports)
|
||||||
|
r(sql.update_claimtrie, height, forward_timer=True)
|
||||||
|
|
||||||
|
|
||||||
class LBRYDB(DB):
|
class LBRYDB(DB):
|
||||||
|
|
Loading…
Reference in a new issue