diff --git a/lbrynet/tests/functional/test_misc.py b/lbrynet/tests/functional/test_misc.py index 01badedae..eb100fbf0 100644 --- a/lbrynet/tests/functional/test_misc.py +++ b/lbrynet/tests/functional/test_misc.py @@ -1,32 +1,23 @@ -import logging -from multiprocessing import Process, Event, Queue import os -import platform -import shutil -import sys -import unittest - from hashlib import md5 +from twisted.internet import defer, reactor +from twisted.trial import unittest from lbrynet import conf -from lbrynet.file_manager.EncryptedFileManager import EncryptedFileManager -from lbrynet.core.Session import Session from lbrynet.core.server.BlobAvailabilityHandler import BlobAvailabilityHandlerFactory -from lbrynet.core.client.StandaloneBlobDownloader import StandaloneBlobDownloader from lbrynet.core.StreamDescriptor import StreamDescriptorIdentifier from lbrynet.core.StreamDescriptor import download_sd_blob -from lbrynet.file_manager.EncryptedFileCreator import create_lbry_file -from lbrynet.lbry_file.client.EncryptedFileOptions import add_lbry_file_to_sd_identifier -from twisted.internet import defer, threads, task -from twisted.trial.unittest import TestCase -from twisted.python.failure import Failure - +from lbrynet.core.PaymentRateManager import OnlyFreePaymentsManager +from lbrynet.core.BlobManager import DiskBlobManager from lbrynet.core.PeerManager import PeerManager -from lbrynet.core.RateLimiter import DummyRateLimiter, RateLimiter +from lbrynet.core.RateLimiter import RateLimiter from lbrynet.core.server.BlobRequestHandler import BlobRequestHandlerFactory from lbrynet.core.server.ServerProtocol import ServerProtocolFactory - +from lbrynet.database.storage import SQLiteStorage +from lbrynet.file_manager.EncryptedFileCreator import create_lbry_file +from lbrynet.file_manager.EncryptedFileManager import EncryptedFileManager +from lbrynet.lbry_file.client.EncryptedFileOptions import add_lbry_file_to_sd_identifier from lbrynet.tests import mocks -from lbrynet.tests.util import mk_db_and_blob_dir, rm_db_and_blob_dir, is_android +from lbrynet.tests.util import mk_db_and_blob_dir, rm_db_and_blob_dir FakeNode = mocks.Node FakeWallet = mocks.Wallet @@ -36,29 +27,6 @@ GenFile = mocks.GenFile test_create_stream_sd_file = mocks.create_stream_sd_file DummyBlobAvailabilityTracker = mocks.BlobAvailabilityTracker -log_format = "%(funcName)s(): %(message)s" -logging.basicConfig(level=logging.CRITICAL, format=log_format) - -TEST_SKIP_STRING_ANDROID = "Test cannot pass on Android because multiprocessing is not supported at the OS level." - -def require_system(system): - def wrapper(fn): - return fn - - if platform.system() == system: - return wrapper - else: - return unittest.skip("Skipping. Test can only be run on " + system) - - -def use_epoll_on_linux(): - if sys.platform.startswith("linux"): - sys.modules = sys.modules.copy() - del sys.modules['twisted.internet.reactor'] - import twisted.internet - twisted.internet.reactor = twisted.internet.epollreactor.EPollReactor() - sys.modules['twisted.internet.reactor'] = twisted.internet.reactor - def init_conf_windows(settings={}): """ @@ -74,779 +42,308 @@ def init_conf_windows(settings={}): class LbryUploader(object): - def __init__(self, sd_hash_queue, kill_event, dead_event, - file_size, ul_rate_limit=None, is_generous=False): - self.sd_hash_queue = sd_hash_queue - self.kill_event = kill_event - self.dead_event = dead_event + def __init__(self, file_size, ul_rate_limit=None): self.file_size = file_size self.ul_rate_limit = ul_rate_limit - self.is_generous = is_generous + self.kill_check = None # these attributes get defined in `start` - self.reactor = None - self.sd_identifier = None - self.session = None + self.db_dir = None + self.blob_dir = None + self.wallet = None + self.peer_manager = None + self.rate_limiter = None + self.prm = None + self.storage = None + self.blob_manager = None self.lbry_file_manager = None self.server_port = None - self.kill_check = None - - def start(self): - use_epoll_on_linux() - init_conf_windows() - - from twisted.internet import reactor - self.reactor = reactor - logging.debug("Starting the uploader") - wallet = FakeWallet() - peer_manager = PeerManager() - peer_finder = FakePeerFinder(5553, peer_manager, 1) - hash_announcer = FakeAnnouncer() - rate_limiter = RateLimiter() - self.sd_identifier = StreamDescriptorIdentifier() - self.db_dir, self.blob_dir = mk_db_and_blob_dir() - dht_node = FakeNode(peer_finder=peer_finder, peer_manager=peer_manager, udpPort=4445, peerPort=5553, - node_id="abcd", externalIP="127.0.0.1") - - self.session = Session( - conf.ADJUSTABLE_SETTINGS['data_rate'][1], db_dir=self.db_dir, blob_dir=self.blob_dir, - node_id="abcd", peer_finder=peer_finder, hash_announcer=hash_announcer, - peer_port=5553, dht_node_port=4445, rate_limiter=rate_limiter, wallet=wallet, - dht_node=dht_node, external_ip="127.0.0.1") - self.lbry_file_manager = EncryptedFileManager(self.session, self.sd_identifier) - if self.ul_rate_limit is not None: - self.session.rate_limiter.set_ul_limit(self.ul_rate_limit) - reactor.callLater(1, self.start_all) - if not reactor.running: - reactor.run() - - def start_all(self): - d = self.session.setup() - d.addCallback(lambda _: add_lbry_file_to_sd_identifier(self.sd_identifier)) - d.addCallback(lambda _: self.lbry_file_manager.setup()) - d.addCallback(lambda _: self.start_server()) - d.addCallback(lambda _: self.create_stream()) - d.addCallback(self.put_sd_hash_on_queue) - - def print_error(err): - logging.critical("Server error: %s", err.getErrorMessage()) - - d.addErrback(print_error) - return d - - def start_server(self): - session = self.session - query_handler_factories = { - 1: BlobAvailabilityHandlerFactory(session.blob_manager), - 2: BlobRequestHandlerFactory( - session.blob_manager, session.wallet, - session.payment_rate_manager, - None), - 3: session.wallet.get_wallet_info_query_handler_factory(), - } - server_factory = ServerProtocolFactory(session.rate_limiter, - query_handler_factories, - session.peer_manager) - self.server_port = self.reactor.listenTCP(5553, server_factory) - logging.debug("Started listening") - self.kill_check = task.LoopingCall(self.check_for_kill) - self.kill_check.start(1.0) - return True - - def kill_server(self): - session = self.session - ds = [] - ds.append(session.shut_down()) - ds.append(self.lbry_file_manager.stop()) - if self.server_port: - ds.append(self.server_port.stopListening()) - self.kill_check.stop() - self.dead_event.set() - dl = defer.DeferredList(ds) - dl.addCallback(lambda _: rm_db_and_blob_dir(self.db_dir, self.blob_dir)) - dl.addCallback(lambda _: self.reactor.stop()) - return dl - - def check_for_kill(self): - if self.kill_event.is_set(): - self.kill_server() @defer.inlineCallbacks - def create_stream(self): + def setup(self): + init_conf_windows() + + self.db_dir, self.blob_dir = mk_db_and_blob_dir() + self.wallet = FakeWallet() + self.peer_manager = PeerManager() + self.rate_limiter = RateLimiter() + if self.ul_rate_limit is not None: + self.rate_limiter.set_ul_limit(self.ul_rate_limit) + self.prm = OnlyFreePaymentsManager() + self.storage = SQLiteStorage(self.db_dir) + self.blob_manager = DiskBlobManager(self.blob_dir, self.storage) + self.lbry_file_manager = EncryptedFileManager(FakePeerFinder(5553, self.peer_manager, 1), self.rate_limiter, + self.blob_manager, self.wallet, self.prm, self.storage, + StreamDescriptorIdentifier()) + + yield self.storage.setup() + yield self.blob_manager.setup() + yield self.lbry_file_manager.setup() + + query_handler_factories = { + 1: BlobAvailabilityHandlerFactory(self.blob_manager), + 2: BlobRequestHandlerFactory( + self.blob_manager, self.wallet, + self.prm, + None), + 3: self.wallet.get_wallet_info_query_handler_factory(), + } + server_factory = ServerProtocolFactory(self.rate_limiter, + query_handler_factories, + self.peer_manager) + self.server_port = reactor.listenTCP(5553, server_factory, interface="localhost") test_file = GenFile(self.file_size, b''.join([chr(i) for i in xrange(0, 64, 6)])) - lbry_file = yield create_lbry_file(self.session, self.lbry_file_manager, "test_file", test_file) + lbry_file = yield create_lbry_file(self.blob_manager, self.storage, self.prm, self.lbry_file_manager, + "test_file", test_file) defer.returnValue(lbry_file.sd_hash) - def put_sd_hash_on_queue(self, sd_hash): - self.sd_hash_queue.put(sd_hash) + @defer.inlineCallbacks + def stop(self): + lbry_files = self.lbry_file_manager.lbry_files + for lbry_file in lbry_files: + yield self.lbry_file_manager.delete_lbry_file(lbry_file) + yield self.lbry_file_manager.stop() + yield self.blob_manager.stop() + yield self.storage.stop() + self.server_port.stopListening() + rm_db_and_blob_dir(self.db_dir, self.blob_dir) + if os.path.exists("test_file"): + os.remove("test_file") -def start_lbry_reuploader(sd_hash, kill_event, dead_event, - ready_event, n, ul_rate_limit=None, is_generous=False): - use_epoll_on_linux() - init_conf_windows() - from twisted.internet import reactor - - logging.debug("Starting the uploader") - - - wallet = FakeWallet() - peer_port = 5553 + n - peer_manager = PeerManager() - peer_finder = FakePeerFinder(5553, peer_manager, 1) - hash_announcer = FakeAnnouncer() - rate_limiter = RateLimiter() - sd_identifier = StreamDescriptorIdentifier() - - db_dir, blob_dir = mk_db_and_blob_dir() - session = Session(conf.ADJUSTABLE_SETTINGS['data_rate'][1], db_dir=db_dir, - node_id="abcd" + str(n), dht_node_port=4446, - peer_finder=peer_finder, hash_announcer=hash_announcer, - blob_dir=blob_dir, peer_port=peer_port, - rate_limiter=rate_limiter, wallet=wallet, - external_ip="127.0.0.1") - - lbry_file_manager = EncryptedFileManager(session, sd_identifier) - - if ul_rate_limit is not None: - session.rate_limiter.set_ul_limit(ul_rate_limit) - - def make_downloader(metadata, prm, download_directory): - factories = metadata.factories - return factories[0].make_downloader(metadata, prm.min_blob_data_payment_rate, prm, download_directory) - - def download_file(): - prm = session.payment_rate_manager - d = download_sd_blob(session, sd_hash, prm) - d.addCallback(sd_identifier.get_metadata_for_sd_blob) - d.addCallback(make_downloader, prm, db_dir) - d.addCallback(lambda downloader: downloader.start()) - return d - - def start_transfer(): - - logging.debug("Starting the transfer") - - d = session.setup() - d.addCallback(lambda _: add_lbry_file_to_sd_identifier(sd_identifier)) - d.addCallback(lambda _: lbry_file_manager.setup()) - d.addCallback(lambda _: download_file()) - - return d - - def start_server(): - - server_port = None - - query_handler_factories = { - 1: BlobAvailabilityHandlerFactory(session.blob_manager), - 2: BlobRequestHandlerFactory( - session.blob_manager, session.wallet, - session.payment_rate_manager, - None), - 3: session.wallet.get_wallet_info_query_handler_factory(), - } - - server_factory = ServerProtocolFactory(session.rate_limiter, - query_handler_factories, - session.peer_manager) - - server_port = reactor.listenTCP(peer_port, server_factory) - logging.debug("Started listening") - - def kill_server(): - ds = [] - ds.append(session.shut_down()) - ds.append(lbry_file_manager.stop()) - if server_port: - ds.append(server_port.stopListening()) - ds.append(rm_db_and_blob_dir(db_dir, blob_dir)) - kill_check.stop() - dead_event.set() - dl = defer.DeferredList(ds) - dl.addCallback(lambda _: reactor.stop()) - return dl - - def check_for_kill(): - if kill_event.is_set(): - kill_server() - - kill_check = task.LoopingCall(check_for_kill) - kill_check.start(1.0) - ready_event.set() - logging.debug("set the ready event") - - d = task.deferLater(reactor, 1.0, start_transfer) - d.addCallback(lambda _: start_server()) - if not reactor.running: - reactor.run() - - -def start_blob_uploader(blob_hash_queue, kill_event, dead_event, slow, is_generous=False): - use_epoll_on_linux() - init_conf_windows() - from twisted.internet import reactor - - logging.debug("Starting the uploader") - - - wallet = FakeWallet() - peer_manager = PeerManager() - peer_finder = FakePeerFinder(5553, peer_manager, 1) - hash_announcer = FakeAnnouncer() - rate_limiter = RateLimiter() - - if slow is True: - peer_port = 5553 - else: - peer_port = 5554 - - - db_dir, blob_dir = mk_db_and_blob_dir() - - dht_node = FakeNode(peer_finder=peer_finder, peer_manager=peer_manager, udpPort=4445, peerPort=5553, - node_id="abcd", externalIP="127.0.0.1") - - session = Session(conf.ADJUSTABLE_SETTINGS['data_rate'][1], db_dir=db_dir, node_id="efgh", - peer_finder=peer_finder, hash_announcer=hash_announcer, - blob_dir=blob_dir, peer_port=peer_port, dht_node_port=4446, - rate_limiter=rate_limiter, wallet=wallet, - external_ip="127.0.0.1", dht_node=dht_node) - - if slow is True: - session.rate_limiter.set_ul_limit(2 ** 11) - - def start_all(): - d = session.setup() - d.addCallback(lambda _: start_server()) - d.addCallback(lambda _: create_single_blob()) - d.addCallback(put_blob_hash_on_queue) - - def print_error(err): - logging.critical("Server error: %s", err.getErrorMessage()) - - d.addErrback(print_error) - return d - - def start_server(): - - server_port = None - - query_handler_factories = { - 1: BlobAvailabilityHandlerFactory(session.blob_manager), - 2: BlobRequestHandlerFactory(session.blob_manager, session.wallet, - session.payment_rate_manager, - None), - 3: session.wallet.get_wallet_info_query_handler_factory(), - } - - server_factory = ServerProtocolFactory(session.rate_limiter, - query_handler_factories, - session.peer_manager) - - server_port = reactor.listenTCP(peer_port, server_factory) - logging.debug("Started listening") - - def kill_server(): - ds = [] - ds.append(session.shut_down()) - if server_port: - ds.append(server_port.stopListening()) - kill_check.stop() - dead_event.set() - dl = defer.DeferredList(ds) - dl.addCallback(lambda _: reactor.stop()) - dl.addCallback(lambda _: rm_db_and_blob_dir(db_dir, blob_dir)) - return dl - - def check_for_kill(): - if kill_event.is_set(): - kill_server() - - kill_check = task.LoopingCall(check_for_kill) - kill_check.start(1.0) - return True - - def create_single_blob(): - blob_creator = session.blob_manager.get_blob_creator() - blob_creator.write("0" * 2 ** 21) - return blob_creator.close() - - def put_blob_hash_on_queue(blob_hash): - logging.debug("Telling the client to start running. Blob hash: %s", str(blob_hash)) - blob_hash_queue.put(blob_hash) - logging.debug("blob hash has been added to the queue") - - reactor.callLater(1, start_all) - if not reactor.running: - reactor.run() - - -class TestTransfer(TestCase): +class TestTransfer(unittest.TestCase): + @defer.inlineCallbacks def setUp(self): mocks.mock_conf_settings(self) - self.server_processes = [] - self.session = None - self.lbry_file_manager = None - self.is_generous = True - self.addCleanup(self.take_down_env) + self.db_dir, self.blob_dir = mk_db_and_blob_dir() + self.wallet = FakeWallet() + self.peer_manager = PeerManager() + self.peer_finder = FakePeerFinder(5553, self.peer_manager, 1) + self.rate_limiter = RateLimiter() + self.prm = OnlyFreePaymentsManager() + self.storage = SQLiteStorage(self.db_dir) + self.blob_manager = DiskBlobManager(self.blob_dir, self.storage) + self.sd_identifier = StreamDescriptorIdentifier() + self.lbry_file_manager = EncryptedFileManager(self.peer_finder, self.rate_limiter, + self.blob_manager, self.wallet, self.prm, self.storage, + self.sd_identifier) - def take_down_env(self): + self.uploader = LbryUploader(5209343) + self.sd_hash = yield self.uploader.setup() + yield self.storage.setup() + yield self.blob_manager.setup() + yield self.lbry_file_manager.setup() + yield add_lbry_file_to_sd_identifier(self.sd_identifier) - d = defer.succeed(True) - if self.lbry_file_manager is not None: - d.addCallback(lambda _: self.lbry_file_manager.stop()) - if self.session is not None: - d.addCallback(lambda _: self.session.shut_down()) + @defer.inlineCallbacks + def tearDown(self): + yield self.uploader.stop() + lbry_files = self.lbry_file_manager.lbry_files + for lbry_file in lbry_files: + yield self.lbry_file_manager.delete_lbry_file(lbry_file) + yield self.lbry_file_manager.stop() + yield self.blob_manager.stop() + yield self.storage.stop() + rm_db_and_blob_dir(self.db_dir, self.blob_dir) + if os.path.exists("test_file"): + os.remove("test_file") - def delete_test_env(): - dirs = ['server', 'server1', 'server2', 'client'] - files = ['test_file'] - for di in dirs: - if os.path.exists(di): - shutil.rmtree(di) - for f in files: - if os.path.exists(f): - os.remove(f) - for p in self.server_processes: - p.terminate() - return True - - d.addCallback(lambda _: threads.deferToThread(delete_test_env)) - return d - - @staticmethod - def wait_for_event(event, timeout): - - from twisted.internet import reactor - d = defer.Deferred() - - def stop(): - set_check.stop() - if stop_call.active(): - stop_call.cancel() - d.callback(True) - - def check_if_event_set(): - if event.is_set(): - logging.debug("Dead event has been found set") - stop() - - def done_waiting(): - logging.warning("Event has not been found set and timeout has expired") - stop() - - set_check = task.LoopingCall(check_if_event_set) - set_check.start(.1) - stop_call = reactor.callLater(timeout, done_waiting) - return d - - @staticmethod - def wait_for_hash_from_queue(hash_queue): - logging.debug("Waiting for the sd_hash to come through the queue") - - d = defer.Deferred() - - def check_for_start(): - if hash_queue.empty() is False: - logging.debug("Client start event has been found set") - start_check.stop() - d.callback(hash_queue.get(False)) - else: - logging.debug("Client start event has NOT been found set") - - start_check = task.LoopingCall(check_for_start) - start_check.start(1.0) - - return d - - @unittest.skipIf(is_android(), - 'Test cannot pass on Android because multiprocessing ' - 'is not supported at the OS level.') + @defer.inlineCallbacks def test_lbry_transfer(self): - sd_hash_queue = Queue() - kill_event = Event() - dead_event = Event() - lbry_uploader = LbryUploader(sd_hash_queue, kill_event, dead_event, 5209343) - uploader = Process(target=lbry_uploader.start) - uploader.start() - self.server_processes.append(uploader) - - logging.debug("Testing transfer") - - wallet = FakeWallet() - peer_manager = PeerManager() - peer_finder = FakePeerFinder(5553, peer_manager, 1) - hash_announcer = FakeAnnouncer() - rate_limiter = DummyRateLimiter() - sd_identifier = StreamDescriptorIdentifier() - dht_node = FakeNode(peer_finder=peer_finder, peer_manager=peer_manager, udpPort=4445, peerPort=5553, - node_id="abcd", externalIP="127.0.0.1") - - db_dir, blob_dir = mk_db_and_blob_dir() - self.session = Session( - conf.ADJUSTABLE_SETTINGS['data_rate'][1], db_dir=db_dir, - node_id="abcd", peer_finder=peer_finder, hash_announcer=hash_announcer, - blob_dir=blob_dir, peer_port=5553, dht_node_port=4445, - rate_limiter=rate_limiter, wallet=wallet, - dht_node=dht_node, external_ip="127.0.0.1") - - self.lbry_file_manager = EncryptedFileManager( - self.session, sd_identifier) - - def make_downloader(metadata, prm): - factories = metadata.factories - return factories[0].make_downloader(metadata, prm.min_blob_data_payment_rate, prm, db_dir) - - def download_file(sd_hash): - prm = self.session.payment_rate_manager - d = download_sd_blob(self.session, sd_hash, prm) - d.addCallback(sd_identifier.get_metadata_for_sd_blob) - d.addCallback(make_downloader, prm) - d.addCallback(lambda downloader: downloader.start()) - return d - - def check_md5_sum(): - f = open(os.path.join(db_dir, 'test_file')) + sd_blob = yield download_sd_blob(self.sd_hash, self.blob_manager, self.peer_finder, self.rate_limiter, self.prm, self.wallet) + metadata = yield self.sd_identifier.get_metadata_for_sd_blob(sd_blob) + downloader = yield metadata.factories[0].make_downloader(metadata, self.prm.min_blob_data_payment_rate, self.prm, self.db_dir, download_mirrors=None) + yield downloader.start() + with open(os.path.join(self.db_dir, 'test_file')) as f: hashsum = md5() hashsum.update(f.read()) - self.assertEqual(hashsum.hexdigest(), "4ca2aafb4101c1e42235aad24fbb83be") - - @defer.inlineCallbacks - def start_transfer(sd_hash): - logging.debug("Starting the transfer") - yield self.session.setup() - yield add_lbry_file_to_sd_identifier(sd_identifier) - yield self.lbry_file_manager.setup() - yield download_file(sd_hash) - yield check_md5_sum() - - def stop(arg): - if isinstance(arg, Failure): - logging.debug("Client is stopping due to an error. Error: %s", arg.getTraceback()) - else: - logging.debug("Client is stopping normally.") - kill_event.set() - logging.debug("Set the kill event") - d = self.wait_for_event(dead_event, 15) - - def print_shutting_down(): - logging.info("Client is shutting down") - - d.addCallback(lambda _: print_shutting_down()) - d.addCallback(lambda _: rm_db_and_blob_dir(db_dir, blob_dir)) - d.addCallback(lambda _: arg) - return d - - d = self.wait_for_hash_from_queue(sd_hash_queue) - d.addCallback(start_transfer) - d.addBoth(stop) - - return d - - @unittest.skipIf(is_android(), - 'Test cannot pass on Android because multiprocessing ' - 'is not supported at the OS level.') - def test_last_blob_retrieval(self): - kill_event = Event() - dead_event_1 = Event() - blob_hash_queue_1 = Queue() - blob_hash_queue_2 = Queue() - fast_uploader = Process(target=start_blob_uploader, - args=(blob_hash_queue_1, kill_event, dead_event_1, False)) - fast_uploader.start() - self.server_processes.append(fast_uploader) - dead_event_2 = Event() - slow_uploader = Process(target=start_blob_uploader, - args=(blob_hash_queue_2, kill_event, dead_event_2, True)) - slow_uploader.start() - self.server_processes.append(slow_uploader) - - logging.debug("Testing transfer") - - wallet = FakeWallet() - peer_manager = PeerManager() - peer_finder = FakePeerFinder(5553, peer_manager, 2) - hash_announcer = FakeAnnouncer() - rate_limiter = DummyRateLimiter() - dht_node = FakeNode(peer_finder=peer_finder, peer_manager=peer_manager, udpPort=4445, peerPort=5553, - node_id="abcd", externalIP="127.0.0.1") - - db_dir, blob_dir = mk_db_and_blob_dir() - self.session = Session( - conf.ADJUSTABLE_SETTINGS['data_rate'][1], db_dir=db_dir, node_id="abcd", - peer_finder=peer_finder, hash_announcer=hash_announcer, - blob_dir=blob_dir, peer_port=5553, dht_node_port=4445, - rate_limiter=rate_limiter, wallet=wallet, - dht_node=dht_node, external_ip="127.0.0.1") - - d1 = self.wait_for_hash_from_queue(blob_hash_queue_1) - d2 = self.wait_for_hash_from_queue(blob_hash_queue_2) - d = defer.DeferredList([d1, d2], fireOnOneErrback=True) - - def get_blob_hash(results): - self.assertEqual(results[0][1], results[1][1]) - return results[0][1] - - d.addCallback(get_blob_hash) - - def download_blob(blob_hash): - prm = self.session.payment_rate_manager - downloader = StandaloneBlobDownloader( - blob_hash, self.session.blob_manager, peer_finder, rate_limiter, prm, wallet) - d = downloader.download() - return d - - def start_transfer(blob_hash): - - logging.debug("Starting the transfer") - - d = self.session.setup() - d.addCallback(lambda _: download_blob(blob_hash)) - - return d - - d.addCallback(start_transfer) - - def stop(arg): - if isinstance(arg, Failure): - logging.debug("Client is stopping due to an error. Error: %s", arg.getTraceback()) - else: - logging.debug("Client is stopping normally.") - kill_event.set() - logging.debug("Set the kill event") - d1 = self.wait_for_event(dead_event_1, 15) - d2 = self.wait_for_event(dead_event_2, 15) - dl = defer.DeferredList([d1, d2]) - - def print_shutting_down(): - logging.info("Client is shutting down") - - dl.addCallback(lambda _: print_shutting_down()) - dl.addCallback(lambda _: rm_db_and_blob_dir(db_dir, blob_dir)) - dl.addCallback(lambda _: arg) - return dl - - d.addBoth(stop) - return d - - @unittest.skipIf(is_android(), - 'Test cannot pass on Android because multiprocessing ' - 'is not supported at the OS level.') - def test_double_download(self): - sd_hash_queue = Queue() - kill_event = Event() - dead_event = Event() - lbry_uploader = LbryUploader(sd_hash_queue, kill_event, dead_event, 5209343) - uploader = Process(target=lbry_uploader.start) - uploader.start() - self.server_processes.append(uploader) - - logging.debug("Testing double download") - - wallet = FakeWallet() - peer_manager = PeerManager() - peer_finder = FakePeerFinder(5553, peer_manager, 1) - hash_announcer = FakeAnnouncer() - rate_limiter = DummyRateLimiter() - sd_identifier = StreamDescriptorIdentifier() - dht_node = FakeNode(peer_finder=peer_finder, peer_manager=peer_manager, udpPort=4445, peerPort=5553, - node_id="abcd", externalIP="127.0.0.1") - - downloaders = [] - - db_dir, blob_dir = mk_db_and_blob_dir() - self.session = Session(conf.ADJUSTABLE_SETTINGS['data_rate'][1], db_dir=db_dir, - node_id="abcd", peer_finder=peer_finder, dht_node_port=4445, - hash_announcer=hash_announcer, blob_dir=blob_dir, peer_port=5553, - rate_limiter=rate_limiter, wallet=wallet, - external_ip="127.0.0.1", dht_node=dht_node) - - self.lbry_file_manager = EncryptedFileManager(self.session, sd_identifier) - - @defer.inlineCallbacks - def make_downloader(metadata, prm): - factories = metadata.factories - downloader = yield factories[0].make_downloader(metadata, prm.min_blob_data_payment_rate, prm, db_dir) - defer.returnValue(downloader) - - @defer.inlineCallbacks - def download_file(sd_hash): - prm = self.session.payment_rate_manager - sd_blob = yield download_sd_blob(self.session, sd_hash, prm) - metadata = yield sd_identifier.get_metadata_for_sd_blob(sd_blob) - downloader = yield make_downloader(metadata, prm) - downloaders.append(downloader) - yield downloader.start() - defer.returnValue(downloader) - - def check_md5_sum(): - f = open(os.path.join(db_dir, 'test_file')) - hashsum = md5() - hashsum.update(f.read()) - self.assertEqual(hashsum.hexdigest(), "4ca2aafb4101c1e42235aad24fbb83be") - - def delete_lbry_file(downloader): - logging.debug("deleting the file") - return self.lbry_file_manager.delete_lbry_file(downloader) - - def check_lbry_file(downloader): - d = downloader.status() - - def check_status_report(status_report): - self.assertEqual(status_report.num_known, status_report.num_completed) - self.assertEqual(status_report.num_known, 3) - - d.addCallback(check_status_report) - return d - - @defer.inlineCallbacks - def start_transfer(sd_hash): - # download a file, delete it, and download it again - - logging.debug("Starting the transfer") - yield self.session.setup() - yield add_lbry_file_to_sd_identifier(sd_identifier) - yield self.lbry_file_manager.setup() - downloader = yield download_file(sd_hash) - yield check_md5_sum() - yield check_lbry_file(downloader) - yield delete_lbry_file(downloader) - downloader = yield download_file(sd_hash) - yield check_lbry_file(downloader) - yield check_md5_sum() - yield delete_lbry_file(downloader) - - def stop(arg): - if isinstance(arg, Failure): - logging.debug("Client is stopping due to an error. Error: %s", arg.getTraceback()) - else: - logging.debug("Client is stopping normally.") - kill_event.set() - logging.debug("Set the kill event") - d = self.wait_for_event(dead_event, 15) - - def print_shutting_down(): - logging.info("Client is shutting down") - - d.addCallback(lambda _: print_shutting_down()) - d.addCallback(lambda _: rm_db_and_blob_dir(db_dir, blob_dir)) - d.addCallback(lambda _: arg) - return d - - d = self.wait_for_hash_from_queue(sd_hash_queue) - d.addCallback(start_transfer) - d.addBoth(stop) - return d - - @unittest.skip("Sadly skipping failing test instead of fixing it") - def test_multiple_uploaders(self): - sd_hash_queue = Queue() - num_uploaders = 3 - kill_event = Event() - dead_events = [Event() for _ in range(num_uploaders)] - ready_events = [Event() for _ in range(1, num_uploaders)] - lbry_uploader = LbryUploader( - sd_hash_queue, kill_event, dead_events[0], 5209343, 9373419, 2 ** 22) - uploader = Process(target=lbry_uploader.start) - uploader.start() - self.server_processes.append(uploader) - - logging.debug("Testing multiple uploaders") - - wallet = FakeWallet() - peer_manager = PeerManager() - peer_finder = FakePeerFinder(5553, peer_manager, num_uploaders) - hash_announcer = FakeAnnouncer() - rate_limiter = DummyRateLimiter() - sd_identifier = StreamDescriptorIdentifier() - - db_dir, blob_dir = mk_db_and_blob_dir() - self.session = Session(conf.ADJUSTABLE_SETTINGS['data_rate'][1], db_dir=db_dir, - node_id="abcd", peer_finder=peer_finder, dht_node_port=4445, - hash_announcer=hash_announcer, blob_dir=blob_dir, - peer_port=5553, rate_limiter=rate_limiter, - wallet=wallet, external_ip="127.0.0.1") - - self.lbry_file_manager = EncryptedFileManager( - self.session, sd_identifier) - - def start_additional_uploaders(sd_hash): - for i in range(1, num_uploaders): - uploader = Process(target=start_lbry_reuploader, - args=( - sd_hash, kill_event, dead_events[i], ready_events[i - 1], i, - 2 ** 10)) - uploader.start() - self.server_processes.append(uploader) - return defer.succeed(True) - - def wait_for_ready_events(): - return defer.DeferredList( - [self.wait_for_event(ready_event, 60) for ready_event in ready_events]) - - def make_downloader(metadata, prm): - info_validator = metadata.validator - options = metadata.options - factories = metadata.factories - chosen_options = [o.default_value for o in - options.get_downloader_options(info_validator, prm)] - return factories[0].make_downloader(metadata, chosen_options, prm) - - def download_file(sd_hash): - prm = self.session.payment_rate_manager - d = download_sd_blob(self.session, sd_hash, prm) - d.addCallback(sd_identifier.get_metadata_for_sd_blob) - d.addCallback(make_downloader, prm) - d.addCallback(lambda downloader: downloader.start()) - return d - - def check_md5_sum(): - f = open('test_file') - hashsum = md5() - hashsum.update(f.read()) - self.assertEqual(hashsum.hexdigest(), "e5941d615f53312fd66638239c1f90d5") - - def start_transfer(sd_hash): - - logging.debug("Starting the transfer") - - d = start_additional_uploaders(sd_hash) - d.addCallback(lambda _: wait_for_ready_events()) - d.addCallback(lambda _: self.session.setup()) - d.addCallback(lambda _: add_lbry_file_to_sd_identifier(sd_identifier)) - d.addCallback(lambda _: self.lbry_file_manager.setup()) - d.addCallback(lambda _: download_file(sd_hash)) - d.addCallback(lambda _: check_md5_sum()) - - return d - - def stop(arg): - if isinstance(arg, Failure): - logging.debug("Client is stopping due to an error. Error: %s", arg.getTraceback()) - else: - logging.debug("Client is stopping normally.") - kill_event.set() - logging.debug("Set the kill event") - d = defer.DeferredList( - [self.wait_for_event(dead_event, 15) for dead_event in dead_events]) - - def print_shutting_down(): - logging.info("Client is shutting down") - - d.addCallback(lambda _: print_shutting_down()) - d.addCallback(lambda _: rm_db_and_blob_dir(db_dir, blob_dir)) - d.addCallback(lambda _: arg) - return d - - d = self.wait_for_hash_from_queue(sd_hash_queue) - d.addCallback(start_transfer) - d.addBoth(stop) - - return d - - if is_android(): - test_lbry_transfer.skip = TEST_SKIP_STRING_ANDROID - test_last_blob_retrieval.skip = TEST_SKIP_STRING_ANDROID - test_double_download.skip = TEST_SKIP_STRING_ANDROID - test_multiple_uploaders.skip = TEST_SKIP_STRING_ANDROID + self.assertEqual(hashsum.hexdigest(), "4ca2aafb4101c1e42235aad24fbb83be") + # TODO: update these + # def test_last_blob_retrieval(self): + # kill_event = Event() + # dead_event_1 = Event() + # blob_hash_queue_1 = Queue() + # blob_hash_queue_2 = Queue() + # fast_uploader = Process(target=start_blob_uploader, + # args=(blob_hash_queue_1, kill_event, dead_event_1, False)) + # fast_uploader.start() + # self.server_processes.append(fast_uploader) + # dead_event_2 = Event() + # slow_uploader = Process(target=start_blob_uploader, + # args=(blob_hash_queue_2, kill_event, dead_event_2, True)) + # slow_uploader.start() + # self.server_processes.append(slow_uploader) + # + # logging.debug("Testing transfer") + # + # wallet = FakeWallet() + # peer_manager = PeerManager() + # peer_finder = FakePeerFinder(5553, peer_manager, 2) + # hash_announcer = FakeAnnouncer() + # rate_limiter = DummyRateLimiter() + # dht_node = FakeNode(peer_finder=peer_finder, peer_manager=peer_manager, udpPort=4445, peerPort=5553, + # node_id="abcd", externalIP="127.0.0.1") + # + # db_dir, blob_dir = mk_db_and_blob_dir() + # self.session = Session( + # conf.ADJUSTABLE_SETTINGS['data_rate'][1], db_dir=db_dir, node_id="abcd", + # peer_finder=peer_finder, hash_announcer=hash_announcer, + # blob_dir=blob_dir, peer_port=5553, dht_node_port=4445, + # rate_limiter=rate_limiter, wallet=wallet, + # dht_node=dht_node, external_ip="127.0.0.1") + # + # d1 = self.wait_for_hash_from_queue(blob_hash_queue_1) + # d2 = self.wait_for_hash_from_queue(blob_hash_queue_2) + # d = defer.DeferredList([d1, d2], fireOnOneErrback=True) + # + # def get_blob_hash(results): + # self.assertEqual(results[0][1], results[1][1]) + # return results[0][1] + # + # d.addCallback(get_blob_hash) + # + # def download_blob(blob_hash): + # prm = self.session.payment_rate_manager + # downloader = StandaloneBlobDownloader( + # blob_hash, self.session.blob_manager, peer_finder, rate_limiter, prm, wallet) + # d = downloader.download() + # return d + # + # def start_transfer(blob_hash): + # + # logging.debug("Starting the transfer") + # + # d = self.session.setup() + # d.addCallback(lambda _: download_blob(blob_hash)) + # + # return d + # + # d.addCallback(start_transfer) + # + # def stop(arg): + # if isinstance(arg, Failure): + # logging.debug("Client is stopping due to an error. Error: %s", arg.getTraceback()) + # else: + # logging.debug("Client is stopping normally.") + # kill_event.set() + # logging.debug("Set the kill event") + # d1 = self.wait_for_event(dead_event_1, 15) + # d2 = self.wait_for_event(dead_event_2, 15) + # dl = defer.DeferredList([d1, d2]) + # + # def print_shutting_down(): + # logging.info("Client is shutting down") + # + # dl.addCallback(lambda _: print_shutting_down()) + # dl.addCallback(lambda _: rm_db_and_blob_dir(db_dir, blob_dir)) + # dl.addCallback(lambda _: arg) + # return dl + # + # d.addBoth(stop) + # return d + # + # def test_double_download(self): + # sd_hash_queue = Queue() + # kill_event = Event() + # dead_event = Event() + # lbry_uploader = LbryUploader(sd_hash_queue, kill_event, dead_event, 5209343) + # uploader = Process(target=lbry_uploader.start) + # uploader.start() + # self.server_processes.append(uploader) + # + # logging.debug("Testing double download") + # + # wallet = FakeWallet() + # peer_manager = PeerManager() + # peer_finder = FakePeerFinder(5553, peer_manager, 1) + # hash_announcer = FakeAnnouncer() + # rate_limiter = DummyRateLimiter() + # sd_identifier = StreamDescriptorIdentifier() + # dht_node = FakeNode(peer_finder=peer_finder, peer_manager=peer_manager, udpPort=4445, peerPort=5553, + # node_id="abcd", externalIP="127.0.0.1") + # + # downloaders = [] + # + # db_dir, blob_dir = mk_db_and_blob_dir() + # self.session = Session(conf.ADJUSTABLE_SETTINGS['data_rate'][1], db_dir=db_dir, + # node_id="abcd", peer_finder=peer_finder, dht_node_port=4445, + # hash_announcer=hash_announcer, blob_dir=blob_dir, peer_port=5553, + # rate_limiter=rate_limiter, wallet=wallet, + # external_ip="127.0.0.1", dht_node=dht_node) + # + # self.lbry_file_manager = EncryptedFileManager(self.session, sd_identifier) + # + # @defer.inlineCallbacks + # def make_downloader(metadata, prm): + # factories = metadata.factories + # downloader = yield factories[0].make_downloader(metadata, prm.min_blob_data_payment_rate, prm, db_dir) + # defer.returnValue(downloader) + # + # @defer.inlineCallbacks + # def download_file(sd_hash): + # prm = self.session.payment_rate_manager + # sd_blob = yield download_sd_blob(self.session, sd_hash, prm) + # metadata = yield sd_identifier.get_metadata_for_sd_blob(sd_blob) + # downloader = yield make_downloader(metadata, prm) + # downloaders.append(downloader) + # yield downloader.start() + # defer.returnValue(downloader) + # + # def check_md5_sum(): + # f = open(os.path.join(db_dir, 'test_file')) + # hashsum = md5() + # hashsum.update(f.read()) + # self.assertEqual(hashsum.hexdigest(), "4ca2aafb4101c1e42235aad24fbb83be") + # + # def delete_lbry_file(downloader): + # logging.debug("deleting the file") + # return self.lbry_file_manager.delete_lbry_file(downloader) + # + # def check_lbry_file(downloader): + # d = downloader.status() + # + # def check_status_report(status_report): + # self.assertEqual(status_report.num_known, status_report.num_completed) + # self.assertEqual(status_report.num_known, 3) + # + # d.addCallback(check_status_report) + # return d + # + # @defer.inlineCallbacks + # def start_transfer(sd_hash): + # # download a file, delete it, and download it again + # + # logging.debug("Starting the transfer") + # yield self.session.setup() + # yield add_lbry_file_to_sd_identifier(sd_identifier) + # yield self.lbry_file_manager.setup() + # downloader = yield download_file(sd_hash) + # yield check_md5_sum() + # yield check_lbry_file(downloader) + # yield delete_lbry_file(downloader) + # downloader = yield download_file(sd_hash) + # yield check_lbry_file(downloader) + # yield check_md5_sum() + # yield delete_lbry_file(downloader) + # + # def stop(arg): + # if isinstance(arg, Failure): + # logging.debug("Client is stopping due to an error. Error: %s", arg.getTraceback()) + # else: + # logging.debug("Client is stopping normally.") + # kill_event.set() + # logging.debug("Set the kill event") + # d = self.wait_for_event(dead_event, 15) + # + # def print_shutting_down(): + # logging.info("Client is shutting down") + # + # d.addCallback(lambda _: print_shutting_down()) + # d.addCallback(lambda _: rm_db_and_blob_dir(db_dir, blob_dir)) + # d.addCallback(lambda _: arg) + # return d + # + # d = self.wait_for_hash_from_queue(sd_hash_queue) + # d.addCallback(start_transfer) + # d.addBoth(stop) + # return d