lbry-sdk/lbrynet/analytics.py
Lex Berezhny 09e272a642 style fix
2018-05-29 12:45:42 -04:00

275 lines
9.6 KiB
Python

import collections
import logging
import treq
from twisted.internet import defer, task
from lbrynet import conf
from lbrynet.core import looping_call_manager, utils, system_info
# Things We Track
SERVER_STARTUP = 'Server Startup'
SERVER_STARTUP_SUCCESS = 'Server Startup Success'
SERVER_STARTUP_ERROR = 'Server Startup Error'
DOWNLOAD_STARTED = 'Download Started'
DOWNLOAD_ERRORED = 'Download Errored'
DOWNLOAD_FINISHED = 'Download Finished'
HEARTBEAT = 'Heartbeat'
CLAIM_ACTION = 'Claim Action' # publish/create/update/abandon
NEW_CHANNEL = 'New Channel'
CREDITS_SENT = 'Credits Sent'
BLOB_BYTES_UPLOADED = 'Blob Bytes Uploaded'
log = logging.getLogger(__name__)
class Manager(object):
def __init__(self, analytics_api, context=None, installation_id=None, session_id=None):
self.analytics_api = analytics_api
self._tracked_data = collections.defaultdict(list)
self.looping_call_manager = self._setup_looping_calls()
self.context = context or self._make_context(
system_info.get_platform(), conf.settings['wallet'])
self.installation_id = installation_id or conf.settings.installation_id
self.session_id = session_id or conf.settings.get_session_id()
self.is_started = False
@classmethod
def new_instance(cls, enabled=None):
api = Api.new_instance(enabled)
return cls(api)
# Things We Track
def send_server_startup(self):
self.analytics_api.track(self._event(SERVER_STARTUP))
def send_server_startup_success(self):
self.analytics_api.track(self._event(SERVER_STARTUP_SUCCESS))
def send_server_startup_error(self, message):
self.analytics_api.track(self._event(SERVER_STARTUP_ERROR, {'message': message}))
def send_download_started(self, id_, name, claim_dict=None):
self.analytics_api.track(
self._event(DOWNLOAD_STARTED, self._download_properties(id_, name, claim_dict))
)
def send_download_errored(self, err, id_, name, claim_dict, report):
download_error_properties = self._download_error_properties(err, id_, name, claim_dict,
report)
self.analytics_api.track(self._event(DOWNLOAD_ERRORED, download_error_properties))
def send_download_finished(self, id_, name, report, claim_dict=None):
download_properties = self._download_properties(id_, name, claim_dict, report)
self.analytics_api.track(self._event(DOWNLOAD_FINISHED, download_properties))
def send_claim_action(self, action):
self.analytics_api.track(self._event(CLAIM_ACTION, {'action': action}))
def send_new_channel(self):
self.analytics_api.track(self._event(NEW_CHANNEL))
def send_credits_sent(self):
self.analytics_api.track(self._event(CREDITS_SENT))
def _send_heartbeat(self):
self.analytics_api.track(self._event(HEARTBEAT))
def _update_tracked_metrics(self):
should_send, value = self.summarize_and_reset(BLOB_BYTES_UPLOADED)
if should_send:
self.analytics_api.track(self._metric_event(BLOB_BYTES_UPLOADED, value))
# Setup / Shutdown
def start(self):
if not self.is_started:
for name, _, interval in self._get_looping_calls():
self.looping_call_manager.start(name, interval)
self.is_started = True
def shutdown(self):
self.looping_call_manager.shutdown()
def register_repeating_metric(self, event_name, value_generator, frequency=300):
lcall = task.LoopingCall(self._send_repeating_metric, event_name, value_generator)
self.looping_call_manager.register_looping_call(event_name, lcall)
lcall.start(frequency)
def _get_looping_calls(self):
return [
('send_heartbeat', self._send_heartbeat, 60),
('update_tracked_metrics', self._update_tracked_metrics, 300),
]
def _setup_looping_calls(self):
call_manager = looping_call_manager.LoopingCallManager()
for name, fn, _ in self._get_looping_calls():
call_manager.register_looping_call(name, task.LoopingCall(fn))
return call_manager
def _send_repeating_metric(self, event_name, value_generator):
result = value_generator()
self._if_deferred(result, self._send_repeating_metric_value, event_name)
def _send_repeating_metric_value(self, result, event_name):
should_send, value = result
if should_send:
self.analytics_api.track(self._metric_event(event_name, value))
def add_observation(self, metric, value):
self._tracked_data[metric].append(value)
def summarize_and_reset(self, metric, op=sum):
"""Apply `op` on the current values for `metric`.
This operation also resets the metric.
Returns:
a tuple (should_send, value)
"""
try:
values = self._tracked_data.pop(metric)
return True, op(values)
except KeyError:
return False, None
def _event(self, event, event_properties=None):
return {
'userId': 'lbry',
'event': event,
'properties': self._event_properties(event_properties),
'context': self.context,
'timestamp': utils.isonow()
}
def _metric_event(self, metric_name, value):
return self._event(metric_name, {'value': value})
def _event_properties(self, event_properties=None):
properties = {
'lbry_id': self.installation_id,
'session_id': self.session_id,
}
properties.update(event_properties or {})
return properties
@staticmethod
def _download_properties(id_, name, claim_dict=None, report=None):
sd_hash = None if not claim_dict else claim_dict.source_hash
p = {
'download_id': id_,
'name': name,
'stream_info': sd_hash
}
if report:
p['report'] = report
return p
@staticmethod
def _download_error_properties(error, id_, name, claim_dict, report):
def error_name(err):
if not hasattr(type(err), "__name__"):
return str(type(err))
return type(err).__name__
return {
'download_id': id_,
'name': name,
'stream_info': claim_dict.source_hash,
'error': error_name(error),
'reason': error.message,
'report': report
}
@staticmethod
def _make_context(platform, wallet):
context = {
'app': {
'name': 'lbrynet',
'version': platform['lbrynet_version'],
'python_version': platform['python_version'],
'build': platform['build'],
'wallet': {
'name': wallet,
'version': platform['lbryum_version'] if wallet == conf.LBRYUM_WALLET else None
},
},
# TODO: expand os info to give linux/osx specific info
'os': {
'name': platform['os_system'],
'version': platform['os_release']
},
'library': {
'name': 'lbrynet-analytics',
'version': '1.0.0'
},
}
if 'desktop' in platform and 'distro' in platform:
context['os']['desktop'] = platform['desktop']
context['os']['distro'] = platform['distro']
return context
@staticmethod
def _if_deferred(maybe_deferred, callback, *args, **kwargs):
if isinstance(maybe_deferred, defer.Deferred):
maybe_deferred.addCallback(callback, *args, **kwargs)
else:
callback(maybe_deferred, *args, **kwargs)
class Api(object):
def __init__(self, cookies, url, write_key, enabled):
self.cookies = cookies
self.url = url
self._write_key = write_key
self._enabled = enabled
def _post(self, endpoint, data):
# there is an issue with a timing condition with keep-alive
# that is best explained here: https://github.com/mikem23/keepalive-race
#
# If you make a request, wait just the right amount of time,
# then make another request, the requests module may opt to
# reuse the connection, but by the time the server gets it the
# timeout will have expired.
#
# by forcing the connection to close, we will disable the keep-alive.
def update_cookies(response):
self.cookies.update(response.cookies())
return response
assert endpoint[0] == '/'
headers = {b"Connection": b"close"}
d = treq.post(self.url + endpoint, auth=(self._write_key, ''), json=data,
headers=headers, cookies=self.cookies)
d.addCallback(update_cookies)
return d
def track(self, event):
"""Send a single tracking event"""
if not self._enabled:
return defer.succeed('Analytics disabled')
def _log_error(failure, event):
log.warning('Failed to send track event. %s (%s)', failure.getTraceback(), str(event))
log.debug('Sending track event: %s', event)
d = self._post('/track', event)
d.addErrback(_log_error, event)
return d
@classmethod
def new_instance(cls, enabled=None):
"""Initialize an instance using values from the configuration"""
if enabled is None:
enabled = conf.settings['share_usage_data']
return cls(
{},
conf.settings['ANALYTICS_ENDPOINT'],
utils.deobfuscate(conf.settings['ANALYTICS_TOKEN']),
enabled,
)