diff --git a/scribe/db/common.py b/scribe/db/common.py index 5ca52ab..3cf530e 100644 --- a/scribe/db/common.py +++ b/scribe/db/common.py @@ -496,6 +496,12 @@ class ResolveResult(typing.NamedTuple): channel_hash: typing.Optional[bytes] reposted_claim_hash: typing.Optional[bytes] signature_valid: typing.Optional[bool] + reposted_tx_hash: typing.Optional[bytes] + reposted_tx_position: typing.Optional[int] + reposted_height: typing.Optional[int] + channel_tx_hash: typing.Optional[bytes] + channel_tx_position: typing.Optional[int] + channel_height: typing.Optional[int] class TrendingNotification(typing.NamedTuple): diff --git a/scribe/db/db.py b/scribe/db/db.py index 44a0b20..cfa9819 100644 --- a/scribe/db/db.py +++ b/scribe/db/db.py @@ -168,7 +168,7 @@ class HubDB: height = bisect_right(self.tx_counts, tx_num) created_height = bisect_right(self.tx_counts, root_tx_num) last_take_over_height = controlling_claim.height expiration_height = self.coin.get_expiration_height(height) support_amount = self.get_support_amount(claim_hash) claim_amount = self.get_cached_claim_txo(claim_hash).amount @@ -176,9 +176,21 @@ class HubDB: effective_amount = self.get_effective_amount(claim_hash) channel_hash = self.get_channel_for_claim(claim_hash, tx_num, position) reposted_claim_hash = self.get_repost(claim_hash) + reposted_tx_hash = None + reposted_tx_position = None + reposted_height = None + if reposted_claim_hash: + repost_txo = self.get_cached_claim_txo(reposted_claim_hash) + if repost_txo: + reposted_tx_hash = self.get_tx_hash(repost_txo.tx_num) + reposted_tx_position = repost_txo.position + reposted_height = bisect_right(self.tx_counts, repost_txo.tx_num) short_url = self.get_short_claim_id_url(name, normalized_name, claim_hash, root_tx_num, root_position) canonical_url = short_url claims_in_channel = self.get_claims_in_channel_count(claim_hash) + channel_tx_hash = None + channel_tx_position = None + channel_height = None if channel_hash: channel_vals = 
self.get_cached_claim_txo(channel_hash) if channel_vals: @@ -187,6 +198,9 @@ class HubDB: channel_vals.root_position ) canonical_url = f'{channel_short_url}/{short_url}' + channel_tx_hash = self.get_tx_hash(channel_vals.tx_num) + channel_tx_position = channel_vals.position + channel_height = bisect_right(self.tx_counts, channel_vals.tx_num) return ResolveResult( name, normalized_name, claim_hash, tx_num, position, tx_hash, height, claim_amount, short_url=short_url, is_controlling=controlling_claim.claim_hash == claim_hash, canonical_url=canonical_url, @@ -195,7 +209,9 @@ class HubDB: expiration_height=expiration_height, effective_amount=effective_amount, support_amount=support_amount, channel_hash=channel_hash, reposted_claim_hash=reposted_claim_hash, reposted=self.get_reposted_count(claim_hash), - signature_valid=None if not channel_hash else signature_valid + signature_valid=None if not channel_hash else signature_valid, reposted_tx_hash=reposted_tx_hash, + reposted_tx_position=reposted_tx_position, reposted_height=reposted_height, + channel_tx_hash=channel_tx_hash, channel_tx_position=channel_tx_position, channel_height=channel_height, ) def _resolve_parsed_url(self, name: str, claim_id: Optional[str] = None, @@ -396,16 +412,6 @@ class HubDB: return 0 return channel_count_val.count - async def reload_blocking_filtering_streams(self): - def reload(): - self.blocked_streams, self.blocked_channels = self.get_streams_and_channels_reposted_by_channel_hashes( - self.blocking_channel_hashes - ) - self.filtered_streams, self.filtered_channels = self.get_streams_and_channels_reposted_by_channel_hashes( - self.filtering_channel_hashes - ) - await asyncio.get_event_loop().run_in_executor(self._executor, reload) - def get_streams_and_channels_reposted_by_channel_hashes(self, reposter_channel_hashes: Set[bytes]): streams, channels = {}, {} for reposter_channel_hash in reposter_channel_hashes: @@ -474,7 +480,7 @@ class HubDB: fee_amount = int(max(metadata.stream.fee.amount or 
0, 0) * 1000) if fee_amount >= 9223372036854775807: return - reposted_claim_hash = None if not metadata.is_repost else metadata.repost.reference.claim_hash[::-1] + reposted_claim_hash = claim.reposted_claim_hash reposted_claim = None reposted_metadata = None if reposted_claim_hash: @@ -496,15 +502,14 @@ class HubDB: reposted_fee_currency = None reposted_duration = None if reposted_claim: - reposted_tx_hash = self.get_tx_hash(reposted_claim.tx_num) - raw_reposted_claim_tx = self.prefix_db.tx.get(reposted_tx_hash, deserialize_value=False) + raw_reposted_claim_tx = self.prefix_db.tx.get(claim.reposted_tx_hash, deserialize_value=False) try: reposted_metadata = self.coin.transaction( raw_reposted_claim_tx ).outputs[reposted_claim.position].metadata except: self.logger.error("failed to parse reposted claim in tx %s that was reposted by %s", - reposted_tx_hash[::-1].hex(), claim_hash.hex()) + claim.reposted_claim_hash.hex(), claim_hash.hex()) return if reposted_metadata: if reposted_metadata.is_stream: @@ -605,7 +610,13 @@ class HubDB: 'languages': languages, 'censor_type': Censor.RESOLVE if blocked_hash else Censor.SEARCH if filtered_hash else Censor.NOT_CENSORED, 'censoring_channel_id': (blocked_hash or filtered_hash or b'').hex() or None, - 'claims_in_channel': None if not metadata.is_channel else self.get_claims_in_channel_count(claim_hash) + 'claims_in_channel': None if not metadata.is_channel else self.get_claims_in_channel_count(claim_hash), + 'reposted_tx_id': None if not claim.reposted_tx_hash else claim.reposted_tx_hash[::-1].hex(), + 'reposted_tx_position': claim.reposted_tx_position, + 'reposted_height': claim.reposted_height, + 'channel_tx_id': None if not claim.channel_tx_hash else claim.channel_tx_hash[::-1].hex(), + 'channel_tx_position': claim.channel_tx_position, + 'channel_height': claim.channel_height, } if metadata.is_repost and reposted_duration is not None: diff --git a/scribe/elasticsearch/constants.py b/scribe/elasticsearch/constants.py index 
afdfd6f..10193ef 100644 --- a/scribe/elasticsearch/constants.py +++ b/scribe/elasticsearch/constants.py @@ -66,20 +66,25 @@ FIELDS = { 'duration', 'release_time', 'tags', 'languages', 'has_source', 'reposted_claim_type', 'reposted_claim_id', 'repost_count', 'sd_hash', - 'trending_score', 'tx_num' + 'trending_score', 'tx_num', + 'channel_tx_id', 'channel_tx_position', 'channel_height', 'reposted_tx_id', + 'reposted_tx_position', 'reposted_height', } -TEXT_FIELDS = {'author', 'canonical_url', 'channel_id', 'description', 'claim_id', 'censoring_channel_id', - 'media_type', 'normalized_name', 'public_key_bytes', 'public_key_id', 'short_url', 'signature', - 'claim_name', 'signature_digest', 'title', 'tx_id', 'fee_currency', 'reposted_claim_id', - 'tags', 'sd_hash'} +TEXT_FIELDS = { + 'author', 'canonical_url', 'channel_id', 'description', 'claim_id', 'censoring_channel_id', + 'media_type', 'normalized_name', 'public_key_bytes', 'public_key_id', 'short_url', 'signature', + 'claim_name', 'signature_digest', 'title', 'tx_id', 'fee_currency', 'reposted_claim_id', + 'tags', 'sd_hash', 'channel_tx_id', 'reposted_tx_id', +} RANGE_FIELDS = { 'height', 'creation_height', 'activation_height', 'expiration_height', 'timestamp', 'creation_timestamp', 'duration', 'release_time', 'fee_amount', 'tx_position', 'repost_count', 'limit_claims_per_channel', 'amount', 'effective_amount', 'support_amount', - 'trending_score', 'censor_type', 'tx_num' + 'trending_score', 'censor_type', 'tx_num', 'reposted_tx_position', 'reposted_height', + 'channel_tx_position', 'channel_height', } ALL_FIELDS = RANGE_FIELDS | TEXT_FIELDS | FIELDS diff --git a/scribe/elasticsearch/search.py b/scribe/elasticsearch/search.py index 5089464..70785c7 100644 --- a/scribe/elasticsearch/search.py +++ b/scribe/elasticsearch/search.py @@ -472,6 +472,38 @@ class SearchIndex: self.search_cache.clear() self.claim_cache.clear() + def _make_resolve_result(self, es_result): + return ResolveResult( + 
name=es_result['claim_name'], + normalized_name=es_result['normalized_name'], + claim_hash=es_result['claim_hash'], + tx_num=es_result['tx_num'], + position=es_result['tx_nout'], + tx_hash=es_result['tx_hash'], + height=es_result['height'], + amount=es_result['amount'], + short_url=es_result['short_url'], + is_controlling=es_result['is_controlling'], + canonical_url=es_result['canonical_url'], + creation_height=es_result['creation_height'], + activation_height=es_result['activation_height'], + expiration_height=es_result['expiration_height'], + effective_amount=es_result['effective_amount'], + support_amount=es_result['support_amount'], + last_takeover_height=es_result['last_take_over_height'], + claims_in_channel=es_result['claims_in_channel'], + channel_hash=es_result['channel_hash'], + reposted_claim_hash=es_result['reposted_claim_hash'], + reposted=es_result['reposted'], + signature_valid=es_result['signature_valid'], + reposted_tx_hash=bytes.fromhex(es_result['reposted_tx_id'] or '')[::-1] or None, + reposted_tx_position=es_result['reposted_tx_position'], + reposted_height=es_result['reposted_height'], + channel_tx_hash=bytes.fromhex(es_result['channel_tx_id'] or '')[::-1] or None, + channel_tx_position=es_result['channel_tx_position'], + channel_height=es_result['channel_height'], + ) + async def cached_search(self, kwargs): total_referenced = [] cache_item = ResultCacheItem.from_cache(str(kwargs), self.search_cache) @@ -481,68 +513,15 @@ class SearchIndex: if cache_item.result: return cache_item.result censor = Censor(Censor.SEARCH) - if kwargs.get('no_totals'): - response, offset, total = await self.search(**kwargs, censor_type=Censor.NOT_CENSORED) - else: - response, offset, total = await self.search(**kwargs) + response, offset, total = await self.search(**kwargs) censor.apply(response) total_referenced.extend(response) if censor.censored: response, _, _ = await self.search(**kwargs, censor_type=Censor.NOT_CENSORED) total_referenced.extend(response) - 
response = [ - ResolveResult( - name=r['claim_name'], - normalized_name=r['normalized_name'], - claim_hash=r['claim_hash'], - tx_num=r['tx_num'], - position=r['tx_nout'], - tx_hash=r['tx_hash'], - height=r['height'], - amount=r['amount'], - short_url=r['short_url'], - is_controlling=r['is_controlling'], - canonical_url=r['canonical_url'], - creation_height=r['creation_height'], - activation_height=r['activation_height'], - expiration_height=r['expiration_height'], - effective_amount=r['effective_amount'], - support_amount=r['support_amount'], - last_takeover_height=r['last_take_over_height'], - claims_in_channel=r['claims_in_channel'], - channel_hash=r['channel_hash'], - reposted_claim_hash=r['reposted_claim_hash'], - reposted=r['reposted'], - signature_valid=r['signature_valid'] - ) for r in response - ] - extra = [ - ResolveResult( - name=r['claim_name'], - normalized_name=r['normalized_name'], - claim_hash=r['claim_hash'], - tx_num=r['tx_num'], - position=r['tx_nout'], - tx_hash=r['tx_hash'], - height=r['height'], - amount=r['amount'], - short_url=r['short_url'], - is_controlling=r['is_controlling'], - canonical_url=r['canonical_url'], - creation_height=r['creation_height'], - activation_height=r['activation_height'], - expiration_height=r['expiration_height'], - effective_amount=r['effective_amount'], - support_amount=r['support_amount'], - last_takeover_height=r['last_take_over_height'], - claims_in_channel=r['claims_in_channel'], - channel_hash=r['channel_hash'], - reposted_claim_hash=r['reposted_claim_hash'], - reposted=r['reposted'], - signature_valid=r['signature_valid'] - ) for r in await self._get_referenced_rows(total_referenced) - ] + response = [self._make_resolve_result(r) for r in response] + extra = [self._make_resolve_result(r) for r in await self._get_referenced_rows(total_referenced)] result = Outputs.to_base64( response, extra, offset, total, censor ) diff --git a/scribe/hub/session.py b/scribe/hub/session.py index e56dcfa..67f2295 100644 --- 
a/scribe/hub/session.py +++ b/scribe/hub/session.py @@ -1195,7 +1195,7 @@ class LBRYElectrumX(asyncio.Protocol): channel_url = kwargs.pop('channel') _, channel_claim, _, _ = await self.db.resolve(channel_url) if not channel_claim or isinstance(channel_claim, (ResolveCensoredError, LookupError, ValueError)): - return Outputs.to_base64([], [], 0, None, None) + return Outputs.to_base64([], []) kwargs['channel_id'] = channel_claim.claim_hash.hex() return await self.session_manager.search_index.cached_search(kwargs) except ConnectionTimeout: @@ -1232,6 +1232,12 @@ class LBRYElectrumX(asyncio.Protocol): elif isinstance(stream, ResolveCensoredError): rows.append(stream) extra.append(stream.censor_row) + elif isinstance(repost, ResolveCensoredError): + rows.append(repost) + extra.append(repost.censor_row) + elif isinstance(reposted_channel, ResolveCensoredError): + rows.append(reposted_channel) + extra.append(reposted_channel.censor_row) elif channel and not stream: rows.append(channel) # print("resolved channel", channel.name.decode()) @@ -1251,7 +1257,7 @@ class LBRYElectrumX(asyncio.Protocol): extra.append(reposted_channel) await asyncio.sleep(0) self.session_manager.resolve_outputs_cache[sorted_urls] = result = await self.loop.run_in_executor( - None, Outputs.to_base64, rows, extra, 0, None, None + None, Outputs.to_base64, rows, extra ) return result finally: @@ -1271,15 +1277,25 @@ class LBRYElectrumX(asyncio.Protocol): return await asyncio.get_event_loop().run_in_executor(self.db._executor, get_height) - async def claimtrie_getclaimbyid(self, claim_id): + def _getclaimbyid(self, claim_id: str): rows = [] extra = [] - stream = await self.db.fs_getclaimbyid(claim_id) - if not stream: - stream = LookupError(f"Could not find claim at {claim_id}") - rows.append(stream) + claim_hash = bytes.fromhex(claim_id) + stream = self.db._fs_get_claim_by_hash(claim_hash) + rows.append(stream or LookupError(f"Could not find claim at {claim_id}")) + if stream and stream.channel_hash: + 
channel = self.db._fs_get_claim_by_hash(stream.channel_hash) + extra.append(channel or LookupError(f"Could not find channel at {stream.channel_hash.hex()}")) + if stream and stream.reposted_claim_hash: + repost = self.db._fs_get_claim_by_hash(stream.reposted_claim_hash) + if repost: + extra.append(repost) return Outputs.to_base64(rows, extra, 0, None, None) + async def claimtrie_getclaimbyid(self, claim_id): + assert len(claim_id) == 40, f"{len(claim_id)}: '{claim_id}'" + return await self.loop.run_in_executor(None, self._getclaimbyid, claim_id) + def assert_tx_hash(self, value): '''Raise an RPCError if the value is not a valid transaction hash.''' diff --git a/scribe/reader/elastic_sync.py b/scribe/reader/elastic_sync.py index e858683..ee4acc1 100644 --- a/scribe/reader/elastic_sync.py +++ b/scribe/reader/elastic_sync.py @@ -274,7 +274,6 @@ class ElasticWriter(BaseBlockchainReader): else: success += 1 await self.sync_client.indices.refresh(self.index) - await self.db.reload_blocking_filtering_streams() await self.apply_filters( self.db.blocked_streams, self.db.blocked_channels, self.db.filtered_streams, self.db.filtered_channels @@ -316,6 +315,13 @@ class ElasticWriter(BaseBlockchainReader): self._force_reindex = reindex return super().run() + async def start(self, reindex=False): + self._force_reindex = reindex + try: + return await super().start() + finally: + self._force_reindex = False + async def _reindex(self): async with self._lock: self.log.info("reindexing %i claims (estimate)", self.db.prefix_db.claim_to_txo.estimate_num_keys()) diff --git a/scribe/schema/result.py b/scribe/schema/result.py index c361cfe..8d4865c 100644 --- a/scribe/schema/result.py +++ b/scribe/schema/result.py @@ -6,9 +6,8 @@ from itertools import chain from scribe.error import ResolveCensoredError from scribe.schema.types.v2.result_pb2 import Outputs as OutputsMessage from scribe.schema.types.v2.result_pb2 import Error as ErrorMessage -# if TYPE_CHECKING: -# from 
lbry_schema.schema.claim import ResolveResult - +if TYPE_CHECKING: + from scribe.db.common import ResolveResult INVALID = ErrorMessage.Code.Name(ErrorMessage.INVALID) NOT_FOUND = ErrorMessage.Code.Name(ErrorMessage.NOT_FOUND) BLOCKED = ErrorMessage.Code.Name(ErrorMessage.BLOCKED) @@ -50,38 +49,17 @@ class Censor: return censoring_channel_hash return None - def to_message(self, outputs: OutputsMessage, extra_txo_rows: dict): + def to_message(self, outputs: OutputsMessage, extra_txo_rows: List['ResolveResult']): for censoring_channel_hash, count in self.censored.items(): + outputs.blocked_total += len(count) blocked = outputs.blocked.add() blocked.count = len(count) - set_reference(blocked.channel, censoring_channel_hash, extra_txo_rows) - outputs.blocked_total += len(count) - - -class ResolveResult(NamedTuple): - name: str - normalized_name: str - claim_hash: bytes - tx_num: int - position: int - tx_hash: bytes - height: int - amount: int - short_url: str - is_controlling: bool - canonical_url: str - creation_height: int - activation_height: int - expiration_height: int - effective_amount: int - support_amount: int - reposted: int - last_takeover_height: Optional[int] - claims_in_channel: Optional[int] - channel_hash: Optional[bytes] - reposted_claim_hash: Optional[bytes] - signature_valid: Optional[bool] - + for resolve_result in extra_txo_rows: + if resolve_result.claim_hash == censoring_channel_hash: + blocked.channel.tx_hash = resolve_result.tx_hash + blocked.channel.nout = resolve_result.position + blocked.channel.height = resolve_result.height + break class Outputs: @@ -194,7 +172,7 @@ class Outputs: ) @classmethod - def to_base64(cls, txo_rows, extra_txo_rows, offset=0, total=None, blocked=None) -> str: + def to_base64(cls, txo_rows, extra_txo_rows, offset=0, total=None, blocked: Censor = None) -> str: return base64.b64encode(cls.to_bytes(txo_rows, extra_txo_rows, offset, total, blocked)).decode() @classmethod @@ -206,37 +184,30 @@ class Outputs: if blocked 
is not None: blocked.to_message(page, extra_txo_rows) for row in extra_txo_rows: - txo_message: 'OutputsMessage' = page.extra_txos.add() - if not isinstance(row, Exception): - if row.channel_hash: - set_reference(txo_message.claim.channel, row.channel_hash, extra_txo_rows) - if row.reposted_claim_hash: - set_reference(txo_message.claim.repost, row.reposted_claim_hash, extra_txo_rows) - cls.encode_txo(txo_message, row) - + cls.encode_txo(page.extra_txos.add(), row) for row in txo_rows: - # cls.row_to_message(row, page.txos.add(), extra_txo_rows) - txo_message: 'OutputsMessage' = page.txos.add() - cls.encode_txo(txo_message, row) - if not isinstance(row, Exception): - if row.channel_hash: - set_reference(txo_message.claim.channel, row.channel_hash, extra_txo_rows) - if row.reposted_claim_hash: - set_reference(txo_message.claim.repost, row.reposted_claim_hash, extra_txo_rows) - elif isinstance(row, ResolveCensoredError): - set_reference(txo_message.error.blocked.channel, row.censor_id, extra_txo_rows) + txo_message = page.txos.add() + if isinstance(row, ResolveCensoredError): + for resolve_result in extra_txo_rows: + if resolve_result.claim_hash == row.censor_id: + txo_message.error.code = ErrorMessage.BLOCKED + txo_message.error.text = str(row) + txo_message.error.blocked.channel.tx_hash = resolve_result.tx_hash + txo_message.error.blocked.channel.nout = resolve_result.position + txo_message.error.blocked.channel.height = resolve_result.height + break + else: + cls.encode_txo(txo_message, row) return page.SerializeToString() @classmethod - def encode_txo(cls, txo_message, resolve_result: Union[ResolveResult, Exception]): + def encode_txo(cls, txo_message: OutputsMessage, resolve_result: Union['ResolveResult', Exception]): if isinstance(resolve_result, Exception): txo_message.error.text = resolve_result.args[0] if isinstance(resolve_result, ValueError): txo_message.error.code = ErrorMessage.INVALID elif isinstance(resolve_result, LookupError): txo_message.error.code = 
ErrorMessage.NOT_FOUND - elif isinstance(resolve_result, ResolveCensoredError): - txo_message.error.code = ErrorMessage.BLOCKED return txo_message.tx_hash = resolve_result.tx_hash txo_message.nout = resolve_result.position @@ -256,3 +227,11 @@ class Outputs: txo_message.claim.take_over_height = resolve_result.last_takeover_height if resolve_result.claims_in_channel is not None: txo_message.claim.claims_in_channel = resolve_result.claims_in_channel + if resolve_result.reposted_claim_hash and resolve_result.reposted_tx_hash is not None: + txo_message.claim.repost.tx_hash = resolve_result.reposted_tx_hash + txo_message.claim.repost.nout = resolve_result.reposted_tx_position + txo_message.claim.repost.height = resolve_result.reposted_height + if resolve_result.channel_hash and resolve_result.channel_tx_hash is not None: + txo_message.claim.channel.tx_hash = resolve_result.channel_tx_hash + txo_message.claim.channel.nout = resolve_result.channel_tx_position + txo_message.claim.channel.height = resolve_result.channel_height