diff --git a/lbry/blockchain/sync/steps.py b/lbry/blockchain/sync/steps.py index 93637dacc..f6189db9b 100644 --- a/lbry/blockchain/sync/steps.py +++ b/lbry/blockchain/sync/steps.py @@ -4,7 +4,7 @@ from contextvars import ContextVar from functools import partial from typing import Optional, Tuple -from sqlalchemy import bindparam, case, distinct, text, func, between, desc +from sqlalchemy import table, bindparam, case, distinct, text, func, between, desc from sqlalchemy.future import select from sqlalchemy.schema import CreateTable @@ -170,17 +170,14 @@ def process_spends(initial_sync: bool, p: ProgressContext): p.ctx.execute(text("ALTER TABLE txo DROP CONSTRAINT txo_pkey;")) p.step(next_step()) # G. insert - old_txo = TXO.alias('old_txo') + old_txo = table('old_txo', *(c.copy() for c in TXO.columns)) columns = [c for c in old_txo.columns if c.name != 'spent_height'] select_columns = columns + [func.coalesce(TXI.c.height, 0).label('spent_height')] insert_columns = columns + [TXO.c.spent_height] - select_txos = select(*select_columns).select_from(old_txo.join(TXI, isouter=True)) + join_txo_on_txi = old_txo.join(TXI, old_txo.c.txo_hash == TXI.c.txo_hash, isouter=True) + select_txos = (select(*select_columns).select_from(join_txo_on_txi)) insert_txos = TXO.insert().from_select(insert_columns, select_txos) - p.ctx.execute(text( - str(insert_txos.compile(p.ctx.engine)) - .replace('txo AS old_txo', 'old_txo') - .replace('%(coalesce_1)s', '0') - )) + p.ctx.execute(insert_txos) p.step(next_step()) # H. drop old txo p.ctx.execute(text("DROP TABLE old_txo;")) diff --git a/lbry/db/queries.py b/lbry/db/queries.py index e5365dd49..bc580dad3 100644 --- a/lbry/db/queries.py +++ b/lbry/db/queries.py @@ -39,8 +39,6 @@ log = logging.getLogger(__name__) def check_version_and_create_tables(): with context("db.connecting") as ctx: - if ctx.is_sqlite: - ctx.execute(text("PRAGMA journal_mode=WAL;")) if ctx.has_table('version'): version = ctx.fetchone(select(Version.c.version).limit(1)) if version and version['version'] == SCHEMA_VERSION: diff --git a/lbry/db/query_context.py b/lbry/db/query_context.py index 2e0362262..4d0de0ac7 100644 --- a/lbry/db/query_context.py +++ b/lbry/db/query_context.py @@ -8,7 +8,7 @@ from typing import Dict, List, Optional, Tuple from dataclasses import dataclass, field from contextvars import ContextVar -from sqlalchemy import create_engine, inspect, bindparam, func, exists, case +from sqlalchemy import create_engine, inspect, bindparam, func, exists, case, event from sqlalchemy.future import select from sqlalchemy.engine import Engine, Connection from sqlalchemy.sql import Insert @@ -158,11 +158,27 @@ def context(with_timer: str = None) -> 'QueryContext': return _context.get() +def set_postgres_settings(connection, _): + cursor = connection.cursor() + cursor.execute('SET work_mem="100MB";') + cursor.close() + + +def set_sqlite_settings(connection, _): + cursor = connection.cursor() + cursor.execute('PRAGMA journal_mode=WAL;') + cursor.close() + + def initialize( ledger: Ledger, message_queue: mp.Queue, stop_event: mp.Event, track_metrics=False, block_and_filter=None, print_timers=None): url = ledger.conf.db_url_or_default engine = create_engine(url) + if engine.name == "postgresql": + event.listen(engine, "connect", set_postgres_settings) + elif engine.name == "sqlite": + event.listen(engine, "connect", set_sqlite_settings) connection = engine.connect() if block_and_filter is not None: blocked_streams, blocked_channels, filtered_streams, filtered_channels = block_and_filter diff --git a/lbry/db/sync.py b/lbry/db/sync.py index 492c8c670..9463e540d 100644 --- a/lbry/db/sync.py +++ b/lbry/db/sync.py @@ -68,6 +68,7 @@ def update_spent_outputs(ctx): TXO.c.spent_height: ( select(TXI.c.height) .where(TXI.c.txo_hash == TXO.c.txo_hash) + .scalar_subquery() ) }).where( (TXO.c.spent_height == 0) &