add blob reflector and v1 tests
This commit is contained in:
parent
f614a13db4
commit
4c76e1b98f
1 changed files with 111 additions and 51 deletions
|
@ -29,32 +29,8 @@ class TestReflector(unittest.TestCase):
|
||||||
self.lbry_file_manager = None
|
self.lbry_file_manager = None
|
||||||
self.server_blob_manager = None
|
self.server_blob_manager = None
|
||||||
self.reflector_port = None
|
self.reflector_port = None
|
||||||
|
self.port = None
|
||||||
self.addCleanup(self.take_down_env)
|
self.addCleanup(self.take_down_env)
|
||||||
|
|
||||||
def take_down_env(self):
|
|
||||||
d = defer.succeed(True)
|
|
||||||
if self.lbry_file_manager is not None:
|
|
||||||
d.addCallback(lambda _: self.lbry_file_manager.stop())
|
|
||||||
if self.session is not None:
|
|
||||||
d.addCallback(lambda _: self.session.shut_down())
|
|
||||||
if self.stream_info_manager is not None:
|
|
||||||
d.addCallback(lambda _: self.stream_info_manager.stop())
|
|
||||||
if self.server_blob_manager is not None:
|
|
||||||
d.addCallback(lambda _: self.server_blob_manager.stop())
|
|
||||||
if self.reflector_port is not None:
|
|
||||||
d.addCallback(lambda _: self.reflector_port.stopListening())
|
|
||||||
|
|
||||||
def delete_test_env():
|
|
||||||
try:
|
|
||||||
shutil.rmtree('client')
|
|
||||||
except:
|
|
||||||
raise unittest.SkipTest("TODO: fix this for windows")
|
|
||||||
|
|
||||||
d.addCallback(lambda _: threads.deferToThread(delete_test_env))
|
|
||||||
d.addErrback(lambda err: str(err))
|
|
||||||
return d
|
|
||||||
|
|
||||||
def test_reflector(self):
|
|
||||||
wallet = mocks.Wallet()
|
wallet = mocks.Wallet()
|
||||||
peer_manager = PeerManager.PeerManager()
|
peer_manager = PeerManager.PeerManager()
|
||||||
peer_finder = mocks.PeerFinder(5553, peer_manager, 2)
|
peer_finder = mocks.PeerFinder(5553, peer_manager, 2)
|
||||||
|
@ -98,7 +74,7 @@ class TestReflector(unittest.TestCase):
|
||||||
dht_node_class=Node
|
dht_node_class=Node
|
||||||
)
|
)
|
||||||
|
|
||||||
self.stream_info_manager = EncryptedFileMetadataManager.TempEncryptedFileMetadataManager()
|
self.stream_info_manager = EncryptedFileMetadataManager.DBEncryptedFileMetadataManager(db_dir)
|
||||||
|
|
||||||
self.lbry_file_manager = EncryptedFileManager.EncryptedFileManager(
|
self.lbry_file_manager = EncryptedFileManager.EncryptedFileManager(
|
||||||
self.session, self.stream_info_manager, sd_identifier)
|
self.session, self.stream_info_manager, sd_identifier)
|
||||||
|
@ -118,6 +94,7 @@ class TestReflector(unittest.TestCase):
|
||||||
self.expected_blobs.append((sd_hash, 923))
|
self.expected_blobs.append((sd_hash, 923))
|
||||||
|
|
||||||
def verify_stream_descriptor_file(stream_hash):
|
def verify_stream_descriptor_file(stream_hash):
|
||||||
|
self.stream_hash = stream_hash
|
||||||
d = lbryfile.get_sd_info(self.lbry_file_manager.stream_info_manager, stream_hash, True)
|
d = lbryfile.get_sd_info(self.lbry_file_manager.stream_info_manager, stream_hash, True)
|
||||||
d.addCallback(verify_equal)
|
d.addCallback(verify_equal)
|
||||||
d.addCallback(
|
d.addCallback(
|
||||||
|
@ -127,7 +104,6 @@ class TestReflector(unittest.TestCase):
|
||||||
)
|
)
|
||||||
)
|
)
|
||||||
d.addCallback(save_sd_blob_hash)
|
d.addCallback(save_sd_blob_hash)
|
||||||
d.addCallback(lambda _: stream_hash)
|
|
||||||
return d
|
return d
|
||||||
|
|
||||||
def create_stream():
|
def create_stream():
|
||||||
|
@ -149,45 +125,129 @@ class TestReflector(unittest.TestCase):
|
||||||
while self.reflector_port is None:
|
while self.reflector_port is None:
|
||||||
try:
|
try:
|
||||||
self.reflector_port = reactor.listenTCP(port, server_factory)
|
self.reflector_port = reactor.listenTCP(port, server_factory)
|
||||||
|
self.port = port
|
||||||
except error.CannotListenError:
|
except error.CannotListenError:
|
||||||
port += 1
|
port += 1
|
||||||
return defer.succeed(port)
|
|
||||||
|
|
||||||
def send_to_server(port, stream_hash):
|
d.addCallback(lambda _: create_stream())
|
||||||
factory = reflector.ClientFactory(
|
d.addCallback(verify_stream_descriptor_file)
|
||||||
self.session.blob_manager,
|
d.addCallback(lambda _: start_server())
|
||||||
self.stream_info_manager,
|
|
||||||
stream_hash
|
|
||||||
)
|
|
||||||
|
|
||||||
from twisted.internet import reactor
|
|
||||||
reactor.connectTCP('localhost', port, factory)
|
|
||||||
return factory.finished_deferred
|
|
||||||
|
|
||||||
def verify_blob_completed(blob, blob_size):
|
|
||||||
self.assertTrue(blob.is_validated())
|
|
||||||
self.assertEqual(blob_size, blob.length)
|
|
||||||
|
|
||||||
def verify_have_blob(blob_hash, blob_size):
|
|
||||||
d = self.server_blob_manager.get_blob(blob_hash, True)
|
|
||||||
d.addCallback(lambda blob: verify_blob_completed(blob, blob_size))
|
|
||||||
return d
|
return d
|
||||||
|
|
||||||
|
def take_down_env(self):
|
||||||
|
d = defer.succeed(True)
|
||||||
|
if self.lbry_file_manager is not None:
|
||||||
|
d.addCallback(lambda _: self.lbry_file_manager.stop())
|
||||||
|
if self.session is not None:
|
||||||
|
d.addCallback(lambda _: self.session.shut_down())
|
||||||
|
if self.stream_info_manager is not None:
|
||||||
|
d.addCallback(lambda _: self.stream_info_manager.stop())
|
||||||
|
if self.server_blob_manager is not None:
|
||||||
|
d.addCallback(lambda _: self.server_blob_manager.stop())
|
||||||
|
if self.reflector_port is not None:
|
||||||
|
d.addCallback(lambda _: self.reflector_port.stopListening())
|
||||||
|
|
||||||
|
def delete_test_env():
|
||||||
|
try:
|
||||||
|
shutil.rmtree('client')
|
||||||
|
except:
|
||||||
|
raise unittest.SkipTest("TODO: fix this for windows")
|
||||||
|
|
||||||
|
d.addCallback(lambda _: threads.deferToThread(delete_test_env))
|
||||||
|
d.addErrback(lambda err: str(err))
|
||||||
|
return d
|
||||||
|
|
||||||
|
def test_stream_reflector(self):
|
||||||
def verify_data_on_reflector():
|
def verify_data_on_reflector():
|
||||||
check_blob_ds = []
|
check_blob_ds = []
|
||||||
for blob_hash, blob_size in self.expected_blobs:
|
for blob_hash, blob_size in self.expected_blobs:
|
||||||
check_blob_ds.append(verify_have_blob(blob_hash, blob_size))
|
check_blob_ds.append(verify_have_blob(blob_hash, blob_size))
|
||||||
return defer.DeferredList(check_blob_ds)
|
return defer.DeferredList(check_blob_ds)
|
||||||
|
|
||||||
def upload_to_reflector(stream_hash):
|
def verify_have_blob(blob_hash, blob_size):
|
||||||
d = start_server()
|
d = self.server_blob_manager.get_blob(blob_hash, True)
|
||||||
d.addCallback(lambda port: send_to_server(port, stream_hash))
|
d.addCallback(lambda blob: verify_blob_completed(blob, blob_size))
|
||||||
|
return d
|
||||||
|
|
||||||
|
def send_to_server():
|
||||||
|
factory = reflector.ClientFactory(
|
||||||
|
self.session.blob_manager,
|
||||||
|
self.stream_info_manager,
|
||||||
|
self.stream_hash
|
||||||
|
)
|
||||||
|
|
||||||
|
from twisted.internet import reactor
|
||||||
|
reactor.connectTCP('localhost', self.port, factory)
|
||||||
|
return factory.finished_deferred
|
||||||
|
|
||||||
|
def verify_blob_completed(blob, blob_size):
|
||||||
|
self.assertTrue(blob.is_validated())
|
||||||
|
self.assertEqual(blob_size, blob.length)
|
||||||
|
return
|
||||||
|
|
||||||
|
d = send_to_server()
|
||||||
d.addCallback(lambda _: verify_data_on_reflector())
|
d.addCallback(lambda _: verify_data_on_reflector())
|
||||||
return d
|
return d
|
||||||
|
|
||||||
d.addCallback(lambda _: create_stream())
|
def test_blob_reflector(self):
|
||||||
d.addCallback(verify_stream_descriptor_file)
|
def verify_data_on_reflector():
|
||||||
d.addCallback(upload_to_reflector)
|
check_blob_ds = []
|
||||||
|
for blob_hash, blob_size in self.expected_blobs:
|
||||||
|
check_blob_ds.append(verify_have_blob(blob_hash, blob_size))
|
||||||
|
return defer.DeferredList(check_blob_ds)
|
||||||
|
|
||||||
|
def verify_have_blob(blob_hash, blob_size):
|
||||||
|
d = self.server_blob_manager.get_blob(blob_hash, True)
|
||||||
|
d.addCallback(lambda blob: verify_blob_completed(blob, blob_size))
|
||||||
|
return d
|
||||||
|
|
||||||
|
def send_to_server(blob_hashes_to_send):
|
||||||
|
factory = reflector.BlobClientFactory(
|
||||||
|
self.session.blob_manager,
|
||||||
|
blob_hashes_to_send
|
||||||
|
)
|
||||||
|
|
||||||
|
from twisted.internet import reactor
|
||||||
|
reactor.connectTCP('localhost', self.port, factory)
|
||||||
|
return factory.finished_deferred
|
||||||
|
|
||||||
|
def verify_blob_completed(blob, blob_size):
|
||||||
|
self.assertTrue(blob.is_validated())
|
||||||
|
self.assertEqual(blob_size, blob.length)
|
||||||
|
|
||||||
|
d = send_to_server([x[0] for x in self.expected_blobs])
|
||||||
|
d.addCallback(lambda _: verify_data_on_reflector())
|
||||||
|
return d
|
||||||
|
|
||||||
|
def test_blob_reflector_v1(self):
|
||||||
|
def verify_data_on_reflector():
|
||||||
|
check_blob_ds = []
|
||||||
|
for blob_hash, blob_size in self.expected_blobs:
|
||||||
|
check_blob_ds.append(verify_have_blob(blob_hash, blob_size))
|
||||||
|
return defer.DeferredList(check_blob_ds)
|
||||||
|
|
||||||
|
def verify_have_blob(blob_hash, blob_size):
|
||||||
|
d = self.server_blob_manager.get_blob(blob_hash, True)
|
||||||
|
d.addCallback(lambda blob: verify_blob_completed(blob, blob_size))
|
||||||
|
return d
|
||||||
|
|
||||||
|
def send_to_server(blob_hashes_to_send):
|
||||||
|
factory = reflector.BlobClientFactory(
|
||||||
|
self.session.blob_manager,
|
||||||
|
blob_hashes_to_send
|
||||||
|
)
|
||||||
|
factory.protocol_version = 0
|
||||||
|
|
||||||
|
from twisted.internet import reactor
|
||||||
|
reactor.connectTCP('localhost', self.port, factory)
|
||||||
|
return factory.finished_deferred
|
||||||
|
|
||||||
|
def verify_blob_completed(blob, blob_size):
|
||||||
|
self.assertTrue(blob.is_validated())
|
||||||
|
self.assertEqual(blob_size, blob.length)
|
||||||
|
|
||||||
|
d = send_to_server([x[0] for x in self.expected_blobs])
|
||||||
|
d.addCallback(lambda _: verify_data_on_reflector())
|
||||||
return d
|
return d
|
||||||
|
|
||||||
|
|
||||||
|
|
Loading…
Add table
Reference in a new issue