diff --git a/lbry/wallet/server/db/elasticsearch/constants.py b/lbry/wallet/server/db/elasticsearch/constants.py index 3ba70f84d..5c33df82e 100644 --- a/lbry/wallet/server/db/elasticsearch/constants.py +++ b/lbry/wallet/server/db/elasticsearch/constants.py @@ -72,11 +72,13 @@ RANGE_FIELDS = { ALL_FIELDS = RANGE_FIELDS | TEXT_FIELDS | FIELDS REPLACEMENTS = { - 'trending_mixed': 'trending_score', + 'name': 'normalized', 'txid': 'tx_id', 'nout': 'tx_nout', + 'trending_mixed': 'trending_score', 'normalized_name': 'normalized', - 'stream_types': 'stream_type', - 'media_types': 'media_type', - 'reposted': 'repost_count' + 'reposted': 'repost_count', + # 'stream_types': 'stream_type', + # 'media_types': 'media_type', + 'valid_channel_signature': 'is_signature_valid' } diff --git a/lbry/wallet/server/db/elasticsearch/search.py b/lbry/wallet/server/db/elasticsearch/search.py index e98fdfb9f..342b63f4a 100644 --- a/lbry/wallet/server/db/elasticsearch/search.py +++ b/lbry/wallet/server/db/elasticsearch/search.py @@ -231,7 +231,7 @@ class SearchIndex: if not ok: self.logger.warning("updating trending failed for an item: %s", item) await self.sync_client.indices.refresh(self.index) - self.logger.warning("updated trending scores in %ims", int((time.perf_counter() - start) * 1000)) + self.logger.info("updated trending scores in %ims", int((time.perf_counter() - start) * 1000)) async def apply_filters(self, blocked_streams, blocked_channels, filtered_streams, filtered_channels): if filtered_streams: @@ -341,24 +341,6 @@ class SearchIndex: cache_item.result = result return result - # async def resolve(self, *urls): - # censor = Censor(Censor.RESOLVE) - # results = [await self.resolve_url(url) for url in urls] - # # just heat the cache - # await self.populate_claim_cache(*filter(lambda x: isinstance(x, str), results)) - # results = [self._get_from_cache_or_error(url, result) for url, result in zip(urls, results)] - # - # censored = [ - # result if not isinstance(result, dict) or not censor.censor(result) - # else ResolveCensoredError(url, result['censoring_channel_hash']) - # for url, result in zip(urls, results) - # ] - # return results, censored, censor - - def _get_from_cache_or_error(self, url: str, resolution: Union[LookupError, StreamResolution, ChannelResolution]): - cached = self.claim_cache.get(resolution) - return cached or (resolution if isinstance(resolution, LookupError) else resolution.lookup_error(url)) - async def get_many(self, *claim_ids): await self.populate_claim_cache(*claim_ids) return filter(None, map(self.claim_cache.get, claim_ids)) @@ -389,10 +371,6 @@ class SearchIndex: return self.short_id_cache.get(key, None) async def search(self, **kwargs): - if 'channel' in kwargs: - kwargs['channel_id'] = await self.resolve_url(kwargs.pop('channel')) - if not kwargs['channel_id'] or not isinstance(kwargs['channel_id'], str): - return [], 0, 0 try: return await self.search_ahead(**kwargs) except NotFoundError: @@ -477,78 +455,6 @@ class SearchIndex: next_page_hits_maybe_check_later.append((hit_id, hit_channel_id)) return reordered_hits - async def resolve_url(self, raw_url): - if raw_url not in self.resolution_cache: - self.resolution_cache[raw_url] = await self._resolve_url(raw_url) - return self.resolution_cache[raw_url] - - async def _resolve_url(self, raw_url): - try: - url = URL.parse(raw_url) - except ValueError as e: - return e - - stream = LookupError(f'Could not find claim at "{raw_url}".') - - channel_id = await self.resolve_channel_id(url) - if isinstance(channel_id, LookupError): - return channel_id - stream = (await self.resolve_stream(url, channel_id if isinstance(channel_id, str) else None)) or stream - if url.has_stream: - return StreamResolution(stream) - else: - return ChannelResolution(channel_id) - - async def resolve_channel_id(self, url: URL): - if not url.has_channel: - return - if url.channel.is_fullid: - return url.channel.claim_id - if url.channel.is_shortid: - channel_id = await self.full_id_from_short_id(url.channel.name, url.channel.claim_id) - if not channel_id: - return LookupError(f'Could not find channel in "{url}".') - return channel_id - - query = url.channel.to_dict() - if set(query) == {'name'}: - query['is_controlling'] = True - else: - query['order_by'] = ['^creation_height'] - matches, _, _ = await self.search(**query, limit=1) - if matches: - channel_id = matches[0]['claim_id'] - else: - return LookupError(f'Could not find channel in "{url}".') - return channel_id - - async def resolve_stream(self, url: URL, channel_id: str = None): - if not url.has_stream: - return None - if url.has_channel and channel_id is None: - return None - query = url.stream.to_dict() - if url.stream.claim_id is not None: - if url.stream.is_fullid: - claim_id = url.stream.claim_id - else: - claim_id = await self.full_id_from_short_id(query['name'], query['claim_id'], channel_id) - return claim_id - - if channel_id is not None: - if set(query) == {'name'}: - # temporarily emulate is_controlling for claims in channel - query['order_by'] = ['effective_amount', '^height'] - else: - query['order_by'] = ['^channel_join'] - query['channel_id'] = channel_id - query['signature_valid'] = True - elif set(query) == {'name'}: - query['is_controlling'] = True - matches, _, _ = await self.search(**query, limit=1) - if matches: - return matches[0]['claim_id'] - async def _get_referenced_rows(self, txo_rows: List[dict]): txo_rows = [row for row in txo_rows if isinstance(row, dict)] referenced_ids = set(filter(None, map(itemgetter('reposted_claim_id'), txo_rows))) diff --git a/lbry/wallet/server/leveldb.py b/lbry/wallet/server/leveldb.py index f39b5ceda..304855d4c 100644 --- a/lbry/wallet/server/leveldb.py +++ b/lbry/wallet/server/leveldb.py @@ -357,7 +357,8 @@ class LevelDB: return return list(sorted(candidates, key=lambda item: item[1]))[0] - def _fs_resolve(self, url) -> typing.Tuple[OptionalResolveResultOrError, OptionalResolveResultOrError]: + def _fs_resolve(self, url) -> typing.Tuple[OptionalResolveResultOrError, OptionalResolveResultOrError, + OptionalResolveResultOrError]: try: parsed = URL.parse(url) except ValueError as e: @@ -374,7 +375,7 @@ class LevelDB: if channel: resolved_channel = self._resolve(channel.name, channel.claim_id, channel.amount_order) if not resolved_channel: - return None, LookupError(f'Could not find channel in "{url}".') + return None, LookupError(f'Could not find channel in "{url}".'), None if stream: if resolved_channel: stream_claim = self._resolve_claim_in_channel(resolved_channel.claim_hash, stream.normalized) @@ -386,8 +387,9 @@ class LevelDB: if not channel and not resolved_channel and resolved_stream and resolved_stream.channel_hash: resolved_channel = self._fs_get_claim_by_hash(resolved_stream.channel_hash) if not resolved_stream: - return LookupError(f'Could not find claim at "{url}".'), None + return LookupError(f'Could not find claim at "{url}".'), None, None + repost = None if resolved_stream or resolved_channel: claim_hash = resolved_stream.claim_hash if resolved_stream else resolved_channel.claim_hash claim = resolved_stream if resolved_stream else resolved_channel @@ -397,10 +399,13 @@ class LevelDB: reposted_claim_hash) or self.blocked_channels.get(claim.channel_hash) if blocker_hash: reason_row = self._fs_get_claim_by_hash(blocker_hash) - return None, ResolveCensoredError(url, blocker_hash, censor_row=reason_row) - return resolved_stream, resolved_channel + return None, ResolveCensoredError(url, blocker_hash, censor_row=reason_row), None + if claim.reposted_claim_hash: + repost = self._fs_get_claim_by_hash(claim.reposted_claim_hash) + return resolved_stream, resolved_channel, repost - async def fs_resolve(self, url) -> typing.Tuple[OptionalResolveResultOrError, OptionalResolveResultOrError]: + async def fs_resolve(self, url) -> typing.Tuple[OptionalResolveResultOrError, OptionalResolveResultOrError, + OptionalResolveResultOrError]: return await asyncio.get_event_loop().run_in_executor(None, self._fs_resolve, url) def _fs_get_claim_by_hash(self, claim_hash): @@ -721,9 +726,9 @@ class LevelDB: async def all_claims_producer(self, batch_size=500_000): batch = [] - for claim_hash, v in self.db.iterator(prefix=Prefixes.claim_to_txo.prefix): + for claim_hash, claim_txo in self.claim_to_txo.items(): # TODO: fix the couple of claim txos that dont have controlling names - if not self.db.get(Prefixes.claim_takeover.pack_key(Prefixes.claim_to_txo.unpack_value(v).normalized_name)): + if not self.db.get(Prefixes.claim_takeover.pack_key(claim_txo.normalized_name)): continue claim = self._fs_get_claim_by_hash(claim_hash[1:]) if claim: diff --git a/lbry/wallet/server/session.py b/lbry/wallet/server/session.py index 767261213..23b713d1a 100644 --- a/lbry/wallet/server/session.py +++ b/lbry/wallet/server/session.py @@ -983,6 +983,12 @@ class LBRYElectrumX(SessionBase): kwargs['release_time'] = format_release_time(kwargs.get('release_time')) try: self.session_mgr.pending_query_metric.inc() + if 'channel' in kwargs: + channel_url = kwargs.pop('channel') + _, channel_claim, _ = await self.db.fs_resolve(channel_url) + if not channel_claim or isinstance(channel_claim, (ResolveCensoredError, LookupError, ValueError)): + return Outputs.to_base64([], [], 0, None, None) + kwargs['channel_id'] = channel_claim.claim_hash.hex() return await self.db.search_index.cached_search(kwargs) except ConnectionTimeout: self.session_mgr.interrupt_count_metric.inc() @@ -1000,7 +1006,7 @@ class LBRYElectrumX(SessionBase): rows, extra = [], [] for url in urls: self.session_mgr.urls_to_resolve_count_metric.inc() - stream, channel = await self.db.fs_resolve(url) + stream, channel, repost = await self.db.fs_resolve(url) self.session_mgr.resolved_url_count_metric.inc() if isinstance(channel, ResolveCensoredError): rows.append(channel) @@ -1011,12 +1017,16 @@ class LBRYElectrumX(SessionBase): elif channel and not stream: rows.append(channel) # print("resolved channel", channel.name.decode()) + if repost: + extra.append(repost) elif stream: # print("resolved stream", stream.name.decode()) rows.append(stream) if channel: # print("and channel", channel.name.decode()) extra.append(channel) + if repost: + extra.append(repost) # print("claimtrie resolve %i rows %i extrat" % (len(rows), len(extra))) return Outputs.to_base64(rows, extra, 0, None, None) diff --git a/tests/integration/blockchain/test_resolve_command.py b/tests/integration/blockchain/test_resolve_command.py index 089628f69..637dec6cd 100644 --- a/tests/integration/blockchain/test_resolve_command.py +++ b/tests/integration/blockchain/test_resolve_command.py @@ -28,7 +28,7 @@ class BaseResolveTestCase(CommandTestCase): async def assertNoClaimForName(self, name: str): lbrycrd_winning = json.loads(await self.blockchain._cli_cmnd('getvalueforname', name)) - stream, channel = await self.conductor.spv_node.server.bp.db.fs_resolve(name) + stream, channel, _ = await self.conductor.spv_node.server.bp.db.fs_resolve(name) self.assertNotIn('claimId', lbrycrd_winning) if stream is not None: self.assertIsInstance(stream, LookupError) @@ -48,7 +48,7 @@ class BaseResolveTestCase(CommandTestCase): async def assertMatchWinningClaim(self, name): expected = json.loads(await self.blockchain._cli_cmnd('getvalueforname', name)) - stream, channel = await self.conductor.spv_node.server.bp.db.fs_resolve(name) + stream, channel, _ = await self.conductor.spv_node.server.bp.db.fs_resolve(name) claim = stream if stream else channel claim_from_es = await self.conductor.spv_node.server.bp.db.search_index.search( claim_id=claim.claim_hash.hex() @@ -657,7 +657,7 @@ class ResolveClaimTakeovers(BaseResolveTestCase): await self.generate(32 * 10 - 1) self.assertEqual(1120, self.conductor.spv_node.server.bp.db.db_height) claim_id_B = (await self.stream_create(name, '20.0', allow_duplicate_name=True))['outputs'][0]['claim_id'] - claim_B, _ = await self.conductor.spv_node.server.bp.db.fs_resolve(f"{name}:{claim_id_B}") + claim_B, _, _ = await self.conductor.spv_node.server.bp.db.fs_resolve(f"{name}:{claim_id_B}") self.assertEqual(1121, self.conductor.spv_node.server.bp.db.db_height) self.assertEqual(1131, claim_B.activation_height) await self.assertMatchClaimIsWinning(name, claim_id_A) @@ -674,7 +674,7 @@ class ResolveClaimTakeovers(BaseResolveTestCase): # State: A(10+14) is controlling, B(20) is accepted, C(50) is accepted. claim_id_C = (await self.stream_create(name, '50.0', allow_duplicate_name=True))['outputs'][0]['claim_id'] self.assertEqual(1123, self.conductor.spv_node.server.bp.db.db_height) - claim_C, _ = await self.conductor.spv_node.server.bp.db.fs_resolve(f"{name}:{claim_id_C}") + claim_C, _, _ = await self.conductor.spv_node.server.bp.db.fs_resolve(f"{name}:{claim_id_C}") self.assertEqual(1133, claim_C.activation_height) await self.assertMatchClaimIsWinning(name, claim_id_A) @@ -692,7 +692,7 @@ class ResolveClaimTakeovers(BaseResolveTestCase): # State: A(10+14) is controlling, B(20) is active, C(50) is accepted, D(300) is accepted. claim_id_D = (await self.stream_create(name, '300.0', allow_duplicate_name=True))['outputs'][0]['claim_id'] self.assertEqual(1132, self.conductor.spv_node.server.bp.db.db_height) - claim_D, _ = await self.conductor.spv_node.server.bp.db.fs_resolve(f"{name}:{claim_id_D}") + claim_D, _, _ = await self.conductor.spv_node.server.bp.db.fs_resolve(f"{name}:{claim_id_D}") self.assertEqual(False, claim_D.is_controlling) self.assertEqual(801, claim_D.last_takeover_height) self.assertEqual(1142, claim_D.activation_height) @@ -702,7 +702,7 @@ class ResolveClaimTakeovers(BaseResolveTestCase): # State: A(10+14) is active, B(20) is active, C(50) is active, D(300) is controlling await self.generate(1) self.assertEqual(1133, self.conductor.spv_node.server.bp.db.db_height) - claim_D, _ = await self.conductor.spv_node.server.bp.db.fs_resolve(f"{name}:{claim_id_D}") + claim_D, _, _ = await self.conductor.spv_node.server.bp.db.fs_resolve(f"{name}:{claim_id_D}") self.assertEqual(True, claim_D.is_controlling) self.assertEqual(1133, claim_D.last_takeover_height) self.assertEqual(1133, claim_D.activation_height)