faster claim producer

-make batches of claim txos from the iterator, and sort by tx hash before fetching to maximize cache and read ahead hits
This commit is contained in:
Jack Robison 2021-06-18 13:54:02 -04:00 committed by Victor Shyba
parent a74434e269
commit fd7bfbea78

View file

@ -442,7 +442,7 @@ class LevelDB:
txos[claim_hash] = tx_num, nout txos[claim_hash] = tx_num, nout
return txos return txos
def get_claim_output_script(self, tx_hash, nout): def get_claim_metadata(self, tx_hash, nout):
raw = self.db.get( raw = self.db.get(
DB_PREFIXES.TX_PREFIX.value + tx_hash DB_PREFIXES.TX_PREFIX.value + tx_hash
) )
@ -463,9 +463,13 @@ class LevelDB:
if not claim: if not claim:
print("wat") print("wat")
return return
metadata = self.get_claim_output_script(claim.tx_hash, claim.position) return self._prepare_claim_metadata(claim_hash, claim)
def _prepare_claim_metadata(self, claim_hash: bytes, claim: ResolveResult):
metadata = self.get_claim_metadata(claim.tx_hash, claim.position)
if not metadata: if not metadata:
return return
reposted_claim_hash = None if not metadata.is_repost else metadata.repost.reference.claim_hash[::-1] reposted_claim_hash = None if not metadata.is_repost else metadata.repost.reference.claim_hash[::-1]
reposted_claim = None reposted_claim = None
reposted_metadata = None reposted_metadata = None
@ -473,7 +477,7 @@ class LevelDB:
reposted_claim = self.get_claim_txo(reposted_claim_hash) reposted_claim = self.get_claim_txo(reposted_claim_hash)
if not reposted_claim: if not reposted_claim:
return return
reposted_metadata = self.get_claim_output_script( reposted_metadata = self.get_claim_metadata(
self.total_transactions[reposted_claim.tx_num], reposted_claim.position self.total_transactions[reposted_claim.tx_num], reposted_claim.position
) )
if not reposted_metadata: if not reposted_metadata:
@ -592,17 +596,37 @@ class LevelDB:
value['release_time'] = metadata.stream.release_time value['release_time'] = metadata.stream.release_time
return value return value
def all_claims_producer(self): def all_claims_producer(self, batch_size=500_000):
batch = []
for claim_hash in self.db.iterator(prefix=Prefixes.claim_to_txo.prefix, include_value=False): for claim_hash in self.db.iterator(prefix=Prefixes.claim_to_txo.prefix, include_value=False):
claim = self._prepare_claim_for_sync(claim_hash[1:]) claim = self._fs_get_claim_by_hash(claim_hash[1:])
if claim: if claim:
yield claim batch.append(claim)
if len(batch) == batch_size:
batch.sort(key=lambda x: x.tx_hash)
for claim in batch:
meta = self._prepare_claim_metadata(claim.claim_hash, claim)
if meta:
yield meta
batch.clear()
batch.sort(key=lambda x: x.tx_hash)
for claim in batch:
meta = self._prepare_claim_metadata(claim.claim_hash, claim)
if meta:
yield meta
batch.clear()
def claims_producer(self, claim_hashes: Set[bytes]): def claims_producer(self, claim_hashes: Set[bytes]):
batch = []
for claim_hash in claim_hashes: for claim_hash in claim_hashes:
result = self._prepare_claim_for_sync(claim_hash) claim = self._fs_get_claim_by_hash(claim_hash)
if result: if claim:
yield result batch.append(claim)
batch.sort(key=lambda x: x.tx_hash)
for claim in batch:
meta = self._prepare_claim_metadata(claim.claim_hash, claim)
if meta:
yield meta
def get_activated_at_height(self, height: int) -> DefaultDict[PendingActivationValue, List[PendingActivationKey]]: def get_activated_at_height(self, height: int) -> DefaultDict[PendingActivationValue, List[PendingActivationKey]]:
activated = defaultdict(list) activated = defaultdict(list)