test sync helper

This commit is contained in:
Victor Shyba 2021-05-11 21:38:05 -03:00
parent 5101464e3b
commit 43a1385b79
3 changed files with 35 additions and 16 deletions

View file

@ -162,6 +162,9 @@ class SearchIndex:
await self.sync_client.update_by_query( await self.sync_client.update_by_query(
self.index, body=self.update_filter_query(Censor.RESOLVE, blocked_channels, True), slices=4) self.index, body=self.update_filter_query(Censor.RESOLVE, blocked_channels, True), slices=4)
await self.sync_client.indices.refresh(self.index) await self.sync_client.indices.refresh(self.index)
self.clear_caches()
def clear_caches(self):
self.search_cache.clear() self.search_cache.clear()
self.short_id_cache.clear() self.short_id_cache.clear()
self.claim_cache.clear() self.claim_cache.clear()

View file

@ -12,10 +12,8 @@ from lbry.wallet.server.env import Env
from lbry.wallet.server.coin import LBC from lbry.wallet.server.coin import LBC
from lbry.wallet.server.db.elasticsearch.search import extract_doc, SearchIndex, IndexVersionMismatch from lbry.wallet.server.db.elasticsearch.search import extract_doc, SearchIndex, IndexVersionMismatch
INDEX = 'claims'
async def get_all(db, shard_num, shards_total, limit=0, index_name='claims'):
async def get_all(db, shard_num, shards_total, limit=0):
logging.info("shard %d starting", shard_num) logging.info("shard %d starting", shard_num)
def exec_factory(cursor, statement, bindings): def exec_factory(cursor, statement, bindings):
tpl = namedtuple('row', (d[0] for d in cursor.getdescription())) tpl = namedtuple('row', (d[0] for d in cursor.getdescription()))
@ -44,25 +42,26 @@ ORDER BY claim.height desc
claim['languages'] = claim['languages'].split(' ') if claim['languages'] else [] claim['languages'] = claim['languages'].split(' ') if claim['languages'] else []
if num % 10_000 == 0: if num % 10_000 == 0:
logging.info("%d/%d", num, total) logging.info("%d/%d", num, total)
yield extract_doc(claim, INDEX) yield extract_doc(claim, index_name)
if 0 < limit <= num: if 0 < limit <= num:
break break
async def consume(producer): async def consume(producer, index_name):
env = Env(LBC) env = Env(LBC)
logging.info("ES sync host: %s:%i", env.elastic_host, env.elastic_port) logging.info("ES sync host: %s:%i", env.elastic_host, env.elastic_port)
es = AsyncElasticsearch([{'host': env.elastic_host, 'port': env.elastic_port}]) es = AsyncElasticsearch([{'host': env.elastic_host, 'port': env.elastic_port}])
try: try:
await async_bulk(es, producer, request_timeout=120) await async_bulk(es, producer, request_timeout=120)
await es.indices.refresh(index=INDEX) print(await es.indices.refresh(index=index_name))
finally: finally:
await es.close() await es.close()
async def make_es_index(): async def make_es_index(index=None):
env = Env(LBC) env = Env(LBC)
index = SearchIndex('', elastic_host=env.elastic_host, elastic_port=env.elastic_port) if index is None:
index = SearchIndex('', elastic_host=env.elastic_host, elastic_port=env.elastic_port)
try: try:
return await index.start() return await index.start()
@ -76,21 +75,21 @@ async def make_es_index():
index.stop() index.stop()
async def run(args, shard): async def run(db_path, clients, blocks, shard, index_name='claims'):
def itsbusy(*_): def itsbusy(*_):
logging.info("shard %d: db is busy, retry", shard) logging.info("shard %d: db is busy, retry", shard)
return True return True
db = apsw.Connection(args.db_path, flags=apsw.SQLITE_OPEN_READONLY | apsw.SQLITE_OPEN_URI) db = apsw.Connection(db_path, flags=apsw.SQLITE_OPEN_READONLY | apsw.SQLITE_OPEN_URI)
db.setbusyhandler(itsbusy) db.setbusyhandler(itsbusy)
db.cursor().execute('pragma journal_mode=wal;') db.cursor().execute('pragma journal_mode=wal;')
db.cursor().execute('pragma temp_store=memory;') db.cursor().execute('pragma temp_store=memory;')
producer = get_all(db.cursor(), shard, args.clients, limit=args.blocks) producer = get_all(db.cursor(), shard, clients, limit=blocks, index_name=index_name)
await asyncio.gather(*(consume(producer) for _ in range(min(8, args.clients)))) await asyncio.gather(*(consume(producer, index_name=index_name) for _ in range(min(8, clients))))
def __run(args, shard): def __run(args, shard):
asyncio.run(run(args, shard)) asyncio.run(run(args.db_path, args.clients, args.blocks, shard))
def run_elastic_sync(): def run_elastic_sync():

View file

@ -4,6 +4,7 @@ import lbry
import lbry.wallet import lbry.wallet
from lbry.error import ServerPaymentFeeAboveMaxAllowedError from lbry.error import ServerPaymentFeeAboveMaxAllowedError
from lbry.wallet.network import ClientSession from lbry.wallet.network import ClientSession
from lbry.wallet.server.db.elasticsearch.sync import run as run_sync, make_es_index
from lbry.wallet.server.session import LBRYElectrumX from lbry.wallet.server.session import LBRYElectrumX
from lbry.testcase import IntegrationTestCase, CommandTestCase from lbry.testcase import IntegrationTestCase, CommandTestCase
from lbry.wallet.orchstr8.node import SPVNode from lbry.wallet.orchstr8.node import SPVNode
@ -13,9 +14,6 @@ class TestSessions(IntegrationTestCase):
""" """
Tests that server cleans up stale connections after session timeout and client times out too. Tests that server cleans up stale connections after session timeout and client times out too.
""" """
LEDGER = lbry.wallet
async def test_session_bloat_from_socket_timeout(self): async def test_session_bloat_from_socket_timeout(self):
await self.conductor.stop_spv() await self.conductor.stop_spv()
await self.ledger.stop() await self.ledger.stop()
@ -87,3 +85,22 @@ class TestUsagePayment(CommandTestCase):
self.assertIsNotNone(await self.blockchain.get_raw_transaction(tx.id)) # verify its broadcasted self.assertIsNotNone(await self.blockchain.get_raw_transaction(tx.id)) # verify its broadcasted
self.assertEqual(tx.outputs[0].amount, 100000000) self.assertEqual(tx.outputs[0].amount, 100000000)
self.assertEqual(tx.outputs[0].get_address(self.ledger), address) self.assertEqual(tx.outputs[0].get_address(self.ledger), address)
class TestESSync(CommandTestCase):
VERBOSITY = 'DEBUG'
async def test_es_sync_utility(self):
for i in range(10):
await self.stream_create(f"stream{i}", bid='0.001')
await self.generate(1)
self.assertEqual(10, len(await self.claim_search(order_by=['height'])))
db = self.conductor.spv_node.server.db
await db.search_index.delete_index()
db.search_index.clear_caches()
self.assertEqual(0, len(await self.claim_search(order_by=['height'])))
await db.search_index.stop()
self.assertTrue(await make_es_index(db.search_index))
await db.search_index.start()
db.search_index.clear_caches()
await run_sync(db.sql._db_path, 1, 0, 0, index_name=db.search_index.index)
self.assertEqual(10, len(await self.claim_search(order_by=['height'])))