add --reindex option
to lbry-hub-elastic-sync
This commit is contained in:
parent
e0f7066163
commit
704ec9e553
7 changed files with 147 additions and 58 deletions
|
@ -263,7 +263,8 @@ class SPVNode:
|
||||||
self.es_writer = ElasticWriter(env)
|
self.es_writer = ElasticWriter(env)
|
||||||
await self.writer.open()
|
await self.writer.open()
|
||||||
await self.writer.start()
|
await self.writer.start()
|
||||||
await asyncio.wait([self.server.start(), self.es_writer.start()])
|
await self.es_writer.start()
|
||||||
|
await self.server.start()
|
||||||
|
|
||||||
async def stop(self, cleanup=True):
|
async def stop(self, cleanup=True):
|
||||||
if self.stopped:
|
if self.stopped:
|
||||||
|
|
|
@ -1420,7 +1420,6 @@ class BlockProcessor:
|
||||||
# flush the changes
|
# flush the changes
|
||||||
save_undo = (self.daemon.cached_height() - self.height) <= self.env.reorg_limit
|
save_undo = (self.daemon.cached_height() - self.height) <= self.env.reorg_limit
|
||||||
|
|
||||||
self.db.write_db_state()
|
|
||||||
if save_undo:
|
if save_undo:
|
||||||
self.db.prefix_db.commit(self.height, self.tip)
|
self.db.prefix_db.commit(self.height, self.tip)
|
||||||
else:
|
else:
|
||||||
|
|
|
@ -50,7 +50,7 @@ class BlockchainReader:
|
||||||
if self.last_state:
|
if self.last_state:
|
||||||
while True:
|
while True:
|
||||||
if self.db.headers[-1] == self.db.prefix_db.header.get(last_height, deserialize_value=False):
|
if self.db.headers[-1] == self.db.prefix_db.header.get(last_height, deserialize_value=False):
|
||||||
self.log.info("connects to block %i", last_height)
|
self.log.debug("connects to block %i", last_height)
|
||||||
break
|
break
|
||||||
else:
|
else:
|
||||||
self.log.warning("disconnect block %i", last_height)
|
self.log.warning("disconnect block %i", last_height)
|
||||||
|
|
|
@ -7,20 +7,23 @@ from lbry.wallet.server.chain_reader import BlockchainReaderServer
|
||||||
from lbry.wallet.server.db.elasticsearch.sync import ElasticWriter
|
from lbry.wallet.server.db.elasticsearch.sync import ElasticWriter
|
||||||
|
|
||||||
|
|
||||||
def get_args_and_setup_logging(name):
|
def get_arg_parser(name):
|
||||||
parser = argparse.ArgumentParser(
|
parser = argparse.ArgumentParser(
|
||||||
prog=name
|
prog=name
|
||||||
)
|
)
|
||||||
Env.contribute_to_arg_parser(parser)
|
Env.contribute_to_arg_parser(parser)
|
||||||
args = parser.parse_args()
|
return parser
|
||||||
|
|
||||||
|
|
||||||
|
def setup_logging():
|
||||||
logging.basicConfig(level=logging.INFO, format="%(asctime)s %(levelname)-4s %(name)s:%(lineno)d: %(message)s")
|
logging.basicConfig(level=logging.INFO, format="%(asctime)s %(levelname)-4s %(name)s:%(lineno)d: %(message)s")
|
||||||
logging.getLogger('aiohttp').setLevel(logging.WARNING)
|
logging.getLogger('aiohttp').setLevel(logging.WARNING)
|
||||||
logging.getLogger('elasticsearch').setLevel(logging.WARNING)
|
logging.getLogger('elasticsearch').setLevel(logging.WARNING)
|
||||||
return args
|
|
||||||
|
|
||||||
|
|
||||||
def run_writer_forever():
|
def run_writer_forever():
|
||||||
args = get_args_and_setup_logging('lbry-hub-writer')
|
setup_logging()
|
||||||
|
args = get_arg_parser('lbry-hub-writer').parse_args()
|
||||||
try:
|
try:
|
||||||
block_processor = BlockProcessor(Env.from_arg_parser(args))
|
block_processor = BlockProcessor(Env.from_arg_parser(args))
|
||||||
block_processor.run()
|
block_processor.run()
|
||||||
|
@ -32,7 +35,8 @@ def run_writer_forever():
|
||||||
|
|
||||||
|
|
||||||
def run_server_forever():
|
def run_server_forever():
|
||||||
args = get_args_and_setup_logging('lbry-hub-server')
|
setup_logging()
|
||||||
|
args = get_arg_parser('lbry-hub-server').parse_args()
|
||||||
|
|
||||||
try:
|
try:
|
||||||
server = BlockchainReaderServer(Env.from_arg_parser(args))
|
server = BlockchainReaderServer(Env.from_arg_parser(args))
|
||||||
|
@ -45,10 +49,14 @@ def run_server_forever():
|
||||||
|
|
||||||
|
|
||||||
def run_es_sync_forever():
|
def run_es_sync_forever():
|
||||||
args = get_args_and_setup_logging('lbry-hub-elastic-sync')
|
setup_logging()
|
||||||
|
parser = get_arg_parser('lbry-hub-elastic-sync')
|
||||||
|
parser.add_argument('--reindex', type=bool, default=False)
|
||||||
|
args = parser.parse_args()
|
||||||
|
|
||||||
try:
|
try:
|
||||||
server = ElasticWriter(Env.from_arg_parser(args))
|
server = ElasticWriter(Env.from_arg_parser(args))
|
||||||
server.run()
|
server.run(args.reindex)
|
||||||
except Exception:
|
except Exception:
|
||||||
traceback.print_exc()
|
traceback.print_exc()
|
||||||
logging.critical('es writer terminated abnormally')
|
logging.critical('es writer terminated abnormally')
|
||||||
|
|
|
@ -1151,10 +1151,3 @@ class HubDB:
|
||||||
utxo_append((hashX, utxo_value.amount))
|
utxo_append((hashX, utxo_value.amount))
|
||||||
return utxos
|
return utxos
|
||||||
return await asyncio.get_event_loop().run_in_executor(self._executor, lookup_utxos)
|
return await asyncio.get_event_loop().run_in_executor(self._executor, lookup_utxos)
|
||||||
|
|
||||||
async def get_trending_notifications(self, height: int):
|
|
||||||
def read_trending():
|
|
||||||
return {
|
|
||||||
k.claim_hash: v for k, v in self.prefix_db.trending_notification.iterate((height,))
|
|
||||||
}
|
|
||||||
return await asyncio.get_event_loop().run_in_executor(self._executor, read_trending)
|
|
||||||
|
|
|
@ -17,6 +17,7 @@ from lbry.wallet.server.db.elasticsearch.notifier import ElasticNotifierProtocol
|
||||||
from lbry.wallet.server.db.elasticsearch.fast_ar_trending import FAST_AR_TRENDING_SCRIPT
|
from lbry.wallet.server.db.elasticsearch.fast_ar_trending import FAST_AR_TRENDING_SCRIPT
|
||||||
from lbry.wallet.server.chain_reader import BlockchainReader
|
from lbry.wallet.server.chain_reader import BlockchainReader
|
||||||
from lbry.wallet.server.db.revertable import RevertableOp
|
from lbry.wallet.server.db.revertable import RevertableOp
|
||||||
|
from lbry.wallet.server.db.common import TrendingNotification
|
||||||
from lbry.wallet.server.db import DB_PREFIXES
|
from lbry.wallet.server.db import DB_PREFIXES
|
||||||
|
|
||||||
|
|
||||||
|
@ -34,9 +35,7 @@ class ElasticWriter(BlockchainReader):
|
||||||
self._elastic_host = env.elastic_host
|
self._elastic_host = env.elastic_host
|
||||||
self._elastic_port = env.elastic_port
|
self._elastic_port = env.elastic_port
|
||||||
self.sync_timeout = 1800
|
self.sync_timeout = 1800
|
||||||
self.sync_client = AsyncElasticsearch(
|
self.sync_client = None
|
||||||
[{'host': self._elastic_host, 'port': self._elastic_port}], timeout=self.sync_timeout
|
|
||||||
)
|
|
||||||
self._es_info_path = os.path.join(env.db_dir, 'es_info')
|
self._es_info_path = os.path.join(env.db_dir, 'es_info')
|
||||||
self._last_wrote_height = 0
|
self._last_wrote_height = 0
|
||||||
self._last_wrote_block_hash = None
|
self._last_wrote_block_hash = None
|
||||||
|
@ -66,8 +65,10 @@ class ElasticWriter(BlockchainReader):
|
||||||
self.log.info("notify listener %i", height)
|
self.log.info("notify listener %i", height)
|
||||||
|
|
||||||
def _read_es_height(self):
|
def _read_es_height(self):
|
||||||
with open(self._es_info_path, 'r') as f:
|
info = {}
|
||||||
info = json.loads(f.read())
|
if os.path.exists(self._es_info_path):
|
||||||
|
with open(self._es_info_path, 'r') as f:
|
||||||
|
info.update(json.loads(f.read()))
|
||||||
self._last_wrote_height = int(info.get('height', 0))
|
self._last_wrote_height = int(info.get('height', 0))
|
||||||
self._last_wrote_block_hash = info.get('block_hash', None)
|
self._last_wrote_block_hash = info.get('block_hash', None)
|
||||||
|
|
||||||
|
@ -100,22 +101,26 @@ class ElasticWriter(BlockchainReader):
|
||||||
while True:
|
while True:
|
||||||
try:
|
try:
|
||||||
await self.sync_client.cluster.health(wait_for_status='yellow')
|
await self.sync_client.cluster.health(wait_for_status='yellow')
|
||||||
|
self.log.info("ES is ready to connect to")
|
||||||
break
|
break
|
||||||
except ConnectionError:
|
except ConnectionError:
|
||||||
self.log.warning("Failed to connect to Elasticsearch. Waiting for it!")
|
self.log.warning("Failed to connect to Elasticsearch. Waiting for it!")
|
||||||
await asyncio.sleep(1)
|
await asyncio.sleep(1)
|
||||||
|
|
||||||
|
index_version = await self.get_index_version()
|
||||||
|
|
||||||
res = await self.sync_client.indices.create(self.index, INDEX_DEFAULT_SETTINGS, ignore=400)
|
res = await self.sync_client.indices.create(self.index, INDEX_DEFAULT_SETTINGS, ignore=400)
|
||||||
acked = res.get('acknowledged', False)
|
acked = res.get('acknowledged', False)
|
||||||
|
|
||||||
if acked:
|
if acked:
|
||||||
await self.set_index_version(self.VERSION)
|
await self.set_index_version(self.VERSION)
|
||||||
return acked
|
return True
|
||||||
index_version = await self.get_index_version()
|
elif index_version != self.VERSION:
|
||||||
if index_version != self.VERSION:
|
|
||||||
self.log.error("es search index has an incompatible version: %s vs %s", index_version, self.VERSION)
|
self.log.error("es search index has an incompatible version: %s vs %s", index_version, self.VERSION)
|
||||||
raise IndexVersionMismatch(index_version, self.VERSION)
|
raise IndexVersionMismatch(index_version, self.VERSION)
|
||||||
await self.sync_client.indices.refresh(self.index)
|
else:
|
||||||
return acked
|
await self.sync_client.indices.refresh(self.index)
|
||||||
|
return False
|
||||||
|
|
||||||
async def stop_index(self):
|
async def stop_index(self):
|
||||||
if self.sync_client:
|
if self.sync_client:
|
||||||
|
@ -195,10 +200,10 @@ class ElasticWriter(BlockchainReader):
|
||||||
'params': {'src': {
|
'params': {'src': {
|
||||||
'changes': [
|
'changes': [
|
||||||
{
|
{
|
||||||
'height': notify_height,
|
'height': notification.height,
|
||||||
'prev_amount': trending_v.previous_amount / 1E8,
|
'prev_amount': notification.prev_amount / 1E8,
|
||||||
'new_amount': trending_v.new_amount / 1E8,
|
'new_amount': notification.new_amount / 1E8,
|
||||||
} for (notify_height, trending_v) in notifications
|
} for notification in notifications
|
||||||
]
|
]
|
||||||
}}
|
}}
|
||||||
},
|
},
|
||||||
|
@ -219,7 +224,7 @@ class ElasticWriter(BlockchainReader):
|
||||||
|
|
||||||
touched_or_deleted = self.db.prefix_db.touched_or_deleted.get(height)
|
touched_or_deleted = self.db.prefix_db.touched_or_deleted.get(height)
|
||||||
for k, v in self.db.prefix_db.trending_notification.iterate((height,)):
|
for k, v in self.db.prefix_db.trending_notification.iterate((height,)):
|
||||||
self._trending[k.claim_hash].append((k.height, v))
|
self._trending[k.claim_hash].append(TrendingNotification(k.height, v.previous_amount, v.new_amount))
|
||||||
if touched_or_deleted:
|
if touched_or_deleted:
|
||||||
readded_after_reorg = self._removed_during_undo.intersection(touched_or_deleted.touched_claims)
|
readded_after_reorg = self._removed_during_undo.intersection(touched_or_deleted.touched_claims)
|
||||||
self._deleted_claims.difference_update(readded_after_reorg)
|
self._deleted_claims.difference_update(readded_after_reorg)
|
||||||
|
@ -292,7 +297,7 @@ class ElasticWriter(BlockchainReader):
|
||||||
def last_synced_height(self) -> int:
|
def last_synced_height(self) -> int:
|
||||||
return self._last_wrote_height
|
return self._last_wrote_height
|
||||||
|
|
||||||
async def start(self):
|
async def start(self, reindex=False):
|
||||||
await super().start()
|
await super().start()
|
||||||
|
|
||||||
def _start_cancellable(run, *args):
|
def _start_cancellable(run, *args):
|
||||||
|
@ -302,10 +307,15 @@ class ElasticWriter(BlockchainReader):
|
||||||
|
|
||||||
self.db.open_db()
|
self.db.open_db()
|
||||||
await self.db.initialize_caches()
|
await self.db.initialize_caches()
|
||||||
|
await self.read_es_height()
|
||||||
await self.start_index()
|
await self.start_index()
|
||||||
self.last_state = self.db.read_db_state()
|
self.last_state = self.db.read_db_state()
|
||||||
|
|
||||||
await _start_cancellable(self.run_es_notifier)
|
await _start_cancellable(self.run_es_notifier)
|
||||||
|
|
||||||
|
if reindex or self._last_wrote_height == 0 and self.db.db_height > 0:
|
||||||
|
self.log.warning("reindex (last wrote: %i, db height: %i)", self._last_wrote_height, self.db.db_height)
|
||||||
|
await self.reindex()
|
||||||
await _start_cancellable(self.refresh_blocks_forever)
|
await _start_cancellable(self.refresh_blocks_forever)
|
||||||
|
|
||||||
async def stop(self, delete_index=False):
|
async def stop(self, delete_index=False):
|
||||||
|
@ -319,8 +329,9 @@ class ElasticWriter(BlockchainReader):
|
||||||
await self.stop_index()
|
await self.stop_index()
|
||||||
self._executor.shutdown(wait=True)
|
self._executor.shutdown(wait=True)
|
||||||
self._executor = None
|
self._executor = None
|
||||||
|
self.shutdown_event.set()
|
||||||
|
|
||||||
def run(self):
|
def run(self, reindex=False):
|
||||||
loop = asyncio.get_event_loop()
|
loop = asyncio.get_event_loop()
|
||||||
|
|
||||||
def __exit():
|
def __exit():
|
||||||
|
@ -328,9 +339,65 @@ class ElasticWriter(BlockchainReader):
|
||||||
try:
|
try:
|
||||||
loop.add_signal_handler(signal.SIGINT, __exit)
|
loop.add_signal_handler(signal.SIGINT, __exit)
|
||||||
loop.add_signal_handler(signal.SIGTERM, __exit)
|
loop.add_signal_handler(signal.SIGTERM, __exit)
|
||||||
loop.run_until_complete(self.start())
|
loop.run_until_complete(self.start(reindex=reindex))
|
||||||
loop.run_until_complete(self.shutdown_event.wait())
|
loop.run_until_complete(self.shutdown_event.wait())
|
||||||
except (SystemExit, KeyboardInterrupt):
|
except (SystemExit, KeyboardInterrupt):
|
||||||
pass
|
pass
|
||||||
finally:
|
finally:
|
||||||
loop.run_until_complete(self.stop())
|
loop.run_until_complete(self.stop())
|
||||||
|
|
||||||
|
async def reindex(self):
|
||||||
|
async with self._lock:
|
||||||
|
self.log.info("reindexing %i claims (estimate)", self.db.prefix_db.claim_to_txo.estimate_num_keys())
|
||||||
|
await self.delete_index()
|
||||||
|
res = await self.sync_client.indices.create(self.index, INDEX_DEFAULT_SETTINGS, ignore=400)
|
||||||
|
acked = res.get('acknowledged', False)
|
||||||
|
if acked:
|
||||||
|
await self.set_index_version(self.VERSION)
|
||||||
|
await self.sync_client.indices.refresh(self.index)
|
||||||
|
self.write_es_height(0, self.env.coin.GENESIS_HASH)
|
||||||
|
await self._sync_all_claims()
|
||||||
|
await self.sync_client.indices.refresh(self.index)
|
||||||
|
self.write_es_height(self.db.db_height, self.db.db_tip[::-1].hex())
|
||||||
|
self.notify_es_notification_listeners(self.db.db_height, self.db.db_tip)
|
||||||
|
self.log.warning("finished reindexing")
|
||||||
|
|
||||||
|
async def _sync_all_claims(self, batch_size=100000):
|
||||||
|
def load_historic_trending():
|
||||||
|
notifications = self._trending
|
||||||
|
for k, v in self.db.prefix_db.trending_notification.iterate():
|
||||||
|
notifications[k.claim_hash].append(TrendingNotification(k.height, v.previous_amount, v.new_amount))
|
||||||
|
|
||||||
|
async def all_claims_producer():
|
||||||
|
async for claim in self.db.all_claims_producer(batch_size=batch_size):
|
||||||
|
yield self._upsert_claim_query(self.index, claim)
|
||||||
|
claim_hash = bytes.fromhex(claim['claim_id'])
|
||||||
|
if claim_hash in self._trending:
|
||||||
|
yield self._update_trending_query(self.index, claim_hash, self._trending.pop(claim_hash))
|
||||||
|
self._trending.clear()
|
||||||
|
|
||||||
|
self.log.info("loading about %i historic trending updates", self.db.prefix_db.trending_notification.estimate_num_keys())
|
||||||
|
await asyncio.get_event_loop().run_in_executor(self._executor, load_historic_trending)
|
||||||
|
self.log.info("loaded historic trending updates for %i claims", len(self._trending))
|
||||||
|
|
||||||
|
cnt = 0
|
||||||
|
success = 0
|
||||||
|
producer = all_claims_producer()
|
||||||
|
|
||||||
|
finished = False
|
||||||
|
try:
|
||||||
|
async for ok, item in async_streaming_bulk(self.sync_client, producer, raise_on_error=False):
|
||||||
|
cnt += 1
|
||||||
|
if not ok:
|
||||||
|
self.log.warning("indexing failed for an item: %s", item)
|
||||||
|
else:
|
||||||
|
success += 1
|
||||||
|
if cnt % batch_size == 0:
|
||||||
|
self.log.info(f"indexed {success} claims")
|
||||||
|
finished = True
|
||||||
|
await self.sync_client.indices.refresh(self.index)
|
||||||
|
self.log.info("indexed %i/%i claims", success, cnt)
|
||||||
|
finally:
|
||||||
|
if not finished:
|
||||||
|
await producer.aclose()
|
||||||
|
self.shutdown_event.set()
|
||||||
|
|
|
@ -89,37 +89,58 @@ class TestUsagePayment(CommandTestCase):
|
||||||
|
|
||||||
class TestESSync(CommandTestCase):
|
class TestESSync(CommandTestCase):
|
||||||
async def test_es_sync_utility(self):
|
async def test_es_sync_utility(self):
|
||||||
|
es_writer = self.conductor.spv_node.es_writer
|
||||||
|
server_search_client = self.conductor.spv_node.server.session_manager.search_index
|
||||||
|
|
||||||
for i in range(10):
|
for i in range(10):
|
||||||
await self.stream_create(f"stream{i}", bid='0.001')
|
await self.stream_create(f"stream{i}", bid='0.001')
|
||||||
await self.generate(1)
|
await self.generate(1)
|
||||||
self.assertEqual(10, len(await self.claim_search(order_by=['height'])))
|
self.assertEqual(10, len(await self.claim_search(order_by=['height'])))
|
||||||
db = self.conductor.spv_node.server.db
|
|
||||||
env = self.conductor.spv_node.server.env
|
|
||||||
|
|
||||||
await db.search_index.delete_index()
|
|
||||||
db.search_index.clear_caches()
|
|
||||||
self.assertEqual(0, len(await self.claim_search(order_by=['height'])))
|
|
||||||
await db.search_index.stop()
|
|
||||||
|
|
||||||
async def resync():
|
|
||||||
await db.search_index.start()
|
|
||||||
db.search_index.clear_caches()
|
|
||||||
await make_es_index_and_run_sync(env, db=db, index_name=db.search_index.index, force=True)
|
|
||||||
self.assertEqual(10, len(await self.claim_search(order_by=['height'])))
|
|
||||||
|
|
||||||
|
# delete the index and verify nothing is returned by claim search
|
||||||
|
await es_writer.delete_index()
|
||||||
|
server_search_client.clear_caches()
|
||||||
self.assertEqual(0, len(await self.claim_search(order_by=['height'])))
|
self.assertEqual(0, len(await self.claim_search(order_by=['height'])))
|
||||||
|
|
||||||
await resync()
|
# reindex, 10 claims should be returned
|
||||||
|
await es_writer.reindex()
|
||||||
# this time we will test a migration from unversioned to v1
|
|
||||||
await db.search_index.sync_client.indices.delete_template(db.search_index.index)
|
|
||||||
await db.search_index.stop()
|
|
||||||
|
|
||||||
await make_es_index_and_run_sync(env, db=db, index_name=db.search_index.index, force=True)
|
|
||||||
await db.search_index.start()
|
|
||||||
|
|
||||||
await resync()
|
|
||||||
self.assertEqual(10, len(await self.claim_search(order_by=['height'])))
|
self.assertEqual(10, len(await self.claim_search(order_by=['height'])))
|
||||||
|
server_search_client.clear_caches()
|
||||||
|
self.assertEqual(10, len(await self.claim_search(order_by=['height'])))
|
||||||
|
|
||||||
|
# reindex again, this should not appear to do anything but will delete and reinsert the same 10 claims
|
||||||
|
await es_writer.reindex()
|
||||||
|
self.assertEqual(10, len(await self.claim_search(order_by=['height'])))
|
||||||
|
server_search_client.clear_caches()
|
||||||
|
self.assertEqual(10, len(await self.claim_search(order_by=['height'])))
|
||||||
|
|
||||||
|
# delete the index again and stop the writer, upon starting it the writer should reindex automatically
|
||||||
|
await es_writer.stop(delete_index=True)
|
||||||
|
server_search_client.clear_caches()
|
||||||
|
self.assertEqual(0, len(await self.claim_search(order_by=['height'])))
|
||||||
|
|
||||||
|
await es_writer.start(reindex=True)
|
||||||
|
self.assertEqual(10, len(await self.claim_search(order_by=['height'])))
|
||||||
|
|
||||||
|
# stop the es writer and advance the chain by 1, adding a new claim. upon resuming the es writer, it should
|
||||||
|
# add the new claim
|
||||||
|
await es_writer.stop()
|
||||||
|
await self.stream_create(f"stream11", bid='0.001', confirm=False)
|
||||||
|
generate_block_task = asyncio.create_task(self.generate(1))
|
||||||
|
await es_writer.start()
|
||||||
|
await generate_block_task
|
||||||
|
self.assertEqual(11, len(await self.claim_search(order_by=['height'])))
|
||||||
|
|
||||||
|
|
||||||
|
# # this time we will test a migration from unversioned to v1
|
||||||
|
# await db.search_index.sync_client.indices.delete_template(db.search_index.index)
|
||||||
|
# await db.search_index.stop()
|
||||||
|
#
|
||||||
|
# await make_es_index_and_run_sync(env, db=db, index_name=db.search_index.index, force=True)
|
||||||
|
# await db.search_index.start()
|
||||||
|
#
|
||||||
|
# await es_writer.reindex()
|
||||||
|
# self.assertEqual(10, len(await self.claim_search(order_by=['height'])))
|
||||||
|
|
||||||
|
|
||||||
class TestHubDiscovery(CommandTestCase):
|
class TestHubDiscovery(CommandTestCase):
|
||||||
|
|
Loading…
Add table
Reference in a new issue