diff --git a/lbry/wallet/server/block_processor.py b/lbry/wallet/server/block_processor.py index 7fe0ae059..68c735265 100644 --- a/lbry/wallet/server/block_processor.py +++ b/lbry/wallet/server/block_processor.py @@ -171,7 +171,7 @@ class BlockProcessor: # Caches of unflushed items. self.headers = [] - self.tx_hashes = [] + self.block_txs = [] self.undo_infos = [] # UTXO cache @@ -337,7 +337,7 @@ class BlockProcessor: """The data for a flush. The lock must be taken.""" assert self.state_lock.locked() return FlushData(self.height, self.tx_count, self.headers, - self.tx_hashes, self.undo_infos, self.utxo_cache, + self.block_txs, self.undo_infos, self.utxo_cache, self.db_deletes, self.tip) async def flush(self, flush_utxos): @@ -404,7 +404,7 @@ class BlockProcessor: self.tip = self.coin.header_hash(headers[-1]) def advance_txs(self, height, txs, header): - self.tx_hashes.append(b''.join(tx_hash for tx, tx_hash in txs)) + self.block_txs.append((b''.join(tx_hash for tx, tx_hash in txs), [tx for tx, _ in txs])) # Use local vars for speed in the loops undo_info = [] diff --git a/lbry/wallet/server/leveldb.py b/lbry/wallet/server/leveldb.py index ccd3fcd60..a7f84afe6 100644 --- a/lbry/wallet/server/leveldb.py +++ b/lbry/wallet/server/leveldb.py @@ -35,6 +35,7 @@ UTXO = namedtuple("UTXO", "tx_num tx_pos tx_hash height value") HEADER_PREFIX = b'H' TX_COUNT_PREFIX = b'T' TX_HASH_PREFIX = b'X' +TX_PREFIX = b'B' @attr.s(slots=True) @@ -42,7 +43,7 @@ class FlushData: height = attr.ib() tx_count = attr.ib() headers = attr.ib() - block_tx_hashes = attr.ib() + block_txs = attr.ib() # The following are flushed to the UTXO DB if undo_infos is not None undo_infos = attr.ib() adds = attr.ib() @@ -82,9 +83,8 @@ class LevelDB: self.merkle = Merkle() self.header_mc = MerkleCache(self.merkle, self.fs_block_hashes) - self.hashes_db = None self.headers_db = None - self.tx_count_db = None + self.tx_db = None async def _read_tx_counts(self): if self.tx_counts is not None: @@ -95,7 +95,7 @@ class LevelDB: def get_counts(): return tuple( util.unpack_be_uint64(tx_count) - for tx_count in self.tx_count_db.iterator(prefix=TX_COUNT_PREFIX, include_key=False) + for tx_count in self.tx_db.iterator(prefix=TX_COUNT_PREFIX, include_key=False) ) tx_counts = await asyncio.get_event_loop().run_in_executor(self.executor, get_counts) @@ -123,17 +123,11 @@ class LevelDB: self.logger.info('created new headers db') self.logger.info(f'opened headers DB (for sync: {for_sync})') - assert self.tx_count_db is None - self.tx_count_db = self.db_class('tx_count', for_sync) - if self.tx_count_db.is_new: - self.logger.info('created new tx count db') - self.logger.info(f'opened tx count DB (for sync: {for_sync})') - - assert self.hashes_db is None - self.hashes_db = self.db_class('hashes', for_sync) - if self.hashes_db.is_new: - self.logger.info('created new tx hashes db') - self.logger.info(f'opened tx hashes DB (for sync: {for_sync})') + assert self.tx_db is None + self.tx_db = self.db_class('tx', for_sync) + if self.tx_db.is_new: + self.logger.info('created new tx db') + self.logger.info(f'opened tx DB (for sync: {for_sync})') assert self.utxo_db is None # First UTXO DB @@ -156,8 +150,7 @@ class LevelDB: self.utxo_db.close() self.history.close_db() self.headers_db.close() - self.tx_count_db.close() - self.hashes_db.close() + self.tx_db.close() self.executor.shutdown(wait=True) self.executor = None @@ -187,12 +180,9 @@ class LevelDB: if self.headers_db: self.headers_db.close() self.headers_db = None - if self.tx_count_db: - self.tx_count_db.close() - self.tx_count_db = None - if self.hashes_db: - self.hashes_db.close() - self.hashes_db = None + if self.tx_db: + self.tx_db.close() + self.tx_db = None await self._open_dbs(False, False) self.logger.info("opened for serving") @@ -217,7 +207,7 @@ class LevelDB: assert flush_data.height == self.fs_height == self.db_height assert flush_data.tip == self.db_tip assert not flush_data.headers - assert not flush_data.block_tx_hashes + assert not flush_data.block_txs assert not flush_data.adds assert not flush_data.deletes assert not flush_data.undo_infos @@ -275,30 +265,33 @@ class LevelDB: """ prior_tx_count = (self.tx_counts[self.fs_height] if self.fs_height >= 0 else 0) - assert len(flush_data.block_tx_hashes) == len(flush_data.headers) + assert len(flush_data.block_txs) == len(flush_data.headers) assert flush_data.height == self.fs_height + len(flush_data.headers) assert flush_data.tx_count == (self.tx_counts[-1] if self.tx_counts else 0) assert len(self.tx_counts) == flush_data.height + 1 - assert len(b''.join(flush_data.block_tx_hashes)) // 32 == flush_data.tx_count - prior_tx_count + assert len( + b''.join(hashes for hashes, _ in flush_data.block_txs) + ) // 32 == flush_data.tx_count - prior_tx_count # Write the headers, tx counts, and tx hashes start_time = time.perf_counter() height_start = self.fs_height + 1 tx_num = prior_tx_count - for header, tx_hashes in zip(flush_data.headers, flush_data.block_tx_hashes): + for header, (tx_hashes, txs) in zip(flush_data.headers, flush_data.block_txs): tx_count = self.tx_counts[height_start] self.headers_db.put(HEADER_PREFIX + util.pack_be_uint64(height_start), header) - self.tx_count_db.put(TX_COUNT_PREFIX + util.pack_be_uint64(height_start), util.pack_be_uint64(tx_count)) + self.tx_db.put(TX_COUNT_PREFIX + util.pack_be_uint64(height_start), util.pack_be_uint64(tx_count)) height_start += 1 offset = 0 while offset < len(tx_hashes): - self.hashes_db.put(TX_HASH_PREFIX + util.pack_be_uint64(tx_num), tx_hashes[offset:offset+32]) + self.tx_db.put(TX_HASH_PREFIX + util.pack_be_uint64(tx_num), tx_hashes[offset:offset+32]) + self.tx_db.put(TX_PREFIX + util.pack_be_uint64(tx_num), txs[offset // 32]) tx_num += 1 offset += 32 - flush_data.block_tx_hashes.clear() + flush_data.block_txs.clear() self.fs_height = flush_data.height self.fs_tx_count = flush_data.tx_count flush_data.headers.clear() @@ -362,7 +355,7 @@ class LevelDB: def flush_backup(self, flush_data, touched): """Like flush_dbs() but when backing up. All UTXOs are flushed.""" assert not flush_data.headers - assert not flush_data.block_tx_hashes + assert not flush_data.block_txs assert flush_data.height < self.db_height self.history.assert_flushed() @@ -431,7 +424,7 @@ class LevelDB: if tx_height > self.db_height: tx_hash = None else: - tx_hash = self.hashes_db.get(TX_HASH_PREFIX + util.pack_be_uint64(tx_num)) + tx_hash = self.tx_db.get(TX_HASH_PREFIX + util.pack_be_uint64(tx_num)) return tx_hash, tx_height async def fs_block_hashes(self, height, count): @@ -463,7 +456,7 @@ class LevelDB: if tx_height > self.db_height: tx_hash = None else: - tx_hash = self.hashes_db.get(TX_HASH_PREFIX + util.pack_be_uint64(tx_num)) + tx_hash = self.tx_db.get(TX_HASH_PREFIX + util.pack_be_uint64(tx_num)) hashx_history.append((tx_hash, tx_height)) if limit and len(hashx_history) >= limit: return hashx_history