Merge pull request #3342 from lbryio/bug_flush_counter

[resync required] Avoid flush counter overflows on long running hubs by increasing it to 32 bits

Commit f05dc46432. 4 changed files with 101 additions and 8 deletions.
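For context, a minimal sketch of the failure mode this PR fixes, assuming the util helpers are thin struct wrappers (as their names suggest; the real definitions live in lbry.wallet.server.util). Every history flush packs a monotonically increasing counter into the row keys, and a 2-byte big-endian field tops out at 65535:

import struct

pack_be_uint16 = struct.Struct('>H').pack  # old: 2-byte big-endian counter
pack_be_uint32 = struct.Struct('>I').pack  # new: 4-byte big-endian counter

print(pack_be_uint16(65535).hex())  # ffff -- the last flush id that fits
print(pack_be_uint32(65536).hex())  # 00010000 -- fine once widened

try:
    pack_be_uint16(65536)  # what a long-running hub eventually attempts
except struct.error as e:
    print('flush would crash:', e)  # value out of range for 'H'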
@@ -436,7 +436,8 @@ class RPCSession(SessionBase):
         except CancelledError:
             raise
         except Exception:
-            self.logger.exception(f'exception handling {request}')
+            reqstr = str(request)
+            self.logger.exception(f'exception handling {reqstr[:16_000]}')
             result = RPCError(JSONRPC.INTERNAL_ERROR,
                               'internal server error')
         if isinstance(request, Request):
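A side note on the session change above: the slice caps how much of a possibly huge request ends up in the log line, and Python string slicing is already safe when the input is shorter than the cap, so no length guard is needed. A tiny illustration (the payloads below are hypothetical stand-ins):

# Hypothetical oversized payload: only the first 16,000 chars get logged.
big = 'x' * 100_000
assert len(big[:16_000]) == 16_000

# Short payloads pass through untouched; slicing never raises.
small = '{"method": "server.version"}'
assert small[:16_000] == small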
@@ -16,13 +16,13 @@ from collections import defaultdict
 from functools import partial
 
 from lbry.wallet.server import util
-from lbry.wallet.server.util import pack_be_uint16, unpack_be_uint16_from
+from lbry.wallet.server.util import pack_be_uint32, unpack_be_uint32_from, unpack_be_uint16_from
 from lbry.wallet.server.hash import hash_to_hex_str, HASHX_LEN
 
 
 class History:
 
-    DB_VERSIONS = [0]
+    DB_VERSIONS = [0, 1]
 
     def __init__(self):
         self.logger = util.class_logger(__name__, self.__class__.__name__)
@@ -32,9 +32,34 @@ class History:
         self.unflushed_count = 0
         self.db = None
 
+    @property
+    def needs_migration(self):
+        return self.db_version != max(self.DB_VERSIONS)
+
+    def migrate(self):
+        # 0 -> 1: flush_count from 16 to 32 bits
+        self.logger.warning("HISTORY MIGRATION IN PROGRESS. Please avoid shutting down before it finishes.")
+        with self.db.write_batch() as batch:
+            for key, value in self.db.iterator(prefix=b''):
+                if len(key) != 13:
+                    continue
+                flush_id, = unpack_be_uint16_from(key[-2:])
+                new_key = key[:-2] + pack_be_uint32(flush_id)
+                batch.put(new_key, value)
+            self.logger.warning("history migration: new keys added, removing old ones.")
+            for key, value in self.db.iterator(prefix=b''):
+                if len(key) == 13:
+                    batch.delete(key)
+            self.logger.warning("history migration: writing new state.")
+            self.db_version = 1
+            self.write_state(batch)
+            self.logger.warning("history migration: done.")
+
     def open_db(self, db_class, for_sync, utxo_flush_count, compacting):
         self.db = db_class('hist', for_sync)
         self.read_state()
+        if self.needs_migration:
+            self.migrate()
         self.clear_excess(utxo_flush_count)
         # An incomplete compaction needs to be cancelled otherwise
         # restarting it will corrupt the history
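To make the rewrite concrete, here is one row going through the 0 -> 1 migration, using a fixture key from the new test file added below. The 13-byte layout implies an 11-byte hashX followed by a 2-byte flush id; treat that split as read from the fixtures rather than from this diff:

import struct

old_key = bytes.fromhex('00538b2cbe4a5f1be2dc320241')  # fixture row, 13 bytes
assert len(old_key) == 13  # 11-byte hashX + 2-byte big-endian flush id

# Same steps as migrate(): unpack the old 16-bit id, re-pack it as 32 bits.
flush_id, = struct.unpack_from('>H', old_key, len(old_key) - 2)
new_key = old_key[:-2] + struct.pack('>I', flush_id)

print(flush_id)       # 577
print(new_key.hex())  # 00538b2cbe4a5f1be2dc3200000241 (15 bytes; value untouched)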
@@ -81,7 +106,7 @@ class History:
 
         keys = []
         for key, hist in self.db.iterator(prefix=b''):
-            flush_id, = unpack_be_uint16_from(key[-2:])
+            flush_id, = unpack_be_uint32_from(key[-4:])
             if flush_id > utxo_flush_count:
                 keys.append(key)
 
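clear_excess compares the unpacked ids numerically, but it is worth noting why big-endian packing is the right choice for these keys at all: byte-wise iterator order matches numeric order, and that property survives the jump past the old 16-bit boundary. A quick check:

import struct

# Lexicographic order of the packed bytes equals numeric order of the ids,
# so range scans and flush-id comparisons stay consistent across 65536.
ids = [1, 255, 65535, 65536, 70000, 1 << 20]
packed = [struct.pack('>I', i) for i in ids]
assert packed == sorted(packed)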
@@ -126,7 +151,7 @@ class History:
     def flush(self):
         start_time = time.time()
         self.flush_count += 1
-        flush_id = pack_be_uint16(self.flush_count)
+        flush_id = pack_be_uint32(self.flush_count)
         unflushed = self.unflushed
 
         with self.db.write_batch() as batch:
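After this change, each flush writes rows keyed by the per-address hash plus the widened 4-byte id. A sketch of the key layout (the hashX below is a stand-in borrowed from the migration fixtures):

import struct

hashX = bytes.fromhex('00538b2cbe4a5f1be2dc32')  # stand-in, 11 bytes
flush_count = 66_000                             # past the old 16-bit limit
key = hashX + struct.pack('>I', flush_count)
print(len(key), key.hex())  # 15 00538b2cbe4a5f1be2dc32000101d0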
@@ -250,7 +275,7 @@ class History:
         write_size = 0
         keys_to_delete.update(hist_map)
         for n, chunk in enumerate(util.chunks(full_hist, max_row_size)):
-            key = hashX + pack_be_uint16(n)
+            key = hashX + pack_be_uint32(n)
             if hist_map.get(key) == chunk:
                 keys_to_delete.remove(key)
             else:
@@ -301,8 +326,8 @@ class History:
 
         # Loop over 2-byte prefixes
         cursor = self.comp_cursor
-        while write_size < limit and cursor < 65536:
-            prefix = pack_be_uint16(cursor)
+        while write_size < limit and cursor < (1 << 32):
+            prefix = pack_be_uint32(cursor)
             write_size += self._compact_prefix(prefix, write_items,
                                                keys_to_delete)
             cursor += 1
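One reading note on the hunk above: the "2-byte prefixes" comment is left over from the old layout; the cursor now walks 4-byte prefixes, of which there are 1 << 32. The work done per pass is still bounded by `limit` via `write_size`, so only the loop bound changes:

import struct

# 65536 is now just another valid cursor value instead of the end of range.
for cursor in (0, 65535, 65536, (1 << 32) - 1):
    print(cursor, struct.pack('>I', cursor).hex())
# 0 00000000
# 65535 0000ffff
# 65536 00010000
# 4294967295 ffffffff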
@@ -189,3 +189,13 @@ class TestHubDiscovery(CommandTestCase):
                 ('127.0.0.1', 9988): {}
             }
         )
+
+
+class TestStressFlush(CommandTestCase):
+    async def test_flush_over_66_thousand(self):
+        history = self.conductor.spv_node.server.db.history
+        history.flush_count = 66_000
+        history.flush()
+        self.assertEqual(history.flush_count, 66_001)
+        await self.generate(1)
+        self.assertEqual(history.flush_count, 66_002)
tests/unit/wallet/server/test_migration.py (new file, 57 additions)
@@ -0,0 +1,57 @@
+import unittest
+from shutil import rmtree
+from tempfile import mkdtemp
+
+from lbry.wallet.server.history import History
+from lbry.wallet.server.storage import LevelDB
+
+
+# dumped from a real history database. Aside from the state, all records are <hashX><flush_count>: <value>
+STATE_RECORD = (b'state\x00\x00', b"{'flush_count': 21497, 'comp_flush_count': -1, 'comp_cursor': -1, 'db_version': 0}")
+UNMIGRATED_RECORDS = {
+    '00538b2cbe4a5f1be2dc320241': 'f5ed500142ee5001',
+    '00538b48def1904014880501f2': 'b9a52a01baa52a01',
+    '00538cdcf989b74de32c5100ca': 'c973870078748700',
+    '00538d42d5df44603474284ae1': 'f5d9d802',
+    '00538d42d5df44603474284ae2': '75dad802',
+    '00538ebc879dac6ddbee9e0029': '3ca42f0042a42f00',
+    '00538ed1d391327208748200bc': '804e7d00af4e7d00',
+    '00538f3de41d9e33affa0300c2': '7de8810086e88100',
+    '00539007f87792d98422c505a5': '8c5a7202445b7202',
+    '0053902cf52ee9682d633b0575': 'eb0f64026c106402',
+    '005390e05674571551632205a2': 'a13d7102e13d7102',
+    '0053914ef25a9ceed927330584': '78096902960b6902',
+    '005391768113f69548f37a01b1': 'a5b90b0114ba0b01',
+    '005391a289812669e5b44c02c2': '33da8a016cdc8a01',
+}
+
+
+class TestHistoryDBMigration(unittest.TestCase):
+    def test_migrate_flush_count_from_16_to_32_bits(self):
+        self.history = History()
+        tmpdir = mkdtemp()
+        self.addCleanup(lambda: rmtree(tmpdir))
+        LevelDB.import_module()
+        db = LevelDB(tmpdir, 'hist', True)
+        with db.write_batch() as batch:
+            for key, value in UNMIGRATED_RECORDS.items():
+                batch.put(bytes.fromhex(key), bytes.fromhex(value))
+            batch.put(*STATE_RECORD)
+        self.history.db = db
+        self.history.read_state()
+        self.assertEqual(21497, self.history.flush_count)
+        self.assertEqual(0, self.history.db_version)
+        self.assertTrue(self.history.needs_migration)
+        self.history.migrate()
+        self.assertFalse(self.history.needs_migration)
+        self.assertEqual(1, self.history.db_version)
+        for idx, (key, value) in enumerate(sorted(db.iterator())):
+            if key == b'state\x00\x00':
+                continue
+            key, counter = key[:-4], key[-4:]
+            expected_value = UNMIGRATED_RECORDS[key.hex() + counter.hex()[-4:]]
+            self.assertEqual(value.hex(), expected_value)
+
+
+if __name__ == '__main__':
+    unittest.main()
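The STATE_RECORD fixture shows that the on-disk state is a Python-literal dict stored under the key b'state\x00\x00'. A hedged sketch of how it decodes, assuming read_state() parses the blob with ast.literal_eval as its repr-style format implies (the real method presumably also validates the version against DB_VERSIONS):

import ast

raw = b"{'flush_count': 21497, 'comp_flush_count': -1, 'comp_cursor': -1, 'db_version': 0}"
state = ast.literal_eval(raw.decode())
assert state['flush_count'] == 21497  # matches the test's first assertion
assert state['db_version'] == 0       # pre-migration version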