Added support to differentiate between filtering and blocking for content censoring

Lex Berezhny 2020-01-22 01:55:37 -05:00
parent 0a21b72f9c
commit 2cd7ea257c
7 changed files with 355 additions and 211 deletions

lbry/schema/result.py
@@ -1,6 +1,6 @@
 import base64
 import struct
-from typing import List, Optional, Tuple
+from typing import List
 from binascii import hexlify
 from itertools import chain
@@ -9,50 +9,36 @@ from lbry.schema.types.v2.result_pb2 import Outputs as OutputsMessage
 class Censor:
 
-    def __init__(self, claim_ids: dict = None, channel_ids: set = None, tags: set = None):
-        self.claim_ids = claim_ids or {}
-        self.channel_ids = channel_ids or set()
-        self.tags = tags or set()
-        self.blocked_claims = {}
-        self.blocked_channels = {}
-        self.blocked_tags = {}
+    __slots__ = 'streams', 'channels', 'censored', 'total'
+
+    def __init__(self, streams: dict = None, channels: dict = None):
+        self.streams = streams or {}
+        self.channels = channels or {}
+        self.censored = {}
         self.total = 0
 
     def censor(self, row) -> bool:
-        censored = False
-        if row['claim_hash'] in self.claim_ids:
-            censored = True
-            channel_id = self.claim_ids[row['claim_hash']]
-            self.blocked_claims.setdefault(channel_id, 0)
-            self.blocked_claims[channel_id] += 1
-        if row['channel_hash'] in self.channel_ids:
-            censored = True
-            self.blocked_channels.setdefault(row['channel_hash'], 0)
-            self.blocked_channels[row['channel_hash']] += 1
-        if self.tags.intersection(row['tags']):
-            censored = True
-            for tag in self.tags:
-                if tag in row['tags']:
-                    self.blocked_tags.setdefault(tag, 0)
-                    self.blocked_tags[tag] += 1
-        if censored:
+        was_censored = False
+        for claim_hash, lookup in (
+                (row['claim_hash'], self.streams),
+                (row['claim_hash'], self.channels),
+                (row['channel_hash'], self.channels)):
+            censoring_channel_hash = lookup.get(claim_hash)
+            if censoring_channel_hash:
+                was_censored = True
+                self.censored.setdefault(censoring_channel_hash, 0)
+                self.censored[censoring_channel_hash] += 1
+                break
+        if was_censored:
             self.total += 1
-        return censored
+        return was_censored
 
     def to_message(self, outputs: OutputsMessage):
         outputs.blocked_total = self.total
-        for channel_hash, count in self.blocked_claims.items():
+        for censoring_channel_hash, count in self.censored.items():
             block = outputs.blocked.add()
             block.count = count
-            block.reposted_in_channel = channel_hash
-        for channel_hash, count in self.blocked_channels.items():
-            block = outputs.blocked.add()
-            block.count = count
-            block.in_channel = channel_hash
-        for tag, count in self.blocked_tags.items():
-            block = outputs.blocked.add()
-            block.count = count
-            block.has_tag = tag
 
 class Outputs:
@@ -77,15 +63,13 @@ class Outputs:
         return txos, self.inflate_blocked()
 
     def inflate_blocked(self):
-        result = {"total": self.blocked_total}
-        for blocked_message in self.blocked:
-            reason = blocked_message.WhichOneof('reason')
-            if reason == "has_tag":
-                key = blocked_message.has_tag
-            else:
-                key = hexlify(getattr(blocked_message, reason)[::-1]).decode()
-            result.setdefault(reason, {})[key] = blocked_message.count
-        return result
+        return {
+            "total": self.blocked_total,
+            "channels": {
+                hexlify(message.channel_hash[::-1]).decode(): message.count
+                for message in self.blocked
+            }
+        }
 
     def message_to_txo(self, txo_message, tx_map):
         if txo_message.WhichOneof('meta') == 'error':

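The new Censor collapses the old per-reason tallies (blocked_claims, blocked_channels, blocked_tags) into a single tally keyed by the censoring channel. A minimal sketch of the resulting API; the hashes below are hypothetical placeholders for 20-byte claim hashes:

    from lbry.schema.result import Censor

    blocked_stream = b'\x01' * 20      # claim hash of a reposted (censored) stream
    censoring_channel = b'\x02' * 20   # channel whose repost censors it

    censor = Censor(streams={blocked_stream: censoring_channel})
    row = {'claim_hash': blocked_stream, 'channel_hash': None}
    assert censor.censor(row)                         # row gets dropped from results
    assert censor.total == 1
    assert censor.censored == {censoring_channel: 1}  # keyed by censoring channel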
lbry/schema/types/v2/result_pb2.py
@@ -19,7 +19,7 @@ DESCRIPTOR = _descriptor.FileDescriptor(
   name='result.proto',
   package='pb',
   syntax='proto3',
-  serialized_pb=_b('\n\x0cresult.proto\x12\x02pb\"\x97\x01\n\x07Outputs\x12\x18\n\x04txos\x18\x01 \x03(\x0b\x32\n.pb.Output\x12\x1e\n\nextra_txos\x18\x02 \x03(\x0b\x32\n.pb.Output\x12\r\n\x05total\x18\x03 \x01(\r\x12\x0e\n\x06offset\x18\x04 \x01(\r\x12\x1c\n\x07\x62locked\x18\x05 \x03(\x0b\x32\x0b.pb.Blocked\x12\x15\n\rblocked_total\x18\x06 \x01(\r\"{\n\x06Output\x12\x0f\n\x07tx_hash\x18\x01 \x01(\x0c\x12\x0c\n\x04nout\x18\x02 \x01(\r\x12\x0e\n\x06height\x18\x03 \x01(\r\x12\x1e\n\x05\x63laim\x18\x07 \x01(\x0b\x32\r.pb.ClaimMetaH\x00\x12\x1a\n\x05\x65rror\x18\x0f \x01(\x0b\x32\t.pb.ErrorH\x00\x42\x06\n\x04meta\"\xaf\x03\n\tClaimMeta\x12\x1b\n\x07\x63hannel\x18\x01 \x01(\x0b\x32\n.pb.Output\x12\x1a\n\x06repost\x18\x02 \x01(\x0b\x32\n.pb.Output\x12\x11\n\tshort_url\x18\x03 \x01(\t\x12\x15\n\rcanonical_url\x18\x04 \x01(\t\x12\x16\n\x0eis_controlling\x18\x05 \x01(\x08\x12\x18\n\x10take_over_height\x18\x06 \x01(\r\x12\x17\n\x0f\x63reation_height\x18\x07 \x01(\r\x12\x19\n\x11\x61\x63tivation_height\x18\x08 \x01(\r\x12\x19\n\x11\x65xpiration_height\x18\t \x01(\r\x12\x19\n\x11\x63laims_in_channel\x18\n \x01(\r\x12\x10\n\x08reposted\x18\x0b \x01(\r\x12\x18\n\x10\x65\x66\x66\x65\x63tive_amount\x18\x14 \x01(\x04\x12\x16\n\x0esupport_amount\x18\x15 \x01(\x04\x12\x16\n\x0etrending_group\x18\x16 \x01(\r\x12\x16\n\x0etrending_mixed\x18\x17 \x01(\x02\x12\x16\n\x0etrending_local\x18\x18 \x01(\x02\x12\x17\n\x0ftrending_global\x18\x19 \x01(\x02\"\x94\x01\n\x05\x45rror\x12\x1c\n\x04\x63ode\x18\x01 \x01(\x0e\x32\x0e.pb.Error.Code\x12\x0c\n\x04text\x18\x02 \x01(\t\x12\x1c\n\x07\x62locked\x18\x03 \x01(\x0b\x32\x0b.pb.Blocked\"A\n\x04\x43ode\x12\x10\n\x0cUNKNOWN_CODE\x10\x00\x12\r\n\tNOT_FOUND\x10\x01\x12\x0b\n\x07INVALID\x10\x02\x12\x0b\n\x07\x42LOCKED\x10\x03\"j\n\x07\x42locked\x12\r\n\x05\x63ount\x18\x01 \x01(\r\x12\x1d\n\x13reposted_in_channel\x18\x02 \x01(\x0cH\x00\x12\x14\n\nin_channel\x18\x03 \x01(\x0cH\x00\x12\x11\n\x07has_tag\x18\x04 \x01(\tH\x00\x42\x08\n\x06reasonb\x06proto3')
+  serialized_pb=_b('\n\x0cresult.proto\x12\x02pb\"\x97\x01\n\x07Outputs\x12\x18\n\x04txos\x18\x01 \x03(\x0b\x32\n.pb.Output\x12\x1e\n\nextra_txos\x18\x02 \x03(\x0b\x32\n.pb.Output\x12\r\n\x05total\x18\x03 \x01(\r\x12\x0e\n\x06offset\x18\x04 \x01(\r\x12\x1c\n\x07\x62locked\x18\x05 \x03(\x0b\x32\x0b.pb.Blocked\x12\x15\n\rblocked_total\x18\x06 \x01(\r\"{\n\x06Output\x12\x0f\n\x07tx_hash\x18\x01 \x01(\x0c\x12\x0c\n\x04nout\x18\x02 \x01(\r\x12\x0e\n\x06height\x18\x03 \x01(\r\x12\x1e\n\x05\x63laim\x18\x07 \x01(\x0b\x32\r.pb.ClaimMetaH\x00\x12\x1a\n\x05\x65rror\x18\x0f \x01(\x0b\x32\t.pb.ErrorH\x00\x42\x06\n\x04meta\"\xaf\x03\n\tClaimMeta\x12\x1b\n\x07\x63hannel\x18\x01 \x01(\x0b\x32\n.pb.Output\x12\x1a\n\x06repost\x18\x02 \x01(\x0b\x32\n.pb.Output\x12\x11\n\tshort_url\x18\x03 \x01(\t\x12\x15\n\rcanonical_url\x18\x04 \x01(\t\x12\x16\n\x0eis_controlling\x18\x05 \x01(\x08\x12\x18\n\x10take_over_height\x18\x06 \x01(\r\x12\x17\n\x0f\x63reation_height\x18\x07 \x01(\r\x12\x19\n\x11\x61\x63tivation_height\x18\x08 \x01(\r\x12\x19\n\x11\x65xpiration_height\x18\t \x01(\r\x12\x19\n\x11\x63laims_in_channel\x18\n \x01(\r\x12\x10\n\x08reposted\x18\x0b \x01(\r\x12\x18\n\x10\x65\x66\x66\x65\x63tive_amount\x18\x14 \x01(\x04\x12\x16\n\x0esupport_amount\x18\x15 \x01(\x04\x12\x16\n\x0etrending_group\x18\x16 \x01(\r\x12\x16\n\x0etrending_mixed\x18\x17 \x01(\x02\x12\x16\n\x0etrending_local\x18\x18 \x01(\x02\x12\x17\n\x0ftrending_global\x18\x19 \x01(\x02\"\x94\x01\n\x05\x45rror\x12\x1c\n\x04\x63ode\x18\x01 \x01(\x0e\x32\x0e.pb.Error.Code\x12\x0c\n\x04text\x18\x02 \x01(\t\x12\x1c\n\x07\x62locked\x18\x03 \x01(\x0b\x32\x0b.pb.Blocked\"A\n\x04\x43ode\x12\x10\n\x0cUNKNOWN_CODE\x10\x00\x12\r\n\tNOT_FOUND\x10\x01\x12\x0b\n\x07INVALID\x10\x02\x12\x0b\n\x07\x42LOCKED\x10\x03\".\n\x07\x42locked\x12\r\n\x05\x63ount\x18\x01 \x01(\r\x12\x14\n\x0c\x63hannel_hash\x18\x02 \x01(\x0c\x62\x06proto3')
 )
 _sym_db.RegisterFileDescriptor(DESCRIPTOR)
@@ -388,26 +388,12 @@ _BLOCKED = _descriptor.Descriptor(
       is_extension=False, extension_scope=None,
       options=None),
     _descriptor.FieldDescriptor(
-      name='reposted_in_channel', full_name='pb.Blocked.reposted_in_channel', index=1,
+      name='channel_hash', full_name='pb.Blocked.channel_hash', index=1,
       number=2, type=12, cpp_type=9, label=1,
       has_default_value=False, default_value=_b(""),
       message_type=None, enum_type=None, containing_type=None,
       is_extension=False, extension_scope=None,
       options=None),
-    _descriptor.FieldDescriptor(
-      name='in_channel', full_name='pb.Blocked.in_channel', index=2,
-      number=3, type=12, cpp_type=9, label=1,
-      has_default_value=False, default_value=_b(""),
-      message_type=None, enum_type=None, containing_type=None,
-      is_extension=False, extension_scope=None,
-      options=None),
-    _descriptor.FieldDescriptor(
-      name='has_tag', full_name='pb.Blocked.has_tag', index=3,
-      number=4, type=9, cpp_type=9, label=1,
-      has_default_value=False, default_value=_b("").decode('utf-8'),
-      message_type=None, enum_type=None, containing_type=None,
-      is_extension=False, extension_scope=None,
-      options=None),
   ],
   extensions=[
   ],
@@ -419,12 +405,9 @@ _BLOCKED = _descriptor.Descriptor(
   syntax='proto3',
   extension_ranges=[],
   oneofs=[
-    _descriptor.OneofDescriptor(
-      name='reason', full_name='pb.Blocked.reason',
-      index=0, containing_type=None, fields=[]),
   ],
   serialized_start=884,
-  serialized_end=990,
+  serialized_end=930,
 )
 
 _OUTPUTS.fields_by_name['txos'].message_type = _OUTPUT
@@ -443,15 +426,6 @@ _CLAIMMETA.fields_by_name['repost'].message_type = _OUTPUT
 _ERROR.fields_by_name['code'].enum_type = _ERROR_CODE
 _ERROR.fields_by_name['blocked'].message_type = _BLOCKED
 _ERROR_CODE.containing_type = _ERROR
-_BLOCKED.oneofs_by_name['reason'].fields.append(
-  _BLOCKED.fields_by_name['reposted_in_channel'])
-_BLOCKED.fields_by_name['reposted_in_channel'].containing_oneof = _BLOCKED.oneofs_by_name['reason']
-_BLOCKED.oneofs_by_name['reason'].fields.append(
-  _BLOCKED.fields_by_name['in_channel'])
-_BLOCKED.fields_by_name['in_channel'].containing_oneof = _BLOCKED.oneofs_by_name['reason']
-_BLOCKED.oneofs_by_name['reason'].fields.append(
-  _BLOCKED.fields_by_name['has_tag'])
-_BLOCKED.fields_by_name['has_tag'].containing_oneof = _BLOCKED.oneofs_by_name['reason']
 DESCRIPTOR.message_types_by_name['Outputs'] = _OUTPUTS
 DESCRIPTOR.message_types_by_name['Output'] = _OUTPUT
 DESCRIPTOR.message_types_by_name['ClaimMeta'] = _CLAIMMETA

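Decoding the serialized descriptor above: the Blocked message loses its 'reason' oneof (reposted_in_channel, in_channel, has_tag) and now carries only a count and the censoring channel's hash. A sketch of building it through the regenerated module; the hash is a hypothetical placeholder:

    from lbry.schema.types.v2.result_pb2 import Outputs

    outputs = Outputs()
    outputs.blocked_total = 1
    block = outputs.blocked.add()
    block.count = 1                    # uint32 field 1
    block.channel_hash = b'\x02' * 20  # bytes field 2, replaces the oneof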
lbry/wallet/server/db/reader.py
@@ -70,7 +70,10 @@ class ReaderState:
     ledger: Type[Ledger]
     query_timeout: float
     log: logging.Logger
-    blocked_claims: Dict
+    blocked_streams: Dict
+    blocked_channels: Dict
+    filtered_streams: Dict
+    filtered_channels: Dict
 
     def close(self):
         self.db.close()
@@ -89,6 +92,12 @@ class ReaderState:
         self.db.setprogresshandler(interruptor, 100)
 
+    def get_resolve_censor(self) -> Censor:
+        return Censor(self.blocked_streams, self.blocked_channels)
+
+    def get_search_censor(self) -> Censor:
+        return Censor(self.filtered_streams, self.filtered_channels)
+
 
 ctx: ContextVar[Optional[ReaderState]] = ContextVar('ctx')
@@ -100,15 +109,20 @@ def row_factory(cursor, row):
     }
 
 
-def initializer(log, _path, _ledger_name, query_timeout, _measure=False, blocked_claims=None):
+def initializer(log, _path, _ledger_name, query_timeout, _measure=False, block_and_filter=None):
     db = apsw.Connection(_path, flags=apsw.SQLITE_OPEN_READONLY | apsw.SQLITE_OPEN_URI)
     db.setrowtrace(row_factory)
+    if block_and_filter:
+        blocked_streams, blocked_channels, filtered_streams, filtered_channels = block_and_filter
+    else:
+        blocked_streams = blocked_channels = filtered_streams = filtered_channels = {}
     ctx.set(
         ReaderState(
             db=db, stack=[], metrics={}, is_tracking_metrics=_measure,
             ledger=Ledger if _ledger_name == 'mainnet' else RegTestLedger,
             query_timeout=query_timeout, log=log,
-            blocked_claims={} if blocked_claims is None else blocked_claims
+            blocked_streams=blocked_streams, blocked_channels=blocked_channels,
+            filtered_streams=filtered_streams, filtered_channels=filtered_channels,
         )
     )
@@ -199,7 +213,7 @@ def execute_query(sql, values, row_offset: int, row_limit: int, censor: Censor)
         raise SQLiteOperationalError(context.metrics)
 
 
-def _get_claims(cols, for_count=False, **constraints) -> Tuple[str, Dict]:
+def claims_query(cols, for_count=False, **constraints) -> Tuple[str, Dict]:
     if 'order_by' in constraints:
         order_by_parts = constraints['order_by']
         if isinstance(order_by_parts, str):
@@ -270,6 +284,19 @@ def _get_claims(cols, for_count=False, **constraints) -> Tuple[str, Dict]:
         constraints['claim.channel_hash__in'] = [
             unhexlify(cid)[::-1] for cid in channel_ids
         ]
+    if 'not_channel_ids' in constraints:
+        not_channel_ids = constraints.pop('not_channel_ids')
+        if not_channel_ids:
+            not_channel_ids_binary = [
+                unhexlify(ncid)[::-1] for ncid in not_channel_ids
+            ]
+            if constraints.get('has_channel_signature', False):
+                constraints['claim.channel_hash__not_in'] = not_channel_ids_binary
+            else:
+                constraints['null_or_not_channel__or'] = {
+                    'claim.signature_valid__is_null': True,
+                    'claim.channel_hash__not_in': not_channel_ids_binary
+                }
     if 'signature_valid' in constraints:
         has_channel_signature = constraints.pop('has_channel_signature', False)
         if has_channel_signature:
@@ -318,36 +345,32 @@ def _get_claims(cols, for_count=False, **constraints) -> Tuple[str, Dict]:
     return query(select, **constraints)
 
 
-def get_claims(cols, for_count=False, **constraints) -> Tuple[List, Censor]:
+def select_claims(censor: Censor, cols: str, for_count=False, **constraints) -> List:
     if 'channel' in constraints:
         channel_url = constraints.pop('channel')
         match = resolve_url(channel_url)
         if isinstance(match, dict):
             constraints['channel_hash'] = match['claim_hash']
         else:
-            return ([{'row_count': 0}] if cols == 'count(*) as row_count' else []), Censor()
-    censor = Censor(
-        ctx.get().blocked_claims,
-        {unhexlify(ncid)[::-1] for ncid in constraints.pop('not_channel_ids', [])},
-        set(clean_tags(constraints.pop('not_tags', {})))
-    )
+            return [{'row_count': 0}] if cols == 'count(*) as row_count' else []
     row_offset = constraints.pop('offset', 0)
     row_limit = constraints.pop('limit', 20)
-    sql, values = _get_claims(cols, for_count, **constraints)
-    return execute_query(sql, values, row_offset, row_limit, censor), censor
+    sql, values = claims_query(cols, for_count, **constraints)
+    return execute_query(sql, values, row_offset, row_limit, censor)
 
 
 @measure
-def get_claims_count(**constraints) -> int:
+def count_claims(**constraints) -> int:
     constraints.pop('offset', None)
     constraints.pop('limit', None)
     constraints.pop('order_by', None)
-    count, _ = get_claims('count(*) as row_count', for_count=True, **constraints)
+    count = select_claims(Censor(), 'count(*) as row_count', for_count=True, **constraints)
     return count[0]['row_count']
 
 
-def _search(**constraints) -> Tuple[List, Censor]:
-    return get_claims(
+def search_claims(censor: Censor, **constraints) -> List:
+    return select_claims(
+        censor,
         """
         claimtrie.claim_hash as is_controlling,
         claimtrie.last_take_over_height,
@@ -360,24 +383,23 @@ def _search(**constraints) -> Tuple[List, Censor]:
         claim.trending_local, claim.trending_global,
         claim.short_url, claim.canonical_url,
         claim.channel_hash, claim.reposted_claim_hash,
-        claim.signature_valid,
-        COALESCE((SELECT group_concat(tag) FROM tag WHERE tag.claim_hash = claim.claim_hash), "") as tags
+        claim.signature_valid
         """, **constraints
     )
 
 
-def _get_referenced_rows(txo_rows: List[dict]):
+def _get_referenced_rows(censor: Censor, txo_rows: List[dict]):
     repost_hashes = set(filter(None, map(itemgetter('reposted_claim_hash'), txo_rows)))
     channel_hashes = set(filter(None, map(itemgetter('channel_hash'), txo_rows)))
 
     reposted_txos = []
     if repost_hashes:
-        reposted_txos, _ = _search(**{'claim.claim_hash__in': repost_hashes})
+        reposted_txos = search_claims(censor, **{'claim.claim_hash__in': repost_hashes})
         channel_hashes |= set(filter(None, map(itemgetter('channel_hash'), reposted_txos)))
 
     channel_txos = []
     if channel_hashes:
-        channel_txos, _ = _search(**{'claim.claim_hash__in': channel_hashes})
+        channel_txos = search_claims(censor, **{'claim.claim_hash__in': channel_hashes})
 
     # channels must come first for client side inflation to work properly
     return channel_txos + reposted_txos
@@ -389,25 +411,30 @@ def search(constraints) -> Tuple[List, List, int, int, Censor]:
         f"Search query contains invalid arguments: {set(constraints).difference(SEARCH_PARAMS)}"
     total = None
     if not constraints.pop('no_totals', False):
-        total = get_claims_count(**constraints)
+        total = count_claims(**constraints)
     constraints['offset'] = abs(constraints.get('offset', 0))
     constraints['limit'] = min(abs(constraints.get('limit', 10)), 50)
     if 'order_by' not in constraints:
         constraints['order_by'] = ["claim_hash"]
-    txo_rows, censor = _search(**constraints)
-    extra_txo_rows = _get_referenced_rows(txo_rows)
-    return txo_rows, extra_txo_rows, constraints['offset'], total, censor
+    context = ctx.get()
+    search_censor = context.get_search_censor()
+    txo_rows = search_claims(search_censor, **constraints)
+    extra_txo_rows = _get_referenced_rows(context.get_resolve_censor(), txo_rows)
+    return txo_rows, extra_txo_rows, constraints['offset'], total, search_censor
 
 
 @measure
 def resolve(urls) -> Tuple[List, List]:
     txo_rows = [resolve_url(raw_url) for raw_url in urls]
-    extra_txo_rows = _get_referenced_rows([r for r in txo_rows if isinstance(r, dict)])
+    extra_txo_rows = _get_referenced_rows(
+        ctx.get().get_resolve_censor(), [r for r in txo_rows if isinstance(r, dict)]
+    )
     return txo_rows, extra_txo_rows
 
 
 @measure
 def resolve_url(raw_url):
+    censor = ctx.get().get_resolve_censor()
     try:
         url = URL.parse(raw_url)
@@ -422,7 +449,7 @@ def resolve_url(raw_url):
             query['is_controlling'] = True
         else:
             query['order_by'] = ['^creation_height']
-        matches, _ = _search(**query, limit=1)
+        matches = search_claims(censor, **query, limit=1)
         if matches:
             channel = matches[0]
         else:
@@ -440,7 +467,7 @@ def resolve_url(raw_url):
             query['signature_valid'] = 1
         elif set(query) == {'name'}:
             query['is_controlling'] = 1
-        matches, _ = _search(**query, limit=1)
+        matches = search_claims(censor, **query, limit=1)
         if matches:
             return matches[0]
         else:

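The reader now keeps two censor configurations side by side: search results are censored with the filtering lists, while resolve (and the extra_txos referenced from search results) uses only the stricter blocking lists, so merely filtered content stays resolvable. A sketch under those definitions, assuming initializer() has already run in the worker:

    from lbry.wallet.server.db.reader import ctx, search_claims, _get_referenced_rows

    state = ctx.get()                            # per-worker ReaderState
    search_censor = state.get_search_censor()    # Censor(filtered_streams, filtered_channels)
    resolve_censor = state.get_resolve_censor()  # Censor(blocked_streams, blocked_channels)

    txo_rows = search_claims(search_censor, text='Claim', limit=10)
    extra_txo_rows = _get_referenced_rows(resolve_censor, txo_rows)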
lbry/wallet/server/db/writer.py
@@ -141,6 +141,8 @@ class SQLDB:
         create unique index if not exists claim_type_release_idx on claim (claim_type, release_time, claim_hash);
         create unique index if not exists claim_type_effective_amount_idx on claim (claim_type, effective_amount, claim_hash);
+        create unique index if not exists channel_hash_release_time_idx on claim (channel_hash, release_time, claim_hash);
 
         -- TODO: verify that all indexes below are used
         create index if not exists claim_height_normalized_idx on claim (height, normalized asc);
@@ -170,15 +172,21 @@ class SQLDB:
         CREATE_TAG_TABLE
     )
 
-    def __init__(self, main, path: str, filtering_channels: list):
+    def __init__(self, main, path: str, blocking_channels: list, filtering_channels: list):
         self.main = main
         self._db_path = path
         self.db = None
-        self.state_manager = None
-        self.blocked_claims = None
         self.logger = class_logger(__name__, self.__class__.__name__)
         self.ledger = Ledger if main.coin.NET == 'mainnet' else RegTestLedger
         self._fts_synced = False
+        self.state_manager = None
+        self.blocked_streams = None
+        self.blocked_channels = None
+        self.blocking_channel_hashes = {
+            unhexlify(channel_id)[::-1] for channel_id in blocking_channels if channel_id
+        }
+        self.filtered_streams = None
+        self.filtered_channels = None
         self.filtering_channel_hashes = {
             unhexlify(channel_id)[::-1] for channel_id in filtering_channels if channel_id
         }
@@ -202,8 +210,11 @@ class SQLDB:
         register_canonical_functions(self.db)
         register_trending_functions(self.db)
         self.state_manager = Manager()
-        self.blocked_claims = self.state_manager.dict()
-        self.update_blocked_claims()
+        self.blocked_streams = self.state_manager.dict()
+        self.blocked_channels = self.state_manager.dict()
+        self.filtered_streams = self.state_manager.dict()
+        self.filtered_channels = self.state_manager.dict()
+        self.update_blocked_and_filtered_claims()
 
     def close(self):
         if self.db is not None:
@@ -211,17 +222,34 @@ class SQLDB:
         if self.state_manager is not None:
             self.state_manager.shutdown()
 
-    def update_blocked_claims(self):
-        sql = query(
-            "SELECT channel_hash, reposted_claim_hash FROM claim",
-            reposted_claim_hash__is_not_null=1,
-            channel_hash__in=self.filtering_channel_hashes
-        )
-        blocked_claims = {}
-        for blocked_claim in self.execute(*sql):
-            blocked_claims[blocked_claim.reposted_claim_hash] = blocked_claim.channel_hash
-        self.blocked_claims.clear()
-        self.blocked_claims.update(blocked_claims)
+    def update_blocked_and_filtered_claims(self):
+        self.update_claims_from_channel_hashes(
+            self.blocked_streams, self.blocked_channels, self.blocking_channel_hashes
+        )
+        self.update_claims_from_channel_hashes(
+            self.filtered_streams, self.filtered_channels, self.filtering_channel_hashes
+        )
+        self.filtered_streams.update(self.blocked_streams)
+        self.filtered_channels.update(self.blocked_channels)
+
+    def update_claims_from_channel_hashes(self, shared_streams, shared_channels, channel_hashes):
+        sql = query(
+            "SELECT claim.channel_hash, claim.reposted_claim_hash, reposted.claim_type "
+            "FROM claim JOIN claim AS reposted ON (reposted.claim_hash=claim.reposted_claim_hash)", **{
+                'claim.reposted_claim_hash__is_not_null': 1,
+                'claim.channel_hash__in': channel_hashes
+            }
+        )
+        streams, channels = {}, {}
+        for blocked_claim in self.execute(*sql):
+            if blocked_claim.claim_type == CLAIM_TYPES['stream']:
+                streams[blocked_claim.reposted_claim_hash] = blocked_claim.channel_hash
+            elif blocked_claim.claim_type == CLAIM_TYPES['channel']:
+                channels[blocked_claim.reposted_claim_hash] = blocked_claim.channel_hash
+        shared_streams.clear()
+        shared_streams.update(streams)
+        shared_channels.clear()
+        shared_channels.update(channels)
 
     @staticmethod
     def _insert_sql(table: str, data: dict) -> Tuple[str, list]:
@@ -613,8 +641,9 @@ class SQLDB:
         sub_timer = timer.add_timer('update blocked claims list')
         sub_timer.start()
-        if self.filtering_channel_hashes.intersection(all_channel_keys):
-            self.update_blocked_claims()
+        if (self.blocking_channel_hashes.intersection(all_channel_keys) or
+                self.filtering_channel_hashes.intersection(all_channel_keys)):
+            self.update_blocked_and_filtered_claims()
         sub_timer.stop()
 
     def _update_support_amount(self, claim_hashes):
@@ -816,7 +845,11 @@ class LBRYLevelDB(LevelDB):
         super().__init__(*args, **kwargs)
         path = os.path.join(self.env.db_dir, 'claims.db')
         # space separated list of channel URIs used for filtering bad content
-        self.sql = SQLDB(self, path, self.env.default('FILTERING_CHANNELS_IDS', '').split(' '))
+        self.sql = SQLDB(
+            self, path,
+            self.env.default('BLOCKING_CHANNELS_IDS', '').split(' '),
+            self.env.default('FILTERING_CHANNELS_IDS', '').split(' '),
+        )
 
     def close(self):
         super().close()

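SQLDB now takes separate blocking and filtering channel lists and maintains a shared dict per (kind, claim type) pair; update_blocked_and_filtered_claims() also folds every blocked claim into the filtered set, so blocking implies filtering. A sketch of the new constructor wiring; 'main' stands for the block processor instance and the ids are hypothetical 40-character channel claim ids:

    from lbry.wallet.server.db.writer import SQLDB

    sql = SQLDB(
        main, 'claims.db',
        ['f0' * 20],  # BLOCKING_CHANNELS_IDS: reposts here also make claims unresolvable
        ['ba' * 20],  # FILTERING_CHANNELS_IDS: reposts here only hide claims from search
    )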
lbry/wallet/server/session.py
@@ -745,8 +745,13 @@ class LBRYSessionManager(SessionManager):
         path = os.path.join(self.env.db_dir, 'claims.db')
         args = dict(
             initializer=reader.initializer,
-            initargs=(self.logger, path, self.env.coin.NET, self.env.database_query_timeout,
-                      self.env.track_metrics, self.db.sql.blocked_claims)
+            initargs=(
+                self.logger, path, self.env.coin.NET, self.env.database_query_timeout,
+                self.env.track_metrics, (
+                    self.db.sql.blocked_streams, self.db.sql.blocked_channels,
+                    self.db.sql.filtered_streams, self.db.sql.filtered_channels
+                )
+            )
         )
         if self.env.max_query_workers is not None and self.env.max_query_workers == 0:
             self.query_executor = ThreadPoolExecutor(max_workers=1, **args)

tests/integration/blockchain/test_claim_commands.py
@@ -767,21 +767,17 @@ class StreamCommands(ClaimTestCase):
         # search for blocked content directly
         result = await self.out(self.daemon.jsonrpc_claim_search(name='bad_content'))
         self.assertEqual([], result['items'])
-        self.assertEqual({"reposted_in_channel": {blocking_channel_id: 1}, "total": 1}, result['blocked'])
+        self.assertEqual({"channels": {blocking_channel_id: 1}, "total": 1}, result['blocked'])
 
         # search channel containing blocked content
         result = await self.out(self.daemon.jsonrpc_claim_search(channel='@some_channel'))
         self.assertEqual(1, len(result['items']))
-        self.assertEqual({"reposted_in_channel": {blocking_channel_id: 1}, "total": 1}, result['blocked'])
+        self.assertEqual({"channels": {blocking_channel_id: 1}, "total": 1}, result['blocked'])
 
-        # search channel containing blocked content, also block tag
+        # content was filtered by not_tag before censoring
         result = await self.out(self.daemon.jsonrpc_claim_search(channel='@some_channel', not_tags=["good", "bad"]))
         self.assertEqual(0, len(result['items']))
-        self.assertEqual({
-            "reposted_in_channel": {blocking_channel_id: 1},
-            "has_tag": {"good": 1, "bad": 1},
-            "total": 2
-        }, result['blocked'])
+        self.assertEqual({"channels": {}, "total": 0}, result['blocked'])
 
     async def test_publish_updates_file_list(self):
         tx = await self.stream_create(title='created')

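With the per-reason breakdown gone, the 'blocked' summary in a claim_search response has a single shape for both kinds of censorship, mirroring Outputs.inflate_blocked() above. A sketch of the value the updated assertions check, with a hypothetical channel id:

    expected_blocked = {
        "total": 1,
        "channels": {"ab" * 20: 1},  # censoring channel id -> number of censored results
    }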
tests/unit/wallet/server/test_sqldb.py
@@ -3,9 +3,11 @@
 import ecdsa
 import hashlib
 import logging
 from binascii import hexlify
+from typing import List, Tuple
 
 from lbry.wallet.constants import COIN, NULL_HASH32
 from lbry.schema.claim import Claim
+from lbry.schema.result import Censor
 from lbry.wallet.server.db import reader, writer
 from lbry.wallet.server.coin import LBCRegTest
 from lbry.wallet.server.db.trending import TRENDING_WINDOW
@@ -28,6 +30,15 @@ def get_tx():
     return Transaction().add_inputs([get_input()])
 
 
+def search(**constraints) -> List:
+    return reader.search_claims(Censor(), **constraints)
+
+
+def censored_search(**constraints) -> Tuple[List, Censor]:
+    rows, _, _, _, censor = reader.search(constraints)
+    return rows, censor
+
+
 class TestSQLDB(unittest.TestCase):
     query_timeout = 0.25
@@ -36,12 +47,15 @@ class TestSQLDB(unittest.TestCase):
         self.daemon_height = 1
         self.coin = LBCRegTest()
         db_url = 'file:test_sqldb?mode=memory&cache=shared'
-        self.sql = writer.SQLDB(self, db_url, [])
+        self.sql = writer.SQLDB(self, db_url, [], [])
         self.addCleanup(self.sql.close)
         self.sql.open()
         reader.initializer(
             logging.getLogger(__name__), db_url, 'regtest',
-            self.query_timeout, blocked_claims=self.sql.blocked_claims
+            self.query_timeout, block_and_filter=(
+                self.sql.blocked_streams, self.sql.blocked_channels,
+                self.sql.filtered_streams, self.sql.filtered_channels
+            )
         )
         self.addCleanup(reader.cleanup)
         self.timer = Timer('BlockProcessor')
@@ -330,7 +344,7 @@ class TestClaimtrie(TestSQLDB):
         advance, state = self.advance, self.state
         stream = self.get_stream('Claim A', 10*COIN)
         advance(10, [stream, self.get_stream_update(stream, 11*COIN)])
-        self.assertTrue(reader._search()[0])
+        self.assertTrue(search()[0])
 
     def test_double_updates_in_same_block(self):
         advance, state = self.advance, self.state
@@ -338,13 +352,13 @@ class TestClaimtrie(TestSQLDB):
         advance(10, [stream])
         update = self.get_stream_update(stream, 11*COIN)
         advance(20, [update, self.get_stream_update(update, 9*COIN)])
-        self.assertTrue(reader._search()[0])
+        self.assertTrue(search()[0])
 
     def test_create_and_abandon_in_same_block(self):
         advance, state = self.advance, self.state
         stream = self.get_stream('Claim A', 10*COIN)
         advance(10, [stream, self.get_abandon(stream)])
-        self.assertFalse(reader._search()[0])
+        self.assertFalse(search())
 
     def test_update_and_abandon_in_same_block(self):
         advance, state = self.advance, self.state
@@ -352,14 +366,14 @@ class TestClaimtrie(TestSQLDB):
         advance(10, [stream])
         update = self.get_stream_update(stream, 11*COIN)
         advance(20, [update, self.get_abandon(update)])
-        self.assertFalse(reader._search()[0])
+        self.assertFalse(search())
 
     def test_create_update_and_delete_in_same_block(self):
         advance, state = self.advance, self.state
         stream = self.get_stream('Claim A', 10*COIN)
         update = self.get_stream_update(stream, 11*COIN)
         advance(10, [stream, update, self.get_abandon(update)])
-        self.assertFalse(reader._search()[0])
+        self.assertFalse(search())
 
     def test_support_added_and_removed_in_same_block(self):
         advance, state = self.advance, self.state
@@ -367,7 +381,7 @@ class TestClaimtrie(TestSQLDB):
         advance(10, [stream])
         support = self.get_support(stream, COIN)
         advance(20, [support, self.get_abandon(support)])
-        self.assertEqual(reader._search()[0][0]['support_amount'], 0)
+        self.assertEqual(search()[0]['support_amount'], 0)
 
     @staticmethod
     def _get_x_with_claim_id_prefix(getter, prefix, cached_iteration=None, **kwargs):
@@ -396,7 +410,7 @@ class TestClaimtrie(TestSQLDB):
         txo_chan_ab = tx_chan_ab[0].outputs[0]
         advance(1, [tx_chan_a])
         advance(2, [tx_chan_ab])
-        (r_ab, r_a), _ = reader._search(order_by=['creation_height'], limit=2)
+        (r_ab, r_a) = search(order_by=['creation_height'], limit=2)
         self.assertEqual("@foo#a", r_a['short_url'])
         self.assertEqual("@foo#ab", r_ab['short_url'])
         self.assertIsNone(r_a['canonical_url'])
@@ -409,7 +423,7 @@ class TestClaimtrie(TestSQLDB):
         tx_abc = self.get_stream_with_claim_id_prefix('abc', 65)
         advance(3, [tx_a])
         advance(4, [tx_ab, tx_abc])
-        (r_abc, r_ab, r_a), _ = reader._search(order_by=['creation_height', 'tx_position'], limit=3)
+        (r_abc, r_ab, r_a) = search(order_by=['creation_height', 'tx_position'], limit=3)
         self.assertEqual("foo#a", r_a['short_url'])
         self.assertEqual("foo#ab", r_ab['short_url'])
         self.assertEqual("foo#abc", r_abc['short_url'])
@@ -423,51 +437,51 @@ class TestClaimtrie(TestSQLDB):
         ab2_claim = tx_ab2[0].outputs[0]
         advance(6, [tx_a2])
         advance(7, [tx_ab2])
-        (r_ab2, r_a2), _ = reader._search(order_by=['creation_height'], limit=2)
+        (r_ab2, r_a2) = search(order_by=['creation_height'], limit=2)
         self.assertEqual(f"foo#{a2_claim.claim_id[:2]}", r_a2['short_url'])
         self.assertEqual(f"foo#{ab2_claim.claim_id[:4]}", r_ab2['short_url'])
         self.assertEqual("@foo#a/foo#a", r_a2['canonical_url'])
         self.assertEqual("@foo#a/foo#ab", r_ab2['canonical_url'])
-        self.assertEqual(2, reader._search(claim_id=txo_chan_a.claim_id, limit=1)[0][0]['claims_in_channel'])
+        self.assertEqual(2, search(claim_id=txo_chan_a.claim_id, limit=1)[0]['claims_in_channel'])
 
         # change channel public key, invaliding stream claim signatures
         advance(8, [self.get_channel_update(txo_chan_a, COIN, key=b'a')])
-        (r_ab2, r_a2), _ = reader._search(order_by=['creation_height'], limit=2)
+        (r_ab2, r_a2) = search(order_by=['creation_height'], limit=2)
         self.assertEqual(f"foo#{a2_claim.claim_id[:2]}", r_a2['short_url'])
         self.assertEqual(f"foo#{ab2_claim.claim_id[:4]}", r_ab2['short_url'])
         self.assertIsNone(r_a2['canonical_url'])
         self.assertIsNone(r_ab2['canonical_url'])
-        self.assertEqual(0, reader._search(claim_id=txo_chan_a.claim_id, limit=1)[0][0]['claims_in_channel'])
+        self.assertEqual(0, search(claim_id=txo_chan_a.claim_id, limit=1)[0]['claims_in_channel'])
 
         # reinstate previous channel public key (previous stream claim signatures become valid again)
         channel_update = self.get_channel_update(txo_chan_a, COIN, key=b'c')
         advance(9, [channel_update])
-        (r_ab2, r_a2), _ = reader._search(order_by=['creation_height'], limit=2)
+        (r_ab2, r_a2) = search(order_by=['creation_height'], limit=2)
         self.assertEqual(f"foo#{a2_claim.claim_id[:2]}", r_a2['short_url'])
         self.assertEqual(f"foo#{ab2_claim.claim_id[:4]}", r_ab2['short_url'])
         self.assertEqual("@foo#a/foo#a", r_a2['canonical_url'])
         self.assertEqual("@foo#a/foo#ab", r_ab2['canonical_url'])
-        self.assertEqual(2, reader._search(claim_id=txo_chan_a.claim_id, limit=1)[0][0]['claims_in_channel'])
-        self.assertEqual(0, reader._search(claim_id=txo_chan_ab.claim_id, limit=1)[0][0]['claims_in_channel'])
+        self.assertEqual(2, search(claim_id=txo_chan_a.claim_id, limit=1)[0]['claims_in_channel'])
+        self.assertEqual(0, search(claim_id=txo_chan_ab.claim_id, limit=1)[0]['claims_in_channel'])
 
         # change channel of stream
-        self.assertEqual("@foo#a/foo#ab", reader._search(claim_id=ab2_claim.claim_id, limit=1)[0][0]['canonical_url'])
+        self.assertEqual("@foo#a/foo#ab", search(claim_id=ab2_claim.claim_id, limit=1)[0]['canonical_url'])
         tx_ab2 = self.get_stream_update(tx_ab2, COIN, txo_chan_ab)
         advance(10, [tx_ab2])
-        self.assertEqual("@foo#ab/foo#a", reader._search(claim_id=ab2_claim.claim_id, limit=1)[0][0]['canonical_url'])
+        self.assertEqual("@foo#ab/foo#a", search(claim_id=ab2_claim.claim_id, limit=1)[0]['canonical_url'])
         # TODO: currently there is a bug where stream leaving a channel does not update that channels claims count
-        self.assertEqual(2, reader._search(claim_id=txo_chan_a.claim_id, limit=1)[0][0]['claims_in_channel'])
+        self.assertEqual(2, search(claim_id=txo_chan_a.claim_id, limit=1)[0]['claims_in_channel'])
         # TODO: after bug is fixed remove test above and add test below
-        #self.assertEqual(1, reader._search(claim_id=txo_chan_a.claim_id, limit=1)[0][0]['claims_in_channel'])
-        self.assertEqual(1, reader._search(claim_id=txo_chan_ab.claim_id, limit=1)[0][0]['claims_in_channel'])
+        #self.assertEqual(1, search(claim_id=txo_chan_a.claim_id, limit=1)[0]['claims_in_channel'])
+        self.assertEqual(1, search(claim_id=txo_chan_ab.claim_id, limit=1)[0]['claims_in_channel'])
 
         # claim abandon updates claims_in_channel
         advance(11, [self.get_abandon(tx_ab2)])
-        self.assertEqual(0, reader._search(claim_id=txo_chan_ab.claim_id, limit=1)[0][0]['claims_in_channel'])
+        self.assertEqual(0, search(claim_id=txo_chan_ab.claim_id, limit=1)[0]['claims_in_channel'])
 
         # delete channel, invaliding stream claim signatures
         advance(12, [self.get_abandon(channel_update)])
-        (r_a2,), _ = reader._search(order_by=['creation_height'], limit=1)
+        (r_a2,) = search(order_by=['creation_height'], limit=1)
         self.assertEqual(f"foo#{a2_claim.claim_id[:2]}", r_a2['short_url'])
         self.assertIsNone(r_a2['canonical_url'])
@@ -525,7 +539,7 @@ class TestTrending(TestSQLDB):
             self.get_support(up_medium, (20+(window*(2 if window == 7 else 1)))*COIN),
             self.get_support(up_biggly, (20+(window*(3 if window == 7 else 1)))*COIN),
         ])
-        results, _ = reader._search(order_by=['trending_local'])
+        results = search(order_by=['trending_local'])
         self.assertEqual([c.claim_id for c in claims], [hexlify(c['claim_hash'][::-1]).decode() for c in results])
         self.assertEqual([10, 6, 2, 0, -2], [int(c['trending_local']) for c in results])
         self.assertEqual([53, 38, -32, 0, -6], [int(c['trending_global']) for c in results])
@@ -541,73 +555,180 @@ class TestTrending(TestSQLDB):
 
 class TestContentBlocking(TestSQLDB):
 
-    def test_blocking(self):
-        tx0 = self.get_channel('A Channel', COIN)
-        a_channel = tx0[0].outputs[0]
-        tx1 = self.get_stream('Claim One', COIN)
-        tx2 = self.get_stream('Claim Two', COIN, tags=["mature"], channel=a_channel)
-        self.advance(1, [tx0, tx1, tx2])
-        claim1, claim2 = tx1[0].outputs[0], tx2[0].outputs[0]
+    def test_blocking_and_filtering(self):
+        # content claims and channels
+        tx0 = self.get_channel('A Channel', COIN, '@channel1')
+        regular_channel = tx0[0].outputs[0]
+        tx1 = self.get_stream('Claim One', COIN, 'claim1')
+        tx2 = self.get_stream('Claim Two', COIN, 'claim2', regular_channel)
+        tx3 = self.get_stream('Claim Three', COIN, 'claim3')
+        self.advance(1, [tx0, tx1, tx2, tx3])
+        claim1, claim2, claim3 = tx1[0].outputs[0], tx2[0].outputs[0], tx3[0].outputs[0]
+
+        # block and filter channels
+        tx0 = self.get_channel('Blocking Channel', COIN, '@block')
+        tx1 = self.get_channel('Filtering Channel', COIN, '@filter')
+        blocking_channel = tx0[0].outputs[0]
+        filtering_channel = tx1[0].outputs[0]
+        self.sql.blocking_channel_hashes.add(blocking_channel.claim_hash)
+        self.sql.filtering_channel_hashes.add(filtering_channel.claim_hash)
+        self.advance(2, [tx0, tx1])
+        self.assertEqual({}, dict(self.sql.blocked_streams))
+        self.assertEqual({}, dict(self.sql.blocked_channels))
+        self.assertEqual({}, dict(self.sql.filtered_streams))
+        self.assertEqual({}, dict(self.sql.filtered_channels))
 
         # nothing blocked
-        results, censor = reader._search(text='Claim')
-        self.assertEqual(2, len(results))
+        results, _ = reader.resolve([
+            claim1.claim_name, claim2.claim_name,
+            claim3.claim_name, regular_channel.claim_name
+        ])
+        self.assertEqual(claim1.claim_hash, results[0]['claim_hash'])
+        self.assertEqual(claim2.claim_hash, results[1]['claim_hash'])
+        self.assertEqual(claim3.claim_hash, results[2]['claim_hash'])
+        self.assertEqual(regular_channel.claim_hash, results[3]['claim_hash'])
+
+        # nothing filtered
+        results, censor = censored_search()
+        self.assertEqual(6, len(results))
         self.assertEqual(0, censor.total)
-        self.assertEqual({}, dict(self.sql.blocked_claims))
+        self.assertEqual({}, censor.censored)
 
-        # block claim reposted to blocking channel
-        tx = self.get_channel('Blocking Channel', COIN)
-        channel = tx[0].outputs[0]
-        self.sql.filtering_channel_hashes.add(channel.claim_hash)
-        self.advance(2, [tx])
-        self.assertEqual({}, dict(self.sql.blocked_claims))
-
-        tx = self.get_repost(claim1.claim_id, COIN, channel)
-        reposting_claim = tx[0].outputs[0]
-        self.advance(3, [tx])
+        # block claim reposted to blocking channel, also gets filtered
+        repost_tx1 = self.get_repost(claim1.claim_id, COIN, blocking_channel)
+        repost1 = repost_tx1[0].outputs[0]
+        self.advance(3, [repost_tx1])
         self.assertEqual(
-            {reposting_claim.claim.repost.reference.claim_hash: channel.claim_hash},
-            dict(self.sql.blocked_claims)
+            {repost1.claim.repost.reference.claim_hash: blocking_channel.claim_hash},
+            dict(self.sql.blocked_streams)
         )
+        self.assertEqual({}, dict(self.sql.blocked_channels))
+        self.assertEqual(
+            {repost1.claim.repost.reference.claim_hash: blocking_channel.claim_hash},
+            dict(self.sql.filtered_streams)
+        )
+        self.assertEqual({}, dict(self.sql.filtered_channels))
 
-        # claim is blocked from results by repost
-        results, censor = reader._search(text='Claim')
-        self.assertEqual(1, len(results))
-        self.assertEqual(claim2.claim_hash, results[0]['claim_hash'])
-        self.assertEqual(1, censor.total)
-        self.assertEqual({channel.claim_hash: 1}, censor.blocked_claims)
-        self.assertEqual({}, censor.blocked_channels)
-        self.assertEqual({}, censor.blocked_tags)
-
-        # claim is blocked from results by repost and tags
-        results, censor = reader._search(text='Claim', not_tags=["mature"])
-        self.assertEqual(0, len(results))
-        self.assertEqual(2, censor.total)
-        self.assertEqual({channel.claim_hash: 1}, censor.blocked_claims)
-        self.assertEqual({}, censor.blocked_channels)
-        self.assertEqual({"mature": 1}, censor.blocked_tags)
-
-        # claim is blocked from results by repost and channel
-        results, censor = reader._search(text='Claim', not_channel_ids=[a_channel.claim_id])
-        self.assertEqual(0, len(results))
-        self.assertEqual(2, censor.total)
-        self.assertEqual({channel.claim_hash: 1}, censor.blocked_claims)
-        self.assertEqual({a_channel.claim_hash: 1}, censor.blocked_channels)
-        self.assertEqual({}, censor.blocked_tags)
+        # claim is blocked from results by direct repost
+        results, censor = censored_search(text='Claim')
+        self.assertEqual(2, len(results))
+        self.assertEqual(claim2.claim_hash, results[0]['claim_hash'])
+        self.assertEqual(claim3.claim_hash, results[1]['claim_hash'])
+        self.assertEqual(1, censor.total)
+        self.assertEqual({blocking_channel.claim_hash: 1}, censor.censored)
+        results, _ = reader.resolve([claim1.claim_name])
+        self.assertEqual('Could not find stream in "claim1".', results[0].args[0])
+        results, _ = reader.resolve([
+            claim2.claim_name, regular_channel.claim_name  # claim2 and channel still resolved
+        ])
+        self.assertEqual(claim2.claim_hash, results[0]['claim_hash'])
+        self.assertEqual(regular_channel.claim_hash, results[1]['claim_hash'])
+
+        # block claim indirectly by blocking its parent channel
+        repost_tx2 = self.get_repost(regular_channel.claim_id, COIN, blocking_channel)
+        repost2 = repost_tx2[0].outputs[0]
+        self.advance(4, [repost_tx2])
+        self.assertEqual(
+            {repost1.claim.repost.reference.claim_hash: blocking_channel.claim_hash},
+            dict(self.sql.blocked_streams)
+        )
+        self.assertEqual(
+            {repost2.claim.repost.reference.claim_hash: blocking_channel.claim_hash},
+            dict(self.sql.blocked_channels)
+        )
+        self.assertEqual(
+            {repost1.claim.repost.reference.claim_hash: blocking_channel.claim_hash},
+            dict(self.sql.filtered_streams)
+        )
+        self.assertEqual(
+            {repost2.claim.repost.reference.claim_hash: blocking_channel.claim_hash},
+            dict(self.sql.filtered_channels)
+        )
+
+        # claim in blocked channel is filtered from search and can't resolve
+        results, censor = censored_search(text='Claim')
+        self.assertEqual(1, len(results))
+        self.assertEqual(claim3.claim_hash, results[0]['claim_hash'])
+        self.assertEqual(2, censor.total)
+        self.assertEqual({blocking_channel.claim_hash: 2}, censor.censored)
+        results, _ = reader.resolve([
+            claim2.claim_name, regular_channel.claim_name  # claim2 and channel don't resolve
+        ])
+        self.assertEqual('Could not find stream in "claim2".', results[0].args[0])
+        self.assertEqual('Could not find channel in "@channel1".', results[1].args[0])
+        results, _ = reader.resolve([claim3.claim_name])  # claim3 still resolved
+        self.assertEqual(claim3.claim_hash, results[0]['claim_hash'])
+
+        # filtered claim is only filtered and not blocked
+        repost_tx3 = self.get_repost(claim3.claim_id, COIN, filtering_channel)
+        repost3 = repost_tx3[0].outputs[0]
+        self.advance(5, [repost_tx3])
+        self.assertEqual(
+            {repost1.claim.repost.reference.claim_hash: blocking_channel.claim_hash},
+            dict(self.sql.blocked_streams)
+        )
+        self.assertEqual(
+            {repost2.claim.repost.reference.claim_hash: blocking_channel.claim_hash},
+            dict(self.sql.blocked_channels)
+        )
+        self.assertEqual(
+            {repost1.claim.repost.reference.claim_hash: blocking_channel.claim_hash,
+             repost3.claim.repost.reference.claim_hash: filtering_channel.claim_hash},
+            dict(self.sql.filtered_streams)
+        )
+        self.assertEqual(
+            {repost2.claim.repost.reference.claim_hash: blocking_channel.claim_hash},
+            dict(self.sql.filtered_channels)
+        )
+
+        # filtered claim doesn't return in search but is resolveable
+        results, censor = censored_search(text='Claim')
+        self.assertEqual(0, len(results))
+        self.assertEqual(3, censor.total)
+        self.assertEqual({blocking_channel.claim_hash: 2, filtering_channel.claim_hash: 1}, censor.censored)
+        results, _ = reader.resolve([claim3.claim_name])  # claim3 still resolved
+        self.assertEqual(claim3.claim_hash, results[0]['claim_hash'])
+
+        # abandon unblocks content
+        self.advance(6, [
+            self.get_abandon(repost_tx1),
+            self.get_abandon(repost_tx2),
+            self.get_abandon(repost_tx3)
+        ])
+        self.assertEqual({}, dict(self.sql.blocked_streams))
+        self.assertEqual({}, dict(self.sql.blocked_channels))
+        self.assertEqual({}, dict(self.sql.filtered_streams))
+        self.assertEqual({}, dict(self.sql.filtered_channels))
+        results, censor = censored_search(text='Claim')
+        self.assertEqual(3, len(results))
+        self.assertEqual(0, censor.total)
+        results, censor = censored_search()
+        self.assertEqual(6, len(results))
+        self.assertEqual(0, censor.total)
+        results, _ = reader.resolve([
+            claim1.claim_name, claim2.claim_name,
+            claim3.claim_name, regular_channel.claim_name
+        ])
+        self.assertEqual(claim1.claim_hash, results[0]['claim_hash'])
+        self.assertEqual(claim2.claim_hash, results[1]['claim_hash'])
+        self.assertEqual(claim3.claim_hash, results[2]['claim_hash'])
+        self.assertEqual(regular_channel.claim_hash, results[3]['claim_hash'])
 
     def test_pagination(self):
-        one, two, three, four, five, six, seven = (
-            self.advance(1, [self.get_stream('One', COIN, tags=["mature"])])[0],
-            self.advance(2, [self.get_stream('Two', COIN, tags=["mature"])])[0],
-            self.advance(3, [self.get_stream('Three', COIN)])[0],
-            self.advance(4, [self.get_stream('Four', COIN)])[0],
-            self.advance(5, [self.get_stream('Five', COIN)])[0],
-            self.advance(6, [self.get_stream('Six', COIN)])[0],
-            self.advance(7, [self.get_stream('Seven', COIN)])[0],
-        )
+        one, two, three, four, five, six, seven, filter_channel = self.advance(1, [
+            self.get_stream('One', COIN),
+            self.get_stream('Two', COIN),
+            self.get_stream('Three', COIN),
+            self.get_stream('Four', COIN),
+            self.get_stream('Five', COIN),
+            self.get_stream('Six', COIN),
+            self.get_stream('Seven', COIN),
+            self.get_channel('Filtering Channel', COIN, '@filter'),
+        ])
+        self.sql.filtering_channel_hashes.add(filter_channel.claim_hash)
 
-        # nothing blocked
-        results, censor = reader._search(order_by='^height', offset=1, limit=3)
+        # nothing filtered
+        results, censor = censored_search(order_by='^height', offset=1, limit=3)
         self.assertEqual(3, len(results))
         self.assertEqual(
             [two.claim_hash, three.claim_hash, four.claim_hash],
@@ -615,12 +736,16 @@ class TestContentBlocking(TestSQLDB):
         )
         self.assertEqual(0, censor.total)
 
-        # tags blocked
-        results, censor = reader._search(order_by='^height', not_tags=('mature',), offset=1, limit=3)
+        # content filtered
+        repost1, repost2 = self.advance(2, [
+            self.get_repost(one.claim_id, COIN, filter_channel),
+            self.get_repost(two.claim_id, COIN, filter_channel),
+        ])
+        results, censor = censored_search(order_by='^height', offset=1, limit=3)
         self.assertEqual(3, len(results))
         self.assertEqual(
             [four.claim_hash, five.claim_hash, six.claim_hash],
             [r['claim_hash'] for r in results]
         )
         self.assertEqual(2, censor.total)
-        self.assertEqual({"mature": 2}, censor.blocked_tags)
+        self.assertEqual({filter_channel.claim_hash: 2}, censor.censored)