new bitcoin script parser

This commit is contained in:
Lex Berezhny 2018-04-08 13:37:34 -04:00 committed by Jack Robison
parent 0fd160e6e6
commit 7161a0edb9
No known key found for this signature in database
GPG key ID: DF25C68FE0239BB2
5 changed files with 800 additions and 120 deletions

View file

View file

@ -0,0 +1,257 @@
from binascii import hexlify, unhexlify
from twisted.trial import unittest
from lbrynet.wallet.script import Template, ParseError, tokenize, push_data
from lbrynet.wallet.script import PUSH_SINGLE, PUSH_MANY, OP_HASH160, OP_EQUAL
from lbrynet.wallet.script import InputScript, OutputScript
from lbrynet.wallet.bcd_data_stream import BCDataStream
def parse(opcodes, source):
template = Template('test', opcodes)
s = BCDataStream()
for t in source:
if isinstance(t, bytes):
s.write_many(push_data(t))
elif isinstance(t, int):
s.write_uint8(t)
else:
raise ValueError()
s.reset()
return template.parse(tokenize(s))
class TestScriptTemplates(unittest.TestCase):
def test_push_data(self):
self.assertEqual(parse(
(PUSH_SINGLE('script_hash'),),
(b'abcdef',)
), {
'script_hash': b'abcdef'
}
)
self.assertEqual(parse(
(PUSH_SINGLE('first'), PUSH_SINGLE('last')),
(b'Satoshi', b'Nakamoto')
), {
'first': b'Satoshi',
'last': b'Nakamoto'
}
)
self.assertEqual(parse(
(OP_HASH160, PUSH_SINGLE('script_hash'), OP_EQUAL),
(OP_HASH160, b'abcdef', OP_EQUAL)
), {
'script_hash': b'abcdef'
}
)
def test_push_data_many(self):
self.assertEqual(parse(
(PUSH_MANY('names'),),
(b'amit',)
), {
'names': [b'amit']
}
)
self.assertEqual(parse(
(PUSH_MANY('names'),),
(b'jeremy', b'amit', b'victor')
), {
'names': [b'jeremy', b'amit', b'victor']
}
)
self.assertEqual(parse(
(OP_HASH160, PUSH_MANY('names'), OP_EQUAL),
(OP_HASH160, b'grin', b'jack', OP_EQUAL)
), {
'names': [b'grin', b'jack']
}
)
def test_push_data_mixed(self):
self.assertEqual(parse(
(PUSH_SINGLE('CEO'), PUSH_MANY('Devs'), PUSH_SINGLE(b'CTO'), PUSH_SINGLE(b'State')),
(b'jeremy', b'lex', b'amit', b'victor', b'jack', b'grin', b'NH')
), {
'CEO': b'jeremy',
'CTO': b'grin',
'Devs': [b'lex', b'amit', b'victor', b'jack'],
'State': b'NH'
}
)
def test_push_data_many_separated(self):
self.assertEqual(parse(
(PUSH_MANY('Chiefs'), OP_HASH160, PUSH_MANY('Devs')),
(b'jeremy', b'grin', OP_HASH160, b'lex', b'jack')
), {
'Chiefs': [b'jeremy', b'grin'],
'Devs': [b'lex', b'jack']
}
)
def test_push_data_many_not_separated(self):
with self.assertRaisesRegexp(ParseError, 'consecutive PUSH_MANY'):
parse((PUSH_MANY('Chiefs'), PUSH_MANY('Devs')), (b'jeremy', b'grin', b'lex', b'jack'))
class TestRedeemPubKeyHash(unittest.TestCase):
def redeem_pubkey_hash(self, sig, pubkey):
# this checks that factory function correctly sets up the script
src1 = InputScript.redeem_pubkey_hash(unhexlify(sig), unhexlify(pubkey))
self.assertEqual(src1.template.name, 'pubkey_hash')
self.assertEqual(hexlify(src1.values['signature']), sig)
self.assertEqual(hexlify(src1.values['pubkey']), pubkey)
# now we test that it will round trip
src2 = InputScript(src1.source)
self.assertEqual(src2.template.name, 'pubkey_hash')
self.assertEqual(hexlify(src2.values['signature']), sig)
self.assertEqual(hexlify(src2.values['pubkey']), pubkey)
return hexlify(src1.source)
def test_redeem_pubkey_hash_1(self):
self.assertEqual(
self.redeem_pubkey_hash(
b'30450221009dc93f25184a8d483745cd3eceff49727a317c9bfd8be8d3d04517e9cdaf8dd502200e02dc5939cad9562d2b1f303f185957581c4851c98d497af281118825e18a8301',
b'025415a06514230521bff3aaface31f6db9d9bbc39bf1ca60a189e78731cfd4e1b'
),
'4830450221009dc93f25184a8d483745cd3eceff49727a317c9bfd8be8d3d04517e9cdaf8dd502200e02d'
'c5939cad9562d2b1f303f185957581c4851c98d497af281118825e18a830121025415a06514230521bff3'
'aaface31f6db9d9bbc39bf1ca60a189e78731cfd4e1b'
)
class TestRedeemScriptHash(unittest.TestCase):
def redeem_script_hash(self, sigs, pubkeys):
# this checks that factory function correctly sets up the script
src1 = InputScript.redeem_script_hash(
[unhexlify(sig) for sig in sigs],
[unhexlify(pubkey) for pubkey in pubkeys]
)
subscript1 = src1.values['script']
self.assertEqual(src1.template.name, 'script_hash')
self.assertEqual([hexlify(v) for v in src1.values['signatures']], sigs)
self.assertEqual([hexlify(p) for p in subscript1.values['pubkeys']], pubkeys)
self.assertEqual(subscript1.values['signatures_count'], len(sigs))
self.assertEqual(subscript1.values['pubkeys_count'], len(pubkeys))
# now we test that it will round trip
src2 = InputScript(src1.source)
subscript2 = src2.values['script']
self.assertEqual(src2.template.name, 'script_hash')
self.assertEqual([hexlify(v) for v in src2.values['signatures']], sigs)
self.assertEqual([hexlify(p) for p in subscript2.values['pubkeys']], pubkeys)
self.assertEqual(subscript2.values['signatures_count'], len(sigs))
self.assertEqual(subscript2.values['pubkeys_count'], len(pubkeys))
return hexlify(src1.source)
def test_redeem_script_hash_1(self):
self.assertEqual(
self.redeem_script_hash([
'3045022100fec82ed82687874f2a29cbdc8334e114af645c45298e85bb1efe69fcf15c617a0220575'
'e40399f9ada388d8e522899f4ec3b7256896dd9b02742f6567d960b613f0401',
'3044022024890462f731bd1a42a4716797bad94761fc4112e359117e591c07b8520ea33b02201ac68'
'9e35c4648e6beff1d42490207ba14027a638a62663b2ee40153299141eb01',
'30450221009910823e0142967a73c2d16c1560054d71c0625a385904ba2f1f53e0bc1daa8d02205cd'
'70a89c6cf031a8b07d1d5eb0d65d108c4d49c2d403f84fb03ad3dc318777a01'
], [
'0372ba1fd35e5f1b1437cba0c4ebfc4025b7349366f9f9c7c8c4b03a47bd3f68a4',
'03061d250182b2db1ba144167fd8b0ef3fe0fc3a2fa046958f835ffaf0dfdb7692',
'02463bfbc1eaec74b5c21c09239ae18dbf6fc07833917df10d0b43e322810cee0c',
'02fa6a6455c26fb516cfa85ea8de81dd623a893ffd579ee2a00deb6cdf3633d6bb',
'0382910eae483ce4213d79d107bfc78f3d77e2a31ea597be45256171ad0abeaa89'
]),
'00483045022100fec82ed82687874f2a29cbdc8334e114af645c45298e85bb1efe69fcf15c617a0220575e'
'40399f9ada388d8e522899f4ec3b7256896dd9b02742f6567d960b613f0401473044022024890462f731bd'
'1a42a4716797bad94761fc4112e359117e591c07b8520ea33b02201ac689e35c4648e6beff1d42490207ba'
'14027a638a62663b2ee40153299141eb014830450221009910823e0142967a73c2d16c1560054d71c0625a'
'385904ba2f1f53e0bc1daa8d02205cd70a89c6cf031a8b07d1d5eb0d65d108c4d49c2d403f84fb03ad3dc3'
'18777a014cad53210372ba1fd35e5f1b1437cba0c4ebfc4025b7349366f9f9c7c8c4b03a47bd3f68a42103'
'061d250182b2db1ba144167fd8b0ef3fe0fc3a2fa046958f835ffaf0dfdb76922102463bfbc1eaec74b5c2'
'1c09239ae18dbf6fc07833917df10d0b43e322810cee0c2102fa6a6455c26fb516cfa85ea8de81dd623a89'
'3ffd579ee2a00deb6cdf3633d6bb210382910eae483ce4213d79d107bfc78f3d77e2a31ea597be45256171'
'ad0abeaa8955ae'
)
class TestPayPubKeyHash(unittest.TestCase):
def pay_pubkey_hash(self, pubkey_hash):
# this checks that factory function correctly sets up the script
src1 = OutputScript.pay_pubkey_hash(unhexlify(pubkey_hash))
self.assertEqual(src1.template.name, 'pay_pubkey_hash')
self.assertEqual(hexlify(src1.values['pubkey_hash']), pubkey_hash)
# now we test that it will round trip
src2 = OutputScript(src1.source)
self.assertEqual(src2.template.name, 'pay_pubkey_hash')
self.assertEqual(hexlify(src2.values['pubkey_hash']), pubkey_hash)
return hexlify(src1.source)
def test_pay_pubkey_hash_1(self):
self.assertEqual(
self.pay_pubkey_hash(b'64d74d12acc93ba1ad495e8d2d0523252d664f4d'),
'76a91464d74d12acc93ba1ad495e8d2d0523252d664f4d88ac'
)
class TestPayScriptHash(unittest.TestCase):
def pay_script_hash(self, script_hash):
# this checks that factory function correctly sets up the script
src1 = OutputScript.pay_script_hash(unhexlify(script_hash))
self.assertEqual(src1.template.name, 'pay_script_hash')
self.assertEqual(hexlify(src1.values['script_hash']), script_hash)
# now we test that it will round trip
src2 = OutputScript(src1.source)
self.assertEqual(src2.template.name, 'pay_script_hash')
self.assertEqual(hexlify(src2.values['script_hash']), script_hash)
return hexlify(src1.source)
def test_pay_pubkey_hash_1(self):
self.assertEqual(
self.pay_script_hash(b'63d65a2ee8c44426d06050cfd71c0f0ff3fc41ac'),
'a91463d65a2ee8c44426d06050cfd71c0f0ff3fc41ac87'
)
class TestPayClaimNamePubkeyHash(unittest.TestCase):
def pay_claim_name_pubkey_hash(self, name, claim, pubkey_hash):
# this checks that factory function correctly sets up the script
src1 = OutputScript.pay_claim_name_pubkey_hash(name, unhexlify(claim), unhexlify(pubkey_hash))
self.assertEqual(src1.template.name, 'claim_name+pay_pubkey_hash')
self.assertEqual(src1.values['claim_name'], name)
self.assertEqual(hexlify(src1.values['claim']), claim)
self.assertEqual(hexlify(src1.values['pubkey_hash']), pubkey_hash)
# now we test that it will round trip
src2 = OutputScript(src1.source)
self.assertEqual(src2.template.name, 'claim_name+pay_pubkey_hash')
self.assertEqual(src2.values['claim_name'], name)
self.assertEqual(hexlify(src2.values['claim']), claim)
self.assertEqual(hexlify(src2.values['pubkey_hash']), pubkey_hash)
return hexlify(src1.source)
def test_pay_claim_name_pubkey_hash_1(self):
self.assertEqual(
self.pay_claim_name_pubkey_hash(
# name
b'cats',
# claim
b'080110011a7808011230080410011a084d616361726f6e6922002a003214416c6c20726967687473'
b'2072657365727665642e38004a0052005a001a42080110011a30add80aaf02559ba09853636a0658'
b'c42b727cb5bb4ba8acedb4b7fe656065a47a31878dbf9912135ddb9e13806cc1479d220a696d6167'
b'652f6a7065672a5c080110031a404180cc0fa4d3839ee29cca866baed25fafb43fca1eb3b608ee88'
b'9d351d3573d042c7b83e2e643db0d8e062a04e6e9ae6b90540a2f95fe28638d0f18af4361a1c2214'
b'f73de93f4299fb32c32f949e02198a8e91101abd',
# pub key
b'be16e4b0f9bd8f6d47d02b3a887049c36d3b84cb'
),
'b504636174734cdc080110011a7808011230080410011a084d616361726f6e6922002a003214416c6c207'
'269676874732072657365727665642e38004a0052005a001a42080110011a30add80aaf02559ba0985363'
'6a0658c42b727cb5bb4ba8acedb4b7fe656065a47a31878dbf9912135ddb9e13806cc1479d220a696d616'
'7652f6a7065672a5c080110031a404180cc0fa4d3839ee29cca866baed25fafb43fca1eb3b608ee889d35'
'1d3573d042c7b83e2e643db0d8e062a04e6e9ae6b90540a2f95fe28638d0f18af4361a1c2214f73de93f4'
'299fb32c32f949e02198a8e91101abd6d7576a914be16e4b0f9bd8f6d47d02b3a887049c36d3b84cb88ac'
)

View file

@ -1,133 +1,126 @@
import struct
from io import BytesIO
class SerializationError(Exception):
""" Thrown when there's a problem deserializing or serializing """
class BCDataStream:
def __init__(self, data=None):
self.data = BytesIO(data)
class BCDataStream(object):
def __init__(self):
self.input = None
self.read_cursor = 0
@property
def is_at_beginning(self):
return self.data.tell() == 0
def clear(self):
self.input = None
self.read_cursor = 0
def reset(self):
self.data.seek(0)
def write(self, bytes): # Initialize with string of bytes
if self.input is None:
self.input = bytes
else:
self.input += bytes
def get_bytes(self):
return self.data.getvalue()
def read(self, size):
return self.data.read(size)
def write(self, data):
self.data.write(data)
def write_many(self, many):
self.data.writelines(many)
def read_string(self):
# Strings are encoded depending on length:
# 0 to 252 : 1-byte-length followed by bytes (if any)
# 253 to 65,535 : byte'253' 2-byte-length followed by bytes
# 65,536 to 4,294,967,295 : byte '254' 4-byte-length followed by bytes
# ... and the Bitcoin client is coded to understand:
# greater than 4,294,967,295 : byte '255' 8-byte-length followed by bytes of string
# ... but I don't think it actually handles any strings that big.
if self.input is None:
raise SerializationError("call write(bytes) before trying to deserialize")
return self.read(self.read_compact_size())
try:
length = self.read_compact_size()
except IndexError:
raise SerializationError("attempt to read past end of buffer")
return self.read_bytes(length)
def write_string(self, string):
# Length-encoded as with read-string
self.write_compact_size(len(string))
self.write(string)
def read_bytes(self, length):
try:
result = self.input[self.read_cursor:self.read_cursor + length]
self.read_cursor += length
return result
except IndexError:
raise SerializationError("attempt to read past end of buffer")
return ''
def read_boolean(self):
return self.read_bytes(1)[0] != chr(0)
def read_int16(self):
return self._read_num('<h')
def read_uint16(self):
return self._read_num('<H')
def read_int32(self):
return self._read_num('<i')
def read_uint32(self):
return self._read_num('<I')
def read_int64(self):
return self._read_num('<q')
def read_uint64(self):
return self._read_num('<Q')
def write_boolean(self, val):
return self.write(chr(1) if val else chr(0))
def write_int16(self, val):
return self._write_num('<h', val)
def write_uint16(self, val):
return self._write_num('<H', val)
def write_int32(self, val):
return self._write_num('<i', val)
def write_uint32(self, val):
return self._write_num('<I', val)
def write_int64(self, val):
return self._write_num('<q', val)
def write_uint64(self, val):
return self._write_num('<Q', val)
def read_compact_size(self):
size = ord(self.input[self.read_cursor])
self.read_cursor += 1
if size == 253:
size = self._read_num('<H')
elif size == 254:
size = self._read_num('<I')
elif size == 255:
size = self._read_num('<Q')
return size
def write_compact_size(self, size):
if size < 0:
raise SerializationError("attempt to write size < 0")
elif size < 253:
self.write(chr(size))
elif size < 2 ** 16:
self.write('\xfd')
self._write_num('<H', size)
elif size < 2 ** 32:
self.write('\xfe')
self._write_num('<I', size)
elif size < 2 ** 64:
self.write('\xff')
self._write_num('<Q', size)
def _read_num(self, format):
(i,) = struct.unpack_from(format, self.input, self.read_cursor)
self.read_cursor += struct.calcsize(format)
return i
def _write_num(self, format, num):
s = struct.pack(format, num)
def write_string(self, s):
self.write_compact_size(len(s))
self.write(s)
def read_compact_size(self):
size = self.read_uint8()
if size < 253:
return size
if size == 253:
return self.read_uint16()
elif size == 254:
return self.read_uint32()
elif size == 255:
return self.read_uint64()
def write_compact_size(self, size):
if size < 253:
self.write_uint8(size)
elif size <= 0xFFFF:
self.write_uint8(253)
self.write_uint16(size)
elif size <= 0xFFFFFFFF:
self.write_uint8(254)
self.write_uint32(size)
else:
self.write_uint8(255)
self.write_uint64(size)
def read_boolean(self):
return self.read_uint8() != 0
def write_boolean(self, val):
return self.write_uint8(1 if val else 0)
int8 = struct.Struct('b')
uint8 = struct.Struct('B')
int16 = struct.Struct('<h')
uint16 = struct.Struct('<H')
int32 = struct.Struct('<i')
uint32 = struct.Struct('<I')
int64 = struct.Struct('<q')
uint64 = struct.Struct('<Q')
def _read_struct(self, fmt):
value = self.read(fmt.size)
if len(value) > 0:
return fmt.unpack(value)[0]
def read_int8(self):
return self._read_struct(self.int8)
def read_uint8(self):
return self._read_struct(self.uint8)
def read_int16(self):
return self._read_struct(self.int16)
def read_uint16(self):
return self._read_struct(self.uint16)
def read_int32(self):
return self._read_struct(self.int32)
def read_uint32(self):
return self._read_struct(self.uint32)
def read_int64(self):
return self._read_struct(self.int64)
def read_uint64(self):
return self._read_struct(self.uint64)
def write_int8(self, val):
self.write(self.int8.pack(val))
def write_uint8(self, val):
self.write(self.uint8.pack(val))
def write_int16(self, val):
self.write(self.int16.pack(val))
def write_uint16(self, val):
self.write(self.uint16.pack(val))
def write_int32(self, val):
self.write(self.int32.pack(val))
def write_uint32(self, val):
self.write(self.uint32.pack(val))
def write_int64(self, val):
self.write(self.int64.pack(val))
def write_uint64(self, val):
self.write(self.uint64.pack(val))

426
lbrynet/wallet/script.py Normal file
View file

@ -0,0 +1,426 @@
from itertools import chain
from binascii import hexlify
from collections import namedtuple
from .bcd_data_stream import BCDataStream
from .util import subclass_tuple
# bitcoin opcodes
OP_0 = 0x00
OP_1 = 0x51
OP_16 = 0x60
OP_DUP = 0x76
OP_HASH160 = 0xa9
OP_EQUALVERIFY = 0x88
OP_CHECKSIG = 0xac
OP_CHECKMULTISIG = 0xae
OP_EQUAL = 0x87
OP_PUSHDATA1 = 0x4c
OP_PUSHDATA2 = 0x4d
OP_PUSHDATA4 = 0x4e
OP_2DROP = 0x6d
OP_DROP = 0x75
# lbry custom opcodes
OP_CLAIM_NAME = 0xb5
OP_SUPPORT_CLAIM = 0xb6
OP_UPDATE_CLAIM = 0xb7
# template matching opcodes (not real opcodes)
# base class for PUSH_DATA related opcodes
PUSH_DATA_OP = namedtuple('PUSH_DATA_OP', 'name')
# opcode for variable length strings
PUSH_SINGLE = subclass_tuple('PUSH_SINGLE', PUSH_DATA_OP)
# opcode for variable number of variable length strings
PUSH_MANY = subclass_tuple('PUSH_MANY', PUSH_DATA_OP)
# opcode with embedded subscript parsing
PUSH_SUBSCRIPT = namedtuple('PUSH_SUBSCRIPT', 'name template')
def is_push_data_opcode(opcode):
return isinstance(opcode, PUSH_DATA_OP) or isinstance(opcode, PUSH_SUBSCRIPT)
def is_push_data_token(token):
return 1 <= token <= OP_PUSHDATA4
def push_data(data):
size = len(data)
if size < OP_PUSHDATA1:
yield BCDataStream.uint8.pack(size)
elif size <= 0xFF:
yield BCDataStream.uint8.pack(OP_PUSHDATA1)
yield BCDataStream.uint8.pack(size)
elif size <= 0xFFFF:
yield BCDataStream.uint8.pack(OP_PUSHDATA2)
yield BCDataStream.uint16.pack(size)
else:
yield BCDataStream.uint8.pack(OP_PUSHDATA4)
yield BCDataStream.uint32.pack(size)
yield data
def read_data(token, stream):
if token < OP_PUSHDATA1:
return stream.read(token)
elif token == OP_PUSHDATA1:
return stream.read(stream.read_uint8())
elif token == OP_PUSHDATA2:
return stream.read(stream.read_uint16())
else:
return stream.read(stream.read_uint32())
# opcode for OP_1 - OP_16
SMALL_INTEGER = namedtuple('SMALL_INTEGER', 'name')
def is_small_integer(token):
return OP_1 <= token <= OP_16
def push_small_integer(num):
assert 1 <= num <= 16
yield BCDataStream.uint8.pack(OP_1 + (num - 1))
def read_small_integer(token):
return (token - OP_1) + 1
# tokens contain parsed values to be matched against opcodes
Token = namedtuple('Token', 'value')
DataToken = subclass_tuple('DataToken', Token)
SmallIntegerToken = subclass_tuple('SmallIntegerToken', Token)
def token_producer(source):
token = source.read_uint8()
while token is not None:
if is_push_data_token(token):
yield DataToken(read_data(token, source))
elif is_small_integer(token):
yield SmallIntegerToken(read_small_integer(token))
else:
yield Token(token)
token = source.read_uint8()
def tokenize(source):
return list(token_producer(source))
class ScriptError(Exception):
""" General script handling error. """
class ParseError(ScriptError):
""" Script parsing error. """
class Parser:
def __init__(self, opcodes, tokens):
self.opcodes = opcodes
self.tokens = tokens
self.values = {}
self.token_index = 0
self.opcode_index = 0
def parse(self):
while self.token_index < len(self.tokens) and self.opcode_index < len(self.opcodes):
token = self.tokens[self.token_index]
opcode = self.opcodes[self.opcode_index]
if isinstance(token, DataToken):
if isinstance(opcode, (PUSH_SINGLE, PUSH_SUBSCRIPT)):
self.push_single(opcode, token.value)
elif isinstance(opcode, PUSH_MANY):
self.consume_many_non_greedy()
else:
raise ParseError("DataToken found but opcode was '{}'.".format(opcode))
elif isinstance(token, SmallIntegerToken):
if isinstance(opcode, SMALL_INTEGER):
self.values[opcode.name] = token.value
else:
raise ParseError("SmallIntegerToken found but opcode was '{}'.".format(opcode))
elif token.value == opcode:
pass
else:
raise ParseError("Token is '{}' and opcode is '{}'.".format(token.value, opcode))
self.token_index += 1
self.opcode_index += 1
if self.token_index < len(self.tokens):
raise ParseError("Parse completed without all tokens being consumed.")
if self.opcode_index < len(self.opcodes):
raise ParseError("Parse completed without all opcodes being consumed.")
return self
def consume_many_non_greedy(self):
""" Allows PUSH_MANY to consume data without being greedy
in cases when one or more PUSH_SINGLEs follow a PUSH_MANY. This will
prioritize giving all PUSH_SINGLEs some data and only after that
subsume the rest into PUSH_MANY.
"""
token_values = []
while self.token_index < len(self.tokens):
token = self.tokens[self.token_index]
if not isinstance(token, DataToken):
self.token_index -= 1
break
token_values.append(token.value)
self.token_index += 1
push_opcodes = []
push_many_count = 0
while self.opcode_index < len(self.opcodes):
opcode = self.opcodes[self.opcode_index]
if not is_push_data_opcode(opcode):
self.opcode_index -= 1
break
if isinstance(opcode, PUSH_MANY):
push_many_count += 1
push_opcodes.append(opcode)
self.opcode_index += 1
if push_many_count > 1:
raise ParseError(
"Cannot have more than one consecutive PUSH_MANY, as there is no way to tell which"
" token value should go into which PUSH_MANY."
)
if len(push_opcodes) > len(token_values):
raise ParseError(
"Not enough token values to match all of the PUSH_MANY and PUSH_SINGLE opcodes."
)
many_opcode = push_opcodes.pop(0)
# consume data into PUSH_SINGLE opcodes, working backwards
for opcode in reversed(push_opcodes):
self.push_single(opcode, token_values.pop())
# finally PUSH_MANY gets everything that's left
self.values[many_opcode.name] = token_values
def push_single(self, opcode, value):
if isinstance(opcode, PUSH_SINGLE):
self.values[opcode.name] = value
elif isinstance(opcode, PUSH_SUBSCRIPT):
self.values[opcode.name] = Script.from_source_with_template(value, opcode.template)
else:
raise ParseError("Not a push single or subscript: {}".format(opcode))
class Template(object):
__slots__ = 'name', 'opcodes'
def __init__(self, name, opcodes):
self.name = name
self.opcodes = opcodes
def parse(self, tokens):
return Parser(self.opcodes, tokens).parse().values
def generate(self, values):
source = BCDataStream()
for opcode in self.opcodes:
if isinstance(opcode, PUSH_SINGLE):
data = values[opcode.name]
source.write_many(push_data(data))
elif isinstance(opcode, PUSH_SUBSCRIPT):
data = values[opcode.name]
source.write_many(push_data(data.source))
elif isinstance(opcode, PUSH_MANY):
for data in values[opcode.name]:
source.write_many(push_data(data))
elif isinstance(opcode, SMALL_INTEGER):
data = values[opcode.name]
source.write_many(push_small_integer(data))
else:
source.write_uint8(opcode)
return source.get_bytes()
class Script(object):
__slots__ = 'source', 'template', 'values'
templates = []
def __init__(self, source=None, template=None, values=None, template_hint=None):
self.source = source
self.template = template
self.values = values
if source:
self._parse(template_hint)
elif template and values:
self.source = template.generate(values)
else:
raise ValueError("Either a valid 'source' or a 'template' and 'values' are required.")
@classmethod
def from_source_with_template(cls, source, template):
if template in InputScript.templates:
return InputScript(source, template_hint=template)
elif template in OutputScript.templates:
return OutputScript(source, template_hint=template)
else:
return cls(source, template_hint=template)
def _parse(self, template_hint=None):
tokens = tokenize(BCDataStream(self.source))
for template in chain((template_hint,), self.templates):
if not template:
continue
try:
self.values = template.parse(tokens)
self.template = template
return
except ParseError:
continue
raise ValueError('No matching templates for source: {}'.format(hexlify(self.source)))
class InputScript(Script):
__slots__ = ()
# input / redeem script templates (aka scriptSig)
REDEEM_PUBKEY_HASH = Template('pubkey_hash', (
PUSH_SINGLE('signature'), PUSH_SINGLE('pubkey')
))
REDEEM_SCRIPT = Template('script', (
SMALL_INTEGER('signatures_count'), PUSH_MANY('pubkeys'), SMALL_INTEGER('pubkeys_count'),
OP_CHECKMULTISIG
))
REDEEM_SCRIPT_HASH = Template('script_hash', (
OP_0, PUSH_MANY('signatures'), PUSH_SUBSCRIPT('script', REDEEM_SCRIPT)
))
templates = [
REDEEM_PUBKEY_HASH,
REDEEM_SCRIPT_HASH,
REDEEM_SCRIPT
]
@classmethod
def redeem_pubkey_hash(cls, signature, pubkey):
return cls(template=cls.REDEEM_PUBKEY_HASH, values={
'signature': signature,
'pubkey': pubkey
})
@classmethod
def redeem_script_hash(cls, signatures, pubkeys):
return cls(template=cls.REDEEM_SCRIPT_HASH, values={
'signatures': signatures,
'script': cls.redeem_script(signatures, pubkeys)
})
@classmethod
def redeem_script(cls, signatures, pubkeys):
return cls(template=cls.REDEEM_SCRIPT, values={
'signatures_count': len(signatures),
'pubkeys': pubkeys,
'pubkeys_count': len(pubkeys)
})
class OutputScript(Script):
__slots__ = ()
# output / payment script templates (aka scriptPubKey)
PAY_PUBKEY_HASH = Template('pay_pubkey_hash', (
OP_DUP, OP_HASH160, PUSH_SINGLE('pubkey_hash'), OP_EQUALVERIFY, OP_CHECKSIG
))
PAY_SCRIPT_HASH = Template('pay_script_hash', (
OP_HASH160, PUSH_SINGLE('script_hash'), OP_EQUAL
))
CLAIM_NAME_OPCODES = (
OP_CLAIM_NAME, PUSH_SINGLE('claim_name'), PUSH_SINGLE('claim'),
OP_2DROP, OP_DROP
)
CLAIM_NAME_PUBKEY = Template('claim_name+pay_pubkey_hash', (
CLAIM_NAME_OPCODES + PAY_PUBKEY_HASH.opcodes
))
CLAIM_NAME_SCRIPT = Template('claim_name+pay_script_hash', (
CLAIM_NAME_OPCODES + PAY_SCRIPT_HASH.opcodes
))
SUPPORT_CLAIM_OPCODES = (
OP_SUPPORT_CLAIM, PUSH_SINGLE('claim_name'), PUSH_SINGLE('claim_id'),
OP_2DROP, OP_DROP
)
SUPPORT_CLAIM_PUBKEY = Template('support_claim+pay_pubkey_hash', (
SUPPORT_CLAIM_OPCODES + PAY_PUBKEY_HASH.opcodes
))
SUPPORT_CLAIM_SCRIPT = Template('support_claim+pay_script_hash', (
SUPPORT_CLAIM_OPCODES + PAY_SCRIPT_HASH.opcodes
))
UPDATE_CLAIM_OPCODES = (
OP_UPDATE_CLAIM, PUSH_SINGLE('claim_name'), PUSH_SINGLE('claim_id'), PUSH_SINGLE('claim'),
OP_2DROP, OP_2DROP
)
UPDATE_CLAIM_PUBKEY = Template('update_claim+pay_pubkey_hash', (
UPDATE_CLAIM_OPCODES + PAY_PUBKEY_HASH.opcodes
))
UPDATE_CLAIM_SCRIPT = Template('update_claim+pay_script_hash', (
UPDATE_CLAIM_OPCODES + PAY_SCRIPT_HASH.opcodes
))
templates = [
PAY_PUBKEY_HASH,
PAY_SCRIPT_HASH,
CLAIM_NAME_PUBKEY,
CLAIM_NAME_SCRIPT,
SUPPORT_CLAIM_PUBKEY,
SUPPORT_CLAIM_SCRIPT,
UPDATE_CLAIM_PUBKEY,
UPDATE_CLAIM_SCRIPT
]
@classmethod
def pay_pubkey_hash(cls, pubkey_hash):
return cls(template=cls.PAY_PUBKEY_HASH, values={
'pubkey_hash': pubkey_hash
})
@classmethod
def pay_script_hash(cls, script_hash):
return cls(template=cls.PAY_SCRIPT_HASH, values={
'script_hash': script_hash
})
@classmethod
def pay_claim_name_pubkey_hash(cls, claim_name, claim, pubkey_hash):
return cls(template=cls.CLAIM_NAME_PUBKEY, values={
'claim_name': claim_name,
'claim': claim,
'pubkey_hash': pubkey_hash
})
@property
def is_claim_name(self):
return self.template.name.startswith('claim_name+')
@property
def is_support_claim(self):
return self.template.name.startswith('support_claim+')
@property
def is_update_claim(self):
return self.template.name.startswith('update_claim+')
@property
def is_claim_involved(self):
return self.is_claim_name or self.is_support_claim or self.is_update_claim

View file

@ -8,6 +8,10 @@ from .constants import NO_SIGNATURE
log = logging.getLogger(__name__)
def subclass_tuple(name, base):
return type(name, (base,), {'__slots__': ()})
def normalize_version(v):
return [int(x) for x in re.sub(r'(\.0+)*$', '', v).split(".")]