asyncio daemon

This commit is contained in:
Lex Berezhny 2018-12-12 22:32:44 -05:00
parent 0845d65f4e
commit 248baf58b4
5 changed files with 482 additions and 43 deletions

View file

@ -57,7 +57,7 @@ def start_daemon(settings: typing.Optional[typing.Dict] = None,
if check_connection():
daemon = Daemon()
daemon.start_listening()
asyncio.get_event_loop().run_until_complete(daemon.start_listening())
reactor.run()
else:
log.info("Not connected to internet, unable to start")

View file

@ -1,21 +1,18 @@
import logging.handlers
import os
import requests
import urllib
import json
import textwrap
from typing import Callable, Optional, List
from operator import itemgetter
from binascii import hexlify, unhexlify
from copy import deepcopy
from twisted.internet import defer, reactor
from twisted.internet import reactor
from twisted.internet.task import LoopingCall
from twisted.python.failure import Failure
from torba.client.baseaccount import SingleKey, HierarchicalDeterministic
from lbrynet import conf, utils, __version__
from lbrynet import __version__
from lbrynet.dht.error import TimeoutError
from lbrynet.blob.blob_file import is_valid_blobhash
from lbrynet.extras import system_info
@ -27,7 +24,6 @@ from lbrynet.extras.daemon.Components import EXCHANGE_RATE_MANAGER_COMPONENT, PA
from lbrynet.extras.daemon.ComponentManager import RequiredCondition
from lbrynet.extras.daemon.Downloader import GetStream
from lbrynet.extras.daemon.Publisher import Publisher
from lbrynet.extras.daemon.auth.server import AuthJSONRPCServer
from lbrynet.extras.daemon.mime_types import guess_mime_type
from lbrynet.extras.wallet import LbryWalletManager
from lbrynet.extras.wallet.account import Account as LBCAccount
@ -44,9 +40,66 @@ from lbrynet.schema.error import URIParseError, DecodeError
from lbrynet.schema.validator import validate_claim_id
from lbrynet.schema.address import decode_address
from lbrynet.schema.decode import smart_decode
from lbrynet.extras.daemon import analytics
from lbrynet.extras.daemon.ComponentManager import ComponentManager
from lbrynet.extras.looping_call_manager import LoopingCallManager
from lbrynet.p2p.Error import ComponentsNotStarted, ComponentStartConditionNotMet
from lbrynet.extras.daemon.json_response_encoder import JSONResponseEncoder
import asyncio
import logging
from urllib import parse as urlparse
import json
import inspect
import signal
from functools import wraps
from twisted.internet import defer
from twisted.internet.defer import Deferred
from twisted.python.failure import Failure
from twisted.internet.error import ConnectionDone, ConnectionLost
from txjsonrpc import jsonrpclib
from traceback import format_exc
from lbrynet import utils
from lbrynet.p2p.Error import InvalidAuthenticationToken
from lbrynet.extras.daemon.undecorated import undecorated
from twisted.web import server
from lbrynet import conf
from aiohttp import web
log = logging.getLogger(__name__)
requires = AuthJSONRPCServer.requires
def requires(*components, **conditions):
if conditions and ["conditions"] != list(conditions.keys()):
raise SyntaxError("invalid conditions argument")
condition_names = conditions.get("conditions", [])
def _wrap(fn):
@wraps(fn)
def _inner(*args, **kwargs):
component_manager = args[0].component_manager
for condition_name in condition_names:
condition_result, err_msg = component_manager.evaluate_condition(condition_name)
if not condition_result:
raise ComponentStartConditionNotMet(err_msg)
if not component_manager.all_components_running(*components):
raise ComponentsNotStarted("the following required components have not yet started: "
"%s" % json.dumps(components))
return fn(*args, **kwargs)
return _inner
return _wrap
def deprecated(new_command=None):
def _deprecated_wrapper(f):
f.new_command = new_command
f._deprecated = True
return f
return _deprecated_wrapper
INITIALIZING_CODE = 'initializing'
@ -77,6 +130,9 @@ DIRECTION_ASCENDING = 'asc'
DIRECTION_DESCENDING = 'desc'
DIRECTIONS = DIRECTION_ASCENDING, DIRECTION_DESCENDING
EMPTY_PARAMS = [{}]
LBRY_SECRET = "LBRY_SECRET"
async def maybe_paginate(get_records: Callable, get_record_count: Callable,
page: Optional[int], page_size: Optional[int], **constraints):
@ -197,7 +253,97 @@ class WalletIsUnlocked(RequiredCondition):
return not component.check_locked()
class Daemon(AuthJSONRPCServer):
class JSONRPCError:
# http://www.jsonrpc.org/specification#error_object
CODE_PARSE_ERROR = -32700 # Invalid JSON. Error while parsing the JSON text.
CODE_INVALID_REQUEST = -32600 # The JSON sent is not a valid Request object.
CODE_METHOD_NOT_FOUND = -32601 # The method does not exist / is not available.
CODE_INVALID_PARAMS = -32602 # Invalid method parameter(s).
CODE_INTERNAL_ERROR = -32603 # Internal JSON-RPC error (I think this is like a 500?)
CODE_APPLICATION_ERROR = -32500 # Generic error with our app??
CODE_AUTHENTICATION_ERROR = -32501 # Authentication failed
MESSAGES = {
CODE_PARSE_ERROR: "Parse Error. Data is not valid JSON.",
CODE_INVALID_REQUEST: "JSON data is not a valid Request",
CODE_METHOD_NOT_FOUND: "Method Not Found",
CODE_INVALID_PARAMS: "Invalid Params",
CODE_INTERNAL_ERROR: "Internal Error",
CODE_AUTHENTICATION_ERROR: "Authentication Failed",
}
HTTP_CODES = {
CODE_INVALID_REQUEST: 400,
CODE_PARSE_ERROR: 400,
CODE_INVALID_PARAMS: 400,
CODE_METHOD_NOT_FOUND: 404,
CODE_INTERNAL_ERROR: 500,
CODE_APPLICATION_ERROR: 500,
CODE_AUTHENTICATION_ERROR: 401,
}
def __init__(self, message, code=CODE_APPLICATION_ERROR, traceback=None, data=None):
assert isinstance(code, int), "'code' must be an int"
assert (data is None or isinstance(data, dict)), "'data' must be None or a dict"
self.code = code
if message is None:
message = self.MESSAGES[code] if code in self.MESSAGES else "API Error"
self.message = message
self.data = {} if data is None else data
self.traceback = []
if traceback is not None:
trace_lines = traceback.split("\n")
for i, t in enumerate(trace_lines):
if "--- <exception caught here> ---" in t:
if len(trace_lines) > i + 1:
self.traceback = [j for j in trace_lines[i+1:] if j]
break
def to_dict(self):
return {
'code': self.code,
'message': self.message,
'data': self.traceback
}
@classmethod
def create_from_exception(cls, message, code=CODE_APPLICATION_ERROR, traceback=None):
return cls(message, code=code, traceback=traceback)
class UnknownAPIMethodError(Exception):
pass
def jsonrpc_dumps_pretty(obj, **kwargs):
if isinstance(obj, JSONRPCError):
data = {"jsonrpc": "2.0", "error": obj.to_dict()}
else:
data = {"jsonrpc": "2.0", "result": obj}
return json.dumps(data, cls=JSONResponseEncoder, sort_keys=True, indent=2, **kwargs) + "\n"
def trap(err, *to_trap):
err.trap(*to_trap)
class JSONRPCServerType(type):
def __new__(mcs, name, bases, newattrs):
klass = type.__new__(mcs, name, bases, newattrs)
klass.callable_methods = {}
klass.deprecated_methods = {}
for methodname in dir(klass):
if methodname.startswith("jsonrpc_"):
method = getattr(klass, methodname)
if not hasattr(method, '_deprecated'):
klass.callable_methods.update({methodname.split("jsonrpc_")[1]: method})
else:
klass.deprecated_methods.update({methodname.split("jsonrpc_")[1]: method})
return klass
class Daemon(metaclass=JSONRPCServerType):
"""
LBRYnet daemon, a jsonrpc interface to lbry functions
"""
@ -214,6 +360,8 @@ class Daemon(AuthJSONRPCServer):
UPNP_COMPONENT: "upnp"
}
allowed_during_startup = []
def __init__(self, analytics_manager=None, component_manager=None):
to_skip = conf.settings['components_to_skip']
if 'reflector' not in to_skip and not conf.settings['run_reflector_server']:
@ -222,9 +370,22 @@ class Daemon(AuthJSONRPCServer):
Checker.INTERNET_CONNECTION[0]: (LoopingCall(CheckInternetConnection(self)),
Checker.INTERNET_CONNECTION[1])
}
AuthJSONRPCServer.__init__(self, analytics_manager=analytics_manager, component_manager=component_manager,
use_authentication=conf.settings['use_auth_http'],
use_https=conf.settings['use_https'], to_skip=to_skip, looping_calls=looping_calls)
use_authentication = conf.settings['use_auth_http']
use_https = conf.settings['use_https']
self.analytics_manager = analytics_manager or analytics.Manager.new_instance()
self.component_manager = component_manager or ComponentManager(
analytics_manager=self.analytics_manager,
skip_components=to_skip or [],
reactor=reactor
)
self.looping_call_manager = LoopingCallManager({n: lc for n, (lc, t) in (looping_calls or {}).items()})
self._looping_call_times = {n: t for n, (lc, t) in (looping_calls or {}).items()}
self._use_authentication = use_authentication or conf.settings['use_auth_http']
self._use_https = use_https or conf.settings['use_https']
self.listening_port = None
self._component_setup_deferred = None
self.announced_startup = False
self.sessions = {}
self.is_first_run = is_first_run()
# TODO: move this to a component
@ -246,6 +407,169 @@ class Daemon(AuthJSONRPCServer):
# TODO: delete this
self.streams = {}
self.app = web.Application()
self.app.router.add_get('/lbryapi', self.handle_old_jsonrpc)
self.handler = self.app.make_handler()
self.server = None
async def start_listening(self):
try:
self.server = await asyncio.get_event_loop().create_server(
self.handler, conf.settings['api_host'], conf.settings['api_port']
)
log.info('lbrynet API listening on TCP %s:%i', *self.server.sockets[0].getsockname())
await self.setup()
self.analytics_manager.send_server_startup_success()
except OSError:
log.error('lbrynet API failed to bind TCP %s:%i for listening. Daemon is already running or this port is '
'already in use by another application.', conf.settings['api_host'], conf.settings['api_port'])
reactor.fireSystemEvent("shutdown")
except defer.CancelledError:
log.info("shutting down before finished starting")
reactor.fireSystemEvent("shutdown")
except Exception as err:
self.analytics_manager.send_server_startup_error(str(err))
log.exception('Failed to start lbrynet-daemon')
reactor.fireSystemEvent("shutdown")
async def setup(self):
log.info("Starting lbrynet-daemon")
log.info("Platform: %s", json.dumps(system_info.get_platform()))
reactor.addSystemEventTrigger('before', 'shutdown', self.shutdown)
if not self.analytics_manager.is_started:
self.analytics_manager.start()
self.analytics_manager.send_server_startup()
for lc_name, lc_time in self._looping_call_times.items():
self.looping_call_manager.start(lc_name, lc_time)
def update_attribute(setup_result, component):
setattr(self, self.component_attributes[component.component_name], component.component)
kwargs = {component: update_attribute for component in self.component_attributes.keys()}
self._component_setup_deferred = self.component_manager.setup(**kwargs)
await self._component_setup_deferred.asFuture(asyncio.get_event_loop())
log.info("Started lbrynet-daemon")
@staticmethod
def _already_shutting_down(sig_num, frame):
log.info("Already shutting down")
async def shutdown(self):
self._stop_streams()
# ignore INT/TERM signals once shutdown has started
signal.signal(signal.SIGINT, self._already_shutting_down)
signal.signal(signal.SIGTERM, self._already_shutting_down)
if self.listening_port:
self.listening_port.stopListening()
self.looping_call_manager.shutdown()
if self.server is not None:
self.server.close()
await self.server.wait_closed()
await self.app.shutdown()
await self.handler.shutdown(60.0)
await self.app.cleanup()
if self.analytics_manager:
self.analytics_manager.shutdown()
try:
self._component_setup_deferred.cancel()
except (AttributeError, defer.CancelledError):
pass
if self.component_manager is not None:
d = self.component_manager.stop()
d.addErrback(log.fail(), 'Failure while shutting down')
else:
d = defer.succeed(None)
return d
async def handle_old_jsonrpc(self, request):
data = await request.json()
args = data.get('params', {})
try:
function_name = data['method']
except KeyError:
raise web.HTTPBadRequest(text="Missing 'method' value in request.")
try:
fn = self._get_jsonrpc_method(function_name)
except UnknownAPIMethodError:
raise web.HTTPBadRequest(text=f"Invalid method requested: {function_name}.")
if args in (EMPTY_PARAMS, []):
_args, _kwargs = (), {}
elif isinstance(args, dict):
_args, _kwargs = (), args
elif len(args) == 1 and isinstance(args[0], dict):
# TODO: this is for backwards compatibility. Remove this once API and UI are updated
# TODO: also delete EMPTY_PARAMS then
_args, _kwargs = (), args[0]
elif len(args) == 2 and isinstance(args[0], list) and isinstance(args[1], dict):
_args, _kwargs = args
else:
raise web.HTTPBadRequest(text="invalid args format")
params_error, erroneous_params = self._check_params(fn, _args, _kwargs)
if params_error is not None:
params_error_message = '{} for {} command: {}'.format(
params_error, function_name, ', '.join(erroneous_params)
)
log.warning(params_error_message)
raise web.HTTPBadRequest(text=params_error_message)
result = await fn(self, *_args, **_kwargs)
return web.Response(
text=jsonrpc_dumps_pretty(result, ledger=self.ledger),
content_type='application/json'
)
def _verify_method_is_callable(self, function_path):
if function_path not in self.callable_methods:
raise UnknownAPIMethodError(function_path)
def _get_jsonrpc_method(self, function_path):
if function_path in self.deprecated_methods:
new_command = self.deprecated_methods[function_path].new_command
log.warning('API function \"%s\" is deprecated, please update to use \"%s\"',
function_path, new_command)
function_path = new_command
self._verify_method_is_callable(function_path)
return self.callable_methods.get(function_path)
@staticmethod
def _check_params(function, args_tup, args_dict):
argspec = inspect.getfullargspec(undecorated(function))
num_optional_params = 0 if argspec.defaults is None else len(argspec.defaults)
duplicate_params = [
duplicate_param
for duplicate_param in argspec.args[1:len(args_tup) + 1]
if duplicate_param in args_dict
]
if duplicate_params:
return 'Duplicate parameters', duplicate_params
missing_required_params = [
required_param
for required_param in argspec.args[len(args_tup)+1:-num_optional_params]
if required_param not in args_dict
]
if len(missing_required_params):
return 'Missing required parameters', missing_required_params
extraneous_params = [] if argspec.varkw is not None else [
extra_param
for extra_param in args_dict
if extra_param not in argspec.args[1:]
]
if len(extraneous_params):
return 'Extraneous parameters', extraneous_params
return None, None
@property
def default_wallet(self):
try:
@ -267,22 +591,11 @@ class Daemon(AuthJSONRPCServer):
except AttributeError:
return None
@defer.inlineCallbacks
def setup(self):
log.info("Starting lbrynet-daemon")
log.info("Platform: %s", json.dumps(system_info.get_platform()))
yield super().setup()
log.info("Started lbrynet-daemon")
def _stop_streams(self):
"""stop pending GetStream downloads"""
for sd_hash, stream in self.streams.items():
stream.cancel(reason="daemon shutdown")
def _shutdown(self):
self._stop_streams()
return super()._shutdown()
def _download_blob(self, blob_hash, rate_manager=None, timeout=None):
"""
Download a blob
@ -680,7 +993,7 @@ class Daemon(AuthJSONRPCServer):
# #
############################################################################
@AuthJSONRPCServer.deprecated("stop")
@deprecated("stop")
def jsonrpc_daemon_stop(self):
pass
@ -997,39 +1310,39 @@ class Daemon(AuthJSONRPCServer):
"""
return self._render_response(sorted([command for command in self.callable_methods.keys()]))
@AuthJSONRPCServer.deprecated("account_balance")
@deprecated("account_balance")
def jsonrpc_wallet_balance(self, address=None):
pass
@AuthJSONRPCServer.deprecated("account_unlock")
@deprecated("account_unlock")
def jsonrpc_wallet_unlock(self, password):
pass
@AuthJSONRPCServer.deprecated("account_decrypt")
@deprecated("account_decrypt")
def jsonrpc_wallet_decrypt(self):
pass
@AuthJSONRPCServer.deprecated("account_encrypt")
@deprecated("account_encrypt")
def jsonrpc_wallet_encrypt(self, new_password):
pass
@AuthJSONRPCServer.deprecated("address_is_mine")
@deprecated("address_is_mine")
def jsonrpc_wallet_is_address_mine(self, address):
pass
@AuthJSONRPCServer.deprecated()
@deprecated()
def jsonrpc_wallet_public_key(self, address):
pass
@AuthJSONRPCServer.deprecated("address_list")
@deprecated("address_list")
def jsonrpc_wallet_list(self):
pass
@AuthJSONRPCServer.deprecated("address_unused")
@deprecated("address_unused")
def jsonrpc_wallet_new_address(self):
pass
@AuthJSONRPCServer.deprecated("address_unused")
@deprecated("address_unused")
def jsonrpc_wallet_unused_address(self):
pass
@ -2441,7 +2754,7 @@ class Daemon(AuthJSONRPCServer):
self.analytics_manager.send_claim_action('new_support')
return result
@AuthJSONRPCServer.deprecated()
@deprecated()
def jsonrpc_claim_renew(self, outpoint=None, height=None):
pass

View file

@ -8,10 +8,137 @@ from aiohttp import client_exceptions
from lbrynet import utils, conf, log_support
from lbrynet.extras.daemon import analytics
from lbrynet.extras.daemon.auth.client import LBRYAPIClient
from lbrynet.extras.daemon.Daemon import Daemon
import json
import aiohttp
import logging
from urllib.parse import urlparse
from lbrynet import conf
log = logging.getLogger(__name__)
USER_AGENT = "AuthServiceProxy/0.1"
TWISTED_SECURE_SESSION = "TWISTED_SECURE_SESSION"
TWISTED_SESSION = "TWISTED_SESSION"
LBRY_SECRET = "LBRY_SECRET"
HTTP_TIMEOUT = 30
class JSONRPCException(Exception):
def __init__(self, rpc_error):
super().__init__()
self.error = rpc_error
class UnAuthAPIClient:
def __init__(self, host, port, session):
self.host = host
self.port = port
self.session = session
def __getattr__(self, method):
async def f(*args, **kwargs):
return await self.call(method, [args, kwargs])
return f
@classmethod
async def from_url(cls, url):
url_fragment = urlparse(url)
host = url_fragment.hostname
port = url_fragment.port
connector = aiohttp.TCPConnector()
session = aiohttp.ClientSession(connector=connector)
return cls(host, port, session)
async def call(self, method, params=None):
message = {'method': method, 'params': params}
async with self.session.get(conf.settings.get_api_connection_string(), json=message) as resp:
return await resp.json()
class AuthAPIClient:
def __init__(self, key, session, cookies, url, login_url):
self.session = session
self.__api_key = key
self.__login_url = login_url
self.__id_count = 0
self.__url = url
self.__cookies = cookies
def __getattr__(self, name):
if name.startswith('__') and name.endswith('__'):
raise AttributeError(name)
def f(*args, **kwargs):
return self.call(name, [args, kwargs])
return f
async def call(self, method, params=None):
params = params or {}
self.__id_count += 1
pre_auth_post_data = {
'version': '2',
'method': method,
'params': params,
'id': self.__id_count
}
to_auth = json.dumps(pre_auth_post_data, sort_keys=True)
auth_msg = self.__api_key.get_hmac(to_auth).decode()
pre_auth_post_data.update({'hmac': auth_msg})
post_data = json.dumps(pre_auth_post_data)
headers = {
'Host': self.__url.hostname,
'User-Agent': USER_AGENT,
'Content-type': 'application/json'
}
async with self.session.post(self.__login_url, data=post_data, headers=headers) as resp:
if resp is None:
raise JSONRPCException({'code': -342, 'message': 'missing HTTP response from server'})
resp.raise_for_status()
next_secret = resp.headers.get(LBRY_SECRET, False)
if next_secret:
self.__api_key.secret = next_secret
return await resp.json()
@classmethod
async def get_client(cls, key_name=None):
api_key_name = key_name or "api"
keyring = Keyring.load_from_disk()
api_key = keyring.api_key
login_url = conf.settings.get_api_connection_string(api_key_name, api_key.secret)
url = urlparse(login_url)
headers = {
'Host': url.hostname,
'User-Agent': USER_AGENT,
'Content-type': 'application/json'
}
connector = aiohttp.TCPConnector(ssl=None if not conf.settings['use_https'] else keyring.ssl_context)
session = aiohttp.ClientSession(connector=connector)
async with session.post(login_url, headers=headers) as r:
cookies = r.cookies
uid = cookies.get(TWISTED_SECURE_SESSION if conf.settings['use_https'] else TWISTED_SESSION).value
api_key = APIKey.create(seed=uid.encode())
return cls(api_key, session, cookies, url, login_url)
class LBRYAPIClient:
@staticmethod
def get_client(conf_path=None):
conf.conf_file = conf_path
if not conf.settings:
conf.initialize_settings()
return AuthAPIClient.get_client() if conf.settings['use_auth_http'] else \
UnAuthAPIClient.from_url(conf.settings.get_api_connection_string())
if sys.platform.startswith('darwin') or sys.platform.startswith('linux'):

View file

@ -2,6 +2,7 @@ import contextlib
from twisted.trial import unittest
from io import StringIO
from twisted.internet import defer
from torba.testcase import AsyncioTestCase
from lbrynet import conf
from lbrynet.extras import cli
@ -28,11 +29,10 @@ class FakeAnalytics:
pass
class CLIIntegrationTest(unittest.TestCase):
class CLIIntegrationTest(AsyncioTestCase):
USE_AUTH = False
@defer.inlineCallbacks
def setUp(self):
async def asyncSetUp(self):
skip = [
DATABASE_COMPONENT, BLOB_COMPONENT, HEADERS_COMPONENT, WALLET_COMPONENT,
DHT_COMPONENT, HASH_ANNOUNCER_COMPONENT, FILE_MANAGER_COMPONENT,
@ -46,10 +46,10 @@ class CLIIntegrationTest(unittest.TestCase):
conf.settings.initialize_post_conf_load()
Daemon.component_attributes = {}
self.daemon = Daemon(analytics_manager=FakeAnalytics())
yield self.daemon.start_listening()
await self.daemon.start_listening()
def tearDown(self):
return self.daemon._shutdown()
async def asyncTearDown(self):
await self.daemon.shutdown()
class AuthenticatedCLITest(CLIIntegrationTest):

View file

@ -21,7 +21,7 @@ lbrynet.schema.BLOCKCHAIN_NAME = 'lbrycrd_regtest'
from lbrynet import conf as lbry_conf
from lbrynet.dht.node import Node
from lbrynet.extras.daemon.Daemon import Daemon
from lbrynet.extras.daemon.Daemon import Daemon, jsonrpc_dumps_pretty
from lbrynet.extras.wallet import LbryWalletManager
from lbrynet.extras.daemon.Components import WalletComponent, DHTComponent, HashAnnouncerComponent, \
ExchangeRateManagerComponent
@ -29,7 +29,6 @@ from lbrynet.extras.daemon.Components import REFLECTOR_COMPONENT, PEER_PROTOCOL_
from lbrynet.extras.daemon.Components import UPnPComponent
from lbrynet.extras.daemon.Components import d2f
from lbrynet.extras.daemon.ComponentManager import ComponentManager
from lbrynet.extras.daemon.auth.server import jsonrpc_dumps_pretty
class FakeUPnP(UPnPComponent):