From c9ae251d60d8f7aee5b0acb27d1c119eefadb7d2 Mon Sep 17 00:00:00 2001 From: Jack Robison Date: Wed, 27 Sep 2017 17:02:36 -0400 Subject: [PATCH] download analytics --- lbrynet/analytics.py | 40 ++++++++++++++++----------- lbrynet/daemon/Daemon.py | 58 ++++++++++++++++++++++++++++++++-------- 2 files changed, 71 insertions(+), 27 deletions(-) diff --git a/lbrynet/analytics.py b/lbrynet/analytics.py index 02885fb6c..855bcfcee 100644 --- a/lbrynet/analytics.py +++ b/lbrynet/analytics.py @@ -57,15 +57,14 @@ class Manager(object): self._event(DOWNLOAD_STARTED, self._download_properties(id_, name, claim_dict)) ) - def send_download_errored(self, id_, name, claim_dict=None): - self.analytics_api.track( - self._event(DOWNLOAD_ERRORED, self._download_properties(id_, name, claim_dict)) - ) + def send_download_errored(self, err, id_, name, claim_dict, report): + download_error_properties = self._download_error_properties(err, id_, name, claim_dict, + report) + self.analytics_api.track(self._event(DOWNLOAD_ERRORED, download_error_properties)) - def send_download_finished(self, id_, name, claim_dict=None): - self.analytics_api.track( - self._event(DOWNLOAD_FINISHED, self._download_properties(id_, name, claim_dict)) - ) + def send_download_finished(self, id_, name, report, claim_dict=None): + download_properties = self._download_properties(id_, name, claim_dict, report) + self.analytics_api.track(self._event(DOWNLOAD_FINISHED, download_properties)) def send_claim_action(self, action): self.analytics_api.track(self._event(CLAIM_ACTION, {'action': action})) @@ -159,18 +158,27 @@ class Manager(object): return properties @staticmethod - def _download_properties(id_, name, claim_dict=None): - sd_hash = None - if claim_dict: - try: - sd_hash = claim_dict.source_hash - except (KeyError, TypeError, ValueError): - log.debug('Failed to get sd_hash from %s', claim_dict, exc_info=True) - return { + def _download_properties(id_, name, claim_dict=None, report=None): + sd_hash = None if not claim_dict else claim_dict.source_hash + p = { 'download_id': id_, 'name': name, 'stream_info': sd_hash } + if report: + p['report'] = report + return p + + @staticmethod + def _download_error_properties(error, id_, name, claim_dict, report): + return { + 'download_id': id_, + 'name': name, + 'stream_info': claim_dict.source_hash, + 'error': str(type(error)), + 'reason': error.message, + 'report': report + } @staticmethod def _make_context(platform, wallet): diff --git a/lbrynet/daemon/Daemon.py b/lbrynet/daemon/Daemon.py index f77f183c0..521134f87 100644 --- a/lbrynet/daemon/Daemon.py +++ b/lbrynet/daemon/Daemon.py @@ -46,7 +46,7 @@ from lbrynet.core.looping_call_manager import LoopingCallManager from lbrynet.core.server.BlobRequestHandler import BlobRequestHandlerFactory from lbrynet.core.server.ServerProtocol import ServerProtocolFactory from lbrynet.core.Error import InsufficientFundsError, UnknownNameError, NoSuchSDHash -from lbrynet.core.Error import NoSuchStreamHash +from lbrynet.core.Error import NoSuchStreamHash, DownloadDataTimeout, DownloadSDTimeout from lbrynet.core.Error import NullFundsError, NegativeFundsError log = logging.getLogger(__name__) @@ -600,7 +600,40 @@ class Daemon(AuthJSONRPCServer): return download_sd_blob(self.session, blob_hash, rate_manager, timeout) @defer.inlineCallbacks - def _download_name(self, name, claim_dict, claim_id, timeout=None, file_name=None): + def _get_stream_analytics_report(self, claim_dict): + sd_hash = claim_dict.source_hash + try: + stream_hash = yield self.stream_info_manager.get_stream_hash_for_sd_hash(sd_hash) + except NoSuchSDHash: + stream_hash = None + report = { + "sd_hash": sd_hash, + "stream_hash": stream_hash, + } + blobs = {} + sd_host = yield self.session.blob_manager.get_host_downloaded_from(sd_hash) + report["sd_blob"] = sd_host + if stream_hash: + blob_infos = yield self.stream_info_manager.get_blobs_for_stream(stream_hash) + report["known_blobs"] = len(blob_infos) + else: + blob_infos = [] + report["known_blobs"] = 0 + for blob_hash, blob_num, iv, length in blob_infos: + host = yield self.session.blob_manager.get_host_downloaded_from(blob_hash) + if host: + blobs[blob_num] = host + report["blobs"] = blobs + defer.returnValue(report) + + @defer.inlineCallbacks + def _download_finished(self, download_id, name, claim_dict): + report = yield self._get_stream_analytics_report(claim_dict) + self.analytics_manager.send_download_finished(download_id, name, report, claim_dict) + + @defer.inlineCallbacks + def _download_name(self, name, claim_dict, claim_id, timeout=None, file_name=None, + delete_on_timeout=True): """ Add a lbry file to the file manager, start the download, and return the new lbry file. If it already exists in the file manager, return the existing lbry file @@ -621,17 +654,20 @@ class Daemon(AuthJSONRPCServer): file_name) try: lbry_file, finished_deferred = yield self.streams[claim_id].start(claim_dict, name) - finished_deferred.addCallback( - lambda _: self.analytics_manager.send_download_finished(download_id, - name, - claim_dict)) + finished_deferred.addCallback(lambda _: self._download_finished(download_id, name, + claim_dict)) result = yield self._get_lbry_file_dict(lbry_file, full_status=True) - del self.streams[claim_id] - except Exception as err: - log.warning('Failed to get %s: %s', name, err) - self.analytics_manager.send_download_errored(download_id, name, claim_dict) - del self.streams[claim_id] + except (DownloadDataTimeout, DownloadSDTimeout) as err: + log.warning('Failed to get %s (%s)', name, err) + report = yield self._get_stream_analytics_report(claim_dict) + if isinstance(err, DownloadDataTimeout) and delete_on_timeout: + yield self.lbry_file_manager.delete_lbry_file(self.streams[claim_id].downloader) + elif self.streams[claim_id].downloader: + yield self.streams[claim_id].downloader.stop(err) + self.analytics_manager.send_download_errored(err, download_id, name, claim_dict, + report) result = {'error': err.message} + del self.streams[claim_id] defer.returnValue(result) @defer.inlineCallbacks