diff --git a/lbry/wallet/server/block_processor.py b/lbry/wallet/server/block_processor.py index 6c9234f8f..199e1d12f 100644 --- a/lbry/wallet/server/block_processor.py +++ b/lbry/wallet/server/block_processor.py @@ -14,7 +14,7 @@ from lbry.wallet.ledger import Ledger, TestNetLedger, RegTestLedger from lbry.wallet.constants import TXO_TYPES from lbry.wallet.server.db.common import STREAM_TYPES, CLAIM_TYPES -from lbry.wallet.transaction import OutputScript, Output +from lbry.wallet.transaction import OutputScript, Output, Transaction from lbry.wallet.server.tx import Tx, TxOutput, TxInput from lbry.wallet.server.daemon import DaemonError from lbry.wallet.server.hash import hash_to_hex_str, HASHX_LEN @@ -252,7 +252,10 @@ class BlockProcessor: self.removed_claims_to_send_es = set() self.touched_claims_to_send_es = set() - self.pending_reposted_count = set() + self.pending_reposted = set() + self.pending_channel_counts = defaultdict(lambda: 0) + + self.pending_channels = {} def claim_producer(self): def get_claim_txo(tx_hash, nout): @@ -265,7 +268,7 @@ class BlockProcessor: script.parse() return Claim.from_bytes(script.values['claim']) except: - self.logger.exception( + self.logger.error( "tx parsing for ES went boom %s %s", tx_hash[::-1].hex(), raw.hex() ) @@ -275,7 +278,8 @@ class BlockProcessor: return to_send_es = set(self.touched_claims_to_send_es) - to_send_es.update(self.pending_reposted_count.difference(self.removed_claims_to_send_es)) + to_send_es.update(self.pending_reposted.difference(self.removed_claims_to_send_es)) + to_send_es.update({k for k, v in self.pending_channel_counts.items() if v != 0}.difference(self.removed_claims_to_send_es)) for claim_hash in self.removed_claims_to_send_es: yield 'delete', claim_hash.hex() @@ -312,7 +316,7 @@ class BlockProcessor: reposted_script = OutputScript(reposted_claim_txo.pk_script) reposted_script.parse() except: - self.logger.exception( + self.logger.error( "repost tx parsing for ES went boom %s %s", reposted_tx_hash[::-1].hex(), raw_reposted_claim_tx.hex() ) @@ -320,7 +324,7 @@ class BlockProcessor: try: reposted_metadata = Claim.from_bytes(reposted_script.values['claim']) except: - self.logger.exception( + self.logger.error( "reposted claim parsing for ES went boom %s %s", reposted_tx_hash[::-1].hex(), raw_reposted_claim_tx.hex() ) @@ -373,9 +377,10 @@ class BlockProcessor: 'has_source': None if not metadata.is_stream else metadata.stream.has_source, 'stream_type': None if not metadata.is_stream else STREAM_TYPES[guess_stream_type(metadata.stream.source.media_type)], 'media_type': None if not metadata.is_stream else metadata.stream.source.media_type, - 'fee_amount': None if not metadata.is_stream or not metadata.stream.has_fee else int(max(metadata.stream.fee.amount or 0, 0)*1000), + 'fee_amount': None if not metadata.is_stream or not metadata.stream.has_fee else int( + max(metadata.stream.fee.amount or 0, 0)*1000 + ), 'fee_currency': None if not metadata.is_stream else metadata.stream.fee.currency, - # 'duration': None if not metadata.is_stream else (metadata.stream.video.duration or metadata.stream.audio.duration), 'reposted': self.db.get_reposted_count(claim_hash), 'reposted_claim_hash': reposted_claim_hash, @@ -389,14 +394,12 @@ class BlockProcessor: self.ledger.public_key_to_address(metadata.channel.public_key_bytes) ), 'signature': metadata.signature, - 'signature_digest': None, # TODO: fix - 'signature_valid': False, # TODO: fix - 'claims_in_channel': 0, # TODO: fix - + 'signature_digest': None, # TODO: fix + 'signature_valid': claim.channel_hash is not None, # TODO: fix 'tags': tags, 'languages': languages, - 'censor_type': 0, # TODO: fix - 'censoring_channel_hash': None, # TODO: fix + 'censor_type': 0, # TODO: fix + 'censoring_channel_hash': None, # TODO: fix # 'trending_group': 0, # 'trending_mixed': 0, # 'trending_local': 0, @@ -406,8 +409,9 @@ class BlockProcessor: value['duration'] = metadata.stream.video.duration or metadata.stream.audio.duration if metadata.is_stream and metadata.stream.release_time: value['release_time'] = metadata.stream.release_time - - yield ('update', value) + if metadata.is_channel: + value['claims_in_channel'] = self.db.get_claims_in_channel_count(claim_hash) + yield 'update', value async def run_in_thread_with_lock(self, func, *args): # Run in a thread to prevent blocking. Shielded so that @@ -443,7 +447,8 @@ class BlockProcessor: self.db.search_index.clear_caches() self.touched_claims_to_send_es.clear() self.removed_claims_to_send_es.clear() - self.pending_reposted_count.clear() + self.pending_reposted.clear() + self.pending_channel_counts.clear() print("******************\n") except: self.logger.exception("advance blocks failed") @@ -601,20 +606,59 @@ class BlockProcessor: else: claim_hash = txo.claim_hash[::-1] print(f"\tupdate lbry://{claim_name}#{claim_hash.hex()} ({tx_num} {txo.amount})") + + signing_channel_hash = None + channel_signature_is_valid = False try: signable = txo.signable + is_repost = txo.claim.is_repost + is_channel = txo.claim.is_channel + if txo.claim.is_signed: + signing_channel_hash = txo.signable.signing_channel_hash[::-1] except: # google.protobuf.message.DecodeError: Could not parse JSON. signable = None + is_repost = False + is_channel = False ops = [] - signing_channel_hash = None reposted_claim_hash = None - if txo.claim.is_repost: - reposted_claim_hash = txo.claim.repost.reference.claim_hash[::-1] - self.pending_reposted_count.add(reposted_claim_hash) + if is_repost: + reposted_claim_hash = txo.claim.repost.reference.claim_hash[::-1] + self.pending_reposted.add(reposted_claim_hash) + + if is_channel: + self.pending_channels[claim_hash] = txo.claim.channel.public_key_bytes + + raw_channel_tx = None if signable and signable.signing_channel_hash: - signing_channel_hash = txo.signable.signing_channel_hash[::-1] + signing_channel = self.db.get_claim_txo(signing_channel_hash) + if signing_channel: + raw_channel_tx = self.db.db.get( + DB_PREFIXES.TX_PREFIX.value + self.db.total_transactions[signing_channel[0].tx_num] + ) + channel_pub_key_bytes = None + try: + if not signing_channel: + if txo.signable.signing_channel_hash[::-1] in self.pending_channels: + channel_pub_key_bytes = self.pending_channels[txo.signable.signing_channel_hash[::-1]] + elif raw_channel_tx: + chan_output = self.coin.transaction(raw_channel_tx).outputs[signing_channel[0].position] + + chan_script = OutputScript(chan_output.pk_script) + chan_script.parse() + channel_meta = Claim.from_bytes(chan_script.values['claim']) + + channel_pub_key_bytes = channel_meta.channel.public_key_bytes + if channel_pub_key_bytes: + channel_signature_is_valid = Output.is_signature_valid( + txo.get_encoded_signature(), txo.get_signature_digest(self.ledger), channel_pub_key_bytes + ) + if channel_signature_is_valid: + self.pending_channel_counts[signing_channel_hash] += 1 + except: + self.logger.exception(f"error validating channel signature for %s:%i", tx_hash[::-1].hex(), nout) + if txo.script.is_claim_name: # it's a root claim root_tx_num, root_idx = tx_num, nout else: # it's a claim update @@ -639,7 +683,7 @@ class BlockProcessor: ) pending = StagedClaimtrieItem( claim_name, claim_hash, txo.amount, self.coin.get_expiration_height(height), tx_num, nout, root_tx_num, - root_idx, signing_channel_hash, reposted_claim_hash + root_idx, channel_signature_is_valid, signing_channel_hash, reposted_claim_hash ) self.pending_claims[(tx_num, nout)] = pending self.pending_claim_txos[claim_hash] = (tx_num, nout) @@ -707,10 +751,13 @@ class BlockProcessor: spent = StagedClaimtrieItem( v.name, claim_hash, v.amount, self.coin.get_expiration_height(bisect_right(self.db.tx_counts, txin_num)), - txin_num, txin.prev_idx, v.root_tx_num, v.root_position, signing_hash, reposted_claim_hash + txin_num, txin.prev_idx, v.root_tx_num, v.root_position, v.channel_signature_is_valid, signing_hash, + reposted_claim_hash ) if spent.reposted_claim_hash: - self.pending_reposted_count.add(spent.reposted_claim_hash) + self.pending_reposted.add(spent.reposted_claim_hash) + if spent.signing_hash and spent.channel_signature_is_valid: + self.pending_channel_counts[spent.signing_hash] -= 1 spent_claims[spent.claim_hash] = (spent.tx_num, spent.position, spent.name) print(f"\tspend lbry://{spent.name}#{spent.claim_hash.hex()}") return spent.get_spend_claim_txo_ops() @@ -729,18 +776,22 @@ class BlockProcessor: prev_amount, prev_signing_hash = pending.amount, pending.signing_hash reposted_claim_hash = pending.reposted_claim_hash expiration = self.coin.get_expiration_height(self.height) + signature_is_valid = pending.channel_signature_is_valid else: k, v = self.db.get_claim_txo( claim_hash ) claim_root_tx_num, claim_root_idx, prev_amount = v.root_tx_num, v.root_position, v.amount + signature_is_valid = v.channel_signature_is_valid prev_signing_hash = self.db.get_channel_for_claim(claim_hash) reposted_claim_hash = self.db.get_repost(claim_hash) expiration = self.coin.get_expiration_height(bisect_right(self.db.tx_counts, tx_num)) self.staged_pending_abandoned[claim_hash] = staged = StagedClaimtrieItem( name, claim_hash, prev_amount, expiration, tx_num, nout, claim_root_tx_num, - claim_root_idx, prev_signing_hash, reposted_claim_hash + claim_root_idx, signature_is_valid, prev_signing_hash, reposted_claim_hash ) + if prev_signing_hash and prev_signing_hash in self.pending_channel_counts: + self.pending_channel_counts.pop(prev_signing_hash) self.pending_supports[claim_hash].clear() self.pending_supports.pop(claim_hash) @@ -1206,6 +1257,8 @@ class BlockProcessor: append_hashX = hashXs.append tx_numb = pack('20sLH') - value_struct = struct.Struct(b'>LHQ') + value_struct = struct.Struct(b'>LHQB') key_part_lambdas = [ lambda: b'', struct.Struct(b'>20s').pack, @@ -272,20 +273,23 @@ class ClaimToTXOPrefixRow(PrefixRow): @classmethod def unpack_value(cls, data: bytes) -> ClaimToTXOValue: - root_tx_num, root_position, amount = cls.value_struct.unpack(data[:14]) - name_len = int.from_bytes(data[14:16], byteorder='big') - name = data[16:16 + name_len].decode() - return ClaimToTXOValue(root_tx_num, root_position, amount, name) + root_tx_num, root_position, amount, channel_signature_is_valid = cls.value_struct.unpack(data[:15]) + name_len = int.from_bytes(data[15:17], byteorder='big') + name = data[17:17 + name_len].decode() + return ClaimToTXOValue(root_tx_num, root_position, amount, bool(channel_signature_is_valid), name) @classmethod - def pack_value(cls, root_tx_num: int, root_position: int, amount: int, name: str) -> bytes: - return cls.value_struct.pack(root_tx_num, root_position, amount) + length_encoded_name(name) + def pack_value(cls, root_tx_num: int, root_position: int, amount: int, + channel_signature_is_valid: bool, name: str) -> bytes: + return cls.value_struct.pack( + root_tx_num, root_position, amount, int(channel_signature_is_valid) + ) + length_encoded_name(name) @classmethod def pack_item(cls, claim_hash: bytes, tx_num: int, position: int, root_tx_num: int, root_position: int, - amount: int, name: str): + amount: int, channel_signature_is_valid: bool, name: str): return cls.pack_key(claim_hash, tx_num, position), \ - cls.pack_value(root_tx_num, root_position, amount, name) + cls.pack_value(root_tx_num, root_position, amount, channel_signature_is_valid, name) class TXOToClaimPrefixRow(PrefixRow): diff --git a/lbry/wallet/server/leveldb.py b/lbry/wallet/server/leveldb.py index d38c93bbb..19907719d 100644 --- a/lbry/wallet/server/leveldb.py +++ b/lbry/wallet/server/leveldb.py @@ -220,14 +220,13 @@ class LevelDB: channel_hash = self.get_channel_for_claim(claim_hash) reposted_claim_hash = self.get_repost(claim_hash) - claims_in_channel = None short_url = f'{name}#{claim_hash.hex()}' canonical_url = short_url + claims_in_channel = self.get_claims_in_channel_count(claim_hash) if channel_hash: channel_vals = self.get_claim_txo(channel_hash) if channel_vals: channel_name = channel_vals[1].name - claims_in_channel = self.get_claims_in_channel_count(channel_hash) canonical_url = f'{channel_name}#{channel_hash.hex()}/{name}#{claim_hash.hex()}' return ResolveResult( name, claim_hash, tx_num, position, tx_hash, height, claim_amount, short_url=short_url,