From 5be04448ea97b8b5a664232e9fb9551ac2412922 Mon Sep 17 00:00:00 2001 From: Jack Robison Date: Sun, 6 Jun 2021 13:08:15 -0400 Subject: [PATCH] remove extra open functions --- lbry/wallet/server/block_processor.py | 14 +++-- lbry/wallet/server/db/elasticsearch/search.py | 9 ++-- lbry/wallet/server/leveldb.py | 53 +++---------------- tests/integration/other/test_chris45.py | 2 +- 4 files changed, 21 insertions(+), 57 deletions(-) diff --git a/lbry/wallet/server/block_processor.py b/lbry/wallet/server/block_processor.py index 99785bf51..def8457a1 100644 --- a/lbry/wallet/server/block_processor.py +++ b/lbry/wallet/server/block_processor.py @@ -1576,13 +1576,7 @@ class BlockProcessor: self.logger.info(f'{lbry.__version__} synced to ' f'height {self.height:,d}') # Reopen for serving - await self.db.open_for_serving() - - async def _first_open_dbs(self): - await self.db.open_for_sync() - self.height = self.db.db_height - self.tip = self.db.db_tip - self.tx_count = self.db.db_tx_count + await self.db.open_dbs() # --- External API @@ -1601,7 +1595,11 @@ class BlockProcessor: self._caught_up_event = caught_up_event try: - await self._first_open_dbs() + await self.db.open_dbs() + self.height = self.db.db_height + self.tip = self.db.db_tip + self.tx_count = self.db.db_tx_count + self.status_server.set_height(self.db.fs_height, self.db.db_tip) await asyncio.wait([ self.prefetcher.main_loop(self.height), diff --git a/lbry/wallet/server/db/elasticsearch/search.py b/lbry/wallet/server/db/elasticsearch/search.py index 9d10e378b..da7615f2b 100644 --- a/lbry/wallet/server/db/elasticsearch/search.py +++ b/lbry/wallet/server/db/elasticsearch/search.py @@ -111,8 +111,11 @@ class SearchIndex: yield extract_doc(doc, self.index) count += 1 if count % 100 == 0: - self.logger.info("Indexing in progress, %d claims.", count) - self.logger.info("Indexing done for %d claims.", count) + self.logger.debug("Indexing in progress, %d claims.", count) + if count: + self.logger.info("Indexing done for %d claims.", count) + else: + self.logger.debug("Indexing done for %d claims.", count) async def claim_consumer(self, claim_producer): touched = set() @@ -124,7 +127,7 @@ class SearchIndex: item = item.popitem()[1] touched.add(item['_id']) await self.sync_client.indices.refresh(self.index) - self.logger.info("Indexing done.") + self.logger.debug("Indexing done.") def update_filter_query(self, censor_type, blockdict, channels=False): blockdict = {key[::-1].hex(): value[::-1].hex() for key, value in blockdict.items()} diff --git a/lbry/wallet/server/leveldb.py b/lbry/wallet/server/leveldb.py index fc79c19f6..53b11c7a1 100644 --- a/lbry/wallet/server/leveldb.py +++ b/lbry/wallet/server/leveldb.py @@ -495,7 +495,9 @@ class LevelDB: assert len(headers) - 1 == self.db_height, f"{len(headers)} vs {self.db_height}" self.headers = headers - async def _open_dbs(self, for_sync, compacting): + async def open_dbs(self): + if self.db: + return if self.executor is None: self.executor = ThreadPoolExecutor(1) @@ -595,33 +597,6 @@ class LevelDB: self.executor.shutdown(wait=True) self.executor = None - async def open_for_compacting(self): - await self._open_dbs(True, True) - - async def open_for_sync(self): - """Open the databases to sync to the daemon. - - When syncing we want to reserve a lot of open files for the - synchronization. When serving clients we want the open files for - serving network connections. - """ - self.logger.info("opened for sync") - await self._open_dbs(True, False) - - async def open_for_serving(self): - """Open the databases for serving. If they are already open they are - closed first. - """ - if self.db: - return - # self.logger.info('closing DBs to re-open for serving') - # self.db.close() - # self.history.close_db() - # self.db = None - - await self._open_dbs(False, False) - self.logger.info("opened for serving") - # Header merkle cache async def populate_header_merkle_cache(self): @@ -656,7 +631,7 @@ class LevelDB: self.assert_flushed(flush_data) return - start_time = time.time() + # start_time = time.time() prior_flush = self.last_flush tx_delta = flush_data.tx_count - self.last_flush_tx_count @@ -675,7 +650,7 @@ class LevelDB: ) // 32 == flush_data.tx_count - prior_tx_count, f"{len(b''.join(hashes for hashes, _ in flush_data.block_txs)) // 32} != {flush_data.tx_count}" # Write the headers - start_time = time.perf_counter() + # start_time = time.perf_counter() with self.db.write_batch() as batch: self.put = batch.put @@ -701,7 +676,7 @@ class LevelDB: flush_data.headers.clear() flush_data.block_txs.clear() flush_data.block_hashes.clear() - + op_count = len(flush_data.claimtrie_stash) for staged_change in flush_data.claimtrie_stash: # print("ADVANCE", staged_change) if staged_change.is_put: @@ -759,9 +734,9 @@ class LevelDB: block_count = flush_data.height - self.db_height tx_count = flush_data.tx_count - self.db_tx_count elapsed = time.time() - start_time - self.logger.info(f'flushed {block_count:,d} blocks with ' + self.logger.info(f'advanced to {flush_data.height:,d} with ' f'{tx_count:,d} txs, {add_count:,d} UTXO adds, ' - f'{spend_count:,d} spends in ' + f'{spend_count:,d} spends, {op_count:,d} claim ops in ' f'{elapsed:.1f}s, committing...') self.utxo_flush_count = self.hist_flush_count @@ -776,18 +751,6 @@ class LevelDB: self.last_flush_tx_count = self.fs_tx_count self.write_db_state(batch) - elapsed = self.last_flush - start_time - self.logger.info(f'flush #{self.hist_flush_count:,d} took ' - f'{elapsed:.1f}s. Height {flush_data.height:,d} ' - f'txs: {flush_data.tx_count:,d} ({tx_delta:+,d})') - # Catch-up stats - if self.db.for_sync: - flush_interval = self.last_flush - prior_flush - tx_per_sec_gen = int(flush_data.tx_count / self.wall_time) - tx_per_sec_last = 1 + int(tx_delta / flush_interval) - self.logger.info(f'tx/sec since genesis: {tx_per_sec_gen:,d}, ' - f'since last flush: {tx_per_sec_last:,d}') - self.logger.info(f'sync time: {formatted_time(self.wall_time)}') def flush_backup(self, flush_data, touched): """Like flush_dbs() but when backing up. All UTXOs are flushed.""" diff --git a/tests/integration/other/test_chris45.py b/tests/integration/other/test_chris45.py index 51c3cd3e8..52f3b179c 100644 --- a/tests/integration/other/test_chris45.py +++ b/tests/integration/other/test_chris45.py @@ -199,5 +199,5 @@ class EpicAdventuresOfChris45(CommandTestCase): # He closes and opens the wallet server databases to see how horribly they break db = self.conductor.spv_node.server.db db.close() - await db.open_for_serving() + await db.open_dbs() # They didn't! (error would be AssertionError: 276 vs 266 (264 counts) on startup)