From fd2753b95af1e288e11b59f9716711d7c63b29b3 Mon Sep 17 00:00:00 2001 From: Jack Robison Date: Thu, 8 Jul 2021 16:08:33 -0400 Subject: [PATCH] cleanup --- lbry/wallet/server/block_processor.py | 104 ++++---------------------- lbry/wallet/server/leveldb.py | 1 - lbry/wallet/server/session.py | 18 ++--- 3 files changed, 23 insertions(+), 100 deletions(-) diff --git a/lbry/wallet/server/block_processor.py b/lbry/wallet/server/block_processor.py index 9c9734774..e44f2bd0b 100644 --- a/lbry/wallet/server/block_processor.py +++ b/lbry/wallet/server/block_processor.py @@ -224,9 +224,9 @@ class BlockProcessor: ################################# # txo to pending claim - self.txo_to_claim: typing.Dict[Tuple[int, int], StagedClaimtrieItem] = {} + self.txo_to_claim: Dict[Tuple[int, int], StagedClaimtrieItem] = {} # claim hash to pending claim txo - self.claim_hash_to_txo: typing.Dict[bytes, Tuple[int, int]] = {} + self.claim_hash_to_txo: Dict[bytes, Tuple[int, int]] = {} # claim hash to lists of pending support txos self.support_txos_by_claim: DefaultDict[bytes, List[Tuple[int, int]]] = defaultdict(list) # support txo: (supported claim hash, support amount) @@ -270,7 +270,10 @@ class BlockProcessor: to_send_es = set(self.touched_claims_to_send_es) to_send_es.update(self.pending_reposted.difference(self.removed_claims_to_send_es)) - to_send_es.update({k for k, v in self.pending_channel_counts.items() if v != 0}.difference(self.removed_claims_to_send_es)) + to_send_es.update( + {k for k, v in self.pending_channel_counts.items() if v != 0}.difference( + self.removed_claims_to_send_es) + ) for claim_hash in self.removed_claims_to_send_es: yield 'delete', claim_hash.hex() @@ -372,7 +375,6 @@ class BlockProcessor: return await self.daemon.raw_blocks(hex_hashes) try: - await self.flush(True) start, last, hashes = await self.reorg_hashes(count) # Reverse and convert to hex strings. hashes = [hash_to_hex_str(hash) for hash in reversed(hashes)] @@ -399,15 +401,7 @@ class BlockProcessor: The hashes are returned in order of increasing height. Start is the height of the first hash, last of the last. """ - start, count = await self.calc_reorg_range(count) - last = start + count - 1 - s = '' if count == 1 else 's' - self.logger.info(f'chain was reorganised replacing {count:,d} ' - f'block{s} at heights {start:,d}-{last:,d}') - return start, last, await self.db.fs_block_hashes(start, count) - - async def calc_reorg_range(self, count: Optional[int]): """Calculate the reorg range""" def diff_pos(hashes1, hashes2): @@ -436,8 +430,12 @@ class BlockProcessor: count = (self.height - start) + 1 else: start = (self.height - count) + 1 + last = start + count - 1 + s = '' if count == 1 else 's' + self.logger.info(f'chain was reorganised replacing {count:,d} ' + f'block{s} at heights {start:,d}-{last:,d}') - return start, count + return start, last, await self.db.fs_block_hashes(start, count) # - Flushing def flush_data(self): @@ -447,20 +445,11 @@ class BlockProcessor: self.block_txs, self.db_op_stack, self.undo_infos, self.utxo_cache, self.db_deletes, self.tip, self.undo_claims) - async def flush(self, flush_utxos): + async def flush(self): def flush(): self.db.flush_dbs(self.flush_data()) await self.run_in_thread_with_lock(flush) - async def _maybe_flush(self): - # If caught up, flush everything as client queries are - # performed on the DB. - if self._caught_up_event.is_set(): - await self.flush(True) - elif time.perf_counter() > self.next_cache_check: - await self.flush(True) - self.next_cache_check = time.perf_counter() + 30 - def _add_claim_or_update(self, height: int, txo: 'Output', tx_hash: bytes, tx_num: int, nout: int, spent_claims: typing.Dict[bytes, typing.Tuple[int, int, str]]): try: @@ -1494,14 +1483,12 @@ class BlockProcessor: # Flush everything but with first_sync->False state. first_sync = self.db.first_sync self.db.first_sync = False - await self.flush(True) + await self.flush() if first_sync: self.logger.info(f'{lbry.__version__} synced to ' f'height {self.height:,d}, halting here.') self.shutdown_event.set() - # --- External API - async def fetch_and_process_blocks(self, caught_up_event): """Fetch, process and index blocks from the daemon. @@ -1536,69 +1523,6 @@ class BlockProcessor: finally: self.status_server.stop() # Shut down block processing - self.logger.info('flushing to DB for a clean shutdown...') - await self.flush(True) + self.logger.info('closing the DB for a clean shutdown...') self.db.close() self.executor.shutdown(wait=True) - - def force_chain_reorg(self, count): - """Force a reorg of the given number of blocks. - - Returns True if a reorg is queued, false if not caught up. - """ - if self._caught_up_event.is_set(): - self.reorg_count = count - self.blocks_event.set() - return True - return False - - -class Timer: - def __init__(self, name): - self.name = name - self.total = 0 - self.count = 0 - self.sub_timers = {} - self._last_start = None - - def add_timer(self, name): - if name not in self.sub_timers: - self.sub_timers[name] = Timer(name) - return self.sub_timers[name] - - def run(self, func, *args, forward_timer=False, timer_name=None, **kwargs): - t = self.add_timer(timer_name or func.__name__) - t.start() - try: - if forward_timer: - return func(*args, **kwargs, timer=t) - else: - return func(*args, **kwargs) - finally: - t.stop() - - def start(self): - self._last_start = time.time() - return self - - def stop(self): - self.total += (time.time() - self._last_start) - self.count += 1 - self._last_start = None - return self - - def show(self, depth=0, height=None): - if depth == 0: - print('='*100) - if height is not None: - print(f'STATISTICS AT HEIGHT {height}') - print('='*100) - else: - print( - f"{' '*depth} {self.total/60:4.2f}mins {self.name}" - # f"{self.total/self.count:.5f}sec/call, " - ) - for sub_timer in self.sub_timers.values(): - sub_timer.show(depth+1) - if depth == 0: - print('='*100) diff --git a/lbry/wallet/server/leveldb.py b/lbry/wallet/server/leveldb.py index 2b72f9065..ed9d3fd48 100644 --- a/lbry/wallet/server/leveldb.py +++ b/lbry/wallet/server/leveldb.py @@ -995,7 +995,6 @@ class LevelDB: batch_delete(op.key) flush_data.undo.clear() - flush_data.db_op_stack.clear() while self.fs_height > flush_data.height: self.fs_height -= 1 diff --git a/lbry/wallet/server/session.py b/lbry/wallet/server/session.py index b40173933..88d07e513 100644 --- a/lbry/wallet/server/session.py +++ b/lbry/wallet/server/session.py @@ -542,15 +542,15 @@ class SessionManager: """Return statistics about connected sessions.""" return self._session_data(for_log=False) - async def rpc_reorg(self, count): - """Force a reorg of the given number of blocks. - - count: number of blocks to reorg - """ - count = non_negative_integer(count) - if not self.bp.force_chain_reorg(count): - raise RPCError(BAD_REQUEST, 'still catching up with daemon') - return f'scheduled a reorg of {count:,d} blocks' + # async def rpc_reorg(self, count): + # """Force a reorg of the given number of blocks. + # + # count: number of blocks to reorg + # """ + # count = non_negative_integer(count) + # if not self.bp.force_chain_reorg(count): + # raise RPCError(BAD_REQUEST, 'still catching up with daemon') + # return f'scheduled a reorg of {count:,d} blocks' # --- External Interface