download analytics

This commit is contained in:
Jack Robison 2017-09-27 17:02:36 -04:00
parent af99edc764
commit c9ae251d60
No known key found for this signature in database
GPG key ID: 284699E7404E3CFF
2 changed files with 71 additions and 27 deletions

View file

@ -57,15 +57,14 @@ class Manager(object):
self._event(DOWNLOAD_STARTED, self._download_properties(id_, name, claim_dict)) self._event(DOWNLOAD_STARTED, self._download_properties(id_, name, claim_dict))
) )
def send_download_errored(self, id_, name, claim_dict=None): def send_download_errored(self, err, id_, name, claim_dict, report):
self.analytics_api.track( download_error_properties = self._download_error_properties(err, id_, name, claim_dict,
self._event(DOWNLOAD_ERRORED, self._download_properties(id_, name, claim_dict)) report)
) self.analytics_api.track(self._event(DOWNLOAD_ERRORED, download_error_properties))
def send_download_finished(self, id_, name, claim_dict=None): def send_download_finished(self, id_, name, report, claim_dict=None):
self.analytics_api.track( download_properties = self._download_properties(id_, name, claim_dict, report)
self._event(DOWNLOAD_FINISHED, self._download_properties(id_, name, claim_dict)) self.analytics_api.track(self._event(DOWNLOAD_FINISHED, download_properties))
)
def send_claim_action(self, action): def send_claim_action(self, action):
self.analytics_api.track(self._event(CLAIM_ACTION, {'action': action})) self.analytics_api.track(self._event(CLAIM_ACTION, {'action': action}))
@ -159,18 +158,27 @@ class Manager(object):
return properties return properties
@staticmethod @staticmethod
def _download_properties(id_, name, claim_dict=None): def _download_properties(id_, name, claim_dict=None, report=None):
sd_hash = None sd_hash = None if not claim_dict else claim_dict.source_hash
if claim_dict: p = {
try:
sd_hash = claim_dict.source_hash
except (KeyError, TypeError, ValueError):
log.debug('Failed to get sd_hash from %s', claim_dict, exc_info=True)
return {
'download_id': id_, 'download_id': id_,
'name': name, 'name': name,
'stream_info': sd_hash 'stream_info': sd_hash
} }
if report:
p['report'] = report
return p
@staticmethod
def _download_error_properties(error, id_, name, claim_dict, report):
return {
'download_id': id_,
'name': name,
'stream_info': claim_dict.source_hash,
'error': str(type(error)),
'reason': error.message,
'report': report
}
@staticmethod @staticmethod
def _make_context(platform, wallet): def _make_context(platform, wallet):

View file

@ -46,7 +46,7 @@ from lbrynet.core.looping_call_manager import LoopingCallManager
from lbrynet.core.server.BlobRequestHandler import BlobRequestHandlerFactory from lbrynet.core.server.BlobRequestHandler import BlobRequestHandlerFactory
from lbrynet.core.server.ServerProtocol import ServerProtocolFactory from lbrynet.core.server.ServerProtocol import ServerProtocolFactory
from lbrynet.core.Error import InsufficientFundsError, UnknownNameError, NoSuchSDHash from lbrynet.core.Error import InsufficientFundsError, UnknownNameError, NoSuchSDHash
from lbrynet.core.Error import NoSuchStreamHash from lbrynet.core.Error import NoSuchStreamHash, DownloadDataTimeout, DownloadSDTimeout
from lbrynet.core.Error import NullFundsError, NegativeFundsError from lbrynet.core.Error import NullFundsError, NegativeFundsError
log = logging.getLogger(__name__) log = logging.getLogger(__name__)
@ -600,7 +600,40 @@ class Daemon(AuthJSONRPCServer):
return download_sd_blob(self.session, blob_hash, rate_manager, timeout) return download_sd_blob(self.session, blob_hash, rate_manager, timeout)
@defer.inlineCallbacks @defer.inlineCallbacks
def _download_name(self, name, claim_dict, claim_id, timeout=None, file_name=None): def _get_stream_analytics_report(self, claim_dict):
sd_hash = claim_dict.source_hash
try:
stream_hash = yield self.stream_info_manager.get_stream_hash_for_sd_hash(sd_hash)
except NoSuchSDHash:
stream_hash = None
report = {
"sd_hash": sd_hash,
"stream_hash": stream_hash,
}
blobs = {}
sd_host = yield self.session.blob_manager.get_host_downloaded_from(sd_hash)
report["sd_blob"] = sd_host
if stream_hash:
blob_infos = yield self.stream_info_manager.get_blobs_for_stream(stream_hash)
report["known_blobs"] = len(blob_infos)
else:
blob_infos = []
report["known_blobs"] = 0
for blob_hash, blob_num, iv, length in blob_infos:
host = yield self.session.blob_manager.get_host_downloaded_from(blob_hash)
if host:
blobs[blob_num] = host
report["blobs"] = blobs
defer.returnValue(report)
@defer.inlineCallbacks
def _download_finished(self, download_id, name, claim_dict):
report = yield self._get_stream_analytics_report(claim_dict)
self.analytics_manager.send_download_finished(download_id, name, report, claim_dict)
@defer.inlineCallbacks
def _download_name(self, name, claim_dict, claim_id, timeout=None, file_name=None,
delete_on_timeout=True):
""" """
Add a lbry file to the file manager, start the download, and return the new lbry file. Add a lbry file to the file manager, start the download, and return the new lbry file.
If it already exists in the file manager, return the existing lbry file If it already exists in the file manager, return the existing lbry file
@ -621,17 +654,20 @@ class Daemon(AuthJSONRPCServer):
file_name) file_name)
try: try:
lbry_file, finished_deferred = yield self.streams[claim_id].start(claim_dict, name) lbry_file, finished_deferred = yield self.streams[claim_id].start(claim_dict, name)
finished_deferred.addCallback( finished_deferred.addCallback(lambda _: self._download_finished(download_id, name,
lambda _: self.analytics_manager.send_download_finished(download_id, claim_dict))
name,
claim_dict))
result = yield self._get_lbry_file_dict(lbry_file, full_status=True) result = yield self._get_lbry_file_dict(lbry_file, full_status=True)
del self.streams[claim_id] except (DownloadDataTimeout, DownloadSDTimeout) as err:
except Exception as err: log.warning('Failed to get %s (%s)', name, err)
log.warning('Failed to get %s: %s', name, err) report = yield self._get_stream_analytics_report(claim_dict)
self.analytics_manager.send_download_errored(download_id, name, claim_dict) if isinstance(err, DownloadDataTimeout) and delete_on_timeout:
del self.streams[claim_id] yield self.lbry_file_manager.delete_lbry_file(self.streams[claim_id].downloader)
elif self.streams[claim_id].downloader:
yield self.streams[claim_id].downloader.stop(err)
self.analytics_manager.send_download_errored(err, download_id, name, claim_dict,
report)
result = {'error': err.message} result = {'error': err.message}
del self.streams[claim_id]
defer.returnValue(result) defer.returnValue(result)
@defer.inlineCallbacks @defer.inlineCallbacks