backport fixes from server

This commit is contained in:
Victor Shyba 2021-02-02 17:11:13 -03:00
parent e61874bb6f
commit bf44befff6
3 changed files with 24 additions and 7 deletions

View file

@ -51,6 +51,8 @@ class SearchIndex:
},
"height": {"type": "integer"},
"claim_type": {"type": "byte"},
"censor_type": {"type": "byte"},
"trending_mixed": {"type": "float"},
}
}
}
@ -270,18 +272,25 @@ def extract_doc(doc, index):
'doc_as_upsert': True}
FIELDS = ['is_controlling', 'last_take_over_height', 'claim_id', 'claim_name', 'normalized', 'tx_position', 'amount',
FIELDS = {'is_controlling', 'last_take_over_height', 'claim_id', 'claim_name', 'normalized', 'tx_position', 'amount',
'timestamp', 'creation_timestamp', 'height', 'creation_height', 'activation_height', 'expiration_height',
'release_time', 'short_url', 'canonical_url', 'title', 'author', 'description', 'claim_type', 'reposted',
'stream_type', 'media_type', 'fee_amount', 'fee_currency', 'duration', 'reposted_claim_hash', 'censor_type',
'claims_in_channel', 'channel_join', 'signature_valid', 'effective_amount', 'support_amount',
'trending_group', 'trending_mixed', 'trending_local', 'trending_global', 'channel_id', 'tx_id', 'tx_nout',
'signature', 'signature_digest', 'public_key_bytes', 'public_key_hash', 'public_key_id', '_id', 'tags',
'reposted_claim_id']
TEXT_FIELDS = ['author', 'canonical_url', 'channel_id', 'claim_name', 'description',
'reposted_claim_id'}
TEXT_FIELDS = {'author', 'canonical_url', 'channel_id', 'claim_name', 'description',
'media_type', 'normalized', 'public_key_bytes', 'public_key_hash', 'short_url', 'signature',
'signature_digest', 'stream_type', 'title', 'tx_id', 'fee_currency', 'reposted_claim_id', 'tags']
RANGE_FIELDS = ['height', 'fee_amount', 'duration', 'reposted', 'release_time', 'censor_type']
'signature_digest', 'stream_type', 'title', 'tx_id', 'fee_currency', 'reposted_claim_id', 'tags'}
RANGE_FIELDS = {
'height', 'creation_height', 'activation_height', 'expiration_height',
'timestamp', 'creation_timestamp', 'duration', 'release_time', 'fee_amount',
'tx_position', 'channel_join', 'reposted', 'limit_claims_per_channel',
'amount', 'effective_amount', 'support_amount',
'trending_group', 'trending_mixed', 'censor_type',
'trending_local', 'trending_global',
}
REPLACEMENTS = {
'name': 'normalized',
'txid': 'tx_id',

View file

@ -1029,6 +1029,12 @@ class LBRYElectrumX(SessionBase):
self.session_mgr.executor_time_metric.observe(time.perf_counter() - start)
async def run_and_cache_query(self, query_name, function, kwargs):
if isinstance(kwargs, dict) and 'trending_mixed' in kwargs.get('order_by', {}):
# fixme: trending_mixed is 0 for all records on variable decay, making sort slow.
# also, release_time isnt releavant when sorting by trending but it makes cache bad
if 'release_time' in kwargs:
kwargs.pop('release_time')
kwargs['order_by'] = ['trending_mixed']
metrics = self.get_metrics_or_placeholder_for_api(query_name)
metrics.start()
cache = self.session_mgr.search_cache[query_name]

View file

@ -19,7 +19,7 @@ async def get_all(db, shard_num, shards_total):
return True
db.setexectrace(exec_factory)
total = db.execute(f"select count(*) as total from claim where rowid % {shards_total} = {shard_num};").fetchone()[0]
total = db.execute(f"select count(*) as total from claim where height % {shards_total} = {shard_num};").fetchone()[0]
for num, claim in enumerate(db.execute(f"""
SELECT claimtrie.claim_hash as is_controlling,
claimtrie.last_take_over_height,
@ -27,7 +27,7 @@ SELECT claimtrie.claim_hash as is_controlling,
(select group_concat(language, ' ') from language where language.claim_hash in (claim.claim_hash, claim.reposted_claim_hash)) as languages,
claim.*
FROM claim LEFT JOIN claimtrie USING (claim_hash)
WHERE claim.rowid % {shards_total} = {shard_num}
WHERE claim.height % {shards_total} = {shard_num}
""")):
claim = dict(claim._asdict())
claim['censor_type'] = 0
@ -47,6 +47,8 @@ async def consume(producer):
async def run(args, shard):
db = apsw.Connection(args.db_path, flags=apsw.SQLITE_OPEN_READONLY | apsw.SQLITE_OPEN_URI)
db.cursor().execute('pragma journal_mode=wal;')
db.cursor().execute('pragma temp_store=memory;')
index = SearchIndex('')
await index.start()
await index.stop()