diff --git a/lbry/wallet/server/block_processor.py b/lbry/wallet/server/block_processor.py
index 26eaad00e..ccb704d73 100644
--- a/lbry/wallet/server/block_processor.py
+++ b/lbry/wallet/server/block_processor.py
@@ -586,7 +586,7 @@ class BlockProcessor:
         # Value: hashX
         prefix = b'h' + tx_hash[:4] + idx_packed
         candidates = {db_key: hashX for db_key, hashX
-                      in self.db.utxo_db.iterator(prefix=prefix)}
+                      in self.db.db.iterator(prefix=prefix)}
 
         for hdb_key, hashX in candidates.items():
             tx_num_packed = hdb_key[-4:]
@@ -601,7 +601,7 @@ class BlockProcessor:
             # Key: b'u' + address_hashX + tx_idx + tx_num
             # Value: the UTXO value as a 64-bit unsigned integer
             udb_key = b'u' + hashX + hdb_key[-6:]
-            utxo_value_packed = self.db.utxo_db.get(udb_key)
+            utxo_value_packed = self.db.db.get(udb_key)
             if utxo_value_packed:
                 # Remove both entries for this UTXO
                 self.db_deletes.append(hdb_key)
diff --git a/lbry/wallet/server/history.py b/lbry/wallet/server/history.py
index 72952dc53..312eaed03 100644
--- a/lbry/wallet/server/history.py
+++ b/lbry/wallet/server/history.py
@@ -20,6 +20,10 @@
 from lbry.wallet.server.util import pack_be_uint16, unpack_be_uint16_from
 from lbry.wallet.server.hash import hash_to_hex_str, HASHX_LEN
 
+HASHX_HISTORY_PREFIX = b'x'
+HIST_STATE = b'state-hist'
+
+
 class History:
 
     DB_VERSIONS = [0]
@@ -32,8 +36,8 @@ class History:
         self.unflushed_count = 0
         self.db = None
 
-    def open_db(self, db_class, for_sync, utxo_flush_count, compacting):
-        self.db = db_class('hist', for_sync)
+    def open_db(self, db, for_sync, utxo_flush_count, compacting):
+        self.db = db  # db_class('hist', for_sync)
         self.read_state()
         self.clear_excess(utxo_flush_count)
         # An incomplete compaction needs to be cancelled otherwise
@@ -44,11 +48,11 @@
 
     def close_db(self):
         if self.db:
-            self.db.close()
+            # self.db.close()
             self.db = None
 
     def read_state(self):
-        state = self.db.get(b'state\0\0')
+        state = self.db.get(HIST_STATE)
         if state:
             state = ast.literal_eval(state.decode())
             if not isinstance(state, dict):
@@ -80,17 +84,18 @@
                          'excess history flushes...')
 
         keys = []
-        for key, hist in self.db.iterator(prefix=b''):
-            flush_id, = unpack_be_uint16_from(key[-2:])
+        for key, hist in self.db.iterator(prefix=HASHX_HISTORY_PREFIX):
+            k = key[1:]
+            flush_id, = unpack_be_uint16_from(k[-2:])
             if flush_id > utxo_flush_count:
-                keys.append(key)
+                keys.append(k)
 
         self.logger.info(f'deleting {len(keys):,d} history entries')
 
         self.flush_count = utxo_flush_count
         with self.db.write_batch() as batch:
             for key in keys:
-                batch.delete(key)
+                batch.delete(HASHX_HISTORY_PREFIX + key)
             self.write_state(batch)
 
         self.logger.info('deleted excess history entries')
@@ -105,7 +110,7 @@
         }
-        # History entries are not prefixed; the suffix \0\0 ensures we
-        # look similar to other entries and aren't interfered with
-        batch.put(b'state\0\0', repr(state).encode())
+        # History entries carry the b'x' prefix now that they share the
+        # database, so the state lives under its own distinct key
+        batch.put(HIST_STATE, repr(state).encode())
 
     def add_unflushed(self, hashXs_by_tx, first_tx_num):
         unflushed = self.unflushed
@@ -132,7 +137,7 @@
         with self.db.write_batch() as batch:
             for hashX in sorted(unflushed):
                 key = hashX + flush_id
-                batch.put(key, unflushed[hashX].tobytes())
+                batch.put(HASHX_HISTORY_PREFIX + key, unflushed[hashX].tobytes())
             self.write_state(batch)
 
         count = len(unflushed)
@@ -154,16 +159,17 @@
             for hashX in sorted(hashXs):
                 deletes = []
                 puts = {}
-                for key, hist in self.db.iterator(prefix=hashX, reverse=True):
+                for key, hist in self.db.iterator(prefix=HASHX_HISTORY_PREFIX + hashX, reverse=True):
+                    k = key[1:]
                     a = array.array('I')
                     a.frombytes(hist)
                     # Remove all history entries >= tx_count
                     idx = bisect_left(a, tx_count)
                     nremoves += len(a) - idx
                     if idx > 0:
-                        puts[key] = a[:idx].tobytes()
+                        puts[k] = a[:idx].tobytes()
                         break
-                    deletes.append(key)
+                    deletes.append(k)
 
                 for key in deletes:
-                    batch.delete(key)
+                    batch.delete(HASHX_HISTORY_PREFIX + key)
@@ -221,9 +227,9 @@ class History:
         with self.db.write_batch() as batch:
             # Important: delete first! The keyspace may overlap.
             for key in keys_to_delete:
-                batch.delete(key)
+                batch.delete(HASHX_HISTORY_PREFIX + key)
             for key, value in write_items:
-                batch.put(key, value)
+                batch.put(HASHX_HISTORY_PREFIX + key, value)
             self.write_state(batch)
 
     def _compact_hashX(self, hashX, hist_map, hist_list,
@@ -271,11 +277,12 @@
 
         key_len = HASHX_LEN + 2
         write_size = 0
-        for key, hist in self.db.iterator(prefix=prefix):
+        for key, hist in self.db.iterator(prefix=HASHX_HISTORY_PREFIX + prefix):
+            k = key[1:]
             # Ignore non-history entries
-            if len(key) != key_len:
+            if len(k) != key_len:
                 continue
-            hashX = key[:-2]
+            hashX = k[:-2]
             if hashX != prior_hashX and prior_hashX:
                 write_size += self._compact_hashX(prior_hashX, hist_map,
                                                   hist_list, write_items,
@@ -283,7 +290,7 @@
                 hist_map.clear()
                 hist_list.clear()
             prior_hashX = hashX
-            hist_map[key] = hist
+            hist_map[k] = hist
             hist_list.append(hist)
 
         if prior_hashX:
diff --git a/lbry/wallet/server/leveldb.py b/lbry/wallet/server/leveldb.py
index d24ffbed4..e0616070a 100644
--- a/lbry/wallet/server/leveldb.py
+++ b/lbry/wallet/server/leveldb.py
@@ -49,10 +49,9 @@
 HISTORY_PREFIX = b'A'
 HASHX_UTXO_PREFIX = b'h'
 UTXO_PREFIX = b'u'
 HASHX_HISTORY_PREFIX = b'x'
-STATE_PREFIX = b'state'
-UTXO_STATE_PREFIX = b'state-utxo-'
-HIST_STATE_PREFIX = b'state-hist-'
+UTXO_STATE = b'state-utxo'
+HIST_STATE = b'state-hist'
 
 
 class RocksDBState(typing.NamedTuple):
@@ -198,7 +197,7 @@
 
         self.db_class = db_class(env.db_dir, self.env.db_engine)
         self.history = History()
-        self.utxo_db = None
+        self.db = None
         self.tx_counts = None
         self.headers = None
         self.last_flush = time.time()
@@ -209,9 +208,6 @@
         self.merkle = Merkle()
         self.header_mc = MerkleCache(self.merkle, self.fs_block_hashes)
 
-        self.db = None
-        self.db = None
-
         self._block_txs_cache = pylru.lrucache(50000)
         self._merkle_tx_cache = pylru.lrucache(100000)
         self.total_transactions = None
@@ -278,17 +274,11 @@
             self.logger.info('created new db: %s', f'lbry-{self.env.db_engine}')
         self.logger.info(f'opened DB (for sync: {for_sync})')
 
-        assert self.utxo_db is None
-        # First UTXO DB
-        self.utxo_db = self.db_class('utxo', for_sync)
-        if self.utxo_db.is_new:
-            self.logger.info('created new utxo db')
-        self.logger.info(f'opened utxo db (for sync: {for_sync})')
         self.read_utxo_state()
 
         # Then history DB
         self.utxo_flush_count = self.history.open_db(
-            self.db_class, for_sync, self.utxo_flush_count, compacting
+            self.db, for_sync, self.utxo_flush_count, compacting
         )
         self.clear_excess_undo_info()
 
@@ -299,9 +289,8 @@
         await self._read_headers()
 
     def close(self):
-        self.utxo_db.close()
-        self.history.close_db()
         self.db.close()
+        self.history.close_db()
         self.executor.shutdown(wait=True)
         self.executor = None
 
@@ -322,18 +311,12 @@
         """Open the databases for serving. If they are already open they
         are closed first.
""" - self.logger.info('closing DBs to re-open for serving') - if self.utxo_db: - self.logger.info('closing DBs to re-open for serving') - self.utxo_db.close() - self.history.close_db() - self.utxo_db = None if self.db: - self.db.close() - self.db = None - if self.db: - self.db.close() - self.db = None + return + # self.logger.info('closing DBs to re-open for serving') + # self.db.close() + # self.history.close_db() + # self.db = None await self._open_dbs(False, False) self.logger.info("opened for serving") @@ -382,14 +365,14 @@ class LevelDB: self.flush_history() # Flush state last as it reads the wall time. - with self.utxo_db.write_batch() as batch: + with self.db.write_batch() as batch: if flush_utxos: self.flush_utxo_db(batch, flush_data) self.flush_state(batch) # Update and put the wall time again - otherwise we drop the # time it took to commit the batch - self.flush_state(self.utxo_db) + self.flush_state(self.db) elapsed = self.last_flush - start_time self.logger.info(f'flush #{self.history.flush_count:,d} took ' @@ -397,7 +380,7 @@ class LevelDB: f'txs: {flush_data.tx_count:,d} ({tx_delta:+,d})') # Catch-up stats - if self.utxo_db.for_sync: + if self.db.for_sync: flush_interval = self.last_flush - prior_flush tx_per_sec_gen = int(flush_data.tx_count / self.wall_time) tx_per_sec_last = 1 + int(tx_delta / flush_interval) @@ -493,7 +476,7 @@ class LevelDB: self.flush_undo_infos(batch_put, flush_data.undo_infos) flush_data.undo_infos.clear() - if self.utxo_db.for_sync: + if self.db.for_sync: block_count = flush_data.height - self.db_height tx_count = flush_data.tx_count - self.db_tx_count elapsed = time.time() - start_time @@ -527,7 +510,7 @@ class LevelDB: self.backup_fs(flush_data.height, flush_data.tx_count) self.history.backup(touched, flush_data.tx_count) - with self.utxo_db.write_batch() as batch: + with self.db.write_batch() as batch: self.flush_utxo_db(batch, flush_data) # Flush state last as it reads the wall time. 
             self.flush_state(batch)
@@ -690,7 +673,7 @@
         cnt = 0
         txs = []
 
-        for hist in self.history.db.iterator(prefix=hashX, include_key=False):
+        for hist in self.history.db.iterator(prefix=HASHX_HISTORY_PREFIX + hashX, include_key=False):
             a = array.array('I')
             a.frombytes(hist)
             for tx_num in a:
@@ -725,7 +708,7 @@
 
     def read_undo_info(self, height):
         """Read undo information from a file for the current height."""
-        return self.utxo_db.get(self.undo_key(height))
+        return self.db.get(self.undo_key(height))
 
     def flush_undo_infos(self, batch_put, undo_infos):
         """undo_infos is a list of (undo_info, height) pairs."""
@@ -764,14 +747,14 @@
         prefix = b'U'
         min_height = self.min_undo_height(self.db_height)
         keys = []
-        for key, hist in self.utxo_db.iterator(prefix=prefix):
+        for key, hist in self.db.iterator(prefix=prefix):
            height, = unpack('>I', key[-4:])
             if height >= min_height:
                 break
             keys.append(key)
 
         if keys:
-            with self.utxo_db.write_batch() as batch:
+            with self.db.write_batch() as batch:
                 for key in keys:
                     batch.delete(key)
             self.logger.info(f'deleted {len(keys):,d} stale undo entries')
@@ -792,7 +775,7 @@
     # -- UTXO database
 
     def read_utxo_state(self):
-        state = self.utxo_db.get(b'state')
+        state = self.db.get(UTXO_STATE)
         if not state:
             self.db_height = -1
             self.db_tx_count = 0
@@ -835,7 +818,7 @@
         self.logger.info(f'height: {self.db_height:,d}')
         self.logger.info(f'tip: {hash_to_hex_str(self.db_tip)}')
         self.logger.info(f'tx count: {self.db_tx_count:,d}')
-        if self.utxo_db.for_sync:
+        if self.db.for_sync:
             self.logger.info(f'flushing DB cache at {self.env.cache_MB:,d} MB')
         if self.first_sync:
             self.logger.info(f'sync time so far: {util.formatted_time(self.wall_time)}')
@@ -852,11 +835,11 @@
             'first_sync': self.first_sync,
             'db_version': self.db_version,
         }
-        batch.put(b'state', repr(state).encode())
+        batch.put(UTXO_STATE, repr(state).encode())
 
     def set_flush_count(self, count):
         self.utxo_flush_count = count
-        with self.utxo_db.write_batch() as batch:
+        with self.db.write_batch() as batch:
             self.write_utxo_state(batch)
 
     async def all_utxos(self, hashX):
@@ -869,7 +852,7 @@
             # Key: b'u' + address_hashX + tx_idx + tx_num
             # Value: the UTXO value as a 64-bit unsigned integer
             prefix = b'u' + hashX
-            for db_key, db_value in self.utxo_db.iterator(prefix=prefix):
+            for db_key, db_value in self.db.iterator(prefix=prefix):
                 tx_pos, tx_num = s_unpack('
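
For reference, the merged database distinguishes the former 'utxo' and 'hist' keyspaces with one-byte key prefixes, and each state record moves to its own multi-byte key so it cannot collide with prefixed entries. Below is a minimal sketch of the resulting layout, assuming the constants introduced in this diff; the two helper functions are illustrative only and are not part of the patch.

import struct

# Key namespaces inside the single shared DB (constants from this diff):
HASHX_UTXO_PREFIX = b'h'     # b'h' + tx_hash[:4] + tx_idx + tx_num -> hashX
UTXO_PREFIX = b'u'           # b'u' + hashX + tx_idx + tx_num -> value (u64)
HASHX_HISTORY_PREFIX = b'x'  # b'x' + hashX + flush_id -> packed tx numbers
UTXO_STATE = b'state-utxo'   # was b'state' in the separate utxo db
HIST_STATE = b'state-hist'   # was b'state\0\0' in the separate hist db


def history_key(hashX: bytes, flush_id: int) -> bytes:
    """Illustrative: the key shape add_unflushed() writes; flush_id is
    a big-endian 16-bit integer, matching pack_be_uint16."""
    return HASHX_HISTORY_PREFIX + hashX + struct.pack('>H', flush_id)


def strip_prefix(key: bytes) -> bytes:
    """Illustrative: the `k = key[1:]` idiom used when iterating with
    prefix=HASHX_HISTORY_PREFIX. A stripped key must be re-prefixed
    before any batch.put() or batch.delete() on the same entry, as
    clear_excess() and _flush_compaction() do."""
    return key[1:]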