From 39d8a20fd51cd4f73169656e5aa6f6afcac4be6b Mon Sep 17 00:00:00 2001 From: Lex Berezhny Date: Tue, 30 Jun 2020 17:32:51 -0400 Subject: [PATCH] pgcopy COPY command --- lbry/blockchain/sync/steps.py | 33 ++++++--- lbry/db/queries.py | 20 +++--- lbry/db/query_context.py | 68 +++++++++++++------ lbry/db/tables.py | 2 +- .../integration/blockchain/test_blockchain.py | 11 ++- 5 files changed, 93 insertions(+), 41 deletions(-) diff --git a/lbry/blockchain/sync/steps.py b/lbry/blockchain/sync/steps.py index 30b70b1f5..3bcafbeba 100644 --- a/lbry/blockchain/sync/steps.py +++ b/lbry/blockchain/sync/steps.py @@ -4,7 +4,7 @@ import functools from contextvars import ContextVar from typing import Set -from sqlalchemy import bindparam, case, distinct +from sqlalchemy import bindparam, case, distinct, text from lbry.db import queries from lbry.db.tables import Block as BlockTable @@ -94,7 +94,7 @@ def process_block_file(block_file_number: int, starting_height: int, initial_syn def process_metadata(starting_height: int, ending_height: int, initial_sync: bool): chain = get_or_initialize_lbrycrd() - process_inputs_outputs() + process_inputs_outputs(initial_sync) changes = None if not initial_sync: changes = ClaimChanges() @@ -146,14 +146,31 @@ def process_block_save(block_file_number: int, loader, p=None): @sync_step(Event.INPUT_UPDATE, initial_sync=True, ongoing_sync=True) -def process_inputs_outputs(p=None): - p.start(2) +def process_inputs_outputs(initial_sync=False, p=None): + + step = 1 + if initial_sync and p.ctx.is_postgres: + p.start(4) + else: + p.start(2) + # 1. Update TXIs to have the address of TXO they are spending. set_input_addresses(p.ctx) - p.step(1) + p.step(step) + step += 1 + if initial_sync and p.ctx.is_postgres: + p.ctx.execute(text("ALTER TABLE txi ADD PRIMARY KEY (txo_hash);")) + p.step(step) + step += 1 + # 2. Update spent TXOs setting is_spent = True update_spent_outputs(p.ctx) - p.step(2) + p.step(step) + step += 1 + if initial_sync and p.ctx.is_postgres: + p.ctx.execute(text("ALTER TABLE txo ADD PRIMARY KEY (txo_hash);")) + p.step(step) + step += 1 @sync_step(Event.BLOCK_FILTER, initial_sync=True, ongoing_sync=True) @@ -334,7 +351,7 @@ def process_claim_signatures(changes: ClaimChanges, p=None): changes.channels_with_changed_content.add(claim['channel_hash']) if claim['previous_channel_hash']: changes.channels_with_changed_content.add(claim['previous_channel_hash']) - if len(claim_updates) > 500: + if len(claim_updates) > 1000: p.ctx.execute(Claim.update().where(Claim.c.claim_hash == bindparam('pk')), claim_updates) steps += len(claim_updates) p.step(steps) @@ -353,7 +370,7 @@ def process_support_signatures(changes: ClaimChanges, p=None): ) if changes is not None: changes.channels_with_changed_content.add(support['channel_hash']) - if len(support_updates) > 500: + if len(support_updates) > 1000: p.ctx.execute(Support.update().where(Support.c.txo_hash == bindparam('pk')), support_updates) p.step(len(support_updates)) support_updates.clear() diff --git a/lbry/db/queries.py b/lbry/db/queries.py index 344e0d92b..745b09145 100644 --- a/lbry/db/queries.py +++ b/lbry/db/queries.py @@ -49,12 +49,16 @@ def check_version_and_create_tables(): metadata.create_all(ctx.engine) ctx.execute(Version.insert().values(version=SCHEMA_VERSION)) if ctx.is_postgres: - ctx.execute(text("ALTER TABLE txi DISABLE TRIGGER ALL;")) - ctx.execute(text("ALTER TABLE txo DISABLE TRIGGER ALL;")) - ctx.execute(text("ALTER TABLE tx DISABLE TRIGGER ALL;")) - ctx.execute(text("ALTER TABLE claim DISABLE TRIGGER ALL;")) - ctx.execute(text("ALTER TABLE support DISABLE TRIGGER ALL;")) - ctx.execute(text("ALTER TABLE block DISABLE TRIGGER ALL;")) + disable_indexes_and_integrity_enforcement() + + +def disable_indexes_and_integrity_enforcement(): + with context('disable indexes and integrity enforcement (triggers, primary keys, etc)') as ctx: + for table in metadata.sorted_tables: + ctx.execute(text(f"ALTER TABLE {table.name} DISABLE TRIGGER ALL;")) + if table.name == 'tag': + continue + ctx.execute(text(f"ALTER TABLE {table.name} DROP CONSTRAINT {table.name}_pkey CASCADE;")) def insert_block(block): @@ -313,8 +317,8 @@ def select_txos( (TXI.c.address.notin_(my_addresses)) ) joins = TXO.join(TX) - if constraints.get('is_spent', None) is False: - s = s.where((TXO.c.is_spent == False) & (TXO.c.is_reserved == False)) + #if constraints.get('is_spent', None) is False: + # s = s.where((TXO.c.is_spent == False) & (TXO.c.is_reserved == False)) if include_is_my_input: joins = joins.join(TXI, (TXI.c.position == 0) & (TXI.c.tx_hash == TXO.c.tx_hash), isouter=True) if claim_id_not_in_claim_table: diff --git a/lbry/db/query_context.py b/lbry/db/query_context.py index 0c15cfc8f..3eff04856 100644 --- a/lbry/db/query_context.py +++ b/lbry/db/query_context.py @@ -1,14 +1,20 @@ import os import time +from io import BytesIO import multiprocessing as mp from enum import Enum from decimal import Decimal from typing import Dict, List, Optional, Tuple -from dataclasses import dataclass +from dataclasses import dataclass, field from contextvars import ContextVar from sqlalchemy import create_engine, inspect, bindparam from sqlalchemy.engine import Engine, Connection +from sqlalchemy.sql import Insert +try: + from pgcopy import CopyManager +except ImportError: + CopyManager = None from lbry.event import EventQueuePublisher from lbry.blockchain.ledger import Ledger @@ -47,6 +53,8 @@ class QueryContext: current_timer_time: float = 0 current_progress: Optional['ProgressContext'] = None + copy_managers: Dict[str, CopyManager] = field(default_factory=dict) + @property def is_postgres(self): return self.connection.dialect.name == 'postgresql' @@ -64,6 +72,16 @@ class QueryContext: def get_search_censor(self) -> Censor: return Censor(self.filtered_streams, self.filtered_channels) + def pg_copy(self, table, rows): + connection = self.connection.connection + copy_manager = self.copy_managers.get(table.name) + if copy_manager is None: + self.copy_managers[table.name] = copy_manager = CopyManager( + self.connection.connection, table.name, rows[0].keys() + ) + copy_manager.copy(map(dict.values, rows), BytesIO) + connection.commit() + def execute(self, sql, *args): return self.connection.execute(sql, *args) @@ -562,25 +580,33 @@ class BulkLoader: execute = self.ctx.connection.execute for sql, rows in queries: - for chunk_rows in chunk(rows, batch_size): - try: - execute(sql, chunk_rows) - except Exception: - for row in chunk_rows: - try: - execute(sql, [row]) - except Exception: - p.ctx.message_queue.put_nowait( - (Event.COMPLETE.value, os.getpid(), 1, 1) - ) - with open('badrow', 'a') as badrow: - badrow.write(repr(sql)) - badrow.write('\n') - badrow.write(repr(row)) - badrow.write('\n') - print(sql) - print(row) - raise + if not rows: + continue + if self.ctx.is_postgres and isinstance(sql, Insert): + self.ctx.pg_copy(sql.table, rows) if p: - done += int(len(chunk_rows)/row_scale) + done += int(len(rows) / row_scale) p.step(done) + else: + for chunk_rows in chunk(rows, batch_size): + try: + execute(sql, chunk_rows) + except Exception: + for row in chunk_rows: + try: + execute(sql, [row]) + except Exception: + p.ctx.message_queue.put_nowait( + (Event.COMPLETE.value, os.getpid(), 1, 1) + ) + with open('badrow', 'a') as badrow: + badrow.write(repr(sql)) + badrow.write('\n') + badrow.write(repr(row)) + badrow.write('\n') + print(sql) + print(row) + raise + if p: + done += int(len(chunk_rows)/row_scale) + p.step(done) diff --git a/lbry/db/tables.py b/lbry/db/tables.py index dcd61314c..7d62f83ed 100644 --- a/lbry/db/tables.py +++ b/lbry/db/tables.py @@ -137,7 +137,7 @@ Claim = Table( Column('staked_support_amount', BigInteger, server_default='0'), # streams - Column('stream_type', Text, nullable=True), + Column('stream_type', SmallInteger, nullable=True), Column('media_type', Text, nullable=True), Column('fee_amount', BigInteger, server_default='0'), Column('fee_currency', Text, nullable=True), diff --git a/tests/integration/blockchain/test_blockchain.py b/tests/integration/blockchain/test_blockchain.py index a57668174..840104c76 100644 --- a/tests/integration/blockchain/test_blockchain.py +++ b/tests/integration/blockchain/test_blockchain.py @@ -42,6 +42,7 @@ class BasicBlockchainTestCase(AsyncioTestCase): if db_driver == 'sqlite': db = Database.temp_sqlite_regtest(chain.ledger.conf.lbrycrd_dir) elif db_driver.startswith('postgres') or db_driver.startswith('psycopg'): + db_driver = 'postgresql' db_name = f'lbry_test_chain' db_connection = 'postgres:postgres@localhost:5432' meta_db = Database.from_url(f'postgresql://{db_connection}/postgres') @@ -53,6 +54,7 @@ class BasicBlockchainTestCase(AsyncioTestCase): self.addCleanup(remove_tree, db.ledger.conf.data_dir) await db.open() self.addCleanup(db.close) + self.db_driver = db_driver return db @staticmethod @@ -454,9 +456,6 @@ class TestMultiBlockFileSyncing(BasicBlockchainTestCase): self.assertEqual(2, await db.get_support_metadata_count(0, 500)) self.assertEqual(0, await db.get_support_metadata_count(500, 1000)) - def foo(c): - return - # get_support_metadata self.assertEqual( [{'name': b'two', 'activation_height': 359, 'expiration_height': 852}, @@ -536,6 +535,12 @@ class TestMultiBlockFileSyncing(BasicBlockchainTestCase): # 3 - db.sync.input self.assertEventsAlmostEqual( self.extract_events('db.sync.input', events), [ + [0, 4], + [1, 4], + [2, 4], + [3, 4], + [4, 4], + ] if self.db_driver == 'postgresql' else [ [0, 2], [1, 2], [2, 2],