forked from LBRYCommunity/lbry-sdk
289 lines
10 KiB
Python
289 lines
10 KiB
Python
# Copyright (c) 2016-2017, Neil Booth
|
|
#
|
|
# All rights reserved.
|
|
#
|
|
# The MIT License (MIT)
|
|
#
|
|
# Permission is hereby granted, free of charge, to any person obtaining
|
|
# a copy of this software and associated documentation files (the
|
|
# "Software"), to deal in the Software without restriction, including
|
|
# without limitation the rights to use, copy, modify, merge, publish,
|
|
# distribute, sublicense, and/or sell copies of the Software, and to
|
|
# permit persons to whom the Software is furnished to do so, subject to
|
|
# the following conditions:
|
|
#
|
|
# The above copyright notice and this permission notice shall be
|
|
# included in all copies or substantial portions of the Software.
|
|
#
|
|
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
|
|
# EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
|
|
# MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
|
|
# NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE
|
|
# LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION
|
|
# OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION
|
|
# WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
|
|
# and warranty status of this software.
|
|
|
|
"""Script-related classes and functions."""
|
|
|
|
|
|
from collections import namedtuple
|
|
|
|
from lbry.wallet.server.util import unpack_le_uint16_from, unpack_le_uint32_from, \
|
|
pack_le_uint16, pack_le_uint32
|
|
|
|
|
|
class EnumError(Exception):
|
|
pass
|
|
|
|
|
|
class Enumeration:
|
|
|
|
def __init__(self, name, enumList):
|
|
self.__doc__ = name
|
|
|
|
lookup = {}
|
|
reverseLookup = {}
|
|
i = 0
|
|
uniqueNames = set()
|
|
uniqueValues = set()
|
|
for x in enumList:
|
|
if isinstance(x, tuple):
|
|
x, i = x
|
|
if not isinstance(x, str):
|
|
raise EnumError(f"enum name {x} not a string")
|
|
if not isinstance(i, int):
|
|
raise EnumError(f"enum value {i} not an integer")
|
|
if x in uniqueNames:
|
|
raise EnumError(f"enum name {x} not unique")
|
|
if i in uniqueValues:
|
|
raise EnumError(f"enum value {i} not unique")
|
|
uniqueNames.add(x)
|
|
uniqueValues.add(i)
|
|
lookup[x] = i
|
|
reverseLookup[i] = x
|
|
i = i + 1
|
|
self.lookup = lookup
|
|
self.reverseLookup = reverseLookup
|
|
|
|
def __getattr__(self, attr):
|
|
result = self.lookup.get(attr)
|
|
if result is None:
|
|
raise AttributeError(f'enumeration has no member {attr}')
|
|
return result
|
|
|
|
def whatis(self, value):
|
|
return self.reverseLookup[value]
|
|
|
|
|
|
class ScriptError(Exception):
|
|
"""Exception used for script errors."""
|
|
|
|
|
|
OpCodes = Enumeration("Opcodes", [
|
|
("OP_0", 0), ("OP_PUSHDATA1", 76),
|
|
"OP_PUSHDATA2", "OP_PUSHDATA4", "OP_1NEGATE",
|
|
"OP_RESERVED",
|
|
"OP_1", "OP_2", "OP_3", "OP_4", "OP_5", "OP_6", "OP_7", "OP_8",
|
|
"OP_9", "OP_10", "OP_11", "OP_12", "OP_13", "OP_14", "OP_15", "OP_16",
|
|
"OP_NOP", "OP_VER", "OP_IF", "OP_NOTIF", "OP_VERIF", "OP_VERNOTIF",
|
|
"OP_ELSE", "OP_ENDIF", "OP_VERIFY", "OP_RETURN",
|
|
"OP_TOALTSTACK", "OP_FROMALTSTACK", "OP_2DROP", "OP_2DUP", "OP_3DUP",
|
|
"OP_2OVER", "OP_2ROT", "OP_2SWAP", "OP_IFDUP", "OP_DEPTH", "OP_DROP",
|
|
"OP_DUP", "OP_NIP", "OP_OVER", "OP_PICK", "OP_ROLL", "OP_ROT",
|
|
"OP_SWAP", "OP_TUCK",
|
|
"OP_CAT", "OP_SUBSTR", "OP_LEFT", "OP_RIGHT", "OP_SIZE",
|
|
"OP_INVERT", "OP_AND", "OP_OR", "OP_XOR", "OP_EQUAL", "OP_EQUALVERIFY",
|
|
"OP_RESERVED1", "OP_RESERVED2",
|
|
"OP_1ADD", "OP_1SUB", "OP_2MUL", "OP_2DIV", "OP_NEGATE", "OP_ABS",
|
|
"OP_NOT", "OP_0NOTEQUAL", "OP_ADD", "OP_SUB", "OP_MUL", "OP_DIV", "OP_MOD",
|
|
"OP_LSHIFT", "OP_RSHIFT", "OP_BOOLAND", "OP_BOOLOR", "OP_NUMEQUAL",
|
|
"OP_NUMEQUALVERIFY", "OP_NUMNOTEQUAL", "OP_LESSTHAN", "OP_GREATERTHAN",
|
|
"OP_LESSTHANOREQUAL", "OP_GREATERTHANOREQUAL", "OP_MIN", "OP_MAX",
|
|
"OP_WITHIN",
|
|
"OP_RIPEMD160", "OP_SHA1", "OP_SHA256", "OP_HASH160", "OP_HASH256",
|
|
"OP_CODESEPARATOR", "OP_CHECKSIG", "OP_CHECKSIGVERIFY", "OP_CHECKMULTISIG",
|
|
"OP_CHECKMULTISIGVERIFY",
|
|
"OP_NOP1",
|
|
"OP_CHECKLOCKTIMEVERIFY", "OP_CHECKSEQUENCEVERIFY"
|
|
])
|
|
|
|
|
|
# Paranoia to make it hard to create bad scripts
|
|
assert OpCodes.OP_DUP == 0x76
|
|
assert OpCodes.OP_HASH160 == 0xa9
|
|
assert OpCodes.OP_EQUAL == 0x87
|
|
assert OpCodes.OP_EQUALVERIFY == 0x88
|
|
assert OpCodes.OP_CHECKSIG == 0xac
|
|
assert OpCodes.OP_CHECKMULTISIG == 0xae
|
|
|
|
|
|
def _match_ops(ops, pattern):
|
|
if len(ops) != len(pattern):
|
|
return False
|
|
for op, pop in zip(ops, pattern):
|
|
if pop != op:
|
|
# -1 means 'data push', whose op is an (op, data) tuple
|
|
if pop == -1 and isinstance(op, tuple):
|
|
continue
|
|
return False
|
|
|
|
return True
|
|
|
|
|
|
class ScriptPubKey:
|
|
"""A class for handling a tx output script that gives conditions
|
|
necessary for spending.
|
|
"""
|
|
|
|
TO_ADDRESS_OPS = [OpCodes.OP_DUP, OpCodes.OP_HASH160, -1,
|
|
OpCodes.OP_EQUALVERIFY, OpCodes.OP_CHECKSIG]
|
|
TO_P2SH_OPS = [OpCodes.OP_HASH160, -1, OpCodes.OP_EQUAL]
|
|
TO_PUBKEY_OPS = [-1, OpCodes.OP_CHECKSIG]
|
|
|
|
PayToHandlers = namedtuple('PayToHandlers', 'address script_hash pubkey '
|
|
'unspendable strange')
|
|
|
|
@classmethod
|
|
def pay_to(cls, handlers, script):
|
|
"""Parse a script, invoke the appropriate handler and
|
|
return the result.
|
|
|
|
One of the following handlers is invoked:
|
|
handlers.address(hash160)
|
|
handlers.script_hash(hash160)
|
|
handlers.pubkey(pubkey)
|
|
handlers.unspendable()
|
|
handlers.strange(script)
|
|
"""
|
|
try:
|
|
ops = Script.get_ops(script)
|
|
except ScriptError:
|
|
return handlers.unspendable()
|
|
|
|
match = _match_ops
|
|
|
|
if match(ops, cls.TO_ADDRESS_OPS):
|
|
return handlers.address(ops[2][-1])
|
|
if match(ops, cls.TO_P2SH_OPS):
|
|
return handlers.script_hash(ops[1][-1])
|
|
if match(ops, cls.TO_PUBKEY_OPS):
|
|
return handlers.pubkey(ops[0][-1])
|
|
if ops and ops[0] == OpCodes.OP_RETURN:
|
|
return handlers.unspendable()
|
|
return handlers.strange(script)
|
|
|
|
@classmethod
|
|
def P2SH_script(cls, hash160):
|
|
return (bytes([OpCodes.OP_HASH160])
|
|
+ Script.push_data(hash160)
|
|
+ bytes([OpCodes.OP_EQUAL]))
|
|
|
|
@classmethod
|
|
def P2PKH_script(cls, hash160):
|
|
return (bytes([OpCodes.OP_DUP, OpCodes.OP_HASH160])
|
|
+ Script.push_data(hash160)
|
|
+ bytes([OpCodes.OP_EQUALVERIFY, OpCodes.OP_CHECKSIG]))
|
|
|
|
@classmethod
|
|
def validate_pubkey(cls, pubkey, req_compressed=False):
|
|
if isinstance(pubkey, (bytes, bytearray)):
|
|
if len(pubkey) == 33 and pubkey[0] in (2, 3):
|
|
return # Compressed
|
|
if len(pubkey) == 65 and pubkey[0] == 4:
|
|
if not req_compressed:
|
|
return
|
|
raise PubKeyError('uncompressed pubkeys are invalid')
|
|
raise PubKeyError(f'invalid pubkey {pubkey}')
|
|
|
|
@classmethod
|
|
def pubkey_script(cls, pubkey):
|
|
cls.validate_pubkey(pubkey)
|
|
return Script.push_data(pubkey) + bytes([OpCodes.OP_CHECKSIG])
|
|
|
|
@classmethod
|
|
def multisig_script(cls, m, pubkeys):
|
|
"""Returns the script for a pay-to-multisig transaction."""
|
|
n = len(pubkeys)
|
|
if not 1 <= m <= n <= 15:
|
|
raise ScriptError(f'{m:d} of {n:d} multisig script not possible')
|
|
for pubkey in pubkeys:
|
|
cls.validate_pubkey(pubkey, req_compressed=True)
|
|
# See https://bitcoin.org/en/developer-guide
|
|
# 2 of 3 is: OP_2 pubkey1 pubkey2 pubkey3 OP_3 OP_CHECKMULTISIG
|
|
return (bytes([OP_1 + m - 1])
|
|
+ b''.join(cls.push_data(pubkey) for pubkey in pubkeys)
|
|
+ bytes([OP_1 + n - 1, OP_CHECK_MULTISIG]))
|
|
|
|
|
|
class Script:
|
|
|
|
@classmethod
|
|
def get_ops(cls, script):
|
|
ops = []
|
|
|
|
# The unpacks or script[n] below throw on truncated scripts
|
|
try:
|
|
n = 0
|
|
while n < len(script):
|
|
op = script[n]
|
|
n += 1
|
|
|
|
if op <= OpCodes.OP_PUSHDATA4:
|
|
# Raw bytes follow
|
|
if op < OpCodes.OP_PUSHDATA1:
|
|
dlen = op
|
|
elif op == OpCodes.OP_PUSHDATA1:
|
|
dlen = script[n]
|
|
n += 1
|
|
elif op == OpCodes.OP_PUSHDATA2:
|
|
dlen, = unpack_le_uint16_from(script[n: n + 2])
|
|
n += 2
|
|
else:
|
|
dlen, = unpack_le_uint32_from(script[n: n + 4])
|
|
n += 4
|
|
if n + dlen > len(script):
|
|
raise IndexError
|
|
op = (op, script[n:n + dlen])
|
|
n += dlen
|
|
|
|
ops.append(op)
|
|
except Exception:
|
|
# Truncated script; e.g. tx_hash
|
|
# ebc9fa1196a59e192352d76c0f6e73167046b9d37b8302b6bb6968dfd279b767
|
|
raise ScriptError('truncated script')
|
|
|
|
return ops
|
|
|
|
@classmethod
|
|
def push_data(cls, data):
|
|
"""Returns the opcodes to push the data on the stack."""
|
|
assert isinstance(data, (bytes, bytearray))
|
|
|
|
n = len(data)
|
|
if n < OpCodes.OP_PUSHDATA1:
|
|
return bytes([n]) + data
|
|
if n < 256:
|
|
return bytes([OpCodes.OP_PUSHDATA1, n]) + data
|
|
if n < 65536:
|
|
return bytes([OpCodes.OP_PUSHDATA2]) + pack_le_uint16(n) + data
|
|
return bytes([OpCodes.OP_PUSHDATA4]) + pack_le_uint32(n) + data
|
|
|
|
@classmethod
|
|
def opcode_name(cls, opcode):
|
|
if OpCodes.OP_0 < opcode < OpCodes.OP_PUSHDATA1:
|
|
return f'OP_{opcode:d}'
|
|
try:
|
|
return OpCodes.whatis(opcode)
|
|
except KeyError:
|
|
return f'OP_UNKNOWN:{opcode:d}'
|
|
|
|
@classmethod
|
|
def dump(cls, script):
|
|
opcodes, datas = cls.get_ops(script)
|
|
for opcode, data in zip(opcodes, datas):
|
|
name = cls.opcode_name(opcode)
|
|
if data is None:
|
|
print(name)
|
|
else:
|
|
print(f'{name} {data.hex()} ({len(data):d} bytes)')
|