reader mode

Victor Shyba 2021-02-04 18:49:30 -03:00
parent 87037c06c9
commit 5d3704c7ea
7 changed files with 45 additions and 19 deletions
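In short: this commit adds an ES_MODE setting that splits wallet servers into two roles. A 'writer' (the default) keeps the SQLite claim database and syncs it into Elasticsearch; a 'reader' skips SQLite entirely (self.sql stays None) and serves claim searches straight from a shared index, so search traffic can be handled by any number of readers behind one writer. The hunks below thread `if self.sql:` guards through every SQLite call site. A minimal illustrative sketch of the pattern, using stand-in classes rather than the project's real ones:

    import os

    class SQLDB:                      # stand-in for lbry's SQLite claim db
        def open(self): pass
        def close(self): pass

    class SearchIndex:                # stand-in for the ES-backed search index
        pass

    class DB:
        def __init__(self):
            # ES_MODE is the switch this commit introduces; default 'writer'.
            if os.environ.get('ES_MODE', 'writer') == 'writer':
                self.sql = SQLDB()    # writer: owns SQLite, syncs it into ES
            else:
                self.sql = None       # reader: answers searches from ES only
            self.search_index = SearchIndex()

        def close(self):
            if self.sql:              # every SQLite call site gains this guard
                self.sql.close()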

@@ -189,7 +189,7 @@ class SPVNode:
             'MAX_QUERY_WORKERS': '0',
             'INDIVIDUAL_TAG_INDEXES': '',
             'RPC_PORT': self.rpc_port,
-            'ES_INDEX_PREFIX': uuid4().hex
+            'ES_INDEX_PREFIX': uuid4().hex,
         }
         if extraconf:
             conf.update(extraconf)

@@ -215,7 +215,8 @@ class BlockProcessor:
         if hprevs == chain:
             start = time.perf_counter()
             await self.run_in_thread_with_lock(self.advance_blocks, blocks)
-            await self.db.search_index.sync_queue(self.sql.claim_queue)
+            if self.sql:
+                await self.db.search_index.sync_queue(self.sql.claim_queue)
             for cache in self.search_cache.values():
                 cache.clear()
             self.history_cache.clear()
@@ -229,8 +230,9 @@
             s = '' if len(blocks) == 1 else 's'
             self.logger.info('processed {:,d} block{} in {:.1f}s'.format(len(blocks), s, processed_time))
             if self._caught_up_event.is_set():
-                await self.db.search_index.apply_filters(self.sql.blocked_streams, self.sql.blocked_channels,
-                                                         self.sql.filtered_streams, self.sql.filtered_channels)
+                if self.sql:
+                    await self.db.search_index.apply_filters(self.sql.blocked_streams, self.sql.blocked_channels,
+                                                             self.sql.filtered_streams, self.sql.filtered_channels)
                 await self.notifications.on_block(self.touched, self.height)
             self.touched = set()
         elif hprevs[0] != chain[0]:
@@ -285,7 +287,8 @@ class BlockProcessor:
                 await self.run_in_thread_with_lock(self.backup_blocks, raw_blocks)
                 await self.run_in_thread_with_lock(flush_backup)
                 last -= len(raw_blocks)
-            await self.run_in_thread_with_lock(self.db.sql.delete_claims_above_height, self.height)
+            if self.sql:
+                await self.run_in_thread_with_lock(self.db.sql.delete_claims_above_height, self.height)
             await self.prefetcher.reset_height(self.height)
             self.reorg_count_metric.inc()
         except:
@@ -789,15 +792,17 @@ class LBRYBlockProcessor(BlockProcessor):
         self.timer = Timer('BlockProcessor')

     def advance_blocks(self, blocks):
-        self.sql.begin()
+        if self.sql:
+            self.sql.begin()
         try:
             self.timer.run(super().advance_blocks, blocks)
         except:
             self.logger.exception(f'Error while advancing transaction in new block.')
             raise
         finally:
-            self.sql.commit()
-        if self.db.first_sync and self.height == self.daemon.cached_height():
+            if self.sql:
+                self.sql.commit()
+        if self.sql and self.db.first_sync and self.height == self.daemon.cached_height():
             self.timer.run(self.sql.execute, self.sql.SEARCH_INDEXES, timer_name='executing SEARCH_INDEXES')
             if self.env.individual_tag_indexes:
                 self.timer.run(self.sql.execute, self.sql.TAG_INDEXES, timer_name='executing TAG_INDEXES')
@@ -806,7 +811,8 @@ class LBRYBlockProcessor(BlockProcessor):
     def advance_txs(self, height, txs, header, block_hash):
         timer = self.timer.sub_timers['advance_blocks']
         undo = timer.run(super().advance_txs, height, txs, header, block_hash, timer_name='super().advance_txs')
-        timer.run(self.sql.advance_txs, height, txs, header, self.daemon.cached_height(), forward_timer=True)
+        if self.sql:
+            timer.run(self.sql.advance_txs, height, txs, header, self.daemon.cached_height(), forward_timer=True)
         if (height % 10000 == 0 or not self.db.first_sync) and self.logger.isEnabledFor(10):
             self.timer.show(height=height)
         return undo
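For context on the guarded calls above: run_in_thread_with_lock in this electrumx-derived code runs a blocking callable on an executor thread while holding the processor's state lock, so database writes cannot interleave with block advancement. A hedged approximation (simplified; the real method lives on the processor and uses its own lock):

    import asyncio

    async def run_in_thread_with_lock(lock: asyncio.Lock, func, *args):
        # Approximation of the electrumx-style helper: hold the state lock
        # while the blocking work runs in the default thread-pool executor.
        async with lock:
            return await asyncio.get_event_loop().run_in_executor(None, func, *args)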

@@ -66,7 +66,7 @@ class SearchIndex:
         return asyncio.ensure_future(client.close())

     def delete_index(self):
-        return self.client.indices.delete(self.index)
+        return self.client.indices.delete(self.index, ignore_unavailable=True)

     async def sync_queue(self, claim_queue):
         if claim_queue.empty():
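Passing ignore_unavailable=True makes index deletion idempotent, which matters once several nodes share one index prefix: a reader that never created the index, or a node cleaning up after another already deleted it, should get a no-op instead of an exception. Roughly, with the elasticsearch-py async client (client flavor assumed from the async code around it):

    from elasticsearch import AsyncElasticsearch

    async def delete_index_safely(client: AsyncElasticsearch, index: str):
        # With ignore_unavailable=True a missing index is skipped silently;
        # without it, the client raises NotFoundError (HTTP 404).
        await client.indices.delete(index, ignore_unavailable=True)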

@@ -951,21 +951,28 @@ class LBRYLevelDB(LevelDB):
         for algorithm_name in self.env.trending_algorithms:
             if algorithm_name in TRENDING_ALGORITHMS:
                 trending.append(TRENDING_ALGORITHMS[algorithm_name])
-        self.sql = SQLDB(
-            self, path,
-            self.env.default('BLOCKING_CHANNEL_IDS', '').split(' '),
-            self.env.default('FILTERING_CHANNEL_IDS', '').split(' '),
-            trending
-        )
+        if self.env.es_mode == 'writer':
+            self.logger.info('Index mode: writer. Using SQLite db to sync ES')
+            self.sql = SQLDB(
+                self, path,
+                self.env.default('BLOCKING_CHANNEL_IDS', '').split(' '),
+                self.env.default('FILTERING_CHANNEL_IDS', '').split(' '),
+                trending
+            )
+        else:
+            self.logger.info('Index mode: reader')
+            self.sql = None

         # Search index
         self.search_index = SearchIndex(self.env.es_index_prefix)

     def close(self):
         super().close()
-        self.sql.close()
+        if self.sql:
+            self.sql.close()

     async def _open_dbs(self, *args, **kwargs):
         await self.search_index.start()
         await super()._open_dbs(*args, **kwargs)
-        self.sql.open()
+        if self.sql:
+            self.sql.open()
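The role is fixed per process at startup from the environment, so the natural deployment is one writer per index prefix with any number of readers alongside it. A hypothetical launch sketch ('run-wallet-server' is a placeholder, not the project's real entry point; only ES_MODE and ES_INDEX_PREFIX come from this diff):

    import os
    import subprocess

    # One writer plus two readers sharing the 'claims' index prefix.
    for role, port in [('writer', 50001), ('reader', 50002), ('reader', 50003)]:
        env = dict(os.environ, ES_MODE=role, ES_INDEX_PREFIX='claims')
        subprocess.Popen(['run-wallet-server', '--port', str(port)], env=env)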

@@ -54,6 +54,7 @@ class Env:
         network = self.default('NET', 'mainnet').strip()
         self.coin = Coin.lookup_coin_class(coin_name, network)
         self.es_index_prefix = self.default('ES_INDEX_PREFIX', '')
+        self.es_mode = self.default('ES_MODE', 'writer')
         self.cache_MB = self.integer('CACHE_MB', 1200)
         self.reorg_limit = self.integer('REORG_LIMIT', self.coin.REORG_LIMIT)
         # Server stuff
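Env.default here presumably reads the process environment with a fallback (the electrumx convention), which keeps the change backwards compatible: with ES_MODE unset, every node behaves as a writer exactly as before. A sketch of that assumption:

    import os

    class Env:
        # Assumed to mirror electrumx's Env.default: a plain environment
        # lookup with a fallback value.
        def default(self, envvar, default):
            return os.environ.get(envvar, default)

    os.environ['ES_MODE'] = 'reader'
    print(Env().default('ES_MODE', 'writer'))  # -> 'reader'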

@@ -11,6 +11,7 @@ from lbry.extras.daemon.comment_client import verify

 from lbry.extras.daemon.daemon import DEFAULT_PAGE_SIZE
 from lbry.testcase import CommandTestCase
+from lbry.wallet.orchstr8.node import SPVNode
 from lbry.wallet.transaction import Transaction
 from lbry.wallet.util import satoshis_to_coins as lbc

@@ -97,6 +98,18 @@ class ClaimSearchCommand(ClaimTestCase):
         with self.assertRaises(ConnectionResetError):
             await self.claim_search(claim_ids=claim_ids)

+    async def test_claim_search_as_reader_server(self):
+        node2 = SPVNode(self.conductor.spv_module, node_number=2)
+        current_prefix = self.conductor.spv_node.server.bp.env.es_index_prefix
+        await node2.start(self.blockchain, extraconf={'ES_MODE': 'reader', 'ES_INDEX_PREFIX': current_prefix})
+        self.addCleanup(node2.stop)
+        self.ledger.network.config['default_servers'] = [(node2.hostname, node2.port)]
+        await self.ledger.stop()
+        await self.ledger.start()
+        channel2 = await self.channel_create('@abc', '0.1', allow_duplicate_name=True)
+        await asyncio.sleep(1)  # fixme: find a way to block on the writer
+        await self.assertFindsClaims([channel2], name='@abc')
+
     async def test_basic_claim_search(self):
         await self.create_channel()
         channel_txo = self.channel['outputs'][0]
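The new test boots a second SPV node in reader mode on the writer's index prefix, points the client ledger at it, publishes a channel through the writer, and checks that the reader can find it. The fixed one-second sleep papers over ES sync lag, as the fixme admits; a polling helper along these lines would be sturdier (hypothetical, with `search` standing in for whatever query callable the test uses):

    import asyncio

    async def wait_for_claim(search, name, timeout=10.0):
        # Poll until the writer's sync becomes visible on the reader,
        # instead of sleeping a fixed interval and hoping.
        loop = asyncio.get_event_loop()
        deadline = loop.time() + timeout
        while loop.time() < deadline:
            if await search(name=name):
                return
            await asyncio.sleep(0.1)
        raise TimeoutError(f'claim {name!r} never appeared on the reader')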

@@ -80,7 +80,6 @@ class ReconnectTests(IntegrationTestCase):
         self.assertFalse(self.ledger.network.is_connected)
         await self.ledger.resolve([], ['derp'])
         self.assertEqual(50002, self.ledger.network.client.server[1])
-        await node2.stop(True)

     async def test_direct_sync(self):
         await self.ledger.stop()