diff --git a/lbry/wallet/rpc/session.py b/lbry/wallet/rpc/session.py index 0589b3edb..ceae4b125 100644 --- a/lbry/wallet/rpc/session.py +++ b/lbry/wallet/rpc/session.py @@ -436,7 +436,8 @@ class RPCSession(SessionBase): except CancelledError: raise except Exception: - self.logger.exception(f'exception handling {request}') + reqstr = str(request) + self.logger.exception(f'exception handling {reqstr[:16_000]}') result = RPCError(JSONRPC.INTERNAL_ERROR, 'internal server error') if isinstance(request, Request): diff --git a/lbry/wallet/server/history.py b/lbry/wallet/server/history.py index 72952dc53..f3a7fbf17 100644 --- a/lbry/wallet/server/history.py +++ b/lbry/wallet/server/history.py @@ -16,13 +16,13 @@ from collections import defaultdict from functools import partial from lbry.wallet.server import util -from lbry.wallet.server.util import pack_be_uint16, unpack_be_uint16_from +from lbry.wallet.server.util import pack_be_uint32, unpack_be_uint32_from, unpack_be_uint16_from from lbry.wallet.server.hash import hash_to_hex_str, HASHX_LEN class History: - DB_VERSIONS = [0] + DB_VERSIONS = [0, 1] def __init__(self): self.logger = util.class_logger(__name__, self.__class__.__name__) @@ -32,9 +32,34 @@ class History: self.unflushed_count = 0 self.db = None + @property + def needs_migration(self): + return self.db_version != max(self.DB_VERSIONS) + + def migrate(self): + # 0 -> 1: flush_count from 16 to 32 bits + self.logger.warning("HISTORY MIGRATION IN PROGRESS. Please avoid shutting down before it finishes.") + with self.db.write_batch() as batch: + for key, value in self.db.iterator(prefix=b''): + if len(key) != 13: + continue + flush_id, = unpack_be_uint16_from(key[-2:]) + new_key = key[:-2] + pack_be_uint32(flush_id) + batch.put(new_key, value) + self.logger.warning("history migration: new keys added, removing old ones.") + for key, value in self.db.iterator(prefix=b''): + if len(key) == 13: + batch.delete(key) + self.logger.warning("history migration: writing new state.") + self.db_version = 1 + self.write_state(batch) + self.logger.warning("history migration: done.") + def open_db(self, db_class, for_sync, utxo_flush_count, compacting): self.db = db_class('hist', for_sync) self.read_state() + if self.needs_migration: + self.migrate() self.clear_excess(utxo_flush_count) # An incomplete compaction needs to be cancelled otherwise # restarting it will corrupt the history @@ -81,7 +106,7 @@ class History: keys = [] for key, hist in self.db.iterator(prefix=b''): - flush_id, = unpack_be_uint16_from(key[-2:]) + flush_id, = unpack_be_uint32_from(key[-4:]) if flush_id > utxo_flush_count: keys.append(key) @@ -126,7 +151,7 @@ class History: def flush(self): start_time = time.time() self.flush_count += 1 - flush_id = pack_be_uint16(self.flush_count) + flush_id = pack_be_uint32(self.flush_count) unflushed = self.unflushed with self.db.write_batch() as batch: @@ -250,7 +275,7 @@ class History: write_size = 0 keys_to_delete.update(hist_map) for n, chunk in enumerate(util.chunks(full_hist, max_row_size)): - key = hashX + pack_be_uint16(n) + key = hashX + pack_be_uint32(n) if hist_map.get(key) == chunk: keys_to_delete.remove(key) else: @@ -301,8 +326,8 @@ class History: # Loop over 2-byte prefixes cursor = self.comp_cursor - while write_size < limit and cursor < 65536: - prefix = pack_be_uint16(cursor) + while write_size < limit and cursor < (1 << 32): + prefix = pack_be_uint32(cursor) write_size += self._compact_prefix(prefix, write_items, keys_to_delete) cursor += 1 diff --git a/tests/integration/blockchain/test_wallet_server_sessions.py b/tests/integration/blockchain/test_wallet_server_sessions.py index 67ff86a48..544e04f51 100644 --- a/tests/integration/blockchain/test_wallet_server_sessions.py +++ b/tests/integration/blockchain/test_wallet_server_sessions.py @@ -189,3 +189,13 @@ class TestHubDiscovery(CommandTestCase): ('127.0.0.1', 9988): {} } ) + + +class TestStressFlush(CommandTestCase): + async def test_flush_over_66_thousand(self): + history = self.conductor.spv_node.server.db.history + history.flush_count = 66_000 + history.flush() + self.assertEqual(history.flush_count, 66_001) + await self.generate(1) + self.assertEqual(history.flush_count, 66_002) diff --git a/tests/unit/wallet/server/test_migration.py b/tests/unit/wallet/server/test_migration.py new file mode 100644 index 000000000..9aa8cccee --- /dev/null +++ b/tests/unit/wallet/server/test_migration.py @@ -0,0 +1,57 @@ +import unittest +from shutil import rmtree +from tempfile import mkdtemp + +from lbry.wallet.server.history import History +from lbry.wallet.server.storage import LevelDB + + +# dumped from a real history database. Aside from the state, all records are : +STATE_RECORD = (b'state\x00\x00', b"{'flush_count': 21497, 'comp_flush_count': -1, 'comp_cursor': -1, 'db_version': 0}") +UNMIGRATED_RECORDS = { + '00538b2cbe4a5f1be2dc320241': 'f5ed500142ee5001', + '00538b48def1904014880501f2': 'b9a52a01baa52a01', + '00538cdcf989b74de32c5100ca': 'c973870078748700', + '00538d42d5df44603474284ae1': 'f5d9d802', + '00538d42d5df44603474284ae2': '75dad802', + '00538ebc879dac6ddbee9e0029': '3ca42f0042a42f00', + '00538ed1d391327208748200bc': '804e7d00af4e7d00', + '00538f3de41d9e33affa0300c2': '7de8810086e88100', + '00539007f87792d98422c505a5': '8c5a7202445b7202', + '0053902cf52ee9682d633b0575': 'eb0f64026c106402', + '005390e05674571551632205a2': 'a13d7102e13d7102', + '0053914ef25a9ceed927330584': '78096902960b6902', + '005391768113f69548f37a01b1': 'a5b90b0114ba0b01', + '005391a289812669e5b44c02c2': '33da8a016cdc8a01', +} + + +class TestHistoryDBMigration(unittest.TestCase): + def test_migrate_flush_count_from_16_to_32_bits(self): + self.history = History() + tmpdir = mkdtemp() + self.addCleanup(lambda: rmtree(tmpdir)) + LevelDB.import_module() + db = LevelDB(tmpdir, 'hist', True) + with db.write_batch() as batch: + for key, value in UNMIGRATED_RECORDS.items(): + batch.put(bytes.fromhex(key), bytes.fromhex(value)) + batch.put(*STATE_RECORD) + self.history.db = db + self.history.read_state() + self.assertEqual(21497, self.history.flush_count) + self.assertEqual(0, self.history.db_version) + self.assertTrue(self.history.needs_migration) + self.history.migrate() + self.assertFalse(self.history.needs_migration) + self.assertEqual(1, self.history.db_version) + for idx, (key, value) in enumerate(sorted(db.iterator())): + if key == b'state\x00\x00': + continue + key, counter = key[:-4], key[-4:] + expected_value = UNMIGRATED_RECORDS[key.hex() + counter.hex()[-4:]] + self.assertEqual(value.hex(), expected_value) + + +if __name__ == '__main__': + unittest.main()