From b032e99dd1db49a97553d997dbb79c1578177618 Mon Sep 17 00:00:00 2001 From: Oleg Silkin Date: Tue, 8 Jan 2019 18:02:49 -0600 Subject: [PATCH] Removes the twisted dependency from the analytics file and replaces it with `asyncio` and `aiohttp`. Also removes the calling loop from the analytics in favor of a `dict` that stores the names of the methods and their coroutines that are wrapped inside a looping task. The tasks are canceled when the analytics manager is asked to shutdown Signed-off-by: Oleg Silkin --- lbrynet/extras/daemon/Components.py | 2 +- lbrynet/extras/daemon/Daemon.py | 34 +++--- lbrynet/extras/daemon/DaemonConsole.py | 3 +- lbrynet/extras/daemon/analytics.py | 153 +++++++++++-------------- lbrynet/extras/daemon/storage.py | 16 ++- 5 files changed, 98 insertions(+), 110 deletions(-) diff --git a/lbrynet/extras/daemon/Components.py b/lbrynet/extras/daemon/Components.py index afc7f91b1..c279468b6 100644 --- a/lbrynet/extras/daemon/Components.py +++ b/lbrynet/extras/daemon/Components.py @@ -696,7 +696,7 @@ class UPnPComponent(Component): log.debug("set up upnp port redirects for gateway: %s", self.upnp.gateway.manufacturer_string) else: log.error("failed to setup upnp") - self.component_manager.analytics_manager.send_upnp_setup_success_fail(success, await self.get_status()) + await self.component_manager.analytics_manager.send_upnp_setup_success_fail(success, await self.get_status()) self._maintain_redirects_task = asyncio.create_task(self._repeatedly_maintain_redirects(now=False)) async def stop(self): diff --git a/lbrynet/extras/daemon/Daemon.py b/lbrynet/extras/daemon/Daemon.py index bb6687f53..495de9efb 100644 --- a/lbrynet/extras/daemon/Daemon.py +++ b/lbrynet/extras/daemon/Daemon.py @@ -413,14 +413,14 @@ class Daemon(metaclass=JSONRPCServerType): ) log.info('lbrynet API listening on TCP %s:%i', *self.server.sockets[0].getsockname()[:2]) await self.setup() - self.analytics_manager.send_server_startup_success() + await self.analytics_manager.send_server_startup_success() except OSError: log.error('lbrynet API failed to bind TCP %s:%i for listening. Daemon is already running or this port is ' 'already in use by another application.', conf.settings['api_host'], conf.settings['api_port']) except defer.CancelledError: log.info("shutting down before finished starting") except Exception as err: - self.analytics_manager.send_server_startup_error(str(err)) + await self.analytics_manager.send_server_startup_error(str(err)) log.exception('Failed to start lbrynet-daemon') async def setup(self): @@ -429,7 +429,7 @@ class Daemon(metaclass=JSONRPCServerType): if not self.analytics_manager.is_started: self.analytics_manager.start() - self.analytics_manager.send_server_startup() + await self.analytics_manager.send_server_startup() for lc_name, lc_time in self._looping_call_times.items(): self.looping_call_manager.start(lc_name, lc_time) @@ -460,7 +460,7 @@ class Daemon(metaclass=JSONRPCServerType): await self.handler.shutdown(60.0) await self.app.cleanup() if self.analytics_manager: - self.analytics_manager.shutdown() + await self.analytics_manager.shutdown() try: self._component_setup_task.cancel() except (AttributeError, asyncio.CancelledError): @@ -660,22 +660,22 @@ class Daemon(metaclass=JSONRPCServerType): async def _download_finished(download_id, name, claim_dict): report = await self._get_stream_analytics_report(claim_dict) - self.analytics_manager.send_download_finished(download_id, name, report, claim_dict) - self.analytics_manager.send_new_download_success(download_id, name, claim_dict) + await self.analytics_manager.send_download_finished(download_id, name, report, claim_dict) + await self.analytics_manager.send_new_download_success(download_id, name, claim_dict) async def _download_failed(error, download_id, name, claim_dict): report = await self._get_stream_analytics_report(claim_dict) - self.analytics_manager.send_download_errored(error, download_id, name, claim_dict, + await self.analytics_manager.send_download_errored(error, download_id, name, claim_dict, report) - self.analytics_manager.send_new_download_fail(download_id, name, claim_dict, error) + await self.analytics_manager.send_new_download_fail(download_id, name, claim_dict, error) if sd_hash in self.streams: downloader = self.streams[sd_hash] return await d2f(downloader.finished_deferred) else: download_id = utils.random_string() - self.analytics_manager.send_download_started(download_id, name, claim_dict) - self.analytics_manager.send_new_download_start(download_id, name, claim_dict) + await self.analytics_manager.send_download_started(download_id, name, claim_dict) + await self.analytics_manager.send_new_download_start(download_id, name, claim_dict) self.streams[sd_hash] = GetStream( self.file_manager.sd_identifier, self.wallet_manager, self.exchange_rate_manager, self.blob_manager, self.component_manager.peer_finder, self.rate_limiter, self.payment_rate_manager, self.storage, @@ -721,7 +721,7 @@ class Daemon(metaclass=JSONRPCServerType): d = reupload.reflect_file(publisher.lbry_file) d.addCallbacks(lambda _: log.info("Reflected new publication to lbry://%s", name), log.exception) - self.analytics_manager.send_claim_action('publish') + await self.analytics_manager.send_claim_action('publish') nout = 0 txo = tx.outputs[nout] log.info("Success! Published to lbry://%s txid: %s nout: %d", name, tx.id, nout) @@ -1383,7 +1383,7 @@ class Daemon(metaclass=JSONRPCServerType): raise InsufficientFundsError() account = self.get_account_or_default(account_id) result = await self.wallet_manager.send_points_to_address(reserved_points, amount, account) - self.analytics_manager.send_credits_sent() + await self.analytics_manager.send_credits_sent() else: log.info("This command is deprecated for sending tips, please use the newer claim_tip command") result = await self.jsonrpc_claim_tip(claim_id=claim_id, amount=amount, account_id=account_id) @@ -1794,7 +1794,7 @@ class Daemon(metaclass=JSONRPCServerType): account = self.get_account_or_default(account_id) result = await account.send_to_addresses(amount, addresses, broadcast) - self.analytics_manager.send_credits_sent() + await self.analytics_manager.send_credits_sent() return result @requires(WALLET_COMPONENT) @@ -2325,7 +2325,7 @@ class Daemon(metaclass=JSONRPCServerType): channel_name, amount, self.get_account_or_default(account_id) ) self.default_wallet.save() - self.analytics_manager.send_new_channel() + await self.analytics_manager.send_new_channel() nout = 0 txo = tx.outputs[nout] log.info("Claimed a new channel! lbry://%s txid: %s nout: %d", channel_name, tx.id, nout) @@ -2645,7 +2645,7 @@ class Daemon(metaclass=JSONRPCServerType): raise Exception('Must specify nout') tx = await self.wallet_manager.abandon_claim(claim_id, txid, nout, account) - self.analytics_manager.send_claim_action('abandon') + await self.analytics_manager.send_claim_action('abandon') if blocking: await self.ledger.wait(tx) return {"success": True, "tx": tx} @@ -2680,7 +2680,7 @@ class Daemon(metaclass=JSONRPCServerType): account = self.get_account_or_default(account_id) amount = self.get_dewies_or_error("amount", amount) result = await self.wallet_manager.support_claim(name, claim_id, amount, account) - self.analytics_manager.send_claim_action('new_support') + await self.analytics_manager.send_claim_action('new_support') return result @requires(WALLET_COMPONENT, conditions=[WALLET_IS_UNLOCKED]) @@ -2713,7 +2713,7 @@ class Daemon(metaclass=JSONRPCServerType): amount = self.get_dewies_or_error("amount", amount) validate_claim_id(claim_id) result = await self.wallet_manager.tip_claim(amount, claim_id, account) - self.analytics_manager.send_claim_action('new_support') + await self.analytics_manager.send_claim_action('new_support') return result @deprecated() diff --git a/lbrynet/extras/daemon/DaemonConsole.py b/lbrynet/extras/daemon/DaemonConsole.py index 4735667b0..3385d9610 100644 --- a/lbrynet/extras/daemon/DaemonConsole.py +++ b/lbrynet/extras/daemon/DaemonConsole.py @@ -295,7 +295,8 @@ def start_server_and_listen(use_auth, analytics_manager, quiet): logging.getLogger("lbryum").setLevel(logging.CRITICAL) logging.getLogger("requests").setLevel(logging.CRITICAL) - analytics_manager.send_server_startup() + # TODO: turn this all into async. Until then this routine can't be called + # analytics_manager.send_server_startup() yield Daemon().start_listening() diff --git a/lbrynet/extras/daemon/analytics.py b/lbrynet/extras/daemon/analytics.py index 0208d6757..92adeaa7d 100644 --- a/lbrynet/extras/daemon/analytics.py +++ b/lbrynet/extras/daemon/analytics.py @@ -1,11 +1,12 @@ import collections import logging -import treq -from twisted.internet import defer, task +import asyncio +import aiohttp from lbrynet import conf, utils -from lbrynet.extras import looping_call_manager, system_info +from lbrynet.extras import system_info +from lbrynet.extras.daemon.storage import looping_call # Things We Track SERVER_STARTUP = 'Server Startup' @@ -30,7 +31,7 @@ class Manager: def __init__(self, analytics_api, context=None, installation_id=None, session_id=None): self.analytics_api = analytics_api self._tracked_data = collections.defaultdict(list) - self.looping_call_manager = self._setup_looping_calls() + self.looping_tasks = {} self.context = context or self._make_context( system_info.get_platform(), conf.settings['wallet']) self.installation_id = installation_id or conf.settings.installation_id @@ -43,20 +44,20 @@ class Manager: return cls(api) # Things We Track - def send_new_download_start(self, download_id, name, claim_dict): - self._send_new_download_stats("start", download_id, name, claim_dict) + async def send_new_download_start(self, download_id, name, claim_dict): + await self._send_new_download_stats("start", download_id, name, claim_dict) - def send_new_download_success(self, download_id, name, claim_dict): - self._send_new_download_stats("success", download_id, name, claim_dict) + async def send_new_download_success(self, download_id, name, claim_dict): + await self._send_new_download_stats("success", download_id, name, claim_dict) - def send_new_download_fail(self, download_id, name, claim_dict, e): - self._send_new_download_stats("failure", download_id, name, claim_dict, { + async def send_new_download_fail(self, download_id, name, claim_dict, e): + await self._send_new_download_stats("failure", download_id, name, claim_dict, { 'name': type(e).__name__ if hasattr(type(e), "__name__") else str(type(e)), 'message': str(e), }) - def _send_new_download_stats(self, action, download_id, name, claim_dict, e=None): - self.analytics_api.track({ + async def _send_new_download_stats(self, action, download_id, name, claim_dict, e=None): + await self.analytics_api.track({ 'userId': 'lbry', # required, see https://segment.com/docs/sources/server/http/#track 'event': NEW_DOWNLOAD_STAT, 'properties': self._event_properties({ @@ -70,91 +71,81 @@ class Manager: 'timestamp': utils.isonow(), }) - def send_upnp_setup_success_fail(self, success, status): - self.analytics_api.track( + async def send_upnp_setup_success_fail(self, success, status): + await self.analytics_api.track( self._event(UPNP_SETUP, { 'success': success, 'status': status, }) ) - def send_server_startup(self): - self.analytics_api.track(self._event(SERVER_STARTUP)) + async def send_server_startup(self): + await self.analytics_api.track(self._event(SERVER_STARTUP)) - def send_server_startup_success(self): - self.analytics_api.track(self._event(SERVER_STARTUP_SUCCESS)) + async def send_server_startup_success(self): + await self.analytics_api.track(self._event(SERVER_STARTUP_SUCCESS)) - def send_server_startup_error(self, message): - self.analytics_api.track(self._event(SERVER_STARTUP_ERROR, {'message': message})) + async def send_server_startup_error(self, message): + await self.analytics_api.track(self._event(SERVER_STARTUP_ERROR, {'message': message})) - def send_download_started(self, id_, name, claim_dict=None): - self.analytics_api.track( + async def send_download_started(self, id_, name, claim_dict=None): + await self.analytics_api.track( self._event(DOWNLOAD_STARTED, self._download_properties(id_, name, claim_dict)) ) - def send_download_errored(self, err, id_, name, claim_dict, report): + async def send_download_errored(self, err, id_, name, claim_dict, report): download_error_properties = self._download_error_properties(err, id_, name, claim_dict, report) - self.analytics_api.track(self._event(DOWNLOAD_ERRORED, download_error_properties)) + await self.analytics_api.track(self._event(DOWNLOAD_ERRORED, download_error_properties)) - def send_download_finished(self, id_, name, report, claim_dict=None): + async def send_download_finished(self, id_, name, report, claim_dict=None): download_properties = self._download_properties(id_, name, claim_dict, report) - self.analytics_api.track(self._event(DOWNLOAD_FINISHED, download_properties)) + await self.analytics_api.track(self._event(DOWNLOAD_FINISHED, download_properties)) - def send_claim_action(self, action): - self.analytics_api.track(self._event(CLAIM_ACTION, {'action': action})) + async def send_claim_action(self, action): + await self.analytics_api.track(self._event(CLAIM_ACTION, {'action': action})) - def send_new_channel(self): - self.analytics_api.track(self._event(NEW_CHANNEL)) + async def send_new_channel(self): + await self.analytics_api.track(self._event(NEW_CHANNEL)) - def send_credits_sent(self): - self.analytics_api.track(self._event(CREDITS_SENT)) + async def send_credits_sent(self): + await self.analytics_api.track(self._event(CREDITS_SENT)) - def _send_heartbeat(self): - self.analytics_api.track(self._event(HEARTBEAT)) + async def _send_heartbeat(self): + await self.analytics_api.track(self._event(HEARTBEAT)) - def _update_tracked_metrics(self): + async def _update_tracked_metrics(self): should_send, value = self.summarize_and_reset(BLOB_BYTES_UPLOADED) if should_send: - self.analytics_api.track(self._metric_event(BLOB_BYTES_UPLOADED, value)) + await self.analytics_api.track(self._metric_event(BLOB_BYTES_UPLOADED, value)) # Setup / Shutdown def start(self): if not self.is_started: - for name, _, interval in self._get_looping_calls(): - self.looping_call_manager.start(name, interval) + for name, fn, secs in self._get_looping_calls(): + self.looping_tasks[name] = asyncio.create_task(looping_call(secs, fn)) self.is_started = True + log.info("Start") def shutdown(self): - self.looping_call_manager.shutdown() + if self.is_started: + try: + for name, task in self.looping_tasks.items(): + if task: + task.cancel() + self.looping_tasks[name] = None + log.info("Stopped analytics looping calls") + self.is_started = False + except Exception as e: + log.exception('Got exception when trying to cancel tasks in analytics: ', exc_info=e) - def register_repeating_metric(self, event_name, value_generator, frequency=300): - lcall = task.LoopingCall(self._send_repeating_metric, event_name, value_generator) - self.looping_call_manager.register_looping_call(event_name, lcall) - lcall.start(frequency) - - def _get_looping_calls(self): + def _get_looping_calls(self) -> list: return [ ('send_heartbeat', self._send_heartbeat, 300), ('update_tracked_metrics', self._update_tracked_metrics, 600), ] - def _setup_looping_calls(self): - call_manager = looping_call_manager.LoopingCallManager() - for name, fn, _ in self._get_looping_calls(): - call_manager.register_looping_call(name, task.LoopingCall(fn)) - return call_manager - - def _send_repeating_metric(self, event_name, value_generator): - result = value_generator() - self._if_deferred(result, self._send_repeating_metric_value, event_name) - - def _send_repeating_metric_value(self, result, event_name): - should_send, value = result - if should_send: - self.analytics_api.track(self._metric_event(event_name, value)) - def add_observation(self, metric, value): self._tracked_data[metric].append(value) @@ -239,13 +230,6 @@ class Manager: context['os']['distro'] = platform['distro'] return context - @staticmethod - def _if_deferred(maybe_deferred, callback, *args, **kwargs): - if isinstance(maybe_deferred, defer.Deferred): - maybe_deferred.addCallback(callback, *args, **kwargs) - else: - callback(maybe_deferred, *args, **kwargs) - class Api: def __init__(self, cookies, url, write_key, enabled): @@ -254,7 +238,7 @@ class Api: self._write_key = write_key self._enabled = enabled - def _post(self, endpoint, data): + async def _post(self, endpoint, data): # there is an issue with a timing condition with keep-alive # that is best explained here: https://github.com/mikem23/keepalive-race # @@ -265,29 +249,28 @@ class Api: # # by forcing the connection to close, we will disable the keep-alive. - def update_cookies(response): - self.cookies.update(response.cookies()) - return response - assert endpoint[0] == '/' - headers = {b"Connection": b"close"} - d = treq.post(self.url + endpoint, auth=(self._write_key, ''), json=data, - headers=headers, cookies=self.cookies) - d.addCallback(update_cookies) - return d + request_kwargs = { + 'method': 'POST', + 'url': self.url + endpoint, + 'headers': {'Connection': 'Close'}, + 'auth': aiohttp.BasicAuth(self._write_key, ''), + 'json': data, + 'cookies': self.cookies + } + try: + async with aiohttp.request(**request_kwargs) as response: + self.cookies.update(response.cookies) + except Exception as e: + log.exception('Encountered an exception while POSTing to %s: ', self.url + endpoint, exc_info=e) - def track(self, event): + async def track(self, event): """Send a single tracking event""" if not self._enabled: - return defer.succeed('Analytics disabled') - - def _log_error(failure, event): - log.warning('Failed to send track event. %s (%s)', failure.getTraceback(), str(event)) + return 'Analytics disabled' log.debug('Sending track event: %s', event) - d = self._post('/track', event) - d.addErrback(_log_error, event) - return d + await self._post('/track', event) @classmethod def new_instance(cls, enabled=None): diff --git a/lbrynet/extras/daemon/storage.py b/lbrynet/extras/daemon/storage.py index 883b8b367..dbef7e754 100644 --- a/lbrynet/extras/daemon/storage.py +++ b/lbrynet/extras/daemon/storage.py @@ -51,12 +51,16 @@ async def open_file_for_writing(download_directory: str, suggested_file_name: st async def looping_call(interval, fun): - while True: - try: - await fun() - except Exception as e: - log.exception('Looping call experienced exception:', exc_info=e) - await asyncio.sleep(interval) + try: + while True: + try: + await fun() + except Exception as e: + log.exception('Looping call experienced exception:', exc_info=e) + await asyncio.sleep(interval) + except asyncio.CancelledError: + pass + class SQLiteStorage(SQLiteMixin):