forked from LBRYCommunity/lbry-sdk
claims db
-move all leveldb prefixes to DB_PREFIXES enum -add serializable RevertableOp interface for key/value puts and deletes -resolve urls from leveldb
This commit is contained in:
parent
0a833f5f83
commit
b40cda78ee
9 changed files with 1467 additions and 137 deletions
|
@ -1,23 +1,28 @@
|
|||
import base64
|
||||
import struct
|
||||
from typing import List
|
||||
from typing import List, TYPE_CHECKING, Union
|
||||
from binascii import hexlify
|
||||
from itertools import chain
|
||||
|
||||
from lbry.error import ResolveCensoredError
|
||||
from lbry.schema.types.v2.result_pb2 import Outputs as OutputsMessage
|
||||
from lbry.schema.types.v2.result_pb2 import Error as ErrorMessage
|
||||
if TYPE_CHECKING:
|
||||
from lbry.wallet.server.leveldb import ResolveResult
|
||||
|
||||
INVALID = ErrorMessage.Code.Name(ErrorMessage.INVALID)
|
||||
NOT_FOUND = ErrorMessage.Code.Name(ErrorMessage.NOT_FOUND)
|
||||
BLOCKED = ErrorMessage.Code.Name(ErrorMessage.BLOCKED)
|
||||
|
||||
|
||||
def set_reference(reference, txo_row):
|
||||
if txo_row:
|
||||
reference.tx_hash = txo_row['txo_hash'][:32]
|
||||
reference.nout = struct.unpack('<I', txo_row['txo_hash'][32:])[0]
|
||||
reference.height = txo_row['height']
|
||||
def set_reference(reference, claim_hash, rows):
|
||||
if claim_hash:
|
||||
for txo in rows:
|
||||
if claim_hash == txo.claim_hash:
|
||||
reference.tx_hash = txo.tx_hash
|
||||
reference.nout = txo.position
|
||||
reference.height = txo.height
|
||||
return
|
||||
|
||||
|
||||
class Censor:
|
||||
|
@ -40,7 +45,7 @@ class Censor:
|
|||
|
||||
def censor(self, row) -> bool:
|
||||
if self.is_censored(row):
|
||||
censoring_channel_hash = bytes.fromhex(row['censoring_channel_id'])[::-1]
|
||||
censoring_channel_hash = row['censoring_channel_hash']
|
||||
self.censored.setdefault(censoring_channel_hash, set())
|
||||
self.censored[censoring_channel_hash].add(row['tx_hash'])
|
||||
return True
|
||||
|
@ -174,46 +179,49 @@ class Outputs:
|
|||
page.offset = offset
|
||||
if total is not None:
|
||||
page.total = total
|
||||
if blocked is not None:
|
||||
blocked.to_message(page, extra_txo_rows)
|
||||
# if blocked is not None:
|
||||
# blocked.to_message(page, extra_txo_rows)
|
||||
for row in extra_txo_rows:
|
||||
cls.encode_txo(page.extra_txos.add(), row)
|
||||
|
||||
for row in txo_rows:
|
||||
cls.row_to_message(row, page.txos.add(), extra_txo_rows)
|
||||
for row in extra_txo_rows.values():
|
||||
cls.row_to_message(row, page.extra_txos.add(), extra_txo_rows)
|
||||
# cls.row_to_message(row, page.txos.add(), extra_txo_rows)
|
||||
txo_message: 'OutputsMessage' = page.txos.add()
|
||||
cls.encode_txo(txo_message, row)
|
||||
if not isinstance(row, Exception):
|
||||
if row.channel_hash:
|
||||
set_reference(txo_message.claim.channel, row.channel_hash, extra_txo_rows)
|
||||
if row.reposted_claim_hash:
|
||||
set_reference(txo_message.claim.repost, row.reposted_claim_hash, extra_txo_rows)
|
||||
# set_reference(txo_message.error.blocked.channel, row.censor_hash, extra_txo_rows)
|
||||
return page.SerializeToString()
|
||||
|
||||
@classmethod
|
||||
def row_to_message(cls, txo, txo_message, extra_row_dict: dict):
|
||||
if isinstance(txo, Exception):
|
||||
txo_message.error.text = txo.args[0]
|
||||
if isinstance(txo, ValueError):
|
||||
def encode_txo(cls, txo_message, resolve_result: Union['ResolveResult', Exception]):
|
||||
if isinstance(resolve_result, Exception):
|
||||
txo_message.error.text = resolve_result.args[0]
|
||||
if isinstance(resolve_result, ValueError):
|
||||
txo_message.error.code = ErrorMessage.INVALID
|
||||
elif isinstance(txo, LookupError):
|
||||
elif isinstance(resolve_result, LookupError):
|
||||
txo_message.error.code = ErrorMessage.NOT_FOUND
|
||||
elif isinstance(txo, ResolveCensoredError):
|
||||
elif isinstance(resolve_result, ResolveCensoredError):
|
||||
txo_message.error.code = ErrorMessage.BLOCKED
|
||||
set_reference(txo_message.error.blocked.channel, extra_row_dict.get(bytes.fromhex(txo.censor_id)[::-1]))
|
||||
return
|
||||
txo_message.tx_hash = txo['txo_hash'][:32]
|
||||
txo_message.nout, = struct.unpack('<I', txo['txo_hash'][32:])
|
||||
txo_message.height = txo['height']
|
||||
txo_message.claim.short_url = txo['short_url']
|
||||
txo_message.claim.reposted = txo['reposted']
|
||||
if txo['canonical_url'] is not None:
|
||||
txo_message.claim.canonical_url = txo['canonical_url']
|
||||
txo_message.claim.is_controlling = bool(txo['is_controlling'])
|
||||
if txo['last_take_over_height'] is not None:
|
||||
txo_message.claim.take_over_height = txo['last_take_over_height']
|
||||
txo_message.claim.creation_height = txo['creation_height']
|
||||
txo_message.claim.activation_height = txo['activation_height']
|
||||
txo_message.claim.expiration_height = txo['expiration_height']
|
||||
if txo['claims_in_channel'] is not None:
|
||||
txo_message.claim.claims_in_channel = txo['claims_in_channel']
|
||||
txo_message.claim.effective_amount = txo['effective_amount']
|
||||
txo_message.claim.support_amount = txo['support_amount']
|
||||
txo_message.claim.trending_group = txo['trending_group']
|
||||
txo_message.claim.trending_mixed = txo['trending_mixed']
|
||||
txo_message.claim.trending_local = txo['trending_local']
|
||||
txo_message.claim.trending_global = txo['trending_global']
|
||||
set_reference(txo_message.claim.channel, extra_row_dict.get(txo['channel_hash']))
|
||||
set_reference(txo_message.claim.repost, extra_row_dict.get(txo['reposted_claim_hash']))
|
||||
txo_message.tx_hash = resolve_result.tx_hash
|
||||
txo_message.nout = resolve_result.position
|
||||
txo_message.height = resolve_result.height
|
||||
txo_message.claim.short_url = resolve_result.short_url
|
||||
txo_message.claim.reposted = 0
|
||||
txo_message.claim.is_controlling = resolve_result.is_controlling
|
||||
txo_message.claim.creation_height = resolve_result.creation_height
|
||||
txo_message.claim.activation_height = resolve_result.activation_height
|
||||
txo_message.claim.expiration_height = resolve_result.expiration_height
|
||||
txo_message.claim.effective_amount = resolve_result.effective_amount
|
||||
txo_message.claim.support_amount = resolve_result.support_amount
|
||||
|
||||
if resolve_result.canonical_url is not None:
|
||||
txo_message.claim.canonical_url = resolve_result.canonical_url
|
||||
if resolve_result.last_take_over_height is not None:
|
||||
txo_message.claim.take_over_height = resolve_result.last_take_over_height
|
||||
if resolve_result.claims_in_channel is not None:
|
||||
txo_message.claim.claims_in_channel = resolve_result.claims_in_channel
|
||||
|
|
|
@ -5,6 +5,7 @@ from struct import pack, unpack
|
|||
from concurrent.futures.thread import ThreadPoolExecutor
|
||||
from typing import Optional, List, Tuple
|
||||
from prometheus_client import Gauge, Histogram
|
||||
from collections import defaultdict
|
||||
import lbry
|
||||
from lbry.schema.claim import Claim
|
||||
from lbry.wallet.server.tx import Tx
|
||||
|
@ -12,8 +13,11 @@ from lbry.wallet.server.db.writer import SQLDB
|
|||
from lbry.wallet.server.daemon import DaemonError
|
||||
from lbry.wallet.server.hash import hash_to_hex_str, HASHX_LEN
|
||||
from lbry.wallet.server.util import chunks, class_logger
|
||||
from lbry.crypto.hash import hash160
|
||||
from lbry.wallet.server.leveldb import FlushData
|
||||
from lbry.wallet.transaction import Transaction
|
||||
from lbry.wallet.server.db import DB_PREFIXES
|
||||
from lbry.wallet.server.db.claimtrie import StagedClaimtrieItem, StagedClaimtrieSupport
|
||||
|
||||
from lbry.wallet.server.udp import StatusServer
|
||||
if typing.TYPE_CHECKING:
|
||||
from lbry.wallet.server.leveldb import LevelDB
|
||||
|
@ -185,6 +189,10 @@ class BlockProcessor:
|
|||
self.utxo_cache = {}
|
||||
self.db_deletes = []
|
||||
|
||||
# Claimtrie cache
|
||||
self.claimtrie_stash = []
|
||||
self.undo_claims = []
|
||||
|
||||
# If the lock is successfully acquired, in-memory chain state
|
||||
# is consistent with self.height
|
||||
self.state_lock = asyncio.Lock()
|
||||
|
@ -192,6 +200,12 @@ class BlockProcessor:
|
|||
self.search_cache = {}
|
||||
self.history_cache = {}
|
||||
self.status_server = StatusServer()
|
||||
self.effective_amount_changes = defaultdict(list)
|
||||
self.pending_claims = {}
|
||||
self.pending_claim_txos = {}
|
||||
self.pending_supports = defaultdict(set)
|
||||
self.pending_support_txos = {}
|
||||
self.pending_abandon = set()
|
||||
|
||||
async def run_in_thread_with_lock(self, func, *args):
|
||||
# Run in a thread to prevent blocking. Shielded so that
|
||||
|
@ -218,9 +232,12 @@ class BlockProcessor:
|
|||
chain = [self.tip] + [self.coin.header_hash(h) for h in headers[:-1]]
|
||||
|
||||
if hprevs == chain:
|
||||
|
||||
start = time.perf_counter()
|
||||
await self.run_in_thread_with_lock(self.advance_blocks, blocks)
|
||||
try:
|
||||
await self.run_in_thread_with_lock(self.advance_blocks, blocks)
|
||||
except:
|
||||
self.logger.exception("advance blocks failed")
|
||||
raise
|
||||
if self.sql:
|
||||
await self.db.search_index.claim_consumer(self.sql.claim_producer())
|
||||
for cache in self.search_cache.values():
|
||||
|
@ -354,8 +371,8 @@ class BlockProcessor:
|
|||
"""The data for a flush. The lock must be taken."""
|
||||
assert self.state_lock.locked()
|
||||
return FlushData(self.height, self.tx_count, self.headers, self.block_hashes,
|
||||
self.block_txs, self.undo_infos, self.utxo_cache,
|
||||
self.db_deletes, self.tip)
|
||||
self.block_txs, self.claimtrie_stash, self.undo_infos, self.utxo_cache,
|
||||
self.db_deletes, self.tip, self.undo_claims)
|
||||
|
||||
async def flush(self, flush_utxos):
|
||||
def flush():
|
||||
|
@ -404,58 +421,316 @@ class BlockProcessor:
|
|||
"""
|
||||
min_height = self.db.min_undo_height(self.daemon.cached_height())
|
||||
height = self.height
|
||||
# print("---------------------------------\nFLUSH\n---------------------------------")
|
||||
|
||||
for block in blocks:
|
||||
height += 1
|
||||
undo_info = self.advance_txs(
|
||||
height, block.transactions, self.coin.electrum_header(block.header, height),
|
||||
self.coin.header_hash(block.header)
|
||||
)
|
||||
# print(f"***********************************\nADVANCE {height}\n***********************************")
|
||||
undo_info, undo_claims = self.advance_block(block, height)
|
||||
if height >= min_height:
|
||||
self.undo_infos.append((undo_info, height))
|
||||
self.undo_claims.append((undo_claims, height))
|
||||
self.db.write_raw_block(block.raw, height)
|
||||
|
||||
for touched_claim_hash, amount_changes in self.effective_amount_changes.items():
|
||||
new_effective_amount = sum(amount_changes)
|
||||
assert new_effective_amount >= 0, f'{new_effective_amount}, {touched_claim_hash.hex()}'
|
||||
self.claimtrie_stash.extend(
|
||||
self.db.get_update_effective_amount_ops(touched_claim_hash, new_effective_amount)
|
||||
)
|
||||
# print("update effective amount to", touched_claim_hash.hex(), new_effective_amount)
|
||||
|
||||
headers = [block.header for block in blocks]
|
||||
self.height = height
|
||||
self.headers.extend(headers)
|
||||
self.tip = self.coin.header_hash(headers[-1])
|
||||
|
||||
self.db.flush_dbs(self.flush_data(), self.estimate_txs_remaining)
|
||||
# print("+++++++++++++++++++++++++++++++++++++++++++++\nFLUSHED\n+++++++++++++++++++++++++++++++++++++++++++++")
|
||||
|
||||
self.effective_amount_changes.clear()
|
||||
self.pending_claims.clear()
|
||||
self.pending_claim_txos.clear()
|
||||
self.pending_supports.clear()
|
||||
self.pending_support_txos.clear()
|
||||
self.pending_abandon.clear()
|
||||
|
||||
for cache in self.search_cache.values():
|
||||
cache.clear()
|
||||
self.history_cache.clear()
|
||||
self.notifications.notified_mempool_txs.clear()
|
||||
|
||||
def advance_txs(self, height, txs: List[Tuple[Tx, bytes]], header, block_hash):
|
||||
def _add_claim_or_update(self, txo, script, tx_hash, idx, tx_count, txout, spent_claims):
|
||||
try:
|
||||
claim_name = txo.normalized_name
|
||||
except UnicodeDecodeError:
|
||||
claim_name = ''.join(chr(c) for c in txo.script.values['claim_name'])
|
||||
if script.is_claim_name:
|
||||
claim_hash = hash160(tx_hash + pack('>I', idx))[::-1]
|
||||
# print(f"\tnew lbry://{claim_name}#{claim_hash.hex()} ({tx_count} {txout.value})")
|
||||
else:
|
||||
claim_hash = txo.claim_hash[::-1]
|
||||
|
||||
signing_channel_hash = None
|
||||
channel_claims_count = 0
|
||||
activation_height = 0
|
||||
try:
|
||||
signable = txo.signable
|
||||
except: # google.protobuf.message.DecodeError: Could not parse JSON.
|
||||
signable = None
|
||||
|
||||
if signable and signable.signing_channel_hash:
|
||||
signing_channel_hash = txo.signable.signing_channel_hash[::-1]
|
||||
# if signing_channel_hash in self.pending_claim_txos:
|
||||
# pending_channel = self.pending_claims[self.pending_claim_txos[signing_channel_hash]]
|
||||
# channel_claims_count = pending_channel.
|
||||
|
||||
channel_claims_count = self.db.get_claims_in_channel_count(signing_channel_hash) + 1
|
||||
if script.is_claim_name:
|
||||
support_amount = 0
|
||||
root_tx_num, root_idx = tx_count, idx
|
||||
else:
|
||||
if claim_hash not in spent_claims:
|
||||
print(f"\tthis is a wonky tx, contains unlinked claim update {claim_hash.hex()}")
|
||||
return []
|
||||
support_amount = self.db.get_support_amount(claim_hash)
|
||||
(prev_tx_num, prev_idx, _) = spent_claims.pop(claim_hash)
|
||||
# print(f"\tupdate lbry://{claim_name}#{claim_hash.hex()} {tx_hash[::-1].hex()} {txout.value}")
|
||||
|
||||
if (prev_tx_num, prev_idx) in self.pending_claims:
|
||||
previous_claim = self.pending_claims.pop((prev_tx_num, prev_idx))
|
||||
root_tx_num = previous_claim.root_claim_tx_num
|
||||
root_idx = previous_claim.root_claim_tx_position
|
||||
# prev_amount = previous_claim.amount
|
||||
else:
|
||||
root_tx_num, root_idx, prev_amount, _, _, _ = self.db.get_root_claim_txo_and_current_amount(
|
||||
claim_hash
|
||||
)
|
||||
|
||||
pending = StagedClaimtrieItem(
|
||||
claim_name, claim_hash, txout.value, support_amount + txout.value,
|
||||
activation_height, tx_count, idx, root_tx_num, root_idx,
|
||||
signing_channel_hash, channel_claims_count
|
||||
)
|
||||
|
||||
self.pending_claims[(tx_count, idx)] = pending
|
||||
self.pending_claim_txos[claim_hash] = (tx_count, idx)
|
||||
self.effective_amount_changes[claim_hash].append(txout.value)
|
||||
return pending.get_add_claim_utxo_ops()
|
||||
|
||||
def _add_support(self, txo, txout, idx, tx_count):
|
||||
supported_claim_hash = txo.claim_hash[::-1]
|
||||
|
||||
if supported_claim_hash in self.effective_amount_changes:
|
||||
# print(f"\tsupport claim {supported_claim_hash.hex()} {starting_amount}+{txout.value}={starting_amount + txout.value}")
|
||||
self.effective_amount_changes[supported_claim_hash].append(txout.value)
|
||||
self.pending_supports[supported_claim_hash].add((tx_count, idx))
|
||||
self.pending_support_txos[(tx_count, idx)] = supported_claim_hash, txout.value
|
||||
return StagedClaimtrieSupport(
|
||||
supported_claim_hash, tx_count, idx, txout.value
|
||||
).get_add_support_utxo_ops()
|
||||
|
||||
elif supported_claim_hash not in self.pending_claims and supported_claim_hash not in self.pending_abandon:
|
||||
if self.db.claim_exists(supported_claim_hash):
|
||||
_, _, _, name, supported_tx_num, supported_pos = self.db.get_root_claim_txo_and_current_amount(
|
||||
supported_claim_hash
|
||||
)
|
||||
starting_amount = self.db.get_effective_amount(supported_claim_hash)
|
||||
if supported_claim_hash not in self.effective_amount_changes:
|
||||
self.effective_amount_changes[supported_claim_hash].append(starting_amount)
|
||||
self.effective_amount_changes[supported_claim_hash].append(txout.value)
|
||||
self.pending_supports[supported_claim_hash].add((tx_count, idx))
|
||||
self.pending_support_txos[(tx_count, idx)] = supported_claim_hash, txout.value
|
||||
# print(f"\tsupport claim {supported_claim_hash.hex()} {starting_amount}+{txout.value}={starting_amount + txout.value}")
|
||||
return StagedClaimtrieSupport(
|
||||
supported_claim_hash, tx_count, idx, txout.value
|
||||
).get_add_support_utxo_ops()
|
||||
else:
|
||||
print(f"\tthis is a wonky tx, contains unlinked support for non existent {supported_claim_hash.hex()}")
|
||||
return []
|
||||
|
||||
def _add_claim_or_support(self, tx_hash, tx_count, idx, txo, txout, script, spent_claims):
|
||||
if script.is_claim_name or script.is_update_claim:
|
||||
return self._add_claim_or_update(txo, script, tx_hash, idx, tx_count, txout, spent_claims)
|
||||
elif script.is_support_claim or script.is_support_claim_data:
|
||||
return self._add_support(txo, txout, idx, tx_count)
|
||||
return []
|
||||
|
||||
def _spend_support(self, txin):
|
||||
txin_num = self.db.transaction_num_mapping[txin.prev_hash]
|
||||
|
||||
if (txin_num, txin.prev_idx) in self.pending_support_txos:
|
||||
spent_support, support_amount = self.pending_support_txos.pop((txin_num, txin.prev_idx))
|
||||
self.pending_supports[spent_support].remove((txin_num, txin.prev_idx))
|
||||
else:
|
||||
spent_support, support_amount = self.db.get_supported_claim_from_txo(txin_num, txin.prev_idx)
|
||||
if spent_support and support_amount is not None and spent_support not in self.pending_abandon:
|
||||
# print(f"\tspent support for {spent_support.hex()} -{support_amount} ({txin_num}, {txin.prev_idx})")
|
||||
if spent_support not in self.effective_amount_changes:
|
||||
assert spent_support not in self.pending_claims
|
||||
prev_effective_amount = self.db.get_effective_amount(spent_support)
|
||||
self.effective_amount_changes[spent_support].append(prev_effective_amount)
|
||||
self.effective_amount_changes[spent_support].append(-support_amount)
|
||||
return StagedClaimtrieSupport(
|
||||
spent_support, txin_num, txin.prev_idx, support_amount
|
||||
).get_spend_support_txo_ops()
|
||||
return []
|
||||
|
||||
def _spend_claim(self, txin, spent_claims):
|
||||
txin_num = self.db.transaction_num_mapping[txin.prev_hash]
|
||||
if (txin_num, txin.prev_idx) in self.pending_claims:
|
||||
spent = self.pending_claims[(txin_num, txin.prev_idx)]
|
||||
name = spent.name
|
||||
spent_claims[spent.claim_hash] = (txin_num, txin.prev_idx, name)
|
||||
# print(f"spend lbry://{name}#{spent.claim_hash.hex()}")
|
||||
else:
|
||||
spent_claim_hash_and_name = self.db.claim_hash_and_name_from_txo(
|
||||
txin_num, txin.prev_idx
|
||||
)
|
||||
if not spent_claim_hash_and_name: # txo is not a claim
|
||||
return []
|
||||
prev_claim_hash, txi_len_encoded_name = spent_claim_hash_and_name
|
||||
|
||||
prev_signing_hash = self.db.get_channel_for_claim(prev_claim_hash)
|
||||
prev_claims_in_channel_count = None
|
||||
if prev_signing_hash:
|
||||
prev_claims_in_channel_count = self.db.get_claims_in_channel_count(
|
||||
prev_signing_hash
|
||||
)
|
||||
prev_effective_amount = self.db.get_effective_amount(
|
||||
prev_claim_hash
|
||||
)
|
||||
claim_root_tx_num, claim_root_idx, prev_amount, name, tx_num, position = self.db.get_root_claim_txo_and_current_amount(prev_claim_hash)
|
||||
activation_height = 0
|
||||
spent = StagedClaimtrieItem(
|
||||
name, prev_claim_hash, prev_amount, prev_effective_amount,
|
||||
activation_height, txin_num, txin.prev_idx, claim_root_tx_num,
|
||||
claim_root_idx, prev_signing_hash, prev_claims_in_channel_count
|
||||
)
|
||||
spent_claims[prev_claim_hash] = (txin_num, txin.prev_idx, name)
|
||||
# print(f"spend lbry://{spent_claims[prev_claim_hash][2]}#{prev_claim_hash.hex()}")
|
||||
if spent.claim_hash not in self.effective_amount_changes:
|
||||
self.effective_amount_changes[spent.claim_hash].append(spent.effective_amount)
|
||||
self.effective_amount_changes[spent.claim_hash].append(-spent.amount)
|
||||
return spent.get_spend_claim_txo_ops()
|
||||
|
||||
def _spend_claim_or_support(self, txin, spent_claims):
|
||||
spend_claim_ops = self._spend_claim(txin, spent_claims)
|
||||
if spend_claim_ops:
|
||||
return spend_claim_ops
|
||||
return self._spend_support(txin)
|
||||
|
||||
def _abandon(self, spent_claims):
|
||||
# Handle abandoned claims
|
||||
ops = []
|
||||
|
||||
for abandoned_claim_hash, (prev_tx_num, prev_idx, name) in spent_claims.items():
|
||||
# print(f"\tabandon lbry://{name}#{abandoned_claim_hash.hex()} {prev_tx_num} {prev_idx}")
|
||||
|
||||
if (prev_tx_num, prev_idx) in self.pending_claims:
|
||||
pending = self.pending_claims.pop((prev_tx_num, prev_idx))
|
||||
claim_root_tx_num = pending.root_claim_tx_num
|
||||
claim_root_idx = pending.root_claim_tx_position
|
||||
prev_amount = pending.amount
|
||||
prev_signing_hash = pending.signing_hash
|
||||
prev_effective_amount = pending.effective_amount
|
||||
prev_claims_in_channel_count = pending.claims_in_channel_count
|
||||
else:
|
||||
claim_root_tx_num, claim_root_idx, prev_amount, _, _, _ = self.db.get_root_claim_txo_and_current_amount(
|
||||
abandoned_claim_hash
|
||||
)
|
||||
prev_signing_hash = self.db.get_channel_for_claim(abandoned_claim_hash)
|
||||
prev_claims_in_channel_count = None
|
||||
if prev_signing_hash:
|
||||
prev_claims_in_channel_count = self.db.get_claims_in_channel_count(
|
||||
prev_signing_hash
|
||||
)
|
||||
prev_effective_amount = self.db.get_effective_amount(
|
||||
abandoned_claim_hash
|
||||
)
|
||||
|
||||
for (support_tx_num, support_tx_idx) in self.pending_supports[abandoned_claim_hash]:
|
||||
_, support_amount = self.pending_support_txos.pop((support_tx_num, support_tx_idx))
|
||||
ops.extend(
|
||||
StagedClaimtrieSupport(
|
||||
abandoned_claim_hash, support_tx_num, support_tx_idx, support_amount
|
||||
).get_spend_support_txo_ops()
|
||||
)
|
||||
# print(f"\tremove pending support for abandoned lbry://{name}#{abandoned_claim_hash.hex()} {support_tx_num} {support_tx_idx}")
|
||||
self.pending_supports[abandoned_claim_hash].clear()
|
||||
self.pending_supports.pop(abandoned_claim_hash)
|
||||
|
||||
for (support_tx_num, support_tx_idx, support_amount) in self.db.get_supports(abandoned_claim_hash):
|
||||
ops.extend(
|
||||
StagedClaimtrieSupport(
|
||||
abandoned_claim_hash, support_tx_num, support_tx_idx, support_amount
|
||||
).get_spend_support_txo_ops()
|
||||
)
|
||||
# print(f"\tremove support for abandoned lbry://{name}#{abandoned_claim_hash.hex()} {support_tx_num} {support_tx_idx}")
|
||||
|
||||
activation_height = 0
|
||||
if abandoned_claim_hash in self.effective_amount_changes:
|
||||
# print("pop")
|
||||
self.effective_amount_changes.pop(abandoned_claim_hash)
|
||||
self.pending_abandon.add(abandoned_claim_hash)
|
||||
|
||||
# print(f"\tabandoned lbry://{name}#{abandoned_claim_hash.hex()}")
|
||||
ops.extend(
|
||||
StagedClaimtrieItem(
|
||||
name, abandoned_claim_hash, prev_amount, prev_effective_amount,
|
||||
activation_height, prev_tx_num, prev_idx, claim_root_tx_num,
|
||||
claim_root_idx, prev_signing_hash, prev_claims_in_channel_count
|
||||
).get_abandon_ops(self.db.db))
|
||||
return ops
|
||||
|
||||
def advance_block(self, block, height: int):
|
||||
from lbry.wallet.transaction import OutputScript, Output
|
||||
|
||||
txs: List[Tuple[Tx, bytes]] = block.transactions
|
||||
# header = self.coin.electrum_header(block.header, height)
|
||||
block_hash = self.coin.header_hash(block.header)
|
||||
|
||||
self.block_hashes.append(block_hash)
|
||||
self.block_txs.append((b''.join(tx_hash for tx, tx_hash in txs), [tx.raw for tx, _ in txs]))
|
||||
|
||||
first_tx_num = self.tx_count
|
||||
undo_info = []
|
||||
hashXs_by_tx = []
|
||||
tx_num = self.tx_count
|
||||
tx_count = self.tx_count
|
||||
|
||||
# Use local vars for speed in the loops
|
||||
put_utxo = self.utxo_cache.__setitem__
|
||||
claimtrie_stash = []
|
||||
claimtrie_stash_extend = claimtrie_stash.extend
|
||||
spend_utxo = self.spend_utxo
|
||||
undo_info_append = undo_info.append
|
||||
update_touched = self.touched.update
|
||||
append_hashX_by_tx = hashXs_by_tx.append
|
||||
hashX_from_script = self.coin.hashX_from_script
|
||||
|
||||
unchanged_effective_amounts = {k: sum(v) for k, v in self.effective_amount_changes.items()}
|
||||
|
||||
for tx, tx_hash in txs:
|
||||
hashXs = []
|
||||
# print(f"{tx_hash[::-1].hex()} @ {height}")
|
||||
spent_claims = {}
|
||||
|
||||
hashXs = [] # hashXs touched by spent inputs/rx outputs
|
||||
append_hashX = hashXs.append
|
||||
tx_numb = pack('<I', tx_num)
|
||||
tx_numb = pack('<I', tx_count)
|
||||
|
||||
# Spend the inputs
|
||||
for txin in tx.inputs:
|
||||
if txin.is_generation():
|
||||
continue
|
||||
# spend utxo for address histories
|
||||
cache_value = spend_utxo(txin.prev_hash, txin.prev_idx)
|
||||
undo_info_append(cache_value)
|
||||
append_hashX(cache_value[:-12])
|
||||
|
||||
spend_claim_or_support_ops = self._spend_claim_or_support(txin, spent_claims)
|
||||
if spend_claim_or_support_ops:
|
||||
claimtrie_stash_extend(spend_claim_or_support_ops)
|
||||
|
||||
# Add the new UTXOs
|
||||
for idx, txout in enumerate(tx.outputs):
|
||||
# Get the hashX. Ignore unspendable outputs
|
||||
|
@ -464,13 +739,29 @@ class BlockProcessor:
|
|||
append_hashX(hashX)
|
||||
put_utxo(tx_hash + pack('<H', idx), hashX + tx_numb + pack('<Q', txout.value))
|
||||
|
||||
# add claim/support txo
|
||||
script = OutputScript(txout.pk_script)
|
||||
script.parse()
|
||||
txo = Output(txout.value, script)
|
||||
|
||||
claim_or_support_ops = self._add_claim_or_support(
|
||||
tx_hash, tx_count, idx, txo, txout, script, spent_claims
|
||||
)
|
||||
if claim_or_support_ops:
|
||||
claimtrie_stash_extend(claim_or_support_ops)
|
||||
|
||||
# Handle abandoned claims
|
||||
abandon_ops = self._abandon(spent_claims)
|
||||
if abandon_ops:
|
||||
claimtrie_stash_extend(abandon_ops)
|
||||
|
||||
append_hashX_by_tx(hashXs)
|
||||
update_touched(hashXs)
|
||||
self.db.total_transactions.append(tx_hash)
|
||||
tx_num += 1
|
||||
self.db.transaction_num_mapping[tx_hash] = tx_count
|
||||
tx_count += 1
|
||||
|
||||
# self.db.add_unflushed(hashXs_by_tx, self.tx_count)
|
||||
first_tx_num = self.tx_count
|
||||
_unflushed = self.db.hist_unflushed
|
||||
_count = 0
|
||||
for _tx_num, _hashXs in enumerate(hashXs_by_tx, start=first_tx_num):
|
||||
|
@ -478,10 +769,24 @@ class BlockProcessor:
|
|||
_unflushed[_hashX].append(_tx_num)
|
||||
_count += len(_hashXs)
|
||||
self.db.hist_unflushed_count += _count
|
||||
self.tx_count = tx_num
|
||||
self.db.tx_counts.append(tx_num)
|
||||
|
||||
return undo_info
|
||||
self.tx_count = tx_count
|
||||
self.db.tx_counts.append(self.tx_count)
|
||||
|
||||
# for touched_claim_hash, amount_changes in self.effective_amount_changes.items():
|
||||
# new_effective_amount = sum(amount_changes)
|
||||
# assert new_effective_amount >= 0, f'{new_effective_amount}, {touched_claim_hash.hex()}'
|
||||
# if touched_claim_hash not in unchanged_effective_amounts or unchanged_effective_amounts[touched_claim_hash] != new_effective_amount:
|
||||
# claimtrie_stash_extend(
|
||||
# self.db.get_update_effective_amount_ops(touched_claim_hash, new_effective_amount)
|
||||
# )
|
||||
# # print("update effective amount to", touched_claim_hash.hex(), new_effective_amount)
|
||||
|
||||
undo_claims = b''.join(op.invert().pack() for op in claimtrie_stash)
|
||||
self.claimtrie_stash.extend(claimtrie_stash)
|
||||
# print("%i undo bytes for %i (%i claimtrie stash ops)" % (len(undo_claims), height, len(claimtrie_stash)))
|
||||
|
||||
return undo_info, undo_claims
|
||||
|
||||
def backup_blocks(self, raw_blocks):
|
||||
"""Backup the raw blocks and flush.
|
||||
|
@ -495,6 +800,7 @@ class BlockProcessor:
|
|||
coin = self.coin
|
||||
for raw_block in raw_blocks:
|
||||
self.logger.info("backup block %i", self.height)
|
||||
print("backup", self.height)
|
||||
# Check and update self.tip
|
||||
block = coin.block(raw_block, self.height)
|
||||
header_hash = coin.header_hash(block.header)
|
||||
|
@ -511,13 +817,14 @@ class BlockProcessor:
|
|||
# self.touched can include other addresses which is
|
||||
# harmless, but remove None.
|
||||
self.touched.discard(None)
|
||||
|
||||
self.db.flush_backup(self.flush_data(), self.touched)
|
||||
self.logger.info(f'backed up to height {self.height:,d}')
|
||||
|
||||
def backup_txs(self, txs):
|
||||
# Prevout values, in order down the block (coinbase first if present)
|
||||
# undo_info is in reverse block order
|
||||
undo_info = self.db.read_undo_info(self.height)
|
||||
undo_info, undo_claims = self.db.read_undo_info(self.height)
|
||||
if undo_info is None:
|
||||
raise ChainError(f'no undo information found for height {self.height:,d}')
|
||||
n = len(undo_info)
|
||||
|
@ -548,6 +855,7 @@ class BlockProcessor:
|
|||
|
||||
assert n == 0
|
||||
self.tx_count -= len(txs)
|
||||
self.undo_claims.append((undo_claims, self.height))
|
||||
|
||||
"""An in-memory UTXO cache, representing all changes to UTXO state
|
||||
since the last DB flush.
|
||||
|
@ -610,6 +918,7 @@ class BlockProcessor:
|
|||
all UTXOs so not finding one indicates a logic error or DB
|
||||
corruption.
|
||||
"""
|
||||
|
||||
# Fast track is it being in the cache
|
||||
idx_packed = pack('<H', tx_idx)
|
||||
cache_value = self.utxo_cache.pop(tx_hash + idx_packed, None)
|
||||
|
@ -617,12 +926,10 @@ class BlockProcessor:
|
|||
return cache_value
|
||||
|
||||
# Spend it from the DB.
|
||||
|
||||
# Key: b'h' + compressed_tx_hash + tx_idx + tx_num
|
||||
# Value: hashX
|
||||
prefix = b'h' + tx_hash[:4] + idx_packed
|
||||
candidates = {db_key: hashX for db_key, hashX
|
||||
in self.db.db.iterator(prefix=prefix)}
|
||||
prefix = DB_PREFIXES.HASHX_UTXO_PREFIX.value + tx_hash[:4] + idx_packed
|
||||
candidates = {db_key: hashX for db_key, hashX in self.db.db.iterator(prefix=prefix)}
|
||||
|
||||
for hdb_key, hashX in candidates.items():
|
||||
tx_num_packed = hdb_key[-4:]
|
||||
|
@ -640,7 +947,7 @@ class BlockProcessor:
|
|||
|
||||
# Key: b'u' + address_hashX + tx_idx + tx_num
|
||||
# Value: the UTXO value as a 64-bit unsigned integer
|
||||
udb_key = b'u' + hashX + hdb_key[-6:]
|
||||
udb_key = DB_PREFIXES.UTXO_PREFIX.value + hashX + hdb_key[-6:]
|
||||
utxo_value_packed = self.db.db.get(udb_key)
|
||||
if utxo_value_packed is None:
|
||||
self.logger.warning(
|
||||
|
@ -802,8 +1109,8 @@ class LBRYBlockProcessor(BlockProcessor):
|
|||
self.prefetcher.polling_delay = 0.5
|
||||
self.should_validate_signatures = self.env.boolean('VALIDATE_CLAIM_SIGNATURES', False)
|
||||
self.logger.info(f"LbryumX Block Processor - Validating signatures: {self.should_validate_signatures}")
|
||||
# self.sql: SQLDB = self.db.sql
|
||||
# self.timer = Timer('BlockProcessor')
|
||||
self.sql: SQLDB = self.db.sql
|
||||
self.timer = Timer('BlockProcessor')
|
||||
|
||||
def advance_blocks(self, blocks):
|
||||
if self.sql:
|
||||
|
|
|
@ -0,0 +1,32 @@
|
|||
import enum
|
||||
|
||||
|
||||
class DB_PREFIXES(enum.Enum):
|
||||
claim_to_support = b'K'
|
||||
support_to_claim = b'L'
|
||||
|
||||
claim_to_txo = b'E'
|
||||
txo_to_claim = b'G'
|
||||
|
||||
claim_to_channel = b'I'
|
||||
channel_to_claim = b'J'
|
||||
|
||||
claim_short_id_prefix = b'F'
|
||||
claim_effective_amount_prefix = b'D'
|
||||
|
||||
undo_claimtrie = b'M'
|
||||
|
||||
HISTORY_PREFIX = b'A'
|
||||
TX_PREFIX = b'B'
|
||||
BLOCK_HASH_PREFIX = b'C'
|
||||
HEADER_PREFIX = b'H'
|
||||
TX_NUM_PREFIX = b'N'
|
||||
TX_COUNT_PREFIX = b'T'
|
||||
UNDO_PREFIX = b'U'
|
||||
TX_HASH_PREFIX = b'X'
|
||||
|
||||
HASHX_UTXO_PREFIX = b'h'
|
||||
HIST_STATE = b'state-hist'
|
||||
UTXO_STATE = b'state-utxo'
|
||||
UTXO_PREFIX = b'u'
|
||||
HASHX_HISTORY_PREFIX = b'x'
|
167
lbry/wallet/server/db/claimtrie.py
Normal file
167
lbry/wallet/server/db/claimtrie.py
Normal file
|
@ -0,0 +1,167 @@
|
|||
import typing
|
||||
from typing import Optional
|
||||
from lbry.wallet.server.db.revertable import RevertablePut, RevertableDelete, RevertableOp, delete_prefix
|
||||
from lbry.wallet.server.db import DB_PREFIXES
|
||||
from lbry.wallet.server.db.prefixes import Prefixes
|
||||
|
||||
|
||||
def length_encoded_name(name: str) -> bytes:
|
||||
encoded = name.encode('utf-8')
|
||||
return len(encoded).to_bytes(2, byteorder='big') + encoded
|
||||
|
||||
|
||||
class StagedClaimtrieSupport(typing.NamedTuple):
|
||||
claim_hash: bytes
|
||||
tx_num: int
|
||||
position: int
|
||||
amount: int
|
||||
|
||||
def _get_add_remove_support_utxo_ops(self, add=True):
|
||||
"""
|
||||
get a list of revertable operations to add or spend a support txo to the key: value database
|
||||
|
||||
:param add: if true use RevertablePut operations, otherwise use RevertableDelete
|
||||
:return:
|
||||
"""
|
||||
op = RevertablePut if add else RevertableDelete
|
||||
return [
|
||||
op(
|
||||
*Prefixes.claim_to_support.pack_item(self.claim_hash, self.tx_num, self.position, self.amount)
|
||||
),
|
||||
op(
|
||||
*Prefixes.support_to_claim.pack_item(self.tx_num, self.position, self.claim_hash)
|
||||
)
|
||||
]
|
||||
|
||||
def get_add_support_utxo_ops(self) -> typing.List[RevertableOp]:
|
||||
return self._get_add_remove_support_utxo_ops(add=True)
|
||||
|
||||
def get_spend_support_txo_ops(self) -> typing.List[RevertableOp]:
|
||||
return self._get_add_remove_support_utxo_ops(add=False)
|
||||
|
||||
|
||||
def get_update_effective_amount_ops(name: str, new_effective_amount: int, prev_effective_amount: int, tx_num: int,
|
||||
position: int, root_tx_num: int, root_position: int, claim_hash: bytes,
|
||||
signing_hash: Optional[bytes] = None,
|
||||
claims_in_channel_count: Optional[int] = None):
|
||||
assert root_position != root_tx_num, f"{tx_num} {position} {root_tx_num} {root_tx_num}"
|
||||
ops = [
|
||||
RevertableDelete(
|
||||
*Prefixes.claim_effective_amount.pack_item(
|
||||
name, prev_effective_amount, tx_num, position, claim_hash, root_tx_num, root_position
|
||||
)
|
||||
),
|
||||
RevertablePut(
|
||||
*Prefixes.claim_effective_amount.pack_item(
|
||||
name, new_effective_amount, tx_num, position, claim_hash, root_tx_num, root_position
|
||||
)
|
||||
)
|
||||
]
|
||||
if signing_hash:
|
||||
ops.extend([
|
||||
RevertableDelete(
|
||||
*Prefixes.channel_to_claim.pack_item(
|
||||
signing_hash, name, prev_effective_amount, tx_num, position, claim_hash, claims_in_channel_count
|
||||
)
|
||||
),
|
||||
RevertablePut(
|
||||
*Prefixes.channel_to_claim.pack_item(
|
||||
signing_hash, name, new_effective_amount, tx_num, position, claim_hash, claims_in_channel_count
|
||||
)
|
||||
)
|
||||
])
|
||||
return ops
|
||||
|
||||
|
||||
class StagedClaimtrieItem(typing.NamedTuple):
|
||||
name: str
|
||||
claim_hash: bytes
|
||||
amount: int
|
||||
effective_amount: int
|
||||
activation_height: int
|
||||
tx_num: int
|
||||
position: int
|
||||
root_claim_tx_num: int
|
||||
root_claim_tx_position: int
|
||||
signing_hash: Optional[bytes]
|
||||
claims_in_channel_count: Optional[int]
|
||||
|
||||
@property
|
||||
def is_update(self) -> bool:
|
||||
return (self.tx_num, self.position) != (self.root_claim_tx_num, self.root_claim_tx_position)
|
||||
|
||||
def _get_add_remove_claim_utxo_ops(self, add=True):
|
||||
"""
|
||||
get a list of revertable operations to add or spend a claim txo to the key: value database
|
||||
|
||||
:param add: if true use RevertablePut operations, otherwise use RevertableDelete
|
||||
:return:
|
||||
"""
|
||||
op = RevertablePut if add else RevertableDelete
|
||||
ops = [
|
||||
# url resolution by effective amount
|
||||
op(
|
||||
*Prefixes.claim_effective_amount.pack_item(
|
||||
self.name, self.effective_amount, self.tx_num, self.position, self.claim_hash,
|
||||
self.root_claim_tx_num, self.root_claim_tx_position
|
||||
)
|
||||
),
|
||||
# claim tip by claim hash
|
||||
op(
|
||||
*Prefixes.claim_to_txo.pack_item(
|
||||
self.claim_hash, self.tx_num, self.position, self.root_claim_tx_num, self.root_claim_tx_position,
|
||||
self.amount, self.name
|
||||
)
|
||||
),
|
||||
# short url resolution
|
||||
op(
|
||||
*Prefixes.claim_short_id.pack_item(
|
||||
self.name, self.claim_hash, self.root_claim_tx_num, self.root_claim_tx_position, self.tx_num,
|
||||
self.position
|
||||
)
|
||||
),
|
||||
# claim hash by txo
|
||||
op(
|
||||
*Prefixes.txo_to_claim.pack_item(self.tx_num, self.position, self.claim_hash, self.name)
|
||||
)
|
||||
]
|
||||
if self.signing_hash and self.claims_in_channel_count is not None:
|
||||
# claims_in_channel_count can be none if the channel doesnt exist
|
||||
ops.extend([
|
||||
# channel by stream
|
||||
op(
|
||||
*Prefixes.claim_to_channel.pack_item(self.claim_hash, self.signing_hash)
|
||||
),
|
||||
# stream by channel
|
||||
op(
|
||||
*Prefixes.channel_to_claim.pack_item(
|
||||
self.signing_hash, self.name, self.effective_amount, self.tx_num, self.position,
|
||||
self.claim_hash, self.claims_in_channel_count
|
||||
)
|
||||
)
|
||||
])
|
||||
return ops
|
||||
|
||||
def get_add_claim_utxo_ops(self) -> typing.List[RevertableOp]:
|
||||
return self._get_add_remove_claim_utxo_ops(add=True)
|
||||
|
||||
def get_spend_claim_txo_ops(self) -> typing.List[RevertableOp]:
|
||||
return self._get_add_remove_claim_utxo_ops(add=False)
|
||||
|
||||
def get_invalidate_channel_ops(self, db) -> typing.List[RevertableOp]:
|
||||
if not self.signing_hash:
|
||||
return []
|
||||
return [
|
||||
RevertableDelete(*Prefixes.claim_to_channel.pack_item(self.claim_hash, self.signing_hash))
|
||||
] + delete_prefix(db, DB_PREFIXES.channel_to_claim.value + self.signing_hash)
|
||||
|
||||
def get_abandon_ops(self, db) -> typing.List[RevertableOp]:
|
||||
packed_name = length_encoded_name(self.name)
|
||||
delete_short_id_ops = delete_prefix(
|
||||
db, DB_PREFIXES.claim_short_id_prefix.value + packed_name + self.claim_hash
|
||||
)
|
||||
delete_claim_ops = delete_prefix(db, DB_PREFIXES.claim_to_txo.value + self.claim_hash)
|
||||
delete_supports_ops = delete_prefix(db, DB_PREFIXES.claim_to_support.value + self.claim_hash)
|
||||
invalidate_channel_ops = self.get_invalidate_channel_ops(db)
|
||||
return delete_short_id_ops + delete_claim_ops + delete_supports_ops + invalidate_channel_ops
|
||||
|
391
lbry/wallet/server/db/prefixes.py
Normal file
391
lbry/wallet/server/db/prefixes.py
Normal file
|
@ -0,0 +1,391 @@
|
|||
import typing
|
||||
import struct
|
||||
from lbry.wallet.server.db import DB_PREFIXES
|
||||
|
||||
|
||||
def length_encoded_name(name: str) -> bytes:
|
||||
encoded = name.encode('utf-8')
|
||||
return len(encoded).to_bytes(2, byteorder='big') + encoded
|
||||
|
||||
|
||||
class PrefixRow:
|
||||
prefix: bytes
|
||||
key_struct: struct.Struct
|
||||
value_struct: struct.Struct
|
||||
|
||||
@classmethod
|
||||
def pack_key(cls, *args) -> bytes:
|
||||
return cls.prefix + cls.key_struct.pack(*args)
|
||||
|
||||
@classmethod
|
||||
def pack_value(cls, *args) -> bytes:
|
||||
return cls.value_struct.pack(*args)
|
||||
|
||||
@classmethod
|
||||
def unpack_key(cls, key: bytes):
|
||||
assert key[:1] == cls.prefix
|
||||
return cls.key_struct.unpack(key[1:])
|
||||
|
||||
@classmethod
|
||||
def unpack_value(cls, data: bytes):
|
||||
return cls.value_struct.unpack(data)
|
||||
|
||||
@classmethod
|
||||
def unpack_item(cls, key: bytes, value: bytes):
|
||||
return cls.unpack_key(key), cls.unpack_value(value)
|
||||
|
||||
|
||||
class EffectiveAmountKey(typing.NamedTuple):
|
||||
name: str
|
||||
effective_amount: int
|
||||
tx_num: int
|
||||
position: int
|
||||
|
||||
|
||||
class EffectiveAmountValue(typing.NamedTuple):
|
||||
claim_hash: bytes
|
||||
root_tx_num: int
|
||||
root_position: int
|
||||
|
||||
|
||||
class ClaimToTXOKey(typing.NamedTuple):
|
||||
claim_hash: bytes
|
||||
tx_num: int
|
||||
position: int
|
||||
|
||||
|
||||
class ClaimToTXOValue(typing.NamedTuple):
|
||||
root_tx_num: int
|
||||
root_position: int
|
||||
amount: int
|
||||
name: str
|
||||
|
||||
|
||||
class TXOToClaimKey(typing.NamedTuple):
|
||||
tx_num: int
|
||||
position: int
|
||||
|
||||
|
||||
class TXOToClaimValue(typing.NamedTuple):
|
||||
claim_hash: bytes
|
||||
name: str
|
||||
|
||||
|
||||
class ClaimShortIDKey(typing.NamedTuple):
|
||||
name: str
|
||||
claim_hash: bytes
|
||||
root_tx_num: int
|
||||
root_position: int
|
||||
|
||||
|
||||
class ClaimShortIDValue(typing.NamedTuple):
|
||||
tx_num: int
|
||||
position: int
|
||||
|
||||
|
||||
class ClaimToChannelKey(typing.NamedTuple):
|
||||
claim_hash: bytes
|
||||
|
||||
|
||||
class ClaimToChannelValue(typing.NamedTuple):
|
||||
signing_hash: bytes
|
||||
|
||||
|
||||
class ChannelToClaimKey(typing.NamedTuple):
|
||||
signing_hash: bytes
|
||||
name: str
|
||||
effective_amount: int
|
||||
tx_num: int
|
||||
position: int
|
||||
|
||||
|
||||
class ChannelToClaimValue(typing.NamedTuple):
|
||||
claim_hash: bytes
|
||||
claims_in_channel: int
|
||||
|
||||
|
||||
class ClaimToSupportKey(typing.NamedTuple):
|
||||
claim_hash: bytes
|
||||
tx_num: int
|
||||
position: int
|
||||
|
||||
|
||||
class ClaimToSupportValue(typing.NamedTuple):
|
||||
amount: int
|
||||
|
||||
|
||||
class SupportToClaimKey(typing.NamedTuple):
|
||||
tx_num: int
|
||||
position: int
|
||||
|
||||
|
||||
class SupportToClaimValue(typing.NamedTuple):
|
||||
claim_hash: bytes
|
||||
|
||||
|
||||
class EffectiveAmountPrefixRow(PrefixRow):
|
||||
prefix = DB_PREFIXES.claim_effective_amount_prefix.value
|
||||
key_struct = struct.Struct(b'>QLH')
|
||||
value_struct = struct.Struct(b'>20sLH')
|
||||
|
||||
@classmethod
|
||||
def pack_key(cls, name: str, effective_amount: int, tx_num: int, position: int):
|
||||
return cls.prefix + length_encoded_name(name) + cls.key_struct.pack(
|
||||
0xffffffffffffffff - effective_amount, tx_num, position
|
||||
)
|
||||
|
||||
@classmethod
|
||||
def unpack_key(cls, key: bytes) -> EffectiveAmountKey:
|
||||
assert key[:1] == cls.prefix
|
||||
name_len = int.from_bytes(key[1:3], byteorder='big')
|
||||
name = key[3:3 + name_len].decode()
|
||||
ones_comp_effective_amount, tx_num, position = cls.key_struct.unpack(key[3 + name_len:])
|
||||
return EffectiveAmountKey(
|
||||
name, 0xffffffffffffffff - ones_comp_effective_amount, tx_num, position
|
||||
)
|
||||
|
||||
@classmethod
|
||||
def unpack_value(cls, data: bytes) -> EffectiveAmountValue:
|
||||
return EffectiveAmountValue(*super().unpack_value(data))
|
||||
|
||||
@classmethod
|
||||
def pack_value(cls, claim_hash: bytes, root_tx_num: int, root_position: int) -> bytes:
|
||||
return super().pack_value(claim_hash, root_tx_num, root_position)
|
||||
|
||||
@classmethod
|
||||
def pack_item(cls, name: str, effective_amount: int, tx_num: int, position: int, claim_hash: bytes,
|
||||
root_tx_num: int, root_position: int):
|
||||
return cls.pack_key(name, effective_amount, tx_num, position), \
|
||||
cls.pack_value(claim_hash, root_tx_num, root_position)
|
||||
|
||||
|
||||
class ClaimToTXOPrefixRow(PrefixRow):
|
||||
prefix = DB_PREFIXES.claim_to_txo.value
|
||||
key_struct = struct.Struct(b'>20sLH')
|
||||
value_struct = struct.Struct(b'>LHQ')
|
||||
|
||||
@classmethod
|
||||
def pack_key(cls, claim_hash: bytes, tx_num: int, position: int):
|
||||
return super().pack_key(
|
||||
claim_hash, 0xffffffff - tx_num, 0xffff - position
|
||||
)
|
||||
|
||||
@classmethod
|
||||
def unpack_key(cls, key: bytes) -> ClaimToTXOKey:
|
||||
assert key[:1] == cls.prefix
|
||||
claim_hash, ones_comp_tx_num, ones_comp_position = cls.key_struct.unpack(key[1:])
|
||||
return ClaimToTXOKey(
|
||||
claim_hash, 0xffffffff - ones_comp_tx_num, 0xffff - ones_comp_position
|
||||
)
|
||||
|
||||
@classmethod
|
||||
def unpack_value(cls, data: bytes) ->ClaimToTXOValue:
|
||||
root_tx_num, root_position, amount = cls.value_struct.unpack(data[:14])
|
||||
name_len = int.from_bytes(data[14:16], byteorder='big')
|
||||
name = data[16:16 + name_len].decode()
|
||||
return ClaimToTXOValue(root_tx_num, root_position, amount, name)
|
||||
|
||||
@classmethod
|
||||
def pack_value(cls, root_tx_num: int, root_position: int, amount: int, name: str) -> bytes:
|
||||
return cls.value_struct.pack(root_tx_num, root_position, amount) + length_encoded_name(name)
|
||||
|
||||
@classmethod
|
||||
def pack_item(cls, claim_hash: bytes, tx_num: int, position: int, root_tx_num: int, root_position: int,
|
||||
amount: int, name: str):
|
||||
return cls.pack_key(claim_hash, tx_num, position), \
|
||||
cls.pack_value(root_tx_num, root_position, amount, name)
|
||||
|
||||
|
||||
class TXOToClaimPrefixRow(PrefixRow):
|
||||
prefix = DB_PREFIXES.txo_to_claim.value
|
||||
key_struct = struct.Struct(b'>LH')
|
||||
value_struct = struct.Struct(b'>20s')
|
||||
|
||||
@classmethod
|
||||
def pack_key(cls, tx_num: int, position: int):
|
||||
return super().pack_key(tx_num, position)
|
||||
|
||||
@classmethod
|
||||
def unpack_key(cls, key: bytes) -> TXOToClaimKey:
|
||||
return TXOToClaimKey(*super().unpack_key(key))
|
||||
|
||||
@classmethod
|
||||
def unpack_value(cls, data: bytes) -> TXOToClaimValue:
|
||||
claim_hash, = cls.value_struct.unpack(data[:20])
|
||||
name_len = int.from_bytes(data[20:22], byteorder='big')
|
||||
name = data[22:22 + name_len].decode()
|
||||
return TXOToClaimValue(claim_hash, name)
|
||||
|
||||
@classmethod
|
||||
def pack_value(cls, claim_hash: bytes, name: str) -> bytes:
|
||||
return cls.value_struct.pack(claim_hash) + length_encoded_name(name)
|
||||
|
||||
@classmethod
|
||||
def pack_item(cls, tx_num: int, position: int, claim_hash: bytes, name: str):
|
||||
return cls.pack_key(tx_num, position), \
|
||||
cls.pack_value(claim_hash, name)
|
||||
|
||||
|
||||
class ClaimShortIDPrefixRow(PrefixRow):
|
||||
prefix = DB_PREFIXES.claim_short_id_prefix.value
|
||||
key_struct = struct.Struct(b'>20sLH')
|
||||
value_struct = struct.Struct(b'>LH')
|
||||
|
||||
@classmethod
|
||||
def pack_key(cls, name: str, claim_hash: bytes, root_tx_num: int, root_position: int):
|
||||
return cls.prefix + length_encoded_name(name) + cls.key_struct.pack(claim_hash, root_tx_num, root_position)
|
||||
|
||||
@classmethod
|
||||
def pack_value(cls, tx_num: int, position: int):
|
||||
return super().pack_value(tx_num, position)
|
||||
|
||||
@classmethod
|
||||
def unpack_key(cls, key: bytes) -> ClaimShortIDKey:
|
||||
assert key[:1] == cls.prefix
|
||||
name_len = int.from_bytes(key[1:3], byteorder='big')
|
||||
name = key[3:3 + name_len].decode()
|
||||
return ClaimShortIDKey(name, *cls.key_struct.unpack(key[3 + name_len:]))
|
||||
|
||||
@classmethod
|
||||
def unpack_value(cls, data: bytes) -> ClaimShortIDValue:
|
||||
return ClaimShortIDValue(*super().unpack_value(data))
|
||||
|
||||
@classmethod
|
||||
def pack_item(cls, name: str, claim_hash: bytes, root_tx_num: int, root_position: int,
|
||||
tx_num: int, position: int):
|
||||
return cls.pack_key(name, claim_hash, root_tx_num, root_position), \
|
||||
cls.pack_value(tx_num, position)
|
||||
|
||||
|
||||
class ClaimToChannelPrefixRow(PrefixRow):
|
||||
prefix = DB_PREFIXES.claim_to_channel.value
|
||||
key_struct = struct.Struct(b'>20s')
|
||||
value_struct = struct.Struct(b'>20s')
|
||||
|
||||
@classmethod
|
||||
def pack_key(cls, claim_hash: bytes):
|
||||
return super().pack_key(claim_hash)
|
||||
|
||||
@classmethod
|
||||
def pack_value(cls, signing_hash: bytes):
|
||||
return super().pack_value(signing_hash)
|
||||
|
||||
@classmethod
|
||||
def unpack_key(cls, key: bytes) -> ClaimToChannelKey:
|
||||
return ClaimToChannelKey(*super().unpack_key(key))
|
||||
|
||||
@classmethod
|
||||
def unpack_value(cls, data: bytes) -> ClaimToChannelValue:
|
||||
return ClaimToChannelValue(*super().unpack_value(data))
|
||||
|
||||
@classmethod
|
||||
def pack_item(cls, claim_hash: bytes, signing_hash: bytes):
|
||||
return cls.pack_key(claim_hash), cls.pack_value(signing_hash)
|
||||
|
||||
|
||||
class ChannelToClaimPrefixRow(PrefixRow):
|
||||
prefix = DB_PREFIXES.channel_to_claim.value
|
||||
key_struct = struct.Struct(b'>QLH')
|
||||
value_struct = struct.Struct(b'>20sL')
|
||||
|
||||
@classmethod
|
||||
def pack_key(cls, signing_hash: bytes, name: str, effective_amount: int, tx_num: int, position: int):
|
||||
return cls.prefix + signing_hash + length_encoded_name(name) + cls.key_struct.pack(
|
||||
0xffffffffffffffff - effective_amount, tx_num, position
|
||||
)
|
||||
|
||||
@classmethod
|
||||
def unpack_key(cls, key: bytes) -> ChannelToClaimKey:
|
||||
assert key[:1] == cls.prefix
|
||||
signing_hash = key[1:21]
|
||||
name_len = int.from_bytes(key[21:23], byteorder='big')
|
||||
name = key[23:23 + name_len].decode()
|
||||
ones_comp_effective_amount, tx_num, position = cls.key_struct.unpack(key[23 + name_len:])
|
||||
return ChannelToClaimKey(
|
||||
signing_hash, name, 0xffffffffffffffff - ones_comp_effective_amount, tx_num, position
|
||||
)
|
||||
|
||||
@classmethod
|
||||
def pack_value(cls, claim_hash: bytes, claims_in_channel: int) -> bytes:
|
||||
return super().pack_value(claim_hash, claims_in_channel)
|
||||
|
||||
@classmethod
|
||||
def unpack_value(cls, data: bytes) -> ChannelToClaimValue:
|
||||
return ChannelToClaimValue(*cls.value_struct.unpack(data))
|
||||
|
||||
@classmethod
|
||||
def pack_item(cls, signing_hash: bytes, name: str, effective_amount: int, tx_num: int, position: int,
|
||||
claim_hash: bytes, claims_in_channel: int):
|
||||
return cls.pack_key(signing_hash, name, effective_amount, tx_num, position), \
|
||||
cls.pack_value(claim_hash, claims_in_channel)
|
||||
|
||||
|
||||
class ClaimToSupportPrefixRow(PrefixRow):
|
||||
prefix = DB_PREFIXES.claim_to_support.value
|
||||
key_struct = struct.Struct(b'>20sLH')
|
||||
value_struct = struct.Struct(b'>Q')
|
||||
|
||||
@classmethod
|
||||
def pack_key(cls, claim_hash: bytes, tx_num: int, position: int):
|
||||
return super().pack_key(claim_hash, tx_num, position)
|
||||
|
||||
@classmethod
|
||||
def unpack_key(cls, key: bytes) -> ClaimToSupportKey:
|
||||
return ClaimToSupportKey(*super().unpack_key(key))
|
||||
|
||||
@classmethod
|
||||
def pack_value(cls, amount: int) -> bytes:
|
||||
return super().pack_value(amount)
|
||||
|
||||
@classmethod
|
||||
def unpack_value(cls, data: bytes) -> ClaimToSupportValue:
|
||||
return ClaimToSupportValue(*super().unpack_value(data))
|
||||
|
||||
@classmethod
|
||||
def pack_item(cls, claim_hash: bytes, tx_num: int, position: int, amount: int):
|
||||
return cls.pack_key(claim_hash, tx_num, position), \
|
||||
cls.pack_value(amount)
|
||||
|
||||
|
||||
class SupportToClaimPrefixRow(PrefixRow):
|
||||
prefix = DB_PREFIXES.support_to_claim.value
|
||||
key_struct = struct.Struct(b'>LH')
|
||||
value_struct = struct.Struct(b'>20s')
|
||||
|
||||
@classmethod
|
||||
def pack_key(cls, tx_num: int, position: int):
|
||||
return super().pack_key(tx_num, position)
|
||||
|
||||
@classmethod
|
||||
def unpack_key(cls, key: bytes) -> SupportToClaimKey:
|
||||
return SupportToClaimKey(*super().unpack_key(key))
|
||||
|
||||
@classmethod
|
||||
def pack_value(cls, claim_hash: bytes) -> bytes:
|
||||
return super().pack_value(claim_hash)
|
||||
|
||||
@classmethod
|
||||
def unpack_value(cls, data: bytes) -> SupportToClaimValue:
|
||||
return SupportToClaimValue(*super().unpack_value(data))
|
||||
|
||||
@classmethod
|
||||
def pack_item(cls, tx_num: int, position: int, claim_hash: bytes):
|
||||
return cls.pack_key(tx_num, position), \
|
||||
cls.pack_value(claim_hash)
|
||||
|
||||
|
||||
class Prefixes:
|
||||
claim_to_support = ClaimToSupportPrefixRow
|
||||
support_to_claim = SupportToClaimPrefixRow
|
||||
|
||||
claim_to_txo = ClaimToTXOPrefixRow
|
||||
txo_to_claim = TXOToClaimPrefixRow
|
||||
|
||||
claim_to_channel = ClaimToChannelPrefixRow
|
||||
channel_to_claim = ChannelToClaimPrefixRow
|
||||
|
||||
claim_short_id = ClaimShortIDPrefixRow
|
||||
|
||||
claim_effective_amount = EffectiveAmountPrefixRow
|
||||
|
||||
undo_claimtrie = b'M'
|
78
lbry/wallet/server/db/revertable.py
Normal file
78
lbry/wallet/server/db/revertable.py
Normal file
|
@ -0,0 +1,78 @@
|
|||
import struct
|
||||
from typing import Tuple, List
|
||||
from lbry.wallet.server.db import DB_PREFIXES
|
||||
|
||||
_OP_STRUCT = struct.Struct('>BHH')
|
||||
|
||||
|
||||
class RevertableOp:
|
||||
__slots__ = [
|
||||
'key',
|
||||
'value',
|
||||
]
|
||||
is_put = 0
|
||||
|
||||
def __init__(self, key: bytes, value: bytes):
|
||||
self.key = key
|
||||
self.value = value
|
||||
|
||||
def invert(self) -> 'RevertableOp':
|
||||
raise NotImplementedError()
|
||||
|
||||
def pack(self) -> bytes:
|
||||
"""
|
||||
Serialize to bytes
|
||||
"""
|
||||
return struct.pack(
|
||||
f'>BHH{len(self.key)}s{len(self.value)}s', self.is_put, len(self.key), len(self.value), self.key,
|
||||
self.value
|
||||
)
|
||||
|
||||
@classmethod
|
||||
def unpack(cls, packed: bytes) -> Tuple['RevertableOp', bytes]:
|
||||
"""
|
||||
Deserialize from bytes
|
||||
|
||||
:param packed: bytes containing at least one packed revertable op
|
||||
:return: tuple of the deserialized op (a put or a delete) and the remaining serialized bytes
|
||||
"""
|
||||
is_put, key_len, val_len = _OP_STRUCT.unpack(packed[:5])
|
||||
key = packed[5:5 + key_len]
|
||||
value = packed[5 + key_len:5 + key_len + val_len]
|
||||
if is_put == 1:
|
||||
return RevertablePut(key, value), packed[5 + key_len + val_len:]
|
||||
return RevertableDelete(key, value), packed[5 + key_len + val_len:]
|
||||
|
||||
@classmethod
|
||||
def unpack_stack(cls, packed: bytes) -> List['RevertableOp']:
|
||||
"""
|
||||
Deserialize multiple from bytes
|
||||
"""
|
||||
ops = []
|
||||
while packed:
|
||||
op, packed = cls.unpack(packed)
|
||||
ops.append(op)
|
||||
return ops
|
||||
|
||||
def __eq__(self, other: 'RevertableOp') -> bool:
|
||||
return (self.is_put, self.key, self.value) == (other.is_put, other.key, other.value)
|
||||
|
||||
def __repr__(self) -> str:
|
||||
return f"{'PUT' if self.is_put else 'DELETE'} {DB_PREFIXES(self.key[:1]).name}: " \
|
||||
f"{self.key[1:].hex()} | {self.value.hex()}"
|
||||
|
||||
|
||||
class RevertableDelete(RevertableOp):
|
||||
def invert(self):
|
||||
return RevertablePut(self.key, self.value)
|
||||
|
||||
|
||||
class RevertablePut(RevertableOp):
|
||||
is_put = 1
|
||||
|
||||
def invert(self):
|
||||
return RevertableDelete(self.key, self.value)
|
||||
|
||||
|
||||
def delete_prefix(db: 'plyvel.DB', prefix: bytes) -> List['RevertableDelete']:
|
||||
return [RevertableDelete(k, v) for k, v in db.iterator(prefix=prefix)]
|
|
@ -36,6 +36,7 @@ _sha512 = hashlib.sha512
|
|||
_new_hash = hashlib.new
|
||||
_new_hmac = hmac.new
|
||||
HASHX_LEN = 11
|
||||
CLAIM_HASH_LEN = 20
|
||||
|
||||
|
||||
def sha256(x):
|
||||
|
|
|
@ -15,9 +15,9 @@ import ast
|
|||
import base64
|
||||
import os
|
||||
import time
|
||||
import zlib
|
||||
import typing
|
||||
from typing import Optional, List, Tuple, Iterable
|
||||
import struct
|
||||
from typing import Optional, Iterable
|
||||
from functools import partial
|
||||
from asyncio import sleep
|
||||
from bisect import bisect_right, bisect_left
|
||||
|
@ -27,14 +27,24 @@ from struct import pack, unpack
|
|||
from concurrent.futures.thread import ThreadPoolExecutor
|
||||
import attr
|
||||
from lbry.utils import LRUCacheWithMetrics
|
||||
from lbry.schema.url import URL
|
||||
from lbry.wallet.server import util
|
||||
from lbry.wallet.server.hash import hash_to_hex_str, HASHX_LEN
|
||||
from lbry.wallet.server.hash import hash_to_hex_str, CLAIM_HASH_LEN
|
||||
from lbry.wallet.server.merkle import Merkle, MerkleCache
|
||||
from lbry.wallet.server.util import formatted_time, pack_be_uint16, unpack_be_uint16_from
|
||||
from lbry.wallet.server.storage import db_class
|
||||
|
||||
from lbry.wallet.server.db.revertable import RevertablePut, RevertableDelete, RevertableOp, delete_prefix
|
||||
from lbry.wallet.server.db import DB_PREFIXES
|
||||
from lbry.wallet.server.db.prefixes import Prefixes
|
||||
from lbry.wallet.server.db.claimtrie import StagedClaimtrieItem, get_update_effective_amount_ops, length_encoded_name
|
||||
|
||||
UTXO = namedtuple("UTXO", "tx_num tx_pos tx_hash height value")
|
||||
|
||||
TXO_STRUCT = struct.Struct(b'>LH')
|
||||
TXO_STRUCT_unpack = TXO_STRUCT.unpack
|
||||
TXO_STRUCT_pack = TXO_STRUCT.pack
|
||||
|
||||
|
||||
HISTORY_PREFIX = b'A'
|
||||
TX_PREFIX = b'B'
|
||||
BLOCK_HASH_PREFIX = b'C'
|
||||
|
@ -58,11 +68,34 @@ class FlushData:
|
|||
headers = attr.ib()
|
||||
block_hashes = attr.ib()
|
||||
block_txs = attr.ib()
|
||||
claimtrie_stash = attr.ib()
|
||||
# The following are flushed to the UTXO DB if undo_infos is not None
|
||||
undo_infos = attr.ib()
|
||||
adds = attr.ib()
|
||||
deletes = attr.ib()
|
||||
tip = attr.ib()
|
||||
undo_claimtrie = attr.ib()
|
||||
|
||||
|
||||
class ResolveResult(typing.NamedTuple):
|
||||
name: str
|
||||
claim_hash: bytes
|
||||
tx_num: int
|
||||
position: int
|
||||
tx_hash: bytes
|
||||
height: int
|
||||
short_url: str
|
||||
is_controlling: bool
|
||||
canonical_url: str
|
||||
creation_height: int
|
||||
activation_height: int
|
||||
expiration_height: int
|
||||
effective_amount: int
|
||||
support_amount: int
|
||||
last_take_over_height: Optional[int]
|
||||
claims_in_channel: Optional[int]
|
||||
channel_hash: Optional[bytes]
|
||||
reposted_claim_hash: Optional[bytes]
|
||||
|
||||
|
||||
class LevelDB:
|
||||
|
@ -73,7 +106,7 @@ class LevelDB:
|
|||
"""
|
||||
|
||||
DB_VERSIONS = [6]
|
||||
HIST_DB_VERSIONS = [0]
|
||||
HIST_DB_VERSIONS = [0, 6]
|
||||
|
||||
class DBError(Exception):
|
||||
"""Raised on general DB errors generally indicating corruption."""
|
||||
|
@ -113,6 +146,225 @@ class LevelDB:
|
|||
self.total_transactions = None
|
||||
self.transaction_num_mapping = {}
|
||||
|
||||
def claim_hash_and_name_from_txo(self, tx_num: int, tx_idx: int):
|
||||
claim_hash_and_name = self.db.get(
|
||||
DB_PREFIXES.txo_to_claim.value + TXO_STRUCT_pack(tx_num, tx_idx)
|
||||
)
|
||||
if not claim_hash_and_name:
|
||||
return
|
||||
return claim_hash_and_name[:CLAIM_HASH_LEN], claim_hash_and_name[CLAIM_HASH_LEN:]
|
||||
|
||||
def get_supported_claim_from_txo(self, tx_num, tx_idx: int):
|
||||
supported_claim_hash = self.db.get(
|
||||
DB_PREFIXES.support_to_claim.value + TXO_STRUCT_pack(tx_num, tx_idx)
|
||||
)
|
||||
if supported_claim_hash:
|
||||
packed_support_amount = self.db.get(
|
||||
Prefixes.claim_to_support.pack_key(supported_claim_hash, tx_num, tx_idx)
|
||||
)
|
||||
if packed_support_amount is not None:
|
||||
return supported_claim_hash, Prefixes.claim_to_support.unpack_value(packed_support_amount).amount
|
||||
return None, None
|
||||
|
||||
def get_support_amount(self, claim_hash: bytes):
|
||||
total = 0
|
||||
for packed in self.db.iterator(prefix=DB_PREFIXES.claim_to_support.value + claim_hash, include_key=False):
|
||||
total += Prefixes.claim_to_support.unpack_value(packed).amount
|
||||
return total
|
||||
|
||||
def get_supports(self, claim_hash: bytes):
|
||||
supports = []
|
||||
for k, v in self.db.iterator(prefix=DB_PREFIXES.claim_to_support.value + claim_hash):
|
||||
unpacked_k = Prefixes.claim_to_support.unpack_key(k)
|
||||
unpacked_v = Prefixes.claim_to_support.unpack_value(v)
|
||||
supports.append((unpacked_k.tx_num, unpacked_k.position, unpacked_v.amount))
|
||||
|
||||
return supports
|
||||
|
||||
def _prepare_resolve_result(self, tx_num: int, position: int, claim_hash: bytes, name: str, root_tx_num: int,
|
||||
root_position: int) -> ResolveResult:
|
||||
tx_hash = self.total_transactions[tx_num]
|
||||
height = bisect_right(self.tx_counts, tx_num)
|
||||
created_height = bisect_right(self.tx_counts, root_tx_num)
|
||||
last_take_over_height = 0
|
||||
activation_height = created_height
|
||||
expiration_height = 0
|
||||
|
||||
support_amount = self.get_support_amount(claim_hash)
|
||||
effective_amount = self.get_effective_amount(claim_hash)
|
||||
channel_hash = self.get_channel_for_claim(claim_hash)
|
||||
|
||||
claims_in_channel = None
|
||||
short_url = f'{name}#{claim_hash.hex()}'
|
||||
canonical_url = short_url
|
||||
if channel_hash:
|
||||
channel_vals = self.get_root_claim_txo_and_current_amount(channel_hash)
|
||||
if channel_vals:
|
||||
_, _, _, channel_name, _, _ = channel_vals
|
||||
claims_in_channel = self.get_claims_in_channel_count(channel_hash)
|
||||
canonical_url = f'{channel_name}#{channel_hash.hex()}/{name}#{claim_hash.hex()}'
|
||||
return ResolveResult(
|
||||
name, claim_hash, tx_num, position, tx_hash, height, short_url=short_url,
|
||||
is_controlling=False, canonical_url=canonical_url, last_take_over_height=last_take_over_height,
|
||||
claims_in_channel=claims_in_channel, creation_height=created_height, activation_height=activation_height,
|
||||
expiration_height=expiration_height, effective_amount=effective_amount, support_amount=support_amount,
|
||||
channel_hash=channel_hash, reposted_claim_hash=None
|
||||
)
|
||||
|
||||
def _resolve(self, normalized_name: str, claim_id: Optional[str] = None,
|
||||
amount_order: int = 1) -> Optional[ResolveResult]:
|
||||
"""
|
||||
:param normalized_name: name
|
||||
:param claim_id: partial or complete claim id
|
||||
:param amount_order: '$<value>' suffix to a url, defaults to 1 (winning) if no claim id modifier is provided
|
||||
"""
|
||||
|
||||
encoded_name = length_encoded_name(normalized_name)
|
||||
amount_order = max(int(amount_order or 1), 1)
|
||||
if claim_id:
|
||||
# resolve by partial/complete claim id
|
||||
short_claim_hash = bytes.fromhex(claim_id)
|
||||
prefix = DB_PREFIXES.claim_short_id_prefix.value + encoded_name + short_claim_hash
|
||||
for k, v in self.db.iterator(prefix=prefix):
|
||||
key = Prefixes.claim_short_id.unpack_key(k)
|
||||
claim_txo = Prefixes.claim_short_id.unpack_value(v)
|
||||
return self._prepare_resolve_result(claim_txo.tx_num, claim_txo.position, key.claim_hash, key.name,
|
||||
key.root_tx_num, key.root_position)
|
||||
return
|
||||
|
||||
# resolve by amount ordering, 1 indexed
|
||||
for idx, (k, v) in enumerate(self.db.iterator(prefix=DB_PREFIXES.claim_effective_amount_prefix.value + encoded_name)):
|
||||
if amount_order > idx + 1:
|
||||
continue
|
||||
key = Prefixes.claim_effective_amount.unpack_key(k)
|
||||
claim_val = Prefixes.claim_effective_amount.unpack_value(v)
|
||||
return self._prepare_resolve_result(
|
||||
key.tx_num, key.position, claim_val.claim_hash, key.name, claim_val.root_tx_num,
|
||||
claim_val.root_position
|
||||
)
|
||||
return
|
||||
|
||||
def _resolve_claim_in_channel(self, channel_hash: bytes, normalized_name: str):
|
||||
prefix = DB_PREFIXES.channel_to_claim.value + channel_hash + length_encoded_name(normalized_name)
|
||||
candidates = []
|
||||
for k, v in self.db.iterator(prefix=prefix):
|
||||
key = Prefixes.channel_to_claim.unpack_key(k)
|
||||
stream = Prefixes.channel_to_claim.unpack_value(v)
|
||||
if not candidates or candidates[-1][-1] == key.effective_amount:
|
||||
candidates.append((stream.claim_hash, key.tx_num, key.position, key.effective_amount))
|
||||
else:
|
||||
break
|
||||
if not candidates:
|
||||
return
|
||||
return list(sorted(candidates, key=lambda item: item[1]))[0]
|
||||
|
||||
def _fs_resolve(self, url):
|
||||
try:
|
||||
parsed = URL.parse(url)
|
||||
except ValueError as e:
|
||||
return e, None
|
||||
|
||||
stream = channel = resolved_channel = resolved_stream = None
|
||||
if parsed.has_stream_in_channel:
|
||||
channel = parsed.channel
|
||||
stream = parsed.stream
|
||||
elif parsed.has_channel:
|
||||
channel = parsed.channel
|
||||
elif parsed.has_stream:
|
||||
stream = parsed.stream
|
||||
if channel:
|
||||
resolved_channel = self._resolve(channel.normalized, channel.claim_id, channel.amount_order)
|
||||
if not resolved_channel:
|
||||
return None, LookupError(f'Could not find channel in "{url}".')
|
||||
if stream:
|
||||
if resolved_channel:
|
||||
stream_claim = self._resolve_claim_in_channel(resolved_channel.claim_hash, stream.normalized)
|
||||
if stream_claim:
|
||||
stream_claim_id, stream_tx_num, stream_tx_pos, effective_amount = stream_claim
|
||||
resolved_stream = self._fs_get_claim_by_hash(stream_claim_id)
|
||||
else:
|
||||
resolved_stream = self._resolve(stream.normalized, stream.claim_id, stream.amount_order)
|
||||
if not channel and not resolved_channel and resolved_stream and resolved_stream.channel_hash:
|
||||
resolved_channel = self._fs_get_claim_by_hash(resolved_stream.channel_hash)
|
||||
if not resolved_stream:
|
||||
return LookupError(f'Could not find claim at "{url}".'), None
|
||||
|
||||
return resolved_stream, resolved_channel
|
||||
|
||||
async def fs_resolve(self, url):
|
||||
return await asyncio.get_event_loop().run_in_executor(self.executor, self._fs_resolve, url)
|
||||
|
||||
def _fs_get_claim_by_hash(self, claim_hash):
|
||||
for k, v in self.db.iterator(prefix=DB_PREFIXES.claim_to_txo.value + claim_hash):
|
||||
unpacked_k = Prefixes.claim_to_txo.unpack_key(k)
|
||||
unpacked_v = Prefixes.claim_to_txo.unpack_value(v)
|
||||
return self._prepare_resolve_result(
|
||||
unpacked_k.tx_num, unpacked_k.position, unpacked_k.claim_hash, unpacked_v.name,
|
||||
unpacked_v.root_tx_num, unpacked_v.root_position
|
||||
)
|
||||
|
||||
async def fs_getclaimbyid(self, claim_id):
|
||||
return await asyncio.get_event_loop().run_in_executor(
|
||||
self.executor, self._fs_get_claim_by_hash, bytes.fromhex(claim_id)
|
||||
)
|
||||
|
||||
def claim_exists(self, claim_hash: bytes):
|
||||
for _ in self.db.iterator(prefix=DB_PREFIXES.claim_to_txo.value + claim_hash, include_value=False):
|
||||
return True
|
||||
return False
|
||||
|
||||
def get_root_claim_txo_and_current_amount(self, claim_hash):
|
||||
for k, v in self.db.iterator(prefix=DB_PREFIXES.claim_to_txo.value + claim_hash):
|
||||
unpacked_k = Prefixes.claim_to_txo.unpack_key(k)
|
||||
unpacked_v = Prefixes.claim_to_txo.unpack_value(v)
|
||||
return unpacked_v.root_tx_num, unpacked_v.root_position, unpacked_v.amount, unpacked_v.name,\
|
||||
unpacked_k.tx_num, unpacked_k.position
|
||||
|
||||
def make_staged_claim_item(self, claim_hash: bytes) -> StagedClaimtrieItem:
|
||||
root_tx_num, root_idx, value, name, tx_num, idx = self.db.get_root_claim_txo_and_current_amount(
|
||||
claim_hash
|
||||
)
|
||||
activation_height = 0
|
||||
effective_amount = self.db.get_support_amount(claim_hash) + value
|
||||
signing_hash = self.get_channel_for_claim(claim_hash)
|
||||
if signing_hash:
|
||||
count = self.get_claims_in_channel_count(signing_hash)
|
||||
else:
|
||||
count = 0
|
||||
return StagedClaimtrieItem(
|
||||
name, claim_hash, value, effective_amount, activation_height, tx_num, idx, root_tx_num, root_idx,
|
||||
signing_hash, count
|
||||
)
|
||||
|
||||
def get_effective_amount(self, claim_hash):
|
||||
for v in self.db.iterator(prefix=DB_PREFIXES.claim_to_txo.value + claim_hash, include_key=False):
|
||||
return Prefixes.claim_to_txo.unpack_value(v).amount + self.get_support_amount(claim_hash)
|
||||
fnord
|
||||
return None
|
||||
|
||||
def get_update_effective_amount_ops(self, claim_hash: bytes, effective_amount: int):
|
||||
claim_info = self.get_root_claim_txo_and_current_amount(claim_hash)
|
||||
if not claim_info:
|
||||
return []
|
||||
root_tx_num, root_position, amount, name, tx_num, position = claim_info
|
||||
signing_hash = self.get_channel_for_claim(claim_hash)
|
||||
claims_in_channel_count = None
|
||||
if signing_hash:
|
||||
claims_in_channel_count = self.get_claims_in_channel_count(signing_hash)
|
||||
prev_effective_amount = self.get_effective_amount(claim_hash)
|
||||
return get_update_effective_amount_ops(
|
||||
name, effective_amount, prev_effective_amount, tx_num, position,
|
||||
root_tx_num, root_position, claim_hash, signing_hash, claims_in_channel_count
|
||||
)
|
||||
|
||||
def get_claims_in_channel_count(self, channel_hash) -> int:
|
||||
for v in self.db.iterator(prefix=DB_PREFIXES.channel_to_claim.value + channel_hash, include_key=False):
|
||||
return Prefixes.channel_to_claim.unpack_value(v).claims_in_channel
|
||||
return 0
|
||||
|
||||
def get_channel_for_claim(self, claim_hash) -> Optional[bytes]:
|
||||
return self.db.get(DB_PREFIXES.claim_to_channel.value + claim_hash)
|
||||
|
||||
# def add_unflushed(self, hashXs_by_tx, first_tx_num):
|
||||
# unflushed = self.history.unflushed
|
||||
# count = 0
|
||||
|
@ -220,8 +472,7 @@ class LevelDB:
|
|||
# < might happen at end of compaction as both DBs cannot be
|
||||
# updated atomically
|
||||
if self.hist_flush_count > self.utxo_flush_count:
|
||||
self.logger.info('DB shut down uncleanly. Scanning for '
|
||||
'excess history flushes...')
|
||||
self.logger.info('DB shut down uncleanly. Scanning for excess history flushes...')
|
||||
|
||||
keys = []
|
||||
for key, hist in self.db.iterator(prefix=HASHX_HISTORY_PREFIX):
|
||||
|
@ -350,27 +601,6 @@ class LevelDB:
|
|||
add_count = len(flush_data.adds)
|
||||
spend_count = len(flush_data.deletes) // 2
|
||||
|
||||
# Spends
|
||||
batch_delete = batch.delete
|
||||
for key in sorted(flush_data.deletes):
|
||||
batch_delete(key)
|
||||
flush_data.deletes.clear()
|
||||
|
||||
# New UTXOs
|
||||
batch_put = batch.put
|
||||
for key, value in flush_data.adds.items():
|
||||
# suffix = tx_idx + tx_num
|
||||
hashX = value[:-12]
|
||||
suffix = key[-2:] + value[-12:-8]
|
||||
batch_put(HASHX_UTXO_PREFIX + key[:4] + suffix, hashX)
|
||||
batch_put(UTXO_PREFIX + hashX + suffix, value[-8:])
|
||||
flush_data.adds.clear()
|
||||
|
||||
# New undo information
|
||||
for undo_info, height in flush_data.undo_infos:
|
||||
batch_put(self.undo_key(height), b''.join(undo_info))
|
||||
flush_data.undo_infos.clear()
|
||||
|
||||
if self.db.for_sync:
|
||||
block_count = flush_data.height - self.db_height
|
||||
tx_count = flush_data.tx_count - self.db_tx_count
|
||||
|
@ -394,11 +624,12 @@ class LevelDB:
|
|||
}
|
||||
# History entries are not prefixed; the suffix \0\0 ensures we
|
||||
# look similar to other entries and aren't interfered with
|
||||
batch.put(HIST_STATE, repr(state).encode())
|
||||
batch.put(DB_PREFIXES.HIST_STATE.value, repr(state).encode())
|
||||
|
||||
def flush_dbs(self, flush_data: FlushData, estimate_txs_remaining):
|
||||
"""Flush out cached state. History is always flushed; UTXOs are
|
||||
flushed if flush_utxos."""
|
||||
|
||||
if flush_data.height == self.db_height:
|
||||
self.assert_flushed(flush_data)
|
||||
return
|
||||
|
@ -419,41 +650,49 @@ class LevelDB:
|
|||
assert len(self.tx_counts) == flush_data.height + 1
|
||||
assert len(
|
||||
b''.join(hashes for hashes, _ in flush_data.block_txs)
|
||||
) // 32 == flush_data.tx_count - prior_tx_count
|
||||
|
||||
) // 32 == flush_data.tx_count - prior_tx_count, f"{len(b''.join(hashes for hashes, _ in flush_data.block_txs)) // 32} != {flush_data.tx_count}"
|
||||
|
||||
# Write the headers
|
||||
start_time = time.perf_counter()
|
||||
|
||||
with self.db.write_batch() as batch:
|
||||
batch_put = batch.put
|
||||
self.put = batch.put
|
||||
batch_put = self.put
|
||||
batch_delete = batch.delete
|
||||
height_start = self.fs_height + 1
|
||||
tx_num = prior_tx_count
|
||||
for i, (header, block_hash, (tx_hashes, txs)) in enumerate(zip(flush_data.headers, flush_data.block_hashes, flush_data.block_txs)):
|
||||
batch_put(HEADER_PREFIX + util.pack_be_uint64(height_start), header)
|
||||
for i, (header, block_hash, (tx_hashes, txs)) in enumerate(
|
||||
zip(flush_data.headers, flush_data.block_hashes, flush_data.block_txs)):
|
||||
batch_put(DB_PREFIXES.HEADER_PREFIX.value + util.pack_be_uint64(height_start), header)
|
||||
self.headers.append(header)
|
||||
tx_count = self.tx_counts[height_start]
|
||||
batch_put(BLOCK_HASH_PREFIX + util.pack_be_uint64(height_start), block_hash[::-1])
|
||||
batch_put(TX_COUNT_PREFIX + util.pack_be_uint64(height_start), util.pack_be_uint64(tx_count))
|
||||
batch_put(DB_PREFIXES.BLOCK_HASH_PREFIX.value + util.pack_be_uint64(height_start), block_hash[::-1])
|
||||
batch_put(DB_PREFIXES.TX_COUNT_PREFIX.value + util.pack_be_uint64(height_start), util.pack_be_uint64(tx_count))
|
||||
height_start += 1
|
||||
offset = 0
|
||||
while offset < len(tx_hashes):
|
||||
batch_put(TX_HASH_PREFIX + util.pack_be_uint64(tx_num), tx_hashes[offset:offset + 32])
|
||||
batch_put(TX_NUM_PREFIX + tx_hashes[offset:offset + 32], util.pack_be_uint64(tx_num))
|
||||
batch_put(TX_PREFIX + tx_hashes[offset:offset + 32], txs[offset // 32])
|
||||
|
||||
batch_put(DB_PREFIXES.TX_HASH_PREFIX.value + util.pack_be_uint64(tx_num), tx_hashes[offset:offset + 32])
|
||||
batch_put(DB_PREFIXES.TX_NUM_PREFIX.value + tx_hashes[offset:offset + 32], util.pack_be_uint64(tx_num))
|
||||
batch_put(DB_PREFIXES.TX_PREFIX.value + tx_hashes[offset:offset + 32], txs[offset // 32])
|
||||
tx_num += 1
|
||||
offset += 32
|
||||
flush_data.headers.clear()
|
||||
flush_data.block_txs.clear()
|
||||
flush_data.block_hashes.clear()
|
||||
# flush_data.claim_txo_cache.clear()
|
||||
# flush_data.support_txo_cache.clear()
|
||||
|
||||
for staged_change in flush_data.claimtrie_stash:
|
||||
# print("ADVANCE", staged_change)
|
||||
if staged_change.is_put:
|
||||
batch_put(staged_change.key, staged_change.value)
|
||||
else:
|
||||
batch_delete(staged_change.key)
|
||||
flush_data.claimtrie_stash.clear()
|
||||
for undo_claims, height in flush_data.undo_claimtrie:
|
||||
batch_put(DB_PREFIXES.undo_claimtrie.value + util.pack_be_uint64(height), undo_claims)
|
||||
flush_data.undo_claimtrie.clear()
|
||||
self.fs_height = flush_data.height
|
||||
self.fs_tx_count = flush_data.tx_count
|
||||
|
||||
|
||||
# Then history
|
||||
self.hist_flush_count += 1
|
||||
flush_id = pack_be_uint16(self.hist_flush_count)
|
||||
|
@ -461,17 +700,51 @@ class LevelDB:
|
|||
|
||||
for hashX in sorted(unflushed):
|
||||
key = hashX + flush_id
|
||||
batch_put(HASHX_HISTORY_PREFIX + key, unflushed[hashX].tobytes())
|
||||
batch_put(DB_PREFIXES.HASHX_HISTORY_PREFIX.value + key, unflushed[hashX].tobytes())
|
||||
self.write_history_state(batch)
|
||||
|
||||
unflushed.clear()
|
||||
self.hist_unflushed_count = 0
|
||||
|
||||
|
||||
#########################
|
||||
|
||||
# New undo information
|
||||
for undo_info, height in flush_data.undo_infos:
|
||||
batch_put(self.undo_key(height), b''.join(undo_info))
|
||||
flush_data.undo_infos.clear()
|
||||
|
||||
# Spends
|
||||
for key in sorted(flush_data.deletes):
|
||||
batch_delete(key)
|
||||
flush_data.deletes.clear()
|
||||
|
||||
# New UTXOs
|
||||
for key, value in flush_data.adds.items():
|
||||
# suffix = tx_idx + tx_num
|
||||
hashX = value[:-12]
|
||||
suffix = key[-2:] + value[-12:-8]
|
||||
batch_put(DB_PREFIXES.HASHX_UTXO_PREFIX.value + key[:4] + suffix, hashX)
|
||||
batch_put(DB_PREFIXES.UTXO_PREFIX.value + hashX + suffix, value[-8:])
|
||||
flush_data.adds.clear()
|
||||
|
||||
# Flush state last as it reads the wall time.
|
||||
self.flush_utxo_db(batch, flush_data)
|
||||
start_time = time.time()
|
||||
add_count = len(flush_data.adds)
|
||||
spend_count = len(flush_data.deletes) // 2
|
||||
|
||||
if self.db.for_sync:
|
||||
block_count = flush_data.height - self.db_height
|
||||
tx_count = flush_data.tx_count - self.db_tx_count
|
||||
elapsed = time.time() - start_time
|
||||
self.logger.info(f'flushed {block_count:,d} blocks with '
|
||||
f'{tx_count:,d} txs, {add_count:,d} UTXO adds, '
|
||||
f'{spend_count:,d} spends in '
|
||||
f'{elapsed:.1f}s, committing...')
|
||||
|
||||
self.utxo_flush_count = self.hist_flush_count
|
||||
self.db_height = flush_data.height
|
||||
self.db_tx_count = flush_data.tx_count
|
||||
self.db_tip = flush_data.tip
|
||||
|
||||
# self.flush_state(batch)
|
||||
#
|
||||
|
@ -524,24 +797,43 @@ class LevelDB:
|
|||
start_time = time.time()
|
||||
tx_delta = flush_data.tx_count - self.last_flush_tx_count
|
||||
###
|
||||
while self.fs_height > flush_data.height:
|
||||
self.fs_height -= 1
|
||||
self.headers.pop()
|
||||
self.fs_tx_count = flush_data.tx_count
|
||||
# Truncate header_mc: header count is 1 more than the height.
|
||||
self.header_mc.truncate(flush_data.height + 1)
|
||||
|
||||
###
|
||||
# Not certain this is needed, but it doesn't hurt
|
||||
self.hist_flush_count += 1
|
||||
nremoves = 0
|
||||
|
||||
with self.db.write_batch() as batch:
|
||||
batch_put = batch.put
|
||||
batch_delete = batch.delete
|
||||
|
||||
claim_reorg_height = self.fs_height
|
||||
print("flush undos", flush_data.undo_claimtrie)
|
||||
for (ops, height) in reversed(flush_data.undo_claimtrie):
|
||||
claimtrie_ops = RevertableOp.unpack_stack(ops)
|
||||
print("%i undo ops for %i" % (len(claimtrie_ops), height))
|
||||
for op in reversed(claimtrie_ops):
|
||||
print("REWIND", op)
|
||||
if op.is_put:
|
||||
batch_put(op.key, op.value)
|
||||
else:
|
||||
batch_delete(op.key)
|
||||
batch_delete(DB_PREFIXES.undo_claimtrie.value + util.pack_be_uint64(claim_reorg_height))
|
||||
claim_reorg_height -= 1
|
||||
|
||||
flush_data.undo_claimtrie.clear()
|
||||
flush_data.claimtrie_stash.clear()
|
||||
|
||||
while self.fs_height > flush_data.height:
|
||||
self.fs_height -= 1
|
||||
self.headers.pop()
|
||||
tx_count = flush_data.tx_count
|
||||
for hashX in sorted(touched):
|
||||
deletes = []
|
||||
puts = {}
|
||||
for key, hist in self.db.iterator(prefix=HASHX_HISTORY_PREFIX + hashX, reverse=True):
|
||||
for key, hist in self.db.iterator(prefix=DB_PREFIXES.HASHX_HISTORY_PREFIX.value + hashX, reverse=True):
|
||||
k = key[1:]
|
||||
a = array.array('I')
|
||||
a.frombytes(hist)
|
||||
|
@ -554,18 +846,61 @@ class LevelDB:
|
|||
deletes.append(k)
|
||||
|
||||
for key in deletes:
|
||||
batch.delete(key)
|
||||
batch_delete(key)
|
||||
for key, value in puts.items():
|
||||
batch.put(key, value)
|
||||
batch_put(key, value)
|
||||
|
||||
|
||||
self.write_history_state(batch)
|
||||
|
||||
# New undo information
|
||||
for undo_info, height in flush_data.undo_infos:
|
||||
batch.put(self.undo_key(height), b''.join(undo_info))
|
||||
flush_data.undo_infos.clear()
|
||||
|
||||
# Spends
|
||||
for key in sorted(flush_data.deletes):
|
||||
batch_delete(key)
|
||||
flush_data.deletes.clear()
|
||||
|
||||
# New UTXOs
|
||||
for key, value in flush_data.adds.items():
|
||||
# suffix = tx_idx + tx_num
|
||||
hashX = value[:-12]
|
||||
suffix = key[-2:] + value[-12:-8]
|
||||
batch_put(DB_PREFIXES.HASHX_UTXO_PREFIX.value + key[:4] + suffix, hashX)
|
||||
batch_put(DB_PREFIXES.UTXO_PREFIX.value + hashX + suffix, value[-8:])
|
||||
flush_data.adds.clear()
|
||||
|
||||
self.flush_utxo_db(batch, flush_data)
|
||||
start_time = time.time()
|
||||
add_count = len(flush_data.adds)
|
||||
spend_count = len(flush_data.deletes) // 2
|
||||
|
||||
if self.db.for_sync:
|
||||
block_count = flush_data.height - self.db_height
|
||||
tx_count = flush_data.tx_count - self.db_tx_count
|
||||
elapsed = time.time() - start_time
|
||||
self.logger.info(f'flushed {block_count:,d} blocks with '
|
||||
f'{tx_count:,d} txs, {add_count:,d} UTXO adds, '
|
||||
f'{spend_count:,d} spends in '
|
||||
f'{elapsed:.1f}s, committing...')
|
||||
|
||||
self.utxo_flush_count = self.hist_flush_count
|
||||
self.db_height = flush_data.height
|
||||
self.db_tx_count = flush_data.tx_count
|
||||
self.db_tip = flush_data.tip
|
||||
|
||||
|
||||
|
||||
# Flush state last as it reads the wall time.
|
||||
now = time.time()
|
||||
self.wall_time += now - self.last_flush
|
||||
self.last_flush = now
|
||||
self.last_flush_tx_count = self.fs_tx_count
|
||||
self.write_utxo_state(batch)
|
||||
|
||||
|
||||
self.logger.info(f'backing up removed {nremoves:,d} history entries')
|
||||
elapsed = self.last_flush - start_time
|
||||
self.logger.info(f'backup flush #{self.hist_flush_count:,d} took {elapsed:.1f}s. '
|
||||
|
@ -636,14 +971,14 @@ class LevelDB:
|
|||
tx, merkle = cached_tx
|
||||
else:
|
||||
tx_hash_bytes = bytes.fromhex(tx_hash)[::-1]
|
||||
tx_num = tx_db_get(TX_NUM_PREFIX + tx_hash_bytes)
|
||||
tx_num = tx_db_get(DB_PREFIXES.TX_NUM_PREFIX.value + tx_hash_bytes)
|
||||
tx = None
|
||||
tx_height = -1
|
||||
if tx_num is not None:
|
||||
tx_num = unpack_be_uint64(tx_num)
|
||||
tx_height = bisect_right(tx_counts, tx_num)
|
||||
if tx_height < self.db_height:
|
||||
tx = tx_db_get(TX_PREFIX + tx_hash_bytes)
|
||||
tx = tx_db_get(DB_PREFIXES.TX_PREFIX.value + tx_hash_bytes)
|
||||
if tx_height == -1:
|
||||
merkle = {
|
||||
'block_height': -1
|
||||
|
@ -691,7 +1026,7 @@ class LevelDB:
|
|||
cnt = 0
|
||||
txs = []
|
||||
|
||||
for hist in self.db.iterator(prefix=HASHX_HISTORY_PREFIX + hashX, include_key=False):
|
||||
for hist in self.db.iterator(prefix=DB_PREFIXES.HASHX_HISTORY_PREFIX.value + hashX, include_key=False):
|
||||
a = array.array('I')
|
||||
a.frombytes(hist)
|
||||
for tx_num in a:
|
||||
|
@ -726,7 +1061,8 @@ class LevelDB:
|
|||
|
||||
def read_undo_info(self, height):
|
||||
"""Read undo information from a file for the current height."""
|
||||
return self.db.get(self.undo_key(height))
|
||||
undo_claims = self.db.get(DB_PREFIXES.undo_claimtrie.value + util.pack_be_uint64(self.fs_height))
|
||||
return self.db.get(self.undo_key(height)), undo_claims
|
||||
|
||||
def raw_block_prefix(self):
|
||||
return 'block'
|
||||
|
@ -759,7 +1095,7 @@ class LevelDB:
|
|||
"""Clear excess undo info. Only most recent N are kept."""
|
||||
min_height = self.min_undo_height(self.db_height)
|
||||
keys = []
|
||||
for key, hist in self.db.iterator(prefix=UNDO_PREFIX):
|
||||
for key, hist in self.db.iterator(prefix=DB_PREFIXES.UNDO_PREFIX.value):
|
||||
height, = unpack('>I', key[-4:])
|
||||
if height >= min_height:
|
||||
break
|
||||
|
@ -847,7 +1183,7 @@ class LevelDB:
|
|||
'first_sync': self.first_sync,
|
||||
'db_version': self.db_version,
|
||||
}
|
||||
batch.put(UTXO_STATE, repr(state).encode())
|
||||
batch.put(DB_PREFIXES.UTXO_STATE.value, repr(state).encode())
|
||||
|
||||
def set_flush_count(self, count):
|
||||
self.utxo_flush_count = count
|
||||
|
@ -863,7 +1199,7 @@ class LevelDB:
|
|||
fs_tx_hash = self.fs_tx_hash
|
||||
# Key: b'u' + address_hashX + tx_idx + tx_num
|
||||
# Value: the UTXO value as a 64-bit unsigned integer
|
||||
prefix = UTXO_PREFIX + hashX
|
||||
prefix = DB_PREFIXES.UTXO_PREFIX.value + hashX
|
||||
for db_key, db_value in self.db.iterator(prefix=prefix):
|
||||
tx_pos, tx_num = s_unpack('<HI', db_key[-6:])
|
||||
value, = unpack('<Q', db_value)
|
||||
|
@ -894,7 +1230,7 @@ class LevelDB:
|
|||
|
||||
# Key: b'h' + compressed_tx_hash + tx_idx + tx_num
|
||||
# Value: hashX
|
||||
prefix = HASHX_UTXO_PREFIX + tx_hash[:4] + idx_packed
|
||||
prefix = DB_PREFIXES.HASHX_UTXO_PREFIX.value + tx_hash[:4] + idx_packed
|
||||
|
||||
# Find which entry, if any, the TX_HASH matches.
|
||||
for db_key, hashX in self.db.iterator(prefix=prefix):
|
||||
|
@ -915,7 +1251,7 @@ class LevelDB:
|
|||
return None
|
||||
# Key: b'u' + address_hashX + tx_idx + tx_num
|
||||
# Value: the UTXO value as a 64-bit unsigned integer
|
||||
key = UTXO_PREFIX + hashX + suffix
|
||||
key = DB_PREFIXES.UTXO_PREFIX.value + hashX + suffix
|
||||
db_value = self.db.get(key)
|
||||
if not db_value:
|
||||
# This can happen if the DB was updated between
|
||||
|
|
|
@ -1028,13 +1028,23 @@ class LBRYElectrumX(SessionBase):
|
|||
return RPCError(1, str(err))
|
||||
|
||||
async def claimtrie_resolve(self, *urls):
|
||||
if urls:
|
||||
count = len(urls)
|
||||
try:
|
||||
self.session_mgr.urls_to_resolve_count_metric.inc(count)
|
||||
return await self.run_and_cache_query('resolve', urls)
|
||||
finally:
|
||||
self.session_mgr.resolved_url_count_metric.inc(count)
|
||||
rows, extra = [], []
|
||||
for url in urls:
|
||||
print("resolve", url)
|
||||
self.session_mgr.urls_to_resolve_count_metric.inc()
|
||||
stream, channel = await self.db.fs_resolve(url)
|
||||
self.session_mgr.resolved_url_count_metric.inc()
|
||||
if channel and not stream:
|
||||
rows.append(channel)
|
||||
# print("resolved channel", channel.name.decode())
|
||||
elif stream:
|
||||
# print("resolved stream", stream.name.decode())
|
||||
rows.append(stream)
|
||||
if channel:
|
||||
# print("and channel", channel.name.decode())
|
||||
extra.append(channel)
|
||||
# print("claimtrie resolve %i rows %i extrat" % (len(rows), len(extra)))
|
||||
return Outputs.to_base64(rows, extra, 0, None, None)
|
||||
|
||||
async def get_server_height(self):
|
||||
return self.bp.height
|
||||
|
|
Loading…
Add table
Reference in a new issue