update functional tests

This commit is contained in:
Jack Robison 2018-02-12 14:19:39 -05:00
parent 49507b98f4
commit b7d0191e5d
No known key found for this signature in database
GPG key ID: DF25C68FE0239BB2
3 changed files with 105 additions and 210 deletions

View file

@ -10,7 +10,6 @@ import unittest
from Crypto import Random from Crypto import Random
from Crypto.Hash import MD5 from Crypto.Hash import MD5
from lbrynet import conf from lbrynet import conf
from lbrynet.lbry_file.EncryptedFileMetadataManager import DBEncryptedFileMetadataManager
from lbrynet.file_manager.EncryptedFileManager import EncryptedFileManager from lbrynet.file_manager.EncryptedFileManager import EncryptedFileManager
from lbrynet.core.Session import Session from lbrynet.core.Session import Session
from lbrynet.core.server.BlobAvailabilityHandler import BlobAvailabilityHandlerFactory from lbrynet.core.server.BlobAvailabilityHandler import BlobAvailabilityHandlerFactory
@ -20,7 +19,7 @@ from lbrynet.core.StreamDescriptor import StreamDescriptorIdentifier
from lbrynet.core.StreamDescriptor import download_sd_blob from lbrynet.core.StreamDescriptor import download_sd_blob
from lbrynet.file_manager.EncryptedFileCreator import create_lbry_file from lbrynet.file_manager.EncryptedFileCreator import create_lbry_file
from lbrynet.lbry_file.client.EncryptedFileOptions import add_lbry_file_to_sd_identifier from lbrynet.lbry_file.client.EncryptedFileOptions import add_lbry_file_to_sd_identifier
from lbrynet.lbry_file.StreamDescriptor import get_sd_info from lbrynet.core.StreamDescriptor import get_sd_info
from twisted.internet import defer, threads, task from twisted.internet import defer, threads, task
from twisted.trial.unittest import TestCase from twisted.trial.unittest import TestCase
from twisted.python.failure import Failure from twisted.python.failure import Failure
@ -119,9 +118,7 @@ class LbryUploader(object):
peer_port=5553, use_upnp=False, rate_limiter=rate_limiter, wallet=wallet, peer_port=5553, use_upnp=False, rate_limiter=rate_limiter, wallet=wallet,
blob_tracker_class=DummyBlobAvailabilityTracker, blob_tracker_class=DummyBlobAvailabilityTracker,
dht_node_class=Node, is_generous=self.is_generous, external_ip="127.0.0.1") dht_node_class=Node, is_generous=self.is_generous, external_ip="127.0.0.1")
stream_info_manager = DBEncryptedFileMetadataManager(self.db_dir) self.lbry_file_manager = EncryptedFileManager(self.session, self.sd_identifier)
self.lbry_file_manager = EncryptedFileManager(
self.session, stream_info_manager, self.sd_identifier)
if self.ul_rate_limit is not None: if self.ul_rate_limit is not None:
self.session.rate_limiter.set_ul_limit(self.ul_rate_limit) self.session.rate_limiter.set_ul_limit(self.ul_rate_limit)
reactor.callLater(1, self.start_all) reactor.callLater(1, self.start_all)
@ -134,7 +131,6 @@ class LbryUploader(object):
d.addCallback(lambda _: self.lbry_file_manager.setup()) d.addCallback(lambda _: self.lbry_file_manager.setup())
d.addCallback(lambda _: self.start_server()) d.addCallback(lambda _: self.start_server())
d.addCallback(lambda _: self.create_stream()) d.addCallback(lambda _: self.create_stream())
d.addCallback(self.create_stream_descriptor)
d.addCallback(self.put_sd_hash_on_queue) d.addCallback(self.put_sd_hash_on_queue)
def print_error(err): def print_error(err):
@ -180,16 +176,11 @@ class LbryUploader(object):
if self.kill_event.is_set(): if self.kill_event.is_set():
self.kill_server() self.kill_server()
@defer.inlineCallbacks
def create_stream(self): def create_stream(self):
test_file = GenFile(self.file_size, b''.join([chr(i) for i in xrange(0, 64, 6)])) test_file = GenFile(self.file_size, b''.join([chr(i) for i in xrange(0, 64, 6)]))
d = create_lbry_file(self.session, self.lbry_file_manager, "test_file", test_file) lbry_file = yield create_lbry_file(self.session, self.lbry_file_manager, "test_file", test_file)
return d defer.returnValue(lbry_file.sd_hash)
def create_stream_descriptor(self, stream_hash):
descriptor_writer = BlobStreamDescriptorWriter(self.session.blob_manager)
d = get_sd_info(self.lbry_file_manager.stream_info_manager, stream_hash, True)
d.addCallback(descriptor_writer.create_descriptor)
return d
def put_sd_hash_on_queue(self, sd_hash): def put_sd_hash_on_queue(self, sd_hash):
self.sd_hash_queue.put(sd_hash) self.sd_hash_queue.put(sd_hash)
@ -226,26 +217,20 @@ def start_lbry_reuploader(sd_hash, kill_event, dead_event,
is_generous=conf.ADJUSTABLE_SETTINGS['is_generous_host'][1], is_generous=conf.ADJUSTABLE_SETTINGS['is_generous_host'][1],
external_ip="127.0.0.1") external_ip="127.0.0.1")
stream_info_manager = DBEncryptedFileMetadataManager(db_dir) lbry_file_manager = EncryptedFileManager(session, sd_identifier)
lbry_file_manager = EncryptedFileManager(session, stream_info_manager, sd_identifier)
if ul_rate_limit is not None: if ul_rate_limit is not None:
session.rate_limiter.set_ul_limit(ul_rate_limit) session.rate_limiter.set_ul_limit(ul_rate_limit)
def make_downloader(metadata, prm): def make_downloader(metadata, prm, download_directory):
info_validator = metadata.validator
options = metadata.options
factories = metadata.factories factories = metadata.factories
chosen_options = [o.default_value for o in return factories[0].make_downloader(metadata, prm.min_blob_data_payment_rate, prm, download_directory)
options.get_downloader_options(info_validator, prm)]
return factories[0].make_downloader(metadata, chosen_options, prm)
def download_file(): def download_file():
prm = session.payment_rate_manager prm = session.payment_rate_manager
d = download_sd_blob(session, sd_hash, prm) d = download_sd_blob(session, sd_hash, prm)
d.addCallback(sd_identifier.get_metadata_for_sd_blob) d.addCallback(sd_identifier.get_metadata_for_sd_blob)
d.addCallback(make_downloader, prm) d.addCallback(make_downloader, prm, db_dir)
d.addCallback(lambda downloader: downloader.start()) d.addCallback(lambda downloader: downloader.start())
return d return d
@ -413,7 +398,6 @@ class TestTransfer(TestCase):
mocks.mock_conf_settings(self) mocks.mock_conf_settings(self)
self.server_processes = [] self.server_processes = []
self.session = None self.session = None
self.stream_info_manager = None
self.lbry_file_manager = None self.lbry_file_manager = None
self.is_generous = True self.is_generous = True
self.addCleanup(self.take_down_env) self.addCleanup(self.take_down_env)
@ -425,8 +409,6 @@ class TestTransfer(TestCase):
d.addCallback(lambda _: self.lbry_file_manager.stop()) d.addCallback(lambda _: self.lbry_file_manager.stop())
if self.session is not None: if self.session is not None:
d.addCallback(lambda _: self.session.shut_down()) d.addCallback(lambda _: self.session.shut_down())
if self.stream_info_manager is not None:
d.addCallback(lambda _: self.stream_info_manager.stop())
def delete_test_env(): def delete_test_env():
dirs = ['server', 'server1', 'server2', 'client'] dirs = ['server', 'server1', 'server2', 'client']
@ -519,19 +501,12 @@ class TestTransfer(TestCase):
blob_tracker_class=DummyBlobAvailabilityTracker, blob_tracker_class=DummyBlobAvailabilityTracker,
dht_node_class=Node, is_generous=self.is_generous, external_ip="127.0.0.1") dht_node_class=Node, is_generous=self.is_generous, external_ip="127.0.0.1")
self.stream_info_manager = DBEncryptedFileMetadataManager(db_dir)
self.lbry_file_manager = EncryptedFileManager( self.lbry_file_manager = EncryptedFileManager(
self.session, self.stream_info_manager, sd_identifier) self.session, sd_identifier)
def make_downloader(metadata, prm): def make_downloader(metadata, prm):
info_validator = metadata.validator
options = metadata.options
factories = metadata.factories factories = metadata.factories
chosen_options = [ return factories[0].make_downloader(metadata, prm.min_blob_data_payment_rate, prm, db_dir)
o.default_value for o in options.get_downloader_options(info_validator, prm)
]
return factories[0].make_downloader(metadata, chosen_options, prm)
def download_file(sd_hash): def download_file(sd_hash):
prm = self.session.payment_rate_manager prm = self.session.payment_rate_manager
@ -542,7 +517,7 @@ class TestTransfer(TestCase):
return d return d
def check_md5_sum(): def check_md5_sum():
f = open('test_file') f = open(os.path.join(db_dir, 'test_file'))
hashsum = MD5.new() hashsum = MD5.new()
hashsum.update(f.read()) hashsum.update(f.read())
self.assertEqual(hashsum.hexdigest(), "4ca2aafb4101c1e42235aad24fbb83be") self.assertEqual(hashsum.hexdigest(), "4ca2aafb4101c1e42235aad24fbb83be")
@ -696,25 +671,14 @@ class TestTransfer(TestCase):
is_generous=conf.ADJUSTABLE_SETTINGS['is_generous_host'][1], is_generous=conf.ADJUSTABLE_SETTINGS['is_generous_host'][1],
external_ip="127.0.0.1") external_ip="127.0.0.1")
self.stream_info_manager = DBEncryptedFileMetadataManager(self.session.db_dir) self.lbry_file_manager = EncryptedFileManager(self.session, sd_identifier)
self.lbry_file_manager = EncryptedFileManager(self.session, self.stream_info_manager,
sd_identifier)
@defer.inlineCallbacks @defer.inlineCallbacks
def make_downloader(metadata, prm): def make_downloader(metadata, prm):
info_validator = metadata.validator
options = metadata.options
factories = metadata.factories factories = metadata.factories
chosen_options = [ downloader = yield factories[0].make_downloader(metadata, prm.min_blob_data_payment_rate, prm, db_dir)
o.default_value for o in options.get_downloader_options(info_validator, prm)
]
downloader = yield factories[0].make_downloader(metadata, chosen_options, prm)
defer.returnValue(downloader) defer.returnValue(downloader)
def append_downloader(downloader):
downloaders.append(downloader)
return downloader
@defer.inlineCallbacks @defer.inlineCallbacks
def download_file(sd_hash): def download_file(sd_hash):
prm = self.session.payment_rate_manager prm = self.session.payment_rate_manager
@ -722,28 +686,21 @@ class TestTransfer(TestCase):
metadata = yield sd_identifier.get_metadata_for_sd_blob(sd_blob) metadata = yield sd_identifier.get_metadata_for_sd_blob(sd_blob)
downloader = yield make_downloader(metadata, prm) downloader = yield make_downloader(metadata, prm)
downloaders.append(downloader) downloaders.append(downloader)
finished_value = yield downloader.start() yield downloader.start()
defer.returnValue(finished_value) defer.returnValue(downloader)
def check_md5_sum(): def check_md5_sum():
f = open('test_file') f = open(os.path.join(db_dir, 'test_file'))
hashsum = MD5.new() hashsum = MD5.new()
hashsum.update(f.read()) hashsum.update(f.read())
self.assertEqual(hashsum.hexdigest(), "4ca2aafb4101c1e42235aad24fbb83be") self.assertEqual(hashsum.hexdigest(), "4ca2aafb4101c1e42235aad24fbb83be")
def delete_lbry_file(): def delete_lbry_file(downloader):
logging.debug("deleting the file") logging.debug("deleting the file")
d = self.lbry_file_manager.delete_lbry_file(downloaders[0]) return self.lbry_file_manager.delete_lbry_file(downloader)
d.addCallback(lambda _: self.lbry_file_manager.get_count_for_stream_hash(
downloaders[0].stream_hash))
d.addCallback(
lambda c: self.stream_info_manager.delete_stream(
downloaders[1].stream_hash) if c == 0 else True)
return d
def check_lbry_file(): def check_lbry_file(downloader):
d = downloaders[1].status() d = downloader.status()
d.addCallback(lambda _: downloaders[1].status())
def check_status_report(status_report): def check_status_report(status_report):
self.assertEqual(status_report.num_known, status_report.num_completed) self.assertEqual(status_report.num_known, status_report.num_completed)
@ -754,17 +711,20 @@ class TestTransfer(TestCase):
@defer.inlineCallbacks @defer.inlineCallbacks
def start_transfer(sd_hash): def start_transfer(sd_hash):
# download a file, delete it, and download it again
logging.debug("Starting the transfer") logging.debug("Starting the transfer")
yield self.session.setup() yield self.session.setup()
yield self.stream_info_manager.setup()
yield add_lbry_file_to_sd_identifier(sd_identifier) yield add_lbry_file_to_sd_identifier(sd_identifier)
yield self.lbry_file_manager.setup() yield self.lbry_file_manager.setup()
yield download_file(sd_hash) downloader = yield download_file(sd_hash)
yield check_md5_sum() yield check_md5_sum()
yield download_file(sd_hash) yield check_lbry_file(downloader)
yield delete_lbry_file(downloader)
yield check_lbry_file() downloader = yield download_file(sd_hash)
yield delete_lbry_file() yield check_lbry_file(downloader)
yield check_md5_sum()
yield delete_lbry_file(downloader)
def stop(arg): def stop(arg):
if isinstance(arg, Failure): if isinstance(arg, Failure):
@ -819,10 +779,8 @@ class TestTransfer(TestCase):
is_generous=conf.ADJUSTABLE_SETTINGS['is_generous_host'][1], is_generous=conf.ADJUSTABLE_SETTINGS['is_generous_host'][1],
external_ip="127.0.0.1") external_ip="127.0.0.1")
self.stream_info_manager = DBEncryptedFileMetadataManager(db_dir)
self.lbry_file_manager = EncryptedFileManager( self.lbry_file_manager = EncryptedFileManager(
self.session, self.stream_info_manager, sd_identifier) self.session, sd_identifier)
def start_additional_uploaders(sd_hash): def start_additional_uploaders(sd_hash):
for i in range(1, num_uploaders): for i in range(1, num_uploaders):

View file

@ -2,13 +2,12 @@ from twisted.internet import defer, threads, error
from twisted.trial import unittest from twisted.trial import unittest
from lbrynet import conf from lbrynet import conf
from lbrynet import lbry_file from lbrynet.core.StreamDescriptor import get_sd_info
from lbrynet import reflector from lbrynet import reflector
from lbrynet.core import BlobManager from lbrynet.core import BlobManager
from lbrynet.core import PeerManager from lbrynet.core import PeerManager
from lbrynet.core import Session from lbrynet.core import Session
from lbrynet.core import StreamDescriptor from lbrynet.core import StreamDescriptor
from lbrynet.lbry_file import EncryptedFileMetadataManager
from lbrynet.lbry_file.client import EncryptedFileOptions from lbrynet.lbry_file.client import EncryptedFileOptions
from lbrynet.file_manager import EncryptedFileCreator from lbrynet.file_manager import EncryptedFileCreator
from lbrynet.file_manager import EncryptedFileManager from lbrynet.file_manager import EncryptedFileManager
@ -21,7 +20,6 @@ class TestReflector(unittest.TestCase):
def setUp(self): def setUp(self):
mocks.mock_conf_settings(self) mocks.mock_conf_settings(self)
self.session = None self.session = None
self.stream_info_manager = None
self.lbry_file_manager = None self.lbry_file_manager = None
self.server_blob_manager = None self.server_blob_manager = None
self.reflector_port = None self.reflector_port = None
@ -66,11 +64,8 @@ class TestReflector(unittest.TestCase):
external_ip="127.0.0.1" external_ip="127.0.0.1"
) )
self.stream_info_manager = EncryptedFileMetadataManager.DBEncryptedFileMetadataManager( self.lbry_file_manager = EncryptedFileManager.EncryptedFileManager(self.session,
self.db_dir) sd_identifier)
self.lbry_file_manager = EncryptedFileManager.EncryptedFileManager(
self.session, self.stream_info_manager, sd_identifier)
## Setup reflector server classes ## ## Setup reflector server classes ##
self.server_db_dir, self.server_blob_dir = mk_db_and_blob_dir() self.server_db_dir, self.server_blob_dir = mk_db_and_blob_dir()
@ -88,26 +83,25 @@ class TestReflector(unittest.TestCase):
external_ip="127.0.0.1" external_ip="127.0.0.1"
) )
self.server_blob_manager = BlobManager.DiskBlobManager( self.server_blob_manager = BlobManager.DiskBlobManager(hash_announcer,
hash_announcer, self.server_blob_dir, self.server_db_dir) self.server_blob_dir,
self.server_stream_info_manager = \ self.server_session.storage)
EncryptedFileMetadataManager.DBEncryptedFileMetadataManager(self.server_db_dir)
self.server_lbry_file_manager = EncryptedFileManager.EncryptedFileManager( self.server_lbry_file_manager = EncryptedFileManager.EncryptedFileManager(
self.server_session, self.server_stream_info_manager, self.server_session, sd_identifier)
sd_identifier)
d = self.session.setup() d = self.session.setup()
d.addCallback(lambda _: self.stream_info_manager.setup())
d.addCallback(lambda _: EncryptedFileOptions.add_lbry_file_to_sd_identifier(sd_identifier)) d.addCallback(lambda _: EncryptedFileOptions.add_lbry_file_to_sd_identifier(sd_identifier))
d.addCallback(lambda _: self.lbry_file_manager.setup()) d.addCallback(lambda _: self.lbry_file_manager.setup())
d.addCallback(lambda _: self.server_session.setup()) d.addCallback(lambda _: self.server_session.setup())
d.addCallback(lambda _: self.server_blob_manager.setup()) d.addCallback(lambda _: self.server_blob_manager.setup())
d.addCallback(lambda _: self.server_stream_info_manager.setup())
d.addCallback(lambda _: self.server_lbry_file_manager.setup()) d.addCallback(lambda _: self.server_lbry_file_manager.setup())
def verify_equal(sd_info): @defer.inlineCallbacks
self.assertEqual(mocks.create_stream_sd_file, sd_info) def verify_equal(sd_info, stream_hash):
self.assertDictEqual(mocks.create_stream_sd_file, sd_info)
sd_hash = yield self.session.storage.get_sd_blob_hash_for_stream(stream_hash)
defer.returnValue(sd_hash)
def save_sd_blob_hash(sd_hash): def save_sd_blob_hash(sd_hash):
self.sd_hash = sd_hash self.sd_hash = sd_hash
@ -115,14 +109,8 @@ class TestReflector(unittest.TestCase):
def verify_stream_descriptor_file(stream_hash): def verify_stream_descriptor_file(stream_hash):
self.stream_hash = stream_hash self.stream_hash = stream_hash
d = lbry_file.get_sd_info(self.lbry_file_manager.stream_info_manager, stream_hash, True) d = get_sd_info(self.lbry_file_manager.session.storage, stream_hash, True)
d.addCallback(verify_equal) d.addCallback(verify_equal, stream_hash)
d.addCallback(
lambda _: lbry_file.publish_sd_blob(
self.lbry_file_manager.stream_info_manager,
self.session.blob_manager, stream_hash
)
)
d.addCallback(save_sd_blob_hash) d.addCallback(save_sd_blob_hash)
return d return d
@ -136,11 +124,12 @@ class TestReflector(unittest.TestCase):
key="0123456701234567", key="0123456701234567",
iv_generator=iv_generator() iv_generator=iv_generator()
) )
d.addCallback(lambda lbry_file: lbry_file.stream_hash)
return d return d
def start_server(): def start_server():
server_factory = reflector.ServerFactory( server_factory = reflector.ServerFactory(
peer_manager, self.server_blob_manager, self.server_stream_info_manager, peer_manager, self.server_blob_manager,
self.server_lbry_file_manager) self.server_lbry_file_manager)
from twisted.internet import reactor from twisted.internet import reactor
port = 8943 port = 8943
@ -161,13 +150,11 @@ class TestReflector(unittest.TestCase):
## Close client classes ## ## Close client classes ##
d.addCallback(lambda _: self.lbry_file_manager.stop()) d.addCallback(lambda _: self.lbry_file_manager.stop())
d.addCallback(lambda _: self.session.shut_down()) d.addCallback(lambda _: self.session.shut_down())
d.addCallback(lambda _: self.stream_info_manager.stop())
## Close server classes ## ## Close server classes ##
d.addCallback(lambda _: self.server_blob_manager.stop()) d.addCallback(lambda _: self.server_blob_manager.stop())
d.addCallback(lambda _: self.server_lbry_file_manager.stop()) d.addCallback(lambda _: self.server_lbry_file_manager.stop())
d.addCallback(lambda _: self.server_session.shut_down()) d.addCallback(lambda _: self.server_session.shut_down())
d.addCallback(lambda _: self.server_stream_info_manager.stop())
d.addCallback(lambda _: self.reflector_port.stopListening()) d.addCallback(lambda _: self.reflector_port.stopListening())
@ -192,37 +179,32 @@ class TestReflector(unittest.TestCase):
@defer.inlineCallbacks @defer.inlineCallbacks
def verify_stream_on_reflector(): def verify_stream_on_reflector():
# check stream_info_manager has all the right information # check stream_info_manager has all the right information
streams = yield self.server_stream_info_manager.get_all_streams() streams = yield self.server_session.storage.get_all_streams()
self.assertEqual(1, len(streams)) self.assertEqual(1, len(streams))
self.assertEqual(self.stream_hash, streams[0]) self.assertEqual(self.stream_hash, streams[0])
blobs = yield self.server_stream_info_manager.get_blobs_for_stream(self.stream_hash) blobs = yield self.server_session.storage.get_blobs_for_stream(self.stream_hash)
blob_hashes = [b[0] for b in blobs if b[0] is not None] blob_hashes = [b.blob_hash for b in blobs if b.blob_hash is not None]
expected_blob_hashes = [b[0] for b in self.expected_blobs[:-1] if b[0] is not None] expected_blob_hashes = [b[0] for b in self.expected_blobs[:-1] if b[0] is not None]
self.assertEqual(expected_blob_hashes, blob_hashes) self.assertEqual(expected_blob_hashes, blob_hashes)
sd_hashes = yield self.server_stream_info_manager.get_sd_blob_hashes_for_stream( sd_hash = yield self.server_session.storage.get_sd_blob_hash_for_stream(streams[0])
self.stream_hash)
self.assertEqual(1, len(sd_hashes))
expected_sd_hash = self.expected_blobs[-1][0] expected_sd_hash = self.expected_blobs[-1][0]
self.assertEqual(self.sd_hash, sd_hashes[0]) self.assertEqual(self.sd_hash, sd_hash)
# check lbry file manager has the file # check lbry file manager has the file
files = yield self.server_lbry_file_manager.lbry_files files = yield self.server_lbry_file_manager.lbry_files
self.assertEqual(1, len(files))
self.assertEqual(self.sd_hash, files[0].sd_hash)
self.assertEqual('test_file', files[0].file_name)
status = yield files[0].status() self.assertEqual(0, len(files))
self.assertEqual('stopped', status.running_status)
num_blobs = len(self.expected_blobs) -1 # subtract sd hash streams = yield self.server_lbry_file_manager.storage.get_all_streams()
self.assertEqual(num_blobs, status.num_completed) self.assertEqual(1, len(streams))
self.assertEqual(num_blobs, status.num_known) stream_info = yield self.server_lbry_file_manager.storage.get_stream_info(self.stream_hash)
self.assertEqual(self.sd_hash, stream_info[3])
self.assertEqual('test_file'.encode('hex'), stream_info[0])
# check should_announce blobs on blob_manager # check should_announce blobs on blob_manager
blob_hashes = yield self.server_blob_manager._get_all_should_announce_blob_hashes() blob_hashes = yield self.server_blob_manager.storage.get_all_should_announce_blobs()
self.assertEqual(2, len(blob_hashes)) self.assertSetEqual({self.sd_hash, expected_blob_hashes[0]}, set(blob_hashes))
self.assertTrue(self.sd_hash in blob_hashes)
self.assertTrue(expected_blob_hashes[0] in blob_hashes)
def verify_have_blob(blob_hash, blob_size): def verify_have_blob(blob_hash, blob_size):
d = self.server_blob_manager.get_blob(blob_hash) d = self.server_blob_manager.get_blob(blob_hash)
@ -231,7 +213,7 @@ class TestReflector(unittest.TestCase):
def send_to_server(): def send_to_server():
fake_lbry_file = mocks.FakeLBRYFile(self.session.blob_manager, fake_lbry_file = mocks.FakeLBRYFile(self.session.blob_manager,
self.stream_info_manager, self.server_session.storage,
self.stream_hash) self.stream_hash)
factory = reflector.ClientFactory(fake_lbry_file) factory = reflector.ClientFactory(fake_lbry_file)
@ -283,10 +265,10 @@ class TestReflector(unittest.TestCase):
@defer.inlineCallbacks @defer.inlineCallbacks
def verify_stream_on_reflector(): def verify_stream_on_reflector():
# this protocol should not have any impact on stream info manager # this protocol should not have any impact on stream info manager
streams = yield self.server_stream_info_manager.get_all_streams() streams = yield self.server_session.storage.get_all_streams()
self.assertEqual(0, len(streams)) self.assertEqual(0, len(streams))
# there should be no should announce blobs here # there should be no should announce blobs here
blob_hashes = yield self.server_blob_manager._get_all_should_announce_blob_hashes() blob_hashes = yield self.server_blob_manager.storage.get_all_should_announce_blobs()
self.assertEqual(0, len(blob_hashes)) self.assertEqual(0, len(blob_hashes))
def verify_data_on_reflector(): def verify_data_on_reflector():
@ -333,25 +315,21 @@ class TestReflector(unittest.TestCase):
def verify_stream_on_reflector(): def verify_stream_on_reflector():
# check stream_info_manager has all the right information # check stream_info_manager has all the right information
streams = yield self.server_stream_info_manager.get_all_streams() streams = yield self.server_session.storage.get_all_streams()
self.assertEqual(1, len(streams)) self.assertEqual(1, len(streams))
self.assertEqual(self.stream_hash, streams[0]) self.assertEqual(self.stream_hash, streams[0])
blobs = yield self.server_stream_info_manager.get_blobs_for_stream(self.stream_hash) blobs = yield self.server_session.storage.get_blobs_for_stream(self.stream_hash)
blob_hashes = [b[0] for b in blobs if b[0] is not None] blob_hashes = [b.blob_hash for b in blobs if b.blob_hash is not None]
expected_blob_hashes = [b[0] for b in self.expected_blobs[:-1] if b[0] is not None] expected_blob_hashes = [b[0] for b in self.expected_blobs[:-1] if b[0] is not None]
self.assertEqual(expected_blob_hashes, blob_hashes) self.assertEqual(expected_blob_hashes, blob_hashes)
sd_hashes = yield self.server_stream_info_manager.get_sd_blob_hashes_for_stream( sd_hash = yield self.server_session.storage.get_sd_blob_hash_for_stream(
self.stream_hash) self.stream_hash)
self.assertEqual(1, len(sd_hashes)) self.assertEqual(self.sd_hash, sd_hash)
expected_sd_hash = self.expected_blobs[-1][0]
self.assertEqual(self.sd_hash, sd_hashes[0])
# check should_announce blobs on blob_manager # check should_announce blobs on blob_manager
blob_hashes = yield self.server_blob_manager._get_all_should_announce_blob_hashes() to_announce = yield self.server_blob_manager.storage.get_all_should_announce_blobs()
self.assertEqual(2, len(blob_hashes)) self.assertSetEqual(set(to_announce), {self.sd_hash, expected_blob_hashes[0]})
self.assertTrue(self.sd_hash in blob_hashes)
self.assertTrue(expected_blob_hashes[0] in blob_hashes)
def verify_have_blob(blob_hash, blob_size): def verify_have_blob(blob_hash, blob_size):
d = self.server_blob_manager.get_blob(blob_hash) d = self.server_blob_manager.get_blob(blob_hash)
@ -371,7 +349,7 @@ class TestReflector(unittest.TestCase):
def send_to_server_as_stream(result): def send_to_server_as_stream(result):
fake_lbry_file = mocks.FakeLBRYFile(self.session.blob_manager, fake_lbry_file = mocks.FakeLBRYFile(self.session.blob_manager,
self.stream_info_manager, self.server_session.storage,
self.stream_hash) self.stream_hash)
factory = reflector.ClientFactory(fake_lbry_file) factory = reflector.ClientFactory(fake_lbry_file)
@ -379,7 +357,6 @@ class TestReflector(unittest.TestCase):
reactor.connectTCP('localhost', self.port, factory) reactor.connectTCP('localhost', self.port, factory)
return factory.finished_deferred return factory.finished_deferred
def verify_blob_completed(blob, blob_size): def verify_blob_completed(blob, blob_size):
self.assertTrue(blob.get_is_verified()) self.assertTrue(blob.get_is_verified())
self.assertEqual(blob_size, blob.length) self.assertEqual(blob_size, blob.length)

View file

@ -1,20 +1,18 @@
import logging
import os import os
import shutil import shutil
import tempfile
from Crypto.Hash import MD5 from Crypto.Hash import MD5
from twisted.trial.unittest import TestCase from twisted.trial.unittest import TestCase
from twisted.internet import defer, threads from twisted.internet import defer
from lbrynet import conf from lbrynet import conf
from lbrynet.lbry_file.EncryptedFileMetadataManager import DBEncryptedFileMetadataManager
from lbrynet.file_manager.EncryptedFileManager import EncryptedFileManager from lbrynet.file_manager.EncryptedFileManager import EncryptedFileManager
from lbrynet.core.Session import Session from lbrynet.core.Session import Session
from lbrynet.core.StreamDescriptor import StreamDescriptorIdentifier from lbrynet.core.StreamDescriptor import StreamDescriptorIdentifier
from lbrynet.lbry_file import publish_sd_blob
from lbrynet.file_manager.EncryptedFileCreator import create_lbry_file from lbrynet.file_manager.EncryptedFileCreator import create_lbry_file
from lbrynet.lbry_file.client.EncryptedFileOptions import add_lbry_file_to_sd_identifier from lbrynet.lbry_file.client.EncryptedFileOptions import add_lbry_file_to_sd_identifier
from lbrynet.lbry_file.StreamDescriptor import get_sd_info from lbrynet.core.StreamDescriptor import get_sd_info
from lbrynet.core.PeerManager import PeerManager from lbrynet.core.PeerManager import PeerManager
from lbrynet.core.RateLimiter import DummyRateLimiter from lbrynet.core.RateLimiter import DummyRateLimiter
@ -31,31 +29,29 @@ DummyBlobAvailabilityTracker = mocks.BlobAvailabilityTracker
class TestStreamify(TestCase): class TestStreamify(TestCase):
maxDiff = 5000
def setUp(self): def setUp(self):
mocks.mock_conf_settings(self) mocks.mock_conf_settings(self)
self.session = None self.session = None
self.stream_info_manager = None
self.lbry_file_manager = None self.lbry_file_manager = None
self.addCleanup(self.take_down_env)
self.is_generous = True self.is_generous = True
self.db_dir = tempfile.mkdtemp()
self.blob_dir = os.path.join(self.db_dir, "blobfiles")
os.mkdir(self.blob_dir)
def take_down_env(self): @defer.inlineCallbacks
d = defer.succeed(True) def tearDown(self):
lbry_files = self.lbry_file_manager.lbry_files
for lbry_file in lbry_files:
yield self.lbry_file_manager.delete_lbry_file(lbry_file)
if self.lbry_file_manager is not None: if self.lbry_file_manager is not None:
d.addCallback(lambda _: self.lbry_file_manager.stop()) yield self.lbry_file_manager.stop()
if self.session is not None: if self.session is not None:
d.addCallback(lambda _: self.session.shut_down()) yield self.session.shut_down()
if self.stream_info_manager is not None: shutil.rmtree(self.db_dir)
d.addCallback(lambda _: self.stream_info_manager.stop())
def delete_test_env():
shutil.rmtree('client')
if os.path.exists("test_file"): if os.path.exists("test_file"):
os.remove("test_file") os.remove("test_file")
d.addCallback(lambda _: threads.deferToThread(delete_test_env))
return d
def test_create_stream(self): def test_create_stream(self):
wallet = FakeWallet() wallet = FakeWallet()
peer_manager = PeerManager() peer_manager = PeerManager()
@ -64,28 +60,18 @@ class TestStreamify(TestCase):
rate_limiter = DummyRateLimiter() rate_limiter = DummyRateLimiter()
sd_identifier = StreamDescriptorIdentifier() sd_identifier = StreamDescriptorIdentifier()
db_dir = "client"
blob_dir = os.path.join(db_dir, "blobfiles")
os.mkdir(db_dir)
os.mkdir(blob_dir)
self.session = Session( self.session = Session(
conf.ADJUSTABLE_SETTINGS['data_rate'][1], db_dir=db_dir, node_id="abcd", conf.ADJUSTABLE_SETTINGS['data_rate'][1], db_dir=self.db_dir, node_id="abcd",
peer_finder=peer_finder, hash_announcer=hash_announcer, peer_finder=peer_finder, hash_announcer=hash_announcer,
blob_dir=blob_dir, peer_port=5553, blob_dir=self.blob_dir, peer_port=5553,
use_upnp=False, rate_limiter=rate_limiter, wallet=wallet, use_upnp=False, rate_limiter=rate_limiter, wallet=wallet,
blob_tracker_class=DummyBlobAvailabilityTracker, blob_tracker_class=DummyBlobAvailabilityTracker,
is_generous=self.is_generous, external_ip="127.0.0.1" is_generous=self.is_generous, external_ip="127.0.0.1"
) )
self.stream_info_manager = DBEncryptedFileMetadataManager(db_dir) self.lbry_file_manager = EncryptedFileManager(self.session, sd_identifier)
self.lbry_file_manager = EncryptedFileManager(
self.session, self.stream_info_manager, sd_identifier)
d = self.session.setup() d = self.session.setup()
d.addCallback(lambda _: self.stream_info_manager.setup())
d.addCallback(lambda _: add_lbry_file_to_sd_identifier(sd_identifier)) d.addCallback(lambda _: add_lbry_file_to_sd_identifier(sd_identifier))
d.addCallback(lambda _: self.lbry_file_manager.setup()) d.addCallback(lambda _: self.lbry_file_manager.setup())
@ -93,7 +79,7 @@ class TestStreamify(TestCase):
self.assertEqual(sd_info, test_create_stream_sd_file) self.assertEqual(sd_info, test_create_stream_sd_file)
def verify_stream_descriptor_file(stream_hash): def verify_stream_descriptor_file(stream_hash):
d = get_sd_info(self.lbry_file_manager.stream_info_manager, stream_hash, True) d = get_sd_info(self.session.storage, stream_hash, True)
d.addCallback(verify_equal) d.addCallback(verify_equal)
return d return d
@ -107,6 +93,7 @@ class TestStreamify(TestCase):
test_file = GenFile(5209343, b''.join([chr(i + 3) for i in xrange(0, 64, 6)])) test_file = GenFile(5209343, b''.join([chr(i + 3) for i in xrange(0, 64, 6)]))
d = create_lbry_file(self.session, self.lbry_file_manager, "test_file", test_file, d = create_lbry_file(self.session, self.lbry_file_manager, "test_file", test_file,
key="0123456701234567", iv_generator=iv_generator()) key="0123456701234567", iv_generator=iv_generator())
d.addCallback(lambda lbry_file: lbry_file.stream_hash)
return d return d
d.addCallback(lambda _: create_stream()) d.addCallback(lambda _: create_stream())
@ -121,57 +108,30 @@ class TestStreamify(TestCase):
rate_limiter = DummyRateLimiter() rate_limiter = DummyRateLimiter()
sd_identifier = StreamDescriptorIdentifier() sd_identifier = StreamDescriptorIdentifier()
db_dir = "client"
blob_dir = os.path.join(db_dir, "blobfiles")
os.mkdir(db_dir)
os.mkdir(blob_dir)
self.session = Session( self.session = Session(
conf.ADJUSTABLE_SETTINGS['data_rate'][1], db_dir=db_dir, node_id="abcd", conf.ADJUSTABLE_SETTINGS['data_rate'][1], db_dir=self.db_dir, node_id="abcd",
peer_finder=peer_finder, hash_announcer=hash_announcer, peer_finder=peer_finder, hash_announcer=hash_announcer,
blob_dir=blob_dir, peer_port=5553, blob_dir=self.blob_dir, peer_port=5553,
use_upnp=False, rate_limiter=rate_limiter, wallet=wallet, use_upnp=False, rate_limiter=rate_limiter, wallet=wallet,
blob_tracker_class=DummyBlobAvailabilityTracker, external_ip="127.0.0.1" blob_tracker_class=DummyBlobAvailabilityTracker, external_ip="127.0.0.1"
) )
self.stream_info_manager = DBEncryptedFileMetadataManager(self.session.db_dir) self.lbry_file_manager = EncryptedFileManager(self.session, sd_identifier)
self.lbry_file_manager = EncryptedFileManager( @defer.inlineCallbacks
self.session, self.stream_info_manager, sd_identifier) def create_stream():
test_file = GenFile(53209343, b''.join([chr(i + 5) for i in xrange(0, 64, 6)]))
def start_lbry_file(lbry_file): lbry_file = yield create_lbry_file(self.session, self.lbry_file_manager, "test_file", test_file)
logging.debug("Calling lbry_file.start()") sd_hash = yield self.session.storage.get_sd_blob_hash_for_stream(lbry_file.stream_hash)
d = lbry_file.start() self.assertTrue(lbry_file.sd_hash, sd_hash)
return d yield lbry_file.start()
def combine_stream(info):
stream_hash, sd_hash = info
prm = self.session.payment_rate_manager
d = self.lbry_file_manager.add_lbry_file(stream_hash, sd_hash, prm)
d.addCallback(start_lbry_file)
def check_md5_sum():
f = open('test_file') f = open('test_file')
hashsum = MD5.new() hashsum = MD5.new()
hashsum.update(f.read()) hashsum.update(f.read())
self.assertEqual(hashsum.hexdigest(), "68959747edc73df45e45db6379dd7b3b") self.assertEqual(hashsum.hexdigest(), "68959747edc73df45e45db6379dd7b3b")
d.addCallback(lambda _: check_md5_sum())
return d
@defer.inlineCallbacks
def create_stream():
test_file = GenFile(53209343, b''.join([chr(i + 5) for i in xrange(0, 64, 6)]))
stream_hash = yield create_lbry_file(self.session, self.lbry_file_manager, "test_file",
test_file, suggested_file_name="test_file")
sd_hash = yield publish_sd_blob(self.stream_info_manager, self.session.blob_manager,
stream_hash)
defer.returnValue((stream_hash, sd_hash))
d = self.session.setup() d = self.session.setup()
d.addCallback(lambda _: self.stream_info_manager.setup())
d.addCallback(lambda _: add_lbry_file_to_sd_identifier(sd_identifier)) d.addCallback(lambda _: add_lbry_file_to_sd_identifier(sd_identifier))
d.addCallback(lambda _: self.lbry_file_manager.setup()) d.addCallback(lambda _: self.lbry_file_manager.setup())
d.addCallback(lambda _: create_stream()) d.addCallback(lambda _: create_stream())
d.addCallback(combine_stream)
return d return d