block scribe-hub
and scribe-elastic-sync
startup on scribe
initially catching up
fixes https://github.com/lbryio/scribe/issues/1
This commit is contained in:
parent
9c863c02d4
commit
f59adef282
6 changed files with 62 additions and 29 deletions
|
@ -1540,10 +1540,12 @@ class BlockchainProcessorService(BlockchainService):
|
|||
"""Loop forever processing blocks as they arrive."""
|
||||
self._caught_up_event = caught_up_event
|
||||
try:
|
||||
if self.height != self.daemon.cached_height() and not self.db.catching_up:
|
||||
await self._need_catch_up() # tell the readers that we're still catching up with lbrycrd/lbcd
|
||||
while not self._stopping:
|
||||
if self.height == self.daemon.cached_height():
|
||||
if not self._caught_up_event.is_set():
|
||||
await self._first_caught_up()
|
||||
await self._finished_initial_catch_up()
|
||||
self._caught_up_event.set()
|
||||
try:
|
||||
await asyncio.wait_for(self.blocks_event.wait(), self.wait_for_blocks_duration)
|
||||
|
@ -1558,25 +1560,27 @@ class BlockchainProcessorService(BlockchainService):
|
|||
await self.refresh_mempool()
|
||||
except asyncio.CancelledError:
|
||||
raise
|
||||
except Exception:
|
||||
self.log.exception("error while updating mempool txs")
|
||||
raise
|
||||
except Exception as err:
|
||||
self.log.exception("error while updating mempool txs: %s", err)
|
||||
raise err
|
||||
else:
|
||||
try:
|
||||
await self.check_and_advance_blocks(blocks)
|
||||
except asyncio.CancelledError:
|
||||
raise
|
||||
except Exception:
|
||||
self.log.exception("error while processing txs")
|
||||
raise
|
||||
except Exception as err:
|
||||
self.log.exception("error while processing txs: %s", err)
|
||||
raise err
|
||||
except asyncio.CancelledError:
|
||||
raise
|
||||
except Exception as err:
|
||||
self.log.exception("error in block processor loop: %s", err)
|
||||
raise err
|
||||
finally:
|
||||
self._ready_to_stop.set()
|
||||
|
||||
async def _first_caught_up(self):
|
||||
self.log.info(f'caught up to height {self.height}')
|
||||
# Flush everything but with first_sync->False state.
|
||||
first_sync = self.db.first_sync
|
||||
self.db.first_sync = False
|
||||
async def _need_catch_up(self):
|
||||
self.db.catching_up = True
|
||||
|
||||
def flush():
|
||||
assert len(self.db.prefix_db._op_stack) == 0
|
||||
|
@ -1586,10 +1590,18 @@ class BlockchainProcessorService(BlockchainService):
|
|||
|
||||
await self.run_in_thread_with_lock(flush)
|
||||
|
||||
if first_sync:
|
||||
self.log.info(f'{__version__} synced to '
|
||||
f'height {self.height:,d}, halting here.')
|
||||
self.shutdown_event.set()
|
||||
async def _finished_initial_catch_up(self):
|
||||
self.log.info(f'caught up to height {self.height}')
|
||||
# Flush everything but with catching_up->False state.
|
||||
self.db.catching_up = False
|
||||
|
||||
def flush():
|
||||
assert len(self.db.prefix_db._op_stack) == 0
|
||||
self.db.write_db_state()
|
||||
self.db.prefix_db.unsafe_commit()
|
||||
self.db.assert_db_state()
|
||||
|
||||
await self.run_in_thread_with_lock(flush)
|
||||
|
||||
def _iter_start_tasks(self):
|
||||
self.height = self.db.db_height
|
||||
|
|
|
@ -1045,10 +1045,10 @@ class HubDB:
|
|||
if self.db_height > 0:
|
||||
self.prefix_db.db_state.stage_delete((), self.prefix_db.db_state.get())
|
||||
self.prefix_db.db_state.stage_put((), (
|
||||
self.genesis_bytes, self.db_height, self.db_tx_count, self.db_tip,
|
||||
self.utxo_flush_count, int(self.wall_time), self.first_sync, self.db_version,
|
||||
self.hist_flush_count, self.hist_comp_flush_count, self.hist_comp_cursor,
|
||||
self.es_sync_height
|
||||
self.genesis_bytes, self.db_height, self.db_tx_count, self.db_tip,
|
||||
self.utxo_flush_count, int(self.wall_time), self.catching_up, self.db_version,
|
||||
self.hist_flush_count, self.hist_comp_flush_count, self.hist_comp_cursor,
|
||||
self.es_sync_height
|
||||
)
|
||||
)
|
||||
|
||||
|
@ -1062,7 +1062,7 @@ class HubDB:
|
|||
self.db_version = max(self.DB_VERSIONS)
|
||||
self.utxo_flush_count = 0
|
||||
self.wall_time = 0
|
||||
self.first_sync = True
|
||||
self.catching_up = True
|
||||
self.hist_flush_count = 0
|
||||
self.hist_comp_flush_count = -1
|
||||
self.hist_comp_cursor = -1
|
||||
|
@ -1083,7 +1083,7 @@ class HubDB:
|
|||
self.db_tip = state.tip
|
||||
self.utxo_flush_count = state.utxo_flush_count
|
||||
self.wall_time = state.wall_time
|
||||
self.first_sync = state.first_sync
|
||||
self.catching_up = state.catching_up
|
||||
self.hist_flush_count = state.hist_flush_count
|
||||
self.hist_comp_flush_count = state.comp_flush_count
|
||||
self.hist_comp_cursor = state.comp_cursor
|
||||
|
@ -1097,7 +1097,7 @@ class HubDB:
|
|||
assert self.db_height == state.height, f"{self.db_height} != {state.height}"
|
||||
assert self.db_tx_count == state.tx_count, f"{self.db_tx_count} != {state.tx_count}"
|
||||
assert self.db_tip == state.tip, f"{self.db_tip} != {state.tip}"
|
||||
assert self.first_sync == state.first_sync, f"{self.first_sync} != {state.first_sync}"
|
||||
assert self.catching_up == state.catching_up, f"{self.catching_up} != {state.catching_up}"
|
||||
assert self.es_sync_height == state.es_sync_height, f"{self.es_sync_height} != {state.es_sync_height}"
|
||||
|
||||
async def all_utxos(self, hashX):
|
||||
|
|
|
@ -422,7 +422,7 @@ class DBState(typing.NamedTuple):
|
|||
tip: bytes
|
||||
utxo_flush_count: int
|
||||
wall_time: int
|
||||
first_sync: bool
|
||||
catching_up: bool
|
||||
db_version: int
|
||||
hist_flush_count: int
|
||||
comp_flush_count: int
|
||||
|
@ -1439,11 +1439,11 @@ class DBStatePrefixRow(PrefixRow):
|
|||
|
||||
@classmethod
|
||||
def pack_value(cls, genesis: bytes, height: int, tx_count: int, tip: bytes, utxo_flush_count: int, wall_time: int,
|
||||
first_sync: bool, db_version: int, hist_flush_count: int, comp_flush_count: int,
|
||||
catching_up: bool, db_version: int, hist_flush_count: int, comp_flush_count: int,
|
||||
comp_cursor: int, es_sync_height: int) -> bytes:
|
||||
return super().pack_value(
|
||||
genesis, height, tx_count, tip, utxo_flush_count,
|
||||
wall_time, 1 if first_sync else 0, db_version, hist_flush_count,
|
||||
wall_time, 1 if catching_up else 0, db_version, hist_flush_count,
|
||||
comp_flush_count, comp_cursor, es_sync_height
|
||||
)
|
||||
|
||||
|
@ -1457,10 +1457,10 @@ class DBStatePrefixRow(PrefixRow):
|
|||
|
||||
@classmethod
|
||||
def pack_item(cls, genesis: bytes, height: int, tx_count: int, tip: bytes, utxo_flush_count: int, wall_time: int,
|
||||
first_sync: bool, db_version: int, hist_flush_count: int, comp_flush_count: int,
|
||||
catching_up: bool, db_version: int, hist_flush_count: int, comp_flush_count: int,
|
||||
comp_cursor: int, es_sync_height: int):
|
||||
return cls.pack_key(), cls.pack_value(
|
||||
genesis, height, tx_count, tip, utxo_flush_count, wall_time, first_sync, db_version, hist_flush_count,
|
||||
genesis, height, tx_count, tip, utxo_flush_count, wall_time, catching_up, db_version, hist_flush_count,
|
||||
comp_flush_count, comp_cursor, es_sync_height
|
||||
)
|
||||
|
||||
|
|
|
@ -336,7 +336,24 @@ class ElasticSyncService(BlockchainReaderService):
|
|||
self.log.info("reindex (last wrote: %i, db height: %i)", self._last_wrote_height, self.db.db_height)
|
||||
await self._reindex()
|
||||
|
||||
async def block_bulk_sync_on_writer_catchup(self):
|
||||
def _check_if_catching_up():
|
||||
self.db.prefix_db.try_catch_up_with_primary()
|
||||
state = self.db.prefix_db.db_state.get()
|
||||
return state.catching_up
|
||||
|
||||
loop = asyncio.get_event_loop()
|
||||
|
||||
catching_up = True
|
||||
while catching_up:
|
||||
catching_up = await loop.run_in_executor(self._executor, _check_if_catching_up)
|
||||
if catching_up:
|
||||
await asyncio.sleep(1)
|
||||
else:
|
||||
return
|
||||
|
||||
def _iter_start_tasks(self):
|
||||
yield self.block_bulk_sync_on_writer_catchup()
|
||||
yield self.read_es_height()
|
||||
yield self.start_index()
|
||||
yield self.start_cancellable(self.run_es_notifier)
|
||||
|
|
|
@ -92,8 +92,9 @@ class HubServerService(BlockchainReaderService):
|
|||
def _iter_start_tasks(self):
|
||||
yield self.start_status_server()
|
||||
yield self.start_cancellable(self.es_notification_client.maintain_connection)
|
||||
yield self.start_cancellable(self.receive_es_notifications)
|
||||
yield self.start_cancellable(self.refresh_blocks_forever)
|
||||
yield self.finished_initial_catch_up.wait()
|
||||
yield self.start_cancellable(self.receive_es_notifications)
|
||||
yield self.session_manager.search_index.start()
|
||||
yield self.start_cancellable(self.session_manager.serve, self.mempool)
|
||||
|
||||
|
|
|
@ -140,6 +140,7 @@ class BlockchainReaderService(BlockchainService):
|
|||
super().__init__(env, secondary_name, thread_workers, thread_prefix)
|
||||
self._refresh_interval = 0.1
|
||||
self.prometheus_server: typing.Optional[PrometheusServer] = None
|
||||
self.finished_initial_catch_up = asyncio.Event()
|
||||
|
||||
async def poll_for_changes(self):
|
||||
"""
|
||||
|
@ -233,6 +234,8 @@ class BlockchainReaderService(BlockchainService):
|
|||
try:
|
||||
async with self.lock:
|
||||
await self.poll_for_changes()
|
||||
if not self.db.catching_up and not self.finished_initial_catch_up.is_set():
|
||||
self.finished_initial_catch_up.set()
|
||||
except asyncio.CancelledError:
|
||||
raise
|
||||
except:
|
||||
|
|
Loading…
Reference in a new issue