forked from LBRYCommunity/lbry-sdk
Rewrite of variable_decay.py for speed improvements
This commit is contained in:
parent
511a5c3f82
commit
1cdff47477
2 changed files with 324 additions and 269 deletions
1
.gitignore
vendored
1
.gitignore
vendored
|
@ -11,6 +11,7 @@
|
|||
lbry.egg-info
|
||||
__pycache__
|
||||
_trial_temp/
|
||||
trending*.log
|
||||
|
||||
/tests/integration/blockchain/files
|
||||
/tests/.coverage.*
|
||||
|
|
|
@ -1,18 +1,17 @@
|
|||
"""
|
||||
Delayed AR with variable decay rate.
|
||||
|
||||
The spike height function is also simpler.
|
||||
AR-like trending with a delayed effect and a faster
|
||||
decay rate for high valued claims.
|
||||
"""
|
||||
|
||||
import copy
|
||||
import math
|
||||
import time
|
||||
import apsw
|
||||
|
||||
# Half life in blocks *for lower LBC claims* (it's shorter for whale claims)
|
||||
HALF_LIFE = 200
|
||||
|
||||
# Whale threshold (higher -> less DB writing)
|
||||
WHALE_THRESHOLD = 3.0
|
||||
# Whale threshold, in LBC (higher -> less DB writing)
|
||||
WHALE_THRESHOLD = 10000.0
|
||||
|
||||
# Decay coefficient per block
|
||||
DECAY = 0.5**(1.0/HALF_LIFE)
|
||||
|
@ -38,6 +37,7 @@ def install(connection):
|
|||
Install the trending algorithm.
|
||||
"""
|
||||
check_trending_values(connection)
|
||||
trending_data.initialise(connection.cursor())
|
||||
|
||||
if TRENDING_LOG:
|
||||
f = open("trending_variable_decay.log", "a")
|
||||
|
@ -46,7 +46,6 @@ def install(connection):
|
|||
# Stub
|
||||
CREATE_TREND_TABLE = ""
|
||||
|
||||
|
||||
def check_trending_values(connection):
|
||||
"""
|
||||
If the trending values appear to be based on the zscore algorithm,
|
||||
|
@ -60,19 +59,304 @@ def check_trending_values(connection):
|
|||
break
|
||||
|
||||
if needs_reset:
|
||||
print("Resetting some columns. This might take a while...", flush=True, end="")
|
||||
print("Resetting some columns. This might take a while...", flush=True,
|
||||
end="")
|
||||
c.execute(""" BEGIN;
|
||||
UPDATE claim SET trending_group = 0;
|
||||
UPDATE claim SET trending_mixed = 0;
|
||||
UPDATE claim SET trending_global = 0;
|
||||
UPDATE claim SET trending_local = 0;
|
||||
COMMIT;""")
|
||||
print("done.")
|
||||
|
||||
|
||||
def spike_height(x, x_old):
|
||||
|
||||
|
||||
def trending_log(s):
|
||||
"""
|
||||
Compute the size of a trending spike (normed - constant units).
|
||||
Log a string to the log file
|
||||
"""
|
||||
if TRENDING_LOG:
|
||||
fout = open("trending_variable_decay.log", "a")
|
||||
fout.write(s)
|
||||
fout.flush()
|
||||
fout.close()
|
||||
|
||||
|
||||
def trending_unit(height):
|
||||
"""
|
||||
Return the trending score unit at a given height.
|
||||
"""
|
||||
# Round to the beginning of a SAVE_INTERVAL batch of blocks.
|
||||
_height = height - (height % SAVE_INTERVAL)
|
||||
return 1.0/DECAY**(height % RENORM_INTERVAL)
|
||||
|
||||
|
||||
class TrendingDB:
|
||||
"""
|
||||
An in-memory database of trending scores
|
||||
"""
|
||||
|
||||
def __init__(self):
|
||||
self.conn = apsw.Connection(":memory:")
|
||||
self.cursor = self.conn.cursor()
|
||||
self.initialised = False
|
||||
self.write_needed = set()
|
||||
|
||||
def execute(self, query, *args, **kwargs):
|
||||
return self.cursor.execute(query, *args, **kwargs)
|
||||
|
||||
def executemany(self, query, *args, **kwargs):
|
||||
return self.cursor.executemany(query, *args, **kwargs)
|
||||
|
||||
def begin(self):
|
||||
self.execute("BEGIN;")
|
||||
|
||||
def commit(self):
|
||||
self.execute("COMMIT;")
|
||||
|
||||
def initialise(self, db):
|
||||
"""
|
||||
Pass in claims.db
|
||||
"""
|
||||
if self.initialised:
|
||||
return
|
||||
|
||||
trending_log("Initialising trending database...")
|
||||
|
||||
# The need for speed
|
||||
self.execute("PRAGMA JOURNAL_MODE=OFF;")
|
||||
self.execute("PRAGMA SYNCHRONOUS=0;")
|
||||
|
||||
self.begin()
|
||||
|
||||
# Create the tables
|
||||
self.execute("""
|
||||
CREATE TABLE IF NOT EXISTS claims
|
||||
(claim_hash BYTES PRIMARY KEY,
|
||||
lbc REAL NOT NULL DEFAULT 0.0,
|
||||
trending_score REAL NOT NULL DEFAULT 0.0)
|
||||
WITHOUT ROWID;""")
|
||||
|
||||
self.execute("""
|
||||
CREATE TABLE IF NOT EXISTS spikes
|
||||
(id INTEGER PRIMARY KEY,
|
||||
claim_hash BYTES NOT NULL,
|
||||
height INTEGER NOT NULL,
|
||||
mass REAL NOT NULL,
|
||||
FOREIGN KEY (claim_hash)
|
||||
REFERENCES claims (claim_hash));""")
|
||||
|
||||
# Clear out any existing data
|
||||
self.execute("DELETE FROM claims;")
|
||||
self.execute("DELETE FROM spikes;")
|
||||
|
||||
# Create indexes
|
||||
self.execute("CREATE INDEX idx1 ON spikes (claim_hash, height, mass);")
|
||||
self.execute("CREATE INDEX idx2 ON spikes (claim_hash, height, mass DESC);")
|
||||
self.execute("CREATE INDEX idx3 on claims (lbc DESC, claim_hash, trending_score);")
|
||||
|
||||
# Import data from claims.db
|
||||
for row in db.execute("""
|
||||
SELECT claim_hash,
|
||||
1E-8*(amount + support_amount) AS lbc,
|
||||
trending_mixed
|
||||
FROM claim;
|
||||
"""):
|
||||
self.execute("INSERT INTO claims VALUES (?, ?, ?);", row)
|
||||
self.commit()
|
||||
|
||||
self.initialised = True
|
||||
trending_log("done.\n")
|
||||
|
||||
def apply_spikes(self, height):
|
||||
"""
|
||||
Apply spikes that are due. This occurs inside a transaction.
|
||||
"""
|
||||
|
||||
spikes = []
|
||||
unit = trending_unit(height)
|
||||
for row in self.execute("""
|
||||
SELECT SUM(mass), claim_hash FROM spikes
|
||||
WHERE height = ?
|
||||
GROUP BY claim_hash;
|
||||
""", (height, )):
|
||||
spikes.append((row[0]*unit, row[1]))
|
||||
self.write_needed.add(row[1])
|
||||
|
||||
self.executemany("""
|
||||
UPDATE claims
|
||||
SET trending_score = (trending_score + ?)
|
||||
WHERE claim_hash = ?;
|
||||
""", spikes)
|
||||
self.execute("DELETE FROM spikes WHERE height = ?;", (height, ))
|
||||
|
||||
|
||||
def decay_whales(self, height):
|
||||
"""
|
||||
Occurs inside transaction.
|
||||
"""
|
||||
if height % SAVE_INTERVAL != 0:
|
||||
return
|
||||
|
||||
whales = self.execute("""
|
||||
SELECT trending_score, lbc, claim_hash
|
||||
FROM claims
|
||||
WHERE lbc >= ?;
|
||||
""", (WHALE_THRESHOLD, )).fetchall()
|
||||
whales2 = []
|
||||
for whale in whales:
|
||||
trending, lbc, claim_hash = whale
|
||||
|
||||
# Overall multiplication factor for decay rate
|
||||
# At WHALE_THRESHOLD, this is 1
|
||||
# At 10*WHALE_THRESHOLD, it is 3
|
||||
decay_rate_factor = 1.0 + 2.0*math.log10(lbc/WHALE_THRESHOLD)
|
||||
|
||||
# The -1 is because this is just the *extra* part being applied
|
||||
factor = (DECAY**SAVE_INTERVAL)**(decay_rate_factor - 1.0)
|
||||
|
||||
# Decay
|
||||
trending *= factor
|
||||
whales2.append((trending, claim_hash))
|
||||
self.write_needed.add(claim_hash)
|
||||
|
||||
self.executemany("UPDATE claims SET trending_score=? WHERE claim_hash=?;",
|
||||
whales2)
|
||||
|
||||
|
||||
def renorm(self, height):
|
||||
"""
|
||||
Renormalise trending scores. Occurs inside a transaction.
|
||||
"""
|
||||
|
||||
if height % RENORM_INTERVAL == 0:
|
||||
threshold = 1.0E-3/DECAY_PER_RENORM
|
||||
for row in self.execute("""SELECT claim_hash FROM claims
|
||||
WHERE ABS(trending_score) >= ?;""",
|
||||
(threshold, )):
|
||||
self.write_needed.add(row[0])
|
||||
|
||||
self.execute("""UPDATE claims SET trending_score = ?*trending_score
|
||||
WHERE ABS(trending_score) >= ?;""",
|
||||
(DECAY_PER_RENORM, threshold))
|
||||
|
||||
def write_to_claims_db(self, db, height):
|
||||
"""
|
||||
Write changed trending scores to claims.db.
|
||||
"""
|
||||
if height % SAVE_INTERVAL != 0:
|
||||
return
|
||||
|
||||
rows = self.execute(f"""
|
||||
SELECT trending_score, claim_hash
|
||||
FROM claims
|
||||
WHERE claim_hash IN
|
||||
({','.join('?' for _ in self.write_needed)});
|
||||
""", self.write_needed).fetchall()
|
||||
|
||||
db.executemany("""UPDATE claim SET trending_mixed = ?
|
||||
WHERE claim_hash = ?;""", rows)
|
||||
|
||||
# Clear list of claims needing to be written to claims.db
|
||||
self.write_needed = set()
|
||||
|
||||
|
||||
def update(self, db, height, recalculate_claim_hashes):
|
||||
"""
|
||||
Update trending scores.
|
||||
Input is a cursor to claims.db, the block height, and the list of
|
||||
claims that changed.
|
||||
"""
|
||||
assert self.initialised
|
||||
|
||||
self.begin()
|
||||
self.renorm(height)
|
||||
|
||||
# Fetch changed/new claims from claims.db
|
||||
for row in db.execute(f"""
|
||||
SELECT claim_hash,
|
||||
1E-8*(amount + support_amount) AS lbc
|
||||
FROM claim
|
||||
WHERE claim_hash IN
|
||||
({','.join('?' for _ in recalculate_claim_hashes)});
|
||||
""", recalculate_claim_hashes):
|
||||
claim_hash, lbc = row
|
||||
|
||||
# Insert into trending db if it does not exist
|
||||
self.execute("""
|
||||
INSERT INTO claims (claim_hash)
|
||||
VALUES (?)
|
||||
ON CONFLICT (claim_hash) DO NOTHING;""",
|
||||
(claim_hash, ))
|
||||
|
||||
# See if it was an LBC change
|
||||
old = self.execute("SELECT * FROM claims WHERE claim_hash=?;",
|
||||
(claim_hash, )).fetchone()
|
||||
lbc_old = old[1]
|
||||
|
||||
# Save new LBC value into trending db
|
||||
self.execute("UPDATE claims SET lbc = ? WHERE claim_hash = ?;",
|
||||
(lbc, claim_hash))
|
||||
|
||||
if lbc > lbc_old:
|
||||
|
||||
# Schedule a future spike
|
||||
delay = min(int((lbc + 1E-8)**0.4), HALF_LIFE)
|
||||
spike = (claim_hash, height + delay, spike_mass(lbc, lbc_old))
|
||||
self.execute("""INSERT INTO spikes
|
||||
(claim_hash, height, mass)
|
||||
VALUES (?, ?, ?);""", spike)
|
||||
|
||||
elif lbc < lbc_old:
|
||||
|
||||
# Subtract from future spikes
|
||||
penalty = spike_mass(lbc_old, lbc)
|
||||
spikes = self.execute("""
|
||||
SELECT * FROM spikes
|
||||
WHERE claim_hash = ?
|
||||
ORDER BY height ASC, mass DESC;
|
||||
""", (claim_hash, )).fetchall()
|
||||
for spike in spikes:
|
||||
spike_id, mass = spike[0], spike[3]
|
||||
|
||||
if mass > penalty:
|
||||
# The entire penalty merely reduces this spike
|
||||
self.execute("UPDATE spikes SET mass=? WHERE id=?;",
|
||||
(mass - penalty, spike_id))
|
||||
penalty = 0.0
|
||||
else:
|
||||
# Removing this spike entirely accounts for some (or
|
||||
# all) of the penalty, then move on to other spikes
|
||||
self.execute("DELETE FROM spikes WHERE id=?;",
|
||||
(spike_id, ))
|
||||
penalty -= mass
|
||||
|
||||
# If penalty remains, that's a negative spike to be applied
|
||||
# immediately.
|
||||
if penalty > 0.0:
|
||||
self.execute("""
|
||||
INSERT INTO spikes (claim_hash, height, mass)
|
||||
VALUES (?, ?, ?);""",
|
||||
(claim_hash, height, -penalty))
|
||||
|
||||
self.apply_spikes(height)
|
||||
self.decay_whales(height)
|
||||
self.commit()
|
||||
|
||||
self.write_to_claims_db(db, height)
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
# The "global" instance to work with
|
||||
# pylint: disable=C0103
|
||||
trending_data = TrendingDB()
|
||||
|
||||
def spike_mass(x, x_old):
|
||||
"""
|
||||
Compute the mass of a trending spike (normed - constant units).
|
||||
x_old = old LBC value
|
||||
x = new LBC value
|
||||
"""
|
||||
|
||||
# Sign of trending spike
|
||||
|
@ -89,155 +373,16 @@ def spike_height(x, x_old):
|
|||
return sign*mag
|
||||
|
||||
|
||||
def run(db, height, final_height, recalculate_claim_hashes):
|
||||
if height < final_height - 5*HALF_LIFE:
|
||||
trending_log(f"Skipping trending calculations at block {height}.\n")
|
||||
return
|
||||
|
||||
def get_time_boost(height):
|
||||
"""
|
||||
Return the time boost at a given height.
|
||||
"""
|
||||
return 1.0/DECAY**(height % RENORM_INTERVAL)
|
||||
|
||||
|
||||
def trending_log(s):
|
||||
"""
|
||||
Log a string.
|
||||
"""
|
||||
if TRENDING_LOG:
|
||||
fout = open("trending_variable_decay.log", "a")
|
||||
fout.write(s)
|
||||
fout.flush()
|
||||
fout.close()
|
||||
|
||||
class TrendingData:
|
||||
"""
|
||||
An object of this class holds trending data
|
||||
"""
|
||||
def __init__(self):
|
||||
|
||||
# Dict from claim id to some trending info.
|
||||
# Units are TIME VARIABLE in here
|
||||
self.claims = {}
|
||||
|
||||
# Claims with >= WHALE_THRESHOLD LBC total amount
|
||||
self.whales = set([])
|
||||
|
||||
# Have all claims been read from db yet?
|
||||
self.initialised = False
|
||||
|
||||
# List of pending spikes.
|
||||
# Units are CONSTANT in here
|
||||
self.pending_spikes = []
|
||||
|
||||
def insert_claim_from_load(self, height, claim_hash, trending_score, total_amount):
|
||||
assert not self.initialised
|
||||
self.claims[claim_hash] = {"trending_score": trending_score,
|
||||
"total_amount": total_amount,
|
||||
"changed": False}
|
||||
|
||||
if trending_score >= WHALE_THRESHOLD*get_time_boost(height):
|
||||
self.add_whale(claim_hash)
|
||||
|
||||
def add_whale(self, claim_hash):
|
||||
self.whales.add(claim_hash)
|
||||
|
||||
def apply_spikes(self, height):
|
||||
"""
|
||||
Apply all pending spikes that are due at this height.
|
||||
Apply with time boost ON.
|
||||
"""
|
||||
time_boost = get_time_boost(height)
|
||||
|
||||
for spike in self.pending_spikes:
|
||||
if spike["height"] > height:
|
||||
# Ignore
|
||||
pass
|
||||
if spike["height"] == height:
|
||||
# Apply
|
||||
self.claims[spike["claim_hash"]]["trending_score"] += time_boost*spike["size"]
|
||||
self.claims[spike["claim_hash"]]["changed"] = True
|
||||
|
||||
if self.claims[spike["claim_hash"]]["trending_score"] >= WHALE_THRESHOLD*time_boost:
|
||||
self.add_whale(spike["claim_hash"])
|
||||
if spike["claim_hash"] in self.whales and \
|
||||
self.claims[spike["claim_hash"]]["trending_score"] < WHALE_THRESHOLD*time_boost:
|
||||
self.whales.remove(spike["claim_hash"])
|
||||
|
||||
|
||||
# Keep only future spikes
|
||||
self.pending_spikes = [s for s in self.pending_spikes \
|
||||
if s["height"] > height]
|
||||
|
||||
|
||||
|
||||
|
||||
def update_claim(self, height, claim_hash, total_amount):
|
||||
"""
|
||||
Update trending data for a claim, given its new total amount.
|
||||
"""
|
||||
assert self.initialised
|
||||
|
||||
# Extract existing total amount and trending score
|
||||
# or use starting values if the claim is new
|
||||
if claim_hash in self.claims:
|
||||
old_state = copy.deepcopy(self.claims[claim_hash])
|
||||
else:
|
||||
old_state = {"trending_score": 0.0,
|
||||
"total_amount": 0.0,
|
||||
"changed": False}
|
||||
|
||||
# Calculate LBC change
|
||||
change = total_amount - old_state["total_amount"]
|
||||
|
||||
# Modify data if there was an LBC change
|
||||
if change != 0.0:
|
||||
spike = spike_height(total_amount,
|
||||
old_state["total_amount"])
|
||||
delay = min(int((total_amount + 1E-8)**0.4), HALF_LIFE)
|
||||
|
||||
if change < 0.0:
|
||||
|
||||
# How big would the spike be for the inverse movement?
|
||||
reverse_spike = spike_height(old_state["total_amount"], total_amount)
|
||||
|
||||
# Remove that much spike from future pending ones
|
||||
for future_spike in self.pending_spikes:
|
||||
if future_spike["claim_hash"] == claim_hash:
|
||||
if reverse_spike >= future_spike["size"]:
|
||||
reverse_spike -= future_spike["size"]
|
||||
future_spike["size"] = 0.0
|
||||
elif reverse_spike > 0.0:
|
||||
future_spike["size"] -= reverse_spike
|
||||
reverse_spike = 0.0
|
||||
|
||||
delay = 0
|
||||
spike = -reverse_spike
|
||||
|
||||
self.pending_spikes.append({"height": height + delay,
|
||||
"claim_hash": claim_hash,
|
||||
"size": spike})
|
||||
|
||||
self.claims[claim_hash] = {"total_amount": total_amount,
|
||||
"trending_score": old_state["trending_score"],
|
||||
"changed": False}
|
||||
|
||||
def process_whales(self, height):
|
||||
"""
|
||||
Whale claims decay faster.
|
||||
"""
|
||||
if height % SAVE_INTERVAL != 0:
|
||||
return
|
||||
|
||||
for claim_hash in self.whales:
|
||||
trending_normed = self.claims[claim_hash]["trending_score"]/get_time_boost(height)
|
||||
|
||||
# Overall multiplication factor for decay rate
|
||||
decay_rate_factor = trending_normed/WHALE_THRESHOLD
|
||||
|
||||
# The -1 is because this is just the *extra* part being applied
|
||||
factor = (DECAY**SAVE_INTERVAL)**(decay_rate_factor - 1.0)
|
||||
# print(claim_hash, trending_normed, decay_rate_factor)
|
||||
self.claims[claim_hash]["trending_score"] *= factor
|
||||
self.claims[claim_hash]["changed"] = True
|
||||
|
||||
start = time.time()
|
||||
trending_log(f"Calculating variable_decay trending at block {height}.\n")
|
||||
trending_data.update(db, height, recalculate_claim_hashes)
|
||||
end = time.time()
|
||||
trending_log(f"Trending operations took {end - start} seconds.\n\n")
|
||||
|
||||
def test_trending():
|
||||
"""
|
||||
|
@ -260,12 +405,12 @@ def test_trending():
|
|||
COMMIT;
|
||||
""")
|
||||
|
||||
# Initialise trending data before anything happens with the claims
|
||||
trending_data.initialise(db)
|
||||
|
||||
# Insert initial states of claims
|
||||
everything = {"huge_whale": 0.01,
|
||||
"huge_whale_botted": 0.01,
|
||||
"medium_whale": 0.01,
|
||||
"small_whale": 0.01,
|
||||
"minnow": 0.01}
|
||||
everything = {"huge_whale": 0.01, "medium_whale": 0.01, "small_whale": 0.01,
|
||||
"huge_whale_botted": 0.01, "minnow": 0.01}
|
||||
|
||||
def to_list_of_tuples(stuff):
|
||||
l = []
|
||||
|
@ -277,13 +422,17 @@ def test_trending():
|
|||
INSERT INTO claim (claim_hash, amount) VALUES (?, 1E8*?);
|
||||
""", to_list_of_tuples(everything))
|
||||
|
||||
# Process block zero
|
||||
height = 0
|
||||
run(db, height, height, everything.keys())
|
||||
|
||||
# Save trajectories for plotting
|
||||
trajectories = {}
|
||||
for key in trending_data.claims:
|
||||
trajectories[key] = [trending_data.claims[key]["trending_score"]]
|
||||
for row in trending_data.execute("""
|
||||
SELECT claim_hash, trending_score
|
||||
FROM claims;
|
||||
"""):
|
||||
trajectories[row[0]] = [row[1]/trending_unit(height)]
|
||||
|
||||
# Main loop
|
||||
for height in range(1, 1000):
|
||||
|
@ -312,119 +461,24 @@ def test_trending():
|
|||
# Call run()
|
||||
run(db, height, height, everything.keys())
|
||||
|
||||
for key in trending_data.claims:
|
||||
trajectories[key].append(trending_data.claims[key]["trending_score"]\
|
||||
/get_time_boost(height))
|
||||
# Append current trending scores to trajectories
|
||||
for row in db.execute("""
|
||||
SELECT claim_hash, trending_mixed
|
||||
FROM claim;
|
||||
"""):
|
||||
trajectories[row[0]].append(row[1]/trending_unit(height))
|
||||
|
||||
dbc.close()
|
||||
|
||||
# pylint: disable=C0415
|
||||
import matplotlib.pyplot as plt
|
||||
for key in trending_data.claims:
|
||||
for key in trajectories:
|
||||
plt.plot(trajectories[key], label=key)
|
||||
plt.legend()
|
||||
plt.show()
|
||||
|
||||
|
||||
# One global instance
|
||||
# pylint: disable=C0103
|
||||
trending_data = TrendingData()
|
||||
|
||||
def run(db, height, final_height, recalculate_claim_hashes):
|
||||
|
||||
if height < final_height - 5*HALF_LIFE:
|
||||
trending_log("Skipping variable_decay trending at block {h}.\n".format(h=height))
|
||||
return
|
||||
|
||||
start = time.time()
|
||||
|
||||
trending_log("Calculating variable_decay trending at block {h}.\n".format(h=height))
|
||||
trending_log(" Length of trending data = {l}.\n"\
|
||||
.format(l=len(trending_data.claims)))
|
||||
|
||||
# Renormalise trending scores and mark all as having changed
|
||||
if height % RENORM_INTERVAL == 0:
|
||||
trending_log(" Renormalising trending scores...")
|
||||
|
||||
keys = trending_data.claims.keys()
|
||||
trending_data.whales = set([])
|
||||
for key in keys:
|
||||
if trending_data.claims[key]["trending_score"] != 0.0:
|
||||
trending_data.claims[key]["trending_score"] *= DECAY_PER_RENORM
|
||||
trending_data.claims[key]["changed"] = True
|
||||
|
||||
# Tiny becomes zero
|
||||
if abs(trending_data.claims[key]["trending_score"]) < 1E-3:
|
||||
trending_data.claims[key]["trending_score"] = 0.0
|
||||
|
||||
# Re-mark whales
|
||||
if trending_data.claims[key]["trending_score"] >= WHALE_THRESHOLD*get_time_boost(height):
|
||||
trending_data.add_whale(key)
|
||||
|
||||
trending_log("done.\n")
|
||||
|
||||
|
||||
# Regular message.
|
||||
trending_log(" Reading total_amounts from db and updating"\
|
||||
+ " trending scores in RAM...")
|
||||
|
||||
# Update claims from db
|
||||
if not trending_data.initialised:
|
||||
|
||||
trending_log("initial load...")
|
||||
# On fresh launch
|
||||
for row in db.execute("""
|
||||
SELECT claim_hash, trending_mixed,
|
||||
(amount + support_amount)
|
||||
AS total_amount
|
||||
FROM claim;
|
||||
"""):
|
||||
trending_data.insert_claim_from_load(height, row[0], row[1], 1E-8*row[2])
|
||||
trending_data.initialised = True
|
||||
else:
|
||||
for row in db.execute(f"""
|
||||
SELECT claim_hash,
|
||||
(amount + support_amount)
|
||||
AS total_amount
|
||||
FROM claim
|
||||
WHERE claim_hash IN
|
||||
({','.join('?' for _ in recalculate_claim_hashes)});
|
||||
""", recalculate_claim_hashes):
|
||||
trending_data.update_claim(height, row[0], 1E-8*row[1])
|
||||
|
||||
# Apply pending spikes
|
||||
trending_data.apply_spikes(height)
|
||||
|
||||
trending_log("done.\n")
|
||||
|
||||
|
||||
# Write trending scores to DB
|
||||
if height % SAVE_INTERVAL == 0:
|
||||
|
||||
trending_log(" Finding and processing whales...")
|
||||
trending_log(str(len(trending_data.whales)) + " whales found...")
|
||||
trending_data.process_whales(height)
|
||||
trending_log("done.\n")
|
||||
|
||||
trending_log(" Writing trending scores to db...")
|
||||
|
||||
the_list = []
|
||||
keys = trending_data.claims.keys()
|
||||
|
||||
for key in keys:
|
||||
if trending_data.claims[key]["changed"]:
|
||||
the_list.append((trending_data.claims[key]["trending_score"], key))
|
||||
trending_data.claims[key]["changed"] = False
|
||||
|
||||
trending_log("{n} scores to write...".format(n=len(the_list)))
|
||||
|
||||
db.executemany("UPDATE claim SET trending_mixed=? WHERE claim_hash=?;",
|
||||
the_list)
|
||||
|
||||
trending_log("done.\n")
|
||||
|
||||
trending_log("Trending operations took {time} seconds.\n\n"\
|
||||
.format(time=time.time() - start))
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
|
|
Loading…
Add table
Reference in a new issue