forked from LBRYCommunity/lbry-sdk
simplify advance and reorg
This commit is contained in:
parent
81773a6497
commit
acfc1f56ee
3 changed files with 164 additions and 590 deletions
|
@ -1,12 +1,14 @@
|
|||
import time
|
||||
import asyncio
|
||||
import typing
|
||||
import struct
|
||||
from bisect import bisect_right
|
||||
from struct import pack, unpack
|
||||
from concurrent.futures.thread import ThreadPoolExecutor
|
||||
from typing import Optional, List, Tuple, Set, DefaultDict, Dict
|
||||
from prometheus_client import Gauge, Histogram
|
||||
from collections import defaultdict
|
||||
import array
|
||||
import lbry
|
||||
from lbry.schema.claim import Claim
|
||||
from lbry.schema.mime_types import guess_stream_type
|
||||
|
@ -195,21 +197,18 @@ class BlockProcessor:
|
|||
# Meta
|
||||
self.next_cache_check = 0
|
||||
self.touched = set()
|
||||
self.reorg_count = 0
|
||||
|
||||
# Caches of unflushed items.
|
||||
self.headers = []
|
||||
self.block_hashes = []
|
||||
self.block_txs = []
|
||||
self.undo_infos = []
|
||||
|
||||
# UTXO cache
|
||||
self.utxo_cache = {}
|
||||
self.utxo_cache: Dict[Tuple[bytes, int], bytes] = {}
|
||||
self.db_deletes = []
|
||||
|
||||
# Claimtrie cache
|
||||
self.db_op_stack: Optional[RevertableOpStack] = None
|
||||
self.undo_claims = []
|
||||
|
||||
# If the lock is successfully acquired, in-memory chain state
|
||||
# is consistent with self.height
|
||||
|
@ -263,6 +262,7 @@ class BlockProcessor:
|
|||
|
||||
self.doesnt_have_valid_signature: Set[bytes] = set()
|
||||
self.claim_channels: Dict[bytes, bytes] = {}
|
||||
self.hashXs_by_tx: DefaultDict[bytes, List[int]] = defaultdict(list)
|
||||
|
||||
def claim_producer(self):
|
||||
if self.db.db_height <= 1:
|
||||
|
@ -295,6 +295,7 @@ class BlockProcessor:
|
|||
"""Process the list of raw blocks passed. Detects and handles
|
||||
reorgs.
|
||||
"""
|
||||
|
||||
if not raw_blocks:
|
||||
return
|
||||
first = self.height + 1
|
||||
|
@ -305,7 +306,7 @@ class BlockProcessor:
|
|||
chain = [self.tip] + [self.coin.header_hash(h) for h in headers[:-1]]
|
||||
|
||||
if hprevs == chain:
|
||||
start = time.perf_counter()
|
||||
total_start = time.perf_counter()
|
||||
try:
|
||||
for block in blocks:
|
||||
start = time.perf_counter()
|
||||
|
@ -323,14 +324,7 @@ class BlockProcessor:
|
|||
except:
|
||||
self.logger.exception("advance blocks failed")
|
||||
raise
|
||||
# if self.sql:
|
||||
|
||||
# for cache in self.search_cache.values():
|
||||
# cache.clear()
|
||||
self.history_cache.clear() # TODO: is this needed?
|
||||
self.notifications.notified_mempool_txs.clear()
|
||||
|
||||
processed_time = time.perf_counter() - start
|
||||
processed_time = time.perf_counter() - total_start
|
||||
self.block_count_metric.set(self.height)
|
||||
self.block_update_time_metric.observe(processed_time)
|
||||
self.status_server.set_height(self.db.fs_height, self.db.db_tip)
|
||||
|
@ -338,13 +332,32 @@ class BlockProcessor:
|
|||
s = '' if len(blocks) == 1 else 's'
|
||||
self.logger.info('processed {:,d} block{} in {:.1f}s'.format(len(blocks), s, processed_time))
|
||||
if self._caught_up_event.is_set():
|
||||
# if self.sql:
|
||||
# await self.db.search_index.apply_filters(self.sql.blocked_streams, self.sql.blocked_channels,
|
||||
# self.sql.filtered_streams, self.sql.filtered_channels)
|
||||
await self.notifications.on_block(self.touched, self.height)
|
||||
self.touched = set()
|
||||
elif hprevs[0] != chain[0]:
|
||||
await self.reorg_chain()
|
||||
min_start_height = max(self.height - self.coin.REORG_LIMIT, 0)
|
||||
count = 1
|
||||
block_hashes_from_lbrycrd = await self.daemon.block_hex_hashes(
|
||||
min_start_height, self.coin.REORG_LIMIT
|
||||
)
|
||||
for height, block_hash in zip(
|
||||
reversed(range(min_start_height, min_start_height + self.coin.REORG_LIMIT)),
|
||||
reversed(block_hashes_from_lbrycrd)):
|
||||
if self.block_hashes[height][::-1].hex() == block_hash:
|
||||
break
|
||||
count += 1
|
||||
self.logger.warning(f"blockchain reorg detected at {self.height}, unwinding last {count} blocks")
|
||||
try:
|
||||
assert count > 0, count
|
||||
for _ in range(count):
|
||||
await self.run_in_thread_with_lock(self.backup_block)
|
||||
await self.prefetcher.reset_height(self.height)
|
||||
self.reorg_count_metric.inc()
|
||||
except:
|
||||
self.logger.exception("reorg blocks failed")
|
||||
raise
|
||||
finally:
|
||||
self.logger.info("backed up to block %i", self.height)
|
||||
else:
|
||||
# It is probably possible but extremely rare that what
|
||||
# bitcoind returns doesn't form a chain because it
|
||||
|
@ -355,101 +368,26 @@ class BlockProcessor:
|
|||
'resetting the prefetcher')
|
||||
await self.prefetcher.reset_height(self.height)
|
||||
|
||||
async def reorg_chain(self, count: Optional[int] = None):
|
||||
"""Handle a chain reorganisation.
|
||||
|
||||
Count is the number of blocks to simulate a reorg, or None for
|
||||
a real reorg."""
|
||||
if count is None:
|
||||
self.logger.info('chain reorg detected')
|
||||
else:
|
||||
self.logger.info(f'faking a reorg of {count:,d} blocks')
|
||||
|
||||
async def get_raw_blocks(last_height, hex_hashes):
|
||||
heights = range(last_height, last_height - len(hex_hashes), -1)
|
||||
try:
|
||||
blocks = [await self.db.read_raw_block(height) for height in heights]
|
||||
self.logger.info(f'read {len(blocks)} blocks from disk')
|
||||
return blocks
|
||||
except FileNotFoundError:
|
||||
return await self.daemon.raw_blocks(hex_hashes)
|
||||
|
||||
try:
|
||||
start, last, hashes = await self.reorg_hashes(count)
|
||||
# Reverse and convert to hex strings.
|
||||
hashes = [hash_to_hex_str(hash) for hash in reversed(hashes)]
|
||||
self.logger.info("reorg %i block hashes", len(hashes))
|
||||
|
||||
for hex_hashes in chunks(hashes, 50):
|
||||
raw_blocks = await get_raw_blocks(last, hex_hashes)
|
||||
self.logger.info("got %i raw blocks", len(raw_blocks))
|
||||
await self.run_in_thread_with_lock(self.backup_blocks, raw_blocks)
|
||||
last -= len(raw_blocks)
|
||||
|
||||
await self.prefetcher.reset_height(self.height)
|
||||
self.reorg_count_metric.inc()
|
||||
except:
|
||||
self.logger.exception("boom")
|
||||
raise
|
||||
finally:
|
||||
self.logger.info("done with reorg")
|
||||
|
||||
async def reorg_hashes(self, count):
|
||||
"""Return a pair (start, last, hashes) of blocks to back up during a
|
||||
reorg.
|
||||
|
||||
The hashes are returned in order of increasing height. Start
|
||||
is the height of the first hash, last of the last.
|
||||
"""
|
||||
|
||||
"""Calculate the reorg range"""
|
||||
|
||||
def diff_pos(hashes1, hashes2):
|
||||
"""Returns the index of the first difference in the hash lists.
|
||||
If both lists match returns their length."""
|
||||
for n, (hash1, hash2) in enumerate(zip(hashes1, hashes2)):
|
||||
if hash1 != hash2:
|
||||
return n
|
||||
return len(hashes)
|
||||
|
||||
if count is None:
|
||||
# A real reorg
|
||||
start = self.height - 1
|
||||
count = 1
|
||||
while start > 0:
|
||||
hashes = await self.db.fs_block_hashes(start, count)
|
||||
hex_hashes = [hash_to_hex_str(hash) for hash in hashes]
|
||||
d_hex_hashes = await self.daemon.block_hex_hashes(start, count)
|
||||
n = diff_pos(hex_hashes, d_hex_hashes)
|
||||
if n > 0:
|
||||
start += n
|
||||
break
|
||||
count = min(count * 2, start)
|
||||
start -= count
|
||||
|
||||
count = (self.height - start) + 1
|
||||
else:
|
||||
start = (self.height - count) + 1
|
||||
last = start + count - 1
|
||||
s = '' if count == 1 else 's'
|
||||
self.logger.info(f'chain was reorganised replacing {count:,d} '
|
||||
f'block{s} at heights {start:,d}-{last:,d}')
|
||||
|
||||
return start, last, await self.db.fs_block_hashes(start, count)
|
||||
|
||||
# - Flushing
|
||||
def flush_data(self):
|
||||
"""The data for a flush. The lock must be taken."""
|
||||
assert self.state_lock.locked()
|
||||
return FlushData(self.height, self.tx_count, self.headers, self.block_hashes,
|
||||
self.block_txs, self.db_op_stack, self.undo_infos, self.utxo_cache,
|
||||
self.db_deletes, self.tip, self.undo_claims)
|
||||
return FlushData(self.height, self.tx_count, self.block_hashes,
|
||||
self.block_txs, self.db_op_stack, self.tip)
|
||||
|
||||
async def flush(self):
|
||||
def flush():
|
||||
self.db.flush_dbs(self.flush_data())
|
||||
await self.run_in_thread_with_lock(flush)
|
||||
|
||||
async def write_state(self):
|
||||
def flush():
|
||||
with self.db.db.write_batch() as batch:
|
||||
self.db.write_db_state(batch)
|
||||
|
||||
await self.run_in_thread_with_lock(flush)
|
||||
|
||||
def _add_claim_or_update(self, height: int, txo: 'Output', tx_hash: bytes, tx_num: int, nout: int,
|
||||
spent_claims: typing.Dict[bytes, typing.Tuple[int, int, str]]):
|
||||
try:
|
||||
|
@ -1167,51 +1105,51 @@ class BlockProcessor:
|
|||
block_hash = self.coin.header_hash(block.header)
|
||||
|
||||
self.block_hashes.append(block_hash)
|
||||
self.block_txs.append((b''.join(tx_hash for tx, tx_hash in txs), [tx.raw for tx, _ in txs]))
|
||||
self.db_op_stack.append(RevertablePut(*Prefixes.block_hash.pack_item(height, block_hash)))
|
||||
|
||||
first_tx_num = self.tx_count
|
||||
undo_info = []
|
||||
hashXs_by_tx = []
|
||||
tx_count = self.tx_count
|
||||
|
||||
# Use local vars for speed in the loops
|
||||
put_utxo = self.utxo_cache.__setitem__
|
||||
claimtrie_stash_extend = self.db_op_stack.extend
|
||||
spend_utxo = self.spend_utxo
|
||||
undo_info_append = undo_info.append
|
||||
update_touched = self.touched.update
|
||||
append_hashX_by_tx = hashXs_by_tx.append
|
||||
hashX_from_script = self.coin.hashX_from_script
|
||||
add_utxo = self.add_utxo
|
||||
|
||||
spend_claim_or_support_txo = self._spend_claim_or_support_txo
|
||||
add_claim_or_support = self._add_claim_or_support
|
||||
|
||||
for tx, tx_hash in txs:
|
||||
spent_claims = {}
|
||||
|
||||
hashXs = [] # hashXs touched by spent inputs/rx outputs
|
||||
append_hashX = hashXs.append
|
||||
tx_numb = pack('<I', tx_count)
|
||||
|
||||
txos = Transaction(tx.raw).outputs
|
||||
|
||||
self.db_op_stack.extend([
|
||||
RevertablePut(*Prefixes.tx.pack_item(tx_hash, tx.raw)),
|
||||
RevertablePut(*Prefixes.tx_num.pack_item(tx_hash, tx_count)),
|
||||
RevertablePut(*Prefixes.tx_hash.pack_item(tx_count, tx_hash))
|
||||
])
|
||||
|
||||
# Spend the inputs
|
||||
for txin in tx.inputs:
|
||||
if txin.is_generation():
|
||||
continue
|
||||
txin_num = self.db.transaction_num_mapping[txin.prev_hash]
|
||||
# spend utxo for address histories
|
||||
cache_value = spend_utxo(txin.prev_hash, txin.prev_idx)
|
||||
undo_info_append(cache_value)
|
||||
append_hashX(cache_value[:-12])
|
||||
self._spend_claim_or_support_txo(txin, spent_claims)
|
||||
hashX = spend_utxo(txin.prev_hash, txin.prev_idx)
|
||||
if hashX:
|
||||
# self._set_hashX_cache(hashX)
|
||||
if txin_num not in self.hashXs_by_tx[hashX]:
|
||||
self.hashXs_by_tx[hashX].append(txin_num)
|
||||
# spend claim/support txo
|
||||
spend_claim_or_support_txo(txin, spent_claims)
|
||||
|
||||
# Add the new UTXOs
|
||||
for nout, txout in enumerate(tx.outputs):
|
||||
# Get the hashX. Ignore unspendable outputs
|
||||
hashX = hashX_from_script(txout.pk_script)
|
||||
hashX = add_utxo(tx_hash, tx_count, nout, txout)
|
||||
if hashX:
|
||||
append_hashX(hashX)
|
||||
put_utxo(tx_hash + pack('<H', nout), hashX + tx_numb + pack('<Q', txout.value))
|
||||
|
||||
# self._set_hashX_cache(hashX)
|
||||
if tx_count not in self.hashXs_by_tx[hashX]:
|
||||
self.hashXs_by_tx[hashX].append(tx_count)
|
||||
# add claim/support txo
|
||||
self._add_claim_or_support(
|
||||
add_claim_or_support(
|
||||
height, tx_hash, tx_count, nout, txos[nout], spent_claims
|
||||
)
|
||||
|
||||
|
@ -1220,8 +1158,6 @@ class BlockProcessor:
|
|||
# print(f"\tabandon {abandoned_claim_hash.hex()} {tx_num} {nout}")
|
||||
self._abandon_claim(abandoned_claim_hash, tx_num, nout, name)
|
||||
|
||||
append_hashX_by_tx(hashXs)
|
||||
update_touched(hashXs)
|
||||
self.db.total_transactions.append(tx_hash)
|
||||
self.db.transaction_num_mapping[tx_hash] = tx_count
|
||||
tx_count += 1
|
||||
|
@ -1232,31 +1168,34 @@ class BlockProcessor:
|
|||
# activate claims and process takeovers
|
||||
self._get_takeover_ops(height)
|
||||
|
||||
# self.db.add_unflushed(hashXs_by_tx, self.tx_count)
|
||||
_unflushed = self.db.hist_unflushed
|
||||
_count = 0
|
||||
for _tx_num, _hashXs in enumerate(hashXs_by_tx, start=first_tx_num):
|
||||
for _hashX in set(_hashXs):
|
||||
_unflushed[_hashX].append(_tx_num)
|
||||
_count += len(_hashXs)
|
||||
self.db.hist_unflushed_count += _count
|
||||
self.db_op_stack.append(RevertablePut(*Prefixes.header.pack_item(height, block.header)))
|
||||
self.db_op_stack.append(RevertablePut(*Prefixes.tx_count.pack_item(height, tx_count)))
|
||||
|
||||
for hashX, new_history in self.hashXs_by_tx.items():
|
||||
if not new_history:
|
||||
continue
|
||||
self.db_op_stack.append(
|
||||
RevertablePut(
|
||||
*Prefixes.hashX_history.pack_item(
|
||||
hashX, height, new_history
|
||||
)
|
||||
)
|
||||
)
|
||||
|
||||
self.tx_count = tx_count
|
||||
self.db.tx_counts.append(self.tx_count)
|
||||
|
||||
undo_claims = b''.join(op.invert().pack() for op in self.db_op_stack)
|
||||
# print("%i undo bytes for %i (%i claimtrie stash ops)" % (len(undo_claims), height, len(claimtrie_stash)))
|
||||
|
||||
if height >= self.daemon.cached_height() - self.env.reorg_limit:
|
||||
self.undo_infos.append((undo_info, height))
|
||||
self.undo_claims.append((undo_claims, height))
|
||||
self.db.write_raw_block(block.raw, height)
|
||||
self.db_op_stack.append(RevertablePut(*Prefixes.undo.pack_item(height, self.db_op_stack.get_undo_ops())))
|
||||
|
||||
self.height = height
|
||||
self.headers.append(block.header)
|
||||
self.db.headers.append(block.header)
|
||||
self.tip = self.coin.header_hash(block.header)
|
||||
|
||||
self.db.flush_dbs(self.flush_data())
|
||||
self.clear_after_advance_or_reorg()
|
||||
|
||||
def clear_after_advance_or_reorg(self):
|
||||
self.db_op_stack.clear()
|
||||
self.txo_to_claim.clear()
|
||||
self.claim_hash_to_txo.clear()
|
||||
|
@ -1277,186 +1216,83 @@ class BlockProcessor:
|
|||
self.expired_claim_hashes.clear()
|
||||
self.doesnt_have_valid_signature.clear()
|
||||
self.claim_channels.clear()
|
||||
|
||||
# for cache in self.search_cache.values():
|
||||
# cache.clear()
|
||||
self.utxo_cache.clear()
|
||||
self.hashXs_by_tx.clear()
|
||||
self.history_cache.clear()
|
||||
self.notifications.notified_mempool_txs.clear()
|
||||
|
||||
def backup_blocks(self, raw_blocks):
|
||||
"""Backup the raw blocks and flush.
|
||||
|
||||
The blocks should be in order of decreasing height, starting at.
|
||||
self.height. A flush is performed once the blocks are backed up.
|
||||
"""
|
||||
def backup_block(self):
|
||||
self.db.assert_flushed(self.flush_data())
|
||||
assert self.height >= len(raw_blocks)
|
||||
|
||||
coin = self.coin
|
||||
for raw_block in raw_blocks:
|
||||
self.logger.info("backup block %i", self.height)
|
||||
# Check and update self.tip
|
||||
block = coin.block(raw_block, self.height)
|
||||
header_hash = coin.header_hash(block.header)
|
||||
if header_hash != self.tip:
|
||||
raise ChainError('backup block {} not tip {} at height {:,d}'
|
||||
.format(hash_to_hex_str(header_hash),
|
||||
hash_to_hex_str(self.tip),
|
||||
self.height))
|
||||
self.tip = coin.header_prevhash(block.header)
|
||||
self.backup_txs(block.transactions)
|
||||
self.height -= 1
|
||||
self.db.tx_counts.pop()
|
||||
|
||||
# self.touched can include other addresses which is
|
||||
# harmless, but remove None.
|
||||
self.touched.discard(None)
|
||||
|
||||
self.db.flush_backup(self.flush_data(), self.touched)
|
||||
self.logger.info(f'backed up to height {self.height:,d}')
|
||||
|
||||
def backup_txs(self, txs):
|
||||
# Prevout values, in order down the block (coinbase first if present)
|
||||
# undo_info is in reverse block order
|
||||
undo_info, undo_claims = self.db.read_undo_info(self.height)
|
||||
if undo_info is None:
|
||||
self.logger.info("backup block %i", self.height)
|
||||
# Check and update self.tip
|
||||
undo_ops = self.db.read_undo_info(self.height)
|
||||
if undo_ops is None:
|
||||
raise ChainError(f'no undo information found for height {self.height:,d}')
|
||||
n = len(undo_info)
|
||||
|
||||
# Use local vars for speed in the loops
|
||||
s_pack = pack
|
||||
undo_entry_len = 12 + HASHX_LEN
|
||||
|
||||
for tx, tx_hash in reversed(txs):
|
||||
for idx, txout in enumerate(tx.outputs):
|
||||
# Spend the TX outputs. Be careful with unspendable
|
||||
# outputs - we didn't save those in the first place.
|
||||
hashX = self.coin.hashX_from_script(txout.pk_script)
|
||||
if hashX:
|
||||
cache_value = self.spend_utxo(tx_hash, idx)
|
||||
self.touched.add(cache_value[:-12])
|
||||
|
||||
# Restore the inputs
|
||||
for txin in reversed(tx.inputs):
|
||||
if txin.is_generation():
|
||||
continue
|
||||
n -= undo_entry_len
|
||||
undo_item = undo_info[n:n + undo_entry_len]
|
||||
self.utxo_cache[txin.prev_hash + s_pack('<H', txin.prev_idx)] = undo_item
|
||||
self.touched.add(undo_item[:-12])
|
||||
|
||||
self.db_op_stack.apply_packed_undo_ops(undo_ops)
|
||||
self.db_op_stack.append(RevertableDelete(Prefixes.undo.pack_key(self.height), undo_ops))
|
||||
self.db.headers.pop()
|
||||
self.block_hashes.pop()
|
||||
self.db.tx_counts.pop()
|
||||
self.tip = self.coin.header_hash(self.db.headers[-1])
|
||||
while len(self.db.total_transactions) > self.db.tx_counts[-1]:
|
||||
self.db.transaction_num_mapping.pop(self.db.total_transactions.pop())
|
||||
self.tx_count -= 1
|
||||
self.height -= 1
|
||||
# self.touched can include other addresses which is
|
||||
# harmless, but remove None.
|
||||
self.touched.discard(None)
|
||||
self.db.flush_backup(self.flush_data())
|
||||
self.clear_after_advance_or_reorg()
|
||||
self.logger.info(f'backed up to height {self.height:,d}')
|
||||
|
||||
assert n == 0
|
||||
self.tx_count -= len(txs)
|
||||
self.undo_claims.append((undo_claims, self.height))
|
||||
|
||||
"""An in-memory UTXO cache, representing all changes to UTXO state
|
||||
since the last DB flush.
|
||||
|
||||
We want to store millions of these in memory for optimal
|
||||
performance during initial sync, because then it is possible to
|
||||
spend UTXOs without ever going to the database (other than as an
|
||||
entry in the address history, and there is only one such entry per
|
||||
TX not per UTXO). So store them in a Python dictionary with
|
||||
binary keys and values.
|
||||
|
||||
Key: TX_HASH + TX_IDX (32 + 2 = 34 bytes)
|
||||
Value: HASHX + TX_NUM + VALUE (11 + 4 + 8 = 23 bytes)
|
||||
|
||||
That's 57 bytes of raw data in-memory. Python dictionary overhead
|
||||
means each entry actually uses about 205 bytes of memory. So
|
||||
almost 5 million UTXOs can fit in 1GB of RAM. There are
|
||||
approximately 42 million UTXOs on bitcoin mainnet at height
|
||||
433,000.
|
||||
|
||||
Semantics:
|
||||
|
||||
add: Add it to the cache dictionary.
|
||||
|
||||
spend: Remove it if in the cache dictionary. Otherwise it's
|
||||
been flushed to the DB. Each UTXO is responsible for two
|
||||
entries in the DB. Mark them for deletion in the next
|
||||
cache flush.
|
||||
|
||||
The UTXO database format has to be able to do two things efficiently:
|
||||
|
||||
1. Given an address be able to list its UTXOs and their values
|
||||
so its balance can be efficiently computed.
|
||||
|
||||
2. When processing transactions, for each prevout spent - a (tx_hash,
|
||||
idx) pair - we have to be able to remove it from the DB. To send
|
||||
notifications to clients we also need to know any address it paid
|
||||
to.
|
||||
|
||||
To this end we maintain two "tables", one for each point above:
|
||||
|
||||
1. Key: b'u' + address_hashX + tx_idx + tx_num
|
||||
Value: the UTXO value as a 64-bit unsigned integer
|
||||
|
||||
2. Key: b'h' + compressed_tx_hash + tx_idx + tx_num
|
||||
Value: hashX
|
||||
|
||||
The compressed tx hash is just the first few bytes of the hash of
|
||||
the tx in which the UTXO was created. As this is not unique there
|
||||
will be potential collisions so tx_num is also in the key. When
|
||||
looking up a UTXO the prefix space of the compressed hash needs to
|
||||
be searched and resolved if necessary with the tx_num. The
|
||||
collision rate is low (<0.1%).
|
||||
"""
|
||||
|
||||
def spend_utxo(self, tx_hash, tx_idx):
|
||||
"""Spend a UTXO and return the 33-byte value.
|
||||
|
||||
If the UTXO is not in the cache it must be on disk. We store
|
||||
all UTXOs so not finding one indicates a logic error or DB
|
||||
corruption.
|
||||
"""
|
||||
def add_utxo(self, tx_hash: bytes, tx_num: int, nout: int, txout: 'TxOutput') -> Optional[bytes]:
|
||||
hashX = self.coin.hashX_from_script(txout.pk_script)
|
||||
if hashX:
|
||||
self.utxo_cache[(tx_hash, nout)] = hashX
|
||||
self.db_op_stack.extend([
|
||||
RevertablePut(
|
||||
*Prefixes.utxo.pack_item(hashX, tx_num, nout, txout.value)
|
||||
),
|
||||
RevertablePut(
|
||||
*Prefixes.hashX_utxo.pack_item(tx_hash[:4], tx_num, nout, hashX)
|
||||
)
|
||||
])
|
||||
return hashX
|
||||
|
||||
def spend_utxo(self, tx_hash: bytes, nout: int):
|
||||
# Fast track is it being in the cache
|
||||
idx_packed = pack('<H', tx_idx)
|
||||
cache_value = self.utxo_cache.pop(tx_hash + idx_packed, None)
|
||||
cache_value = self.utxo_cache.pop((tx_hash, nout), None)
|
||||
if cache_value:
|
||||
return cache_value
|
||||
|
||||
# Spend it from the DB.
|
||||
# Key: b'h' + compressed_tx_hash + tx_idx + tx_num
|
||||
# Value: hashX
|
||||
prefix = DB_PREFIXES.HASHX_UTXO_PREFIX.value + tx_hash[:4] + idx_packed
|
||||
prefix = Prefixes.hashX_utxo.pack_partial_key(tx_hash[:4])
|
||||
candidates = {db_key: hashX for db_key, hashX in self.db.db.iterator(prefix=prefix)}
|
||||
|
||||
for hdb_key, hashX in candidates.items():
|
||||
tx_num_packed = hdb_key[-4:]
|
||||
key = Prefixes.hashX_utxo.unpack_key(hdb_key)
|
||||
if len(candidates) > 1:
|
||||
tx_num, = unpack('<I', tx_num_packed)
|
||||
try:
|
||||
hash, height = self.db.fs_tx_hash(tx_num)
|
||||
except IndexError:
|
||||
self.logger.error("data integrity error for hashx history: %s missing tx #%s (%s:%s)",
|
||||
hashX.hex(), tx_num, hash_to_hex_str(tx_hash), tx_idx)
|
||||
continue
|
||||
hash = self.db.total_transactions[key.tx_num]
|
||||
if hash != tx_hash:
|
||||
assert hash is not None # Should always be found
|
||||
continue
|
||||
|
||||
# Key: b'u' + address_hashX + tx_idx + tx_num
|
||||
# Value: the UTXO value as a 64-bit unsigned integer
|
||||
udb_key = DB_PREFIXES.UTXO_PREFIX.value + hashX + hdb_key[-6:]
|
||||
if key.nout != nout:
|
||||
continue
|
||||
udb_key = Prefixes.utxo.pack_key(hashX, key.tx_num, nout)
|
||||
utxo_value_packed = self.db.db.get(udb_key)
|
||||
if utxo_value_packed is None:
|
||||
self.logger.warning(
|
||||
"%s:%s is not found in UTXO db for %s", hash_to_hex_str(tx_hash), tx_idx, hash_to_hex_str(hashX)
|
||||
"%s:%s is not found in UTXO db for %s", hash_to_hex_str(tx_hash), nout, hash_to_hex_str(hashX)
|
||||
)
|
||||
raise ChainError(f"{hash_to_hex_str(tx_hash)}:{tx_idx} is not found in UTXO db for {hash_to_hex_str(hashX)}")
|
||||
raise ChainError(f"{hash_to_hex_str(tx_hash)}:{nout} is not found in UTXO db for {hash_to_hex_str(hashX)}")
|
||||
# Remove both entries for this UTXO
|
||||
self.db_deletes.append(hdb_key)
|
||||
self.db_deletes.append(udb_key)
|
||||
|
||||
return hashX + tx_num_packed + utxo_value_packed
|
||||
self.db_op_stack.extend([
|
||||
RevertableDelete(hdb_key, hashX),
|
||||
RevertableDelete(udb_key, utxo_value_packed)
|
||||
])
|
||||
return hashX
|
||||
|
||||
self.logger.error('UTXO {hash_to_hex_str(tx_hash)} / {tx_idx} not found in "h" table')
|
||||
raise ChainError('UTXO {} / {:,d} not found in "h" table'
|
||||
.format(hash_to_hex_str(tx_hash), tx_idx))
|
||||
.format(hash_to_hex_str(tx_hash), nout))
|
||||
|
||||
async def _process_prefetched_blocks(self):
|
||||
"""Loop forever processing blocks as they arrive."""
|
||||
|
@ -1467,23 +1303,19 @@ class BlockProcessor:
|
|||
self._caught_up_event.set()
|
||||
await self.blocks_event.wait()
|
||||
self.blocks_event.clear()
|
||||
if self.reorg_count: # this could only happen by calling the reorg rpc
|
||||
await self.reorg_chain(self.reorg_count)
|
||||
self.reorg_count = 0
|
||||
else:
|
||||
blocks = self.prefetcher.get_prefetched_blocks()
|
||||
try:
|
||||
await self.check_and_advance_blocks(blocks)
|
||||
except Exception:
|
||||
self.logger.exception("error while processing txs")
|
||||
raise
|
||||
blocks = self.prefetcher.get_prefetched_blocks()
|
||||
try:
|
||||
await self.check_and_advance_blocks(blocks)
|
||||
except Exception:
|
||||
self.logger.exception("error while processing txs")
|
||||
raise
|
||||
|
||||
async def _first_caught_up(self):
|
||||
self.logger.info(f'caught up to height {self.height}')
|
||||
# Flush everything but with first_sync->False state.
|
||||
first_sync = self.db.first_sync
|
||||
self.db.first_sync = False
|
||||
await self.flush()
|
||||
await self.write_state()
|
||||
if first_sync:
|
||||
self.logger.info(f'{lbry.__version__} synced to '
|
||||
f'height {self.height:,d}, halting here.')
|
||||
|
|
|
@ -65,16 +65,10 @@ TXO_STRUCT_pack = TXO_STRUCT.pack
|
|||
class FlushData:
|
||||
height = attr.ib()
|
||||
tx_count = attr.ib()
|
||||
headers = attr.ib()
|
||||
block_hashes = attr.ib()
|
||||
block_txs = attr.ib()
|
||||
claimtrie_stash = attr.ib()
|
||||
# The following are flushed to the UTXO DB if undo_infos is not None
|
||||
undo_infos = attr.ib()
|
||||
adds = attr.ib()
|
||||
deletes = attr.ib()
|
||||
put_and_delete_ops = attr.ib()
|
||||
tip = attr.ib()
|
||||
undo = attr.ib()
|
||||
|
||||
|
||||
OptionalResolveResultOrError = Optional[typing.Union[ResolveResult, LookupError, ValueError]]
|
||||
|
@ -143,9 +137,6 @@ class LevelDB:
|
|||
self.merkle = Merkle()
|
||||
self.header_mc = MerkleCache(self.merkle, self.fs_block_hashes)
|
||||
|
||||
self.headers_db = None
|
||||
self.tx_db = None
|
||||
|
||||
self._tx_and_merkle_cache = LRUCacheWithMetrics(2 ** 17, metric_name='tx_and_merkle', namespace="wallet_server")
|
||||
self.total_transactions = None
|
||||
self.transaction_num_mapping = {}
|
||||
|
@ -748,61 +739,8 @@ class LevelDB:
|
|||
raise RuntimeError(msg)
|
||||
self.logger.info(f'flush count: {self.hist_flush_count:,d}')
|
||||
|
||||
# self.history.clear_excess(self.utxo_flush_count)
|
||||
# < might happen at end of compaction as both DBs cannot be
|
||||
# updated atomically
|
||||
if self.hist_flush_count > self.utxo_flush_count:
|
||||
self.logger.info('DB shut down uncleanly. Scanning for excess history flushes...')
|
||||
|
||||
keys = []
|
||||
for key, hist in self.db.iterator(prefix=DB_PREFIXES.HASHX_HISTORY_PREFIX.value):
|
||||
k = key[1:]
|
||||
flush_id = int.from_bytes(k[-4:], byteorder='big')
|
||||
if flush_id > self.utxo_flush_count:
|
||||
keys.append(k)
|
||||
|
||||
self.logger.info(f'deleting {len(keys):,d} history entries')
|
||||
|
||||
self.hist_flush_count = self.utxo_flush_count
|
||||
with self.db.write_batch() as batch:
|
||||
for key in keys:
|
||||
batch.delete(DB_PREFIXES.HASHX_HISTORY_PREFIX.value + key)
|
||||
if keys:
|
||||
self.logger.info('deleted %i excess history entries', len(keys))
|
||||
|
||||
self.utxo_flush_count = self.hist_flush_count
|
||||
|
||||
min_height = self.min_undo_height(self.db_height)
|
||||
keys = []
|
||||
for key, hist in self.db.iterator(prefix=DB_PREFIXES.UNDO_PREFIX.value):
|
||||
height, = unpack('>I', key[-4:])
|
||||
if height >= min_height:
|
||||
break
|
||||
keys.append(key)
|
||||
if min_height > 0:
|
||||
for key in self.db.iterator(start=DB_PREFIXES.undo_claimtrie.value,
|
||||
stop=Prefixes.undo.pack_key(min_height),
|
||||
include_value=False):
|
||||
keys.append(key)
|
||||
if keys:
|
||||
with self.db.write_batch() as batch:
|
||||
for key in keys:
|
||||
batch.delete(key)
|
||||
self.logger.info(f'deleted {len(keys):,d} stale undo entries')
|
||||
|
||||
# delete old block files
|
||||
prefix = self.raw_block_prefix()
|
||||
paths = [path for path in glob(f'{prefix}[0-9]*')
|
||||
if len(path) > len(prefix)
|
||||
and int(path[len(prefix):]) < min_height]
|
||||
if paths:
|
||||
for path in paths:
|
||||
try:
|
||||
os.remove(path)
|
||||
except FileNotFoundError:
|
||||
pass
|
||||
self.logger.info(f'deleted {len(paths):,d} stale block files')
|
||||
|
||||
# Read TX counts (requires meta directory)
|
||||
await self._read_tx_counts()
|
||||
if self.total_transactions is None:
|
||||
|
@ -836,129 +774,50 @@ class LevelDB:
|
|||
assert flush_data.tx_count == self.fs_tx_count == self.db_tx_count
|
||||
assert flush_data.height == self.fs_height == self.db_height
|
||||
assert flush_data.tip == self.db_tip
|
||||
assert not flush_data.headers
|
||||
assert not flush_data.block_txs
|
||||
assert not flush_data.adds
|
||||
assert not flush_data.deletes
|
||||
assert not flush_data.undo_infos
|
||||
assert not self.hist_unflushed
|
||||
assert not len(flush_data.put_and_delete_ops)
|
||||
|
||||
def flush_dbs(self, flush_data: FlushData):
|
||||
"""Flush out cached state. History is always flushed; UTXOs are
|
||||
flushed if flush_utxos."""
|
||||
|
||||
if flush_data.height == self.db_height:
|
||||
self.assert_flushed(flush_data)
|
||||
return
|
||||
|
||||
# start_time = time.time()
|
||||
prior_flush = self.last_flush
|
||||
tx_delta = flush_data.tx_count - self.last_flush_tx_count
|
||||
|
||||
# Flush to file system
|
||||
# self.flush_fs(flush_data)
|
||||
prior_tx_count = (self.tx_counts[self.fs_height]
|
||||
if self.fs_height >= 0 else 0)
|
||||
|
||||
assert len(flush_data.block_txs) == len(flush_data.headers)
|
||||
assert flush_data.height == self.fs_height + len(flush_data.headers)
|
||||
assert flush_data.tx_count == (self.tx_counts[-1] if self.tx_counts
|
||||
else 0)
|
||||
assert len(self.tx_counts) == flush_data.height + 1
|
||||
assert len(
|
||||
b''.join(hashes for hashes, _ in flush_data.block_txs)
|
||||
) // 32 == flush_data.tx_count - prior_tx_count, f"{len(b''.join(hashes for hashes, _ in flush_data.block_txs)) // 32} != {flush_data.tx_count}"
|
||||
|
||||
# Write the headers
|
||||
# start_time = time.perf_counter()
|
||||
min_height = self.min_undo_height(self.db_height)
|
||||
delete_undo_keys = []
|
||||
if min_height > 0:
|
||||
delete_undo_keys.extend(
|
||||
self.db.iterator(
|
||||
start=Prefixes.undo.pack_key(0), stop=Prefixes.undo.pack_key(min_height), include_value=False
|
||||
)
|
||||
)
|
||||
|
||||
with self.db.write_batch() as batch:
|
||||
self.put = batch.put
|
||||
batch_put = self.put
|
||||
batch_put = batch.put
|
||||
batch_delete = batch.delete
|
||||
height_start = self.fs_height + 1
|
||||
tx_num = prior_tx_count
|
||||
for i, (header, block_hash, (tx_hashes, txs)) in enumerate(
|
||||
zip(flush_data.headers, flush_data.block_hashes, flush_data.block_txs)):
|
||||
batch_put(DB_PREFIXES.HEADER_PREFIX.value + util.pack_be_uint64(height_start), header)
|
||||
self.headers.append(header)
|
||||
tx_count = self.tx_counts[height_start]
|
||||
batch_put(DB_PREFIXES.BLOCK_HASH_PREFIX.value + util.pack_be_uint64(height_start), block_hash[::-1])
|
||||
batch_put(DB_PREFIXES.TX_COUNT_PREFIX.value + util.pack_be_uint64(height_start), util.pack_be_uint64(tx_count))
|
||||
height_start += 1
|
||||
offset = 0
|
||||
while offset < len(tx_hashes):
|
||||
batch_put(DB_PREFIXES.TX_HASH_PREFIX.value + util.pack_be_uint64(tx_num), tx_hashes[offset:offset + 32])
|
||||
batch_put(DB_PREFIXES.TX_NUM_PREFIX.value + tx_hashes[offset:offset + 32], util.pack_be_uint64(tx_num))
|
||||
batch_put(DB_PREFIXES.TX_PREFIX.value + tx_hashes[offset:offset + 32], txs[offset // 32])
|
||||
tx_num += 1
|
||||
offset += 32
|
||||
flush_data.headers.clear()
|
||||
flush_data.block_txs.clear()
|
||||
flush_data.block_hashes.clear()
|
||||
for staged_change in flush_data.claimtrie_stash:
|
||||
# print("ADVANCE", staged_change)
|
||||
|
||||
for staged_change in flush_data.put_and_delete_ops:
|
||||
if staged_change.is_put:
|
||||
batch_put(staged_change.key, staged_change.value)
|
||||
else:
|
||||
batch_delete(staged_change.key)
|
||||
flush_data.claimtrie_stash.clear()
|
||||
|
||||
for undo_ops, height in flush_data.undo:
|
||||
batch_put(*Prefixes.undo.pack_item(height, undo_ops))
|
||||
flush_data.undo.clear()
|
||||
for delete_key in delete_undo_keys:
|
||||
batch_delete(delete_key)
|
||||
|
||||
self.fs_height = flush_data.height
|
||||
self.fs_tx_count = flush_data.tx_count
|
||||
|
||||
# Then history
|
||||
self.hist_flush_count += 1
|
||||
flush_id = util.pack_be_uint32(self.hist_flush_count)
|
||||
unflushed = self.hist_unflushed
|
||||
|
||||
for hashX in sorted(unflushed):
|
||||
key = hashX + flush_id
|
||||
batch_put(DB_PREFIXES.HASHX_HISTORY_PREFIX.value + key, unflushed[hashX].tobytes())
|
||||
|
||||
unflushed.clear()
|
||||
self.hist_unflushed_count = 0
|
||||
|
||||
#########################
|
||||
|
||||
# New undo information
|
||||
for undo_info, height in flush_data.undo_infos:
|
||||
batch_put(self.undo_key(height), b''.join(undo_info))
|
||||
flush_data.undo_infos.clear()
|
||||
|
||||
# Spends
|
||||
for key in sorted(flush_data.deletes):
|
||||
batch_delete(key)
|
||||
flush_data.deletes.clear()
|
||||
|
||||
# New UTXOs
|
||||
for key, value in flush_data.adds.items():
|
||||
# suffix = tx_idx + tx_num
|
||||
hashX = value[:-12]
|
||||
suffix = key[-2:] + value[-12:-8]
|
||||
batch_put(DB_PREFIXES.HASHX_UTXO_PREFIX.value + key[:4] + suffix, hashX)
|
||||
batch_put(DB_PREFIXES.UTXO_PREFIX.value + hashX + suffix, value[-8:])
|
||||
flush_data.adds.clear()
|
||||
|
||||
self.utxo_flush_count = self.hist_flush_count
|
||||
self.db_height = flush_data.height
|
||||
self.db_tx_count = flush_data.tx_count
|
||||
self.db_tip = flush_data.tip
|
||||
|
||||
self.last_flush_tx_count = self.fs_tx_count
|
||||
now = time.time()
|
||||
self.wall_time += now - self.last_flush
|
||||
self.last_flush = now
|
||||
self.last_flush_tx_count = self.fs_tx_count
|
||||
|
||||
self.write_db_state(batch)
|
||||
|
||||
def flush_backup(self, flush_data, touched):
|
||||
"""Like flush_dbs() but when backing up. All UTXOs are flushed."""
|
||||
assert not flush_data.headers
|
||||
def flush_backup(self, flush_data):
|
||||
assert not flush_data.block_txs
|
||||
assert flush_data.height < self.db_height
|
||||
assert not self.hist_unflushed
|
||||
|
@ -974,82 +833,25 @@ class LevelDB:
|
|||
self.hist_flush_count += 1
|
||||
nremoves = 0
|
||||
|
||||
undo_ops = RevertableOpStack(self.db.get)
|
||||
|
||||
for (packed_ops, height) in reversed(flush_data.undo):
|
||||
undo_ops.extend(reversed(RevertableOp.unpack_stack(packed_ops)))
|
||||
undo_ops.append(
|
||||
RevertableDelete(*Prefixes.undo.pack_item(height, packed_ops))
|
||||
)
|
||||
|
||||
with self.db.write_batch() as batch:
|
||||
batch_put = batch.put
|
||||
batch_delete = batch.delete
|
||||
|
||||
# print("flush undos", flush_data.undo_claimtrie)
|
||||
for op in undo_ops:
|
||||
for op in flush_data.put_and_delete_ops:
|
||||
# print("REWIND", op)
|
||||
if op.is_put:
|
||||
batch_put(op.key, op.value)
|
||||
else:
|
||||
batch_delete(op.key)
|
||||
|
||||
flush_data.undo.clear()
|
||||
|
||||
while self.fs_height > flush_data.height:
|
||||
self.fs_height -= 1
|
||||
self.headers.pop()
|
||||
tx_count = flush_data.tx_count
|
||||
for hashX in sorted(touched):
|
||||
deletes = []
|
||||
puts = {}
|
||||
for key, hist in self.db.iterator(prefix=DB_PREFIXES.HASHX_HISTORY_PREFIX.value + hashX, reverse=True):
|
||||
k = key[1:]
|
||||
a = array.array('I')
|
||||
a.frombytes(hist)
|
||||
# Remove all history entries >= tx_count
|
||||
idx = bisect_left(a, tx_count)
|
||||
nremoves += len(a) - idx
|
||||
if idx > 0:
|
||||
puts[k] = a[:idx].tobytes()
|
||||
break
|
||||
deletes.append(k)
|
||||
|
||||
for key in deletes:
|
||||
batch_delete(key)
|
||||
for key, value in puts.items():
|
||||
batch_put(key, value)
|
||||
|
||||
# New undo information
|
||||
for undo_info, height in flush_data.undo:
|
||||
batch.put(self.undo_key(height), b''.join(undo_info))
|
||||
flush_data.undo.clear()
|
||||
|
||||
# Spends
|
||||
for key in sorted(flush_data.deletes):
|
||||
batch_delete(key)
|
||||
flush_data.deletes.clear()
|
||||
|
||||
# New UTXOs
|
||||
for key, value in flush_data.adds.items():
|
||||
# suffix = tx_idx + tx_num
|
||||
hashX = value[:-12]
|
||||
suffix = key[-2:] + value[-12:-8]
|
||||
batch_put(DB_PREFIXES.HASHX_UTXO_PREFIX.value + key[:4] + suffix, hashX)
|
||||
batch_put(DB_PREFIXES.UTXO_PREFIX.value + hashX + suffix, value[-8:])
|
||||
flush_data.adds.clear()
|
||||
|
||||
start_time = time.time()
|
||||
add_count = len(flush_data.adds)
|
||||
spend_count = len(flush_data.deletes) // 2
|
||||
|
||||
if self.db.for_sync:
|
||||
block_count = flush_data.height - self.db_height
|
||||
tx_count = flush_data.tx_count - self.db_tx_count
|
||||
elapsed = time.time() - start_time
|
||||
self.logger.info(f'flushed {block_count:,d} blocks with '
|
||||
f'{tx_count:,d} txs, {add_count:,d} UTXO adds, '
|
||||
f'{spend_count:,d} spends in '
|
||||
f'{tx_count:,d} txs in '
|
||||
f'{elapsed:.1f}s, committing...')
|
||||
|
||||
self.utxo_flush_count = self.hist_flush_count
|
||||
|
@ -1121,7 +923,6 @@ class LevelDB:
|
|||
return None, tx_height
|
||||
|
||||
def _fs_transactions(self, txids: Iterable[str]):
|
||||
unpack_be_uint64 = util.unpack_be_uint64
|
||||
tx_counts = self.tx_counts
|
||||
tx_db_get = self.db.get
|
||||
tx_cache = self._tx_and_merkle_cache
|
||||
|
@ -1133,14 +934,12 @@ class LevelDB:
|
|||
tx, merkle = cached_tx
|
||||
else:
|
||||
tx_hash_bytes = bytes.fromhex(tx_hash)[::-1]
|
||||
tx_num = tx_db_get(DB_PREFIXES.TX_NUM_PREFIX.value + tx_hash_bytes)
|
||||
tx_num = self.transaction_num_mapping.get(tx_hash_bytes)
|
||||
tx = None
|
||||
tx_height = -1
|
||||
if tx_num is not None:
|
||||
tx_num = unpack_be_uint64(tx_num)
|
||||
tx_height = bisect_right(tx_counts, tx_num)
|
||||
if tx_height < self.db_height:
|
||||
tx = tx_db_get(DB_PREFIXES.TX_PREFIX.value + tx_hash_bytes)
|
||||
tx = tx_db_get(Prefixes.tx.pack_key(tx_hash_bytes))
|
||||
if tx_height == -1:
|
||||
merkle = {
|
||||
'block_height': -1
|
||||
|
@ -1204,67 +1003,10 @@ class LevelDB:
|
|||
|
||||
def undo_key(self, height: int) -> bytes:
|
||||
"""DB key for undo information at the given height."""
|
||||
return DB_PREFIXES.UNDO_PREFIX.value + pack('>I', height)
|
||||
return Prefixes.undo.pack_key(height)
|
||||
|
||||
def read_undo_info(self, height):
|
||||
"""Read undo information from a file for the current height."""
|
||||
return self.db.get(self.undo_key(height)), self.db.get(Prefixes.undo.pack_key(self.fs_height))
|
||||
|
||||
def raw_block_prefix(self):
|
||||
return 'block'
|
||||
|
||||
def raw_block_path(self, height):
|
||||
return os.path.join(self.env.db_dir, f'{self.raw_block_prefix()}{height:d}')
|
||||
|
||||
async def read_raw_block(self, height):
|
||||
"""Returns a raw block read from disk. Raises FileNotFoundError
|
||||
if the block isn't on-disk."""
|
||||
|
||||
def read():
|
||||
with util.open_file(self.raw_block_path(height)) as f:
|
||||
return f.read(-1)
|
||||
|
||||
return await asyncio.get_event_loop().run_in_executor(self.executor, read)
|
||||
|
||||
def write_raw_block(self, block, height):
|
||||
"""Write a raw block to disk."""
|
||||
with util.open_truncate(self.raw_block_path(height)) as f:
|
||||
f.write(block)
|
||||
# Delete old blocks to prevent them accumulating
|
||||
try:
|
||||
del_height = self.min_undo_height(height) - 1
|
||||
os.remove(self.raw_block_path(del_height))
|
||||
except FileNotFoundError:
|
||||
pass
|
||||
|
||||
def clear_excess_undo_info(self):
|
||||
"""Clear excess undo info. Only most recent N are kept."""
|
||||
min_height = self.min_undo_height(self.db_height)
|
||||
keys = []
|
||||
for key, hist in self.db.iterator(prefix=DB_PREFIXES.UNDO_PREFIX.value):
|
||||
height, = unpack('>I', key[-4:])
|
||||
if height >= min_height:
|
||||
break
|
||||
keys.append(key)
|
||||
|
||||
if keys:
|
||||
with self.db.write_batch() as batch:
|
||||
for key in keys:
|
||||
batch.delete(key)
|
||||
self.logger.info(f'deleted {len(keys):,d} stale undo entries')
|
||||
|
||||
# delete old block files
|
||||
prefix = self.raw_block_prefix()
|
||||
paths = [path for path in glob(f'{prefix}[0-9]*')
|
||||
if len(path) > len(prefix)
|
||||
and int(path[len(prefix):]) < min_height]
|
||||
if paths:
|
||||
for path in paths:
|
||||
try:
|
||||
os.remove(path)
|
||||
except FileNotFoundError:
|
||||
pass
|
||||
self.logger.info(f'deleted {len(paths):,d} stale block files')
|
||||
def read_undo_info(self, height: int):
|
||||
return self.db.get(Prefixes.undo.pack_key(height))
|
||||
|
||||
# -- UTXO database
|
||||
|
||||
|
|
|
@ -82,7 +82,7 @@ class BaseResolveTestCase(CommandTestCase):
|
|||
check_supports(c['claimId'], c['supports'])
|
||||
claim_hash = bytes.fromhex(c['claimId'])
|
||||
self.assertEqual(c['validAtHeight'], db.get_activation(
|
||||
db.total_transactions.index(bytes.fromhex(c['txId'])[::-1]), c['n']
|
||||
db.transaction_num_mapping[bytes.fromhex(c['txId'])[::-1]], c['n']
|
||||
))
|
||||
self.assertEqual(c['effectiveAmount'], db.get_effective_amount(claim_hash))
|
||||
|
||||
|
|
Loading…
Reference in a new issue