restore old download analytics and update tests

This commit is contained in:
Jack Robison 2019-03-01 14:48:49 -05:00
parent be4a96c1b3
commit df8934d472
No known key found for this signature in database
GPG key ID: DF25C68FE0239BB2
6 changed files with 157 additions and 132 deletions

View file

@ -441,7 +441,7 @@ class StreamManagerComponent(Component):
log.info('Starting the file manager')
loop = asyncio.get_event_loop()
self.stream_manager = StreamManager(
loop, self.conf, blob_manager, wallet, storage, node,
loop, self.conf, blob_manager, wallet, storage, node, self.component_manager.analytics_manager
)
await self.stream_manager.start()
log.info('Done setting up file manager')

View file

@ -250,7 +250,7 @@ class Daemon(metaclass=JSONRPCServerType):
self._node_id = None
self._installation_id = None
self.session_id = base58.b58encode(utils.generate_id()).decode()
self.analytics_manager = analytics.Manager(conf, self.installation_id, self.session_id)
self.analytics_manager = analytics.AnalyticsManager(conf, self.installation_id, self.session_id)
self.component_manager = component_manager or ComponentManager(
conf, analytics_manager=self.analytics_manager,
skip_components=conf.components_to_skip or []

View file

@ -1,9 +1,9 @@
import asyncio
import collections
import logging
import aiohttp
import typing
import binascii
from lbrynet import utils
from lbrynet.conf import Config
from lbrynet.extras import system_info
@ -22,7 +22,6 @@ HEARTBEAT = 'Heartbeat'
CLAIM_ACTION = 'Claim Action' # publish/create/update/abandon
NEW_CHANNEL = 'New Channel'
CREDITS_SENT = 'Credits Sent'
NEW_DOWNLOAD_STAT = 'Download'
UPNP_SETUP = "UPnP Setup"
BLOB_BYTES_UPLOADED = 'Blob Bytes Uploaded'
@ -30,7 +29,46 @@ BLOB_BYTES_UPLOADED = 'Blob Bytes Uploaded'
log = logging.getLogger(__name__)
class Manager:
def _event_properties(installation_id: str, session_id: str,
event_properties: typing.Optional[typing.Dict]) -> typing.Dict:
properties = {
'lbry_id': installation_id,
'session_id': session_id,
}
properties.update(event_properties or {})
return properties
def _download_properties(download_id: str, name: str, sd_hash: str) -> typing.Dict:
p = {
'download_id': download_id,
'name': name,
'stream_info': sd_hash
}
return p
def _make_context(platform):
# see https://segment.com/docs/spec/common/#context
# they say they'll ignore fields outside the spec, but evidently they don't
context = {
'app': {
'version': platform['lbrynet_version'],
'build': platform['build'],
},
# TODO: expand os info to give linux/osx specific info
'os': {
'name': platform['os_system'],
'version': platform['os_release']
},
}
if 'desktop' in platform and 'distro' in platform:
context['os']['desktop'] = platform['desktop']
context['os']['distro'] = platform['distro']
return context
class AnalyticsManager:
def __init__(self, conf: Config, installation_id: str, session_id: str):
self.cookies = {}
@ -38,7 +76,7 @@ class Manager:
self._write_key = utils.deobfuscate(ANALYTICS_TOKEN)
self._enabled = conf.share_usage_data
self._tracked_data = collections.defaultdict(list)
self.context = self._make_context(system_info.get_platform(), 'torba')
self.context = _make_context(system_info.get_platform())
self.installation_id = installation_id
self.session_id = session_id
self.task: asyncio.Task = None
@ -60,21 +98,10 @@ class Manager:
if self.task is not None and not self.task.done():
self.task.cancel()
async def _post(self, endpoint, data):
# there is an issue with a timing condition with keep-alive
# that is best explained here: https://github.com/mikem23/keepalive-race
#
# If you make a request, wait just the right amount of time,
# then make another request, the requests module may opt to
# reuse the connection, but by the time the server gets it the
# timeout will have expired.
#
# by forcing the connection to close, we will disable the keep-alive.
assert endpoint[0] == '/'
async def _post(self, data: typing.Dict):
request_kwargs = {
'method': 'POST',
'url': self.url + endpoint,
'url': self.url + '/track',
'headers': {'Connection': 'Close'},
'auth': aiohttp.BasicAuth(self._write_key, ''),
'json': data,
@ -84,40 +111,13 @@ class Manager:
async with utils.aiohttp_request(**request_kwargs) as response:
self.cookies.update(response.cookies)
except Exception as e:
log.exception('Encountered an exception while POSTing to %s: ', self.url + endpoint, exc_info=e)
log.exception('Encountered an exception while POSTing to %s: ', self.url + '/track', exc_info=e)
async def track(self, event):
async def track(self, event: typing.Dict):
"""Send a single tracking event"""
if self._enabled:
log.debug('Sending track event: %s', event)
await self._post('/track', event)
async def send_new_download_start(self, download_id, name, claim_dict):
await self._send_new_download_stats("start", download_id, name, claim_dict)
async def send_new_download_success(self, download_id, name, claim_dict):
await self._send_new_download_stats("success", download_id, name, claim_dict)
async def send_new_download_fail(self, download_id, name, claim_dict, e):
await self._send_new_download_stats("failure", download_id, name, claim_dict, {
'name': type(e).__name__ if hasattr(type(e), "__name__") else str(type(e)),
'message': str(e),
})
async def _send_new_download_stats(self, action, download_id, name, claim_dict, e=None):
await self.track({
'userId': 'lbry', # required, see https://segment.com/docs/sources/server/http/#track
'event': NEW_DOWNLOAD_STAT,
'properties': self._event_properties({
'download_id': download_id,
'name': name,
'sd_hash': None if not claim_dict else claim_dict.source_hash.decode(),
'action': action,
'error': e,
}),
'context': self.context,
'timestamp': utils.isonow(),
})
log.info('Sending track event: %s', event)
await self._post(event)
async def send_upnp_setup_success_fail(self, success, status):
await self.track(
@ -136,19 +136,23 @@ class Manager:
async def send_server_startup_error(self, message):
await self.track(self._event(SERVER_STARTUP_ERROR, {'message': message}))
async def send_download_started(self, id_, name, claim_dict=None):
async def send_download_started(self, download_id, name, sd_hash):
await self.track(
self._event(DOWNLOAD_STARTED, self._download_properties(id_, name, claim_dict))
self._event(DOWNLOAD_STARTED, _download_properties(download_id, name, sd_hash))
)
async def send_download_errored(self, err, id_, name, claim_dict, report):
download_error_properties = self._download_error_properties(err, id_, name, claim_dict,
report)
await self.track(self._event(DOWNLOAD_ERRORED, download_error_properties))
async def send_download_finished(self, download_id, name, sd_hash):
await self.track(self._event(DOWNLOAD_FINISHED, _download_properties(download_id, name, sd_hash)))
async def send_download_finished(self, id_, name, report, claim_dict=None):
download_properties = self._download_properties(id_, name, claim_dict, report)
await self.track(self._event(DOWNLOAD_FINISHED, download_properties))
async def send_download_errored(self, error: Exception, name: str):
await self.track(self._event(DOWNLOAD_ERRORED, {
'download_id': binascii.hexlify(utils.generate_id()).decode(),
'name': name,
'stream_info': None,
'error': type(error).__name__,
'reason': str(error),
'report': None
}))
async def send_claim_action(self, action):
await self.track(self._event(CLAIM_ACTION, {'action': action}))
@ -162,69 +166,11 @@ class Manager:
async def _send_heartbeat(self):
await self.track(self._event(HEARTBEAT))
def _event(self, event, event_properties=None):
def _event(self, event, properties: typing.Optional[typing.Dict] = None):
return {
'userId': 'lbry',
'event': event,
'properties': self._event_properties(event_properties),
'properties': _event_properties(self.installation_id, self.session_id, properties),
'context': self.context,
'timestamp': utils.isonow()
}
def _metric_event(self, metric_name, value):
return self._event(metric_name, {'value': value})
def _event_properties(self, event_properties=None):
properties = {
'lbry_id': self.installation_id,
'session_id': self.session_id,
}
properties.update(event_properties or {})
return properties
@staticmethod
def _download_properties(id_, name, claim_dict=None, report=None):
sd_hash = None if not claim_dict else claim_dict.source_hash.decode()
p = {
'download_id': id_,
'name': name,
'stream_info': sd_hash
}
if report:
p['report'] = report
return p
@staticmethod
def _download_error_properties(error, id_, name, claim_dict, report):
def error_name(err):
if not hasattr(type(err), "__name__"):
return str(type(err))
return type(err).__name__
return {
'download_id': id_,
'name': name,
'stream_info': claim_dict.source_hash.decode(),
'error': error_name(error),
'reason': str(error),
'report': report
}
@staticmethod
def _make_context(platform, wallet):
# see https://segment.com/docs/spec/common/#context
# they say they'll ignore fields outside the spec, but evidently they don't
context = {
'app': {
'version': platform['lbrynet_version'],
'build': platform['build'],
},
# TODO: expand os info to give linux/osx specific info
'os': {
'name': platform['os_system'],
'version': platform['os_release']
},
}
if 'desktop' in platform and 'distro' in platform:
context['os']['desktop'] = platform['desktop']
context['os']['distro'] = platform['distro']
return context

View file

@ -3,6 +3,7 @@ import asyncio
import typing
import logging
import binascii
from lbrynet.utils import generate_id
from lbrynet.extras.daemon.mime_types import guess_media_type
from lbrynet.stream.downloader import StreamDownloader
from lbrynet.stream.descriptor import StreamDescriptor
@ -35,8 +36,10 @@ class ManagedStream:
self.stream_hash = descriptor.stream_hash
self.stream_claim_info = claim
self._status = status
self.fully_reflected = asyncio.Event(loop=self.loop)
self.tx = None
self.download_id = binascii.hexlify(generate_id()).decode()
@property
def file_name(self) -> typing.Optional[str]:

View file

@ -17,6 +17,7 @@ if typing.TYPE_CHECKING:
from lbrynet.conf import Config
from lbrynet.blob.blob_manager import BlobFileManager
from lbrynet.dht.node import Node
from lbrynet.extras.daemon.analytics import AnalyticsManager
from lbrynet.extras.daemon.storage import SQLiteStorage, StoredStreamClaim
from lbrynet.extras.wallet import LbryWalletManager
from lbrynet.extras.daemon.exchange_rate_manager import ExchangeRateManager
@ -54,13 +55,15 @@ comparison_operators = {
class StreamManager:
def __init__(self, loop: asyncio.BaseEventLoop, config: 'Config', blob_manager: 'BlobFileManager',
wallet: 'LbryWalletManager', storage: 'SQLiteStorage', node: typing.Optional['Node']):
wallet: 'LbryWalletManager', storage: 'SQLiteStorage', node: typing.Optional['Node'],
analytics_manager: typing.Optional['AnalyticsManager'] = None):
self.loop = loop
self.config = config
self.blob_manager = blob_manager
self.wallet = wallet
self.storage = storage
self.node = node
self.analytics_manager = analytics_manager
self.streams: typing.Set[ManagedStream] = set()
self.starting_streams: typing.Dict[str, asyncio.Future] = {}
self.resume_downloading_task: asyncio.Task = None
@ -277,6 +280,11 @@ class StreamManager:
if stream.downloader and stream.running:
await stream.downloader.stream_finished_event.wait()
stream.update_status(ManagedStream.STATUS_FINISHED)
if self.analytics_manager:
await self.analytics_manager.send_download_finished(
stream.download_id, stream.claim_name, stream.sd_hash
)
task = self.loop.create_task(_wait_for_stream_finished())
self.update_stream_finished_futs.append(task)
task.add_done_callback(
@ -403,9 +411,9 @@ class StreamManager:
streams.reverse()
return streams
async def download_stream_from_uri(self, uri, exchange_rate_manager: 'ExchangeRateManager',
file_name: typing.Optional[str] = None,
timeout: typing.Optional[float] = None) -> typing.Optional[ManagedStream]:
async def _download_stream_from_uri(self, uri, exchange_rate_manager: 'ExchangeRateManager',
file_name: typing.Optional[str] = None,
timeout: typing.Optional[float] = None) -> typing.Optional[ManagedStream]:
timeout = timeout or self.config.download_timeout
parsed_uri = parse_lbry_uri(uri)
if parsed_uri.is_channel:
@ -455,6 +463,10 @@ class StreamManager:
self.node, resolved, file_name, timeout, fee_amount, fee_address, False
)
log.info("started new stream, deleting old one")
if self.analytics_manager:
await self.analytics_manager.send_download_started(
stream.download_id, parsed_uri.name, claim.source_hash.decode()
)
await self.delete_stream(existing[0])
return stream
elif existing:
@ -467,6 +479,21 @@ class StreamManager:
await self.start_stream(stream)
return stream
log.info("download stream from %s", uri)
return await self.download_stream_from_claim(
self.node, resolved, file_name, timeout, fee_amount, fee_address
stream = await self.download_stream_from_claim(
self.node, resolved, file_name, timeout, fee_amount, fee_address
)
if self.analytics_manager:
await self.analytics_manager.send_download_started(
stream.download_id, parsed_uri.name, claim.source_hash.decode()
)
return stream
async def download_stream_from_uri(self, uri, exchange_rate_manager: 'ExchangeRateManager',
file_name: typing.Optional[str] = None,
timeout: typing.Optional[float] = None) -> typing.Optional[ManagedStream]:
try:
return await self._download_stream_from_uri(uri, exchange_rate_manager, file_name, timeout)
except Exception as e:
if self.analytics_manager:
await self.analytics_manager.send_download_errored(e, uri)
raise e

View file

@ -3,10 +3,13 @@ import binascii
from unittest import mock
import asyncio
import time
import typing
from tests.unit.blob_exchange.test_transfer_blob import BlobExchangeTestBase
from tests.unit.lbrynet_daemon.test_ExchangeRateManager import get_dummy_exchange_rate_manager
from lbrynet.error import InsufficientFundsError, KeyFeeAboveMaxAllowed
from lbrynet.utils import generate_id
from lbrynet.error import InsufficientFundsError, KeyFeeAboveMaxAllowed, ResolveError, DownloadSDTimeout
from lbrynet.extras.wallet.manager import LbryWalletManager
from lbrynet.extras.daemon.analytics import AnalyticsManager
from lbrynet.stream.stream_manager import StreamManager
from lbrynet.stream.descriptor import StreamDescriptor
from lbrynet.dht.node import Node
@ -102,11 +105,22 @@ class TestStreamManager(BlobExchangeTestBase):
self.sd_hash = descriptor.sd_hash
self.mock_wallet, self.uri = get_mock_wallet(self.sd_hash, self.client_storage, balance, fee)
self.stream_manager = StreamManager(self.loop, self.client_config, self.client_blob_manager, self.mock_wallet,
self.client_storage, get_mock_node(self.server_from_client))
self.client_storage, get_mock_node(self.server_from_client),
AnalyticsManager(self.client_config,
binascii.hexlify(generate_id()).decode(),
binascii.hexlify(generate_id()).decode()))
self.exchange_rate_manager = get_dummy_exchange_rate_manager(time)
async def test_download_stop_resume_delete(self):
await self.setup_stream_manager()
received = []
expected_events = ['Download Started', 'Download Finished']
async def check_post(event):
received.append(event['event'])
self.stream_manager.analytics_manager._post = check_post
self.assertSetEqual(self.stream_manager.streams, set())
stream = await self.stream_manager.download_stream_from_uri(self.uri, self.exchange_rate_manager)
stream_hash = stream.stream_hash
@ -147,6 +161,20 @@ class TestStreamManager(BlobExchangeTestBase):
"select status from file where stream_hash=?", stream_hash
)
self.assertEqual(stored_status, None)
self.assertListEqual(expected_events, received)
async def _test_download_error(self, expected_error):
received = []
async def check_post(event):
self.assertEqual("Download Errored", event['event'])
received.append(event['properties']['error'])
self.stream_manager.analytics_manager._post = check_post
with self.assertRaises(expected_error):
await self.stream_manager.download_stream_from_uri(self.uri, self.exchange_rate_manager)
self.assertListEqual([expected_error.__name__], received)
async def test_insufficient_funds(self):
fee = {
@ -156,8 +184,7 @@ class TestStreamManager(BlobExchangeTestBase):
'version': '_0_0_1'
}
await self.setup_stream_manager(10.0, fee)
with self.assertRaises(InsufficientFundsError):
await self.stream_manager.download_stream_from_uri(self.uri, self.exchange_rate_manager)
await self._test_download_error(InsufficientFundsError)
async def test_fee_above_max_allowed(self):
fee = {
@ -167,11 +194,32 @@ class TestStreamManager(BlobExchangeTestBase):
'version': '_0_0_1'
}
await self.setup_stream_manager(1000000.0, fee)
with self.assertRaises(KeyFeeAboveMaxAllowed):
await self.stream_manager.download_stream_from_uri(self.uri, self.exchange_rate_manager)
await self._test_download_error(KeyFeeAboveMaxAllowed)
async def test_resolve_error(self):
await self.setup_stream_manager()
self.uri = "fake"
await self._test_download_error(ResolveError)
async def test_download_timeout(self):
self.server.stop_server()
self.client_config.download_timeout = 1.0
await self.setup_stream_manager()
await self._test_download_error(DownloadSDTimeout)
async def test_download_then_recover_stream_on_startup(self, old_sort=False):
expected_analytics_events = [
'Download Started',
'Download Finished'
]
received_events = []
async def check_post(event):
received_events.append(event['event'])
await self.setup_stream_manager(old_sort=old_sort)
self.stream_manager.analytics_manager._post = check_post
self.assertSetEqual(self.stream_manager.streams, set())
stream = await self.stream_manager.download_stream_from_uri(self.uri, self.exchange_rate_manager)
await stream.downloader.stream_finished_event.wait()
@ -189,6 +237,7 @@ class TestStreamManager(BlobExchangeTestBase):
sd_blob = self.client_blob_manager.get_blob(stream.sd_hash)
self.assertTrue(sd_blob.file_exists)
self.assertTrue(sd_blob.get_is_verified())
self.assertListEqual(expected_analytics_events, received_events)
def test_download_then_recover_old_sort_stream_on_startup(self):
return self.test_download_then_recover_stream_on_startup(old_sort=True)