increase flush counter to 32 bits
This commit is contained in:
parent
0cd953a6f3
commit
1d9dbd40ec
2 changed files with 16 additions and 6 deletions
|
@ -16,7 +16,7 @@ from collections import defaultdict
|
||||||
from functools import partial
|
from functools import partial
|
||||||
|
|
||||||
from lbry.wallet.server import util
|
from lbry.wallet.server import util
|
||||||
from lbry.wallet.server.util import pack_be_uint16, unpack_be_uint16_from
|
from lbry.wallet.server.util import pack_be_uint32, unpack_be_uint32_from
|
||||||
from lbry.wallet.server.hash import hash_to_hex_str, HASHX_LEN
|
from lbry.wallet.server.hash import hash_to_hex_str, HASHX_LEN
|
||||||
|
|
||||||
|
|
||||||
|
@ -81,7 +81,7 @@ class History:
|
||||||
|
|
||||||
keys = []
|
keys = []
|
||||||
for key, hist in self.db.iterator(prefix=b''):
|
for key, hist in self.db.iterator(prefix=b''):
|
||||||
flush_id, = unpack_be_uint16_from(key[-2:])
|
flush_id, = unpack_be_uint32_from(key[-2:])
|
||||||
if flush_id > utxo_flush_count:
|
if flush_id > utxo_flush_count:
|
||||||
keys.append(key)
|
keys.append(key)
|
||||||
|
|
||||||
|
@ -126,7 +126,7 @@ class History:
|
||||||
def flush(self):
|
def flush(self):
|
||||||
start_time = time.time()
|
start_time = time.time()
|
||||||
self.flush_count += 1
|
self.flush_count += 1
|
||||||
flush_id = pack_be_uint16(self.flush_count)
|
flush_id = pack_be_uint32(self.flush_count)
|
||||||
unflushed = self.unflushed
|
unflushed = self.unflushed
|
||||||
|
|
||||||
with self.db.write_batch() as batch:
|
with self.db.write_batch() as batch:
|
||||||
|
@ -250,7 +250,7 @@ class History:
|
||||||
write_size = 0
|
write_size = 0
|
||||||
keys_to_delete.update(hist_map)
|
keys_to_delete.update(hist_map)
|
||||||
for n, chunk in enumerate(util.chunks(full_hist, max_row_size)):
|
for n, chunk in enumerate(util.chunks(full_hist, max_row_size)):
|
||||||
key = hashX + pack_be_uint16(n)
|
key = hashX + pack_be_uint32(n)
|
||||||
if hist_map.get(key) == chunk:
|
if hist_map.get(key) == chunk:
|
||||||
keys_to_delete.remove(key)
|
keys_to_delete.remove(key)
|
||||||
else:
|
else:
|
||||||
|
@ -301,8 +301,8 @@ class History:
|
||||||
|
|
||||||
# Loop over 2-byte prefixes
|
# Loop over 2-byte prefixes
|
||||||
cursor = self.comp_cursor
|
cursor = self.comp_cursor
|
||||||
while write_size < limit and cursor < 65536:
|
while write_size < limit and cursor < (1 << 32):
|
||||||
prefix = pack_be_uint16(cursor)
|
prefix = pack_be_uint32(cursor)
|
||||||
write_size += self._compact_prefix(prefix, write_items,
|
write_size += self._compact_prefix(prefix, write_items,
|
||||||
keys_to_delete)
|
keys_to_delete)
|
||||||
cursor += 1
|
cursor += 1
|
||||||
|
|
|
@ -189,3 +189,13 @@ class TestHubDiscovery(CommandTestCase):
|
||||||
('127.0.0.1', 9988): {}
|
('127.0.0.1', 9988): {}
|
||||||
}
|
}
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
||||||
|
class TestStressFlush(CommandTestCase):
|
||||||
|
async def test_flush_over_66_thousand(self):
|
||||||
|
history = self.conductor.spv_node.server.db.history
|
||||||
|
history.flush_count = 66_000
|
||||||
|
history.flush()
|
||||||
|
self.assertEqual(history.flush_count, 66_001)
|
||||||
|
await self.generate(1)
|
||||||
|
self.assertEqual(history.flush_count, 66_002)
|
||||||
|
|
Loading…
Reference in a new issue