From cc51543851728e376e73f14eee1acc801370b797 Mon Sep 17 00:00:00 2001
From: Jack Robison
Date: Tue, 9 Jun 2020 16:21:48 -0400
Subject: [PATCH] headers db

---
 lbry/wallet/server/leveldb.py | 89 ++++++++++++++---------------------
 lbry/wallet/server/util.py    |  4 +-
 2 files changed, 38 insertions(+), 55 deletions(-)

diff --git a/lbry/wallet/server/leveldb.py b/lbry/wallet/server/leveldb.py
index fe048c9b4..86dba8955 100644
--- a/lbry/wallet/server/leveldb.py
+++ b/lbry/wallet/server/leveldb.py
@@ -32,6 +32,9 @@ from lbry.wallet.server.history import History
 
 UTXO = namedtuple("UTXO", "tx_num tx_pos tx_hash height value")
 
+HEADER_PREFIX = b'H'
+TX_COUNT_PREFIX = b'T'
+TX_HASH_PREFIX = b'X'
 
 
 @attr.s(slots=True)
@@ -63,15 +66,7 @@ class LevelDB:
         self.logger = util.class_logger(__name__, self.__class__.__name__)
         self.env = env
         self.coin = env.coin
-        self.executor = ThreadPoolExecutor(max(1, os.cpu_count() - 1))
-
-        # Setup block header size handlers
-        if self.coin.STATIC_BLOCK_HEADERS:
-            self.header_offset = self.coin.static_header_offset
-            self.header_len = self.coin.static_header_len
-        else:
-            self.header_offset = self.dynamic_header_offset
-            self.header_len = self.dynamic_header_len
+        self.executor = None
 
         self.logger.info(f'switching current directory to {env.db_dir}')
 
@@ -88,12 +83,9 @@ class LevelDB:
         self.header_mc = MerkleCache(self.merkle, self.fs_block_hashes)
 
         path = partial(os.path.join, self.env.db_dir)
-        self.headers_file = util.LogicalFile(path('meta/headers'), 2, 16000000)
         self.tx_counts_file = util.LogicalFile(path('meta/txcounts'), 2, 2000000)
         self.hashes_file = util.LogicalFile(path('meta/hashes'), 4, 16000000)
-        if not self.coin.STATIC_BLOCK_HEADERS:
-            self.headers_offsets_file = util.LogicalFile(
-                path('meta/headers_offsets'), 2, 16000000)
+        self.headers_db = None
 
     async def _read_tx_counts(self):
         if self.tx_counts is not None:
@@ -110,22 +102,24 @@ class LevelDB:
             assert self.db_tx_count == 0
 
     async def _open_dbs(self, for_sync, compacting):
-        assert self.utxo_db is None
+        if self.executor is None:
+            self.executor = ThreadPoolExecutor(max(1, os.cpu_count() - 1))
+        assert self.headers_db is None
+        self.headers_db = self.db_class('headers', for_sync)
+        if self.headers_db.is_new:
+            self.logger.info('created new headers db')
+        self.logger.info(f'opened headers DB (for sync: {for_sync})')
+
+        assert self.utxo_db is None
 
         # First UTXO DB
         self.utxo_db = self.db_class('utxo', for_sync)
         if self.utxo_db.is_new:
             self.logger.info('created new database')
             self.logger.info('creating metadata directory')
             os.mkdir(os.path.join(self.env.db_dir, 'meta'))
-            coin_path = os.path.join(self.env.db_dir, 'meta', 'COIN')
-            with util.open_file(coin_path, create=True) as f:
-                f.write(f'ElectrumX databases and metadata for '
-                        f'{self.coin.NAME} {self.coin.NET}'.encode())
-            if not self.coin.STATIC_BLOCK_HEADERS:
-                self.headers_offsets_file.write(0, bytes(8))
-        else:
-            self.logger.info(f'opened UTXO DB (for sync: {for_sync})')
+            self.logger.info('created new utxo db')
+        self.logger.info(f'opened utxo db (for sync: {for_sync})')
         self.read_utxo_state()
 
         # Then history DB
@@ -140,7 +134,9 @@ class LevelDB:
     def close(self):
         self.utxo_db.close()
         self.history.close_db()
+        self.headers_db.close()
         self.executor.shutdown(wait=True)
+        self.executor = None
 
     async def open_for_compacting(self):
         await self._open_dbs(True, True)
@@ -163,6 +159,9 @@ class LevelDB:
             self.utxo_db.close()
             self.history.close_db()
             self.utxo_db = None
+            if self.headers_db:
+                self.headers_db.close()
+                self.headers_db = None
         await self._open_dbs(False, False)
 
     # Header merkle cache
@@ -256,11 +255,11 @@ class LevelDB:
 
         # Write the headers, tx counts, and tx hashes
         start_time = time.time()
         height_start = self.fs_height + 1
-        offset = self.header_offset(height_start)
-        self.headers_file.write(offset, b''.join(flush_data.headers))
-        self.fs_update_header_offsets(offset, height_start, flush_data.headers)
-        flush_data.headers.clear()
+        for header in flush_data.headers:
+            tx_count = self.tx_counts[height_start]
+            self.headers_db.put(HEADER_PREFIX + util.pack_be_uint64(height_start), header)
+            height_start += 1
 
         offset = height_start * self.tx_counts.itemsize
         self.tx_counts_file.write(offset, self.tx_counts[height_start:].tobytes())
@@ -270,6 +269,7 @@ class LevelDB:
 
         self.fs_height = flush_data.height
         self.fs_tx_count = flush_data.tx_count
+        flush_data.headers.clear()
         if self.utxo_db.for_sync:
             elapsed = time.time() - start_time
             self.logger.info(f'flushed filesystem data in {elapsed:.2f}s')
@@ -350,28 +350,6 @@ class LevelDB:
                          f'{elapsed:.1f}s. Height {flush_data.height:,d} '
                          f'txs: {flush_data.tx_count:,d} ({tx_delta:+,d})')
 
-    def fs_update_header_offsets(self, offset_start, height_start, headers):
-        if self.coin.STATIC_BLOCK_HEADERS:
-            return
-        offset = offset_start
-        offsets = []
-        for h in headers:
-            offset += len(h)
-            offsets.append(pack("
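Note on the key layout introduced above: each raw header is stored under HEADER_PREFIX plus the big-endian uint64 block height, so LevelDB's lexicographic key order doubles as height order. The following is a minimal sketch, not part of the patch, of reading a range of headers back under that assumption; it uses plyvel directly rather than the server's db_class/storage wrapper, and read_headers plus the local pack_be_uint64 helper are illustrative names only.

import struct

import plyvel  # direct LevelDB binding; the server normally goes through lbry.wallet.server.storage

HEADER_PREFIX = b'H'


def pack_be_uint64(height):
    # Same big-endian packing the patch relies on via util.pack_be_uint64.
    return struct.pack('>Q', height)


def read_headers(db_path, start_height, count):
    # Return the raw headers for heights [start_height, start_height + count).
    db = plyvel.DB(db_path)
    try:
        start = HEADER_PREFIX + pack_be_uint64(start_height)
        stop = HEADER_PREFIX + pack_be_uint64(start_height + count)
        # Big-endian heights keep keys sorted by height, so a bounded
        # iterator yields headers in ascending height order.
        return [header for _, header in db.iterator(start=start, stop=stop)]
    finally:
        db.close()

Storing one header per height-keyed record trades the old fixed-offset LogicalFile reads for bounded key scans, which is what allows the STATIC_BLOCK_HEADERS offset bookkeeping (header_offset/header_len and fs_update_header_offsets) to be deleted in this patch.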