move exception handling into is_signed_by
This commit is contained in:
parent
6946860521
commit
9accbfcf8b
6 changed files with 31 additions and 27 deletions
|
@ -4,8 +4,6 @@ from binascii import hexlify
|
|||
from datetime import datetime
|
||||
from json import JSONEncoder
|
||||
|
||||
from ecdsa import BadSignatureError
|
||||
from cryptography.exceptions import InvalidSignature
|
||||
from google.protobuf.message import DecodeError
|
||||
|
||||
from lbrynet.schema.claim import Claim
|
||||
|
@ -192,15 +190,8 @@ class JSONResponseEncoder(JSONEncoder):
|
|||
}
|
||||
if check_signature and txo.claim.is_signed:
|
||||
output['is_channel_signature_valid'] = False
|
||||
try:
|
||||
if txo.channel:
|
||||
output['is_channel_signature_valid'] = txo.is_signed_by(txo.channel, self.ledger)
|
||||
except (BadSignatureError, InvalidSignature):
|
||||
pass
|
||||
except ValueError:
|
||||
log.exception(
|
||||
'txo.id: %s, txo.channel.id:%s, output: %s',
|
||||
txo.id, txo.channel.id, output
|
||||
)
|
||||
except DecodeError:
|
||||
pass
|
||||
return output
|
||||
|
|
|
@ -3,8 +3,12 @@ import logging
|
|||
from binascii import unhexlify
|
||||
from typing import Tuple, List, Dict
|
||||
|
||||
from ecdsa import BadSignatureError
|
||||
from cryptography.exceptions import InvalidSignature
|
||||
|
||||
from torba.client.baseledger import BaseLedger
|
||||
from lbrynet.schema.result import Outputs
|
||||
from lbrynet.schema.url import URL
|
||||
from lbrynet.wallet.dewies import dewies_to_lbc
|
||||
from lbrynet.wallet.resolve import Resolver
|
||||
from lbrynet.wallet.account import Account
|
||||
|
@ -61,7 +65,16 @@ class MainNetLedger(BaseLedger):
|
|||
async def resolve(self, urls):
|
||||
txos = (await self._inflate_outputs(self.network.resolve(urls)))[0]
|
||||
assert len(urls) == len(txos), "Mismatch between urls requested for resolve and responses received."
|
||||
return {url: txo for url, txo in zip(urls, txos)}
|
||||
result = {}
|
||||
for url, txo in zip(urls, txos):
|
||||
if txo and URL.parse(url).has_channel:
|
||||
if not txo.channel or not txo.is_signed_by(txo.channel, self):
|
||||
txo = None
|
||||
if txo:
|
||||
result[url] = txo
|
||||
else:
|
||||
result[url] = {'error': f'{url} did not resolve to a claim'}
|
||||
return result
|
||||
|
||||
async def claim_search(self, **kwargs) -> Tuple[List, int, int]:
|
||||
return await self._inflate_outputs(self.network.claim_search(**kwargs))
|
||||
|
|
|
@ -2,7 +2,6 @@ import logging
|
|||
|
||||
import asyncio
|
||||
|
||||
from cryptography.exceptions import InvalidSignature
|
||||
from binascii import unhexlify, hexlify
|
||||
from lbrynet.utils import lru_cache_concurrent
|
||||
from lbrynet.wallet.account import validate_claim_id
|
||||
|
@ -340,12 +339,9 @@ def validate_claim_signature_and_get_channel_name(claim_result, certificate_clai
|
|||
claim_tx=None, cert_tx=None):
|
||||
valid_signature = False
|
||||
if cert_tx and certificate_claim and claim_tx and claim_result:
|
||||
try:
|
||||
valid_signature = claim_tx.outputs[claim_result['nout']].is_signed_by(
|
||||
cert_tx.outputs[certificate_claim['nout']], ledger
|
||||
)
|
||||
except InvalidSignature:
|
||||
pass
|
||||
valid_signature = claim_tx.outputs[claim_result['nout']].is_signed_by(
|
||||
cert_tx.outputs[certificate_claim['nout']], ledger
|
||||
)
|
||||
if not valid_signature:
|
||||
log.warning("lbry://%s#%s has an invalid signature",
|
||||
claim_result['name'], claim_result['claim_id'])
|
||||
|
|
|
@ -9,7 +9,7 @@ from cryptography.hazmat.primitives.serialization import load_der_public_key
|
|||
from cryptography.hazmat.primitives import hashes
|
||||
from cryptography.hazmat.primitives.asymmetric import ec
|
||||
from cryptography.hazmat.primitives.asymmetric.utils import Prehashed
|
||||
from ecdsa.util import sigencode_der
|
||||
from cryptography.exceptions import InvalidSignature
|
||||
|
||||
from torba.client.basetransaction import BaseTransaction, BaseInput, BaseOutput, ReadOnlyList
|
||||
from torba.client.hash import hash160, sha256, Base58
|
||||
|
@ -116,9 +116,13 @@ class Output(BaseOutput):
|
|||
signature = hexlify(self.claim.signature)
|
||||
r = int(signature[:int(len(signature)/2)], 16)
|
||||
s = int(signature[int(len(signature)/2):], 16)
|
||||
encoded_sig = sigencode_der(r, s, len(signature)*4)
|
||||
public_key.verify(encoded_sig, digest, ec.ECDSA(Prehashed(hash)))
|
||||
return True
|
||||
encoded_sig = ecdsa.util.sigencode_der(r, s, len(signature)*4)
|
||||
try:
|
||||
public_key.verify(encoded_sig, digest, ec.ECDSA(Prehashed(hash)))
|
||||
return True
|
||||
except (ValueError, InvalidSignature):
|
||||
pass
|
||||
return False
|
||||
|
||||
def sign(self, channel: 'Output', first_input_id=None):
|
||||
self.channel = channel
|
||||
|
|
|
@ -830,7 +830,9 @@ class StreamCommands(CommandTestCase):
|
|||
response = await self.resolve('bad_example')
|
||||
self.assertFalse(response['bad_example']['is_channel_signature_valid'])
|
||||
response = await self.resolve('@olds/bad_example')
|
||||
self.assertFalse(response['@olds/bad_example']['is_channel_signature_valid'])
|
||||
self.assertEqual(response, {
|
||||
'@olds/bad_example': {'error': '@olds/bad_example did not resolve to a claim'}
|
||||
})
|
||||
|
||||
|
||||
def generate_signed_legacy(address: bytes, output: Output):
|
||||
|
|
|
@ -49,8 +49,7 @@ class TestSigningAndValidatingClaim(AsyncioTestCase):
|
|||
def test_fail_to_validate_on_wrong_channel(self):
|
||||
stream = self.get_stream()
|
||||
stream.sign(self.get_channel())
|
||||
with self.assertRaises(InvalidSignature):
|
||||
self.assertTrue(stream.is_signed_by(self.get_channel()))
|
||||
self.assertFalse(stream.is_signed_by(self.get_channel()))
|
||||
|
||||
def test_fail_to_validate_altered_claim(self):
|
||||
channel = self.get_channel()
|
||||
|
@ -58,8 +57,7 @@ class TestSigningAndValidatingClaim(AsyncioTestCase):
|
|||
stream.sign(channel)
|
||||
self.assertTrue(stream.is_signed_by(channel))
|
||||
stream.claim.stream.title = 'hello'
|
||||
with self.assertRaises(InvalidSignature):
|
||||
self.assertTrue(stream.is_signed_by(channel))
|
||||
self.assertFalse(stream.is_signed_by(channel))
|
||||
|
||||
def test_valid_private_key_for_cert(self):
|
||||
channel = self.get_channel()
|
||||
|
|
Loading…
Reference in a new issue