From 06d93e667abdf893c30f222dfe0f74d0fc0a9553 Mon Sep 17 00:00:00 2001 From: Lex Berezhny Date: Wed, 20 May 2020 17:54:38 -0400 Subject: [PATCH] event reporting from blockchain.sync --- lbry/blockchain/database.py | 26 +++--- lbry/blockchain/sync.py | 154 +++++++++++++++++++++++++++--------- 2 files changed, 131 insertions(+), 49 deletions(-) diff --git a/lbry/blockchain/database.py b/lbry/blockchain/database.py index 490b2b544..46421aa68 100644 --- a/lbry/blockchain/database.py +++ b/lbry/blockchain/database.py @@ -60,29 +60,33 @@ class BlockchainDB: async def execute_fetchall(self, sql: str, *args): return await self.run_in_executor(self.sync_execute_fetchall, sql, *args) - def sync_get_block_files(self): + def sync_get_block_files(self, above_height=-1): return self.sync_execute_fetchall( """ - SELECT file as file_number, COUNT(hash) as blocks, SUM(txcount) as txs - FROM block_info GROUP BY file ORDER BY file ASC; - """ + SELECT + file as file_number, + COUNT(hash) as blocks, + SUM(txcount) as txs, + MAX(height) as max_height + FROM block_info WHERE height > ? GROUP BY file ORDER BY file ASC; + """, (above_height,) ) - async def get_block_files(self): - return await self.run_in_executor(self.sync_get_block_files) + async def get_block_files(self, above_height=-1): + return await self.run_in_executor(self.sync_get_block_files, above_height) - def sync_get_file_details(self, block_file): + def sync_get_blocks_in_file(self, block_file, above_height=-1): return self.sync_execute_fetchall( """ SELECT datapos as data_offset, height, hash as block_hash, txCount as txs FROM block_info - WHERE file = ? and status&1 > 0 + WHERE file = ? AND height > ? AND status&1 > 0 ORDER BY datapos ASC; - """, (block_file,) + """, (block_file, above_height) ) - async def get_file_details(self, block_file): - return await self.run_in_executor(self.sync_get_file_details, block_file) + async def get_blocks_in_file(self, block_file, above_height=-1): + return await self.run_in_executor(self.sync_get_blocks_in_file, block_file, above_height) def sync_get_claimtrie(self): return self.sync_execute_fetchall( diff --git a/lbry/blockchain/sync.py b/lbry/blockchain/sync.py index e5f596363..8b6612b9a 100644 --- a/lbry/blockchain/sync.py +++ b/lbry/blockchain/sync.py @@ -9,9 +9,9 @@ from concurrent.futures import Executor, ThreadPoolExecutor, ProcessPoolExecutor from sqlalchemy import func, bindparam from sqlalchemy.future import select -from lbry.event import EventController, BroadcastSubscription -from lbry.service.base import Service, Sync, BlockEvent -from lbry.db import queries, TXO_TYPES +from lbry.event import EventController, BroadcastSubscription, EventQueuePublisher +from lbry.service.base import Sync, BlockEvent +from lbry.db import Database, queries, TXO_TYPES from lbry.db.tables import Claim, Claimtrie, TX, TXO, TXI, Block as BlockTable from .lbrycrd import Lbrycrd @@ -21,48 +21,63 @@ from .ledger import Ledger log = logging.getLogger(__name__) -_context: ContextVar[Tuple[Lbrycrd, mp.Queue, mp.Event]] = ContextVar('ctx') +_context: ContextVar[Tuple[Lbrycrd, mp.Queue, mp.Event, int]] = ContextVar('ctx') def ctx(): return _context.get() -def initialize(url: str, ledger: Ledger, progress: mp.Queue, stop: mp.Event, track_metrics=False): +def initialize(url: str, ledger: Ledger, progress: mp.Queue, stop: mp.Event, track_metrics: bool): chain = Lbrycrd(ledger) chain.db.sync_open() - _context.set((chain, progress, stop)) + _context.set((chain, progress, stop, os.getpid())) queries.initialize(url=url, ledger=ledger, track_metrics=track_metrics) +PARSING = 1 +SAVING = 2 +PROCESSED = 3 +FINISHED = 4 + + def process_block_file(block_file_number): - chain, progress, stop = ctx() + chain, progress, stop, pid = ctx() block_file_path = chain.get_block_file_path_from_number(block_file_number) + current_height = queries.get_best_height() + new_blocks = chain.db.sync_get_blocks_in_file(block_file_number, current_height) + if not new_blocks: + return -1 num = 0 - progress.put_nowait((block_file_number, 1, num)) - best_height = queries.get_best_height() - best_block_processed = -1 + total = len(new_blocks) + progress.put_nowait((PARSING, pid, block_file_number, num, total)) collector = queries.RowCollector(queries.ctx()) + last_block_processed = -1 with open(block_file_path, 'rb') as fp: stream = BCDataStream(fp=fp) - for num, block_info in enumerate(chain.db.sync_get_file_details(block_file_number), start=1): + for num, block_info in enumerate(new_blocks, start=1): if stop.is_set(): - return - if num % 100 == 0: - progress.put_nowait((block_file_number, 1, num)) + return -1 + block_height = block_info['height'] fp.seek(block_info['data_offset']) - block = Block.from_data_stream(stream, block_info['height'], block_file_number) - if block.height <= best_height: - continue - best_block_processed = max(block.height, best_block_processed) + block = Block.from_data_stream(stream, block_height, block_file_number) collector.add_block(block) - collector.save(lambda remaining, total: progress.put((block_file_number, 2, remaining, total))) - return best_block_processed + last_block_processed = block_height + if num % 100 == 0: + progress.put_nowait((PARSING, pid, block_file_number, num, total)) + progress.put_nowait((PARSING, pid, block_file_number, num, total)) + collector.save( + lambda remaining, total: progress.put_nowait( + (SAVING, pid, block_file_number, remaining, total) + ) + ) + progress.put((PROCESSED, pid, block_file_number)) + return last_block_processed def process_claimtrie(): execute = queries.ctx().execute - chain, progress, stop = ctx() + chain, progress, stop, _ = ctx() execute(Claimtrie.delete()) for record in chain.db.sync_get_claimtrie(): @@ -133,39 +148,81 @@ def process_block_and_tx_filters(): # execute(TX.update().where(TX.c.tx_hash == bindparam('pk')), txs) +class SyncMessageToEvent(EventQueuePublisher): + + def message_to_event(self, message): + if message[0] == PARSING: + event = "blockchain.sync.parsing" + elif message[0] == SAVING: + event = "blockchain.sync.saving" + elif message[0] == PROCESSED: + return { + "event": "blockchain.sync.processed", + "data": {"pid": message[1], "block_file": message[2]} + } + elif message[0] == FINISHED: + return { + 'event': 'blockchain.sync.finish', + 'data': {'finished_height': message[1]} + } + else: + raise ValueError("Unknown message type.") + return { + "event": event, + "data": { + "pid": message[1], + "block_file": message[2], + "step": message[3], + "total": message[4] + } + } + + class BlockchainSync(Sync): - def __init__(self, service: Service, chain: Lbrycrd, multiprocess=False): - super().__init__(service) + def __init__(self, chain: Lbrycrd, db: Database, processes=-1): + super().__init__(chain.ledger, db) self.chain = chain self.message_queue = mp.Queue() self.stop_event = mp.Event() self.on_block_subscription: Optional[BroadcastSubscription] = None self.advance_loop_task: Optional[asyncio.Task] = None self.advance_loop_event = asyncio.Event() - self.executor = self._create_executor(multiprocess) self._on_progress_controller = EventController() self.on_progress = self._on_progress_controller.stream + self.progress_publisher = SyncMessageToEvent( + self.message_queue, self._on_progress_controller + ) + self.track_metrics = False + self.processes = self._normalize_processes(processes) + self.executor = self._create_executor() - def _create_executor(self, multiprocess) -> Executor: + @staticmethod + def _normalize_processes(processes): + if processes == 0: + return os.cpu_count() + elif processes > 0: + return processes + return 1 + + def _create_executor(self) -> Executor: args = dict( initializer=initialize, initargs=( - self.service.db.url, self.chain.ledger, - self.message_queue, self.stop_event + self.db.url, self.chain.ledger, + self.message_queue, self.stop_event, + self.track_metrics ) ) - if multiprocess: - return ProcessPoolExecutor( - max_workers=max(os.cpu_count() - 1, 4), **args - ) + if self.processes > 1: + return ProcessPoolExecutor(max_workers=self.processes, **args) else: - return ThreadPoolExecutor( - max_workers=1, **args - ) + return ThreadPoolExecutor(max_workers=1, **args) async def start(self): - await self.advance() + self.progress_publisher.start() + self.advance_loop_task = asyncio.create_task(self.advance()) + await self.advance_loop_task self.chain.subscribe() self.advance_loop_task = asyncio.create_task(self.advance_loop()) self.on_block_subscription = self.chain.on_block.listen( @@ -178,14 +235,31 @@ class BlockchainSync(Sync): self.on_block_subscription.cancel() self.stop_event.set() self.advance_loop_task.cancel() + self.progress_publisher.stop() self.executor.shutdown() async def load_blocks(self): tasks = [] - for file in await self.chain.db.get_block_files(): + best_height = await self.db.get_best_height() + tx_count = block_count = ending_height = 0 + #for file in (await self.chain.db.get_block_files(best_height))[:1]: + for file in await self.chain.db.get_block_files(best_height): + tx_count += file['txs'] + block_count += file['blocks'] + ending_height = max(ending_height, file['max_height']) tasks.append(asyncio.get_running_loop().run_in_executor( self.executor, process_block_file, file['file_number'] )) + await self._on_progress_controller.add({ + 'event': 'blockchain.sync.start', + 'data': { + 'starting_height': best_height, + 'ending_height': ending_height, + 'files': len(tasks), + 'blocks': block_count, + 'txs': tx_count + } + }) done, pending = await asyncio.wait( tasks, return_when=asyncio.FIRST_EXCEPTION ) @@ -193,7 +267,11 @@ class BlockchainSync(Sync): self.stop_event.set() for future in pending: future.cancel() - return max(f.result() for f in done) + best_height_processed = max(f.result() for f in done) + # putting event in queue instead of add to progress_controller because + # we want this message to appear after all of the queued messages from workers + self.message_queue.put((FINISHED, best_height_processed)) + return best_height_processed async def process_claims(self): await asyncio.get_event_loop().run_in_executor( @@ -212,7 +290,7 @@ class BlockchainSync(Sync): async def post_process(self): await self.process_claims() - if self.service.conf.spv_address_filters: + if self.conf.spv_address_filters: await self.process_block_and_tx_filters() await self.process_claimtrie()