forked from LBRYCommunity/lbry-sdk
tx hashes db
This commit is contained in:
parent
71eccdc0e3
commit
375187aa70
1 changed files with 27 additions and 14 deletions
|
@ -82,8 +82,7 @@ class LevelDB:
|
|||
self.merkle = Merkle()
|
||||
self.header_mc = MerkleCache(self.merkle, self.fs_block_hashes)
|
||||
|
||||
path = partial(os.path.join, self.env.db_dir)
|
||||
self.hashes_file = util.LogicalFile(path('hashes'), 4, 16000000)
|
||||
self.hashes_db = None
|
||||
self.headers_db = None
|
||||
self.tx_count_db = None
|
||||
|
||||
|
@ -130,6 +129,12 @@ class LevelDB:
|
|||
self.logger.info('created new tx count db')
|
||||
self.logger.info(f'opened tx count DB (for sync: {for_sync})')
|
||||
|
||||
assert self.hashes_db is None
|
||||
self.hashes_db = self.db_class('hashes', for_sync)
|
||||
if self.hashes_db.is_new:
|
||||
self.logger.info('created new tx hashes db')
|
||||
self.logger.info(f'opened tx hashes DB (for sync: {for_sync})')
|
||||
|
||||
assert self.utxo_db is None
|
||||
# First UTXO DB
|
||||
self.utxo_db = self.db_class('utxo', for_sync)
|
||||
|
@ -152,6 +157,7 @@ class LevelDB:
|
|||
self.history.close_db()
|
||||
self.headers_db.close()
|
||||
self.tx_count_db.close()
|
||||
self.hashes_db.close()
|
||||
self.executor.shutdown(wait=True)
|
||||
self.executor = None
|
||||
|
||||
|
@ -184,6 +190,10 @@ class LevelDB:
|
|||
if self.tx_count_db:
|
||||
self.tx_count_db.close()
|
||||
self.tx_count_db = None
|
||||
if self.hashes_db:
|
||||
self.hashes_db.close()
|
||||
self.hashes_db = None
|
||||
|
||||
await self._open_dbs(False, False)
|
||||
self.logger.info("opened for serving")
|
||||
|
||||
|
@ -270,13 +280,10 @@ class LevelDB:
|
|||
assert flush_data.tx_count == (self.tx_counts[-1] if self.tx_counts
|
||||
else 0)
|
||||
assert len(self.tx_counts) == flush_data.height + 1
|
||||
hashes = b''.join(flush_data.block_tx_hashes)
|
||||
flush_data.block_tx_hashes.clear()
|
||||
assert len(hashes) % 32 == 0
|
||||
assert len(hashes) // 32 == flush_data.tx_count - prior_tx_count
|
||||
assert len(b''.join(flush_data.block_tx_hashes)) // 32 == flush_data.tx_count - prior_tx_count
|
||||
|
||||
# Write the headers, tx counts, and tx hashes
|
||||
start_time = time.time()
|
||||
start_time = time.perf_counter()
|
||||
height_start = self.fs_height + 1
|
||||
|
||||
for header in flush_data.headers:
|
||||
|
@ -284,16 +291,22 @@ class LevelDB:
|
|||
self.headers_db.put(HEADER_PREFIX + util.pack_be_uint64(height_start), header)
|
||||
self.tx_count_db.put(TX_COUNT_PREFIX + util.pack_be_uint64(height_start), util.pack_be_uint64(tx_count))
|
||||
height_start += 1
|
||||
offset = prior_tx_count * 32
|
||||
self.hashes_file.write(offset, hashes)
|
||||
|
||||
tx_num = prior_tx_count
|
||||
for tx_hashes in flush_data.block_tx_hashes:
|
||||
offset = 0
|
||||
while offset < len(tx_hashes):
|
||||
self.hashes_db.put(TX_HASH_PREFIX + util.pack_be_uint64(tx_num), tx_hashes[offset:offset+32])
|
||||
tx_num += 1
|
||||
offset += 32
|
||||
|
||||
flush_data.block_tx_hashes.clear()
|
||||
|
||||
self.fs_height = flush_data.height
|
||||
self.fs_tx_count = flush_data.tx_count
|
||||
|
||||
flush_data.headers.clear()
|
||||
if self.utxo_db.for_sync:
|
||||
elapsed = time.time() - start_time
|
||||
self.logger.info(f'flushed filesystem data in {elapsed:.2f}s')
|
||||
elapsed = time.perf_counter() - start_time
|
||||
self.logger.info(f'flushed filesystem data in {elapsed:.2f}s')
|
||||
|
||||
def flush_history(self):
|
||||
self.history.flush()
|
||||
|
@ -421,7 +434,7 @@ class LevelDB:
|
|||
if tx_height > self.db_height:
|
||||
tx_hash = None
|
||||
else:
|
||||
tx_hash = self.hashes_file.read(tx_num * 32, 32)
|
||||
tx_hash = self.hashes_db.get(TX_HASH_PREFIX + util.pack_be_uint64(tx_num))
|
||||
return tx_hash, tx_height
|
||||
|
||||
async def fs_block_hashes(self, height, count):
|
||||
|
|
Loading…
Reference in a new issue