From b4853c5f672ea19d65a08a7b65efef1879eda2f9 Mon Sep 17 00:00:00 2001
From: Jack Robison
Date: Fri, 6 Aug 2021 14:11:28 -0400
Subject: [PATCH] fix merge conflicts and simplify extract_doc

---
 lbry/error/__init__.py                        |  3 +-
 lbry/schema/result.py                         |  4 +-
 lbry/wallet/server/db/elasticsearch/search.py | 50 ++++++-------
 lbry/wallet/server/db/elasticsearch/sync.py   | 11 +++-
 lbry/wallet/server/leveldb.py                 | 41 +++++++--------
 5 files changed, 47 insertions(+), 62 deletions(-)

diff --git a/lbry/error/__init__.py b/lbry/error/__init__.py
index 7f16a3a41..f8c9d3165 100644
--- a/lbry/error/__init__.py
+++ b/lbry/error/__init__.py
@@ -252,9 +252,10 @@ class ResolveTimeoutError(WalletError):
 
 
 class ResolveCensoredError(WalletError):
 
-    def __init__(self, url, censor_id):
+    def __init__(self, url, censor_id, censor_row):
         self.url = url
         self.censor_id = censor_id
+        self.censor_row = censor_row
         super().__init__(f"Resolve of '{url}' was censored by channel with claim id '{censor_id}'.")
 
diff --git a/lbry/schema/result.py b/lbry/schema/result.py
index 5e3bf54b9..eed4b9d6d 100644
--- a/lbry/schema/result.py
+++ b/lbry/schema/result.py
@@ -44,7 +44,7 @@ class Censor:
 
     def censor(self, row) -> Optional[bytes]:
         if self.is_censored(row):
-            censoring_channel_hash = row['censoring_channel_hash']
+            censoring_channel_hash = bytes.fromhex(row['censoring_channel_id'])[::-1]
             self.censored.setdefault(censoring_channel_hash, set())
             self.censored[censoring_channel_hash].add(row['tx_hash'])
             return censoring_channel_hash
@@ -192,7 +192,7 @@ class Outputs:
                 if row.reposted_claim_hash:
                     set_reference(txo_message.claim.repost, row.reposted_claim_hash, extra_txo_rows)
             elif isinstance(row, ResolveCensoredError):
-                set_reference(txo_message.error.blocked.channel, row.censor_hash, extra_txo_rows)
+                set_reference(txo_message.error.blocked.channel, row.censor_id, extra_txo_rows)
         return page.SerializeToString()
 
     @classmethod
diff --git a/lbry/wallet/server/db/elasticsearch/search.py b/lbry/wallet/server/db/elasticsearch/search.py
index 0e333ae22..4a8f309d8 100644
--- a/lbry/wallet/server/db/elasticsearch/search.py
+++ b/lbry/wallet/server/db/elasticsearch/search.py
@@ -106,9 +106,19 @@ class SearchIndex:
         count = 0
         async for op, doc in claim_producer:
             if op == 'delete':
-                yield {'_index': self.index, '_op_type': 'delete', '_id': doc}
+                yield {
+                    '_index': self.index,
+                    '_op_type': 'delete',
+                    '_id': doc
+                }
             else:
-                yield extract_doc(doc, self.index)
+                yield {
+                    'doc': {key: value for key, value in doc.items() if key in ALL_FIELDS},
+                    '_id': doc['claim_id'],
+                    '_index': self.index,
+                    '_op_type': 'update',
+                    'doc_as_upsert': True
+                }
             count += 1
             if count % 100 == 0:
                 self.logger.debug("Indexing in progress, %d claims.", count)
@@ -474,34 +484,6 @@ class SearchIndex:
         return referenced_txos
 
 
-def extract_doc(doc, index):
-    doc['claim_id'] = doc.pop('claim_hash')[::-1].hex()
-    if doc['reposted_claim_hash'] is not None:
-        doc['reposted_claim_id'] = doc.pop('reposted_claim_hash').hex()
-    else:
-        doc['reposted_claim_id'] = None
-    channel_hash = doc.pop('channel_hash')
-    doc['channel_id'] = channel_hash[::-1].hex() if channel_hash else channel_hash
-    channel_hash = doc.pop('censoring_channel_hash')
-    doc['censoring_channel_hash'] = channel_hash.hex() if channel_hash else channel_hash
-    # txo_hash = doc.pop('txo_hash')
-    # doc['tx_id'] = txo_hash[:32][::-1].hex()
-    # doc['tx_nout'] = struct.unpack('<I', txo_hash[32:])[0]
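Note: the inlined 'update' action above is what extract_doc used to build, a partial document keyed by claim_id with doc_as_upsert so a missing document is created instead of rejected. A minimal standalone sketch of both action shapes, runnable without the server (make_action and the trimmed-down ALL_FIELDS set here are illustrative stand-ins, not names from this patch):

    # Sketch of the two bulk-action shapes yielded to the Elasticsearch bulk helper.
    ALL_FIELDS = {'claim_id', 'claim_name', 'normalized_name'}  # illustrative subset

    def make_action(index, op, doc):
        if op == 'delete':
            # deletes carry only the document id
            return {'_index': index, '_op_type': 'delete', '_id': doc}
        # everything else becomes a partial-document upsert
        return {
            'doc': {key: value for key, value in doc.items() if key in ALL_FIELDS},
            '_id': doc['claim_id'],
            '_index': index,
            '_op_type': 'update',
            'doc_as_upsert': True,
        }

    print(make_action('claims', 'update', {'claim_id': 'abc123', 'claim_name': '@foo', 'extra': 1}))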
diff --git a/lbry/wallet/server/leveldb.py b/lbry/wallet/server/leveldb.py
--- a/lbry/wallet/server/leveldb.py
+++ b/lbry/wallet/server/leveldb.py
 DB_STATE_STRUCT = struct.Struct(b'>32sLL32sLLBBlll')
 DB_STATE_STRUCT_SIZE = 94
@@ -527,7 +527,7 @@ class LevelDB:
             output = self.coin.transaction(raw).outputs[nout]
             script = OutputScript(output.pk_script)
             script.parse()
-            return Claim.from_bytes(script.values['claim'])
+            return Claim.from_bytes(script.values['claim']), ''.join(chr(c) for c in script.values['claim_name'])
         except:
             self.logger.error(
                 "tx parsing for ES went boom %s %s", tx_hash[::-1].hex(),
@@ -546,6 +546,7 @@ class LevelDB:
         metadata = self.get_claim_metadata(claim.tx_hash, claim.position)
         if not metadata:
             return
+        metadata, non_normalized_name = metadata
         if not metadata.is_stream or not metadata.stream.has_fee:
             fee_amount = 0
         else:
@@ -564,6 +565,7 @@ class LevelDB:
             )
             if not reposted_metadata:
                 return
+            reposted_metadata, _ = reposted_metadata
         reposted_tags = []
         reposted_languages = []
         reposted_has_source = None
@@ -632,10 +634,9 @@ class LevelDB:
             reposted_claim_hash) or self.filtered_channels.get(claim_hash) or self.filtered_channels.get(
             reposted_claim_hash) or self.filtered_channels.get(claim.channel_hash)
         value = {
-            'claim_hash': claim_hash[::-1],
-            # 'claim_id': claim_hash.hex(),
-            'claim_name': claim.name,
-            'normalized': claim.name,
+            'claim_id': claim_hash.hex(),
+            'claim_name': non_normalized_name,
+            'normalized_name': claim.name,
             'tx_id': claim.tx_hash[::-1].hex(),
             'tx_num': claim.tx_num,
             'tx_nout': claim.position,
@@ -648,7 +649,7 @@ class LevelDB:
             'expiration_height': claim.expiration_height,
             'effective_amount': claim.effective_amount,
             'support_amount': claim.support_amount,
-            'is_controlling': claim.is_controlling,
+            'is_controlling': bool(claim.is_controlling),
             'last_take_over_height': claim.last_takeover_height,
             'short_url': claim.short_url,
             'canonical_url': claim.canonical_url,
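Note: the hunks above and below move the ES documents from raw byte hashes ('claim_hash', 'channel_hash') to hex string ids, reversing byte order on the way out; the schema/result.py change earlier in this patch reverses it back when censoring. A quick round-trip check of that convention (the 20-byte hash here is made up):

    # claim/channel hashes are 20 raw bytes in the DB; ES now stores hex ids
    claim_hash = bytes.fromhex('f0ffa5bed027037448a55ebde2a0ae22e4908a72')

    channel_id = claim_hash[::-1].hex()          # as written in leveldb.py above
    recovered = bytes.fromhex(channel_id)[::-1]  # as read back in schema/result.py
    assert recovered == claim_hash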
@@ -658,30 +659,26 @@ class LevelDB:
             'claim_type': CLAIM_TYPES[metadata.claim_type],
             'has_source': reposted_has_source if reposted_has_source is not None else (
                 False if not metadata.is_stream else metadata.stream.has_source),
-            'stream_type': None if not metadata.is_stream else STREAM_TYPES[
+            'stream_type': 0 if not metadata.is_stream else STREAM_TYPES[
                 guess_stream_type(metadata.stream.source.media_type)],
             'media_type': None if not metadata.is_stream else metadata.stream.source.media_type,
             'fee_amount': fee_amount,
             'fee_currency': None if not metadata.is_stream else metadata.stream.fee.currency,
-            'reposted': self.get_reposted_count(claim_hash),
-            'reposted_claim_hash': reposted_claim_hash,
+            'repost_count': self.get_reposted_count(claim_hash),
+            'reposted_claim_id': None if not reposted_claim_hash else reposted_claim_hash.hex(),
             'reposted_claim_type': reposted_claim_type,
             'reposted_has_source': reposted_has_source,
-
-            'channel_hash': metadata.signing_channel_hash,
-
-            'public_key_bytes': None if not metadata.is_channel else metadata.channel.public_key_bytes,
-            'public_key_hash': None if not metadata.is_channel else self.ledger.address_to_hash160(
-                self.ledger.public_key_to_address(metadata.channel.public_key_bytes)
-            ),
-            'signature': metadata.signature,
-            'signature_digest': None,  # TODO: fix
-            'signature_valid': claim.signature_valid,
+            'channel_id': None if not metadata.is_signed else metadata.signing_channel_hash[::-1].hex(),
+            'public_key_id': None if not metadata.is_channel else
+                self.ledger.public_key_to_address(metadata.channel.public_key_bytes),
+            'signature': (metadata.signature or b'').hex() or None,
+            # 'signature_digest': metadata.signature,
+            'is_signature_valid': bool(claim.signature_valid),
             'tags': tags,
             'languages': languages,
             'censor_type': Censor.RESOLVE if blocked_hash else Censor.SEARCH if filtered_hash else Censor.NOT_CENSORED,
-            'censoring_channel_hash': blocked_hash or filtered_hash or None,
+            'censoring_channel_id': (blocked_hash or filtered_hash or b'').hex() or None,
             'claims_in_channel': None if not metadata.is_channel else self.get_claims_in_channel_count(claim_hash)
             # 'trending_group': 0,
             # 'trending_mixed': 0,
@@ -695,9 +692,7 @@ class LevelDB:
         return value
 
     async def all_claims_producer(self, batch_size=500_000):
-        loop = asyncio.get_event_loop()
         batch = []
-        tasks = []
         for claim_hash, v in self.db.iterator(prefix=Prefixes.claim_to_txo.prefix):
             # TODO: fix the couple of claim txos that dont have controlling names
             if not self.db.get(Prefixes.claim_takeover.pack_key(Prefixes.claim_to_txo.unpack_value(v).name)):
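Note: several of the new fields above ('signature', 'censoring_channel_id') lean on one coercion idiom: "(value or b'').hex() or None" maps both None and empty bytes to None, and non-empty bytes to a hex string. A tiny self-contained illustration (the helper name is made up):

    from typing import Optional

    def to_hex_or_none(value: Optional[bytes]) -> Optional[str]:
        # b''.hex() == '' is falsy, so the trailing "or None" collapses
        # the empty and missing cases to None instead of ''
        return (value or b'').hex() or None

    assert to_hex_or_none(None) is None
    assert to_hex_or_none(b'') is None
    assert to_hex_or_none(b'\xaa\xbb') == 'aabb'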