Removes the twisted dependency from the analytics file and replaces it

with `asyncio` and `aiohttp`. Also removes the calling loop from the
analytics in favor of a `dict` that stores the names of the methods
and their coroutines that are wrapped inside a looping task. The tasks
are canceled when the analytics manager is asked to shut down.

Signed-off-by: Oleg Silkin <o.silkin98@gmail.com>
This commit is contained in:
Oleg Silkin 2019-01-08 18:02:49 -06:00 committed by Lex Berezhny
parent fd7811f38d
commit b032e99dd1
5 changed files with 98 additions and 110 deletions

View file

@@ -696,7 +696,7 @@ class UPnPComponent(Component):
log.debug("set up upnp port redirects for gateway: %s", self.upnp.gateway.manufacturer_string) log.debug("set up upnp port redirects for gateway: %s", self.upnp.gateway.manufacturer_string)
else: else:
log.error("failed to setup upnp") log.error("failed to setup upnp")
self.component_manager.analytics_manager.send_upnp_setup_success_fail(success, await self.get_status()) await self.component_manager.analytics_manager.send_upnp_setup_success_fail(success, await self.get_status())
self._maintain_redirects_task = asyncio.create_task(self._repeatedly_maintain_redirects(now=False)) self._maintain_redirects_task = asyncio.create_task(self._repeatedly_maintain_redirects(now=False))
async def stop(self): async def stop(self):

View file

@@ -413,14 +413,14 @@ class Daemon(metaclass=JSONRPCServerType):
) )
log.info('lbrynet API listening on TCP %s:%i', *self.server.sockets[0].getsockname()[:2]) log.info('lbrynet API listening on TCP %s:%i', *self.server.sockets[0].getsockname()[:2])
await self.setup() await self.setup()
self.analytics_manager.send_server_startup_success() await self.analytics_manager.send_server_startup_success()
except OSError: except OSError:
log.error('lbrynet API failed to bind TCP %s:%i for listening. Daemon is already running or this port is ' log.error('lbrynet API failed to bind TCP %s:%i for listening. Daemon is already running or this port is '
'already in use by another application.', conf.settings['api_host'], conf.settings['api_port']) 'already in use by another application.', conf.settings['api_host'], conf.settings['api_port'])
except defer.CancelledError: except defer.CancelledError:
log.info("shutting down before finished starting") log.info("shutting down before finished starting")
except Exception as err: except Exception as err:
self.analytics_manager.send_server_startup_error(str(err)) await self.analytics_manager.send_server_startup_error(str(err))
log.exception('Failed to start lbrynet-daemon') log.exception('Failed to start lbrynet-daemon')
async def setup(self): async def setup(self):
@@ -429,7 +429,7 @@ class Daemon(metaclass=JSONRPCServerType):
if not self.analytics_manager.is_started: if not self.analytics_manager.is_started:
self.analytics_manager.start() self.analytics_manager.start()
self.analytics_manager.send_server_startup() await self.analytics_manager.send_server_startup()
for lc_name, lc_time in self._looping_call_times.items(): for lc_name, lc_time in self._looping_call_times.items():
self.looping_call_manager.start(lc_name, lc_time) self.looping_call_manager.start(lc_name, lc_time)
@@ -460,7 +460,7 @@ class Daemon(metaclass=JSONRPCServerType):
await self.handler.shutdown(60.0) await self.handler.shutdown(60.0)
await self.app.cleanup() await self.app.cleanup()
if self.analytics_manager: if self.analytics_manager:
self.analytics_manager.shutdown() await self.analytics_manager.shutdown()
try: try:
self._component_setup_task.cancel() self._component_setup_task.cancel()
except (AttributeError, asyncio.CancelledError): except (AttributeError, asyncio.CancelledError):
@@ -660,22 +660,22 @@ class Daemon(metaclass=JSONRPCServerType):
async def _download_finished(download_id, name, claim_dict): async def _download_finished(download_id, name, claim_dict):
report = await self._get_stream_analytics_report(claim_dict) report = await self._get_stream_analytics_report(claim_dict)
self.analytics_manager.send_download_finished(download_id, name, report, claim_dict) await self.analytics_manager.send_download_finished(download_id, name, report, claim_dict)
self.analytics_manager.send_new_download_success(download_id, name, claim_dict) await self.analytics_manager.send_new_download_success(download_id, name, claim_dict)
async def _download_failed(error, download_id, name, claim_dict): async def _download_failed(error, download_id, name, claim_dict):
report = await self._get_stream_analytics_report(claim_dict) report = await self._get_stream_analytics_report(claim_dict)
self.analytics_manager.send_download_errored(error, download_id, name, claim_dict, await self.analytics_manager.send_download_errored(error, download_id, name, claim_dict,
report) report)
self.analytics_manager.send_new_download_fail(download_id, name, claim_dict, error) await self.analytics_manager.send_new_download_fail(download_id, name, claim_dict, error)
if sd_hash in self.streams: if sd_hash in self.streams:
downloader = self.streams[sd_hash] downloader = self.streams[sd_hash]
return await d2f(downloader.finished_deferred) return await d2f(downloader.finished_deferred)
else: else:
download_id = utils.random_string() download_id = utils.random_string()
self.analytics_manager.send_download_started(download_id, name, claim_dict) await self.analytics_manager.send_download_started(download_id, name, claim_dict)
self.analytics_manager.send_new_download_start(download_id, name, claim_dict) await self.analytics_manager.send_new_download_start(download_id, name, claim_dict)
self.streams[sd_hash] = GetStream( self.streams[sd_hash] = GetStream(
self.file_manager.sd_identifier, self.wallet_manager, self.exchange_rate_manager, self.blob_manager, self.file_manager.sd_identifier, self.wallet_manager, self.exchange_rate_manager, self.blob_manager,
self.component_manager.peer_finder, self.rate_limiter, self.payment_rate_manager, self.storage, self.component_manager.peer_finder, self.rate_limiter, self.payment_rate_manager, self.storage,
@@ -721,7 +721,7 @@ class Daemon(metaclass=JSONRPCServerType):
d = reupload.reflect_file(publisher.lbry_file) d = reupload.reflect_file(publisher.lbry_file)
d.addCallbacks(lambda _: log.info("Reflected new publication to lbry://%s", name), d.addCallbacks(lambda _: log.info("Reflected new publication to lbry://%s", name),
log.exception) log.exception)
self.analytics_manager.send_claim_action('publish') await self.analytics_manager.send_claim_action('publish')
nout = 0 nout = 0
txo = tx.outputs[nout] txo = tx.outputs[nout]
log.info("Success! Published to lbry://%s txid: %s nout: %d", name, tx.id, nout) log.info("Success! Published to lbry://%s txid: %s nout: %d", name, tx.id, nout)
@@ -1383,7 +1383,7 @@ class Daemon(metaclass=JSONRPCServerType):
raise InsufficientFundsError() raise InsufficientFundsError()
account = self.get_account_or_default(account_id) account = self.get_account_or_default(account_id)
result = await self.wallet_manager.send_points_to_address(reserved_points, amount, account) result = await self.wallet_manager.send_points_to_address(reserved_points, amount, account)
self.analytics_manager.send_credits_sent() await self.analytics_manager.send_credits_sent()
else: else:
log.info("This command is deprecated for sending tips, please use the newer claim_tip command") log.info("This command is deprecated for sending tips, please use the newer claim_tip command")
result = await self.jsonrpc_claim_tip(claim_id=claim_id, amount=amount, account_id=account_id) result = await self.jsonrpc_claim_tip(claim_id=claim_id, amount=amount, account_id=account_id)
@@ -1794,7 +1794,7 @@ class Daemon(metaclass=JSONRPCServerType):
account = self.get_account_or_default(account_id) account = self.get_account_or_default(account_id)
result = await account.send_to_addresses(amount, addresses, broadcast) result = await account.send_to_addresses(amount, addresses, broadcast)
self.analytics_manager.send_credits_sent() await self.analytics_manager.send_credits_sent()
return result return result
@requires(WALLET_COMPONENT) @requires(WALLET_COMPONENT)
@@ -2325,7 +2325,7 @@ class Daemon(metaclass=JSONRPCServerType):
channel_name, amount, self.get_account_or_default(account_id) channel_name, amount, self.get_account_or_default(account_id)
) )
self.default_wallet.save() self.default_wallet.save()
self.analytics_manager.send_new_channel() await self.analytics_manager.send_new_channel()
nout = 0 nout = 0
txo = tx.outputs[nout] txo = tx.outputs[nout]
log.info("Claimed a new channel! lbry://%s txid: %s nout: %d", channel_name, tx.id, nout) log.info("Claimed a new channel! lbry://%s txid: %s nout: %d", channel_name, tx.id, nout)
@@ -2645,7 +2645,7 @@ class Daemon(metaclass=JSONRPCServerType):
raise Exception('Must specify nout') raise Exception('Must specify nout')
tx = await self.wallet_manager.abandon_claim(claim_id, txid, nout, account) tx = await self.wallet_manager.abandon_claim(claim_id, txid, nout, account)
self.analytics_manager.send_claim_action('abandon') await self.analytics_manager.send_claim_action('abandon')
if blocking: if blocking:
await self.ledger.wait(tx) await self.ledger.wait(tx)
return {"success": True, "tx": tx} return {"success": True, "tx": tx}
@@ -2680,7 +2680,7 @@ class Daemon(metaclass=JSONRPCServerType):
account = self.get_account_or_default(account_id) account = self.get_account_or_default(account_id)
amount = self.get_dewies_or_error("amount", amount) amount = self.get_dewies_or_error("amount", amount)
result = await self.wallet_manager.support_claim(name, claim_id, amount, account) result = await self.wallet_manager.support_claim(name, claim_id, amount, account)
self.analytics_manager.send_claim_action('new_support') await self.analytics_manager.send_claim_action('new_support')
return result return result
@requires(WALLET_COMPONENT, conditions=[WALLET_IS_UNLOCKED]) @requires(WALLET_COMPONENT, conditions=[WALLET_IS_UNLOCKED])
@@ -2713,7 +2713,7 @@ class Daemon(metaclass=JSONRPCServerType):
amount = self.get_dewies_or_error("amount", amount) amount = self.get_dewies_or_error("amount", amount)
validate_claim_id(claim_id) validate_claim_id(claim_id)
result = await self.wallet_manager.tip_claim(amount, claim_id, account) result = await self.wallet_manager.tip_claim(amount, claim_id, account)
self.analytics_manager.send_claim_action('new_support') await self.analytics_manager.send_claim_action('new_support')
return result return result
@deprecated() @deprecated()

View file

@@ -295,7 +295,8 @@ def start_server_and_listen(use_auth, analytics_manager, quiet):
logging.getLogger("lbryum").setLevel(logging.CRITICAL) logging.getLogger("lbryum").setLevel(logging.CRITICAL)
logging.getLogger("requests").setLevel(logging.CRITICAL) logging.getLogger("requests").setLevel(logging.CRITICAL)
analytics_manager.send_server_startup() # TODO: turn this all into async. Until then this routine can't be called
# analytics_manager.send_server_startup()
yield Daemon().start_listening() yield Daemon().start_listening()

View file

@@ -1,11 +1,12 @@
import collections import collections
import logging import logging
import treq import asyncio
from twisted.internet import defer, task import aiohttp
from lbrynet import conf, utils from lbrynet import conf, utils
from lbrynet.extras import looping_call_manager, system_info from lbrynet.extras import system_info
from lbrynet.extras.daemon.storage import looping_call
# Things We Track # Things We Track
SERVER_STARTUP = 'Server Startup' SERVER_STARTUP = 'Server Startup'
@@ -30,7 +31,7 @@ class Manager:
def __init__(self, analytics_api, context=None, installation_id=None, session_id=None): def __init__(self, analytics_api, context=None, installation_id=None, session_id=None):
self.analytics_api = analytics_api self.analytics_api = analytics_api
self._tracked_data = collections.defaultdict(list) self._tracked_data = collections.defaultdict(list)
self.looping_call_manager = self._setup_looping_calls() self.looping_tasks = {}
self.context = context or self._make_context( self.context = context or self._make_context(
system_info.get_platform(), conf.settings['wallet']) system_info.get_platform(), conf.settings['wallet'])
self.installation_id = installation_id or conf.settings.installation_id self.installation_id = installation_id or conf.settings.installation_id
@@ -43,20 +44,20 @@ class Manager:
return cls(api) return cls(api)
# Things We Track # Things We Track
def send_new_download_start(self, download_id, name, claim_dict): async def send_new_download_start(self, download_id, name, claim_dict):
self._send_new_download_stats("start", download_id, name, claim_dict) await self._send_new_download_stats("start", download_id, name, claim_dict)
def send_new_download_success(self, download_id, name, claim_dict): async def send_new_download_success(self, download_id, name, claim_dict):
self._send_new_download_stats("success", download_id, name, claim_dict) await self._send_new_download_stats("success", download_id, name, claim_dict)
def send_new_download_fail(self, download_id, name, claim_dict, e): async def send_new_download_fail(self, download_id, name, claim_dict, e):
self._send_new_download_stats("failure", download_id, name, claim_dict, { await self._send_new_download_stats("failure", download_id, name, claim_dict, {
'name': type(e).__name__ if hasattr(type(e), "__name__") else str(type(e)), 'name': type(e).__name__ if hasattr(type(e), "__name__") else str(type(e)),
'message': str(e), 'message': str(e),
}) })
def _send_new_download_stats(self, action, download_id, name, claim_dict, e=None): async def _send_new_download_stats(self, action, download_id, name, claim_dict, e=None):
self.analytics_api.track({ await self.analytics_api.track({
'userId': 'lbry', # required, see https://segment.com/docs/sources/server/http/#track 'userId': 'lbry', # required, see https://segment.com/docs/sources/server/http/#track
'event': NEW_DOWNLOAD_STAT, 'event': NEW_DOWNLOAD_STAT,
'properties': self._event_properties({ 'properties': self._event_properties({
@@ -70,91 +71,81 @@ class Manager:
'timestamp': utils.isonow(), 'timestamp': utils.isonow(),
}) })
def send_upnp_setup_success_fail(self, success, status): async def send_upnp_setup_success_fail(self, success, status):
self.analytics_api.track( await self.analytics_api.track(
self._event(UPNP_SETUP, { self._event(UPNP_SETUP, {
'success': success, 'success': success,
'status': status, 'status': status,
}) })
) )
def send_server_startup(self): async def send_server_startup(self):
self.analytics_api.track(self._event(SERVER_STARTUP)) await self.analytics_api.track(self._event(SERVER_STARTUP))
def send_server_startup_success(self): async def send_server_startup_success(self):
self.analytics_api.track(self._event(SERVER_STARTUP_SUCCESS)) await self.analytics_api.track(self._event(SERVER_STARTUP_SUCCESS))
def send_server_startup_error(self, message): async def send_server_startup_error(self, message):
self.analytics_api.track(self._event(SERVER_STARTUP_ERROR, {'message': message})) await self.analytics_api.track(self._event(SERVER_STARTUP_ERROR, {'message': message}))
def send_download_started(self, id_, name, claim_dict=None): async def send_download_started(self, id_, name, claim_dict=None):
self.analytics_api.track( await self.analytics_api.track(
self._event(DOWNLOAD_STARTED, self._download_properties(id_, name, claim_dict)) self._event(DOWNLOAD_STARTED, self._download_properties(id_, name, claim_dict))
) )
def send_download_errored(self, err, id_, name, claim_dict, report): async def send_download_errored(self, err, id_, name, claim_dict, report):
download_error_properties = self._download_error_properties(err, id_, name, claim_dict, download_error_properties = self._download_error_properties(err, id_, name, claim_dict,
report) report)
self.analytics_api.track(self._event(DOWNLOAD_ERRORED, download_error_properties)) await self.analytics_api.track(self._event(DOWNLOAD_ERRORED, download_error_properties))
def send_download_finished(self, id_, name, report, claim_dict=None): async def send_download_finished(self, id_, name, report, claim_dict=None):
download_properties = self._download_properties(id_, name, claim_dict, report) download_properties = self._download_properties(id_, name, claim_dict, report)
self.analytics_api.track(self._event(DOWNLOAD_FINISHED, download_properties)) await self.analytics_api.track(self._event(DOWNLOAD_FINISHED, download_properties))
def send_claim_action(self, action): async def send_claim_action(self, action):
self.analytics_api.track(self._event(CLAIM_ACTION, {'action': action})) await self.analytics_api.track(self._event(CLAIM_ACTION, {'action': action}))
def send_new_channel(self): async def send_new_channel(self):
self.analytics_api.track(self._event(NEW_CHANNEL)) await self.analytics_api.track(self._event(NEW_CHANNEL))
def send_credits_sent(self): async def send_credits_sent(self):
self.analytics_api.track(self._event(CREDITS_SENT)) await self.analytics_api.track(self._event(CREDITS_SENT))
def _send_heartbeat(self): async def _send_heartbeat(self):
self.analytics_api.track(self._event(HEARTBEAT)) await self.analytics_api.track(self._event(HEARTBEAT))
def _update_tracked_metrics(self): async def _update_tracked_metrics(self):
should_send, value = self.summarize_and_reset(BLOB_BYTES_UPLOADED) should_send, value = self.summarize_and_reset(BLOB_BYTES_UPLOADED)
if should_send: if should_send:
self.analytics_api.track(self._metric_event(BLOB_BYTES_UPLOADED, value)) await self.analytics_api.track(self._metric_event(BLOB_BYTES_UPLOADED, value))
# Setup / Shutdown # Setup / Shutdown
def start(self): def start(self):
if not self.is_started: if not self.is_started:
for name, _, interval in self._get_looping_calls(): for name, fn, secs in self._get_looping_calls():
self.looping_call_manager.start(name, interval) self.looping_tasks[name] = asyncio.create_task(looping_call(secs, fn))
self.is_started = True self.is_started = True
log.info("Start")
def shutdown(self): def shutdown(self):
self.looping_call_manager.shutdown() if self.is_started:
try:
for name, task in self.looping_tasks.items():
if task:
task.cancel()
self.looping_tasks[name] = None
log.info("Stopped analytics looping calls")
self.is_started = False
except Exception as e:
log.exception('Got exception when trying to cancel tasks in analytics: ', exc_info=e)
def register_repeating_metric(self, event_name, value_generator, frequency=300): def _get_looping_calls(self) -> list:
lcall = task.LoopingCall(self._send_repeating_metric, event_name, value_generator)
self.looping_call_manager.register_looping_call(event_name, lcall)
lcall.start(frequency)
def _get_looping_calls(self):
return [ return [
('send_heartbeat', self._send_heartbeat, 300), ('send_heartbeat', self._send_heartbeat, 300),
('update_tracked_metrics', self._update_tracked_metrics, 600), ('update_tracked_metrics', self._update_tracked_metrics, 600),
] ]
def _setup_looping_calls(self):
call_manager = looping_call_manager.LoopingCallManager()
for name, fn, _ in self._get_looping_calls():
call_manager.register_looping_call(name, task.LoopingCall(fn))
return call_manager
def _send_repeating_metric(self, event_name, value_generator):
result = value_generator()
self._if_deferred(result, self._send_repeating_metric_value, event_name)
def _send_repeating_metric_value(self, result, event_name):
should_send, value = result
if should_send:
self.analytics_api.track(self._metric_event(event_name, value))
def add_observation(self, metric, value): def add_observation(self, metric, value):
self._tracked_data[metric].append(value) self._tracked_data[metric].append(value)
@@ -239,13 +230,6 @@ class Manager:
context['os']['distro'] = platform['distro'] context['os']['distro'] = platform['distro']
return context return context
@staticmethod
def _if_deferred(maybe_deferred, callback, *args, **kwargs):
if isinstance(maybe_deferred, defer.Deferred):
maybe_deferred.addCallback(callback, *args, **kwargs)
else:
callback(maybe_deferred, *args, **kwargs)
class Api: class Api:
def __init__(self, cookies, url, write_key, enabled): def __init__(self, cookies, url, write_key, enabled):
@@ -254,7 +238,7 @@ class Api:
self._write_key = write_key self._write_key = write_key
self._enabled = enabled self._enabled = enabled
def _post(self, endpoint, data): async def _post(self, endpoint, data):
# there is an issue with a timing condition with keep-alive # there is an issue with a timing condition with keep-alive
# that is best explained here: https://github.com/mikem23/keepalive-race # that is best explained here: https://github.com/mikem23/keepalive-race
# #
@@ -265,29 +249,28 @@ class Api:
# #
# by forcing the connection to close, we will disable the keep-alive. # by forcing the connection to close, we will disable the keep-alive.
def update_cookies(response):
self.cookies.update(response.cookies())
return response
assert endpoint[0] == '/' assert endpoint[0] == '/'
headers = {b"Connection": b"close"} request_kwargs = {
d = treq.post(self.url + endpoint, auth=(self._write_key, ''), json=data, 'method': 'POST',
headers=headers, cookies=self.cookies) 'url': self.url + endpoint,
d.addCallback(update_cookies) 'headers': {'Connection': 'Close'},
return d 'auth': aiohttp.BasicAuth(self._write_key, ''),
'json': data,
'cookies': self.cookies
}
try:
async with aiohttp.request(**request_kwargs) as response:
self.cookies.update(response.cookies)
except Exception as e:
log.exception('Encountered an exception while POSTing to %s: ', self.url + endpoint, exc_info=e)
def track(self, event): async def track(self, event):
"""Send a single tracking event""" """Send a single tracking event"""
if not self._enabled: if not self._enabled:
return defer.succeed('Analytics disabled') return 'Analytics disabled'
def _log_error(failure, event):
log.warning('Failed to send track event. %s (%s)', failure.getTraceback(), str(event))
log.debug('Sending track event: %s', event) log.debug('Sending track event: %s', event)
d = self._post('/track', event) await self._post('/track', event)
d.addErrback(_log_error, event)
return d
@classmethod @classmethod
def new_instance(cls, enabled=None): def new_instance(cls, enabled=None):

View file

@@ -51,12 +51,16 @@ async def open_file_for_writing(download_directory: str, suggested_file_name: st
async def looping_call(interval, fun): async def looping_call(interval, fun):
while True: try:
try: while True:
await fun() try:
except Exception as e: await fun()
log.exception('Looping call experienced exception:', exc_info=e) except Exception as e:
await asyncio.sleep(interval) log.exception('Looping call experienced exception:', exc_info=e)
await asyncio.sleep(interval)
except asyncio.CancelledError:
pass
class SQLiteStorage(SQLiteMixin): class SQLiteStorage(SQLiteMixin):