diff --git a/lbry/wallet/server/history.py b/lbry/wallet/server/history.py index 72952dc53..c3ad1131e 100644 --- a/lbry/wallet/server/history.py +++ b/lbry/wallet/server/history.py @@ -16,7 +16,7 @@ from collections import defaultdict from functools import partial from lbry.wallet.server import util -from lbry.wallet.server.util import pack_be_uint16, unpack_be_uint16_from +from lbry.wallet.server.util import pack_be_uint32, unpack_be_uint32_from from lbry.wallet.server.hash import hash_to_hex_str, HASHX_LEN @@ -81,7 +81,7 @@ class History: keys = [] for key, hist in self.db.iterator(prefix=b''): - flush_id, = unpack_be_uint16_from(key[-2:]) + flush_id, = unpack_be_uint32_from(key[-2:]) if flush_id > utxo_flush_count: keys.append(key) @@ -126,7 +126,7 @@ class History: def flush(self): start_time = time.time() self.flush_count += 1 - flush_id = pack_be_uint16(self.flush_count) + flush_id = pack_be_uint32(self.flush_count) unflushed = self.unflushed with self.db.write_batch() as batch: @@ -250,7 +250,7 @@ class History: write_size = 0 keys_to_delete.update(hist_map) for n, chunk in enumerate(util.chunks(full_hist, max_row_size)): - key = hashX + pack_be_uint16(n) + key = hashX + pack_be_uint32(n) if hist_map.get(key) == chunk: keys_to_delete.remove(key) else: @@ -301,8 +301,8 @@ class History: # Loop over 2-byte prefixes cursor = self.comp_cursor - while write_size < limit and cursor < 65536: - prefix = pack_be_uint16(cursor) + while write_size < limit and cursor < (1 << 32): + prefix = pack_be_uint32(cursor) write_size += self._compact_prefix(prefix, write_items, keys_to_delete) cursor += 1 diff --git a/tests/integration/blockchain/test_wallet_server_sessions.py b/tests/integration/blockchain/test_wallet_server_sessions.py index 67ff86a48..544e04f51 100644 --- a/tests/integration/blockchain/test_wallet_server_sessions.py +++ b/tests/integration/blockchain/test_wallet_server_sessions.py @@ -189,3 +189,13 @@ class TestHubDiscovery(CommandTestCase): ('127.0.0.1', 9988): {} } ) + + +class TestStressFlush(CommandTestCase): + async def test_flush_over_66_thousand(self): + history = self.conductor.spv_node.server.db.history + history.flush_count = 66_000 + history.flush() + self.assertEqual(history.flush_count, 66_001) + await self.generate(1) + self.assertEqual(history.flush_count, 66_002)