diff --git a/lbry/wallet/server/block_processor.py b/lbry/wallet/server/block_processor.py index 8b9ffb093..3236b1152 100644 --- a/lbry/wallet/server/block_processor.py +++ b/lbry/wallet/server/block_processor.py @@ -18,8 +18,8 @@ from lbry.crypto.hash import hash160 from lbry.wallet.server.leveldb import FlushData from lbry.wallet.server.db import DB_PREFIXES from lbry.wallet.server.db.claimtrie import StagedClaimtrieItem, StagedClaimtrieSupport -from lbry.wallet.server.db.claimtrie import get_takeover_name_ops, StagedActivation -from lbry.wallet.server.db.claimtrie import get_remove_name_ops +from lbry.wallet.server.db.claimtrie import get_takeover_name_ops, StagedActivation, get_add_effective_amount_ops +from lbry.wallet.server.db.claimtrie import get_remove_name_ops, get_remove_effective_amount_ops from lbry.wallet.server.db.prefixes import ACTIVATED_SUPPORT_TXO_TYPE, ACTIVATED_CLAIM_TXO_TYPE from lbry.wallet.server.db.prefixes import PendingActivationKey, PendingActivationValue from lbry.wallet.server.udp import StatusServer @@ -237,6 +237,75 @@ class BlockProcessor: self.possible_future_activated_claim: Dict[Tuple[str, bytes], int] = {} self.possible_future_support_txos: DefaultDict[bytes, List[Tuple[int, int]]] = defaultdict(list) + self.removed_claims_to_send_es = set() + self.touched_claims_to_send_es = set() + + def claim_producer(self): + if self.db.db_height <= 1: + return + for claim_hash in self.removed_claims_to_send_es: + yield 'delete', claim_hash.hex() + for claim_hash in self.touched_claims_to_send_es: + claim = self.db._fs_get_claim_by_hash(claim_hash) + yield ('update', { + 'claim_hash': claim_hash, + # 'claim_id': claim_hash.hex(), + 'claim_name': claim.name, + 'normalized': claim.name, + 'tx_id': claim.tx_hash[::-1].hex(), + 'tx_nout': claim.position, + 'amount': claim.amount, + 'timestamp': 0, + 'creation_timestamp': 0, + 'height': claim.height, + 'creation_height': claim.creation_height, + 'activation_height': claim.activation_height, + 'expiration_height': claim.expiration_height, + 'effective_amount': claim.effective_amount, + 'support_amount': claim.support_amount, + 'is_controlling': claim.is_controlling, + 'last_take_over_height': claim.last_takeover_height, + + 'short_url': '', + 'canonical_url': '', + + 'release_time': 0, + 'title': '', + 'author': '', + 'description': '', + 'claim_type': 0, + 'has_source': False, + 'stream_type': '', + 'media_type': '', + 'fee_amount': 0, + 'fee_currency': '', + 'duration': 0, + + 'reposted': 0, + 'reposted_claim_hash': None, + 'reposted_claim_type': None, + 'reposted_has_source': False, + + 'channel_hash': None, + + 'public_key_bytes': None, + 'public_key_hash': None, + 'signature': None, + 'signature_digest': None, + 'signature_valid': False, + 'claims_in_channel': 0, + + 'tags': [], + 'languages': [], + + 'censor_type': 0, + 'censoring_channel_hash': None, + # 'trending_group': 0, + # 'trending_mixed': 0, + # 'trending_local': 0, + # 'trending_global': 0, + }) + async def run_in_thread_with_lock(self, func, *args): # Run in a thread to prevent blocking. Shielded so that # cancellations from shutdown don't lose work - when the task @@ -266,12 +335,15 @@ class BlockProcessor: try: for block in blocks: await self.run_in_thread_with_lock(self.advance_block, block) + await self.db.search_index.claim_consumer(self.claim_producer()) + self.touched_claims_to_send_es.clear() + self.removed_claims_to_send_es.clear() print("******************\n") except: self.logger.exception("advance blocks failed") raise # if self.sql: - # await self.db.search_index.claim_consumer(self.db.claim_producer()) + for cache in self.search_cache.values(): cache.clear() self.history_cache.clear() # TODO: is this needed? @@ -948,6 +1020,43 @@ class BlockProcessor: if (controlling and winning != controlling.claim_hash) or (not controlling and winning): print(f"\ttakeover from abandoned support {controlling.claim_hash.hex()} -> {winning.hex()}") ops.extend(get_takeover_name_ops(name, winning, height, controlling)) + + # gather cumulative removed/touched sets to update the search index + self.removed_claims_to_send_es.update(set(self.staged_pending_abandoned.keys())) + self.touched_claims_to_send_es.update( + set(self.staged_activated_support.keys()).union(set(claim_hash for (_, claim_hash) in self.staged_activated_claim.keys())).difference( + self.removed_claims_to_send_es) + ) + + # for use the cumulative changes to now update bid ordered resolve + for removed in self.removed_claims_to_send_es: + removed_claim = self.db.get_claim_txo(removed) + if not removed_claim: + continue + k, v = removed_claim + name, tx_num, position = v.name, k.tx_num, k.position + ops.extend(get_remove_effective_amount_ops( + name, self.db.get_effective_amount(removed), tx_num, position, removed + )) + for touched in self.touched_claims_to_send_es: + if touched in self.pending_claim_txos: + pending = self.pending_claims[self.pending_claim_txos[touched]] + name, tx_num, position = pending.name, pending.tx_num, pending.position + claim_from_db = self.db.get_claim_txo(touched) + if claim_from_db: + k, v = claim_from_db + prev_tx_num, prev_position = k.tx_num, k.position + ops.extend(get_remove_effective_amount_ops( + name, self.db.get_effective_amount(touched), prev_tx_num, prev_position, touched + )) + else: + k, v = self.db.get_claim_txo(touched) + name, tx_num, position = v.name, k.tx_num, k.position + ops.extend(get_remove_effective_amount_ops( + name, self.db.get_effective_amount(touched), tx_num, position, touched + )) + ops.extend(get_add_effective_amount_ops(name, self._get_pending_effective_amount(name, touched), + tx_num, position, touched)) return ops def advance_block(self, block): @@ -1060,8 +1169,6 @@ class BlockProcessor: self.db.flush_dbs(self.flush_data()) - # self.effective_amount_changes.clear() - self.pending_claims.clear() self.pending_claim_txos.clear() self.pending_supports.clear() diff --git a/lbry/wallet/server/db/claimtrie.py b/lbry/wallet/server/db/claimtrie.py index 9493d0054..4c8aa301c 100644 --- a/lbry/wallet/server/db/claimtrie.py +++ b/lbry/wallet/server/db/claimtrie.py @@ -2,7 +2,7 @@ import typing from typing import Optional from lbry.wallet.server.db.revertable import RevertablePut, RevertableDelete, RevertableOp, delete_prefix from lbry.wallet.server.db import DB_PREFIXES -from lbry.wallet.server.db.prefixes import Prefixes, ClaimTakeoverValue +from lbry.wallet.server.db.prefixes import Prefixes, ClaimTakeoverValue, EffectiveAmountPrefixRow from lbry.wallet.server.db.prefixes import ACTIVATED_CLAIM_TXO_TYPE @@ -115,6 +115,18 @@ def get_takeover_name_ops(name: str, claim_hash: bytes, takeover_height: int, ] +def get_remove_effective_amount_ops(name: str, effective_amount: int, tx_num: int, position: int, claim_hash: bytes): + return [ + RevertableDelete(*EffectiveAmountPrefixRow.pack_item(name, effective_amount, tx_num, position, claim_hash)) + ] + + +def get_add_effective_amount_ops(name: str, effective_amount: int, tx_num: int, position: int, claim_hash: bytes): + return [ + RevertablePut(*EffectiveAmountPrefixRow.pack_item(name, effective_amount, tx_num, position, claim_hash)) + ] + + class StagedClaimtrieItem(typing.NamedTuple): name: str claim_hash: bytes diff --git a/lbry/wallet/server/db/elasticsearch/search.py b/lbry/wallet/server/db/elasticsearch/search.py index 75f7f4a0e..99ca99887 100644 --- a/lbry/wallet/server/db/elasticsearch/search.py +++ b/lbry/wallet/server/db/elasticsearch/search.py @@ -170,48 +170,43 @@ class SearchIndex: self.claim_cache.clear() self.resolution_cache.clear() - async def session_query(self, query_name, kwargs): - offset, total = kwargs.get('offset', 0) if isinstance(kwargs, dict) else 0, 0 + async def cached_search(self, kwargs): total_referenced = [] - if query_name == 'resolve': - total_referenced, response, censor = await self.resolve(*kwargs) - else: - cache_item = ResultCacheItem.from_cache(str(kwargs), self.search_cache) - if cache_item.result is not None: + cache_item = ResultCacheItem.from_cache(str(kwargs), self.search_cache) + if cache_item.result is not None: + return cache_item.result + async with cache_item.lock: + if cache_item.result: return cache_item.result - async with cache_item.lock: - if cache_item.result: - return cache_item.result - censor = Censor(Censor.SEARCH) - if kwargs.get('no_totals'): - response, offset, total = await self.search(**kwargs, censor_type=Censor.NOT_CENSORED) - else: - response, offset, total = await self.search(**kwargs) - censor.apply(response) + censor = Censor(Censor.SEARCH) + if kwargs.get('no_totals'): + response, offset, total = await self.search(**kwargs, censor_type=Censor.NOT_CENSORED) + else: + response, offset, total = await self.search(**kwargs) + censor.apply(response) + total_referenced.extend(response) + if censor.censored: + response, _, _ = await self.search(**kwargs, censor_type=Censor.NOT_CENSORED) total_referenced.extend(response) - if censor.censored: - response, _, _ = await self.search(**kwargs, censor_type=Censor.NOT_CENSORED) - total_referenced.extend(response) - result = Outputs.to_base64( - response, await self._get_referenced_rows(total_referenced), offset, total, censor - ) - cache_item.result = result - return result - return Outputs.to_base64(response, await self._get_referenced_rows(total_referenced), offset, total, censor) + result = Outputs.to_base64( + response, await self._get_referenced_rows(total_referenced), offset, total, censor + ) + cache_item.result = result + return result - async def resolve(self, *urls): - censor = Censor(Censor.RESOLVE) - results = [await self.resolve_url(url) for url in urls] - # just heat the cache - await self.populate_claim_cache(*filter(lambda x: isinstance(x, str), results)) - results = [self._get_from_cache_or_error(url, result) for url, result in zip(urls, results)] - - censored = [ - result if not isinstance(result, dict) or not censor.censor(result) - else ResolveCensoredError(url, result['censoring_channel_id']) - for url, result in zip(urls, results) - ] - return results, censored, censor + # async def resolve(self, *urls): + # censor = Censor(Censor.RESOLVE) + # results = [await self.resolve_url(url) for url in urls] + # # just heat the cache + # await self.populate_claim_cache(*filter(lambda x: isinstance(x, str), results)) + # results = [self._get_from_cache_or_error(url, result) for url, result in zip(urls, results)] + # + # censored = [ + # result if not isinstance(result, dict) or not censor.censor(result) + # else ResolveCensoredError(url, result['censoring_channel_hash']) + # for url, result in zip(urls, results) + # ] + # return results, censored, censor def _get_from_cache_or_error(self, url: str, resolution: Union[LookupError, StreamResolution, ChannelResolution]): cached = self.claim_cache.get(resolution) @@ -432,10 +427,11 @@ def extract_doc(doc, index): doc['reposted_claim_id'] = None channel_hash = doc.pop('channel_hash') doc['channel_id'] = channel_hash[::-1].hex() if channel_hash else channel_hash - doc['censoring_channel_id'] = doc.get('censoring_channel_id') - txo_hash = doc.pop('txo_hash') - doc['tx_id'] = txo_hash[:32][::-1].hex() - doc['tx_nout'] = struct.unpack('