bug fixes

This commit is contained in:
Job Evers-Meltzer 2016-10-11 12:50:44 -05:00
parent 8b1bb673c1
commit 7167d47631
3 changed files with 44 additions and 9 deletions

View file

@ -1,5 +1,6 @@
from lbrynet.core import looping_call_manager
from twisted.internet import defer
from twisted.internet import task
import constants
@ -43,13 +44,24 @@ class Manager(object):
self.analytics_api.track(heartbeat)
def _update_tracked_metrics(self):
value = self.track.summarize(constants.BLOB_BYTES_UPLOADED)
if value > 0:
event = self.events_generator.metric_observered(constants.BLOB_BYTES_UPLOADED, value)
should_send, value = self.track.summarize(constants.BLOB_BYTES_UPLOADED)
if should_send:
event = self.events_generator.metric_observed(constants.BLOB_BYTES_UPLOADED, value)
self.analytics_api.track(event)
def _send_repeating_metric(self, event_name, value_generator):
should_send, value = value_generator()
result = value_generator()
if_deferred(result, self._send_repeating_metric_value, event_name)
def _send_repeating_metric_value(self, result, event_name):
should_send, value = result
if should_send:
event = self.events_generator.metric_observered(event_name, value)
event = self.events_generator.metric_observed(event_name, value)
self.analytics_api.track(event)
def if_deferred(maybe_deferred, callback, *args, **kwargs):
if isinstance(maybe_deferred, defer.Deferred):
maybe_deferred.addCallback(callback, *args, **kwargs)
else:
callback(mabye_deferred, *args, **kwargs)

View file

@ -13,5 +13,12 @@ class Track(object):
"""Apply `op` on the current values for `metric`.
This operation also resets the metric.
Returns:
a tuple (should_send, value)
"""
return op(self.data.pop(metric, []))
try:
values = self.data.pop(metric)
return True, op(values)
except KeyError:
return False, None

View file

@ -185,8 +185,24 @@ class CheckRemoteVersions(object):
return defer.fail(None)
class AlwaysSend(object):
def __init__(self, value_generator, *args, **kwargs):
self.value_generator = value_generator
self.args = args
self.kwargs = kwargs
def __call__(self):
d = defer.maybeDeferred(self.value_generator, *self.args, **self.kwargs)
d.addCallback(lambda v: (True, v))
return d
def calculate_available_blob_size(blob_manager):
return sum(blob.length for blob in blob_manager.get_all_verified_blobs())
d = blob_manager.get_all_verified_blobs()
d.addCallback(
lambda blobs: defer.DeferredList([blob_manager.get_blob_length(b) for b in blobs]))
d.addCallback(lambda blob_lengths: sum(val for success, val in blob_lengths if success))
return d
class Daemon(jsonrpc.JSONRPC):
@ -1021,9 +1037,9 @@ class Daemon(jsonrpc.JSONRPC):
self.analytics_manager = analytics.Manager(
analytics_api, events_generator, analytics.Track())
self.analytics_manager.start()
self.register_repeating_metric(
self.analytics_manager.register_repeating_metric(
analytics.BLOB_BYTES_AVAILABLE,
lambda: calculate_available_blob_size(self.session.blob_manager),
AlwaysSend(calculate_available_blob_size, self.session.blob_manager),
frequency=300
)