forked from LBRYCommunity/lbry-sdk
make indexing cooperative
parent b1bb37511c
commit 6b193ab350
3 changed files with 25 additions and 18 deletions

@@ -215,7 +215,7 @@ class BlockProcessor:
             start = time.perf_counter()
             await self.run_in_thread_with_lock(self.advance_blocks, blocks)
             if self.sql:
-                await self.db.search_index.sync_queue(self.sql.claim_queue)
+                await self.db.search_index.claim_consumer(self.sql.claim_producer())
             for cache in self.search_cache.values():
                 cache.clear()
             self.history_cache.clear()

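The hunk above only rewires the call site: instead of handing the search index a shared asyncio.Queue, the block processor now passes it a generator built by the SQL writer. A minimal toy sketch of that handoff (not part of the commit; the data and helper names are placeholders):

    import asyncio

    def claim_producer():
        # stand-in for SQLDB.claim_producer(): deletes first, then updates
        yield 'delete', 'deadbeef'
        yield 'update', {'claim_id': 'abc', 'name': 'example'}

    async def claim_consumer(producer):
        # stand-in for SearchIndex.claim_consumer(): drain the generator,
        # yielding control to the event loop between items
        for op, doc in producer:
            print(op, doc)
            await asyncio.sleep(0)

    asyncio.run(claim_consumer(claim_producer()))
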
@@ -83,25 +83,26 @@ class SearchIndex:
     def delete_index(self):
         return self.client.indices.delete(self.index, ignore_unavailable=True)
 
-    async def _queue_consumer_doc_producer(self, queue: asyncio.Queue):
-        while not queue.empty():
-            op, doc = queue.get_nowait()
+    async def _consume_claim_producer(self, claim_producer):
+        count = 0
+        for op, doc in claim_producer:
             if op == 'delete':
                 yield {'_index': self.index, '_op_type': 'delete', '_id': doc}
             else:
                 yield extract_doc(doc, self.index)
+            count += 1
+            if count % 100:
+                self.logger.info("Indexing in progress, %d claims.", count)
+        self.logger.info("Indexing done for %d claims.", count)
 
-    async def sync_queue(self, claim_queue):
-        self.logger.info("Writing to index from a queue with %d elements.", claim_queue.qsize())
+    async def claim_consumer(self, claim_producer):
         await self.client.indices.refresh(self.index)
-        async for ok, item in async_streaming_bulk(self.client, self._queue_consumer_doc_producer(claim_queue)):
+        async for ok, item in async_streaming_bulk(self.client, self._consume_claim_producer(claim_producer)):
             if not ok:
                 self.logger.warning("indexing failed for an item: %s", item)
         await self.client.indices.refresh(self.index)
         await self.client.indices.flush(self.index)
-        self.logger.info("Indexing done. Queue: %d elements", claim_queue.qsize())
-        self.search_cache.clear()
-        self.channel_cache.clear()
+        self.logger.info("Indexing done.")
 
     async def apply_filters(self, blocked_streams, blocked_channels, filtered_streams, filtered_channels):
         def make_query(censor_type, blockdict, channels=False):

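For reference, async_streaming_bulk from the elasticsearch Python client accepts any iterable (sync or async) of bulk action dicts and yields one (ok, result) pair per document, which is what lets _consume_claim_producer stream straight into Elasticsearch without materialising a queue. A standalone sketch, assuming elasticsearch-py 7.x with async support and a local node; the index name and documents are made up:

    import asyncio
    from elasticsearch import AsyncElasticsearch
    from elasticsearch.helpers import async_streaming_bulk

    async def actions():
        # each yielded dict is one bulk action, like the generator above
        for i in range(250):
            yield {'_index': 'claims_sketch', '_id': str(i), 'title': f'doc {i}'}

    async def main():
        client = AsyncElasticsearch('http://localhost:9200')
        async for ok, result in async_streaming_bulk(client, actions()):
            if not ok:
                print('indexing failed for an item:', result)
        await client.indices.refresh('claims_sketch')
        await client.close()

    asyncio.run(main())
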
@@ -134,6 +135,8 @@ class SearchIndex:
         await self.client.indices.refresh(self.index)
         await self.client.update_by_query(self.index, body=make_query(2, blocked_channels, True), slices=32)
         await self.client.indices.refresh(self.index)
+        self.search_cache.clear()
+        self.channel_cache.clear()
 
     async def delete_above_height(self, height):
         await self.client.delete_by_query(self.index, expand_query(height='>'+str(height)))

@@ -233,7 +233,7 @@ class SQLDB:
             unhexlify(channel_id)[::-1] for channel_id in filtering_channels if channel_id
         }
         self.trending = trending
-        self.claim_queue = Queue()
+        self.pending_deletes = set()
 
     def open(self):
         self.db = apsw.Connection(

@@ -852,18 +852,24 @@ class SQLDB:
             claim['tags'] = claim['tags'].split(',,') if claim['tags'] else []
             claim['languages'] = claim['languages'].split(' ') if claim['languages'] else []
-            self.claim_queue.put_nowait(('update', claim))
+            yield 'update', claim
 
+    def clear_changelog(self):
         self.execute("delete from changelog;")
 
-    def enqueue_deleted(self, deleted_claims):
-        for claim_hash in deleted_claims:
-            self.claim_queue.put_nowait(('delete', hexlify(claim_hash[::-1]).decode()))
+    def claim_producer(self):
+        while self.pending_deletes:
+            claim_hash = self.pending_deletes.pop()
+            yield 'delete', hexlify(claim_hash[::-1]).decode()
+        for claim in self.enqueue_changes():
+            yield claim
+        self.clear_changelog()
 
     def advance_txs(self, height, all_txs, header, daemon_height, timer):
         insert_claims = []
         update_claims = []
         update_claim_hashes = set()
-        delete_claim_hashes = set()
+        delete_claim_hashes = self.pending_deletes
         insert_supports = []
         delete_support_txo_hashes = set()
         recalculate_claim_hashes = set()  # added/deleted supports, added/updated claim

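The producer side is now an ordinary generator rather than a queue filler: deletions staged during advance_txs (delete_claim_hashes is the same set object as self.pending_deletes) are drained first, then the pending updates, and the changelog is cleared only after everything has been yielded. A self-contained toy sketch of that ordering; the class, fields, and data below are illustrative, not the real SQLDB schema:

    class ToyWriter:
        def __init__(self):
            self.pending_deletes = set()
            self.changelog = []  # stand-in for the SQL changelog table

        def enqueue_changes(self):
            for claim in self.changelog:
                yield 'update', claim

        def clear_changelog(self):
            self.changelog.clear()

        def claim_producer(self):
            # deletes first, then updates, then reset the changelog
            while self.pending_deletes:
                yield 'delete', self.pending_deletes.pop()
            yield from self.enqueue_changes()
            self.clear_changelog()

    w = ToyWriter()
    w.pending_deletes.add('deadbeef')
    w.changelog.append({'claim_id': 'abc', 'name': 'example'})
    for op, doc in w.claim_producer():
        print(op, doc)
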
@@ -943,8 +949,6 @@ class SQLDB:
         r(self.update_claimtrie, height, recalculate_claim_hashes, deleted_claim_names, forward_timer=True)
         for algorithm in self.trending:
             r(algorithm.run, self.db.cursor(), height, daemon_height, recalculate_claim_hashes)
-        r(self.enqueue_deleted, delete_claim_hashes)
-        r(self.enqueue_changes)
 
 
 class LBRYLevelDB(LevelDB):