atomic flush_dbs

This commit is contained in:
Jack Robison 2021-01-11 12:17:54 -05:00
parent e2864b76dc
commit c611ab6423
No known key found for this signature in database
GPG key ID: DF25C68FE0239BB2
2 changed files with 133 additions and 115 deletions

View file

@ -2,15 +2,17 @@ import time
import asyncio
from struct import pack, unpack
from concurrent.futures.thread import ThreadPoolExecutor
from typing import Optional
from typing import Optional, List, Tuple
from prometheus_client import Gauge, Histogram
import lbry
from lbry.schema.claim import Claim
from lbry.wallet.server.tx import Tx
from lbry.wallet.server.db.writer import SQLDB
from lbry.wallet.server.daemon import DaemonError
from lbry.wallet.server.hash import hash_to_hex_str, HASHX_LEN
from lbry.wallet.server.util import chunks, class_logger
from lbry.wallet.server.leveldb import FlushData
from lbry.wallet.transaction import Transaction
from lbry.wallet.server.udp import StatusServer
@ -213,13 +215,10 @@ class BlockProcessor:
chain = [self.tip] + [self.coin.header_hash(h) for h in headers[:-1]]
if hprevs == chain:
start = time.perf_counter()
await self.run_in_thread_with_lock(self.advance_blocks, blocks)
for cache in self.search_cache.values():
cache.clear()
self.history_cache.clear()
self.notifications.notified_mempool_txs.clear()
await self._maybe_flush()
processed_time = time.perf_counter() - start
self.block_count_metric.set(self.height)
self.block_update_time_metric.observe(processed_time)
@ -420,7 +419,14 @@ class BlockProcessor:
self.headers.extend(headers)
self.tip = self.coin.header_hash(headers[-1])
def advance_txs(self, height, txs, header, block_hash):
self.db.flush_dbs(self.flush_data(), True, self.estimate_txs_remaining)
for cache in self.search_cache.values():
cache.clear()
self.history_cache.clear()
self.notifications.notified_mempool_txs.clear()
def advance_txs(self, height, txs: List[Tuple[Tx, bytes]], header, block_hash):
self.block_hashes.append(block_hash)
self.block_txs.append((b''.join(tx_hash for tx, tx_hash in txs), [tx.raw for tx, _ in txs]))
@ -608,7 +614,6 @@ class BlockProcessor:
for hdb_key, hashX in candidates.items():
tx_num_packed = hdb_key[-4:]
if len(candidates) > 1:
tx_num, = unpack('<I', tx_num_packed)
try:
hash, height = self.db.fs_tx_hash(tx_num)
@ -632,6 +637,7 @@ class BlockProcessor:
# Remove both entries for this UTXO
self.db_deletes.append(hdb_key)
self.db_deletes.append(udb_key)
return hashX + tx_num_packed + utxo_value_packed
self.logger.error('UTXO {hash_to_hex_str(tx_hash)} / {tx_idx} not found in "h" table')

View file

@ -28,25 +28,26 @@ from lbry.utils import LRUCacheWithMetrics
from lbry.wallet.server import util
from lbry.wallet.server.hash import hash_to_hex_str, HASHX_LEN
from lbry.wallet.server.merkle import Merkle, MerkleCache
from lbry.wallet.server.util import formatted_time
from lbry.wallet.server.util import formatted_time, pack_be_uint16
from lbry.wallet.server.storage import db_class
from lbry.wallet.server.history import History
UTXO = namedtuple("UTXO", "tx_num tx_pos tx_hash height value")
HISTORY_PREFIX = b'A'
TX_PREFIX = b'B'
BLOCK_HASH_PREFIX = b'C'
HEADER_PREFIX = b'H'
TX_NUM_PREFIX = b'N'
TX_COUNT_PREFIX = b'T'
TX_HASH_PREFIX = b'X'
TX_PREFIX = b'B'
TX_NUM_PREFIX = b'N'
BLOCK_HASH_PREFIX = b'C'
HISTORY_PREFIX = b'A'
HASHX_UTXO_PREFIX = b'h'
HIST_STATE = b'state-hist'
UTXO_STATE = b'state-utxo'
UTXO_PREFIX = b'u'
HASHX_HISTORY_PREFIX = b'x'
UTXO_STATE = b'state-utxo'
HIST_STATE = b'state-hist'
@ -239,106 +240,6 @@ class LevelDB:
assert not flush_data.undo_infos
self.history.assert_flushed()
def flush_dbs(self, flush_data, flush_utxos, estimate_txs_remaining):
"""Flush out cached state. History is always flushed; UTXOs are
flushed if flush_utxos."""
if flush_data.height == self.db_height:
self.assert_flushed(flush_data)
return
start_time = time.time()
prior_flush = self.last_flush
tx_delta = flush_data.tx_count - self.last_flush_tx_count
# Flush to file system
self.flush_fs(flush_data)
# Then history
self.flush_history()
# Flush state last as it reads the wall time.
with self.db.write_batch() as batch:
if flush_utxos:
self.flush_utxo_db(batch, flush_data)
self.flush_state(batch)
# Update and put the wall time again - otherwise we drop the
# time it took to commit the batch
self.flush_state(self.db)
elapsed = self.last_flush - start_time
self.logger.info(f'flush #{self.history.flush_count:,d} took '
f'{elapsed:.1f}s. Height {flush_data.height:,d} '
f'txs: {flush_data.tx_count:,d} ({tx_delta:+,d})')
# Catch-up stats
if self.db.for_sync:
flush_interval = self.last_flush - prior_flush
tx_per_sec_gen = int(flush_data.tx_count / self.wall_time)
tx_per_sec_last = 1 + int(tx_delta / flush_interval)
eta = estimate_txs_remaining() / tx_per_sec_last
self.logger.info(f'tx/sec since genesis: {tx_per_sec_gen:,d}, '
f'since last flush: {tx_per_sec_last:,d}')
self.logger.info(f'sync time: {formatted_time(self.wall_time)} '
f'ETA: {formatted_time(eta)}')
def flush_fs(self, flush_data):
"""Write headers, tx counts and block tx hashes to the filesystem.
The first height to write is self.fs_height + 1. The FS
metadata is all append-only, so in a crash we just pick up
again from the height stored in the DB.
"""
prior_tx_count = (self.tx_counts[self.fs_height]
if self.fs_height >= 0 else 0)
assert len(flush_data.block_txs) == len(flush_data.headers)
assert flush_data.height == self.fs_height + len(flush_data.headers)
assert flush_data.tx_count == (self.tx_counts[-1] if self.tx_counts
else 0)
assert len(self.tx_counts) == flush_data.height + 1
assert len(
b''.join(hashes for hashes, _ in flush_data.block_txs)
) // 32 == flush_data.tx_count - prior_tx_count
# Write the headers
start_time = time.perf_counter()
with self.db.write_batch() as batch:
batch_put = batch.put
for i, header in enumerate(flush_data.headers):
batch_put(HEADER_PREFIX + util.pack_be_uint64(self.fs_height + i + 1), header)
self.headers.append(header)
flush_data.headers.clear()
height_start = self.fs_height + 1
tx_num = prior_tx_count
with self.db.write_batch() as batch:
batch_put = batch.put
for block_hash, (tx_hashes, txs) in zip(flush_data.block_hashes, flush_data.block_txs):
tx_count = self.tx_counts[height_start]
batch_put(BLOCK_HASH_PREFIX + util.pack_be_uint64(height_start), block_hash[::-1])
batch_put(TX_COUNT_PREFIX + util.pack_be_uint64(height_start), util.pack_be_uint64(tx_count))
height_start += 1
offset = 0
while offset < len(tx_hashes):
batch_put(TX_HASH_PREFIX + util.pack_be_uint64(tx_num), tx_hashes[offset:offset+32])
batch_put(TX_NUM_PREFIX + tx_hashes[offset:offset+32], util.pack_be_uint64(tx_num))
batch_put(TX_PREFIX + tx_hashes[offset:offset+32], txs[offset // 32])
tx_num += 1
offset += 32
flush_data.block_txs.clear()
flush_data.block_hashes.clear()
self.fs_height = flush_data.height
self.fs_tx_count = flush_data.tx_count
elapsed = time.perf_counter() - start_time
self.logger.info(f'flushed filesystem data in {elapsed:.2f}s')
def flush_history(self):
self.history.flush()
def flush_utxo_db(self, batch, flush_data):
"""Flush the cached DB writes and UTXO set to the batch."""
# Care is needed because the writes generated by flushing the
@ -382,6 +283,117 @@ class LevelDB:
self.db_tx_count = flush_data.tx_count
self.db_tip = flush_data.tip
def flush_dbs(self, flush_data, flush_utxos, estimate_txs_remaining):
"""Flush out cached state. History is always flushed; UTXOs are
flushed if flush_utxos."""
if flush_data.height == self.db_height:
self.assert_flushed(flush_data)
return
start_time = time.time()
prior_flush = self.last_flush
tx_delta = flush_data.tx_count - self.last_flush_tx_count
# Flush to file system
# self.flush_fs(flush_data)
prior_tx_count = (self.tx_counts[self.fs_height]
if self.fs_height >= 0 else 0)
assert len(flush_data.block_txs) == len(flush_data.headers)
assert flush_data.height == self.fs_height + len(flush_data.headers)
assert flush_data.tx_count == (self.tx_counts[-1] if self.tx_counts
else 0)
assert len(self.tx_counts) == flush_data.height + 1
assert len(
b''.join(hashes for hashes, _ in flush_data.block_txs)
) // 32 == flush_data.tx_count - prior_tx_count
# Write the headers
start_time = time.perf_counter()
with self.db.write_batch() as batch:
batch_put = batch.put
height_start = self.fs_height + 1
tx_num = prior_tx_count
for i, (header, block_hash, (tx_hashes, txs)) in enumerate(zip(flush_data.headers, flush_data.block_hashes, flush_data.block_txs)):
batch_put(HEADER_PREFIX + util.pack_be_uint64(height_start), header)
self.headers.append(header)
tx_count = self.tx_counts[height_start]
batch_put(BLOCK_HASH_PREFIX + util.pack_be_uint64(height_start), block_hash[::-1])
batch_put(TX_COUNT_PREFIX + util.pack_be_uint64(height_start), util.pack_be_uint64(tx_count))
height_start += 1
offset = 0
while offset < len(tx_hashes):
batch_put(TX_HASH_PREFIX + util.pack_be_uint64(tx_num), tx_hashes[offset:offset + 32])
batch_put(TX_NUM_PREFIX + tx_hashes[offset:offset + 32], util.pack_be_uint64(tx_num))
batch_put(TX_PREFIX + tx_hashes[offset:offset + 32], txs[offset // 32])
tx_num += 1
offset += 32
flush_data.headers.clear()
flush_data.block_txs.clear()
flush_data.block_hashes.clear()
# flush_data.claim_txo_cache.clear()
# flush_data.support_txo_cache.clear()
self.fs_height = flush_data.height
self.fs_tx_count = flush_data.tx_count
# Then history
self.history.flush_count += 1
flush_id = pack_be_uint16(self.history.flush_count)
unflushed = self.history.unflushed
for hashX in sorted(unflushed):
key = hashX + flush_id
batch_put(HASHX_HISTORY_PREFIX + key, unflushed[hashX].tobytes())
self.history.write_state(batch)
unflushed.clear()
self.history.unflushed_count = 0
#########################
# Flush state last as it reads the wall time.
if flush_utxos:
self.flush_utxo_db(batch, flush_data)
# self.flush_state(batch)
#
now = time.time()
self.wall_time += now - self.last_flush
self.last_flush = now
self.last_flush_tx_count = self.fs_tx_count
self.write_utxo_state(batch)
# # Update and put the wall time again - otherwise we drop the
# # time it took to commit the batch
# # self.flush_state(self.db)
# now = time.time()
# self.wall_time += now - self.last_flush
# self.last_flush = now
# self.last_flush_tx_count = self.fs_tx_count
# self.write_utxo_state(batch)
elapsed = self.last_flush - start_time
self.logger.info(f'flush #{self.history.flush_count:,d} took '
f'{elapsed:.1f}s. Height {flush_data.height:,d} '
f'txs: {flush_data.tx_count:,d} ({tx_delta:+,d})')
# Catch-up stats
if self.db.for_sync:
flush_interval = self.last_flush - prior_flush
tx_per_sec_gen = int(flush_data.tx_count / self.wall_time)
tx_per_sec_last = 1 + int(tx_delta / flush_interval)
eta = estimate_txs_remaining() / tx_per_sec_last
self.logger.info(f'tx/sec since genesis: {tx_per_sec_gen:,d}, '
f'since last flush: {tx_per_sec_last:,d}')
self.logger.info(f'sync time: {formatted_time(self.wall_time)} '
f'ETA: {formatted_time(eta)}')
def flush_state(self, batch):
"""Flush chain state to the batch."""
now = time.time()