diff --git a/lbry/wallet/server/block_processor.py b/lbry/wallet/server/block_processor.py index 28acba5ba..3f13bf0e3 100644 --- a/lbry/wallet/server/block_processor.py +++ b/lbry/wallet/server/block_processor.py @@ -263,13 +263,13 @@ class BlockProcessor: self.claim_channels: Dict[bytes, bytes] = {} self.hashXs_by_tx: DefaultDict[bytes, List[int]] = defaultdict(list) - def claim_producer(self): + async def claim_producer(self): if self.db.db_height <= 1: return for claim_hash in self.removed_claims_to_send_es: yield 'delete', claim_hash.hex() - for claim in self.db.claims_producer(self.touched_claims_to_send_es): + async for claim in self.db.claims_producer(self.touched_claims_to_send_es): yield 'update', claim async def run_in_thread_with_lock(self, func, *args): @@ -288,6 +288,7 @@ class BlockProcessor: async def run_in_thread(): return await asyncio.get_event_loop().run_in_executor(None, func, *args) return await asyncio.shield(run_in_thread()) + async def check_and_advance_blocks(self, raw_blocks): """Process the list of raw blocks passed. Detects and handles reorgs. diff --git a/lbry/wallet/server/db/elasticsearch/search.py b/lbry/wallet/server/db/elasticsearch/search.py index f57586829..3ec121b47 100644 --- a/lbry/wallet/server/db/elasticsearch/search.py +++ b/lbry/wallet/server/db/elasticsearch/search.py @@ -104,7 +104,7 @@ class SearchIndex: async def _consume_claim_producer(self, claim_producer): count = 0 - for op, doc in claim_producer: + async for op, doc in claim_producer: if op == 'delete': yield {'_index': self.index, '_op_type': 'delete', '_id': doc} else: diff --git a/lbry/wallet/server/db/elasticsearch/sync.py b/lbry/wallet/server/db/elasticsearch/sync.py index 15e076de9..c6894b57f 100644 --- a/lbry/wallet/server/db/elasticsearch/sync.py +++ b/lbry/wallet/server/db/elasticsearch/sync.py @@ -17,7 +17,7 @@ async def get_all_claims(index_name='claims', db=None): await db.open_dbs() try: cnt = 0 - for claim in db.all_claims_producer(): + async for claim in db.all_claims_producer(): yield extract_doc(claim, index_name) cnt += 1 if cnt % 10000 == 0: diff --git a/lbry/wallet/server/leveldb.py b/lbry/wallet/server/leveldb.py index dd1ecbad0..de1f3b2e6 100644 --- a/lbry/wallet/server/leveldb.py +++ b/lbry/wallet/server/leveldb.py @@ -618,31 +618,47 @@ class LevelDB: value['release_time'] = metadata.stream.release_time return value - def all_claims_producer(self, batch_size=500_000): + async def all_claims_producer(self, batch_size=500_000): + loop = asyncio.get_event_loop() batch = [] + tasks = [] for claim_hash, v in self.db.iterator(prefix=Prefixes.claim_to_txo.prefix): # TODO: fix the couple of claim txos that dont have controlling names if not self.db.get(Prefixes.claim_takeover.pack_key(Prefixes.claim_to_txo.unpack_value(v).name)): continue - claim = self._fs_get_claim_by_hash(claim_hash[1:]) - if claim: - batch.append(claim) - if len(batch) == batch_size: + tasks.append( + loop.run_in_executor(None, self._fs_get_claim_by_hash, claim_hash[1:]) + ) + if len(tasks) == batch_size: + for t in asyncio.as_completed(tasks): + claim = await t + if claim: + batch.append(claim) + tasks.clear() batch.sort(key=lambda x: x.tx_hash) - for claim in batch: - meta = self._prepare_claim_metadata(claim.claim_hash, claim) + for claim_fut in asyncio.as_completed( + [loop.run_in_executor(None, self._prepare_claim_metadata, claim.claim_hash, claim) + for claim in batch]): + meta = await claim_fut if meta: yield meta batch.clear() + for t in asyncio.as_completed(tasks): + claim = await t + if claim: + batch.append(claim) batch.sort(key=lambda x: x.tx_hash) - for claim in batch: - meta = self._prepare_claim_metadata(claim.claim_hash, claim) + for claim_fut in asyncio.as_completed( + [loop.run_in_executor(None, self._prepare_claim_metadata, claim.claim_hash, claim) + for claim in batch]): + meta = await claim_fut if meta: yield meta - batch.clear() - def claims_producer(self, claim_hashes: Set[bytes]): + async def claims_producer(self, claim_hashes: Set[bytes]): batch = [] + loop = asyncio.get_event_loop() + tasks = [] for claim_hash in claim_hashes: if claim_hash not in self.claim_to_txo: self.logger.warning("can't sync non existent claim to ES: %s", claim_hash.hex()) @@ -651,12 +667,18 @@ class LevelDB: if not self.db.get(Prefixes.claim_takeover.pack_key(name)): self.logger.warning("can't sync non existent claim to ES: %s", claim_hash.hex()) continue - claim = self._fs_get_claim_by_hash(claim_hash) + tasks.append( + loop.run_in_executor(None, self._fs_get_claim_by_hash, claim_hash) + ) + for t in asyncio.as_completed(tasks): + claim = await t if claim: batch.append(claim) batch.sort(key=lambda x: x.tx_hash) - for claim in batch: - meta = self._prepare_claim_metadata(claim.claim_hash, claim) + for claim_fut in asyncio.as_completed( + [loop.run_in_executor(None, self._prepare_claim_metadata, claim.claim_hash, claim) + for claim in batch]): + meta = await claim_fut if meta: yield meta