index ES during sync

Victor Shyba 2021-01-17 05:40:39 -03:00
parent 8da04a584f
commit 3abdc01230
3 changed files with 23 additions and 2 deletions

View file

@@ -6,6 +6,7 @@ install:
 --global-option=fetch \
 --global-option=--version --global-option=3.30.1 --global-option=--all \
 --global-option=build --global-option=--enable --global-option=fts5
+python -m pip install elasticsearch[async]
 pip install -e .
 tools:

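The new install line pulls in elasticsearch[async], i.e. the asyncio-capable Elasticsearch client (elasticsearch-py plus its aiohttp transport). A minimal sketch of that client in isolation, assuming a local node and a hypothetical claims index; the commit's own indexer lives in elastic_search.py, which is not part of this diff:

    import asyncio
    from elasticsearch import AsyncElasticsearch

    async def index_one(doc):
        # Sketch only: the host, index name, and document id field are assumptions,
        # not taken from this commit.
        es = AsyncElasticsearch(hosts=['localhost:9200'])
        try:
            await es.index(index='claims', id=doc['claim_id'], body=doc)
        finally:
            await es.close()

    # asyncio.run(index_one({'claim_id': 'abc', 'name': 'example'}))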
View file

@@ -4,7 +4,7 @@ from typing import Union, Tuple, Set, List
 from itertools import chain
 from decimal import Decimal
 from collections import namedtuple
-from multiprocessing import Manager
+from multiprocessing import Manager, Queue
 from binascii import unhexlify
 from lbry.wallet.server.leveldb import LevelDB
 from lbry.wallet.server.util import class_logger
@@ -20,7 +20,6 @@ from lbry.wallet.server.db.trending import TRENDING_ALGORITHMS
 from .common import CLAIM_TYPES, STREAM_TYPES, COMMON_TAGS, INDEXED_LANGUAGES
 ATTRIBUTE_ARRAY_MAX_LENGTH = 100
@@ -217,6 +216,7 @@ class SQLDB:
             unhexlify(channel_id)[::-1] for channel_id in filtering_channels if channel_id
         }
         self.trending = trending
+        self.claim_queue = Queue(maxsize=10)

     def open(self):
         self.db = apsw.Connection(
@@ -804,6 +804,22 @@ class SQLDB:
             f"SELECT claim_hash, normalized FROM claim WHERE expiration_height = {height}"
         )

+    def enqueue_changes(self, changed_claim_hashes, deleted_claims):
+        if not changed_claim_hashes and not deleted_claims:
+            return
+        for claim_hash in deleted_claims:
+            if not self.claim_queue.full():
+                self.claim_queue.put_nowait(('delete', claim_hash))
+        for claim in self.execute(f"""
+        SELECT claimtrie.claim_hash as is_controlling,
+               claimtrie.last_take_over_height,
+               claim.*
+        FROM claim LEFT JOIN claimtrie USING (claim_hash)
+        WHERE claim_hash IN ({','.join('?' for _ in changed_claim_hashes)})
+        """, changed_claim_hashes):
+            if not self.claim_queue.full():
+                self.claim_queue.put_nowait(('update', dict(claim._asdict())))
+
     def advance_txs(self, height, all_txs, header, daemon_height, timer):
         insert_claims = []
         update_claims = []
@@ -899,6 +915,7 @@ class SQLDB:
         if not self._fts_synced and self.main.first_sync and height == daemon_height:
             r(first_sync_finished, self.db.cursor())
             self._fts_synced = True
+        r(self.enqueue_changes, recalculate_claim_hashes, delete_claim_hashes)


 class LBRYLevelDB(LevelDB):

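The writer side above only drops ('delete', claim_hash) and ('update', claim_dict) tuples onto a small bounded multiprocessing Queue; put_nowait never blocks block processing, and events are simply skipped while the queue is full. The consumer, indexer_task in lbry/wallet/server/db/elastic_search.py, is imported in the following file's diff but is not shown in this commit. A rough sketch of what such a consumer could look like, with the index name, id encoding, and polling interval all assumed:

    import asyncio
    from queue import Empty
    from elasticsearch import AsyncElasticsearch

    async def indexer_task_sketch(claim_queue):
        # Hypothetical consumer: drain the multiprocessing queue without blocking
        # the event loop, then mirror each event into a 'claims' index.
        es = AsyncElasticsearch()
        try:
            while True:
                try:
                    op, payload = claim_queue.get_nowait()
                except Empty:
                    await asyncio.sleep(0.1)  # assumed polling interval
                    continue
                if op == 'delete':
                    # payload is the raw claim_hash; hex ids are an assumption
                    await es.delete(index='claims', id=payload.hex(), ignore=[404])
                else:  # 'update': payload is the claim row as a dict
                    doc = dict(payload)
                    claim_hash = doc.pop('claim_hash')
                    # real code would also convert the remaining binary columns
                    # into something JSON-serializable before indexing
                    await es.index(index='claims', id=claim_hash.hex(), body=doc)
        finally:
            await es.close()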
View file

@@ -5,6 +5,7 @@ from concurrent.futures.thread import ThreadPoolExecutor
 import typing

 import lbry
+from lbry.wallet.server.db.elastic_search import indexer_task
 from lbry.wallet.server.mempool import MemPool, MemPoolAPI
 from lbry.prometheus import PrometheusServer
@@ -94,6 +95,7 @@ class Server:
         self.session_mgr = env.coin.SESSION_MANAGER(
             env, db, bp, daemon, mempool, self.shutdown_event
         )
+        self._indexer_task = None

     async def start(self):
         env = self.env
@@ -121,6 +123,7 @@
         await self.db.populate_header_merkle_cache()
         await _start_cancellable(self.mempool.keep_synchronized)
         await _start_cancellable(self.session_mgr.serve, self.notifications)
+        self.cancellable_tasks.append(asyncio.create_task(indexer_task(self.bp.sql.claim_queue)))

     async def stop(self):
         for task in reversed(self.cancellable_tasks):
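Because the new indexer task is appended to self.cancellable_tasks, the existing stop() loop shown above cancels it together with the other background work. A consumer along the lines of the earlier sketch should therefore treat cancellation as the normal shutdown path; a small sketch of that shutdown side, again an assumption rather than code from this commit:

    import asyncio
    from contextlib import suppress

    async def cancel_indexer(task):
        # Sketch: Server.stop() cancels every entry in cancellable_tasks; awaiting
        # the cancelled task with CancelledError suppressed gives the consumer's
        # finally block a chance to close its Elasticsearch connection.
        task.cancel()
        with suppress(asyncio.CancelledError):
            await task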