diff --git a/lbrynet/analytics/manager.py b/lbrynet/analytics/manager.py index 57d5bea1e..64c9fb917 100644 --- a/lbrynet/analytics/manager.py +++ b/lbrynet/analytics/manager.py @@ -1,5 +1,6 @@ from lbrynet.core import looping_call_manager +from twisted.internet import defer from twisted.internet import task import constants @@ -43,13 +44,24 @@ class Manager(object): self.analytics_api.track(heartbeat) def _update_tracked_metrics(self): - value = self.track.summarize(constants.BLOB_BYTES_UPLOADED) - if value > 0: - event = self.events_generator.metric_observered(constants.BLOB_BYTES_UPLOADED, value) + should_send, value = self.track.summarize(constants.BLOB_BYTES_UPLOADED) + if should_send: + event = self.events_generator.metric_observed(constants.BLOB_BYTES_UPLOADED, value) self.analytics_api.track(event) def _send_repeating_metric(self, event_name, value_generator): - should_send, value = value_generator() + result = value_generator() + if_deferred(result, self._send_repeating_metric_value, event_name) + + def _send_repeating_metric_value(self, result, event_name): + should_send, value = result if should_send: - event = self.events_generator.metric_observered(event_name, value) + event = self.events_generator.metric_observed(event_name, value) self.analytics_api.track(event) + + +def if_deferred(maybe_deferred, callback, *args, **kwargs): + if isinstance(maybe_deferred, defer.Deferred): + maybe_deferred.addCallback(callback, *args, **kwargs) + else: + callback(mabye_deferred, *args, **kwargs) diff --git a/lbrynet/analytics/track.py b/lbrynet/analytics/track.py index c14952bf8..35dc27160 100644 --- a/lbrynet/analytics/track.py +++ b/lbrynet/analytics/track.py @@ -13,5 +13,12 @@ class Track(object): """Apply `op` on the current values for `metric`. This operation also resets the metric. + + Returns: + a tuple (should_send, value) """ - return op(self.data.pop(metric, [])) + try: + values = self.data.pop(metric) + return True, op(values) + except KeyError: + return False, None diff --git a/lbrynet/lbrynet_daemon/Daemon.py b/lbrynet/lbrynet_daemon/Daemon.py index 87afe9c8a..8d67f9ae4 100644 --- a/lbrynet/lbrynet_daemon/Daemon.py +++ b/lbrynet/lbrynet_daemon/Daemon.py @@ -185,8 +185,24 @@ class CheckRemoteVersions(object): return defer.fail(None) +class AlwaysSend(object): + def __init__(self, value_generator, *args, **kwargs): + self.value_generator = value_generator + self.args = args + self.kwargs = kwargs + + def __call__(self): + d = defer.maybeDeferred(self.value_generator, *self.args, **self.kwargs) + d.addCallback(lambda v: (True, v)) + return d + + def calculate_available_blob_size(blob_manager): - return sum(blob.length for blob in blob_manager.get_all_verified_blobs()) + d = blob_manager.get_all_verified_blobs() + d.addCallback( + lambda blobs: defer.DeferredList([blob_manager.get_blob_length(b) for b in blobs])) + d.addCallback(lambda blob_lengths: sum(val for success, val in blob_lengths if success)) + return d class Daemon(jsonrpc.JSONRPC): @@ -1021,9 +1037,9 @@ class Daemon(jsonrpc.JSONRPC): self.analytics_manager = analytics.Manager( analytics_api, events_generator, analytics.Track()) self.analytics_manager.start() - self.register_repeating_metric( + self.analytics_manager.register_repeating_metric( analytics.BLOB_BYTES_AVAILABLE, - lambda: calculate_available_blob_size(self.session.blob_manager), + AlwaysSend(calculate_available_blob_size, self.session.blob_manager), frequency=300 )