diff --git a/lbry/scripts/fast_sync.py b/lbry/scripts/fast_sync.py
deleted file mode 100644
index 19f97b843..000000000
--- a/lbry/scripts/fast_sync.py
+++ /dev/null
@@ -1,276 +0,0 @@
-import os
-import time
-from glob import glob
-import sqlite3
-import asyncio
-import struct
-from contextvars import ContextVar
-from concurrent.futures import ProcessPoolExecutor
-from lbry.wallet.bcd_data_stream import BCDataStream
-from lbry.crypto.hash import double_sha256
-from lbry.wallet.transaction import Transaction
-from binascii import hexlify
-
-
-db = ContextVar('db')
-
-
-def initializer(_path):
-    _db = sqlite3.connect(_path, isolation_level=None, uri=True, timeout=60.0*5)
-    _db.row_factory = sqlite3.Row
-    _db.executescript("""
-    pragma journal_mode=wal;
-    """)
-    db.set(_db)
-
-
-def read_block(block: bytes):
-    reader = BCDataStream()
-    reader.data = block
-
-
-ZERO_BLOCK = bytes((0,)*32)
-
-
-def parse_header(header):
-    version, = struct.unpack(' 1
-    )
-    SELECT block_hash FROM blocks;
-    """)
-    for i, fork in enumerate(forks):
-        sql.execute('DELETE FROM block WHERE block_hash = ?', (fork[0],))
-        print(f' - block (fork:{i+1}): {hexlify(fork[0][::-1]).decode()}')
-        deleted_stats = {}
-        for table in ('tx', 'txi', 'claim_history', 'support'):
-            deleted = sql.execute(f"DELETE FROM {table} WHERE block_hash = ?", (fork[0],)).rowcount
-            if deleted > 0:
-                deleted_stats[table] = deleted
-        print(f' - {deleted_stats}')
-
-    sql.execute('commit;')
-    sql.execute('begin;')
-
-    print(' + adding unique tx (tx_hash, block_hash) index')
-    sql.execute("create unique index tx_hash_idx on tx (tx_hash, block_hash)")
-    print(' + adding unique txi (txo_hash) index')
-    sql.execute("create unique index txi_txo_hash_idx on txi (txo_hash)")
-
-    print('processing claim history & populating claim table')
-
-    print(' * setting claim_history.height and claim_history.is_spent')
-    sql.execute("""
-    UPDATE claim_history SET
-        height = (SELECT height FROM tx JOIN block USING (block_hash) WHERE tx.tx_hash=claim_history.tx_hash),
-        is_spent = COALESCE((SELECT 1 FROM txi WHERE txo_hash=claim_history.txo_hash), 0)
-    """)
-
-    print(' + adding claim_history (claim_hash) index')
-    sql.execute("create index claim_history_hash_idx on claim_history (claim_hash)")
-
-    print(' * populating claim table')
-    sql.execute("""
-    INSERT INTO claim
-    SELECT txo_hash, claim_history.claim_hash, claim_name, amount, height FROM (
-        SELECT claim_hash, is_spent FROM claim_history
-        GROUP BY claim_hash HAVING MAX(height) AND MAX(tx_position)
-    ) AS latest_claim_state JOIN claim_history USING (claim_hash)
-    WHERE latest_claim_state.is_spent=0;
-    """)
-
-    sql.execute('commit;')
-
-
-async def main():
-    db_file = '/tmp/fast_sync.db'
-    if os.path.exists(db_file):
-        os.remove(db_file)
-    initializer(db_file)
-    create_db()
-    executor = ProcessPoolExecutor(
-        4, initializer=initializer, initargs=(db_file,)
-    )
-    file_paths = glob(os.path.join(os.path.expanduser('~/.lbrycrd/blocks/'), 'blk*.dat'))
-    file_paths.sort()
-    total_blocks, total_txs = 0, 0
-    start = time.perf_counter()
-    for file_path, (blocks, txs) in zip(file_paths, executor.map(process_file, file_paths)):
-        print(f"{file_path} {blocks}")
-        total_blocks += blocks
-        total_txs += txs
-    print(f'blocks: {total_blocks} (txs: {total_txs}) in {time.perf_counter()-start}s')
-    print('cleaning chain: set block heights and delete forks')
-    #clean_chain()
-    print(f'done in {time.perf_counter()-start}s')
-    executor.shutdown(True)
-if __name__ == '__main__':
-    asyncio.run(main())
diff --git a/scripts/fast_sync.py b/scripts/fast_sync.py
index c98494824..19f97b843 100644
--- a/scripts/fast_sync.py
+++ b/scripts/fast_sync.py
@@ -1,19 +1,276 @@
-import os.path
+import os
+import time
+from glob import glob
+import sqlite3
 import asyncio
-from lbry.blockchain import Lbrycrd
-from lbry.blockchain.sync import BlockSync
+import struct
+from contextvars import ContextVar
+from concurrent.futures import ProcessPoolExecutor
+from lbry.wallet.bcd_data_stream import BCDataStream
+from lbry.crypto.hash import double_sha256
+from lbry.wallet.transaction import Transaction
+from binascii import hexlify
+
+
+db = ContextVar('db')
+
+
+def initializer(_path):
+    _db = sqlite3.connect(_path, isolation_level=None, uri=True, timeout=60.0*5)
+    _db.row_factory = sqlite3.Row
+    _db.executescript("""
+    pragma journal_mode=wal;
+    """)
+    db.set(_db)
+
+
+def read_block(block: bytes):
+    reader = BCDataStream()
+    reader.data = block
+
+
+ZERO_BLOCK = bytes((0,)*32)
+
+
+def parse_header(header):
+    version, = struct.unpack(' 1
+    )
+    SELECT block_hash FROM blocks;
+    """)
+    for i, fork in enumerate(forks):
+        sql.execute('DELETE FROM block WHERE block_hash = ?', (fork[0],))
+        print(f' - block (fork:{i+1}): {hexlify(fork[0][::-1]).decode()}')
+        deleted_stats = {}
+        for table in ('tx', 'txi', 'claim_history', 'support'):
+            deleted = sql.execute(f"DELETE FROM {table} WHERE block_hash = ?", (fork[0],)).rowcount
+            if deleted > 0:
+                deleted_stats[table] = deleted
+        print(f' - {deleted_stats}')
+
+    sql.execute('commit;')
+    sql.execute('begin;')
+
+    print(' + adding unique tx (tx_hash, block_hash) index')
+    sql.execute("create unique index tx_hash_idx on tx (tx_hash, block_hash)")
+    print(' + adding unique txi (txo_hash) index')
+    sql.execute("create unique index txi_txo_hash_idx on txi (txo_hash)")
+
+    print('processing claim history & populating claim table')
+
+    print(' * setting claim_history.height and claim_history.is_spent')
+    sql.execute("""
+    UPDATE claim_history SET
+        height = (SELECT height FROM tx JOIN block USING (block_hash) WHERE tx.tx_hash=claim_history.tx_hash),
+        is_spent = COALESCE((SELECT 1 FROM txi WHERE txo_hash=claim_history.txo_hash), 0)
+    """)
+
+    print(' + adding claim_history (claim_hash) index')
+    sql.execute("create index claim_history_hash_idx on claim_history (claim_hash)")
+
+    print(' * populating claim table')
+    sql.execute("""
+    INSERT INTO claim
+    SELECT txo_hash, claim_history.claim_hash, claim_name, amount, height FROM (
+        SELECT claim_hash, is_spent FROM claim_history
+        GROUP BY claim_hash HAVING MAX(height) AND MAX(tx_position)
+    ) AS latest_claim_state JOIN claim_history USING (claim_hash)
+    WHERE latest_claim_state.is_spent=0;
+    """)
+
+    sql.execute('commit;')
 
 
 async def main():
-    chain = Lbrycrd(os.path.expanduser('~/.lbrycrd'), False)
-    sync = BlockSync(chain, use_process_pool=True)
-    if os.path.exists(sync.db.sync_db.db_file_path):
-        os.remove(sync.db.sync_db.db_file_path)
-    await sync.db.open()
-    await sync.load_blocks()
-    #await chain.stop(False)
+    db_file = '/tmp/fast_sync.db'
+    if os.path.exists(db_file):
+        os.remove(db_file)
+    initializer(db_file)
+    create_db()
+    executor = ProcessPoolExecutor(
+        4, initializer=initializer, initargs=(db_file,)
+    )
+    file_paths = glob(os.path.join(os.path.expanduser('~/.lbrycrd/blocks/'), 'blk*.dat'))
+    file_paths.sort()
+    total_blocks, total_txs = 0, 0
+    start = time.perf_counter()
+    for file_path, (blocks, txs) in zip(file_paths, executor.map(process_file, file_paths)):
+        print(f"{file_path} {blocks}")
+        total_blocks += blocks
+        total_txs += txs
+    print(f'blocks: {total_blocks} (txs: {total_txs}) in {time.perf_counter()-start}s')
+    print('cleaning chain: set block heights and delete forks')
+    #clean_chain()
+    print(f'done in {time.perf_counter()-start}s')
+    executor.shutdown(True)
-try:
+if __name__ == '__main__':
     asyncio.run(main())
-except KeyboardInterrupt:
-    print('exiting')
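
Note: this copy of the patch is truncated. In each hunk, everything between the opening quote of struct.unpack(' in parse_header() and the "> 1" at the end of the fork-detection query is missing, including the rest of parse_header(), create_db(), and process_file(). As orientation for what the lost parser most likely does, here is a minimal, self-contained sketch of parsing LBRY's 112-byte block header (Bitcoin's 80-byte layout plus a 32-byte claim trie root). The field offsets and the name parse_header_sketch are assumptions for illustration, not the patch's actual code.

import struct
from hashlib import sha256


def double_sha256(data: bytes) -> bytes:
    # Bitcoin-style hash: SHA-256 applied twice.
    return sha256(sha256(data).digest()).digest()


def parse_header_sketch(header: bytes) -> dict:
    # Assumed layout: version(4) | prev_hash(32) | merkle_root(32) |
    # claim_trie_root(32) | timestamp(4) | bits(4) | nonce(4) = 112 bytes.
    assert len(header) == 112, 'LBRY block headers are 112 bytes'
    version, = struct.unpack('<I', header[:4])
    timestamp, bits, nonce = struct.unpack('<III', header[100:112])
    return {
        'version': version,
        'block_hash': double_sha256(header),      # hash of the full header
        'prev_block_hash': header[4:36],
        'merkle_root': header[36:68],
        'claim_trie_root': header[68:100][::-1],  # byte order flipped, as in the hexlify(...[::-1]) prints above
        'timestamp': timestamp,
        'bits': bits,
        'nonce': nonce,
    }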