From fd2f9846e923b0b25acf818107c464493b2a1b39 Mon Sep 17 00:00:00 2001
From: Lex Berezhny
Date: Thu, 27 Feb 2020 23:52:18 -0500
Subject: [PATCH] parsing and inserting works

---
 lbry/blockchain/block.py   |  11 +-
 lbry/blockchain/db.py      | 122 ++++++++++++++-------
 lbry/blockchain/lbrycrd.py |  65 ++++++++---
 lbry/blockchain/sync.py    | 213 +++++++++++++++++++++++--------------
 lbry/blockchain/worker.py  | 101 ++++++++++++++++++
 lbry/scripts/fast_sync.py  |  11 +-
 scripts/fast_sync.py       |  19 ++++
 7 files changed, 393 insertions(+), 149 deletions(-)
 create mode 100644 lbry/blockchain/worker.py
 create mode 100644 scripts/fast_sync.py

diff --git a/lbry/blockchain/block.py b/lbry/blockchain/block.py
index 150cb6e14..c59755b32 100644
--- a/lbry/blockchain/block.py
+++ b/lbry/blockchain/block.py
@@ -15,8 +15,7 @@ class Block:
         'bits', 'nonce', 'txs'
     )
 
-    def __init__(self, stream):
-        stream.read_uint32()  # block size
+    def __init__(self, stream: BCDataStream):
         header = stream.data.read(112)
         version, = struct.unpack('<I', header[:4])
[...]
diff --git a/lbry/blockchain/db.py b/lbry/blockchain/db.py
--- a/lbry/blockchain/db.py
+++ b/lbry/blockchain/db.py
@@ ... @@
     def from_path(cls, path: str) -> 'AsyncBlockchainDB':
         return cls(BlockchainDB(path))
 
-    @staticmethod
-    async def run_in_executor(func, *args):
+    def get_block_file_path_from_number(self, block_file_number):
+        return self.sync_db.get_block_file_path_from_number(block_file_number)
+
+    async def run_in_executor(self, func, *args):
         return await asyncio.get_running_loop().run_in_executor(
-            None, func, *args
+            self.executor, func, *args
         )
 
     async def open(self):
-        return await self.run_in_executor(self.db.open)
+        return await self.run_in_executor(self.sync_db.open)
 
     async def close(self):
-        return await self.run_in_executor(self.db.close)
+        return await self.run_in_executor(self.sync_db.close)
 
-    async def get_blocks(self):
-        return await self.run_in_executor(self.db.get_blocks)
+    async def get_block_files_not_synced(self):
+        return await self.run_in_executor(self.sync_db.get_block_files_not_synced)
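The db.py changes above wrap the blocking `BlockchainDB` so every call is dispatched to the instance's executor instead of running on the event loop. A minimal sketch of that wrapping pattern, with a hypothetical `SlowDB` standing in for `BlockchainDB`:

```python
# Sync-to-async wrapping pattern: blocking methods on an inner object
# are dispatched to a dedicated executor so they never block the loop.
# SlowDB is a hypothetical stand-in for BlockchainDB.
import asyncio
import sqlite3
from concurrent.futures import ThreadPoolExecutor


class SlowDB:
    def open(self):
        self.conn = sqlite3.connect(':memory:')  # blocking call

    def close(self):
        self.conn.close()


class AsyncSlowDB:
    def __init__(self, db: SlowDB):
        self.db = db
        self.executor = ThreadPoolExecutor(max_workers=1)

    async def run_in_executor(self, func, *args):
        return await asyncio.get_running_loop().run_in_executor(
            self.executor, func, *args
        )

    async def open(self):
        return await self.run_in_executor(self.db.open)

    async def close(self):
        return await self.run_in_executor(self.db.close)


async def main():
    db = AsyncSlowDB(SlowDB())
    await db.open()
    await db.close()

asyncio.run(main())
```

Keeping `max_workers=1` for the thread pool also serializes access to the underlying connection, which matters for SQLite handles that must stay on one thread.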
diff --git a/lbry/blockchain/lbrycrd.py b/lbry/blockchain/lbrycrd.py
index 5f5d94428..24b5ecf2b 100644
--- a/lbry/blockchain/lbrycrd.py
+++ b/lbry/blockchain/lbrycrd.py
@@ -56,8 +56,12 @@ class Process(asyncio.SubprocessProtocol):
 
 class Lbrycrd:
 
-    def __init__(self, path=None):
-        self.data_path = path
+    def __init__(self, path, regtest=False):
+        self.data_dir = self.actual_data_dir = path
+        self.regtest = regtest
+        if regtest:
+            self.actual_data_dir = os.path.join(self.data_dir, 'regtest')
+        self.blocks_dir = os.path.join(self.actual_data_dir, 'blocks')
         self.bin_dir = os.path.join(os.path.dirname(__file__), 'bin')
         self.daemon_bin = os.path.join(self.bin_dir, 'lbrycrdd')
         self.cli_bin = os.path.join(self.bin_dir, 'lbrycrd-cli')
@@ -71,13 +75,15 @@ class Lbrycrd:
         self.session: Optional[aiohttp.ClientSession] = None
         self.subscribed = False
         self.subscription: Optional[asyncio.Task] = None
+        self.subscription_url = 'tcp://127.0.0.1:29000'
+        self.default_generate_address = None
         self._on_block_controller = StreamController()
         self.on_block = self._on_block_controller.stream
         self.on_block.listen(lambda e: log.info('%s %s', hexlify(e['hash']), e['msg']))
 
     @classmethod
-    def regtest(cls):
-        return cls(tempfile.mkdtemp())
+    def temp_regtest(cls):
+        return cls(tempfile.mkdtemp(), True)
 
     @property
     def rpc_url(self):
@@ -125,18 +131,26 @@ class Lbrycrd:
     async def ensure(self):
         return self.exists or await self.download()
 
+    def get_start_command(self, *args):
+        if self.regtest:
+            args += '-regtest',
+        return (
+            self.daemon_bin,
+            f'-datadir={self.data_dir}',
+            f'-port={self.peerport}',
+            f'-rpcport={self.rpcport}',
+            f'-rpcuser={self.rpcuser}',
+            f'-rpcpassword={self.rpcpassword}',
+            f'-zmqpubhashblock={self.subscription_url}',
+            '-server', '-printtoconsole',
+            *args
+        )
+
     async def start(self, *args):
         loop = asyncio.get_event_loop()
-        command = [
-            self.daemon_bin,
-            f'-datadir={self.data_path}', '-printtoconsole', '-regtest', '-server',
-            f'-rpcuser={self.rpcuser}', f'-rpcpassword={self.rpcpassword}', f'-rpcport={self.rpcport}',
-            f'-port={self.peerport}', '-zmqpubhashblock=tcp://127.0.0.1:29000', *args
-        ]
+        command = self.get_start_command(*args)
         log.info(' '.join(command))
-        self.transport, self.protocol = await loop.subprocess_exec(
-            Process, *command
-        )
+        self.transport, self.protocol = await loop.subprocess_exec(Process, *command)
         await self.protocol.ready.wait()
         assert not self.protocol.stopped.is_set()
         self.session = aiohttp.ClientSession()
@@ -146,14 +160,14 @@
             await self.session.close()
             self.transport.terminate()
             await self.protocol.stopped.wait()
-            self.transport.close()
+            assert self.transport.get_returncode() == 0, "lbrycrd daemon exited with an error"
         finally:
             if cleanup:
                 await self.cleanup()
 
     async def cleanup(self):
         await asyncio.get_running_loop().run_in_executor(
-            None, shutil.rmtree, self.data_path, True
+            None, shutil.rmtree, self.data_dir, True
         )
 
     def subscribe(self):
@@ -161,7 +175,7 @@
             self.subscribed = True
             ctx = zmq.asyncio.Context.instance()
             sock = ctx.socket(zmq.SUB)
-            sock.connect("tcp://127.0.0.1:29000")
+            sock.connect(self.subscription_url)
             sock.subscribe("hashblock")
             self.subscription = asyncio.create_task(self.subscription_handler(sock))
@@ -202,7 +216,24 @@
             raise Exception(result['error'])
 
     async def generate(self, blocks):
-        return await self.rpc("generate", [blocks])
+        if self.default_generate_address is None:
+            self.default_generate_address = await self.get_new_address()
+        return await self.generate_to_address(blocks, self.default_generate_address)
+
+    async def get_new_address(self):
+        return await self.rpc("getnewaddress")
+
+    async def generate_to_address(self, blocks, address):
+        return await self.rpc("generatetoaddress", [blocks, address])
+
+    async def fund_raw_transaction(self, tx):
+        return await self.rpc("fundrawtransaction", [tx])
+
+    async def sign_raw_transaction_with_wallet(self, tx):
+        return await self.rpc("signrawtransactionwithwallet", [tx])
+
+    async def send_raw_transaction(self, tx):
+        return await self.rpc("sendrawtransaction", [tx])
 
     async def claim_name(self, name, data, amount):
         return await self.rpc("claimname", [name, data, amount])
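The `subscribe()` path above listens on the same `-zmqpubhashblock` endpoint that `get_start_command()` configures. A standalone sketch of that subscription, assuming a lbrycrdd/bitcoind-style node already publishing on `tcp://127.0.0.1:29000`; such daemons publish three frames per notification (topic, payload, sequence number):

```python
# Minimal ZMQ new-block listener; mirrors what subscribe() wires up.
import asyncio
from binascii import hexlify

import zmq
import zmq.asyncio


async def watch_blocks(url='tcp://127.0.0.1:29000'):
    ctx = zmq.asyncio.Context.instance()
    sock = ctx.socket(zmq.SUB)
    sock.connect(url)
    sock.subscribe('hashblock')  # filter to new-block notifications
    try:
        while True:
            # Each message arrives as [topic, block hash, sequence].
            topic, block_hash, seq = await sock.recv_multipart()
            print(topic.decode(), hexlify(block_hash).decode())
    finally:
        sock.close()

asyncio.run(watch_blocks())
```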
diff --git a/lbry/blockchain/sync.py b/lbry/blockchain/sync.py
index f1004b546..a6374c580 100644
--- a/lbry/blockchain/sync.py
+++ b/lbry/blockchain/sync.py
@@ -1,22 +1,100 @@
 import os
-import time
+import asyncio
 import logging
-from glob import glob
-from concurrent.futures import ProcessPoolExecutor
+import tqdm
+from threading import Thread
+from multiprocessing import Queue, Event
+from concurrent import futures
+from typing import Dict, Tuple
+
+from lbry.wallet.stream import StreamController
 
 from .lbrycrd import Lbrycrd
-from .block import read_blocks
 from .db import AsyncBlockchainDB
+from . import worker
 
 log = logging.getLogger(__name__)
 
 
+class ProgressMonitorThread(Thread):
+
+    STOP = 'stop'
+    FORMAT = '{l_bar}{bar}| {n_fmt:>6}/{total_fmt:>7} [{elapsed}<{remaining:>5}, {rate_fmt:>15}]'
+
+    def __init__(self, state: dict, queue: Queue, stream_controller: StreamController):
+        super().__init__()
+        self.state = state
+        self.queue = queue
+        self.stream_controller = stream_controller
+        self.loop = asyncio.get_event_loop()
+
+    def run(self):
+        asyncio.set_event_loop(self.loop)
+        block_bar = tqdm.tqdm(
+            desc='total parsing', total=sum(s['total_blocks'] for s in self.state.values()),
+            unit='blocks', bar_format=self.FORMAT
+        )
+        tx_bar = tqdm.tqdm(
+            desc='total loading', total=sum(s['total_txs'] for s in self.state.values()),
+            unit='txs', bar_format=self.FORMAT
+        )
+        bars: Dict[int, tqdm.tqdm] = {}
+        while True:
+            msg = self.queue.get()
+            if msg == self.STOP:
+                return
+            file_num, msg_type, done = msg
+            bar, state = bars.get(file_num, None), self.state[file_num]
+            if msg_type == 1:
+                if bar is None:
+                    bar = bars[file_num] = tqdm.tqdm(
+                        desc=f'├─ blk{file_num:05}.dat parsing', total=state['total_blocks'],
+                        unit='blocks', bar_format=self.FORMAT
+                    )
+                change = done - state['done_blocks']
+                state['done_blocks'] = done
+                bar.update(change)
+                block_bar.update(change)
+                if state['total_blocks'] == done:
+                    bar.set_description('✔ '+bar.desc[3:])
+                    bar.close()
+                    bars.pop(file_num)
+            elif msg_type == 2:
+                if bar is None:
+                    bar = bars[file_num] = tqdm.tqdm(
+                        desc=f'├─ blk{file_num:05}.dat loading', total=state['total_txs'],
+                        unit='txs', bar_format=self.FORMAT
+                    )
+                change = done - state['done_txs']
+                state['done_txs'] = done
+                bar.update(change)
+                tx_bar.update(change)
+                if state['total_txs'] == done:
+                    bar.set_description('✔ '+bar.desc[3:])
+                    bar.close()
+                    bars.pop(file_num)
+            self.stream_controller.add(msg)
+
+    def shutdown(self):
+        self.queue.put(self.STOP)
+        self.join()
+
+    def __enter__(self):
+        self.start()
+
+    def __exit__(self, exc_type, exc_val, exc_tb):
+        self.shutdown()
+
+
 class BlockSync:
 
-    def __init__(self, chain: Lbrycrd):
+    def __init__(self, chain: Lbrycrd, use_process_pool=False):
         self.chain = chain
-        self.db = AsyncBlockchainDB.from_path(os.path.join(self.chain.data_path, 'regtest'))
+        self.use_process_pool = use_process_pool
+        self.db = AsyncBlockchainDB.from_path(self.chain.actual_data_dir)
+        self._on_progress_controller = StreamController()
+        self.on_progress = self._on_progress_controller.stream
 
     async def start(self):
         await self.db.open()
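The `ProgressMonitorThread` introduced in this hunk keeps all tqdm state in a single thread: workers only enqueue `(file_number, msg_type, done)` tuples, and the monitor converts absolute counters into the increments tqdm expects. A stripped-down sketch of the same queue-driven pattern, with illustrative task names:

```python
# One thread owns the tqdm bars; workers only put tuples on a queue,
# so bars are never touched from multiple processes at once.
from multiprocessing import Queue
from threading import Thread

import tqdm

STOP = 'stop'


def monitor(queue: Queue, totals: dict):
    bars = {task: tqdm.tqdm(desc=f'task {task}', total=total)
            for task, total in totals.items()}
    done = {task: 0 for task in totals}
    while True:
        msg = queue.get()
        if msg == STOP:
            break
        task, current = msg
        bars[task].update(current - done[task])  # tqdm takes deltas
        done[task] = current
    for bar in bars.values():
        bar.close()


queue = Queue()
thread = Thread(target=monitor, args=(queue, {1: 10}))
thread.start()
for i in range(1, 11):
    queue.put((1, i))  # workers report absolute progress
queue.put(STOP)
thread.join()
```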
@@ -24,86 +102,59 @@ class BlockSync:
     async def stop(self):
         await self.db.close()
 
-    async def cleanup(self):
-        pass
+    def get_worker_pool(self, queue, full_stop) -> futures.Executor:
+        args = dict(
+            initializer=worker.initializer,
+            initargs=(self.chain.actual_data_dir, queue, full_stop)
+        )
+        if not self.use_process_pool:
+            return futures.ThreadPoolExecutor(max_workers=1, **args)
+        return futures.ProcessPoolExecutor(max_workers=max(os.cpu_count()-1, 4), **args)
 
+    def get_progress_monitor(self, state, queue) -> ProgressMonitorThread:
+        return ProgressMonitorThread(state, queue, self._on_progress_controller)
 
-def process_file(block_file):
-    blocks, txs, claims, supports, spends = [], [], [], [], []
-    for block in read_blocks(block_file):
-        for tx in block.txs:
-            txs.append((block.block_hash, tx.position, tx.hash))
-            for txi in tx.inputs:
-                if not txi.is_coinbase:
-                    spends.append((block.block_hash, tx.hash, txi.txo_ref.hash))
-            for output in tx.outputs:
-                try:
-                    if output.is_support:
-                        supports.append((
-                            block.block_hash, tx.hash, output.ref.hash, output.claim_hash, output.amount
-                        ))
-                    elif output.script.is_claim_name:
-                        claims.append((
-                            block.block_hash, tx.hash, tx.position, output.ref.hash, output.claim_hash,
-                            output.claim_name, 1, output.amount, None, None
-                        ))
-                    elif output.script.is_update_claim:
-                        claims.append((
-                            block.block_hash, tx.hash, tx.position, output.ref.hash, output.claim_hash,
-                            output.claim_name, 2, output.amount, None, None
-                        ))
-                except:
-                    pass
-        blocks.append((block.block_hash, block.prev_block_hash, 0 if block.is_first_block else None))
+    async def load_blocks(self):
+        jobs = []
+        queue, full_stop = Queue(), Event()
+        executor = self.get_worker_pool(queue, full_stop)
+        files = list(await self.db.get_block_files_not_synced())
+        state = {
+            file.file_number: {
+                'status': worker.PENDING,
+                'done_txs': 0,
+                'total_txs': file.txs,
+                'done_blocks': 0,
+                'total_blocks': file.blocks,
+            } for file in files
+        }
+        progress = self.get_progress_monitor(state, queue)
+        progress.start()
 
-    sql = db.get()
+        def cancel_all_the_things():
+            for job in jobs:
+                job.cancel()
+            full_stop.set()
+            for job in jobs:
+                exception = job.exception()
+                if exception is not None:
+                    raise exception
 
-    sql.execute('begin;')
-    sql.executemany("insert into block values (?, ?, ?)", blocks)
-    sql.execute('commit;')
+        try:
 
-    sql.execute('begin;')
-    sql.executemany("insert into tx values (?, ?, ?)", txs)
-    sql.execute('commit;')
+            for file in files:
+                jobs.append(executor.submit(worker.process_block_file, file.file_number))
 
-    sql.execute('begin;')
-    sql.executemany("insert into txi values (?, ?, ?)", spends)
-    sql.execute('commit;')
+            done, not_done = await asyncio.get_event_loop().run_in_executor(
+                None, futures.wait, jobs, None, futures.FIRST_EXCEPTION
+            )
+            if not_done:
+                cancel_all_the_things()
 
-    sql.execute('begin;')
-    sql.executemany("insert into support values (?, ?, ?, ?, ?)", supports)
-    sql.execute('commit;')
+        except asyncio.CancelledError:
+            cancel_all_the_things()
+            raise
 
-    sql.execute('begin;')
-    sql.executemany("insert into claim_history values (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)", claims)
-    sql.execute('commit;')
-
-    return len(blocks), len(txs)
-
-
-async def main():
-
-    #lbrycrd = os.path.expanduser('~/.lbrycrd/')
-
-    output_db = '/tmp/fast_sync.db'
-    if os.path.exists(output_db):
-        os.remove(output_db)
-    initializer(output_db)
-    create_db()
-    executor = ProcessPoolExecutor(
-        6, initializer=initializer, initargs=(output_db,)
-    )
-    file_paths = glob(os.path.join(lbrycrd, 'regtest', 'blocks', 'blk*.dat'))
-    file_paths.sort()
-    total_blocks, total_txs = 0, 0
-    start = time.perf_counter()
-    for file_path, (blocks, txs) in zip(file_paths, executor.map(process_file, file_paths)):
-        print(f"{file_path} {blocks}")
-        total_blocks += blocks
-        total_txs += txs
-    print(f'blocks: {total_blocks} (txs: {total_txs}) in {time.perf_counter()-start}s')
-    print('cleaning chain: set block heights and delete forks')
-    #clean_chain()
-    print(f'done in {time.perf_counter()-start}s')
-
-    await blockchain.stop()
+        finally:
+            progress.shutdown()
+            executor.shutdown()
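`load_blocks()` above bridges two worlds: jobs run in a `concurrent.futures` pool, and the coroutine awaits the blocking `futures.wait(...)` by pushing it into the default executor. A minimal sketch of that bridge, with a dummy `job` standing in for `worker.process_block_file`:

```python
import asyncio
import time
from concurrent import futures


def job(n):
    time.sleep(0.1)  # stand-in for blocking parse/insert work
    return n * n


async def main():
    executor = futures.ThreadPoolExecutor(max_workers=4)
    jobs = [executor.submit(job, n) for n in range(8)]
    # futures.wait() blocks, so it runs in the default executor;
    # FIRST_EXCEPTION returns early if any job raises.
    done, not_done = await asyncio.get_event_loop().run_in_executor(
        None, futures.wait, jobs, None, futures.FIRST_EXCEPTION
    )
    print(sorted(f.result() for f in done))
    executor.shutdown()

asyncio.run(main())
```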
diff --git a/lbry/blockchain/worker.py b/lbry/blockchain/worker.py
new file mode 100644
index 000000000..a0cf48c41
--- /dev/null
+++ b/lbry/blockchain/worker.py
@@ -0,0 +1,101 @@
+from typing import Optional
+from contextvars import ContextVar
+from multiprocessing import Queue, Event
+from dataclasses import dataclass
+from itertools import islice
+
+from lbry.wallet.bcd_data_stream import BCDataStream
+from .db import BlockchainDB
+from .block import Block
+
+
+PENDING = 'pending'
+RUNNING = 'running'
+STOPPED = 'stopped'
+
+
+def chunk(rows, step):
+    it, total = iter(rows), len(rows)
+    for _ in range(0, total, step):
+        yield min(step, total), islice(it, step)
+        total -= step
+
+
+@dataclass
+class WorkerContext:
+    db: BlockchainDB
+    progress: Queue
+    stop: Event
+
+
+context: ContextVar[Optional[WorkerContext]] = ContextVar('context')
+
+
+def initializer(data_dir: str, progress: Queue, stop: Event):
+    context.set(WorkerContext(
+        db=BlockchainDB(data_dir).open(),
+        progress=progress,
+        stop=stop
+    ))
+
+
+def process_block_file(block_file_number):
+    ctx: WorkerContext = context.get()
+    db, progress, stop = ctx.db, ctx.progress, ctx.stop
+    block_file_path = db.get_block_file_path_from_number(block_file_number)
+    num = 0
+    progress.put_nowait((block_file_number, 1, num))
+    with open(block_file_path, 'rb') as fp:
+        stream = BCDataStream(fp=fp)
+        blocks, txs, claims, supports, spends = [], [], [], [], []
+        for num, block_info in enumerate(db.get_blocks_not_synced(block_file_number), start=1):
+            if ctx.stop.is_set():
+                return
+            if num % 100 == 0:
+                progress.put_nowait((block_file_number, 1, num))
+            fp.seek(block_info.data_offset)
+            block = Block(stream)
+            for tx in block.txs:
+                txs.append((block.block_hash, tx.position, tx.hash))
+                for txi in tx.inputs:
+                    if not txi.is_coinbase:
+                        spends.append((block.block_hash, tx.hash, txi.txo_ref.hash))
+                for output in tx.outputs:
+                    try:
+                        if output.is_support:
+                            supports.append((
+                                block.block_hash, tx.hash, output.ref.hash, output.claim_hash, output.amount
+                            ))
+                        elif output.script.is_claim_name:
+                            claims.append((
+                                block.block_hash, tx.hash, tx.position, output.ref.hash, output.claim_hash,
+                                output.claim_name, 1, output.amount, None, None
+                            ))
+                        elif output.script.is_update_claim:
+                            claims.append((
+                                block.block_hash, tx.hash, tx.position, output.ref.hash, output.claim_hash,
+                                output.claim_name, 2, output.amount, None, None
+                            ))
+                    except:
+                        pass
+            blocks.append((block.block_hash, block.prev_block_hash, block_file_number, 0 if block.is_first_block else None))
+
+    progress.put((block_file_number, 1, num))
+
+    queries = (
+        ("insert into block values (?, ?, ?, ?)", blocks),
+        ("insert into tx values (?, ?, ?)", txs),
+        ("insert into txi values (?, ?, ?)", spends),
+        ("insert into support values (?, ?, ?, ?, ?)", supports),
+        ("insert into claim_history values (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)", claims),
+    )
+    total_txs = len(txs)
+    done_txs = 0
+    step = int(sum(len(q[1]) for q in queries)/total_txs)
+    progress.put((block_file_number, 2, done_txs))
+    for sql, rows in queries:
+        for chunk_size, chunk_rows in chunk(rows, 10000):
+            db.execute_many_tx(sql, chunk_rows)
+            done_txs += int(chunk_size/step)
+            progress.put((block_file_number, 2, done_txs))
+    progress.put((block_file_number, 2, total_txs))
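The `chunk()` helper above yields `(batch_size, islice)` pairs so the caller can report progress per batch without materializing every slice; the final batch reports its true, smaller size. Its behavior on a small input:

```python
from itertools import islice


def chunk(rows, step):
    it, total = iter(rows), len(rows)
    for _ in range(0, total, step):
        yield min(step, total), islice(it, step)
        total -= step


for size, rows in chunk(list(range(7)), 3):
    print(size, list(rows))
# 3 [0, 1, 2]
# 3 [3, 4, 5]
# 1 [6]
```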
diff --git a/lbry/scripts/fast_sync.py b/lbry/scripts/fast_sync.py
index b6e7c2153..19f97b843 100644
--- a/lbry/scripts/fast_sync.py
+++ b/lbry/scripts/fast_sync.py
@@ -6,8 +6,8 @@ import asyncio
 import struct
 from contextvars import ContextVar
 from concurrent.futures import ProcessPoolExecutor
-from torba.client.bcd_data_stream import BCDataStream
-from torba.client.hash import double_sha256
+from lbry.wallet.bcd_data_stream import BCDataStream
+from lbry.crypto.hash import double_sha256
 from lbry.wallet.transaction import Transaction
 from binascii import hexlify
 
@@ -49,10 +49,11 @@ def parse_header(header)
 
 def parse_txs(stream):
     tx_count = stream.read_compact_size()
-    return [Transaction.from_stream(i, stream) for i in range(tx_count)]
+    return [Transaction(position=i)._deserialize(stream) for i in range(tx_count)]
 
 
 def process_file(file_path):
+    print(f'started: {file_path}')
     sql = db.get()
     stream = BCDataStream()
     stream.data = open(file_path, 'rb')
@@ -86,6 +87,8 @@ def process_file(file_path):
                     pass
         blocks.append((header['block_hash'], header['prev_block_hash'], 0 if is_first_block else None))
 
+    print(f'inserting sql in {file_path}')
+
     sql.execute('begin;')
     sql.executemany("insert into block values (?, ?, ?)", blocks)
     sql.execute('commit;')
@@ -265,7 +268,7 @@ async def main():
         total_txs += txs
     print(f'blocks: {total_blocks} (txs: {total_txs}) in {time.perf_counter()-start}s')
     print('cleaning chain: set block heights and delete forks')
-    clean_chain()
+    #clean_chain()
     print(f'done in {time.perf_counter()-start}s')
     executor.shutdown(True)
diff --git a/scripts/fast_sync.py b/scripts/fast_sync.py
new file mode 100644
index 000000000..c98494824
--- /dev/null
+++ b/scripts/fast_sync.py
@@ -0,0 +1,19 @@
+import os.path
+import asyncio
+from lbry.blockchain import Lbrycrd
+from lbry.blockchain.sync import BlockSync
+
+
+async def main():
+    chain = Lbrycrd(os.path.expanduser('~/.lbrycrd'), False)
+    sync = BlockSync(chain, use_process_pool=True)
+    if os.path.exists(sync.db.sync_db.db_file_path):
+        os.remove(sync.db.sync_db.db_file_path)
+    await sync.db.open()
+    await sync.load_blocks()
+    #await chain.stop(False)
+
+try:
+    asyncio.run(main())
+except KeyboardInterrupt:
+    print('exiting')
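A variant of the driver script above can exercise the whole parse-and-insert path against a disposable regtest node instead of `~/.lbrycrd`. A sketch assuming only the API added in this patch (`temp_regtest()`, `ensure()`, `generate()`, `stop(cleanup)` and `cleanup()` from the lbrycrd.py changes; `BlockSync` defaults to the single-worker thread pool):

```python
import asyncio

from lbry.blockchain import Lbrycrd
from lbry.blockchain.sync import BlockSync


async def main():
    chain = Lbrycrd.temp_regtest()
    await chain.ensure()  # download lbrycrdd if it is not bundled
    await chain.start()
    try:
        await chain.generate(100)  # mine regtest blocks so there is data
    finally:
        await chain.stop(False)    # stop the daemon, keep the data dir
    sync = BlockSync(chain)
    await sync.db.open()
    await sync.load_blocks()
    await chain.cleanup()          # remove the temporary directory

asyncio.run(main())
```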