fix merge conflicts and simplify extract_doc

Jack Robison 2021-08-06 14:11:28 -04:00 committed by Victor Shyba
parent c68334b421
commit b4853c5f67
5 changed files with 47 additions and 62 deletions

View file

@@ -252,9 +252,10 @@ class ResolveTimeoutError(WalletError):
 class ResolveCensoredError(WalletError):
-    def __init__(self, url, censor_id):
+    def __init__(self, url, censor_id, censor_row):
         self.url = url
         self.censor_id = censor_id
+        self.censor_row = censor_row
         super().__init__(f"Resolve of '{url}' was censored by channel with claim id '{censor_id}'.")

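Note on the hunk above: ResolveCensoredError now also carries the full censoring row, so code that catches it can serialize the censoring channel without a second lookup. A minimal, self-contained sketch of the new shape (not the lbry module itself; the URL and ids below are made up):

    # Standalone sketch mirroring the new exception shape; values are made up.
    class ResolveCensoredError(Exception):
        def __init__(self, url, censor_id, censor_row):
            self.url = url
            self.censor_id = censor_id
            self.censor_row = censor_row  # e.g. the row describing the censoring channel
            super().__init__(f"Resolve of '{url}' was censored by channel with claim id '{censor_id}'.")

    # Hypothetical usage:
    try:
        raise ResolveCensoredError('lbry://example-claim', 'ab' * 20, {'claim_id': 'ab' * 20})
    except ResolveCensoredError as err:
        print(err.censor_id, err.censor_row)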
View file

@@ -44,7 +44,7 @@ class Censor:
     def censor(self, row) -> Optional[bytes]:
         if self.is_censored(row):
-            censoring_channel_hash = row['censoring_channel_hash']
+            censoring_channel_hash = bytes.fromhex(row['censoring_channel_id'])[::-1]
             self.censored.setdefault(censoring_channel_hash, set())
             self.censored[censoring_channel_hash].add(row['tx_hash'])
             return censoring_channel_hash
@@ -192,7 +192,7 @@ class Outputs:
                 if row.reposted_claim_hash:
                     set_reference(txo_message.claim.repost, row.reposted_claim_hash, extra_txo_rows)
             elif isinstance(row, ResolveCensoredError):
-                set_reference(txo_message.error.blocked.channel, row.censor_hash, extra_txo_rows)
+                set_reference(txo_message.error.blocked.channel, row.censor_id, extra_txo_rows)
         return page.SerializeToString()

     @classmethod

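The Censor.censor change above works because rows now carry the censoring channel as a display-order hex claim id rather than raw bytes; decoding the hex and reversing the bytes recovers the hash used as the dict key. A quick worked example (the id is made up):

    claim_id = 'ab' * 20                                     # 40-char hex claim id, made up
    censoring_channel_hash = bytes.fromhex(claim_id)[::-1]   # raw hash, byte order reversed
    assert censoring_channel_hash[::-1].hex() == claim_id    # round-trips back to the id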
View file

@@ -106,9 +106,19 @@ class SearchIndex:
         count = 0
         async for op, doc in claim_producer:
             if op == 'delete':
-                yield {'_index': self.index, '_op_type': 'delete', '_id': doc}
+                yield {
+                    '_index': self.index,
+                    '_op_type': 'delete',
+                    '_id': doc
+                }
             else:
-                yield extract_doc(doc, self.index)
+                yield {
+                    'doc': {key: value for key, value in doc.items() if key in ALL_FIELDS},
+                    '_id': doc['claim_id'],
+                    '_index': self.index,
+                    '_op_type': 'update',
+                    'doc_as_upsert': True
+                }
             count += 1
             if count % 100 == 0:
                 self.logger.debug("Indexing in progress, %d claims.", count)
@@ -474,34 +484,6 @@ class SearchIndex:
         return referenced_txos


-def extract_doc(doc, index):
-    doc['claim_id'] = doc.pop('claim_hash')[::-1].hex()
-    if doc['reposted_claim_hash'] is not None:
-        doc['reposted_claim_id'] = doc.pop('reposted_claim_hash').hex()
-    else:
-        doc['reposted_claim_id'] = None
-    channel_hash = doc.pop('channel_hash')
-    doc['channel_id'] = channel_hash[::-1].hex() if channel_hash else channel_hash
-    channel_hash = doc.pop('censoring_channel_hash')
-    doc['censoring_channel_hash'] = channel_hash.hex() if channel_hash else channel_hash
-    # txo_hash = doc.pop('txo_hash')
-    # doc['tx_id'] = txo_hash[:32][::-1].hex()
-    # doc['tx_nout'] = struct.unpack('<I', txo_hash[32:])[0]
-    doc['repost_count'] = doc.pop('reposted')
-    doc['is_controlling'] = bool(doc['is_controlling'])
-    doc['signature'] = (doc.pop('signature') or b'').hex() or None
-    doc['signature_digest'] = doc['signature']
-    doc['public_key_bytes'] = (doc.pop('public_key_bytes') or b'').hex() or None
-    doc['public_key_id'] = (doc.pop('public_key_hash') or b'').hex() or None
-    doc['is_signature_valid'] = bool(doc['signature_valid'])
-    doc['claim_type'] = doc.get('claim_type', 0) or 0
-    doc['stream_type'] = int(doc.get('stream_type', 0) or 0)
-    doc['has_source'] = bool(doc['has_source'])
-    doc['normalized_name'] = doc.pop('normalized')
-    doc = {key: value for key, value in doc.items() if key in ALL_FIELDS}
-    return {'doc': doc, '_id': doc['claim_id'], '_index': index, '_op_type': 'update', 'doc_as_upsert': True}
-
-
 def expand_query(**kwargs):
     if "amount_order" in kwargs:
         kwargs["limit"] = 1
@@ -533,8 +515,8 @@ def expand_query(**kwargs):
                     value = CLAIM_TYPES[value]
                 else:
                     value = [CLAIM_TYPES[claim_type] for claim_type in value]
-            elif key == 'stream_type':
-                value = STREAM_TYPES[value] if isinstance(value, str) else list(map(STREAM_TYPES.get, value))
+            # elif key == 'stream_type':
+            #     value = STREAM_TYPES[value] if isinstance(value, str) else list(map(STREAM_TYPES.get, value))
             if key == '_id':
                 if isinstance(value, Iterable):
                     value = [item[::-1].hex() for item in value]
@@ -590,13 +572,13 @@ def expand_query(**kwargs):
         elif key == 'limit_claims_per_channel':
             collapse = ('channel_id.keyword', value)
     if kwargs.get('has_channel_signature'):
-        query['must'].append({"exists": {"field": "signature_digest"}})
+        query['must'].append({"exists": {"field": "signature"}})
         if 'signature_valid' in kwargs:
             query['must'].append({"term": {"is_signature_valid": bool(kwargs["signature_valid"])}})
     elif 'signature_valid' in kwargs:
         query.setdefault('should', [])
         query["minimum_should_match"] = 1
-        query['should'].append({"bool": {"must_not": {"exists": {"field": "signature_digest"}}}})
+        query['should'].append({"bool": {"must_not": {"exists": {"field": "signature"}}}})
         query['should'].append({"term": {"is_signature_valid": bool(kwargs["signature_valid"])}})
     if 'has_source' in kwargs:
         query.setdefault('should', [])

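With signature_digest gone from the indexed document, the existence checks in expand_query pivot on the signature field instead. For orientation, the clauses added by the elif branch above, when only signature_valid is passed, produce a bool query shaped roughly like this (structure read off the code above, not taken from elsewhere):

    query = {
        "must": [],
        "minimum_should_match": 1,
        "should": [
            # match claims that carry no signature at all...
            {"bool": {"must_not": {"exists": {"field": "signature"}}}},
            # ...or whose signature validity matches the requested flag
            {"term": {"is_signature_valid": True}},
        ],
    }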
View file

@@ -6,7 +6,8 @@ from elasticsearch.helpers import async_bulk
 from lbry.wallet.server.env import Env
 from lbry.wallet.server.coin import LBC
 from lbry.wallet.server.leveldb import LevelDB
-from lbry.wallet.server.db.elasticsearch.search import extract_doc, SearchIndex, IndexVersionMismatch
+from lbry.wallet.server.db.elasticsearch.search import SearchIndex, IndexVersionMismatch
+from lbry.wallet.server.db.elasticsearch.constants import ALL_FIELDS


 async def get_all_claims(index_name='claims', db=None):
@@ -18,7 +19,13 @@ async def get_all_claims(index_name='claims', db=None):
     try:
         cnt = 0
         async for claim in db.all_claims_producer():
-            yield extract_doc(claim, index_name)
+            yield {
+                'doc': {key: value for key, value in claim.items() if key in ALL_FIELDS},
+                '_id': claim['claim_id'],
+                '_index': index_name,
+                '_op_type': 'update',
+                'doc_as_upsert': True
+            }
             cnt += 1
             if cnt % 10000 == 0:
                 print(f"{cnt} claims sent")

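The get_all_claims generator feeds straight into the async_bulk helper imported at the top of this file. A minimal sketch of that wiring (the client construction and function name here are illustrative):

    from elasticsearch import AsyncElasticsearch
    from elasticsearch.helpers import async_bulk

    async def sync_index():
        es = AsyncElasticsearch()
        try:
            # async_bulk accepts an async generator of action dicts like those yielded above
            await async_bulk(es, get_all_claims(index_name='claims'))
        finally:
            await es.close()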
View file

@@ -65,7 +65,7 @@ class FlushData:
     tip = attr.ib()


-OptionalResolveResultOrError = Optional[typing.Union[ResolveResult, LookupError, ValueError]]
+OptionalResolveResultOrError = Optional[typing.Union[ResolveResult, ResolveCensoredError, LookupError, ValueError]]

 DB_STATE_STRUCT = struct.Struct(b'>32sLL32sLLBBlll')
 DB_STATE_STRUCT_SIZE = 94
@@ -527,7 +527,7 @@ class LevelDB:
             output = self.coin.transaction(raw).outputs[nout]
             script = OutputScript(output.pk_script)
             script.parse()
-            return Claim.from_bytes(script.values['claim'])
+            return Claim.from_bytes(script.values['claim']), ''.join(chr(c) for c in script.values['claim_name'])
         except:
             self.logger.error(
                 "tx parsing for ES went boom %s %s", tx_hash[::-1].hex(),
@@ -546,6 +546,7 @@ class LevelDB:
         metadata = self.get_claim_metadata(claim.tx_hash, claim.position)
         if not metadata:
             return
+        metadata, non_normalized_name = metadata
         if not metadata.is_stream or not metadata.stream.has_fee:
             fee_amount = 0
         else:
@@ -564,6 +565,7 @@ class LevelDB:
         )
         if not reposted_metadata:
             return
+        reposted_metadata, _ = reposted_metadata
         reposted_tags = []
         reposted_languages = []
         reposted_has_source = None
@@ -632,10 +634,9 @@ class LevelDB:
             reposted_claim_hash) or self.filtered_channels.get(claim_hash) or self.filtered_channels.get(
             reposted_claim_hash) or self.filtered_channels.get(claim.channel_hash)
         value = {
-            'claim_hash': claim_hash[::-1],
-            # 'claim_id': claim_hash.hex(),
-            'claim_name': claim.name,
-            'normalized': claim.name,
+            'claim_id': claim_hash.hex(),
+            'claim_name': non_normalized_name,
+            'normalized_name': claim.name,
             'tx_id': claim.tx_hash[::-1].hex(),
             'tx_num': claim.tx_num,
             'tx_nout': claim.position,
@@ -648,7 +649,7 @@ class LevelDB:
             'expiration_height': claim.expiration_height,
             'effective_amount': claim.effective_amount,
             'support_amount': claim.support_amount,
-            'is_controlling': claim.is_controlling,
+            'is_controlling': bool(claim.is_controlling),
             'last_take_over_height': claim.last_takeover_height,
             'short_url': claim.short_url,
             'canonical_url': claim.canonical_url,
@@ -658,30 +659,26 @@ class LevelDB:
             'claim_type': CLAIM_TYPES[metadata.claim_type],
             'has_source': reposted_has_source if reposted_has_source is not None else (
                 False if not metadata.is_stream else metadata.stream.has_source),
-            'stream_type': None if not metadata.is_stream else STREAM_TYPES[
+            'stream_type': 0 if not metadata.is_stream else STREAM_TYPES[
                 guess_stream_type(metadata.stream.source.media_type)],
             'media_type': None if not metadata.is_stream else metadata.stream.source.media_type,
             'fee_amount': fee_amount,
             'fee_currency': None if not metadata.is_stream else metadata.stream.fee.currency,
-            'reposted': self.get_reposted_count(claim_hash),
-            'reposted_claim_hash': reposted_claim_hash,
+            'repost_count': self.get_reposted_count(claim_hash),
+            'reposted_claim_id': None if not reposted_claim_hash else reposted_claim_hash.hex(),
             'reposted_claim_type': reposted_claim_type,
             'reposted_has_source': reposted_has_source,
-            'channel_hash': metadata.signing_channel_hash,
-            'public_key_bytes': None if not metadata.is_channel else metadata.channel.public_key_bytes,
-            'public_key_hash': None if not metadata.is_channel else self.ledger.address_to_hash160(
-                self.ledger.public_key_to_address(metadata.channel.public_key_bytes)
-            ),
-            'signature': metadata.signature,
-            'signature_digest': None,  # TODO: fix
-            'signature_valid': claim.signature_valid,
+            'channel_id': None if not metadata.is_signed else metadata.signing_channel_hash[::-1].hex(),
+            'public_key_id': None if not metadata.is_channel else
+                self.ledger.public_key_to_address(metadata.channel.public_key_bytes),
+            'signature': (metadata.signature or b'').hex() or None,
+            # 'signature_digest': metadata.signature,
+            'is_signature_valid': bool(claim.signature_valid),
             'tags': tags,
             'languages': languages,
             'censor_type': Censor.RESOLVE if blocked_hash else Censor.SEARCH if filtered_hash else Censor.NOT_CENSORED,
-            'censoring_channel_hash': blocked_hash or filtered_hash or None,
+            'censoring_channel_id': (blocked_hash or filtered_hash or b'').hex() or None,
             'claims_in_channel': None if not metadata.is_channel else self.get_claims_in_channel_count(claim_hash)
             # 'trending_group': 0,
             # 'trending_mixed': 0,
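The document is now built in its Elasticsearch-ready form right here: ids become hex strings, flags become booleans, and empty byte values collapse to None, which is what made the separate extract_doc pass removable. A condensed illustration of those inline conversions (the hash value is made up):

    claim_hash = bytes.fromhex('ab' * 32)          # made-up 32-byte claim hash
    blocked_hash = None                            # no censoring channel in this example
    doc = {
        'claim_id': claim_hash.hex(),              # raw bytes -> hex string
        'is_controlling': bool(1),                 # int flag -> bool
        'stream_type': 0,                          # 0 instead of None for non-streams
        'censoring_channel_id': (blocked_hash or b'').hex() or None,  # b'' hexes to '' -> None
    }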
@@ -695,9 +692,7 @@ class LevelDB:
         return value

     async def all_claims_producer(self, batch_size=500_000):
-        loop = asyncio.get_event_loop()
         batch = []
-        tasks = []
         for claim_hash, v in self.db.iterator(prefix=Prefixes.claim_to_txo.prefix):
             # TODO: fix the couple of claim txos that dont have controlling names
             if not self.db.get(Prefixes.claim_takeover.pack_key(Prefixes.claim_to_txo.unpack_value(v).name)):