From 6f2b985b736429fe8f565ccd141cd1efba1fb7ef Mon Sep 17 00:00:00 2001 From: Jack Robison Date: Mon, 30 Aug 2021 12:16:07 -0400 Subject: [PATCH] update trending in elasticsearch -add TrendingPrefixSpike to leveldb -expose `TRENDING_HALF_LIFE`, `TRENDING_WHALE_HALF_LIFE` and `TRENDING_WHALE_THRESHOLD` hub settings --- lbry/wallet/server/block_processor.py | 111 +++++++++--------- lbry/wallet/server/db/__init__.py | 1 + .../server/db/elasticsearch/constants.py | 7 +- lbry/wallet/server/db/elasticsearch/search.py | 52 +++++++- lbry/wallet/server/db/prefixes.py | 58 +++++++++ lbry/wallet/server/env.py | 3 + lbry/wallet/server/leveldb.py | 33 ++++-- 7 files changed, 187 insertions(+), 78 deletions(-) diff --git a/lbry/wallet/server/block_processor.py b/lbry/wallet/server/block_processor.py index e7247869d..c7239abb8 100644 --- a/lbry/wallet/server/block_processor.py +++ b/lbry/wallet/server/block_processor.py @@ -24,13 +24,11 @@ from lbry.wallet.server.util import chunks, class_logger from lbry.crypto.hash import hash160 from lbry.wallet.server.leveldb import FlushData from lbry.wallet.server.mempool import MemPool -from lbry.wallet.server.db import DB_PREFIXES from lbry.wallet.server.db.claimtrie import StagedClaimtrieItem, StagedClaimtrieSupport from lbry.wallet.server.db.claimtrie import get_takeover_name_ops, StagedActivation, get_add_effective_amount_ops from lbry.wallet.server.db.claimtrie import get_remove_name_ops, get_remove_effective_amount_ops from lbry.wallet.server.db.prefixes import ACTIVATED_SUPPORT_TXO_TYPE, ACTIVATED_CLAIM_TXO_TYPE from lbry.wallet.server.db.prefixes import PendingActivationKey, PendingActivationValue, Prefixes, ClaimToTXOValue -from lbry.wallet.server.db.trending import TrendingDB from lbry.wallet.server.udp import StatusServer from lbry.wallet.server.db.revertable import RevertableOp, RevertablePut, RevertableDelete, RevertableOpStack if typing.TYPE_CHECKING: @@ -264,8 +262,6 @@ class BlockProcessor: self.claim_channels: Dict[bytes, bytes] = {} self.hashXs_by_tx: DefaultDict[bytes, List[int]] = defaultdict(list) - self.trending_db = TrendingDB(env.db_dir) - async def claim_producer(self): if self.db.db_height <= 1: return @@ -314,11 +310,6 @@ class BlockProcessor: await self.run_in_thread(self.advance_block, block) await self.flush() - # trending - extra_claims = self.trending_db.process_block(self.height, - self.daemon.cached_height()) - self.touched_claims_to_send_es.union(extra_claims) - self.logger.info("advanced to %i in %0.3fs", self.height, time.perf_counter() - start) if self.height == self.coin.nExtendedClaimExpirationForkHeight: self.logger.warning( @@ -326,14 +317,15 @@ class BlockProcessor: ) await self.run_in_thread(self.db.apply_expiration_extension_fork) # TODO: we shouldnt wait on the search index updating before advancing to the next block - if not self.db.first_sync: - self.db.reload_blocking_filtering_streams() - await self.db.search_index.claim_consumer(self.claim_producer()) - await self.db.search_index.apply_filters(self.db.blocked_streams, self.db.blocked_channels, - self.db.filtered_streams, self.db.filtered_channels) - self.db.search_index.clear_caches() - self.touched_claims_to_send_es.clear() - self.removed_claims_to_send_es.clear() + if not self.db.first_sync: + self.db.reload_blocking_filtering_streams() + await self.db.search_index.claim_consumer(self.claim_producer()) + await self.db.search_index.apply_filters(self.db.blocked_streams, self.db.blocked_channels, + self.db.filtered_streams, self.db.filtered_channels) + await 
self.db.search_index.apply_update_and_decay_trending_score() + self.db.search_index.clear_caches() + self.touched_claims_to_send_es.clear() + self.removed_claims_to_send_es.clear() # print("******************\n") except: self.logger.exception("advance blocks failed") @@ -368,16 +360,15 @@ class BlockProcessor: await self.flush() self.logger.info(f'backed up to height {self.height:,d}') - await self.db._read_claim_txos() # TODO: don't do this - - for touched in self.touched_claims_to_send_es: - if not self.db.get_claim_txo(touched): - self.removed_claims_to_send_es.add(touched) - self.touched_claims_to_send_es.difference_update(self.removed_claims_to_send_es) - await self.db.search_index.claim_consumer(self.claim_producer()) - self.db.search_index.clear_caches() - self.touched_claims_to_send_es.clear() - self.removed_claims_to_send_es.clear() + await self.db._read_claim_txos() # TODO: don't do this + for touched in self.touched_claims_to_send_es: + if not self.db.get_claim_txo(touched): + self.removed_claims_to_send_es.add(touched) + self.touched_claims_to_send_es.difference_update(self.removed_claims_to_send_es) + await self.db.search_index.claim_consumer(self.claim_producer()) + self.db.search_index.clear_caches() + self.touched_claims_to_send_es.clear() + self.removed_claims_to_send_es.clear() await self.prefetcher.reset_height(self.height) self.reorg_count_metric.inc() except: @@ -485,6 +476,7 @@ class BlockProcessor: if txo.script.is_claim_name: # it's a root claim root_tx_num, root_idx = tx_num, nout + previous_amount = 0 else: # it's a claim update if claim_hash not in spent_claims: # print(f"\tthis is a wonky tx, contains unlinked claim update {claim_hash.hex()}") @@ -511,6 +503,7 @@ class BlockProcessor: previous_claim.amount ).get_remove_activate_ops() ) + previous_amount = previous_claim.amount self.db.claim_to_txo[claim_hash] = ClaimToTXOValue( tx_num, nout, root_tx_num, root_idx, txo.amount, channel_signature_is_valid, claim_name @@ -524,11 +517,13 @@ class BlockProcessor: self.txo_to_claim[(tx_num, nout)] = pending self.claim_hash_to_txo[claim_hash] = (tx_num, nout) self.db_op_stack.extend_ops(pending.get_add_claim_utxo_ops()) - self.trending_db.add_event({"claim_hash": claim_hash, - "event": "upsert", - "lbc": 1E-8*txo.amount}) - def _add_support(self, txo: 'Output', tx_num: int, nout: int): + # add the spike for trending + self.db_op_stack.append_op(self.db.prefix_db.trending_spike.pack_spike( + height, claim_hash, tx_num, nout, txo.amount - previous_amount, half_life=self.env.trending_half_life + )) + + def _add_support(self, height: int, txo: 'Output', tx_num: int, nout: int): supported_claim_hash = txo.claim_hash[::-1] self.support_txos_by_claim[supported_claim_hash].append((tx_num, nout)) self.support_txo_to_claim[(tx_num, nout)] = supported_claim_hash, txo.amount @@ -536,49 +531,54 @@ class BlockProcessor: self.db_op_stack.extend_ops(StagedClaimtrieSupport( supported_claim_hash, tx_num, nout, txo.amount ).get_add_support_utxo_ops()) - self.trending_db.add_event({"claim_hash": supported_claim_hash, - "event": "support", - "lbc": 1E-8*txo.amount}) + + # add the spike for trending + self.db_op_stack.append_op(self.db.prefix_db.trending_spike.pack_spike( + height, supported_claim_hash, tx_num, nout, txo.amount, half_life=self.env.trending_half_life + )) def _add_claim_or_support(self, height: int, tx_hash: bytes, tx_num: int, nout: int, txo: 'Output', spent_claims: typing.Dict[bytes, Tuple[int, int, str]]): if txo.script.is_claim_name or txo.script.is_update_claim: 
self._add_claim_or_update(height, txo, tx_hash, tx_num, nout, spent_claims) elif txo.script.is_support_claim or txo.script.is_support_claim_data: - self._add_support(txo, tx_num, nout) + self._add_support(height, txo, tx_num, nout) - def _spend_support_txo(self, txin): + def _spend_support_txo(self, height: int, txin: TxInput): txin_num = self.db.transaction_num_mapping[txin.prev_hash] + activation = 0 if (txin_num, txin.prev_idx) in self.support_txo_to_claim: spent_support, support_amount = self.support_txo_to_claim.pop((txin_num, txin.prev_idx)) self.support_txos_by_claim[spent_support].remove((txin_num, txin.prev_idx)) supported_name = self._get_pending_claim_name(spent_support) - # print(f"\tspent support for {spent_support.hex()}") self.removed_support_txos_by_name_by_claim[supported_name][spent_support].append((txin_num, txin.prev_idx)) - self.db_op_stack.extend_ops(StagedClaimtrieSupport( - spent_support, txin_num, txin.prev_idx, support_amount - ).get_spend_support_txo_ops()) - self.trending_db.add_event({"claim_hash": spent_support, - "event": "support", - "lbc": -1E-8*support_amount}) - - spent_support, support_amount = self.db.get_supported_claim_from_txo(txin_num, txin.prev_idx) - if spent_support: + txin_height = height + else: + spent_support, support_amount = self.db.get_supported_claim_from_txo(txin_num, txin.prev_idx) + if not spent_support: # it is not a support + return supported_name = self._get_pending_claim_name(spent_support) if supported_name is not None: - self.removed_support_txos_by_name_by_claim[supported_name][spent_support].append((txin_num, txin.prev_idx)) + self.removed_support_txos_by_name_by_claim[supported_name][spent_support].append( + (txin_num, txin.prev_idx)) activation = self.db.get_activation(txin_num, txin.prev_idx, is_support=True) + txin_height = bisect_right(self.db.tx_counts, self.db.transaction_num_mapping[txin.prev_hash]) if 0 < activation < self.height + 1: self.removed_active_support_amount_by_claim[spent_support].append(support_amount) - # print(f"\tspent support for {spent_support.hex()} activation:{activation} {support_amount}") - self.db_op_stack.extend_ops(StagedClaimtrieSupport( - spent_support, txin_num, txin.prev_idx, support_amount - ).get_spend_support_txo_ops()) if supported_name is not None and activation > 0: self.db_op_stack.extend_ops(StagedActivation( ACTIVATED_SUPPORT_TXO_TYPE, spent_support, txin_num, txin.prev_idx, activation, supported_name, support_amount ).get_remove_activate_ops()) + # print(f"\tspent support for {spent_support.hex()} activation:{activation} {support_amount}") + self.db_op_stack.extend_ops(StagedClaimtrieSupport( + spent_support, txin_num, txin.prev_idx, support_amount + ).get_spend_support_txo_ops()) + # add the spike for trending + self.db_op_stack.append_op(self.db.prefix_db.trending_spike.pack_spike( + height, spent_support, txin_num, txin.prev_idx, support_amount, subtract=True, + depth=height-txin_height-1, half_life=self.env.trending_half_life + )) def _spend_claim_txo(self, txin: TxInput, spent_claims: Dict[bytes, Tuple[int, int, str]]) -> bool: txin_num = self.db.transaction_num_mapping[txin.prev_hash] @@ -602,9 +602,9 @@ class BlockProcessor: self.db_op_stack.extend_ops(spent.get_spend_claim_txo_ops()) return True - def _spend_claim_or_support_txo(self, txin, spent_claims): + def _spend_claim_or_support_txo(self, height: int, txin: TxInput, spent_claims): if not self._spend_claim_txo(txin, spent_claims): - self._spend_support_txo(txin) + self._spend_support_txo(height, txin) def 
_abandon_claim(self, claim_hash: bytes, tx_num: int, nout: int, normalized_name: str): if (tx_num, nout) in self.txo_to_claim: @@ -639,9 +639,6 @@ class BlockProcessor: if normalized_name.startswith('@'): # abandon a channel, invalidate signatures self._invalidate_channel_signatures(claim_hash) - self.trending_db.add_event({"claim_hash": claim_hash, - "event": "delete"}) - def _invalidate_channel_signatures(self, claim_hash: bytes): for k, signed_claim_hash in self.db.db.iterator( prefix=Prefixes.channel_to_claim.pack_partial_key(claim_hash)): @@ -1216,7 +1213,7 @@ class BlockProcessor: if tx_count not in self.hashXs_by_tx[hashX]: self.hashXs_by_tx[hashX].append(tx_count) # spend claim/support txo - spend_claim_or_support_txo(txin, spent_claims) + spend_claim_or_support_txo(height, txin, spent_claims) # Add the new UTXOs for nout, txout in enumerate(tx.outputs): diff --git a/lbry/wallet/server/db/__init__.py b/lbry/wallet/server/db/__init__.py index ec33b6ead..f2c40697b 100644 --- a/lbry/wallet/server/db/__init__.py +++ b/lbry/wallet/server/db/__init__.py @@ -37,3 +37,4 @@ class DB_PREFIXES(enum.Enum): hashx_utxo = b'h' hashx_history = b'x' db_state = b's' + trending_spike = b't' diff --git a/lbry/wallet/server/db/elasticsearch/constants.py b/lbry/wallet/server/db/elasticsearch/constants.py index f89bf3353..8461c6ab7 100644 --- a/lbry/wallet/server/db/elasticsearch/constants.py +++ b/lbry/wallet/server/db/elasticsearch/constants.py @@ -31,7 +31,8 @@ INDEX_DEFAULT_SETTINGS = { "claim_type": {"type": "byte"}, "censor_type": {"type": "byte"}, "trending_score": {"type": "float"}, - "release_time": {"type": "long"}, + "trending_score_change": {"type": "float"}, + "release_time": {"type": "long"} } } } @@ -53,7 +54,7 @@ FIELDS = { 'duration', 'release_time', 'tags', 'languages', 'has_source', 'reposted_claim_type', 'reposted_claim_id', 'repost_count', - 'trending_score', 'tx_num' + 'trending_score', 'tx_num', 'trending_score_change' } TEXT_FIELDS = {'author', 'canonical_url', 'channel_id', 'description', 'claim_id', 'censoring_channel_id', @@ -66,7 +67,7 @@ RANGE_FIELDS = { 'timestamp', 'creation_timestamp', 'duration', 'release_time', 'fee_amount', 'tx_position', 'channel_join', 'repost_count', 'limit_claims_per_channel', 'amount', 'effective_amount', 'support_amount', - 'trending_score', 'censor_type', 'tx_num' + 'trending_score', 'censor_type', 'tx_num', 'trending_score_change' } ALL_FIELDS = RANGE_FIELDS | TEXT_FIELDS | FIELDS diff --git a/lbry/wallet/server/db/elasticsearch/search.py b/lbry/wallet/server/db/elasticsearch/search.py index 0379ec090..7d933f036 100644 --- a/lbry/wallet/server/db/elasticsearch/search.py +++ b/lbry/wallet/server/db/elasticsearch/search.py @@ -1,3 +1,4 @@ +import math import asyncio import struct from binascii import unhexlify @@ -9,7 +10,6 @@ from typing import Optional, List, Iterable, Union from elasticsearch import AsyncElasticsearch, NotFoundError, ConnectionError from elasticsearch.helpers import async_streaming_bulk -from lbry.crypto.base58 import Base58 from lbry.error import ResolveCensoredError, TooManyClaimSearchParametersError from lbry.schema.result import Outputs, Censor from lbry.schema.tags import clean_tags @@ -43,7 +43,8 @@ class IndexVersionMismatch(Exception): class SearchIndex: VERSION = 1 - def __init__(self, index_prefix: str, search_timeout=3.0, elastic_host='localhost', elastic_port=9200): + def __init__(self, index_prefix: str, search_timeout=3.0, elastic_host='localhost', elastic_port=9200, + half_life=0.4, whale_threshold=10000, 
whale_half_life=0.99): self.search_timeout = search_timeout self.sync_timeout = 600 # wont hit that 99% of the time, but can hit on a fresh import self.search_client: Optional[AsyncElasticsearch] = None @@ -56,6 +57,9 @@ class SearchIndex: self.resolution_cache = LRUCache(2 ** 17) self._elastic_host = elastic_host self._elastic_port = elastic_port + self._trending_half_life = half_life + self._trending_whale_threshold = whale_threshold + self._trending_whale_half_life = whale_half_life async def get_index_version(self) -> int: try: @@ -121,7 +125,7 @@ class SearchIndex: } count += 1 if count % 100 == 0: - self.logger.debug("Indexing in progress, %d claims.", count) + self.logger.info("Indexing in progress, %d claims.", count) if count: self.logger.info("Indexing done for %d claims.", count) else: @@ -140,19 +144,57 @@ class SearchIndex: self.logger.debug("Indexing done.") def update_filter_query(self, censor_type, blockdict, channels=False): - blockdict = {key.hex(): value.hex() for key, value in blockdict.items()} + blockdict = {blocked.hex(): blocker.hex() for blocked, blocker in blockdict.items()} if channels: update = expand_query(channel_id__in=list(blockdict.keys()), censor_type=f"<{censor_type}") else: update = expand_query(claim_id__in=list(blockdict.keys()), censor_type=f"<{censor_type}") key = 'channel_id' if channels else 'claim_id' update['script'] = { - "source": f"ctx._source.censor_type={censor_type}; ctx._source.censoring_channel_id=params[ctx._source.{key}]", + "source": f"ctx._source.censor_type={censor_type}; " + f"ctx._source.censoring_channel_id=params[ctx._source.{key}];", "lang": "painless", "params": blockdict } return update + async def apply_update_and_decay_trending_score(self): + update_trending_score_script = """ + if (ctx._source.trending_score == null) { + ctx._source.trending_score = ctx._source.trending_score_change; + } else { + ctx._source.trending_score += ctx._source.trending_score_change; + } + ctx._source.trending_score_change = 0.0; + """ + await self.sync_client.update_by_query( + self.index, body={ + 'query': { + 'bool': {'must_not': [{'match': {'trending_score_change': 0.0}}]} + }, + 'script': {'source': update_trending_score_script, 'lang': 'painless'} + }, slices=4, conflicts='proceed' + ) + + whale_decay_factor = 2 * (2.0 ** (-1 / self._trending_whale_half_life)) + decay_factor = 2 * (2.0 ** (-1 / self._trending_half_life)) + decay_script = """ + if (ctx._source.trending_score == null) { ctx._source.trending_score = 0.0; } + if ((-0.000001 <= ctx._source.trending_score) && (ctx._source.trending_score <= 0.000001)) { + ctx._source.trending_score = 0.0; + } else if (ctx._source.effective_amount >= %s) { + ctx._source.trending_score *= %s; + } else { + ctx._source.trending_score *= %s; + } + """ % (self._trending_whale_threshold, whale_decay_factor, decay_factor) + await self.sync_client.update_by_query( + self.index, body={ + 'query': {'bool': {'must_not': [{'match': {'trending_score': 0.0}}]}}, + 'script': {'source': decay_script, 'lang': 'painless'} + }, slices=4, conflicts='proceed' + ) + async def apply_filters(self, blocked_streams, blocked_channels, filtered_streams, filtered_channels): if filtered_streams: await self.sync_client.update_by_query( diff --git a/lbry/wallet/server/db/prefixes.py b/lbry/wallet/server/db/prefixes.py index 10655138b..ddeb204f9 100644 --- a/lbry/wallet/server/db/prefixes.py +++ b/lbry/wallet/server/db/prefixes.py @@ -463,6 +463,21 @@ class TouchedOrDeletedClaimValue(typing.NamedTuple): 
f"deleted_claims={','.join(map(lambda x: x.hex(), self.deleted_claims))})" +class TrendingSpikeKey(typing.NamedTuple): + height: int + claim_hash: bytes + tx_num: int + position: int + + def __str__(self): + return f"{self.__class__.__name__}(height={self.height}, claim_hash={self.claim_hash.hex()}, " \ + f"tx_num={self.tx_num}, position={self.position})" + + +class TrendingSpikeValue(typing.NamedTuple): + mass: float + + class ActiveAmountPrefixRow(PrefixRow): prefix = DB_PREFIXES.active_amount.value key_struct = struct.Struct(b'>20sBLLH') @@ -1335,6 +1350,47 @@ class TouchedOrDeletedPrefixRow(PrefixRow): return cls.pack_key(height), cls.pack_value(touched, deleted) +class TrendingSpikePrefixRow(PrefixRow): + prefix = DB_PREFIXES.trending_spike.value + key_struct = struct.Struct(b'>L20sLH') + value_struct = struct.Struct(b'>f') + + key_part_lambdas = [ + lambda: b'', + struct.Struct(b'>L').pack, + struct.Struct(b'>L20s').pack, + struct.Struct(b'>L20sL').pack, + struct.Struct(b'>L20sLH').pack + ] + + def pack_spike(self, height: int, claim_hash: bytes, tx_num: int, position: int, amount: int, half_life: int, + depth: int = 0, subtract: bool = False) -> RevertablePut: + softened_change = (((amount * 1E-8) + 1E-8) ** 0.25).real + spike_mass = (-1.0 if subtract else 1.0) * softened_change * 2 * ((2.0 ** (-1 / half_life)) ** depth) + # trending_spike_height = self.height + delay_trending_spike(self.amount * 1E-8) + return RevertablePut(*self.pack_item(height, claim_hash, tx_num, position, spike_mass)) + + @classmethod + def pack_key(cls, height: int, claim_hash: bytes, tx_num: int, position: int): + return super().pack_key(height, claim_hash, tx_num, position) + + @classmethod + def unpack_key(cls, key: bytes) -> TrendingSpikeKey: + return TrendingSpikeKey(*super().unpack_key(key)) + + @classmethod + def pack_value(cls, mass: float) -> bytes: + return super().pack_value(mass) + + @classmethod + def unpack_value(cls, data: bytes) -> TrendingSpikeValue: + return TrendingSpikeValue(*cls.value_struct.unpack(data)) + + @classmethod + def pack_item(cls, height: int, claim_hash: bytes, tx_num: int, position: int, mass: float): + return cls.pack_key(height, claim_hash, tx_num, position), cls.pack_value(mass) + + class Prefixes: claim_to_support = ClaimToSupportPrefixRow support_to_claim = SupportToClaimPrefixRow @@ -1369,6 +1425,7 @@ class Prefixes: tx = TXPrefixRow header = BlockHeaderPrefixRow touched_or_deleted = TouchedOrDeletedPrefixRow + trending_spike = TrendingSpikePrefixRow class PrefixDB: @@ -1402,6 +1459,7 @@ class PrefixDB: self.tx = TXPrefixRow(db, op_stack) self.header = BlockHeaderPrefixRow(db, op_stack) self.touched_or_deleted = TouchedOrDeletedPrefixRow(db, op_stack) + self.trending_spike = TrendingSpikePrefixRow(db, op_stack) def commit(self): try: diff --git a/lbry/wallet/server/env.py b/lbry/wallet/server/env.py index c20d64d64..6086c3ff6 100644 --- a/lbry/wallet/server/env.py +++ b/lbry/wallet/server/env.py @@ -42,6 +42,9 @@ class Env: self.trending_algorithms = [ trending for trending in set(self.default('TRENDING_ALGORITHMS', 'zscore').split(' ')) if trending ] + self.trending_half_life = float(self.string_amount('TRENDING_HALF_LIFE', "0.9")) + self.trending_whale_half_life = float(self.string_amount('TRENDING_WHALE_HALF_LIFE', "0.99")) + self.trending_whale_threshold = float(self.integer('TRENDING_WHALE_THRESHOLD', 10000)) self.max_query_workers = self.integer('MAX_QUERY_WORKERS', None) self.individual_tag_indexes = self.boolean('INDIVIDUAL_TAG_INDEXES', True) 
self.track_metrics = self.boolean('TRACK_METRICS', False) diff --git a/lbry/wallet/server/leveldb.py b/lbry/wallet/server/leveldb.py index 08b5d237b..b21383156 100644 --- a/lbry/wallet/server/leveldb.py +++ b/lbry/wallet/server/leveldb.py @@ -22,7 +22,7 @@ from typing import Optional, Iterable, Tuple, DefaultDict, Set, Dict, List from functools import partial from asyncio import sleep from bisect import bisect_right -from collections import defaultdict, OrderedDict +from collections import defaultdict from lbry.error import ResolveCensoredError from lbry.schema.result import Censor @@ -38,7 +38,6 @@ from lbry.wallet.server.db.revertable import RevertableOpStack from lbry.wallet.server.db.prefixes import Prefixes, PendingActivationValue, ClaimTakeoverValue, ClaimToTXOValue, PrefixDB from lbry.wallet.server.db.prefixes import ACTIVATED_CLAIM_TXO_TYPE, ACTIVATED_SUPPORT_TXO_TYPE from lbry.wallet.server.db.prefixes import PendingActivationKey, TXOToClaimValue -from lbry.wallet.server.db.trending import TrendingDB from lbry.wallet.transaction import OutputScript from lbry.schema.claim import Claim, guess_stream_type from lbry.wallet.ledger import Ledger, RegTestLedger, TestNetLedger @@ -150,13 +149,15 @@ class LevelDB: self.total_transactions = None self.transaction_num_mapping = {} - self.claim_to_txo: typing.OrderedDict[bytes, ClaimToTXOValue] = OrderedDict() - self.txo_to_claim: typing.OrderedDict[Tuple[int, int], bytes] = OrderedDict() + self.claim_to_txo: Dict[bytes, ClaimToTXOValue] = {} + self.txo_to_claim: Dict[Tuple[int, int], bytes] = {} # Search index self.search_index = SearchIndex( self.env.es_index_prefix, self.env.database_query_timeout, - elastic_host=env.elastic_host, elastic_port=env.elastic_port + elastic_host=env.elastic_host, elastic_port=env.elastic_port, + half_life=self.env.trending_half_life, whale_threshold=self.env.trending_whale_threshold, + whale_half_life=self.env.trending_whale_half_life ) self.genesis_bytes = bytes.fromhex(self.coin.GENESIS_HASH) @@ -168,8 +169,6 @@ class LevelDB: else: self.ledger = RegTestLedger - self.trending_db = TrendingDB(self.env.db_dir) - def get_claim_from_txo(self, tx_num: int, tx_idx: int) -> Optional[TXOToClaimValue]: claim_hash_and_name = self.db.get(Prefixes.txo_to_claim.pack_key(tx_num, tx_idx)) if not claim_hash_and_name: @@ -188,6 +187,12 @@ class LevelDB: cnt += 1 return cnt + def get_trending_spike_sum(self, height: int, claim_hash: bytes) -> float: + spikes = 0.0 + for k, v in self.prefix_db.trending_spike.iterate(prefix=(height, claim_hash)): + spikes += v.mass + return spikes + def get_activation(self, tx_num, position, is_support=False) -> int: activation = self.db.get( Prefixes.activated.pack_key( @@ -704,12 +709,9 @@ class LevelDB: 'censor_type': Censor.RESOLVE if blocked_hash else Censor.SEARCH if filtered_hash else Censor.NOT_CENSORED, 'censoring_channel_id': (blocked_hash or filtered_hash or b'').hex() or None, 'claims_in_channel': None if not metadata.is_channel else self.get_claims_in_channel_count(claim_hash), - 'trending_score': self.trending_db.get_trending_score(claim_hash) - # 'trending_group': 0, - # 'trending_mixed': 0, - # 'trending_local': 0, - # 'trending_global': 0, + 'trending_score_change': self.get_trending_spike_sum(self.db_height, claim_hash) } + if metadata.is_repost and reposted_duration is not None: value['duration'] = reposted_duration elif metadata.is_stream and (metadata.stream.video.duration or metadata.stream.audio.duration): @@ -748,7 +750,7 @@ class LevelDB: self.logger.warning("can't sync 
non existent claim to ES: %s", claim_hash.hex()) continue name = self.claim_to_txo[claim_hash].normalized_name - if not self.db.get(Prefixes.claim_takeover.pack_key(name)): + if not self.prefix_db.claim_takeover.get(name): self.logger.warning("can't sync non existent claim to ES: %s", claim_hash.hex()) continue claim = self._fs_get_claim_by_hash(claim_hash) @@ -944,6 +946,11 @@ class LevelDB: stop=Prefixes.touched_or_deleted.pack_key(min_height), include_value=False ) ) + delete_undo_keys.extend( + self.db.iterator( + prefix=Prefixes.trending_spike.pack_partial_key(min_height), include_value=False + ) + ) with self.db.write_batch(transaction=True) as batch: batch_put = batch.put
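
For reference, the spike and decay arithmetic this patch introduces can be sanity-checked outside the hub. The sketch below reproduces the formulas from TrendingSpikePrefixRow.pack_spike and SearchIndex.apply_update_and_decay_trending_score; the helper names (softened_change, spike_mass, decay_factor) are illustrative only and do not exist in the codebase.

def softened_change(amount: int) -> float:
    # fourth root of the amount delta (converted from dewies to LBC),
    # so large stakes contribute sub-linearly to the spike
    return ((amount * 1E-8) + 1E-8) ** 0.25

def spike_mass(amount: int, half_life: float, depth: int = 0, subtract: bool = False) -> float:
    # mirrors pack_spike: a spike that spends an old support (subtract=True)
    # is pre-decayed by `depth` blocks so it cancels what remains of the
    # original spike rather than its full value
    sign = -1.0 if subtract else 1.0
    return sign * softened_change(amount) * 2 * ((2.0 ** (-1 / half_life)) ** depth)

def decay_factor(half_life: float) -> float:
    # per-block multiplier applied by the painless decay script
    return 2 * (2.0 ** (-1 / half_life))

print(spike_mass(500_000_000, half_life=0.9))  # a 5 LBC support -> ~2.99
print(decay_factor(0.9))    # ~0.926 per block (default TRENDING_HALF_LIFE)
print(decay_factor(0.99))   # ~0.993 per block (default TRENDING_WHALE_HALF_LIFE)

These constants map directly onto the new hub settings: TRENDING_HALF_LIFE feeds both pack_spike and the normal decay branch, while TRENDING_WHALE_HALF_LIFE and TRENDING_WHALE_THRESHOLD select the gentler decay for claims with a high effective_amount.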
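
The on-disk layout of the new b't' rows can also be illustrated with plain struct packing. This sketch assumes the PrefixRow base class prepends the one-byte DB prefix to packed keys (consistent with key_part_lambdas starting from the empty key); that detail is not visible in this diff.

import struct

TRENDING_SPIKE_PREFIX = b't'              # DB_PREFIXES.trending_spike
KEY = struct.Struct(b'>L20sLH')           # height, claim_hash, tx_num, position
VALUE = struct.Struct(b'>f')              # spike mass

def pack_item(height: int, claim_hash: bytes, tx_num: int, position: int, mass: float):
    key = TRENDING_SPIKE_PREFIX + KEY.pack(height, claim_hash, tx_num, position)
    return key, VALUE.pack(mass)

key, value = pack_item(1_000_000, bytes(20), 1234, 0, 2.99)
height, claim_hash, tx_num, position = KEY.unpack(key[1:])
assert (height, tx_num, position) == (1_000_000, 1234, 0)

Because height is the leading key field, a prefix scan over (height,) or (height, claim_hash) visits exactly one block's spikes. That is what get_trending_spike_sum relies on when summing a claim's mass for trending_score_change, and why the undo cleanup can drop a block's spikes with pack_partial_key(min_height).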
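
To see how the two update_by_query passes interact block over block, here is a minimal Python model of the painless scripts, assuming trending_score_change holds the per-block spike sum written from get_trending_spike_sum. Whether effective_amount is compared in dewies or LBC depends on how TRENDING_WHALE_THRESHOLD is configured, so the unit below is an assumption.

def advance_block(score: float, change: float, effective_amount: float,
                  half_life: float = 0.9, whale_half_life: float = 0.99,
                  whale_threshold: float = 10000) -> float:
    # pass 1: fold the accumulated spikes into the score
    score += change
    # pass 2: clamp near-zero scores to exactly zero, otherwise decay;
    # "whale" claims (effective_amount >= whale_threshold) get the longer half life
    if -0.000001 <= score <= 0.000001:
        score = 0.0
    elif effective_amount >= whale_threshold:
        score *= 2 * (2.0 ** (-1 / whale_half_life))
    else:
        score *= 2 * (2.0 ** (-1 / half_life))
    return score

score, change = 0.0, 2.99   # one ~5 LBC support spike, then no further activity
for _ in range(10):
    score, change = advance_block(score, change, effective_amount=5.0), 0.0
print(score)                # ~1.38 after ten blocks of decay

Note that the must_not queries in both passes skip documents whose trending_score_change or trending_score is already 0.0, so at steady state neither pass touches anything.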