new API parser

This commit is contained in:
Lex Berezhny 2020-05-12 11:02:34 -04:00
parent 8dc5150dbe
commit 955e44631d
3 changed files with 712 additions and 0 deletions

View file

@ -0,0 +1,332 @@
import logging
from decimal import Decimal
from binascii import hexlify, unhexlify
from datetime import datetime, date
from json import JSONEncoder
from google.protobuf.message import DecodeError
from lbry.schema.claim import Claim
from lbry.wallet.wallet import Wallet, Account
from lbry.blockchain.transaction import Transaction, Output
from lbry.crypto.bip32 import PubKey
from lbry.blockchain.dewies import dewies_to_lbc
from import ManagedStream
log = logging.getLogger(__name__)
output_doc = {
'txid': "hash of transaction in hex",
'nout': "position in the transaction",
'height': "block where transaction was recorded",
'amount': "value of the txo as a decimal",
'address': "address of who can spend the txo",
'confirmations': "number of confirmed blocks",
'is_change': "payment to change address, only available when it can be determined",
'is_received': "true if txo was sent from external account to this account",
'is_spent': "true if txo is spent",
'is_mine': "payment to one of your accounts, only available when it can be determined",
'type': "one of 'claim', 'support' or 'purchase'",
'name': "when type is 'claim' or 'support', this is the claim name",
'claim_id': "when type is 'claim', 'support' or 'purchase', this is the claim id",
'claim_op': "when type is 'claim', this determines if it is 'create' or 'update'",
'value': "when type is 'claim' or 'support' with payload, this is the decoded protobuf payload",
'value_type': "determines the type of the 'value' field: 'channel', 'stream', etc",
'protobuf': "hex encoded raw protobuf version of 'value' field",
'permanent_url': "when type is 'claim' or 'support', this is the long permanent claim URL",
'claim': "for purchase outputs only, metadata of purchased claim",
'reposted_claim': "for repost claims only, metadata of claim being reposted",
'signing_channel': "for signed claims only, metadata of signing channel",
'is_channel_signature_valid': "for signed claims only, whether signature is valid",
'purchase_receipt': "metadata for the purchase transaction associated with this claim"
transaction_doc = {
'txid': "hash of transaction in hex",
'height': "block where transaction was recorded",
'inputs': [output_doc],
'outputs': [output_doc],
'total_input': "sum of inputs as a decimal",
'total_output': "sum of outputs, sans fee, as a decimal",
'total_fee': "fee amount",
'hex': "entire transaction encoded in hex",
account_doc = {
'id': 'account_id',
'is_default': 'this account is used by default',
'ledger': 'name of crypto currency and network',
'name': 'optional account name',
'seed': 'human friendly words from which account can be recreated',
'encrypted': 'if account is encrypted',
'private_key': 'extended private key',
'public_key': 'extended public key',
'address_generator': 'settings for generating addresses',
'modified_on': 'date of last modification to account settings'
wallet_doc = {
'id': 'wallet_id',
'name': 'optional wallet name',
managedstream_doc = {
'streaming_url': '(str) url to stream the file using range requests',
'completed': '(bool) true if download is completed',
'file_name': '(str) name of file',
'download_directory': '(str) download directory',
'points_paid': '(float) credit paid to download file',
'stopped': '(bool) true if download is stopped',
'stream_hash': '(str) stream hash of file',
'stream_name': '(str) stream name',
'suggested_file_name': '(str) suggested file name',
'sd_hash': '(str) sd hash of file',
'download_path': '(str) download path of file',
'mime_type': '(str) mime type of file',
'key': '(str) key attached to file',
'total_bytes_lower_bound': '(int) lower bound file size in bytes',
'total_bytes': '(int) file upper bound size in bytes',
'written_bytes': '(int) written size in bytes',
'blobs_completed': '(int) number of fully downloaded blobs',
'blobs_in_stream': '(int) total blobs on stream',
'blobs_remaining': '(int) total blobs remaining to download',
'status': '(str) downloader status',
'claim_id': '(str) None if claim is not found else the claim id',
'txid': '(str) None if claim is not found else the transaction id',
'nout': '(int) None if claim is not found else the transaction output index',
'outpoint': '(str) None if claim is not found else the tx and output',
'metadata': '(dict) None if claim is not found else the claim metadata',
'channel_claim_id': '(str) None if claim is not found or not signed',
'channel_name': '(str) None if claim is not found or not signed',
'claim_name': '(str) None if claim is not found else the claim name'
address_doc = {
def encode_pagination_doc(items):
return {
"page": "Page number of the current items.",
"page_size": "Number of items to show on a page.",
"total_pages": "Total number of pages.",
"total_items": "Total number of items.",
"items": [items],
class JSONResponseEncoder(JSONEncoder):
def __init__(self, *args, service, include_protobuf=False, **kwargs):
super().__init__(*args, **kwargs)
self.service = service
self.include_protobuf = include_protobuf
def default(self, obj): # pylint: disable=method-hidden,arguments-differ,too-many-return-statements
if isinstance(obj, Account):
return self.encode_account(obj)
if isinstance(obj, Wallet):
return self.encode_wallet(obj)
if isinstance(obj, ManagedStream):
return self.encode_file(obj)
if isinstance(obj, Transaction):
return self.encode_transaction(obj)
if isinstance(obj, Output):
return self.encode_output(obj)
if isinstance(obj, Claim):
return self.encode_claim(obj)
if isinstance(obj, PubKey):
return obj.extended_key_string()
if isinstance(obj, date):
return obj.isoformat()
if isinstance(obj, datetime):
return obj.strftime("%Y%m%dT%H:%M:%S")
if isinstance(obj, Decimal):
return float(obj)
if isinstance(obj, bytes):
return obj.decode()
return super().default(obj)
def encode_transaction(self, tx):
return {
'height': tx.height,
'inputs': [self.encode_input(txo) for txo in tx.inputs],
'outputs': [self.encode_output(txo) for txo in tx.outputs],
'total_input': dewies_to_lbc(tx.input_sum),
'total_output': dewies_to_lbc(tx.input_sum - tx.fee),
'total_fee': dewies_to_lbc(tx.fee),
'hex': hexlify(tx.raw).decode(),
def encode_output(self, txo, check_signature=True):
if not txo:
tx_height = txo.tx_ref.height
best_height = 0#self.ledger.headers.height
output = {
'nout': txo.position,
'height': tx_height,
'amount': dewies_to_lbc(txo.amount),
'address': txo.get_address(self.service.ledger) if txo.has_address else None,
'confirmations': (best_height+1) - tx_height if tx_height > 0 else tx_height,
'timestamp': 0 #self.ledger.headers.estimated_timestamp(tx_height)
if txo.is_spent is not None:
output['is_spent'] = txo.is_spent
if txo.is_my_output is not None:
output['is_my_output'] = txo.is_my_output
if txo.is_my_input is not None:
output['is_my_input'] = txo.is_my_input
if txo.sent_supports is not None:
output['sent_supports'] = dewies_to_lbc(txo.sent_supports)
if txo.sent_tips is not None:
output['sent_tips'] = dewies_to_lbc(txo.sent_tips)
if txo.received_tips is not None:
output['received_tips'] = dewies_to_lbc(txo.received_tips)
if txo.is_internal_transfer is not None:
output['is_internal_transfer'] = txo.is_internal_transfer
if txo.script.is_claim_name:
output['type'] = 'claim'
output['claim_op'] = 'create'
elif txo.script.is_update_claim:
output['type'] = 'claim'
output['claim_op'] = 'update'
elif txo.script.is_support_claim:
output['type'] = 'support'
elif txo.script.is_return_data:
output['type'] = 'data'
elif txo.purchase is not None:
output['type'] = 'purchase'
output['claim_id'] = txo.purchased_claim_id
if txo.purchased_claim is not None:
output['claim'] = self.encode_output(txo.purchased_claim)
output['type'] = 'payment'
if txo.script.is_claim_involved:
'name': txo.claim_name,
'normalized_name': txo.normalized_name,
'claim_id': txo.claim_id,
'permanent_url': txo.permanent_url,
'meta': self.encode_claim_meta(txo.meta.copy())
if 'short_url' in output['meta']:
output['short_url'] = output['meta'].pop('short_url')
if 'canonical_url' in output['meta']:
output['canonical_url'] = output['meta'].pop('canonical_url')
if is not None:
output['claims'] = [self.encode_output(o) for o in]
if txo.reposted_claim is not None:
output['reposted_claim'] = self.encode_output(txo.reposted_claim)
if txo.script.is_claim_name or txo.script.is_update_claim:
output['value'] = txo.claim
output['value_type'] = txo.claim.claim_type
if self.include_protobuf:
output['protobuf'] = hexlify(txo.claim.to_bytes())
if txo.purchase_receipt is not None:
output['purchase_receipt'] = self.encode_output(txo.purchase_receipt)
if txo.claim.is_channel:
output['has_signing_key'] = txo.has_private_key
if check_signature and txo.claim.is_signed:
if is not None:
output['signing_channel'] = self.encode_output(
output['is_channel_signature_valid'] = txo.is_signed_by(, self.service.ledger)
output['signing_channel'] = {'channel_id': txo.claim.signing_channel_id}
output['is_channel_signature_valid'] = False
except DecodeError:
return output
def encode_claim_meta(self, meta):
for key, value in meta.items():
if key.endswith('_amount'):
if isinstance(value, int):
meta[key] = dewies_to_lbc(value)
if 0 < meta.get('creation_height', 0) <= 0: #self.ledger.headers.height:
meta['creation_timestamp'] = self.ledger.headers.estimated_timestamp(meta['creation_height'])
return meta
def encode_input(self, txi):
return self.encode_output(txi.txo_ref.txo, False) if txi.txo_ref.txo is not None else {
'nout': txi.txo_ref.position
def encode_account(self, account):
result = account.to_dict()
result['id'] =
result.pop('certificates', None)
#result['is_default'] = self.ledger.accounts[0] == account
return result
def encode_wallet(wallet):
return {
def encode_file(self, managed_stream):
output_exists = managed_stream.output_file_exists
tx_height = managed_stream.stream_claim_info.height
best_height = 0 #self.ledger.headers.height
return {
'streaming_url': managed_stream.stream_url,
'completed': managed_stream.completed,
'file_name': managed_stream.file_name if output_exists else None,
'download_directory': managed_stream.download_directory if output_exists else None,
'download_path': managed_stream.full_path if output_exists else None,
'points_paid': 0.0,
'stopped': not managed_stream.running,
'stream_hash': managed_stream.stream_hash,
'stream_name': managed_stream.descriptor.stream_name,
'suggested_file_name': managed_stream.descriptor.suggested_file_name,
'sd_hash': managed_stream.descriptor.sd_hash,
'mime_type': managed_stream.mime_type,
'key': managed_stream.descriptor.key,
'total_bytes_lower_bound': managed_stream.descriptor.lower_bound_decrypted_length(),
'total_bytes': managed_stream.descriptor.upper_bound_decrypted_length(),
'written_bytes': managed_stream.written_bytes,
'blobs_completed': managed_stream.blobs_completed,
'blobs_in_stream': managed_stream.blobs_in_stream,
'blobs_remaining': managed_stream.blobs_remaining,
'status': managed_stream.status,
'claim_id': managed_stream.claim_id,
'txid': managed_stream.txid,
'nout': managed_stream.nout,
'outpoint': managed_stream.outpoint,
'metadata': managed_stream.metadata,
'protobuf': managed_stream.metadata_protobuf,
'channel_claim_id': managed_stream.channel_claim_id,
'channel_name': managed_stream.channel_name,
'claim_name': managed_stream.claim_name,
'content_fee': managed_stream.content_fee,
'purchase_receipt': self.encode_output(managed_stream.purchase_receipt),
'added_on': managed_stream.added_on,
'height': tx_height,
'confirmations': (best_height + 1) - tx_height if tx_height > 0 else tx_height,
'timestamp': 0, #self.ledger.headers.estimated_timestamp(tx_height),
'is_fully_reflected': managed_stream.is_fully_reflected
def encode_claim(self, claim):
encoded = getattr(claim, claim.claim_type).to_dict()
if 'public_key' in encoded:
encoded['public_key_id'] = self.service.ledger.public_key_to_address(
return encoded

lbry/service/ Normal file
View file

@ -0,0 +1,246 @@
import json
import inspect
import textwrap
import tokenize
import token
from io import BytesIO
from typing import Tuple, List
from lbry.service import api
from lbry.service import json_encoder
def parse_description(desc) -> dict:
lines = iter(desc.splitlines())
parts = {'text': []}
current = parts['text']
for line in lines:
if line.strip() in ('Usage:', 'Options:', 'Returns:'):
current = parts.setdefault(line.strip().lower()[:-1], [])
if line.strip():
return parts
def parse_type(tokens: List) -> Tuple[str, str]:
type_ = [tokens.pop(0).string]
if tokens and tokens[0].string == '[':
while tokens[0].string != ']':
json_ = None
if type_ == ['StrOrList']:
type_ = ['str, list']
elif type_[0] == 'Paginated':
json_ = json_encoder.encode_pagination_doc(
getattr(json_encoder, f'{type_[2].lower()}_doc')
return ''.join(type_), json_
# obj_type = result[1:-1]
# if '[' in obj_type:
# sub_type = obj_type[obj_type.index('[') + 1:-1]
# obj_type = obj_type[:obj_type.index('[')]
# if obj_type == 'Paginated':
# obj_def = encode_pagination_doc(RETURN_DOCS[sub_type])
# elif obj_type == 'List':
# obj_def = [RETURN_DOCS[sub_type]]
# else:
# raise NameError(f'Unknown return type: {obj_type}')
# else:
# obj_def = RETURN_DOCS[obj_type]
# return indent(json.dumps(obj_def, indent=4), ' ' * 12)
def parse_argument(tokens, method_name='') -> dict:
arg = {
'name': tokens.pop(0).string,
'desc': []
if arg['name'] == 'self':
return {}
if tokens[0].string == ':':
type_tokens = []
while tokens[0].string not in ('=', ',', ')') and tokens[0].type != token.COMMENT:
arg['type'] = parse_type(type_tokens)[0]
if tokens[0].string == '=':
default = tokens.pop(0)
if default.type == token.NAME:
default_value = eval(default.string)
if default_value is not None:
arg['default'] = default_value
elif default.type == token.NUMBER:
arg['default'] = int(default.string)
arg['default'] = default.string
if tokens[0].string == ',':
if 'type' not in arg:
if 'default' in arg:
arg['type'] = type(arg['default']).__name__
raise Exception(f"No type and no default value for {arg['name']} in {method_name}")
for part in tokens:
if part.type == token.COMMENT:
return arg
def produce_argument_tokens(src: str):
in_args = False
in_comment = False
parsed = []
for t in tokenize.tokenize(BytesIO(src.encode()).readline):
if t.string == '(':
in_args = True
elif in_args:
if not in_comment and t.string == ',':
in_comment = True
elif in_comment and (t.type == token.NAME or t.string == '**'):
yield parsed
in_comment = False
parsed = []
if t.type in (token.NAME, token.OP, token.COMMENT, token.STRING, token.NUMBER):
if t.string == ')':
yield parsed
def parse_return(tokens) -> dict:
d = {'desc': []}
if tokens[0].string == '->':
type_tokens = []
while tokens[0].string != ':':
d['type'], _ = parse_type(type_tokens)
if _:
d['json'] = _
assert tokens.pop(0).string == ':'
for part in tokens:
if part.type == token.COMMENT:
return d
def produce_return_tokens(src: str):
in_return = False
parsed = []
for t in tokenize.tokenize(BytesIO(src.encode()).readline):
if t.string == ')':
in_return = True
elif in_return:
if t.type == token.INDENT:
return parsed
def parse_method(method, expanders: dict) -> dict:
d = {
'name': method.__name__,
'desc': parse_description(textwrap.dedent(method.__doc__)) if method.__doc__ else '',
'method': method,
'arguments': [],
'returns': None
src = inspect.getsource(method)
for tokens in produce_argument_tokens(src):
if tokens[0].string == '**':
expander_name = tokens.pop(0).string[:-7]
if expander_name not in expanders:
raise Exception(f"Expander '{expander_name}' not found, used by {d['name']}.")
expander = expanders[expander_name]
arg = parse_argument(tokens, d['name'])
if arg:
d['returns'] = parse_return(produce_return_tokens(src))
return d
def get_expanders():
expanders = {}
for e in api.kwarg_expanders:
expanders[e.__name__] = parse_method(e, expanders)['arguments']
return expanders
def get_groups(cls):
return {
group_name[:-len('_DOC')].lower(): getattr(cls, group_name).strip()
for group_name in dir(cls) if group_name.endswith('_DOC')
def get_methods(cls):
expanders = get_expanders()
return {
method: parse_method(getattr(cls, method), expanders)
for method in dir(cls) if not method.endswith('_DOC') and not method.startswith('_')
def generate_options(method, indent):
flags = []
for arg in method['arguments']:
if arg['type'] == 'bool':
max_len = max(len(f) for f in flags) + 1
flags = [f.ljust(max_len) for f in flags]
options = []
for flag, arg in zip(flags, method['arguments']):
line = [f"{indent}{flag}: ({arg['type']}) {' '.join(arg['desc'])}"]
if 'default' in arg:
line.append(f" [default: {arg['default']}]")
return options
def augment_description(command):
def get_api_definitions(cls):
groups = get_groups(cls)
commands = get_methods(cls)
for name, command in commands.items():
parts = name.split('_')
if parts[0] in groups:
command['name'] = '_'.join(parts[1:])
command['group'] = parts[0]
#command['desc'] =
return {'groups': groups, 'commands': commands}
def write(fp):
fp.write(f'interface = ')
defs = get_api_definitions(api.API)
for c in defs['commands'].values():
del c['method']
j = json.dumps(defs, indent=4)
j = j.replace(': false', ': False')
j = j.replace(': true', ': True')
j = j.replace(': null', ': None')
def main():
with open('', 'w') as fp:
if __name__ == "__main__":

View file

@ -0,0 +1,134 @@
from unittest import TestCase
from lbry.service.api import Paginated, Wallet
from lbry.service.parser import (
parse_method, get_expanders, get_api_definitions,
class FakeAPI:
THING_DOC = "thing doc"
def thing_create(
name: str, # the name
value1='hi', # the first value
value2=9 # the second value
) -> str: # thing name
"""create command doc"""
def thing_list(
value1: str = None, # the first value
value2: int = None, # the second value
value3=False, # a bool
# multi-line
) -> Paginated[Wallet]: # list of wallets
"""list command doc"""
def not_grouped(self) -> str: # some string
group command doc
not_grouped [--foo]
--foo : (bool) blah
(str) blah
class TestParser(TestCase):
maxDiff = None
def test_parse_method(self):
expanders = get_expanders()
parse_method(FakeAPI.thing_create, expanders), {
'name': 'thing_create',
'desc': {'text': ['create command doc']},
'method': FakeAPI.thing_create,
'arguments': [
{'name': 'name', 'type': 'str', 'desc': ['the name']},
{'name': 'value1', 'type': 'str', 'default': "'hi'", 'desc': ['the first value']},
{'name': 'value2', 'type': 'int', 'default': 9, 'desc': ['the second value']},
'returns': {
'type': 'str',
'desc': ['thing name']
parse_method(FakeAPI.thing_list, expanders), {
'name': 'thing_list',
'desc': {'text': ['list command doc']},
'method': FakeAPI.thing_list,
'arguments': [
{'name': 'value1', 'type': 'str', 'desc': ['the first value']},
{'name': 'value2', 'type': 'int', 'desc': ['the second value']},
{'name': 'value3', 'type': 'bool', 'default': False, 'desc': ['a bool', 'multi-line']},
{'name': 'page', 'type': 'int', 'desc': ['page to return during paginating']},
{'name': 'page_size', 'type': 'int', 'desc': ['number of items on page during pagination']}
'returns': {
'type': 'Paginated[Wallet]',
'desc': ['list of wallets'],
'json': {
'page': 'Page number of the current items.',
'page_size': 'Number of items to show on a page.',
'total_pages': 'Total number of pages.',
'total_items': 'Total number of items.',
'items': [
{'id': 'wallet_id', 'name': 'optional wallet name'}
parse_method(FakeAPI.not_grouped, expanders), {
'name': 'not_grouped',
'desc': {
'text': ['group command doc'],
'usage': [' not_grouped [--foo]'],
'options': [' --foo : (bool) blah'],
'returns': [' (str) blah']
'method': FakeAPI.not_grouped,
'arguments': [],
'returns': {'desc': ['some string'], 'type': 'str'}
def test_get_api_definitions(self):
defs = get_api_definitions(FakeAPI)
self.assertEqual({'groups', 'commands'}, set(defs))
self.assertEqual(defs['groups'], {'thing': 'thing doc'})
self.assertEqual(defs['commands']['thing_create']['group'], 'thing')
self.assertEqual(defs['commands']['thing_create']['name'], 'create')
self.assertEqual(defs['commands']['thing_list']['group'], 'thing')
self.assertEqual(defs['commands']['thing_list']['name'], 'list')
self.assertEqual(defs['commands']['not_grouped']['name'], 'not_grouped')
self.assertNotIn('group', defs['commands']['not_grouped'])
class TestGenerator(TestCase):
maxDiff = None
def test_generate_options(self):
expanders = get_expanders()
generate_options(parse_method(FakeAPI.thing_list, expanders), indent=' '), [
' --value1=<value1> : (str) the first value',
' --value2=<value2> : (int) the second value',
' --value3 : (bool) a bool multi-line [default: False]',
' --page=<page> : (int) page to return during paginating',
' --page_size=<page_size> : (int) number of items on page during pagination'