claim search fixes

This commit is contained in:
Jack Robison 2021-09-13 10:50:02 -04:00 committed by Victor Shyba
parent 0d19439982
commit c00912015a
5 changed files with 37 additions and 114 deletions

View file

@ -72,11 +72,13 @@ RANGE_FIELDS = {
ALL_FIELDS = RANGE_FIELDS | TEXT_FIELDS | FIELDS ALL_FIELDS = RANGE_FIELDS | TEXT_FIELDS | FIELDS
REPLACEMENTS = { REPLACEMENTS = {
'trending_mixed': 'trending_score', 'name': 'normalized',
'txid': 'tx_id', 'txid': 'tx_id',
'nout': 'tx_nout', 'nout': 'tx_nout',
'trending_mixed': 'trending_score',
'normalized_name': 'normalized', 'normalized_name': 'normalized',
'stream_types': 'stream_type', 'reposted': 'repost_count',
'media_types': 'media_type', # 'stream_types': 'stream_type',
'reposted': 'repost_count' # 'media_types': 'media_type',
'valid_channel_signature': 'is_signature_valid'
} }

View file

@ -231,7 +231,7 @@ class SearchIndex:
if not ok: if not ok:
self.logger.warning("updating trending failed for an item: %s", item) self.logger.warning("updating trending failed for an item: %s", item)
await self.sync_client.indices.refresh(self.index) await self.sync_client.indices.refresh(self.index)
self.logger.warning("updated trending scores in %ims", int((time.perf_counter() - start) * 1000)) self.logger.info("updated trending scores in %ims", int((time.perf_counter() - start) * 1000))
async def apply_filters(self, blocked_streams, blocked_channels, filtered_streams, filtered_channels): async def apply_filters(self, blocked_streams, blocked_channels, filtered_streams, filtered_channels):
if filtered_streams: if filtered_streams:
@ -341,24 +341,6 @@ class SearchIndex:
cache_item.result = result cache_item.result = result
return result return result
# async def resolve(self, *urls):
# censor = Censor(Censor.RESOLVE)
# results = [await self.resolve_url(url) for url in urls]
# # just heat the cache
# await self.populate_claim_cache(*filter(lambda x: isinstance(x, str), results))
# results = [self._get_from_cache_or_error(url, result) for url, result in zip(urls, results)]
#
# censored = [
# result if not isinstance(result, dict) or not censor.censor(result)
# else ResolveCensoredError(url, result['censoring_channel_hash'])
# for url, result in zip(urls, results)
# ]
# return results, censored, censor
def _get_from_cache_or_error(self, url: str, resolution: Union[LookupError, StreamResolution, ChannelResolution]):
cached = self.claim_cache.get(resolution)
return cached or (resolution if isinstance(resolution, LookupError) else resolution.lookup_error(url))
async def get_many(self, *claim_ids): async def get_many(self, *claim_ids):
await self.populate_claim_cache(*claim_ids) await self.populate_claim_cache(*claim_ids)
return filter(None, map(self.claim_cache.get, claim_ids)) return filter(None, map(self.claim_cache.get, claim_ids))
@ -389,10 +371,6 @@ class SearchIndex:
return self.short_id_cache.get(key, None) return self.short_id_cache.get(key, None)
async def search(self, **kwargs): async def search(self, **kwargs):
if 'channel' in kwargs:
kwargs['channel_id'] = await self.resolve_url(kwargs.pop('channel'))
if not kwargs['channel_id'] or not isinstance(kwargs['channel_id'], str):
return [], 0, 0
try: try:
return await self.search_ahead(**kwargs) return await self.search_ahead(**kwargs)
except NotFoundError: except NotFoundError:
@ -477,78 +455,6 @@ class SearchIndex:
next_page_hits_maybe_check_later.append((hit_id, hit_channel_id)) next_page_hits_maybe_check_later.append((hit_id, hit_channel_id))
return reordered_hits return reordered_hits
async def resolve_url(self, raw_url):
if raw_url not in self.resolution_cache:
self.resolution_cache[raw_url] = await self._resolve_url(raw_url)
return self.resolution_cache[raw_url]
async def _resolve_url(self, raw_url):
try:
url = URL.parse(raw_url)
except ValueError as e:
return e
stream = LookupError(f'Could not find claim at "{raw_url}".')
channel_id = await self.resolve_channel_id(url)
if isinstance(channel_id, LookupError):
return channel_id
stream = (await self.resolve_stream(url, channel_id if isinstance(channel_id, str) else None)) or stream
if url.has_stream:
return StreamResolution(stream)
else:
return ChannelResolution(channel_id)
async def resolve_channel_id(self, url: URL):
if not url.has_channel:
return
if url.channel.is_fullid:
return url.channel.claim_id
if url.channel.is_shortid:
channel_id = await self.full_id_from_short_id(url.channel.name, url.channel.claim_id)
if not channel_id:
return LookupError(f'Could not find channel in "{url}".')
return channel_id
query = url.channel.to_dict()
if set(query) == {'name'}:
query['is_controlling'] = True
else:
query['order_by'] = ['^creation_height']
matches, _, _ = await self.search(**query, limit=1)
if matches:
channel_id = matches[0]['claim_id']
else:
return LookupError(f'Could not find channel in "{url}".')
return channel_id
async def resolve_stream(self, url: URL, channel_id: str = None):
if not url.has_stream:
return None
if url.has_channel and channel_id is None:
return None
query = url.stream.to_dict()
if url.stream.claim_id is not None:
if url.stream.is_fullid:
claim_id = url.stream.claim_id
else:
claim_id = await self.full_id_from_short_id(query['name'], query['claim_id'], channel_id)
return claim_id
if channel_id is not None:
if set(query) == {'name'}:
# temporarily emulate is_controlling for claims in channel
query['order_by'] = ['effective_amount', '^height']
else:
query['order_by'] = ['^channel_join']
query['channel_id'] = channel_id
query['signature_valid'] = True
elif set(query) == {'name'}:
query['is_controlling'] = True
matches, _, _ = await self.search(**query, limit=1)
if matches:
return matches[0]['claim_id']
async def _get_referenced_rows(self, txo_rows: List[dict]): async def _get_referenced_rows(self, txo_rows: List[dict]):
txo_rows = [row for row in txo_rows if isinstance(row, dict)] txo_rows = [row for row in txo_rows if isinstance(row, dict)]
referenced_ids = set(filter(None, map(itemgetter('reposted_claim_id'), txo_rows))) referenced_ids = set(filter(None, map(itemgetter('reposted_claim_id'), txo_rows)))

View file

@ -357,7 +357,8 @@ class LevelDB:
return return
return list(sorted(candidates, key=lambda item: item[1]))[0] return list(sorted(candidates, key=lambda item: item[1]))[0]
def _fs_resolve(self, url) -> typing.Tuple[OptionalResolveResultOrError, OptionalResolveResultOrError]: def _fs_resolve(self, url) -> typing.Tuple[OptionalResolveResultOrError, OptionalResolveResultOrError,
OptionalResolveResultOrError]:
try: try:
parsed = URL.parse(url) parsed = URL.parse(url)
except ValueError as e: except ValueError as e:
@ -374,7 +375,7 @@ class LevelDB:
if channel: if channel:
resolved_channel = self._resolve(channel.name, channel.claim_id, channel.amount_order) resolved_channel = self._resolve(channel.name, channel.claim_id, channel.amount_order)
if not resolved_channel: if not resolved_channel:
return None, LookupError(f'Could not find channel in "{url}".') return None, LookupError(f'Could not find channel in "{url}".'), None
if stream: if stream:
if resolved_channel: if resolved_channel:
stream_claim = self._resolve_claim_in_channel(resolved_channel.claim_hash, stream.normalized) stream_claim = self._resolve_claim_in_channel(resolved_channel.claim_hash, stream.normalized)
@ -386,8 +387,9 @@ class LevelDB:
if not channel and not resolved_channel and resolved_stream and resolved_stream.channel_hash: if not channel and not resolved_channel and resolved_stream and resolved_stream.channel_hash:
resolved_channel = self._fs_get_claim_by_hash(resolved_stream.channel_hash) resolved_channel = self._fs_get_claim_by_hash(resolved_stream.channel_hash)
if not resolved_stream: if not resolved_stream:
return LookupError(f'Could not find claim at "{url}".'), None return LookupError(f'Could not find claim at "{url}".'), None, None
repost = None
if resolved_stream or resolved_channel: if resolved_stream or resolved_channel:
claim_hash = resolved_stream.claim_hash if resolved_stream else resolved_channel.claim_hash claim_hash = resolved_stream.claim_hash if resolved_stream else resolved_channel.claim_hash
claim = resolved_stream if resolved_stream else resolved_channel claim = resolved_stream if resolved_stream else resolved_channel
@ -397,10 +399,13 @@ class LevelDB:
reposted_claim_hash) or self.blocked_channels.get(claim.channel_hash) reposted_claim_hash) or self.blocked_channels.get(claim.channel_hash)
if blocker_hash: if blocker_hash:
reason_row = self._fs_get_claim_by_hash(blocker_hash) reason_row = self._fs_get_claim_by_hash(blocker_hash)
return None, ResolveCensoredError(url, blocker_hash, censor_row=reason_row) return None, ResolveCensoredError(url, blocker_hash, censor_row=reason_row), None
return resolved_stream, resolved_channel if claim.reposted_claim_hash:
repost = self._fs_get_claim_by_hash(claim.reposted_claim_hash)
return resolved_stream, resolved_channel, repost
async def fs_resolve(self, url) -> typing.Tuple[OptionalResolveResultOrError, OptionalResolveResultOrError]: async def fs_resolve(self, url) -> typing.Tuple[OptionalResolveResultOrError, OptionalResolveResultOrError,
OptionalResolveResultOrError]:
return await asyncio.get_event_loop().run_in_executor(None, self._fs_resolve, url) return await asyncio.get_event_loop().run_in_executor(None, self._fs_resolve, url)
def _fs_get_claim_by_hash(self, claim_hash): def _fs_get_claim_by_hash(self, claim_hash):
@ -721,9 +726,9 @@ class LevelDB:
async def all_claims_producer(self, batch_size=500_000): async def all_claims_producer(self, batch_size=500_000):
batch = [] batch = []
for claim_hash, v in self.db.iterator(prefix=Prefixes.claim_to_txo.prefix): for claim_hash, claim_txo in self.claim_to_txo.items():
# TODO: fix the couple of claim txos that dont have controlling names # TODO: fix the couple of claim txos that dont have controlling names
if not self.db.get(Prefixes.claim_takeover.pack_key(Prefixes.claim_to_txo.unpack_value(v).normalized_name)): if not self.db.get(Prefixes.claim_takeover.pack_key(claim_txo.normalized_name)):
continue continue
claim = self._fs_get_claim_by_hash(claim_hash[1:]) claim = self._fs_get_claim_by_hash(claim_hash[1:])
if claim: if claim:

View file

@ -983,6 +983,12 @@ class LBRYElectrumX(SessionBase):
kwargs['release_time'] = format_release_time(kwargs.get('release_time')) kwargs['release_time'] = format_release_time(kwargs.get('release_time'))
try: try:
self.session_mgr.pending_query_metric.inc() self.session_mgr.pending_query_metric.inc()
if 'channel' in kwargs:
channel_url = kwargs.pop('channel')
_, channel_claim, _ = await self.db.fs_resolve(channel_url)
if not channel_claim or isinstance(channel_claim, (ResolveCensoredError, LookupError, ValueError)):
return Outputs.to_base64([], [], 0, None, None)
kwargs['channel_id'] = channel_claim.claim_hash.hex()
return await self.db.search_index.cached_search(kwargs) return await self.db.search_index.cached_search(kwargs)
except ConnectionTimeout: except ConnectionTimeout:
self.session_mgr.interrupt_count_metric.inc() self.session_mgr.interrupt_count_metric.inc()
@ -1000,7 +1006,7 @@ class LBRYElectrumX(SessionBase):
rows, extra = [], [] rows, extra = [], []
for url in urls: for url in urls:
self.session_mgr.urls_to_resolve_count_metric.inc() self.session_mgr.urls_to_resolve_count_metric.inc()
stream, channel = await self.db.fs_resolve(url) stream, channel, repost = await self.db.fs_resolve(url)
self.session_mgr.resolved_url_count_metric.inc() self.session_mgr.resolved_url_count_metric.inc()
if isinstance(channel, ResolveCensoredError): if isinstance(channel, ResolveCensoredError):
rows.append(channel) rows.append(channel)
@ -1011,12 +1017,16 @@ class LBRYElectrumX(SessionBase):
elif channel and not stream: elif channel and not stream:
rows.append(channel) rows.append(channel)
# print("resolved channel", channel.name.decode()) # print("resolved channel", channel.name.decode())
if repost:
extra.append(repost)
elif stream: elif stream:
# print("resolved stream", stream.name.decode()) # print("resolved stream", stream.name.decode())
rows.append(stream) rows.append(stream)
if channel: if channel:
# print("and channel", channel.name.decode()) # print("and channel", channel.name.decode())
extra.append(channel) extra.append(channel)
if repost:
extra.append(repost)
# print("claimtrie resolve %i rows %i extrat" % (len(rows), len(extra))) # print("claimtrie resolve %i rows %i extrat" % (len(rows), len(extra)))
return Outputs.to_base64(rows, extra, 0, None, None) return Outputs.to_base64(rows, extra, 0, None, None)

View file

@ -28,7 +28,7 @@ class BaseResolveTestCase(CommandTestCase):
async def assertNoClaimForName(self, name: str): async def assertNoClaimForName(self, name: str):
lbrycrd_winning = json.loads(await self.blockchain._cli_cmnd('getvalueforname', name)) lbrycrd_winning = json.loads(await self.blockchain._cli_cmnd('getvalueforname', name))
stream, channel = await self.conductor.spv_node.server.bp.db.fs_resolve(name) stream, channel, _ = await self.conductor.spv_node.server.bp.db.fs_resolve(name)
self.assertNotIn('claimId', lbrycrd_winning) self.assertNotIn('claimId', lbrycrd_winning)
if stream is not None: if stream is not None:
self.assertIsInstance(stream, LookupError) self.assertIsInstance(stream, LookupError)
@ -48,7 +48,7 @@ class BaseResolveTestCase(CommandTestCase):
async def assertMatchWinningClaim(self, name): async def assertMatchWinningClaim(self, name):
expected = json.loads(await self.blockchain._cli_cmnd('getvalueforname', name)) expected = json.loads(await self.blockchain._cli_cmnd('getvalueforname', name))
stream, channel = await self.conductor.spv_node.server.bp.db.fs_resolve(name) stream, channel, _ = await self.conductor.spv_node.server.bp.db.fs_resolve(name)
claim = stream if stream else channel claim = stream if stream else channel
claim_from_es = await self.conductor.spv_node.server.bp.db.search_index.search( claim_from_es = await self.conductor.spv_node.server.bp.db.search_index.search(
claim_id=claim.claim_hash.hex() claim_id=claim.claim_hash.hex()
@ -657,7 +657,7 @@ class ResolveClaimTakeovers(BaseResolveTestCase):
await self.generate(32 * 10 - 1) await self.generate(32 * 10 - 1)
self.assertEqual(1120, self.conductor.spv_node.server.bp.db.db_height) self.assertEqual(1120, self.conductor.spv_node.server.bp.db.db_height)
claim_id_B = (await self.stream_create(name, '20.0', allow_duplicate_name=True))['outputs'][0]['claim_id'] claim_id_B = (await self.stream_create(name, '20.0', allow_duplicate_name=True))['outputs'][0]['claim_id']
claim_B, _ = await self.conductor.spv_node.server.bp.db.fs_resolve(f"{name}:{claim_id_B}") claim_B, _, _ = await self.conductor.spv_node.server.bp.db.fs_resolve(f"{name}:{claim_id_B}")
self.assertEqual(1121, self.conductor.spv_node.server.bp.db.db_height) self.assertEqual(1121, self.conductor.spv_node.server.bp.db.db_height)
self.assertEqual(1131, claim_B.activation_height) self.assertEqual(1131, claim_B.activation_height)
await self.assertMatchClaimIsWinning(name, claim_id_A) await self.assertMatchClaimIsWinning(name, claim_id_A)
@ -674,7 +674,7 @@ class ResolveClaimTakeovers(BaseResolveTestCase):
# State: A(10+14) is controlling, B(20) is accepted, C(50) is accepted. # State: A(10+14) is controlling, B(20) is accepted, C(50) is accepted.
claim_id_C = (await self.stream_create(name, '50.0', allow_duplicate_name=True))['outputs'][0]['claim_id'] claim_id_C = (await self.stream_create(name, '50.0', allow_duplicate_name=True))['outputs'][0]['claim_id']
self.assertEqual(1123, self.conductor.spv_node.server.bp.db.db_height) self.assertEqual(1123, self.conductor.spv_node.server.bp.db.db_height)
claim_C, _ = await self.conductor.spv_node.server.bp.db.fs_resolve(f"{name}:{claim_id_C}") claim_C, _, _ = await self.conductor.spv_node.server.bp.db.fs_resolve(f"{name}:{claim_id_C}")
self.assertEqual(1133, claim_C.activation_height) self.assertEqual(1133, claim_C.activation_height)
await self.assertMatchClaimIsWinning(name, claim_id_A) await self.assertMatchClaimIsWinning(name, claim_id_A)
@ -692,7 +692,7 @@ class ResolveClaimTakeovers(BaseResolveTestCase):
# State: A(10+14) is controlling, B(20) is active, C(50) is accepted, D(300) is accepted. # State: A(10+14) is controlling, B(20) is active, C(50) is accepted, D(300) is accepted.
claim_id_D = (await self.stream_create(name, '300.0', allow_duplicate_name=True))['outputs'][0]['claim_id'] claim_id_D = (await self.stream_create(name, '300.0', allow_duplicate_name=True))['outputs'][0]['claim_id']
self.assertEqual(1132, self.conductor.spv_node.server.bp.db.db_height) self.assertEqual(1132, self.conductor.spv_node.server.bp.db.db_height)
claim_D, _ = await self.conductor.spv_node.server.bp.db.fs_resolve(f"{name}:{claim_id_D}") claim_D, _, _ = await self.conductor.spv_node.server.bp.db.fs_resolve(f"{name}:{claim_id_D}")
self.assertEqual(False, claim_D.is_controlling) self.assertEqual(False, claim_D.is_controlling)
self.assertEqual(801, claim_D.last_takeover_height) self.assertEqual(801, claim_D.last_takeover_height)
self.assertEqual(1142, claim_D.activation_height) self.assertEqual(1142, claim_D.activation_height)
@ -702,7 +702,7 @@ class ResolveClaimTakeovers(BaseResolveTestCase):
# State: A(10+14) is active, B(20) is active, C(50) is active, D(300) is controlling # State: A(10+14) is active, B(20) is active, C(50) is active, D(300) is controlling
await self.generate(1) await self.generate(1)
self.assertEqual(1133, self.conductor.spv_node.server.bp.db.db_height) self.assertEqual(1133, self.conductor.spv_node.server.bp.db.db_height)
claim_D, _ = await self.conductor.spv_node.server.bp.db.fs_resolve(f"{name}:{claim_id_D}") claim_D, _, _ = await self.conductor.spv_node.server.bp.db.fs_resolve(f"{name}:{claim_id_D}")
self.assertEqual(True, claim_D.is_controlling) self.assertEqual(True, claim_D.is_controlling)
self.assertEqual(1133, claim_D.last_takeover_height) self.assertEqual(1133, claim_D.last_takeover_height)
self.assertEqual(1133, claim_D.activation_height) self.assertEqual(1133, claim_D.activation_height)