batched resolve using multi_get

This commit is contained in:
Jack Robison 2022-06-09 17:20:57 -04:00
parent 287de0807c
commit 5d44018018
No known key found for this signature in database
GPG key ID: DF25C68FE0239BB2
5 changed files with 515 additions and 172 deletions

View file

@ -18,9 +18,9 @@ from hub.schema.url import URL, normalize_name
from hub.schema.claim import guess_stream_type from hub.schema.claim import guess_stream_type
from hub.schema.result import Censor from hub.schema.result import Censor
from hub.scribe.transaction import TxInput from hub.scribe.transaction import TxInput
from hub.common import hash_to_hex_str, hash160, LRUCacheWithMetrics, sha256, STREAM_TYPES, CLAIM_TYPES from hub.common import hash_to_hex_str, LRUCacheWithMetrics
from hub.db.merkle import Merkle, MerkleCache, FastMerkleCacheItem from hub.db.merkle import Merkle, MerkleCache, FastMerkleCacheItem
from hub.db.common import ResolveResult,ExpandedResolveResult, DBError, UTXO from hub.db.common import ResolveResult, ExpandedResolveResult, DBError, UTXO
from hub.db.prefixes import PendingActivationValue, ClaimTakeoverValue, ClaimToTXOValue, PrefixDB from hub.db.prefixes import PendingActivationValue, ClaimTakeoverValue, ClaimToTXOValue, PrefixDB
from hub.db.prefixes import ACTIVATED_CLAIM_TXO_TYPE, ACTIVATED_SUPPORT_TXO_TYPE, EffectiveAmountKey from hub.db.prefixes import ACTIVATED_CLAIM_TXO_TYPE, ACTIVATED_SUPPORT_TXO_TYPE, EffectiveAmountKey
from hub.db.prefixes import PendingActivationKey, TXOToClaimValue, DBStatePrefixRow, MempoolTXPrefixRow from hub.db.prefixes import PendingActivationKey, TXOToClaimValue, DBStatePrefixRow, MempoolTXPrefixRow
@ -220,123 +220,230 @@ class SecondaryDB:
channel_tx_hash=channel_tx_hash, channel_tx_position=channel_tx_position, channel_height=channel_height, channel_tx_hash=channel_tx_hash, channel_tx_position=channel_tx_position, channel_height=channel_height,
) )
def _resolve_parsed_url(self, name: str, claim_id: Optional[str] = None, async def _batch_resolve_parsed_urls(self, args: List[Tuple[str, Optional[str], Optional[int]]]) -> List[Optional[bytes]]:
amount_order: Optional[int] = None) -> Optional[ResolveResult]: # list of name, claim id tuples (containing one or the other)
""" # this is to preserve the ordering
:param normalized_name: name needed: List[Tuple[Optional[str], Optional[bytes]]] = []
:param claim_id: partial or complete claim id needed_full_claim_hashes = {}
:param amount_order: '$<value>' suffix to a url, defaults to 1 (winning) if no claim id modifier is provided run_in_executor = asyncio.get_event_loop().run_in_executor
"""
def get_txo_for_partial_claim_id(normalized: str, claim_id: str):
for key, claim_txo in self.prefix_db.claim_short_id.iterate(prefix=(normalized, claim_id[:10])):
return claim_txo.tx_num, claim_txo.position
def get_claim_by_amount(normalized: str, order: int):
order = max(int(order or 1), 1)
for _idx, (key, claim_val) in enumerate(self.prefix_db.effective_amount.iterate(prefix=(normalized,))):
if order > _idx + 1:
continue
return claim_val.claim_hash
for idx, (name, claim_id, amount_order) in enumerate(args):
try: try:
normalized_name = normalize_name(name) normalized_name = normalize_name(name)
except UnicodeDecodeError: except UnicodeDecodeError:
normalized_name = name normalized_name = name
if (not amount_order and not claim_id) or amount_order == 1: if (not amount_order and not claim_id) or amount_order == 1:
# winning resolution # winning resolution
controlling = self.get_controlling_claim(normalized_name) needed.append((normalized_name, None))
if not controlling: continue
# print(f"none controlling for lbry://{normalized_name}")
return
# print(f"resolved controlling lbry://{normalized_name}#{controlling.claim_hash.hex()}")
return self._fs_get_claim_by_hash(controlling.claim_hash)
amount_order = max(int(amount_order or 1), 1) full_claim_hash = None
if claim_id: if claim_id:
if len(claim_id) == 40: # a full claim id if len(claim_id) == 40: # a full claim id
claim_txo = self.get_claim_txo(bytes.fromhex(claim_id)) needed.append((None, bytes.fromhex(claim_id)))
if not claim_txo or normalized_name != claim_txo.normalized_name:
return
return self._prepare_resolve_result(
claim_txo.tx_num, claim_txo.position, bytes.fromhex(claim_id), claim_txo.name,
claim_txo.root_tx_num, claim_txo.root_position,
self.get_activation(claim_txo.tx_num, claim_txo.position), claim_txo.channel_signature_is_valid
)
# resolve by partial/complete claim id
for key, claim_txo in self.prefix_db.claim_short_id.iterate(prefix=(normalized_name, claim_id[:10])):
full_claim_hash = self.get_cached_claim_hash(claim_txo.tx_num, claim_txo.position)
c = self.get_cached_claim_txo(full_claim_hash)
non_normalized_name = c.name
signature_is_valid = c.channel_signature_is_valid
return self._prepare_resolve_result(
claim_txo.tx_num, claim_txo.position, full_claim_hash, non_normalized_name, key.root_tx_num,
key.root_position, self.get_activation(claim_txo.tx_num, claim_txo.position),
signature_is_valid
)
return
# resolve by amount ordering, 1 indexed
for idx, (key, claim_val) in enumerate(self.prefix_db.effective_amount.iterate(prefix=(normalized_name,))):
if amount_order > idx + 1:
continue continue
claim_txo = self.get_cached_claim_txo(claim_val.claim_hash)
activation = self.get_activation(key.tx_num, key.position) # resolve by partial/complete claim id
return self._prepare_resolve_result( txo = await run_in_executor(self._executor, get_txo_for_partial_claim_id, normalized_name, claim_id)
key.tx_num, key.position, claim_val.claim_hash, key.normalized_name, claim_txo.root_tx_num, if txo:
claim_txo.root_position, activation, claim_txo.channel_signature_is_valid needed_full_claim_hashes[idx] = txo
needed.append((None, None))
continue
# resolve by amount ordering, 1 indexed
needed.append((None, await run_in_executor(self._executor, get_claim_by_amount, normalized_name, amount_order)))
# fetch the full claim hashes needed from urls with partial claim ids
if needed_full_claim_hashes:
idxs = list(needed_full_claim_hashes.keys())
position = 0
async for _, v in self.prefix_db.txo_to_claim.multi_get_async_gen(self._executor, list(needed_full_claim_hashes.values())):
idx = idxs[position]
needed[idx] = None, v.claim_hash
position += 1
# fetch the winning claim hashes for the urls using winning resolution
needed_winning = list(set(normalized_name for normalized_name, _ in needed if normalized_name is not None))
winning_indexes = [idx for idx in range(len(needed)) if needed[idx][0] is not None]
controlling_claims = {
name: takeover_v async for (name,), takeover_v in self.prefix_db.claim_takeover.multi_get_async_gen(
self._executor, [(name,) for name in needed_winning]
) )
return }
for idx in winning_indexes:
name = needed[idx][0]
controlling = controlling_claims[name]
if controlling:
needed[idx] = name, controlling.claim_hash
return [
claim_hash for _, claim_hash in needed
]
def _resolve_claim_in_channel(self, channel_hash: bytes, normalized_name: str, stream_claim_id: Optional[str] = None): def _resolve_claim_in_channel(self, channel_hash: bytes, normalized_name: str, stream_claim_id: Optional[str] = None):
for key, stream in self.prefix_db.channel_to_claim.iterate(prefix=(channel_hash, normalized_name)): for key, stream in self.prefix_db.channel_to_claim.iterate(prefix=(channel_hash, normalized_name)):
if stream_claim_id is not None and not stream.claim_hash.hex().startswith(stream_claim_id): if stream_claim_id is not None and not stream.claim_hash.hex().startswith(stream_claim_id):
continue continue
return stream.claim_hash, key.tx_num, key.position, self.get_effective_amount(stream.claim_hash) return stream.claim_hash
def _resolve(self, url) -> ExpandedResolveResult: async def batch_resolve_urls(self, urls: List[str]) -> Dict[str, ExpandedResolveResult]:
"""
Resolve a list of urls to a dictionary of urls to claims,
including any extra claim(s) to expand the result, these could be a channel, a repost, or a repost channel
"""
run_in_executor = asyncio.get_event_loop().run_in_executor
# this is split into two stages, first we map the urls to primary claim hashes they resolve to
# then the claims are collected in a batch, which also collects the extra claims needed for the primary matches
# prepare to resolve all of the outer most levels of the urls - the first name in a url, a stream or a channel
needed: List[Tuple[str, str, int]] = [] # (name, partial claim id, amount order) of a url
needed_streams_in_channels = defaultdict(list)
parsed_urls = {}
url_parts_to_resolve = {}
check_if_has_channel = set()
resolved_urls = {}
urls_to_parts_mapping = {}
for url in urls:
need_args = None
try: try:
parsed = URL.parse(url) parsed = URL.parse(url)
parsed_urls[url] = parsed
except ValueError as e: except ValueError as e:
return ExpandedResolveResult(e, None, None, None) parsed_urls[url] = e
continue
stream = channel = resolved_channel = resolved_stream = None stream = channel = None
if parsed.has_stream_in_channel: if parsed.has_stream_in_channel:
channel = parsed.channel
stream = parsed.stream stream = parsed.stream
channel = parsed.channel
need_args = (channel.name, channel.claim_id, channel.amount_order)
needed_streams_in_channels[url].append(stream)
elif parsed.has_channel: elif parsed.has_channel:
channel = parsed.channel channel = parsed.channel
need_args = (channel.name, channel.claim_id, channel.amount_order)
elif parsed.has_stream: elif parsed.has_stream:
stream = parsed.stream stream = parsed.stream
if channel: need_args = (stream.name, stream.claim_id, stream.amount_order)
resolved_channel = self._resolve_parsed_url(channel.name, channel.claim_id, channel.amount_order) check_if_has_channel.add(url)
if not resolved_channel: if need_args:
return ExpandedResolveResult(None, LookupError(f'Could not find channel in "{url}".'), None, None) needed.append(need_args)
if stream: url_parts_to_resolve[url] = need_args
if resolved_channel: urls_to_parts_mapping[need_args] = url
stream_claim = self._resolve_claim_in_channel(resolved_channel.claim_hash, stream.normalized, stream.claim_id)
if stream_claim:
stream_claim_id, stream_tx_num, stream_tx_pos, effective_amount = stream_claim
resolved_stream = self._fs_get_claim_by_hash(stream_claim_id)
else:
resolved_stream = self._resolve_parsed_url(stream.name, stream.claim_id, stream.amount_order)
if not channel and not resolved_channel and resolved_stream and resolved_stream.channel_hash:
resolved_channel = self._fs_get_claim_by_hash(resolved_stream.channel_hash)
if not resolved_stream:
return ExpandedResolveResult(LookupError(f'Could not find claim at "{url}".'), None, None, None)
repost = None # collect the claim hashes for the outer layer claims in the urls
reposted_channel = None outer_layer_claims = {
if resolved_stream or resolved_channel: urls_to_parts_mapping[args]: claim_hash for args, claim_hash in zip(
claim_hash = resolved_stream.claim_hash if resolved_stream else resolved_channel.claim_hash needed, await self._batch_resolve_parsed_urls(needed)
claim = resolved_stream if resolved_stream else resolved_channel
reposted_claim_hash = resolved_stream.reposted_claim_hash if resolved_stream else None
blocker_hash = self.blocked_streams.get(claim_hash) or self.blocked_streams.get(
reposted_claim_hash) or self.blocked_channels.get(claim_hash) or self.blocked_channels.get(
reposted_claim_hash) or self.blocked_channels.get(claim.channel_hash)
if blocker_hash:
reason_row = self._fs_get_claim_by_hash(blocker_hash)
return ExpandedResolveResult(
None, ResolveCensoredError(url, blocker_hash.hex(), censor_row=reason_row), None, None
) )
if claim.reposted_claim_hash: }
repost = self._fs_get_claim_by_hash(claim.reposted_claim_hash)
if repost and repost.channel_hash and repost.signature_valid: # needed_claims is a set of the total claim hashes to look up
reposted_channel = self._fs_get_claim_by_hash(repost.channel_hash) needed_claims = set(claim_hash for claim_hash in outer_layer_claims.values() if claim_hash is not None)
return ExpandedResolveResult(resolved_stream, resolved_channel, repost, reposted_channel) for url, claim_hash in outer_layer_claims.items():
# if it's a stream not in a channel or is a bare channel then this claim is all that's needed for the url
if url not in needed_streams_in_channels:
if claim_hash:
resolved_urls[url] = claim_hash
needed_claims.add(claim_hash)
# check if any claims we've accumulated are in channels, add the channels to the set of needed claims
if needed_claims:
claims_to_check_if_in_channel = list(needed_claims)
txos = {
claim_hash: txo
async for (claim_hash, ), txo in self.prefix_db.claim_to_txo.multi_get_async_gen(
self._executor, [(claim_hash,) for claim_hash in claims_to_check_if_in_channel]
)
}
needed_claims.update({
claim.signing_hash
async for _, claim in self.prefix_db.claim_to_channel.multi_get_async_gen(
self._executor, [
(claim_hash, txos[claim_hash].tx_num, txos[claim_hash].position)
for claim_hash in needed_claims if txos[claim_hash] is not None
]
)
if claim is not None and claim.signing_hash is not None
})
# add the stream claim hashes for urls with channel streams to the needed set
for url, streams in needed_streams_in_channels.items():
resolved_channel_hash = outer_layer_claims.get(url)
if not resolved_channel_hash:
continue
for stream in streams:
stream_claim_hash = await run_in_executor(
self._executor, self._resolve_claim_in_channel, resolved_channel_hash, stream.normalized,
stream.claim_id
)
if stream_claim_hash: # set the result claim hash to the stream
resolved_urls[url] = stream_claim_hash
needed_claims.add(stream_claim_hash)
# collect all of the claim ResolveResults for the urls
claims = {}
if needed_claims:
async for claim_hash, claim, extra in self._prepare_resolve_results(
list(needed_claims), apply_filtering=False):
claims[claim_hash] = claim # the primary result
if extra: # extra results (channels, reposts, repost channels)
claims.update(extra)
results = {}
for url in urls:
claim_hash = resolved_urls.get(url)
parsed = parsed_urls[url]
if not claim_hash or not claims[claim_hash]:
if not isinstance(parsed, Exception) and parsed.has_channel and not parsed.has_stream:
results[url] = ExpandedResolveResult(
None, LookupError(f'Could not find channel in "{url}".'), None, None
)
elif not isinstance(parsed, Exception) and (parsed.has_stream_in_channel or parsed.has_stream):
results[url] = ExpandedResolveResult(
LookupError(f'Could not find claim at "{url}".'), None, None, None
)
elif isinstance(parsed, ValueError):
results[url] = ExpandedResolveResult(
parsed, None, None, None
)
continue
claim = claims[claim_hash]
stream = channel = None
# FIXME: signatures
if parsed.has_stream_in_channel or (not isinstance(claim, Exception) and claim.channel_hash):
stream = claim
if not isinstance(claim, Exception):
channel = claims[claim.channel_hash]
elif url.lstrip('lbry://').startswith('@'):
channel = claim
else:
stream = claim
repost = reposted_channel = None
if claim and not isinstance(claim, Exception) and claim.reposted_claim_hash:
if claim.reposted_claim_hash in claims:
repost = claims[stream.reposted_claim_hash]
if repost and not isinstance(repost, Exception) and repost.channel_hash and repost.channel_hash in claims:
reposted_channel = claims[repost.channel_hash]
results[url] = ExpandedResolveResult(stream, channel, repost, reposted_channel)
return results
async def resolve(self, url) -> ExpandedResolveResult: async def resolve(self, url) -> ExpandedResolveResult:
return await asyncio.get_event_loop().run_in_executor(self._executor, self._resolve, url) return (await self.batch_resolve_urls([url]))[url]
def _fs_get_claim_by_hash(self, claim_hash): def _fs_get_claim_by_hash(self, claim_hash):
claim = self.get_cached_claim_txo(claim_hash) claim = self.get_cached_claim_txo(claim_hash)
@ -412,6 +519,256 @@ class SecondaryDB:
return 0 return 0
return channel_count_val.count return channel_count_val.count
async def _prepare_resolve_results(self, claim_hashes: List[bytes], include_extra: bool = True,
apply_blocking: bool = True, apply_filtering: bool = True):
# determine which claims are reposts and what they are reposts of
reposts = {
claim_hash: repost.reposted_claim_hash
async for (claim_hash, ), repost in self.prefix_db.repost.multi_get_async_gen(
self._executor, [(claim_hash,) for claim_hash in claim_hashes]
) if repost
}
# deduplicate the requested claim hashes plus any found reposts
claims_and_reposts = list(set(claim_hashes).union(set(reposts.values())))
# get the claim txos for the claim hashes (including reposts)
claims = {
claim_hash: claim
async for (claim_hash, ), claim in self.prefix_db.claim_to_txo.multi_get_async_gen(
self._executor, [(claim_hash,) for claim_hash in claims_and_reposts]
)
}
# get the channel hashes for all of the so far collected claim txos
channels = {
claim_hash: signed.signing_hash
async for (claim_hash, tx_num, position), signed in self.prefix_db.claim_to_channel.multi_get_async_gen(
self._executor, [
(claim_hash, claims[claim_hash].tx_num, claims[claim_hash].position)
for claim_hash in claims_and_reposts if claims[claim_hash] is not None
]
) if signed
}
# also look up any channel txos that we don't yet have (we could resolve @foo and @foo/test in one batch)
needed_channels = list({channel_hash for channel_hash in channels.values() if channel_hash not in claims})
if needed_channels:
claims.update({
claim_hash: claim
async for (claim_hash,), claim in self.prefix_db.claim_to_txo.multi_get_async_gen(
self._executor, [(claim_hash,) for claim_hash in needed_channels]
)
})
# collect all of the controlling claims for the set of names we've accumulated
unique_names = list(
sorted({claim_txo.normalized_name for claim_txo in claims.values() if claim_txo is not None})
)
controlling_claims = {
name: takeover_v
async for (name, ), takeover_v in self.prefix_db.claim_takeover.multi_get_async_gen(
self._executor, [(name,) for name in unique_names]
)
}
# collect all of the tx hashes for the accumulated claim txos
claim_tx_hashes = {
tx_num: tx_hash
async for (tx_num, ), tx_hash in self.prefix_db.tx_hash.multi_get_async_gen(
self._executor, [(claim.tx_num,) for claim in claims.values() if claim is not None], False
)
}
# collect the short urls
# TODO: consider a dedicated index for this query to make it multi_get-able
run_in_executor = asyncio.get_event_loop().run_in_executor
short_urls = {
claim_hash: await run_in_executor(
self._executor, self.get_short_claim_id_url,
claim.name, claim.normalized_name, claim_hash, claim.root_tx_num, claim.root_position
) for claim_hash, claim in claims.items()
if claim is not None
}
# collect all of the activation heights for the accumulated claims
activations_needed = {
(1, claim_txo.tx_num, claim_txo.position): claim_hash
for claim_hash, claim_txo in claims.items()
if claim_txo is not None
}
activations = {
activations_needed[k]: -1 if not activation else activation.height
async for k, activation in self.prefix_db.activated.multi_get_async_gen(
self._executor, list(activations_needed.keys())
)
}
# collect all of the support amounts for the accumulated claim txos
supports = {
claim_hash: 0 if support is None else support.amount
async for (claim_hash, ), support in self.prefix_db.support_amount.multi_get_async_gen(
self._executor, [(claim_hash,) for claim_hash in claims]
)
}
# collect all of the counts of claims in channels for the accumulated claim txos
claims_in_channels = {
claim_hash: 0 if not v else v.count
async for (claim_hash, ), v in self.prefix_db.channel_count.multi_get_async_gen(
self._executor, [(claim_hash,) for claim_hash in claims]
)
}
repost_counts = {
claim_hash: await run_in_executor(
self._executor, self.get_reposted_count, claim_hash
)
for claim_hash, claim in claims.items() if claim is not None
}
effective_amounts = {
claim_hash: await run_in_executor(
self._executor, self.get_effective_amount, claim_hash
)
for claim_hash, claim in claims.items() if claim is not None
}
def _prepare_result(touched, claim_txo):
try:
normalized_name = normalize_name(claim_txo.name)
except UnicodeDecodeError:
normalized_name = claim_txo.name
effective_amount = effective_amounts[touched]
reposted_count = repost_counts[touched]
tx_hash = claim_tx_hashes[claim_txo.tx_num]
claims_in_channel = claims_in_channels[touched]
activation = activations[touched]
support_amount = supports[touched]
reposted_claim_hash = reposts.get(touched)
channel_hash = channels.get(touched)
controlling_claim = controlling_claims[normalized_name]
claim_amount = claim_txo.amount
height = bisect_right(self.tx_counts, claim_txo.tx_num)
created_height = bisect_right(self.tx_counts, claim_txo.root_tx_num)
expiration_height = self.coin.get_expiration_height(height)
last_take_over_height = controlling_claim.height
reposted_tx_hash = None
reposted_tx_position = None
reposted_height = None
reposted_channel_hash = None
if reposted_claim_hash:
repost_txo = claims[reposted_claim_hash]
if repost_txo and repost_txo.tx_num in claim_tx_hashes:
reposted_tx_hash = claim_tx_hashes[repost_txo.tx_num]
reposted_tx_position = repost_txo.position
reposted_height = bisect_right(self.tx_counts, repost_txo.tx_num)
if reposted_claim_hash in channels:
reposted_channel_hash = channels[reposted_claim_hash]
short_url = short_urls[touched]
channel_tx_hash = None
channel_tx_position = None
channel_height = None
canonical_url = short_url
if channel_hash:
channel_txo = claims[channel_hash]
if channel_txo and channel_txo.tx_num in claim_tx_hashes:
channel_short_url = short_urls[channel_hash]
canonical_url = f'{channel_short_url}/{short_url}'
channel_tx_hash = claim_tx_hashes[channel_txo.tx_num]
channel_tx_position = channel_txo.position
channel_height = bisect_right(self.tx_counts, channel_txo.tx_num)
if apply_blocking:
blocker_hash = self.blocked_streams.get(touched) or self.blocked_streams.get(
reposted_claim_hash) or self.blocked_channels.get(touched) or self.blocked_channels.get(
reposted_channel_hash) or self.blocked_channels.get(channel_hash)
if blocker_hash:
reason_row = self._fs_get_claim_by_hash(blocker_hash)
return ResolveCensoredError(f'lbry://{canonical_url}', blocker_hash.hex(), censor_row=reason_row)
if apply_filtering:
filter_hash = self.filtered_streams.get(touched) or self.filtered_streams.get(
reposted_claim_hash) or self.filtered_channels.get(touched) or self.filtered_channels.get(
reposted_channel_hash) or self.filtered_channels.get(channel_hash)
if filter_hash:
reason_row = self._fs_get_claim_by_hash(filter_hash)
return ResolveCensoredError(f'lbry://{canonical_url}', filter_hash.hex(), censor_row=reason_row)
return ResolveResult(
claim_txo.name, normalized_name, touched, claim_txo.tx_num, claim_txo.position, tx_hash, height,
claim_amount, short_url=short_url,
is_controlling=controlling_claim.claim_hash == touched, canonical_url=canonical_url,
last_takeover_height=last_take_over_height, claims_in_channel=claims_in_channel,
creation_height=created_height, activation_height=activation,
expiration_height=expiration_height, effective_amount=effective_amount,
support_amount=support_amount,
channel_hash=channel_hash, reposted_claim_hash=reposted_claim_hash,
reposted=reposted_count,
signature_valid=None if not channel_hash else claim_txo.channel_signature_is_valid,
reposted_tx_hash=reposted_tx_hash,
reposted_tx_position=reposted_tx_position, reposted_height=reposted_height,
channel_tx_hash=channel_tx_hash, channel_tx_position=channel_tx_position,
channel_height=channel_height,
)
total_extra = {}
for touched in claim_hashes:
extra = {}
claim_txo = claims[touched]
if not claim_txo:
yield touched, None, extra
continue
if touched in total_extra:
claim = total_extra[touched]
else:
claim = total_extra[touched] = _prepare_result(touched, claim_txo)
if isinstance(claim, Exception):
yield touched, claim, extra
continue
if include_extra:
if claim.channel_hash:
channel_txo = claims[claim.channel_hash]
if channel_txo and not isinstance(channel_txo, Exception):
if claim.channel_hash in total_extra:
extra[claim.channel_hash] = total_extra[claim.channel_hash]
else:
extra[claim.channel_hash] = total_extra[claim.channel_hash] = _prepare_result(
claim.channel_hash, channel_txo
)
if claim.reposted_claim_hash:
repost_txo = claims[claim.reposted_claim_hash]
if repost_txo and not isinstance(repost_txo, Exception):
if claim.reposted_claim_hash in total_extra:
extra[claim.reposted_claim_hash] = total_extra[claim.reposted_claim_hash]
else:
extra[claim.reposted_claim_hash] = total_extra[claim.reposted_claim_hash] = _prepare_result(
claim.reposted_claim_hash, repost_txo
)
if not isinstance(claim, Exception) and claim.reposted_claim_hash in channels:
reposted_channel_hash = channels[claim.reposted_claim_hash]
repost_channel_txo = claims[reposted_channel_hash]
if repost_channel_txo and not isinstance(repost_channel_txo, Exception):
if reposted_channel_hash in total_extra:
extra[reposted_channel_hash] = total_extra[reposted_channel_hash]
else:
extra[reposted_channel_hash] = total_extra[reposted_channel_hash] = _prepare_result(
reposted_channel_hash, repost_channel_txo
)
elif isinstance(repost_channel_txo, Exception):
extra[reposted_channel_hash] = repost_channel_txo
else:
pass # FIXME: lookup error
elif isinstance(repost_txo, Exception):
extra[claim.reposted_claim_hash] = repost_txo
else:
pass # FIXME: lookup error
yield touched, claim, extra
def get_streams_and_channels_reposted_by_channel_hashes(self, reposter_channel_hashes: Set[bytes]): def get_streams_and_channels_reposted_by_channel_hashes(self, reposter_channel_hashes: Set[bytes]):
streams, channels = {}, {} streams, channels = {}, {}
for reposter_channel_hash in reposter_channel_hashes: for reposter_channel_hash in reposter_channel_hashes:

View file

@ -1,3 +1,4 @@
import asyncio
import struct import struct
import typing import typing
import rocksdb import rocksdb
@ -101,6 +102,22 @@ class PrefixRow(metaclass=PrefixRowType):
handle_value(result[packed_keys[tuple(k_args)]]) for k_args in key_args handle_value(result[packed_keys[tuple(k_args)]]) for k_args in key_args
] ]
async def multi_get_async_gen(self, executor, key_args: typing.List[typing.Tuple], deserialize_value=True, step=1000):
packed_keys = {self.pack_key(*args): args for args in key_args}
assert len(packed_keys) == len(key_args), 'duplicate partial keys given to multi_get_dict'
db_result = await asyncio.get_event_loop().run_in_executor(
executor, self._db.multi_get, [(self._column_family, key) for key in packed_keys]
)
unpack_value = self.unpack_value
def handle_value(v):
return None if v is None else v if not deserialize_value else unpack_value(v)
for idx, (k, v) in enumerate((db_result or {}).items()):
yield (packed_keys[k[-1]], handle_value(v))
if idx % step == 0:
await asyncio.sleep(0)
def stage_multi_put(self, items): def stage_multi_put(self, items):
self._op_stack.multi_put([RevertablePut(self.pack_key(*k), self.pack_value(*v)) for k, v in items]) self._op_stack.multi_put([RevertablePut(self.pack_key(*k), self.pack_value(*v)) for k, v in items])

View file

@ -218,49 +218,5 @@ class ElasticSyncDB(SecondaryDB):
yield meta yield meta
batch.clear() batch.clear()
def claim_producer(self, claim_hash: bytes) -> Optional[Dict]:
claim_txo = self.get_cached_claim_txo(claim_hash)
if not claim_txo:
self.logger.warning("can't sync non existent claim to ES: %s", claim_hash.hex())
return
if not self.prefix_db.claim_takeover.get(claim_txo.normalized_name):
self.logger.warning("can't sync non existent claim to ES: %s", claim_hash.hex())
return
activation = self.get_activation(claim_txo.tx_num, claim_txo.position)
claim = self._prepare_resolve_result(
claim_txo.tx_num, claim_txo.position, claim_hash, claim_txo.name, claim_txo.root_tx_num,
claim_txo.root_position, activation, claim_txo.channel_signature_is_valid
)
if not claim:
self.logger.warning("wat")
return
return self._prepare_claim_metadata(claim.claim_hash, claim)
def claims_producer(self, claim_hashes: Set[bytes]):
batch = []
results = []
for claim_hash in claim_hashes:
claim_txo = self.get_cached_claim_txo(claim_hash)
if not claim_txo:
self.logger.warning("can't sync non existent claim to ES: %s", claim_hash.hex())
continue
if not self.prefix_db.claim_takeover.get(claim_txo.normalized_name):
self.logger.warning("can't sync non existent claim to ES: %s", claim_hash.hex())
continue
activation = self.get_activation(claim_txo.tx_num, claim_txo.position)
claim = self._prepare_resolve_result(
claim_txo.tx_num, claim_txo.position, claim_hash, claim_txo.name, claim_txo.root_tx_num,
claim_txo.root_position, activation, claim_txo.channel_signature_is_valid
)
if claim:
batch.append(claim)
batch.sort(key=lambda x: x.tx_hash)
for claim in batch:
_meta = self._prepare_claim_metadata(claim.claim_hash, claim)
if _meta:
results.append(_meta)
return results

View file

@ -9,7 +9,7 @@ from hub.schema.result import Censor
from hub.service import BlockchainReaderService from hub.service import BlockchainReaderService
from hub.common import IndexVersionMismatch, ALL_FIELDS, INDEX_DEFAULT_SETTINGS, expand_query from hub.common import IndexVersionMismatch, ALL_FIELDS, INDEX_DEFAULT_SETTINGS, expand_query
from hub.db.revertable import RevertableOp from hub.db.revertable import RevertableOp
from hub.db.common import TrendingNotification, DB_PREFIXES from hub.db.common import TrendingNotification, DB_PREFIXES, ResolveResult
from hub.notifier_protocol import ElasticNotifierProtocol from hub.notifier_protocol import ElasticNotifierProtocol
from hub.elastic_sync.fast_ar_trending import FAST_AR_TRENDING_SCRIPT from hub.elastic_sync.fast_ar_trending import FAST_AR_TRENDING_SCRIPT
from hub.elastic_sync.db import ElasticSyncDB from hub.elastic_sync.db import ElasticSyncDB
@ -219,10 +219,21 @@ class ElasticSyncService(BlockchainReaderService):
async def _claim_producer(self): async def _claim_producer(self):
for deleted in self._deleted_claims: for deleted in self._deleted_claims:
yield self._delete_claim_query(self.index, deleted) yield self._delete_claim_query(self.index, deleted)
for touched in self._touched_claims:
claim = self.db.claim_producer(touched) touched_claims = list(self._touched_claims)
for idx in range(0, len(touched_claims), 1000):
batch = touched_claims[idx:idx+1000]
async for claim_hash, claim, _ in self.db._prepare_resolve_results(batch, include_extra=False,
apply_blocking=False,
apply_filtering=False):
if not claim:
self.log.warning("wat")
continue
claim = self.db._prepare_claim_metadata(claim.claim_hash, claim)
if claim: if claim:
yield self._upsert_claim_query(self.index, claim) yield self._upsert_claim_query(self.index, claim)
for claim_hash, notifications in self._trending.items(): for claim_hash, notifications in self._trending.items():
yield self._update_trending_query(self.index, claim_hash, notifications) yield self._update_trending_query(self.index, claim_hash, notifications)

View file

@ -1273,22 +1273,28 @@ class LBRYElectrumX(asyncio.Protocol):
self.session_manager.pending_query_metric.dec() self.session_manager.pending_query_metric.dec()
self.session_manager.executor_time_metric.observe(time.perf_counter() - start) self.session_manager.executor_time_metric.observe(time.perf_counter() - start)
async def _cached_resolve_url(self, url):
if url not in self.session_manager.resolve_cache:
self.session_manager.resolve_cache[url] = await self.loop.run_in_executor(self.db._executor, self.db._resolve, url)
return self.session_manager.resolve_cache[url]
async def claimtrie_resolve(self, *urls) -> str: async def claimtrie_resolve(self, *urls) -> str:
sorted_urls = tuple(sorted(urls)) # sorted_urls = tuple(sorted(urls))
self.session_manager.urls_to_resolve_count_metric.inc(len(sorted_urls)) self.session_manager.urls_to_resolve_count_metric.inc(len(urls))
try: try:
if sorted_urls in self.session_manager.resolve_outputs_cache: # if sorted_urls in self.session_manager.resolve_outputs_cache:
return self.session_manager.resolve_outputs_cache[sorted_urls] # return self.session_manager.resolve_outputs_cache[sorted_urls]
rows, extra = [], [] rows, extra = [], []
for url in urls: resolved = {}
if url not in self.session_manager.resolve_cache: needed = defaultdict(list)
self.session_manager.resolve_cache[url] = await self._cached_resolve_url(url) for idx, url in enumerate(urls):
if url in self.session_manager.resolve_cache:
stream, channel, repost, reposted_channel = self.session_manager.resolve_cache[url] stream, channel, repost, reposted_channel = self.session_manager.resolve_cache[url]
resolved[url] = stream, channel, repost, reposted_channel
else:
needed[url].append(idx)
if needed:
resolved_needed = await self.db.batch_resolve_urls(list(needed))
for url, resolve_result in resolved_needed.items():
self.session_manager.resolve_cache[url] = resolve_result
resolved.update(resolved_needed)
for url in urls:
(stream, channel, repost, reposted_channel) = resolved[url]
if isinstance(channel, ResolveCensoredError): if isinstance(channel, ResolveCensoredError):
rows.append(channel) rows.append(channel)
extra.append(channel.censor_row) extra.append(channel.censor_row)
@ -1303,28 +1309,24 @@ class LBRYElectrumX(asyncio.Protocol):
extra.append(reposted_channel.censor_row) extra.append(reposted_channel.censor_row)
elif channel and not stream: elif channel and not stream:
rows.append(channel) rows.append(channel)
# print("resolved channel", channel.name.decode())
if repost: if repost:
extra.append(repost) extra.append(repost)
if reposted_channel: if reposted_channel:
extra.append(reposted_channel) extra.append(reposted_channel)
elif stream: elif stream:
# print("resolved stream", stream.name.decode())
rows.append(stream) rows.append(stream)
if channel: if channel:
# print("and channel", channel.name.decode())
extra.append(channel) extra.append(channel)
if repost: if repost:
extra.append(repost) extra.append(repost)
if reposted_channel: if reposted_channel:
extra.append(reposted_channel) extra.append(reposted_channel)
await asyncio.sleep(0) # await asyncio.sleep(0)
self.session_manager.resolve_outputs_cache[sorted_urls] = result = await self.loop.run_in_executor(
None, Outputs.to_base64, rows, extra # self.session_manager.resolve_outputs_cache[sorted_urls] = result = Outputs.to_base64(rows, extra)
) return Outputs.to_base64(rows, extra)
return result
finally: finally:
self.session_manager.resolved_url_count_metric.inc(len(sorted_urls)) self.session_manager.resolved_url_count_metric.inc(len(urls))
async def get_server_height(self): async def get_server_height(self):
return self.db.db_height return self.db.db_height