Merge pull request #2800 from lbryio/fix-2746

Fix race condition setting/clearing the claim_search and resolve cache
This commit is contained in:
Jack Robison 2020-02-13 11:39:27 -05:00 committed by GitHub
commit 3713d3488d
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23

View file

@ -164,6 +164,7 @@ class BlockProcessor:
# is consistent with self.height # is consistent with self.height
self.state_lock = asyncio.Lock() self.state_lock = asyncio.Lock()
self.search_cache = {}
async def run_in_thread_with_lock(self, func, *args): async def run_in_thread_with_lock(self, func, *args):
# Run in a thread to prevent blocking. Shielded so that # Run in a thread to prevent blocking. Shielded so that
# cancellations from shutdown don't lose work - when the task # cancellations from shutdown don't lose work - when the task
@ -191,6 +192,8 @@ class BlockProcessor:
if hprevs == chain: if hprevs == chain:
start = time.perf_counter() start = time.perf_counter()
await self.run_in_thread_with_lock(self.advance_blocks, blocks) await self.run_in_thread_with_lock(self.advance_blocks, blocks)
for cache in self.search_cache.values():
cache.clear()
await self._maybe_flush() await self._maybe_flush()
processed_time = time.perf_counter() - start processed_time = time.perf_counter() - start
BLOCK_COUNT.set(self.height) BLOCK_COUNT.set(self.height)
@ -729,7 +732,6 @@ class LBRYBlockProcessor(BlockProcessor):
self.logger.info(f"LbryumX Block Processor - Validating signatures: {self.should_validate_signatures}") self.logger.info(f"LbryumX Block Processor - Validating signatures: {self.should_validate_signatures}")
self.sql: SQLDB = self.db.sql self.sql: SQLDB = self.db.sql
self.timer = Timer('BlockProcessor') self.timer = Timer('BlockProcessor')
self.search_cache = {}
def advance_blocks(self, blocks): def advance_blocks(self, blocks):
self.sql.begin() self.sql.begin()
@ -744,8 +746,6 @@ class LBRYBlockProcessor(BlockProcessor):
self.timer.run(self.sql.execute, self.sql.SEARCH_INDEXES, timer_name='executing SEARCH_INDEXES') self.timer.run(self.sql.execute, self.sql.SEARCH_INDEXES, timer_name='executing SEARCH_INDEXES')
if self.env.individual_tag_indexes: if self.env.individual_tag_indexes:
self.timer.run(self.sql.execute, self.sql.TAG_INDEXES, timer_name='executing TAG_INDEXES') self.timer.run(self.sql.execute, self.sql.TAG_INDEXES, timer_name='executing TAG_INDEXES')
for cache in self.search_cache.values():
cache.clear()
def advance_txs(self, height, txs, header): def advance_txs(self, height, txs, header):
timer = self.timer.sub_timers['advance_blocks'] timer = self.timer.sub_timers['advance_blocks']