pgcopy COPY command

Lex Berezhny 2020-06-30 17:32:51 -04:00
parent 9ccf00f56b
commit 39d8a20fd5
5 changed files with 93 additions and 41 deletions

View file

@@ -4,7 +4,7 @@ import functools
from contextvars import ContextVar
from typing import Set
from sqlalchemy import bindparam, case, distinct
from sqlalchemy import bindparam, case, distinct, text
from lbry.db import queries
from lbry.db.tables import Block as BlockTable
@@ -94,7 +94,7 @@ def process_block_file(block_file_number: int, starting_height: int, initial_syn
def process_metadata(starting_height: int, ending_height: int, initial_sync: bool):
chain = get_or_initialize_lbrycrd()
process_inputs_outputs()
process_inputs_outputs(initial_sync)
changes = None
if not initial_sync:
changes = ClaimChanges()
@@ -146,14 +146,31 @@ def process_block_save(block_file_number: int, loader, p=None):
@sync_step(Event.INPUT_UPDATE, initial_sync=True, ongoing_sync=True)
def process_inputs_outputs(p=None):
p.start(2)
def process_inputs_outputs(initial_sync=False, p=None):
step = 1
if initial_sync and p.ctx.is_postgres:
p.start(4)
else:
p.start(2)
# 1. Update TXIs to have the address of TXO they are spending.
set_input_addresses(p.ctx)
p.step(1)
p.step(step)
step += 1
if initial_sync and p.ctx.is_postgres:
p.ctx.execute(text("ALTER TABLE txi ADD PRIMARY KEY (txo_hash);"))
p.step(step)
step += 1
# 2. Update spent TXOs setting is_spent = True
update_spent_outputs(p.ctx)
p.step(2)
p.step(step)
step += 1
if initial_sync and p.ctx.is_postgres:
p.ctx.execute(text("ALTER TABLE txo ADD PRIMARY KEY (txo_hash);"))
p.step(step)
step += 1
@sync_step(Event.BLOCK_FILTER, initial_sync=True, ongoing_sync=True)
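
On a PostgreSQL initial sync, the hunk above re-adds the txi primary key only after input addresses are set, and the txo primary key only after spent outputs are updated, so those bulk updates run before the primary key indexes exist (the keys are dropped up front in the table-setup changes further down). A minimal sketch of the same pattern in isolation, assuming plain SQLAlchemy and an illustrative "items" table whose primary key was dropped before loading (table, columns and connection URL are not from this commit):

    from sqlalchemy import create_engine, text

    engine = create_engine("postgresql://postgres:postgres@localhost:5432/example")
    with engine.begin() as conn:
        # 1. Bulk load while the table has no primary key index to maintain.
        conn.execute(
            text("INSERT INTO items (id, name) VALUES (:id, :name)"),
            [{"id": i, "name": f"item-{i}"} for i in range(1000)],
        )
        # 2. Re-create the primary key once, after all rows are in place.
        conn.execute(text("ALTER TABLE items ADD PRIMARY KEY (id);"))
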
@@ -334,7 +351,7 @@ def process_claim_signatures(changes: ClaimChanges, p=None):
changes.channels_with_changed_content.add(claim['channel_hash'])
if claim['previous_channel_hash']:
changes.channels_with_changed_content.add(claim['previous_channel_hash'])
if len(claim_updates) > 500:
if len(claim_updates) > 1000:
p.ctx.execute(Claim.update().where(Claim.c.claim_hash == bindparam('pk')), claim_updates)
steps += len(claim_updates)
p.step(steps)
@@ -353,7 +370,7 @@ def process_support_signatures(changes: ClaimChanges, p=None):
)
if changes is not None:
changes.channels_with_changed_content.add(support['channel_hash'])
if len(support_updates) > 500:
if len(support_updates) > 1000:
p.ctx.execute(Support.update().where(Support.c.txo_hash == bindparam('pk')), support_updates)
p.step(len(support_updates))
support_updates.clear()
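
The two hunks above raise the flush threshold for the batched claim and support signature updates from 500 to 1000 rows per call. What gets flushed is SQLAlchemy's executemany form of Update, where a bindparam named unlike any column ("pk") drives the WHERE clause and the remaining dict keys become the SET clause. A self-contained sketch of that pattern against an illustrative table (the real code keys on claim_hash and txo_hash):

    from sqlalchemy import MetaData, Table, Column, Integer, Text, bindparam, create_engine

    metadata = MetaData()
    items = Table(
        "items", metadata,
        Column("id", Integer, primary_key=True),
        Column("label", Text),
    )
    engine = create_engine("sqlite://")  # illustrative in-memory database
    metadata.create_all(engine)

    with engine.begin() as conn:
        conn.execute(items.insert(), [{"id": i, "label": "old"} for i in range(10)])
        # executemany: one prepared UPDATE run once per dict; "pk" feeds the WHERE
        # clause, while "label" (a real column) becomes the SET clause.
        updates = [{"pk": i, "label": f"new-{i}"} for i in range(10)]
        conn.execute(items.update().where(items.c.id == bindparam("pk")), updates)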

View file

@@ -49,12 +49,16 @@ def check_version_and_create_tables():
metadata.create_all(ctx.engine)
ctx.execute(Version.insert().values(version=SCHEMA_VERSION))
if ctx.is_postgres:
ctx.execute(text("ALTER TABLE txi DISABLE TRIGGER ALL;"))
ctx.execute(text("ALTER TABLE txo DISABLE TRIGGER ALL;"))
ctx.execute(text("ALTER TABLE tx DISABLE TRIGGER ALL;"))
ctx.execute(text("ALTER TABLE claim DISABLE TRIGGER ALL;"))
ctx.execute(text("ALTER TABLE support DISABLE TRIGGER ALL;"))
ctx.execute(text("ALTER TABLE block DISABLE TRIGGER ALL;"))
disable_indexes_and_integrity_enforcement()
def disable_indexes_and_integrity_enforcement():
with context('disable indexes and integrity enforcement (triggers, primary keys, etc)') as ctx:
for table in metadata.sorted_tables:
ctx.execute(text(f"ALTER TABLE {table.name} DISABLE TRIGGER ALL;"))
if table.name == 'tag':
continue
ctx.execute(text(f"ALTER TABLE {table.name} DROP CONSTRAINT {table.name}_pkey CASCADE;"))
def insert_block(block):
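
The new disable_indexes_and_integrity_enforcement() helper above replaces the hard-coded per-table trigger statements: it walks metadata.sorted_tables, disables all triggers, and drops every primary key constraint except tag's. The commit re-adds the txi and txo primary keys during the input step, but re-enabling triggers is not shown here; on PostgreSQL the counterpart is the mirror-image ENABLE TRIGGER statement. A hypothetical sketch, not part of this commit:

    from sqlalchemy import text

    def enable_triggers():  # hypothetical counterpart, named for illustration only
        with context('enable triggers') as ctx:
            for table in metadata.sorted_tables:
                ctx.execute(text(f"ALTER TABLE {table.name} ENABLE TRIGGER ALL;"))
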
@@ -313,8 +317,8 @@ def select_txos(
(TXI.c.address.notin_(my_addresses))
)
joins = TXO.join(TX)
if constraints.get('is_spent', None) is False:
s = s.where((TXO.c.is_spent == False) & (TXO.c.is_reserved == False))
#if constraints.get('is_spent', None) is False:
# s = s.where((TXO.c.is_spent == False) & (TXO.c.is_reserved == False))
if include_is_my_input:
joins = joins.join(TXI, (TXI.c.position == 0) & (TXI.c.tx_hash == TXO.c.tx_hash), isouter=True)
if claim_id_not_in_claim_table:

View file

@@ -1,14 +1,20 @@
import os
import time
from io import BytesIO
import multiprocessing as mp
from enum import Enum
from decimal import Decimal
from typing import Dict, List, Optional, Tuple
from dataclasses import dataclass
from dataclasses import dataclass, field
from contextvars import ContextVar
from sqlalchemy import create_engine, inspect, bindparam
from sqlalchemy.engine import Engine, Connection
from sqlalchemy.sql import Insert
try:
from pgcopy import CopyManager
except ImportError:
CopyManager = None
from lbry.event import EventQueuePublisher
from lbry.blockchain.ledger import Ledger
@@ -47,6 +53,8 @@ class QueryContext:
current_timer_time: float = 0
current_progress: Optional['ProgressContext'] = None
copy_managers: Dict[str, CopyManager] = field(default_factory=dict)
@property
def is_postgres(self):
return self.connection.dialect.name == 'postgresql'
@@ -64,6 +72,16 @@ class QueryContext:
def get_search_censor(self) -> Censor:
return Censor(self.filtered_streams, self.filtered_channels)
def pg_copy(self, table, rows):
connection = self.connection.connection
copy_manager = self.copy_managers.get(table.name)
if copy_manager is None:
self.copy_managers[table.name] = copy_manager = CopyManager(
self.connection.connection, table.name, rows[0].keys()
)
copy_manager.copy(map(dict.values, rows), BytesIO)
connection.commit()
def execute(self, sql, *args):
return self.connection.execute(sql, *args)
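
pg_copy() above is the heart of the commit: instead of emitting INSERT statements it hands rows to pgcopy's CopyManager, which streams them to PostgreSQL over the binary COPY protocol, passing BytesIO as the temporary-file factory so the encoded data stays in memory. A standalone sketch of the same library call, assuming a psycopg2 connection and an illustrative "items" table with matching columns:

    import psycopg2
    from pgcopy import CopyManager

    # Illustrative connection and table; adjust to an existing database.
    conn = psycopg2.connect("dbname=example user=postgres password=postgres host=localhost")
    rows = [(1, "one"), (2, "two"), (3, "three")]

    mgr = CopyManager(conn, "items", ("id", "name"))
    mgr.copy(rows)   # COPY items (id, name) FROM STDIN WITH BINARY
    conn.commit()    # pgcopy does not commit; the caller does, as pg_copy() does above
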
@@ -562,25 +580,33 @@ class BulkLoader:
execute = self.ctx.connection.execute
for sql, rows in queries:
for chunk_rows in chunk(rows, batch_size):
try:
execute(sql, chunk_rows)
except Exception:
for row in chunk_rows:
try:
execute(sql, [row])
except Exception:
p.ctx.message_queue.put_nowait(
(Event.COMPLETE.value, os.getpid(), 1, 1)
)
with open('badrow', 'a') as badrow:
badrow.write(repr(sql))
badrow.write('\n')
badrow.write(repr(row))
badrow.write('\n')
print(sql)
print(row)
raise
if not rows:
continue
if self.ctx.is_postgres and isinstance(sql, Insert):
self.ctx.pg_copy(sql.table, rows)
if p:
done += int(len(chunk_rows)/row_scale)
done += int(len(rows) / row_scale)
p.step(done)
else:
for chunk_rows in chunk(rows, batch_size):
try:
execute(sql, chunk_rows)
except Exception:
for row in chunk_rows:
try:
execute(sql, [row])
except Exception:
p.ctx.message_queue.put_nowait(
(Event.COMPLETE.value, os.getpid(), 1, 1)
)
with open('badrow', 'a') as badrow:
badrow.write(repr(sql))
badrow.write('\n')
badrow.write(repr(row))
badrow.write('\n')
print(sql)
print(row)
raise
if p:
done += int(len(chunk_rows)/row_scale)
p.step(done)
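
In the reworked flush() above, PostgreSQL inserts go through pg_copy() in one shot, while every other case keeps the chunked execute() with a per-row retry that records the offending row in a badrow file before re-raising. The chunk() helper itself is not part of this diff; a plausible sketch, assuming it simply yields fixed-size slices:

    def chunk(rows, size):
        # Hypothetical helper: yield consecutive slices of at most `size` rows.
        for start in range(0, len(rows), size):
            yield rows[start:start + size]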

View file

@@ -137,7 +137,7 @@ Claim = Table(
Column('staked_support_amount', BigInteger, server_default='0'),
# streams
Column('stream_type', Text, nullable=True),
Column('stream_type', SmallInteger, nullable=True),
Column('media_type', Text, nullable=True),
Column('fee_amount', BigInteger, server_default='0'),
Column('fee_currency', Text, nullable=True),

View file

@@ -42,6 +42,7 @@ class BasicBlockchainTestCase(AsyncioTestCase):
if db_driver == 'sqlite':
db = Database.temp_sqlite_regtest(chain.ledger.conf.lbrycrd_dir)
elif db_driver.startswith('postgres') or db_driver.startswith('psycopg'):
db_driver = 'postgresql'
db_name = f'lbry_test_chain'
db_connection = 'postgres:postgres@localhost:5432'
meta_db = Database.from_url(f'postgresql://{db_connection}/postgres')
@@ -53,6 +54,7 @@ class BasicBlockchainTestCase(AsyncioTestCase):
self.addCleanup(remove_tree, db.ledger.conf.data_dir)
await db.open()
self.addCleanup(db.close)
self.db_driver = db_driver
return db
@staticmethod
@@ -454,9 +456,6 @@ class TestMultiBlockFileSyncing(BasicBlockchainTestCase):
self.assertEqual(2, await db.get_support_metadata_count(0, 500))
self.assertEqual(0, await db.get_support_metadata_count(500, 1000))
def foo(c):
return
# get_support_metadata
self.assertEqual(
[{'name': b'two', 'activation_height': 359, 'expiration_height': 852},
@@ -536,6 +535,12 @@ class TestMultiBlockFileSyncing(BasicBlockchainTestCase):
# 3 - db.sync.input
self.assertEventsAlmostEqual(
self.extract_events('db.sync.input', events), [
[0, 4],
[1, 4],
[2, 4],
[3, 4],
[4, 4],
] if self.db_driver == 'postgresql' else [
[0, 2],
[1, 2],
[2, 2],