From bad4320ddfd5cae7323ebeeb0f290e4dd957a5cb Mon Sep 17 00:00:00 2001
From: Lex Berezhny
Date: Fri, 26 Jul 2019 00:44:27 -0400
Subject: [PATCH] initial import of fast sync

---
 lbry/scripts/fast_sync.py | 273 ++++++++++++++++++++++++++++++++++++++
 1 file changed, 273 insertions(+)
 create mode 100644 lbry/scripts/fast_sync.py

diff --git a/lbry/scripts/fast_sync.py b/lbry/scripts/fast_sync.py
new file mode 100644
index 000000000..b6e7c2153
--- /dev/null
+++ b/lbry/scripts/fast_sync.py
@@ -0,0 +1,273 @@
+import os
+import time
+from glob import glob
+import sqlite3
+import asyncio
+import struct
+from contextvars import ContextVar
+from concurrent.futures import ProcessPoolExecutor
+from torba.client.bcd_data_stream import BCDataStream
+from torba.client.hash import double_sha256
+from lbry.wallet.transaction import Transaction
+from binascii import hexlify
+
+
+# each process (the parent and every pool worker) stores its own connection here
+db = ContextVar('db')
+
+
+def initializer(_path):
+    _db = sqlite3.connect(_path, isolation_level=None, uri=True, timeout=60.0*5)
+    _db.row_factory = sqlite3.Row
+    _db.executescript("""
+    pragma journal_mode=wal;
+    """)
+    db.set(_db)
+
+
+def read_block(block: bytes):
+    reader = BCDataStream()
+    reader.data = block
+
+
+ZERO_BLOCK = bytes((0,)*32)
+
+
+def parse_header(header):
+    version, = struct.unpack('<I', header[:4])

[… the remainder of parse_header(), the block/transaction parsing code, process_file(), create_db(), and the start of clean_chain() are missing from this excerpt, down to the middle of a query that selects fork block hashes, iterated below as forks …]

+            … > 1
+        )
+        SELECT block_hash FROM blocks;
+    """)
+    for i, fork in enumerate(forks):
+        # drop the forked block and every row that came from it
+        sql.execute('DELETE FROM block WHERE block_hash = ?', (fork[0],))
+        print(f' - block (fork:{i+1}): {hexlify(fork[0][::-1]).decode()}')
+        deleted_stats = {}
+        for table in ('tx', 'txi', 'claim_history', 'support'):
+            deleted = sql.execute(f"DELETE FROM {table} WHERE block_hash = ?", (fork[0],)).rowcount
+            if deleted > 0:
+                deleted_stats[table] = deleted
+        print(f' - {deleted_stats}')
+
+    sql.execute('commit;')
+    sql.execute('begin;')
+
+    print(' + adding unique tx (tx_hash, block_hash) index')
+    sql.execute("create unique index tx_hash_idx on tx (tx_hash, block_hash)")
+    print(' + adding unique txi (txo_hash) index')
+    sql.execute("create unique index txi_txo_hash_idx on txi (txo_hash)")
+
+    print('processing claim history & populating claim table')
+
+    print(' * setting claim_history.height and claim_history.is_spent')
+    sql.execute("""
+        UPDATE claim_history SET
+            height = (SELECT height FROM tx JOIN block USING (block_hash) WHERE tx.tx_hash=claim_history.tx_hash),
+            is_spent = COALESCE((SELECT 1 FROM txi WHERE txo_hash=claim_history.txo_hash), 0)
+    """)
+
+    print(' + adding claim_history (claim_hash) index')
+    sql.execute("create index claim_history_hash_idx on claim_history (claim_hash)")
+
+    print(' * populating claim table')
+    sql.execute("""
+        INSERT INTO claim
+        SELECT txo_hash, claim_history.claim_hash, claim_name, amount, height FROM (
+            SELECT claim_hash, is_spent FROM claim_history
+            GROUP BY claim_hash HAVING MAX(height) AND MAX(tx_position)
+        ) AS latest_claim_state JOIN claim_history USING (claim_hash)
+        WHERE latest_claim_state.is_spent=0;
+    """)
+
+    sql.execute('commit;')
+
+
+async def main():
+    db_file = '/tmp/fast_sync.db'
+    if os.path.exists(db_file):
+        os.remove(db_file)
+    initializer(db_file)
+    create_db()
+    # four worker processes parse block files in parallel, writing into the same db
+    executor = ProcessPoolExecutor(
+        4, initializer=initializer, initargs=(db_file,)
+    )
+    file_paths = glob(os.path.join(os.path.expanduser('~/.lbrycrd/blocks/'), 'blk*.dat'))
+    file_paths.sort()
+    total_blocks, total_txs = 0, 0
+    start = time.perf_counter()
+    for file_path, (blocks, txs) in zip(file_paths, executor.map(process_file, file_paths)):
+        print(f"{file_path} {blocks}")
+        total_blocks += blocks
+        total_txs += txs
+    print(f'blocks: {total_blocks} (txs: {total_txs}) in {time.perf_counter()-start}s')
+    print('cleaning chain: set block heights and delete forks')
+    clean_chain()
+    print(f'done in {time.perf_counter()-start}s')
+    executor.shutdown(True)
+
+if __name__ == '__main__':
+    asyncio.run(main())
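
Note on the worker-setup pattern above (not part of the patch): initializer() runs once per process, both in the parent and in each ProcessPoolExecutor worker, so every process gets a private SQLite connection stored in the db ContextVar, while WAL journal mode and the five-minute busy timeout let concurrent writers queue against the shared file instead of failing immediately. What follows is a minimal, self-contained sketch of that pattern; the database path and the demo table are invented for illustration and nothing in it comes from the lbry codebase.

    import sqlite3
    from contextvars import ContextVar
    from concurrent.futures import ProcessPoolExecutor

    db = ContextVar('db')

    def initializer(path):
        # runs once in each worker process, before it accepts any tasks
        conn = sqlite3.connect(path, isolation_level=None, timeout=60.0 * 5)
        conn.execute('pragma journal_mode=wal;')  # readers no longer block the writer
        db.set(conn)

    def count_rows(table):
        # tasks retrieve the connection belonging to their own process
        return db.get().execute(f'SELECT COUNT(*) FROM {table}').fetchone()[0]

    if __name__ == '__main__':
        initializer('/tmp/demo.db')  # hypothetical path; the parent needs a connection too
        db.get().execute('CREATE TABLE IF NOT EXISTS demo (x INTEGER)')
        with ProcessPoolExecutor(2, initializer=initializer, initargs=('/tmp/demo.db',)) as executor:
            print(list(executor.map(count_rows, ['demo', 'demo'])))  # -> [0, 0]

The same shape carries over to the script's executor.map(process_file, file_paths): map() yields results in input order, which is why the per-file (blocks, txs) counts can be zipped against the sorted file list even though the files are parsed in parallel.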