From 462daf4dc447d30acca83f7db94e9b464ed5f7db Mon Sep 17 00:00:00 2001 From: Lex Berezhny Date: Mon, 13 Jul 2020 12:38:28 -0400 Subject: [PATCH] increase work_mem and add more indexes --- lbry/db/query_context.py | 13 +++++++++---- lbry/db/tables.py | 23 ++++++++++++++++++++++- 2 files changed, 31 insertions(+), 5 deletions(-) diff --git a/lbry/db/query_context.py b/lbry/db/query_context.py index a33909aea..e2728d361 100644 --- a/lbry/db/query_context.py +++ b/lbry/db/query_context.py @@ -1,5 +1,6 @@ import os import time +import traceback import functools from io import BytesIO import multiprocessing as mp @@ -160,7 +161,7 @@ def context(with_timer: str = None) -> 'QueryContext': def set_postgres_settings(connection, _): cursor = connection.cursor() - cursor.execute('SET work_mem="100MB";') + cursor.execute('SET work_mem="500MB";') cursor.close() @@ -241,7 +242,11 @@ def event_emitter(name: str, *units: str, throttle=1): @functools.wraps(f) def with_progress(*args, **kwargs): with progress(event, throttle=throttle) as p: - return f(*args, **kwargs, p=p) + try: + return f(*args, **kwargs, p=p) + except: + traceback.print_exc() + raise return with_progress return wrapper @@ -546,7 +551,7 @@ class BulkLoader: if claim.is_signed: d['channel_hash'] = claim.signing_channel_hash d['is_signature_valid'] = ( - all((signature, signature_digest, channel_public_key)) & + all((signature, signature_digest, channel_public_key)) and Output.is_signature_valid( signature, signature_digest, channel_public_key ) @@ -584,7 +589,7 @@ class BulkLoader: if support.is_signed: d['channel_hash'] = support.signing_channel_hash d['is_signature_valid'] = ( - all((signature, signature_digest, channel_public_key)) & + all((signature, signature_digest, channel_public_key)) and Output.is_signature_valid( signature, signature_digest, channel_public_key ) diff --git a/lbry/db/tables.py b/lbry/db/tables.py index 0d83f65ab..b8fe1e0ed 100644 --- a/lbry/db/tables.py +++ b/lbry/db/tables.py @@ -5,7 +5,7 @@ from sqlalchemy import ( LargeBinary, Text, SmallInteger, Integer, BigInteger, Boolean, text ) -from .constants import TXO_TYPES +from .constants import TXO_TYPES, CLAIM_TYPE_CODES SCHEMA_VERSION = '1.4' @@ -97,16 +97,34 @@ txo_join_account = TXO.join(AccountAddress, TXO.columns.address == AccountAddres def pg_add_txo_constraints_and_indexes(execute): execute(text("ALTER TABLE txo ADD PRIMARY KEY (txo_hash);")) + # find appropriate channel public key for signing a content claim execute(text(f""" CREATE INDEX txo_channel_hash_w_height_desc_and_pub_key ON txo (claim_hash, height desc) INCLUDE (public_key) WHERE txo_type={TXO_TYPES['channel']}; """)) + # update supports for a claim execute(text(f""" CREATE INDEX txo_unspent_supports ON txo (claim_hash) INCLUDE (amount) WHERE spent_height = 0 AND txo_type={TXO_TYPES['support']}; """)) + # claim changes by height + execute(text(f""" + CREATE INDEX txo_claim_changes + ON txo (height DESC) INCLUDE (txo_hash) + WHERE spent_height = 0 AND txo_type IN {tuple(CLAIM_TYPE_CODES)}; + """)) + # supports added + execute(text(f""" + CREATE INDEX txo_added_supports_by_height ON txo (height DESC) + INCLUDE (claim_hash) WHERE txo_type={TXO_TYPES['support']}; + """)) + # supports spent + execute(text(f""" + CREATE INDEX txo_spent_supports_by_height ON txo (spent_height DESC) + INCLUDE (claim_hash) WHERE txo_type={TXO_TYPES['support']}; + """)) TXI = Table( @@ -187,6 +205,9 @@ Claim = Table( def pg_add_claim_constraints_and_indexes(execute): execute(text("ALTER TABLE claim ADD PRIMARY KEY (claim_hash);")) + # finding claims that aren't updated with latest TXO + execute(text("CREATE INDEX claim_txo_hash ON claim (txo_hash);")) + # used to calculate content in a channel execute(text(""" CREATE INDEX signed_content ON claim (channel_hash) INCLUDE (amount) WHERE is_signature_valid;