From 2cd7ea257c860ff550827edeef7d4d0d2b5d1373 Mon Sep 17 00:00:00 2001
From: Lex Berezhny <lex@damoti.com>
Date: Wed, 22 Jan 2020 01:55:37 -0500
Subject: [PATCH] Added support to differentiate between filtering and
 blocking for content censoring

---
 lbry/schema/result.py                  |  74 ++---
 lbry/schema/types/v2/result_pb2.py     |  32 +-
 lbry/wallet/server/db/reader.py        |  85 ++++--
 lbry/wallet/server/db/writer.py        |  67 ++--
 lbry/wallet/server/session.py          |   9 +-
 .../blockchain/test_claim_commands.py  |  12 +-
 tests/unit/wallet/server/test_sqldb.py | 287 +++++++++++++-----
 7 files changed, 355 insertions(+), 211 deletions(-)

diff --git a/lbry/schema/result.py b/lbry/schema/result.py
index b9069a74e..b9d47d3fe 100644
--- a/lbry/schema/result.py
+++ b/lbry/schema/result.py
@@ -1,6 +1,6 @@
 import base64
 import struct
-from typing import List, Optional, Tuple
+from typing import List
 from binascii import hexlify
 from itertools import chain
 
@@ -9,50 +9,36 @@ from lbry.schema.types.v2.result_pb2 import Outputs as OutputsMessage
 
 class Censor:
 
-    def __init__(self, claim_ids: dict = None, channel_ids: set = None, tags: set = None):
-        self.claim_ids = claim_ids or {}
-        self.channel_ids = channel_ids or set()
-        self.tags = tags or set()
-        self.blocked_claims = {}
-        self.blocked_channels = {}
-        self.blocked_tags = {}
+    __slots__ = 'streams', 'channels', 'censored', 'total'
+
+    def __init__(self, streams: dict = None, channels: dict = None):
+        self.streams = streams or {}
+        self.channels = channels or {}
+        self.censored = {}
         self.total = 0
 
     def censor(self, row) -> bool:
-        censored = False
-        if row['claim_hash'] in self.claim_ids:
-            censored = True
-            channel_id = self.claim_ids[row['claim_hash']]
-            self.blocked_claims.setdefault(channel_id, 0)
-            self.blocked_claims[channel_id] += 1
-        if row['channel_hash'] in self.channel_ids:
-            censored = True
-            self.blocked_channels.setdefault(row['channel_hash'], 0)
-            self.blocked_channels[row['channel_hash']] += 1
-        if self.tags.intersection(row['tags']):
-            censored = True
-            for tag in self.tags:
-                if tag in row['tags']:
-                    self.blocked_tags.setdefault(tag, 0)
-                    self.blocked_tags[tag] += 1
-        if censored:
+        was_censored = False
+        for claim_hash, lookup in (
+                (row['claim_hash'], self.streams),
+                (row['claim_hash'], self.channels),
+                (row['channel_hash'], self.channels)):
+            censoring_channel_hash = lookup.get(claim_hash)
+            if censoring_channel_hash:
+                was_censored = True
+                self.censored.setdefault(censoring_channel_hash, 0)
+                self.censored[censoring_channel_hash] += 1
+                break
+        if was_censored:
             self.total += 1
-        return censored
+        return was_censored
 
     def to_message(self, outputs: OutputsMessage):
         outputs.blocked_total = self.total
-        for channel_hash, count in self.blocked_claims.items():
+        for censoring_channel_hash, count in self.censored.items():
             block = outputs.blocked.add()
             block.count = count
-            block.reposted_in_channel = channel_hash
-        for channel_hash, count in self.blocked_channels.items():
-            block = outputs.blocked.add()
-            block.count = count
-            block.in_channel = channel_hash
-        for tag, count in self.blocked_tags.items():
-            block = outputs.blocked.add()
-            block.count = count
-            block.has_tag = tag
+            block.channel_hash = censoring_channel_hash
 
 
 class Outputs:
@@ -77,15 +63,13 @@ class Outputs:
         return txos, self.inflate_blocked()
 
     def inflate_blocked(self):
-        result = {"total": self.blocked_total}
-        for blocked_message in self.blocked:
-            reason = blocked_message.WhichOneof('reason')
-            if reason == "has_tag":
-                key = blocked_message.has_tag
-            else:
-                key = hexlify(getattr(blocked_message, reason)[::-1]).decode()
-            result.setdefault(reason, {})[key] = blocked_message.count
-        return result
+        return {
+            "total": self.blocked_total,
+            "channels": {
+                hexlify(message.channel_hash[::-1]).decode(): message.count
+                for message in self.blocked
+            }
+        }
 
     def message_to_txo(self, txo_message, tx_map):
         if txo_message.WhichOneof('meta') == 'error':
diff --git a/lbry/schema/types/v2/result_pb2.py b/lbry/schema/types/v2/result_pb2.py
index d5962e1b3..a2e15acd5 100644
--- a/lbry/schema/types/v2/result_pb2.py
+++ b/lbry/schema/types/v2/result_pb2.py
@@ -19,7 +19,7 @@ DESCRIPTOR = _descriptor.FileDescriptor(
   name='result.proto',
   package='pb',
   syntax='proto3',
-  serialized_pb=_b('\n\x0cresult.proto\x12\x02pb\"\x97\x01\n\x07Outputs\x12\x18\n\x04txos\x18\x01 \x03(\x0b\x32\n.pb.Output\x12\x1e\n\nextra_txos\x18\x02 \x03(\x0b\x32\n.pb.Output\x12\r\n\x05total\x18\x03 \x01(\r\x12\x0e\n\x06offset\x18\x04 \x01(\r\x12\x1c\n\x07\x62locked\x18\x05 \x03(\x0b\x32\x0b.pb.Blocked\x12\x15\n\rblocked_total\x18\x06 \x01(\r\"{\n\x06Output\x12\x0f\n\x07tx_hash\x18\x01 \x01(\x0c\x12\x0c\n\x04nout\x18\x02 \x01(\r\x12\x0e\n\x06height\x18\x03 \x01(\r\x12\x1e\n\x05\x63laim\x18\x07 \x01(\x0b\x32\r.pb.ClaimMetaH\x00\x12\x1a\n\x05\x65rror\x18\x0f \x01(\x0b\x32\t.pb.ErrorH\x00\x42\x06\n\x04meta\"\xaf\x03\n\tClaimMeta\x12\x1b\n\x07\x63hannel\x18\x01 \x01(\x0b\x32\n.pb.Output\x12\x1a\n\x06repost\x18\x02 \x01(\x0b\x32\n.pb.Output\x12\x11\n\tshort_url\x18\x03 \x01(\t\x12\x15\n\rcanonical_url\x18\x04 \x01(\t\x12\x16\n\x0eis_controlling\x18\x05 \x01(\x08\x12\x18\n\x10take_over_height\x18\x06 \x01(\r\x12\x17\n\x0f\x63reation_height\x18\x07 \x01(\r\x12\x19\n\x11\x61\x63tivation_height\x18\x08 \x01(\r\x12\x19\n\x11\x65xpiration_height\x18\t \x01(\r\x12\x19\n\x11\x63laims_in_channel\x18\n \x01(\r\x12\x10\n\x08reposted\x18\x0b \x01(\r\x12\x18\n\x10\x65\x66\x66\x65\x63tive_amount\x18\x14 \x01(\x04\x12\x16\n\x0esupport_amount\x18\x15 \x01(\x04\x12\x16\n\x0etrending_group\x18\x16 \x01(\r\x12\x16\n\x0etrending_mixed\x18\x17 \x01(\x02\x12\x16\n\x0etrending_local\x18\x18 \x01(\x02\x12\x17\n\x0ftrending_global\x18\x19 \x01(\x02\"\x94\x01\n\x05\x45rror\x12\x1c\n\x04\x63ode\x18\x01 \x01(\x0e\x32\x0e.pb.Error.Code\x12\x0c\n\x04text\x18\x02 \x01(\t\x12\x1c\n\x07\x62locked\x18\x03 \x01(\x0b\x32\x0b.pb.Blocked\"A\n\x04\x43ode\x12\x10\n\x0cUNKNOWN_CODE\x10\x00\x12\r\n\tNOT_FOUND\x10\x01\x12\x0b\n\x07INVALID\x10\x02\x12\x0b\n\x07\x42LOCKED\x10\x03\"j\n\x07\x42locked\x12\r\n\x05\x63ount\x18\x01 \x01(\r\x12\x1d\n\x13reposted_in_channel\x18\x02 \x01(\x0cH\x00\x12\x14\n\nin_channel\x18\x03 \x01(\x0cH\x00\x12\x11\n\x07has_tag\x18\x04 \x01(\tH\x00\x42\x08\n\x06reasonb\x06proto3')
+  serialized_pb=_b('\n\x0cresult.proto\x12\x02pb\"\x97\x01\n\x07Outputs\x12\x18\n\x04txos\x18\x01 \x03(\x0b\x32\n.pb.Output\x12\x1e\n\nextra_txos\x18\x02 \x03(\x0b\x32\n.pb.Output\x12\r\n\x05total\x18\x03 \x01(\r\x12\x0e\n\x06offset\x18\x04 \x01(\r\x12\x1c\n\x07\x62locked\x18\x05 \x03(\x0b\x32\x0b.pb.Blocked\x12\x15\n\rblocked_total\x18\x06 \x01(\r\"{\n\x06Output\x12\x0f\n\x07tx_hash\x18\x01 \x01(\x0c\x12\x0c\n\x04nout\x18\x02 \x01(\r\x12\x0e\n\x06height\x18\x03 \x01(\r\x12\x1e\n\x05\x63laim\x18\x07 \x01(\x0b\x32\r.pb.ClaimMetaH\x00\x12\x1a\n\x05\x65rror\x18\x0f \x01(\x0b\x32\t.pb.ErrorH\x00\x42\x06\n\x04meta\"\xaf\x03\n\tClaimMeta\x12\x1b\n\x07\x63hannel\x18\x01 \x01(\x0b\x32\n.pb.Output\x12\x1a\n\x06repost\x18\x02 \x01(\x0b\x32\n.pb.Output\x12\x11\n\tshort_url\x18\x03 \x01(\t\x12\x15\n\rcanonical_url\x18\x04 \x01(\t\x12\x16\n\x0eis_controlling\x18\x05 \x01(\x08\x12\x18\n\x10take_over_height\x18\x06 \x01(\r\x12\x17\n\x0f\x63reation_height\x18\x07 \x01(\r\x12\x19\n\x11\x61\x63tivation_height\x18\x08 \x01(\r\x12\x19\n\x11\x65xpiration_height\x18\t \x01(\r\x12\x19\n\x11\x63laims_in_channel\x18\n \x01(\r\x12\x10\n\x08reposted\x18\x0b \x01(\r\x12\x18\n\x10\x65\x66\x66\x65\x63tive_amount\x18\x14 \x01(\x04\x12\x16\n\x0esupport_amount\x18\x15 \x01(\x04\x12\x16\n\x0etrending_group\x18\x16 \x01(\r\x12\x16\n\x0etrending_mixed\x18\x17 \x01(\x02\x12\x16\n\x0etrending_local\x18\x18 \x01(\x02\x12\x17\n\x0ftrending_global\x18\x19 \x01(\x02\"\x94\x01\n\x05\x45rror\x12\x1c\n\x04\x63ode\x18\x01 \x01(\x0e\x32\x0e.pb.Error.Code\x12\x0c\n\x04text\x18\x02 \x01(\t\x12\x1c\n\x07\x62locked\x18\x03 \x01(\x0b\x32\x0b.pb.Blocked\"A\n\x04\x43ode\x12\x10\n\x0cUNKNOWN_CODE\x10\x00\x12\r\n\tNOT_FOUND\x10\x01\x12\x0b\n\x07INVALID\x10\x02\x12\x0b\n\x07\x42LOCKED\x10\x03\".\n\x07\x42locked\x12\r\n\x05\x63ount\x18\x01 \x01(\r\x12\x14\n\x0c\x63hannel_hash\x18\x02 \x01(\x0c\x62\x06proto3')
 )
 _sym_db.RegisterFileDescriptor(DESCRIPTOR)
@@ -388,26 +388,12 @@ _BLOCKED = _descriptor.Descriptor(
       is_extension=False, extension_scope=None,
       options=None),
     _descriptor.FieldDescriptor(
-      name='reposted_in_channel', full_name='pb.Blocked.reposted_in_channel', index=1,
+      name='channel_hash', full_name='pb.Blocked.channel_hash', index=1,
       number=2, type=12, cpp_type=9, label=1,
       has_default_value=False, default_value=_b(""),
       message_type=None, enum_type=None, containing_type=None,
       is_extension=False, extension_scope=None,
       options=None),
-    _descriptor.FieldDescriptor(
-      name='in_channel', full_name='pb.Blocked.in_channel', index=2,
-      number=3, type=12, cpp_type=9, label=1,
-      has_default_value=False, default_value=_b(""),
-      message_type=None, enum_type=None, containing_type=None,
-      is_extension=False, extension_scope=None,
-      options=None),
-    _descriptor.FieldDescriptor(
-      name='has_tag', full_name='pb.Blocked.has_tag', index=3,
-      number=4, type=9, cpp_type=9, label=1,
-      has_default_value=False, default_value=_b("").decode('utf-8'),
-      message_type=None, enum_type=None, containing_type=None,
-      is_extension=False, extension_scope=None,
-      options=None),
   ],
   extensions=[
   ],
@@ -419,12 +405,9 @@ _BLOCKED = _descriptor.Descriptor(
   syntax='proto3',
   extension_ranges=[],
   oneofs=[
-    _descriptor.OneofDescriptor(
-      name='reason', full_name='pb.Blocked.reason',
-      index=0, containing_type=None, fields=[]),
   ],
   serialized_start=884,
-  serialized_end=990,
+  serialized_end=930,
 )
 
 _OUTPUTS.fields_by_name['txos'].message_type = _OUTPUT
@@ -443,15 +426,6 @@ _CLAIMMETA.fields_by_name['repost'].message_type = _OUTPUT
 _ERROR.fields_by_name['code'].enum_type = _ERROR_CODE
 _ERROR.fields_by_name['blocked'].message_type = _BLOCKED
 _ERROR_CODE.containing_type = _ERROR
-_BLOCKED.oneofs_by_name['reason'].fields.append(
-  _BLOCKED.fields_by_name['reposted_in_channel'])
-_BLOCKED.fields_by_name['reposted_in_channel'].containing_oneof = _BLOCKED.oneofs_by_name['reason']
-_BLOCKED.oneofs_by_name['reason'].fields.append(
-  _BLOCKED.fields_by_name['in_channel'])
-_BLOCKED.fields_by_name['in_channel'].containing_oneof = _BLOCKED.oneofs_by_name['reason']
-_BLOCKED.oneofs_by_name['reason'].fields.append(
-  _BLOCKED.fields_by_name['has_tag'])
-_BLOCKED.fields_by_name['has_tag'].containing_oneof = _BLOCKED.oneofs_by_name['reason']
 DESCRIPTOR.message_types_by_name['Outputs'] = _OUTPUTS
 DESCRIPTOR.message_types_by_name['Output'] = _OUTPUT
 DESCRIPTOR.message_types_by_name['ClaimMeta'] = _CLAIMMETA
diff --git a/lbry/wallet/server/db/reader.py b/lbry/wallet/server/db/reader.py
index 4447632d0..a8daf229c 100644
--- a/lbry/wallet/server/db/reader.py
+++ b/lbry/wallet/server/db/reader.py
@@ -70,7 +70,10 @@ class ReaderState:
     ledger: Type[Ledger]
     query_timeout: float
     log: logging.Logger
-    blocked_claims: Dict
+    blocked_streams: Dict
+    blocked_channels: Dict
+    filtered_streams: Dict
+    filtered_channels: Dict
 
     def close(self):
         self.db.close()
@@ -89,6 +92,12 @@ class ReaderState:
             self.db.setprogresshandler(interruptor, 100)
 
+    def get_resolve_censor(self) -> Censor:
+        return Censor(self.blocked_streams, self.blocked_channels)
+
+    def get_search_censor(self) -> Censor:
+        return Censor(self.filtered_streams, self.filtered_channels)
+
 
 ctx: ContextVar[Optional[ReaderState]] = ContextVar('ctx')
 
@@ -100,15 +109,20 @@ def row_factory(cursor, row):
     }
 
 
-def initializer(log, _path, _ledger_name, query_timeout, _measure=False, blocked_claims=None):
+def initializer(log, _path, _ledger_name, query_timeout, _measure=False, block_and_filter=None):
     db = apsw.Connection(_path, flags=apsw.SQLITE_OPEN_READONLY | apsw.SQLITE_OPEN_URI)
     db.setrowtrace(row_factory)
+    if block_and_filter:
+        blocked_streams, blocked_channels, filtered_streams, filtered_channels = block_and_filter
+    else:
+        blocked_streams = blocked_channels = filtered_streams = filtered_channels = {}
     ctx.set(
         ReaderState(
            db=db, stack=[], metrics={}, is_tracking_metrics=_measure,
            ledger=Ledger if _ledger_name == 'mainnet' else RegTestLedger,
            query_timeout=query_timeout, log=log,
-           blocked_claims={} if blocked_claims is None else blocked_claims
+           blocked_streams=blocked_streams, blocked_channels=blocked_channels,
+           filtered_streams=filtered_streams, filtered_channels=filtered_channels,
        )
    )
@@ -199,7 +213,7 @@ def execute_query(sql, values, row_offset: int, row_limit: int, censor: Censor)
             raise SQLiteOperationalError(context.metrics)
 
 
-def _get_claims(cols, for_count=False, **constraints) -> Tuple[str, Dict]:
+def claims_query(cols, for_count=False, **constraints) -> Tuple[str, Dict]:
     if 'order_by' in constraints:
         order_by_parts = constraints['order_by']
         if isinstance(order_by_parts, str):
@@ -270,6 +284,19 @@ def _get_claims(cols, for_count=False, **constraints) -> Tuple[str, Dict]:
         constraints['claim.channel_hash__in'] = [
             unhexlify(cid)[::-1] for cid in channel_ids
         ]
+    if 'not_channel_ids' in constraints:
+        not_channel_ids = constraints.pop('not_channel_ids')
+        if not_channel_ids:
+            not_channel_ids_binary = [
+                unhexlify(ncid)[::-1] for ncid in not_channel_ids
+            ]
+            if constraints.get('has_channel_signature', False):
+                constraints['claim.channel_hash__not_in'] = not_channel_ids_binary
+            else:
+                constraints['null_or_not_channel__or'] = {
+                    'claim.signature_valid__is_null': True,
+                    'claim.channel_hash__not_in': not_channel_ids_binary
+                }
     if 'signature_valid' in constraints:
         has_channel_signature = constraints.pop('has_channel_signature', False)
         if has_channel_signature:
@@ -318,36 +345,32 @@ def _get_claims(cols, for_count=False, **constraints) -> Tuple[str, Dict]:
     return query(select, **constraints)
 
 
-def get_claims(cols, for_count=False, **constraints) -> Tuple[List, Censor]:
+def select_claims(censor: Censor, cols: str, for_count=False, **constraints) -> List:
     if 'channel' in constraints:
         channel_url = constraints.pop('channel')
         match = resolve_url(channel_url)
         if isinstance(match, dict):
             constraints['channel_hash'] = match['claim_hash']
         else:
-            return ([{'row_count': 0}] if cols == 'count(*) as row_count' else []), Censor()
-    censor = Censor(
-        ctx.get().blocked_claims,
-        {unhexlify(ncid)[::-1] for ncid in constraints.pop('not_channel_ids', [])},
-        set(clean_tags(constraints.pop('not_tags', {})))
-    )
+            return [{'row_count': 0}] if cols == 'count(*) as row_count' else []
     row_offset = constraints.pop('offset', 0)
     row_limit = constraints.pop('limit', 20)
-    sql, values = _get_claims(cols, for_count, **constraints)
-    return execute_query(sql, values, row_offset, row_limit, censor), censor
+    sql, values = claims_query(cols, for_count, **constraints)
+    return execute_query(sql, values, row_offset, row_limit, censor)
 
 
 @measure
-def get_claims_count(**constraints) -> int:
+def count_claims(**constraints) -> int:
     constraints.pop('offset', None)
     constraints.pop('limit', None)
     constraints.pop('order_by', None)
-    count, _ = get_claims('count(*) as row_count', for_count=True, **constraints)
+    count = select_claims(Censor(), 'count(*) as row_count', for_count=True, **constraints)
     return count[0]['row_count']
 
 
-def _search(**constraints) -> Tuple[List, Censor]:
-    return get_claims(
+def search_claims(censor: Censor, **constraints) -> List:
+    return select_claims(
+        censor,
         """
        claimtrie.claim_hash as is_controlling,
        claimtrie.last_take_over_height,
@@ -360,24 +383,23 @@
        claim.trending_local, claim.trending_global,
        claim.short_url, claim.canonical_url,
        claim.channel_hash, claim.reposted_claim_hash,
-       claim.signature_valid,
-       COALESCE((SELECT group_concat(tag) FROM tag WHERE tag.claim_hash = claim.claim_hash), "") as tags
+       claim.signature_valid
        """, **constraints
    )
 
 
-def _get_referenced_rows(txo_rows: List[dict]):
+def _get_referenced_rows(censor: Censor, txo_rows: List[dict]):
     repost_hashes = set(filter(None, map(itemgetter('reposted_claim_hash'), txo_rows)))
     channel_hashes = set(filter(None, map(itemgetter('channel_hash'), txo_rows)))
 
     reposted_txos = []
     if repost_hashes:
-        reposted_txos, _ = _search(**{'claim.claim_hash__in': repost_hashes})
+        reposted_txos = search_claims(censor, **{'claim.claim_hash__in': repost_hashes})
         channel_hashes |= set(filter(None, map(itemgetter('channel_hash'), reposted_txos)))
 
     channel_txos = []
     if channel_hashes:
-        channel_txos, _ = _search(**{'claim.claim_hash__in': channel_hashes})
+        channel_txos = search_claims(censor, **{'claim.claim_hash__in': channel_hashes})
 
     # channels must come first for client side inflation to work properly
     return channel_txos + reposted_txos
@@ -389,25 +411,30 @@ def search(constraints) -> Tuple[List, List, int, int, Censor]:
         f"Search query contains invalid arguments: {set(constraints).difference(SEARCH_PARAMS)}"
     total = None
     if not constraints.pop('no_totals', False):
-        total = get_claims_count(**constraints)
+        total = count_claims(**constraints)
     constraints['offset'] = abs(constraints.get('offset', 0))
     constraints['limit'] = min(abs(constraints.get('limit', 10)), 50)
     if 'order_by' not in constraints:
         constraints['order_by'] = ["claim_hash"]
-    txo_rows, censor = _search(**constraints)
-    extra_txo_rows = _get_referenced_rows(txo_rows)
-    return txo_rows, extra_txo_rows, constraints['offset'], total, censor
+    context = ctx.get()
+    search_censor = context.get_search_censor()
+    txo_rows = search_claims(search_censor, **constraints)
+    extra_txo_rows = _get_referenced_rows(context.get_resolve_censor(), txo_rows)
+    return txo_rows, extra_txo_rows, constraints['offset'], total, search_censor
 
 
 @measure
 def resolve(urls) -> Tuple[List, List]:
     txo_rows = [resolve_url(raw_url) for raw_url in urls]
-    extra_txo_rows = _get_referenced_rows([r for r in txo_rows if isinstance(r, dict)])
+    extra_txo_rows = _get_referenced_rows(
+        ctx.get().get_resolve_censor(), [r for r in txo_rows if isinstance(r, dict)]
+    )
     return txo_rows, extra_txo_rows
 
 
 @measure
 def resolve_url(raw_url):
+    censor = ctx.get().get_resolve_censor()
     try:
         url = URL.parse(raw_url)
@@ -422,7 +449,7 @@ def resolve_url(raw_url):
                 query['is_controlling'] = True
             else:
                 query['order_by'] = ['^creation_height']
-            matches, _ = _search(**query, limit=1)
+            matches = search_claims(censor, **query, limit=1)
             if matches:
                 channel = matches[0]
             else:
@@ -440,7 +467,7 @@ def resolve_url(raw_url):
                 query['signature_valid'] = 1
             elif set(query) == {'name'}:
                 query['is_controlling'] = 1
-        matches, _ = _search(**query, limit=1)
+        matches = search_claims(censor, **query, limit=1)
         if matches:
             return matches[0]
         else:
diff --git a/lbry/wallet/server/db/writer.py b/lbry/wallet/server/db/writer.py
index a86c5dcb6..b7a8672d1 100644
--- a/lbry/wallet/server/db/writer.py
+++ b/lbry/wallet/server/db/writer.py
@@ -141,6 +141,8 @@ class SQLDB:
         create unique index if not exists claim_type_release_idx on claim (claim_type, release_time, claim_hash);
         create unique index if not exists claim_type_effective_amount_idx on claim (claim_type, effective_amount, claim_hash);
 
+        create unique index if not exists channel_hash_release_time_idx on claim (channel_hash, release_time, claim_hash);
+
         -- TODO: verify that all indexes below are used
         create index if not exists claim_height_normalized_idx on claim (height, normalized asc);
@@ -170,15 +172,21 @@ class SQLDB:
         CREATE_TAG_TABLE
     )
 
-    def __init__(self, main, path: str, filtering_channels: list):
+    def __init__(self, main, path: str, blocking_channels: list, filtering_channels: list):
         self.main = main
         self._db_path = path
         self.db = None
-        self.state_manager = None
-        self.blocked_claims = None
         self.logger = class_logger(__name__, self.__class__.__name__)
         self.ledger = Ledger if main.coin.NET == 'mainnet' else RegTestLedger
         self._fts_synced = False
+        self.state_manager = None
+        self.blocked_streams = None
+        self.blocked_channels = None
+        self.blocking_channel_hashes = {
+            unhexlify(channel_id)[::-1] for channel_id in blocking_channels if channel_id
+        }
+        self.filtered_streams = None
+        self.filtered_channels = None
         self.filtering_channel_hashes = {
             unhexlify(channel_id)[::-1] for channel_id in filtering_channels if channel_id
         }
@@ -202,8 +210,11 @@ class SQLDB:
         register_canonical_functions(self.db)
         register_trending_functions(self.db)
         self.state_manager = Manager()
-        self.blocked_claims = self.state_manager.dict()
-        self.update_blocked_claims()
+        self.blocked_streams = self.state_manager.dict()
+        self.blocked_channels = self.state_manager.dict()
+        self.filtered_streams = self.state_manager.dict()
+        self.filtered_channels = self.state_manager.dict()
+        self.update_blocked_and_filtered_claims()
 
     def close(self):
         if self.db is not None:
@@ -211,17 +222,34 @@ class SQLDB:
         if self.state_manager is not None:
             self.state_manager.shutdown()
 
-    def update_blocked_claims(self):
-        sql = query(
-            "SELECT channel_hash, reposted_claim_hash FROM claim",
-            reposted_claim_hash__is_not_null=1,
-            channel_hash__in=self.filtering_channel_hashes
+    def update_blocked_and_filtered_claims(self):
+        self.update_claims_from_channel_hashes(
+            self.blocked_streams, self.blocked_channels, self.blocking_channel_hashes
         )
-        blocked_claims = {}
+        self.update_claims_from_channel_hashes(
+            self.filtered_streams, self.filtered_channels, self.filtering_channel_hashes
+        )
+        self.filtered_streams.update(self.blocked_streams)
+        self.filtered_channels.update(self.blocked_channels)
+
+    def update_claims_from_channel_hashes(self, shared_streams, shared_channels, channel_hashes):
+        sql = query(
+            "SELECT claim.channel_hash, claim.reposted_claim_hash, reposted.claim_type "
+            "FROM claim JOIN claim AS reposted ON (reposted.claim_hash=claim.reposted_claim_hash)", **{
+                'claim.reposted_claim_hash__is_not_null': 1,
+                'claim.channel_hash__in': channel_hashes
+            }
+        )
+        streams, channels = {}, {}
         for blocked_claim in self.execute(*sql):
-            blocked_claims[blocked_claim.reposted_claim_hash] = blocked_claim.channel_hash
-        self.blocked_claims.clear()
-        self.blocked_claims.update(blocked_claims)
+            if blocked_claim.claim_type == CLAIM_TYPES['stream']:
+                streams[blocked_claim.reposted_claim_hash] = blocked_claim.channel_hash
+            elif blocked_claim.claim_type == CLAIM_TYPES['channel']:
+                channels[blocked_claim.reposted_claim_hash] = blocked_claim.channel_hash
+        shared_streams.clear()
+        shared_streams.update(streams)
+        shared_channels.clear()
+        shared_channels.update(channels)
 
     @staticmethod
     def _insert_sql(table: str, data: dict) -> Tuple[str, list]:
@@ -613,8 +641,9 @@ class SQLDB:
 
         sub_timer = timer.add_timer('update blocked claims list')
         sub_timer.start()
-        if self.filtering_channel_hashes.intersection(all_channel_keys):
-            self.update_blocked_claims()
+        if (self.blocking_channel_hashes.intersection(all_channel_keys) or
+                self.filtering_channel_hashes.intersection(all_channel_keys)):
+            self.update_blocked_and_filtered_claims()
         sub_timer.stop()
 
     def _update_support_amount(self, claim_hashes):
@@ -816,7 +845,11 @@ class LBRYLevelDB(LevelDB):
         super().__init__(*args, **kwargs)
         path = os.path.join(self.env.db_dir, 'claims.db')
         # space separated lists of channel URIs used for blocking and filtering bad content
-        self.sql = SQLDB(self, path, self.env.default('FILTERING_CHANNELS_IDS', '').split(' '))
+        self.sql = SQLDB(
+            self, path,
+            self.env.default('BLOCKING_CHANNELS_IDS', '').split(' '),
+            self.env.default('FILTERING_CHANNELS_IDS', '').split(' '),
+        )
 
     def close(self):
         super().close()
diff --git a/lbry/wallet/server/session.py b/lbry/wallet/server/session.py
index cf4eb7ce6..367ba3a40 100644
--- a/lbry/wallet/server/session.py
+++ b/lbry/wallet/server/session.py
@@ -745,8 +745,13 @@ class LBRYSessionManager(SessionManager):
         path = os.path.join(self.env.db_dir, 'claims.db')
         args = dict(
             initializer=reader.initializer,
-            initargs=(self.logger, path, self.env.coin.NET, self.env.database_query_timeout,
-                      self.env.track_metrics, self.db.sql.blocked_claims)
+            initargs=(
+                self.logger, path, self.env.coin.NET, self.env.database_query_timeout,
+                self.env.track_metrics, (
+                    self.db.sql.blocked_streams, self.db.sql.blocked_channels,
+                    self.db.sql.filtered_streams, self.db.sql.filtered_channels
+                )
+            )
         )
         if self.env.max_query_workers is not None and self.env.max_query_workers == 0:
             self.query_executor = ThreadPoolExecutor(max_workers=1, **args)
diff --git a/tests/integration/blockchain/test_claim_commands.py b/tests/integration/blockchain/test_claim_commands.py
index 78eb7d1fd..fd6bdefd9 100644
--- a/tests/integration/blockchain/test_claim_commands.py
+++ b/tests/integration/blockchain/test_claim_commands.py
@@ -767,21 +767,17 @@ class StreamCommands(ClaimTestCase):
         # search for blocked content directly
         result = await self.out(self.daemon.jsonrpc_claim_search(name='bad_content'))
         self.assertEqual([], result['items'])
-        self.assertEqual({"reposted_in_channel": {blocking_channel_id: 1}, "total": 1}, result['blocked'])
+        self.assertEqual({"channels": {blocking_channel_id: 1}, "total": 1}, result['blocked'])
 
         # search channel containing blocked content
         result = await self.out(self.daemon.jsonrpc_claim_search(channel='@some_channel'))
         self.assertEqual(1, len(result['items']))
-        self.assertEqual({"reposted_in_channel": {blocking_channel_id: 1}, "total": 1}, result['blocked'])
+        self.assertEqual({"channels": {blocking_channel_id: 1}, "total": 1}, result['blocked'])
 
-        # search channel containing blocked content, also block tag
+        # content excluded by not_tags is filtered before censoring, so it is not reported as blocked
         result = await self.out(self.daemon.jsonrpc_claim_search(channel='@some_channel', not_tags=["good", "bad"]))
         self.assertEqual(0, len(result['items']))
-        self.assertEqual({
-            "reposted_in_channel": {blocking_channel_id: 1},
-            "has_tag": {"good": 1, "bad": 1},
-            "total": 2
-        }, result['blocked'])
+        self.assertEqual({"channels": {}, "total": 0}, result['blocked'])
 
     async def test_publish_updates_file_list(self):
         tx = await self.stream_create(title='created')
diff --git a/tests/unit/wallet/server/test_sqldb.py b/tests/unit/wallet/server/test_sqldb.py
index 6054111fe..52921514e 100644
--- a/tests/unit/wallet/server/test_sqldb.py
+++ b/tests/unit/wallet/server/test_sqldb.py
@@ -3,9 +3,11 @@ import ecdsa
 import hashlib
 import logging
 from binascii import hexlify
+from typing import List, Tuple
 
 from lbry.wallet.constants import COIN, NULL_HASH32
 from lbry.schema.claim import Claim
+from lbry.schema.result import Censor
 from lbry.wallet.server.db import reader, writer
 from lbry.wallet.server.coin import LBCRegTest
 from lbry.wallet.server.db.trending import TRENDING_WINDOW
@@ -28,6 +30,15 @@ def get_tx():
     return Transaction().add_inputs([get_input()])
 
 
+def search(**constraints) -> List:
+    return reader.search_claims(Censor(), **constraints)
+
+
+def censored_search(**constraints) -> Tuple[List, Censor]:
+    rows, _, _, _, censor = reader.search(constraints)
+    return rows, censor
+
+
 class TestSQLDB(unittest.TestCase):
 
     query_timeout = 0.25
@@ -36,12 +47,15 @@ class TestSQLDB(unittest.TestCase):
         self.daemon_height = 1
         self.coin = LBCRegTest()
         db_url = 'file:test_sqldb?mode=memory&cache=shared'
-        self.sql = writer.SQLDB(self, db_url, [])
+        self.sql = writer.SQLDB(self, db_url, [], [])
         self.addCleanup(self.sql.close)
         self.sql.open()
         reader.initializer(
             logging.getLogger(__name__), db_url, 'regtest',
-            self.query_timeout, blocked_claims=self.sql.blocked_claims
+            self.query_timeout, block_and_filter=(
+                self.sql.blocked_streams, self.sql.blocked_channels,
+                self.sql.filtered_streams, self.sql.filtered_channels
+            )
         )
         self.addCleanup(reader.cleanup)
         self.timer = Timer('BlockProcessor')
@@ -330,7 +344,7 @@ class TestClaimtrie(TestSQLDB):
         advance, state = self.advance, self.state
         stream = self.get_stream('Claim A', 10*COIN)
         advance(10, [stream, self.get_stream_update(stream, 11*COIN)])
-        self.assertTrue(reader._search()[0])
+        self.assertTrue(search()[0])
 
     def test_double_updates_in_same_block(self):
         advance, state = self.advance, self.state
         stream = self.get_stream('Claim A', 10*COIN)
         advance(10, [stream])
         update = self.get_stream_update(stream, 11*COIN)
         advance(20, [update, self.get_stream_update(update, 9*COIN)])
-        self.assertTrue(reader._search()[0])
+        self.assertTrue(search()[0])
 
     def test_create_and_abandon_in_same_block(self):
         advance, state = self.advance, self.state
         stream = self.get_stream('Claim A', 10*COIN)
         advance(10, [stream, self.get_abandon(stream)])
-        self.assertFalse(reader._search()[0])
+        self.assertFalse(search())
 
     def test_update_and_abandon_in_same_block(self):
         advance, state = self.advance, self.state
@@ -352,14 +366,14 @@
         advance(10, [stream])
         update = self.get_stream_update(stream, 11*COIN)
         advance(20, [update, self.get_abandon(update)])
-        self.assertFalse(reader._search()[0])
+        self.assertFalse(search())
 
     def test_create_update_and_delete_in_same_block(self):
         advance, state = self.advance, self.state
         stream = self.get_stream('Claim A', 10*COIN)
         update = self.get_stream_update(stream, 11*COIN)
         advance(10, [stream, update, self.get_abandon(update)])
-        self.assertFalse(reader._search()[0])
+        self.assertFalse(search())
 
     def test_support_added_and_removed_in_same_block(self):
         advance, state = self.advance, self.state
@@ -367,7 +381,7 @@
         advance(10, [stream])
         support = self.get_support(stream, COIN)
         advance(20, [support, self.get_abandon(support)])
-        self.assertEqual(reader._search()[0][0]['support_amount'], 0)
+        self.assertEqual(search()[0]['support_amount'], 0)
 
     @staticmethod
     def _get_x_with_claim_id_prefix(getter, prefix, cached_iteration=None, **kwargs):
@@ -396,7 +410,7 @@
         txo_chan_ab = tx_chan_ab[0].outputs[0]
         advance(1, [tx_chan_a])
         advance(2, [tx_chan_ab])
-        (r_ab, r_a), _ = reader._search(order_by=['creation_height'], limit=2)
+        (r_ab, r_a) = search(order_by=['creation_height'], limit=2)
         self.assertEqual("@foo#a", r_a['short_url'])
         self.assertEqual("@foo#ab", r_ab['short_url'])
         self.assertIsNone(r_a['canonical_url'])
@@ -409,7 +423,7 @@
         tx_abc = self.get_stream_with_claim_id_prefix('abc', 65)
         advance(3, [tx_a])
         advance(4, [tx_ab, tx_abc])
-        (r_abc, r_ab, r_a), _ = reader._search(order_by=['creation_height', 'tx_position'], limit=3)
+        (r_abc, r_ab, r_a) = search(order_by=['creation_height', 'tx_position'], limit=3)
         self.assertEqual("foo#a", r_a['short_url'])
         self.assertEqual("foo#ab", r_ab['short_url'])
         self.assertEqual("foo#abc", r_abc['short_url'])
@@ -423,51 +437,51 @@
         ab2_claim = tx_ab2[0].outputs[0]
         advance(6, [tx_a2])
         advance(7, [tx_ab2])
-        (r_ab2, r_a2), _ = reader._search(order_by=['creation_height'], limit=2)
+        (r_ab2, r_a2) = search(order_by=['creation_height'], limit=2)
         self.assertEqual(f"foo#{a2_claim.claim_id[:2]}", r_a2['short_url'])
         self.assertEqual(f"foo#{ab2_claim.claim_id[:4]}", r_ab2['short_url'])
         self.assertEqual("@foo#a/foo#a", r_a2['canonical_url'])
         self.assertEqual("@foo#a/foo#ab", r_ab2['canonical_url'])
-        self.assertEqual(2, reader._search(claim_id=txo_chan_a.claim_id, limit=1)[0][0]['claims_in_channel'])
+        self.assertEqual(2, search(claim_id=txo_chan_a.claim_id, limit=1)[0]['claims_in_channel'])
 
         # change channel public key, invalidating stream claim signatures
         advance(8, [self.get_channel_update(txo_chan_a, COIN, key=b'a')])
-        (r_ab2, r_a2), _ = reader._search(order_by=['creation_height'], limit=2)
+        (r_ab2, r_a2) = search(order_by=['creation_height'], limit=2)
         self.assertEqual(f"foo#{a2_claim.claim_id[:2]}", r_a2['short_url'])
         self.assertEqual(f"foo#{ab2_claim.claim_id[:4]}", r_ab2['short_url'])
         self.assertIsNone(r_a2['canonical_url'])
         self.assertIsNone(r_ab2['canonical_url'])
-        self.assertEqual(0, reader._search(claim_id=txo_chan_a.claim_id, limit=1)[0][0]['claims_in_channel'])
+        self.assertEqual(0, search(claim_id=txo_chan_a.claim_id, limit=1)[0]['claims_in_channel'])
 
         # reinstate previous channel public key (previous stream claim signatures become valid again)
         channel_update = self.get_channel_update(txo_chan_a, COIN, key=b'c')
         advance(9, [channel_update])
-        (r_ab2, r_a2), _ = reader._search(order_by=['creation_height'], limit=2)
+        (r_ab2, r_a2) = search(order_by=['creation_height'], limit=2)
         self.assertEqual(f"foo#{a2_claim.claim_id[:2]}", r_a2['short_url'])
         self.assertEqual(f"foo#{ab2_claim.claim_id[:4]}", r_ab2['short_url'])
         self.assertEqual("@foo#a/foo#a", r_a2['canonical_url'])
         self.assertEqual("@foo#a/foo#ab", r_ab2['canonical_url'])
-        self.assertEqual(2, reader._search(claim_id=txo_chan_a.claim_id, limit=1)[0][0]['claims_in_channel'])
-        self.assertEqual(0, reader._search(claim_id=txo_chan_ab.claim_id, limit=1)[0][0]['claims_in_channel'])
+        self.assertEqual(2, search(claim_id=txo_chan_a.claim_id, limit=1)[0]['claims_in_channel'])
+        self.assertEqual(0, search(claim_id=txo_chan_ab.claim_id, limit=1)[0]['claims_in_channel'])
 
         # change channel of stream
-        self.assertEqual("@foo#a/foo#ab", reader._search(claim_id=ab2_claim.claim_id, limit=1)[0][0]['canonical_url'])
+        self.assertEqual("@foo#a/foo#ab", search(claim_id=ab2_claim.claim_id, limit=1)[0]['canonical_url'])
         tx_ab2 = self.get_stream_update(tx_ab2, COIN, txo_chan_ab)
         advance(10, [tx_ab2])
-        self.assertEqual("@foo#ab/foo#a", reader._search(claim_id=ab2_claim.claim_id, limit=1)[0][0]['canonical_url'])
+        self.assertEqual("@foo#ab/foo#a", search(claim_id=ab2_claim.claim_id, limit=1)[0]['canonical_url'])
         # TODO: currently there is a bug where a stream leaving a channel does not update that channel's claims count
-        self.assertEqual(2, reader._search(claim_id=txo_chan_a.claim_id, limit=1)[0][0]['claims_in_channel'])
+        self.assertEqual(2, search(claim_id=txo_chan_a.claim_id, limit=1)[0]['claims_in_channel'])
         # TODO: after bug is fixed remove test above and add test below
-        #self.assertEqual(1, reader._search(claim_id=txo_chan_a.claim_id, limit=1)[0][0]['claims_in_channel'])
-        self.assertEqual(1, reader._search(claim_id=txo_chan_ab.claim_id, limit=1)[0][0]['claims_in_channel'])
+        #self.assertEqual(1, search(claim_id=txo_chan_a.claim_id, limit=1)[0]['claims_in_channel'])
+        self.assertEqual(1, search(claim_id=txo_chan_ab.claim_id, limit=1)[0]['claims_in_channel'])
 
         # claim abandon updates claims_in_channel
         advance(11, [self.get_abandon(tx_ab2)])
-        self.assertEqual(0, reader._search(claim_id=txo_chan_ab.claim_id, limit=1)[0][0]['claims_in_channel'])
+        self.assertEqual(0, search(claim_id=txo_chan_ab.claim_id, limit=1)[0]['claims_in_channel'])
 
         # delete channel, invalidating stream claim signatures
         advance(12, [self.get_abandon(channel_update)])
-        (r_a2,), _ = reader._search(order_by=['creation_height'], limit=1)
+        (r_a2,) = search(order_by=['creation_height'], limit=1)
         self.assertEqual(f"foo#{a2_claim.claim_id[:2]}", r_a2['short_url'])
         self.assertIsNone(r_a2['canonical_url'])
 
@@ -525,7 +539,7 @@ class TestTrending(TestSQLDB):
                 self.get_support(up_medium, (20+(window*(2 if window == 7 else 1)))*COIN),
                 self.get_support(up_biggly, (20+(window*(3 if window == 7 else 1)))*COIN),
             ])
-            results, _ = reader._search(order_by=['trending_local'])
+            results = search(order_by=['trending_local'])
             self.assertEqual([c.claim_id for c in claims], [hexlify(c['claim_hash'][::-1]).decode() for c in results])
             self.assertEqual([10, 6, 2, 0, -2], [int(c['trending_local']) for c in results])
             self.assertEqual([53, 38, -32, 0, -6], [int(c['trending_global']) for c in results])
@@ -541,73 +555,180 @@ class TestTrending(TestSQLDB):
 
 
 class TestContentBlocking(TestSQLDB):
 
-    def test_blocking(self):
-        tx0 = self.get_channel('A Channel', COIN)
-        a_channel = tx0[0].outputs[0]
-        tx1 = self.get_stream('Claim One', COIN)
-        tx2 = self.get_stream('Claim Two', COIN, tags=["mature"], channel=a_channel)
-        self.advance(1, [tx0, tx1, tx2])
-        claim1, claim2 = tx1[0].outputs[0], tx2[0].outputs[0]
+    def test_blocking_and_filtering(self):
+        # content claims and channels
+        tx0 = self.get_channel('A Channel', COIN, '@channel1')
+        regular_channel = tx0[0].outputs[0]
+        tx1 = self.get_stream('Claim One', COIN, 'claim1')
+        tx2 = self.get_stream('Claim Two', COIN, 'claim2', regular_channel)
+        tx3 = self.get_stream('Claim Three', COIN, 'claim3')
+        self.advance(1, [tx0, tx1, tx2, tx3])
+        claim1, claim2, claim3 = tx1[0].outputs[0], tx2[0].outputs[0], tx3[0].outputs[0]
+
+        # block and filter channels
+        tx0 = self.get_channel('Blocking Channel', COIN, '@block')
+        tx1 = self.get_channel('Filtering Channel', COIN, '@filter')
+        blocking_channel = tx0[0].outputs[0]
+        filtering_channel = tx1[0].outputs[0]
+        self.sql.blocking_channel_hashes.add(blocking_channel.claim_hash)
+        self.sql.filtering_channel_hashes.add(filtering_channel.claim_hash)
+        self.advance(2, [tx0, tx1])
+        self.assertEqual({}, dict(self.sql.blocked_streams))
+        self.assertEqual({}, dict(self.sql.blocked_channels))
+        self.assertEqual({}, dict(self.sql.filtered_streams))
+        self.assertEqual({}, dict(self.sql.filtered_channels))
 
         # nothing blocked
-        results, censor = reader._search(text='Claim')
-        self.assertEqual(2, len(results))
+        results, _ = reader.resolve([
+            claim1.claim_name, claim2.claim_name,
+            claim3.claim_name, regular_channel.claim_name
+        ])
+        self.assertEqual(claim1.claim_hash, results[0]['claim_hash'])
+        self.assertEqual(claim2.claim_hash, results[1]['claim_hash'])
+        self.assertEqual(claim3.claim_hash, results[2]['claim_hash'])
+        self.assertEqual(regular_channel.claim_hash, results[3]['claim_hash'])
+
+        # nothing filtered
+        results, censor = censored_search()
+        self.assertEqual(6, len(results))
         self.assertEqual(0, censor.total)
-        self.assertEqual({}, dict(self.sql.blocked_claims))
+        self.assertEqual({}, censor.censored)
 
-        # block claim reposted to blocking channel
-        tx = self.get_channel('Blocking Channel', COIN)
-        channel = tx[0].outputs[0]
-        self.sql.filtering_channel_hashes.add(channel.claim_hash)
-        self.advance(2, [tx])
-        self.assertEqual({}, dict(self.sql.blocked_claims))
-
-        tx = self.get_repost(claim1.claim_id, COIN, channel)
-        reposting_claim = tx[0].outputs[0]
-        self.advance(3, [tx])
+        # block claim reposted to blocking channel, also gets filtered
+        repost_tx1 = self.get_repost(claim1.claim_id, COIN, blocking_channel)
+        repost1 = repost_tx1[0].outputs[0]
+        self.advance(3, [repost_tx1])
         self.assertEqual(
-            {reposting_claim.claim.repost.reference.claim_hash: channel.claim_hash},
-            dict(self.sql.blocked_claims)
+            {repost1.claim.repost.reference.claim_hash: blocking_channel.claim_hash},
+            dict(self.sql.blocked_streams)
+        )
+        self.assertEqual({}, dict(self.sql.blocked_channels))
+        self.assertEqual(
+            {repost1.claim.repost.reference.claim_hash: blocking_channel.claim_hash},
+            dict(self.sql.filtered_streams)
+        )
+        self.assertEqual({}, dict(self.sql.filtered_channels))
+
+        # claim is blocked from results by direct repost
+        results, censor = censored_search(text='Claim')
+        self.assertEqual(2, len(results))
+        self.assertEqual(claim2.claim_hash, results[0]['claim_hash'])
+        self.assertEqual(claim3.claim_hash, results[1]['claim_hash'])
+        self.assertEqual(1, censor.total)
+        self.assertEqual({blocking_channel.claim_hash: 1}, censor.censored)
+        results, _ = reader.resolve([claim1.claim_name])
+        self.assertEqual('Could not find stream in "claim1".', results[0].args[0])
+        results, _ = reader.resolve([
+            claim2.claim_name, regular_channel.claim_name  # claim2 and channel still resolve
+        ])
+        self.assertEqual(claim2.claim_hash, results[0]['claim_hash'])
+        self.assertEqual(regular_channel.claim_hash, results[1]['claim_hash'])
+
+        # block claim indirectly by blocking its parent channel
+        repost_tx2 = self.get_repost(regular_channel.claim_id, COIN, blocking_channel)
+        repost2 = repost_tx2[0].outputs[0]
+        self.advance(4, [repost_tx2])
+        self.assertEqual(
+            {repost1.claim.repost.reference.claim_hash: blocking_channel.claim_hash},
+            dict(self.sql.blocked_streams)
+        )
+        self.assertEqual(
+            {repost2.claim.repost.reference.claim_hash: blocking_channel.claim_hash},
+            dict(self.sql.blocked_channels)
+        )
+        self.assertEqual(
+            {repost1.claim.repost.reference.claim_hash: blocking_channel.claim_hash},
+            dict(self.sql.filtered_streams)
+        )
+        self.assertEqual(
+            {repost2.claim.repost.reference.claim_hash: blocking_channel.claim_hash},
+            dict(self.sql.filtered_channels)
         )
 
-        # claim is blocked from results by repost
-        results, censor = reader._search(text='Claim')
+        # claim in blocked channel is filtered from search and can't resolve
+        results, censor = censored_search(text='Claim')
         self.assertEqual(1, len(results))
-        self.assertEqual(claim2.claim_hash, results[0]['claim_hash'])
-        self.assertEqual(1, censor.total)
-        self.assertEqual({channel.claim_hash: 1}, censor.blocked_claims)
-        self.assertEqual({}, censor.blocked_channels)
-        self.assertEqual({}, censor.blocked_tags)
-
-        # claim is blocked from results by repost and tags
-        results, censor = reader._search(text='Claim', not_tags=["mature"])
-        self.assertEqual(0, len(results))
+        self.assertEqual(claim3.claim_hash, results[0]['claim_hash'])
         self.assertEqual(2, censor.total)
-        self.assertEqual({channel.claim_hash: 1}, censor.blocked_claims)
-        self.assertEqual({}, censor.blocked_channels)
-        self.assertEqual({"mature": 1}, censor.blocked_tags)
+        self.assertEqual({blocking_channel.claim_hash: 2}, censor.censored)
+        results, _ = reader.resolve([
+            claim2.claim_name, regular_channel.claim_name  # claim2 and channel don't resolve
+        ])
+        self.assertEqual('Could not find stream in "claim2".', results[0].args[0])
+        self.assertEqual('Could not find channel in "@channel1".', results[1].args[0])
+        results, _ = reader.resolve([claim3.claim_name])  # claim3 still resolves
+        self.assertEqual(claim3.claim_hash, results[0]['claim_hash'])
 
-        # claim is blocked from results by repost and channel
-        results, censor = reader._search(text='Claim', not_channel_ids=[a_channel.claim_id])
+        # filtered claim is only filtered and not blocked
+        repost_tx3 = self.get_repost(claim3.claim_id, COIN, filtering_channel)
+        repost3 = repost_tx3[0].outputs[0]
+        self.advance(5, [repost_tx3])
+        self.assertEqual(
+            {repost1.claim.repost.reference.claim_hash: blocking_channel.claim_hash},
+            dict(self.sql.blocked_streams)
+        )
+        self.assertEqual(
+            {repost2.claim.repost.reference.claim_hash: blocking_channel.claim_hash},
+            dict(self.sql.blocked_channels)
+        )
+        self.assertEqual(
+            {repost1.claim.repost.reference.claim_hash: blocking_channel.claim_hash,
+             repost3.claim.repost.reference.claim_hash: filtering_channel.claim_hash},
+            dict(self.sql.filtered_streams)
+        )
+        self.assertEqual(
+            {repost2.claim.repost.reference.claim_hash: blocking_channel.claim_hash},
+            dict(self.sql.filtered_channels)
+        )
+
+        # filtered claim doesn't show up in search but is still resolvable
+        results, censor = censored_search(text='Claim')
         self.assertEqual(0, len(results))
-        self.assertEqual(2, censor.total)
-        self.assertEqual({channel.claim_hash: 1}, censor.blocked_claims)
-        self.assertEqual({a_channel.claim_hash: 1}, censor.blocked_channels)
-        self.assertEqual({}, censor.blocked_tags)
+        self.assertEqual(3, censor.total)
+        self.assertEqual({blocking_channel.claim_hash: 2, filtering_channel.claim_hash: 1}, censor.censored)
+        results, _ = reader.resolve([claim3.claim_name])  # claim3 still resolves
+        self.assertEqual(claim3.claim_hash, results[0]['claim_hash'])
+
+        # abandon unblocks content
+        self.advance(6, [
+            self.get_abandon(repost_tx1),
+            self.get_abandon(repost_tx2),
+            self.get_abandon(repost_tx3)
+        ])
+        self.assertEqual({}, dict(self.sql.blocked_streams))
+        self.assertEqual({}, dict(self.sql.blocked_channels))
+        self.assertEqual({}, dict(self.sql.filtered_streams))
+        self.assertEqual({}, dict(self.sql.filtered_channels))
+        results, censor = censored_search(text='Claim')
+        self.assertEqual(3, len(results))
+        self.assertEqual(0, censor.total)
+        results, censor = censored_search()
+        self.assertEqual(6, len(results))
+        self.assertEqual(0, censor.total)
+        results, _ = reader.resolve([
+            claim1.claim_name, claim2.claim_name,
+            claim3.claim_name, regular_channel.claim_name
+        ])
+        self.assertEqual(claim1.claim_hash, results[0]['claim_hash'])
+        self.assertEqual(claim2.claim_hash, results[1]['claim_hash'])
+        self.assertEqual(claim3.claim_hash, results[2]['claim_hash'])
+        self.assertEqual(regular_channel.claim_hash, results[3]['claim_hash'])
 
     def test_pagination(self):
-        one, two, three, four, five, six, seven = (
-            self.advance(1, [self.get_stream('One', COIN, tags=["mature"])])[0],
-            self.advance(2, [self.get_stream('Two', COIN, tags=["mature"])])[0],
-            self.advance(3, [self.get_stream('Three', COIN)])[0],
-            self.advance(4, [self.get_stream('Four', COIN)])[0],
-            self.advance(5, [self.get_stream('Five', COIN)])[0],
-            self.advance(6, [self.get_stream('Six', COIN)])[0],
-            self.advance(7, [self.get_stream('Seven', COIN)])[0],
-        )
+        one, two, three, four, five, six, seven, filter_channel = self.advance(1, [
+            self.get_stream('One', COIN),
+            self.get_stream('Two', COIN),
+            self.get_stream('Three', COIN),
+            self.get_stream('Four', COIN),
+            self.get_stream('Five', COIN),
+            self.get_stream('Six', COIN),
+            self.get_stream('Seven', COIN),
+            self.get_channel('Filtering Channel', COIN, '@filter'),
+        ])
+        self.sql.filtering_channel_hashes.add(filter_channel.claim_hash)
 
-        # nothing blocked
-        results, censor = reader._search(order_by='^height', offset=1, limit=3)
+        # nothing filtered
+        results, censor = censored_search(order_by='^height', offset=1, limit=3)
         self.assertEqual(3, len(results))
         self.assertEqual(
             [two.claim_hash, three.claim_hash, four.claim_hash],
@@ -615,12 +736,16 @@ class TestContentBlocking(TestSQLDB):
             [r['claim_hash'] for r in results]
         )
         self.assertEqual(0, censor.total)
 
-        # tags blocked
-        results, censor = reader._search(order_by='^height', not_tags=('mature',), offset=1, limit=3)
+        # content filtered
+        repost1, repost2 = self.advance(2, [
+            self.get_repost(one.claim_id, COIN, filter_channel),
+            self.get_repost(two.claim_id, COIN, filter_channel),
+        ])
+        results, censor = censored_search(order_by='^height', offset=1, limit=3)
         self.assertEqual(3, len(results))
         self.assertEqual(
             [four.claim_hash, five.claim_hash, six.claim_hash],
             [r['claim_hash'] for r in results]
         )
         self.assertEqual(2, censor.total)
-        self.assertEqual({"mature": 2}, censor.blocked_tags)
+        self.assertEqual({filter_channel.claim_hash: 2}, censor.censored)
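
Note (not part of the patch): a minimal sketch of how the reworked Censor
class tallies censored rows, assuming the patched lbry.schema.result module
is importable; the 20-byte hash values below are made up for illustration.

    from lbry.schema.result import Censor

    # map of censored claim_hash -> claim_hash of the channel censoring it
    blocking_channel = b'\x01' * 20
    censor = Censor(streams={b'\x02' * 20: blocking_channel}, channels={})

    # rows produced by the reader queries carry claim_hash and channel_hash
    row = {'claim_hash': b'\x02' * 20, 'channel_hash': None}
    assert censor.censor(row) is True  # row is dropped from the results
    assert censor.total == 1
    assert censor.censored == {blocking_channel: 1}

The same class now serves both policies: get_resolve_censor() builds it from
the blocked maps only (enforced even on resolve), while get_search_censor()
uses the filtered maps, which the writer maintains as a superset of the
blocked ones.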