diff --git a/.gitignore b/.gitignore index e72269ce1..22999a054 100644 --- a/.gitignore +++ b/.gitignore @@ -11,6 +11,7 @@ lbry.egg-info __pycache__ _trial_temp/ +trending*.log /tests/integration/blockchain/files /tests/.coverage.* diff --git a/lbry/wallet/server/db/trending/variable_decay.py b/lbry/wallet/server/db/trending/variable_decay.py index a64c0ff7c..0f6fa71be 100644 --- a/lbry/wallet/server/db/trending/variable_decay.py +++ b/lbry/wallet/server/db/trending/variable_decay.py @@ -1,18 +1,17 @@ """ -Delayed AR with variable decay rate. - -The spike height function is also simpler. +AR-like trending with a delayed effect and a faster +decay rate for high valued claims. """ -import copy +import math import time import apsw # Half life in blocks *for lower LBC claims* (it's shorter for whale claims) HALF_LIFE = 200 -# Whale threshold (higher -> less DB writing) -WHALE_THRESHOLD = 3.0 +# Whale threshold, in LBC (higher -> less DB writing) +WHALE_THRESHOLD = 10000.0 # Decay coefficient per block DECAY = 0.5**(1.0/HALF_LIFE) @@ -38,6 +37,7 @@ def install(connection): Install the trending algorithm. """ check_trending_values(connection) + trending_data.initialise(connection.cursor()) if TRENDING_LOG: f = open("trending_variable_decay.log", "a") @@ -46,7 +46,6 @@ def install(connection): # Stub CREATE_TREND_TABLE = "" - def check_trending_values(connection): """ If the trending values appear to be based on the zscore algorithm, @@ -60,19 +59,304 @@ def check_trending_values(connection): break if needs_reset: - print("Resetting some columns. This might take a while...", flush=True, end="") + print("Resetting some columns. This might take a while...", flush=True, + end="") c.execute(""" BEGIN; UPDATE claim SET trending_group = 0; UPDATE claim SET trending_mixed = 0; - UPDATE claim SET trending_global = 0; - UPDATE claim SET trending_local = 0; COMMIT;""") print("done.") -def spike_height(x, x_old): + + +def trending_log(s): """ - Compute the size of a trending spike (normed - constant units). + Log a string to the log file + """ + if TRENDING_LOG: + fout = open("trending_variable_decay.log", "a") + fout.write(s) + fout.flush() + fout.close() + + +def trending_unit(height): + """ + Return the trending score unit at a given height. + """ + # Round to the beginning of a SAVE_INTERVAL batch of blocks. + _height = height - (height % SAVE_INTERVAL) + return 1.0/DECAY**(height % RENORM_INTERVAL) + + +class TrendingDB: + """ + An in-memory database of trending scores + """ + + def __init__(self): + self.conn = apsw.Connection(":memory:") + self.cursor = self.conn.cursor() + self.initialised = False + self.write_needed = set() + + def execute(self, query, *args, **kwargs): + return self.cursor.execute(query, *args, **kwargs) + + def executemany(self, query, *args, **kwargs): + return self.cursor.executemany(query, *args, **kwargs) + + def begin(self): + self.execute("BEGIN;") + + def commit(self): + self.execute("COMMIT;") + + def initialise(self, db): + """ + Pass in claims.db + """ + if self.initialised: + return + + trending_log("Initialising trending database...") + + # The need for speed + self.execute("PRAGMA JOURNAL_MODE=OFF;") + self.execute("PRAGMA SYNCHRONOUS=0;") + + self.begin() + + # Create the tables + self.execute(""" + CREATE TABLE IF NOT EXISTS claims + (claim_hash BYTES PRIMARY KEY, + lbc REAL NOT NULL DEFAULT 0.0, + trending_score REAL NOT NULL DEFAULT 0.0) + WITHOUT ROWID;""") + + self.execute(""" + CREATE TABLE IF NOT EXISTS spikes + (id INTEGER PRIMARY KEY, + claim_hash BYTES NOT NULL, + height INTEGER NOT NULL, + mass REAL NOT NULL, + FOREIGN KEY (claim_hash) + REFERENCES claims (claim_hash));""") + + # Clear out any existing data + self.execute("DELETE FROM claims;") + self.execute("DELETE FROM spikes;") + + # Create indexes + self.execute("CREATE INDEX idx1 ON spikes (claim_hash, height, mass);") + self.execute("CREATE INDEX idx2 ON spikes (claim_hash, height, mass DESC);") + self.execute("CREATE INDEX idx3 on claims (lbc DESC, claim_hash, trending_score);") + + # Import data from claims.db + for row in db.execute(""" + SELECT claim_hash, + 1E-8*(amount + support_amount) AS lbc, + trending_mixed + FROM claim; + """): + self.execute("INSERT INTO claims VALUES (?, ?, ?);", row) + self.commit() + + self.initialised = True + trending_log("done.\n") + + def apply_spikes(self, height): + """ + Apply spikes that are due. This occurs inside a transaction. + """ + + spikes = [] + unit = trending_unit(height) + for row in self.execute(""" + SELECT SUM(mass), claim_hash FROM spikes + WHERE height = ? + GROUP BY claim_hash; + """, (height, )): + spikes.append((row[0]*unit, row[1])) + self.write_needed.add(row[1]) + + self.executemany(""" + UPDATE claims + SET trending_score = (trending_score + ?) + WHERE claim_hash = ?; + """, spikes) + self.execute("DELETE FROM spikes WHERE height = ?;", (height, )) + + + def decay_whales(self, height): + """ + Occurs inside transaction. + """ + if height % SAVE_INTERVAL != 0: + return + + whales = self.execute(""" + SELECT trending_score, lbc, claim_hash + FROM claims + WHERE lbc >= ?; + """, (WHALE_THRESHOLD, )).fetchall() + whales2 = [] + for whale in whales: + trending, lbc, claim_hash = whale + + # Overall multiplication factor for decay rate + # At WHALE_THRESHOLD, this is 1 + # At 10*WHALE_THRESHOLD, it is 3 + decay_rate_factor = 1.0 + 2.0*math.log10(lbc/WHALE_THRESHOLD) + + # The -1 is because this is just the *extra* part being applied + factor = (DECAY**SAVE_INTERVAL)**(decay_rate_factor - 1.0) + + # Decay + trending *= factor + whales2.append((trending, claim_hash)) + self.write_needed.add(claim_hash) + + self.executemany("UPDATE claims SET trending_score=? WHERE claim_hash=?;", + whales2) + + + def renorm(self, height): + """ + Renormalise trending scores. Occurs inside a transaction. + """ + + if height % RENORM_INTERVAL == 0: + threshold = 1.0E-3/DECAY_PER_RENORM + for row in self.execute("""SELECT claim_hash FROM claims + WHERE ABS(trending_score) >= ?;""", + (threshold, )): + self.write_needed.add(row[0]) + + self.execute("""UPDATE claims SET trending_score = ?*trending_score + WHERE ABS(trending_score) >= ?;""", + (DECAY_PER_RENORM, threshold)) + + def write_to_claims_db(self, db, height): + """ + Write changed trending scores to claims.db. + """ + if height % SAVE_INTERVAL != 0: + return + + rows = self.execute(f""" + SELECT trending_score, claim_hash + FROM claims + WHERE claim_hash IN + ({','.join('?' for _ in self.write_needed)}); + """, self.write_needed).fetchall() + + db.executemany("""UPDATE claim SET trending_mixed = ? + WHERE claim_hash = ?;""", rows) + + # Clear list of claims needing to be written to claims.db + self.write_needed = set() + + + def update(self, db, height, recalculate_claim_hashes): + """ + Update trending scores. + Input is a cursor to claims.db, the block height, and the list of + claims that changed. + """ + assert self.initialised + + self.begin() + self.renorm(height) + + # Fetch changed/new claims from claims.db + for row in db.execute(f""" + SELECT claim_hash, + 1E-8*(amount + support_amount) AS lbc + FROM claim + WHERE claim_hash IN + ({','.join('?' for _ in recalculate_claim_hashes)}); + """, recalculate_claim_hashes): + claim_hash, lbc = row + + # Insert into trending db if it does not exist + self.execute(""" + INSERT INTO claims (claim_hash) + VALUES (?) + ON CONFLICT (claim_hash) DO NOTHING;""", + (claim_hash, )) + + # See if it was an LBC change + old = self.execute("SELECT * FROM claims WHERE claim_hash=?;", + (claim_hash, )).fetchone() + lbc_old = old[1] + + # Save new LBC value into trending db + self.execute("UPDATE claims SET lbc = ? WHERE claim_hash = ?;", + (lbc, claim_hash)) + + if lbc > lbc_old: + + # Schedule a future spike + delay = min(int((lbc + 1E-8)**0.4), HALF_LIFE) + spike = (claim_hash, height + delay, spike_mass(lbc, lbc_old)) + self.execute("""INSERT INTO spikes + (claim_hash, height, mass) + VALUES (?, ?, ?);""", spike) + + elif lbc < lbc_old: + + # Subtract from future spikes + penalty = spike_mass(lbc_old, lbc) + spikes = self.execute(""" + SELECT * FROM spikes + WHERE claim_hash = ? + ORDER BY height ASC, mass DESC; + """, (claim_hash, )).fetchall() + for spike in spikes: + spike_id, mass = spike[0], spike[3] + + if mass > penalty: + # The entire penalty merely reduces this spike + self.execute("UPDATE spikes SET mass=? WHERE id=?;", + (mass - penalty, spike_id)) + penalty = 0.0 + else: + # Removing this spike entirely accounts for some (or + # all) of the penalty, then move on to other spikes + self.execute("DELETE FROM spikes WHERE id=?;", + (spike_id, )) + penalty -= mass + + # If penalty remains, that's a negative spike to be applied + # immediately. + if penalty > 0.0: + self.execute(""" + INSERT INTO spikes (claim_hash, height, mass) + VALUES (?, ?, ?);""", + (claim_hash, height, -penalty)) + + self.apply_spikes(height) + self.decay_whales(height) + self.commit() + + self.write_to_claims_db(db, height) + + + + + +# The "global" instance to work with +# pylint: disable=C0103 +trending_data = TrendingDB() + +def spike_mass(x, x_old): + """ + Compute the mass of a trending spike (normed - constant units). + x_old = old LBC value + x = new LBC value """ # Sign of trending spike @@ -89,155 +373,16 @@ def spike_height(x, x_old): return sign*mag +def run(db, height, final_height, recalculate_claim_hashes): + if height < final_height - 5*HALF_LIFE: + trending_log(f"Skipping trending calculations at block {height}.\n") + return -def get_time_boost(height): - """ - Return the time boost at a given height. - """ - return 1.0/DECAY**(height % RENORM_INTERVAL) - - -def trending_log(s): - """ - Log a string. - """ - if TRENDING_LOG: - fout = open("trending_variable_decay.log", "a") - fout.write(s) - fout.flush() - fout.close() - -class TrendingData: - """ - An object of this class holds trending data - """ - def __init__(self): - - # Dict from claim id to some trending info. - # Units are TIME VARIABLE in here - self.claims = {} - - # Claims with >= WHALE_THRESHOLD LBC total amount - self.whales = set([]) - - # Have all claims been read from db yet? - self.initialised = False - - # List of pending spikes. - # Units are CONSTANT in here - self.pending_spikes = [] - - def insert_claim_from_load(self, height, claim_hash, trending_score, total_amount): - assert not self.initialised - self.claims[claim_hash] = {"trending_score": trending_score, - "total_amount": total_amount, - "changed": False} - - if trending_score >= WHALE_THRESHOLD*get_time_boost(height): - self.add_whale(claim_hash) - - def add_whale(self, claim_hash): - self.whales.add(claim_hash) - - def apply_spikes(self, height): - """ - Apply all pending spikes that are due at this height. - Apply with time boost ON. - """ - time_boost = get_time_boost(height) - - for spike in self.pending_spikes: - if spike["height"] > height: - # Ignore - pass - if spike["height"] == height: - # Apply - self.claims[spike["claim_hash"]]["trending_score"] += time_boost*spike["size"] - self.claims[spike["claim_hash"]]["changed"] = True - - if self.claims[spike["claim_hash"]]["trending_score"] >= WHALE_THRESHOLD*time_boost: - self.add_whale(spike["claim_hash"]) - if spike["claim_hash"] in self.whales and \ - self.claims[spike["claim_hash"]]["trending_score"] < WHALE_THRESHOLD*time_boost: - self.whales.remove(spike["claim_hash"]) - - - # Keep only future spikes - self.pending_spikes = [s for s in self.pending_spikes \ - if s["height"] > height] - - - - - def update_claim(self, height, claim_hash, total_amount): - """ - Update trending data for a claim, given its new total amount. - """ - assert self.initialised - - # Extract existing total amount and trending score - # or use starting values if the claim is new - if claim_hash in self.claims: - old_state = copy.deepcopy(self.claims[claim_hash]) - else: - old_state = {"trending_score": 0.0, - "total_amount": 0.0, - "changed": False} - - # Calculate LBC change - change = total_amount - old_state["total_amount"] - - # Modify data if there was an LBC change - if change != 0.0: - spike = spike_height(total_amount, - old_state["total_amount"]) - delay = min(int((total_amount + 1E-8)**0.4), HALF_LIFE) - - if change < 0.0: - - # How big would the spike be for the inverse movement? - reverse_spike = spike_height(old_state["total_amount"], total_amount) - - # Remove that much spike from future pending ones - for future_spike in self.pending_spikes: - if future_spike["claim_hash"] == claim_hash: - if reverse_spike >= future_spike["size"]: - reverse_spike -= future_spike["size"] - future_spike["size"] = 0.0 - elif reverse_spike > 0.0: - future_spike["size"] -= reverse_spike - reverse_spike = 0.0 - - delay = 0 - spike = -reverse_spike - - self.pending_spikes.append({"height": height + delay, - "claim_hash": claim_hash, - "size": spike}) - - self.claims[claim_hash] = {"total_amount": total_amount, - "trending_score": old_state["trending_score"], - "changed": False} - - def process_whales(self, height): - """ - Whale claims decay faster. - """ - if height % SAVE_INTERVAL != 0: - return - - for claim_hash in self.whales: - trending_normed = self.claims[claim_hash]["trending_score"]/get_time_boost(height) - - # Overall multiplication factor for decay rate - decay_rate_factor = trending_normed/WHALE_THRESHOLD - - # The -1 is because this is just the *extra* part being applied - factor = (DECAY**SAVE_INTERVAL)**(decay_rate_factor - 1.0) - # print(claim_hash, trending_normed, decay_rate_factor) - self.claims[claim_hash]["trending_score"] *= factor - self.claims[claim_hash]["changed"] = True - + start = time.time() + trending_log(f"Calculating variable_decay trending at block {height}.\n") + trending_data.update(db, height, recalculate_claim_hashes) + end = time.time() + trending_log(f"Trending operations took {end - start} seconds.\n\n") def test_trending(): """ @@ -260,12 +405,12 @@ def test_trending(): COMMIT; """) + # Initialise trending data before anything happens with the claims + trending_data.initialise(db) + # Insert initial states of claims - everything = {"huge_whale": 0.01, - "huge_whale_botted": 0.01, - "medium_whale": 0.01, - "small_whale": 0.01, - "minnow": 0.01} + everything = {"huge_whale": 0.01, "medium_whale": 0.01, "small_whale": 0.01, + "huge_whale_botted": 0.01, "minnow": 0.01} def to_list_of_tuples(stuff): l = [] @@ -277,13 +422,17 @@ def test_trending(): INSERT INTO claim (claim_hash, amount) VALUES (?, 1E8*?); """, to_list_of_tuples(everything)) + # Process block zero height = 0 run(db, height, height, everything.keys()) # Save trajectories for plotting trajectories = {} - for key in trending_data.claims: - trajectories[key] = [trending_data.claims[key]["trending_score"]] + for row in trending_data.execute(""" + SELECT claim_hash, trending_score + FROM claims; + """): + trajectories[row[0]] = [row[1]/trending_unit(height)] # Main loop for height in range(1, 1000): @@ -312,119 +461,24 @@ def test_trending(): # Call run() run(db, height, height, everything.keys()) - for key in trending_data.claims: - trajectories[key].append(trending_data.claims[key]["trending_score"]\ - /get_time_boost(height)) + # Append current trending scores to trajectories + for row in db.execute(""" + SELECT claim_hash, trending_mixed + FROM claim; + """): + trajectories[row[0]].append(row[1]/trending_unit(height)) dbc.close() # pylint: disable=C0415 import matplotlib.pyplot as plt - for key in trending_data.claims: + for key in trajectories: plt.plot(trajectories[key], label=key) plt.legend() plt.show() -# One global instance -# pylint: disable=C0103 -trending_data = TrendingData() -def run(db, height, final_height, recalculate_claim_hashes): - - if height < final_height - 5*HALF_LIFE: - trending_log("Skipping variable_decay trending at block {h}.\n".format(h=height)) - return - - start = time.time() - - trending_log("Calculating variable_decay trending at block {h}.\n".format(h=height)) - trending_log(" Length of trending data = {l}.\n"\ - .format(l=len(trending_data.claims))) - - # Renormalise trending scores and mark all as having changed - if height % RENORM_INTERVAL == 0: - trending_log(" Renormalising trending scores...") - - keys = trending_data.claims.keys() - trending_data.whales = set([]) - for key in keys: - if trending_data.claims[key]["trending_score"] != 0.0: - trending_data.claims[key]["trending_score"] *= DECAY_PER_RENORM - trending_data.claims[key]["changed"] = True - - # Tiny becomes zero - if abs(trending_data.claims[key]["trending_score"]) < 1E-3: - trending_data.claims[key]["trending_score"] = 0.0 - - # Re-mark whales - if trending_data.claims[key]["trending_score"] >= WHALE_THRESHOLD*get_time_boost(height): - trending_data.add_whale(key) - - trending_log("done.\n") - - - # Regular message. - trending_log(" Reading total_amounts from db and updating"\ - + " trending scores in RAM...") - - # Update claims from db - if not trending_data.initialised: - - trending_log("initial load...") - # On fresh launch - for row in db.execute(""" - SELECT claim_hash, trending_mixed, - (amount + support_amount) - AS total_amount - FROM claim; - """): - trending_data.insert_claim_from_load(height, row[0], row[1], 1E-8*row[2]) - trending_data.initialised = True - else: - for row in db.execute(f""" - SELECT claim_hash, - (amount + support_amount) - AS total_amount - FROM claim - WHERE claim_hash IN - ({','.join('?' for _ in recalculate_claim_hashes)}); - """, recalculate_claim_hashes): - trending_data.update_claim(height, row[0], 1E-8*row[1]) - - # Apply pending spikes - trending_data.apply_spikes(height) - - trending_log("done.\n") - - - # Write trending scores to DB - if height % SAVE_INTERVAL == 0: - - trending_log(" Finding and processing whales...") - trending_log(str(len(trending_data.whales)) + " whales found...") - trending_data.process_whales(height) - trending_log("done.\n") - - trending_log(" Writing trending scores to db...") - - the_list = [] - keys = trending_data.claims.keys() - - for key in keys: - if trending_data.claims[key]["changed"]: - the_list.append((trending_data.claims[key]["trending_score"], key)) - trending_data.claims[key]["changed"] = False - - trending_log("{n} scores to write...".format(n=len(the_list))) - - db.executemany("UPDATE claim SET trending_mixed=? WHERE claim_hash=?;", - the_list) - - trending_log("done.\n") - - trending_log("Trending operations took {time} seconds.\n\n"\ - .format(time=time.time() - start)) if __name__ == "__main__":