diff --git a/lbry/wallet/server/leveldb.py b/lbry/wallet/server/leveldb.py index 5fc1885f1..d24ffbed4 100644 --- a/lbry/wallet/server/leveldb.py +++ b/lbry/wallet/server/leveldb.py @@ -32,7 +32,7 @@ import attr from lbry.wallet.server import util from lbry.wallet.server.hash import hash_to_hex_str, HASHX_LEN from lbry.wallet.server.merkle import Merkle, MerkleCache -from lbry.wallet.server.util import formatted_time, unpack_be_uint64, unpack_le_int32_from +from lbry.wallet.server.util import formatted_time, unpack_be_uint64, unpack_le_int32_from, pack_le_int32 from lbry.wallet.server.storage import db_class from lbry.wallet.server.history import History if typing.TYPE_CHECKING: @@ -51,6 +51,9 @@ UTXO_PREFIX = b'u' HASHX_HISTORY_PREFIX = b'x' STATE_PREFIX = b'state' +UTXO_STATE_PREFIX = b'state-utxo-' +HIST_STATE_PREFIX = b'state-hist-' + class RocksDBState(typing.NamedTuple): db_version: int @@ -206,8 +209,8 @@ class LevelDB: self.merkle = Merkle() self.header_mc = MerkleCache(self.merkle, self.fs_block_hashes) - self.headers_db = None - self.tx_db = None + self.db = None + self.db = None self._block_txs_cache = pylru.lrucache(50000) self._merkle_tx_cache = pylru.lrucache(100000) @@ -222,7 +225,7 @@ class LevelDB: def get_counts(): return tuple( util.unpack_be_uint64(tx_count) - for tx_count in self.tx_db.iterator(prefix=TX_COUNT_PREFIX, include_key=False) + for tx_count in self.db.iterator(prefix=TX_COUNT_PREFIX, include_key=False) ) tx_counts = await asyncio.get_event_loop().run_in_executor(self.executor, get_counts) @@ -237,7 +240,7 @@ class LevelDB: async def _read_txids(self): def get_txids(): - return list(self.tx_db.iterator(prefix=TX_HASH_PREFIX, include_key=False)) + return list(self.db.iterator(prefix=TX_HASH_PREFIX, include_key=False)) start = time.perf_counter() self.logger.info("loading txids") @@ -253,7 +256,7 @@ class LevelDB: def get_headers(): return [ - header for header in self.headers_db.iterator(prefix=HEADER_PREFIX, include_key=False) + header for header in self.db.iterator(prefix=HEADER_PREFIX, include_key=False) ] headers = await asyncio.get_event_loop().run_in_executor(self.executor, get_headers) @@ -269,17 +272,11 @@ class LevelDB: f.write(f'ElectrumX databases and metadata for ' f'{self.coin.NAME} {self.coin.NET}'.encode()) - assert self.headers_db is None - self.headers_db = self.db_class('headers', for_sync) - if self.headers_db.is_new: - self.logger.info('created new headers db') - self.logger.info(f'opened headers DB (for sync: {for_sync})') - - assert self.tx_db is None - self.tx_db = self.db_class('tx', for_sync) - if self.tx_db.is_new: - self.logger.info('created new tx db') - self.logger.info(f'opened tx DB (for sync: {for_sync})') + assert self.db is None + self.db = self.db_class(f'lbry-{self.env.db_engine}', for_sync) + if self.db.is_new: + self.logger.info('created new db: %s', f'lbry-{self.env.db_engine}') + self.logger.info(f'opened DB (for sync: {for_sync})') assert self.utxo_db is None # First UTXO DB @@ -304,8 +301,7 @@ class LevelDB: def close(self): self.utxo_db.close() self.history.close_db() - self.headers_db.close() - self.tx_db.close() + self.db.close() self.executor.shutdown(wait=True) self.executor = None @@ -332,12 +328,12 @@ class LevelDB: self.utxo_db.close() self.history.close_db() self.utxo_db = None - if self.headers_db: - self.headers_db.close() - self.headers_db = None - if self.tx_db: - self.tx_db.close() - self.tx_db = None + if self.db: + self.db.close() + self.db = None + if self.db: + self.db.close() + self.db = None await self._open_dbs(False, False) self.logger.info("opened for serving") @@ -432,7 +428,7 @@ class LevelDB: # Write the headers start_time = time.perf_counter() - with self.headers_db.write_batch() as batch: + with self.db.write_batch() as batch: batch_put = batch.put for i, header in enumerate(flush_data.headers): batch_put(HEADER_PREFIX + util.pack_be_uint64(self.fs_height + i + 1), header) @@ -442,7 +438,7 @@ class LevelDB: height_start = self.fs_height + 1 tx_num = prior_tx_count - with self.tx_db.write_batch() as batch: + with self.db.write_batch() as batch: batch_put = batch.put for block_hash, (tx_hashes, txs) in zip(flush_data.block_hashes, flush_data.block_txs): tx_count = self.tx_counts[height_start] @@ -594,7 +590,7 @@ class LevelDB: tx_pos = tx_num - tx_counts[tx_height - 1] def _update_block_txs_cache(): - block_txs = list(self.tx_db.iterator( + block_txs = list(self.db.iterator( start=TX_HASH_PREFIX + util.pack_be_uint64(tx_counts[tx_height - 1]), stop=None if tx_height + 1 == len(tx_counts) else TX_HASH_PREFIX + util.pack_be_uint64(tx_counts[tx_height]), include_key=False @@ -625,7 +621,7 @@ class LevelDB: def _fs_transactions(self, txids: Iterable[str]) -> List[Tuple[str, Optional[str], int, int]]: unpack_be_uint64 = util.unpack_be_uint64 tx_counts = self.tx_counts - tx_db_get = self.tx_db.get + tx_db_get = self.db.get tx_infos = [] for tx_hash in txids: @@ -688,7 +684,7 @@ class LevelDB: def read_history(): db_height = self.db_height tx_counts = self.tx_counts - tx_db_get = self.tx_db.get + tx_db_get = self.db.get pack_be_uint64 = util.pack_be_uint64 cnt = 0