add tests for checking stream info on reflector server
This commit is contained in:
parent
32feb628c3
commit
4d5ba94a6e
2 changed files with 38 additions and 3 deletions
|
@ -230,6 +230,14 @@ class DiskBlobManager(DHTHashSupplier):
|
||||||
d = self.db_conn.runQuery("select blob_hash from blobs")
|
d = self.db_conn.runQuery("select blob_hash from blobs")
|
||||||
return d
|
return d
|
||||||
|
|
||||||
|
@rerun_if_locked
|
||||||
|
@defer.inlineCallbacks
|
||||||
|
def _get_all_should_announce_blob_hashes(self):
|
||||||
|
# return a list of blob hashes where should_announce is True
|
||||||
|
blob_hashes = yield self.db_conn.runQuery(
|
||||||
|
"select blob_hash from blobs where should_announce = 1")
|
||||||
|
defer.returnValue([d[0] for d in blob_hashes])
|
||||||
|
|
||||||
@rerun_if_locked
|
@rerun_if_locked
|
||||||
def _get_all_verified_blob_hashes(self):
|
def _get_all_verified_blob_hashes(self):
|
||||||
d = self._get_all_blob_hashes()
|
d = self._get_all_blob_hashes()
|
||||||
|
|
|
@ -81,17 +81,21 @@ class TestReflector(unittest.TestCase):
|
||||||
self.server_db_dir, self.server_blob_dir = mk_db_and_blob_dir()
|
self.server_db_dir, self.server_blob_dir = mk_db_and_blob_dir()
|
||||||
self.server_blob_manager = BlobManager.DiskBlobManager(
|
self.server_blob_manager = BlobManager.DiskBlobManager(
|
||||||
hash_announcer, self.server_blob_dir, self.server_db_dir)
|
hash_announcer, self.server_blob_dir, self.server_db_dir)
|
||||||
|
self.server_stream_info_manager = EncryptedFileMetadataManager.DBEncryptedFileMetadataManager(self.server_db_dir)
|
||||||
|
|
||||||
|
|
||||||
d = self.session.setup()
|
d = self.session.setup()
|
||||||
d.addCallback(lambda _: self.stream_info_manager.setup())
|
d.addCallback(lambda _: self.stream_info_manager.setup())
|
||||||
d.addCallback(lambda _: EncryptedFileOptions.add_lbry_file_to_sd_identifier(sd_identifier))
|
d.addCallback(lambda _: EncryptedFileOptions.add_lbry_file_to_sd_identifier(sd_identifier))
|
||||||
d.addCallback(lambda _: self.lbry_file_manager.setup())
|
d.addCallback(lambda _: self.lbry_file_manager.setup())
|
||||||
d.addCallback(lambda _: self.server_blob_manager.setup())
|
d.addCallback(lambda _: self.server_blob_manager.setup())
|
||||||
|
d.addCallback(lambda _: self.server_stream_info_manager.setup())
|
||||||
|
|
||||||
def verify_equal(sd_info):
|
def verify_equal(sd_info):
|
||||||
self.assertEqual(mocks.create_stream_sd_file, sd_info)
|
self.assertEqual(mocks.create_stream_sd_file, sd_info)
|
||||||
|
|
||||||
def save_sd_blob_hash(sd_hash):
|
def save_sd_blob_hash(sd_hash):
|
||||||
|
self.sd_hash = sd_hash
|
||||||
self.expected_blobs.append((sd_hash, 923))
|
self.expected_blobs.append((sd_hash, 923))
|
||||||
|
|
||||||
def verify_stream_descriptor_file(stream_hash):
|
def verify_stream_descriptor_file(stream_hash):
|
||||||
|
@ -120,7 +124,7 @@ class TestReflector(unittest.TestCase):
|
||||||
return d
|
return d
|
||||||
|
|
||||||
def start_server():
|
def start_server():
|
||||||
server_factory = reflector.ServerFactory(peer_manager, self.server_blob_manager)
|
server_factory = reflector.ServerFactory(peer_manager, self.server_blob_manager, self.server_stream_info_manager)
|
||||||
from twisted.internet import reactor
|
from twisted.internet import reactor
|
||||||
port = 8943
|
port = 8943
|
||||||
while self.reflector_port is None:
|
while self.reflector_port is None:
|
||||||
|
@ -160,12 +164,34 @@ class TestReflector(unittest.TestCase):
|
||||||
return d
|
return d
|
||||||
|
|
||||||
def test_stream_reflector(self):
|
def test_stream_reflector(self):
|
||||||
def verify_data_on_reflector():
|
def verify_blob_on_reflector():
|
||||||
check_blob_ds = []
|
check_blob_ds = []
|
||||||
for blob_hash, blob_size in self.expected_blobs:
|
for blob_hash, blob_size in self.expected_blobs:
|
||||||
check_blob_ds.append(verify_have_blob(blob_hash, blob_size))
|
check_blob_ds.append(verify_have_blob(blob_hash, blob_size))
|
||||||
return defer.DeferredList(check_blob_ds)
|
return defer.DeferredList(check_blob_ds)
|
||||||
|
|
||||||
|
@defer.inlineCallbacks
|
||||||
|
def verify_stream_on_reflector():
|
||||||
|
# check stream_info_manager has all the right information
|
||||||
|
streams = yield self.server_stream_info_manager.get_all_streams()
|
||||||
|
self.assertEqual(1, len(streams))
|
||||||
|
self.assertEqual(self.stream_hash, streams[0])
|
||||||
|
|
||||||
|
blobs = yield self.server_stream_info_manager.get_blobs_for_stream(self.stream_hash)
|
||||||
|
blob_hashes = [b[0] for b in blobs if b[0] is not None]
|
||||||
|
expected_blob_hashes = [b[0] for b in self.expected_blobs[:-1] if b[0] is not None]
|
||||||
|
self.assertEqual(expected_blob_hashes, blob_hashes)
|
||||||
|
sd_hashes = yield self.server_stream_info_manager.get_sd_blob_hashes_for_stream(self.stream_hash)
|
||||||
|
self.assertEqual(1, len(sd_hashes))
|
||||||
|
expected_sd_hash = self.expected_blobs[-1][0]
|
||||||
|
self.assertEqual(self.sd_hash, sd_hashes[0])
|
||||||
|
|
||||||
|
# check should_announce blobs on blob_manager
|
||||||
|
blob_hashes = yield self.server_blob_manager._get_all_should_announce_blob_hashes()
|
||||||
|
self.assertEqual(2, len(blob_hashes))
|
||||||
|
self.assertTrue(self.sd_hash in blob_hashes)
|
||||||
|
self.assertTrue(expected_blob_hashes[0] in blob_hashes)
|
||||||
|
|
||||||
def verify_have_blob(blob_hash, blob_size):
|
def verify_have_blob(blob_hash, blob_size):
|
||||||
d = self.server_blob_manager.get_blob(blob_hash)
|
d = self.server_blob_manager.get_blob(blob_hash)
|
||||||
d.addCallback(lambda blob: verify_blob_completed(blob, blob_size))
|
d.addCallback(lambda blob: verify_blob_completed(blob, blob_size))
|
||||||
|
@ -187,7 +213,8 @@ class TestReflector(unittest.TestCase):
|
||||||
return
|
return
|
||||||
|
|
||||||
d = send_to_server()
|
d = send_to_server()
|
||||||
d.addCallback(lambda _: verify_data_on_reflector())
|
d.addCallback(lambda _: verify_blob_on_reflector())
|
||||||
|
d.addCallback(lambda _: verify_stream_on_reflector())
|
||||||
return d
|
return d
|
||||||
|
|
||||||
def test_blob_reflector(self):
|
def test_blob_reflector(self):
|
||||||
|
|
Loading…
Add table
Reference in a new issue