diff --git a/lbry/wallet/server/db/canonical.py b/lbry/wallet/server/db/canonical.py deleted file mode 100644 index 1b0edacba..000000000 --- a/lbry/wallet/server/db/canonical.py +++ /dev/null @@ -1,22 +0,0 @@ -class FindShortestID: - __slots__ = 'short_id', 'new_id' - - def __init__(self): - self.short_id = '' - self.new_id = None - - def step(self, other_id, new_id): - self.new_id = new_id - for i in range(len(self.new_id)): - if other_id[i] != self.new_id[i]: - if i > len(self.short_id)-1: - self.short_id = self.new_id[:i+1] - break - - def finalize(self): - if self.short_id: - return '#'+self.short_id - - -def register_canonical_functions(connection): - connection.create_aggregate("shortest_id", 2, FindShortestID) diff --git a/lbry/wallet/server/db/elasticsearch/constants.py b/lbry/wallet/server/db/elasticsearch/constants.py index 45c5eb853..3ba70f84d 100644 --- a/lbry/wallet/server/db/elasticsearch/constants.py +++ b/lbry/wallet/server/db/elasticsearch/constants.py @@ -66,7 +66,7 @@ RANGE_FIELDS = { 'timestamp', 'creation_timestamp', 'duration', 'release_time', 'fee_amount', 'tx_position', 'channel_join', 'repost_count', 'limit_claims_per_channel', 'amount', 'effective_amount', 'support_amount', - 'trending_score', 'censor_type', 'tx_num', 'trending_score_change' + 'trending_score', 'censor_type', 'tx_num' } ALL_FIELDS = RANGE_FIELDS | TEXT_FIELDS | FIELDS diff --git a/lbry/wallet/server/db/trending.py b/lbry/wallet/server/db/trending.py deleted file mode 100644 index 5c2aef513..000000000 --- a/lbry/wallet/server/db/trending.py +++ /dev/null @@ -1,299 +0,0 @@ -import math -import os -import sqlite3 -import time - - -HALF_LIFE = 400 -RENORM_INTERVAL = 1000 -WHALE_THRESHOLD = 10000.0 - -def whale_decay_factor(lbc): - """ - An additional decay factor applied to whale claims. - """ - if lbc <= WHALE_THRESHOLD: - return 1.0 - adjusted_half_life = HALF_LIFE/(math.log10(lbc/WHALE_THRESHOLD) + 1.0) - return 2.0**(1.0/HALF_LIFE - 1.0/adjusted_half_life) - - -def soften(lbc): - mag = abs(lbc) + 1E-8 - sign = 1.0 if lbc >= 0.0 else -1.0 - return sign*mag**0.25 - -def delay(lbc: int): - if lbc <= 0: - return 0 - elif lbc < 1000000: - return int(lbc**0.5) - else: - return 1000 - - -def inflate_units(height): - blocks = height % RENORM_INTERVAL - return 2.0 ** (blocks/HALF_LIFE) - - -PRAGMAS = ["PRAGMA FOREIGN_KEYS = OFF;", - "PRAGMA JOURNAL_MODE = WAL;", - "PRAGMA SYNCHRONOUS = 0;"] - - -class TrendingDB: - - def __init__(self, data_dir): - """ - Opens the trending database in the directory data_dir. - For testing, pass data_dir=":memory:" - """ - if data_dir == ":memory:": - path = ":memory:" - else: - path = os.path.join(data_dir, "trending.db") - self.db = sqlite3.connect(path, check_same_thread=False) - - for pragma in PRAGMAS: - self.execute(pragma) - self.execute("BEGIN;") - self._create_tables() - self._create_indices() - self.execute("COMMIT;") - self.pending_events = [] - - def execute(self, *args, **kwargs): - return self.db.execute(*args, **kwargs) - - def add_event(self, event): - self.pending_events.append(event) -# print(f"Added event: {event}.", flush=True) - - - def _create_tables(self): - - self.execute("""CREATE TABLE IF NOT EXISTS claims - (claim_hash BYTES NOT NULL PRIMARY KEY, - bid_lbc REAL NOT NULL, - support_lbc REAL NOT NULL, - trending_score REAL NOT NULL, - needs_write BOOLEAN NOT NULL) - WITHOUT ROWID;""") - - self.execute("""CREATE TABLE IF NOT EXISTS spikes - (claim_hash BYTES NOT NULL REFERENCES claims (claim_hash), - activation_height INTEGER NOT NULL, - mass REAL NOT NULL);""") - - - def _create_indices(self): - self.execute("CREATE INDEX IF NOT EXISTS idx1 ON spikes\ - (activation_height, claim_hash, mass);") - self.execute("CREATE INDEX IF NOT EXISTS idx2 ON spikes\ - (claim_hash);") - self.execute("CREATE INDEX IF NOT EXISTS idx3 ON claims (trending_score);") - self.execute("CREATE INDEX IF NOT EXISTS idx4 ON claims (needs_write, claim_hash);") - self.execute("CREATE INDEX IF NOT EXISTS idx5 ON claims (bid_lbc + support_lbc);") - - def get_trending_score(self, claim_hash): - result = self.execute("SELECT trending_score FROM claims\ - WHERE claim_hash = ?;", (claim_hash, ))\ - .fetchall() - if len(result) == 0: - return 0.0 - else: - return result[0] - - def _upsert_claim(self, height, event): - - claim_hash = event["claim_hash"] - - # Get old total lbc value of claim - old_lbc_pair = self.execute("SELECT bid_lbc, support_lbc FROM claims\ - WHERE claim_hash = ?;", - (claim_hash, )).fetchone() - if old_lbc_pair is None: - old_lbc_pair = (0.0, 0.0) - - if event["event"] == "upsert": - new_lbc_pair = (event["lbc"], old_lbc_pair[1]) - elif event["event"] == "support": - new_lbc_pair = (old_lbc_pair[0], old_lbc_pair[1] + event["lbc"]) - - # Upsert the claim - self.execute("INSERT INTO claims VALUES (?, ?, ?, ?, 1)\ - ON CONFLICT (claim_hash) DO UPDATE\ - SET bid_lbc = excluded.bid_lbc,\ - support_lbc = excluded.support_lbc;", - (claim_hash, new_lbc_pair[0], new_lbc_pair[1], 0.0)) - - if self.active: - old_lbc, lbc = sum(old_lbc_pair), sum(new_lbc_pair) - - # Add the spike - softened_change = soften(lbc - old_lbc) - change_in_softened = soften(lbc) - soften(old_lbc) - spike_mass = (softened_change**0.25*change_in_softened**0.75).real - activation_height = height + delay(lbc) - if spike_mass != 0.0: - self.execute("INSERT INTO spikes VALUES (?, ?, ?);", - (claim_hash, activation_height, spike_mass)) - - def _delete_claim(self, claim_hash): - self.execute("DELETE FROM spikes WHERE claim_hash = ?;", (claim_hash, )) - self.execute("DELETE FROM claims WHERE claim_hash = ?;", (claim_hash, )) - - - def _apply_spikes(self, height): - spikes = self.execute("SELECT claim_hash, mass FROM spikes\ - WHERE activation_height = ?;", - (height, )).fetchall() - for claim_hash, mass in spikes: # TODO: executemany for efficiency - self.execute("UPDATE claims SET trending_score = trending_score + ?,\ - needs_write = 1\ - WHERE claim_hash = ?;", - (mass, claim_hash)) - self.execute("DELETE FROM spikes WHERE activation_height = ?;", - (height, )) - - def _decay_whales(self): - - whales = self.execute("SELECT claim_hash, bid_lbc + support_lbc FROM claims\ - WHERE bid_lbc + support_lbc >= ?;", (WHALE_THRESHOLD, ))\ - .fetchall() - for claim_hash, lbc in whales: - factor = whale_decay_factor(lbc) - self.execute("UPDATE claims SET trending_score = trending_score*?, needs_write = 1\ - WHERE claim_hash = ?;", (factor, claim_hash)) - - - def _renorm(self): - factor = 2.0**(-RENORM_INTERVAL/HALF_LIFE) - - # Zero small values - self.execute("UPDATE claims SET trending_score = 0.0, needs_write = 1\ - WHERE trending_score <> 0.0 AND ABS(?*trending_score) < 1E-6;", - (factor, )) - - # Normalise other values - self.execute("UPDATE claims SET trending_score = ?*trending_score, needs_write = 1\ - WHERE trending_score <> 0.0;", (factor, )) - - - def process_block(self, height, daemon_height): - - self.active = daemon_height - height <= 10*HALF_LIFE - - self.execute("BEGIN;") - - if self.active: - - # Check for a unit change - if height % RENORM_INTERVAL == 0: - self._renorm() - - # Apply extra whale decay - self._decay_whales() - - # Upsert claims - for event in self.pending_events: - if event["event"] == "upsert": - self._upsert_claim(height, event) - - # Process supports - for event in self.pending_events: - if event["event"] == "support": - self._upsert_claim(height, event) - - # Delete claims - for event in self.pending_events: - if event["event"] == "delete": - self._delete_claim(event["claim_hash"]) - - if self.active: - # Apply spikes - self._apply_spikes(height) - - # Get set of claims that need writing to ES - claims_to_write = set() - for row in self.db.execute("SELECT claim_hash FROM claims WHERE\ - needs_write = 1;"): - claims_to_write.add(row[0]) - self.db.execute("UPDATE claims SET needs_write = 0\ - WHERE needs_write = 1;") - - self.execute("COMMIT;") - - self.pending_events.clear() - - return claims_to_write - - -if __name__ == "__main__": - import matplotlib.pyplot as plt - import numpy as np - import numpy.random as rng - import os - - trending_db = TrendingDB(":memory:") - - heights = list(range(1, 1000)) - heights = heights + heights[::-1] + heights - - events = [{"height": 45, - "what": dict(claim_hash="a", event="upsert", lbc=1.0)}, - {"height": 100, - "what": dict(claim_hash="a", event="support", lbc=3.0)}, - {"height": 150, - "what": dict(claim_hash="a", event="support", lbc=-3.0)}, - {"height": 170, - "what": dict(claim_hash="a", event="upsert", lbc=100000.0)}, - {"height": 730, - "what": dict(claim_hash="a", event="delete")}] - inverse_events = [{"height": 730, - "what": dict(claim_hash="a", event="upsert", lbc=100000.0)}, - {"height": 170, - "what": dict(claim_hash="a", event="upsert", lbc=1.0)}, - {"height": 150, - "what": dict(claim_hash="a", event="support", lbc=3.0)}, - {"height": 100, - "what": dict(claim_hash="a", event="support", lbc=-3.0)}, - {"height": 45, - "what": dict(claim_hash="a", event="delete")}] - - - xs, ys = [], [] - last_height = 0 - for height in heights: - - # Prepare the changes - if height > last_height: - es = events - else: - es = inverse_events - - for event in es: - if event["height"] == height: - trending_db.add_event(event["what"]) - - # Process the block - trending_db.process_block(height, height) - - if height > last_height: # Only plot when moving forward - xs.append(height) - y = trending_db.execute("SELECT trending_score FROM claims;").fetchone() - y = 0.0 if y is None else y[0] - ys.append(y/inflate_units(height)) - - last_height = height - - xs = np.array(xs) - ys = np.array(ys) - - plt.figure(1) - plt.plot(xs, ys, "o-", alpha=0.2) - - plt.figure(2) - plt.plot(xs) - plt.show() diff --git a/lbry/wallet/server/db/writer.py b/lbry/wallet/server/db/writer.py deleted file mode 100644 index 4b4de924f..000000000 --- a/lbry/wallet/server/db/writer.py +++ /dev/null @@ -1,955 +0,0 @@ -import os - -import sqlite3 -from typing import Union, Tuple, Set, List -from itertools import chain -from decimal import Decimal -from collections import namedtuple -from binascii import unhexlify, hexlify -from lbry.wallet.server.leveldb import LevelDB -from lbry.wallet.server.util import class_logger -from lbry.wallet.database import query, constraints_to_sql - -from lbry.schema.tags import clean_tags -from lbry.schema.mime_types import guess_stream_type -from lbry.wallet import Ledger, RegTestLedger -from lbry.wallet.transaction import Transaction, Output -from lbry.wallet.server.db.canonical import register_canonical_functions -from lbry.wallet.server.db.trending import TRENDING_ALGORITHMS - -from .common import CLAIM_TYPES, STREAM_TYPES, COMMON_TAGS, INDEXED_LANGUAGES - -ATTRIBUTE_ARRAY_MAX_LENGTH = 100 -sqlite3.enable_callback_tracebacks(True) - - -class SQLDB: - - PRAGMAS = """ - pragma journal_mode=WAL; - """ - - CREATE_CLAIM_TABLE = """ - create table if not exists claim ( - claim_hash bytes primary key, - claim_id text not null, - claim_name text not null, - normalized text not null, - txo_hash bytes not null, - tx_position integer not null, - amount integer not null, - timestamp integer not null, -- last updated timestamp - creation_timestamp integer not null, - height integer not null, -- last updated height - creation_height integer not null, - activation_height integer, - expiration_height integer not null, - release_time integer not null, - - short_url text not null, -- normalized#shortest-unique-claim_id - canonical_url text, -- channel's-short_url/normalized#shortest-unique-claim_id-within-channel - - title text, - author text, - description text, - - claim_type integer, - has_source bool, - reposted integer default 0, - - -- streams - stream_type text, - media_type text, - fee_amount integer default 0, - fee_currency text, - duration integer, - - -- reposts - reposted_claim_hash bytes, - - -- claims which are channels - public_key_bytes bytes, - public_key_hash bytes, - claims_in_channel integer, - - -- claims which are inside channels - channel_hash bytes, - channel_join integer, -- height at which claim got valid signature / joined channel - signature bytes, - signature_digest bytes, - signature_valid bool, - - effective_amount integer not null default 0, - support_amount integer not null default 0, - trending_group integer not null default 0, - trending_mixed integer not null default 0, - trending_local integer not null default 0, - trending_global integer not null default 0 - ); - - create index if not exists claim_normalized_idx on claim (normalized, activation_height); - create index if not exists claim_channel_hash_idx on claim (channel_hash, signature, claim_hash); - create index if not exists claim_claims_in_channel_idx on claim (signature_valid, channel_hash, normalized); - create index if not exists claim_txo_hash_idx on claim (txo_hash); - create index if not exists claim_activation_height_idx on claim (activation_height, claim_hash); - create index if not exists claim_expiration_height_idx on claim (expiration_height); - create index if not exists claim_reposted_claim_hash_idx on claim (reposted_claim_hash); - """ - - CREATE_SUPPORT_TABLE = """ - create table if not exists support ( - txo_hash bytes primary key, - tx_position integer not null, - height integer not null, - claim_hash bytes not null, - amount integer not null - ); - create index if not exists support_claim_hash_idx on support (claim_hash, height); - """ - - CREATE_TAG_TABLE = """ - create table if not exists tag ( - tag text not null, - claim_hash bytes not null, - height integer not null - ); - create unique index if not exists tag_claim_hash_tag_idx on tag (claim_hash, tag); - """ - - CREATE_LANGUAGE_TABLE = """ - create table if not exists language ( - language text not null, - claim_hash bytes not null, - height integer not null - ); - create unique index if not exists language_claim_hash_language_idx on language (claim_hash, language); - """ - - CREATE_CLAIMTRIE_TABLE = """ - create table if not exists claimtrie ( - normalized text primary key, - claim_hash bytes not null, - last_take_over_height integer not null - ); - create index if not exists claimtrie_claim_hash_idx on claimtrie (claim_hash); - """ - - CREATE_CHANGELOG_TRIGGER = """ - create table if not exists changelog ( - claim_hash bytes primary key - ); - create index if not exists claimtrie_claim_hash_idx on claimtrie (claim_hash); - create trigger if not exists claim_changelog after update on claim - begin - insert or ignore into changelog (claim_hash) values (new.claim_hash); - end; - create trigger if not exists claimtrie_changelog after update on claimtrie - begin - insert or ignore into changelog (claim_hash) values (new.claim_hash); - insert or ignore into changelog (claim_hash) values (old.claim_hash); - end; - """ - - SEARCH_INDEXES = """ - -- used by any tag clouds - create index if not exists tag_tag_idx on tag (tag, claim_hash); - - -- naked order bys (no filters) - create unique index if not exists claim_release_idx on claim (release_time, claim_hash); - create unique index if not exists claim_trending_idx on claim (trending_group, trending_mixed, claim_hash); - create unique index if not exists claim_effective_amount_idx on claim (effective_amount, claim_hash); - - -- claim_type filter + order by - create unique index if not exists claim_type_release_idx on claim (release_time, claim_type, claim_hash); - create unique index if not exists claim_type_trending_idx on claim (trending_group, trending_mixed, claim_type, claim_hash); - create unique index if not exists claim_type_effective_amount_idx on claim (effective_amount, claim_type, claim_hash); - - -- stream_type filter + order by - create unique index if not exists stream_type_release_idx on claim (stream_type, release_time, claim_hash); - create unique index if not exists stream_type_trending_idx on claim (stream_type, trending_group, trending_mixed, claim_hash); - create unique index if not exists stream_type_effective_amount_idx on claim (stream_type, effective_amount, claim_hash); - - -- channel_hash filter + order by - create unique index if not exists channel_hash_release_idx on claim (channel_hash, release_time, claim_hash); - create unique index if not exists channel_hash_trending_idx on claim (channel_hash, trending_group, trending_mixed, claim_hash); - create unique index if not exists channel_hash_effective_amount_idx on claim (channel_hash, effective_amount, claim_hash); - - -- duration filter + order by - create unique index if not exists duration_release_idx on claim (duration, release_time, claim_hash); - create unique index if not exists duration_trending_idx on claim (duration, trending_group, trending_mixed, claim_hash); - create unique index if not exists duration_effective_amount_idx on claim (duration, effective_amount, claim_hash); - - -- fee_amount + order by - create unique index if not exists fee_amount_release_idx on claim (fee_amount, release_time, claim_hash); - create unique index if not exists fee_amount_trending_idx on claim (fee_amount, trending_group, trending_mixed, claim_hash); - create unique index if not exists fee_amount_effective_amount_idx on claim (fee_amount, effective_amount, claim_hash); - - -- TODO: verify that all indexes below are used - create index if not exists claim_height_normalized_idx on claim (height, normalized asc); - create index if not exists claim_resolve_idx on claim (normalized, claim_id); - create index if not exists claim_id_idx on claim (claim_id, claim_hash); - create index if not exists claim_timestamp_idx on claim (timestamp); - create index if not exists claim_public_key_hash_idx on claim (public_key_hash); - create index if not exists claim_signature_valid_idx on claim (signature_valid); - """ - - TAG_INDEXES = '\n'.join( - f"create unique index if not exists tag_{tag_key}_idx on tag (tag, claim_hash) WHERE tag='{tag_value}';" - for tag_value, tag_key in COMMON_TAGS.items() - ) - - LANGUAGE_INDEXES = '\n'.join( - f"create unique index if not exists language_{language}_idx on language (language, claim_hash) WHERE language='{language}';" - for language in INDEXED_LANGUAGES - ) - - CREATE_TABLES_QUERY = ( - CREATE_CLAIM_TABLE + - CREATE_SUPPORT_TABLE + - CREATE_CLAIMTRIE_TABLE + - CREATE_TAG_TABLE + - CREATE_CHANGELOG_TRIGGER + - CREATE_LANGUAGE_TABLE - ) - - def __init__( - self, main, path: str, blocking_channels: list, filtering_channels: list, trending: list): - self.main = main - self._db_path = path - self.db = None - self.logger = class_logger(__name__, self.__class__.__name__) - self.ledger = Ledger if main.coin.NET == 'mainnet' else RegTestLedger - self.blocked_streams = None - self.blocked_channels = None - self.blocking_channel_hashes = { - unhexlify(channel_id)[::-1] for channel_id in blocking_channels if channel_id - } - self.filtered_streams = None - self.filtered_channels = None - self.filtering_channel_hashes = { - unhexlify(channel_id)[::-1] for channel_id in filtering_channels if channel_id - } - self.trending = trending - self.pending_deletes = set() - - def open(self): - self.db = sqlite3.connect(self._db_path, isolation_level=None, check_same_thread=False, uri=True) - - def namedtuple_factory(cursor, row): - Row = namedtuple('Row', (d[0] for d in cursor.description)) - return Row(*row) - self.db.row_factory = namedtuple_factory - self.db.executescript(self.PRAGMAS) - self.db.executescript(self.CREATE_TABLES_QUERY) - register_canonical_functions(self.db) - self.blocked_streams = {} - self.blocked_channels = {} - self.filtered_streams = {} - self.filtered_channels = {} - self.update_blocked_and_filtered_claims() - for algorithm in self.trending: - algorithm.install(self.db) - - def close(self): - if self.db is not None: - self.db.close() - - def update_blocked_and_filtered_claims(self): - self.update_claims_from_channel_hashes( - self.blocked_streams, self.blocked_channels, self.blocking_channel_hashes - ) - self.update_claims_from_channel_hashes( - self.filtered_streams, self.filtered_channels, self.filtering_channel_hashes - ) - self.filtered_streams.update(self.blocked_streams) - self.filtered_channels.update(self.blocked_channels) - - def update_claims_from_channel_hashes(self, shared_streams, shared_channels, channel_hashes): - streams, channels = {}, {} - if channel_hashes: - sql = query( - "SELECT repost.channel_hash, repost.reposted_claim_hash, target.claim_type " - "FROM claim as repost JOIN claim AS target ON (target.claim_hash=repost.reposted_claim_hash)", **{ - 'repost.reposted_claim_hash__is_not_null': 1, - 'repost.channel_hash__in': channel_hashes - } - ) - for blocked_claim in self.execute(*sql): - if blocked_claim.claim_type == CLAIM_TYPES['stream']: - streams[blocked_claim.reposted_claim_hash] = blocked_claim.channel_hash - elif blocked_claim.claim_type == CLAIM_TYPES['channel']: - channels[blocked_claim.reposted_claim_hash] = blocked_claim.channel_hash - shared_streams.clear() - shared_streams.update(streams) - shared_channels.clear() - shared_channels.update(channels) - - @staticmethod - def _insert_sql(table: str, data: dict) -> Tuple[str, list]: - columns, values = [], [] - for column, value in data.items(): - columns.append(column) - values.append(value) - sql = ( - f"INSERT INTO {table} ({', '.join(columns)}) " - f"VALUES ({', '.join(['?'] * len(values))})" - ) - return sql, values - - @staticmethod - def _update_sql(table: str, data: dict, where: str, - constraints: Union[list, tuple]) -> Tuple[str, list]: - columns, values = [], [] - for column, value in data.items(): - columns.append(f"{column} = ?") - values.append(value) - values.extend(constraints) - return f"UPDATE {table} SET {', '.join(columns)} WHERE {where}", values - - @staticmethod - def _delete_sql(table: str, constraints: dict) -> Tuple[str, dict]: - where, values = constraints_to_sql(constraints) - return f"DELETE FROM {table} WHERE {where}", values - - def execute(self, *args): - return self.db.execute(*args) - - def executemany(self, *args): - return self.db.executemany(*args) - - def begin(self): - self.execute('begin;') - - def commit(self): - self.execute('commit;') - - def _upsertable_claims(self, txos: List[Output], header, clear_first=False): - claim_hashes, claims, tags, languages = set(), [], {}, {} - for txo in txos: - tx = txo.tx_ref.tx - - try: - assert txo.claim_name - assert txo.normalized_name - except: - #self.logger.exception(f"Could not decode claim name for {tx.id}:{txo.position}.") - continue - - language = 'none' - try: - if txo.claim.is_stream and txo.claim.stream.languages: - language = txo.claim.stream.languages[0].language - except: - pass - - claim_hash = txo.claim_hash - claim_hashes.add(claim_hash) - claim_record = { - 'claim_hash': claim_hash, - 'claim_id': txo.claim_id, - 'claim_name': txo.claim_name, - 'normalized': txo.normalized_name, - 'txo_hash': txo.ref.hash, - 'tx_position': tx.position, - 'amount': txo.amount, - 'timestamp': header['timestamp'], - 'height': tx.height, - 'title': None, - 'description': None, - 'author': None, - 'duration': None, - 'claim_type': None, - 'has_source': False, - 'stream_type': None, - 'media_type': None, - 'release_time': None, - 'fee_currency': None, - 'fee_amount': 0, - 'reposted_claim_hash': None - } - claims.append(claim_record) - - try: - claim = txo.claim - except: - #self.logger.exception(f"Could not parse claim protobuf for {tx.id}:{txo.position}.") - continue - - if claim.is_stream: - claim_record['claim_type'] = CLAIM_TYPES['stream'] - claim_record['has_source'] = claim.stream.has_source - claim_record['media_type'] = claim.stream.source.media_type - claim_record['stream_type'] = STREAM_TYPES[guess_stream_type(claim_record['media_type'])] - claim_record['title'] = claim.stream.title - claim_record['description'] = claim.stream.description - claim_record['author'] = claim.stream.author - if claim.stream.video and claim.stream.video.duration: - claim_record['duration'] = claim.stream.video.duration - if claim.stream.audio and claim.stream.audio.duration: - claim_record['duration'] = claim.stream.audio.duration - if claim.stream.release_time: - claim_record['release_time'] = claim.stream.release_time - if claim.stream.has_fee: - fee = claim.stream.fee - if isinstance(fee.currency, str): - claim_record['fee_currency'] = fee.currency.lower() - if isinstance(fee.amount, Decimal): - if fee.amount >= 0 and int(fee.amount*1000) < 9223372036854775807: - claim_record['fee_amount'] = int(fee.amount*1000) - elif claim.is_repost: - claim_record['claim_type'] = CLAIM_TYPES['repost'] - claim_record['reposted_claim_hash'] = claim.repost.reference.claim_hash - elif claim.is_channel: - claim_record['claim_type'] = CLAIM_TYPES['channel'] - elif claim.is_collection: - claim_record['claim_type'] = CLAIM_TYPES['collection'] - - languages[(language, claim_hash)] = (language, claim_hash, tx.height) - - for tag in clean_tags(claim.message.tags): - tags[(tag, claim_hash)] = (tag, claim_hash, tx.height) - - if clear_first: - self._clear_claim_metadata(claim_hashes) - - if tags: - self.executemany( - "INSERT OR IGNORE INTO tag (tag, claim_hash, height) VALUES (?, ?, ?)", tags.values() - ) - if languages: - self.executemany( - "INSERT OR IGNORE INTO language (language, claim_hash, height) VALUES (?, ?, ?)", languages.values() - ) - - return claims - - def insert_claims(self, txos: List[Output], header): - claims = self._upsertable_claims(txos, header) - if claims: - self.executemany(""" - INSERT OR REPLACE INTO claim ( - claim_hash, claim_id, claim_name, normalized, txo_hash, tx_position, amount, - claim_type, media_type, stream_type, timestamp, creation_timestamp, has_source, - fee_currency, fee_amount, title, description, author, duration, height, reposted_claim_hash, - creation_height, release_time, activation_height, expiration_height, short_url) - VALUES ( - :claim_hash, :claim_id, :claim_name, :normalized, :txo_hash, :tx_position, :amount, - :claim_type, :media_type, :stream_type, :timestamp, :timestamp, :has_source, - :fee_currency, :fee_amount, :title, :description, :author, :duration, :height, :reposted_claim_hash, :height, - CASE WHEN :release_time IS NOT NULL THEN :release_time ELSE :timestamp END, - CASE WHEN :normalized NOT IN (SELECT normalized FROM claimtrie) THEN :height END, - CASE WHEN :height >= 137181 THEN :height+2102400 ELSE :height+262974 END, - :claim_name||COALESCE( - (SELECT shortest_id(claim_id, :claim_id) FROM claim WHERE normalized = :normalized), - '#'||substr(:claim_id, 1, 1) - ) - )""", claims) - - def update_claims(self, txos: List[Output], header): - claims = self._upsertable_claims(txos, header, clear_first=True) - if claims: - self.executemany(""" - UPDATE claim SET - txo_hash=:txo_hash, tx_position=:tx_position, amount=:amount, height=:height, - claim_type=:claim_type, media_type=:media_type, stream_type=:stream_type, - timestamp=:timestamp, fee_amount=:fee_amount, fee_currency=:fee_currency, has_source=:has_source, - title=:title, duration=:duration, description=:description, author=:author, reposted_claim_hash=:reposted_claim_hash, - release_time=CASE WHEN :release_time IS NOT NULL THEN :release_time ELSE release_time END - WHERE claim_hash=:claim_hash; - """, claims) - - def delete_claims(self, claim_hashes: Set[bytes]): - """ Deletes claim supports and from claimtrie in case of an abandon. """ - if claim_hashes: - affected_channels = self.execute(*query( - "SELECT channel_hash FROM claim", channel_hash__is_not_null=1, claim_hash__in=claim_hashes - )).fetchall() - for table in ('claim', 'support', 'claimtrie'): - self.execute(*self._delete_sql(table, {'claim_hash__in': claim_hashes})) - self._clear_claim_metadata(claim_hashes) - return {r.channel_hash for r in affected_channels} - return set() - - def delete_claims_above_height(self, height: int): - claim_hashes = [x[0] for x in self.execute( - "SELECT claim_hash FROM claim WHERE height>?", (height, ) - ).fetchall()] - while claim_hashes: - batch = set(claim_hashes[:500]) - claim_hashes = claim_hashes[500:] - self.delete_claims(batch) - - def _clear_claim_metadata(self, claim_hashes: Set[bytes]): - if claim_hashes: - for table in ('tag',): # 'language', 'location', etc - self.execute(*self._delete_sql(table, {'claim_hash__in': claim_hashes})) - - def split_inputs_into_claims_supports_and_other(self, txis): - txo_hashes = {txi.txo_ref.hash for txi in txis} - claims = self.execute(*query( - "SELECT txo_hash, claim_hash, normalized FROM claim", txo_hash__in=txo_hashes - )).fetchall() - txo_hashes -= {r.txo_hash for r in claims} - supports = {} - if txo_hashes: - supports = self.execute(*query( - "SELECT txo_hash, claim_hash FROM support", txo_hash__in=txo_hashes - )).fetchall() - txo_hashes -= {r.txo_hash for r in supports} - return claims, supports, txo_hashes - - def insert_supports(self, txos: List[Output]): - supports = [] - for txo in txos: - tx = txo.tx_ref.tx - supports.append(( - txo.ref.hash, tx.position, tx.height, - txo.claim_hash, txo.amount - )) - if supports: - self.executemany( - "INSERT OR IGNORE INTO support (" - " txo_hash, tx_position, height, claim_hash, amount" - ") " - "VALUES (?, ?, ?, ?, ?)", supports - ) - - def delete_supports(self, txo_hashes: Set[bytes]): - if txo_hashes: - self.execute(*self._delete_sql('support', {'txo_hash__in': txo_hashes})) - - def calculate_reposts(self, txos: List[Output]): - targets = set() - for txo in txos: - try: - claim = txo.claim - except: - continue - if claim.is_repost: - targets.add((claim.repost.reference.claim_hash,)) - if targets: - self.executemany( - """ - UPDATE claim SET reposted = ( - SELECT count(*) FROM claim AS repost WHERE repost.reposted_claim_hash = claim.claim_hash - ) - WHERE claim_hash = ? - """, targets - ) - return {target[0] for target in targets} - - def validate_channel_signatures(self, height, new_claims, updated_claims, spent_claims, affected_channels, timer): - if not new_claims and not updated_claims and not spent_claims: - return - - sub_timer = timer.add_timer('segregate channels and signables') - sub_timer.start() - channels, new_channel_keys, signables = {}, {}, {} - for txo in chain(new_claims, updated_claims): - try: - claim = txo.claim - except: - continue - if claim.is_channel: - channels[txo.claim_hash] = txo - new_channel_keys[txo.claim_hash] = claim.channel.public_key_bytes - else: - signables[txo.claim_hash] = txo - sub_timer.stop() - - sub_timer = timer.add_timer('make list of channels we need to lookup') - sub_timer.start() - missing_channel_keys = set() - for txo in signables.values(): - claim = txo.claim - if claim.is_signed and claim.signing_channel_hash not in new_channel_keys: - missing_channel_keys.add(claim.signing_channel_hash) - sub_timer.stop() - - sub_timer = timer.add_timer('lookup missing channels') - sub_timer.start() - all_channel_keys = {} - if new_channel_keys or missing_channel_keys or affected_channels: - all_channel_keys = dict(self.execute(*query( - "SELECT claim_hash, public_key_bytes FROM claim", - claim_hash__in=set(new_channel_keys) | missing_channel_keys | affected_channels - ))) - sub_timer.stop() - - sub_timer = timer.add_timer('prepare for updating claims') - sub_timer.start() - changed_channel_keys = {} - for claim_hash, new_key in new_channel_keys.items(): - if claim_hash not in all_channel_keys or all_channel_keys[claim_hash] != new_key: - all_channel_keys[claim_hash] = new_key - changed_channel_keys[claim_hash] = new_key - - claim_updates = [] - - for claim_hash, txo in signables.items(): - claim = txo.claim - update = { - 'claim_hash': claim_hash, - 'channel_hash': None, - 'signature': None, - 'signature_digest': None, - 'signature_valid': None - } - if claim.is_signed: - update.update({ - 'channel_hash': claim.signing_channel_hash, - 'signature': txo.get_encoded_signature(), - 'signature_digest': txo.get_signature_digest(self.ledger), - 'signature_valid': 0 - }) - claim_updates.append(update) - sub_timer.stop() - - sub_timer = timer.add_timer('find claims affected by a change in channel key') - sub_timer.start() - if changed_channel_keys: - sql = f""" - SELECT * FROM claim WHERE - channel_hash IN ({','.join('?' for _ in changed_channel_keys)}) AND - signature IS NOT NULL - """ - for affected_claim in self.execute(sql, list(changed_channel_keys.keys())): - if affected_claim.claim_hash not in signables: - claim_updates.append({ - 'claim_hash': affected_claim.claim_hash, - 'channel_hash': affected_claim.channel_hash, - 'signature': affected_claim.signature, - 'signature_digest': affected_claim.signature_digest, - 'signature_valid': 0 - }) - sub_timer.stop() - - sub_timer = timer.add_timer('verify signatures') - sub_timer.start() - for update in claim_updates: - channel_pub_key = all_channel_keys.get(update['channel_hash']) - if channel_pub_key and update['signature']: - update['signature_valid'] = Output.is_signature_valid( - bytes(update['signature']), bytes(update['signature_digest']), channel_pub_key - ) - sub_timer.stop() - - sub_timer = timer.add_timer('update claims') - sub_timer.start() - if claim_updates: - self.executemany(f""" - UPDATE claim SET - channel_hash=:channel_hash, signature=:signature, signature_digest=:signature_digest, - signature_valid=:signature_valid, - channel_join=CASE - WHEN signature_valid=1 AND :signature_valid=1 AND channel_hash=:channel_hash THEN channel_join - WHEN :signature_valid=1 THEN {height} - END, - canonical_url=CASE - WHEN signature_valid=1 AND :signature_valid=1 AND channel_hash=:channel_hash THEN canonical_url - WHEN :signature_valid=1 THEN - (SELECT short_url FROM claim WHERE claim_hash=:channel_hash)||'/'|| - claim_name||COALESCE( - (SELECT shortest_id(other_claim.claim_id, claim.claim_id) FROM claim AS other_claim - WHERE other_claim.signature_valid = 1 AND - other_claim.channel_hash = :channel_hash AND - other_claim.normalized = claim.normalized), - '#'||substr(claim_id, 1, 1) - ) - END - WHERE claim_hash=:claim_hash; - """, claim_updates) - sub_timer.stop() - - sub_timer = timer.add_timer('update claims affected by spent channels') - sub_timer.start() - if spent_claims: - self.execute( - f""" - UPDATE claim SET - signature_valid=CASE WHEN signature IS NOT NULL THEN 0 END, - channel_join=NULL, canonical_url=NULL - WHERE channel_hash IN ({','.join('?' for _ in spent_claims)}) - """, list(spent_claims) - ) - sub_timer.stop() - - sub_timer = timer.add_timer('update channels') - sub_timer.start() - if channels: - self.executemany( - """ - UPDATE claim SET - public_key_bytes=:public_key_bytes, - public_key_hash=:public_key_hash - WHERE claim_hash=:claim_hash""", [{ - 'claim_hash': claim_hash, - 'public_key_bytes': txo.claim.channel.public_key_bytes, - 'public_key_hash': self.ledger.address_to_hash160( - self.ledger.public_key_to_address(txo.claim.channel.public_key_bytes) - ) - } for claim_hash, txo in channels.items()] - ) - sub_timer.stop() - - sub_timer = timer.add_timer('update claims_in_channel counts') - sub_timer.start() - if all_channel_keys: - self.executemany(f""" - UPDATE claim SET - claims_in_channel=( - SELECT COUNT(*) FROM claim AS claim_in_channel - WHERE claim_in_channel.signature_valid=1 AND - claim_in_channel.channel_hash=claim.claim_hash - ) - WHERE claim_hash = ? - """, [(channel_hash,) for channel_hash in all_channel_keys]) - sub_timer.stop() - - sub_timer = timer.add_timer('update blocked claims list') - sub_timer.start() - if (self.blocking_channel_hashes.intersection(all_channel_keys) or - self.filtering_channel_hashes.intersection(all_channel_keys)): - self.update_blocked_and_filtered_claims() - sub_timer.stop() - - def _update_support_amount(self, claim_hashes): - if claim_hashes: - self.execute(f""" - UPDATE claim SET - support_amount = COALESCE( - (SELECT SUM(amount) FROM support WHERE support.claim_hash=claim.claim_hash), 0 - ) - WHERE claim_hash IN ({','.join('?' for _ in claim_hashes)}) - """, claim_hashes) - - def _update_effective_amount(self, height, claim_hashes=None): - self.execute( - f"UPDATE claim SET effective_amount = amount + support_amount " - f"WHERE activation_height = {height}" - ) - if claim_hashes: - self.execute( - f"UPDATE claim SET effective_amount = amount + support_amount " - f"WHERE activation_height < {height} " - f" AND claim_hash IN ({','.join('?' for _ in claim_hashes)})", - claim_hashes - ) - - def _calculate_activation_height(self, height): - last_take_over_height = f"""COALESCE( - (SELECT last_take_over_height FROM claimtrie - WHERE claimtrie.normalized=claim.normalized), - {height} - ) - """ - self.execute(f""" - UPDATE claim SET activation_height = - {height} + min(4032, cast(({height} - {last_take_over_height}) / 32 AS INT)) - WHERE activation_height IS NULL - """) - - def _perform_overtake(self, height, changed_claim_hashes, deleted_names): - deleted_names_sql = claim_hashes_sql = "" - if changed_claim_hashes: - claim_hashes_sql = f"OR claim_hash IN ({','.join('?' for _ in changed_claim_hashes)})" - if deleted_names: - deleted_names_sql = f"OR normalized IN ({','.join('?' for _ in deleted_names)})" - overtakes = self.execute(f""" - SELECT winner.normalized, winner.claim_hash, - claimtrie.claim_hash AS current_winner, - MAX(winner.effective_amount) AS max_winner_effective_amount - FROM ( - SELECT normalized, claim_hash, effective_amount FROM claim - WHERE normalized IN ( - SELECT normalized FROM claim WHERE activation_height={height} {claim_hashes_sql} - ) {deleted_names_sql} - ORDER BY effective_amount DESC, height ASC, tx_position ASC - ) AS winner LEFT JOIN claimtrie USING (normalized) - GROUP BY winner.normalized - HAVING current_winner IS NULL OR current_winner <> winner.claim_hash - """, list(changed_claim_hashes)+deleted_names) - for overtake in overtakes: - if overtake.current_winner: - self.execute( - f"UPDATE claimtrie SET claim_hash = ?, last_take_over_height = {height} " - f"WHERE normalized = ?", - (overtake.claim_hash, overtake.normalized) - ) - else: - self.execute( - f"INSERT INTO claimtrie (claim_hash, normalized, last_take_over_height) " - f"VALUES (?, ?, {height})", - (overtake.claim_hash, overtake.normalized) - ) - self.execute( - f"UPDATE claim SET activation_height = {height} WHERE normalized = ? " - f"AND (activation_height IS NULL OR activation_height > {height})", - (overtake.normalized,) - ) - - def _copy(self, height): - if height > 50: - self.execute(f"DROP TABLE claimtrie{height-50}") - self.execute(f"CREATE TABLE claimtrie{height} AS SELECT * FROM claimtrie") - - def update_claimtrie(self, height, changed_claim_hashes, deleted_names, timer): - r = timer.run - binary_claim_hashes = list(changed_claim_hashes) - - r(self._calculate_activation_height, height) - r(self._update_support_amount, binary_claim_hashes) - - r(self._update_effective_amount, height, binary_claim_hashes) - r(self._perform_overtake, height, binary_claim_hashes, list(deleted_names)) - - r(self._update_effective_amount, height) - r(self._perform_overtake, height, [], []) - - def get_expiring(self, height): - return self.execute( - f"SELECT claim_hash, normalized FROM claim WHERE expiration_height = {height}" - ) - - def enqueue_changes(self): - query = """ - SELECT claimtrie.claim_hash as is_controlling, - claimtrie.last_take_over_height, - (select group_concat(tag, ',,') from tag where tag.claim_hash in (claim.claim_hash, claim.reposted_claim_hash)) as tags, - (select group_concat(language, ' ') from language where language.claim_hash in (claim.claim_hash, claim.reposted_claim_hash)) as languages, - cr.has_source as reposted_has_source, - cr.claim_type as reposted_claim_type, - cr.stream_type as reposted_stream_type, - cr.media_type as reposted_media_type, - cr.duration as reposted_duration, - cr.fee_amount as reposted_fee_amount, - cr.fee_currency as reposted_fee_currency, - claim.* - FROM claim LEFT JOIN claimtrie USING (claim_hash) LEFT JOIN claim cr ON cr.claim_hash=claim.reposted_claim_hash - WHERE claim.claim_hash in (SELECT claim_hash FROM changelog) - """ - for claim in self.execute(query): - claim = claim._asdict() - id_set = set(filter(None, (claim['claim_hash'], claim['channel_hash'], claim['reposted_claim_hash']))) - claim['censor_type'] = 0 - censoring_channel_hash = None - claim['has_source'] = bool(claim.pop('reposted_has_source') or claim['has_source']) - claim['stream_type'] = claim.pop('reposted_stream_type') or claim['stream_type'] - claim['media_type'] = claim.pop('reposted_media_type') or claim['media_type'] - claim['fee_amount'] = claim.pop('reposted_fee_amount') or claim['fee_amount'] - claim['fee_currency'] = claim.pop('reposted_fee_currency') or claim['fee_currency'] - claim['duration'] = claim.pop('reposted_duration') or claim['duration'] - for reason_id in id_set: - if reason_id in self.blocked_streams: - claim['censor_type'] = 2 - censoring_channel_hash = self.blocked_streams.get(reason_id) - elif reason_id in self.blocked_channels: - claim['censor_type'] = 2 - censoring_channel_hash = self.blocked_channels.get(reason_id) - elif reason_id in self.filtered_streams: - claim['censor_type'] = 1 - censoring_channel_hash = self.filtered_streams.get(reason_id) - elif reason_id in self.filtered_channels: - claim['censor_type'] = 1 - censoring_channel_hash = self.filtered_channels.get(reason_id) - claim['censoring_channel_id'] = censoring_channel_hash[::-1].hex() if censoring_channel_hash else None - - claim['tags'] = claim['tags'].split(',,') if claim['tags'] else [] - claim['languages'] = claim['languages'].split(' ') if claim['languages'] else [] - yield 'update', claim - - def clear_changelog(self): - self.execute("delete from changelog;") - - def claim_producer(self): - while self.pending_deletes: - claim_hash = self.pending_deletes.pop() - yield 'delete', hexlify(claim_hash[::-1]).decode() - for claim in self.enqueue_changes(): - yield claim - self.clear_changelog() - - def advance_txs(self, height, all_txs, header, daemon_height, timer): - insert_claims = [] - update_claims = [] - update_claim_hashes = set() - delete_claim_hashes = self.pending_deletes - insert_supports = [] - delete_support_txo_hashes = set() - recalculate_claim_hashes = set() # added/deleted supports, added/updated claim - deleted_claim_names = set() - delete_others = set() - body_timer = timer.add_timer('body') - for position, (etx, txid) in enumerate(all_txs): - tx = timer.run( - Transaction, etx.raw, height=height, position=position - ) - # Inputs - spent_claims, spent_supports, spent_others = timer.run( - self.split_inputs_into_claims_supports_and_other, tx.inputs - ) - body_timer.start() - delete_claim_hashes.update({r.claim_hash for r in spent_claims}) - deleted_claim_names.update({r.normalized for r in spent_claims}) - delete_support_txo_hashes.update({r.txo_hash for r in spent_supports}) - recalculate_claim_hashes.update({r.claim_hash for r in spent_supports}) - delete_others.update(spent_others) - # Outputs - for output in tx.outputs: - if output.is_support: - insert_supports.append(output) - recalculate_claim_hashes.add(output.claim_hash) - elif output.script.is_claim_name: - insert_claims.append(output) - recalculate_claim_hashes.add(output.claim_hash) - elif output.script.is_update_claim: - claim_hash = output.claim_hash - update_claims.append(output) - recalculate_claim_hashes.add(claim_hash) - body_timer.stop() - - skip_update_claim_timer = timer.add_timer('skip update of abandoned claims') - skip_update_claim_timer.start() - for updated_claim in list(update_claims): - if updated_claim.ref.hash in delete_others: - update_claims.remove(updated_claim) - for updated_claim in update_claims: - claim_hash = updated_claim.claim_hash - delete_claim_hashes.discard(claim_hash) - update_claim_hashes.add(claim_hash) - skip_update_claim_timer.stop() - - skip_insert_claim_timer = timer.add_timer('skip insertion of abandoned claims') - skip_insert_claim_timer.start() - for new_claim in list(insert_claims): - if new_claim.ref.hash in delete_others: - if new_claim.claim_hash not in update_claim_hashes: - insert_claims.remove(new_claim) - skip_insert_claim_timer.stop() - - skip_insert_support_timer = timer.add_timer('skip insertion of abandoned supports') - skip_insert_support_timer.start() - for new_support in list(insert_supports): - if new_support.ref.hash in delete_others: - insert_supports.remove(new_support) - skip_insert_support_timer.stop() - - expire_timer = timer.add_timer('recording expired claims') - expire_timer.start() - for expired in self.get_expiring(height): - delete_claim_hashes.add(expired.claim_hash) - deleted_claim_names.add(expired.normalized) - expire_timer.stop() - - r = timer.run - affected_channels = r(self.delete_claims, delete_claim_hashes) - r(self.delete_supports, delete_support_txo_hashes) - r(self.insert_claims, insert_claims, header) - r(self.calculate_reposts, insert_claims) - r(self.update_claims, update_claims, header) - r(self.validate_channel_signatures, height, insert_claims, - update_claims, delete_claim_hashes, affected_channels, forward_timer=True) - r(self.insert_supports, insert_supports) - r(self.update_claimtrie, height, recalculate_claim_hashes, deleted_claim_names, forward_timer=True) - for algorithm in self.trending: - r(algorithm.run, self.db.cursor(), height, daemon_height, recalculate_claim_hashes) diff --git a/tests/unit/wallet/server/reader.py b/tests/unit/wallet/server/reader.py deleted file mode 100644 index 0d8cb7d21..000000000 --- a/tests/unit/wallet/server/reader.py +++ /dev/null @@ -1,616 +0,0 @@ -import time -import struct -import sqlite3 -import logging -from operator import itemgetter -from typing import Tuple, List, Dict, Union, Type, Optional -from binascii import unhexlify -from decimal import Decimal -from contextvars import ContextVar -from functools import wraps -from itertools import chain -from dataclasses import dataclass - -from lbry.wallet.database import query, interpolate -from lbry.error import ResolveCensoredError -from lbry.schema.url import URL, normalize_name -from lbry.schema.tags import clean_tags -from lbry.schema.result import Outputs, Censor -from lbry.wallet import Ledger, RegTestLedger - -from lbry.wallet.server.db.common import CLAIM_TYPES, STREAM_TYPES, COMMON_TAGS, INDEXED_LANGUAGES - - -class SQLiteOperationalError(sqlite3.OperationalError): - def __init__(self, metrics): - super().__init__('sqlite query errored') - self.metrics = metrics - - -class SQLiteInterruptedError(sqlite3.OperationalError): - def __init__(self, metrics): - super().__init__('sqlite query interrupted') - self.metrics = metrics - - -ATTRIBUTE_ARRAY_MAX_LENGTH = 100 -sqlite3.enable_callback_tracebacks(True) - -INTEGER_PARAMS = { - 'height', 'creation_height', 'activation_height', 'expiration_height', - 'timestamp', 'creation_timestamp', 'duration', 'release_time', 'fee_amount', - 'tx_position', 'channel_join', 'reposted', 'limit_claims_per_channel', - 'amount', 'effective_amount', 'support_amount', - 'trending_group', 'trending_mixed', - 'trending_local', 'trending_global', -} - -SEARCH_PARAMS = { - 'name', 'text', 'claim_id', 'claim_ids', 'txid', 'nout', 'channel', 'channel_ids', 'not_channel_ids', - 'public_key_id', 'claim_type', 'stream_types', 'media_types', 'fee_currency', - 'has_channel_signature', 'signature_valid', - 'any_tags', 'all_tags', 'not_tags', 'reposted_claim_id', - 'any_locations', 'all_locations', 'not_locations', - 'any_languages', 'all_languages', 'not_languages', - 'is_controlling', 'limit', 'offset', 'order_by', - 'no_totals', 'has_source' -} | INTEGER_PARAMS - - -ORDER_FIELDS = { - 'name', 'claim_hash' -} | INTEGER_PARAMS - - -@dataclass -class ReaderState: - db: sqlite3.Connection - stack: List[List] - metrics: Dict - is_tracking_metrics: bool - ledger: Type[Ledger] - query_timeout: float - log: logging.Logger - blocked_streams: Dict - blocked_channels: Dict - filtered_streams: Dict - filtered_channels: Dict - - def close(self): - self.db.close() - - def reset_metrics(self): - self.stack = [] - self.metrics = {} - - def set_query_timeout(self): - stop_at = time.perf_counter() + self.query_timeout - - def interruptor(): - if time.perf_counter() >= stop_at: - self.db.interrupt() - return - - self.db.set_progress_handler(interruptor, 100) - - def get_resolve_censor(self) -> Censor: - return Censor(Censor.RESOLVE) - - def get_search_censor(self, limit_claims_per_channel: int) -> Censor: - return Censor(Censor.SEARCH) - - -ctx: ContextVar[Optional[ReaderState]] = ContextVar('ctx') - - -def row_factory(cursor, row): - return { - k[0]: (set(row[i].split(',')) if k[0] == 'tags' else row[i]) - for i, k in enumerate(cursor.description) - } - - -def initializer(log, _path, _ledger_name, query_timeout, _measure=False, block_and_filter=None): - db = sqlite3.connect(_path, isolation_level=None, uri=True) - db.row_factory = row_factory - if block_and_filter: - blocked_streams, blocked_channels, filtered_streams, filtered_channels = block_and_filter - else: - blocked_streams = blocked_channels = filtered_streams = filtered_channels = {} - ctx.set( - ReaderState( - db=db, stack=[], metrics={}, is_tracking_metrics=_measure, - ledger=Ledger if _ledger_name == 'mainnet' else RegTestLedger, - query_timeout=query_timeout, log=log, - blocked_streams=blocked_streams, blocked_channels=blocked_channels, - filtered_streams=filtered_streams, filtered_channels=filtered_channels, - ) - ) - - -def cleanup(): - ctx.get().close() - ctx.set(None) - - -def measure(func): - @wraps(func) - def wrapper(*args, **kwargs): - state = ctx.get() - if not state.is_tracking_metrics: - return func(*args, **kwargs) - metric = {} - state.metrics.setdefault(func.__name__, []).append(metric) - state.stack.append([]) - start = time.perf_counter() - try: - return func(*args, **kwargs) - finally: - elapsed = int((time.perf_counter()-start)*1000) - metric['total'] = elapsed - metric['isolated'] = (elapsed-sum(state.stack.pop())) - if state.stack: - state.stack[-1].append(elapsed) - return wrapper - - -def reports_metrics(func): - @wraps(func) - def wrapper(*args, **kwargs): - state = ctx.get() - if not state.is_tracking_metrics: - return func(*args, **kwargs) - state.reset_metrics() - r = func(*args, **kwargs) - return r, state.metrics - return wrapper - - -@reports_metrics -def search_to_bytes(constraints) -> Union[bytes, Tuple[bytes, Dict]]: - return encode_result(search(constraints)) - - -@reports_metrics -def resolve_to_bytes(urls) -> Union[bytes, Tuple[bytes, Dict]]: - return encode_result(resolve(urls)) - - -def encode_result(result): - return Outputs.to_bytes(*result) - - -@measure -def execute_query(sql, values, row_offset: int, row_limit: int, censor: Censor) -> List: - context = ctx.get() - context.set_query_timeout() - try: - rows = context.db.execute(sql, values).fetchall() - return rows[row_offset:row_limit] - except sqlite3.OperationalError as err: - plain_sql = interpolate(sql, values) - if context.is_tracking_metrics: - context.metrics['execute_query'][-1]['sql'] = plain_sql - context.log.exception('failed running query', exc_info=err) - raise SQLiteOperationalError(context.metrics) - - -def claims_query(cols, for_count=False, **constraints) -> Tuple[str, Dict]: - if 'order_by' in constraints: - order_by_parts = constraints['order_by'] - if isinstance(order_by_parts, str): - order_by_parts = [order_by_parts] - sql_order_by = [] - for order_by in order_by_parts: - is_asc = order_by.startswith('^') - column = order_by[1:] if is_asc else order_by - if column not in ORDER_FIELDS: - raise NameError(f'{column} is not a valid order_by field') - if column == 'name': - column = 'normalized' - sql_order_by.append( - f"claim.{column} ASC" if is_asc else f"claim.{column} DESC" - ) - constraints['order_by'] = sql_order_by - - ops = {'<=': '__lte', '>=': '__gte', '<': '__lt', '>': '__gt'} - for constraint in INTEGER_PARAMS: - if constraint in constraints: - value = constraints.pop(constraint) - postfix = '' - if isinstance(value, str): - if len(value) >= 2 and value[:2] in ops: - postfix, value = ops[value[:2]], value[2:] - elif len(value) >= 1 and value[0] in ops: - postfix, value = ops[value[0]], value[1:] - if constraint == 'fee_amount': - value = Decimal(value)*1000 - constraints[f'claim.{constraint}{postfix}'] = int(value) - - if constraints.pop('is_controlling', False): - if {'sequence', 'amount_order'}.isdisjoint(constraints): - for_count = False - constraints['claimtrie.claim_hash__is_not_null'] = '' - if 'sequence' in constraints: - constraints['order_by'] = 'claim.activation_height ASC' - constraints['offset'] = int(constraints.pop('sequence')) - 1 - constraints['limit'] = 1 - if 'amount_order' in constraints: - constraints['order_by'] = 'claim.effective_amount DESC' - constraints['offset'] = int(constraints.pop('amount_order')) - 1 - constraints['limit'] = 1 - - if 'claim_id' in constraints: - claim_id = constraints.pop('claim_id') - if len(claim_id) == 40: - constraints['claim.claim_id'] = claim_id - else: - constraints['claim.claim_id__like'] = f'{claim_id[:40]}%' - elif 'claim_ids' in constraints: - constraints['claim.claim_id__in'] = set(constraints.pop('claim_ids')) - - if 'reposted_claim_id' in constraints: - constraints['claim.reposted_claim_hash'] = unhexlify(constraints.pop('reposted_claim_id'))[::-1] - - if 'name' in constraints: - constraints['claim.normalized'] = normalize_name(constraints.pop('name')) - - if 'public_key_id' in constraints: - constraints['claim.public_key_hash'] = ( - ctx.get().ledger.address_to_hash160(constraints.pop('public_key_id'))) - if 'channel_hash' in constraints: - constraints['claim.channel_hash'] = constraints.pop('channel_hash') - if 'channel_ids' in constraints: - channel_ids = constraints.pop('channel_ids') - if channel_ids: - constraints['claim.channel_hash__in'] = { - unhexlify(cid)[::-1] for cid in channel_ids if cid - } - if 'not_channel_ids' in constraints: - not_channel_ids = constraints.pop('not_channel_ids') - if not_channel_ids: - not_channel_ids_binary = { - unhexlify(ncid)[::-1] for ncid in not_channel_ids - } - constraints['claim.claim_hash__not_in#not_channel_ids'] = not_channel_ids_binary - if constraints.get('has_channel_signature', False): - constraints['claim.channel_hash__not_in'] = not_channel_ids_binary - else: - constraints['null_or_not_channel__or'] = { - 'claim.signature_valid__is_null': True, - 'claim.channel_hash__not_in': not_channel_ids_binary - } - if 'signature_valid' in constraints: - has_channel_signature = constraints.pop('has_channel_signature', False) - if has_channel_signature: - constraints['claim.signature_valid'] = constraints.pop('signature_valid') - else: - constraints['null_or_signature__or'] = { - 'claim.signature_valid__is_null': True, - 'claim.signature_valid': constraints.pop('signature_valid') - } - elif constraints.pop('has_channel_signature', False): - constraints['claim.signature_valid__is_not_null'] = True - - if 'txid' in constraints: - tx_hash = unhexlify(constraints.pop('txid'))[::-1] - nout = constraints.pop('nout', 0) - constraints['claim.txo_hash'] = tx_hash + struct.pack(' List: - if 'channel' in constraints: - channel_url = constraints.pop('channel') - match = resolve_url(channel_url) - if isinstance(match, dict): - constraints['channel_hash'] = match['claim_hash'] - else: - return [{'row_count': 0}] if cols == 'count(*) as row_count' else [] - row_offset = constraints.pop('offset', 0) - row_limit = constraints.pop('limit', 20) - sql, values = claims_query(cols, for_count, **constraints) - return execute_query(sql, values, row_offset, row_limit, censor) - - -@measure -def count_claims(**constraints) -> int: - constraints.pop('offset', None) - constraints.pop('limit', None) - constraints.pop('order_by', None) - count = select_claims(Censor(Censor.SEARCH), 'count(*) as row_count', for_count=True, **constraints) - return count[0]['row_count'] - - -def search_claims(censor: Censor, **constraints) -> List: - return select_claims( - censor, - """ - claimtrie.claim_hash as is_controlling, - claimtrie.last_take_over_height, - claim.claim_hash, claim.txo_hash, - claim.claims_in_channel, claim.reposted, - claim.height, claim.creation_height, - claim.activation_height, claim.expiration_height, - claim.effective_amount, claim.support_amount, - claim.trending_group, claim.trending_mixed, - claim.trending_local, claim.trending_global, - claim.short_url, claim.canonical_url, - claim.channel_hash, claim.reposted_claim_hash, - claim.signature_valid - """, **constraints - ) - - -def _get_referenced_rows(txo_rows: List[dict], censor_channels: List[bytes]): - censor = ctx.get().get_resolve_censor() - repost_hashes = set(filter(None, map(itemgetter('reposted_claim_hash'), txo_rows))) - channel_hashes = set(chain( - filter(None, map(itemgetter('channel_hash'), txo_rows)), - censor_channels - )) - - reposted_txos = [] - if repost_hashes: - reposted_txos = search_claims(censor, **{'claim.claim_hash__in': repost_hashes}) - channel_hashes |= set(filter(None, map(itemgetter('channel_hash'), reposted_txos))) - - channel_txos = [] - if channel_hashes: - channel_txos = search_claims(censor, **{'claim.claim_hash__in': channel_hashes}) - - # channels must come first for client side inflation to work properly - return channel_txos + reposted_txos - -@measure -def search(constraints) -> Tuple[List, List, int, int, Censor]: - assert set(constraints).issubset(SEARCH_PARAMS), \ - f"Search query contains invalid arguments: {set(constraints).difference(SEARCH_PARAMS)}" - total = None - limit_claims_per_channel = constraints.pop('limit_claims_per_channel', None) - if not constraints.pop('no_totals', False): - total = count_claims(**constraints) - constraints['offset'] = abs(constraints.get('offset', 0)) - constraints['limit'] = min(abs(constraints.get('limit', 10)), 50) - context = ctx.get() - search_censor = context.get_search_censor(limit_claims_per_channel) - txo_rows = search_claims(search_censor, **constraints) - extra_txo_rows = _get_referenced_rows(txo_rows, search_censor.censored.keys()) - return txo_rows, extra_txo_rows, constraints['offset'], total, search_censor - - -@measure -def resolve(urls) -> Tuple[List, List]: - txo_rows = [resolve_url(raw_url) for raw_url in urls] - extra_txo_rows = _get_referenced_rows( - [txo for txo in txo_rows if isinstance(txo, dict)], - [txo.censor_id for txo in txo_rows if isinstance(txo, ResolveCensoredError)] - ) - return txo_rows, extra_txo_rows - - -@measure -def resolve_url(raw_url): - censor = ctx.get().get_resolve_censor() - - try: - url = URL.parse(raw_url) - except ValueError as e: - return e - - channel = None - - if url.has_channel: - query = url.channel.to_dict() - if set(query) == {'name'}: - query['is_controlling'] = True - else: - query['order_by'] = ['^creation_height'] - matches = search_claims(censor, **query, limit=1) - if matches: - channel = matches[0] - elif censor.censored: - return ResolveCensoredError(raw_url, next(iter(censor.censored))) - else: - return LookupError(f'Could not find channel in "{raw_url}".') - - if url.has_stream: - query = url.stream.to_dict() - if channel is not None: - if set(query) == {'name'}: - # temporarily emulate is_controlling for claims in channel - query['order_by'] = ['effective_amount', '^height'] - else: - query['order_by'] = ['^channel_join'] - query['channel_hash'] = channel['claim_hash'] - query['signature_valid'] = 1 - elif set(query) == {'name'}: - query['is_controlling'] = 1 - matches = search_claims(censor, **query, limit=1) - if matches: - return matches[0] - elif censor.censored: - return ResolveCensoredError(raw_url, next(iter(censor.censored))) - else: - return LookupError(f'Could not find claim at "{raw_url}".') - - return channel - - -CLAIM_HASH_OR_REPOST_HASH_SQL = f""" -CASE WHEN claim.claim_type = {CLAIM_TYPES['repost']} - THEN claim.reposted_claim_hash - ELSE claim.claim_hash -END -""" - - -def _apply_constraints_for_array_attributes(constraints, attr, cleaner, for_count=False): - any_items = set(cleaner(constraints.pop(f'any_{attr}s', []))[:ATTRIBUTE_ARRAY_MAX_LENGTH]) - all_items = set(cleaner(constraints.pop(f'all_{attr}s', []))[:ATTRIBUTE_ARRAY_MAX_LENGTH]) - not_items = set(cleaner(constraints.pop(f'not_{attr}s', []))[:ATTRIBUTE_ARRAY_MAX_LENGTH]) - - all_items = {item for item in all_items if item not in not_items} - any_items = {item for item in any_items if item not in not_items} - - any_queries = {} - - if attr == 'tag': - common_tags = any_items & COMMON_TAGS.keys() - if common_tags: - any_items -= common_tags - if len(common_tags) < 5: - for item in common_tags: - index_name = COMMON_TAGS[item] - any_queries[f'#_common_tag_{index_name}'] = f""" - EXISTS( - SELECT 1 FROM tag INDEXED BY tag_{index_name}_idx - WHERE {CLAIM_HASH_OR_REPOST_HASH_SQL}=tag.claim_hash - AND tag = '{item}' - ) - """ - elif len(common_tags) >= 5: - constraints.update({ - f'$any_common_tag{i}': item for i, item in enumerate(common_tags) - }) - values = ', '.join( - f':$any_common_tag{i}' for i in range(len(common_tags)) - ) - any_queries[f'#_any_common_tags'] = f""" - EXISTS( - SELECT 1 FROM tag WHERE {CLAIM_HASH_OR_REPOST_HASH_SQL}=tag.claim_hash - AND tag IN ({values}) - ) - """ - elif attr == 'language': - indexed_languages = any_items & set(INDEXED_LANGUAGES) - if indexed_languages: - any_items -= indexed_languages - for language in indexed_languages: - any_queries[f'#_any_common_languages_{language}'] = f""" - EXISTS( - SELECT 1 FROM language INDEXED BY language_{language}_idx - WHERE {CLAIM_HASH_OR_REPOST_HASH_SQL}=language.claim_hash - AND language = '{language}' - ) - """ - - if any_items: - - constraints.update({ - f'$any_{attr}{i}': item for i, item in enumerate(any_items) - }) - values = ', '.join( - f':$any_{attr}{i}' for i in range(len(any_items)) - ) - if for_count or attr == 'tag': - if attr == 'tag': - any_queries[f'#_any_{attr}'] = f""" - ((claim.claim_type != {CLAIM_TYPES['repost']} - AND claim.claim_hash IN (SELECT claim_hash FROM tag WHERE tag IN ({values}))) OR - (claim.claim_type == {CLAIM_TYPES['repost']} AND - claim.reposted_claim_hash IN (SELECT claim_hash FROM tag WHERE tag IN ({values})))) - """ - else: - any_queries[f'#_any_{attr}'] = f""" - {CLAIM_HASH_OR_REPOST_HASH_SQL} IN ( - SELECT claim_hash FROM {attr} WHERE {attr} IN ({values}) - ) - """ - else: - any_queries[f'#_any_{attr}'] = f""" - EXISTS( - SELECT 1 FROM {attr} WHERE - {CLAIM_HASH_OR_REPOST_HASH_SQL}={attr}.claim_hash - AND {attr} IN ({values}) - ) - """ - - if len(any_queries) == 1: - constraints.update(any_queries) - elif len(any_queries) > 1: - constraints[f'ORed_{attr}_queries__any'] = any_queries - - if all_items: - constraints[f'$all_{attr}_count'] = len(all_items) - constraints.update({ - f'$all_{attr}{i}': item for i, item in enumerate(all_items) - }) - values = ', '.join( - f':$all_{attr}{i}' for i in range(len(all_items)) - ) - if for_count: - constraints[f'#_all_{attr}'] = f""" - {CLAIM_HASH_OR_REPOST_HASH_SQL} IN ( - SELECT claim_hash FROM {attr} WHERE {attr} IN ({values}) - GROUP BY claim_hash HAVING COUNT({attr}) = :$all_{attr}_count - ) - """ - else: - constraints[f'#_all_{attr}'] = f""" - {len(all_items)}=( - SELECT count(*) FROM {attr} WHERE - {CLAIM_HASH_OR_REPOST_HASH_SQL}={attr}.claim_hash - AND {attr} IN ({values}) - ) - """ - - if not_items: - constraints.update({ - f'$not_{attr}{i}': item for i, item in enumerate(not_items) - }) - values = ', '.join( - f':$not_{attr}{i}' for i in range(len(not_items)) - ) - if for_count: - if attr == 'tag': - constraints[f'#_not_{attr}'] = f""" - ((claim.claim_type != {CLAIM_TYPES['repost']} - AND claim.claim_hash NOT IN (SELECT claim_hash FROM tag WHERE tag IN ({values}))) OR - (claim.claim_type == {CLAIM_TYPES['repost']} AND - claim.reposted_claim_hash NOT IN (SELECT claim_hash FROM tag WHERE tag IN ({values})))) - """ - else: - constraints[f'#_not_{attr}'] = f""" - {CLAIM_HASH_OR_REPOST_HASH_SQL} NOT IN ( - SELECT claim_hash FROM {attr} WHERE {attr} IN ({values}) - ) - """ - else: - constraints[f'#_not_{attr}'] = f""" - NOT EXISTS( - SELECT 1 FROM {attr} WHERE - {CLAIM_HASH_OR_REPOST_HASH_SQL}={attr}.claim_hash - AND {attr} IN ({values}) - ) - """ diff --git a/tests/unit/wallet/server/test_sqldb.py b/tests/unit/wallet/server/test_sqldb.py deleted file mode 100644 index 37095ef8d..000000000 --- a/tests/unit/wallet/server/test_sqldb.py +++ /dev/null @@ -1,765 +0,0 @@ -import unittest -import ecdsa -import hashlib -import logging -from binascii import hexlify -from typing import List, Tuple - -from lbry.wallet.constants import COIN, NULL_HASH32 -from lbry.schema.claim import Claim -from lbry.schema.result import Censor -from lbry.wallet.server.db import writer -from lbry.wallet.server.coin import LBCRegTest -from lbry.wallet.server.db.trending import zscore -from lbry.wallet.server.db.canonical import FindShortestID -from lbry.wallet.transaction import Transaction, Input, Output -try: - import reader -except: - from . import reader - - -def get_output(amount=COIN, pubkey_hash=NULL_HASH32): - return Transaction() \ - .add_outputs([Output.pay_pubkey_hash(amount, pubkey_hash)]) \ - .outputs[0] - - -def get_input(): - return Input.spend(get_output()) - - -def get_tx(): - return Transaction().add_inputs([get_input()]) - - -def search(**constraints) -> List: - return reader.search_claims(Censor(Censor.SEARCH), **constraints) - - -def censored_search(**constraints) -> Tuple[List, Censor]: - rows, _, _, _, censor = reader.search(constraints) - return rows, censor - - -class TestSQLDB(unittest.TestCase): - query_timeout = 0.25 - - def setUp(self): - self.first_sync = False - self.daemon_height = 1 - self.coin = LBCRegTest() - db_url = 'file:test_sqldb?mode=memory&cache=shared' - self.sql = writer.SQLDB(self, db_url, [], [], [zscore]) - self.addCleanup(self.sql.close) - self.sql.open() - reader.initializer( - logging.getLogger(__name__), db_url, 'regtest', - self.query_timeout, block_and_filter=( - self.sql.blocked_streams, self.sql.blocked_channels, - self.sql.filtered_streams, self.sql.filtered_channels - ) - ) - self.addCleanup(reader.cleanup) - self._current_height = 0 - self._txos = {} - - def _make_tx(self, output, txi=None): - tx = get_tx().add_outputs([output]) - if txi is not None: - tx.add_inputs([txi]) - self._txos[output.ref.hash] = output - return tx, tx.hash - - def _set_channel_key(self, channel, key): - private_key = ecdsa.SigningKey.from_string(key*32, curve=ecdsa.SECP256k1, hashfunc=hashlib.sha256) - channel.private_key = private_key - channel.claim.channel.public_key_bytes = private_key.get_verifying_key().to_der() - channel.script.generate() - - def get_channel(self, title, amount, name='@foo', key=b'a'): - claim = Claim() - claim.channel.title = title - channel = Output.pay_claim_name_pubkey_hash(amount, name, claim, b'abc') - self._set_channel_key(channel, key) - return self._make_tx(channel) - - def get_channel_update(self, channel, amount, key=b'a'): - self._set_channel_key(channel, key) - return self._make_tx( - Output.pay_update_claim_pubkey_hash( - amount, channel.claim_name, channel.claim_id, channel.claim, b'abc' - ), - Input.spend(channel) - ) - - def get_stream(self, title, amount, name='foo', channel=None, **kwargs): - claim = Claim() - claim.stream.update(title=title, **kwargs) - result = self._make_tx(Output.pay_claim_name_pubkey_hash(amount, name, claim, b'abc')) - if channel: - result[0].outputs[0].sign(channel) - result[0]._reset() - return result - - def get_stream_update(self, tx, amount, channel=None): - stream = Transaction(tx[0].raw).outputs[0] - result = self._make_tx( - Output.pay_update_claim_pubkey_hash( - amount, stream.claim_name, stream.claim_id, stream.claim, b'abc' - ), - Input.spend(stream) - ) - if channel: - result[0].outputs[0].sign(channel) - result[0]._reset() - return result - - def get_repost(self, claim_id, amount, channel): - claim = Claim() - claim.repost.reference.claim_id = claim_id - result = self._make_tx(Output.pay_claim_name_pubkey_hash(amount, 'repost', claim, b'abc')) - result[0].outputs[0].sign(channel) - result[0]._reset() - return result - - def get_abandon(self, tx): - claim = Transaction(tx[0].raw).outputs[0] - return self._make_tx( - Output.pay_pubkey_hash(claim.amount, b'abc'), - Input.spend(claim) - ) - - def get_support(self, tx, amount): - claim = Transaction(tx[0].raw).outputs[0] - return self._make_tx( - Output.pay_support_pubkey_hash( - amount, claim.claim_name, claim.claim_id, b'abc' - ) - ) - - def get_controlling(self): - for claim in self.sql.execute("select claim.* from claimtrie natural join claim"): - txo = self._txos[claim.txo_hash] - controlling = txo.claim.stream.title, claim.amount, claim.effective_amount, claim.activation_height - return controlling - - def get_active(self): - controlling = self.get_controlling() - active = [] - for claim in self.sql.execute( - f"select * from claim where activation_height <= {self._current_height}"): - txo = self._txos[claim.txo_hash] - if controlling and controlling[0] == txo.claim.stream.title: - continue - active.append((txo.claim.stream.title, claim.amount, claim.effective_amount, claim.activation_height)) - return active - - def get_accepted(self): - accepted = [] - for claim in self.sql.execute( - f"select * from claim where activation_height > {self._current_height}"): - txo = self._txos[claim.txo_hash] - accepted.append((txo.claim.stream.title, claim.amount, claim.effective_amount, claim.activation_height)) - return accepted - - def advance(self, height, txs): - self._current_height = height - self.sql.advance_txs(height, txs, {'timestamp': 1}, self.daemon_height, self.timer) - return [otx[0].outputs[0] for otx in txs] - - def state(self, controlling=None, active=None, accepted=None): - self.assertEqual(controlling, self.get_controlling()) - self.assertEqual(active or [], self.get_active()) - self.assertEqual(accepted or [], self.get_accepted()) - - -@unittest.skip("port canonical url tests to leveldb") # TODO: port canonical url tests to leveldb -class TestClaimtrie(TestSQLDB): - - def test_example_from_spec(self): - # https://spec.lbry.com/#claim-activation-example - advance, state = self.advance, self.state - stream = self.get_stream('Claim A', 10*COIN) - advance(13, [stream]) - state( - controlling=('Claim A', 10*COIN, 10*COIN, 13), - active=[], - accepted=[] - ) - advance(1001, [self.get_stream('Claim B', 20*COIN)]) - state( - controlling=('Claim A', 10*COIN, 10*COIN, 13), - active=[], - accepted=[('Claim B', 20*COIN, 0, 1031)] - ) - advance(1010, [self.get_support(stream, 14*COIN)]) - state( - controlling=('Claim A', 10*COIN, 24*COIN, 13), - active=[], - accepted=[('Claim B', 20*COIN, 0, 1031)] - ) - advance(1020, [self.get_stream('Claim C', 50*COIN)]) - state( - controlling=('Claim A', 10*COIN, 24*COIN, 13), - active=[], - accepted=[ - ('Claim B', 20*COIN, 0, 1031), - ('Claim C', 50*COIN, 0, 1051)] - ) - advance(1031, []) - state( - controlling=('Claim A', 10*COIN, 24*COIN, 13), - active=[('Claim B', 20*COIN, 20*COIN, 1031)], - accepted=[('Claim C', 50*COIN, 0, 1051)] - ) - advance(1040, [self.get_stream('Claim D', 300*COIN)]) - state( - controlling=('Claim A', 10*COIN, 24*COIN, 13), - active=[('Claim B', 20*COIN, 20*COIN, 1031)], - accepted=[ - ('Claim C', 50*COIN, 0, 1051), - ('Claim D', 300*COIN, 0, 1072)] - ) - advance(1051, []) - state( - controlling=('Claim D', 300*COIN, 300*COIN, 1051), - active=[ - ('Claim A', 10*COIN, 24*COIN, 13), - ('Claim B', 20*COIN, 20*COIN, 1031), - ('Claim C', 50*COIN, 50*COIN, 1051)], - accepted=[] - ) - # beyond example - advance(1052, [self.get_stream_update(stream, 290*COIN)]) - state( - controlling=('Claim A', 290*COIN, 304*COIN, 13), - active=[ - ('Claim B', 20*COIN, 20*COIN, 1031), - ('Claim C', 50*COIN, 50*COIN, 1051), - ('Claim D', 300*COIN, 300*COIN, 1051), - ], - accepted=[] - ) - - def test_competing_claims_subsequent_blocks_height_wins(self): - advance, state = self.advance, self.state - advance(13, [self.get_stream('Claim A', 10*COIN)]) - state( - controlling=('Claim A', 10*COIN, 10*COIN, 13), - active=[], - accepted=[] - ) - advance(14, [self.get_stream('Claim B', 10*COIN)]) - state( - controlling=('Claim A', 10*COIN, 10*COIN, 13), - active=[('Claim B', 10*COIN, 10*COIN, 14)], - accepted=[] - ) - advance(15, [self.get_stream('Claim C', 10*COIN)]) - state( - controlling=('Claim A', 10*COIN, 10*COIN, 13), - active=[ - ('Claim B', 10*COIN, 10*COIN, 14), - ('Claim C', 10*COIN, 10*COIN, 15)], - accepted=[] - ) - - def test_competing_claims_in_single_block_position_wins(self): - advance, state = self.advance, self.state - stream = self.get_stream('Claim A', 10*COIN) - stream2 = self.get_stream('Claim B', 10*COIN) - advance(13, [stream, stream2]) - state( - controlling=('Claim A', 10*COIN, 10*COIN, 13), - active=[('Claim B', 10*COIN, 10*COIN, 13)], - accepted=[] - ) - - def test_competing_claims_in_single_block_effective_amount_wins(self): - advance, state = self.advance, self.state - stream = self.get_stream('Claim A', 10*COIN) - stream2 = self.get_stream('Claim B', 11*COIN) - advance(13, [stream, stream2]) - state( - controlling=('Claim B', 11*COIN, 11*COIN, 13), - active=[('Claim A', 10*COIN, 10*COIN, 13)], - accepted=[] - ) - - def test_winning_claim_deleted(self): - advance, state = self.advance, self.state - stream = self.get_stream('Claim A', 10*COIN) - stream2 = self.get_stream('Claim B', 11*COIN) - advance(13, [stream, stream2]) - state( - controlling=('Claim B', 11*COIN, 11*COIN, 13), - active=[('Claim A', 10*COIN, 10*COIN, 13)], - accepted=[] - ) - advance(14, [self.get_abandon(stream2)]) - state( - controlling=('Claim A', 10*COIN, 10*COIN, 13), - active=[], - accepted=[] - ) - - def test_winning_claim_deleted_and_new_claim_becomes_winner(self): - advance, state = self.advance, self.state - stream = self.get_stream('Claim A', 10*COIN) - stream2 = self.get_stream('Claim B', 11*COIN) - advance(13, [stream, stream2]) - state( - controlling=('Claim B', 11*COIN, 11*COIN, 13), - active=[('Claim A', 10*COIN, 10*COIN, 13)], - accepted=[] - ) - advance(15, [self.get_abandon(stream2), self.get_stream('Claim C', 12*COIN)]) - state( - controlling=('Claim C', 12*COIN, 12*COIN, 15), - active=[('Claim A', 10*COIN, 10*COIN, 13)], - accepted=[] - ) - - def test_winning_claim_expires_and_another_takes_over(self): - advance, state = self.advance, self.state - advance(10, [self.get_stream('Claim A', 11*COIN)]) - advance(20, [self.get_stream('Claim B', 10*COIN)]) - state( - controlling=('Claim A', 11*COIN, 11*COIN, 10), - active=[('Claim B', 10*COIN, 10*COIN, 20)], - accepted=[] - ) - advance(262984, []) - state( - controlling=('Claim B', 10*COIN, 10*COIN, 20), - active=[], - accepted=[] - ) - advance(262994, []) - state( - controlling=None, - active=[], - accepted=[] - ) - - def test_create_and_update_in_same_block(self): - advance, state = self.advance, self.state - stream = self.get_stream('Claim A', 10*COIN) - advance(10, [stream, self.get_stream_update(stream, 11*COIN)]) - self.assertTrue(search()[0]) - - def test_double_updates_in_same_block(self): - advance, state = self.advance, self.state - stream = self.get_stream('Claim A', 10*COIN) - advance(10, [stream]) - update = self.get_stream_update(stream, 11*COIN) - advance(20, [update, self.get_stream_update(update, 9*COIN)]) - self.assertTrue(search()[0]) - - def test_create_and_abandon_in_same_block(self): - advance, state = self.advance, self.state - stream = self.get_stream('Claim A', 10*COIN) - advance(10, [stream, self.get_abandon(stream)]) - self.assertFalse(search()) - - def test_update_and_abandon_in_same_block(self): - advance, state = self.advance, self.state - stream = self.get_stream('Claim A', 10*COIN) - advance(10, [stream]) - update = self.get_stream_update(stream, 11*COIN) - advance(20, [update, self.get_abandon(update)]) - self.assertFalse(search()) - - def test_create_update_and_delete_in_same_block(self): - advance, state = self.advance, self.state - stream = self.get_stream('Claim A', 10*COIN) - update = self.get_stream_update(stream, 11*COIN) - advance(10, [stream, update, self.get_abandon(update)]) - self.assertFalse(search()) - - def test_support_added_and_removed_in_same_block(self): - advance, state = self.advance, self.state - stream = self.get_stream('Claim A', 10*COIN) - advance(10, [stream]) - support = self.get_support(stream, COIN) - advance(20, [support, self.get_abandon(support)]) - self.assertEqual(search()[0]['support_amount'], 0) - - @staticmethod - def _get_x_with_claim_id_prefix(getter, prefix, cached_iteration=None, **kwargs): - iterations = cached_iteration+1 if cached_iteration else 100 - for i in range(cached_iteration or 1, iterations): - stream = getter(f'claim #{i}', COIN, **kwargs) - if stream[0].outputs[0].claim_id.startswith(prefix): - cached_iteration is None and print(f'Found "{prefix}" in {i} iterations.') - return stream - if cached_iteration: - raise ValueError(f'Failed to find "{prefix}" at cached iteration, run with None to find iteration.') - raise ValueError(f'Failed to find "{prefix}" in {iterations} iterations, try different values.') - - def get_channel_with_claim_id_prefix(self, prefix, cached_iteration=None, **kwargs): - return self._get_x_with_claim_id_prefix(self.get_channel, prefix, cached_iteration, **kwargs) - - def get_stream_with_claim_id_prefix(self, prefix, cached_iteration=None, **kwargs): - return self._get_x_with_claim_id_prefix(self.get_stream, prefix, cached_iteration, **kwargs) - - def test_canonical_url_and_channel_validation(self): - advance = self.advance - - tx_chan_a = self.get_channel_with_claim_id_prefix('a', 1, key=b'c') - tx_chan_ab = self.get_channel_with_claim_id_prefix('ab', 72, key=b'c') - txo_chan_a = tx_chan_a[0].outputs[0] - txo_chan_ab = tx_chan_ab[0].outputs[0] - advance(1, [tx_chan_a]) - advance(2, [tx_chan_ab]) - (r_ab, r_a) = search(order_by=['creation_height'], limit=2) - self.assertEqual("@foo#a", r_a['short_url']) - self.assertEqual("@foo#ab", r_ab['short_url']) - self.assertIsNone(r_a['canonical_url']) - self.assertIsNone(r_ab['canonical_url']) - self.assertEqual(0, r_a['claims_in_channel']) - self.assertEqual(0, r_ab['claims_in_channel']) - - tx_a = self.get_stream_with_claim_id_prefix('a', 2) - tx_ab = self.get_stream_with_claim_id_prefix('ab', 42) - tx_abc = self.get_stream_with_claim_id_prefix('abc', 65) - advance(3, [tx_a]) - advance(4, [tx_ab, tx_abc]) - (r_abc, r_ab, r_a) = search(order_by=['creation_height', 'tx_position'], limit=3) - self.assertEqual("foo#a", r_a['short_url']) - self.assertEqual("foo#ab", r_ab['short_url']) - self.assertEqual("foo#abc", r_abc['short_url']) - self.assertIsNone(r_a['canonical_url']) - self.assertIsNone(r_ab['canonical_url']) - self.assertIsNone(r_abc['canonical_url']) - - tx_a2 = self.get_stream_with_claim_id_prefix('a', 7, channel=txo_chan_a) - tx_ab2 = self.get_stream_with_claim_id_prefix('ab', 23, channel=txo_chan_a) - a2_claim = tx_a2[0].outputs[0] - ab2_claim = tx_ab2[0].outputs[0] - advance(6, [tx_a2]) - advance(7, [tx_ab2]) - (r_ab2, r_a2) = search(order_by=['creation_height'], limit=2) - self.assertEqual(f"foo#{a2_claim.claim_id[:2]}", r_a2['short_url']) - self.assertEqual(f"foo#{ab2_claim.claim_id[:4]}", r_ab2['short_url']) - self.assertEqual("@foo#a/foo#a", r_a2['canonical_url']) - self.assertEqual("@foo#a/foo#ab", r_ab2['canonical_url']) - self.assertEqual(2, search(claim_id=txo_chan_a.claim_id, limit=1)[0]['claims_in_channel']) - - # change channel public key, invaliding stream claim signatures - advance(8, [self.get_channel_update(txo_chan_a, COIN, key=b'a')]) - (r_ab2, r_a2) = search(order_by=['creation_height'], limit=2) - self.assertEqual(f"foo#{a2_claim.claim_id[:2]}", r_a2['short_url']) - self.assertEqual(f"foo#{ab2_claim.claim_id[:4]}", r_ab2['short_url']) - self.assertIsNone(r_a2['canonical_url']) - self.assertIsNone(r_ab2['canonical_url']) - self.assertEqual(0, search(claim_id=txo_chan_a.claim_id, limit=1)[0]['claims_in_channel']) - - # reinstate previous channel public key (previous stream claim signatures become valid again) - channel_update = self.get_channel_update(txo_chan_a, COIN, key=b'c') - advance(9, [channel_update]) - (r_ab2, r_a2) = search(order_by=['creation_height'], limit=2) - self.assertEqual(f"foo#{a2_claim.claim_id[:2]}", r_a2['short_url']) - self.assertEqual(f"foo#{ab2_claim.claim_id[:4]}", r_ab2['short_url']) - self.assertEqual("@foo#a/foo#a", r_a2['canonical_url']) - self.assertEqual("@foo#a/foo#ab", r_ab2['canonical_url']) - self.assertEqual(2, search(claim_id=txo_chan_a.claim_id, limit=1)[0]['claims_in_channel']) - self.assertEqual(0, search(claim_id=txo_chan_ab.claim_id, limit=1)[0]['claims_in_channel']) - - # change channel of stream - self.assertEqual("@foo#a/foo#ab", search(claim_id=ab2_claim.claim_id, limit=1)[0]['canonical_url']) - tx_ab2 = self.get_stream_update(tx_ab2, COIN, txo_chan_ab) - advance(10, [tx_ab2]) - self.assertEqual("@foo#ab/foo#a", search(claim_id=ab2_claim.claim_id, limit=1)[0]['canonical_url']) - # TODO: currently there is a bug where stream leaving a channel does not update that channels claims count - self.assertEqual(2, search(claim_id=txo_chan_a.claim_id, limit=1)[0]['claims_in_channel']) - # TODO: after bug is fixed remove test above and add test below - #self.assertEqual(1, search(claim_id=txo_chan_a.claim_id, limit=1)[0]['claims_in_channel']) - self.assertEqual(1, search(claim_id=txo_chan_ab.claim_id, limit=1)[0]['claims_in_channel']) - - # claim abandon updates claims_in_channel - advance(11, [self.get_abandon(tx_ab2)]) - self.assertEqual(0, search(claim_id=txo_chan_ab.claim_id, limit=1)[0]['claims_in_channel']) - - # delete channel, invaliding stream claim signatures - advance(12, [self.get_abandon(channel_update)]) - (r_a2,) = search(order_by=['creation_height'], limit=1) - self.assertEqual(f"foo#{a2_claim.claim_id[:2]}", r_a2['short_url']) - self.assertIsNone(r_a2['canonical_url']) - - def test_resolve_issue_2448(self): - advance = self.advance - - tx_chan_a = self.get_channel_with_claim_id_prefix('a', 1, key=b'c') - tx_chan_ab = self.get_channel_with_claim_id_prefix('ab', 72, key=b'c') - txo_chan_a = tx_chan_a[0].outputs[0] - txo_chan_ab = tx_chan_ab[0].outputs[0] - advance(1, [tx_chan_a]) - advance(2, [tx_chan_ab]) - - self.assertEqual(reader.resolve_url("@foo#a")['claim_hash'], txo_chan_a.claim_hash) - self.assertEqual(reader.resolve_url("@foo#ab")['claim_hash'], txo_chan_ab.claim_hash) - - # update increase last height change of channel - advance(9, [self.get_channel_update(txo_chan_a, COIN, key=b'c')]) - - # make sure that activation_height is used instead of height (issue #2448) - self.assertEqual(reader.resolve_url("@foo#a")['claim_hash'], txo_chan_a.claim_hash) - self.assertEqual(reader.resolve_url("@foo#ab")['claim_hash'], txo_chan_ab.claim_hash) - - def test_canonical_find_shortest_id(self): - new_hash = 'abcdef0123456789beef' - other0 = '1bcdef0123456789beef' - other1 = 'ab1def0123456789beef' - other2 = 'abc1ef0123456789beef' - other3 = 'abcdef0123456789bee1' - f = FindShortestID() - f.step(other0, new_hash) - self.assertEqual('#a', f.finalize()) - f.step(other1, new_hash) - self.assertEqual('#abc', f.finalize()) - f.step(other2, new_hash) - self.assertEqual('#abcd', f.finalize()) - f.step(other3, new_hash) - self.assertEqual('#abcdef0123456789beef', f.finalize()) - - -@unittest.skip("port trending tests to ES") # TODO: port trending tests to ES -class TestTrending(TestSQLDB): - - def test_trending(self): - advance, state = self.advance, self.state - no_trend = self.get_stream('Claim A', COIN) - downwards = self.get_stream('Claim B', COIN) - up_small = self.get_stream('Claim C', COIN) - up_medium = self.get_stream('Claim D', COIN) - up_biggly = self.get_stream('Claim E', COIN) - claims = advance(1, [up_biggly, up_medium, up_small, no_trend, downwards]) - for window in range(1, 8): - advance(zscore.TRENDING_WINDOW * window, [ - self.get_support(downwards, (20-window)*COIN), - self.get_support(up_small, int(20+(window/10)*COIN)), - self.get_support(up_medium, (20+(window*(2 if window == 7 else 1)))*COIN), - self.get_support(up_biggly, (20+(window*(3 if window == 7 else 1)))*COIN), - ]) - results = search(order_by=['trending_local']) - self.assertEqual([c.claim_id for c in claims], [hexlify(c['claim_hash'][::-1]).decode() for c in results]) - self.assertEqual([10, 6, 2, 0, -2], [int(c['trending_local']) for c in results]) - self.assertEqual([53, 38, -32, 0, -6], [int(c['trending_global']) for c in results]) - self.assertEqual([4, 4, 2, 0, 1], [int(c['trending_group']) for c in results]) - self.assertEqual([53, 38, 2, 0, -6], [int(c['trending_mixed']) for c in results]) - - def test_edge(self): - problematic = self.get_stream('Problem', COIN) - self.advance(1, [problematic]) - self.advance(zscore.TRENDING_WINDOW, [self.get_support(problematic, 53000000000)]) - self.advance(zscore.TRENDING_WINDOW * 2, [self.get_support(problematic, 500000000)]) - - -@unittest.skip("filtering/blocking is applied during ES sync, this needs to be ported to integration test") -class TestContentBlocking(TestSQLDB): - - def test_blocking_and_filtering(self): - # content claims and channels - tx0 = self.get_channel('A Channel', COIN, '@channel1') - regular_channel = tx0[0].outputs[0] - tx1 = self.get_stream('Claim One', COIN, 'claim1') - tx2 = self.get_stream('Claim Two', COIN, 'claim2', regular_channel) - tx3 = self.get_stream('Claim Three', COIN, 'claim3') - self.advance(1, [tx0, tx1, tx2, tx3]) - claim1, claim2, claim3 = tx1[0].outputs[0], tx2[0].outputs[0], tx3[0].outputs[0] - - # block and filter channels - tx0 = self.get_channel('Blocking Channel', COIN, '@block') - tx1 = self.get_channel('Filtering Channel', COIN, '@filter') - blocking_channel = tx0[0].outputs[0] - filtering_channel = tx1[0].outputs[0] - self.sql.blocking_channel_hashes.add(blocking_channel.claim_hash) - self.sql.filtering_channel_hashes.add(filtering_channel.claim_hash) - self.advance(2, [tx0, tx1]) - self.assertEqual({}, dict(self.sql.blocked_streams)) - self.assertEqual({}, dict(self.sql.blocked_channels)) - self.assertEqual({}, dict(self.sql.filtered_streams)) - self.assertEqual({}, dict(self.sql.filtered_channels)) - - # nothing blocked - results, _ = reader.resolve([ - claim1.claim_name, claim2.claim_name, - claim3.claim_name, regular_channel.claim_name - ]) - self.assertEqual(claim1.claim_hash, results[0]['claim_hash']) - self.assertEqual(claim2.claim_hash, results[1]['claim_hash']) - self.assertEqual(claim3.claim_hash, results[2]['claim_hash']) - self.assertEqual(regular_channel.claim_hash, results[3]['claim_hash']) - - # nothing filtered - results, censor = censored_search() - self.assertEqual(6, len(results)) - self.assertEqual(0, censor.total) - self.assertEqual({}, censor.censored) - - # block claim reposted to blocking channel, also gets filtered - repost_tx1 = self.get_repost(claim1.claim_id, COIN, blocking_channel) - repost1 = repost_tx1[0].outputs[0] - self.advance(3, [repost_tx1]) - self.assertEqual( - {repost1.claim.repost.reference.claim_hash: blocking_channel.claim_hash}, - dict(self.sql.blocked_streams) - ) - self.assertEqual({}, dict(self.sql.blocked_channels)) - self.assertEqual( - {repost1.claim.repost.reference.claim_hash: blocking_channel.claim_hash}, - dict(self.sql.filtered_streams) - ) - self.assertEqual({}, dict(self.sql.filtered_channels)) - - # claim is blocked from results by direct repost - results, censor = censored_search(text='Claim') - self.assertEqual(2, len(results)) - self.assertEqual(claim2.claim_hash, results[0]['claim_hash']) - self.assertEqual(claim3.claim_hash, results[1]['claim_hash']) - self.assertEqual(1, censor.total) - self.assertEqual({blocking_channel.claim_hash: 1}, censor.censored) - results, _ = reader.resolve([claim1.claim_name]) - self.assertEqual( - f"Resolve of 'claim1' was censored by channel with claim id '{blocking_channel.claim_id}'.", - results[0].args[0] - ) - results, _ = reader.resolve([ - claim2.claim_name, regular_channel.claim_name # claim2 and channel still resolved - ]) - self.assertEqual(claim2.claim_hash, results[0]['claim_hash']) - self.assertEqual(regular_channel.claim_hash, results[1]['claim_hash']) - - # block claim indirectly by blocking its parent channel - repost_tx2 = self.get_repost(regular_channel.claim_id, COIN, blocking_channel) - repost2 = repost_tx2[0].outputs[0] - self.advance(4, [repost_tx2]) - self.assertEqual( - {repost1.claim.repost.reference.claim_hash: blocking_channel.claim_hash}, - dict(self.sql.blocked_streams) - ) - self.assertEqual( - {repost2.claim.repost.reference.claim_hash: blocking_channel.claim_hash}, - dict(self.sql.blocked_channels) - ) - self.assertEqual( - {repost1.claim.repost.reference.claim_hash: blocking_channel.claim_hash}, - dict(self.sql.filtered_streams) - ) - self.assertEqual( - {repost2.claim.repost.reference.claim_hash: blocking_channel.claim_hash}, - dict(self.sql.filtered_channels) - ) - - # claim in blocked channel is filtered from search and can't resolve - results, censor = censored_search(text='Claim') - self.assertEqual(1, len(results)) - self.assertEqual(claim3.claim_hash, results[0]['claim_hash']) - self.assertEqual(2, censor.total) - self.assertEqual({blocking_channel.claim_hash: 2}, censor.censored) - results, _ = reader.resolve([ - claim2.claim_name, regular_channel.claim_name # claim2 and channel don't resolve - ]) - self.assertEqual( - f"Resolve of 'claim2' was censored by channel with claim id '{blocking_channel.claim_id}'.", - results[0].args[0] - ) - self.assertEqual( - f"Resolve of '@channel1' was censored by channel with claim id '{blocking_channel.claim_id}'.", - results[1].args[0] - ) - results, _ = reader.resolve([claim3.claim_name]) # claim3 still resolved - self.assertEqual(claim3.claim_hash, results[0]['claim_hash']) - - # filtered claim is only filtered and not blocked - repost_tx3 = self.get_repost(claim3.claim_id, COIN, filtering_channel) - repost3 = repost_tx3[0].outputs[0] - self.advance(5, [repost_tx3]) - self.assertEqual( - {repost1.claim.repost.reference.claim_hash: blocking_channel.claim_hash}, - dict(self.sql.blocked_streams) - ) - self.assertEqual( - {repost2.claim.repost.reference.claim_hash: blocking_channel.claim_hash}, - dict(self.sql.blocked_channels) - ) - self.assertEqual( - {repost1.claim.repost.reference.claim_hash: blocking_channel.claim_hash, - repost3.claim.repost.reference.claim_hash: filtering_channel.claim_hash}, - dict(self.sql.filtered_streams) - ) - self.assertEqual( - {repost2.claim.repost.reference.claim_hash: blocking_channel.claim_hash}, - dict(self.sql.filtered_channels) - ) - - # filtered claim doesn't return in search but is resolveable - results, censor = censored_search(text='Claim') - self.assertEqual(0, len(results)) - self.assertEqual(3, censor.total) - self.assertEqual({blocking_channel.claim_hash: 2, filtering_channel.claim_hash: 1}, censor.censored) - results, _ = reader.resolve([claim3.claim_name]) # claim3 still resolved - self.assertEqual(claim3.claim_hash, results[0]['claim_hash']) - - # abandon unblocks content - self.advance(6, [ - self.get_abandon(repost_tx1), - self.get_abandon(repost_tx2), - self.get_abandon(repost_tx3) - ]) - self.assertEqual({}, dict(self.sql.blocked_streams)) - self.assertEqual({}, dict(self.sql.blocked_channels)) - self.assertEqual({}, dict(self.sql.filtered_streams)) - self.assertEqual({}, dict(self.sql.filtered_channels)) - results, censor = censored_search(text='Claim') - self.assertEqual(3, len(results)) - self.assertEqual(0, censor.total) - results, censor = censored_search() - self.assertEqual(6, len(results)) - self.assertEqual(0, censor.total) - results, _ = reader.resolve([ - claim1.claim_name, claim2.claim_name, - claim3.claim_name, regular_channel.claim_name - ]) - self.assertEqual(claim1.claim_hash, results[0]['claim_hash']) - self.assertEqual(claim2.claim_hash, results[1]['claim_hash']) - self.assertEqual(claim3.claim_hash, results[2]['claim_hash']) - self.assertEqual(regular_channel.claim_hash, results[3]['claim_hash']) - - def test_pagination(self): - one, two, three, four, five, six, seven, filter_channel = self.advance(1, [ - self.get_stream('One', COIN), - self.get_stream('Two', COIN), - self.get_stream('Three', COIN), - self.get_stream('Four', COIN), - self.get_stream('Five', COIN), - self.get_stream('Six', COIN), - self.get_stream('Seven', COIN), - self.get_channel('Filtering Channel', COIN, '@filter'), - ]) - self.sql.filtering_channel_hashes.add(filter_channel.claim_hash) - - # nothing filtered - results, censor = censored_search(order_by='^height', offset=1, limit=3) - self.assertEqual(3, len(results)) - self.assertEqual( - [two.claim_hash, three.claim_hash, four.claim_hash], - [r['claim_hash'] for r in results] - ) - self.assertEqual(0, censor.total) - - # content filtered - repost1, repost2 = self.advance(2, [ - self.get_repost(one.claim_id, COIN, filter_channel), - self.get_repost(two.claim_id, COIN, filter_channel), - ]) - results, censor = censored_search(order_by='^height', offset=1, limit=3) - self.assertEqual(3, len(results)) - self.assertEqual( - [four.claim_hash, five.claim_hash, six.claim_hash], - [r['claim_hash'] for r in results] - ) - self.assertEqual(2, censor.total) - self.assertEqual({filter_channel.claim_hash: 2}, censor.censored)