forked from LBRYCommunity/lbry-sdk
apply filter and block from ES script lang
This commit is contained in:
parent
146b693e4a
commit
f9471f297e
3 changed files with 33 additions and 7 deletions
|
@ -217,6 +217,8 @@ class BlockProcessor:
|
||||||
start = time.perf_counter()
|
start = time.perf_counter()
|
||||||
await self.run_in_thread_with_lock(self.advance_blocks, blocks)
|
await self.run_in_thread_with_lock(self.advance_blocks, blocks)
|
||||||
await self.db.search_index.sync_queue(self.sql.claim_queue)
|
await self.db.search_index.sync_queue(self.sql.claim_queue)
|
||||||
|
await self.db.search_index.apply_filters(self.sql.blocked_streams, self.sql.blocked_channels,
|
||||||
|
self.sql.filtered_streams, self.sql.filtered_channels)
|
||||||
for cache in self.search_cache.values():
|
for cache in self.search_cache.values():
|
||||||
cache.clear()
|
cache.clear()
|
||||||
self.history_cache.clear()
|
self.history_cache.clear()
|
||||||
|
|
|
@ -77,6 +77,34 @@ class SearchIndex:
|
||||||
await self.update(to_update)
|
await self.update(to_update)
|
||||||
await self.client.indices.refresh(self.index)
|
await self.client.indices.refresh(self.index)
|
||||||
|
|
||||||
|
async def apply_filters(self, blocked_streams, blocked_channels, filtered_streams, filtered_channels):
|
||||||
|
def make_query(censor_type, blockdict, channels=False):
|
||||||
|
blockdict = dict(
|
||||||
|
(hexlify(key[::-1]).decode(), hexlify(value[::-1]).decode()) for key, value in blockdict.items())
|
||||||
|
if channels:
|
||||||
|
update = expand_query(channel_id__in=list(blockdict.keys()))
|
||||||
|
else:
|
||||||
|
update = expand_query(claim_id__in=list(blockdict.keys()))
|
||||||
|
key = 'channel_id' if channels else 'claim_id'
|
||||||
|
update['script'] = {
|
||||||
|
"source": f"ctx._source.censor_type={censor_type}; ctx._source.censoring_channel_hash=params[ctx._source.{key}]",
|
||||||
|
"lang": "painless",
|
||||||
|
"params": blockdict
|
||||||
|
}
|
||||||
|
return update
|
||||||
|
if filtered_streams:
|
||||||
|
await self.client.update_by_query(self.index, body=make_query(1, filtered_streams))
|
||||||
|
await self.client.indices.refresh(self.index)
|
||||||
|
if filtered_channels:
|
||||||
|
await self.client.update_by_query(self.index, body=make_query(1, filtered_channels, True))
|
||||||
|
await self.client.indices.refresh(self.index)
|
||||||
|
if blocked_streams:
|
||||||
|
await self.client.update_by_query(self.index, body=make_query(2, blocked_streams))
|
||||||
|
await self.client.indices.refresh(self.index)
|
||||||
|
if blocked_channels:
|
||||||
|
await self.client.update_by_query(self.index, body=make_query(2, blocked_channels, True))
|
||||||
|
await self.client.indices.refresh(self.index)
|
||||||
|
|
||||||
async def update(self, claims):
|
async def update(self, claims):
|
||||||
if not claims:
|
if not claims:
|
||||||
return
|
return
|
||||||
|
|
|
@ -809,9 +809,6 @@ class SQLDB:
|
||||||
def enqueue_changes(self, changed_claim_hashes, deleted_claims):
|
def enqueue_changes(self, changed_claim_hashes, deleted_claims):
|
||||||
if not changed_claim_hashes and not deleted_claims:
|
if not changed_claim_hashes and not deleted_claims:
|
||||||
return
|
return
|
||||||
blocklist = set(self.blocked_streams.keys()) | set(self.filtered_streams.keys())
|
|
||||||
blocked_channels = set(self.blocked_channels.keys()) | set(self.filtered_channels.keys())
|
|
||||||
changed_claim_hashes |= blocklist | blocked_channels
|
|
||||||
for claim in self.execute(f"""
|
for claim in self.execute(f"""
|
||||||
SELECT claimtrie.claim_hash as is_controlling,
|
SELECT claimtrie.claim_hash as is_controlling,
|
||||||
claimtrie.last_take_over_height,
|
claimtrie.last_take_over_height,
|
||||||
|
@ -820,13 +817,12 @@ class SQLDB:
|
||||||
claim.*
|
claim.*
|
||||||
FROM claim LEFT JOIN claimtrie USING (claim_hash)
|
FROM claim LEFT JOIN claimtrie USING (claim_hash)
|
||||||
WHERE claim_hash IN ({','.join('?' for _ in changed_claim_hashes)})
|
WHERE claim_hash IN ({','.join('?' for _ in changed_claim_hashes)})
|
||||||
OR channel_hash IN ({','.join('?' for _ in blocked_channels)})
|
""", list(changed_claim_hashes)):
|
||||||
""", list(changed_claim_hashes) + list(blocked_channels)):
|
claim = claim._asdict()
|
||||||
claim = dict(claim._asdict())
|
|
||||||
id_set = set(filter(None, (claim['claim_hash'], claim['channel_hash'], claim['reposted_claim_hash'])))
|
id_set = set(filter(None, (claim['claim_hash'], claim['channel_hash'], claim['reposted_claim_hash'])))
|
||||||
claim['censor_type'] = 0
|
claim['censor_type'] = 0
|
||||||
claim['censoring_channel_hash'] = None
|
claim['censoring_channel_hash'] = None
|
||||||
for reason_id in id_set.intersection(blocklist | blocked_channels):
|
for reason_id in id_set:
|
||||||
if reason_id in self.blocked_streams:
|
if reason_id in self.blocked_streams:
|
||||||
claim['censor_type'] = 2
|
claim['censor_type'] = 2
|
||||||
claim['censoring_channel_hash'] = self.blocked_streams.get(reason_id)
|
claim['censoring_channel_hash'] = self.blocked_streams.get(reason_id)
|
||||||
|
|
Loading…
Add table
Reference in a new issue