225 lines
9.7 KiB
Python
225 lines
9.7 KiB
Python
import os
|
|
import tempfile
|
|
import shutil
|
|
import contextlib
|
|
import logging
|
|
from io import StringIO
|
|
from unittest import TestCase
|
|
from unittest.mock import patch
|
|
from types import SimpleNamespace
|
|
from contextlib import asynccontextmanager
|
|
|
|
import docopt
|
|
from lbry.testcase import AsyncioTestCase
|
|
|
|
from lbry.extras.cli import normalize_value, main, setup_logging
|
|
from lbry.extras.system_info import get_platform
|
|
from lbry.extras.daemon.daemon import Daemon
|
|
from lbry.extras.daemon.loggly_handler import HTTPSLogglyHandler
|
|
from lbry.conf import Config
|
|
from lbry.extras import cli
|
|
|
|
|
|
@asynccontextmanager
|
|
async def get_logger(argv, **conf_options):
|
|
# loggly requires loop, so we do this in async function
|
|
|
|
logger = logging.getLogger('test-root-logger')
|
|
temp_dir = tempfile.mkdtemp()
|
|
temp_config = os.path.join(temp_dir, 'settings.yml')
|
|
|
|
try:
|
|
# create a config (to be loaded on startup)
|
|
_conf = Config.create_from_arguments(SimpleNamespace(config=temp_config))
|
|
with _conf.update_config():
|
|
for opt_name, opt_value in conf_options.items():
|
|
setattr(_conf, opt_name, opt_value)
|
|
|
|
# do what happens on startup
|
|
argv.extend(['--data-dir', temp_dir])
|
|
argv.extend(['--wallet-dir', temp_dir])
|
|
argv.extend(['--config', temp_config])
|
|
parser = cli.get_argument_parser()
|
|
args, command_args = parser.parse_known_args(argv)
|
|
conf: Config = Config.create_from_arguments(args)
|
|
setup_logging(logger, args, conf)
|
|
yield logger
|
|
|
|
finally:
|
|
shutil.rmtree(temp_dir, ignore_errors=True)
|
|
for mod in cli.LOG_MODULES:
|
|
log = logger.getChild(mod)
|
|
log.setLevel(logging.NOTSET)
|
|
while log.handlers:
|
|
h = log.handlers[0]
|
|
log.removeHandler(log.handlers[0])
|
|
h.close()
|
|
|
|
|
|
class CLILoggingTest(AsyncioTestCase):
|
|
|
|
async def test_verbose_logging(self):
|
|
async with get_logger(["start", "--quiet"], share_usage_data=False) as log:
|
|
log = log.getChild("lbry")
|
|
self.assertTrue(log.isEnabledFor(logging.INFO))
|
|
self.assertFalse(log.isEnabledFor(logging.DEBUG))
|
|
self.assertFalse(log.isEnabledFor(logging.DEBUG))
|
|
self.assertEqual(len(log.handlers), 2)
|
|
self.assertIsInstance(log.handlers[0], logging.handlers.RotatingFileHandler)
|
|
self.assertFalse(log.handlers[1].enabled)
|
|
|
|
async with get_logger(["start", "--verbose"]) as log:
|
|
self.assertTrue(log.getChild("lbry").isEnabledFor(logging.DEBUG))
|
|
self.assertTrue(log.getChild("lbry").isEnabledFor(logging.INFO))
|
|
self.assertFalse(log.getChild("torba").isEnabledFor(logging.DEBUG))
|
|
|
|
async with get_logger(["start", "--verbose", "lbry.extras", "lbry.wallet", "torba.client"]) as log:
|
|
self.assertTrue(log.getChild("lbry.extras").isEnabledFor(logging.DEBUG))
|
|
self.assertTrue(log.getChild("lbry.wallet").isEnabledFor(logging.DEBUG))
|
|
self.assertTrue(log.getChild("torba.client").isEnabledFor(logging.DEBUG))
|
|
self.assertFalse(log.getChild("lbry").isEnabledFor(logging.DEBUG))
|
|
self.assertFalse(log.getChild("torba").isEnabledFor(logging.DEBUG))
|
|
|
|
async def test_loggly(self):
|
|
async with get_logger(["start"]) as log: # default share_usage_data=False
|
|
log = log.getChild("lbry")
|
|
self.assertIsInstance(log.handlers[0], logging.StreamHandler)
|
|
self.assertIsInstance(log.handlers[1], logging.StreamHandler)
|
|
self.assertIsInstance(log.handlers[2], HTTPSLogglyHandler)
|
|
self.assertFalse(log.handlers[2].enabled)
|
|
async with get_logger(["start"], share_usage_data=True) as log:
|
|
log = log.getChild("lbry")
|
|
self.assertEqual(len(log.handlers), 3)
|
|
self.assertIsInstance(log.handlers[2], HTTPSLogglyHandler)
|
|
self.assertTrue(log.handlers[2].enabled)
|
|
async with get_logger(["start"], share_usage_data=False) as log: # explicit share_usage_data=False
|
|
log = log.getChild("lbry")
|
|
self.assertEqual(len(log.handlers), 3)
|
|
self.assertIsInstance(log.handlers[2], HTTPSLogglyHandler)
|
|
self.assertFalse(log.handlers[2].enabled)
|
|
|
|
async def test_quiet(self):
|
|
async with get_logger(["start"]) as log: # default is loud
|
|
log = log.getChild("lbry")
|
|
self.assertEqual(len(log.handlers), 3)
|
|
self.assertIs(type(log.handlers[1]), logging.StreamHandler)
|
|
async with get_logger(["start", "--quiet"]) as log:
|
|
log = log.getChild("lbry")
|
|
self.assertEqual(len(log.handlers), 2)
|
|
self.assertIsNot(type(log.handlers[0]), logging.StreamHandler)
|
|
|
|
|
|
class CLITest(AsyncioTestCase):
|
|
|
|
@staticmethod
|
|
def shell(argv):
|
|
actual_output = StringIO()
|
|
with contextlib.redirect_stdout(actual_output):
|
|
with contextlib.redirect_stderr(actual_output):
|
|
try:
|
|
main(argv)
|
|
except SystemExit as e:
|
|
print(e.args[0])
|
|
return actual_output.getvalue().strip()
|
|
|
|
def test_guess_type(self):
|
|
self.assertEqual('0.3.8', normalize_value('0.3.8'))
|
|
self.assertEqual('0.3', normalize_value('0.3'))
|
|
self.assertEqual(3, normalize_value('3'))
|
|
self.assertEqual(3, normalize_value(3))
|
|
|
|
self.assertEqual(
|
|
'VdNmakxFORPSyfCprAD/eDDPk5TY9QYtSA==',
|
|
normalize_value('VdNmakxFORPSyfCprAD/eDDPk5TY9QYtSA==')
|
|
)
|
|
|
|
self.assertTrue(normalize_value('TRUE'))
|
|
self.assertTrue(normalize_value('true'))
|
|
self.assertTrue(normalize_value('TrUe'))
|
|
self.assertFalse(normalize_value('FALSE'))
|
|
self.assertFalse(normalize_value('false'))
|
|
self.assertFalse(normalize_value('FaLsE'))
|
|
self.assertTrue(normalize_value(True))
|
|
|
|
self.assertEqual('3', normalize_value('3', key="uri"))
|
|
self.assertEqual('0.3', normalize_value('0.3', key="uri"))
|
|
self.assertEqual('True', normalize_value('True', key="uri"))
|
|
self.assertEqual('False', normalize_value('False', key="uri"))
|
|
|
|
self.assertEqual('3', normalize_value('3', key="file_name"))
|
|
self.assertEqual('3', normalize_value('3', key="name"))
|
|
self.assertEqual('3', normalize_value('3', key="download_directory"))
|
|
self.assertEqual('3', normalize_value('3', key="channel_name"))
|
|
self.assertEqual('3', normalize_value('3', key="claim_name"))
|
|
|
|
self.assertEqual(3, normalize_value('3', key="some_other_thing"))
|
|
|
|
def test_help(self):
|
|
self.assertIn('lbrynet [-v] [--api HOST:PORT]', self.shell(['--help']))
|
|
# start is special command, with separate help handling
|
|
self.assertIn('--share-usage-data', self.shell(['start', '--help']))
|
|
# publish is ungrouped command, returns usage only implicitly
|
|
self.assertIn('publish (<name> | --name=<name>)', self.shell(['publish']))
|
|
# publish is ungrouped command, with explicit --help
|
|
self.assertIn('Create or replace a stream claim at a given name', self.shell(['publish', '--help']))
|
|
# account is a group, returns help implicitly
|
|
self.assertIn('Return the balance of an account', self.shell(['account']))
|
|
# account is a group, with explicit --help
|
|
self.assertIn('Return the balance of an account', self.shell(['account', '--help']))
|
|
# account add is a grouped command, returns usage implicitly
|
|
self.assertIn('account_add (<account_name> | --account_name=<account_name>)', self.shell(['account', 'add']))
|
|
# account add is a grouped command, with explicit --help
|
|
self.assertIn('Add a previously created account from a seed,', self.shell(['account', 'add', '--help']))
|
|
|
|
def test_help_error_handling(self):
|
|
# person tries `help` command, then they get help even though that's invalid command
|
|
self.assertIn('--config FILE', self.shell(['help']))
|
|
# help for invalid command, with explicit --help
|
|
self.assertIn('--config FILE', self.shell(['nonexistant', '--help']))
|
|
# help for invalid command, implicit
|
|
self.assertIn('--config FILE', self.shell(['nonexistant']))
|
|
|
|
def test_version_command(self):
|
|
self.assertEqual(
|
|
"lbrynet {lbrynet_version}".format(**get_platform()), self.shell(['--version'])
|
|
)
|
|
|
|
def test_valid_command_daemon_not_started(self):
|
|
self.assertEqual(
|
|
"Could not connect to daemon. Are you sure it's running?",
|
|
self.shell(["publish", 'asd'])
|
|
)
|
|
|
|
def test_deprecated_command_daemon_not_started(self):
|
|
actual_output = StringIO()
|
|
with contextlib.redirect_stdout(actual_output):
|
|
main(["channel", "new", "@foo", "1.0"])
|
|
self.assertEqual(
|
|
actual_output.getvalue().strip(),
|
|
"channel_new is deprecated, using channel_create.\n"
|
|
"Could not connect to daemon. Are you sure it's running?"
|
|
)
|
|
|
|
@patch.object(Daemon, 'start', spec=Daemon, wraps=Daemon.start)
|
|
def test_keyboard_interrupt_handling(self, mock_daemon_start):
|
|
def side_effect():
|
|
raise KeyboardInterrupt
|
|
|
|
mock_daemon_start.side_effect = side_effect
|
|
self.shell(["start", "--no-logging"])
|
|
mock_daemon_start.assert_called_once()
|
|
|
|
|
|
class DaemonDocsTests(TestCase):
|
|
|
|
def test_can_parse_api_method_docs(self):
|
|
failures = []
|
|
for name, fn in Daemon.callable_methods.items():
|
|
try:
|
|
docopt.docopt(fn.__doc__, ())
|
|
except docopt.DocoptLanguageError as err:
|
|
failures.append(f"invalid docstring for {name}, {err.args[0]}")
|
|
except docopt.DocoptExit:
|
|
pass
|
|
if failures:
|
|
self.fail("\n" + "\n".join(failures))
|