remove extra open functions

This commit is contained in:
Jack Robison 2021-06-06 13:08:15 -04:00 committed by Victor Shyba
parent 5541b80179
commit 5be04448ea
4 changed files with 21 additions and 57 deletions

View file

@ -1576,13 +1576,7 @@ class BlockProcessor:
self.logger.info(f'{lbry.__version__} synced to ' self.logger.info(f'{lbry.__version__} synced to '
f'height {self.height:,d}') f'height {self.height:,d}')
# Reopen for serving # Reopen for serving
await self.db.open_for_serving() await self.db.open_dbs()
async def _first_open_dbs(self):
await self.db.open_for_sync()
self.height = self.db.db_height
self.tip = self.db.db_tip
self.tx_count = self.db.db_tx_count
# --- External API # --- External API
@ -1601,7 +1595,11 @@ class BlockProcessor:
self._caught_up_event = caught_up_event self._caught_up_event = caught_up_event
try: try:
await self._first_open_dbs() await self.db.open_dbs()
self.height = self.db.db_height
self.tip = self.db.db_tip
self.tx_count = self.db.db_tx_count
self.status_server.set_height(self.db.fs_height, self.db.db_tip) self.status_server.set_height(self.db.fs_height, self.db.db_tip)
await asyncio.wait([ await asyncio.wait([
self.prefetcher.main_loop(self.height), self.prefetcher.main_loop(self.height),

View file

@ -111,8 +111,11 @@ class SearchIndex:
yield extract_doc(doc, self.index) yield extract_doc(doc, self.index)
count += 1 count += 1
if count % 100 == 0: if count % 100 == 0:
self.logger.info("Indexing in progress, %d claims.", count) self.logger.debug("Indexing in progress, %d claims.", count)
self.logger.info("Indexing done for %d claims.", count) if count:
self.logger.info("Indexing done for %d claims.", count)
else:
self.logger.debug("Indexing done for %d claims.", count)
async def claim_consumer(self, claim_producer): async def claim_consumer(self, claim_producer):
touched = set() touched = set()
@ -124,7 +127,7 @@ class SearchIndex:
item = item.popitem()[1] item = item.popitem()[1]
touched.add(item['_id']) touched.add(item['_id'])
await self.sync_client.indices.refresh(self.index) await self.sync_client.indices.refresh(self.index)
self.logger.info("Indexing done.") self.logger.debug("Indexing done.")
def update_filter_query(self, censor_type, blockdict, channels=False): def update_filter_query(self, censor_type, blockdict, channels=False):
blockdict = {key[::-1].hex(): value[::-1].hex() for key, value in blockdict.items()} blockdict = {key[::-1].hex(): value[::-1].hex() for key, value in blockdict.items()}

View file

@ -495,7 +495,9 @@ class LevelDB:
assert len(headers) - 1 == self.db_height, f"{len(headers)} vs {self.db_height}" assert len(headers) - 1 == self.db_height, f"{len(headers)} vs {self.db_height}"
self.headers = headers self.headers = headers
async def _open_dbs(self, for_sync, compacting): async def open_dbs(self):
if self.db:
return
if self.executor is None: if self.executor is None:
self.executor = ThreadPoolExecutor(1) self.executor = ThreadPoolExecutor(1)
@ -595,33 +597,6 @@ class LevelDB:
self.executor.shutdown(wait=True) self.executor.shutdown(wait=True)
self.executor = None self.executor = None
async def open_for_compacting(self):
await self._open_dbs(True, True)
async def open_for_sync(self):
"""Open the databases to sync to the daemon.
When syncing we want to reserve a lot of open files for the
synchronization. When serving clients we want the open files for
serving network connections.
"""
self.logger.info("opened for sync")
await self._open_dbs(True, False)
async def open_for_serving(self):
"""Open the databases for serving. If they are already open they are
closed first.
"""
if self.db:
return
# self.logger.info('closing DBs to re-open for serving')
# self.db.close()
# self.history.close_db()
# self.db = None
await self._open_dbs(False, False)
self.logger.info("opened for serving")
# Header merkle cache # Header merkle cache
async def populate_header_merkle_cache(self): async def populate_header_merkle_cache(self):
@ -656,7 +631,7 @@ class LevelDB:
self.assert_flushed(flush_data) self.assert_flushed(flush_data)
return return
start_time = time.time() # start_time = time.time()
prior_flush = self.last_flush prior_flush = self.last_flush
tx_delta = flush_data.tx_count - self.last_flush_tx_count tx_delta = flush_data.tx_count - self.last_flush_tx_count
@ -675,7 +650,7 @@ class LevelDB:
) // 32 == flush_data.tx_count - prior_tx_count, f"{len(b''.join(hashes for hashes, _ in flush_data.block_txs)) // 32} != {flush_data.tx_count}" ) // 32 == flush_data.tx_count - prior_tx_count, f"{len(b''.join(hashes for hashes, _ in flush_data.block_txs)) // 32} != {flush_data.tx_count}"
# Write the headers # Write the headers
start_time = time.perf_counter() # start_time = time.perf_counter()
with self.db.write_batch() as batch: with self.db.write_batch() as batch:
self.put = batch.put self.put = batch.put
@ -701,7 +676,7 @@ class LevelDB:
flush_data.headers.clear() flush_data.headers.clear()
flush_data.block_txs.clear() flush_data.block_txs.clear()
flush_data.block_hashes.clear() flush_data.block_hashes.clear()
op_count = len(flush_data.claimtrie_stash)
for staged_change in flush_data.claimtrie_stash: for staged_change in flush_data.claimtrie_stash:
# print("ADVANCE", staged_change) # print("ADVANCE", staged_change)
if staged_change.is_put: if staged_change.is_put:
@ -759,9 +734,9 @@ class LevelDB:
block_count = flush_data.height - self.db_height block_count = flush_data.height - self.db_height
tx_count = flush_data.tx_count - self.db_tx_count tx_count = flush_data.tx_count - self.db_tx_count
elapsed = time.time() - start_time elapsed = time.time() - start_time
self.logger.info(f'flushed {block_count:,d} blocks with ' self.logger.info(f'advanced to {flush_data.height:,d} with '
f'{tx_count:,d} txs, {add_count:,d} UTXO adds, ' f'{tx_count:,d} txs, {add_count:,d} UTXO adds, '
f'{spend_count:,d} spends in ' f'{spend_count:,d} spends, {op_count:,d} claim ops in '
f'{elapsed:.1f}s, committing...') f'{elapsed:.1f}s, committing...')
self.utxo_flush_count = self.hist_flush_count self.utxo_flush_count = self.hist_flush_count
@ -776,18 +751,6 @@ class LevelDB:
self.last_flush_tx_count = self.fs_tx_count self.last_flush_tx_count = self.fs_tx_count
self.write_db_state(batch) self.write_db_state(batch)
elapsed = self.last_flush - start_time
self.logger.info(f'flush #{self.hist_flush_count:,d} took '
f'{elapsed:.1f}s. Height {flush_data.height:,d} '
f'txs: {flush_data.tx_count:,d} ({tx_delta:+,d})')
# Catch-up stats
if self.db.for_sync:
flush_interval = self.last_flush - prior_flush
tx_per_sec_gen = int(flush_data.tx_count / self.wall_time)
tx_per_sec_last = 1 + int(tx_delta / flush_interval)
self.logger.info(f'tx/sec since genesis: {tx_per_sec_gen:,d}, '
f'since last flush: {tx_per_sec_last:,d}')
self.logger.info(f'sync time: {formatted_time(self.wall_time)}')
def flush_backup(self, flush_data, touched): def flush_backup(self, flush_data, touched):
"""Like flush_dbs() but when backing up. All UTXOs are flushed.""" """Like flush_dbs() but when backing up. All UTXOs are flushed."""

View file

@ -199,5 +199,5 @@ class EpicAdventuresOfChris45(CommandTestCase):
# He closes and opens the wallet server databases to see how horribly they break # He closes and opens the wallet server databases to see how horribly they break
db = self.conductor.spv_node.server.db db = self.conductor.spv_node.server.db
db.close() db.close()
await db.open_for_serving() await db.open_dbs()
# They didn't! (error would be AssertionError: 276 vs 266 (264 counts) on startup) # They didn't! (error would be AssertionError: 276 vs 266 (264 counts) on startup)