diff --git a/lbry/wallet/server/leveldb.py b/lbry/wallet/server/leveldb.py index 1db38f26f..f7bb7faba 100644 --- a/lbry/wallet/server/leveldb.py +++ b/lbry/wallet/server/leveldb.py @@ -82,8 +82,7 @@ class LevelDB: self.merkle = Merkle() self.header_mc = MerkleCache(self.merkle, self.fs_block_hashes) - path = partial(os.path.join, self.env.db_dir) - self.hashes_file = util.LogicalFile(path('hashes'), 4, 16000000) + self.hashes_db = None self.headers_db = None self.tx_count_db = None @@ -130,6 +129,12 @@ class LevelDB: self.logger.info('created new tx count db') self.logger.info(f'opened tx count DB (for sync: {for_sync})') + assert self.hashes_db is None + self.hashes_db = self.db_class('hashes', for_sync) + if self.hashes_db.is_new: + self.logger.info('created new tx hashes db') + self.logger.info(f'opened tx hashes DB (for sync: {for_sync})') + assert self.utxo_db is None # First UTXO DB self.utxo_db = self.db_class('utxo', for_sync) @@ -152,6 +157,7 @@ class LevelDB: self.history.close_db() self.headers_db.close() self.tx_count_db.close() + self.hashes_db.close() self.executor.shutdown(wait=True) self.executor = None @@ -184,6 +190,10 @@ class LevelDB: if self.tx_count_db: self.tx_count_db.close() self.tx_count_db = None + if self.hashes_db: + self.hashes_db.close() + self.hashes_db = None + await self._open_dbs(False, False) self.logger.info("opened for serving") @@ -270,13 +280,10 @@ class LevelDB: assert flush_data.tx_count == (self.tx_counts[-1] if self.tx_counts else 0) assert len(self.tx_counts) == flush_data.height + 1 - hashes = b''.join(flush_data.block_tx_hashes) - flush_data.block_tx_hashes.clear() - assert len(hashes) % 32 == 0 - assert len(hashes) // 32 == flush_data.tx_count - prior_tx_count + assert len(b''.join(flush_data.block_tx_hashes)) // 32 == flush_data.tx_count - prior_tx_count # Write the headers, tx counts, and tx hashes - start_time = time.time() + start_time = time.perf_counter() height_start = self.fs_height + 1 for header in flush_data.headers: @@ -284,16 +291,22 @@ class LevelDB: self.headers_db.put(HEADER_PREFIX + util.pack_be_uint64(height_start), header) self.tx_count_db.put(TX_COUNT_PREFIX + util.pack_be_uint64(height_start), util.pack_be_uint64(tx_count)) height_start += 1 - offset = prior_tx_count * 32 - self.hashes_file.write(offset, hashes) + + tx_num = prior_tx_count + for tx_hashes in flush_data.block_tx_hashes: + offset = 0 + while offset < len(tx_hashes): + self.hashes_db.put(TX_HASH_PREFIX + util.pack_be_uint64(tx_num), tx_hashes[offset:offset+32]) + tx_num += 1 + offset += 32 + + flush_data.block_tx_hashes.clear() self.fs_height = flush_data.height self.fs_tx_count = flush_data.tx_count - flush_data.headers.clear() - if self.utxo_db.for_sync: - elapsed = time.time() - start_time - self.logger.info(f'flushed filesystem data in {elapsed:.2f}s') + elapsed = time.perf_counter() - start_time + self.logger.info(f'flushed filesystem data in {elapsed:.2f}s') def flush_history(self): self.history.flush() @@ -421,7 +434,7 @@ class LevelDB: if tx_height > self.db_height: tx_hash = None else: - tx_hash = self.hashes_file.read(tx_num * 32, 32) + tx_hash = self.hashes_db.get(TX_HASH_PREFIX + util.pack_be_uint64(tx_num)) return tx_hash, tx_height async def fs_block_hashes(self, height, count):