From 0c7be8975f07b9807a5a94504d6ad602af0eec1d Mon Sep 17 00:00:00 2001 From: "Brendon J. Brewer" Date: Mon, 16 Aug 2021 09:52:40 +1200 Subject: [PATCH] trending --- lbry/wallet/server/block_processor.py | 17 + lbry/wallet/server/db/trending.py | 299 +++++++++++ lbry/wallet/server/db/trending/__init__.py | 9 - lbry/wallet/server/db/trending/ar.py | 265 ---------- .../server/db/trending/variable_decay.py | 485 ------------------ lbry/wallet/server/db/trending/zscore.py | 119 ----- 6 files changed, 316 insertions(+), 878 deletions(-) create mode 100644 lbry/wallet/server/db/trending.py delete mode 100644 lbry/wallet/server/db/trending/__init__.py delete mode 100644 lbry/wallet/server/db/trending/ar.py delete mode 100644 lbry/wallet/server/db/trending/variable_decay.py delete mode 100644 lbry/wallet/server/db/trending/zscore.py diff --git a/lbry/wallet/server/block_processor.py b/lbry/wallet/server/block_processor.py index a2c9045a1..982ae3713 100644 --- a/lbry/wallet/server/block_processor.py +++ b/lbry/wallet/server/block_processor.py @@ -30,6 +30,7 @@ from lbry.wallet.server.db.claimtrie import get_takeover_name_ops, StagedActivat from lbry.wallet.server.db.claimtrie import get_remove_name_ops, get_remove_effective_amount_ops from lbry.wallet.server.db.prefixes import ACTIVATED_SUPPORT_TXO_TYPE, ACTIVATED_CLAIM_TXO_TYPE from lbry.wallet.server.db.prefixes import PendingActivationKey, PendingActivationValue, Prefixes, ClaimToTXOValue +from lbry.wallet.server.db.trending import TrendingDB from lbry.wallet.server.udp import StatusServer from lbry.wallet.server.db.revertable import RevertableOp, RevertablePut, RevertableDelete, RevertableOpStack if typing.TYPE_CHECKING: @@ -263,6 +264,8 @@ class BlockProcessor: self.claim_channels: Dict[bytes, bytes] = {} self.hashXs_by_tx: DefaultDict[bytes, List[int]] = defaultdict(list) + self.trending_db = TrendingDB(env.db_dir) + async def claim_producer(self): if self.db.db_height <= 1: return @@ -310,6 +313,7 @@ class BlockProcessor: start = time.perf_counter() await self.run_in_thread(self.advance_block, block) await self.flush() + self.trending_db.process_block(self.height, self.daemon.cached_height()) self.logger.info("advanced to %i in %0.3fs", self.height, time.perf_counter() - start) if self.height == self.coin.nExtendedClaimExpirationForkHeight: self.logger.warning( @@ -514,6 +518,9 @@ class BlockProcessor: self.txo_to_claim[(tx_num, nout)] = pending self.claim_hash_to_txo[claim_hash] = (tx_num, nout) self.db_op_stack.extend_ops(pending.get_add_claim_utxo_ops()) + self.trending_db.add_event({"claim_hash": claim_hash, + "event": "upsert", + "lbc": 1E-8*txo.amount}) def _add_support(self, txo: 'Output', tx_num: int, nout: int): supported_claim_hash = txo.claim_hash[::-1] @@ -523,6 +530,9 @@ class BlockProcessor: self.db_op_stack.extend_ops(StagedClaimtrieSupport( supported_claim_hash, tx_num, nout, txo.amount ).get_add_support_utxo_ops()) + self.trending_db.add_event({"claim_hash": supported_claim_hash, + "event": "support", + "lbc": 1E-8*txo.amount}) def _add_claim_or_support(self, height: int, tx_hash: bytes, tx_num: int, nout: int, txo: 'Output', spent_claims: typing.Dict[bytes, Tuple[int, int, str]]): @@ -542,6 +552,10 @@ class BlockProcessor: self.db_op_stack.extend_ops(StagedClaimtrieSupport( spent_support, txin_num, txin.prev_idx, support_amount ).get_spend_support_txo_ops()) + self.trending_db.add_event({"claim_hash": spent_support, + "event": "support", + "lbc": -1E-8*support_amount}) + spent_support, support_amount = 
self.db.get_supported_claim_from_txo(txin_num, txin.prev_idx)
         if spent_support:
             supported_name = self._get_pending_claim_name(spent_support)
@@ -619,6 +633,9 @@ class BlockProcessor:
             if normalized_name.startswith('@'):  # abandon a channel, invalidate signatures
                 self._invalidate_channel_signatures(claim_hash)
 
+        self.trending_db.add_event({"claim_hash": claim_hash,
+                                    "event": "delete"})
+
     def _invalidate_channel_signatures(self, claim_hash: bytes):
         for k, signed_claim_hash in self.db.db.iterator(
                 prefix=Prefixes.channel_to_claim.pack_partial_key(claim_hash)):
diff --git a/lbry/wallet/server/db/trending.py b/lbry/wallet/server/db/trending.py
new file mode 100644
index 000000000..5c2aef513
--- /dev/null
+++ b/lbry/wallet/server/db/trending.py
@@ -0,0 +1,299 @@
+import math
+import os
+import sqlite3
+import time
+
+
+HALF_LIFE = 400
+RENORM_INTERVAL = 1000
+WHALE_THRESHOLD = 10000.0
+
+def whale_decay_factor(lbc):
+    """
+    An additional decay factor applied to whale claims.
+    """
+    if lbc <= WHALE_THRESHOLD:
+        return 1.0
+    adjusted_half_life = HALF_LIFE/(math.log10(lbc/WHALE_THRESHOLD) + 1.0)
+    return 2.0**(1.0/HALF_LIFE - 1.0/adjusted_half_life)
+
+
+def soften(lbc):
+    """Sign-preserving fourth root, used to dampen large LBC changes."""
+    mag = abs(lbc) + 1E-8
+    sign = 1.0 if lbc >= 0.0 else -1.0
+    return sign*mag**0.25
+
+def delay(lbc: float):
+    """Number of blocks to wait before applying a spike; bigger stakes wait longer."""
+    if lbc <= 0:
+        return 0
+    elif lbc < 1000000:
+        return int(lbc**0.5)
+    else:
+        return 1000
+
+
+def inflate_units(height):
+    """Inflation factor for stored scores; divide by this to get fixed units."""
+    blocks = height % RENORM_INTERVAL
+    return 2.0 ** (blocks/HALF_LIFE)
+
+
+PRAGMAS = ["PRAGMA FOREIGN_KEYS = OFF;",
+           "PRAGMA JOURNAL_MODE = WAL;",
+           "PRAGMA SYNCHRONOUS = 0;"]
+
+
+class TrendingDB:
+
+    def __init__(self, data_dir):
+        """
+        Opens the trending database in the directory data_dir.
+        For testing, pass data_dir=":memory:"
+        """
+        if data_dir == ":memory:":
+            path = ":memory:"
+        else:
+            path = os.path.join(data_dir, "trending.db")
+        self.db = sqlite3.connect(path, check_same_thread=False)
+
+        for pragma in PRAGMAS:
+            self.execute(pragma)
+        self.execute("BEGIN;")
+        self._create_tables()
+        self._create_indices()
+        self.execute("COMMIT;")
+        self.pending_events = []
+
+    def execute(self, *args, **kwargs):
+        return self.db.execute(*args, **kwargs)
+
+    def add_event(self, event):
+        self.pending_events.append(event)
+
+
+    def _create_tables(self):
+
+        self.execute("""CREATE TABLE IF NOT EXISTS claims
+            (claim_hash     BYTES NOT NULL PRIMARY KEY,
+             bid_lbc        REAL NOT NULL,
+             support_lbc    REAL NOT NULL,
+             trending_score REAL NOT NULL,
+             needs_write    BOOLEAN NOT NULL)
+            WITHOUT ROWID;""")
+
+        self.execute("""CREATE TABLE IF NOT EXISTS spikes
+            (claim_hash        BYTES NOT NULL REFERENCES claims (claim_hash),
+             activation_height INTEGER NOT NULL,
+             mass              REAL NOT NULL);""")
+
+
+    def _create_indices(self):
+        self.execute("CREATE INDEX IF NOT EXISTS idx1 ON spikes\
+                        (activation_height, claim_hash, mass);")
+        self.execute("CREATE INDEX IF NOT EXISTS idx2 ON spikes\
+                        (claim_hash);")
+        self.execute("CREATE INDEX IF NOT EXISTS idx3 ON claims (trending_score);")
+        self.execute("CREATE INDEX IF NOT EXISTS idx4 ON claims (needs_write, claim_hash);")
+        self.execute("CREATE INDEX IF NOT EXISTS idx5 ON claims (bid_lbc + support_lbc);")
+
+    def get_trending_score(self, claim_hash):
+        result = self.execute("SELECT trending_score FROM claims\
+                               WHERE claim_hash = ?;", (claim_hash, ))\
+                     .fetchall()
+        if len(result) == 0:
+            return 0.0
+        # fetchall() rows are 1-tuples; unwrap so callers get a float
+        return result[0][0]
+
+    def _upsert_claim(self, height, event):
+
+        claim_hash = event["claim_hash"]
+
+        # Get old total lbc value of claim
+        old_lbc_pair = self.execute("SELECT bid_lbc, support_lbc FROM claims\
+                                     WHERE claim_hash = ?;",
+                                    (claim_hash, )).fetchone()
+        if old_lbc_pair is None:
+            old_lbc_pair = (0.0, 0.0)
+
+        if event["event"] == "upsert":
+            new_lbc_pair = (event["lbc"], old_lbc_pair[1])
+        elif event["event"] == "support":
+            new_lbc_pair = (old_lbc_pair[0], old_lbc_pair[1] + event["lbc"])
+
+        # Upsert the claim
+        self.execute("INSERT INTO claims VALUES (?, ?, ?, ?, 1)\
+                      ON CONFLICT (claim_hash) DO UPDATE\
+                      SET bid_lbc = excluded.bid_lbc,\
+                          support_lbc = excluded.support_lbc;",
+                     (claim_hash, new_lbc_pair[0], new_lbc_pair[1], 0.0))
+
+        if self.active:
+            old_lbc, lbc = sum(old_lbc_pair), sum(new_lbc_pair)
+
+            # Add the spike
+            softened_change = soften(lbc - old_lbc)
+            change_in_softened = soften(lbc) - soften(old_lbc)
+            # A negative change makes both factors complex (fractional powers
+            # of negative reals); their phases combine so the product is the
+            # negative blended mass, and .real guards mixed-sign edge cases
+            spike_mass = (softened_change**0.25*change_in_softened**0.75).real
+            activation_height = height + delay(lbc)
+            if spike_mass != 0.0:
+                self.execute("INSERT INTO spikes VALUES (?, ?, ?);",
+                             (claim_hash, activation_height, spike_mass))
+
+    def _delete_claim(self, claim_hash):
+        self.execute("DELETE FROM spikes WHERE claim_hash = ?;", (claim_hash, ))
+        self.execute("DELETE FROM claims WHERE claim_hash = ?;", (claim_hash, ))
+
+
+    def _apply_spikes(self, height):
+        spikes = self.execute("SELECT claim_hash, mass FROM spikes\
+                               WHERE activation_height = ?;",
+                              (height, )).fetchall()
+        for claim_hash, mass in spikes:  # TODO: executemany for efficiency
+            self.execute("UPDATE claims SET trending_score = trending_score + ?,\
+                          needs_write = 1\
+                          WHERE claim_hash = ?;",
+                         (mass, claim_hash))
+        self.execute("DELETE FROM spikes WHERE activation_height = ?;",
+                     (height, ))
+
+    def _decay_whales(self):
+
+        whales = self.execute("SELECT claim_hash, bid_lbc + support_lbc FROM claims\
+                               WHERE bid_lbc + support_lbc >= ?;", (WHALE_THRESHOLD, ))\
+                     .fetchall()
+        for claim_hash, lbc in whales:
+            factor = whale_decay_factor(lbc)
+            self.execute("UPDATE claims SET trending_score = trending_score*?, needs_write = 1\
+                          WHERE claim_hash = ?;", (factor, claim_hash))
+
+
+    def _renorm(self):
+        factor = 2.0**(-RENORM_INTERVAL/HALF_LIFE)
+
+        # Zero small values
+        self.execute("UPDATE claims SET trending_score = 0.0, needs_write = 1\
+                      WHERE trending_score <> 0.0 AND ABS(?*trending_score) < 1E-6;",
+                     (factor, ))
+
+        # Normalise other values
+        self.execute("UPDATE claims SET trending_score = ?*trending_score, needs_write = 1\
+                      WHERE trending_score <> 0.0;", (factor, ))
+
+
+    def process_block(self, height, daemon_height):
+
+        # Only track trending once the node is close to caught up
+        self.active = daemon_height - height <= 10*HALF_LIFE
+
+        self.execute("BEGIN;")
+
+        if self.active:
+
+            # Check for a unit change
+            if height % RENORM_INTERVAL == 0:
+                self._renorm()
+
+            # Apply extra whale decay
+            self._decay_whales()
+
+        # Upsert claims
+        for event in self.pending_events:
+            if event["event"] == "upsert":
+                self._upsert_claim(height, event)
+
+        # Process supports
+        for event in self.pending_events:
+            if event["event"] == "support":
+                self._upsert_claim(height, event)
+
+        # Delete claims
+        for event in self.pending_events:
+            if event["event"] == "delete":
+                self._delete_claim(event["claim_hash"])
+
+        if self.active:
+            # Apply spikes
+            self._apply_spikes(height)
+
+        # Get set of claims that need writing to ES
+        claims_to_write = set()
+        for row in self.execute("SELECT claim_hash FROM claims WHERE\
+                                 needs_write = 1;"):
+            claims_to_write.add(row[0])
+        self.execute("UPDATE claims SET needs_write = 0\
+                      WHERE needs_write = 1;")
+
+        self.execute("COMMIT;")
+
+        self.pending_events.clear()
+
+        return claims_to_write
+
+
+if __name__ == "__main__":
+    import matplotlib.pyplot as plt
+    import numpy as np
+
+    trending_db = TrendingDB(":memory:")
+
+    # March forward to height 999, unwind all the way back, then march
+    # forward again, exercising reorg-style rollbacks via inverse_events
+    heights = list(range(1, 1000))
+    heights = heights + heights[::-1] + heights
+
+    events = [{"height": 45,
+               "what": dict(claim_hash="a", event="upsert", lbc=1.0)},
+              {"height": 100,
+               "what": dict(claim_hash="a", event="support", lbc=3.0)},
+              {"height": 150,
+               "what": dict(claim_hash="a", event="support", lbc=-3.0)},
+              {"height": 170,
+               "what": dict(claim_hash="a", event="upsert", lbc=100000.0)},
+              {"height": 730,
+               "what": dict(claim_hash="a", event="delete")}]
+    inverse_events = [{"height": 730,
+                       "what": dict(claim_hash="a", event="upsert", lbc=100000.0)},
+                      {"height": 170,
+                       "what": dict(claim_hash="a", event="upsert", lbc=1.0)},
+                      {"height": 150,
+                       "what": dict(claim_hash="a", event="support", lbc=3.0)},
+                      {"height": 100,
+                       "what": dict(claim_hash="a", event="support", lbc=-3.0)},
+                      {"height": 45,
+                       "what": dict(claim_hash="a", event="delete")}]
+
+
+    xs, ys = [], []
+    last_height = 0
+    for height in heights:
+
+        # Prepare the changes
+        if height > last_height:
+            es = events
+        else:
+            es = inverse_events
+
+        for event in es:
+            if event["height"] == height:
+                trending_db.add_event(event["what"])
+
+        # Process the block
+        trending_db.process_block(height, height)
+
+        if height > last_height:  # Only plot when moving forward
+            xs.append(height)
+            y = trending_db.execute("SELECT trending_score FROM claims;").fetchone()
+            y = 0.0 if y is None else y[0]
+            ys.append(y/inflate_units(height))
+
+        last_height = height
+
+    xs = np.array(xs)
+    ys = np.array(ys)
+
+    plt.figure(1)
+    plt.plot(xs, ys, "o-", alpha=0.2)
+
+    plt.figure(2)
+    plt.plot(xs)
+    plt.show()
diff --git a/lbry/wallet/server/db/trending/__init__.py b/lbry/wallet/server/db/trending/__init__.py
deleted file mode 100644
index 86d94bdc3..000000000
--- a/lbry/wallet/server/db/trending/__init__.py
+++ /dev/null
@@ -1,9 +0,0 @@
-from . import zscore
-from . import ar
-from . import variable_decay
-
-TRENDING_ALGORITHMS = {
-    'zscore': zscore,
-    'ar': ar,
-    'variable_decay': variable_decay
-}
diff --git a/lbry/wallet/server/db/trending/ar.py b/lbry/wallet/server/db/trending/ar.py
deleted file mode 100644
index 2e7b3474f..000000000
--- a/lbry/wallet/server/db/trending/ar.py
+++ /dev/null
@@ -1,265 +0,0 @@
-import copy
-import math
-import time
-
-# Half life in blocks
-HALF_LIFE = 134
-
-# Decay coefficient per block
-DECAY = 0.5**(1.0/HALF_LIFE)
-
-# How frequently to write trending values to the db
-SAVE_INTERVAL = 10
-
-# Renormalisation interval
-RENORM_INTERVAL = 1000
-
-# Assertion
-assert RENORM_INTERVAL % SAVE_INTERVAL == 0
-
-# Decay coefficient per renormalisation interval
-DECAY_PER_RENORM = DECAY**(RENORM_INTERVAL)
-
-# Log trending calculations?
-TRENDING_LOG = True
-
-
-def install(connection):
-    """
-    Install the AR trending algorithm.
-    """
-    check_trending_values(connection)
-
-    if TRENDING_LOG:
-        f = open("trending_ar.log", "a")
-        f.close()
-
-# Stub
-CREATE_TREND_TABLE = ""
-
-
-def check_trending_values(connection):
-    """
-    If the trending values appear to be based on the zscore algorithm,
-    reset them. This will allow resyncing from a standard snapshot.
-    """
-    c = connection.cursor()
-    needs_reset = False
-    for row in c.execute("SELECT COUNT(*) num FROM claim WHERE trending_global <> 0;"):
-        if row[0] != 0:
-            needs_reset = True
-            break
-
-    if needs_reset:
-        print("Resetting some columns. 
This might take a while...", flush=True, end="") - c.execute(""" BEGIN; - UPDATE claim SET trending_group = 0; - UPDATE claim SET trending_mixed = 0; - UPDATE claim SET trending_global = 0; - UPDATE claim SET trending_local = 0; - COMMIT;""") - print("done.") - - -def spike_height(trending_score, x, x_old, time_boost=1.0): - """ - Compute the size of a trending spike. - """ - - # Change in softened amount - change_in_softened_amount = x**0.25 - x_old**0.25 - - # Softened change in amount - delta = x - x_old - softened_change_in_amount = abs(delta)**0.25 - - # Softened change in amount counts more for minnows - if delta > 0.0: - if trending_score >= 0.0: - multiplier = 0.1/((trending_score/time_boost + softened_change_in_amount) + 1.0) - softened_change_in_amount *= multiplier - else: - softened_change_in_amount *= -1.0 - - return time_boost*(softened_change_in_amount + change_in_softened_amount) - - -def get_time_boost(height): - """ - Return the time boost at a given height. - """ - return 1.0/DECAY**(height % RENORM_INTERVAL) - - -def trending_log(s): - """ - Log a string. - """ - if TRENDING_LOG: - fout = open("trending_ar.log", "a") - fout.write(s) - fout.flush() - fout.close() - -class TrendingData: - """ - An object of this class holds trending data - """ - def __init__(self): - self.claims = {} - - # Have all claims been read from db yet? - self.initialised = False - - def insert_claim_from_load(self, claim_hash, trending_score, total_amount): - assert not self.initialised - self.claims[claim_hash] = {"trending_score": trending_score, - "total_amount": total_amount, - "changed": False} - - - def update_claim(self, claim_hash, total_amount, time_boost=1.0): - """ - Update trending data for a claim, given its new total amount. - """ - assert self.initialised - - # Extract existing total amount and trending score - # or use starting values if the claim is new - if claim_hash in self.claims: - old_state = copy.deepcopy(self.claims[claim_hash]) - else: - old_state = {"trending_score": 0.0, - "total_amount": 0.0, - "changed": False} - - # Calculate LBC change - change = total_amount - old_state["total_amount"] - - # Modify data if there was an LBC change - if change != 0.0: - spike = spike_height(old_state["trending_score"], - total_amount, - old_state["total_amount"], - time_boost) - trending_score = old_state["trending_score"] + spike - self.claims[claim_hash] = {"total_amount": total_amount, - "trending_score": trending_score, - "changed": True} - - - -def test_trending(): - """ - Quick trending test for something receiving 10 LBC per block - """ - data = TrendingData() - data.insert_claim_from_load("abc", 10.0, 1.0) - data.initialised = True - - for height in range(1, 5000): - - if height % RENORM_INTERVAL == 0: - data.claims["abc"]["trending_score"] *= DECAY_PER_RENORM - - time_boost = get_time_boost(height) - data.update_claim("abc", data.claims["abc"]["total_amount"] + 10.0, - time_boost=time_boost) - - - print(str(height) + " " + str(time_boost) + " " \ - + str(data.claims["abc"]["trending_score"])) - - - -# One global instance -# pylint: disable=C0103 -trending_data = TrendingData() - -def run(db, height, final_height, recalculate_claim_hashes): - - if height < final_height - 5*HALF_LIFE: - trending_log("Skipping AR trending at block {h}.\n".format(h=height)) - return - - start = time.time() - - trending_log("Calculating AR trending at block {h}.\n".format(h=height)) - trending_log(" Length of trending data = {l}.\n"\ - .format(l=len(trending_data.claims))) - - # Renormalise 
trending scores and mark all as having changed - if height % RENORM_INTERVAL == 0: - trending_log(" Renormalising trending scores...") - - keys = trending_data.claims.keys() - for key in keys: - if trending_data.claims[key]["trending_score"] != 0.0: - trending_data.claims[key]["trending_score"] *= DECAY_PER_RENORM - trending_data.claims[key]["changed"] = True - - # Tiny becomes zero - if abs(trending_data.claims[key]["trending_score"]) < 1E-9: - trending_data.claims[key]["trending_score"] = 0.0 - - trending_log("done.\n") - - - # Regular message. - trending_log(" Reading total_amounts from db and updating"\ - + " trending scores in RAM...") - - # Get the value of the time boost - time_boost = get_time_boost(height) - - # Update claims from db - if not trending_data.initialised: - # On fresh launch - for row in db.execute(""" - SELECT claim_hash, trending_mixed, - (amount + support_amount) - AS total_amount - FROM claim; - """): - trending_data.insert_claim_from_load(row[0], row[1], 1E-8*row[2]) - trending_data.initialised = True - else: - for row in db.execute(f""" - SELECT claim_hash, - (amount + support_amount) - AS total_amount - FROM claim - WHERE claim_hash IN - ({','.join('?' for _ in recalculate_claim_hashes)}); - """, list(recalculate_claim_hashes)): - trending_data.update_claim(row[0], 1E-8*row[1], time_boost) - - trending_log("done.\n") - - - # Write trending scores to DB - if height % SAVE_INTERVAL == 0: - - trending_log(" Writing trending scores to db...") - - the_list = [] - keys = trending_data.claims.keys() - for key in keys: - if trending_data.claims[key]["changed"]: - the_list.append((trending_data.claims[key]["trending_score"], - key)) - trending_data.claims[key]["changed"] = False - - trending_log("{n} scores to write...".format(n=len(the_list))) - - db.executemany("UPDATE claim SET trending_mixed=? WHERE claim_hash=?;", - the_list) - - trending_log("done.\n") - - trending_log("Trending operations took {time} seconds.\n\n"\ - .format(time=time.time() - start)) - - -if __name__ == "__main__": - test_trending() diff --git a/lbry/wallet/server/db/trending/variable_decay.py b/lbry/wallet/server/db/trending/variable_decay.py deleted file mode 100644 index d900920a0..000000000 --- a/lbry/wallet/server/db/trending/variable_decay.py +++ /dev/null @@ -1,485 +0,0 @@ -""" -AR-like trending with a delayed effect and a faster -decay rate for high valued claims. -""" - -import math -import time -import sqlite3 - -# Half life in blocks *for lower LBC claims* (it's shorter for whale claims) -HALF_LIFE = 200 - -# Whale threshold, in LBC (higher -> less DB writing) -WHALE_THRESHOLD = 10000.0 - -# Decay coefficient per block -DECAY = 0.5**(1.0/HALF_LIFE) - -# How frequently to write trending values to the db -SAVE_INTERVAL = 10 - -# Renormalisation interval -RENORM_INTERVAL = 1000 - -# Assertion -assert RENORM_INTERVAL % SAVE_INTERVAL == 0 - -# Decay coefficient per renormalisation interval -DECAY_PER_RENORM = DECAY**(RENORM_INTERVAL) - -# Log trending calculations? -TRENDING_LOG = True - - -def install(connection): - """ - Install the trending algorithm. - """ - check_trending_values(connection) - trending_data.initialise(connection.cursor()) - - if TRENDING_LOG: - f = open("trending_variable_decay.log", "a") - f.close() - -# Stub -CREATE_TREND_TABLE = "" - -def check_trending_values(connection): - """ - If the trending values appear to be based on the zscore algorithm, - reset them. This will allow resyncing from a standard snapshot. 
- """ - c = connection.cursor() - needs_reset = False - for row in c.execute("SELECT COUNT(*) num FROM claim WHERE trending_global <> 0;"): - if row[0] != 0: - needs_reset = True - break - - if needs_reset: - print("Resetting some columns. This might take a while...", flush=True, - end="") - c.execute(""" BEGIN; - UPDATE claim SET trending_group = 0; - UPDATE claim SET trending_mixed = 0; - COMMIT;""") - print("done.") - - - - -def trending_log(s): - """ - Log a string to the log file - """ - if TRENDING_LOG: - fout = open("trending_variable_decay.log", "a") - fout.write(s) - fout.flush() - fout.close() - - -def trending_unit(height): - """ - Return the trending score unit at a given height. - """ - # Round to the beginning of a SAVE_INTERVAL batch of blocks. - _height = height - (height % SAVE_INTERVAL) - return 1.0/DECAY**(height % RENORM_INTERVAL) - - -class TrendingDB: - """ - An in-memory database of trending scores - """ - - def __init__(self): - self.conn = sqlite3.connect(":memory:", check_same_thread=False) - self.cursor = self.conn.cursor() - self.initialised = False - self.write_needed = set() - - def execute(self, query, *args, **kwargs): - return self.conn.execute(query, *args, **kwargs) - - def executemany(self, query, *args, **kwargs): - return self.conn.executemany(query, *args, **kwargs) - - def begin(self): - self.execute("BEGIN;") - - def commit(self): - self.execute("COMMIT;") - - def initialise(self, db): - """ - Pass in claims.db - """ - if self.initialised: - return - - trending_log("Initialising trending database...") - - # The need for speed - self.execute("PRAGMA JOURNAL_MODE=OFF;") - self.execute("PRAGMA SYNCHRONOUS=0;") - - self.begin() - - # Create the tables - self.execute(""" - CREATE TABLE IF NOT EXISTS claims - (claim_hash BYTES PRIMARY KEY, - lbc REAL NOT NULL DEFAULT 0.0, - trending_score REAL NOT NULL DEFAULT 0.0) - WITHOUT ROWID;""") - - self.execute(""" - CREATE TABLE IF NOT EXISTS spikes - (id INTEGER PRIMARY KEY, - claim_hash BYTES NOT NULL, - height INTEGER NOT NULL, - mass REAL NOT NULL, - FOREIGN KEY (claim_hash) - REFERENCES claims (claim_hash));""") - - # Clear out any existing data - self.execute("DELETE FROM claims;") - self.execute("DELETE FROM spikes;") - - # Create indexes - self.execute("CREATE INDEX idx1 ON spikes (claim_hash, height, mass);") - self.execute("CREATE INDEX idx2 ON spikes (claim_hash, height, mass DESC);") - self.execute("CREATE INDEX idx3 on claims (lbc DESC, claim_hash, trending_score);") - - # Import data from claims.db - for row in db.execute(""" - SELECT claim_hash, - 1E-8*(amount + support_amount) AS lbc, - trending_mixed - FROM claim; - """): - self.execute("INSERT INTO claims VALUES (?, ?, ?);", row) - self.commit() - - self.initialised = True - trending_log("done.\n") - - def apply_spikes(self, height): - """ - Apply spikes that are due. This occurs inside a transaction. - """ - - spikes = [] - unit = trending_unit(height) - for row in self.execute(""" - SELECT SUM(mass), claim_hash FROM spikes - WHERE height = ? - GROUP BY claim_hash; - """, (height, )): - spikes.append((row[0]*unit, row[1])) - self.write_needed.add(row[1]) - - self.executemany(""" - UPDATE claims - SET trending_score = (trending_score + ?) - WHERE claim_hash = ?; - """, spikes) - self.execute("DELETE FROM spikes WHERE height = ?;", (height, )) - - - def decay_whales(self, height): - """ - Occurs inside transaction. 
- """ - if height % SAVE_INTERVAL != 0: - return - - whales = self.execute(""" - SELECT trending_score, lbc, claim_hash - FROM claims - WHERE lbc >= ?; - """, (WHALE_THRESHOLD, )).fetchall() - whales2 = [] - for whale in whales: - trending, lbc, claim_hash = whale - - # Overall multiplication factor for decay rate - # At WHALE_THRESHOLD, this is 1 - # At 10*WHALE_THRESHOLD, it is 3 - decay_rate_factor = 1.0 + 2.0*math.log10(lbc/WHALE_THRESHOLD) - - # The -1 is because this is just the *extra* part being applied - factor = (DECAY**SAVE_INTERVAL)**(decay_rate_factor - 1.0) - - # Decay - trending *= factor - whales2.append((trending, claim_hash)) - self.write_needed.add(claim_hash) - - self.executemany("UPDATE claims SET trending_score=? WHERE claim_hash=?;", - whales2) - - - def renorm(self, height): - """ - Renormalise trending scores. Occurs inside a transaction. - """ - - if height % RENORM_INTERVAL == 0: - threshold = 1.0E-3/DECAY_PER_RENORM - for row in self.execute("""SELECT claim_hash FROM claims - WHERE ABS(trending_score) >= ?;""", - (threshold, )): - self.write_needed.add(row[0]) - - self.execute("""UPDATE claims SET trending_score = ?*trending_score - WHERE ABS(trending_score) >= ?;""", - (DECAY_PER_RENORM, threshold)) - - def write_to_claims_db(self, db, height): - """ - Write changed trending scores to claims.db. - """ - if height % SAVE_INTERVAL != 0: - return - - rows = self.execute(f""" - SELECT trending_score, claim_hash - FROM claims - WHERE claim_hash IN - ({','.join('?' for _ in self.write_needed)}); - """, list(self.write_needed)).fetchall() - - db.executemany("""UPDATE claim SET trending_mixed = ? - WHERE claim_hash = ?;""", rows) - - # Clear list of claims needing to be written to claims.db - self.write_needed = set() - - - def update(self, db, height, recalculate_claim_hashes): - """ - Update trending scores. - Input is a cursor to claims.db, the block height, and the list of - claims that changed. - """ - assert self.initialised - - self.begin() - self.renorm(height) - - # Fetch changed/new claims from claims.db - for row in db.execute(f""" - SELECT claim_hash, - 1E-8*(amount + support_amount) AS lbc - FROM claim - WHERE claim_hash IN - ({','.join('?' for _ in recalculate_claim_hashes)}); - """, list(recalculate_claim_hashes)): - claim_hash, lbc = row - - # Insert into trending db if it does not exist - self.execute(""" - INSERT INTO claims (claim_hash) - VALUES (?) - ON CONFLICT (claim_hash) DO NOTHING;""", - (claim_hash, )) - - # See if it was an LBC change - old = self.execute("SELECT * FROM claims WHERE claim_hash=?;", - (claim_hash, )).fetchone() - lbc_old = old[1] - - # Save new LBC value into trending db - self.execute("UPDATE claims SET lbc = ? WHERE claim_hash = ?;", - (lbc, claim_hash)) - - if lbc > lbc_old: - - # Schedule a future spike - delay = min(int((lbc + 1E-8)**0.4), HALF_LIFE) - spike = (claim_hash, height + delay, spike_mass(lbc, lbc_old)) - self.execute("""INSERT INTO spikes - (claim_hash, height, mass) - VALUES (?, ?, ?);""", spike) - - elif lbc < lbc_old: - - # Subtract from future spikes - penalty = spike_mass(lbc_old, lbc) - spikes = self.execute(""" - SELECT * FROM spikes - WHERE claim_hash = ? - ORDER BY height ASC, mass DESC; - """, (claim_hash, )).fetchall() - for spike in spikes: - spike_id, mass = spike[0], spike[3] - - if mass > penalty: - # The entire penalty merely reduces this spike - self.execute("UPDATE spikes SET mass=? 
WHERE id=?;", - (mass - penalty, spike_id)) - penalty = 0.0 - else: - # Removing this spike entirely accounts for some (or - # all) of the penalty, then move on to other spikes - self.execute("DELETE FROM spikes WHERE id=?;", - (spike_id, )) - penalty -= mass - - # If penalty remains, that's a negative spike to be applied - # immediately. - if penalty > 0.0: - self.execute(""" - INSERT INTO spikes (claim_hash, height, mass) - VALUES (?, ?, ?);""", - (claim_hash, height, -penalty)) - - self.apply_spikes(height) - self.decay_whales(height) - self.commit() - - self.write_to_claims_db(db, height) - - - - - -# The "global" instance to work with -# pylint: disable=C0103 -trending_data = TrendingDB() - -def spike_mass(x, x_old): - """ - Compute the mass of a trending spike (normed - constant units). - x_old = old LBC value - x = new LBC value - """ - - # Sign of trending spike - sign = 1.0 - if x < x_old: - sign = -1.0 - - # Magnitude - mag = abs(x**0.25 - x_old**0.25) - - # Minnow boost - mag *= 1.0 + 2E4/(x + 100.0)**2 - - return sign*mag - - -def run(db, height, final_height, recalculate_claim_hashes): - if height < final_height - 5*HALF_LIFE: - trending_log(f"Skipping trending calculations at block {height}.\n") - return - - start = time.time() - trending_log(f"Calculating variable_decay trending at block {height}.\n") - trending_data.update(db, height, recalculate_claim_hashes) - end = time.time() - trending_log(f"Trending operations took {end - start} seconds.\n\n") - -def test_trending(): - """ - Quick trending test for claims with different support patterns. - Actually use the run() function. - """ - - # Create a fake "claims.db" for testing - # pylint: disable=I1101 - dbc = apsw.Connection(":memory:") - db = dbc.cursor() - - # Create table - db.execute(""" - BEGIN; - CREATE TABLE claim (claim_hash TEXT PRIMARY KEY, - amount REAL NOT NULL DEFAULT 0.0, - support_amount REAL NOT NULL DEFAULT 0.0, - trending_mixed REAL NOT NULL DEFAULT 0.0); - COMMIT; - """) - - # Initialise trending data before anything happens with the claims - trending_data.initialise(db) - - # Insert initial states of claims - everything = {"huge_whale": 0.01, "medium_whale": 0.01, "small_whale": 0.01, - "huge_whale_botted": 0.01, "minnow": 0.01} - - def to_list_of_tuples(stuff): - l = [] - for key in stuff: - l.append((key, stuff[key])) - return l - - db.executemany(""" - INSERT INTO claim (claim_hash, amount) VALUES (?, 1E8*?); - """, to_list_of_tuples(everything)) - - # Process block zero - height = 0 - run(db, height, height, everything.keys()) - - # Save trajectories for plotting - trajectories = {} - for row in trending_data.execute(""" - SELECT claim_hash, trending_score - FROM claims; - """): - trajectories[row[0]] = [row[1]/trending_unit(height)] - - # Main loop - for height in range(1, 1000): - - # One-off supports - if height == 1: - everything["huge_whale"] += 5E5 - everything["medium_whale"] += 5E4 - everything["small_whale"] += 5E3 - - # Every block - if height < 500: - everything["huge_whale_botted"] += 5E5/500 - everything["minnow"] += 1 - - # Remove supports - if height == 500: - for key in everything: - everything[key] = 0.01 - - # Whack into the db - db.executemany(""" - UPDATE claim SET amount = 1E8*? 
WHERE claim_hash = ?; - """, [(y, x) for (x, y) in to_list_of_tuples(everything)]) - - # Call run() - run(db, height, height, everything.keys()) - - # Append current trending scores to trajectories - for row in db.execute(""" - SELECT claim_hash, trending_mixed - FROM claim; - """): - trajectories[row[0]].append(row[1]/trending_unit(height)) - - dbc.close() - - # pylint: disable=C0415 - import matplotlib.pyplot as plt - for key in trajectories: - plt.plot(trajectories[key], label=key) - plt.legend() - plt.show() - - - - - -if __name__ == "__main__": - test_trending() diff --git a/lbry/wallet/server/db/trending/zscore.py b/lbry/wallet/server/db/trending/zscore.py deleted file mode 100644 index ff442fdec..000000000 --- a/lbry/wallet/server/db/trending/zscore.py +++ /dev/null @@ -1,119 +0,0 @@ -from math import sqrt - -# TRENDING_WINDOW is the number of blocks in ~6hr period (21600 seconds / 161 seconds per block) -TRENDING_WINDOW = 134 - -# TRENDING_DATA_POINTS says how many samples to use for the trending algorithm -# i.e. only consider claims from the most recent (TRENDING_WINDOW * TRENDING_DATA_POINTS) blocks -TRENDING_DATA_POINTS = 28 - -CREATE_TREND_TABLE = """ - create table if not exists trend ( - claim_hash bytes not null, - height integer not null, - amount integer not null, - primary key (claim_hash, height) - ) without rowid; -""" - - -class ZScore: - __slots__ = 'count', 'total', 'power', 'last' - - def __init__(self): - self.count = 0 - self.total = 0 - self.power = 0 - self.last = None - - def step(self, value): - if self.last is not None: - self.count += 1 - self.total += self.last - self.power += self.last ** 2 - self.last = value - - @property - def mean(self): - return self.total / self.count - - @property - def standard_deviation(self): - value = (self.power / self.count) - self.mean ** 2 - return sqrt(value) if value > 0 else 0 - - def finalize(self): - if self.count == 0: - return self.last - return (self.last - self.mean) / (self.standard_deviation or 1) - - -def install(connection): - connection.create_aggregate("zscore", 1, ZScore) - connection.executescript(CREATE_TREND_TABLE) - - -def run(db, height, final_height, affected_claims): - # don't start tracking until we're at the end of initial sync - if height < (final_height - (TRENDING_WINDOW * TRENDING_DATA_POINTS)): - return - - if height % TRENDING_WINDOW != 0: - return - - db.execute(f""" - DELETE FROM trend WHERE height < {height - (TRENDING_WINDOW * TRENDING_DATA_POINTS)} - """) - - start = (height - TRENDING_WINDOW) + 1 - db.execute(f""" - INSERT OR IGNORE INTO trend (claim_hash, height, amount) - SELECT claim_hash, {start}, COALESCE( - (SELECT SUM(amount) FROM support WHERE claim_hash=claim.claim_hash - AND height >= {start}), 0 - ) AS support_sum - FROM claim WHERE support_sum > 0 - """) - - zscore = ZScore() - for global_sum in db.execute("SELECT AVG(amount) AS avg_amount FROM trend GROUP BY height"): - zscore.step(global_sum.avg_amount) - global_mean, global_deviation = 0, 1 - if zscore.count > 0: - global_mean = zscore.mean - global_deviation = zscore.standard_deviation - - db.execute(f""" - UPDATE claim SET - trending_local = COALESCE(( - SELECT zscore(amount) FROM trend - WHERE claim_hash=claim.claim_hash ORDER BY height DESC - ), 0), - trending_global = COALESCE(( - SELECT (amount - {global_mean}) / {global_deviation} FROM trend - WHERE claim_hash=claim.claim_hash AND height = {start} - ), 0), - trending_group = 0, - trending_mixed = 0 - """) - - # trending_group and trending_mixed determine how 
trending will show in query results - # normally the SQL will be: "ORDER BY trending_group, trending_mixed" - # changing the trending_group will have significant impact on trending results - # changing the value used for trending_mixed will only impact trending within a trending_group - db.execute(f""" - UPDATE claim SET - trending_group = CASE - WHEN trending_local > 0 AND trending_global > 0 THEN 4 - WHEN trending_local <= 0 AND trending_global > 0 THEN 3 - WHEN trending_local > 0 AND trending_global <= 0 THEN 2 - WHEN trending_local <= 0 AND trending_global <= 0 THEN 1 - END, - trending_mixed = CASE - WHEN trending_local > 0 AND trending_global > 0 THEN trending_global - WHEN trending_local <= 0 AND trending_global > 0 THEN trending_local - WHEN trending_local > 0 AND trending_global <= 0 THEN trending_local - WHEN trending_local <= 0 AND trending_global <= 0 THEN trending_global - END - WHERE trending_local <> 0 OR trending_global <> 0 - """)
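
A note on the decay scheme in the new trending.py: trending_score is stored
in units that inflate by 2**(1/HALF_LIFE) per block, and _renorm() multiplies
every stored score by 2**(-RENORM_INTERVAL/HALF_LIFE) once per
RENORM_INTERVAL blocks; a reader converts back to fixed units by dividing by
inflate_units(height), as the __main__ demo does. The net effect is a smooth
per-block decay of 2**(-1/HALF_LIFE) without rewriting every row on every
block. Whales decay faster on top of this: at lbc = 10*WHALE_THRESHOLD the
adjusted half life is HALF_LIFE/2, so whale_decay_factor() contributes an
extra factor of 2**(-1/HALF_LIFE) per block. A standalone sketch (not part of
the patch) that checks the renormalisation equivalence numerically:

    HALF_LIFE = 400          # constants mirror trending.py
    RENORM_INTERVAL = 1000

    def inflate_units(height):
        # Same definition as in trending.py
        return 2.0 ** ((height % RENORM_INTERVAL) / HALF_LIFE)

    stored = 1.0             # a trending_score as kept in the claims table
    prev = None
    for height in range(3 * RENORM_INTERVAL + 1):
        if height > 0 and height % RENORM_INTERVAL == 0:
            stored *= 2.0 ** (-RENORM_INTERVAL / HALF_LIFE)  # what _renorm() does
        deflated = stored / inflate_units(height)            # score in fixed units
        if prev is not None:
            # The per-block ratio is 2**(-1/HALF_LIFE) everywhere, including
            # across renormalisation boundaries
            assert abs(deflated / prev - 2.0 ** (-1.0 / HALF_LIFE)) < 1e-12
        prev = deflated
    print("deflated score halves every", HALF_LIFE, "blocks")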
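
And a minimal usage sketch of the TrendingDB API as block_processor.py drives
it, using the in-memory mode mentioned in its docstring. The claim hash and
amounts are invented for illustration, and the import assumes the tree this
patch applies to:

    from lbry.wallet.server.db.trending import TrendingDB

    db = TrendingDB(":memory:")

    # Block 1: a claim is created with a 2 LBC bid.
    # delay(2.0) == 1, so its spike activates at height 2.
    db.add_event({"claim_hash": b"claim-a", "event": "upsert", "lbc": 2.0})
    db.process_block(1, 1)                    # (height, daemon_height)

    # Block 2: a 5 LBC support arrives; the total stake is now 7 LBC, so
    # the support's spike is scheduled for height 2 + delay(7.0) == 4.
    db.add_event({"claim_hash": b"claim-a", "event": "support", "lbc": 5.0})
    touched = db.process_block(2, 2)

    print(touched)                            # claim hashes needing an ES update
    print(db.get_trending_score(b"claim-a"))  # reflects the upsert spike only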