This commit is contained in:
Brendon J. Brewer 2021-08-16 09:52:40 +12:00 committed by Victor Shyba
parent 6cba95c148
commit 0c7be8975f
6 changed files with 316 additions and 878 deletions

View file

@ -30,6 +30,7 @@ from lbry.wallet.server.db.claimtrie import get_takeover_name_ops, StagedActivat
from lbry.wallet.server.db.claimtrie import get_remove_name_ops, get_remove_effective_amount_ops from lbry.wallet.server.db.claimtrie import get_remove_name_ops, get_remove_effective_amount_ops
from lbry.wallet.server.db.prefixes import ACTIVATED_SUPPORT_TXO_TYPE, ACTIVATED_CLAIM_TXO_TYPE from lbry.wallet.server.db.prefixes import ACTIVATED_SUPPORT_TXO_TYPE, ACTIVATED_CLAIM_TXO_TYPE
from lbry.wallet.server.db.prefixes import PendingActivationKey, PendingActivationValue, Prefixes, ClaimToTXOValue from lbry.wallet.server.db.prefixes import PendingActivationKey, PendingActivationValue, Prefixes, ClaimToTXOValue
from lbry.wallet.server.db.trending import TrendingDB
from lbry.wallet.server.udp import StatusServer from lbry.wallet.server.udp import StatusServer
from lbry.wallet.server.db.revertable import RevertableOp, RevertablePut, RevertableDelete, RevertableOpStack from lbry.wallet.server.db.revertable import RevertableOp, RevertablePut, RevertableDelete, RevertableOpStack
if typing.TYPE_CHECKING: if typing.TYPE_CHECKING:
@ -263,6 +264,8 @@ class BlockProcessor:
self.claim_channels: Dict[bytes, bytes] = {} self.claim_channels: Dict[bytes, bytes] = {}
self.hashXs_by_tx: DefaultDict[bytes, List[int]] = defaultdict(list) self.hashXs_by_tx: DefaultDict[bytes, List[int]] = defaultdict(list)
self.trending_db = TrendingDB(env.db_dir)
async def claim_producer(self): async def claim_producer(self):
if self.db.db_height <= 1: if self.db.db_height <= 1:
return return
@ -310,6 +313,7 @@ class BlockProcessor:
start = time.perf_counter() start = time.perf_counter()
await self.run_in_thread(self.advance_block, block) await self.run_in_thread(self.advance_block, block)
await self.flush() await self.flush()
self.trending_db.process_block(self.height, self.daemon.cached_height())
self.logger.info("advanced to %i in %0.3fs", self.height, time.perf_counter() - start) self.logger.info("advanced to %i in %0.3fs", self.height, time.perf_counter() - start)
if self.height == self.coin.nExtendedClaimExpirationForkHeight: if self.height == self.coin.nExtendedClaimExpirationForkHeight:
self.logger.warning( self.logger.warning(
@ -514,6 +518,9 @@ class BlockProcessor:
self.txo_to_claim[(tx_num, nout)] = pending self.txo_to_claim[(tx_num, nout)] = pending
self.claim_hash_to_txo[claim_hash] = (tx_num, nout) self.claim_hash_to_txo[claim_hash] = (tx_num, nout)
self.db_op_stack.extend_ops(pending.get_add_claim_utxo_ops()) self.db_op_stack.extend_ops(pending.get_add_claim_utxo_ops())
self.trending_db.add_event({"claim_hash": claim_hash,
"event": "upsert",
"lbc": 1E-8*txo.amount})
def _add_support(self, txo: 'Output', tx_num: int, nout: int): def _add_support(self, txo: 'Output', tx_num: int, nout: int):
supported_claim_hash = txo.claim_hash[::-1] supported_claim_hash = txo.claim_hash[::-1]
@ -523,6 +530,9 @@ class BlockProcessor:
self.db_op_stack.extend_ops(StagedClaimtrieSupport( self.db_op_stack.extend_ops(StagedClaimtrieSupport(
supported_claim_hash, tx_num, nout, txo.amount supported_claim_hash, tx_num, nout, txo.amount
).get_add_support_utxo_ops()) ).get_add_support_utxo_ops())
self.trending_db.add_event({"claim_hash": supported_claim_hash,
"event": "support",
"lbc": 1E-8*txo.amount})
def _add_claim_or_support(self, height: int, tx_hash: bytes, tx_num: int, nout: int, txo: 'Output', def _add_claim_or_support(self, height: int, tx_hash: bytes, tx_num: int, nout: int, txo: 'Output',
spent_claims: typing.Dict[bytes, Tuple[int, int, str]]): spent_claims: typing.Dict[bytes, Tuple[int, int, str]]):
@ -542,6 +552,10 @@ class BlockProcessor:
self.db_op_stack.extend_ops(StagedClaimtrieSupport( self.db_op_stack.extend_ops(StagedClaimtrieSupport(
spent_support, txin_num, txin.prev_idx, support_amount spent_support, txin_num, txin.prev_idx, support_amount
).get_spend_support_txo_ops()) ).get_spend_support_txo_ops())
self.trending_db.add_event({"claim_hash": spent_support,
"event": "support",
"lbc": -1E-8*support_amount})
spent_support, support_amount = self.db.get_supported_claim_from_txo(txin_num, txin.prev_idx) spent_support, support_amount = self.db.get_supported_claim_from_txo(txin_num, txin.prev_idx)
if spent_support: if spent_support:
supported_name = self._get_pending_claim_name(spent_support) supported_name = self._get_pending_claim_name(spent_support)
@ -619,6 +633,9 @@ class BlockProcessor:
if normalized_name.startswith('@'): # abandon a channel, invalidate signatures if normalized_name.startswith('@'): # abandon a channel, invalidate signatures
self._invalidate_channel_signatures(claim_hash) self._invalidate_channel_signatures(claim_hash)
self.trending_db.add_event({"claim_hash": claim_hash,
"event": "delete"})
def _invalidate_channel_signatures(self, claim_hash: bytes): def _invalidate_channel_signatures(self, claim_hash: bytes):
for k, signed_claim_hash in self.db.db.iterator( for k, signed_claim_hash in self.db.db.iterator(
prefix=Prefixes.channel_to_claim.pack_partial_key(claim_hash)): prefix=Prefixes.channel_to_claim.pack_partial_key(claim_hash)):

View file

@ -0,0 +1,299 @@
import math
import os
import sqlite3
import time
HALF_LIFE = 400
RENORM_INTERVAL = 1000
WHALE_THRESHOLD = 10000.0
def whale_decay_factor(lbc):
"""
An additional decay factor applied to whale claims.
"""
if lbc <= WHALE_THRESHOLD:
return 1.0
adjusted_half_life = HALF_LIFE/(math.log10(lbc/WHALE_THRESHOLD) + 1.0)
return 2.0**(1.0/HALF_LIFE - 1.0/adjusted_half_life)
def soften(lbc):
mag = abs(lbc) + 1E-8
sign = 1.0 if lbc >= 0.0 else -1.0
return sign*mag**0.25
def delay(lbc: int):
if lbc <= 0:
return 0
elif lbc < 1000000:
return int(lbc**0.5)
else:
return 1000
def inflate_units(height):
blocks = height % RENORM_INTERVAL
return 2.0 ** (blocks/HALF_LIFE)
PRAGMAS = ["PRAGMA FOREIGN_KEYS = OFF;",
"PRAGMA JOURNAL_MODE = WAL;",
"PRAGMA SYNCHRONOUS = 0;"]
class TrendingDB:
def __init__(self, data_dir):
"""
Opens the trending database in the directory data_dir.
For testing, pass data_dir=":memory:"
"""
if data_dir == ":memory:":
path = ":memory:"
else:
path = os.path.join(data_dir, "trending.db")
self.db = sqlite3.connect(path, check_same_thread=False)
for pragma in PRAGMAS:
self.execute(pragma)
self.execute("BEGIN;")
self._create_tables()
self._create_indices()
self.execute("COMMIT;")
self.pending_events = []
def execute(self, *args, **kwargs):
return self.db.execute(*args, **kwargs)
def add_event(self, event):
self.pending_events.append(event)
# print(f"Added event: {event}.", flush=True)
def _create_tables(self):
self.execute("""CREATE TABLE IF NOT EXISTS claims
(claim_hash BYTES NOT NULL PRIMARY KEY,
bid_lbc REAL NOT NULL,
support_lbc REAL NOT NULL,
trending_score REAL NOT NULL,
needs_write BOOLEAN NOT NULL)
WITHOUT ROWID;""")
self.execute("""CREATE TABLE IF NOT EXISTS spikes
(claim_hash BYTES NOT NULL REFERENCES claims (claim_hash),
activation_height INTEGER NOT NULL,
mass REAL NOT NULL);""")
def _create_indices(self):
self.execute("CREATE INDEX IF NOT EXISTS idx1 ON spikes\
(activation_height, claim_hash, mass);")
self.execute("CREATE INDEX IF NOT EXISTS idx2 ON spikes\
(claim_hash);")
self.execute("CREATE INDEX IF NOT EXISTS idx3 ON claims (trending_score);")
self.execute("CREATE INDEX IF NOT EXISTS idx4 ON claims (needs_write, claim_hash);")
self.execute("CREATE INDEX IF NOT EXISTS idx5 ON claims (bid_lbc + support_lbc);")
def get_trending_score(self, claim_hash):
result = self.execute("SELECT trending_score FROM claims\
WHERE claim_hash = ?;", (claim_hash, ))\
.fetchall()
if len(result) == 0:
return 0.0
else:
return result[0]
def _upsert_claim(self, height, event):
claim_hash = event["claim_hash"]
# Get old total lbc value of claim
old_lbc_pair = self.execute("SELECT bid_lbc, support_lbc FROM claims\
WHERE claim_hash = ?;",
(claim_hash, )).fetchone()
if old_lbc_pair is None:
old_lbc_pair = (0.0, 0.0)
if event["event"] == "upsert":
new_lbc_pair = (event["lbc"], old_lbc_pair[1])
elif event["event"] == "support":
new_lbc_pair = (old_lbc_pair[0], old_lbc_pair[1] + event["lbc"])
# Upsert the claim
self.execute("INSERT INTO claims VALUES (?, ?, ?, ?, 1)\
ON CONFLICT (claim_hash) DO UPDATE\
SET bid_lbc = excluded.bid_lbc,\
support_lbc = excluded.support_lbc;",
(claim_hash, new_lbc_pair[0], new_lbc_pair[1], 0.0))
if self.active:
old_lbc, lbc = sum(old_lbc_pair), sum(new_lbc_pair)
# Add the spike
softened_change = soften(lbc - old_lbc)
change_in_softened = soften(lbc) - soften(old_lbc)
spike_mass = (softened_change**0.25*change_in_softened**0.75).real
activation_height = height + delay(lbc)
if spike_mass != 0.0:
self.execute("INSERT INTO spikes VALUES (?, ?, ?);",
(claim_hash, activation_height, spike_mass))
def _delete_claim(self, claim_hash):
self.execute("DELETE FROM spikes WHERE claim_hash = ?;", (claim_hash, ))
self.execute("DELETE FROM claims WHERE claim_hash = ?;", (claim_hash, ))
def _apply_spikes(self, height):
spikes = self.execute("SELECT claim_hash, mass FROM spikes\
WHERE activation_height = ?;",
(height, )).fetchall()
for claim_hash, mass in spikes: # TODO: executemany for efficiency
self.execute("UPDATE claims SET trending_score = trending_score + ?,\
needs_write = 1\
WHERE claim_hash = ?;",
(mass, claim_hash))
self.execute("DELETE FROM spikes WHERE activation_height = ?;",
(height, ))
def _decay_whales(self):
whales = self.execute("SELECT claim_hash, bid_lbc + support_lbc FROM claims\
WHERE bid_lbc + support_lbc >= ?;", (WHALE_THRESHOLD, ))\
.fetchall()
for claim_hash, lbc in whales:
factor = whale_decay_factor(lbc)
self.execute("UPDATE claims SET trending_score = trending_score*?, needs_write = 1\
WHERE claim_hash = ?;", (factor, claim_hash))
def _renorm(self):
factor = 2.0**(-RENORM_INTERVAL/HALF_LIFE)
# Zero small values
self.execute("UPDATE claims SET trending_score = 0.0, needs_write = 1\
WHERE trending_score <> 0.0 AND ABS(?*trending_score) < 1E-6;",
(factor, ))
# Normalise other values
self.execute("UPDATE claims SET trending_score = ?*trending_score, needs_write = 1\
WHERE trending_score <> 0.0;", (factor, ))
def process_block(self, height, daemon_height):
self.active = daemon_height - height <= 10*HALF_LIFE
self.execute("BEGIN;")
if self.active:
# Check for a unit change
if height % RENORM_INTERVAL == 0:
self._renorm()
# Apply extra whale decay
self._decay_whales()
# Upsert claims
for event in self.pending_events:
if event["event"] == "upsert":
self._upsert_claim(height, event)
# Process supports
for event in self.pending_events:
if event["event"] == "support":
self._upsert_claim(height, event)
# Delete claims
for event in self.pending_events:
if event["event"] == "delete":
self._delete_claim(event["claim_hash"])
if self.active:
# Apply spikes
self._apply_spikes(height)
# Get set of claims that need writing to ES
claims_to_write = set()
for row in self.db.execute("SELECT claim_hash FROM claims WHERE\
needs_write = 1;"):
claims_to_write.add(row[0])
self.db.execute("UPDATE claims SET needs_write = 0\
WHERE needs_write = 1;")
self.execute("COMMIT;")
self.pending_events.clear()
return claims_to_write
if __name__ == "__main__":
import matplotlib.pyplot as plt
import numpy as np
import numpy.random as rng
import os
trending_db = TrendingDB(":memory:")
heights = list(range(1, 1000))
heights = heights + heights[::-1] + heights
events = [{"height": 45,
"what": dict(claim_hash="a", event="upsert", lbc=1.0)},
{"height": 100,
"what": dict(claim_hash="a", event="support", lbc=3.0)},
{"height": 150,
"what": dict(claim_hash="a", event="support", lbc=-3.0)},
{"height": 170,
"what": dict(claim_hash="a", event="upsert", lbc=100000.0)},
{"height": 730,
"what": dict(claim_hash="a", event="delete")}]
inverse_events = [{"height": 730,
"what": dict(claim_hash="a", event="upsert", lbc=100000.0)},
{"height": 170,
"what": dict(claim_hash="a", event="upsert", lbc=1.0)},
{"height": 150,
"what": dict(claim_hash="a", event="support", lbc=3.0)},
{"height": 100,
"what": dict(claim_hash="a", event="support", lbc=-3.0)},
{"height": 45,
"what": dict(claim_hash="a", event="delete")}]
xs, ys = [], []
last_height = 0
for height in heights:
# Prepare the changes
if height > last_height:
es = events
else:
es = inverse_events
for event in es:
if event["height"] == height:
trending_db.add_event(event["what"])
# Process the block
trending_db.process_block(height, height)
if height > last_height: # Only plot when moving forward
xs.append(height)
y = trending_db.execute("SELECT trending_score FROM claims;").fetchone()
y = 0.0 if y is None else y[0]
ys.append(y/inflate_units(height))
last_height = height
xs = np.array(xs)
ys = np.array(ys)
plt.figure(1)
plt.plot(xs, ys, "o-", alpha=0.2)
plt.figure(2)
plt.plot(xs)
plt.show()

View file

@ -1,9 +0,0 @@
from . import zscore
from . import ar
from . import variable_decay
TRENDING_ALGORITHMS = {
'zscore': zscore,
'ar': ar,
'variable_decay': variable_decay
}

View file

@ -1,265 +0,0 @@
import copy
import math
import time
# Half life in blocks
HALF_LIFE = 134
# Decay coefficient per block
DECAY = 0.5**(1.0/HALF_LIFE)
# How frequently to write trending values to the db
SAVE_INTERVAL = 10
# Renormalisation interval
RENORM_INTERVAL = 1000
# Assertion
assert RENORM_INTERVAL % SAVE_INTERVAL == 0
# Decay coefficient per renormalisation interval
DECAY_PER_RENORM = DECAY**(RENORM_INTERVAL)
# Log trending calculations?
TRENDING_LOG = True
def install(connection):
"""
Install the AR trending algorithm.
"""
check_trending_values(connection)
if TRENDING_LOG:
f = open("trending_ar.log", "a")
f.close()
# Stub
CREATE_TREND_TABLE = ""
def check_trending_values(connection):
"""
If the trending values appear to be based on the zscore algorithm,
reset them. This will allow resyncing from a standard snapshot.
"""
c = connection.cursor()
needs_reset = False
for row in c.execute("SELECT COUNT(*) num FROM claim WHERE trending_global <> 0;"):
if row[0] != 0:
needs_reset = True
break
if needs_reset:
print("Resetting some columns. This might take a while...", flush=True, end="")
c.execute(""" BEGIN;
UPDATE claim SET trending_group = 0;
UPDATE claim SET trending_mixed = 0;
UPDATE claim SET trending_global = 0;
UPDATE claim SET trending_local = 0;
COMMIT;""")
print("done.")
def spike_height(trending_score, x, x_old, time_boost=1.0):
"""
Compute the size of a trending spike.
"""
# Change in softened amount
change_in_softened_amount = x**0.25 - x_old**0.25
# Softened change in amount
delta = x - x_old
softened_change_in_amount = abs(delta)**0.25
# Softened change in amount counts more for minnows
if delta > 0.0:
if trending_score >= 0.0:
multiplier = 0.1/((trending_score/time_boost + softened_change_in_amount) + 1.0)
softened_change_in_amount *= multiplier
else:
softened_change_in_amount *= -1.0
return time_boost*(softened_change_in_amount + change_in_softened_amount)
def get_time_boost(height):
"""
Return the time boost at a given height.
"""
return 1.0/DECAY**(height % RENORM_INTERVAL)
def trending_log(s):
"""
Log a string.
"""
if TRENDING_LOG:
fout = open("trending_ar.log", "a")
fout.write(s)
fout.flush()
fout.close()
class TrendingData:
"""
An object of this class holds trending data
"""
def __init__(self):
self.claims = {}
# Have all claims been read from db yet?
self.initialised = False
def insert_claim_from_load(self, claim_hash, trending_score, total_amount):
assert not self.initialised
self.claims[claim_hash] = {"trending_score": trending_score,
"total_amount": total_amount,
"changed": False}
def update_claim(self, claim_hash, total_amount, time_boost=1.0):
"""
Update trending data for a claim, given its new total amount.
"""
assert self.initialised
# Extract existing total amount and trending score
# or use starting values if the claim is new
if claim_hash in self.claims:
old_state = copy.deepcopy(self.claims[claim_hash])
else:
old_state = {"trending_score": 0.0,
"total_amount": 0.0,
"changed": False}
# Calculate LBC change
change = total_amount - old_state["total_amount"]
# Modify data if there was an LBC change
if change != 0.0:
spike = spike_height(old_state["trending_score"],
total_amount,
old_state["total_amount"],
time_boost)
trending_score = old_state["trending_score"] + spike
self.claims[claim_hash] = {"total_amount": total_amount,
"trending_score": trending_score,
"changed": True}
def test_trending():
"""
Quick trending test for something receiving 10 LBC per block
"""
data = TrendingData()
data.insert_claim_from_load("abc", 10.0, 1.0)
data.initialised = True
for height in range(1, 5000):
if height % RENORM_INTERVAL == 0:
data.claims["abc"]["trending_score"] *= DECAY_PER_RENORM
time_boost = get_time_boost(height)
data.update_claim("abc", data.claims["abc"]["total_amount"] + 10.0,
time_boost=time_boost)
print(str(height) + " " + str(time_boost) + " " \
+ str(data.claims["abc"]["trending_score"]))
# One global instance
# pylint: disable=C0103
trending_data = TrendingData()
def run(db, height, final_height, recalculate_claim_hashes):
if height < final_height - 5*HALF_LIFE:
trending_log("Skipping AR trending at block {h}.\n".format(h=height))
return
start = time.time()
trending_log("Calculating AR trending at block {h}.\n".format(h=height))
trending_log(" Length of trending data = {l}.\n"\
.format(l=len(trending_data.claims)))
# Renormalise trending scores and mark all as having changed
if height % RENORM_INTERVAL == 0:
trending_log(" Renormalising trending scores...")
keys = trending_data.claims.keys()
for key in keys:
if trending_data.claims[key]["trending_score"] != 0.0:
trending_data.claims[key]["trending_score"] *= DECAY_PER_RENORM
trending_data.claims[key]["changed"] = True
# Tiny becomes zero
if abs(trending_data.claims[key]["trending_score"]) < 1E-9:
trending_data.claims[key]["trending_score"] = 0.0
trending_log("done.\n")
# Regular message.
trending_log(" Reading total_amounts from db and updating"\
+ " trending scores in RAM...")
# Get the value of the time boost
time_boost = get_time_boost(height)
# Update claims from db
if not trending_data.initialised:
# On fresh launch
for row in db.execute("""
SELECT claim_hash, trending_mixed,
(amount + support_amount)
AS total_amount
FROM claim;
"""):
trending_data.insert_claim_from_load(row[0], row[1], 1E-8*row[2])
trending_data.initialised = True
else:
for row in db.execute(f"""
SELECT claim_hash,
(amount + support_amount)
AS total_amount
FROM claim
WHERE claim_hash IN
({','.join('?' for _ in recalculate_claim_hashes)});
""", list(recalculate_claim_hashes)):
trending_data.update_claim(row[0], 1E-8*row[1], time_boost)
trending_log("done.\n")
# Write trending scores to DB
if height % SAVE_INTERVAL == 0:
trending_log(" Writing trending scores to db...")
the_list = []
keys = trending_data.claims.keys()
for key in keys:
if trending_data.claims[key]["changed"]:
the_list.append((trending_data.claims[key]["trending_score"],
key))
trending_data.claims[key]["changed"] = False
trending_log("{n} scores to write...".format(n=len(the_list)))
db.executemany("UPDATE claim SET trending_mixed=? WHERE claim_hash=?;",
the_list)
trending_log("done.\n")
trending_log("Trending operations took {time} seconds.\n\n"\
.format(time=time.time() - start))
if __name__ == "__main__":
test_trending()

View file

@ -1,485 +0,0 @@
"""
AR-like trending with a delayed effect and a faster
decay rate for high valued claims.
"""
import math
import time
import sqlite3
# Half life in blocks *for lower LBC claims* (it's shorter for whale claims)
HALF_LIFE = 200
# Whale threshold, in LBC (higher -> less DB writing)
WHALE_THRESHOLD = 10000.0
# Decay coefficient per block
DECAY = 0.5**(1.0/HALF_LIFE)
# How frequently to write trending values to the db
SAVE_INTERVAL = 10
# Renormalisation interval
RENORM_INTERVAL = 1000
# Assertion
assert RENORM_INTERVAL % SAVE_INTERVAL == 0
# Decay coefficient per renormalisation interval
DECAY_PER_RENORM = DECAY**(RENORM_INTERVAL)
# Log trending calculations?
TRENDING_LOG = True
def install(connection):
"""
Install the trending algorithm.
"""
check_trending_values(connection)
trending_data.initialise(connection.cursor())
if TRENDING_LOG:
f = open("trending_variable_decay.log", "a")
f.close()
# Stub
CREATE_TREND_TABLE = ""
def check_trending_values(connection):
"""
If the trending values appear to be based on the zscore algorithm,
reset them. This will allow resyncing from a standard snapshot.
"""
c = connection.cursor()
needs_reset = False
for row in c.execute("SELECT COUNT(*) num FROM claim WHERE trending_global <> 0;"):
if row[0] != 0:
needs_reset = True
break
if needs_reset:
print("Resetting some columns. This might take a while...", flush=True,
end="")
c.execute(""" BEGIN;
UPDATE claim SET trending_group = 0;
UPDATE claim SET trending_mixed = 0;
COMMIT;""")
print("done.")
def trending_log(s):
"""
Log a string to the log file
"""
if TRENDING_LOG:
fout = open("trending_variable_decay.log", "a")
fout.write(s)
fout.flush()
fout.close()
def trending_unit(height):
"""
Return the trending score unit at a given height.
"""
# Round to the beginning of a SAVE_INTERVAL batch of blocks.
_height = height - (height % SAVE_INTERVAL)
return 1.0/DECAY**(height % RENORM_INTERVAL)
class TrendingDB:
"""
An in-memory database of trending scores
"""
def __init__(self):
self.conn = sqlite3.connect(":memory:", check_same_thread=False)
self.cursor = self.conn.cursor()
self.initialised = False
self.write_needed = set()
def execute(self, query, *args, **kwargs):
return self.conn.execute(query, *args, **kwargs)
def executemany(self, query, *args, **kwargs):
return self.conn.executemany(query, *args, **kwargs)
def begin(self):
self.execute("BEGIN;")
def commit(self):
self.execute("COMMIT;")
def initialise(self, db):
"""
Pass in claims.db
"""
if self.initialised:
return
trending_log("Initialising trending database...")
# The need for speed
self.execute("PRAGMA JOURNAL_MODE=OFF;")
self.execute("PRAGMA SYNCHRONOUS=0;")
self.begin()
# Create the tables
self.execute("""
CREATE TABLE IF NOT EXISTS claims
(claim_hash BYTES PRIMARY KEY,
lbc REAL NOT NULL DEFAULT 0.0,
trending_score REAL NOT NULL DEFAULT 0.0)
WITHOUT ROWID;""")
self.execute("""
CREATE TABLE IF NOT EXISTS spikes
(id INTEGER PRIMARY KEY,
claim_hash BYTES NOT NULL,
height INTEGER NOT NULL,
mass REAL NOT NULL,
FOREIGN KEY (claim_hash)
REFERENCES claims (claim_hash));""")
# Clear out any existing data
self.execute("DELETE FROM claims;")
self.execute("DELETE FROM spikes;")
# Create indexes
self.execute("CREATE INDEX idx1 ON spikes (claim_hash, height, mass);")
self.execute("CREATE INDEX idx2 ON spikes (claim_hash, height, mass DESC);")
self.execute("CREATE INDEX idx3 on claims (lbc DESC, claim_hash, trending_score);")
# Import data from claims.db
for row in db.execute("""
SELECT claim_hash,
1E-8*(amount + support_amount) AS lbc,
trending_mixed
FROM claim;
"""):
self.execute("INSERT INTO claims VALUES (?, ?, ?);", row)
self.commit()
self.initialised = True
trending_log("done.\n")
def apply_spikes(self, height):
"""
Apply spikes that are due. This occurs inside a transaction.
"""
spikes = []
unit = trending_unit(height)
for row in self.execute("""
SELECT SUM(mass), claim_hash FROM spikes
WHERE height = ?
GROUP BY claim_hash;
""", (height, )):
spikes.append((row[0]*unit, row[1]))
self.write_needed.add(row[1])
self.executemany("""
UPDATE claims
SET trending_score = (trending_score + ?)
WHERE claim_hash = ?;
""", spikes)
self.execute("DELETE FROM spikes WHERE height = ?;", (height, ))
def decay_whales(self, height):
"""
Occurs inside transaction.
"""
if height % SAVE_INTERVAL != 0:
return
whales = self.execute("""
SELECT trending_score, lbc, claim_hash
FROM claims
WHERE lbc >= ?;
""", (WHALE_THRESHOLD, )).fetchall()
whales2 = []
for whale in whales:
trending, lbc, claim_hash = whale
# Overall multiplication factor for decay rate
# At WHALE_THRESHOLD, this is 1
# At 10*WHALE_THRESHOLD, it is 3
decay_rate_factor = 1.0 + 2.0*math.log10(lbc/WHALE_THRESHOLD)
# The -1 is because this is just the *extra* part being applied
factor = (DECAY**SAVE_INTERVAL)**(decay_rate_factor - 1.0)
# Decay
trending *= factor
whales2.append((trending, claim_hash))
self.write_needed.add(claim_hash)
self.executemany("UPDATE claims SET trending_score=? WHERE claim_hash=?;",
whales2)
def renorm(self, height):
"""
Renormalise trending scores. Occurs inside a transaction.
"""
if height % RENORM_INTERVAL == 0:
threshold = 1.0E-3/DECAY_PER_RENORM
for row in self.execute("""SELECT claim_hash FROM claims
WHERE ABS(trending_score) >= ?;""",
(threshold, )):
self.write_needed.add(row[0])
self.execute("""UPDATE claims SET trending_score = ?*trending_score
WHERE ABS(trending_score) >= ?;""",
(DECAY_PER_RENORM, threshold))
def write_to_claims_db(self, db, height):
"""
Write changed trending scores to claims.db.
"""
if height % SAVE_INTERVAL != 0:
return
rows = self.execute(f"""
SELECT trending_score, claim_hash
FROM claims
WHERE claim_hash IN
({','.join('?' for _ in self.write_needed)});
""", list(self.write_needed)).fetchall()
db.executemany("""UPDATE claim SET trending_mixed = ?
WHERE claim_hash = ?;""", rows)
# Clear list of claims needing to be written to claims.db
self.write_needed = set()
def update(self, db, height, recalculate_claim_hashes):
"""
Update trending scores.
Input is a cursor to claims.db, the block height, and the list of
claims that changed.
"""
assert self.initialised
self.begin()
self.renorm(height)
# Fetch changed/new claims from claims.db
for row in db.execute(f"""
SELECT claim_hash,
1E-8*(amount + support_amount) AS lbc
FROM claim
WHERE claim_hash IN
({','.join('?' for _ in recalculate_claim_hashes)});
""", list(recalculate_claim_hashes)):
claim_hash, lbc = row
# Insert into trending db if it does not exist
self.execute("""
INSERT INTO claims (claim_hash)
VALUES (?)
ON CONFLICT (claim_hash) DO NOTHING;""",
(claim_hash, ))
# See if it was an LBC change
old = self.execute("SELECT * FROM claims WHERE claim_hash=?;",
(claim_hash, )).fetchone()
lbc_old = old[1]
# Save new LBC value into trending db
self.execute("UPDATE claims SET lbc = ? WHERE claim_hash = ?;",
(lbc, claim_hash))
if lbc > lbc_old:
# Schedule a future spike
delay = min(int((lbc + 1E-8)**0.4), HALF_LIFE)
spike = (claim_hash, height + delay, spike_mass(lbc, lbc_old))
self.execute("""INSERT INTO spikes
(claim_hash, height, mass)
VALUES (?, ?, ?);""", spike)
elif lbc < lbc_old:
# Subtract from future spikes
penalty = spike_mass(lbc_old, lbc)
spikes = self.execute("""
SELECT * FROM spikes
WHERE claim_hash = ?
ORDER BY height ASC, mass DESC;
""", (claim_hash, )).fetchall()
for spike in spikes:
spike_id, mass = spike[0], spike[3]
if mass > penalty:
# The entire penalty merely reduces this spike
self.execute("UPDATE spikes SET mass=? WHERE id=?;",
(mass - penalty, spike_id))
penalty = 0.0
else:
# Removing this spike entirely accounts for some (or
# all) of the penalty, then move on to other spikes
self.execute("DELETE FROM spikes WHERE id=?;",
(spike_id, ))
penalty -= mass
# If penalty remains, that's a negative spike to be applied
# immediately.
if penalty > 0.0:
self.execute("""
INSERT INTO spikes (claim_hash, height, mass)
VALUES (?, ?, ?);""",
(claim_hash, height, -penalty))
self.apply_spikes(height)
self.decay_whales(height)
self.commit()
self.write_to_claims_db(db, height)
# The "global" instance to work with
# pylint: disable=C0103
trending_data = TrendingDB()
def spike_mass(x, x_old):
"""
Compute the mass of a trending spike (normed - constant units).
x_old = old LBC value
x = new LBC value
"""
# Sign of trending spike
sign = 1.0
if x < x_old:
sign = -1.0
# Magnitude
mag = abs(x**0.25 - x_old**0.25)
# Minnow boost
mag *= 1.0 + 2E4/(x + 100.0)**2
return sign*mag
def run(db, height, final_height, recalculate_claim_hashes):
if height < final_height - 5*HALF_LIFE:
trending_log(f"Skipping trending calculations at block {height}.\n")
return
start = time.time()
trending_log(f"Calculating variable_decay trending at block {height}.\n")
trending_data.update(db, height, recalculate_claim_hashes)
end = time.time()
trending_log(f"Trending operations took {end - start} seconds.\n\n")
def test_trending():
"""
Quick trending test for claims with different support patterns.
Actually use the run() function.
"""
# Create a fake "claims.db" for testing
# pylint: disable=I1101
dbc = apsw.Connection(":memory:")
db = dbc.cursor()
# Create table
db.execute("""
BEGIN;
CREATE TABLE claim (claim_hash TEXT PRIMARY KEY,
amount REAL NOT NULL DEFAULT 0.0,
support_amount REAL NOT NULL DEFAULT 0.0,
trending_mixed REAL NOT NULL DEFAULT 0.0);
COMMIT;
""")
# Initialise trending data before anything happens with the claims
trending_data.initialise(db)
# Insert initial states of claims
everything = {"huge_whale": 0.01, "medium_whale": 0.01, "small_whale": 0.01,
"huge_whale_botted": 0.01, "minnow": 0.01}
def to_list_of_tuples(stuff):
l = []
for key in stuff:
l.append((key, stuff[key]))
return l
db.executemany("""
INSERT INTO claim (claim_hash, amount) VALUES (?, 1E8*?);
""", to_list_of_tuples(everything))
# Process block zero
height = 0
run(db, height, height, everything.keys())
# Save trajectories for plotting
trajectories = {}
for row in trending_data.execute("""
SELECT claim_hash, trending_score
FROM claims;
"""):
trajectories[row[0]] = [row[1]/trending_unit(height)]
# Main loop
for height in range(1, 1000):
# One-off supports
if height == 1:
everything["huge_whale"] += 5E5
everything["medium_whale"] += 5E4
everything["small_whale"] += 5E3
# Every block
if height < 500:
everything["huge_whale_botted"] += 5E5/500
everything["minnow"] += 1
# Remove supports
if height == 500:
for key in everything:
everything[key] = 0.01
# Whack into the db
db.executemany("""
UPDATE claim SET amount = 1E8*? WHERE claim_hash = ?;
""", [(y, x) for (x, y) in to_list_of_tuples(everything)])
# Call run()
run(db, height, height, everything.keys())
# Append current trending scores to trajectories
for row in db.execute("""
SELECT claim_hash, trending_mixed
FROM claim;
"""):
trajectories[row[0]].append(row[1]/trending_unit(height))
dbc.close()
# pylint: disable=C0415
import matplotlib.pyplot as plt
for key in trajectories:
plt.plot(trajectories[key], label=key)
plt.legend()
plt.show()
if __name__ == "__main__":
test_trending()

View file

@ -1,119 +0,0 @@
from math import sqrt
# TRENDING_WINDOW is the number of blocks in ~6hr period (21600 seconds / 161 seconds per block)
TRENDING_WINDOW = 134
# TRENDING_DATA_POINTS says how many samples to use for the trending algorithm
# i.e. only consider claims from the most recent (TRENDING_WINDOW * TRENDING_DATA_POINTS) blocks
TRENDING_DATA_POINTS = 28
CREATE_TREND_TABLE = """
create table if not exists trend (
claim_hash bytes not null,
height integer not null,
amount integer not null,
primary key (claim_hash, height)
) without rowid;
"""
class ZScore:
__slots__ = 'count', 'total', 'power', 'last'
def __init__(self):
self.count = 0
self.total = 0
self.power = 0
self.last = None
def step(self, value):
if self.last is not None:
self.count += 1
self.total += self.last
self.power += self.last ** 2
self.last = value
@property
def mean(self):
return self.total / self.count
@property
def standard_deviation(self):
value = (self.power / self.count) - self.mean ** 2
return sqrt(value) if value > 0 else 0
def finalize(self):
if self.count == 0:
return self.last
return (self.last - self.mean) / (self.standard_deviation or 1)
def install(connection):
connection.create_aggregate("zscore", 1, ZScore)
connection.executescript(CREATE_TREND_TABLE)
def run(db, height, final_height, affected_claims):
# don't start tracking until we're at the end of initial sync
if height < (final_height - (TRENDING_WINDOW * TRENDING_DATA_POINTS)):
return
if height % TRENDING_WINDOW != 0:
return
db.execute(f"""
DELETE FROM trend WHERE height < {height - (TRENDING_WINDOW * TRENDING_DATA_POINTS)}
""")
start = (height - TRENDING_WINDOW) + 1
db.execute(f"""
INSERT OR IGNORE INTO trend (claim_hash, height, amount)
SELECT claim_hash, {start}, COALESCE(
(SELECT SUM(amount) FROM support WHERE claim_hash=claim.claim_hash
AND height >= {start}), 0
) AS support_sum
FROM claim WHERE support_sum > 0
""")
zscore = ZScore()
for global_sum in db.execute("SELECT AVG(amount) AS avg_amount FROM trend GROUP BY height"):
zscore.step(global_sum.avg_amount)
global_mean, global_deviation = 0, 1
if zscore.count > 0:
global_mean = zscore.mean
global_deviation = zscore.standard_deviation
db.execute(f"""
UPDATE claim SET
trending_local = COALESCE((
SELECT zscore(amount) FROM trend
WHERE claim_hash=claim.claim_hash ORDER BY height DESC
), 0),
trending_global = COALESCE((
SELECT (amount - {global_mean}) / {global_deviation} FROM trend
WHERE claim_hash=claim.claim_hash AND height = {start}
), 0),
trending_group = 0,
trending_mixed = 0
""")
# trending_group and trending_mixed determine how trending will show in query results
# normally the SQL will be: "ORDER BY trending_group, trending_mixed"
# changing the trending_group will have significant impact on trending results
# changing the value used for trending_mixed will only impact trending within a trending_group
db.execute(f"""
UPDATE claim SET
trending_group = CASE
WHEN trending_local > 0 AND trending_global > 0 THEN 4
WHEN trending_local <= 0 AND trending_global > 0 THEN 3
WHEN trending_local > 0 AND trending_global <= 0 THEN 2
WHEN trending_local <= 0 AND trending_global <= 0 THEN 1
END,
trending_mixed = CASE
WHEN trending_local > 0 AND trending_global > 0 THEN trending_global
WHEN trending_local <= 0 AND trending_global > 0 THEN trending_local
WHEN trending_local > 0 AND trending_global <= 0 THEN trending_local
WHEN trending_local <= 0 AND trending_global <= 0 THEN trending_global
END
WHERE trending_local <> 0 OR trending_global <> 0
""")