lbry-sdk/lbry/wallet/server/db/trending.py
Brendon J. Brewer 0c0e36b6f8
trending
2021-10-05 16:44:49 -04:00

300 lines
9.8 KiB
Python

import math
import os
import sqlite3
import time
# Half-life of trending scores, measured in blocks (used by inflate_units,
# whale_decay_factor and TrendingDB._renorm).
HALF_LIFE = 400
# Scores are rescaled ("renormalised") every RENORM_INTERVAL blocks so the
# stored values do not grow without bound (see TrendingDB._renorm).
RENORM_INTERVAL = 1000
# Claims holding more LBC than this receive extra decay (see whale_decay_factor).
WHALE_THRESHOLD = 10000.0
def whale_decay_factor(lbc):
    """
    Return the extra multiplicative decay applied each block to a claim
    holding `lbc` LBC.

    Claims at or below WHALE_THRESHOLD get a factor of 1.0 (no extra decay).
    Above it, an adjusted half-life of
    HALF_LIFE / (log10(lbc / WHALE_THRESHOLD) + 1) is used. The factor is
    2**(1/HALF_LIFE - 1/adjusted), so combined with an ordinary
    2**(-1/HALF_LIFE) per-block decay it yields 2**(-1/adjusted).
    """
    if lbc <= WHALE_THRESHOLD:
        return 1.0
    orders_of_magnitude = math.log10(lbc/WHALE_THRESHOLD)
    shortened_half_life = HALF_LIFE/(orders_of_magnitude + 1.0)
    exponent = 1.0/HALF_LIFE - 1.0/shortened_half_life
    return 2.0**exponent
def soften(lbc):
    """
    Compress an LBC amount with a sign-preserving fourth root.

    A tiny epsilon (1E-8) is added to the magnitude before taking the root,
    so soften(0) is (1E-8)**0.25 == 0.01 rather than exactly zero.
    """
    root = (abs(lbc) + 1E-8)**0.25
    return root if lbc >= 0.0 else -root
def delay(lbc: int):
    """
    Return how many blocks after the current one a spike for a claim holding
    `lbc` LBC should activate.

    Non-positive amounts activate immediately (0). Otherwise the delay is the
    truncated square root of `lbc`, capped at 1000 blocks once `lbc` reaches
    1,000,000. Float amounts are accepted as well as ints.
    """
    max_delay = 1000
    if lbc <= 0:
        blocks = 0
    elif lbc < max_delay*max_delay:
        blocks = int(lbc**0.5)
    else:
        blocks = max_delay
    return blocks
def inflate_units(height):
    """
    Return 2**(k/HALF_LIFE), where k = height % RENORM_INTERVAL is the number
    of blocks since the last renormalisation height.

    Stored trending scores appear to grow by this factor between
    renormalisations (see TrendingDB._renorm). Dividing a stored score by it
    puts scores from different heights on a common scale.
    """
    return 2.0**((height % RENORM_INTERVAL)/HALF_LIFE)
# Connection settings executed by TrendingDB.__init__ before the schema is
# created. FOREIGN_KEYS = OFF means the spikes -> claims reference is not
# enforced. SYNCHRONOUS = 0 (OFF) skips fsyncs, trading crash durability
# for write speed.
PRAGMAS = ["PRAGMA FOREIGN_KEYS = OFF;",
"PRAGMA JOURNAL_MODE = WAL;",
"PRAGMA SYNCHRONOUS = 0;"]
class TrendingDB:
    """
    SQLite-backed store of per-claim trending scores.

    Events (claim upserts, supports and deletions) are queued with
    add_event(). process_block() applies them in a single transaction and
    returns the set of claim hashes whose rows were flagged as needing to be
    written out.
    """

    def __init__(self, data_dir):
        """
        Opens the trending database in the directory data_dir.
        For testing, pass data_dir=":memory:"
        """
        if data_dir == ":memory:":
            path = ":memory:"
        else:
            path = os.path.join(data_dir, "trending.db")
        self.db = sqlite3.connect(path, check_same_thread=False)
        for pragma in PRAGMAS:
            self.execute(pragma)
        self.execute("BEGIN;")
        self._create_tables()
        self._create_indices()
        self.execute("COMMIT;")
        self.pending_events = []
        # Recomputed by every process_block() call; initialised here so that
        # _upsert_claim() never hits an AttributeError.
        self.active = False

    def execute(self, *args, **kwargs):
        """Thin wrapper around sqlite3.Connection.execute; returns the cursor."""
        return self.db.execute(*args, **kwargs)

    def add_event(self, event):
        """
        Queue an event dict to be applied by the next process_block() call.

        process_block() handles events whose "event" key is "upsert",
        "support" or "delete". Each also carries a "claim_hash" key, and
        "upsert"/"support" events carry an "lbc" amount.
        """
        self.pending_events.append(event)

    def _create_tables(self):
        """Create the claims and spikes tables if they do not exist yet."""
        # NOTE(review): the declared type BYTES gets NUMERIC affinity in
        # SQLite (not BLOB), so text hashes that look like numbers would be
        # converted. bytes values are stored unchanged.
        self.execute("CREATE TABLE IF NOT EXISTS claims "
                     "(claim_hash BYTES NOT NULL PRIMARY KEY, "
                     "bid_lbc REAL NOT NULL, "
                     "support_lbc REAL NOT NULL, "
                     "trending_score REAL NOT NULL, "
                     "needs_write BOOLEAN NOT NULL) "
                     "WITHOUT ROWID;")
        self.execute("CREATE TABLE IF NOT EXISTS spikes "
                     "(claim_hash BYTES NOT NULL REFERENCES claims (claim_hash), "
                     "activation_height INTEGER NOT NULL, "
                     "mass REAL NOT NULL);")

    def _create_indices(self):
        """Create the indices used by the lookups below, if missing."""
        self.execute("CREATE INDEX IF NOT EXISTS idx1 ON spikes "
                     "(activation_height, claim_hash, mass);")
        self.execute("CREATE INDEX IF NOT EXISTS idx2 ON spikes (claim_hash);")
        self.execute("CREATE INDEX IF NOT EXISTS idx3 ON claims (trending_score);")
        self.execute("CREATE INDEX IF NOT EXISTS idx4 ON claims "
                     "(needs_write, claim_hash);")
        self.execute("CREATE INDEX IF NOT EXISTS idx5 ON claims "
                     "(bid_lbc + support_lbc);")

    def get_trending_score(self, claim_hash):
        """
        Return the stored trending score of claim_hash as a float, or 0.0 if
        the claim is not in the database.
        """
        row = self.execute("SELECT trending_score FROM claims "
                           "WHERE claim_hash = ?;", (claim_hash, )).fetchone()
        # fetchone() yields a 1-tuple; unwrap it so callers get the number.
        return 0.0 if row is None else row[0]

    def _upsert_claim(self, height, event):
        """
        Apply an "upsert" or "support" event to a single claim.

        An "upsert" replaces the claim's bid amount with event["lbc"]. A
        "support" adds event["lbc"] (possibly negative) to its support total.
        A claim not yet in the table is inserted with score 0.0. When the DB
        is active, a spike reflecting the change in total LBC is scheduled to
        activate delay(new_total) blocks after `height`.

        Raises:
            ValueError: if event["event"] is neither "upsert" nor "support".
        """
        claim_hash = event["claim_hash"]
        kind = event["event"]
        # Get old (bid, support) values of the claim; absent claims count as 0.
        old_lbc_pair = self.execute("SELECT bid_lbc, support_lbc FROM claims "
                                    "WHERE claim_hash = ?;",
                                    (claim_hash, )).fetchone()
        if old_lbc_pair is None:
            old_lbc_pair = (0.0, 0.0)
        if kind == "upsert":
            new_lbc_pair = (event["lbc"], old_lbc_pair[1])
        elif kind == "support":
            new_lbc_pair = (old_lbc_pair[0], old_lbc_pair[1] + event["lbc"])
        else:
            raise ValueError(f"cannot upsert claim from event type {kind!r}")
        # Upsert the claim. A new row starts with trending_score 0.0 and
        # needs_write = 1; an existing row only has its amounts updated.
        self.execute("INSERT INTO claims VALUES (?, ?, ?, ?, 1) "
                     "ON CONFLICT (claim_hash) DO UPDATE "
                     "SET bid_lbc = excluded.bid_lbc, "
                     "support_lbc = excluded.support_lbc;",
                     (claim_hash, new_lbc_pair[0], new_lbc_pair[1], 0.0))
        if self.active:
            old_lbc, lbc = sum(old_lbc_pair), sum(new_lbc_pair)
            # Spike mass mixes the softened change with the change in softened
            # totals. For a decrease both factors are negative, so the
            # fractional powers are complex; the real part of the product is
            # then negative.
            softened_change = soften(lbc - old_lbc)
            change_in_softened = soften(lbc) - soften(old_lbc)
            spike_mass = (softened_change**0.25*change_in_softened**0.75).real
            activation_height = height + delay(lbc)
            if spike_mass != 0.0:
                self.execute("INSERT INTO spikes VALUES (?, ?, ?);",
                             (claim_hash, activation_height, spike_mass))

    def _delete_claim(self, claim_hash):
        """Remove a claim and any of its pending spikes."""
        self.execute("DELETE FROM spikes WHERE claim_hash = ?;", (claim_hash, ))
        self.execute("DELETE FROM claims WHERE claim_hash = ?;", (claim_hash, ))

    def _apply_spikes(self, height):
        """
        Add the mass of every spike activating at `height` to its claim's
        trending score, flag those claims for writing, then drop the spikes.
        """
        spikes = self.execute("SELECT claim_hash, mass FROM spikes "
                              "WHERE activation_height = ?;",
                              (height, )).fetchall()
        # One UPDATE per spike, so several spikes for the same claim all add up.
        self.db.executemany("UPDATE claims "
                            "SET trending_score = trending_score + ?, "
                            "needs_write = 1 "
                            "WHERE claim_hash = ?;",
                            [(mass, claim_hash) for claim_hash, mass in spikes])
        self.execute("DELETE FROM spikes WHERE activation_height = ?;",
                     (height, ))

    def _decay_whales(self):
        """
        Multiply the trending score of every claim holding more than
        WHALE_THRESHOLD LBC by whale_decay_factor(), and flag it for writing.
        """
        # Strictly greater: whale_decay_factor() is exactly 1.0 at the
        # threshold, so including it would only cause spurious writes.
        whales = self.execute("SELECT claim_hash, bid_lbc + support_lbc "
                              "FROM claims "
                              "WHERE bid_lbc + support_lbc > ?;",
                              (WHALE_THRESHOLD, )).fetchall()
        self.db.executemany("UPDATE claims "
                            "SET trending_score = trending_score*?, "
                            "needs_write = 1 "
                            "WHERE claim_hash = ?;",
                            [(whale_decay_factor(lbc), claim_hash)
                             for claim_hash, lbc in whales])

    def _renorm(self):
        """
        Rescale every non-zero trending score by 2**(-RENORM_INTERVAL/HALF_LIFE).

        Scores whose rescaled magnitude would fall below 1E-6 are set to 0.0
        instead. Every changed claim is flagged for writing.
        """
        factor = 2.0**(-RENORM_INTERVAL/HALF_LIFE)
        # Zero small values
        self.execute("UPDATE claims SET trending_score = 0.0, needs_write = 1 "
                     "WHERE trending_score <> 0.0 "
                     "AND ABS(?*trending_score) < 1E-6;",
                     (factor, ))
        # Normalise other values
        self.execute("UPDATE claims SET trending_score = ?*trending_score, "
                     "needs_write = 1 "
                     "WHERE trending_score <> 0.0;", (factor, ))

    def process_block(self, height, daemon_height):
        """
        Apply all pending events for block `height` in one transaction.

        The DB is "active" when `height` is within 10*HALF_LIFE blocks of
        `daemon_height`. Only then are renormalisation, whale decay, spike
        creation and spike application performed. Claim amounts and
        deletions are applied regardless.

        Returns:
            The set of claim hashes flagged as needing a write (e.g. to ES).
            Their flags are cleared.

        If anything fails, the transaction is rolled back, pending events are
        kept for a retry, and the exception propagates.
        """
        self.active = daemon_height - height <= 10*HALF_LIFE
        self.execute("BEGIN;")
        try:
            if self.active:
                # Check for a unit change
                if height % RENORM_INTERVAL == 0:
                    self._renorm()
                # Apply extra whale decay
                self._decay_whales()
            # Upserts first, then supports, then deletions.
            for event in self.pending_events:
                if event["event"] == "upsert":
                    self._upsert_claim(height, event)
            for event in self.pending_events:
                if event["event"] == "support":
                    self._upsert_claim(height, event)
            for event in self.pending_events:
                if event["event"] == "delete":
                    self._delete_claim(event["claim_hash"])
            if self.active:
                self._apply_spikes(height)
            # Get set of claims that need writing to ES
            claims_to_write = {
                row[0] for row in
                self.execute("SELECT claim_hash FROM claims "
                             "WHERE needs_write = 1;")
            }
            self.execute("UPDATE claims SET needs_write = 0 "
                         "WHERE needs_write = 1;")
            self.execute("COMMIT;")
        except BaseException:
            # A dangling transaction would make the next BEGIN fail.
            if self.db.in_transaction:
                self.execute("ROLLBACK;")
            raise
        self.pending_events.clear()
        return claims_to_write
if __name__ == "__main__":
    # Demo (requires matplotlib and numpy). Feed a scripted sequence of
    # events for a single claim "a" through an in-memory TrendingDB while
    # walking heights forward, backward and forward again. The backward walk
    # applies the inverse events, presumably to simulate a chain reorg. Then
    # plot the claim's trending score on the forward passes.
    import matplotlib.pyplot as plt
    import numpy as np

    trending_db = TrendingDB(":memory:")
    heights = list(range(1, 1000))
    heights = heights + heights[::-1] + heights

    events = [{"height": 45,
               "what": dict(claim_hash="a", event="upsert", lbc=1.0)},
              {"height": 100,
               "what": dict(claim_hash="a", event="support", lbc=3.0)},
              {"height": 150,
               "what": dict(claim_hash="a", event="support", lbc=-3.0)},
              {"height": 170,
               "what": dict(claim_hash="a", event="upsert", lbc=100000.0)},
              {"height": 730,
               "what": dict(claim_hash="a", event="delete")}]
    inverse_events = [{"height": 730,
                       "what": dict(claim_hash="a", event="upsert", lbc=100000.0)},
                      {"height": 170,
                       "what": dict(claim_hash="a", event="upsert", lbc=1.0)},
                      {"height": 150,
                       "what": dict(claim_hash="a", event="support", lbc=3.0)},
                      {"height": 100,
                       "what": dict(claim_hash="a", event="support", lbc=-3.0)},
                      {"height": 45,
                       "what": dict(claim_hash="a", event="delete")}]

    xs, ys = [], []
    last_height = 0
    for height in heights:
        # Prepare the changes: forward steps replay events, other steps
        # replay the inverse events.
        es = events if height > last_height else inverse_events
        for event in es:
            if event["height"] == height:
                trending_db.add_event(event["what"])

        # Process the block
        trending_db.process_block(height, height)
        if height > last_height:  # Only plot when moving forward
            xs.append(height)
            y = trending_db.execute("SELECT trending_score FROM claims;").fetchone()
            y = 0.0 if y is None else y[0]
            # Divide out inflate_units so scores at different heights compare.
            ys.append(y/inflate_units(height))
        last_height = height

    xs = np.array(xs)
    ys = np.array(ys)
    plt.figure(1)
    plt.plot(xs, ys, "o-", alpha=0.2)
    plt.figure(2)
    plt.plot(xs)
    plt.show()