implement blocking and filtering

This commit is contained in:
Victor Shyba 2021-07-29 23:01:13 -03:00
parent bc0c1b9a3e
commit 08d635322e
6 changed files with 98 additions and 37 deletions

View file

@ -1,6 +1,5 @@
import base64
import struct
from typing import List, TYPE_CHECKING, Union
from typing import List, TYPE_CHECKING, Union, Optional
from binascii import hexlify
from itertools import chain
@ -43,19 +42,19 @@ class Censor:
def apply(self, rows):
return [row for row in rows if not self.censor(row)]
def censor(self, row) -> bool:
def censor(self, row) -> Optional[bytes]:
if self.is_censored(row):
censoring_channel_hash = row['censoring_channel_hash']
self.censored.setdefault(censoring_channel_hash, set())
self.censored[censoring_channel_hash].add(row['tx_hash'])
return True
return False
return censoring_channel_hash
return None
def to_message(self, outputs: OutputsMessage, extra_txo_rows: dict):
for censoring_channel_hash, count in self.censored.items():
blocked = outputs.blocked.add()
blocked.count = len(count)
set_reference(blocked.channel, extra_txo_rows.get(censoring_channel_hash))
set_reference(blocked.channel, censoring_channel_hash, extra_txo_rows)
outputs.blocked_total += len(count)
@ -178,8 +177,8 @@ class Outputs:
page.offset = offset
if total is not None:
page.total = total
# if blocked is not None:
# blocked.to_message(page, extra_txo_rows)
if blocked is not None:
blocked.to_message(page, extra_txo_rows)
for row in extra_txo_rows:
cls.encode_txo(page.extra_txos.add(), row)
@ -192,7 +191,8 @@ class Outputs:
set_reference(txo_message.claim.channel, row.channel_hash, extra_txo_rows)
if row.reposted_claim_hash:
set_reference(txo_message.claim.repost, row.reposted_claim_hash, extra_txo_rows)
# set_reference(txo_message.error.blocked.channel, row.censor_hash, extra_txo_rows)
elif isinstance(row, ResolveCensoredError):
set_reference(txo_message.error.blocked.channel, row.censor_hash, extra_txo_rows)
return page.SerializeToString()
@classmethod

View file

@ -318,7 +318,10 @@ class BlockProcessor:
await self.run_in_thread(self.db.apply_expiration_extension_fork)
# TODO: we shouldnt wait on the search index updating before advancing to the next block
if not self.db.first_sync:
self.db.reload_blocking_filtering_streams()
await self.db.search_index.claim_consumer(self.claim_producer())
await self.db.search_index.apply_filters(self.db.blocked_streams, self.db.blocked_channels,
self.db.filtered_streams, self.db.filtered_channels)
self.db.search_index.clear_caches()
self.touched_claims_to_send_es.clear()
self.removed_claims_to_send_es.clear()

View file

@ -130,7 +130,7 @@ class SearchIndex:
self.logger.debug("Indexing done.")
def update_filter_query(self, censor_type, blockdict, channels=False):
blockdict = {key[::-1].hex(): value[::-1].hex() for key, value in blockdict.items()}
blockdict = {key.hex(): value.hex() for key, value in blockdict.items()}
if channels:
update = expand_query(channel_id__in=list(blockdict.keys()), censor_type=f"<{censor_type}")
else:
@ -483,7 +483,7 @@ def extract_doc(doc, index):
channel_hash = doc.pop('channel_hash')
doc['channel_id'] = channel_hash[::-1].hex() if channel_hash else channel_hash
channel_hash = doc.pop('censoring_channel_hash')
doc['censoring_channel_hash'] = channel_hash[::-1].hex() if channel_hash else channel_hash
doc['censoring_channel_hash'] = channel_hash.hex() if channel_hash else channel_hash
# txo_hash = doc.pop('txo_hash')
# doc['tx_id'] = txo_hash[:32][::-1].hex()
# doc['tx_nout'] = struct.unpack('<I', txo_hash[32:])[0]

View file

@ -23,6 +23,9 @@ from functools import partial
from asyncio import sleep
from bisect import bisect_right
from collections import defaultdict, OrderedDict
from lbry.error import ResolveCensoredError
from lbry.schema.result import Censor
from lbry.utils import LRUCacheWithMetrics
from lbry.schema.url import URL
from lbry.wallet.server import util
@ -115,6 +118,20 @@ class LevelDB:
self.hist_comp_flush_count = -1
self.hist_comp_cursor = -1
# blocking/filtering dicts
blocking_channels = self.env.default('BLOCKING_CHANNEL_IDS', '').split(' ')
filtering_channels = self.env.default('FILTERING_CHANNEL_IDS', '').split(' ')
self.blocked_streams = {}
self.blocked_channels = {}
self.blocking_channel_hashes = {
bytes.fromhex(channel_id)[::-1] for channel_id in blocking_channels if channel_id
}
self.filtered_streams = {}
self.filtered_channels = {}
self.filtering_channel_hashes = {
bytes.fromhex(channel_id)[::-1] for channel_id in filtering_channels if channel_id
}
self.tx_counts = None
self.headers = None
self.encoded_headers = LRUCacheWithMetrics(1 << 21, metric_name='encoded_headers', namespace='wallet_server')
@ -352,6 +369,16 @@ class LevelDB:
if not resolved_stream:
return LookupError(f'Could not find claim at "{url}".'), None
if resolved_stream or resolved_channel:
claim_hash = resolved_stream.claim_hash if resolved_stream else resolved_channel.claim_hash
claim = resolved_stream if resolved_stream else resolved_channel
reposted_claim_hash = resolved_stream.reposted_claim_hash if resolved_stream else None
blocker_hash = self.blocked_streams.get(claim_hash) or self.blocked_streams.get(
reposted_claim_hash) or self.blocked_channels.get(claim_hash) or self.blocked_channels.get(
reposted_claim_hash) or self.blocked_channels.get(claim.channel_hash)
if blocker_hash:
reason_row = self._fs_get_claim_by_hash(blocker_hash)
return None, ResolveCensoredError(url, blocker_hash, censor_row=reason_row)
return resolved_stream, resolved_channel
async def fs_resolve(self, url) -> typing.Tuple[OptionalResolveResultOrError, OptionalResolveResultOrError]:
@ -435,6 +462,31 @@ class LevelDB:
count += 1
return count
def reload_blocking_filtering_streams(self):
self.blocked_streams, self.blocked_channels = self.get_streams_and_channels_reposted_by_channel_hashes(self.blocking_channel_hashes)
self.filtered_streams, self.filtered_channels = self.get_streams_and_channels_reposted_by_channel_hashes(self.filtering_channel_hashes)
def get_streams_and_channels_reposted_by_channel_hashes(self, reposter_channel_hashes: bytes):
streams, channels = {}, {}
for reposter_channel_hash in reposter_channel_hashes:
reposts = self.get_reposts_in_channel(reposter_channel_hash)
for repost in reposts:
txo = self.get_claim_txo(repost)
if txo.name.startswith('@'):
channels[repost] = reposter_channel_hash
else:
streams[repost] = reposter_channel_hash
return streams, channels
def get_reposts_in_channel(self, channel_hash):
reposts = set()
for value in self.db.iterator(prefix=Prefixes.channel_to_claim.pack_partial_key(channel_hash), include_key=False):
stream = Prefixes.channel_to_claim.unpack_value(value)
repost = self.get_repost(stream.claim_hash)
if repost:
reposts.add(repost)
return reposts
def get_channel_for_claim(self, claim_hash, tx_num, position) -> Optional[bytes]:
return self.db.get(Prefixes.claim_to_channel.pack_key(claim_hash, tx_num, position))
@ -542,18 +594,22 @@ class LevelDB:
)
return
if reposted_metadata:
reposted_tags = [] if not reposted_metadata.is_stream else [tag for tag in reposted_metadata.stream.tags]
reposted_languages = [] if not reposted_metadata.is_stream else (
[lang.language or 'none' for lang in reposted_metadata.stream.languages] or ['none']
)
meta = reposted_metadata.stream if reposted_metadata.is_stream else reposted_metadata.channel
reposted_tags = [tag for tag in meta.tags]
reposted_languages = [lang.language or 'none' for lang in meta.languages] or ['none']
reposted_has_source = False if not reposted_metadata.is_stream else reposted_metadata.stream.has_source
reposted_claim_type = CLAIM_TYPES[reposted_metadata.claim_type]
claim_tags = [] if not metadata.is_stream else [tag for tag in metadata.stream.tags]
claim_languages = [] if not metadata.is_stream else (
[lang.language or 'none' for lang in metadata.stream.languages] or ['none']
)
lang_tags = metadata.stream if metadata.is_stream else metadata.channel if metadata.is_channel else metadata.repost
claim_tags = [tag for tag in lang_tags.tags]
claim_languages = [lang.language or 'none' for lang in lang_tags.languages] or ['none']
tags = list(set(claim_tags).union(set(reposted_tags)))
languages = list(set(claim_languages).union(set(reposted_languages)))
blocked_hash = self.blocked_streams.get(claim_hash) or self.blocked_streams.get(
reposted_claim_hash) or self.blocked_channels.get(claim_hash) or self.blocked_channels.get(
reposted_claim_hash) or self.blocked_channels.get(claim.channel_hash)
filtered_hash = self.filtered_streams.get(claim_hash) or self.filtered_streams.get(
reposted_claim_hash) or self.filtered_channels.get(claim_hash) or self.filtered_channels.get(
reposted_claim_hash) or self.filtered_channels.get(claim.channel_hash)
value = {
'claim_hash': claim_hash[::-1],
# 'claim_id': claim_hash.hex(),
@ -603,8 +659,8 @@ class LevelDB:
'signature_valid': claim.signature_valid,
'tags': tags,
'languages': languages,
'censor_type': 0, # TODO: fix
'censoring_channel_hash': None, # TODO: fix
'censor_type': Censor.RESOLVE if blocked_hash else Censor.SEARCH if filtered_hash else Censor.NOT_CENSORED,
'censoring_channel_hash': blocked_hash or filtered_hash or None,
'claims_in_channel': None if not metadata.is_channel else self.get_claims_in_channel_count(claim_hash)
# 'trending_group': 0,
# 'trending_mixed': 0,

View file

@ -21,7 +21,7 @@ from elasticsearch import ConnectionTimeout
from prometheus_client import Counter, Info, Histogram, Gauge
import lbry
from lbry.error import TooManyClaimSearchParametersError
from lbry.error import ResolveCensoredError, TooManyClaimSearchParametersError
from lbry.build_info import BUILD, COMMIT_HASH, DOCKER_TAG
from lbry.schema.result import Outputs
from lbry.wallet.server.block_processor import BlockProcessor
@ -997,7 +997,13 @@ class LBRYElectrumX(SessionBase):
self.session_mgr.urls_to_resolve_count_metric.inc()
stream, channel = await self.db.fs_resolve(url)
self.session_mgr.resolved_url_count_metric.inc()
if channel and not stream:
if isinstance(channel, ResolveCensoredError):
rows.append(channel)
extra.append(channel.censor_row)
elif isinstance(stream, ResolveCensoredError):
rows.append(stream)
extra.append(stream.censor_row)
elif channel and not stream:
rows.append(channel)
# print("resolved channel", channel.name.decode())
elif stream:

View file

@ -1492,12 +1492,10 @@ class StreamCommands(ClaimTestCase):
filtering_channel_id = self.get_claim_id(
await self.channel_create('@filtering', '0.1')
)
# self.conductor.spv_node.server.db.sql.filtering_channel_hashes.add(
# unhexlify(filtering_channel_id)[::-1]
# )
self.assertEqual(0, len(self.conductor.spv_node.server.db.sql.filtered_streams))
self.conductor.spv_node.server.db.filtering_channel_hashes.add(bytes.fromhex(filtering_channel_id))
self.assertEqual(0, len(self.conductor.spv_node.server.db.filtered_streams))
await self.stream_repost(bad_content_id, 'filter1', '0.1', channel_name='@filtering')
self.assertEqual(1, len(self.conductor.spv_node.server.db.sql.filtered_streams))
self.assertEqual(1, len(self.conductor.spv_node.server.db.filtered_streams))
# search for filtered content directly
result = await self.out(self.daemon.jsonrpc_claim_search(name='bad_content'))
@ -1539,12 +1537,10 @@ class StreamCommands(ClaimTestCase):
blocking_channel_id = self.get_claim_id(
await self.channel_create('@blocking', '0.1')
)
self.conductor.spv_node.server.db.sql.blocking_channel_hashes.add(
unhexlify(blocking_channel_id)[::-1]
)
self.assertEqual(0, len(self.conductor.spv_node.server.db.sql.blocked_streams))
self.conductor.spv_node.server.db.blocking_channel_hashes.add(bytes.fromhex(blocking_channel_id))
self.assertEqual(0, len(self.conductor.spv_node.server.db.blocked_streams))
await self.stream_repost(bad_content_id, 'block1', '0.1', channel_name='@blocking')
self.assertEqual(1, len(self.conductor.spv_node.server.db.sql.blocked_streams))
self.assertEqual(1, len(self.conductor.spv_node.server.db.blocked_streams))
# blocked content is not resolveable
error = (await self.resolve('lbry://@some_channel/bad_content'))['error']
@ -1567,9 +1563,9 @@ class StreamCommands(ClaimTestCase):
self.assertEqual('@bad_channel', result['items'][1]['name'])
# filter channel out
self.assertEqual(0, len(self.conductor.spv_node.server.db.sql.filtered_channels))
self.assertEqual(0, len(self.conductor.spv_node.server.db.filtered_channels))
await self.stream_repost(bad_channel_id, 'filter2', '0.1', channel_name='@filtering')
self.assertEqual(1, len(self.conductor.spv_node.server.db.sql.filtered_channels))
self.assertEqual(1, len(self.conductor.spv_node.server.db.filtered_channels))
# same claim search as previous now returns 0 results
result = await self.out(self.daemon.jsonrpc_claim_search(any_tags=['bad-stuff'], order_by=['height']))
@ -1594,9 +1590,9 @@ class StreamCommands(ClaimTestCase):
self.assertEqual(worse_content_id, result['claim_id'])
# block channel
self.assertEqual(0, len(self.conductor.spv_node.server.db.sql.blocked_channels))
self.assertEqual(0, len(self.conductor.spv_node.server.db.blocked_channels))
await self.stream_repost(bad_channel_id, 'block2', '0.1', channel_name='@blocking')
self.assertEqual(1, len(self.conductor.spv_node.server.db.sql.blocked_channels))
self.assertEqual(1, len(self.conductor.spv_node.server.db.blocked_channels))
# channel, claim in channel or claim individually no longer resolve
self.assertEqual((await self.resolve('lbry://@bad_channel'))['error']['name'], 'BLOCKED')