client side pretty much done

This commit is contained in:
Lex Berezhny 2019-03-20 01:46:23 -04:00
parent 22bacc907b
commit 1d68bef6f2
49 changed files with 482 additions and 1957 deletions

View file

@ -23,8 +23,8 @@ from lbrynet.stream.stream_manager import StreamManager
from lbrynet.extras.daemon.Component import Component
from lbrynet.extras.daemon.exchange_rate_manager import ExchangeRateManager
from lbrynet.extras.daemon.storage import SQLiteStorage
from lbrynet.extras.wallet import LbryWalletManager
from lbrynet.extras.wallet import Network
from lbrynet.wallet import LbryWalletManager
from lbrynet.wallet import Network
log = logging.getLogger(__name__)

View file

@ -33,21 +33,19 @@ from lbrynet.extras.daemon.ComponentManager import ComponentManager
from lbrynet.extras.daemon.json_response_encoder import JSONResponseEncoder
from lbrynet.extras.daemon.mime_types import guess_media_type
from lbrynet.extras.daemon.undecorated import undecorated
from lbrynet.extras.wallet.account import Account as LBCAccount
from lbrynet.extras.wallet.dewies import dewies_to_lbc, lbc_to_dewies
from lbrynet.schema.claim import ClaimDict
from lbrynet.schema.uri import parse_lbry_uri
from lbrynet.schema.error import URIParseError, DecodeError
from lbrynet.schema.validator import validate_claim_id
from lbrynet.schema.address import decode_address
from lbrynet.wallet.account import Account as LBCAccount, validate_claim_id
from lbrynet.wallet.dewies import dewies_to_lbc, lbc_to_dewies
from lbrynet.schema.claim import Claim
from lbrynet.schema.uri import parse_lbry_uri, URIParseError
if typing.TYPE_CHECKING:
from lbrynet.blob.blob_manager import BlobFileManager
from lbrynet.dht.node import Node
from lbrynet.extras.daemon.Components import UPnPComponent
from lbrynet.extras.wallet import LbryWalletManager
from lbrynet.extras.daemon.exchange_rate_manager import ExchangeRateManager
from lbrynet.extras.daemon.storage import SQLiteStorage
from lbrynet.wallet.manager import LbryWalletManager
from lbrynet.wallet.ledger import MainNetLedger
from lbrynet.stream.stream_manager import StreamManager
log = logging.getLogger(__name__)
@ -553,14 +551,14 @@ class Daemon(metaclass=JSONRPCServerType):
return None
@property
def default_account(self):
def default_account(self) -> Optional[LBCAccount]:
try:
return self.wallet_manager.default_account
except AttributeError:
return None
@property
def ledger(self):
def ledger(self) -> Optional['MainNetLedger']:
try:
return self.wallet_manager.default_account.ledger
except AttributeError:
@ -579,12 +577,12 @@ class Daemon(metaclass=JSONRPCServerType):
if claim_response and 'claim' in claim_response:
if 'value' in claim_response['claim'] and claim_response['claim']['value'] is not None:
claim_value = ClaimDict.load_dict(claim_response['claim']['value'])
if not claim_value.has_fee:
claim_value = Claim.from_bytes(claim_response['claim']['value'])
if not claim_value.stream.has_fee:
return 0.0
return round(
self.exchange_rate_manager.convert_currency(
claim_value.source_fee.currency, "LBC", claim_value.source_fee.amount
claim_value.stream.fee.currency, "LBC", claim_value.stream.fee.amount
), 5
)
else:
@ -855,7 +853,7 @@ class Daemon(metaclass=JSONRPCServerType):
if address:
# raises an error if the address is invalid
decode_address(address)
self.ledger.is_valid_address(address)
reserved_points = self.wallet_manager.reserve_points(address, amount)
if reserved_points is None:
@ -1245,7 +1243,7 @@ class Daemon(metaclass=JSONRPCServerType):
raise NegativeFundsError()
for address in addresses:
decode_address(address)
self.ledger.is_valid_address(address)
account = self.get_account_or_default(account_id)
result = await account.send_to_addresses(amount, addresses, broadcast)
@ -1890,7 +1888,7 @@ class Daemon(metaclass=JSONRPCServerType):
@requires(WALLET_COMPONENT, STREAM_MANAGER_COMPONENT, BLOB_COMPONENT, DATABASE_COMPONENT,
conditions=[WALLET_IS_UNLOCKED])
async def jsonrpc_publish(
self, name, bid, metadata=None, file_path=None, fee=None, title=None,
self, name, bid, file_path=None, fee=None, title=None,
description=None, author=None, language=None, license=None,
license_url=None, thumbnail=None, preview=None, nsfw=None,
channel_name=None, channel_id=None, channel_account_id=None, account_id=None,
@ -1988,7 +1986,7 @@ class Daemon(metaclass=JSONRPCServerType):
for address in [claim_address, change_address]:
if address is not None:
# raises an error if the address is invalid
decode_address(address)
self.ledger.is_valid_address(address)
account = self.get_account_or_default(account_id)
@ -2005,33 +2003,26 @@ class Daemon(metaclass=JSONRPCServerType):
f"you can specify for this claim is {dewies_to_lbc(available)}."
)
metadata = metadata or {}
if fee is not None:
metadata['fee'] = fee
claim = Claim()
stream = claim.stream
if title is not None:
metadata['title'] = title
stream.title = title
if description is not None:
metadata['description'] = description
stream.description = description
if author is not None:
metadata['author'] = author
stream.author = author
if language is not None:
metadata['language'] = language
stream.language = language
if license is not None:
metadata['license'] = license
stream.license = license
if license_url is not None:
metadata['licenseUrl'] = license_url
stream.license_url = license_url
if thumbnail is not None:
metadata['thumbnail'] = thumbnail
if preview is not None:
metadata['preview'] = preview
if nsfw is not None:
metadata['nsfw'] = bool(nsfw)
metadata['version'] = '_0_1_0'
stream.thumbnail_url = thumbnail
# check for original deprecated format {'currency':{'address','amount'}}
# add address, version to fee if unspecified
if 'fee' in metadata:
if fee is not None:
if len(metadata['fee'].keys()) == 1 and isinstance(metadata['fee'].values()[0], dict):
raise Exception('Old format for fee no longer supported. '
'Fee must be specified as {"currency":,"address":,"amount":}')
@ -2046,15 +2037,6 @@ class Daemon(metaclass=JSONRPCServerType):
if 'fee' in metadata and 'version' not in metadata['fee']:
metadata['fee']['version'] = '_0_0_1'
claim_dict = {
'version': '_0_0_1',
'claimType': 'streamType',
'stream': {
'metadata': metadata,
'version': '_0_0_1'
}
}
sd_to_delete = None
if file_path:
@ -2063,27 +2045,16 @@ class Daemon(metaclass=JSONRPCServerType):
if os.path.getsize(file_path) == 0:
raise Exception(f"Cannot publish empty file {file_path}")
if existing_claims:
sd_to_delete = existing_claims[-1].claim_dict['stream']['source']['source']
sd_to_delete = existing_claims[-1].claim.stream.hash
# since the file hasn't yet been made into a stream, we don't have
# a valid Source for the claim when validating the format, we'll use a fake one
claim_dict['stream']['source'] = {
'version': '_0_0_1',
'sourceType': 'lbry_sd_hash',
'source': '0' * 96,
'contentType': ''
}
stream.hash = '0' * 96
elif not existing_claims:
raise Exception("no previous stream to update")
else:
claim_dict['stream']['source'] = existing_claims[-1].claim_dict['stream']['source']
try:
claim_dict = ClaimDict.load_dict(claim_dict).claim_dict
# the metadata to use in the claim can be serialized by lbrynet.schema
except DecodeError as err:
# there was a problem with a metadata field, raise an error here rather than
# waiting to find out when we go to publish the claim (after having made the stream)
raise Exception(f"invalid publish metadata: {err}")
stream.hash = '0' * 96
stream.hash_bytes = existing_claims[-1].claim.stream.hash_bytes
certificate = None
if channel_id or channel_name:
certificate = await self.get_channel_or_error(
@ -2092,8 +2063,8 @@ class Daemon(metaclass=JSONRPCServerType):
if file_path:
stream = await self.stream_manager.create_stream(file_path)
claim_dict['stream']['source']['source'] = stream.sd_hash
claim_dict['stream']['source']['contentType'] = guess_media_type(file_path)
claim.stream.hash = stream.sd_hash
claim.stream.media_type = guess_media_type(file_path)
if sd_to_delete:
stream_hash_to_delete = await self.storage.get_stream_hash_for_sd_hash(sd_to_delete)
@ -2110,19 +2081,18 @@ class Daemon(metaclass=JSONRPCServerType):
else:
log.info("updating claim with stream %s from previous", claim_dict['stream']['source']['source'][:8])
sd_hash = claim_dict['stream']['source']['source']
sd_hash = claim.stream.hash
log.info("Publish: %s", {
'name': name,
'file_path': file_path,
'bid': dewies_to_lbc(amount),
'claim_address': claim_address,
'change_address': change_address,
'claim_dict': claim_dict,
'channel_id': channel_id,
'channel_name': channel_name
})
tx = await self.wallet_manager.claim_name(
account, name, amount, claim_dict, certificate, claim_address
account, name, amount, claim, certificate, claim_address
)
stream_hash = await self.storage.get_stream_hash_for_sd_hash(sd_hash)
if stream_hash:
@ -2274,7 +2244,7 @@ class Daemon(metaclass=JSONRPCServerType):
}
"""
decode_address(address)
self.ledger.is_valid_address(address)
return self.wallet_manager.send_claim_to_address(
claim_id, address, self.get_dewies_or_error("amount", amount) if amount else None
)

View file

@ -2,6 +2,7 @@ import asyncio
import time
import logging
import json
from decimal import Decimal
from lbrynet.error import InvalidExchangeRateResponse, CurrencyConversionError
from lbrynet.utils import aiohttp_request
@ -226,12 +227,12 @@ class ExchangeRateManager:
for market in self.market_feeds:
if (market.rate_is_initialized() and market.is_online() and
market.rate.currency_pair == (from_currency, to_currency)):
return amount * market.rate.spot
return amount * Decimal(market.rate.spot)
for market in self.market_feeds:
if (market.rate_is_initialized() and market.is_online() and
market.rate.currency_pair[0] == from_currency):
return self.convert_currency(
market.rate.currency_pair[1], to_currency, amount * market.rate.spot)
market.rate.currency_pair[1], to_currency, amount * Decimal(market.rate.spot))
raise CurrencyConversionError(
f'Unable to convert {amount} from {from_currency} to {to_currency}')

View file

@ -4,9 +4,9 @@ from binascii import hexlify
from datetime import datetime
from json import JSONEncoder
from ecdsa import BadSignatureError
from lbrynet.extras.wallet import MainNetLedger
from lbrynet.extras.wallet.transaction import Transaction, Output
from lbrynet.extras.wallet.dewies import dewies_to_lbc
from lbrynet.wallet.ledger import MainNetLedger
from lbrynet.wallet.transaction import Transaction, Output
from lbrynet.wallet.dewies import dewies_to_lbc
log = logging.getLogger(__name__)
@ -68,15 +68,13 @@ class JSONResponseEncoder(JSONEncoder):
if txo.script.is_claim_name or txo.script.is_update_claim:
claim = txo.claim
output['value'] = claim.claim_dict
if claim.has_signature:
output['value'] = claim.to_dict()
if claim.is_signed:
output['valid_signature'] = None
if txo.channel is not None:
output['channel_name'] = txo.channel.claim_name
try:
output['valid_signature'] = claim.validate_signature(
txo.get_address(self.ledger), txo.channel.claim, name=txo.claim_name
)
output['valid_signature'] = txo.is_signed_by(txo.channel, self.ledger)
except BadSignatureError:
output['valid_signature'] = False
except ValueError:

View file

@ -7,9 +7,8 @@ import binascii
import time
from torba.client.basedatabase import SQLiteMixin
from lbrynet.conf import Config
from lbrynet.extras.wallet.dewies import dewies_to_lbc, lbc_to_dewies
from lbrynet.schema.claim import ClaimDict
from lbrynet.schema.decode import smart_decode
from lbrynet.wallet.dewies import dewies_to_lbc, lbc_to_dewies
from lbrynet.schema.claim import Claim
from lbrynet.dht.constants import data_expiration
from lbrynet.blob.blob_info import BlobInfo
@ -39,7 +38,9 @@ class StoredStreamClaim:
self.claim_name = name
self.amount = amount
self.height = height
self.claim: typing.Optional[ClaimDict] = None if not serialized else smart_decode(serialized)
self.claim: typing.Optional[Claim] = None if not serialized else Claim.from_bytes(
binascii.unhexlify(serialized)
)
self.claim_address = address
self.claim_sequence = claim_sequence
self.channel_claim_id = channel_claim_id
@ -588,19 +589,12 @@ class SQLiteStorage(SQLiteMixin):
height = claim_info['height']
address = claim_info['address']
sequence = claim_info['claim_sequence']
certificate_id = claim_info['value'].signing_channel_id
try:
certificate_id = claim_info['value'].get('publisherSignature', {}).get('certificateId')
except AttributeError:
certificate_id = None
try:
if claim_info['value'].get('stream', {}).get('source', {}).get('sourceType') == "lbry_sd_hash":
source_hash = claim_info['value'].get('stream', {}).get('source', {}).get('source')
else:
source_hash = None
except AttributeError:
source_hash = claim_info['value'].stream.hash
except (AttributeError, ValueError):
source_hash = None
serialized = claim_info.get('hex') or binascii.hexlify(
smart_decode(claim_info['value']).serialized).decode()
serialized = binascii.hexlify(claim_info['value'].to_bytes())
transaction.execute(
"insert or replace into claim values (?, ?, ?, ?, ?, ?, ?, ?, ?)",
(outpoint, claim_id, name, amount, height, serialized, certificate_id, address, sequence)
@ -620,12 +614,12 @@ class SQLiteStorage(SQLiteMixin):
stream_hash = stream_hash[0]
known_outpoint = transaction.execute(
"select claim_outpoint from content_claim where stream_hash=?", (stream_hash,)
)
).fetchone()
known_claim_id = transaction.execute(
"select claim_id from claim "
"inner join content_claim c3 ON claim.claim_outpoint=c3.claim_outpoint "
"where c3.stream_hash=?", (stream_hash,)
)
).fetchone()
if not known_claim_id:
content_claims_to_update.append((stream_hash, outpoint))
elif known_outpoint != outpoint:
@ -664,7 +658,7 @@ class SQLiteStorage(SQLiteMixin):
).fetchone()
if not claim_info:
raise Exception("claim not found")
new_claim_id, claim = claim_info[0], ClaimDict.deserialize(binascii.unhexlify(claim_info[1]))
new_claim_id, claim = claim_info[0], Claim.from_bytes(binascii.unhexlify(claim_info[1]))
# certificate claims should not be in the content_claim table
if not claim.is_stream:
@ -677,7 +671,7 @@ class SQLiteStorage(SQLiteMixin):
if not known_sd_hash:
raise Exception("stream not found")
# check the claim contains the same sd hash
if known_sd_hash[0].encode() != claim.source_hash:
if known_sd_hash[0] != claim.stream.hash:
raise Exception("stream mismatch")
# if there is a current claim associated to the file, check that the new claim is an update to it

View file

@ -2,7 +2,6 @@ import platform
import os
import logging.handlers
from lbrynet.schema import __version__ as schema_version
from lbrynet import build_type, __version__ as lbrynet_version
log = logging.getLogger(__name__)
@ -16,7 +15,6 @@ def get_platform() -> dict:
"os_release": platform.release(),
"os_system": platform.system(),
"lbrynet_version": lbrynet_version,
"lbryschema_version": schema_version,
"build": build_type.BUILD, # CI server sets this during build step
}
if p["os_system"] == "Linux":

View file

@ -1,90 +1,64 @@
from lbrynet.schema.constants import ADDRESS_CHECKSUM_LENGTH
from lbrynet.schema.hashing import double_sha256
from lbrynet.schema.error import InvalidAddress
from google.protobuf.message import DecodeError
from google.protobuf.json_format import MessageToDict
alphabet = b'123456789ABCDEFGHJKLMNPQRSTUVWXYZabcdefghijkmnopqrstuvwxyz'
class Signable:
__slots__ = (
'message', 'version', 'signature',
'signature_type', 'unsigned_payload', 'signing_channel_id'
)
def scrub_input(v):
if isinstance(v, str) and not isinstance(v, bytes):
v = v.encode('ascii')
return v
message_class = None
def __init__(self, message=None):
self.message = message or self.message_class()
self.version = 2
self.signature = None
self.signature_type = 'SECP256k1'
self.unsigned_payload = None
self.signing_channel_id = None
def b58encode_int(i, default_one=True):
'''Encode an integer using Base58'''
if not i and default_one:
return alphabet[0:1]
string = b""
while i:
i, idx = divmod(i, 58)
string = alphabet[idx:idx+1] + string
return string
@property
def is_undetermined(self):
return self.message.WhichOneof('type') is None
@property
def is_signed(self):
return self.signature is not None
def b58encode(v):
'''Encode a string using Base58'''
def to_dict(self):
return MessageToDict(self.message)
v = scrub_input(v)
def to_message_bytes(self) -> bytes:
return self.message.SerializeToString()
nPad = len(v)
v = v.lstrip(b'\0')
nPad -= len(v)
def to_bytes(self) -> bytes:
pieces = bytearray()
if self.is_signed:
pieces.append(1)
pieces.extend(self.signing_channel_id)
pieces.extend(self.signature)
else:
pieces.append(0)
pieces.extend(self.to_message_bytes())
return bytes(pieces)
p, acc = 1, 0
for c in reversed(v):
acc += p * c
p = p << 8
@classmethod
def from_bytes(cls, data: bytes):
signable = cls()
if data[0] == 0:
signable.message.ParseFromString(data[1:])
elif data[0] == 1:
signable.signing_channel_id = data[1:21]
signable.signature = data[21:85]
signable.message.ParseFromString(data[85:])
else:
raise DecodeError('Could not determine message format version.')
return signable
result = b58encode_int(acc, default_one=False)
def __len__(self):
return len(self.to_bytes())
return alphabet[0:1] * nPad + result
def b58decode_int(v):
'''Decode a Base58 encoded string as an integer'''
v = scrub_input(v)
decimal = 0
for char in v:
decimal = decimal * 58 + alphabet.index(char)
return decimal
def b58decode(v):
'''Decode a Base58 encoded string'''
v = scrub_input(v)
origlen = len(v)
v = v.lstrip(alphabet[0:1])
newlen = len(v)
acc = b58decode_int(v)
result = []
while acc > 0:
acc, mod = divmod(acc, 256)
result.append(mod)
return b'\0' * (origlen - newlen) + bytes(reversed(result))
def validate_b58_checksum(addr_bytes):
addr_without_checksum = addr_bytes[:-ADDRESS_CHECKSUM_LENGTH]
addr_checksum = addr_bytes[-ADDRESS_CHECKSUM_LENGTH:]
if double_sha256(addr_without_checksum)[:ADDRESS_CHECKSUM_LENGTH] != addr_checksum:
raise InvalidAddress("Invalid address checksum")
def b58decode_strip_checksum(v):
addr_bytes = b58decode(v)
validate_b58_checksum(addr_bytes)
return addr_bytes[:-ADDRESS_CHECKSUM_LENGTH]
def b58encode_with_checksum(addr_bytes):
addr_checksum = double_sha256(addr_bytes)[:ADDRESS_CHECKSUM_LENGTH]
return b58encode(addr_bytes + addr_checksum)
def __bytes__(self):
return self.to_bytes()

View file

@ -1,239 +1,41 @@
import json
from collections import OrderedDict
from typing import List, Tuple
from decimal import Decimal
from binascii import hexlify, unhexlify
from google.protobuf import json_format # pylint: disable=no-name-in-module
from google.protobuf.message import DecodeError as DecodeError_pb # pylint: disable=no-name-in-module,import-error
from google.protobuf.message import DecodeError
from torba.client.hash import Base58
from torba.client.constants import COIN
from lbrynet.schema.signature import Signature
from lbrynet.schema.constants import CURVE_NAMES, SECP256k1
from lbrynet.schema.error import DecodeError
from lbrynet.schema.types.v2.claim_pb2 import Claim as ClaimMessage, Fee as FeeMessage
from lbrynet.schema.base import b58decode, b58encode
from lbrynet.schema import compat
from lbrynet.schema.base import Signable
class ClaimDict(OrderedDict):
def __init__(self, claim_dict=None, detached_signature: Signature=None):
if isinstance(claim_dict, legacy_claim_pb2.Claim):
raise Exception("To initialize %s with a Claim protobuf use %s.load_protobuf" %
(self.__class__.__name__, self.__class__.__name__))
self.detached_signature = detached_signature
OrderedDict.__init__(self, claim_dict or [])
class Claim(Signable):
@property
def protobuf_dict(self):
"""Claim dictionary using base64 to represent bytes"""
return json.loads(json_format.MessageToJson(self.protobuf, True))
@property
def protobuf(self):
"""Claim message object"""
return LegacyClaim.load(self)
@property
def serialized(self):
"""Serialized Claim protobuf"""
if self.detached_signature:
return self.detached_signature.serialized
return self.protobuf.SerializeToString()
@property
def serialized_no_signature(self):
"""Serialized Claim protobuf without publisherSignature field"""
claim = self.protobuf
claim.ClearField("publisherSignature")
return ClaimDict.load_protobuf(claim).serialized
@property
def has_signature(self):
return self.protobuf.HasField("publisherSignature") or (
self.detached_signature and self.detached_signature.raw_signature
)
@property
def is_certificate(self):
claim = self.protobuf
return CLAIM_TYPE_NAMES[claim.claimType] == "certificate"
@property
def is_stream(self):
claim = self.protobuf
return CLAIM_TYPE_NAMES[claim.claimType] == "stream"
@property
def source_hash(self):
claim = self.protobuf
if not CLAIM_TYPE_NAMES[claim.claimType] == "stream":
return None
return binascii.hexlify(claim.stream.source.source)
@property
def has_fee(self):
claim = self.protobuf
if not CLAIM_TYPE_NAMES[claim.claimType] == "stream":
return None
if claim.stream.metadata.HasField("fee"):
return True
return False
@property
def source_fee(self):
claim = self.protobuf
if not CLAIM_TYPE_NAMES[claim.claimType] == "stream":
return None
if claim.stream.metadata.HasField("fee"):
return Fee.load_protobuf(claim.stream.metadata.fee)
return None
@property
def certificate_id(self) -> str:
if self.protobuf.HasField("publisherSignature"):
return binascii.hexlify(self.protobuf.publisherSignature.certificateId).decode()
if self.detached_signature and self.detached_signature.certificate_id:
return binascii.hexlify(self.detached_signature.certificate_id).decode()
@property
def signature(self):
if not self.has_signature:
return None
return binascii.hexlify(self.protobuf.publisherSignature.signature)
@property
def protobuf_len(self):
"""Length of serialized string"""
return self.protobuf.ByteSize()
@property
def json_len(self):
"""Length of json encoded string"""
return len(json.dumps(self.claim_dict))
@property
def claim_dict(self):
"""Claim dictionary with bytes represented as hex and base58"""
return dict(encode_fields(self, self.detached_signature))
@classmethod
def load_protobuf_dict(cls, protobuf_dict, detached_signature=None):
"""
Load a ClaimDict from a dictionary with base64 encoded bytes
(as returned by the protobuf json formatter)
"""
return cls(decode_b64_fields(protobuf_dict), detached_signature=detached_signature)
@classmethod
def load_protobuf(cls, protobuf_claim, detached_signature=None):
"""Load ClaimDict from a protobuf Claim message"""
return cls.load_protobuf_dict(json.loads(json_format.MessageToJson(protobuf_claim, True)), detached_signature)
@classmethod
def load_dict(cls, claim_dict):
"""Load ClaimDict from a dictionary with hex and base58 encoded bytes"""
try:
claim_dict, detached_signature = decode_fields(claim_dict)
return cls.load_protobuf(cls(claim_dict).protobuf, detached_signature)
except json_format.ParseError as err:
raise DecodeError(str(err))
@classmethod
def deserialize(cls, serialized):
"""Load a ClaimDict from a serialized protobuf string"""
detached_signature = Signature.flagged_parse(serialized)
temp_claim = legacy_claim_pb2.Claim()
try:
temp_claim.ParseFromString(detached_signature.payload)
except DecodeError_pb:
raise DecodeError(DecodeError_pb)
return cls.load_protobuf(temp_claim, detached_signature=detached_signature)
@classmethod
def generate_certificate(cls, private_key, curve=SECP256k1):
signer = get_signer(curve).load_pem(private_key)
return cls.load_protobuf(signer.certificate)
def sign(self, private_key, claim_address, cert_claim_id, curve=SECP256k1, name=None, force_detached=False):
signer = get_signer(curve).load_pem(private_key)
signed, signature = signer.sign_stream_claim(self, claim_address, cert_claim_id, name, force_detached)
return ClaimDict.load_protobuf(signed, signature)
def validate_signature(self, claim_address, certificate, name=None):
if isinstance(certificate, ClaimDict):
certificate = certificate.protobuf
curve = CURVE_NAMES[certificate.certificate.keyType]
validator = get_validator(curve).load_from_certificate(certificate, self.certificate_id)
return validator.validate_claim_signature(self, claim_address, name)
def validate_private_key(self, private_key, certificate_id):
certificate = self.protobuf
if CLAIM_TYPE_NAMES[certificate.claimType] != "certificate":
return
curve = CURVE_NAMES[certificate.certificate.keyType]
validator = get_validator(curve).load_from_certificate(certificate, certificate_id)
signing_key = validator.signing_key_from_pem(private_key)
return validator.validate_private_key(signing_key)
def get_validator(self, certificate_id):
"""
Get a lbrynet.schema.validator.Validator object for a certificate claim
:param certificate_id: claim id of this certificate claim
:return: None or lbrynet.schema.validator.Validator object
"""
claim = self.protobuf
if CLAIM_TYPE_NAMES[claim.claimType] != "certificate":
return
curve = CURVE_NAMES[claim.certificate.keyType]
return get_validator(curve).load_from_certificate(claim, certificate_id)
class Claim:
__slots__ = '_claim', 'signature', 'certificate_id', 'signature_type', 'unsigned_payload'
__slots__ = 'version',
message_class = ClaimMessage
def __init__(self, claim_message=None):
self._claim = claim_message or ClaimMessage()
self.signature = None
self.signature_type = 'SECP256k1'
self.certificate_id = None
self.unsigned_payload = None
@property
def is_undetermined(self):
return self._claim.WhichOneof('type') is None
super().__init__(claim_message)
self.version = 2
@property
def is_stream(self):
return self._claim.WhichOneof('type') == 'stream'
return self.message.WhichOneof('type') == 'stream'
@property
def is_channel(self):
return self._claim.WhichOneof('type') == 'channel'
@property
def is_signed(self):
return self.signature is not None
return self.message.WhichOneof('type') == 'channel'
@property
def stream_message(self):
if self.is_undetermined:
self._claim.stream.SetInParent()
self.message.stream.SetInParent()
if not self.is_stream:
raise ValueError('Claim is not a stream.')
return self._claim.stream
return self.message.stream
@property
def stream(self) -> 'Stream':
@ -242,31 +44,30 @@ class Claim:
@property
def channel_message(self):
if self.is_undetermined:
self._claim.channel.SetInParent()
self.message.channel.SetInParent()
if not self.is_channel:
raise ValueError('Claim is not a channel.')
return self._claim.channel
return self.message.channel
@property
def channel(self) -> 'Channel':
return Channel(self)
def to_bytes(self) -> bytes:
return self._claim.SerializeToString()
@classmethod
def from_bytes(cls, data: bytes) -> 'Claim':
claim = ClaimMessage()
if data[0] == 0:
claim.ParseFromString(data[1:])
return cls(claim)
elif data[0] == 1:
claim.ParseFromString(data[85:])
return cls(claim).from_message(payload[1:21], payload[21:85])
elif data[0] == ord('{'):
return compat.from_old_json_schema(cls(claim), data)
else:
return compat.from_types_v1(cls(claim), data)
try:
return super().from_bytes(data)
except DecodeError:
claim = cls()
if data[0] == ord('{'):
claim.version = 0
compat.from_old_json_schema(claim, data)
elif data[0] not in (0, 1):
claim.version = 1
compat.from_types_v1(claim, data)
else:
raise
return claim
class Video:
@ -338,11 +139,11 @@ class Fee:
@property
def address(self) -> str:
return b58encode(self._fee.address).decode()
return Base58.encode(self._fee.address)
@address.setter
def address(self, address: str):
self._fee.address = b58decode(address)
self._fee.address = Base58.decode(address)
@property
def address_bytes(self) -> bytes:
@ -519,6 +320,10 @@ class Stream:
def fee(self) -> Fee:
return Fee(self._stream.fee)
@property
def has_fee(self) -> bool:
return self._stream.HasField('fee')
@property
def tags(self) -> List:
return self._stream.tags

View file

@ -20,7 +20,7 @@ def from_old_json_schema(claim, payload: bytes):
stream.language = value.get('language', '')
stream.hash = value['sources']['lbry_sd_hash']
if value.get('nsfw', False):
stream.tags.append('nsfw')
stream.tags.append('mature')
if "fee" in value:
fee = value["fee"]
currency = list(fee.keys())[0]
@ -49,7 +49,7 @@ def from_types_v1(claim, payload: bytes):
stream.media_type = old.stream.source.contentType
stream.hash_bytes = old.stream.source.source
if old.stream.metadata.nsfw:
stream.tags.append('nsfw')
stream.tags.append('mature')
if old.stream.metadata.HasField('fee'):
fee = old.stream.metadata.fee
stream.fee.address_bytes = fee.address
@ -64,7 +64,7 @@ def from_types_v1(claim, payload: bytes):
sig = old.publisherSignature
claim.signature = sig.signature
claim.signature_type = KeyType.Name(sig.signatureType)
claim.certificate_id = sig.certificateId
claim.signing_channel_id = sig.certificateId
old.ClearField("publisherSignature")
claim.unsigned_payload = old.SerializeToString()
elif old.claimType == 2:

View file

@ -1,58 +0,0 @@
LBC = "LBC"
BTC = "BTC"
USD = "USD"
CURRENCY_MAP = {
LBC: 1,
BTC: 2,
USD: 3
}
CURRENCY_NAMES = {
1: LBC,
2: BTC,
3: USD
}
ADDRESS_LENGTH = 25
ADDRESS_CHECKSUM_LENGTH = 4
NIST256p = "NIST256p"
NIST384p = "NIST384p"
SECP256k1 = "SECP256k1"
ECDSA_CURVES = {
NIST256p: 1,
NIST384p: 2,
SECP256k1: 3
}
CURVE_NAMES = {
1: NIST256p,
2: NIST384p,
3: SECP256k1
}
SHA256 = "sha256"
SHA384 = "sha384"
B58_CHARS = '123456789ABCDEFGHJKLMNPQRSTUVWXYZabcdefghijkmnopqrstuvwxyz'
assert len(B58_CHARS) == 58
PUBKEY_ADDRESS = 0
SCRIPT_ADDRESS = 5
ADDRESS_PREFIXES = {
"lbrycrd_main": {
PUBKEY_ADDRESS: 85,
SCRIPT_ADDRESS: 122
},
"lbrycrd_regtest": {
PUBKEY_ADDRESS: 111,
SCRIPT_ADDRESS: 196
},
"lbrycrd_testnet": {
PUBKEY_ADDRESS: 111,
SCRIPT_ADDRESS: 196
},
}

View file

@ -1,90 +0,0 @@
import base64, binascii
from copy import deepcopy
from lbrynet.schema.address import decode_address, encode_address
from lbrynet.schema.legacy_schema_v1 import CLAIM_TYPES, CLAIM_TYPE, STREAM_TYPE, CERTIFICATE_TYPE
from lbrynet.schema.legacy_schema_v1 import SIGNATURE
from lbrynet.schema.error import DecodeError, InvalidAddress
from lbrynet.schema.signature import Signature
def encode_fields(claim_dictionary, detached_signature: Signature):
"""Encode bytes to hex and b58 for return by ClaimDict"""
claim_dictionary = deepcopy(claim_dictionary)
claim_type = CLAIM_TYPES[claim_dictionary[CLAIM_TYPE]]
claim_value = claim_dictionary[claim_type]
if claim_type == CLAIM_TYPES[STREAM_TYPE]:
claim_value['source']['source'] = binascii.hexlify(claim_value['source']['source']).decode()
if 'fee' in claim_value['metadata']:
try:
address = encode_address(claim_value['metadata']['fee']['address'])
except InvalidAddress as err:
raise DecodeError("Invalid fee address: %s" % err)
claim_value['metadata']['fee']['address'] = address
elif claim_type == CLAIM_TYPES[CERTIFICATE_TYPE]:
public_key = claim_value["publicKey"]
claim_value["publicKey"] = binascii.hexlify(public_key).decode()
if SIGNATURE in claim_dictionary:
encoded_sig = binascii.hexlify(claim_dictionary[SIGNATURE]['signature']).decode()
encoded_cert_id = binascii.hexlify(claim_dictionary[SIGNATURE]['certificateId']).decode()
claim_dictionary[SIGNATURE]['signature'] = encoded_sig
claim_dictionary[SIGNATURE]['certificateId'] = encoded_cert_id
elif detached_signature and detached_signature.raw_signature:
claim_dictionary[SIGNATURE] = {
'detached_signature': binascii.hexlify(detached_signature.serialized).decode(),
'certificateId': binascii.hexlify(detached_signature.certificate_id).decode()
}
claim_dictionary[claim_type] = claim_value
return claim_dictionary
def decode_fields(claim_dictionary):
"""Decode hex and b58 encoded bytes in dictionaries given to ClaimDict"""
detached_signature = None
claim_dictionary = deepcopy(claim_dictionary)
claim_type = CLAIM_TYPES[claim_dictionary[CLAIM_TYPE]]
claim_value = claim_dictionary[claim_type]
if claim_type == CLAIM_TYPES[STREAM_TYPE]:
claim_value['source']['source'] = binascii.unhexlify(claim_value['source']['source'])
if 'fee' in claim_value['metadata']:
try:
address = decode_address(claim_value['metadata']['fee']['address'])
except InvalidAddress as err:
raise DecodeError("Invalid fee address: %s" % err)
claim_value['metadata']['fee']['address'] = address
elif claim_type == CLAIM_TYPES[CERTIFICATE_TYPE]:
public_key = binascii.unhexlify(claim_value["publicKey"])
claim_value["publicKey"] = public_key
if SIGNATURE in claim_dictionary and not claim_dictionary[SIGNATURE].get('detached_signature'):
decoded_sig = binascii.unhexlify(claim_dictionary[SIGNATURE]['signature'])
decoded_cert_id = binascii.unhexlify(claim_dictionary[SIGNATURE]['certificateId'])
claim_dictionary[SIGNATURE]['signature'] = decoded_sig
claim_dictionary[SIGNATURE]['certificateId'] = decoded_cert_id
elif claim_dictionary.get(SIGNATURE, {}).get('detached_signature'):
hex_detached_signature = claim_dictionary[SIGNATURE]['detached_signature']
detached_signature = Signature.flagged_parse(binascii.unhexlify(hex_detached_signature))
del claim_dictionary[SIGNATURE]
claim_dictionary[claim_type] = claim_value
return claim_dictionary, detached_signature
def decode_b64_fields(claim_dictionary):
"""Decode b64 encoded bytes in protobuf generated dictionary to be given to ClaimDict"""
claim_dictionary = deepcopy(claim_dictionary)
claim_type = CLAIM_TYPES[claim_dictionary[CLAIM_TYPE]]
claim_value = claim_dictionary[claim_type]
if claim_type == CLAIM_TYPES[STREAM_TYPE]:
claim_value['source']['source'] = base64.b64decode(claim_value['source']['source'])
if 'fee' in claim_value['metadata']:
address = base64.b64decode(claim_value['metadata']['fee']['address'])
claim_value['metadata']['fee']['address'] = address
elif claim_type == CLAIM_TYPES[CERTIFICATE_TYPE]:
public_key = base64.b64decode(claim_value["publicKey"])
claim_value["publicKey"] = public_key
if SIGNATURE in claim_dictionary:
encoded_sig = base64.b64decode(claim_dictionary[SIGNATURE]['signature'])
encoded_cert_id = base64.b64decode(claim_dictionary[SIGNATURE]['certificateId'])
claim_dictionary[SIGNATURE]['signature'] = encoded_sig
claim_dictionary[SIGNATURE]['certificateId'] = encoded_cert_id
claim_dictionary[claim_type] = claim_value
return claim_dictionary

View file

@ -1,22 +0,0 @@
class UnknownSourceType(Exception):
pass
class InvalidSourceHashLength(Exception):
pass
class DecodeError(Exception):
pass
class URIParseError(Exception):
pass
class CertificateError(Exception):
pass
class InvalidAddress(Exception):
pass

View file

@ -1,23 +0,0 @@
import hashlib
def sha256(x):
if isinstance(x, str):
x = x.encode('utf-8')
return hashlib.sha256(x).digest()
def double_sha256(x):
return sha256(sha256(x))
def ripemd160(x):
if isinstance(x, str):
x = x.encode('utf-8')
md = hashlib.new('ripemd160')
md.update(x)
return md.digest()
def hash160(x):
return ripemd160(sha256(x))

View file

@ -1,44 +0,0 @@
from collections import namedtuple
from typing import Union
LEGACY = namedtuple('Legacy', 'payload')
NAMED_SECP256K1 = namedtuple('NamedSECP256k1', 'raw_signature certificate_id payload')
FLAGS = {
LEGACY: 0x80,
NAMED_SECP256K1: 0x01
}
class Signature:
def __init__(self, data: Union[LEGACY, NAMED_SECP256K1]):
self.data = data
@property
def payload(self):
return self.data.payload
@property
def certificate_id(self):
if type(self.data) == NAMED_SECP256K1:
return self.data.certificate_id
@property
def raw_signature(self):
if type(self.data) == NAMED_SECP256K1:
return self.data.raw_signature
@classmethod
def flagged_parse(cls, binary: bytes):
flag = binary[0]
if flag == FLAGS[NAMED_SECP256K1]:
return cls(NAMED_SECP256K1(binary[1:65], binary[65:85], binary[85:]))
else:
return cls(LEGACY(binary))
@property
def serialized(self):
if isinstance(self.data, NAMED_SECP256K1):
return (bytes([FLAGS[type(self.data)]]) + self.data.raw_signature + self.data.certificate_id + self.payload)
elif isinstance(self.data, LEGACY):
return self.payload

View file

@ -1,121 +0,0 @@
import ecdsa
import hashlib
import binascii
from lbrynet.schema.address import decode_address
from lbrynet.schema.encoding import decode_b64_fields
from lbrynet.schema.signature import Signature, NAMED_SECP256K1
from lbrynet.schema.validator import validate_claim_id
from lbrynet.schema.legacy_schema_v1.certificate import Certificate
from lbrynet.schema.legacy_schema_v1.claim import Claim
from lbrynet.schema.legacy_schema_v1 import V_0_0_1, CLAIM_TYPE, CLAIM_TYPES, CERTIFICATE_TYPE, VERSION
from lbrynet.schema.constants import NIST256p, NIST384p, SECP256k1, SHA256, SHA384
class NIST_ECDSASigner(object):
CURVE = None
CURVE_NAME = None
HASHFUNC = hashlib.sha256
HASHFUNC_NAME = SHA256
def __init__(self, private_key):
self._private_key = private_key
@property
def private_key(self):
return self._private_key
@property
def public_key(self):
return self.private_key.get_verifying_key()
@property
def certificate(self):
certificate_claim = {
VERSION: V_0_0_1,
CLAIM_TYPE: CERTIFICATE_TYPE,
CLAIM_TYPES[CERTIFICATE_TYPE]: Certificate.load_from_key_obj(self.public_key,
self.CURVE_NAME)
}
return Claim.load(certificate_claim)
@classmethod
def load_pem(cls, pem_string):
return cls(ecdsa.SigningKey.from_pem(pem_string, hashfunc=cls.HASHFUNC_NAME))
@classmethod
def generate(cls):
return cls(ecdsa.SigningKey.generate(curve=cls.CURVE, hashfunc=cls.HASHFUNC_NAME))
def sign(self, *fields):
digest = self.HASHFUNC(bytearray(b''.join(fields))).digest()
return self.private_key.sign_digest_deterministic(digest, hashfunc=self.HASHFUNC)
def sign_stream_claim(self, claim, claim_address, cert_claim_id, name, detached=False):
validate_claim_id(cert_claim_id)
raw_cert_id = binascii.unhexlify(cert_claim_id)
decoded_addr = decode_address(claim_address)
if detached:
assert name, "Name is required for detached signatures"
assert self.CURVE_NAME == SECP256k1, f"Only SECP256k1 is supported, not: {self.CURVE_NAME}"
signature = self.sign(
name.lower().encode(),
decoded_addr,
claim.serialized_no_signature,
raw_cert_id,
)
else:
signature = self.sign(decoded_addr, claim.serialized_no_signature, raw_cert_id)
if detached:
return Claim.load(decode_b64_fields(claim.protobuf_dict)), Signature(NAMED_SECP256K1(
signature,
raw_cert_id,
claim.serialized_no_signature
))
# -- Legacy signer (signature inside protobuf) --
if not isinstance(self.private_key, ecdsa.SigningKey):
raise Exception("Not given a signing key")
sig_dict = {
"version": V_0_0_1,
"signatureType": self.CURVE_NAME,
"signature": signature,
"certificateId": raw_cert_id
}
msg = {
"version": V_0_0_1,
"stream": decode_b64_fields(claim.protobuf_dict)['stream'],
"publisherSignature": sig_dict
}
proto = Claim.load(msg)
return proto, Signature.flagged_parse(proto.SerializeToString())
class NIST256pSigner(NIST_ECDSASigner):
CURVE = ecdsa.NIST256p
CURVE_NAME = NIST256p
class NIST384pSigner(NIST_ECDSASigner):
CURVE = ecdsa.NIST384p
CURVE_NAME = NIST384p
HASHFUNC = hashlib.sha384
HASHFUNC_NAME = SHA384
class SECP256k1Signer(NIST_ECDSASigner):
CURVE = ecdsa.SECP256k1
CURVE_NAME = SECP256k1
def get_signer(curve):
if curve == NIST256p:
return NIST256pSigner
elif curve == NIST384p:
return NIST384pSigner
elif curve == SECP256k1:
return SECP256k1Signer
else:
raise Exception("Unknown curve: %s" % str(curve))

View file

@ -0,0 +1,6 @@
from lbrynet.schema.base import Signable
class Support(Signable):
__slots__ = ()
message_class = None # TODO: add support protobufs

View file

@ -1,5 +1,4 @@
import re
from lbrynet.schema.error import URIParseError
PROTOCOL = 'lbry://'
CHANNEL_CHAR = '@'
@ -13,6 +12,10 @@ CLAIM_ID_MAX_LENGTH = 40
CHANNEL_NAME_MIN_LENGTH = 1
class URIParseError(Exception):
pass
class URI(object):
__slots__ = ['name', 'claim_sequence', 'bid_position', 'claim_id', 'path']

View file

@ -1,142 +0,0 @@
from string import hexdigits
import ecdsa
import hashlib
import binascii
from cryptography.hazmat.primitives import serialization
from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives.serialization import load_der_public_key
from cryptography.hazmat.primitives import hashes
from cryptography.hazmat.primitives.asymmetric import ec
from cryptography.hazmat.primitives.asymmetric.utils import Prehashed
from cryptography.exceptions import InvalidSignature
from ecdsa.util import sigencode_der
from lbrynet.schema.address import decode_address
from lbrynet.schema.constants import NIST256p, NIST384p, SECP256k1, ECDSA_CURVES, CURVE_NAMES
def validate_claim_id(claim_id):
if not len(claim_id) == 40:
raise Exception("Incorrect claimid length: %i" % len(claim_id))
if isinstance(claim_id, bytes):
claim_id = claim_id.decode('utf-8')
if set(claim_id).difference(hexdigits):
raise Exception("Claim id is not hex encoded")
class Validator:
CURVE_NAME = None
HASHFUNC = hashlib.sha256
def __init__(self, public_key, certificate_claim_id):
validate_claim_id(certificate_claim_id)
if CURVE_NAMES.get(get_key_type_from_dem(public_key)) != self.CURVE_NAME:
raise Exception("Curve mismatch")
self._public_key = public_key
self._certificate_claim_id = certificate_claim_id
@property
def public_key(self):
return self._public_key
@property
def certificate_claim_id(self):
return self._certificate_claim_id
@classmethod
def signing_key_from_pem(cls, pem):
return ecdsa.SigningKey.from_pem(pem, hashfunc=cls.HASHFUNC)
@classmethod
def signing_key_from_der(cls, der):
return ecdsa.SigningKey.from_der(der, hashfunc=cls.HASHFUNC)
@classmethod
def load_from_certificate(cls, certificate_claim, certificate_claim_id):
certificate = certificate_claim.certificate
return cls(certificate.publicKey, certificate_claim_id)
def validate_signature(self, digest, signature):
public_key = load_der_public_key(self.public_key, default_backend())
if len(signature) == 64:
hash = hashes.SHA256()
elif len(signature) == 96:
hash = hashes.SHA384()
signature = binascii.hexlify(signature)
r = int(signature[:int(len(signature)/2)], 16)
s = int(signature[int(len(signature)/2):], 16)
encoded_sig = sigencode_der(r, s, len(signature)*4)
try:
public_key.verify(encoded_sig, digest, ec.ECDSA(Prehashed(hash)))
return True
except InvalidSignature:
# TODO Fixme. This is what is expected today on the outer calls. This should be implementation independent
# but requires changing everything calling that
from ecdsa import BadSignatureError
raise BadSignatureError
def validate_claim_signature(self, claim, claim_address, name):
to_sign = bytearray()
if claim.detached_signature and claim.detached_signature.raw_signature:
assert name is not None, "Name is required for verifying detached signatures."
to_sign.extend(name.lower().encode())
signature = claim.detached_signature.raw_signature
payload = claim.detached_signature.payload
else:
# extract and serialize the stream from the claim, then check the signature
signature = binascii.unhexlify(claim.signature)
payload = claim.serialized_no_signature
decoded_address = decode_address(claim_address)
if signature is None:
raise Exception("No signature to validate")
to_sign.extend(decoded_address)
to_sign.extend(payload)
to_sign.extend(binascii.unhexlify(self.certificate_claim_id))
return self.validate_signature(self.HASHFUNC(to_sign).digest(), signature)
def validate_private_key(self, private_key):
if not isinstance(private_key, ecdsa.SigningKey):
raise TypeError("Not given a signing key, given a %s" % str(type(private_key)))
return private_key.get_verifying_key().to_der() == self.public_key
class NIST256pValidator(Validator):
CURVE_NAME = NIST256p
HASHFUNC = hashlib.sha256
class NIST384pValidator(Validator):
CURVE_NAME = NIST384p
HASHFUNC = hashlib.sha384
class SECP256k1Validator(Validator):
CURVE_NAME = SECP256k1
HASHFUNC = hashlib.sha256
def get_validator(curve):
if curve == NIST256p:
return NIST256pValidator
elif curve == NIST384p:
return NIST384pValidator
elif curve == SECP256k1:
return SECP256k1Validator
else:
raise Exception("Unknown curve: %s" % str(curve))
def get_key_type_from_dem(pubkey_dem):
name = serialization.load_der_public_key(pubkey_dem, default_backend()).curve.name
if name == 'secp256k1':
return ECDSA_CURVES[SECP256k1]
elif name == 'secp256r1':
return ECDSA_CURVES[NIST256p]
elif name == 'secp384r1':
return ECDSA_CURVES[NIST384p]
raise Exception("unexpected curve: %s" % name)

View file

@ -226,6 +226,6 @@ class ManagedStream:
self.stream_claim_info = StoredStreamClaim(
self.stream_hash, f"{claim_info['txid']}:{claim_info['nout']}", claim_info['claim_id'],
claim_info['name'], claim_info['amount'], claim_info['height'],
binascii.hexlify(claim.serialized).decode(), claim.certificate_id, claim_info['address'],
binascii.hexlify(claim.to_bytes()).decode(), claim.signing_channel_id, claim_info['address'],
claim_info['claim_sequence'], claim_info.get('channel_name')
)

View file

@ -4,15 +4,15 @@ import typing
import binascii
import logging
import random
from decimal import Decimal
from lbrynet.error import ResolveError, InvalidStreamDescriptorError, KeyFeeAboveMaxAllowed, InsufficientFundsError, \
DownloadDataTimeout, DownloadSDTimeout
from lbrynet.utils import generate_id
from lbrynet.stream.descriptor import StreamDescriptor
from lbrynet.stream.downloader import StreamDownloader
from lbrynet.stream.managed_stream import ManagedStream
from lbrynet.schema.claim import ClaimDict
from lbrynet.schema.claim import Claim
from lbrynet.schema.uri import parse_lbry_uri
from lbrynet.schema.decode import smart_decode
from lbrynet.extras.daemon.storage import lbc_to_dewies
if typing.TYPE_CHECKING:
from lbrynet.conf import Config
@ -74,7 +74,7 @@ class StreamManager:
async def _update_content_claim(self, stream: ManagedStream):
claim_info = await self.storage.get_content_claim(stream.stream_hash)
stream.set_claim(claim_info, smart_decode(claim_info['value']))
stream.set_claim(claim_info, claim_info['value'])
async def start_stream(self, stream: ManagedStream) -> bool:
"""
@ -406,7 +406,7 @@ class StreamManager:
if 'error' in resolved:
raise ResolveError(f"error resolving stream: {resolved['error']}")
claim = ClaimDict.load_dict(resolved['value'])
claim = Claim.from_bytes(binascii.unhexlify(resolved['hex']))
outpoint = f"{resolved['txid']}:{resolved['nout']}"
resolved_time = self.loop.time() - start_time
@ -417,12 +417,12 @@ class StreamManager:
# check that the fee is payable
fee_amount, fee_address = None, None
if claim.has_fee:
if claim.stream.has_fee:
fee_amount = round(exchange_rate_manager.convert_currency(
claim.source_fee.currency, "LBC", claim.source_fee.amount
claim.stream.fee.currency, "LBC", claim.stream.fee.amount
), 5)
max_fee_amount = round(exchange_rate_manager.convert_currency(
self.config.max_key_fee['currency'], "LBC", self.config.max_key_fee['amount']
self.config.max_key_fee['currency'], "LBC", Decimal(self.config.max_key_fee['amount'])
), 5)
if fee_amount > max_fee_amount:
msg = f"fee of {fee_amount} exceeds max configured to allow of {max_fee_amount}"
@ -433,11 +433,11 @@ class StreamManager:
msg = f"fee of {fee_amount} exceeds max available balance"
log.warning(msg)
raise InsufficientFundsError(msg)
fee_address = claim.source_fee.address.decode()
fee_address = claim.stream.fee.address
# download the stream
download_id = binascii.hexlify(generate_id()).decode()
downloader = StreamDownloader(self.loop, self.config, self.blob_manager, claim.source_hash.decode(),
downloader = StreamDownloader(self.loop, self.config, self.blob_manager, claim.stream.hash,
self.config.download_dir, file_name)
stream = None
@ -484,7 +484,7 @@ class StreamManager:
None if not stream else len(stream.downloader.blob_downloader.scores),
False if not downloader else downloader.added_fixed_peers,
self.config.fixed_peer_delay if not downloader else downloader.fixed_peers_delay,
claim.source_hash.decode(), time_to_descriptor,
claim.stream.hash, time_to_descriptor,
None if not (stream and stream.descriptor) else stream.descriptor.blobs[0].blob_hash,
None if not (stream and stream.descriptor) else stream.descriptor.blobs[0].length,
time_to_first_bytes, None if not error else error.__class__.__name__

View file

@ -14,7 +14,7 @@ import pkg_resources
import contextlib
import certifi
import aiohttp
from lbrynet.schema.claim import ClaimDict
from lbrynet.schema.claim import Claim
from lbrynet.cryptoutils import get_lbry_hash_obj
@ -115,8 +115,8 @@ def short_hash(hash_str):
def get_sd_hash(stream_info):
if not stream_info:
return None
if isinstance(stream_info, ClaimDict):
return stream_info.source_hash
if isinstance(stream_info, Claim):
return stream_info.stream.hash
result = stream_info.get('claim', {}).\
get('value', {}).\
get('stream', {}).\

View file

@ -4,7 +4,8 @@ __node_bin__ = ''
__node_url__ = (
'https://github.com/lbryio/lbrycrd/releases/download/v0.12.4.0/lbrycrd-linux.zip'
)
__spvserver__ = 'lbrynet.extras.wallet.server.coin.LBCRegTest'
__spvserver__ = 'lbrynet.wallet.server.coin.LBCRegTest'
from lbrynet.wallet.manager import LbryWalletManager
from lbrynet.extras.wallet.network import Network
from lbrynet.wallet.network import Network
from lbrynet.wallet.ledger import MainNetLedger, RegTestLedger, TestNetLedger

View file

@ -7,9 +7,6 @@ from string import hexdigits
from torba.client.baseaccount import BaseAccount
from torba.client.basetransaction import TXORef
from lbrynet.schema.claim import ClaimDict
#from lbrynet.schema.signer import SECP256k1, get_signer
log = logging.getLogger(__name__)
@ -23,11 +20,6 @@ def validate_claim_id(claim_id):
raise Exception("Claim id is not hex encoded")
def generate_certificate():
secp256k1_private_key = get_signer(SECP256k1).generate().private_key.to_pem()
return ClaimDict.generate_certificate(secp256k1_private_key, curve=SECP256k1), secp256k1_private_key
class Account(BaseAccount):
def __init__(self, *args, **kwargs):

View file

@ -56,8 +56,8 @@ class WalletDatabase(BaseDatabase):
channel_ids = set()
for txo in txos:
if txo.script.is_claim_name or txo.script.is_update_claim:
if 'publisherSignature' in txo.claim_dict:
channel_ids.add(txo.claim_dict['publisherSignature']['certificateId'])
if txo.claim.is_signed:
channel_ids.add(txo.claim.signing_channel_id)
if txo.claim_name.startswith('@') and my_account is not None:
txo.private_key = my_account.get_certificate_private_key(txo.ref)

View file

@ -3,8 +3,7 @@ import logging
from binascii import unhexlify
from torba.client.baseledger import BaseLedger
from lbrynet.schema.error import URIParseError
from lbrynet.schema.uri import parse_lbry_uri
from lbrynet.schema.uri import parse_lbry_uri, URIParseError
from lbrynet.wallet.dewies import dewies_to_lbc
from lbrynet.wallet.resolve import Resolver
from lbrynet.wallet.account import Account, validate_claim_id

View file

@ -6,15 +6,13 @@ from binascii import unhexlify
from datetime import datetime
from typing import Optional
from lbrynet.schema.claim import Claim
from torba.client.basemanager import BaseWalletManager
from torba.rpc.jsonrpc import CodeMessageError
from lbrynet.schema.claim import ClaimDict
from lbrynet.schema.claim import Claim
from lbrynet.wallet.ledger import MainNetLedger
from lbrynet.wallet.account import BaseAccount, generate_certificate
from lbrynet.wallet.transaction import Transaction
from lbrynet.wallet.account import BaseAccount
from lbrynet.wallet.transaction import Transaction, Output, Input
from lbrynet.wallet.database import WalletDatabase
from lbrynet.wallet.dewies import dewies_to_lbc
@ -247,7 +245,7 @@ class LbryWalletManager(BaseWalletManager):
if not amount:
amount = claims[0].get_estimator(self.ledger).effective_amount
tx = await Transaction.update(
claims[0], ClaimDict.deserialize(claims[0].script.values['claim']), amount,
claims[0], claims[0].claim, amount,
destination_address.encode(), [account], account
)
await self.ledger.broadcast(tx)
@ -395,27 +393,36 @@ class LbryWalletManager(BaseWalletManager):
return account.get_utxos()
async def claim_name(self, account, name, amount, claim: Claim, certificate=None, claim_address=None):
if not claim_address:
claim_address = await account.receiving.get_or_create_usable_address()
if certificate:
claim = claim.sign(certificate.claim_id, certificate.private_key)
claim_address = claim_address or await account.receiving.get_or_create_usable_address()
existing_claims = await account.get_claims(
claim_name_type__any={'is_claim': 1, 'is_update': 1}, # exclude is_supports
claim_name=name
)
inputs = []
if len(existing_claims) == 0:
tx = await Transaction.claim(
name, claim, amount, claim_address, [account], account
claim_output = Output.pay_claim_name_pubkey_hash(
amount, name, claim, account.ledger.address_to_hash160(claim_address)
)
elif len(existing_claims) == 1:
tx = await Transaction.update(
existing_claims[0], claim, amount, claim_address, [account], account
previous_claim = existing_claims[0]
claim_output = Output.pay_update_claim_pubkey_hash(
amount, previous_claim.claim_name, previous_claim.claim_id,
claim, account.ledger.address_to_hash160(claim_address)
)
inputs = [Input.spend(previous_claim)]
else:
raise NameError(f"More than one other claim exists with the name '{name}'.")
if certificate:
claim.sign(certificate.claim_id, certificate.private_key, tx.inputs[0].txo_ref.id.encode())
claim_output.sign(certificate, first_input_id=b'placeholder')
tx = await Transaction.create(inputs, [claim_output], [account], account)
if certificate:
claim_output.sign(certificate)
tx._reset()
await account.ledger.broadcast(tx)
await self.old_db.save_claims([self._old_get_temp_claim_info(
tx, tx.outputs[0], claim_address, claim, name, dewies_to_lbc(amount)
@ -463,14 +470,19 @@ class LbryWalletManager(BaseWalletManager):
async def claim_new_channel(self, channel_name, amount, account):
address = await account.receiving.get_or_create_usable_address()
cert, key = generate_certificate()
tx = await Transaction.claim(channel_name, cert, amount, address, [account], account)
claim = Claim()
claim_output = Output.pay_claim_name_pubkey_hash(
amount, channel_name, claim, account.ledger.address_to_hash160(address)
)
key = claim_output.generate_channel_private_key()
tx = await Transaction.create([], [claim_output], [account], account)
await account.ledger.broadcast(tx)
account.add_certificate_private_key(tx.outputs[0].ref, key.decode())
# TODO: release reserved tx outputs in case anything fails by this point
await self.old_db.save_claims([self._old_get_temp_claim_info(
tx, tx.outputs[0], address, cert, channel_name, dewies_to_lbc(amount)
tx, tx.outputs[0], address, claim, channel_name, dewies_to_lbc(amount)
)])
return tx

View file

@ -5,7 +5,7 @@ from binascii import unhexlify, hexlify
from lbrynet.wallet.dewies import dewies_to_lbc
from lbrynet.error import UnknownNameError, UnknownClaimID, UnknownURI, UnknownOutpoint
from lbrynet.schema.claim import Claim
from lbrynet.schema.error import DecodeError
from google.protobuf.message import DecodeError
from lbrynet.schema.uri import parse_lbry_uri
from lbrynet.wallet.claim_proofs import verify_proof, InvalidProofError
log = logging.getLogger(__name__)

View file

@ -7,7 +7,7 @@ from torba.server.hash import hash_to_hex_str
from torba.server.block_processor import BlockProcessor
from lbrynet.schema.uri import parse_lbry_uri
from lbrynet.schema.decode import smart_decode
from lbrynet.schema.claim import Claim
from lbrynet.wallet.server.model import NameClaim, ClaimInfo, ClaimUpdate, ClaimSupport
@ -148,14 +148,14 @@ class LBRYBlockProcessor(BlockProcessor):
def _checksig(self, name, value, address):
try:
parse_lbry_uri(name.decode()) # skip invalid names
claim_dict = smart_decode(value)
cert_id = unhexlify(claim_dict.certificate_id)[::-1]
claim_dict = Claim.from_bytes(value)
cert_id = unhexlify(claim_dict.signing_channel_id)[::-1]
if not self.should_validate_signatures:
return cert_id
if cert_id:
cert_claim = self.db.get_claim_info(cert_id)
if cert_claim:
certificate = smart_decode(cert_claim.value)
certificate = Claim.from_bytes(cert_claim.value)
claim_dict.validate_signature(address, certificate)
return cert_id
except Exception as e:

View file

@ -6,7 +6,7 @@ from torba.server.hash import hash_to_hex_str, HASHX_LEN
from hashlib import sha256
from torba.server.coins import Coin, CoinError
from lbrynet.extras.wallet.server.opcodes import decode_claim_script, opcodes as lbry_opcodes
from lbrynet.wallet.server.opcodes import decode_claim_script, opcodes as lbry_opcodes
class LBC(Coin):

View file

@ -7,10 +7,9 @@ from torba.server.hash import hash_to_hex_str
from torba.server.session import ElectrumX
from torba.server import util
from lbrynet.schema.uri import parse_lbry_uri, CLAIM_ID_MAX_LENGTH
from lbrynet.schema.error import URIParseError
from lbrynet.extras.wallet.server.block_processor import LBRYBlockProcessor
from lbrynet.extras.wallet.server.db import LBRYDB
from lbrynet.schema.uri import parse_lbry_uri, CLAIM_ID_MAX_LENGTH, URIParseError
from lbrynet.wallet.server.block_processor import LBRYBlockProcessor
from lbrynet.wallet.server.db import LBRYDB
class LBRYElectrumX(ElectrumX):

View file

@ -1,5 +1,5 @@
from torba.server.tx import Deserializer
from lbrynet.extras.wallet.server.opcodes import decode_claim_script
from lbrynet.wallet.server.opcodes import decode_claim_script
from lbrynet.wallet.server.model import TxClaimOutput, LBRYTx

View file

@ -1,4 +1,5 @@
import struct
import hashlib
from binascii import hexlify, unhexlify
from typing import List, Iterable, Optional
@ -26,12 +27,11 @@ class Output(BaseOutput):
script: OutputScript
script_class = OutputScript
__slots__ = '_claim', 'channel', 'private_key'
__slots__ = 'channel', 'private_key'
def __init__(self, *args, channel: Optional['Output'] = None,
private_key: Optional[str] = None, **kwargs) -> None:
super().__init__(*args, **kwargs)
self._claim = None
self.channel = channel
self.private_key = private_key
@ -69,9 +69,9 @@ class Output(BaseOutput):
@property
def claim(self) -> Claim:
if self.is_claim:
if self._claim is None:
self._claim = Claim.from_bytes(self.script.values['claim'])
return self._claim
if not isinstance(self.script.values['claim'], Claim):
self.script.values['claim'] = Claim.from_bytes(self.script.values['claim'])
return self.script.values['claim']
raise ValueError('Only claim name and claim update have the claim payload.')
@property
@ -90,46 +90,55 @@ class Output(BaseOutput):
def has_private_key(self):
return self.private_key is not None
def is_signed_by(self, channel: 'Output', ledger):
def is_signed_by(self, channel: 'Output', ledger=None):
if self.claim.unsigned_payload:
digest = sha256(b''.join([
pieces = [
Base58.decode(self.get_address(ledger)),
self.claim.unsigned_payload,
self.claim.certificate_id
]))
public_key = load_der_public_key(channel.claim.channel.public_key_bytes, default_backend())
hash = hashes.SHA256()
signature = hexlify(self.claim.signature)
r = int(signature[:int(len(signature)/2)], 16)
s = int(signature[int(len(signature)/2):], 16)
encoded_sig = sigencode_der(r, s, len(signature)*4)
public_key.verify(encoded_sig, digest, ec.ECDSA(Prehashed(hash)))
return True
self.claim.signing_channel_id
]
else:
digest = sha256(b''.join([
self.certificate_id.encode(),
first_input_txid_nout.encode(),
self.to_bytes()
])).digest()
pieces = [
self.tx_ref.tx.inputs[0].txo_ref.id.encode(),
self.claim.signing_channel_id,
self.claim.to_message_bytes()
]
digest = sha256(b''.join(pieces))
public_key = load_der_public_key(channel.claim.channel.public_key_bytes, default_backend())
hash = hashes.SHA256()
signature = hexlify(self.claim.signature)
r = int(signature[:int(len(signature)/2)], 16)
s = int(signature[int(len(signature)/2):], 16)
encoded_sig = sigencode_der(r, s, len(signature)*4)
public_key.verify(encoded_sig, digest, ec.ECDSA(Prehashed(hash)))
return True
def sign(self, channel: 'Output'):
def sign(self, channel: 'Output', first_input_id=None):
self.claim.signing_channel_id = unhexlify(channel.claim_id)
digest = sha256(b''.join([
certificate_id.encode(),
first_input_txid_nout.encode(),
self.to_bytes()
])).digest()
private_key = ecdsa.SigningKey.from_pem(private_key_text, hashfunc="sha256")
self.signature = private_key.sign_digest_deterministic(digest, hashfunc="sha256")
self.certificate_id = certificate_id
self.script.values['claim'] = self._claim.to_bytes()
first_input_id or self.tx_ref.tx.inputs[0].txo_ref.id.encode(),
self.claim.signing_channel_id,
self.claim.to_message_bytes()
]))
private_key = ecdsa.SigningKey.from_pem(channel.private_key, hashfunc=hashlib.sha256)
self.claim.signature = private_key.sign_digest_deterministic(digest, hashfunc=hashlib.sha256)
def generate_channel_private_key(self):
private_key = ecdsa.SigningKey.generate(curve=ecdsa.SECP256k1, hashfunc=hashlib.sha256)
self.private_key = private_key.to_pem()
self.claim.channel.public_key_bytes = private_key.get_verifying_key().to_der()
return self.private_key
def is_channel_private_key(self, private_key_pem):
private_key = ecdsa.SigningKey.from_pem(private_key_pem, hashfunc=hashlib.sha256)
return self.claim.channel.public_key_bytes == private_key.get_verifying_key().to_der()
@classmethod
def pay_claim_name_pubkey_hash(
cls, amount: int, claim_name: str, claim: Claim, pubkey_hash: bytes) -> 'Output':
script = cls.script_class.pay_claim_name_pubkey_hash(
claim_name.encode(), claim.to_bytes(), pubkey_hash)
claim_name.encode(), claim, pubkey_hash)
txo = cls(amount, script)
txo._claim = claim
return txo
@classmethod
@ -138,7 +147,6 @@ class Output(BaseOutput):
script = cls.script_class.pay_update_claim_pubkey_hash(
claim_name.encode(), unhexlify(claim_id)[::-1], claim, pubkey_hash)
txo = cls(amount, script)
txo._claim = claim
return txo
@classmethod

View file

@ -1,8 +1,8 @@
import tempfile
from lbrynet.extras.wallet.transaction import Transaction
from lbrynet.wallet.transaction import Transaction
from lbrynet.error import InsufficientFundsError
from lbrynet.schema.claim import ClaimDict
from lbrynet.schema.claim import Claim
from integration.testcase import CommandTestCase
@ -40,8 +40,8 @@ class ClaimCommands(CommandTestCase):
self.assertEqual(txs[0]['update_info'][0]['balance_delta'], '1.5')
self.assertEqual(txs[0]['update_info'][0]['claim_id'], claim['claim_id'])
self.assertEqual(txs[0]['value'], '0.0')
self.assertEqual(txs[0]['fee'], '-0.0001985')
await self.assertBalance(self.account, '8.9796945')
self.assertEqual(txs[0]['fee'], '-0.000182')
await self.assertBalance(self.account, '8.979711')
await self.out(self.daemon.jsonrpc_claim_abandon(claim['claim_id']))
txs = await self.out(self.daemon.jsonrpc_transaction_list())
@ -50,7 +50,7 @@ class ClaimCommands(CommandTestCase):
self.assertEqual(txs[0]['abandon_info'][0]['claim_id'], claim['claim_id'])
self.assertEqual(txs[0]['value'], '0.0')
self.assertEqual(txs[0]['fee'], '-0.000107')
await self.assertBalance(self.account, '9.9795875')
await self.assertBalance(self.account, '9.979604')
async def test_update_claim_holding_address(self):
other_account_id = (await self.daemon.jsonrpc_account_create('second account'))['id']

View file

@ -4,17 +4,14 @@ import tempfile
import logging
from binascii import unhexlify
import lbrynet.extras.wallet
from lbrynet.schema.claim import ClaimDict
from torba.testcase import IntegrationTestCase
import lbrynet.schema
lbrynet.schema.BLOCKCHAIN_NAME = 'lbrycrd_regtest'
import lbrynet.wallet
from lbrynet.schema.claim import Claim
from lbrynet.conf import Config
from lbrynet.extras.daemon.Daemon import Daemon, jsonrpc_dumps_pretty
from lbrynet.extras.wallet import LbryWalletManager
from lbrynet.wallet import LbryWalletManager
from lbrynet.extras.daemon.Components import Component, WalletComponent
from lbrynet.extras.daemon.Components import (
DHT_COMPONENT, HASH_ANNOUNCER_COMPONENT, PEER_PROTOCOL_SERVER_COMPONENT,
@ -63,7 +60,7 @@ class ExchangeRateManagerComponent(Component):
class CommandTestCase(IntegrationTestCase):
timeout = 180
LEDGER = lbrynet.extras.wallet
LEDGER = lbrynet.wallet
MANAGER = LbryWalletManager
VERBOSITY = logging.WARN

View file

@ -3,16 +3,15 @@ from unittest import mock
import json
from lbrynet.conf import Config
from lbrynet.schema.decode import smart_decode
from lbrynet.extras.daemon.storage import SQLiteStorage
from lbrynet.extras.daemon.ComponentManager import ComponentManager
from lbrynet.extras.daemon.Components import DATABASE_COMPONENT, DHT_COMPONENT, WALLET_COMPONENT
from lbrynet.extras.daemon.Components import HASH_ANNOUNCER_COMPONENT
from lbrynet.extras.daemon.Components import UPNP_COMPONENT, BLOB_COMPONENT
from lbrynet.extras.daemon.Components import PEER_PROTOCOL_SERVER_COMPONENT, EXCHANGE_RATE_MANAGER_COMPONENT
from lbrynet.extras.daemon.Components import HEADERS_COMPONENT, STREAM_MANAGER_COMPONENT
from lbrynet.extras.daemon.Components import HEADERS_COMPONENT
from lbrynet.extras.daemon.Daemon import Daemon as LBRYDaemon
from lbrynet.extras.wallet import LbryWalletManager
from lbrynet.wallet import LbryWalletManager
from torba.client.wallet import Wallet
from tests import test_utils

View file

@ -1,5 +1,6 @@
import unittest
from lbrynet.schema.fee import Fee
from decimal import Decimal
from lbrynet.schema.claim import Claim
from lbrynet.extras.daemon import exchange_rate_manager
from lbrynet.error import InvalidExchangeRateResponse
from tests import test_utils
@ -43,28 +44,6 @@ def get_dummy_exchange_rate_manager(time):
return DummyExchangeRateManager([BTCLBCFeed(), USDBTCFeed()], rates)
class FeeFormatTest(unittest.TestCase):
def test_fee_created_with_correct_inputs(self):
fee_dict = {
'currency': 'USD',
'amount': 10.0,
'address': "bRcHraa8bYJZL7vkh5sNmGwPDERFUjGPP9"
}
fee = Fee(fee_dict)
self.assertEqual(10.0, fee['amount'])
self.assertEqual('USD', fee['currency'])
def test_fee_zero(self):
fee_dict = {
'currency': 'LBC',
'amount': 0.0,
'address': "bRcHraa8bYJZL7vkh5sNmGwPDERFUjGPP9"
}
fee = Fee(fee_dict)
self.assertEqual(0.0, fee['amount'])
self.assertEqual('LBC', fee['currency'])
class ExchangeRateTest(unittest.TestCase):
def setUp(self):
test_utils.reset_time(self)
@ -81,11 +60,9 @@ class FeeTest(unittest.TestCase):
test_utils.reset_time(self)
def test_fee_converts_to_lbc(self):
fee = Fee({
'currency': 'USD',
'amount': 10.0,
'address': "bRcHraa8bYJZL7vkh5sNmGwPDERFUjGPP9"
})
fee = Claim().stream.fee
fee.usd = Decimal(10.0)
fee.address = "bRcHraa8bYJZL7vkh5sNmGwPDERFUjGPP9"
rates = {
'BTCLBC': {'spot': 3.0, 'ts': test_utils.DEFAULT_ISO_TIME + 1},
@ -99,11 +76,9 @@ class FeeTest(unittest.TestCase):
def test_missing_feed(self):
# test when a feed is missing for conversion
fee = Fee({
'currency': 'USD',
'amount': 1.0,
'address': "bRcHraa8bYJZL7vkh5sNmGwPDERFUjGPP9"
})
fee = Claim().stream.fee
fee.usd = Decimal(1.0)
fee.address = "bRcHraa8bYJZL7vkh5sNmGwPDERFUjGPP9"
rates = {
'BTCLBC': {'spot': 1.0, 'ts': test_utils.DEFAULT_ISO_TIME + 1},

View file

@ -2,7 +2,6 @@ from unittest import TestCase
from binascii import unhexlify
from lbrynet.schema import Claim
from lbrynet.schema.base import b58decode
class TestOldJSONSchemaCompatibility(TestCase):
@ -144,8 +143,6 @@ class TestTypesV1Compatibility(TestCase):
'3f17'
)
self.assertTrue(stream.is_signed_by(channel, b58decode('bb4UAfujhmvTgyx7ufoEa4aevum6hKSW36')))
def test_unsigned_with_fee(self):
claim = Claim.from_bytes(unhexlify(
b'080110011ad6010801127c080410011a08727067206d69646922046d6964692a08727067206d696469322'

View file

@ -1,219 +0,0 @@
claim_id_1 = "63f2da17b0d90042c559cc73b6b17f853945c43e"
claim_address_2 = "bDtL6qriyimxz71DSYjojTBsm6cpM1bqmj"
claim_address_1 = "bUG7VaMzLEqqyZQAyg9srxQzvf1wwnJ48w"
nist256p_private_key = """-----BEGIN EC PRIVATE KEY-----
MHcCAQEEIBhixPFinjHmG94r00VBjmE73XZmlSHag5Bg3BFdCeQgoAoGCCqGSM49
AwEHoUQDQgAEtSfatRTR6ppwoDVJ94hbvhFDF42mACkWSc2Tao6zzYW4xaRPbI7j
IBUL+6prbDM+GXZ8X2mtmeaNIgjWTT7YFw==
-----END EC PRIVATE KEY-----
"""
nist384p_private_key = """-----BEGIN EC PRIVATE KEY-----
MIGkAgEBBDD5PPbcgT62WADeVBkDFsKCTCwQULHD7eE0iZz7c9Xk+6gZazMFgsGp
O0Rs9n+lmACgBwYFK4EEACKhZANiAASzpp0t4nIxoedhQN+J2pZ/EmwZl/x4dwdd
AjY4ZwKBdhfWIWgtcET9PBJlda0EvxR+CTwrt1em26VNS/57eH3yNFJQdCQiMSFY
mTtML6D/rctN1oztTSQdwHPA9x99FcU=
-----END EC PRIVATE KEY-----
"""
secp256k1_private_key = """-----BEGIN EC PRIVATE KEY-----
MHQCAQEEIPbjaEfCCCy5HHvGHkEw3X/dTJXlr4jcEJHV1OmcBDPmoAcGBSuBBAAK
oUQDQgAElLPrkVIapvtKrv0DkgQb9vAXtCQDBIu+iHlsQC5dx1ZnOWZwpYKQuM4i
LNbuTlfxCHWYwovwLjYnao8iwgp0og==
-----END EC PRIVATE KEY-----
"""
nist256p_cert = {
"version": "_0_0_1",
"claimType": "certificateType",
"certificate": {
"publicKey": "3059301306072a8648ce3d020106082a8648ce3d03010703420004b527dab514d1ea9a70a03549f7885bbe1143178da600291649cd936a8eb3cd85b8c5a44f6c8ee320150bfbaa6b6c333e19767c5f69ad99e68d2208d64d3ed817",
"keyType": "NIST256p",
"version": "_0_0_1"
}
}
nist384p_cert = {
"version": "_0_0_1",
"claimType": "certificateType",
"certificate": {
"publicKey": "3076301006072a8648ce3d020106052b8104002203620004b3a69d2de27231a1e76140df89da967f126c1997fc7877075d0236386702817617d621682d7044fd3c126575ad04bf147e093c2bb757a6dba54d4bfe7b787df2345250742422312158993b4c2fa0ffadcb4dd68ced4d241dc073c0f71f7d15c5",
"keyType": "NIST384p",
"version": "_0_0_1"
}
}
secp256k1_cert = {
"version": "_0_0_1",
"claimType": "certificateType",
"certificate": {
"publicKey": "3056301006072a8648ce3d020106052b8104000a0342000494b3eb91521aa6fb4aaefd0392041bf6f017b42403048bbe88796c402e5dc75667396670a58290b8ce222cd6ee4e57f1087598c28bf02e36276a8f22c20a74a2",
"keyType": "SECP256k1",
"version": "_0_0_1"
}
}
malformed_secp256k1_cert = {
"version": "_0_0_1",
"claimType": "certificateType",
"certificate": {
"publicKey": "3056301006072a8648ce3d020106052b8104000a0342000494b3eb91521aa6fb4aaefd0392041bf6f017b42403048bbe88796c402e5dc75667396670a58290b8ce222cd6ee4e57f1087598c28bf02e36276a8f22c20a74a2",
"keyType": "NIST256p",
"version": "_0_0_1"
}
}
example_003 = {
"language": "en",
"license": "LBRY Inc",
"nsfw": False,
"description": "What is LBRY? An introduction with Alex Tabarrok",
"content_type": "video/mp4",
"author": "Samuel Bryan",
"ver": "0.0.3",
"title": "What is LBRY?",
"sources": {
"lbry_sd_hash": "d5169241150022f996fa7cd6a9a1c421937276a3275eb912790bd07ba7aec1fac5fd45431d226b8fb402691e79aeb24b"
},
"thumbnail": "https://s3.amazonaws.com/files.lbry.io/logo.png"
}
example_010 = {
"version": "_0_0_1",
"claimType": "streamType",
"stream": {
"source": {
"source": "d5169241150022f996fa7cd6a9a1c421937276a3275eb912790bd07ba7aec1fac5fd45431d226b8fb402691e79aeb24b",
"version": "_0_0_1",
"contentType": "video/mp4",
"sourceType": "lbry_sd_hash"
},
"version": "_0_0_1",
"metadata": {
"license": "LBRY Inc",
"description": "What is LBRY? An introduction with Alex Tabarrok",
"language": "en",
"title": "What is LBRY?",
"author": "Samuel Bryan",
"version": "_0_1_0",
"nsfw": False,
"licenseUrl": "",
"preview": "",
"thumbnail": "https://s3.amazonaws.com/files.lbry.io/logo.png"
}
}
}
example_010_serialized = "080110011adc010801129401080410011a0d57686174206973204c4252593f223057686174206973204c4252593f20416e20696e74726f64756374696f6e207769746820416c6578205461626172726f6b2a0c53616d75656c20427279616e32084c42525920496e6338004a2f68747470733a2f2f73332e616d617a6f6e6177732e636f6d2f66696c65732e6c6272792e696f2f6c6f676f2e706e6752005a001a41080110011a30d5169241150022f996fa7cd6a9a1c421937276a3275eb912790bd07ba7aec1fac5fd45431d226b8fb402691e79aeb24b2209766964656f2f6d7034"
claim_010_signed_nist256p = {
"version": "_0_0_1",
"publisherSignature": {
"certificateId": "63f2da17b0d90042c559cc73b6b17f853945c43e",
"signatureType": "NIST256p",
"version": "_0_0_1",
"signature": "ec117f5e16a911f704aab8efa9178b1cdfcad0ba8e571ba86a56ecdade129fdff60ff7dcf00355bda788020a43a40fbd55aaaa080c3555fd8f0a87612b62936a"
},
"claimType": "streamType",
"stream": {
"source": {
"source": "d5169241150022f996fa7cd6a9a1c421937276a3275eb912790bd07ba7aec1fac5fd45431d226b8fb402691e79aeb24b",
"version": "_0_0_1",
"contentType": "video/mp4",
"sourceType": "lbry_sd_hash"
},
"version": "_0_0_1",
"metadata": {
"license": "LBRY Inc",
"description": "What is LBRY? An introduction with Alex Tabarrok",
"language": "en",
"title": "What is LBRY?",
"author": "Samuel Bryan",
"version": "_0_1_0",
"nsfw": False,
"licenseUrl": "",
"preview": "",
"thumbnail": "https://s3.amazonaws.com/files.lbry.io/logo.png"
}
}
}
claim_010_signed_nist384p = {
"version": "_0_0_1",
"publisherSignature": {
"certificateId": "63f2da17b0d90042c559cc73b6b17f853945c43e",
"signatureType": "NIST384p",
"version": "_0_0_1",
"signature": "18e56bb52872809ac598c366c5f0fa9ecbcadb01198b7150b0c4518049086b6b4f552f01d16eaf9cbbf061d8ee35520f8fe22f278a4d0aab5f9c8a4cadd38b6bd4bdbb3b4368e24c6e966ebc24684d24f3d19f5a3e4c7bf69273b0f94aa1c51b"
},
"claimType": "streamType",
"stream": {
"source": {
"source": "d5169241150022f996fa7cd6a9a1c421937276a3275eb912790bd07ba7aec1fac5fd45431d226b8fb402691e79aeb24b",
"version": "_0_0_1",
"contentType": "video/mp4",
"sourceType": "lbry_sd_hash"
},
"version": "_0_0_1",
"metadata": {
"license": "LBRY Inc",
"description": "What is LBRY? An introduction with Alex Tabarrok",
"language": "en",
"title": "What is LBRY?",
"author": "Samuel Bryan",
"version": "_0_1_0",
"nsfw": False,
"licenseUrl": "",
"preview": "",
"thumbnail": "https://s3.amazonaws.com/files.lbry.io/logo.png"
}
}
}
claim_010_signed_secp256k1 = {
"version": "_0_0_1",
"publisherSignature": {
"certificateId": "63f2da17b0d90042c559cc73b6b17f853945c43e",
"signatureType": "SECP256k1",
"version": "_0_0_1",
"signature": "798a37bd4310339e6a9b424ebc3fd2b3263280c13c0d08b1d1fa5e53d29c102b2d340cedecc5018988819db0ac6eb61bf67dbeec4ebee7231668fd13931e6320"
},
"claimType": "streamType",
"stream": {
"source": {
"source": "d5169241150022f996fa7cd6a9a1c421937276a3275eb912790bd07ba7aec1fac5fd45431d226b8fb402691e79aeb24b",
"version": "_0_0_1",
"contentType": "video/mp4",
"sourceType": "lbry_sd_hash"
},
"version": "_0_0_1",
"metadata": {
"license": "LBRY Inc",
"description": "What is LBRY? An introduction with Alex Tabarrok",
"language": "en",
"title": "What is LBRY?",
"author": "Samuel Bryan",
"version": "_0_1_0",
"nsfw": False,
"licenseUrl": "",
"preview": "",
"thumbnail": "https://s3.amazonaws.com/files.lbry.io/logo.png"
}
}
}
hex_encoded_003="7b22766572223a2022302e302e33222c20226465736372697074696f6e223a202274657374222c20226c6963656e7365223a2022437265617469766520436f6d6d6f6e73204174747269627574696f6e20342e3020496e7465726e6174696f6e616c222c2022617574686f72223a202274657374222c20227469746c65223a202274657374222c20226c616e6775616765223a2022656e222c2022736f7572636573223a207b226c6272795f73645f68617368223a2022323961643231386336316335393934393962323263313732323833373162356665396136653732356564633965663639316137383139623365373430363530303436373835323932303632396662636464626361636631336433313537396434227d2c2022636f6e74656e745f74797065223a2022696d6167652f706e67222c20226e736677223a2066616c73657d"
decoded_hex_encoded_003={u'version': u'_0_0_1', u'claimType': u'streamType', u'stream': {u'source': {u'source': '29ad218c61c599499b22c17228371b5fe9a6e725edc9ef691a7819b3e7406500467852920629fbcddbcacf13d31579d4', u'version': u'_0_0_1', u'contentType': u'image/png', u'sourceType': u'lbry_sd_hash'}, u'version': u'_0_0_1', u'metadata': {u'license': u'Creative Commons Attribution 4.0 International', u'description': u'test', u'language': u'en', u'title': u'test', u'author': u'test', u'version': u'_0_1_0', u'nsfw': False, u'licenseUrl': u'', u'preview': u'', u'thumbnail': u''}}}
binary_claim = b'\x08\x01\x10\x02"^\x08\x01\x10\x03"X0V0\x10\x06\x07*\x86H\xce=\x02\x01\x06\x05+\x81\x04\x00\n\x03B\x00\x04\x89U\x97\x1dk\xbc\xd4\xf7\xe2\xb5\xa9a7\xbc\xa4;\xda\x9a\x13\x84<\x05"\xa5\xc3\no;u\xb6\x8co\x10\x81\x8c\x1d\xf2\xe7\t\x9c.\xc8\x9b\x84\xabz:6\x15\xa5\xb3\x16\n\x03YT&M\x98\xec+\xef\x89;'
expected_binary_claim_decoded = {u'certificate': {u'keyType': u'SECP256k1',
u'publicKey': u'3056301006072a8648ce3d020106052b8104000a034200048955971d6bbcd4f7e2b5a96137bca43bda9a13843c0522a5c30a6f3b75b68c6f10818c1df2e7099c2ec89b84ab7a3a3615a5b3160a035954264d98ec2bef893b',
u'version': u'_0_0_1'},
u'claimType': u'certificateType',
u'version': u'_0_0_1'}

View file

@ -1,645 +0,0 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
import ecdsa
import binascii
from copy import deepcopy
import unittest
from lbrynet.schema.signature import Signature, NAMED_SECP256K1
from torba.client.constants import COIN
from .test_data import example_003, example_010, example_010_serialized
from .test_data import claim_id_1, claim_address_1, claim_address_2
from .test_data import binary_claim, expected_binary_claim_decoded
from .test_data import nist256p_private_key, claim_010_signed_nist256p, nist256p_cert
from .test_data import nist384p_private_key, claim_010_signed_nist384p, nist384p_cert
from .test_data import secp256k1_private_key, claim_010_signed_secp256k1, secp256k1_cert
from .test_data import hex_encoded_003, decoded_hex_encoded_003, malformed_secp256k1_cert
from lbrynet import schema
from lbrynet.schema.claim import ClaimDict
from lbrynet.schema.constants import NIST256p, NIST384p, SECP256k1
from lbrynet.schema.legacy.migrate import migrate
from lbrynet.schema.signer import get_signer
from lbrynet.schema.uri import URI, URIParseError
from lbrynet.schema.decode import smart_decode, migrate_legacy_protobuf
from lbrynet.schema.error import DecodeError, InvalidAddress
from lbrynet.schema.address import decode_address, encode_address
from lbrynet.schema.proto2 import legacy_claim_pb2
parsed_uri_matches = [
("test", URI("test"), False, False, "test", None),
("test#%s" % claim_id_1, URI("test", claim_id=claim_id_1), False, False, "test", None),
("test:1", URI("test", claim_sequence=1), False, False, "test", None),
("test$1", URI("test", bid_position=1), False, False, "test", None),
("lbry://test", URI("test"), False, False, "test", None),
("lbry://test#%s" % claim_id_1, URI("test", claim_id=claim_id_1), False, False, "test", None),
("lbry://test:1", URI("test", claim_sequence=1), False, False, "test", None),
("lbry://test$1", URI("test", bid_position=1), False, False, "test", None),
("@test", URI("@test"), True, True, None, "@test"),
("@test#%s" % claim_id_1, URI("@test", claim_id=claim_id_1), True, True, None, "@test"),
("@test:1", URI("@test", claim_sequence=1), True, True, None, "@test"),
("@test$1", URI("@test", bid_position=1), True, True, None, "@test"),
("lbry://@test1:1/fakepath", URI("@test1", claim_sequence=1, path="fakepath"), True, False, "fakepath", "@test1"),
("lbry://@test1$1/fakepath", URI("@test1", bid_position=1, path="fakepath"), True, False, "fakepath", "@test1"),
("lbry://@test1#abcdef/fakepath", URI("@test1", claim_id="abcdef", path="fakepath"), True, False, "fakepath",
"@test1"),
("@z", URI("@z"), True, True, None, "@z"),
("@yx", URI("@yx"), True, True, None, "@yx"),
("@abc", URI("@abc"), True, True, None, "@abc")
]
parsed_uri_raises = [
("lbry://", URIParseError),
("lbry://test:3$1", URIParseError),
("lbry://test$1:1", URIParseError),
("lbry://test#x", URIParseError),
("lbry://test#x/page", URIParseError),
("lbry://test$", URIParseError),
("lbry://test#", URIParseError),
("lbry://test:", URIParseError),
("lbry://test$x", URIParseError),
("lbry://test:x", URIParseError),
("lbry://@test@", URIParseError),
("lbry://@test:", URIParseError),
("lbry://test@", URIParseError),
("lbry://tes@t", URIParseError),
("lbry://test:1#%s" % claim_id_1, URIParseError),
("lbry://test:0", URIParseError),
("lbry://test$0", URIParseError),
("lbry://test/path", URIParseError),
("lbry://@test1#abcdef/fakepath:1", URIParseError),
("lbry://@test1:1/fakepath:1", URIParseError),
("lbry://@test1:1ab/fakepath", URIParseError),
("lbry://test:1:1:1", URIParseError),
("whatever/lbry://test", URIParseError),
("lbry://lbry://test", URIParseError),
("lbry://@/what", URIParseError),
("lbry://abc:0x123", URIParseError),
("lbry://abc:0x123/page", URIParseError),
("lbry://@test1#ABCDEF/fakepath", URIParseError),
("test:0001", URIParseError),
("lbry://@test1$1/fakepath?arg1&arg2&arg3", URIParseError)
]
class UnitTest(unittest.TestCase):
maxDiff = 4000
class TestURIParser(UnitTest):
def setUp(self):
self.longMessage = True
def test_uri_parse(self):
for test_string, expected_uri_obj, contains_channel, is_channel, claim_name, channel_name in parsed_uri_matches:
try:
# string -> URI
self.assertEqual(URI.from_uri_string(test_string), expected_uri_obj, test_string)
# URI -> dict -> URI
self.assertEqual(URI.from_dict(expected_uri_obj.to_dict()), expected_uri_obj,
test_string)
# contains_channel
self.assertEqual(URI.from_uri_string(test_string).contains_channel, contains_channel,
test_string)
# is_channel
self.assertEqual(URI.from_uri_string(test_string).is_channel, is_channel,
test_string)
# claim_name
self.assertEqual(URI.from_uri_string(test_string).claim_name, claim_name,
test_string)
# channel_name
self.assertEqual(URI.from_uri_string(test_string).channel_name, channel_name,
test_string)
# convert-to-string test only works if protocol is present in test_string
if test_string.startswith('lbry://'):
# string -> URI -> string
self.assertEqual(URI.from_uri_string(test_string).to_uri_string(), test_string,
test_string)
# string -> URI -> dict -> URI -> string
uri_dict = URI.from_uri_string(test_string).to_dict()
self.assertEqual(URI.from_dict(uri_dict).to_uri_string(), test_string,
test_string)
# URI -> dict -> URI -> string
self.assertEqual(URI.from_dict(expected_uri_obj.to_dict()).to_uri_string(),
test_string, test_string)
except URIParseError as err:
print("ERROR: " + test_string)
raise
def test_uri_errors(self):
for test_str, err in parsed_uri_raises:
try:
URI.from_uri_string(test_str)
except URIParseError:
pass
else:
print("\nSuccessfully parsed invalid url: " + test_str)
self.assertRaises(err, URI.from_uri_string, test_str)
class TestEncoderAndDecoder(UnitTest):
def test_encode_decode(self):
test_claim = ClaimDict.load_dict(example_010)
self.assertEqual(test_claim.is_certificate, False)
self.assertDictEqual(test_claim.claim_dict, example_010)
test_pb = test_claim.protobuf
self.assertDictEqual(ClaimDict.load_protobuf(test_pb).claim_dict, example_010)
self.assertEqual(test_pb.ByteSize(), ClaimDict.load_protobuf(test_pb).protobuf_len)
self.assertEqual(test_claim.json_len, ClaimDict.load_protobuf(test_pb).json_len)
def test_deserialize(self):
deserialized_claim = ClaimDict.deserialize(binascii.unhexlify(example_010_serialized))
self.assertDictEqual(ClaimDict.load_dict(example_010).claim_dict,
deserialized_claim.claim_dict)
def test_stream_is_not_certificate(self):
deserialized_claim = ClaimDict.deserialize(binascii.unhexlify(example_010_serialized))
self.assertEqual(deserialized_claim.is_certificate, False)
class TestISO639(UnitTest):
def test_alpha2(self):
prefixes = ['en', 'aa', 'ab', 'ae', 'af', 'ak', 'am', 'an', 'ar', 'as', 'av', 'ay', 'az',
'ba', 'be', 'bg', 'bh', 'bi', 'bm', 'bn', 'bo', 'br', 'bs', 'ca', 'ce', 'ch',
'co', 'cr', 'cs', 'cu', 'cv', 'cy', 'da', 'de', 'dv', 'dz', 'ee', 'el', 'eo',
'es', 'et', 'eu', 'fa', 'ff', 'fi', 'fj', 'fo', 'fr', 'fy', 'ga', 'gd', 'gl',
'gn', 'gu', 'gv', 'ha', 'he', 'hi', 'ho', 'hr', 'ht', 'hu', 'hy', 'hz', 'ia',
'id', 'ie', 'ig', 'ii', 'ik', 'io', 'is', 'it', 'iu', 'ja', 'jv', 'ka', 'kg',
'ki', 'kj', 'kk', 'kl', 'km', 'kn', 'ko', 'kr', 'ks', 'ku', 'kv', 'kw', 'ky',
'la', 'lb', 'lg', 'li', 'ln', 'lo', 'lt', 'lu', 'lv', 'mg', 'mh', 'mi', 'mk',
'ml', 'mn', 'mr', 'ms', 'mt', 'my', 'na', 'nb', 'nd', 'ne', 'ng', 'nl', 'nn',
'no', 'nr', 'nv', 'ny', 'oc', 'oj', 'om', 'or', 'os', 'pa', 'pi', 'pl', 'ps',
'pt', 'qu', 'rm', 'rn', 'ro', 'ru', 'rw', 'sa', 'sc', 'sd', 'se', 'sg', 'si',
'sk', 'sl', 'sm', 'sn', 'so', 'sq', 'sr', 'ss', 'st', 'su', 'sv', 'sw', 'ta',
'te', 'tg', 'th', 'ti', 'tk', 'tl', 'tn', 'to', 'tr', 'ts', 'tt', 'tw', 'ty',
'ug', 'uk', 'ur', 'uz', 've', 'vi', 'vo', 'wa', 'wo', 'xh', 'yi', 'yo', 'za',
'zh', 'zu']
for prefix in prefixes:
metadata = deepcopy(example_010)
metadata['stream']['metadata']['language'] = prefix
claim = ClaimDict.load_dict(metadata)
serialized = claim.serialized
self.assertDictEqual(metadata, dict(ClaimDict.deserialize(serialized).claim_dict))
def test_fake_alpha2(self):
fake_codes = ["bb", "zz"]
for fake_code in fake_codes:
metadata = deepcopy(example_010)
metadata['stream']['metadata']['language'] = fake_code
self.assertRaises(DecodeError, ClaimDict.load_dict, metadata)
class TestMigration(UnitTest):
def test_migrate_to_010(self):
migrated_0_1_0 = migrate(example_003)
self.assertDictEqual(migrated_0_1_0.claim_dict, example_010)
self.assertEqual(migrated_0_1_0.is_certificate, False)
class TestNIST256pSignatures(UnitTest):
def test_make_ecdsa_cert(self):
cert = ClaimDict.generate_certificate(nist256p_private_key, curve=NIST256p)
self.assertEqual(cert.is_certificate, True)
self.assertDictEqual(cert.claim_dict, nist256p_cert)
def test_validate_ecdsa_signature(self):
cert = ClaimDict.generate_certificate(nist256p_private_key, curve=NIST256p)
signed = ClaimDict.load_dict(example_010).sign(nist256p_private_key,
claim_address_2, claim_id_1, curve=NIST256p)
self.assertDictEqual(signed.claim_dict, claim_010_signed_nist256p)
signed_copy = ClaimDict.load_protobuf(signed.protobuf)
self.assertEqual(signed_copy.validate_signature(claim_address_2, cert), True)
def test_remove_signature_equals_unsigned(self):
unsigned = ClaimDict.load_dict(example_010)
signed = unsigned.sign(nist256p_private_key, claim_address_1, claim_id_1, curve=NIST256p)
self.assertEqual(unsigned.serialized, signed.serialized_no_signature)
def test_fail_to_validate_fake_ecdsa_signature(self):
signed = ClaimDict.load_dict(example_010).sign(nist256p_private_key, claim_address_1,
claim_id_1, curve=NIST256p)
signed_copy = ClaimDict.load_protobuf(signed.protobuf)
fake_key = get_signer(NIST256p).generate().private_key.to_pem()
fake_cert = ClaimDict.generate_certificate(fake_key, curve=NIST256p)
self.assertRaises(ecdsa.keys.BadSignatureError, signed_copy.validate_signature,
claim_address_2, fake_cert)
def test_fail_to_validate_ecdsa_sig_for_altered_claim(self):
cert = ClaimDict.generate_certificate(nist256p_private_key, curve=NIST256p)
altered = ClaimDict.load_dict(example_010).sign(nist256p_private_key, claim_address_1,
claim_id_1, curve=NIST256p)
sd_hash = altered['stream']['source']['source']
altered['stream']['source']['source'] = sd_hash[::-1]
altered_copy = ClaimDict.load_dict(altered.claim_dict)
self.assertRaises(ecdsa.keys.BadSignatureError, altered_copy.validate_signature,
claim_address_1, cert)
class TestNIST384pSignatures(UnitTest):
def test_make_ecdsa_cert(self):
cert = ClaimDict.generate_certificate(nist384p_private_key, curve=NIST384p)
self.assertEqual(cert.is_certificate, True)
self.assertDictEqual(cert.claim_dict, nist384p_cert)
def test_validate_ecdsa_signature(self):
cert = ClaimDict.generate_certificate(nist384p_private_key, curve=NIST384p)
signed = ClaimDict.load_dict(example_010).sign(nist384p_private_key,
claim_address_2, claim_id_1, curve=NIST384p)
self.assertDictEqual(signed.claim_dict, claim_010_signed_nist384p)
signed_copy = ClaimDict.load_protobuf(signed.protobuf)
self.assertEqual(signed_copy.validate_signature(claim_address_2, cert), True)
def test_remove_signature_equals_unsigned(self):
unsigned = ClaimDict.load_dict(example_010)
signed = unsigned.sign(nist384p_private_key, claim_address_1, claim_id_1, curve=NIST384p)
self.assertEqual(unsigned.serialized, signed.serialized_no_signature)
def test_fail_to_validate_fake_ecdsa_signature(self):
signed = ClaimDict.load_dict(example_010).sign(nist384p_private_key, claim_address_1,
claim_id_1, curve=NIST384p)
signed_copy = ClaimDict.load_protobuf(signed.protobuf)
fake_key = get_signer(NIST384p).generate().private_key.to_pem()
fake_cert = ClaimDict.generate_certificate(fake_key, curve=NIST384p)
self.assertRaises(ecdsa.keys.BadSignatureError, signed_copy.validate_signature,
claim_address_2, fake_cert)
def test_fail_to_validate_ecdsa_sig_for_altered_claim(self):
cert = ClaimDict.generate_certificate(nist384p_private_key, curve=NIST384p)
altered = ClaimDict.load_dict(example_010).sign(nist384p_private_key, claim_address_1,
claim_id_1, curve=NIST384p)
sd_hash = altered['stream']['source']['source']
altered['stream']['source']['source'] = sd_hash[::-1]
altered_copy = ClaimDict.load_dict(altered.claim_dict)
self.assertRaises(ecdsa.keys.BadSignatureError, altered_copy.validate_signature,
claim_address_1, cert)
class TestSECP256k1Signatures(UnitTest):
def test_make_ecdsa_cert(self):
cert = ClaimDict.generate_certificate(secp256k1_private_key, curve=SECP256k1)
self.assertEqual(cert.is_certificate, True)
self.assertDictEqual(cert.claim_dict, secp256k1_cert)
def test_validate_ecdsa_signature(self):
cert = ClaimDict.generate_certificate(secp256k1_private_key, curve=SECP256k1)
self.assertDictEqual(cert.claim_dict, secp256k1_cert)
signed = ClaimDict.load_dict(example_010).sign(secp256k1_private_key, claim_address_2,
claim_id_1, curve=SECP256k1)
self.assertDictEqual(signed.claim_dict, claim_010_signed_secp256k1)
signed_copy = ClaimDict.load_protobuf(signed.protobuf)
self.assertEqual(signed_copy.validate_signature(claim_address_2, cert), True)
def test_fail_to_sign_with_no_claim_address(self):
cert = ClaimDict.generate_certificate(secp256k1_private_key, curve=SECP256k1)
self.assertDictEqual(cert.claim_dict, secp256k1_cert)
self.assertRaises(Exception, ClaimDict.load_dict(example_010).sign, secp256k1_private_key,
None, claim_id_1, curve=SECP256k1)
def test_fail_to_validate_with_no_claim_address(self):
cert = ClaimDict.generate_certificate(secp256k1_private_key, curve=SECP256k1)
self.assertDictEqual(cert.claim_dict, secp256k1_cert)
signed = ClaimDict.load_dict(example_010).sign(secp256k1_private_key, claim_address_2,
claim_id_1, curve=SECP256k1)
self.assertDictEqual(signed.claim_dict, claim_010_signed_secp256k1)
signed_copy = ClaimDict.load_protobuf(signed.protobuf)
self.assertRaises(Exception, signed_copy.validate_signature, None, cert)
def test_remove_signature_equals_unsigned(self):
unsigned = ClaimDict.load_dict(example_010)
signed = unsigned.sign(secp256k1_private_key, claim_address_1, claim_id_1, curve=SECP256k1)
self.assertEqual(unsigned.serialized, signed.serialized_no_signature)
def test_fail_to_validate_fake_ecdsa_signature(self):
signed = ClaimDict.load_dict(example_010).sign(secp256k1_private_key, claim_address_1,
claim_id_1, curve=SECP256k1)
signed_copy = ClaimDict.load_protobuf(signed.protobuf)
fake_key = get_signer(SECP256k1).generate().private_key.to_pem()
fake_cert = ClaimDict.generate_certificate(fake_key, curve=SECP256k1)
self.assertRaises(ecdsa.keys.BadSignatureError, signed_copy.validate_signature,
claim_address_2, fake_cert)
def test_fail_to_validate_ecdsa_sig_for_altered_claim(self):
cert = ClaimDict.generate_certificate(secp256k1_private_key, curve=SECP256k1)
altered = ClaimDict.load_dict(example_010).sign(secp256k1_private_key, claim_address_1,
claim_id_1, curve=SECP256k1)
sd_hash = altered['stream']['source']['source']
altered['stream']['source']['source'] = sd_hash[::-1]
altered_copy = ClaimDict.load_dict(altered.claim_dict)
self.assertRaises(ecdsa.keys.BadSignatureError, altered_copy.validate_signature,
claim_address_1, cert)
class TestDetachedNamedSECP256k1Signatures(UnitTest):
def test_validate_detached_named_ecdsa_signature(self):
cert = ClaimDict.generate_certificate(secp256k1_private_key, curve=SECP256k1)
self.assertDictEqual(cert.claim_dict, secp256k1_cert)
signed = ClaimDict.load_dict(example_010).sign(secp256k1_private_key, claim_address_2, claim_id_1,
curve=SECP256k1, name='example', force_detached=True)
signed_copy = ClaimDict.deserialize(signed.serialized)
self.assertEqual(signed_copy.validate_signature(claim_address_2, cert, name='example'), True)
def test_validate_detached_named_ecdsa_signature_from_dict(self):
cert = ClaimDict.generate_certificate(secp256k1_private_key, curve=SECP256k1)
self.assertDictEqual(cert.claim_dict, secp256k1_cert)
signed = ClaimDict.load_dict(example_010).sign(secp256k1_private_key, claim_address_2, claim_id_1,
curve=SECP256k1, name='example', force_detached=True)
self.assertEqual(
signed.claim_dict['publisherSignature']['detached_signature'],
binascii.hexlify(signed.serialized).decode()
)
signed_copy = ClaimDict.load_dict(signed.claim_dict)
self.assertEqual(signed_copy.validate_signature(claim_address_2, cert, name='example'), True)
def test_validate_what_cant_be_serialized_back(self):
cert = ClaimDict.generate_certificate(secp256k1_private_key, curve=SECP256k1)
self.assertDictEqual(cert.claim_dict, secp256k1_cert)
original = ClaimDict.load_dict(example_010).serialized
altered = original + b'\x00\x01\x02\x30\x50\x80\x99' # pretend this extra trash is from some unknown protobuf
# manually sign
signer = get_signer(SECP256k1).load_pem(secp256k1_private_key)
signature = signer.sign(
b'example',
decode_address(claim_address_2),
altered,
binascii.unhexlify(claim_id_1),
)
detached_sig = Signature(NAMED_SECP256K1(
signature,
binascii.unhexlify(claim_id_1),
altered
))
signed = detached_sig.serialized
self.assertEqual(signed[85:], altered)
signed_copy = ClaimDict.deserialize(signed)
self.assertEqual(signed_copy.validate_signature(claim_address_2, cert, name='example'), True)
self.assertEqual(signed, signed_copy.serialized)
def test_validate_what_cant_be_serialized_back_even_by_loading_back_from_dictionary(self):
cert = ClaimDict.generate_certificate(secp256k1_private_key, curve=SECP256k1)
self.assertDictEqual(cert.claim_dict, secp256k1_cert)
original = ClaimDict.load_dict(example_010).serialized
altered = original + b'\x00\x01\x02\x30\x50\x80\x99' # pretend this extra trash is from some unknown protobuf
# manually sign
signer = get_signer(SECP256k1).load_pem(secp256k1_private_key)
signature = signer.sign(
b'example',
decode_address(claim_address_2),
altered,
binascii.unhexlify(claim_id_1),
)
detached_sig = Signature(NAMED_SECP256K1(
signature,
binascii.unhexlify(claim_id_1),
altered
))
signed = detached_sig.serialized
self.assertEqual(signed[85:], altered)
signed_copy = ClaimDict.deserialize(signed)
signed_copy = ClaimDict.load_dict(signed_copy.claim_dict)
self.assertEqual(signed_copy.validate_signature(claim_address_2, cert, name='example'), True)
self.assertEqual(signed, signed_copy.serialized)
def test_fail_to_sign_with_no_claim_address(self):
cert = ClaimDict.generate_certificate(secp256k1_private_key, curve=SECP256k1)
self.assertDictEqual(cert.claim_dict, secp256k1_cert)
self.assertRaises(Exception, ClaimDict.load_dict(example_010).sign, secp256k1_private_key,
None, claim_id_1, curve=SECP256k1, name='example', force_detached=True)
def test_fail_to_validate_with_no_claim_address(self):
cert = ClaimDict.generate_certificate(secp256k1_private_key, curve=SECP256k1)
self.assertDictEqual(cert.claim_dict, secp256k1_cert)
signed = ClaimDict.load_dict(example_010).sign(secp256k1_private_key, claim_address_2, claim_id_1,
curve=SECP256k1, name='example', force_detached=True)
signed_copy = ClaimDict.load_protobuf(signed.protobuf)
self.assertRaises(Exception, signed_copy.validate_signature, None, cert, name='example')
def test_fail_to_validate_with_no_name(self):
cert = ClaimDict.generate_certificate(secp256k1_private_key, curve=SECP256k1)
self.assertDictEqual(cert.claim_dict, secp256k1_cert)
signed = ClaimDict.load_dict(example_010).sign(secp256k1_private_key, claim_address_2, claim_id_1,
curve=SECP256k1, name='example', force_detached=True)
signed_copy = ClaimDict.load_protobuf(signed.protobuf)
self.assertRaises(Exception, signed_copy.validate_signature, None, cert, name=None)
def test_remove_signature_equals_unsigned(self):
unsigned = ClaimDict.load_dict(example_010)
signed = unsigned.sign(secp256k1_private_key, claim_address_1, claim_id_1,
curve=SECP256k1, name='example', force_detached=True)
self.assertEqual(unsigned.serialized, signed.serialized_no_signature)
def test_fail_to_validate_fake_ecdsa_signature(self):
signed = ClaimDict.load_dict(example_010).sign(secp256k1_private_key, claim_address_1, claim_id_1,
curve=SECP256k1, name='example', force_detached=True)
signed_copy = ClaimDict.deserialize(signed.serialized)
fake_key = get_signer(SECP256k1).generate().private_key.to_pem()
fake_cert = ClaimDict.generate_certificate(fake_key, curve=SECP256k1)
self.assertRaises(ecdsa.keys.BadSignatureError, signed_copy.validate_signature,
claim_address_2, fake_cert, 'example')
def test_fail_to_validate_ecdsa_sig_for_altered_claim(self):
cert = ClaimDict.generate_certificate(secp256k1_private_key, curve=SECP256k1)
altered = ClaimDict.load_dict(example_010).sign(secp256k1_private_key, claim_address_1, claim_id_1,
curve=SECP256k1, name='example', force_detached=True)
original_serialization = altered.serialized
sd_hash = altered['stream']['source']['source']
altered['stream']['source']['source'] = sd_hash[::-1]
altered_serialization = altered.protobuf.SerializeToString()
# keep signature, but replace serialization with the altered claim (check signature.py for slice sizes)
altered_copy = ClaimDict.deserialize(original_serialization[:85] + altered_serialization)
self.assertRaises(ecdsa.keys.BadSignatureError, altered_copy.validate_signature,
claim_address_1, cert, 'example')
class TestMetadata(UnitTest):
def test_fail_with_fake_sd_hash(self):
claim = deepcopy(example_010)
sd_hash = claim['stream']['source']['source'][:-2]
claim['stream']['source']['source'] = sd_hash
self.assertRaises(AssertionError, ClaimDict.load_dict, claim)
class TestSmartDecode(UnitTest):
def test_hex_decode(self):
self.assertEqual(decoded_hex_encoded_003, smart_decode(hex_encoded_003).claim_dict)
def test_binary_decode(self):
self.assertEqual(expected_binary_claim_decoded, smart_decode(binary_claim).claim_dict)
def test_smart_decode_raises(self):
with self.assertRaises(TypeError):
smart_decode(1)
with self.assertRaises(DecodeError):
smart_decode("aaab")
with self.assertRaises(DecodeError):
smart_decode("{'bogus_dict':1}")
class TestMainnetAddressValidation(UnitTest):
def test_mainnet_address_encode_decode(self):
valid_addr_hex = "55be482f953ed0feda4fc5c4d012681b6119274993dc96bf10"
self.assertEqual(encode_address(binascii.unhexlify(valid_addr_hex)),
b"bW5PZEvEBNPQRVhwpYXSjabFgbSw1oaHyR")
self.assertEqual(decode_address("bW5PZEvEBNPQRVhwpYXSjabFgbSw1oaHyR"),
binascii.unhexlify(valid_addr_hex))
def test_mainnet_address_encode_error(self):
invalid_prefix = "54be482f953ed0feda4fc5c4d012681b6119274993dc96bf10"
invalid_checksum = "55be482f953ed0feda4fc5c4d012681b6119274993dc96bf11"
invalid_length = "55482f953ed0feda4fc5c4d012681b6119274993dc96bf10"
with self.assertRaises(InvalidAddress):
encode_address(binascii.unhexlify(invalid_prefix))
encode_address(binascii.unhexlify(invalid_checksum))
encode_address(binascii.unhexlify(invalid_length))
def test_mainnet_address_decode_error(self):
with self.assertRaises(InvalidAddress):
decode_address("bW5PZEvEBNPQRVhwpYXSjabFgbSw1oaHR")
with self.assertRaises(InvalidAddress):
decode_address("mzGSynizDwSgURdnFjosZwakSVuZrdE8V4")
class TestRegtestAddressValidation(UnitTest):
def setUp(self):
schema.BLOCKCHAIN_NAME = "lbrycrd_regtest"
def tearDown(self):
schema.BLOCKCHAIN_NAME = "lbrycrd_main"
def test_regtest_address_encode_decode(self):
valid_addr_hex = "6fcdac187757dbf05500f613ada6fdd953d59b9acbf3c9343f"
self.assertEqual(encode_address(binascii.unhexlify(valid_addr_hex)),
b"mzGSynizDwSgURdnFjosZwakSVuZrdE8V4")
self.assertEqual(decode_address("mzGSynizDwSgURdnFjosZwakSVuZrdE8V4"),
binascii.unhexlify(valid_addr_hex))
def test_regtest_address_encode_error(self):
invalid_prefix = "6dcdac187757dbf05500f613ada6fdd953d59b9acbf3c9343f"
invalid_checksum = "6fcdac187757dbf05500f613ada6fdd953d59b9acbf3c9343d"
invalid_length = "6fcdac187757dbf05500f613ada6fdd953d59b9acbf3c934"
with self.assertRaises(InvalidAddress):
encode_address(binascii.unhexlify(invalid_prefix))
encode_address(binascii.unhexlify(invalid_checksum))
encode_address(binascii.unhexlify(invalid_length))
def test_regtest_address_decode_error(self):
with self.assertRaises(InvalidAddress):
decode_address("bW5PZEvEBNPQRVhwpYXSjabFgbSw1oaHyR")
with self.assertRaises(InvalidAddress):
decode_address("mzGSynizDwSgURdnFjosZwakSVuZrdE8V5")
class TestInvalidCertificateCurve(UnitTest):
def test_invalid_cert_curve(self):
with self.assertRaises(Exception):
ClaimDict.load_dict(malformed_secp256k1_cert)
class TestValidatePrivateKey(UnitTest):
def test_valid_private_key_for_cert(self):
cert_claim = ClaimDict.load_dict(secp256k1_cert)
self.assertEqual(cert_claim.validate_private_key(secp256k1_private_key, claim_id_1),
True)
def test_fail_to_load_wrong_private_key_for_cert(self):
cert_claim = ClaimDict.load_dict(secp256k1_cert)
self.assertEqual(cert_claim.validate_private_key(nist256p_private_key, claim_id_1),
False)
class TestMigrateLegacyProtobufToCurrentSchema(UnitTest):
def test_migrate_legacy_binary_certificate_to_proto3_certificate(self):
legacy_binary_cert = binary_claim
migrated_cert = migrate_legacy_protobuf(legacy_binary_cert)
self.assertEqual(binascii.hexlify(migrated_cert.channel.public_key).decode(),
expected_binary_claim_decoded['certificate']['publicKey'])
self.assertFalse(migrated_cert.HasField('stream'))
def test_unsigned_stream_claim_migration(self):
legacy_binary_unsigned_stream_claim = binascii.unhexlify(example_010_serialized)
migrated_claim = migrate_legacy_protobuf(legacy_binary_unsigned_stream_claim)
self.assertFalse(migrated_claim.HasField('channel'))
self.assertEqual(migrated_claim.stream.hash, binascii.unhexlify(example_010['stream']['source']['source']))
self.assertEqual(migrated_claim.stream.media_type, example_010['stream']['source']['contentType'])
self.assertEqual(migrated_claim.stream.license, example_010['stream']['metadata']['license'])
self.assertEqual(migrated_claim.stream.description, example_010['stream']['metadata']['description'])
self.assertEqual(migrated_claim.stream.language, example_010['stream']['metadata']['language'])
self.assertEqual(migrated_claim.stream.title, example_010['stream']['metadata']['title'])
self.assertEqual(migrated_claim.stream.author, example_010['stream']['metadata']['author'])
self.assertEqual(migrated_claim.stream.thumbnail_url, example_010['stream']['metadata']['thumbnail'])
self.assertEqual(len(migrated_claim.stream.tags[:]), 0) # it would have if nsfw was True
self.assertEqual(migrated_claim.stream.license_url, "")
def test_nsfw_migrated_as_tag(self):
legacy_binary_unsigned_stream_claim = binascii.unhexlify(example_010_serialized)
claim = legacy_claim_pb2.Claim()
claim.ParseFromString(legacy_binary_unsigned_stream_claim)
claim.stream.metadata.nsfw = True
legacy_binary_unsigned_stream_claim = claim.SerializeToString()
migrated_claim = migrate_legacy_protobuf(legacy_binary_unsigned_stream_claim)
self.assertEqual(migrated_claim.stream.tags[:], ["nsfw"])
def test_license_url_migration(self):
legacy_binary_unsigned_stream_claim = binascii.unhexlify(example_010_serialized)
claim = legacy_claim_pb2.Claim()
claim.ParseFromString(legacy_binary_unsigned_stream_claim)
claim.stream.metadata.licenseUrl = "url/license"
legacy_binary_unsigned_stream_claim = claim.SerializeToString()
migrated_claim = migrate_legacy_protobuf(legacy_binary_unsigned_stream_claim)
self.assertEqual(migrated_claim.stream.license_url, "url/license")
def test_LBC_fee_migration(self):
legacy_binary_unsigned_stream_claim = binascii.unhexlify(example_010_serialized)
claim = legacy_claim_pb2.Claim()
claim.ParseFromString(legacy_binary_unsigned_stream_claim)
claim.stream.metadata.fee.currency = 1
claim.stream.metadata.fee.version = 0
claim.stream.metadata.fee.amount = 2.0
claim.stream.metadata.fee.address = b"bob"
legacy_binary_unsigned_stream_claim = claim.SerializeToString()
migrated_claim = migrate_legacy_protobuf(legacy_binary_unsigned_stream_claim)
self.assertEqual(migrated_claim.stream.fee.currency, 0) # LBC was 1, migrates to 0
self.assertEqual(migrated_claim.stream.fee.amount, int(2.0*COIN))
self.assertEqual(migrated_claim.stream.fee.address, b"bob")
def test_USD_fee_migration(self):
legacy_binary_unsigned_stream_claim = binascii.unhexlify(example_010_serialized)
claim = legacy_claim_pb2.Claim()
claim.ParseFromString(legacy_binary_unsigned_stream_claim)
claim.stream.metadata.fee.currency = 3
claim.stream.metadata.fee.version = 0
claim.stream.metadata.fee.amount = 2.0
claim.stream.metadata.fee.address = b"bob"
legacy_binary_unsigned_stream_claim = claim.SerializeToString()
migrated_claim = migrate_legacy_protobuf(legacy_binary_unsigned_stream_claim)
self.assertEqual(migrated_claim.stream.fee.currency, 1) # USD was 3, migrates to 1
self.assertEqual(migrated_claim.stream.fee.amount, int(200))
self.assertEqual(migrated_claim.stream.fee.address, b"bob")
def test_negative_fee_trolling_becomes_zero(self):
legacy_binary_unsigned_stream_claim = binascii.unhexlify(example_010_serialized)
claim = legacy_claim_pb2.Claim()
claim.ParseFromString(legacy_binary_unsigned_stream_claim)
claim.stream.metadata.fee.currency = 3
claim.stream.metadata.fee.version = 0
claim.stream.metadata.fee.amount = -2.0
claim.stream.metadata.fee.address = b"bob"
legacy_binary_unsigned_stream_claim = claim.SerializeToString()
migrated_claim = migrate_legacy_protobuf(legacy_binary_unsigned_stream_claim)
self.assertEqual(migrated_claim.stream.fee.currency, 1) # USD was 3, migrates to 1
self.assertEqual(migrated_claim.stream.fee.amount, int(0))
self.assertEqual(migrated_claim.stream.fee.address, b"bob")
if __name__ == '__main__':
unittest.main()

View file

@ -0,0 +1,113 @@
import unittest
from lbrynet.schema.uri import URI, URIParseError
claim_id_1 = "63f2da17b0d90042c559cc73b6b17f853945c43e"
parsed_uri_matches = [
("test", URI("test"), False, False, "test", None),
("test#%s" % claim_id_1, URI("test", claim_id=claim_id_1), False, False, "test", None),
("test:1", URI("test", claim_sequence=1), False, False, "test", None),
("test$1", URI("test", bid_position=1), False, False, "test", None),
("lbry://test", URI("test"), False, False, "test", None),
("lbry://test#%s" % claim_id_1, URI("test", claim_id=claim_id_1), False, False, "test", None),
("lbry://test:1", URI("test", claim_sequence=1), False, False, "test", None),
("lbry://test$1", URI("test", bid_position=1), False, False, "test", None),
("@test", URI("@test"), True, True, None, "@test"),
("@test#%s" % claim_id_1, URI("@test", claim_id=claim_id_1), True, True, None, "@test"),
("@test:1", URI("@test", claim_sequence=1), True, True, None, "@test"),
("@test$1", URI("@test", bid_position=1), True, True, None, "@test"),
("lbry://@test1:1/fakepath", URI("@test1", claim_sequence=1, path="fakepath"), True, False, "fakepath", "@test1"),
("lbry://@test1$1/fakepath", URI("@test1", bid_position=1, path="fakepath"), True, False, "fakepath", "@test1"),
("lbry://@test1#abcdef/fakepath", URI("@test1", claim_id="abcdef", path="fakepath"), True, False, "fakepath",
"@test1"),
("@z", URI("@z"), True, True, None, "@z"),
("@yx", URI("@yx"), True, True, None, "@yx"),
("@abc", URI("@abc"), True, True, None, "@abc")
]
parsed_uri_raises = [
("lbry://", URIParseError),
("lbry://test:3$1", URIParseError),
("lbry://test$1:1", URIParseError),
("lbry://test#x", URIParseError),
("lbry://test#x/page", URIParseError),
("lbry://test$", URIParseError),
("lbry://test#", URIParseError),
("lbry://test:", URIParseError),
("lbry://test$x", URIParseError),
("lbry://test:x", URIParseError),
("lbry://@test@", URIParseError),
("lbry://@test:", URIParseError),
("lbry://test@", URIParseError),
("lbry://tes@t", URIParseError),
("lbry://test:1#%s" % claim_id_1, URIParseError),
("lbry://test:0", URIParseError),
("lbry://test$0", URIParseError),
("lbry://test/path", URIParseError),
("lbry://@test1#abcdef/fakepath:1", URIParseError),
("lbry://@test1:1/fakepath:1", URIParseError),
("lbry://@test1:1ab/fakepath", URIParseError),
("lbry://test:1:1:1", URIParseError),
("whatever/lbry://test", URIParseError),
("lbry://lbry://test", URIParseError),
("lbry://@/what", URIParseError),
("lbry://abc:0x123", URIParseError),
("lbry://abc:0x123/page", URIParseError),
("lbry://@test1#ABCDEF/fakepath", URIParseError),
("test:0001", URIParseError),
("lbry://@test1$1/fakepath?arg1&arg2&arg3", URIParseError)
]
class TestURIParser(unittest.TestCase):
maxDiff = 4000
longMessage = True
def test_uri_parse(self):
for test_string, expected_uri_obj, contains_channel, is_channel, claim_name, channel_name in parsed_uri_matches:
try:
# string -> URI
self.assertEqual(URI.from_uri_string(test_string), expected_uri_obj, test_string)
# URI -> dict -> URI
self.assertEqual(URI.from_dict(expected_uri_obj.to_dict()), expected_uri_obj,
test_string)
# contains_channel
self.assertEqual(URI.from_uri_string(test_string).contains_channel, contains_channel,
test_string)
# is_channel
self.assertEqual(URI.from_uri_string(test_string).is_channel, is_channel,
test_string)
# claim_name
self.assertEqual(URI.from_uri_string(test_string).claim_name, claim_name,
test_string)
# channel_name
self.assertEqual(URI.from_uri_string(test_string).channel_name, channel_name,
test_string)
# convert-to-string test only works if protocol is present in test_string
if test_string.startswith('lbry://'):
# string -> URI -> string
self.assertEqual(URI.from_uri_string(test_string).to_uri_string(), test_string,
test_string)
# string -> URI -> dict -> URI -> string
uri_dict = URI.from_uri_string(test_string).to_dict()
self.assertEqual(URI.from_dict(uri_dict).to_uri_string(), test_string,
test_string)
# URI -> dict -> URI -> string
self.assertEqual(URI.from_dict(expected_uri_obj.to_dict()).to_uri_string(),
test_string, test_string)
except URIParseError as err:
print("ERROR: " + test_string)
raise
def test_uri_errors(self):
for test_str, err in parsed_uri_raises:
try:
URI.from_uri_string(test_str)
except URIParseError:
pass
else:
print("\nSuccessfully parsed invalid url: " + test_str)
self.assertRaises(err, URI.from_uri_string, test_str)

View file

@ -16,7 +16,7 @@ from lbrynet.stream.descriptor import StreamDescriptor
from lbrynet.dht.node import Node
from lbrynet.dht.protocol.protocol import KademliaProtocol
from lbrynet.dht.protocol.routing_table import TreeRoutingTable
from lbrynet.schema.claim import ClaimDict
from lbrynet.schema.claim import Claim
def get_mock_node(peer=None):
@ -54,36 +54,19 @@ def get_mock_wallet(sd_hash, storage, balance=10.0, fee=None):
"permanent_url": "33rpm#c49566d631226492317d06ad7fdbe1ed32925124",
"supports": [],
"txid": "81ac52662af926fdf639d56920069e0f63449d4cde074c61717cb99ddde40e3c",
"value": {
"claimType": "streamType",
"stream": {
"metadata": {
"author": "",
"description": "",
"language": "en",
"license": "None",
"licenseUrl": "",
"nsfw": False,
"preview": "",
"thumbnail": "",
"title": "33rpm",
"version": "_0_1_0"
},
"source": {
"contentType": "image/png",
"source": sd_hash,
"sourceType": "lbry_sd_hash",
"version": "_0_0_1"
},
"version": "_0_0_1"
},
"version": "_0_0_1"
}
}
claim_obj = Claim()
if fee:
claim['value']['stream']['metadata']['fee'] = fee
claim_dict = ClaimDict.load_dict(claim['value'])
claim['hex'] = binascii.hexlify(claim_dict.serialized).decode()
if fee['currency'] == 'LBC':
claim_obj.stream.fee.lbc = Decimal(fee['amount'])
elif fee['currency'] == 'USD':
claim_obj.stream.fee.usd = Decimal(fee['amount'])
claim_obj.stream.title = "33rpm"
claim_obj.stream.language = "en"
claim_obj.stream.hash = sd_hash
claim_obj.stream.media_type = "image/png"
claim['value'] = claim_obj
claim['hex'] = binascii.hexlify(claim_obj.to_bytes()).decode()
async def mock_resolve(*args):
await storage.save_claims([claim])

View file

@ -1,9 +1,9 @@
from torba.testcase import AsyncioTestCase
from torba.client.wallet import Wallet
from lbrynet.extras.wallet.ledger import MainNetLedger, WalletDatabase
from lbrynet.extras.wallet.header import Headers
from lbrynet.extras.wallet.account import Account
from lbrynet.wallet.ledger import MainNetLedger, WalletDatabase
from lbrynet.wallet.header import Headers
from lbrynet.wallet.account import Account
class TestAccount(AsyncioTestCase):

View file

@ -1,8 +1,8 @@
import unittest
from binascii import hexlify, unhexlify
from lbrynet.extras.wallet.claim_proofs import get_hash_for_outpoint, verify_proof
from lbrynet.schema.hashing import double_sha256
from lbrynet.wallet.claim_proofs import get_hash_for_outpoint, verify_proof
from torba.client.hash import double_sha256
class ClaimProofsTestCase(unittest.TestCase):

View file

@ -1,6 +1,6 @@
import unittest
from lbrynet.extras.wallet.dewies import lbc_to_dewies as l2d, dewies_to_lbc as d2l
from lbrynet.wallet.dewies import lbc_to_dewies as l2d, dewies_to_lbc as d2l
class TestDeweyConversion(unittest.TestCase):

View file

@ -3,7 +3,7 @@ from binascii import unhexlify
from torba.testcase import AsyncioTestCase
from torba.client.util import ArithUint256
from lbrynet.extras.wallet.ledger import Headers
from lbrynet.wallet.ledger import Headers
class TestHeaders(AsyncioTestCase):

View file

@ -1,9 +1,9 @@
from torba.testcase import AsyncioTestCase
from torba.client.wallet import Wallet
from lbrynet.extras.wallet.account import Account
from lbrynet.extras.wallet.transaction import Transaction, Output, Input
from lbrynet.extras.wallet.ledger import MainNetLedger
from lbrynet.wallet.account import Account
from lbrynet.wallet.transaction import Transaction, Output, Input
from lbrynet.wallet.ledger import MainNetLedger
class LedgerTestCase(AsyncioTestCase):

View file

@ -1,11 +1,76 @@
from unittest import TestCase
from binascii import unhexlify
from cryptography.exceptions import InvalidSignature
from torba.testcase import AsyncioTestCase
from torba.client.constants import CENT, NULL_HASH32
from lbrynet.wallet.ledger import MainNetLedger
from lbrynet.wallet.transaction import Transaction
from lbrynet.wallet.transaction import Transaction, Input, Output
from lbrynet.schema.claim import Claim
class TestValidatingOldSignatures(TestCase):
def get_output(amount=CENT, pubkey_hash=NULL_HASH32):
return Transaction() \
.add_outputs([Output.pay_pubkey_hash(amount, pubkey_hash)]) \
.outputs[0]
def get_input():
return Input.spend(get_output())
def get_tx():
return Transaction().add_inputs([get_input()])
class TestSigningAndValidatingClaim(AsyncioTestCase):
@staticmethod
def get_channel():
channel_txo = Output.pay_claim_name_pubkey_hash(CENT, '@foo', Claim(), b'abc')
channel_txo.generate_channel_private_key()
get_tx().add_outputs([channel_txo])
return channel_txo
@staticmethod
def get_stream():
stream_txo = Output.pay_claim_name_pubkey_hash(CENT, 'foo', Claim(), b'abc')
get_tx().add_outputs([stream_txo])
return stream_txo
def test_successful_create_sign_and_validate(self):
channel = self.get_channel()
stream = self.get_stream()
stream.sign(channel)
self.assertTrue(stream.is_signed_by(channel))
def test_fail_to_validate_on_wrong_channel(self):
stream = self.get_stream()
stream.sign(self.get_channel())
with self.assertRaises(InvalidSignature):
self.assertTrue(stream.is_signed_by(self.get_channel()))
def test_fail_to_validate_altered_claim(self):
channel = self.get_channel()
stream = self.get_stream()
stream.sign(channel)
self.assertTrue(stream.is_signed_by(channel))
stream.claim.stream.title = 'hello'
with self.assertRaises(InvalidSignature):
self.assertTrue(stream.is_signed_by(channel))
def test_valid_private_key_for_cert(self):
channel = self.get_channel()
self.assertTrue(channel.is_channel_private_key(channel.private_key))
def test_fail_to_load_wrong_private_key_for_cert(self):
channel = self.get_channel()
self.assertFalse(channel.is_channel_private_key(self.get_channel().private_key))
class TestValidatingOldSignatures(AsyncioTestCase):
def test_signed_claim_made_by_ytsync(self):
stream_tx = Transaction(unhexlify(

View file

@ -1,7 +1,7 @@
import unittest
from binascii import hexlify, unhexlify
from lbrynet.extras.wallet.script import OutputScript
from lbrynet.wallet.script import OutputScript
class TestPayClaimNamePubkeyHash(unittest.TestCase):

View file

@ -5,8 +5,8 @@ from torba.testcase import AsyncioTestCase
from torba.client.constants import CENT, COIN, NULL_HASH32
from torba.client.wallet import Wallet
from lbrynet.extras.wallet import MainNetLedger
from lbrynet.extras.wallet.transaction import Transaction, Output, Input
from lbrynet.wallet.ledger import MainNetLedger
from lbrynet.wallet.transaction import Transaction, Output, Input
FEE_PER_BYTE = 50