delete unused code

This commit is contained in:
Jack Robison 2021-09-09 14:18:08 -04:00 committed by Victor Shyba
parent 4e687c4fd8
commit 12f790ab01
6 changed files with 1 additions and 2658 deletions

View file

@ -1,22 +0,0 @@
class FindShortestID:
__slots__ = 'short_id', 'new_id'
def __init__(self):
self.short_id = ''
self.new_id = None
def step(self, other_id, new_id):
self.new_id = new_id
for i in range(len(self.new_id)):
if other_id[i] != self.new_id[i]:
if i > len(self.short_id)-1:
self.short_id = self.new_id[:i+1]
break
def finalize(self):
if self.short_id:
return '#'+self.short_id
def register_canonical_functions(connection):
connection.create_aggregate("shortest_id", 2, FindShortestID)

View file

@ -66,7 +66,7 @@ RANGE_FIELDS = {
'timestamp', 'creation_timestamp', 'duration', 'release_time', 'fee_amount',
'tx_position', 'channel_join', 'repost_count', 'limit_claims_per_channel',
'amount', 'effective_amount', 'support_amount',
'trending_score', 'censor_type', 'tx_num', 'trending_score_change'
'trending_score', 'censor_type', 'tx_num'
}
ALL_FIELDS = RANGE_FIELDS | TEXT_FIELDS | FIELDS

View file

@ -1,299 +0,0 @@
import math
import os
import sqlite3
import time
HALF_LIFE = 400
RENORM_INTERVAL = 1000
WHALE_THRESHOLD = 10000.0
def whale_decay_factor(lbc):
"""
An additional decay factor applied to whale claims.
"""
if lbc <= WHALE_THRESHOLD:
return 1.0
adjusted_half_life = HALF_LIFE/(math.log10(lbc/WHALE_THRESHOLD) + 1.0)
return 2.0**(1.0/HALF_LIFE - 1.0/adjusted_half_life)
def soften(lbc):
mag = abs(lbc) + 1E-8
sign = 1.0 if lbc >= 0.0 else -1.0
return sign*mag**0.25
def delay(lbc: int):
if lbc <= 0:
return 0
elif lbc < 1000000:
return int(lbc**0.5)
else:
return 1000
def inflate_units(height):
blocks = height % RENORM_INTERVAL
return 2.0 ** (blocks/HALF_LIFE)
PRAGMAS = ["PRAGMA FOREIGN_KEYS = OFF;",
"PRAGMA JOURNAL_MODE = WAL;",
"PRAGMA SYNCHRONOUS = 0;"]
class TrendingDB:
def __init__(self, data_dir):
"""
Opens the trending database in the directory data_dir.
For testing, pass data_dir=":memory:"
"""
if data_dir == ":memory:":
path = ":memory:"
else:
path = os.path.join(data_dir, "trending.db")
self.db = sqlite3.connect(path, check_same_thread=False)
for pragma in PRAGMAS:
self.execute(pragma)
self.execute("BEGIN;")
self._create_tables()
self._create_indices()
self.execute("COMMIT;")
self.pending_events = []
def execute(self, *args, **kwargs):
return self.db.execute(*args, **kwargs)
def add_event(self, event):
self.pending_events.append(event)
# print(f"Added event: {event}.", flush=True)
def _create_tables(self):
self.execute("""CREATE TABLE IF NOT EXISTS claims
(claim_hash BYTES NOT NULL PRIMARY KEY,
bid_lbc REAL NOT NULL,
support_lbc REAL NOT NULL,
trending_score REAL NOT NULL,
needs_write BOOLEAN NOT NULL)
WITHOUT ROWID;""")
self.execute("""CREATE TABLE IF NOT EXISTS spikes
(claim_hash BYTES NOT NULL REFERENCES claims (claim_hash),
activation_height INTEGER NOT NULL,
mass REAL NOT NULL);""")
def _create_indices(self):
self.execute("CREATE INDEX IF NOT EXISTS idx1 ON spikes\
(activation_height, claim_hash, mass);")
self.execute("CREATE INDEX IF NOT EXISTS idx2 ON spikes\
(claim_hash);")
self.execute("CREATE INDEX IF NOT EXISTS idx3 ON claims (trending_score);")
self.execute("CREATE INDEX IF NOT EXISTS idx4 ON claims (needs_write, claim_hash);")
self.execute("CREATE INDEX IF NOT EXISTS idx5 ON claims (bid_lbc + support_lbc);")
def get_trending_score(self, claim_hash):
result = self.execute("SELECT trending_score FROM claims\
WHERE claim_hash = ?;", (claim_hash, ))\
.fetchall()
if len(result) == 0:
return 0.0
else:
return result[0]
def _upsert_claim(self, height, event):
claim_hash = event["claim_hash"]
# Get old total lbc value of claim
old_lbc_pair = self.execute("SELECT bid_lbc, support_lbc FROM claims\
WHERE claim_hash = ?;",
(claim_hash, )).fetchone()
if old_lbc_pair is None:
old_lbc_pair = (0.0, 0.0)
if event["event"] == "upsert":
new_lbc_pair = (event["lbc"], old_lbc_pair[1])
elif event["event"] == "support":
new_lbc_pair = (old_lbc_pair[0], old_lbc_pair[1] + event["lbc"])
# Upsert the claim
self.execute("INSERT INTO claims VALUES (?, ?, ?, ?, 1)\
ON CONFLICT (claim_hash) DO UPDATE\
SET bid_lbc = excluded.bid_lbc,\
support_lbc = excluded.support_lbc;",
(claim_hash, new_lbc_pair[0], new_lbc_pair[1], 0.0))
if self.active:
old_lbc, lbc = sum(old_lbc_pair), sum(new_lbc_pair)
# Add the spike
softened_change = soften(lbc - old_lbc)
change_in_softened = soften(lbc) - soften(old_lbc)
spike_mass = (softened_change**0.25*change_in_softened**0.75).real
activation_height = height + delay(lbc)
if spike_mass != 0.0:
self.execute("INSERT INTO spikes VALUES (?, ?, ?);",
(claim_hash, activation_height, spike_mass))
def _delete_claim(self, claim_hash):
self.execute("DELETE FROM spikes WHERE claim_hash = ?;", (claim_hash, ))
self.execute("DELETE FROM claims WHERE claim_hash = ?;", (claim_hash, ))
def _apply_spikes(self, height):
spikes = self.execute("SELECT claim_hash, mass FROM spikes\
WHERE activation_height = ?;",
(height, )).fetchall()
for claim_hash, mass in spikes: # TODO: executemany for efficiency
self.execute("UPDATE claims SET trending_score = trending_score + ?,\
needs_write = 1\
WHERE claim_hash = ?;",
(mass, claim_hash))
self.execute("DELETE FROM spikes WHERE activation_height = ?;",
(height, ))
def _decay_whales(self):
whales = self.execute("SELECT claim_hash, bid_lbc + support_lbc FROM claims\
WHERE bid_lbc + support_lbc >= ?;", (WHALE_THRESHOLD, ))\
.fetchall()
for claim_hash, lbc in whales:
factor = whale_decay_factor(lbc)
self.execute("UPDATE claims SET trending_score = trending_score*?, needs_write = 1\
WHERE claim_hash = ?;", (factor, claim_hash))
def _renorm(self):
factor = 2.0**(-RENORM_INTERVAL/HALF_LIFE)
# Zero small values
self.execute("UPDATE claims SET trending_score = 0.0, needs_write = 1\
WHERE trending_score <> 0.0 AND ABS(?*trending_score) < 1E-6;",
(factor, ))
# Normalise other values
self.execute("UPDATE claims SET trending_score = ?*trending_score, needs_write = 1\
WHERE trending_score <> 0.0;", (factor, ))
def process_block(self, height, daemon_height):
self.active = daemon_height - height <= 10*HALF_LIFE
self.execute("BEGIN;")
if self.active:
# Check for a unit change
if height % RENORM_INTERVAL == 0:
self._renorm()
# Apply extra whale decay
self._decay_whales()
# Upsert claims
for event in self.pending_events:
if event["event"] == "upsert":
self._upsert_claim(height, event)
# Process supports
for event in self.pending_events:
if event["event"] == "support":
self._upsert_claim(height, event)
# Delete claims
for event in self.pending_events:
if event["event"] == "delete":
self._delete_claim(event["claim_hash"])
if self.active:
# Apply spikes
self._apply_spikes(height)
# Get set of claims that need writing to ES
claims_to_write = set()
for row in self.db.execute("SELECT claim_hash FROM claims WHERE\
needs_write = 1;"):
claims_to_write.add(row[0])
self.db.execute("UPDATE claims SET needs_write = 0\
WHERE needs_write = 1;")
self.execute("COMMIT;")
self.pending_events.clear()
return claims_to_write
if __name__ == "__main__":
import matplotlib.pyplot as plt
import numpy as np
import numpy.random as rng
import os
trending_db = TrendingDB(":memory:")
heights = list(range(1, 1000))
heights = heights + heights[::-1] + heights
events = [{"height": 45,
"what": dict(claim_hash="a", event="upsert", lbc=1.0)},
{"height": 100,
"what": dict(claim_hash="a", event="support", lbc=3.0)},
{"height": 150,
"what": dict(claim_hash="a", event="support", lbc=-3.0)},
{"height": 170,
"what": dict(claim_hash="a", event="upsert", lbc=100000.0)},
{"height": 730,
"what": dict(claim_hash="a", event="delete")}]
inverse_events = [{"height": 730,
"what": dict(claim_hash="a", event="upsert", lbc=100000.0)},
{"height": 170,
"what": dict(claim_hash="a", event="upsert", lbc=1.0)},
{"height": 150,
"what": dict(claim_hash="a", event="support", lbc=3.0)},
{"height": 100,
"what": dict(claim_hash="a", event="support", lbc=-3.0)},
{"height": 45,
"what": dict(claim_hash="a", event="delete")}]
xs, ys = [], []
last_height = 0
for height in heights:
# Prepare the changes
if height > last_height:
es = events
else:
es = inverse_events
for event in es:
if event["height"] == height:
trending_db.add_event(event["what"])
# Process the block
trending_db.process_block(height, height)
if height > last_height: # Only plot when moving forward
xs.append(height)
y = trending_db.execute("SELECT trending_score FROM claims;").fetchone()
y = 0.0 if y is None else y[0]
ys.append(y/inflate_units(height))
last_height = height
xs = np.array(xs)
ys = np.array(ys)
plt.figure(1)
plt.plot(xs, ys, "o-", alpha=0.2)
plt.figure(2)
plt.plot(xs)
plt.show()

View file

@ -1,955 +0,0 @@
import os
import sqlite3
from typing import Union, Tuple, Set, List
from itertools import chain
from decimal import Decimal
from collections import namedtuple
from binascii import unhexlify, hexlify
from lbry.wallet.server.leveldb import LevelDB
from lbry.wallet.server.util import class_logger
from lbry.wallet.database import query, constraints_to_sql
from lbry.schema.tags import clean_tags
from lbry.schema.mime_types import guess_stream_type
from lbry.wallet import Ledger, RegTestLedger
from lbry.wallet.transaction import Transaction, Output
from lbry.wallet.server.db.canonical import register_canonical_functions
from lbry.wallet.server.db.trending import TRENDING_ALGORITHMS
from .common import CLAIM_TYPES, STREAM_TYPES, COMMON_TAGS, INDEXED_LANGUAGES
ATTRIBUTE_ARRAY_MAX_LENGTH = 100
sqlite3.enable_callback_tracebacks(True)
class SQLDB:
PRAGMAS = """
pragma journal_mode=WAL;
"""
CREATE_CLAIM_TABLE = """
create table if not exists claim (
claim_hash bytes primary key,
claim_id text not null,
claim_name text not null,
normalized text not null,
txo_hash bytes not null,
tx_position integer not null,
amount integer not null,
timestamp integer not null, -- last updated timestamp
creation_timestamp integer not null,
height integer not null, -- last updated height
creation_height integer not null,
activation_height integer,
expiration_height integer not null,
release_time integer not null,
short_url text not null, -- normalized#shortest-unique-claim_id
canonical_url text, -- channel's-short_url/normalized#shortest-unique-claim_id-within-channel
title text,
author text,
description text,
claim_type integer,
has_source bool,
reposted integer default 0,
-- streams
stream_type text,
media_type text,
fee_amount integer default 0,
fee_currency text,
duration integer,
-- reposts
reposted_claim_hash bytes,
-- claims which are channels
public_key_bytes bytes,
public_key_hash bytes,
claims_in_channel integer,
-- claims which are inside channels
channel_hash bytes,
channel_join integer, -- height at which claim got valid signature / joined channel
signature bytes,
signature_digest bytes,
signature_valid bool,
effective_amount integer not null default 0,
support_amount integer not null default 0,
trending_group integer not null default 0,
trending_mixed integer not null default 0,
trending_local integer not null default 0,
trending_global integer not null default 0
);
create index if not exists claim_normalized_idx on claim (normalized, activation_height);
create index if not exists claim_channel_hash_idx on claim (channel_hash, signature, claim_hash);
create index if not exists claim_claims_in_channel_idx on claim (signature_valid, channel_hash, normalized);
create index if not exists claim_txo_hash_idx on claim (txo_hash);
create index if not exists claim_activation_height_idx on claim (activation_height, claim_hash);
create index if not exists claim_expiration_height_idx on claim (expiration_height);
create index if not exists claim_reposted_claim_hash_idx on claim (reposted_claim_hash);
"""
CREATE_SUPPORT_TABLE = """
create table if not exists support (
txo_hash bytes primary key,
tx_position integer not null,
height integer not null,
claim_hash bytes not null,
amount integer not null
);
create index if not exists support_claim_hash_idx on support (claim_hash, height);
"""
CREATE_TAG_TABLE = """
create table if not exists tag (
tag text not null,
claim_hash bytes not null,
height integer not null
);
create unique index if not exists tag_claim_hash_tag_idx on tag (claim_hash, tag);
"""
CREATE_LANGUAGE_TABLE = """
create table if not exists language (
language text not null,
claim_hash bytes not null,
height integer not null
);
create unique index if not exists language_claim_hash_language_idx on language (claim_hash, language);
"""
CREATE_CLAIMTRIE_TABLE = """
create table if not exists claimtrie (
normalized text primary key,
claim_hash bytes not null,
last_take_over_height integer not null
);
create index if not exists claimtrie_claim_hash_idx on claimtrie (claim_hash);
"""
CREATE_CHANGELOG_TRIGGER = """
create table if not exists changelog (
claim_hash bytes primary key
);
create index if not exists claimtrie_claim_hash_idx on claimtrie (claim_hash);
create trigger if not exists claim_changelog after update on claim
begin
insert or ignore into changelog (claim_hash) values (new.claim_hash);
end;
create trigger if not exists claimtrie_changelog after update on claimtrie
begin
insert or ignore into changelog (claim_hash) values (new.claim_hash);
insert or ignore into changelog (claim_hash) values (old.claim_hash);
end;
"""
SEARCH_INDEXES = """
-- used by any tag clouds
create index if not exists tag_tag_idx on tag (tag, claim_hash);
-- naked order bys (no filters)
create unique index if not exists claim_release_idx on claim (release_time, claim_hash);
create unique index if not exists claim_trending_idx on claim (trending_group, trending_mixed, claim_hash);
create unique index if not exists claim_effective_amount_idx on claim (effective_amount, claim_hash);
-- claim_type filter + order by
create unique index if not exists claim_type_release_idx on claim (release_time, claim_type, claim_hash);
create unique index if not exists claim_type_trending_idx on claim (trending_group, trending_mixed, claim_type, claim_hash);
create unique index if not exists claim_type_effective_amount_idx on claim (effective_amount, claim_type, claim_hash);
-- stream_type filter + order by
create unique index if not exists stream_type_release_idx on claim (stream_type, release_time, claim_hash);
create unique index if not exists stream_type_trending_idx on claim (stream_type, trending_group, trending_mixed, claim_hash);
create unique index if not exists stream_type_effective_amount_idx on claim (stream_type, effective_amount, claim_hash);
-- channel_hash filter + order by
create unique index if not exists channel_hash_release_idx on claim (channel_hash, release_time, claim_hash);
create unique index if not exists channel_hash_trending_idx on claim (channel_hash, trending_group, trending_mixed, claim_hash);
create unique index if not exists channel_hash_effective_amount_idx on claim (channel_hash, effective_amount, claim_hash);
-- duration filter + order by
create unique index if not exists duration_release_idx on claim (duration, release_time, claim_hash);
create unique index if not exists duration_trending_idx on claim (duration, trending_group, trending_mixed, claim_hash);
create unique index if not exists duration_effective_amount_idx on claim (duration, effective_amount, claim_hash);
-- fee_amount + order by
create unique index if not exists fee_amount_release_idx on claim (fee_amount, release_time, claim_hash);
create unique index if not exists fee_amount_trending_idx on claim (fee_amount, trending_group, trending_mixed, claim_hash);
create unique index if not exists fee_amount_effective_amount_idx on claim (fee_amount, effective_amount, claim_hash);
-- TODO: verify that all indexes below are used
create index if not exists claim_height_normalized_idx on claim (height, normalized asc);
create index if not exists claim_resolve_idx on claim (normalized, claim_id);
create index if not exists claim_id_idx on claim (claim_id, claim_hash);
create index if not exists claim_timestamp_idx on claim (timestamp);
create index if not exists claim_public_key_hash_idx on claim (public_key_hash);
create index if not exists claim_signature_valid_idx on claim (signature_valid);
"""
TAG_INDEXES = '\n'.join(
f"create unique index if not exists tag_{tag_key}_idx on tag (tag, claim_hash) WHERE tag='{tag_value}';"
for tag_value, tag_key in COMMON_TAGS.items()
)
LANGUAGE_INDEXES = '\n'.join(
f"create unique index if not exists language_{language}_idx on language (language, claim_hash) WHERE language='{language}';"
for language in INDEXED_LANGUAGES
)
CREATE_TABLES_QUERY = (
CREATE_CLAIM_TABLE +
CREATE_SUPPORT_TABLE +
CREATE_CLAIMTRIE_TABLE +
CREATE_TAG_TABLE +
CREATE_CHANGELOG_TRIGGER +
CREATE_LANGUAGE_TABLE
)
def __init__(
self, main, path: str, blocking_channels: list, filtering_channels: list, trending: list):
self.main = main
self._db_path = path
self.db = None
self.logger = class_logger(__name__, self.__class__.__name__)
self.ledger = Ledger if main.coin.NET == 'mainnet' else RegTestLedger
self.blocked_streams = None
self.blocked_channels = None
self.blocking_channel_hashes = {
unhexlify(channel_id)[::-1] for channel_id in blocking_channels if channel_id
}
self.filtered_streams = None
self.filtered_channels = None
self.filtering_channel_hashes = {
unhexlify(channel_id)[::-1] for channel_id in filtering_channels if channel_id
}
self.trending = trending
self.pending_deletes = set()
def open(self):
self.db = sqlite3.connect(self._db_path, isolation_level=None, check_same_thread=False, uri=True)
def namedtuple_factory(cursor, row):
Row = namedtuple('Row', (d[0] for d in cursor.description))
return Row(*row)
self.db.row_factory = namedtuple_factory
self.db.executescript(self.PRAGMAS)
self.db.executescript(self.CREATE_TABLES_QUERY)
register_canonical_functions(self.db)
self.blocked_streams = {}
self.blocked_channels = {}
self.filtered_streams = {}
self.filtered_channels = {}
self.update_blocked_and_filtered_claims()
for algorithm in self.trending:
algorithm.install(self.db)
def close(self):
if self.db is not None:
self.db.close()
def update_blocked_and_filtered_claims(self):
self.update_claims_from_channel_hashes(
self.blocked_streams, self.blocked_channels, self.blocking_channel_hashes
)
self.update_claims_from_channel_hashes(
self.filtered_streams, self.filtered_channels, self.filtering_channel_hashes
)
self.filtered_streams.update(self.blocked_streams)
self.filtered_channels.update(self.blocked_channels)
def update_claims_from_channel_hashes(self, shared_streams, shared_channels, channel_hashes):
streams, channels = {}, {}
if channel_hashes:
sql = query(
"SELECT repost.channel_hash, repost.reposted_claim_hash, target.claim_type "
"FROM claim as repost JOIN claim AS target ON (target.claim_hash=repost.reposted_claim_hash)", **{
'repost.reposted_claim_hash__is_not_null': 1,
'repost.channel_hash__in': channel_hashes
}
)
for blocked_claim in self.execute(*sql):
if blocked_claim.claim_type == CLAIM_TYPES['stream']:
streams[blocked_claim.reposted_claim_hash] = blocked_claim.channel_hash
elif blocked_claim.claim_type == CLAIM_TYPES['channel']:
channels[blocked_claim.reposted_claim_hash] = blocked_claim.channel_hash
shared_streams.clear()
shared_streams.update(streams)
shared_channels.clear()
shared_channels.update(channels)
@staticmethod
def _insert_sql(table: str, data: dict) -> Tuple[str, list]:
columns, values = [], []
for column, value in data.items():
columns.append(column)
values.append(value)
sql = (
f"INSERT INTO {table} ({', '.join(columns)}) "
f"VALUES ({', '.join(['?'] * len(values))})"
)
return sql, values
@staticmethod
def _update_sql(table: str, data: dict, where: str,
constraints: Union[list, tuple]) -> Tuple[str, list]:
columns, values = [], []
for column, value in data.items():
columns.append(f"{column} = ?")
values.append(value)
values.extend(constraints)
return f"UPDATE {table} SET {', '.join(columns)} WHERE {where}", values
@staticmethod
def _delete_sql(table: str, constraints: dict) -> Tuple[str, dict]:
where, values = constraints_to_sql(constraints)
return f"DELETE FROM {table} WHERE {where}", values
def execute(self, *args):
return self.db.execute(*args)
def executemany(self, *args):
return self.db.executemany(*args)
def begin(self):
self.execute('begin;')
def commit(self):
self.execute('commit;')
def _upsertable_claims(self, txos: List[Output], header, clear_first=False):
claim_hashes, claims, tags, languages = set(), [], {}, {}
for txo in txos:
tx = txo.tx_ref.tx
try:
assert txo.claim_name
assert txo.normalized_name
except:
#self.logger.exception(f"Could not decode claim name for {tx.id}:{txo.position}.")
continue
language = 'none'
try:
if txo.claim.is_stream and txo.claim.stream.languages:
language = txo.claim.stream.languages[0].language
except:
pass
claim_hash = txo.claim_hash
claim_hashes.add(claim_hash)
claim_record = {
'claim_hash': claim_hash,
'claim_id': txo.claim_id,
'claim_name': txo.claim_name,
'normalized': txo.normalized_name,
'txo_hash': txo.ref.hash,
'tx_position': tx.position,
'amount': txo.amount,
'timestamp': header['timestamp'],
'height': tx.height,
'title': None,
'description': None,
'author': None,
'duration': None,
'claim_type': None,
'has_source': False,
'stream_type': None,
'media_type': None,
'release_time': None,
'fee_currency': None,
'fee_amount': 0,
'reposted_claim_hash': None
}
claims.append(claim_record)
try:
claim = txo.claim
except:
#self.logger.exception(f"Could not parse claim protobuf for {tx.id}:{txo.position}.")
continue
if claim.is_stream:
claim_record['claim_type'] = CLAIM_TYPES['stream']
claim_record['has_source'] = claim.stream.has_source
claim_record['media_type'] = claim.stream.source.media_type
claim_record['stream_type'] = STREAM_TYPES[guess_stream_type(claim_record['media_type'])]
claim_record['title'] = claim.stream.title
claim_record['description'] = claim.stream.description
claim_record['author'] = claim.stream.author
if claim.stream.video and claim.stream.video.duration:
claim_record['duration'] = claim.stream.video.duration
if claim.stream.audio and claim.stream.audio.duration:
claim_record['duration'] = claim.stream.audio.duration
if claim.stream.release_time:
claim_record['release_time'] = claim.stream.release_time
if claim.stream.has_fee:
fee = claim.stream.fee
if isinstance(fee.currency, str):
claim_record['fee_currency'] = fee.currency.lower()
if isinstance(fee.amount, Decimal):
if fee.amount >= 0 and int(fee.amount*1000) < 9223372036854775807:
claim_record['fee_amount'] = int(fee.amount*1000)
elif claim.is_repost:
claim_record['claim_type'] = CLAIM_TYPES['repost']
claim_record['reposted_claim_hash'] = claim.repost.reference.claim_hash
elif claim.is_channel:
claim_record['claim_type'] = CLAIM_TYPES['channel']
elif claim.is_collection:
claim_record['claim_type'] = CLAIM_TYPES['collection']
languages[(language, claim_hash)] = (language, claim_hash, tx.height)
for tag in clean_tags(claim.message.tags):
tags[(tag, claim_hash)] = (tag, claim_hash, tx.height)
if clear_first:
self._clear_claim_metadata(claim_hashes)
if tags:
self.executemany(
"INSERT OR IGNORE INTO tag (tag, claim_hash, height) VALUES (?, ?, ?)", tags.values()
)
if languages:
self.executemany(
"INSERT OR IGNORE INTO language (language, claim_hash, height) VALUES (?, ?, ?)", languages.values()
)
return claims
def insert_claims(self, txos: List[Output], header):
claims = self._upsertable_claims(txos, header)
if claims:
self.executemany("""
INSERT OR REPLACE INTO claim (
claim_hash, claim_id, claim_name, normalized, txo_hash, tx_position, amount,
claim_type, media_type, stream_type, timestamp, creation_timestamp, has_source,
fee_currency, fee_amount, title, description, author, duration, height, reposted_claim_hash,
creation_height, release_time, activation_height, expiration_height, short_url)
VALUES (
:claim_hash, :claim_id, :claim_name, :normalized, :txo_hash, :tx_position, :amount,
:claim_type, :media_type, :stream_type, :timestamp, :timestamp, :has_source,
:fee_currency, :fee_amount, :title, :description, :author, :duration, :height, :reposted_claim_hash, :height,
CASE WHEN :release_time IS NOT NULL THEN :release_time ELSE :timestamp END,
CASE WHEN :normalized NOT IN (SELECT normalized FROM claimtrie) THEN :height END,
CASE WHEN :height >= 137181 THEN :height+2102400 ELSE :height+262974 END,
:claim_name||COALESCE(
(SELECT shortest_id(claim_id, :claim_id) FROM claim WHERE normalized = :normalized),
'#'||substr(:claim_id, 1, 1)
)
)""", claims)
def update_claims(self, txos: List[Output], header):
claims = self._upsertable_claims(txos, header, clear_first=True)
if claims:
self.executemany("""
UPDATE claim SET
txo_hash=:txo_hash, tx_position=:tx_position, amount=:amount, height=:height,
claim_type=:claim_type, media_type=:media_type, stream_type=:stream_type,
timestamp=:timestamp, fee_amount=:fee_amount, fee_currency=:fee_currency, has_source=:has_source,
title=:title, duration=:duration, description=:description, author=:author, reposted_claim_hash=:reposted_claim_hash,
release_time=CASE WHEN :release_time IS NOT NULL THEN :release_time ELSE release_time END
WHERE claim_hash=:claim_hash;
""", claims)
def delete_claims(self, claim_hashes: Set[bytes]):
""" Deletes claim supports and from claimtrie in case of an abandon. """
if claim_hashes:
affected_channels = self.execute(*query(
"SELECT channel_hash FROM claim", channel_hash__is_not_null=1, claim_hash__in=claim_hashes
)).fetchall()
for table in ('claim', 'support', 'claimtrie'):
self.execute(*self._delete_sql(table, {'claim_hash__in': claim_hashes}))
self._clear_claim_metadata(claim_hashes)
return {r.channel_hash for r in affected_channels}
return set()
def delete_claims_above_height(self, height: int):
claim_hashes = [x[0] for x in self.execute(
"SELECT claim_hash FROM claim WHERE height>?", (height, )
).fetchall()]
while claim_hashes:
batch = set(claim_hashes[:500])
claim_hashes = claim_hashes[500:]
self.delete_claims(batch)
def _clear_claim_metadata(self, claim_hashes: Set[bytes]):
if claim_hashes:
for table in ('tag',): # 'language', 'location', etc
self.execute(*self._delete_sql(table, {'claim_hash__in': claim_hashes}))
def split_inputs_into_claims_supports_and_other(self, txis):
txo_hashes = {txi.txo_ref.hash for txi in txis}
claims = self.execute(*query(
"SELECT txo_hash, claim_hash, normalized FROM claim", txo_hash__in=txo_hashes
)).fetchall()
txo_hashes -= {r.txo_hash for r in claims}
supports = {}
if txo_hashes:
supports = self.execute(*query(
"SELECT txo_hash, claim_hash FROM support", txo_hash__in=txo_hashes
)).fetchall()
txo_hashes -= {r.txo_hash for r in supports}
return claims, supports, txo_hashes
def insert_supports(self, txos: List[Output]):
supports = []
for txo in txos:
tx = txo.tx_ref.tx
supports.append((
txo.ref.hash, tx.position, tx.height,
txo.claim_hash, txo.amount
))
if supports:
self.executemany(
"INSERT OR IGNORE INTO support ("
" txo_hash, tx_position, height, claim_hash, amount"
") "
"VALUES (?, ?, ?, ?, ?)", supports
)
def delete_supports(self, txo_hashes: Set[bytes]):
if txo_hashes:
self.execute(*self._delete_sql('support', {'txo_hash__in': txo_hashes}))
def calculate_reposts(self, txos: List[Output]):
targets = set()
for txo in txos:
try:
claim = txo.claim
except:
continue
if claim.is_repost:
targets.add((claim.repost.reference.claim_hash,))
if targets:
self.executemany(
"""
UPDATE claim SET reposted = (
SELECT count(*) FROM claim AS repost WHERE repost.reposted_claim_hash = claim.claim_hash
)
WHERE claim_hash = ?
""", targets
)
return {target[0] for target in targets}
def validate_channel_signatures(self, height, new_claims, updated_claims, spent_claims, affected_channels, timer):
if not new_claims and not updated_claims and not spent_claims:
return
sub_timer = timer.add_timer('segregate channels and signables')
sub_timer.start()
channels, new_channel_keys, signables = {}, {}, {}
for txo in chain(new_claims, updated_claims):
try:
claim = txo.claim
except:
continue
if claim.is_channel:
channels[txo.claim_hash] = txo
new_channel_keys[txo.claim_hash] = claim.channel.public_key_bytes
else:
signables[txo.claim_hash] = txo
sub_timer.stop()
sub_timer = timer.add_timer('make list of channels we need to lookup')
sub_timer.start()
missing_channel_keys = set()
for txo in signables.values():
claim = txo.claim
if claim.is_signed and claim.signing_channel_hash not in new_channel_keys:
missing_channel_keys.add(claim.signing_channel_hash)
sub_timer.stop()
sub_timer = timer.add_timer('lookup missing channels')
sub_timer.start()
all_channel_keys = {}
if new_channel_keys or missing_channel_keys or affected_channels:
all_channel_keys = dict(self.execute(*query(
"SELECT claim_hash, public_key_bytes FROM claim",
claim_hash__in=set(new_channel_keys) | missing_channel_keys | affected_channels
)))
sub_timer.stop()
sub_timer = timer.add_timer('prepare for updating claims')
sub_timer.start()
changed_channel_keys = {}
for claim_hash, new_key in new_channel_keys.items():
if claim_hash not in all_channel_keys or all_channel_keys[claim_hash] != new_key:
all_channel_keys[claim_hash] = new_key
changed_channel_keys[claim_hash] = new_key
claim_updates = []
for claim_hash, txo in signables.items():
claim = txo.claim
update = {
'claim_hash': claim_hash,
'channel_hash': None,
'signature': None,
'signature_digest': None,
'signature_valid': None
}
if claim.is_signed:
update.update({
'channel_hash': claim.signing_channel_hash,
'signature': txo.get_encoded_signature(),
'signature_digest': txo.get_signature_digest(self.ledger),
'signature_valid': 0
})
claim_updates.append(update)
sub_timer.stop()
sub_timer = timer.add_timer('find claims affected by a change in channel key')
sub_timer.start()
if changed_channel_keys:
sql = f"""
SELECT * FROM claim WHERE
channel_hash IN ({','.join('?' for _ in changed_channel_keys)}) AND
signature IS NOT NULL
"""
for affected_claim in self.execute(sql, list(changed_channel_keys.keys())):
if affected_claim.claim_hash not in signables:
claim_updates.append({
'claim_hash': affected_claim.claim_hash,
'channel_hash': affected_claim.channel_hash,
'signature': affected_claim.signature,
'signature_digest': affected_claim.signature_digest,
'signature_valid': 0
})
sub_timer.stop()
sub_timer = timer.add_timer('verify signatures')
sub_timer.start()
for update in claim_updates:
channel_pub_key = all_channel_keys.get(update['channel_hash'])
if channel_pub_key and update['signature']:
update['signature_valid'] = Output.is_signature_valid(
bytes(update['signature']), bytes(update['signature_digest']), channel_pub_key
)
sub_timer.stop()
sub_timer = timer.add_timer('update claims')
sub_timer.start()
if claim_updates:
self.executemany(f"""
UPDATE claim SET
channel_hash=:channel_hash, signature=:signature, signature_digest=:signature_digest,
signature_valid=:signature_valid,
channel_join=CASE
WHEN signature_valid=1 AND :signature_valid=1 AND channel_hash=:channel_hash THEN channel_join
WHEN :signature_valid=1 THEN {height}
END,
canonical_url=CASE
WHEN signature_valid=1 AND :signature_valid=1 AND channel_hash=:channel_hash THEN canonical_url
WHEN :signature_valid=1 THEN
(SELECT short_url FROM claim WHERE claim_hash=:channel_hash)||'/'||
claim_name||COALESCE(
(SELECT shortest_id(other_claim.claim_id, claim.claim_id) FROM claim AS other_claim
WHERE other_claim.signature_valid = 1 AND
other_claim.channel_hash = :channel_hash AND
other_claim.normalized = claim.normalized),
'#'||substr(claim_id, 1, 1)
)
END
WHERE claim_hash=:claim_hash;
""", claim_updates)
sub_timer.stop()
sub_timer = timer.add_timer('update claims affected by spent channels')
sub_timer.start()
if spent_claims:
self.execute(
f"""
UPDATE claim SET
signature_valid=CASE WHEN signature IS NOT NULL THEN 0 END,
channel_join=NULL, canonical_url=NULL
WHERE channel_hash IN ({','.join('?' for _ in spent_claims)})
""", list(spent_claims)
)
sub_timer.stop()
sub_timer = timer.add_timer('update channels')
sub_timer.start()
if channels:
self.executemany(
"""
UPDATE claim SET
public_key_bytes=:public_key_bytes,
public_key_hash=:public_key_hash
WHERE claim_hash=:claim_hash""", [{
'claim_hash': claim_hash,
'public_key_bytes': txo.claim.channel.public_key_bytes,
'public_key_hash': self.ledger.address_to_hash160(
self.ledger.public_key_to_address(txo.claim.channel.public_key_bytes)
)
} for claim_hash, txo in channels.items()]
)
sub_timer.stop()
sub_timer = timer.add_timer('update claims_in_channel counts')
sub_timer.start()
if all_channel_keys:
self.executemany(f"""
UPDATE claim SET
claims_in_channel=(
SELECT COUNT(*) FROM claim AS claim_in_channel
WHERE claim_in_channel.signature_valid=1 AND
claim_in_channel.channel_hash=claim.claim_hash
)
WHERE claim_hash = ?
""", [(channel_hash,) for channel_hash in all_channel_keys])
sub_timer.stop()
sub_timer = timer.add_timer('update blocked claims list')
sub_timer.start()
if (self.blocking_channel_hashes.intersection(all_channel_keys) or
self.filtering_channel_hashes.intersection(all_channel_keys)):
self.update_blocked_and_filtered_claims()
sub_timer.stop()
def _update_support_amount(self, claim_hashes):
if claim_hashes:
self.execute(f"""
UPDATE claim SET
support_amount = COALESCE(
(SELECT SUM(amount) FROM support WHERE support.claim_hash=claim.claim_hash), 0
)
WHERE claim_hash IN ({','.join('?' for _ in claim_hashes)})
""", claim_hashes)
def _update_effective_amount(self, height, claim_hashes=None):
self.execute(
f"UPDATE claim SET effective_amount = amount + support_amount "
f"WHERE activation_height = {height}"
)
if claim_hashes:
self.execute(
f"UPDATE claim SET effective_amount = amount + support_amount "
f"WHERE activation_height < {height} "
f" AND claim_hash IN ({','.join('?' for _ in claim_hashes)})",
claim_hashes
)
def _calculate_activation_height(self, height):
last_take_over_height = f"""COALESCE(
(SELECT last_take_over_height FROM claimtrie
WHERE claimtrie.normalized=claim.normalized),
{height}
)
"""
self.execute(f"""
UPDATE claim SET activation_height =
{height} + min(4032, cast(({height} - {last_take_over_height}) / 32 AS INT))
WHERE activation_height IS NULL
""")
def _perform_overtake(self, height, changed_claim_hashes, deleted_names):
deleted_names_sql = claim_hashes_sql = ""
if changed_claim_hashes:
claim_hashes_sql = f"OR claim_hash IN ({','.join('?' for _ in changed_claim_hashes)})"
if deleted_names:
deleted_names_sql = f"OR normalized IN ({','.join('?' for _ in deleted_names)})"
overtakes = self.execute(f"""
SELECT winner.normalized, winner.claim_hash,
claimtrie.claim_hash AS current_winner,
MAX(winner.effective_amount) AS max_winner_effective_amount
FROM (
SELECT normalized, claim_hash, effective_amount FROM claim
WHERE normalized IN (
SELECT normalized FROM claim WHERE activation_height={height} {claim_hashes_sql}
) {deleted_names_sql}
ORDER BY effective_amount DESC, height ASC, tx_position ASC
) AS winner LEFT JOIN claimtrie USING (normalized)
GROUP BY winner.normalized
HAVING current_winner IS NULL OR current_winner <> winner.claim_hash
""", list(changed_claim_hashes)+deleted_names)
for overtake in overtakes:
if overtake.current_winner:
self.execute(
f"UPDATE claimtrie SET claim_hash = ?, last_take_over_height = {height} "
f"WHERE normalized = ?",
(overtake.claim_hash, overtake.normalized)
)
else:
self.execute(
f"INSERT INTO claimtrie (claim_hash, normalized, last_take_over_height) "
f"VALUES (?, ?, {height})",
(overtake.claim_hash, overtake.normalized)
)
self.execute(
f"UPDATE claim SET activation_height = {height} WHERE normalized = ? "
f"AND (activation_height IS NULL OR activation_height > {height})",
(overtake.normalized,)
)
def _copy(self, height):
if height > 50:
self.execute(f"DROP TABLE claimtrie{height-50}")
self.execute(f"CREATE TABLE claimtrie{height} AS SELECT * FROM claimtrie")
def update_claimtrie(self, height, changed_claim_hashes, deleted_names, timer):
r = timer.run
binary_claim_hashes = list(changed_claim_hashes)
r(self._calculate_activation_height, height)
r(self._update_support_amount, binary_claim_hashes)
r(self._update_effective_amount, height, binary_claim_hashes)
r(self._perform_overtake, height, binary_claim_hashes, list(deleted_names))
r(self._update_effective_amount, height)
r(self._perform_overtake, height, [], [])
def get_expiring(self, height):
return self.execute(
f"SELECT claim_hash, normalized FROM claim WHERE expiration_height = {height}"
)
def enqueue_changes(self):
query = """
SELECT claimtrie.claim_hash as is_controlling,
claimtrie.last_take_over_height,
(select group_concat(tag, ',,') from tag where tag.claim_hash in (claim.claim_hash, claim.reposted_claim_hash)) as tags,
(select group_concat(language, ' ') from language where language.claim_hash in (claim.claim_hash, claim.reposted_claim_hash)) as languages,
cr.has_source as reposted_has_source,
cr.claim_type as reposted_claim_type,
cr.stream_type as reposted_stream_type,
cr.media_type as reposted_media_type,
cr.duration as reposted_duration,
cr.fee_amount as reposted_fee_amount,
cr.fee_currency as reposted_fee_currency,
claim.*
FROM claim LEFT JOIN claimtrie USING (claim_hash) LEFT JOIN claim cr ON cr.claim_hash=claim.reposted_claim_hash
WHERE claim.claim_hash in (SELECT claim_hash FROM changelog)
"""
for claim in self.execute(query):
claim = claim._asdict()
id_set = set(filter(None, (claim['claim_hash'], claim['channel_hash'], claim['reposted_claim_hash'])))
claim['censor_type'] = 0
censoring_channel_hash = None
claim['has_source'] = bool(claim.pop('reposted_has_source') or claim['has_source'])
claim['stream_type'] = claim.pop('reposted_stream_type') or claim['stream_type']
claim['media_type'] = claim.pop('reposted_media_type') or claim['media_type']
claim['fee_amount'] = claim.pop('reposted_fee_amount') or claim['fee_amount']
claim['fee_currency'] = claim.pop('reposted_fee_currency') or claim['fee_currency']
claim['duration'] = claim.pop('reposted_duration') or claim['duration']
for reason_id in id_set:
if reason_id in self.blocked_streams:
claim['censor_type'] = 2
censoring_channel_hash = self.blocked_streams.get(reason_id)
elif reason_id in self.blocked_channels:
claim['censor_type'] = 2
censoring_channel_hash = self.blocked_channels.get(reason_id)
elif reason_id in self.filtered_streams:
claim['censor_type'] = 1
censoring_channel_hash = self.filtered_streams.get(reason_id)
elif reason_id in self.filtered_channels:
claim['censor_type'] = 1
censoring_channel_hash = self.filtered_channels.get(reason_id)
claim['censoring_channel_id'] = censoring_channel_hash[::-1].hex() if censoring_channel_hash else None
claim['tags'] = claim['tags'].split(',,') if claim['tags'] else []
claim['languages'] = claim['languages'].split(' ') if claim['languages'] else []
yield 'update', claim
def clear_changelog(self):
self.execute("delete from changelog;")
def claim_producer(self):
while self.pending_deletes:
claim_hash = self.pending_deletes.pop()
yield 'delete', hexlify(claim_hash[::-1]).decode()
for claim in self.enqueue_changes():
yield claim
self.clear_changelog()
def advance_txs(self, height, all_txs, header, daemon_height, timer):
insert_claims = []
update_claims = []
update_claim_hashes = set()
delete_claim_hashes = self.pending_deletes
insert_supports = []
delete_support_txo_hashes = set()
recalculate_claim_hashes = set() # added/deleted supports, added/updated claim
deleted_claim_names = set()
delete_others = set()
body_timer = timer.add_timer('body')
for position, (etx, txid) in enumerate(all_txs):
tx = timer.run(
Transaction, etx.raw, height=height, position=position
)
# Inputs
spent_claims, spent_supports, spent_others = timer.run(
self.split_inputs_into_claims_supports_and_other, tx.inputs
)
body_timer.start()
delete_claim_hashes.update({r.claim_hash for r in spent_claims})
deleted_claim_names.update({r.normalized for r in spent_claims})
delete_support_txo_hashes.update({r.txo_hash for r in spent_supports})
recalculate_claim_hashes.update({r.claim_hash for r in spent_supports})
delete_others.update(spent_others)
# Outputs
for output in tx.outputs:
if output.is_support:
insert_supports.append(output)
recalculate_claim_hashes.add(output.claim_hash)
elif output.script.is_claim_name:
insert_claims.append(output)
recalculate_claim_hashes.add(output.claim_hash)
elif output.script.is_update_claim:
claim_hash = output.claim_hash
update_claims.append(output)
recalculate_claim_hashes.add(claim_hash)
body_timer.stop()
skip_update_claim_timer = timer.add_timer('skip update of abandoned claims')
skip_update_claim_timer.start()
for updated_claim in list(update_claims):
if updated_claim.ref.hash in delete_others:
update_claims.remove(updated_claim)
for updated_claim in update_claims:
claim_hash = updated_claim.claim_hash
delete_claim_hashes.discard(claim_hash)
update_claim_hashes.add(claim_hash)
skip_update_claim_timer.stop()
skip_insert_claim_timer = timer.add_timer('skip insertion of abandoned claims')
skip_insert_claim_timer.start()
for new_claim in list(insert_claims):
if new_claim.ref.hash in delete_others:
if new_claim.claim_hash not in update_claim_hashes:
insert_claims.remove(new_claim)
skip_insert_claim_timer.stop()
skip_insert_support_timer = timer.add_timer('skip insertion of abandoned supports')
skip_insert_support_timer.start()
for new_support in list(insert_supports):
if new_support.ref.hash in delete_others:
insert_supports.remove(new_support)
skip_insert_support_timer.stop()
expire_timer = timer.add_timer('recording expired claims')
expire_timer.start()
for expired in self.get_expiring(height):
delete_claim_hashes.add(expired.claim_hash)
deleted_claim_names.add(expired.normalized)
expire_timer.stop()
r = timer.run
affected_channels = r(self.delete_claims, delete_claim_hashes)
r(self.delete_supports, delete_support_txo_hashes)
r(self.insert_claims, insert_claims, header)
r(self.calculate_reposts, insert_claims)
r(self.update_claims, update_claims, header)
r(self.validate_channel_signatures, height, insert_claims,
update_claims, delete_claim_hashes, affected_channels, forward_timer=True)
r(self.insert_supports, insert_supports)
r(self.update_claimtrie, height, recalculate_claim_hashes, deleted_claim_names, forward_timer=True)
for algorithm in self.trending:
r(algorithm.run, self.db.cursor(), height, daemon_height, recalculate_claim_hashes)

View file

@ -1,616 +0,0 @@
import time
import struct
import sqlite3
import logging
from operator import itemgetter
from typing import Tuple, List, Dict, Union, Type, Optional
from binascii import unhexlify
from decimal import Decimal
from contextvars import ContextVar
from functools import wraps
from itertools import chain
from dataclasses import dataclass
from lbry.wallet.database import query, interpolate
from lbry.error import ResolveCensoredError
from lbry.schema.url import URL, normalize_name
from lbry.schema.tags import clean_tags
from lbry.schema.result import Outputs, Censor
from lbry.wallet import Ledger, RegTestLedger
from lbry.wallet.server.db.common import CLAIM_TYPES, STREAM_TYPES, COMMON_TAGS, INDEXED_LANGUAGES
class SQLiteOperationalError(sqlite3.OperationalError):
def __init__(self, metrics):
super().__init__('sqlite query errored')
self.metrics = metrics
class SQLiteInterruptedError(sqlite3.OperationalError):
def __init__(self, metrics):
super().__init__('sqlite query interrupted')
self.metrics = metrics
ATTRIBUTE_ARRAY_MAX_LENGTH = 100
sqlite3.enable_callback_tracebacks(True)
INTEGER_PARAMS = {
'height', 'creation_height', 'activation_height', 'expiration_height',
'timestamp', 'creation_timestamp', 'duration', 'release_time', 'fee_amount',
'tx_position', 'channel_join', 'reposted', 'limit_claims_per_channel',
'amount', 'effective_amount', 'support_amount',
'trending_group', 'trending_mixed',
'trending_local', 'trending_global',
}
SEARCH_PARAMS = {
'name', 'text', 'claim_id', 'claim_ids', 'txid', 'nout', 'channel', 'channel_ids', 'not_channel_ids',
'public_key_id', 'claim_type', 'stream_types', 'media_types', 'fee_currency',
'has_channel_signature', 'signature_valid',
'any_tags', 'all_tags', 'not_tags', 'reposted_claim_id',
'any_locations', 'all_locations', 'not_locations',
'any_languages', 'all_languages', 'not_languages',
'is_controlling', 'limit', 'offset', 'order_by',
'no_totals', 'has_source'
} | INTEGER_PARAMS
ORDER_FIELDS = {
'name', 'claim_hash'
} | INTEGER_PARAMS
@dataclass
class ReaderState:
db: sqlite3.Connection
stack: List[List]
metrics: Dict
is_tracking_metrics: bool
ledger: Type[Ledger]
query_timeout: float
log: logging.Logger
blocked_streams: Dict
blocked_channels: Dict
filtered_streams: Dict
filtered_channels: Dict
def close(self):
self.db.close()
def reset_metrics(self):
self.stack = []
self.metrics = {}
def set_query_timeout(self):
stop_at = time.perf_counter() + self.query_timeout
def interruptor():
if time.perf_counter() >= stop_at:
self.db.interrupt()
return
self.db.set_progress_handler(interruptor, 100)
def get_resolve_censor(self) -> Censor:
return Censor(Censor.RESOLVE)
def get_search_censor(self, limit_claims_per_channel: int) -> Censor:
return Censor(Censor.SEARCH)
ctx: ContextVar[Optional[ReaderState]] = ContextVar('ctx')
def row_factory(cursor, row):
return {
k[0]: (set(row[i].split(',')) if k[0] == 'tags' else row[i])
for i, k in enumerate(cursor.description)
}
def initializer(log, _path, _ledger_name, query_timeout, _measure=False, block_and_filter=None):
db = sqlite3.connect(_path, isolation_level=None, uri=True)
db.row_factory = row_factory
if block_and_filter:
blocked_streams, blocked_channels, filtered_streams, filtered_channels = block_and_filter
else:
blocked_streams = blocked_channels = filtered_streams = filtered_channels = {}
ctx.set(
ReaderState(
db=db, stack=[], metrics={}, is_tracking_metrics=_measure,
ledger=Ledger if _ledger_name == 'mainnet' else RegTestLedger,
query_timeout=query_timeout, log=log,
blocked_streams=blocked_streams, blocked_channels=blocked_channels,
filtered_streams=filtered_streams, filtered_channels=filtered_channels,
)
)
def cleanup():
ctx.get().close()
ctx.set(None)
def measure(func):
@wraps(func)
def wrapper(*args, **kwargs):
state = ctx.get()
if not state.is_tracking_metrics:
return func(*args, **kwargs)
metric = {}
state.metrics.setdefault(func.__name__, []).append(metric)
state.stack.append([])
start = time.perf_counter()
try:
return func(*args, **kwargs)
finally:
elapsed = int((time.perf_counter()-start)*1000)
metric['total'] = elapsed
metric['isolated'] = (elapsed-sum(state.stack.pop()))
if state.stack:
state.stack[-1].append(elapsed)
return wrapper
def reports_metrics(func):
@wraps(func)
def wrapper(*args, **kwargs):
state = ctx.get()
if not state.is_tracking_metrics:
return func(*args, **kwargs)
state.reset_metrics()
r = func(*args, **kwargs)
return r, state.metrics
return wrapper
@reports_metrics
def search_to_bytes(constraints) -> Union[bytes, Tuple[bytes, Dict]]:
return encode_result(search(constraints))
@reports_metrics
def resolve_to_bytes(urls) -> Union[bytes, Tuple[bytes, Dict]]:
return encode_result(resolve(urls))
def encode_result(result):
return Outputs.to_bytes(*result)
@measure
def execute_query(sql, values, row_offset: int, row_limit: int, censor: Censor) -> List:
context = ctx.get()
context.set_query_timeout()
try:
rows = context.db.execute(sql, values).fetchall()
return rows[row_offset:row_limit]
except sqlite3.OperationalError as err:
plain_sql = interpolate(sql, values)
if context.is_tracking_metrics:
context.metrics['execute_query'][-1]['sql'] = plain_sql
context.log.exception('failed running query', exc_info=err)
raise SQLiteOperationalError(context.metrics)
def claims_query(cols, for_count=False, **constraints) -> Tuple[str, Dict]:
if 'order_by' in constraints:
order_by_parts = constraints['order_by']
if isinstance(order_by_parts, str):
order_by_parts = [order_by_parts]
sql_order_by = []
for order_by in order_by_parts:
is_asc = order_by.startswith('^')
column = order_by[1:] if is_asc else order_by
if column not in ORDER_FIELDS:
raise NameError(f'{column} is not a valid order_by field')
if column == 'name':
column = 'normalized'
sql_order_by.append(
f"claim.{column} ASC" if is_asc else f"claim.{column} DESC"
)
constraints['order_by'] = sql_order_by
ops = {'<=': '__lte', '>=': '__gte', '<': '__lt', '>': '__gt'}
for constraint in INTEGER_PARAMS:
if constraint in constraints:
value = constraints.pop(constraint)
postfix = ''
if isinstance(value, str):
if len(value) >= 2 and value[:2] in ops:
postfix, value = ops[value[:2]], value[2:]
elif len(value) >= 1 and value[0] in ops:
postfix, value = ops[value[0]], value[1:]
if constraint == 'fee_amount':
value = Decimal(value)*1000
constraints[f'claim.{constraint}{postfix}'] = int(value)
if constraints.pop('is_controlling', False):
if {'sequence', 'amount_order'}.isdisjoint(constraints):
for_count = False
constraints['claimtrie.claim_hash__is_not_null'] = ''
if 'sequence' in constraints:
constraints['order_by'] = 'claim.activation_height ASC'
constraints['offset'] = int(constraints.pop('sequence')) - 1
constraints['limit'] = 1
if 'amount_order' in constraints:
constraints['order_by'] = 'claim.effective_amount DESC'
constraints['offset'] = int(constraints.pop('amount_order')) - 1
constraints['limit'] = 1
if 'claim_id' in constraints:
claim_id = constraints.pop('claim_id')
if len(claim_id) == 40:
constraints['claim.claim_id'] = claim_id
else:
constraints['claim.claim_id__like'] = f'{claim_id[:40]}%'
elif 'claim_ids' in constraints:
constraints['claim.claim_id__in'] = set(constraints.pop('claim_ids'))
if 'reposted_claim_id' in constraints:
constraints['claim.reposted_claim_hash'] = unhexlify(constraints.pop('reposted_claim_id'))[::-1]
if 'name' in constraints:
constraints['claim.normalized'] = normalize_name(constraints.pop('name'))
if 'public_key_id' in constraints:
constraints['claim.public_key_hash'] = (
ctx.get().ledger.address_to_hash160(constraints.pop('public_key_id')))
if 'channel_hash' in constraints:
constraints['claim.channel_hash'] = constraints.pop('channel_hash')
if 'channel_ids' in constraints:
channel_ids = constraints.pop('channel_ids')
if channel_ids:
constraints['claim.channel_hash__in'] = {
unhexlify(cid)[::-1] for cid in channel_ids if cid
}
if 'not_channel_ids' in constraints:
not_channel_ids = constraints.pop('not_channel_ids')
if not_channel_ids:
not_channel_ids_binary = {
unhexlify(ncid)[::-1] for ncid in not_channel_ids
}
constraints['claim.claim_hash__not_in#not_channel_ids'] = not_channel_ids_binary
if constraints.get('has_channel_signature', False):
constraints['claim.channel_hash__not_in'] = not_channel_ids_binary
else:
constraints['null_or_not_channel__or'] = {
'claim.signature_valid__is_null': True,
'claim.channel_hash__not_in': not_channel_ids_binary
}
if 'signature_valid' in constraints:
has_channel_signature = constraints.pop('has_channel_signature', False)
if has_channel_signature:
constraints['claim.signature_valid'] = constraints.pop('signature_valid')
else:
constraints['null_or_signature__or'] = {
'claim.signature_valid__is_null': True,
'claim.signature_valid': constraints.pop('signature_valid')
}
elif constraints.pop('has_channel_signature', False):
constraints['claim.signature_valid__is_not_null'] = True
if 'txid' in constraints:
tx_hash = unhexlify(constraints.pop('txid'))[::-1]
nout = constraints.pop('nout', 0)
constraints['claim.txo_hash'] = tx_hash + struct.pack('<I', nout)
if 'claim_type' in constraints:
claim_types = constraints.pop('claim_type')
if isinstance(claim_types, str):
claim_types = [claim_types]
if claim_types:
constraints['claim.claim_type__in'] = {
CLAIM_TYPES[claim_type] for claim_type in claim_types
}
if 'stream_types' in constraints:
stream_types = constraints.pop('stream_types')
if stream_types:
constraints['claim.stream_type__in'] = {
STREAM_TYPES[stream_type] for stream_type in stream_types
}
if 'media_types' in constraints:
media_types = constraints.pop('media_types')
if media_types:
constraints['claim.media_type__in'] = set(media_types)
if 'fee_currency' in constraints:
constraints['claim.fee_currency'] = constraints.pop('fee_currency').lower()
_apply_constraints_for_array_attributes(constraints, 'tag', clean_tags, for_count)
_apply_constraints_for_array_attributes(constraints, 'language', lambda _: _, for_count)
_apply_constraints_for_array_attributes(constraints, 'location', lambda _: _, for_count)
select = f"SELECT {cols} FROM claim"
if not for_count:
select += " LEFT JOIN claimtrie USING (claim_hash)"
return query(select, **constraints)
def select_claims(censor: Censor, cols: str, for_count=False, **constraints) -> List:
if 'channel' in constraints:
channel_url = constraints.pop('channel')
match = resolve_url(channel_url)
if isinstance(match, dict):
constraints['channel_hash'] = match['claim_hash']
else:
return [{'row_count': 0}] if cols == 'count(*) as row_count' else []
row_offset = constraints.pop('offset', 0)
row_limit = constraints.pop('limit', 20)
sql, values = claims_query(cols, for_count, **constraints)
return execute_query(sql, values, row_offset, row_limit, censor)
@measure
def count_claims(**constraints) -> int:
constraints.pop('offset', None)
constraints.pop('limit', None)
constraints.pop('order_by', None)
count = select_claims(Censor(Censor.SEARCH), 'count(*) as row_count', for_count=True, **constraints)
return count[0]['row_count']
def search_claims(censor: Censor, **constraints) -> List:
return select_claims(
censor,
"""
claimtrie.claim_hash as is_controlling,
claimtrie.last_take_over_height,
claim.claim_hash, claim.txo_hash,
claim.claims_in_channel, claim.reposted,
claim.height, claim.creation_height,
claim.activation_height, claim.expiration_height,
claim.effective_amount, claim.support_amount,
claim.trending_group, claim.trending_mixed,
claim.trending_local, claim.trending_global,
claim.short_url, claim.canonical_url,
claim.channel_hash, claim.reposted_claim_hash,
claim.signature_valid
""", **constraints
)
def _get_referenced_rows(txo_rows: List[dict], censor_channels: List[bytes]):
censor = ctx.get().get_resolve_censor()
repost_hashes = set(filter(None, map(itemgetter('reposted_claim_hash'), txo_rows)))
channel_hashes = set(chain(
filter(None, map(itemgetter('channel_hash'), txo_rows)),
censor_channels
))
reposted_txos = []
if repost_hashes:
reposted_txos = search_claims(censor, **{'claim.claim_hash__in': repost_hashes})
channel_hashes |= set(filter(None, map(itemgetter('channel_hash'), reposted_txos)))
channel_txos = []
if channel_hashes:
channel_txos = search_claims(censor, **{'claim.claim_hash__in': channel_hashes})
# channels must come first for client side inflation to work properly
return channel_txos + reposted_txos
@measure
def search(constraints) -> Tuple[List, List, int, int, Censor]:
assert set(constraints).issubset(SEARCH_PARAMS), \
f"Search query contains invalid arguments: {set(constraints).difference(SEARCH_PARAMS)}"
total = None
limit_claims_per_channel = constraints.pop('limit_claims_per_channel', None)
if not constraints.pop('no_totals', False):
total = count_claims(**constraints)
constraints['offset'] = abs(constraints.get('offset', 0))
constraints['limit'] = min(abs(constraints.get('limit', 10)), 50)
context = ctx.get()
search_censor = context.get_search_censor(limit_claims_per_channel)
txo_rows = search_claims(search_censor, **constraints)
extra_txo_rows = _get_referenced_rows(txo_rows, search_censor.censored.keys())
return txo_rows, extra_txo_rows, constraints['offset'], total, search_censor
@measure
def resolve(urls) -> Tuple[List, List]:
txo_rows = [resolve_url(raw_url) for raw_url in urls]
extra_txo_rows = _get_referenced_rows(
[txo for txo in txo_rows if isinstance(txo, dict)],
[txo.censor_id for txo in txo_rows if isinstance(txo, ResolveCensoredError)]
)
return txo_rows, extra_txo_rows
@measure
def resolve_url(raw_url):
censor = ctx.get().get_resolve_censor()
try:
url = URL.parse(raw_url)
except ValueError as e:
return e
channel = None
if url.has_channel:
query = url.channel.to_dict()
if set(query) == {'name'}:
query['is_controlling'] = True
else:
query['order_by'] = ['^creation_height']
matches = search_claims(censor, **query, limit=1)
if matches:
channel = matches[0]
elif censor.censored:
return ResolveCensoredError(raw_url, next(iter(censor.censored)))
else:
return LookupError(f'Could not find channel in "{raw_url}".')
if url.has_stream:
query = url.stream.to_dict()
if channel is not None:
if set(query) == {'name'}:
# temporarily emulate is_controlling for claims in channel
query['order_by'] = ['effective_amount', '^height']
else:
query['order_by'] = ['^channel_join']
query['channel_hash'] = channel['claim_hash']
query['signature_valid'] = 1
elif set(query) == {'name'}:
query['is_controlling'] = 1
matches = search_claims(censor, **query, limit=1)
if matches:
return matches[0]
elif censor.censored:
return ResolveCensoredError(raw_url, next(iter(censor.censored)))
else:
return LookupError(f'Could not find claim at "{raw_url}".')
return channel
CLAIM_HASH_OR_REPOST_HASH_SQL = f"""
CASE WHEN claim.claim_type = {CLAIM_TYPES['repost']}
THEN claim.reposted_claim_hash
ELSE claim.claim_hash
END
"""
def _apply_constraints_for_array_attributes(constraints, attr, cleaner, for_count=False):
any_items = set(cleaner(constraints.pop(f'any_{attr}s', []))[:ATTRIBUTE_ARRAY_MAX_LENGTH])
all_items = set(cleaner(constraints.pop(f'all_{attr}s', []))[:ATTRIBUTE_ARRAY_MAX_LENGTH])
not_items = set(cleaner(constraints.pop(f'not_{attr}s', []))[:ATTRIBUTE_ARRAY_MAX_LENGTH])
all_items = {item for item in all_items if item not in not_items}
any_items = {item for item in any_items if item not in not_items}
any_queries = {}
if attr == 'tag':
common_tags = any_items & COMMON_TAGS.keys()
if common_tags:
any_items -= common_tags
if len(common_tags) < 5:
for item in common_tags:
index_name = COMMON_TAGS[item]
any_queries[f'#_common_tag_{index_name}'] = f"""
EXISTS(
SELECT 1 FROM tag INDEXED BY tag_{index_name}_idx
WHERE {CLAIM_HASH_OR_REPOST_HASH_SQL}=tag.claim_hash
AND tag = '{item}'
)
"""
elif len(common_tags) >= 5:
constraints.update({
f'$any_common_tag{i}': item for i, item in enumerate(common_tags)
})
values = ', '.join(
f':$any_common_tag{i}' for i in range(len(common_tags))
)
any_queries[f'#_any_common_tags'] = f"""
EXISTS(
SELECT 1 FROM tag WHERE {CLAIM_HASH_OR_REPOST_HASH_SQL}=tag.claim_hash
AND tag IN ({values})
)
"""
elif attr == 'language':
indexed_languages = any_items & set(INDEXED_LANGUAGES)
if indexed_languages:
any_items -= indexed_languages
for language in indexed_languages:
any_queries[f'#_any_common_languages_{language}'] = f"""
EXISTS(
SELECT 1 FROM language INDEXED BY language_{language}_idx
WHERE {CLAIM_HASH_OR_REPOST_HASH_SQL}=language.claim_hash
AND language = '{language}'
)
"""
if any_items:
constraints.update({
f'$any_{attr}{i}': item for i, item in enumerate(any_items)
})
values = ', '.join(
f':$any_{attr}{i}' for i in range(len(any_items))
)
if for_count or attr == 'tag':
if attr == 'tag':
any_queries[f'#_any_{attr}'] = f"""
((claim.claim_type != {CLAIM_TYPES['repost']}
AND claim.claim_hash IN (SELECT claim_hash FROM tag WHERE tag IN ({values}))) OR
(claim.claim_type == {CLAIM_TYPES['repost']} AND
claim.reposted_claim_hash IN (SELECT claim_hash FROM tag WHERE tag IN ({values}))))
"""
else:
any_queries[f'#_any_{attr}'] = f"""
{CLAIM_HASH_OR_REPOST_HASH_SQL} IN (
SELECT claim_hash FROM {attr} WHERE {attr} IN ({values})
)
"""
else:
any_queries[f'#_any_{attr}'] = f"""
EXISTS(
SELECT 1 FROM {attr} WHERE
{CLAIM_HASH_OR_REPOST_HASH_SQL}={attr}.claim_hash
AND {attr} IN ({values})
)
"""
if len(any_queries) == 1:
constraints.update(any_queries)
elif len(any_queries) > 1:
constraints[f'ORed_{attr}_queries__any'] = any_queries
if all_items:
constraints[f'$all_{attr}_count'] = len(all_items)
constraints.update({
f'$all_{attr}{i}': item for i, item in enumerate(all_items)
})
values = ', '.join(
f':$all_{attr}{i}' for i in range(len(all_items))
)
if for_count:
constraints[f'#_all_{attr}'] = f"""
{CLAIM_HASH_OR_REPOST_HASH_SQL} IN (
SELECT claim_hash FROM {attr} WHERE {attr} IN ({values})
GROUP BY claim_hash HAVING COUNT({attr}) = :$all_{attr}_count
)
"""
else:
constraints[f'#_all_{attr}'] = f"""
{len(all_items)}=(
SELECT count(*) FROM {attr} WHERE
{CLAIM_HASH_OR_REPOST_HASH_SQL}={attr}.claim_hash
AND {attr} IN ({values})
)
"""
if not_items:
constraints.update({
f'$not_{attr}{i}': item for i, item in enumerate(not_items)
})
values = ', '.join(
f':$not_{attr}{i}' for i in range(len(not_items))
)
if for_count:
if attr == 'tag':
constraints[f'#_not_{attr}'] = f"""
((claim.claim_type != {CLAIM_TYPES['repost']}
AND claim.claim_hash NOT IN (SELECT claim_hash FROM tag WHERE tag IN ({values}))) OR
(claim.claim_type == {CLAIM_TYPES['repost']} AND
claim.reposted_claim_hash NOT IN (SELECT claim_hash FROM tag WHERE tag IN ({values}))))
"""
else:
constraints[f'#_not_{attr}'] = f"""
{CLAIM_HASH_OR_REPOST_HASH_SQL} NOT IN (
SELECT claim_hash FROM {attr} WHERE {attr} IN ({values})
)
"""
else:
constraints[f'#_not_{attr}'] = f"""
NOT EXISTS(
SELECT 1 FROM {attr} WHERE
{CLAIM_HASH_OR_REPOST_HASH_SQL}={attr}.claim_hash
AND {attr} IN ({values})
)
"""

View file

@ -1,765 +0,0 @@
import unittest
import ecdsa
import hashlib
import logging
from binascii import hexlify
from typing import List, Tuple
from lbry.wallet.constants import COIN, NULL_HASH32
from lbry.schema.claim import Claim
from lbry.schema.result import Censor
from lbry.wallet.server.db import writer
from lbry.wallet.server.coin import LBCRegTest
from lbry.wallet.server.db.trending import zscore
from lbry.wallet.server.db.canonical import FindShortestID
from lbry.wallet.transaction import Transaction, Input, Output
try:
import reader
except:
from . import reader
def get_output(amount=COIN, pubkey_hash=NULL_HASH32):
return Transaction() \
.add_outputs([Output.pay_pubkey_hash(amount, pubkey_hash)]) \
.outputs[0]
def get_input():
return Input.spend(get_output())
def get_tx():
return Transaction().add_inputs([get_input()])
def search(**constraints) -> List:
return reader.search_claims(Censor(Censor.SEARCH), **constraints)
def censored_search(**constraints) -> Tuple[List, Censor]:
rows, _, _, _, censor = reader.search(constraints)
return rows, censor
class TestSQLDB(unittest.TestCase):
query_timeout = 0.25
def setUp(self):
self.first_sync = False
self.daemon_height = 1
self.coin = LBCRegTest()
db_url = 'file:test_sqldb?mode=memory&cache=shared'
self.sql = writer.SQLDB(self, db_url, [], [], [zscore])
self.addCleanup(self.sql.close)
self.sql.open()
reader.initializer(
logging.getLogger(__name__), db_url, 'regtest',
self.query_timeout, block_and_filter=(
self.sql.blocked_streams, self.sql.blocked_channels,
self.sql.filtered_streams, self.sql.filtered_channels
)
)
self.addCleanup(reader.cleanup)
self._current_height = 0
self._txos = {}
def _make_tx(self, output, txi=None):
tx = get_tx().add_outputs([output])
if txi is not None:
tx.add_inputs([txi])
self._txos[output.ref.hash] = output
return tx, tx.hash
def _set_channel_key(self, channel, key):
private_key = ecdsa.SigningKey.from_string(key*32, curve=ecdsa.SECP256k1, hashfunc=hashlib.sha256)
channel.private_key = private_key
channel.claim.channel.public_key_bytes = private_key.get_verifying_key().to_der()
channel.script.generate()
def get_channel(self, title, amount, name='@foo', key=b'a'):
claim = Claim()
claim.channel.title = title
channel = Output.pay_claim_name_pubkey_hash(amount, name, claim, b'abc')
self._set_channel_key(channel, key)
return self._make_tx(channel)
def get_channel_update(self, channel, amount, key=b'a'):
self._set_channel_key(channel, key)
return self._make_tx(
Output.pay_update_claim_pubkey_hash(
amount, channel.claim_name, channel.claim_id, channel.claim, b'abc'
),
Input.spend(channel)
)
def get_stream(self, title, amount, name='foo', channel=None, **kwargs):
claim = Claim()
claim.stream.update(title=title, **kwargs)
result = self._make_tx(Output.pay_claim_name_pubkey_hash(amount, name, claim, b'abc'))
if channel:
result[0].outputs[0].sign(channel)
result[0]._reset()
return result
def get_stream_update(self, tx, amount, channel=None):
stream = Transaction(tx[0].raw).outputs[0]
result = self._make_tx(
Output.pay_update_claim_pubkey_hash(
amount, stream.claim_name, stream.claim_id, stream.claim, b'abc'
),
Input.spend(stream)
)
if channel:
result[0].outputs[0].sign(channel)
result[0]._reset()
return result
def get_repost(self, claim_id, amount, channel):
claim = Claim()
claim.repost.reference.claim_id = claim_id
result = self._make_tx(Output.pay_claim_name_pubkey_hash(amount, 'repost', claim, b'abc'))
result[0].outputs[0].sign(channel)
result[0]._reset()
return result
def get_abandon(self, tx):
claim = Transaction(tx[0].raw).outputs[0]
return self._make_tx(
Output.pay_pubkey_hash(claim.amount, b'abc'),
Input.spend(claim)
)
def get_support(self, tx, amount):
claim = Transaction(tx[0].raw).outputs[0]
return self._make_tx(
Output.pay_support_pubkey_hash(
amount, claim.claim_name, claim.claim_id, b'abc'
)
)
def get_controlling(self):
for claim in self.sql.execute("select claim.* from claimtrie natural join claim"):
txo = self._txos[claim.txo_hash]
controlling = txo.claim.stream.title, claim.amount, claim.effective_amount, claim.activation_height
return controlling
def get_active(self):
controlling = self.get_controlling()
active = []
for claim in self.sql.execute(
f"select * from claim where activation_height <= {self._current_height}"):
txo = self._txos[claim.txo_hash]
if controlling and controlling[0] == txo.claim.stream.title:
continue
active.append((txo.claim.stream.title, claim.amount, claim.effective_amount, claim.activation_height))
return active
def get_accepted(self):
accepted = []
for claim in self.sql.execute(
f"select * from claim where activation_height > {self._current_height}"):
txo = self._txos[claim.txo_hash]
accepted.append((txo.claim.stream.title, claim.amount, claim.effective_amount, claim.activation_height))
return accepted
def advance(self, height, txs):
self._current_height = height
self.sql.advance_txs(height, txs, {'timestamp': 1}, self.daemon_height, self.timer)
return [otx[0].outputs[0] for otx in txs]
def state(self, controlling=None, active=None, accepted=None):
self.assertEqual(controlling, self.get_controlling())
self.assertEqual(active or [], self.get_active())
self.assertEqual(accepted or [], self.get_accepted())
@unittest.skip("port canonical url tests to leveldb") # TODO: port canonical url tests to leveldb
class TestClaimtrie(TestSQLDB):
def test_example_from_spec(self):
# https://spec.lbry.com/#claim-activation-example
advance, state = self.advance, self.state
stream = self.get_stream('Claim A', 10*COIN)
advance(13, [stream])
state(
controlling=('Claim A', 10*COIN, 10*COIN, 13),
active=[],
accepted=[]
)
advance(1001, [self.get_stream('Claim B', 20*COIN)])
state(
controlling=('Claim A', 10*COIN, 10*COIN, 13),
active=[],
accepted=[('Claim B', 20*COIN, 0, 1031)]
)
advance(1010, [self.get_support(stream, 14*COIN)])
state(
controlling=('Claim A', 10*COIN, 24*COIN, 13),
active=[],
accepted=[('Claim B', 20*COIN, 0, 1031)]
)
advance(1020, [self.get_stream('Claim C', 50*COIN)])
state(
controlling=('Claim A', 10*COIN, 24*COIN, 13),
active=[],
accepted=[
('Claim B', 20*COIN, 0, 1031),
('Claim C', 50*COIN, 0, 1051)]
)
advance(1031, [])
state(
controlling=('Claim A', 10*COIN, 24*COIN, 13),
active=[('Claim B', 20*COIN, 20*COIN, 1031)],
accepted=[('Claim C', 50*COIN, 0, 1051)]
)
advance(1040, [self.get_stream('Claim D', 300*COIN)])
state(
controlling=('Claim A', 10*COIN, 24*COIN, 13),
active=[('Claim B', 20*COIN, 20*COIN, 1031)],
accepted=[
('Claim C', 50*COIN, 0, 1051),
('Claim D', 300*COIN, 0, 1072)]
)
advance(1051, [])
state(
controlling=('Claim D', 300*COIN, 300*COIN, 1051),
active=[
('Claim A', 10*COIN, 24*COIN, 13),
('Claim B', 20*COIN, 20*COIN, 1031),
('Claim C', 50*COIN, 50*COIN, 1051)],
accepted=[]
)
# beyond example
advance(1052, [self.get_stream_update(stream, 290*COIN)])
state(
controlling=('Claim A', 290*COIN, 304*COIN, 13),
active=[
('Claim B', 20*COIN, 20*COIN, 1031),
('Claim C', 50*COIN, 50*COIN, 1051),
('Claim D', 300*COIN, 300*COIN, 1051),
],
accepted=[]
)
def test_competing_claims_subsequent_blocks_height_wins(self):
advance, state = self.advance, self.state
advance(13, [self.get_stream('Claim A', 10*COIN)])
state(
controlling=('Claim A', 10*COIN, 10*COIN, 13),
active=[],
accepted=[]
)
advance(14, [self.get_stream('Claim B', 10*COIN)])
state(
controlling=('Claim A', 10*COIN, 10*COIN, 13),
active=[('Claim B', 10*COIN, 10*COIN, 14)],
accepted=[]
)
advance(15, [self.get_stream('Claim C', 10*COIN)])
state(
controlling=('Claim A', 10*COIN, 10*COIN, 13),
active=[
('Claim B', 10*COIN, 10*COIN, 14),
('Claim C', 10*COIN, 10*COIN, 15)],
accepted=[]
)
def test_competing_claims_in_single_block_position_wins(self):
advance, state = self.advance, self.state
stream = self.get_stream('Claim A', 10*COIN)
stream2 = self.get_stream('Claim B', 10*COIN)
advance(13, [stream, stream2])
state(
controlling=('Claim A', 10*COIN, 10*COIN, 13),
active=[('Claim B', 10*COIN, 10*COIN, 13)],
accepted=[]
)
def test_competing_claims_in_single_block_effective_amount_wins(self):
advance, state = self.advance, self.state
stream = self.get_stream('Claim A', 10*COIN)
stream2 = self.get_stream('Claim B', 11*COIN)
advance(13, [stream, stream2])
state(
controlling=('Claim B', 11*COIN, 11*COIN, 13),
active=[('Claim A', 10*COIN, 10*COIN, 13)],
accepted=[]
)
def test_winning_claim_deleted(self):
advance, state = self.advance, self.state
stream = self.get_stream('Claim A', 10*COIN)
stream2 = self.get_stream('Claim B', 11*COIN)
advance(13, [stream, stream2])
state(
controlling=('Claim B', 11*COIN, 11*COIN, 13),
active=[('Claim A', 10*COIN, 10*COIN, 13)],
accepted=[]
)
advance(14, [self.get_abandon(stream2)])
state(
controlling=('Claim A', 10*COIN, 10*COIN, 13),
active=[],
accepted=[]
)
def test_winning_claim_deleted_and_new_claim_becomes_winner(self):
advance, state = self.advance, self.state
stream = self.get_stream('Claim A', 10*COIN)
stream2 = self.get_stream('Claim B', 11*COIN)
advance(13, [stream, stream2])
state(
controlling=('Claim B', 11*COIN, 11*COIN, 13),
active=[('Claim A', 10*COIN, 10*COIN, 13)],
accepted=[]
)
advance(15, [self.get_abandon(stream2), self.get_stream('Claim C', 12*COIN)])
state(
controlling=('Claim C', 12*COIN, 12*COIN, 15),
active=[('Claim A', 10*COIN, 10*COIN, 13)],
accepted=[]
)
def test_winning_claim_expires_and_another_takes_over(self):
advance, state = self.advance, self.state
advance(10, [self.get_stream('Claim A', 11*COIN)])
advance(20, [self.get_stream('Claim B', 10*COIN)])
state(
controlling=('Claim A', 11*COIN, 11*COIN, 10),
active=[('Claim B', 10*COIN, 10*COIN, 20)],
accepted=[]
)
advance(262984, [])
state(
controlling=('Claim B', 10*COIN, 10*COIN, 20),
active=[],
accepted=[]
)
advance(262994, [])
state(
controlling=None,
active=[],
accepted=[]
)
def test_create_and_update_in_same_block(self):
advance, state = self.advance, self.state
stream = self.get_stream('Claim A', 10*COIN)
advance(10, [stream, self.get_stream_update(stream, 11*COIN)])
self.assertTrue(search()[0])
def test_double_updates_in_same_block(self):
advance, state = self.advance, self.state
stream = self.get_stream('Claim A', 10*COIN)
advance(10, [stream])
update = self.get_stream_update(stream, 11*COIN)
advance(20, [update, self.get_stream_update(update, 9*COIN)])
self.assertTrue(search()[0])
def test_create_and_abandon_in_same_block(self):
advance, state = self.advance, self.state
stream = self.get_stream('Claim A', 10*COIN)
advance(10, [stream, self.get_abandon(stream)])
self.assertFalse(search())
def test_update_and_abandon_in_same_block(self):
advance, state = self.advance, self.state
stream = self.get_stream('Claim A', 10*COIN)
advance(10, [stream])
update = self.get_stream_update(stream, 11*COIN)
advance(20, [update, self.get_abandon(update)])
self.assertFalse(search())
def test_create_update_and_delete_in_same_block(self):
advance, state = self.advance, self.state
stream = self.get_stream('Claim A', 10*COIN)
update = self.get_stream_update(stream, 11*COIN)
advance(10, [stream, update, self.get_abandon(update)])
self.assertFalse(search())
def test_support_added_and_removed_in_same_block(self):
advance, state = self.advance, self.state
stream = self.get_stream('Claim A', 10*COIN)
advance(10, [stream])
support = self.get_support(stream, COIN)
advance(20, [support, self.get_abandon(support)])
self.assertEqual(search()[0]['support_amount'], 0)
@staticmethod
def _get_x_with_claim_id_prefix(getter, prefix, cached_iteration=None, **kwargs):
iterations = cached_iteration+1 if cached_iteration else 100
for i in range(cached_iteration or 1, iterations):
stream = getter(f'claim #{i}', COIN, **kwargs)
if stream[0].outputs[0].claim_id.startswith(prefix):
cached_iteration is None and print(f'Found "{prefix}" in {i} iterations.')
return stream
if cached_iteration:
raise ValueError(f'Failed to find "{prefix}" at cached iteration, run with None to find iteration.')
raise ValueError(f'Failed to find "{prefix}" in {iterations} iterations, try different values.')
def get_channel_with_claim_id_prefix(self, prefix, cached_iteration=None, **kwargs):
return self._get_x_with_claim_id_prefix(self.get_channel, prefix, cached_iteration, **kwargs)
def get_stream_with_claim_id_prefix(self, prefix, cached_iteration=None, **kwargs):
return self._get_x_with_claim_id_prefix(self.get_stream, prefix, cached_iteration, **kwargs)
def test_canonical_url_and_channel_validation(self):
advance = self.advance
tx_chan_a = self.get_channel_with_claim_id_prefix('a', 1, key=b'c')
tx_chan_ab = self.get_channel_with_claim_id_prefix('ab', 72, key=b'c')
txo_chan_a = tx_chan_a[0].outputs[0]
txo_chan_ab = tx_chan_ab[0].outputs[0]
advance(1, [tx_chan_a])
advance(2, [tx_chan_ab])
(r_ab, r_a) = search(order_by=['creation_height'], limit=2)
self.assertEqual("@foo#a", r_a['short_url'])
self.assertEqual("@foo#ab", r_ab['short_url'])
self.assertIsNone(r_a['canonical_url'])
self.assertIsNone(r_ab['canonical_url'])
self.assertEqual(0, r_a['claims_in_channel'])
self.assertEqual(0, r_ab['claims_in_channel'])
tx_a = self.get_stream_with_claim_id_prefix('a', 2)
tx_ab = self.get_stream_with_claim_id_prefix('ab', 42)
tx_abc = self.get_stream_with_claim_id_prefix('abc', 65)
advance(3, [tx_a])
advance(4, [tx_ab, tx_abc])
(r_abc, r_ab, r_a) = search(order_by=['creation_height', 'tx_position'], limit=3)
self.assertEqual("foo#a", r_a['short_url'])
self.assertEqual("foo#ab", r_ab['short_url'])
self.assertEqual("foo#abc", r_abc['short_url'])
self.assertIsNone(r_a['canonical_url'])
self.assertIsNone(r_ab['canonical_url'])
self.assertIsNone(r_abc['canonical_url'])
tx_a2 = self.get_stream_with_claim_id_prefix('a', 7, channel=txo_chan_a)
tx_ab2 = self.get_stream_with_claim_id_prefix('ab', 23, channel=txo_chan_a)
a2_claim = tx_a2[0].outputs[0]
ab2_claim = tx_ab2[0].outputs[0]
advance(6, [tx_a2])
advance(7, [tx_ab2])
(r_ab2, r_a2) = search(order_by=['creation_height'], limit=2)
self.assertEqual(f"foo#{a2_claim.claim_id[:2]}", r_a2['short_url'])
self.assertEqual(f"foo#{ab2_claim.claim_id[:4]}", r_ab2['short_url'])
self.assertEqual("@foo#a/foo#a", r_a2['canonical_url'])
self.assertEqual("@foo#a/foo#ab", r_ab2['canonical_url'])
self.assertEqual(2, search(claim_id=txo_chan_a.claim_id, limit=1)[0]['claims_in_channel'])
# change channel public key, invaliding stream claim signatures
advance(8, [self.get_channel_update(txo_chan_a, COIN, key=b'a')])
(r_ab2, r_a2) = search(order_by=['creation_height'], limit=2)
self.assertEqual(f"foo#{a2_claim.claim_id[:2]}", r_a2['short_url'])
self.assertEqual(f"foo#{ab2_claim.claim_id[:4]}", r_ab2['short_url'])
self.assertIsNone(r_a2['canonical_url'])
self.assertIsNone(r_ab2['canonical_url'])
self.assertEqual(0, search(claim_id=txo_chan_a.claim_id, limit=1)[0]['claims_in_channel'])
# reinstate previous channel public key (previous stream claim signatures become valid again)
channel_update = self.get_channel_update(txo_chan_a, COIN, key=b'c')
advance(9, [channel_update])
(r_ab2, r_a2) = search(order_by=['creation_height'], limit=2)
self.assertEqual(f"foo#{a2_claim.claim_id[:2]}", r_a2['short_url'])
self.assertEqual(f"foo#{ab2_claim.claim_id[:4]}", r_ab2['short_url'])
self.assertEqual("@foo#a/foo#a", r_a2['canonical_url'])
self.assertEqual("@foo#a/foo#ab", r_ab2['canonical_url'])
self.assertEqual(2, search(claim_id=txo_chan_a.claim_id, limit=1)[0]['claims_in_channel'])
self.assertEqual(0, search(claim_id=txo_chan_ab.claim_id, limit=1)[0]['claims_in_channel'])
# change channel of stream
self.assertEqual("@foo#a/foo#ab", search(claim_id=ab2_claim.claim_id, limit=1)[0]['canonical_url'])
tx_ab2 = self.get_stream_update(tx_ab2, COIN, txo_chan_ab)
advance(10, [tx_ab2])
self.assertEqual("@foo#ab/foo#a", search(claim_id=ab2_claim.claim_id, limit=1)[0]['canonical_url'])
# TODO: currently there is a bug where stream leaving a channel does not update that channels claims count
self.assertEqual(2, search(claim_id=txo_chan_a.claim_id, limit=1)[0]['claims_in_channel'])
# TODO: after bug is fixed remove test above and add test below
#self.assertEqual(1, search(claim_id=txo_chan_a.claim_id, limit=1)[0]['claims_in_channel'])
self.assertEqual(1, search(claim_id=txo_chan_ab.claim_id, limit=1)[0]['claims_in_channel'])
# claim abandon updates claims_in_channel
advance(11, [self.get_abandon(tx_ab2)])
self.assertEqual(0, search(claim_id=txo_chan_ab.claim_id, limit=1)[0]['claims_in_channel'])
# delete channel, invaliding stream claim signatures
advance(12, [self.get_abandon(channel_update)])
(r_a2,) = search(order_by=['creation_height'], limit=1)
self.assertEqual(f"foo#{a2_claim.claim_id[:2]}", r_a2['short_url'])
self.assertIsNone(r_a2['canonical_url'])
def test_resolve_issue_2448(self):
advance = self.advance
tx_chan_a = self.get_channel_with_claim_id_prefix('a', 1, key=b'c')
tx_chan_ab = self.get_channel_with_claim_id_prefix('ab', 72, key=b'c')
txo_chan_a = tx_chan_a[0].outputs[0]
txo_chan_ab = tx_chan_ab[0].outputs[0]
advance(1, [tx_chan_a])
advance(2, [tx_chan_ab])
self.assertEqual(reader.resolve_url("@foo#a")['claim_hash'], txo_chan_a.claim_hash)
self.assertEqual(reader.resolve_url("@foo#ab")['claim_hash'], txo_chan_ab.claim_hash)
# update increase last height change of channel
advance(9, [self.get_channel_update(txo_chan_a, COIN, key=b'c')])
# make sure that activation_height is used instead of height (issue #2448)
self.assertEqual(reader.resolve_url("@foo#a")['claim_hash'], txo_chan_a.claim_hash)
self.assertEqual(reader.resolve_url("@foo#ab")['claim_hash'], txo_chan_ab.claim_hash)
def test_canonical_find_shortest_id(self):
new_hash = 'abcdef0123456789beef'
other0 = '1bcdef0123456789beef'
other1 = 'ab1def0123456789beef'
other2 = 'abc1ef0123456789beef'
other3 = 'abcdef0123456789bee1'
f = FindShortestID()
f.step(other0, new_hash)
self.assertEqual('#a', f.finalize())
f.step(other1, new_hash)
self.assertEqual('#abc', f.finalize())
f.step(other2, new_hash)
self.assertEqual('#abcd', f.finalize())
f.step(other3, new_hash)
self.assertEqual('#abcdef0123456789beef', f.finalize())
@unittest.skip("port trending tests to ES") # TODO: port trending tests to ES
class TestTrending(TestSQLDB):
def test_trending(self):
advance, state = self.advance, self.state
no_trend = self.get_stream('Claim A', COIN)
downwards = self.get_stream('Claim B', COIN)
up_small = self.get_stream('Claim C', COIN)
up_medium = self.get_stream('Claim D', COIN)
up_biggly = self.get_stream('Claim E', COIN)
claims = advance(1, [up_biggly, up_medium, up_small, no_trend, downwards])
for window in range(1, 8):
advance(zscore.TRENDING_WINDOW * window, [
self.get_support(downwards, (20-window)*COIN),
self.get_support(up_small, int(20+(window/10)*COIN)),
self.get_support(up_medium, (20+(window*(2 if window == 7 else 1)))*COIN),
self.get_support(up_biggly, (20+(window*(3 if window == 7 else 1)))*COIN),
])
results = search(order_by=['trending_local'])
self.assertEqual([c.claim_id for c in claims], [hexlify(c['claim_hash'][::-1]).decode() for c in results])
self.assertEqual([10, 6, 2, 0, -2], [int(c['trending_local']) for c in results])
self.assertEqual([53, 38, -32, 0, -6], [int(c['trending_global']) for c in results])
self.assertEqual([4, 4, 2, 0, 1], [int(c['trending_group']) for c in results])
self.assertEqual([53, 38, 2, 0, -6], [int(c['trending_mixed']) for c in results])
def test_edge(self):
problematic = self.get_stream('Problem', COIN)
self.advance(1, [problematic])
self.advance(zscore.TRENDING_WINDOW, [self.get_support(problematic, 53000000000)])
self.advance(zscore.TRENDING_WINDOW * 2, [self.get_support(problematic, 500000000)])
@unittest.skip("filtering/blocking is applied during ES sync, this needs to be ported to integration test")
class TestContentBlocking(TestSQLDB):
def test_blocking_and_filtering(self):
# content claims and channels
tx0 = self.get_channel('A Channel', COIN, '@channel1')
regular_channel = tx0[0].outputs[0]
tx1 = self.get_stream('Claim One', COIN, 'claim1')
tx2 = self.get_stream('Claim Two', COIN, 'claim2', regular_channel)
tx3 = self.get_stream('Claim Three', COIN, 'claim3')
self.advance(1, [tx0, tx1, tx2, tx3])
claim1, claim2, claim3 = tx1[0].outputs[0], tx2[0].outputs[0], tx3[0].outputs[0]
# block and filter channels
tx0 = self.get_channel('Blocking Channel', COIN, '@block')
tx1 = self.get_channel('Filtering Channel', COIN, '@filter')
blocking_channel = tx0[0].outputs[0]
filtering_channel = tx1[0].outputs[0]
self.sql.blocking_channel_hashes.add(blocking_channel.claim_hash)
self.sql.filtering_channel_hashes.add(filtering_channel.claim_hash)
self.advance(2, [tx0, tx1])
self.assertEqual({}, dict(self.sql.blocked_streams))
self.assertEqual({}, dict(self.sql.blocked_channels))
self.assertEqual({}, dict(self.sql.filtered_streams))
self.assertEqual({}, dict(self.sql.filtered_channels))
# nothing blocked
results, _ = reader.resolve([
claim1.claim_name, claim2.claim_name,
claim3.claim_name, regular_channel.claim_name
])
self.assertEqual(claim1.claim_hash, results[0]['claim_hash'])
self.assertEqual(claim2.claim_hash, results[1]['claim_hash'])
self.assertEqual(claim3.claim_hash, results[2]['claim_hash'])
self.assertEqual(regular_channel.claim_hash, results[3]['claim_hash'])
# nothing filtered
results, censor = censored_search()
self.assertEqual(6, len(results))
self.assertEqual(0, censor.total)
self.assertEqual({}, censor.censored)
# block claim reposted to blocking channel, also gets filtered
repost_tx1 = self.get_repost(claim1.claim_id, COIN, blocking_channel)
repost1 = repost_tx1[0].outputs[0]
self.advance(3, [repost_tx1])
self.assertEqual(
{repost1.claim.repost.reference.claim_hash: blocking_channel.claim_hash},
dict(self.sql.blocked_streams)
)
self.assertEqual({}, dict(self.sql.blocked_channels))
self.assertEqual(
{repost1.claim.repost.reference.claim_hash: blocking_channel.claim_hash},
dict(self.sql.filtered_streams)
)
self.assertEqual({}, dict(self.sql.filtered_channels))
# claim is blocked from results by direct repost
results, censor = censored_search(text='Claim')
self.assertEqual(2, len(results))
self.assertEqual(claim2.claim_hash, results[0]['claim_hash'])
self.assertEqual(claim3.claim_hash, results[1]['claim_hash'])
self.assertEqual(1, censor.total)
self.assertEqual({blocking_channel.claim_hash: 1}, censor.censored)
results, _ = reader.resolve([claim1.claim_name])
self.assertEqual(
f"Resolve of 'claim1' was censored by channel with claim id '{blocking_channel.claim_id}'.",
results[0].args[0]
)
results, _ = reader.resolve([
claim2.claim_name, regular_channel.claim_name # claim2 and channel still resolved
])
self.assertEqual(claim2.claim_hash, results[0]['claim_hash'])
self.assertEqual(regular_channel.claim_hash, results[1]['claim_hash'])
# block claim indirectly by blocking its parent channel
repost_tx2 = self.get_repost(regular_channel.claim_id, COIN, blocking_channel)
repost2 = repost_tx2[0].outputs[0]
self.advance(4, [repost_tx2])
self.assertEqual(
{repost1.claim.repost.reference.claim_hash: blocking_channel.claim_hash},
dict(self.sql.blocked_streams)
)
self.assertEqual(
{repost2.claim.repost.reference.claim_hash: blocking_channel.claim_hash},
dict(self.sql.blocked_channels)
)
self.assertEqual(
{repost1.claim.repost.reference.claim_hash: blocking_channel.claim_hash},
dict(self.sql.filtered_streams)
)
self.assertEqual(
{repost2.claim.repost.reference.claim_hash: blocking_channel.claim_hash},
dict(self.sql.filtered_channels)
)
# claim in blocked channel is filtered from search and can't resolve
results, censor = censored_search(text='Claim')
self.assertEqual(1, len(results))
self.assertEqual(claim3.claim_hash, results[0]['claim_hash'])
self.assertEqual(2, censor.total)
self.assertEqual({blocking_channel.claim_hash: 2}, censor.censored)
results, _ = reader.resolve([
claim2.claim_name, regular_channel.claim_name # claim2 and channel don't resolve
])
self.assertEqual(
f"Resolve of 'claim2' was censored by channel with claim id '{blocking_channel.claim_id}'.",
results[0].args[0]
)
self.assertEqual(
f"Resolve of '@channel1' was censored by channel with claim id '{blocking_channel.claim_id}'.",
results[1].args[0]
)
results, _ = reader.resolve([claim3.claim_name]) # claim3 still resolved
self.assertEqual(claim3.claim_hash, results[0]['claim_hash'])
# filtered claim is only filtered and not blocked
repost_tx3 = self.get_repost(claim3.claim_id, COIN, filtering_channel)
repost3 = repost_tx3[0].outputs[0]
self.advance(5, [repost_tx3])
self.assertEqual(
{repost1.claim.repost.reference.claim_hash: blocking_channel.claim_hash},
dict(self.sql.blocked_streams)
)
self.assertEqual(
{repost2.claim.repost.reference.claim_hash: blocking_channel.claim_hash},
dict(self.sql.blocked_channels)
)
self.assertEqual(
{repost1.claim.repost.reference.claim_hash: blocking_channel.claim_hash,
repost3.claim.repost.reference.claim_hash: filtering_channel.claim_hash},
dict(self.sql.filtered_streams)
)
self.assertEqual(
{repost2.claim.repost.reference.claim_hash: blocking_channel.claim_hash},
dict(self.sql.filtered_channels)
)
# filtered claim doesn't return in search but is resolveable
results, censor = censored_search(text='Claim')
self.assertEqual(0, len(results))
self.assertEqual(3, censor.total)
self.assertEqual({blocking_channel.claim_hash: 2, filtering_channel.claim_hash: 1}, censor.censored)
results, _ = reader.resolve([claim3.claim_name]) # claim3 still resolved
self.assertEqual(claim3.claim_hash, results[0]['claim_hash'])
# abandon unblocks content
self.advance(6, [
self.get_abandon(repost_tx1),
self.get_abandon(repost_tx2),
self.get_abandon(repost_tx3)
])
self.assertEqual({}, dict(self.sql.blocked_streams))
self.assertEqual({}, dict(self.sql.blocked_channels))
self.assertEqual({}, dict(self.sql.filtered_streams))
self.assertEqual({}, dict(self.sql.filtered_channels))
results, censor = censored_search(text='Claim')
self.assertEqual(3, len(results))
self.assertEqual(0, censor.total)
results, censor = censored_search()
self.assertEqual(6, len(results))
self.assertEqual(0, censor.total)
results, _ = reader.resolve([
claim1.claim_name, claim2.claim_name,
claim3.claim_name, regular_channel.claim_name
])
self.assertEqual(claim1.claim_hash, results[0]['claim_hash'])
self.assertEqual(claim2.claim_hash, results[1]['claim_hash'])
self.assertEqual(claim3.claim_hash, results[2]['claim_hash'])
self.assertEqual(regular_channel.claim_hash, results[3]['claim_hash'])
def test_pagination(self):
one, two, three, four, five, six, seven, filter_channel = self.advance(1, [
self.get_stream('One', COIN),
self.get_stream('Two', COIN),
self.get_stream('Three', COIN),
self.get_stream('Four', COIN),
self.get_stream('Five', COIN),
self.get_stream('Six', COIN),
self.get_stream('Seven', COIN),
self.get_channel('Filtering Channel', COIN, '@filter'),
])
self.sql.filtering_channel_hashes.add(filter_channel.claim_hash)
# nothing filtered
results, censor = censored_search(order_by='^height', offset=1, limit=3)
self.assertEqual(3, len(results))
self.assertEqual(
[two.claim_hash, three.claim_hash, four.claim_hash],
[r['claim_hash'] for r in results]
)
self.assertEqual(0, censor.total)
# content filtered
repost1, repost2 = self.advance(2, [
self.get_repost(one.claim_id, COIN, filter_channel),
self.get_repost(two.claim_id, COIN, filter_channel),
])
results, censor = censored_search(order_by='^height', offset=1, limit=3)
self.assertEqual(3, len(results))
self.assertEqual(
[four.claim_hash, five.claim_hash, six.claim_hash],
[r['claim_hash'] for r in results]
)
self.assertEqual(2, censor.total)
self.assertEqual({filter_channel.claim_hash: 2}, censor.censored)