diff --git a/lbry/schema/result.py b/lbry/schema/result.py index ef86c7696..ff21edeaf 100644 --- a/lbry/schema/result.py +++ b/lbry/schema/result.py @@ -1,23 +1,28 @@ import base64 import struct -from typing import List +from typing import List, TYPE_CHECKING, Union from binascii import hexlify from itertools import chain from lbry.error import ResolveCensoredError from lbry.schema.types.v2.result_pb2 import Outputs as OutputsMessage from lbry.schema.types.v2.result_pb2 import Error as ErrorMessage +if TYPE_CHECKING: + from lbry.wallet.server.leveldb import ResolveResult INVALID = ErrorMessage.Code.Name(ErrorMessage.INVALID) NOT_FOUND = ErrorMessage.Code.Name(ErrorMessage.NOT_FOUND) BLOCKED = ErrorMessage.Code.Name(ErrorMessage.BLOCKED) -def set_reference(reference, txo_row): - if txo_row: - reference.tx_hash = txo_row['txo_hash'][:32] - reference.nout = struct.unpack(' bool: if self.is_censored(row): - censoring_channel_hash = bytes.fromhex(row['censoring_channel_id'])[::-1] + censoring_channel_hash = row['censoring_channel_hash'] self.censored.setdefault(censoring_channel_hash, set()) self.censored[censoring_channel_hash].add(row['tx_hash']) return True @@ -174,46 +179,49 @@ class Outputs: page.offset = offset if total is not None: page.total = total - if blocked is not None: - blocked.to_message(page, extra_txo_rows) + # if blocked is not None: + # blocked.to_message(page, extra_txo_rows) + for row in extra_txo_rows: + cls.encode_txo(page.extra_txos.add(), row) + for row in txo_rows: - cls.row_to_message(row, page.txos.add(), extra_txo_rows) - for row in extra_txo_rows.values(): - cls.row_to_message(row, page.extra_txos.add(), extra_txo_rows) + # cls.row_to_message(row, page.txos.add(), extra_txo_rows) + txo_message: 'OutputsMessage' = page.txos.add() + cls.encode_txo(txo_message, row) + if not isinstance(row, Exception): + if row.channel_hash: + set_reference(txo_message.claim.channel, row.channel_hash, extra_txo_rows) + if row.reposted_claim_hash: + set_reference(txo_message.claim.repost, row.reposted_claim_hash, extra_txo_rows) + # set_reference(txo_message.error.blocked.channel, row.censor_hash, extra_txo_rows) return page.SerializeToString() @classmethod - def row_to_message(cls, txo, txo_message, extra_row_dict: dict): - if isinstance(txo, Exception): - txo_message.error.text = txo.args[0] - if isinstance(txo, ValueError): + def encode_txo(cls, txo_message, resolve_result: Union['ResolveResult', Exception]): + if isinstance(resolve_result, Exception): + txo_message.error.text = resolve_result.args[0] + if isinstance(resolve_result, ValueError): txo_message.error.code = ErrorMessage.INVALID - elif isinstance(txo, LookupError): + elif isinstance(resolve_result, LookupError): txo_message.error.code = ErrorMessage.NOT_FOUND - elif isinstance(txo, ResolveCensoredError): + elif isinstance(resolve_result, ResolveCensoredError): txo_message.error.code = ErrorMessage.BLOCKED - set_reference(txo_message.error.blocked.channel, extra_row_dict.get(bytes.fromhex(txo.censor_id)[::-1])) return - txo_message.tx_hash = txo['txo_hash'][:32] - txo_message.nout, = struct.unpack('= min_height: self.undo_infos.append((undo_info, height)) + self.undo_claims.append((undo_claims, height)) self.db.write_raw_block(block.raw, height) + for touched_claim_hash, amount_changes in self.effective_amount_changes.items(): + new_effective_amount = sum(amount_changes) + assert new_effective_amount >= 0, f'{new_effective_amount}, {touched_claim_hash.hex()}' + self.claimtrie_stash.extend( + self.db.get_update_effective_amount_ops(touched_claim_hash, new_effective_amount) + ) + # print("update effective amount to", touched_claim_hash.hex(), new_effective_amount) + headers = [block.header for block in blocks] self.height = height self.headers.extend(headers) self.tip = self.coin.header_hash(headers[-1]) self.db.flush_dbs(self.flush_data(), self.estimate_txs_remaining) + # print("+++++++++++++++++++++++++++++++++++++++++++++\nFLUSHED\n+++++++++++++++++++++++++++++++++++++++++++++") + + self.effective_amount_changes.clear() + self.pending_claims.clear() + self.pending_claim_txos.clear() + self.pending_supports.clear() + self.pending_support_txos.clear() + self.pending_abandon.clear() for cache in self.search_cache.values(): cache.clear() self.history_cache.clear() self.notifications.notified_mempool_txs.clear() - def advance_txs(self, height, txs: List[Tuple[Tx, bytes]], header, block_hash): + def _add_claim_or_update(self, txo, script, tx_hash, idx, tx_count, txout, spent_claims): + try: + claim_name = txo.normalized_name + except UnicodeDecodeError: + claim_name = ''.join(chr(c) for c in txo.script.values['claim_name']) + if script.is_claim_name: + claim_hash = hash160(tx_hash + pack('>I', idx))[::-1] + # print(f"\tnew lbry://{claim_name}#{claim_hash.hex()} ({tx_count} {txout.value})") + else: + claim_hash = txo.claim_hash[::-1] + + signing_channel_hash = None + channel_claims_count = 0 + activation_height = 0 + try: + signable = txo.signable + except: # google.protobuf.message.DecodeError: Could not parse JSON. + signable = None + + if signable and signable.signing_channel_hash: + signing_channel_hash = txo.signable.signing_channel_hash[::-1] + # if signing_channel_hash in self.pending_claim_txos: + # pending_channel = self.pending_claims[self.pending_claim_txos[signing_channel_hash]] + # channel_claims_count = pending_channel. + + channel_claims_count = self.db.get_claims_in_channel_count(signing_channel_hash) + 1 + if script.is_claim_name: + support_amount = 0 + root_tx_num, root_idx = tx_count, idx + else: + if claim_hash not in spent_claims: + print(f"\tthis is a wonky tx, contains unlinked claim update {claim_hash.hex()}") + return [] + support_amount = self.db.get_support_amount(claim_hash) + (prev_tx_num, prev_idx, _) = spent_claims.pop(claim_hash) + # print(f"\tupdate lbry://{claim_name}#{claim_hash.hex()} {tx_hash[::-1].hex()} {txout.value}") + + if (prev_tx_num, prev_idx) in self.pending_claims: + previous_claim = self.pending_claims.pop((prev_tx_num, prev_idx)) + root_tx_num = previous_claim.root_claim_tx_num + root_idx = previous_claim.root_claim_tx_position + # prev_amount = previous_claim.amount + else: + root_tx_num, root_idx, prev_amount, _, _, _ = self.db.get_root_claim_txo_and_current_amount( + claim_hash + ) + + pending = StagedClaimtrieItem( + claim_name, claim_hash, txout.value, support_amount + txout.value, + activation_height, tx_count, idx, root_tx_num, root_idx, + signing_channel_hash, channel_claims_count + ) + + self.pending_claims[(tx_count, idx)] = pending + self.pending_claim_txos[claim_hash] = (tx_count, idx) + self.effective_amount_changes[claim_hash].append(txout.value) + return pending.get_add_claim_utxo_ops() + + def _add_support(self, txo, txout, idx, tx_count): + supported_claim_hash = txo.claim_hash[::-1] + + if supported_claim_hash in self.effective_amount_changes: + # print(f"\tsupport claim {supported_claim_hash.hex()} {starting_amount}+{txout.value}={starting_amount + txout.value}") + self.effective_amount_changes[supported_claim_hash].append(txout.value) + self.pending_supports[supported_claim_hash].add((tx_count, idx)) + self.pending_support_txos[(tx_count, idx)] = supported_claim_hash, txout.value + return StagedClaimtrieSupport( + supported_claim_hash, tx_count, idx, txout.value + ).get_add_support_utxo_ops() + + elif supported_claim_hash not in self.pending_claims and supported_claim_hash not in self.pending_abandon: + if self.db.claim_exists(supported_claim_hash): + _, _, _, name, supported_tx_num, supported_pos = self.db.get_root_claim_txo_and_current_amount( + supported_claim_hash + ) + starting_amount = self.db.get_effective_amount(supported_claim_hash) + if supported_claim_hash not in self.effective_amount_changes: + self.effective_amount_changes[supported_claim_hash].append(starting_amount) + self.effective_amount_changes[supported_claim_hash].append(txout.value) + self.pending_supports[supported_claim_hash].add((tx_count, idx)) + self.pending_support_txos[(tx_count, idx)] = supported_claim_hash, txout.value + # print(f"\tsupport claim {supported_claim_hash.hex()} {starting_amount}+{txout.value}={starting_amount + txout.value}") + return StagedClaimtrieSupport( + supported_claim_hash, tx_count, idx, txout.value + ).get_add_support_utxo_ops() + else: + print(f"\tthis is a wonky tx, contains unlinked support for non existent {supported_claim_hash.hex()}") + return [] + + def _add_claim_or_support(self, tx_hash, tx_count, idx, txo, txout, script, spent_claims): + if script.is_claim_name or script.is_update_claim: + return self._add_claim_or_update(txo, script, tx_hash, idx, tx_count, txout, spent_claims) + elif script.is_support_claim or script.is_support_claim_data: + return self._add_support(txo, txout, idx, tx_count) + return [] + + def _spend_support(self, txin): + txin_num = self.db.transaction_num_mapping[txin.prev_hash] + + if (txin_num, txin.prev_idx) in self.pending_support_txos: + spent_support, support_amount = self.pending_support_txos.pop((txin_num, txin.prev_idx)) + self.pending_supports[spent_support].remove((txin_num, txin.prev_idx)) + else: + spent_support, support_amount = self.db.get_supported_claim_from_txo(txin_num, txin.prev_idx) + if spent_support and support_amount is not None and spent_support not in self.pending_abandon: + # print(f"\tspent support for {spent_support.hex()} -{support_amount} ({txin_num}, {txin.prev_idx})") + if spent_support not in self.effective_amount_changes: + assert spent_support not in self.pending_claims + prev_effective_amount = self.db.get_effective_amount(spent_support) + self.effective_amount_changes[spent_support].append(prev_effective_amount) + self.effective_amount_changes[spent_support].append(-support_amount) + return StagedClaimtrieSupport( + spent_support, txin_num, txin.prev_idx, support_amount + ).get_spend_support_txo_ops() + return [] + + def _spend_claim(self, txin, spent_claims): + txin_num = self.db.transaction_num_mapping[txin.prev_hash] + if (txin_num, txin.prev_idx) in self.pending_claims: + spent = self.pending_claims[(txin_num, txin.prev_idx)] + name = spent.name + spent_claims[spent.claim_hash] = (txin_num, txin.prev_idx, name) + # print(f"spend lbry://{name}#{spent.claim_hash.hex()}") + else: + spent_claim_hash_and_name = self.db.claim_hash_and_name_from_txo( + txin_num, txin.prev_idx + ) + if not spent_claim_hash_and_name: # txo is not a claim + return [] + prev_claim_hash, txi_len_encoded_name = spent_claim_hash_and_name + + prev_signing_hash = self.db.get_channel_for_claim(prev_claim_hash) + prev_claims_in_channel_count = None + if prev_signing_hash: + prev_claims_in_channel_count = self.db.get_claims_in_channel_count( + prev_signing_hash + ) + prev_effective_amount = self.db.get_effective_amount( + prev_claim_hash + ) + claim_root_tx_num, claim_root_idx, prev_amount, name, tx_num, position = self.db.get_root_claim_txo_and_current_amount(prev_claim_hash) + activation_height = 0 + spent = StagedClaimtrieItem( + name, prev_claim_hash, prev_amount, prev_effective_amount, + activation_height, txin_num, txin.prev_idx, claim_root_tx_num, + claim_root_idx, prev_signing_hash, prev_claims_in_channel_count + ) + spent_claims[prev_claim_hash] = (txin_num, txin.prev_idx, name) + # print(f"spend lbry://{spent_claims[prev_claim_hash][2]}#{prev_claim_hash.hex()}") + if spent.claim_hash not in self.effective_amount_changes: + self.effective_amount_changes[spent.claim_hash].append(spent.effective_amount) + self.effective_amount_changes[spent.claim_hash].append(-spent.amount) + return spent.get_spend_claim_txo_ops() + + def _spend_claim_or_support(self, txin, spent_claims): + spend_claim_ops = self._spend_claim(txin, spent_claims) + if spend_claim_ops: + return spend_claim_ops + return self._spend_support(txin) + + def _abandon(self, spent_claims): + # Handle abandoned claims + ops = [] + + for abandoned_claim_hash, (prev_tx_num, prev_idx, name) in spent_claims.items(): + # print(f"\tabandon lbry://{name}#{abandoned_claim_hash.hex()} {prev_tx_num} {prev_idx}") + + if (prev_tx_num, prev_idx) in self.pending_claims: + pending = self.pending_claims.pop((prev_tx_num, prev_idx)) + claim_root_tx_num = pending.root_claim_tx_num + claim_root_idx = pending.root_claim_tx_position + prev_amount = pending.amount + prev_signing_hash = pending.signing_hash + prev_effective_amount = pending.effective_amount + prev_claims_in_channel_count = pending.claims_in_channel_count + else: + claim_root_tx_num, claim_root_idx, prev_amount, _, _, _ = self.db.get_root_claim_txo_and_current_amount( + abandoned_claim_hash + ) + prev_signing_hash = self.db.get_channel_for_claim(abandoned_claim_hash) + prev_claims_in_channel_count = None + if prev_signing_hash: + prev_claims_in_channel_count = self.db.get_claims_in_channel_count( + prev_signing_hash + ) + prev_effective_amount = self.db.get_effective_amount( + abandoned_claim_hash + ) + + for (support_tx_num, support_tx_idx) in self.pending_supports[abandoned_claim_hash]: + _, support_amount = self.pending_support_txos.pop((support_tx_num, support_tx_idx)) + ops.extend( + StagedClaimtrieSupport( + abandoned_claim_hash, support_tx_num, support_tx_idx, support_amount + ).get_spend_support_txo_ops() + ) + # print(f"\tremove pending support for abandoned lbry://{name}#{abandoned_claim_hash.hex()} {support_tx_num} {support_tx_idx}") + self.pending_supports[abandoned_claim_hash].clear() + self.pending_supports.pop(abandoned_claim_hash) + + for (support_tx_num, support_tx_idx, support_amount) in self.db.get_supports(abandoned_claim_hash): + ops.extend( + StagedClaimtrieSupport( + abandoned_claim_hash, support_tx_num, support_tx_idx, support_amount + ).get_spend_support_txo_ops() + ) + # print(f"\tremove support for abandoned lbry://{name}#{abandoned_claim_hash.hex()} {support_tx_num} {support_tx_idx}") + + activation_height = 0 + if abandoned_claim_hash in self.effective_amount_changes: + # print("pop") + self.effective_amount_changes.pop(abandoned_claim_hash) + self.pending_abandon.add(abandoned_claim_hash) + + # print(f"\tabandoned lbry://{name}#{abandoned_claim_hash.hex()}") + ops.extend( + StagedClaimtrieItem( + name, abandoned_claim_hash, prev_amount, prev_effective_amount, + activation_height, prev_tx_num, prev_idx, claim_root_tx_num, + claim_root_idx, prev_signing_hash, prev_claims_in_channel_count + ).get_abandon_ops(self.db.db)) + return ops + + def advance_block(self, block, height: int): + from lbry.wallet.transaction import OutputScript, Output + + txs: List[Tuple[Tx, bytes]] = block.transactions + # header = self.coin.electrum_header(block.header, height) + block_hash = self.coin.header_hash(block.header) + self.block_hashes.append(block_hash) self.block_txs.append((b''.join(tx_hash for tx, tx_hash in txs), [tx.raw for tx, _ in txs])) + first_tx_num = self.tx_count undo_info = [] hashXs_by_tx = [] - tx_num = self.tx_count + tx_count = self.tx_count # Use local vars for speed in the loops put_utxo = self.utxo_cache.__setitem__ + claimtrie_stash = [] + claimtrie_stash_extend = claimtrie_stash.extend spend_utxo = self.spend_utxo undo_info_append = undo_info.append update_touched = self.touched.update append_hashX_by_tx = hashXs_by_tx.append hashX_from_script = self.coin.hashX_from_script + unchanged_effective_amounts = {k: sum(v) for k, v in self.effective_amount_changes.items()} + for tx, tx_hash in txs: - hashXs = [] + # print(f"{tx_hash[::-1].hex()} @ {height}") + spent_claims = {} + + hashXs = [] # hashXs touched by spent inputs/rx outputs append_hashX = hashXs.append - tx_numb = pack('= 0, f'{new_effective_amount}, {touched_claim_hash.hex()}' + # if touched_claim_hash not in unchanged_effective_amounts or unchanged_effective_amounts[touched_claim_hash] != new_effective_amount: + # claimtrie_stash_extend( + # self.db.get_update_effective_amount_ops(touched_claim_hash, new_effective_amount) + # ) + # # print("update effective amount to", touched_claim_hash.hex(), new_effective_amount) + + undo_claims = b''.join(op.invert().pack() for op in claimtrie_stash) + self.claimtrie_stash.extend(claimtrie_stash) + # print("%i undo bytes for %i (%i claimtrie stash ops)" % (len(undo_claims), height, len(claimtrie_stash))) + + return undo_info, undo_claims def backup_blocks(self, raw_blocks): """Backup the raw blocks and flush. @@ -495,6 +800,7 @@ class BlockProcessor: coin = self.coin for raw_block in raw_blocks: self.logger.info("backup block %i", self.height) + print("backup", self.height) # Check and update self.tip block = coin.block(raw_block, self.height) header_hash = coin.header_hash(block.header) @@ -511,13 +817,14 @@ class BlockProcessor: # self.touched can include other addresses which is # harmless, but remove None. self.touched.discard(None) + self.db.flush_backup(self.flush_data(), self.touched) self.logger.info(f'backed up to height {self.height:,d}') def backup_txs(self, txs): # Prevout values, in order down the block (coinbase first if present) # undo_info is in reverse block order - undo_info = self.db.read_undo_info(self.height) + undo_info, undo_claims = self.db.read_undo_info(self.height) if undo_info is None: raise ChainError(f'no undo information found for height {self.height:,d}') n = len(undo_info) @@ -548,6 +855,7 @@ class BlockProcessor: assert n == 0 self.tx_count -= len(txs) + self.undo_claims.append((undo_claims, self.height)) """An in-memory UTXO cache, representing all changes to UTXO state since the last DB flush. @@ -610,6 +918,7 @@ class BlockProcessor: all UTXOs so not finding one indicates a logic error or DB corruption. """ + # Fast track is it being in the cache idx_packed = pack(' bytes: + encoded = name.encode('utf-8') + return len(encoded).to_bytes(2, byteorder='big') + encoded + + +class StagedClaimtrieSupport(typing.NamedTuple): + claim_hash: bytes + tx_num: int + position: int + amount: int + + def _get_add_remove_support_utxo_ops(self, add=True): + """ + get a list of revertable operations to add or spend a support txo to the key: value database + + :param add: if true use RevertablePut operations, otherwise use RevertableDelete + :return: + """ + op = RevertablePut if add else RevertableDelete + return [ + op( + *Prefixes.claim_to_support.pack_item(self.claim_hash, self.tx_num, self.position, self.amount) + ), + op( + *Prefixes.support_to_claim.pack_item(self.tx_num, self.position, self.claim_hash) + ) + ] + + def get_add_support_utxo_ops(self) -> typing.List[RevertableOp]: + return self._get_add_remove_support_utxo_ops(add=True) + + def get_spend_support_txo_ops(self) -> typing.List[RevertableOp]: + return self._get_add_remove_support_utxo_ops(add=False) + + +def get_update_effective_amount_ops(name: str, new_effective_amount: int, prev_effective_amount: int, tx_num: int, + position: int, root_tx_num: int, root_position: int, claim_hash: bytes, + signing_hash: Optional[bytes] = None, + claims_in_channel_count: Optional[int] = None): + assert root_position != root_tx_num, f"{tx_num} {position} {root_tx_num} {root_tx_num}" + ops = [ + RevertableDelete( + *Prefixes.claim_effective_amount.pack_item( + name, prev_effective_amount, tx_num, position, claim_hash, root_tx_num, root_position + ) + ), + RevertablePut( + *Prefixes.claim_effective_amount.pack_item( + name, new_effective_amount, tx_num, position, claim_hash, root_tx_num, root_position + ) + ) + ] + if signing_hash: + ops.extend([ + RevertableDelete( + *Prefixes.channel_to_claim.pack_item( + signing_hash, name, prev_effective_amount, tx_num, position, claim_hash, claims_in_channel_count + ) + ), + RevertablePut( + *Prefixes.channel_to_claim.pack_item( + signing_hash, name, new_effective_amount, tx_num, position, claim_hash, claims_in_channel_count + ) + ) + ]) + return ops + + +class StagedClaimtrieItem(typing.NamedTuple): + name: str + claim_hash: bytes + amount: int + effective_amount: int + activation_height: int + tx_num: int + position: int + root_claim_tx_num: int + root_claim_tx_position: int + signing_hash: Optional[bytes] + claims_in_channel_count: Optional[int] + + @property + def is_update(self) -> bool: + return (self.tx_num, self.position) != (self.root_claim_tx_num, self.root_claim_tx_position) + + def _get_add_remove_claim_utxo_ops(self, add=True): + """ + get a list of revertable operations to add or spend a claim txo to the key: value database + + :param add: if true use RevertablePut operations, otherwise use RevertableDelete + :return: + """ + op = RevertablePut if add else RevertableDelete + ops = [ + # url resolution by effective amount + op( + *Prefixes.claim_effective_amount.pack_item( + self.name, self.effective_amount, self.tx_num, self.position, self.claim_hash, + self.root_claim_tx_num, self.root_claim_tx_position + ) + ), + # claim tip by claim hash + op( + *Prefixes.claim_to_txo.pack_item( + self.claim_hash, self.tx_num, self.position, self.root_claim_tx_num, self.root_claim_tx_position, + self.amount, self.name + ) + ), + # short url resolution + op( + *Prefixes.claim_short_id.pack_item( + self.name, self.claim_hash, self.root_claim_tx_num, self.root_claim_tx_position, self.tx_num, + self.position + ) + ), + # claim hash by txo + op( + *Prefixes.txo_to_claim.pack_item(self.tx_num, self.position, self.claim_hash, self.name) + ) + ] + if self.signing_hash and self.claims_in_channel_count is not None: + # claims_in_channel_count can be none if the channel doesnt exist + ops.extend([ + # channel by stream + op( + *Prefixes.claim_to_channel.pack_item(self.claim_hash, self.signing_hash) + ), + # stream by channel + op( + *Prefixes.channel_to_claim.pack_item( + self.signing_hash, self.name, self.effective_amount, self.tx_num, self.position, + self.claim_hash, self.claims_in_channel_count + ) + ) + ]) + return ops + + def get_add_claim_utxo_ops(self) -> typing.List[RevertableOp]: + return self._get_add_remove_claim_utxo_ops(add=True) + + def get_spend_claim_txo_ops(self) -> typing.List[RevertableOp]: + return self._get_add_remove_claim_utxo_ops(add=False) + + def get_invalidate_channel_ops(self, db) -> typing.List[RevertableOp]: + if not self.signing_hash: + return [] + return [ + RevertableDelete(*Prefixes.claim_to_channel.pack_item(self.claim_hash, self.signing_hash)) + ] + delete_prefix(db, DB_PREFIXES.channel_to_claim.value + self.signing_hash) + + def get_abandon_ops(self, db) -> typing.List[RevertableOp]: + packed_name = length_encoded_name(self.name) + delete_short_id_ops = delete_prefix( + db, DB_PREFIXES.claim_short_id_prefix.value + packed_name + self.claim_hash + ) + delete_claim_ops = delete_prefix(db, DB_PREFIXES.claim_to_txo.value + self.claim_hash) + delete_supports_ops = delete_prefix(db, DB_PREFIXES.claim_to_support.value + self.claim_hash) + invalidate_channel_ops = self.get_invalidate_channel_ops(db) + return delete_short_id_ops + delete_claim_ops + delete_supports_ops + invalidate_channel_ops + diff --git a/lbry/wallet/server/db/prefixes.py b/lbry/wallet/server/db/prefixes.py new file mode 100644 index 000000000..3b1657af9 --- /dev/null +++ b/lbry/wallet/server/db/prefixes.py @@ -0,0 +1,391 @@ +import typing +import struct +from lbry.wallet.server.db import DB_PREFIXES + + +def length_encoded_name(name: str) -> bytes: + encoded = name.encode('utf-8') + return len(encoded).to_bytes(2, byteorder='big') + encoded + + +class PrefixRow: + prefix: bytes + key_struct: struct.Struct + value_struct: struct.Struct + + @classmethod + def pack_key(cls, *args) -> bytes: + return cls.prefix + cls.key_struct.pack(*args) + + @classmethod + def pack_value(cls, *args) -> bytes: + return cls.value_struct.pack(*args) + + @classmethod + def unpack_key(cls, key: bytes): + assert key[:1] == cls.prefix + return cls.key_struct.unpack(key[1:]) + + @classmethod + def unpack_value(cls, data: bytes): + return cls.value_struct.unpack(data) + + @classmethod + def unpack_item(cls, key: bytes, value: bytes): + return cls.unpack_key(key), cls.unpack_value(value) + + +class EffectiveAmountKey(typing.NamedTuple): + name: str + effective_amount: int + tx_num: int + position: int + + +class EffectiveAmountValue(typing.NamedTuple): + claim_hash: bytes + root_tx_num: int + root_position: int + + +class ClaimToTXOKey(typing.NamedTuple): + claim_hash: bytes + tx_num: int + position: int + + +class ClaimToTXOValue(typing.NamedTuple): + root_tx_num: int + root_position: int + amount: int + name: str + + +class TXOToClaimKey(typing.NamedTuple): + tx_num: int + position: int + + +class TXOToClaimValue(typing.NamedTuple): + claim_hash: bytes + name: str + + +class ClaimShortIDKey(typing.NamedTuple): + name: str + claim_hash: bytes + root_tx_num: int + root_position: int + + +class ClaimShortIDValue(typing.NamedTuple): + tx_num: int + position: int + + +class ClaimToChannelKey(typing.NamedTuple): + claim_hash: bytes + + +class ClaimToChannelValue(typing.NamedTuple): + signing_hash: bytes + + +class ChannelToClaimKey(typing.NamedTuple): + signing_hash: bytes + name: str + effective_amount: int + tx_num: int + position: int + + +class ChannelToClaimValue(typing.NamedTuple): + claim_hash: bytes + claims_in_channel: int + + +class ClaimToSupportKey(typing.NamedTuple): + claim_hash: bytes + tx_num: int + position: int + + +class ClaimToSupportValue(typing.NamedTuple): + amount: int + + +class SupportToClaimKey(typing.NamedTuple): + tx_num: int + position: int + + +class SupportToClaimValue(typing.NamedTuple): + claim_hash: bytes + + +class EffectiveAmountPrefixRow(PrefixRow): + prefix = DB_PREFIXES.claim_effective_amount_prefix.value + key_struct = struct.Struct(b'>QLH') + value_struct = struct.Struct(b'>20sLH') + + @classmethod + def pack_key(cls, name: str, effective_amount: int, tx_num: int, position: int): + return cls.prefix + length_encoded_name(name) + cls.key_struct.pack( + 0xffffffffffffffff - effective_amount, tx_num, position + ) + + @classmethod + def unpack_key(cls, key: bytes) -> EffectiveAmountKey: + assert key[:1] == cls.prefix + name_len = int.from_bytes(key[1:3], byteorder='big') + name = key[3:3 + name_len].decode() + ones_comp_effective_amount, tx_num, position = cls.key_struct.unpack(key[3 + name_len:]) + return EffectiveAmountKey( + name, 0xffffffffffffffff - ones_comp_effective_amount, tx_num, position + ) + + @classmethod + def unpack_value(cls, data: bytes) -> EffectiveAmountValue: + return EffectiveAmountValue(*super().unpack_value(data)) + + @classmethod + def pack_value(cls, claim_hash: bytes, root_tx_num: int, root_position: int) -> bytes: + return super().pack_value(claim_hash, root_tx_num, root_position) + + @classmethod + def pack_item(cls, name: str, effective_amount: int, tx_num: int, position: int, claim_hash: bytes, + root_tx_num: int, root_position: int): + return cls.pack_key(name, effective_amount, tx_num, position), \ + cls.pack_value(claim_hash, root_tx_num, root_position) + + +class ClaimToTXOPrefixRow(PrefixRow): + prefix = DB_PREFIXES.claim_to_txo.value + key_struct = struct.Struct(b'>20sLH') + value_struct = struct.Struct(b'>LHQ') + + @classmethod + def pack_key(cls, claim_hash: bytes, tx_num: int, position: int): + return super().pack_key( + claim_hash, 0xffffffff - tx_num, 0xffff - position + ) + + @classmethod + def unpack_key(cls, key: bytes) -> ClaimToTXOKey: + assert key[:1] == cls.prefix + claim_hash, ones_comp_tx_num, ones_comp_position = cls.key_struct.unpack(key[1:]) + return ClaimToTXOKey( + claim_hash, 0xffffffff - ones_comp_tx_num, 0xffff - ones_comp_position + ) + + @classmethod + def unpack_value(cls, data: bytes) ->ClaimToTXOValue: + root_tx_num, root_position, amount = cls.value_struct.unpack(data[:14]) + name_len = int.from_bytes(data[14:16], byteorder='big') + name = data[16:16 + name_len].decode() + return ClaimToTXOValue(root_tx_num, root_position, amount, name) + + @classmethod + def pack_value(cls, root_tx_num: int, root_position: int, amount: int, name: str) -> bytes: + return cls.value_struct.pack(root_tx_num, root_position, amount) + length_encoded_name(name) + + @classmethod + def pack_item(cls, claim_hash: bytes, tx_num: int, position: int, root_tx_num: int, root_position: int, + amount: int, name: str): + return cls.pack_key(claim_hash, tx_num, position), \ + cls.pack_value(root_tx_num, root_position, amount, name) + + +class TXOToClaimPrefixRow(PrefixRow): + prefix = DB_PREFIXES.txo_to_claim.value + key_struct = struct.Struct(b'>LH') + value_struct = struct.Struct(b'>20s') + + @classmethod + def pack_key(cls, tx_num: int, position: int): + return super().pack_key(tx_num, position) + + @classmethod + def unpack_key(cls, key: bytes) -> TXOToClaimKey: + return TXOToClaimKey(*super().unpack_key(key)) + + @classmethod + def unpack_value(cls, data: bytes) -> TXOToClaimValue: + claim_hash, = cls.value_struct.unpack(data[:20]) + name_len = int.from_bytes(data[20:22], byteorder='big') + name = data[22:22 + name_len].decode() + return TXOToClaimValue(claim_hash, name) + + @classmethod + def pack_value(cls, claim_hash: bytes, name: str) -> bytes: + return cls.value_struct.pack(claim_hash) + length_encoded_name(name) + + @classmethod + def pack_item(cls, tx_num: int, position: int, claim_hash: bytes, name: str): + return cls.pack_key(tx_num, position), \ + cls.pack_value(claim_hash, name) + + +class ClaimShortIDPrefixRow(PrefixRow): + prefix = DB_PREFIXES.claim_short_id_prefix.value + key_struct = struct.Struct(b'>20sLH') + value_struct = struct.Struct(b'>LH') + + @classmethod + def pack_key(cls, name: str, claim_hash: bytes, root_tx_num: int, root_position: int): + return cls.prefix + length_encoded_name(name) + cls.key_struct.pack(claim_hash, root_tx_num, root_position) + + @classmethod + def pack_value(cls, tx_num: int, position: int): + return super().pack_value(tx_num, position) + + @classmethod + def unpack_key(cls, key: bytes) -> ClaimShortIDKey: + assert key[:1] == cls.prefix + name_len = int.from_bytes(key[1:3], byteorder='big') + name = key[3:3 + name_len].decode() + return ClaimShortIDKey(name, *cls.key_struct.unpack(key[3 + name_len:])) + + @classmethod + def unpack_value(cls, data: bytes) -> ClaimShortIDValue: + return ClaimShortIDValue(*super().unpack_value(data)) + + @classmethod + def pack_item(cls, name: str, claim_hash: bytes, root_tx_num: int, root_position: int, + tx_num: int, position: int): + return cls.pack_key(name, claim_hash, root_tx_num, root_position), \ + cls.pack_value(tx_num, position) + + +class ClaimToChannelPrefixRow(PrefixRow): + prefix = DB_PREFIXES.claim_to_channel.value + key_struct = struct.Struct(b'>20s') + value_struct = struct.Struct(b'>20s') + + @classmethod + def pack_key(cls, claim_hash: bytes): + return super().pack_key(claim_hash) + + @classmethod + def pack_value(cls, signing_hash: bytes): + return super().pack_value(signing_hash) + + @classmethod + def unpack_key(cls, key: bytes) -> ClaimToChannelKey: + return ClaimToChannelKey(*super().unpack_key(key)) + + @classmethod + def unpack_value(cls, data: bytes) -> ClaimToChannelValue: + return ClaimToChannelValue(*super().unpack_value(data)) + + @classmethod + def pack_item(cls, claim_hash: bytes, signing_hash: bytes): + return cls.pack_key(claim_hash), cls.pack_value(signing_hash) + + +class ChannelToClaimPrefixRow(PrefixRow): + prefix = DB_PREFIXES.channel_to_claim.value + key_struct = struct.Struct(b'>QLH') + value_struct = struct.Struct(b'>20sL') + + @classmethod + def pack_key(cls, signing_hash: bytes, name: str, effective_amount: int, tx_num: int, position: int): + return cls.prefix + signing_hash + length_encoded_name(name) + cls.key_struct.pack( + 0xffffffffffffffff - effective_amount, tx_num, position + ) + + @classmethod + def unpack_key(cls, key: bytes) -> ChannelToClaimKey: + assert key[:1] == cls.prefix + signing_hash = key[1:21] + name_len = int.from_bytes(key[21:23], byteorder='big') + name = key[23:23 + name_len].decode() + ones_comp_effective_amount, tx_num, position = cls.key_struct.unpack(key[23 + name_len:]) + return ChannelToClaimKey( + signing_hash, name, 0xffffffffffffffff - ones_comp_effective_amount, tx_num, position + ) + + @classmethod + def pack_value(cls, claim_hash: bytes, claims_in_channel: int) -> bytes: + return super().pack_value(claim_hash, claims_in_channel) + + @classmethod + def unpack_value(cls, data: bytes) -> ChannelToClaimValue: + return ChannelToClaimValue(*cls.value_struct.unpack(data)) + + @classmethod + def pack_item(cls, signing_hash: bytes, name: str, effective_amount: int, tx_num: int, position: int, + claim_hash: bytes, claims_in_channel: int): + return cls.pack_key(signing_hash, name, effective_amount, tx_num, position), \ + cls.pack_value(claim_hash, claims_in_channel) + + +class ClaimToSupportPrefixRow(PrefixRow): + prefix = DB_PREFIXES.claim_to_support.value + key_struct = struct.Struct(b'>20sLH') + value_struct = struct.Struct(b'>Q') + + @classmethod + def pack_key(cls, claim_hash: bytes, tx_num: int, position: int): + return super().pack_key(claim_hash, tx_num, position) + + @classmethod + def unpack_key(cls, key: bytes) -> ClaimToSupportKey: + return ClaimToSupportKey(*super().unpack_key(key)) + + @classmethod + def pack_value(cls, amount: int) -> bytes: + return super().pack_value(amount) + + @classmethod + def unpack_value(cls, data: bytes) -> ClaimToSupportValue: + return ClaimToSupportValue(*super().unpack_value(data)) + + @classmethod + def pack_item(cls, claim_hash: bytes, tx_num: int, position: int, amount: int): + return cls.pack_key(claim_hash, tx_num, position), \ + cls.pack_value(amount) + + +class SupportToClaimPrefixRow(PrefixRow): + prefix = DB_PREFIXES.support_to_claim.value + key_struct = struct.Struct(b'>LH') + value_struct = struct.Struct(b'>20s') + + @classmethod + def pack_key(cls, tx_num: int, position: int): + return super().pack_key(tx_num, position) + + @classmethod + def unpack_key(cls, key: bytes) -> SupportToClaimKey: + return SupportToClaimKey(*super().unpack_key(key)) + + @classmethod + def pack_value(cls, claim_hash: bytes) -> bytes: + return super().pack_value(claim_hash) + + @classmethod + def unpack_value(cls, data: bytes) -> SupportToClaimValue: + return SupportToClaimValue(*super().unpack_value(data)) + + @classmethod + def pack_item(cls, tx_num: int, position: int, claim_hash: bytes): + return cls.pack_key(tx_num, position), \ + cls.pack_value(claim_hash) + + +class Prefixes: + claim_to_support = ClaimToSupportPrefixRow + support_to_claim = SupportToClaimPrefixRow + + claim_to_txo = ClaimToTXOPrefixRow + txo_to_claim = TXOToClaimPrefixRow + + claim_to_channel = ClaimToChannelPrefixRow + channel_to_claim = ChannelToClaimPrefixRow + + claim_short_id = ClaimShortIDPrefixRow + + claim_effective_amount = EffectiveAmountPrefixRow + + undo_claimtrie = b'M' diff --git a/lbry/wallet/server/db/revertable.py b/lbry/wallet/server/db/revertable.py new file mode 100644 index 000000000..bd391cf88 --- /dev/null +++ b/lbry/wallet/server/db/revertable.py @@ -0,0 +1,78 @@ +import struct +from typing import Tuple, List +from lbry.wallet.server.db import DB_PREFIXES + +_OP_STRUCT = struct.Struct('>BHH') + + +class RevertableOp: + __slots__ = [ + 'key', + 'value', + ] + is_put = 0 + + def __init__(self, key: bytes, value: bytes): + self.key = key + self.value = value + + def invert(self) -> 'RevertableOp': + raise NotImplementedError() + + def pack(self) -> bytes: + """ + Serialize to bytes + """ + return struct.pack( + f'>BHH{len(self.key)}s{len(self.value)}s', self.is_put, len(self.key), len(self.value), self.key, + self.value + ) + + @classmethod + def unpack(cls, packed: bytes) -> Tuple['RevertableOp', bytes]: + """ + Deserialize from bytes + + :param packed: bytes containing at least one packed revertable op + :return: tuple of the deserialized op (a put or a delete) and the remaining serialized bytes + """ + is_put, key_len, val_len = _OP_STRUCT.unpack(packed[:5]) + key = packed[5:5 + key_len] + value = packed[5 + key_len:5 + key_len + val_len] + if is_put == 1: + return RevertablePut(key, value), packed[5 + key_len + val_len:] + return RevertableDelete(key, value), packed[5 + key_len + val_len:] + + @classmethod + def unpack_stack(cls, packed: bytes) -> List['RevertableOp']: + """ + Deserialize multiple from bytes + """ + ops = [] + while packed: + op, packed = cls.unpack(packed) + ops.append(op) + return ops + + def __eq__(self, other: 'RevertableOp') -> bool: + return (self.is_put, self.key, self.value) == (other.is_put, other.key, other.value) + + def __repr__(self) -> str: + return f"{'PUT' if self.is_put else 'DELETE'} {DB_PREFIXES(self.key[:1]).name}: " \ + f"{self.key[1:].hex()} | {self.value.hex()}" + + +class RevertableDelete(RevertableOp): + def invert(self): + return RevertablePut(self.key, self.value) + + +class RevertablePut(RevertableOp): + is_put = 1 + + def invert(self): + return RevertableDelete(self.key, self.value) + + +def delete_prefix(db: 'plyvel.DB', prefix: bytes) -> List['RevertableDelete']: + return [RevertableDelete(k, v) for k, v in db.iterator(prefix=prefix)] diff --git a/lbry/wallet/server/hash.py b/lbry/wallet/server/hash.py index 2c0201952..e9d088684 100644 --- a/lbry/wallet/server/hash.py +++ b/lbry/wallet/server/hash.py @@ -36,6 +36,7 @@ _sha512 = hashlib.sha512 _new_hash = hashlib.new _new_hmac = hmac.new HASHX_LEN = 11 +CLAIM_HASH_LEN = 20 def sha256(x): diff --git a/lbry/wallet/server/leveldb.py b/lbry/wallet/server/leveldb.py index f509a14a7..e5d18fbbf 100644 --- a/lbry/wallet/server/leveldb.py +++ b/lbry/wallet/server/leveldb.py @@ -15,9 +15,9 @@ import ast import base64 import os import time -import zlib import typing -from typing import Optional, List, Tuple, Iterable +import struct +from typing import Optional, Iterable from functools import partial from asyncio import sleep from bisect import bisect_right, bisect_left @@ -27,14 +27,24 @@ from struct import pack, unpack from concurrent.futures.thread import ThreadPoolExecutor import attr from lbry.utils import LRUCacheWithMetrics +from lbry.schema.url import URL from lbry.wallet.server import util -from lbry.wallet.server.hash import hash_to_hex_str, HASHX_LEN +from lbry.wallet.server.hash import hash_to_hex_str, CLAIM_HASH_LEN from lbry.wallet.server.merkle import Merkle, MerkleCache from lbry.wallet.server.util import formatted_time, pack_be_uint16, unpack_be_uint16_from from lbry.wallet.server.storage import db_class - +from lbry.wallet.server.db.revertable import RevertablePut, RevertableDelete, RevertableOp, delete_prefix +from lbry.wallet.server.db import DB_PREFIXES +from lbry.wallet.server.db.prefixes import Prefixes +from lbry.wallet.server.db.claimtrie import StagedClaimtrieItem, get_update_effective_amount_ops, length_encoded_name UTXO = namedtuple("UTXO", "tx_num tx_pos tx_hash height value") + +TXO_STRUCT = struct.Struct(b'>LH') +TXO_STRUCT_unpack = TXO_STRUCT.unpack +TXO_STRUCT_pack = TXO_STRUCT.pack + + HISTORY_PREFIX = b'A' TX_PREFIX = b'B' BLOCK_HASH_PREFIX = b'C' @@ -58,11 +68,34 @@ class FlushData: headers = attr.ib() block_hashes = attr.ib() block_txs = attr.ib() + claimtrie_stash = attr.ib() # The following are flushed to the UTXO DB if undo_infos is not None undo_infos = attr.ib() adds = attr.ib() deletes = attr.ib() tip = attr.ib() + undo_claimtrie = attr.ib() + + +class ResolveResult(typing.NamedTuple): + name: str + claim_hash: bytes + tx_num: int + position: int + tx_hash: bytes + height: int + short_url: str + is_controlling: bool + canonical_url: str + creation_height: int + activation_height: int + expiration_height: int + effective_amount: int + support_amount: int + last_take_over_height: Optional[int] + claims_in_channel: Optional[int] + channel_hash: Optional[bytes] + reposted_claim_hash: Optional[bytes] class LevelDB: @@ -73,7 +106,7 @@ class LevelDB: """ DB_VERSIONS = [6] - HIST_DB_VERSIONS = [0] + HIST_DB_VERSIONS = [0, 6] class DBError(Exception): """Raised on general DB errors generally indicating corruption.""" @@ -113,6 +146,225 @@ class LevelDB: self.total_transactions = None self.transaction_num_mapping = {} + def claim_hash_and_name_from_txo(self, tx_num: int, tx_idx: int): + claim_hash_and_name = self.db.get( + DB_PREFIXES.txo_to_claim.value + TXO_STRUCT_pack(tx_num, tx_idx) + ) + if not claim_hash_and_name: + return + return claim_hash_and_name[:CLAIM_HASH_LEN], claim_hash_and_name[CLAIM_HASH_LEN:] + + def get_supported_claim_from_txo(self, tx_num, tx_idx: int): + supported_claim_hash = self.db.get( + DB_PREFIXES.support_to_claim.value + TXO_STRUCT_pack(tx_num, tx_idx) + ) + if supported_claim_hash: + packed_support_amount = self.db.get( + Prefixes.claim_to_support.pack_key(supported_claim_hash, tx_num, tx_idx) + ) + if packed_support_amount is not None: + return supported_claim_hash, Prefixes.claim_to_support.unpack_value(packed_support_amount).amount + return None, None + + def get_support_amount(self, claim_hash: bytes): + total = 0 + for packed in self.db.iterator(prefix=DB_PREFIXES.claim_to_support.value + claim_hash, include_key=False): + total += Prefixes.claim_to_support.unpack_value(packed).amount + return total + + def get_supports(self, claim_hash: bytes): + supports = [] + for k, v in self.db.iterator(prefix=DB_PREFIXES.claim_to_support.value + claim_hash): + unpacked_k = Prefixes.claim_to_support.unpack_key(k) + unpacked_v = Prefixes.claim_to_support.unpack_value(v) + supports.append((unpacked_k.tx_num, unpacked_k.position, unpacked_v.amount)) + + return supports + + def _prepare_resolve_result(self, tx_num: int, position: int, claim_hash: bytes, name: str, root_tx_num: int, + root_position: int) -> ResolveResult: + tx_hash = self.total_transactions[tx_num] + height = bisect_right(self.tx_counts, tx_num) + created_height = bisect_right(self.tx_counts, root_tx_num) + last_take_over_height = 0 + activation_height = created_height + expiration_height = 0 + + support_amount = self.get_support_amount(claim_hash) + effective_amount = self.get_effective_amount(claim_hash) + channel_hash = self.get_channel_for_claim(claim_hash) + + claims_in_channel = None + short_url = f'{name}#{claim_hash.hex()}' + canonical_url = short_url + if channel_hash: + channel_vals = self.get_root_claim_txo_and_current_amount(channel_hash) + if channel_vals: + _, _, _, channel_name, _, _ = channel_vals + claims_in_channel = self.get_claims_in_channel_count(channel_hash) + canonical_url = f'{channel_name}#{channel_hash.hex()}/{name}#{claim_hash.hex()}' + return ResolveResult( + name, claim_hash, tx_num, position, tx_hash, height, short_url=short_url, + is_controlling=False, canonical_url=canonical_url, last_take_over_height=last_take_over_height, + claims_in_channel=claims_in_channel, creation_height=created_height, activation_height=activation_height, + expiration_height=expiration_height, effective_amount=effective_amount, support_amount=support_amount, + channel_hash=channel_hash, reposted_claim_hash=None + ) + + def _resolve(self, normalized_name: str, claim_id: Optional[str] = None, + amount_order: int = 1) -> Optional[ResolveResult]: + """ + :param normalized_name: name + :param claim_id: partial or complete claim id + :param amount_order: '$' suffix to a url, defaults to 1 (winning) if no claim id modifier is provided + """ + + encoded_name = length_encoded_name(normalized_name) + amount_order = max(int(amount_order or 1), 1) + if claim_id: + # resolve by partial/complete claim id + short_claim_hash = bytes.fromhex(claim_id) + prefix = DB_PREFIXES.claim_short_id_prefix.value + encoded_name + short_claim_hash + for k, v in self.db.iterator(prefix=prefix): + key = Prefixes.claim_short_id.unpack_key(k) + claim_txo = Prefixes.claim_short_id.unpack_value(v) + return self._prepare_resolve_result(claim_txo.tx_num, claim_txo.position, key.claim_hash, key.name, + key.root_tx_num, key.root_position) + return + + # resolve by amount ordering, 1 indexed + for idx, (k, v) in enumerate(self.db.iterator(prefix=DB_PREFIXES.claim_effective_amount_prefix.value + encoded_name)): + if amount_order > idx + 1: + continue + key = Prefixes.claim_effective_amount.unpack_key(k) + claim_val = Prefixes.claim_effective_amount.unpack_value(v) + return self._prepare_resolve_result( + key.tx_num, key.position, claim_val.claim_hash, key.name, claim_val.root_tx_num, + claim_val.root_position + ) + return + + def _resolve_claim_in_channel(self, channel_hash: bytes, normalized_name: str): + prefix = DB_PREFIXES.channel_to_claim.value + channel_hash + length_encoded_name(normalized_name) + candidates = [] + for k, v in self.db.iterator(prefix=prefix): + key = Prefixes.channel_to_claim.unpack_key(k) + stream = Prefixes.channel_to_claim.unpack_value(v) + if not candidates or candidates[-1][-1] == key.effective_amount: + candidates.append((stream.claim_hash, key.tx_num, key.position, key.effective_amount)) + else: + break + if not candidates: + return + return list(sorted(candidates, key=lambda item: item[1]))[0] + + def _fs_resolve(self, url): + try: + parsed = URL.parse(url) + except ValueError as e: + return e, None + + stream = channel = resolved_channel = resolved_stream = None + if parsed.has_stream_in_channel: + channel = parsed.channel + stream = parsed.stream + elif parsed.has_channel: + channel = parsed.channel + elif parsed.has_stream: + stream = parsed.stream + if channel: + resolved_channel = self._resolve(channel.normalized, channel.claim_id, channel.amount_order) + if not resolved_channel: + return None, LookupError(f'Could not find channel in "{url}".') + if stream: + if resolved_channel: + stream_claim = self._resolve_claim_in_channel(resolved_channel.claim_hash, stream.normalized) + if stream_claim: + stream_claim_id, stream_tx_num, stream_tx_pos, effective_amount = stream_claim + resolved_stream = self._fs_get_claim_by_hash(stream_claim_id) + else: + resolved_stream = self._resolve(stream.normalized, stream.claim_id, stream.amount_order) + if not channel and not resolved_channel and resolved_stream and resolved_stream.channel_hash: + resolved_channel = self._fs_get_claim_by_hash(resolved_stream.channel_hash) + if not resolved_stream: + return LookupError(f'Could not find claim at "{url}".'), None + + return resolved_stream, resolved_channel + + async def fs_resolve(self, url): + return await asyncio.get_event_loop().run_in_executor(self.executor, self._fs_resolve, url) + + def _fs_get_claim_by_hash(self, claim_hash): + for k, v in self.db.iterator(prefix=DB_PREFIXES.claim_to_txo.value + claim_hash): + unpacked_k = Prefixes.claim_to_txo.unpack_key(k) + unpacked_v = Prefixes.claim_to_txo.unpack_value(v) + return self._prepare_resolve_result( + unpacked_k.tx_num, unpacked_k.position, unpacked_k.claim_hash, unpacked_v.name, + unpacked_v.root_tx_num, unpacked_v.root_position + ) + + async def fs_getclaimbyid(self, claim_id): + return await asyncio.get_event_loop().run_in_executor( + self.executor, self._fs_get_claim_by_hash, bytes.fromhex(claim_id) + ) + + def claim_exists(self, claim_hash: bytes): + for _ in self.db.iterator(prefix=DB_PREFIXES.claim_to_txo.value + claim_hash, include_value=False): + return True + return False + + def get_root_claim_txo_and_current_amount(self, claim_hash): + for k, v in self.db.iterator(prefix=DB_PREFIXES.claim_to_txo.value + claim_hash): + unpacked_k = Prefixes.claim_to_txo.unpack_key(k) + unpacked_v = Prefixes.claim_to_txo.unpack_value(v) + return unpacked_v.root_tx_num, unpacked_v.root_position, unpacked_v.amount, unpacked_v.name,\ + unpacked_k.tx_num, unpacked_k.position + + def make_staged_claim_item(self, claim_hash: bytes) -> StagedClaimtrieItem: + root_tx_num, root_idx, value, name, tx_num, idx = self.db.get_root_claim_txo_and_current_amount( + claim_hash + ) + activation_height = 0 + effective_amount = self.db.get_support_amount(claim_hash) + value + signing_hash = self.get_channel_for_claim(claim_hash) + if signing_hash: + count = self.get_claims_in_channel_count(signing_hash) + else: + count = 0 + return StagedClaimtrieItem( + name, claim_hash, value, effective_amount, activation_height, tx_num, idx, root_tx_num, root_idx, + signing_hash, count + ) + + def get_effective_amount(self, claim_hash): + for v in self.db.iterator(prefix=DB_PREFIXES.claim_to_txo.value + claim_hash, include_key=False): + return Prefixes.claim_to_txo.unpack_value(v).amount + self.get_support_amount(claim_hash) + fnord + return None + + def get_update_effective_amount_ops(self, claim_hash: bytes, effective_amount: int): + claim_info = self.get_root_claim_txo_and_current_amount(claim_hash) + if not claim_info: + return [] + root_tx_num, root_position, amount, name, tx_num, position = claim_info + signing_hash = self.get_channel_for_claim(claim_hash) + claims_in_channel_count = None + if signing_hash: + claims_in_channel_count = self.get_claims_in_channel_count(signing_hash) + prev_effective_amount = self.get_effective_amount(claim_hash) + return get_update_effective_amount_ops( + name, effective_amount, prev_effective_amount, tx_num, position, + root_tx_num, root_position, claim_hash, signing_hash, claims_in_channel_count + ) + + def get_claims_in_channel_count(self, channel_hash) -> int: + for v in self.db.iterator(prefix=DB_PREFIXES.channel_to_claim.value + channel_hash, include_key=False): + return Prefixes.channel_to_claim.unpack_value(v).claims_in_channel + return 0 + + def get_channel_for_claim(self, claim_hash) -> Optional[bytes]: + return self.db.get(DB_PREFIXES.claim_to_channel.value + claim_hash) + # def add_unflushed(self, hashXs_by_tx, first_tx_num): # unflushed = self.history.unflushed # count = 0 @@ -220,8 +472,7 @@ class LevelDB: # < might happen at end of compaction as both DBs cannot be # updated atomically if self.hist_flush_count > self.utxo_flush_count: - self.logger.info('DB shut down uncleanly. Scanning for ' - 'excess history flushes...') + self.logger.info('DB shut down uncleanly. Scanning for excess history flushes...') keys = [] for key, hist in self.db.iterator(prefix=HASHX_HISTORY_PREFIX): @@ -350,27 +601,6 @@ class LevelDB: add_count = len(flush_data.adds) spend_count = len(flush_data.deletes) // 2 - # Spends - batch_delete = batch.delete - for key in sorted(flush_data.deletes): - batch_delete(key) - flush_data.deletes.clear() - - # New UTXOs - batch_put = batch.put - for key, value in flush_data.adds.items(): - # suffix = tx_idx + tx_num - hashX = value[:-12] - suffix = key[-2:] + value[-12:-8] - batch_put(HASHX_UTXO_PREFIX + key[:4] + suffix, hashX) - batch_put(UTXO_PREFIX + hashX + suffix, value[-8:]) - flush_data.adds.clear() - - # New undo information - for undo_info, height in flush_data.undo_infos: - batch_put(self.undo_key(height), b''.join(undo_info)) - flush_data.undo_infos.clear() - if self.db.for_sync: block_count = flush_data.height - self.db_height tx_count = flush_data.tx_count - self.db_tx_count @@ -394,11 +624,12 @@ class LevelDB: } # History entries are not prefixed; the suffix \0\0 ensures we # look similar to other entries and aren't interfered with - batch.put(HIST_STATE, repr(state).encode()) + batch.put(DB_PREFIXES.HIST_STATE.value, repr(state).encode()) def flush_dbs(self, flush_data: FlushData, estimate_txs_remaining): """Flush out cached state. History is always flushed; UTXOs are flushed if flush_utxos.""" + if flush_data.height == self.db_height: self.assert_flushed(flush_data) return @@ -419,41 +650,49 @@ class LevelDB: assert len(self.tx_counts) == flush_data.height + 1 assert len( b''.join(hashes for hashes, _ in flush_data.block_txs) - ) // 32 == flush_data.tx_count - prior_tx_count - + ) // 32 == flush_data.tx_count - prior_tx_count, f"{len(b''.join(hashes for hashes, _ in flush_data.block_txs)) // 32} != {flush_data.tx_count}" # Write the headers start_time = time.perf_counter() with self.db.write_batch() as batch: - batch_put = batch.put + self.put = batch.put + batch_put = self.put + batch_delete = batch.delete height_start = self.fs_height + 1 tx_num = prior_tx_count - for i, (header, block_hash, (tx_hashes, txs)) in enumerate(zip(flush_data.headers, flush_data.block_hashes, flush_data.block_txs)): - batch_put(HEADER_PREFIX + util.pack_be_uint64(height_start), header) + for i, (header, block_hash, (tx_hashes, txs)) in enumerate( + zip(flush_data.headers, flush_data.block_hashes, flush_data.block_txs)): + batch_put(DB_PREFIXES.HEADER_PREFIX.value + util.pack_be_uint64(height_start), header) self.headers.append(header) tx_count = self.tx_counts[height_start] - batch_put(BLOCK_HASH_PREFIX + util.pack_be_uint64(height_start), block_hash[::-1]) - batch_put(TX_COUNT_PREFIX + util.pack_be_uint64(height_start), util.pack_be_uint64(tx_count)) + batch_put(DB_PREFIXES.BLOCK_HASH_PREFIX.value + util.pack_be_uint64(height_start), block_hash[::-1]) + batch_put(DB_PREFIXES.TX_COUNT_PREFIX.value + util.pack_be_uint64(height_start), util.pack_be_uint64(tx_count)) height_start += 1 offset = 0 while offset < len(tx_hashes): - batch_put(TX_HASH_PREFIX + util.pack_be_uint64(tx_num), tx_hashes[offset:offset + 32]) - batch_put(TX_NUM_PREFIX + tx_hashes[offset:offset + 32], util.pack_be_uint64(tx_num)) - batch_put(TX_PREFIX + tx_hashes[offset:offset + 32], txs[offset // 32]) - + batch_put(DB_PREFIXES.TX_HASH_PREFIX.value + util.pack_be_uint64(tx_num), tx_hashes[offset:offset + 32]) + batch_put(DB_PREFIXES.TX_NUM_PREFIX.value + tx_hashes[offset:offset + 32], util.pack_be_uint64(tx_num)) + batch_put(DB_PREFIXES.TX_PREFIX.value + tx_hashes[offset:offset + 32], txs[offset // 32]) tx_num += 1 offset += 32 flush_data.headers.clear() flush_data.block_txs.clear() flush_data.block_hashes.clear() - # flush_data.claim_txo_cache.clear() - # flush_data.support_txo_cache.clear() + for staged_change in flush_data.claimtrie_stash: + # print("ADVANCE", staged_change) + if staged_change.is_put: + batch_put(staged_change.key, staged_change.value) + else: + batch_delete(staged_change.key) + flush_data.claimtrie_stash.clear() + for undo_claims, height in flush_data.undo_claimtrie: + batch_put(DB_PREFIXES.undo_claimtrie.value + util.pack_be_uint64(height), undo_claims) + flush_data.undo_claimtrie.clear() self.fs_height = flush_data.height self.fs_tx_count = flush_data.tx_count - # Then history self.hist_flush_count += 1 flush_id = pack_be_uint16(self.hist_flush_count) @@ -461,17 +700,51 @@ class LevelDB: for hashX in sorted(unflushed): key = hashX + flush_id - batch_put(HASHX_HISTORY_PREFIX + key, unflushed[hashX].tobytes()) + batch_put(DB_PREFIXES.HASHX_HISTORY_PREFIX.value + key, unflushed[hashX].tobytes()) self.write_history_state(batch) unflushed.clear() self.hist_unflushed_count = 0 - ######################### + # New undo information + for undo_info, height in flush_data.undo_infos: + batch_put(self.undo_key(height), b''.join(undo_info)) + flush_data.undo_infos.clear() + + # Spends + for key in sorted(flush_data.deletes): + batch_delete(key) + flush_data.deletes.clear() + + # New UTXOs + for key, value in flush_data.adds.items(): + # suffix = tx_idx + tx_num + hashX = value[:-12] + suffix = key[-2:] + value[-12:-8] + batch_put(DB_PREFIXES.HASHX_UTXO_PREFIX.value + key[:4] + suffix, hashX) + batch_put(DB_PREFIXES.UTXO_PREFIX.value + hashX + suffix, value[-8:]) + flush_data.adds.clear() + # Flush state last as it reads the wall time. - self.flush_utxo_db(batch, flush_data) + start_time = time.time() + add_count = len(flush_data.adds) + spend_count = len(flush_data.deletes) // 2 + + if self.db.for_sync: + block_count = flush_data.height - self.db_height + tx_count = flush_data.tx_count - self.db_tx_count + elapsed = time.time() - start_time + self.logger.info(f'flushed {block_count:,d} blocks with ' + f'{tx_count:,d} txs, {add_count:,d} UTXO adds, ' + f'{spend_count:,d} spends in ' + f'{elapsed:.1f}s, committing...') + + self.utxo_flush_count = self.hist_flush_count + self.db_height = flush_data.height + self.db_tx_count = flush_data.tx_count + self.db_tip = flush_data.tip # self.flush_state(batch) # @@ -524,24 +797,43 @@ class LevelDB: start_time = time.time() tx_delta = flush_data.tx_count - self.last_flush_tx_count ### - while self.fs_height > flush_data.height: - self.fs_height -= 1 - self.headers.pop() self.fs_tx_count = flush_data.tx_count # Truncate header_mc: header count is 1 more than the height. self.header_mc.truncate(flush_data.height + 1) - ### # Not certain this is needed, but it doesn't hurt self.hist_flush_count += 1 nremoves = 0 with self.db.write_batch() as batch: + batch_put = batch.put + batch_delete = batch.delete + + claim_reorg_height = self.fs_height + print("flush undos", flush_data.undo_claimtrie) + for (ops, height) in reversed(flush_data.undo_claimtrie): + claimtrie_ops = RevertableOp.unpack_stack(ops) + print("%i undo ops for %i" % (len(claimtrie_ops), height)) + for op in reversed(claimtrie_ops): + print("REWIND", op) + if op.is_put: + batch_put(op.key, op.value) + else: + batch_delete(op.key) + batch_delete(DB_PREFIXES.undo_claimtrie.value + util.pack_be_uint64(claim_reorg_height)) + claim_reorg_height -= 1 + + flush_data.undo_claimtrie.clear() + flush_data.claimtrie_stash.clear() + + while self.fs_height > flush_data.height: + self.fs_height -= 1 + self.headers.pop() tx_count = flush_data.tx_count for hashX in sorted(touched): deletes = [] puts = {} - for key, hist in self.db.iterator(prefix=HASHX_HISTORY_PREFIX + hashX, reverse=True): + for key, hist in self.db.iterator(prefix=DB_PREFIXES.HASHX_HISTORY_PREFIX.value + hashX, reverse=True): k = key[1:] a = array.array('I') a.frombytes(hist) @@ -554,18 +846,61 @@ class LevelDB: deletes.append(k) for key in deletes: - batch.delete(key) + batch_delete(key) for key, value in puts.items(): - batch.put(key, value) + batch_put(key, value) + + self.write_history_state(batch) + # New undo information + for undo_info, height in flush_data.undo_infos: + batch.put(self.undo_key(height), b''.join(undo_info)) + flush_data.undo_infos.clear() + + # Spends + for key in sorted(flush_data.deletes): + batch_delete(key) + flush_data.deletes.clear() + + # New UTXOs + for key, value in flush_data.adds.items(): + # suffix = tx_idx + tx_num + hashX = value[:-12] + suffix = key[-2:] + value[-12:-8] + batch_put(DB_PREFIXES.HASHX_UTXO_PREFIX.value + key[:4] + suffix, hashX) + batch_put(DB_PREFIXES.UTXO_PREFIX.value + hashX + suffix, value[-8:]) + flush_data.adds.clear() + self.flush_utxo_db(batch, flush_data) + start_time = time.time() + add_count = len(flush_data.adds) + spend_count = len(flush_data.deletes) // 2 + + if self.db.for_sync: + block_count = flush_data.height - self.db_height + tx_count = flush_data.tx_count - self.db_tx_count + elapsed = time.time() - start_time + self.logger.info(f'flushed {block_count:,d} blocks with ' + f'{tx_count:,d} txs, {add_count:,d} UTXO adds, ' + f'{spend_count:,d} spends in ' + f'{elapsed:.1f}s, committing...') + + self.utxo_flush_count = self.hist_flush_count + self.db_height = flush_data.height + self.db_tx_count = flush_data.tx_count + self.db_tip = flush_data.tip + + + # Flush state last as it reads the wall time. now = time.time() self.wall_time += now - self.last_flush self.last_flush = now self.last_flush_tx_count = self.fs_tx_count self.write_utxo_state(batch) + + self.logger.info(f'backing up removed {nremoves:,d} history entries') elapsed = self.last_flush - start_time self.logger.info(f'backup flush #{self.hist_flush_count:,d} took {elapsed:.1f}s. ' @@ -636,14 +971,14 @@ class LevelDB: tx, merkle = cached_tx else: tx_hash_bytes = bytes.fromhex(tx_hash)[::-1] - tx_num = tx_db_get(TX_NUM_PREFIX + tx_hash_bytes) + tx_num = tx_db_get(DB_PREFIXES.TX_NUM_PREFIX.value + tx_hash_bytes) tx = None tx_height = -1 if tx_num is not None: tx_num = unpack_be_uint64(tx_num) tx_height = bisect_right(tx_counts, tx_num) if tx_height < self.db_height: - tx = tx_db_get(TX_PREFIX + tx_hash_bytes) + tx = tx_db_get(DB_PREFIXES.TX_PREFIX.value + tx_hash_bytes) if tx_height == -1: merkle = { 'block_height': -1 @@ -691,7 +1026,7 @@ class LevelDB: cnt = 0 txs = [] - for hist in self.db.iterator(prefix=HASHX_HISTORY_PREFIX + hashX, include_key=False): + for hist in self.db.iterator(prefix=DB_PREFIXES.HASHX_HISTORY_PREFIX.value + hashX, include_key=False): a = array.array('I') a.frombytes(hist) for tx_num in a: @@ -726,7 +1061,8 @@ class LevelDB: def read_undo_info(self, height): """Read undo information from a file for the current height.""" - return self.db.get(self.undo_key(height)) + undo_claims = self.db.get(DB_PREFIXES.undo_claimtrie.value + util.pack_be_uint64(self.fs_height)) + return self.db.get(self.undo_key(height)), undo_claims def raw_block_prefix(self): return 'block' @@ -759,7 +1095,7 @@ class LevelDB: """Clear excess undo info. Only most recent N are kept.""" min_height = self.min_undo_height(self.db_height) keys = [] - for key, hist in self.db.iterator(prefix=UNDO_PREFIX): + for key, hist in self.db.iterator(prefix=DB_PREFIXES.UNDO_PREFIX.value): height, = unpack('>I', key[-4:]) if height >= min_height: break @@ -847,7 +1183,7 @@ class LevelDB: 'first_sync': self.first_sync, 'db_version': self.db_version, } - batch.put(UTXO_STATE, repr(state).encode()) + batch.put(DB_PREFIXES.UTXO_STATE.value, repr(state).encode()) def set_flush_count(self, count): self.utxo_flush_count = count @@ -863,7 +1199,7 @@ class LevelDB: fs_tx_hash = self.fs_tx_hash # Key: b'u' + address_hashX + tx_idx + tx_num # Value: the UTXO value as a 64-bit unsigned integer - prefix = UTXO_PREFIX + hashX + prefix = DB_PREFIXES.UTXO_PREFIX.value + hashX for db_key, db_value in self.db.iterator(prefix=prefix): tx_pos, tx_num = s_unpack('