diff --git a/lbry/wallet/server/block_processor.py b/lbry/wallet/server/block_processor.py index 66cb6af02..fa25006a4 100644 --- a/lbry/wallet/server/block_processor.py +++ b/lbry/wallet/server/block_processor.py @@ -367,6 +367,7 @@ class BlockProcessor: await self.db.search_index.apply_filters(self.db.blocked_streams, self.db.blocked_channels, self.db.filtered_streams, self.db.filtered_channels) await self.db.search_index.update_trending_score(self.activation_info_to_send_es) + await self._es_caught_up() self.db.search_index.clear_caches() self.touched_claims_to_send_es.clear() self.removed_claims_to_send_es.clear() @@ -1620,6 +1621,7 @@ class BlockProcessor: else: self.tx_count = self.db.tx_counts[-1] self.height -= 1 + # self.touched can include other addresses which is # harmless, but remove None. self.touched_hashXs.discard(None) @@ -1649,8 +1651,15 @@ class BlockProcessor: self.db.last_flush = now self.db.last_flush_tx_count = self.db.fs_tx_count - await self.run_in_thread_with_lock(self.db.prefix_db.rollback, self.height + 1) + def rollback(): + self.db.prefix_db.rollback(self.height + 1) + self.db.es_sync_height = self.height + self.db.write_db_state() + self.db.prefix_db.unsafe_commit() + + await self.run_in_thread_with_lock(rollback) self.clear_after_advance_or_reorg() + self.db.assert_db_state() elapsed = self.db.last_flush - start_time self.logger.warning(f'backup flush #{self.db.hist_flush_count:,d} took {elapsed:.1f}s. ' @@ -1713,6 +1722,17 @@ class BlockProcessor: self.logger.exception("error while processing txs") raise + async def _es_caught_up(self): + self.db.es_sync_height = self.height + + def flush(): + assert len(self.db.prefix_db._op_stack) == 0 + self.db.write_db_state() + self.db.prefix_db.unsafe_commit() + self.db.assert_db_state() + + await self.run_in_thread_with_lock(flush) + async def _first_caught_up(self): self.logger.info(f'caught up to height {self.height}') # Flush everything but with first_sync->False state. diff --git a/lbry/wallet/server/cli.py b/lbry/wallet/server/cli.py index af80aeffa..74a3d092a 100644 --- a/lbry/wallet/server/cli.py +++ b/lbry/wallet/server/cli.py @@ -10,9 +10,6 @@ def get_argument_parser(): prog="lbry-hub" ) Env.contribute_to_arg_parser(parser) - sub = parser.add_subparsers(metavar='COMMAND') - start = sub.add_parser('start', help='Start LBRY Network interface.') - return parser diff --git a/lbry/wallet/server/db/elasticsearch/sync.py b/lbry/wallet/server/db/elasticsearch/sync.py index 86614ecd6..35941b61a 100644 --- a/lbry/wallet/server/db/elasticsearch/sync.py +++ b/lbry/wallet/server/db/elasticsearch/sync.py @@ -1,27 +1,28 @@ +import os import argparse import asyncio import logging from elasticsearch import AsyncElasticsearch from elasticsearch.helpers import async_streaming_bulk from lbry.wallet.server.env import Env -from lbry.wallet.server.coin import LBC from lbry.wallet.server.leveldb import LevelDB from lbry.wallet.server.db.elasticsearch.search import SearchIndex, IndexVersionMismatch from lbry.wallet.server.db.elasticsearch.constants import ALL_FIELDS -async def get_recent_claims(blocks: int, index_name='claims', db=None): - env = Env(LBC) +async def get_recent_claims(env, index_name='claims', db=None): need_open = db is None db = db or LevelDB(env) - if need_open: - await db.open_dbs() try: + if need_open: + await db.open_dbs() + db_state = db.prefix_db.db_state.get() + if db_state.es_sync_height == db_state.height: + return cnt = 0 - state = db.prefix_db.db_state.get() touched_claims = set() deleted_claims = set() - for height in range(state.height - blocks + 1, state.height + 1): + for height in range(db_state.es_sync_height, db_state.height + 1): touched_or_deleted = db.prefix_db.touched_or_deleted.get(height) touched_claims.update(touched_or_deleted.touched_claims) deleted_claims.update(touched_or_deleted.deleted_claims) @@ -48,14 +49,19 @@ async def get_recent_claims(blocks: int, index_name='claims', db=None): logging.warning("could not sync claim %s", touched.hex()) if cnt % 10000 == 0: logging.info("%i claims sent to ES", cnt) + + db.es_sync_height = db.db_height + db.write_db_state() + db.prefix_db.unsafe_commit() + db.assert_db_state() + logging.info("finished sending %i claims to ES, deleted %i", cnt, len(touched_claims), len(deleted_claims)) finally: if need_open: db.close() -async def get_all_claims(index_name='claims', db=None): - env = Env(LBC) +async def get_all_claims(env, index_name='claims', db=None): need_open = db is None db = db or LevelDB(env) if need_open: @@ -79,34 +85,26 @@ async def get_all_claims(index_name='claims', db=None): db.close() -async def make_es_index(index=None): - env = Env(LBC) - if index is None: - index = SearchIndex('', elastic_host=env.elastic_host, elastic_port=env.elastic_port) - +async def make_es_index_and_run_sync(env: Env, clients=32, force=False, db=None, index_name='claims'): + index = SearchIndex(env.es_index_prefix, elastic_host=env.elastic_host, elastic_port=env.elastic_port) + logging.info("ES sync host: %s:%i", env.elastic_host, env.elastic_port) try: - return await index.start() + created = await index.start() except IndexVersionMismatch as err: logging.info( "dropping ES search index (version %s) for upgrade to version %s", err.got_version, err.expected_version ) await index.delete_index() await index.stop() - return await index.start() + created = await index.start() finally: index.stop() - -async def run_sync(index_name='claims', db=None, clients=32, blocks=0): - env = Env(LBC) - logging.info("ES sync host: %s:%i", env.elastic_host, env.elastic_port) es = AsyncElasticsearch([{'host': env.elastic_host, 'port': env.elastic_port}]) - if blocks > 0: - blocks = min(blocks, 200) - logging.info("Resyncing last %i blocks", blocks) - claim_generator = get_recent_claims(blocks, index_name=index_name, db=db) + if force or created: + claim_generator = get_all_claims(env, index_name=index_name, db=db) else: - claim_generator = get_all_claims(index_name=index_name, db=db) + claim_generator = get_recent_claims(env, index_name=index_name, db=db) try: async for ok, item in async_streaming_bulk(es, claim_generator, request_timeout=600, raise_on_error=False): if not ok: @@ -123,17 +121,14 @@ def run_elastic_sync(): logging.info('lbry.server starting') parser = argparse.ArgumentParser(prog="lbry-hub-elastic-sync") - # parser.add_argument("db_path", type=str) parser.add_argument("-c", "--clients", type=int, default=32) - parser.add_argument("-b", "--blocks", type=int, default=0) parser.add_argument("-f", "--force", default=False, action='store_true') + Env.contribute_to_arg_parser(parser) args = parser.parse_args() + env = Env.from_arg_parser(args) - # if not args.force and not os.path.exists(args.db_path): - # logging.info("DB path doesnt exist") - # return - - if not args.force and not asyncio.run(make_es_index()): - logging.info("ES is already initialized") + if not os.path.exists(os.path.join(args.db_dir, 'lbry-leveldb')): + logging.info("DB path doesnt exist, nothing to sync to ES") return - asyncio.run(run_sync(clients=args.clients, blocks=args.blocks)) + + asyncio.run(make_es_index_and_run_sync(env, clients=args.clients, force=args.force)) diff --git a/lbry/wallet/server/db/prefixes.py b/lbry/wallet/server/db/prefixes.py index d71b941b4..c264016fb 100644 --- a/lbry/wallet/server/db/prefixes.py +++ b/lbry/wallet/server/db/prefixes.py @@ -534,6 +534,7 @@ class DBState(typing.NamedTuple): hist_flush_count: int comp_flush_count: int comp_cursor: int + es_sync_height: int class ActiveAmountPrefixRow(PrefixRow): @@ -1521,7 +1522,7 @@ class SupportAmountPrefixRow(PrefixRow): class DBStatePrefixRow(PrefixRow): prefix = DB_PREFIXES.db_state.value - value_struct = struct.Struct(b'>32sLL32sLLBBlll') + value_struct = struct.Struct(b'>32sLL32sLLBBlllL') key_struct = struct.Struct(b'') key_part_lambdas = [ @@ -1539,15 +1540,19 @@ class DBStatePrefixRow(PrefixRow): @classmethod def pack_value(cls, genesis: bytes, height: int, tx_count: int, tip: bytes, utxo_flush_count: int, wall_time: int, first_sync: bool, db_version: int, hist_flush_count: int, comp_flush_count: int, - comp_cursor: int) -> bytes: + comp_cursor: int, es_sync_height: int) -> bytes: return super().pack_value( genesis, height, tx_count, tip, utxo_flush_count, wall_time, 1 if first_sync else 0, db_version, hist_flush_count, - comp_flush_count, comp_cursor + comp_flush_count, comp_cursor, es_sync_height ) @classmethod def unpack_value(cls, data: bytes) -> DBState: + if len(data) == 94: + # TODO: delete this after making a new snapshot - 10/20/21 + # migrate in the es_sync_height if it doesnt exist + data += data[32:36] return DBState(*super().unpack_value(data)) @classmethod diff --git a/lbry/wallet/server/db/revertable.py b/lbry/wallet/server/db/revertable.py index 099c2b48e..e59bbcdf3 100644 --- a/lbry/wallet/server/db/revertable.py +++ b/lbry/wallet/server/db/revertable.py @@ -121,14 +121,14 @@ class RevertableOpStack: elif op.is_delete and has_stored_val and stored_val != op.value and not will_delete_existing_stored: # there is a value and we're not deleting it in this op # check that a delete for the stored value is in the stack - raise OpStackIntegrity(f"delete {op}") + raise OpStackIntegrity(f"db op tries to delete with incorrect existing value {op}") elif op.is_delete and not has_stored_val: raise OpStackIntegrity(f"db op tries to delete nonexistent key: {op}") elif op.is_delete and stored_val != op.value: raise OpStackIntegrity(f"db op tries to delete with incorrect value: {op}") except OpStackIntegrity as err: if op.key[:1] in self._unsafe_prefixes: - log.error(f"skipping over integrity error: {err}") + log.debug(f"skipping over integrity error: {err}") else: raise err self._items[op.key].append(op) diff --git a/lbry/wallet/server/env.py b/lbry/wallet/server/env.py index 13956d525..a109abf76 100644 --- a/lbry/wallet/server/env.py +++ b/lbry/wallet/server/env.py @@ -289,9 +289,11 @@ class Env: @classmethod def contribute_to_arg_parser(cls, parser): - parser.add_argument('--db_dir', type=str, help='path of the directory containing lbry-leveldb') + parser.add_argument('--db_dir', type=str, help='path of the directory containing lbry-leveldb', + default=cls.default('DB_DIRECTORY', None)) parser.add_argument('--daemon_url', - help='URL for rpc from lbrycrd, :@') + help='URL for rpc from lbrycrd, :@', + default=cls.default('DAEMON_URL', None)) parser.add_argument('--db_max_open_files', type=int, default=512, help='number of files leveldb can have open at a time') parser.add_argument('--host', type=str, default=cls.default('HOST', 'localhost'), diff --git a/lbry/wallet/server/leveldb.py b/lbry/wallet/server/leveldb.py index 24a742a24..dddf3f1fb 100644 --- a/lbry/wallet/server/leveldb.py +++ b/lbry/wallet/server/leveldb.py @@ -33,7 +33,7 @@ from lbry.wallet.server.merkle import Merkle, MerkleCache from lbry.wallet.server.db.common import ResolveResult, STREAM_TYPES, CLAIM_TYPES from lbry.wallet.server.db.prefixes import PendingActivationValue, ClaimTakeoverValue, ClaimToTXOValue, HubDB from lbry.wallet.server.db.prefixes import ACTIVATED_CLAIM_TXO_TYPE, ACTIVATED_SUPPORT_TXO_TYPE -from lbry.wallet.server.db.prefixes import PendingActivationKey, TXOToClaimValue +from lbry.wallet.server.db.prefixes import PendingActivationKey, TXOToClaimValue, DBStatePrefixRow from lbry.wallet.transaction import OutputScript from lbry.schema.claim import Claim, guess_stream_type from lbry.wallet.ledger import Ledger, RegTestLedger, TestNetLedger @@ -87,6 +87,8 @@ class LevelDB: self.hist_comp_flush_count = -1 self.hist_comp_cursor = -1 + self.es_sync_height = 0 + # blocking/filtering dicts blocking_channels = self.env.default('BLOCKING_CHANNEL_IDS', '').split(' ') filtering_channels = self.env.default('FILTERING_CHANNEL_IDS', '').split(' ') @@ -827,7 +829,8 @@ class LevelDB: self.prefix_db = HubDB( os.path.join(self.env.db_dir, 'lbry-leveldb'), cache_mb=self.env.cache_MB, - reorg_limit=self.env.reorg_limit, max_open_files=self.env.db_max_open_files + reorg_limit=self.env.reorg_limit, max_open_files=self.env.db_max_open_files, + unsafe_prefixes={DBStatePrefixRow.prefix} ) self.logger.info(f'opened db: lbry-leveldb') @@ -1059,7 +1062,8 @@ class LevelDB: self.prefix_db.db_state.stage_put((), ( self.genesis_bytes, self.db_height, self.db_tx_count, self.db_tip, self.utxo_flush_count, int(self.wall_time), self.first_sync, self.db_version, - self.hist_flush_count, self.hist_comp_flush_count, self.hist_comp_cursor + self.hist_flush_count, self.hist_comp_flush_count, self.hist_comp_cursor, + self.es_sync_height ) ) @@ -1101,11 +1105,12 @@ class LevelDB: def assert_db_state(self): state = self.prefix_db.db_state.get() - assert self.db_version == state.db_version - assert self.db_height == state.height - assert self.db_tx_count == state.tx_count - assert self.db_tip == state.tip - assert self.first_sync == state.first_sync + assert self.db_version == state.db_version, f"{self.db_version} != {state.db_version}" + assert self.db_height == state.height, f"{self.db_height} != {state.height}" + assert self.db_tx_count == state.tx_count, f"{self.db_tx_count} != {state.tx_count}" + assert self.db_tip == state.tip, f"{self.db_tip} != {state.tip}" + assert self.first_sync == state.first_sync, f"{self.first_sync} != {state.first_sync}" + assert self.es_sync_height == state.es_sync_height, f"{self.es_sync_height} != {state.es_sync_height}" async def all_utxos(self, hashX): """Return all UTXOs for an address sorted in no particular order.""" diff --git a/tests/integration/blockchain/test_wallet_server_sessions.py b/tests/integration/blockchain/test_wallet_server_sessions.py index 4f7930c05..139a0bf0b 100644 --- a/tests/integration/blockchain/test_wallet_server_sessions.py +++ b/tests/integration/blockchain/test_wallet_server_sessions.py @@ -5,7 +5,7 @@ import lbry.wallet from lbry.error import ServerPaymentFeeAboveMaxAllowedError from lbry.wallet.network import ClientSession from lbry.wallet.rpc import RPCError -from lbry.wallet.server.db.elasticsearch.sync import run_sync, make_es_index +from lbry.wallet.server.db.elasticsearch.sync import make_es_index_and_run_sync from lbry.wallet.server.session import LBRYElectrumX from lbry.testcase import IntegrationTestCase, CommandTestCase from lbry.wallet.orchstr8.node import SPVNode @@ -95,16 +95,17 @@ class TestESSync(CommandTestCase): await self.generate(1) self.assertEqual(10, len(await self.claim_search(order_by=['height']))) db = self.conductor.spv_node.server.db + env = self.conductor.spv_node.server.env + await db.search_index.delete_index() db.search_index.clear_caches() self.assertEqual(0, len(await self.claim_search(order_by=['height']))) await db.search_index.stop() - self.assertTrue(await make_es_index(db.search_index)) async def resync(): await db.search_index.start() db.search_index.clear_caches() - await run_sync(index_name=db.search_index.index, db=db) + await make_es_index_and_run_sync(env, db=db, index_name=db.search_index.index, force=True) self.assertEqual(10, len(await self.claim_search(order_by=['height']))) self.assertEqual(0, len(await self.claim_search(order_by=['height']))) @@ -114,9 +115,12 @@ class TestESSync(CommandTestCase): # this time we will test a migration from unversioned to v1 await db.search_index.sync_client.indices.delete_template(db.search_index.index) await db.search_index.stop() - self.assertTrue(await make_es_index(db.search_index)) + + await make_es_index_and_run_sync(env, db=db, index_name=db.search_index.index, force=True) await db.search_index.start() + await resync() + self.assertEqual(10, len(await self.claim_search(order_by=['height']))) class TestHubDiscovery(CommandTestCase):