diff --git a/Makefile b/Makefile index a6221fa03..9911c24da 100644 --- a/Makefile +++ b/Makefile @@ -6,6 +6,7 @@ install: --global-option=fetch \ --global-option=--version --global-option=3.30.1 --global-option=--all \ --global-option=build --global-option=--enable --global-option=fts5 + python -m pip install elasticsearch[async] pip install -e . tools: diff --git a/lbry/wallet/server/db/writer.py b/lbry/wallet/server/db/writer.py index 86f9e0c12..c051d9152 100644 --- a/lbry/wallet/server/db/writer.py +++ b/lbry/wallet/server/db/writer.py @@ -4,7 +4,7 @@ from typing import Union, Tuple, Set, List from itertools import chain from decimal import Decimal from collections import namedtuple -from multiprocessing import Manager +from multiprocessing import Manager, Queue from binascii import unhexlify from lbry.wallet.server.leveldb import LevelDB from lbry.wallet.server.util import class_logger @@ -20,7 +20,6 @@ from lbry.wallet.server.db.trending import TRENDING_ALGORITHMS from .common import CLAIM_TYPES, STREAM_TYPES, COMMON_TAGS, INDEXED_LANGUAGES - ATTRIBUTE_ARRAY_MAX_LENGTH = 100 @@ -217,6 +216,7 @@ class SQLDB: unhexlify(channel_id)[::-1] for channel_id in filtering_channels if channel_id } self.trending = trending + self.claim_queue = Queue(maxsize=10) def open(self): self.db = apsw.Connection( @@ -804,6 +804,22 @@ class SQLDB: f"SELECT claim_hash, normalized FROM claim WHERE expiration_height = {height}" ) + def enqueue_changes(self, changed_claim_hashes, deleted_claims): + if not changed_claim_hashes and not deleted_claims: + return + for claim_hash in deleted_claims: + if not self.claim_queue.full(): + self.claim_queue.put_nowait(('delete', claim_hash)) + for claim in self.execute(f""" + SELECT claimtrie.claim_hash as is_controlling, + claimtrie.last_take_over_height, + claim.* + FROM claim LEFT JOIN claimtrie USING (claim_hash) + WHERE claim_hash IN ({','.join('?' for _ in changed_claim_hashes)}) + """, changed_claim_hashes): + if not self.claim_queue.full(): + self.claim_queue.put_nowait(('update', dict(claim._asdict()))) + def advance_txs(self, height, all_txs, header, daemon_height, timer): insert_claims = [] update_claims = [] @@ -899,6 +915,7 @@ class SQLDB: if not self._fts_synced and self.main.first_sync and height == daemon_height: r(first_sync_finished, self.db.cursor()) self._fts_synced = True + r(self.enqueue_changes, recalculate_claim_hashes, delete_claim_hashes) class LBRYLevelDB(LevelDB): diff --git a/lbry/wallet/server/server.py b/lbry/wallet/server/server.py index fc789b7da..0cb046965 100644 --- a/lbry/wallet/server/server.py +++ b/lbry/wallet/server/server.py @@ -5,6 +5,7 @@ from concurrent.futures.thread import ThreadPoolExecutor import typing import lbry +from lbry.wallet.server.db.elastic_search import indexer_task from lbry.wallet.server.mempool import MemPool, MemPoolAPI from lbry.prometheus import PrometheusServer @@ -94,6 +95,7 @@ class Server: self.session_mgr = env.coin.SESSION_MANAGER( env, db, bp, daemon, mempool, self.shutdown_event ) + self._indexer_task = None async def start(self): env = self.env @@ -121,6 +123,7 @@ class Server: await self.db.populate_header_merkle_cache() await _start_cancellable(self.mempool.keep_synchronized) await _start_cancellable(self.session_mgr.serve, self.notifications) + self.cancellable_tasks.append(asyncio.create_task(indexer_task(self.bp.sql.claim_queue))) async def stop(self): for task in reversed(self.cancellable_tasks):