# Forked from LBRYCommunity/lbry-sdk (485 lines, 15 KiB, Python).
"""
AR-like trending with a delayed effect and a faster
decay rate for high-valued claims.
"""
|
|
|
|
import math
|
|
import time
|
|
import apsw
|
|
|
|
# Number of blocks after which a score halves. This applies to claims with
# lower LBC amounts; whale claims decay faster.
HALF_LIFE = 200

# LBC amount from which a claim is treated as a whale
# (a higher threshold means less DB writing)
WHALE_THRESHOLD = 10000.0

# Per-block decay coefficient derived from the half life
DECAY = 0.5 ** (1.0 / HALF_LIFE)

# Number of blocks between writes of trending values to the db
SAVE_INTERVAL = 10

# Number of blocks between renormalisations
RENORM_INTERVAL = 1000

# A renormalisation must always coincide with a save
assert RENORM_INTERVAL % SAVE_INTERVAL == 0

# Decay coefficient accumulated over one renormalisation interval
DECAY_PER_RENORM = DECAY ** RENORM_INTERVAL

# Whether trending calculations are logged
TRENDING_LOG = True
|
|
|
|
|
|
def install(connection):
    """
    Install the trending algorithm.

    Resets stale trending columns if needed, loads the current claims into
    the in-memory trending database and, when logging is enabled, makes sure
    the log file exists.

    connection: connection to claims.db; cursors are created from it.
    """
    check_trending_values(connection)
    trending_data.initialise(connection.cursor())

    if TRENDING_LOG:
        # Append mode creates the file when it is missing and leaves any
        # existing content untouched. The context manager guarantees the
        # handle is closed.
        with open("trending_variable_decay.log", "a"):
            pass
|
|
|
|
# Stub: intentionally empty (presumably kept for code that expects this
# name to exist - TODO confirm against the callers).
CREATE_TREND_TABLE = ""
|
|
|
|
def check_trending_values(connection):
    """
    If the trending values appear to be based on the zscore algorithm,
    reset them. This will allow resyncing from a standard snapshot.

    The values are considered stale when any row of ``claim`` has a
    non-zero ``trending_global``. In that case ``trending_group``,
    ``trending_mixed`` and ``trending_global`` are all set to zero.

    ``trending_global`` has to be cleared as well: it is the column the
    check looks at, so leaving it untouched would make the reset run again
    on every start-up and wipe the ``trending_mixed`` scores each time.

    connection: connection to claims.db.
    """
    c = connection.cursor()
    stale_rows = c.execute(
        "SELECT COUNT(*) num FROM claim WHERE trending_global <> 0;"
    ).fetchone()[0]

    if not stale_rows:
        return

    print("Resetting some columns. This might take a while...", flush=True,
          end="")
    # One statement per call; the three columns are reset in a single pass
    # over the table inside an explicit transaction.
    c.execute("BEGIN;")
    c.execute("""UPDATE claim
                 SET trending_group = 0,
                     trending_mixed = 0,
                     trending_global = 0;""")
    c.execute("COMMIT;")
    print("done.")
|
|
|
|
|
|
|
|
|
|
def trending_log(s):
    """
    Log a string to the log file

    Appends ``s`` verbatim (no newline is added) to
    "trending_variable_decay.log". Does nothing when TRENDING_LOG is false.
    """
    if not TRENDING_LOG:
        return

    # The context manager flushes and closes the file even if the write
    # raises, so the handle cannot leak.
    with open("trending_variable_decay.log", "a") as fout:
        fout.write(s)
|
|
|
|
|
|
def trending_unit(height):
    """
    Return the trending score unit at a given height.

    The height is first rounded down to the beginning of its SAVE_INTERVAL
    batch of blocks, so every block of one batch shares the same unit. The
    unit is 1/DECAY raised to the rounded height modulo RENORM_INTERVAL,
    i.e. it returns to 1.0 at every multiple of RENORM_INTERVAL.
    """
    # Round to the beginning of a SAVE_INTERVAL batch of blocks.
    batch_start = height - (height % SAVE_INTERVAL)
    # Use the rounded height here; the raw height would make the rounding
    # above dead code.
    return 1.0/DECAY**(batch_start % RENORM_INTERVAL)
|
|
|
|
|
|
class TrendingDB:
    """
    An in-memory database of trending scores

    Two tables live in a private in-memory SQLite database:

    * ``claims`` - one row per claim: its hash, LBC amount and current
      trending score.
    * ``spikes`` - pending score changes, each due at a block height.

    ``write_needed`` collects the hashes of claims whose score changed
    since the last write to claims.db.
    """

    def __init__(self):
        self.conn = apsw.Connection(":memory:")
        self.cursor = self.conn.cursor()
        self.initialised = False
        # Hashes of claims whose score must be written to claims.db
        self.write_needed = set()

    def execute(self, query, *args, **kwargs):
        """
        Run a statement on the in-memory database and return the cursor.
        """
        return self.cursor.execute(query, *args, **kwargs)

    def executemany(self, query, *args, **kwargs):
        """
        Run a statement once per set of bindings on the in-memory database.
        """
        return self.cursor.executemany(query, *args, **kwargs)

    def begin(self):
        """
        Open a transaction on the in-memory database.
        """
        self.execute("BEGIN;")

    def commit(self):
        """
        Commit the open transaction on the in-memory database.
        """
        self.execute("COMMIT;")

    def initialise(self, db):
        """
        Pass in claims.db

        Creates the tables and indexes, then copies every claim's hash, LBC
        amount (1E-8*(amount + support_amount)) and stored trending_mixed
        value from ``db``. Does nothing if already initialised.

        db: cursor to claims.db.
        """
        if self.initialised:
            return

        trending_log("Initialising trending database...")

        # The need for speed
        self.execute("PRAGMA JOURNAL_MODE=OFF;")
        self.execute("PRAGMA SYNCHRONOUS=0;")

        self.begin()

        # Create the tables
        self.execute("""
            CREATE TABLE IF NOT EXISTS claims
                (claim_hash     BYTES PRIMARY KEY,
                 lbc            REAL NOT NULL DEFAULT 0.0,
                 trending_score REAL NOT NULL DEFAULT 0.0)
            WITHOUT ROWID;""")

        self.execute("""
            CREATE TABLE IF NOT EXISTS spikes
                (id         INTEGER PRIMARY KEY,
                 claim_hash BYTES NOT NULL,
                 height     INTEGER NOT NULL,
                 mass       REAL NOT NULL,
                 FOREIGN KEY (claim_hash)
                    REFERENCES claims (claim_hash));""")

        # Clear out any existing data
        self.execute("DELETE FROM claims;")
        self.execute("DELETE FROM spikes;")

        # Create indexes
        self.execute("CREATE INDEX idx1 ON spikes (claim_hash, height, mass);")
        self.execute("CREATE INDEX idx2 ON spikes (claim_hash, height, mass DESC);")
        self.execute("CREATE INDEX idx3 on claims (lbc DESC, claim_hash, trending_score);")

        # Import data from claims.db
        for row in db.execute("""
                              SELECT claim_hash,
                                     1E-8*(amount + support_amount) AS lbc,
                                     trending_mixed
                              FROM claim;
                              """):
            self.execute("INSERT INTO claims VALUES (?, ?, ?);", row)
        self.commit()

        self.initialised = True
        trending_log("done.\n")

    def apply_spikes(self, height):
        """
        Apply spikes that are due. This occurs inside a transaction.

        The masses of all spikes scheduled for exactly ``height`` are summed
        per claim, converted with the trending unit of that height and added
        to the claim's score. The applied spikes are then deleted and the
        affected claims are marked as needing a write.
        """
        unit = trending_unit(height)
        due = self.execute("""
                           SELECT SUM(mass), claim_hash FROM spikes
                           WHERE height = ?
                           GROUP BY claim_hash;
                           """, (height, )).fetchall()

        spikes = [(total*unit, claim_hash) for total, claim_hash in due]
        self.write_needed.update(claim_hash for _, claim_hash in due)

        self.executemany("""
                         UPDATE claims
                         SET trending_score = (trending_score + ?)
                         WHERE claim_hash = ?;
                         """, spikes)
        self.execute("DELETE FROM spikes WHERE height = ?;", (height, ))

    def decay_whales(self, height):
        """
        Occurs inside transaction.

        Every SAVE_INTERVAL blocks, applies extra decay to the score of each
        claim holding at least WHALE_THRESHOLD LBC and marks it as needing a
        write. Does nothing at other heights.
        """
        if height % SAVE_INTERVAL != 0:
            return

        whales = self.execute("""
                              SELECT trending_score, lbc, claim_hash
                              FROM claims
                              WHERE lbc >= ?;
                              """, (WHALE_THRESHOLD, )).fetchall()
        decayed = []
        for trending, lbc, claim_hash in whales:

            # Overall multiplication factor for decay rate
            # At WHALE_THRESHOLD, this is 1
            # At 10*WHALE_THRESHOLD, it is 3
            decay_rate_factor = 1.0 + 2.0*math.log10(lbc/WHALE_THRESHOLD)

            # The -1 is because this is just the *extra* part being applied
            factor = (DECAY**SAVE_INTERVAL)**(decay_rate_factor - 1.0)

            # Decay
            decayed.append((trending*factor, claim_hash))
            self.write_needed.add(claim_hash)

        self.executemany("UPDATE claims SET trending_score=? WHERE claim_hash=?;",
                         decayed)

    def renorm(self, height):
        """
        Renormalise trending scores. Occurs inside a transaction.

        At multiples of RENORM_INTERVAL, every score whose absolute value is
        at least 1.0E-3/DECAY_PER_RENORM is multiplied by DECAY_PER_RENORM
        and its claim is marked as needing a write. Smaller scores are left
        as they are. Does nothing at other heights.
        """
        if height % RENORM_INTERVAL != 0:
            return

        threshold = 1.0E-3/DECAY_PER_RENORM
        for row in self.execute("""SELECT claim_hash FROM claims
                                   WHERE ABS(trending_score) >= ?;""",
                                (threshold, )):
            self.write_needed.add(row[0])

        self.execute("""UPDATE claims SET trending_score = ?*trending_score
                        WHERE ABS(trending_score) >= ?;""",
                     (DECAY_PER_RENORM, threshold))

    def write_to_claims_db(self, db, height):
        """
        Write changed trending scores to claims.db.

        Only acts when ``height`` is a multiple of SAVE_INTERVAL. The scores
        of all claims in ``write_needed`` are copied into the
        ``trending_mixed`` column of claims.db and the set is emptied.

        db: cursor to claims.db.
        """
        if height % SAVE_INTERVAL != 0:
            return

        # Materialise the set once so the number of placeholders and the
        # bound values are guaranteed to match.
        # NOTE(review): one placeholder is generated per claim; confirm that
        # large batches stay below SQLite's host-parameter limit.
        hashes = tuple(self.write_needed)
        placeholders = ",".join("?" * len(hashes))

        rows = self.execute(f"""
                            SELECT trending_score, claim_hash
                            FROM claims
                            WHERE claim_hash IN
                                ({placeholders});
                            """, hashes).fetchall()

        db.executemany("""UPDATE claim SET trending_mixed = ?
                          WHERE claim_hash = ?;""", rows)

        # Clear list of claims needing to be written to claims.db
        self.write_needed = set()

    def _schedule_spike(self, claim_hash, height, lbc, lbc_old):
        """
        Schedule a positive spike for a claim whose LBC amount increased.
        """
        # Larger claims wait longer, capped at HALF_LIFE blocks
        delay = min(int((lbc + 1E-8)**0.4), HALF_LIFE)
        self.execute("""INSERT INTO spikes
                            (claim_hash, height, mass)
                        VALUES (?, ?, ?);""",
                     (claim_hash, height + delay, spike_mass(lbc, lbc_old)))

    def _penalise_spikes(self, claim_hash, height, lbc, lbc_old):
        """
        Subtract an LBC decrease from the claim's pending spikes.

        Pending spikes are consumed in order of height (largest mass first
        within a height). Whatever penalty is left over becomes a negative
        spike due at ``height``.
        """
        penalty = spike_mass(lbc_old, lbc)
        pending = self.execute("""
                               SELECT id, mass FROM spikes
                               WHERE claim_hash = ?
                               ORDER BY height ASC, mass DESC;
                               """, (claim_hash, )).fetchall()

        for spike_id, mass in pending:
            if mass > penalty:
                # The entire penalty merely reduces this spike
                self.execute("UPDATE spikes SET mass=? WHERE id=?;",
                             (mass - penalty, spike_id))
                penalty = 0.0
                # Nothing is left to distribute, so later spikes must not
                # be touched.
                break

            # Removing this spike entirely accounts for some (or
            # all) of the penalty, then move on to other spikes
            self.execute("DELETE FROM spikes WHERE id=?;", (spike_id, ))
            penalty -= mass

        # If penalty remains, that's a negative spike to be applied
        # immediately.
        if penalty > 0.0:
            self.execute("""
                         INSERT INTO spikes (claim_hash, height, mass)
                         VALUES (?, ?, ?);""",
                         (claim_hash, height, -penalty))

    def update(self, db, height, recalculate_claim_hashes):
        """
        Update trending scores.

        Input is a cursor to claims.db, the block height, and the list of
        claims that changed.

        Inside one transaction: renormalises if due, records the new LBC
        amount of every changed claim, schedules or reduces spikes
        accordingly, applies the spikes due at this height and decays
        whales. Afterwards the changed scores are written to claims.db.
        """
        assert self.initialised

        # Materialise once: the collection is needed both for building the
        # placeholders and as bindings, which a one-shot iterable could not
        # serve twice.
        changed_hashes = tuple(recalculate_claim_hashes)
        placeholders = ",".join("?" * len(changed_hashes))

        self.begin()
        self.renorm(height)

        # Fetch changed/new claims from claims.db
        for claim_hash, lbc in db.execute(f"""
                                          SELECT claim_hash,
                                                 1E-8*(amount + support_amount) AS lbc
                                          FROM claim
                                          WHERE claim_hash IN
                                              ({placeholders});
                                          """, changed_hashes):

            # Insert into trending db if it does not exist
            self.execute("""
                         INSERT INTO claims (claim_hash)
                         VALUES (?)
                         ON CONFLICT (claim_hash) DO NOTHING;""",
                         (claim_hash, ))

            # See if it was an LBC change
            lbc_old = self.execute("SELECT lbc FROM claims WHERE claim_hash=?;",
                                   (claim_hash, )).fetchone()[0]

            # Save new LBC value into trending db
            self.execute("UPDATE claims SET lbc = ? WHERE claim_hash = ?;",
                         (lbc, claim_hash))

            if lbc > lbc_old:
                self._schedule_spike(claim_hash, height, lbc, lbc_old)
            elif lbc < lbc_old:
                self._penalise_spikes(claim_hash, height, lbc, lbc_old)

        self.apply_spikes(height)
        self.decay_whales(height)
        self.commit()

        self.write_to_claims_db(db, height)
|
|
|
|
|
|
|
|
|
|
|
|
# Module-wide TrendingDB instance that the functions in this file operate on
# pylint: disable=C0103
trending_data = TrendingDB()
|
|
|
|
def spike_mass(x, x_old):
    """
    Compute the mass of a trending spike (normed - constant units).

    x     = new LBC value
    x_old = old LBC value

    The result is negative when x is below x_old and non-negative otherwise.
    """
    # Size of the change, measured on fourth roots of the LBC values
    magnitude = abs(x**0.25 - x_old**0.25)

    # Minnow boost: the smaller x is, the larger the multiplier
    magnitude *= 1.0 + 2E4/(x + 100.0)**2

    # A decrease yields a negative spike
    return -magnitude if x < x_old else magnitude
|
|
|
|
|
|
def run(db, height, final_height, recalculate_claim_hashes):
    """
    Run the trending update for one block, or skip it.

    Blocks more than 5*HALF_LIFE below ``final_height`` are skipped and only
    logged. Otherwise the global trending data is updated with the changed
    claims and the time taken is logged.

    db: cursor to claims.db, handed on to the update.
    """
    if height >= final_height - 5*HALF_LIFE:
        start = time.time()
        trending_log(f"Calculating variable_decay trending at block {height}.\n")
        trending_data.update(db, height, recalculate_claim_hashes)
        end = time.time()
        trending_log(f"Trending operations took {end - start} seconds.\n\n")
    else:
        trending_log(f"Skipping trending calculations at block {height}.\n")
|
|
|
|
def test_trending():
    """
    Simulate claims with different support patterns and plot their
    trending trajectories.

    Every block goes through the run() function, just like real updates.
    """

    # Stand-in for "claims.db", held in memory
    # pylint: disable=I1101
    dbc = apsw.Connection(":memory:")
    db = dbc.cursor()

    # Only the columns the trending code reads and writes
    db.execute("""
        BEGIN;
        CREATE TABLE claim (claim_hash     TEXT PRIMARY KEY,
                            amount         REAL NOT NULL DEFAULT 0.0,
                            support_amount REAL NOT NULL DEFAULT 0.0,
                            trending_mixed REAL NOT NULL DEFAULT 0.0);
        COMMIT;
        """)

    # The trending data is set up while the claim table is still empty
    trending_data.initialise(db)

    # Every claim starts out with the same small amount
    everything = dict.fromkeys(
        ["huge_whale", "medium_whale", "small_whale",
         "huge_whale_botted", "minnow"],
        0.01)

    db.executemany("""
        INSERT INTO claim (claim_hash, amount) VALUES (?, 1E8*?);
        """, list(everything.items()))

    # Block zero
    height = 0
    run(db, height, height, everything.keys())

    # One list of scores per claim, for plotting
    initial_unit = trending_unit(height)
    trajectories = {
        claim_hash: [score/initial_unit]
        for claim_hash, score in trending_data.execute("""
            SELECT claim_hash, trending_score
            FROM claims;
            """)
    }

    for height in range(1, 1000):

        # The whales receive a single large support in block 1
        if height == 1:
            everything["huge_whale"] += 5E5
            everything["medium_whale"] += 5E4
            everything["small_whale"] += 5E3

        if height < 500:
            # Steady drip of support in every block
            everything["huge_whale_botted"] += 5E5/500
            everything["minnow"] += 1
        elif height == 500:
            # All supports are withdrawn at once
            for name in everything:
                everything[name] = 0.01

        # Store the new amounts in the fake claims.db
        db.executemany("""
            UPDATE claim SET amount = 1E8*? WHERE claim_hash = ?;
            """, [(amount, name) for name, amount in everything.items()])

        run(db, height, height, everything.keys())

        # Record the score currently stored for each claim
        for claim_hash, score in db.execute("""
                SELECT claim_hash, trending_mixed
                FROM claim;
                """):
            trajectories[claim_hash].append(score/trending_unit(height))

    dbc.close()

    # pylint: disable=C0415
    import matplotlib.pyplot as plt
    for name, series in trajectories.items():
        plt.plot(series, label=name)
    plt.legend()
    plt.show()
|
|
|
|
|
|
|
|
|
|
|
|
# When executed as a script, run the plotting simulation.
if __name__ == "__main__":
    test_trending()
|