merge tx_db and headers_db

Jack Robison 2020-11-29 14:56:21 -05:00
parent 9012db0cfb
commit 18340c248d
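The merge below works because each of the formerly separate databases already namespaced its keys with a distinct byte prefix, so a single LevelDB keyspace can hold every table without collisions. A minimal sketch of that scheme, assuming a plyvel-style API; the prefix byte values here are placeholders, the real ones are module constants in leveldb.py:

import struct
import plyvel  # assumed engine; the server picks one via env.db_engine

# Placeholder prefixes, not necessarily the real values.
HEADER_PREFIX = b'H'
TX_COUNT_PREFIX = b'T'
TX_HASH_PREFIX = b'X'

db = plyvel.DB('lbry-leveldb', create_if_missing=True)

# Writes from the formerly separate databases interleave in one keyspace.
height = 42
db.put(HEADER_PREFIX + struct.pack('>Q', height), b'<raw block header>')
db.put(TX_COUNT_PREFIX + struct.pack('>Q', height), struct.pack('>Q', 1234))

# Prefix iteration recovers each logical table on its own, which is exactly
# what the merged self.db.iterator(prefix=...) calls in the diff rely on.
headers = list(db.iterator(prefix=HEADER_PREFIX, include_key=False))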


@@ -32,7 +32,7 @@ import attr
from lbry.wallet.server import util
from lbry.wallet.server.hash import hash_to_hex_str, HASHX_LEN
from lbry.wallet.server.merkle import Merkle, MerkleCache
-from lbry.wallet.server.util import formatted_time, unpack_be_uint64, unpack_le_int32_from
+from lbry.wallet.server.util import formatted_time, unpack_be_uint64, unpack_le_int32_from, pack_le_int32
from lbry.wallet.server.storage import db_class
from lbry.wallet.server.history import History
if typing.TYPE_CHECKING:
@@ -51,6 +51,9 @@ UTXO_PREFIX = b'u'
HASHX_HISTORY_PREFIX = b'x'
STATE_PREFIX = b'state'
+UTXO_STATE_PREFIX = b'state-utxo-'
+HIST_STATE_PREFIX = b'state-hist-'
class RocksDBState(typing.NamedTuple):
db_version: int
@@ -206,8 +209,8 @@ class LevelDB:
self.merkle = Merkle()
self.header_mc = MerkleCache(self.merkle, self.fs_block_hashes)
-self.headers_db = None
-self.tx_db = None
+self.db = None
+self.db = None
self._block_txs_cache = pylru.lrucache(50000)
self._merkle_tx_cache = pylru.lrucache(100000)
@@ -222,7 +225,7 @@
def get_counts():
return tuple(
util.unpack_be_uint64(tx_count)
-for tx_count in self.tx_db.iterator(prefix=TX_COUNT_PREFIX, include_key=False)
+for tx_count in self.db.iterator(prefix=TX_COUNT_PREFIX, include_key=False)
)
tx_counts = await asyncio.get_event_loop().run_in_executor(self.executor, get_counts)
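A pattern worth noting in the loader above (and the ones that follow): the blocking LevelDB scan runs on a thread-pool executor so the asyncio event loop is never stalled. A standalone sketch of that pattern, with illustrative names:

import asyncio
from concurrent.futures import ThreadPoolExecutor

executor = ThreadPoolExecutor(max_workers=1)

async def load_values(db, prefix):
    # The scan blocks, so it runs in a worker thread; the coroutine
    # suspends until the thread hands back the list.
    def get_values():
        return [v for v in db.iterator(prefix=prefix, include_key=False)]
    return await asyncio.get_event_loop().run_in_executor(executor, get_values)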
@@ -237,7 +240,7 @@
async def _read_txids(self):
def get_txids():
-return list(self.tx_db.iterator(prefix=TX_HASH_PREFIX, include_key=False))
+return list(self.db.iterator(prefix=TX_HASH_PREFIX, include_key=False))
start = time.perf_counter()
self.logger.info("loading txids")
@@ -253,7 +256,7 @@
def get_headers():
return [
-header for header in self.headers_db.iterator(prefix=HEADER_PREFIX, include_key=False)
+header for header in self.db.iterator(prefix=HEADER_PREFIX, include_key=False)
]
headers = await asyncio.get_event_loop().run_in_executor(self.executor, get_headers)
@@ -269,17 +272,11 @@
f.write(f'ElectrumX databases and metadata for '
f'{self.coin.NAME} {self.coin.NET}'.encode())
-assert self.headers_db is None
-self.headers_db = self.db_class('headers', for_sync)
-if self.headers_db.is_new:
-    self.logger.info('created new headers db')
-self.logger.info(f'opened headers DB (for sync: {for_sync})')
-assert self.tx_db is None
-self.tx_db = self.db_class('tx', for_sync)
-if self.tx_db.is_new:
-    self.logger.info('created new tx db')
-self.logger.info(f'opened tx DB (for sync: {for_sync})')
+assert self.db is None
+self.db = self.db_class(f'lbry-{self.env.db_engine}', for_sync)
+if self.db.is_new:
+    self.logger.info('created new db: %s', f'lbry-{self.env.db_engine}')
+self.logger.info(f'opened DB (for sync: {for_sync})')
assert self.utxo_db is None
# First UTXO DB
@@ -304,8 +301,7 @@
def close(self):
self.utxo_db.close()
self.history.close_db()
-self.headers_db.close()
-self.tx_db.close()
+self.db.close()
self.executor.shutdown(wait=True)
self.executor = None
@@ -332,12 +328,12 @@
self.utxo_db.close()
self.history.close_db()
self.utxo_db = None
-if self.headers_db:
-    self.headers_db.close()
-    self.headers_db = None
-if self.tx_db:
-    self.tx_db.close()
-    self.tx_db = None
+if self.db:
+    self.db.close()
+    self.db = None
+if self.db:
+    self.db.close()
+    self.db = None
await self._open_dbs(False, False)
self.logger.info("opened for serving")
@@ -432,7 +428,7 @@
# Write the headers
start_time = time.perf_counter()
-with self.headers_db.write_batch() as batch:
+with self.db.write_batch() as batch:
batch_put = batch.put
for i, header in enumerate(flush_data.headers):
batch_put(HEADER_PREFIX + util.pack_be_uint64(self.fs_height + i + 1), header)
@@ -442,7 +438,7 @@
height_start = self.fs_height + 1
tx_num = prior_tx_count
-with self.tx_db.write_batch() as batch:
+with self.db.write_batch() as batch:
batch_put = batch.put
for block_hash, (tx_hashes, txs) in zip(flush_data.block_hashes, flush_data.block_txs):
tx_count = self.tx_counts[height_start]
@@ -594,7 +590,7 @@
tx_pos = tx_num - tx_counts[tx_height - 1]
def _update_block_txs_cache():
-block_txs = list(self.tx_db.iterator(
+block_txs = list(self.db.iterator(
start=TX_HASH_PREFIX + util.pack_be_uint64(tx_counts[tx_height - 1]),
stop=None if tx_height + 1 == len(tx_counts) else
TX_HASH_PREFIX + util.pack_be_uint64(tx_counts[tx_height]), include_key=False
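The `tx_counts[tx_height - 1]` arithmetic above works because `tx_counts` is cumulative: entry h holds the total transaction count through block h. A hypothetical helper (not in the commit) making the mapping from a global tx_num back to height and in-block position explicit:

import bisect

def tx_num_to_height_pos(tx_counts, tx_num):
    # The first height whose cumulative count exceeds tx_num is the block
    # containing it; subtracting the previous total gives the position.
    height = bisect.bisect_right(tx_counts, tx_num)
    pos = tx_num - (tx_counts[height - 1] if height else 0)
    return height, pos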
@@ -625,7 +621,7 @@
def _fs_transactions(self, txids: Iterable[str]) -> List[Tuple[str, Optional[str], int, int]]:
unpack_be_uint64 = util.unpack_be_uint64
tx_counts = self.tx_counts
-tx_db_get = self.tx_db.get
+tx_db_get = self.db.get
tx_infos = []
for tx_hash in txids:
@@ -688,7 +684,7 @@
def read_history():
db_height = self.db_height
tx_counts = self.tx_counts
-tx_db_get = self.tx_db.get
+tx_db_get = self.db.get
pack_be_uint64 = util.pack_be_uint64
cnt = 0