combine leveldb databases

This commit is contained in:
Jack Robison 2021-01-09 14:39:20 -05:00
parent d3ac65e431
commit e2864b76dc
No known key found for this signature in database
GPG key ID: DF25C68FE0239BB2
3 changed files with 75 additions and 78 deletions

View file

@ -603,7 +603,8 @@ class BlockProcessor:
# Value: hashX # Value: hashX
prefix = b'h' + tx_hash[:4] + idx_packed prefix = b'h' + tx_hash[:4] + idx_packed
candidates = {db_key: hashX for db_key, hashX candidates = {db_key: hashX for db_key, hashX
in self.db.utxo_db.iterator(prefix=prefix)} in self.db.db.iterator(prefix=prefix)}
for hdb_key, hashX in candidates.items(): for hdb_key, hashX in candidates.items():
tx_num_packed = hdb_key[-4:] tx_num_packed = hdb_key[-4:]
if len(candidates) > 1: if len(candidates) > 1:
@ -622,7 +623,7 @@ class BlockProcessor:
# Key: b'u' + address_hashX + tx_idx + tx_num # Key: b'u' + address_hashX + tx_idx + tx_num
# Value: the UTXO value as a 64-bit unsigned integer # Value: the UTXO value as a 64-bit unsigned integer
udb_key = b'u' + hashX + hdb_key[-6:] udb_key = b'u' + hashX + hdb_key[-6:]
utxo_value_packed = self.db.utxo_db.get(udb_key) utxo_value_packed = self.db.db.get(udb_key)
if utxo_value_packed is None: if utxo_value_packed is None:
self.logger.warning( self.logger.warning(
"%s:%s is not found in UTXO db for %s", hash_to_hex_str(tx_hash), tx_idx, hash_to_hex_str(hashX) "%s:%s is not found in UTXO db for %s", hash_to_hex_str(tx_hash), tx_idx, hash_to_hex_str(hashX)

View file

@ -20,6 +20,10 @@ from lbry.wallet.server.util import pack_be_uint16, unpack_be_uint16_from
from lbry.wallet.server.hash import hash_to_hex_str, HASHX_LEN from lbry.wallet.server.hash import hash_to_hex_str, HASHX_LEN
HASHX_HISTORY_PREFIX = b'x'
HIST_STATE = b'state-hist'
class History: class History:
DB_VERSIONS = [0] DB_VERSIONS = [0]
@ -32,8 +36,8 @@ class History:
self.unflushed_count = 0 self.unflushed_count = 0
self.db = None self.db = None
def open_db(self, db_class, for_sync, utxo_flush_count, compacting): def open_db(self, db, for_sync, utxo_flush_count, compacting):
self.db = db_class('hist', for_sync) self.db = db #db_class('hist', for_sync)
self.read_state() self.read_state()
self.clear_excess(utxo_flush_count) self.clear_excess(utxo_flush_count)
# An incomplete compaction needs to be cancelled otherwise # An incomplete compaction needs to be cancelled otherwise
@ -44,11 +48,11 @@ class History:
def close_db(self): def close_db(self):
if self.db: if self.db:
self.db.close() # self.db.close()
self.db = None self.db = None
def read_state(self): def read_state(self):
state = self.db.get(b'state\0\0') state = self.db.get(HIST_STATE)
if state: if state:
state = ast.literal_eval(state.decode()) state = ast.literal_eval(state.decode())
if not isinstance(state, dict): if not isinstance(state, dict):
@ -80,17 +84,18 @@ class History:
'excess history flushes...') 'excess history flushes...')
keys = [] keys = []
for key, hist in self.db.iterator(prefix=b''): for key, hist in self.db.iterator(prefix=HASHX_HISTORY_PREFIX):
flush_id, = unpack_be_uint16_from(key[-2:]) k = key[1:]
flush_id, = unpack_be_uint16_from(k[-2:])
if flush_id > utxo_flush_count: if flush_id > utxo_flush_count:
keys.append(key) keys.append(k)
self.logger.info(f'deleting {len(keys):,d} history entries') self.logger.info(f'deleting {len(keys):,d} history entries')
self.flush_count = utxo_flush_count self.flush_count = utxo_flush_count
with self.db.write_batch() as batch: with self.db.write_batch() as batch:
for key in keys: for key in keys:
batch.delete(key) batch.delete(HASHX_HISTORY_PREFIX + key)
self.write_state(batch) self.write_state(batch)
self.logger.info('deleted excess history entries') self.logger.info('deleted excess history entries')
@ -105,7 +110,7 @@ class History:
} }
# History entries are not prefixed; the suffix \0\0 ensures we # History entries are not prefixed; the suffix \0\0 ensures we
# look similar to other entries and aren't interfered with # look similar to other entries and aren't interfered with
batch.put(b'state\0\0', repr(state).encode()) batch.put(HIST_STATE, repr(state).encode())
def add_unflushed(self, hashXs_by_tx, first_tx_num): def add_unflushed(self, hashXs_by_tx, first_tx_num):
unflushed = self.unflushed unflushed = self.unflushed
@ -132,7 +137,7 @@ class History:
with self.db.write_batch() as batch: with self.db.write_batch() as batch:
for hashX in sorted(unflushed): for hashX in sorted(unflushed):
key = hashX + flush_id key = hashX + flush_id
batch.put(key, unflushed[hashX].tobytes()) batch.put(HASHX_HISTORY_PREFIX + key, unflushed[hashX].tobytes())
self.write_state(batch) self.write_state(batch)
count = len(unflushed) count = len(unflushed)
@ -154,16 +159,17 @@ class History:
for hashX in sorted(hashXs): for hashX in sorted(hashXs):
deletes = [] deletes = []
puts = {} puts = {}
for key, hist in self.db.iterator(prefix=hashX, reverse=True): for key, hist in self.db.iterator(prefix=HASHX_HISTORY_PREFIX + hashX, reverse=True):
k = key[1:]
a = array.array('I') a = array.array('I')
a.frombytes(hist) a.frombytes(hist)
# Remove all history entries >= tx_count # Remove all history entries >= tx_count
idx = bisect_left(a, tx_count) idx = bisect_left(a, tx_count)
nremoves += len(a) - idx nremoves += len(a) - idx
if idx > 0: if idx > 0:
puts[key] = a[:idx].tobytes() puts[k] = a[:idx].tobytes()
break break
deletes.append(key) deletes.append(k)
for key in deletes: for key in deletes:
batch.delete(key) batch.delete(key)
@ -221,9 +227,9 @@ class History:
with self.db.write_batch() as batch: with self.db.write_batch() as batch:
# Important: delete first! The keyspace may overlap. # Important: delete first! The keyspace may overlap.
for key in keys_to_delete: for key in keys_to_delete:
batch.delete(key) batch.delete(HASHX_HISTORY_PREFIX + key)
for key, value in write_items: for key, value in write_items:
batch.put(key, value) batch.put(HASHX_HISTORY_PREFIX + key, value)
self.write_state(batch) self.write_state(batch)
def _compact_hashX(self, hashX, hist_map, hist_list, def _compact_hashX(self, hashX, hist_map, hist_list,
@ -271,11 +277,12 @@ class History:
key_len = HASHX_LEN + 2 key_len = HASHX_LEN + 2
write_size = 0 write_size = 0
for key, hist in self.db.iterator(prefix=prefix): for key, hist in self.db.iterator(prefix=HASHX_HISTORY_PREFIX + prefix):
k = key[1:]
# Ignore non-history entries # Ignore non-history entries
if len(key) != key_len: if len(k) != key_len:
continue continue
hashX = key[:-2] hashX = k[:-2]
if hashX != prior_hashX and prior_hashX: if hashX != prior_hashX and prior_hashX:
write_size += self._compact_hashX(prior_hashX, hist_map, write_size += self._compact_hashX(prior_hashX, hist_map,
hist_list, write_items, hist_list, write_items,
@ -283,7 +290,7 @@ class History:
hist_map.clear() hist_map.clear()
hist_list.clear() hist_list.clear()
prior_hashX = hashX prior_hashX = hashX
hist_map[key] = hist hist_map[k] = hist
hist_list.append(hist) hist_list.append(hist)
if prior_hashX: if prior_hashX:

View file

@ -40,6 +40,14 @@ TX_HASH_PREFIX = b'X'
TX_PREFIX = b'B' TX_PREFIX = b'B'
TX_NUM_PREFIX = b'N' TX_NUM_PREFIX = b'N'
BLOCK_HASH_PREFIX = b'C' BLOCK_HASH_PREFIX = b'C'
HISTORY_PREFIX = b'A'
HASHX_UTXO_PREFIX = b'h'
UTXO_PREFIX = b'u'
HASHX_HISTORY_PREFIX = b'x'
UTXO_STATE = b'state-utxo'
HIST_STATE = b'state-hist'
@ -79,7 +87,7 @@ class LevelDB:
self.db_class = db_class(env.db_dir, self.env.db_engine) self.db_class = db_class(env.db_dir, self.env.db_engine)
self.history = History() self.history = History()
self.utxo_db = None self.db = None
self.tx_counts = None self.tx_counts = None
self.headers = None self.headers = None
self.last_flush = time.time() self.last_flush = time.time()
@ -105,7 +113,7 @@ class LevelDB:
def get_counts(): def get_counts():
return tuple( return tuple(
util.unpack_be_uint64(tx_count) util.unpack_be_uint64(tx_count)
for tx_count in self.tx_db.iterator(prefix=TX_COUNT_PREFIX, include_key=False) for tx_count in self.db.iterator(prefix=TX_COUNT_PREFIX, include_key=False)
) )
tx_counts = await asyncio.get_event_loop().run_in_executor(self.executor, get_counts) tx_counts = await asyncio.get_event_loop().run_in_executor(self.executor, get_counts)
@ -120,7 +128,7 @@ class LevelDB:
async def _read_txids(self): async def _read_txids(self):
def get_txids(): def get_txids():
return list(self.tx_db.iterator(prefix=TX_HASH_PREFIX, include_key=False)) return list(self.db.iterator(prefix=TX_HASH_PREFIX, include_key=False))
start = time.perf_counter() start = time.perf_counter()
self.logger.info("loading txids") self.logger.info("loading txids")
@ -136,7 +144,7 @@ class LevelDB:
def get_headers(): def get_headers():
return [ return [
header for header in self.headers_db.iterator(prefix=HEADER_PREFIX, include_key=False) header for header in self.db.iterator(prefix=HEADER_PREFIX, include_key=False)
] ]
headers = await asyncio.get_event_loop().run_in_executor(self.executor, get_headers) headers = await asyncio.get_event_loop().run_in_executor(self.executor, get_headers)
@ -152,29 +160,17 @@ class LevelDB:
f.write(f'ElectrumX databases and metadata for ' f.write(f'ElectrumX databases and metadata for '
f'{self.coin.NAME} {self.coin.NET}'.encode()) f'{self.coin.NAME} {self.coin.NET}'.encode())
assert self.headers_db is None assert self.db is None
self.headers_db = self.db_class('headers', for_sync) self.db = self.db_class(f'lbry-{self.env.db_engine}', for_sync)
if self.headers_db.is_new: if self.db.is_new:
self.logger.info('created new headers db') self.logger.info('created new db: %s', f'lbry-{self.env.db_engine}')
self.logger.info(f'opened headers DB (for sync: {for_sync})') self.logger.info(f'opened DB (for sync: {for_sync})')
assert self.tx_db is None
self.tx_db = self.db_class('tx', for_sync)
if self.tx_db.is_new:
self.logger.info('created new tx db')
self.logger.info(f'opened tx DB (for sync: {for_sync})')
assert self.utxo_db is None
# First UTXO DB
self.utxo_db = self.db_class('utxo', for_sync)
if self.utxo_db.is_new:
self.logger.info('created new utxo db')
self.logger.info(f'opened utxo db (for sync: {for_sync})')
self.read_utxo_state() self.read_utxo_state()
# Then history DB # Then history DB
self.utxo_flush_count = self.history.open_db( self.utxo_flush_count = self.history.open_db(
self.db_class, for_sync, self.utxo_flush_count, compacting self.db, for_sync, self.utxo_flush_count, compacting
) )
self.clear_excess_undo_info() self.clear_excess_undo_info()
@ -185,10 +181,8 @@ class LevelDB:
await self._read_headers() await self._read_headers()
def close(self): def close(self):
self.utxo_db.close() self.db.close()
self.history.close_db() self.history.close_db()
self.headers_db.close()
self.tx_db.close()
self.executor.shutdown(wait=True) self.executor.shutdown(wait=True)
self.executor = None self.executor = None
@ -209,18 +203,12 @@ class LevelDB:
"""Open the databases for serving. If they are already open they are """Open the databases for serving. If they are already open they are
closed first. closed first.
""" """
self.logger.info('closing DBs to re-open for serving') if self.db:
if self.utxo_db: return
self.logger.info('closing DBs to re-open for serving') # self.logger.info('closing DBs to re-open for serving')
self.utxo_db.close() # self.db.close()
self.history.close_db() # self.history.close_db()
self.utxo_db = None # self.db = None
if self.headers_db:
self.headers_db.close()
self.headers_db = None
if self.tx_db:
self.tx_db.close()
self.tx_db = None
await self._open_dbs(False, False) await self._open_dbs(False, False)
self.logger.info("opened for serving") self.logger.info("opened for serving")
@ -269,14 +257,14 @@ class LevelDB:
self.flush_history() self.flush_history()
# Flush state last as it reads the wall time. # Flush state last as it reads the wall time.
with self.utxo_db.write_batch() as batch: with self.db.write_batch() as batch:
if flush_utxos: if flush_utxos:
self.flush_utxo_db(batch, flush_data) self.flush_utxo_db(batch, flush_data)
self.flush_state(batch) self.flush_state(batch)
# Update and put the wall time again - otherwise we drop the # Update and put the wall time again - otherwise we drop the
# time it took to commit the batch # time it took to commit the batch
self.flush_state(self.utxo_db) self.flush_state(self.db)
elapsed = self.last_flush - start_time elapsed = self.last_flush - start_time
self.logger.info(f'flush #{self.history.flush_count:,d} took ' self.logger.info(f'flush #{self.history.flush_count:,d} took '
@ -284,7 +272,7 @@ class LevelDB:
f'txs: {flush_data.tx_count:,d} ({tx_delta:+,d})') f'txs: {flush_data.tx_count:,d} ({tx_delta:+,d})')
# Catch-up stats # Catch-up stats
if self.utxo_db.for_sync: if self.db.for_sync:
flush_interval = self.last_flush - prior_flush flush_interval = self.last_flush - prior_flush
tx_per_sec_gen = int(flush_data.tx_count / self.wall_time) tx_per_sec_gen = int(flush_data.tx_count / self.wall_time)
tx_per_sec_last = 1 + int(tx_delta / flush_interval) tx_per_sec_last = 1 + int(tx_delta / flush_interval)
@ -315,7 +303,7 @@ class LevelDB:
# Write the headers # Write the headers
start_time = time.perf_counter() start_time = time.perf_counter()
with self.headers_db.write_batch() as batch: with self.db.write_batch() as batch:
batch_put = batch.put batch_put = batch.put
for i, header in enumerate(flush_data.headers): for i, header in enumerate(flush_data.headers):
batch_put(HEADER_PREFIX + util.pack_be_uint64(self.fs_height + i + 1), header) batch_put(HEADER_PREFIX + util.pack_be_uint64(self.fs_height + i + 1), header)
@ -325,7 +313,7 @@ class LevelDB:
height_start = self.fs_height + 1 height_start = self.fs_height + 1
tx_num = prior_tx_count tx_num = prior_tx_count
with self.tx_db.write_batch() as batch: with self.db.write_batch() as batch:
batch_put = batch.put batch_put = batch.put
for block_hash, (tx_hashes, txs) in zip(flush_data.block_hashes, flush_data.block_txs): for block_hash, (tx_hashes, txs) in zip(flush_data.block_hashes, flush_data.block_txs):
tx_count = self.tx_counts[height_start] tx_count = self.tx_counts[height_start]
@ -380,7 +368,7 @@ class LevelDB:
self.flush_undo_infos(batch_put, flush_data.undo_infos) self.flush_undo_infos(batch_put, flush_data.undo_infos)
flush_data.undo_infos.clear() flush_data.undo_infos.clear()
if self.utxo_db.for_sync: if self.db.for_sync:
block_count = flush_data.height - self.db_height block_count = flush_data.height - self.db_height
tx_count = flush_data.tx_count - self.db_tx_count tx_count = flush_data.tx_count - self.db_tx_count
elapsed = time.time() - start_time elapsed = time.time() - start_time
@ -414,7 +402,7 @@ class LevelDB:
self.backup_fs(flush_data.height, flush_data.tx_count) self.backup_fs(flush_data.height, flush_data.tx_count)
self.history.backup(touched, flush_data.tx_count) self.history.backup(touched, flush_data.tx_count)
with self.utxo_db.write_batch() as batch: with self.db.write_batch() as batch:
self.flush_utxo_db(batch, flush_data) self.flush_utxo_db(batch, flush_data)
# Flush state last as it reads the wall time. # Flush state last as it reads the wall time.
self.flush_state(batch) self.flush_state(batch)
@ -471,9 +459,8 @@ class LevelDB:
def _fs_transactions(self, txids: Iterable[str]): def _fs_transactions(self, txids: Iterable[str]):
unpack_be_uint64 = util.unpack_be_uint64 unpack_be_uint64 = util.unpack_be_uint64
tx_counts = self.tx_counts tx_counts = self.tx_counts
tx_db_get = self.tx_db.get tx_db_get = self.db.get
tx_cache = self._tx_and_merkle_cache tx_cache = self._tx_and_merkle_cache
tx_infos = {} tx_infos = {}
for tx_hash in txids: for tx_hash in txids:
@ -531,11 +518,13 @@ class LevelDB:
def read_history(): def read_history():
db_height = self.db_height db_height = self.db_height
tx_counts = self.tx_counts tx_counts = self.tx_counts
tx_db_get = self.db.get
pack_be_uint64 = util.pack_be_uint64
cnt = 0 cnt = 0
txs = [] txs = []
for hist in self.history.db.iterator(prefix=hashX, include_key=False): for hist in self.history.db.iterator(prefix=HASHX_HISTORY_PREFIX + hashX, include_key=False):
a = array.array('I') a = array.array('I')
a.frombytes(hist) a.frombytes(hist)
for tx_num in a: for tx_num in a:
@ -570,7 +559,7 @@ class LevelDB:
def read_undo_info(self, height): def read_undo_info(self, height):
"""Read undo information from a file for the current height.""" """Read undo information from a file for the current height."""
return self.utxo_db.get(self.undo_key(height)) return self.db.get(self.undo_key(height))
def flush_undo_infos(self, batch_put, undo_infos): def flush_undo_infos(self, batch_put, undo_infos):
"""undo_infos is a list of (undo_info, height) pairs.""" """undo_infos is a list of (undo_info, height) pairs."""
@ -609,14 +598,14 @@ class LevelDB:
prefix = b'U' prefix = b'U'
min_height = self.min_undo_height(self.db_height) min_height = self.min_undo_height(self.db_height)
keys = [] keys = []
for key, hist in self.utxo_db.iterator(prefix=prefix): for key, hist in self.db.iterator(prefix=prefix):
height, = unpack('>I', key[-4:]) height, = unpack('>I', key[-4:])
if height >= min_height: if height >= min_height:
break break
keys.append(key) keys.append(key)
if keys: if keys:
with self.utxo_db.write_batch() as batch: with self.db.write_batch() as batch:
for key in keys: for key in keys:
batch.delete(key) batch.delete(key)
self.logger.info(f'deleted {len(keys):,d} stale undo entries') self.logger.info(f'deleted {len(keys):,d} stale undo entries')
@ -637,7 +626,7 @@ class LevelDB:
# -- UTXO database # -- UTXO database
def read_utxo_state(self): def read_utxo_state(self):
state = self.utxo_db.get(b'state') state = self.db.get(UTXO_STATE)
if not state: if not state:
self.db_height = -1 self.db_height = -1
self.db_tx_count = 0 self.db_tx_count = 0
@ -680,7 +669,7 @@ class LevelDB:
self.logger.info(f'height: {self.db_height:,d}') self.logger.info(f'height: {self.db_height:,d}')
self.logger.info(f'tip: {hash_to_hex_str(self.db_tip)}') self.logger.info(f'tip: {hash_to_hex_str(self.db_tip)}')
self.logger.info(f'tx count: {self.db_tx_count:,d}') self.logger.info(f'tx count: {self.db_tx_count:,d}')
if self.utxo_db.for_sync: if self.db.for_sync:
self.logger.info(f'flushing DB cache at {self.env.cache_MB:,d} MB') self.logger.info(f'flushing DB cache at {self.env.cache_MB:,d} MB')
if self.first_sync: if self.first_sync:
self.logger.info(f'sync time so far: {util.formatted_time(self.wall_time)}') self.logger.info(f'sync time so far: {util.formatted_time(self.wall_time)}')
@ -697,11 +686,11 @@ class LevelDB:
'first_sync': self.first_sync, 'first_sync': self.first_sync,
'db_version': self.db_version, 'db_version': self.db_version,
} }
batch.put(b'state', repr(state).encode()) batch.put(UTXO_STATE, repr(state).encode())
def set_flush_count(self, count): def set_flush_count(self, count):
self.utxo_flush_count = count self.utxo_flush_count = count
with self.utxo_db.write_batch() as batch: with self.db.write_batch() as batch:
self.write_utxo_state(batch) self.write_utxo_state(batch)
async def all_utxos(self, hashX): async def all_utxos(self, hashX):
@ -714,7 +703,7 @@ class LevelDB:
# Key: b'u' + address_hashX + tx_idx + tx_num # Key: b'u' + address_hashX + tx_idx + tx_num
# Value: the UTXO value as a 64-bit unsigned integer # Value: the UTXO value as a 64-bit unsigned integer
prefix = b'u' + hashX prefix = b'u' + hashX
for db_key, db_value in self.utxo_db.iterator(prefix=prefix): for db_key, db_value in self.db.iterator(prefix=prefix):
tx_pos, tx_num = s_unpack('<HI', db_key[-6:]) tx_pos, tx_num = s_unpack('<HI', db_key[-6:])
value, = unpack('<Q', db_value) value, = unpack('<Q', db_value)
tx_hash, height = fs_tx_hash(tx_num) tx_hash, height = fs_tx_hash(tx_num)
@ -747,7 +736,7 @@ class LevelDB:
prefix = b'h' + tx_hash[:4] + idx_packed prefix = b'h' + tx_hash[:4] + idx_packed
# Find which entry, if any, the TX_HASH matches. # Find which entry, if any, the TX_HASH matches.
for db_key, hashX in self.utxo_db.iterator(prefix=prefix): for db_key, hashX in self.db.iterator(prefix=prefix):
tx_num_packed = db_key[-4:] tx_num_packed = db_key[-4:]
tx_num, = unpack('<I', tx_num_packed) tx_num, = unpack('<I', tx_num_packed)
hash, height = self.fs_tx_hash(tx_num) hash, height = self.fs_tx_hash(tx_num)
@ -766,7 +755,7 @@ class LevelDB:
# Key: b'u' + address_hashX + tx_idx + tx_num # Key: b'u' + address_hashX + tx_idx + tx_num
# Value: the UTXO value as a 64-bit unsigned integer # Value: the UTXO value as a 64-bit unsigned integer
key = b'u' + hashX + suffix key = b'u' + hashX + suffix
db_value = self.utxo_db.get(key) db_value = self.db.get(key)
if not db_value: if not db_value:
# This can happen if the DB was updated between # This can happen if the DB was updated between
# getting the hashXs and getting the UTXOs # getting the hashXs and getting the UTXOs