Unified CLI, python 3(WIP) (#1330)

* Added new custom cli class using aiohttp
* Proper error handling in CLI based on RPC error codes(PoC)
* Auth API working
* UnitTests
This commit is contained in:
hackrush 2018-08-05 02:49:10 +05:30 committed by Jack Robison
parent f41229cb5b
commit a7ef8889dd
No known key found for this signature in database
GPG key ID: DF25C68FE0239BB2
14 changed files with 280 additions and 158 deletions

View file

@ -1,36 +1,52 @@
import sys
import json
import asyncio
import aiohttp
from aiohttp.client_exceptions import ClientConnectorError
from requests.exceptions import ConnectionError
from docopt import docopt
from textwrap import dedent
from lbrynet.daemon.auth.client import LBRYAPIClient
from lbrynet.core.system_info import get_platform
from lbrynet.daemon.Daemon import Daemon
from lbrynet.daemon.DaemonControl import start
from lbrynet.daemon.DaemonControl import start as daemon_main
from lbrynet.daemon.DaemonConsole import main as daemon_console
async def execute_command(command, args):
message = {'method': command, 'params': args}
async with aiohttp.ClientSession() as session:
async with session.get('http://localhost:5279/lbryapi', json=message) as resp:
print(json.dumps(await resp.json(), indent=4))
async def execute_command(method, params, conf_path=None):
# this check if the daemon is running or not
try:
api = LBRYAPIClient.get_client(conf_path)
await api.status()
except (ClientConnectorError, ConnectionError):
print("Could not connect to daemon. Are you sure it's running?")
return 1
# this actually executes the method
try:
resp = await api.call(method, params)
print(json.dumps(resp["result"], indent=2))
except KeyError:
if resp["error"]["code"] == -32500:
print(json.dumps(resp["error"], indent=2))
else:
print(json.dumps(resp["error"]["message"], indent=2))
def print_help():
print(dedent("""
NAME
lbry - LBRY command line client.
lbrynet - LBRY command line client.
USAGE
lbry [--conf <config file>] <command> [<args>]
lbrynet [--conf <config file>] <command> [<args>]
EXAMPLES
lbry commands # list available commands
lbry status # get daemon status
lbry --conf ~/l1.conf status # like above but using ~/l1.conf as config file
lbry resolve_name what # resolve a name
lbry help resolve_name # get help for a command
lbrynet commands # list available commands
lbrynet status # get daemon status
lbrynet --conf ~/l1.conf status # like above but using ~/l1.conf as config file
lbrynet resolve_name what # resolve a name
lbrynet help resolve_name # get help for a command
"""))
@ -42,14 +58,14 @@ def print_help_for_command(command):
print("Invalid command name")
def guess_type(x, key=None):
def normalize_value(x, key=None):
if not isinstance(x, str):
return x
if key in ('uri', 'channel_name', 'name', 'file_name', 'download_directory'):
return x
if x in ('true', 'True', 'TRUE'):
if x.lower() == 'true':
return True
if x in ('false', 'False', 'FALSE'):
if x.lower() == 'false':
return False
if '.' in x:
try:
@ -79,7 +95,7 @@ def set_kwargs(parsed_args):
k = remove_brackets(key[2:])
elif remove_brackets(key) not in kwargs:
k = remove_brackets(key)
kwargs[k] = guess_type(arg, k)
kwargs[k] = normalize_value(arg, k)
return kwargs
@ -89,6 +105,16 @@ def main(argv=None):
print_help()
return 1
conf_path = None
if len(argv) and argv[0] == "--conf":
if len(argv) < 2:
print("No config file specified for --conf option")
print_help()
return 1
conf_path = argv[1]
argv = argv[2:]
method, args = argv[0], argv[1:]
if method in ['help', '--help', '-h']:
@ -96,24 +122,31 @@ def main(argv=None):
print_help_for_command(args[0])
else:
print_help()
return 0
elif method in ['version', '--version', '-v']:
print(json.dumps(get_platform(get_ip=False), sort_keys=True, indent=4, separators=(',', ': ')))
print(json.dumps(get_platform(get_ip=False), sort_keys=True, indent=2, separators=(',', ': ')))
return 0
elif method == 'start':
start(args)
sys.exit(daemon_main(args, conf_path))
elif method == 'console':
sys.exit(daemon_console())
elif method not in Daemon.callable_methods:
print('"{}" is not a valid command.'.format(method))
return 1
if method not in Daemon.deprecated_methods:
print('{} is not a valid command.'.format(method))
return 1
new_method = Daemon.deprecated_methods[method].new_command
print("{} is deprecated, using {}.".format(method, new_method))
method = new_method
else:
fn = Daemon.callable_methods[method]
parsed = docopt(fn.__doc__, args)
kwargs = set_kwargs(parsed)
loop = asyncio.get_event_loop()
loop.run_until_complete(execute_command(method, kwargs))
fn = Daemon.callable_methods[method]
parsed = docopt(fn.__doc__, args)
params = set_kwargs(parsed)
loop = asyncio.get_event_loop()
loop.run_until_complete(execute_command(method, params, conf_path))
return 0

View file

@ -1,3 +1,7 @@
class RPCError(Exception):
code = 0
class PriceDisagreementError(Exception):
pass
@ -41,8 +45,8 @@ class NullFundsError(Exception):
pass
class InsufficientFundsError(Exception):
pass
class InsufficientFundsError(RPCError):
code = -310
class ConnectionClosedBeforeResponseError(Exception):
@ -77,11 +81,13 @@ class UnknownURI(Exception):
super().__init__('URI {} cannot be resolved'.format(uri))
self.name = uri
class UnknownOutpoint(Exception):
def __init__(self, outpoint):
super().__init__('Outpoint {} cannot be resolved'.format(outpoint))
self.outpoint = outpoint
class InvalidName(Exception):
def __init__(self, name, invalid_characters):
self.name = name

View file

@ -101,7 +101,7 @@ def obfuscate(plain):
return rot13(base64.b64encode(plain).decode())
def check_connection(server="lbry.io", port=80, timeout=2):
def check_connection(server="lbry.io", port=80, timeout=5):
"""Attempts to open a socket to server:port and returns True if successful."""
log.debug('Checking connection to %s:%s', server, port)
try:

View file

@ -71,7 +71,7 @@ def main():
if method not in Daemon.deprecated_methods:
print_error("\"%s\" is not a valid command." % method)
return
new_method = Daemon.deprecated_methods[method]._new_command
new_method = Daemon.deprecated_methods[method].new_command
print_error("\"%s\" is deprecated, using \"%s\"." % (method, new_method))
method = new_method

View file

@ -1,8 +1,11 @@
import sys
import code
import argparse
import asyncio
import logging.handlers
from twisted.internet import defer, reactor, threads
from aiohttp import client_exceptions
from lbrynet import analytics
from lbrynet import conf
from lbrynet.core import utils
@ -10,8 +13,6 @@ from lbrynet.core import log_support
from lbrynet.daemon.auth.client import LBRYAPIClient
from lbrynet.daemon.Daemon import Daemon
get_client = LBRYAPIClient.get_client
log = logging.getLogger(__name__)
@ -114,7 +115,7 @@ def get_methods(daemon):
locs = {}
def wrapped(name, fn):
client = get_client()
client = LBRYAPIClient.get_client()
_fn = getattr(client, name)
_fn.__doc__ = fn.__doc__
return {name: _fn}
@ -181,18 +182,18 @@ def threaded_terminal(started_daemon, quiet):
d.addErrback(log.exception)
def start_lbrynet_console(quiet, use_existing_daemon, useauth):
async def start_lbrynet_console(quiet, use_existing_daemon, useauth):
if not utils.check_connection():
print("Not connected to internet, unable to start")
raise Exception("Not connected to internet, unable to start")
if not quiet:
print("Starting lbrynet-console...")
try:
get_client().status()
await LBRYAPIClient.get_client().status()
d = defer.succeed(False)
if not quiet:
print("lbrynet-daemon is already running, connecting to it...")
except:
except client_exceptions.ClientConnectorError:
if not use_existing_daemon:
if not quiet:
print("Starting lbrynet-daemon...")
@ -222,7 +223,8 @@ def main():
"--http-auth", dest="useauth", action="store_true", default=conf.settings['use_auth_http']
)
args = parser.parse_args()
start_lbrynet_console(args.quiet, args.use_existing_daemon, args.useauth)
loop = asyncio.get_event_loop()
loop.run_until_complete(start_lbrynet_console(args.quiet, args.use_existing_daemon, args.useauth))
reactor.run()

View file

@ -13,7 +13,6 @@ import argparse
import logging.handlers
from twisted.internet import reactor
#from jsonrpc.proxy import JSONRPCProxy
from lbrynet import conf
from lbrynet.core import utils, system_info
@ -26,20 +25,13 @@ def test_internet_connection():
return utils.check_connection()
def start(argv):
"""The primary entry point for launching the daemon."""
def start(argv=None, conf_path=None):
if conf_path is not None:
conf.conf_file = conf_path
# postpone loading the config file to after the CLI arguments
# have been parsed, as they may contain an alternate config file location
conf.initialize_settings(load_conf_file=False)
conf.initialize_settings()
parser = argparse.ArgumentParser(description="Launch lbrynet-daemon")
parser.add_argument(
"--conf",
help="specify an alternative configuration file",
type=str,
default=None
)
parser = argparse.ArgumentParser()
parser.add_argument(
"--http-auth", dest="useauth", action="store_true", default=conf.settings['use_auth_http']
)
@ -58,9 +50,8 @@ def start(argv):
)
args = parser.parse_args(argv)
update_settings_from_args(args)
conf.settings.load_conf_file_settings()
if args.useauth:
conf.settings.update({'use_auth_http': args.useauth}, data_types=(conf.TYPE_CLI,))
if args.version:
version = system_info.get_platform(get_ip=False)
@ -90,17 +81,3 @@ def start(argv):
reactor.run()
else:
log.info("Not connected to internet, unable to start")
def update_settings_from_args(args):
if args.conf:
conf.conf_file = args.conf
if args.useauth:
conf.settings.update({
'use_auth_http': args.useauth,
}, data_types=(conf.TYPE_CLI,))
if __name__ == "__main__":
start(sys.argv[1:])

View file

@ -39,8 +39,12 @@ class PasswordChecker:
return cls(passwords)
def requestAvatarId(self, creds):
if creds.username in self.passwords:
pw = self.passwords.get(creds.username)
password_dict_bytes = {}
for api in self.passwords:
password_dict_bytes.update({api.encode(): self.passwords[api].encode()})
if creds.username in password_dict_bytes:
pw = password_dict_bytes.get(creds.username)
pw_match = creds.checkPassword(pw)
if pw_match:
return defer.succeed(creds.username)

View file

@ -1,11 +1,11 @@
# pylint: skip-file
import os
import json
import urlparse
import aiohttp
from urllib.parse import urlparse
import requests
from requests.cookies import RequestsCookieJar
import logging
from jsonrpc.proxy import JSONRPCProxy
from lbrynet import conf
from lbrynet.daemon.auth.util import load_api_keys, APIKey, API_KEY_NAME, get_auth_message
@ -14,6 +14,7 @@ USER_AGENT = "AuthServiceProxy/0.1"
TWISTED_SESSION = "TWISTED_SESSION"
LBRY_SECRET = "LBRY_SECRET"
HTTP_TIMEOUT = 30
SCHEME = "http"
def copy_cookies(cookies):
@ -28,6 +29,32 @@ class JSONRPCException(Exception):
self.error = rpc_error
class UnAuthAPIClient:
def __init__(self, host, port):
self.host = host
self.port = port
self.scheme = SCHEME
def __getattr__(self, method):
async def f(*args, **kwargs):
return await self.call(method, [args, kwargs])
return f
@classmethod
def from_url(cls, url):
url_fragment = urlparse(url)
host = url_fragment.hostname
port = url_fragment.port
return cls(host, port)
async def call(self, method, params=None):
message = {'method': method, 'params': params}
async with aiohttp.ClientSession() as session:
async with session.get('{}://{}:{}'.format(self.scheme, self.host, self.port), json=message) as resp:
return await resp.json()
class AuthAPIClient:
def __init__(self, key, timeout, connection, count, cookies, url, login_url):
self.__api_key = key
@ -46,7 +73,7 @@ class AuthAPIClient:
return f
def call(self, method, params=None):
async def call(self, method, params=None):
params = params or {}
self.__id_count += 1
pre_auth_post_data = {
@ -56,34 +83,27 @@ class AuthAPIClient:
'id': self.__id_count
}
to_auth = get_auth_message(pre_auth_post_data)
pre_auth_post_data.update({'hmac': self.__api_key.get_hmac(to_auth)})
pre_auth_post_data.update({'hmac': self.__api_key.get_hmac(to_auth).decode()})
post_data = json.dumps(pre_auth_post_data)
cookies = copy_cookies(self.__cookies)
req = requests.Request(
method='POST', url=self.__service_url, data=post_data, cookies=cookies,
headers={
'Host': self.__url.hostname,
'User-Agent': USER_AGENT,
'Content-type': 'application/json'
'Host': self.__url.hostname,
'User-Agent': USER_AGENT,
'Content-type': 'application/json'
}
)
http_response = self.__conn.send(req.prepare())
if http_response is None:
raise JSONRPCException({
'code': -342, 'message': 'missing HTTP response from server'})
raise JSONRPCException({'code': -342, 'message': 'missing HTTP response from server'})
http_response.raise_for_status()
next_secret = http_response.headers.get(LBRY_SECRET, False)
if next_secret:
self.__api_key.secret = next_secret
self.__cookies = copy_cookies(http_response.cookies)
response = http_response.json()
if response.get('error') is not None:
raise JSONRPCException(response['error'])
elif 'result' not in response:
raise JSONRPCException({
'code': -343, 'message': 'missing JSON-RPC result'})
else:
return response['result']
return http_response.json()
@classmethod
def config(cls, key_name=None, key=None, pw_path=None, timeout=HTTP_TIMEOUT, connection=None, count=0,
@ -97,24 +117,23 @@ class AuthAPIClient:
else:
api_key = APIKey(name=api_key_name, secret=key)
if login_url is None:
service_url = "http://%s:%s@%s:%i/%s" % (api_key_name,
api_key.secret,
conf.settings['api_host'],
conf.settings['api_port'],
conf.settings['API_ADDRESS'])
service_url = "http://{}:{}@{}:{}".format(
api_key_name, api_key.secret, conf.settings['api_host'], conf.settings['api_port']
)
else:
service_url = login_url
id_count = count
if auth is None and connection is None and cookies is None and url is None:
# This is a new client instance, start an authenticated session
url = urlparse.urlparse(service_url)
url = urlparse(service_url)
conn = requests.Session()
req = requests.Request(method='POST',
url=service_url,
headers={'Host': url.hostname,
'User-Agent': USER_AGENT,
'Content-type': 'application/json'},)
req = requests.Request(
method='POST', url=service_url, headers={
'Host': url.hostname,
'User-Agent': USER_AGENT,
'Content-type': 'application/json'
})
r = req.prepare()
http_response = conn.send(r)
cookies = RequestsCookieJar()
@ -133,8 +152,9 @@ class AuthAPIClient:
class LBRYAPIClient:
@staticmethod
def get_client():
def get_client(conf_path=None):
conf.conf_file = conf_path
if not conf.settings:
conf.initialize_settings()
return AuthAPIClient.config() if conf.settings['use_auth_http'] else \
JSONRPCProxy.from_url(conf.settings.get_api_connection_string())
UnAuthAPIClient.from_url(conf.settings.get_api_connection_string())

View file

@ -81,8 +81,8 @@ class JSONRPCError:
}
@classmethod
def create_from_exception(cls, exception, code=CODE_APPLICATION_ERROR, traceback=None):
return cls(exception.message, code=code, traceback=traceback)
def create_from_exception(cls, message, code=CODE_APPLICATION_ERROR, traceback=None):
return cls(message, code=code, traceback=traceback)
def default_decimal(obj):
@ -109,8 +109,7 @@ def jsonrpc_dumps_pretty(obj, **kwargs):
else:
data = {"jsonrpc": "2.0", "result": obj, "id": id_}
return json.dumps(data, cls=jsonrpclib.JSONRPCEncoder, sort_keys=True, indent=2,
separators=(',', ': '), **kwargs) + "\n"
return json.dumps(data, cls=jsonrpclib.JSONRPCEncoder, sort_keys=True, indent=2, **kwargs) + "\n"
class JSONRPCServerType(type):
@ -134,7 +133,7 @@ class AuthorizedBase(metaclass=JSONRPCServerType):
@staticmethod
def deprecated(new_command=None):
def _deprecated_wrapper(f):
f._new_command = new_command
f.new_command = new_command
f._deprecated = True
return f
return _deprecated_wrapper
@ -284,8 +283,8 @@ class AuthJSONRPCServer(AuthorizedBase):
request.setHeader(LBRY_SECRET, self.sessions.get(session_id).secret)
@staticmethod
def _render_message(request, message):
request.write(message)
def _render_message(request, message: str):
request.write(message.encode())
request.finish()
def _render_error(self, failure, request, id_):
@ -296,8 +295,15 @@ class AuthJSONRPCServer(AuthorizedBase):
error = failure.check(JSONRPCError)
if error is None:
# maybe its a twisted Failure with another type of error
error = JSONRPCError(failure.getErrorMessage() or failure.type.__name__,
traceback=failure.getTraceback())
if hasattr(failure.type, "code"):
error_code = failure.type.code
else:
error_code = JSONRPCError.CODE_APPLICATION_ERROR
error = JSONRPCError.create_from_exception(
failure.getErrorMessage() or failure.type.__name__,
code=error_code,
traceback=failure.getTraceback()
)
if not failure.check(ComponentsNotStarted, ComponentStartConditionNotMet):
log.warning("error processing api request: %s\ntraceback: %s", error.message,
"\n".join(error.traceback))
@ -321,7 +327,7 @@ class AuthJSONRPCServer(AuthorizedBase):
return self._render(request)
except BaseException as e:
log.error(e)
error = JSONRPCError.create_from_exception(e, traceback=format_exc())
error = JSONRPCError.create_from_exception(str(e), traceback=format_exc())
self._render_error(error, request, None)
return server.NOT_DONE_YET
@ -352,12 +358,12 @@ class AuthJSONRPCServer(AuthorizedBase):
session.touch()
request.content.seek(0, 0)
content = request.content.read()
content = request.content.read().decode()
try:
parsed = jsonrpclib.loads(content)
except ValueError:
except json.JSONDecodeError:
log.warning("Unable to decode request json")
self._render_error(JSONRPCError(None, JSONRPCError.CODE_PARSE_ERROR), request, None)
self._render_error(JSONRPCError(None, code=JSONRPCError.CODE_PARSE_ERROR), request, None)
return server.NOT_DONE_YET
request_id = None
@ -381,7 +387,8 @@ class AuthJSONRPCServer(AuthorizedBase):
log.warning("API validation failed")
self._render_error(
JSONRPCError.create_from_exception(
err, code=JSONRPCError.CODE_AUTHENTICATION_ERROR,
str(err),
code=JSONRPCError.CODE_AUTHENTICATION_ERROR,
traceback=format_exc()
),
request, request_id
@ -396,7 +403,7 @@ class AuthJSONRPCServer(AuthorizedBase):
except UnknownAPIMethodError as err:
log.warning('Failed to get function %s: %s', function_name, err)
self._render_error(
JSONRPCError(None, JSONRPCError.CODE_METHOD_NOT_FOUND),
JSONRPCError(None, code=JSONRPCError.CODE_METHOD_NOT_FOUND),
request, request_id
)
return server.NOT_DONE_YET
@ -507,7 +514,7 @@ class AuthJSONRPCServer(AuthorizedBase):
def _get_jsonrpc_method(self, function_path):
if function_path in self.deprecated_methods:
new_command = self.deprecated_methods[function_path]._new_command
new_command = self.deprecated_methods[function_path].new_command
log.warning('API function \"%s\" is deprecated, please update to use \"%s\"',
function_path, new_command)
function_path = new_command
@ -565,10 +572,10 @@ class AuthJSONRPCServer(AuthorizedBase):
def _callback_render(self, result, request, id_, auth_required=False):
try:
encoded_message = jsonrpc_dumps_pretty(result, id=id_, default=default_decimal).encode()
message = jsonrpc_dumps_pretty(result, id=id_, default=default_decimal)
request.setResponseCode(200)
self._set_headers(request, encoded_message, auth_required)
self._render_message(request, encoded_message)
self._set_headers(request, message, auth_required)
self._render_message(request, message)
except Exception as err:
log.exception("Failed to render API response: %s", result)
self._render_error(err, request, id_)

View file

@ -12,12 +12,12 @@ API_KEY_NAME = "api"
LBRY_SECRET = "LBRY_SECRET"
def sha(x):
def sha(x: bytes) -> bytes:
h = hashlib.sha256(x).digest()
return base58.b58encode(h)
def generate_key(x=None):
def generate_key(x: bytes=None) -> bytes:
if x is None:
return sha(os.urandom(256))
else:
@ -41,7 +41,7 @@ class APIKey:
def get_hmac(self, message):
decoded_key = self._raw_key()
signature = hmac.new(decoded_key, message, hashlib.sha256)
signature = hmac.new(decoded_key, message.encode(), hashlib.sha256)
return base58.b58encode(signature.digest())
def compare_hmac(self, message, token):
@ -66,7 +66,7 @@ def load_api_keys(path):
keys_for_return = {}
for key_name in data:
key = data[key_name]
secret = key['secret']
secret = key['secret'].decode()
expiration = key['expiration']
keys_for_return.update({key_name: APIKey(secret, key_name, expiration)})
return keys_for_return

View file

View file

@ -0,0 +1,6 @@
from lbrynet import conf
from lbrynet import cli
class CLIIntegrationTest:
pass

100
tests/test_cli.py Normal file
View file

@ -0,0 +1,100 @@
import contextlib
import json
from io import StringIO
from twisted.trial import unittest
from lbrynet.core.system_info import get_platform
from lbrynet.cli import normalize_value, main
class CLITest(unittest.TestCase):
def test_guess_type(self):
self.assertEqual('0.3.8', normalize_value('0.3.8'))
self.assertEqual(0.3, normalize_value('0.3'))
self.assertEqual(3, normalize_value('3'))
self.assertEqual(3, normalize_value(3))
self.assertEqual(
'VdNmakxFORPSyfCprAD/eDDPk5TY9QYtSA==',
normalize_value('VdNmakxFORPSyfCprAD/eDDPk5TY9QYtSA==')
)
self.assertEqual(True, normalize_value('TRUE'))
self.assertEqual(True, normalize_value('true'))
self.assertEqual(True, normalize_value('TrUe'))
self.assertEqual(False, normalize_value('FALSE'))
self.assertEqual(False, normalize_value('false'))
self.assertEqual(False, normalize_value('FaLsE'))
self.assertEqual(True, normalize_value(True))
self.assertEqual('3', normalize_value('3', key="uri"))
self.assertEqual('0.3', normalize_value('0.3', key="uri"))
self.assertEqual('True', normalize_value('True', key="uri"))
self.assertEqual('False', normalize_value('False', key="uri"))
self.assertEqual('3', normalize_value('3', key="file_name"))
self.assertEqual('3', normalize_value('3', key="name"))
self.assertEqual('3', normalize_value('3', key="download_directory"))
self.assertEqual('3', normalize_value('3', key="channel_name"))
self.assertEqual(3, normalize_value('3', key="some_other_thing"))
def test_help_command(self):
actual_output = StringIO()
with contextlib.redirect_stdout(actual_output):
main(['help'])
actual_output = actual_output.getvalue()
self.assertSubstring('lbrynet - LBRY command line client.', actual_output)
self.assertSubstring('USAGE', actual_output)
def test_help_for_command_command(self):
actual_output = StringIO()
with contextlib.redirect_stdout(actual_output):
main(['help', 'publish'])
actual_output = actual_output.getvalue()
self.assertSubstring('Make a new name claim and publish', actual_output)
self.assertSubstring('Usage:', actual_output)
def test_help_for_command_command_with_invalid_command(self):
actual_output = StringIO()
with contextlib.redirect_stdout(actual_output):
main(['help', 'publish1'])
self.assertSubstring('Invalid command name', actual_output.getvalue())
def test_version_command(self):
actual_output = StringIO()
with contextlib.redirect_stdout(actual_output):
main(['version'])
self.assertEqual(
actual_output.getvalue().strip(),
json.dumps(get_platform(get_ip=False), sort_keys=True, indent=2)
)
def test_invalid_command(self):
actual_output = StringIO()
with contextlib.redirect_stdout(actual_output):
main(['publish1'])
self.assertEqual(
actual_output.getvalue().strip(),
"publish1 is not a valid command."
)
def test_valid_command_daemon_not_started(self):
actual_output = StringIO()
with contextlib.redirect_stdout(actual_output):
main(["publish", '--name=asd', '--bid=99'])
self.assertEqual(
actual_output.getvalue().strip(),
"Could not connect to daemon. Are you sure it's running?"
)
def test_deprecated_command_daemon_not_started(self):
actual_output = StringIO()
with contextlib.redirect_stdout(actual_output):
main(["channel_list_mine"])
self.assertEqual(
actual_output.getvalue().strip(),
"channel_list_mine is deprecated, using channel_list.\n"
"Could not connect to daemon. Are you sure it's running?"
)

View file

@ -1,33 +0,0 @@
from unittest import skip
from twisted.trial import unittest
# from lbrynet.daemon import DaemonCLI
@skip('cli is being rewritten to work in py3')
class DaemonCLITests(unittest.TestCase):
def test_guess_type(self):
self.assertEqual('0.3.8', DaemonCLI.guess_type('0.3.8'))
self.assertEqual(0.3, DaemonCLI.guess_type('0.3'))
self.assertEqual(3, DaemonCLI.guess_type('3'))
self.assertEqual('VdNmakxFORPSyfCprAD/eDDPk5TY9QYtSA==',
DaemonCLI.guess_type('VdNmakxFORPSyfCprAD/eDDPk5TY9QYtSA=='))
self.assertEqual(0.3, DaemonCLI.guess_type('0.3'))
self.assertEqual(True, DaemonCLI.guess_type('TRUE'))
self.assertEqual(True, DaemonCLI.guess_type('true'))
self.assertEqual(True, DaemonCLI.guess_type('True'))
self.assertEqual(False, DaemonCLI.guess_type('FALSE'))
self.assertEqual(False, DaemonCLI.guess_type('false'))
self.assertEqual(False, DaemonCLI.guess_type('False'))
self.assertEqual('3', DaemonCLI.guess_type('3', key="uri"))
self.assertEqual('0.3', DaemonCLI.guess_type('0.3', key="uri"))
self.assertEqual('True', DaemonCLI.guess_type('True', key="uri"))
self.assertEqual('False', DaemonCLI.guess_type('False', key="uri"))
self.assertEqual('3', DaemonCLI.guess_type('3', key="file_name"))
self.assertEqual('3', DaemonCLI.guess_type('3', key="name"))
self.assertEqual('3', DaemonCLI.guess_type('3', key="download_directory"))
self.assertEqual('3', DaemonCLI.guess_type('3', key="channel_name"))
self.assertEqual(3, DaemonCLI.guess_type('3', key="some_other_thing"))