update trending in elasticsearch

-add TrendingPrefixSpike to leveldb
-expose `TRENDING_HALF_LIFE`, `TRENDING_WHALE_HALF_LIFE` and `TRENDING_WHALE_THRESHOLD` hub settings
This commit is contained in:
Jack Robison 2021-08-30 12:16:07 -04:00 committed by Victor Shyba
parent 8eba05308d
commit 6f2b985b73
7 changed files with 187 additions and 78 deletions

View file

@ -24,13 +24,11 @@ from lbry.wallet.server.util import chunks, class_logger
from lbry.crypto.hash import hash160
from lbry.wallet.server.leveldb import FlushData
from lbry.wallet.server.mempool import MemPool
from lbry.wallet.server.db import DB_PREFIXES
from lbry.wallet.server.db.claimtrie import StagedClaimtrieItem, StagedClaimtrieSupport
from lbry.wallet.server.db.claimtrie import get_takeover_name_ops, StagedActivation, get_add_effective_amount_ops
from lbry.wallet.server.db.claimtrie import get_remove_name_ops, get_remove_effective_amount_ops
from lbry.wallet.server.db.prefixes import ACTIVATED_SUPPORT_TXO_TYPE, ACTIVATED_CLAIM_TXO_TYPE
from lbry.wallet.server.db.prefixes import PendingActivationKey, PendingActivationValue, Prefixes, ClaimToTXOValue
from lbry.wallet.server.db.trending import TrendingDB
from lbry.wallet.server.udp import StatusServer
from lbry.wallet.server.db.revertable import RevertableOp, RevertablePut, RevertableDelete, RevertableOpStack
if typing.TYPE_CHECKING:
@ -264,8 +262,6 @@ class BlockProcessor:
self.claim_channels: Dict[bytes, bytes] = {}
self.hashXs_by_tx: DefaultDict[bytes, List[int]] = defaultdict(list)
self.trending_db = TrendingDB(env.db_dir)
async def claim_producer(self):
if self.db.db_height <= 1:
return
@ -314,11 +310,6 @@ class BlockProcessor:
await self.run_in_thread(self.advance_block, block)
await self.flush()
# trending
extra_claims = self.trending_db.process_block(self.height,
self.daemon.cached_height())
self.touched_claims_to_send_es.union(extra_claims)
self.logger.info("advanced to %i in %0.3fs", self.height, time.perf_counter() - start)
if self.height == self.coin.nExtendedClaimExpirationForkHeight:
self.logger.warning(
@ -331,6 +322,7 @@ class BlockProcessor:
await self.db.search_index.claim_consumer(self.claim_producer())
await self.db.search_index.apply_filters(self.db.blocked_streams, self.db.blocked_channels,
self.db.filtered_streams, self.db.filtered_channels)
await self.db.search_index.apply_update_and_decay_trending_score()
self.db.search_index.clear_caches()
self.touched_claims_to_send_es.clear()
self.removed_claims_to_send_es.clear()
@ -369,7 +361,6 @@ class BlockProcessor:
self.logger.info(f'backed up to height {self.height:,d}')
await self.db._read_claim_txos() # TODO: don't do this
for touched in self.touched_claims_to_send_es:
if not self.db.get_claim_txo(touched):
self.removed_claims_to_send_es.add(touched)
@ -485,6 +476,7 @@ class BlockProcessor:
if txo.script.is_claim_name: # it's a root claim
root_tx_num, root_idx = tx_num, nout
previous_amount = 0
else: # it's a claim update
if claim_hash not in spent_claims:
# print(f"\tthis is a wonky tx, contains unlinked claim update {claim_hash.hex()}")
@ -511,6 +503,7 @@ class BlockProcessor:
previous_claim.amount
).get_remove_activate_ops()
)
previous_amount = previous_claim.amount
self.db.claim_to_txo[claim_hash] = ClaimToTXOValue(
tx_num, nout, root_tx_num, root_idx, txo.amount, channel_signature_is_valid, claim_name
@ -524,11 +517,13 @@ class BlockProcessor:
self.txo_to_claim[(tx_num, nout)] = pending
self.claim_hash_to_txo[claim_hash] = (tx_num, nout)
self.db_op_stack.extend_ops(pending.get_add_claim_utxo_ops())
self.trending_db.add_event({"claim_hash": claim_hash,
"event": "upsert",
"lbc": 1E-8*txo.amount})
def _add_support(self, txo: 'Output', tx_num: int, nout: int):
# add the spike for trending
self.db_op_stack.append_op(self.db.prefix_db.trending_spike.pack_spike(
height, claim_hash, tx_num, nout, txo.amount - previous_amount, half_life=self.env.trending_half_life
))
def _add_support(self, height: int, txo: 'Output', tx_num: int, nout: int):
supported_claim_hash = txo.claim_hash[::-1]
self.support_txos_by_claim[supported_claim_hash].append((tx_num, nout))
self.support_txo_to_claim[(tx_num, nout)] = supported_claim_hash, txo.amount
@ -536,49 +531,54 @@ class BlockProcessor:
self.db_op_stack.extend_ops(StagedClaimtrieSupport(
supported_claim_hash, tx_num, nout, txo.amount
).get_add_support_utxo_ops())
self.trending_db.add_event({"claim_hash": supported_claim_hash,
"event": "support",
"lbc": 1E-8*txo.amount})
# add the spike for trending
self.db_op_stack.append_op(self.db.prefix_db.trending_spike.pack_spike(
height, supported_claim_hash, tx_num, nout, txo.amount, half_life=self.env.trending_half_life
))
def _add_claim_or_support(self, height: int, tx_hash: bytes, tx_num: int, nout: int, txo: 'Output',
spent_claims: typing.Dict[bytes, Tuple[int, int, str]]):
if txo.script.is_claim_name or txo.script.is_update_claim:
self._add_claim_or_update(height, txo, tx_hash, tx_num, nout, spent_claims)
elif txo.script.is_support_claim or txo.script.is_support_claim_data:
self._add_support(txo, tx_num, nout)
self._add_support(height, txo, tx_num, nout)
def _spend_support_txo(self, txin):
def _spend_support_txo(self, height: int, txin: TxInput):
txin_num = self.db.transaction_num_mapping[txin.prev_hash]
activation = 0
if (txin_num, txin.prev_idx) in self.support_txo_to_claim:
spent_support, support_amount = self.support_txo_to_claim.pop((txin_num, txin.prev_idx))
self.support_txos_by_claim[spent_support].remove((txin_num, txin.prev_idx))
supported_name = self._get_pending_claim_name(spent_support)
# print(f"\tspent support for {spent_support.hex()}")
self.removed_support_txos_by_name_by_claim[supported_name][spent_support].append((txin_num, txin.prev_idx))
self.db_op_stack.extend_ops(StagedClaimtrieSupport(
spent_support, txin_num, txin.prev_idx, support_amount
).get_spend_support_txo_ops())
self.trending_db.add_event({"claim_hash": spent_support,
"event": "support",
"lbc": -1E-8*support_amount})
txin_height = height
else:
spent_support, support_amount = self.db.get_supported_claim_from_txo(txin_num, txin.prev_idx)
if spent_support:
if not spent_support: # it is not a support
return
supported_name = self._get_pending_claim_name(spent_support)
if supported_name is not None:
self.removed_support_txos_by_name_by_claim[supported_name][spent_support].append((txin_num, txin.prev_idx))
self.removed_support_txos_by_name_by_claim[supported_name][spent_support].append(
(txin_num, txin.prev_idx))
activation = self.db.get_activation(txin_num, txin.prev_idx, is_support=True)
txin_height = bisect_right(self.db.tx_counts, self.db.transaction_num_mapping[txin.prev_hash])
if 0 < activation < self.height + 1:
self.removed_active_support_amount_by_claim[spent_support].append(support_amount)
# print(f"\tspent support for {spent_support.hex()} activation:{activation} {support_amount}")
self.db_op_stack.extend_ops(StagedClaimtrieSupport(
spent_support, txin_num, txin.prev_idx, support_amount
).get_spend_support_txo_ops())
if supported_name is not None and activation > 0:
self.db_op_stack.extend_ops(StagedActivation(
ACTIVATED_SUPPORT_TXO_TYPE, spent_support, txin_num, txin.prev_idx, activation, supported_name,
support_amount
).get_remove_activate_ops())
# print(f"\tspent support for {spent_support.hex()} activation:{activation} {support_amount}")
self.db_op_stack.extend_ops(StagedClaimtrieSupport(
spent_support, txin_num, txin.prev_idx, support_amount
).get_spend_support_txo_ops())
# add the spike for trending
self.db_op_stack.append_op(self.db.prefix_db.trending_spike.pack_spike(
height, spent_support, txin_num, txin.prev_idx, support_amount, subtract=True,
depth=height-txin_height-1, half_life=self.env.trending_half_life
))
def _spend_claim_txo(self, txin: TxInput, spent_claims: Dict[bytes, Tuple[int, int, str]]) -> bool:
txin_num = self.db.transaction_num_mapping[txin.prev_hash]
@ -602,9 +602,9 @@ class BlockProcessor:
self.db_op_stack.extend_ops(spent.get_spend_claim_txo_ops())
return True
def _spend_claim_or_support_txo(self, txin, spent_claims):
def _spend_claim_or_support_txo(self, height: int, txin: TxInput, spent_claims):
if not self._spend_claim_txo(txin, spent_claims):
self._spend_support_txo(txin)
self._spend_support_txo(height, txin)
def _abandon_claim(self, claim_hash: bytes, tx_num: int, nout: int, normalized_name: str):
if (tx_num, nout) in self.txo_to_claim:
@ -639,9 +639,6 @@ class BlockProcessor:
if normalized_name.startswith('@'): # abandon a channel, invalidate signatures
self._invalidate_channel_signatures(claim_hash)
self.trending_db.add_event({"claim_hash": claim_hash,
"event": "delete"})
def _invalidate_channel_signatures(self, claim_hash: bytes):
for k, signed_claim_hash in self.db.db.iterator(
prefix=Prefixes.channel_to_claim.pack_partial_key(claim_hash)):
@ -1216,7 +1213,7 @@ class BlockProcessor:
if tx_count not in self.hashXs_by_tx[hashX]:
self.hashXs_by_tx[hashX].append(tx_count)
# spend claim/support txo
spend_claim_or_support_txo(txin, spent_claims)
spend_claim_or_support_txo(height, txin, spent_claims)
# Add the new UTXOs
for nout, txout in enumerate(tx.outputs):

View file

@ -37,3 +37,4 @@ class DB_PREFIXES(enum.Enum):
hashx_utxo = b'h'
hashx_history = b'x'
db_state = b's'
trending_spike = b't'

View file

@ -31,7 +31,8 @@ INDEX_DEFAULT_SETTINGS = {
"claim_type": {"type": "byte"},
"censor_type": {"type": "byte"},
"trending_score": {"type": "float"},
"release_time": {"type": "long"},
"trending_score_change": {"type": "float"},
"release_time": {"type": "long"}
}
}
}
@ -53,7 +54,7 @@ FIELDS = {
'duration', 'release_time',
'tags', 'languages', 'has_source', 'reposted_claim_type',
'reposted_claim_id', 'repost_count',
'trending_score', 'tx_num'
'trending_score', 'tx_num', 'trending_score_change'
}
TEXT_FIELDS = {'author', 'canonical_url', 'channel_id', 'description', 'claim_id', 'censoring_channel_id',
@ -66,7 +67,7 @@ RANGE_FIELDS = {
'timestamp', 'creation_timestamp', 'duration', 'release_time', 'fee_amount',
'tx_position', 'channel_join', 'repost_count', 'limit_claims_per_channel',
'amount', 'effective_amount', 'support_amount',
'trending_score', 'censor_type', 'tx_num'
'trending_score', 'censor_type', 'tx_num', 'trending_score_change'
}
ALL_FIELDS = RANGE_FIELDS | TEXT_FIELDS | FIELDS

View file

@ -1,3 +1,4 @@
import math
import asyncio
import struct
from binascii import unhexlify
@ -9,7 +10,6 @@ from typing import Optional, List, Iterable, Union
from elasticsearch import AsyncElasticsearch, NotFoundError, ConnectionError
from elasticsearch.helpers import async_streaming_bulk
from lbry.crypto.base58 import Base58
from lbry.error import ResolveCensoredError, TooManyClaimSearchParametersError
from lbry.schema.result import Outputs, Censor
from lbry.schema.tags import clean_tags
@ -43,7 +43,8 @@ class IndexVersionMismatch(Exception):
class SearchIndex:
VERSION = 1
def __init__(self, index_prefix: str, search_timeout=3.0, elastic_host='localhost', elastic_port=9200):
def __init__(self, index_prefix: str, search_timeout=3.0, elastic_host='localhost', elastic_port=9200,
half_life=0.4, whale_threshold=10000, whale_half_life=0.99):
self.search_timeout = search_timeout
self.sync_timeout = 600 # wont hit that 99% of the time, but can hit on a fresh import
self.search_client: Optional[AsyncElasticsearch] = None
@ -56,6 +57,9 @@ class SearchIndex:
self.resolution_cache = LRUCache(2 ** 17)
self._elastic_host = elastic_host
self._elastic_port = elastic_port
self._trending_half_life = half_life
self._trending_whale_threshold = whale_threshold
self._trending_whale_half_life = whale_half_life
async def get_index_version(self) -> int:
try:
@ -121,7 +125,7 @@ class SearchIndex:
}
count += 1
if count % 100 == 0:
self.logger.debug("Indexing in progress, %d claims.", count)
self.logger.info("Indexing in progress, %d claims.", count)
if count:
self.logger.info("Indexing done for %d claims.", count)
else:
@ -140,19 +144,57 @@ class SearchIndex:
self.logger.debug("Indexing done.")
def update_filter_query(self, censor_type, blockdict, channels=False):
blockdict = {key.hex(): value.hex() for key, value in blockdict.items()}
blockdict = {blocked.hex(): blocker.hex() for blocked, blocker in blockdict.items()}
if channels:
update = expand_query(channel_id__in=list(blockdict.keys()), censor_type=f"<{censor_type}")
else:
update = expand_query(claim_id__in=list(blockdict.keys()), censor_type=f"<{censor_type}")
key = 'channel_id' if channels else 'claim_id'
update['script'] = {
"source": f"ctx._source.censor_type={censor_type}; ctx._source.censoring_channel_id=params[ctx._source.{key}]",
"source": f"ctx._source.censor_type={censor_type}; "
f"ctx._source.censoring_channel_id=params[ctx._source.{key}];",
"lang": "painless",
"params": blockdict
}
return update
async def apply_update_and_decay_trending_score(self):
update_trending_score_script = """
if (ctx._source.trending_score == null) {
ctx._source.trending_score = ctx._source.trending_score_change;
} else {
ctx._source.trending_score += ctx._source.trending_score_change;
}
ctx._source.trending_score_change = 0.0;
"""
await self.sync_client.update_by_query(
self.index, body={
'query': {
'bool': {'must_not': [{'match': {'trending_score_change': 0.0}}]}
},
'script': {'source': update_trending_score_script, 'lang': 'painless'}
}, slices=4, conflicts='proceed'
)
whale_decay_factor = 2 * (2.0 ** (-1 / self._trending_whale_half_life))
decay_factor = 2 * (2.0 ** (-1 / self._trending_half_life))
decay_script = """
if (ctx._source.trending_score == null) { ctx._source.trending_score = 0.0; }
if ((-0.000001 <= ctx._source.trending_score) && (ctx._source.trending_score <= 0.000001)) {
ctx._source.trending_score = 0.0;
} else if (ctx._source.effective_amount >= %s) {
ctx._source.trending_score *= %s;
} else {
ctx._source.trending_score *= %s;
}
""" % (self._trending_whale_threshold, whale_decay_factor, decay_factor)
await self.sync_client.update_by_query(
self.index, body={
'query': {'bool': {'must_not': [{'match': {'trending_score': 0.0}}]}},
'script': {'source': decay_script, 'lang': 'painless'}
}, slices=4, conflicts='proceed'
)
async def apply_filters(self, blocked_streams, blocked_channels, filtered_streams, filtered_channels):
if filtered_streams:
await self.sync_client.update_by_query(

View file

@ -463,6 +463,21 @@ class TouchedOrDeletedClaimValue(typing.NamedTuple):
f"deleted_claims={','.join(map(lambda x: x.hex(), self.deleted_claims))})"
class TrendingSpikeKey(typing.NamedTuple):
height: int
claim_hash: bytes
tx_num: int
position: int
def __str__(self):
return f"{self.__class__.__name__}(height={self.height}, claim_hash={self.claim_hash.hex()}, " \
f"tx_num={self.tx_num}, position={self.position})"
class TrendingSpikeValue(typing.NamedTuple):
mass: float
class ActiveAmountPrefixRow(PrefixRow):
prefix = DB_PREFIXES.active_amount.value
key_struct = struct.Struct(b'>20sBLLH')
@ -1335,6 +1350,47 @@ class TouchedOrDeletedPrefixRow(PrefixRow):
return cls.pack_key(height), cls.pack_value(touched, deleted)
class TrendingSpikePrefixRow(PrefixRow):
prefix = DB_PREFIXES.trending_spike.value
key_struct = struct.Struct(b'>L20sLH')
value_struct = struct.Struct(b'>f')
key_part_lambdas = [
lambda: b'',
struct.Struct(b'>L').pack,
struct.Struct(b'>L20s').pack,
struct.Struct(b'>L20sL').pack,
struct.Struct(b'>L20sLH').pack
]
def pack_spike(self, height: int, claim_hash: bytes, tx_num: int, position: int, amount: int, half_life: int,
depth: int = 0, subtract: bool = False) -> RevertablePut:
softened_change = (((amount * 1E-8) + 1E-8) ** 0.25).real
spike_mass = (-1.0 if subtract else 1.0) * softened_change * 2 * ((2.0 ** (-1 / half_life)) ** depth)
# trending_spike_height = self.height + delay_trending_spike(self.amount * 1E-8)
return RevertablePut(*self.pack_item(height, claim_hash, tx_num, position, spike_mass))
@classmethod
def pack_key(cls, height: int, claim_hash: bytes, tx_num: int, position: int):
return super().pack_key(height, claim_hash, tx_num, position)
@classmethod
def unpack_key(cls, key: bytes) -> TrendingSpikeKey:
return TrendingSpikeKey(*super().unpack_key(key))
@classmethod
def pack_value(cls, mass: float) -> bytes:
return super().pack_value(mass)
@classmethod
def unpack_value(cls, data: bytes) -> TrendingSpikeValue:
return TrendingSpikeValue(*cls.value_struct.unpack(data))
@classmethod
def pack_item(cls, height: int, claim_hash: bytes, tx_num: int, position: int, mass: float):
return cls.pack_key(height, claim_hash, tx_num, position), cls.pack_value(mass)
class Prefixes:
claim_to_support = ClaimToSupportPrefixRow
support_to_claim = SupportToClaimPrefixRow
@ -1369,6 +1425,7 @@ class Prefixes:
tx = TXPrefixRow
header = BlockHeaderPrefixRow
touched_or_deleted = TouchedOrDeletedPrefixRow
trending_spike = TrendingSpikePrefixRow
class PrefixDB:
@ -1402,6 +1459,7 @@ class PrefixDB:
self.tx = TXPrefixRow(db, op_stack)
self.header = BlockHeaderPrefixRow(db, op_stack)
self.touched_or_deleted = TouchedOrDeletedPrefixRow(db, op_stack)
self.trending_spike = TrendingSpikePrefixRow(db, op_stack)
def commit(self):
try:

View file

@ -42,6 +42,9 @@ class Env:
self.trending_algorithms = [
trending for trending in set(self.default('TRENDING_ALGORITHMS', 'zscore').split(' ')) if trending
]
self.trending_half_life = float(self.string_amount('TRENDING_HALF_LIFE', "0.9"))
self.trending_whale_half_life = float(self.string_amount('TRENDING_WHALE_HALF_LIFE', "0.99"))
self.trending_whale_threshold = float(self.integer('TRENDING_WHALE_THRESHOLD', 10000))
self.max_query_workers = self.integer('MAX_QUERY_WORKERS', None)
self.individual_tag_indexes = self.boolean('INDIVIDUAL_TAG_INDEXES', True)
self.track_metrics = self.boolean('TRACK_METRICS', False)

View file

@ -22,7 +22,7 @@ from typing import Optional, Iterable, Tuple, DefaultDict, Set, Dict, List
from functools import partial
from asyncio import sleep
from bisect import bisect_right
from collections import defaultdict, OrderedDict
from collections import defaultdict
from lbry.error import ResolveCensoredError
from lbry.schema.result import Censor
@ -38,7 +38,6 @@ from lbry.wallet.server.db.revertable import RevertableOpStack
from lbry.wallet.server.db.prefixes import Prefixes, PendingActivationValue, ClaimTakeoverValue, ClaimToTXOValue, PrefixDB
from lbry.wallet.server.db.prefixes import ACTIVATED_CLAIM_TXO_TYPE, ACTIVATED_SUPPORT_TXO_TYPE
from lbry.wallet.server.db.prefixes import PendingActivationKey, TXOToClaimValue
from lbry.wallet.server.db.trending import TrendingDB
from lbry.wallet.transaction import OutputScript
from lbry.schema.claim import Claim, guess_stream_type
from lbry.wallet.ledger import Ledger, RegTestLedger, TestNetLedger
@ -150,13 +149,15 @@ class LevelDB:
self.total_transactions = None
self.transaction_num_mapping = {}
self.claim_to_txo: typing.OrderedDict[bytes, ClaimToTXOValue] = OrderedDict()
self.txo_to_claim: typing.OrderedDict[Tuple[int, int], bytes] = OrderedDict()
self.claim_to_txo: Dict[bytes, ClaimToTXOValue] = {}
self.txo_to_claim: Dict[Tuple[int, int], bytes] = {}
# Search index
self.search_index = SearchIndex(
self.env.es_index_prefix, self.env.database_query_timeout,
elastic_host=env.elastic_host, elastic_port=env.elastic_port
elastic_host=env.elastic_host, elastic_port=env.elastic_port,
half_life=self.env.trending_half_life, whale_threshold=self.env.trending_whale_threshold,
whale_half_life=self.env.trending_whale_half_life
)
self.genesis_bytes = bytes.fromhex(self.coin.GENESIS_HASH)
@ -168,8 +169,6 @@ class LevelDB:
else:
self.ledger = RegTestLedger
self.trending_db = TrendingDB(self.env.db_dir)
def get_claim_from_txo(self, tx_num: int, tx_idx: int) -> Optional[TXOToClaimValue]:
claim_hash_and_name = self.db.get(Prefixes.txo_to_claim.pack_key(tx_num, tx_idx))
if not claim_hash_and_name:
@ -188,6 +187,12 @@ class LevelDB:
cnt += 1
return cnt
def get_trending_spike_sum(self, height: int, claim_hash: bytes) -> float:
spikes = 0.0
for k, v in self.prefix_db.trending_spike.iterate(prefix=(height, claim_hash)):
spikes += v.mass
return spikes
def get_activation(self, tx_num, position, is_support=False) -> int:
activation = self.db.get(
Prefixes.activated.pack_key(
@ -704,12 +709,9 @@ class LevelDB:
'censor_type': Censor.RESOLVE if blocked_hash else Censor.SEARCH if filtered_hash else Censor.NOT_CENSORED,
'censoring_channel_id': (blocked_hash or filtered_hash or b'').hex() or None,
'claims_in_channel': None if not metadata.is_channel else self.get_claims_in_channel_count(claim_hash),
'trending_score': self.trending_db.get_trending_score(claim_hash)
# 'trending_group': 0,
# 'trending_mixed': 0,
# 'trending_local': 0,
# 'trending_global': 0,
'trending_score_change': self.get_trending_spike_sum(self.db_height, claim_hash)
}
if metadata.is_repost and reposted_duration is not None:
value['duration'] = reposted_duration
elif metadata.is_stream and (metadata.stream.video.duration or metadata.stream.audio.duration):
@ -748,7 +750,7 @@ class LevelDB:
self.logger.warning("can't sync non existent claim to ES: %s", claim_hash.hex())
continue
name = self.claim_to_txo[claim_hash].normalized_name
if not self.db.get(Prefixes.claim_takeover.pack_key(name)):
if not self.prefix_db.claim_takeover.get(name):
self.logger.warning("can't sync non existent claim to ES: %s", claim_hash.hex())
continue
claim = self._fs_get_claim_by_hash(claim_hash)
@ -944,6 +946,11 @@ class LevelDB:
stop=Prefixes.touched_or_deleted.pack_key(min_height), include_value=False
)
)
delete_undo_keys.extend(
self.db.iterator(
prefix=Prefixes.trending_spike.pack_partial_key(min_height), include_value=False
)
)
with self.db.write_batch(transaction=True) as batch:
batch_put = batch.put