From b0a3771ccfbc7726f3c14271daa141361995c887 Mon Sep 17 00:00:00 2001 From: Jack Robison Date: Wed, 27 Sep 2017 17:23:41 -0400 Subject: [PATCH 01/15] better download errors --- lbrynet/core/Error.py | 15 +++++++++++++++ lbrynet/core/client/StandaloneBlobDownloader.py | 4 ++-- lbrynet/daemon/Downloader.py | 17 +++++++++++++---- 3 files changed, 30 insertions(+), 6 deletions(-) diff --git a/lbrynet/core/Error.py b/lbrynet/core/Error.py index d1e8bb785..4822b7342 100644 --- a/lbrynet/core/Error.py +++ b/lbrynet/core/Error.py @@ -9,11 +9,26 @@ class DuplicateStreamHashError(Exception): class DownloadCanceledError(Exception): pass + +class DownloadSDTimeout(Exception): + def __init__(self, download): + Exception.__init__(self, 'Failed to download sd blob {} within timeout'.format(download)) + self.download = download + + class DownloadTimeoutError(Exception): def __init__(self, download): Exception.__init__(self, 'Failed to download {} within timeout'.format(download)) self.download = download + +class DownloadDataTimeout(Exception): + def __init__(self, download): + Exception.__init__(self, 'Failed to download data blobs for sd hash ' + '{} within timeout'.format(download)) + self.download = download + + class RequestCanceledError(Exception): pass diff --git a/lbrynet/core/client/StandaloneBlobDownloader.py b/lbrynet/core/client/StandaloneBlobDownloader.py index f7a108c65..10509fd27 100644 --- a/lbrynet/core/client/StandaloneBlobDownloader.py +++ b/lbrynet/core/client/StandaloneBlobDownloader.py @@ -5,7 +5,7 @@ from lbrynet.core.BlobInfo import BlobInfo from lbrynet.core.client.BlobRequester import BlobRequester from lbrynet.core.client.ConnectionManager import ConnectionManager from lbrynet.core.client.DownloadManager import DownloadManager -from lbrynet.core.Error import InvalidBlobHashError, DownloadTimeoutError +from lbrynet.core.Error import InvalidBlobHashError, DownloadSDTimeout from lbrynet.core.utils import is_valid_blobhash, safe_start_looping_call, safe_stop_looping_call from twisted.python.failure import Failure from twisted.internet import defer @@ -136,7 +136,7 @@ class StandaloneBlobDownloader(object): def _download_timedout(self): self.stop() if not self.finished_deferred.called: - self.finished_deferred.errback(DownloadTimeoutError(self.blob_hash)) + self.finished_deferred.errback(DownloadSDTimeout(self.blob_hash)) def insufficient_funds(self, err): self.stop() diff --git a/lbrynet/daemon/Downloader.py b/lbrynet/daemon/Downloader.py index 488f02886..f801ee474 100644 --- a/lbrynet/daemon/Downloader.py +++ b/lbrynet/daemon/Downloader.py @@ -5,7 +5,8 @@ from twisted.internet.task import LoopingCall from lbryschema.fee import Fee -from lbrynet.core.Error import InsufficientFundsError, KeyFeeAboveMaxAllowed, DownloadTimeoutError +from lbrynet.core.Error import InsufficientFundsError, KeyFeeAboveMaxAllowed +from lbrynet.core.Error import DownloadDataTimeout, DownloadCanceledError from lbrynet.core.utils import safe_start_looping_call, safe_stop_looping_call from lbrynet.core.StreamDescriptor import download_sd_blob from lbrynet.file_manager.EncryptedFileDownloader import ManagedEncryptedFileDownloaderFactory @@ -68,7 +69,7 @@ class GetStream(object): if self.data_downloading_deferred.called: safe_stop_looping_call(self.checker) else: - log.info("Downloading stream data (%i seconds)", self.timeout_counter) + log.info("Waiting for stream data (%i seconds)", self.timeout_counter) def check_status(self): """ @@ -77,7 +78,8 @@ class GetStream(object): self.timeout_counter += 1 if self.timeout_counter >= self.timeout: if not self.data_downloading_deferred.called: - self.data_downloading_deferred.errback(DownloadTimeoutError(self.file_name)) + self.data_downloading_deferred.errback(DownloadDataTimeout(self.sd_hash)) + safe_stop_looping_call(self.checker) else: d = self.downloader.status() @@ -150,6 +152,12 @@ class GetStream(object): self._check_status(status) defer.returnValue(self.download_path) + def fail(self, err): + safe_stop_looping_call(self.checker) + if not err.check(DownloadDataTimeout): + raise err + return DownloadCanceledError() + @defer.inlineCallbacks def _initialize(self, stream_info): # Set sd_hash and return key_fee from stream_info @@ -179,7 +187,8 @@ class GetStream(object): log.info("Downloading lbry://%s (%s) --> %s", name, self.sd_hash[:6], self.download_path) self.finished_deferred = self.downloader.start() - self.finished_deferred.addCallback(self.finish, name) + self.finished_deferred.addCallbacks(lambda result: self.finish(result, name), + self.fail) @defer.inlineCallbacks def start(self, stream_info, name): From 250855d45db08b6876038440c4b09c623abc6be7 Mon Sep 17 00:00:00 2001 From: Jack Robison Date: Wed, 27 Sep 2017 17:25:21 -0400 Subject: [PATCH 02/15] stop failed downloader outside of GetStream --- lbrynet/daemon/Downloader.py | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/lbrynet/daemon/Downloader.py b/lbrynet/daemon/Downloader.py index f801ee474..3bfe4bf08 100644 --- a/lbrynet/daemon/Downloader.py +++ b/lbrynet/daemon/Downloader.py @@ -76,7 +76,7 @@ class GetStream(object): Check if we've got the first data blob in the stream yet """ self.timeout_counter += 1 - if self.timeout_counter >= self.timeout: + if self.timeout_counter > self.timeout: if not self.data_downloading_deferred.called: self.data_downloading_deferred.errback(DownloadDataTimeout(self.sd_hash)) @@ -213,9 +213,8 @@ class GetStream(object): try: yield self.data_downloading_deferred - except Exception as err: - self.downloader.stop() + except DownloadDataTimeout as err: safe_stop_looping_call(self.checker) - raise + raise err defer.returnValue((self.downloader, self.finished_deferred)) From 9de4657a4dffd040a78269cf1d16ffc43c24b782 Mon Sep 17 00:00:00 2001 From: Jack Robison Date: Wed, 27 Sep 2017 17:30:03 -0400 Subject: [PATCH 03/15] fix blob history and callback from _download_succeeded previously _download_succeeded did not block on blob_completed, presumably because even longer ago it did not block on a deriving immediate_announce call and thus took a long time to return --- lbrynet/core/client/BlobRequester.py | 9 +++++---- lbrynet/core/server/BlobRequestHandler.py | 2 +- 2 files changed, 6 insertions(+), 5 deletions(-) diff --git a/lbrynet/core/client/BlobRequester.py b/lbrynet/core/client/BlobRequester.py index 1ce4f7205..37185bbf0 100644 --- a/lbrynet/core/client/BlobRequester.py +++ b/lbrynet/core/client/BlobRequester.py @@ -516,8 +516,6 @@ class DownloadRequest(RequestHelper): def _pay_or_cancel_payment(self, arg, reserved_points, blob): if self._can_pay_peer(blob, arg): self._pay_peer(blob.length, reserved_points) - d = self.requestor.blob_manager.add_blob_to_download_history( - str(blob), str(self.peer.host), float(self.protocol_prices[self.protocol])) else: self._cancel_points(reserved_points) return arg @@ -567,8 +565,11 @@ class DownloadRequest(RequestHelper): self.peer.update_stats('blobs_downloaded', 1) self.peer.update_score(5.0) should_announce = blob.blob_hash == self.head_blob_hash - self.requestor.blob_manager.blob_completed(blob, should_announce=should_announce) - return arg + d = self.requestor.blob_manager.blob_completed(blob, should_announce=should_announce) + d.addCallback(lambda _: self.requestor.blob_manager.add_blob_to_download_history( + blob.blob_hash, self.peer.host, self.protocol_prices[self.protocol])) + d.addCallback(lambda _: arg) + return d def _download_failed(self, reason): if not reason.check(DownloadCanceledError, PriceDisagreementError): diff --git a/lbrynet/core/server/BlobRequestHandler.py b/lbrynet/core/server/BlobRequestHandler.py index 308d0c822..de98cf898 100644 --- a/lbrynet/core/server/BlobRequestHandler.py +++ b/lbrynet/core/server/BlobRequestHandler.py @@ -162,7 +162,7 @@ class BlobRequestHandler(object): def record_transaction(self, blob): d = self.blob_manager.add_blob_to_upload_history( - str(blob), self.peer.host, self.blob_data_payment_rate) + blob.blob_hash, self.peer.host, self.blob_data_payment_rate) return d def _reply_to_send_request(self, response, incoming): From af99edc764102116f2fd9ee22bef54560699880f Mon Sep 17 00:00:00 2001 From: Jack Robison Date: Wed, 27 Sep 2017 17:30:22 -0400 Subject: [PATCH 04/15] add get_host_downloaded_from --- lbrynet/core/BlobManager.py | 10 ++++++++++ 1 file changed, 10 insertions(+) diff --git a/lbrynet/core/BlobManager.py b/lbrynet/core/BlobManager.py index f6c329dc1..6293db4ff 100644 --- a/lbrynet/core/BlobManager.py +++ b/lbrynet/core/BlobManager.py @@ -128,6 +128,16 @@ class DiskBlobManager(DHTHashSupplier): d = self._add_blob_to_download_history(blob_hash, host, rate) return d + @defer.inlineCallbacks + def get_host_downloaded_from(self, blob_hash): + query_str = "SELECT host FROM download WHERE blob=? ORDER BY ts DESC LIMIT 1" + host = yield self.db_conn.runQuery(query_str, (blob_hash,)) + if host: + result = host[0][0] + else: + result = None + defer.returnValue(result) + def add_blob_to_upload_history(self, blob_hash, host, rate): d = self._add_blob_to_upload_history(blob_hash, host, rate) return d From c9ae251d60d8f7aee5b0acb27d1c119eefadb7d2 Mon Sep 17 00:00:00 2001 From: Jack Robison Date: Wed, 27 Sep 2017 17:02:36 -0400 Subject: [PATCH 05/15] download analytics --- lbrynet/analytics.py | 40 ++++++++++++++++----------- lbrynet/daemon/Daemon.py | 58 ++++++++++++++++++++++++++++++++-------- 2 files changed, 71 insertions(+), 27 deletions(-) diff --git a/lbrynet/analytics.py b/lbrynet/analytics.py index 02885fb6c..855bcfcee 100644 --- a/lbrynet/analytics.py +++ b/lbrynet/analytics.py @@ -57,15 +57,14 @@ class Manager(object): self._event(DOWNLOAD_STARTED, self._download_properties(id_, name, claim_dict)) ) - def send_download_errored(self, id_, name, claim_dict=None): - self.analytics_api.track( - self._event(DOWNLOAD_ERRORED, self._download_properties(id_, name, claim_dict)) - ) + def send_download_errored(self, err, id_, name, claim_dict, report): + download_error_properties = self._download_error_properties(err, id_, name, claim_dict, + report) + self.analytics_api.track(self._event(DOWNLOAD_ERRORED, download_error_properties)) - def send_download_finished(self, id_, name, claim_dict=None): - self.analytics_api.track( - self._event(DOWNLOAD_FINISHED, self._download_properties(id_, name, claim_dict)) - ) + def send_download_finished(self, id_, name, report, claim_dict=None): + download_properties = self._download_properties(id_, name, claim_dict, report) + self.analytics_api.track(self._event(DOWNLOAD_FINISHED, download_properties)) def send_claim_action(self, action): self.analytics_api.track(self._event(CLAIM_ACTION, {'action': action})) @@ -159,18 +158,27 @@ class Manager(object): return properties @staticmethod - def _download_properties(id_, name, claim_dict=None): - sd_hash = None - if claim_dict: - try: - sd_hash = claim_dict.source_hash - except (KeyError, TypeError, ValueError): - log.debug('Failed to get sd_hash from %s', claim_dict, exc_info=True) - return { + def _download_properties(id_, name, claim_dict=None, report=None): + sd_hash = None if not claim_dict else claim_dict.source_hash + p = { 'download_id': id_, 'name': name, 'stream_info': sd_hash } + if report: + p['report'] = report + return p + + @staticmethod + def _download_error_properties(error, id_, name, claim_dict, report): + return { + 'download_id': id_, + 'name': name, + 'stream_info': claim_dict.source_hash, + 'error': str(type(error)), + 'reason': error.message, + 'report': report + } @staticmethod def _make_context(platform, wallet): diff --git a/lbrynet/daemon/Daemon.py b/lbrynet/daemon/Daemon.py index f77f183c0..521134f87 100644 --- a/lbrynet/daemon/Daemon.py +++ b/lbrynet/daemon/Daemon.py @@ -46,7 +46,7 @@ from lbrynet.core.looping_call_manager import LoopingCallManager from lbrynet.core.server.BlobRequestHandler import BlobRequestHandlerFactory from lbrynet.core.server.ServerProtocol import ServerProtocolFactory from lbrynet.core.Error import InsufficientFundsError, UnknownNameError, NoSuchSDHash -from lbrynet.core.Error import NoSuchStreamHash +from lbrynet.core.Error import NoSuchStreamHash, DownloadDataTimeout, DownloadSDTimeout from lbrynet.core.Error import NullFundsError, NegativeFundsError log = logging.getLogger(__name__) @@ -600,7 +600,40 @@ class Daemon(AuthJSONRPCServer): return download_sd_blob(self.session, blob_hash, rate_manager, timeout) @defer.inlineCallbacks - def _download_name(self, name, claim_dict, claim_id, timeout=None, file_name=None): + def _get_stream_analytics_report(self, claim_dict): + sd_hash = claim_dict.source_hash + try: + stream_hash = yield self.stream_info_manager.get_stream_hash_for_sd_hash(sd_hash) + except NoSuchSDHash: + stream_hash = None + report = { + "sd_hash": sd_hash, + "stream_hash": stream_hash, + } + blobs = {} + sd_host = yield self.session.blob_manager.get_host_downloaded_from(sd_hash) + report["sd_blob"] = sd_host + if stream_hash: + blob_infos = yield self.stream_info_manager.get_blobs_for_stream(stream_hash) + report["known_blobs"] = len(blob_infos) + else: + blob_infos = [] + report["known_blobs"] = 0 + for blob_hash, blob_num, iv, length in blob_infos: + host = yield self.session.blob_manager.get_host_downloaded_from(blob_hash) + if host: + blobs[blob_num] = host + report["blobs"] = blobs + defer.returnValue(report) + + @defer.inlineCallbacks + def _download_finished(self, download_id, name, claim_dict): + report = yield self._get_stream_analytics_report(claim_dict) + self.analytics_manager.send_download_finished(download_id, name, report, claim_dict) + + @defer.inlineCallbacks + def _download_name(self, name, claim_dict, claim_id, timeout=None, file_name=None, + delete_on_timeout=True): """ Add a lbry file to the file manager, start the download, and return the new lbry file. If it already exists in the file manager, return the existing lbry file @@ -621,17 +654,20 @@ class Daemon(AuthJSONRPCServer): file_name) try: lbry_file, finished_deferred = yield self.streams[claim_id].start(claim_dict, name) - finished_deferred.addCallback( - lambda _: self.analytics_manager.send_download_finished(download_id, - name, - claim_dict)) + finished_deferred.addCallback(lambda _: self._download_finished(download_id, name, + claim_dict)) result = yield self._get_lbry_file_dict(lbry_file, full_status=True) - del self.streams[claim_id] - except Exception as err: - log.warning('Failed to get %s: %s', name, err) - self.analytics_manager.send_download_errored(download_id, name, claim_dict) - del self.streams[claim_id] + except (DownloadDataTimeout, DownloadSDTimeout) as err: + log.warning('Failed to get %s (%s)', name, err) + report = yield self._get_stream_analytics_report(claim_dict) + if isinstance(err, DownloadDataTimeout) and delete_on_timeout: + yield self.lbry_file_manager.delete_lbry_file(self.streams[claim_id].downloader) + elif self.streams[claim_id].downloader: + yield self.streams[claim_id].downloader.stop(err) + self.analytics_manager.send_download_errored(err, download_id, name, claim_dict, + report) result = {'error': err.message} + del self.streams[claim_id] defer.returnValue(result) @defer.inlineCallbacks From fa83a4f67159acf71933662f9ab2f90bdf1b013c Mon Sep 17 00:00:00 2001 From: Jack Robison Date: Wed, 27 Sep 2017 17:47:36 -0400 Subject: [PATCH 06/15] fix stop condition --- lbrynet/daemon/Downloader.py | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/lbrynet/daemon/Downloader.py b/lbrynet/daemon/Downloader.py index 3bfe4bf08..626c79947 100644 --- a/lbrynet/daemon/Downloader.py +++ b/lbrynet/daemon/Downloader.py @@ -10,7 +10,6 @@ from lbrynet.core.Error import DownloadDataTimeout, DownloadCanceledError from lbrynet.core.utils import safe_start_looping_call, safe_stop_looping_call from lbrynet.core.StreamDescriptor import download_sd_blob from lbrynet.file_manager.EncryptedFileDownloader import ManagedEncryptedFileDownloaderFactory -from lbrynet.file_manager.EncryptedFileDownloader import ManagedEncryptedFileDownloader from lbrynet import conf INITIALIZING_CODE = 'initializing' @@ -62,9 +61,7 @@ class GetStream(object): return os.path.join(self.download_directory, self.downloader.file_name) def _check_status(self, status): - stop_condition = (status.num_completed > 0 or - status.running_status == ManagedEncryptedFileDownloader.STATUS_STOPPED) - if stop_condition and not self.data_downloading_deferred.called: + if status.num_completed > 0 and not self.data_downloading_deferred.called: self.data_downloading_deferred.callback(True) if self.data_downloading_deferred.called: safe_stop_looping_call(self.checker) From 81b584a35a029591b120f3ba49ccfe27d1f85773 Mon Sep 17 00:00:00 2001 From: Jack Robison Date: Wed, 27 Sep 2017 17:47:50 -0400 Subject: [PATCH 07/15] update tests and scripts --- scripts/query_available_blobs.py | 2 +- tests/unit/core/test_BlobManager.py | 4 +- tests/unit/lbrynet_daemon/test_Downloader.py | 92 +++++++++----------- 3 files changed, 46 insertions(+), 52 deletions(-) diff --git a/scripts/query_available_blobs.py b/scripts/query_available_blobs.py index 727c019c8..39e1f406f 100644 --- a/scripts/query_available_blobs.py +++ b/scripts/query_available_blobs.py @@ -59,7 +59,7 @@ def main(args=None): use_upnp=False, wallet=wallet ) - api = analytics.Api.new_instance() + api = analytics.Api.new_instance(conf.settings['share_usage_data']) run(args, session, api) reactor.run() finally: diff --git a/tests/unit/core/test_BlobManager.py b/tests/unit/core/test_BlobManager.py index f6b4a1f04..ddeee7c0c 100644 --- a/tests/unit/core/test_BlobManager.py +++ b/tests/unit/core/test_BlobManager.py @@ -50,10 +50,10 @@ class BlobManagerTest(unittest.TestCase): writer, finished_d = yield blob.open_for_writing(self.peer) yield writer.write(data) yield self.bm.blob_completed(blob) - yield self.bm.add_blob_to_upload_history(blob_hash,'test',len(data)) + yield self.bm.add_blob_to_upload_history(blob_hash, 'test', len(data)) # check to see if blob is there - self.assertTrue(os.path.isfile(os.path.join(self.blob_dir,blob_hash))) + self.assertTrue(os.path.isfile(os.path.join(self.blob_dir, blob_hash))) blobs = yield self.bm.get_all_verified_blobs() self.assertTrue(blob_hash in blobs) defer.returnValue(blob_hash) diff --git a/tests/unit/lbrynet_daemon/test_Downloader.py b/tests/unit/lbrynet_daemon/test_Downloader.py index df3e4315c..791e28c4d 100644 --- a/tests/unit/lbrynet_daemon/test_Downloader.py +++ b/tests/unit/lbrynet_daemon/test_Downloader.py @@ -1,26 +1,22 @@ import types import mock -import json from twisted.trial import unittest from twisted.internet import defer, task -from lbryschema.claim import ClaimDict from lbrynet.core import Session, PaymentRateManager, Wallet -from lbrynet.core.Error import DownloadTimeoutError +from lbrynet.core.Error import DownloadDataTimeout, DownloadSDTimeout from lbrynet.daemon import Downloader -from lbrynet.core.StreamDescriptor import StreamDescriptorIdentifier,StreamMetadata -from lbrynet.lbry_file.client.EncryptedFileOptions import add_lbry_file_to_sd_identifier +from lbrynet.core.StreamDescriptor import StreamDescriptorIdentifier from lbrynet.file_manager.EncryptedFileStatusReport import EncryptedFileStatusReport -from lbrynet.file_manager.EncryptedFileDownloader import ManagedEncryptedFileDownloader, ManagedEncryptedFileDownloaderFactory +from lbrynet.file_manager.EncryptedFileDownloader import ManagedEncryptedFileDownloader from lbrynet.daemon.ExchangeRateManager import ExchangeRateManager -from tests.mocks import BlobAvailabilityTracker as DummyBlobAvailabilityTracker from tests.mocks import ExchangeRateManager as DummyExchangeRateManager -from tests.mocks import BTCLBCFeed, USDBTCFeed from tests.mocks import mock_conf_settings + class MocDownloader(object): def __init__(self): self.finish_deferred = defer.Deferred(None) @@ -106,7 +102,7 @@ class GetStreamTests(unittest.TestCase): DownloadTimeoutError is raised """ def download_sd_blob(self): - raise DownloadTimeoutError(self.file_name) + raise DownloadSDTimeout(self.file_name) getstream = self.init_getstream_with_mocs() getstream._initialize = types.MethodType(moc_initialize, getstream) @@ -115,31 +111,30 @@ class GetStreamTests(unittest.TestCase): getstream.pay_key_fee = types.MethodType(moc_pay_key_fee, getstream) name='test' stream_info = None - with self.assertRaises(DownloadTimeoutError): + with self.assertRaises(DownloadSDTimeout): yield getstream.start(stream_info,name) self.assertFalse(getstream.pay_key_fee_called) - - @defer.inlineCallbacks - def test_timeout(self): - """ - test that timeout (set to 2 here) exception is raised - when download times out while downloading first blob, and key fee is paid - """ - getstream = self.init_getstream_with_mocs() - getstream._initialize = types.MethodType(moc_initialize, getstream) - getstream._download_sd_blob = types.MethodType(moc_download_sd_blob, getstream) - getstream._download = types.MethodType(moc_download, getstream) - getstream.pay_key_fee = types.MethodType(moc_pay_key_fee, getstream) - name='test' - stream_info = None - start = getstream.start(stream_info,name) - self.clock.advance(1) - self.clock.advance(1) - with self.assertRaises(DownloadTimeoutError): - yield start - self.assertTrue(getstream.downloader.stop_called) - self.assertTrue(getstream.pay_key_fee_called) + # @defer.inlineCallbacks + # def test_timeout(self): + # """ + # test that timeout (set to 2 here) exception is raised + # when download times out while downloading first blob, and key fee is paid + # """ + # getstream = self.init_getstream_with_mocs() + # getstream._initialize = types.MethodType(moc_initialize, getstream) + # getstream._download_sd_blob = types.MethodType(moc_download_sd_blob, getstream) + # getstream._download = types.MethodType(moc_download, getstream) + # getstream.pay_key_fee = types.MethodType(moc_pay_key_fee, getstream) + # name='test' + # stream_info = None + # start = getstream.start(stream_info,name) + # self.clock.advance(1) + # self.clock.advance(1) + # with self.assertRaises(DownloadDataTimeout): + # yield start + # self.assertTrue(getstream.downloader.stop_called) + # self.assertTrue(getstream.pay_key_fee_called) @defer.inlineCallbacks def test_finish_one_blob(self): @@ -163,21 +158,20 @@ class GetStreamTests(unittest.TestCase): downloader, f_deferred = yield start self.assertTrue(getstream.pay_key_fee_called) - - @defer.inlineCallbacks - def test_finish_stopped_downloader(self): - """ - test that if we have a stopped downloader, beforfe a blob is downloaded, - start() returns - """ - getstream = self.init_getstream_with_mocs() - getstream._initialize = types.MethodType(moc_initialize, getstream) - getstream._download_sd_blob = types.MethodType(moc_download_sd_blob, getstream) - getstream._download = types.MethodType(moc_download, getstream) - name='test' - stream_info = None - start = getstream.start(stream_info,name) - - getstream.downloader.running_status = ManagedEncryptedFileDownloader.STATUS_STOPPED - self.clock.advance(1) - downloader, f_deferred = yield start + # @defer.inlineCallbacks + # def test_finish_stopped_downloader(self): + # """ + # test that if we have a stopped downloader, beforfe a blob is downloaded, + # start() returns + # """ + # getstream = self.init_getstream_with_mocs() + # getstream._initialize = types.MethodType(moc_initialize, getstream) + # getstream._download_sd_blob = types.MethodType(moc_download_sd_blob, getstream) + # getstream._download = types.MethodType(moc_download, getstream) + # name='test' + # stream_info = None + # start = getstream.start(stream_info,name) + # + # getstream.downloader.running_status = ManagedEncryptedFileDownloader.STATUS_STOPPED + # self.clock.advance(1) + # downloader, f_deferred = yield start From d2b91d5f51175c5d13351946a2a3ea08e2ee3905 Mon Sep 17 00:00:00 2001 From: Jack Robison Date: Wed, 27 Sep 2017 17:55:58 -0400 Subject: [PATCH 08/15] changelog --- CHANGELOG.md | 3 +++ 1 file changed, 3 insertions(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index ceff6db7e..21ceeeff9 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -19,6 +19,7 @@ at anytime. ### Fixed * Fixed handling cancelled blob and availability requests * Fixed redundant blob requests to a peer + * Fixed blob download history ### Deprecated * Deprecated `blob_announce_all` JSONRPC command. Use `blob_announce` instead. @@ -27,6 +28,8 @@ at anytime. ### Changed * Announcing by head blob is turned on by default * Updated reflector server dns + * Improved download analytics + * Improved download errors by distinguishing a data timeout from a sd timeout ### Added * Added WAL pragma to sqlite3 From 89ebed570ec091db797d814bd67caf4b141de39f Mon Sep 17 00:00:00 2001 From: Kay Kurokawa Date: Thu, 28 Sep 2017 11:04:36 -0400 Subject: [PATCH 09/15] fix test_timeout in test_Downloader --- tests/unit/lbrynet_daemon/test_Downloader.py | 40 ++++++++++---------- 1 file changed, 20 insertions(+), 20 deletions(-) diff --git a/tests/unit/lbrynet_daemon/test_Downloader.py b/tests/unit/lbrynet_daemon/test_Downloader.py index 791e28c4d..231e277f4 100644 --- a/tests/unit/lbrynet_daemon/test_Downloader.py +++ b/tests/unit/lbrynet_daemon/test_Downloader.py @@ -115,26 +115,26 @@ class GetStreamTests(unittest.TestCase): yield getstream.start(stream_info,name) self.assertFalse(getstream.pay_key_fee_called) - # @defer.inlineCallbacks - # def test_timeout(self): - # """ - # test that timeout (set to 2 here) exception is raised - # when download times out while downloading first blob, and key fee is paid - # """ - # getstream = self.init_getstream_with_mocs() - # getstream._initialize = types.MethodType(moc_initialize, getstream) - # getstream._download_sd_blob = types.MethodType(moc_download_sd_blob, getstream) - # getstream._download = types.MethodType(moc_download, getstream) - # getstream.pay_key_fee = types.MethodType(moc_pay_key_fee, getstream) - # name='test' - # stream_info = None - # start = getstream.start(stream_info,name) - # self.clock.advance(1) - # self.clock.advance(1) - # with self.assertRaises(DownloadDataTimeout): - # yield start - # self.assertTrue(getstream.downloader.stop_called) - # self.assertTrue(getstream.pay_key_fee_called) + @defer.inlineCallbacks + def test_timeout(self): + """ + test that timeout (set to 3 here) exception is raised + when download times out while downloading first blob, and key fee is paid + """ + getstream = self.init_getstream_with_mocs() + getstream._initialize = types.MethodType(moc_initialize, getstream) + getstream._download_sd_blob = types.MethodType(moc_download_sd_blob, getstream) + getstream._download = types.MethodType(moc_download, getstream) + getstream.pay_key_fee = types.MethodType(moc_pay_key_fee, getstream) + name='test' + stream_info = None + start = getstream.start(stream_info,name) + self.clock.advance(1) + self.clock.advance(1) + self.clock.advance(1) + with self.assertRaises(DownloadDataTimeout): + yield start + self.assertTrue(getstream.pay_key_fee_called) @defer.inlineCallbacks def test_finish_one_blob(self): From 50b51569a3d5b919f206a9c2bbe2365cdaba79c2 Mon Sep 17 00:00:00 2001 From: Jack Robison Date: Thu, 28 Sep 2017 13:51:20 -0400 Subject: [PATCH 10/15] cancel streams on shutdown --- lbrynet/daemon/Daemon.py | 8 ++++++++ lbrynet/daemon/Downloader.py | 17 ++++++++++++----- 2 files changed, 20 insertions(+), 5 deletions(-) diff --git a/lbrynet/daemon/Daemon.py b/lbrynet/daemon/Daemon.py index 521134f87..1155f0cab 100644 --- a/lbrynet/daemon/Daemon.py +++ b/lbrynet/daemon/Daemon.py @@ -392,6 +392,11 @@ class Daemon(AuthJSONRPCServer): def _already_shutting_down(sig_num, frame): log.info("Already shutting down") + def _stop_streams(self): + """stop pending GetStream downloads""" + for claim_id, stream in self.streams.iteritems(): + stream.cancel(reason="daemon shutdown") + def _shutdown(self): # ignore INT/TERM signals once shutdown has started signal.signal(signal.SIGINT, self._already_shutting_down) @@ -399,6 +404,9 @@ class Daemon(AuthJSONRPCServer): log.info("Closing lbrynet session") log.info("Status at time of shutdown: " + self.startup_status[0]) + + self._stop_streams() + self.looping_call_manager.shutdown() if self.analytics_manager: self.analytics_manager.shutdown() diff --git a/lbrynet/daemon/Downloader.py b/lbrynet/daemon/Downloader.py index 626c79947..53a164d58 100644 --- a/lbrynet/daemon/Downloader.py +++ b/lbrynet/daemon/Downloader.py @@ -151,9 +151,7 @@ class GetStream(object): def fail(self, err): safe_stop_looping_call(self.checker) - if not err.check(DownloadDataTimeout): - raise err - return DownloadCanceledError() + raise err @defer.inlineCallbacks def _initialize(self, stream_info): @@ -184,8 +182,7 @@ class GetStream(object): log.info("Downloading lbry://%s (%s) --> %s", name, self.sd_hash[:6], self.download_path) self.finished_deferred = self.downloader.start() - self.finished_deferred.addCallbacks(lambda result: self.finish(result, name), - self.fail) + self.finished_deferred.addCallbacks(lambda result: self.finish(result, name), self.fail) @defer.inlineCallbacks def start(self, stream_info, name): @@ -215,3 +212,13 @@ class GetStream(object): raise err defer.returnValue((self.downloader, self.finished_deferred)) + + def cancel(self, reason=None): + if reason: + msg = "download stream cancelled: %s" % reason + else: + msg = "download stream cancelled" + if self.finished_deferred and not self.finished_deferred.called: + self.finished_deferred.errback(DownloadCanceledError(msg)) + if self.data_downloading_deferred and not self.data_downloading_deferred.called: + self.data_downloading_deferred.errback(DownloadCanceledError(msg)) From 82a2805aaf663293e96e143915c69803871e2160 Mon Sep 17 00:00:00 2001 From: Jack Robison Date: Thu, 28 Sep 2017 13:52:30 -0400 Subject: [PATCH 11/15] json blobs --- lbrynet/daemon/Daemon.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lbrynet/daemon/Daemon.py b/lbrynet/daemon/Daemon.py index 1155f0cab..5aba3c7d2 100644 --- a/lbrynet/daemon/Daemon.py +++ b/lbrynet/daemon/Daemon.py @@ -631,7 +631,7 @@ class Daemon(AuthJSONRPCServer): host = yield self.session.blob_manager.get_host_downloaded_from(blob_hash) if host: blobs[blob_num] = host - report["blobs"] = blobs + report["blobs"] = json.dumps(blobs) defer.returnValue(report) @defer.inlineCallbacks From 9fd60c823f9169039ff37b379cbb3ea71f9d243b Mon Sep 17 00:00:00 2001 From: Jack Robison Date: Thu, 28 Sep 2017 13:53:12 -0400 Subject: [PATCH 12/15] add _download_failed errback --- lbrynet/daemon/Daemon.py | 43 ++++++++++++++++++++++++---------------- 1 file changed, 26 insertions(+), 17 deletions(-) diff --git a/lbrynet/daemon/Daemon.py b/lbrynet/daemon/Daemon.py index 5aba3c7d2..0bb3338bd 100644 --- a/lbrynet/daemon/Daemon.py +++ b/lbrynet/daemon/Daemon.py @@ -635,18 +635,25 @@ class Daemon(AuthJSONRPCServer): defer.returnValue(report) @defer.inlineCallbacks - def _download_finished(self, download_id, name, claim_dict): - report = yield self._get_stream_analytics_report(claim_dict) - self.analytics_manager.send_download_finished(download_id, name, report, claim_dict) - - @defer.inlineCallbacks - def _download_name(self, name, claim_dict, claim_id, timeout=None, file_name=None, - delete_on_timeout=True): + def _download_name(self, name, claim_dict, claim_id, timeout=None, file_name=None): """ Add a lbry file to the file manager, start the download, and return the new lbry file. If it already exists in the file manager, return the existing lbry file """ + @defer.inlineCallbacks + def _download_finished(download_id, name, claim_dict): + log.info("Finished: %s", name) + report = yield self._get_stream_analytics_report(claim_dict) + self.analytics_manager.send_download_finished(download_id, name, report, claim_dict) + + @defer.inlineCallbacks + def _download_failed(error, download_id, name, claim_dict): + log.warning("Failed %s: %s", name, error) + report = yield self._get_stream_analytics_report(claim_dict) + self.analytics_manager.send_download_errored(error, download_id, name, claim_dict, + report) + if claim_id in self.streams: downloader = self.streams[claim_id] result = yield downloader.finished_deferred @@ -662,18 +669,20 @@ class Daemon(AuthJSONRPCServer): file_name) try: lbry_file, finished_deferred = yield self.streams[claim_id].start(claim_dict, name) - finished_deferred.addCallback(lambda _: self._download_finished(download_id, name, - claim_dict)) + finished_deferred.addCallbacks(lambda _: _download_finished(download_id, name, + claim_dict), + lambda e: _download_failed(e, download_id, name, + claim_dict)) + result = yield self._get_lbry_file_dict(lbry_file, full_status=True) - except (DownloadDataTimeout, DownloadSDTimeout) as err: - log.warning('Failed to get %s (%s)', name, err) - report = yield self._get_stream_analytics_report(claim_dict) - if isinstance(err, DownloadDataTimeout) and delete_on_timeout: - yield self.lbry_file_manager.delete_lbry_file(self.streams[claim_id].downloader) - elif self.streams[claim_id].downloader: + except Exception as err: + if isinstance(err, (DownloadDataTimeout, DownloadSDTimeout)): + log.warning('Failed to get %s (%s)', name, err) + else: + log.error('Failed to get %s (%s)', name, err) + if self.streams[claim_id].downloader: yield self.streams[claim_id].downloader.stop(err) - self.analytics_manager.send_download_errored(err, download_id, name, claim_dict, - report) + yield _download_failed(err, download_id, name, claim_dict) result = {'error': err.message} del self.streams[claim_id] defer.returnValue(result) From b7bfb259e527eb7b08fa43a80768ce9703c8ea49 Mon Sep 17 00:00:00 2001 From: Jack Robison Date: Thu, 28 Sep 2017 13:54:01 -0400 Subject: [PATCH 13/15] fix download exceptions --- lbrynet/daemon/Daemon.py | 17 +++++++++++------ 1 file changed, 11 insertions(+), 6 deletions(-) diff --git a/lbrynet/daemon/Daemon.py b/lbrynet/daemon/Daemon.py index 0bb3338bd..6ab358bcf 100644 --- a/lbrynet/daemon/Daemon.py +++ b/lbrynet/daemon/Daemon.py @@ -612,14 +612,17 @@ class Daemon(AuthJSONRPCServer): sd_hash = claim_dict.source_hash try: stream_hash = yield self.stream_info_manager.get_stream_hash_for_sd_hash(sd_hash) - except NoSuchSDHash: + except Exception: stream_hash = None report = { "sd_hash": sd_hash, "stream_hash": stream_hash, } blobs = {} - sd_host = yield self.session.blob_manager.get_host_downloaded_from(sd_hash) + try: + sd_host = yield self.session.blob_manager.get_host_downloaded_from(sd_hash) + except Exception: + sd_host = None report["sd_blob"] = sd_host if stream_hash: blob_infos = yield self.stream_info_manager.get_blobs_for_stream(stream_hash) @@ -628,7 +631,10 @@ class Daemon(AuthJSONRPCServer): blob_infos = [] report["known_blobs"] = 0 for blob_hash, blob_num, iv, length in blob_infos: - host = yield self.session.blob_manager.get_host_downloaded_from(blob_hash) + try: + host = yield self.session.blob_manager.get_host_downloaded_from(blob_hash) + except Exception: + host = None if host: blobs[blob_num] = host report["blobs"] = json.dumps(blobs) @@ -643,13 +649,11 @@ class Daemon(AuthJSONRPCServer): @defer.inlineCallbacks def _download_finished(download_id, name, claim_dict): - log.info("Finished: %s", name) report = yield self._get_stream_analytics_report(claim_dict) self.analytics_manager.send_download_finished(download_id, name, report, claim_dict) @defer.inlineCallbacks def _download_failed(error, download_id, name, claim_dict): - log.warning("Failed %s: %s", name, error) report = yield self._get_stream_analytics_report(claim_dict) self.analytics_manager.send_download_errored(error, download_id, name, claim_dict, report) @@ -684,7 +688,8 @@ class Daemon(AuthJSONRPCServer): yield self.streams[claim_id].downloader.stop(err) yield _download_failed(err, download_id, name, claim_dict) result = {'error': err.message} - del self.streams[claim_id] + finally: + del self.streams[claim_id] defer.returnValue(result) @defer.inlineCallbacks From c6db4b187aa1a41a330310dccf1720cb4b5a8e51 Mon Sep 17 00:00:00 2001 From: Jack Robison Date: Thu, 28 Sep 2017 15:45:24 -0400 Subject: [PATCH 14/15] fix error name --- lbrynet/analytics.py | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/lbrynet/analytics.py b/lbrynet/analytics.py index 855bcfcee..4b41e0804 100644 --- a/lbrynet/analytics.py +++ b/lbrynet/analytics.py @@ -171,11 +171,15 @@ class Manager(object): @staticmethod def _download_error_properties(error, id_, name, claim_dict, report): + def error_name(err): + if not hasattr(type(err), "__name__"): + return str(type(err)) + return type(err).__name__ return { 'download_id': id_, 'name': name, 'stream_info': claim_dict.source_hash, - 'error': str(type(error)), + 'error': error_name(error), 'reason': error.message, 'report': report } From 2ebb9da10833f8e67424ae31ce1558107a33d699 Mon Sep 17 00:00:00 2001 From: Jack Robison Date: Thu, 28 Sep 2017 15:46:02 -0400 Subject: [PATCH 15/15] move download_failed --- lbrynet/daemon/Daemon.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lbrynet/daemon/Daemon.py b/lbrynet/daemon/Daemon.py index 6ab358bcf..63d7d1461 100644 --- a/lbrynet/daemon/Daemon.py +++ b/lbrynet/daemon/Daemon.py @@ -680,13 +680,13 @@ class Daemon(AuthJSONRPCServer): result = yield self._get_lbry_file_dict(lbry_file, full_status=True) except Exception as err: + yield _download_failed(err, download_id, name, claim_dict) if isinstance(err, (DownloadDataTimeout, DownloadSDTimeout)): log.warning('Failed to get %s (%s)', name, err) else: log.error('Failed to get %s (%s)', name, err) if self.streams[claim_id].downloader: yield self.streams[claim_id].downloader.stop(err) - yield _download_failed(err, download_id, name, claim_dict) result = {'error': err.message} finally: del self.streams[claim_id]