track the amount of data uploaded

This commit is contained in:
Job Evers-Meltzer 2016-09-29 23:26:27 -05:00
parent 36ae0d5f20
commit 3a91896d8a
8 changed files with 101 additions and 21 deletions

View file

@ -1,2 +1,6 @@
from events import * from events import *
from api import AnalyticsApi as Api from api import AnalyticsApi as Api
from track import Track
# Constants for metrics
BLOB_BYTES_UPLOADED = 'blob_bytes_uploaded'

View file

@ -32,6 +32,13 @@ class Events(object):
} }
return self._event('Download Started', properties) return self._event('Download Started', properties)
def metric_observed(self, metric_name, value):
properties = {
'metric_name': metric_name,
'metric_value': value,
}
return self._event('Metric Observed', properties)
def _event(self, event, event_properties=None): def _event(self, event, event_properties=None):
return { return {
'userId': 'lbry', 'userId': 'lbry',

View file

@ -0,0 +1,17 @@
import collections
class Track(object):
"""Track and summarize observations of metrics."""
def __init__(self):
self.data = collections.defaultdict(list)
def add_observation(self, metric, value):
self.data[metric].append(value)
def summarize(self, metric, op=sum):
"""Apply `op` on the current values for `metric`.
This operation also resets the metric.
"""
return op(self.data.pop(metric, []))

View file

@ -5,7 +5,9 @@ from twisted.protocols.basic import FileSender
from twisted.python.failure import Failure from twisted.python.failure import Failure
from zope.interface import implements from zope.interface import implements
from lbrynet.core.Offer import Offer from lbrynet.core.Offer import Offer
from lbrynet import analytics
from lbrynet.interfaces import IQueryHandlerFactory, IQueryHandler, IBlobSender from lbrynet.interfaces import IQueryHandlerFactory, IQueryHandler, IBlobSender
@ -15,15 +17,16 @@ log = logging.getLogger(__name__)
class BlobRequestHandlerFactory(object): class BlobRequestHandlerFactory(object):
implements(IQueryHandlerFactory) implements(IQueryHandlerFactory)
def __init__(self, blob_manager, wallet, payment_rate_manager): def __init__(self, blob_manager, wallet, payment_rate_manager, track):
self.blob_manager = blob_manager self.blob_manager = blob_manager
self.wallet = wallet self.wallet = wallet
self.payment_rate_manager = payment_rate_manager self.payment_rate_manager = payment_rate_manager
self.track = track
######### IQueryHandlerFactory ######### ######### IQueryHandlerFactory #########
def build_query_handler(self): def build_query_handler(self):
q_h = BlobRequestHandler(self.blob_manager, self.wallet, self.payment_rate_manager) q_h = BlobRequestHandler(self.blob_manager, self.wallet, self.payment_rate_manager, self.track)
return q_h return q_h
def get_primary_query_identifier(self): def get_primary_query_identifier(self):
@ -39,11 +42,12 @@ class BlobRequestHandler(object):
BLOB_QUERY = 'requested_blob' BLOB_QUERY = 'requested_blob'
AVAILABILITY_QUERY = 'requested_blobs' AVAILABILITY_QUERY = 'requested_blobs'
def __init__(self, blob_manager, wallet, payment_rate_manager): def __init__(self, blob_manager, wallet, payment_rate_manager, track):
self.blob_manager = blob_manager self.blob_manager = blob_manager
self.payment_rate_manager = payment_rate_manager self.payment_rate_manager = payment_rate_manager
self.wallet = wallet self.wallet = wallet
self.query_identifiers = [self.PAYMENT_RATE_QUERY, self.BLOB_QUERY, self.AVAILABILITY_QUERY] self.query_identifiers = [self.PAYMENT_RATE_QUERY, self.BLOB_QUERY, self.AVAILABILITY_QUERY]
self.track = track
self.peer = None self.peer = None
self.blob_data_payment_rate = None self.blob_data_payment_rate = None
self.read_handle = None self.read_handle = None
@ -190,8 +194,10 @@ class BlobRequestHandler(object):
return inner_d return inner_d
def count_bytes(data): def count_bytes(data):
self.blob_bytes_uploaded += len(data) uploaded = len(data)
self.peer.update_stats('blob_bytes_uploaded', len(data)) self.blob_bytes_uploaded += uploaded
self.peer.update_stats('blob_bytes_uploaded', uploaded)
self.track.add_observation(analytics.BLOB_BYTES_UPLOADED, uploaded)
return data return data
def start_transfer(): def start_transfer():

View file

@ -12,6 +12,7 @@ from yapsy.PluginManager import PluginManager
from twisted.internet import defer, threads, stdio, task, error from twisted.internet import defer, threads, stdio, task, error
from jsonrpc.proxy import JSONRPCProxy from jsonrpc.proxy import JSONRPCProxy
from lbrynet import analytics
from lbrynet.core.Session import Session from lbrynet.core.Session import Session
from lbrynet.lbrynet_console.ConsoleControl import ConsoleControl from lbrynet.lbrynet_console.ConsoleControl import ConsoleControl
from lbrynet.lbrynet_console.Settings import Settings from lbrynet.lbrynet_console.Settings import Settings
@ -366,11 +367,13 @@ class Console():
] ]
def get_blob_request_handler_factory(rate): def get_blob_request_handler_factory(rate):
self.blob_request_payment_rate_manager = PaymentRateManager(self.session.base_payment_rate_manager, self.blob_request_payment_rate_manager = PaymentRateManager(
rate) self.session.base_payment_rate_manager, rate
handlers.append(BlobRequestHandlerFactory(self.session.blob_manager, )
self.session.wallet, handlers.append(BlobRequestHandlerFactory(
self.blob_request_payment_rate_manager)) self.session.blob_manager, self.session.wallet,
self.blob_request_payment_rate_manager, analytics.Track()
))
d1 = self.settings.get_server_data_payment_rate() d1 = self.settings.get_server_data_payment_rate()
d1.addCallback(get_blob_request_handler_factory) d1.addCallback(get_blob_request_handler_factory)

View file

@ -10,6 +10,7 @@ import unittest
from Crypto.PublicKey import RSA from Crypto.PublicKey import RSA
from Crypto import Random from Crypto import Random
from Crypto.Hash import MD5 from Crypto.Hash import MD5
from lbrynet import analytics
from lbrynet.conf import MIN_BLOB_DATA_PAYMENT_RATE from lbrynet.conf import MIN_BLOB_DATA_PAYMENT_RATE
from lbrynet.lbrylive.LiveStreamCreator import FileLiveStreamCreator from lbrynet.lbrylive.LiveStreamCreator import FileLiveStreamCreator
from lbrynet.lbrylive.LiveStreamMetadataManager import DBLiveStreamMetadataManager from lbrynet.lbrylive.LiveStreamMetadataManager import DBLiveStreamMetadataManager
@ -18,6 +19,7 @@ from lbrynet.lbryfile.EncryptedFileMetadataManager import TempEncryptedFileMetad
from lbrynet.lbryfilemanager.EncryptedFileManager import EncryptedFileManager from lbrynet.lbryfilemanager.EncryptedFileManager import EncryptedFileManager
from lbrynet.core.PTCWallet import PointTraderKeyQueryHandlerFactory, PointTraderKeyExchanger from lbrynet.core.PTCWallet import PointTraderKeyQueryHandlerFactory, PointTraderKeyExchanger
from lbrynet.core.Session import Session from lbrynet.core.Session import Session
from lbrynet.core.server.BlobAvailabilityHandler import BlobAvailabilityHandlerFactory
from lbrynet.core.client.StandaloneBlobDownloader import StandaloneBlobDownloader from lbrynet.core.client.StandaloneBlobDownloader import StandaloneBlobDownloader
from lbrynet.core.StreamDescriptor import BlobStreamDescriptorWriter from lbrynet.core.StreamDescriptor import BlobStreamDescriptorWriter
from lbrynet.core.StreamDescriptor import StreamDescriptorIdentifier from lbrynet.core.StreamDescriptor import StreamDescriptorIdentifier
@ -268,8 +270,11 @@ def start_lbry_uploader(sd_hash_queue, kill_event, dead_event, file_size, ul_rat
server_port = None server_port = None
query_handler_factories = { query_handler_factories = {
BlobRequestHandlerFactory(session.blob_manager, session.wallet, BlobAvailabilityHandlerFactory(session.blob_manager): True,
session.payment_rate_manager): True, BlobRequestHandlerFactory(
session.blob_manager, session.wallet,
session.payment_rate_manager,
analytics.Track()): True,
session.wallet.get_wallet_info_query_handler_factory(): True, session.wallet.get_wallet_info_query_handler_factory(): True,
} }
@ -394,8 +399,11 @@ def start_lbry_reuploader(sd_hash, kill_event, dead_event, ready_event, n, ul_ra
server_port = None server_port = None
query_handler_factories = { query_handler_factories = {
BlobRequestHandlerFactory(session.blob_manager, session.wallet, BlobAvailabilityHandlerFactory(session.blob_manager): True,
session.payment_rate_manager): True, BlobRequestHandlerFactory(
session.blob_manager, session.wallet,
session.payment_rate_manager,
analytics.Track()): True,
session.wallet.get_wallet_info_query_handler_factory(): True, session.wallet.get_wallet_info_query_handler_factory(): True,
} }
@ -478,7 +486,8 @@ def start_live_server(sd_hash_queue, kill_event, dead_event):
CryptBlobInfoQueryHandlerFactory(stream_info_manager, session.wallet, CryptBlobInfoQueryHandlerFactory(stream_info_manager, session.wallet,
session.payment_rate_manager): True, session.payment_rate_manager): True,
BlobRequestHandlerFactory(session.blob_manager, session.wallet, BlobRequestHandlerFactory(session.blob_manager, session.wallet,
session.payment_rate_manager): True, session.payment_rate_manager,
analytics.Track()): True,
session.wallet.get_wallet_info_query_handler_factory(): True, session.wallet.get_wallet_info_query_handler_factory(): True,
} }
@ -622,7 +631,10 @@ def start_blob_uploader(blob_hash_queue, kill_event, dead_event, slow, is_genero
server_port = None server_port = None
query_handler_factories = { query_handler_factories = {
BlobRequestHandlerFactory(session.blob_manager, session.wallet, session.payment_rate_manager): True, BlobAvailabilityHandlerFactory(session.blob_manager): True,
BlobRequestHandlerFactory(session.blob_manager, session.wallet,
session.payment_rate_manager,
analytics.Track()): True,
session.wallet.get_wallet_info_query_handler_factory(): True, session.wallet.get_wallet_info_query_handler_factory(): True,
} }

View file

@ -0,0 +1,27 @@
from lbrynet import analytics
from twisted.trial import unittest
class TrackTest(unittest.TestCase):
def test_empty_summarize_is_zero(self):
track = analytics.Track()
result = track.summarize('a')
self.assertEqual(0, result)
def test_can_get_sum_of_metric(self):
track = analytics.Track()
track.add_observation('b', 1)
track.add_observation('b', 2)
result = track.summarize('b')
self.assertEqual(3, result)
def test_summarize_resets_metric(self):
track = analytics.Track()
track.add_observation('metric', 1)
track.add_observation('metric', 2)
track.summarize('metric')
result = track.summarize('metric')
self.assertEqual(0, result)

View file

@ -5,6 +5,7 @@ from twisted.internet import defer
from twisted.test import proto_helpers from twisted.test import proto_helpers
from twisted.trial import unittest from twisted.trial import unittest
from lbrynet import analytics
from lbrynet.core import Peer from lbrynet.core import Peer
from lbrynet.core.server import BlobRequestHandler from lbrynet.core.server import BlobRequestHandler
from lbrynet.core.PaymentRateManager import NegotiatedPaymentRateManager, BasePaymentRateManager from lbrynet.core.PaymentRateManager import NegotiatedPaymentRateManager, BasePaymentRateManager
@ -14,8 +15,10 @@ from tests.mocks import DummyBlobAvailabilityTracker
class TestBlobRequestHandlerQueries(unittest.TestCase): class TestBlobRequestHandlerQueries(unittest.TestCase):
def setUp(self): def setUp(self):
self.blob_manager = mock.Mock() self.blob_manager = mock.Mock()
self.payment_rate_manager = NegotiatedPaymentRateManager(BasePaymentRateManager(0.001), DummyBlobAvailabilityTracker()) self.payment_rate_manager = NegotiatedPaymentRateManager(
self.handler = BlobRequestHandler.BlobRequestHandler(self.blob_manager, None, self.payment_rate_manager) BasePaymentRateManager(0.001), DummyBlobAvailabilityTracker())
self.handler = BlobRequestHandler.BlobRequestHandler(
self.blob_manager, None, self.payment_rate_manager, None)
def test_empty_response_when_empty_query(self): def test_empty_response_when_empty_query(self):
self.assertEqual({}, self.successResultOf(self.handler.handle_queries({}))) self.assertEqual({}, self.successResultOf(self.handler.handle_queries({})))
@ -107,7 +110,7 @@ class TestBlobRequestHandlerQueries(unittest.TestCase):
class TestBlobRequestHandlerSender(unittest.TestCase): class TestBlobRequestHandlerSender(unittest.TestCase):
def test_nothing_happens_if_not_currently_uploading(self): def test_nothing_happens_if_not_currently_uploading(self):
handler = BlobRequestHandler.BlobRequestHandler(None, None, None) handler = BlobRequestHandler.BlobRequestHandler(None, None, None, None)
handler.currently_uploading = None handler.currently_uploading = None
deferred = handler.send_blob_if_requested(None) deferred = handler.send_blob_if_requested(None)
self.assertEqual(True, self.successResultOf(deferred)) self.assertEqual(True, self.successResultOf(deferred))
@ -116,7 +119,8 @@ class TestBlobRequestHandlerSender(unittest.TestCase):
# TODO: also check that the expected payment values are set # TODO: also check that the expected payment values are set
consumer = proto_helpers.StringTransport() consumer = proto_helpers.StringTransport()
test_file = StringIO.StringIO('test') test_file = StringIO.StringIO('test')
handler = BlobRequestHandler.BlobRequestHandler(None, None, None) track = analytics.Track()
handler = BlobRequestHandler.BlobRequestHandler(None, None, None, track)
handler.peer = mock.create_autospec(Peer.Peer) handler.peer = mock.create_autospec(Peer.Peer)
handler.currently_uploading = mock.Mock() handler.currently_uploading = mock.Mock()
handler.read_handle = test_file handler.read_handle = test_file