From 2e59e5e3b8c49318748153bc94a8d74de01778cc Mon Sep 17 00:00:00 2001 From: Job Evers-Meltzer Date: Thu, 29 Sep 2016 11:39:09 -0700 Subject: [PATCH 01/29] extract individual functions --- lbrynet/core/server/ServerRequestHandler.py | 36 +++++++++++++-------- 1 file changed, 22 insertions(+), 14 deletions(-) diff --git a/lbrynet/core/server/ServerRequestHandler.py b/lbrynet/core/server/ServerRequestHandler.py index e1685c37b..2cb87ece6 100644 --- a/lbrynet/core/server/ServerRequestHandler.py +++ b/lbrynet/core/server/ServerRequestHandler.py @@ -9,9 +9,10 @@ log = logging.getLogger(__name__) class ServerRequestHandler(object): - """This class handles requests from clients. It can upload blobs and return request for information about - more blobs that are associated with streams""" - + """This class handles requests from clients. It can upload blobs and + return request for information about more blobs that are + associated with streams. + """ implements(interfaces.IPushProducer, interfaces.IConsumer, IRequestHandler) def __init__(self, consumer): @@ -90,20 +91,27 @@ class ServerRequestHandler(object): log.debug("Received data") log.debug("%s", str(data)) if self.request_received is False: - self.request_buff = self.request_buff + data - msg = self.try_to_parse_request(self.request_buff) - if msg is not None: - self.request_buff = '' - d = self.handle_request(msg) - if self.blob_sender is not None: - d.addCallback(lambda _: self.blob_sender.send_blob_if_requested(self)) - d.addCallbacks(lambda _: self.finished_response(), self.request_failure_handler) - else: - log.debug("Request buff not a valid json message") - log.debug("Request buff: %s", str(self.request_buff)) + return self._parse_data_and_maybe_send_blob(data) else: log.warning("The client sent data when we were uploading a file. 
This should not happen") + def _parse_data_and_maybe_send_blob(self, data): + self.request_buff = self.request_buff + data + msg = self.try_to_parse_request(self.request_buff) + if msg: + self.request_buff = '' + self._process_msg(msg) + else: + log.debug("Request buff not a valid json message") + log.debug("Request buff: %s", self.request_buff) + + def _process_msg(self, msg): + d = self.handle_request(msg) + if self.blob_sender: + d.addCallback(lambda _: self.blob_sender.send_blob_if_requested(self)) + d.addCallbacks(lambda _: self.finished_response(), self.request_failure_handler) + + ######### IRequestHandler ######### def register_query_handler(self, query_handler, query_identifiers): From 93e2b3e20a0a2cf9449e31d6a13cdcbce2047914 Mon Sep 17 00:00:00 2001 From: Job Evers-Meltzer Date: Thu, 29 Sep 2016 14:42:31 -0700 Subject: [PATCH 02/29] Extract some looping call code out of the daemon --- lbrynet/lbrynet_daemon/Daemon.py | 148 +++++++++++++++++++------------ 1 file changed, 90 insertions(+), 58 deletions(-) diff --git a/lbrynet/lbrynet_daemon/Daemon.py b/lbrynet/lbrynet_daemon/Daemon.py index 44fa1902b..670b9e029 100644 --- a/lbrynet/lbrynet_daemon/Daemon.py +++ b/lbrynet/lbrynet_daemon/Daemon.py @@ -25,6 +25,7 @@ from txjsonrpc.web.jsonrpc import Handler from jsonschema import ValidationError from lbrynet import __version__ as lbrynet_version +# TODO: importing this when internet is disabled raises a socket.gaierror from lbryum.version import LBRYUM_VERSION as lbryum_version from lbrynet import analytics from lbrynet.core.server.BlobRequestHandler import BlobRequestHandlerFactory @@ -129,12 +130,77 @@ OK_CODE = 200 # TODO alert if your copy of a lbry file is out of date with the name record - class Parameters(object): def __init__(self, **kwargs): self.__dict__.update(kwargs) +class LoopingCallManager(object): + def __init__(self): + self.calls = {} + + def register_looping_call(self, name, *args): + self.calls[name] = LoopingCall(*args) + + def start(self, name, *args): + self.calls[name].start(*args) + + def stop(self, name): + self.calls[name].stop() + + def shutdown(self): + for lcall in self.calls.itervalues(): + if lcall.running: + lcall.stop() + + +class CheckInternetConnection(object): + def __init__(self, daemon): + self.daemon = daemon + + def __call__(self): + self.daemon.connected_to_internet = utils.check_connection() + + +class CheckRemoteVersions(object): + def __init__(self, daemon): + self.daemon = daemon + + def __call__(self): + d = self._get_lbrynet_version() + d.addCallback(lambda _: self._get_lbryum_version()) + + def _get_lbryum_version(self): + try: + version = get_lbryum_version_from_github() + log.info( + "remote lbryum %s > local lbryum %s = %s", + version, lbryum_version, + utils.version_is_greater_than(version, lbryum_version) + ) + self.daemon.git_lbryum_version = version + return defer.succeed(None) + except Exception: + log.info("Failed to get lbryum version from git") + self.daemon.git_lbryum_version = None + return defer.fail(None) + + def _get_lbrynet_version(self): + try: + version = get_lbrynet_version_from_github() + log.info( + "remote lbrynet %s > local lbrynet %s = %s", + version, lbrynet_version, + utils.version_is_greater_than(version, lbrynet_version) + ) + self.daemon.git_lbrynet_version = version + return defer.succeed(None) + except Exception: + log.info("Failed to get lbrynet version from git") + self.daemon.git_lbrynet_version = None + return defer.fail(None) + + class Daemon(jsonrpc.JSONRPC): """ LBRYnet daemon, a jsonrpc interface 
to lbry functions @@ -342,12 +408,17 @@ class Daemon(jsonrpc.JSONRPC): self.wallet_user = None self.wallet_password = None - self.internet_connection_checker = LoopingCall(self._check_network_connection) - self.version_checker = LoopingCall(self._check_remote_versions) - self.connection_problem_checker = LoopingCall(self._check_connection_problems) - self.pending_claim_checker = LoopingCall(self._check_pending_claims) - self.send_heartbeat = LoopingCall(self._send_heartbeat) - # self.lbrynet_connection_checker = LoopingCall(self._check_lbrynet_connection) + self.looping_call_manager = LoopingCallManager() + looping_calls = [ + ('internet_connection_checker', CheckInternetConnection(self)), + ('version_checker', CheckRemoteVersions(self)), + ('connection_problem_checker', self._check_connection_problems), + ('pending_claim_checker', self._check_pending_claims), + ('send_heartbeat', self._send_heartbeat), + ('send_tracked_metrics', self._send_tracked_metrics), + ] + for name, fn in looping_calls: + self.looping_call_manager.register_looping_call(name, fn) self.sd_identifier = StreamDescriptorIdentifier() self.stream_info_manager = TempEncryptedFileMetadataManager() @@ -519,9 +590,9 @@ class Daemon(jsonrpc.JSONRPC): log.info("Starting lbrynet-daemon") - self.internet_connection_checker.start(3600) - self.version_checker.start(3600 * 12) - self.connection_problem_checker.start(1) + self.looping_call_manager.start('internet_connection_checker', 3600) + self.looping_call_manager.start('version_checker', 3600 * 12) + self.looping_call_manager.start('connection_problem_checker', 1) self.exchange_rate_manager.start() if host_ui: @@ -595,9 +666,6 @@ class Daemon(jsonrpc.JSONRPC): context = analytics.make_context(self._get_platform(), self.wallet_type) self._events = analytics.Events(context, base58.b58encode(self.lbryid), self._session_id) - def _check_network_connection(self): - self.connected_to_internet = utils.check_connection() - def _check_lbrynet_connection(self): def _log_success(): log.info("lbrynet connectivity test passed") @@ -608,43 +676,6 @@ class Daemon(jsonrpc.JSONRPC): d = download_sd_blob(self.session, wonderfullife_sh, self.session.base_payment_rate_manager) d.addCallbacks(lambda _: _log_success, lambda _: _log_failure) - def _check_remote_versions(self): - def _get_lbryum_version(): - try: - r = urlopen("https://raw.githubusercontent.com/lbryio/lbryum/master/lib/version.py").read().split('\n') - version = next(line.split("=")[1].split("#")[0].replace(" ", "") - for line in r if "LBRYUM_VERSION" in line) - version = version.replace("'", "") - log.info( - "remote lbryum %s > local lbryum %s = %s", - version, lbryum_version, - utils.version_is_greater_than(version, lbryum_version) - ) - self.git_lbryum_version = version - return defer.succeed(None) - except Exception: - log.info("Failed to get lbryum version from git") - self.git_lbryum_version = None - return defer.fail(None) - - def _get_lbrynet_version(): - try: - version = get_lbrynet_version_from_github() - log.info( - "remote lbrynet %s > local lbrynet %s = %s", - version, lbrynet_version, - utils.version_is_greater_than(version, lbrynet_version) - ) - self.git_lbrynet_version = version - return defer.succeed(None) - except Exception: - log.info("Failed to get lbrynet version from git") - self.git_lbrynet_version = None - return defer.fail(None) - - d = _get_lbrynet_version() - d.addCallback(lambda _: _get_lbryum_version()) - def _check_connection_problems(self): if not self.git_lbrynet_version or not self.git_lbryum_version: 
self.connection_problem = CONNECTION_PROBLEM_CODES[0] @@ -830,18 +861,11 @@ class Daemon(jsonrpc.JSONRPC): def _shutdown(self): log.info("Closing lbrynet session") log.info("Status at time of shutdown: " + self.startup_status[0]) - if self.internet_connection_checker.running: - self.internet_connection_checker.stop() - if self.version_checker.running: - self.version_checker.stop() - if self.connection_problem_checker.running: - self.connection_problem_checker.stop() + self.looping_call_manager.shutdown() if self.lbry_ui_manager.update_checker.running: self.lbry_ui_manager.update_checker.stop() if self.pending_claim_checker.running: self.pending_claim_checker.stop() - if self.send_heartbeat.running: - self.send_heartbeat.stop() self._clean_up_temp_files() @@ -2618,6 +2642,14 @@ class Daemon(jsonrpc.JSONRPC): return d +def get_lbryum_version_from_github(): + r = urlopen("https://raw.githubusercontent.com/lbryio/lbryum/master/lib/version.py").read().split('\n') + version = next(line.split("=")[1].split("#")[0].replace(" ", "") + for line in r if "LBRYUM_VERSION" in line) + version = version.replace("'", "") + return version + + def get_lbrynet_version_from_github(): """Return the latest released version from github.""" response = requests.get('https://api.github.com/repos/lbryio/lbry/releases/latest') From d0a82c68df4402817825f3e8087817ede47ca393 Mon Sep 17 00:00:00 2001 From: Job Evers-Meltzer Date: Thu, 29 Sep 2016 14:51:48 -0700 Subject: [PATCH 03/29] move analytics related calls into their own class --- lbrynet/analytics/api.py | 2 +- lbrynet/lbrynet_daemon/Daemon.py | 49 ++++++++++++++++++++------------ 2 files changed, 32 insertions(+), 19 deletions(-) diff --git a/lbrynet/analytics/api.py b/lbrynet/analytics/api.py index 2b8a3344c..db059bd9b 100644 --- a/lbrynet/analytics/api.py +++ b/lbrynet/analytics/api.py @@ -61,7 +61,7 @@ class AnalyticsApi(object): @classmethod def load(cls, session=None): - """Initialize an instance using values from lbry.io.""" + """Initialize an instance using values from the configuration""" if not session: session = sessions.FuturesSession() return cls( diff --git a/lbrynet/lbrynet_daemon/Daemon.py b/lbrynet/lbrynet_daemon/Daemon.py index 670b9e029..b20d86969 100644 --- a/lbrynet/lbrynet_daemon/Daemon.py +++ b/lbrynet/lbrynet_daemon/Daemon.py @@ -201,6 +201,31 @@ class CheckRemoteVersions(object): return defer.fail(None) +class AnalyticsManager(object): + def __init__(self): + self.analytics_api = None + self.events_generator = None + self.send_heartbeat = LoopingCall(self._send_heartbeat) + + def start(self, platform, wallet_type, lbry_id, session_id): + context = analytics.make_context(platform, wallet_type) + self.events_generator = analytics.Events(context, base58.b58encode(lbry_id), session_id) + self.analytics_api = analytics.Api.load() + self.send_heartbeat.start(60) + + def shutdown(self): + if self.send_heartbeat.running: + self.send_heartbeat.stop() + + def send_download_started(self, name, stream_info=None): + event = self.events_generator.download_started(name, stream_info) + self.analytics_api.track(event) + + def _send_heartbeat(self): + heartbeat = self.events_generator.heartbeat() + self.analytics_api.track(heartbeat) + + class Daemon(jsonrpc.JSONRPC): """ LBRYnet daemon, a jsonrpc interface to lbry functions @@ -238,6 +263,7 @@ class Daemon(jsonrpc.JSONRPC): self.first_run_after_update = False self.uploaded_temp_files = [] self._session_id = base58.b58encode(generate_id()) + self.analytics_manager = AnalyticsManager() if os.name == "nt": 
from lbrynet.winhelpers.knownpaths import get_path, FOLDERID, UserHandle @@ -414,7 +440,6 @@ class Daemon(jsonrpc.JSONRPC): ('version_checker', CheckRemoteVersions(self)), ('connection_problem_checker', self._check_connection_problems), ('pending_claim_checker', self._check_pending_claims), - ('send_heartbeat', self._send_heartbeat), ('send_tracked_metrics', self._send_tracked_metrics), ] for name, fn in looping_calls: @@ -616,23 +641,14 @@ class Daemon(jsonrpc.JSONRPC): d.addCallback(lambda _: self._setup_server()) d.addCallback(lambda _: _log_starting_vals()) d.addCallback(lambda _: _announce_startup()) - d.addCallback(lambda _: self._load_analytics_api()) + d.addCallback( + lambda _: self.analytics_manager.start( + self._get_platform(), self.wallet_type, self.lbryid, self._session_id)) # TODO: handle errors here d.callback(None) return defer.succeed(None) - def _load_analytics_api(self): - self.analytics_api = analytics.Api.load() - self.send_heartbeat.start(60) - - def _send_heartbeat(self): - heartbeat = self._events.heartbeat() - self.analytics_api.track(heartbeat) - - def _send_download_started(self, name, stream_info=None): - event = self._events.download_started(name, stream_info) - self.analytics_api.track(event) def _get_platform(self): r = { @@ -662,10 +678,6 @@ class Daemon(jsonrpc.JSONRPC): d = _log_platform() return d - def _set_events(self): - context = analytics.make_context(self._get_platform(), self.wallet_type) - self._events = analytics.Events(context, base58.b58encode(self.lbryid), self._session_id) - def _check_lbrynet_connection(self): def _log_success(): log.info("lbrynet connectivity test passed") @@ -862,6 +874,7 @@ class Daemon(jsonrpc.JSONRPC): log.info("Closing lbrynet session") log.info("Status at time of shutdown: " + self.startup_status[0]) self.looping_call_manager.shutdown() + self.analytics_manager.shutdown() if self.lbry_ui_manager.update_checker.running: self.lbry_ui_manager.update_checker.stop() if self.pending_claim_checker.running: @@ -1133,7 +1146,7 @@ class Daemon(jsonrpc.JSONRPC): Add a lbry file to the file manager, start the download, and return the new lbry file. 
If it already exists in the file manager, return the existing lbry file """ - self._send_download_started(name) + self.analytics_manager.send_download_started(name, stream_info) helper = _DownloadNameHelper( self, name, timeout, download_directory, file_name, wait_for_write) From 66e139c56605db7f34145bc0945a8ebcce733673 Mon Sep 17 00:00:00 2001 From: Job Evers-Meltzer Date: Thu, 29 Sep 2016 15:24:17 -0700 Subject: [PATCH 04/29] set development version based on environment variable --- lbrynet/conf.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/lbrynet/conf.py b/lbrynet/conf.py index 4f895d69b..b98abda6a 100644 --- a/lbrynet/conf.py +++ b/lbrynet/conf.py @@ -4,7 +4,8 @@ Some network wide and also application specific parameters import os is_generous_host = True -IS_DEVELOPMENT_VERSION = False +IS_DEVELOPMENT_VERSION = (os.environ.get('LBRY_DEV') is not None) + MAX_HANDSHAKE_SIZE = 2**16 MAX_REQUEST_SIZE = 2**16 From 9416376a3405448506d37f33cc93487b6bc3b89d Mon Sep 17 00:00:00 2001 From: Job Evers-Meltzer Date: Thu, 29 Sep 2016 17:39:02 -0700 Subject: [PATCH 05/29] refactor: add GetFileHelper Move the code related to getting a lbry file into its own class --- lbrynet/lbrynet_daemon/Daemon.py | 221 +++++++++++++++++-------------- 1 file changed, 122 insertions(+), 99 deletions(-) diff --git a/lbrynet/lbrynet_daemon/Daemon.py b/lbrynet/lbrynet_daemon/Daemon.py index b20d86969..d3042c167 100644 --- a/lbrynet/lbrynet_daemon/Daemon.py +++ b/lbrynet/lbrynet_daemon/Daemon.py @@ -1276,105 +1276,7 @@ class Daemon(jsonrpc.JSONRPC): return defer.succeed(None) def _get_lbry_file(self, search_by, val, return_json=True): - def _log_get_lbry_file(f): - if f and val: - log.info("Found LBRY file for " + search_by + ": " + val) - elif val: - log.info("Did not find LBRY file for " + search_by + ": " + val) - return f - - def _get_json_for_return(f): - def _get_file_status(file_status): - message = STREAM_STAGES[2][1] % (file_status.name, file_status.num_completed, file_status.num_known, file_status.running_status) - return defer.succeed(message) - - def _generate_reply(size): - if f.key: - key = binascii.b2a_hex(f.key) - else: - key = None - - if os.path.isfile(os.path.join(self.download_directory, f.file_name)): - written_file = file(os.path.join(self.download_directory, f.file_name)) - written_file.seek(0, os.SEEK_END) - written_bytes = written_file.tell() - written_file.close() - else: - written_bytes = False - - if search_by == "name": - if val in self.streams.keys(): - status = self.streams[val].code - elif f in self.lbry_file_manager.lbry_files: - # if f.stopped: - # status = STREAM_STAGES[3] - # else: - status = STREAM_STAGES[2] - else: - status = [False, False] - else: - status = [False, False] - - if status[0] == DOWNLOAD_RUNNING_CODE: - d = f.status() - d.addCallback(_get_file_status) - d.addCallback(lambda message: {'completed': f.completed, 'file_name': f.file_name, - 'download_directory': f.download_directory, - 'download_path': os.path.join(f.download_directory, f.file_name), - 'mime_type': mimetypes.guess_type(os.path.join(f.download_directory, f.file_name))[0], - 'key': key, - 'points_paid': f.points_paid, 'stopped': f.stopped, - 'stream_hash': f.stream_hash, - 'stream_name': f.stream_name, - 'suggested_file_name': f.suggested_file_name, - 'upload_allowed': f.upload_allowed, 'sd_hash': f.sd_hash, - 'lbry_uri': f.uri, 'txid': f.txid, 'claim_id': f.claim_id, - 'total_bytes': size, - 'written_bytes': written_bytes, 'code': status[0], - 'message': message}) - else: - d = 
defer.succeed({'completed': f.completed, 'file_name': f.file_name, 'key': key, - 'download_directory': f.download_directory, - 'download_path': os.path.join(f.download_directory, f.file_name), - 'mime_type': mimetypes.guess_type(os.path.join(f.download_directory, f.file_name))[0], - 'points_paid': f.points_paid, 'stopped': f.stopped, 'stream_hash': f.stream_hash, - 'stream_name': f.stream_name, 'suggested_file_name': f.suggested_file_name, - 'upload_allowed': f.upload_allowed, 'sd_hash': f.sd_hash, 'total_bytes': size, - 'written_bytes': written_bytes, 'lbry_uri': f.uri, 'txid': f.txid, 'claim_id': f.claim_id, - 'code': status[0], 'message': status[1]}) - - return d - - def _add_metadata(message): - def _add_to_dict(metadata): - message['metadata'] = metadata - return defer.succeed(message) - - if f.txid: - d = self._resolve_name(f.uri) - d.addCallbacks(_add_to_dict, lambda _: _add_to_dict("Pending confirmation")) - else: - d = defer.succeed(message) - return d - - if f: - d = f.get_total_bytes() - d.addCallback(_generate_reply) - d.addCallback(_add_metadata) - return d - else: - return False - - if search_by == "name": - d = self._get_lbry_file_by_uri(val) - elif search_by == "sd_hash": - d = self._get_lbry_file_by_sd_hash(val) - elif search_by == "file_name": - d = self._get_lbry_file_by_file_name(val) - # d.addCallback(_log_get_lbry_file) - if return_json: - d.addCallback(_get_json_for_return) - return d + return _GetFileHelper(self, search_by, val, return_json).retrieve_file() def _get_lbry_files(self): d = defer.DeferredList([self._get_lbry_file('sd_hash', l.sd_hash) for l in self.lbry_file_manager.lbry_files]) @@ -2879,3 +2781,124 @@ class _ResolveNameHelper(object): def is_cached_name_expired(self): time_in_cache = self.now() - self.name_data['timestamp'] return time_in_cache >= self.daemon.cache_time + + +class _GetFileHelper(object): + def __init__(self, daemon, search_by, val, return_json=True): + self.daemon = daemon + self.search_by = search_by + self.val = val + self.return_json = return_json + + def retrieve_file(self): + d = self._search_for_file() + if self.return_json: + d.addCallback(self._get_json) + return d + + def search_for_file(self): + if self.search_by == "name": + return self.daemon._get_lbry_file_by_uri(self.val) + elif self.search_by == "sd_hash": + return self.daemon._get_lbry_file_by_sd_hash(self.val) + elif self.search_by == "file_name": + return self.daemon._get_lbry_file_by_file_name(self.val) + raise Exception('{} is not a valid search operation'.format(self.search_by)) + + def _get_json(self, lbry_file): + if lbry_file: + d = lbry_file.get_total_bytes() + d.addCallback(self._generate_reply, lbry_file) + d.addCallback(self._add_metadata, lbry_file) + return d + else: + return False + + def _generate_reply(self, size, lbry_file): + written_bytes = self._get_written_bytes(lbry_file) + code, message = self._get_status(lbry_file) + + if code == DOWNLOAD_RUNNING_CODE: + d = lbry_file.status() + d.addCallback(self._get_file_status) + d.addCallback( + lambda msg: self._get_properties_dict(lbry_file, code, msg, written_bytes, size)) + else: + d = defer.succeed( + self._get_properties_dict(lbry_file, code, message, written_bytes, size)) + return d + + def _get_file_status(self, file_status): + message = STREAM_STAGES[2][1] % ( + file_status.name, file_status.num_completed, file_status.num_known, + file_status.running_status) + return defer.succeed(message) + + def _get_key(self, lbry_file): + return binascii.b2a_hex(lbry_file.key) if lbry_file.key else None + + 
def _full_path(self, lbry_file): + return os.path.join(lbry_file.download_directory, lbry_file.file_name) + + def _get_status(self, lbry_file): + if self.search_by == "name": + if self.val in self.daemon.streams.keys(): + status = self.daemon.streams[self.val].code + elif lbry_file in self.daemon.lbry_file_manager.lbry_files: + status = STREAM_STAGES[2] + else: + status = [False, False] + else: + status = [False, False] + return status + + def _get_written_bytes(self, lbry_file): + full_path = self._full_path(lbry_file) + if os.path.isfile(full_path): + with open(full_path) as written_file: + written_file.seek(0, os.SEEK_END) + written_bytes = written_file.tell() + else: + written_bytes = False + return written_bytes + + def _get_properties_dict(self, lbry_file, code, message, written_bytes, size): + key = self._get_key(lbry_file) + full_path = self._full_path(lbry_file) + mime_type = mimetypes.guess_type(full_path)[0] + + return { + 'completed': lbry_file.completed, + 'file_name': lbry_file.file_name, + 'download_directory': lbry_file.download_directory, + 'points_paid': lbry_file.points_paid, + 'stopped': lbry_file.stopped, + 'stream_hash': lbry_file.stream_hash, + 'stream_name': lbry_file.stream_name, + 'suggested_file_name': lbry_file.suggested_file_name, + 'upload_allowed': lbry_file.upload_allowed, + 'sd_hash': lbry_file.sd_hash, + 'lbry_uri': lbry_file.uri, + 'txid': lbry_file.txid, + 'claim_id': lbry_file.claim_id, + 'download_path': full_path, + 'mime_type': mime_type, + 'key': key, + 'total_bytes': size, + 'written_bytes': written_bytes, + 'code': code, + 'message': message + } + + def _add_metadata(self, message, lbry_file): + def _add_to_dict(metadata): + message['metadata'] = metadata + return defer.succeed(message) + + if lbry_file.txid: + d = self._resolve_name(lbry_file.uri) + d.addCallbacks(_add_to_dict, lambda _: _add_to_dict("Pending confirmation")) + else: + d = defer.succeed(message) + return d + From c104ed3f8b9ddf0c2cb699dc74f09de6cd466d2b Mon Sep 17 00:00:00 2001 From: Job Evers-Meltzer Date: Thu, 29 Sep 2016 23:06:07 -0500 Subject: [PATCH 06/29] better time handling in tests --- lbrynet/core/utils.py | 22 +++++++++++++++----- tests/unit/core/test_ExchangeRateManager.py | 16 +++++++------- tests/util.py | 23 +++++++++++++++++++++ 3 files changed, 49 insertions(+), 12 deletions(-) create mode 100644 tests/util.py diff --git a/lbrynet/core/utils.py b/lbrynet/core/utils.py index 9491c26fa..85af1a7c4 100644 --- a/lbrynet/core/utils.py +++ b/lbrynet/core/utils.py @@ -11,12 +11,28 @@ import yaml from lbrynet.core.cryptoutils import get_lbry_hash_obj -blobhash_length = get_lbry_hash_obj().digest_size * 2 # digest_size is in bytes, and blob hashes are hex encoded +# digest_size is in bytes, and blob hashes are hex encoded +blobhash_length = get_lbry_hash_obj().digest_size * 2 log = logging.getLogger(__name__) +# defining this here allows for easier overriding in testing +def now(): + return datetime.datetime.now() + +def utcnow(): + return datetime.datetime.utcnow() + +def isonow(): + """Return utc now in isoformat with timezone""" + return utcnow().isoformat() + 'Z' + +def today(): + return datetime.datetime.today() + + def generate_id(num=None): h = get_lbry_hash_obj() if num is not None: @@ -86,10 +102,6 @@ def save_settings(path, settings): f.close() -def today(): - return datetime.datetime.today() - - def check_connection(server="www.lbry.io", port=80): """Attempts to open a socket to server:port and returns True if successful.""" try: diff --git 
a/tests/unit/core/test_ExchangeRateManager.py b/tests/unit/core/test_ExchangeRateManager.py index 96827c263..1cd24b700 100644 --- a/tests/unit/core/test_ExchangeRateManager.py +++ b/tests/unit/core/test_ExchangeRateManager.py @@ -1,9 +1,10 @@ -import mock from lbrynet.metadata import Fee from lbrynet.lbrynet_daemon import ExchangeRateManager from twisted.trial import unittest +from tests import util + class FeeFormatTest(unittest.TestCase): def test_fee_created_with_correct_inputs(self): @@ -19,10 +20,7 @@ class FeeFormatTest(unittest.TestCase): class FeeTest(unittest.TestCase): def setUp(self): - patcher = mock.patch('time.time') - self.time = patcher.start() - self.time.return_value = 0 - self.addCleanup(patcher.stop) + util.resetTime(self) def test_fee_converts_to_lbc(self): fee_dict = { @@ -31,6 +29,10 @@ class FeeTest(unittest.TestCase): 'address': "bRcHraa8bYJZL7vkh5sNmGwPDERFUjGPP9" } } - rates = {'BTCLBC': {'spot': 3.0, 'ts': 2}, 'USDBTC': {'spot': 2.0, 'ts': 3}} + rates = { + 'BTCLBC': {'spot': 3.0, 'ts': util.DEFAULT_ISO_TIME + 1}, + 'USDBTC': {'spot': 2.0, 'ts': util.DEFAULT_ISO_TIME + 2} + } manager = ExchangeRateManager.DummyExchangeRateManager(rates) - self.assertEqual(60.0, manager.to_lbc(fee_dict).amount) + result = manager.to_lbc(fee_dict).amount + self.assertEqual(60.0, result) diff --git a/tests/util.py b/tests/util.py new file mode 100644 index 000000000..34e644511 --- /dev/null +++ b/tests/util.py @@ -0,0 +1,23 @@ +import datetime +import time + +import mock + + +DEFAULT_TIMESTAMP = datetime.datetime(2016, 1, 1) +DEFAULT_ISO_TIME = time.mktime(DEFAULT_TIMESTAMP.timetuple()) + + +def resetTime(test_case, timestamp=DEFAULT_TIMESTAMP): + iso_time = time.mktime(timestamp.timetuple()) + patcher = mock.patch('time.time') + patcher.start().return_value = iso_time + test_case.addCleanup(patcher.stop) + + patcher = mock.patch('lbrynet.core.utils.now') + patcher.start().return_value = timestamp + test_case.addCleanup(patcher.stop) + + patcher = mock.patch('lbrynet.core.utils.utcnow') + patcher.start().return_value = timestamp + test_case.addCleanup(patcher.stop) From 3f727d892cf87a96a0bdb460f625be96fc867d48 Mon Sep 17 00:00:00 2001 From: Job Evers-Meltzer Date: Thu, 29 Sep 2016 23:15:39 -0500 Subject: [PATCH 07/29] refactor events; add tests --- lbrynet/analytics/events.py | 40 +++++++++++++++-------------- tests/unit/analytics/__init__.py | 0 tests/unit/analytics/test_events.py | 38 +++++++++++++++++++++++++++ 3 files changed, 59 insertions(+), 19 deletions(-) create mode 100644 tests/unit/analytics/__init__.py create mode 100644 tests/unit/analytics/test_events.py diff --git a/lbrynet/analytics/events.py b/lbrynet/analytics/events.py index 8b1c63287..94d6ce679 100644 --- a/lbrynet/analytics/events.py +++ b/lbrynet/analytics/events.py @@ -1,6 +1,6 @@ import logging -from lbrynet.analytics import utils +from lbrynet.core import utils log = logging.getLogger(__name__) @@ -23,31 +23,33 @@ class Events(object): self.session_id = session_id def heartbeat(self): - return { - 'userId': 'lbry', - 'event': 'Heartbeat', - 'properties': { - 'lbry_id': self.lbry_id, - 'session_id': self.session_id - }, - 'context': self.context, - 'timestamp': utils.now() - } + return self._event('Heartbeat') def download_started(self, name, stream_info=None): + properties = { + 'name': name, + 'stream_info': get_sd_hash(stream_info) + } + return self._event('Download Started', properties) + + def _event(self, event, event_properties=None): return { 'userId': 'lbry', - 'event': 'Download Started', - 
'properties': { - 'lbry_id': self.lbry_id, - 'session_id': self.session_id, - 'name': name, - 'stream_info': get_sd_hash(stream_info) - }, + 'event': event, + 'properties': self._properties(event_properties), 'context': self.context, - 'timestamp': utils.now() + 'timestamp': utils.isonow() } + def _properties(self, event_properties=None): + event_properties = event_properties or {} + properties = { + 'lbry_id': self.lbry_id, + 'session_id': self.session_id, + } + properties.update(event_properties) + return properties + def make_context(platform, wallet, is_dev=False): # TODO: distinguish between developer and release instances diff --git a/tests/unit/analytics/__init__.py b/tests/unit/analytics/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/tests/unit/analytics/test_events.py b/tests/unit/analytics/test_events.py new file mode 100644 index 000000000..d9baf2e59 --- /dev/null +++ b/tests/unit/analytics/test_events.py @@ -0,0 +1,38 @@ +from lbrynet.analytics import events + +from twisted.trial import unittest + +from tests import util + + +class EventsTest(unittest.TestCase): + def setUp(self): + util.resetTime(self) + self.event_generator = events.Events('any valid json datatype', 'lbry123', 'session456') + + def test_heartbeat(self): + result = self.event_generator.heartbeat() + desired_result = { + 'context': 'any valid json datatype', + 'event': 'Heartbeat', + 'properties': {'lbry_id': 'lbry123', 'session_id': 'session456'}, + 'timestamp': '2016-01-01T00:00:00Z', + 'userId': 'lbry' + } + self.assertEqual(desired_result, result) + + def test_download_started(self): + result = self.event_generator.download_started('great gatsby') + desired_result = { + 'context': 'any valid json datatype', + 'event': 'Download Started', + 'properties': { + 'lbry_id': 'lbry123', + 'session_id': 'session456', + 'name': 'great gatsby', + 'stream_info': None, + }, + 'timestamp': '2016-01-01T00:00:00Z', + 'userId': 'lbry' + } + self.assertEqual(desired_result, result) From 36ae0d5f203622d1ed7f33bdfcc7959d502f8e81 Mon Sep 17 00:00:00 2001 From: Job Evers-Meltzer Date: Thu, 29 Sep 2016 23:22:43 -0500 Subject: [PATCH 08/29] remove utils file - function moved to core --- lbrynet/analytics/utils.py | 8 -------- 1 file changed, 8 deletions(-) delete mode 100644 lbrynet/analytics/utils.py diff --git a/lbrynet/analytics/utils.py b/lbrynet/analytics/utils.py deleted file mode 100644 index d147f8c34..000000000 --- a/lbrynet/analytics/utils.py +++ /dev/null @@ -1,8 +0,0 @@ -import datetime - -from lbrynet.core.utils import * - - -def now(): - """Return utc now in isoformat with timezone""" - return datetime.datetime.utcnow().isoformat() + 'Z' From 3a91896d8acae9b748e440ea85051cd889b767bf Mon Sep 17 00:00:00 2001 From: Job Evers-Meltzer Date: Thu, 29 Sep 2016 23:26:27 -0500 Subject: [PATCH 09/29] track the amount of data uploaded --- lbrynet/analytics/__init__.py | 6 ++++- lbrynet/analytics/events.py | 7 +++++ lbrynet/analytics/track.py | 17 ++++++++++++ lbrynet/core/server/BlobRequestHandler.py | 16 +++++++---- lbrynet/lbrynet_console/Console.py | 13 +++++---- tests/functional/test_misc.py | 24 ++++++++++++----- tests/unit/analytics/test_track.py | 27 +++++++++++++++++++ .../core/server/test_BlobRequestHandler.py | 12 ++++++--- 8 files changed, 101 insertions(+), 21 deletions(-) create mode 100644 lbrynet/analytics/track.py create mode 100644 tests/unit/analytics/test_track.py diff --git a/lbrynet/analytics/__init__.py b/lbrynet/analytics/__init__.py index 598751034..6d6c562f5 100644 --- 
a/lbrynet/analytics/__init__.py +++ b/lbrynet/analytics/__init__.py @@ -1,2 +1,6 @@ from events import * -from api import AnalyticsApi as Api \ No newline at end of file +from api import AnalyticsApi as Api +from track import Track + +# Constants for metrics +BLOB_BYTES_UPLOADED = 'blob_bytes_uploaded' diff --git a/lbrynet/analytics/events.py b/lbrynet/analytics/events.py index 94d6ce679..1a72fbaaa 100644 --- a/lbrynet/analytics/events.py +++ b/lbrynet/analytics/events.py @@ -32,6 +32,13 @@ class Events(object): } return self._event('Download Started', properties) + def metric_observed(self, metric_name, value): + properties = { + 'metric_name': metric_name, + 'metric_value': value, + } + return self._event('Metric Observed', properties) + def _event(self, event, event_properties=None): return { 'userId': 'lbry', diff --git a/lbrynet/analytics/track.py b/lbrynet/analytics/track.py new file mode 100644 index 000000000..c14952bf8 --- /dev/null +++ b/lbrynet/analytics/track.py @@ -0,0 +1,17 @@ +import collections + + +class Track(object): + """Track and summarize observations of metrics.""" + def __init__(self): + self.data = collections.defaultdict(list) + + def add_observation(self, metric, value): + self.data[metric].append(value) + + def summarize(self, metric, op=sum): + """Apply `op` on the current values for `metric`. + + This operation also resets the metric. + """ + return op(self.data.pop(metric, [])) diff --git a/lbrynet/core/server/BlobRequestHandler.py b/lbrynet/core/server/BlobRequestHandler.py index 15874c215..adf60f5b9 100644 --- a/lbrynet/core/server/BlobRequestHandler.py +++ b/lbrynet/core/server/BlobRequestHandler.py @@ -5,7 +5,9 @@ from twisted.protocols.basic import FileSender from twisted.python.failure import Failure from zope.interface import implements + from lbrynet.core.Offer import Offer +from lbrynet import analytics from lbrynet.interfaces import IQueryHandlerFactory, IQueryHandler, IBlobSender @@ -15,15 +17,16 @@ log = logging.getLogger(__name__) class BlobRequestHandlerFactory(object): implements(IQueryHandlerFactory) - def __init__(self, blob_manager, wallet, payment_rate_manager): + def __init__(self, blob_manager, wallet, payment_rate_manager, track): self.blob_manager = blob_manager self.wallet = wallet self.payment_rate_manager = payment_rate_manager + self.track = track ######### IQueryHandlerFactory ######### def build_query_handler(self): - q_h = BlobRequestHandler(self.blob_manager, self.wallet, self.payment_rate_manager) + q_h = BlobRequestHandler(self.blob_manager, self.wallet, self.payment_rate_manager, self.track) return q_h def get_primary_query_identifier(self): @@ -39,11 +42,12 @@ class BlobRequestHandler(object): BLOB_QUERY = 'requested_blob' AVAILABILITY_QUERY = 'requested_blobs' - def __init__(self, blob_manager, wallet, payment_rate_manager): + def __init__(self, blob_manager, wallet, payment_rate_manager, track): self.blob_manager = blob_manager self.payment_rate_manager = payment_rate_manager self.wallet = wallet self.query_identifiers = [self.PAYMENT_RATE_QUERY, self.BLOB_QUERY, self.AVAILABILITY_QUERY] + self.track = track self.peer = None self.blob_data_payment_rate = None self.read_handle = None @@ -190,8 +194,10 @@ class BlobRequestHandler(object): return inner_d def count_bytes(data): - self.blob_bytes_uploaded += len(data) - self.peer.update_stats('blob_bytes_uploaded', len(data)) + uploaded = len(data) + self.blob_bytes_uploaded += uploaded + self.peer.update_stats('blob_bytes_uploaded', uploaded) + 
self.track.add_observation(analytics.BLOB_BYTES_UPLOADED, uploaded) return data def start_transfer(): diff --git a/lbrynet/lbrynet_console/Console.py b/lbrynet/lbrynet_console/Console.py index 3f700222e..5282b68b8 100644 --- a/lbrynet/lbrynet_console/Console.py +++ b/lbrynet/lbrynet_console/Console.py @@ -12,6 +12,7 @@ from yapsy.PluginManager import PluginManager from twisted.internet import defer, threads, stdio, task, error from jsonrpc.proxy import JSONRPCProxy +from lbrynet import analytics from lbrynet.core.Session import Session from lbrynet.lbrynet_console.ConsoleControl import ConsoleControl from lbrynet.lbrynet_console.Settings import Settings @@ -366,11 +367,13 @@ class Console(): ] def get_blob_request_handler_factory(rate): - self.blob_request_payment_rate_manager = PaymentRateManager(self.session.base_payment_rate_manager, - rate) - handlers.append(BlobRequestHandlerFactory(self.session.blob_manager, - self.session.wallet, - self.blob_request_payment_rate_manager)) + self.blob_request_payment_rate_manager = PaymentRateManager( + self.session.base_payment_rate_manager, rate + ) + handlers.append(BlobRequestHandlerFactory( + self.session.blob_manager, self.session.wallet, + self.blob_request_payment_rate_manager, analytics.Track() + )) d1 = self.settings.get_server_data_payment_rate() d1.addCallback(get_blob_request_handler_factory) diff --git a/tests/functional/test_misc.py b/tests/functional/test_misc.py index 8b8ec0de8..0ee1baebd 100644 --- a/tests/functional/test_misc.py +++ b/tests/functional/test_misc.py @@ -10,6 +10,7 @@ import unittest from Crypto.PublicKey import RSA from Crypto import Random from Crypto.Hash import MD5 +from lbrynet import analytics from lbrynet.conf import MIN_BLOB_DATA_PAYMENT_RATE from lbrynet.lbrylive.LiveStreamCreator import FileLiveStreamCreator from lbrynet.lbrylive.LiveStreamMetadataManager import DBLiveStreamMetadataManager @@ -18,6 +19,7 @@ from lbrynet.lbryfile.EncryptedFileMetadataManager import TempEncryptedFileMetad from lbrynet.lbryfilemanager.EncryptedFileManager import EncryptedFileManager from lbrynet.core.PTCWallet import PointTraderKeyQueryHandlerFactory, PointTraderKeyExchanger from lbrynet.core.Session import Session +from lbrynet.core.server.BlobAvailabilityHandler import BlobAvailabilityHandlerFactory from lbrynet.core.client.StandaloneBlobDownloader import StandaloneBlobDownloader from lbrynet.core.StreamDescriptor import BlobStreamDescriptorWriter from lbrynet.core.StreamDescriptor import StreamDescriptorIdentifier @@ -268,8 +270,11 @@ def start_lbry_uploader(sd_hash_queue, kill_event, dead_event, file_size, ul_rat server_port = None query_handler_factories = { - BlobRequestHandlerFactory(session.blob_manager, session.wallet, - session.payment_rate_manager): True, + BlobAvailabilityHandlerFactory(session.blob_manager): True, + BlobRequestHandlerFactory( + session.blob_manager, session.wallet, + session.payment_rate_manager, + analytics.Track()): True, session.wallet.get_wallet_info_query_handler_factory(): True, } @@ -394,8 +399,11 @@ def start_lbry_reuploader(sd_hash, kill_event, dead_event, ready_event, n, ul_ra server_port = None query_handler_factories = { - BlobRequestHandlerFactory(session.blob_manager, session.wallet, - session.payment_rate_manager): True, + BlobAvailabilityHandlerFactory(session.blob_manager): True, + BlobRequestHandlerFactory( + session.blob_manager, session.wallet, + session.payment_rate_manager, + analytics.Track()): True, session.wallet.get_wallet_info_query_handler_factory(): True, } @@ -478,7 
+486,8 @@ def start_live_server(sd_hash_queue, kill_event, dead_event): CryptBlobInfoQueryHandlerFactory(stream_info_manager, session.wallet, session.payment_rate_manager): True, BlobRequestHandlerFactory(session.blob_manager, session.wallet, - session.payment_rate_manager): True, + session.payment_rate_manager, + analytics.Track()): True, session.wallet.get_wallet_info_query_handler_factory(): True, } @@ -622,7 +631,10 @@ def start_blob_uploader(blob_hash_queue, kill_event, dead_event, slow, is_genero server_port = None query_handler_factories = { - BlobRequestHandlerFactory(session.blob_manager, session.wallet, session.payment_rate_manager): True, + BlobAvailabilityHandlerFactory(session.blob_manager): True, + BlobRequestHandlerFactory(session.blob_manager, session.wallet, + session.payment_rate_manager, + analytics.Track()): True, session.wallet.get_wallet_info_query_handler_factory(): True, } diff --git a/tests/unit/analytics/test_track.py b/tests/unit/analytics/test_track.py new file mode 100644 index 000000000..e12c3d7da --- /dev/null +++ b/tests/unit/analytics/test_track.py @@ -0,0 +1,27 @@ +from lbrynet import analytics + +from twisted.trial import unittest + + +class TrackTest(unittest.TestCase): + def test_empty_summarize_is_zero(self): + track = analytics.Track() + result = track.summarize('a') + self.assertEqual(0, result) + + def test_can_get_sum_of_metric(self): + track = analytics.Track() + track.add_observation('b', 1) + track.add_observation('b', 2) + + result = track.summarize('b') + self.assertEqual(3, result) + + def test_summarize_resets_metric(self): + track = analytics.Track() + track.add_observation('metric', 1) + track.add_observation('metric', 2) + + track.summarize('metric') + result = track.summarize('metric') + self.assertEqual(0, result) diff --git a/tests/unit/core/server/test_BlobRequestHandler.py b/tests/unit/core/server/test_BlobRequestHandler.py index 31d7e48ee..6034e5192 100644 --- a/tests/unit/core/server/test_BlobRequestHandler.py +++ b/tests/unit/core/server/test_BlobRequestHandler.py @@ -5,6 +5,7 @@ from twisted.internet import defer from twisted.test import proto_helpers from twisted.trial import unittest +from lbrynet import analytics from lbrynet.core import Peer from lbrynet.core.server import BlobRequestHandler from lbrynet.core.PaymentRateManager import NegotiatedPaymentRateManager, BasePaymentRateManager @@ -14,8 +15,10 @@ from tests.mocks import DummyBlobAvailabilityTracker class TestBlobRequestHandlerQueries(unittest.TestCase): def setUp(self): self.blob_manager = mock.Mock() - self.payment_rate_manager = NegotiatedPaymentRateManager(BasePaymentRateManager(0.001), DummyBlobAvailabilityTracker()) - self.handler = BlobRequestHandler.BlobRequestHandler(self.blob_manager, None, self.payment_rate_manager) + self.payment_rate_manager = NegotiatedPaymentRateManager( + BasePaymentRateManager(0.001), DummyBlobAvailabilityTracker()) + self.handler = BlobRequestHandler.BlobRequestHandler( + self.blob_manager, None, self.payment_rate_manager, None) def test_empty_response_when_empty_query(self): self.assertEqual({}, self.successResultOf(self.handler.handle_queries({}))) @@ -107,7 +110,7 @@ class TestBlobRequestHandlerQueries(unittest.TestCase): class TestBlobRequestHandlerSender(unittest.TestCase): def test_nothing_happens_if_not_currently_uploading(self): - handler = BlobRequestHandler.BlobRequestHandler(None, None, None) + handler = BlobRequestHandler.BlobRequestHandler(None, None, None, None) handler.currently_uploading = None deferred = 
handler.send_blob_if_requested(None) self.assertEqual(True, self.successResultOf(deferred)) @@ -116,7 +119,8 @@ class TestBlobRequestHandlerSender(unittest.TestCase): # TODO: also check that the expected payment values are set consumer = proto_helpers.StringTransport() test_file = StringIO.StringIO('test') - handler = BlobRequestHandler.BlobRequestHandler(None, None, None) + track = analytics.Track() + handler = BlobRequestHandler.BlobRequestHandler(None, None, None, track) handler.peer = mock.create_autospec(Peer.Peer) handler.currently_uploading = mock.Mock() handler.read_handle = test_file From b71a3fee43a7debef06f0721e30e78dd33dd7163 Mon Sep 17 00:00:00 2001 From: Job Evers-Meltzer Date: Thu, 29 Sep 2016 23:35:15 -0500 Subject: [PATCH 10/29] add tracked metrics reporting to analytics --- lbrynet/lbrynet_daemon/Daemon.py | 11 +++++++++++ 1 file changed, 11 insertions(+) diff --git a/lbrynet/lbrynet_daemon/Daemon.py b/lbrynet/lbrynet_daemon/Daemon.py index d3042c167..c57f1afe6 100644 --- a/lbrynet/lbrynet_daemon/Daemon.py +++ b/lbrynet/lbrynet_daemon/Daemon.py @@ -205,17 +205,22 @@ class AnalyticsManager(object): def __init__(self): self.analytics_api = None self.events_generator = None + self.track = analytics.Track() self.send_heartbeat = LoopingCall(self._send_heartbeat) + self.update_tracked_metrics = LoopingCall(self._update_tracked_metrics) def start(self, platform, wallet_type, lbry_id, session_id): context = analytics.make_context(platform, wallet_type) self.events_generator = analytics.Events(context, base58.b58encode(lbry_id), session_id) self.analytics_api = analytics.Api.load() self.send_heartbeat.start(60) + self.update_tracked_metrics.start(300) def shutdown(self): if self.send_heartbeat.running: self.send_heartbeat.stop() + if self.update_tracked_metrics.running: + self.update_tracked_metrics.stop() def send_download_started(self, name, stream_info=None): event = self.events_generator.download_started(name, stream_info) @@ -225,6 +230,12 @@ class AnalyticsManager(object): heartbeat = self.events_generator.heartbeat() self.analytics_api.track(heartbeat) + def _update_tracked_metrics(self): + value = self.track.summarize(analytics.BLOB_BYTES_UPLOADED) + if value > 0: + event = self.events_generator.metric_observered(analytics.BLOB_BYTES_UPLOADED, value) + self.analytics_api.track(event) + class Daemon(jsonrpc.JSONRPC): """ From 76fb7697fff8dedc853312276d72f8c92ce33562 Mon Sep 17 00:00:00 2001 From: Job Evers-Meltzer Date: Thu, 29 Sep 2016 23:35:32 -0500 Subject: [PATCH 11/29] misc --- lbrynet/lbrynet_console/Console.py | 2 ++ lbrynet/lbrynet_daemon/ExchangeRateManager.py | 2 +- 2 files changed, 3 insertions(+), 1 deletion(-) diff --git a/lbrynet/lbrynet_console/Console.py b/lbrynet/lbrynet_console/Console.py index 5282b68b8..86be5fb83 100644 --- a/lbrynet/lbrynet_console/Console.py +++ b/lbrynet/lbrynet_console/Console.py @@ -1,3 +1,5 @@ +# TODO: THERE IS A LOT OF CODE IN THIS MODULE THAT SHOULD BE REMOVED +# AS IT IS REPEATED IN THE LBRYDaemon MODULE import logging import os.path import argparse diff --git a/lbrynet/lbrynet_daemon/ExchangeRateManager.py b/lbrynet/lbrynet_daemon/ExchangeRateManager.py index fe9b293e1..8748d5cd0 100644 --- a/lbrynet/lbrynet_daemon/ExchangeRateManager.py +++ b/lbrynet/lbrynet_daemon/ExchangeRateManager.py @@ -226,4 +226,4 @@ class DummyExchangeRateManager(object): 'amount': self.convert_currency(fee_in.currency_symbol, "LBC", fee_in.amount), 'address': fee_in.address } - }) \ No newline at end of file + }) From 
48a61605de4739981e32d6cdfc8f9a4282981c46 Mon Sep 17 00:00:00 2001 From: Job Evers-Meltzer Date: Fri, 30 Sep 2016 14:03:32 -0500 Subject: [PATCH 12/29] bug fix: looping calls --- lbrynet/lbrynet_daemon/Daemon.py | 9 ++++----- 1 file changed, 4 insertions(+), 5 deletions(-) diff --git a/lbrynet/lbrynet_daemon/Daemon.py b/lbrynet/lbrynet_daemon/Daemon.py index c57f1afe6..7a29ec37e 100644 --- a/lbrynet/lbrynet_daemon/Daemon.py +++ b/lbrynet/lbrynet_daemon/Daemon.py @@ -143,7 +143,9 @@ class LoopingCallManager(object): self.calls[name] = LoopingCall(*args) def start(self, name, *args): - self.calls[name].start(*args) + lcall = self.calls[name] + if not lcall.running: + lcall.start(*args) def stop(self, name): self.calls[name].stop() @@ -888,8 +890,6 @@ class Daemon(jsonrpc.JSONRPC): self.analytics_manager.shutdown() if self.lbry_ui_manager.update_checker.running: self.lbry_ui_manager.update_checker.stop() - if self.pending_claim_checker.running: - self.pending_claim_checker.stop() self._clean_up_temp_files() @@ -1962,8 +1962,7 @@ class Daemon(jsonrpc.JSONRPC): if not os.path.isfile(file_path): return defer.fail(Exception("Specified file for publish doesnt exist: %s" % file_path)) - if not self.pending_claim_checker.running: - self.pending_claim_checker.start(30) + self.looping_call_manager.start('pending_claim_checker', 30) d = self._resolve_name(name, force_refresh=True) d.addErrback(lambda _: None) From f5213fb77acf41e1650a1670004c07d657d1bcf2 Mon Sep 17 00:00:00 2001 From: Job Evers-Meltzer Date: Fri, 30 Sep 2016 14:09:56 -0500 Subject: [PATCH 13/29] GetFileHelper bug fixes --- lbrynet/lbrynet_daemon/Daemon.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/lbrynet/lbrynet_daemon/Daemon.py b/lbrynet/lbrynet_daemon/Daemon.py index 7a29ec37e..f94a0caa7 100644 --- a/lbrynet/lbrynet_daemon/Daemon.py +++ b/lbrynet/lbrynet_daemon/Daemon.py @@ -2801,7 +2801,7 @@ class _GetFileHelper(object): self.return_json = return_json def retrieve_file(self): - d = self._search_for_file() + d = self.search_for_file() if self.return_json: d.addCallback(self._get_json) return d @@ -2830,7 +2830,7 @@ class _GetFileHelper(object): if code == DOWNLOAD_RUNNING_CODE: d = lbry_file.status() - d.addCallback(self._get_file_status) + d.addCallback(self._get_msg_for_file_status) d.addCallback( lambda msg: self._get_properties_dict(lbry_file, code, msg, written_bytes, size)) else: @@ -2838,7 +2838,7 @@ class _GetFileHelper(object): self._get_properties_dict(lbry_file, code, message, written_bytes, size)) return d - def _get_file_status(self, file_status): + def _get_msg_for_file_status(self, file_status): message = STREAM_STAGES[2][1] % ( file_status.name, file_status.num_completed, file_status.num_known, file_status.running_status) @@ -2906,7 +2906,7 @@ class _GetFileHelper(object): return defer.succeed(message) if lbry_file.txid: - d = self._resolve_name(lbry_file.uri) + d = self.daemon._resolve_name(lbry_file.uri) d.addCallbacks(_add_to_dict, lambda _: _add_to_dict("Pending confirmation")) else: d = defer.succeed(message) From f9f07c47e2f0806cedb39017a94b740a8998fb0c Mon Sep 17 00:00:00 2001 From: Job Evers-Meltzer Date: Sat, 1 Oct 2016 10:47:37 -0500 Subject: [PATCH 14/29] move analytics manager --- lbrynet/analytics/__init__.py | 6 ++--- lbrynet/analytics/api.py | 2 +- lbrynet/analytics/constants.py | 2 ++ lbrynet/analytics/manager.py | 44 ++++++++++++++++++++++++++++++++ lbrynet/lbrynet_daemon/Daemon.py | 43 +++---------------------------- 5 files changed, 54 insertions(+), 43 deletions(-) 
create mode 100644 lbrynet/analytics/constants.py create mode 100644 lbrynet/analytics/manager.py diff --git a/lbrynet/analytics/__init__.py b/lbrynet/analytics/__init__.py index 6d6c562f5..0b39a5e8f 100644 --- a/lbrynet/analytics/__init__.py +++ b/lbrynet/analytics/__init__.py @@ -1,6 +1,6 @@ +from constants import * from events import * -from api import AnalyticsApi as Api +from api import Api from track import Track +from manager import Manager -# Constants for metrics -BLOB_BYTES_UPLOADED = 'blob_bytes_uploaded' diff --git a/lbrynet/analytics/api.py b/lbrynet/analytics/api.py index db059bd9b..da7f4b8e4 100644 --- a/lbrynet/analytics/api.py +++ b/lbrynet/analytics/api.py @@ -28,7 +28,7 @@ def log_response(fn): return wrapper -class AnalyticsApi(object): +class Api(object): def __init__(self, session, url, write_key): self.session = session self.url = url diff --git a/lbrynet/analytics/constants.py b/lbrynet/analytics/constants.py new file mode 100644 index 000000000..059efe921 --- /dev/null +++ b/lbrynet/analytics/constants.py @@ -0,0 +1,2 @@ +# Constants for metrics +BLOB_BYTES_UPLOADED = 'blob_bytes_uploaded' diff --git a/lbrynet/analytics/manager.py b/lbrynet/analytics/manager.py new file mode 100644 index 000000000..6c8fda620 --- /dev/null +++ b/lbrynet/analytics/manager.py @@ -0,0 +1,44 @@ +import base58 + +from twisted.internet import task + +import api +import constants +import events +import track + + +class Manager(object): + def __init__(self): + self.analytics_api = None + self.events_generator = None + self.track = track.Track() + self.send_heartbeat = task.LoopingCall(self._send_heartbeat) + self.update_tracked_metrics = task.LoopingCall(self._update_tracked_metrics) + + def start(self, platform, wallet_type, lbry_id, session_id): + context = events.make_context(platform, wallet_type) + self.events_generator = events.Events(context, base58.b58encode(lbry_id), session_id) + self.analytics_api = api.Api.load() + self.send_heartbeat.start(60) + self.update_tracked_metrics.start(300) + + def shutdown(self): + if self.send_heartbeat.running: + self.send_heartbeat.stop() + if self.update_tracked_metrics.running: + self.update_tracked_metrics.stop() + + def send_download_started(self, name, stream_info=None): + event = self.events_generator.download_started(name, stream_info) + self.analytics_api.track(event) + + def _send_heartbeat(self): + heartbeat = self.events_generator.heartbeat() + self.analytics_api.track(heartbeat) + + def _update_tracked_metrics(self): + value = self.track.summarize(constants.BLOB_BYTES_UPLOADED) + if value > 0: + event = self.events_generator.metric_observered(analytics.BLOB_BYTES_UPLOADED, value) + self.analytics_api.track(event) diff --git a/lbrynet/lbrynet_daemon/Daemon.py b/lbrynet/lbrynet_daemon/Daemon.py index f94a0caa7..71051a225 100644 --- a/lbrynet/lbrynet_daemon/Daemon.py +++ b/lbrynet/lbrynet_daemon/Daemon.py @@ -203,40 +203,8 @@ class CheckRemoteVersions(object): return defer.fail(None) -class AnalyticsManager(object): - def __init__(self): - self.analytics_api = None - self.events_generator = None - self.track = analytics.Track() - self.send_heartbeat = LoopingCall(self._send_heartbeat) - self.update_tracked_metrics = LoopingCall(self._update_tracked_metrics) - - def start(self, platform, wallet_type, lbry_id, session_id): - context = analytics.make_context(platform, wallet_type) - self.events_generator = analytics.Events(context, base58.b58encode(lbry_id), session_id) - self.analytics_api = analytics.Api.load() - 
self.send_heartbeat.start(60) - self.update_tracked_metrics.start(300) - - def shutdown(self): - if self.send_heartbeat.running: - self.send_heartbeat.stop() - if self.update_tracked_metrics.running: - self.update_tracked_metrics.stop() - - def send_download_started(self, name, stream_info=None): - event = self.events_generator.download_started(name, stream_info) - self.analytics_api.track(event) - - def _send_heartbeat(self): - heartbeat = self.events_generator.heartbeat() - self.analytics_api.track(heartbeat) - - def _update_tracked_metrics(self): - value = self.track.summarize(analytics.BLOB_BYTES_UPLOADED) - if value > 0: - event = self.events_generator.metric_observered(analytics.BLOB_BYTES_UPLOADED, value) - self.analytics_api.track(event) +def calculate_available_blob_sizes(blob_manager): + return sum(blob.length for blob in blob_manager.get_all_verified_blobs()) class Daemon(jsonrpc.JSONRPC): @@ -276,7 +244,7 @@ class Daemon(jsonrpc.JSONRPC): self.first_run_after_update = False self.uploaded_temp_files = [] self._session_id = base58.b58encode(generate_id()) - self.analytics_manager = AnalyticsManager() + self.analytics_manager = analytics.Manager() if os.name == "nt": from lbrynet.winhelpers.knownpaths import get_path, FOLDERID, UserHandle @@ -453,7 +421,6 @@ class Daemon(jsonrpc.JSONRPC): ('version_checker', CheckRemoteVersions(self)), ('connection_problem_checker', self._check_connection_problems), ('pending_claim_checker', self._check_pending_claims), - ('send_tracked_metrics', self._send_tracked_metrics), ] for name, fn in looping_calls: self.looping_call_manager.register_looping_call(name, fn) @@ -645,7 +612,6 @@ class Daemon(jsonrpc.JSONRPC): d.addCallback(lambda _: threads.deferToThread(self._setup_data_directory)) d.addCallback(lambda _: self._check_db_migration()) d.addCallback(lambda _: self._get_settings()) - d.addCallback(lambda _: self._set_events()) d.addCallback(lambda _: self._get_session()) d.addCallback(lambda _: add_lbry_file_to_sd_identifier(self.sd_identifier)) d.addCallback(lambda _: self._setup_stream_identifier()) @@ -2876,7 +2842,6 @@ class _GetFileHelper(object): key = self._get_key(lbry_file) full_path = self._full_path(lbry_file) mime_type = mimetypes.guess_type(full_path)[0] - return { 'completed': lbry_file.completed, 'file_name': lbry_file.file_name, @@ -2899,7 +2864,7 @@ class _GetFileHelper(object): 'code': code, 'message': message } - + def _add_metadata(self, message, lbry_file): def _add_to_dict(metadata): message['metadata'] = metadata From 9da1b3c5b7a3c0d893387157583d91f0f91330ff Mon Sep 17 00:00:00 2001 From: Job Evers-Meltzer Date: Sat, 1 Oct 2016 11:19:45 -0500 Subject: [PATCH 15/29] analytics manager bug fix --- lbrynet/analytics/manager.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lbrynet/analytics/manager.py b/lbrynet/analytics/manager.py index 6c8fda620..94c4663f9 100644 --- a/lbrynet/analytics/manager.py +++ b/lbrynet/analytics/manager.py @@ -40,5 +40,5 @@ class Manager(object): def _update_tracked_metrics(self): value = self.track.summarize(constants.BLOB_BYTES_UPLOADED) if value > 0: - event = self.events_generator.metric_observered(analytics.BLOB_BYTES_UPLOADED, value) + event = self.events_generator.metric_observered(constants.BLOB_BYTES_UPLOADED, value) self.analytics_api.track(event) From 4a07b56e8aa0e050f9a380ea83f362132c3ddc27 Mon Sep 17 00:00:00 2001 From: Job Evers-Meltzer Date: Wed, 5 Oct 2016 13:51:26 -0500 Subject: [PATCH 16/29] refactor analytics manager to use dependency injection --- 
lbrynet/analytics/manager.py | 18 +++++------------- lbrynet/lbrynet_daemon/Daemon.py | 14 ++++++++++---- 2 files changed, 15 insertions(+), 17 deletions(-) diff --git a/lbrynet/analytics/manager.py b/lbrynet/analytics/manager.py index 94c4663f9..a01e33982 100644 --- a/lbrynet/analytics/manager.py +++ b/lbrynet/analytics/manager.py @@ -1,25 +1,17 @@ -import base58 - from twisted.internet import task -import api import constants -import events -import track class Manager(object): - def __init__(self): - self.analytics_api = None - self.events_generator = None - self.track = track.Track() + def __init__(self, analytics_api, events_generator, track): + self.analytics_api = analytics_api + self.events_generator = events_generator + self.track = track self.send_heartbeat = task.LoopingCall(self._send_heartbeat) self.update_tracked_metrics = task.LoopingCall(self._update_tracked_metrics) - def start(self, platform, wallet_type, lbry_id, session_id): - context = events.make_context(platform, wallet_type) - self.events_generator = events.Events(context, base58.b58encode(lbry_id), session_id) - self.analytics_api = api.Api.load() + def start(self): self.send_heartbeat.start(60) self.update_tracked_metrics.start(300) diff --git a/lbrynet/lbrynet_daemon/Daemon.py b/lbrynet/lbrynet_daemon/Daemon.py index 71051a225..abcae16af 100644 --- a/lbrynet/lbrynet_daemon/Daemon.py +++ b/lbrynet/lbrynet_daemon/Daemon.py @@ -244,7 +244,6 @@ class Daemon(jsonrpc.JSONRPC): self.first_run_after_update = False self.uploaded_temp_files = [] self._session_id = base58.b58encode(generate_id()) - self.analytics_manager = analytics.Manager() if os.name == "nt": from lbrynet.winhelpers.knownpaths import get_path, FOLDERID, UserHandle @@ -613,6 +612,7 @@ class Daemon(jsonrpc.JSONRPC): d.addCallback(lambda _: self._check_db_migration()) d.addCallback(lambda _: self._get_settings()) d.addCallback(lambda _: self._get_session()) + d.addCallback(lambda _: self._get_analytics()) d.addCallback(lambda _: add_lbry_file_to_sd_identifier(self.sd_identifier)) d.addCallback(lambda _: self._setup_stream_identifier()) d.addCallback(lambda _: self._setup_lbry_file_manager()) @@ -620,9 +620,6 @@ class Daemon(jsonrpc.JSONRPC): d.addCallback(lambda _: self._setup_server()) d.addCallback(lambda _: _log_starting_vals()) d.addCallback(lambda _: _announce_startup()) - d.addCallback( - lambda _: self.analytics_manager.start( - self._get_platform(), self.wallet_type, self.lbryid, self._session_id)) # TODO: handle errors here d.callback(None) @@ -1034,6 +1031,15 @@ class Daemon(jsonrpc.JSONRPC): return d + def _get_analytics(self): + analytics_api = analytics.Api.load() + context = analytics.make_context(self._get_platform(), self.wallet_type) + events_generator = analytics.Events( + context, base58.b58encode(self.lbryid), self._session_id) + self.analytics_manager = analytics.Manager( + analytics_api, events_generator, analytics.Track()) + self.analytics_manager.start() + def _get_session(self): def get_default_data_rate(): d = self.settings.get_default_data_payment_rate() From d5f0001950c49fc4dce9f4decb9878daa1b51e90 Mon Sep 17 00:00:00 2001 From: Job Evers-Meltzer Date: Wed, 5 Oct 2016 14:16:20 -0500 Subject: [PATCH 17/29] small cleanups --- lbrynet/core/utils.py | 13 ++++++++----- 1 file changed, 8 insertions(+), 5 deletions(-) diff --git a/lbrynet/core/utils.py b/lbrynet/core/utils.py index 85af1a7c4..46f39f7dc 100644 --- a/lbrynet/core/utils.py +++ b/lbrynet/core/utils.py @@ -18,13 +18,15 @@ blobhash_length = get_lbry_hash_obj().digest_size * 2 
log = logging.getLogger(__name__) -# defining this here allows for easier overriding in testing +# defining these time functions here allows for easier overriding in testing def now(): return datetime.datetime.now() + def utcnow(): return datetime.datetime.utcnow() + def isonow(): """Return utc now in isoformat with timezone""" return utcnow().isoformat() + 'Z' @@ -42,6 +44,10 @@ def generate_id(num=None): return h.digest() +def is_valid_hashcharacter(char): + return char in "0123456789abcdef" + + def is_valid_blobhash(blobhash): """ @param blobhash: string, the blobhash to check @@ -50,10 +56,7 @@ def is_valid_blobhash(blobhash): """ if len(blobhash) != blobhash_length: return False - for l in blobhash: - if l not in "0123456789abcdef": - return False - return True + return all(is_valid_hashcharacter(l) for l in blobhash) def version_is_greater_than(a, b): From ddb88eb1c903f3a17ebb84011900adc45847c6cb Mon Sep 17 00:00:00 2001 From: Job Evers-Meltzer Date: Wed, 5 Oct 2016 14:16:54 -0500 Subject: [PATCH 18/29] move LoopingCallManager to own module --- lbrynet/core/looping_call_manager.py | 22 ++++++++++++++++++++++ lbrynet/lbrynet_daemon/Daemon.py | 26 ++++---------------------- 2 files changed, 26 insertions(+), 22 deletions(-) create mode 100644 lbrynet/core/looping_call_manager.py diff --git a/lbrynet/core/looping_call_manager.py b/lbrynet/core/looping_call_manager.py new file mode 100644 index 000000000..ac8986d07 --- /dev/null +++ b/lbrynet/core/looping_call_manager.py @@ -0,0 +1,22 @@ + + +class LoopingCallManager(object): + def __init__(self): + self.calls = {} + + def register_looping_call(self, name, call): + assert name not in self.calls, '{} is already registered'.format(name) + self.calls[name] = call + + def start(self, name, *args): + lcall = self.calls[name] + if not lcall.running: + lcall.start(*args) + + def stop(self, name): + self.calls[name].stop() + + def shutdown(self): + for lcall in self.calls.itervalues(): + if lcall.running: + lcall.stop() diff --git a/lbrynet/lbrynet_daemon/Daemon.py b/lbrynet/lbrynet_daemon/Daemon.py index abcae16af..8fc1c1653 100644 --- a/lbrynet/lbrynet_daemon/Daemon.py +++ b/lbrynet/lbrynet_daemon/Daemon.py @@ -28,6 +28,9 @@ from lbrynet import __version__ as lbrynet_version # TODO: importing this when internet is disabled raises a socket.gaierror from lbryum.version import LBRYUM_VERSION as lbryum_version from lbrynet import analytics +from lbrynet.core.looping_call_manager import LoopingCallManager +from lbrynet.core.PaymentRateManager import PaymentRateManager +from lbrynet.core.server.BlobAvailabilityHandler import BlobAvailabilityHandlerFactory from lbrynet.core.server.BlobRequestHandler import BlobRequestHandlerFactory from lbrynet.core.server.ServerProtocol import ServerProtocolFactory from lbrynet.core.Error import UnknownNameError, InsufficientFundsError, InvalidNameError @@ -135,27 +138,6 @@ class Parameters(object): self.__dict__.update(kwargs) -class LoopingCallManager(object): - def __init__(self): - self.calls = {} - - def register_looping_call(self, name, *args): - self.calls[name] = LoopingCall(*args) - - def start(self, name, *args): - lcall = self.calls[name] - if not lcall.running: - lcall.start(*args) - - def stop(self, name): - self.calls[name].stop() - - def shutdown(self): - for lcall in self.calls.itervalues(): - if lcall.running: - lcall.stop() - - class CheckInternetConnection(object): def __init__(self, daemon): self.daemon = daemon @@ -422,7 +404,7 @@ class Daemon(jsonrpc.JSONRPC): ('pending_claim_checker', 
self._check_pending_claims), ] for name, fn in looping_calls: - self.looping_call_manager.register_looping_call(name, fn) + self.looping_call_manager.register_looping_call(name, LoopingCall(fn)) self.sd_identifier = StreamDescriptorIdentifier() self.stream_info_manager = TempEncryptedFileMetadataManager() From 417a8f719fb8fc02f2087f0a63efc5cf5960272b Mon Sep 17 00:00:00 2001 From: Job Evers-Meltzer Date: Wed, 5 Oct 2016 14:38:58 -0500 Subject: [PATCH 19/29] Add looping call manager to analytics manager --- lbrynet/analytics/manager.py | 24 ++++++++++++++++-------- 1 file changed, 16 insertions(+), 8 deletions(-) diff --git a/lbrynet/analytics/manager.py b/lbrynet/analytics/manager.py index a01e33982..60986c6e9 100644 --- a/lbrynet/analytics/manager.py +++ b/lbrynet/analytics/manager.py @@ -1,3 +1,5 @@ +from lbrynet.core import looping_call_manager + from twisted.internet import task import constants @@ -8,18 +10,24 @@ class Manager(object): self.analytics_api = analytics_api self.events_generator = events_generator self.track = track - self.send_heartbeat = task.LoopingCall(self._send_heartbeat) - self.update_tracked_metrics = task.LoopingCall(self._update_tracked_metrics) + self.looping_call_manager = self.setup_looping_calls() + + def setup_looping_calls(self): + call_manager = looping_call_manager.LoopingCallManager() + looping_calls = [ + ('send_heartbeat', self._send_heartbeat), + ('update_tracked_metrics', self._update_tracked_metrics), + ] + for name, fn in looping_calls: + call_manager.register_looping_call(name, task.LoopingCall(fn)) + return call_manager def start(self): - self.send_heartbeat.start(60) - self.update_tracked_metrics.start(300) + self.looping_call_manager.start('send_heartbeat', 60) + self.looping_call_manager.start('update_tracked_metrics', 300) def shutdown(self): - if self.send_heartbeat.running: - self.send_heartbeat.stop() - if self.update_tracked_metrics.running: - self.update_tracked_metrics.stop() + self.looping_call_manager.shutdown() def send_download_started(self, name, stream_info=None): event = self.events_generator.download_started(name, stream_info) From 8b1bb673c16c7503db9082a3afcafccfb832bc39 Mon Sep 17 00:00:00 2001 From: Job Evers-Meltzer Date: Wed, 5 Oct 2016 14:45:17 -0500 Subject: [PATCH 20/29] report available blobs --- lbrynet/analytics/constants.py | 4 +++- lbrynet/analytics/manager.py | 11 +++++++++++ lbrynet/lbrynet_daemon/Daemon.py | 7 ++++++- 3 files changed, 20 insertions(+), 2 deletions(-) diff --git a/lbrynet/analytics/constants.py b/lbrynet/analytics/constants.py index 059efe921..dcc7b92b1 100644 --- a/lbrynet/analytics/constants.py +++ b/lbrynet/analytics/constants.py @@ -1,2 +1,4 @@ -# Constants for metrics +"""Constants for metrics""" + BLOB_BYTES_UPLOADED = 'blob_bytes_uploaded' +BLOB_BYTES_AVAILABLE = 'blob_bytes_available' diff --git a/lbrynet/analytics/manager.py b/lbrynet/analytics/manager.py index 60986c6e9..57d5bea1e 100644 --- a/lbrynet/analytics/manager.py +++ b/lbrynet/analytics/manager.py @@ -33,6 +33,11 @@ class Manager(object): event = self.events_generator.download_started(name, stream_info) self.analytics_api.track(event) + def register_repeating_metric(self, event_name, value_generator, frequency=300): + lcall = task.LoopingCall(self._send_repeating_metric, event_name, value_generator) + self.looping_call_manager.register_looping_call(event_name, lcall) + lcall.start(frequency) + def _send_heartbeat(self): heartbeat = self.events_generator.heartbeat() self.analytics_api.track(heartbeat) @@ -42,3 +47,9 @@ class 
Manager(object): if value > 0: event = self.events_generator.metric_observered(constants.BLOB_BYTES_UPLOADED, value) self.analytics_api.track(event) + + def _send_repeating_metric(self, event_name, value_generator): + should_send, value = value_generator() + if should_send: + event = self.events_generator.metric_observered(event_name, value) + self.analytics_api.track(event) diff --git a/lbrynet/lbrynet_daemon/Daemon.py b/lbrynet/lbrynet_daemon/Daemon.py index 8fc1c1653..87afe9c8a 100644 --- a/lbrynet/lbrynet_daemon/Daemon.py +++ b/lbrynet/lbrynet_daemon/Daemon.py @@ -185,7 +185,7 @@ class CheckRemoteVersions(object): return defer.fail(None) -def calculate_available_blob_sizes(blob_manager): +def calculate_available_blob_size(blob_manager): return sum(blob.length for blob in blob_manager.get_all_verified_blobs()) @@ -1021,6 +1021,11 @@ class Daemon(jsonrpc.JSONRPC): self.analytics_manager = analytics.Manager( analytics_api, events_generator, analytics.Track()) self.analytics_manager.start() + self.register_repeating_metric( + analytics.BLOB_BYTES_AVAILABLE, + lambda: calculate_available_blob_size(self.session.blob_manager), + frequency=300 + ) def _get_session(self): def get_default_data_rate(): From 7167d47631670de28a44289a38a4ec723b9458f1 Mon Sep 17 00:00:00 2001 From: Job Evers-Meltzer Date: Tue, 11 Oct 2016 12:50:44 -0500 Subject: [PATCH 21/29] bug fixes --- lbrynet/analytics/manager.py | 22 +++++++++++++++++----- lbrynet/analytics/track.py | 9 ++++++++- lbrynet/lbrynet_daemon/Daemon.py | 22 +++++++++++++++++++--- 3 files changed, 44 insertions(+), 9 deletions(-) diff --git a/lbrynet/analytics/manager.py b/lbrynet/analytics/manager.py index 57d5bea1e..64c9fb917 100644 --- a/lbrynet/analytics/manager.py +++ b/lbrynet/analytics/manager.py @@ -1,5 +1,6 @@ from lbrynet.core import looping_call_manager +from twisted.internet import defer from twisted.internet import task import constants @@ -43,13 +44,24 @@ class Manager(object): self.analytics_api.track(heartbeat) def _update_tracked_metrics(self): - value = self.track.summarize(constants.BLOB_BYTES_UPLOADED) - if value > 0: - event = self.events_generator.metric_observered(constants.BLOB_BYTES_UPLOADED, value) + should_send, value = self.track.summarize(constants.BLOB_BYTES_UPLOADED) + if should_send: + event = self.events_generator.metric_observed(constants.BLOB_BYTES_UPLOADED, value) self.analytics_api.track(event) def _send_repeating_metric(self, event_name, value_generator): - should_send, value = value_generator() + result = value_generator() + if_deferred(result, self._send_repeating_metric_value, event_name) + + def _send_repeating_metric_value(self, result, event_name): + should_send, value = result if should_send: - event = self.events_generator.metric_observered(event_name, value) + event = self.events_generator.metric_observed(event_name, value) self.analytics_api.track(event) + + +def if_deferred(maybe_deferred, callback, *args, **kwargs): + if isinstance(maybe_deferred, defer.Deferred): + maybe_deferred.addCallback(callback, *args, **kwargs) + else: + callback(mabye_deferred, *args, **kwargs) diff --git a/lbrynet/analytics/track.py b/lbrynet/analytics/track.py index c14952bf8..35dc27160 100644 --- a/lbrynet/analytics/track.py +++ b/lbrynet/analytics/track.py @@ -13,5 +13,12 @@ class Track(object): """Apply `op` on the current values for `metric`. This operation also resets the metric. 
+ + Returns: + a tuple (should_send, value) """ - return op(self.data.pop(metric, [])) + try: + values = self.data.pop(metric) + return True, op(values) + except KeyError: + return False, None diff --git a/lbrynet/lbrynet_daemon/Daemon.py b/lbrynet/lbrynet_daemon/Daemon.py index 87afe9c8a..8d67f9ae4 100644 --- a/lbrynet/lbrynet_daemon/Daemon.py +++ b/lbrynet/lbrynet_daemon/Daemon.py @@ -185,8 +185,24 @@ class CheckRemoteVersions(object): return defer.fail(None) +class AlwaysSend(object): + def __init__(self, value_generator, *args, **kwargs): + self.value_generator = value_generator + self.args = args + self.kwargs = kwargs + + def __call__(self): + d = defer.maybeDeferred(self.value_generator, *self.args, **self.kwargs) + d.addCallback(lambda v: (True, v)) + return d + + def calculate_available_blob_size(blob_manager): - return sum(blob.length for blob in blob_manager.get_all_verified_blobs()) + d = blob_manager.get_all_verified_blobs() + d.addCallback( + lambda blobs: defer.DeferredList([blob_manager.get_blob_length(b) for b in blobs])) + d.addCallback(lambda blob_lengths: sum(val for success, val in blob_lengths if success)) + return d class Daemon(jsonrpc.JSONRPC): @@ -1021,9 +1037,9 @@ class Daemon(jsonrpc.JSONRPC): self.analytics_manager = analytics.Manager( analytics_api, events_generator, analytics.Track()) self.analytics_manager.start() - self.register_repeating_metric( + self.analytics_manager.register_repeating_metric( analytics.BLOB_BYTES_AVAILABLE, - lambda: calculate_available_blob_size(self.session.blob_manager), + AlwaysSend(calculate_available_blob_size, self.session.blob_manager), frequency=300 ) From fbf7928412daee4068b035b650e0e2a425808dd7 Mon Sep 17 00:00:00 2001 From: Job Evers-Meltzer Date: Tue, 11 Oct 2016 15:11:59 -0500 Subject: [PATCH 22/29] rename events to be seperate for each metric --- lbrynet/analytics/constants.py | 4 ++-- lbrynet/analytics/events.py | 5 ++--- 2 files changed, 4 insertions(+), 5 deletions(-) diff --git a/lbrynet/analytics/constants.py b/lbrynet/analytics/constants.py index dcc7b92b1..280b51a72 100644 --- a/lbrynet/analytics/constants.py +++ b/lbrynet/analytics/constants.py @@ -1,4 +1,4 @@ """Constants for metrics""" -BLOB_BYTES_UPLOADED = 'blob_bytes_uploaded' -BLOB_BYTES_AVAILABLE = 'blob_bytes_available' +BLOB_BYTES_UPLOADED = 'Blob Bytes Uploaded' +BLOB_BYTES_AVAILABLE = 'Blob Bytes Available' diff --git a/lbrynet/analytics/events.py b/lbrynet/analytics/events.py index 1a72fbaaa..3b2e879be 100644 --- a/lbrynet/analytics/events.py +++ b/lbrynet/analytics/events.py @@ -34,10 +34,9 @@ class Events(object): def metric_observed(self, metric_name, value): properties = { - 'metric_name': metric_name, - 'metric_value': value, + 'value': value, } - return self._event('Metric Observed', properties) + return self._event(metric_name, properties) def _event(self, event, event_properties=None): return { From 813267ae18fec3d80a5d96041addb6d041961f39 Mon Sep 17 00:00:00 2001 From: Job Evers-Meltzer Date: Tue, 11 Oct 2016 15:12:10 -0500 Subject: [PATCH 23/29] fix tests --- tests/unit/analytics/test_track.py | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/tests/unit/analytics/test_track.py b/tests/unit/analytics/test_track.py index e12c3d7da..fcfbe0346 100644 --- a/tests/unit/analytics/test_track.py +++ b/tests/unit/analytics/test_track.py @@ -4,17 +4,17 @@ from twisted.trial import unittest class TrackTest(unittest.TestCase): - def test_empty_summarize_is_zero(self): + def test_empty_summarize_is_None(self): track = 
analytics.Track() - result = track.summarize('a') - self.assertEqual(0, result) + _, result = track.summarize('a') + self.assertEqual(None, result) def test_can_get_sum_of_metric(self): track = analytics.Track() track.add_observation('b', 1) track.add_observation('b', 2) - result = track.summarize('b') + _, result = track.summarize('b') self.assertEqual(3, result) def test_summarize_resets_metric(self): @@ -23,5 +23,5 @@ class TrackTest(unittest.TestCase): track.add_observation('metric', 2) track.summarize('metric') - result = track.summarize('metric') - self.assertEqual(0, result) + _, result = track.summarize('metric') + self.assertEqual(None, result) From aa291ca79a7094495f42297920f1e8026f5d6572 Mon Sep 17 00:00:00 2001 From: Job Evers-Meltzer Date: Tue, 11 Oct 2016 15:14:35 -0500 Subject: [PATCH 24/29] fix pylint --- lbrynet/analytics/manager.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lbrynet/analytics/manager.py b/lbrynet/analytics/manager.py index 64c9fb917..455342946 100644 --- a/lbrynet/analytics/manager.py +++ b/lbrynet/analytics/manager.py @@ -64,4 +64,4 @@ def if_deferred(maybe_deferred, callback, *args, **kwargs): if isinstance(maybe_deferred, defer.Deferred): maybe_deferred.addCallback(callback, *args, **kwargs) else: - callback(mabye_deferred, *args, **kwargs) + callback(maybe_deferred, *args, **kwargs) From 7f800ce5bb8ac12ff34db3a55c2e7ff0b81f79e2 Mon Sep 17 00:00:00 2001 From: Job Evers-Meltzer Date: Tue, 18 Oct 2016 09:40:13 -0500 Subject: [PATCH 25/29] fix rebase error --- lbrynet/lbrynet_daemon/Daemon.py | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) diff --git a/lbrynet/lbrynet_daemon/Daemon.py b/lbrynet/lbrynet_daemon/Daemon.py index 8d67f9ae4..853170a4a 100644 --- a/lbrynet/lbrynet_daemon/Daemon.py +++ b/lbrynet/lbrynet_daemon/Daemon.py @@ -29,8 +29,6 @@ from lbrynet import __version__ as lbrynet_version from lbryum.version import LBRYUM_VERSION as lbryum_version from lbrynet import analytics from lbrynet.core.looping_call_manager import LoopingCallManager -from lbrynet.core.PaymentRateManager import PaymentRateManager -from lbrynet.core.server.BlobAvailabilityHandler import BlobAvailabilityHandlerFactory from lbrynet.core.server.BlobRequestHandler import BlobRequestHandlerFactory from lbrynet.core.server.ServerProtocol import ServerProtocolFactory from lbrynet.core.Error import UnknownNameError, InsufficientFundsError, InvalidNameError @@ -782,8 +780,12 @@ class Daemon(jsonrpc.JSONRPC): def _setup_query_handlers(self): handlers = [ - BlobRequestHandlerFactory(self.session.blob_manager, self.session.wallet, - self.session.payment_rate_manager), + BlobRequestHandlerFactory( + self.session.blob_manager, + self.session.wallet, + self.session.payment_rate_manager, + self.analytics_manager.track + ), self.session.wallet.get_wallet_info_query_handler_factory(), ] From 229ed0d7dc094c417ce755be84af1d97b36d2757 Mon Sep 17 00:00:00 2001 From: Job Evers-Meltzer Date: Tue, 18 Oct 2016 16:49:31 -0500 Subject: [PATCH 26/29] rename summarize to summarize_and_reset --- lbrynet/analytics/manager.py | 2 +- lbrynet/analytics/track.py | 2 +- tests/unit/analytics/test_track.py | 8 ++++---- 3 files changed, 6 insertions(+), 6 deletions(-) diff --git a/lbrynet/analytics/manager.py b/lbrynet/analytics/manager.py index 455342946..fecc1a0d2 100644 --- a/lbrynet/analytics/manager.py +++ b/lbrynet/analytics/manager.py @@ -44,7 +44,7 @@ class Manager(object): self.analytics_api.track(heartbeat) def _update_tracked_metrics(self): - should_send, value = 
self.track.summarize(constants.BLOB_BYTES_UPLOADED) + should_send, value = self.track.summarize_and_reset(constants.BLOB_BYTES_UPLOADED) if should_send: event = self.events_generator.metric_observed(constants.BLOB_BYTES_UPLOADED, value) self.analytics_api.track(event) diff --git a/lbrynet/analytics/track.py b/lbrynet/analytics/track.py index 35dc27160..7643ebce9 100644 --- a/lbrynet/analytics/track.py +++ b/lbrynet/analytics/track.py @@ -9,7 +9,7 @@ class Track(object): def add_observation(self, metric, value): self.data[metric].append(value) - def summarize(self, metric, op=sum): + def summarize_and_reset(self, metric, op=sum): """Apply `op` on the current values for `metric`. This operation also resets the metric. diff --git a/tests/unit/analytics/test_track.py b/tests/unit/analytics/test_track.py index fcfbe0346..531ec56a5 100644 --- a/tests/unit/analytics/test_track.py +++ b/tests/unit/analytics/test_track.py @@ -6,7 +6,7 @@ from twisted.trial import unittest class TrackTest(unittest.TestCase): def test_empty_summarize_is_None(self): track = analytics.Track() - _, result = track.summarize('a') + _, result = track.summarize_and_reset('a') self.assertEqual(None, result) def test_can_get_sum_of_metric(self): @@ -14,7 +14,7 @@ class TrackTest(unittest.TestCase): track.add_observation('b', 1) track.add_observation('b', 2) - _, result = track.summarize('b') + _, result = track.summarize_and_reset('b') self.assertEqual(3, result) def test_summarize_resets_metric(self): @@ -22,6 +22,6 @@ class TrackTest(unittest.TestCase): track.add_observation('metric', 1) track.add_observation('metric', 2) - track.summarize('metric') - _, result = track.summarize('metric') + track.summarize_and_reset('metric') + _, result = track.summarize_and_reset('metric') self.assertEqual(None, result) From 01811621a6c47de5fbd141c9737fc92a5fc32f3e Mon Sep 17 00:00:00 2001 From: Job Evers-Meltzer Date: Tue, 18 Oct 2016 17:25:16 -0500 Subject: [PATCH 27/29] shorten is_valid_blobhash logic --- lbrynet/core/utils.py | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/lbrynet/core/utils.py b/lbrynet/core/utils.py index 46f39f7dc..3efc1314d 100644 --- a/lbrynet/core/utils.py +++ b/lbrynet/core/utils.py @@ -49,14 +49,14 @@ def is_valid_hashcharacter(char): def is_valid_blobhash(blobhash): - """ + """Checks whether the blobhash is the correct length and contains only + valid characters (0-9, a-f) + @param blobhash: string, the blobhash to check - @return: Whether the blobhash is the correct length and contains only valid characters (0-9, a-f) + @return: True/False """ - if len(blobhash) != blobhash_length: - return False - return all(is_valid_hashcharacter(l) for l in blobhash) + return len(blobhash) == blobhash_length and all(is_valid_hashcharacter(l) for l in blobhash) def version_is_greater_than(a, b): From 0c2265a8dac6793366a26ab97a8bd26c0255ada4 Mon Sep 17 00:00:00 2001 From: Job Evers-Meltzer Date: Tue, 18 Oct 2016 18:09:35 -0500 Subject: [PATCH 28/29] replace some strings --- lbrynet/core/looping_call_manager.py | 6 +-- lbrynet/lbrynet_daemon/Daemon.py | 58 ++++++++++++++++++---------- 2 files changed, 39 insertions(+), 25 deletions(-) diff --git a/lbrynet/core/looping_call_manager.py b/lbrynet/core/looping_call_manager.py index ac8986d07..7dbc9e022 100644 --- a/lbrynet/core/looping_call_manager.py +++ b/lbrynet/core/looping_call_manager.py @@ -1,8 +1,6 @@ - - class LoopingCallManager(object): - def __init__(self): - self.calls = {} + def __init__(self, calls=None): + self.calls = calls or {} def 
register_looping_call(self, name, call): assert name not in self.calls, '{} is already registered'.format(name) diff --git a/lbrynet/lbrynet_daemon/Daemon.py b/lbrynet/lbrynet_daemon/Daemon.py index 853170a4a..b6abf790d 100644 --- a/lbrynet/lbrynet_daemon/Daemon.py +++ b/lbrynet/lbrynet_daemon/Daemon.py @@ -127,6 +127,22 @@ BAD_REQUEST = 400 NOT_FOUND = 404 OK_CODE = 200 + +class Checker: + """The looping calls the daemon runs""" + INTERNET_CONNECTION = 'internet_connection_checker' + VERSION = 'version_checker' + CONNECTION_PROBLEM = 'connection_problem_checker' + PENDING_CLAIM = 'pending_claim_checker' + + +class FileID: + """The different ways a file can be identified""" + NAME = 'name' + SD_HASH = 'sd_hash' + FILE_NAME = 'file_name' + + # TODO add login credentials in a conf file # TODO alert if your copy of a lbry file is out of date with the name record @@ -195,6 +211,8 @@ class AlwaysSend(object): return d + + def calculate_available_blob_size(blob_manager): d = blob_manager.get_all_verified_blobs() d.addCallback( @@ -410,15 +428,13 @@ class Daemon(jsonrpc.JSONRPC): self.wallet_user = None self.wallet_password = None - self.looping_call_manager = LoopingCallManager() - looping_calls = [ - ('internet_connection_checker', CheckInternetConnection(self)), - ('version_checker', CheckRemoteVersions(self)), - ('connection_problem_checker', self._check_connection_problems), - ('pending_claim_checker', self._check_pending_claims), - ] - for name, fn in looping_calls: - self.looping_call_manager.register_looping_call(name, LoopingCall(fn)) + calls = { + Checker.INTERNET_CONNECTION: LoopingCall(CheckInternetConnection(self)), + Checker.VERSION: LoopingCall(CheckRemoteVersions(self)), + Checker.CONNECTION_PROBLEM: LoopingCall(self._check_connection_problems), + Checker.PENDING_CLAIM: LoopingCall(self._check_pending_claims), + } + self.looping_call_manager = LoopingCallManager(calls) self.sd_identifier = StreamDescriptorIdentifier() self.stream_info_manager = TempEncryptedFileMetadataManager() @@ -590,9 +606,9 @@ class Daemon(jsonrpc.JSONRPC): log.info("Starting lbrynet-daemon") - self.looping_call_manager.start('internet_connection_checker', 3600) - self.looping_call_manager.start('version_checker', 3600 * 12) - self.looping_call_manager.start('connection_problem_checker', 1) + self.looping_call_manager.start(Checker.INTERNET_CONNECTION, 3600) + self.looping_call_manager.start(Checker.VERSION, 3600 * 12) + self.looping_call_manager.start(Checker.CONNECTION_PROBLEM, 1) self.exchange_rate_manager.start() if host_ui: @@ -686,7 +702,7 @@ class Daemon(jsonrpc.JSONRPC): def _get_and_start_file(name): d = defer.succeed(self.pending_claims.pop(name)) - d.addCallback(lambda _: self._get_lbry_file("name", name, return_json=False)) + d.addCallback(lambda _: self._get_lbry_file(FileID.NAME, name, return_json=False)) d.addCallback(lambda l: _start_file(l) if l.stopped else "LBRY file was already running") def re_add_to_pending_claims(name): @@ -1275,7 +1291,7 @@ class Daemon(jsonrpc.JSONRPC): return defer.fail(Exception("no lbry file given to reflect")) stream_hash = lbry_file.stream_hash - + if stream_hash is None: return defer.fail(Exception("no stream hash")) @@ -1939,7 +1955,7 @@ class Daemon(jsonrpc.JSONRPC): if not os.path.isfile(file_path): return defer.fail(Exception("Specified file for publish doesnt exist: %s" % file_path)) - self.looping_call_manager.start('pending_claim_checker', 30) + self.looping_call_manager.start(Checker.PENDING_CLAIM, 30) d = self._resolve_name(name, force_refresh=True) 
d.addErrback(lambda _: None) @@ -2784,11 +2800,11 @@ class _GetFileHelper(object): return d def search_for_file(self): - if self.search_by == "name": + if self.search_by == FileID.NAME: return self.daemon._get_lbry_file_by_uri(self.val) - elif self.search_by == "sd_hash": + elif self.search_by == FileID.SD_HASH: return self.daemon._get_lbry_file_by_sd_hash(self.val) - elif self.search_by == "file_name": + elif self.search_by == FileID.FILE_NAME: return self.daemon._get_lbry_file_by_file_name(self.val) raise Exception('{} is not a valid search operation'.format(self.search_by)) @@ -2814,7 +2830,7 @@ class _GetFileHelper(object): d = defer.succeed( self._get_properties_dict(lbry_file, code, message, written_bytes, size)) return d - + def _get_msg_for_file_status(self, file_status): message = STREAM_STAGES[2][1] % ( file_status.name, file_status.num_completed, file_status.num_known, @@ -2828,7 +2844,7 @@ class _GetFileHelper(object): return os.path.join(lbry_file.download_directory, lbry_file.file_name) def _get_status(self, lbry_file): - if self.search_by == "name": + if self.search_by == FileID.NAME: if self.val in self.daemon.streams.keys(): status = self.daemon.streams[self.val].code elif lbry_file in self.daemon.lbry_file_manager.lbry_files: @@ -2848,7 +2864,7 @@ class _GetFileHelper(object): else: written_bytes = False return written_bytes - + def _get_properties_dict(self, lbry_file, code, message, written_bytes, size): key = self._get_key(lbry_file) full_path = self._full_path(lbry_file) From a5797cdeb31fa1d8840c6b68443b648a5999815e Mon Sep 17 00:00:00 2001 From: Job Evers-Meltzer Date: Thu, 20 Oct 2016 12:52:37 -0700 Subject: [PATCH 29/29] Better handling of lbry file search - replace lbry file search strings with constants - refactor repeated code for deciding what search type to use --- lbrynet/lbrynet_daemon/Daemon.py | 123 +++++++++++++++++----------- lbrynet/lbrynet_daemon/DaemonCLI.py | 14 ++-- 2 files changed, 85 insertions(+), 52 deletions(-) diff --git a/lbrynet/lbrynet_daemon/Daemon.py b/lbrynet/lbrynet_daemon/Daemon.py index b6abf790d..a9fb10f0f 100644 --- a/lbrynet/lbrynet_daemon/Daemon.py +++ b/lbrynet/lbrynet_daemon/Daemon.py @@ -147,6 +147,10 @@ class FileID: # TODO alert if your copy of a lbry file is out of date with the name record +class NoValidSearch(Exception): + pass + + class Parameters(object): def __init__(self, **kwargs): self.__dict__.update(kwargs) @@ -1283,7 +1287,10 @@ class Daemon(jsonrpc.JSONRPC): return _GetFileHelper(self, search_by, val, return_json).retrieve_file() def _get_lbry_files(self): - d = defer.DeferredList([self._get_lbry_file('sd_hash', l.sd_hash) for l in self.lbry_file_manager.lbry_files]) + d = defer.DeferredList([ + self._get_lbry_file(FileID.SD_HASH, l.sd_hash) + for l in self.lbry_file_manager.lbry_files + ]) return d def _reflect(self, lbry_file): @@ -1633,8 +1640,7 @@ class Daemon(jsonrpc.JSONRPC): return d def jsonrpc_get_lbry_file(self, p): - """ - Get lbry file + """Get lbry file Args: 'name': get file by lbry uri, @@ -1652,15 +1658,18 @@ class Daemon(jsonrpc.JSONRPC): 'upload_allowed': bool 'sd_hash': string """ - - if p.keys()[0] in ['name', 'sd_hash', 'file_name']: - search_type = p.keys()[0] - d = self._get_lbry_file(search_type, p[search_type]) - else: - d = defer.fail() + d = self._get_deferred_for_lbry_file(p) d.addCallback(lambda r: self._render_response(r, OK_CODE)) return d + def _get_deferred_for_lbry_file(self, p): + try: + searchtype, value = get_lbry_file_search_value(p) + except NoValidSearch: + return 
defer.fail() + else: + return self._get_lbry_file(searchtype, value) + def jsonrpc_resolve_name(self, p): """ Resolve stream info from a LBRY uri @@ -1673,9 +1682,8 @@ class Daemon(jsonrpc.JSONRPC): force = p.get('force', False) - if 'name' in p: - name = p['name'] - else: + name = p.get(FileID.NAME) + if not name: return self._render_response(None, BAD_REQUEST) d = self._resolve_name(name, force_refresh=force) @@ -1692,7 +1700,7 @@ class Daemon(jsonrpc.JSONRPC): claim info, False if no such claim exists """ - name = p['name'] + name = p[FileID.NAME] d = self.session.wallet.get_my_claim(name) d.addCallback(lambda r: self._render_response(r, OK_CODE)) return d @@ -1714,7 +1722,7 @@ class Daemon(jsonrpc.JSONRPC): r['amount'] = float(r['amount']) / 10**8 return r - name = p['name'] + name = p[FileID.NAME] txid = p.get('txid', None) d = self.session.wallet.get_claim_info(name, txid) d.addCallback(_convert_amount_to_float) @@ -1727,11 +1735,11 @@ class Daemon(jsonrpc.JSONRPC): # can spec what parameters it expects and how to set default values timeout = p.get('timeout', self.download_timeout) download_directory = p.get('download_directory', self.download_directory) - file_name = p.get('file_name') + file_name = p.get(FileID.FILE_NAME) stream_info = p.get('stream_info') sd_hash = get_sd_hash(stream_info) wait_for_write = p.get('wait_for_write', True) - name = p.get('name') + name = p.get(FileID.NAME) return Parameters( timeout=timeout, download_directory=download_directory, @@ -1785,14 +1793,20 @@ class Daemon(jsonrpc.JSONRPC): """ def _stop_file(f): - d = self.lbry_file_manager.toggle_lbry_file_running(f) - d.addCallback(lambda _: "Stopped LBRY file") - return d + if f.stopped: + return "LBRY file wasn't running" + else: + d = self.lbry_file_manager.toggle_lbry_file_running(f) + d.addCallback(lambda _: "Stopped LBRY file") + return d - if p.keys()[0] in ['name', 'sd_hash', 'file_name']: - search_type = p.keys()[0] - d = self._get_lbry_file(search_type, p[search_type], return_json=False) - d.addCallback(lambda l: _stop_file(l) if not l.stopped else "LBRY file wasn't running") + try: + searchtype, value = get_lbry_file_search_value(p) + except NoValidSearch: + d = defer.fail() + else: + d = self._get_lbry_file(searchtype, value, return_json=False) + d.addCallback(_stop_file) d.addCallback(lambda r: self._render_response(r, OK_CODE)) return d @@ -1810,13 +1824,19 @@ class Daemon(jsonrpc.JSONRPC): """ def _start_file(f): - d = self.lbry_file_manager.toggle_lbry_file_running(f) - return defer.succeed("Started LBRY file") + if f.stopped: + d = self.lbry_file_manager.toggle_lbry_file_running(f) + return defer.succeed("Started LBRY file") + else: + return "LBRY file was already running" - if p.keys()[0] in ['name', 'sd_hash', 'file_name']: - search_type = p.keys()[0] - d = self._get_lbry_file(search_type, p[search_type], return_json=False) - d.addCallback(lambda l: _start_file(l) if l.stopped else "LBRY file was already running") + try: + searchtype, value = get_lbry_file_search_value(p) + except NoValidSearch: + d = defer.fail() + else: + d = self._get_lbry_file(searchtype, value, return_json=False) + d.addCallback(_start_file) d.addCallback(lambda r: self._render_response(r, OK_CODE)) return d @@ -1831,7 +1851,7 @@ class Daemon(jsonrpc.JSONRPC): estimated cost """ - name = p['name'] + name = p[FileID.NAME] d = self._get_est_cost(name) d.addCallback(lambda r: self._render_response(r, OK_CODE)) @@ -1883,21 +1903,23 @@ class Daemon(jsonrpc.JSONRPC): confirmation message """ - if 'delete_target_file' in 
p.keys(): - delete_file = p['delete_target_file'] - else: - delete_file = True + delete_file = p.get('delete_target_file', True) def _delete_file(f): + if not f: + return False file_name = f.file_name d = self._delete_lbry_file(f, delete_file=delete_file) d.addCallback(lambda _: "Deleted LBRY file" + file_name) return d - if 'name' in p.keys() or 'sd_hash' in p.keys() or 'file_name' in p.keys(): - search_type = [k for k in p.keys() if k != 'delete_target_file'][0] - d = self._get_lbry_file(search_type, p[search_type], return_json=False) - d.addCallback(lambda l: _delete_file(l) if l else False) + try: + searchtype, value = get_lbry_file_search_value(p) + except NoValidSearch: + d = defer.fail() + else: + d = self._get_lbry_file(searchtype, value, return_json=False) + d.addCallback(_delete_file) d.addCallback(lambda r: self._render_response(r, OK_CODE)) return d @@ -1922,12 +1944,12 @@ class Daemon(jsonrpc.JSONRPC): return m def _reflect_if_possible(sd_hash, txid): - d = self._get_lbry_file('sd_hash', sd_hash, return_json=False) + d = self._get_lbry_file(FileID.SD_HASH, sd_hash, return_json=False) d.addCallback(self._reflect) d.addCallback(lambda _: txid) return d - name = p['name'] + name = p[FileID.NAME] log.info("Publish: ") log.info(p) @@ -2033,7 +2055,7 @@ class Daemon(jsonrpc.JSONRPC): txid """ - name = p['name'] + name = p[FileID.NAME] claim_id = p['claim_id'] amount = p['amount'] d = self.session.wallet.support_claim(name, claim_id, amount) @@ -2073,7 +2095,7 @@ class Daemon(jsonrpc.JSONRPC): list of name claims """ - name = p['name'] + name = p[FileID.NAME] d = self.session.wallet.get_claims_for_name(name) d.addCallback(lambda r: self._render_response(r, OK_CODE)) return d @@ -2270,7 +2292,7 @@ class Daemon(jsonrpc.JSONRPC): Returns sd blob, dict """ - sd_hash = p['sd_hash'] + sd_hash = p[FileID.SD_HASH] timeout = p.get('timeout', DEFAULT_SD_DOWNLOAD_TIMEOUT) d = self._download_sd_blob(sd_hash, timeout) @@ -2464,8 +2486,8 @@ class Daemon(jsonrpc.JSONRPC): True or traceback """ - sd_hash = p['sd_hash'] - d = self._get_lbry_file('sd_hash', sd_hash, return_json=False) + sd_hash = p[FileID.SD_HASH] + d = self._get_lbry_file(FileID.SD_HASH, sd_hash, return_json=False) d.addCallback(self._reflect) d.addCallbacks(lambda _: self._render_response(True, OK_CODE), lambda err: self._render_response(err.getTraceback(), OK_CODE)) return d @@ -2546,7 +2568,7 @@ class Daemon(jsonrpc.JSONRPC): else: return 0.0 - name = p['name'] + name = p[FileID.NAME] d = self._resolve_name(name, force_refresh=True) d.addCallback(get_sd_hash) @@ -2567,7 +2589,7 @@ def get_lbryum_version_from_github(): version = version.replace("'", "") return version - + def get_lbrynet_version_from_github(): """Return the latest released version from github.""" response = requests.get('https://api.github.com/repos/lbryio/lbry/releases/latest') @@ -2904,3 +2926,10 @@ class _GetFileHelper(object): d = defer.succeed(message) return d + +def get_lbry_file_search_value(p): + for searchtype in (FileID.SD_HASH, FileID.NAME, FileID.FILE_NAME): + value = p.get(searchtype) + if value: + return searchtype, value + raise NoValidSearch() diff --git a/lbrynet/lbrynet_daemon/DaemonCLI.py b/lbrynet/lbrynet_daemon/DaemonCLI.py index ea4f2234d..8292e349f 100644 --- a/lbrynet/lbrynet_daemon/DaemonCLI.py +++ b/lbrynet/lbrynet_daemon/DaemonCLI.py @@ -72,12 +72,16 @@ def main(): if meth in api.help(): try: if params: - r = api.call(meth, params) + resp = api.call(meth, params) else: - r = api.call(meth) - print json.dumps(r, sort_keys=True) - except: 
- print "Something went wrong, here's the usage for %s:" % meth + resp = api.call(meth) + print json.dumps(resp, sort_keys=True) + except Exception: + # TODO: The api should return proper error codes + # and messages so that they can be passed along to the user + # instead of this generic message. + # https://app.asana.com/0/158602294500137/200173944358192 + print "Something went wrong. Here's the usage for {}:".format(meth) print api.help({'function': meth}) else: print "Unknown function"
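
A minimal usage sketch of the LoopingCallManager extracted in PATCH 18, assuming only the interface shown in that diff; the check_something callable and the 3600-second interval are illustrative stand-ins, not code from the series:

# Sketch: register a call once under a name, start it with the interval that is
# forwarded to LoopingCall.start, and let shutdown() stop whatever is still running.
from twisted.internet.task import LoopingCall

from lbrynet.core.looping_call_manager import LoopingCallManager


def check_something():
    pass  # illustrative stand-in for a real periodic check


manager = LoopingCallManager()
manager.register_looping_call('something_checker', LoopingCall(check_something))
manager.start('something_checker', 3600)  # seconds between runs; a no-op if already running
# ... later, e.g. in the daemon's shutdown path ...
manager.shutdown()  # stops every registered call that is still running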
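
And a sketch of how the dependency-injected analytics pieces are wired together at the end of the series, mirroring Daemon._get_analytics() plus the repeating-metric registration from PATCHes 20 and 21. The setup_analytics helper and its arguments (platform_info, wallet_type, lbry_id, session_id, blob_manager) are hypothetical stand-ins for the daemon attributes used in the real code, and the imports assume the modules as they stand after PATCH 29:

# Sketch of the analytics wiring; not part of the patches themselves.
import base58

from lbrynet import analytics
from lbrynet.lbrynet_daemon.Daemon import AlwaysSend, calculate_available_blob_size


def setup_analytics(platform_info, wallet_type, lbry_id, session_id, blob_manager):
    analytics_api = analytics.Api.load()
    context = analytics.make_context(platform_info, wallet_type)
    events_generator = analytics.Events(context, base58.b58encode(lbry_id), session_id)
    manager = analytics.Manager(analytics_api, events_generator, analytics.Track())
    manager.start()  # kicks off the heartbeat (60s) and tracked-metric (300s) loops
    manager.register_repeating_metric(
        analytics.BLOB_BYTES_AVAILABLE,
        # AlwaysSend wraps the deferred-returning size calculation into the
        # (should_send, value) shape that _send_repeating_metric expects
        AlwaysSend(calculate_available_blob_size, blob_manager),
        frequency=300,
    )
    return manager

Because Manager now receives its Api, Events and Track collaborators instead of constructing them, callers (and tests) can hand it fakes, which appears to be the point of the PATCH 16 refactor.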
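
Finally, a sketch of the lbry file search resolution added in PATCH 29; the params dict is illustrative. Note that sd_hash takes precedence over name, which takes precedence over file_name:

# Sketch: resolve which identifier a jsonrpc params dict searches by.
from lbrynet.lbrynet_daemon.Daemon import FileID, NoValidSearch, get_lbry_file_search_value

params = {'name': 'wonderfullife'}  # as sent by a jsonrpc caller

try:
    searchtype, value = get_lbry_file_search_value(params)
except NoValidSearch:
    searchtype, value = None, None

assert searchtype == FileID.NAME and value == 'wonderfullife'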