From 39275682d90fd246971f75e2693e767852dfc90f Mon Sep 17 00:00:00 2001 From: Jimmy Kiselak Date: Wed, 6 Jan 2016 00:50:50 -0500 Subject: [PATCH 1/4] show the estimated cost of a download and format the download size better --- lbrynet/lbryfilemanager/LBRYFileManager.py | 1 + lbrynet/lbrynet_console/ConsoleControl.py | 3 +- lbrynet/lbrynet_console/ControlHandlers.py | 47 ++++++++++++++++++++-- 3 files changed, 47 insertions(+), 4 deletions(-) diff --git a/lbrynet/lbryfilemanager/LBRYFileManager.py b/lbrynet/lbryfilemanager/LBRYFileManager.py index f89f09793..fb4cb5b80 100644 --- a/lbrynet/lbryfilemanager/LBRYFileManager.py +++ b/lbrynet/lbryfilemanager/LBRYFileManager.py @@ -36,6 +36,7 @@ class LBRYFileManager(object): self.download_directory = os.path.join(os.path.expanduser("~"), 'Downloads') else: self.download_directory = os.getcwd() + log.debug("Download directory for LBRYFileManager: %s", str(self.download_directory)) def setup(self): d = self._open_db() diff --git a/lbrynet/lbrynet_console/ConsoleControl.py b/lbrynet/lbrynet_console/ConsoleControl.py index 0477154bd..8b6552e6e 100644 --- a/lbrynet/lbrynet_console/ConsoleControl.py +++ b/lbrynet/lbrynet_console/ConsoleControl.py @@ -37,7 +37,8 @@ class ConsoleControl(basic.LineReceiver): "your balance is showing 0 when you know it shouldn't be, it\n" "is likely that the culprit is the blockchain.\n\n" "You should have received 1000 LBC the first time you ran\n" - "this program. If you did not, let us know!\n\n" + "this program. If you did not, let us know! But first give\n" + "them a couple of minutes to show up.\n\n" "Welcome to lbrynet-console!") self.sendLine("") self.sendLine("Enter a command. Try 'get wonderfullife' or 'help' to see more options.") diff --git a/lbrynet/lbrynet_console/ControlHandlers.py b/lbrynet/lbrynet_console/ControlHandlers.py index 7099cae89..2856ae135 100644 --- a/lbrynet/lbrynet_console/ControlHandlers.py +++ b/lbrynet/lbrynet_console/ControlHandlers.py @@ -554,7 +554,8 @@ class AddStream(CommandHandler): if command in self.factory_choices: self.factory = self.factory_choices[command] self._start_download() - self.console.sendLine("Downloading in the background") + self.console.sendLine("Downloading in the background. Use the command 'status'\n" + "to check the status of the download.") self.finished_deferred.callback(None) else: self._show_factory_choices() @@ -639,8 +640,26 @@ class AddStream(CommandHandler): self._show_info_and_options() return self._show_factory_choices() + def _get_estimated_cost_string(self): + estimated_cost_string = "unknown LBC" + for option, option_value in zip(self.download_options, self.options_chosen): + if option.short_description == "data payment rate": + if option_value == None: + rate = self.payment_rate_manager.get_effective_min_blob_data_payment_rate() + else: + rate = option_value + stream_size = None + for field, val in self.metadata.validator.info_to_show(): + if field == "stream_size": + stream_size = int(val) + if stream_size is not None and rate is not None: + estimated_cost_string = str(stream_size * 1.0 / 2**20 * rate) + " LBC" + return estimated_cost_string + def _show_factory_choices(self): prompt = "\n" + prompt += "Estimated cost: " + self._get_estimated_cost_string() + prompt += "\n\n" for factory_choice_string in self.factory_choice_strings: prompt += factory_choice_string[1] + '\n' self.console.sendLine(str(prompt)) @@ -649,13 +668,35 @@ class AddStream(CommandHandler): #self.download_options = self.metadata.options.get_downloader_options(self.metadata.validator, # self.payment_rate_manager) prompt = "Stream info:\n" - for info_line in self._get_info_to_show(): - prompt += info_line[0] + ": " + info_line[1] + "\n" + for field_name, value in self._get_info_to_show(): + if field_name == "stream_size": + value = str(self._get_formatted_stream_size(int(value))) + prompt += field_name + ": " + value + "\n" prompt += "\nOptions:\n" for option in self.download_options: prompt += option.long_description + ": " + str(option.default_value_description) + "\n" self.console.sendLine(str(prompt)) + @staticmethod + def _get_formatted_stream_size(stream_size): + if isinstance(stream_size, (int, long)): + if stream_size >= 2**40: + units = "TB" + factor = 2**40 + elif stream_size >= 2**30: + units = "GB" + factor = 2**30 + elif stream_size >= 2**20: + units = "MB" + factor = 2**20 + elif stream_size >= 2**10: + units = "KB" + factor = 2**10 + else: + return str(stream_size) + " B" + return "%.1f %s" % (round((stream_size * 1.0 / factor), 1), units) + return stream_size + def _get_info_to_show(self): return self.metadata.validator.info_to_show() From 49869d071a9042ffdacaeca69b3a4004294fa807 Mon Sep 17 00:00:00 2001 From: Jimmy Kiselak Date: Wed, 6 Jan 2016 00:56:45 -0500 Subject: [PATCH 2/4] show when lbrycrdd is being stopped and when it stops --- lbrynet/core/LBRYcrdWallet.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/lbrynet/core/LBRYcrdWallet.py b/lbrynet/core/LBRYcrdWallet.py index 34f6f9d36..aff9fd6ed 100644 --- a/lbrynet/core/LBRYcrdWallet.py +++ b/lbrynet/core/LBRYcrdWallet.py @@ -402,7 +402,9 @@ class LBRYcrdWallet(object): def _stop_daemon(self): if self.lbrycrdd is not None and self.started_lbrycrdd is True: + alert.info("Stopping lbrycrdd...") d = threads.deferToThread(self._rpc_stop) + d.addCallback(lambda _: alert.info("Stopped lbrycrdd.")) return d return defer.succeed(True) From ec1ba02ccaf5c4f2a5cfcd50a504d2477425b97c Mon Sep 17 00:00:00 2001 From: Jimmy Kiselak Date: Wed, 6 Jan 2016 13:08:20 -0500 Subject: [PATCH 3/4] show where the log file is, and log failure to open file better --- lbrynet/lbryfile/client/LBRYFileDownloader.py | 13 ++++++++++++- lbrynet/lbrynet_console/ControlHandlers.py | 18 +++++++++++------- 2 files changed, 23 insertions(+), 8 deletions(-) diff --git a/lbrynet/lbryfile/client/LBRYFileDownloader.py b/lbrynet/lbryfile/client/LBRYFileDownloader.py index f8cf4a7c0..9799f1f01 100644 --- a/lbrynet/lbryfile/client/LBRYFileDownloader.py +++ b/lbrynet/lbryfile/client/LBRYFileDownloader.py @@ -12,6 +12,11 @@ from lbrynet.lbryfile.client.LBRYFileMetadataHandler import LBRYFileMetadataHand import os from twisted.internet import defer, threads, reactor from twisted.python.procutils import which +import logging +import traceback + + +log = logging.getLogger(__name__) class LBRYFileDownloader(CryptStreamDownloader): @@ -178,7 +183,13 @@ class LBRYFileSaver(LBRYFileDownloader): file_name + "_" + str(ext_num))): ext_num += 1 file_name = file_name + "_" + str(ext_num) - self.file_handle = open(os.path.join(self.download_directory, file_name), 'wb') + try: + self.file_handle = open(os.path.join(self.download_directory, file_name), 'wb') + except IOError: + log.error(traceback.format_exc()) + raise ValueError("Failed to open %s. Make sure you have permission to save files to that" + " location." % str(os.path.join(self.download_directory, + file_name))) return threads.deferToThread(open_file) def _close_output(self): diff --git a/lbrynet/lbrynet_console/ControlHandlers.py b/lbrynet/lbrynet_console/ControlHandlers.py index 2856ae135..710eab730 100644 --- a/lbrynet/lbrynet_console/ControlHandlers.py +++ b/lbrynet/lbrynet_console/ControlHandlers.py @@ -54,6 +54,14 @@ class InvalidValueError(Exception): # prompt_description = None +def get_log_file(): + log_file = "console.log" + logging_handlers = logging.getLogger().handlers + if len(logging_handlers): + log_file = logging_handlers[0].baseFilename + return log_file + + class RoundedTime(object): SECOND = 0 MINUTE = 1 @@ -611,9 +619,7 @@ class AddStream(CommandHandler): def _handle_load_failed(self, err): self.loading_failed = True log.error("An exception occurred attempting to load the stream descriptor: %s", err.getTraceback()) - log_file = "console.log" - if len(log.handlers): - log_file = log.handlers[0].baseFilename + log_file = get_log_file() self.console.sendLine("An unexpected error occurred attempting to load the stream's metadata.\n" "See %s for further details.\n\n" % log_file) self.finished_deferred.callback(None) @@ -771,10 +777,8 @@ class AddStream(CommandHandler): d.addErrback(self._log_recent_blockchain_time_error_download) else: log.error("An unexpected error has caused the download to stop: %s" % err.getTraceback()) - log_file = "console.log" - if len(log.handlers): - log_file = log.handlers[0].baseFilename - self.console.sendLine("An unexpected error has caused the download to stop. See %s for details." % log_file) + log_file = get_log_file() + self.console.sendLine("An unexpected error has caused the download to stop:\n%s\n\nSee %s for further details." % (err.getErrorMessage(), log_file)) def _make_downloader(self): return self.factory.make_downloader(self.metadata, self.options_chosen, From 2ddeca2976a0acaae6ba1795988ec10484ccb300 Mon Sep 17 00:00:00 2001 From: Jimmy Kiselak Date: Sat, 16 Jan 2016 01:16:37 -0500 Subject: [PATCH 4/4] fix bug caused by downloading file twice and deleting one --- lbrynet/core/client/StreamProgressManager.py | 6 + .../client/CryptStreamDownloader.py | 15 +- lbrynet/lbryfile/client/LBRYFileDownloader.py | 25 +++ lbrynet/lbryfilemanager/LBRYFileCreator.py | 9 +- lbrynet/lbryfilemanager/LBRYFileDownloader.py | 7 +- lbrynet/lbryfilemanager/LBRYFileManager.py | 152 +++++++----------- lbrynet/lbrynet_console/ControlHandlers.py | 51 ++---- lbrynet/lbrynet_console/LBRYConsole.py | 2 +- tests/functional_tests.py | 117 ++++++++++++++ 9 files changed, 240 insertions(+), 144 deletions(-) diff --git a/lbrynet/core/client/StreamProgressManager.py b/lbrynet/core/client/StreamProgressManager.py index dd8cda00a..a82522530 100644 --- a/lbrynet/core/client/StreamProgressManager.py +++ b/lbrynet/core/client/StreamProgressManager.py @@ -118,6 +118,8 @@ class FullStreamProgressManager(StreamProgressManager): def finished_outputting_blob(): self.last_blob_outputted += 1 + + def check_if_finished(): final_blob_num = self.download_manager.final_blob_num() if final_blob_num is not None and final_blob_num == self.last_blob_outputted: self._finished_outputting() @@ -134,9 +136,13 @@ class FullStreamProgressManager(StreamProgressManager): d = self.download_manager.handle_blob(self.last_blob_outputted + 1) d.addCallback(lambda _: finished_outputting_blob()) d.addCallback(lambda _: self._finished_with_blob(current_blob_num)) + d.addCallback(lambda _: check_if_finished()) def log_error(err): log.warning("Error occurred in the output loop. Error: %s", err.getErrorMessage()) + if self.outputting_d is not None and not self.outputting_d.called: + self.outputting_d.callback(True) + self.outputting_d = None d.addErrback(log_error) else: diff --git a/lbrynet/cryptstream/client/CryptStreamDownloader.py b/lbrynet/cryptstream/client/CryptStreamDownloader.py index f40f082fe..72e10d60c 100644 --- a/lbrynet/cryptstream/client/CryptStreamDownloader.py +++ b/lbrynet/cryptstream/client/CryptStreamDownloader.py @@ -1,3 +1,4 @@ +import logging from zope.interface import implements from lbrynet.interfaces import IStreamDownloader from lbrynet.core.client.BlobRequester import BlobRequester @@ -9,6 +10,9 @@ from twisted.internet import defer from twisted.python.failure import Failure +log = logging.getLogger(__name__) + + class StartFailedError(Exception): pass @@ -79,10 +83,6 @@ class CryptStreamDownloader(object): def start(self): - def set_finished_deferred(): - self.finished_deferred = defer.Deferred() - return self.finished_deferred - if self.starting is True: raise CurrentlyStartingError() if self.stopping is True: @@ -92,8 +92,9 @@ class CryptStreamDownloader(object): assert self.download_manager is None self.starting = True self.completed = False + self.finished_deferred = defer.Deferred() d = self._start() - d.addCallback(lambda _: set_finished_deferred()) + d.addCallback(lambda _: self.finished_deferred) return d def stop(self, err=None): @@ -112,8 +113,8 @@ class CryptStreamDownloader(object): assert self.download_manager is not None self.stopping = True d = self.download_manager.stop_downloading() - self._fire_completed_deferred(err) d.addCallback(check_if_stop_succeeded) + d.addCallback(lambda _: self._fire_completed_deferred(err)) return d def _start_failed(self): @@ -203,6 +204,8 @@ class CryptStreamDownloader(object): d.errback(err) else: d.callback(self._get_finished_deferred_callback_value()) + else: + log.debug("Not firing the completed deferred because d is None") def _get_finished_deferred_callback_value(self): return None diff --git a/lbrynet/lbryfile/client/LBRYFileDownloader.py b/lbrynet/lbryfile/client/LBRYFileDownloader.py index 9799f1f01..3780192d2 100644 --- a/lbrynet/lbryfile/client/LBRYFileDownloader.py +++ b/lbrynet/lbryfile/client/LBRYFileDownloader.py @@ -46,6 +46,31 @@ class LBRYFileDownloader(CryptStreamDownloader): else: return defer.succeed(True) + def delete_data(self): + d1 = self.stream_info_manager.get_blobs_for_stream(self.stream_hash) + + def get_blob_hashes(blob_infos): + return [b[0] for b in blob_infos if b[0] is not None] + + d1.addCallback(get_blob_hashes) + d2 = self.stream_info_manager.get_sd_blob_hashes_for_stream(self.stream_hash) + + def combine_blob_hashes(results): + blob_hashes = [] + for success, result in results: + if success is True: + blob_hashes.extend(result) + return blob_hashes + + def delete_blobs(blob_hashes): + self.blob_manager.delete_blobs(blob_hashes) + return True + + dl = defer.DeferredList([d1, d2], fireOnOneErrback=True) + dl.addCallback(combine_blob_hashes) + dl.addCallback(delete_blobs) + return dl + def stop(self, err=None): d = self._close_output() d.addCallback(lambda _: CryptStreamDownloader.stop(self, err=err)) diff --git a/lbrynet/lbryfilemanager/LBRYFileCreator.py b/lbrynet/lbryfilemanager/LBRYFileCreator.py index e214cca7d..1b0fcb5b0 100644 --- a/lbrynet/lbryfilemanager/LBRYFileCreator.py +++ b/lbrynet/lbryfilemanager/LBRYFileCreator.py @@ -36,7 +36,7 @@ class LBRYFileStreamCreator(CryptStreamCreator): log.debug("length: %s", str(blob_info.length)) self.blob_infos.append(blob_info) - def _save_lbry_file_info(self): + def _save_stream_info(self): stream_info_manager = self.lbry_file_manager.stream_info_manager d = stream_info_manager.save_stream(self.stream_hash, binascii.hexlify(self.name), binascii.hexlify(self.key), @@ -46,8 +46,6 @@ class LBRYFileStreamCreator(CryptStreamCreator): def setup(self): d = CryptStreamCreator.setup(self) - d.addCallback(lambda _: self.stream_hash) - return d def _get_blobs_hashsum(self): @@ -79,10 +77,7 @@ class LBRYFileStreamCreator(CryptStreamCreator): def _finished(self): self._make_stream_hash() - d = self._save_lbry_file_info() - d.addCallback(lambda _: self.lbry_file_manager.change_lbry_file_status( - self.stream_hash, ManagedLBRYFileDownloader.STATUS_FINISHED - )) + d = self._save_stream_info() return d diff --git a/lbrynet/lbryfilemanager/LBRYFileDownloader.py b/lbrynet/lbryfilemanager/LBRYFileDownloader.py index 1fdae8659..fed0824e5 100644 --- a/lbrynet/lbryfilemanager/LBRYFileDownloader.py +++ b/lbrynet/lbryfilemanager/LBRYFileDownloader.py @@ -18,17 +18,18 @@ class ManagedLBRYFileDownloader(LBRYFileSaver): STATUS_STOPPED = "stopped" STATUS_FINISHED = "finished" - def __init__(self, stream_hash, peer_finder, rate_limiter, blob_manager, stream_info_manager, + def __init__(self, rowid, stream_hash, peer_finder, rate_limiter, blob_manager, stream_info_manager, lbry_file_manager, payment_rate_manager, wallet, download_directory, upload_allowed, file_name=None): LBRYFileSaver.__init__(self, stream_hash, peer_finder, rate_limiter, blob_manager, stream_info_manager, payment_rate_manager, wallet, download_directory, upload_allowed, file_name) + self.rowid = rowid self.lbry_file_manager = lbry_file_manager self.saving_status = False def restore(self): - d = self.lbry_file_manager.get_lbry_file_status(self.stream_hash) + d = self.lbry_file_manager.get_lbry_file_status(self) def restore_status(status): if status == ManagedLBRYFileDownloader.STATUS_RUNNING: @@ -103,7 +104,7 @@ class ManagedLBRYFileDownloader(LBRYFileSaver): s = ManagedLBRYFileDownloader.STATUS_STOPPED else: s = ManagedLBRYFileDownloader.STATUS_RUNNING - return self.lbry_file_manager.change_lbry_file_status(self.stream_hash, s) + return self.lbry_file_manager.change_lbry_file_status(self, s) def _get_progress_manager(self, download_manager): return FullStreamProgressManager(self._finished_downloading, self.blob_manager, download_manager) diff --git a/lbrynet/lbryfilemanager/LBRYFileManager.py b/lbrynet/lbryfilemanager/LBRYFileManager.py index fb4cb5b80..ed4c75a79 100644 --- a/lbrynet/lbryfilemanager/LBRYFileManager.py +++ b/lbrynet/lbryfilemanager/LBRYFileManager.py @@ -44,44 +44,15 @@ class LBRYFileManager(object): d.addCallback(lambda _: self._start_lbry_files()) return d - def get_all_lbry_file_stream_hashes_and_options(self): - d = self._get_all_lbry_file_stream_hashes() + def get_lbry_file_status(self, lbry_file): + return self._get_lbry_file_status(lbry_file.rowid) - def get_options(stream_hashes): - ds = [] + def set_lbry_file_data_payment_rate(self, lbry_file, new_rate): + return self._set_lbry_file_payment_rate(lbry_file.rowid, new_rate) - def get_options_for_stream_hash(stream_hash): - d = self.get_lbry_file_options(stream_hash) - d.addCallback(lambda options: (stream_hash, options)) - return d - - for stream_hash in stream_hashes: - ds.append(get_options_for_stream_hash(stream_hash)) - dl = defer.DeferredList(ds) - dl.addCallback(lambda results: [r[1] for r in results if r[0]]) - return dl - - d.addCallback(get_options) - return d - - def save_lbry_file(self, stream_hash, data_payment_rate): - return self._save_lbry_file(stream_hash, data_payment_rate) - - def get_lbry_file_status(self, stream_hash): - return self._get_lbry_file_status(stream_hash) - - def get_lbry_file_options(self, stream_hash): - return self._get_lbry_file_options(stream_hash) - - def delete_lbry_file_options(self, stream_hash): - return self._delete_lbry_file_options(stream_hash) - - def set_lbry_file_data_payment_rate(self, stream_hash, new_rate): - return self._set_lbry_file_payment_rate(stream_hash, new_rate) - - def change_lbry_file_status(self, stream_hash, status): - log.debug("Changing status of %s to %s", stream_hash, status) - return self._change_file_status(stream_hash, status) + def change_lbry_file_status(self, lbry_file, status): + log.debug("Changing status of %s to %s", lbry_file.stream_hash, status) + return self._change_file_status(lbry_file.rowid, status) def get_lbry_file_status_reports(self): ds = [] @@ -103,29 +74,32 @@ class LBRYFileManager(object): def _start_lbry_files(self): - def set_options_and_restore(stream_hash, options): + def set_options_and_restore(rowid, stream_hash, options): payment_rate_manager = PaymentRateManager(self.session.base_payment_rate_manager) - d = self.start_lbry_file(stream_hash, payment_rate_manager, blob_data_rate=options[0]) + d = self.start_lbry_file(rowid, stream_hash, payment_rate_manager, + blob_data_rate=options) d.addCallback(lambda downloader: downloader.restore()) return d def log_error(err): log.error("An error occurred while starting a lbry file: %s", err.getErrorMessage()) - def start_lbry_files(stream_hashes_and_options): - for stream_hash, options in stream_hashes_and_options: - d = set_options_and_restore(stream_hash, options) + def start_lbry_files(lbry_files_and_options): + for rowid, stream_hash, options in lbry_files_and_options: + d = set_options_and_restore(rowid, stream_hash, options) d.addErrback(log_error) return True - d = self.get_all_lbry_file_stream_hashes_and_options() + d = self._get_all_lbry_files() d.addCallback(start_lbry_files) return d - def start_lbry_file(self, stream_hash, payment_rate_manager, blob_data_rate=None, upload_allowed=True): + def start_lbry_file(self, rowid, stream_hash, payment_rate_manager, blob_data_rate=None, upload_allowed=True): payment_rate_manager.min_blob_data_payment_rate = blob_data_rate - lbry_file_downloader = ManagedLBRYFileDownloader(stream_hash, self.session.peer_finder, - self.session.rate_limiter, self.session.blob_manager, + lbry_file_downloader = ManagedLBRYFileDownloader(rowid, stream_hash, + self.session.peer_finder, + self.session.rate_limiter, + self.session.blob_manager, self.stream_info_manager, self, payment_rate_manager, self.session.wallet, self.download_directory, @@ -137,17 +111,17 @@ class LBRYFileManager(object): def add_lbry_file(self, stream_hash, payment_rate_manager, blob_data_rate=None, upload_allowed=True): d = self._save_lbry_file(stream_hash, blob_data_rate) - d.addCallback(lambda _: self.start_lbry_file(stream_hash, payment_rate_manager, blob_data_rate, upload_allowed)) + d.addCallback(lambda rowid: self.start_lbry_file(rowid, stream_hash, payment_rate_manager, + blob_data_rate, upload_allowed)) return d - def delete_lbry_file(self, stream_hash): + def delete_lbry_file(self, lbry_file): for l in self.lbry_files: - if l.stream_hash == stream_hash: + if l == lbry_file: lbry_file = l break else: - return defer.fail(Failure(ValueError("Could not find an LBRY file with the given stream hash, " + - stream_hash))) + return defer.fail(Failure(ValueError("Could not find that LBRY file"))) def wait_for_finished(count=2): if count <= 0 or lbry_file.saving_status is False: @@ -166,23 +140,16 @@ class LBRYFileManager(object): self.lbry_files.remove(lbry_file) d.addCallback(lambda _: remove_from_list()) - d.addCallback(lambda _: self.delete_lbry_file_options(stream_hash)) + d.addCallback(lambda _: self._delete_lbry_file_options(lbry_file.rowid)) return d - def toggle_lbry_file_running(self, stream_hash): + def toggle_lbry_file_running(self, lbry_file): """Toggle whether a stream reader is currently running""" for l in self.lbry_files: - if l.stream_hash == stream_hash: + if l == lbry_file: return l.toggle_running() else: - return defer.fail(Failure(ValueError("Could not find an LBRY file with the given stream hash, " + - stream_hash))) - - def get_stream_hash_from_name(self, lbry_file_name): - for l in self.lbry_files: - if l.file_name == lbry_file_name: - return l.stream_hash - return None + return defer.fail(Failure(ValueError("Could not find that LBRY file"))) def stop(self): ds = [] @@ -209,6 +176,9 @@ class LBRYFileManager(object): dl.addCallback(lambda _: close_db()) return dl + def get_count_for_stream_hash(self, stream_hash): + return self._get_count_for_stream_hash(stream_hash) + ######### database calls ######### def _open_db(self): @@ -227,41 +197,41 @@ class LBRYFileManager(object): @rerun_if_locked def _save_lbry_file(self, stream_hash, data_payment_rate): - return self.sql_db.runQuery("insert into lbry_file_options values (?, ?, ?)", - (data_payment_rate, ManagedLBRYFileDownloader.STATUS_STOPPED, - stream_hash)) + def do_save(db_transaction): + db_transaction.execute("insert into lbry_file_options values (?, ?, ?)", + (data_payment_rate, ManagedLBRYFileDownloader.STATUS_STOPPED, + stream_hash)) + return db_transaction.lastrowid + return self.sql_db.runInteraction(do_save) @rerun_if_locked - def _get_lbry_file_options(self, stream_hash): - d = self.sql_db.runQuery("select blob_data_rate from lbry_file_options where stream_hash = ?", - (stream_hash,)) - d.addCallback(lambda result: result[0] if len(result) else (None, )) + def _delete_lbry_file_options(self, rowid): + return self.sql_db.runQuery("delete from lbry_file_options where rowid = ?", + (rowid,)) + + @rerun_if_locked + def _set_lbry_file_payment_rate(self, rowid, new_rate): + return self.sql_db.runQuery("update lbry_file_options set blob_data_rate = ? where rowid = ?", + (new_rate, rowid)) + + @rerun_if_locked + def _get_all_lbry_files(self): + d = self.sql_db.runQuery("select rowid, stream_hash, blob_data_rate from lbry_file_options") return d @rerun_if_locked - def _delete_lbry_file_options(self, stream_hash): - return self.sql_db.runQuery("delete from lbry_file_options where stream_hash = ?", - (stream_hash,)) + def _change_file_status(self, rowid, new_status): + return self.sql_db.runQuery("update lbry_file_options set status = ? where rowid = ?", + (new_status, rowid)) @rerun_if_locked - def _set_lbry_file_payment_rate(self, stream_hash, new_rate): - return self.sql_db.runQuery("update lbry_file_options set blob_data_rate = ? where stream_hash = ?", - (new_rate, stream_hash)) - - @rerun_if_locked - def _get_all_lbry_file_stream_hashes(self): - d = self.sql_db.runQuery("select stream_hash from lbry_file_options") - d.addCallback(lambda results: [r[0] for r in results]) - return d - - @rerun_if_locked - def _change_file_status(self, stream_hash, new_status): - return self.sql_db.runQuery("update lbry_file_options set status = ? where stream_hash = ?", - (new_status, stream_hash)) - - @rerun_if_locked - def _get_lbry_file_status(self, stream_hash): - d = self.sql_db.runQuery("select status from lbry_file_options where stream_hash = ?", - (stream_hash,)) + def _get_lbry_file_status(self, rowid): + d = self.sql_db.runQuery("select status from lbry_file_options where rowid = ?", + (rowid,)) d.addCallback(lambda r: r[0][0] if len(r) else ManagedLBRYFileDownloader.STATUS_STOPPED) - return d \ No newline at end of file + return d + + @rerun_if_locked + def _get_count_for_stream_hash(self, stream_hash): + return self.sql_db.runQuery("select count(*) from lbry_file_options where stream_hash = ?", + (stream_hash,)) \ No newline at end of file diff --git a/lbrynet/lbrynet_console/ControlHandlers.py b/lbrynet/lbrynet_console/ControlHandlers.py index 710eab730..78a8dee33 100644 --- a/lbrynet/lbrynet_console/ControlHandlers.py +++ b/lbrynet/lbrynet_console/ControlHandlers.py @@ -1067,11 +1067,11 @@ class DeleteLBRYFile(CommandHandler): self.finished_deferred.callback(None) def _delete_lbry_file(self): - d = self.lbry_file_manager.delete_lbry_file(self.lbry_file.stream_hash) + d = self.lbry_file_manager.delete_lbry_file(self.lbry_file) def finish_deletion(): if self.delete_data is True: - d = self._delete_data() + d = self.lbry_file.delete_data() else: d = defer.succeed(True) d.addCallback(lambda _: self._delete_stream_data()) @@ -1080,33 +1080,12 @@ class DeleteLBRYFile(CommandHandler): d.addCallback(lambda _: finish_deletion()) return d - def _delete_data(self): - d1 = self.stream_info_manager.get_blobs_for_stream(self.lbry_file.stream_hash) - - def get_blob_hashes(blob_infos): - return [b[0] for b in blob_infos if b[0] is not None] - - d1.addCallback(get_blob_hashes) - d2 = self.stream_info_manager.get_sd_blob_hashes_for_stream(self.lbry_file.stream_hash) - - def combine_blob_hashes(results): - blob_hashes = [] - for success, result in results: - if success is True: - blob_hashes.extend(result) - return blob_hashes - - def delete_blobs(blob_hashes): - self.blob_manager.delete_blobs(blob_hashes) - return True - - dl = defer.DeferredList([d1, d2], fireOnOneErrback=True) - dl.addCallback(combine_blob_hashes) - dl.addCallback(delete_blobs) - return dl - def _delete_stream_data(self): - return self.stream_info_manager.delete_stream(self.lbry_file.stream_hash) + s_h = self.lbry_file.stream_hash + d = self.lbry_file_manager.get_count_for_stream_hash(s_h) + # TODO: could possibly be a timing issue here + d.addCallback(lambda c: self.stream_info_manager.delete_stream(s_h) if c == 0 else True) + return d class DeleteLBRYFileFactory(LBRYFileChooserFactory): @@ -1138,7 +1117,7 @@ class ToggleLBRYFileRunning(CommandHandler): self.lbry_file_manager = lbry_file_manager def start(self): - d = self.lbry_file_manager.toggle_lbry_file_running(self.lbry_file.stream_hash) + d = self.lbry_file_manager.toggle_lbry_file_running(self.lbry_file) d.addErrback(self._handle_download_error) self.finished_deferred.callback(None) @@ -1177,11 +1156,11 @@ class CreateLBRYFile(CommandHandler): def add_to_lbry_files(self, stream_hash): prm = PaymentRateManager(self.session.base_payment_rate_manager) d = self.lbry_file_manager.add_lbry_file(stream_hash, prm) - d.addCallback(self.set_status, stream_hash) + d.addCallback(self.set_status) return d - def set_status(self, lbry_file_downloader, stream_hash): - d = self.lbry_file_manager.change_lbry_file_status(stream_hash, + def set_status(self, lbry_file_downloader): + d = self.lbry_file_manager.change_lbry_file_status(lbry_file_downloader, ManagedLBRYFileDownloader.STATUS_FINISHED) d.addCallback(lambda _: lbry_file_downloader.restore()) return d @@ -1407,7 +1386,7 @@ class ModifyLBRYFileDataPaymentRate(ModifyPaymentRate): def _set_rate(self, rate): self.payment_rate_manager.min_blob_data_payment_rate = rate - return self.lbry_file_manager.set_lbry_file_data_payment_rate(self.lbry_file.stream_hash, rate) + return self.lbry_file_manager.set_lbry_file_data_payment_rate(self.lbry_file, rate) def _get_current_status(self): status = "The LBRY file's current data payment rate is " @@ -1821,9 +1800,9 @@ class Publish(CommandHandler): v_string += "Is this correct? (y/n): " return v_string - def set_status(self, lbry_file_downloader, stream_hash): + def set_status(self, lbry_file_downloader): self.lbry_file = lbry_file_downloader - d = self.lbry_file_manager.change_lbry_file_status(stream_hash, + d = self.lbry_file_manager.change_lbry_file_status(self.lbry_file, ManagedLBRYFileDownloader.STATUS_FINISHED) d.addCallback(lambda _: lbry_file_downloader.restore()) return d @@ -1831,7 +1810,7 @@ class Publish(CommandHandler): def add_to_lbry_files(self, stream_hash): prm = PaymentRateManager(self.session.base_payment_rate_manager) d = self.lbry_file_manager.add_lbry_file(stream_hash, prm) - d.addCallback(self.set_status, stream_hash) + d.addCallback(self.set_status) return d def _create_sd_blob(self): diff --git a/lbrynet/lbrynet_console/LBRYConsole.py b/lbrynet/lbrynet_console/LBRYConsole.py index 73e817af2..0005e30a7 100644 --- a/lbrynet/lbrynet_console/LBRYConsole.py +++ b/lbrynet/lbrynet_console/LBRYConsole.py @@ -125,7 +125,7 @@ class LBRYConsole(): # self.session.wallet, self.sd_identifier, self.autofetcher_conf) def _show_start_error(self, error): - print error.getErrorMessage() + print error.getTraceback() log.error("An error occurred during start up: %s", error.getTraceback()) return error diff --git a/tests/functional_tests.py b/tests/functional_tests.py index 38c3253f8..f6516cdad 100644 --- a/tests/functional_tests.py +++ b/tests/functional_tests.py @@ -911,6 +911,121 @@ class TestTransfer(TestCase): return d + def test_double_download(self): + + sd_hash_queue = Queue() + kill_event = Event() + dead_event = Event() + uploader = Process(target=start_lbry_uploader, args=(sd_hash_queue, kill_event, dead_event)) + uploader.start() + self.server_processes.append(uploader) + + logging.debug("Testing double download") + + wallet = FakeWallet() + peer_manager = PeerManager() + peer_finder = FakePeerFinder(5553, peer_manager) + hash_announcer = FakeAnnouncer() + rate_limiter = DummyRateLimiter() + sd_identifier = StreamDescriptorIdentifier() + + downloaders = [] + + db_dir = "client" + blob_dir = os.path.join(db_dir, "blobfiles") + os.mkdir(db_dir) + os.mkdir(blob_dir) + + self.session = LBRYSession(MIN_BLOB_DATA_PAYMENT_RATE, db_dir=db_dir, lbryid="abcd", + peer_finder=peer_finder, hash_announcer=hash_announcer, + blob_dir=blob_dir, peer_port=5553, use_upnp=False, + rate_limiter=rate_limiter, wallet=wallet) + + self.stream_info_manager = DBLBRYFileMetadataManager(self.session.db_dir) + + self.lbry_file_manager = LBRYFileManager(self.session, self.stream_info_manager, sd_identifier) + + def make_downloader(metadata, prm): + info_validator = metadata.validator + options = metadata.options + factories = metadata.factories + chosen_options = [o.default_value for o in options.get_downloader_options(info_validator, prm)] + return factories[0].make_downloader(metadata, chosen_options, prm) + + def append_downloader(downloader): + downloaders.append(downloader) + return downloader + + def download_file(sd_hash): + prm = PaymentRateManager(self.session.base_payment_rate_manager) + d = download_sd_blob(self.session, sd_hash, prm) + d.addCallback(sd_identifier.get_metadata_for_sd_blob) + d.addCallback(make_downloader, prm) + d.addCallback(append_downloader) + d.addCallback(lambda downloader: downloader.start()) + return d + + def check_md5_sum(): + f = open('test_file') + hashsum = MD5.new() + hashsum.update(f.read()) + self.assertEqual(hashsum.hexdigest(), "4ca2aafb4101c1e42235aad24fbb83be") + + def delete_lbry_file(): + logging.debug("deleting the file...") + d = self.lbry_file_manager.delete_lbry_file(downloaders[0]) + d.addCallback(lambda _: self.lbry_file_manager.get_count_for_stream_hash(downloaders[0].stream_hash)) + d.addCallback(lambda c: self.stream_info_manager.delete_stream(downloaders[1].stream_hash) if c == 0 else True) + return d + + def check_lbry_file(): + d = downloaders[1].status() + d.addCallback(lambda _: downloaders[1].status()) + + def check_status_report(status_report): + self.assertEqual(status_report.num_known, status_report.num_completed) + self.assertEqual(status_report.num_known, 3) + + d.addCallback(check_status_report) + return d + + def start_transfer(sd_hash): + + logging.debug("Starting the transfer") + + d = self.session.setup() + d.addCallback(lambda _: self.stream_info_manager.setup()) + d.addCallback(lambda _: add_lbry_file_to_sd_identifier(sd_identifier)) + d.addCallback(lambda _: self.lbry_file_manager.setup()) + d.addCallback(lambda _: download_file(sd_hash)) + d.addCallback(lambda _: check_md5_sum()) + d.addCallback(lambda _: download_file(sd_hash)) + d.addCallback(lambda _: delete_lbry_file()) + d.addCallback(lambda _: check_lbry_file()) + + return d + + def stop(arg): + if isinstance(arg, Failure): + logging.debug("Client is stopping due to an error. Error: %s", arg.getTraceback()) + else: + logging.debug("Client is stopping normally.") + kill_event.set() + logging.debug("Set the kill event") + d = self.wait_for_dead_event(dead_event) + + def print_shutting_down(): + logging.info("Client is shutting down") + + d.addCallback(lambda _: print_shutting_down()) + d.addCallback(lambda _: arg) + return d + + d = self.wait_for_hash_from_queue(sd_hash_queue) + d.addCallback(start_transfer) + d.addBoth(stop) + return d + class TestStreamify(TestCase): @@ -932,6 +1047,8 @@ class TestStreamify(TestCase): def delete_test_env(): shutil.rmtree('client') + if os.path.exists("test_file"): + os.remove("test_file") d.addCallback(lambda _: threads.deferToThread(delete_test_env)) return d