atomic flush_dbs

Jack Robison 2021-01-11 12:17:54 -05:00
parent cf5dba9157
commit 9f224a971b
2 changed files with 134 additions and 112 deletions
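
In short: the previously separate flush steps in leveldb.py (flush_fs, flush_history and flush_state, each of which wrote in its own batch) are folded into a single flush_dbs that performs all of its writes inside one LevelDB write batch, so headers, tx data, history and chain state land together or not at all. A minimal sketch of the before/after pattern, assuming plyvel-style write_batch semantics (the key prefixes are borrowed from leveldb.py; the DB path, key widths and values are made up for the demo):

import plyvel

db = plyvel.DB('/tmp/flush-demo', create_if_missing=True)

def flush_non_atomic(header: bytes, hist: bytes, state: bytes):
    # Before: separate batches; a crash between them can leave the
    # header written while history and state are missing.
    with db.write_batch() as batch:
        batch.put(b'H' + b'\x00' * 8, header)      # HEADER_PREFIX + height
    with db.write_batch() as batch:
        batch.put(b'x' + b'\x00' * 13, hist)       # HASHX_HISTORY_PREFIX + hashX + flush_id
    with db.write_batch() as batch:
        batch.put(b'state-utxo', state)            # UTXO_STATE

def flush_atomic(header: bytes, hist: bytes, state: bytes):
    # After: one batch; LevelDB applies it atomically, so all three
    # records become visible together or not at all.
    with db.write_batch() as batch:
        batch.put(b'H' + b'\x00' * 8, header)
        batch.put(b'x' + b'\x00' * 13, hist)
        batch.put(b'state-utxo', state)

flush_atomic(b'header-bytes', b'history-bytes', b'state-bytes')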

lbry/wallet/server/block_processor.py

@@ -2,14 +2,17 @@ import time
 import asyncio
 from struct import pack, unpack
 from concurrent.futures.thread import ThreadPoolExecutor
-from typing import Optional
+from typing import Optional, List, Tuple
 from prometheus_client import Gauge, Histogram
 import lbry
+from lbry.schema.claim import Claim
+from lbry.wallet.server.tx import Tx
 from lbry.wallet.server.db.writer import SQLDB
 from lbry.wallet.server.daemon import DaemonError
 from lbry.wallet.server.hash import hash_to_hex_str, HASHX_LEN
 from lbry.wallet.server.util import chunks, class_logger
 from lbry.wallet.server.leveldb import FlushData
+from lbry.wallet.transaction import Transaction
 from lbry.wallet.server.udp import StatusServer
@@ -212,15 +215,15 @@ class BlockProcessor:
             chain = [self.tip] + [self.coin.header_hash(h) for h in headers[:-1]]
             if hprevs == chain:
                 start = time.perf_counter()
                 await self.run_in_thread_with_lock(self.advance_blocks, blocks)
                 if self.sql:
                     await self.db.search_index.claim_consumer(self.sql.claim_producer())
                 for cache in self.search_cache.values():
                     cache.clear()
-                self.history_cache.clear()
+                self.history_cache.clear()  # TODO: is this needed?
                 self.notifications.notified_mempool_txs.clear()
-                await self._maybe_flush()
                 processed_time = time.perf_counter() - start
                 self.block_count_metric.set(self.height)
                 self.block_update_time_metric.observe(processed_time)
@@ -423,7 +426,14 @@ class BlockProcessor:
         self.headers.extend(headers)
         self.tip = self.coin.header_hash(headers[-1])
 
+        self.db.flush_dbs(self.flush_data(), True, self.estimate_txs_remaining)
+
+        for cache in self.search_cache.values():
+            cache.clear()
+        self.history_cache.clear()
+        self.notifications.notified_mempool_txs.clear()
+
-    def advance_txs(self, height, txs, header, block_hash):
+    def advance_txs(self, height, txs: List[Tuple[Tx, bytes]], header, block_hash):
         self.block_hashes.append(block_hash)
         self.block_txs.append((b''.join(tx_hash for tx, tx_hash in txs), [tx.raw for tx, _ in txs]))
@@ -611,7 +621,6 @@ class BlockProcessor:
         for hdb_key, hashX in candidates.items():
             tx_num_packed = hdb_key[-4:]
-
             if len(candidates) > 1:
                 tx_num, = unpack('<I', tx_num_packed)
                 try:
                     hash, height = self.db.fs_tx_hash(tx_num)
@@ -635,6 +644,7 @@ class BlockProcessor:
                 # Remove both entries for this UTXO
                 self.db_deletes.append(hdb_key)
                 self.db_deletes.append(udb_key)
                 return hashX + tx_num_packed + utxo_value_packed
+
         self.logger.error(f'UTXO {hash_to_hex_str(tx_hash)} / {tx_idx} not found in "h" table')
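
As background for the hdb_key / udb_key pair deleted above: each UTXO lives under two keys, one in the 'h' table (short tx-hash prefix + output index + tx number, valued by the owning hashX) and one in the 'u' table (hashX + output index + tx number, valued by the packed amount), which is why spending a coin deletes both. A sketch of that layout following the ElectrumX scheme this server derives from; the exact field widths are an assumption:

from struct import pack

def hashx_utxo_key(tx_hash: bytes, tx_idx: int, tx_num: int) -> bytes:
    # 'h' table key: 4-byte tx-hash prefix + 2-byte output index +
    # 4-byte tx number; the stored value is the owning hashX.
    return b'h' + tx_hash[:4] + pack('<H', tx_idx) + pack('<I', tx_num)

def utxo_key(hashX: bytes, tx_idx: int, tx_num: int) -> bytes:
    # 'u' table key: hashX + output index + tx number; the stored
    # value is the output amount packed as an integer.
    return b'u' + hashX + pack('<H', tx_idx) + pack('<I', tx_num)

hdb_key = hashx_utxo_key(b'\x11' * 32, tx_idx=0, tx_num=42)
assert hdb_key[-4:] == pack('<I', 42)  # matches tx_num_packed = hdb_key[-4:] above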

lbry/wallet/server/leveldb.py

@@ -29,25 +29,26 @@ from lbry.utils import LRUCacheWithMetrics
 from lbry.wallet.server import util
 from lbry.wallet.server.hash import hash_to_hex_str, HASHX_LEN
 from lbry.wallet.server.merkle import Merkle, MerkleCache
-from lbry.wallet.server.util import formatted_time
+from lbry.wallet.server.util import formatted_time, pack_be_uint16
 from lbry.wallet.server.storage import db_class
 from lbry.wallet.server.history import History
 
 UTXO = namedtuple("UTXO", "tx_num tx_pos tx_hash height value")
 
+HISTORY_PREFIX = b'A'
+TX_PREFIX = b'B'
+BLOCK_HASH_PREFIX = b'C'
 HEADER_PREFIX = b'H'
+TX_NUM_PREFIX = b'N'
 TX_COUNT_PREFIX = b'T'
 TX_HASH_PREFIX = b'X'
-TX_PREFIX = b'B'
-TX_NUM_PREFIX = b'N'
-BLOCK_HASH_PREFIX = b'C'
-HISTORY_PREFIX = b'A'
 HASHX_UTXO_PREFIX = b'h'
+HIST_STATE = b'state-hist'
+UTXO_STATE = b'state-utxo'
 UTXO_PREFIX = b'u'
 HASHX_HISTORY_PREFIX = b'x'
-UTXO_STATE = b'state-utxo'
-HIST_STATE = b'state-hist'
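
These one-byte prefixes namespace the different record types inside a single LevelDB; the reordering above just lists them by prefix byte. Keys are formed by appending big-endian integers, so records for consecutive heights or tx numbers sort adjacently and can be range-scanned. A small self-contained illustration (dummy height, mirroring util.pack_be_uint64):

from struct import pack

def pack_be_uint64(n: int) -> bytes:
    # same packing as util.pack_be_uint64
    return pack('>Q', n)

HEADER_PREFIX = b'H'
TX_COUNT_PREFIX = b'T'

height = 12345
header_key = HEADER_PREFIX + pack_be_uint64(height)      # b'H' + 8 bytes
tx_count_key = TX_COUNT_PREFIX + pack_be_uint64(height)  # b'T' + 8 bytes

# Big-endian packing keeps keys in numeric order, so a prefix scan
# over b'H' yields headers in block order.
assert header_key < HEADER_PREFIX + pack_be_uint64(height + 1)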
@@ -241,106 +242,6 @@ class LevelDB:
         assert not flush_data.undo_infos
         self.history.assert_flushed()
 
-    def flush_dbs(self, flush_data, flush_utxos, estimate_txs_remaining):
-        """Flush out cached state. History is always flushed; UTXOs are
-        flushed if flush_utxos."""
-        if flush_data.height == self.db_height:
-            self.assert_flushed(flush_data)
-            return
-
-        start_time = time.time()
-        prior_flush = self.last_flush
-        tx_delta = flush_data.tx_count - self.last_flush_tx_count
-
-        # Flush to file system
-        self.flush_fs(flush_data)
-        # Then history
-        self.flush_history()
-
-        # Flush state last as it reads the wall time.
-        with self.db.write_batch() as batch:
-            if flush_utxos:
-                self.flush_utxo_db(batch, flush_data)
-            self.flush_state(batch)
-
-        # Update and put the wall time again - otherwise we drop the
-        # time it took to commit the batch
-        self.flush_state(self.db)
-
-        elapsed = self.last_flush - start_time
-        self.logger.info(f'flush #{self.history.flush_count:,d} took '
-                         f'{elapsed:.1f}s. Height {flush_data.height:,d} '
-                         f'txs: {flush_data.tx_count:,d} ({tx_delta:+,d})')
-
-        # Catch-up stats
-        if self.db.for_sync:
-            flush_interval = self.last_flush - prior_flush
-            tx_per_sec_gen = int(flush_data.tx_count / self.wall_time)
-            tx_per_sec_last = 1 + int(tx_delta / flush_interval)
-            eta = estimate_txs_remaining() / tx_per_sec_last
-            self.logger.info(f'tx/sec since genesis: {tx_per_sec_gen:,d}, '
-                             f'since last flush: {tx_per_sec_last:,d}')
-            self.logger.info(f'sync time: {formatted_time(self.wall_time)} '
-                             f'ETA: {formatted_time(eta)}')
-
-    def flush_fs(self, flush_data):
-        """Write headers, tx counts and block tx hashes to the filesystem.
-
-        The first height to write is self.fs_height + 1. The FS
-        metadata is all append-only, so in a crash we just pick up
-        again from the height stored in the DB.
-        """
-        prior_tx_count = (self.tx_counts[self.fs_height]
-                          if self.fs_height >= 0 else 0)
-        assert len(flush_data.block_txs) == len(flush_data.headers)
-        assert flush_data.height == self.fs_height + len(flush_data.headers)
-        assert flush_data.tx_count == (self.tx_counts[-1] if self.tx_counts
-                                       else 0)
-        assert len(self.tx_counts) == flush_data.height + 1
-        assert len(
-            b''.join(hashes for hashes, _ in flush_data.block_txs)
-        ) // 32 == flush_data.tx_count - prior_tx_count
-
-        # Write the headers
-        start_time = time.perf_counter()
-        with self.db.write_batch() as batch:
-            batch_put = batch.put
-            for i, header in enumerate(flush_data.headers):
-                batch_put(HEADER_PREFIX + util.pack_be_uint64(self.fs_height + i + 1), header)
-                self.headers.append(header)
-            flush_data.headers.clear()
-
-        height_start = self.fs_height + 1
-        tx_num = prior_tx_count
-        with self.db.write_batch() as batch:
-            batch_put = batch.put
-            for block_hash, (tx_hashes, txs) in zip(flush_data.block_hashes, flush_data.block_txs):
-                tx_count = self.tx_counts[height_start]
-                batch_put(BLOCK_HASH_PREFIX + util.pack_be_uint64(height_start), block_hash[::-1])
-                batch_put(TX_COUNT_PREFIX + util.pack_be_uint64(height_start), util.pack_be_uint64(tx_count))
-                height_start += 1
-                offset = 0
-                while offset < len(tx_hashes):
-                    batch_put(TX_HASH_PREFIX + util.pack_be_uint64(tx_num), tx_hashes[offset:offset+32])
-                    batch_put(TX_NUM_PREFIX + tx_hashes[offset:offset+32], util.pack_be_uint64(tx_num))
-                    batch_put(TX_PREFIX + tx_hashes[offset:offset+32], txs[offset // 32])
-                    tx_num += 1
-                    offset += 32
-
-        flush_data.block_txs.clear()
-        flush_data.block_hashes.clear()
-
-        self.fs_height = flush_data.height
-        self.fs_tx_count = flush_data.tx_count
-        elapsed = time.perf_counter() - start_time
-        self.logger.info(f'flushed filesystem data in {elapsed:.2f}s')
-
-    def flush_history(self):
-        self.history.flush()
-
     def flush_utxo_db(self, batch, flush_data):
         """Flush the cached DB writes and UTXO set to the batch."""
         # Care is needed because the writes generated by flushing the
@@ -384,6 +285,117 @@ class LevelDB:
         self.db_tx_count = flush_data.tx_count
         self.db_tip = flush_data.tip
 
+    def flush_dbs(self, flush_data, flush_utxos, estimate_txs_remaining):
+        """Flush out cached state. History is always flushed; UTXOs are
+        flushed if flush_utxos."""
+        if flush_data.height == self.db_height:
+            self.assert_flushed(flush_data)
+            return
+
+        start_time = time.time()
+        prior_flush = self.last_flush
+        tx_delta = flush_data.tx_count - self.last_flush_tx_count
+
+        # Flush to file system
+        # self.flush_fs(flush_data)
+        prior_tx_count = (self.tx_counts[self.fs_height]
+                          if self.fs_height >= 0 else 0)
+
+        assert len(flush_data.block_txs) == len(flush_data.headers)
+        assert flush_data.height == self.fs_height + len(flush_data.headers)
+        assert flush_data.tx_count == (self.tx_counts[-1] if self.tx_counts
+                                       else 0)
+        assert len(self.tx_counts) == flush_data.height + 1
+        assert len(
+            b''.join(hashes for hashes, _ in flush_data.block_txs)
+        ) // 32 == flush_data.tx_count - prior_tx_count
+
+        # Write the headers
+        start_time = time.perf_counter()
+
+        with self.db.write_batch() as batch:
+            batch_put = batch.put
+            height_start = self.fs_height + 1
+            tx_num = prior_tx_count
+
+            for i, (header, block_hash, (tx_hashes, txs)) in enumerate(
+                    zip(flush_data.headers, flush_data.block_hashes, flush_data.block_txs)):
+                batch_put(HEADER_PREFIX + util.pack_be_uint64(height_start), header)
+                self.headers.append(header)
+                tx_count = self.tx_counts[height_start]
+                batch_put(BLOCK_HASH_PREFIX + util.pack_be_uint64(height_start), block_hash[::-1])
+                batch_put(TX_COUNT_PREFIX + util.pack_be_uint64(height_start), util.pack_be_uint64(tx_count))
+                height_start += 1
+
+                offset = 0
+                while offset < len(tx_hashes):
+                    batch_put(TX_HASH_PREFIX + util.pack_be_uint64(tx_num), tx_hashes[offset:offset + 32])
+                    batch_put(TX_NUM_PREFIX + tx_hashes[offset:offset + 32], util.pack_be_uint64(tx_num))
+                    batch_put(TX_PREFIX + tx_hashes[offset:offset + 32], txs[offset // 32])
+                    tx_num += 1
+                    offset += 32
+
+            flush_data.headers.clear()
+            flush_data.block_txs.clear()
+            flush_data.block_hashes.clear()
+            # flush_data.claim_txo_cache.clear()
+            # flush_data.support_txo_cache.clear()
+
+            self.fs_height = flush_data.height
+            self.fs_tx_count = flush_data.tx_count
+
+            # Then history
+            self.history.flush_count += 1
+            flush_id = pack_be_uint16(self.history.flush_count)
+            unflushed = self.history.unflushed
+
+            for hashX in sorted(unflushed):
+                key = hashX + flush_id
+                batch_put(HASHX_HISTORY_PREFIX + key, unflushed[hashX].tobytes())
+            self.history.write_state(batch)
+
+            unflushed.clear()
+            self.history.unflushed_count = 0
+
+            #########################
+
+            # Flush state last as it reads the wall time.
+            if flush_utxos:
+                self.flush_utxo_db(batch, flush_data)
+            # self.flush_state(batch)
+            now = time.time()
+            self.wall_time += now - self.last_flush
+            self.last_flush = now
+            self.last_flush_tx_count = self.fs_tx_count
+            self.write_utxo_state(batch)
+
+        # # Update and put the wall time again - otherwise we drop the
+        # # time it took to commit the batch
+        # # self.flush_state(self.db)
+        # now = time.time()
+        # self.wall_time += now - self.last_flush
+        # self.last_flush = now
+        # self.last_flush_tx_count = self.fs_tx_count
+        # self.write_utxo_state(batch)
+
+        elapsed = self.last_flush - start_time
+        self.logger.info(f'flush #{self.history.flush_count:,d} took '
+                         f'{elapsed:.1f}s. Height {flush_data.height:,d} '
+                         f'txs: {flush_data.tx_count:,d} ({tx_delta:+,d})')
+
+        # Catch-up stats
+        if self.db.for_sync:
+            flush_interval = self.last_flush - prior_flush
+            tx_per_sec_gen = int(flush_data.tx_count / self.wall_time)
+            tx_per_sec_last = 1 + int(tx_delta / flush_interval)
+            eta = estimate_txs_remaining() / tx_per_sec_last
+            self.logger.info(f'tx/sec since genesis: {tx_per_sec_gen:,d}, '
+                             f'since last flush: {tx_per_sec_last:,d}')
+            self.logger.info(f'sync time: {formatted_time(self.wall_time)} '
+                             f'ETA: {formatted_time(eta)}')
+
     def flush_state(self, batch):
         """Flush chain state to the batch."""
         now = time.time()
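
For reference, the catch-up stats that flush_dbs logs are plain rate arithmetic; a worked example with made-up numbers:

# Made-up figures run through the formulas used in flush_dbs above.
wall_time = 3600.0         # seconds of sync time so far
tx_count = 9_000_000       # txs processed since genesis
tx_delta = 120_000         # txs in this flush
flush_interval = 60.0      # seconds since the prior flush
txs_remaining = 1_000_000  # what estimate_txs_remaining() might return

tx_per_sec_gen = int(tx_count / wall_time)            # 2500
tx_per_sec_last = 1 + int(tx_delta / flush_interval)  # 2001; the +1 keeps the ETA division safe
eta = txs_remaining / tx_per_sec_last                 # ~500 seconds

print(f'tx/sec since genesis: {tx_per_sec_gen:,d}, since last flush: {tx_per_sec_last:,d}')
print(f'ETA: {eta:.0f}s')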