diff --git a/lbry/wallet/server/leveldb.py b/lbry/wallet/server/leveldb.py index 86dba8955..69c6da06b 100644 --- a/lbry/wallet/server/leveldb.py +++ b/lbry/wallet/server/leveldb.py @@ -83,21 +83,29 @@ class LevelDB: self.header_mc = MerkleCache(self.merkle, self.fs_block_hashes) path = partial(os.path.join, self.env.db_dir) - self.tx_counts_file = util.LogicalFile(path('meta/txcounts'), 2, 2000000) self.hashes_file = util.LogicalFile(path('meta/hashes'), 4, 16000000) self.headers_db = None + self.tx_count_db = None async def _read_tx_counts(self): if self.tx_counts is not None: return # tx_counts[N] has the cumulative number of txs at the end of # height N. So tx_counts[0] is 1 - the genesis coinbase - size = (self.db_height + 1) * 4 - tx_counts = self.tx_counts_file.read(0, size) - assert len(tx_counts) == size + + def get_counts(): + return tuple( + util.unpack_be_uint64(tx_count) + for tx_count in self.tx_count_db.iterator(prefix=TX_COUNT_PREFIX, include_key=False) + ) + + tx_counts = await asyncio.get_event_loop().run_in_executor(self.executor, get_counts) + assert len(tx_counts) == self.db_height + 1, f"{len(tx_counts)} vs {self.db_height + 1}" self.tx_counts = array.array('I', tx_counts) + if self.tx_counts: - assert self.db_tx_count == self.tx_counts[-1] + assert self.db_tx_count == self.tx_counts[-1], \ + f"{self.db_tx_count} vs {self.tx_counts[-1]} ({len(self.tx_counts)} counts)" else: assert self.db_tx_count == 0 @@ -111,6 +119,12 @@ class LevelDB: self.logger.info('created new headers db') self.logger.info(f'opened headers DB (for sync: {for_sync})') + assert self.tx_count_db is None + self.tx_count_db = self.db_class('tx_count', for_sync) + if self.tx_count_db.is_new: + self.logger.info('created new tx count db') + self.logger.info(f'opened tx count DB (for sync: {for_sync})') + assert self.utxo_db is None # First UTXO DB self.utxo_db = self.db_class('utxo', for_sync) @@ -135,6 +149,7 @@ class LevelDB: self.utxo_db.close() self.history.close_db() self.headers_db.close() + self.tx_count_db.close() self.executor.shutdown(wait=True) self.executor = None @@ -162,6 +177,9 @@ class LevelDB: if self.headers_db: self.headers_db.close() self.headers_db = None + if self.tx_count_db: + self.tx_count_db.close() + self.tx_count_db = None await self._open_dbs(False, False) # Header merkle cache @@ -259,10 +277,8 @@ class LevelDB: for header in flush_data.headers: tx_count = self.tx_counts[height_start] self.headers_db.put(HEADER_PREFIX + util.pack_be_uint64(height_start), header) + self.tx_count_db.put(TX_COUNT_PREFIX + util.pack_be_uint64(height_start), util.pack_be_uint64(tx_count)) height_start += 1 - offset = height_start * self.tx_counts.itemsize - self.tx_counts_file.write(offset, - self.tx_counts[height_start:].tobytes()) offset = prior_tx_count * 32 self.hashes_file.write(offset, hashes)