increase work_mem and add more indexes

This commit is contained in:
Lex Berezhny 2020-07-13 12:38:28 -04:00
parent e63151a370
commit 462daf4dc4
2 changed files with 31 additions and 5 deletions

View file

@ -1,5 +1,6 @@
import os import os
import time import time
import traceback
import functools import functools
from io import BytesIO from io import BytesIO
import multiprocessing as mp import multiprocessing as mp
@ -160,7 +161,7 @@ def context(with_timer: str = None) -> 'QueryContext':
def set_postgres_settings(connection, _): def set_postgres_settings(connection, _):
cursor = connection.cursor() cursor = connection.cursor()
cursor.execute('SET work_mem="100MB";') cursor.execute('SET work_mem="500MB";')
cursor.close() cursor.close()
@ -241,7 +242,11 @@ def event_emitter(name: str, *units: str, throttle=1):
@functools.wraps(f) @functools.wraps(f)
def with_progress(*args, **kwargs): def with_progress(*args, **kwargs):
with progress(event, throttle=throttle) as p: with progress(event, throttle=throttle) as p:
try:
return f(*args, **kwargs, p=p) return f(*args, **kwargs, p=p)
except:
traceback.print_exc()
raise
return with_progress return with_progress
return wrapper return wrapper
@ -546,7 +551,7 @@ class BulkLoader:
if claim.is_signed: if claim.is_signed:
d['channel_hash'] = claim.signing_channel_hash d['channel_hash'] = claim.signing_channel_hash
d['is_signature_valid'] = ( d['is_signature_valid'] = (
all((signature, signature_digest, channel_public_key)) & all((signature, signature_digest, channel_public_key)) and
Output.is_signature_valid( Output.is_signature_valid(
signature, signature_digest, channel_public_key signature, signature_digest, channel_public_key
) )
@ -584,7 +589,7 @@ class BulkLoader:
if support.is_signed: if support.is_signed:
d['channel_hash'] = support.signing_channel_hash d['channel_hash'] = support.signing_channel_hash
d['is_signature_valid'] = ( d['is_signature_valid'] = (
all((signature, signature_digest, channel_public_key)) & all((signature, signature_digest, channel_public_key)) and
Output.is_signature_valid( Output.is_signature_valid(
signature, signature_digest, channel_public_key signature, signature_digest, channel_public_key
) )

View file

@ -5,7 +5,7 @@ from sqlalchemy import (
LargeBinary, Text, SmallInteger, Integer, BigInteger, Boolean, LargeBinary, Text, SmallInteger, Integer, BigInteger, Boolean,
text text
) )
from .constants import TXO_TYPES from .constants import TXO_TYPES, CLAIM_TYPE_CODES
SCHEMA_VERSION = '1.4' SCHEMA_VERSION = '1.4'
@ -97,16 +97,34 @@ txo_join_account = TXO.join(AccountAddress, TXO.columns.address == AccountAddres
def pg_add_txo_constraints_and_indexes(execute): def pg_add_txo_constraints_and_indexes(execute):
execute(text("ALTER TABLE txo ADD PRIMARY KEY (txo_hash);")) execute(text("ALTER TABLE txo ADD PRIMARY KEY (txo_hash);"))
# find appropriate channel public key for signing a content claim
execute(text(f""" execute(text(f"""
CREATE INDEX txo_channel_hash_w_height_desc_and_pub_key CREATE INDEX txo_channel_hash_w_height_desc_and_pub_key
ON txo (claim_hash, height desc) INCLUDE (public_key) ON txo (claim_hash, height desc) INCLUDE (public_key)
WHERE txo_type={TXO_TYPES['channel']}; WHERE txo_type={TXO_TYPES['channel']};
""")) """))
# update supports for a claim
execute(text(f""" execute(text(f"""
CREATE INDEX txo_unspent_supports CREATE INDEX txo_unspent_supports
ON txo (claim_hash) INCLUDE (amount) ON txo (claim_hash) INCLUDE (amount)
WHERE spent_height = 0 AND txo_type={TXO_TYPES['support']}; WHERE spent_height = 0 AND txo_type={TXO_TYPES['support']};
""")) """))
# claim changes by height
execute(text(f"""
CREATE INDEX txo_claim_changes
ON txo (height DESC) INCLUDE (txo_hash)
WHERE spent_height = 0 AND txo_type IN {tuple(CLAIM_TYPE_CODES)};
"""))
# supports added
execute(text(f"""
CREATE INDEX txo_added_supports_by_height ON txo (height DESC)
INCLUDE (claim_hash) WHERE txo_type={TXO_TYPES['support']};
"""))
# supports spent
execute(text(f"""
CREATE INDEX txo_spent_supports_by_height ON txo (spent_height DESC)
INCLUDE (claim_hash) WHERE txo_type={TXO_TYPES['support']};
"""))
TXI = Table( TXI = Table(
@ -187,6 +205,9 @@ Claim = Table(
def pg_add_claim_constraints_and_indexes(execute): def pg_add_claim_constraints_and_indexes(execute):
execute(text("ALTER TABLE claim ADD PRIMARY KEY (claim_hash);")) execute(text("ALTER TABLE claim ADD PRIMARY KEY (claim_hash);"))
# finding claims that aren't updated with latest TXO
execute(text("CREATE INDEX claim_txo_hash ON claim (txo_hash);"))
# used to calculate content in a channel
execute(text(""" execute(text("""
CREATE INDEX signed_content ON claim (channel_hash) CREATE INDEX signed_content ON claim (channel_hash)
INCLUDE (amount) WHERE is_signature_valid; INCLUDE (amount) WHERE is_signature_valid;