From da4e4ecd23889ec43524fb40489a894abe6f3a7c Mon Sep 17 00:00:00 2001
From: Jack Robison
Date: Thu, 17 Jun 2021 21:19:31 -0400
Subject: [PATCH] _prepare_claim_for_sync generators

---
 lbry/wallet/server/block_processor.py | 155 +---------------------
 lbry/wallet/server/leveldb.py         | 179 +++++++++++++++++++++++++-
 2 files changed, 184 insertions(+), 150 deletions(-)

diff --git a/lbry/wallet/server/block_processor.py b/lbry/wallet/server/block_processor.py
index e3b63d723..3080c669e 100644
--- a/lbry/wallet/server/block_processor.py
+++ b/lbry/wallet/server/block_processor.py
@@ -26,11 +26,11 @@ from lbry.wallet.server.db.claimtrie import StagedClaimtrieItem, StagedClaimtrie
 from lbry.wallet.server.db.claimtrie import get_takeover_name_ops, StagedActivation, get_add_effective_amount_ops
 from lbry.wallet.server.db.claimtrie import get_remove_name_ops, get_remove_effective_amount_ops
 from lbry.wallet.server.db.prefixes import ACTIVATED_SUPPORT_TXO_TYPE, ACTIVATED_CLAIM_TXO_TYPE
-from lbry.wallet.server.db.prefixes import PendingActivationKey, PendingActivationValue
+from lbry.wallet.server.db.prefixes import PendingActivationKey, PendingActivationValue, Prefixes
 from lbry.wallet.server.udp import StatusServer
+from lbry.wallet.server.db.revertable import RevertableOp, RevertablePut, RevertableDelete
 if typing.TYPE_CHECKING:
     from lbry.wallet.server.leveldb import LevelDB
-    from lbry.wallet.server.db.revertable import RevertableOp


 class Prefetcher:
@@ -259,22 +259,6 @@ class BlockProcessor:
         self.amount_cache = {}

     def claim_producer(self):
-        def get_claim_txo(tx_hash, nout):
-            raw = self.db.db.get(
-                DB_PREFIXES.TX_PREFIX.value + tx_hash
-            )
-            try:
-                output: TxOutput = self.coin.transaction(raw).outputs[nout]
-                script = OutputScript(output.pk_script)
-                script.parse()
-                return Claim.from_bytes(script.values['claim'])
-            except:
-                self.logger.error(
-                    "tx parsing for ES went boom %s %s", tx_hash[::-1].hex(),
-                    raw.hex()
-                )
-                return
-
         if self.db.db_height <= 1:
             return

@@ -284,135 +268,8 @@ class BlockProcessor:
         for claim_hash in self.removed_claims_to_send_es:
             yield 'delete', claim_hash.hex()

-        for claim_hash in to_send_es:
-            claim = self.db._fs_get_claim_by_hash(claim_hash)
-            metadata = get_claim_txo(claim.tx_hash, claim.position)
-            if not metadata:
-                continue
-            reposted_claim_hash = None if not metadata.is_repost else metadata.repost.reference.claim_hash[::-1]
-            reposted_claim = None
-            reposted_metadata = None
-            if reposted_claim_hash:
-                reposted_claim = self.db.get_claim_txo(reposted_claim_hash)
-                if not reposted_claim:
-                    continue
-                reposted_metadata = get_claim_txo(
-                    self.db.total_transactions[reposted_claim.tx_num], reposted_claim.position
-                )
-                if not reposted_metadata:
-                    continue
-            reposted_tags = []
-            reposted_languages = []
-            reposted_has_source = None
-            reposted_claim_type = None
-            if reposted_claim:
-                reposted_tx_hash = self.db.total_transactions[reposted_claim.tx_num]
-                raw_reposted_claim_tx = self.db.db.get(
-                    DB_PREFIXES.TX_PREFIX.value + reposted_tx_hash
-                )
-                try:
-                    reposted_claim_txo: TxOutput = self.coin.transaction(
-                        raw_reposted_claim_tx
-                    ).outputs[reposted_claim.position]
-                    reposted_script = OutputScript(reposted_claim_txo.pk_script)
-                    reposted_script.parse()
-                except:
-                    self.logger.error(
-                        "repost tx parsing for ES went boom %s %s", reposted_tx_hash[::-1].hex(),
-                        raw_reposted_claim_tx.hex()
-                    )
-                    continue
-                try:
-                    reposted_metadata = Claim.from_bytes(reposted_script.values['claim'])
-                except:
-                    self.logger.error(
-                        "reposted claim parsing for ES went boom %s %s", reposted_tx_hash[::-1].hex(),
-                        raw_reposted_claim_tx.hex()
-                    )
-                    continue
-            if reposted_metadata:
-                reposted_tags = [] if not reposted_metadata.is_stream else [tag for tag in reposted_metadata.stream.tags]
-                reposted_languages = [] if not reposted_metadata.is_stream else (
-                    [lang.language or 'none' for lang in reposted_metadata.stream.languages] or ['none']
-                )
-                reposted_has_source = False if not reposted_metadata.is_stream else reposted_metadata.stream.has_source
-                reposted_claim_type = CLAIM_TYPES[reposted_metadata.claim_type]
-            claim_tags = [] if not metadata.is_stream else [tag for tag in metadata.stream.tags]
-            claim_languages = [] if not metadata.is_stream else (
-                [lang.language or 'none' for lang in metadata.stream.languages] or ['none']
-            )
-            tags = list(set(claim_tags).union(set(reposted_tags)))
-            languages = list(set(claim_languages).union(set(reposted_languages)))
-            canonical_url = f'{claim.name}#{claim.claim_hash.hex()}'
-            if metadata.is_signed:
-                channel_name = self._get_pending_claim_name(metadata.signing_channel_hash[::-1])
-                canonical_url = f'{channel_name}#{metadata.signing_channel_hash[::-1].hex()}/{canonical_url}'
-
-            value = {
-                'claim_hash': claim_hash[::-1],
-                # 'claim_id': claim_hash.hex(),
-                'claim_name': claim.name,
-                'normalized': claim.name,
-                'tx_id': claim.tx_hash[::-1].hex(),
-                'tx_num': claim.tx_num,
-                'tx_nout': claim.position,
-                'amount': claim.amount,
-                'timestamp': 0,  # TODO: fix
-                'creation_timestamp': 0,  # TODO: fix
-                'height': claim.height,
-                'creation_height': claim.creation_height,
-                'activation_height': claim.activation_height,
-                'expiration_height': claim.expiration_height,
-                'effective_amount': claim.effective_amount,
-                'support_amount': claim.support_amount,
-                'is_controlling': claim.is_controlling,
-                'last_take_over_height': claim.last_takeover_height,
-
-                'short_url': f'{claim.name}#{claim.claim_hash.hex()}',  # TODO: fix
-                'canonical_url': canonical_url,
-
-                'title': None if not metadata.is_stream else metadata.stream.title,
-                'author': None if not metadata.is_stream else metadata.stream.author,
-                'description': None if not metadata.is_stream else metadata.stream.description,
-                'claim_type': CLAIM_TYPES[metadata.claim_type],
-                'has_source': None if not metadata.is_stream else metadata.stream.has_source,
-                'stream_type': None if not metadata.is_stream else STREAM_TYPES[guess_stream_type(metadata.stream.source.media_type)],
-                'media_type': None if not metadata.is_stream else metadata.stream.source.media_type,
-                'fee_amount': None if not metadata.is_stream or not metadata.stream.has_fee else int(
-                    max(metadata.stream.fee.amount or 0, 0)*1000
-                ),
-                'fee_currency': None if not metadata.is_stream else metadata.stream.fee.currency,
-
-                'reposted': self.db.get_reposted_count(claim_hash),
-                'reposted_claim_hash': reposted_claim_hash,
-                'reposted_claim_type': reposted_claim_type,
-                'reposted_has_source': reposted_has_source,
-
-                'channel_hash': metadata.signing_channel_hash,
-
-                'public_key_bytes': None if not metadata.is_channel else metadata.channel.public_key_bytes,
-                'public_key_hash': None if not metadata.is_channel else self.ledger.address_to_hash160(
-                    self.ledger.public_key_to_address(metadata.channel.public_key_bytes)
-                ),
-                'signature': metadata.signature,
-                'signature_digest': None,  # TODO: fix
-                'signature_valid': claim.channel_hash is not None,  # TODO: fix
-                'tags': tags,
-                'languages': languages,
-                'censor_type': 0,  # TODO: fix
-                'censoring_channel_hash': None,  # TODO: fix
-                # 'trending_group': 0,
-                # 'trending_mixed': 0,
-                # 'trending_local': 0,
-                # 'trending_global': 0,
-            }
-            if metadata.is_stream and (metadata.stream.video.duration or metadata.stream.audio.duration):
-                value['duration'] = metadata.stream.video.duration or metadata.stream.audio.duration
-            if metadata.is_stream and metadata.stream.release_time:
-                value['release_time'] = metadata.stream.release_time
-            if metadata.is_channel:
-                value['claims_in_channel'] = self.db.get_claims_in_channel_count(claim_hash)
-            yield 'update', value
+        for claim in self.db.claims_producer(to_send_es):
+            yield 'update', claim

     async def run_in_thread_with_lock(self, func, *args):
         # Run in a thread to prevent blocking. Shielded so that
@@ -444,9 +301,9 @@ class BlockProcessor:
             for block in blocks:
                 start = time.perf_counter()
                 await self.run_in_thread_with_lock(self.advance_block, block)
-                self.logger.info("advanced to %i in %ds", self.height, time.perf_counter() - start)
+                self.logger.info("advanced to %i in %0.3fs", self.height, time.perf_counter() - start)
                 # TODO: we shouldn't wait on the search index updating before advancing to the next block
-                # await self.db.search_index.claim_consumer(self.claim_producer())
+                await self.db.search_index.claim_consumer(self.claim_producer())
                 self.db.search_index.clear_caches()
                 self.touched_claims_to_send_es.clear()
                 self.removed_claims_to_send_es.clear()
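The hand-off above is a plain generator protocol: `claim_producer` yields `('delete', <claim id hex>)` and `('update', <document dict>)` pairs, and `search_index.claim_consumer` drains them after each advanced block. The sketch below illustrates only that contract; it is not part of the patch, and the real consumer batches writes into Elasticsearch rather than collecting lists.

# Sketch only: the ('delete' | 'update', payload) generator contract that
# BlockProcessor.claim_producer now satisfies by delegating to
# LevelDB.claims_producer. Payloads here are made up for the example.

def fake_claim_producer():
    # 'delete' payloads are claim ids as hex strings; 'update' payloads are
    # the prepared Elasticsearch document dicts from _prepare_claim_for_sync.
    yield 'delete', 'deadbeef' * 5
    yield 'update', {'claim_hash': b'\x00' * 20, 'claim_name': 'example'}

def drain(producer):
    # Split the stream the way a consumer would before issuing bulk ops.
    deleted, updated = [], []
    for operation, payload in producer:
        (deleted if operation == 'delete' else updated).append(payload)
    return deleted, updated

deleted, updated = drain(fake_claim_producer())
assert len(deleted) == 1 and len(updated) == 1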
diff --git a/lbry/wallet/server/leveldb.py b/lbry/wallet/server/leveldb.py
index 0c2af0198..ab9d10e0b 100644
--- a/lbry/wallet/server/leveldb.py
+++ b/lbry/wallet/server/leveldb.py
@@ -36,11 +36,14 @@ from lbry.wallet.server.util import formatted_time, pack_be_uint16, unpack_be_ui
 from lbry.wallet.server.storage import db_class
 from lbry.wallet.server.db.revertable import RevertablePut, RevertableDelete, RevertableOp, delete_prefix
 from lbry.wallet.server.db import DB_PREFIXES
-from lbry.wallet.server.db.common import ResolveResult
+from lbry.wallet.server.db.common import ResolveResult, STREAM_TYPES, CLAIM_TYPES
 from lbry.wallet.server.db.prefixes import Prefixes, PendingActivationValue, ClaimTakeoverValue, ClaimToTXOValue
 from lbry.wallet.server.db.prefixes import ACTIVATED_CLAIM_TXO_TYPE, ACTIVATED_SUPPORT_TXO_TYPE
 from lbry.wallet.server.db.prefixes import PendingActivationKey, ClaimToTXOKey, TXOToClaimValue
 from lbry.wallet.server.db.claimtrie import length_encoded_name
+from lbry.wallet.transaction import OutputScript
+from lbry.schema.claim import Claim, guess_stream_type
+from lbry.wallet.ledger import Ledger, RegTestLedger, TestNetLedger
 from lbry.wallet.server.db.elasticsearch import SearchIndex


@@ -152,6 +155,13 @@ class LevelDB:

         self.genesis_bytes = bytes.fromhex(self.coin.GENESIS_HASH)

+        if env.coin.NET == 'mainnet':
+            self.ledger = Ledger
+        elif env.coin.NET == 'testnet':
+            self.ledger = TestNetLedger
+        else:
+            self.ledger = RegTestLedger
+
     def get_claim_from_txo(self, tx_num: int, tx_idx: int) -> Optional[TXOToClaimValue]:
         claim_hash_and_name = self.db.get(Prefixes.txo_to_claim.pack_key(tx_num, tx_idx))
         if not claim_hash_and_name:
@@ -429,6 +439,168 @@ class LevelDB:
             txos[claim_hash] = tx_num, nout
         return txos

+    def get_claim_output_script(self, tx_hash, nout):
+        raw = self.db.get(
+            DB_PREFIXES.TX_PREFIX.value + tx_hash
+        )
+        try:
+            output = self.coin.transaction(raw).outputs[nout]
+            script = OutputScript(output.pk_script)
+            script.parse()
+            return Claim.from_bytes(script.values['claim'])
+        except Exception:
+            self.logger.error(
+                "tx parsing for ES went boom %s %s", tx_hash[::-1].hex(),
+                raw.hex()
+            )
+            return
+
+    def _prepare_claim_for_sync(self, claim_hash: bytes):
+        claim = self._fs_get_claim_by_hash(claim_hash)
+        if not claim:
+            self.logger.warning("can't sync non-existent claim to ES: %s", claim_hash.hex())
+            return
+        metadata = self.get_claim_output_script(claim.tx_hash, claim.position)
+        if not metadata:
+            return
+        reposted_claim_hash = None if not metadata.is_repost else metadata.repost.reference.claim_hash[::-1]
+        reposted_claim = None
+        reposted_metadata = None
+        if reposted_claim_hash:
+            reposted_claim = self.get_claim_txo(reposted_claim_hash)
+            if not reposted_claim:
+                return
+            reposted_metadata = self.get_claim_output_script(
+                self.total_transactions[reposted_claim.tx_num], reposted_claim.position
+            )
+            if not reposted_metadata:
+                return
+        reposted_tags = []
+        reposted_languages = []
+        reposted_has_source = None
+        reposted_claim_type = None
+        if reposted_claim:
+            reposted_tx_hash = self.total_transactions[reposted_claim.tx_num]
+            raw_reposted_claim_tx = self.db.get(
+                DB_PREFIXES.TX_PREFIX.value + reposted_tx_hash
+            )
+            try:
+                reposted_claim_txo = self.coin.transaction(
+                    raw_reposted_claim_tx
+                ).outputs[reposted_claim.position]
+                reposted_script = OutputScript(reposted_claim_txo.pk_script)
+                reposted_script.parse()
+            except Exception:
+                self.logger.error(
+                    "repost tx parsing for ES went boom %s %s", reposted_tx_hash[::-1].hex(),
+                    raw_reposted_claim_tx.hex()
+                )
+                return
+            try:
+                reposted_metadata = Claim.from_bytes(reposted_script.values['claim'])
+            except Exception:
+                self.logger.error(
+                    "reposted claim parsing for ES went boom %s %s", reposted_tx_hash[::-1].hex(),
+                    raw_reposted_claim_tx.hex()
+                )
+                return
+        if reposted_metadata:
+            reposted_tags = [] if not reposted_metadata.is_stream else [tag for tag in reposted_metadata.stream.tags]
+            reposted_languages = [] if not reposted_metadata.is_stream else (
+                [lang.language or 'none' for lang in reposted_metadata.stream.languages] or ['none']
+            )
+            reposted_has_source = False if not reposted_metadata.is_stream else reposted_metadata.stream.has_source
+            reposted_claim_type = CLAIM_TYPES[reposted_metadata.claim_type]
+        claim_tags = [] if not metadata.is_stream else [tag for tag in metadata.stream.tags]
+        claim_languages = [] if not metadata.is_stream else (
+            [lang.language or 'none' for lang in metadata.stream.languages] or ['none']
+        )
+        tags = list(set(claim_tags).union(set(reposted_tags)))
+        languages = list(set(claim_languages).union(set(reposted_languages)))
+        canonical_url = f'{claim.name}#{claim.claim_hash.hex()}'
+        if metadata.is_signed:
+            channel = self.get_claim_txo(metadata.signing_channel_hash[::-1])
+            if channel:
+                canonical_url = f'{channel.name}#{metadata.signing_channel_hash[::-1].hex()}/{canonical_url}'
+
+        value = {
+            'claim_hash': claim_hash[::-1],
+            # 'claim_id': claim_hash.hex(),
+            'claim_name': claim.name,
+            'normalized': claim.name,
+            'tx_id': claim.tx_hash[::-1].hex(),
+            'tx_num': claim.tx_num,
+            'tx_nout': claim.position,
+            'amount': claim.amount,
+            'timestamp': 0,  # TODO: fix
+            'creation_timestamp': 0,  # TODO: fix
+            'height': claim.height,
+            'creation_height': claim.creation_height,
+            'activation_height': claim.activation_height,
+            'expiration_height': claim.expiration_height,
+            'effective_amount': claim.effective_amount,
+            'support_amount': claim.support_amount,
+            'is_controlling': claim.is_controlling,
+            'last_take_over_height': claim.last_takeover_height,
+
+            'short_url': f'{claim.name}#{claim.claim_hash.hex()}',  # TODO: fix
+            'canonical_url': canonical_url,
+
+            'title': None if not metadata.is_stream else metadata.stream.title,
+            'author': None if not metadata.is_stream else metadata.stream.author,
+            'description': None if not metadata.is_stream else metadata.stream.description,
+            'claim_type': CLAIM_TYPES[metadata.claim_type],
+            'has_source': None if not metadata.is_stream else metadata.stream.has_source,
+            'stream_type': None if not metadata.is_stream else STREAM_TYPES[
+                guess_stream_type(metadata.stream.source.media_type)],
+            'media_type': None if not metadata.is_stream else metadata.stream.source.media_type,
+            'fee_amount': None if not metadata.is_stream or not metadata.stream.has_fee else int(
+                max(metadata.stream.fee.amount or 0, 0) * 1000
+            ),
+            'fee_currency': None if not metadata.is_stream else metadata.stream.fee.currency,
+
+            'reposted': self.get_reposted_count(claim_hash),
+            'reposted_claim_hash': reposted_claim_hash,
+            'reposted_claim_type': reposted_claim_type,
+            'reposted_has_source': reposted_has_source,
+
+            'channel_hash': metadata.signing_channel_hash,
+
+            'public_key_bytes': None if not metadata.is_channel else metadata.channel.public_key_bytes,
+            'public_key_hash': None if not metadata.is_channel else self.ledger.address_to_hash160(
+                self.ledger.public_key_to_address(metadata.channel.public_key_bytes)
+            ),
+            'signature': metadata.signature,
+            'signature_digest': None,  # TODO: fix
+            'signature_valid': claim.signature_valid,
+            'tags': tags,
+            'languages': languages,
+            'censor_type': 0,  # TODO: fix
+            'censoring_channel_hash': None,  # TODO: fix
+            'claims_in_channel': None if not metadata.is_channel else self.get_claims_in_channel_count(claim_hash),
+            # 'trending_group': 0,
+            # 'trending_mixed': 0,
+            # 'trending_local': 0,
+            # 'trending_global': 0,
+        }
+        if metadata.is_stream and (metadata.stream.video.duration or metadata.stream.audio.duration):
+            value['duration'] = metadata.stream.video.duration or metadata.stream.audio.duration
+        if metadata.is_stream and metadata.stream.release_time:
+            value['release_time'] = metadata.stream.release_time
+        return value
+
+    def all_claims_producer(self):
+        for claim_hash in self.db.iterator(prefix=Prefixes.claim_to_txo.prefix, include_value=False):
+            claim = self._prepare_claim_for_sync(claim_hash[1:])
+            if claim:
+                yield claim
+
+    def claims_producer(self, claim_hashes: Set[bytes]):
+        for claim_hash in claim_hashes:
+            result = self._prepare_claim_for_sync(claim_hash)
+            if result:
+                yield result
+
     def get_activated_at_height(self, height: int) -> DefaultDict[PendingActivationValue, List[PendingActivationKey]]:
         activated = defaultdict(list)
         for _k, _v in self.db.iterator(prefix=Prefixes.pending_activation.pack_partial_key(height)):
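A note on the hunk above: `all_claims_producer` slices `claim_hash[1:]` because a prefixed LevelDB iterator yields full keys, i.e. the one-byte `claim_to_txo` column prefix followed by the raw claim hash. The standalone sketch below shows that keyspace layout; the prefix byte and hashes are invented for illustration and are not the values the real `Prefixes.claim_to_txo` uses.

# Sketch only: why all_claims_producer strips the first byte of each key
# before handing the claim hash to _prepare_claim_for_sync.
import os

CLAIM_TO_TXO_PREFIX = b'E'  # hypothetical one-byte column prefix

claim_hashes = [os.urandom(20) for _ in range(3)]
keys = sorted(CLAIM_TO_TXO_PREFIX + h for h in claim_hashes)

recovered = [key[1:] for key in keys]  # the claim_hash[1:] slice
assert sorted(claim_hashes) == sorted(recovered)
assert all(len(h) == 20 for h in recovered)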
@@ -567,6 +739,11 @@ class LevelDB:
                 if height >= min_height:
                     break
                 keys.append(key)
+        if min_height > 0:
+            for key in self.db.iterator(start=DB_PREFIXES.undo_claimtrie.value,
+                                        stop=DB_PREFIXES.undo_claimtrie.value + util.pack_be_uint64(min_height),
+                                        include_value=False):
+                keys.append(key)
         if keys:
             with self.db.write_batch() as batch:
                 for key in keys:
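The final hunk works because `undo_claimtrie` keys end in a big-endian uint64 height, and big-endian encoding preserves integer order under lexicographic byte comparison, so a plain `start`..`stop` range scan selects exactly the undo entries below `min_height`. A self-contained sketch of that ordering argument (the prefix byte is invented, and `util.pack_be_uint64` is assumed to be the usual `struct.pack('>Q', ...)`):

# Sketch only: big-endian heights sort the same as their byte strings, so
# everything below min_height falls in [prefix, prefix + pack(min_height)).
import struct

UNDO_CLAIMTRIE_PREFIX = b'M'  # hypothetical one-byte column prefix

def pack_be_uint64(height: int) -> bytes:
    return struct.pack('>Q', height)

keys = [UNDO_CLAIMTRIE_PREFIX + pack_be_uint64(h) for h in (5, 99, 100, 101)]
min_height = 100
stop = UNDO_CLAIMTRIE_PREFIX + pack_be_uint64(min_height)

# The same half-open range the iterator(start=..., stop=...) call scans.
stale = [k for k in sorted(keys) if UNDO_CLAIMTRIE_PREFIX <= k < stop]
assert stale == [UNDO_CLAIMTRIE_PREFIX + pack_be_uint64(5),
                 UNDO_CLAIMTRIE_PREFIX + pack_be_uint64(99)]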