fixed long string concatenation and merged with master

This commit is contained in:
Akinwale Ariwodola 2017-10-02 17:13:45 +01:00
commit 63dd57aaea
25 changed files with 357 additions and 174 deletions

View file

@ -19,6 +19,7 @@ at anytime.
### Fixed ### Fixed
* Fixed handling cancelled blob and availability requests * Fixed handling cancelled blob and availability requests
* Fixed redundant blob requests to a peer * Fixed redundant blob requests to a peer
* Fixed https://github.com/lbryio/lbry/issues/923
### Deprecated ### Deprecated
* Deprecated `blob_announce_all` JSONRPC command. Use `blob_announce` instead. * Deprecated `blob_announce_all` JSONRPC command. Use `blob_announce` instead.
@ -45,6 +46,20 @@ at anytime.
* Removed unused `EncryptedFileOpener` * Removed unused `EncryptedFileOpener`
## [0.16.3] - 2017-09-28
### Fixed
* Fixed blob download history
### Changed
* Improved download analytics
* Improved download errors by distinguishing a data timeout from a sd timeout
## [0.16.2] - 2017-09-26
### Fixed
* Fixed https://github.com/lbryio/lbry/issues/771 (handle when a certificate is missing for a signed claim in `claim_list_mine`)
## [0.16.1] - 2017-09-20 ## [0.16.1] - 2017-09-20
### Fixed ### Fixed
* Fixed `transaction_list` doc string * Fixed `transaction_list` doc string
@ -53,6 +68,7 @@ at anytime.
### Changed ### Changed
* Bumped `lbryum` requirement to 3.1.8 [see changelog](https://github.com/lbryio/lbryum/blob/master/CHANGELOG.md#318---2017-09-20) * Bumped `lbryum` requirement to 3.1.8 [see changelog](https://github.com/lbryio/lbryum/blob/master/CHANGELOG.md#318---2017-09-20)
## [0.16.0] - 2017-09-18 ## [0.16.0] - 2017-09-18
### Fixed ### Fixed
* Fixed uncaught error when shutting down after a failed daemon startup * Fixed uncaught error when shutting down after a failed daemon startup

View file

@ -1,6 +1,6 @@
import logging import logging
__version__ = "0.17.0rc5" __version__ = "0.17.0rc9"
version = tuple(__version__.split('.')) version = tuple(__version__.split('.'))
logging.getLogger(__name__).addHandler(logging.NullHandler()) logging.getLogger(__name__).addHandler(logging.NullHandler())

View file

@ -57,15 +57,14 @@ class Manager(object):
self._event(DOWNLOAD_STARTED, self._download_properties(id_, name, claim_dict)) self._event(DOWNLOAD_STARTED, self._download_properties(id_, name, claim_dict))
) )
def send_download_errored(self, id_, name, claim_dict=None): def send_download_errored(self, err, id_, name, claim_dict, report):
self.analytics_api.track( download_error_properties = self._download_error_properties(err, id_, name, claim_dict,
self._event(DOWNLOAD_ERRORED, self._download_properties(id_, name, claim_dict)) report)
) self.analytics_api.track(self._event(DOWNLOAD_ERRORED, download_error_properties))
def send_download_finished(self, id_, name, claim_dict=None): def send_download_finished(self, id_, name, report, claim_dict=None):
self.analytics_api.track( download_properties = self._download_properties(id_, name, claim_dict, report)
self._event(DOWNLOAD_FINISHED, self._download_properties(id_, name, claim_dict)) self.analytics_api.track(self._event(DOWNLOAD_FINISHED, download_properties))
)
def send_claim_action(self, action): def send_claim_action(self, action):
self.analytics_api.track(self._event(CLAIM_ACTION, {'action': action})) self.analytics_api.track(self._event(CLAIM_ACTION, {'action': action}))
@ -159,18 +158,31 @@ class Manager(object):
return properties return properties
@staticmethod @staticmethod
def _download_properties(id_, name, claim_dict=None): def _download_properties(id_, name, claim_dict=None, report=None):
sd_hash = None sd_hash = None if not claim_dict else claim_dict.source_hash
if claim_dict: p = {
try:
sd_hash = claim_dict.source_hash
except (KeyError, TypeError, ValueError):
log.debug('Failed to get sd_hash from %s', claim_dict, exc_info=True)
return {
'download_id': id_, 'download_id': id_,
'name': name, 'name': name,
'stream_info': sd_hash 'stream_info': sd_hash
} }
if report:
p['report'] = report
return p
@staticmethod
def _download_error_properties(error, id_, name, claim_dict, report):
def error_name(err):
if not hasattr(type(err), "__name__"):
return str(type(err))
return type(err).__name__
return {
'download_id': id_,
'name': name,
'stream_info': claim_dict.source_hash,
'error': error_name(error),
'reason': error.message,
'report': report
}
@staticmethod @staticmethod
def _make_context(platform, wallet): def _make_context(platform, wallet):

View file

@ -9,7 +9,7 @@ from lbrynet import conf
from lbrynet.core.Error import DownloadCanceledError, InvalidDataError, InvalidBlobHashError from lbrynet.core.Error import DownloadCanceledError, InvalidDataError, InvalidBlobHashError
from lbrynet.core.utils import is_valid_blobhash from lbrynet.core.utils import is_valid_blobhash
from lbrynet.blob.writer import HashBlobWriter from lbrynet.blob.writer import HashBlobWriter
from lbrynet.blob.reader import HashBlobReader from lbrynet.blob.reader import HashBlobReader, HashBlobReader_v0
log = logging.getLogger(__name__) log = logging.getLogger(__name__)
@ -75,19 +75,13 @@ class BlobFile(object):
""" """
open blob for reading open blob for reading
returns a file handle that can be read() from. returns a file like object that can be read() from, and closed() when
once finished with the file handle, user must call close_read_handle() finished
otherwise blob cannot be deleted.
""" """
if self._verified is True: if self._verified is True:
file_handle = None reader = HashBlobReader(self.file_path, self.reader_finished)
try:
file_handle = open(self.file_path, 'rb')
self.readers += 1 self.readers += 1
return file_handle return reader
except IOError:
log.exception('Failed to open %s', self.file_path)
self.close_read_handle(file_handle)
return None return None
def delete(self): def delete(self):
@ -150,12 +144,16 @@ class BlobFile(object):
return False return False
def read(self, write_func): def read(self, write_func):
"""
This function is only used in StreamBlobDecryptor
and should be deprecated in favor of open_for_reading()
"""
def close_self(*args): def close_self(*args):
self.close_read_handle(file_handle) self.close_read_handle(file_handle)
return args[0] return args[0]
file_sender = FileSender() file_sender = FileSender()
reader = HashBlobReader(write_func) reader = HashBlobReader_v0(write_func)
file_handle = self.open_for_reading() file_handle = self.open_for_reading()
if file_handle is not None: if file_handle is not None:
d = file_sender.beginFileTransfer(file_handle, reader) d = file_sender.beginFileTransfer(file_handle, reader)
@ -164,6 +162,19 @@ class BlobFile(object):
d = defer.fail(IOError("Could not read the blob")) d = defer.fail(IOError("Could not read the blob"))
return d return d
def close_read_handle(self, file_handle):
"""
This function is only used in StreamBlobDecryptor
and should be deprecated in favor of open_for_reading()
"""
if file_handle is not None:
file_handle.close()
self.readers -= 1
def reader_finished(self, reader):
self.readers -= 1
return defer.succeed(True)
def writer_finished(self, writer, err=None): def writer_finished(self, writer, err=None):
def fire_finished_deferred(): def fire_finished_deferred():
self._verified = True self._verified = True
@ -208,11 +219,6 @@ class BlobFile(object):
d.addBoth(lambda _: writer.close_handle()) d.addBoth(lambda _: writer.close_handle())
return d return d
def close_read_handle(self, file_handle):
if file_handle is not None:
file_handle.close()
self.readers -= 1
@defer.inlineCallbacks @defer.inlineCallbacks
def _save_verified_blob(self, writer): def _save_verified_blob(self, writer):
with self.setting_verified_blob_lock: with self.setting_verified_blob_lock:

View file

@ -5,7 +5,11 @@ from zope.interface import implements
log = logging.getLogger(__name__) log = logging.getLogger(__name__)
class HashBlobReader(object): class HashBlobReader_v0(object):
"""
This is a class that is only used in StreamBlobDecryptor
and should be deprecated
"""
implements(interfaces.IConsumer) implements(interfaces.IConsumer)
def __init__(self, write_func): def __init__(self, write_func):
@ -28,3 +32,28 @@ class HashBlobReader(object):
self.write_func(data) self.write_func(data)
if self.streaming is False: if self.streaming is False:
reactor.callLater(0, self.producer.resumeProducing) reactor.callLater(0, self.producer.resumeProducing)
class HashBlobReader(object):
"""
This is a file like reader class that supports
read(size) and close()
"""
def __init__(self, file_path, finished_cb):
self.finished_cb = finished_cb
self.finished_cb_d = None
self.read_handle = open(file_path, 'rb')
def __del__(self):
self.close()
def read(self, size=-1):
return self.read_handle.read(size)
def close(self):
# if we've already closed and called finished_cb, do nothing
if self.finished_cb_d is not None:
return
self.read_handle.close()
self.finished_cb_d = self.finished_cb(self)

View file

@ -128,6 +128,16 @@ class DiskBlobManager(DHTHashSupplier):
d = self._add_blob_to_download_history(blob_hash, host, rate) d = self._add_blob_to_download_history(blob_hash, host, rate)
return d return d
@defer.inlineCallbacks
def get_host_downloaded_from(self, blob_hash):
query_str = "SELECT host FROM download WHERE blob=? ORDER BY ts DESC LIMIT 1"
host = yield self.db_conn.runQuery(query_str, (blob_hash,))
if host:
result = host[0][0]
else:
result = None
defer.returnValue(result)
def add_blob_to_upload_history(self, blob_hash, host, rate): def add_blob_to_upload_history(self, blob_hash, host, rate):
d = self._add_blob_to_upload_history(blob_hash, host, rate) d = self._add_blob_to_upload_history(blob_hash, host, rate)
return d return d

View file

@ -9,11 +9,26 @@ class DuplicateStreamHashError(Exception):
class DownloadCanceledError(Exception): class DownloadCanceledError(Exception):
pass pass
class DownloadSDTimeout(Exception):
def __init__(self, download):
Exception.__init__(self, 'Failed to download sd blob {} within timeout'.format(download))
self.download = download
class DownloadTimeoutError(Exception): class DownloadTimeoutError(Exception):
def __init__(self, download): def __init__(self, download):
Exception.__init__(self, 'Failed to download {} within timeout'.format(download)) Exception.__init__(self, 'Failed to download {} within timeout'.format(download))
self.download = download self.download = download
class DownloadDataTimeout(Exception):
def __init__(self, download):
Exception.__init__(self, 'Failed to download data blobs for sd hash '
'{} within timeout'.format(download))
self.download = download
class RequestCanceledError(Exception): class RequestCanceledError(Exception):
pass pass

View file

@ -55,7 +55,7 @@ class BlobStreamDescriptorReader(StreamDescriptorReader):
f = self.blob.open_for_reading() f = self.blob.open_for_reading()
if f is not None: if f is not None:
raw_data = f.read() raw_data = f.read()
self.blob.close_read_handle(f) f.close()
return raw_data return raw_data
else: else:
raise ValueError("Could not open the blob for reading") raise ValueError("Could not open the blob for reading")

View file

@ -516,8 +516,6 @@ class DownloadRequest(RequestHelper):
def _pay_or_cancel_payment(self, arg, reserved_points, blob): def _pay_or_cancel_payment(self, arg, reserved_points, blob):
if self._can_pay_peer(blob, arg): if self._can_pay_peer(blob, arg):
self._pay_peer(blob.length, reserved_points) self._pay_peer(blob.length, reserved_points)
d = self.requestor.blob_manager.add_blob_to_download_history(
str(blob), str(self.peer.host), float(self.protocol_prices[self.protocol]))
else: else:
self._cancel_points(reserved_points) self._cancel_points(reserved_points)
return arg return arg
@ -567,8 +565,11 @@ class DownloadRequest(RequestHelper):
self.peer.update_stats('blobs_downloaded', 1) self.peer.update_stats('blobs_downloaded', 1)
self.peer.update_score(5.0) self.peer.update_score(5.0)
should_announce = blob.blob_hash == self.head_blob_hash should_announce = blob.blob_hash == self.head_blob_hash
self.requestor.blob_manager.blob_completed(blob, should_announce=should_announce) d = self.requestor.blob_manager.blob_completed(blob, should_announce=should_announce)
return arg d.addCallback(lambda _: self.requestor.blob_manager.add_blob_to_download_history(
blob.blob_hash, self.peer.host, self.protocol_prices[self.protocol]))
d.addCallback(lambda _: arg)
return d
def _download_failed(self, reason): def _download_failed(self, reason):
if not reason.check(DownloadCanceledError, PriceDisagreementError): if not reason.check(DownloadCanceledError, PriceDisagreementError):

View file

@ -5,7 +5,7 @@ from lbrynet.core.BlobInfo import BlobInfo
from lbrynet.core.client.BlobRequester import BlobRequester from lbrynet.core.client.BlobRequester import BlobRequester
from lbrynet.core.client.ConnectionManager import ConnectionManager from lbrynet.core.client.ConnectionManager import ConnectionManager
from lbrynet.core.client.DownloadManager import DownloadManager from lbrynet.core.client.DownloadManager import DownloadManager
from lbrynet.core.Error import InvalidBlobHashError, DownloadTimeoutError from lbrynet.core.Error import InvalidBlobHashError, DownloadSDTimeout
from lbrynet.core.utils import is_valid_blobhash, safe_start_looping_call, safe_stop_looping_call from lbrynet.core.utils import is_valid_blobhash, safe_start_looping_call, safe_stop_looping_call
from twisted.python.failure import Failure from twisted.python.failure import Failure
from twisted.internet import defer from twisted.internet import defer
@ -136,7 +136,7 @@ class StandaloneBlobDownloader(object):
def _download_timedout(self): def _download_timedout(self):
self.stop() self.stop()
if not self.finished_deferred.called: if not self.finished_deferred.called:
self.finished_deferred.errback(DownloadTimeoutError(self.blob_hash)) self.finished_deferred.errback(DownloadSDTimeout(self.blob_hash))
def insufficient_funds(self, err): def insufficient_funds(self, err):
self.stop() self.stop()

View file

@ -89,7 +89,7 @@ class BlobRequestHandler(object):
def cancel_send(self, err): def cancel_send(self, err):
if self.currently_uploading is not None: if self.currently_uploading is not None:
self.currently_uploading.close_read_handle(self.read_handle) self.read_handle.close()
self.read_handle = None self.read_handle = None
self.currently_uploading = None self.currently_uploading = None
return err return err
@ -162,7 +162,7 @@ class BlobRequestHandler(object):
def record_transaction(self, blob): def record_transaction(self, blob):
d = self.blob_manager.add_blob_to_upload_history( d = self.blob_manager.add_blob_to_upload_history(
str(blob), self.peer.host, self.blob_data_payment_rate) blob.blob_hash, self.peer.host, self.blob_data_payment_rate)
return d return d
def _reply_to_send_request(self, response, incoming): def _reply_to_send_request(self, response, incoming):
@ -225,7 +225,7 @@ class BlobRequestHandler(object):
def set_not_uploading(reason=None): def set_not_uploading(reason=None):
if self.currently_uploading is not None: if self.currently_uploading is not None:
self.currently_uploading.close_read_handle(self.read_handle) self.read_handle.close()
self.read_handle = None self.read_handle = None
self.currently_uploading = None self.currently_uploading = None
self.file_sender = None self.file_sender = None

View file

@ -46,7 +46,7 @@ from lbrynet.core.looping_call_manager import LoopingCallManager
from lbrynet.core.server.BlobRequestHandler import BlobRequestHandlerFactory from lbrynet.core.server.BlobRequestHandler import BlobRequestHandlerFactory
from lbrynet.core.server.ServerProtocol import ServerProtocolFactory from lbrynet.core.server.ServerProtocol import ServerProtocolFactory
from lbrynet.core.Error import InsufficientFundsError, UnknownNameError, NoSuchSDHash from lbrynet.core.Error import InsufficientFundsError, UnknownNameError, NoSuchSDHash
from lbrynet.core.Error import NoSuchStreamHash from lbrynet.core.Error import NoSuchStreamHash, DownloadDataTimeout, DownloadSDTimeout
from lbrynet.core.Error import NullFundsError, NegativeFundsError from lbrynet.core.Error import NullFundsError, NegativeFundsError
log = logging.getLogger(__name__) log = logging.getLogger(__name__)
@ -392,6 +392,11 @@ class Daemon(AuthJSONRPCServer):
def _already_shutting_down(sig_num, frame): def _already_shutting_down(sig_num, frame):
log.info("Already shutting down") log.info("Already shutting down")
def _stop_streams(self):
"""stop pending GetStream downloads"""
for claim_id, stream in self.streams.iteritems():
stream.cancel(reason="daemon shutdown")
def _shutdown(self): def _shutdown(self):
# ignore INT/TERM signals once shutdown has started # ignore INT/TERM signals once shutdown has started
signal.signal(signal.SIGINT, self._already_shutting_down) signal.signal(signal.SIGINT, self._already_shutting_down)
@ -399,6 +404,9 @@ class Daemon(AuthJSONRPCServer):
log.info("Closing lbrynet session") log.info("Closing lbrynet session")
log.info("Status at time of shutdown: " + self.startup_status[0]) log.info("Status at time of shutdown: " + self.startup_status[0])
self._stop_streams()
self.looping_call_manager.shutdown() self.looping_call_manager.shutdown()
if self.analytics_manager: if self.analytics_manager:
self.analytics_manager.shutdown() self.analytics_manager.shutdown()
@ -599,6 +607,39 @@ class Daemon(AuthJSONRPCServer):
timeout = timeout or 30 timeout = timeout or 30
return download_sd_blob(self.session, blob_hash, rate_manager, timeout) return download_sd_blob(self.session, blob_hash, rate_manager, timeout)
@defer.inlineCallbacks
def _get_stream_analytics_report(self, claim_dict):
sd_hash = claim_dict.source_hash
try:
stream_hash = yield self.stream_info_manager.get_stream_hash_for_sd_hash(sd_hash)
except Exception:
stream_hash = None
report = {
"sd_hash": sd_hash,
"stream_hash": stream_hash,
}
blobs = {}
try:
sd_host = yield self.session.blob_manager.get_host_downloaded_from(sd_hash)
except Exception:
sd_host = None
report["sd_blob"] = sd_host
if stream_hash:
blob_infos = yield self.stream_info_manager.get_blobs_for_stream(stream_hash)
report["known_blobs"] = len(blob_infos)
else:
blob_infos = []
report["known_blobs"] = 0
# for blob_hash, blob_num, iv, length in blob_infos:
# try:
# host = yield self.session.blob_manager.get_host_downloaded_from(blob_hash)
# except Exception:
# host = None
# if host:
# blobs[blob_num] = host
# report["blobs"] = json.dumps(blobs)
defer.returnValue(report)
@defer.inlineCallbacks @defer.inlineCallbacks
def _download_name(self, name, claim_dict, claim_id, timeout=None, file_name=None): def _download_name(self, name, claim_dict, claim_id, timeout=None, file_name=None):
""" """
@ -606,6 +647,17 @@ class Daemon(AuthJSONRPCServer):
If it already exists in the file manager, return the existing lbry file If it already exists in the file manager, return the existing lbry file
""" """
@defer.inlineCallbacks
def _download_finished(download_id, name, claim_dict):
report = yield self._get_stream_analytics_report(claim_dict)
self.analytics_manager.send_download_finished(download_id, name, report, claim_dict)
@defer.inlineCallbacks
def _download_failed(error, download_id, name, claim_dict):
report = yield self._get_stream_analytics_report(claim_dict)
self.analytics_manager.send_download_errored(error, download_id, name, claim_dict,
report)
if claim_id in self.streams: if claim_id in self.streams:
downloader = self.streams[claim_id] downloader = self.streams[claim_id]
result = yield downloader.finished_deferred result = yield downloader.finished_deferred
@ -621,17 +673,23 @@ class Daemon(AuthJSONRPCServer):
file_name) file_name)
try: try:
lbry_file, finished_deferred = yield self.streams[claim_id].start(claim_dict, name) lbry_file, finished_deferred = yield self.streams[claim_id].start(claim_dict, name)
finished_deferred.addCallback( finished_deferred.addCallbacks(lambda _: _download_finished(download_id, name,
lambda _: self.analytics_manager.send_download_finished(download_id, claim_dict),
name, lambda e: _download_failed(e, download_id, name,
claim_dict)) claim_dict))
result = yield self._get_lbry_file_dict(lbry_file, full_status=True) result = yield self._get_lbry_file_dict(lbry_file, full_status=True)
del self.streams[claim_id]
except Exception as err: except Exception as err:
log.warning('Failed to get %s: %s', name, err) yield _download_failed(err, download_id, name, claim_dict)
self.analytics_manager.send_download_errored(download_id, name, claim_dict) if isinstance(err, (DownloadDataTimeout, DownloadSDTimeout)):
del self.streams[claim_id] log.warning('Failed to get %s (%s)', name, err)
else:
log.error('Failed to get %s (%s)', name, err)
if self.streams[claim_id].downloader:
yield self.streams[claim_id].downloader.stop(err)
result = {'error': err.message} result = {'error': err.message}
finally:
del self.streams[claim_id]
defer.returnValue(result) defer.returnValue(result)
@defer.inlineCallbacks @defer.inlineCallbacks
@ -2375,7 +2433,7 @@ class Daemon(AuthJSONRPCServer):
if encoding and encoding in decoders: if encoding and encoding in decoders:
blob_file = blob.open_for_reading() blob_file = blob.open_for_reading()
result = decoders[encoding](blob_file.read()) result = decoders[encoding](blob_file.read())
blob.close_read_handle(blob_file) blob_file.close()
else: else:
result = "Downloaded blob %s" % blob_hash result = "Downloaded blob %s" % blob_hash
@ -2624,7 +2682,7 @@ class Daemon(AuthJSONRPCServer):
def read_sd_blob(sd_blob): def read_sd_blob(sd_blob):
sd_blob_file = sd_blob.open_for_reading() sd_blob_file = sd_blob.open_for_reading()
decoded_sd_blob = json.loads(sd_blob_file.read()) decoded_sd_blob = json.loads(sd_blob_file.read())
sd_blob.close_read_handle(sd_blob_file) sd_blob_file.close()
return decoded_sd_blob return decoded_sd_blob
resolved_result = yield self.session.wallet.resolve(uri) resolved_result = yield self.session.wallet.resolve(uri)

View file

@ -5,11 +5,11 @@ from twisted.internet.task import LoopingCall
from lbryschema.fee import Fee from lbryschema.fee import Fee
from lbrynet.core.Error import InsufficientFundsError, KeyFeeAboveMaxAllowed, DownloadTimeoutError from lbrynet.core.Error import InsufficientFundsError, KeyFeeAboveMaxAllowed
from lbrynet.core.Error import DownloadDataTimeout, DownloadCanceledError, DownloadSDTimeout
from lbrynet.core.utils import safe_start_looping_call, safe_stop_looping_call from lbrynet.core.utils import safe_start_looping_call, safe_stop_looping_call
from lbrynet.core.StreamDescriptor import download_sd_blob from lbrynet.core.StreamDescriptor import download_sd_blob
from lbrynet.file_manager.EncryptedFileDownloader import ManagedEncryptedFileDownloaderFactory from lbrynet.file_manager.EncryptedFileDownloader import ManagedEncryptedFileDownloaderFactory
from lbrynet.file_manager.EncryptedFileDownloader import ManagedEncryptedFileDownloader
from lbrynet import conf from lbrynet import conf
INITIALIZING_CODE = 'initializing' INITIALIZING_CODE = 'initializing'
@ -61,27 +61,31 @@ class GetStream(object):
return os.path.join(self.download_directory, self.downloader.file_name) return os.path.join(self.download_directory, self.downloader.file_name)
def _check_status(self, status): def _check_status(self, status):
stop_condition = (status.num_completed > 0 or if status.num_completed > 0 and not self.data_downloading_deferred.called:
status.running_status == ManagedEncryptedFileDownloader.STATUS_STOPPED)
if stop_condition and not self.data_downloading_deferred.called:
self.data_downloading_deferred.callback(True) self.data_downloading_deferred.callback(True)
if self.data_downloading_deferred.called: if self.data_downloading_deferred.called:
safe_stop_looping_call(self.checker) safe_stop_looping_call(self.checker)
else: else:
log.info("Downloading stream data (%i seconds)", self.timeout_counter) log.debug("Waiting for stream data (%i seconds)", self.timeout_counter)
def check_status(self): def check_status(self):
""" """
Check if we've got the first data blob in the stream yet Check if we've got the first data blob in the stream yet
""" """
self.timeout_counter += 1 self.timeout_counter += 1
if self.timeout_counter >= self.timeout: if self.timeout_counter > self.timeout:
if not self.data_downloading_deferred.called: if not self.data_downloading_deferred.called:
self.data_downloading_deferred.errback(DownloadTimeoutError(self.file_name)) if self.downloader:
safe_stop_looping_call(self.checker) err = DownloadDataTimeout(self.sd_hash)
else: else:
err = DownloadSDTimeout(self.sd_hash)
self.data_downloading_deferred.errback(err)
safe_stop_looping_call(self.checker)
elif self.downloader:
d = self.downloader.status() d = self.downloader.status()
d.addCallback(self._check_status) d.addCallback(self._check_status)
else:
log.debug("Waiting for stream descriptor (%i seconds)", self.timeout_counter)
def convert_max_fee(self): def convert_max_fee(self):
currency, amount = self.max_key_fee['currency'], self.max_key_fee['amount'] currency, amount = self.max_key_fee['currency'], self.max_key_fee['amount']
@ -150,6 +154,10 @@ class GetStream(object):
self._check_status(status) self._check_status(status)
defer.returnValue(self.download_path) defer.returnValue(self.download_path)
def fail(self, err):
safe_stop_looping_call(self.checker)
raise err
@defer.inlineCallbacks @defer.inlineCallbacks
def _initialize(self, stream_info): def _initialize(self, stream_info):
# Set sd_hash and return key_fee from stream_info # Set sd_hash and return key_fee from stream_info
@ -176,10 +184,9 @@ class GetStream(object):
def _download(self, sd_blob, name, key_fee): def _download(self, sd_blob, name, key_fee):
self.downloader = yield self._create_downloader(sd_blob) self.downloader = yield self._create_downloader(sd_blob)
yield self.pay_key_fee(key_fee, name) yield self.pay_key_fee(key_fee, name)
log.info("Downloading lbry://%s (%s) --> %s", name, self.sd_hash[:6], self.download_path) log.info("Downloading lbry://%s (%s) --> %s", name, self.sd_hash[:6], self.download_path)
self.finished_deferred = self.downloader.start() self.finished_deferred = self.downloader.start()
self.finished_deferred.addCallback(self.finish, name) self.finished_deferred.addCallbacks(lambda result: self.finish(result, name), self.fail)
@defer.inlineCallbacks @defer.inlineCallbacks
def start(self, stream_info, name): def start(self, stream_info, name):
@ -195,18 +202,25 @@ class GetStream(object):
self.set_status(INITIALIZING_CODE, name) self.set_status(INITIALIZING_CODE, name)
key_fee = yield self._initialize(stream_info) key_fee = yield self._initialize(stream_info)
safe_start_looping_call(self.checker, 1)
self.set_status(DOWNLOAD_METADATA_CODE, name) self.set_status(DOWNLOAD_METADATA_CODE, name)
sd_blob = yield self._download_sd_blob() sd_blob = yield self._download_sd_blob()
yield self._download(sd_blob, name, key_fee) yield self._download(sd_blob, name, key_fee)
self.set_status(DOWNLOAD_RUNNING_CODE, name) self.set_status(DOWNLOAD_RUNNING_CODE, name)
safe_start_looping_call(self.checker, 1)
try: try:
yield self.data_downloading_deferred yield self.data_downloading_deferred
except Exception as err: except DownloadDataTimeout as err:
self.downloader.stop()
safe_stop_looping_call(self.checker) safe_stop_looping_call(self.checker)
raise raise err
defer.returnValue((self.downloader, self.finished_deferred)) defer.returnValue((self.downloader, self.finished_deferred))
def cancel(self, reason=None):
if reason:
msg = "download stream cancelled: %s" % reason
else:
msg = "download stream cancelled"
if self.data_downloading_deferred and not self.data_downloading_deferred.called:
self.data_downloading_deferred.errback(DownloadCanceledError(msg))

View file

@ -94,7 +94,7 @@ class BlobReflectorClient(Protocol):
def set_not_uploading(self): def set_not_uploading(self):
if self.next_blob_to_send is not None: if self.next_blob_to_send is not None:
self.next_blob_to_send.close_read_handle(self.read_handle) self.read_handle.close()
self.read_handle = None self.read_handle = None
self.next_blob_to_send = None self.next_blob_to_send = None
self.file_sender = None self.file_sender = None
@ -105,6 +105,7 @@ class BlobReflectorClient(Protocol):
assert self.read_handle is not None, \ assert self.read_handle is not None, \
"self.read_handle was None when trying to start the transfer" "self.read_handle was None when trying to start the transfer"
d = self.file_sender.beginFileTransfer(self.read_handle, self) d = self.file_sender.beginFileTransfer(self.read_handle, self)
d.addCallback(lambda _: self.read_handle.close())
return d return d
def handle_handshake_response(self, response_dict): def handle_handshake_response(self, response_dict):

View file

@ -179,7 +179,7 @@ class EncryptedFileReflectorClient(Protocol):
def set_not_uploading(self): def set_not_uploading(self):
if self.next_blob_to_send is not None: if self.next_blob_to_send is not None:
log.debug("Close %s", self.next_blob_to_send) log.debug("Close %s", self.next_blob_to_send)
self.next_blob_to_send.close_read_handle(self.read_handle) self.read_handle.close()
self.read_handle = None self.read_handle = None
self.next_blob_to_send = None self.next_blob_to_send = None
if self.file_sender is not None: if self.file_sender is not None:
@ -191,6 +191,7 @@ class EncryptedFileReflectorClient(Protocol):
assert self.read_handle is not None, \ assert self.read_handle is not None, \
"self.read_handle was None when trying to start the transfer" "self.read_handle was None when trying to start the transfer"
d = self.file_sender.beginFileTransfer(self.read_handle, self) d = self.file_sender.beginFileTransfer(self.read_handle, self)
d.addCallback(lambda _: self.read_handle.close())
return d return d
def handle_handshake_response(self, response_dict): def handle_handshake_response(self, response_dict):

View file

@ -219,35 +219,35 @@ class BlobAvailabilityTracker(BlobAvailability.BlobAvailabilityTracker):
def __init__(self, blob_manager=None, peer_finder=None, dht_node=None): def __init__(self, blob_manager=None, peer_finder=None, dht_node=None):
self.availability = { self.availability = {
''.join(('91dc64cf1ff42e20d627b033ad5e4c3a4a96856ed8a6e3fb', '91dc64cf1ff42e20d627b033ad5e4c3a4a96856ed8a6e3fb'
'4cd5fa1cfba4bf72eefd325f579db92f45f4355550ace8e7')): ['1.2.3.4'], '4cd5fa1cfba4bf72eefd325f579db92f45f4355550ace8e7': ['1.2.3.4'],
''.join(('b2e48bb4c88cf46b76adf0d47a72389fae0cd1f19ed27dc5', 'b2e48bb4c88cf46b76adf0d47a72389fae0cd1f19ed27dc5'
'09138c99509a25423a4cef788d571dca7988e1dca69e6fa0')): ['1.2.3.4', '1.2.3.4'], '09138c99509a25423a4cef788d571dca7988e1dca69e6fa0': ['1.2.3.4', '1.2.3.4'],
''.join(('6af95cd062b4a179576997ef1054c9d2120f8592eea045e9', '6af95cd062b4a179576997ef1054c9d2120f8592eea045e9'
'667bea411d520262cd5a47b137eabb7a7871f5f8a79c92dd')): '667bea411d520262cd5a47b137eabb7a7871f5f8a79c92dd':
['1.2.3.4', '1.2.3.4', '1.2.3.4'], ['1.2.3.4', '1.2.3.4', '1.2.3.4'],
''.join(('6d8017aba362e5c5d0046625a039513419810a0397d72831', '6d8017aba362e5c5d0046625a039513419810a0397d72831'
'8c328a5cc5d96efb589fbca0728e54fe5adbf87e9545ee07')): '8c328a5cc5d96efb589fbca0728e54fe5adbf87e9545ee07':
['1.2.3.4', '1.2.3.4', '1.2.3.4', '1.2.3.4'], ['1.2.3.4', '1.2.3.4', '1.2.3.4', '1.2.3.4'],
''.join(('5a450b416275da4bdff604ee7b58eaedc7913c5005b7184f', '5a450b416275da4bdff604ee7b58eaedc7913c5005b7184f'
'c3bc5ef0b1add00613587f54217c91097fc039ed9eace9dd')): 'c3bc5ef0b1add00613587f54217c91097fc039ed9eace9dd':
['1.2.3.4', '1.2.3.4', '1.2.3.4', '1.2.3.4', '1.2.3.4'], ['1.2.3.4', '1.2.3.4', '1.2.3.4', '1.2.3.4', '1.2.3.4'],
''.join(('d7c82e6cac093b3f16107d2ae2b2c75424f1fcad2c7fbdbe', 'd7c82e6cac093b3f16107d2ae2b2c75424f1fcad2c7fbdbe'
'66e4a13c0b6bd27b67b3a29c403b82279ab0f7c1c48d6787')): '66e4a13c0b6bd27b67b3a29c403b82279ab0f7c1c48d6787':
['1.2.3.4', '1.2.3.4', '1.2.3.4', '1.2.3.4', '1.2.3.4', '1.2.3.4'], ['1.2.3.4', '1.2.3.4', '1.2.3.4', '1.2.3.4', '1.2.3.4', '1.2.3.4'],
''.join(('9dbda74a472a2e5861a5d18197aeba0f5de67c67e401124c', '9dbda74a472a2e5861a5d18197aeba0f5de67c67e401124c'
'243d2f0f41edf01d7a26aeb0b5fc9bf47f6361e0f0968e2c')): '243d2f0f41edf01d7a26aeb0b5fc9bf47f6361e0f0968e2c':
['1.2.3.4', '1.2.3.4', '1.2.3.4', '1.2.3.4', '1.2.3.4', '1.2.3.4', '1.2.3.4'], ['1.2.3.4', '1.2.3.4', '1.2.3.4', '1.2.3.4', '1.2.3.4', '1.2.3.4', '1.2.3.4'],
''.join(('8c70d5e2f5c3a6085006198e5192d157a125d92e73787944', '8c70d5e2f5c3a6085006198e5192d157a125d92e73787944'
'72007a61947992768926513fc10924785bdb1761df3c37e6')): '72007a61947992768926513fc10924785bdb1761df3c37e6':
['1.2.3.4', '1.2.3.4', '1.2.3.4', '1.2.3.4', '1.2.3.4', '1.2.3.4', '1.2.3.4', ['1.2.3.4', '1.2.3.4', '1.2.3.4', '1.2.3.4', '1.2.3.4', '1.2.3.4', '1.2.3.4',
'1.2.3.4'], '1.2.3.4'],
''.join(('f99d24cd50d4bfd77c2598bfbeeb8415bf0feef21200bdf0', 'f99d24cd50d4bfd77c2598bfbeeb8415bf0feef21200bdf0'
'b8fbbde7751a77b7a2c68e09c25465a2f40fba8eecb0b4e0')): 'b8fbbde7751a77b7a2c68e09c25465a2f40fba8eecb0b4e0':
['1.2.3.4', '1.2.3.4', '1.2.3.4', '1.2.3.4', '1.2.3.4', '1.2.3.4', '1.2.3.4', ['1.2.3.4', '1.2.3.4', '1.2.3.4', '1.2.3.4', '1.2.3.4', '1.2.3.4', '1.2.3.4',
'1.2.3.4', '1.2.3.4'], '1.2.3.4', '1.2.3.4'],
''.join(('c84aa1fd8f5009f7c4e71e444e40d95610abc1480834f835', 'c84aa1fd8f5009f7c4e71e444e40d95610abc1480834f835'
'eefb267287aeb10025880a3ce22580db8c6d92efb5bc0c9c')): 'eefb267287aeb10025880a3ce22580db8c6d92efb5bc0c9c':
['1.2.3.4', '1.2.3.4', '1.2.3.4', '1.2.3.4', '1.2.3.4', '1.2.3.4', '1.2.3.4', ['1.2.3.4', '1.2.3.4', '1.2.3.4', '1.2.3.4', '1.2.3.4', '1.2.3.4', '1.2.3.4',
'1.2.3.4', '1.2.3.4', '1.2.3.4'], '1.2.3.4', '1.2.3.4', '1.2.3.4'],
} }
@ -272,22 +272,22 @@ create_stream_sd_file = {
{ {
'length': 2097152, 'length': 2097152,
'blob_num': 0, 'blob_num': 0,
'blob_hash': ''.join(('dc4708f76a5e7af0f1cae0ee96b824e2ed9250c9346c093b', 'blob_hash': 'dc4708f76a5e7af0f1cae0ee96b824e2ed9250c9346c093b'
'441f0a20d3607c17948b6fcfb4bc62020fe5286693d08586')), '441f0a20d3607c17948b6fcfb4bc62020fe5286693d08586',
'iv': '30303030303030303030303030303031' 'iv': '30303030303030303030303030303031'
}, },
{ {
'length': 2097152, 'length': 2097152,
'blob_num': 1, 'blob_num': 1,
'blob_hash': ''.join(('f4067522c1b49432a2a679512e3917144317caa1abba0c04', 'blob_hash': 'f4067522c1b49432a2a679512e3917144317caa1abba0c04'
'1e0cd2cf9f635d4cf127ce1824fa04189b63916174951f70')), '1e0cd2cf9f635d4cf127ce1824fa04189b63916174951f70',
'iv': '30303030303030303030303030303032' 'iv': '30303030303030303030303030303032'
}, },
{ {
'length': 1015056, 'length': 1015056,
'blob_num': 2, 'blob_num': 2,
'blob_hash': ''.join(('305486c434260484fcb2968ce0e963b72f81ba56c11b08b1', 'blob_hash': '305486c434260484fcb2968ce0e963b72f81ba56c11b08b1'
'af0789b55b44d78422600f9a38e3cf4f2e9569897e5646a9')), 'af0789b55b44d78422600f9a38e3cf4f2e9569897e5646a9',
'iv': '30303030303030303030303030303033' 'iv': '30303030303030303030303030303033'
}, },
{'length': 0, 'blob_num': 3, 'iv': '30303030303030303030303030303034'} {'length': 0, 'blob_num': 3, 'iv': '30303030303030303030303030303034'}
@ -295,8 +295,8 @@ create_stream_sd_file = {
'stream_type': 'lbryfile', 'stream_type': 'lbryfile',
'key': '30313233343536373031323334353637', 'key': '30313233343536373031323334353637',
'suggested_file_name': '746573745f66696c65', 'suggested_file_name': '746573745f66696c65',
'stream_hash': ''.join(('6d27fbe10c86d81aacfb897c7a426d0a2214f5a299455a6d', 'stream_hash': '6d27fbe10c86d81aacfb897c7a426d0a2214f5a299455a6d'
'315c0f998c4b3545c2dc60906122d94653c23b1898229e3f')) '315c0f998c4b3545c2dc60906122d94653c23b1898229e3f'
} }

View file

@ -12,8 +12,8 @@ class BlobFileTest(unittest.TestCase):
self.db_dir, self.blob_dir = mk_db_and_blob_dir() self.db_dir, self.blob_dir = mk_db_and_blob_dir()
self.fake_content_len = 64 self.fake_content_len = 64
self.fake_content = bytearray('0'*self.fake_content_len) self.fake_content = bytearray('0'*self.fake_content_len)
self.fake_content_hash = ''.join(('53871b26a08e90cb62142f2a39f0b80de41792322b0ca560', self.fake_content_hash = '53871b26a08e90cb62142f2a39f0b80de41792322b0ca560' \
'2b6eb7b5cf067c49498a7492bb9364bbf90f40c1c5412105')) '2b6eb7b5cf067c49498a7492bb9364bbf90f40c1c5412105'
def tearDown(self): def tearDown(self):
rm_db_and_blob_dir(self.db_dir, self.blob_dir) rm_db_and_blob_dir(self.db_dir, self.blob_dir)
@ -43,9 +43,15 @@ class BlobFileTest(unittest.TestCase):
blob_file = BlobFile(self.blob_dir, self.fake_content_hash, self.fake_content_len) blob_file = BlobFile(self.blob_dir, self.fake_content_hash, self.fake_content_len)
self.assertTrue(blob_file.verified) self.assertTrue(blob_file.verified)
f = blob_file.open_for_reading() f = blob_file.open_for_reading()
self.assertEqual(1, blob_file.readers)
c = f.read() c = f.read()
self.assertEqual(c, self.fake_content) self.assertEqual(c, self.fake_content)
# close reader
f.close()
self.assertEqual(0, blob_file.readers)
@defer.inlineCallbacks @defer.inlineCallbacks
def test_delete(self): def test_delete(self):
blob_file = BlobFile(self.blob_dir, self.fake_content_hash, self.fake_content_len) blob_file = BlobFile(self.blob_dir, self.fake_content_hash, self.fake_content_len)
@ -57,6 +63,21 @@ class BlobFileTest(unittest.TestCase):
blob_file = BlobFile(self.blob_dir, self.fake_content_hash) blob_file = BlobFile(self.blob_dir, self.fake_content_hash)
self.assertFalse(blob_file.verified) self.assertFalse(blob_file.verified)
@defer.inlineCallbacks
def test_delete_fail(self):
# deletes should fail if being written to
blob_file = BlobFile(self.blob_dir, self.fake_content_hash, self.fake_content_len)
writer, finished_d = blob_file.open_for_writing(peer=1)
yield self.assertFailure(blob_file.delete(), ValueError)
writer.write(self.fake_content)
writer.close()
# deletes should fail if being read and not closed
blob_file = BlobFile(self.blob_dir, self.fake_content_hash, self.fake_content_len)
self.assertTrue(blob_file.verified)
f = blob_file.open_for_reading()
yield self.assertFailure(blob_file.delete(), ValueError)
@defer.inlineCallbacks @defer.inlineCallbacks
def test_too_much_write(self): def test_too_much_write(self):
# writing too much data should result in failure # writing too much data should result in failure

View file

@ -23,26 +23,26 @@ def get_random_sample(list_to_sample):
def calculate_negotation_turns(client_base, host_base, host_is_generous=True, def calculate_negotation_turns(client_base, host_base, host_is_generous=True,
client_is_generous=True): client_is_generous=True):
blobs = [ blobs = [
''.join(('b2e48bb4c88cf46b76adf0d47a72389fae0cd1f19ed27dc5', 'b2e48bb4c88cf46b76adf0d47a72389fae0cd1f19ed27dc5'
'09138c99509a25423a4cef788d571dca7988e1dca69e6fa0')), '09138c99509a25423a4cef788d571dca7988e1dca69e6fa0',
''.join(('d7c82e6cac093b3f16107d2ae2b2c75424f1fcad2c7fbdbe', 'd7c82e6cac093b3f16107d2ae2b2c75424f1fcad2c7fbdbe'
'66e4a13c0b6bd27b67b3a29c403b82279ab0f7c1c48d6787')), '66e4a13c0b6bd27b67b3a29c403b82279ab0f7c1c48d6787',
''.join(('5a450b416275da4bdff604ee7b58eaedc7913c5005b7184f', '5a450b416275da4bdff604ee7b58eaedc7913c5005b7184f'
'c3bc5ef0b1add00613587f54217c91097fc039ed9eace9dd')), 'c3bc5ef0b1add00613587f54217c91097fc039ed9eace9dd',
''.join(('f99d24cd50d4bfd77c2598bfbeeb8415bf0feef21200bdf0', 'f99d24cd50d4bfd77c2598bfbeeb8415bf0feef21200bdf0'
'b8fbbde7751a77b7a2c68e09c25465a2f40fba8eecb0b4e0')), 'b8fbbde7751a77b7a2c68e09c25465a2f40fba8eecb0b4e0',
''.join(('9dbda74a472a2e5861a5d18197aeba0f5de67c67e401124c', '9dbda74a472a2e5861a5d18197aeba0f5de67c67e401124c'
'243d2f0f41edf01d7a26aeb0b5fc9bf47f6361e0f0968e2c')), '243d2f0f41edf01d7a26aeb0b5fc9bf47f6361e0f0968e2c',
''.join(('91dc64cf1ff42e20d627b033ad5e4c3a4a96856ed8a6e3fb', '91dc64cf1ff42e20d627b033ad5e4c3a4a96856ed8a6e3fb'
'4cd5fa1cfba4bf72eefd325f579db92f45f4355550ace8e7')), '4cd5fa1cfba4bf72eefd325f579db92f45f4355550ace8e7',
''.join(('6d8017aba362e5c5d0046625a039513419810a0397d72831', '6d8017aba362e5c5d0046625a039513419810a0397d72831'
'8c328a5cc5d96efb589fbca0728e54fe5adbf87e9545ee07')), '8c328a5cc5d96efb589fbca0728e54fe5adbf87e9545ee07',
''.join(('6af95cd062b4a179576997ef1054c9d2120f8592eea045e9', '6af95cd062b4a179576997ef1054c9d2120f8592eea045e9'
'667bea411d520262cd5a47b137eabb7a7871f5f8a79c92dd')), '667bea411d520262cd5a47b137eabb7a7871f5f8a79c92dd',
''.join(('8c70d5e2f5c3a6085006198e5192d157a125d92e73787944', '8c70d5e2f5c3a6085006198e5192d157a125d92e73787944'
'72007a61947992768926513fc10924785bdb1761df3c37e6')), '72007a61947992768926513fc10924785bdb1761df3c37e6',
''.join(('c84aa1fd8f5009f7c4e71e444e40d95610abc1480834f835', 'c84aa1fd8f5009f7c4e71e444e40d95610abc1480834f835'
'eefb267287aeb10025880a3ce22580db8c6d92efb5bc0c9c')) 'eefb267287aeb10025880a3ce22580db8c6d92efb5bc0c9c'
] ]
host = mock.Mock() host = mock.Mock()

View file

@ -21,8 +21,8 @@ test_claim_dict = {
'version':'_0_0_1', 'version':'_0_0_1',
'claimType':'streamType', 'claimType':'streamType',
'stream':{'metadata':test_metadata, 'version':'_0_0_1', 'source': 'stream':{'metadata':test_metadata, 'version':'_0_0_1', 'source':
{'source': ''.join(('8655f713819344980a9a0d67b198344e2c462c90f813e86f', {'source': '8655f713819344980a9a0d67b198344e2c462c90f813e86f'
'0c63789ab0868031f25c54d0bb31af6658e997e2041806eb')), '0c63789ab0868031f25c54d0bb31af6658e997e2041806eb',
'sourceType': 'lbry_sd_hash', 'contentType': 'video/mp4', 'version': '_0_0_1'}, 'sourceType': 'lbry_sd_hash', 'contentType': 'video/mp4', 'version': '_0_0_1'},
}} }}

View file

@ -42,8 +42,8 @@ class DictDataStoreTest(unittest.TestCase):
self.ds.addPeerToBlob(key, value, now, now, 'node1') self.ds.addPeerToBlob(key, value, now, now, 'node1')
except Exception: except Exception:
import traceback import traceback
self.fail(''.join(('Failed writing the following data: key: "%s", ', self.fail('Failed writing the following data: key: "%s" '
'data: "%s"\n The error was: %s:')) % 'data: "%s"\n The error was: %s:' %
(key, value, traceback.format_exc(5))) (key, value, traceback.format_exc(5)))
# Verify writing (test query ability) # Verify writing (test query ability)
@ -91,13 +91,13 @@ class DictDataStoreTest(unittest.TestCase):
('val1', str(now - td), str(now))) ('val1', str(now - td), str(now)))
self.failIf( self.failIf(
'val2' in self.ds.getPeersForBlob(h1), 'val2' in self.ds.getPeersForBlob(h1),
''.join(('DataStore failed to delete an expired value! ', 'DataStore failed to delete an expired value! '
'Value %s, publish time %s, current time %s')) % 'Value %s, publish time %s, current time %s' %
('val2', str(now - td2), str(now))) ('val2', str(now - td2), str(now)))
self.failIf( self.failIf(
'val3' in self.ds.getPeersForBlob(h2), 'val3' in self.ds.getPeersForBlob(h2),
''.join(('DataStore failed to delete an expired value! ', 'DataStore failed to delete an expired value! '
'Value %s, publish time %s, current time %s')) % 'Value %s, publish time %s, current time %s' %
('val3', str(now - td2), str(now))) ('val3', str(now - td2), str(now)))
self.failUnless( self.failUnless(
'val4' in self.ds.getPeersForBlob(h2), 'val4' in self.ds.getPeersForBlob(h2),

View file

@ -63,8 +63,8 @@ class DefaultFormatTranslatorTest(unittest.TestCase):
for key in msgPrimitive: for key in msgPrimitive:
self.failUnlessEqual( self.failUnlessEqual(
translatedObj[key], msgPrimitive[key], translatedObj[key], msgPrimitive[key],
''.join(('Message object type %s not translated correctly into primitive on ', 'Message object type %s not translated correctly into primitive on '
'key "%s"; expected "%s", got "%s"')) % 'key "%s"; expected "%s", got "%s"' %
(msg.__class__.__name__, key, msgPrimitive[key], translatedObj[key])) (msg.__class__.__name__, key, msgPrimitive[key], translatedObj[key]))
def testFromPrimitive(self): def testFromPrimitive(self):
@ -78,8 +78,8 @@ class DefaultFormatTranslatorTest(unittest.TestCase):
for key in msg.__dict__: for key in msg.__dict__:
self.failUnlessEqual( self.failUnlessEqual(
msg.__dict__[key], translatedObj.__dict__[key], msg.__dict__[key], translatedObj.__dict__[key],
''.join(('Message instance variable "%s" not translated correctly; ', 'Message instance variable "%s" not translated correctly; '
'expected "%s", got "%s"')) % 'expected "%s", got "%s"' %
(key, msg.__dict__[key], translatedObj.__dict__[key])) (key, msg.__dict__[key], translatedObj.__dict__[key]))

View file

@ -42,8 +42,8 @@ def get_test_daemon(data_rate=None, generous=True, with_fee=False):
"license_url": "fake license url", "license_url": "fake license url",
"nsfw": False, "nsfw": False,
"sources": { "sources": {
"lbry_sd_hash": ''.join(('d2b8b6e907dde95245fe6d144d16c2fdd60c4e0c6463ec98', "lbry_sd_hash": 'd2b8b6e907dde95245fe6d144d16c2fdd60c4e0c6463ec98'
'b85642d06d8e9414e8fcfdcb7cb13532ec5454fb8fe7f280')) 'b85642d06d8e9414e8fcfdcb7cb13532ec5454fb8fe7f280'
}, },
"thumbnail": "fake thumbnail", "thumbnail": "fake thumbnail",
"title": "fake title", "title": "fake title",

View file

@ -4,7 +4,7 @@ from twisted.trial import unittest
from twisted.internet import defer, task from twisted.internet import defer, task
from lbrynet.core import Session, PaymentRateManager, Wallet from lbrynet.core import Session, PaymentRateManager, Wallet
from lbrynet.core.Error import DownloadTimeoutError from lbrynet.core.Error import DownloadDataTimeout, DownloadSDTimeout
from lbrynet.daemon import Downloader from lbrynet.daemon import Downloader
from lbrynet.core.StreamDescriptor import StreamDescriptorIdentifier from lbrynet.core.StreamDescriptor import StreamDescriptorIdentifier
@ -15,6 +15,7 @@ from lbrynet.daemon.ExchangeRateManager import ExchangeRateManager
from lbrynet.tests.mocks import ExchangeRateManager as DummyExchangeRateManager from lbrynet.tests.mocks import ExchangeRateManager as DummyExchangeRateManager
from lbrynet.tests.mocks import mock_conf_settings from lbrynet.tests.mocks import mock_conf_settings
class MocDownloader(object): class MocDownloader(object):
def __init__(self): def __init__(self):
self.finish_deferred = defer.Deferred(None) self.finish_deferred = defer.Deferred(None)
@ -27,8 +28,8 @@ class MocDownloader(object):
@defer.inlineCallbacks @defer.inlineCallbacks
def status(self): def status(self):
out = yield EncryptedFileStatusReport(self.name, self.num_completed, self.num_known, out = yield EncryptedFileStatusReport(
self.running_status) self.name, self.num_completed, self.num_known, self.running_status)
defer.returnValue(out) defer.returnValue(out)
def start(self): def start(self):
@ -39,8 +40,8 @@ class MocDownloader(object):
self.finish_deferred.callback(True) self.finish_deferred.callback(True)
def moc_initialize(self, stream_info): def moc_initialize(self, stream_info):
self.sd_hash = ''.join(('d5169241150022f996fa7cd6a9a1c421937276a3275eb912', self.sd_hash = "d5169241150022f996fa7cd6a9a1c421937276a3275eb912" \
'790bd07ba7aec1fac5fd45431d226b8fb402691e79aeb24b')) "790bd07ba7aec1fac5fd45431d226b8fb402691e79aeb24b"
return None return None
def moc_download_sd_blob(self): def moc_download_sd_blob(self):
@ -102,7 +103,7 @@ class GetStreamTests(unittest.TestCase):
DownloadTimeoutError is raised DownloadTimeoutError is raised
""" """
def download_sd_blob(self): def download_sd_blob(self):
raise DownloadTimeoutError(self.file_name) raise DownloadSDTimeout(self.file_name)
getstream = self.init_getstream_with_mocs() getstream = self.init_getstream_with_mocs()
getstream._initialize = types.MethodType(moc_initialize, getstream) getstream._initialize = types.MethodType(moc_initialize, getstream)
@ -111,15 +112,14 @@ class GetStreamTests(unittest.TestCase):
getstream.pay_key_fee = types.MethodType(moc_pay_key_fee, getstream) getstream.pay_key_fee = types.MethodType(moc_pay_key_fee, getstream)
name = 'test' name = 'test'
stream_info = None stream_info = None
with self.assertRaises(DownloadTimeoutError): with self.assertRaises(DownloadSDTimeout):
yield getstream.start(stream_info, name) yield getstream.start(stream_info, name)
self.assertFalse(getstream.pay_key_fee_called) self.assertFalse(getstream.pay_key_fee_called)
@defer.inlineCallbacks @defer.inlineCallbacks
def test_timeout(self): def test_timeout(self):
""" """
test that timeout (set to 2 here) exception is raised test that timeout (set to 3 here) exception is raised
when download times out while downloading first blob, and key fee is paid when download times out while downloading first blob, and key fee is paid
""" """
getstream = self.init_getstream_with_mocs() getstream = self.init_getstream_with_mocs()
@ -132,9 +132,9 @@ class GetStreamTests(unittest.TestCase):
start = getstream.start(stream_info, name) start = getstream.start(stream_info, name)
self.clock.advance(1) self.clock.advance(1)
self.clock.advance(1) self.clock.advance(1)
with self.assertRaises(DownloadTimeoutError): self.clock.advance(1)
with self.assertRaises(DownloadDataTimeout):
yield start yield start
self.assertTrue(getstream.downloader.stop_called)
self.assertTrue(getstream.pay_key_fee_called) self.assertTrue(getstream.pay_key_fee_called)
@defer.inlineCallbacks @defer.inlineCallbacks
@ -159,21 +159,20 @@ class GetStreamTests(unittest.TestCase):
downloader, f_deferred = yield start downloader, f_deferred = yield start
self.assertTrue(getstream.pay_key_fee_called) self.assertTrue(getstream.pay_key_fee_called)
# @defer.inlineCallbacks
@defer.inlineCallbacks # def test_finish_stopped_downloader(self):
def test_finish_stopped_downloader(self): # """
""" # test that if we have a stopped downloader, beforfe a blob is downloaded,
test that if we have a stopped downloader, beforfe a blob is downloaded, # start() returns
start() returns # """
""" # getstream = self.init_getstream_with_mocs()
getstream = self.init_getstream_with_mocs() # getstream._initialize = types.MethodType(moc_initialize, getstream)
getstream._initialize = types.MethodType(moc_initialize, getstream) # getstream._download_sd_blob = types.MethodType(moc_download_sd_blob, getstream)
getstream._download_sd_blob = types.MethodType(moc_download_sd_blob, getstream) # getstream._download = types.MethodType(moc_download, getstream)
getstream._download = types.MethodType(moc_download, getstream) # name='test'
name = 'test' # stream_info = None
stream_info = None # start = getstream.start(stream_info,name)
start = getstream.start(stream_info, name) #
# getstream.downloader.running_status = ManagedEncryptedFileDownloader.STATUS_STOPPED
getstream.downloader.running_status = ManagedEncryptedFileDownloader.STATUS_STOPPED # self.clock.advance(1)
self.clock.advance(1) # downloader, f_deferred = yield start
downloader, f_deferred = yield start

View file

@ -84,9 +84,9 @@ class LBRYioFeedTest(unittest.TestCase):
def test_handle_response(self): def test_handle_response(self):
feed = ExchangeRateManager.LBRYioFeed() feed = ExchangeRateManager.LBRYioFeed()
response = ''.join(('{\"data\": {\"fresh\": 0, \"lbc_usd\": 0.05863062523378918, ', response = '{\"data\": {\"fresh\": 0, \"lbc_usd\": 0.05863062523378918, ' \
'\"lbc_btc\": 5.065289549855739e-05, \"btc_usd\": 1157.498}, ', '\"lbc_btc\": 5.065289549855739e-05, \"btc_usd\": 1157.498}, ' \
'\"success\": true, \"error\": null}')) '\"success\": true, \"error\": null}'
out = yield feed._handle_response(response) out = yield feed._handle_response(response)
expected = 1.0 / 5.065289549855739e-05 expected = 1.0 / 5.065289549855739e-05
self.assertEqual(expected, out) self.assertEqual(expected, out)
@ -105,9 +105,9 @@ class LBRYioBTCFeedTest(unittest.TestCase):
def test_handle_response(self): def test_handle_response(self):
feed = ExchangeRateManager.LBRYioBTCFeed() feed = ExchangeRateManager.LBRYioBTCFeed()
response = ''.join(('{\"data\": {\"fresh\": 0, \"lbc_usd\": 0.05863062523378918, ', response = '{\"data\": {\"fresh\": 0, \"lbc_usd\": 0.05863062523378918, ' \
'\"lbc_btc\": 5.065289549855739e-05, \"btc_usd\": 1157.498}, ', '\"lbc_btc\": 5.065289549855739e-05, \"btc_usd\": 1157.498}, ' \
'\"success\": true, \"error\": null}')) '\"success\": true, \"error\": null}'
out = yield feed._handle_response(response) out = yield feed._handle_response(response)
expected = 1.0 / 1157.498 expected = 1.0 / 1157.498
self.assertEqual(expected, out) self.assertEqual(expected, out)

View file

@ -59,7 +59,7 @@ def main(args=None):
use_upnp=False, use_upnp=False,
wallet=wallet wallet=wallet
) )
api = analytics.Api.new_instance() api = analytics.Api.new_instance(conf.settings['share_usage_data'])
run(args, session, api) run(args, session, api)
reactor.run() reactor.run()
finally: finally: