fix bulk es sync

This commit is contained in:
Jack Robison 2021-08-02 12:08:55 -04:00 committed by Victor Shyba
parent 613acc7b00
commit c68334b421

View file

@ -702,39 +702,25 @@ class LevelDB:
# TODO: fix the couple of claim txos that dont have controlling names # TODO: fix the couple of claim txos that dont have controlling names
if not self.db.get(Prefixes.claim_takeover.pack_key(Prefixes.claim_to_txo.unpack_value(v).name)): if not self.db.get(Prefixes.claim_takeover.pack_key(Prefixes.claim_to_txo.unpack_value(v).name)):
continue continue
tasks.append( claim = self._fs_get_claim_by_hash(claim_hash[1:])
loop.run_in_executor(None, self._fs_get_claim_by_hash, claim_hash[1:]) if claim:
) batch.append(claim)
if len(tasks) == batch_size: if len(batch) == batch_size:
for t in asyncio.as_completed(tasks):
claim = await t
if claim:
batch.append(claim)
tasks.clear()
batch.sort(key=lambda x: x.tx_hash) batch.sort(key=lambda x: x.tx_hash)
for claim_fut in asyncio.as_completed( for claim in batch:
[loop.run_in_executor(None, self._prepare_claim_metadata, claim.claim_hash, claim) meta = self._prepare_claim_metadata(claim.claim_hash, claim)
for claim in batch]):
meta = await claim_fut
if meta: if meta:
yield meta yield meta
batch.clear() batch.clear()
for t in asyncio.as_completed(tasks):
claim = await t
if claim:
batch.append(claim)
batch.sort(key=lambda x: x.tx_hash) batch.sort(key=lambda x: x.tx_hash)
for claim_fut in asyncio.as_completed( for claim in batch:
[loop.run_in_executor(None, self._prepare_claim_metadata, claim.claim_hash, claim) meta = self._prepare_claim_metadata(claim.claim_hash, claim)
for claim in batch]):
meta = await claim_fut
if meta: if meta:
yield meta yield meta
batch.clear()
async def claims_producer(self, claim_hashes: Set[bytes]): async def claims_producer(self, claim_hashes: Set[bytes]):
batch = [] batch = []
loop = asyncio.get_event_loop()
tasks = []
for claim_hash in claim_hashes: for claim_hash in claim_hashes:
if claim_hash not in self.claim_to_txo: if claim_hash not in self.claim_to_txo:
self.logger.warning("can't sync non existent claim to ES: %s", claim_hash.hex()) self.logger.warning("can't sync non existent claim to ES: %s", claim_hash.hex())
@ -743,20 +729,15 @@ class LevelDB:
if not self.db.get(Prefixes.claim_takeover.pack_key(name)): if not self.db.get(Prefixes.claim_takeover.pack_key(name)):
self.logger.warning("can't sync non existent claim to ES: %s", claim_hash.hex()) self.logger.warning("can't sync non existent claim to ES: %s", claim_hash.hex())
continue continue
tasks.append( claim = self._fs_get_claim_by_hash(claim_hash)
loop.run_in_executor(None, self._fs_get_claim_by_hash, claim_hash)
)
for t in asyncio.as_completed(tasks):
claim = await t
if claim: if claim:
batch.append(claim) batch.append(claim)
batch.sort(key=lambda x: x.tx_hash) batch.sort(key=lambda x: x.tx_hash)
for claim_fut in asyncio.as_completed( for claim in batch:
[loop.run_in_executor(None, self._prepare_claim_metadata, claim.claim_hash, claim) meta = self._prepare_claim_metadata(claim.claim_hash, claim)
for claim in batch]):
meta = await claim_fut
if meta: if meta:
yield meta yield meta
batch.clear()
def get_activated_at_height(self, height: int) -> DefaultDict[PendingActivationValue, List[PendingActivationKey]]: def get_activated_at_height(self, height: int) -> DefaultDict[PendingActivationValue, List[PendingActivationKey]]:
activated = defaultdict(list) activated = defaultdict(list)