diff --git a/hub/db/db.py b/hub/db/db.py index e2110d4..b3cdc3d 100644 --- a/hub/db/db.py +++ b/hub/db/db.py @@ -18,9 +18,9 @@ from hub.schema.url import URL, normalize_name from hub.schema.claim import guess_stream_type from hub.schema.result import Censor from hub.scribe.transaction import TxInput -from hub.common import hash_to_hex_str, hash160, LRUCacheWithMetrics, sha256, STREAM_TYPES, CLAIM_TYPES +from hub.common import hash_to_hex_str, LRUCacheWithMetrics from hub.db.merkle import Merkle, MerkleCache, FastMerkleCacheItem -from hub.db.common import ResolveResult,ExpandedResolveResult, DBError, UTXO +from hub.db.common import ResolveResult, ExpandedResolveResult, DBError, UTXO from hub.db.prefixes import PendingActivationValue, ClaimTakeoverValue, ClaimToTXOValue, PrefixDB from hub.db.prefixes import ACTIVATED_CLAIM_TXO_TYPE, ACTIVATED_SUPPORT_TXO_TYPE, EffectiveAmountKey from hub.db.prefixes import PendingActivationKey, TXOToClaimValue, DBStatePrefixRow, MempoolTXPrefixRow @@ -220,123 +220,230 @@ class SecondaryDB: channel_tx_hash=channel_tx_hash, channel_tx_position=channel_tx_position, channel_height=channel_height, ) - def _resolve_parsed_url(self, name: str, claim_id: Optional[str] = None, - amount_order: Optional[int] = None) -> Optional[ResolveResult]: - """ - :param normalized_name: name - :param claim_id: partial or complete claim id - :param amount_order: '$' suffix to a url, defaults to 1 (winning) if no claim id modifier is provided - """ - try: - normalized_name = normalize_name(name) - except UnicodeDecodeError: - normalized_name = name - if (not amount_order and not claim_id) or amount_order == 1: - # winning resolution - controlling = self.get_controlling_claim(normalized_name) - if not controlling: - # print(f"none controlling for lbry://{normalized_name}") - return - # print(f"resolved controlling lbry://{normalized_name}#{controlling.claim_hash.hex()}") - return self._fs_get_claim_by_hash(controlling.claim_hash) + async def _batch_resolve_parsed_urls(self, args: List[Tuple[str, Optional[str], Optional[int]]]) -> List[Optional[bytes]]: + # list of name, claim id tuples (containing one or the other) + # this is to preserve the ordering + needed: List[Tuple[Optional[str], Optional[bytes]]] = [] + needed_full_claim_hashes = {} + run_in_executor = asyncio.get_event_loop().run_in_executor - amount_order = max(int(amount_order or 1), 1) + def get_txo_for_partial_claim_id(normalized: str, claim_id: str): + for key, claim_txo in self.prefix_db.claim_short_id.iterate(prefix=(normalized, claim_id[:10])): + return claim_txo.tx_num, claim_txo.position - if claim_id: - if len(claim_id) == 40: # a full claim id - claim_txo = self.get_claim_txo(bytes.fromhex(claim_id)) - if not claim_txo or normalized_name != claim_txo.normalized_name: - return - return self._prepare_resolve_result( - claim_txo.tx_num, claim_txo.position, bytes.fromhex(claim_id), claim_txo.name, - claim_txo.root_tx_num, claim_txo.root_position, - self.get_activation(claim_txo.tx_num, claim_txo.position), claim_txo.channel_signature_is_valid - ) - # resolve by partial/complete claim id - for key, claim_txo in self.prefix_db.claim_short_id.iterate(prefix=(normalized_name, claim_id[:10])): - full_claim_hash = self.get_cached_claim_hash(claim_txo.tx_num, claim_txo.position) - c = self.get_cached_claim_txo(full_claim_hash) + def get_claim_by_amount(normalized: str, order: int): + order = max(int(order or 1), 1) - non_normalized_name = c.name - signature_is_valid = c.channel_signature_is_valid - return 
self._prepare_resolve_result( - claim_txo.tx_num, claim_txo.position, full_claim_hash, non_normalized_name, key.root_tx_num, - key.root_position, self.get_activation(claim_txo.tx_num, claim_txo.position), - signature_is_valid - ) - return + for _idx, (key, claim_val) in enumerate(self.prefix_db.effective_amount.iterate(prefix=(normalized,))): + if order > _idx + 1: + continue + return claim_val.claim_hash - # resolve by amount ordering, 1 indexed - for idx, (key, claim_val) in enumerate(self.prefix_db.effective_amount.iterate(prefix=(normalized_name,))): - if amount_order > idx + 1: + for idx, (name, claim_id, amount_order) in enumerate(args): + try: + normalized_name = normalize_name(name) + except UnicodeDecodeError: + normalized_name = name + if (not amount_order and not claim_id) or amount_order == 1: + # winning resolution + needed.append((normalized_name, None)) continue - claim_txo = self.get_cached_claim_txo(claim_val.claim_hash) - activation = self.get_activation(key.tx_num, key.position) - return self._prepare_resolve_result( - key.tx_num, key.position, claim_val.claim_hash, key.normalized_name, claim_txo.root_tx_num, - claim_txo.root_position, activation, claim_txo.channel_signature_is_valid + + full_claim_hash = None + + if claim_id: + if len(claim_id) == 40: # a full claim id + needed.append((None, bytes.fromhex(claim_id))) + continue + + # resolve by partial/complete claim id + txo = await run_in_executor(self._executor, get_txo_for_partial_claim_id, normalized_name, claim_id) + if txo: + needed_full_claim_hashes[idx] = txo + needed.append((None, None)) + continue + # resolve by amount ordering, 1 indexed + needed.append((None, await run_in_executor(self._executor, get_claim_by_amount, normalized_name, amount_order))) + + # fetch the full claim hashes needed from urls with partial claim ids + if needed_full_claim_hashes: + idxs = list(needed_full_claim_hashes.keys()) + position = 0 + async for _, v in self.prefix_db.txo_to_claim.multi_get_async_gen(self._executor, list(needed_full_claim_hashes.values())): + idx = idxs[position] + needed[idx] = None, v.claim_hash + position += 1 + + # fetch the winning claim hashes for the urls using winning resolution + needed_winning = list(set(normalized_name for normalized_name, _ in needed if normalized_name is not None)) + winning_indexes = [idx for idx in range(len(needed)) if needed[idx][0] is not None] + controlling_claims = { + name: takeover_v async for (name,), takeover_v in self.prefix_db.claim_takeover.multi_get_async_gen( + self._executor, [(name,) for name in needed_winning] ) - return + } + for idx in winning_indexes: + name = needed[idx][0] + controlling = controlling_claims[name] + if controlling: + needed[idx] = name, controlling.claim_hash + + return [ + claim_hash for _, claim_hash in needed + ] def _resolve_claim_in_channel(self, channel_hash: bytes, normalized_name: str, stream_claim_id: Optional[str] = None): for key, stream in self.prefix_db.channel_to_claim.iterate(prefix=(channel_hash, normalized_name)): if stream_claim_id is not None and not stream.claim_hash.hex().startswith(stream_claim_id): continue - return stream.claim_hash, key.tx_num, key.position, self.get_effective_amount(stream.claim_hash) + return stream.claim_hash - def _resolve(self, url) -> ExpandedResolveResult: - try: - parsed = URL.parse(url) - except ValueError as e: - return ExpandedResolveResult(e, None, None, None) + async def batch_resolve_urls(self, urls: List[str]) -> Dict[str, ExpandedResolveResult]: + """ + Resolve a list of urls to a 
dictionary of urls to claims, + including any extra claim(s) to expand the result, these could be a channel, a repost, or a repost channel + """ + run_in_executor = asyncio.get_event_loop().run_in_executor - stream = channel = resolved_channel = resolved_stream = None - if parsed.has_stream_in_channel: - channel = parsed.channel - stream = parsed.stream - elif parsed.has_channel: - channel = parsed.channel - elif parsed.has_stream: - stream = parsed.stream - if channel: - resolved_channel = self._resolve_parsed_url(channel.name, channel.claim_id, channel.amount_order) - if not resolved_channel: - return ExpandedResolveResult(None, LookupError(f'Could not find channel in "{url}".'), None, None) - if stream: - if resolved_channel: - stream_claim = self._resolve_claim_in_channel(resolved_channel.claim_hash, stream.normalized, stream.claim_id) - if stream_claim: - stream_claim_id, stream_tx_num, stream_tx_pos, effective_amount = stream_claim - resolved_stream = self._fs_get_claim_by_hash(stream_claim_id) - else: - resolved_stream = self._resolve_parsed_url(stream.name, stream.claim_id, stream.amount_order) - if not channel and not resolved_channel and resolved_stream and resolved_stream.channel_hash: - resolved_channel = self._fs_get_claim_by_hash(resolved_stream.channel_hash) - if not resolved_stream: - return ExpandedResolveResult(LookupError(f'Could not find claim at "{url}".'), None, None, None) + # this is split into two stages, first we map the urls to primary claim hashes they resolve to + # then the claims are collected in a batch, which also collects the extra claims needed for the primary matches - repost = None - reposted_channel = None - if resolved_stream or resolved_channel: - claim_hash = resolved_stream.claim_hash if resolved_stream else resolved_channel.claim_hash - claim = resolved_stream if resolved_stream else resolved_channel - reposted_claim_hash = resolved_stream.reposted_claim_hash if resolved_stream else None - blocker_hash = self.blocked_streams.get(claim_hash) or self.blocked_streams.get( - reposted_claim_hash) or self.blocked_channels.get(claim_hash) or self.blocked_channels.get( - reposted_claim_hash) or self.blocked_channels.get(claim.channel_hash) - if blocker_hash: - reason_row = self._fs_get_claim_by_hash(blocker_hash) - return ExpandedResolveResult( - None, ResolveCensoredError(url, blocker_hash.hex(), censor_row=reason_row), None, None + # prepare to resolve all of the outer most levels of the urls - the first name in a url, a stream or a channel + needed: List[Tuple[str, str, int]] = [] # (name, partial claim id, amount order) of a url + needed_streams_in_channels = defaultdict(list) + + parsed_urls = {} + url_parts_to_resolve = {} + check_if_has_channel = set() + resolved_urls = {} + + urls_to_parts_mapping = {} + for url in urls: + need_args = None + try: + parsed = URL.parse(url) + parsed_urls[url] = parsed + except ValueError as e: + parsed_urls[url] = e + continue + stream = channel = None + if parsed.has_stream_in_channel: + stream = parsed.stream + channel = parsed.channel + need_args = (channel.name, channel.claim_id, channel.amount_order) + needed_streams_in_channels[url].append(stream) + elif parsed.has_channel: + channel = parsed.channel + need_args = (channel.name, channel.claim_id, channel.amount_order) + elif parsed.has_stream: + stream = parsed.stream + need_args = (stream.name, stream.claim_id, stream.amount_order) + check_if_has_channel.add(url) + if need_args: + needed.append(need_args) + url_parts_to_resolve[url] = need_args + 
urls_to_parts_mapping[need_args] = url + + # collect the claim hashes for the outer layer claims in the urls + outer_layer_claims = { + urls_to_parts_mapping[args]: claim_hash for args, claim_hash in zip( + needed, await self._batch_resolve_parsed_urls(needed) + ) + } + + # needed_claims is a set of the total claim hashes to look up + needed_claims = set(claim_hash for claim_hash in outer_layer_claims.values() if claim_hash is not None) + for url, claim_hash in outer_layer_claims.items(): + # if it's a stream not in a channel or is a bare channel then this claim is all that's needed for the url + if url not in needed_streams_in_channels: + if claim_hash: + resolved_urls[url] = claim_hash + needed_claims.add(claim_hash) + + # check if any claims we've accumulated are in channels, add the channels to the set of needed claims + if needed_claims: + claims_to_check_if_in_channel = list(needed_claims) + txos = { + claim_hash: txo + async for (claim_hash, ), txo in self.prefix_db.claim_to_txo.multi_get_async_gen( + self._executor, [(claim_hash,) for claim_hash in claims_to_check_if_in_channel] ) - if claim.reposted_claim_hash: - repost = self._fs_get_claim_by_hash(claim.reposted_claim_hash) - if repost and repost.channel_hash and repost.signature_valid: - reposted_channel = self._fs_get_claim_by_hash(repost.channel_hash) - return ExpandedResolveResult(resolved_stream, resolved_channel, repost, reposted_channel) + } + needed_claims.update({ + claim.signing_hash + async for _, claim in self.prefix_db.claim_to_channel.multi_get_async_gen( + self._executor, [ + (claim_hash, txos[claim_hash].tx_num, txos[claim_hash].position) + for claim_hash in needed_claims if txos[claim_hash] is not None + ] + ) + if claim is not None and claim.signing_hash is not None + }) + + # add the stream claim hashes for urls with channel streams to the needed set + for url, streams in needed_streams_in_channels.items(): + resolved_channel_hash = outer_layer_claims.get(url) + if not resolved_channel_hash: + continue + for stream in streams: + stream_claim_hash = await run_in_executor( + self._executor, self._resolve_claim_in_channel, resolved_channel_hash, stream.normalized, + stream.claim_id + ) + if stream_claim_hash: # set the result claim hash to the stream + resolved_urls[url] = stream_claim_hash + needed_claims.add(stream_claim_hash) + + # collect all of the claim ResolveResults for the urls + claims = {} + if needed_claims: + async for claim_hash, claim, extra in self._prepare_resolve_results( + list(needed_claims), apply_filtering=False): + claims[claim_hash] = claim # the primary result + if extra: # extra results (channels, reposts, repost channels) + claims.update(extra) + results = {} + for url in urls: + claim_hash = resolved_urls.get(url) + parsed = parsed_urls[url] + if not claim_hash or not claims[claim_hash]: + if not isinstance(parsed, Exception) and parsed.has_channel and not parsed.has_stream: + results[url] = ExpandedResolveResult( + None, LookupError(f'Could not find channel in "{url}".'), None, None + ) + elif not isinstance(parsed, Exception) and (parsed.has_stream_in_channel or parsed.has_stream): + results[url] = ExpandedResolveResult( + LookupError(f'Could not find claim at "{url}".'), None, None, None + ) + elif isinstance(parsed, ValueError): + results[url] = ExpandedResolveResult( + parsed, None, None, None + ) + continue + + claim = claims[claim_hash] + stream = channel = None + # FIXME: signatures + + if parsed.has_stream_in_channel or (not isinstance(claim, Exception) and claim.channel_hash): 
+ stream = claim + if not isinstance(claim, Exception): + channel = claims[claim.channel_hash] + elif url.lstrip('lbry://').startswith('@'): + channel = claim + else: + stream = claim + repost = reposted_channel = None + if claim and not isinstance(claim, Exception) and claim.reposted_claim_hash: + if claim.reposted_claim_hash in claims: + repost = claims[stream.reposted_claim_hash] + if repost and not isinstance(repost, Exception) and repost.channel_hash and repost.channel_hash in claims: + reposted_channel = claims[repost.channel_hash] + results[url] = ExpandedResolveResult(stream, channel, repost, reposted_channel) + return results async def resolve(self, url) -> ExpandedResolveResult: - return await asyncio.get_event_loop().run_in_executor(self._executor, self._resolve, url) + return (await self.batch_resolve_urls([url]))[url] def _fs_get_claim_by_hash(self, claim_hash): claim = self.get_cached_claim_txo(claim_hash) @@ -412,6 +519,256 @@ class SecondaryDB: return 0 return channel_count_val.count + async def _prepare_resolve_results(self, claim_hashes: List[bytes], include_extra: bool = True, + apply_blocking: bool = True, apply_filtering: bool = True): + # determine which claims are reposts and what they are reposts of + reposts = { + claim_hash: repost.reposted_claim_hash + async for (claim_hash, ), repost in self.prefix_db.repost.multi_get_async_gen( + self._executor, [(claim_hash,) for claim_hash in claim_hashes] + ) if repost + } + + # deduplicate the requested claim hashes plus any found reposts + claims_and_reposts = list(set(claim_hashes).union(set(reposts.values()))) + + # get the claim txos for the claim hashes (including reposts) + claims = { + claim_hash: claim + async for (claim_hash, ), claim in self.prefix_db.claim_to_txo.multi_get_async_gen( + self._executor, [(claim_hash,) for claim_hash in claims_and_reposts] + ) + } + + # get the channel hashes for all of the so far collected claim txos + channels = { + claim_hash: signed.signing_hash + async for (claim_hash, tx_num, position), signed in self.prefix_db.claim_to_channel.multi_get_async_gen( + self._executor, [ + (claim_hash, claims[claim_hash].tx_num, claims[claim_hash].position) + for claim_hash in claims_and_reposts if claims[claim_hash] is not None + ] + ) if signed + } + + # also look up any channel txos that we don't yet have (we could resolve @foo and @foo/test in one batch) + needed_channels = list({channel_hash for channel_hash in channels.values() if channel_hash not in claims}) + if needed_channels: + claims.update({ + claim_hash: claim + async for (claim_hash,), claim in self.prefix_db.claim_to_txo.multi_get_async_gen( + self._executor, [(claim_hash,) for claim_hash in needed_channels] + ) + }) + + # collect all of the controlling claims for the set of names we've accumulated + unique_names = list( + sorted({claim_txo.normalized_name for claim_txo in claims.values() if claim_txo is not None}) + ) + controlling_claims = { + name: takeover_v + async for (name, ), takeover_v in self.prefix_db.claim_takeover.multi_get_async_gen( + self._executor, [(name,) for name in unique_names] + ) + } + + # collect all of the tx hashes for the accumulated claim txos + claim_tx_hashes = { + tx_num: tx_hash + async for (tx_num, ), tx_hash in self.prefix_db.tx_hash.multi_get_async_gen( + self._executor, [(claim.tx_num,) for claim in claims.values() if claim is not None], False + ) + } + + # collect the short urls + # TODO: consider a dedicated index for this query to make it multi_get-able + run_in_executor = 
asyncio.get_event_loop().run_in_executor + short_urls = { + claim_hash: await run_in_executor( + self._executor, self.get_short_claim_id_url, + claim.name, claim.normalized_name, claim_hash, claim.root_tx_num, claim.root_position + ) for claim_hash, claim in claims.items() + if claim is not None + } + # collect all of the activation heights for the accumulated claims + activations_needed = { + (1, claim_txo.tx_num, claim_txo.position): claim_hash + for claim_hash, claim_txo in claims.items() + if claim_txo is not None + } + activations = { + activations_needed[k]: -1 if not activation else activation.height + async for k, activation in self.prefix_db.activated.multi_get_async_gen( + self._executor, list(activations_needed.keys()) + ) + } + # collect all of the support amounts for the accumulated claim txos + supports = { + claim_hash: 0 if support is None else support.amount + async for (claim_hash, ), support in self.prefix_db.support_amount.multi_get_async_gen( + self._executor, [(claim_hash,) for claim_hash in claims] + ) + } + # collect all of the counts of claims in channels for the accumulated claim txos + claims_in_channels = { + claim_hash: 0 if not v else v.count + async for (claim_hash, ), v in self.prefix_db.channel_count.multi_get_async_gen( + self._executor, [(claim_hash,) for claim_hash in claims] + ) + } + + repost_counts = { + claim_hash: await run_in_executor( + self._executor, self.get_reposted_count, claim_hash + ) + for claim_hash, claim in claims.items() if claim is not None + } + + effective_amounts = { + claim_hash: await run_in_executor( + self._executor, self.get_effective_amount, claim_hash + ) + for claim_hash, claim in claims.items() if claim is not None + } + + def _prepare_result(touched, claim_txo): + try: + normalized_name = normalize_name(claim_txo.name) + except UnicodeDecodeError: + normalized_name = claim_txo.name + effective_amount = effective_amounts[touched] + reposted_count = repost_counts[touched] + + tx_hash = claim_tx_hashes[claim_txo.tx_num] + claims_in_channel = claims_in_channels[touched] + activation = activations[touched] + support_amount = supports[touched] + reposted_claim_hash = reposts.get(touched) + channel_hash = channels.get(touched) + controlling_claim = controlling_claims[normalized_name] + claim_amount = claim_txo.amount + + height = bisect_right(self.tx_counts, claim_txo.tx_num) + created_height = bisect_right(self.tx_counts, claim_txo.root_tx_num) + expiration_height = self.coin.get_expiration_height(height) + last_take_over_height = controlling_claim.height + + reposted_tx_hash = None + reposted_tx_position = None + reposted_height = None + reposted_channel_hash = None + if reposted_claim_hash: + repost_txo = claims[reposted_claim_hash] + if repost_txo and repost_txo.tx_num in claim_tx_hashes: + reposted_tx_hash = claim_tx_hashes[repost_txo.tx_num] + reposted_tx_position = repost_txo.position + reposted_height = bisect_right(self.tx_counts, repost_txo.tx_num) + if reposted_claim_hash in channels: + reposted_channel_hash = channels[reposted_claim_hash] + short_url = short_urls[touched] + channel_tx_hash = None + channel_tx_position = None + channel_height = None + canonical_url = short_url + if channel_hash: + channel_txo = claims[channel_hash] + if channel_txo and channel_txo.tx_num in claim_tx_hashes: + channel_short_url = short_urls[channel_hash] + canonical_url = f'{channel_short_url}/{short_url}' + channel_tx_hash = claim_tx_hashes[channel_txo.tx_num] + channel_tx_position = channel_txo.position + channel_height = 
bisect_right(self.tx_counts, channel_txo.tx_num) + + if apply_blocking: + blocker_hash = self.blocked_streams.get(touched) or self.blocked_streams.get( + reposted_claim_hash) or self.blocked_channels.get(touched) or self.blocked_channels.get( + reposted_channel_hash) or self.blocked_channels.get(channel_hash) + if blocker_hash: + reason_row = self._fs_get_claim_by_hash(blocker_hash) + + return ResolveCensoredError(f'lbry://{canonical_url}', blocker_hash.hex(), censor_row=reason_row) + + if apply_filtering: + filter_hash = self.filtered_streams.get(touched) or self.filtered_streams.get( + reposted_claim_hash) or self.filtered_channels.get(touched) or self.filtered_channels.get( + reposted_channel_hash) or self.filtered_channels.get(channel_hash) + if filter_hash: + reason_row = self._fs_get_claim_by_hash(filter_hash) + return ResolveCensoredError(f'lbry://{canonical_url}', filter_hash.hex(), censor_row=reason_row) + + return ResolveResult( + claim_txo.name, normalized_name, touched, claim_txo.tx_num, claim_txo.position, tx_hash, height, + claim_amount, short_url=short_url, + is_controlling=controlling_claim.claim_hash == touched, canonical_url=canonical_url, + last_takeover_height=last_take_over_height, claims_in_channel=claims_in_channel, + creation_height=created_height, activation_height=activation, + expiration_height=expiration_height, effective_amount=effective_amount, + support_amount=support_amount, + channel_hash=channel_hash, reposted_claim_hash=reposted_claim_hash, + reposted=reposted_count, + signature_valid=None if not channel_hash else claim_txo.channel_signature_is_valid, + reposted_tx_hash=reposted_tx_hash, + reposted_tx_position=reposted_tx_position, reposted_height=reposted_height, + channel_tx_hash=channel_tx_hash, channel_tx_position=channel_tx_position, + channel_height=channel_height, + ) + + total_extra = {} + + for touched in claim_hashes: + extra = {} + claim_txo = claims[touched] + if not claim_txo: + yield touched, None, extra + continue + if touched in total_extra: + claim = total_extra[touched] + else: + claim = total_extra[touched] = _prepare_result(touched, claim_txo) + if isinstance(claim, Exception): + yield touched, claim, extra + continue + + if include_extra: + if claim.channel_hash: + channel_txo = claims[claim.channel_hash] + if channel_txo and not isinstance(channel_txo, Exception): + if claim.channel_hash in total_extra: + extra[claim.channel_hash] = total_extra[claim.channel_hash] + else: + extra[claim.channel_hash] = total_extra[claim.channel_hash] = _prepare_result( + claim.channel_hash, channel_txo + ) + if claim.reposted_claim_hash: + repost_txo = claims[claim.reposted_claim_hash] + if repost_txo and not isinstance(repost_txo, Exception): + if claim.reposted_claim_hash in total_extra: + extra[claim.reposted_claim_hash] = total_extra[claim.reposted_claim_hash] + else: + extra[claim.reposted_claim_hash] = total_extra[claim.reposted_claim_hash] = _prepare_result( + claim.reposted_claim_hash, repost_txo + ) + if not isinstance(claim, Exception) and claim.reposted_claim_hash in channels: + reposted_channel_hash = channels[claim.reposted_claim_hash] + repost_channel_txo = claims[reposted_channel_hash] + if repost_channel_txo and not isinstance(repost_channel_txo, Exception): + if reposted_channel_hash in total_extra: + extra[reposted_channel_hash] = total_extra[reposted_channel_hash] + else: + extra[reposted_channel_hash] = total_extra[reposted_channel_hash] = _prepare_result( + reposted_channel_hash, repost_channel_txo + ) + elif 
isinstance(repost_channel_txo, Exception): + extra[reposted_channel_hash] = repost_channel_txo + else: + pass # FIXME: lookup error + elif isinstance(repost_txo, Exception): + extra[claim.reposted_claim_hash] = repost_txo + else: + pass # FIXME: lookup error + + yield touched, claim, extra + def get_streams_and_channels_reposted_by_channel_hashes(self, reposter_channel_hashes: Set[bytes]): streams, channels = {}, {} for reposter_channel_hash in reposter_channel_hashes: @@ -633,9 +990,28 @@ class SecondaryDB: if status: return status.hex() + def _get_hashX_statuses(self, hashXes: List[bytes]): + statuses = { + hashX: status + for hashX, status in zip(hashXes, self.prefix_db.hashX_mempool_status.multi_get( + [(hashX,) for hashX in hashXes], deserialize_value=False + )) if status is not None + } + if len(statuses) < len(hashXes): + statuses.update({ + hashX: status + for hashX, status in zip(hashXes, self.prefix_db.hashX_status.multi_get( + [(hashX,) for hashX in hashXes if hashX not in statuses], deserialize_value=False + )) if status is not None + }) + return [None if hashX not in statuses else statuses[hashX].hex() for hashX in hashXes] + async def get_hashX_status(self, hashX: bytes): return await asyncio.get_event_loop().run_in_executor(self._executor, self._get_hashX_status, hashX) + async def get_hashX_statuses(self, hashXes: List[bytes]): + return await asyncio.get_event_loop().run_in_executor(self._executor, self._get_hashX_statuses, hashXes) + def get_tx_hash(self, tx_num: int) -> bytes: if self._cache_all_tx_hashes: return self.total_transactions[tx_num] @@ -957,23 +1333,3 @@ class SecondaryDB: self.logger.warning(f'all_utxos: tx hash not ' f'found (reorg?), retrying...') await asyncio.sleep(0.25) - - async def lookup_utxos(self, prevouts): - def lookup_utxos(): - utxos = [] - utxo_append = utxos.append - for (tx_hash, nout) in prevouts: - tx_num_val = self.prefix_db.tx_num.get(tx_hash) - if not tx_num_val: - print("no tx num for ", tx_hash[::-1].hex()) - continue - tx_num = tx_num_val.tx_num - hashX_val = self.prefix_db.hashX_utxo.get(tx_hash[:4], tx_num, nout) - if not hashX_val: - continue - hashX = hashX_val.hashX - utxo_value = self.prefix_db.utxo.get(hashX, tx_num, nout) - if utxo_value: - utxo_append((hashX, utxo_value.amount)) - return utxos - return await asyncio.get_event_loop().run_in_executor(self._executor, lookup_utxos) diff --git a/hub/db/interface.py b/hub/db/interface.py index f3b7094..fef99b7 100644 --- a/hub/db/interface.py +++ b/hub/db/interface.py @@ -1,3 +1,4 @@ +import asyncio import struct import typing import rocksdb @@ -101,6 +102,22 @@ class PrefixRow(metaclass=PrefixRowType): handle_value(result[packed_keys[tuple(k_args)]]) for k_args in key_args ] + async def multi_get_async_gen(self, executor, key_args: typing.List[typing.Tuple], deserialize_value=True, step=1000): + packed_keys = {self.pack_key(*args): args for args in key_args} + assert len(packed_keys) == len(key_args), 'duplicate partial keys given to multi_get_dict' + db_result = await asyncio.get_event_loop().run_in_executor( + executor, self._db.multi_get, [(self._column_family, key) for key in packed_keys] + ) + unpack_value = self.unpack_value + + def handle_value(v): + return None if v is None else v if not deserialize_value else unpack_value(v) + + for idx, (k, v) in enumerate((db_result or {}).items()): + yield (packed_keys[k[-1]], handle_value(v)) + if idx % step == 0: + await asyncio.sleep(0) + def stage_multi_put(self, items): self._op_stack.multi_put([RevertablePut(self.pack_key(*k), 
self.pack_value(*v)) for k, v in items]) diff --git a/hub/elastic_sync/db.py b/hub/elastic_sync/db.py index 4081d24..0803383 100644 --- a/hub/elastic_sync/db.py +++ b/hub/elastic_sync/db.py @@ -218,49 +218,5 @@ class ElasticSyncDB(SecondaryDB): yield meta batch.clear() - def claim_producer(self, claim_hash: bytes) -> Optional[Dict]: - claim_txo = self.get_cached_claim_txo(claim_hash) - if not claim_txo: - self.logger.warning("can't sync non existent claim to ES: %s", claim_hash.hex()) - return - if not self.prefix_db.claim_takeover.get(claim_txo.normalized_name): - self.logger.warning("can't sync non existent claim to ES: %s", claim_hash.hex()) - return - activation = self.get_activation(claim_txo.tx_num, claim_txo.position) - claim = self._prepare_resolve_result( - claim_txo.tx_num, claim_txo.position, claim_hash, claim_txo.name, claim_txo.root_tx_num, - claim_txo.root_position, activation, claim_txo.channel_signature_is_valid - ) - if not claim: - self.logger.warning("wat") - return - return self._prepare_claim_metadata(claim.claim_hash, claim) - def claims_producer(self, claim_hashes: Set[bytes]): - batch = [] - results = [] - for claim_hash in claim_hashes: - claim_txo = self.get_cached_claim_txo(claim_hash) - if not claim_txo: - self.logger.warning("can't sync non existent claim to ES: %s", claim_hash.hex()) - continue - if not self.prefix_db.claim_takeover.get(claim_txo.normalized_name): - self.logger.warning("can't sync non existent claim to ES: %s", claim_hash.hex()) - continue - - activation = self.get_activation(claim_txo.tx_num, claim_txo.position) - claim = self._prepare_resolve_result( - claim_txo.tx_num, claim_txo.position, claim_hash, claim_txo.name, claim_txo.root_tx_num, - claim_txo.root_position, activation, claim_txo.channel_signature_is_valid - ) - if claim: - batch.append(claim) - - batch.sort(key=lambda x: x.tx_hash) - - for claim in batch: - _meta = self._prepare_claim_metadata(claim.claim_hash, claim) - if _meta: - results.append(_meta) - return results diff --git a/hub/elastic_sync/service.py b/hub/elastic_sync/service.py index c3b4074..4c96cc5 100644 --- a/hub/elastic_sync/service.py +++ b/hub/elastic_sync/service.py @@ -9,7 +9,7 @@ from hub.schema.result import Censor from hub.service import BlockchainReaderService from hub.common import IndexVersionMismatch, ALL_FIELDS, INDEX_DEFAULT_SETTINGS, expand_query from hub.db.revertable import RevertableOp -from hub.db.common import TrendingNotification, DB_PREFIXES +from hub.db.common import TrendingNotification, DB_PREFIXES, ResolveResult from hub.notifier_protocol import ElasticNotifierProtocol from hub.elastic_sync.fast_ar_trending import FAST_AR_TRENDING_SCRIPT from hub.elastic_sync.db import ElasticSyncDB @@ -219,10 +219,21 @@ class ElasticSyncService(BlockchainReaderService): async def _claim_producer(self): for deleted in self._deleted_claims: yield self._delete_claim_query(self.index, deleted) - for touched in self._touched_claims: - claim = self.db.claim_producer(touched) - if claim: - yield self._upsert_claim_query(self.index, claim) + + touched_claims = list(self._touched_claims) + + for idx in range(0, len(touched_claims), 1000): + batch = touched_claims[idx:idx+1000] + async for claim_hash, claim, _ in self.db._prepare_resolve_results(batch, include_extra=False, + apply_blocking=False, + apply_filtering=False): + if not claim: + self.log.warning("wat") + continue + claim = self.db._prepare_claim_metadata(claim.claim_hash, claim) + if claim: + yield self._upsert_claim_query(self.index, claim) + for 
claim_hash, notifications in self._trending.items(): yield self._update_trending_query(self.index, claim_hash, notifications) diff --git a/hub/herald/env.py b/hub/herald/env.py index da8f997..b412ab7 100644 --- a/hub/herald/env.py +++ b/hub/herald/env.py @@ -11,7 +11,7 @@ class ServerEnv(Env): session_timeout=None, drop_client=None, description=None, daily_fee=None, database_query_timeout=None, elastic_notifier_host=None, elastic_notifier_port=None, blocking_channel_ids=None, filtering_channel_ids=None, peer_hubs=None, peer_announce=None, - index_address_status=None): + index_address_status=None, address_history_cache_size=None): super().__init__(db_dir, max_query_workers, chain, reorg_limit, prometheus_port, cache_all_tx_hashes, cache_all_claim_txos, blocking_channel_ids, filtering_channel_ids, index_address_status) self.daemon_url = daemon_url if daemon_url is not None else self.required('DAEMON_URL') @@ -50,6 +50,8 @@ class ServerEnv(Env): self.daily_fee = daily_fee if daily_fee is not None else self.string_amount('DAILY_FEE', '0') self.database_query_timeout = (database_query_timeout / 1000.0) if database_query_timeout is not None else \ (float(self.integer('QUERY_TIMEOUT_MS', 10000)) / 1000.0) + self.hashX_history_cache_size = address_history_cache_size if address_history_cache_size is not None \ + else self.integer('ADDRESS_HISTORY_CACHE_SIZE', 1000) @classmethod def contribute_to_arg_parser(cls, parser): @@ -96,6 +98,11 @@ class ServerEnv(Env): parser.add_argument('--daily_fee', default=cls.default('DAILY_FEE', '0'), type=str) parser.add_argument('--query_timeout_ms', type=int, default=cls.integer('QUERY_TIMEOUT_MS', 10000), help="Elasticsearch query timeout, in ms. Can be set in env with 'QUERY_TIMEOUT_MS'") + parser.add_argument('--address_history_cache_size', type=int, + default=cls.integer('ADDRESS_HISTORY_CACHE_SIZE', 1000), + help="Size of the lru cache of address histories. 
" + "Can be set in the env with 'ADDRESS_HISTORY_CACHE_SIZE'") + @classmethod def from_arg_parser(cls, args): return cls( @@ -110,5 +117,6 @@ class ServerEnv(Env): drop_client=args.drop_client, description=args.description, daily_fee=args.daily_fee, database_query_timeout=args.query_timeout_ms, blocking_channel_ids=args.blocking_channel_ids, filtering_channel_ids=args.filtering_channel_ids, elastic_notifier_host=args.elastic_notifier_host, - elastic_notifier_port=args.elastic_notifier_port, index_address_status=args.index_address_statuses + elastic_notifier_port=args.elastic_notifier_port, index_address_status=args.index_address_statuses, + address_history_cache_size=args.address_history_cache_size ) diff --git a/hub/herald/mempool.py b/hub/herald/mempool.py index 26e1ad7..67400d1 100644 --- a/hub/herald/mempool.py +++ b/hub/herald/mempool.py @@ -242,7 +242,7 @@ class HubMemPool: (self.session_manager.hsub_results[session.subscribe_headers_raw],)) ) if hashXes: - asyncio.create_task(session.send_history_notifications(*hashXes)) + asyncio.create_task(session.send_history_notifications(hashXes)) async def _notify_sessions(self, height, touched, new_touched): """Notify sessions about height changes and touched addresses.""" diff --git a/hub/herald/session.py b/hub/herald/session.py index 79d3f59..1279113 100644 --- a/hub/herald/session.py +++ b/hub/herald/session.py @@ -22,7 +22,7 @@ from hub.herald import PROTOCOL_MIN, PROTOCOL_MAX, HUB_PROTOCOL_VERSION from hub.build_info import BUILD, COMMIT_HASH, DOCKER_TAG from hub.herald.search import SearchIndex from hub.common import sha256, hash_to_hex_str, hex_str_to_hash, HASHX_LEN, version_string, formatted_time, SIZE_BUCKETS -from hub.common import protocol_version, RPCError, DaemonError, TaskGroup, HISTOGRAM_BUCKETS +from hub.common import protocol_version, RPCError, DaemonError, TaskGroup, HISTOGRAM_BUCKETS, asyncify_for_loop from hub.common import LRUCacheWithMetrics from hub.herald.jsonrpc import JSONRPCAutoDetect, JSONRPCConnection, JSONRPCv2, JSONRPC from hub.herald.common import BatchRequest, ProtocolError, Request, Batch, Notification @@ -1181,33 +1181,39 @@ class LBRYElectrumX(asyncio.Protocol): status = sha256(history.encode()) return status.hex() - async def send_history_notifications(self, *hashXes: typing.Iterable[bytes]): + async def get_hashX_statuses(self, hashXes: typing.List[bytes]): + if self.env.index_address_status: + return await self.db.get_hashX_statuses(hashXes) + return [await self.get_hashX_status(hashX) for hashX in hashXes] + + async def send_history_notifications(self, hashXes: typing.List[bytes]): notifications = [] - for hashX in hashXes: + start = time.perf_counter() + statuses = await self.get_hashX_statuses(hashXes) + duration = time.perf_counter() - start + self.session_manager.address_history_metric.observe(duration) + start = time.perf_counter() + scripthash_notifications = 0 + address_notifications = 0 + for hashX, status in zip(hashXes, statuses): alias = self.hashX_subs[hashX] if len(alias) == 64: method = 'blockchain.scripthash.subscribe' + scripthash_notifications += 1 else: method = 'blockchain.address.subscribe' - start = time.perf_counter() - status = await self.get_hashX_status(hashX) - duration = time.perf_counter() - start - self.session_manager.address_history_metric.observe(duration) - notifications.append((method, (alias, status))) - if duration > 30: - self.logger.warning("slow history notification (%s) for '%s'", duration, alias) - - start = time.perf_counter() - 
self.session_manager.notifications_in_flight_metric.inc() - for method, args in notifications: - self.NOTIFICATION_COUNT.labels(method=method,).inc() + address_notifications += 1 + notifications.append(Notification(method, (alias, status))) + if scripthash_notifications: + self.NOTIFICATION_COUNT.labels(method='blockchain.scripthash.subscribe',).inc(scripthash_notifications) + if address_notifications: + self.NOTIFICATION_COUNT.labels(method='blockchain.address.subscribe', ).inc(address_notifications) + self.session_manager.notifications_in_flight_metric.inc(len(notifications)) try: - await self.send_notifications( - Batch([Notification(method, (alias, status)) for (method, (alias, status)) in notifications]) - ) + await self.send_notifications(Batch(notifications)) self.session_manager.notifications_sent_metric.observe(time.perf_counter() - start) finally: - self.session_manager.notifications_in_flight_metric.dec() + self.session_manager.notifications_in_flight_metric.dec(len(notifications)) # def get_metrics_or_placeholder_for_api(self, query_name): # """ Do not hold on to a reference to the metrics @@ -1267,22 +1273,28 @@ class LBRYElectrumX(asyncio.Protocol): self.session_manager.pending_query_metric.dec() self.session_manager.executor_time_metric.observe(time.perf_counter() - start) - async def _cached_resolve_url(self, url): - if url not in self.session_manager.resolve_cache: - self.session_manager.resolve_cache[url] = await self.loop.run_in_executor(self.db._executor, self.db._resolve, url) - return self.session_manager.resolve_cache[url] - async def claimtrie_resolve(self, *urls) -> str: - sorted_urls = tuple(sorted(urls)) - self.session_manager.urls_to_resolve_count_metric.inc(len(sorted_urls)) + # sorted_urls = tuple(sorted(urls)) + self.session_manager.urls_to_resolve_count_metric.inc(len(urls)) try: - if sorted_urls in self.session_manager.resolve_outputs_cache: - return self.session_manager.resolve_outputs_cache[sorted_urls] + # if sorted_urls in self.session_manager.resolve_outputs_cache: + # return self.session_manager.resolve_outputs_cache[sorted_urls] rows, extra = [], [] + resolved = {} + needed = defaultdict(list) + for idx, url in enumerate(urls): + if url in self.session_manager.resolve_cache: + stream, channel, repost, reposted_channel = self.session_manager.resolve_cache[url] + resolved[url] = stream, channel, repost, reposted_channel + else: + needed[url].append(idx) + if needed: + resolved_needed = await self.db.batch_resolve_urls(list(needed)) + for url, resolve_result in resolved_needed.items(): + self.session_manager.resolve_cache[url] = resolve_result + resolved.update(resolved_needed) for url in urls: - if url not in self.session_manager.resolve_cache: - self.session_manager.resolve_cache[url] = await self._cached_resolve_url(url) - stream, channel, repost, reposted_channel = self.session_manager.resolve_cache[url] + (stream, channel, repost, reposted_channel) = resolved[url] if isinstance(channel, ResolveCensoredError): rows.append(channel) extra.append(channel.censor_row) @@ -1297,28 +1309,24 @@ class LBRYElectrumX(asyncio.Protocol): extra.append(reposted_channel.censor_row) elif channel and not stream: rows.append(channel) - # print("resolved channel", channel.name.decode()) if repost: extra.append(repost) if reposted_channel: extra.append(reposted_channel) elif stream: - # print("resolved stream", stream.name.decode()) rows.append(stream) if channel: - # print("and channel", channel.name.decode()) extra.append(channel) if repost: extra.append(repost) if 
reposted_channel: extra.append(reposted_channel) - await asyncio.sleep(0) - self.session_manager.resolve_outputs_cache[sorted_urls] = result = await self.loop.run_in_executor( - None, Outputs.to_base64, rows, extra - ) - return result + # await asyncio.sleep(0) + + # self.session_manager.resolve_outputs_cache[sorted_urls] = result = Outputs.to_base64(rows, extra) + return Outputs.to_base64(rows, extra) finally: - self.session_manager.resolved_url_count_metric.inc(len(sorted_urls)) + self.session_manager.resolved_url_count_metric.inc(len(urls)) async def get_server_height(self): return self.db.db_height @@ -1470,12 +1478,13 @@ class LBRYElectrumX(asyncio.Protocol): address: the address to subscribe to""" if len(addresses) > 1000: raise RPCError(BAD_REQUEST, f'too many addresses in subscription request: {len(addresses)}') - results = [] + hashXes = [item async for item in asyncify_for_loop((self.address_to_hashX(address) for address in addresses), 100)] + statuses = await self.get_hashX_statuses(hashXes) + for hashX, alias in zip(hashXes, addresses): + self.hashX_subs[hashX] = alias + self.session_manager.hashx_subscriptions_by_session[hashX].add(id(self)) self.session_manager.address_subscription_metric.inc(len(addresses)) - for address in addresses: - results.append(await self.hashX_subscribe(self.address_to_hashX(address), address)) - await asyncio.sleep(0) - return results + return statuses async def address_unsubscribe(self, address): """Unsubscribe an address. diff --git a/hub/scribe/env.py b/hub/scribe/env.py index e96669a..6bc32c2 100644 --- a/hub/scribe/env.py +++ b/hub/scribe/env.py @@ -28,7 +28,7 @@ class BlockchainEnv(Env): help='This setting translates into the max_open_files option given to rocksdb. ' 'A higher number will use more memory. Defaults to 64.') parser.add_argument('--address_history_cache_size', type=int, - default=cls.integer('ADDRESS_HISTORY_CACHE_SIZE', 1000), + default=cls.integer('ADDRESS_HISTORY_CACHE_SIZE', 2 ** 13), help="LRU cache size for address histories, used when processing new blocks " "and when processing mempool updates. Can be set in env with " "'ADDRESS_HISTORY_CACHE_SIZE'") diff --git a/hub/scribe/service.py b/hub/scribe/service.py index 9dedc30..ed46b0a 100644 --- a/hub/scribe/service.py +++ b/hub/scribe/service.py @@ -118,8 +118,8 @@ class BlockchainProcessorService(BlockchainService): self.pending_transaction_num_mapping: Dict[bytes, int] = {} self.pending_transactions: Dict[int, bytes] = {} - self.hashX_history_cache = LRUCache(min(100, max(0, env.hashX_history_cache_size))) - self.hashX_full_cache = LRUCache(min(100, max(0, env.hashX_history_cache_size))) + self.hashX_history_cache = LRUCache(max(100, env.hashX_history_cache_size)) + self.hashX_full_cache = LRUCache(max(100, env.hashX_history_cache_size)) self.history_tx_info_cache = LRUCache(2 ** 16) def open_db(self):
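
Usage sketch (not part of the patch): how callers in this diff consume the new PrefixRow.multi_get_async_gen from hub/db/interface.py. It takes an executor and a list of key tuples, runs one RocksDB multi_get off the event loop, and yields (key_args, value) pairs with None for missing keys, awaiting asyncio.sleep(0) every `step` results so large batches stay cooperative. The helper name below is hypothetical; `db` is assumed to be a SecondaryDB instance from hub/db/db.py.

async def get_claim_txos(db, claim_hashes):
    # Mirrors the claim_to_txo lookups in _prepare_resolve_results; db._executor and
    # db.prefix_db.claim_to_txo exist on SecondaryDB, the wrapper itself does not.
    txos = {}
    async for (claim_hash,), claim_txo in db.prefix_db.claim_to_txo.multi_get_async_gen(
            db._executor, [(claim_hash,) for claim_hash in claim_hashes]):
        txos[claim_hash] = claim_txo  # None when the claim hash is not in the db
    return txos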
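
Usage sketch (not part of the patch): the resolve path is now batch-first. SecondaryDB.batch_resolve_urls returns a dict keyed by the input urls, each value an ExpandedResolveResult that unpacks to (stream, channel, repost, reposted_channel), with LookupError/ValueError/ResolveCensoredError instances standing in for parts that could not be resolved; resolve(url) is just a one-element batch. The driver function below is hypothetical.

async def resolve_urls(db, urls):
    results = await db.batch_resolve_urls(urls)  # Dict[str, ExpandedResolveResult]
    for url in urls:
        stream, channel, repost, reposted_channel = results[url]
        if isinstance(stream, Exception):
            print(f"{url}: {stream}")    # parse error or no claim found at the url
        elif isinstance(channel, Exception):
            print(f"{url}: {channel}")   # channel in the url could not be found
        else:
            print(f"{url}: resolved", stream or channel)
    return results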
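
Usage sketch (not part of the patch): address status lookups are now batched as well. LBRYElectrumX.get_hashX_statuses delegates to SecondaryDB.get_hashX_statuses (two multi_get passes: the mempool status overlay first, then the confirmed statuses for anything still missing) when env.index_address_status is set, and falls back to per-hashX get_hashX_status otherwise; results come back in input order as hex digests, or None where no status is stored. The helper below is hypothetical.

async def statuses_for_addresses(session, addresses):
    # session is assumed to be an LBRYElectrumX instance; address_to_hashX and
    # get_hashX_statuses are the methods touched by this patch.
    hashXes = [session.address_to_hashX(address) for address in addresses]
    statuses = await session.get_hashX_statuses(hashXes)
    return dict(zip(addresses, statuses))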