simplify advance and reorg

parent 2bb55d681d
commit 3955b64405
3 changed files with 164 additions and 590 deletions
@@ -1,12 +1,14 @@
 import time
 import asyncio
 import typing
+import struct
 from bisect import bisect_right
 from struct import pack, unpack
 from concurrent.futures.thread import ThreadPoolExecutor
 from typing import Optional, List, Tuple, Set, DefaultDict, Dict
 from prometheus_client import Gauge, Histogram
 from collections import defaultdict
+import array
 import lbry
 from lbry.schema.claim import Claim
 from lbry.schema.mime_types import guess_stream_type
@@ -195,21 +197,18 @@ class BlockProcessor:
 # Meta
 self.next_cache_check = 0
 self.touched = set()
-self.reorg_count = 0

 # Caches of unflushed items.
-self.headers = []
 self.block_hashes = []
 self.block_txs = []
 self.undo_infos = []

 # UTXO cache
-self.utxo_cache = {}
+self.utxo_cache: Dict[Tuple[bytes, int], bytes] = {}
 self.db_deletes = []

 # Claimtrie cache
 self.db_op_stack: Optional[RevertableOpStack] = None
-self.undo_claims = []

 # If the lock is successfully acquired, in-memory chain state
 # is consistent with self.height
@@ -263,6 +262,7 @@ class BlockProcessor:

 self.doesnt_have_valid_signature: Set[bytes] = set()
 self.claim_channels: Dict[bytes, bytes] = {}
+self.hashXs_by_tx: DefaultDict[bytes, List[int]] = defaultdict(list)

 def claim_producer(self):
 if self.db.db_height <= 1:
@@ -295,6 +295,7 @@ class BlockProcessor:
 """Process the list of raw blocks passed. Detects and handles
 reorgs.
 """
+
 if not raw_blocks:
 return
 first = self.height + 1
@@ -305,7 +306,7 @@ class BlockProcessor:
 chain = [self.tip] + [self.coin.header_hash(h) for h in headers[:-1]]

 if hprevs == chain:
-start = time.perf_counter()
+total_start = time.perf_counter()
 try:
 for block in blocks:
 start = time.perf_counter()
@@ -323,14 +324,7 @@ class BlockProcessor:
 except:
 self.logger.exception("advance blocks failed")
 raise
-# if self.sql:
-
-# for cache in self.search_cache.values():
-# cache.clear()
-self.history_cache.clear() # TODO: is this needed?
-self.notifications.notified_mempool_txs.clear()
-
-processed_time = time.perf_counter() - start
+processed_time = time.perf_counter() - total_start
 self.block_count_metric.set(self.height)
 self.block_update_time_metric.observe(processed_time)
 self.status_server.set_height(self.db.fs_height, self.db.db_tip)
@@ -338,13 +332,32 @@ class BlockProcessor:
 s = '' if len(blocks) == 1 else 's'
 self.logger.info('processed {:,d} block{} in {:.1f}s'.format(len(blocks), s, processed_time))
 if self._caught_up_event.is_set():
-# if self.sql:
-# await self.db.search_index.apply_filters(self.sql.blocked_streams, self.sql.blocked_channels,
-# self.sql.filtered_streams, self.sql.filtered_channels)
 await self.notifications.on_block(self.touched, self.height)
 self.touched = set()
 elif hprevs[0] != chain[0]:
-await self.reorg_chain()
+min_start_height = max(self.height - self.coin.REORG_LIMIT, 0)
+count = 1
+block_hashes_from_lbrycrd = await self.daemon.block_hex_hashes(
+min_start_height, self.coin.REORG_LIMIT
+)
+for height, block_hash in zip(
+reversed(range(min_start_height, min_start_height + self.coin.REORG_LIMIT)),
+reversed(block_hashes_from_lbrycrd)):
+if self.block_hashes[height][::-1].hex() == block_hash:
+break
+count += 1
+self.logger.warning(f"blockchain reorg detected at {self.height}, unwinding last {count} blocks")
+try:
+assert count > 0, count
+for _ in range(count):
+await self.run_in_thread_with_lock(self.backup_block)
+await self.prefetcher.reset_height(self.height)
+self.reorg_count_metric.inc()
+except:
+self.logger.exception("reorg blocks failed")
+raise
+finally:
+self.logger.info("backed up to block %i", self.height)
 else:
 # It is probably possible but extremely rare that what
 # bitcoind returns doesn't form a chain because it
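The hunk above drops the old reorg_chain() call in favour of an inline walk-back: it asks lbrycrd for the hashes of the last REORG_LIMIT blocks, counts how many tip blocks no longer match, and rewinds them one at a time with backup_block(). A minimal standalone sketch of just that counting step, with a hypothetical wrapper function and simplified inputs (not part of the commit):

    # Sketch only: `local_block_hashes` stands in for self.block_hashes (raw,
    # little-endian hashes indexed by height) and `daemon_hex_hashes` for
    # daemon.block_hex_hashes(start_height, limit), covering heights up to tip - 1.
    def blocks_to_unwind(local_block_hashes, daemon_hex_hashes, start_height):
        count = 1  # the tip itself is already known to differ when this runs
        heights = range(start_height, start_height + len(daemon_hex_hashes))
        for height, daemon_hash in zip(reversed(heights), reversed(daemon_hex_hashes)):
            if local_block_hashes[height][::-1].hex() == daemon_hash:
                break  # highest height where the local chain still agrees with lbrycrd
            count += 1
        return count

The advance path then calls backup_block() that many times under the state lock, each call popping one block and applying its stored undo operations.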
@@ -355,101 +368,26 @@ class BlockProcessor:
 'resetting the prefetcher')
 await self.prefetcher.reset_height(self.height)

-async def reorg_chain(self, count: Optional[int] = None):
-"""Handle a chain reorganisation.
-
-Count is the number of blocks to simulate a reorg, or None for
-a real reorg."""
-if count is None:
-self.logger.info('chain reorg detected')
-else:
-self.logger.info(f'faking a reorg of {count:,d} blocks')
-
-async def get_raw_blocks(last_height, hex_hashes):
-heights = range(last_height, last_height - len(hex_hashes), -1)
-try:
-blocks = [await self.db.read_raw_block(height) for height in heights]
-self.logger.info(f'read {len(blocks)} blocks from disk')
-return blocks
-except FileNotFoundError:
-return await self.daemon.raw_blocks(hex_hashes)
-
-try:
-start, last, hashes = await self.reorg_hashes(count)
-# Reverse and convert to hex strings.
-hashes = [hash_to_hex_str(hash) for hash in reversed(hashes)]
-self.logger.info("reorg %i block hashes", len(hashes))
-
-for hex_hashes in chunks(hashes, 50):
-raw_blocks = await get_raw_blocks(last, hex_hashes)
-self.logger.info("got %i raw blocks", len(raw_blocks))
-await self.run_in_thread_with_lock(self.backup_blocks, raw_blocks)
-last -= len(raw_blocks)
-
-await self.prefetcher.reset_height(self.height)
-self.reorg_count_metric.inc()
-except:
-self.logger.exception("boom")
-raise
-finally:
-self.logger.info("done with reorg")
-
-async def reorg_hashes(self, count):
-"""Return a pair (start, last, hashes) of blocks to back up during a
-reorg.
-
-The hashes are returned in order of increasing height. Start
-is the height of the first hash, last of the last.
-"""
-
-"""Calculate the reorg range"""
-
-def diff_pos(hashes1, hashes2):
-"""Returns the index of the first difference in the hash lists.
-If both lists match returns their length."""
-for n, (hash1, hash2) in enumerate(zip(hashes1, hashes2)):
-if hash1 != hash2:
-return n
-return len(hashes)
-
-if count is None:
-# A real reorg
-start = self.height - 1
-count = 1
-while start > 0:
-hashes = await self.db.fs_block_hashes(start, count)
-hex_hashes = [hash_to_hex_str(hash) for hash in hashes]
-d_hex_hashes = await self.daemon.block_hex_hashes(start, count)
-n = diff_pos(hex_hashes, d_hex_hashes)
-if n > 0:
-start += n
-break
-count = min(count * 2, start)
-start -= count
-
-count = (self.height - start) + 1
-else:
-start = (self.height - count) + 1
-last = start + count - 1
-s = '' if count == 1 else 's'
-self.logger.info(f'chain was reorganised replacing {count:,d} '
-f'block{s} at heights {start:,d}-{last:,d}')
-
-return start, last, await self.db.fs_block_hashes(start, count)
-
 # - Flushing
 def flush_data(self):
 """The data for a flush. The lock must be taken."""
 assert self.state_lock.locked()
-return FlushData(self.height, self.tx_count, self.headers, self.block_hashes,
-self.block_txs, self.db_op_stack, self.undo_infos, self.utxo_cache,
-self.db_deletes, self.tip, self.undo_claims)
+return FlushData(self.height, self.tx_count, self.block_hashes,
+self.block_txs, self.db_op_stack, self.tip)

 async def flush(self):
 def flush():
 self.db.flush_dbs(self.flush_data())
 await self.run_in_thread_with_lock(flush)

+async def write_state(self):
+def flush():
+with self.db.db.write_batch() as batch:
+self.db.write_db_state(batch)
+
+await self.run_in_thread_with_lock(flush)
+
 def _add_claim_or_update(self, height: int, txo: 'Output', tx_hash: bytes, tx_num: int, nout: int,
 spent_claims: typing.Dict[bytes, typing.Tuple[int, int, str]]):
 try:
@@ -1167,51 +1105,51 @@ class BlockProcessor:
 block_hash = self.coin.header_hash(block.header)

 self.block_hashes.append(block_hash)
-self.block_txs.append((b''.join(tx_hash for tx, tx_hash in txs), [tx.raw for tx, _ in txs]))
+self.db_op_stack.append(RevertablePut(*Prefixes.block_hash.pack_item(height, block_hash)))

-first_tx_num = self.tx_count
-undo_info = []
-hashXs_by_tx = []
 tx_count = self.tx_count

 # Use local vars for speed in the loops
-put_utxo = self.utxo_cache.__setitem__
-claimtrie_stash_extend = self.db_op_stack.extend
 spend_utxo = self.spend_utxo
-undo_info_append = undo_info.append
-update_touched = self.touched.update
-append_hashX_by_tx = hashXs_by_tx.append
-hashX_from_script = self.coin.hashX_from_script
+add_utxo = self.add_utxo
+spend_claim_or_support_txo = self._spend_claim_or_support_txo
+add_claim_or_support = self._add_claim_or_support

 for tx, tx_hash in txs:
 spent_claims = {}

-hashXs = [] # hashXs touched by spent inputs/rx outputs
-append_hashX = hashXs.append
-tx_numb = pack('<I', tx_count)
-
 txos = Transaction(tx.raw).outputs

+self.db_op_stack.extend([
+RevertablePut(*Prefixes.tx.pack_item(tx_hash, tx.raw)),
+RevertablePut(*Prefixes.tx_num.pack_item(tx_hash, tx_count)),
+RevertablePut(*Prefixes.tx_hash.pack_item(tx_count, tx_hash))
+])
+
 # Spend the inputs
 for txin in tx.inputs:
 if txin.is_generation():
 continue
+txin_num = self.db.transaction_num_mapping[txin.prev_hash]
 # spend utxo for address histories
-cache_value = spend_utxo(txin.prev_hash, txin.prev_idx)
-undo_info_append(cache_value)
-append_hashX(cache_value[:-12])
-self._spend_claim_or_support_txo(txin, spent_claims)
+hashX = spend_utxo(txin.prev_hash, txin.prev_idx)
+if hashX:
+# self._set_hashX_cache(hashX)
+if txin_num not in self.hashXs_by_tx[hashX]:
+self.hashXs_by_tx[hashX].append(txin_num)
+# spend claim/support txo
+spend_claim_or_support_txo(txin, spent_claims)

 # Add the new UTXOs
 for nout, txout in enumerate(tx.outputs):
 # Get the hashX. Ignore unspendable outputs
-hashX = hashX_from_script(txout.pk_script)
+hashX = add_utxo(tx_hash, tx_count, nout, txout)
 if hashX:
-append_hashX(hashX)
-put_utxo(tx_hash + pack('<H', nout), hashX + tx_numb + pack('<Q', txout.value))
+# self._set_hashX_cache(hashX)
+if tx_count not in self.hashXs_by_tx[hashX]:
+self.hashXs_by_tx[hashX].append(tx_count)
 # add claim/support txo
-self._add_claim_or_support(
+add_claim_or_support(
 height, tx_hash, tx_count, nout, txos[nout], spent_claims
 )
@@ -1220,8 +1158,6 @@ class BlockProcessor:
 # print(f"\tabandon {abandoned_claim_hash.hex()} {tx_num} {nout}")
 self._abandon_claim(abandoned_claim_hash, tx_num, nout, name)

-append_hashX_by_tx(hashXs)
-update_touched(hashXs)
 self.db.total_transactions.append(tx_hash)
 self.db.transaction_num_mapping[tx_hash] = tx_count
 tx_count += 1
@@ -1232,31 +1168,34 @@ class BlockProcessor:
 # activate claims and process takeovers
 self._get_takeover_ops(height)

-# self.db.add_unflushed(hashXs_by_tx, self.tx_count)
-_unflushed = self.db.hist_unflushed
-_count = 0
-for _tx_num, _hashXs in enumerate(hashXs_by_tx, start=first_tx_num):
-for _hashX in set(_hashXs):
-_unflushed[_hashX].append(_tx_num)
-_count += len(_hashXs)
-self.db.hist_unflushed_count += _count
+self.db_op_stack.append(RevertablePut(*Prefixes.header.pack_item(height, block.header)))
+self.db_op_stack.append(RevertablePut(*Prefixes.tx_count.pack_item(height, tx_count)))
+for hashX, new_history in self.hashXs_by_tx.items():
+if not new_history:
+continue
+self.db_op_stack.append(
+RevertablePut(
+*Prefixes.hashX_history.pack_item(
+hashX, height, new_history
+)
+)
+)

 self.tx_count = tx_count
 self.db.tx_counts.append(self.tx_count)

-undo_claims = b''.join(op.invert().pack() for op in self.db_op_stack)
-# print("%i undo bytes for %i (%i claimtrie stash ops)" % (len(undo_claims), height, len(claimtrie_stash)))

 if height >= self.daemon.cached_height() - self.env.reorg_limit:
-self.undo_infos.append((undo_info, height))
-self.undo_claims.append((undo_claims, height))
-self.db.write_raw_block(block.raw, height)
+self.db_op_stack.append(RevertablePut(*Prefixes.undo.pack_item(height, self.db_op_stack.get_undo_ops())))

 self.height = height
-self.headers.append(block.header)
+self.db.headers.append(block.header)
 self.tip = self.coin.header_hash(block.header)

 self.db.flush_dbs(self.flush_data())
+self.clear_after_advance_or_reorg()
+
+def clear_after_advance_or_reorg(self):
 self.db_op_stack.clear()
 self.txo_to_claim.clear()
 self.claim_hash_to_txo.clear()
@@ -1277,186 +1216,83 @@ class BlockProcessor:
 self.expired_claim_hashes.clear()
 self.doesnt_have_valid_signature.clear()
 self.claim_channels.clear()
-# for cache in self.search_cache.values():
-# cache.clear()
+self.utxo_cache.clear()
+self.hashXs_by_tx.clear()
 self.history_cache.clear()
 self.notifications.notified_mempool_txs.clear()

-def backup_blocks(self, raw_blocks):
-"""Backup the raw blocks and flush.
-
-The blocks should be in order of decreasing height, starting at.
-self.height. A flush is performed once the blocks are backed up.
-"""
+def backup_block(self):
 self.db.assert_flushed(self.flush_data())
-assert self.height >= len(raw_blocks)
-coin = self.coin
-for raw_block in raw_blocks:
-self.logger.info("backup block %i", self.height)
-# Check and update self.tip
-block = coin.block(raw_block, self.height)
-header_hash = coin.header_hash(block.header)
-if header_hash != self.tip:
-raise ChainError('backup block {} not tip {} at height {:,d}'
-.format(hash_to_hex_str(header_hash),
-hash_to_hex_str(self.tip),
-self.height))
-self.tip = coin.header_prevhash(block.header)
-self.backup_txs(block.transactions)
-self.height -= 1
-self.db.tx_counts.pop()
-
-# self.touched can include other addresses which is
-# harmless, but remove None.
-self.touched.discard(None)
-
-self.db.flush_backup(self.flush_data(), self.touched)
-self.logger.info(f'backed up to height {self.height:,d}')
-
-def backup_txs(self, txs):
-# Prevout values, in order down the block (coinbase first if present)
-# undo_info is in reverse block order
-undo_info, undo_claims = self.db.read_undo_info(self.height)
-if undo_info is None:
+self.logger.info("backup block %i", self.height)
+# Check and update self.tip
+undo_ops = self.db.read_undo_info(self.height)
+if undo_ops is None:
 raise ChainError(f'no undo information found for height {self.height:,d}')
-n = len(undo_info)
-
-# Use local vars for speed in the loops
-s_pack = pack
-undo_entry_len = 12 + HASHX_LEN
-
-for tx, tx_hash in reversed(txs):
-for idx, txout in enumerate(tx.outputs):
-# Spend the TX outputs. Be careful with unspendable
-# outputs - we didn't save those in the first place.
-hashX = self.coin.hashX_from_script(txout.pk_script)
-if hashX:
-cache_value = self.spend_utxo(tx_hash, idx)
-self.touched.add(cache_value[:-12])
-
-# Restore the inputs
-for txin in reversed(tx.inputs):
-if txin.is_generation():
-continue
-n -= undo_entry_len
-undo_item = undo_info[n:n + undo_entry_len]
-self.utxo_cache[txin.prev_hash + s_pack('<H', txin.prev_idx)] = undo_item
-self.touched.add(undo_item[:-12])
-
+self.db_op_stack.apply_packed_undo_ops(undo_ops)
+self.db_op_stack.append(RevertableDelete(Prefixes.undo.pack_key(self.height), undo_ops))
+self.db.headers.pop()
+self.block_hashes.pop()
+self.db.tx_counts.pop()
+self.tip = self.coin.header_hash(self.db.headers[-1])
+while len(self.db.total_transactions) > self.db.tx_counts[-1]:
 self.db.transaction_num_mapping.pop(self.db.total_transactions.pop())
-
-assert n == 0
-self.tx_count -= len(txs)
-self.undo_claims.append((undo_claims, self.height))
-
-"""An in-memory UTXO cache, representing all changes to UTXO state
-since the last DB flush.
-
-We want to store millions of these in memory for optimal
-performance during initial sync, because then it is possible to
-spend UTXOs without ever going to the database (other than as an
-entry in the address history, and there is only one such entry per
-TX not per UTXO). So store them in a Python dictionary with
-binary keys and values.
-
-Key: TX_HASH + TX_IDX (32 + 2 = 34 bytes)
-Value: HASHX + TX_NUM + VALUE (11 + 4 + 8 = 23 bytes)
-
-That's 57 bytes of raw data in-memory. Python dictionary overhead
-means each entry actually uses about 205 bytes of memory. So
-almost 5 million UTXOs can fit in 1GB of RAM. There are
-approximately 42 million UTXOs on bitcoin mainnet at height
-433,000.
-
-Semantics:
-
-add: Add it to the cache dictionary.
-
-spend: Remove it if in the cache dictionary. Otherwise it's
-been flushed to the DB. Each UTXO is responsible for two
-entries in the DB. Mark them for deletion in the next
-cache flush.
-
-The UTXO database format has to be able to do two things efficiently:
-
-1. Given an address be able to list its UTXOs and their values
-so its balance can be efficiently computed.
-
-2. When processing transactions, for each prevout spent - a (tx_hash,
-idx) pair - we have to be able to remove it from the DB. To send
-notifications to clients we also need to know any address it paid
-to.
-
-To this end we maintain two "tables", one for each point above:
-
-1. Key: b'u' + address_hashX + tx_idx + tx_num
-Value: the UTXO value as a 64-bit unsigned integer
-
-2. Key: b'h' + compressed_tx_hash + tx_idx + tx_num
-Value: hashX
-
-The compressed tx hash is just the first few bytes of the hash of
-the tx in which the UTXO was created. As this is not unique there
-will be potential collisions so tx_num is also in the key. When
-looking up a UTXO the prefix space of the compressed hash needs to
-be searched and resolved if necessary with the tx_num. The
-collision rate is low (<0.1%).
-"""
-
-def spend_utxo(self, tx_hash, tx_idx):
-"""Spend a UTXO and return the 33-byte value.
-
-If the UTXO is not in the cache it must be on disk. We store
-all UTXOs so not finding one indicates a logic error or DB
-corruption.
-"""
+self.tx_count -= 1
+self.height -= 1
+# self.touched can include other addresses which is
+# harmless, but remove None.
+self.touched.discard(None)
+self.db.flush_backup(self.flush_data())
+self.clear_after_advance_or_reorg()
+self.logger.info(f'backed up to height {self.height:,d}')

+def add_utxo(self, tx_hash: bytes, tx_num: int, nout: int, txout: 'TxOutput') -> Optional[bytes]:
+hashX = self.coin.hashX_from_script(txout.pk_script)
+if hashX:
+self.utxo_cache[(tx_hash, nout)] = hashX
+self.db_op_stack.extend([
+RevertablePut(
+*Prefixes.utxo.pack_item(hashX, tx_num, nout, txout.value)
+),
+RevertablePut(
+*Prefixes.hashX_utxo.pack_item(tx_hash[:4], tx_num, nout, hashX)
+)
+])
+return hashX
+
+def spend_utxo(self, tx_hash: bytes, nout: int):
 # Fast track is it being in the cache
-idx_packed = pack('<H', tx_idx)
-cache_value = self.utxo_cache.pop(tx_hash + idx_packed, None)
+cache_value = self.utxo_cache.pop((tx_hash, nout), None)
 if cache_value:
 return cache_value

-# Spend it from the DB.
-# Key: b'h' + compressed_tx_hash + tx_idx + tx_num
-# Value: hashX
-prefix = DB_PREFIXES.HASHX_UTXO_PREFIX.value + tx_hash[:4] + idx_packed
+prefix = Prefixes.hashX_utxo.pack_partial_key(tx_hash[:4])
 candidates = {db_key: hashX for db_key, hashX in self.db.db.iterator(prefix=prefix)}

 for hdb_key, hashX in candidates.items():
-tx_num_packed = hdb_key[-4:]
+key = Prefixes.hashX_utxo.unpack_key(hdb_key)
 if len(candidates) > 1:
-tx_num, = unpack('<I', tx_num_packed)
-try:
-hash, height = self.db.fs_tx_hash(tx_num)
-except IndexError:
-self.logger.error("data integrity error for hashx history: %s missing tx #%s (%s:%s)",
-hashX.hex(), tx_num, hash_to_hex_str(tx_hash), tx_idx)
-continue
+hash = self.db.total_transactions[key.tx_num]
 if hash != tx_hash:
 assert hash is not None # Should always be found
 continue
-# Key: b'u' + address_hashX + tx_idx + tx_num
-# Value: the UTXO value as a 64-bit unsigned integer
-udb_key = DB_PREFIXES.UTXO_PREFIX.value + hashX + hdb_key[-6:]
+if key.nout != nout:
+continue
+udb_key = Prefixes.utxo.pack_key(hashX, key.tx_num, nout)
 utxo_value_packed = self.db.db.get(udb_key)
 if utxo_value_packed is None:
 self.logger.warning(
-"%s:%s is not found in UTXO db for %s", hash_to_hex_str(tx_hash), tx_idx, hash_to_hex_str(hashX)
+"%s:%s is not found in UTXO db for %s", hash_to_hex_str(tx_hash), nout, hash_to_hex_str(hashX)
 )
-raise ChainError(f"{hash_to_hex_str(tx_hash)}:{tx_idx} is not found in UTXO db for {hash_to_hex_str(hashX)}")
+raise ChainError(f"{hash_to_hex_str(tx_hash)}:{nout} is not found in UTXO db for {hash_to_hex_str(hashX)}")
 # Remove both entries for this UTXO
-self.db_deletes.append(hdb_key)
-self.db_deletes.append(udb_key)
-return hashX + tx_num_packed + utxo_value_packed
+self.db_op_stack.extend([
+RevertableDelete(hdb_key, hashX),
+RevertableDelete(udb_key, utxo_value_packed)
+])
+return hashX

 self.logger.error('UTXO {hash_to_hex_str(tx_hash)} / {tx_idx} not found in "h" table')
 raise ChainError('UTXO {} / {:,d} not found in "h" table'
-.format(hash_to_hex_str(tx_hash), tx_idx))
+.format(hash_to_hex_str(tx_hash), nout))

 async def _process_prefetched_blocks(self):
 """Loop forever processing blocks as they arrive."""
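With the hunk above, the in-memory UTXO cache is keyed by (tx_hash, nout) tuples and stores only the hashX, while the two DB rows per UTXO are staged as revertable put/delete operations. A minimal sketch of just the cache round trip (class and method signatures here are illustrative stand-ins, not the real BlockProcessor API):

    from typing import Dict, Optional, Tuple

    class UTXOCacheSketch:
        """Sketch of the cache fast path; the DB fallback in spend_utxo is elided."""
        def __init__(self):
            self.utxo_cache: Dict[Tuple[bytes, int], bytes] = {}

        def add_utxo(self, tx_hash: bytes, nout: int, hashX: Optional[bytes]) -> Optional[bytes]:
            # cache the spending script's hashX for an output created since the last flush
            if hashX:
                self.utxo_cache[(tx_hash, nout)] = hashX
            return hashX

        def spend_utxo(self, tx_hash: bytes, nout: int) -> Optional[bytes]:
            # fast track: the output is still unflushed, so no DB lookup is needed
            return self.utxo_cache.pop((tx_hash, nout), None)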
@@ -1467,23 +1303,19 @@ class BlockProcessor:
 self._caught_up_event.set()
 await self.blocks_event.wait()
 self.blocks_event.clear()
-if self.reorg_count: # this could only happen by calling the reorg rpc
-await self.reorg_chain(self.reorg_count)
-self.reorg_count = 0
-else:
-blocks = self.prefetcher.get_prefetched_blocks()
-try:
-await self.check_and_advance_blocks(blocks)
-except Exception:
-self.logger.exception("error while processing txs")
-raise
+blocks = self.prefetcher.get_prefetched_blocks()
+try:
+await self.check_and_advance_blocks(blocks)
+except Exception:
+self.logger.exception("error while processing txs")
+raise

 async def _first_caught_up(self):
 self.logger.info(f'caught up to height {self.height}')
 # Flush everything but with first_sync->False state.
 first_sync = self.db.first_sync
 self.db.first_sync = False
-await self.flush()
+await self.write_state()
 if first_sync:
 self.logger.info(f'{lbry.__version__} synced to '
 f'height {self.height:,d}, halting here.')
@@ -65,16 +65,10 @@ TXO_STRUCT_pack = TXO_STRUCT.pack
 class FlushData:
 height = attr.ib()
 tx_count = attr.ib()
-headers = attr.ib()
 block_hashes = attr.ib()
 block_txs = attr.ib()
-claimtrie_stash = attr.ib()
-# The following are flushed to the UTXO DB if undo_infos is not None
-undo_infos = attr.ib()
-adds = attr.ib()
-deletes = attr.ib()
+put_and_delete_ops = attr.ib()
 tip = attr.ib()
-undo = attr.ib()


 OptionalResolveResultOrError = Optional[typing.Union[ResolveResult, LookupError, ValueError]]
@@ -143,9 +137,6 @@ class LevelDB:
 self.merkle = Merkle()
 self.header_mc = MerkleCache(self.merkle, self.fs_block_hashes)

-self.headers_db = None
-self.tx_db = None
-
 self._tx_and_merkle_cache = LRUCacheWithMetrics(2 ** 17, metric_name='tx_and_merkle', namespace="wallet_server")
 self.total_transactions = None
 self.transaction_num_mapping = {}
@@ -748,61 +739,8 @@ class LevelDB:
 raise RuntimeError(msg)
 self.logger.info(f'flush count: {self.hist_flush_count:,d}')

-# self.history.clear_excess(self.utxo_flush_count)
-# < might happen at end of compaction as both DBs cannot be
-# updated atomically
-if self.hist_flush_count > self.utxo_flush_count:
-self.logger.info('DB shut down uncleanly. Scanning for excess history flushes...')
-
-keys = []
-for key, hist in self.db.iterator(prefix=DB_PREFIXES.HASHX_HISTORY_PREFIX.value):
-k = key[1:]
-flush_id = int.from_bytes(k[-4:], byteorder='big')
-if flush_id > self.utxo_flush_count:
-keys.append(k)
-
-self.logger.info(f'deleting {len(keys):,d} history entries')
-
-self.hist_flush_count = self.utxo_flush_count
-with self.db.write_batch() as batch:
-for key in keys:
-batch.delete(DB_PREFIXES.HASHX_HISTORY_PREFIX.value + key)
-if keys:
-self.logger.info('deleted %i excess history entries', len(keys))
-
 self.utxo_flush_count = self.hist_flush_count

-min_height = self.min_undo_height(self.db_height)
-keys = []
-for key, hist in self.db.iterator(prefix=DB_PREFIXES.UNDO_PREFIX.value):
-height, = unpack('>I', key[-4:])
-if height >= min_height:
-break
-keys.append(key)
-if min_height > 0:
-for key in self.db.iterator(start=DB_PREFIXES.undo_claimtrie.value,
-stop=Prefixes.undo.pack_key(min_height),
-include_value=False):
-keys.append(key)
-if keys:
-with self.db.write_batch() as batch:
-for key in keys:
-batch.delete(key)
-self.logger.info(f'deleted {len(keys):,d} stale undo entries')
-
-# delete old block files
-prefix = self.raw_block_prefix()
-paths = [path for path in glob(f'{prefix}[0-9]*')
-if len(path) > len(prefix)
-and int(path[len(prefix):]) < min_height]
-if paths:
-for path in paths:
-try:
-os.remove(path)
-except FileNotFoundError:
-pass
-self.logger.info(f'deleted {len(paths):,d} stale block files')
-
 # Read TX counts (requires meta directory)
 await self._read_tx_counts()
 if self.total_transactions is None:
@@ -836,129 +774,50 @@ class LevelDB:
 assert flush_data.tx_count == self.fs_tx_count == self.db_tx_count
 assert flush_data.height == self.fs_height == self.db_height
 assert flush_data.tip == self.db_tip
-assert not flush_data.headers
 assert not flush_data.block_txs
-assert not flush_data.adds
-assert not flush_data.deletes
-assert not flush_data.undo_infos
-assert not self.hist_unflushed
+assert not len(flush_data.put_and_delete_ops)

 def flush_dbs(self, flush_data: FlushData):
-"""Flush out cached state. History is always flushed; UTXOs are
-flushed if flush_utxos."""
-
 if flush_data.height == self.db_height:
 self.assert_flushed(flush_data)
 return

-# start_time = time.time()
-prior_flush = self.last_flush
-tx_delta = flush_data.tx_count - self.last_flush_tx_count
-
-# Flush to file system
-# self.flush_fs(flush_data)
-prior_tx_count = (self.tx_counts[self.fs_height]
-if self.fs_height >= 0 else 0)
+min_height = self.min_undo_height(self.db_height)
+delete_undo_keys = []
+if min_height > 0:
+delete_undo_keys.extend(
+self.db.iterator(
+start=Prefixes.undo.pack_key(0), stop=Prefixes.undo.pack_key(min_height), include_value=False
+)
+)

-assert len(flush_data.block_txs) == len(flush_data.headers)
-assert flush_data.height == self.fs_height + len(flush_data.headers)
-assert flush_data.tx_count == (self.tx_counts[-1] if self.tx_counts
-else 0)
-assert len(self.tx_counts) == flush_data.height + 1
-assert len(
-b''.join(hashes for hashes, _ in flush_data.block_txs)
-) // 32 == flush_data.tx_count - prior_tx_count, f"{len(b''.join(hashes for hashes, _ in flush_data.block_txs)) // 32} != {flush_data.tx_count}"
-
-# Write the headers
-# start_time = time.perf_counter()
-
 with self.db.write_batch() as batch:
-self.put = batch.put
-batch_put = self.put
+batch_put = batch.put
 batch_delete = batch.delete
-height_start = self.fs_height + 1
-tx_num = prior_tx_count
-for i, (header, block_hash, (tx_hashes, txs)) in enumerate(
-zip(flush_data.headers, flush_data.block_hashes, flush_data.block_txs)):
-batch_put(DB_PREFIXES.HEADER_PREFIX.value + util.pack_be_uint64(height_start), header)
-self.headers.append(header)
-tx_count = self.tx_counts[height_start]
-batch_put(DB_PREFIXES.BLOCK_HASH_PREFIX.value + util.pack_be_uint64(height_start), block_hash[::-1])
-batch_put(DB_PREFIXES.TX_COUNT_PREFIX.value + util.pack_be_uint64(height_start), util.pack_be_uint64(tx_count))
-height_start += 1
-offset = 0
-while offset < len(tx_hashes):
-batch_put(DB_PREFIXES.TX_HASH_PREFIX.value + util.pack_be_uint64(tx_num), tx_hashes[offset:offset + 32])
-batch_put(DB_PREFIXES.TX_NUM_PREFIX.value + tx_hashes[offset:offset + 32], util.pack_be_uint64(tx_num))
-batch_put(DB_PREFIXES.TX_PREFIX.value + tx_hashes[offset:offset + 32], txs[offset // 32])
-tx_num += 1
-offset += 32
-flush_data.headers.clear()
-flush_data.block_txs.clear()
-flush_data.block_hashes.clear()
-for staged_change in flush_data.claimtrie_stash:
-# print("ADVANCE", staged_change)
+for staged_change in flush_data.put_and_delete_ops:
 if staged_change.is_put:
 batch_put(staged_change.key, staged_change.value)
 else:
 batch_delete(staged_change.key)
-flush_data.claimtrie_stash.clear()
-
-for undo_ops, height in flush_data.undo:
-batch_put(*Prefixes.undo.pack_item(height, undo_ops))
-flush_data.undo.clear()
+for delete_key in delete_undo_keys:
+batch_delete(delete_key)

 self.fs_height = flush_data.height
 self.fs_tx_count = flush_data.tx_count

-# Then history
 self.hist_flush_count += 1
-flush_id = util.pack_be_uint32(self.hist_flush_count)
-unflushed = self.hist_unflushed
-
-for hashX in sorted(unflushed):
-key = hashX + flush_id
-batch_put(DB_PREFIXES.HASHX_HISTORY_PREFIX.value + key, unflushed[hashX].tobytes())
-
-unflushed.clear()
 self.hist_unflushed_count = 0

-#########################
-
-# New undo information
-for undo_info, height in flush_data.undo_infos:
-batch_put(self.undo_key(height), b''.join(undo_info))
-flush_data.undo_infos.clear()
-
-# Spends
-for key in sorted(flush_data.deletes):
-batch_delete(key)
-flush_data.deletes.clear()
-
-# New UTXOs
-for key, value in flush_data.adds.items():
-# suffix = tx_idx + tx_num
-hashX = value[:-12]
-suffix = key[-2:] + value[-12:-8]
-batch_put(DB_PREFIXES.HASHX_UTXO_PREFIX.value + key[:4] + suffix, hashX)
-batch_put(DB_PREFIXES.UTXO_PREFIX.value + hashX + suffix, value[-8:])
-flush_data.adds.clear()
-
 self.utxo_flush_count = self.hist_flush_count
 self.db_height = flush_data.height
 self.db_tx_count = flush_data.tx_count
 self.db_tip = flush_data.tip
+self.last_flush_tx_count = self.fs_tx_count
 now = time.time()
 self.wall_time += now - self.last_flush
 self.last_flush = now
-self.last_flush_tx_count = self.fs_tx_count

 self.write_db_state(batch)

-def flush_backup(self, flush_data, touched):
-"""Like flush_dbs() but when backing up. All UTXOs are flushed."""
-assert not flush_data.headers
+def flush_backup(self, flush_data):
 assert not flush_data.block_txs
 assert flush_data.height < self.db_height
 assert not self.hist_unflushed
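After this change both flush_dbs() and flush_backup() reduce to replaying FlushData.put_and_delete_ops inside a single write batch. A short sketch of that replay loop, using a stand-in operation type (the real RevertableOp/RevertableOpStack classes live elsewhere in the codebase and are not reproduced here):

    from dataclasses import dataclass

    @dataclass
    class StagedOp:
        # stand-in for a revertable put/delete operation
        is_put: bool
        key: bytes
        value: bytes = b''

    def replay_ops(batch, ops):
        # `batch` is assumed to expose put(key, value) and delete(key),
        # like a leveldb write batch
        for op in ops:
            if op.is_put:
                batch.put(op.key, op.value)
            else:
                batch.delete(op.key)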
@@ -974,82 +833,25 @@ class LevelDB:
 self.hist_flush_count += 1
 nremoves = 0

-undo_ops = RevertableOpStack(self.db.get)
-
-for (packed_ops, height) in reversed(flush_data.undo):
-undo_ops.extend(reversed(RevertableOp.unpack_stack(packed_ops)))
-undo_ops.append(
-RevertableDelete(*Prefixes.undo.pack_item(height, packed_ops))
-)
-
 with self.db.write_batch() as batch:
 batch_put = batch.put
 batch_delete = batch.delete
-# print("flush undos", flush_data.undo_claimtrie)
-for op in undo_ops:
+for op in flush_data.put_and_delete_ops:
 # print("REWIND", op)
 if op.is_put:
 batch_put(op.key, op.value)
 else:
 batch_delete(op.key)

-flush_data.undo.clear()
-
 while self.fs_height > flush_data.height:
 self.fs_height -= 1
-self.headers.pop()
-tx_count = flush_data.tx_count
-for hashX in sorted(touched):
-deletes = []
-puts = {}
-for key, hist in self.db.iterator(prefix=DB_PREFIXES.HASHX_HISTORY_PREFIX.value + hashX, reverse=True):
-k = key[1:]
-a = array.array('I')
-a.frombytes(hist)
-# Remove all history entries >= tx_count
-idx = bisect_left(a, tx_count)
-nremoves += len(a) - idx
-if idx > 0:
-puts[k] = a[:idx].tobytes()
-break
-deletes.append(k)
-
-for key in deletes:
-batch_delete(key)
-for key, value in puts.items():
-batch_put(key, value)
-
-# New undo information
-for undo_info, height in flush_data.undo:
-batch.put(self.undo_key(height), b''.join(undo_info))
-flush_data.undo.clear()
-
-# Spends
-for key in sorted(flush_data.deletes):
-batch_delete(key)
-flush_data.deletes.clear()
-
-# New UTXOs
-for key, value in flush_data.adds.items():
-# suffix = tx_idx + tx_num
-hashX = value[:-12]
-suffix = key[-2:] + value[-12:-8]
-batch_put(DB_PREFIXES.HASHX_UTXO_PREFIX.value + key[:4] + suffix, hashX)
-batch_put(DB_PREFIXES.UTXO_PREFIX.value + hashX + suffix, value[-8:])
-flush_data.adds.clear()
-
 start_time = time.time()
-add_count = len(flush_data.adds)
-spend_count = len(flush_data.deletes) // 2

 if self.db.for_sync:
 block_count = flush_data.height - self.db_height
 tx_count = flush_data.tx_count - self.db_tx_count
 elapsed = time.time() - start_time
 self.logger.info(f'flushed {block_count:,d} blocks with '
-f'{tx_count:,d} txs, {add_count:,d} UTXO adds, '
-f'{spend_count:,d} spends in '
+f'{tx_count:,d} txs in '
 f'{elapsed:.1f}s, committing...')

 self.utxo_flush_count = self.hist_flush_count
@@ -1121,7 +923,6 @@ class LevelDB:
 return None, tx_height

 def _fs_transactions(self, txids: Iterable[str]):
-unpack_be_uint64 = util.unpack_be_uint64
 tx_counts = self.tx_counts
 tx_db_get = self.db.get
 tx_cache = self._tx_and_merkle_cache
@@ -1133,14 +934,12 @@ class LevelDB:
 tx, merkle = cached_tx
 else:
 tx_hash_bytes = bytes.fromhex(tx_hash)[::-1]
-tx_num = tx_db_get(DB_PREFIXES.TX_NUM_PREFIX.value + tx_hash_bytes)
+tx_num = self.transaction_num_mapping.get(tx_hash_bytes)
 tx = None
 tx_height = -1
 if tx_num is not None:
-tx_num = unpack_be_uint64(tx_num)
 tx_height = bisect_right(tx_counts, tx_num)
-if tx_height < self.db_height:
-tx = tx_db_get(DB_PREFIXES.TX_PREFIX.value + tx_hash_bytes)
+tx = tx_db_get(Prefixes.tx.pack_key(tx_hash_bytes))
 if tx_height == -1:
 merkle = {
 'block_height': -1
@@ -1204,67 +1003,10 @@ class LevelDB:

 def undo_key(self, height: int) -> bytes:
 """DB key for undo information at the given height."""
-return DB_PREFIXES.UNDO_PREFIX.value + pack('>I', height)
+return Prefixes.undo.pack_key(height)

-def read_undo_info(self, height):
-"""Read undo information from a file for the current height."""
-return self.db.get(self.undo_key(height)), self.db.get(Prefixes.undo.pack_key(self.fs_height))
+def read_undo_info(self, height: int):
+return self.db.get(Prefixes.undo.pack_key(height))

-def raw_block_prefix(self):
-return 'block'
-
-def raw_block_path(self, height):
-return os.path.join(self.env.db_dir, f'{self.raw_block_prefix()}{height:d}')
-
-async def read_raw_block(self, height):
-"""Returns a raw block read from disk. Raises FileNotFoundError
-if the block isn't on-disk."""
-
-def read():
-with util.open_file(self.raw_block_path(height)) as f:
-return f.read(-1)
-
-return await asyncio.get_event_loop().run_in_executor(self.executor, read)
-
-def write_raw_block(self, block, height):
-"""Write a raw block to disk."""
-with util.open_truncate(self.raw_block_path(height)) as f:
-f.write(block)
-# Delete old blocks to prevent them accumulating
-try:
-del_height = self.min_undo_height(height) - 1
-os.remove(self.raw_block_path(del_height))
-except FileNotFoundError:
-pass
-
-def clear_excess_undo_info(self):
-"""Clear excess undo info. Only most recent N are kept."""
-min_height = self.min_undo_height(self.db_height)
-keys = []
-for key, hist in self.db.iterator(prefix=DB_PREFIXES.UNDO_PREFIX.value):
-height, = unpack('>I', key[-4:])
-if height >= min_height:
-break
-keys.append(key)
-
-if keys:
-with self.db.write_batch() as batch:
-for key in keys:
-batch.delete(key)
-self.logger.info(f'deleted {len(keys):,d} stale undo entries')
-
-# delete old block files
-prefix = self.raw_block_prefix()
-paths = [path for path in glob(f'{prefix}[0-9]*')
-if len(path) > len(prefix)
-and int(path[len(prefix):]) < min_height]
-if paths:
-for path in paths:
-try:
-os.remove(path)
-except FileNotFoundError:
-pass
-self.logger.info(f'deleted {len(paths):,d} stale block files')

 # -- UTXO database

@@ -82,7 +82,7 @@ class BaseResolveTestCase(CommandTestCase):
 check_supports(c['claimId'], c['supports'])
 claim_hash = bytes.fromhex(c['claimId'])
 self.assertEqual(c['validAtHeight'], db.get_activation(
-db.total_transactions.index(bytes.fromhex(c['txId'])[::-1]), c['n']
+db.transaction_num_mapping[bytes.fromhex(c['txId'])[::-1]], c['n']
 ))
 self.assertEqual(c['effectiveAmount'], db.get_effective_amount(claim_hash))