Merge pull request #1968 from lbryio/analytics

restore old download analytics
This commit is contained in:
Jack Robison 2019-03-01 16:08:18 -05:00 committed by GitHub
commit 3caa2a5039
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
6 changed files with 156 additions and 131 deletions

View file

@ -441,7 +441,7 @@ class StreamManagerComponent(Component):
log.info('Starting the file manager') log.info('Starting the file manager')
loop = asyncio.get_event_loop() loop = asyncio.get_event_loop()
self.stream_manager = StreamManager( self.stream_manager = StreamManager(
loop, self.conf, blob_manager, wallet, storage, node, loop, self.conf, blob_manager, wallet, storage, node, self.component_manager.analytics_manager
) )
await self.stream_manager.start() await self.stream_manager.start()
log.info('Done setting up file manager') log.info('Done setting up file manager')

View file

@ -250,7 +250,7 @@ class Daemon(metaclass=JSONRPCServerType):
self._node_id = None self._node_id = None
self._installation_id = None self._installation_id = None
self.session_id = base58.b58encode(utils.generate_id()).decode() self.session_id = base58.b58encode(utils.generate_id()).decode()
self.analytics_manager = analytics.Manager(conf, self.installation_id, self.session_id) self.analytics_manager = analytics.AnalyticsManager(conf, self.installation_id, self.session_id)
self.component_manager = component_manager or ComponentManager( self.component_manager = component_manager or ComponentManager(
conf, analytics_manager=self.analytics_manager, conf, analytics_manager=self.analytics_manager,
skip_components=conf.components_to_skip or [] skip_components=conf.components_to_skip or []

View file

@ -1,9 +1,9 @@
import asyncio import asyncio
import collections import collections
import logging import logging
import aiohttp import aiohttp
import typing
import binascii
from lbrynet import utils from lbrynet import utils
from lbrynet.conf import Config from lbrynet.conf import Config
from lbrynet.extras import system_info from lbrynet.extras import system_info
@ -22,7 +22,6 @@ HEARTBEAT = 'Heartbeat'
CLAIM_ACTION = 'Claim Action' # publish/create/update/abandon CLAIM_ACTION = 'Claim Action' # publish/create/update/abandon
NEW_CHANNEL = 'New Channel' NEW_CHANNEL = 'New Channel'
CREDITS_SENT = 'Credits Sent' CREDITS_SENT = 'Credits Sent'
NEW_DOWNLOAD_STAT = 'Download'
UPNP_SETUP = "UPnP Setup" UPNP_SETUP = "UPnP Setup"
BLOB_BYTES_UPLOADED = 'Blob Bytes Uploaded' BLOB_BYTES_UPLOADED = 'Blob Bytes Uploaded'
@ -30,7 +29,46 @@ BLOB_BYTES_UPLOADED = 'Blob Bytes Uploaded'
log = logging.getLogger(__name__) log = logging.getLogger(__name__)
class Manager: def _event_properties(installation_id: str, session_id: str,
event_properties: typing.Optional[typing.Dict]) -> typing.Dict:
properties = {
'lbry_id': installation_id,
'session_id': session_id,
}
properties.update(event_properties or {})
return properties
def _download_properties(download_id: str, name: str, sd_hash: str) -> typing.Dict:
p = {
'download_id': download_id,
'name': name,
'stream_info': sd_hash
}
return p
def _make_context(platform):
# see https://segment.com/docs/spec/common/#context
# they say they'll ignore fields outside the spec, but evidently they don't
context = {
'app': {
'version': platform['lbrynet_version'],
'build': platform['build'],
},
# TODO: expand os info to give linux/osx specific info
'os': {
'name': platform['os_system'],
'version': platform['os_release']
},
}
if 'desktop' in platform and 'distro' in platform:
context['os']['desktop'] = platform['desktop']
context['os']['distro'] = platform['distro']
return context
class AnalyticsManager:
def __init__(self, conf: Config, installation_id: str, session_id: str): def __init__(self, conf: Config, installation_id: str, session_id: str):
self.cookies = {} self.cookies = {}
@ -38,7 +76,7 @@ class Manager:
self._write_key = utils.deobfuscate(ANALYTICS_TOKEN) self._write_key = utils.deobfuscate(ANALYTICS_TOKEN)
self._enabled = conf.share_usage_data self._enabled = conf.share_usage_data
self._tracked_data = collections.defaultdict(list) self._tracked_data = collections.defaultdict(list)
self.context = self._make_context(system_info.get_platform(), 'torba') self.context = _make_context(system_info.get_platform())
self.installation_id = installation_id self.installation_id = installation_id
self.session_id = session_id self.session_id = session_id
self.task: asyncio.Task = None self.task: asyncio.Task = None
@ -60,21 +98,10 @@ class Manager:
if self.task is not None and not self.task.done(): if self.task is not None and not self.task.done():
self.task.cancel() self.task.cancel()
async def _post(self, endpoint, data): async def _post(self, data: typing.Dict):
# there is an issue with a timing condition with keep-alive
# that is best explained here: https://github.com/mikem23/keepalive-race
#
# If you make a request, wait just the right amount of time,
# then make another request, the requests module may opt to
# reuse the connection, but by the time the server gets it the
# timeout will have expired.
#
# by forcing the connection to close, we will disable the keep-alive.
assert endpoint[0] == '/'
request_kwargs = { request_kwargs = {
'method': 'POST', 'method': 'POST',
'url': self.url + endpoint, 'url': self.url + '/track',
'headers': {'Connection': 'Close'}, 'headers': {'Connection': 'Close'},
'auth': aiohttp.BasicAuth(self._write_key, ''), 'auth': aiohttp.BasicAuth(self._write_key, ''),
'json': data, 'json': data,
@ -84,40 +111,13 @@ class Manager:
async with utils.aiohttp_request(**request_kwargs) as response: async with utils.aiohttp_request(**request_kwargs) as response:
self.cookies.update(response.cookies) self.cookies.update(response.cookies)
except Exception as e: except Exception as e:
log.exception('Encountered an exception while POSTing to %s: ', self.url + endpoint, exc_info=e) log.exception('Encountered an exception while POSTing to %s: ', self.url + '/track', exc_info=e)
async def track(self, event): async def track(self, event: typing.Dict):
"""Send a single tracking event""" """Send a single tracking event"""
if self._enabled: if self._enabled:
log.debug('Sending track event: %s', event) log.debug('Sending track event: %s', event)
await self._post('/track', event) await self._post(event)
async def send_new_download_start(self, download_id, name, claim_dict):
await self._send_new_download_stats("start", download_id, name, claim_dict)
async def send_new_download_success(self, download_id, name, claim_dict):
await self._send_new_download_stats("success", download_id, name, claim_dict)
async def send_new_download_fail(self, download_id, name, claim_dict, e):
await self._send_new_download_stats("failure", download_id, name, claim_dict, {
'name': type(e).__name__ if hasattr(type(e), "__name__") else str(type(e)),
'message': str(e),
})
async def _send_new_download_stats(self, action, download_id, name, claim_dict, e=None):
await self.track({
'userId': 'lbry', # required, see https://segment.com/docs/sources/server/http/#track
'event': NEW_DOWNLOAD_STAT,
'properties': self._event_properties({
'download_id': download_id,
'name': name,
'sd_hash': None if not claim_dict else claim_dict.source_hash.decode(),
'action': action,
'error': e,
}),
'context': self.context,
'timestamp': utils.isonow(),
})
async def send_upnp_setup_success_fail(self, success, status): async def send_upnp_setup_success_fail(self, success, status):
await self.track( await self.track(
@ -136,19 +136,23 @@ class Manager:
async def send_server_startup_error(self, message): async def send_server_startup_error(self, message):
await self.track(self._event(SERVER_STARTUP_ERROR, {'message': message})) await self.track(self._event(SERVER_STARTUP_ERROR, {'message': message}))
async def send_download_started(self, id_, name, claim_dict=None): async def send_download_started(self, download_id, name, sd_hash):
await self.track( await self.track(
self._event(DOWNLOAD_STARTED, self._download_properties(id_, name, claim_dict)) self._event(DOWNLOAD_STARTED, _download_properties(download_id, name, sd_hash))
) )
async def send_download_errored(self, err, id_, name, claim_dict, report): async def send_download_finished(self, download_id, name, sd_hash):
download_error_properties = self._download_error_properties(err, id_, name, claim_dict, await self.track(self._event(DOWNLOAD_FINISHED, _download_properties(download_id, name, sd_hash)))
report)
await self.track(self._event(DOWNLOAD_ERRORED, download_error_properties))
async def send_download_finished(self, id_, name, report, claim_dict=None): async def send_download_errored(self, error: Exception, name: str):
download_properties = self._download_properties(id_, name, claim_dict, report) await self.track(self._event(DOWNLOAD_ERRORED, {
await self.track(self._event(DOWNLOAD_FINISHED, download_properties)) 'download_id': binascii.hexlify(utils.generate_id()).decode(),
'name': name,
'stream_info': None,
'error': type(error).__name__,
'reason': str(error),
'report': None
}))
async def send_claim_action(self, action): async def send_claim_action(self, action):
await self.track(self._event(CLAIM_ACTION, {'action': action})) await self.track(self._event(CLAIM_ACTION, {'action': action}))
@ -162,69 +166,11 @@ class Manager:
async def _send_heartbeat(self): async def _send_heartbeat(self):
await self.track(self._event(HEARTBEAT)) await self.track(self._event(HEARTBEAT))
def _event(self, event, event_properties=None): def _event(self, event, properties: typing.Optional[typing.Dict] = None):
return { return {
'userId': 'lbry', 'userId': 'lbry',
'event': event, 'event': event,
'properties': self._event_properties(event_properties), 'properties': _event_properties(self.installation_id, self.session_id, properties),
'context': self.context, 'context': self.context,
'timestamp': utils.isonow() 'timestamp': utils.isonow()
} }
def _metric_event(self, metric_name, value):
return self._event(metric_name, {'value': value})
def _event_properties(self, event_properties=None):
properties = {
'lbry_id': self.installation_id,
'session_id': self.session_id,
}
properties.update(event_properties or {})
return properties
@staticmethod
def _download_properties(id_, name, claim_dict=None, report=None):
sd_hash = None if not claim_dict else claim_dict.source_hash.decode()
p = {
'download_id': id_,
'name': name,
'stream_info': sd_hash
}
if report:
p['report'] = report
return p
@staticmethod
def _download_error_properties(error, id_, name, claim_dict, report):
def error_name(err):
if not hasattr(type(err), "__name__"):
return str(type(err))
return type(err).__name__
return {
'download_id': id_,
'name': name,
'stream_info': claim_dict.source_hash.decode(),
'error': error_name(error),
'reason': str(error),
'report': report
}
@staticmethod
def _make_context(platform, wallet):
# see https://segment.com/docs/spec/common/#context
# they say they'll ignore fields outside the spec, but evidently they don't
context = {
'app': {
'version': platform['lbrynet_version'],
'build': platform['build'],
},
# TODO: expand os info to give linux/osx specific info
'os': {
'name': platform['os_system'],
'version': platform['os_release']
},
}
if 'desktop' in platform and 'distro' in platform:
context['os']['desktop'] = platform['desktop']
context['os']['distro'] = platform['distro']
return context

View file

@ -3,6 +3,7 @@ import asyncio
import typing import typing
import logging import logging
import binascii import binascii
from lbrynet.utils import generate_id
from lbrynet.extras.daemon.mime_types import guess_media_type from lbrynet.extras.daemon.mime_types import guess_media_type
from lbrynet.stream.downloader import StreamDownloader from lbrynet.stream.downloader import StreamDownloader
from lbrynet.stream.descriptor import StreamDescriptor from lbrynet.stream.descriptor import StreamDescriptor
@ -35,8 +36,10 @@ class ManagedStream:
self.stream_hash = descriptor.stream_hash self.stream_hash = descriptor.stream_hash
self.stream_claim_info = claim self.stream_claim_info = claim
self._status = status self._status = status
self.fully_reflected = asyncio.Event(loop=self.loop) self.fully_reflected = asyncio.Event(loop=self.loop)
self.tx = None self.tx = None
self.download_id = binascii.hexlify(generate_id()).decode()
@property @property
def file_name(self) -> typing.Optional[str]: def file_name(self) -> typing.Optional[str]:

View file

@ -17,6 +17,7 @@ if typing.TYPE_CHECKING:
from lbrynet.conf import Config from lbrynet.conf import Config
from lbrynet.blob.blob_manager import BlobFileManager from lbrynet.blob.blob_manager import BlobFileManager
from lbrynet.dht.node import Node from lbrynet.dht.node import Node
from lbrynet.extras.daemon.analytics import AnalyticsManager
from lbrynet.extras.daemon.storage import SQLiteStorage, StoredStreamClaim from lbrynet.extras.daemon.storage import SQLiteStorage, StoredStreamClaim
from lbrynet.extras.wallet import LbryWalletManager from lbrynet.extras.wallet import LbryWalletManager
from lbrynet.extras.daemon.exchange_rate_manager import ExchangeRateManager from lbrynet.extras.daemon.exchange_rate_manager import ExchangeRateManager
@ -54,13 +55,15 @@ comparison_operators = {
class StreamManager: class StreamManager:
def __init__(self, loop: asyncio.BaseEventLoop, config: 'Config', blob_manager: 'BlobFileManager', def __init__(self, loop: asyncio.BaseEventLoop, config: 'Config', blob_manager: 'BlobFileManager',
wallet: 'LbryWalletManager', storage: 'SQLiteStorage', node: typing.Optional['Node']): wallet: 'LbryWalletManager', storage: 'SQLiteStorage', node: typing.Optional['Node'],
analytics_manager: typing.Optional['AnalyticsManager'] = None):
self.loop = loop self.loop = loop
self.config = config self.config = config
self.blob_manager = blob_manager self.blob_manager = blob_manager
self.wallet = wallet self.wallet = wallet
self.storage = storage self.storage = storage
self.node = node self.node = node
self.analytics_manager = analytics_manager
self.streams: typing.Set[ManagedStream] = set() self.streams: typing.Set[ManagedStream] = set()
self.starting_streams: typing.Dict[str, asyncio.Future] = {} self.starting_streams: typing.Dict[str, asyncio.Future] = {}
self.resume_downloading_task: asyncio.Task = None self.resume_downloading_task: asyncio.Task = None
@ -277,6 +280,11 @@ class StreamManager:
if stream.downloader and stream.running: if stream.downloader and stream.running:
await stream.downloader.stream_finished_event.wait() await stream.downloader.stream_finished_event.wait()
stream.update_status(ManagedStream.STATUS_FINISHED) stream.update_status(ManagedStream.STATUS_FINISHED)
if self.analytics_manager:
self.loop.create_task(self.analytics_manager.send_download_finished(
stream.download_id, stream.claim_name, stream.sd_hash
))
task = self.loop.create_task(_wait_for_stream_finished()) task = self.loop.create_task(_wait_for_stream_finished())
self.update_stream_finished_futs.append(task) self.update_stream_finished_futs.append(task)
task.add_done_callback( task.add_done_callback(
@ -403,9 +411,9 @@ class StreamManager:
streams.reverse() streams.reverse()
return streams return streams
async def download_stream_from_uri(self, uri, exchange_rate_manager: 'ExchangeRateManager', async def _download_stream_from_uri(self, uri, exchange_rate_manager: 'ExchangeRateManager',
file_name: typing.Optional[str] = None, file_name: typing.Optional[str] = None,
timeout: typing.Optional[float] = None) -> typing.Optional[ManagedStream]: timeout: typing.Optional[float] = None) -> typing.Optional[ManagedStream]:
timeout = timeout or self.config.download_timeout timeout = timeout or self.config.download_timeout
parsed_uri = parse_lbry_uri(uri) parsed_uri = parse_lbry_uri(uri)
if parsed_uri.is_channel: if parsed_uri.is_channel:
@ -455,6 +463,10 @@ class StreamManager:
self.node, resolved, file_name, timeout, fee_amount, fee_address, False self.node, resolved, file_name, timeout, fee_amount, fee_address, False
) )
log.info("started new stream, deleting old one") log.info("started new stream, deleting old one")
if self.analytics_manager:
self.loop.create_task(self.analytics_manager.send_download_started(
stream.download_id, parsed_uri.name, claim.source_hash.decode()
))
await self.delete_stream(existing[0]) await self.delete_stream(existing[0])
return stream return stream
elif existing: elif existing:
@ -467,6 +479,21 @@ class StreamManager:
await self.start_stream(stream) await self.start_stream(stream)
return stream return stream
log.info("download stream from %s", uri) log.info("download stream from %s", uri)
return await self.download_stream_from_claim( stream = await self.download_stream_from_claim(
self.node, resolved, file_name, timeout, fee_amount, fee_address self.node, resolved, file_name, timeout, fee_amount, fee_address
) )
if self.analytics_manager:
self.loop.create_task(self.analytics_manager.send_download_started(
stream.download_id, parsed_uri.name, claim.source_hash.decode()
))
return stream
async def download_stream_from_uri(self, uri, exchange_rate_manager: 'ExchangeRateManager',
file_name: typing.Optional[str] = None,
timeout: typing.Optional[float] = None) -> typing.Optional[ManagedStream]:
try:
return await self._download_stream_from_uri(uri, exchange_rate_manager, file_name, timeout)
except Exception as e:
if self.analytics_manager:
await self.analytics_manager.send_download_errored(e, uri)
raise e

View file

@ -3,10 +3,13 @@ import binascii
from unittest import mock from unittest import mock
import asyncio import asyncio
import time import time
import typing
from tests.unit.blob_exchange.test_transfer_blob import BlobExchangeTestBase from tests.unit.blob_exchange.test_transfer_blob import BlobExchangeTestBase
from tests.unit.lbrynet_daemon.test_ExchangeRateManager import get_dummy_exchange_rate_manager from tests.unit.lbrynet_daemon.test_ExchangeRateManager import get_dummy_exchange_rate_manager
from lbrynet.error import InsufficientFundsError, KeyFeeAboveMaxAllowed from lbrynet.utils import generate_id
from lbrynet.error import InsufficientFundsError, KeyFeeAboveMaxAllowed, ResolveError, DownloadSDTimeout
from lbrynet.extras.wallet.manager import LbryWalletManager from lbrynet.extras.wallet.manager import LbryWalletManager
from lbrynet.extras.daemon.analytics import AnalyticsManager
from lbrynet.stream.stream_manager import StreamManager from lbrynet.stream.stream_manager import StreamManager
from lbrynet.stream.descriptor import StreamDescriptor from lbrynet.stream.descriptor import StreamDescriptor
from lbrynet.dht.node import Node from lbrynet.dht.node import Node
@ -102,11 +105,22 @@ class TestStreamManager(BlobExchangeTestBase):
self.sd_hash = descriptor.sd_hash self.sd_hash = descriptor.sd_hash
self.mock_wallet, self.uri = get_mock_wallet(self.sd_hash, self.client_storage, balance, fee) self.mock_wallet, self.uri = get_mock_wallet(self.sd_hash, self.client_storage, balance, fee)
self.stream_manager = StreamManager(self.loop, self.client_config, self.client_blob_manager, self.mock_wallet, self.stream_manager = StreamManager(self.loop, self.client_config, self.client_blob_manager, self.mock_wallet,
self.client_storage, get_mock_node(self.server_from_client)) self.client_storage, get_mock_node(self.server_from_client),
AnalyticsManager(self.client_config,
binascii.hexlify(generate_id()).decode(),
binascii.hexlify(generate_id()).decode()))
self.exchange_rate_manager = get_dummy_exchange_rate_manager(time) self.exchange_rate_manager = get_dummy_exchange_rate_manager(time)
async def test_download_stop_resume_delete(self): async def test_download_stop_resume_delete(self):
await self.setup_stream_manager() await self.setup_stream_manager()
received = []
expected_events = ['Download Started', 'Download Finished']
async def check_post(event):
received.append(event['event'])
self.stream_manager.analytics_manager._post = check_post
self.assertSetEqual(self.stream_manager.streams, set()) self.assertSetEqual(self.stream_manager.streams, set())
stream = await self.stream_manager.download_stream_from_uri(self.uri, self.exchange_rate_manager) stream = await self.stream_manager.download_stream_from_uri(self.uri, self.exchange_rate_manager)
stream_hash = stream.stream_hash stream_hash = stream.stream_hash
@ -147,6 +161,20 @@ class TestStreamManager(BlobExchangeTestBase):
"select status from file where stream_hash=?", stream_hash "select status from file where stream_hash=?", stream_hash
) )
self.assertEqual(stored_status, None) self.assertEqual(stored_status, None)
self.assertListEqual(expected_events, received)
async def _test_download_error(self, expected_error):
received = []
async def check_post(event):
self.assertEqual("Download Errored", event['event'])
received.append(event['properties']['error'])
self.stream_manager.analytics_manager._post = check_post
with self.assertRaises(expected_error):
await self.stream_manager.download_stream_from_uri(self.uri, self.exchange_rate_manager)
self.assertListEqual([expected_error.__name__], received)
async def test_insufficient_funds(self): async def test_insufficient_funds(self):
fee = { fee = {
@ -156,8 +184,7 @@ class TestStreamManager(BlobExchangeTestBase):
'version': '_0_0_1' 'version': '_0_0_1'
} }
await self.setup_stream_manager(10.0, fee) await self.setup_stream_manager(10.0, fee)
with self.assertRaises(InsufficientFundsError): await self._test_download_error(InsufficientFundsError)
await self.stream_manager.download_stream_from_uri(self.uri, self.exchange_rate_manager)
async def test_fee_above_max_allowed(self): async def test_fee_above_max_allowed(self):
fee = { fee = {
@ -167,11 +194,32 @@ class TestStreamManager(BlobExchangeTestBase):
'version': '_0_0_1' 'version': '_0_0_1'
} }
await self.setup_stream_manager(1000000.0, fee) await self.setup_stream_manager(1000000.0, fee)
with self.assertRaises(KeyFeeAboveMaxAllowed): await self._test_download_error(KeyFeeAboveMaxAllowed)
await self.stream_manager.download_stream_from_uri(self.uri, self.exchange_rate_manager)
async def test_resolve_error(self):
await self.setup_stream_manager()
self.uri = "fake"
await self._test_download_error(ResolveError)
async def test_download_timeout(self):
self.server.stop_server()
self.client_config.download_timeout = 1.0
await self.setup_stream_manager()
await self._test_download_error(DownloadSDTimeout)
async def test_download_then_recover_stream_on_startup(self, old_sort=False): async def test_download_then_recover_stream_on_startup(self, old_sort=False):
expected_analytics_events = [
'Download Started',
'Download Finished'
]
received_events = []
async def check_post(event):
received_events.append(event['event'])
await self.setup_stream_manager(old_sort=old_sort) await self.setup_stream_manager(old_sort=old_sort)
self.stream_manager.analytics_manager._post = check_post
self.assertSetEqual(self.stream_manager.streams, set()) self.assertSetEqual(self.stream_manager.streams, set())
stream = await self.stream_manager.download_stream_from_uri(self.uri, self.exchange_rate_manager) stream = await self.stream_manager.download_stream_from_uri(self.uri, self.exchange_rate_manager)
await stream.downloader.stream_finished_event.wait() await stream.downloader.stream_finished_event.wait()
@ -189,6 +237,7 @@ class TestStreamManager(BlobExchangeTestBase):
sd_blob = self.client_blob_manager.get_blob(stream.sd_hash) sd_blob = self.client_blob_manager.get_blob(stream.sd_hash)
self.assertTrue(sd_blob.file_exists) self.assertTrue(sd_blob.file_exists)
self.assertTrue(sd_blob.get_is_verified()) self.assertTrue(sd_blob.get_is_verified())
self.assertListEqual(expected_analytics_events, received_events)
def test_download_then_recover_old_sort_stream_on_startup(self): def test_download_then_recover_old_sort_stream_on_startup(self):
return self.test_download_then_recover_stream_on_startup(old_sort=True) return self.test_download_then_recover_stream_on_startup(old_sort=True)