This commit is contained in:
Jack Robison 2021-06-04 16:50:37 -04:00 committed by Victor Shyba
parent 6ea96e79bd
commit 49f4add8d1
4 changed files with 107 additions and 49 deletions

View file

@ -14,7 +14,7 @@ from lbry.wallet.ledger import Ledger, TestNetLedger, RegTestLedger
from lbry.wallet.constants import TXO_TYPES
from lbry.wallet.server.db.common import STREAM_TYPES, CLAIM_TYPES
from lbry.wallet.transaction import OutputScript, Output
from lbry.wallet.transaction import OutputScript, Output, Transaction
from lbry.wallet.server.tx import Tx, TxOutput, TxInput
from lbry.wallet.server.daemon import DaemonError
from lbry.wallet.server.hash import hash_to_hex_str, HASHX_LEN
@ -252,7 +252,10 @@ class BlockProcessor:
self.removed_claims_to_send_es = set()
self.touched_claims_to_send_es = set()
self.pending_reposted_count = set()
self.pending_reposted = set()
self.pending_channel_counts = defaultdict(lambda: 0)
self.pending_channels = {}
def claim_producer(self):
def get_claim_txo(tx_hash, nout):
@ -265,7 +268,7 @@ class BlockProcessor:
script.parse()
return Claim.from_bytes(script.values['claim'])
except:
self.logger.exception(
self.logger.error(
"tx parsing for ES went boom %s %s", tx_hash[::-1].hex(),
raw.hex()
)
@ -275,7 +278,8 @@ class BlockProcessor:
return
to_send_es = set(self.touched_claims_to_send_es)
to_send_es.update(self.pending_reposted_count.difference(self.removed_claims_to_send_es))
to_send_es.update(self.pending_reposted.difference(self.removed_claims_to_send_es))
to_send_es.update({k for k, v in self.pending_channel_counts.items() if v != 0}.difference(self.removed_claims_to_send_es))
for claim_hash in self.removed_claims_to_send_es:
yield 'delete', claim_hash.hex()
@ -312,7 +316,7 @@ class BlockProcessor:
reposted_script = OutputScript(reposted_claim_txo.pk_script)
reposted_script.parse()
except:
self.logger.exception(
self.logger.error(
"repost tx parsing for ES went boom %s %s", reposted_tx_hash[::-1].hex(),
raw_reposted_claim_tx.hex()
)
@ -320,7 +324,7 @@ class BlockProcessor:
try:
reposted_metadata = Claim.from_bytes(reposted_script.values['claim'])
except:
self.logger.exception(
self.logger.error(
"reposted claim parsing for ES went boom %s %s", reposted_tx_hash[::-1].hex(),
raw_reposted_claim_tx.hex()
)
@ -373,9 +377,10 @@ class BlockProcessor:
'has_source': None if not metadata.is_stream else metadata.stream.has_source,
'stream_type': None if not metadata.is_stream else STREAM_TYPES[guess_stream_type(metadata.stream.source.media_type)],
'media_type': None if not metadata.is_stream else metadata.stream.source.media_type,
'fee_amount': None if not metadata.is_stream or not metadata.stream.has_fee else int(max(metadata.stream.fee.amount or 0, 0)*1000),
'fee_amount': None if not metadata.is_stream or not metadata.stream.has_fee else int(
max(metadata.stream.fee.amount or 0, 0)*1000
),
'fee_currency': None if not metadata.is_stream else metadata.stream.fee.currency,
# 'duration': None if not metadata.is_stream else (metadata.stream.video.duration or metadata.stream.audio.duration),
'reposted': self.db.get_reposted_count(claim_hash),
'reposted_claim_hash': reposted_claim_hash,
@ -389,14 +394,12 @@ class BlockProcessor:
self.ledger.public_key_to_address(metadata.channel.public_key_bytes)
),
'signature': metadata.signature,
'signature_digest': None, # TODO: fix
'signature_valid': False, # TODO: fix
'claims_in_channel': 0, # TODO: fix
'signature_digest': None, # TODO: fix
'signature_valid': claim.channel_hash is not None, # TODO: fix
'tags': tags,
'languages': languages,
'censor_type': 0, # TODO: fix
'censoring_channel_hash': None, # TODO: fix
'censor_type': 0, # TODO: fix
'censoring_channel_hash': None, # TODO: fix
# 'trending_group': 0,
# 'trending_mixed': 0,
# 'trending_local': 0,
@ -406,8 +409,9 @@ class BlockProcessor:
value['duration'] = metadata.stream.video.duration or metadata.stream.audio.duration
if metadata.is_stream and metadata.stream.release_time:
value['release_time'] = metadata.stream.release_time
yield ('update', value)
if metadata.is_channel:
value['claims_in_channel'] = self.db.get_claims_in_channel_count(claim_hash)
yield 'update', value
async def run_in_thread_with_lock(self, func, *args):
# Run in a thread to prevent blocking. Shielded so that
@ -443,7 +447,8 @@ class BlockProcessor:
self.db.search_index.clear_caches()
self.touched_claims_to_send_es.clear()
self.removed_claims_to_send_es.clear()
self.pending_reposted_count.clear()
self.pending_reposted.clear()
self.pending_channel_counts.clear()
print("******************\n")
except:
self.logger.exception("advance blocks failed")
@ -601,20 +606,59 @@ class BlockProcessor:
else:
claim_hash = txo.claim_hash[::-1]
print(f"\tupdate lbry://{claim_name}#{claim_hash.hex()} ({tx_num} {txo.amount})")
signing_channel_hash = None
channel_signature_is_valid = False
try:
signable = txo.signable
is_repost = txo.claim.is_repost
is_channel = txo.claim.is_channel
if txo.claim.is_signed:
signing_channel_hash = txo.signable.signing_channel_hash[::-1]
except: # google.protobuf.message.DecodeError: Could not parse JSON.
signable = None
is_repost = False
is_channel = False
ops = []
signing_channel_hash = None
reposted_claim_hash = None
if txo.claim.is_repost:
reposted_claim_hash = txo.claim.repost.reference.claim_hash[::-1]
self.pending_reposted_count.add(reposted_claim_hash)
if is_repost:
reposted_claim_hash = txo.claim.repost.reference.claim_hash[::-1]
self.pending_reposted.add(reposted_claim_hash)
if is_channel:
self.pending_channels[claim_hash] = txo.claim.channel.public_key_bytes
raw_channel_tx = None
if signable and signable.signing_channel_hash:
signing_channel_hash = txo.signable.signing_channel_hash[::-1]
signing_channel = self.db.get_claim_txo(signing_channel_hash)
if signing_channel:
raw_channel_tx = self.db.db.get(
DB_PREFIXES.TX_PREFIX.value + self.db.total_transactions[signing_channel[0].tx_num]
)
channel_pub_key_bytes = None
try:
if not signing_channel:
if txo.signable.signing_channel_hash[::-1] in self.pending_channels:
channel_pub_key_bytes = self.pending_channels[txo.signable.signing_channel_hash[::-1]]
elif raw_channel_tx:
chan_output = self.coin.transaction(raw_channel_tx).outputs[signing_channel[0].position]
chan_script = OutputScript(chan_output.pk_script)
chan_script.parse()
channel_meta = Claim.from_bytes(chan_script.values['claim'])
channel_pub_key_bytes = channel_meta.channel.public_key_bytes
if channel_pub_key_bytes:
channel_signature_is_valid = Output.is_signature_valid(
txo.get_encoded_signature(), txo.get_signature_digest(self.ledger), channel_pub_key_bytes
)
if channel_signature_is_valid:
self.pending_channel_counts[signing_channel_hash] += 1
except:
self.logger.exception(f"error validating channel signature for %s:%i", tx_hash[::-1].hex(), nout)
if txo.script.is_claim_name: # it's a root claim
root_tx_num, root_idx = tx_num, nout
else: # it's a claim update
@ -639,7 +683,7 @@ class BlockProcessor:
)
pending = StagedClaimtrieItem(
claim_name, claim_hash, txo.amount, self.coin.get_expiration_height(height), tx_num, nout, root_tx_num,
root_idx, signing_channel_hash, reposted_claim_hash
root_idx, channel_signature_is_valid, signing_channel_hash, reposted_claim_hash
)
self.pending_claims[(tx_num, nout)] = pending
self.pending_claim_txos[claim_hash] = (tx_num, nout)
@ -707,10 +751,13 @@ class BlockProcessor:
spent = StagedClaimtrieItem(
v.name, claim_hash, v.amount,
self.coin.get_expiration_height(bisect_right(self.db.tx_counts, txin_num)),
txin_num, txin.prev_idx, v.root_tx_num, v.root_position, signing_hash, reposted_claim_hash
txin_num, txin.prev_idx, v.root_tx_num, v.root_position, v.channel_signature_is_valid, signing_hash,
reposted_claim_hash
)
if spent.reposted_claim_hash:
self.pending_reposted_count.add(spent.reposted_claim_hash)
self.pending_reposted.add(spent.reposted_claim_hash)
if spent.signing_hash and spent.channel_signature_is_valid:
self.pending_channel_counts[spent.signing_hash] -= 1
spent_claims[spent.claim_hash] = (spent.tx_num, spent.position, spent.name)
print(f"\tspend lbry://{spent.name}#{spent.claim_hash.hex()}")
return spent.get_spend_claim_txo_ops()
@ -729,18 +776,22 @@ class BlockProcessor:
prev_amount, prev_signing_hash = pending.amount, pending.signing_hash
reposted_claim_hash = pending.reposted_claim_hash
expiration = self.coin.get_expiration_height(self.height)
signature_is_valid = pending.channel_signature_is_valid
else:
k, v = self.db.get_claim_txo(
claim_hash
)
claim_root_tx_num, claim_root_idx, prev_amount = v.root_tx_num, v.root_position, v.amount
signature_is_valid = v.channel_signature_is_valid
prev_signing_hash = self.db.get_channel_for_claim(claim_hash)
reposted_claim_hash = self.db.get_repost(claim_hash)
expiration = self.coin.get_expiration_height(bisect_right(self.db.tx_counts, tx_num))
self.staged_pending_abandoned[claim_hash] = staged = StagedClaimtrieItem(
name, claim_hash, prev_amount, expiration, tx_num, nout, claim_root_tx_num,
claim_root_idx, prev_signing_hash, reposted_claim_hash
claim_root_idx, signature_is_valid, prev_signing_hash, reposted_claim_hash
)
if prev_signing_hash and prev_signing_hash in self.pending_channel_counts:
self.pending_channel_counts.pop(prev_signing_hash)
self.pending_supports[claim_hash].clear()
self.pending_supports.pop(claim_hash)
@ -1206,6 +1257,8 @@ class BlockProcessor:
append_hashX = hashXs.append
tx_numb = pack('<I', tx_count)
txos = Transaction(tx.raw).outputs
# Spend the inputs
for txin in tx.inputs:
if txin.is_generation():
@ -1228,11 +1281,8 @@ class BlockProcessor:
put_utxo(tx_hash + pack('<H', nout), hashX + tx_numb + pack('<Q', txout.value))
# add claim/support txo
script = OutputScript(txout.pk_script)
script.parse()
claim_or_support_ops = self._add_claim_or_support(
height, tx_hash, tx_count, nout, Output(txout.value, script), spent_claims
height, tx_hash, tx_count, nout, txos[nout], spent_claims
)
if claim_or_support_ops:
claimtrie_stash_extend(claim_or_support_ops)
@ -1298,6 +1348,7 @@ class BlockProcessor:
self.possible_future_activated_claim.clear()
self.possible_future_activated_support.clear()
self.possible_future_support_txos.clear()
self.pending_channels.clear()
# for cache in self.search_cache.values():
# cache.clear()

View file

@ -136,6 +136,7 @@ class StagedClaimtrieItem(typing.NamedTuple):
position: int
root_claim_tx_num: int
root_claim_tx_position: int
channel_signature_is_valid: bool
signing_hash: Optional[bytes]
reposted_claim_hash: Optional[bytes]
@ -156,7 +157,7 @@ class StagedClaimtrieItem(typing.NamedTuple):
op(
*Prefixes.claim_to_txo.pack_item(
self.claim_hash, self.tx_num, self.position, self.root_claim_tx_num, self.root_claim_tx_position,
self.amount, self.name
self.amount, self.channel_signature_is_valid, self.name
)
),
# claim hash by txo
@ -180,18 +181,21 @@ class StagedClaimtrieItem(typing.NamedTuple):
]
if self.signing_hash:
ops.extend([
ops.append(
# channel by stream
op(
*Prefixes.claim_to_channel.pack_item(self.claim_hash, self.signing_hash)
),
# stream by channel
op(
*Prefixes.channel_to_claim.pack_item(
self.signing_hash, self.name, self.tx_num, self.position, self.claim_hash
)
)
if self.channel_signature_is_valid:
ops.append(
# stream by channel
op(
*Prefixes.channel_to_claim.pack_item(
self.signing_hash, self.name, self.tx_num, self.position, self.claim_hash
)
)
)
])
if self.reposted_claim_hash:
ops.extend([
op(

View file

@ -55,6 +55,7 @@ class ClaimToTXOValue(typing.NamedTuple):
root_position: int
amount: int
# activation: int
channel_signature_is_valid: bool
name: str
@ -248,7 +249,7 @@ class ActiveAmountPrefixRow(PrefixRow):
class ClaimToTXOPrefixRow(PrefixRow):
prefix = DB_PREFIXES.claim_to_txo.value
key_struct = struct.Struct(b'>20sLH')
value_struct = struct.Struct(b'>LHQ')
value_struct = struct.Struct(b'>LHQB')
key_part_lambdas = [
lambda: b'',
struct.Struct(b'>20s').pack,
@ -272,20 +273,23 @@ class ClaimToTXOPrefixRow(PrefixRow):
@classmethod
def unpack_value(cls, data: bytes) -> ClaimToTXOValue:
root_tx_num, root_position, amount = cls.value_struct.unpack(data[:14])
name_len = int.from_bytes(data[14:16], byteorder='big')
name = data[16:16 + name_len].decode()
return ClaimToTXOValue(root_tx_num, root_position, amount, name)
root_tx_num, root_position, amount, channel_signature_is_valid = cls.value_struct.unpack(data[:15])
name_len = int.from_bytes(data[15:17], byteorder='big')
name = data[17:17 + name_len].decode()
return ClaimToTXOValue(root_tx_num, root_position, amount, bool(channel_signature_is_valid), name)
@classmethod
def pack_value(cls, root_tx_num: int, root_position: int, amount: int, name: str) -> bytes:
return cls.value_struct.pack(root_tx_num, root_position, amount) + length_encoded_name(name)
def pack_value(cls, root_tx_num: int, root_position: int, amount: int,
channel_signature_is_valid: bool, name: str) -> bytes:
return cls.value_struct.pack(
root_tx_num, root_position, amount, int(channel_signature_is_valid)
) + length_encoded_name(name)
@classmethod
def pack_item(cls, claim_hash: bytes, tx_num: int, position: int, root_tx_num: int, root_position: int,
amount: int, name: str):
amount: int, channel_signature_is_valid: bool, name: str):
return cls.pack_key(claim_hash, tx_num, position), \
cls.pack_value(root_tx_num, root_position, amount, name)
cls.pack_value(root_tx_num, root_position, amount, channel_signature_is_valid, name)
class TXOToClaimPrefixRow(PrefixRow):

View file

@ -220,14 +220,13 @@ class LevelDB:
channel_hash = self.get_channel_for_claim(claim_hash)
reposted_claim_hash = self.get_repost(claim_hash)
claims_in_channel = None
short_url = f'{name}#{claim_hash.hex()}'
canonical_url = short_url
claims_in_channel = self.get_claims_in_channel_count(claim_hash)
if channel_hash:
channel_vals = self.get_claim_txo(channel_hash)
if channel_vals:
channel_name = channel_vals[1].name
claims_in_channel = self.get_claims_in_channel_count(channel_hash)
canonical_url = f'{channel_name}#{channel_hash.hex()}/{name}#{claim_hash.hex()}'
return ResolveResult(
name, claim_hash, tx_num, position, tx_hash, height, claim_amount, short_url=short_url,