lbry-sdk/lbry/blockchain/ledger.py
2020-09-17 11:25:50 -04:00

174 lines
5.9 KiB
Python

from binascii import unhexlify
from string import hexdigits
from typing import TYPE_CHECKING, Type
from lbry.crypto.hash import hash160, double_sha256
from lbry.crypto.base58 import Base58
from lbry.schema.url import URL
from .header import Headers, UnvalidatedHeaders
from .checkpoints import HASHES
from .dewies import lbc_to_dewies
if TYPE_CHECKING:
from lbry.conf import Config
class Ledger:
name = 'LBRY Credits'
symbol = 'LBC'
network_name = 'mainnet'
headers_class = Headers
secret_prefix = bytes((0x1c,))
pubkey_address_prefix = bytes((0x55,))
script_address_prefix = bytes((0x7a,))
extended_public_key_prefix = unhexlify('0488b21e')
extended_private_key_prefix = unhexlify('0488ade4')
max_target = 0x0000ffffffffffffffffffffffffffffffffffffffffffffffffffffffffffff
genesis_hash = '9c89283ba0f3227f6c03b70216b9f665f0118d5e0fa729cedf4fb34d6a34f463'
genesis_bits = 0x1f00ffff
target_timespan = 150
fee_per_byte = 50
fee_per_name_char = 200000
checkpoints = HASHES
def __init__(self, conf: 'Config'):
self.conf = conf
self.coin_selection_strategy = None
@classmethod
def get_id(cls):
return '{}_{}'.format(cls.symbol.lower(), cls.network_name.lower())
@classmethod
def hash160_to_address(cls, h160):
raw_address = cls.pubkey_address_prefix + h160
return Base58.encode(bytearray(raw_address + double_sha256(raw_address)[0:4]))
@staticmethod
def address_to_hash160(address) -> bytes:
return Base58.decode(address)[1:21]
@classmethod
def is_valid_address(cls, address):
decoded = Base58.decode_check(address)
return decoded[0] == cls.pubkey_address_prefix[0]
@classmethod
def valid_address_or_error(cls, address):
try:
assert cls.is_valid_address(address)
except:
raise Exception(f"'{address}' is not a valid address")
@staticmethod
def valid_claim_id(claim_id: str):
if not len(claim_id) == 40:
raise Exception(f"Incorrect claimid length: {len(claim_id)}")
if set(claim_id).difference(hexdigits):
raise Exception("Claim id is not hex encoded")
@staticmethod
def valid_channel_name_or_error(name: str):
try:
if not name:
raise Exception("Channel name cannot be blank.")
parsed = URL.parse(name)
if not parsed.has_channel:
raise Exception("Channel names must start with '@' symbol.")
if parsed.channel.name != name:
raise Exception("Channel name has invalid character")
except (TypeError, ValueError):
raise Exception("Invalid channel name.")
@staticmethod
def valid_stream_name_or_error(name: str):
try:
if not name:
raise Exception('Stream name cannot be blank.')
parsed = URL.parse(name)
if parsed.has_channel:
raise Exception(
"Stream names cannot start with '@' symbol. This is reserved for channels claims."
)
if not parsed.has_stream or parsed.stream.name != name:
raise Exception('Stream name has invalid characters.')
except (TypeError, ValueError):
raise Exception("Invalid stream name.")
@staticmethod
def valid_collection_name_or_error(name: str):
try:
if not name:
raise Exception('Collection name cannot be blank.')
parsed = URL.parse(name)
if parsed.has_channel:
raise Exception(
"Collection names cannot start with '@' symbol. This is reserved for channels claims."
)
if not parsed.has_stream or parsed.stream.name != name:
raise Exception('Collection name has invalid characters.')
except (TypeError, ValueError):
raise Exception("Invalid collection name.")
@staticmethod
def get_dewies_or_error(argument: str, lbc: str, positive_value=False):
try:
dewies = lbc_to_dewies(lbc)
if positive_value and dewies <= 0:
raise ValueError(f"'{argument}' value must be greater than 0.0")
return dewies
except ValueError as e:
raise ValueError(f"Invalid value for '{argument}': {e.args[0]}")
def get_fee_address(self, kwargs: dict, claim_address: str) -> str:
if 'fee_address' in kwargs:
self.valid_address_or_error(kwargs['fee_address'])
return kwargs['fee_address']
if 'fee_currency' in kwargs or 'fee_amount' in kwargs:
return claim_address
@classmethod
def public_key_to_address(cls, public_key):
return cls.hash160_to_address(hash160(public_key))
@staticmethod
def private_key_to_wif(private_key):
return b'\x1c' + private_key + b'\x01'
class TestNetLedger(Ledger):
network_name = 'testnet'
pubkey_address_prefix = bytes((111,))
script_address_prefix = bytes((196,))
extended_public_key_prefix = unhexlify('043587cf')
extended_private_key_prefix = unhexlify('04358394')
checkpoints = {}
class RegTestLedger(Ledger):
network_name = 'regtest'
headers_class = UnvalidatedHeaders
pubkey_address_prefix = bytes((111,))
script_address_prefix = bytes((196,))
extended_public_key_prefix = unhexlify('043587cf')
extended_private_key_prefix = unhexlify('04358394')
max_target = 0x7fffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffff
genesis_hash = '6e3fcf1299d4ec5d79c3a4c91d624a4acf9e2e173d95a1a0504f677669687556'
genesis_bits = 0x207fffff
target_timespan = 1
checkpoints = {}
def ledger_class_from_name(name) -> Type[Ledger]:
return {
Ledger.network_name: Ledger,
TestNetLedger.network_name: TestNetLedger,
RegTestLedger.network_name: RegTestLedger
}[name]