wallet server and resolve working, functional test

This commit is contained in:
Victor Shyba 2018-12-21 16:18:44 -03:00 committed by Lex Berezhny
parent 6339224980
commit 70471eebfa
4 changed files with 41 additions and 12 deletions

View file

@ -186,7 +186,7 @@ class Resolver:
claim_result['has_signature'] = True claim_result['has_signature'] = True
claim_result['signature_is_valid'] = False claim_result['signature_is_valid'] = False
validated, channel_name = validate_claim_signature_and_get_channel_name( validated, channel_name = validate_claim_signature_and_get_channel_name(
decoded, certificate, claim_result['address']) decoded, certificate, claim_result['address'], claim_result['name'])
claim_result['channel_name'] = channel_name claim_result['channel_name'] = channel_name
if validated: if validated:
claim_result['signature_is_valid'] = True claim_result['signature_is_valid'] = True
@ -197,7 +197,7 @@ class Resolver:
if 'amount' in claim_result: if 'amount' in claim_result:
claim_result = format_amount_value(claim_result) claim_result = format_amount_value(claim_result)
claim_result['permanent_url'] = _get_permanent_url(claim_result) claim_result['permanent_url'] = _get_permanent_url(claim_result, decoded.certificate_id)
return claim_result return claim_result
@ -307,11 +307,11 @@ def format_amount_value(obj):
return obj return obj
def _get_permanent_url(claim_result): def _get_permanent_url(claim_result, certificate_id):
if claim_result.get('has_signature') and claim_result.get('channel_name'): if claim_result.get('has_signature') and claim_result.get('channel_name'):
return "{}#{}/{}".format( return "{}#{}/{}".format(
claim_result['channel_name'], claim_result['channel_name'],
claim_result['value']['publisherSignature']['certificateId'], certificate_id,
claim_result['name'] claim_result['name']
) )
else: else:
@ -386,7 +386,7 @@ def _verify_proof(name, claim_trie_root, result, height, depth, transaction_clas
def validate_claim_signature_and_get_channel_name(claim, certificate_claim, def validate_claim_signature_and_get_channel_name(claim, certificate_claim,
claim_address, decoded_certificate=None): claim_address, name, decoded_certificate=None):
if not certificate_claim: if not certificate_claim:
return False, None return False, None
if 'value' not in certificate_claim: if 'value' not in certificate_claim:
@ -395,18 +395,18 @@ def validate_claim_signature_and_get_channel_name(claim, certificate_claim,
certificate = decoded_certificate or smart_decode(certificate_claim['value']) certificate = decoded_certificate or smart_decode(certificate_claim['value'])
if not isinstance(certificate, ClaimDict): if not isinstance(certificate, ClaimDict):
raise TypeError("Certificate is not a ClaimDict: %s" % str(type(certificate))) raise TypeError("Certificate is not a ClaimDict: %s" % str(type(certificate)))
if _validate_signed_claim(claim, claim_address, certificate): if _validate_signed_claim(claim, claim_address, name, certificate):
return True, certificate_claim['name'] return True, certificate_claim['name']
return False, None return False, None
def _validate_signed_claim(claim, claim_address, certificate): def _validate_signed_claim(claim, claim_address, name, certificate):
if not claim.has_signature: if not claim.has_signature:
raise Exception("Claim is not signed") raise Exception("Claim is not signed")
if not is_address(claim_address): if not is_address(claim_address):
raise Exception("Not given a valid claim address") raise Exception("Not given a valid claim address")
try: try:
if claim.validate_signature(claim_address, certificate.protobuf): if claim.validate_signature(claim_address, certificate.protobuf, name):
return True return True
except BadSignatureError: except BadSignatureError:
# print_msg("Signature for %s is invalid" % claim_id) # print_msg("Signature for %s is invalid" % claim_id)

View file

@ -1,11 +1,11 @@
import hashlib import hashlib
import struct import struct
from binascii import unhexlify
import msgpack import msgpack
from torba.server.hash import hash_to_hex_str from torba.server.hash import hash_to_hex_str
from torba.server.block_processor import BlockProcessor from torba.server.block_processor import BlockProcessor
from lbrynet.schema.proto.claim_pb2 import Claim
from lbrynet.schema.uri import parse_lbry_uri from lbrynet.schema.uri import parse_lbry_uri
from lbrynet.schema.decode import smart_decode from lbrynet.schema.decode import smart_decode
@ -150,14 +150,14 @@ class LBRYBlockProcessor(BlockProcessor):
def _checksig(self, name, value, address): def _checksig(self, name, value, address):
try: try:
parse_lbry_uri(name.decode()) # skip invalid names parse_lbry_uri(name.decode()) # skip invalid names
cert_id = Claim.FromString(value).publisherSignature.certificateId[::-1] or None claim_dict = smart_decode(value)
cert_id = unhexlify(claim_dict.certificate_id)[::-1]
if not self.should_validate_signatures: if not self.should_validate_signatures:
return cert_id return cert_id
if cert_id: if cert_id:
cert_claim = self.db.get_claim_info(cert_id) cert_claim = self.db.get_claim_info(cert_id)
if cert_claim: if cert_claim:
certificate = smart_decode(cert_claim.value) certificate = smart_decode(cert_claim.value)
claim_dict = smart_decode(value)
claim_dict.validate_signature(address, certificate) claim_dict.validate_signature(address, certificate)
return cert_id return cert_id
except Exception as e: except Exception as e:

View file

@ -55,7 +55,7 @@ class ClaimDict(OrderedDict):
claim = self.protobuf claim = self.protobuf
if claim.HasField("publisherSignature"): if claim.HasField("publisherSignature"):
return True return True
return False return self.detached_signature and self.detached_signature.certificate_id
@property @property
def is_certificate(self): def is_certificate(self):

View file

@ -1,6 +1,7 @@
import logging import logging
import asyncio import asyncio
from lbrynet.schema.schema import SECP256k1
from torba.testcase import IntegrationTestCase from torba.testcase import IntegrationTestCase
from lbrynet.schema.claim import ClaimDict from lbrynet.schema.claim import ClaimDict
from lbrynet.extras.wallet.transaction import Transaction from lbrynet.extras.wallet.transaction import Transaction
@ -95,3 +96,31 @@ class BasicTransactionTest(IntegrationTestCase):
response = await self.ledger.resolve(0, 10, 'lbry://404', 'lbry://@404') response = await self.ledger.resolve(0, 10, 'lbry://404', 'lbry://@404')
self.assertEqual('URI lbry://404 cannot be resolved', response['lbry://404']['error']) self.assertEqual('URI lbry://404 cannot be resolved', response['lbry://404']['error'])
self.assertEqual('URI lbry://@404 cannot be resolved', response['lbry://@404']['error']) self.assertEqual('URI lbry://@404 cannot be resolved', response['lbry://@404']['error'])
async def test_new_signature_model(self):
address1, address2 = await self.account.receiving.get_addresses(limit=2, only_usable=True)
sendtxid1 = await self.blockchain.send_to_address(address1, 5)
sendtxid2 = await self.blockchain.send_to_address(address2, 5)
await self.blockchain.generate(1)
await asyncio.wait([
self.on_transaction_id(sendtxid1),
self.on_transaction_id(sendtxid2)
])
self.assertEqual(d2l(await self.account.get_balance()), '10.0')
cert, key = generate_certificate()
cert_tx = await Transaction.claim('@bar', cert, l2d('1.0'), address1, [self.account], self.account)
claim = ClaimDict.load_dict(example_claim_dict)
claim = claim.sign(key, address1, cert_tx.outputs[0].claim_id, name='foo', curve=SECP256k1)
claim_tx = await Transaction.claim('foo', claim, l2d('1.0'), address1, [self.account], self.account)
await self.broadcast(cert_tx)
await self.broadcast(claim_tx)
await self.ledger.wait(claim_tx)
await self.blockchain.generate(1)
await self.ledger.wait(claim_tx)
response = await self.ledger.resolve(0, 10, 'lbry://@bar/foo')
self.assertIn('lbry://@bar/foo', response)
self.assertIn('claim', response['lbry://@bar/foo'])