From 3a91896d8acae9b748e440ea85051cd889b767bf Mon Sep 17 00:00:00 2001 From: Job Evers-Meltzer Date: Thu, 29 Sep 2016 23:26:27 -0500 Subject: [PATCH] track the amount of data uploaded --- lbrynet/analytics/__init__.py | 6 ++++- lbrynet/analytics/events.py | 7 +++++ lbrynet/analytics/track.py | 17 ++++++++++++ lbrynet/core/server/BlobRequestHandler.py | 16 +++++++---- lbrynet/lbrynet_console/Console.py | 13 +++++---- tests/functional/test_misc.py | 24 ++++++++++++----- tests/unit/analytics/test_track.py | 27 +++++++++++++++++++ .../core/server/test_BlobRequestHandler.py | 12 ++++++--- 8 files changed, 101 insertions(+), 21 deletions(-) create mode 100644 lbrynet/analytics/track.py create mode 100644 tests/unit/analytics/test_track.py diff --git a/lbrynet/analytics/__init__.py b/lbrynet/analytics/__init__.py index 598751034..6d6c562f5 100644 --- a/lbrynet/analytics/__init__.py +++ b/lbrynet/analytics/__init__.py @@ -1,2 +1,6 @@ from events import * -from api import AnalyticsApi as Api \ No newline at end of file +from api import AnalyticsApi as Api +from track import Track + +# Constants for metrics +BLOB_BYTES_UPLOADED = 'blob_bytes_uploaded' diff --git a/lbrynet/analytics/events.py b/lbrynet/analytics/events.py index 94d6ce679..1a72fbaaa 100644 --- a/lbrynet/analytics/events.py +++ b/lbrynet/analytics/events.py @@ -32,6 +32,13 @@ class Events(object): } return self._event('Download Started', properties) + def metric_observed(self, metric_name, value): + properties = { + 'metric_name': metric_name, + 'metric_value': value, + } + return self._event('Metric Observed', properties) + def _event(self, event, event_properties=None): return { 'userId': 'lbry', diff --git a/lbrynet/analytics/track.py b/lbrynet/analytics/track.py new file mode 100644 index 000000000..c14952bf8 --- /dev/null +++ b/lbrynet/analytics/track.py @@ -0,0 +1,17 @@ +import collections + + +class Track(object): + """Track and summarize observations of metrics.""" + def __init__(self): + self.data = collections.defaultdict(list) + + def add_observation(self, metric, value): + self.data[metric].append(value) + + def summarize(self, metric, op=sum): + """Apply `op` on the current values for `metric`. + + This operation also resets the metric. + """ + return op(self.data.pop(metric, [])) diff --git a/lbrynet/core/server/BlobRequestHandler.py b/lbrynet/core/server/BlobRequestHandler.py index 15874c215..adf60f5b9 100644 --- a/lbrynet/core/server/BlobRequestHandler.py +++ b/lbrynet/core/server/BlobRequestHandler.py @@ -5,7 +5,9 @@ from twisted.protocols.basic import FileSender from twisted.python.failure import Failure from zope.interface import implements + from lbrynet.core.Offer import Offer +from lbrynet import analytics from lbrynet.interfaces import IQueryHandlerFactory, IQueryHandler, IBlobSender @@ -15,15 +17,16 @@ log = logging.getLogger(__name__) class BlobRequestHandlerFactory(object): implements(IQueryHandlerFactory) - def __init__(self, blob_manager, wallet, payment_rate_manager): + def __init__(self, blob_manager, wallet, payment_rate_manager, track): self.blob_manager = blob_manager self.wallet = wallet self.payment_rate_manager = payment_rate_manager + self.track = track ######### IQueryHandlerFactory ######### def build_query_handler(self): - q_h = BlobRequestHandler(self.blob_manager, self.wallet, self.payment_rate_manager) + q_h = BlobRequestHandler(self.blob_manager, self.wallet, self.payment_rate_manager, self.track) return q_h def get_primary_query_identifier(self): @@ -39,11 +42,12 @@ class BlobRequestHandler(object): BLOB_QUERY = 'requested_blob' AVAILABILITY_QUERY = 'requested_blobs' - def __init__(self, blob_manager, wallet, payment_rate_manager): + def __init__(self, blob_manager, wallet, payment_rate_manager, track): self.blob_manager = blob_manager self.payment_rate_manager = payment_rate_manager self.wallet = wallet self.query_identifiers = [self.PAYMENT_RATE_QUERY, self.BLOB_QUERY, self.AVAILABILITY_QUERY] + self.track = track self.peer = None self.blob_data_payment_rate = None self.read_handle = None @@ -190,8 +194,10 @@ class BlobRequestHandler(object): return inner_d def count_bytes(data): - self.blob_bytes_uploaded += len(data) - self.peer.update_stats('blob_bytes_uploaded', len(data)) + uploaded = len(data) + self.blob_bytes_uploaded += uploaded + self.peer.update_stats('blob_bytes_uploaded', uploaded) + self.track.add_observation(analytics.BLOB_BYTES_UPLOADED, uploaded) return data def start_transfer(): diff --git a/lbrynet/lbrynet_console/Console.py b/lbrynet/lbrynet_console/Console.py index 3f700222e..5282b68b8 100644 --- a/lbrynet/lbrynet_console/Console.py +++ b/lbrynet/lbrynet_console/Console.py @@ -12,6 +12,7 @@ from yapsy.PluginManager import PluginManager from twisted.internet import defer, threads, stdio, task, error from jsonrpc.proxy import JSONRPCProxy +from lbrynet import analytics from lbrynet.core.Session import Session from lbrynet.lbrynet_console.ConsoleControl import ConsoleControl from lbrynet.lbrynet_console.Settings import Settings @@ -366,11 +367,13 @@ class Console(): ] def get_blob_request_handler_factory(rate): - self.blob_request_payment_rate_manager = PaymentRateManager(self.session.base_payment_rate_manager, - rate) - handlers.append(BlobRequestHandlerFactory(self.session.blob_manager, - self.session.wallet, - self.blob_request_payment_rate_manager)) + self.blob_request_payment_rate_manager = PaymentRateManager( + self.session.base_payment_rate_manager, rate + ) + handlers.append(BlobRequestHandlerFactory( + self.session.blob_manager, self.session.wallet, + self.blob_request_payment_rate_manager, analytics.Track() + )) d1 = self.settings.get_server_data_payment_rate() d1.addCallback(get_blob_request_handler_factory) diff --git a/tests/functional/test_misc.py b/tests/functional/test_misc.py index 8b8ec0de8..0ee1baebd 100644 --- a/tests/functional/test_misc.py +++ b/tests/functional/test_misc.py @@ -10,6 +10,7 @@ import unittest from Crypto.PublicKey import RSA from Crypto import Random from Crypto.Hash import MD5 +from lbrynet import analytics from lbrynet.conf import MIN_BLOB_DATA_PAYMENT_RATE from lbrynet.lbrylive.LiveStreamCreator import FileLiveStreamCreator from lbrynet.lbrylive.LiveStreamMetadataManager import DBLiveStreamMetadataManager @@ -18,6 +19,7 @@ from lbrynet.lbryfile.EncryptedFileMetadataManager import TempEncryptedFileMetad from lbrynet.lbryfilemanager.EncryptedFileManager import EncryptedFileManager from lbrynet.core.PTCWallet import PointTraderKeyQueryHandlerFactory, PointTraderKeyExchanger from lbrynet.core.Session import Session +from lbrynet.core.server.BlobAvailabilityHandler import BlobAvailabilityHandlerFactory from lbrynet.core.client.StandaloneBlobDownloader import StandaloneBlobDownloader from lbrynet.core.StreamDescriptor import BlobStreamDescriptorWriter from lbrynet.core.StreamDescriptor import StreamDescriptorIdentifier @@ -268,8 +270,11 @@ def start_lbry_uploader(sd_hash_queue, kill_event, dead_event, file_size, ul_rat server_port = None query_handler_factories = { - BlobRequestHandlerFactory(session.blob_manager, session.wallet, - session.payment_rate_manager): True, + BlobAvailabilityHandlerFactory(session.blob_manager): True, + BlobRequestHandlerFactory( + session.blob_manager, session.wallet, + session.payment_rate_manager, + analytics.Track()): True, session.wallet.get_wallet_info_query_handler_factory(): True, } @@ -394,8 +399,11 @@ def start_lbry_reuploader(sd_hash, kill_event, dead_event, ready_event, n, ul_ra server_port = None query_handler_factories = { - BlobRequestHandlerFactory(session.blob_manager, session.wallet, - session.payment_rate_manager): True, + BlobAvailabilityHandlerFactory(session.blob_manager): True, + BlobRequestHandlerFactory( + session.blob_manager, session.wallet, + session.payment_rate_manager, + analytics.Track()): True, session.wallet.get_wallet_info_query_handler_factory(): True, } @@ -478,7 +486,8 @@ def start_live_server(sd_hash_queue, kill_event, dead_event): CryptBlobInfoQueryHandlerFactory(stream_info_manager, session.wallet, session.payment_rate_manager): True, BlobRequestHandlerFactory(session.blob_manager, session.wallet, - session.payment_rate_manager): True, + session.payment_rate_manager, + analytics.Track()): True, session.wallet.get_wallet_info_query_handler_factory(): True, } @@ -622,7 +631,10 @@ def start_blob_uploader(blob_hash_queue, kill_event, dead_event, slow, is_genero server_port = None query_handler_factories = { - BlobRequestHandlerFactory(session.blob_manager, session.wallet, session.payment_rate_manager): True, + BlobAvailabilityHandlerFactory(session.blob_manager): True, + BlobRequestHandlerFactory(session.blob_manager, session.wallet, + session.payment_rate_manager, + analytics.Track()): True, session.wallet.get_wallet_info_query_handler_factory(): True, } diff --git a/tests/unit/analytics/test_track.py b/tests/unit/analytics/test_track.py new file mode 100644 index 000000000..e12c3d7da --- /dev/null +++ b/tests/unit/analytics/test_track.py @@ -0,0 +1,27 @@ +from lbrynet import analytics + +from twisted.trial import unittest + + +class TrackTest(unittest.TestCase): + def test_empty_summarize_is_zero(self): + track = analytics.Track() + result = track.summarize('a') + self.assertEqual(0, result) + + def test_can_get_sum_of_metric(self): + track = analytics.Track() + track.add_observation('b', 1) + track.add_observation('b', 2) + + result = track.summarize('b') + self.assertEqual(3, result) + + def test_summarize_resets_metric(self): + track = analytics.Track() + track.add_observation('metric', 1) + track.add_observation('metric', 2) + + track.summarize('metric') + result = track.summarize('metric') + self.assertEqual(0, result) diff --git a/tests/unit/core/server/test_BlobRequestHandler.py b/tests/unit/core/server/test_BlobRequestHandler.py index 31d7e48ee..6034e5192 100644 --- a/tests/unit/core/server/test_BlobRequestHandler.py +++ b/tests/unit/core/server/test_BlobRequestHandler.py @@ -5,6 +5,7 @@ from twisted.internet import defer from twisted.test import proto_helpers from twisted.trial import unittest +from lbrynet import analytics from lbrynet.core import Peer from lbrynet.core.server import BlobRequestHandler from lbrynet.core.PaymentRateManager import NegotiatedPaymentRateManager, BasePaymentRateManager @@ -14,8 +15,10 @@ from tests.mocks import DummyBlobAvailabilityTracker class TestBlobRequestHandlerQueries(unittest.TestCase): def setUp(self): self.blob_manager = mock.Mock() - self.payment_rate_manager = NegotiatedPaymentRateManager(BasePaymentRateManager(0.001), DummyBlobAvailabilityTracker()) - self.handler = BlobRequestHandler.BlobRequestHandler(self.blob_manager, None, self.payment_rate_manager) + self.payment_rate_manager = NegotiatedPaymentRateManager( + BasePaymentRateManager(0.001), DummyBlobAvailabilityTracker()) + self.handler = BlobRequestHandler.BlobRequestHandler( + self.blob_manager, None, self.payment_rate_manager, None) def test_empty_response_when_empty_query(self): self.assertEqual({}, self.successResultOf(self.handler.handle_queries({}))) @@ -107,7 +110,7 @@ class TestBlobRequestHandlerQueries(unittest.TestCase): class TestBlobRequestHandlerSender(unittest.TestCase): def test_nothing_happens_if_not_currently_uploading(self): - handler = BlobRequestHandler.BlobRequestHandler(None, None, None) + handler = BlobRequestHandler.BlobRequestHandler(None, None, None, None) handler.currently_uploading = None deferred = handler.send_blob_if_requested(None) self.assertEqual(True, self.successResultOf(deferred)) @@ -116,7 +119,8 @@ class TestBlobRequestHandlerSender(unittest.TestCase): # TODO: also check that the expected payment values are set consumer = proto_helpers.StringTransport() test_file = StringIO.StringIO('test') - handler = BlobRequestHandler.BlobRequestHandler(None, None, None) + track = analytics.Track() + handler = BlobRequestHandler.BlobRequestHandler(None, None, None, track) handler.peer = mock.create_autospec(Peer.Peer) handler.currently_uploading = mock.Mock() handler.read_handle = test_file