fix scribe-elastic-sync catching up with a database that's since advanced

This commit is contained in:
Jack Robison 2022-03-16 18:41:53 -04:00
parent 50b3acb4e6
commit 28356b49dc
No known key found for this signature in database
GPG key ID: DF25C68FE0239BB2

View file

@ -292,6 +292,42 @@ class ElasticWriter(BaseBlockchainReader):
def last_synced_height(self) -> int: def last_synced_height(self) -> int:
return self._last_wrote_height return self._last_wrote_height
async def catch_up(self):
last_state = self.db.prefix_db.db_state.get()
db_height = last_state.height
if last_state and self._last_wrote_height and db_height > self._last_wrote_height:
self.log.info(
"syncing ES from block %i to rocksdb height of %i", self._last_wrote_height, last_state.height
)
for height in range(self._last_wrote_height + 1, last_state.height + 1):
self.advance(height)
else:
return
success = 0
cnt = 0
if self._touched_claims or self._deleted_claims or self._trending:
async for ok, item in async_streaming_bulk(
self.sync_client, self._claim_producer(),
raise_on_error=False):
cnt += 1
if not ok:
self.log.warning("indexing failed for an item: %s", item)
else:
success += 1
await self.sync_client.indices.refresh(self.index)
await self.apply_filters(
self.db.blocked_streams, self.db.blocked_channels, self.db.filtered_streams,
self.db.filtered_channels
)
self.write_es_height(db_height, last_state.tip[::-1].hex())
self._touched_claims.clear()
self._deleted_claims.clear()
self._removed_during_undo.clear()
self._trending.clear()
self._advanced = False
self.notify_es_notification_listeners(self._last_wrote_height, last_state.tip)
self.log.info("Indexing block %i done. %i/%i successful", self._last_wrote_height, success, cnt)
async def reindex(self, force=False): async def reindex(self, force=False):
if force or self._last_wrote_height == 0 and self.db.db_height > 0: if force or self._last_wrote_height == 0 and self.db.db_height > 0:
if self._last_wrote_height == 0: if self._last_wrote_height == 0:
@ -305,6 +341,7 @@ class ElasticWriter(BaseBlockchainReader):
yield self.start_index() yield self.start_index()
yield self._start_cancellable(self.run_es_notifier) yield self._start_cancellable(self.run_es_notifier)
yield self.reindex(force=self._force_reindex) yield self.reindex(force=self._force_reindex)
yield self.catch_up()
yield self._start_cancellable(self.refresh_blocks_forever) yield self._start_cancellable(self.refresh_blocks_forever)
def _iter_stop_tasks(self): def _iter_stop_tasks(self):