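# lbry-sdk/lbry/extras/daemon/analytics.py
#
# NOTE: this copy was captured from a web file viewer (244 lines, 9.4 KiB, Python;
# "Raw / Permalink / Normal View / History"). The date lines interleaved with the
# code below are blame timestamps from that view, not part of the program.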

2019-01-21 21:55:50 +01:00
import asyncio
2017-04-26 20:15:38 +02:00
import collections
import logging
import typing
import aiohttp
2019-06-21 02:55:47 +02:00
from lbry import utils
from lbry.conf import Config
from lbry.extras import system_info
2017-04-26 20:15:38 +02:00
2019-01-28 01:26:18 +01:00
# Base URL that events are POSTed to, and the obfuscated write key used to
# authenticate against it (decoded at runtime with utils.deobfuscate).
ANALYTICS_ENDPOINT = "https://api.segment.io/v1"
ANALYTICS_TOKEN = "Ax5LZzR1o3q3Z3WjATASDwR5rKyHH0qOIRIbLmMXn2H="

# Things We Track
# -- daemon lifecycle
SERVER_STARTUP = "Server Startup"
SERVER_STARTUP_SUCCESS = "Server Startup Success"
SERVER_STARTUP_ERROR = "Server Startup Error"
HEARTBEAT = "Heartbeat"

# -- downloads
DOWNLOAD_STARTED = "Download Started"
DOWNLOAD_ERRORED = "Download Errored"
DOWNLOAD_FINISHED = "Download Finished"
TIME_TO_FIRST_BYTES = "Time To First Bytes"
BLOB_BYTES_UPLOADED = "Blob Bytes Uploaded"

# -- storage and network setup
DISK_SPACE = "Disk Space"
UPNP_SETUP = "UPnP Setup"

# -- wallet / claim activity
CLAIM_ACTION = "Claim Action"  # publish/create/update/abandon
NEW_CHANNEL = "New Channel"
CREDITS_SENT = "Credits Sent"

# Module-level logger shared by everything in this file.
log = logging.getLogger(__name__)
def _event_properties(installation_id: str, session_id: str,
event_properties: typing.Optional[typing.Dict]) -> typing.Dict:
properties = {
'lbry_id': installation_id,
'session_id': session_id,
}
properties.update(event_properties or {})
return properties
2019-03-11 02:55:33 +01:00
def _download_properties(conf: Config, external_ip: str, resolve_duration: float,
total_duration: typing.Optional[float], download_id: str, name: str,
outpoint: str, active_peer_count: typing.Optional[int],
tried_peers_count: typing.Optional[int], connection_failures_count: typing.Optional[int],
added_fixed_peers: bool, fixed_peer_delay: float, sd_hash: str,
sd_download_duration: typing.Optional[float] = None,
2019-03-11 02:55:33 +01:00
head_blob_hash: typing.Optional[str] = None,
head_blob_length: typing.Optional[int] = None,
head_blob_download_duration: typing.Optional[float] = None,
error: typing.Optional[str] = None, error_msg: typing.Optional[str] = None,
wallet_server: typing.Optional[str] = None) -> typing.Dict:
2019-03-11 02:55:33 +01:00
return {
"external_ip": external_ip,
"download_id": download_id,
"total_duration": round(total_duration, 4),
"resolve_duration": None if not resolve_duration else round(resolve_duration, 4),
"error": error,
"error_message": error_msg,
'name': name,
2019-03-11 02:55:33 +01:00
"outpoint": outpoint,
"node_rpc_timeout": conf.node_rpc_timeout,
"peer_connect_timeout": conf.peer_connect_timeout,
"blob_download_timeout": conf.blob_download_timeout,
"use_fixed_peers": len(conf.fixed_peers) > 0,
"fixed_peer_delay": fixed_peer_delay,
"added_fixed_peers": added_fixed_peers,
2019-03-11 02:55:33 +01:00
"active_peer_count": active_peer_count,
"tried_peers_count": tried_peers_count,
"sd_blob_hash": sd_hash,
"sd_blob_duration": None if not sd_download_duration else round(sd_download_duration, 4),
"head_blob_hash": head_blob_hash,
"head_blob_length": head_blob_length,
"head_blob_duration": None if not head_blob_download_duration else round(head_blob_download_duration, 4),
"connection_failures_count": connection_failures_count,
"wallet_server": wallet_server
}
def _make_context(platform):
# see https://segment.com/docs/spec/common/#context
# they say they'll ignore fields outside the spec, but evidently they don't
context = {
'app': {
'version': platform['lbrynet_version'],
'build': platform['build'],
},
# TODO: expand os info to give linux/osx specific info
'os': {
'name': platform['os_system'],
'version': platform['os_release']
},
}
if 'desktop' in platform and 'distro' in platform:
context['os']['desktop'] = platform['desktop']
context['os']['distro'] = platform['distro']
return context
class AnalyticsManager:
    """Builds usage events and POSTs them to the analytics endpoint.

    Events are only sent while ``conf.share_usage_data`` is truthy (see
    ``enabled``). A background task created by ``start()`` sends a heartbeat
    event at a fixed interval until ``stop()`` is called.
    """

    # Seconds to wait between heartbeat iterations in run().
    HEARTBEAT_INTERVAL_SECONDS = 1800

    def __init__(self, conf: Config, installation_id: str, session_id: str):
        """Prepare the manager; no network traffic happens until events are tracked.

        Args:
            conf: configuration object; this class reads ``share_usage_data``
                and ``lbryum_servers`` from it and passes it on when building
                download properties.
            installation_id: sent as ``lbry_id`` in every event.
            session_id: sent as ``session_id`` in every event.
        """
        self.conf = conf
        # Cookies returned by the endpoint are kept and replayed on later POSTs.
        self.cookies = {}
        self.url = ANALYTICS_ENDPOINT
        self._write_key = utils.deobfuscate(ANALYTICS_TOKEN)
        self._tracked_data = collections.defaultdict(list)
        self.context = _make_context(system_info.get_platform())
        self.installation_id = installation_id
        self.session_id = session_id
        # Heartbeat task; None whenever the manager is not started.
        self.task: typing.Optional[asyncio.Task] = None
        # Refreshed by run() before each heartbeat while analytics are enabled.
        self.external_ip: typing.Optional[str] = None

    @property
    def enabled(self):
        """Whether events may be sent (mirrors ``conf.share_usage_data``)."""
        return self.conf.share_usage_data

    @property
    def is_started(self):
        """True while a heartbeat task is registered on this manager."""
        return self.task is not None

    async def start(self):
        """Schedule the heartbeat loop; does nothing if one is already registered."""
        if self.task is None:
            self.task = asyncio.create_task(self.run())

    async def run(self):
        """Heartbeat loop: while enabled, refresh the external IP and send a heartbeat.

        Loops forever; intended to be run as the task created by ``start()``
        and ended by cancelling that task.
        """
        while True:
            if self.enabled:
                self.external_ip, _ = await utils.get_external_ip(self.conf.lbryum_servers)
                await self._send_heartbeat()
            await asyncio.sleep(self.HEARTBEAT_INTERVAL_SECONDS)

    def stop(self):
        """Cancel the heartbeat task (if still running) and mark the manager stopped.

        The task reference is cleared so that ``is_started`` becomes False and
        a later ``start()`` can schedule a fresh heartbeat loop. Safe to call
        repeatedly or before ``start()``.
        """
        task, self.task = self.task, None
        if task is not None and not task.done():
            task.cancel()

    async def _post(self, data: typing.Dict):
        """POST *data* as JSON to ``<url>/track``; failures are logged, never raised."""
        request_kwargs = {
            'method': 'POST',
            'url': self.url + '/track',
            'headers': {'Connection': 'Close'},
            'auth': aiohttp.BasicAuth(self._write_key, ''),
            'json': data,
            'cookies': self.cookies
        }
        try:
            async with utils.aiohttp_request(**request_kwargs) as response:
                self.cookies.update(response.cookies)
        except Exception as e:
            # Analytics are best-effort: a failed POST must not affect the caller.
            log.debug('Encountered an exception while POSTing to %s: ', self.url + '/track', exc_info=e)

    async def track(self, event: typing.Dict):
        """Send a single tracking event"""
        if self.enabled:
            log.debug('Sending track event: %s', event)
            await self._post(event)

    async def send_upnp_setup_success_fail(self, success, status):
        """Report the outcome (*success*) and *status* of UPnP setup."""
        await self.track(
            self._event(UPNP_SETUP, {
                'success': success,
                'status': status,
            })
        )

    async def send_disk_space_used(self, storage_used, storage_limit, is_from_network_quota):
        """Report used storage, its limit, and whether it comes from the network quota."""
        await self.track(
            self._event(DISK_SPACE, {
                'used': storage_used,
                'limit': storage_limit,
                'from_network_quota': is_from_network_quota
            })
        )

    async def send_server_startup(self):
        """Report that server startup has begun."""
        await self.track(self._event(SERVER_STARTUP))

    async def send_server_startup_success(self):
        """Report that server startup succeeded."""
        await self.track(self._event(SERVER_STARTUP_SUCCESS))

    async def send_server_startup_error(self, message):
        """Report that server startup failed, with the error *message*."""
        await self.track(self._event(SERVER_STARTUP_ERROR, {'message': message}))

    async def send_time_to_first_bytes(self, resolve_duration: typing.Optional[float],
                                       total_duration: typing.Optional[float], download_id: str,
                                       name: str, outpoint: typing.Optional[str],
                                       found_peers_count: typing.Optional[int],
                                       tried_peers_count: typing.Optional[int],
                                       connection_failures_count: typing.Optional[int],
                                       added_fixed_peers: bool,
                                       fixed_peers_delay: float, sd_hash: str,
                                       sd_download_duration: typing.Optional[float] = None,
                                       head_blob_hash: typing.Optional[str] = None,
                                       head_blob_length: typing.Optional[int] = None,
                                       head_blob_duration: typing.Optional[float] = None,
                                       error: typing.Optional[str] = None,
                                       error_msg: typing.Optional[str] = None,
                                       wallet_server: typing.Optional[str] = None):
        """Report timing and peer statistics for a download attempt.

        The arguments are forwarded positionally to ``_download_properties``
        together with ``self.conf`` and the last known ``self.external_ip``.
        """
        await self.track(self._event(TIME_TO_FIRST_BYTES, _download_properties(
            self.conf, self.external_ip, resolve_duration, total_duration, download_id, name, outpoint,
            found_peers_count, tried_peers_count, connection_failures_count, added_fixed_peers, fixed_peers_delay,
            sd_hash, sd_download_duration, head_blob_hash, head_blob_length, head_blob_duration, error, error_msg,
            wallet_server
        )))

    async def send_download_finished(self, download_id, name, sd_hash):
        """Report a finished download; *sd_hash* is sent under ``stream_info``."""
        await self.track(
            self._event(
                DOWNLOAD_FINISHED, {
                    'download_id': download_id,
                    'name': name,
                    'stream_info': sd_hash
                }
            )
        )

    async def send_claim_action(self, action):
        """Report a claim *action* (publish/create/update/abandon)."""
        await self.track(self._event(CLAIM_ACTION, {'action': action}))

    async def send_new_channel(self):
        """Report that a new channel was created."""
        await self.track(self._event(NEW_CHANNEL))

    async def send_credits_sent(self):
        """Report that credits were sent."""
        await self.track(self._event(CREDITS_SENT))

    async def _send_heartbeat(self):
        """Send the periodic heartbeat event."""
        await self.track(self._event(HEARTBEAT))

    def _event(self, event, properties: typing.Optional[typing.Dict] = None):
        """Wrap *event* and its optional *properties* in the envelope that is POSTed.

        Every event carries the fixed ``userId`` ``'lbry'``, this manager's
        installation/session ids (inside ``properties``), the shared context
        and a timestamp from ``utils.isonow()``.
        """
        return {
            'userId': 'lbry',
            'event': event,
            'properties': _event_properties(self.installation_id, self.session_id, properties),
            'context': self.context,
            'timestamp': utils.isonow()
        }