faster es sync

Jack Robison 2021-07-27 16:11:27 -04:00 committed by Victor Shyba
parent 6699d1e2f8
commit d1560ef09b
4 changed files with 41 additions and 18 deletions

View file

@@ -263,13 +263,13 @@ class BlockProcessor:
         self.claim_channels: Dict[bytes, bytes] = {}
         self.hashXs_by_tx: DefaultDict[bytes, List[int]] = defaultdict(list)
 
-    def claim_producer(self):
+    async def claim_producer(self):
         if self.db.db_height <= 1:
             return
 
         for claim_hash in self.removed_claims_to_send_es:
             yield 'delete', claim_hash.hex()
-        for claim in self.db.claims_producer(self.touched_claims_to_send_es):
+        async for claim in self.db.claims_producer(self.touched_claims_to_send_es):
             yield 'update', claim
 
     async def run_in_thread_with_lock(self, func, *args):
@@ -288,6 +288,7 @@ class BlockProcessor:
         async def run_in_thread():
             return await asyncio.get_event_loop().run_in_executor(None, func, *args)
         return await asyncio.shield(run_in_thread())
+
     async def check_and_advance_blocks(self, raw_blocks):
         """Process the list of raw blocks passed. Detects and handles
         reorgs.
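
The change to BlockProcessor is purely mechanical: claim_producer becomes an async generator so it can delegate to the now-asynchronous db.claims_producer with `async for`. A minimal sketch of the same shape, using toy stand-ins for the database and claim objects (only the delegation pattern matches the commit):

import asyncio
from typing import AsyncIterator, Tuple, Union

async def claims_producer(claim_hashes) -> AsyncIterator[dict]:
    # Toy stand-in for the db-backed async producer below.
    for claim_hash in claim_hashes:
        await asyncio.sleep(0)  # placeholder for executor-backed lookups
        yield {'claim_id': claim_hash.hex()}

async def claim_producer(removed, touched) -> AsyncIterator[Tuple[str, Union[str, dict]]]:
    # Deletions only need the hex id.
    for claim_hash in removed:
        yield 'delete', claim_hash.hex()
    # The update stream now comes from an async generator, so it must be
    # consumed with `async for`; a plain `for` would raise a TypeError.
    async for claim in claims_producer(touched):
        yield 'update', claim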

View file

@@ -104,7 +104,7 @@ class SearchIndex:
 
     async def _consume_claim_producer(self, claim_producer):
         count = 0
-        for op, doc in claim_producer:
+        async for op, doc in claim_producer:
             if op == 'delete':
                 yield {'_index': self.index, '_op_type': 'delete', '_id': doc}
             else:
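
On the consumer side, _consume_claim_producer only needs to switch to `async for` while still translating each (op, doc) pair into an Elasticsearch bulk action. The `else` branch is cut off by the hunk, so the sketch below fills it in with a generic upsert action as an assumption, not the repo's exact document shape:

async def consume_claim_producer(claim_producer, index='claims'):
    # claim_producer yields ('delete', claim_id_hex) or ('update', doc_dict).
    async for op, doc in claim_producer:
        if op == 'delete':
            yield {'_index': index, '_op_type': 'delete', '_id': doc}
        else:
            # Assumed shape: upsert the prepared document keyed by claim_id.
            yield {'_index': index, '_op_type': 'update', '_id': doc['claim_id'],
                   'doc': doc, 'doc_as_upsert': True}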

View file

@@ -17,7 +17,7 @@ async def get_all_claims(index_name='claims', db=None):
     await db.open_dbs()
     try:
        cnt = 0
-        for claim in db.all_claims_producer():
+        async for claim in db.all_claims_producer():
            yield extract_doc(claim, index_name)
            cnt += 1
            if cnt % 10000 == 0:
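
get_all_claims is now an async generator too, which means it can be handed straight to elasticsearch-py's async bulk helpers. The driver below is a sketch, assuming the `elasticsearch[async]` client; the actual entry point in this file sits outside the hunk:

from elasticsearch import AsyncElasticsearch
from elasticsearch.helpers import async_streaming_bulk

async def sync_all_claims(db, index_name='claims'):
    es = AsyncElasticsearch()
    try:
        # An async generator is accepted directly as the actions iterable.
        async for ok, item in async_streaming_bulk(
                es, get_all_claims(index_name=index_name, db=db),
                raise_on_error=False):
            if not ok:
                print('indexing failed:', item)
    finally:
        await es.close()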

View file

@@ -618,31 +618,47 @@ class LevelDB:
             value['release_time'] = metadata.stream.release_time
         return value
 
-    def all_claims_producer(self, batch_size=500_000):
+    async def all_claims_producer(self, batch_size=500_000):
+        loop = asyncio.get_event_loop()
         batch = []
+        tasks = []
         for claim_hash, v in self.db.iterator(prefix=Prefixes.claim_to_txo.prefix):
             # TODO: fix the couple of claim txos that dont have controlling names
             if not self.db.get(Prefixes.claim_takeover.pack_key(Prefixes.claim_to_txo.unpack_value(v).name)):
                 continue
-            claim = self._fs_get_claim_by_hash(claim_hash[1:])
-            if claim:
-                batch.append(claim)
-            if len(batch) == batch_size:
+            tasks.append(
+                loop.run_in_executor(None, self._fs_get_claim_by_hash, claim_hash[1:])
+            )
+            if len(tasks) == batch_size:
+                for t in asyncio.as_completed(tasks):
+                    claim = await t
+                    if claim:
+                        batch.append(claim)
+                tasks.clear()
                 batch.sort(key=lambda x: x.tx_hash)
-                for claim in batch:
-                    meta = self._prepare_claim_metadata(claim.claim_hash, claim)
+                for claim_fut in asyncio.as_completed(
+                        [loop.run_in_executor(None, self._prepare_claim_metadata, claim.claim_hash, claim)
+                         for claim in batch]):
+                    meta = await claim_fut
                     if meta:
                         yield meta
                 batch.clear()
+        for t in asyncio.as_completed(tasks):
+            claim = await t
+            if claim:
+                batch.append(claim)
         batch.sort(key=lambda x: x.tx_hash)
-        for claim in batch:
-            meta = self._prepare_claim_metadata(claim.claim_hash, claim)
+        for claim_fut in asyncio.as_completed(
+                [loop.run_in_executor(None, self._prepare_claim_metadata, claim.claim_hash, claim)
+                 for claim in batch]):
+            meta = await claim_fut
             if meta:
                 yield meta
+        batch.clear()
 
-    def claims_producer(self, claim_hashes: Set[bytes]):
+    async def claims_producer(self, claim_hashes: Set[bytes]):
         batch = []
+        loop = asyncio.get_event_loop()
+        tasks = []
         for claim_hash in claim_hashes:
             if claim_hash not in self.claim_to_txo:
                 self.logger.warning("can't sync non existent claim to ES: %s", claim_hash.hex())
@@ -651,12 +667,18 @@ class LevelDB:
             if not self.db.get(Prefixes.claim_takeover.pack_key(name)):
                 self.logger.warning("can't sync non existent claim to ES: %s", claim_hash.hex())
                 continue
-            claim = self._fs_get_claim_by_hash(claim_hash)
+            tasks.append(
+                loop.run_in_executor(None, self._fs_get_claim_by_hash, claim_hash)
+            )
+        for t in asyncio.as_completed(tasks):
+            claim = await t
             if claim:
                 batch.append(claim)
         batch.sort(key=lambda x: x.tx_hash)
-        for claim in batch:
-            meta = self._prepare_claim_metadata(claim.claim_hash, claim)
+        for claim_fut in asyncio.as_completed(
+                [loop.run_in_executor(None, self._prepare_claim_metadata, claim.claim_hash, claim)
+                 for claim in batch]):
+            meta = await claim_fut
             if meta:
                 yield meta
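
The LevelDB changes carry the actual speedup: instead of performing each blocking read and each _prepare_claim_metadata call inline, the producers submit them to the default thread-pool executor and drain them with asyncio.as_completed, sorting each batch by tx_hash before the second stage. A runnable sketch of that two-stage fan-out/fan-in, with toy stand-ins for the claim object and the blocking calls:

import asyncio
from dataclasses import dataclass

@dataclass
class Claim:
    # Toy stand-in for the real claim txo object.
    tx_hash: bytes
    claim_hash: bytes

def fetch_claim(claim_hash: bytes) -> Claim:
    # Stands in for the blocking LevelDB read (_fs_get_claim_by_hash).
    return Claim(tx_hash=claim_hash[::-1], claim_hash=claim_hash)

def prepare_metadata(claim: Claim) -> dict:
    # Stands in for the heavy _prepare_claim_metadata call.
    return {'claim_id': claim.claim_hash.hex()}

async def claims_producer(claim_hashes):
    loop = asyncio.get_event_loop()
    # Stage 1 fan-out: every blocking read runs on the default thread pool.
    tasks = [loop.run_in_executor(None, fetch_claim, h) for h in claim_hashes]
    batch = []
    # Fan-in: collect results as they finish, dropping missing claims.
    for fut in asyncio.as_completed(tasks):
        claim = await fut
        if claim:
            batch.append(claim)
    # Sort so stage 2 is *submitted* in tx_hash order; results still
    # arrive in completion order.
    batch.sort(key=lambda c: c.tx_hash)
    # Stage 2 fan-out/fan-in: metadata preparation also goes to the executor.
    for meta_fut in asyncio.as_completed(
            [loop.run_in_executor(None, prepare_metadata, c) for c in batch]):
        meta = await meta_fut
        if meta:
            yield meta

async def main():
    hashes = [bytes([i]) * 20 for i in range(5)]
    async for meta in claims_producer(hashes):
        print(meta)

asyncio.run(main())

Two things worth noting about the pattern as committed: asyncio.as_completed yields in completion order, so the tx_hash sort governs submission order rather than a strict output order, and asyncio.get_event_loop() inside a coroutine returns the running loop (on newer Python, asyncio.get_running_loop() is the preferred spelling).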