prune stale trending notifications during initial bulk sync

This commit is contained in:
Jack Robison 2022-06-29 12:17:39 -04:00
parent 903a44d991
commit 2363865e00

View file

@ -435,6 +435,11 @@ class ElasticSyncService(BlockchainReaderService):
for k, v in self.db.prefix_db.trending_notification.iterate(start=(batch_height,), stop=(batch_height+10000,)): for k, v in self.db.prefix_db.trending_notification.iterate(start=(batch_height,), stop=(batch_height+10000,)):
notifications[k.claim_hash].append(TrendingNotification(k.height, v.previous_amount, v.new_amount)) notifications[k.claim_hash].append(TrendingNotification(k.height, v.previous_amount, v.new_amount))
async for (k,), v in self.db.prefix_db.claim_to_txo.multi_get_async_gen(
self._executor, [(claim_hash,) for claim_hash in notifications]):
if not v:
notifications.pop(k)
for claim_hash, trending in notifications.items(): for claim_hash, trending in notifications.items():
yield self._update_trending_query(self.index, claim_hash, trending) yield self._update_trending_query(self.index, claim_hash, trending)
self._trending.clear() self._trending.clear()
@ -452,7 +457,7 @@ class ElasticSyncService(BlockchainReaderService):
else: else:
success += 1 success += 1
if cnt % batch_size == 0: if cnt % batch_size == 0:
self.log.info(f"indexed {success} claims") self.log.info(f"indexed {success}/{cnt} claims")
finished = True finished = True
await self.sync_client.indices.refresh(self.index) await self.sync_client.indices.refresh(self.index)
self.log.info("indexed %i/%i claims", success, cnt) self.log.info("indexed %i/%i claims", success, cnt)