Merge branch 'download-errors'

Jack Robison 2017-09-28 16:07:16 -04:00
commit 283dede833
No known key found for this signature in database
GPG key ID: 284699E7404E3CFF
12 changed files with 184 additions and 79 deletions

View file

@@ -19,6 +19,7 @@ at anytime.
### Fixed
* Fixed handling cancelled blob and availability requests
* Fixed redundant blob requests to a peer
* Fixed blob download history
### Deprecated
* Deprecated `blob_announce_all` JSONRPC command. Use `blob_announce` instead.
@@ -27,6 +28,8 @@ at anytime.
### Changed
* Announcing by head blob is turned on by default
* Updated reflector server dns
* Improved download analytics
* Improved download errors by distinguishing a data timeout from a sd timeout
### Added
* Added WAL pragma to sqlite3
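
The "Improved download errors" entry above refers to splitting the old DownloadTimeoutError into DownloadSDTimeout and DownloadDataTimeout, shown in the lbrynet/core/Error.py diff further down. Below is a minimal sketch, not part of this commit, of how a caller might tell the two failure modes apart; the classify_download_failure helper and the 'sd_timeout'/'data_timeout' labels are hypothetical, and the exception classes are copied from that diff so the snippet runs on its own.

# Sketch (assumption, not part of this commit): the two timeout errors introduced
# here, mirrored from lbrynet/core/Error.py below so this snippet is self-contained.
class DownloadSDTimeout(Exception):
    def __init__(self, download):
        Exception.__init__(self, 'Failed to download sd blob {} within timeout'.format(download))
        self.download = download


class DownloadDataTimeout(Exception):
    def __init__(self, download):
        Exception.__init__(self, 'Failed to download data blobs for sd hash '
                                 '{} within timeout'.format(download))
        self.download = download


def classify_download_failure(err):
    # Hypothetical helper: label the failure the way an analytics event might.
    if isinstance(err, DownloadSDTimeout):
        return 'sd_timeout'    # the stream descriptor blob never arrived
    if isinstance(err, DownloadDataTimeout):
        return 'data_timeout'  # the sd blob arrived, but no data blobs did in time
    return type(err).__name__  # any other error is reported by its class name


if __name__ == '__main__':
    print(classify_download_failure(DownloadSDTimeout('abc123')))    # sd_timeout
    print(classify_download_failure(DownloadDataTimeout('abc123')))  # data_timeout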

View file

@@ -57,15 +57,14 @@ class Manager(object):
self._event(DOWNLOAD_STARTED, self._download_properties(id_, name, claim_dict))
)
def send_download_errored(self, id_, name, claim_dict=None):
self.analytics_api.track(
self._event(DOWNLOAD_ERRORED, self._download_properties(id_, name, claim_dict))
)
def send_download_errored(self, err, id_, name, claim_dict, report):
download_error_properties = self._download_error_properties(err, id_, name, claim_dict,
report)
self.analytics_api.track(self._event(DOWNLOAD_ERRORED, download_error_properties))
def send_download_finished(self, id_, name, claim_dict=None):
self.analytics_api.track(
self._event(DOWNLOAD_FINISHED, self._download_properties(id_, name, claim_dict))
)
def send_download_finished(self, id_, name, report, claim_dict=None):
download_properties = self._download_properties(id_, name, claim_dict, report)
self.analytics_api.track(self._event(DOWNLOAD_FINISHED, download_properties))
def send_claim_action(self, action):
self.analytics_api.track(self._event(CLAIM_ACTION, {'action': action}))
@@ -159,18 +158,31 @@ class Manager(object):
return properties
@staticmethod
def _download_properties(id_, name, claim_dict=None):
sd_hash = None
if claim_dict:
try:
sd_hash = claim_dict.source_hash
except (KeyError, TypeError, ValueError):
log.debug('Failed to get sd_hash from %s', claim_dict, exc_info=True)
return {
def _download_properties(id_, name, claim_dict=None, report=None):
sd_hash = None if not claim_dict else claim_dict.source_hash
p = {
'download_id': id_,
'name': name,
'stream_info': sd_hash
}
if report:
p['report'] = report
return p
@staticmethod
def _download_error_properties(error, id_, name, claim_dict, report):
def error_name(err):
if not hasattr(type(err), "__name__"):
return str(type(err))
return type(err).__name__
return {
'download_id': id_,
'name': name,
'stream_info': claim_dict.source_hash,
'error': error_name(error),
'reason': error.message,
'report': report
}
@staticmethod
def _make_context(platform, wallet):

View file

@@ -128,6 +128,16 @@ class DiskBlobManager(DHTHashSupplier):
d = self._add_blob_to_download_history(blob_hash, host, rate)
return d
@defer.inlineCallbacks
def get_host_downloaded_from(self, blob_hash):
query_str = "SELECT host FROM download WHERE blob=? ORDER BY ts DESC LIMIT 1"
host = yield self.db_conn.runQuery(query_str, (blob_hash,))
if host:
result = host[0][0]
else:
result = None
defer.returnValue(result)
def add_blob_to_upload_history(self, blob_hash, host, rate):
d = self._add_blob_to_upload_history(blob_hash, host, rate)
return d

View file

@@ -9,11 +9,26 @@ class DuplicateStreamHashError(Exception):
class DownloadCanceledError(Exception):
pass
class DownloadSDTimeout(Exception):
def __init__(self, download):
Exception.__init__(self, 'Failed to download sd blob {} within timeout'.format(download))
self.download = download
class DownloadTimeoutError(Exception):
def __init__(self, download):
Exception.__init__(self, 'Failed to download {} within timeout'.format(download))
self.download = download
class DownloadDataTimeout(Exception):
def __init__(self, download):
Exception.__init__(self, 'Failed to download data blobs for sd hash '
'{} within timeout'.format(download))
self.download = download
class RequestCanceledError(Exception):
pass

View file

@@ -516,8 +516,6 @@ class DownloadRequest(RequestHelper):
def _pay_or_cancel_payment(self, arg, reserved_points, blob):
if self._can_pay_peer(blob, arg):
self._pay_peer(blob.length, reserved_points)
d = self.requestor.blob_manager.add_blob_to_download_history(
str(blob), str(self.peer.host), float(self.protocol_prices[self.protocol]))
else:
self._cancel_points(reserved_points)
return arg
@@ -567,8 +565,11 @@ class DownloadRequest(RequestHelper):
self.peer.update_stats('blobs_downloaded', 1)
self.peer.update_score(5.0)
should_announce = blob.blob_hash == self.head_blob_hash
self.requestor.blob_manager.blob_completed(blob, should_announce=should_announce)
return arg
d = self.requestor.blob_manager.blob_completed(blob, should_announce=should_announce)
d.addCallback(lambda _: self.requestor.blob_manager.add_blob_to_download_history(
blob.blob_hash, self.peer.host, self.protocol_prices[self.protocol]))
d.addCallback(lambda _: arg)
return d
def _download_failed(self, reason):
if not reason.check(DownloadCanceledError, PriceDisagreementError):

View file

@@ -5,7 +5,7 @@ from lbrynet.core.BlobInfo import BlobInfo
from lbrynet.core.client.BlobRequester import BlobRequester
from lbrynet.core.client.ConnectionManager import ConnectionManager
from lbrynet.core.client.DownloadManager import DownloadManager
from lbrynet.core.Error import InvalidBlobHashError, DownloadTimeoutError
from lbrynet.core.Error import InvalidBlobHashError, DownloadSDTimeout
from lbrynet.core.utils import is_valid_blobhash, safe_start_looping_call, safe_stop_looping_call
from twisted.python.failure import Failure
from twisted.internet import defer
@@ -136,7 +136,7 @@ class StandaloneBlobDownloader(object):
def _download_timedout(self):
self.stop()
if not self.finished_deferred.called:
self.finished_deferred.errback(DownloadTimeoutError(self.blob_hash))
self.finished_deferred.errback(DownloadSDTimeout(self.blob_hash))
def insufficient_funds(self, err):
self.stop()

View file

@@ -162,7 +162,7 @@ class BlobRequestHandler(object):
def record_transaction(self, blob):
d = self.blob_manager.add_blob_to_upload_history(
str(blob), self.peer.host, self.blob_data_payment_rate)
blob.blob_hash, self.peer.host, self.blob_data_payment_rate)
return d
def _reply_to_send_request(self, response, incoming):

View file

@@ -46,7 +46,7 @@ from lbrynet.core.looping_call_manager import LoopingCallManager
from lbrynet.core.server.BlobRequestHandler import BlobRequestHandlerFactory
from lbrynet.core.server.ServerProtocol import ServerProtocolFactory
from lbrynet.core.Error import InsufficientFundsError, UnknownNameError, NoSuchSDHash
from lbrynet.core.Error import NoSuchStreamHash
from lbrynet.core.Error import NoSuchStreamHash, DownloadDataTimeout, DownloadSDTimeout
from lbrynet.core.Error import NullFundsError, NegativeFundsError
log = logging.getLogger(__name__)
@@ -392,6 +392,11 @@ class Daemon(AuthJSONRPCServer):
def _already_shutting_down(sig_num, frame):
log.info("Already shutting down")
def _stop_streams(self):
"""stop pending GetStream downloads"""
for claim_id, stream in self.streams.iteritems():
stream.cancel(reason="daemon shutdown")
def _shutdown(self):
# ignore INT/TERM signals once shutdown has started
signal.signal(signal.SIGINT, self._already_shutting_down)
@@ -399,6 +404,9 @@ class Daemon(AuthJSONRPCServer):
log.info("Closing lbrynet session")
log.info("Status at time of shutdown: " + self.startup_status[0])
self._stop_streams()
self.looping_call_manager.shutdown()
if self.analytics_manager:
self.analytics_manager.shutdown()
@@ -599,6 +607,39 @@ class Daemon(AuthJSONRPCServer):
timeout = timeout or 30
return download_sd_blob(self.session, blob_hash, rate_manager, timeout)
@defer.inlineCallbacks
def _get_stream_analytics_report(self, claim_dict):
sd_hash = claim_dict.source_hash
try:
stream_hash = yield self.stream_info_manager.get_stream_hash_for_sd_hash(sd_hash)
except Exception:
stream_hash = None
report = {
"sd_hash": sd_hash,
"stream_hash": stream_hash,
}
blobs = {}
try:
sd_host = yield self.session.blob_manager.get_host_downloaded_from(sd_hash)
except Exception:
sd_host = None
report["sd_blob"] = sd_host
if stream_hash:
blob_infos = yield self.stream_info_manager.get_blobs_for_stream(stream_hash)
report["known_blobs"] = len(blob_infos)
else:
blob_infos = []
report["known_blobs"] = 0
for blob_hash, blob_num, iv, length in blob_infos:
try:
host = yield self.session.blob_manager.get_host_downloaded_from(blob_hash)
except Exception:
host = None
if host:
blobs[blob_num] = host
report["blobs"] = json.dumps(blobs)
defer.returnValue(report)
@defer.inlineCallbacks
def _download_name(self, name, claim_dict, claim_id, timeout=None, file_name=None):
"""
@@ -606,6 +647,17 @@ class Daemon(AuthJSONRPCServer):
If it already exists in the file manager, return the existing lbry file
"""
@defer.inlineCallbacks
def _download_finished(download_id, name, claim_dict):
report = yield self._get_stream_analytics_report(claim_dict)
self.analytics_manager.send_download_finished(download_id, name, report, claim_dict)
@defer.inlineCallbacks
def _download_failed(error, download_id, name, claim_dict):
report = yield self._get_stream_analytics_report(claim_dict)
self.analytics_manager.send_download_errored(error, download_id, name, claim_dict,
report)
if claim_id in self.streams:
downloader = self.streams[claim_id]
result = yield downloader.finished_deferred
@@ -621,17 +673,23 @@ class Daemon(AuthJSONRPCServer):
file_name)
try:
lbry_file, finished_deferred = yield self.streams[claim_id].start(claim_dict, name)
finished_deferred.addCallback(
lambda _: self.analytics_manager.send_download_finished(download_id,
name,
claim_dict))
finished_deferred.addCallbacks(lambda _: _download_finished(download_id, name,
claim_dict),
lambda e: _download_failed(e, download_id, name,
claim_dict))
result = yield self._get_lbry_file_dict(lbry_file, full_status=True)
del self.streams[claim_id]
except Exception as err:
log.warning('Failed to get %s: %s', name, err)
self.analytics_manager.send_download_errored(download_id, name, claim_dict)
del self.streams[claim_id]
yield _download_failed(err, download_id, name, claim_dict)
if isinstance(err, (DownloadDataTimeout, DownloadSDTimeout)):
log.warning('Failed to get %s (%s)', name, err)
else:
log.error('Failed to get %s (%s)', name, err)
if self.streams[claim_id].downloader:
yield self.streams[claim_id].downloader.stop(err)
result = {'error': err.message}
finally:
del self.streams[claim_id]
defer.returnValue(result)
@defer.inlineCallbacks

View file

@@ -5,11 +5,11 @@ from twisted.internet.task import LoopingCall
from lbryschema.fee import Fee
from lbrynet.core.Error import InsufficientFundsError, KeyFeeAboveMaxAllowed, DownloadTimeoutError
from lbrynet.core.Error import InsufficientFundsError, KeyFeeAboveMaxAllowed
from lbrynet.core.Error import DownloadDataTimeout, DownloadCanceledError
from lbrynet.core.utils import safe_start_looping_call, safe_stop_looping_call
from lbrynet.core.StreamDescriptor import download_sd_blob
from lbrynet.file_manager.EncryptedFileDownloader import ManagedEncryptedFileDownloaderFactory
from lbrynet.file_manager.EncryptedFileDownloader import ManagedEncryptedFileDownloader
from lbrynet import conf
INITIALIZING_CODE = 'initializing'
@@ -61,23 +61,22 @@ class GetStream(object):
return os.path.join(self.download_directory, self.downloader.file_name)
def _check_status(self, status):
stop_condition = (status.num_completed > 0 or
status.running_status == ManagedEncryptedFileDownloader.STATUS_STOPPED)
if stop_condition and not self.data_downloading_deferred.called:
if status.num_completed > 0 and not self.data_downloading_deferred.called:
self.data_downloading_deferred.callback(True)
if self.data_downloading_deferred.called:
safe_stop_looping_call(self.checker)
else:
log.info("Downloading stream data (%i seconds)", self.timeout_counter)
log.info("Waiting for stream data (%i seconds)", self.timeout_counter)
def check_status(self):
"""
Check if we've got the first data blob in the stream yet
"""
self.timeout_counter += 1
if self.timeout_counter >= self.timeout:
if self.timeout_counter > self.timeout:
if not self.data_downloading_deferred.called:
self.data_downloading_deferred.errback(DownloadTimeoutError(self.file_name))
self.data_downloading_deferred.errback(DownloadDataTimeout(self.sd_hash))
safe_stop_looping_call(self.checker)
else:
d = self.downloader.status()
@@ -150,6 +149,10 @@ class GetStream(object):
self._check_status(status)
defer.returnValue(self.download_path)
def fail(self, err):
safe_stop_looping_call(self.checker)
raise err
@defer.inlineCallbacks
def _initialize(self, stream_info):
# Set sd_hash and return key_fee from stream_info
@@ -179,7 +182,7 @@ class GetStream(object):
log.info("Downloading lbry://%s (%s) --> %s", name, self.sd_hash[:6], self.download_path)
self.finished_deferred = self.downloader.start()
self.finished_deferred.addCallback(self.finish, name)
self.finished_deferred.addCallbacks(lambda result: self.finish(result, name), self.fail)
@defer.inlineCallbacks
def start(self, stream_info, name):
@@ -204,9 +207,18 @@ class GetStream(object):
try:
yield self.data_downloading_deferred
except Exception as err:
self.downloader.stop()
except DownloadDataTimeout as err:
safe_stop_looping_call(self.checker)
raise
raise err
defer.returnValue((self.downloader, self.finished_deferred))
def cancel(self, reason=None):
if reason:
msg = "download stream cancelled: %s" % reason
else:
msg = "download stream cancelled"
if self.finished_deferred and not self.finished_deferred.called:
self.finished_deferred.errback(DownloadCanceledError(msg))
if self.data_downloading_deferred and not self.data_downloading_deferred.called:
self.data_downloading_deferred.errback(DownloadCanceledError(msg))

View file

@@ -59,7 +59,7 @@ def main(args=None):
use_upnp=False,
wallet=wallet
)
api = analytics.Api.new_instance()
api = analytics.Api.new_instance(conf.settings['share_usage_data'])
run(args, session, api)
reactor.run()
finally:

View file

@@ -50,10 +50,10 @@ class BlobManagerTest(unittest.TestCase):
writer, finished_d = yield blob.open_for_writing(self.peer)
yield writer.write(data)
yield self.bm.blob_completed(blob)
yield self.bm.add_blob_to_upload_history(blob_hash,'test',len(data))
yield self.bm.add_blob_to_upload_history(blob_hash, 'test', len(data))
# check to see if blob is there
self.assertTrue(os.path.isfile(os.path.join(self.blob_dir,blob_hash)))
self.assertTrue(os.path.isfile(os.path.join(self.blob_dir, blob_hash)))
blobs = yield self.bm.get_all_verified_blobs()
self.assertTrue(blob_hash in blobs)
defer.returnValue(blob_hash)

View file

@@ -1,26 +1,22 @@
import types
import mock
import json
from twisted.trial import unittest
from twisted.internet import defer, task
from lbryschema.claim import ClaimDict
from lbrynet.core import Session, PaymentRateManager, Wallet
from lbrynet.core.Error import DownloadTimeoutError
from lbrynet.core.Error import DownloadDataTimeout, DownloadSDTimeout
from lbrynet.daemon import Downloader
from lbrynet.core.StreamDescriptor import StreamDescriptorIdentifier,StreamMetadata
from lbrynet.lbry_file.client.EncryptedFileOptions import add_lbry_file_to_sd_identifier
from lbrynet.core.StreamDescriptor import StreamDescriptorIdentifier
from lbrynet.file_manager.EncryptedFileStatusReport import EncryptedFileStatusReport
from lbrynet.file_manager.EncryptedFileDownloader import ManagedEncryptedFileDownloader, ManagedEncryptedFileDownloaderFactory
from lbrynet.file_manager.EncryptedFileDownloader import ManagedEncryptedFileDownloader
from lbrynet.daemon.ExchangeRateManager import ExchangeRateManager
from tests.mocks import BlobAvailabilityTracker as DummyBlobAvailabilityTracker
from tests.mocks import ExchangeRateManager as DummyExchangeRateManager
from tests.mocks import BTCLBCFeed, USDBTCFeed
from tests.mocks import mock_conf_settings
class MocDownloader(object):
def __init__(self):
self.finish_deferred = defer.Deferred(None)
@@ -106,7 +102,7 @@ class GetStreamTests(unittest.TestCase):
DownloadTimeoutError is raised
"""
def download_sd_blob(self):
raise DownloadTimeoutError(self.file_name)
raise DownloadSDTimeout(self.file_name)
getstream = self.init_getstream_with_mocs()
getstream._initialize = types.MethodType(moc_initialize, getstream)
@@ -115,15 +111,14 @@ class GetStreamTests(unittest.TestCase):
getstream.pay_key_fee = types.MethodType(moc_pay_key_fee, getstream)
name='test'
stream_info = None
with self.assertRaises(DownloadTimeoutError):
with self.assertRaises(DownloadSDTimeout):
yield getstream.start(stream_info,name)
self.assertFalse(getstream.pay_key_fee_called)
@defer.inlineCallbacks
def test_timeout(self):
"""
test that timeout (set to 2 here) exception is raised
test that timeout (set to 3 here) exception is raised
when download times out while downloading first blob, and key fee is paid
"""
getstream = self.init_getstream_with_mocs()
@@ -136,9 +131,9 @@ class GetStreamTests(unittest.TestCase):
start = getstream.start(stream_info,name)
self.clock.advance(1)
self.clock.advance(1)
with self.assertRaises(DownloadTimeoutError):
self.clock.advance(1)
with self.assertRaises(DownloadDataTimeout):
yield start
self.assertTrue(getstream.downloader.stop_called)
self.assertTrue(getstream.pay_key_fee_called)
@defer.inlineCallbacks
@@ -163,21 +158,20 @@ class GetStreamTests(unittest.TestCase):
downloader, f_deferred = yield start
self.assertTrue(getstream.pay_key_fee_called)
@defer.inlineCallbacks
def test_finish_stopped_downloader(self):
"""
test that if we have a stopped downloader, beforfe a blob is downloaded,
start() returns
"""
getstream = self.init_getstream_with_mocs()
getstream._initialize = types.MethodType(moc_initialize, getstream)
getstream._download_sd_blob = types.MethodType(moc_download_sd_blob, getstream)
getstream._download = types.MethodType(moc_download, getstream)
name='test'
stream_info = None
start = getstream.start(stream_info,name)
getstream.downloader.running_status = ManagedEncryptedFileDownloader.STATUS_STOPPED
self.clock.advance(1)
downloader, f_deferred = yield start
# @defer.inlineCallbacks
# def test_finish_stopped_downloader(self):
# """
# test that if we have a stopped downloader, beforfe a blob is downloaded,
# start() returns
# """
# getstream = self.init_getstream_with_mocs()
# getstream._initialize = types.MethodType(moc_initialize, getstream)
# getstream._download_sd_blob = types.MethodType(moc_download_sd_blob, getstream)
# getstream._download = types.MethodType(moc_download, getstream)
# name='test'
# stream_info = None
# start = getstream.start(stream_info,name)
#
# getstream.downloader.running_status = ManagedEncryptedFileDownloader.STATUS_STOPPED
# self.clock.advance(1)
# downloader, f_deferred = yield start