From 8fe16fbfc9618b7a7b49fc0dff17e31efcc102f3 Mon Sep 17 00:00:00 2001 From: Lex Berezhny Date: Thu, 16 May 2019 01:34:18 -0400 Subject: [PATCH] performance --- lbrynet/wallet/server/db.py | 228 ++++++++++++++++++++---------------- 1 file changed, 125 insertions(+), 103 deletions(-) diff --git a/lbrynet/wallet/server/db.py b/lbrynet/wallet/server/db.py index 1bb18aef9..614e1e9c1 100644 --- a/lbrynet/wallet/server/db.py +++ b/lbrynet/wallet/server/db.py @@ -262,11 +262,14 @@ class SQLDB: def split_inputs_into_claims_supports_and_other(self, txis): txo_hashes = set(txi.txo_ref.hash for txi in txis) - claims = dict(self.execute(*query( - "SELECT txo_hash, claim_hash FROM claim", - txo_hash__in=[sqlite3.Binary(txo_hash) for txo_hash in txo_hashes] - ))) - txo_hashes -= set(claims) + claim_txo_hashes = set() + claims = {} + for claim in self.execute(*query( + "SELECT txo_hash, claim_hash, normalized FROM claim", + txo_hash__in=[sqlite3.Binary(txo_hash) for txo_hash in txo_hashes])): + claim_txo_hashes.add(claim[0]) + claims[claim[1]] = claim[2] + txo_hashes -= set(claim_txo_hashes) supports = {} if txo_hashes: supports = dict(self.execute(*query( @@ -298,31 +301,6 @@ class SQLDB: 'support', {'txo_hash__in': [sqlite3.Binary(txo_hash) for txo_hash in txo_hashes]} )) - def _make_claims_without_competition_become_controlling(self, height): - self.execute(f""" - INSERT INTO claimtrie (normalized, claim_hash, last_take_over_height) - SELECT claim.normalized, claim.claim_hash, {height} FROM claim - LEFT JOIN claimtrie USING (normalized) - WHERE claimtrie.claim_hash IS NULL - GROUP BY claim.normalized HAVING COUNT(*) = 1 - """) - self.execute(f""" - UPDATE claim SET activation_height = {height} - WHERE activation_height IS NULL AND claim_hash IN ( - SELECT claim_hash FROM claimtrie - ) - """) - self.execute(f""" - UPDATE claim SET - activation_height = {height} + min(4032, cast( - ( - {height} - - (SELECT last_take_over_height FROM claimtrie - WHERE claimtrie.normalized=claim.normalized) - ) / 32 AS INT)) - WHERE activation_height IS NULL - """) - def _update_trending_amount(self, height): self.execute(f""" UPDATE claim SET @@ -340,39 +318,65 @@ class SQLDB: (SELECT SUM(amount) FROM support WHERE support.claim_hash=claim.claim_hash), 0 ) WHERE claim_hash IN ({','.join('?' for _ in claim_hashes)}) - """, [sqlite3.Binary(claim_hash) for claim_hash in claim_hashes]) + """, claim_hashes) - def _update_effective_amount(self, height, claim_hashes=None): - sql = f""" - UPDATE claim SET effective_amount = claim.amount + claim.support_amount - WHERE activation_height = {height} - """ - if claim_hashes: - self.execute( - f"{sql} OR (claim_hash IN ({','.join('?' for _ in claim_hashes)}) AND activation_height <= {height})", - [sqlite3.Binary(claim_hash) for claim_hash in claim_hashes] - ) - else: - self.execute(sql) + def _make_claims_without_competition_become_controlling(self, height, changed, timer): + if not changed: + return - def get_overtakings(self): - return self.execute(f""" - SELECT winner.normalized, winner.claim_hash FROM ( - SELECT normalized, claim_hash, MAX(effective_amount) - FROM claim GROUP BY normalized - ) AS winner JOIN claimtrie USING (normalized) - WHERE claimtrie.claim_hash <> winner.claim_hash + t = timer.add_timer('insert into claimtrie') + t.start() + self.execute(f""" + INSERT INTO claimtrie (normalized, claim_hash, last_take_over_height) + SELECT claim.normalized, claim.claim_hash, {height} FROM claim + WHERE normalized IN ({','.join('?' for _ in changed)}) AND + normalized NOT IN (SELECT normalized FROM claimtrie) + GROUP BY normalized HAVING COUNT(*) = 1 + """, list(changed)) + t.stop() + + t = timer.add_timer('set activation_height to current height for default winner') + t.start() + self.execute(f""" + UPDATE claim SET activation_height = {height} + WHERE (activation_height IS NULL OR activation_height > {height}) + AND EXISTS(SELECT * FROM claimtrie WHERE claimtrie.claim_hash=claim.claim_hash) """) + t.stop() - def _perform_overtake(self, height): - for overtake in self.get_overtakings(): + t = timer.add_timer('calculate activation_height for contentious claim name') + t.start() + self.execute(f""" + UPDATE claim SET activation_height = + {height} + + min(4032, cast(({height} - (SELECT last_take_over_height FROM claimtrie + WHERE claimtrie.normalized=claim.normalized)) / 32 AS INT)) + WHERE activation_height IS NULL + """) + t.stop() + + def _update_effective_amount(self, constraints): + where, values = constraints_to_sql(constraints) + self.execute("UPDATE claim SET effective_amount = amount + support_amount WHERE "+where, values) + + def _perform_overtake(self, height, constraints): + where, values = constraints_to_sql(constraints) + overtakes = self.execute(f""" + SELECT winner.normalized, winner.claim_hash FROM claimtrie JOIN ( + SELECT normalized, claim_hash, MAX(effective_amount) + FROM claim WHERE normalized IN (SELECT normalized FROM claim WHERE {where}) + GROUP BY normalized + ) AS winner USING (normalized) WHERE claimtrie.claim_hash <> winner.claim_hash + """, values) + for overtake in overtakes: self.execute( f"UPDATE claim SET activation_height = {height} WHERE normalized = ? " f"AND (activation_height IS NULL OR activation_height > {height})", (overtake['normalized'],) ) self.execute( - f"UPDATE claimtrie SET claim_hash = ?, last_take_over_height = {height} WHERE normalized = ?", + f"UPDATE claimtrie SET claim_hash = ?, last_take_over_height = {height} " + f"WHERE normalized = ?", (sqlite3.Binary(overtake['claim_hash']), overtake['normalized']) ) @@ -381,18 +385,79 @@ class SQLDB: self.execute(f"DROP TABLE claimtrie{height-50}") self.execute(f"CREATE TABLE claimtrie{height} AS SELECT * FROM claimtrie") - def update_claimtrie(self, height, removed_claims, new_claims, recalc_claims, timer): + def update_claimtrie(self, height, changed_names, recalc_claims, timer): + binary_recalc_claims = [sqlite3.Binary(claim_hash) for claim_hash in recalc_claims] r = timer.run - r(self._make_claims_without_competition_become_controlling, height) - r(self._update_support_amount, recalc_claims) - r(self._update_effective_amount, height, recalc_claims) + r(self._make_claims_without_competition_become_controlling, height, changed_names, forward_timer=True) + r(self._update_support_amount, binary_recalc_claims) if not self.main.first_sync: r(self._update_trending_amount, height) - r(self._perform_overtake, height) - r(self._update_effective_amount, height) - r(self._perform_overtake, height) + + if binary_recalc_claims: + claims_filter = { + '__or': { + 'activation_height': height, + '__and': { + 'claim_hash__in': binary_recalc_claims, + 'activation_height__lte': height + } + } + } + else: + claims_filter = { + 'activation_height': height + } + + r(self._update_effective_amount, claims_filter) + r(self._perform_overtake, height, claims_filter) + r(self._update_effective_amount, {'activation_height': height}) + r(self._perform_overtake, height, {'activation_height': height}) #r(self._copy, height) + def advance_txs(self, height, all_txs, timer): + insert_claims = set() + update_claims = set() + delete_claims = set() + changed_names = set() + recalc_claims = set() + insert_supports = set() + delete_supports = set() + body_timer = timer.add_timer('body') + for position, (etx, txid) in enumerate(all_txs): + tx = timer.run( + Transaction, etx.serialize(), height=height, position=position + ) + spent_claims, spent_supports, spent_other = timer.run( + self.split_inputs_into_claims_supports_and_other, tx.inputs + ) + body_timer.start() + delete_claims.update(spent_claims.keys()) + changed_names.update(spent_claims.values()) + recalc_claims.update(spent_supports.values()) + delete_supports.update(spent_supports) + for output in tx.outputs: + if output.is_support: + insert_supports.add(output) + recalc_claims.add(output.claim_hash) + elif output.script.is_claim_name: + insert_claims.add(output) + recalc_claims.add(output.claim_hash) + changed_names.add(output.normalized_name) + elif output.script.is_update_claim: + claim_hash = output.claim_hash + if claim_hash in delete_claims: + delete_claims.remove(claim_hash) + update_claims.add(output) + recalc_claims.add(claim_hash) + body_timer.stop() + r = timer.run + r(self.delete_claims, delete_claims) + r(self.delete_supports, delete_supports) + r(self.insert_claims, insert_claims) + r(self.update_claims, update_claims) + r(self.insert_supports, insert_supports) + r(self.update_claimtrie, height, changed_names, recalc_claims, forward_timer=True) + def get_claims(self, cols, **constraints): if 'is_controlling' in constraints: if {'sequence', 'amount_order'}.isdisjoint(constraints): @@ -519,49 +584,6 @@ class SQLDB: result.append(channel) return result - def advance_txs(self, height, all_txs, timer): - body_timer = timer.add_timer('body') - body_timer.start() - insert_claims = set() - update_claims = set() - delete_claims = set() - recalc_claims = set() - insert_supports = set() - delete_supports = set() - body_timer.stop() - for position, (etx, txid) in enumerate(all_txs): - tx = timer.run( - Transaction, etx.serialize(), height=height, position=position - ) - spent_claims, spent_supports, spent_other = timer.run( - self.split_inputs_into_claims_supports_and_other, tx.inputs - ) - body_timer.start() - delete_claims.update(spent_claims.values()) - recalc_claims.update(spent_supports.values()) - delete_supports.update(spent_supports) - for output in tx.outputs: - if output.is_support: - insert_supports.add(output) - recalc_claims.add(output.claim_hash) - elif output.script.is_claim_name: - insert_claims.add(output) - recalc_claims.add(output.claim_hash) - elif output.script.is_update_claim: - claim_hash = output.claim_hash - if claim_hash in delete_claims: - delete_claims.remove(claim_hash) - update_claims.add(output) - recalc_claims.add(claim_hash) - body_timer.stop() - r = timer.run - r(self.delete_claims, delete_claims) - r(self.delete_supports, delete_supports) - r(self.insert_claims, insert_claims) - r(self.update_claims, update_claims) - r(self.insert_supports, insert_supports) - r(self.update_claimtrie, height, delete_claims, insert_claims, recalc_claims, forward_timer=True) - class LBRYDB(DB):