diff --git a/lbrynet/lbryfile/__init__.py b/lbrynet/lbryfile/__init__.py index e69de29bb..8cd10066a 100644 --- a/lbrynet/lbryfile/__init__.py +++ b/lbrynet/lbryfile/__init__.py @@ -0,0 +1,2 @@ +from lbrynet.lbryfile.StreamDescriptor import get_sd_info +from lbrynet.lbryfile.StreamDescriptor import publish_sd_blob diff --git a/tests/lbrynet/__init__.py b/tests/functional/__init__.py similarity index 100% rename from tests/lbrynet/__init__.py rename to tests/functional/__init__.py diff --git a/tests/functional/test_reflector.py b/tests/functional/test_reflector.py new file mode 100644 index 000000000..d77557108 --- /dev/null +++ b/tests/functional/test_reflector.py @@ -0,0 +1,189 @@ +import os +import shutil + +from twisted.internet import defer, threads, error +from twisted.trial import unittest + +from lbrynet import reflector +from lbrynet import conf +from lbrynet import lbryfile +from lbrynet.core import BlobManager +from lbrynet.core import PeerManager +from lbrynet.core import RateLimiter +from lbrynet.core import Session +from lbrynet.core import StreamDescriptor +from lbrynet.lbryfile import LBRYFileMetadataManager +from lbrynet.lbryfile.client import LBRYFileOptions +from lbrynet.lbryfilemanager import LBRYFileCreator +from lbrynet.lbryfilemanager import LBRYFileManager + +from tests import mocks + + +class TestReflector(unittest.TestCase): + def setUp(self): + self.session = None + self.stream_info_manager = None + self.lbry_file_manager = None + self.server_blob_manager = None + self.reflector_port = None + self.addCleanup(self.take_down_env) + + def take_down_env(self): + d = defer.succeed(True) + if self.lbry_file_manager is not None: + d.addCallback(lambda _: self.lbry_file_manager.stop()) + if self.session is not None: + d.addCallback(lambda _: self.session.shut_down()) + if self.stream_info_manager is not None: + d.addCallback(lambda _: self.stream_info_manager.stop()) + if self.server_blob_manager is not None: + d.addCallback(lambda _: self.server_blob_manager.stop()) + if self.reflector_port is not None: + d.addCallback(lambda _: self.reflector_port.stopListening()) + + def delete_test_env(): + shutil.rmtree('client') + + d.addCallback(lambda _: threads.deferToThread(delete_test_env)) + return d + + def test_reflector(self): + wallet = mocks.Wallet() + peer_manager = PeerManager.PeerManager() + peer_finder = mocks.PeerFinder(5553, peer_manager, 2) + hash_announcer = mocks.Announcer() + rate_limiter = RateLimiter.DummyRateLimiter() + sd_identifier = StreamDescriptor.StreamDescriptorIdentifier() + + self.expected_blobs = [ + ( + 'dc4708f76a5e7af0f1cae0ee96b824e2ed9250c9346c093b' + '441f0a20d3607c17948b6fcfb4bc62020fe5286693d08586', + 2097152 + ), + ( + 'f4067522c1b49432a2a679512e3917144317caa1abba0c04' + '1e0cd2cf9f635d4cf127ce1824fa04189b63916174951f70', + 2097152 + ), + ( + '305486c434260484fcb2968ce0e963b72f81ba56c11b08b1' + 'af0789b55b44d78422600f9a38e3cf4f2e9569897e5646a9', + 1015056 + ), + ] + + db_dir = "client" + os.mkdir(db_dir) + + self.session = Session.LBRYSession( + conf.MIN_BLOB_DATA_PAYMENT_RATE, + db_dir=db_dir, + lbryid="abcd", + peer_finder=peer_finder, + hash_announcer=hash_announcer, + blob_dir=None, + peer_port=5553, + use_upnp=False, + rate_limiter=rate_limiter, + wallet=wallet + ) + + self.stream_info_manager = LBRYFileMetadataManager.TempLBRYFileMetadataManager() + + self.lbry_file_manager = LBRYFileManager.LBRYFileManager( + self.session, self.stream_info_manager, sd_identifier) + + self.server_blob_manager = BlobManager.TempBlobManager(hash_announcer) + + d = self.session.setup() + d.addCallback(lambda _: self.stream_info_manager.setup()) + d.addCallback(lambda _: LBRYFileOptions.add_lbry_file_to_sd_identifier(sd_identifier)) + d.addCallback(lambda _: self.lbry_file_manager.setup()) + d.addCallback(lambda _: self.server_blob_manager.setup()) + + def verify_equal(sd_info): + self.assertEqual(sd_info, mocks.create_stream_sd_file) + + def save_sd_blob_hash(sd_hash): + self.expected_blobs.append((sd_hash, 923)) + + def verify_stream_descriptor_file(stream_hash): + d = lbryfile.get_sd_info(self.lbry_file_manager.stream_info_manager, stream_hash, True) + d.addCallback(verify_equal) + d.addCallback( + lambda _: lbryfile.publish_sd_blob( + self.lbry_file_manager.stream_info_manager, + self.session.blob_manager, stream_hash + ) + ) + d.addCallback(save_sd_blob_hash) + d.addCallback(lambda _: stream_hash) + return d + + def iv_generator(): + iv = 0 + while 1: + iv += 1 + yield "%016d" % iv + + def create_stream(): + test_file = mocks.GenFile(5209343, b''.join([chr(i + 3) for i in xrange(0, 64, 6)])) + d = LBRYFileCreator.create_lbry_file( + self.session, + self.lbry_file_manager, + "test_file", + test_file, + key="0123456701234567", + iv_generator=iv_generator() + ) + return d + + def start_server(): + server_factory = reflector.ServerFactory(peer_manager, self.server_blob_manager) + from twisted.internet import reactor + port = 8943 + while self.reflector_port is None: + try: + self.reflector_port = reactor.listenTCP(port, server_factory) + except error.CannotListenError: + port += 1 + return defer.succeed(port) + + def send_to_server(port, stream_hash): + factory = reflector.ClientFactory( + self.session.blob_manager, + self.stream_info_manager, + stream_hash + ) + + from twisted.internet import reactor + reactor.connectTCP('localhost', port, factory) + return factory.finished_deferred + + def verify_blob_completed(blob, blob_size): + self.assertTrue(blob.is_validated()) + self.assertEqual(blob_size, blob.length) + + def verify_have_blob(blob_hash, blob_size): + d = self.server_blob_manager.get_blob(blob_hash, True) + d.addCallback(lambda blob: verify_blob_completed(blob, blob_size)) + return d + + def verify_data_on_reflector(): + check_blob_ds = [] + for blob_hash, blob_size in self.expected_blobs: + check_blob_ds.append(verify_have_blob(blob_hash, blob_size)) + return defer.DeferredList(check_blob_ds) + + def upload_to_reflector(stream_hash): + d = start_server() + d.addCallback(lambda port: send_to_server(port, stream_hash)) + d.addCallback(lambda _: verify_data_on_reflector()) + return d + + d.addCallback(lambda _: create_stream()) + d.addCallback(verify_stream_descriptor_file) + d.addCallback(upload_to_reflector) + return d diff --git a/tests/mocks.py b/tests/mocks.py new file mode 100644 index 000000000..6ea183195 --- /dev/null +++ b/tests/mocks.py @@ -0,0 +1,164 @@ +import io + +from Crypto.PublicKey import RSA +from twisted.internet import defer, threads, task, error + +from lbrynet.core import PTCWallet + + +class Node(object): + def __init__(self, *args, **kwargs): + pass + + def joinNetwork(self, *args): + pass + + def stop(self): + pass + + +class Wallet(object): + def __init__(self): + self.private_key = RSA.generate(1024) + self.encoded_public_key = self.private_key.publickey().exportKey() + + def start(self): + return defer.succeed(True) + + def stop(self): + return defer.succeed(True) + + def get_info_exchanger(self): + return PTCWallet.PointTraderKeyExchanger(self) + + def get_wallet_info_query_handler_factory(self): + return PTCWallet.PointTraderKeyQueryHandlerFactory(self) + + def reserve_points(self, *args): + return True + + def cancel_point_reservation(self, *args): + pass + + def send_points(self, *args): + return defer.succeed(True) + + def add_expected_payment(self, *args): + pass + + def get_balance(self): + return defer.succeed(1000) + + def set_public_key_for_peer(self, peer, public_key): + pass + + +class PeerFinder(object): + def __init__(self, start_port, peer_manager, num_peers): + self.start_port = start_port + self.peer_manager = peer_manager + self.num_peers = num_peers + self.count = 0 + + def find_peers_for_blob(self, *args): + peer_port = self.start_port + self.count + self.count += 1 + if self.count >= self.num_peers: + self.count = 0 + return defer.succeed([self.peer_manager.get_peer("127.0.0.1", peer_port)]) + + def run_manage_loop(self): + pass + + def stop(self): + pass + + +class Announcer(object): + def __init__(self, *args): + pass + + def add_supplier(self, supplier): + pass + + def immediate_announce(self, *args): + pass + + def run_manage_loop(self): + pass + + def stop(self): + pass + + +class GenFile(io.RawIOBase): + def __init__(self, size, pattern): + io.RawIOBase.__init__(self) + self.size = size + self.pattern = pattern + self.read_so_far = 0 + self.buff = b'' + self.last_offset = 0 + + def readable(self): + return True + + def writable(self): + return False + + def read(self, n=-1): + if n > -1: + bytes_to_read = min(n, self.size - self.read_so_far) + else: + bytes_to_read = self.size - self.read_so_far + output, self.buff = self.buff[:bytes_to_read], self.buff[bytes_to_read:] + bytes_to_read -= len(output) + while bytes_to_read > 0: + self.buff = self._generate_chunk() + new_output, self.buff = self.buff[:bytes_to_read], self.buff[bytes_to_read:] + bytes_to_read -= len(new_output) + output += new_output + self.read_so_far += len(output) + return output + + def readall(self): + return self.read() + + def _generate_chunk(self, n=2**10): + output = self.pattern[self.last_offset:self.last_offset + n] + n_left = n - len(output) + whole_patterns = n_left / len(self.pattern) + output += self.pattern * whole_patterns + self.last_offset = n - len(output) + output += self.pattern[:self.last_offset] + return output + + +create_stream_sd_file = { + 'stream_name': '746573745f66696c65', + 'blobs': [ + { + 'length': 2097152, + 'blob_num': 0, + 'blob_hash': 'dc4708f76a5e7af0f1cae0ee96b824e2ed9250c9346c093b441f0a20d3607c17948b6fcfb4bc62020fe5286693d08586', + 'iv': '30303030303030303030303030303031' + }, + { + 'length': 2097152, + 'blob_num': 1, + 'blob_hash': 'f4067522c1b49432a2a679512e3917144317caa1abba0c041e0cd2cf9f635d4cf127ce1824fa04189b63916174951f70', + 'iv': '30303030303030303030303030303032' + }, + { + 'length': 1015056, + 'blob_num': 2, + 'blob_hash': '305486c434260484fcb2968ce0e963b72f81ba56c11b08b1af0789b55b44d78422600f9a38e3cf4f2e9569897e5646a9', + 'iv': '30303030303030303030303030303033' + }, + {'length': 0, 'blob_num': 3, 'iv': '30303030303030303030303030303034'} + ], + 'stream_type': 'lbryfile', + 'key': '30313233343536373031323334353637', + 'suggested_file_name': '746573745f66696c65', + 'stream_hash': '6d27fbe10c86d81aacfb897c7a426d0a2214f5a299455a6d315c0f998c4b3545c2dc60906122d94653c23b1898229e3f' +} diff --git a/tests/lbrynet/core/__init__.py b/tests/unit/__init__.py similarity index 100% rename from tests/lbrynet/core/__init__.py rename to tests/unit/__init__.py diff --git a/tests/lbrynet/core/server/__init__.py b/tests/unit/core/__init__.py similarity index 100% rename from tests/lbrynet/core/server/__init__.py rename to tests/unit/core/__init__.py diff --git a/tests/lbrynet/lbrynet_daemon/__init__.py b/tests/unit/core/server/__init__.py similarity index 100% rename from tests/lbrynet/lbrynet_daemon/__init__.py rename to tests/unit/core/server/__init__.py diff --git a/tests/lbrynet/core/server/test_BlobRequestHandler.py b/tests/unit/core/server/test_BlobRequestHandler.py similarity index 100% rename from tests/lbrynet/core/server/test_BlobRequestHandler.py rename to tests/unit/core/server/test_BlobRequestHandler.py diff --git a/tests/lbrynet/core/test_LBRYExchangeRateManager.py b/tests/unit/core/test_LBRYExchangeRateManager.py similarity index 100% rename from tests/lbrynet/core/test_LBRYExchangeRateManager.py rename to tests/unit/core/test_LBRYExchangeRateManager.py diff --git a/tests/lbrynet/core/test_LBRYMetadata.py b/tests/unit/core/test_LBRYMetadata.py similarity index 100% rename from tests/lbrynet/core/test_LBRYMetadata.py rename to tests/unit/core/test_LBRYMetadata.py diff --git a/tests/lbrynet/core/test_utils.py b/tests/unit/core/test_utils.py similarity index 100% rename from tests/lbrynet/core/test_utils.py rename to tests/unit/core/test_utils.py diff --git a/tests/unit/lbrynet_daemon/__init__.py b/tests/unit/lbrynet_daemon/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/tests/lbrynet/lbrynet_daemon/test_LBRYDaemon.py b/tests/unit/lbrynet_daemon/test_LBRYDaemon.py similarity index 100% rename from tests/lbrynet/lbrynet_daemon/test_LBRYDaemon.py rename to tests/unit/lbrynet_daemon/test_LBRYDaemon.py