Merge pull request #45 from lbryio/batched-status

Use multi_get for resolve and address statuses
This commit is contained in:
Jack Robison 2022-06-14 13:53:12 -04:00 committed by GitHub
commit 4187afd165
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
9 changed files with 579 additions and 222 deletions

View file

@ -18,7 +18,7 @@ from hub.schema.url import URL, normalize_name
from hub.schema.claim import guess_stream_type
from hub.schema.result import Censor
from hub.scribe.transaction import TxInput
from hub.common import hash_to_hex_str, hash160, LRUCacheWithMetrics, sha256, STREAM_TYPES, CLAIM_TYPES
from hub.common import hash_to_hex_str, LRUCacheWithMetrics
from hub.db.merkle import Merkle, MerkleCache, FastMerkleCacheItem
from hub.db.common import ResolveResult, ExpandedResolveResult, DBError, UTXO
from hub.db.prefixes import PendingActivationValue, ClaimTakeoverValue, ClaimToTXOValue, PrefixDB
@ -220,123 +220,230 @@ class SecondaryDB:
channel_tx_hash=channel_tx_hash, channel_tx_position=channel_tx_position, channel_height=channel_height,
)
def _resolve_parsed_url(self, name: str, claim_id: Optional[str] = None,
amount_order: Optional[int] = None) -> Optional[ResolveResult]:
"""
:param normalized_name: name
:param claim_id: partial or complete claim id
:param amount_order: '$<value>' suffix to a url, defaults to 1 (winning) if no claim id modifier is provided
"""
async def _batch_resolve_parsed_urls(self, args: List[Tuple[str, Optional[str], Optional[int]]]) -> List[Optional[bytes]]:
# list of name, claim id tuples (containing one or the other)
# this is to preserve the ordering
needed: List[Tuple[Optional[str], Optional[bytes]]] = []
needed_full_claim_hashes = {}
run_in_executor = asyncio.get_event_loop().run_in_executor
def get_txo_for_partial_claim_id(normalized: str, claim_id: str):
for key, claim_txo in self.prefix_db.claim_short_id.iterate(prefix=(normalized, claim_id[:10])):
return claim_txo.tx_num, claim_txo.position
def get_claim_by_amount(normalized: str, order: int):
order = max(int(order or 1), 1)
for _idx, (key, claim_val) in enumerate(self.prefix_db.effective_amount.iterate(prefix=(normalized,))):
if order > _idx + 1:
continue
return claim_val.claim_hash
for idx, (name, claim_id, amount_order) in enumerate(args):
try:
normalized_name = normalize_name(name)
except UnicodeDecodeError:
normalized_name = name
if (not amount_order and not claim_id) or amount_order == 1:
# winning resolution
controlling = self.get_controlling_claim(normalized_name)
if not controlling:
# print(f"none controlling for lbry://{normalized_name}")
return
# print(f"resolved controlling lbry://{normalized_name}#{controlling.claim_hash.hex()}")
return self._fs_get_claim_by_hash(controlling.claim_hash)
needed.append((normalized_name, None))
continue
amount_order = max(int(amount_order or 1), 1)
full_claim_hash = None
if claim_id:
if len(claim_id) == 40: # a full claim id
claim_txo = self.get_claim_txo(bytes.fromhex(claim_id))
if not claim_txo or normalized_name != claim_txo.normalized_name:
return
return self._prepare_resolve_result(
claim_txo.tx_num, claim_txo.position, bytes.fromhex(claim_id), claim_txo.name,
claim_txo.root_tx_num, claim_txo.root_position,
self.get_activation(claim_txo.tx_num, claim_txo.position), claim_txo.channel_signature_is_valid
)
# resolve by partial/complete claim id
for key, claim_txo in self.prefix_db.claim_short_id.iterate(prefix=(normalized_name, claim_id[:10])):
full_claim_hash = self.get_cached_claim_hash(claim_txo.tx_num, claim_txo.position)
c = self.get_cached_claim_txo(full_claim_hash)
non_normalized_name = c.name
signature_is_valid = c.channel_signature_is_valid
return self._prepare_resolve_result(
claim_txo.tx_num, claim_txo.position, full_claim_hash, non_normalized_name, key.root_tx_num,
key.root_position, self.get_activation(claim_txo.tx_num, claim_txo.position),
signature_is_valid
)
return
# resolve by amount ordering, 1 indexed
for idx, (key, claim_val) in enumerate(self.prefix_db.effective_amount.iterate(prefix=(normalized_name,))):
if amount_order > idx + 1:
needed.append((None, bytes.fromhex(claim_id)))
continue
claim_txo = self.get_cached_claim_txo(claim_val.claim_hash)
activation = self.get_activation(key.tx_num, key.position)
return self._prepare_resolve_result(
key.tx_num, key.position, claim_val.claim_hash, key.normalized_name, claim_txo.root_tx_num,
claim_txo.root_position, activation, claim_txo.channel_signature_is_valid
# resolve by partial/complete claim id
txo = await run_in_executor(self._executor, get_txo_for_partial_claim_id, normalized_name, claim_id)
if txo:
needed_full_claim_hashes[idx] = txo
needed.append((None, None))
continue
# resolve by amount ordering, 1 indexed
needed.append((None, await run_in_executor(self._executor, get_claim_by_amount, normalized_name, amount_order)))
# fetch the full claim hashes needed from urls with partial claim ids
if needed_full_claim_hashes:
idxs = list(needed_full_claim_hashes.keys())
position = 0
async for _, v in self.prefix_db.txo_to_claim.multi_get_async_gen(self._executor, list(needed_full_claim_hashes.values())):
idx = idxs[position]
needed[idx] = None, v.claim_hash
position += 1
# fetch the winning claim hashes for the urls using winning resolution
needed_winning = list(set(normalized_name for normalized_name, _ in needed if normalized_name is not None))
winning_indexes = [idx for idx in range(len(needed)) if needed[idx][0] is not None]
controlling_claims = {
name: takeover_v async for (name,), takeover_v in self.prefix_db.claim_takeover.multi_get_async_gen(
self._executor, [(name,) for name in needed_winning]
)
return
}
for idx in winning_indexes:
name = needed[idx][0]
controlling = controlling_claims[name]
if controlling:
needed[idx] = name, controlling.claim_hash
return [
claim_hash for _, claim_hash in needed
]
def _resolve_claim_in_channel(self, channel_hash: bytes, normalized_name: str, stream_claim_id: Optional[str] = None):
for key, stream in self.prefix_db.channel_to_claim.iterate(prefix=(channel_hash, normalized_name)):
if stream_claim_id is not None and not stream.claim_hash.hex().startswith(stream_claim_id):
continue
return stream.claim_hash, key.tx_num, key.position, self.get_effective_amount(stream.claim_hash)
return stream.claim_hash
def _resolve(self, url) -> ExpandedResolveResult:
async def batch_resolve_urls(self, urls: List[str]) -> Dict[str, ExpandedResolveResult]:
"""
Resolve a list of urls to a dictionary of urls to claims,
including any extra claim(s) to expand the result, these could be a channel, a repost, or a repost channel
"""
run_in_executor = asyncio.get_event_loop().run_in_executor
# this is split into two stages, first we map the urls to primary claim hashes they resolve to
# then the claims are collected in a batch, which also collects the extra claims needed for the primary matches
# prepare to resolve all of the outer most levels of the urls - the first name in a url, a stream or a channel
needed: List[Tuple[str, str, int]] = [] # (name, partial claim id, amount order) of a url
needed_streams_in_channels = defaultdict(list)
parsed_urls = {}
url_parts_to_resolve = {}
check_if_has_channel = set()
resolved_urls = {}
urls_to_parts_mapping = {}
for url in urls:
need_args = None
try:
parsed = URL.parse(url)
parsed_urls[url] = parsed
except ValueError as e:
return ExpandedResolveResult(e, None, None, None)
stream = channel = resolved_channel = resolved_stream = None
parsed_urls[url] = e
continue
stream = channel = None
if parsed.has_stream_in_channel:
channel = parsed.channel
stream = parsed.stream
channel = parsed.channel
need_args = (channel.name, channel.claim_id, channel.amount_order)
needed_streams_in_channels[url].append(stream)
elif parsed.has_channel:
channel = parsed.channel
need_args = (channel.name, channel.claim_id, channel.amount_order)
elif parsed.has_stream:
stream = parsed.stream
if channel:
resolved_channel = self._resolve_parsed_url(channel.name, channel.claim_id, channel.amount_order)
if not resolved_channel:
return ExpandedResolveResult(None, LookupError(f'Could not find channel in "{url}".'), None, None)
if stream:
if resolved_channel:
stream_claim = self._resolve_claim_in_channel(resolved_channel.claim_hash, stream.normalized, stream.claim_id)
if stream_claim:
stream_claim_id, stream_tx_num, stream_tx_pos, effective_amount = stream_claim
resolved_stream = self._fs_get_claim_by_hash(stream_claim_id)
else:
resolved_stream = self._resolve_parsed_url(stream.name, stream.claim_id, stream.amount_order)
if not channel and not resolved_channel and resolved_stream and resolved_stream.channel_hash:
resolved_channel = self._fs_get_claim_by_hash(resolved_stream.channel_hash)
if not resolved_stream:
return ExpandedResolveResult(LookupError(f'Could not find claim at "{url}".'), None, None, None)
need_args = (stream.name, stream.claim_id, stream.amount_order)
check_if_has_channel.add(url)
if need_args:
needed.append(need_args)
url_parts_to_resolve[url] = need_args
urls_to_parts_mapping[need_args] = url
repost = None
reposted_channel = None
if resolved_stream or resolved_channel:
claim_hash = resolved_stream.claim_hash if resolved_stream else resolved_channel.claim_hash
claim = resolved_stream if resolved_stream else resolved_channel
reposted_claim_hash = resolved_stream.reposted_claim_hash if resolved_stream else None
blocker_hash = self.blocked_streams.get(claim_hash) or self.blocked_streams.get(
reposted_claim_hash) or self.blocked_channels.get(claim_hash) or self.blocked_channels.get(
reposted_claim_hash) or self.blocked_channels.get(claim.channel_hash)
if blocker_hash:
reason_row = self._fs_get_claim_by_hash(blocker_hash)
return ExpandedResolveResult(
None, ResolveCensoredError(url, blocker_hash.hex(), censor_row=reason_row), None, None
# collect the claim hashes for the outer layer claims in the urls
outer_layer_claims = {
urls_to_parts_mapping[args]: claim_hash for args, claim_hash in zip(
needed, await self._batch_resolve_parsed_urls(needed)
)
if claim.reposted_claim_hash:
repost = self._fs_get_claim_by_hash(claim.reposted_claim_hash)
if repost and repost.channel_hash and repost.signature_valid:
reposted_channel = self._fs_get_claim_by_hash(repost.channel_hash)
return ExpandedResolveResult(resolved_stream, resolved_channel, repost, reposted_channel)
}
# needed_claims is a set of the total claim hashes to look up
needed_claims = set(claim_hash for claim_hash in outer_layer_claims.values() if claim_hash is not None)
for url, claim_hash in outer_layer_claims.items():
# if it's a stream not in a channel or is a bare channel then this claim is all that's needed for the url
if url not in needed_streams_in_channels:
if claim_hash:
resolved_urls[url] = claim_hash
needed_claims.add(claim_hash)
# check if any claims we've accumulated are in channels, add the channels to the set of needed claims
if needed_claims:
claims_to_check_if_in_channel = list(needed_claims)
txos = {
claim_hash: txo
async for (claim_hash, ), txo in self.prefix_db.claim_to_txo.multi_get_async_gen(
self._executor, [(claim_hash,) for claim_hash in claims_to_check_if_in_channel]
)
}
needed_claims.update({
claim.signing_hash
async for _, claim in self.prefix_db.claim_to_channel.multi_get_async_gen(
self._executor, [
(claim_hash, txos[claim_hash].tx_num, txos[claim_hash].position)
for claim_hash in needed_claims if txos[claim_hash] is not None
]
)
if claim is not None and claim.signing_hash is not None
})
# add the stream claim hashes for urls with channel streams to the needed set
for url, streams in needed_streams_in_channels.items():
resolved_channel_hash = outer_layer_claims.get(url)
if not resolved_channel_hash:
continue
for stream in streams:
stream_claim_hash = await run_in_executor(
self._executor, self._resolve_claim_in_channel, resolved_channel_hash, stream.normalized,
stream.claim_id
)
if stream_claim_hash: # set the result claim hash to the stream
resolved_urls[url] = stream_claim_hash
needed_claims.add(stream_claim_hash)
# collect all of the claim ResolveResults for the urls
claims = {}
if needed_claims:
async for claim_hash, claim, extra in self._prepare_resolve_results(
list(needed_claims), apply_filtering=False):
claims[claim_hash] = claim # the primary result
if extra: # extra results (channels, reposts, repost channels)
claims.update(extra)
results = {}
for url in urls:
claim_hash = resolved_urls.get(url)
parsed = parsed_urls[url]
if not claim_hash or not claims[claim_hash]:
if not isinstance(parsed, Exception) and parsed.has_channel and not parsed.has_stream:
results[url] = ExpandedResolveResult(
None, LookupError(f'Could not find channel in "{url}".'), None, None
)
elif not isinstance(parsed, Exception) and (parsed.has_stream_in_channel or parsed.has_stream):
results[url] = ExpandedResolveResult(
LookupError(f'Could not find claim at "{url}".'), None, None, None
)
elif isinstance(parsed, ValueError):
results[url] = ExpandedResolveResult(
parsed, None, None, None
)
continue
claim = claims[claim_hash]
stream = channel = None
# FIXME: signatures
if parsed.has_stream_in_channel or (not isinstance(claim, Exception) and claim.channel_hash):
stream = claim
if not isinstance(claim, Exception):
channel = claims[claim.channel_hash]
elif url.lstrip('lbry://').startswith('@'):
channel = claim
else:
stream = claim
repost = reposted_channel = None
if claim and not isinstance(claim, Exception) and claim.reposted_claim_hash:
if claim.reposted_claim_hash in claims:
repost = claims[stream.reposted_claim_hash]
if repost and not isinstance(repost, Exception) and repost.channel_hash and repost.channel_hash in claims:
reposted_channel = claims[repost.channel_hash]
results[url] = ExpandedResolveResult(stream, channel, repost, reposted_channel)
return results
async def resolve(self, url) -> ExpandedResolveResult:
return await asyncio.get_event_loop().run_in_executor(self._executor, self._resolve, url)
return (await self.batch_resolve_urls([url]))[url]
def _fs_get_claim_by_hash(self, claim_hash):
claim = self.get_cached_claim_txo(claim_hash)
@ -412,6 +519,256 @@ class SecondaryDB:
return 0
return channel_count_val.count
async def _prepare_resolve_results(self, claim_hashes: List[bytes], include_extra: bool = True,
apply_blocking: bool = True, apply_filtering: bool = True):
"""
Async generator that batch-loads everything needed to describe the given claims and then
yields one `(claim_hash, claim, extra)` tuple per entry of `claim_hashes`, in input order.

:param claim_hashes: claim hashes to prepare results for
:param include_extra: when true, `extra` is filled with the prepared results of the claim's
    channel, the claim it reposts, and that repost's channel (keyed by claim hash)
:param apply_blocking: when true, a claim matched by blocked_streams / blocked_channels is
    returned as a ResolveCensoredError instead of a ResolveResult
:param apply_filtering: same as apply_blocking, but for filtered_streams / filtered_channels

`claim` is None when no claim_to_txo row was found for the hash, a ResolveCensoredError when
censored, otherwise a ResolveResult. Errors are yielded as values, not raised.
"""
# determine which claims are reposts and what they are reposts of
reposts = {
claim_hash: repost.reposted_claim_hash
async for (claim_hash, ), repost in self.prefix_db.repost.multi_get_async_gen(
self._executor, [(claim_hash,) for claim_hash in claim_hashes]
) if repost
}
# deduplicate the requested claim hashes plus any found reposts
claims_and_reposts = list(set(claim_hashes).union(set(reposts.values())))
# get the claim txos for the claim hashes (including reposts)
claims = {
claim_hash: claim
async for (claim_hash, ), claim in self.prefix_db.claim_to_txo.multi_get_async_gen(
self._executor, [(claim_hash,) for claim_hash in claims_and_reposts]
)
}
# get the channel hashes for all of the so far collected claim txos
channels = {
claim_hash: signed.signing_hash
async for (claim_hash, tx_num, position), signed in self.prefix_db.claim_to_channel.multi_get_async_gen(
self._executor, [
(claim_hash, claims[claim_hash].tx_num, claims[claim_hash].position)
for claim_hash in claims_and_reposts if claims[claim_hash] is not None
]
) if signed
}
# also look up any channel txos that we don't yet have (we could resolve @foo and @foo/test in one batch)
needed_channels = list({channel_hash for channel_hash in channels.values() if channel_hash not in claims})
if needed_channels:
claims.update({
claim_hash: claim
async for (claim_hash,), claim in self.prefix_db.claim_to_txo.multi_get_async_gen(
self._executor, [(claim_hash,) for claim_hash in needed_channels]
)
})
# collect all of the controlling claims for the set of names we've accumulated
unique_names = list(
sorted({claim_txo.normalized_name for claim_txo in claims.values() if claim_txo is not None})
)
controlling_claims = {
name: takeover_v
async for (name, ), takeover_v in self.prefix_db.claim_takeover.multi_get_async_gen(
self._executor, [(name,) for name in unique_names]
)
}
# collect all of the tx hashes for the accumulated claim txos
# the trailing False is passed positionally; presumably it is deserialize_value, leaving the
# tx hashes as raw bytes - TODO confirm against multi_get_async_gen's signature
# NOTE(review): the key list is not deduplicated, so two claims sharing a tx_num would hand
# multi_get_async_gen duplicate keys - verify that it tolerates this
claim_tx_hashes = {
tx_num: tx_hash
async for (tx_num, ), tx_hash in self.prefix_db.tx_hash.multi_get_async_gen(
self._executor, [(claim.tx_num,) for claim in claims.values() if claim is not None], False
)
}
# collect the short urls
# TODO: consider a dedicated index for this query to make it multi_get-able
# each await inside the comprehension finishes before the next one starts, so these
# executor calls run one claim at a time
run_in_executor = asyncio.get_event_loop().run_in_executor
short_urls = {
claim_hash: await run_in_executor(
self._executor, self.get_short_claim_id_url,
claim.name, claim.normalized_name, claim_hash, claim.root_tx_num, claim.root_position
) for claim_hash, claim in claims.items()
if claim is not None
}
# collect all of the activation heights for the accumulated claims
# keys are (1, tx_num, position); the leading 1 is presumably the txo type for claims - TODO confirm
activations_needed = {
(1, claim_txo.tx_num, claim_txo.position): claim_hash
for claim_hash, claim_txo in claims.items()
if claim_txo is not None
}
# -1 stands in for claims that have no activation row
activations = {
activations_needed[k]: -1 if not activation else activation.height
async for k, activation in self.prefix_db.activated.multi_get_async_gen(
self._executor, list(activations_needed.keys())
)
}
# collect all of the support amounts for the accumulated claim txos
supports = {
claim_hash: 0 if support is None else support.amount
async for (claim_hash, ), support in self.prefix_db.support_amount.multi_get_async_gen(
self._executor, [(claim_hash,) for claim_hash in claims]
)
}
# collect all of the counts of claims in channels for the accumulated claim txos
claims_in_channels = {
claim_hash: 0 if not v else v.count
async for (claim_hash, ), v in self.prefix_db.channel_count.multi_get_async_gen(
self._executor, [(claim_hash,) for claim_hash in claims]
)
}
# repost counts and effective amounts are fetched per claim through the executor,
# sequentially, like the short urls above
repost_counts = {
claim_hash: await run_in_executor(
self._executor, self.get_reposted_count, claim_hash
)
for claim_hash, claim in claims.items() if claim is not None
}
effective_amounts = {
claim_hash: await run_in_executor(
self._executor, self.get_effective_amount, claim_hash
)
for claim_hash, claim in claims.items() if claim is not None
}
# builds the ResolveResult (or a ResolveCensoredError) for one claim hash purely from the
# dictionaries gathered above; `touched` is the claim hash, `claim_txo` its claim_to_txo row
def _prepare_result(touched, claim_txo):
try:
normalized_name = normalize_name(claim_txo.name)
except UnicodeDecodeError:
normalized_name = claim_txo.name
effective_amount = effective_amounts[touched]
reposted_count = repost_counts[touched]
tx_hash = claim_tx_hashes[claim_txo.tx_num]
claims_in_channel = claims_in_channels[touched]
activation = activations[touched]
support_amount = supports[touched]
reposted_claim_hash = reposts.get(touched)
channel_hash = channels.get(touched)
# NOTE(review): controlling_claims is keyed by claim_txo.normalized_name, while this lookup
# uses the name re-derived above; assumes the two always agree (KeyError otherwise) - confirm
controlling_claim = controlling_claims[normalized_name]
claim_amount = claim_txo.amount
# heights are derived from tx numbers by bisecting self.tx_counts
height = bisect_right(self.tx_counts, claim_txo.tx_num)
created_height = bisect_right(self.tx_counts, claim_txo.root_tx_num)
expiration_height = self.coin.get_expiration_height(height)
# NOTE(review): raises AttributeError if the takeover row is None - confirm that cannot happen here
last_take_over_height = controlling_claim.height
reposted_tx_hash = None
reposted_tx_position = None
reposted_height = None
reposted_channel_hash = None
if reposted_claim_hash:
repost_txo = claims[reposted_claim_hash]
if repost_txo and repost_txo.tx_num in claim_tx_hashes:
reposted_tx_hash = claim_tx_hashes[repost_txo.tx_num]
reposted_tx_position = repost_txo.position
reposted_height = bisect_right(self.tx_counts, repost_txo.tx_num)
if reposted_claim_hash in channels:
reposted_channel_hash = channels[reposted_claim_hash]
short_url = short_urls[touched]
channel_tx_hash = None
channel_tx_position = None
channel_height = None
# the canonical url is the short url, prefixed with the channel's short url when the
# claim is in a channel whose txo and tx hash were found
canonical_url = short_url
if channel_hash:
channel_txo = claims[channel_hash]
if channel_txo and channel_txo.tx_num in claim_tx_hashes:
channel_short_url = short_urls[channel_hash]
canonical_url = f'{channel_short_url}/{short_url}'
channel_tx_hash = claim_tx_hashes[channel_txo.tx_num]
channel_tx_position = channel_txo.position
channel_height = bisect_right(self.tx_counts, channel_txo.tx_num)
# blocking is checked before filtering; in both cases the error is returned, not raised
if apply_blocking:
blocker_hash = self.blocked_streams.get(touched) or self.blocked_streams.get(
reposted_claim_hash) or self.blocked_channels.get(touched) or self.blocked_channels.get(
reposted_channel_hash) or self.blocked_channels.get(channel_hash)
if blocker_hash:
reason_row = self._fs_get_claim_by_hash(blocker_hash)
return ResolveCensoredError(f'lbry://{canonical_url}', blocker_hash.hex(), censor_row=reason_row)
if apply_filtering:
filter_hash = self.filtered_streams.get(touched) or self.filtered_streams.get(
reposted_claim_hash) or self.filtered_channels.get(touched) or self.filtered_channels.get(
reposted_channel_hash) or self.filtered_channels.get(channel_hash)
if filter_hash:
reason_row = self._fs_get_claim_by_hash(filter_hash)
return ResolveCensoredError(f'lbry://{canonical_url}', filter_hash.hex(), censor_row=reason_row)
return ResolveResult(
claim_txo.name, normalized_name, touched, claim_txo.tx_num, claim_txo.position, tx_hash, height,
claim_amount, short_url=short_url,
is_controlling=controlling_claim.claim_hash == touched, canonical_url=canonical_url,
last_takeover_height=last_take_over_height, claims_in_channel=claims_in_channel,
creation_height=created_height, activation_height=activation,
expiration_height=expiration_height, effective_amount=effective_amount,
support_amount=support_amount,
channel_hash=channel_hash, reposted_claim_hash=reposted_claim_hash,
reposted=reposted_count,
signature_valid=None if not channel_hash else claim_txo.channel_signature_is_valid,
reposted_tx_hash=reposted_tx_hash,
reposted_tx_position=reposted_tx_position, reposted_height=reposted_height,
channel_tx_hash=channel_tx_hash, channel_tx_position=channel_tx_position,
channel_height=channel_height,
)
# prepared results by claim hash, so a claim that appears in several results is prepared once
total_extra = {}
for touched in claim_hashes:
extra = {}
claim_txo = claims[touched]
if not claim_txo:
# no claim_to_txo row for this hash
yield touched, None, extra
continue
if touched in total_extra:
claim = total_extra[touched]
else:
claim = total_extra[touched] = _prepare_result(touched, claim_txo)
if isinstance(claim, Exception):
# censored claims are yielded without any extra results
yield touched, claim, extra
continue
if include_extra:
# `claims` holds claim_to_txo rows or None, so the isinstance(..., Exception) checks on
# txos below look purely defensive - TODO confirm
if claim.channel_hash:
channel_txo = claims[claim.channel_hash]
if channel_txo and not isinstance(channel_txo, Exception):
if claim.channel_hash in total_extra:
extra[claim.channel_hash] = total_extra[claim.channel_hash]
else:
extra[claim.channel_hash] = total_extra[claim.channel_hash] = _prepare_result(
claim.channel_hash, channel_txo
)
if claim.reposted_claim_hash:
repost_txo = claims[claim.reposted_claim_hash]
if repost_txo and not isinstance(repost_txo, Exception):
if claim.reposted_claim_hash in total_extra:
extra[claim.reposted_claim_hash] = total_extra[claim.reposted_claim_hash]
else:
extra[claim.reposted_claim_hash] = total_extra[claim.reposted_claim_hash] = _prepare_result(
claim.reposted_claim_hash, repost_txo
)
if not isinstance(claim, Exception) and claim.reposted_claim_hash in channels:
reposted_channel_hash = channels[claim.reposted_claim_hash]
repost_channel_txo = claims[reposted_channel_hash]
if repost_channel_txo and not isinstance(repost_channel_txo, Exception):
if reposted_channel_hash in total_extra:
extra[reposted_channel_hash] = total_extra[reposted_channel_hash]
else:
extra[reposted_channel_hash] = total_extra[reposted_channel_hash] = _prepare_result(
reposted_channel_hash, repost_channel_txo
)
elif isinstance(repost_channel_txo, Exception):
extra[reposted_channel_hash] = repost_channel_txo
else:
pass # FIXME: lookup error
elif isinstance(repost_txo, Exception):
extra[claim.reposted_claim_hash] = repost_txo
else:
pass # FIXME: lookup error
yield touched, claim, extra
def get_streams_and_channels_reposted_by_channel_hashes(self, reposter_channel_hashes: Set[bytes]):
streams, channels = {}, {}
for reposter_channel_hash in reposter_channel_hashes:
@ -633,9 +990,28 @@ class SecondaryDB:
if status:
return status.hex()
def _get_hashX_statuses(self, hashXes: List[bytes]):
statuses = {
hashX: status
for hashX, status in zip(hashXes, self.prefix_db.hashX_mempool_status.multi_get(
[(hashX,) for hashX in hashXes], deserialize_value=False
)) if status is not None
}
if len(statuses) < len(hashXes):
statuses.update({
hashX: status
for hashX, status in zip(hashXes, self.prefix_db.hashX_status.multi_get(
[(hashX,) for hashX in hashXes if hashX not in statuses], deserialize_value=False
)) if status is not None
})
return [None if hashX not in statuses else statuses[hashX].hex() for hashX in hashXes]
async def get_hashX_status(self, hashX: bytes):
    """Run the blocking `_get_hashX_status` lookup for one hashX on `self._executor`."""
    loop = asyncio.get_event_loop()
    status = await loop.run_in_executor(self._executor, self._get_hashX_status, hashX)
    return status
async def get_hashX_statuses(self, hashXes: List[bytes]):
    """Run the blocking batched `_get_hashX_statuses` lookup on `self._executor`."""
    loop = asyncio.get_event_loop()
    statuses = await loop.run_in_executor(self._executor, self._get_hashX_statuses, hashXes)
    return statuses
def get_tx_hash(self, tx_num: int) -> bytes:
if self._cache_all_tx_hashes:
return self.total_transactions[tx_num]
@ -957,23 +1333,3 @@ class SecondaryDB:
self.logger.warning(f'all_utxos: tx hash not '
f'found (reorg?), retrying...')
await asyncio.sleep(0.25)
async def lookup_utxos(self, prevouts):
    """
    Resolve `(tx_hash, nout)` outpoints to `(hashX, amount)` pairs, skipping any that cannot
    be found. The database reads run on `self._executor`.

    :param prevouts: iterable of `(tx_hash, nout)` tuples
    :return: list of `(hashX, amount)` for the outpoints that are present in the utxo table
    """
    def _lookup():
        utxos = []
        for tx_hash, nout in prevouts:
            tx_num_val = self.prefix_db.tx_num.get(tx_hash)
            if not tx_num_val:
                # report through the logger instead of printing to stdout;
                # the hash is reversed before it is hex-encoded for the message
                self.logger.warning("no tx num for %s", tx_hash[::-1].hex())
                continue
            tx_num = tx_num_val.tx_num
            # the hashX_utxo key uses only the first 4 bytes of the tx hash
            hashX_val = self.prefix_db.hashX_utxo.get(tx_hash[:4], tx_num, nout)
            if not hashX_val:
                continue
            hashX = hashX_val.hashX
            utxo_value = self.prefix_db.utxo.get(hashX, tx_num, nout)
            if utxo_value:
                utxos.append((hashX, utxo_value.amount))
        return utxos

    return await asyncio.get_event_loop().run_in_executor(self._executor, _lookup)

View file

@ -1,3 +1,4 @@
import asyncio
import struct
import typing
import rocksdb
@ -101,6 +102,22 @@ class PrefixRow(metaclass=PrefixRowType):
handle_value(result[packed_keys[tuple(k_args)]]) for k_args in key_args
]
async def multi_get_async_gen(self, executor, key_args: typing.List[typing.Tuple], deserialize_value=True, step=1000):
    """
    Async generator that fetches many keys with a single `multi_get` run on `executor`
    and yields `(key_args_tuple, value)` pairs.

    :param executor: executor the blocking `multi_get` call is run on
    :param key_args: key argument tuples; they must pack to distinct keys
    :param deserialize_value: when true, found values are passed through `unpack_value`;
        otherwise the raw value is yielded
    :param step: control is handed back to the event loop whenever the item index is a
        multiple of `step`
    :raises AssertionError: if two entries of `key_args` pack to the same key

    Values that were not found are yielded as None. Pairs are yielded in the iteration
    order of the mapping returned by the database's `multi_get`.
    """
    packed_keys = {self.pack_key(*args): args for args in key_args}
    assert len(packed_keys) == len(key_args), 'duplicate partial keys given to multi_get_async_gen'
    db_result = await asyncio.get_event_loop().run_in_executor(
        executor, self._db.multi_get, [(self._column_family, key) for key in packed_keys]
    )
    unpack_value = self.unpack_value

    def handle_value(v):
        # missing values stay None; raw values are returned untouched when not deserializing
        if v is None or not deserialize_value:
            return v
        return unpack_value(v)

    # result keys are (column family, packed key) tuples, so k[-1] is the packed key
    for idx, (k, v) in enumerate((db_result or {}).items()):
        yield packed_keys[k[-1]], handle_value(v)
        # idx 0 also satisfies this, so control is yielded after the first item of every call
        if idx % step == 0:
            await asyncio.sleep(0)
def stage_multi_put(self, items):
self._op_stack.multi_put([RevertablePut(self.pack_key(*k), self.pack_value(*v)) for k, v in items])

View file

@ -218,49 +218,5 @@ class ElasticSyncDB(SecondaryDB):
yield meta
batch.clear()
def claim_producer(self, claim_hash: bytes) -> Optional[Dict]:
    """
    Build the claim metadata for a single claim hash.

    Returns None, after logging a warning, when the claim txo is not found, when its
    normalized name has no takeover row, or when no resolve result could be prepared.
    """
    txo = self.get_cached_claim_txo(claim_hash)
    # both "no txo" and "no takeover for the name" are reported with the same message
    if not txo or not self.prefix_db.claim_takeover.get(txo.normalized_name):
        self.logger.warning("can't sync non existent claim to ES: %s", claim_hash.hex())
        return None
    resolved = self._prepare_resolve_result(
        txo.tx_num, txo.position, claim_hash, txo.name, txo.root_tx_num, txo.root_position,
        self.get_activation(txo.tx_num, txo.position), txo.channel_signature_is_valid
    )
    if resolved:
        return self._prepare_claim_metadata(resolved.claim_hash, resolved)
    self.logger.warning("wat")
    return None
def claims_producer(self, claim_hashes: Set[bytes]):
    """
    Build claim metadata for a collection of claim hashes.

    Claims whose txo or takeover row is missing are skipped with a warning. The remaining
    resolve results are ordered by tx hash before their metadata is prepared; entries for
    which no metadata is produced are left out of the returned list.
    """
    resolved = []
    for claim_hash in claim_hashes:
        txo = self.get_cached_claim_txo(claim_hash)
        if not txo or not self.prefix_db.claim_takeover.get(txo.normalized_name):
            self.logger.warning("can't sync non existent claim to ES: %s", claim_hash.hex())
            continue
        claim = self._prepare_resolve_result(
            txo.tx_num, txo.position, claim_hash, txo.name, txo.root_tx_num, txo.root_position,
            self.get_activation(txo.tx_num, txo.position), txo.channel_signature_is_valid
        )
        if claim:
            resolved.append(claim)
    metadata = (
        self._prepare_claim_metadata(claim.claim_hash, claim)
        for claim in sorted(resolved, key=lambda c: c.tx_hash)
    )
    return [meta for meta in metadata if meta]

View file

@ -9,7 +9,7 @@ from hub.schema.result import Censor
from hub.service import BlockchainReaderService
from hub.common import IndexVersionMismatch, ALL_FIELDS, INDEX_DEFAULT_SETTINGS, expand_query
from hub.db.revertable import RevertableOp
from hub.db.common import TrendingNotification, DB_PREFIXES
from hub.db.common import TrendingNotification, DB_PREFIXES, ResolveResult
from hub.notifier_protocol import ElasticNotifierProtocol
from hub.elastic_sync.fast_ar_trending import FAST_AR_TRENDING_SCRIPT
from hub.elastic_sync.db import ElasticSyncDB
@ -219,10 +219,21 @@ class ElasticSyncService(BlockchainReaderService):
async def _claim_producer(self):
for deleted in self._deleted_claims:
yield self._delete_claim_query(self.index, deleted)
for touched in self._touched_claims:
claim = self.db.claim_producer(touched)
touched_claims = list(self._touched_claims)
for idx in range(0, len(touched_claims), 1000):
batch = touched_claims[idx:idx+1000]
async for claim_hash, claim, _ in self.db._prepare_resolve_results(batch, include_extra=False,
apply_blocking=False,
apply_filtering=False):
if not claim:
self.log.warning("wat")
continue
claim = self.db._prepare_claim_metadata(claim.claim_hash, claim)
if claim:
yield self._upsert_claim_query(self.index, claim)
for claim_hash, notifications in self._trending.items():
yield self._update_trending_query(self.index, claim_hash, notifications)

View file

@ -11,7 +11,7 @@ class ServerEnv(Env):
session_timeout=None, drop_client=None, description=None, daily_fee=None,
database_query_timeout=None, elastic_notifier_host=None, elastic_notifier_port=None,
blocking_channel_ids=None, filtering_channel_ids=None, peer_hubs=None, peer_announce=None,
index_address_status=None):
index_address_status=None, address_history_cache_size=None):
super().__init__(db_dir, max_query_workers, chain, reorg_limit, prometheus_port, cache_all_tx_hashes,
cache_all_claim_txos, blocking_channel_ids, filtering_channel_ids, index_address_status)
self.daemon_url = daemon_url if daemon_url is not None else self.required('DAEMON_URL')
@ -50,6 +50,8 @@ class ServerEnv(Env):
self.daily_fee = daily_fee if daily_fee is not None else self.string_amount('DAILY_FEE', '0')
self.database_query_timeout = (database_query_timeout / 1000.0) if database_query_timeout is not None else \
(float(self.integer('QUERY_TIMEOUT_MS', 10000)) / 1000.0)
self.hashX_history_cache_size = address_history_cache_size if address_history_cache_size is not None \
else self.integer('ADDRESS_HISTORY_CACHE_SIZE', 1000)
@classmethod
def contribute_to_arg_parser(cls, parser):
@ -96,6 +98,11 @@ class ServerEnv(Env):
parser.add_argument('--daily_fee', default=cls.default('DAILY_FEE', '0'), type=str)
parser.add_argument('--query_timeout_ms', type=int, default=cls.integer('QUERY_TIMEOUT_MS', 10000),
help="Elasticsearch query timeout, in ms. Can be set in env with 'QUERY_TIMEOUT_MS'")
parser.add_argument('--address_history_cache_size', type=int,
default=cls.integer('ADDRESS_HISTORY_CACHE_SIZE', 1000),
help="Size of the lru cache of address histories. "
"Can be set in the env with 'ADDRESS_HISTORY_CACHE_SIZE'")
@classmethod
def from_arg_parser(cls, args):
return cls(
@ -110,5 +117,6 @@ class ServerEnv(Env):
drop_client=args.drop_client, description=args.description, daily_fee=args.daily_fee,
database_query_timeout=args.query_timeout_ms, blocking_channel_ids=args.blocking_channel_ids,
filtering_channel_ids=args.filtering_channel_ids, elastic_notifier_host=args.elastic_notifier_host,
elastic_notifier_port=args.elastic_notifier_port, index_address_status=args.index_address_statuses
elastic_notifier_port=args.elastic_notifier_port, index_address_status=args.index_address_statuses,
address_history_cache_size=args.address_history_cache_size
)

View file

@ -242,7 +242,7 @@ class HubMemPool:
(self.session_manager.hsub_results[session.subscribe_headers_raw],))
)
if hashXes:
asyncio.create_task(session.send_history_notifications(*hashXes))
asyncio.create_task(session.send_history_notifications(hashXes))
async def _notify_sessions(self, height, touched, new_touched):
"""Notify sessions about height changes and touched addresses."""

View file

@ -22,7 +22,7 @@ from hub.herald import PROTOCOL_MIN, PROTOCOL_MAX, HUB_PROTOCOL_VERSION
from hub.build_info import BUILD, COMMIT_HASH, DOCKER_TAG
from hub.herald.search import SearchIndex
from hub.common import sha256, hash_to_hex_str, hex_str_to_hash, HASHX_LEN, version_string, formatted_time, SIZE_BUCKETS
from hub.common import protocol_version, RPCError, DaemonError, TaskGroup, HISTOGRAM_BUCKETS
from hub.common import protocol_version, RPCError, DaemonError, TaskGroup, HISTOGRAM_BUCKETS, asyncify_for_loop
from hub.common import LRUCacheWithMetrics
from hub.herald.jsonrpc import JSONRPCAutoDetect, JSONRPCConnection, JSONRPCv2, JSONRPC
from hub.herald.common import BatchRequest, ProtocolError, Request, Batch, Notification
@ -1181,33 +1181,39 @@ class LBRYElectrumX(asyncio.Protocol):
status = sha256(history.encode())
return status.hex()
async def send_history_notifications(self, *hashXes: typing.Iterable[bytes]):
async def get_hashX_statuses(self, hashXes: typing.List[bytes]):
    """Look up the status of every hashX in `hashXes`.

    When `self.env.index_address_status` is truthy the whole list is handed
    to `self.db.get_hashX_statuses` in a single call and its result is
    returned as-is. Otherwise each hashX is awaited through
    `self.get_hashX_status`, one after another, and the results are
    returned as a list in input order.
    """
    if not self.env.index_address_status:
        # No indexed statuses available: compute them one at a time.
        statuses = []
        for hashX in hashXes:
            statuses.append(await self.get_hashX_status(hashX))
        return statuses
    return await self.db.get_hashX_statuses(hashXes)
# Build one notification per hashX in `hashXes` and send them to this session as a single Batch.
# NOTE(review): this span is rendered from a diff hunk whose +/- markers and indentation were
# lost, so two variants of the method body appear interleaved below (two `for` loops, two
# `send_notifications` forms, two `dec` calls). Confirm against the repository which lines are live.
async def send_history_notifications(self, hashXes: typing.List[bytes]):
notifications = []
for hashX in hashXes:
# Batched variant: all statuses are fetched with one get_hashX_statuses call and the
# elapsed time is observed once on address_history_metric.
start = time.perf_counter()
statuses = await self.get_hashX_statuses(hashXes)
duration = time.perf_counter() - start
self.session_manager.address_history_metric.observe(duration)
# `start` is reset here; it is later used to time the send for notifications_sent_metric.
start = time.perf_counter()
scripthash_notifications = 0
address_notifications = 0
for hashX, status in zip(hashXes, statuses):
# The alias stored for this hashX in self.hashX_subs decides the method name.
alias = self.hashX_subs[hashX]
# A 64-character alias selects the scripthash method, anything else the address method
# (presumably a 64-char alias is a hex scripthash; confirm).
if len(alias) == 64:
method = 'blockchain.scripthash.subscribe'
scripthash_notifications += 1
else:
method = 'blockchain.address.subscribe'
# Per-hashX variant: each status is awaited individually through get_hashX_status, each
# lookup is timed, and a warning is logged when one takes more than 30 seconds.
start = time.perf_counter()
status = await self.get_hashX_status(hashX)
duration = time.perf_counter() - start
self.session_manager.address_history_metric.observe(duration)
notifications.append((method, (alias, status)))
if duration > 30:
self.logger.warning("slow history notification (%s) for '%s'", duration, alias)
start = time.perf_counter()
self.session_manager.notifications_in_flight_metric.inc()
for method, args in notifications:
self.NOTIFICATION_COUNT.labels(method=method,).inc()
address_notifications += 1
notifications.append(Notification(method, (alias, status)))
# NOTIFICATION_COUNT is incremented once per method by the accumulated total; a method
# whose total is zero is skipped.
if scripthash_notifications:
self.NOTIFICATION_COUNT.labels(method='blockchain.scripthash.subscribe',).inc(scripthash_notifications)
if address_notifications:
self.NOTIFICATION_COUNT.labels(method='blockchain.address.subscribe', ).inc(address_notifications)
# The in-flight metric is raised before sending and lowered in `finally`, so it is lowered
# even when send_notifications raises.
self.session_manager.notifications_in_flight_metric.inc(len(notifications))
try:
# Two forms of the send appear: one wraps (method, (alias, status)) tuples in Notification
# objects at send time, the other passes a list that already holds Notification objects.
await self.send_notifications(
Batch([Notification(method, (alias, status)) for (method, (alias, status)) in notifications])
)
await self.send_notifications(Batch(notifications))
# Observed only when the send completes without raising.
self.session_manager.notifications_sent_metric.observe(time.perf_counter() - start)
finally:
self.session_manager.notifications_in_flight_metric.dec()
self.session_manager.notifications_in_flight_metric.dec(len(notifications))
# def get_metrics_or_placeholder_for_api(self, query_name):
# """ Do not hold on to a reference to the metrics
@ -1267,22 +1273,28 @@ class LBRYElectrumX(asyncio.Protocol):
self.session_manager.pending_query_metric.dec()
self.session_manager.executor_time_metric.observe(time.perf_counter() - start)
async def _cached_resolve_url(self, url):
    """Return the resolve result for `url`, filling the session manager's cache on a miss.

    On a miss, `self.db._resolve(url)` is run through `self.loop.run_in_executor`
    on `self.db._executor`, the result is stored in
    `self.session_manager.resolve_cache` under `url`, and the value is then
    read back from the cache.
    """
    if url in self.session_manager.resolve_cache:
        return self.session_manager.resolve_cache[url]
    resolved = await self.loop.run_in_executor(self.db._executor, self.db._resolve, url)
    # The cache attribute is looked up again after the await, not captured before it.
    self.session_manager.resolve_cache[url] = resolved
    return self.session_manager.resolve_cache[url]
async def claimtrie_resolve(self, *urls) -> str:
sorted_urls = tuple(sorted(urls))
self.session_manager.urls_to_resolve_count_metric.inc(len(sorted_urls))
# sorted_urls = tuple(sorted(urls))
self.session_manager.urls_to_resolve_count_metric.inc(len(urls))
try:
if sorted_urls in self.session_manager.resolve_outputs_cache:
return self.session_manager.resolve_outputs_cache[sorted_urls]
# if sorted_urls in self.session_manager.resolve_outputs_cache:
# return self.session_manager.resolve_outputs_cache[sorted_urls]
rows, extra = [], []
for url in urls:
if url not in self.session_manager.resolve_cache:
self.session_manager.resolve_cache[url] = await self._cached_resolve_url(url)
resolved = {}
needed = defaultdict(list)
for idx, url in enumerate(urls):
if url in self.session_manager.resolve_cache:
stream, channel, repost, reposted_channel = self.session_manager.resolve_cache[url]
resolved[url] = stream, channel, repost, reposted_channel
else:
needed[url].append(idx)
if needed:
resolved_needed = await self.db.batch_resolve_urls(list(needed))
for url, resolve_result in resolved_needed.items():
self.session_manager.resolve_cache[url] = resolve_result
resolved.update(resolved_needed)
for url in urls:
(stream, channel, repost, reposted_channel) = resolved[url]
if isinstance(channel, ResolveCensoredError):
rows.append(channel)
extra.append(channel.censor_row)
@ -1297,28 +1309,24 @@ class LBRYElectrumX(asyncio.Protocol):
extra.append(reposted_channel.censor_row)
elif channel and not stream:
rows.append(channel)
# print("resolved channel", channel.name.decode())
if repost:
extra.append(repost)
if reposted_channel:
extra.append(reposted_channel)
elif stream:
# print("resolved stream", stream.name.decode())
rows.append(stream)
if channel:
# print("and channel", channel.name.decode())
extra.append(channel)
if repost:
extra.append(repost)
if reposted_channel:
extra.append(reposted_channel)
await asyncio.sleep(0)
self.session_manager.resolve_outputs_cache[sorted_urls] = result = await self.loop.run_in_executor(
None, Outputs.to_base64, rows, extra
)
return result
# await asyncio.sleep(0)
# self.session_manager.resolve_outputs_cache[sorted_urls] = result = Outputs.to_base64(rows, extra)
return Outputs.to_base64(rows, extra)
finally:
self.session_manager.resolved_url_count_metric.inc(len(sorted_urls))
self.session_manager.resolved_url_count_metric.inc(len(urls))
async def get_server_height(self):
    """Return the `db_height` attribute of `self.db`."""
    height = self.db.db_height
    return height
@ -1470,12 +1478,13 @@ class LBRYElectrumX(asyncio.Protocol):
address: the address to subscribe to"""
if len(addresses) > 1000:
raise RPCError(BAD_REQUEST, f'too many addresses in subscription request: {len(addresses)}')
results = []
hashXes = [item async for item in asyncify_for_loop((self.address_to_hashX(address) for address in addresses), 100)]
statuses = await self.get_hashX_statuses(hashXes)
for hashX, alias in zip(hashXes, addresses):
self.hashX_subs[hashX] = alias
self.session_manager.hashx_subscriptions_by_session[hashX].add(id(self))
self.session_manager.address_subscription_metric.inc(len(addresses))
for address in addresses:
results.append(await self.hashX_subscribe(self.address_to_hashX(address), address))
await asyncio.sleep(0)
return results
return statuses
async def address_unsubscribe(self, address):
"""Unsubscribe an address.

View file

@ -28,7 +28,7 @@ class BlockchainEnv(Env):
help='This setting translates into the max_open_files option given to rocksdb. '
'A higher number will use more memory. Defaults to 64.')
parser.add_argument('--address_history_cache_size', type=int,
default=cls.integer('ADDRESS_HISTORY_CACHE_SIZE', 1000),
default=cls.integer('ADDRESS_HISTORY_CACHE_SIZE', 2 ** 13),
help="LRU cache size for address histories, used when processing new blocks "
"and when processing mempool updates. Can be set in env with "
"'ADDRESS_HISTORY_CACHE_SIZE'")

View file

@ -118,8 +118,8 @@ class BlockchainProcessorService(BlockchainService):
self.pending_transaction_num_mapping: Dict[bytes, int] = {}
self.pending_transactions: Dict[int, bytes] = {}
self.hashX_history_cache = LRUCache(min(100, max(0, env.hashX_history_cache_size)))
self.hashX_full_cache = LRUCache(min(100, max(0, env.hashX_history_cache_size)))
self.hashX_history_cache = LRUCache(max(100, env.hashX_history_cache_size))
self.hashX_full_cache = LRUCache(max(100, env.hashX_history_cache_size))
self.history_tx_info_cache = LRUCache(2 ** 16)
def open_db(self):