diff --git a/tests/functional/test_misc.py b/tests/functional/test_misc.py deleted file mode 100644 index f21af0c4f..000000000 --- a/tests/functional/test_misc.py +++ /dev/null @@ -1,358 +0,0 @@ -import os -from unittest import skip -from hashlib import md5 -from twisted.internet import defer, reactor -from twisted.trial import unittest -from lbrynet import conf -from lbrynet.p2p.server.BlobAvailabilityHandler import BlobAvailabilityHandlerFactory -from lbrynet.p2p.StreamDescriptor import StreamDescriptorIdentifier -from lbrynet.p2p.StreamDescriptor import download_sd_blob -from lbrynet.p2p.PaymentRateManager import OnlyFreePaymentsManager -from lbrynet.p2p.BlobManager import DiskBlobManager -from lbrynet.extras.daemon.PeerManager import PeerManager -from lbrynet.p2p.RateLimiter import RateLimiter -from lbrynet.p2p.server.BlobRequestHandler import BlobRequestHandlerFactory -from lbrynet.p2p.server.ServerProtocol import ServerProtocolFactory -from lbrynet.extras.daemon.storage import SQLiteStorage -from lbrynet.blob.EncryptedFileCreator import create_lbry_file -from lbrynet.blob.EncryptedFileManager import EncryptedFileManager -from lbrynet.blob.client.EncryptedFileOptions import add_lbry_file_to_sd_identifier -from lbrynet.extras.compat import f2d - -from tests import mocks -from tests.test_utils import mk_db_and_blob_dir, rm_db_and_blob_dir - -FakeNode = mocks.Node -FakeWallet = mocks.Wallet -FakePeerFinder = mocks.PeerFinder -FakeAnnouncer = mocks.Announcer -GenFile = mocks.GenFile -test_create_stream_sd_file = mocks.create_stream_sd_file -DummyBlobAvailabilityTracker = mocks.BlobAvailabilityTracker - - -def init_conf_windows(settings={}): - """ - There is no fork on windows, so imports - are freshly initialized in new processes. - So conf needs to be initialized for new processes - """ - if os.name == 'nt': - original_settings = conf.settings - conf.settings = conf.Config(conf.FIXED_SETTINGS, conf.ADJUSTABLE_SETTINGS) - conf.settings.installation_id = conf.settings.get_installation_id() - conf.settings.update(settings) - - -class LbryUploader: - def __init__(self, file_size, ul_rate_limit=None): - self.file_size = file_size - self.ul_rate_limit = ul_rate_limit - self.kill_check = None - # these attributes get defined in `start` - self.db_dir = None - self.blob_dir = None - self.wallet = None - self.peer_manager = None - self.rate_limiter = None - self.prm = None - self.storage = None - self.blob_manager = None - self.lbry_file_manager = None - self.server_port = None - - @defer.inlineCallbacks - def setup(self): - init_conf_windows() - - self.db_dir, self.blob_dir = mk_db_and_blob_dir() - self.wallet = FakeWallet() - self.peer_manager = PeerManager() - self.rate_limiter = RateLimiter() - if self.ul_rate_limit is not None: - self.rate_limiter.set_ul_limit(self.ul_rate_limit) - self.prm = OnlyFreePaymentsManager() - self.storage = SQLiteStorage(':memory:') - self.blob_manager = DiskBlobManager(self.blob_dir, self.storage) - self.lbry_file_manager = EncryptedFileManager(FakePeerFinder(5553, self.peer_manager, 1), self.rate_limiter, - self.blob_manager, self.wallet, self.prm, self.storage, - StreamDescriptorIdentifier()) - - yield f2d(self.storage.open()) - yield f2d(self.blob_manager.setup()) - yield f2d(self.lbry_file_manager.setup()) - - query_handler_factories = { - 1: BlobAvailabilityHandlerFactory(self.blob_manager), - 2: BlobRequestHandlerFactory( - self.blob_manager, self.wallet, - self.prm, - None), - 3: self.wallet.get_wallet_info_query_handler_factory(), - } - server_factory = ServerProtocolFactory(self.rate_limiter, - query_handler_factories, - self.peer_manager) - self.server_port = reactor.listenTCP(5553, server_factory, interface="localhost") - test_file = GenFile(self.file_size, bytes(i for i in range(0, 64, 6))) - lbry_file = yield create_lbry_file(self.blob_manager, self.storage, self.prm, self.lbry_file_manager, - "test_file", test_file) - defer.returnValue(lbry_file.sd_hash) - - @defer.inlineCallbacks - def stop(self): - lbry_files = self.lbry_file_manager.lbry_files - for lbry_file in lbry_files: - yield self.lbry_file_manager.delete_lbry_file(lbry_file) - yield self.lbry_file_manager.stop() - yield f2d(self.blob_manager.stop()) - yield f2d(self.storage.close()) - self.server_port.stopListening() - rm_db_and_blob_dir(self.db_dir, self.blob_dir) - if os.path.exists("test_file"): - os.remove("test_file") - - -@skip -class TestTransfer(unittest.TestCase): - - @defer.inlineCallbacks - def setUp(self): - mocks.mock_conf_settings(self) - self.db_dir, self.blob_dir = mk_db_and_blob_dir() - self.wallet = FakeWallet() - self.peer_manager = PeerManager() - self.peer_finder = FakePeerFinder(5553, self.peer_manager, 1) - self.rate_limiter = RateLimiter() - self.prm = OnlyFreePaymentsManager() - self.storage = SQLiteStorage(':memory:') - self.blob_manager = DiskBlobManager(self.blob_dir, self.storage) - self.sd_identifier = StreamDescriptorIdentifier() - self.lbry_file_manager = EncryptedFileManager(self.peer_finder, self.rate_limiter, - self.blob_manager, self.wallet, self.prm, self.storage, - self.sd_identifier) - - self.uploader = LbryUploader(5209343) - self.sd_hash = yield self.uploader.setup() - yield f2d(self.storage.open()) - yield f2d(self.blob_manager.setup()) - yield f2d(self.lbry_file_manager.setup()) - yield add_lbry_file_to_sd_identifier(self.sd_identifier) - - @defer.inlineCallbacks - def tearDown(self): - yield self.uploader.stop() - lbry_files = self.lbry_file_manager.lbry_files - for lbry_file in lbry_files: - yield self.lbry_file_manager.delete_lbry_file(lbry_file) - yield self.lbry_file_manager.stop() - yield self.blob_manager.stop() - yield f2d(self.storage.close()) - rm_db_and_blob_dir(self.db_dir, self.blob_dir) - if os.path.exists("test_file"): - os.remove("test_file") - - @defer.inlineCallbacks - def test_lbry_transfer(self): - sd_blob = yield download_sd_blob( - self.conf, self.sd_hash, self.blob_manager, self.peer_finder, self.rate_limiter, self.prm, self.wallet - ) - metadata = yield self.sd_identifier.get_metadata_for_sd_blob(sd_blob) - downloader = yield metadata.factories[0].make_downloader( - metadata, self.prm.min_blob_data_payment_rate, self.prm, self.db_dir, download_mirrors=None - ) - yield downloader.start() - with open(os.path.join(self.db_dir, 'test_file'), 'rb') as f: - hashsum = md5() - hashsum.update(f.read()) - self.assertEqual(hashsum.hexdigest(), "4ca2aafb4101c1e42235aad24fbb83be") - - # TODO: update these - # def test_last_blob_retrieval(self): - # kill_event = Event() - # dead_event_1 = Event() - # blob_hash_queue_1 = Queue() - # blob_hash_queue_2 = Queue() - # fast_uploader = Process(target=start_blob_uploader, - # args=(blob_hash_queue_1, kill_event, dead_event_1, False)) - # fast_uploader.start() - # self.server_processes.append(fast_uploader) - # dead_event_2 = Event() - # slow_uploader = Process(target=start_blob_uploader, - # args=(blob_hash_queue_2, kill_event, dead_event_2, True)) - # slow_uploader.start() - # self.server_processes.append(slow_uploader) - # - # logging.debug("Testing transfer") - # - # wallet = FakeWallet() - # peer_manager = PeerManager() - # peer_finder = FakePeerFinder(5553, peer_manager, 2) - # hash_announcer = FakeAnnouncer() - # rate_limiter = DummyRateLimiter() - # dht_node = FakeNode(peer_finder=peer_finder, peer_manager=peer_manager, udpPort=4445, peerPort=5553, - # node_id="abcd", externalIP="127.0.0.1") - # - # db_dir, blob_dir = mk_db_and_blob_dir() - # self.session = Session( - # conf.ADJUSTABLE_SETTINGS['data_rate'][1], db_dir=db_dir, node_id="abcd", - # peer_finder=peer_finder, hash_announcer=hash_announcer, - # blob_dir=blob_dir, peer_port=5553, dht_node_port=4445, - # rate_limiter=rate_limiter, wallet=wallet, - # dht_node=dht_node, external_ip="127.0.0.1") - # - # d1 = self.wait_for_hash_from_queue(blob_hash_queue_1) - # d2 = self.wait_for_hash_from_queue(blob_hash_queue_2) - # d = defer.DeferredList([d1, d2], fireOnOneErrback=True) - # - # def get_blob_hash(results): - # self.assertEqual(results[0][1], results[1][1]) - # return results[0][1] - # - # d.addCallback(get_blob_hash) - # - # def download_blob(blob_hash): - # prm = self.session.payment_rate_manager - # downloader = StandaloneBlobDownloader( - # blob_hash, self.session.blob_manager, peer_finder, rate_limiter, prm, wallet) - # d = downloader.download() - # return d - # - # def start_transfer(blob_hash): - # - # logging.debug("Starting the transfer") - # - # d = self.session.setup() - # d.addCallback(lambda _: download_blob(blob_hash)) - # - # return d - # - # d.addCallback(start_transfer) - # - # def stop(arg): - # if isinstance(arg, Failure): - # logging.debug("Client is stopping due to an error. Error: %s", arg.getTraceback()) - # else: - # logging.debug("Client is stopping normally.") - # kill_event.set() - # logging.debug("Set the kill event") - # d1 = self.wait_for_event(dead_event_1, 15) - # d2 = self.wait_for_event(dead_event_2, 15) - # dl = defer.DeferredList([d1, d2]) - # - # def print_shutting_down(): - # logging.info("Client is shutting down") - # - # dl.addCallback(lambda _: print_shutting_down()) - # dl.addCallback(lambda _: rm_db_and_blob_dir(db_dir, blob_dir)) - # dl.addCallback(lambda _: arg) - # return dl - # - # d.addBoth(stop) - # return d - # - # def test_double_download(self): - # sd_hash_queue = Queue() - # kill_event = Event() - # dead_event = Event() - # lbry_uploader = LbryUploader(sd_hash_queue, kill_event, dead_event, 5209343) - # uploader = Process(target=lbry_uploader.start) - # uploader.start() - # self.server_processes.append(uploader) - # - # logging.debug("Testing double download") - # - # wallet = FakeWallet() - # peer_manager = PeerManager() - # peer_finder = FakePeerFinder(5553, peer_manager, 1) - # hash_announcer = FakeAnnouncer() - # rate_limiter = DummyRateLimiter() - # sd_identifier = StreamDescriptorIdentifier() - # dht_node = FakeNode(peer_finder=peer_finder, peer_manager=peer_manager, udpPort=4445, peerPort=5553, - # node_id="abcd", externalIP="127.0.0.1") - # - # downloaders = [] - # - # db_dir, blob_dir = mk_db_and_blob_dir() - # self.session = Session(conf.ADJUSTABLE_SETTINGS['data_rate'][1], db_dir=db_dir, - # node_id="abcd", peer_finder=peer_finder, dht_node_port=4445, - # hash_announcer=hash_announcer, blob_dir=blob_dir, peer_port=5553, - # rate_limiter=rate_limiter, wallet=wallet, - # external_ip="127.0.0.1", dht_node=dht_node) - # - # self.lbry_file_manager = EncryptedFileManager(self.session, sd_identifier) - # - # @defer.inlineCallbacks - # def make_downloader(metadata, prm): - # factories = metadata.factories - # downloader = yield factories[0].make_downloader(metadata, prm.min_blob_data_payment_rate, prm, db_dir) - # defer.returnValue(downloader) - # - # @defer.inlineCallbacks - # def download_file(sd_hash): - # prm = self.session.payment_rate_manager - # sd_blob = yield download_sd_blob(self.session, sd_hash, prm) - # metadata = yield sd_identifier.get_metadata_for_sd_blob(sd_blob) - # downloader = yield make_downloader(metadata, prm) - # downloaders.append(downloader) - # yield downloader.start() - # defer.returnValue(downloader) - # - # def check_md5_sum(): - # f = open(os.path.join(db_dir, 'test_file')) - # hashsum = md5() - # hashsum.update(f.read()) - # self.assertEqual(hashsum.hexdigest(), "4ca2aafb4101c1e42235aad24fbb83be") - # - # def delete_lbry_file(downloader): - # logging.debug("deleting the file") - # return self.lbry_file_manager.delete_lbry_file(downloader) - # - # def check_lbry_file(downloader): - # d = downloader.status() - # - # def check_status_report(status_report): - # self.assertEqual(status_report.num_known, status_report.num_completed) - # self.assertEqual(status_report.num_known, 3) - # - # d.addCallback(check_status_report) - # return d - # - # @defer.inlineCallbacks - # def start_transfer(sd_hash): - # # download a file, delete it, and download it again - # - # logging.debug("Starting the transfer") - # yield self.session.setup() - # yield add_lbry_file_to_sd_identifier(sd_identifier) - # yield self.lbry_file_manager.setup() - # downloader = yield download_file(sd_hash) - # yield check_md5_sum() - # yield check_lbry_file(downloader) - # yield delete_lbry_file(downloader) - # downloader = yield download_file(sd_hash) - # yield check_lbry_file(downloader) - # yield check_md5_sum() - # yield delete_lbry_file(downloader) - # - # def stop(arg): - # if isinstance(arg, Failure): - # logging.debug("Client is stopping due to an error. Error: %s", arg.getTraceback()) - # else: - # logging.debug("Client is stopping normally.") - # kill_event.set() - # logging.debug("Set the kill event") - # d = self.wait_for_event(dead_event, 15) - # - # def print_shutting_down(): - # logging.info("Client is shutting down") - # - # d.addCallback(lambda _: print_shutting_down()) - # d.addCallback(lambda _: rm_db_and_blob_dir(db_dir, blob_dir)) - # d.addCallback(lambda _: arg) - # return d - # - # d = self.wait_for_hash_from_queue(sd_hash_queue) - # d.addCallback(start_transfer) - # d.addBoth(stop) - # return d diff --git a/tests/functional/test_reflector.py b/tests/functional/test_reflector.py deleted file mode 100644 index 8a459d1ed..000000000 --- a/tests/functional/test_reflector.py +++ /dev/null @@ -1,343 +0,0 @@ -import os -from unittest import skip -from binascii import hexlify - -from twisted.internet import defer, error -from twisted.trial import unittest -from lbrynet.p2p.StreamDescriptor import get_sd_info -from lbrynet.extras.compat import f2d -from lbrynet.extras.reflector.server.server import ReflectorServerFactory -from lbrynet.extras.reflector.client.client import EncryptedFileReflectorClientFactory -from lbrynet.extras.reflector.client.blob import BlobReflectorClientFactory -from lbrynet.extras.daemon.PeerManager import PeerManager -from lbrynet.p2p import BlobManager -from lbrynet.p2p import StreamDescriptor -from lbrynet.blob import EncryptedFileCreator -from lbrynet.blob.EncryptedFileManager import EncryptedFileManager -from lbrynet.p2p.RateLimiter import DummyRateLimiter -from lbrynet.extras.daemon.storage import SQLiteStorage -from lbrynet.p2p.PaymentRateManager import OnlyFreePaymentsManager -from tests import mocks -from tests.test_utils import mk_db_and_blob_dir, rm_db_and_blob_dir - - -@skip -class TestReflector(unittest.TestCase): - - @defer.inlineCallbacks - def setUp(self): - self.reflector_port = None - self.port = None - mocks.mock_conf_settings(self) - self.server_db_dir, self.server_blob_dir = mk_db_and_blob_dir() - self.client_db_dir, self.client_blob_dir = mk_db_and_blob_dir() - prm = OnlyFreePaymentsManager() - wallet = mocks.Wallet() - peer_manager = PeerManager() - peer_finder = mocks.PeerFinder(5553, peer_manager, 2) - self.server_storage = SQLiteStorage(':memory:') - self.server_blob_manager = BlobManager.DiskBlobManager(self.server_blob_dir, self.server_storage) - self.client_storage = SQLiteStorage(':memory:') - self.client_blob_manager = BlobManager.DiskBlobManager(self.client_blob_dir, self.client_storage) - self.server_lbry_file_manager = EncryptedFileManager( - peer_finder, DummyRateLimiter(), self.server_blob_manager, wallet, prm, self.server_storage, - StreamDescriptor.StreamDescriptorIdentifier() - ) - self.client_lbry_file_manager = EncryptedFileManager( - peer_finder, DummyRateLimiter(), self.client_blob_manager, wallet, prm, self.client_storage, - StreamDescriptor.StreamDescriptorIdentifier() - ) - - self.expected_blobs = [ - ( - 'dc4708f76a5e7af0f1cae0ee96b824e2ed9250c9346c093b' - '441f0a20d3607c17948b6fcfb4bc62020fe5286693d08586', - 2097152 - ), - ( - 'f4067522c1b49432a2a679512e3917144317caa1abba0c04' - '1e0cd2cf9f635d4cf127ce1824fa04189b63916174951f70', - 2097152 - ), - ( - '305486c434260484fcb2968ce0e963b72f81ba56c11b08b1' - 'af0789b55b44d78422600f9a38e3cf4f2e9569897e5646a9', - 1015056 - ), - ] - - yield f2d(self.server_storage.open()) - yield f2d(self.server_blob_manager.setup()) - yield f2d(self.server_lbry_file_manager.setup()) - yield f2d(self.client_storage.open()) - yield f2d(self.client_blob_manager.setup()) - yield f2d(self.client_lbry_file_manager.setup()) - - @defer.inlineCallbacks - def verify_equal(sd_info, stream_hash): - self.assertDictEqual(mocks.create_stream_sd_file, sd_info) - sd_hash = yield f2d(self.client_storage.get_sd_blob_hash_for_stream(stream_hash)) - defer.returnValue(sd_hash) - - def save_sd_blob_hash(sd_hash): - self.sd_hash = sd_hash - self.expected_blobs.append((sd_hash, 923)) - - def verify_stream_descriptor_file(stream_hash): - self.stream_hash = stream_hash - d = f2d(get_sd_info(self.client_storage, stream_hash, True)) - d.addCallback(verify_equal, stream_hash) - d.addCallback(save_sd_blob_hash) - return d - - def create_stream(): - test_file = mocks.GenFile(5209343, bytes((i + 3) for i in range(0, 64, 6))) - d = EncryptedFileCreator.create_lbry_file( - self.client_blob_manager, self.client_storage, prm, self.client_lbry_file_manager, - "test_file", - test_file, - key=b"0123456701234567", - iv_generator=iv_generator() - ) - d.addCallback(lambda lbry_file: lbry_file.stream_hash) - return d - - def start_server(): - server_factory = ReflectorServerFactory(peer_manager, self.server_blob_manager, - self.server_lbry_file_manager) - from twisted.internet import reactor - port = 8943 - while self.reflector_port is None: - try: - self.reflector_port = reactor.listenTCP(port, server_factory) - self.port = port - except error.CannotListenError: - port += 1 - - stream_hash = yield create_stream() - yield verify_stream_descriptor_file(stream_hash) - yield start_server() - - @defer.inlineCallbacks - def tearDown(self): - lbry_files = self.client_lbry_file_manager.lbry_files - for lbry_file in lbry_files: - yield self.client_lbry_file_manager.delete_lbry_file(lbry_file) - yield self.client_lbry_file_manager.stop() - yield f2d(self.client_storage.close()) - self.reflector_port.stopListening() - lbry_files = self.server_lbry_file_manager.lbry_files - for lbry_file in lbry_files: - yield self.server_lbry_file_manager.delete_lbry_file(lbry_file) - yield self.server_lbry_file_manager.stop() - yield f2d(self.server_storage.close()) - try: - rm_db_and_blob_dir(self.client_db_dir, self.client_blob_dir) - except Exception as err: - raise unittest.SkipTest("TODO: fix this for windows") - try: - rm_db_and_blob_dir(self.server_db_dir, self.server_blob_dir) - except Exception as err: - raise unittest.SkipTest("TODO: fix this for windows") - if os.path.exists("test_file"): - os.remove("test_file") - - def test_stream_reflector(self): - def verify_blob_on_reflector(): - check_blob_ds = [] - for blob_hash, blob_size in self.expected_blobs: - check_blob_ds.append(verify_have_blob(blob_hash, blob_size)) - return defer.DeferredList(check_blob_ds) - - @defer.inlineCallbacks - def verify_stream_on_reflector(): - # check stream_info_manager has all the right information - streams = yield f2d(self.server_storage.get_all_streams()) - self.assertEqual(1, len(streams)) - self.assertEqual(self.stream_hash, streams[0]) - - blobs = yield f2d(self.server_storage.get_blobs_for_stream(self.stream_hash)) - blob_hashes = [b.blob_hash for b in blobs if b.blob_hash is not None] - expected_blob_hashes = [b[0] for b in self.expected_blobs[:-1] if b[0] is not None] - self.assertEqual(expected_blob_hashes, blob_hashes) - sd_hash = yield f2d(self.server_storage.get_sd_blob_hash_for_stream(streams[0])) - self.assertEqual(self.sd_hash, sd_hash) - - # check lbry file manager has the file - files = yield self.server_lbry_file_manager.lbry_files - - self.assertEqual(0, len(files)) - - streams = yield f2d(self.server_storage.get_all_streams()) - self.assertEqual(1, len(streams)) - stream_info = yield f2d(self.server_storage.get_stream_info(self.stream_hash)) - self.assertEqual(self.sd_hash, stream_info[3]) - self.assertEqual(hexlify(b'test_file').decode(), stream_info[0]) - - # check should_announce blobs on blob_manager - blob_hashes = yield f2d(self.server_storage.get_all_should_announce_blobs()) - self.assertSetEqual({self.sd_hash, expected_blob_hashes[0]}, set(blob_hashes)) - - def verify_have_blob(blob_hash, blob_size): - d = self.server_blob_manager.get_blob(blob_hash) - d.addCallback(lambda blob: verify_blob_completed(blob, blob_size)) - return d - - def send_to_server(): - factory = EncryptedFileReflectorClientFactory(self.client_blob_manager, self.stream_hash, self.sd_hash) - - from twisted.internet import reactor - reactor.connectTCP('localhost', self.port, factory) - return factory.finished_deferred - - def verify_blob_completed(blob, blob_size): - self.assertTrue(blob.get_is_verified()) - self.assertEqual(blob_size, blob.length) - return - - d = send_to_server() - d.addCallback(lambda _: verify_blob_on_reflector()) - d.addCallback(lambda _: verify_stream_on_reflector()) - return d - - def test_blob_reflector(self): - def verify_data_on_reflector(): - check_blob_ds = [] - for blob_hash, blob_size in self.expected_blobs: - check_blob_ds.append(verify_have_blob(blob_hash, blob_size)) - return defer.DeferredList(check_blob_ds) - - def verify_have_blob(blob_hash, blob_size): - d = self.server_blob_manager.get_blob(blob_hash) - d.addCallback(lambda blob: verify_blob_completed(blob, blob_size)) - return d - - def send_to_server(blob_hashes_to_send): - factory = BlobReflectorClientFactory( - self.client_blob_manager, - blob_hashes_to_send - ) - - from twisted.internet import reactor - reactor.connectTCP('localhost', self.port, factory) - return factory.finished_deferred - - def verify_blob_completed(blob, blob_size): - self.assertTrue(blob.get_is_verified()) - self.assertEqual(blob_size, blob.length) - - d = send_to_server([x[0] for x in self.expected_blobs]) - d.addCallback(lambda _: verify_data_on_reflector()) - return d - - def test_blob_reflector_v1(self): - @defer.inlineCallbacks - def verify_stream_on_reflector(): - # this protocol should not have any impact on stream info manager - streams = yield f2d(self.server_storage.get_all_streams()) - self.assertEqual(0, len(streams)) - # there should be no should announce blobs here - blob_hashes = yield f2d(self.server_storage.get_all_should_announce_blobs()) - self.assertEqual(0, len(blob_hashes)) - - def verify_data_on_reflector(): - check_blob_ds = [] - for blob_hash, blob_size in self.expected_blobs: - check_blob_ds.append(verify_have_blob(blob_hash, blob_size)) - return defer.DeferredList(check_blob_ds) - - def verify_have_blob(blob_hash, blob_size): - d = self.server_blob_manager.get_blob(blob_hash) - d.addCallback(lambda blob: verify_blob_completed(blob, blob_size)) - return d - - def send_to_server(blob_hashes_to_send): - factory = BlobReflectorClientFactory( - self.client_blob_manager, - blob_hashes_to_send - ) - factory.protocol_version = 0 - - from twisted.internet import reactor - reactor.connectTCP('localhost', self.port, factory) - return factory.finished_deferred - - def verify_blob_completed(blob, blob_size): - self.assertTrue(blob.get_is_verified()) - self.assertEqual(blob_size, blob.length) - - d = send_to_server([x[0] for x in self.expected_blobs]) - d.addCallback(lambda _: verify_data_on_reflector()) - return d - - # test case when we reflect blob, and than that same blob - # is reflected as stream - @defer.inlineCallbacks - def test_blob_reflect_and_stream(self): - - def verify_blob_on_reflector(): - check_blob_ds = [] - for blob_hash, blob_size in self.expected_blobs: - check_blob_ds.append(verify_have_blob(blob_hash, blob_size)) - return defer.DeferredList(check_blob_ds) - - @defer.inlineCallbacks - def verify_stream_on_reflector(): - # check stream_info_manager has all the right information - - streams = yield f2d(self.server_storage.get_all_streams()) - self.assertEqual(1, len(streams)) - self.assertEqual(self.stream_hash, streams[0]) - - blobs = yield f2d(self.server_storage.get_blobs_for_stream(self.stream_hash)) - blob_hashes = [b.blob_hash for b in blobs if b.blob_hash is not None] - expected_blob_hashes = [b[0] for b in self.expected_blobs[:-1] if b[0] is not None] - self.assertEqual(expected_blob_hashes, blob_hashes) - sd_hash = yield f2d(self.server_storage.get_sd_blob_hash_for_stream(self.stream_hash)) - self.assertEqual(self.sd_hash, sd_hash) - - # check should_announce blobs on blob_manager - to_announce = yield f2d(self.server_storage.get_all_should_announce_blobs()) - self.assertSetEqual(set(to_announce), {self.sd_hash, expected_blob_hashes[0]}) - - def verify_have_blob(blob_hash, blob_size): - d = self.server_blob_manager.get_blob(blob_hash) - d.addCallback(lambda blob: verify_blob_completed(blob, blob_size)) - return d - - def send_to_server_as_blobs(blob_hashes_to_send): - factory = BlobReflectorClientFactory( - self.client_blob_manager, - blob_hashes_to_send - ) - factory.protocol_version = 0 - - from twisted.internet import reactor - reactor.connectTCP('localhost', self.port, factory) - return factory.finished_deferred - - def send_to_server_as_stream(result): - factory = EncryptedFileReflectorClientFactory(self.client_blob_manager, self.stream_hash, self.sd_hash) - - from twisted.internet import reactor - reactor.connectTCP('localhost', self.port, factory) - return factory.finished_deferred - - def verify_blob_completed(blob, blob_size): - self.assertTrue(blob.get_is_verified()) - self.assertEqual(blob_size, blob.length) - - # Modify this to change which blobs to send - blobs_to_send = self.expected_blobs - - finished = yield send_to_server_as_blobs([x[0] for x in self.expected_blobs]) - yield send_to_server_as_stream(finished) - yield verify_blob_on_reflector() - yield verify_stream_on_reflector() - - -def iv_generator(): - iv = 0 - while True: - iv += 1 - yield b"%016d" % iv diff --git a/tests/functional/test_streamify.py b/tests/functional/test_streamify.py deleted file mode 100644 index 1e3c351da..000000000 --- a/tests/functional/test_streamify.py +++ /dev/null @@ -1,109 +0,0 @@ -import os -import shutil -import tempfile -from hashlib import md5 -from twisted.trial.unittest import TestCase -from twisted.internet import defer, threads -from lbrynet.conf import Config -from lbrynet.p2p.StreamDescriptor import StreamDescriptorIdentifier -from lbrynet.p2p.BlobManager import DiskBlobManager -from lbrynet.p2p.StreamDescriptor import get_sd_info -from lbrynet.p2p.RateLimiter import DummyRateLimiter -from lbrynet.extras.compat import f2d -from lbrynet.extras.daemon.PeerManager import PeerManager -from lbrynet.extras.daemon.storage import SQLiteStorage -from lbrynet.p2p.PaymentRateManager import OnlyFreePaymentsManager -from lbrynet.blob.EncryptedFileCreator import create_lbry_file -from lbrynet.blob.EncryptedFileManager import EncryptedFileManager -from tests import mocks - - -FakeNode = mocks.Node -FakeWallet = mocks.Wallet -FakePeerFinder = mocks.PeerFinder -FakeAnnouncer = mocks.Announcer -GenFile = mocks.GenFile -test_create_stream_sd_file = mocks.create_stream_sd_file -DummyBlobAvailabilityTracker = mocks.BlobAvailabilityTracker - - -class TestStreamify(TestCase): - maxDiff = 5000 - - @defer.inlineCallbacks - def setUp(self): - self.session = None - self.lbry_file_manager = None - self.is_generous = True - self.db_dir = tempfile.mkdtemp() - self.blob_dir = os.path.join(self.db_dir, "blobfiles") - conf = Config(data_dir=self.blob_dir) - os.mkdir(self.blob_dir) - self.dht_node = FakeNode() - self.wallet = FakeWallet() - self.peer_manager = PeerManager() - self.peer_finder = FakePeerFinder(5553, self.peer_manager, 2) - self.rate_limiter = DummyRateLimiter() - self.sd_identifier = StreamDescriptorIdentifier() - self.storage = SQLiteStorage(conf, ':memory:') - self.blob_manager = DiskBlobManager(self.blob_dir, self.storage, self.dht_node._dataStore) - self.prm = OnlyFreePaymentsManager() - self.lbry_file_manager = EncryptedFileManager( - conf, self.peer_finder, self.rate_limiter, self.blob_manager, self.wallet, self.prm, self.storage, - self.sd_identifier - ) - yield f2d(self.storage.open()) - yield f2d(self.lbry_file_manager.setup()) - - @defer.inlineCallbacks - def tearDown(self): - lbry_files = self.lbry_file_manager.lbry_files - for lbry_file in lbry_files: - yield self.lbry_file_manager.delete_lbry_file(lbry_file) - yield self.lbry_file_manager.stop() - yield f2d(self.storage.close()) - shutil.rmtree(self.db_dir, ignore_errors=True) - if os.path.exists("test_file"): - os.remove("test_file") - - def test_create_stream(self): - - def verify_equal(sd_info): - self.assertEqual(sd_info, test_create_stream_sd_file) - - def verify_stream_descriptor_file(stream_hash): - d = f2d(get_sd_info(self.storage, stream_hash, True)) - d.addCallback(verify_equal) - return d - - def iv_generator(): - iv = 0 - while 1: - iv += 1 - yield b"%016d" % iv - - def create_stream(): - test_file = GenFile(5209343, bytes((i + 3) for i in range(0, 64, 6))) - d = create_lbry_file( - self.blob_manager, self.storage, self.prm, self.lbry_file_manager, "test_file", test_file, - key=b'0123456701234567', iv_generator=iv_generator() - ) - d.addCallback(lambda lbry_file: lbry_file.stream_hash) - return d - - d = create_stream() - d.addCallback(verify_stream_descriptor_file) - return d - - @defer.inlineCallbacks - def test_create_and_combine_stream(self): - test_file = GenFile(53209343, bytes((i + 5) for i in range(0, 64, 6))) - lbry_file = yield create_lbry_file(self.blob_manager, self.storage, self.prm, self.lbry_file_manager, - "test_file", test_file) - sd_hash = yield f2d(self.storage.get_sd_blob_hash_for_stream(lbry_file.stream_hash)) - self.assertTrue(lbry_file.sd_hash, sd_hash) - yield lbry_file.start() - f = open('test_file', 'rb') - hashsum = md5() - hashsum.update(f.read()) - self.assertEqual(hashsum.hexdigest(), "68959747edc73df45e45db6379dd7b3b") diff --git a/tests/mocks.py b/tests/mocks.py deleted file mode 100644 index 7f4cb6775..000000000 --- a/tests/mocks.py +++ /dev/null @@ -1,474 +0,0 @@ -import asyncio -import base64 -import io -from unittest import mock - -from cryptography.hazmat.backends import default_backend -from cryptography.hazmat.primitives.asymmetric import rsa -from cryptography.hazmat.primitives import serialization -from twisted.internet import defer -from twisted.python.failure import Failure -from lbrynet.conf import Config -from lbrynet.p2p.client.ClientRequest import ClientRequest -from lbrynet.p2p.Error import RequestCanceledError -from lbrynet.p2p import BlobAvailability -from lbrynet.blob.EncryptedFileManager import EncryptedFileManager -from lbrynet.dht.node import Node as RealNode -from lbrynet.extras.daemon import exchange_rate_manager as ERM - -KB = 2**10 -PUBLIC_EXPONENT = 65537 # http://www.daemonology.net/blog/2009-06-11-cryptographic-right-answers.html - - -def decode_rsa_key(pem_key): - decoded = base64.b64decode(''.join(pem_key.splitlines()[1:-1])) - return serialization.load_der_public_key(decoded, default_backend()) - - -class FakeLBRYFile: - def __init__(self, blob_manager, stream_info_manager, stream_hash, uri="fake_uri"): - self.blob_manager = blob_manager - self.stream_info_manager = stream_info_manager - self.stream_hash = stream_hash - self.file_name = 'fake_lbry_file' - - -class Node(RealNode): - def joinNetwork(self, known_node_addresses=None): - return defer.succeed(None) - - def stop(self): - return defer.succeed(None) - - def start(self, known_node_addresses=None): - return self.joinNetwork(known_node_addresses) - - -class FakeNetwork: - @staticmethod - def get_local_height(): - return 1 - - @staticmethod - def get_server_height(): - return 1 - - -class BTCLBCFeed(ERM.MarketFeed): - def __init__(self): - ERM.MarketFeed.__init__( - self, - "BTCLBC", - "market name", - "derp.com", - None, - 0.0 - ) - - -class USDBTCFeed(ERM.MarketFeed): - def __init__(self): - ERM.MarketFeed.__init__( - self, - "USDBTC", - "market name", - "derp.com", - None, - 0.0 - ) - - -class ExchangeRateManager(ERM.ExchangeRateManager): - def __init__(self, market_feeds, rates): - self.market_feeds = market_feeds - for feed in self.market_feeds: - feed.rate = ERM.ExchangeRate( - feed.market, rates[feed.market]['spot'], rates[feed.market]['ts']) - - -class PointTraderKeyExchanger: - - def __init__(self, wallet): - self.wallet = wallet - self._protocols = [] - - def send_next_request(self, peer, protocol): - if not protocol in self._protocols: - r = ClientRequest({'public_key': self.wallet.encoded_public_key.decode()}, - 'public_key') - d = protocol.add_request(r) - d.addCallback(self._handle_exchange_response, peer, r, protocol) - d.addErrback(self._request_failed, peer) - self._protocols.append(protocol) - return defer.succeed(True) - else: - return defer.succeed(False) - - def _handle_exchange_response(self, response_dict, peer, request, protocol): - assert request.response_identifier in response_dict, \ - "Expected %s in dict but did not get it" % request.response_identifier - assert protocol in self._protocols, "Responding protocol is not in our list of protocols" - peer_pub_key = response_dict[request.response_identifier] - self.wallet.set_public_key_for_peer(peer, peer_pub_key) - return True - - def _request_failed(self, err, peer): - if not err.check(RequestCanceledError): - return err - - -class PointTraderKeyQueryHandlerFactory: - - def __init__(self, wallet): - self.wallet = wallet - - def build_query_handler(self): - q_h = PointTraderKeyQueryHandler(self.wallet) - return q_h - - def get_primary_query_identifier(self): - return 'public_key' - - def get_description(self): - return ("Point Trader Address - an address for receiving payments on the " - "point trader testing network") - - -class PointTraderKeyQueryHandler: - - def __init__(self, wallet): - self.wallet = wallet - self.query_identifiers = ['public_key'] - self.public_key = None - self.peer = None - - def register_with_request_handler(self, request_handler, peer): - self.peer = peer - request_handler.register_query_handler(self, self.query_identifiers) - - def handle_queries(self, queries): - if self.query_identifiers[0] in queries: - new_encoded_pub_key = queries[self.query_identifiers[0]] - try: - decode_rsa_key(new_encoded_pub_key) - except (ValueError, TypeError, IndexError): - raise ValueError(f"Client sent an invalid public key: {new_encoded_pub_key}") - self.public_key = new_encoded_pub_key - self.wallet.set_public_key_for_peer(self.peer, self.public_key) - fields = {'public_key': self.wallet.encoded_public_key.decode()} - return fields - if self.public_key is None: - raise ValueError("Expected but did not receive a public key") - else: - return {} - - -class Wallet: - def __init__(self): - self.private_key = rsa.generate_private_key(public_exponent=PUBLIC_EXPONENT, - key_size=1024, backend=default_backend()) - self.encoded_public_key = self.private_key.public_key().public_bytes(serialization.Encoding.PEM, - serialization.PublicFormat.PKCS1) - self._config = None - self.network = None - self.wallet = None - self.is_first_run = False - self.printed_retrieving_headers = False - self._start_check = None - self._catch_up_check = None - self._caught_up_counter = 0 - self._lag_counter = 0 - self.blocks_behind = 0 - self.catchup_progress = 0 - self.max_behind = 0 - - def start(self): - return defer.succeed(True) - - def stop(self): - return defer.succeed(True) - - def get_info_exchanger(self): - return PointTraderKeyExchanger(self) - - def update_peer_address(self, peer, address): - pass - - def get_wallet_info_query_handler_factory(self): - return PointTraderKeyQueryHandlerFactory(self) - - def get_unused_address_for_peer(self, peer): - return defer.succeed("bDtL6qriyimxz71DSYjojTBsm6cpM1bqmj") - - def reserve_points(self, *args): - return True - - def cancel_point_reservation(self, *args): - pass - - def send_points(self, *args): - return defer.succeed(True) - - def add_expected_payment(self, *args): - pass - - def get_balance(self): - return defer.succeed(1000) - - def set_public_key_for_peer(self, peer, public_key): - pass - - def get_claim_metadata_for_sd_hash(self, sd_hash): - return "fakeuri", "aa04a949348f9f094d503e5816f0cfb57ee68a22f6d08d149217d071243e0377", 1 - - def get_claimid(self, name, txid=None, nout=None): - return "aa04a949348f9f094d503e5816f0cfb57ee68a22f6d08d149217d071243e0378" - - -class PeerFinder: - def __init__(self, start_port, peer_manager, num_peers): - self.start_port = start_port - self.peer_manager = peer_manager - self.num_peers = num_peers - self.count = 0 - - def find_peers_for_blob(self, h, filter_self=False): - peer_port = self.start_port + self.count - self.count += 1 - if self.count >= self.num_peers: - self.count = 0 - return defer.succeed([self.peer_manager.get_peer("127.0.0.1", peer_port)]) - - def run_manage_loop(self): - pass - - def stop(self): - pass - - -class Announcer: - def __init__(self, *args): - pass - - def hash_queue_size(self): - return 0 - - def immediate_announce(self, *args): - pass - - def start(self): - pass - - def stop(self): - pass - - -class GenFile(io.RawIOBase): - def __init__(self, size, pattern): - io.RawIOBase.__init__(self) - self.size = size - self.pattern = pattern - self.read_so_far = 0 - self.buff = b'' - self.last_offset = 0 - self.name = "." - - def readable(self): - return True - - def writable(self): - return False - - def read(self, n=-1): - if n > -1: - bytes_to_read = min(n, self.size - self.read_so_far) - else: - bytes_to_read = self.size - self.read_so_far - output, self.buff = self.buff[:bytes_to_read], self.buff[bytes_to_read:] - bytes_to_read -= len(output) - while bytes_to_read > 0: - self.buff = self._generate_chunk() - new_output, self.buff = self.buff[:bytes_to_read], self.buff[bytes_to_read:] - bytes_to_read -= len(new_output) - output += new_output - self.read_so_far += len(output) - return output - - def readall(self): - return self.read() - - def _generate_chunk(self, size=KB): - output = self.pattern[self.last_offset:self.last_offset + size] - n_left = size - len(output) - whole_patterns = n_left // len(self.pattern) - output += self.pattern * whole_patterns - self.last_offset = size - len(output) - output += self.pattern[:self.last_offset] - return output - - -class BlobAvailabilityTracker(BlobAvailability.BlobAvailabilityTracker): - """ - Class to track peer counts for known blobs, and to discover new popular blobs - - Attributes: - availability (dict): dictionary of peers for known blobs - """ - - def __init__(self, blob_manager=None, peer_finder=None, dht_node=None): - self.availability = { - '91dc64cf1ff42e20d627b033ad5e4c3a4a96856ed8a6e3fb' - '4cd5fa1cfba4bf72eefd325f579db92f45f4355550ace8e7': ['1.2.3.4'], - 'b2e48bb4c88cf46b76adf0d47a72389fae0cd1f19ed27dc5' - '09138c99509a25423a4cef788d571dca7988e1dca69e6fa0': ['1.2.3.4', '1.2.3.4'], - '6af95cd062b4a179576997ef1054c9d2120f8592eea045e9' - '667bea411d520262cd5a47b137eabb7a7871f5f8a79c92dd': - ['1.2.3.4', '1.2.3.4', '1.2.3.4'], - '6d8017aba362e5c5d0046625a039513419810a0397d72831' - '8c328a5cc5d96efb589fbca0728e54fe5adbf87e9545ee07': - ['1.2.3.4', '1.2.3.4', '1.2.3.4', '1.2.3.4'], - '5a450b416275da4bdff604ee7b58eaedc7913c5005b7184f' - 'c3bc5ef0b1add00613587f54217c91097fc039ed9eace9dd': - ['1.2.3.4', '1.2.3.4', '1.2.3.4', '1.2.3.4', '1.2.3.4'], - 'd7c82e6cac093b3f16107d2ae2b2c75424f1fcad2c7fbdbe' - '66e4a13c0b6bd27b67b3a29c403b82279ab0f7c1c48d6787': - ['1.2.3.4', '1.2.3.4', '1.2.3.4', '1.2.3.4', '1.2.3.4', '1.2.3.4'], - '9dbda74a472a2e5861a5d18197aeba0f5de67c67e401124c' - '243d2f0f41edf01d7a26aeb0b5fc9bf47f6361e0f0968e2c': - ['1.2.3.4', '1.2.3.4', '1.2.3.4', '1.2.3.4', '1.2.3.4', '1.2.3.4', '1.2.3.4'], - '8c70d5e2f5c3a6085006198e5192d157a125d92e73787944' - '72007a61947992768926513fc10924785bdb1761df3c37e6': - ['1.2.3.4', '1.2.3.4', '1.2.3.4', '1.2.3.4', '1.2.3.4', '1.2.3.4', '1.2.3.4', - '1.2.3.4'], - 'f99d24cd50d4bfd77c2598bfbeeb8415bf0feef21200bdf0' - 'b8fbbde7751a77b7a2c68e09c25465a2f40fba8eecb0b4e0': - ['1.2.3.4', '1.2.3.4', '1.2.3.4', '1.2.3.4', '1.2.3.4', '1.2.3.4', '1.2.3.4', - '1.2.3.4', '1.2.3.4'], - 'c84aa1fd8f5009f7c4e71e444e40d95610abc1480834f835' - 'eefb267287aeb10025880a3ce22580db8c6d92efb5bc0c9c': - ['1.2.3.4', '1.2.3.4', '1.2.3.4', '1.2.3.4', '1.2.3.4', '1.2.3.4', '1.2.3.4', - '1.2.3.4', '1.2.3.4', '1.2.3.4'], - } - self._blob_manager = None - self._peer_finder = PeerFinder(11223, 11224, 2) - self._dht_node = None - self._check_popular = None - self._check_mine = None - self._set_mean_peers() - - def start(self): - pass - - def stop(self): - pass - - -# The components below viz. FakeWallet, FakeSession, FakeFileManager are just for testing Component Manager's -# startup and stop -class FakeComponent: - depends_on = [] - component_name = None - - def __init__(self, component_manager): - self.component_manager = component_manager - self._running = False - - @property - def running(self): - return self._running - - async def start(self): - pass - - async def stop(self): - pass - - @property - def component(self): - return self - - async def _setup(self): - result = await self.start() - self._running = True - return result - - async def _stop(self): - result = await self.stop() - self._running = False - return result - - async def get_status(self): - return {} - - def __lt__(self, other): - return self.component_name < other.component_name - - -class FakeDelayedWallet(FakeComponent): - component_name = "wallet" - depends_on = [] - - async def stop(self): - await asyncio.sleep(1) - - -class FakeDelayedBlobManager(FakeComponent): - component_name = "blob_manager" - depends_on = [FakeDelayedWallet.component_name] - - async def start(self): - await asyncio.sleep(1) - - async def stop(self): - await asyncio.sleep(1) - - -class FakeDelayedFileManager(FakeComponent): - component_name = "file_manager" - depends_on = [FakeDelayedBlobManager.component_name] - - async def start(self): - await asyncio.sleep(1) - - -class FakeFileManager(FakeComponent): - component_name = "file_manager" - depends_on = [] - - @property - def component(self): - return mock.Mock(spec=EncryptedFileManager) - - -create_stream_sd_file = { - 'stream_name': '746573745f66696c65', - 'blobs': [ - { - 'length': 2097152, - 'blob_num': 0, - 'blob_hash': 'dc4708f76a5e7af0f1cae0ee96b824e2ed9250c9346c093b' - '441f0a20d3607c17948b6fcfb4bc62020fe5286693d08586', - 'iv': '30303030303030303030303030303031' - }, - { - 'length': 2097152, - 'blob_num': 1, - 'blob_hash': 'f4067522c1b49432a2a679512e3917144317caa1abba0c04' - '1e0cd2cf9f635d4cf127ce1824fa04189b63916174951f70', - 'iv': '30303030303030303030303030303032' - }, - { - 'length': 1015056, - 'blob_num': 2, - 'blob_hash': '305486c434260484fcb2968ce0e963b72f81ba56c11b08b1' - 'af0789b55b44d78422600f9a38e3cf4f2e9569897e5646a9', - 'iv': '30303030303030303030303030303033' - }, - {'length': 0, 'blob_num': 3, 'iv': '30303030303030303030303030303034'} - ], - 'stream_type': 'lbryfile', - 'key': '30313233343536373031323334353637', - 'suggested_file_name': '746573745f66696c65', - 'stream_hash': '6d27fbe10c86d81aacfb897c7a426d0a2214f5a299455a6d' - '315c0f998c4b3545c2dc60906122d94653c23b1898229e3f' -} diff --git a/tests/unit/core/client/__init__.py b/tests/unit/core/client/__init__.py deleted file mode 100644 index e69de29bb..000000000 diff --git a/tests/unit/core/client/test_ConnectionManager.py b/tests/unit/core/client/test_ConnectionManager.py deleted file mode 100644 index 11a8f2478..000000000 --- a/tests/unit/core/client/test_ConnectionManager.py +++ /dev/null @@ -1,269 +0,0 @@ -from twisted.trial.unittest import TestCase -from twisted.internet import defer, reactor, task -from twisted.internet.task import deferLater -from twisted.internet.protocol import ServerFactory - -from lbrynet import conf, utils -from lbrynet.p2p.client.ClientRequest import ClientRequest -from lbrynet.p2p.server.ServerProtocol import ServerProtocol -from lbrynet.p2p.client.ClientProtocol import ClientProtocol -from lbrynet.p2p.RateLimiter import RateLimiter -from lbrynet.p2p.Peer import Peer -from lbrynet.p2p.Error import NoResponseError -from lbrynet.extras.daemon.PeerManager import PeerManager -from lbrynet.conf import Config - -PEER_PORT = 5551 -LOCAL_HOST = '127.0.0.1' - - -class MocDownloader: - def insufficient_funds(self): - pass - - -class MocRequestCreator: - - def __init__(self, peers_to_return, peers_to_return_head_blob=None): - self.peers_to_return = peers_to_return - self.peers_to_return_head_blob = peers_to_return_head_blob or [] - self.sent_request = False - - def send_next_request(self, peer, protocol): - if self.sent_request is True: - return defer.succeed(False) - response_identifier = 'moc_request' - r_dict = {'moc_request':0} - request = ClientRequest(r_dict, response_identifier) - d = protocol.add_request(request) # ClientRequest here - d.addErrback(self.request_err, peer) - d.addCallback(self.request_success) - self.sent_request = True - return defer.succeed(True) - - def request_success(self, suc): - pass - - def request_err(self, err, peer): - if isinstance(err.value, NoResponseError): - return err - - def get_new_peers_for_next_unavailable(self): - return self.peers_to_return - - def get_new_peers_for_head_blob(self): - return self.peers_to_return_head_blob - - -class MocFunctionalQueryHandler: - - def __init__(self, clock, is_good=True, is_delayed=False): - self.query_identifiers = ['moc_request'] - self.is_good = is_good - self.is_delayed = is_delayed - self.clock = clock - - def register_with_request_handler(self, request_handler, peer): - request_handler.register_query_handler(self, self.query_identifiers) - - def handle_queries(self, queries): - if self.query_identifiers[0] in queries: - if self.is_delayed: - delay = ClientProtocol.PROTOCOL_TIMEOUT+1 - out = deferLater(self.clock, delay, lambda: {'moc_request':0}) - self.clock.advance(delay) - return out - if self.is_good: - return defer.succeed({'moc_request':0}) - else: - return defer.succeed({'bad_request':0}) - else: - return defer.succeed({}) - - -class MocQueryHandlerFactory: - # is is_good, the query handler works as expectd, - # is is_delayed, the query handler will delay its resposne - def __init__(self, clock, is_good=True, is_delayed=False): - self.is_good = is_good - self.is_delayed = is_delayed - self.clock = clock - - def build_query_handler(self): - return MocFunctionalQueryHandler(self.clock, self.is_good, self.is_delayed) - - def get_primary_query_identifier(self): - return 'moc_query' - - def get_description(self): - return "This is a Moc Query" - - -class MocServerProtocolFactory(ServerFactory): - protocol = ServerProtocol - - def __init__(self, clock, is_good=True, is_delayed=False, has_moc_query_handler=True): - self.rate_limiter = RateLimiter() - query_handler_factory = MocQueryHandlerFactory(clock, is_good, is_delayed) - if has_moc_query_handler: - self.query_handler_factories = { - query_handler_factory.get_primary_query_identifier():query_handler_factory - } - else: - self.query_handler_factories = {} - self.peer_manager = PeerManager() - - -class TestIntegrationConnectionManager(TestCase): - skip = 'times out, needs to be refactored to work with py3' - - def setUp(self): - - conf = Config() - - self.TEST_PEER = Peer(LOCAL_HOST, PEER_PORT) - self.downloader = MocDownloader() - self.downloader.conf = conf - self.rate_limiter = RateLimiter() - self.primary_request_creator = MocRequestCreator([self.TEST_PEER]) - self.clock = task.Clock() - utils.call_later = self.clock.callLater - self.server_port = None - - def _init_connection_manager(self, seek_head_blob_first=False): - # this import is required here so utils.call_later is replaced by self.clock.callLater - from lbrynet.p2p.client.ConnectionManager import ConnectionManager - self.connection_manager = ConnectionManager(self.downloader, self.rate_limiter, - [self.primary_request_creator], []) - self.connection_manager.seek_head_blob_first = seek_head_blob_first - self.connection_manager._start() - - def tearDown(self): - if self.server_port is not None: - self.server_port.stopListening() - self.connection_manager.stop() - conf.settings = None - - @defer.inlineCallbacks - def test_success(self): - self._init_connection_manager() - # test to see that if we setup a server, we get a connection - self.server = MocServerProtocolFactory(self.clock) - self.server_port = reactor.listenTCP(PEER_PORT, self.server, interface=LOCAL_HOST) - yield self.connection_manager.manage(schedule_next_call=False) - self.assertEqual(1, self.connection_manager.num_peer_connections()) - connection_made = yield self.connection_manager._peer_connections[self.TEST_PEER].\ - factory.connection_was_made_deferred - self.assertEqual(0, self.connection_manager.num_peer_connections()) - self.assertTrue(connection_made) - self.assertEqual(1, self.TEST_PEER.success_count) - self.assertEqual(0, self.TEST_PEER.down_count) - - @defer.inlineCallbacks - def test_server_with_improper_reply(self): - self._init_connection_manager() - self.server = MocServerProtocolFactory(self.clock, is_good=False) - self.server_port = reactor.listenTCP(PEER_PORT, self.server, interface=LOCAL_HOST) - yield self.connection_manager.manage(schedule_next_call=False) - self.assertEqual(1, self.connection_manager.num_peer_connections()) - connection_made = yield self.connection_manager._peer_connections[self.TEST_PEER].\ - factory.connection_was_made_deferred - self.assertEqual(0, self.connection_manager.num_peer_connections()) - self.assertTrue(connection_made) - self.assertEqual(0, self.TEST_PEER.success_count) - self.assertEqual(1, self.TEST_PEER.down_count) - - @defer.inlineCallbacks - def test_non_existing_server(self): - # Test to see that if we don't setup a server, we don't get a connection - - self._init_connection_manager() - yield self.connection_manager.manage(schedule_next_call=False) - self.assertEqual(1, self.connection_manager.num_peer_connections()) - connection_made = yield self.connection_manager._peer_connections[self.TEST_PEER].\ - factory.connection_was_made_deferred - self.assertEqual(0, self.connection_manager.num_peer_connections()) - self.assertFalse(connection_made) - self.assertEqual(0, self.connection_manager.num_peer_connections()) - self.assertEqual(0, self.TEST_PEER.success_count) - self.assertEqual(1, self.TEST_PEER.down_count) - - @defer.inlineCallbacks - def test_parallel_connections(self): - # Test to see that we make two new connections at a manage call, - # without it waiting for the connection to complete - - self._init_connection_manager() - test_peer2 = Peer(LOCAL_HOST, PEER_PORT+1) - self.primary_request_creator.peers_to_return = [self.TEST_PEER, test_peer2] - yield self.connection_manager.manage(schedule_next_call=False) - self.assertEqual(2, self.connection_manager.num_peer_connections()) - self.assertIn(self.TEST_PEER, self.connection_manager._peer_connections) - self.assertIn(test_peer2, self.connection_manager._peer_connections) - - deferred_conn_made_peer1 = self.connection_manager._peer_connections[self.TEST_PEER].\ - factory.connection_was_made_deferred - deferred_conn_made_peer1.addCallback(lambda conn_made: self.assertFalse(conn_made)) - - deferred_conn_made_peer2 = self.connection_manager._peer_connections[test_peer2].\ - factory.connection_was_made_deferred - deferred_conn_made_peer2.addCallback(lambda conn_made: self.assertFalse(conn_made)) - - yield deferred_conn_made_peer1 - yield deferred_conn_made_peer2 - - self.assertEqual(0, self.connection_manager.num_peer_connections()) - self.assertEqual(0, self.TEST_PEER.success_count) - self.assertEqual(1, self.TEST_PEER.down_count) - self.assertEqual(0, test_peer2.success_count) - self.assertEqual(1, test_peer2.down_count) - - @defer.inlineCallbacks - def test_stop(self): - # test to see that when we call stop, the ConnectionManager waits for the - # current manage call to finish, closes connections, - # and removes scheduled manage calls - self._init_connection_manager() - self.connection_manager.manage(schedule_next_call=True) - yield self.connection_manager.stop() - self.assertEqual(0, self.TEST_PEER.success_count) - self.assertEqual(1, self.TEST_PEER.down_count) - self.assertEqual(0, self.connection_manager.num_peer_connections()) - self.assertIsNone(self.connection_manager._next_manage_call) - - @defer.inlineCallbacks - def test_closed_connection_when_server_is_slow(self): - self._init_connection_manager() - self.server = MocServerProtocolFactory( - self.clock, has_moc_query_handler=True, is_delayed=True) - self.server_port = reactor.listenTCP(PEER_PORT, self.server, interface=LOCAL_HOST) - - yield self.connection_manager.manage(schedule_next_call=False) - self.assertEqual(1, self.connection_manager.num_peer_connections()) - connection_made = yield self.connection_manager._peer_connections[self.TEST_PEER].\ - factory.connection_was_made_deferred - self.assertEqual(0, self.connection_manager.num_peer_connections()) - self.assertTrue(connection_made) - self.assertEqual(0, self.TEST_PEER.success_count) - self.assertEqual(1, self.TEST_PEER.down_count) - - # test header first seeks - @defer.inlineCallbacks - def test_no_peer_for_head_blob(self): - # test that if we can't find blobs for the head blob, - # it looks at the next unavailable and makes connection - self._init_connection_manager(seek_head_blob_first=True) - self.server = MocServerProtocolFactory(self.clock) - self.server_port = reactor.listenTCP(PEER_PORT, self.server, interface=LOCAL_HOST) - - self.primary_request_creator.peers_to_return_head_blob = [] - self.primary_request_creator.peers_to_return = [self.TEST_PEER] - - yield self.connection_manager.manage(schedule_next_call=False) - self.assertEqual(1, self.connection_manager.num_peer_connections()) - connection_made = yield self.connection_manager._peer_connections[self.TEST_PEER].\ - factory.connection_was_made_deferred - self.assertEqual(0, self.connection_manager.num_peer_connections()) - self.assertTrue(connection_made) - self.assertEqual(1, self.TEST_PEER.success_count) - self.assertEqual(0, self.TEST_PEER.down_count) diff --git a/tests/unit/core/server/__init__.py b/tests/unit/core/server/__init__.py deleted file mode 100644 index e69de29bb..000000000 diff --git a/tests/unit/core/server/test_BlobRequestHandler.py b/tests/unit/core/server/test_BlobRequestHandler.py deleted file mode 100644 index fe7c9a8b8..000000000 --- a/tests/unit/core/server/test_BlobRequestHandler.py +++ /dev/null @@ -1,126 +0,0 @@ -from io import BytesIO -from unittest import mock - -from twisted.internet import defer -from twisted.test import proto_helpers -from twisted.trial import unittest - -from lbrynet.p2p import Peer -from lbrynet.p2p.PaymentRateManager import NegotiatedPaymentRateManager, BasePaymentRateManager -from lbrynet.conf import Config -from tests.mocks import BlobAvailabilityTracker as DummyBlobAvailabilityTracker - - -class TestBlobRequestHandlerQueries(unittest.TestCase): - def setUp(self): - conf = Config() - self.blob_manager = mock.Mock() - self.payment_rate_manager = NegotiatedPaymentRateManager( - BasePaymentRateManager(0.001, conf.min_info_rate), DummyBlobAvailabilityTracker(), conf.is_generous_host - ) - from lbrynet.p2p.server import BlobRequestHandler - self.handler = BlobRequestHandler.BlobRequestHandler( - self.blob_manager, None, self.payment_rate_manager, None) - - def test_empty_response_when_empty_query(self): - self.assertEqual({}, self.handler.handle_queries({})) - - def test_error_set_when_rate_is_missing(self): - query = {'requested_blob': 'blob'} - response = {'incoming_blob': {'error': 'RATE_UNSET'}} - self.assertEqual(response, self.handler.handle_queries(query)) - - def test_error_set_when_rate_too_low(self): - query = { - 'blob_data_payment_rate': -1.0, - 'requested_blob': 'blob' - } - response = { - 'blob_data_payment_rate': 'RATE_TOO_LOW', - 'incoming_blob': {'error': 'RATE_UNSET'} - } - self.assertEqual(response, self.handler.handle_queries(query)) - - def test_response_when_rate_too_low(self): - query = { - 'blob_data_payment_rate': -1.0, - } - response = { - 'blob_data_payment_rate': 'RATE_TOO_LOW', - } - self.assertEqual(response, self.handler.handle_queries(query)) - - def test_blob_unavailable_when_blob_not_validated(self): - blob = mock.Mock() - blob.get_is_verified.return_value = False - self.blob_manager.get_blob.return_value = defer.succeed(blob) - query = { - 'blob_data_payment_rate': 1.0, - 'requested_blob': 'blob' - } - response = { - 'blob_data_payment_rate': 'RATE_ACCEPTED', - 'incoming_blob': {'error': 'BLOB_UNAVAILABLE'} - } - self.assertEqual(response, self.handler.handle_queries(query)) - - def test_blob_unavailable_when_blob_cannot_be_opened(self): - blob = mock.Mock() - blob.get_is_verified.return_value = True - blob.open_for_reading.return_value = None - self.blob_manager.get_blob.return_value = defer.succeed(blob) - query = { - 'blob_data_payment_rate': 0.0, - 'requested_blob': 'blob' - } - response = { - 'blob_data_payment_rate': 'RATE_ACCEPTED', - 'incoming_blob': {'error': 'BLOB_UNAVAILABLE'} - } - self.assertEqual(response, self.handler.handle_queries(query)) - - def test_blob_details_are_set_when_all_conditions_are_met(self): - blob = mock.Mock() - blob.get_is_verified.return_value = True - blob.open_for_reading.return_value = True - blob.blob_hash = 'DEADBEEF' - blob.length = 42 - peer = mock.Mock() - peer.host = "1.2.3.4" - self.handler.peer = peer - self.blob_manager.get_blob.return_value = defer.succeed(blob) - query = { - 'blob_data_payment_rate': 1.0, - 'requested_blob': 'blob' - } - response = { - 'blob_data_payment_rate': 'RATE_ACCEPTED', - 'incoming_blob': { - 'blob_hash': 'DEADBEEF', - 'length': 42 - } - } - self.assertEqual(response, self.handler.handle_queries(query)) - - -class TestBlobRequestHandlerSender(unittest.TestCase): - def test_nothing_happens_if_not_currently_uploading(self): - from lbrynet.p2p.server import BlobRequestHandler - handler = BlobRequestHandler.BlobRequestHandler(None, None, None, None) - handler.currently_uploading = None - deferred = handler.send_blob_if_requested(None) - self.assertTrue(self.successResultOf(deferred)) - - def test_file_is_sent_to_consumer(self): - # TODO: also check that the expected payment values are set - consumer = proto_helpers.StringTransport() - test_file = BytesIO(b'test') - from lbrynet.p2p.server import BlobRequestHandler - handler = BlobRequestHandler.BlobRequestHandler(None, None, None, None) - handler.peer = mock.create_autospec(Peer.Peer) - handler.currently_uploading = mock.Mock() - handler.read_handle = test_file - handler.send_blob_if_requested(consumer) - while consumer.producer: - consumer.producer.resumeProducing() - self.assertEqual(consumer.value(), b'test') diff --git a/tests/unit/core/test_BlobManager.py b/tests/unit/core/test_BlobManager.py deleted file mode 100644 index 7abec9370..000000000 --- a/tests/unit/core/test_BlobManager.py +++ /dev/null @@ -1,136 +0,0 @@ -import tempfile -import shutil -import os -import random -import string -from twisted.trial import unittest -from twisted.internet import defer - -from tests.test_utils import random_lbry_hash -from lbrynet.p2p.BlobManager import DiskBlobManager -from lbrynet.extras.compat import f2d -from lbrynet.extras.daemon.storage import SQLiteStorage -from lbrynet.p2p.Peer import Peer -from lbrynet.cryptoutils import get_lbry_hash_obj -from lbrynet.conf import Config - - -class BlobManagerTest(unittest.TestCase): - - @defer.inlineCallbacks - def setUp(self): - self.blob_dir = tempfile.mkdtemp() - self.db_dir = tempfile.mkdtemp() - self.bm = DiskBlobManager(self.blob_dir, SQLiteStorage(Config(data_dir=self.blob_dir), ':memory:')) - self.peer = Peer('somehost', 22) - yield f2d(self.bm.storage.open()) - - @defer.inlineCallbacks - def tearDown(self): - yield f2d(self.bm.storage.close()) - shutil.rmtree(self.blob_dir) - shutil.rmtree(self.db_dir) - - @defer.inlineCallbacks - def _create_and_add_blob(self, should_announce=False): - # create and add blob to blob manager - data_len = random.randint(1, 1000) - data = b''.join(random.choice(string.ascii_lowercase).encode() for _ in range(data_len)) - - hashobj = get_lbry_hash_obj() - hashobj.update(data) - out = hashobj.hexdigest() - blob_hash = out - - # create new blob - yield f2d(self.bm.setup()) - blob = self.bm.get_blob(blob_hash, len(data)) - - writer, finished_d = blob.open_for_writing(self.peer) - yield writer.write(data) - yield self.bm.blob_completed(blob, should_announce) - - # check to see if blob is there - self.assertTrue(os.path.isfile(os.path.join(self.blob_dir, blob_hash))) - blobs = yield self.bm.get_all_verified_blobs() - self.assertIn(blob_hash, blobs) - defer.returnValue(blob_hash) - - @defer.inlineCallbacks - def test_create_blob(self): - blob_hashes = [] - - # create a bunch of blobs - for i in range(0, 10): - blob_hash = yield self._create_and_add_blob() - blob_hashes.append(blob_hash) - blobs = yield self.bm.get_all_verified_blobs() - self.assertEqual(10, len(blobs)) - - @defer.inlineCallbacks - def test_delete_blob(self): - # create blob - blob_hash = yield self._create_and_add_blob() - blobs = yield self.bm.get_all_verified_blobs() - self.assertEqual(len(blobs), 1) - - # delete blob - yield self.bm.delete_blobs([blob_hash]) - self.assertFalse(os.path.isfile(os.path.join(self.blob_dir, blob_hash))) - blobs = yield self.bm.get_all_verified_blobs() - self.assertEqual(len(blobs), 0) - blobs = yield f2d(self.bm.storage.get_all_blob_hashes()) - self.assertEqual(len(blobs), 0) - self.assertNotIn(blob_hash, self.bm.blobs) - - # delete blob that was already deleted once - yield self.bm.delete_blobs([blob_hash]) - - # delete blob that does not exist, nothing will - # happen - blob_hash = random_lbry_hash() - yield self.bm.delete_blobs([blob_hash]) - - @defer.inlineCallbacks - def test_delete_open_blob(self): - # Test that a blob that is opened for writing will not be deleted - - # create blobs - blob_hash = None - blob_hashes = [] - for i in range(0, 10): - blob_hash = yield self._create_and_add_blob() - blob_hashes.append(blob_hash) - blobs = yield self.bm.get_all_verified_blobs() - self.assertEqual(len(blobs), 10) - - # open the last blob - blob = self.bm.get_blob(blob_hashes[-1]) - w, finished_d = blob.open_for_writing(self.peer) - - # schedule a close, just to leave the reactor clean - finished_d.addBoth(lambda x: None) - self.addCleanup(w.close) - - # delete the last blob and check if it still exists - yield self.bm.delete_blobs([blob_hash]) - blobs = yield self.bm.get_all_verified_blobs() - self.assertEqual(len(blobs), 10) - self.assertIn(blob_hashes[-1], blobs) - self.assertTrue(os.path.isfile(os.path.join(self.blob_dir, blob_hashes[-1]))) - - @defer.inlineCallbacks - def test_should_announce(self): - # create blob with should announce - blob_hash = yield self._create_and_add_blob(should_announce=True) - out = yield self.bm.get_should_announce(blob_hash) - self.assertTrue(out) - count = yield self.bm.count_should_announce_blobs() - self.assertEqual(1, count) - - # set should announce to False - yield self.bm.set_should_announce(blob_hash, should_announce=False) - out = yield self.bm.get_should_announce(blob_hash) - self.assertFalse(out) - count = yield self.bm.count_should_announce_blobs() - self.assertEqual(0, count) diff --git a/tests/unit/core/test_HTTPBlobDownloader.py b/tests/unit/core/test_HTTPBlobDownloader.py deleted file mode 100644 index dce9207dc..000000000 --- a/tests/unit/core/test_HTTPBlobDownloader.py +++ /dev/null @@ -1,97 +0,0 @@ -from unittest.mock import MagicMock - -from twisted.trial import unittest -from twisted.internet import defer - -from lbrynet.blob.blob_file import BlobFile -from lbrynet.p2p.HTTPBlobDownloader import HTTPBlobDownloader -from tests.test_utils import mk_db_and_blob_dir, rm_db_and_blob_dir - - -class HTTPBlobDownloaderTest(unittest.TestCase): - def setUp(self): - self.db_dir, self.blob_dir = mk_db_and_blob_dir() - self.blob_manager = MagicMock() - self.client = MagicMock() - self.blob_hash = ('d17272b17a1ad61c4316ac13a651c2b0952063214a81333e' - '838364b01b2f07edbd165bb7ec60d2fb2f337a2c02923852') - self.blob = BlobFile(self.blob_dir, self.blob_hash) - self.blob_manager.get_blob.side_effect = lambda _: self.blob - self.response = MagicMock(code=200, length=400) - self.client.get.side_effect = lambda uri: defer.succeed(self.response) - self.downloader = HTTPBlobDownloader( - self.blob_manager, [self.blob_hash], [('server1', 80)], self.client, retry=False - ) - self.downloader.interval = 0 - - def tearDown(self): - self.downloader.stop() - rm_db_and_blob_dir(self.db_dir, self.blob_dir) - - @defer.inlineCallbacks - def test_download_successful(self): - self.client.collect.side_effect = collect - yield self.downloader.start() - self.blob_manager.get_blob.assert_called_with(self.blob_hash) - self.client.get.assert_called_with(f'http://server1:80/{self.blob_hash}') - self.client.collect.assert_called() - self.assertEqual(self.blob.get_length(), self.response.length) - self.assertTrue(self.blob.get_is_verified()) - self.assertEqual(self.blob.writers, {}) - - @defer.inlineCallbacks - def test_download_invalid_content(self): - self.client.collect.side_effect = bad_collect - yield self.downloader.start() - self.assertEqual(self.blob.get_length(), self.response.length) - self.assertFalse(self.blob.get_is_verified()) - self.assertEqual(self.blob.writers, {}) - - @defer.inlineCallbacks - def test_peer_finished_first_causing_a_write_on_closed_handle(self): - self.client.collect.side_effect = lambda response, write: defer.fail(IOError('I/O operation on closed file')) - yield self.downloader.start() - self.blob_manager.get_blob.assert_called_with(self.blob_hash) - self.client.get.assert_called_with(f'http://server1:80/{self.blob_hash}') - self.client.collect.assert_called() - self.assertEqual(self.blob.get_length(), self.response.length) - self.assertEqual(self.blob.writers, {}) - - @defer.inlineCallbacks - def test_download_transfer_failed(self): - self.client.collect.side_effect = lambda response, write: defer.fail(Exception()) - yield self.downloader.start() - self.assertEqual(len(self.client.collect.mock_calls), self.downloader.max_failures) - self.blob_manager.get_blob.assert_called_with(self.blob_hash) - self.assertEqual(self.blob.get_length(), self.response.length) - self.assertFalse(self.blob.get_is_verified()) - self.assertEqual(self.blob.writers, {}) - - @defer.inlineCallbacks - def test_blob_not_found(self): - self.response.code = 404 - yield self.downloader.start() - self.blob_manager.get_blob.assert_called_with(self.blob_hash) - self.client.get.assert_called_with(f'http://server1:80/{self.blob_hash}') - self.client.collect.assert_not_called() - self.assertFalse(self.blob.get_is_verified()) - self.assertEqual(self.blob.writers, {}) - - def test_stop(self): - self.client.collect.side_effect = lambda response, write: defer.Deferred() - self.downloader.start() # hangs if yielded, as intended, to simulate a long ongoing write while we call stop - self.downloader.stop() - self.blob_manager.get_blob.assert_called_with(self.blob_hash) - self.client.get.assert_called_with(f'http://server1:80/{self.blob_hash}') - self.client.collect.assert_called() - self.assertEqual(self.blob.get_length(), self.response.length) - self.assertFalse(self.blob.get_is_verified()) - self.assertEqual(self.blob.writers, {}) - - -def collect(response, write): - write(b'f' * response.length) - - -def bad_collect(response, write): - write('0' * response.length) diff --git a/tests/unit/core/test_HashBlob.py b/tests/unit/core/test_HashBlob.py deleted file mode 100644 index 591008792..000000000 --- a/tests/unit/core/test_HashBlob.py +++ /dev/null @@ -1,162 +0,0 @@ -from lbrynet.blob.blob_file import BlobFile -from lbrynet.p2p.Error import DownloadCanceledError, InvalidDataError - -from tests.test_utils import mk_db_and_blob_dir, rm_db_and_blob_dir, random_lbry_hash -from twisted.trial import unittest -from twisted.internet import defer - - -class BlobFileTest(unittest.TestCase): - def setUp(self): - self.db_dir, self.blob_dir = mk_db_and_blob_dir() - self.fake_content_len = 64 - self.fake_content = b'0'*self.fake_content_len - self.fake_content_hash = '53871b26a08e90cb62142f2a39f0b80de41792322b0ca560' \ - '2b6eb7b5cf067c49498a7492bb9364bbf90f40c1c5412105' - - def tearDown(self): - rm_db_and_blob_dir(self.db_dir, self.blob_dir) - - @defer.inlineCallbacks - def test_good_write_and_read(self): - # test a write that should succeed - blob_file = BlobFile(self.blob_dir, self.fake_content_hash, self.fake_content_len) - self.assertFalse(blob_file.verified) - - writer, finished_d = blob_file.open_for_writing(peer=1) - writer.write(self.fake_content) - writer.close() - out = yield finished_d - self.assertIsInstance(out, BlobFile) - self.assertTrue(out.verified) - self.assertEqual(self.fake_content_len, out.get_length()) - - # read from the instance used to write to, and verify content - f = blob_file.open_for_reading() - c = f.read() - self.assertEqual(c, self.fake_content) - self.assertFalse(out.is_downloading()) - - # read from newly declared instance, and verify content - del blob_file - blob_file = BlobFile(self.blob_dir, self.fake_content_hash, self.fake_content_len) - self.assertTrue(blob_file.verified) - f = blob_file.open_for_reading() - self.assertEqual(1, blob_file.readers) - c = f.read() - self.assertEqual(c, self.fake_content) - - # close reader - f.close() - self.assertEqual(0, blob_file.readers) - - - @defer.inlineCallbacks - def test_delete(self): - blob_file = BlobFile(self.blob_dir, self.fake_content_hash, self.fake_content_len) - writer, finished_d = blob_file.open_for_writing(peer=1) - writer.write(self.fake_content) - out = yield finished_d - out = yield blob_file.delete() - - blob_file = BlobFile(self.blob_dir, self.fake_content_hash) - self.assertFalse(blob_file.verified) - - def test_delete_fail(self): - # deletes should fail if being written to - blob_file = BlobFile(self.blob_dir, self.fake_content_hash, self.fake_content_len) - writer, finished_d = blob_file.open_for_writing(peer=1) - with self.assertRaises(ValueError): - blob_file.delete() - writer.write(self.fake_content) - writer.close() - - # deletes should fail if being read and not closed - blob_file = BlobFile(self.blob_dir, self.fake_content_hash, self.fake_content_len) - self.assertTrue(blob_file.verified) - r = blob_file.open_for_reading() # must be set to variable otherwise it gets garbage collected - with self.assertRaises(ValueError): - blob_file.delete() - - @defer.inlineCallbacks - def test_too_much_write(self): - # writing too much data should result in failure - expected_length = 16 - content = b'0'*32 - blob_hash = random_lbry_hash() - blob_file = BlobFile(self.blob_dir, blob_hash, expected_length) - writer, finished_d = blob_file.open_for_writing(peer=1) - writer.write(content) - out = yield self.assertFailure(finished_d, InvalidDataError) - - @defer.inlineCallbacks - def test_bad_hash(self): - # test a write that should fail because its content's hash - # does not equal the blob_hash - length = 64 - content = b'0'*length - blob_hash = random_lbry_hash() - blob_file = BlobFile(self.blob_dir, blob_hash, length) - writer, finished_d = blob_file.open_for_writing(peer=1) - writer.write(content) - yield self.assertFailure(finished_d, InvalidDataError) - - @defer.inlineCallbacks - def test_close_on_incomplete_write(self): - # write all but 1 byte of data, - blob_file = BlobFile(self.blob_dir, self.fake_content_hash, self.fake_content_len) - writer, finished_d = blob_file.open_for_writing(peer=1) - writer.write(self.fake_content[:self.fake_content_len-1]) - writer.close() - yield self.assertFailure(finished_d, DownloadCanceledError) - - # writes after close will throw a IOError exception - with self.assertRaises(IOError): - writer.write(self.fake_content) - - # another call to close will do nothing - writer.close() - - # file should not exist, since we did not finish write - blob_file_2 = BlobFile(self.blob_dir, self.fake_content_hash, self.fake_content_len) - out = blob_file_2.open_for_reading() - self.assertIsNone(out) - - @defer.inlineCallbacks - def test_multiple_writers(self): - # start first writer and write half way, and then start second writer and write everything - blob_hash = self.fake_content_hash - blob_file = BlobFile(self.blob_dir, blob_hash, self.fake_content_len) - writer_1, finished_d_1 = blob_file.open_for_writing(peer=1) - writer_1.write(self.fake_content[:self.fake_content_len//2]) - - writer_2, finished_d_2 = blob_file.open_for_writing(peer=2) - writer_2.write(self.fake_content) - out_2 = yield finished_d_2 - out_1 = yield self.assertFailure(finished_d_1, DownloadCanceledError) - - self.assertIsInstance(out_2, BlobFile) - self.assertTrue(out_2.verified) - self.assertEqual(self.fake_content_len, out_2.get_length()) - - f = blob_file.open_for_reading() - c = f.read() - self.assertEqual(self.fake_content_len, len(c)) - self.assertEqual(bytearray(c), self.fake_content) - - @defer.inlineCallbacks - def test_multiple_writers_save_at_same_time(self): - blob_hash = self.fake_content_hash - blob_file = BlobFile(self.blob_dir, blob_hash, self.fake_content_len) - writer_1, finished_d_1 = blob_file.open_for_writing(peer=1) - writer_2, finished_d_2 = blob_file.open_for_writing(peer=2) - - blob_file.save_verified_blob(writer_1) - # second write should fail to save - yield self.assertFailure(blob_file.save_verified_blob(writer_2), DownloadCanceledError) - - # schedule a close, just to leave the reactor clean - finished_d_1.addBoth(lambda x:None) - finished_d_2.addBoth(lambda x:None) - self.addCleanup(writer_1.close) - self.addCleanup(writer_2.close) diff --git a/tests/unit/core/test_Strategy.py b/tests/unit/core/test_Strategy.py deleted file mode 100644 index 0c1a62581..000000000 --- a/tests/unit/core/test_Strategy.py +++ /dev/null @@ -1,151 +0,0 @@ -import itertools -import random -from unittest import mock - -from twisted.trial import unittest - -from lbrynet.p2p.PaymentRateManager import NegotiatedPaymentRateManager, BasePaymentRateManager -from lbrynet.p2p.Strategy import BasicAvailabilityWeightedStrategy -from lbrynet.p2p.Offer import Offer -from lbrynet.conf import Config -from tests.mocks import BlobAvailabilityTracker as DummyBlobAvailabilityTracker - -MAX_NEGOTIATION_TURNS = 10 -random.seed(12345) - - -def get_random_sample(list_to_sample): - result = list_to_sample[ - random.randint(1, len(list_to_sample)):random.randint(1, len(list_to_sample))] - if not result: - return get_random_sample(list_to_sample) - return result - - -def calculate_negotation_turns(client_base, host_base, host_is_generous=True, - client_is_generous=True): - blobs = [ - 'b2e48bb4c88cf46b76adf0d47a72389fae0cd1f19ed27dc5' - '09138c99509a25423a4cef788d571dca7988e1dca69e6fa0', - 'd7c82e6cac093b3f16107d2ae2b2c75424f1fcad2c7fbdbe' - '66e4a13c0b6bd27b67b3a29c403b82279ab0f7c1c48d6787', - '5a450b416275da4bdff604ee7b58eaedc7913c5005b7184f' - 'c3bc5ef0b1add00613587f54217c91097fc039ed9eace9dd', - 'f99d24cd50d4bfd77c2598bfbeeb8415bf0feef21200bdf0' - 'b8fbbde7751a77b7a2c68e09c25465a2f40fba8eecb0b4e0', - '9dbda74a472a2e5861a5d18197aeba0f5de67c67e401124c' - '243d2f0f41edf01d7a26aeb0b5fc9bf47f6361e0f0968e2c', - '91dc64cf1ff42e20d627b033ad5e4c3a4a96856ed8a6e3fb' - '4cd5fa1cfba4bf72eefd325f579db92f45f4355550ace8e7', - '6d8017aba362e5c5d0046625a039513419810a0397d72831' - '8c328a5cc5d96efb589fbca0728e54fe5adbf87e9545ee07', - '6af95cd062b4a179576997ef1054c9d2120f8592eea045e9' - '667bea411d520262cd5a47b137eabb7a7871f5f8a79c92dd', - '8c70d5e2f5c3a6085006198e5192d157a125d92e73787944' - '72007a61947992768926513fc10924785bdb1761df3c37e6', - 'c84aa1fd8f5009f7c4e71e444e40d95610abc1480834f835' - 'eefb267287aeb10025880a3ce22580db8c6d92efb5bc0c9c' - ] - - host = mock.Mock() - host.host = "1.2.3.4" - client = mock.Mock() - client.host = "1.2.3.5" - - conf = Config() - - client_base_prm = BasePaymentRateManager(client_base, conf.min_info_rate) - client_prm = NegotiatedPaymentRateManager(client_base_prm, - DummyBlobAvailabilityTracker(), - client_is_generous) - host_base_prm = BasePaymentRateManager(host_base, conf.min_info_rate) - host_prm = NegotiatedPaymentRateManager(host_base_prm, - DummyBlobAvailabilityTracker(), - host_is_generous) - blobs_to_query = get_random_sample(blobs) - accepted = False - turns = 0 - while not accepted: - rate = client_prm.get_rate_blob_data(host, blobs_to_query) - offer = Offer(rate) - accepted = host_prm.accept_rate_blob_data(client, blobs_to_query, offer) - turns += 1 - return turns - - -class AvailabilityWeightedStrategyTests(unittest.TestCase): - - def test_first_offer_is_zero_and_second_is_not_if_offer_not_accepted(self): - conf = Config() - strategy = BasicAvailabilityWeightedStrategy( - DummyBlobAvailabilityTracker(), conf.data_rate, conf.is_generous_host - ) - peer = "1.1.1.1" - - blobs = strategy.price_model.blob_tracker.availability.keys() - offer1 = strategy.make_offer(peer, blobs) - - offer2 = strategy.make_offer(peer, blobs) - - self.assertEqual(offer1.rate, 0.0) - self.assertNotEqual(offer2.rate, 0.0) - - def test_accept_zero_and_persist_if_accepted(self): - conf = Config() - host_strategy = BasicAvailabilityWeightedStrategy( - DummyBlobAvailabilityTracker(), conf.data_rate, conf.is_generous_host - ) - client_strategy = BasicAvailabilityWeightedStrategy( - DummyBlobAvailabilityTracker(), conf.data_rate, conf.is_generous_host - ) - - client = "1.1.1.1" - host = "1.1.1.2" - blobs = host_strategy.price_model.blob_tracker.availability.keys() - - offer = client_strategy.make_offer(host, blobs) - response1 = host_strategy.respond_to_offer(offer, client, blobs) - client_strategy.update_accepted_offers(host, response1) - - offer = client_strategy.make_offer(host, blobs) - response2 = host_strategy.respond_to_offer(offer, client, blobs) - client_strategy.update_accepted_offers(host, response2) - - self.assertFalse(response1.is_too_low) - self.assertTrue(response1.is_accepted) - self.assertEqual(response1.rate, 0.0) - - self.assertFalse(response2.is_too_low) - self.assertTrue(response2.is_accepted) - self.assertEqual(response2.rate, 0.0) - - def test_how_many_turns_before_accept_with_similar_rate_settings(self): - base_rates = [0.0001 * n for n in range(1, 10)] - for host_base, client_base in itertools.product(base_rates, base_rates): - turns = calculate_negotation_turns(host_base, - client_base, - client_is_generous=False, - host_is_generous=False) - self.assertGreater(MAX_NEGOTIATION_TURNS, turns) - - def test_generous_connects_in_one_turn(self): - base_rates = [0.0001 * n for n in range(1, 10)] - for host_base, client_base in itertools.product(base_rates, base_rates): - turns = calculate_negotation_turns(host_base, client_base) - self.assertEqual(1, turns) - - def test_how_many_turns_with_generous_client(self): - base_rates = [0.0001 * n for n in range(1, 10)] - for host_base, client_base in itertools.product(base_rates, base_rates): - turns = calculate_negotation_turns(host_base, - client_base, - host_is_generous=False) - self.assertGreater(MAX_NEGOTIATION_TURNS, turns) - - def test_how_many_turns_with_generous_host(self): - base_rates = [0.0001 * n for n in range(1, 10)] - for host_base, client_base in itertools.product(base_rates, base_rates): - turns = calculate_negotation_turns(host_base, - client_base, - client_is_generous=False) - self.assertGreater(MAX_NEGOTIATION_TURNS, turns) diff --git a/tests/unit/core/test_Wallet.py b/tests/unit/core/test_Wallet.py deleted file mode 100644 index d5fa5e193..000000000 --- a/tests/unit/core/test_Wallet.py +++ /dev/null @@ -1,241 +0,0 @@ -# pylint: skip-file -import tempfile - -from decimal import Decimal -from twisted.trial import unittest -from twisted.internet import defer -from lbrynet.p2p.Error import InsufficientFundsError -from lbrynet.schema.claim import ClaimDict - -test_metadata = { - 'license': 'NASA', - 'version': '_0_1_0', - 'description': 'test', - 'language': 'en', - 'author': 'test', - 'title': 'test', - 'nsfw': False, - 'thumbnail': 'test' -} - -test_claim_dict = { - 'version': '_0_0_1', - 'claimType': 'streamType', - 'stream': {'metadata': test_metadata, 'version': '_0_0_1', 'source': - {'source': '8655f713819344980a9a0d67b198344e2c462c90f813e86f' - '0c63789ab0868031f25c54d0bb31af6658e997e2041806eb', - 'sourceType': 'lbry_sd_hash', 'contentType': 'video/mp4', 'version': '_0_0_1'}, - }} - - -#class MocLbryumWallet(LBRYumWallet): -# def __init__(self, db_dir, max_usable_balance=3): -# LBRYumWallet.__init__(self, SQLiteStorage(db_dir), SimpleConfig( -# {"lbryum_path": db_dir, "wallet_path": os.path.join(db_dir, "testwallet")} -# )) -# self.db_dir = db_dir -# self.wallet_balance = Decimal(10.0) -# self.total_reserved_points = Decimal(0.0) -# self.queued_payments = defaultdict(Decimal) -# self.network = FakeNetwork() -# self._mock_max_usable_balance = max_usable_balance -# assert self.config.get_wallet_path() == os.path.join(self.db_dir, "testwallet") -# -# @defer.inlineCallbacks -# def setup(self, password=None, seed=None): -# yield self.storage.setup() -# seed = seed or "travel nowhere air position hill peace suffer parent beautiful rise " \ -# "blood power home crumble teach" -# storage = lbryum.wallet.WalletStorage(self.config.get_wallet_path()) -# self.wallet = lbryum.wallet.NewWallet(storage) -# self.wallet.add_seed(seed, password) -# self.wallet.create_master_keys(password) -# self.wallet.create_main_account() -# -# @defer.inlineCallbacks -# def stop(self): -# yield self.storage.stop() -# yield threads.deferToThread(shutil.rmtree, self.db_dir) -# -# def get_least_used_address(self, account=None, for_change=False, max_count=100): -# return defer.succeed(None) -# -# def get_name_claims(self): -# return threads.deferToThread(lambda: []) -# -# def _save_name_metadata(self, name, claim_outpoint, sd_hash): -# return defer.succeed(True) -# -# def get_max_usable_balance_for_claim(self, name): -# # The amount is returned on the basis of test_point_reservation_and_claim unittest -# # Also affects test_successful_send_name_claim -# return defer.succeed(self._mock_max_usable_balance) - - -class WalletTest(unittest.TestCase): - skip = "Needs to be ported to the new wallet." - - @defer.inlineCallbacks - def setUp(self): - user_dir = tempfile.mkdtemp() - self.wallet = MocLbryumWallet(user_dir) - yield self.wallet.setup() - self.assertEqual(self.wallet.get_balance(), Decimal(10)) - - def tearDown(self): - return self.wallet.stop() - - def test_failed_send_name_claim(self): - def not_enough_funds_send_name_claim(self, name, val, amount): - claim_out = {'success': False, 'reason': 'Not enough funds'} - return claim_out - - self.wallet._send_name_claim = not_enough_funds_send_name_claim - d = self.wallet.claim_name('test', 1, test_claim_dict) - self.assertFailure(d, Exception) - return d - - @defer.inlineCallbacks - def test_successful_send_name_claim(self): - expected_claim_out = { - "claim_id": "f43dc06256a69988bdbea09a58c80493ba15dcfa", - "fee": "0.00012", - "nout": 0, - "success": True, - "txid": "6f8180002ef4d21f5b09ca7d9648a54d213c666daf8639dc283e2fd47450269e", - "value": ClaimDict.load_dict(test_claim_dict).serialized.encode('hex'), - "claim_address": "", - "channel_claim_id": "", - "channel_name": "" - } - - def success_send_name_claim(self, name, val, amount, certificate_id=None, - claim_address=None, change_address=None): - return defer.succeed(expected_claim_out) - - self.wallet._send_name_claim = success_send_name_claim - claim_out = yield self.wallet.claim_name('test', 1, test_claim_dict) - self.assertNotIn('success', claim_out) - self.assertEqual(expected_claim_out['claim_id'], claim_out['claim_id']) - self.assertEqual(expected_claim_out['fee'], claim_out['fee']) - self.assertEqual(expected_claim_out['nout'], claim_out['nout']) - self.assertEqual(expected_claim_out['txid'], claim_out['txid']) - self.assertEqual(expected_claim_out['value'], claim_out['value']) - - @defer.inlineCallbacks - def test_failed_support(self): - # wallet.support_claim will check the balance before calling _support_claim - try: - yield self.wallet.support_claim('test', "f43dc06256a69988bdbea09a58c80493ba15dcfa", 1000) - except InsufficientFundsError: - pass - - def test_succesful_support(self): - expected_support_out = { - "fee": "0.000129", - "nout": 0, - "success": True, - "txid": "11030a76521e5f552ca87ad70765d0cc52e6ea4c0dc0063335e6cf2a9a85085f" - } - - expected_result = { - "fee": 0.000129, - "nout": 0, - "txid": "11030a76521e5f552ca87ad70765d0cc52e6ea4c0dc0063335e6cf2a9a85085f" - } - - def check_out(claim_out): - self.assertDictEqual(expected_result, claim_out) - - def success_support_claim(name, val, amount): - return defer.succeed(expected_support_out) - - self.wallet._support_claim = success_support_claim - d = self.wallet.support_claim('test', "f43dc06256a69988bdbea09a58c80493ba15dcfa", 1) - d.addCallback(lambda claim_out: check_out(claim_out)) - return d - - @defer.inlineCallbacks - def test_failed_abandon(self): - try: - yield self.wallet.abandon_claim("f43dc06256a69988bdbea09a58c80493ba15dcfa", None, None) - raise Exception("test failed") - except Exception as err: - self.assertSubstring("claim not found", err.message) - - @defer.inlineCallbacks - def test_successful_abandon(self): - expected_abandon_out = { - "fee": "0.000096", - "success": True, - "txid": "0578c161ad8d36a7580c557d7444f967ea7f988e194c20d0e3c42c3cabf110dd" - } - - expected_abandon_result = { - "fee": 0.000096, - "txid": "0578c161ad8d36a7580c557d7444f967ea7f988e194c20d0e3c42c3cabf110dd" - } - - def success_abandon_claim(claim_outpoint, txid, nout): - return defer.succeed(expected_abandon_out) - - self.wallet._abandon_claim = success_abandon_claim - claim_out = yield self.wallet.abandon_claim("f43dc06256a69988bdbea09a58c80493ba15dcfa", None, None) - self.assertDictEqual(expected_abandon_result, claim_out) - - @defer.inlineCallbacks - def test_point_reservation_and_balance(self): - # check that point reservations and cancellation changes the balance - # properly - def update_balance(): - return defer.succeed(5) - - self.wallet._update_balance = update_balance - yield self.wallet.update_balance() - self.assertEqual(5, self.wallet.get_balance()) - - # test point reservation - yield self.wallet.reserve_points('testid', 2) - self.assertEqual(3, self.wallet.get_balance()) - self.assertEqual(2, self.wallet.total_reserved_points) - - # test reserved points cancellation - yield self.wallet.cancel_point_reservation(ReservedPoints('testid', 2)) - self.assertEqual(5, self.wallet.get_balance()) - self.assertEqual(0, self.wallet.total_reserved_points) - - # test point sending - reserve_points = yield self.wallet.reserve_points('testid', 2) - yield self.wallet.send_points_to_address(reserve_points, 1) - self.assertEqual(3, self.wallet.get_balance()) - # test failed point reservation - out = yield self.wallet.reserve_points('testid', 4) - self.assertIsNone(out) - - def test_point_reservation_and_claim(self): - # check that claims take into consideration point reservations - def update_balance(): - return defer.succeed(5) - - self.wallet._update_balance = update_balance - d = self.wallet.update_balance() - d.addCallback(lambda _: self.assertEqual(5, self.wallet.get_balance())) - d.addCallback(lambda _: self.wallet.reserve_points('testid', 2)) - d.addCallback(lambda _: self.wallet.claim_name('test', 4, test_claim_dict)) - self.assertFailure(d, InsufficientFundsError) - return d - - def test_point_reservation_and_support(self): - # check that supports take into consideration point reservations - def update_balance(): - return defer.succeed(5) - - self.wallet._update_balance = update_balance - d = self.wallet.update_balance() - d.addCallback(lambda _: self.assertEqual(5, self.wallet.get_balance())) - d.addCallback(lambda _: self.wallet.reserve_points('testid', 2)) - d.addCallback(lambda _: self.wallet.support_claim( - 'test', "f43dc06256a69988bdbea09a58c80493ba15dcfa", 4)) - self.assertFailure(d, InsufficientFundsError) - return d - diff --git a/tests/unit/cryptstream/__init__.py b/tests/unit/cryptstream/__init__.py deleted file mode 100644 index e69de29bb..000000000 diff --git a/tests/unit/cryptstream/test_cryptblob.py b/tests/unit/cryptstream/test_cryptblob.py deleted file mode 100644 index 083180b8b..000000000 --- a/tests/unit/cryptstream/test_cryptblob.py +++ /dev/null @@ -1,78 +0,0 @@ -from twisted.trial import unittest -from twisted.internet import defer -from lbrynet.blob import CryptBlob -from lbrynet.blob.blob_file import MAX_BLOB_SIZE - -from cryptography.hazmat.primitives.ciphers.algorithms import AES -import random -import string -from io import BytesIO -import os - -AES_BLOCK_SIZE_BYTES = int(AES.block_size / 8) - -class MocBlob: - def __init__(self): - self.data = b'' - - def read(self, write_func): - data = self.data - write_func(data) - return defer.succeed(True) - - def open_for_reading(self): - return BytesIO(self.data) - - def write(self, data): - if not isinstance(data, bytes): - data = data.encode() - self.data += data - - def close(self): - return defer.succeed(True) - - -def random_string(length): - return ''.join(random.choice(string.ascii_lowercase) for i in range(length)) - - -class TestCryptBlob(unittest.TestCase): - - @defer.inlineCallbacks - def _test_encrypt_decrypt(self, size_of_data): - # max blob size is 2*2**20 -1 ( -1 due to required padding in the end ) - blob = MocBlob() - blob_num = 0 - key = os.urandom(AES_BLOCK_SIZE_BYTES) - iv = os.urandom(AES_BLOCK_SIZE_BYTES) - maker = CryptBlob.CryptStreamBlobMaker(key, iv, blob_num, blob) - write_size = size_of_data - string_to_encrypt = random_string(size_of_data).encode() - - # encrypt string - done, num_bytes = maker.write(string_to_encrypt) - yield maker.close() - self.assertEqual(size_of_data, num_bytes) - expected_encrypted_blob_size = int((size_of_data / AES_BLOCK_SIZE_BYTES) + 1) * AES_BLOCK_SIZE_BYTES - self.assertEqual(expected_encrypted_blob_size, len(blob.data)) - - if size_of_data < MAX_BLOB_SIZE-1: - self.assertFalse(done) - else: - self.assertTrue(done) - self.data_buf = b'' - - def write_func(data): - self.data_buf += data - - # decrypt string - decryptor = CryptBlob.StreamBlobDecryptor(blob, key, iv, size_of_data) - yield decryptor.decrypt(write_func) - self.assertEqual(self.data_buf, string_to_encrypt) - - @defer.inlineCallbacks - def test_encrypt_decrypt(self): - yield self._test_encrypt_decrypt(1) - yield self._test_encrypt_decrypt(16*2) - yield self._test_encrypt_decrypt(2000) - yield self._test_encrypt_decrypt(2*2**20-1) diff --git a/tests/unit/lbryfilemanager/__init__.py b/tests/unit/lbryfilemanager/__init__.py deleted file mode 100644 index e69de29bb..000000000 diff --git a/tests/unit/lbryfilemanager/test_EncryptedFileCreator.py b/tests/unit/lbryfilemanager/test_EncryptedFileCreator.py deleted file mode 100644 index 4e3e579d3..000000000 --- a/tests/unit/lbryfilemanager/test_EncryptedFileCreator.py +++ /dev/null @@ -1,115 +0,0 @@ -import json -from twisted.trial import unittest -from twisted.internet import defer - -from cryptography.hazmat.primitives.ciphers.algorithms import AES -from lbrynet.conf import Config -from lbrynet.extras.compat import f2d -from lbrynet.extras.daemon.PeerManager import PeerManager -from lbrynet.p2p.StreamDescriptor import get_sd_info, BlobStreamDescriptorReader -from lbrynet.p2p.StreamDescriptor import StreamDescriptorIdentifier -from lbrynet.p2p.BlobManager import DiskBlobManager -from lbrynet.p2p.RateLimiter import DummyRateLimiter -from lbrynet.p2p.PaymentRateManager import OnlyFreePaymentsManager -from lbrynet.extras.daemon.storage import SQLiteStorage -from lbrynet.blob import EncryptedFileCreator -from lbrynet.blob.EncryptedFileManager import EncryptedFileManager -from lbrynet.p2p.StreamDescriptor import JSONBytesEncoder -from tests import mocks -from tests.test_utils import mk_db_and_blob_dir, rm_db_and_blob_dir - - -FakeNode = mocks.Node -FakeWallet = mocks.Wallet -FakePeerFinder = mocks.PeerFinder -FakeAnnouncer = mocks.Announcer -GenFile = mocks.GenFile -test_create_stream_sd_file = mocks.create_stream_sd_file -DummyBlobAvailabilityTracker = mocks.BlobAvailabilityTracker - -MB = 2**20 - - -def iv_generator(): - while True: - yield b'3' * (AES.block_size // 8) - - -class CreateEncryptedFileTest(unittest.TestCase): - timeout = 5 - - def setUp(self): - self.tmp_db_dir, self.tmp_blob_dir = mk_db_and_blob_dir() - conf = Config(data_dir=self.tmp_blob_dir) - self.wallet = FakeWallet() - self.peer_manager = PeerManager() - self.peer_finder = FakePeerFinder(5553, self.peer_manager, 2) - self.rate_limiter = DummyRateLimiter() - self.sd_identifier = StreamDescriptorIdentifier() - self.storage = SQLiteStorage(conf, ':memory:') - self.blob_manager = DiskBlobManager(self.tmp_blob_dir, self.storage) - self.prm = OnlyFreePaymentsManager() - self.lbry_file_manager = EncryptedFileManager( - conf, self.peer_finder, self.rate_limiter, self.blob_manager, - self.wallet, self.prm, self.storage, self.sd_identifier - ) - d = f2d(self.storage.open()) - d.addCallback(lambda _: f2d(self.lbry_file_manager.setup())) - return d - - @defer.inlineCallbacks - def tearDown(self): - yield self.lbry_file_manager.stop() - yield f2d(self.blob_manager.stop()) - yield f2d(self.storage.close()) - rm_db_and_blob_dir(self.tmp_db_dir, self.tmp_blob_dir) - - @defer.inlineCallbacks - def create_file(self, filename): - handle = mocks.GenFile(3*MB, b'1') - key = b'2' * (AES.block_size // 8) - out = yield EncryptedFileCreator.create_lbry_file( - self.blob_manager, self.storage, self.prm, self.lbry_file_manager, filename, handle, key, iv_generator() - ) - defer.returnValue(out) - - @defer.inlineCallbacks - def test_can_create_file(self): - expected_stream_hash = "41e6b247d923d191b154fb6f1b8529d6ddd6a73d65c35" \ - "7b1acb742dd83151fb66393a7709e9f346260a4f4db6de10c25" - expected_sd_hash = "40c485432daec586c1a2d247e6c08d137640a5af6e81f3f652" \ - "3e62e81a2e8945b0db7c94f1852e70e371d917b994352c" - filename = 'test.file' - lbry_file = yield self.create_file(filename) - sd_hash = yield f2d(self.storage.get_sd_blob_hash_for_stream(lbry_file.stream_hash)) - - # read the sd blob file - sd_blob = self.blob_manager.blobs[sd_hash] - sd_reader = BlobStreamDescriptorReader(sd_blob) - sd_file_info = yield sd_reader.get_info() - - # this comes from the database, the blobs returned are sorted - sd_info = yield f2d(get_sd_info(self.storage, lbry_file.stream_hash, include_blobs=True)) - self.maxDiff = None - unicode_sd_info = json.loads(json.dumps(sd_info, sort_keys=True, cls=JSONBytesEncoder)) - self.assertDictEqual(unicode_sd_info, sd_file_info) - self.assertEqual(sd_info['stream_hash'], expected_stream_hash) - self.assertEqual(len(sd_info['blobs']), 3) - self.assertNotEqual(sd_info['blobs'][0]['length'], 0) - self.assertNotEqual(sd_info['blobs'][1]['length'], 0) - self.assertEqual(sd_info['blobs'][2]['length'], 0) - self.assertEqual(expected_stream_hash, lbry_file.stream_hash) - self.assertEqual(sd_hash, lbry_file.sd_hash) - self.assertEqual(sd_hash, expected_sd_hash) - blobs = yield self.blob_manager.get_all_verified_blobs() - self.assertEqual(3, len(blobs)) - num_should_announce_blobs = yield self.blob_manager.count_should_announce_blobs() - self.assertEqual(2, num_should_announce_blobs) - - @defer.inlineCallbacks - def test_can_create_file_with_unicode_filename(self): - expected_stream_hash = ('d1da4258f3ce12edb91d7e8e160d091d3ab1432c2e55a6352dce0' - '2fd5adb86fe144e93e110075b5865fff8617776c6c0') - filename = '☃.file' - lbry_file = yield self.create_file(filename) - self.assertEqual(expected_stream_hash, lbry_file.stream_hash) diff --git a/tests/unit/lbrynet_daemon/test_Downloader.py b/tests/unit/lbrynet_daemon/test_Downloader.py deleted file mode 100644 index b0151f3f2..000000000 --- a/tests/unit/lbrynet_daemon/test_Downloader.py +++ /dev/null @@ -1,185 +0,0 @@ -import types -from unittest import mock -from twisted.trial import unittest -from twisted.internet import defer, task - - -from lbrynet.p2p import PaymentRateManager -from lbrynet.p2p.Error import DownloadDataTimeout, DownloadSDTimeout -from lbrynet.p2p.StreamDescriptor import StreamDescriptorIdentifier -from lbrynet.p2p.BlobManager import DiskBlobManager -from lbrynet.p2p.RateLimiter import DummyRateLimiter -from lbrynet.p2p.client.DownloadManager import DownloadManager -from lbrynet.extras.daemon import Downloader -from lbrynet.extras.daemon.exchange_rate_manager import ExchangeRateManager -from lbrynet.extras.daemon.storage import SQLiteStorage -from lbrynet.extras.daemon.PeerFinder import DummyPeerFinder -from lbrynet.blob.EncryptedFileStatusReport import EncryptedFileStatusReport -from lbrynet.blob.EncryptedFileDownloader import ManagedEncryptedFileDownloader -from lbrynet.extras.wallet import LbryWalletManager -from lbrynet.conf import Config - - -class MocDownloader: - def __init__(self): - self.finish_deferred = defer.Deferred(None) - self.stop_called = False - self.file_name = 'test' - self.name = 'test' - self.num_completed = 0 - self.num_known = 1 - self.running_status = ManagedEncryptedFileDownloader.STATUS_RUNNING - - @defer.inlineCallbacks - def status(self): - out = yield EncryptedFileStatusReport( - self.name, self.num_completed, self.num_known, self.running_status) - defer.returnValue(out) - - def start(self): - return self.finish_deferred - - def stop(self): - self.stop_called = True - self.finish_deferred.callback(True) - - -def moc_initialize(self, stream_info): - self.sd_hash = "d5169241150022f996fa7cd6a9a1c421937276a3275eb912" \ - "790bd07ba7aec1fac5fd45431d226b8fb402691e79aeb24b" - return None - - -def moc_download_sd_blob(self): - return None - - -def moc_download(self, sd_blob, name, txid, nout, key_fee, file_name): - self.pay_key_fee(key_fee, name) - self.downloader = MocDownloader() - self.downloader.start() - - -def moc_pay_key_fee(d): - def _moc_pay_key_fee(self, key_fee, name): - d.callback(True) - return _moc_pay_key_fee - - -class GetStreamTests(unittest.TestCase): - - def init_getstream_with_mocs(self): - conf = Config() - - sd_identifier = mock.Mock(spec=StreamDescriptorIdentifier) - wallet = mock.Mock(spec=LbryWalletManager) - prm = mock.Mock(spec=PaymentRateManager.NegotiatedPaymentRateManager) - exchange_rate_manager = mock.Mock(spec=ExchangeRateManager) - storage = mock.Mock(spec=SQLiteStorage) - peer_finder = DummyPeerFinder() - blob_manager = mock.Mock(spec=DiskBlobManager) - max_key_fee = {'currency': "LBC", 'amount': 10, 'address': ''} - disable_max_key_fee = False - data_rate = {'currency': "LBC", 'amount': 0, 'address': ''} - getstream = Downloader.GetStream( - conf, sd_identifier, wallet, exchange_rate_manager, blob_manager, peer_finder, DummyRateLimiter(), prm, - storage, max_key_fee, disable_max_key_fee, timeout=3, data_rate=data_rate - ) - getstream.download_manager = mock.Mock(spec=DownloadManager) - return getstream - - @defer.inlineCallbacks - def test_init_exception(self): - """ - test that if initialization would fail, by giving it invalid - stream_info, that an exception is thrown - """ - - getstream = self.init_getstream_with_mocs() - name = 'test' - stream_info = None - - with self.assertRaises(AttributeError): - yield getstream.start(stream_info, name, "deadbeef" * 12, 0) - - @defer.inlineCallbacks - def test_sd_blob_download_timeout(self): - """ - test that if download_sd_blob fails due to timeout, - DownloadTimeoutError is raised - """ - def download_sd_blob(self): - raise DownloadSDTimeout(self) - - called_pay_fee = defer.Deferred() - - getstream = self.init_getstream_with_mocs() - getstream._initialize = types.MethodType(moc_initialize, getstream) - getstream._download_sd_blob = types.MethodType(download_sd_blob, getstream) - getstream._download = types.MethodType(moc_download, getstream) - getstream.pay_key_fee = types.MethodType(moc_pay_key_fee(called_pay_fee), getstream) - name = 'test' - stream_info = None - with self.assertRaises(DownloadSDTimeout): - yield getstream.start(stream_info, name, "deadbeef" * 12, 0) - self.assertFalse(called_pay_fee.called) - - @defer.inlineCallbacks - def test_timeout(self): - """ - test that timeout (set to 3 here) exception is raised - when download times out while downloading first blob, and key fee is paid - """ - called_pay_fee = defer.Deferred() - - getstream = self.init_getstream_with_mocs() - getstream._initialize = types.MethodType(moc_initialize, getstream) - getstream._download_sd_blob = types.MethodType(moc_download_sd_blob, getstream) - getstream._download = types.MethodType(moc_download, getstream) - getstream.pay_key_fee = types.MethodType(moc_pay_key_fee(called_pay_fee), getstream) - name = 'test' - stream_info = None - start = getstream.start(stream_info, name, "deadbeef" * 12, 0) - with self.assertRaises(DownloadDataTimeout): - yield start - - @defer.inlineCallbacks - def test_finish_one_blob(self): - """ - test that if we have 1 completed blob, start() returns - and key fee is paid - """ - called_pay_fee = defer.Deferred() - - getstream = self.init_getstream_with_mocs() - getstream._initialize = types.MethodType(moc_initialize, getstream) - - getstream._download_sd_blob = types.MethodType(moc_download_sd_blob, getstream) - getstream._download = types.MethodType(moc_download, getstream) - getstream.pay_key_fee = types.MethodType(moc_pay_key_fee(called_pay_fee), getstream) - name = 'test' - stream_info = None - self.assertFalse(getstream.wrote_data) - getstream.data_downloading_deferred.callback(None) - yield getstream.start(stream_info, name, "deadbeef" * 12, 0) - self.assertTrue(getstream.wrote_data) - - # self.assertTrue(getstream.pay_key_fee_called) - - # @defer.inlineCallbacks - # def test_finish_stopped_downloader(self): - # """ - # test that if we have a stopped downloader, beforfe a blob is downloaded, - # start() returns - # """ - # getstream = self.init_getstream_with_mocs() - # getstream._initialize = types.MethodType(moc_initialize, getstream) - # getstream._download_sd_blob = types.MethodType(moc_download_sd_blob, getstream) - # getstream._download = types.MethodType(moc_download, getstream) - # name='test' - # stream_info = None - # start = getstream.start(stream_info,name) - # - # getstream.downloader.running_status = ManagedEncryptedFileDownloader.STATUS_STOPPED - # self.clock.advance(1) - # downloader, f_deferred = yield start