combine leveldb databases

Jack Robison 2021-01-09 14:39:20 -05:00
parent 23035b9aa0
commit cf5dba9157
3 changed files with 84 additions and 109 deletions
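For orientation, the change collapses the wallet server's separate LevelDB instances (headers, tx, utxo, history) into a single database, with each logical table scoped by a short key prefix. Below is a minimal sketch of that pattern, assuming the plyvel LevelDB binding and using placeholder paths and prefix bytes; the real prefix constants appear in the LevelDB hunks further down.

import plyvel

# One physical LevelDB; logical tables are separated only by key prefix.
db = plyvel.DB('/tmp/combined-example', create_if_missing=True)

HEADER_PREFIX = b'H'  # placeholder namespace bytes for this sketch
UTXO_PREFIX = b'u'

# Writes for different tables can share one batch and one database.
with db.write_batch() as batch:
    batch.put(HEADER_PREFIX + (0).to_bytes(8, 'big'), b'<raw header bytes>')
    batch.put(UTXO_PREFIX + b'<hashX + tx_idx + tx_num>', b'<packed value>')

# Reads stay scoped to a table by iterating over its prefix.
for key, value in db.iterator(prefix=HEADER_PREFIX):
    print(key, value)

db.close()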

View file

@@ -605,7 +605,9 @@ class BlockProcessor:
# Key: b'h' + compressed_tx_hash + tx_idx + tx_num
# Value: hashX
prefix = b'h' + tx_hash[:4] + idx_packed
candidates = dict(self.db.utxo_db.iterator(prefix=prefix))
candidates = {db_key: hashX for db_key, hashX
in self.db.db.iterator(prefix=prefix)}
for hdb_key, hashX in candidates.items():
tx_num_packed = hdb_key[-4:]
if len(candidates) > 1:
@@ -624,7 +626,7 @@ class BlockProcessor:
# Key: b'u' + address_hashX + tx_idx + tx_num
# Value: the UTXO value as a 64-bit unsigned integer
udb_key = b'u' + hashX + hdb_key[-6:]
utxo_value_packed = self.db.utxo_db.get(udb_key)
utxo_value_packed = self.db.db.get(udb_key)
if utxo_value_packed is None:
self.logger.warning(
"%s:%s is not found in UTXO db for %s", hash_to_hex_str(tx_hash), tx_idx, hash_to_hex_str(hashX)

View file

@@ -16,13 +16,17 @@ from collections import defaultdict
from functools import partial
from lbry.wallet.server import util
from lbry.wallet.server.util import pack_be_uint32, unpack_be_uint32_from, unpack_be_uint16_from
from lbry.wallet.server.util import pack_be_uint16, unpack_be_uint16_from
from lbry.wallet.server.hash import hash_to_hex_str, HASHX_LEN
HASHX_HISTORY_PREFIX = b'x'
HIST_STATE = b'state-hist'
class History:
DB_VERSIONS = [0, 1]
DB_VERSIONS = [0]
def __init__(self):
self.logger = util.class_logger(__name__, self.__class__.__name__)
@@ -32,34 +36,9 @@ class History:
self.unflushed_count = 0
self.db = None
@property
def needs_migration(self):
return self.db_version != max(self.DB_VERSIONS)
def migrate(self):
# 0 -> 1: flush_count from 16 to 32 bits
self.logger.warning("HISTORY MIGRATION IN PROGRESS. Please avoid shutting down before it finishes.")
with self.db.write_batch() as batch:
for key, value in self.db.iterator(prefix=b''):
if len(key) != 13:
continue
flush_id, = unpack_be_uint16_from(key[-2:])
new_key = key[:-2] + pack_be_uint32(flush_id)
batch.put(new_key, value)
self.logger.warning("history migration: new keys added, removing old ones.")
for key, value in self.db.iterator(prefix=b''):
if len(key) == 13:
batch.delete(key)
self.logger.warning("history migration: writing new state.")
self.db_version = 1
self.write_state(batch)
self.logger.warning("history migration: done.")
def open_db(self, db_class, for_sync, utxo_flush_count, compacting):
self.db = db_class('hist', for_sync)
def open_db(self, db, for_sync, utxo_flush_count, compacting):
self.db = db #db_class('hist', for_sync)
self.read_state()
if self.needs_migration:
self.migrate()
self.clear_excess(utxo_flush_count)
# An incomplete compaction needs to be cancelled otherwise
# restarting it will corrupt the history
@@ -69,11 +48,11 @@ class History:
def close_db(self):
if self.db:
self.db.close()
# self.db.close()
self.db = None
def read_state(self):
state = self.db.get(b'state\0\0')
state = self.db.get(HIST_STATE)
if state:
state = ast.literal_eval(state.decode())
if not isinstance(state, dict):
@@ -105,17 +84,18 @@ class History:
'excess history flushes...')
keys = []
for key, hist in self.db.iterator(prefix=b''):
flush_id, = unpack_be_uint32_from(key[-4:])
for key, hist in self.db.iterator(prefix=HASHX_HISTORY_PREFIX):
k = key[1:]
flush_id, = unpack_be_uint16_from(k[-2:])
if flush_id > utxo_flush_count:
keys.append(key)
keys.append(k)
self.logger.info(f'deleting {len(keys):,d} history entries')
self.flush_count = utxo_flush_count
with self.db.write_batch() as batch:
for key in keys:
batch.delete(key)
batch.delete(HASHX_HISTORY_PREFIX + key)
self.write_state(batch)
self.logger.info('deleted excess history entries')
@@ -130,7 +110,7 @@ class History:
}
# History entries are not prefixed; the suffix \0\0 ensures we
# look similar to other entries and aren't interfered with
batch.put(b'state\0\0', repr(state).encode())
batch.put(HIST_STATE, repr(state).encode())
def add_unflushed(self, hashXs_by_tx, first_tx_num):
unflushed = self.unflushed
@@ -151,13 +131,13 @@ class History:
def flush(self):
start_time = time.time()
self.flush_count += 1
flush_id = pack_be_uint32(self.flush_count)
flush_id = pack_be_uint16(self.flush_count)
unflushed = self.unflushed
with self.db.write_batch() as batch:
for hashX in sorted(unflushed):
key = hashX + flush_id
batch.put(key, unflushed[hashX].tobytes())
batch.put(HASHX_HISTORY_PREFIX + key, unflushed[hashX].tobytes())
self.write_state(batch)
count = len(unflushed)
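To make the new flush layout concrete: every history row now lives under the b'x' namespace, keyed by hashX plus a 2-byte big-endian flush id, and the value is the packed array of transaction numbers. A hedged sketch, where the pack_be_uint16 definition is assumed to match the util helper imported above:

import array
import struct

pack_be_uint16 = struct.Struct('>H').pack  # assumption: mirrors lbry.wallet.server.util

def history_row(hashX: bytes, flush_count: int, tx_nums) -> tuple:
    # Key: b'x' + hashX + flush id (2 bytes); value: array('I') of tx numbers.
    key = b'x' + hashX + pack_be_uint16(flush_count)
    return key, array.array('I', tx_nums).tobytes()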
@@ -179,16 +159,17 @@ class History:
for hashX in sorted(hashXs):
deletes = []
puts = {}
for key, hist in self.db.iterator(prefix=hashX, reverse=True):
for key, hist in self.db.iterator(prefix=HASHX_HISTORY_PREFIX + hashX, reverse=True):
k = key[1:]
a = array.array('I')
a.frombytes(hist)
# Remove all history entries >= tx_count
idx = bisect_left(a, tx_count)
nremoves += len(a) - idx
if idx > 0:
puts[key] = a[:idx].tobytes()
puts[k] = a[:idx].tobytes()
break
deletes.append(key)
deletes.append(k)
for key in deletes:
batch.delete(key)
@@ -246,9 +227,9 @@ class History:
with self.db.write_batch() as batch:
# Important: delete first! The keyspace may overlap.
for key in keys_to_delete:
batch.delete(key)
batch.delete(HASHX_HISTORY_PREFIX + key)
for key, value in write_items:
batch.put(key, value)
batch.put(HASHX_HISTORY_PREFIX + key, value)
self.write_state(batch)
def _compact_hashX(self, hashX, hist_map, hist_list,
@@ -275,7 +256,7 @@ class History:
write_size = 0
keys_to_delete.update(hist_map)
for n, chunk in enumerate(util.chunks(full_hist, max_row_size)):
key = hashX + pack_be_uint32(n)
key = hashX + pack_be_uint16(n)
if hist_map.get(key) == chunk:
keys_to_delete.remove(key)
else:
@@ -296,11 +277,12 @@ class History:
key_len = HASHX_LEN + 2
write_size = 0
for key, hist in self.db.iterator(prefix=prefix):
for key, hist in self.db.iterator(prefix=HASHX_HISTORY_PREFIX + prefix):
k = key[1:]
# Ignore non-history entries
if len(key) != key_len:
if len(k) != key_len:
continue
hashX = key[:-2]
hashX = k[:-2]
if hashX != prior_hashX and prior_hashX:
write_size += self._compact_hashX(prior_hashX, hist_map,
hist_list, write_items,
@@ -308,7 +290,7 @@ class History:
hist_map.clear()
hist_list.clear()
prior_hashX = hashX
hist_map[key] = hist
hist_map[k] = hist
hist_list.append(hist)
if prior_hashX:
@@ -326,8 +308,8 @@ class History:
# Loop over 2-byte prefixes
cursor = self.comp_cursor
while write_size < limit and cursor < (1 << 32):
prefix = pack_be_uint32(cursor)
while write_size < limit and cursor < 65536:
prefix = pack_be_uint16(cursor)
write_size += self._compact_prefix(prefix, write_items,
keys_to_delete)
cursor += 1
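This last hunk narrows the compaction cursor back to 2-byte prefixes: the compaction pass walks the history keyspace by the first two bytes of each hashX, so the cursor runs 0..65535 rather than up to 1 << 32 (the wider bound came with the 32-bit flush ids this commit reverts). A small sketch of the prefix enumeration, again assuming pack_be_uint16 is the usual big-endian 16-bit packer:

import struct

pack_be_uint16 = struct.Struct('>H').pack  # assumption: mirrors the util helper

def compaction_prefixes(start_cursor: int = 0):
    # Yield every remaining 2-byte big-endian prefix from the cursor onward.
    for cursor in range(start_cursor, 65536):
        yield pack_be_uint16(cursor)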

View file

@@ -41,6 +41,14 @@ TX_HASH_PREFIX = b'X'
TX_PREFIX = b'B'
TX_NUM_PREFIX = b'N'
BLOCK_HASH_PREFIX = b'C'
HISTORY_PREFIX = b'A'
HASHX_UTXO_PREFIX = b'h'
UTXO_PREFIX = b'u'
HASHX_HISTORY_PREFIX = b'x'
UTXO_STATE = b'state-utxo'
HIST_STATE = b'state-hist'
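For quick reference while reading the rest of the diff, here is an illustrative summary, drawn only from key layouts visible in this commit, of what the new prefix constants namespace inside the single database. (HISTORY_PREFIX = b'A' is also added, but its use is not visible in these hunks.)

KEYSPACES = {
    b'h': "HASHX_UTXO_PREFIX: tx_hash[:4] + tx_idx + tx_num -> hashX",
    b'u': "UTXO_PREFIX: hashX + tx_idx + tx_num -> packed 64-bit UTXO value",
    b'x': "HASHX_HISTORY_PREFIX: hashX + flush_id -> packed tx numbers",
    b'state-utxo': "UTXO_STATE: repr() of the UTXO/flush state dict",
    b'state-hist': "HIST_STATE: repr() of the history state dict",
}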
@@ -80,7 +88,7 @@ class LevelDB:
self.db_class = db_class(env.db_dir, self.env.db_engine)
self.history = History()
self.utxo_db = None
self.db = None
self.tx_counts = None
self.headers = None
self.encoded_headers = LRUCacheWithMetrics(1 << 21, metric_name='encoded_headers', namespace='wallet_server')
@@ -107,7 +115,7 @@ class LevelDB:
def get_counts():
return tuple(
util.unpack_be_uint64(tx_count)
for tx_count in self.tx_db.iterator(prefix=TX_COUNT_PREFIX, include_key=False)
for tx_count in self.db.iterator(prefix=TX_COUNT_PREFIX, include_key=False)
)
tx_counts = await asyncio.get_event_loop().run_in_executor(self.executor, get_counts)
@@ -122,7 +130,7 @@ class LevelDB:
async def _read_txids(self):
def get_txids():
return list(self.tx_db.iterator(prefix=TX_HASH_PREFIX, include_key=False))
return list(self.db.iterator(prefix=TX_HASH_PREFIX, include_key=False))
start = time.perf_counter()
self.logger.info("loading txids")
@@ -137,7 +145,9 @@ class LevelDB:
return
def get_headers():
return list(self.headers_db.iterator(prefix=HEADER_PREFIX, include_key=False))
return [
header for header in self.db.iterator(prefix=HEADER_PREFIX, include_key=False)
]
headers = await asyncio.get_event_loop().run_in_executor(self.executor, get_headers)
assert len(headers) - 1 == self.db_height, f"{len(headers)} vs {self.db_height}"
@@ -152,29 +162,17 @@ class LevelDB:
f.write(f'ElectrumX databases and metadata for '
f'{self.coin.NAME} {self.coin.NET}'.encode())
assert self.headers_db is None
self.headers_db = self.db_class('headers', for_sync)
if self.headers_db.is_new:
self.logger.info('created new headers db')
self.logger.info(f'opened headers DB (for sync: {for_sync})')
assert self.db is None
self.db = self.db_class(f'lbry-{self.env.db_engine}', for_sync)
if self.db.is_new:
self.logger.info('created new db: %s', f'lbry-{self.env.db_engine}')
self.logger.info(f'opened DB (for sync: {for_sync})')
assert self.tx_db is None
self.tx_db = self.db_class('tx', for_sync)
if self.tx_db.is_new:
self.logger.info('created new tx db')
self.logger.info(f'opened tx DB (for sync: {for_sync})')
assert self.utxo_db is None
# First UTXO DB
self.utxo_db = self.db_class('utxo', for_sync)
if self.utxo_db.is_new:
self.logger.info('created new utxo db')
self.logger.info(f'opened utxo db (for sync: {for_sync})')
self.read_utxo_state()
# Then history DB
self.utxo_flush_count = self.history.open_db(
self.db_class, for_sync, self.utxo_flush_count, compacting
self.db, for_sync, self.utxo_flush_count, compacting
)
self.clear_excess_undo_info()
@@ -185,10 +183,8 @@ class LevelDB:
await self._read_headers()
def close(self):
self.utxo_db.close()
self.db.close()
self.history.close_db()
self.headers_db.close()
self.tx_db.close()
self.executor.shutdown(wait=True)
self.executor = None
@@ -209,18 +205,12 @@ class LevelDB:
"""Open the databases for serving. If they are already open they are
closed first.
"""
self.logger.info('closing DBs to re-open for serving')
if self.utxo_db:
self.logger.info('closing DBs to re-open for serving')
self.utxo_db.close()
self.history.close_db()
self.utxo_db = None
if self.headers_db:
self.headers_db.close()
self.headers_db = None
if self.tx_db:
self.tx_db.close()
self.tx_db = None
if self.db:
return
# self.logger.info('closing DBs to re-open for serving')
# self.db.close()
# self.history.close_db()
# self.db = None
await self._open_dbs(False, False)
self.logger.info("opened for serving")
@@ -269,14 +259,14 @@ class LevelDB:
self.flush_history()
# Flush state last as it reads the wall time.
with self.utxo_db.write_batch() as batch:
with self.db.write_batch() as batch:
if flush_utxos:
self.flush_utxo_db(batch, flush_data)
self.flush_state(batch)
# Update and put the wall time again - otherwise we drop the
# time it took to commit the batch
self.flush_state(self.utxo_db)
self.flush_state(self.db)
elapsed = self.last_flush - start_time
self.logger.info(f'flush #{self.history.flush_count:,d} took '
@@ -284,7 +274,7 @@ class LevelDB:
f'txs: {flush_data.tx_count:,d} ({tx_delta:+,d})')
# Catch-up stats
if self.utxo_db.for_sync:
if self.db.for_sync:
flush_interval = self.last_flush - prior_flush
tx_per_sec_gen = int(flush_data.tx_count / self.wall_time)
tx_per_sec_last = 1 + int(tx_delta / flush_interval)
@@ -315,7 +305,7 @@ class LevelDB:
# Write the headers
start_time = time.perf_counter()
with self.headers_db.write_batch() as batch:
with self.db.write_batch() as batch:
batch_put = batch.put
for i, header in enumerate(flush_data.headers):
batch_put(HEADER_PREFIX + util.pack_be_uint64(self.fs_height + i + 1), header)
@@ -325,7 +315,7 @@ class LevelDB:
height_start = self.fs_height + 1
tx_num = prior_tx_count
with self.tx_db.write_batch() as batch:
with self.db.write_batch() as batch:
batch_put = batch.put
for block_hash, (tx_hashes, txs) in zip(flush_data.block_hashes, flush_data.block_txs):
tx_count = self.tx_counts[height_start]
@@ -380,7 +370,7 @@ class LevelDB:
self.flush_undo_infos(batch_put, flush_data.undo_infos)
flush_data.undo_infos.clear()
if self.utxo_db.for_sync:
if self.db.for_sync:
block_count = flush_data.height - self.db_height
tx_count = flush_data.tx_count - self.db_tx_count
elapsed = time.time() - start_time
@@ -414,7 +404,7 @@ class LevelDB:
self.backup_fs(flush_data.height, flush_data.tx_count)
self.history.backup(touched, flush_data.tx_count)
with self.utxo_db.write_batch() as batch:
with self.db.write_batch() as batch:
self.flush_utxo_db(batch, flush_data)
# Flush state last as it reads the wall time.
self.flush_state(batch)
@@ -488,9 +478,8 @@ class LevelDB:
def _fs_transactions(self, txids: Iterable[str]):
unpack_be_uint64 = util.unpack_be_uint64
tx_counts = self.tx_counts
tx_db_get = self.tx_db.get
tx_db_get = self.db.get
tx_cache = self._tx_and_merkle_cache
tx_infos = {}
for tx_hash in txids:
@@ -548,11 +537,13 @@ class LevelDB:
def read_history():
db_height = self.db_height
tx_counts = self.tx_counts
tx_db_get = self.db.get
pack_be_uint64 = util.pack_be_uint64
cnt = 0
txs = []
for hist in self.history.db.iterator(prefix=hashX, include_key=False):
for hist in self.history.db.iterator(prefix=HASHX_HISTORY_PREFIX + hashX, include_key=False):
a = array.array('I')
a.frombytes(hist)
for tx_num in a:
@@ -587,7 +578,7 @@ class LevelDB:
def read_undo_info(self, height):
"""Read undo information from a file for the current height."""
return self.utxo_db.get(self.undo_key(height))
return self.db.get(self.undo_key(height))
def flush_undo_infos(self, batch_put, undo_infos):
"""undo_infos is a list of (undo_info, height) pairs."""
@@ -626,14 +617,14 @@ class LevelDB:
prefix = b'U'
min_height = self.min_undo_height(self.db_height)
keys = []
for key, hist in self.utxo_db.iterator(prefix=prefix):
for key, hist in self.db.iterator(prefix=prefix):
height, = unpack('>I', key[-4:])
if height >= min_height:
break
keys.append(key)
if keys:
with self.utxo_db.write_batch() as batch:
with self.db.write_batch() as batch:
for key in keys:
batch.delete(key)
self.logger.info(f'deleted {len(keys):,d} stale undo entries')
@@ -654,7 +645,7 @@ class LevelDB:
# -- UTXO database
def read_utxo_state(self):
state = self.utxo_db.get(b'state')
state = self.db.get(UTXO_STATE)
if not state:
self.db_height = -1
self.db_tx_count = 0
@@ -697,7 +688,7 @@ class LevelDB:
self.logger.info(f'height: {self.db_height:,d}')
self.logger.info(f'tip: {hash_to_hex_str(self.db_tip)}')
self.logger.info(f'tx count: {self.db_tx_count:,d}')
if self.utxo_db.for_sync:
if self.db.for_sync:
self.logger.info(f'flushing DB cache at {self.env.cache_MB:,d} MB')
if self.first_sync:
self.logger.info(f'sync time so far: {util.formatted_time(self.wall_time)}')
@@ -714,11 +705,11 @@ class LevelDB:
'first_sync': self.first_sync,
'db_version': self.db_version,
}
batch.put(b'state', repr(state).encode())
batch.put(UTXO_STATE, repr(state).encode())
def set_flush_count(self, count):
self.utxo_flush_count = count
with self.utxo_db.write_batch() as batch:
with self.db.write_batch() as batch:
self.write_utxo_state(batch)
async def all_utxos(self, hashX):
@@ -731,7 +722,7 @@ class LevelDB:
# Key: b'u' + address_hashX + tx_idx + tx_num
# Value: the UTXO value as a 64-bit unsigned integer
prefix = b'u' + hashX
for db_key, db_value in self.utxo_db.iterator(prefix=prefix):
for db_key, db_value in self.db.iterator(prefix=prefix):
tx_pos, tx_num = s_unpack('<HI', db_key[-6:])
value, = unpack('<Q', db_value)
tx_hash, height = fs_tx_hash(tx_num)
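As a sketch of how one of those b'u' rows decodes, using exactly the struct formats shown above (an illustrative helper, not project code):

import struct

def decode_utxo_row(db_key: bytes, db_value: bytes):
    # Key tail: 2-byte LE tx_idx followed by 4-byte LE tx_num.
    tx_pos, tx_num = struct.unpack('<HI', db_key[-6:])
    # Value: the UTXO amount as a 64-bit unsigned little-endian integer.
    value, = struct.unpack('<Q', db_value)
    return tx_pos, tx_num, value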
@@ -764,7 +755,7 @@ class LevelDB:
prefix = b'h' + tx_hash[:4] + idx_packed
# Find which entry, if any, the TX_HASH matches.
for db_key, hashX in self.utxo_db.iterator(prefix=prefix):
for db_key, hashX in self.db.iterator(prefix=prefix):
tx_num_packed = db_key[-4:]
tx_num, = unpack('<I', tx_num_packed)
hash, height = self.fs_tx_hash(tx_num)
@@ -783,7 +774,7 @@ class LevelDB:
# Key: b'u' + address_hashX + tx_idx + tx_num
# Value: the UTXO value as a 64-bit unsigned integer
key = b'u' + hashX + suffix
db_value = self.utxo_db.get(key)
db_value = self.db.get(key)
if not db_value:
# This can happen if the DB was updated between
# getting the hashXs and getting the UTXOs