From 08d635322e08b2f189b1cfa8f9095a20b7721b1b Mon Sep 17 00:00:00 2001 From: Victor Shyba Date: Thu, 29 Jul 2021 23:01:13 -0300 Subject: [PATCH] implement blocking and filtering --- lbry/schema/result.py | 18 ++--- lbry/wallet/server/block_processor.py | 3 + lbry/wallet/server/db/elasticsearch/search.py | 4 +- lbry/wallet/server/leveldb.py | 76 ++++++++++++++++--- lbry/wallet/server/session.py | 10 ++- .../blockchain/test_claim_commands.py | 24 +++--- 6 files changed, 98 insertions(+), 37 deletions(-) diff --git a/lbry/schema/result.py b/lbry/schema/result.py index d9da9911f..5e3bf54b9 100644 --- a/lbry/schema/result.py +++ b/lbry/schema/result.py @@ -1,6 +1,5 @@ import base64 -import struct -from typing import List, TYPE_CHECKING, Union +from typing import List, TYPE_CHECKING, Union, Optional from binascii import hexlify from itertools import chain @@ -43,19 +42,19 @@ class Censor: def apply(self, rows): return [row for row in rows if not self.censor(row)] - def censor(self, row) -> bool: + def censor(self, row) -> Optional[bytes]: if self.is_censored(row): censoring_channel_hash = row['censoring_channel_hash'] self.censored.setdefault(censoring_channel_hash, set()) self.censored[censoring_channel_hash].add(row['tx_hash']) - return True - return False + return censoring_channel_hash + return None def to_message(self, outputs: OutputsMessage, extra_txo_rows: dict): for censoring_channel_hash, count in self.censored.items(): blocked = outputs.blocked.add() blocked.count = len(count) - set_reference(blocked.channel, extra_txo_rows.get(censoring_channel_hash)) + set_reference(blocked.channel, censoring_channel_hash, extra_txo_rows) outputs.blocked_total += len(count) @@ -178,8 +177,8 @@ class Outputs: page.offset = offset if total is not None: page.total = total - # if blocked is not None: - # blocked.to_message(page, extra_txo_rows) + if blocked is not None: + blocked.to_message(page, extra_txo_rows) for row in extra_txo_rows: cls.encode_txo(page.extra_txos.add(), row) @@ -192,7 +191,8 @@ class Outputs: set_reference(txo_message.claim.channel, row.channel_hash, extra_txo_rows) if row.reposted_claim_hash: set_reference(txo_message.claim.repost, row.reposted_claim_hash, extra_txo_rows) - # set_reference(txo_message.error.blocked.channel, row.censor_hash, extra_txo_rows) + elif isinstance(row, ResolveCensoredError): + set_reference(txo_message.error.blocked.channel, row.censor_hash, extra_txo_rows) return page.SerializeToString() @classmethod diff --git a/lbry/wallet/server/block_processor.py b/lbry/wallet/server/block_processor.py index 98563aaba..386a01f2c 100644 --- a/lbry/wallet/server/block_processor.py +++ b/lbry/wallet/server/block_processor.py @@ -318,7 +318,10 @@ class BlockProcessor: await self.run_in_thread(self.db.apply_expiration_extension_fork) # TODO: we shouldnt wait on the search index updating before advancing to the next block if not self.db.first_sync: + self.db.reload_blocking_filtering_streams() await self.db.search_index.claim_consumer(self.claim_producer()) + await self.db.search_index.apply_filters(self.db.blocked_streams, self.db.blocked_channels, + self.db.filtered_streams, self.db.filtered_channels) self.db.search_index.clear_caches() self.touched_claims_to_send_es.clear() self.removed_claims_to_send_es.clear() diff --git a/lbry/wallet/server/db/elasticsearch/search.py b/lbry/wallet/server/db/elasticsearch/search.py index 3ec121b47..0e333ae22 100644 --- a/lbry/wallet/server/db/elasticsearch/search.py +++ b/lbry/wallet/server/db/elasticsearch/search.py @@ -130,7 +130,7 @@ class SearchIndex: self.logger.debug("Indexing done.") def update_filter_query(self, censor_type, blockdict, channels=False): - blockdict = {key[::-1].hex(): value[::-1].hex() for key, value in blockdict.items()} + blockdict = {key.hex(): value.hex() for key, value in blockdict.items()} if channels: update = expand_query(channel_id__in=list(blockdict.keys()), censor_type=f"<{censor_type}") else: @@ -483,7 +483,7 @@ def extract_doc(doc, index): channel_hash = doc.pop('channel_hash') doc['channel_id'] = channel_hash[::-1].hex() if channel_hash else channel_hash channel_hash = doc.pop('censoring_channel_hash') - doc['censoring_channel_hash'] = channel_hash[::-1].hex() if channel_hash else channel_hash + doc['censoring_channel_hash'] = channel_hash.hex() if channel_hash else channel_hash # txo_hash = doc.pop('txo_hash') # doc['tx_id'] = txo_hash[:32][::-1].hex() # doc['tx_nout'] = struct.unpack(' typing.Tuple[OptionalResolveResultOrError, OptionalResolveResultOrError]: @@ -435,6 +462,31 @@ class LevelDB: count += 1 return count + def reload_blocking_filtering_streams(self): + self.blocked_streams, self.blocked_channels = self.get_streams_and_channels_reposted_by_channel_hashes(self.blocking_channel_hashes) + self.filtered_streams, self.filtered_channels = self.get_streams_and_channels_reposted_by_channel_hashes(self.filtering_channel_hashes) + + def get_streams_and_channels_reposted_by_channel_hashes(self, reposter_channel_hashes: bytes): + streams, channels = {}, {} + for reposter_channel_hash in reposter_channel_hashes: + reposts = self.get_reposts_in_channel(reposter_channel_hash) + for repost in reposts: + txo = self.get_claim_txo(repost) + if txo.name.startswith('@'): + channels[repost] = reposter_channel_hash + else: + streams[repost] = reposter_channel_hash + return streams, channels + + def get_reposts_in_channel(self, channel_hash): + reposts = set() + for value in self.db.iterator(prefix=Prefixes.channel_to_claim.pack_partial_key(channel_hash), include_key=False): + stream = Prefixes.channel_to_claim.unpack_value(value) + repost = self.get_repost(stream.claim_hash) + if repost: + reposts.add(repost) + return reposts + def get_channel_for_claim(self, claim_hash, tx_num, position) -> Optional[bytes]: return self.db.get(Prefixes.claim_to_channel.pack_key(claim_hash, tx_num, position)) @@ -542,18 +594,22 @@ class LevelDB: ) return if reposted_metadata: - reposted_tags = [] if not reposted_metadata.is_stream else [tag for tag in reposted_metadata.stream.tags] - reposted_languages = [] if not reposted_metadata.is_stream else ( - [lang.language or 'none' for lang in reposted_metadata.stream.languages] or ['none'] - ) + meta = reposted_metadata.stream if reposted_metadata.is_stream else reposted_metadata.channel + reposted_tags = [tag for tag in meta.tags] + reposted_languages = [lang.language or 'none' for lang in meta.languages] or ['none'] reposted_has_source = False if not reposted_metadata.is_stream else reposted_metadata.stream.has_source reposted_claim_type = CLAIM_TYPES[reposted_metadata.claim_type] - claim_tags = [] if not metadata.is_stream else [tag for tag in metadata.stream.tags] - claim_languages = [] if not metadata.is_stream else ( - [lang.language or 'none' for lang in metadata.stream.languages] or ['none'] - ) + lang_tags = metadata.stream if metadata.is_stream else metadata.channel if metadata.is_channel else metadata.repost + claim_tags = [tag for tag in lang_tags.tags] + claim_languages = [lang.language or 'none' for lang in lang_tags.languages] or ['none'] tags = list(set(claim_tags).union(set(reposted_tags))) languages = list(set(claim_languages).union(set(reposted_languages))) + blocked_hash = self.blocked_streams.get(claim_hash) or self.blocked_streams.get( + reposted_claim_hash) or self.blocked_channels.get(claim_hash) or self.blocked_channels.get( + reposted_claim_hash) or self.blocked_channels.get(claim.channel_hash) + filtered_hash = self.filtered_streams.get(claim_hash) or self.filtered_streams.get( + reposted_claim_hash) or self.filtered_channels.get(claim_hash) or self.filtered_channels.get( + reposted_claim_hash) or self.filtered_channels.get(claim.channel_hash) value = { 'claim_hash': claim_hash[::-1], # 'claim_id': claim_hash.hex(), @@ -603,8 +659,8 @@ class LevelDB: 'signature_valid': claim.signature_valid, 'tags': tags, 'languages': languages, - 'censor_type': 0, # TODO: fix - 'censoring_channel_hash': None, # TODO: fix + 'censor_type': Censor.RESOLVE if blocked_hash else Censor.SEARCH if filtered_hash else Censor.NOT_CENSORED, + 'censoring_channel_hash': blocked_hash or filtered_hash or None, 'claims_in_channel': None if not metadata.is_channel else self.get_claims_in_channel_count(claim_hash) # 'trending_group': 0, # 'trending_mixed': 0, diff --git a/lbry/wallet/server/session.py b/lbry/wallet/server/session.py index f40b07c39..31acb6c21 100644 --- a/lbry/wallet/server/session.py +++ b/lbry/wallet/server/session.py @@ -21,7 +21,7 @@ from elasticsearch import ConnectionTimeout from prometheus_client import Counter, Info, Histogram, Gauge import lbry -from lbry.error import TooManyClaimSearchParametersError +from lbry.error import ResolveCensoredError, TooManyClaimSearchParametersError from lbry.build_info import BUILD, COMMIT_HASH, DOCKER_TAG from lbry.schema.result import Outputs from lbry.wallet.server.block_processor import BlockProcessor @@ -997,7 +997,13 @@ class LBRYElectrumX(SessionBase): self.session_mgr.urls_to_resolve_count_metric.inc() stream, channel = await self.db.fs_resolve(url) self.session_mgr.resolved_url_count_metric.inc() - if channel and not stream: + if isinstance(channel, ResolveCensoredError): + rows.append(channel) + extra.append(channel.censor_row) + elif isinstance(stream, ResolveCensoredError): + rows.append(stream) + extra.append(stream.censor_row) + elif channel and not stream: rows.append(channel) # print("resolved channel", channel.name.decode()) elif stream: diff --git a/tests/integration/blockchain/test_claim_commands.py b/tests/integration/blockchain/test_claim_commands.py index 2e7b36cea..f60fcb455 100644 --- a/tests/integration/blockchain/test_claim_commands.py +++ b/tests/integration/blockchain/test_claim_commands.py @@ -1492,12 +1492,10 @@ class StreamCommands(ClaimTestCase): filtering_channel_id = self.get_claim_id( await self.channel_create('@filtering', '0.1') ) - # self.conductor.spv_node.server.db.sql.filtering_channel_hashes.add( - # unhexlify(filtering_channel_id)[::-1] - # ) - self.assertEqual(0, len(self.conductor.spv_node.server.db.sql.filtered_streams)) + self.conductor.spv_node.server.db.filtering_channel_hashes.add(bytes.fromhex(filtering_channel_id)) + self.assertEqual(0, len(self.conductor.spv_node.server.db.filtered_streams)) await self.stream_repost(bad_content_id, 'filter1', '0.1', channel_name='@filtering') - self.assertEqual(1, len(self.conductor.spv_node.server.db.sql.filtered_streams)) + self.assertEqual(1, len(self.conductor.spv_node.server.db.filtered_streams)) # search for filtered content directly result = await self.out(self.daemon.jsonrpc_claim_search(name='bad_content')) @@ -1539,12 +1537,10 @@ class StreamCommands(ClaimTestCase): blocking_channel_id = self.get_claim_id( await self.channel_create('@blocking', '0.1') ) - self.conductor.spv_node.server.db.sql.blocking_channel_hashes.add( - unhexlify(blocking_channel_id)[::-1] - ) - self.assertEqual(0, len(self.conductor.spv_node.server.db.sql.blocked_streams)) + self.conductor.spv_node.server.db.blocking_channel_hashes.add(bytes.fromhex(blocking_channel_id)) + self.assertEqual(0, len(self.conductor.spv_node.server.db.blocked_streams)) await self.stream_repost(bad_content_id, 'block1', '0.1', channel_name='@blocking') - self.assertEqual(1, len(self.conductor.spv_node.server.db.sql.blocked_streams)) + self.assertEqual(1, len(self.conductor.spv_node.server.db.blocked_streams)) # blocked content is not resolveable error = (await self.resolve('lbry://@some_channel/bad_content'))['error'] @@ -1567,9 +1563,9 @@ class StreamCommands(ClaimTestCase): self.assertEqual('@bad_channel', result['items'][1]['name']) # filter channel out - self.assertEqual(0, len(self.conductor.spv_node.server.db.sql.filtered_channels)) + self.assertEqual(0, len(self.conductor.spv_node.server.db.filtered_channels)) await self.stream_repost(bad_channel_id, 'filter2', '0.1', channel_name='@filtering') - self.assertEqual(1, len(self.conductor.spv_node.server.db.sql.filtered_channels)) + self.assertEqual(1, len(self.conductor.spv_node.server.db.filtered_channels)) # same claim search as previous now returns 0 results result = await self.out(self.daemon.jsonrpc_claim_search(any_tags=['bad-stuff'], order_by=['height'])) @@ -1594,9 +1590,9 @@ class StreamCommands(ClaimTestCase): self.assertEqual(worse_content_id, result['claim_id']) # block channel - self.assertEqual(0, len(self.conductor.spv_node.server.db.sql.blocked_channels)) + self.assertEqual(0, len(self.conductor.spv_node.server.db.blocked_channels)) await self.stream_repost(bad_channel_id, 'block2', '0.1', channel_name='@blocking') - self.assertEqual(1, len(self.conductor.spv_node.server.db.sql.blocked_channels)) + self.assertEqual(1, len(self.conductor.spv_node.server.db.blocked_channels)) # channel, claim in channel or claim individually no longer resolve self.assertEqual((await self.resolve('lbry://@bad_channel'))['error']['name'], 'BLOCKED')