From 920dad524a7d297b15d4743ac436edb4a48dcecd Mon Sep 17 00:00:00 2001 From: Victor Shyba Date: Mon, 22 Feb 2021 16:26:18 -0300 Subject: [PATCH] simplify sync and use asyncio Queue instead --- lbry/wallet/server/db/elastic_search.py | 48 ++++--------------------- lbry/wallet/server/db/writer.py | 17 +++++---- 2 files changed, 17 insertions(+), 48 deletions(-) diff --git a/lbry/wallet/server/db/elastic_search.py b/lbry/wallet/server/db/elastic_search.py index 5a4b68ec0..a2a6eb642 100644 --- a/lbry/wallet/server/db/elastic_search.py +++ b/lbry/wallet/server/db/elastic_search.py @@ -81,19 +81,17 @@ class SearchIndex: return self.client.indices.delete(self.index, ignore_unavailable=True) async def sync_queue(self, claim_queue): + self.logger.info("Writing to index from a queue with %d elements.", claim_queue.qsize()) if claim_queue.empty(): return - to_delete, to_update = [], [] + actions = [] while not claim_queue.empty(): operation, doc = claim_queue.get_nowait() - if operation == 'delete': - to_delete.append(doc) - else: - to_update.append(doc) - await self.delete(to_delete) + actions.append(extract_doc(doc, self.index)) + self.logger.info("prepare update: %d elements. Queue: %d elements", len(actions), claim_queue.qsize()) await self.client.indices.refresh(self.index) - for bulk in range(0, len(to_update), 400): - await self.update(to_update[bulk:bulk+400]) + self.logger.info("update done: %d elements. Queue: %d elements", len(actions), claim_queue.qsize()) + await async_bulk(self.client, actions) await self.client.indices.refresh(self.index) await self.client.indices.flush(self.index) @@ -129,40 +127,6 @@ class SearchIndex: await self.client.update_by_query(self.index, body=make_query(2, blocked_channels, True), slices=32) await self.client.indices.refresh(self.index) - async def update(self, claims): - if not claims: - return - actions = [extract_doc(claim, self.index) for claim in claims] - names = [] - claim_ids = [] - for claim in claims: - if claim['is_controlling']: - names.append(claim['normalized']) - claim_ids.append(claim['claim_id']) - if names: - update = expand_query(name__in=names, not_claim_id=claim_ids, is_controlling=True) - update['script'] = { - "source": "ctx._source.is_controlling=false", - "lang": "painless" - } - await self.client.indices.refresh(self.index) - await self.client.update_by_query(self.index, body=update) - await self.client.indices.refresh(self.index) - await async_bulk(self.client, actions) - - async def delete(self, claim_ids): - if not claim_ids: - return - actions = [{'_index': self.index, '_op_type': 'delete', '_id': claim_id} for claim_id in claim_ids] - await async_bulk(self.client, actions, raise_on_error=False) - update = expand_query(channel_id__in=claim_ids) - update['script'] = { - "source": "ctx._source.signature_valid=false", - "lang": "painless" - } - await self.client.indices.refresh(self.index) - await self.client.update_by_query(self.index, body=update) - async def delete_above_height(self, height): await self.client.delete_by_query(self.index, expand_query(height='>'+str(height))) await self.client.indices.refresh(self.index) diff --git a/lbry/wallet/server/db/writer.py b/lbry/wallet/server/db/writer.py index d2793f77c..406ebd7fa 100644 --- a/lbry/wallet/server/db/writer.py +++ b/lbry/wallet/server/db/writer.py @@ -1,10 +1,12 @@ import os +from asyncio import Queue + import apsw from typing import Union, Tuple, Set, List from itertools import chain from decimal import Decimal from collections import namedtuple -from multiprocessing import Manager, Queue +from multiprocessing import Manager from binascii import unhexlify, hexlify from lbry.wallet.server.leveldb import LevelDB from lbry.wallet.server.util import class_logger @@ -143,6 +145,11 @@ class SQLDB: begin insert or ignore into changelog (claim_hash) values (new.claim_hash); end; + create trigger if not exists claimtrie_changelog after update on claimtrie + begin + insert or ignore into changelog (claim_hash) values (new.claim_hash); + insert or ignore into changelog (claim_hash) values (old.claim_hash); + end; """ SEARCH_INDEXES = """ @@ -226,7 +233,7 @@ class SQLDB: unhexlify(channel_id)[::-1] for channel_id in filtering_channels if channel_id } self.trending = trending - self.claim_queue = Queue(maxsize=100_000) + self.claim_queue = Queue() def open(self): self.db = apsw.Connection( @@ -845,14 +852,12 @@ class SQLDB: claim['tags'] = claim['tags'].split(',,') if claim['tags'] else [] claim['languages'] = claim['languages'].split(' ') if claim['languages'] else [] - if not self.claim_queue.full(): - self.claim_queue.put_nowait(('update', claim)) + self.claim_queue.put_nowait(('update', claim)) self.execute("delete from changelog;") def enqueue_deleted(self, deleted_claims): for claim_hash in deleted_claims: - if not self.claim_queue.full(): - self.claim_queue.put_nowait(('delete', hexlify(claim_hash[::-1]).decode())) + self.claim_queue.put_nowait(('delete', hexlify(claim_hash[::-1]).decode())) def advance_txs(self, height, all_txs, header, daemon_height, timer): insert_claims = []