From f9471f297eac6d0697a6a471e5ba17e47d571207 Mon Sep 17 00:00:00 2001 From: Victor Shyba Date: Fri, 29 Jan 2021 03:41:53 -0300 Subject: [PATCH] apply filter and block from ES script lang --- lbry/wallet/server/block_processor.py | 2 ++ lbry/wallet/server/db/elastic_search.py | 28 +++++++++++++++++++++++++ lbry/wallet/server/db/writer.py | 10 +++------ 3 files changed, 33 insertions(+), 7 deletions(-) diff --git a/lbry/wallet/server/block_processor.py b/lbry/wallet/server/block_processor.py index 7aa56c996..0a2e567b0 100644 --- a/lbry/wallet/server/block_processor.py +++ b/lbry/wallet/server/block_processor.py @@ -217,6 +217,8 @@ class BlockProcessor: start = time.perf_counter() await self.run_in_thread_with_lock(self.advance_blocks, blocks) await self.db.search_index.sync_queue(self.sql.claim_queue) + await self.db.search_index.apply_filters(self.sql.blocked_streams, self.sql.blocked_channels, + self.sql.filtered_streams, self.sql.filtered_channels) for cache in self.search_cache.values(): cache.clear() self.history_cache.clear() diff --git a/lbry/wallet/server/db/elastic_search.py b/lbry/wallet/server/db/elastic_search.py index ee3048a12..fb0dc61c9 100644 --- a/lbry/wallet/server/db/elastic_search.py +++ b/lbry/wallet/server/db/elastic_search.py @@ -77,6 +77,34 @@ class SearchIndex: await self.update(to_update) await self.client.indices.refresh(self.index) + async def apply_filters(self, blocked_streams, blocked_channels, filtered_streams, filtered_channels): + def make_query(censor_type, blockdict, channels=False): + blockdict = dict( + (hexlify(key[::-1]).decode(), hexlify(value[::-1]).decode()) for key, value in blockdict.items()) + if channels: + update = expand_query(channel_id__in=list(blockdict.keys())) + else: + update = expand_query(claim_id__in=list(blockdict.keys())) + key = 'channel_id' if channels else 'claim_id' + update['script'] = { + "source": f"ctx._source.censor_type={censor_type}; ctx._source.censoring_channel_hash=params[ctx._source.{key}]", + "lang": "painless", + "params": blockdict + } + return update + if filtered_streams: + await self.client.update_by_query(self.index, body=make_query(1, filtered_streams)) + await self.client.indices.refresh(self.index) + if filtered_channels: + await self.client.update_by_query(self.index, body=make_query(1, filtered_channels, True)) + await self.client.indices.refresh(self.index) + if blocked_streams: + await self.client.update_by_query(self.index, body=make_query(2, blocked_streams)) + await self.client.indices.refresh(self.index) + if blocked_channels: + await self.client.update_by_query(self.index, body=make_query(2, blocked_channels, True)) + await self.client.indices.refresh(self.index) + async def update(self, claims): if not claims: return diff --git a/lbry/wallet/server/db/writer.py b/lbry/wallet/server/db/writer.py index 68e3b4fe0..13dc31bd6 100644 --- a/lbry/wallet/server/db/writer.py +++ b/lbry/wallet/server/db/writer.py @@ -809,9 +809,6 @@ class SQLDB: def enqueue_changes(self, changed_claim_hashes, deleted_claims): if not changed_claim_hashes and not deleted_claims: return - blocklist = set(self.blocked_streams.keys()) | set(self.filtered_streams.keys()) - blocked_channels = set(self.blocked_channels.keys()) | set(self.filtered_channels.keys()) - changed_claim_hashes |= blocklist | blocked_channels for claim in self.execute(f""" SELECT claimtrie.claim_hash as is_controlling, claimtrie.last_take_over_height, @@ -820,13 +817,12 @@ class SQLDB: claim.* FROM claim LEFT JOIN claimtrie USING (claim_hash) WHERE claim_hash IN ({','.join('?' for _ in changed_claim_hashes)}) - OR channel_hash IN ({','.join('?' for _ in blocked_channels)}) - """, list(changed_claim_hashes) + list(blocked_channels)): - claim = dict(claim._asdict()) + """, list(changed_claim_hashes)): + claim = claim._asdict() id_set = set(filter(None, (claim['claim_hash'], claim['channel_hash'], claim['reposted_claim_hash']))) claim['censor_type'] = 0 claim['censoring_channel_hash'] = None - for reason_id in id_set.intersection(blocklist | blocked_channels): + for reason_id in id_set: if reason_id in self.blocked_streams: claim['censor_type'] = 2 claim['censoring_channel_hash'] = self.blocked_streams.get(reason_id)