forked from LBRYCommunity/lbry-sdk

combine tx_count_db and hashes_db, add tx db

parent: 39202a3d79
commit: bdc7f4b3f5

2 changed files with 29 additions and 36 deletions
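The change collapses the separate tx_count and hashes LevelDB instances into a single tx database, telling the record types apart by a one-byte key prefix; headers keep their own DB. A minimal sketch of the resulting key layout, with illustrative heights and tx numbers (pack_be_uint64 below is a stand-in for util.pack_be_uint64 from the diff):

import struct

HEADER_PREFIX = b'H'    # headers db: height -> raw header
TX_COUNT_PREFIX = b'T'  # tx db: height -> cumulative tx count
TX_HASH_PREFIX = b'X'   # tx db: tx_num -> 32-byte tx hash
TX_PREFIX = b'B'        # tx db: tx_num -> raw transaction (new in this commit)

def pack_be_uint64(n):
    # Big-endian, fixed-width suffix so keys sort numerically.
    return struct.pack('>Q', n)

count_key = TX_COUNT_PREFIX + pack_be_uint64(650_000)  # one record per height
hash_key = TX_HASH_PREFIX + pack_be_uint64(1_234_567)  # one record per transaction
tx_key = TX_PREFIX + pack_be_uint64(1_234_567)         # one record per transaction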
block_processor.py

@@ -171,7 +171,7 @@ class BlockProcessor:
 
         # Caches of unflushed items.
         self.headers = []
-        self.tx_hashes = []
+        self.block_txs = []
         self.undo_infos = []
 
         # UTXO cache
@@ -337,7 +337,7 @@ class BlockProcessor:
         """The data for a flush. The lock must be taken."""
         assert self.state_lock.locked()
         return FlushData(self.height, self.tx_count, self.headers,
-                         self.tx_hashes, self.undo_infos, self.utxo_cache,
+                         self.block_txs, self.undo_infos, self.utxo_cache,
                          self.db_deletes, self.tip)
 
     async def flush(self, flush_utxos):
@@ -404,7 +404,7 @@ class BlockProcessor:
         self.tip = self.coin.header_hash(headers[-1])
 
     def advance_txs(self, height, txs, header):
-        self.tx_hashes.append(b''.join(tx_hash for tx, tx_hash in txs))
+        self.block_txs.append((b''.join(tx_hash for tx, tx_hash in txs), [tx for tx, _ in txs]))
 
         # Use local vars for speed in the loops
         undo_info = []
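Each block_txs entry now carries the raw transactions alongside the concatenated hashes, so a single cache feeds both the hash and raw-tx writes at flush time. A minimal sketch of one entry's shape, with hypothetical placeholder bytes:

# txs as advance_txs receives it: a list of (raw_tx, tx_hash) pairs.
txs = [(b'raw-tx-one', b'\x11' * 32), (b'raw-tx-two', b'\x22' * 32)]

entry = (b''.join(tx_hash for tx, tx_hash in txs),  # 64 bytes: hashes back to back
         [tx for tx, _ in txs])                     # raw transactions, in block order

assert len(entry[0]) == 32 * len(entry[1])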
leveldb.py

@@ -35,6 +35,7 @@ UTXO = namedtuple("UTXO", "tx_num tx_pos tx_hash height value")
 HEADER_PREFIX = b'H'
 TX_COUNT_PREFIX = b'T'
 TX_HASH_PREFIX = b'X'
+TX_PREFIX = b'B'
 
 
 @attr.s(slots=True)
@@ -42,7 +43,7 @@ class FlushData:
     height = attr.ib()
     tx_count = attr.ib()
     headers = attr.ib()
-    block_tx_hashes = attr.ib()
+    block_txs = attr.ib()
     # The following are flushed to the UTXO DB if undo_infos is not None
     undo_infos = attr.ib()
     adds = attr.ib()
@@ -82,9 +83,8 @@ class LevelDB:
         self.merkle = Merkle()
         self.header_mc = MerkleCache(self.merkle, self.fs_block_hashes)
 
-        self.hashes_db = None
         self.headers_db = None
-        self.tx_count_db = None
+        self.tx_db = None
 
     async def _read_tx_counts(self):
         if self.tx_counts is not None:
@@ -95,7 +95,7 @@ class LevelDB:
         def get_counts():
             return tuple(
                 util.unpack_be_uint64(tx_count)
-                for tx_count in self.tx_count_db.iterator(prefix=TX_COUNT_PREFIX, include_key=False)
+                for tx_count in self.tx_db.iterator(prefix=TX_COUNT_PREFIX, include_key=False)
             )
 
         tx_counts = await asyncio.get_event_loop().run_in_executor(self.executor, get_counts)
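Since counts, hashes, and raw transactions now share one keyspace, the startup scan relies on the b'T' prefix to select only count records, and the big-endian keys keep them in height order. A hedged sketch of the same scan against a plyvel-style iterator (assumed here; the diff's db_class abstracts the actual backend):

import struct

TX_COUNT_PREFIX = b'T'

def get_counts(db):
    # Values of keys starting with b'T', yielded in ascending height order.
    return tuple(
        struct.unpack('>Q', raw_count)[0]
        for raw_count in db.iterator(prefix=TX_COUNT_PREFIX, include_key=False)
    )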
@@ -123,17 +123,11 @@ class LevelDB:
             self.logger.info('created new headers db')
         self.logger.info(f'opened headers DB (for sync: {for_sync})')
 
-        assert self.tx_count_db is None
-        self.tx_count_db = self.db_class('tx_count', for_sync)
-        if self.tx_count_db.is_new:
-            self.logger.info('created new tx count db')
-        self.logger.info(f'opened tx count DB (for sync: {for_sync})')
-
-        assert self.hashes_db is None
-        self.hashes_db = self.db_class('hashes', for_sync)
-        if self.hashes_db.is_new:
-            self.logger.info('created new tx hashes db')
-        self.logger.info(f'opened tx hashes DB (for sync: {for_sync})')
+        assert self.tx_db is None
+        self.tx_db = self.db_class('tx', for_sync)
+        if self.tx_db.is_new:
+            self.logger.info('created new tx db')
+        self.logger.info(f'opened tx DB (for sync: {for_sync})')
 
         assert self.utxo_db is None
         # First UTXO DB
@@ -156,8 +150,7 @@ class LevelDB:
         self.utxo_db.close()
         self.history.close_db()
         self.headers_db.close()
-        self.tx_count_db.close()
-        self.hashes_db.close()
+        self.tx_db.close()
         self.executor.shutdown(wait=True)
         self.executor = None
 
@@ -187,12 +180,9 @@ class LevelDB:
         if self.headers_db:
             self.headers_db.close()
             self.headers_db = None
-        if self.tx_count_db:
-            self.tx_count_db.close()
-            self.tx_count_db = None
-        if self.hashes_db:
-            self.hashes_db.close()
-            self.hashes_db = None
+        if self.tx_db:
+            self.tx_db.close()
+            self.tx_db = None
 
         await self._open_dbs(False, False)
         self.logger.info("opened for serving")
@@ -217,7 +207,7 @@ class LevelDB:
         assert flush_data.height == self.fs_height == self.db_height
         assert flush_data.tip == self.db_tip
         assert not flush_data.headers
-        assert not flush_data.block_tx_hashes
+        assert not flush_data.block_txs
         assert not flush_data.adds
         assert not flush_data.deletes
         assert not flush_data.undo_infos
@@ -275,30 +265,33 @@ class LevelDB:
         """
         prior_tx_count = (self.tx_counts[self.fs_height]
                           if self.fs_height >= 0 else 0)
-        assert len(flush_data.block_tx_hashes) == len(flush_data.headers)
+        assert len(flush_data.block_txs) == len(flush_data.headers)
         assert flush_data.height == self.fs_height + len(flush_data.headers)
         assert flush_data.tx_count == (self.tx_counts[-1] if self.tx_counts
                                        else 0)
         assert len(self.tx_counts) == flush_data.height + 1
-        assert len(b''.join(flush_data.block_tx_hashes)) // 32 == flush_data.tx_count - prior_tx_count
+        assert len(
+            b''.join(hashes for hashes, _ in flush_data.block_txs)
+        ) // 32 == flush_data.tx_count - prior_tx_count
 
         # Write the headers, tx counts, and tx hashes
         start_time = time.perf_counter()
         height_start = self.fs_height + 1
         tx_num = prior_tx_count
 
-        for header, tx_hashes in zip(flush_data.headers, flush_data.block_tx_hashes):
+        for header, (tx_hashes, txs) in zip(flush_data.headers, flush_data.block_txs):
             tx_count = self.tx_counts[height_start]
             self.headers_db.put(HEADER_PREFIX + util.pack_be_uint64(height_start), header)
-            self.tx_count_db.put(TX_COUNT_PREFIX + util.pack_be_uint64(height_start), util.pack_be_uint64(tx_count))
+            self.tx_db.put(TX_COUNT_PREFIX + util.pack_be_uint64(height_start), util.pack_be_uint64(tx_count))
             height_start += 1
             offset = 0
             while offset < len(tx_hashes):
-                self.hashes_db.put(TX_HASH_PREFIX + util.pack_be_uint64(tx_num), tx_hashes[offset:offset+32])
+                self.tx_db.put(TX_HASH_PREFIX + util.pack_be_uint64(tx_num), tx_hashes[offset:offset+32])
+                self.tx_db.put(TX_PREFIX + util.pack_be_uint64(tx_num), txs[offset // 32])
                 tx_num += 1
                 offset += 32
 
-        flush_data.block_tx_hashes.clear()
+        flush_data.block_txs.clear()
         self.fs_height = flush_data.height
         self.fs_tx_count = flush_data.tx_count
         flush_data.headers.clear()
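Per block, the loop now issues one height-keyed count record plus two tx_num-keyed records per transaction, all into the same tx database. A minimal standalone sketch of the writes for a single block (db is assumed to expose put(key, value); tx_num is the global transaction count before this block; flush_block is a hypothetical helper, not part of the commit):

import struct

TX_COUNT_PREFIX, TX_HASH_PREFIX, TX_PREFIX = b'T', b'X', b'B'

def flush_block(db, height, tx_num, tx_hashes, raw_txs):
    # Cumulative tx count as of this height.
    db.put(TX_COUNT_PREFIX + struct.pack('>Q', height),
           struct.pack('>Q', tx_num + len(raw_txs)))
    # One hash record and one raw-tx record per transaction.
    for offset in range(0, len(tx_hashes), 32):
        db.put(TX_HASH_PREFIX + struct.pack('>Q', tx_num), tx_hashes[offset:offset + 32])
        db.put(TX_PREFIX + struct.pack('>Q', tx_num), raw_txs[offset // 32])
        tx_num += 1
    return tx_num

In the diff, db is self.tx_db and the (tx_hashes, txs) pairs come straight from the block_txs tuples cached by advance_txs.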
@@ -362,7 +355,7 @@ class LevelDB:
     def flush_backup(self, flush_data, touched):
         """Like flush_dbs() but when backing up. All UTXOs are flushed."""
         assert not flush_data.headers
-        assert not flush_data.block_tx_hashes
+        assert not flush_data.block_txs
         assert flush_data.height < self.db_height
         self.history.assert_flushed()
 
@@ -431,7 +424,7 @@ class LevelDB:
         if tx_height > self.db_height:
             tx_hash = None
         else:
-            tx_hash = self.hashes_db.get(TX_HASH_PREFIX + util.pack_be_uint64(tx_num))
+            tx_hash = self.tx_db.get(TX_HASH_PREFIX + util.pack_be_uint64(tx_num))
         return tx_hash, tx_height
 
     async def fs_block_hashes(self, height, count):
@@ -463,7 +456,7 @@ class LevelDB:
             if tx_height > self.db_height:
                 tx_hash = None
             else:
-                tx_hash = self.hashes_db.get(TX_HASH_PREFIX + util.pack_be_uint64(tx_num))
+                tx_hash = self.tx_db.get(TX_HASH_PREFIX + util.pack_be_uint64(tx_num))
             hashx_history.append((tx_hash, tx_height))
             if limit and len(hashx_history) >= limit:
                 return hashx_history
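With everything behind one handle, reading a transaction back is a pair of point lookups against the same database. A hedged sketch, assuming a get(key) interface like the one used in the diff (read_tx is a hypothetical helper, not part of the commit):

import struct

TX_HASH_PREFIX = b'X'
TX_PREFIX = b'B'

def read_tx(tx_db, tx_num):
    # Returns (tx_hash, raw_tx); either may be None if tx_num is unflushed.
    suffix = struct.pack('>Q', tx_num)
    return tx_db.get(TX_HASH_PREFIX + suffix), tx_db.get(TX_PREFIX + suffix)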