From 2ddeca2976a0acaae6ba1795988ec10484ccb300 Mon Sep 17 00:00:00 2001 From: Jimmy Kiselak Date: Sat, 16 Jan 2016 01:16:37 -0500 Subject: [PATCH] fix bug caused by downloading file twice and deleting one --- lbrynet/core/client/StreamProgressManager.py | 6 + .../client/CryptStreamDownloader.py | 15 +- lbrynet/lbryfile/client/LBRYFileDownloader.py | 25 +++ lbrynet/lbryfilemanager/LBRYFileCreator.py | 9 +- lbrynet/lbryfilemanager/LBRYFileDownloader.py | 7 +- lbrynet/lbryfilemanager/LBRYFileManager.py | 152 +++++++----------- lbrynet/lbrynet_console/ControlHandlers.py | 51 ++---- lbrynet/lbrynet_console/LBRYConsole.py | 2 +- tests/functional_tests.py | 117 ++++++++++++++ 9 files changed, 240 insertions(+), 144 deletions(-) diff --git a/lbrynet/core/client/StreamProgressManager.py b/lbrynet/core/client/StreamProgressManager.py index dd8cda00a..a82522530 100644 --- a/lbrynet/core/client/StreamProgressManager.py +++ b/lbrynet/core/client/StreamProgressManager.py @@ -118,6 +118,8 @@ class FullStreamProgressManager(StreamProgressManager): def finished_outputting_blob(): self.last_blob_outputted += 1 + + def check_if_finished(): final_blob_num = self.download_manager.final_blob_num() if final_blob_num is not None and final_blob_num == self.last_blob_outputted: self._finished_outputting() @@ -134,9 +136,13 @@ class FullStreamProgressManager(StreamProgressManager): d = self.download_manager.handle_blob(self.last_blob_outputted + 1) d.addCallback(lambda _: finished_outputting_blob()) d.addCallback(lambda _: self._finished_with_blob(current_blob_num)) + d.addCallback(lambda _: check_if_finished()) def log_error(err): log.warning("Error occurred in the output loop. Error: %s", err.getErrorMessage()) + if self.outputting_d is not None and not self.outputting_d.called: + self.outputting_d.callback(True) + self.outputting_d = None d.addErrback(log_error) else: diff --git a/lbrynet/cryptstream/client/CryptStreamDownloader.py b/lbrynet/cryptstream/client/CryptStreamDownloader.py index f40f082fe..72e10d60c 100644 --- a/lbrynet/cryptstream/client/CryptStreamDownloader.py +++ b/lbrynet/cryptstream/client/CryptStreamDownloader.py @@ -1,3 +1,4 @@ +import logging from zope.interface import implements from lbrynet.interfaces import IStreamDownloader from lbrynet.core.client.BlobRequester import BlobRequester @@ -9,6 +10,9 @@ from twisted.internet import defer from twisted.python.failure import Failure +log = logging.getLogger(__name__) + + class StartFailedError(Exception): pass @@ -79,10 +83,6 @@ class CryptStreamDownloader(object): def start(self): - def set_finished_deferred(): - self.finished_deferred = defer.Deferred() - return self.finished_deferred - if self.starting is True: raise CurrentlyStartingError() if self.stopping is True: @@ -92,8 +92,9 @@ class CryptStreamDownloader(object): assert self.download_manager is None self.starting = True self.completed = False + self.finished_deferred = defer.Deferred() d = self._start() - d.addCallback(lambda _: set_finished_deferred()) + d.addCallback(lambda _: self.finished_deferred) return d def stop(self, err=None): @@ -112,8 +113,8 @@ class CryptStreamDownloader(object): assert self.download_manager is not None self.stopping = True d = self.download_manager.stop_downloading() - self._fire_completed_deferred(err) d.addCallback(check_if_stop_succeeded) + d.addCallback(lambda _: self._fire_completed_deferred(err)) return d def _start_failed(self): @@ -203,6 +204,8 @@ class CryptStreamDownloader(object): d.errback(err) else: d.callback(self._get_finished_deferred_callback_value()) + else: + log.debug("Not firing the completed deferred because d is None") def _get_finished_deferred_callback_value(self): return None diff --git a/lbrynet/lbryfile/client/LBRYFileDownloader.py b/lbrynet/lbryfile/client/LBRYFileDownloader.py index 9799f1f01..3780192d2 100644 --- a/lbrynet/lbryfile/client/LBRYFileDownloader.py +++ b/lbrynet/lbryfile/client/LBRYFileDownloader.py @@ -46,6 +46,31 @@ class LBRYFileDownloader(CryptStreamDownloader): else: return defer.succeed(True) + def delete_data(self): + d1 = self.stream_info_manager.get_blobs_for_stream(self.stream_hash) + + def get_blob_hashes(blob_infos): + return [b[0] for b in blob_infos if b[0] is not None] + + d1.addCallback(get_blob_hashes) + d2 = self.stream_info_manager.get_sd_blob_hashes_for_stream(self.stream_hash) + + def combine_blob_hashes(results): + blob_hashes = [] + for success, result in results: + if success is True: + blob_hashes.extend(result) + return blob_hashes + + def delete_blobs(blob_hashes): + self.blob_manager.delete_blobs(blob_hashes) + return True + + dl = defer.DeferredList([d1, d2], fireOnOneErrback=True) + dl.addCallback(combine_blob_hashes) + dl.addCallback(delete_blobs) + return dl + def stop(self, err=None): d = self._close_output() d.addCallback(lambda _: CryptStreamDownloader.stop(self, err=err)) diff --git a/lbrynet/lbryfilemanager/LBRYFileCreator.py b/lbrynet/lbryfilemanager/LBRYFileCreator.py index e214cca7d..1b0fcb5b0 100644 --- a/lbrynet/lbryfilemanager/LBRYFileCreator.py +++ b/lbrynet/lbryfilemanager/LBRYFileCreator.py @@ -36,7 +36,7 @@ class LBRYFileStreamCreator(CryptStreamCreator): log.debug("length: %s", str(blob_info.length)) self.blob_infos.append(blob_info) - def _save_lbry_file_info(self): + def _save_stream_info(self): stream_info_manager = self.lbry_file_manager.stream_info_manager d = stream_info_manager.save_stream(self.stream_hash, binascii.hexlify(self.name), binascii.hexlify(self.key), @@ -46,8 +46,6 @@ class LBRYFileStreamCreator(CryptStreamCreator): def setup(self): d = CryptStreamCreator.setup(self) - d.addCallback(lambda _: self.stream_hash) - return d def _get_blobs_hashsum(self): @@ -79,10 +77,7 @@ class LBRYFileStreamCreator(CryptStreamCreator): def _finished(self): self._make_stream_hash() - d = self._save_lbry_file_info() - d.addCallback(lambda _: self.lbry_file_manager.change_lbry_file_status( - self.stream_hash, ManagedLBRYFileDownloader.STATUS_FINISHED - )) + d = self._save_stream_info() return d diff --git a/lbrynet/lbryfilemanager/LBRYFileDownloader.py b/lbrynet/lbryfilemanager/LBRYFileDownloader.py index 1fdae8659..fed0824e5 100644 --- a/lbrynet/lbryfilemanager/LBRYFileDownloader.py +++ b/lbrynet/lbryfilemanager/LBRYFileDownloader.py @@ -18,17 +18,18 @@ class ManagedLBRYFileDownloader(LBRYFileSaver): STATUS_STOPPED = "stopped" STATUS_FINISHED = "finished" - def __init__(self, stream_hash, peer_finder, rate_limiter, blob_manager, stream_info_manager, + def __init__(self, rowid, stream_hash, peer_finder, rate_limiter, blob_manager, stream_info_manager, lbry_file_manager, payment_rate_manager, wallet, download_directory, upload_allowed, file_name=None): LBRYFileSaver.__init__(self, stream_hash, peer_finder, rate_limiter, blob_manager, stream_info_manager, payment_rate_manager, wallet, download_directory, upload_allowed, file_name) + self.rowid = rowid self.lbry_file_manager = lbry_file_manager self.saving_status = False def restore(self): - d = self.lbry_file_manager.get_lbry_file_status(self.stream_hash) + d = self.lbry_file_manager.get_lbry_file_status(self) def restore_status(status): if status == ManagedLBRYFileDownloader.STATUS_RUNNING: @@ -103,7 +104,7 @@ class ManagedLBRYFileDownloader(LBRYFileSaver): s = ManagedLBRYFileDownloader.STATUS_STOPPED else: s = ManagedLBRYFileDownloader.STATUS_RUNNING - return self.lbry_file_manager.change_lbry_file_status(self.stream_hash, s) + return self.lbry_file_manager.change_lbry_file_status(self, s) def _get_progress_manager(self, download_manager): return FullStreamProgressManager(self._finished_downloading, self.blob_manager, download_manager) diff --git a/lbrynet/lbryfilemanager/LBRYFileManager.py b/lbrynet/lbryfilemanager/LBRYFileManager.py index fb4cb5b80..ed4c75a79 100644 --- a/lbrynet/lbryfilemanager/LBRYFileManager.py +++ b/lbrynet/lbryfilemanager/LBRYFileManager.py @@ -44,44 +44,15 @@ class LBRYFileManager(object): d.addCallback(lambda _: self._start_lbry_files()) return d - def get_all_lbry_file_stream_hashes_and_options(self): - d = self._get_all_lbry_file_stream_hashes() + def get_lbry_file_status(self, lbry_file): + return self._get_lbry_file_status(lbry_file.rowid) - def get_options(stream_hashes): - ds = [] + def set_lbry_file_data_payment_rate(self, lbry_file, new_rate): + return self._set_lbry_file_payment_rate(lbry_file.rowid, new_rate) - def get_options_for_stream_hash(stream_hash): - d = self.get_lbry_file_options(stream_hash) - d.addCallback(lambda options: (stream_hash, options)) - return d - - for stream_hash in stream_hashes: - ds.append(get_options_for_stream_hash(stream_hash)) - dl = defer.DeferredList(ds) - dl.addCallback(lambda results: [r[1] for r in results if r[0]]) - return dl - - d.addCallback(get_options) - return d - - def save_lbry_file(self, stream_hash, data_payment_rate): - return self._save_lbry_file(stream_hash, data_payment_rate) - - def get_lbry_file_status(self, stream_hash): - return self._get_lbry_file_status(stream_hash) - - def get_lbry_file_options(self, stream_hash): - return self._get_lbry_file_options(stream_hash) - - def delete_lbry_file_options(self, stream_hash): - return self._delete_lbry_file_options(stream_hash) - - def set_lbry_file_data_payment_rate(self, stream_hash, new_rate): - return self._set_lbry_file_payment_rate(stream_hash, new_rate) - - def change_lbry_file_status(self, stream_hash, status): - log.debug("Changing status of %s to %s", stream_hash, status) - return self._change_file_status(stream_hash, status) + def change_lbry_file_status(self, lbry_file, status): + log.debug("Changing status of %s to %s", lbry_file.stream_hash, status) + return self._change_file_status(lbry_file.rowid, status) def get_lbry_file_status_reports(self): ds = [] @@ -103,29 +74,32 @@ class LBRYFileManager(object): def _start_lbry_files(self): - def set_options_and_restore(stream_hash, options): + def set_options_and_restore(rowid, stream_hash, options): payment_rate_manager = PaymentRateManager(self.session.base_payment_rate_manager) - d = self.start_lbry_file(stream_hash, payment_rate_manager, blob_data_rate=options[0]) + d = self.start_lbry_file(rowid, stream_hash, payment_rate_manager, + blob_data_rate=options) d.addCallback(lambda downloader: downloader.restore()) return d def log_error(err): log.error("An error occurred while starting a lbry file: %s", err.getErrorMessage()) - def start_lbry_files(stream_hashes_and_options): - for stream_hash, options in stream_hashes_and_options: - d = set_options_and_restore(stream_hash, options) + def start_lbry_files(lbry_files_and_options): + for rowid, stream_hash, options in lbry_files_and_options: + d = set_options_and_restore(rowid, stream_hash, options) d.addErrback(log_error) return True - d = self.get_all_lbry_file_stream_hashes_and_options() + d = self._get_all_lbry_files() d.addCallback(start_lbry_files) return d - def start_lbry_file(self, stream_hash, payment_rate_manager, blob_data_rate=None, upload_allowed=True): + def start_lbry_file(self, rowid, stream_hash, payment_rate_manager, blob_data_rate=None, upload_allowed=True): payment_rate_manager.min_blob_data_payment_rate = blob_data_rate - lbry_file_downloader = ManagedLBRYFileDownloader(stream_hash, self.session.peer_finder, - self.session.rate_limiter, self.session.blob_manager, + lbry_file_downloader = ManagedLBRYFileDownloader(rowid, stream_hash, + self.session.peer_finder, + self.session.rate_limiter, + self.session.blob_manager, self.stream_info_manager, self, payment_rate_manager, self.session.wallet, self.download_directory, @@ -137,17 +111,17 @@ class LBRYFileManager(object): def add_lbry_file(self, stream_hash, payment_rate_manager, blob_data_rate=None, upload_allowed=True): d = self._save_lbry_file(stream_hash, blob_data_rate) - d.addCallback(lambda _: self.start_lbry_file(stream_hash, payment_rate_manager, blob_data_rate, upload_allowed)) + d.addCallback(lambda rowid: self.start_lbry_file(rowid, stream_hash, payment_rate_manager, + blob_data_rate, upload_allowed)) return d - def delete_lbry_file(self, stream_hash): + def delete_lbry_file(self, lbry_file): for l in self.lbry_files: - if l.stream_hash == stream_hash: + if l == lbry_file: lbry_file = l break else: - return defer.fail(Failure(ValueError("Could not find an LBRY file with the given stream hash, " + - stream_hash))) + return defer.fail(Failure(ValueError("Could not find that LBRY file"))) def wait_for_finished(count=2): if count <= 0 or lbry_file.saving_status is False: @@ -166,23 +140,16 @@ class LBRYFileManager(object): self.lbry_files.remove(lbry_file) d.addCallback(lambda _: remove_from_list()) - d.addCallback(lambda _: self.delete_lbry_file_options(stream_hash)) + d.addCallback(lambda _: self._delete_lbry_file_options(lbry_file.rowid)) return d - def toggle_lbry_file_running(self, stream_hash): + def toggle_lbry_file_running(self, lbry_file): """Toggle whether a stream reader is currently running""" for l in self.lbry_files: - if l.stream_hash == stream_hash: + if l == lbry_file: return l.toggle_running() else: - return defer.fail(Failure(ValueError("Could not find an LBRY file with the given stream hash, " + - stream_hash))) - - def get_stream_hash_from_name(self, lbry_file_name): - for l in self.lbry_files: - if l.file_name == lbry_file_name: - return l.stream_hash - return None + return defer.fail(Failure(ValueError("Could not find that LBRY file"))) def stop(self): ds = [] @@ -209,6 +176,9 @@ class LBRYFileManager(object): dl.addCallback(lambda _: close_db()) return dl + def get_count_for_stream_hash(self, stream_hash): + return self._get_count_for_stream_hash(stream_hash) + ######### database calls ######### def _open_db(self): @@ -227,41 +197,41 @@ class LBRYFileManager(object): @rerun_if_locked def _save_lbry_file(self, stream_hash, data_payment_rate): - return self.sql_db.runQuery("insert into lbry_file_options values (?, ?, ?)", - (data_payment_rate, ManagedLBRYFileDownloader.STATUS_STOPPED, - stream_hash)) + def do_save(db_transaction): + db_transaction.execute("insert into lbry_file_options values (?, ?, ?)", + (data_payment_rate, ManagedLBRYFileDownloader.STATUS_STOPPED, + stream_hash)) + return db_transaction.lastrowid + return self.sql_db.runInteraction(do_save) @rerun_if_locked - def _get_lbry_file_options(self, stream_hash): - d = self.sql_db.runQuery("select blob_data_rate from lbry_file_options where stream_hash = ?", - (stream_hash,)) - d.addCallback(lambda result: result[0] if len(result) else (None, )) + def _delete_lbry_file_options(self, rowid): + return self.sql_db.runQuery("delete from lbry_file_options where rowid = ?", + (rowid,)) + + @rerun_if_locked + def _set_lbry_file_payment_rate(self, rowid, new_rate): + return self.sql_db.runQuery("update lbry_file_options set blob_data_rate = ? where rowid = ?", + (new_rate, rowid)) + + @rerun_if_locked + def _get_all_lbry_files(self): + d = self.sql_db.runQuery("select rowid, stream_hash, blob_data_rate from lbry_file_options") return d @rerun_if_locked - def _delete_lbry_file_options(self, stream_hash): - return self.sql_db.runQuery("delete from lbry_file_options where stream_hash = ?", - (stream_hash,)) + def _change_file_status(self, rowid, new_status): + return self.sql_db.runQuery("update lbry_file_options set status = ? where rowid = ?", + (new_status, rowid)) @rerun_if_locked - def _set_lbry_file_payment_rate(self, stream_hash, new_rate): - return self.sql_db.runQuery("update lbry_file_options set blob_data_rate = ? where stream_hash = ?", - (new_rate, stream_hash)) - - @rerun_if_locked - def _get_all_lbry_file_stream_hashes(self): - d = self.sql_db.runQuery("select stream_hash from lbry_file_options") - d.addCallback(lambda results: [r[0] for r in results]) - return d - - @rerun_if_locked - def _change_file_status(self, stream_hash, new_status): - return self.sql_db.runQuery("update lbry_file_options set status = ? where stream_hash = ?", - (new_status, stream_hash)) - - @rerun_if_locked - def _get_lbry_file_status(self, stream_hash): - d = self.sql_db.runQuery("select status from lbry_file_options where stream_hash = ?", - (stream_hash,)) + def _get_lbry_file_status(self, rowid): + d = self.sql_db.runQuery("select status from lbry_file_options where rowid = ?", + (rowid,)) d.addCallback(lambda r: r[0][0] if len(r) else ManagedLBRYFileDownloader.STATUS_STOPPED) - return d \ No newline at end of file + return d + + @rerun_if_locked + def _get_count_for_stream_hash(self, stream_hash): + return self.sql_db.runQuery("select count(*) from lbry_file_options where stream_hash = ?", + (stream_hash,)) \ No newline at end of file diff --git a/lbrynet/lbrynet_console/ControlHandlers.py b/lbrynet/lbrynet_console/ControlHandlers.py index 710eab730..78a8dee33 100644 --- a/lbrynet/lbrynet_console/ControlHandlers.py +++ b/lbrynet/lbrynet_console/ControlHandlers.py @@ -1067,11 +1067,11 @@ class DeleteLBRYFile(CommandHandler): self.finished_deferred.callback(None) def _delete_lbry_file(self): - d = self.lbry_file_manager.delete_lbry_file(self.lbry_file.stream_hash) + d = self.lbry_file_manager.delete_lbry_file(self.lbry_file) def finish_deletion(): if self.delete_data is True: - d = self._delete_data() + d = self.lbry_file.delete_data() else: d = defer.succeed(True) d.addCallback(lambda _: self._delete_stream_data()) @@ -1080,33 +1080,12 @@ class DeleteLBRYFile(CommandHandler): d.addCallback(lambda _: finish_deletion()) return d - def _delete_data(self): - d1 = self.stream_info_manager.get_blobs_for_stream(self.lbry_file.stream_hash) - - def get_blob_hashes(blob_infos): - return [b[0] for b in blob_infos if b[0] is not None] - - d1.addCallback(get_blob_hashes) - d2 = self.stream_info_manager.get_sd_blob_hashes_for_stream(self.lbry_file.stream_hash) - - def combine_blob_hashes(results): - blob_hashes = [] - for success, result in results: - if success is True: - blob_hashes.extend(result) - return blob_hashes - - def delete_blobs(blob_hashes): - self.blob_manager.delete_blobs(blob_hashes) - return True - - dl = defer.DeferredList([d1, d2], fireOnOneErrback=True) - dl.addCallback(combine_blob_hashes) - dl.addCallback(delete_blobs) - return dl - def _delete_stream_data(self): - return self.stream_info_manager.delete_stream(self.lbry_file.stream_hash) + s_h = self.lbry_file.stream_hash + d = self.lbry_file_manager.get_count_for_stream_hash(s_h) + # TODO: could possibly be a timing issue here + d.addCallback(lambda c: self.stream_info_manager.delete_stream(s_h) if c == 0 else True) + return d class DeleteLBRYFileFactory(LBRYFileChooserFactory): @@ -1138,7 +1117,7 @@ class ToggleLBRYFileRunning(CommandHandler): self.lbry_file_manager = lbry_file_manager def start(self): - d = self.lbry_file_manager.toggle_lbry_file_running(self.lbry_file.stream_hash) + d = self.lbry_file_manager.toggle_lbry_file_running(self.lbry_file) d.addErrback(self._handle_download_error) self.finished_deferred.callback(None) @@ -1177,11 +1156,11 @@ class CreateLBRYFile(CommandHandler): def add_to_lbry_files(self, stream_hash): prm = PaymentRateManager(self.session.base_payment_rate_manager) d = self.lbry_file_manager.add_lbry_file(stream_hash, prm) - d.addCallback(self.set_status, stream_hash) + d.addCallback(self.set_status) return d - def set_status(self, lbry_file_downloader, stream_hash): - d = self.lbry_file_manager.change_lbry_file_status(stream_hash, + def set_status(self, lbry_file_downloader): + d = self.lbry_file_manager.change_lbry_file_status(lbry_file_downloader, ManagedLBRYFileDownloader.STATUS_FINISHED) d.addCallback(lambda _: lbry_file_downloader.restore()) return d @@ -1407,7 +1386,7 @@ class ModifyLBRYFileDataPaymentRate(ModifyPaymentRate): def _set_rate(self, rate): self.payment_rate_manager.min_blob_data_payment_rate = rate - return self.lbry_file_manager.set_lbry_file_data_payment_rate(self.lbry_file.stream_hash, rate) + return self.lbry_file_manager.set_lbry_file_data_payment_rate(self.lbry_file, rate) def _get_current_status(self): status = "The LBRY file's current data payment rate is " @@ -1821,9 +1800,9 @@ class Publish(CommandHandler): v_string += "Is this correct? (y/n): " return v_string - def set_status(self, lbry_file_downloader, stream_hash): + def set_status(self, lbry_file_downloader): self.lbry_file = lbry_file_downloader - d = self.lbry_file_manager.change_lbry_file_status(stream_hash, + d = self.lbry_file_manager.change_lbry_file_status(self.lbry_file, ManagedLBRYFileDownloader.STATUS_FINISHED) d.addCallback(lambda _: lbry_file_downloader.restore()) return d @@ -1831,7 +1810,7 @@ class Publish(CommandHandler): def add_to_lbry_files(self, stream_hash): prm = PaymentRateManager(self.session.base_payment_rate_manager) d = self.lbry_file_manager.add_lbry_file(stream_hash, prm) - d.addCallback(self.set_status, stream_hash) + d.addCallback(self.set_status) return d def _create_sd_blob(self): diff --git a/lbrynet/lbrynet_console/LBRYConsole.py b/lbrynet/lbrynet_console/LBRYConsole.py index 73e817af2..0005e30a7 100644 --- a/lbrynet/lbrynet_console/LBRYConsole.py +++ b/lbrynet/lbrynet_console/LBRYConsole.py @@ -125,7 +125,7 @@ class LBRYConsole(): # self.session.wallet, self.sd_identifier, self.autofetcher_conf) def _show_start_error(self, error): - print error.getErrorMessage() + print error.getTraceback() log.error("An error occurred during start up: %s", error.getTraceback()) return error diff --git a/tests/functional_tests.py b/tests/functional_tests.py index 38c3253f8..f6516cdad 100644 --- a/tests/functional_tests.py +++ b/tests/functional_tests.py @@ -911,6 +911,121 @@ class TestTransfer(TestCase): return d + def test_double_download(self): + + sd_hash_queue = Queue() + kill_event = Event() + dead_event = Event() + uploader = Process(target=start_lbry_uploader, args=(sd_hash_queue, kill_event, dead_event)) + uploader.start() + self.server_processes.append(uploader) + + logging.debug("Testing double download") + + wallet = FakeWallet() + peer_manager = PeerManager() + peer_finder = FakePeerFinder(5553, peer_manager) + hash_announcer = FakeAnnouncer() + rate_limiter = DummyRateLimiter() + sd_identifier = StreamDescriptorIdentifier() + + downloaders = [] + + db_dir = "client" + blob_dir = os.path.join(db_dir, "blobfiles") + os.mkdir(db_dir) + os.mkdir(blob_dir) + + self.session = LBRYSession(MIN_BLOB_DATA_PAYMENT_RATE, db_dir=db_dir, lbryid="abcd", + peer_finder=peer_finder, hash_announcer=hash_announcer, + blob_dir=blob_dir, peer_port=5553, use_upnp=False, + rate_limiter=rate_limiter, wallet=wallet) + + self.stream_info_manager = DBLBRYFileMetadataManager(self.session.db_dir) + + self.lbry_file_manager = LBRYFileManager(self.session, self.stream_info_manager, sd_identifier) + + def make_downloader(metadata, prm): + info_validator = metadata.validator + options = metadata.options + factories = metadata.factories + chosen_options = [o.default_value for o in options.get_downloader_options(info_validator, prm)] + return factories[0].make_downloader(metadata, chosen_options, prm) + + def append_downloader(downloader): + downloaders.append(downloader) + return downloader + + def download_file(sd_hash): + prm = PaymentRateManager(self.session.base_payment_rate_manager) + d = download_sd_blob(self.session, sd_hash, prm) + d.addCallback(sd_identifier.get_metadata_for_sd_blob) + d.addCallback(make_downloader, prm) + d.addCallback(append_downloader) + d.addCallback(lambda downloader: downloader.start()) + return d + + def check_md5_sum(): + f = open('test_file') + hashsum = MD5.new() + hashsum.update(f.read()) + self.assertEqual(hashsum.hexdigest(), "4ca2aafb4101c1e42235aad24fbb83be") + + def delete_lbry_file(): + logging.debug("deleting the file...") + d = self.lbry_file_manager.delete_lbry_file(downloaders[0]) + d.addCallback(lambda _: self.lbry_file_manager.get_count_for_stream_hash(downloaders[0].stream_hash)) + d.addCallback(lambda c: self.stream_info_manager.delete_stream(downloaders[1].stream_hash) if c == 0 else True) + return d + + def check_lbry_file(): + d = downloaders[1].status() + d.addCallback(lambda _: downloaders[1].status()) + + def check_status_report(status_report): + self.assertEqual(status_report.num_known, status_report.num_completed) + self.assertEqual(status_report.num_known, 3) + + d.addCallback(check_status_report) + return d + + def start_transfer(sd_hash): + + logging.debug("Starting the transfer") + + d = self.session.setup() + d.addCallback(lambda _: self.stream_info_manager.setup()) + d.addCallback(lambda _: add_lbry_file_to_sd_identifier(sd_identifier)) + d.addCallback(lambda _: self.lbry_file_manager.setup()) + d.addCallback(lambda _: download_file(sd_hash)) + d.addCallback(lambda _: check_md5_sum()) + d.addCallback(lambda _: download_file(sd_hash)) + d.addCallback(lambda _: delete_lbry_file()) + d.addCallback(lambda _: check_lbry_file()) + + return d + + def stop(arg): + if isinstance(arg, Failure): + logging.debug("Client is stopping due to an error. Error: %s", arg.getTraceback()) + else: + logging.debug("Client is stopping normally.") + kill_event.set() + logging.debug("Set the kill event") + d = self.wait_for_dead_event(dead_event) + + def print_shutting_down(): + logging.info("Client is shutting down") + + d.addCallback(lambda _: print_shutting_down()) + d.addCallback(lambda _: arg) + return d + + d = self.wait_for_hash_from_queue(sd_hash_queue) + d.addCallback(start_transfer) + d.addBoth(stop) + return d + class TestStreamify(TestCase): @@ -932,6 +1047,8 @@ class TestStreamify(TestCase): def delete_test_env(): shutil.rmtree('client') + if os.path.exists("test_file"): + os.remove("test_file") d.addCallback(lambda _: threads.deferToThread(delete_test_env)) return d