lbry-sdk/lbry/service/parser.py
Lex Berezhny 3ef83febc0 progress
2020-06-05 00:35:22 -04:00

306 lines
9.6 KiB
Python

import json
import inspect
import textwrap
import tokenize
import token
from io import BytesIO
from typing import Tuple, List
from lbry.service import api
from lbry.service import json_encoder
LINE_WIDTH = 90
def parse_description(desc) -> dict:
lines = iter(desc.splitlines())
parts = {'text': []}
current = parts['text']
for line in lines:
if line.strip() in ('Usage:', 'Options:', 'Returns:'):
current = parts.setdefault(line.strip().lower()[:-1], [])
else:
if line.strip():
if line.strip() == '{kwargs}':
parts['kwargs'] = line.find('{kwargs}')
else:
current.append(line)
return parts
def parse_type(tokens: List) -> Tuple[str, str]:
type_ = [tokens.pop(0).string]
if tokens and tokens[0].string == '[':
while tokens[0].string != ']':
type_.append(tokens.pop(0).string)
type_.append(tokens.pop(0).string)
json_ = None
if type_ == ['StrOrList']:
type_ = ['str, list']
elif type_[0] == 'Paginated':
json_ = json_encoder.encode_pagination_doc(
getattr(json_encoder, f'{type_[2].lower()}_doc')
)
elif len(type_) == 1 and hasattr(json_encoder, f'{type_[0].lower()}_doc'):
json_ = getattr(json_encoder, f'{type_[0].lower()}_doc')
return ''.join(type_), json_
def parse_argument(tokens, method_name='') -> dict:
arg = {
'name': tokens.pop(0).string,
'desc': []
}
if arg['name'] == 'self':
return {}
if tokens[0].string == ':':
tokens.pop(0)
type_tokens = []
while tokens[0].string not in ('=', ',', ')') and tokens[0].type != token.COMMENT:
type_tokens.append(tokens.pop(0))
arg['type'] = parse_type(type_tokens)[0]
if tokens[0].string == '=':
tokens.pop(0)
default = tokens.pop(0)
if default.type == token.NAME:
default_value = eval(default.string) # pylint: disable=eval-used
if default_value is not None:
arg['default'] = default_value
elif default.type == token.NUMBER:
arg['default'] = int(default.string)
else:
arg['default'] = default.string
if tokens[0].string == ',':
tokens.pop(0)
if 'type' not in arg:
if 'default' in arg:
arg['type'] = type(arg['default']).__name__
else:
raise Exception(f"No type and no default value for {arg['name']} in {method_name}")
for part in tokens:
if part.type == token.COMMENT:
arg['desc'].append(part.string[1:].strip())
return arg
def produce_argument_tokens(src: str):
in_args = False
in_comment = False
parsed = []
for t in tokenize.tokenize(BytesIO(src.encode()).readline):
if t.string == '(':
in_args = True
elif in_args:
if not in_comment and t.string == ',':
in_comment = True
elif in_comment and (t.type == token.NAME or t.string == '**'):
if not parsed[0].string.startswith('_'):
yield parsed
in_comment = False
parsed = []
if t.type in (token.NAME, token.OP, token.COMMENT, token.STRING, token.NUMBER):
parsed.append(t)
if t.string == ')':
if not parsed[0].string.startswith('_'):
yield parsed
break
def parse_return(tokens) -> dict:
d = {'desc': [], 'type': None}
if tokens[0].string == '->':
tokens.pop(0)
type_tokens = []
while tokens[0].string != ':':
type_tokens.append(tokens.pop(0))
d['type'], _ = parse_type(type_tokens)
if _:
d['json'] = _
assert tokens.pop(0).string == ':'
for part in tokens:
if part.type == token.COMMENT:
d['desc'].append(part.string[1:].strip())
return d
def produce_return_tokens(src: str):
in_return = False
parsed = []
for t in tokenize.tokenize(BytesIO(src.encode()).readline):
if t.string == ')':
in_return = True
elif in_return:
if t.type == token.INDENT:
break
parsed.append(t)
return parsed
def parse_method(method, expanders: dict) -> dict:
d = {
'name': method.__name__,
'desc': parse_description(textwrap.dedent(method.__doc__)) if method.__doc__ else {},
'method': method,
'arguments': [],
'returns': None
}
src = inspect.getsource(method)
for tokens in produce_argument_tokens(src):
if tokens[0].string == '**':
tokens.pop(0)
d['kwargs'] = []
expander_names = tokens.pop(0).string[:-7]
if expander_names.startswith('_'):
continue
for expander_name in expander_names.split('_and_'):
if expander_name not in expanders:
raise Exception(f"Expander '{expander_name}' not found, used by {d['name']}.")
d['arguments'].extend(expanders[expander_name])
d['kwargs'].extend(expanders[expander_name])
else:
arg = parse_argument(tokens, d['name'])
if arg:
d['arguments'].append(arg)
d['returns'] = parse_return(produce_return_tokens(src))
return d
def get_expanders():
expanders = {}
for name, func in api.kwarg_expanders.items():
if name.endswith('_original'):
expanders[name[:-len('_original')]] = parse_method(func, expanders)['arguments']
return expanders
def get_groups(cls):
return {
group_name[:-len('_DOC')].lower(): getattr(cls, group_name).strip()
for group_name in dir(cls) if group_name.endswith('_DOC')
}
def get_methods(cls):
expanders = get_expanders()
return {
method: parse_method(getattr(cls, method), expanders)
for method in dir(cls) if not method.endswith('_DOC') and not method.startswith('_')
}
def generate_options(method, indent) -> List[str]:
if not method['arguments']:
return []
flags = []
for arg in method['arguments']:
if arg['type'] == 'bool':
flags.append(f"--{arg['name']}")
else:
flags.append(f"--{arg['name']}=<{arg['name']}>")
max_len = max(len(f) for f in flags) + 1
flags = [f.ljust(max_len) for f in flags]
options = []
for flag, arg in zip(flags, method['arguments']):
left = f"{indent}{flag}: "
text = f"({arg['type']}) {' '.join(arg['desc'])}"
if 'default' in arg:
if arg['type'] != 'bool':
text += f" [default: {arg['default']}]"
wrapped = textwrap.wrap(text, LINE_WIDTH-len(left))
lines = [f"{left}{wrapped.pop(0)}"]
for line in wrapped:
lines.append(f"{' '*len(left)} {line}")
options.extend(lines)
return options
def generate_help(command):
indent = 4
text = []
desc = command['desc']
for line in desc.get('text', []):
text.append(line)
text.append('')
usage, kwargs_offset = desc.get('usage', []), desc.get('kwargs', False)
text.append('Usage:')
if usage:
for line in usage:
text.append(line)
else:
text.append(f"{' '*indent}{command['cli']}")
if kwargs_offset:
flags = []
for arg in command['kwargs']:
if arg['type'] == 'bool':
flags.append(f"[--{arg['name']}]")
elif 'list' in arg['type']:
flags.append(f"[--{arg['name']}=<{arg['name']}>...]")
else:
flags.append(f"[--{arg['name']}=<{arg['name']}>]")
wrapped = textwrap.wrap(' '.join(flags), LINE_WIDTH-kwargs_offset)
for line in wrapped:
text.append(f"{' '*kwargs_offset}{line}")
text.append('')
options = desc.get('options', [])
if options or command['arguments']:
text.append('Options:')
for line in options:
text.append(line)
text.extend(generate_options(command, ' '*indent))
text.append('')
returns = desc.get('returns', [])
if returns or command['returns']['type']:
text.append('Returns:')
if command['returns']['type']:
return_comment = ' '.join(command['returns']['desc'])
text.append(f"{' '*indent}({command['returns']['type']}) {return_comment}")
text.extend(returns)
if 'json' in command['returns']:
dump = json.dumps(command['returns']['json'], indent=4)
text.extend(textwrap.indent(dump, ' '*indent).splitlines())
return '\n'.join(text)
def get_api_definitions(cls):
groups = get_groups(cls)
commands = get_methods(cls)
for name, command in commands.items():
parts = name.split('_')
if parts[0] in groups:
command['name'] = '_'.join(parts[1:])
command['group'] = parts[0]
command['cli'] = f"{command['group']} {command['name']}"
else:
command['cli'] = command['name']
command['help'] = generate_help(command)
return {'groups': groups, 'commands': commands}
def write(fp):
fp.write('# pylint: skip-file\n')
fp.write('# DO NOT EDIT: GENERATED FILE\n')
fp.write(f'interface = ')
defs = get_api_definitions(api.API)
for c in defs['commands'].values():
del c['method']
j = json.dumps(defs, indent=4)
j = j.replace(': false', ': False')
j = j.replace(': true', ': True')
j = j.replace(': null', ': None')
fp.write(j)
fp.write('\n')
def main():
with open('metadata.py', 'w') as fp:
write(fp)
if __name__ == "__main__":
main()