fix merge conflicts and simplify extract_doc
This commit is contained in:
parent
2d48e93f74
commit
54461dfa75
5 changed files with 47 additions and 62 deletions
|
@ -252,9 +252,10 @@ class ResolveTimeoutError(WalletError):
|
|||
|
||||
class ResolveCensoredError(WalletError):
|
||||
|
||||
def __init__(self, url, censor_id):
|
||||
def __init__(self, url, censor_id, censor_row):
|
||||
self.url = url
|
||||
self.censor_id = censor_id
|
||||
self.censor_row = censor_row
|
||||
super().__init__(f"Resolve of '{url}' was censored by channel with claim id '{censor_id}'.")
|
||||
|
||||
|
||||
|
|
|
@ -44,7 +44,7 @@ class Censor:
|
|||
|
||||
def censor(self, row) -> Optional[bytes]:
|
||||
if self.is_censored(row):
|
||||
censoring_channel_hash = row['censoring_channel_hash']
|
||||
censoring_channel_hash = bytes.fromhex(row['censoring_channel_id'])[::-1]
|
||||
self.censored.setdefault(censoring_channel_hash, set())
|
||||
self.censored[censoring_channel_hash].add(row['tx_hash'])
|
||||
return censoring_channel_hash
|
||||
|
@ -192,7 +192,7 @@ class Outputs:
|
|||
if row.reposted_claim_hash:
|
||||
set_reference(txo_message.claim.repost, row.reposted_claim_hash, extra_txo_rows)
|
||||
elif isinstance(row, ResolveCensoredError):
|
||||
set_reference(txo_message.error.blocked.channel, row.censor_hash, extra_txo_rows)
|
||||
set_reference(txo_message.error.blocked.channel, row.censor_id, extra_txo_rows)
|
||||
return page.SerializeToString()
|
||||
|
||||
@classmethod
|
||||
|
|
|
@ -106,9 +106,19 @@ class SearchIndex:
|
|||
count = 0
|
||||
async for op, doc in claim_producer:
|
||||
if op == 'delete':
|
||||
yield {'_index': self.index, '_op_type': 'delete', '_id': doc}
|
||||
yield {
|
||||
'_index': self.index,
|
||||
'_op_type': 'delete',
|
||||
'_id': doc
|
||||
}
|
||||
else:
|
||||
yield extract_doc(doc, self.index)
|
||||
yield {
|
||||
'doc': {key: value for key, value in doc.items() if key in ALL_FIELDS},
|
||||
'_id': doc['claim_id'],
|
||||
'_index': self.index,
|
||||
'_op_type': 'update',
|
||||
'doc_as_upsert': True
|
||||
}
|
||||
count += 1
|
||||
if count % 100 == 0:
|
||||
self.logger.debug("Indexing in progress, %d claims.", count)
|
||||
|
@ -474,34 +484,6 @@ class SearchIndex:
|
|||
return referenced_txos
|
||||
|
||||
|
||||
def extract_doc(doc, index):
|
||||
doc['claim_id'] = doc.pop('claim_hash')[::-1].hex()
|
||||
if doc['reposted_claim_hash'] is not None:
|
||||
doc['reposted_claim_id'] = doc.pop('reposted_claim_hash').hex()
|
||||
else:
|
||||
doc['reposted_claim_id'] = None
|
||||
channel_hash = doc.pop('channel_hash')
|
||||
doc['channel_id'] = channel_hash[::-1].hex() if channel_hash else channel_hash
|
||||
channel_hash = doc.pop('censoring_channel_hash')
|
||||
doc['censoring_channel_hash'] = channel_hash.hex() if channel_hash else channel_hash
|
||||
# txo_hash = doc.pop('txo_hash')
|
||||
# doc['tx_id'] = txo_hash[:32][::-1].hex()
|
||||
# doc['tx_nout'] = struct.unpack('<I', txo_hash[32:])[0]
|
||||
doc['repost_count'] = doc.pop('reposted')
|
||||
doc['is_controlling'] = bool(doc['is_controlling'])
|
||||
doc['signature'] = (doc.pop('signature') or b'').hex() or None
|
||||
doc['signature_digest'] = doc['signature']
|
||||
doc['public_key_bytes'] = (doc.pop('public_key_bytes') or b'').hex() or None
|
||||
doc['public_key_id'] = (doc.pop('public_key_hash') or b'').hex() or None
|
||||
doc['is_signature_valid'] = bool(doc['signature_valid'])
|
||||
doc['claim_type'] = doc.get('claim_type', 0) or 0
|
||||
doc['stream_type'] = int(doc.get('stream_type', 0) or 0)
|
||||
doc['has_source'] = bool(doc['has_source'])
|
||||
doc['normalized_name'] = doc.pop('normalized')
|
||||
doc = {key: value for key, value in doc.items() if key in ALL_FIELDS}
|
||||
return {'doc': doc, '_id': doc['claim_id'], '_index': index, '_op_type': 'update', 'doc_as_upsert': True}
|
||||
|
||||
|
||||
def expand_query(**kwargs):
|
||||
if "amount_order" in kwargs:
|
||||
kwargs["limit"] = 1
|
||||
|
@ -533,8 +515,8 @@ def expand_query(**kwargs):
|
|||
value = CLAIM_TYPES[value]
|
||||
else:
|
||||
value = [CLAIM_TYPES[claim_type] for claim_type in value]
|
||||
elif key == 'stream_type':
|
||||
value = STREAM_TYPES[value] if isinstance(value, str) else list(map(STREAM_TYPES.get, value))
|
||||
# elif key == 'stream_type':
|
||||
# value = STREAM_TYPES[value] if isinstance(value, str) else list(map(STREAM_TYPES.get, value))
|
||||
if key == '_id':
|
||||
if isinstance(value, Iterable):
|
||||
value = [item[::-1].hex() for item in value]
|
||||
|
@ -590,13 +572,13 @@ def expand_query(**kwargs):
|
|||
elif key == 'limit_claims_per_channel':
|
||||
collapse = ('channel_id.keyword', value)
|
||||
if kwargs.get('has_channel_signature'):
|
||||
query['must'].append({"exists": {"field": "signature_digest"}})
|
||||
query['must'].append({"exists": {"field": "signature"}})
|
||||
if 'signature_valid' in kwargs:
|
||||
query['must'].append({"term": {"is_signature_valid": bool(kwargs["signature_valid"])}})
|
||||
elif 'signature_valid' in kwargs:
|
||||
query.setdefault('should', [])
|
||||
query["minimum_should_match"] = 1
|
||||
query['should'].append({"bool": {"must_not": {"exists": {"field": "signature_digest"}}}})
|
||||
query['should'].append({"bool": {"must_not": {"exists": {"field": "signature"}}}})
|
||||
query['should'].append({"term": {"is_signature_valid": bool(kwargs["signature_valid"])}})
|
||||
if 'has_source' in kwargs:
|
||||
query.setdefault('should', [])
|
||||
|
|
|
@ -6,7 +6,8 @@ from elasticsearch.helpers import async_bulk
|
|||
from lbry.wallet.server.env import Env
|
||||
from lbry.wallet.server.coin import LBC
|
||||
from lbry.wallet.server.leveldb import LevelDB
|
||||
from lbry.wallet.server.db.elasticsearch.search import extract_doc, SearchIndex, IndexVersionMismatch
|
||||
from lbry.wallet.server.db.elasticsearch.search import SearchIndex, IndexVersionMismatch
|
||||
from lbry.wallet.server.db.elasticsearch.constants import ALL_FIELDS
|
||||
|
||||
|
||||
async def get_all_claims(index_name='claims', db=None):
|
||||
|
@ -18,7 +19,13 @@ async def get_all_claims(index_name='claims', db=None):
|
|||
try:
|
||||
cnt = 0
|
||||
async for claim in db.all_claims_producer():
|
||||
yield extract_doc(claim, index_name)
|
||||
yield {
|
||||
'doc': {key: value for key, value in claim.items() if key in ALL_FIELDS},
|
||||
'_id': claim['claim_id'],
|
||||
'_index': index_name,
|
||||
'_op_type': 'update',
|
||||
'doc_as_upsert': True
|
||||
}
|
||||
cnt += 1
|
||||
if cnt % 10000 == 0:
|
||||
print(f"{cnt} claims sent")
|
||||
|
|
|
@ -65,7 +65,7 @@ class FlushData:
|
|||
tip = attr.ib()
|
||||
|
||||
|
||||
OptionalResolveResultOrError = Optional[typing.Union[ResolveResult, LookupError, ValueError]]
|
||||
OptionalResolveResultOrError = Optional[typing.Union[ResolveResult, ResolveCensoredError, LookupError, ValueError]]
|
||||
|
||||
DB_STATE_STRUCT = struct.Struct(b'>32sLL32sLLBBlll')
|
||||
DB_STATE_STRUCT_SIZE = 94
|
||||
|
@ -527,7 +527,7 @@ class LevelDB:
|
|||
output = self.coin.transaction(raw).outputs[nout]
|
||||
script = OutputScript(output.pk_script)
|
||||
script.parse()
|
||||
return Claim.from_bytes(script.values['claim'])
|
||||
return Claim.from_bytes(script.values['claim']), ''.join(chr(c) for c in script.values['claim_name'])
|
||||
except:
|
||||
self.logger.error(
|
||||
"tx parsing for ES went boom %s %s", tx_hash[::-1].hex(),
|
||||
|
@ -546,6 +546,7 @@ class LevelDB:
|
|||
metadata = self.get_claim_metadata(claim.tx_hash, claim.position)
|
||||
if not metadata:
|
||||
return
|
||||
metadata, non_normalized_name = metadata
|
||||
if not metadata.is_stream or not metadata.stream.has_fee:
|
||||
fee_amount = 0
|
||||
else:
|
||||
|
@ -564,6 +565,7 @@ class LevelDB:
|
|||
)
|
||||
if not reposted_metadata:
|
||||
return
|
||||
reposted_metadata, _ = reposted_metadata
|
||||
reposted_tags = []
|
||||
reposted_languages = []
|
||||
reposted_has_source = None
|
||||
|
@ -632,10 +634,9 @@ class LevelDB:
|
|||
reposted_claim_hash) or self.filtered_channels.get(claim_hash) or self.filtered_channels.get(
|
||||
reposted_claim_hash) or self.filtered_channels.get(claim.channel_hash)
|
||||
value = {
|
||||
'claim_hash': claim_hash[::-1],
|
||||
# 'claim_id': claim_hash.hex(),
|
||||
'claim_name': claim.name,
|
||||
'normalized': claim.name,
|
||||
'claim_id': claim_hash.hex(),
|
||||
'claim_name': non_normalized_name,
|
||||
'normalized_name': claim.name,
|
||||
'tx_id': claim.tx_hash[::-1].hex(),
|
||||
'tx_num': claim.tx_num,
|
||||
'tx_nout': claim.position,
|
||||
|
@ -648,7 +649,7 @@ class LevelDB:
|
|||
'expiration_height': claim.expiration_height,
|
||||
'effective_amount': claim.effective_amount,
|
||||
'support_amount': claim.support_amount,
|
||||
'is_controlling': claim.is_controlling,
|
||||
'is_controlling': bool(claim.is_controlling),
|
||||
'last_take_over_height': claim.last_takeover_height,
|
||||
'short_url': claim.short_url,
|
||||
'canonical_url': claim.canonical_url,
|
||||
|
@ -658,30 +659,26 @@ class LevelDB:
|
|||
'claim_type': CLAIM_TYPES[metadata.claim_type],
|
||||
'has_source': reposted_has_source if reposted_has_source is not None else (
|
||||
False if not metadata.is_stream else metadata.stream.has_source),
|
||||
'stream_type': None if not metadata.is_stream else STREAM_TYPES[
|
||||
'stream_type': 0 if not metadata.is_stream else STREAM_TYPES[
|
||||
guess_stream_type(metadata.stream.source.media_type)],
|
||||
'media_type': None if not metadata.is_stream else metadata.stream.source.media_type,
|
||||
'fee_amount': fee_amount,
|
||||
'fee_currency': None if not metadata.is_stream else metadata.stream.fee.currency,
|
||||
|
||||
'reposted': self.get_reposted_count(claim_hash),
|
||||
'reposted_claim_hash': reposted_claim_hash,
|
||||
'repost_count': self.get_reposted_count(claim_hash),
|
||||
'reposted_claim_id': None if not reposted_claim_hash else reposted_claim_hash.hex(),
|
||||
'reposted_claim_type': reposted_claim_type,
|
||||
'reposted_has_source': reposted_has_source,
|
||||
|
||||
'channel_hash': metadata.signing_channel_hash,
|
||||
|
||||
'public_key_bytes': None if not metadata.is_channel else metadata.channel.public_key_bytes,
|
||||
'public_key_hash': None if not metadata.is_channel else self.ledger.address_to_hash160(
|
||||
self.ledger.public_key_to_address(metadata.channel.public_key_bytes)
|
||||
),
|
||||
'signature': metadata.signature,
|
||||
'signature_digest': None, # TODO: fix
|
||||
'signature_valid': claim.signature_valid,
|
||||
'channel_id': None if not metadata.is_signed else metadata.signing_channel_hash[::-1].hex(),
|
||||
'public_key_id': None if not metadata.is_channel else
|
||||
self.ledger.public_key_to_address(metadata.channel.public_key_bytes),
|
||||
'signature': (metadata.signature or b'').hex() or None,
|
||||
# 'signature_digest': metadata.signature,
|
||||
'is_signature_valid': bool(claim.signature_valid),
|
||||
'tags': tags,
|
||||
'languages': languages,
|
||||
'censor_type': Censor.RESOLVE if blocked_hash else Censor.SEARCH if filtered_hash else Censor.NOT_CENSORED,
|
||||
'censoring_channel_hash': blocked_hash or filtered_hash or None,
|
||||
'censoring_channel_id': (blocked_hash or filtered_hash or b'').hex() or None,
|
||||
'claims_in_channel': None if not metadata.is_channel else self.get_claims_in_channel_count(claim_hash)
|
||||
# 'trending_group': 0,
|
||||
# 'trending_mixed': 0,
|
||||
|
@ -695,9 +692,7 @@ class LevelDB:
|
|||
return value
|
||||
|
||||
async def all_claims_producer(self, batch_size=500_000):
|
||||
loop = asyncio.get_event_loop()
|
||||
batch = []
|
||||
tasks = []
|
||||
for claim_hash, v in self.db.iterator(prefix=Prefixes.claim_to_txo.prefix):
|
||||
# TODO: fix the couple of claim txos that dont have controlling names
|
||||
if not self.db.get(Prefixes.claim_takeover.pack_key(Prefixes.claim_to_txo.unpack_value(v).name)):
|
||||
|
|
Loading…
Add table
Reference in a new issue