diff --git a/lbrynet/wallet/server/block_processor.py b/lbrynet/wallet/server/block_processor.py index d73c0e846..f2b341621 100644 --- a/lbrynet/wallet/server/block_processor.py +++ b/lbrynet/wallet/server/block_processor.py @@ -49,7 +49,7 @@ class Timer: print('='*100) else: print( - f"{' '*depth} {self.total/60:.2f}mins {self.name}" + f"{' '*depth} {self.total/60:4.2f}mins {self.name}" # f"{self.total/self.count:.5f}sec/call, " ) for sub_timer in self.sub_timers.values(): @@ -83,7 +83,7 @@ class LBRYBlockProcessor(BlockProcessor): timer = self.timer.sub_timers['advance_blocks'] undo = timer.run(super().advance_txs, height, txs, timer_name='super().advance_txs') timer.run(self.sql.advance_txs, height, txs, forward_timer=True) - if height % 20000 == 0: + if height % 10000 == 0: self.timer.show(height=height) return undo diff --git a/lbrynet/wallet/server/db.py b/lbrynet/wallet/server/db.py index 02e3dcd72..fc77453ac 100644 --- a/lbrynet/wallet/server/db.py +++ b/lbrynet/wallet/server/db.py @@ -75,6 +75,7 @@ class SQLDB: channel_hash bytes, activation_height integer, effective_amount integer not null default 0, + support_amount integer not null default 0, trending_amount integer not null default 0 ); create index if not exists claim_normalized_idx on claim (normalized); @@ -98,11 +99,11 @@ class SQLDB: CREATE_TAG_TABLE = """ create table if not exists tag ( tag text not null, - txo_hash bytes not null, + claim_hash bytes not null, height integer not null ); create index if not exists tag_tag_idx on tag (tag); - create index if not exists tag_txo_hash_idx on tag (txo_hash); + create index if not exists tag_claim_hash_idx on tag (claim_hash); create index if not exists tag_height_idx on tag (height); """ @@ -123,7 +124,8 @@ class SQLDB: CREATE_TAG_TABLE ) - def __init__(self, path): + def __init__(self, main, path): + self.main = main self._db_path = path self.db = None self.logger = class_logger(__name__, self.__class__.__name__) @@ -172,8 +174,8 @@ class SQLDB: def commit(self): self.execute('commit;') - def _upsertable_claims(self, txos: Set[Output]): - claims, tags = [], [] + def _upsertable_claims(self, txos: Set[Output], clear_first=False): + claim_hashes, claims, tags = [], [], [] for txo in txos: tx = txo.tx_ref.tx @@ -184,13 +186,14 @@ class SQLDB: #self.logger.exception(f"Could not decode claim name for {tx.id}:{txo.position}.") continue - txo_hash = sqlite3.Binary(txo.ref.hash) + claim_hash = sqlite3.Binary(txo.claim_hash) + claim_hashes.append(claim_hash) claim_record = { - 'claim_hash': sqlite3.Binary(txo.claim_hash), + 'claim_hash': claim_hash, 'normalized': txo.normalized_name, 'claim_name': txo.claim_name, 'is_channel': False, - 'txo_hash': txo_hash, + 'txo_hash': sqlite3.Binary(txo.ref.hash), 'tx_position': tx.position, 'height': tx.height, 'amount': txo.amount, @@ -208,11 +211,14 @@ class SQLDB: if claim.signing_channel_hash: claim_record['channel_hash'] = sqlite3.Binary(claim.signing_channel_hash) for tag in claim.message.tags: - tags.append((tag, txo_hash, tx.height)) + tags.append((tag, claim_hash, tx.height)) + + if clear_first: + self._clear_claim_metadata(claim_hashes) if tags: self.db.executemany( - "INSERT INTO tag (tag, txo_hash, height) VALUES (?, ?, ?)", tags + "INSERT INTO tag (tag, claim_hash, height) VALUES (?, ?, ?)", tags ) return claims @@ -231,7 +237,7 @@ class SQLDB: ) def update_claims(self, txos: Set[Output]): - claims = self._upsertable_claims(txos) + claims = self._upsertable_claims(txos, clear_first=True) if claims: self.db.executemany( "UPDATE claim SET " @@ -241,27 +247,34 @@ class SQLDB: claims ) - def clear_claim_metadata(self, txo_hashes: Set[bytes]): - """ Deletes metadata associated with claim in case of an update or an abandon. """ - if txo_hashes: - binary_txo_hashes = [sqlite3.Binary(txo_hash) for txo_hash in txo_hashes] - for table in ('tag',): # 'language', 'location', etc - self.execute(*self._delete_sql(table, {'txo_hash__in': binary_txo_hashes})) - - def abandon_claims(self, claim_hashes: Set[bytes]): + def delete_claims(self, claim_hashes: Set[bytes]): """ Deletes claim supports and from claimtrie in case of an abandon. """ if claim_hashes: binary_claim_hashes = [sqlite3.Binary(claim_hash) for claim_hash in claim_hashes] for table in ('claim', 'support', 'claimtrie'): self.execute(*self._delete_sql(table, {'claim_hash__in': binary_claim_hashes})) + self._clear_claim_metadata(binary_claim_hashes) - def split_inputs_into_claims_and_other(self, txis): - all = set(txi.txo_ref.hash for txi in txis) + def _clear_claim_metadata(self, binary_claim_hashes: List[sqlite3.Binary]): + if binary_claim_hashes: + for table in ('tag',): # 'language', 'location', etc + self.execute(*self._delete_sql(table, {'claim_hash__in': binary_claim_hashes})) + + def split_inputs_into_claims_supports_and_other(self, txis): + txo_hashes = set(txi.txo_ref.hash for txi in txis) claims = dict(self.execute(*query( "SELECT txo_hash, claim_hash FROM claim", - txo_hash__in=[sqlite3.Binary(txo_hash) for txo_hash in all] + txo_hash__in=[sqlite3.Binary(txo_hash) for txo_hash in txo_hashes] ))) - return claims, all-set(claims) + txo_hashes -= set(claims) + supports = {} + if txo_hashes: + supports = dict(self.execute(*query( + "SELECT txo_hash, claim_hash FROM support", + txo_hash__in=[sqlite3.Binary(txo_hash) for txo_hash in txo_hashes] + ))) + txo_hashes -= set(supports) + return claims, supports, txo_hashes def insert_supports(self, txos: Set[Output]): supports = [] @@ -279,7 +292,7 @@ class SQLDB: "VALUES (?, ?, ?, ?, ?)", supports ) - def delete_other_txos(self, txo_hashes: Set[bytes]): + def delete_supports(self, txo_hashes: Set[bytes]): if txo_hashes: self.execute(*self._delete_sql( 'support', {'txo_hash__in': [sqlite3.Binary(txo_hash) for txo_hash in txo_hashes]} @@ -299,26 +312,6 @@ class SQLDB: SELECT claim_hash FROM claimtrie ) """) - - def _update_trending_amount(self, height): - self.execute(f""" - UPDATE claim SET - trending_amount = COALESCE( - (SELECT SUM(amount) FROM support WHERE support.claim_hash=claim.claim_hash - AND support.height > {height-self.TRENDING_BLOCKS}), 0 - ) - """) - - def _update_effective_amount(self, height): - self.execute(f""" - UPDATE claim SET - effective_amount = claim.amount + COALESCE( - (SELECT SUM(amount) FROM support WHERE support.claim_hash=claim.claim_hash), 0 - ) - WHERE activation_height <= {height} - """) - - def _set_activation_height(self, height): self.execute(f""" UPDATE claim SET activation_height = {height} + min(4032, cast( @@ -330,6 +323,38 @@ class SQLDB: WHERE activation_height IS NULL """) + def _update_trending_amount(self, height): + self.execute(f""" + UPDATE claim SET + trending_amount = COALESCE( + (SELECT SUM(amount) FROM support WHERE support.claim_hash=claim.claim_hash + AND support.height > {height-self.TRENDING_BLOCKS}), 0 + ) + """) + + def _update_support_amount(self, claim_hashes): + if claim_hashes: + self.execute(f""" + UPDATE claim SET + support_amount = COALESCE( + (SELECT SUM(amount) FROM support WHERE support.claim_hash=claim.claim_hash), 0 + ) + WHERE claim_hash IN ({','.join('?' for _ in claim_hashes)}) + """, [sqlite3.Binary(claim_hash) for claim_hash in claim_hashes]) + + def _update_effective_amount(self, height, claim_hashes=None): + sql = f""" + UPDATE claim SET effective_amount = claim.amount + claim.support_amount + WHERE activation_height = {height} + """ + if claim_hashes: + self.execute( + f"{sql} OR (activation_height <= {height} AND claim_hash IN ({','.join('?' for _ in claim_hashes)}))", + [sqlite3.Binary(claim_hash) for claim_hash in claim_hashes] + ) + else: + self.execute(sql) + def get_overtakings(self): return self.execute(f""" SELECT winner.normalized, winner.claim_hash FROM ( @@ -351,12 +376,13 @@ class SQLDB: (sqlite3.Binary(overtake['claim_hash']), overtake['normalized']) ) - def update_claimtrie(self, height, timer): + def update_claimtrie(self, height, removed_claims, new_claims, recalc_claims, timer): r = timer.run r(self._make_claims_without_competition_become_controlling, height) - r(self._update_trending_amount, height) - r(self._update_effective_amount, height) - r(self._set_activation_height, height) + r(self._update_support_amount, recalc_claims) + r(self._update_effective_amount, height, recalc_claims) + if not self.main.first_sync: + r(self._update_trending_amount, height) r(self._perform_overtake, height) r(self._update_effective_amount, height) r(self._perform_overtake, height) @@ -488,49 +514,54 @@ class SQLDB: return result def advance_txs(self, height, all_txs, timer): - sql, txs = self, set() - abandon_claim_hashes, stale_claim_metadata_txo_hashes = set(), set() - insert_claims, update_claims = set(), set() - delete_txo_hashes, insert_supports = set(), set() + body_timer = timer.add_timer('body') + body_timer.start() + insert_claims = set() + update_claims = set() + delete_claims = set() + recalc_claims = set() + insert_supports = set() + delete_supports = set() for position, (etx, txid) in enumerate(all_txs): + body_timer.stop() tx = timer.run( Transaction, etx.serialize(), height=height, position=position ) - claim_abandon_map, delete_txo_hashes = timer.run( - sql.split_inputs_into_claims_and_other, tx.inputs + spent_claims, spent_supports, spent_other = timer.run( + self.split_inputs_into_claims_supports_and_other, tx.inputs ) - stale_claim_metadata_txo_hashes.update(claim_abandon_map) + body_timer.start() + delete_claims.update(spent_claims.values()) + recalc_claims.update(spent_supports.values()) + delete_supports.update(spent_supports) for output in tx.outputs: if output.is_support: - txs.add(tx) insert_supports.add(output) + recalc_claims.add(output.claim_hash) elif output.script.is_claim_name: - txs.add(tx) insert_claims.add(output) + recalc_claims.add(output.claim_hash) elif output.script.is_update_claim: - txs.add(tx) + claim_hash = output.claim_hash + if claim_hash in delete_claims: + delete_claims.remove(claim_hash) update_claims.add(output) - # don't abandon update claims (removes supports & removes from claimtrie) - for txo_hash, input_claim_hash in claim_abandon_map.items(): - if output.claim_hash == input_claim_hash: - del claim_abandon_map[txo_hash] - break - abandon_claim_hashes.update(claim_abandon_map.values()) + recalc_claims.add(claim_hash) + body_timer.stop() r = timer.run - r(sql.abandon_claims, abandon_claim_hashes) - r(sql.clear_claim_metadata, stale_claim_metadata_txo_hashes) - r(sql.delete_other_txos, delete_txo_hashes) - r(sql.insert_claims, insert_claims) - r(sql.update_claims, update_claims) - r(sql.insert_supports, insert_supports) - r(sql.update_claimtrie, height, forward_timer=True) + r(self.delete_claims, delete_claims) + r(self.delete_supports, delete_supports) + r(self.insert_claims, insert_claims) + r(self.update_claims, update_claims) + r(self.insert_supports, insert_supports) + r(self.update_claimtrie, height, delete_claims, insert_claims, recalc_claims, forward_timer=True) class LBRYDB(DB): def __init__(self, *args, **kwargs): super().__init__(*args, **kwargs) - self.sql = SQLDB('claims.db') + self.sql = SQLDB(self, 'claims.db') def close(self): super().close() diff --git a/tests/unit/wallet/server/test_sqldb.py b/tests/unit/wallet/server/test_sqldb.py index 46f3ad548..2620c7d47 100644 --- a/tests/unit/wallet/server/test_sqldb.py +++ b/tests/unit/wallet/server/test_sqldb.py @@ -3,6 +3,7 @@ from torba.client.constants import COIN, NULL_HASH32 from lbrynet.schema.claim import Claim from lbrynet.wallet.server.db import SQLDB +from lbrynet.wallet.server.block_processor import Timer from lbrynet.wallet.transaction import Transaction, Input, Output @@ -31,13 +32,17 @@ class OldWalletServerTransaction: class TestSQLDB(unittest.TestCase): def setUp(self): - self.sql = SQLDB(':memory:') + self.first_sync = False + self.sql = SQLDB(self, ':memory:') + self.timer = Timer('BlockProcessor') self.sql.open() self._current_height = 0 self._txos = {} - def _make_tx(self, output): + def _make_tx(self, output, txi=None): tx = get_tx().add_outputs([output]) + if txi is not None: + tx.add_inputs([txi]) self._txos[output.ref.hash] = output return OldWalletServerTransaction(tx), tx.hash @@ -58,7 +63,8 @@ class TestSQLDB(unittest.TestCase): return self._make_tx( Output.pay_update_claim_pubkey_hash( amount, claim.claim_name, claim.claim_id, claim.claim, b'abc' - ) + ), + Input.spend(claim) ) def get_support(self, tx, amount): @@ -95,8 +101,10 @@ class TestSQLDB(unittest.TestCase): return accepted def advance(self, height, txs): + #for skipped_height in range(self._current_height+1, height): + # self.sql.advance_txs(skipped_height, [], self.timer) self._current_height = height - self.sql.advance_txs(height, txs) + self.sql.advance_txs(height, txs, self.timer) def state(self, controlling=None, active=None, accepted=None): self.assertEqual(controlling or [], self.get_controlling())