update trending in elasticsearch
-add TrendingPrefixSpike to leveldb -expose `TRENDING_HALF_LIFE`, `TRENDING_WHALE_HALF_LIFE` and `TRENDING_WHALE_THRESHOLD` hub settings
This commit is contained in:
parent
65c0668d40
commit
34576e880d
7 changed files with 187 additions and 78 deletions
|
@ -24,13 +24,11 @@ from lbry.wallet.server.util import chunks, class_logger
|
||||||
from lbry.crypto.hash import hash160
|
from lbry.crypto.hash import hash160
|
||||||
from lbry.wallet.server.leveldb import FlushData
|
from lbry.wallet.server.leveldb import FlushData
|
||||||
from lbry.wallet.server.mempool import MemPool
|
from lbry.wallet.server.mempool import MemPool
|
||||||
from lbry.wallet.server.db import DB_PREFIXES
|
|
||||||
from lbry.wallet.server.db.claimtrie import StagedClaimtrieItem, StagedClaimtrieSupport
|
from lbry.wallet.server.db.claimtrie import StagedClaimtrieItem, StagedClaimtrieSupport
|
||||||
from lbry.wallet.server.db.claimtrie import get_takeover_name_ops, StagedActivation, get_add_effective_amount_ops
|
from lbry.wallet.server.db.claimtrie import get_takeover_name_ops, StagedActivation, get_add_effective_amount_ops
|
||||||
from lbry.wallet.server.db.claimtrie import get_remove_name_ops, get_remove_effective_amount_ops
|
from lbry.wallet.server.db.claimtrie import get_remove_name_ops, get_remove_effective_amount_ops
|
||||||
from lbry.wallet.server.db.prefixes import ACTIVATED_SUPPORT_TXO_TYPE, ACTIVATED_CLAIM_TXO_TYPE
|
from lbry.wallet.server.db.prefixes import ACTIVATED_SUPPORT_TXO_TYPE, ACTIVATED_CLAIM_TXO_TYPE
|
||||||
from lbry.wallet.server.db.prefixes import PendingActivationKey, PendingActivationValue, Prefixes, ClaimToTXOValue
|
from lbry.wallet.server.db.prefixes import PendingActivationKey, PendingActivationValue, Prefixes, ClaimToTXOValue
|
||||||
from lbry.wallet.server.db.trending import TrendingDB
|
|
||||||
from lbry.wallet.server.udp import StatusServer
|
from lbry.wallet.server.udp import StatusServer
|
||||||
from lbry.wallet.server.db.revertable import RevertableOp, RevertablePut, RevertableDelete, RevertableOpStack
|
from lbry.wallet.server.db.revertable import RevertableOp, RevertablePut, RevertableDelete, RevertableOpStack
|
||||||
if typing.TYPE_CHECKING:
|
if typing.TYPE_CHECKING:
|
||||||
|
@ -264,8 +262,6 @@ class BlockProcessor:
|
||||||
self.claim_channels: Dict[bytes, bytes] = {}
|
self.claim_channels: Dict[bytes, bytes] = {}
|
||||||
self.hashXs_by_tx: DefaultDict[bytes, List[int]] = defaultdict(list)
|
self.hashXs_by_tx: DefaultDict[bytes, List[int]] = defaultdict(list)
|
||||||
|
|
||||||
self.trending_db = TrendingDB(env.db_dir)
|
|
||||||
|
|
||||||
async def claim_producer(self):
|
async def claim_producer(self):
|
||||||
if self.db.db_height <= 1:
|
if self.db.db_height <= 1:
|
||||||
return
|
return
|
||||||
|
@ -314,11 +310,6 @@ class BlockProcessor:
|
||||||
await self.run_in_thread(self.advance_block, block)
|
await self.run_in_thread(self.advance_block, block)
|
||||||
await self.flush()
|
await self.flush()
|
||||||
|
|
||||||
# trending
|
|
||||||
extra_claims = self.trending_db.process_block(self.height,
|
|
||||||
self.daemon.cached_height())
|
|
||||||
self.touched_claims_to_send_es.union(extra_claims)
|
|
||||||
|
|
||||||
self.logger.info("advanced to %i in %0.3fs", self.height, time.perf_counter() - start)
|
self.logger.info("advanced to %i in %0.3fs", self.height, time.perf_counter() - start)
|
||||||
if self.height == self.coin.nExtendedClaimExpirationForkHeight:
|
if self.height == self.coin.nExtendedClaimExpirationForkHeight:
|
||||||
self.logger.warning(
|
self.logger.warning(
|
||||||
|
@ -326,14 +317,15 @@ class BlockProcessor:
|
||||||
)
|
)
|
||||||
await self.run_in_thread(self.db.apply_expiration_extension_fork)
|
await self.run_in_thread(self.db.apply_expiration_extension_fork)
|
||||||
# TODO: we shouldnt wait on the search index updating before advancing to the next block
|
# TODO: we shouldnt wait on the search index updating before advancing to the next block
|
||||||
if not self.db.first_sync:
|
if not self.db.first_sync:
|
||||||
self.db.reload_blocking_filtering_streams()
|
self.db.reload_blocking_filtering_streams()
|
||||||
await self.db.search_index.claim_consumer(self.claim_producer())
|
await self.db.search_index.claim_consumer(self.claim_producer())
|
||||||
await self.db.search_index.apply_filters(self.db.blocked_streams, self.db.blocked_channels,
|
await self.db.search_index.apply_filters(self.db.blocked_streams, self.db.blocked_channels,
|
||||||
self.db.filtered_streams, self.db.filtered_channels)
|
self.db.filtered_streams, self.db.filtered_channels)
|
||||||
self.db.search_index.clear_caches()
|
await self.db.search_index.apply_update_and_decay_trending_score()
|
||||||
self.touched_claims_to_send_es.clear()
|
self.db.search_index.clear_caches()
|
||||||
self.removed_claims_to_send_es.clear()
|
self.touched_claims_to_send_es.clear()
|
||||||
|
self.removed_claims_to_send_es.clear()
|
||||||
# print("******************\n")
|
# print("******************\n")
|
||||||
except:
|
except:
|
||||||
self.logger.exception("advance blocks failed")
|
self.logger.exception("advance blocks failed")
|
||||||
|
@ -368,16 +360,15 @@ class BlockProcessor:
|
||||||
await self.flush()
|
await self.flush()
|
||||||
self.logger.info(f'backed up to height {self.height:,d}')
|
self.logger.info(f'backed up to height {self.height:,d}')
|
||||||
|
|
||||||
await self.db._read_claim_txos() # TODO: don't do this
|
await self.db._read_claim_txos() # TODO: don't do this
|
||||||
|
for touched in self.touched_claims_to_send_es:
|
||||||
for touched in self.touched_claims_to_send_es:
|
if not self.db.get_claim_txo(touched):
|
||||||
if not self.db.get_claim_txo(touched):
|
self.removed_claims_to_send_es.add(touched)
|
||||||
self.removed_claims_to_send_es.add(touched)
|
self.touched_claims_to_send_es.difference_update(self.removed_claims_to_send_es)
|
||||||
self.touched_claims_to_send_es.difference_update(self.removed_claims_to_send_es)
|
await self.db.search_index.claim_consumer(self.claim_producer())
|
||||||
await self.db.search_index.claim_consumer(self.claim_producer())
|
self.db.search_index.clear_caches()
|
||||||
self.db.search_index.clear_caches()
|
self.touched_claims_to_send_es.clear()
|
||||||
self.touched_claims_to_send_es.clear()
|
self.removed_claims_to_send_es.clear()
|
||||||
self.removed_claims_to_send_es.clear()
|
|
||||||
await self.prefetcher.reset_height(self.height)
|
await self.prefetcher.reset_height(self.height)
|
||||||
self.reorg_count_metric.inc()
|
self.reorg_count_metric.inc()
|
||||||
except:
|
except:
|
||||||
|
@ -485,6 +476,7 @@ class BlockProcessor:
|
||||||
|
|
||||||
if txo.script.is_claim_name: # it's a root claim
|
if txo.script.is_claim_name: # it's a root claim
|
||||||
root_tx_num, root_idx = tx_num, nout
|
root_tx_num, root_idx = tx_num, nout
|
||||||
|
previous_amount = 0
|
||||||
else: # it's a claim update
|
else: # it's a claim update
|
||||||
if claim_hash not in spent_claims:
|
if claim_hash not in spent_claims:
|
||||||
# print(f"\tthis is a wonky tx, contains unlinked claim update {claim_hash.hex()}")
|
# print(f"\tthis is a wonky tx, contains unlinked claim update {claim_hash.hex()}")
|
||||||
|
@ -511,6 +503,7 @@ class BlockProcessor:
|
||||||
previous_claim.amount
|
previous_claim.amount
|
||||||
).get_remove_activate_ops()
|
).get_remove_activate_ops()
|
||||||
)
|
)
|
||||||
|
previous_amount = previous_claim.amount
|
||||||
|
|
||||||
self.db.claim_to_txo[claim_hash] = ClaimToTXOValue(
|
self.db.claim_to_txo[claim_hash] = ClaimToTXOValue(
|
||||||
tx_num, nout, root_tx_num, root_idx, txo.amount, channel_signature_is_valid, claim_name
|
tx_num, nout, root_tx_num, root_idx, txo.amount, channel_signature_is_valid, claim_name
|
||||||
|
@ -524,11 +517,13 @@ class BlockProcessor:
|
||||||
self.txo_to_claim[(tx_num, nout)] = pending
|
self.txo_to_claim[(tx_num, nout)] = pending
|
||||||
self.claim_hash_to_txo[claim_hash] = (tx_num, nout)
|
self.claim_hash_to_txo[claim_hash] = (tx_num, nout)
|
||||||
self.db_op_stack.extend_ops(pending.get_add_claim_utxo_ops())
|
self.db_op_stack.extend_ops(pending.get_add_claim_utxo_ops())
|
||||||
self.trending_db.add_event({"claim_hash": claim_hash,
|
|
||||||
"event": "upsert",
|
|
||||||
"lbc": 1E-8*txo.amount})
|
|
||||||
|
|
||||||
def _add_support(self, txo: 'Output', tx_num: int, nout: int):
|
# add the spike for trending
|
||||||
|
self.db_op_stack.append_op(self.db.prefix_db.trending_spike.pack_spike(
|
||||||
|
height, claim_hash, tx_num, nout, txo.amount - previous_amount, half_life=self.env.trending_half_life
|
||||||
|
))
|
||||||
|
|
||||||
|
def _add_support(self, height: int, txo: 'Output', tx_num: int, nout: int):
|
||||||
supported_claim_hash = txo.claim_hash[::-1]
|
supported_claim_hash = txo.claim_hash[::-1]
|
||||||
self.support_txos_by_claim[supported_claim_hash].append((tx_num, nout))
|
self.support_txos_by_claim[supported_claim_hash].append((tx_num, nout))
|
||||||
self.support_txo_to_claim[(tx_num, nout)] = supported_claim_hash, txo.amount
|
self.support_txo_to_claim[(tx_num, nout)] = supported_claim_hash, txo.amount
|
||||||
|
@ -536,49 +531,54 @@ class BlockProcessor:
|
||||||
self.db_op_stack.extend_ops(StagedClaimtrieSupport(
|
self.db_op_stack.extend_ops(StagedClaimtrieSupport(
|
||||||
supported_claim_hash, tx_num, nout, txo.amount
|
supported_claim_hash, tx_num, nout, txo.amount
|
||||||
).get_add_support_utxo_ops())
|
).get_add_support_utxo_ops())
|
||||||
self.trending_db.add_event({"claim_hash": supported_claim_hash,
|
|
||||||
"event": "support",
|
# add the spike for trending
|
||||||
"lbc": 1E-8*txo.amount})
|
self.db_op_stack.append_op(self.db.prefix_db.trending_spike.pack_spike(
|
||||||
|
height, supported_claim_hash, tx_num, nout, txo.amount, half_life=self.env.trending_half_life
|
||||||
|
))
|
||||||
|
|
||||||
def _add_claim_or_support(self, height: int, tx_hash: bytes, tx_num: int, nout: int, txo: 'Output',
|
def _add_claim_or_support(self, height: int, tx_hash: bytes, tx_num: int, nout: int, txo: 'Output',
|
||||||
spent_claims: typing.Dict[bytes, Tuple[int, int, str]]):
|
spent_claims: typing.Dict[bytes, Tuple[int, int, str]]):
|
||||||
if txo.script.is_claim_name or txo.script.is_update_claim:
|
if txo.script.is_claim_name or txo.script.is_update_claim:
|
||||||
self._add_claim_or_update(height, txo, tx_hash, tx_num, nout, spent_claims)
|
self._add_claim_or_update(height, txo, tx_hash, tx_num, nout, spent_claims)
|
||||||
elif txo.script.is_support_claim or txo.script.is_support_claim_data:
|
elif txo.script.is_support_claim or txo.script.is_support_claim_data:
|
||||||
self._add_support(txo, tx_num, nout)
|
self._add_support(height, txo, tx_num, nout)
|
||||||
|
|
||||||
def _spend_support_txo(self, txin):
|
def _spend_support_txo(self, height: int, txin: TxInput):
|
||||||
txin_num = self.db.transaction_num_mapping[txin.prev_hash]
|
txin_num = self.db.transaction_num_mapping[txin.prev_hash]
|
||||||
|
activation = 0
|
||||||
if (txin_num, txin.prev_idx) in self.support_txo_to_claim:
|
if (txin_num, txin.prev_idx) in self.support_txo_to_claim:
|
||||||
spent_support, support_amount = self.support_txo_to_claim.pop((txin_num, txin.prev_idx))
|
spent_support, support_amount = self.support_txo_to_claim.pop((txin_num, txin.prev_idx))
|
||||||
self.support_txos_by_claim[spent_support].remove((txin_num, txin.prev_idx))
|
self.support_txos_by_claim[spent_support].remove((txin_num, txin.prev_idx))
|
||||||
supported_name = self._get_pending_claim_name(spent_support)
|
supported_name = self._get_pending_claim_name(spent_support)
|
||||||
# print(f"\tspent support for {spent_support.hex()}")
|
|
||||||
self.removed_support_txos_by_name_by_claim[supported_name][spent_support].append((txin_num, txin.prev_idx))
|
self.removed_support_txos_by_name_by_claim[supported_name][spent_support].append((txin_num, txin.prev_idx))
|
||||||
self.db_op_stack.extend_ops(StagedClaimtrieSupport(
|
txin_height = height
|
||||||
spent_support, txin_num, txin.prev_idx, support_amount
|
else:
|
||||||
).get_spend_support_txo_ops())
|
spent_support, support_amount = self.db.get_supported_claim_from_txo(txin_num, txin.prev_idx)
|
||||||
self.trending_db.add_event({"claim_hash": spent_support,
|
if not spent_support: # it is not a support
|
||||||
"event": "support",
|
return
|
||||||
"lbc": -1E-8*support_amount})
|
|
||||||
|
|
||||||
spent_support, support_amount = self.db.get_supported_claim_from_txo(txin_num, txin.prev_idx)
|
|
||||||
if spent_support:
|
|
||||||
supported_name = self._get_pending_claim_name(spent_support)
|
supported_name = self._get_pending_claim_name(spent_support)
|
||||||
if supported_name is not None:
|
if supported_name is not None:
|
||||||
self.removed_support_txos_by_name_by_claim[supported_name][spent_support].append((txin_num, txin.prev_idx))
|
self.removed_support_txos_by_name_by_claim[supported_name][spent_support].append(
|
||||||
|
(txin_num, txin.prev_idx))
|
||||||
activation = self.db.get_activation(txin_num, txin.prev_idx, is_support=True)
|
activation = self.db.get_activation(txin_num, txin.prev_idx, is_support=True)
|
||||||
|
txin_height = bisect_right(self.db.tx_counts, self.db.transaction_num_mapping[txin.prev_hash])
|
||||||
if 0 < activation < self.height + 1:
|
if 0 < activation < self.height + 1:
|
||||||
self.removed_active_support_amount_by_claim[spent_support].append(support_amount)
|
self.removed_active_support_amount_by_claim[spent_support].append(support_amount)
|
||||||
# print(f"\tspent support for {spent_support.hex()} activation:{activation} {support_amount}")
|
|
||||||
self.db_op_stack.extend_ops(StagedClaimtrieSupport(
|
|
||||||
spent_support, txin_num, txin.prev_idx, support_amount
|
|
||||||
).get_spend_support_txo_ops())
|
|
||||||
if supported_name is not None and activation > 0:
|
if supported_name is not None and activation > 0:
|
||||||
self.db_op_stack.extend_ops(StagedActivation(
|
self.db_op_stack.extend_ops(StagedActivation(
|
||||||
ACTIVATED_SUPPORT_TXO_TYPE, spent_support, txin_num, txin.prev_idx, activation, supported_name,
|
ACTIVATED_SUPPORT_TXO_TYPE, spent_support, txin_num, txin.prev_idx, activation, supported_name,
|
||||||
support_amount
|
support_amount
|
||||||
).get_remove_activate_ops())
|
).get_remove_activate_ops())
|
||||||
|
# print(f"\tspent support for {spent_support.hex()} activation:{activation} {support_amount}")
|
||||||
|
self.db_op_stack.extend_ops(StagedClaimtrieSupport(
|
||||||
|
spent_support, txin_num, txin.prev_idx, support_amount
|
||||||
|
).get_spend_support_txo_ops())
|
||||||
|
# add the spike for trending
|
||||||
|
self.db_op_stack.append_op(self.db.prefix_db.trending_spike.pack_spike(
|
||||||
|
height, spent_support, txin_num, txin.prev_idx, support_amount, subtract=True,
|
||||||
|
depth=height-txin_height-1, half_life=self.env.trending_half_life
|
||||||
|
))
|
||||||
|
|
||||||
def _spend_claim_txo(self, txin: TxInput, spent_claims: Dict[bytes, Tuple[int, int, str]]) -> bool:
|
def _spend_claim_txo(self, txin: TxInput, spent_claims: Dict[bytes, Tuple[int, int, str]]) -> bool:
|
||||||
txin_num = self.db.transaction_num_mapping[txin.prev_hash]
|
txin_num = self.db.transaction_num_mapping[txin.prev_hash]
|
||||||
|
@ -602,9 +602,9 @@ class BlockProcessor:
|
||||||
self.db_op_stack.extend_ops(spent.get_spend_claim_txo_ops())
|
self.db_op_stack.extend_ops(spent.get_spend_claim_txo_ops())
|
||||||
return True
|
return True
|
||||||
|
|
||||||
def _spend_claim_or_support_txo(self, txin, spent_claims):
|
def _spend_claim_or_support_txo(self, height: int, txin: TxInput, spent_claims):
|
||||||
if not self._spend_claim_txo(txin, spent_claims):
|
if not self._spend_claim_txo(txin, spent_claims):
|
||||||
self._spend_support_txo(txin)
|
self._spend_support_txo(height, txin)
|
||||||
|
|
||||||
def _abandon_claim(self, claim_hash: bytes, tx_num: int, nout: int, normalized_name: str):
|
def _abandon_claim(self, claim_hash: bytes, tx_num: int, nout: int, normalized_name: str):
|
||||||
if (tx_num, nout) in self.txo_to_claim:
|
if (tx_num, nout) in self.txo_to_claim:
|
||||||
|
@ -639,9 +639,6 @@ class BlockProcessor:
|
||||||
if normalized_name.startswith('@'): # abandon a channel, invalidate signatures
|
if normalized_name.startswith('@'): # abandon a channel, invalidate signatures
|
||||||
self._invalidate_channel_signatures(claim_hash)
|
self._invalidate_channel_signatures(claim_hash)
|
||||||
|
|
||||||
self.trending_db.add_event({"claim_hash": claim_hash,
|
|
||||||
"event": "delete"})
|
|
||||||
|
|
||||||
def _invalidate_channel_signatures(self, claim_hash: bytes):
|
def _invalidate_channel_signatures(self, claim_hash: bytes):
|
||||||
for k, signed_claim_hash in self.db.db.iterator(
|
for k, signed_claim_hash in self.db.db.iterator(
|
||||||
prefix=Prefixes.channel_to_claim.pack_partial_key(claim_hash)):
|
prefix=Prefixes.channel_to_claim.pack_partial_key(claim_hash)):
|
||||||
|
@ -1216,7 +1213,7 @@ class BlockProcessor:
|
||||||
if tx_count not in self.hashXs_by_tx[hashX]:
|
if tx_count not in self.hashXs_by_tx[hashX]:
|
||||||
self.hashXs_by_tx[hashX].append(tx_count)
|
self.hashXs_by_tx[hashX].append(tx_count)
|
||||||
# spend claim/support txo
|
# spend claim/support txo
|
||||||
spend_claim_or_support_txo(txin, spent_claims)
|
spend_claim_or_support_txo(height, txin, spent_claims)
|
||||||
|
|
||||||
# Add the new UTXOs
|
# Add the new UTXOs
|
||||||
for nout, txout in enumerate(tx.outputs):
|
for nout, txout in enumerate(tx.outputs):
|
||||||
|
|
|
@ -37,3 +37,4 @@ class DB_PREFIXES(enum.Enum):
|
||||||
hashx_utxo = b'h'
|
hashx_utxo = b'h'
|
||||||
hashx_history = b'x'
|
hashx_history = b'x'
|
||||||
db_state = b's'
|
db_state = b's'
|
||||||
|
trending_spike = b't'
|
||||||
|
|
|
@ -31,7 +31,8 @@ INDEX_DEFAULT_SETTINGS = {
|
||||||
"claim_type": {"type": "byte"},
|
"claim_type": {"type": "byte"},
|
||||||
"censor_type": {"type": "byte"},
|
"censor_type": {"type": "byte"},
|
||||||
"trending_score": {"type": "float"},
|
"trending_score": {"type": "float"},
|
||||||
"release_time": {"type": "long"},
|
"trending_score_change": {"type": "float"},
|
||||||
|
"release_time": {"type": "long"}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -53,7 +54,7 @@ FIELDS = {
|
||||||
'duration', 'release_time',
|
'duration', 'release_time',
|
||||||
'tags', 'languages', 'has_source', 'reposted_claim_type',
|
'tags', 'languages', 'has_source', 'reposted_claim_type',
|
||||||
'reposted_claim_id', 'repost_count',
|
'reposted_claim_id', 'repost_count',
|
||||||
'trending_score', 'tx_num'
|
'trending_score', 'tx_num', 'trending_score_change'
|
||||||
}
|
}
|
||||||
|
|
||||||
TEXT_FIELDS = {'author', 'canonical_url', 'channel_id', 'description', 'claim_id', 'censoring_channel_id',
|
TEXT_FIELDS = {'author', 'canonical_url', 'channel_id', 'description', 'claim_id', 'censoring_channel_id',
|
||||||
|
@ -66,7 +67,7 @@ RANGE_FIELDS = {
|
||||||
'timestamp', 'creation_timestamp', 'duration', 'release_time', 'fee_amount',
|
'timestamp', 'creation_timestamp', 'duration', 'release_time', 'fee_amount',
|
||||||
'tx_position', 'channel_join', 'repost_count', 'limit_claims_per_channel',
|
'tx_position', 'channel_join', 'repost_count', 'limit_claims_per_channel',
|
||||||
'amount', 'effective_amount', 'support_amount',
|
'amount', 'effective_amount', 'support_amount',
|
||||||
'trending_score', 'censor_type', 'tx_num'
|
'trending_score', 'censor_type', 'tx_num', 'trending_score_change'
|
||||||
}
|
}
|
||||||
|
|
||||||
ALL_FIELDS = RANGE_FIELDS | TEXT_FIELDS | FIELDS
|
ALL_FIELDS = RANGE_FIELDS | TEXT_FIELDS | FIELDS
|
||||||
|
|
|
@ -1,3 +1,4 @@
|
||||||
|
import math
|
||||||
import asyncio
|
import asyncio
|
||||||
import struct
|
import struct
|
||||||
from binascii import unhexlify
|
from binascii import unhexlify
|
||||||
|
@ -9,7 +10,6 @@ from typing import Optional, List, Iterable, Union
|
||||||
from elasticsearch import AsyncElasticsearch, NotFoundError, ConnectionError
|
from elasticsearch import AsyncElasticsearch, NotFoundError, ConnectionError
|
||||||
from elasticsearch.helpers import async_streaming_bulk
|
from elasticsearch.helpers import async_streaming_bulk
|
||||||
|
|
||||||
from lbry.crypto.base58 import Base58
|
|
||||||
from lbry.error import ResolveCensoredError, TooManyClaimSearchParametersError
|
from lbry.error import ResolveCensoredError, TooManyClaimSearchParametersError
|
||||||
from lbry.schema.result import Outputs, Censor
|
from lbry.schema.result import Outputs, Censor
|
||||||
from lbry.schema.tags import clean_tags
|
from lbry.schema.tags import clean_tags
|
||||||
|
@ -43,7 +43,8 @@ class IndexVersionMismatch(Exception):
|
||||||
class SearchIndex:
|
class SearchIndex:
|
||||||
VERSION = 1
|
VERSION = 1
|
||||||
|
|
||||||
def __init__(self, index_prefix: str, search_timeout=3.0, elastic_host='localhost', elastic_port=9200):
|
def __init__(self, index_prefix: str, search_timeout=3.0, elastic_host='localhost', elastic_port=9200,
|
||||||
|
half_life=0.4, whale_threshold=10000, whale_half_life=0.99):
|
||||||
self.search_timeout = search_timeout
|
self.search_timeout = search_timeout
|
||||||
self.sync_timeout = 600 # wont hit that 99% of the time, but can hit on a fresh import
|
self.sync_timeout = 600 # wont hit that 99% of the time, but can hit on a fresh import
|
||||||
self.search_client: Optional[AsyncElasticsearch] = None
|
self.search_client: Optional[AsyncElasticsearch] = None
|
||||||
|
@ -56,6 +57,9 @@ class SearchIndex:
|
||||||
self.resolution_cache = LRUCache(2 ** 17)
|
self.resolution_cache = LRUCache(2 ** 17)
|
||||||
self._elastic_host = elastic_host
|
self._elastic_host = elastic_host
|
||||||
self._elastic_port = elastic_port
|
self._elastic_port = elastic_port
|
||||||
|
self._trending_half_life = half_life
|
||||||
|
self._trending_whale_threshold = whale_threshold
|
||||||
|
self._trending_whale_half_life = whale_half_life
|
||||||
|
|
||||||
async def get_index_version(self) -> int:
|
async def get_index_version(self) -> int:
|
||||||
try:
|
try:
|
||||||
|
@ -121,7 +125,7 @@ class SearchIndex:
|
||||||
}
|
}
|
||||||
count += 1
|
count += 1
|
||||||
if count % 100 == 0:
|
if count % 100 == 0:
|
||||||
self.logger.debug("Indexing in progress, %d claims.", count)
|
self.logger.info("Indexing in progress, %d claims.", count)
|
||||||
if count:
|
if count:
|
||||||
self.logger.info("Indexing done for %d claims.", count)
|
self.logger.info("Indexing done for %d claims.", count)
|
||||||
else:
|
else:
|
||||||
|
@ -140,19 +144,57 @@ class SearchIndex:
|
||||||
self.logger.debug("Indexing done.")
|
self.logger.debug("Indexing done.")
|
||||||
|
|
||||||
def update_filter_query(self, censor_type, blockdict, channels=False):
|
def update_filter_query(self, censor_type, blockdict, channels=False):
|
||||||
blockdict = {key.hex(): value.hex() for key, value in blockdict.items()}
|
blockdict = {blocked.hex(): blocker.hex() for blocked, blocker in blockdict.items()}
|
||||||
if channels:
|
if channels:
|
||||||
update = expand_query(channel_id__in=list(blockdict.keys()), censor_type=f"<{censor_type}")
|
update = expand_query(channel_id__in=list(blockdict.keys()), censor_type=f"<{censor_type}")
|
||||||
else:
|
else:
|
||||||
update = expand_query(claim_id__in=list(blockdict.keys()), censor_type=f"<{censor_type}")
|
update = expand_query(claim_id__in=list(blockdict.keys()), censor_type=f"<{censor_type}")
|
||||||
key = 'channel_id' if channels else 'claim_id'
|
key = 'channel_id' if channels else 'claim_id'
|
||||||
update['script'] = {
|
update['script'] = {
|
||||||
"source": f"ctx._source.censor_type={censor_type}; ctx._source.censoring_channel_id=params[ctx._source.{key}]",
|
"source": f"ctx._source.censor_type={censor_type}; "
|
||||||
|
f"ctx._source.censoring_channel_id=params[ctx._source.{key}];",
|
||||||
"lang": "painless",
|
"lang": "painless",
|
||||||
"params": blockdict
|
"params": blockdict
|
||||||
}
|
}
|
||||||
return update
|
return update
|
||||||
|
|
||||||
|
async def apply_update_and_decay_trending_score(self):
|
||||||
|
update_trending_score_script = """
|
||||||
|
if (ctx._source.trending_score == null) {
|
||||||
|
ctx._source.trending_score = ctx._source.trending_score_change;
|
||||||
|
} else {
|
||||||
|
ctx._source.trending_score += ctx._source.trending_score_change;
|
||||||
|
}
|
||||||
|
ctx._source.trending_score_change = 0.0;
|
||||||
|
"""
|
||||||
|
await self.sync_client.update_by_query(
|
||||||
|
self.index, body={
|
||||||
|
'query': {
|
||||||
|
'bool': {'must_not': [{'match': {'trending_score_change': 0.0}}]}
|
||||||
|
},
|
||||||
|
'script': {'source': update_trending_score_script, 'lang': 'painless'}
|
||||||
|
}, slices=4, conflicts='proceed'
|
||||||
|
)
|
||||||
|
|
||||||
|
whale_decay_factor = 2 * (2.0 ** (-1 / self._trending_whale_half_life))
|
||||||
|
decay_factor = 2 * (2.0 ** (-1 / self._trending_half_life))
|
||||||
|
decay_script = """
|
||||||
|
if (ctx._source.trending_score == null) { ctx._source.trending_score = 0.0; }
|
||||||
|
if ((-0.000001 <= ctx._source.trending_score) && (ctx._source.trending_score <= 0.000001)) {
|
||||||
|
ctx._source.trending_score = 0.0;
|
||||||
|
} else if (ctx._source.effective_amount >= %s) {
|
||||||
|
ctx._source.trending_score *= %s;
|
||||||
|
} else {
|
||||||
|
ctx._source.trending_score *= %s;
|
||||||
|
}
|
||||||
|
""" % (self._trending_whale_threshold, whale_decay_factor, decay_factor)
|
||||||
|
await self.sync_client.update_by_query(
|
||||||
|
self.index, body={
|
||||||
|
'query': {'bool': {'must_not': [{'match': {'trending_score': 0.0}}]}},
|
||||||
|
'script': {'source': decay_script, 'lang': 'painless'}
|
||||||
|
}, slices=4, conflicts='proceed'
|
||||||
|
)
|
||||||
|
|
||||||
async def apply_filters(self, blocked_streams, blocked_channels, filtered_streams, filtered_channels):
|
async def apply_filters(self, blocked_streams, blocked_channels, filtered_streams, filtered_channels):
|
||||||
if filtered_streams:
|
if filtered_streams:
|
||||||
await self.sync_client.update_by_query(
|
await self.sync_client.update_by_query(
|
||||||
|
|
|
@ -463,6 +463,21 @@ class TouchedOrDeletedClaimValue(typing.NamedTuple):
|
||||||
f"deleted_claims={','.join(map(lambda x: x.hex(), self.deleted_claims))})"
|
f"deleted_claims={','.join(map(lambda x: x.hex(), self.deleted_claims))})"
|
||||||
|
|
||||||
|
|
||||||
|
class TrendingSpikeKey(typing.NamedTuple):
|
||||||
|
height: int
|
||||||
|
claim_hash: bytes
|
||||||
|
tx_num: int
|
||||||
|
position: int
|
||||||
|
|
||||||
|
def __str__(self):
|
||||||
|
return f"{self.__class__.__name__}(height={self.height}, claim_hash={self.claim_hash.hex()}, " \
|
||||||
|
f"tx_num={self.tx_num}, position={self.position})"
|
||||||
|
|
||||||
|
|
||||||
|
class TrendingSpikeValue(typing.NamedTuple):
|
||||||
|
mass: float
|
||||||
|
|
||||||
|
|
||||||
class ActiveAmountPrefixRow(PrefixRow):
|
class ActiveAmountPrefixRow(PrefixRow):
|
||||||
prefix = DB_PREFIXES.active_amount.value
|
prefix = DB_PREFIXES.active_amount.value
|
||||||
key_struct = struct.Struct(b'>20sBLLH')
|
key_struct = struct.Struct(b'>20sBLLH')
|
||||||
|
@ -1335,6 +1350,47 @@ class TouchedOrDeletedPrefixRow(PrefixRow):
|
||||||
return cls.pack_key(height), cls.pack_value(touched, deleted)
|
return cls.pack_key(height), cls.pack_value(touched, deleted)
|
||||||
|
|
||||||
|
|
||||||
|
class TrendingSpikePrefixRow(PrefixRow):
|
||||||
|
prefix = DB_PREFIXES.trending_spike.value
|
||||||
|
key_struct = struct.Struct(b'>L20sLH')
|
||||||
|
value_struct = struct.Struct(b'>f')
|
||||||
|
|
||||||
|
key_part_lambdas = [
|
||||||
|
lambda: b'',
|
||||||
|
struct.Struct(b'>L').pack,
|
||||||
|
struct.Struct(b'>L20s').pack,
|
||||||
|
struct.Struct(b'>L20sL').pack,
|
||||||
|
struct.Struct(b'>L20sLH').pack
|
||||||
|
]
|
||||||
|
|
||||||
|
def pack_spike(self, height: int, claim_hash: bytes, tx_num: int, position: int, amount: int, half_life: int,
|
||||||
|
depth: int = 0, subtract: bool = False) -> RevertablePut:
|
||||||
|
softened_change = (((amount * 1E-8) + 1E-8) ** 0.25).real
|
||||||
|
spike_mass = (-1.0 if subtract else 1.0) * softened_change * 2 * ((2.0 ** (-1 / half_life)) ** depth)
|
||||||
|
# trending_spike_height = self.height + delay_trending_spike(self.amount * 1E-8)
|
||||||
|
return RevertablePut(*self.pack_item(height, claim_hash, tx_num, position, spike_mass))
|
||||||
|
|
||||||
|
@classmethod
|
||||||
|
def pack_key(cls, height: int, claim_hash: bytes, tx_num: int, position: int):
|
||||||
|
return super().pack_key(height, claim_hash, tx_num, position)
|
||||||
|
|
||||||
|
@classmethod
|
||||||
|
def unpack_key(cls, key: bytes) -> TrendingSpikeKey:
|
||||||
|
return TrendingSpikeKey(*super().unpack_key(key))
|
||||||
|
|
||||||
|
@classmethod
|
||||||
|
def pack_value(cls, mass: float) -> bytes:
|
||||||
|
return super().pack_value(mass)
|
||||||
|
|
||||||
|
@classmethod
|
||||||
|
def unpack_value(cls, data: bytes) -> TrendingSpikeValue:
|
||||||
|
return TrendingSpikeValue(*cls.value_struct.unpack(data))
|
||||||
|
|
||||||
|
@classmethod
|
||||||
|
def pack_item(cls, height: int, claim_hash: bytes, tx_num: int, position: int, mass: float):
|
||||||
|
return cls.pack_key(height, claim_hash, tx_num, position), cls.pack_value(mass)
|
||||||
|
|
||||||
|
|
||||||
class Prefixes:
|
class Prefixes:
|
||||||
claim_to_support = ClaimToSupportPrefixRow
|
claim_to_support = ClaimToSupportPrefixRow
|
||||||
support_to_claim = SupportToClaimPrefixRow
|
support_to_claim = SupportToClaimPrefixRow
|
||||||
|
@ -1369,6 +1425,7 @@ class Prefixes:
|
||||||
tx = TXPrefixRow
|
tx = TXPrefixRow
|
||||||
header = BlockHeaderPrefixRow
|
header = BlockHeaderPrefixRow
|
||||||
touched_or_deleted = TouchedOrDeletedPrefixRow
|
touched_or_deleted = TouchedOrDeletedPrefixRow
|
||||||
|
trending_spike = TrendingSpikePrefixRow
|
||||||
|
|
||||||
|
|
||||||
class PrefixDB:
|
class PrefixDB:
|
||||||
|
@ -1402,6 +1459,7 @@ class PrefixDB:
|
||||||
self.tx = TXPrefixRow(db, op_stack)
|
self.tx = TXPrefixRow(db, op_stack)
|
||||||
self.header = BlockHeaderPrefixRow(db, op_stack)
|
self.header = BlockHeaderPrefixRow(db, op_stack)
|
||||||
self.touched_or_deleted = TouchedOrDeletedPrefixRow(db, op_stack)
|
self.touched_or_deleted = TouchedOrDeletedPrefixRow(db, op_stack)
|
||||||
|
self.trending_spike = TrendingSpikePrefixRow(db, op_stack)
|
||||||
|
|
||||||
def commit(self):
|
def commit(self):
|
||||||
try:
|
try:
|
||||||
|
|
|
@ -42,6 +42,9 @@ class Env:
|
||||||
self.trending_algorithms = [
|
self.trending_algorithms = [
|
||||||
trending for trending in set(self.default('TRENDING_ALGORITHMS', 'zscore').split(' ')) if trending
|
trending for trending in set(self.default('TRENDING_ALGORITHMS', 'zscore').split(' ')) if trending
|
||||||
]
|
]
|
||||||
|
self.trending_half_life = float(self.string_amount('TRENDING_HALF_LIFE', "0.9"))
|
||||||
|
self.trending_whale_half_life = float(self.string_amount('TRENDING_WHALE_HALF_LIFE', "0.99"))
|
||||||
|
self.trending_whale_threshold = float(self.integer('TRENDING_WHALE_THRESHOLD', 10000))
|
||||||
self.max_query_workers = self.integer('MAX_QUERY_WORKERS', None)
|
self.max_query_workers = self.integer('MAX_QUERY_WORKERS', None)
|
||||||
self.individual_tag_indexes = self.boolean('INDIVIDUAL_TAG_INDEXES', True)
|
self.individual_tag_indexes = self.boolean('INDIVIDUAL_TAG_INDEXES', True)
|
||||||
self.track_metrics = self.boolean('TRACK_METRICS', False)
|
self.track_metrics = self.boolean('TRACK_METRICS', False)
|
||||||
|
|
|
@ -22,7 +22,7 @@ from typing import Optional, Iterable, Tuple, DefaultDict, Set, Dict, List
|
||||||
from functools import partial
|
from functools import partial
|
||||||
from asyncio import sleep
|
from asyncio import sleep
|
||||||
from bisect import bisect_right
|
from bisect import bisect_right
|
||||||
from collections import defaultdict, OrderedDict
|
from collections import defaultdict
|
||||||
|
|
||||||
from lbry.error import ResolveCensoredError
|
from lbry.error import ResolveCensoredError
|
||||||
from lbry.schema.result import Censor
|
from lbry.schema.result import Censor
|
||||||
|
@ -38,7 +38,6 @@ from lbry.wallet.server.db.revertable import RevertableOpStack
|
||||||
from lbry.wallet.server.db.prefixes import Prefixes, PendingActivationValue, ClaimTakeoverValue, ClaimToTXOValue, PrefixDB
|
from lbry.wallet.server.db.prefixes import Prefixes, PendingActivationValue, ClaimTakeoverValue, ClaimToTXOValue, PrefixDB
|
||||||
from lbry.wallet.server.db.prefixes import ACTIVATED_CLAIM_TXO_TYPE, ACTIVATED_SUPPORT_TXO_TYPE
|
from lbry.wallet.server.db.prefixes import ACTIVATED_CLAIM_TXO_TYPE, ACTIVATED_SUPPORT_TXO_TYPE
|
||||||
from lbry.wallet.server.db.prefixes import PendingActivationKey, TXOToClaimValue
|
from lbry.wallet.server.db.prefixes import PendingActivationKey, TXOToClaimValue
|
||||||
from lbry.wallet.server.db.trending import TrendingDB
|
|
||||||
from lbry.wallet.transaction import OutputScript
|
from lbry.wallet.transaction import OutputScript
|
||||||
from lbry.schema.claim import Claim, guess_stream_type
|
from lbry.schema.claim import Claim, guess_stream_type
|
||||||
from lbry.wallet.ledger import Ledger, RegTestLedger, TestNetLedger
|
from lbry.wallet.ledger import Ledger, RegTestLedger, TestNetLedger
|
||||||
|
@ -150,13 +149,15 @@ class LevelDB:
|
||||||
self.total_transactions = None
|
self.total_transactions = None
|
||||||
self.transaction_num_mapping = {}
|
self.transaction_num_mapping = {}
|
||||||
|
|
||||||
self.claim_to_txo: typing.OrderedDict[bytes, ClaimToTXOValue] = OrderedDict()
|
self.claim_to_txo: Dict[bytes, ClaimToTXOValue] = {}
|
||||||
self.txo_to_claim: typing.OrderedDict[Tuple[int, int], bytes] = OrderedDict()
|
self.txo_to_claim: Dict[Tuple[int, int], bytes] = {}
|
||||||
|
|
||||||
# Search index
|
# Search index
|
||||||
self.search_index = SearchIndex(
|
self.search_index = SearchIndex(
|
||||||
self.env.es_index_prefix, self.env.database_query_timeout,
|
self.env.es_index_prefix, self.env.database_query_timeout,
|
||||||
elastic_host=env.elastic_host, elastic_port=env.elastic_port
|
elastic_host=env.elastic_host, elastic_port=env.elastic_port,
|
||||||
|
half_life=self.env.trending_half_life, whale_threshold=self.env.trending_whale_threshold,
|
||||||
|
whale_half_life=self.env.trending_whale_half_life
|
||||||
)
|
)
|
||||||
|
|
||||||
self.genesis_bytes = bytes.fromhex(self.coin.GENESIS_HASH)
|
self.genesis_bytes = bytes.fromhex(self.coin.GENESIS_HASH)
|
||||||
|
@ -168,8 +169,6 @@ class LevelDB:
|
||||||
else:
|
else:
|
||||||
self.ledger = RegTestLedger
|
self.ledger = RegTestLedger
|
||||||
|
|
||||||
self.trending_db = TrendingDB(self.env.db_dir)
|
|
||||||
|
|
||||||
def get_claim_from_txo(self, tx_num: int, tx_idx: int) -> Optional[TXOToClaimValue]:
|
def get_claim_from_txo(self, tx_num: int, tx_idx: int) -> Optional[TXOToClaimValue]:
|
||||||
claim_hash_and_name = self.db.get(Prefixes.txo_to_claim.pack_key(tx_num, tx_idx))
|
claim_hash_and_name = self.db.get(Prefixes.txo_to_claim.pack_key(tx_num, tx_idx))
|
||||||
if not claim_hash_and_name:
|
if not claim_hash_and_name:
|
||||||
|
@ -188,6 +187,12 @@ class LevelDB:
|
||||||
cnt += 1
|
cnt += 1
|
||||||
return cnt
|
return cnt
|
||||||
|
|
||||||
|
def get_trending_spike_sum(self, height: int, claim_hash: bytes) -> float:
|
||||||
|
spikes = 0.0
|
||||||
|
for k, v in self.prefix_db.trending_spike.iterate(prefix=(height, claim_hash)):
|
||||||
|
spikes += v.mass
|
||||||
|
return spikes
|
||||||
|
|
||||||
def get_activation(self, tx_num, position, is_support=False) -> int:
|
def get_activation(self, tx_num, position, is_support=False) -> int:
|
||||||
activation = self.db.get(
|
activation = self.db.get(
|
||||||
Prefixes.activated.pack_key(
|
Prefixes.activated.pack_key(
|
||||||
|
@ -704,12 +709,9 @@ class LevelDB:
|
||||||
'censor_type': Censor.RESOLVE if blocked_hash else Censor.SEARCH if filtered_hash else Censor.NOT_CENSORED,
|
'censor_type': Censor.RESOLVE if blocked_hash else Censor.SEARCH if filtered_hash else Censor.NOT_CENSORED,
|
||||||
'censoring_channel_id': (blocked_hash or filtered_hash or b'').hex() or None,
|
'censoring_channel_id': (blocked_hash or filtered_hash or b'').hex() or None,
|
||||||
'claims_in_channel': None if not metadata.is_channel else self.get_claims_in_channel_count(claim_hash),
|
'claims_in_channel': None if not metadata.is_channel else self.get_claims_in_channel_count(claim_hash),
|
||||||
'trending_score': self.trending_db.get_trending_score(claim_hash)
|
'trending_score_change': self.get_trending_spike_sum(self.db_height, claim_hash)
|
||||||
# 'trending_group': 0,
|
|
||||||
# 'trending_mixed': 0,
|
|
||||||
# 'trending_local': 0,
|
|
||||||
# 'trending_global': 0,
|
|
||||||
}
|
}
|
||||||
|
|
||||||
if metadata.is_repost and reposted_duration is not None:
|
if metadata.is_repost and reposted_duration is not None:
|
||||||
value['duration'] = reposted_duration
|
value['duration'] = reposted_duration
|
||||||
elif metadata.is_stream and (metadata.stream.video.duration or metadata.stream.audio.duration):
|
elif metadata.is_stream and (metadata.stream.video.duration or metadata.stream.audio.duration):
|
||||||
|
@ -748,7 +750,7 @@ class LevelDB:
|
||||||
self.logger.warning("can't sync non existent claim to ES: %s", claim_hash.hex())
|
self.logger.warning("can't sync non existent claim to ES: %s", claim_hash.hex())
|
||||||
continue
|
continue
|
||||||
name = self.claim_to_txo[claim_hash].normalized_name
|
name = self.claim_to_txo[claim_hash].normalized_name
|
||||||
if not self.db.get(Prefixes.claim_takeover.pack_key(name)):
|
if not self.prefix_db.claim_takeover.get(name):
|
||||||
self.logger.warning("can't sync non existent claim to ES: %s", claim_hash.hex())
|
self.logger.warning("can't sync non existent claim to ES: %s", claim_hash.hex())
|
||||||
continue
|
continue
|
||||||
claim = self._fs_get_claim_by_hash(claim_hash)
|
claim = self._fs_get_claim_by_hash(claim_hash)
|
||||||
|
@ -944,6 +946,11 @@ class LevelDB:
|
||||||
stop=Prefixes.touched_or_deleted.pack_key(min_height), include_value=False
|
stop=Prefixes.touched_or_deleted.pack_key(min_height), include_value=False
|
||||||
)
|
)
|
||||||
)
|
)
|
||||||
|
delete_undo_keys.extend(
|
||||||
|
self.db.iterator(
|
||||||
|
prefix=Prefixes.trending_spike.pack_partial_key(min_height), include_value=False
|
||||||
|
)
|
||||||
|
)
|
||||||
|
|
||||||
with self.db.write_batch(transaction=True) as batch:
|
with self.db.write_batch(transaction=True) as batch:
|
||||||
batch_put = batch.put
|
batch_put = batch.put
|
||||||
|
|
Loading…
Reference in a new issue