forked from LBRYCommunity/lbry-sdk
skip loading tx/claim caches in the elastic sync script when not needed
This commit is contained in:
parent
755e8ce101
commit
b3705073a3
4 changed files with 16 additions and 13 deletions
|
@ -1754,11 +1754,13 @@ class BlockProcessor:
|
|||
|
||||
self._caught_up_event = caught_up_event
|
||||
try:
|
||||
await self.db.open_dbs()
|
||||
self.db.open_db()
|
||||
self.height = self.db.db_height
|
||||
self.tip = self.db.db_tip
|
||||
self.tx_count = self.db.db_tx_count
|
||||
self.status_server.set_height(self.db.fs_height, self.db.db_tip)
|
||||
await self.db.initialize_caches()
|
||||
await self.db.search_index.start()
|
||||
await asyncio.wait([
|
||||
self.prefetcher.main_loop(self.height),
|
||||
self._process_prefetched_blocks()
|
||||
|
|
|
@ -15,14 +15,15 @@ async def get_recent_claims(env, index_name='claims', db=None):
|
|||
db = db or LevelDB(env)
|
||||
try:
|
||||
if need_open:
|
||||
await db.open_dbs()
|
||||
db_state = db.prefix_db.db_state.get()
|
||||
if db_state.es_sync_height == db_state.height:
|
||||
db.open_db()
|
||||
if db.es_sync_height == db.db_height or db.db_height <= 0:
|
||||
return
|
||||
if need_open:
|
||||
await db.initialize_caches()
|
||||
cnt = 0
|
||||
touched_claims = set()
|
||||
deleted_claims = set()
|
||||
for height in range(db_state.es_sync_height, db_state.height + 1):
|
||||
for height in range(db.es_sync_height, db.db_height + 1):
|
||||
touched_or_deleted = db.prefix_db.touched_or_deleted.get(height)
|
||||
touched_claims.update(touched_or_deleted.touched_claims)
|
||||
deleted_claims.update(touched_or_deleted.deleted_claims)
|
||||
|
@ -65,7 +66,8 @@ async def get_all_claims(env, index_name='claims', db=None):
|
|||
need_open = db is None
|
||||
db = db or LevelDB(env)
|
||||
if need_open:
|
||||
await db.open_dbs()
|
||||
db.open_db()
|
||||
await db.initialize_caches()
|
||||
logging.info("Fetching claims to send ES from leveldb")
|
||||
try:
|
||||
cnt = 0
|
||||
|
|
|
@ -823,7 +823,7 @@ class LevelDB:
|
|||
return struct.unpack('<I', self.headers[height][100:104])[0]
|
||||
return int(160.6855883050695 * height)
|
||||
|
||||
async def open_dbs(self):
|
||||
def open_db(self):
|
||||
if self.prefix_db and not self.prefix_db.closed:
|
||||
return
|
||||
|
||||
|
@ -856,10 +856,9 @@ class LevelDB:
|
|||
self.logger.error(msg)
|
||||
raise RuntimeError(msg)
|
||||
self.logger.info(f'flush count: {self.hist_flush_count:,d}')
|
||||
|
||||
self.utxo_flush_count = self.hist_flush_count
|
||||
|
||||
# Read TX counts (requires meta directory)
|
||||
async def initialize_caches(self):
|
||||
await self._read_tx_counts()
|
||||
await self._read_headers()
|
||||
if self.env.cache_all_claim_txos:
|
||||
|
@ -867,9 +866,6 @@ class LevelDB:
|
|||
if self.env.cache_all_tx_hashes:
|
||||
await self._read_tx_hashes()
|
||||
|
||||
# start search index
|
||||
await self.search_index.start()
|
||||
|
||||
def close(self):
|
||||
self.prefix_db.close()
|
||||
|
||||
|
@ -1082,6 +1078,7 @@ class LevelDB:
|
|||
self.hist_comp_flush_count = -1
|
||||
self.hist_comp_cursor = -1
|
||||
self.hist_db_version = max(self.DB_VERSIONS)
|
||||
self.es_sync_height = 0
|
||||
else:
|
||||
self.db_version = state.db_version
|
||||
if self.db_version not in self.DB_VERSIONS:
|
||||
|
@ -1102,6 +1099,7 @@ class LevelDB:
|
|||
self.hist_comp_flush_count = state.comp_flush_count
|
||||
self.hist_comp_cursor = state.comp_cursor
|
||||
self.hist_db_version = state.db_version
|
||||
self.es_sync_height = state.es_sync_height
|
||||
|
||||
def assert_db_state(self):
|
||||
state = self.prefix_db.db_state.get()
|
||||
|
|
|
@ -199,5 +199,6 @@ class EpicAdventuresOfChris45(CommandTestCase):
|
|||
# He closes and opens the wallet server databases to see how horribly they break
|
||||
db = self.conductor.spv_node.server.db
|
||||
db.close()
|
||||
await db.open_dbs()
|
||||
db.open_db()
|
||||
await db.initialize_caches()
|
||||
# They didn't! (error would be AssertionError: 276 vs 266 (264 counts) on startup)
|
||||
|
|
Loading…
Add table
Reference in a new issue