Merge pull request #178 from lbryio/track-data-upload-rebase

Track Data Uploaded and other stuff
This commit is contained in:
Job Evers‐Meltzer 2016-10-24 08:20:29 -07:00 committed by GitHub
commit 6f3873d99b
23 changed files with 682 additions and 319 deletions

View file

@ -1,2 +1,6 @@
from constants import *
from events import * from events import *
from api import AnalyticsApi as Api from api import Api
from track import Track
from manager import Manager

View file

@ -28,7 +28,7 @@ def log_response(fn):
return wrapper return wrapper
class AnalyticsApi(object): class Api(object):
def __init__(self, session, url, write_key): def __init__(self, session, url, write_key):
self.session = session self.session = session
self.url = url self.url = url
@ -61,7 +61,7 @@ class AnalyticsApi(object):
@classmethod @classmethod
def load(cls, session=None): def load(cls, session=None):
"""Initialize an instance using values from lbry.io.""" """Initialize an instance using values from the configuration"""
if not session: if not session:
session = sessions.FuturesSession() session = sessions.FuturesSession()
return cls( return cls(

View file

@ -0,0 +1,4 @@
"""Constants for metrics"""
BLOB_BYTES_UPLOADED = 'Blob Bytes Uploaded'
BLOB_BYTES_AVAILABLE = 'Blob Bytes Available'

View file

@ -1,6 +1,6 @@
import logging import logging
from lbrynet.analytics import utils from lbrynet.core import utils
log = logging.getLogger(__name__) log = logging.getLogger(__name__)
@ -23,31 +23,39 @@ class Events(object):
self.session_id = session_id self.session_id = session_id
def heartbeat(self): def heartbeat(self):
return { return self._event('Heartbeat')
'userId': 'lbry',
'event': 'Heartbeat',
'properties': {
'lbry_id': self.lbry_id,
'session_id': self.session_id
},
'context': self.context,
'timestamp': utils.now()
}
def download_started(self, name, stream_info=None): def download_started(self, name, stream_info=None):
properties = {
'name': name,
'stream_info': get_sd_hash(stream_info)
}
return self._event('Download Started', properties)
def metric_observed(self, metric_name, value):
properties = {
'value': value,
}
return self._event(metric_name, properties)
def _event(self, event, event_properties=None):
return { return {
'userId': 'lbry', 'userId': 'lbry',
'event': 'Download Started', 'event': event,
'properties': { 'properties': self._properties(event_properties),
'lbry_id': self.lbry_id,
'session_id': self.session_id,
'name': name,
'stream_info': get_sd_hash(stream_info)
},
'context': self.context, 'context': self.context,
'timestamp': utils.now() 'timestamp': utils.isonow()
} }
def _properties(self, event_properties=None):
event_properties = event_properties or {}
properties = {
'lbry_id': self.lbry_id,
'session_id': self.session_id,
}
properties.update(event_properties)
return properties
def make_context(platform, wallet, is_dev=False): def make_context(platform, wallet, is_dev=False):
# TODO: distinguish between developer and release instances # TODO: distinguish between developer and release instances

View file

@ -0,0 +1,67 @@
from lbrynet.core import looping_call_manager
from twisted.internet import defer
from twisted.internet import task
import constants
class Manager(object):
def __init__(self, analytics_api, events_generator, track):
self.analytics_api = analytics_api
self.events_generator = events_generator
self.track = track
self.looping_call_manager = self.setup_looping_calls()
def setup_looping_calls(self):
call_manager = looping_call_manager.LoopingCallManager()
looping_calls = [
('send_heartbeat', self._send_heartbeat),
('update_tracked_metrics', self._update_tracked_metrics),
]
for name, fn in looping_calls:
call_manager.register_looping_call(name, task.LoopingCall(fn))
return call_manager
def start(self):
self.looping_call_manager.start('send_heartbeat', 60)
self.looping_call_manager.start('update_tracked_metrics', 300)
def shutdown(self):
self.looping_call_manager.shutdown()
def send_download_started(self, name, stream_info=None):
event = self.events_generator.download_started(name, stream_info)
self.analytics_api.track(event)
def register_repeating_metric(self, event_name, value_generator, frequency=300):
lcall = task.LoopingCall(self._send_repeating_metric, event_name, value_generator)
self.looping_call_manager.register_looping_call(event_name, lcall)
lcall.start(frequency)
def _send_heartbeat(self):
heartbeat = self.events_generator.heartbeat()
self.analytics_api.track(heartbeat)
def _update_tracked_metrics(self):
should_send, value = self.track.summarize_and_reset(constants.BLOB_BYTES_UPLOADED)
if should_send:
event = self.events_generator.metric_observed(constants.BLOB_BYTES_UPLOADED, value)
self.analytics_api.track(event)
def _send_repeating_metric(self, event_name, value_generator):
result = value_generator()
if_deferred(result, self._send_repeating_metric_value, event_name)
def _send_repeating_metric_value(self, result, event_name):
should_send, value = result
if should_send:
event = self.events_generator.metric_observed(event_name, value)
self.analytics_api.track(event)
def if_deferred(maybe_deferred, callback, *args, **kwargs):
if isinstance(maybe_deferred, defer.Deferred):
maybe_deferred.addCallback(callback, *args, **kwargs)
else:
callback(maybe_deferred, *args, **kwargs)

View file

@ -0,0 +1,24 @@
import collections
class Track(object):
"""Track and summarize observations of metrics."""
def __init__(self):
self.data = collections.defaultdict(list)
def add_observation(self, metric, value):
self.data[metric].append(value)
def summarize_and_reset(self, metric, op=sum):
"""Apply `op` on the current values for `metric`.
This operation also resets the metric.
Returns:
a tuple (should_send, value)
"""
try:
values = self.data.pop(metric)
return True, op(values)
except KeyError:
return False, None

View file

@ -1,8 +0,0 @@
import datetime
from lbrynet.core.utils import *
def now():
"""Return utc now in isoformat with timezone"""
return datetime.datetime.utcnow().isoformat() + 'Z'

View file

@ -4,7 +4,8 @@ Some network wide and also application specific parameters
import os import os
is_generous_host = True is_generous_host = True
IS_DEVELOPMENT_VERSION = False IS_DEVELOPMENT_VERSION = (os.environ.get('LBRY_DEV') is not None)
MAX_HANDSHAKE_SIZE = 2**16 MAX_HANDSHAKE_SIZE = 2**16
MAX_REQUEST_SIZE = 2**16 MAX_REQUEST_SIZE = 2**16

View file

@ -0,0 +1,20 @@
class LoopingCallManager(object):
def __init__(self, calls=None):
self.calls = calls or {}
def register_looping_call(self, name, call):
assert name not in self.calls, '{} is already registered'.format(name)
self.calls[name] = call
def start(self, name, *args):
lcall = self.calls[name]
if not lcall.running:
lcall.start(*args)
def stop(self, name):
self.calls[name].stop()
def shutdown(self):
for lcall in self.calls.itervalues():
if lcall.running:
lcall.stop()

View file

@ -5,7 +5,9 @@ from twisted.protocols.basic import FileSender
from twisted.python.failure import Failure from twisted.python.failure import Failure
from zope.interface import implements from zope.interface import implements
from lbrynet.core.Offer import Offer from lbrynet.core.Offer import Offer
from lbrynet import analytics
from lbrynet.interfaces import IQueryHandlerFactory, IQueryHandler, IBlobSender from lbrynet.interfaces import IQueryHandlerFactory, IQueryHandler, IBlobSender
@ -15,15 +17,16 @@ log = logging.getLogger(__name__)
class BlobRequestHandlerFactory(object): class BlobRequestHandlerFactory(object):
implements(IQueryHandlerFactory) implements(IQueryHandlerFactory)
def __init__(self, blob_manager, wallet, payment_rate_manager): def __init__(self, blob_manager, wallet, payment_rate_manager, track):
self.blob_manager = blob_manager self.blob_manager = blob_manager
self.wallet = wallet self.wallet = wallet
self.payment_rate_manager = payment_rate_manager self.payment_rate_manager = payment_rate_manager
self.track = track
######### IQueryHandlerFactory ######### ######### IQueryHandlerFactory #########
def build_query_handler(self): def build_query_handler(self):
q_h = BlobRequestHandler(self.blob_manager, self.wallet, self.payment_rate_manager) q_h = BlobRequestHandler(self.blob_manager, self.wallet, self.payment_rate_manager, self.track)
return q_h return q_h
def get_primary_query_identifier(self): def get_primary_query_identifier(self):
@ -39,11 +42,12 @@ class BlobRequestHandler(object):
BLOB_QUERY = 'requested_blob' BLOB_QUERY = 'requested_blob'
AVAILABILITY_QUERY = 'requested_blobs' AVAILABILITY_QUERY = 'requested_blobs'
def __init__(self, blob_manager, wallet, payment_rate_manager): def __init__(self, blob_manager, wallet, payment_rate_manager, track):
self.blob_manager = blob_manager self.blob_manager = blob_manager
self.payment_rate_manager = payment_rate_manager self.payment_rate_manager = payment_rate_manager
self.wallet = wallet self.wallet = wallet
self.query_identifiers = [self.PAYMENT_RATE_QUERY, self.BLOB_QUERY, self.AVAILABILITY_QUERY] self.query_identifiers = [self.PAYMENT_RATE_QUERY, self.BLOB_QUERY, self.AVAILABILITY_QUERY]
self.track = track
self.peer = None self.peer = None
self.blob_data_payment_rate = None self.blob_data_payment_rate = None
self.read_handle = None self.read_handle = None
@ -190,8 +194,10 @@ class BlobRequestHandler(object):
return inner_d return inner_d
def count_bytes(data): def count_bytes(data):
self.blob_bytes_uploaded += len(data) uploaded = len(data)
self.peer.update_stats('blob_bytes_uploaded', len(data)) self.blob_bytes_uploaded += uploaded
self.peer.update_stats('blob_bytes_uploaded', uploaded)
self.track.add_observation(analytics.BLOB_BYTES_UPLOADED, uploaded)
return data return data
def start_transfer(): def start_transfer():

View file

@ -9,9 +9,10 @@ log = logging.getLogger(__name__)
class ServerRequestHandler(object): class ServerRequestHandler(object):
"""This class handles requests from clients. It can upload blobs and return request for information about """This class handles requests from clients. It can upload blobs and
more blobs that are associated with streams""" return request for information about more blobs that are
associated with streams.
"""
implements(interfaces.IPushProducer, interfaces.IConsumer, IRequestHandler) implements(interfaces.IPushProducer, interfaces.IConsumer, IRequestHandler)
def __init__(self, consumer): def __init__(self, consumer):
@ -90,20 +91,27 @@ class ServerRequestHandler(object):
log.debug("Received data") log.debug("Received data")
log.debug("%s", str(data)) log.debug("%s", str(data))
if self.request_received is False: if self.request_received is False:
self.request_buff = self.request_buff + data return self._parse_data_and_maybe_send_blob(data)
msg = self.try_to_parse_request(self.request_buff)
if msg is not None:
self.request_buff = ''
d = self.handle_request(msg)
if self.blob_sender is not None:
d.addCallback(lambda _: self.blob_sender.send_blob_if_requested(self))
d.addCallbacks(lambda _: self.finished_response(), self.request_failure_handler)
else:
log.debug("Request buff not a valid json message")
log.debug("Request buff: %s", str(self.request_buff))
else: else:
log.warning("The client sent data when we were uploading a file. This should not happen") log.warning("The client sent data when we were uploading a file. This should not happen")
def _parse_data_and_maybe_send_blob(self, data):
self.request_buff = self.request_buff + data
msg = self.try_to_parse_request(self.request_buff)
if msg:
self.request_buff = ''
self._process_msg(msg)
else:
log.debug("Request buff not a valid json message")
log.debug("Request buff: %s", self.request_buff)
def _process_msg(self, msg):
d = self.handle_request(msg)
if self.blob_sender:
d.addCallback(lambda _: self.blob_sender.send_blob_if_requested(self))
d.addCallbacks(lambda _: self.finished_response(), self.request_failure_handler)
######### IRequestHandler ######### ######### IRequestHandler #########
def register_query_handler(self, query_handler, query_identifiers): def register_query_handler(self, query_handler, query_identifiers):

View file

@ -11,12 +11,30 @@ import yaml
from lbrynet.core.cryptoutils import get_lbry_hash_obj from lbrynet.core.cryptoutils import get_lbry_hash_obj
blobhash_length = get_lbry_hash_obj().digest_size * 2 # digest_size is in bytes, and blob hashes are hex encoded # digest_size is in bytes, and blob hashes are hex encoded
blobhash_length = get_lbry_hash_obj().digest_size * 2
log = logging.getLogger(__name__) log = logging.getLogger(__name__)
# defining these time functions here allows for easier overriding in testing
def now():
return datetime.datetime.now()
def utcnow():
return datetime.datetime.utcnow()
def isonow():
"""Return utc now in isoformat with timezone"""
return utcnow().isoformat() + 'Z'
def today():
return datetime.datetime.today()
def generate_id(num=None): def generate_id(num=None):
h = get_lbry_hash_obj() h = get_lbry_hash_obj()
if num is not None: if num is not None:
@ -26,18 +44,19 @@ def generate_id(num=None):
return h.digest() return h.digest()
def is_valid_hashcharacter(char):
return char in "0123456789abcdef"
def is_valid_blobhash(blobhash): def is_valid_blobhash(blobhash):
""" """Checks whether the blobhash is the correct length and contains only
valid characters (0-9, a-f)
@param blobhash: string, the blobhash to check @param blobhash: string, the blobhash to check
@return: Whether the blobhash is the correct length and contains only valid characters (0-9, a-f) @return: True/False
""" """
if len(blobhash) != blobhash_length: return len(blobhash) == blobhash_length and all(is_valid_hashcharacter(l) for l in blobhash)
return False
for l in blobhash:
if l not in "0123456789abcdef":
return False
return True
def version_is_greater_than(a, b): def version_is_greater_than(a, b):
@ -86,10 +105,6 @@ def save_settings(path, settings):
f.close() f.close()
def today():
return datetime.datetime.today()
def check_connection(server="www.lbry.io", port=80): def check_connection(server="www.lbry.io", port=80):
"""Attempts to open a socket to server:port and returns True if successful.""" """Attempts to open a socket to server:port and returns True if successful."""
try: try:

View file

@ -1,3 +1,5 @@
# TODO: THERE IS A LOT OF CODE IN THIS MODULE THAT SHOULD BE REMOVED
# AS IT IS REPEATED IN THE LBRYDaemon MODULE
import logging import logging
import os.path import os.path
import argparse import argparse
@ -12,6 +14,7 @@ from yapsy.PluginManager import PluginManager
from twisted.internet import defer, threads, stdio, task, error from twisted.internet import defer, threads, stdio, task, error
from jsonrpc.proxy import JSONRPCProxy from jsonrpc.proxy import JSONRPCProxy
from lbrynet import analytics
from lbrynet.core.Session import Session from lbrynet.core.Session import Session
from lbrynet.lbrynet_console.ConsoleControl import ConsoleControl from lbrynet.lbrynet_console.ConsoleControl import ConsoleControl
from lbrynet.lbrynet_console.Settings import Settings from lbrynet.lbrynet_console.Settings import Settings
@ -366,11 +369,13 @@ class Console():
] ]
def get_blob_request_handler_factory(rate): def get_blob_request_handler_factory(rate):
self.blob_request_payment_rate_manager = PaymentRateManager(self.session.base_payment_rate_manager, self.blob_request_payment_rate_manager = PaymentRateManager(
rate) self.session.base_payment_rate_manager, rate
handlers.append(BlobRequestHandlerFactory(self.session.blob_manager, )
self.session.wallet, handlers.append(BlobRequestHandlerFactory(
self.blob_request_payment_rate_manager)) self.session.blob_manager, self.session.wallet,
self.blob_request_payment_rate_manager, analytics.Track()
))
d1 = self.settings.get_server_data_payment_rate() d1 = self.settings.get_server_data_payment_rate()
d1.addCallback(get_blob_request_handler_factory) d1.addCallback(get_blob_request_handler_factory)

View file

@ -25,8 +25,10 @@ from txjsonrpc.web.jsonrpc import Handler
from jsonschema import ValidationError from jsonschema import ValidationError
from lbrynet import __version__ as lbrynet_version from lbrynet import __version__ as lbrynet_version
# TODO: importing this when internet is disabled raises a socket.gaierror
from lbryum.version import LBRYUM_VERSION as lbryum_version from lbryum.version import LBRYUM_VERSION as lbryum_version
from lbrynet import analytics from lbrynet import analytics
from lbrynet.core.looping_call_manager import LoopingCallManager
from lbrynet.core.server.BlobRequestHandler import BlobRequestHandlerFactory from lbrynet.core.server.BlobRequestHandler import BlobRequestHandlerFactory
from lbrynet.core.server.ServerProtocol import ServerProtocolFactory from lbrynet.core.server.ServerProtocol import ServerProtocolFactory
from lbrynet.core.Error import UnknownNameError, InsufficientFundsError, InvalidNameError from lbrynet.core.Error import UnknownNameError, InsufficientFundsError, InvalidNameError
@ -125,16 +127,104 @@ BAD_REQUEST = 400
NOT_FOUND = 404 NOT_FOUND = 404
OK_CODE = 200 OK_CODE = 200
class Checker:
"""The looping calls the daemon runs"""
INTERNET_CONNECTION = 'internet_connection_checker'
VERSION = 'version_checker'
CONNECTION_PROBLEM = 'connection_problem_checker'
PENDING_CLAIM = 'pending_claim_checker'
class FileID:
"""The different ways a file can be identified"""
NAME = 'name'
SD_HASH = 'sd_hash'
FILE_NAME = 'file_name'
# TODO add login credentials in a conf file # TODO add login credentials in a conf file
# TODO alert if your copy of a lbry file is out of date with the name record # TODO alert if your copy of a lbry file is out of date with the name record
class NoValidSearch(Exception):
pass
class Parameters(object): class Parameters(object):
def __init__(self, **kwargs): def __init__(self, **kwargs):
self.__dict__.update(kwargs) self.__dict__.update(kwargs)
class CheckInternetConnection(object):
def __init__(self, daemon):
self.daemon = daemon
def __call__(self):
self.daemon.connected_to_internet = utils.check_connection()
class CheckRemoteVersions(object):
def __init__(self, daemon):
self.daemon = daemon
def __call__(self):
d = self._get_lbrynet_version()
d.addCallback(lambda _: self._get_lbryum_version())
def _get_lbryum_version(self):
try:
version = get_lbryum_version_from_github()
log.info(
"remote lbryum %s > local lbryum %s = %s",
version, lbryum_version,
utils.version_is_greater_than(version, lbryum_version)
)
self.daemon.git_lbryum_version = version
return defer.succeed(None)
except Exception:
log.info("Failed to get lbryum version from git")
self.daemon.git_lbryum_version = None
return defer.fail(None)
def _get_lbrynet_version(self):
try:
version = get_lbrynet_version_from_github()
log.info(
"remote lbrynet %s > local lbrynet %s = %s",
version, lbrynet_version,
utils.version_is_greater_than(version, lbrynet_version)
)
self.daemon.git_lbrynet_version = version
return defer.succeed(None)
except Exception:
log.info("Failed to get lbrynet version from git")
self.daemon.git_lbrynet_version = None
return defer.fail(None)
class AlwaysSend(object):
def __init__(self, value_generator, *args, **kwargs):
self.value_generator = value_generator
self.args = args
self.kwargs = kwargs
def __call__(self):
d = defer.maybeDeferred(self.value_generator, *self.args, **self.kwargs)
d.addCallback(lambda v: (True, v))
return d
def calculate_available_blob_size(blob_manager):
d = blob_manager.get_all_verified_blobs()
d.addCallback(
lambda blobs: defer.DeferredList([blob_manager.get_blob_length(b) for b in blobs]))
d.addCallback(lambda blob_lengths: sum(val for success, val in blob_lengths if success))
return d
class Daemon(jsonrpc.JSONRPC): class Daemon(jsonrpc.JSONRPC):
""" """
LBRYnet daemon, a jsonrpc interface to lbry functions LBRYnet daemon, a jsonrpc interface to lbry functions
@ -342,12 +432,13 @@ class Daemon(jsonrpc.JSONRPC):
self.wallet_user = None self.wallet_user = None
self.wallet_password = None self.wallet_password = None
self.internet_connection_checker = LoopingCall(self._check_network_connection) calls = {
self.version_checker = LoopingCall(self._check_remote_versions) Checker.INTERNET_CONNECTION: LoopingCall(CheckInternetConnection(self)),
self.connection_problem_checker = LoopingCall(self._check_connection_problems) Checker.VERSION: LoopingCall(CheckRemoteVersions(self)),
self.pending_claim_checker = LoopingCall(self._check_pending_claims) Checker.CONNECTION_PROBLEM: LoopingCall(self._check_connection_problems),
self.send_heartbeat = LoopingCall(self._send_heartbeat) Checker.PENDING_CLAIM: LoopingCall(self._check_pending_claims),
# self.lbrynet_connection_checker = LoopingCall(self._check_lbrynet_connection) }
self.looping_call_manager = LoopingCallManager(calls)
self.sd_identifier = StreamDescriptorIdentifier() self.sd_identifier = StreamDescriptorIdentifier()
self.stream_info_manager = TempEncryptedFileMetadataManager() self.stream_info_manager = TempEncryptedFileMetadataManager()
@ -519,9 +610,9 @@ class Daemon(jsonrpc.JSONRPC):
log.info("Starting lbrynet-daemon") log.info("Starting lbrynet-daemon")
self.internet_connection_checker.start(3600) self.looping_call_manager.start(Checker.INTERNET_CONNECTION, 3600)
self.version_checker.start(3600 * 12) self.looping_call_manager.start(Checker.VERSION, 3600 * 12)
self.connection_problem_checker.start(1) self.looping_call_manager.start(Checker.CONNECTION_PROBLEM, 1)
self.exchange_rate_manager.start() self.exchange_rate_manager.start()
if host_ui: if host_ui:
@ -536,8 +627,8 @@ class Daemon(jsonrpc.JSONRPC):
d.addCallback(lambda _: threads.deferToThread(self._setup_data_directory)) d.addCallback(lambda _: threads.deferToThread(self._setup_data_directory))
d.addCallback(lambda _: self._check_db_migration()) d.addCallback(lambda _: self._check_db_migration())
d.addCallback(lambda _: self._get_settings()) d.addCallback(lambda _: self._get_settings())
d.addCallback(lambda _: self._set_events())
d.addCallback(lambda _: self._get_session()) d.addCallback(lambda _: self._get_session())
d.addCallback(lambda _: self._get_analytics())
d.addCallback(lambda _: add_lbry_file_to_sd_identifier(self.sd_identifier)) d.addCallback(lambda _: add_lbry_file_to_sd_identifier(self.sd_identifier))
d.addCallback(lambda _: self._setup_stream_identifier()) d.addCallback(lambda _: self._setup_stream_identifier())
d.addCallback(lambda _: self._setup_lbry_file_manager()) d.addCallback(lambda _: self._setup_lbry_file_manager())
@ -545,23 +636,11 @@ class Daemon(jsonrpc.JSONRPC):
d.addCallback(lambda _: self._setup_server()) d.addCallback(lambda _: self._setup_server())
d.addCallback(lambda _: _log_starting_vals()) d.addCallback(lambda _: _log_starting_vals())
d.addCallback(lambda _: _announce_startup()) d.addCallback(lambda _: _announce_startup())
d.addCallback(lambda _: self._load_analytics_api())
# TODO: handle errors here # TODO: handle errors here
d.callback(None) d.callback(None)
return defer.succeed(None) return defer.succeed(None)
def _load_analytics_api(self):
self.analytics_api = analytics.Api.load()
self.send_heartbeat.start(60)
def _send_heartbeat(self):
heartbeat = self._events.heartbeat()
self.analytics_api.track(heartbeat)
def _send_download_started(self, name, stream_info=None):
event = self._events.download_started(name, stream_info)
self.analytics_api.track(event)
def _get_platform(self): def _get_platform(self):
r = { r = {
@ -591,13 +670,6 @@ class Daemon(jsonrpc.JSONRPC):
d = _log_platform() d = _log_platform()
return d return d
def _set_events(self):
context = analytics.make_context(self._get_platform(), self.wallet_type)
self._events = analytics.Events(context, base58.b58encode(self.lbryid), self._session_id)
def _check_network_connection(self):
self.connected_to_internet = utils.check_connection()
def _check_lbrynet_connection(self): def _check_lbrynet_connection(self):
def _log_success(): def _log_success():
log.info("lbrynet connectivity test passed") log.info("lbrynet connectivity test passed")
@ -608,43 +680,6 @@ class Daemon(jsonrpc.JSONRPC):
d = download_sd_blob(self.session, wonderfullife_sh, self.session.base_payment_rate_manager) d = download_sd_blob(self.session, wonderfullife_sh, self.session.base_payment_rate_manager)
d.addCallbacks(lambda _: _log_success, lambda _: _log_failure) d.addCallbacks(lambda _: _log_success, lambda _: _log_failure)
def _check_remote_versions(self):
def _get_lbryum_version():
try:
r = urlopen("https://raw.githubusercontent.com/lbryio/lbryum/master/lib/version.py").read().split('\n')
version = next(line.split("=")[1].split("#")[0].replace(" ", "")
for line in r if "LBRYUM_VERSION" in line)
version = version.replace("'", "")
log.info(
"remote lbryum %s > local lbryum %s = %s",
version, lbryum_version,
utils.version_is_greater_than(version, lbryum_version)
)
self.git_lbryum_version = version
return defer.succeed(None)
except Exception:
log.info("Failed to get lbryum version from git")
self.git_lbryum_version = None
return defer.fail(None)
def _get_lbrynet_version():
try:
version = get_lbrynet_version_from_github()
log.info(
"remote lbrynet %s > local lbrynet %s = %s",
version, lbrynet_version,
utils.version_is_greater_than(version, lbrynet_version)
)
self.git_lbrynet_version = version
return defer.succeed(None)
except Exception:
log.info("Failed to get lbrynet version from git")
self.git_lbrynet_version = None
return defer.fail(None)
d = _get_lbrynet_version()
d.addCallback(lambda _: _get_lbryum_version())
def _check_connection_problems(self): def _check_connection_problems(self):
if not self.git_lbrynet_version or not self.git_lbryum_version: if not self.git_lbrynet_version or not self.git_lbryum_version:
self.connection_problem = CONNECTION_PROBLEM_CODES[0] self.connection_problem = CONNECTION_PROBLEM_CODES[0]
@ -671,7 +706,7 @@ class Daemon(jsonrpc.JSONRPC):
def _get_and_start_file(name): def _get_and_start_file(name):
d = defer.succeed(self.pending_claims.pop(name)) d = defer.succeed(self.pending_claims.pop(name))
d.addCallback(lambda _: self._get_lbry_file("name", name, return_json=False)) d.addCallback(lambda _: self._get_lbry_file(FileID.NAME, name, return_json=False))
d.addCallback(lambda l: _start_file(l) if l.stopped else "LBRY file was already running") d.addCallback(lambda l: _start_file(l) if l.stopped else "LBRY file was already running")
def re_add_to_pending_claims(name): def re_add_to_pending_claims(name):
@ -765,8 +800,12 @@ class Daemon(jsonrpc.JSONRPC):
def _setup_query_handlers(self): def _setup_query_handlers(self):
handlers = [ handlers = [
BlobRequestHandlerFactory(self.session.blob_manager, self.session.wallet, BlobRequestHandlerFactory(
self.session.payment_rate_manager), self.session.blob_manager,
self.session.wallet,
self.session.payment_rate_manager,
self.analytics_manager.track
),
self.session.wallet.get_wallet_info_query_handler_factory(), self.session.wallet.get_wallet_info_query_handler_factory(),
] ]
@ -830,18 +869,10 @@ class Daemon(jsonrpc.JSONRPC):
def _shutdown(self): def _shutdown(self):
log.info("Closing lbrynet session") log.info("Closing lbrynet session")
log.info("Status at time of shutdown: " + self.startup_status[0]) log.info("Status at time of shutdown: " + self.startup_status[0])
if self.internet_connection_checker.running: self.looping_call_manager.shutdown()
self.internet_connection_checker.stop() self.analytics_manager.shutdown()
if self.version_checker.running:
self.version_checker.stop()
if self.connection_problem_checker.running:
self.connection_problem_checker.stop()
if self.lbry_ui_manager.update_checker.running: if self.lbry_ui_manager.update_checker.running:
self.lbry_ui_manager.update_checker.stop() self.lbry_ui_manager.update_checker.stop()
if self.pending_claim_checker.running:
self.pending_claim_checker.stop()
if self.send_heartbeat.running:
self.send_heartbeat.stop()
self._clean_up_temp_files() self._clean_up_temp_files()
@ -1020,6 +1051,20 @@ class Daemon(jsonrpc.JSONRPC):
return d return d
def _get_analytics(self):
analytics_api = analytics.Api.load()
context = analytics.make_context(self._get_platform(), self.wallet_type)
events_generator = analytics.Events(
context, base58.b58encode(self.lbryid), self._session_id)
self.analytics_manager = analytics.Manager(
analytics_api, events_generator, analytics.Track())
self.analytics_manager.start()
self.analytics_manager.register_repeating_metric(
analytics.BLOB_BYTES_AVAILABLE,
AlwaysSend(calculate_available_blob_size, self.session.blob_manager),
frequency=300
)
def _get_session(self): def _get_session(self):
def get_default_data_rate(): def get_default_data_rate():
d = self.settings.get_default_data_payment_rate() d = self.settings.get_default_data_payment_rate()
@ -1109,7 +1154,7 @@ class Daemon(jsonrpc.JSONRPC):
Add a lbry file to the file manager, start the download, and return the new lbry file. Add a lbry file to the file manager, start the download, and return the new lbry file.
If it already exists in the file manager, return the existing lbry file If it already exists in the file manager, return the existing lbry file
""" """
self._send_download_started(name) self.analytics_manager.send_download_started(name, stream_info)
helper = _DownloadNameHelper( helper = _DownloadNameHelper(
self, name, timeout, download_directory, file_name, wait_for_write) self, name, timeout, download_directory, file_name, wait_for_write)
@ -1239,108 +1284,13 @@ class Daemon(jsonrpc.JSONRPC):
return defer.succeed(None) return defer.succeed(None)
def _get_lbry_file(self, search_by, val, return_json=True): def _get_lbry_file(self, search_by, val, return_json=True):
def _log_get_lbry_file(f): return _GetFileHelper(self, search_by, val, return_json).retrieve_file()
if f and val:
log.info("Found LBRY file for " + search_by + ": " + val)
elif val:
log.info("Did not find LBRY file for " + search_by + ": " + val)
return f
def _get_json_for_return(f):
def _get_file_status(file_status):
message = STREAM_STAGES[2][1] % (file_status.name, file_status.num_completed, file_status.num_known, file_status.running_status)
return defer.succeed(message)
def _generate_reply(size):
if f.key:
key = binascii.b2a_hex(f.key)
else:
key = None
if os.path.isfile(os.path.join(self.download_directory, f.file_name)):
written_file = file(os.path.join(self.download_directory, f.file_name))
written_file.seek(0, os.SEEK_END)
written_bytes = written_file.tell()
written_file.close()
else:
written_bytes = False
if search_by == "name":
if val in self.streams.keys():
status = self.streams[val].code
elif f in self.lbry_file_manager.lbry_files:
# if f.stopped:
# status = STREAM_STAGES[3]
# else:
status = STREAM_STAGES[2]
else:
status = [False, False]
else:
status = [False, False]
if status[0] == DOWNLOAD_RUNNING_CODE:
d = f.status()
d.addCallback(_get_file_status)
d.addCallback(lambda message: {'completed': f.completed, 'file_name': f.file_name,
'download_directory': f.download_directory,
'download_path': os.path.join(f.download_directory, f.file_name),
'mime_type': mimetypes.guess_type(os.path.join(f.download_directory, f.file_name))[0],
'key': key,
'points_paid': f.points_paid, 'stopped': f.stopped,
'stream_hash': f.stream_hash,
'stream_name': f.stream_name,
'suggested_file_name': f.suggested_file_name,
'upload_allowed': f.upload_allowed, 'sd_hash': f.sd_hash,
'lbry_uri': f.uri, 'txid': f.txid, 'claim_id': f.claim_id,
'total_bytes': size,
'written_bytes': written_bytes, 'code': status[0],
'message': message})
else:
d = defer.succeed({'completed': f.completed, 'file_name': f.file_name, 'key': key,
'download_directory': f.download_directory,
'download_path': os.path.join(f.download_directory, f.file_name),
'mime_type': mimetypes.guess_type(os.path.join(f.download_directory, f.file_name))[0],
'points_paid': f.points_paid, 'stopped': f.stopped, 'stream_hash': f.stream_hash,
'stream_name': f.stream_name, 'suggested_file_name': f.suggested_file_name,
'upload_allowed': f.upload_allowed, 'sd_hash': f.sd_hash, 'total_bytes': size,
'written_bytes': written_bytes, 'lbry_uri': f.uri, 'txid': f.txid, 'claim_id': f.claim_id,
'code': status[0], 'message': status[1]})
return d
def _add_metadata(message):
def _add_to_dict(metadata):
message['metadata'] = metadata
return defer.succeed(message)
if f.txid:
d = self._resolve_name(f.uri)
d.addCallbacks(_add_to_dict, lambda _: _add_to_dict("Pending confirmation"))
else:
d = defer.succeed(message)
return d
if f:
d = f.get_total_bytes()
d.addCallback(_generate_reply)
d.addCallback(_add_metadata)
return d
else:
return False
if search_by == "name":
d = self._get_lbry_file_by_uri(val)
elif search_by == "sd_hash":
d = self._get_lbry_file_by_sd_hash(val)
elif search_by == "file_name":
d = self._get_lbry_file_by_file_name(val)
# d.addCallback(_log_get_lbry_file)
if return_json:
d.addCallback(_get_json_for_return)
return d
def _get_lbry_files(self): def _get_lbry_files(self):
d = defer.DeferredList([self._get_lbry_file('sd_hash', l.sd_hash) for l in self.lbry_file_manager.lbry_files]) d = defer.DeferredList([
self._get_lbry_file(FileID.SD_HASH, l.sd_hash)
for l in self.lbry_file_manager.lbry_files
])
return d return d
def _reflect(self, lbry_file): def _reflect(self, lbry_file):
@ -1348,7 +1298,7 @@ class Daemon(jsonrpc.JSONRPC):
return defer.fail(Exception("no lbry file given to reflect")) return defer.fail(Exception("no lbry file given to reflect"))
stream_hash = lbry_file.stream_hash stream_hash = lbry_file.stream_hash
if stream_hash is None: if stream_hash is None:
return defer.fail(Exception("no stream hash")) return defer.fail(Exception("no stream hash"))
@ -1690,8 +1640,7 @@ class Daemon(jsonrpc.JSONRPC):
return d return d
def jsonrpc_get_lbry_file(self, p): def jsonrpc_get_lbry_file(self, p):
""" """Get lbry file
Get lbry file
Args: Args:
'name': get file by lbry uri, 'name': get file by lbry uri,
@ -1709,15 +1658,18 @@ class Daemon(jsonrpc.JSONRPC):
'upload_allowed': bool 'upload_allowed': bool
'sd_hash': string 'sd_hash': string
""" """
d = self._get_deferred_for_lbry_file(p)
if p.keys()[0] in ['name', 'sd_hash', 'file_name']:
search_type = p.keys()[0]
d = self._get_lbry_file(search_type, p[search_type])
else:
d = defer.fail()
d.addCallback(lambda r: self._render_response(r, OK_CODE)) d.addCallback(lambda r: self._render_response(r, OK_CODE))
return d return d
def _get_deferred_for_lbry_file(self, p):
try:
searchtype, value = get_lbry_file_search_value(p)
except NoValidSearch:
return defer.fail()
else:
return self._get_lbry_file(searchtype, value)
def jsonrpc_resolve_name(self, p): def jsonrpc_resolve_name(self, p):
""" """
Resolve stream info from a LBRY uri Resolve stream info from a LBRY uri
@ -1730,9 +1682,8 @@ class Daemon(jsonrpc.JSONRPC):
force = p.get('force', False) force = p.get('force', False)
if 'name' in p: name = p.get(FileID.NAME)
name = p['name'] if not name:
else:
return self._render_response(None, BAD_REQUEST) return self._render_response(None, BAD_REQUEST)
d = self._resolve_name(name, force_refresh=force) d = self._resolve_name(name, force_refresh=force)
@ -1749,7 +1700,7 @@ class Daemon(jsonrpc.JSONRPC):
claim info, False if no such claim exists claim info, False if no such claim exists
""" """
name = p['name'] name = p[FileID.NAME]
d = self.session.wallet.get_my_claim(name) d = self.session.wallet.get_my_claim(name)
d.addCallback(lambda r: self._render_response(r, OK_CODE)) d.addCallback(lambda r: self._render_response(r, OK_CODE))
return d return d
@ -1771,7 +1722,7 @@ class Daemon(jsonrpc.JSONRPC):
r['amount'] = float(r['amount']) / 10**8 r['amount'] = float(r['amount']) / 10**8
return r return r
name = p['name'] name = p[FileID.NAME]
txid = p.get('txid', None) txid = p.get('txid', None)
d = self.session.wallet.get_claim_info(name, txid) d = self.session.wallet.get_claim_info(name, txid)
d.addCallback(_convert_amount_to_float) d.addCallback(_convert_amount_to_float)
@ -1784,11 +1735,11 @@ class Daemon(jsonrpc.JSONRPC):
# can spec what parameters it expects and how to set default values # can spec what parameters it expects and how to set default values
timeout = p.get('timeout', self.download_timeout) timeout = p.get('timeout', self.download_timeout)
download_directory = p.get('download_directory', self.download_directory) download_directory = p.get('download_directory', self.download_directory)
file_name = p.get('file_name') file_name = p.get(FileID.FILE_NAME)
stream_info = p.get('stream_info') stream_info = p.get('stream_info')
sd_hash = get_sd_hash(stream_info) sd_hash = get_sd_hash(stream_info)
wait_for_write = p.get('wait_for_write', True) wait_for_write = p.get('wait_for_write', True)
name = p.get('name') name = p.get(FileID.NAME)
return Parameters( return Parameters(
timeout=timeout, timeout=timeout,
download_directory=download_directory, download_directory=download_directory,
@ -1842,14 +1793,20 @@ class Daemon(jsonrpc.JSONRPC):
""" """
def _stop_file(f): def _stop_file(f):
d = self.lbry_file_manager.toggle_lbry_file_running(f) if f.stopped:
d.addCallback(lambda _: "Stopped LBRY file") return "LBRY file wasn't running"
return d else:
d = self.lbry_file_manager.toggle_lbry_file_running(f)
d.addCallback(lambda _: "Stopped LBRY file")
return d
if p.keys()[0] in ['name', 'sd_hash', 'file_name']: try:
search_type = p.keys()[0] searchtype, value = get_lbry_file_search_value(p)
d = self._get_lbry_file(search_type, p[search_type], return_json=False) except NoValidSearch:
d.addCallback(lambda l: _stop_file(l) if not l.stopped else "LBRY file wasn't running") d = defer.fail()
else:
d = self._get_lbry_file(searchtype, value, return_json=False)
d.addCallback(_stop_file)
d.addCallback(lambda r: self._render_response(r, OK_CODE)) d.addCallback(lambda r: self._render_response(r, OK_CODE))
return d return d
@ -1867,13 +1824,19 @@ class Daemon(jsonrpc.JSONRPC):
""" """
def _start_file(f): def _start_file(f):
d = self.lbry_file_manager.toggle_lbry_file_running(f) if f.stopped:
return defer.succeed("Started LBRY file") d = self.lbry_file_manager.toggle_lbry_file_running(f)
return defer.succeed("Started LBRY file")
else:
return "LBRY file was already running"
if p.keys()[0] in ['name', 'sd_hash', 'file_name']: try:
search_type = p.keys()[0] searchtype, value = get_lbry_file_search_value(p)
d = self._get_lbry_file(search_type, p[search_type], return_json=False) except NoValidSearch:
d.addCallback(lambda l: _start_file(l) if l.stopped else "LBRY file was already running") d = defer.fail()
else:
d = self._get_lbry_file(searchtype, value, return_json=False)
d.addCallback(_start_file)
d.addCallback(lambda r: self._render_response(r, OK_CODE)) d.addCallback(lambda r: self._render_response(r, OK_CODE))
return d return d
@ -1888,7 +1851,7 @@ class Daemon(jsonrpc.JSONRPC):
estimated cost estimated cost
""" """
name = p['name'] name = p[FileID.NAME]
d = self._get_est_cost(name) d = self._get_est_cost(name)
d.addCallback(lambda r: self._render_response(r, OK_CODE)) d.addCallback(lambda r: self._render_response(r, OK_CODE))
@ -1940,21 +1903,23 @@ class Daemon(jsonrpc.JSONRPC):
confirmation message confirmation message
""" """
if 'delete_target_file' in p.keys(): delete_file = p.get('delete_target_file', True)
delete_file = p['delete_target_file']
else:
delete_file = True
def _delete_file(f): def _delete_file(f):
if not f:
return False
file_name = f.file_name file_name = f.file_name
d = self._delete_lbry_file(f, delete_file=delete_file) d = self._delete_lbry_file(f, delete_file=delete_file)
d.addCallback(lambda _: "Deleted LBRY file" + file_name) d.addCallback(lambda _: "Deleted LBRY file" + file_name)
return d return d
if 'name' in p.keys() or 'sd_hash' in p.keys() or 'file_name' in p.keys(): try:
search_type = [k for k in p.keys() if k != 'delete_target_file'][0] searchtype, value = get_lbry_file_search_value(p)
d = self._get_lbry_file(search_type, p[search_type], return_json=False) except NoValidSearch:
d.addCallback(lambda l: _delete_file(l) if l else False) d = defer.fail()
else:
d = self._get_lbry_file(searchtype, value, return_json=False)
d.addCallback(_delete_file)
d.addCallback(lambda r: self._render_response(r, OK_CODE)) d.addCallback(lambda r: self._render_response(r, OK_CODE))
return d return d
@ -1979,12 +1944,12 @@ class Daemon(jsonrpc.JSONRPC):
return m return m
def _reflect_if_possible(sd_hash, txid): def _reflect_if_possible(sd_hash, txid):
d = self._get_lbry_file('sd_hash', sd_hash, return_json=False) d = self._get_lbry_file(FileID.SD_HASH, sd_hash, return_json=False)
d.addCallback(self._reflect) d.addCallback(self._reflect)
d.addCallback(lambda _: txid) d.addCallback(lambda _: txid)
return d return d
name = p['name'] name = p[FileID.NAME]
log.info("Publish: ") log.info("Publish: ")
log.info(p) log.info(p)
@ -2012,8 +1977,7 @@ class Daemon(jsonrpc.JSONRPC):
if not os.path.isfile(file_path): if not os.path.isfile(file_path):
return defer.fail(Exception("Specified file for publish doesnt exist: %s" % file_path)) return defer.fail(Exception("Specified file for publish doesnt exist: %s" % file_path))
if not self.pending_claim_checker.running: self.looping_call_manager.start(Checker.PENDING_CLAIM, 30)
self.pending_claim_checker.start(30)
d = self._resolve_name(name, force_refresh=True) d = self._resolve_name(name, force_refresh=True)
d.addErrback(lambda _: None) d.addErrback(lambda _: None)
@ -2091,7 +2055,7 @@ class Daemon(jsonrpc.JSONRPC):
txid txid
""" """
name = p['name'] name = p[FileID.NAME]
claim_id = p['claim_id'] claim_id = p['claim_id']
amount = p['amount'] amount = p['amount']
d = self.session.wallet.support_claim(name, claim_id, amount) d = self.session.wallet.support_claim(name, claim_id, amount)
@ -2131,7 +2095,7 @@ class Daemon(jsonrpc.JSONRPC):
list of name claims list of name claims
""" """
name = p['name'] name = p[FileID.NAME]
d = self.session.wallet.get_claims_for_name(name) d = self.session.wallet.get_claims_for_name(name)
d.addCallback(lambda r: self._render_response(r, OK_CODE)) d.addCallback(lambda r: self._render_response(r, OK_CODE))
return d return d
@ -2328,7 +2292,7 @@ class Daemon(jsonrpc.JSONRPC):
Returns Returns
sd blob, dict sd blob, dict
""" """
sd_hash = p['sd_hash'] sd_hash = p[FileID.SD_HASH]
timeout = p.get('timeout', DEFAULT_SD_DOWNLOAD_TIMEOUT) timeout = p.get('timeout', DEFAULT_SD_DOWNLOAD_TIMEOUT)
d = self._download_sd_blob(sd_hash, timeout) d = self._download_sd_blob(sd_hash, timeout)
@ -2522,8 +2486,8 @@ class Daemon(jsonrpc.JSONRPC):
True or traceback True or traceback
""" """
sd_hash = p['sd_hash'] sd_hash = p[FileID.SD_HASH]
d = self._get_lbry_file('sd_hash', sd_hash, return_json=False) d = self._get_lbry_file(FileID.SD_HASH, sd_hash, return_json=False)
d.addCallback(self._reflect) d.addCallback(self._reflect)
d.addCallbacks(lambda _: self._render_response(True, OK_CODE), lambda err: self._render_response(err.getTraceback(), OK_CODE)) d.addCallbacks(lambda _: self._render_response(True, OK_CODE), lambda err: self._render_response(err.getTraceback(), OK_CODE))
return d return d
@ -2604,7 +2568,7 @@ class Daemon(jsonrpc.JSONRPC):
else: else:
return 0.0 return 0.0
name = p['name'] name = p[FileID.NAME]
d = self._resolve_name(name, force_refresh=True) d = self._resolve_name(name, force_refresh=True)
d.addCallback(get_sd_hash) d.addCallback(get_sd_hash)
@ -2618,6 +2582,14 @@ class Daemon(jsonrpc.JSONRPC):
return d return d
def get_lbryum_version_from_github():
r = urlopen("https://raw.githubusercontent.com/lbryio/lbryum/master/lib/version.py").read().split('\n')
version = next(line.split("=")[1].split("#")[0].replace(" ", "")
for line in r if "LBRYUM_VERSION" in line)
version = version.replace("'", "")
return version
def get_lbrynet_version_from_github(): def get_lbrynet_version_from_github():
"""Return the latest released version from github.""" """Return the latest released version from github."""
response = requests.get('https://api.github.com/repos/lbryio/lbry/releases/latest') response = requests.get('https://api.github.com/repos/lbryio/lbry/releases/latest')
@ -2834,3 +2806,130 @@ class _ResolveNameHelper(object):
def is_cached_name_expired(self): def is_cached_name_expired(self):
time_in_cache = self.now() - self.name_data['timestamp'] time_in_cache = self.now() - self.name_data['timestamp']
return time_in_cache >= self.daemon.cache_time return time_in_cache >= self.daemon.cache_time
class _GetFileHelper(object):
def __init__(self, daemon, search_by, val, return_json=True):
self.daemon = daemon
self.search_by = search_by
self.val = val
self.return_json = return_json
def retrieve_file(self):
d = self.search_for_file()
if self.return_json:
d.addCallback(self._get_json)
return d
def search_for_file(self):
if self.search_by == FileID.NAME:
return self.daemon._get_lbry_file_by_uri(self.val)
elif self.search_by == FileID.SD_HASH:
return self.daemon._get_lbry_file_by_sd_hash(self.val)
elif self.search_by == FileID.FILE_NAME:
return self.daemon._get_lbry_file_by_file_name(self.val)
raise Exception('{} is not a valid search operation'.format(self.search_by))
def _get_json(self, lbry_file):
if lbry_file:
d = lbry_file.get_total_bytes()
d.addCallback(self._generate_reply, lbry_file)
d.addCallback(self._add_metadata, lbry_file)
return d
else:
return False
def _generate_reply(self, size, lbry_file):
written_bytes = self._get_written_bytes(lbry_file)
code, message = self._get_status(lbry_file)
if code == DOWNLOAD_RUNNING_CODE:
d = lbry_file.status()
d.addCallback(self._get_msg_for_file_status)
d.addCallback(
lambda msg: self._get_properties_dict(lbry_file, code, msg, written_bytes, size))
else:
d = defer.succeed(
self._get_properties_dict(lbry_file, code, message, written_bytes, size))
return d
def _get_msg_for_file_status(self, file_status):
message = STREAM_STAGES[2][1] % (
file_status.name, file_status.num_completed, file_status.num_known,
file_status.running_status)
return defer.succeed(message)
def _get_key(self, lbry_file):
return binascii.b2a_hex(lbry_file.key) if lbry_file.key else None
def _full_path(self, lbry_file):
return os.path.join(lbry_file.download_directory, lbry_file.file_name)
def _get_status(self, lbry_file):
if self.search_by == FileID.NAME:
if self.val in self.daemon.streams.keys():
status = self.daemon.streams[self.val].code
elif lbry_file in self.daemon.lbry_file_manager.lbry_files:
status = STREAM_STAGES[2]
else:
status = [False, False]
else:
status = [False, False]
return status
def _get_written_bytes(self, lbry_file):
full_path = self._full_path(lbry_file)
if os.path.isfile(full_path):
with open(full_path) as written_file:
written_file.seek(0, os.SEEK_END)
written_bytes = written_file.tell()
else:
written_bytes = False
return written_bytes
def _get_properties_dict(self, lbry_file, code, message, written_bytes, size):
key = self._get_key(lbry_file)
full_path = self._full_path(lbry_file)
mime_type = mimetypes.guess_type(full_path)[0]
return {
'completed': lbry_file.completed,
'file_name': lbry_file.file_name,
'download_directory': lbry_file.download_directory,
'points_paid': lbry_file.points_paid,
'stopped': lbry_file.stopped,
'stream_hash': lbry_file.stream_hash,
'stream_name': lbry_file.stream_name,
'suggested_file_name': lbry_file.suggested_file_name,
'upload_allowed': lbry_file.upload_allowed,
'sd_hash': lbry_file.sd_hash,
'lbry_uri': lbry_file.uri,
'txid': lbry_file.txid,
'claim_id': lbry_file.claim_id,
'download_path': full_path,
'mime_type': mime_type,
'key': key,
'total_bytes': size,
'written_bytes': written_bytes,
'code': code,
'message': message
}
def _add_metadata(self, message, lbry_file):
def _add_to_dict(metadata):
message['metadata'] = metadata
return defer.succeed(message)
if lbry_file.txid:
d = self.daemon._resolve_name(lbry_file.uri)
d.addCallbacks(_add_to_dict, lambda _: _add_to_dict("Pending confirmation"))
else:
d = defer.succeed(message)
return d
def get_lbry_file_search_value(p):
for searchtype in (FileID.SD_HASH, FileID.NAME, FileID.FILE_NAME):
value = p.get(searchtype)
if value:
return searchtype, value
raise NoValidSearch()

View file

@ -72,12 +72,16 @@ def main():
if meth in api.help(): if meth in api.help():
try: try:
if params: if params:
r = api.call(meth, params) resp = api.call(meth, params)
else: else:
r = api.call(meth) resp = api.call(meth)
print json.dumps(r, sort_keys=True) print json.dumps(resp, sort_keys=True)
except: except Exception:
print "Something went wrong, here's the usage for %s:" % meth # TODO: The api should return proper error codes
# and messages so that they can be passed along to the user
# instead of this generic message.
# https://app.asana.com/0/158602294500137/200173944358192
print "Something went wrong. Here's the usage for {}:".format(meth)
print api.help({'function': meth}) print api.help({'function': meth})
else: else:
print "Unknown function" print "Unknown function"

View file

@ -226,4 +226,4 @@ class DummyExchangeRateManager(object):
'amount': self.convert_currency(fee_in.currency_symbol, "LBC", fee_in.amount), 'amount': self.convert_currency(fee_in.currency_symbol, "LBC", fee_in.amount),
'address': fee_in.address 'address': fee_in.address
} }
}) })

View file

@ -10,6 +10,7 @@ import unittest
from Crypto.PublicKey import RSA from Crypto.PublicKey import RSA
from Crypto import Random from Crypto import Random
from Crypto.Hash import MD5 from Crypto.Hash import MD5
from lbrynet import analytics
from lbrynet.conf import MIN_BLOB_DATA_PAYMENT_RATE from lbrynet.conf import MIN_BLOB_DATA_PAYMENT_RATE
from lbrynet.lbrylive.LiveStreamCreator import FileLiveStreamCreator from lbrynet.lbrylive.LiveStreamCreator import FileLiveStreamCreator
from lbrynet.lbrylive.LiveStreamMetadataManager import DBLiveStreamMetadataManager from lbrynet.lbrylive.LiveStreamMetadataManager import DBLiveStreamMetadataManager
@ -18,6 +19,7 @@ from lbrynet.lbryfile.EncryptedFileMetadataManager import TempEncryptedFileMetad
from lbrynet.lbryfilemanager.EncryptedFileManager import EncryptedFileManager from lbrynet.lbryfilemanager.EncryptedFileManager import EncryptedFileManager
from lbrynet.core.PTCWallet import PointTraderKeyQueryHandlerFactory, PointTraderKeyExchanger from lbrynet.core.PTCWallet import PointTraderKeyQueryHandlerFactory, PointTraderKeyExchanger
from lbrynet.core.Session import Session from lbrynet.core.Session import Session
from lbrynet.core.server.BlobAvailabilityHandler import BlobAvailabilityHandlerFactory
from lbrynet.core.client.StandaloneBlobDownloader import StandaloneBlobDownloader from lbrynet.core.client.StandaloneBlobDownloader import StandaloneBlobDownloader
from lbrynet.core.StreamDescriptor import BlobStreamDescriptorWriter from lbrynet.core.StreamDescriptor import BlobStreamDescriptorWriter
from lbrynet.core.StreamDescriptor import StreamDescriptorIdentifier from lbrynet.core.StreamDescriptor import StreamDescriptorIdentifier
@ -268,8 +270,11 @@ def start_lbry_uploader(sd_hash_queue, kill_event, dead_event, file_size, ul_rat
server_port = None server_port = None
query_handler_factories = { query_handler_factories = {
BlobRequestHandlerFactory(session.blob_manager, session.wallet, BlobAvailabilityHandlerFactory(session.blob_manager): True,
session.payment_rate_manager): True, BlobRequestHandlerFactory(
session.blob_manager, session.wallet,
session.payment_rate_manager,
analytics.Track()): True,
session.wallet.get_wallet_info_query_handler_factory(): True, session.wallet.get_wallet_info_query_handler_factory(): True,
} }
@ -394,8 +399,11 @@ def start_lbry_reuploader(sd_hash, kill_event, dead_event, ready_event, n, ul_ra
server_port = None server_port = None
query_handler_factories = { query_handler_factories = {
BlobRequestHandlerFactory(session.blob_manager, session.wallet, BlobAvailabilityHandlerFactory(session.blob_manager): True,
session.payment_rate_manager): True, BlobRequestHandlerFactory(
session.blob_manager, session.wallet,
session.payment_rate_manager,
analytics.Track()): True,
session.wallet.get_wallet_info_query_handler_factory(): True, session.wallet.get_wallet_info_query_handler_factory(): True,
} }
@ -478,7 +486,8 @@ def start_live_server(sd_hash_queue, kill_event, dead_event):
CryptBlobInfoQueryHandlerFactory(stream_info_manager, session.wallet, CryptBlobInfoQueryHandlerFactory(stream_info_manager, session.wallet,
session.payment_rate_manager): True, session.payment_rate_manager): True,
BlobRequestHandlerFactory(session.blob_manager, session.wallet, BlobRequestHandlerFactory(session.blob_manager, session.wallet,
session.payment_rate_manager): True, session.payment_rate_manager,
analytics.Track()): True,
session.wallet.get_wallet_info_query_handler_factory(): True, session.wallet.get_wallet_info_query_handler_factory(): True,
} }
@ -622,7 +631,10 @@ def start_blob_uploader(blob_hash_queue, kill_event, dead_event, slow, is_genero
server_port = None server_port = None
query_handler_factories = { query_handler_factories = {
BlobRequestHandlerFactory(session.blob_manager, session.wallet, session.payment_rate_manager): True, BlobAvailabilityHandlerFactory(session.blob_manager): True,
BlobRequestHandlerFactory(session.blob_manager, session.wallet,
session.payment_rate_manager,
analytics.Track()): True,
session.wallet.get_wallet_info_query_handler_factory(): True, session.wallet.get_wallet_info_query_handler_factory(): True,
} }

View file

View file

@ -0,0 +1,38 @@
from lbrynet.analytics import events
from twisted.trial import unittest
from tests import util
class EventsTest(unittest.TestCase):
def setUp(self):
util.resetTime(self)
self.event_generator = events.Events('any valid json datatype', 'lbry123', 'session456')
def test_heartbeat(self):
result = self.event_generator.heartbeat()
desired_result = {
'context': 'any valid json datatype',
'event': 'Heartbeat',
'properties': {'lbry_id': 'lbry123', 'session_id': 'session456'},
'timestamp': '2016-01-01T00:00:00Z',
'userId': 'lbry'
}
self.assertEqual(desired_result, result)
def test_download_started(self):
result = self.event_generator.download_started('great gatsby')
desired_result = {
'context': 'any valid json datatype',
'event': 'Download Started',
'properties': {
'lbry_id': 'lbry123',
'session_id': 'session456',
'name': 'great gatsby',
'stream_info': None,
},
'timestamp': '2016-01-01T00:00:00Z',
'userId': 'lbry'
}
self.assertEqual(desired_result, result)

View file

@ -0,0 +1,27 @@
from lbrynet import analytics
from twisted.trial import unittest
class TrackTest(unittest.TestCase):
def test_empty_summarize_is_None(self):
track = analytics.Track()
_, result = track.summarize_and_reset('a')
self.assertEqual(None, result)
def test_can_get_sum_of_metric(self):
track = analytics.Track()
track.add_observation('b', 1)
track.add_observation('b', 2)
_, result = track.summarize_and_reset('b')
self.assertEqual(3, result)
def test_summarize_resets_metric(self):
track = analytics.Track()
track.add_observation('metric', 1)
track.add_observation('metric', 2)
track.summarize_and_reset('metric')
_, result = track.summarize_and_reset('metric')
self.assertEqual(None, result)

View file

@ -5,6 +5,7 @@ from twisted.internet import defer
from twisted.test import proto_helpers from twisted.test import proto_helpers
from twisted.trial import unittest from twisted.trial import unittest
from lbrynet import analytics
from lbrynet.core import Peer from lbrynet.core import Peer
from lbrynet.core.server import BlobRequestHandler from lbrynet.core.server import BlobRequestHandler
from lbrynet.core.PaymentRateManager import NegotiatedPaymentRateManager, BasePaymentRateManager from lbrynet.core.PaymentRateManager import NegotiatedPaymentRateManager, BasePaymentRateManager
@ -14,8 +15,10 @@ from tests.mocks import DummyBlobAvailabilityTracker
class TestBlobRequestHandlerQueries(unittest.TestCase): class TestBlobRequestHandlerQueries(unittest.TestCase):
def setUp(self): def setUp(self):
self.blob_manager = mock.Mock() self.blob_manager = mock.Mock()
self.payment_rate_manager = NegotiatedPaymentRateManager(BasePaymentRateManager(0.001), DummyBlobAvailabilityTracker()) self.payment_rate_manager = NegotiatedPaymentRateManager(
self.handler = BlobRequestHandler.BlobRequestHandler(self.blob_manager, None, self.payment_rate_manager) BasePaymentRateManager(0.001), DummyBlobAvailabilityTracker())
self.handler = BlobRequestHandler.BlobRequestHandler(
self.blob_manager, None, self.payment_rate_manager, None)
def test_empty_response_when_empty_query(self): def test_empty_response_when_empty_query(self):
self.assertEqual({}, self.successResultOf(self.handler.handle_queries({}))) self.assertEqual({}, self.successResultOf(self.handler.handle_queries({})))
@ -107,7 +110,7 @@ class TestBlobRequestHandlerQueries(unittest.TestCase):
class TestBlobRequestHandlerSender(unittest.TestCase): class TestBlobRequestHandlerSender(unittest.TestCase):
def test_nothing_happens_if_not_currently_uploading(self): def test_nothing_happens_if_not_currently_uploading(self):
handler = BlobRequestHandler.BlobRequestHandler(None, None, None) handler = BlobRequestHandler.BlobRequestHandler(None, None, None, None)
handler.currently_uploading = None handler.currently_uploading = None
deferred = handler.send_blob_if_requested(None) deferred = handler.send_blob_if_requested(None)
self.assertEqual(True, self.successResultOf(deferred)) self.assertEqual(True, self.successResultOf(deferred))
@ -116,7 +119,8 @@ class TestBlobRequestHandlerSender(unittest.TestCase):
# TODO: also check that the expected payment values are set # TODO: also check that the expected payment values are set
consumer = proto_helpers.StringTransport() consumer = proto_helpers.StringTransport()
test_file = StringIO.StringIO('test') test_file = StringIO.StringIO('test')
handler = BlobRequestHandler.BlobRequestHandler(None, None, None) track = analytics.Track()
handler = BlobRequestHandler.BlobRequestHandler(None, None, None, track)
handler.peer = mock.create_autospec(Peer.Peer) handler.peer = mock.create_autospec(Peer.Peer)
handler.currently_uploading = mock.Mock() handler.currently_uploading = mock.Mock()
handler.read_handle = test_file handler.read_handle = test_file

View file

@ -1,9 +1,10 @@
import mock
from lbrynet.metadata import Fee from lbrynet.metadata import Fee
from lbrynet.lbrynet_daemon import ExchangeRateManager from lbrynet.lbrynet_daemon import ExchangeRateManager
from twisted.trial import unittest from twisted.trial import unittest
from tests import util
class FeeFormatTest(unittest.TestCase): class FeeFormatTest(unittest.TestCase):
def test_fee_created_with_correct_inputs(self): def test_fee_created_with_correct_inputs(self):
@ -19,10 +20,7 @@ class FeeFormatTest(unittest.TestCase):
class FeeTest(unittest.TestCase): class FeeTest(unittest.TestCase):
def setUp(self): def setUp(self):
patcher = mock.patch('time.time') util.resetTime(self)
self.time = patcher.start()
self.time.return_value = 0
self.addCleanup(patcher.stop)
def test_fee_converts_to_lbc(self): def test_fee_converts_to_lbc(self):
fee_dict = { fee_dict = {
@ -31,6 +29,10 @@ class FeeTest(unittest.TestCase):
'address': "bRcHraa8bYJZL7vkh5sNmGwPDERFUjGPP9" 'address': "bRcHraa8bYJZL7vkh5sNmGwPDERFUjGPP9"
} }
} }
rates = {'BTCLBC': {'spot': 3.0, 'ts': 2}, 'USDBTC': {'spot': 2.0, 'ts': 3}} rates = {
'BTCLBC': {'spot': 3.0, 'ts': util.DEFAULT_ISO_TIME + 1},
'USDBTC': {'spot': 2.0, 'ts': util.DEFAULT_ISO_TIME + 2}
}
manager = ExchangeRateManager.DummyExchangeRateManager(rates) manager = ExchangeRateManager.DummyExchangeRateManager(rates)
self.assertEqual(60.0, manager.to_lbc(fee_dict).amount) result = manager.to_lbc(fee_dict).amount
self.assertEqual(60.0, result)

23
tests/util.py Normal file
View file

@ -0,0 +1,23 @@
import datetime
import time
import mock
DEFAULT_TIMESTAMP = datetime.datetime(2016, 1, 1)
DEFAULT_ISO_TIME = time.mktime(DEFAULT_TIMESTAMP.timetuple())
def resetTime(test_case, timestamp=DEFAULT_TIMESTAMP):
iso_time = time.mktime(timestamp.timetuple())
patcher = mock.patch('time.time')
patcher.start().return_value = iso_time
test_case.addCleanup(patcher.stop)
patcher = mock.patch('lbrynet.core.utils.now')
patcher.start().return_value = timestamp
test_case.addCleanup(patcher.stop)
patcher = mock.patch('lbrynet.core.utils.utcnow')
patcher.start().return_value = timestamp
test_case.addCleanup(patcher.stop)