use a thread pool to sync changes

This commit is contained in:
Victor Shyba 2021-01-31 05:14:46 -03:00
parent e4d06a088b
commit d4bf004d74
2 changed files with 9 additions and 4 deletions

View file

@ -5,8 +5,6 @@ from concurrent.futures.thread import ThreadPoolExecutor
from typing import Optional from typing import Optional
from prometheus_client import Gauge, Histogram from prometheus_client import Gauge, Histogram
import lbry import lbry
from lbry.schema.claim import Claim
from lbry.wallet.server.db.elastic_search import SearchIndex
from lbry.wallet.server.db.writer import SQLDB from lbry.wallet.server.db.writer import SQLDB
from lbry.wallet.server.daemon import DaemonError from lbry.wallet.server.daemon import DaemonError
from lbry.wallet.server.hash import hash_to_hex_str, HASHX_LEN from lbry.wallet.server.hash import hash_to_hex_str, HASHX_LEN
@ -165,6 +163,7 @@ class BlockProcessor:
self.prefetcher = Prefetcher(daemon, env.coin, self.blocks_event) self.prefetcher = Prefetcher(daemon, env.coin, self.blocks_event)
self.logger = class_logger(__name__, self.__class__.__name__) self.logger = class_logger(__name__, self.__class__.__name__)
self.executor = ThreadPoolExecutor(1) self.executor = ThreadPoolExecutor(1)
self.index_executor = ThreadPoolExecutor(8)
# Meta # Meta
self.next_cache_check = 0 self.next_cache_check = 0
@ -216,6 +215,10 @@ class BlockProcessor:
if hprevs == chain: if hprevs == chain:
start = time.perf_counter() start = time.perf_counter()
await self.run_in_thread_with_lock(self.advance_blocks, blocks) await self.run_in_thread_with_lock(self.advance_blocks, blocks)
pending = []
for height in range(first, first + len(blocks)):
pending.append(asyncio.get_event_loop().run_in_executor(self.index_executor, self.db.sql.enqueue_changes, height))
await asyncio.gather(*pending)
await self.db.search_index.sync_queue(self.sql.claim_queue) await self.db.search_index.sync_queue(self.sql.claim_queue)
for cache in self.search_cache.values(): for cache in self.search_cache.values():
cache.clear() cache.clear()

View file

@ -806,7 +806,7 @@ class SQLDB:
f"SELECT claim_hash, normalized FROM claim WHERE expiration_height = {height}" f"SELECT claim_hash, normalized FROM claim WHERE expiration_height = {height}"
) )
def enqueue_changes(self, height, deleted_claims): def enqueue_changes(self, height):
for claim in self.execute(f""" for claim in self.execute(f"""
SELECT claimtrie.claim_hash as is_controlling, SELECT claimtrie.claim_hash as is_controlling,
claimtrie.last_take_over_height, claimtrie.last_take_over_height,
@ -838,6 +838,8 @@ class SQLDB:
claim['languages'] = claim['languages'].split(' ') if claim['languages'] else [] claim['languages'] = claim['languages'].split(' ') if claim['languages'] else []
if not self.claim_queue.full(): if not self.claim_queue.full():
self.claim_queue.put_nowait(('update', claim)) self.claim_queue.put_nowait(('update', claim))
def enqueue_deleted(self, deleted_claims):
for claim_hash in deleted_claims: for claim_hash in deleted_claims:
if not self.claim_queue.full(): if not self.claim_queue.full():
self.claim_queue.put_nowait(('delete', hexlify(claim_hash[::-1]).decode())) self.claim_queue.put_nowait(('delete', hexlify(claim_hash[::-1]).decode()))
@ -937,7 +939,7 @@ class SQLDB:
if not self._fts_synced and self.main.first_sync and height == daemon_height: if not self._fts_synced and self.main.first_sync and height == daemon_height:
r(first_sync_finished, self.db.cursor()) r(first_sync_finished, self.db.cursor())
self._fts_synced = True self._fts_synced = True
r(self.enqueue_changes, height, delete_claim_hashes) r(self.enqueue_deleted, delete_claim_hashes)
class LBRYLevelDB(LevelDB): class LBRYLevelDB(LevelDB):