skip es sync during initial hub sync, halt the hub upon finishing initial sync
This commit is contained in:
parent
ad7dee3e7f
commit
768934e1cc
2 changed files with 7 additions and 6 deletions
|
@ -172,11 +172,12 @@ class BlockProcessor:
|
||||||
"reorg_count", "Number of reorgs", namespace=NAMESPACE
|
"reorg_count", "Number of reorgs", namespace=NAMESPACE
|
||||||
)
|
)
|
||||||
|
|
||||||
def __init__(self, env, db: 'LevelDB', daemon, notifications):
|
def __init__(self, env, db: 'LevelDB', daemon, notifications, shutdown_event: asyncio.Event):
|
||||||
self.env = env
|
self.env = env
|
||||||
self.db = db
|
self.db = db
|
||||||
self.daemon = daemon
|
self.daemon = daemon
|
||||||
self.notifications = notifications
|
self.notifications = notifications
|
||||||
|
self.shutdown_event = shutdown_event
|
||||||
|
|
||||||
self.coin = env.coin
|
self.coin = env.coin
|
||||||
if env.coin.NET == 'mainnet':
|
if env.coin.NET == 'mainnet':
|
||||||
|
@ -308,7 +309,8 @@ class BlockProcessor:
|
||||||
await self.run_in_thread_with_lock(self.advance_block, block)
|
await self.run_in_thread_with_lock(self.advance_block, block)
|
||||||
self.logger.info("advanced to %i in %0.3fs", self.height, time.perf_counter() - start)
|
self.logger.info("advanced to %i in %0.3fs", self.height, time.perf_counter() - start)
|
||||||
# TODO: we shouldnt wait on the search index updating before advancing to the next block
|
# TODO: we shouldnt wait on the search index updating before advancing to the next block
|
||||||
await self.db.search_index.claim_consumer(self.claim_producer())
|
if not self.db.first_sync:
|
||||||
|
await self.db.search_index.claim_consumer(self.claim_producer())
|
||||||
self.db.search_index.clear_caches()
|
self.db.search_index.clear_caches()
|
||||||
self.touched_claims_to_send_es.clear()
|
self.touched_claims_to_send_es.clear()
|
||||||
self.removed_claims_to_send_es.clear()
|
self.removed_claims_to_send_es.clear()
|
||||||
|
@ -1495,9 +1497,8 @@ class BlockProcessor:
|
||||||
await self.flush(True)
|
await self.flush(True)
|
||||||
if first_sync:
|
if first_sync:
|
||||||
self.logger.info(f'{lbry.__version__} synced to '
|
self.logger.info(f'{lbry.__version__} synced to '
|
||||||
f'height {self.height:,d}')
|
f'height {self.height:,d}, halting here.')
|
||||||
# Reopen for serving
|
self.shutdown_event.set()
|
||||||
await self.db.open_dbs()
|
|
||||||
|
|
||||||
# --- External API
|
# --- External API
|
||||||
|
|
||||||
|
|
|
@ -76,7 +76,7 @@ class Server:
|
||||||
self.notifications = notifications = Notifications()
|
self.notifications = notifications = Notifications()
|
||||||
self.daemon = daemon = env.coin.DAEMON(env.coin, env.daemon_url)
|
self.daemon = daemon = env.coin.DAEMON(env.coin, env.daemon_url)
|
||||||
self.db = db = env.coin.DB(env)
|
self.db = db = env.coin.DB(env)
|
||||||
self.bp = bp = env.coin.BLOCK_PROCESSOR(env, db, daemon, notifications)
|
self.bp = bp = env.coin.BLOCK_PROCESSOR(env, db, daemon, notifications, self.shutdown_event)
|
||||||
self.prometheus_server: typing.Optional[PrometheusServer] = None
|
self.prometheus_server: typing.Optional[PrometheusServer] = None
|
||||||
|
|
||||||
# Set notifications up to implement the MemPoolAPI
|
# Set notifications up to implement the MemPoolAPI
|
||||||
|
|
Loading…
Reference in a new issue