Merge pull request #3305 from lbryio/fix_migrator_tool

fix hub Elasticsearch sync/migrations tool for when the db exists already
This commit is contained in:
Alex Grin 2021-05-14 11:06:07 -04:00 committed by GitHub
commit 09339c9cfb
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
3 changed files with 47 additions and 18 deletions

View file

@@ -68,9 +68,9 @@ class SearchIndex:
self.index, body={'version': version, 'index_patterns': ['ignored']}, ignore=400 self.index, body={'version': version, 'index_patterns': ['ignored']}, ignore=400
) )
async def start(self): async def start(self) -> bool:
if self.sync_client: if self.sync_client:
return return False
hosts = [{'host': self._elastic_host, 'port': self._elastic_port}] hosts = [{'host': self._elastic_host, 'port': self._elastic_port}]
self.sync_client = AsyncElasticsearch(hosts, timeout=self.sync_timeout) self.sync_client = AsyncElasticsearch(hosts, timeout=self.sync_timeout)
self.search_client = AsyncElasticsearch(hosts, timeout=self.search_timeout) self.search_client = AsyncElasticsearch(hosts, timeout=self.search_timeout)
@@ -162,6 +162,9 @@ class SearchIndex:
await self.sync_client.update_by_query( await self.sync_client.update_by_query(
self.index, body=self.update_filter_query(Censor.RESOLVE, blocked_channels, True), slices=4) self.index, body=self.update_filter_query(Censor.RESOLVE, blocked_channels, True), slices=4)
await self.sync_client.indices.refresh(self.index) await self.sync_client.indices.refresh(self.index)
self.clear_caches()
def clear_caches(self):
self.search_cache.clear() self.search_cache.clear()
self.short_id_cache.clear() self.short_id_cache.clear()
self.claim_cache.clear() self.claim_cache.clear()

View file

@@ -12,10 +12,8 @@ from lbry.wallet.server.env import Env
from lbry.wallet.server.coin import LBC from lbry.wallet.server.coin import LBC
from lbry.wallet.server.db.elasticsearch.search import extract_doc, SearchIndex, IndexVersionMismatch from lbry.wallet.server.db.elasticsearch.search import extract_doc, SearchIndex, IndexVersionMismatch
INDEX = 'claims'
async def get_all(db, shard_num, shards_total, limit=0, index_name='claims'):
async def get_all(db, shard_num, shards_total, limit=0):
logging.info("shard %d starting", shard_num) logging.info("shard %d starting", shard_num)
def exec_factory(cursor, statement, bindings): def exec_factory(cursor, statement, bindings):
tpl = namedtuple('row', (d[0] for d in cursor.getdescription())) tpl = namedtuple('row', (d[0] for d in cursor.getdescription()))
@@ -44,24 +42,25 @@ ORDER BY claim.height desc
claim['languages'] = claim['languages'].split(' ') if claim['languages'] else [] claim['languages'] = claim['languages'].split(' ') if claim['languages'] else []
if num % 10_000 == 0: if num % 10_000 == 0:
logging.info("%d/%d", num, total) logging.info("%d/%d", num, total)
yield extract_doc(claim, INDEX) yield extract_doc(claim, index_name)
if 0 < limit <= num: if 0 < limit <= num:
break break
async def consume(producer): async def consume(producer, index_name):
env = Env(LBC) env = Env(LBC)
logging.info("ES sync host: %s:%i", env.elastic_host, env.elastic_port) logging.info("ES sync host: %s:%i", env.elastic_host, env.elastic_port)
es = AsyncElasticsearch([{'host': env.elastic_host, 'port': env.elastic_port}]) es = AsyncElasticsearch([{'host': env.elastic_host, 'port': env.elastic_port}])
try: try:
await async_bulk(es, producer, request_timeout=120) await async_bulk(es, producer, request_timeout=120)
await es.indices.refresh(index=INDEX) await es.indices.refresh(index=index_name)
finally: finally:
await es.close() await es.close()
async def make_es_index(): async def make_es_index(index=None):
env = Env(LBC) env = Env(LBC)
if index is None:
index = SearchIndex('', elastic_host=env.elastic_host, elastic_port=env.elastic_port) index = SearchIndex('', elastic_host=env.elastic_host, elastic_port=env.elastic_port)
try: try:
@@ -71,26 +70,27 @@ async def make_es_index():
"dropping ES search index (version %s) for upgrade to version %s", err.got_version, err.expected_version "dropping ES search index (version %s) for upgrade to version %s", err.got_version, err.expected_version
) )
await index.delete_index() await index.delete_index()
await index.stop()
return await index.start() return await index.start()
finally: finally:
index.stop() index.stop()
async def run(args, shard): async def run(db_path, clients, blocks, shard, index_name='claims'):
def itsbusy(*_): def itsbusy(*_):
logging.info("shard %d: db is busy, retry", shard) logging.info("shard %d: db is busy, retry", shard)
return True return True
db = apsw.Connection(args.db_path, flags=apsw.SQLITE_OPEN_READONLY | apsw.SQLITE_OPEN_URI) db = apsw.Connection(db_path, flags=apsw.SQLITE_OPEN_READONLY | apsw.SQLITE_OPEN_URI)
db.setbusyhandler(itsbusy) db.setbusyhandler(itsbusy)
db.cursor().execute('pragma journal_mode=wal;') db.cursor().execute('pragma journal_mode=wal;')
db.cursor().execute('pragma temp_store=memory;') db.cursor().execute('pragma temp_store=memory;')
producer = get_all(db.cursor(), shard, args.clients, limit=args.blocks) producer = get_all(db.cursor(), shard, clients, limit=blocks, index_name=index_name)
await asyncio.gather(*(consume(producer) for _ in range(min(8, args.clients)))) await asyncio.gather(*(consume(producer, index_name=index_name) for _ in range(min(8, clients))))
def __run(args, shard): def __run(args, shard):
asyncio.run(run(args, shard)) asyncio.run(run(args.db_path, args.clients, args.blocks, shard))
def run_elastic_sync(): def run_elastic_sync():

View file

@@ -4,6 +4,7 @@ import lbry
import lbry.wallet import lbry.wallet
from lbry.error import ServerPaymentFeeAboveMaxAllowedError from lbry.error import ServerPaymentFeeAboveMaxAllowedError
from lbry.wallet.network import ClientSession from lbry.wallet.network import ClientSession
from lbry.wallet.server.db.elasticsearch.sync import run as run_sync, make_es_index
from lbry.wallet.server.session import LBRYElectrumX from lbry.wallet.server.session import LBRYElectrumX
from lbry.testcase import IntegrationTestCase, CommandTestCase from lbry.testcase import IntegrationTestCase, CommandTestCase
from lbry.wallet.orchstr8.node import SPVNode from lbry.wallet.orchstr8.node import SPVNode
@@ -13,9 +14,6 @@ class TestSessions(IntegrationTestCase):
""" """
Tests that server cleans up stale connections after session timeout and client times out too. Tests that server cleans up stale connections after session timeout and client times out too.
""" """
LEDGER = lbry.wallet
async def test_session_bloat_from_socket_timeout(self): async def test_session_bloat_from_socket_timeout(self):
await self.conductor.stop_spv() await self.conductor.stop_spv()
await self.ledger.stop() await self.ledger.stop()
@@ -87,3 +85,31 @@ class TestUsagePayment(CommandTestCase):
self.assertIsNotNone(await self.blockchain.get_raw_transaction(tx.id)) # verify its broadcasted self.assertIsNotNone(await self.blockchain.get_raw_transaction(tx.id)) # verify its broadcasted
self.assertEqual(tx.outputs[0].amount, 100000000) self.assertEqual(tx.outputs[0].amount, 100000000)
self.assertEqual(tx.outputs[0].get_address(self.ledger), address) self.assertEqual(tx.outputs[0].get_address(self.ledger), address)
class TestESSync(CommandTestCase):
async def test_es_sync_utility(self):
for i in range(10):
await self.stream_create(f"stream{i}", bid='0.001')
await self.generate(1)
self.assertEqual(10, len(await self.claim_search(order_by=['height'])))
db = self.conductor.spv_node.server.db
await db.search_index.delete_index()
db.search_index.clear_caches()
self.assertEqual(0, len(await self.claim_search(order_by=['height'])))
await db.search_index.stop()
self.assertTrue(await make_es_index(db.search_index))
async def resync():
await db.search_index.start()
db.search_index.clear_caches()
await run_sync(db.sql._db_path, 1, 0, 0, index_name=db.search_index.index)
self.assertEqual(10, len(await self.claim_search(order_by=['height'])))
await resync()
# this time we will test a migration from unversioned to v1
await db.search_index.sync_client.indices.delete_template(db.search_index.index)
await db.search_index.stop()
self.assertTrue(await make_es_index(db.search_index))
await db.search_index.start()
await resync()