diff --git a/lbry/wallet/server/db/elasticsearch/sync.py b/lbry/wallet/server/db/elasticsearch/sync.py index 699f0709c..ebb0efc4b 100644 --- a/lbry/wallet/server/db/elasticsearch/sync.py +++ b/lbry/wallet/server/db/elasticsearch/sync.py @@ -165,42 +165,54 @@ class ElasticWriter(BlockchainReader): self.index, body=self.update_filter_query(Censor.RESOLVE, blocked_channels, True), slices=4) await self.sync_client.indices.refresh(self.index) + @staticmethod + def _upsert_claim_query(index, claim): + return { + 'doc': {key: value for key, value in claim.items() if key in ALL_FIELDS}, + '_id': claim['claim_id'], + '_index': index, + '_op_type': 'update', + 'doc_as_upsert': True + } + + @staticmethod + def _delete_claim_query(index, claim_hash: bytes): + return { + '_index': index, + '_op_type': 'delete', + '_id': claim_hash.hex() + } + + @staticmethod + def _update_trending_query(index, claim_hash, notifications): + return { + '_id': claim_hash.hex(), + '_index': index, + '_op_type': 'update', + 'script': { + 'lang': 'painless', + 'source': FAST_AR_TRENDING_SCRIPT, + 'params': {'src': { + 'changes': [ + { + 'height': notify_height, + 'prev_amount': trending_v.previous_amount / 1E8, + 'new_amount': trending_v.new_amount / 1E8, + } for (notify_height, trending_v) in notifications + ] + }} + }, + } + async def _claim_producer(self): for deleted in self._deleted_claims: - yield { - '_index': self.index, - '_op_type': 'delete', - '_id': deleted.hex() - } + yield self._delete_claim_query(self.index, deleted) for touched in self._touched_claims: claim = self.db.claim_producer(touched) if claim: - yield { - 'doc': {key: value for key, value in claim.items() if key in ALL_FIELDS}, - '_id': claim['claim_id'], - '_index': self.index, - '_op_type': 'update', - 'doc_as_upsert': True - } + yield self._upsert_claim_query(self.index, claim) for claim_hash, notifications in self._trending.items(): - yield { - '_id': claim_hash.hex(), - '_index': self.index, - '_op_type': 'update', - 'script': { - 'lang': 'painless', - 'source': FAST_AR_TRENDING_SCRIPT, - 'params': {'src': { - 'changes': [ - { - 'height': notify_height, - 'prev_amount': trending_v.previous_amount / 1E8, - 'new_amount': trending_v.new_amount / 1E8, - } for (notify_height, trending_v) in notifications - ] - }} - }, - } + yield self._update_trending_query(self.index, claim_hash, notifications) def advance(self, height: int): super().advance(height)