batched claim txo fetching for the ES claim producers

This commit is contained in:
Jack Robison 2022-06-17 00:02:00 -04:00
parent ac01a17214
commit 79b84d89a3
3 changed files with 225 additions and 200 deletions

View file

@ -832,6 +832,27 @@ class SecondaryDB:
self.logger.exception("claim parsing for ES failed with tx: %s", tx_hash[::-1].hex())
return
async def get_claim_metadatas(self, txos: List[Tuple[bytes, int]]):
tx_hashes = {tx_hash for tx_hash, _ in txos}
txs = {
k: self.coin.transaction(v) async for ((k,), v) in self.prefix_db.tx.multi_get_async_gen(
self._executor, [(tx_hash,) for tx_hash in tx_hashes], deserialize_value=False
)
}
def get_metadata(txo):
if not txo:
return
try:
return txo.metadata
except:
return
return {
(tx_hash, nout): get_metadata(txs[tx_hash].outputs[nout])
for (tx_hash, nout) in txos
}
def get_activated_at_height(self, height: int) -> DefaultDict[PendingActivationValue, List[PendingActivationKey]]:
activated = defaultdict(list)
for k, v in self.prefix_db.pending_activation.iterate(prefix=(height,)):

View file

@ -27,29 +27,38 @@ class ElasticSyncDB(SecondaryDB):
self.block_timestamp_cache[height] = timestamp
return timestamp
def _prepare_claim_metadata(self, claim_hash: bytes, claim: ResolveResult):
metadata = self.get_claim_metadata(claim.tx_hash, claim.position)
async def prepare_claim_metadata_batch(self, claims: Dict[bytes, ResolveResult], extras):
metadatas = {}
needed_txos = set()
for claim_hash, claim in claims.items():
reposted_claim_hash = claim.reposted_claim_hash
needed_txos.add((claim.tx_hash, claim.position))
if reposted_claim_hash:
if not reposted_claim_hash not in extras:
continue
reposted_claim = extras.get((reposted_claim_hash))
if reposted_claim:
needed_txos.add((reposted_claim.tx_hash, reposted_claim.position))
metadatas.update(await self.get_claim_metadatas(list(needed_txos)))
for claim_hash, claim in claims.items():
metadata = metadatas.get((claim.tx_hash, claim.position))
if not metadata:
return
metadata = metadata
continue
if not metadata.is_stream or not metadata.stream.has_fee:
fee_amount = 0
else:
fee_amount = int(max(metadata.stream.fee.amount or 0, 0) * 1000)
if fee_amount >= 9223372036854775807:
return
continue
reposted_claim_hash = claim.reposted_claim_hash
reposted_claim = None
reposted_metadata = None
if reposted_claim_hash:
reposted_claim = self.get_cached_claim_txo(reposted_claim_hash)
if not reposted_claim:
return
reposted_metadata = self.get_claim_metadata(
self.get_tx_hash(reposted_claim.tx_num), reposted_claim.position
)
if not reposted_metadata:
return
if reposted_claim_hash in extras:
reposted_claim = extras[reposted_claim_hash]
reposted_metadata = metadatas.get((reposted_claim.tx_hash, reposted_claim.position))
reposted_tags = []
reposted_languages = []
reposted_has_source = False
@ -59,16 +68,7 @@ class ElasticSyncDB(SecondaryDB):
reposted_fee_amount = None
reposted_fee_currency = None
reposted_duration = None
if reposted_claim:
raw_reposted_claim_tx = self.prefix_db.tx.get(claim.reposted_tx_hash, deserialize_value=False)
try:
reposted_metadata = self.coin.transaction(
raw_reposted_claim_tx
).outputs[reposted_claim.position].metadata
except:
self.logger.error("failed to parse reposted claim in tx %s that was reposted by %s",
claim.reposted_claim_hash.hex(), claim_hash.hex())
return
if reposted_metadata:
if reposted_metadata.is_stream:
meta = reposted_metadata.stream
@ -79,7 +79,7 @@ class ElasticSyncDB(SecondaryDB):
elif reposted_metadata.is_repost:
meta = reposted_metadata.repost
else:
return
continue
reposted_tags = [tag for tag in meta.tags]
reposted_languages = [lang.language or 'none' for lang in meta.languages] or ['none']
reposted_has_source = False if not reposted_metadata.is_stream else reposted_metadata.stream.has_source
@ -92,7 +92,7 @@ class ElasticSyncDB(SecondaryDB):
else:
reposted_fee_amount = int(max(reposted_metadata.stream.fee.amount or 0, 0) * 1000)
if reposted_fee_amount >= 9223372036854775807:
return
continue
reposted_fee_currency = None if not reposted_metadata.is_stream else reposted_metadata.stream.fee.currency
reposted_duration = None
if reposted_metadata.is_stream and \
@ -107,10 +107,9 @@ class ElasticSyncDB(SecondaryDB):
elif metadata.is_repost:
meta = metadata.repost
else:
return
continue
claim_tags = [tag for tag in meta.tags]
claim_languages = [lang.language or 'none' for lang in meta.languages] or ['none']
tags = list(set(claim_tags).union(set(reposted_tags)))
languages = list(set(claim_languages).union(set(reposted_languages)))
blocked_hash = self.blocked_streams.get(claim_hash) or self.blocked_streams.get(
@ -185,39 +184,40 @@ class ElasticSyncDB(SecondaryDB):
value['release_time'] = metadata.stream.release_time or value['creation_timestamp']
elif metadata.is_repost or metadata.is_collection:
value['release_time'] = value['creation_timestamp']
return value
yield value
async def all_claims_producer(self, batch_size=500_000):
async def all_claims_producer(self, batch_size: int):
batch = []
if self._cache_all_claim_txos:
claim_iterator = self.claim_to_txo.items()
else:
claim_iterator = map(lambda item: (item[0].claim_hash, item[1]), self.prefix_db.claim_to_txo.iterate())
for claim_hash, claim_txo in claim_iterator:
# TODO: fix the couple of claim txos that dont have controlling names
if not self.prefix_db.claim_takeover.get(claim_txo.normalized_name):
continue
activation = self.get_activation(claim_txo.tx_num, claim_txo.position)
claim = self._prepare_resolve_result(
claim_txo.tx_num, claim_txo.position, claim_hash, claim_txo.name, claim_txo.root_tx_num,
claim_txo.root_position, activation, claim_txo.channel_signature_is_valid
)
if claim:
batch.append(claim)
for k in self.prefix_db.claim_to_txo.iterate(include_value=False):
batch.append(k.claim_hash)
if len(batch) == batch_size:
batch.sort(key=lambda x: x.tx_hash) # sort is to improve read-ahead hits
for claim in batch:
meta = self._prepare_claim_metadata(claim.claim_hash, claim)
if meta:
yield meta
claims = {}
total_extras = {}
async for claim_hash, claim, extras in self._prepare_resolve_results(batch, include_extra=False,
apply_blocking=False,
apply_filtering=False):
if not claim:
self.logger.warning("wat")
continue
claims[claim_hash] = claim
total_extras[claim_hash] = claim
total_extras.update(extras)
async for claim in self.prepare_claim_metadata_batch(claims, total_extras):
if claim:
yield claim
batch.clear()
batch.sort(key=lambda x: x.tx_hash)
for claim in batch:
meta = self._prepare_claim_metadata(claim.claim_hash, claim)
if meta:
yield meta
batch.clear()
if batch:
claims = {}
total_extras = {}
async for claim_hash, claim, extras in self._prepare_resolve_results(batch, include_extra=False,
apply_blocking=False,
apply_filtering=False):
if not claim:
self.logger.warning("wat")
continue
claims[claim_hash] = claim
total_extras[claim_hash] = claim
total_extras.update(extras)
async for claim in self.prepare_claim_metadata_batch(claims, total_extras):
if claim:
yield claim

View file

@ -224,13 +224,18 @@ class ElasticSyncService(BlockchainReaderService):
for idx in range(0, len(touched_claims), 1000):
batch = touched_claims[idx:idx+1000]
async for claim_hash, claim, _ in self.db._prepare_resolve_results(batch, include_extra=False,
claims = {}
total_extras = {}
async for claim_hash, claim, extras in self.db._prepare_resolve_results(batch, include_extra=False,
apply_blocking=False,
apply_filtering=False):
if not claim:
self.log.warning("wat")
continue
claim = self.db._prepare_claim_metadata(claim.claim_hash, claim)
claims[claim_hash] = claim
total_extras[claim_hash] = claim
total_extras.update(extras)
async for claim in self.db.prepare_claim_metadata_batch(claims, total_extras):
if claim:
yield self._upsert_claim_query(self.index, claim)
@ -418,22 +423,21 @@ class ElasticSyncService(BlockchainReaderService):
self.log.info("finished reindexing")
async def _sync_all_claims(self, batch_size=100000):
def load_historic_trending():
notifications = self._trending
for k, v in self.db.prefix_db.trending_notification.iterate():
notifications[k.claim_hash].append(TrendingNotification(k.height, v.previous_amount, v.new_amount))
async def all_claims_producer():
current_height = self.db.db_height
async for claim in self.db.all_claims_producer(batch_size=batch_size):
yield self._upsert_claim_query(self.index, claim)
claim_hash = bytes.fromhex(claim['claim_id'])
if claim_hash in self._trending:
yield self._update_trending_query(self.index, claim_hash, self._trending.pop(claim_hash))
self._trending.clear()
self.log.info("loading about %i historic trending updates", self.db.prefix_db.trending_notification.estimate_num_keys())
await asyncio.get_event_loop().run_in_executor(self._executor, load_historic_trending)
self.log.info("loaded historic trending updates for %i claims", len(self._trending))
self.log.info("applying trending")
for batch_height in range(0, current_height, 10000):
notifications = defaultdict(list)
for k, v in self.db.prefix_db.trending_notification.iterate(start=(batch_height,), stop=(batch_height+10000,)):
notifications[k.claim_hash].append(TrendingNotification(k.height, v.previous_amount, v.new_amount))
for claim_hash, trending in notifications.items():
yield self._update_trending_query(self.index, claim_hash, trending)
self._trending.clear()
cnt = 0
success = 0