From ccfb1f392180d922e2c9ce1415d008f2e5976a8d Mon Sep 17 00:00:00 2001 From: Job Evers-Meltzer Date: Thu, 29 Dec 2016 21:48:12 -0600 Subject: [PATCH 01/11] refactor: name private/public variables --- lbrynet/lbrynet_daemon/Daemon.py | 25 +++++++++++-------------- lbrynet/lbrynet_daemon/Downloader.py | 20 ++++++++++---------- 2 files changed, 21 insertions(+), 24 deletions(-) diff --git a/lbrynet/lbrynet_daemon/Daemon.py b/lbrynet/lbrynet_daemon/Daemon.py index 0bbf63b4e..5eac402a7 100644 --- a/lbrynet/lbrynet_daemon/Daemon.py +++ b/lbrynet/lbrynet_daemon/Daemon.py @@ -798,10 +798,10 @@ class Daemon(AuthJSONRPCServer): d = self._resolve_name(name) else: d = defer.succeed(stream_info) - d.addCallback(helper._setup_stream) + d.addCallback(helper.setup_stream) d.addCallback(helper.wait_or_get_stream) if not stream_info: - d.addCallback(helper._remove_from_wait) + d.addCallback(helper.remove_from_wait) return d def add_stream(self, name, timeout, download_directory, file_name, stream_info): @@ -2342,7 +2342,7 @@ class _DownloadNameHelper(object): self.file_name = file_name self.wait_for_write = wait_for_write - def _setup_stream(self, stream_info): + def setup_stream(self, stream_info): stream_hash = get_sd_hash(stream_info) d = self.daemon._get_lbry_file_by_sd_hash(stream_hash) d.addCallback(self._prepend_stream_info, stream_info) @@ -2383,7 +2383,7 @@ class _DownloadNameHelper(object): if stream: return stream.downloader else: - self._remove_from_wait("Timed out") + self.remove_from_wait("Timed out") return defer.fail(Exception("Timed out")) d.addCallback(lambda _: _get_stream_for_return()) @@ -2391,27 +2391,24 @@ class _DownloadNameHelper(object): def _wait_for_write(self): d = defer.succeed(None) - if not self.has_downloader_wrote(): + if not self._has_downloader_wrote(): d.addCallback(lambda _: reactor.callLater(1, self._wait_for_write)) return d - def has_downloader_wrote(self): + def _has_downloader_wrote(self): stream = self.daemon.streams.get(self.name, False) if stream: - downloader = stream.downloader + return self._get_written_bytes(stream.downloader.file_name) else: - downloader = False - if not downloader: return False - return self.get_written_bytes(downloader.file_name) def _wait_on_lbry_file(self, f): - written_bytes = self.get_written_bytes(f.file_name) + written_bytes = self._get_written_bytes(f.file_name) if written_bytes: return defer.succeed(self._disp_file(f)) return task.deferLater(reactor, 1, self._wait_on_lbry_file, f) - def get_written_bytes(self, file_name): + def _get_written_bytes(self, file_name): """Returns the number of bytes written to `file_name`. Returns False if there were issues reading `file_name`. @@ -2434,10 +2431,10 @@ class _DownloadNameHelper(object): log.info("Already downloaded: %s --> %s", f.sd_hash, file_path) return f - def _remove_from_wait(self, r): + def remove_from_wait(self, reason): if self.name in self.daemon.waiting_on: del self.daemon.waiting_on[self.name] - return r + return reason class _ResolveNameHelper(object): diff --git a/lbrynet/lbrynet_daemon/Downloader.py b/lbrynet/lbrynet_daemon/Downloader.py index e4ec76a3a..3f59e175e 100644 --- a/lbrynet/lbrynet_daemon/Downloader.py +++ b/lbrynet/lbrynet_daemon/Downloader.py @@ -50,7 +50,7 @@ class GetStream(object): self.max_key_fee = max_key_fee self.stream_info = None self.stream_info_manager = None - self.d = defer.Deferred(None) + self._d = defer.Deferred(None) self.timeout = timeout self.timeout_counter = 0 self.download_directory = download_directory @@ -71,7 +71,7 @@ class GetStream(object): elif self.timeout_counter >= self.timeout: log.info("Timeout downloading lbry://%s" % self.resolved_name) self.checker.stop() - self.d.cancel() + self._d.cancel() self.code = STREAM_STAGES[4] self.finished.callback((False, None, None)) @@ -128,15 +128,15 @@ class GetStream(object): self.checker.start(1) - self.d.addCallback(lambda _: _set_status(None, DOWNLOAD_METADATA_CODE)) - self.d.addCallback(lambda _: download_sd_blob( + self._d.addCallback(lambda _: _set_status(None, DOWNLOAD_METADATA_CODE)) + self._d.addCallback(lambda _: download_sd_blob( self.session, self.stream_hash, self.payment_rate_manager)) - self.d.addCallback(self.sd_identifier.get_metadata_for_sd_blob) - self.d.addCallback(lambda r: _set_status(r, DOWNLOAD_RUNNING_CODE)) - self.d.addCallback(get_downloader_factory) - self.d.addCallback(make_downloader) - self.d.addCallbacks(self._start_download, _cause_timeout) - self.d.callback(None) + self._d.addCallback(self.sd_identifier.get_metadata_for_sd_blob) + self._d.addCallback(lambda r: _set_status(r, DOWNLOAD_RUNNING_CODE)) + self._d.addCallback(get_downloader_factory) + self._d.addCallback(make_downloader) + self._d.addCallbacks(self._start_download, _cause_timeout) + self._d.callback(None) return self.finished From 689ac431d331ba8a9f54be1fdd8ea9c175c4ddd2 Mon Sep 17 00:00:00 2001 From: Job Evers-Meltzer Date: Thu, 29 Dec 2016 22:37:47 -0600 Subject: [PATCH 02/11] bug fix in log.fail() The value of the callback needs to be returned. In particular, this allows the error to be passed along. --- lbrynet/core/log_support.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lbrynet/core/log_support.py b/lbrynet/core/log_support.py index 0c228e8a6..72f96da14 100644 --- a/lbrynet/core/log_support.py +++ b/lbrynet/core/log_support.py @@ -369,7 +369,7 @@ class Logger(logging.Logger): self.name, level, fn, lno, msg, msg_args, exc_info, func, msg_kwargs) self.handle(record) if callback: - callback(err, *args, **kwargs) + return callback(err, *args, **kwargs) return _fail def trace(self, msg, *args, **kwargs): From 01cc4f28e01c3ac8793009614484e85c803955e5 Mon Sep 17 00:00:00 2001 From: Job Evers-Meltzer Date: Fri, 30 Dec 2016 00:12:20 -0600 Subject: [PATCH 03/11] Refactor jsonrpc_get Convert some of the calls to inlineCallbacks, which allowed the code to be cleaned up and made more clear --- .../lbryfile/EncryptedFileMetadataManager.py | 2 +- lbrynet/lbrynet_daemon/Daemon.py | 174 +++++++++--------- lbrynet/lbrynet_daemon/Downloader.py | 28 +-- 3 files changed, 102 insertions(+), 102 deletions(-) diff --git a/lbrynet/lbryfile/EncryptedFileMetadataManager.py b/lbrynet/lbryfile/EncryptedFileMetadataManager.py index bae6291b6..e9b2e7335 100644 --- a/lbrynet/lbryfile/EncryptedFileMetadataManager.py +++ b/lbrynet/lbryfile/EncryptedFileMetadataManager.py @@ -49,7 +49,7 @@ class DBEncryptedFileMetadataManager(object): def get_blobs_for_stream(self, stream_hash, start_blob=None, end_blob=None, count=None, reverse=False): - log.debug("Getting blobs for a stream. Count is %s", str(count)) + log.debug("Getting blobs for stream %s. Count is %s", stream_hash, count) def get_positions_of_start_and_end(): if start_blob is not None: diff --git a/lbrynet/lbrynet_daemon/Daemon.py b/lbrynet/lbrynet_daemon/Daemon.py index 5eac402a7..295494087 100644 --- a/lbrynet/lbrynet_daemon/Daemon.py +++ b/lbrynet/lbrynet_daemon/Daemon.py @@ -70,6 +70,7 @@ STARTUP_STAGES = [ (WAITING_FOR_FIRST_RUN_CREDITS, 'Waiting for first run credits...') ] +# TODO: make this consistent with the stages in Downloader.py DOWNLOAD_METADATA_CODE = 'downloading_metadata' DOWNLOAD_TIMEOUT_CODE = 'timeout' DOWNLOAD_RUNNING_CODE = 'running' @@ -779,9 +780,9 @@ class Daemon(AuthJSONRPCServer): d.addCallback(BlobStreamDescriptorReader) d.addCallback(lambda blob: blob.get_info()) d.addCallback(cb) - return r + @defer.inlineCallbacks def _download_name(self, name, timeout=None, download_directory=None, file_name=None, stream_info=None, wait_for_write=True): """ @@ -795,14 +796,11 @@ class Daemon(AuthJSONRPCServer): if not stream_info: self.waiting_on[name] = True - d = self._resolve_name(name) - else: - d = defer.succeed(stream_info) - d.addCallback(helper.setup_stream) - d.addCallback(helper.wait_or_get_stream) - if not stream_info: - d.addCallback(helper.remove_from_wait) - return d + stream_info = yield self._resolve_name(name) + del self.waiting_on[name] + lbry_file = yield helper.setup_stream(stream_info) + sd_hash, file_path = yield helper.wait_or_get_stream(stream_info, lbry_file) + defer.returnValue((sd_hash, file_path)) def add_stream(self, name, timeout, download_directory, file_name, stream_info): """Makes, adds and starts a stream""" @@ -1400,8 +1398,6 @@ class Daemon(AuthJSONRPCServer): return self._render_response(None, BAD_REQUEST) d = self._resolve_name(name, force_refresh=force) - # TODO: this is the rpc call that returns a server.failure. - # what is up with that? d.addCallbacks( lambda info: self._render_response(info, OK_CODE), # TODO: Is server.failure a module? It looks like it: @@ -1484,6 +1480,7 @@ class Daemon(AuthJSONRPCServer): ) @AuthJSONRPCServer.auth_required + @defer.inlineCallbacks def jsonrpc_get(self, p): """Download stream from a LBRY uri. @@ -1493,28 +1490,40 @@ class Daemon(AuthJSONRPCServer): 'file_name': optional, a user specified name for the downloaded file 'stream_info': optional, specified stream info overrides name 'timeout': optional - 'wait_for_write': optional, defaults to True + 'wait_for_write': optional, defaults to True. When set, waits for the file to + only start to be written before returning any results. Returns: 'stream_hash': hex string 'path': path of download """ params = self._process_get_parameters(p) if not params.name: - return server.failure + # TODO: return a useful error message here, like "name argument is required" + defer.returnValue(server.failure) if params.name in self.waiting_on: - return server.failure - d = self._download_name(name=params.name, - timeout=params.timeout, - download_directory=params.download_directory, - stream_info=params.stream_info, - file_name=params.file_name, - wait_for_write=params.wait_for_write) - # TODO: downloading can timeout. Not sure what to do when that happens - d.addCallbacks( - get_output_callback(params), - lambda err: str(err)) - d.addCallback(lambda message: self._render_response(message, OK_CODE)) - return d + # TODO: return a useful error message here, like "already + # waiting for name to be resolved" + defer.returnValue(server.failure) + try: + sd_hash, file_path = yield self._download_name( + name=params.name, + timeout=params.timeout, + download_directory=params.download_directory, + stream_info=params.stream_info, + file_name=params.file_name, + wait_for_write=params.wait_for_write + ) + except Exception as e: + log.exception('Failed to get %s', params.name) + response = yield self._render_response(str(e), OK_CODE) + else: + # TODO: should stream_hash key be changed to sd_hash? + message = { + 'stream_hash': params.sd_hash if params.stream_info else sd_hash, + 'path': file_path + } + response = yield self._render_response(message, OK_CODE) + defer.returnValue(response) @AuthJSONRPCServer.auth_required def jsonrpc_stop_lbry_file(self, p): @@ -1722,6 +1731,7 @@ class Daemon(AuthJSONRPCServer): txid = p['txid'] nout = p['nout'] else: + # TODO: return a useful error message return server.failure def _disp(x): @@ -1916,6 +1926,7 @@ class Daemon(AuthJSONRPCServer): amount = p['amount'] address = p['address'] else: + # TODO: return a useful error message return server.failure reserved_points = self.session.wallet.reserve_points(address, amount) @@ -1957,6 +1968,7 @@ class Daemon(AuthJSONRPCServer): d = self.session.wallet.get_block_info(height) d.addCallback(lambda blockhash: self.session.wallet.get_block(blockhash)) else: + # TODO: return a useful error message return server.failure d.addCallback(lambda r: self._render_response(r, OK_CODE)) return d @@ -1974,6 +1986,7 @@ class Daemon(AuthJSONRPCServer): if 'txid' in p.keys(): txid = p['txid'] else: + # TODO: return a useful error message return server.failure d = self.session.wallet.get_claims_from_tx(txid) @@ -2318,15 +2331,6 @@ def get_sd_hash(stream_info): return stream_info.get('stream_hash') -def get_output_callback(params): - def callback(l): - return { - 'stream_hash': params.sd_hash if params.stream_info else l.sd_hash, - 'path': os.path.join(params.download_directory, l.file_name) - } - return callback - - class _DownloadNameHelper(object): def __init__(self, daemon, name, timeout=None, @@ -2342,52 +2346,54 @@ class _DownloadNameHelper(object): self.file_name = file_name self.wait_for_write = wait_for_write + @defer.inlineCallbacks def setup_stream(self, stream_info): - stream_hash = get_sd_hash(stream_info) - d = self.daemon._get_lbry_file_by_sd_hash(stream_hash) - d.addCallback(self._prepend_stream_info, stream_info) - return d + sd_hash = get_sd_hash(stream_info) + lbry_file = yield self.daemon._get_lbry_file_by_sd_hash(sd_hash) + if self._does_lbry_file_exists(lbry_file): + defer.returnValue(lbry_file) + else: + defer.returnValue(None) - def _prepend_stream_info(self, lbry_file, stream_info): - if lbry_file: - if os.path.isfile(os.path.join(self.download_directory, lbry_file.file_name)): - return defer.succeed((stream_info, lbry_file)) - return defer.succeed((stream_info, None)) + def _does_lbry_file_exists(self, lbry_file): + return lbry_file and os.path.isfile(self._full_path(lbry_file)) - def wait_or_get_stream(self, args): - stream_info, lbry_file = args + def _full_path(self, lbry_file): + return os.path.join(self.download_directory, lbry_file.file_name) + + @defer.inlineCallbacks + def wait_or_get_stream(self, stream_info, lbry_file): if lbry_file: log.debug('Wait on lbry_file') - return self._wait_on_lbry_file(lbry_file) + # returns the lbry_file + yield self._wait_on_lbry_file(lbry_file) + defer.returnValue((lbry_file.sd_hash, self._full_path(lbry_file))) else: log.debug('No lbry_file, need to get stream') - return self._get_stream(stream_info) + # returns an instance of ManagedEncryptedFileDownloaderFactory + sd_hash, file_path = yield self._get_stream(stream_info) + defer.returnValue((sd_hash, file_path)) + def _wait_on_lbry_file(self, f): + file_path = self._full_path(f) + written_bytes = self._get_written_bytes(file_path) + if written_bytes: + log.info("File has bytes: %s --> %s", f.sd_hash, file_path) + return defer.succeed(True) + return task.deferLater(reactor, 1, self._wait_on_lbry_file, f) + + @defer.inlineCallbacks def _get_stream(self, stream_info): - d = self.daemon.add_stream( + was_successful, sd_hash, download_path = yield self.daemon.add_stream( self.name, self.timeout, self.download_directory, self.file_name, stream_info) - - def _handle_timeout(args): - was_successful, _, _ = args - if not was_successful: - log.warning("lbry://%s timed out, removing from streams", self.name) - del self.daemon.streams[self.name] - - d.addCallback(_handle_timeout) - + if not was_successful: + log.warning("lbry://%s timed out, removing from streams", self.name) + del self.daemon.streams[self.name] + self.remove_from_wait("Timed out") + raise Exception("Timed out") if self.wait_for_write: - d.addCallback(lambda _: self._wait_for_write()) - - def _get_stream_for_return(): - stream = self.daemon.streams.get(self.name, None) - if stream: - return stream.downloader - else: - self.remove_from_wait("Timed out") - return defer.fail(Exception("Timed out")) - - d.addCallback(lambda _: _get_stream_for_return()) - return d + yield self._wait_for_write() + defer.returnValue((sd_hash, download_path)) def _wait_for_write(self): d = defer.succeed(None) @@ -2398,39 +2404,27 @@ class _DownloadNameHelper(object): def _has_downloader_wrote(self): stream = self.daemon.streams.get(self.name, False) if stream: - return self._get_written_bytes(stream.downloader.file_name) + file_path = self._full_path(stream.downloader) + return self._get_written_bytes(file_path) else: return False - def _wait_on_lbry_file(self, f): - written_bytes = self._get_written_bytes(f.file_name) - if written_bytes: - return defer.succeed(self._disp_file(f)) - return task.deferLater(reactor, 1, self._wait_on_lbry_file, f) + def _get_written_bytes(self, file_path): + """Returns the number of bytes written to `file_path`. - def _get_written_bytes(self, file_name): - """Returns the number of bytes written to `file_name`. - - Returns False if there were issues reading `file_name`. + Returns False if there were issues reading `file_path`. """ try: - file_path = os.path.join(self.download_directory, file_name) if os.path.isfile(file_path): - written_file = file(file_path) - written_file.seek(0, os.SEEK_END) - written_bytes = written_file.tell() - written_file.close() + with open(file_path) as written_file: + written_file.seek(0, os.SEEK_END) + written_bytes = written_file.tell() else: written_bytes = False except Exception: writen_bytes = False return written_bytes - def _disp_file(self, f): - file_path = os.path.join(self.download_directory, f.file_name) - log.info("Already downloaded: %s --> %s", f.sd_hash, file_path) - return f - def remove_from_wait(self, reason): if self.name in self.daemon.waiting_on: del self.daemon.waiting_on[self.name] diff --git a/lbrynet/lbrynet_daemon/Downloader.py b/lbrynet/lbrynet_daemon/Downloader.py index 3f59e175e..88eb10163 100644 --- a/lbrynet/lbrynet_daemon/Downloader.py +++ b/lbrynet/lbrynet_daemon/Downloader.py @@ -15,6 +15,7 @@ INITIALIZING_CODE = 'initializing' DOWNLOAD_METADATA_CODE = 'downloading_metadata' DOWNLOAD_TIMEOUT_CODE = 'timeout' DOWNLOAD_RUNNING_CODE = 'running' +# TODO: is this ever used? DOWNLOAD_STOPPED_CODE = 'stopped' STREAM_STAGES = [ (INITIALIZING_CODE, 'Initializing...'), @@ -46,7 +47,7 @@ class GetStream(object): self.payment_rate_manager = self.session.payment_rate_manager self.lbry_file_manager = lbry_file_manager self.sd_identifier = sd_identifier - self.stream_hash = None + self.sd_hash = None self.max_key_fee = max_key_fee self.stream_info = None self.stream_info_manager = None @@ -56,6 +57,8 @@ class GetStream(object): self.download_directory = download_directory self.download_path = None self.downloader = None + # fired after the metadata has been downloaded and the + # actual file has been started self.finished = defer.Deferred(None) self.checker = LoopingCall(self.check_status) self.code = STREAM_STAGES[0] @@ -63,10 +66,10 @@ class GetStream(object): def check_status(self): self.timeout_counter += 1 - # TODO: Why is this the stopping condition for the finished callback? + # download_path is set after the sd blob has been downloaded if self.download_path: self.checker.stop() - self.finished.callback((True, self.stream_hash, self.download_path)) + self.finished.callback((True, self.sd_hash, self.download_path)) elif self.timeout_counter >= self.timeout: log.info("Timeout downloading lbry://%s" % self.resolved_name) @@ -108,20 +111,20 @@ class GetStream(object): self.resolved_name = name self.stream_info = deepcopy(stream_info) self.description = self.stream_info['description'] - self.stream_hash = self.stream_info['sources']['lbry_sd_hash'] + self.sd_hash = self.stream_info['sources']['lbry_sd_hash'] if 'fee' in self.stream_info: self.fee = FeeValidator(self.stream_info['fee']) max_key_fee = self._convert_max_fee() converted_fee = self.exchange_rate_manager.to_lbc(self.fee).amount if converted_fee > self.wallet.wallet_balance: - log.warning("Insufficient funds to download lbry://%s", self.resolved_name) - return defer.fail(InsufficientFundsError()) + msg = "Insufficient funds to download lbry://{}. Need {:0.2f}, have {:0.2f}".format( + self.resolved_name, converted_fee, self.wallet.wallet_balance) + raise InsufficientFundsError(msg) if converted_fee > max_key_fee: - log.warning( - "Key fee %f above limit of %f didn't download lbry://%s", + msg = "Key fee {:0.2f} above limit of {:0.2f} didn't download lbry://{}".format( converted_fee, max_key_fee, self.resolved_name) - return defer.fail(KeyFeeAboveMaxAllowed()) + raise KeyFeeAboveMaxAllowed(msg) log.info( "Key fee %f below limit of %f, downloading lbry://%s", converted_fee, max_key_fee, self.resolved_name) @@ -130,7 +133,7 @@ class GetStream(object): self._d.addCallback(lambda _: _set_status(None, DOWNLOAD_METADATA_CODE)) self._d.addCallback(lambda _: download_sd_blob( - self.session, self.stream_hash, self.payment_rate_manager)) + self.session, self.sd_hash, self.payment_rate_manager)) self._d.addCallback(self.sd_identifier.get_metadata_for_sd_blob) self._d.addCallback(lambda r: _set_status(r, DOWNLOAD_RUNNING_CODE)) self._d.addCallback(get_downloader_factory) @@ -147,7 +150,7 @@ class GetStream(object): d = self._pay_key_fee() d.addCallback(lambda _: log.info( - "Downloading %s --> %s", self.stream_hash, self.downloader.file_name)) + "Downloading %s --> %s", self.sd_hash, self.downloader.file_name)) d.addCallback(lambda _: self.downloader.start()) def _pay_key_fee(self): @@ -155,6 +158,9 @@ class GetStream(object): fee_lbc = self.exchange_rate_manager.to_lbc(self.fee).amount reserved_points = self.wallet.reserve_points(self.fee.address, fee_lbc) if reserved_points is None: + log.warning('Unable to pay the key fee of %s for %s', fee_lbc, self.resolved_name) + # TODO: If we get here, nobody will know that there was an error + # as nobody actually cares about self._d return defer.fail(InsufficientFundsError()) return self.wallet.send_points_to_address(reserved_points, fee_lbc) return defer.succeed(None) From 1256beea96b9cb4540730782d840a560eecddf55 Mon Sep 17 00:00:00 2001 From: Job Evers-Meltzer Date: Fri, 30 Dec 2016 00:50:36 -0600 Subject: [PATCH 04/11] remove blob_requester from DownloadManager --- lbrynet/core/client/DownloadManager.py | 1 - .../cryptstream/client/CryptStreamDownloader.py | 14 +++++--------- 2 files changed, 5 insertions(+), 10 deletions(-) diff --git a/lbrynet/core/client/DownloadManager.py b/lbrynet/core/client/DownloadManager.py index 689a607df..78f6a006e 100644 --- a/lbrynet/core/client/DownloadManager.py +++ b/lbrynet/core/client/DownloadManager.py @@ -14,7 +14,6 @@ class DownloadManager(object): def __init__(self, blob_manager, upload_allowed): self.blob_manager = blob_manager self.upload_allowed = upload_allowed - self.blob_requester = None self.blob_info_finder = None self.progress_manager = None self.blob_handler = None diff --git a/lbrynet/cryptstream/client/CryptStreamDownloader.py b/lbrynet/cryptstream/client/CryptStreamDownloader.py index 9b1e351c7..c32936d6f 100644 --- a/lbrynet/cryptstream/client/CryptStreamDownloader.py +++ b/lbrynet/cryptstream/client/CryptStreamDownloader.py @@ -62,19 +62,16 @@ class CryptStreamDownloader(object): self.payment_rate_manager = payment_rate_manager self.wallet = wallet self.upload_allowed = upload_allowed - self.key = None self.stream_name = None - self.completed = False self.stopped = True self.stopping = False self.starting = False - self.download_manager = None self.finished_deferred = None - self.points_paid = 0.0 + self.blob_requester = None def __str__(self): return str(self.stream_name) @@ -155,20 +152,19 @@ class CryptStreamDownloader(object): return d def _get_download_manager(self): + assert self.blob_requester is None download_manager = DownloadManager(self.blob_manager, self.upload_allowed) download_manager.blob_info_finder = self._get_metadata_handler(download_manager) - download_manager.blob_requester = self._get_blob_requester(download_manager) download_manager.progress_manager = self._get_progress_manager(download_manager) download_manager.blob_handler = self._get_blob_handler(download_manager) download_manager.wallet_info_exchanger = self.wallet.get_info_exchanger() + # blob_requester needs to be set before the connection manager is setup + self.blob_requester = self._get_blob_requester(download_manager) download_manager.connection_manager = self._get_connection_manager(download_manager) - #return DownloadManager(self.blob_manager, self.blob_requester, self.metadata_handler, - # self.progress_manager, self.blob_handler, self.connection_manager) return download_manager def _remove_download_manager(self): self.download_manager.blob_info_finder = None - self.download_manager.blob_requester = None self.download_manager.progress_manager = None self.download_manager.blob_handler = None self.download_manager.wallet_info_exchanger = None @@ -176,7 +172,7 @@ class CryptStreamDownloader(object): self.download_manager = None def _get_primary_request_creators(self, download_manager): - return [download_manager.blob_requester] + return [self.blob_requester] def _get_secondary_request_creators(self, download_manager): return [download_manager.wallet_info_exchanger] From 227323b7a46c379a58f8103e1e008e9b5388f525 Mon Sep 17 00:00:00 2001 From: Job Evers-Meltzer Date: Fri, 30 Dec 2016 00:51:03 -0600 Subject: [PATCH 05/11] convert more to inlineCallbacks --- .../client/CryptStreamDownloader.py | 4 +- .../EncryptedFileDownloader.py | 38 +++++++------------ 2 files changed, 14 insertions(+), 28 deletions(-) diff --git a/lbrynet/cryptstream/client/CryptStreamDownloader.py b/lbrynet/cryptstream/client/CryptStreamDownloader.py index c32936d6f..0094edd52 100644 --- a/lbrynet/cryptstream/client/CryptStreamDownloader.py +++ b/lbrynet/cryptstream/client/CryptStreamDownloader.py @@ -83,7 +83,6 @@ class CryptStreamDownloader(object): return self.stop() def start(self): - if self.starting is True: raise CurrentlyStartingError() if self.stopping is True: @@ -94,9 +93,8 @@ class CryptStreamDownloader(object): self.starting = True self.completed = False self.finished_deferred = defer.Deferred() - fd = self.finished_deferred d = self._start() - d.addCallback(lambda _: fd) + d.addCallback(lambda _: self.finished_deferred) return d def stop(self, err=None): diff --git a/lbrynet/lbryfilemanager/EncryptedFileDownloader.py b/lbrynet/lbryfilemanager/EncryptedFileDownloader.py index 527bcfdc3..e2a1a4bc5 100644 --- a/lbrynet/lbryfilemanager/EncryptedFileDownloader.py +++ b/lbrynet/lbryfilemanager/EncryptedFileDownloader.py @@ -137,32 +137,20 @@ class ManagedEncryptedFileDownloader(EncryptedFileSaver): d.addCallback(make_full_status) return d + @defer.inlineCallbacks def _start(self): - - d = EncryptedFileSaver._start(self) - d.addCallback( - lambda _: self.stream_info_manager.get_sd_blob_hashes_for_stream(self.stream_hash)) - - def _save_sd_hash(sd_hash): - if len(sd_hash): - self.sd_hash = sd_hash[0] - d = self.wallet.get_claim_metadata_for_sd_hash(self.sd_hash) - else: - d = defer.succeed(None) - - return d - - def _save_claim(name, txid, nout): - self.uri = name - self.txid = txid - self.nout = nout - return defer.succeed(None) - - d.addCallback(_save_sd_hash) - d.addCallback(lambda r: _save_claim(r[0], r[1], r[2]) if r else None) - d.addCallback(lambda _: self._save_status()) - - return d + yield EncryptedFileSaver._start(self) + sd_hash = yield self.stream_info_manager.get_sd_blob_hashes_for_stream(self.stream_hash) + if len(sd_hash): + self.sd_hash = sd_hash[0] + maybe_metadata = yield self.wallet.get_claim_metadata_for_sd_hash(self.sd_hash) + if maybe_metadata: + name, txid, nout = maybe_metadata + self.uri = name + self.txid = txid + self.nout = nout + status = yield self._save_status() + defer.returnValue(status) def _get_finished_deferred_callback_value(self): if self.completed is True: From 4eb10b56c1c80ed04ac3e85f0c2f3a0cdb72cb42 Mon Sep 17 00:00:00 2001 From: Job Evers-Meltzer Date: Fri, 30 Dec 2016 01:40:06 -0600 Subject: [PATCH 06/11] small changes --- lbrynet/core/client/ClientProtocol.py | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/lbrynet/core/client/ClientProtocol.py b/lbrynet/core/client/ClientProtocol.py index 2d9e3d0f5..f95162cf3 100644 --- a/lbrynet/core/client/ClientProtocol.py +++ b/lbrynet/core/client/ClientProtocol.py @@ -176,11 +176,12 @@ class ClientProtocol(Protocol): def _handle_response(self, response): ds = [] - log.debug("Handling a response. Current expected responses: %s", self._response_deferreds) + log.debug( + "Handling a response from %s. Expected responses: %s. Actual responses: %s", + self.peer, self._response_deferreds.keys(), response.keys()) for key, val in response.items(): if key in self._response_deferreds: - d = self._response_deferreds[key] - del self._response_deferreds[key] + d = self._response_deferreds.pop(key) d.callback({key: val}) ds.append(d) for k, d in self._response_deferreds.items(): @@ -194,6 +195,7 @@ class ClientProtocol(Protocol): d.addErrback(self._handle_response_error) ds.append(d) + # TODO: are we sure we want to consume errors here dl = defer.DeferredList(ds, consumeErrors=True) def get_next_request(results): From 0b53fde3529bcb5e9ccd7336549010ee4fc72096 Mon Sep 17 00:00:00 2001 From: Job Evers-Meltzer Date: Fri, 30 Dec 2016 08:27:50 -0600 Subject: [PATCH 07/11] refactor connectionmanager to use inlineCallbacks --- lbrynet/core/client/ConnectionManager.py | 119 +++++++++++------------ 1 file changed, 57 insertions(+), 62 deletions(-) diff --git a/lbrynet/core/client/ConnectionManager.py b/lbrynet/core/client/ConnectionManager.py index d8c6170fc..a3331b31f 100644 --- a/lbrynet/core/client/ConnectionManager.py +++ b/lbrynet/core/client/ConnectionManager.py @@ -72,13 +72,19 @@ class ConnectionManager(object): closing_deferreds.append(close_connection(peer)) return defer.DeferredList(closing_deferreds) + @defer.inlineCallbacks def get_next_request(self, peer, protocol): - log.debug("Trying to get the next request for peer %s", peer) - if not peer in self._peer_connections or self.stopped is True: log.debug("The peer has already been told to shut down.") - return defer.succeed(False) + defer.returnValue(False) + requests = yield self._send_primary_requests(peer, protocol) + have_request = any(r[1] for r in requests if r[0] is True) + if have_request: + yield self._send_secondary_requests(peer, protocol) + defer.returnValue(have_request) + + def _send_primary_requests(self, peer, protocol): def handle_error(err): err.trap(InsufficientFundsError) @@ -97,34 +103,20 @@ class ConnectionManager(object): self._peer_connections[peer].request_creators.append(request_creator) return request_sent - def check_requests(requests): - have_request = True in [r[1] for r in requests if r[0] is True] - return have_request - - def get_secondary_requests_if_necessary(have_request): - if have_request is True: - ds = [] - for s_r_c in self._secondary_request_creators: - d = s_r_c.send_next_request(peer, protocol) - ds.append(d) - dl = defer.DeferredList(ds) - else: - dl = defer.succeed(None) - dl.addCallback(lambda _: have_request) - return dl - ds = [] - for p_r_c in self._primary_request_creators: d = p_r_c.send_next_request(peer, protocol) d.addErrback(handle_error) d.addCallback(check_if_request_sent, p_r_c) ds.append(d) + return defer.DeferredList(ds, fireOnOneErrback=True) - dl = defer.DeferredList(ds, fireOnOneErrback=True) - dl.addCallback(check_requests) - dl.addCallback(get_secondary_requests_if_necessary) - return dl + def _send_secondary_requests(self, peer, protocol): + ds = [ + s_r_c.send_next_request(peer, protocol) + for s_r_c in self._secondary_request_creators + ] + return defer.DeferredList(ds) def protocol_disconnected(self, peer, protocol): if peer in self._peer_connections: @@ -147,49 +139,52 @@ class ConnectionManager(object): return sorted(self._primary_request_creators, key=count_peers) def _connect_to_peer(self, peer): + if peer is None or self.stopped: + return from twisted.internet import reactor - if peer is not None and self.stopped is False: - log.debug("Trying to connect to %s", peer) - factory = ClientProtocolFactory(peer, self.rate_limiter, self) - self._peer_connections[peer] = PeerConnectionHandler(self._primary_request_creators[:], - factory) - connection = reactor.connectTCP(peer.host, peer.port, factory) - self._peer_connections[peer].connection = connection + log.debug("Trying to connect to %s", peer) + factory = ClientProtocolFactory(peer, self.rate_limiter, self) + self._peer_connections[peer] = PeerConnectionHandler(self._primary_request_creators[:], + factory) + connection = reactor.connectTCP(peer.host, peer.port, factory) + self._peer_connections[peer].connection = connection + @defer.inlineCallbacks def _manage(self): - from twisted.internet import reactor - - def get_new_peers(request_creators): - log.debug("Trying to get a new peer to connect to") - if len(request_creators) > 0: - log.debug("Got a creator to check: %s", request_creators[0]) - d = request_creators[0].get_new_peers() - d.addCallback(lambda h: h if h is not None else get_new_peers(request_creators[1:])) - return d - else: - return defer.succeed(None) - - def pick_best_peer(peers): - # TODO: Eventually rank them based on past performance/reputation. For now - # TODO: just pick the first to which we don't have an open connection - - log.debug("Got a list of peers to choose from: %s", peers) - if peers is None: - return None - for peer in peers: - if not peer in self._peer_connections: - log.debug("Got a good peer. Returning peer %s", peer) - return peer - log.debug("Couldn't find a good peer to connect to") - return None - if len(self._peer_connections) < conf.settings.max_connections_per_stream: - ordered_request_creators = self._rank_request_creator_connections() - d = get_new_peers(ordered_request_creators) - d.addCallback(pick_best_peer) - d.addCallback(self._connect_to_peer) - + try: + ordered_request_creators = self._rank_request_creator_connections() + peers = yield self._get_new_peers(ordered_request_creators) + peer = self._pick_best_peer(peers) + yield self._connect_to_peer(peer) + except Exception: + # log this otherwise it will just end up as an unhandled error in deferred + log.exception('Something bad happened picking a peer') self._next_manage_call = reactor.callLater(1, self._manage) + + @defer.inlineCallbacks + def _get_new_peers(self, request_creators): + log.debug("Trying to get a new peer to connect to") + if not request_creators: + defer.returnValue(None) + log.debug("Got a creator to check: %s", request_creators[0]) + new_peers = yield request_creators[0].get_new_peers() + if not new_peers: + new_peers = yield self._get_new_peers(request_creators[1:]) + defer.returnValue(new_peers) + + def _pick_best_peer(self, peers): + # TODO: Eventually rank them based on past performance/reputation. For now + # TODO: just pick the first to which we don't have an open connection + log.debug("Got a list of peers to choose from: %s", peers) + if peers is None: + return None + for peer in peers: + if not peer in self._peer_connections: + log.debug("Got a good peer. Returning peer %s", peer) + return peer + log.debug("Couldn't find a good peer to connect to") + return None From 1bf1b7624cdc3a94a04bdc2bd6c184c00b555adf Mon Sep 17 00:00:00 2001 From: Job Evers-Meltzer Date: Fri, 30 Dec 2016 10:47:34 -0600 Subject: [PATCH 08/11] more inlineCallback cleanup --- .../client/CryptStreamDownloader.py | 22 +++---- tests/functional/test_misc.py | 61 ++++++++++--------- 2 files changed, 40 insertions(+), 43 deletions(-) diff --git a/lbrynet/cryptstream/client/CryptStreamDownloader.py b/lbrynet/cryptstream/client/CryptStreamDownloader.py index 0094edd52..ae312ad59 100644 --- a/lbrynet/cryptstream/client/CryptStreamDownloader.py +++ b/lbrynet/cryptstream/client/CryptStreamDownloader.py @@ -97,25 +97,20 @@ class CryptStreamDownloader(object): d.addCallback(lambda _: self.finished_deferred) return d + @defer.inlineCallbacks def stop(self, err=None): - - def check_if_stop_succeeded(success): - self.stopping = False - if success is True: - self.stopped = True - self._remove_download_manager() - return success - if self.stopped is True: raise AlreadyStoppedError() if self.stopping is True: raise CurrentlyStoppingError() assert self.download_manager is not None self.stopping = True - d = self.download_manager.stop_downloading() - d.addCallback(check_if_stop_succeeded) - d.addCallback(lambda _: self._fire_completed_deferred(err)) - return d + success = yield self.download_manager.stop_downloading() + self.stopping = False + if success is True: + self.stopped = True + self._remove_download_manager() + yield self._fire_completed_deferred(err) def _start_failed(self): @@ -204,7 +199,8 @@ class CryptStreamDownloader(object): if err is not None: d.errback(err) else: - d.callback(self._get_finished_deferred_callback_value()) + value = self._get_finished_deferred_callback_value() + d.callback(value) else: log.debug("Not firing the completed deferred because d is None") diff --git a/tests/functional/test_misc.py b/tests/functional/test_misc.py index 17ea1a7f8..1e0efc37e 100644 --- a/tests/functional/test_misc.py +++ b/tests/functional/test_misc.py @@ -652,7 +652,8 @@ class TestTransfer(TestCase): options = metadata.options factories = metadata.factories chosen_options = [ - o.default_value for o in options.get_downloader_options(info_validator, prm)] + o.default_value for o in options.get_downloader_options(info_validator, prm) + ] return factories[0].make_downloader(metadata, chosen_options, prm) def download_file(sd_hash): @@ -669,17 +670,14 @@ class TestTransfer(TestCase): hashsum.update(f.read()) self.assertEqual(hashsum.hexdigest(), "4ca2aafb4101c1e42235aad24fbb83be") + @defer.inlineCallbacks def start_transfer(sd_hash): - logging.debug("Starting the transfer") - - d = self.session.setup() - d.addCallback(lambda _: add_lbry_file_to_sd_identifier(sd_identifier)) - d.addCallback(lambda _: self.lbry_file_manager.setup()) - d.addCallback(lambda _: download_file(sd_hash)) - d.addCallback(lambda _: check_md5_sum()) - - return d + yield self.session.setup() + yield add_lbry_file_to_sd_identifier(sd_identifier) + yield self.lbry_file_manager.setup() + yield download_file(sd_hash) + yield check_md5_sum() def stop(arg): if isinstance(arg, Failure): @@ -914,25 +912,30 @@ class TestTransfer(TestCase): self.stream_info_manager = DBEncryptedFileMetadataManager(self.session.db_dir) self.lbry_file_manager = EncryptedFileManager(self.session, self.stream_info_manager, sd_identifier) + @defer.inlineCallbacks def make_downloader(metadata, prm): info_validator = metadata.validator options = metadata.options factories = metadata.factories - chosen_options = [o.default_value for o in options.get_downloader_options(info_validator, prm)] - return factories[0].make_downloader(metadata, chosen_options, prm) + chosen_options = [ + o.default_value for o in options.get_downloader_options(info_validator, prm) + ] + downloader = yield factories[0].make_downloader(metadata, chosen_options, prm) + defer.returnValue(downloader) def append_downloader(downloader): downloaders.append(downloader) return downloader + @defer.inlineCallbacks def download_file(sd_hash): prm = self.session.payment_rate_manager - d = download_sd_blob(self.session, sd_hash, prm) - d.addCallback(sd_identifier.get_metadata_for_sd_blob) - d.addCallback(make_downloader, prm) - d.addCallback(append_downloader) - d.addCallback(lambda downloader: downloader.start()) - return d + sd_blob = yield download_sd_blob(self.session, sd_hash, prm) + metadata = yield sd_identifier.get_metadata_for_sd_blob(sd_blob) + downloader = yield make_downloader(metadata, prm) + downloaders.append(downloader) + finished_value = yield downloader.start() + defer.returnValue(finished_value) def check_md5_sum(): f = open('test_file') @@ -959,20 +962,18 @@ class TestTransfer(TestCase): d.addCallback(check_status_report) return d + @defer.inlineCallbacks def start_transfer(sd_hash): logging.debug("Starting the transfer") - - d = self.session.setup() - d.addCallback(lambda _: self.stream_info_manager.setup()) - d.addCallback(lambda _: add_lbry_file_to_sd_identifier(sd_identifier)) - d.addCallback(lambda _: self.lbry_file_manager.setup()) - d.addCallback(lambda _: download_file(sd_hash)) - d.addCallback(lambda _: check_md5_sum()) - d.addCallback(lambda _: download_file(sd_hash)) - d.addCallback(lambda _: delete_lbry_file()) - d.addCallback(lambda _: check_lbry_file()) - - return d + yield self.session.setup() + yield self.stream_info_manager.setup() + yield add_lbry_file_to_sd_identifier(sd_identifier) + yield self.lbry_file_manager.setup() + yield download_file(sd_hash) + yield check_md5_sum() + yield download_file(sd_hash) + yield delete_lbry_file() + yield check_lbry_file() def stop(arg): if isinstance(arg, Failure): From ad1e2650e33f07eb43b9bc8b0cbe988ff3fedb37 Mon Sep 17 00:00:00 2001 From: Job Evers-Meltzer Date: Fri, 30 Dec 2016 11:37:11 -0600 Subject: [PATCH 09/11] add events for downloading stages --- lbrynet/analytics/events.py | 14 ++++++++++++++ lbrynet/analytics/manager.py | 8 ++++++++ lbrynet/lbrynet_daemon/Daemon.py | 10 +++++++++- 3 files changed, 31 insertions(+), 1 deletion(-) diff --git a/lbrynet/analytics/events.py b/lbrynet/analytics/events.py index 310ee1ce1..d211754ee 100644 --- a/lbrynet/analytics/events.py +++ b/lbrynet/analytics/events.py @@ -55,6 +55,20 @@ class Events(object): } return self._event('Download Started', properties) + def download_errored(self, name, stream_info=None): + properties = { + 'name': name, + 'stream_info': get_sd_hash(stream_info) + } + return self._event('Download Errored', properties) + + def download_finished(self, name, stream_info=None): + properties = { + 'name': name, + 'stream_info': get_sd_hash(stream_info) + } + return self._event('Download Finished', properties) + def error(self, message, sd_hash=None): properties = { 'message': message, diff --git a/lbrynet/analytics/manager.py b/lbrynet/analytics/manager.py index 5af578b61..b8d00526b 100644 --- a/lbrynet/analytics/manager.py +++ b/lbrynet/analytics/manager.py @@ -74,6 +74,14 @@ class Manager(object): event = self.events_generator.download_started(name, stream_info) self.analytics_api.track(event) + def send_download_errored(self, name, stream_info=None): + event = self.events_generator.download_errored(name, stream_info) + self.analytics_api.track(event) + + def send_download_finished(self, name, stream_info=None): + event = self.events_generator.download_finished(name, stream_info) + self.analytics_api.track(event) + def send_error(self, message, sd_hash=None): event = self.events_generator.error(message, sd_hash) self.analytics_api.track(event) diff --git a/lbrynet/lbrynet_daemon/Daemon.py b/lbrynet/lbrynet_daemon/Daemon.py index 295494087..7d1b65096 100644 --- a/lbrynet/lbrynet_daemon/Daemon.py +++ b/lbrynet/lbrynet_daemon/Daemon.py @@ -790,7 +790,7 @@ class Daemon(AuthJSONRPCServer): If it already exists in the file manager, return the existing lbry file """ timeout = timeout if timeout is not None else conf.settings.download_timeout - self.analytics_manager.send_download_started(name, stream_info) + helper = _DownloadNameHelper( self, name, timeout, download_directory, file_name, wait_for_write) @@ -1504,6 +1504,9 @@ class Daemon(AuthJSONRPCServer): # TODO: return a useful error message here, like "already # waiting for name to be resolved" defer.returnValue(server.failure) + name = params.name + stream_info = params.stream_info + self.analytics_manager.send_download_started(name, stream_info) try: sd_hash, file_path = yield self._download_name( name=params.name, @@ -1514,6 +1517,7 @@ class Daemon(AuthJSONRPCServer): wait_for_write=params.wait_for_write ) except Exception as e: + self.analytics_manager.send_download_errored(name, stream_info) log.exception('Failed to get %s', params.name) response = yield self._render_response(str(e), OK_CODE) else: @@ -1522,6 +1526,10 @@ class Daemon(AuthJSONRPCServer): 'stream_hash': params.sd_hash if params.stream_info else sd_hash, 'path': file_path } + stream = self.streams.get(name) + if stream: + stream.downloader.finished_deferred.addCallback( + lambda _: self.analytics_manager.send_download_finished(name, stream_info)) response = yield self._render_response(message, OK_CODE) defer.returnValue(response) From 6bdc9069c9482c0c53270672a30d3959cfe5956e Mon Sep 17 00:00:00 2001 From: Job Evers-Meltzer Date: Fri, 30 Dec 2016 12:24:52 -0600 Subject: [PATCH 10/11] on get, first check if we already have the name --- lbrynet/lbrynet_daemon/Daemon.py | 12 ++++++++++++ 1 file changed, 12 insertions(+) diff --git a/lbrynet/lbrynet_daemon/Daemon.py b/lbrynet/lbrynet_daemon/Daemon.py index 7d1b65096..025b22a6b 100644 --- a/lbrynet/lbrynet_daemon/Daemon.py +++ b/lbrynet/lbrynet_daemon/Daemon.py @@ -1506,6 +1506,18 @@ class Daemon(AuthJSONRPCServer): defer.returnValue(server.failure) name = params.name stream_info = params.stream_info + + # first check if we already have this + lbry_file = yield self._get_lbry_file(FileID.NAME, name, return_json=False) + if lbry_file: + log.info('Already have a file for %s', name) + message = { + 'stream_hash': params.sd_hash if params.stream_info else lbry_file.sd_hash, + 'path': os.path.join(lbry_file.download_directory, lbry_file.file_name) + } + response = yield self._render_response(message, OK_CODE) + defer.returnValue(response) + self.analytics_manager.send_download_started(name, stream_info) try: sd_hash, file_path = yield self._download_name( From 669a56754538ac605c03de92ac4dc1984ce47337 Mon Sep 17 00:00:00 2001 From: Job Evers-Meltzer Date: Fri, 30 Dec 2016 12:35:17 -0600 Subject: [PATCH 11/11] add id to link download events --- lbrynet/analytics/events.py | 29 ++++++++++++++--------------- lbrynet/analytics/manager.py | 12 ++++++------ lbrynet/core/utils.py | 5 +++++ lbrynet/lbrynet_daemon/Daemon.py | 9 ++++++--- tests/unit/analytics/test_events.py | 3 ++- 5 files changed, 33 insertions(+), 25 deletions(-) diff --git a/lbrynet/analytics/events.py b/lbrynet/analytics/events.py index d211754ee..353e93073 100644 --- a/lbrynet/analytics/events.py +++ b/lbrynet/analytics/events.py @@ -48,25 +48,16 @@ class Events(object): def heartbeat(self): return self._event('Heartbeat') - def download_started(self, name, stream_info=None): - properties = { - 'name': name, - 'stream_info': get_sd_hash(stream_info) - } + def download_started(self, *args, **kwargs): + properties = download_properties(*args, **kwargs) return self._event('Download Started', properties) - def download_errored(self, name, stream_info=None): - properties = { - 'name': name, - 'stream_info': get_sd_hash(stream_info) - } + def download_errored(self, *args, **kwargs): + properties = download_properties(*args, **kwargs) return self._event('Download Errored', properties) - def download_finished(self, name, stream_info=None): - properties = { - 'name': name, - 'stream_info': get_sd_hash(stream_info) - } + def download_finished(self, *args, **kwargs): + properties = download_properties(*args, **kwargs) return self._event('Download Finished', properties) def error(self, message, sd_hash=None): @@ -124,3 +115,11 @@ def make_context(platform, wallet): 'version': '1.0.0' }, } + + +def download_properties(id_, name, stream_info=None): + return { + 'download_id': id_, + 'name': name, + 'stream_info': get_sd_hash(stream_info) + } diff --git a/lbrynet/analytics/manager.py b/lbrynet/analytics/manager.py index b8d00526b..6e11c805c 100644 --- a/lbrynet/analytics/manager.py +++ b/lbrynet/analytics/manager.py @@ -70,16 +70,16 @@ class Manager(object): event = self.events_generator.server_startup_error(message) self.analytics_api.track(event) - def send_download_started(self, name, stream_info=None): - event = self.events_generator.download_started(name, stream_info) + def send_download_started(self, id_, name, stream_info=None): + event = self.events_generator.download_started(id_, name, stream_info) self.analytics_api.track(event) - def send_download_errored(self, name, stream_info=None): - event = self.events_generator.download_errored(name, stream_info) + def send_download_errored(self, id_, name, stream_info=None): + event = self.events_generator.download_errored(id_, name, stream_info) self.analytics_api.track(event) - def send_download_finished(self, name, stream_info=None): - event = self.events_generator.download_finished(name, stream_info) + def send_download_finished(self, id_, name, stream_info=None): + event = self.events_generator.download_finished(id_, name, stream_info) self.analytics_api.track(event) def send_error(self, message, sd_hash=None): diff --git a/lbrynet/core/utils.py b/lbrynet/core/utils.py index 08d5e813f..b55f6c208 100644 --- a/lbrynet/core/utils.py +++ b/lbrynet/core/utils.py @@ -4,6 +4,7 @@ import logging import random import os import socket +import string import sys import pkg_resources @@ -92,3 +93,7 @@ def setup_certs_for_windows(): if getattr(sys, 'frozen', False) and os.name == "nt": cert_path = os.path.join(os.path.dirname(sys.executable), "cacert.pem") os.environ["REQUESTS_CA_BUNDLE"] = cert_path + + +def random_string(length=10, chars=string.ascii_lowercase): + return ''.join([random.choice(chars) for _ in range(length)]) diff --git a/lbrynet/lbrynet_daemon/Daemon.py b/lbrynet/lbrynet_daemon/Daemon.py index 025b22a6b..b8a438b5c 100644 --- a/lbrynet/lbrynet_daemon/Daemon.py +++ b/lbrynet/lbrynet_daemon/Daemon.py @@ -1518,7 +1518,8 @@ class Daemon(AuthJSONRPCServer): response = yield self._render_response(message, OK_CODE) defer.returnValue(response) - self.analytics_manager.send_download_started(name, stream_info) + download_id = utils.random_string() + self.analytics_manager.send_download_started(download_id, name, stream_info) try: sd_hash, file_path = yield self._download_name( name=params.name, @@ -1529,7 +1530,7 @@ class Daemon(AuthJSONRPCServer): wait_for_write=params.wait_for_write ) except Exception as e: - self.analytics_manager.send_download_errored(name, stream_info) + self.analytics_manager.send_download_errored(download_id, name, stream_info) log.exception('Failed to get %s', params.name) response = yield self._render_response(str(e), OK_CODE) else: @@ -1541,7 +1542,9 @@ class Daemon(AuthJSONRPCServer): stream = self.streams.get(name) if stream: stream.downloader.finished_deferred.addCallback( - lambda _: self.analytics_manager.send_download_finished(name, stream_info)) + lambda _: self.analytics_manager.send_download_finished( + download_id, name, stream_info) + ) response = yield self._render_response(message, OK_CODE) defer.returnValue(response) diff --git a/tests/unit/analytics/test_events.py b/tests/unit/analytics/test_events.py index d9baf2e59..06b9bcd92 100644 --- a/tests/unit/analytics/test_events.py +++ b/tests/unit/analytics/test_events.py @@ -22,7 +22,7 @@ class EventsTest(unittest.TestCase): self.assertEqual(desired_result, result) def test_download_started(self): - result = self.event_generator.download_started('great gatsby') + result = self.event_generator.download_started('1', 'great gatsby') desired_result = { 'context': 'any valid json datatype', 'event': 'Download Started', @@ -31,6 +31,7 @@ class EventsTest(unittest.TestCase): 'session_id': 'session456', 'name': 'great gatsby', 'stream_info': None, + 'download_id': '1' }, 'timestamp': '2016-01-01T00:00:00Z', 'userId': 'lbry'