diff --git a/lbry/wallet/server/block_processor.py b/lbry/wallet/server/block_processor.py index 8e38aa9a2..7ac468b8b 100644 --- a/lbry/wallet/server/block_processor.py +++ b/lbry/wallet/server/block_processor.py @@ -605,7 +605,9 @@ class BlockProcessor: # Key: b'h' + compressed_tx_hash + tx_idx + tx_num # Value: hashX prefix = b'h' + tx_hash[:4] + idx_packed - candidates = dict(self.db.utxo_db.iterator(prefix=prefix)) + candidates = {db_key: hashX for db_key, hashX + in self.db.db.iterator(prefix=prefix)} + for hdb_key, hashX in candidates.items(): tx_num_packed = hdb_key[-4:] if len(candidates) > 1: @@ -624,7 +626,7 @@ class BlockProcessor: # Key: b'u' + address_hashX + tx_idx + tx_num # Value: the UTXO value as a 64-bit unsigned integer udb_key = b'u' + hashX + hdb_key[-6:] - utxo_value_packed = self.db.utxo_db.get(udb_key) + utxo_value_packed = self.db.db.get(udb_key) if utxo_value_packed is None: self.logger.warning( "%s:%s is not found in UTXO db for %s", hash_to_hex_str(tx_hash), tx_idx, hash_to_hex_str(hashX) diff --git a/lbry/wallet/server/history.py b/lbry/wallet/server/history.py index f3a7fbf17..312eaed03 100644 --- a/lbry/wallet/server/history.py +++ b/lbry/wallet/server/history.py @@ -16,13 +16,17 @@ from collections import defaultdict from functools import partial from lbry.wallet.server import util -from lbry.wallet.server.util import pack_be_uint32, unpack_be_uint32_from, unpack_be_uint16_from +from lbry.wallet.server.util import pack_be_uint16, unpack_be_uint16_from from lbry.wallet.server.hash import hash_to_hex_str, HASHX_LEN +HASHX_HISTORY_PREFIX = b'x' +HIST_STATE = b'state-hist' + + class History: - DB_VERSIONS = [0, 1] + DB_VERSIONS = [0] def __init__(self): self.logger = util.class_logger(__name__, self.__class__.__name__) @@ -32,34 +36,9 @@ class History: self.unflushed_count = 0 self.db = None - @property - def needs_migration(self): - return self.db_version != max(self.DB_VERSIONS) - - def migrate(self): - # 0 -> 1: flush_count from 16 to 32 bits - self.logger.warning("HISTORY MIGRATION IN PROGRESS. Please avoid shutting down before it finishes.") - with self.db.write_batch() as batch: - for key, value in self.db.iterator(prefix=b''): - if len(key) != 13: - continue - flush_id, = unpack_be_uint16_from(key[-2:]) - new_key = key[:-2] + pack_be_uint32(flush_id) - batch.put(new_key, value) - self.logger.warning("history migration: new keys added, removing old ones.") - for key, value in self.db.iterator(prefix=b''): - if len(key) == 13: - batch.delete(key) - self.logger.warning("history migration: writing new state.") - self.db_version = 1 - self.write_state(batch) - self.logger.warning("history migration: done.") - - def open_db(self, db_class, for_sync, utxo_flush_count, compacting): - self.db = db_class('hist', for_sync) + def open_db(self, db, for_sync, utxo_flush_count, compacting): + self.db = db #db_class('hist', for_sync) self.read_state() - if self.needs_migration: - self.migrate() self.clear_excess(utxo_flush_count) # An incomplete compaction needs to be cancelled otherwise # restarting it will corrupt the history @@ -69,11 +48,11 @@ class History: def close_db(self): if self.db: - self.db.close() + # self.db.close() self.db = None def read_state(self): - state = self.db.get(b'state\0\0') + state = self.db.get(HIST_STATE) if state: state = ast.literal_eval(state.decode()) if not isinstance(state, dict): @@ -105,17 +84,18 @@ class History: 'excess history flushes...') keys = [] - for key, hist in self.db.iterator(prefix=b''): - flush_id, = unpack_be_uint32_from(key[-4:]) + for key, hist in self.db.iterator(prefix=HASHX_HISTORY_PREFIX): + k = key[1:] + flush_id, = unpack_be_uint16_from(k[-2:]) if flush_id > utxo_flush_count: - keys.append(key) + keys.append(k) self.logger.info(f'deleting {len(keys):,d} history entries') self.flush_count = utxo_flush_count with self.db.write_batch() as batch: for key in keys: - batch.delete(key) + batch.delete(HASHX_HISTORY_PREFIX + key) self.write_state(batch) self.logger.info('deleted excess history entries') @@ -130,7 +110,7 @@ class History: } # History entries are not prefixed; the suffix \0\0 ensures we # look similar to other entries and aren't interfered with - batch.put(b'state\0\0', repr(state).encode()) + batch.put(HIST_STATE, repr(state).encode()) def add_unflushed(self, hashXs_by_tx, first_tx_num): unflushed = self.unflushed @@ -151,13 +131,13 @@ class History: def flush(self): start_time = time.time() self.flush_count += 1 - flush_id = pack_be_uint32(self.flush_count) + flush_id = pack_be_uint16(self.flush_count) unflushed = self.unflushed with self.db.write_batch() as batch: for hashX in sorted(unflushed): key = hashX + flush_id - batch.put(key, unflushed[hashX].tobytes()) + batch.put(HASHX_HISTORY_PREFIX + key, unflushed[hashX].tobytes()) self.write_state(batch) count = len(unflushed) @@ -179,16 +159,17 @@ class History: for hashX in sorted(hashXs): deletes = [] puts = {} - for key, hist in self.db.iterator(prefix=hashX, reverse=True): + for key, hist in self.db.iterator(prefix=HASHX_HISTORY_PREFIX + hashX, reverse=True): + k = key[1:] a = array.array('I') a.frombytes(hist) # Remove all history entries >= tx_count idx = bisect_left(a, tx_count) nremoves += len(a) - idx if idx > 0: - puts[key] = a[:idx].tobytes() + puts[k] = a[:idx].tobytes() break - deletes.append(key) + deletes.append(k) for key in deletes: batch.delete(key) @@ -246,9 +227,9 @@ class History: with self.db.write_batch() as batch: # Important: delete first! The keyspace may overlap. for key in keys_to_delete: - batch.delete(key) + batch.delete(HASHX_HISTORY_PREFIX + key) for key, value in write_items: - batch.put(key, value) + batch.put(HASHX_HISTORY_PREFIX + key, value) self.write_state(batch) def _compact_hashX(self, hashX, hist_map, hist_list, @@ -275,7 +256,7 @@ class History: write_size = 0 keys_to_delete.update(hist_map) for n, chunk in enumerate(util.chunks(full_hist, max_row_size)): - key = hashX + pack_be_uint32(n) + key = hashX + pack_be_uint16(n) if hist_map.get(key) == chunk: keys_to_delete.remove(key) else: @@ -296,11 +277,12 @@ class History: key_len = HASHX_LEN + 2 write_size = 0 - for key, hist in self.db.iterator(prefix=prefix): + for key, hist in self.db.iterator(prefix=HASHX_HISTORY_PREFIX + prefix): + k = key[1:] # Ignore non-history entries - if len(key) != key_len: + if len(k) != key_len: continue - hashX = key[:-2] + hashX = k[:-2] if hashX != prior_hashX and prior_hashX: write_size += self._compact_hashX(prior_hashX, hist_map, hist_list, write_items, @@ -308,7 +290,7 @@ class History: hist_map.clear() hist_list.clear() prior_hashX = hashX - hist_map[key] = hist + hist_map[k] = hist hist_list.append(hist) if prior_hashX: @@ -326,8 +308,8 @@ class History: # Loop over 2-byte prefixes cursor = self.comp_cursor - while write_size < limit and cursor < (1 << 32): - prefix = pack_be_uint32(cursor) + while write_size < limit and cursor < 65536: + prefix = pack_be_uint16(cursor) write_size += self._compact_prefix(prefix, write_items, keys_to_delete) cursor += 1 diff --git a/lbry/wallet/server/leveldb.py b/lbry/wallet/server/leveldb.py index 5498706bd..effc0b4c6 100644 --- a/lbry/wallet/server/leveldb.py +++ b/lbry/wallet/server/leveldb.py @@ -41,6 +41,14 @@ TX_HASH_PREFIX = b'X' TX_PREFIX = b'B' TX_NUM_PREFIX = b'N' BLOCK_HASH_PREFIX = b'C' +HISTORY_PREFIX = b'A' +HASHX_UTXO_PREFIX = b'h' +UTXO_PREFIX = b'u' +HASHX_HISTORY_PREFIX = b'x' + +UTXO_STATE = b'state-utxo' +HIST_STATE = b'state-hist' + @@ -80,7 +88,7 @@ class LevelDB: self.db_class = db_class(env.db_dir, self.env.db_engine) self.history = History() - self.utxo_db = None + self.db = None self.tx_counts = None self.headers = None self.encoded_headers = LRUCacheWithMetrics(1 << 21, metric_name='encoded_headers', namespace='wallet_server') @@ -107,7 +115,7 @@ class LevelDB: def get_counts(): return tuple( util.unpack_be_uint64(tx_count) - for tx_count in self.tx_db.iterator(prefix=TX_COUNT_PREFIX, include_key=False) + for tx_count in self.db.iterator(prefix=TX_COUNT_PREFIX, include_key=False) ) tx_counts = await asyncio.get_event_loop().run_in_executor(self.executor, get_counts) @@ -122,7 +130,7 @@ class LevelDB: async def _read_txids(self): def get_txids(): - return list(self.tx_db.iterator(prefix=TX_HASH_PREFIX, include_key=False)) + return list(self.db.iterator(prefix=TX_HASH_PREFIX, include_key=False)) start = time.perf_counter() self.logger.info("loading txids") @@ -137,7 +145,9 @@ class LevelDB: return def get_headers(): - return list(self.headers_db.iterator(prefix=HEADER_PREFIX, include_key=False)) + return [ + header for header in self.db.iterator(prefix=HEADER_PREFIX, include_key=False) + ] headers = await asyncio.get_event_loop().run_in_executor(self.executor, get_headers) assert len(headers) - 1 == self.db_height, f"{len(headers)} vs {self.db_height}" @@ -152,29 +162,17 @@ class LevelDB: f.write(f'ElectrumX databases and metadata for ' f'{self.coin.NAME} {self.coin.NET}'.encode()) - assert self.headers_db is None - self.headers_db = self.db_class('headers', for_sync) - if self.headers_db.is_new: - self.logger.info('created new headers db') - self.logger.info(f'opened headers DB (for sync: {for_sync})') + assert self.db is None + self.db = self.db_class(f'lbry-{self.env.db_engine}', for_sync) + if self.db.is_new: + self.logger.info('created new db: %s', f'lbry-{self.env.db_engine}') + self.logger.info(f'opened DB (for sync: {for_sync})') - assert self.tx_db is None - self.tx_db = self.db_class('tx', for_sync) - if self.tx_db.is_new: - self.logger.info('created new tx db') - self.logger.info(f'opened tx DB (for sync: {for_sync})') - - assert self.utxo_db is None - # First UTXO DB - self.utxo_db = self.db_class('utxo', for_sync) - if self.utxo_db.is_new: - self.logger.info('created new utxo db') - self.logger.info(f'opened utxo db (for sync: {for_sync})') self.read_utxo_state() # Then history DB self.utxo_flush_count = self.history.open_db( - self.db_class, for_sync, self.utxo_flush_count, compacting + self.db, for_sync, self.utxo_flush_count, compacting ) self.clear_excess_undo_info() @@ -185,10 +183,8 @@ class LevelDB: await self._read_headers() def close(self): - self.utxo_db.close() + self.db.close() self.history.close_db() - self.headers_db.close() - self.tx_db.close() self.executor.shutdown(wait=True) self.executor = None @@ -209,18 +205,12 @@ class LevelDB: """Open the databases for serving. If they are already open they are closed first. """ - self.logger.info('closing DBs to re-open for serving') - if self.utxo_db: - self.logger.info('closing DBs to re-open for serving') - self.utxo_db.close() - self.history.close_db() - self.utxo_db = None - if self.headers_db: - self.headers_db.close() - self.headers_db = None - if self.tx_db: - self.tx_db.close() - self.tx_db = None + if self.db: + return + # self.logger.info('closing DBs to re-open for serving') + # self.db.close() + # self.history.close_db() + # self.db = None await self._open_dbs(False, False) self.logger.info("opened for serving") @@ -269,14 +259,14 @@ class LevelDB: self.flush_history() # Flush state last as it reads the wall time. - with self.utxo_db.write_batch() as batch: + with self.db.write_batch() as batch: if flush_utxos: self.flush_utxo_db(batch, flush_data) self.flush_state(batch) # Update and put the wall time again - otherwise we drop the # time it took to commit the batch - self.flush_state(self.utxo_db) + self.flush_state(self.db) elapsed = self.last_flush - start_time self.logger.info(f'flush #{self.history.flush_count:,d} took ' @@ -284,7 +274,7 @@ class LevelDB: f'txs: {flush_data.tx_count:,d} ({tx_delta:+,d})') # Catch-up stats - if self.utxo_db.for_sync: + if self.db.for_sync: flush_interval = self.last_flush - prior_flush tx_per_sec_gen = int(flush_data.tx_count / self.wall_time) tx_per_sec_last = 1 + int(tx_delta / flush_interval) @@ -315,7 +305,7 @@ class LevelDB: # Write the headers start_time = time.perf_counter() - with self.headers_db.write_batch() as batch: + with self.db.write_batch() as batch: batch_put = batch.put for i, header in enumerate(flush_data.headers): batch_put(HEADER_PREFIX + util.pack_be_uint64(self.fs_height + i + 1), header) @@ -325,7 +315,7 @@ class LevelDB: height_start = self.fs_height + 1 tx_num = prior_tx_count - with self.tx_db.write_batch() as batch: + with self.db.write_batch() as batch: batch_put = batch.put for block_hash, (tx_hashes, txs) in zip(flush_data.block_hashes, flush_data.block_txs): tx_count = self.tx_counts[height_start] @@ -380,7 +370,7 @@ class LevelDB: self.flush_undo_infos(batch_put, flush_data.undo_infos) flush_data.undo_infos.clear() - if self.utxo_db.for_sync: + if self.db.for_sync: block_count = flush_data.height - self.db_height tx_count = flush_data.tx_count - self.db_tx_count elapsed = time.time() - start_time @@ -414,7 +404,7 @@ class LevelDB: self.backup_fs(flush_data.height, flush_data.tx_count) self.history.backup(touched, flush_data.tx_count) - with self.utxo_db.write_batch() as batch: + with self.db.write_batch() as batch: self.flush_utxo_db(batch, flush_data) # Flush state last as it reads the wall time. self.flush_state(batch) @@ -488,9 +478,8 @@ class LevelDB: def _fs_transactions(self, txids: Iterable[str]): unpack_be_uint64 = util.unpack_be_uint64 tx_counts = self.tx_counts - tx_db_get = self.tx_db.get + tx_db_get = self.db.get tx_cache = self._tx_and_merkle_cache - tx_infos = {} for tx_hash in txids: @@ -548,11 +537,13 @@ class LevelDB: def read_history(): db_height = self.db_height tx_counts = self.tx_counts + tx_db_get = self.db.get + pack_be_uint64 = util.pack_be_uint64 cnt = 0 txs = [] - for hist in self.history.db.iterator(prefix=hashX, include_key=False): + for hist in self.history.db.iterator(prefix=HASHX_HISTORY_PREFIX + hashX, include_key=False): a = array.array('I') a.frombytes(hist) for tx_num in a: @@ -587,7 +578,7 @@ class LevelDB: def read_undo_info(self, height): """Read undo information from a file for the current height.""" - return self.utxo_db.get(self.undo_key(height)) + return self.db.get(self.undo_key(height)) def flush_undo_infos(self, batch_put, undo_infos): """undo_infos is a list of (undo_info, height) pairs.""" @@ -626,14 +617,14 @@ class LevelDB: prefix = b'U' min_height = self.min_undo_height(self.db_height) keys = [] - for key, hist in self.utxo_db.iterator(prefix=prefix): + for key, hist in self.db.iterator(prefix=prefix): height, = unpack('>I', key[-4:]) if height >= min_height: break keys.append(key) if keys: - with self.utxo_db.write_batch() as batch: + with self.db.write_batch() as batch: for key in keys: batch.delete(key) self.logger.info(f'deleted {len(keys):,d} stale undo entries') @@ -654,7 +645,7 @@ class LevelDB: # -- UTXO database def read_utxo_state(self): - state = self.utxo_db.get(b'state') + state = self.db.get(UTXO_STATE) if not state: self.db_height = -1 self.db_tx_count = 0 @@ -697,7 +688,7 @@ class LevelDB: self.logger.info(f'height: {self.db_height:,d}') self.logger.info(f'tip: {hash_to_hex_str(self.db_tip)}') self.logger.info(f'tx count: {self.db_tx_count:,d}') - if self.utxo_db.for_sync: + if self.db.for_sync: self.logger.info(f'flushing DB cache at {self.env.cache_MB:,d} MB') if self.first_sync: self.logger.info(f'sync time so far: {util.formatted_time(self.wall_time)}') @@ -714,11 +705,11 @@ class LevelDB: 'first_sync': self.first_sync, 'db_version': self.db_version, } - batch.put(b'state', repr(state).encode()) + batch.put(UTXO_STATE, repr(state).encode()) def set_flush_count(self, count): self.utxo_flush_count = count - with self.utxo_db.write_batch() as batch: + with self.db.write_batch() as batch: self.write_utxo_state(batch) async def all_utxos(self, hashX): @@ -731,7 +722,7 @@ class LevelDB: # Key: b'u' + address_hashX + tx_idx + tx_num # Value: the UTXO value as a 64-bit unsigned integer prefix = b'u' + hashX - for db_key, db_value in self.utxo_db.iterator(prefix=prefix): + for db_key, db_value in self.db.iterator(prefix=prefix): tx_pos, tx_num = s_unpack('