update sync script to handle ES falling behind leveldb on shutdown

This commit is contained in:
Jack Robison 2021-10-20 19:34:55 -04:00 committed by Victor Shyba
parent 9aafb7a743
commit fd2ab47a16
8 changed files with 85 additions and 57 deletions

View file

@ -367,6 +367,7 @@ class BlockProcessor:
await self.db.search_index.apply_filters(self.db.blocked_streams, self.db.blocked_channels, await self.db.search_index.apply_filters(self.db.blocked_streams, self.db.blocked_channels,
self.db.filtered_streams, self.db.filtered_channels) self.db.filtered_streams, self.db.filtered_channels)
await self.db.search_index.update_trending_score(self.activation_info_to_send_es) await self.db.search_index.update_trending_score(self.activation_info_to_send_es)
await self._es_caught_up()
self.db.search_index.clear_caches() self.db.search_index.clear_caches()
self.touched_claims_to_send_es.clear() self.touched_claims_to_send_es.clear()
self.removed_claims_to_send_es.clear() self.removed_claims_to_send_es.clear()
@ -1620,6 +1621,7 @@ class BlockProcessor:
else: else:
self.tx_count = self.db.tx_counts[-1] self.tx_count = self.db.tx_counts[-1]
self.height -= 1 self.height -= 1
# self.touched can include other addresses which is # self.touched can include other addresses which is
# harmless, but remove None. # harmless, but remove None.
self.touched_hashXs.discard(None) self.touched_hashXs.discard(None)
@ -1649,8 +1651,15 @@ class BlockProcessor:
self.db.last_flush = now self.db.last_flush = now
self.db.last_flush_tx_count = self.db.fs_tx_count self.db.last_flush_tx_count = self.db.fs_tx_count
await self.run_in_thread_with_lock(self.db.prefix_db.rollback, self.height + 1) def rollback():
self.db.prefix_db.rollback(self.height + 1)
self.db.es_sync_height = self.height
self.db.write_db_state()
self.db.prefix_db.unsafe_commit()
await self.run_in_thread_with_lock(rollback)
self.clear_after_advance_or_reorg() self.clear_after_advance_or_reorg()
self.db.assert_db_state()
elapsed = self.db.last_flush - start_time elapsed = self.db.last_flush - start_time
self.logger.warning(f'backup flush #{self.db.hist_flush_count:,d} took {elapsed:.1f}s. ' self.logger.warning(f'backup flush #{self.db.hist_flush_count:,d} took {elapsed:.1f}s. '
@ -1713,6 +1722,17 @@ class BlockProcessor:
self.logger.exception("error while processing txs") self.logger.exception("error while processing txs")
raise raise
async def _es_caught_up(self):
self.db.es_sync_height = self.height
def flush():
assert len(self.db.prefix_db._op_stack) == 0
self.db.write_db_state()
self.db.prefix_db.unsafe_commit()
self.db.assert_db_state()
await self.run_in_thread_with_lock(flush)
async def _first_caught_up(self): async def _first_caught_up(self):
self.logger.info(f'caught up to height {self.height}') self.logger.info(f'caught up to height {self.height}')
# Flush everything but with first_sync->False state. # Flush everything but with first_sync->False state.

View file

@ -10,9 +10,6 @@ def get_argument_parser():
prog="lbry-hub" prog="lbry-hub"
) )
Env.contribute_to_arg_parser(parser) Env.contribute_to_arg_parser(parser)
sub = parser.add_subparsers(metavar='COMMAND')
start = sub.add_parser('start', help='Start LBRY Network interface.')
return parser return parser

View file

@ -1,27 +1,28 @@
import os
import argparse import argparse
import asyncio import asyncio
import logging import logging
from elasticsearch import AsyncElasticsearch from elasticsearch import AsyncElasticsearch
from elasticsearch.helpers import async_streaming_bulk from elasticsearch.helpers import async_streaming_bulk
from lbry.wallet.server.env import Env from lbry.wallet.server.env import Env
from lbry.wallet.server.coin import LBC
from lbry.wallet.server.leveldb import LevelDB from lbry.wallet.server.leveldb import LevelDB
from lbry.wallet.server.db.elasticsearch.search import SearchIndex, IndexVersionMismatch from lbry.wallet.server.db.elasticsearch.search import SearchIndex, IndexVersionMismatch
from lbry.wallet.server.db.elasticsearch.constants import ALL_FIELDS from lbry.wallet.server.db.elasticsearch.constants import ALL_FIELDS
async def get_recent_claims(blocks: int, index_name='claims', db=None): async def get_recent_claims(env, index_name='claims', db=None):
env = Env(LBC)
need_open = db is None need_open = db is None
db = db or LevelDB(env) db = db or LevelDB(env)
try:
if need_open: if need_open:
await db.open_dbs() await db.open_dbs()
try: db_state = db.prefix_db.db_state.get()
if db_state.es_sync_height == db_state.height:
return
cnt = 0 cnt = 0
state = db.prefix_db.db_state.get()
touched_claims = set() touched_claims = set()
deleted_claims = set() deleted_claims = set()
for height in range(state.height - blocks + 1, state.height + 1): for height in range(db_state.es_sync_height, db_state.height + 1):
touched_or_deleted = db.prefix_db.touched_or_deleted.get(height) touched_or_deleted = db.prefix_db.touched_or_deleted.get(height)
touched_claims.update(touched_or_deleted.touched_claims) touched_claims.update(touched_or_deleted.touched_claims)
deleted_claims.update(touched_or_deleted.deleted_claims) deleted_claims.update(touched_or_deleted.deleted_claims)
@ -48,14 +49,19 @@ async def get_recent_claims(blocks: int, index_name='claims', db=None):
logging.warning("could not sync claim %s", touched.hex()) logging.warning("could not sync claim %s", touched.hex())
if cnt % 10000 == 0: if cnt % 10000 == 0:
logging.info("%i claims sent to ES", cnt) logging.info("%i claims sent to ES", cnt)
db.es_sync_height = db.db_height
db.write_db_state()
db.prefix_db.unsafe_commit()
db.assert_db_state()
logging.info("finished sending %i claims to ES, deleted %i", cnt, len(touched_claims), len(deleted_claims)) logging.info("finished sending %i claims to ES, deleted %i", cnt, len(touched_claims), len(deleted_claims))
finally: finally:
if need_open: if need_open:
db.close() db.close()
async def get_all_claims(index_name='claims', db=None): async def get_all_claims(env, index_name='claims', db=None):
env = Env(LBC)
need_open = db is None need_open = db is None
db = db or LevelDB(env) db = db or LevelDB(env)
if need_open: if need_open:
@ -79,34 +85,26 @@ async def get_all_claims(index_name='claims', db=None):
db.close() db.close()
async def make_es_index(index=None): async def make_es_index_and_run_sync(env: Env, clients=32, force=False, db=None, index_name='claims'):
env = Env(LBC) index = SearchIndex(env.es_index_prefix, elastic_host=env.elastic_host, elastic_port=env.elastic_port)
if index is None: logging.info("ES sync host: %s:%i", env.elastic_host, env.elastic_port)
index = SearchIndex('', elastic_host=env.elastic_host, elastic_port=env.elastic_port)
try: try:
return await index.start() created = await index.start()
except IndexVersionMismatch as err: except IndexVersionMismatch as err:
logging.info( logging.info(
"dropping ES search index (version %s) for upgrade to version %s", err.got_version, err.expected_version "dropping ES search index (version %s) for upgrade to version %s", err.got_version, err.expected_version
) )
await index.delete_index() await index.delete_index()
await index.stop() await index.stop()
return await index.start() created = await index.start()
finally: finally:
index.stop() index.stop()
async def run_sync(index_name='claims', db=None, clients=32, blocks=0):
env = Env(LBC)
logging.info("ES sync host: %s:%i", env.elastic_host, env.elastic_port)
es = AsyncElasticsearch([{'host': env.elastic_host, 'port': env.elastic_port}]) es = AsyncElasticsearch([{'host': env.elastic_host, 'port': env.elastic_port}])
if blocks > 0: if force or created:
blocks = min(blocks, 200) claim_generator = get_all_claims(env, index_name=index_name, db=db)
logging.info("Resyncing last %i blocks", blocks)
claim_generator = get_recent_claims(blocks, index_name=index_name, db=db)
else: else:
claim_generator = get_all_claims(index_name=index_name, db=db) claim_generator = get_recent_claims(env, index_name=index_name, db=db)
try: try:
async for ok, item in async_streaming_bulk(es, claim_generator, request_timeout=600, raise_on_error=False): async for ok, item in async_streaming_bulk(es, claim_generator, request_timeout=600, raise_on_error=False):
if not ok: if not ok:
@ -123,17 +121,14 @@ def run_elastic_sync():
logging.info('lbry.server starting') logging.info('lbry.server starting')
parser = argparse.ArgumentParser(prog="lbry-hub-elastic-sync") parser = argparse.ArgumentParser(prog="lbry-hub-elastic-sync")
# parser.add_argument("db_path", type=str)
parser.add_argument("-c", "--clients", type=int, default=32) parser.add_argument("-c", "--clients", type=int, default=32)
parser.add_argument("-b", "--blocks", type=int, default=0)
parser.add_argument("-f", "--force", default=False, action='store_true') parser.add_argument("-f", "--force", default=False, action='store_true')
Env.contribute_to_arg_parser(parser)
args = parser.parse_args() args = parser.parse_args()
env = Env.from_arg_parser(args)
# if not args.force and not os.path.exists(args.db_path): if not os.path.exists(os.path.join(args.db_dir, 'lbry-leveldb')):
# logging.info("DB path doesnt exist") logging.info("DB path doesnt exist, nothing to sync to ES")
# return
if not args.force and not asyncio.run(make_es_index()):
logging.info("ES is already initialized")
return return
asyncio.run(run_sync(clients=args.clients, blocks=args.blocks))
asyncio.run(make_es_index_and_run_sync(env, clients=args.clients, force=args.force))

View file

@ -534,6 +534,7 @@ class DBState(typing.NamedTuple):
hist_flush_count: int hist_flush_count: int
comp_flush_count: int comp_flush_count: int
comp_cursor: int comp_cursor: int
es_sync_height: int
class ActiveAmountPrefixRow(PrefixRow): class ActiveAmountPrefixRow(PrefixRow):
@ -1521,7 +1522,7 @@ class SupportAmountPrefixRow(PrefixRow):
class DBStatePrefixRow(PrefixRow): class DBStatePrefixRow(PrefixRow):
prefix = DB_PREFIXES.db_state.value prefix = DB_PREFIXES.db_state.value
value_struct = struct.Struct(b'>32sLL32sLLBBlll') value_struct = struct.Struct(b'>32sLL32sLLBBlllL')
key_struct = struct.Struct(b'') key_struct = struct.Struct(b'')
key_part_lambdas = [ key_part_lambdas = [
@ -1539,15 +1540,19 @@ class DBStatePrefixRow(PrefixRow):
@classmethod @classmethod
def pack_value(cls, genesis: bytes, height: int, tx_count: int, tip: bytes, utxo_flush_count: int, wall_time: int, def pack_value(cls, genesis: bytes, height: int, tx_count: int, tip: bytes, utxo_flush_count: int, wall_time: int,
first_sync: bool, db_version: int, hist_flush_count: int, comp_flush_count: int, first_sync: bool, db_version: int, hist_flush_count: int, comp_flush_count: int,
comp_cursor: int) -> bytes: comp_cursor: int, es_sync_height: int) -> bytes:
return super().pack_value( return super().pack_value(
genesis, height, tx_count, tip, utxo_flush_count, genesis, height, tx_count, tip, utxo_flush_count,
wall_time, 1 if first_sync else 0, db_version, hist_flush_count, wall_time, 1 if first_sync else 0, db_version, hist_flush_count,
comp_flush_count, comp_cursor comp_flush_count, comp_cursor, es_sync_height
) )
@classmethod @classmethod
def unpack_value(cls, data: bytes) -> DBState: def unpack_value(cls, data: bytes) -> DBState:
if len(data) == 94:
# TODO: delete this after making a new snapshot - 10/20/21
# migrate in the es_sync_height if it doesnt exist
data += data[32:36]
return DBState(*super().unpack_value(data)) return DBState(*super().unpack_value(data))
@classmethod @classmethod

View file

@ -121,14 +121,14 @@ class RevertableOpStack:
elif op.is_delete and has_stored_val and stored_val != op.value and not will_delete_existing_stored: elif op.is_delete and has_stored_val and stored_val != op.value and not will_delete_existing_stored:
# there is a value and we're not deleting it in this op # there is a value and we're not deleting it in this op
# check that a delete for the stored value is in the stack # check that a delete for the stored value is in the stack
raise OpStackIntegrity(f"delete {op}") raise OpStackIntegrity(f"db op tries to delete with incorrect existing value {op}")
elif op.is_delete and not has_stored_val: elif op.is_delete and not has_stored_val:
raise OpStackIntegrity(f"db op tries to delete nonexistent key: {op}") raise OpStackIntegrity(f"db op tries to delete nonexistent key: {op}")
elif op.is_delete and stored_val != op.value: elif op.is_delete and stored_val != op.value:
raise OpStackIntegrity(f"db op tries to delete with incorrect value: {op}") raise OpStackIntegrity(f"db op tries to delete with incorrect value: {op}")
except OpStackIntegrity as err: except OpStackIntegrity as err:
if op.key[:1] in self._unsafe_prefixes: if op.key[:1] in self._unsafe_prefixes:
log.error(f"skipping over integrity error: {err}") log.debug(f"skipping over integrity error: {err}")
else: else:
raise err raise err
self._items[op.key].append(op) self._items[op.key].append(op)

View file

@ -289,9 +289,11 @@ class Env:
@classmethod @classmethod
def contribute_to_arg_parser(cls, parser): def contribute_to_arg_parser(cls, parser):
parser.add_argument('--db_dir', type=str, help='path of the directory containing lbry-leveldb') parser.add_argument('--db_dir', type=str, help='path of the directory containing lbry-leveldb',
default=cls.default('DB_DIRECTORY', None))
parser.add_argument('--daemon_url', parser.add_argument('--daemon_url',
help='URL for rpc from lbrycrd, <rpcuser>:<rpcpassword>@<lbrycrd rpc ip><lbrycrd rpc port>') help='URL for rpc from lbrycrd, <rpcuser>:<rpcpassword>@<lbrycrd rpc ip><lbrycrd rpc port>',
default=cls.default('DAEMON_URL', None))
parser.add_argument('--db_max_open_files', type=int, default=512, parser.add_argument('--db_max_open_files', type=int, default=512,
help='number of files leveldb can have open at a time') help='number of files leveldb can have open at a time')
parser.add_argument('--host', type=str, default=cls.default('HOST', 'localhost'), parser.add_argument('--host', type=str, default=cls.default('HOST', 'localhost'),

View file

@ -33,7 +33,7 @@ from lbry.wallet.server.merkle import Merkle, MerkleCache
from lbry.wallet.server.db.common import ResolveResult, STREAM_TYPES, CLAIM_TYPES from lbry.wallet.server.db.common import ResolveResult, STREAM_TYPES, CLAIM_TYPES
from lbry.wallet.server.db.prefixes import PendingActivationValue, ClaimTakeoverValue, ClaimToTXOValue, HubDB from lbry.wallet.server.db.prefixes import PendingActivationValue, ClaimTakeoverValue, ClaimToTXOValue, HubDB
from lbry.wallet.server.db.prefixes import ACTIVATED_CLAIM_TXO_TYPE, ACTIVATED_SUPPORT_TXO_TYPE from lbry.wallet.server.db.prefixes import ACTIVATED_CLAIM_TXO_TYPE, ACTIVATED_SUPPORT_TXO_TYPE
from lbry.wallet.server.db.prefixes import PendingActivationKey, TXOToClaimValue from lbry.wallet.server.db.prefixes import PendingActivationKey, TXOToClaimValue, DBStatePrefixRow
from lbry.wallet.transaction import OutputScript from lbry.wallet.transaction import OutputScript
from lbry.schema.claim import Claim, guess_stream_type from lbry.schema.claim import Claim, guess_stream_type
from lbry.wallet.ledger import Ledger, RegTestLedger, TestNetLedger from lbry.wallet.ledger import Ledger, RegTestLedger, TestNetLedger
@ -87,6 +87,8 @@ class LevelDB:
self.hist_comp_flush_count = -1 self.hist_comp_flush_count = -1
self.hist_comp_cursor = -1 self.hist_comp_cursor = -1
self.es_sync_height = 0
# blocking/filtering dicts # blocking/filtering dicts
blocking_channels = self.env.default('BLOCKING_CHANNEL_IDS', '').split(' ') blocking_channels = self.env.default('BLOCKING_CHANNEL_IDS', '').split(' ')
filtering_channels = self.env.default('FILTERING_CHANNEL_IDS', '').split(' ') filtering_channels = self.env.default('FILTERING_CHANNEL_IDS', '').split(' ')
@ -827,7 +829,8 @@ class LevelDB:
self.prefix_db = HubDB( self.prefix_db = HubDB(
os.path.join(self.env.db_dir, 'lbry-leveldb'), cache_mb=self.env.cache_MB, os.path.join(self.env.db_dir, 'lbry-leveldb'), cache_mb=self.env.cache_MB,
reorg_limit=self.env.reorg_limit, max_open_files=self.env.db_max_open_files reorg_limit=self.env.reorg_limit, max_open_files=self.env.db_max_open_files,
unsafe_prefixes={DBStatePrefixRow.prefix}
) )
self.logger.info(f'opened db: lbry-leveldb') self.logger.info(f'opened db: lbry-leveldb')
@ -1059,7 +1062,8 @@ class LevelDB:
self.prefix_db.db_state.stage_put((), ( self.prefix_db.db_state.stage_put((), (
self.genesis_bytes, self.db_height, self.db_tx_count, self.db_tip, self.genesis_bytes, self.db_height, self.db_tx_count, self.db_tip,
self.utxo_flush_count, int(self.wall_time), self.first_sync, self.db_version, self.utxo_flush_count, int(self.wall_time), self.first_sync, self.db_version,
self.hist_flush_count, self.hist_comp_flush_count, self.hist_comp_cursor self.hist_flush_count, self.hist_comp_flush_count, self.hist_comp_cursor,
self.es_sync_height
) )
) )
@ -1101,11 +1105,12 @@ class LevelDB:
def assert_db_state(self): def assert_db_state(self):
state = self.prefix_db.db_state.get() state = self.prefix_db.db_state.get()
assert self.db_version == state.db_version assert self.db_version == state.db_version, f"{self.db_version} != {state.db_version}"
assert self.db_height == state.height assert self.db_height == state.height, f"{self.db_height} != {state.height}"
assert self.db_tx_count == state.tx_count assert self.db_tx_count == state.tx_count, f"{self.db_tx_count} != {state.tx_count}"
assert self.db_tip == state.tip assert self.db_tip == state.tip, f"{self.db_tip} != {state.tip}"
assert self.first_sync == state.first_sync assert self.first_sync == state.first_sync, f"{self.first_sync} != {state.first_sync}"
assert self.es_sync_height == state.es_sync_height, f"{self.es_sync_height} != {state.es_sync_height}"
async def all_utxos(self, hashX): async def all_utxos(self, hashX):
"""Return all UTXOs for an address sorted in no particular order.""" """Return all UTXOs for an address sorted in no particular order."""

View file

@ -5,7 +5,7 @@ import lbry.wallet
from lbry.error import ServerPaymentFeeAboveMaxAllowedError from lbry.error import ServerPaymentFeeAboveMaxAllowedError
from lbry.wallet.network import ClientSession from lbry.wallet.network import ClientSession
from lbry.wallet.rpc import RPCError from lbry.wallet.rpc import RPCError
from lbry.wallet.server.db.elasticsearch.sync import run_sync, make_es_index from lbry.wallet.server.db.elasticsearch.sync import make_es_index_and_run_sync
from lbry.wallet.server.session import LBRYElectrumX from lbry.wallet.server.session import LBRYElectrumX
from lbry.testcase import IntegrationTestCase, CommandTestCase from lbry.testcase import IntegrationTestCase, CommandTestCase
from lbry.wallet.orchstr8.node import SPVNode from lbry.wallet.orchstr8.node import SPVNode
@ -95,16 +95,17 @@ class TestESSync(CommandTestCase):
await self.generate(1) await self.generate(1)
self.assertEqual(10, len(await self.claim_search(order_by=['height']))) self.assertEqual(10, len(await self.claim_search(order_by=['height'])))
db = self.conductor.spv_node.server.db db = self.conductor.spv_node.server.db
env = self.conductor.spv_node.server.env
await db.search_index.delete_index() await db.search_index.delete_index()
db.search_index.clear_caches() db.search_index.clear_caches()
self.assertEqual(0, len(await self.claim_search(order_by=['height']))) self.assertEqual(0, len(await self.claim_search(order_by=['height'])))
await db.search_index.stop() await db.search_index.stop()
self.assertTrue(await make_es_index(db.search_index))
async def resync(): async def resync():
await db.search_index.start() await db.search_index.start()
db.search_index.clear_caches() db.search_index.clear_caches()
await run_sync(index_name=db.search_index.index, db=db) await make_es_index_and_run_sync(env, db=db, index_name=db.search_index.index, force=True)
self.assertEqual(10, len(await self.claim_search(order_by=['height']))) self.assertEqual(10, len(await self.claim_search(order_by=['height'])))
self.assertEqual(0, len(await self.claim_search(order_by=['height']))) self.assertEqual(0, len(await self.claim_search(order_by=['height'])))
@ -114,9 +115,12 @@ class TestESSync(CommandTestCase):
# this time we will test a migration from unversioned to v1 # this time we will test a migration from unversioned to v1
await db.search_index.sync_client.indices.delete_template(db.search_index.index) await db.search_index.sync_client.indices.delete_template(db.search_index.index)
await db.search_index.stop() await db.search_index.stop()
self.assertTrue(await make_es_index(db.search_index))
await make_es_index_and_run_sync(env, db=db, index_name=db.search_index.index, force=True)
await db.search_index.start() await db.search_index.start()
await resync() await resync()
self.assertEqual(10, len(await self.claim_search(order_by=['height'])))
class TestHubDiscovery(CommandTestCase): class TestHubDiscovery(CommandTestCase):