combine leveldb databases

This commit is contained in:
Jack Robison 2020-11-29 15:19:35 -05:00
parent 18340c248d
commit d960ba7412
No known key found for this signature in database
GPG key ID: DF25C68FE0239BB2
3 changed files with 54 additions and 64 deletions

View file

@ -586,7 +586,7 @@ class BlockProcessor:
# Value: hashX
prefix = b'h' + tx_hash[:4] + idx_packed
candidates = {db_key: hashX for db_key, hashX
in self.db.utxo_db.iterator(prefix=prefix)}
in self.db.db.iterator(prefix=prefix)}
for hdb_key, hashX in candidates.items():
tx_num_packed = hdb_key[-4:]
@ -601,7 +601,7 @@ class BlockProcessor:
# Key: b'u' + address_hashX + tx_idx + tx_num
# Value: the UTXO value as a 64-bit unsigned integer
udb_key = b'u' + hashX + hdb_key[-6:]
utxo_value_packed = self.db.utxo_db.get(udb_key)
utxo_value_packed = self.db.db.get(udb_key)
if utxo_value_packed:
# Remove both entries for this UTXO
self.db_deletes.append(hdb_key)

View file

@ -20,6 +20,10 @@ from lbry.wallet.server.util import pack_be_uint16, unpack_be_uint16_from
from lbry.wallet.server.hash import hash_to_hex_str, HASHX_LEN
HASHX_HISTORY_PREFIX = b'x'
HIST_STATE = b'state-hist'
class History:
DB_VERSIONS = [0]
@ -32,8 +36,8 @@ class History:
self.unflushed_count = 0
self.db = None
def open_db(self, db_class, for_sync, utxo_flush_count, compacting):
self.db = db_class('hist', for_sync)
def open_db(self, db, for_sync, utxo_flush_count, compacting):
self.db = db #db_class('hist', for_sync)
self.read_state()
self.clear_excess(utxo_flush_count)
# An incomplete compaction needs to be cancelled otherwise
@ -44,11 +48,11 @@ class History:
def close_db(self):
if self.db:
self.db.close()
# self.db.close()
self.db = None
def read_state(self):
state = self.db.get(b'state\0\0')
state = self.db.get(HIST_STATE)
if state:
state = ast.literal_eval(state.decode())
if not isinstance(state, dict):
@ -80,17 +84,18 @@ class History:
'excess history flushes...')
keys = []
for key, hist in self.db.iterator(prefix=b''):
flush_id, = unpack_be_uint16_from(key[-2:])
for key, hist in self.db.iterator(prefix=HASHX_HISTORY_PREFIX):
k = key[1:]
flush_id, = unpack_be_uint16_from(k[-2:])
if flush_id > utxo_flush_count:
keys.append(key)
keys.append(k)
self.logger.info(f'deleting {len(keys):,d} history entries')
self.flush_count = utxo_flush_count
with self.db.write_batch() as batch:
for key in keys:
batch.delete(key)
batch.delete(HASHX_HISTORY_PREFIX + key)
self.write_state(batch)
self.logger.info('deleted excess history entries')
@ -105,7 +110,7 @@ class History:
}
# History entries are not prefixed; the suffix \0\0 ensures we
# look similar to other entries and aren't interfered with
batch.put(b'state\0\0', repr(state).encode())
batch.put(HIST_STATE, repr(state).encode())
def add_unflushed(self, hashXs_by_tx, first_tx_num):
unflushed = self.unflushed
@ -132,7 +137,7 @@ class History:
with self.db.write_batch() as batch:
for hashX in sorted(unflushed):
key = hashX + flush_id
batch.put(key, unflushed[hashX].tobytes())
batch.put(HASHX_HISTORY_PREFIX + key, unflushed[hashX].tobytes())
self.write_state(batch)
count = len(unflushed)
@ -154,16 +159,17 @@ class History:
for hashX in sorted(hashXs):
deletes = []
puts = {}
for key, hist in self.db.iterator(prefix=hashX, reverse=True):
for key, hist in self.db.iterator(prefix=HASHX_HISTORY_PREFIX + hashX, reverse=True):
k = key[1:]
a = array.array('I')
a.frombytes(hist)
# Remove all history entries >= tx_count
idx = bisect_left(a, tx_count)
nremoves += len(a) - idx
if idx > 0:
puts[key] = a[:idx].tobytes()
puts[k] = a[:idx].tobytes()
break
deletes.append(key)
deletes.append(k)
for key in deletes:
batch.delete(key)
@ -221,9 +227,9 @@ class History:
with self.db.write_batch() as batch:
# Important: delete first! The keyspace may overlap.
for key in keys_to_delete:
batch.delete(key)
batch.delete(HASHX_HISTORY_PREFIX + key)
for key, value in write_items:
batch.put(key, value)
batch.put(HASHX_HISTORY_PREFIX + key, value)
self.write_state(batch)
def _compact_hashX(self, hashX, hist_map, hist_list,
@ -271,11 +277,12 @@ class History:
key_len = HASHX_LEN + 2
write_size = 0
for key, hist in self.db.iterator(prefix=prefix):
for key, hist in self.db.iterator(prefix=HASHX_HISTORY_PREFIX + prefix):
k = key[1:]
# Ignore non-history entries
if len(key) != key_len:
if len(k) != key_len:
continue
hashX = key[:-2]
hashX = k[:-2]
if hashX != prior_hashX and prior_hashX:
write_size += self._compact_hashX(prior_hashX, hist_map,
hist_list, write_items,
@ -283,7 +290,7 @@ class History:
hist_map.clear()
hist_list.clear()
prior_hashX = hashX
hist_map[key] = hist
hist_map[k] = hist
hist_list.append(hist)
if prior_hashX:

View file

@ -49,10 +49,9 @@ HISTORY_PREFIX = b'A'
HASHX_UTXO_PREFIX = b'h'
UTXO_PREFIX = b'u'
HASHX_HISTORY_PREFIX = b'x'
STATE_PREFIX = b'state'
UTXO_STATE_PREFIX = b'state-utxo-'
HIST_STATE_PREFIX = b'state-hist-'
UTXO_STATE = b'state-utxo'
HIST_STATE = b'state-hist'
class RocksDBState(typing.NamedTuple):
@ -198,7 +197,7 @@ class LevelDB:
self.db_class = db_class(env.db_dir, self.env.db_engine)
self.history = History()
self.utxo_db = None
self.db = None
self.tx_counts = None
self.headers = None
self.last_flush = time.time()
@ -209,9 +208,6 @@ class LevelDB:
self.merkle = Merkle()
self.header_mc = MerkleCache(self.merkle, self.fs_block_hashes)
self.db = None
self.db = None
self._block_txs_cache = pylru.lrucache(50000)
self._merkle_tx_cache = pylru.lrucache(100000)
self.total_transactions = None
@ -278,17 +274,11 @@ class LevelDB:
self.logger.info('created new db: %s', f'lbry-{self.env.db_engine}')
self.logger.info(f'opened DB (for sync: {for_sync})')
assert self.utxo_db is None
# First UTXO DB
self.utxo_db = self.db_class('utxo', for_sync)
if self.utxo_db.is_new:
self.logger.info('created new utxo db')
self.logger.info(f'opened utxo db (for sync: {for_sync})')
self.read_utxo_state()
# Then history DB
self.utxo_flush_count = self.history.open_db(
self.db_class, for_sync, self.utxo_flush_count, compacting
self.db, for_sync, self.utxo_flush_count, compacting
)
self.clear_excess_undo_info()
@ -299,9 +289,8 @@ class LevelDB:
await self._read_headers()
def close(self):
self.utxo_db.close()
self.history.close_db()
self.db.close()
self.history.close_db()
self.executor.shutdown(wait=True)
self.executor = None
@ -322,18 +311,12 @@ class LevelDB:
"""Open the databases for serving. If they are already open they are
closed first.
"""
self.logger.info('closing DBs to re-open for serving')
if self.utxo_db:
self.logger.info('closing DBs to re-open for serving')
self.utxo_db.close()
self.history.close_db()
self.utxo_db = None
if self.db:
self.db.close()
self.db = None
if self.db:
self.db.close()
self.db = None
return
# self.logger.info('closing DBs to re-open for serving')
# self.db.close()
# self.history.close_db()
# self.db = None
await self._open_dbs(False, False)
self.logger.info("opened for serving")
@ -382,14 +365,14 @@ class LevelDB:
self.flush_history()
# Flush state last as it reads the wall time.
with self.utxo_db.write_batch() as batch:
with self.db.write_batch() as batch:
if flush_utxos:
self.flush_utxo_db(batch, flush_data)
self.flush_state(batch)
# Update and put the wall time again - otherwise we drop the
# time it took to commit the batch
self.flush_state(self.utxo_db)
self.flush_state(self.db)
elapsed = self.last_flush - start_time
self.logger.info(f'flush #{self.history.flush_count:,d} took '
@ -397,7 +380,7 @@ class LevelDB:
f'txs: {flush_data.tx_count:,d} ({tx_delta:+,d})')
# Catch-up stats
if self.utxo_db.for_sync:
if self.db.for_sync:
flush_interval = self.last_flush - prior_flush
tx_per_sec_gen = int(flush_data.tx_count / self.wall_time)
tx_per_sec_last = 1 + int(tx_delta / flush_interval)
@ -493,7 +476,7 @@ class LevelDB:
self.flush_undo_infos(batch_put, flush_data.undo_infos)
flush_data.undo_infos.clear()
if self.utxo_db.for_sync:
if self.db.for_sync:
block_count = flush_data.height - self.db_height
tx_count = flush_data.tx_count - self.db_tx_count
elapsed = time.time() - start_time
@ -527,7 +510,7 @@ class LevelDB:
self.backup_fs(flush_data.height, flush_data.tx_count)
self.history.backup(touched, flush_data.tx_count)
with self.utxo_db.write_batch() as batch:
with self.db.write_batch() as batch:
self.flush_utxo_db(batch, flush_data)
# Flush state last as it reads the wall time.
self.flush_state(batch)
@ -690,7 +673,7 @@ class LevelDB:
cnt = 0
txs = []
for hist in self.history.db.iterator(prefix=hashX, include_key=False):
for hist in self.history.db.iterator(prefix=HASHX_HISTORY_PREFIX + hashX, include_key=False):
a = array.array('I')
a.frombytes(hist)
for tx_num in a:
@ -725,7 +708,7 @@ class LevelDB:
def read_undo_info(self, height):
"""Read undo information from a file for the current height."""
return self.utxo_db.get(self.undo_key(height))
return self.db.get(self.undo_key(height))
def flush_undo_infos(self, batch_put, undo_infos):
"""undo_infos is a list of (undo_info, height) pairs."""
@ -764,14 +747,14 @@ class LevelDB:
prefix = b'U'
min_height = self.min_undo_height(self.db_height)
keys = []
for key, hist in self.utxo_db.iterator(prefix=prefix):
for key, hist in self.db.iterator(prefix=prefix):
height, = unpack('>I', key[-4:])
if height >= min_height:
break
keys.append(key)
if keys:
with self.utxo_db.write_batch() as batch:
with self.db.write_batch() as batch:
for key in keys:
batch.delete(key)
self.logger.info(f'deleted {len(keys):,d} stale undo entries')
@ -792,7 +775,7 @@ class LevelDB:
# -- UTXO database
def read_utxo_state(self):
state = self.utxo_db.get(b'state')
state = self.db.get(UTXO_STATE)
if not state:
self.db_height = -1
self.db_tx_count = 0
@ -835,7 +818,7 @@ class LevelDB:
self.logger.info(f'height: {self.db_height:,d}')
self.logger.info(f'tip: {hash_to_hex_str(self.db_tip)}')
self.logger.info(f'tx count: {self.db_tx_count:,d}')
if self.utxo_db.for_sync:
if self.db.for_sync:
self.logger.info(f'flushing DB cache at {self.env.cache_MB:,d} MB')
if self.first_sync:
self.logger.info(f'sync time so far: {util.formatted_time(self.wall_time)}')
@ -852,11 +835,11 @@ class LevelDB:
'first_sync': self.first_sync,
'db_version': self.db_version,
}
batch.put(b'state', repr(state).encode())
batch.put(UTXO_STATE, repr(state).encode())
def set_flush_count(self, count):
self.utxo_flush_count = count
with self.utxo_db.write_batch() as batch:
with self.db.write_batch() as batch:
self.write_utxo_state(batch)
async def all_utxos(self, hashX):
@ -869,7 +852,7 @@ class LevelDB:
# Key: b'u' + address_hashX + tx_idx + tx_num
# Value: the UTXO value as a 64-bit unsigned integer
prefix = b'u' + hashX
for db_key, db_value in self.utxo_db.iterator(prefix=prefix):
for db_key, db_value in self.db.iterator(prefix=prefix):
tx_pos, tx_num = s_unpack('<HI', db_key[-6:])
value, = unpack('<Q', db_value)
tx_hash, height = fs_tx_hash(tx_num)
@ -902,7 +885,7 @@ class LevelDB:
prefix = b'h' + tx_hash[:4] + idx_packed
# Find which entry, if any, the TX_HASH matches.
for db_key, hashX in self.utxo_db.iterator(prefix=prefix):
for db_key, hashX in self.db.iterator(prefix=prefix):
tx_num_packed = db_key[-4:]
tx_num, = unpack('<I', tx_num_packed)
hash, height = self.fs_tx_hash(tx_num)
@ -921,7 +904,7 @@ class LevelDB:
# Key: b'u' + address_hashX + tx_idx + tx_num
# Value: the UTXO value as a 64-bit unsigned integer
key = b'u' + hashX + suffix
db_value = self.utxo_db.get(key)
db_value = self.db.get(key)
if not db_value:
# This can happen if the DB was updated between
# getting the hashXs and getting the UTXOs