forked from LBRYCommunity/lbry-sdk
delete unused code
This commit is contained in:
parent
701b39b043
commit
d23a0a8589
6 changed files with 1 additions and 2658 deletions
|
@ -1,22 +0,0 @@
|
|||
class FindShortestID:
|
||||
__slots__ = 'short_id', 'new_id'
|
||||
|
||||
def __init__(self):
|
||||
self.short_id = ''
|
||||
self.new_id = None
|
||||
|
||||
def step(self, other_id, new_id):
|
||||
self.new_id = new_id
|
||||
for i in range(len(self.new_id)):
|
||||
if other_id[i] != self.new_id[i]:
|
||||
if i > len(self.short_id)-1:
|
||||
self.short_id = self.new_id[:i+1]
|
||||
break
|
||||
|
||||
def finalize(self):
|
||||
if self.short_id:
|
||||
return '#'+self.short_id
|
||||
|
||||
|
||||
def register_canonical_functions(connection):
|
||||
connection.create_aggregate("shortest_id", 2, FindShortestID)
|
|
@ -66,7 +66,7 @@ RANGE_FIELDS = {
|
|||
'timestamp', 'creation_timestamp', 'duration', 'release_time', 'fee_amount',
|
||||
'tx_position', 'channel_join', 'repost_count', 'limit_claims_per_channel',
|
||||
'amount', 'effective_amount', 'support_amount',
|
||||
'trending_score', 'censor_type', 'tx_num', 'trending_score_change'
|
||||
'trending_score', 'censor_type', 'tx_num'
|
||||
}
|
||||
|
||||
ALL_FIELDS = RANGE_FIELDS | TEXT_FIELDS | FIELDS
|
||||
|
|
|
@ -1,299 +0,0 @@
|
|||
import math
|
||||
import os
|
||||
import sqlite3
|
||||
import time
|
||||
|
||||
|
||||
HALF_LIFE = 400
|
||||
RENORM_INTERVAL = 1000
|
||||
WHALE_THRESHOLD = 10000.0
|
||||
|
||||
def whale_decay_factor(lbc):
|
||||
"""
|
||||
An additional decay factor applied to whale claims.
|
||||
"""
|
||||
if lbc <= WHALE_THRESHOLD:
|
||||
return 1.0
|
||||
adjusted_half_life = HALF_LIFE/(math.log10(lbc/WHALE_THRESHOLD) + 1.0)
|
||||
return 2.0**(1.0/HALF_LIFE - 1.0/adjusted_half_life)
|
||||
|
||||
|
||||
def soften(lbc):
|
||||
mag = abs(lbc) + 1E-8
|
||||
sign = 1.0 if lbc >= 0.0 else -1.0
|
||||
return sign*mag**0.25
|
||||
|
||||
def delay(lbc: int):
|
||||
if lbc <= 0:
|
||||
return 0
|
||||
elif lbc < 1000000:
|
||||
return int(lbc**0.5)
|
||||
else:
|
||||
return 1000
|
||||
|
||||
|
||||
def inflate_units(height):
|
||||
blocks = height % RENORM_INTERVAL
|
||||
return 2.0 ** (blocks/HALF_LIFE)
|
||||
|
||||
|
||||
PRAGMAS = ["PRAGMA FOREIGN_KEYS = OFF;",
|
||||
"PRAGMA JOURNAL_MODE = WAL;",
|
||||
"PRAGMA SYNCHRONOUS = 0;"]
|
||||
|
||||
|
||||
class TrendingDB:
|
||||
|
||||
def __init__(self, data_dir):
|
||||
"""
|
||||
Opens the trending database in the directory data_dir.
|
||||
For testing, pass data_dir=":memory:"
|
||||
"""
|
||||
if data_dir == ":memory:":
|
||||
path = ":memory:"
|
||||
else:
|
||||
path = os.path.join(data_dir, "trending.db")
|
||||
self.db = sqlite3.connect(path, check_same_thread=False)
|
||||
|
||||
for pragma in PRAGMAS:
|
||||
self.execute(pragma)
|
||||
self.execute("BEGIN;")
|
||||
self._create_tables()
|
||||
self._create_indices()
|
||||
self.execute("COMMIT;")
|
||||
self.pending_events = []
|
||||
|
||||
def execute(self, *args, **kwargs):
|
||||
return self.db.execute(*args, **kwargs)
|
||||
|
||||
def add_event(self, event):
|
||||
self.pending_events.append(event)
|
||||
# print(f"Added event: {event}.", flush=True)
|
||||
|
||||
|
||||
def _create_tables(self):
|
||||
|
||||
self.execute("""CREATE TABLE IF NOT EXISTS claims
|
||||
(claim_hash BYTES NOT NULL PRIMARY KEY,
|
||||
bid_lbc REAL NOT NULL,
|
||||
support_lbc REAL NOT NULL,
|
||||
trending_score REAL NOT NULL,
|
||||
needs_write BOOLEAN NOT NULL)
|
||||
WITHOUT ROWID;""")
|
||||
|
||||
self.execute("""CREATE TABLE IF NOT EXISTS spikes
|
||||
(claim_hash BYTES NOT NULL REFERENCES claims (claim_hash),
|
||||
activation_height INTEGER NOT NULL,
|
||||
mass REAL NOT NULL);""")
|
||||
|
||||
|
||||
def _create_indices(self):
|
||||
self.execute("CREATE INDEX IF NOT EXISTS idx1 ON spikes\
|
||||
(activation_height, claim_hash, mass);")
|
||||
self.execute("CREATE INDEX IF NOT EXISTS idx2 ON spikes\
|
||||
(claim_hash);")
|
||||
self.execute("CREATE INDEX IF NOT EXISTS idx3 ON claims (trending_score);")
|
||||
self.execute("CREATE INDEX IF NOT EXISTS idx4 ON claims (needs_write, claim_hash);")
|
||||
self.execute("CREATE INDEX IF NOT EXISTS idx5 ON claims (bid_lbc + support_lbc);")
|
||||
|
||||
def get_trending_score(self, claim_hash):
|
||||
result = self.execute("SELECT trending_score FROM claims\
|
||||
WHERE claim_hash = ?;", (claim_hash, ))\
|
||||
.fetchall()
|
||||
if len(result) == 0:
|
||||
return 0.0
|
||||
else:
|
||||
return result[0]
|
||||
|
||||
def _upsert_claim(self, height, event):
|
||||
|
||||
claim_hash = event["claim_hash"]
|
||||
|
||||
# Get old total lbc value of claim
|
||||
old_lbc_pair = self.execute("SELECT bid_lbc, support_lbc FROM claims\
|
||||
WHERE claim_hash = ?;",
|
||||
(claim_hash, )).fetchone()
|
||||
if old_lbc_pair is None:
|
||||
old_lbc_pair = (0.0, 0.0)
|
||||
|
||||
if event["event"] == "upsert":
|
||||
new_lbc_pair = (event["lbc"], old_lbc_pair[1])
|
||||
elif event["event"] == "support":
|
||||
new_lbc_pair = (old_lbc_pair[0], old_lbc_pair[1] + event["lbc"])
|
||||
|
||||
# Upsert the claim
|
||||
self.execute("INSERT INTO claims VALUES (?, ?, ?, ?, 1)\
|
||||
ON CONFLICT (claim_hash) DO UPDATE\
|
||||
SET bid_lbc = excluded.bid_lbc,\
|
||||
support_lbc = excluded.support_lbc;",
|
||||
(claim_hash, new_lbc_pair[0], new_lbc_pair[1], 0.0))
|
||||
|
||||
if self.active:
|
||||
old_lbc, lbc = sum(old_lbc_pair), sum(new_lbc_pair)
|
||||
|
||||
# Add the spike
|
||||
softened_change = soften(lbc - old_lbc)
|
||||
change_in_softened = soften(lbc) - soften(old_lbc)
|
||||
spike_mass = (softened_change**0.25*change_in_softened**0.75).real
|
||||
activation_height = height + delay(lbc)
|
||||
if spike_mass != 0.0:
|
||||
self.execute("INSERT INTO spikes VALUES (?, ?, ?);",
|
||||
(claim_hash, activation_height, spike_mass))
|
||||
|
||||
def _delete_claim(self, claim_hash):
|
||||
self.execute("DELETE FROM spikes WHERE claim_hash = ?;", (claim_hash, ))
|
||||
self.execute("DELETE FROM claims WHERE claim_hash = ?;", (claim_hash, ))
|
||||
|
||||
|
||||
def _apply_spikes(self, height):
|
||||
spikes = self.execute("SELECT claim_hash, mass FROM spikes\
|
||||
WHERE activation_height = ?;",
|
||||
(height, )).fetchall()
|
||||
for claim_hash, mass in spikes: # TODO: executemany for efficiency
|
||||
self.execute("UPDATE claims SET trending_score = trending_score + ?,\
|
||||
needs_write = 1\
|
||||
WHERE claim_hash = ?;",
|
||||
(mass, claim_hash))
|
||||
self.execute("DELETE FROM spikes WHERE activation_height = ?;",
|
||||
(height, ))
|
||||
|
||||
def _decay_whales(self):
|
||||
|
||||
whales = self.execute("SELECT claim_hash, bid_lbc + support_lbc FROM claims\
|
||||
WHERE bid_lbc + support_lbc >= ?;", (WHALE_THRESHOLD, ))\
|
||||
.fetchall()
|
||||
for claim_hash, lbc in whales:
|
||||
factor = whale_decay_factor(lbc)
|
||||
self.execute("UPDATE claims SET trending_score = trending_score*?, needs_write = 1\
|
||||
WHERE claim_hash = ?;", (factor, claim_hash))
|
||||
|
||||
|
||||
def _renorm(self):
|
||||
factor = 2.0**(-RENORM_INTERVAL/HALF_LIFE)
|
||||
|
||||
# Zero small values
|
||||
self.execute("UPDATE claims SET trending_score = 0.0, needs_write = 1\
|
||||
WHERE trending_score <> 0.0 AND ABS(?*trending_score) < 1E-6;",
|
||||
(factor, ))
|
||||
|
||||
# Normalise other values
|
||||
self.execute("UPDATE claims SET trending_score = ?*trending_score, needs_write = 1\
|
||||
WHERE trending_score <> 0.0;", (factor, ))
|
||||
|
||||
|
||||
def process_block(self, height, daemon_height):
|
||||
|
||||
self.active = daemon_height - height <= 10*HALF_LIFE
|
||||
|
||||
self.execute("BEGIN;")
|
||||
|
||||
if self.active:
|
||||
|
||||
# Check for a unit change
|
||||
if height % RENORM_INTERVAL == 0:
|
||||
self._renorm()
|
||||
|
||||
# Apply extra whale decay
|
||||
self._decay_whales()
|
||||
|
||||
# Upsert claims
|
||||
for event in self.pending_events:
|
||||
if event["event"] == "upsert":
|
||||
self._upsert_claim(height, event)
|
||||
|
||||
# Process supports
|
||||
for event in self.pending_events:
|
||||
if event["event"] == "support":
|
||||
self._upsert_claim(height, event)
|
||||
|
||||
# Delete claims
|
||||
for event in self.pending_events:
|
||||
if event["event"] == "delete":
|
||||
self._delete_claim(event["claim_hash"])
|
||||
|
||||
if self.active:
|
||||
# Apply spikes
|
||||
self._apply_spikes(height)
|
||||
|
||||
# Get set of claims that need writing to ES
|
||||
claims_to_write = set()
|
||||
for row in self.db.execute("SELECT claim_hash FROM claims WHERE\
|
||||
needs_write = 1;"):
|
||||
claims_to_write.add(row[0])
|
||||
self.db.execute("UPDATE claims SET needs_write = 0\
|
||||
WHERE needs_write = 1;")
|
||||
|
||||
self.execute("COMMIT;")
|
||||
|
||||
self.pending_events.clear()
|
||||
|
||||
return claims_to_write
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
import matplotlib.pyplot as plt
|
||||
import numpy as np
|
||||
import numpy.random as rng
|
||||
import os
|
||||
|
||||
trending_db = TrendingDB(":memory:")
|
||||
|
||||
heights = list(range(1, 1000))
|
||||
heights = heights + heights[::-1] + heights
|
||||
|
||||
events = [{"height": 45,
|
||||
"what": dict(claim_hash="a", event="upsert", lbc=1.0)},
|
||||
{"height": 100,
|
||||
"what": dict(claim_hash="a", event="support", lbc=3.0)},
|
||||
{"height": 150,
|
||||
"what": dict(claim_hash="a", event="support", lbc=-3.0)},
|
||||
{"height": 170,
|
||||
"what": dict(claim_hash="a", event="upsert", lbc=100000.0)},
|
||||
{"height": 730,
|
||||
"what": dict(claim_hash="a", event="delete")}]
|
||||
inverse_events = [{"height": 730,
|
||||
"what": dict(claim_hash="a", event="upsert", lbc=100000.0)},
|
||||
{"height": 170,
|
||||
"what": dict(claim_hash="a", event="upsert", lbc=1.0)},
|
||||
{"height": 150,
|
||||
"what": dict(claim_hash="a", event="support", lbc=3.0)},
|
||||
{"height": 100,
|
||||
"what": dict(claim_hash="a", event="support", lbc=-3.0)},
|
||||
{"height": 45,
|
||||
"what": dict(claim_hash="a", event="delete")}]
|
||||
|
||||
|
||||
xs, ys = [], []
|
||||
last_height = 0
|
||||
for height in heights:
|
||||
|
||||
# Prepare the changes
|
||||
if height > last_height:
|
||||
es = events
|
||||
else:
|
||||
es = inverse_events
|
||||
|
||||
for event in es:
|
||||
if event["height"] == height:
|
||||
trending_db.add_event(event["what"])
|
||||
|
||||
# Process the block
|
||||
trending_db.process_block(height, height)
|
||||
|
||||
if height > last_height: # Only plot when moving forward
|
||||
xs.append(height)
|
||||
y = trending_db.execute("SELECT trending_score FROM claims;").fetchone()
|
||||
y = 0.0 if y is None else y[0]
|
||||
ys.append(y/inflate_units(height))
|
||||
|
||||
last_height = height
|
||||
|
||||
xs = np.array(xs)
|
||||
ys = np.array(ys)
|
||||
|
||||
plt.figure(1)
|
||||
plt.plot(xs, ys, "o-", alpha=0.2)
|
||||
|
||||
plt.figure(2)
|
||||
plt.plot(xs)
|
||||
plt.show()
|
|
@ -1,955 +0,0 @@
|
|||
import os
|
||||
|
||||
import sqlite3
|
||||
from typing import Union, Tuple, Set, List
|
||||
from itertools import chain
|
||||
from decimal import Decimal
|
||||
from collections import namedtuple
|
||||
from binascii import unhexlify, hexlify
|
||||
from lbry.wallet.server.leveldb import LevelDB
|
||||
from lbry.wallet.server.util import class_logger
|
||||
from lbry.wallet.database import query, constraints_to_sql
|
||||
|
||||
from lbry.schema.tags import clean_tags
|
||||
from lbry.schema.mime_types import guess_stream_type
|
||||
from lbry.wallet import Ledger, RegTestLedger
|
||||
from lbry.wallet.transaction import Transaction, Output
|
||||
from lbry.wallet.server.db.canonical import register_canonical_functions
|
||||
from lbry.wallet.server.db.trending import TRENDING_ALGORITHMS
|
||||
|
||||
from .common import CLAIM_TYPES, STREAM_TYPES, COMMON_TAGS, INDEXED_LANGUAGES
|
||||
|
||||
ATTRIBUTE_ARRAY_MAX_LENGTH = 100
|
||||
sqlite3.enable_callback_tracebacks(True)
|
||||
|
||||
|
||||
class SQLDB:
|
||||
|
||||
PRAGMAS = """
|
||||
pragma journal_mode=WAL;
|
||||
"""
|
||||
|
||||
CREATE_CLAIM_TABLE = """
|
||||
create table if not exists claim (
|
||||
claim_hash bytes primary key,
|
||||
claim_id text not null,
|
||||
claim_name text not null,
|
||||
normalized text not null,
|
||||
txo_hash bytes not null,
|
||||
tx_position integer not null,
|
||||
amount integer not null,
|
||||
timestamp integer not null, -- last updated timestamp
|
||||
creation_timestamp integer not null,
|
||||
height integer not null, -- last updated height
|
||||
creation_height integer not null,
|
||||
activation_height integer,
|
||||
expiration_height integer not null,
|
||||
release_time integer not null,
|
||||
|
||||
short_url text not null, -- normalized#shortest-unique-claim_id
|
||||
canonical_url text, -- channel's-short_url/normalized#shortest-unique-claim_id-within-channel
|
||||
|
||||
title text,
|
||||
author text,
|
||||
description text,
|
||||
|
||||
claim_type integer,
|
||||
has_source bool,
|
||||
reposted integer default 0,
|
||||
|
||||
-- streams
|
||||
stream_type text,
|
||||
media_type text,
|
||||
fee_amount integer default 0,
|
||||
fee_currency text,
|
||||
duration integer,
|
||||
|
||||
-- reposts
|
||||
reposted_claim_hash bytes,
|
||||
|
||||
-- claims which are channels
|
||||
public_key_bytes bytes,
|
||||
public_key_hash bytes,
|
||||
claims_in_channel integer,
|
||||
|
||||
-- claims which are inside channels
|
||||
channel_hash bytes,
|
||||
channel_join integer, -- height at which claim got valid signature / joined channel
|
||||
signature bytes,
|
||||
signature_digest bytes,
|
||||
signature_valid bool,
|
||||
|
||||
effective_amount integer not null default 0,
|
||||
support_amount integer not null default 0,
|
||||
trending_group integer not null default 0,
|
||||
trending_mixed integer not null default 0,
|
||||
trending_local integer not null default 0,
|
||||
trending_global integer not null default 0
|
||||
);
|
||||
|
||||
create index if not exists claim_normalized_idx on claim (normalized, activation_height);
|
||||
create index if not exists claim_channel_hash_idx on claim (channel_hash, signature, claim_hash);
|
||||
create index if not exists claim_claims_in_channel_idx on claim (signature_valid, channel_hash, normalized);
|
||||
create index if not exists claim_txo_hash_idx on claim (txo_hash);
|
||||
create index if not exists claim_activation_height_idx on claim (activation_height, claim_hash);
|
||||
create index if not exists claim_expiration_height_idx on claim (expiration_height);
|
||||
create index if not exists claim_reposted_claim_hash_idx on claim (reposted_claim_hash);
|
||||
"""
|
||||
|
||||
CREATE_SUPPORT_TABLE = """
|
||||
create table if not exists support (
|
||||
txo_hash bytes primary key,
|
||||
tx_position integer not null,
|
||||
height integer not null,
|
||||
claim_hash bytes not null,
|
||||
amount integer not null
|
||||
);
|
||||
create index if not exists support_claim_hash_idx on support (claim_hash, height);
|
||||
"""
|
||||
|
||||
CREATE_TAG_TABLE = """
|
||||
create table if not exists tag (
|
||||
tag text not null,
|
||||
claim_hash bytes not null,
|
||||
height integer not null
|
||||
);
|
||||
create unique index if not exists tag_claim_hash_tag_idx on tag (claim_hash, tag);
|
||||
"""
|
||||
|
||||
CREATE_LANGUAGE_TABLE = """
|
||||
create table if not exists language (
|
||||
language text not null,
|
||||
claim_hash bytes not null,
|
||||
height integer not null
|
||||
);
|
||||
create unique index if not exists language_claim_hash_language_idx on language (claim_hash, language);
|
||||
"""
|
||||
|
||||
CREATE_CLAIMTRIE_TABLE = """
|
||||
create table if not exists claimtrie (
|
||||
normalized text primary key,
|
||||
claim_hash bytes not null,
|
||||
last_take_over_height integer not null
|
||||
);
|
||||
create index if not exists claimtrie_claim_hash_idx on claimtrie (claim_hash);
|
||||
"""
|
||||
|
||||
CREATE_CHANGELOG_TRIGGER = """
|
||||
create table if not exists changelog (
|
||||
claim_hash bytes primary key
|
||||
);
|
||||
create index if not exists claimtrie_claim_hash_idx on claimtrie (claim_hash);
|
||||
create trigger if not exists claim_changelog after update on claim
|
||||
begin
|
||||
insert or ignore into changelog (claim_hash) values (new.claim_hash);
|
||||
end;
|
||||
create trigger if not exists claimtrie_changelog after update on claimtrie
|
||||
begin
|
||||
insert or ignore into changelog (claim_hash) values (new.claim_hash);
|
||||
insert or ignore into changelog (claim_hash) values (old.claim_hash);
|
||||
end;
|
||||
"""
|
||||
|
||||
SEARCH_INDEXES = """
|
||||
-- used by any tag clouds
|
||||
create index if not exists tag_tag_idx on tag (tag, claim_hash);
|
||||
|
||||
-- naked order bys (no filters)
|
||||
create unique index if not exists claim_release_idx on claim (release_time, claim_hash);
|
||||
create unique index if not exists claim_trending_idx on claim (trending_group, trending_mixed, claim_hash);
|
||||
create unique index if not exists claim_effective_amount_idx on claim (effective_amount, claim_hash);
|
||||
|
||||
-- claim_type filter + order by
|
||||
create unique index if not exists claim_type_release_idx on claim (release_time, claim_type, claim_hash);
|
||||
create unique index if not exists claim_type_trending_idx on claim (trending_group, trending_mixed, claim_type, claim_hash);
|
||||
create unique index if not exists claim_type_effective_amount_idx on claim (effective_amount, claim_type, claim_hash);
|
||||
|
||||
-- stream_type filter + order by
|
||||
create unique index if not exists stream_type_release_idx on claim (stream_type, release_time, claim_hash);
|
||||
create unique index if not exists stream_type_trending_idx on claim (stream_type, trending_group, trending_mixed, claim_hash);
|
||||
create unique index if not exists stream_type_effective_amount_idx on claim (stream_type, effective_amount, claim_hash);
|
||||
|
||||
-- channel_hash filter + order by
|
||||
create unique index if not exists channel_hash_release_idx on claim (channel_hash, release_time, claim_hash);
|
||||
create unique index if not exists channel_hash_trending_idx on claim (channel_hash, trending_group, trending_mixed, claim_hash);
|
||||
create unique index if not exists channel_hash_effective_amount_idx on claim (channel_hash, effective_amount, claim_hash);
|
||||
|
||||
-- duration filter + order by
|
||||
create unique index if not exists duration_release_idx on claim (duration, release_time, claim_hash);
|
||||
create unique index if not exists duration_trending_idx on claim (duration, trending_group, trending_mixed, claim_hash);
|
||||
create unique index if not exists duration_effective_amount_idx on claim (duration, effective_amount, claim_hash);
|
||||
|
||||
-- fee_amount + order by
|
||||
create unique index if not exists fee_amount_release_idx on claim (fee_amount, release_time, claim_hash);
|
||||
create unique index if not exists fee_amount_trending_idx on claim (fee_amount, trending_group, trending_mixed, claim_hash);
|
||||
create unique index if not exists fee_amount_effective_amount_idx on claim (fee_amount, effective_amount, claim_hash);
|
||||
|
||||
-- TODO: verify that all indexes below are used
|
||||
create index if not exists claim_height_normalized_idx on claim (height, normalized asc);
|
||||
create index if not exists claim_resolve_idx on claim (normalized, claim_id);
|
||||
create index if not exists claim_id_idx on claim (claim_id, claim_hash);
|
||||
create index if not exists claim_timestamp_idx on claim (timestamp);
|
||||
create index if not exists claim_public_key_hash_idx on claim (public_key_hash);
|
||||
create index if not exists claim_signature_valid_idx on claim (signature_valid);
|
||||
"""
|
||||
|
||||
TAG_INDEXES = '\n'.join(
|
||||
f"create unique index if not exists tag_{tag_key}_idx on tag (tag, claim_hash) WHERE tag='{tag_value}';"
|
||||
for tag_value, tag_key in COMMON_TAGS.items()
|
||||
)
|
||||
|
||||
LANGUAGE_INDEXES = '\n'.join(
|
||||
f"create unique index if not exists language_{language}_idx on language (language, claim_hash) WHERE language='{language}';"
|
||||
for language in INDEXED_LANGUAGES
|
||||
)
|
||||
|
||||
CREATE_TABLES_QUERY = (
|
||||
CREATE_CLAIM_TABLE +
|
||||
CREATE_SUPPORT_TABLE +
|
||||
CREATE_CLAIMTRIE_TABLE +
|
||||
CREATE_TAG_TABLE +
|
||||
CREATE_CHANGELOG_TRIGGER +
|
||||
CREATE_LANGUAGE_TABLE
|
||||
)
|
||||
|
||||
def __init__(
|
||||
self, main, path: str, blocking_channels: list, filtering_channels: list, trending: list):
|
||||
self.main = main
|
||||
self._db_path = path
|
||||
self.db = None
|
||||
self.logger = class_logger(__name__, self.__class__.__name__)
|
||||
self.ledger = Ledger if main.coin.NET == 'mainnet' else RegTestLedger
|
||||
self.blocked_streams = None
|
||||
self.blocked_channels = None
|
||||
self.blocking_channel_hashes = {
|
||||
unhexlify(channel_id)[::-1] for channel_id in blocking_channels if channel_id
|
||||
}
|
||||
self.filtered_streams = None
|
||||
self.filtered_channels = None
|
||||
self.filtering_channel_hashes = {
|
||||
unhexlify(channel_id)[::-1] for channel_id in filtering_channels if channel_id
|
||||
}
|
||||
self.trending = trending
|
||||
self.pending_deletes = set()
|
||||
|
||||
def open(self):
|
||||
self.db = sqlite3.connect(self._db_path, isolation_level=None, check_same_thread=False, uri=True)
|
||||
|
||||
def namedtuple_factory(cursor, row):
|
||||
Row = namedtuple('Row', (d[0] for d in cursor.description))
|
||||
return Row(*row)
|
||||
self.db.row_factory = namedtuple_factory
|
||||
self.db.executescript(self.PRAGMAS)
|
||||
self.db.executescript(self.CREATE_TABLES_QUERY)
|
||||
register_canonical_functions(self.db)
|
||||
self.blocked_streams = {}
|
||||
self.blocked_channels = {}
|
||||
self.filtered_streams = {}
|
||||
self.filtered_channels = {}
|
||||
self.update_blocked_and_filtered_claims()
|
||||
for algorithm in self.trending:
|
||||
algorithm.install(self.db)
|
||||
|
||||
def close(self):
|
||||
if self.db is not None:
|
||||
self.db.close()
|
||||
|
||||
def update_blocked_and_filtered_claims(self):
|
||||
self.update_claims_from_channel_hashes(
|
||||
self.blocked_streams, self.blocked_channels, self.blocking_channel_hashes
|
||||
)
|
||||
self.update_claims_from_channel_hashes(
|
||||
self.filtered_streams, self.filtered_channels, self.filtering_channel_hashes
|
||||
)
|
||||
self.filtered_streams.update(self.blocked_streams)
|
||||
self.filtered_channels.update(self.blocked_channels)
|
||||
|
||||
def update_claims_from_channel_hashes(self, shared_streams, shared_channels, channel_hashes):
|
||||
streams, channels = {}, {}
|
||||
if channel_hashes:
|
||||
sql = query(
|
||||
"SELECT repost.channel_hash, repost.reposted_claim_hash, target.claim_type "
|
||||
"FROM claim as repost JOIN claim AS target ON (target.claim_hash=repost.reposted_claim_hash)", **{
|
||||
'repost.reposted_claim_hash__is_not_null': 1,
|
||||
'repost.channel_hash__in': channel_hashes
|
||||
}
|
||||
)
|
||||
for blocked_claim in self.execute(*sql):
|
||||
if blocked_claim.claim_type == CLAIM_TYPES['stream']:
|
||||
streams[blocked_claim.reposted_claim_hash] = blocked_claim.channel_hash
|
||||
elif blocked_claim.claim_type == CLAIM_TYPES['channel']:
|
||||
channels[blocked_claim.reposted_claim_hash] = blocked_claim.channel_hash
|
||||
shared_streams.clear()
|
||||
shared_streams.update(streams)
|
||||
shared_channels.clear()
|
||||
shared_channels.update(channels)
|
||||
|
||||
@staticmethod
|
||||
def _insert_sql(table: str, data: dict) -> Tuple[str, list]:
|
||||
columns, values = [], []
|
||||
for column, value in data.items():
|
||||
columns.append(column)
|
||||
values.append(value)
|
||||
sql = (
|
||||
f"INSERT INTO {table} ({', '.join(columns)}) "
|
||||
f"VALUES ({', '.join(['?'] * len(values))})"
|
||||
)
|
||||
return sql, values
|
||||
|
||||
@staticmethod
|
||||
def _update_sql(table: str, data: dict, where: str,
|
||||
constraints: Union[list, tuple]) -> Tuple[str, list]:
|
||||
columns, values = [], []
|
||||
for column, value in data.items():
|
||||
columns.append(f"{column} = ?")
|
||||
values.append(value)
|
||||
values.extend(constraints)
|
||||
return f"UPDATE {table} SET {', '.join(columns)} WHERE {where}", values
|
||||
|
||||
@staticmethod
|
||||
def _delete_sql(table: str, constraints: dict) -> Tuple[str, dict]:
|
||||
where, values = constraints_to_sql(constraints)
|
||||
return f"DELETE FROM {table} WHERE {where}", values
|
||||
|
||||
def execute(self, *args):
|
||||
return self.db.execute(*args)
|
||||
|
||||
def executemany(self, *args):
|
||||
return self.db.executemany(*args)
|
||||
|
||||
def begin(self):
|
||||
self.execute('begin;')
|
||||
|
||||
def commit(self):
|
||||
self.execute('commit;')
|
||||
|
||||
def _upsertable_claims(self, txos: List[Output], header, clear_first=False):
|
||||
claim_hashes, claims, tags, languages = set(), [], {}, {}
|
||||
for txo in txos:
|
||||
tx = txo.tx_ref.tx
|
||||
|
||||
try:
|
||||
assert txo.claim_name
|
||||
assert txo.normalized_name
|
||||
except:
|
||||
#self.logger.exception(f"Could not decode claim name for {tx.id}:{txo.position}.")
|
||||
continue
|
||||
|
||||
language = 'none'
|
||||
try:
|
||||
if txo.claim.is_stream and txo.claim.stream.languages:
|
||||
language = txo.claim.stream.languages[0].language
|
||||
except:
|
||||
pass
|
||||
|
||||
claim_hash = txo.claim_hash
|
||||
claim_hashes.add(claim_hash)
|
||||
claim_record = {
|
||||
'claim_hash': claim_hash,
|
||||
'claim_id': txo.claim_id,
|
||||
'claim_name': txo.claim_name,
|
||||
'normalized': txo.normalized_name,
|
||||
'txo_hash': txo.ref.hash,
|
||||
'tx_position': tx.position,
|
||||
'amount': txo.amount,
|
||||
'timestamp': header['timestamp'],
|
||||
'height': tx.height,
|
||||
'title': None,
|
||||
'description': None,
|
||||
'author': None,
|
||||
'duration': None,
|
||||
'claim_type': None,
|
||||
'has_source': False,
|
||||
'stream_type': None,
|
||||
'media_type': None,
|
||||
'release_time': None,
|
||||
'fee_currency': None,
|
||||
'fee_amount': 0,
|
||||
'reposted_claim_hash': None
|
||||
}
|
||||
claims.append(claim_record)
|
||||
|
||||
try:
|
||||
claim = txo.claim
|
||||
except:
|
||||
#self.logger.exception(f"Could not parse claim protobuf for {tx.id}:{txo.position}.")
|
||||
continue
|
||||
|
||||
if claim.is_stream:
|
||||
claim_record['claim_type'] = CLAIM_TYPES['stream']
|
||||
claim_record['has_source'] = claim.stream.has_source
|
||||
claim_record['media_type'] = claim.stream.source.media_type
|
||||
claim_record['stream_type'] = STREAM_TYPES[guess_stream_type(claim_record['media_type'])]
|
||||
claim_record['title'] = claim.stream.title
|
||||
claim_record['description'] = claim.stream.description
|
||||
claim_record['author'] = claim.stream.author
|
||||
if claim.stream.video and claim.stream.video.duration:
|
||||
claim_record['duration'] = claim.stream.video.duration
|
||||
if claim.stream.audio and claim.stream.audio.duration:
|
||||
claim_record['duration'] = claim.stream.audio.duration
|
||||
if claim.stream.release_time:
|
||||
claim_record['release_time'] = claim.stream.release_time
|
||||
if claim.stream.has_fee:
|
||||
fee = claim.stream.fee
|
||||
if isinstance(fee.currency, str):
|
||||
claim_record['fee_currency'] = fee.currency.lower()
|
||||
if isinstance(fee.amount, Decimal):
|
||||
if fee.amount >= 0 and int(fee.amount*1000) < 9223372036854775807:
|
||||
claim_record['fee_amount'] = int(fee.amount*1000)
|
||||
elif claim.is_repost:
|
||||
claim_record['claim_type'] = CLAIM_TYPES['repost']
|
||||
claim_record['reposted_claim_hash'] = claim.repost.reference.claim_hash
|
||||
elif claim.is_channel:
|
||||
claim_record['claim_type'] = CLAIM_TYPES['channel']
|
||||
elif claim.is_collection:
|
||||
claim_record['claim_type'] = CLAIM_TYPES['collection']
|
||||
|
||||
languages[(language, claim_hash)] = (language, claim_hash, tx.height)
|
||||
|
||||
for tag in clean_tags(claim.message.tags):
|
||||
tags[(tag, claim_hash)] = (tag, claim_hash, tx.height)
|
||||
|
||||
if clear_first:
|
||||
self._clear_claim_metadata(claim_hashes)
|
||||
|
||||
if tags:
|
||||
self.executemany(
|
||||
"INSERT OR IGNORE INTO tag (tag, claim_hash, height) VALUES (?, ?, ?)", tags.values()
|
||||
)
|
||||
if languages:
|
||||
self.executemany(
|
||||
"INSERT OR IGNORE INTO language (language, claim_hash, height) VALUES (?, ?, ?)", languages.values()
|
||||
)
|
||||
|
||||
return claims
|
||||
|
||||
def insert_claims(self, txos: List[Output], header):
|
||||
claims = self._upsertable_claims(txos, header)
|
||||
if claims:
|
||||
self.executemany("""
|
||||
INSERT OR REPLACE INTO claim (
|
||||
claim_hash, claim_id, claim_name, normalized, txo_hash, tx_position, amount,
|
||||
claim_type, media_type, stream_type, timestamp, creation_timestamp, has_source,
|
||||
fee_currency, fee_amount, title, description, author, duration, height, reposted_claim_hash,
|
||||
creation_height, release_time, activation_height, expiration_height, short_url)
|
||||
VALUES (
|
||||
:claim_hash, :claim_id, :claim_name, :normalized, :txo_hash, :tx_position, :amount,
|
||||
:claim_type, :media_type, :stream_type, :timestamp, :timestamp, :has_source,
|
||||
:fee_currency, :fee_amount, :title, :description, :author, :duration, :height, :reposted_claim_hash, :height,
|
||||
CASE WHEN :release_time IS NOT NULL THEN :release_time ELSE :timestamp END,
|
||||
CASE WHEN :normalized NOT IN (SELECT normalized FROM claimtrie) THEN :height END,
|
||||
CASE WHEN :height >= 137181 THEN :height+2102400 ELSE :height+262974 END,
|
||||
:claim_name||COALESCE(
|
||||
(SELECT shortest_id(claim_id, :claim_id) FROM claim WHERE normalized = :normalized),
|
||||
'#'||substr(:claim_id, 1, 1)
|
||||
)
|
||||
)""", claims)
|
||||
|
||||
def update_claims(self, txos: List[Output], header):
|
||||
claims = self._upsertable_claims(txos, header, clear_first=True)
|
||||
if claims:
|
||||
self.executemany("""
|
||||
UPDATE claim SET
|
||||
txo_hash=:txo_hash, tx_position=:tx_position, amount=:amount, height=:height,
|
||||
claim_type=:claim_type, media_type=:media_type, stream_type=:stream_type,
|
||||
timestamp=:timestamp, fee_amount=:fee_amount, fee_currency=:fee_currency, has_source=:has_source,
|
||||
title=:title, duration=:duration, description=:description, author=:author, reposted_claim_hash=:reposted_claim_hash,
|
||||
release_time=CASE WHEN :release_time IS NOT NULL THEN :release_time ELSE release_time END
|
||||
WHERE claim_hash=:claim_hash;
|
||||
""", claims)
|
||||
|
||||
def delete_claims(self, claim_hashes: Set[bytes]):
|
||||
""" Deletes claim supports and from claimtrie in case of an abandon. """
|
||||
if claim_hashes:
|
||||
affected_channels = self.execute(*query(
|
||||
"SELECT channel_hash FROM claim", channel_hash__is_not_null=1, claim_hash__in=claim_hashes
|
||||
)).fetchall()
|
||||
for table in ('claim', 'support', 'claimtrie'):
|
||||
self.execute(*self._delete_sql(table, {'claim_hash__in': claim_hashes}))
|
||||
self._clear_claim_metadata(claim_hashes)
|
||||
return {r.channel_hash for r in affected_channels}
|
||||
return set()
|
||||
|
||||
def delete_claims_above_height(self, height: int):
|
||||
claim_hashes = [x[0] for x in self.execute(
|
||||
"SELECT claim_hash FROM claim WHERE height>?", (height, )
|
||||
).fetchall()]
|
||||
while claim_hashes:
|
||||
batch = set(claim_hashes[:500])
|
||||
claim_hashes = claim_hashes[500:]
|
||||
self.delete_claims(batch)
|
||||
|
||||
def _clear_claim_metadata(self, claim_hashes: Set[bytes]):
|
||||
if claim_hashes:
|
||||
for table in ('tag',): # 'language', 'location', etc
|
||||
self.execute(*self._delete_sql(table, {'claim_hash__in': claim_hashes}))
|
||||
|
||||
def split_inputs_into_claims_supports_and_other(self, txis):
|
||||
txo_hashes = {txi.txo_ref.hash for txi in txis}
|
||||
claims = self.execute(*query(
|
||||
"SELECT txo_hash, claim_hash, normalized FROM claim", txo_hash__in=txo_hashes
|
||||
)).fetchall()
|
||||
txo_hashes -= {r.txo_hash for r in claims}
|
||||
supports = {}
|
||||
if txo_hashes:
|
||||
supports = self.execute(*query(
|
||||
"SELECT txo_hash, claim_hash FROM support", txo_hash__in=txo_hashes
|
||||
)).fetchall()
|
||||
txo_hashes -= {r.txo_hash for r in supports}
|
||||
return claims, supports, txo_hashes
|
||||
|
||||
def insert_supports(self, txos: List[Output]):
|
||||
supports = []
|
||||
for txo in txos:
|
||||
tx = txo.tx_ref.tx
|
||||
supports.append((
|
||||
txo.ref.hash, tx.position, tx.height,
|
||||
txo.claim_hash, txo.amount
|
||||
))
|
||||
if supports:
|
||||
self.executemany(
|
||||
"INSERT OR IGNORE INTO support ("
|
||||
" txo_hash, tx_position, height, claim_hash, amount"
|
||||
") "
|
||||
"VALUES (?, ?, ?, ?, ?)", supports
|
||||
)
|
||||
|
||||
def delete_supports(self, txo_hashes: Set[bytes]):
|
||||
if txo_hashes:
|
||||
self.execute(*self._delete_sql('support', {'txo_hash__in': txo_hashes}))
|
||||
|
||||
def calculate_reposts(self, txos: List[Output]):
|
||||
targets = set()
|
||||
for txo in txos:
|
||||
try:
|
||||
claim = txo.claim
|
||||
except:
|
||||
continue
|
||||
if claim.is_repost:
|
||||
targets.add((claim.repost.reference.claim_hash,))
|
||||
if targets:
|
||||
self.executemany(
|
||||
"""
|
||||
UPDATE claim SET reposted = (
|
||||
SELECT count(*) FROM claim AS repost WHERE repost.reposted_claim_hash = claim.claim_hash
|
||||
)
|
||||
WHERE claim_hash = ?
|
||||
""", targets
|
||||
)
|
||||
return {target[0] for target in targets}
|
||||
|
||||
def validate_channel_signatures(self, height, new_claims, updated_claims, spent_claims, affected_channels, timer):
|
||||
if not new_claims and not updated_claims and not spent_claims:
|
||||
return
|
||||
|
||||
sub_timer = timer.add_timer('segregate channels and signables')
|
||||
sub_timer.start()
|
||||
channels, new_channel_keys, signables = {}, {}, {}
|
||||
for txo in chain(new_claims, updated_claims):
|
||||
try:
|
||||
claim = txo.claim
|
||||
except:
|
||||
continue
|
||||
if claim.is_channel:
|
||||
channels[txo.claim_hash] = txo
|
||||
new_channel_keys[txo.claim_hash] = claim.channel.public_key_bytes
|
||||
else:
|
||||
signables[txo.claim_hash] = txo
|
||||
sub_timer.stop()
|
||||
|
||||
sub_timer = timer.add_timer('make list of channels we need to lookup')
|
||||
sub_timer.start()
|
||||
missing_channel_keys = set()
|
||||
for txo in signables.values():
|
||||
claim = txo.claim
|
||||
if claim.is_signed and claim.signing_channel_hash not in new_channel_keys:
|
||||
missing_channel_keys.add(claim.signing_channel_hash)
|
||||
sub_timer.stop()
|
||||
|
||||
sub_timer = timer.add_timer('lookup missing channels')
|
||||
sub_timer.start()
|
||||
all_channel_keys = {}
|
||||
if new_channel_keys or missing_channel_keys or affected_channels:
|
||||
all_channel_keys = dict(self.execute(*query(
|
||||
"SELECT claim_hash, public_key_bytes FROM claim",
|
||||
claim_hash__in=set(new_channel_keys) | missing_channel_keys | affected_channels
|
||||
)))
|
||||
sub_timer.stop()
|
||||
|
||||
sub_timer = timer.add_timer('prepare for updating claims')
|
||||
sub_timer.start()
|
||||
changed_channel_keys = {}
|
||||
for claim_hash, new_key in new_channel_keys.items():
|
||||
if claim_hash not in all_channel_keys or all_channel_keys[claim_hash] != new_key:
|
||||
all_channel_keys[claim_hash] = new_key
|
||||
changed_channel_keys[claim_hash] = new_key
|
||||
|
||||
claim_updates = []
|
||||
|
||||
for claim_hash, txo in signables.items():
|
||||
claim = txo.claim
|
||||
update = {
|
||||
'claim_hash': claim_hash,
|
||||
'channel_hash': None,
|
||||
'signature': None,
|
||||
'signature_digest': None,
|
||||
'signature_valid': None
|
||||
}
|
||||
if claim.is_signed:
|
||||
update.update({
|
||||
'channel_hash': claim.signing_channel_hash,
|
||||
'signature': txo.get_encoded_signature(),
|
||||
'signature_digest': txo.get_signature_digest(self.ledger),
|
||||
'signature_valid': 0
|
||||
})
|
||||
claim_updates.append(update)
|
||||
sub_timer.stop()
|
||||
|
||||
sub_timer = timer.add_timer('find claims affected by a change in channel key')
|
||||
sub_timer.start()
|
||||
if changed_channel_keys:
|
||||
sql = f"""
|
||||
SELECT * FROM claim WHERE
|
||||
channel_hash IN ({','.join('?' for _ in changed_channel_keys)}) AND
|
||||
signature IS NOT NULL
|
||||
"""
|
||||
for affected_claim in self.execute(sql, list(changed_channel_keys.keys())):
|
||||
if affected_claim.claim_hash not in signables:
|
||||
claim_updates.append({
|
||||
'claim_hash': affected_claim.claim_hash,
|
||||
'channel_hash': affected_claim.channel_hash,
|
||||
'signature': affected_claim.signature,
|
||||
'signature_digest': affected_claim.signature_digest,
|
||||
'signature_valid': 0
|
||||
})
|
||||
sub_timer.stop()
|
||||
|
||||
sub_timer = timer.add_timer('verify signatures')
|
||||
sub_timer.start()
|
||||
for update in claim_updates:
|
||||
channel_pub_key = all_channel_keys.get(update['channel_hash'])
|
||||
if channel_pub_key and update['signature']:
|
||||
update['signature_valid'] = Output.is_signature_valid(
|
||||
bytes(update['signature']), bytes(update['signature_digest']), channel_pub_key
|
||||
)
|
||||
sub_timer.stop()
|
||||
|
||||
sub_timer = timer.add_timer('update claims')
|
||||
sub_timer.start()
|
||||
if claim_updates:
|
||||
self.executemany(f"""
|
||||
UPDATE claim SET
|
||||
channel_hash=:channel_hash, signature=:signature, signature_digest=:signature_digest,
|
||||
signature_valid=:signature_valid,
|
||||
channel_join=CASE
|
||||
WHEN signature_valid=1 AND :signature_valid=1 AND channel_hash=:channel_hash THEN channel_join
|
||||
WHEN :signature_valid=1 THEN {height}
|
||||
END,
|
||||
canonical_url=CASE
|
||||
WHEN signature_valid=1 AND :signature_valid=1 AND channel_hash=:channel_hash THEN canonical_url
|
||||
WHEN :signature_valid=1 THEN
|
||||
(SELECT short_url FROM claim WHERE claim_hash=:channel_hash)||'/'||
|
||||
claim_name||COALESCE(
|
||||
(SELECT shortest_id(other_claim.claim_id, claim.claim_id) FROM claim AS other_claim
|
||||
WHERE other_claim.signature_valid = 1 AND
|
||||
other_claim.channel_hash = :channel_hash AND
|
||||
other_claim.normalized = claim.normalized),
|
||||
'#'||substr(claim_id, 1, 1)
|
||||
)
|
||||
END
|
||||
WHERE claim_hash=:claim_hash;
|
||||
""", claim_updates)
|
||||
sub_timer.stop()
|
||||
|
||||
sub_timer = timer.add_timer('update claims affected by spent channels')
|
||||
sub_timer.start()
|
||||
if spent_claims:
|
||||
self.execute(
|
||||
f"""
|
||||
UPDATE claim SET
|
||||
signature_valid=CASE WHEN signature IS NOT NULL THEN 0 END,
|
||||
channel_join=NULL, canonical_url=NULL
|
||||
WHERE channel_hash IN ({','.join('?' for _ in spent_claims)})
|
||||
""", list(spent_claims)
|
||||
)
|
||||
sub_timer.stop()
|
||||
|
||||
sub_timer = timer.add_timer('update channels')
|
||||
sub_timer.start()
|
||||
if channels:
|
||||
self.executemany(
|
||||
"""
|
||||
UPDATE claim SET
|
||||
public_key_bytes=:public_key_bytes,
|
||||
public_key_hash=:public_key_hash
|
||||
WHERE claim_hash=:claim_hash""", [{
|
||||
'claim_hash': claim_hash,
|
||||
'public_key_bytes': txo.claim.channel.public_key_bytes,
|
||||
'public_key_hash': self.ledger.address_to_hash160(
|
||||
self.ledger.public_key_to_address(txo.claim.channel.public_key_bytes)
|
||||
)
|
||||
} for claim_hash, txo in channels.items()]
|
||||
)
|
||||
sub_timer.stop()
|
||||
|
||||
sub_timer = timer.add_timer('update claims_in_channel counts')
|
||||
sub_timer.start()
|
||||
if all_channel_keys:
|
||||
self.executemany(f"""
|
||||
UPDATE claim SET
|
||||
claims_in_channel=(
|
||||
SELECT COUNT(*) FROM claim AS claim_in_channel
|
||||
WHERE claim_in_channel.signature_valid=1 AND
|
||||
claim_in_channel.channel_hash=claim.claim_hash
|
||||
)
|
||||
WHERE claim_hash = ?
|
||||
""", [(channel_hash,) for channel_hash in all_channel_keys])
|
||||
sub_timer.stop()
|
||||
|
||||
sub_timer = timer.add_timer('update blocked claims list')
|
||||
sub_timer.start()
|
||||
if (self.blocking_channel_hashes.intersection(all_channel_keys) or
|
||||
self.filtering_channel_hashes.intersection(all_channel_keys)):
|
||||
self.update_blocked_and_filtered_claims()
|
||||
sub_timer.stop()
|
||||
|
||||
def _update_support_amount(self, claim_hashes):
|
||||
if claim_hashes:
|
||||
self.execute(f"""
|
||||
UPDATE claim SET
|
||||
support_amount = COALESCE(
|
||||
(SELECT SUM(amount) FROM support WHERE support.claim_hash=claim.claim_hash), 0
|
||||
)
|
||||
WHERE claim_hash IN ({','.join('?' for _ in claim_hashes)})
|
||||
""", claim_hashes)
|
||||
|
||||
def _update_effective_amount(self, height, claim_hashes=None):
|
||||
self.execute(
|
||||
f"UPDATE claim SET effective_amount = amount + support_amount "
|
||||
f"WHERE activation_height = {height}"
|
||||
)
|
||||
if claim_hashes:
|
||||
self.execute(
|
||||
f"UPDATE claim SET effective_amount = amount + support_amount "
|
||||
f"WHERE activation_height < {height} "
|
||||
f" AND claim_hash IN ({','.join('?' for _ in claim_hashes)})",
|
||||
claim_hashes
|
||||
)
|
||||
|
||||
def _calculate_activation_height(self, height):
|
||||
last_take_over_height = f"""COALESCE(
|
||||
(SELECT last_take_over_height FROM claimtrie
|
||||
WHERE claimtrie.normalized=claim.normalized),
|
||||
{height}
|
||||
)
|
||||
"""
|
||||
self.execute(f"""
|
||||
UPDATE claim SET activation_height =
|
||||
{height} + min(4032, cast(({height} - {last_take_over_height}) / 32 AS INT))
|
||||
WHERE activation_height IS NULL
|
||||
""")
|
||||
|
||||
def _perform_overtake(self, height, changed_claim_hashes, deleted_names):
|
||||
deleted_names_sql = claim_hashes_sql = ""
|
||||
if changed_claim_hashes:
|
||||
claim_hashes_sql = f"OR claim_hash IN ({','.join('?' for _ in changed_claim_hashes)})"
|
||||
if deleted_names:
|
||||
deleted_names_sql = f"OR normalized IN ({','.join('?' for _ in deleted_names)})"
|
||||
overtakes = self.execute(f"""
|
||||
SELECT winner.normalized, winner.claim_hash,
|
||||
claimtrie.claim_hash AS current_winner,
|
||||
MAX(winner.effective_amount) AS max_winner_effective_amount
|
||||
FROM (
|
||||
SELECT normalized, claim_hash, effective_amount FROM claim
|
||||
WHERE normalized IN (
|
||||
SELECT normalized FROM claim WHERE activation_height={height} {claim_hashes_sql}
|
||||
) {deleted_names_sql}
|
||||
ORDER BY effective_amount DESC, height ASC, tx_position ASC
|
||||
) AS winner LEFT JOIN claimtrie USING (normalized)
|
||||
GROUP BY winner.normalized
|
||||
HAVING current_winner IS NULL OR current_winner <> winner.claim_hash
|
||||
""", list(changed_claim_hashes)+deleted_names)
|
||||
for overtake in overtakes:
|
||||
if overtake.current_winner:
|
||||
self.execute(
|
||||
f"UPDATE claimtrie SET claim_hash = ?, last_take_over_height = {height} "
|
||||
f"WHERE normalized = ?",
|
||||
(overtake.claim_hash, overtake.normalized)
|
||||
)
|
||||
else:
|
||||
self.execute(
|
||||
f"INSERT INTO claimtrie (claim_hash, normalized, last_take_over_height) "
|
||||
f"VALUES (?, ?, {height})",
|
||||
(overtake.claim_hash, overtake.normalized)
|
||||
)
|
||||
self.execute(
|
||||
f"UPDATE claim SET activation_height = {height} WHERE normalized = ? "
|
||||
f"AND (activation_height IS NULL OR activation_height > {height})",
|
||||
(overtake.normalized,)
|
||||
)
|
||||
|
||||
def _copy(self, height):
|
||||
if height > 50:
|
||||
self.execute(f"DROP TABLE claimtrie{height-50}")
|
||||
self.execute(f"CREATE TABLE claimtrie{height} AS SELECT * FROM claimtrie")
|
||||
|
||||
def update_claimtrie(self, height, changed_claim_hashes, deleted_names, timer):
|
||||
r = timer.run
|
||||
binary_claim_hashes = list(changed_claim_hashes)
|
||||
|
||||
r(self._calculate_activation_height, height)
|
||||
r(self._update_support_amount, binary_claim_hashes)
|
||||
|
||||
r(self._update_effective_amount, height, binary_claim_hashes)
|
||||
r(self._perform_overtake, height, binary_claim_hashes, list(deleted_names))
|
||||
|
||||
r(self._update_effective_amount, height)
|
||||
r(self._perform_overtake, height, [], [])
|
||||
|
||||
def get_expiring(self, height):
|
||||
return self.execute(
|
||||
f"SELECT claim_hash, normalized FROM claim WHERE expiration_height = {height}"
|
||||
)
|
||||
|
||||
def enqueue_changes(self):
|
||||
query = """
|
||||
SELECT claimtrie.claim_hash as is_controlling,
|
||||
claimtrie.last_take_over_height,
|
||||
(select group_concat(tag, ',,') from tag where tag.claim_hash in (claim.claim_hash, claim.reposted_claim_hash)) as tags,
|
||||
(select group_concat(language, ' ') from language where language.claim_hash in (claim.claim_hash, claim.reposted_claim_hash)) as languages,
|
||||
cr.has_source as reposted_has_source,
|
||||
cr.claim_type as reposted_claim_type,
|
||||
cr.stream_type as reposted_stream_type,
|
||||
cr.media_type as reposted_media_type,
|
||||
cr.duration as reposted_duration,
|
||||
cr.fee_amount as reposted_fee_amount,
|
||||
cr.fee_currency as reposted_fee_currency,
|
||||
claim.*
|
||||
FROM claim LEFT JOIN claimtrie USING (claim_hash) LEFT JOIN claim cr ON cr.claim_hash=claim.reposted_claim_hash
|
||||
WHERE claim.claim_hash in (SELECT claim_hash FROM changelog)
|
||||
"""
|
||||
for claim in self.execute(query):
|
||||
claim = claim._asdict()
|
||||
id_set = set(filter(None, (claim['claim_hash'], claim['channel_hash'], claim['reposted_claim_hash'])))
|
||||
claim['censor_type'] = 0
|
||||
censoring_channel_hash = None
|
||||
claim['has_source'] = bool(claim.pop('reposted_has_source') or claim['has_source'])
|
||||
claim['stream_type'] = claim.pop('reposted_stream_type') or claim['stream_type']
|
||||
claim['media_type'] = claim.pop('reposted_media_type') or claim['media_type']
|
||||
claim['fee_amount'] = claim.pop('reposted_fee_amount') or claim['fee_amount']
|
||||
claim['fee_currency'] = claim.pop('reposted_fee_currency') or claim['fee_currency']
|
||||
claim['duration'] = claim.pop('reposted_duration') or claim['duration']
|
||||
for reason_id in id_set:
|
||||
if reason_id in self.blocked_streams:
|
||||
claim['censor_type'] = 2
|
||||
censoring_channel_hash = self.blocked_streams.get(reason_id)
|
||||
elif reason_id in self.blocked_channels:
|
||||
claim['censor_type'] = 2
|
||||
censoring_channel_hash = self.blocked_channels.get(reason_id)
|
||||
elif reason_id in self.filtered_streams:
|
||||
claim['censor_type'] = 1
|
||||
censoring_channel_hash = self.filtered_streams.get(reason_id)
|
||||
elif reason_id in self.filtered_channels:
|
||||
claim['censor_type'] = 1
|
||||
censoring_channel_hash = self.filtered_channels.get(reason_id)
|
||||
claim['censoring_channel_id'] = censoring_channel_hash[::-1].hex() if censoring_channel_hash else None
|
||||
|
||||
claim['tags'] = claim['tags'].split(',,') if claim['tags'] else []
|
||||
claim['languages'] = claim['languages'].split(' ') if claim['languages'] else []
|
||||
yield 'update', claim
|
||||
|
||||
def clear_changelog(self):
|
||||
self.execute("delete from changelog;")
|
||||
|
||||
def claim_producer(self):
|
||||
while self.pending_deletes:
|
||||
claim_hash = self.pending_deletes.pop()
|
||||
yield 'delete', hexlify(claim_hash[::-1]).decode()
|
||||
for claim in self.enqueue_changes():
|
||||
yield claim
|
||||
self.clear_changelog()
|
||||
|
||||
def advance_txs(self, height, all_txs, header, daemon_height, timer):
|
||||
insert_claims = []
|
||||
update_claims = []
|
||||
update_claim_hashes = set()
|
||||
delete_claim_hashes = self.pending_deletes
|
||||
insert_supports = []
|
||||
delete_support_txo_hashes = set()
|
||||
recalculate_claim_hashes = set() # added/deleted supports, added/updated claim
|
||||
deleted_claim_names = set()
|
||||
delete_others = set()
|
||||
body_timer = timer.add_timer('body')
|
||||
for position, (etx, txid) in enumerate(all_txs):
|
||||
tx = timer.run(
|
||||
Transaction, etx.raw, height=height, position=position
|
||||
)
|
||||
# Inputs
|
||||
spent_claims, spent_supports, spent_others = timer.run(
|
||||
self.split_inputs_into_claims_supports_and_other, tx.inputs
|
||||
)
|
||||
body_timer.start()
|
||||
delete_claim_hashes.update({r.claim_hash for r in spent_claims})
|
||||
deleted_claim_names.update({r.normalized for r in spent_claims})
|
||||
delete_support_txo_hashes.update({r.txo_hash for r in spent_supports})
|
||||
recalculate_claim_hashes.update({r.claim_hash for r in spent_supports})
|
||||
delete_others.update(spent_others)
|
||||
# Outputs
|
||||
for output in tx.outputs:
|
||||
if output.is_support:
|
||||
insert_supports.append(output)
|
||||
recalculate_claim_hashes.add(output.claim_hash)
|
||||
elif output.script.is_claim_name:
|
||||
insert_claims.append(output)
|
||||
recalculate_claim_hashes.add(output.claim_hash)
|
||||
elif output.script.is_update_claim:
|
||||
claim_hash = output.claim_hash
|
||||
update_claims.append(output)
|
||||
recalculate_claim_hashes.add(claim_hash)
|
||||
body_timer.stop()
|
||||
|
||||
skip_update_claim_timer = timer.add_timer('skip update of abandoned claims')
|
||||
skip_update_claim_timer.start()
|
||||
for updated_claim in list(update_claims):
|
||||
if updated_claim.ref.hash in delete_others:
|
||||
update_claims.remove(updated_claim)
|
||||
for updated_claim in update_claims:
|
||||
claim_hash = updated_claim.claim_hash
|
||||
delete_claim_hashes.discard(claim_hash)
|
||||
update_claim_hashes.add(claim_hash)
|
||||
skip_update_claim_timer.stop()
|
||||
|
||||
skip_insert_claim_timer = timer.add_timer('skip insertion of abandoned claims')
|
||||
skip_insert_claim_timer.start()
|
||||
for new_claim in list(insert_claims):
|
||||
if new_claim.ref.hash in delete_others:
|
||||
if new_claim.claim_hash not in update_claim_hashes:
|
||||
insert_claims.remove(new_claim)
|
||||
skip_insert_claim_timer.stop()
|
||||
|
||||
skip_insert_support_timer = timer.add_timer('skip insertion of abandoned supports')
|
||||
skip_insert_support_timer.start()
|
||||
for new_support in list(insert_supports):
|
||||
if new_support.ref.hash in delete_others:
|
||||
insert_supports.remove(new_support)
|
||||
skip_insert_support_timer.stop()
|
||||
|
||||
expire_timer = timer.add_timer('recording expired claims')
|
||||
expire_timer.start()
|
||||
for expired in self.get_expiring(height):
|
||||
delete_claim_hashes.add(expired.claim_hash)
|
||||
deleted_claim_names.add(expired.normalized)
|
||||
expire_timer.stop()
|
||||
|
||||
r = timer.run
|
||||
affected_channels = r(self.delete_claims, delete_claim_hashes)
|
||||
r(self.delete_supports, delete_support_txo_hashes)
|
||||
r(self.insert_claims, insert_claims, header)
|
||||
r(self.calculate_reposts, insert_claims)
|
||||
r(self.update_claims, update_claims, header)
|
||||
r(self.validate_channel_signatures, height, insert_claims,
|
||||
update_claims, delete_claim_hashes, affected_channels, forward_timer=True)
|
||||
r(self.insert_supports, insert_supports)
|
||||
r(self.update_claimtrie, height, recalculate_claim_hashes, deleted_claim_names, forward_timer=True)
|
||||
for algorithm in self.trending:
|
||||
r(algorithm.run, self.db.cursor(), height, daemon_height, recalculate_claim_hashes)
|
|
@ -1,616 +0,0 @@
|
|||
import time
|
||||
import struct
|
||||
import sqlite3
|
||||
import logging
|
||||
from operator import itemgetter
|
||||
from typing import Tuple, List, Dict, Union, Type, Optional
|
||||
from binascii import unhexlify
|
||||
from decimal import Decimal
|
||||
from contextvars import ContextVar
|
||||
from functools import wraps
|
||||
from itertools import chain
|
||||
from dataclasses import dataclass
|
||||
|
||||
from lbry.wallet.database import query, interpolate
|
||||
from lbry.error import ResolveCensoredError
|
||||
from lbry.schema.url import URL, normalize_name
|
||||
from lbry.schema.tags import clean_tags
|
||||
from lbry.schema.result import Outputs, Censor
|
||||
from lbry.wallet import Ledger, RegTestLedger
|
||||
|
||||
from lbry.wallet.server.db.common import CLAIM_TYPES, STREAM_TYPES, COMMON_TAGS, INDEXED_LANGUAGES
|
||||
|
||||
|
||||
class SQLiteOperationalError(sqlite3.OperationalError):
|
||||
def __init__(self, metrics):
|
||||
super().__init__('sqlite query errored')
|
||||
self.metrics = metrics
|
||||
|
||||
|
||||
class SQLiteInterruptedError(sqlite3.OperationalError):
|
||||
def __init__(self, metrics):
|
||||
super().__init__('sqlite query interrupted')
|
||||
self.metrics = metrics
|
||||
|
||||
|
||||
ATTRIBUTE_ARRAY_MAX_LENGTH = 100
|
||||
sqlite3.enable_callback_tracebacks(True)
|
||||
|
||||
INTEGER_PARAMS = {
|
||||
'height', 'creation_height', 'activation_height', 'expiration_height',
|
||||
'timestamp', 'creation_timestamp', 'duration', 'release_time', 'fee_amount',
|
||||
'tx_position', 'channel_join', 'reposted', 'limit_claims_per_channel',
|
||||
'amount', 'effective_amount', 'support_amount',
|
||||
'trending_group', 'trending_mixed',
|
||||
'trending_local', 'trending_global',
|
||||
}
|
||||
|
||||
SEARCH_PARAMS = {
|
||||
'name', 'text', 'claim_id', 'claim_ids', 'txid', 'nout', 'channel', 'channel_ids', 'not_channel_ids',
|
||||
'public_key_id', 'claim_type', 'stream_types', 'media_types', 'fee_currency',
|
||||
'has_channel_signature', 'signature_valid',
|
||||
'any_tags', 'all_tags', 'not_tags', 'reposted_claim_id',
|
||||
'any_locations', 'all_locations', 'not_locations',
|
||||
'any_languages', 'all_languages', 'not_languages',
|
||||
'is_controlling', 'limit', 'offset', 'order_by',
|
||||
'no_totals', 'has_source'
|
||||
} | INTEGER_PARAMS
|
||||
|
||||
|
||||
ORDER_FIELDS = {
|
||||
'name', 'claim_hash'
|
||||
} | INTEGER_PARAMS
|
||||
|
||||
|
||||
@dataclass
|
||||
class ReaderState:
|
||||
db: sqlite3.Connection
|
||||
stack: List[List]
|
||||
metrics: Dict
|
||||
is_tracking_metrics: bool
|
||||
ledger: Type[Ledger]
|
||||
query_timeout: float
|
||||
log: logging.Logger
|
||||
blocked_streams: Dict
|
||||
blocked_channels: Dict
|
||||
filtered_streams: Dict
|
||||
filtered_channels: Dict
|
||||
|
||||
def close(self):
|
||||
self.db.close()
|
||||
|
||||
def reset_metrics(self):
|
||||
self.stack = []
|
||||
self.metrics = {}
|
||||
|
||||
def set_query_timeout(self):
|
||||
stop_at = time.perf_counter() + self.query_timeout
|
||||
|
||||
def interruptor():
|
||||
if time.perf_counter() >= stop_at:
|
||||
self.db.interrupt()
|
||||
return
|
||||
|
||||
self.db.set_progress_handler(interruptor, 100)
|
||||
|
||||
def get_resolve_censor(self) -> Censor:
|
||||
return Censor(Censor.RESOLVE)
|
||||
|
||||
def get_search_censor(self, limit_claims_per_channel: int) -> Censor:
|
||||
return Censor(Censor.SEARCH)
|
||||
|
||||
|
||||
ctx: ContextVar[Optional[ReaderState]] = ContextVar('ctx')
|
||||
|
||||
|
||||
def row_factory(cursor, row):
|
||||
return {
|
||||
k[0]: (set(row[i].split(',')) if k[0] == 'tags' else row[i])
|
||||
for i, k in enumerate(cursor.description)
|
||||
}
|
||||
|
||||
|
||||
def initializer(log, _path, _ledger_name, query_timeout, _measure=False, block_and_filter=None):
|
||||
db = sqlite3.connect(_path, isolation_level=None, uri=True)
|
||||
db.row_factory = row_factory
|
||||
if block_and_filter:
|
||||
blocked_streams, blocked_channels, filtered_streams, filtered_channels = block_and_filter
|
||||
else:
|
||||
blocked_streams = blocked_channels = filtered_streams = filtered_channels = {}
|
||||
ctx.set(
|
||||
ReaderState(
|
||||
db=db, stack=[], metrics={}, is_tracking_metrics=_measure,
|
||||
ledger=Ledger if _ledger_name == 'mainnet' else RegTestLedger,
|
||||
query_timeout=query_timeout, log=log,
|
||||
blocked_streams=blocked_streams, blocked_channels=blocked_channels,
|
||||
filtered_streams=filtered_streams, filtered_channels=filtered_channels,
|
||||
)
|
||||
)
|
||||
|
||||
|
||||
def cleanup():
|
||||
ctx.get().close()
|
||||
ctx.set(None)
|
||||
|
||||
|
||||
def measure(func):
|
||||
@wraps(func)
|
||||
def wrapper(*args, **kwargs):
|
||||
state = ctx.get()
|
||||
if not state.is_tracking_metrics:
|
||||
return func(*args, **kwargs)
|
||||
metric = {}
|
||||
state.metrics.setdefault(func.__name__, []).append(metric)
|
||||
state.stack.append([])
|
||||
start = time.perf_counter()
|
||||
try:
|
||||
return func(*args, **kwargs)
|
||||
finally:
|
||||
elapsed = int((time.perf_counter()-start)*1000)
|
||||
metric['total'] = elapsed
|
||||
metric['isolated'] = (elapsed-sum(state.stack.pop()))
|
||||
if state.stack:
|
||||
state.stack[-1].append(elapsed)
|
||||
return wrapper
|
||||
|
||||
|
||||
def reports_metrics(func):
|
||||
@wraps(func)
|
||||
def wrapper(*args, **kwargs):
|
||||
state = ctx.get()
|
||||
if not state.is_tracking_metrics:
|
||||
return func(*args, **kwargs)
|
||||
state.reset_metrics()
|
||||
r = func(*args, **kwargs)
|
||||
return r, state.metrics
|
||||
return wrapper
|
||||
|
||||
|
||||
@reports_metrics
|
||||
def search_to_bytes(constraints) -> Union[bytes, Tuple[bytes, Dict]]:
|
||||
return encode_result(search(constraints))
|
||||
|
||||
|
||||
@reports_metrics
|
||||
def resolve_to_bytes(urls) -> Union[bytes, Tuple[bytes, Dict]]:
|
||||
return encode_result(resolve(urls))
|
||||
|
||||
|
||||
def encode_result(result):
|
||||
return Outputs.to_bytes(*result)
|
||||
|
||||
|
||||
@measure
|
||||
def execute_query(sql, values, row_offset: int, row_limit: int, censor: Censor) -> List:
|
||||
context = ctx.get()
|
||||
context.set_query_timeout()
|
||||
try:
|
||||
rows = context.db.execute(sql, values).fetchall()
|
||||
return rows[row_offset:row_limit]
|
||||
except sqlite3.OperationalError as err:
|
||||
plain_sql = interpolate(sql, values)
|
||||
if context.is_tracking_metrics:
|
||||
context.metrics['execute_query'][-1]['sql'] = plain_sql
|
||||
context.log.exception('failed running query', exc_info=err)
|
||||
raise SQLiteOperationalError(context.metrics)
|
||||
|
||||
|
||||
def claims_query(cols, for_count=False, **constraints) -> Tuple[str, Dict]:
|
||||
if 'order_by' in constraints:
|
||||
order_by_parts = constraints['order_by']
|
||||
if isinstance(order_by_parts, str):
|
||||
order_by_parts = [order_by_parts]
|
||||
sql_order_by = []
|
||||
for order_by in order_by_parts:
|
||||
is_asc = order_by.startswith('^')
|
||||
column = order_by[1:] if is_asc else order_by
|
||||
if column not in ORDER_FIELDS:
|
||||
raise NameError(f'{column} is not a valid order_by field')
|
||||
if column == 'name':
|
||||
column = 'normalized'
|
||||
sql_order_by.append(
|
||||
f"claim.{column} ASC" if is_asc else f"claim.{column} DESC"
|
||||
)
|
||||
constraints['order_by'] = sql_order_by
|
||||
|
||||
ops = {'<=': '__lte', '>=': '__gte', '<': '__lt', '>': '__gt'}
|
||||
for constraint in INTEGER_PARAMS:
|
||||
if constraint in constraints:
|
||||
value = constraints.pop(constraint)
|
||||
postfix = ''
|
||||
if isinstance(value, str):
|
||||
if len(value) >= 2 and value[:2] in ops:
|
||||
postfix, value = ops[value[:2]], value[2:]
|
||||
elif len(value) >= 1 and value[0] in ops:
|
||||
postfix, value = ops[value[0]], value[1:]
|
||||
if constraint == 'fee_amount':
|
||||
value = Decimal(value)*1000
|
||||
constraints[f'claim.{constraint}{postfix}'] = int(value)
|
||||
|
||||
if constraints.pop('is_controlling', False):
|
||||
if {'sequence', 'amount_order'}.isdisjoint(constraints):
|
||||
for_count = False
|
||||
constraints['claimtrie.claim_hash__is_not_null'] = ''
|
||||
if 'sequence' in constraints:
|
||||
constraints['order_by'] = 'claim.activation_height ASC'
|
||||
constraints['offset'] = int(constraints.pop('sequence')) - 1
|
||||
constraints['limit'] = 1
|
||||
if 'amount_order' in constraints:
|
||||
constraints['order_by'] = 'claim.effective_amount DESC'
|
||||
constraints['offset'] = int(constraints.pop('amount_order')) - 1
|
||||
constraints['limit'] = 1
|
||||
|
||||
if 'claim_id' in constraints:
|
||||
claim_id = constraints.pop('claim_id')
|
||||
if len(claim_id) == 40:
|
||||
constraints['claim.claim_id'] = claim_id
|
||||
else:
|
||||
constraints['claim.claim_id__like'] = f'{claim_id[:40]}%'
|
||||
elif 'claim_ids' in constraints:
|
||||
constraints['claim.claim_id__in'] = set(constraints.pop('claim_ids'))
|
||||
|
||||
if 'reposted_claim_id' in constraints:
|
||||
constraints['claim.reposted_claim_hash'] = unhexlify(constraints.pop('reposted_claim_id'))[::-1]
|
||||
|
||||
if 'name' in constraints:
|
||||
constraints['claim.normalized'] = normalize_name(constraints.pop('name'))
|
||||
|
||||
if 'public_key_id' in constraints:
|
||||
constraints['claim.public_key_hash'] = (
|
||||
ctx.get().ledger.address_to_hash160(constraints.pop('public_key_id')))
|
||||
if 'channel_hash' in constraints:
|
||||
constraints['claim.channel_hash'] = constraints.pop('channel_hash')
|
||||
if 'channel_ids' in constraints:
|
||||
channel_ids = constraints.pop('channel_ids')
|
||||
if channel_ids:
|
||||
constraints['claim.channel_hash__in'] = {
|
||||
unhexlify(cid)[::-1] for cid in channel_ids if cid
|
||||
}
|
||||
if 'not_channel_ids' in constraints:
|
||||
not_channel_ids = constraints.pop('not_channel_ids')
|
||||
if not_channel_ids:
|
||||
not_channel_ids_binary = {
|
||||
unhexlify(ncid)[::-1] for ncid in not_channel_ids
|
||||
}
|
||||
constraints['claim.claim_hash__not_in#not_channel_ids'] = not_channel_ids_binary
|
||||
if constraints.get('has_channel_signature', False):
|
||||
constraints['claim.channel_hash__not_in'] = not_channel_ids_binary
|
||||
else:
|
||||
constraints['null_or_not_channel__or'] = {
|
||||
'claim.signature_valid__is_null': True,
|
||||
'claim.channel_hash__not_in': not_channel_ids_binary
|
||||
}
|
||||
if 'signature_valid' in constraints:
|
||||
has_channel_signature = constraints.pop('has_channel_signature', False)
|
||||
if has_channel_signature:
|
||||
constraints['claim.signature_valid'] = constraints.pop('signature_valid')
|
||||
else:
|
||||
constraints['null_or_signature__or'] = {
|
||||
'claim.signature_valid__is_null': True,
|
||||
'claim.signature_valid': constraints.pop('signature_valid')
|
||||
}
|
||||
elif constraints.pop('has_channel_signature', False):
|
||||
constraints['claim.signature_valid__is_not_null'] = True
|
||||
|
||||
if 'txid' in constraints:
|
||||
tx_hash = unhexlify(constraints.pop('txid'))[::-1]
|
||||
nout = constraints.pop('nout', 0)
|
||||
constraints['claim.txo_hash'] = tx_hash + struct.pack('<I', nout)
|
||||
|
||||
if 'claim_type' in constraints:
|
||||
claim_types = constraints.pop('claim_type')
|
||||
if isinstance(claim_types, str):
|
||||
claim_types = [claim_types]
|
||||
if claim_types:
|
||||
constraints['claim.claim_type__in'] = {
|
||||
CLAIM_TYPES[claim_type] for claim_type in claim_types
|
||||
}
|
||||
if 'stream_types' in constraints:
|
||||
stream_types = constraints.pop('stream_types')
|
||||
if stream_types:
|
||||
constraints['claim.stream_type__in'] = {
|
||||
STREAM_TYPES[stream_type] for stream_type in stream_types
|
||||
}
|
||||
if 'media_types' in constraints:
|
||||
media_types = constraints.pop('media_types')
|
||||
if media_types:
|
||||
constraints['claim.media_type__in'] = set(media_types)
|
||||
|
||||
if 'fee_currency' in constraints:
|
||||
constraints['claim.fee_currency'] = constraints.pop('fee_currency').lower()
|
||||
|
||||
_apply_constraints_for_array_attributes(constraints, 'tag', clean_tags, for_count)
|
||||
_apply_constraints_for_array_attributes(constraints, 'language', lambda _: _, for_count)
|
||||
_apply_constraints_for_array_attributes(constraints, 'location', lambda _: _, for_count)
|
||||
|
||||
select = f"SELECT {cols} FROM claim"
|
||||
if not for_count:
|
||||
select += " LEFT JOIN claimtrie USING (claim_hash)"
|
||||
return query(select, **constraints)
|
||||
|
||||
|
||||
def select_claims(censor: Censor, cols: str, for_count=False, **constraints) -> List:
|
||||
if 'channel' in constraints:
|
||||
channel_url = constraints.pop('channel')
|
||||
match = resolve_url(channel_url)
|
||||
if isinstance(match, dict):
|
||||
constraints['channel_hash'] = match['claim_hash']
|
||||
else:
|
||||
return [{'row_count': 0}] if cols == 'count(*) as row_count' else []
|
||||
row_offset = constraints.pop('offset', 0)
|
||||
row_limit = constraints.pop('limit', 20)
|
||||
sql, values = claims_query(cols, for_count, **constraints)
|
||||
return execute_query(sql, values, row_offset, row_limit, censor)
|
||||
|
||||
|
||||
@measure
|
||||
def count_claims(**constraints) -> int:
|
||||
constraints.pop('offset', None)
|
||||
constraints.pop('limit', None)
|
||||
constraints.pop('order_by', None)
|
||||
count = select_claims(Censor(Censor.SEARCH), 'count(*) as row_count', for_count=True, **constraints)
|
||||
return count[0]['row_count']
|
||||
|
||||
|
||||
def search_claims(censor: Censor, **constraints) -> List:
|
||||
return select_claims(
|
||||
censor,
|
||||
"""
|
||||
claimtrie.claim_hash as is_controlling,
|
||||
claimtrie.last_take_over_height,
|
||||
claim.claim_hash, claim.txo_hash,
|
||||
claim.claims_in_channel, claim.reposted,
|
||||
claim.height, claim.creation_height,
|
||||
claim.activation_height, claim.expiration_height,
|
||||
claim.effective_amount, claim.support_amount,
|
||||
claim.trending_group, claim.trending_mixed,
|
||||
claim.trending_local, claim.trending_global,
|
||||
claim.short_url, claim.canonical_url,
|
||||
claim.channel_hash, claim.reposted_claim_hash,
|
||||
claim.signature_valid
|
||||
""", **constraints
|
||||
)
|
||||
|
||||
|
||||
def _get_referenced_rows(txo_rows: List[dict], censor_channels: List[bytes]):
|
||||
censor = ctx.get().get_resolve_censor()
|
||||
repost_hashes = set(filter(None, map(itemgetter('reposted_claim_hash'), txo_rows)))
|
||||
channel_hashes = set(chain(
|
||||
filter(None, map(itemgetter('channel_hash'), txo_rows)),
|
||||
censor_channels
|
||||
))
|
||||
|
||||
reposted_txos = []
|
||||
if repost_hashes:
|
||||
reposted_txos = search_claims(censor, **{'claim.claim_hash__in': repost_hashes})
|
||||
channel_hashes |= set(filter(None, map(itemgetter('channel_hash'), reposted_txos)))
|
||||
|
||||
channel_txos = []
|
||||
if channel_hashes:
|
||||
channel_txos = search_claims(censor, **{'claim.claim_hash__in': channel_hashes})
|
||||
|
||||
# channels must come first for client side inflation to work properly
|
||||
return channel_txos + reposted_txos
|
||||
|
||||
@measure
|
||||
def search(constraints) -> Tuple[List, List, int, int, Censor]:
|
||||
assert set(constraints).issubset(SEARCH_PARAMS), \
|
||||
f"Search query contains invalid arguments: {set(constraints).difference(SEARCH_PARAMS)}"
|
||||
total = None
|
||||
limit_claims_per_channel = constraints.pop('limit_claims_per_channel', None)
|
||||
if not constraints.pop('no_totals', False):
|
||||
total = count_claims(**constraints)
|
||||
constraints['offset'] = abs(constraints.get('offset', 0))
|
||||
constraints['limit'] = min(abs(constraints.get('limit', 10)), 50)
|
||||
context = ctx.get()
|
||||
search_censor = context.get_search_censor(limit_claims_per_channel)
|
||||
txo_rows = search_claims(search_censor, **constraints)
|
||||
extra_txo_rows = _get_referenced_rows(txo_rows, search_censor.censored.keys())
|
||||
return txo_rows, extra_txo_rows, constraints['offset'], total, search_censor
|
||||
|
||||
|
||||
@measure
|
||||
def resolve(urls) -> Tuple[List, List]:
|
||||
txo_rows = [resolve_url(raw_url) for raw_url in urls]
|
||||
extra_txo_rows = _get_referenced_rows(
|
||||
[txo for txo in txo_rows if isinstance(txo, dict)],
|
||||
[txo.censor_id for txo in txo_rows if isinstance(txo, ResolveCensoredError)]
|
||||
)
|
||||
return txo_rows, extra_txo_rows
|
||||
|
||||
|
||||
@measure
|
||||
def resolve_url(raw_url):
|
||||
censor = ctx.get().get_resolve_censor()
|
||||
|
||||
try:
|
||||
url = URL.parse(raw_url)
|
||||
except ValueError as e:
|
||||
return e
|
||||
|
||||
channel = None
|
||||
|
||||
if url.has_channel:
|
||||
query = url.channel.to_dict()
|
||||
if set(query) == {'name'}:
|
||||
query['is_controlling'] = True
|
||||
else:
|
||||
query['order_by'] = ['^creation_height']
|
||||
matches = search_claims(censor, **query, limit=1)
|
||||
if matches:
|
||||
channel = matches[0]
|
||||
elif censor.censored:
|
||||
return ResolveCensoredError(raw_url, next(iter(censor.censored)))
|
||||
else:
|
||||
return LookupError(f'Could not find channel in "{raw_url}".')
|
||||
|
||||
if url.has_stream:
|
||||
query = url.stream.to_dict()
|
||||
if channel is not None:
|
||||
if set(query) == {'name'}:
|
||||
# temporarily emulate is_controlling for claims in channel
|
||||
query['order_by'] = ['effective_amount', '^height']
|
||||
else:
|
||||
query['order_by'] = ['^channel_join']
|
||||
query['channel_hash'] = channel['claim_hash']
|
||||
query['signature_valid'] = 1
|
||||
elif set(query) == {'name'}:
|
||||
query['is_controlling'] = 1
|
||||
matches = search_claims(censor, **query, limit=1)
|
||||
if matches:
|
||||
return matches[0]
|
||||
elif censor.censored:
|
||||
return ResolveCensoredError(raw_url, next(iter(censor.censored)))
|
||||
else:
|
||||
return LookupError(f'Could not find claim at "{raw_url}".')
|
||||
|
||||
return channel
|
||||
|
||||
|
||||
CLAIM_HASH_OR_REPOST_HASH_SQL = f"""
|
||||
CASE WHEN claim.claim_type = {CLAIM_TYPES['repost']}
|
||||
THEN claim.reposted_claim_hash
|
||||
ELSE claim.claim_hash
|
||||
END
|
||||
"""
|
||||
|
||||
|
||||
def _apply_constraints_for_array_attributes(constraints, attr, cleaner, for_count=False):
|
||||
any_items = set(cleaner(constraints.pop(f'any_{attr}s', []))[:ATTRIBUTE_ARRAY_MAX_LENGTH])
|
||||
all_items = set(cleaner(constraints.pop(f'all_{attr}s', []))[:ATTRIBUTE_ARRAY_MAX_LENGTH])
|
||||
not_items = set(cleaner(constraints.pop(f'not_{attr}s', []))[:ATTRIBUTE_ARRAY_MAX_LENGTH])
|
||||
|
||||
all_items = {item for item in all_items if item not in not_items}
|
||||
any_items = {item for item in any_items if item not in not_items}
|
||||
|
||||
any_queries = {}
|
||||
|
||||
if attr == 'tag':
|
||||
common_tags = any_items & COMMON_TAGS.keys()
|
||||
if common_tags:
|
||||
any_items -= common_tags
|
||||
if len(common_tags) < 5:
|
||||
for item in common_tags:
|
||||
index_name = COMMON_TAGS[item]
|
||||
any_queries[f'#_common_tag_{index_name}'] = f"""
|
||||
EXISTS(
|
||||
SELECT 1 FROM tag INDEXED BY tag_{index_name}_idx
|
||||
WHERE {CLAIM_HASH_OR_REPOST_HASH_SQL}=tag.claim_hash
|
||||
AND tag = '{item}'
|
||||
)
|
||||
"""
|
||||
elif len(common_tags) >= 5:
|
||||
constraints.update({
|
||||
f'$any_common_tag{i}': item for i, item in enumerate(common_tags)
|
||||
})
|
||||
values = ', '.join(
|
||||
f':$any_common_tag{i}' for i in range(len(common_tags))
|
||||
)
|
||||
any_queries[f'#_any_common_tags'] = f"""
|
||||
EXISTS(
|
||||
SELECT 1 FROM tag WHERE {CLAIM_HASH_OR_REPOST_HASH_SQL}=tag.claim_hash
|
||||
AND tag IN ({values})
|
||||
)
|
||||
"""
|
||||
elif attr == 'language':
|
||||
indexed_languages = any_items & set(INDEXED_LANGUAGES)
|
||||
if indexed_languages:
|
||||
any_items -= indexed_languages
|
||||
for language in indexed_languages:
|
||||
any_queries[f'#_any_common_languages_{language}'] = f"""
|
||||
EXISTS(
|
||||
SELECT 1 FROM language INDEXED BY language_{language}_idx
|
||||
WHERE {CLAIM_HASH_OR_REPOST_HASH_SQL}=language.claim_hash
|
||||
AND language = '{language}'
|
||||
)
|
||||
"""
|
||||
|
||||
if any_items:
|
||||
|
||||
constraints.update({
|
||||
f'$any_{attr}{i}': item for i, item in enumerate(any_items)
|
||||
})
|
||||
values = ', '.join(
|
||||
f':$any_{attr}{i}' for i in range(len(any_items))
|
||||
)
|
||||
if for_count or attr == 'tag':
|
||||
if attr == 'tag':
|
||||
any_queries[f'#_any_{attr}'] = f"""
|
||||
((claim.claim_type != {CLAIM_TYPES['repost']}
|
||||
AND claim.claim_hash IN (SELECT claim_hash FROM tag WHERE tag IN ({values}))) OR
|
||||
(claim.claim_type == {CLAIM_TYPES['repost']} AND
|
||||
claim.reposted_claim_hash IN (SELECT claim_hash FROM tag WHERE tag IN ({values}))))
|
||||
"""
|
||||
else:
|
||||
any_queries[f'#_any_{attr}'] = f"""
|
||||
{CLAIM_HASH_OR_REPOST_HASH_SQL} IN (
|
||||
SELECT claim_hash FROM {attr} WHERE {attr} IN ({values})
|
||||
)
|
||||
"""
|
||||
else:
|
||||
any_queries[f'#_any_{attr}'] = f"""
|
||||
EXISTS(
|
||||
SELECT 1 FROM {attr} WHERE
|
||||
{CLAIM_HASH_OR_REPOST_HASH_SQL}={attr}.claim_hash
|
||||
AND {attr} IN ({values})
|
||||
)
|
||||
"""
|
||||
|
||||
if len(any_queries) == 1:
|
||||
constraints.update(any_queries)
|
||||
elif len(any_queries) > 1:
|
||||
constraints[f'ORed_{attr}_queries__any'] = any_queries
|
||||
|
||||
if all_items:
|
||||
constraints[f'$all_{attr}_count'] = len(all_items)
|
||||
constraints.update({
|
||||
f'$all_{attr}{i}': item for i, item in enumerate(all_items)
|
||||
})
|
||||
values = ', '.join(
|
||||
f':$all_{attr}{i}' for i in range(len(all_items))
|
||||
)
|
||||
if for_count:
|
||||
constraints[f'#_all_{attr}'] = f"""
|
||||
{CLAIM_HASH_OR_REPOST_HASH_SQL} IN (
|
||||
SELECT claim_hash FROM {attr} WHERE {attr} IN ({values})
|
||||
GROUP BY claim_hash HAVING COUNT({attr}) = :$all_{attr}_count
|
||||
)
|
||||
"""
|
||||
else:
|
||||
constraints[f'#_all_{attr}'] = f"""
|
||||
{len(all_items)}=(
|
||||
SELECT count(*) FROM {attr} WHERE
|
||||
{CLAIM_HASH_OR_REPOST_HASH_SQL}={attr}.claim_hash
|
||||
AND {attr} IN ({values})
|
||||
)
|
||||
"""
|
||||
|
||||
if not_items:
|
||||
constraints.update({
|
||||
f'$not_{attr}{i}': item for i, item in enumerate(not_items)
|
||||
})
|
||||
values = ', '.join(
|
||||
f':$not_{attr}{i}' for i in range(len(not_items))
|
||||
)
|
||||
if for_count:
|
||||
if attr == 'tag':
|
||||
constraints[f'#_not_{attr}'] = f"""
|
||||
((claim.claim_type != {CLAIM_TYPES['repost']}
|
||||
AND claim.claim_hash NOT IN (SELECT claim_hash FROM tag WHERE tag IN ({values}))) OR
|
||||
(claim.claim_type == {CLAIM_TYPES['repost']} AND
|
||||
claim.reposted_claim_hash NOT IN (SELECT claim_hash FROM tag WHERE tag IN ({values}))))
|
||||
"""
|
||||
else:
|
||||
constraints[f'#_not_{attr}'] = f"""
|
||||
{CLAIM_HASH_OR_REPOST_HASH_SQL} NOT IN (
|
||||
SELECT claim_hash FROM {attr} WHERE {attr} IN ({values})
|
||||
)
|
||||
"""
|
||||
else:
|
||||
constraints[f'#_not_{attr}'] = f"""
|
||||
NOT EXISTS(
|
||||
SELECT 1 FROM {attr} WHERE
|
||||
{CLAIM_HASH_OR_REPOST_HASH_SQL}={attr}.claim_hash
|
||||
AND {attr} IN ({values})
|
||||
)
|
||||
"""
|
|
@ -1,765 +0,0 @@
|
|||
import unittest
|
||||
import ecdsa
|
||||
import hashlib
|
||||
import logging
|
||||
from binascii import hexlify
|
||||
from typing import List, Tuple
|
||||
|
||||
from lbry.wallet.constants import COIN, NULL_HASH32
|
||||
from lbry.schema.claim import Claim
|
||||
from lbry.schema.result import Censor
|
||||
from lbry.wallet.server.db import writer
|
||||
from lbry.wallet.server.coin import LBCRegTest
|
||||
from lbry.wallet.server.db.trending import zscore
|
||||
from lbry.wallet.server.db.canonical import FindShortestID
|
||||
from lbry.wallet.transaction import Transaction, Input, Output
|
||||
try:
|
||||
import reader
|
||||
except:
|
||||
from . import reader
|
||||
|
||||
|
||||
def get_output(amount=COIN, pubkey_hash=NULL_HASH32):
|
||||
return Transaction() \
|
||||
.add_outputs([Output.pay_pubkey_hash(amount, pubkey_hash)]) \
|
||||
.outputs[0]
|
||||
|
||||
|
||||
def get_input():
|
||||
return Input.spend(get_output())
|
||||
|
||||
|
||||
def get_tx():
|
||||
return Transaction().add_inputs([get_input()])
|
||||
|
||||
|
||||
def search(**constraints) -> List:
|
||||
return reader.search_claims(Censor(Censor.SEARCH), **constraints)
|
||||
|
||||
|
||||
def censored_search(**constraints) -> Tuple[List, Censor]:
|
||||
rows, _, _, _, censor = reader.search(constraints)
|
||||
return rows, censor
|
||||
|
||||
|
||||
class TestSQLDB(unittest.TestCase):
|
||||
query_timeout = 0.25
|
||||
|
||||
def setUp(self):
|
||||
self.first_sync = False
|
||||
self.daemon_height = 1
|
||||
self.coin = LBCRegTest()
|
||||
db_url = 'file:test_sqldb?mode=memory&cache=shared'
|
||||
self.sql = writer.SQLDB(self, db_url, [], [], [zscore])
|
||||
self.addCleanup(self.sql.close)
|
||||
self.sql.open()
|
||||
reader.initializer(
|
||||
logging.getLogger(__name__), db_url, 'regtest',
|
||||
self.query_timeout, block_and_filter=(
|
||||
self.sql.blocked_streams, self.sql.blocked_channels,
|
||||
self.sql.filtered_streams, self.sql.filtered_channels
|
||||
)
|
||||
)
|
||||
self.addCleanup(reader.cleanup)
|
||||
self._current_height = 0
|
||||
self._txos = {}
|
||||
|
||||
def _make_tx(self, output, txi=None):
|
||||
tx = get_tx().add_outputs([output])
|
||||
if txi is not None:
|
||||
tx.add_inputs([txi])
|
||||
self._txos[output.ref.hash] = output
|
||||
return tx, tx.hash
|
||||
|
||||
def _set_channel_key(self, channel, key):
|
||||
private_key = ecdsa.SigningKey.from_string(key*32, curve=ecdsa.SECP256k1, hashfunc=hashlib.sha256)
|
||||
channel.private_key = private_key
|
||||
channel.claim.channel.public_key_bytes = private_key.get_verifying_key().to_der()
|
||||
channel.script.generate()
|
||||
|
||||
def get_channel(self, title, amount, name='@foo', key=b'a'):
|
||||
claim = Claim()
|
||||
claim.channel.title = title
|
||||
channel = Output.pay_claim_name_pubkey_hash(amount, name, claim, b'abc')
|
||||
self._set_channel_key(channel, key)
|
||||
return self._make_tx(channel)
|
||||
|
||||
def get_channel_update(self, channel, amount, key=b'a'):
|
||||
self._set_channel_key(channel, key)
|
||||
return self._make_tx(
|
||||
Output.pay_update_claim_pubkey_hash(
|
||||
amount, channel.claim_name, channel.claim_id, channel.claim, b'abc'
|
||||
),
|
||||
Input.spend(channel)
|
||||
)
|
||||
|
||||
def get_stream(self, title, amount, name='foo', channel=None, **kwargs):
|
||||
claim = Claim()
|
||||
claim.stream.update(title=title, **kwargs)
|
||||
result = self._make_tx(Output.pay_claim_name_pubkey_hash(amount, name, claim, b'abc'))
|
||||
if channel:
|
||||
result[0].outputs[0].sign(channel)
|
||||
result[0]._reset()
|
||||
return result
|
||||
|
||||
def get_stream_update(self, tx, amount, channel=None):
|
||||
stream = Transaction(tx[0].raw).outputs[0]
|
||||
result = self._make_tx(
|
||||
Output.pay_update_claim_pubkey_hash(
|
||||
amount, stream.claim_name, stream.claim_id, stream.claim, b'abc'
|
||||
),
|
||||
Input.spend(stream)
|
||||
)
|
||||
if channel:
|
||||
result[0].outputs[0].sign(channel)
|
||||
result[0]._reset()
|
||||
return result
|
||||
|
||||
def get_repost(self, claim_id, amount, channel):
|
||||
claim = Claim()
|
||||
claim.repost.reference.claim_id = claim_id
|
||||
result = self._make_tx(Output.pay_claim_name_pubkey_hash(amount, 'repost', claim, b'abc'))
|
||||
result[0].outputs[0].sign(channel)
|
||||
result[0]._reset()
|
||||
return result
|
||||
|
||||
def get_abandon(self, tx):
|
||||
claim = Transaction(tx[0].raw).outputs[0]
|
||||
return self._make_tx(
|
||||
Output.pay_pubkey_hash(claim.amount, b'abc'),
|
||||
Input.spend(claim)
|
||||
)
|
||||
|
||||
def get_support(self, tx, amount):
|
||||
claim = Transaction(tx[0].raw).outputs[0]
|
||||
return self._make_tx(
|
||||
Output.pay_support_pubkey_hash(
|
||||
amount, claim.claim_name, claim.claim_id, b'abc'
|
||||
)
|
||||
)
|
||||
|
||||
def get_controlling(self):
|
||||
for claim in self.sql.execute("select claim.* from claimtrie natural join claim"):
|
||||
txo = self._txos[claim.txo_hash]
|
||||
controlling = txo.claim.stream.title, claim.amount, claim.effective_amount, claim.activation_height
|
||||
return controlling
|
||||
|
||||
def get_active(self):
|
||||
controlling = self.get_controlling()
|
||||
active = []
|
||||
for claim in self.sql.execute(
|
||||
f"select * from claim where activation_height <= {self._current_height}"):
|
||||
txo = self._txos[claim.txo_hash]
|
||||
if controlling and controlling[0] == txo.claim.stream.title:
|
||||
continue
|
||||
active.append((txo.claim.stream.title, claim.amount, claim.effective_amount, claim.activation_height))
|
||||
return active
|
||||
|
||||
def get_accepted(self):
|
||||
accepted = []
|
||||
for claim in self.sql.execute(
|
||||
f"select * from claim where activation_height > {self._current_height}"):
|
||||
txo = self._txos[claim.txo_hash]
|
||||
accepted.append((txo.claim.stream.title, claim.amount, claim.effective_amount, claim.activation_height))
|
||||
return accepted
|
||||
|
||||
def advance(self, height, txs):
|
||||
self._current_height = height
|
||||
self.sql.advance_txs(height, txs, {'timestamp': 1}, self.daemon_height, self.timer)
|
||||
return [otx[0].outputs[0] for otx in txs]
|
||||
|
||||
def state(self, controlling=None, active=None, accepted=None):
|
||||
self.assertEqual(controlling, self.get_controlling())
|
||||
self.assertEqual(active or [], self.get_active())
|
||||
self.assertEqual(accepted or [], self.get_accepted())
|
||||
|
||||
|
||||
@unittest.skip("port canonical url tests to leveldb") # TODO: port canonical url tests to leveldb
|
||||
class TestClaimtrie(TestSQLDB):
|
||||
|
||||
def test_example_from_spec(self):
|
||||
# https://spec.lbry.com/#claim-activation-example
|
||||
advance, state = self.advance, self.state
|
||||
stream = self.get_stream('Claim A', 10*COIN)
|
||||
advance(13, [stream])
|
||||
state(
|
||||
controlling=('Claim A', 10*COIN, 10*COIN, 13),
|
||||
active=[],
|
||||
accepted=[]
|
||||
)
|
||||
advance(1001, [self.get_stream('Claim B', 20*COIN)])
|
||||
state(
|
||||
controlling=('Claim A', 10*COIN, 10*COIN, 13),
|
||||
active=[],
|
||||
accepted=[('Claim B', 20*COIN, 0, 1031)]
|
||||
)
|
||||
advance(1010, [self.get_support(stream, 14*COIN)])
|
||||
state(
|
||||
controlling=('Claim A', 10*COIN, 24*COIN, 13),
|
||||
active=[],
|
||||
accepted=[('Claim B', 20*COIN, 0, 1031)]
|
||||
)
|
||||
advance(1020, [self.get_stream('Claim C', 50*COIN)])
|
||||
state(
|
||||
controlling=('Claim A', 10*COIN, 24*COIN, 13),
|
||||
active=[],
|
||||
accepted=[
|
||||
('Claim B', 20*COIN, 0, 1031),
|
||||
('Claim C', 50*COIN, 0, 1051)]
|
||||
)
|
||||
advance(1031, [])
|
||||
state(
|
||||
controlling=('Claim A', 10*COIN, 24*COIN, 13),
|
||||
active=[('Claim B', 20*COIN, 20*COIN, 1031)],
|
||||
accepted=[('Claim C', 50*COIN, 0, 1051)]
|
||||
)
|
||||
advance(1040, [self.get_stream('Claim D', 300*COIN)])
|
||||
state(
|
||||
controlling=('Claim A', 10*COIN, 24*COIN, 13),
|
||||
active=[('Claim B', 20*COIN, 20*COIN, 1031)],
|
||||
accepted=[
|
||||
('Claim C', 50*COIN, 0, 1051),
|
||||
('Claim D', 300*COIN, 0, 1072)]
|
||||
)
|
||||
advance(1051, [])
|
||||
state(
|
||||
controlling=('Claim D', 300*COIN, 300*COIN, 1051),
|
||||
active=[
|
||||
('Claim A', 10*COIN, 24*COIN, 13),
|
||||
('Claim B', 20*COIN, 20*COIN, 1031),
|
||||
('Claim C', 50*COIN, 50*COIN, 1051)],
|
||||
accepted=[]
|
||||
)
|
||||
# beyond example
|
||||
advance(1052, [self.get_stream_update(stream, 290*COIN)])
|
||||
state(
|
||||
controlling=('Claim A', 290*COIN, 304*COIN, 13),
|
||||
active=[
|
||||
('Claim B', 20*COIN, 20*COIN, 1031),
|
||||
('Claim C', 50*COIN, 50*COIN, 1051),
|
||||
('Claim D', 300*COIN, 300*COIN, 1051),
|
||||
],
|
||||
accepted=[]
|
||||
)
|
||||
|
||||
def test_competing_claims_subsequent_blocks_height_wins(self):
|
||||
advance, state = self.advance, self.state
|
||||
advance(13, [self.get_stream('Claim A', 10*COIN)])
|
||||
state(
|
||||
controlling=('Claim A', 10*COIN, 10*COIN, 13),
|
||||
active=[],
|
||||
accepted=[]
|
||||
)
|
||||
advance(14, [self.get_stream('Claim B', 10*COIN)])
|
||||
state(
|
||||
controlling=('Claim A', 10*COIN, 10*COIN, 13),
|
||||
active=[('Claim B', 10*COIN, 10*COIN, 14)],
|
||||
accepted=[]
|
||||
)
|
||||
advance(15, [self.get_stream('Claim C', 10*COIN)])
|
||||
state(
|
||||
controlling=('Claim A', 10*COIN, 10*COIN, 13),
|
||||
active=[
|
||||
('Claim B', 10*COIN, 10*COIN, 14),
|
||||
('Claim C', 10*COIN, 10*COIN, 15)],
|
||||
accepted=[]
|
||||
)
|
||||
|
||||
def test_competing_claims_in_single_block_position_wins(self):
|
||||
advance, state = self.advance, self.state
|
||||
stream = self.get_stream('Claim A', 10*COIN)
|
||||
stream2 = self.get_stream('Claim B', 10*COIN)
|
||||
advance(13, [stream, stream2])
|
||||
state(
|
||||
controlling=('Claim A', 10*COIN, 10*COIN, 13),
|
||||
active=[('Claim B', 10*COIN, 10*COIN, 13)],
|
||||
accepted=[]
|
||||
)
|
||||
|
||||
def test_competing_claims_in_single_block_effective_amount_wins(self):
|
||||
advance, state = self.advance, self.state
|
||||
stream = self.get_stream('Claim A', 10*COIN)
|
||||
stream2 = self.get_stream('Claim B', 11*COIN)
|
||||
advance(13, [stream, stream2])
|
||||
state(
|
||||
controlling=('Claim B', 11*COIN, 11*COIN, 13),
|
||||
active=[('Claim A', 10*COIN, 10*COIN, 13)],
|
||||
accepted=[]
|
||||
)
|
||||
|
||||
def test_winning_claim_deleted(self):
|
||||
advance, state = self.advance, self.state
|
||||
stream = self.get_stream('Claim A', 10*COIN)
|
||||
stream2 = self.get_stream('Claim B', 11*COIN)
|
||||
advance(13, [stream, stream2])
|
||||
state(
|
||||
controlling=('Claim B', 11*COIN, 11*COIN, 13),
|
||||
active=[('Claim A', 10*COIN, 10*COIN, 13)],
|
||||
accepted=[]
|
||||
)
|
||||
advance(14, [self.get_abandon(stream2)])
|
||||
state(
|
||||
controlling=('Claim A', 10*COIN, 10*COIN, 13),
|
||||
active=[],
|
||||
accepted=[]
|
||||
)
|
||||
|
||||
def test_winning_claim_deleted_and_new_claim_becomes_winner(self):
|
||||
advance, state = self.advance, self.state
|
||||
stream = self.get_stream('Claim A', 10*COIN)
|
||||
stream2 = self.get_stream('Claim B', 11*COIN)
|
||||
advance(13, [stream, stream2])
|
||||
state(
|
||||
controlling=('Claim B', 11*COIN, 11*COIN, 13),
|
||||
active=[('Claim A', 10*COIN, 10*COIN, 13)],
|
||||
accepted=[]
|
||||
)
|
||||
advance(15, [self.get_abandon(stream2), self.get_stream('Claim C', 12*COIN)])
|
||||
state(
|
||||
controlling=('Claim C', 12*COIN, 12*COIN, 15),
|
||||
active=[('Claim A', 10*COIN, 10*COIN, 13)],
|
||||
accepted=[]
|
||||
)
|
||||
|
||||
def test_winning_claim_expires_and_another_takes_over(self):
|
||||
advance, state = self.advance, self.state
|
||||
advance(10, [self.get_stream('Claim A', 11*COIN)])
|
||||
advance(20, [self.get_stream('Claim B', 10*COIN)])
|
||||
state(
|
||||
controlling=('Claim A', 11*COIN, 11*COIN, 10),
|
||||
active=[('Claim B', 10*COIN, 10*COIN, 20)],
|
||||
accepted=[]
|
||||
)
|
||||
advance(262984, [])
|
||||
state(
|
||||
controlling=('Claim B', 10*COIN, 10*COIN, 20),
|
||||
active=[],
|
||||
accepted=[]
|
||||
)
|
||||
advance(262994, [])
|
||||
state(
|
||||
controlling=None,
|
||||
active=[],
|
||||
accepted=[]
|
||||
)
|
||||
|
||||
def test_create_and_update_in_same_block(self):
|
||||
advance, state = self.advance, self.state
|
||||
stream = self.get_stream('Claim A', 10*COIN)
|
||||
advance(10, [stream, self.get_stream_update(stream, 11*COIN)])
|
||||
self.assertTrue(search()[0])
|
||||
|
||||
def test_double_updates_in_same_block(self):
|
||||
advance, state = self.advance, self.state
|
||||
stream = self.get_stream('Claim A', 10*COIN)
|
||||
advance(10, [stream])
|
||||
update = self.get_stream_update(stream, 11*COIN)
|
||||
advance(20, [update, self.get_stream_update(update, 9*COIN)])
|
||||
self.assertTrue(search()[0])
|
||||
|
||||
def test_create_and_abandon_in_same_block(self):
|
||||
advance, state = self.advance, self.state
|
||||
stream = self.get_stream('Claim A', 10*COIN)
|
||||
advance(10, [stream, self.get_abandon(stream)])
|
||||
self.assertFalse(search())
|
||||
|
||||
def test_update_and_abandon_in_same_block(self):
|
||||
advance, state = self.advance, self.state
|
||||
stream = self.get_stream('Claim A', 10*COIN)
|
||||
advance(10, [stream])
|
||||
update = self.get_stream_update(stream, 11*COIN)
|
||||
advance(20, [update, self.get_abandon(update)])
|
||||
self.assertFalse(search())
|
||||
|
||||
def test_create_update_and_delete_in_same_block(self):
|
||||
advance, state = self.advance, self.state
|
||||
stream = self.get_stream('Claim A', 10*COIN)
|
||||
update = self.get_stream_update(stream, 11*COIN)
|
||||
advance(10, [stream, update, self.get_abandon(update)])
|
||||
self.assertFalse(search())
|
||||
|
||||
def test_support_added_and_removed_in_same_block(self):
|
||||
advance, state = self.advance, self.state
|
||||
stream = self.get_stream('Claim A', 10*COIN)
|
||||
advance(10, [stream])
|
||||
support = self.get_support(stream, COIN)
|
||||
advance(20, [support, self.get_abandon(support)])
|
||||
self.assertEqual(search()[0]['support_amount'], 0)
|
||||
|
||||
@staticmethod
|
||||
def _get_x_with_claim_id_prefix(getter, prefix, cached_iteration=None, **kwargs):
|
||||
iterations = cached_iteration+1 if cached_iteration else 100
|
||||
for i in range(cached_iteration or 1, iterations):
|
||||
stream = getter(f'claim #{i}', COIN, **kwargs)
|
||||
if stream[0].outputs[0].claim_id.startswith(prefix):
|
||||
cached_iteration is None and print(f'Found "{prefix}" in {i} iterations.')
|
||||
return stream
|
||||
if cached_iteration:
|
||||
raise ValueError(f'Failed to find "{prefix}" at cached iteration, run with None to find iteration.')
|
||||
raise ValueError(f'Failed to find "{prefix}" in {iterations} iterations, try different values.')
|
||||
|
||||
def get_channel_with_claim_id_prefix(self, prefix, cached_iteration=None, **kwargs):
|
||||
return self._get_x_with_claim_id_prefix(self.get_channel, prefix, cached_iteration, **kwargs)
|
||||
|
||||
def get_stream_with_claim_id_prefix(self, prefix, cached_iteration=None, **kwargs):
|
||||
return self._get_x_with_claim_id_prefix(self.get_stream, prefix, cached_iteration, **kwargs)
|
||||
|
||||
def test_canonical_url_and_channel_validation(self):
|
||||
advance = self.advance
|
||||
|
||||
tx_chan_a = self.get_channel_with_claim_id_prefix('a', 1, key=b'c')
|
||||
tx_chan_ab = self.get_channel_with_claim_id_prefix('ab', 72, key=b'c')
|
||||
txo_chan_a = tx_chan_a[0].outputs[0]
|
||||
txo_chan_ab = tx_chan_ab[0].outputs[0]
|
||||
advance(1, [tx_chan_a])
|
||||
advance(2, [tx_chan_ab])
|
||||
(r_ab, r_a) = search(order_by=['creation_height'], limit=2)
|
||||
self.assertEqual("@foo#a", r_a['short_url'])
|
||||
self.assertEqual("@foo#ab", r_ab['short_url'])
|
||||
self.assertIsNone(r_a['canonical_url'])
|
||||
self.assertIsNone(r_ab['canonical_url'])
|
||||
self.assertEqual(0, r_a['claims_in_channel'])
|
||||
self.assertEqual(0, r_ab['claims_in_channel'])
|
||||
|
||||
tx_a = self.get_stream_with_claim_id_prefix('a', 2)
|
||||
tx_ab = self.get_stream_with_claim_id_prefix('ab', 42)
|
||||
tx_abc = self.get_stream_with_claim_id_prefix('abc', 65)
|
||||
advance(3, [tx_a])
|
||||
advance(4, [tx_ab, tx_abc])
|
||||
(r_abc, r_ab, r_a) = search(order_by=['creation_height', 'tx_position'], limit=3)
|
||||
self.assertEqual("foo#a", r_a['short_url'])
|
||||
self.assertEqual("foo#ab", r_ab['short_url'])
|
||||
self.assertEqual("foo#abc", r_abc['short_url'])
|
||||
self.assertIsNone(r_a['canonical_url'])
|
||||
self.assertIsNone(r_ab['canonical_url'])
|
||||
self.assertIsNone(r_abc['canonical_url'])
|
||||
|
||||
tx_a2 = self.get_stream_with_claim_id_prefix('a', 7, channel=txo_chan_a)
|
||||
tx_ab2 = self.get_stream_with_claim_id_prefix('ab', 23, channel=txo_chan_a)
|
||||
a2_claim = tx_a2[0].outputs[0]
|
||||
ab2_claim = tx_ab2[0].outputs[0]
|
||||
advance(6, [tx_a2])
|
||||
advance(7, [tx_ab2])
|
||||
(r_ab2, r_a2) = search(order_by=['creation_height'], limit=2)
|
||||
self.assertEqual(f"foo#{a2_claim.claim_id[:2]}", r_a2['short_url'])
|
||||
self.assertEqual(f"foo#{ab2_claim.claim_id[:4]}", r_ab2['short_url'])
|
||||
self.assertEqual("@foo#a/foo#a", r_a2['canonical_url'])
|
||||
self.assertEqual("@foo#a/foo#ab", r_ab2['canonical_url'])
|
||||
self.assertEqual(2, search(claim_id=txo_chan_a.claim_id, limit=1)[0]['claims_in_channel'])
|
||||
|
||||
# change channel public key, invaliding stream claim signatures
|
||||
advance(8, [self.get_channel_update(txo_chan_a, COIN, key=b'a')])
|
||||
(r_ab2, r_a2) = search(order_by=['creation_height'], limit=2)
|
||||
self.assertEqual(f"foo#{a2_claim.claim_id[:2]}", r_a2['short_url'])
|
||||
self.assertEqual(f"foo#{ab2_claim.claim_id[:4]}", r_ab2['short_url'])
|
||||
self.assertIsNone(r_a2['canonical_url'])
|
||||
self.assertIsNone(r_ab2['canonical_url'])
|
||||
self.assertEqual(0, search(claim_id=txo_chan_a.claim_id, limit=1)[0]['claims_in_channel'])
|
||||
|
||||
# reinstate previous channel public key (previous stream claim signatures become valid again)
|
||||
channel_update = self.get_channel_update(txo_chan_a, COIN, key=b'c')
|
||||
advance(9, [channel_update])
|
||||
(r_ab2, r_a2) = search(order_by=['creation_height'], limit=2)
|
||||
self.assertEqual(f"foo#{a2_claim.claim_id[:2]}", r_a2['short_url'])
|
||||
self.assertEqual(f"foo#{ab2_claim.claim_id[:4]}", r_ab2['short_url'])
|
||||
self.assertEqual("@foo#a/foo#a", r_a2['canonical_url'])
|
||||
self.assertEqual("@foo#a/foo#ab", r_ab2['canonical_url'])
|
||||
self.assertEqual(2, search(claim_id=txo_chan_a.claim_id, limit=1)[0]['claims_in_channel'])
|
||||
self.assertEqual(0, search(claim_id=txo_chan_ab.claim_id, limit=1)[0]['claims_in_channel'])
|
||||
|
||||
# change channel of stream
|
||||
self.assertEqual("@foo#a/foo#ab", search(claim_id=ab2_claim.claim_id, limit=1)[0]['canonical_url'])
|
||||
tx_ab2 = self.get_stream_update(tx_ab2, COIN, txo_chan_ab)
|
||||
advance(10, [tx_ab2])
|
||||
self.assertEqual("@foo#ab/foo#a", search(claim_id=ab2_claim.claim_id, limit=1)[0]['canonical_url'])
|
||||
# TODO: currently there is a bug where stream leaving a channel does not update that channels claims count
|
||||
self.assertEqual(2, search(claim_id=txo_chan_a.claim_id, limit=1)[0]['claims_in_channel'])
|
||||
# TODO: after bug is fixed remove test above and add test below
|
||||
#self.assertEqual(1, search(claim_id=txo_chan_a.claim_id, limit=1)[0]['claims_in_channel'])
|
||||
self.assertEqual(1, search(claim_id=txo_chan_ab.claim_id, limit=1)[0]['claims_in_channel'])
|
||||
|
||||
# claim abandon updates claims_in_channel
|
||||
advance(11, [self.get_abandon(tx_ab2)])
|
||||
self.assertEqual(0, search(claim_id=txo_chan_ab.claim_id, limit=1)[0]['claims_in_channel'])
|
||||
|
||||
# delete channel, invaliding stream claim signatures
|
||||
advance(12, [self.get_abandon(channel_update)])
|
||||
(r_a2,) = search(order_by=['creation_height'], limit=1)
|
||||
self.assertEqual(f"foo#{a2_claim.claim_id[:2]}", r_a2['short_url'])
|
||||
self.assertIsNone(r_a2['canonical_url'])
|
||||
|
||||
def test_resolve_issue_2448(self):
|
||||
advance = self.advance
|
||||
|
||||
tx_chan_a = self.get_channel_with_claim_id_prefix('a', 1, key=b'c')
|
||||
tx_chan_ab = self.get_channel_with_claim_id_prefix('ab', 72, key=b'c')
|
||||
txo_chan_a = tx_chan_a[0].outputs[0]
|
||||
txo_chan_ab = tx_chan_ab[0].outputs[0]
|
||||
advance(1, [tx_chan_a])
|
||||
advance(2, [tx_chan_ab])
|
||||
|
||||
self.assertEqual(reader.resolve_url("@foo#a")['claim_hash'], txo_chan_a.claim_hash)
|
||||
self.assertEqual(reader.resolve_url("@foo#ab")['claim_hash'], txo_chan_ab.claim_hash)
|
||||
|
||||
# update increase last height change of channel
|
||||
advance(9, [self.get_channel_update(txo_chan_a, COIN, key=b'c')])
|
||||
|
||||
# make sure that activation_height is used instead of height (issue #2448)
|
||||
self.assertEqual(reader.resolve_url("@foo#a")['claim_hash'], txo_chan_a.claim_hash)
|
||||
self.assertEqual(reader.resolve_url("@foo#ab")['claim_hash'], txo_chan_ab.claim_hash)
|
||||
|
||||
def test_canonical_find_shortest_id(self):
|
||||
new_hash = 'abcdef0123456789beef'
|
||||
other0 = '1bcdef0123456789beef'
|
||||
other1 = 'ab1def0123456789beef'
|
||||
other2 = 'abc1ef0123456789beef'
|
||||
other3 = 'abcdef0123456789bee1'
|
||||
f = FindShortestID()
|
||||
f.step(other0, new_hash)
|
||||
self.assertEqual('#a', f.finalize())
|
||||
f.step(other1, new_hash)
|
||||
self.assertEqual('#abc', f.finalize())
|
||||
f.step(other2, new_hash)
|
||||
self.assertEqual('#abcd', f.finalize())
|
||||
f.step(other3, new_hash)
|
||||
self.assertEqual('#abcdef0123456789beef', f.finalize())
|
||||
|
||||
|
||||
@unittest.skip("port trending tests to ES") # TODO: port trending tests to ES
|
||||
class TestTrending(TestSQLDB):
|
||||
|
||||
def test_trending(self):
|
||||
advance, state = self.advance, self.state
|
||||
no_trend = self.get_stream('Claim A', COIN)
|
||||
downwards = self.get_stream('Claim B', COIN)
|
||||
up_small = self.get_stream('Claim C', COIN)
|
||||
up_medium = self.get_stream('Claim D', COIN)
|
||||
up_biggly = self.get_stream('Claim E', COIN)
|
||||
claims = advance(1, [up_biggly, up_medium, up_small, no_trend, downwards])
|
||||
for window in range(1, 8):
|
||||
advance(zscore.TRENDING_WINDOW * window, [
|
||||
self.get_support(downwards, (20-window)*COIN),
|
||||
self.get_support(up_small, int(20+(window/10)*COIN)),
|
||||
self.get_support(up_medium, (20+(window*(2 if window == 7 else 1)))*COIN),
|
||||
self.get_support(up_biggly, (20+(window*(3 if window == 7 else 1)))*COIN),
|
||||
])
|
||||
results = search(order_by=['trending_local'])
|
||||
self.assertEqual([c.claim_id for c in claims], [hexlify(c['claim_hash'][::-1]).decode() for c in results])
|
||||
self.assertEqual([10, 6, 2, 0, -2], [int(c['trending_local']) for c in results])
|
||||
self.assertEqual([53, 38, -32, 0, -6], [int(c['trending_global']) for c in results])
|
||||
self.assertEqual([4, 4, 2, 0, 1], [int(c['trending_group']) for c in results])
|
||||
self.assertEqual([53, 38, 2, 0, -6], [int(c['trending_mixed']) for c in results])
|
||||
|
||||
def test_edge(self):
|
||||
problematic = self.get_stream('Problem', COIN)
|
||||
self.advance(1, [problematic])
|
||||
self.advance(zscore.TRENDING_WINDOW, [self.get_support(problematic, 53000000000)])
|
||||
self.advance(zscore.TRENDING_WINDOW * 2, [self.get_support(problematic, 500000000)])
|
||||
|
||||
|
||||
@unittest.skip("filtering/blocking is applied during ES sync, this needs to be ported to integration test")
|
||||
class TestContentBlocking(TestSQLDB):
|
||||
|
||||
def test_blocking_and_filtering(self):
|
||||
# content claims and channels
|
||||
tx0 = self.get_channel('A Channel', COIN, '@channel1')
|
||||
regular_channel = tx0[0].outputs[0]
|
||||
tx1 = self.get_stream('Claim One', COIN, 'claim1')
|
||||
tx2 = self.get_stream('Claim Two', COIN, 'claim2', regular_channel)
|
||||
tx3 = self.get_stream('Claim Three', COIN, 'claim3')
|
||||
self.advance(1, [tx0, tx1, tx2, tx3])
|
||||
claim1, claim2, claim3 = tx1[0].outputs[0], tx2[0].outputs[0], tx3[0].outputs[0]
|
||||
|
||||
# block and filter channels
|
||||
tx0 = self.get_channel('Blocking Channel', COIN, '@block')
|
||||
tx1 = self.get_channel('Filtering Channel', COIN, '@filter')
|
||||
blocking_channel = tx0[0].outputs[0]
|
||||
filtering_channel = tx1[0].outputs[0]
|
||||
self.sql.blocking_channel_hashes.add(blocking_channel.claim_hash)
|
||||
self.sql.filtering_channel_hashes.add(filtering_channel.claim_hash)
|
||||
self.advance(2, [tx0, tx1])
|
||||
self.assertEqual({}, dict(self.sql.blocked_streams))
|
||||
self.assertEqual({}, dict(self.sql.blocked_channels))
|
||||
self.assertEqual({}, dict(self.sql.filtered_streams))
|
||||
self.assertEqual({}, dict(self.sql.filtered_channels))
|
||||
|
||||
# nothing blocked
|
||||
results, _ = reader.resolve([
|
||||
claim1.claim_name, claim2.claim_name,
|
||||
claim3.claim_name, regular_channel.claim_name
|
||||
])
|
||||
self.assertEqual(claim1.claim_hash, results[0]['claim_hash'])
|
||||
self.assertEqual(claim2.claim_hash, results[1]['claim_hash'])
|
||||
self.assertEqual(claim3.claim_hash, results[2]['claim_hash'])
|
||||
self.assertEqual(regular_channel.claim_hash, results[3]['claim_hash'])
|
||||
|
||||
# nothing filtered
|
||||
results, censor = censored_search()
|
||||
self.assertEqual(6, len(results))
|
||||
self.assertEqual(0, censor.total)
|
||||
self.assertEqual({}, censor.censored)
|
||||
|
||||
# block claim reposted to blocking channel, also gets filtered
|
||||
repost_tx1 = self.get_repost(claim1.claim_id, COIN, blocking_channel)
|
||||
repost1 = repost_tx1[0].outputs[0]
|
||||
self.advance(3, [repost_tx1])
|
||||
self.assertEqual(
|
||||
{repost1.claim.repost.reference.claim_hash: blocking_channel.claim_hash},
|
||||
dict(self.sql.blocked_streams)
|
||||
)
|
||||
self.assertEqual({}, dict(self.sql.blocked_channels))
|
||||
self.assertEqual(
|
||||
{repost1.claim.repost.reference.claim_hash: blocking_channel.claim_hash},
|
||||
dict(self.sql.filtered_streams)
|
||||
)
|
||||
self.assertEqual({}, dict(self.sql.filtered_channels))
|
||||
|
||||
# claim is blocked from results by direct repost
|
||||
results, censor = censored_search(text='Claim')
|
||||
self.assertEqual(2, len(results))
|
||||
self.assertEqual(claim2.claim_hash, results[0]['claim_hash'])
|
||||
self.assertEqual(claim3.claim_hash, results[1]['claim_hash'])
|
||||
self.assertEqual(1, censor.total)
|
||||
self.assertEqual({blocking_channel.claim_hash: 1}, censor.censored)
|
||||
results, _ = reader.resolve([claim1.claim_name])
|
||||
self.assertEqual(
|
||||
f"Resolve of 'claim1' was censored by channel with claim id '{blocking_channel.claim_id}'.",
|
||||
results[0].args[0]
|
||||
)
|
||||
results, _ = reader.resolve([
|
||||
claim2.claim_name, regular_channel.claim_name # claim2 and channel still resolved
|
||||
])
|
||||
self.assertEqual(claim2.claim_hash, results[0]['claim_hash'])
|
||||
self.assertEqual(regular_channel.claim_hash, results[1]['claim_hash'])
|
||||
|
||||
# block claim indirectly by blocking its parent channel
|
||||
repost_tx2 = self.get_repost(regular_channel.claim_id, COIN, blocking_channel)
|
||||
repost2 = repost_tx2[0].outputs[0]
|
||||
self.advance(4, [repost_tx2])
|
||||
self.assertEqual(
|
||||
{repost1.claim.repost.reference.claim_hash: blocking_channel.claim_hash},
|
||||
dict(self.sql.blocked_streams)
|
||||
)
|
||||
self.assertEqual(
|
||||
{repost2.claim.repost.reference.claim_hash: blocking_channel.claim_hash},
|
||||
dict(self.sql.blocked_channels)
|
||||
)
|
||||
self.assertEqual(
|
||||
{repost1.claim.repost.reference.claim_hash: blocking_channel.claim_hash},
|
||||
dict(self.sql.filtered_streams)
|
||||
)
|
||||
self.assertEqual(
|
||||
{repost2.claim.repost.reference.claim_hash: blocking_channel.claim_hash},
|
||||
dict(self.sql.filtered_channels)
|
||||
)
|
||||
|
||||
# claim in blocked channel is filtered from search and can't resolve
|
||||
results, censor = censored_search(text='Claim')
|
||||
self.assertEqual(1, len(results))
|
||||
self.assertEqual(claim3.claim_hash, results[0]['claim_hash'])
|
||||
self.assertEqual(2, censor.total)
|
||||
self.assertEqual({blocking_channel.claim_hash: 2}, censor.censored)
|
||||
results, _ = reader.resolve([
|
||||
claim2.claim_name, regular_channel.claim_name # claim2 and channel don't resolve
|
||||
])
|
||||
self.assertEqual(
|
||||
f"Resolve of 'claim2' was censored by channel with claim id '{blocking_channel.claim_id}'.",
|
||||
results[0].args[0]
|
||||
)
|
||||
self.assertEqual(
|
||||
f"Resolve of '@channel1' was censored by channel with claim id '{blocking_channel.claim_id}'.",
|
||||
results[1].args[0]
|
||||
)
|
||||
results, _ = reader.resolve([claim3.claim_name]) # claim3 still resolved
|
||||
self.assertEqual(claim3.claim_hash, results[0]['claim_hash'])
|
||||
|
||||
# filtered claim is only filtered and not blocked
|
||||
repost_tx3 = self.get_repost(claim3.claim_id, COIN, filtering_channel)
|
||||
repost3 = repost_tx3[0].outputs[0]
|
||||
self.advance(5, [repost_tx3])
|
||||
self.assertEqual(
|
||||
{repost1.claim.repost.reference.claim_hash: blocking_channel.claim_hash},
|
||||
dict(self.sql.blocked_streams)
|
||||
)
|
||||
self.assertEqual(
|
||||
{repost2.claim.repost.reference.claim_hash: blocking_channel.claim_hash},
|
||||
dict(self.sql.blocked_channels)
|
||||
)
|
||||
self.assertEqual(
|
||||
{repost1.claim.repost.reference.claim_hash: blocking_channel.claim_hash,
|
||||
repost3.claim.repost.reference.claim_hash: filtering_channel.claim_hash},
|
||||
dict(self.sql.filtered_streams)
|
||||
)
|
||||
self.assertEqual(
|
||||
{repost2.claim.repost.reference.claim_hash: blocking_channel.claim_hash},
|
||||
dict(self.sql.filtered_channels)
|
||||
)
|
||||
|
||||
# filtered claim doesn't return in search but is resolveable
|
||||
results, censor = censored_search(text='Claim')
|
||||
self.assertEqual(0, len(results))
|
||||
self.assertEqual(3, censor.total)
|
||||
self.assertEqual({blocking_channel.claim_hash: 2, filtering_channel.claim_hash: 1}, censor.censored)
|
||||
results, _ = reader.resolve([claim3.claim_name]) # claim3 still resolved
|
||||
self.assertEqual(claim3.claim_hash, results[0]['claim_hash'])
|
||||
|
||||
# abandon unblocks content
|
||||
self.advance(6, [
|
||||
self.get_abandon(repost_tx1),
|
||||
self.get_abandon(repost_tx2),
|
||||
self.get_abandon(repost_tx3)
|
||||
])
|
||||
self.assertEqual({}, dict(self.sql.blocked_streams))
|
||||
self.assertEqual({}, dict(self.sql.blocked_channels))
|
||||
self.assertEqual({}, dict(self.sql.filtered_streams))
|
||||
self.assertEqual({}, dict(self.sql.filtered_channels))
|
||||
results, censor = censored_search(text='Claim')
|
||||
self.assertEqual(3, len(results))
|
||||
self.assertEqual(0, censor.total)
|
||||
results, censor = censored_search()
|
||||
self.assertEqual(6, len(results))
|
||||
self.assertEqual(0, censor.total)
|
||||
results, _ = reader.resolve([
|
||||
claim1.claim_name, claim2.claim_name,
|
||||
claim3.claim_name, regular_channel.claim_name
|
||||
])
|
||||
self.assertEqual(claim1.claim_hash, results[0]['claim_hash'])
|
||||
self.assertEqual(claim2.claim_hash, results[1]['claim_hash'])
|
||||
self.assertEqual(claim3.claim_hash, results[2]['claim_hash'])
|
||||
self.assertEqual(regular_channel.claim_hash, results[3]['claim_hash'])
|
||||
|
||||
def test_pagination(self):
|
||||
one, two, three, four, five, six, seven, filter_channel = self.advance(1, [
|
||||
self.get_stream('One', COIN),
|
||||
self.get_stream('Two', COIN),
|
||||
self.get_stream('Three', COIN),
|
||||
self.get_stream('Four', COIN),
|
||||
self.get_stream('Five', COIN),
|
||||
self.get_stream('Six', COIN),
|
||||
self.get_stream('Seven', COIN),
|
||||
self.get_channel('Filtering Channel', COIN, '@filter'),
|
||||
])
|
||||
self.sql.filtering_channel_hashes.add(filter_channel.claim_hash)
|
||||
|
||||
# nothing filtered
|
||||
results, censor = censored_search(order_by='^height', offset=1, limit=3)
|
||||
self.assertEqual(3, len(results))
|
||||
self.assertEqual(
|
||||
[two.claim_hash, three.claim_hash, four.claim_hash],
|
||||
[r['claim_hash'] for r in results]
|
||||
)
|
||||
self.assertEqual(0, censor.total)
|
||||
|
||||
# content filtered
|
||||
repost1, repost2 = self.advance(2, [
|
||||
self.get_repost(one.claim_id, COIN, filter_channel),
|
||||
self.get_repost(two.claim_id, COIN, filter_channel),
|
||||
])
|
||||
results, censor = censored_search(order_by='^height', offset=1, limit=3)
|
||||
self.assertEqual(3, len(results))
|
||||
self.assertEqual(
|
||||
[four.claim_hash, five.claim_hash, six.claim_hash],
|
||||
[r['claim_hash'] for r in results]
|
||||
)
|
||||
self.assertEqual(2, censor.total)
|
||||
self.assertEqual({filter_channel.claim_hash: 2}, censor.censored)
|
Loading…
Reference in a new issue