lbry-sdk/tests/functional/test_reflector.py
2018-11-09 16:52:46 -05:00

342 lines
14 KiB
Python

import os
from binascii import hexlify
from twisted.internet import defer, error
from twisted.trial import unittest
from lbrynet.p2p.StreamDescriptor import get_sd_info
from lbrynet.extras.reflector.server.server import ReflectorServerFactory
from lbrynet.extras.reflector.client.client import EncryptedFileReflectorClientFactory
from lbrynet.extras.reflector.client.blob import BlobReflectorClientFactory
from lbrynet.extras.daemon.PeerManager import PeerManager
from lbrynet.p2p import BlobManager
from lbrynet.p2p import StreamDescriptor
from lbrynet.blob import EncryptedFileCreator
from lbrynet.blob.EncryptedFileManager import EncryptedFileManager
from lbrynet.p2p.RateLimiter import DummyRateLimiter
from lbrynet.extras.daemon.storage import SQLiteStorage
from lbrynet.p2p.PaymentRateManager import OnlyFreePaymentsManager
from tests import mocks
from tests.test_utils import mk_db_and_blob_dir, rm_db_and_blob_dir
class TestReflector(unittest.TestCase):
def setUp(self):
self.reflector_port = None
self.port = None
mocks.mock_conf_settings(self)
self.server_db_dir, self.server_blob_dir = mk_db_and_blob_dir()
self.client_db_dir, self.client_blob_dir = mk_db_and_blob_dir()
prm = OnlyFreePaymentsManager()
wallet = mocks.Wallet()
peer_manager = PeerManager()
peer_finder = mocks.PeerFinder(5553, peer_manager, 2)
self.server_storage = SQLiteStorage(self.server_db_dir)
self.server_blob_manager = BlobManager.DiskBlobManager(self.server_blob_dir, self.server_storage)
self.client_storage = SQLiteStorage(self.client_db_dir)
self.client_blob_manager = BlobManager.DiskBlobManager(self.client_blob_dir, self.client_storage)
self.server_lbry_file_manager = EncryptedFileManager(
peer_finder, DummyRateLimiter(), self.server_blob_manager, wallet, prm, self.server_storage,
StreamDescriptor.StreamDescriptorIdentifier()
)
self.client_lbry_file_manager = EncryptedFileManager(
peer_finder, DummyRateLimiter(), self.client_blob_manager, wallet, prm, self.client_storage,
StreamDescriptor.StreamDescriptorIdentifier()
)
self.expected_blobs = [
(
'dc4708f76a5e7af0f1cae0ee96b824e2ed9250c9346c093b'
'441f0a20d3607c17948b6fcfb4bc62020fe5286693d08586',
2097152
),
(
'f4067522c1b49432a2a679512e3917144317caa1abba0c04'
'1e0cd2cf9f635d4cf127ce1824fa04189b63916174951f70',
2097152
),
(
'305486c434260484fcb2968ce0e963b72f81ba56c11b08b1'
'af0789b55b44d78422600f9a38e3cf4f2e9569897e5646a9',
1015056
),
]
d = self.server_storage.setup()
d.addCallback(lambda _: self.server_blob_manager.setup())
d.addCallback(lambda _: self.server_lbry_file_manager.setup())
d.addCallback(lambda _: self.client_storage.setup())
d.addCallback(lambda _: self.client_blob_manager.setup())
d.addCallback(lambda _: self.client_lbry_file_manager.setup())
@defer.inlineCallbacks
def verify_equal(sd_info, stream_hash):
self.assertDictEqual(mocks.create_stream_sd_file, sd_info)
sd_hash = yield self.client_storage.get_sd_blob_hash_for_stream(stream_hash)
defer.returnValue(sd_hash)
def save_sd_blob_hash(sd_hash):
self.sd_hash = sd_hash
self.expected_blobs.append((sd_hash, 923))
def verify_stream_descriptor_file(stream_hash):
self.stream_hash = stream_hash
d = get_sd_info(self.client_storage, stream_hash, True)
d.addCallback(verify_equal, stream_hash)
d.addCallback(save_sd_blob_hash)
return d
def create_stream():
test_file = mocks.GenFile(5209343, bytes((i + 3) for i in range(0, 64, 6)))
d = EncryptedFileCreator.create_lbry_file(
self.client_blob_manager, self.client_storage, prm, self.client_lbry_file_manager,
"test_file",
test_file,
key=b"0123456701234567",
iv_generator=iv_generator()
)
d.addCallback(lambda lbry_file: lbry_file.stream_hash)
return d
def start_server():
server_factory = ReflectorServerFactory(peer_manager, self.server_blob_manager,
self.server_lbry_file_manager)
from twisted.internet import reactor
port = 8943
while self.reflector_port is None:
try:
self.reflector_port = reactor.listenTCP(port, server_factory)
self.port = port
except error.CannotListenError:
port += 1
d.addCallback(lambda _: create_stream())
d.addCallback(verify_stream_descriptor_file)
d.addCallback(lambda _: start_server())
return d
@defer.inlineCallbacks
def tearDown(self):
lbry_files = self.client_lbry_file_manager.lbry_files
for lbry_file in lbry_files:
yield self.client_lbry_file_manager.delete_lbry_file(lbry_file)
yield self.client_lbry_file_manager.stop()
yield self.client_blob_manager.stop()
yield self.client_storage.stop()
self.reflector_port.stopListening()
lbry_files = self.server_lbry_file_manager.lbry_files
for lbry_file in lbry_files:
yield self.server_lbry_file_manager.delete_lbry_file(lbry_file)
yield self.server_lbry_file_manager.stop()
yield self.server_blob_manager.stop()
yield self.server_storage.stop()
try:
rm_db_and_blob_dir(self.client_db_dir, self.client_blob_dir)
except Exception as err:
raise unittest.SkipTest("TODO: fix this for windows")
try:
rm_db_and_blob_dir(self.server_db_dir, self.server_blob_dir)
except Exception as err:
raise unittest.SkipTest("TODO: fix this for windows")
if os.path.exists("test_file"):
os.remove("test_file")
def test_stream_reflector(self):
def verify_blob_on_reflector():
check_blob_ds = []
for blob_hash, blob_size in self.expected_blobs:
check_blob_ds.append(verify_have_blob(blob_hash, blob_size))
return defer.DeferredList(check_blob_ds)
@defer.inlineCallbacks
def verify_stream_on_reflector():
# check stream_info_manager has all the right information
streams = yield self.server_storage.get_all_streams()
self.assertEqual(1, len(streams))
self.assertEqual(self.stream_hash, streams[0])
blobs = yield self.server_storage.get_blobs_for_stream(self.stream_hash)
blob_hashes = [b.blob_hash for b in blobs if b.blob_hash is not None]
expected_blob_hashes = [b[0] for b in self.expected_blobs[:-1] if b[0] is not None]
self.assertEqual(expected_blob_hashes, blob_hashes)
sd_hash = yield self.server_storage.get_sd_blob_hash_for_stream(streams[0])
self.assertEqual(self.sd_hash, sd_hash)
# check lbry file manager has the file
files = yield self.server_lbry_file_manager.lbry_files
self.assertEqual(0, len(files))
streams = yield self.server_storage.get_all_streams()
self.assertEqual(1, len(streams))
stream_info = yield self.server_storage.get_stream_info(self.stream_hash)
self.assertEqual(self.sd_hash, stream_info[3])
self.assertEqual(hexlify(b'test_file').decode(), stream_info[0])
# check should_announce blobs on blob_manager
blob_hashes = yield self.server_storage.get_all_should_announce_blobs()
self.assertSetEqual({self.sd_hash, expected_blob_hashes[0]}, set(blob_hashes))
def verify_have_blob(blob_hash, blob_size):
d = self.server_blob_manager.get_blob(blob_hash)
d.addCallback(lambda blob: verify_blob_completed(blob, blob_size))
return d
def send_to_server():
factory = EncryptedFileReflectorClientFactory(self.client_blob_manager, self.stream_hash, self.sd_hash)
from twisted.internet import reactor
reactor.connectTCP('localhost', self.port, factory)
return factory.finished_deferred
def verify_blob_completed(blob, blob_size):
self.assertTrue(blob.get_is_verified())
self.assertEqual(blob_size, blob.length)
return
d = send_to_server()
d.addCallback(lambda _: verify_blob_on_reflector())
d.addCallback(lambda _: verify_stream_on_reflector())
return d
def test_blob_reflector(self):
def verify_data_on_reflector():
check_blob_ds = []
for blob_hash, blob_size in self.expected_blobs:
check_blob_ds.append(verify_have_blob(blob_hash, blob_size))
return defer.DeferredList(check_blob_ds)
def verify_have_blob(blob_hash, blob_size):
d = self.server_blob_manager.get_blob(blob_hash)
d.addCallback(lambda blob: verify_blob_completed(blob, blob_size))
return d
def send_to_server(blob_hashes_to_send):
factory = BlobReflectorClientFactory(
self.client_blob_manager,
blob_hashes_to_send
)
from twisted.internet import reactor
reactor.connectTCP('localhost', self.port, factory)
return factory.finished_deferred
def verify_blob_completed(blob, blob_size):
self.assertTrue(blob.get_is_verified())
self.assertEqual(blob_size, blob.length)
d = send_to_server([x[0] for x in self.expected_blobs])
d.addCallback(lambda _: verify_data_on_reflector())
return d
def test_blob_reflector_v1(self):
@defer.inlineCallbacks
def verify_stream_on_reflector():
# this protocol should not have any impact on stream info manager
streams = yield self.server_storage.get_all_streams()
self.assertEqual(0, len(streams))
# there should be no should announce blobs here
blob_hashes = yield self.server_storage.get_all_should_announce_blobs()
self.assertEqual(0, len(blob_hashes))
def verify_data_on_reflector():
check_blob_ds = []
for blob_hash, blob_size in self.expected_blobs:
check_blob_ds.append(verify_have_blob(blob_hash, blob_size))
return defer.DeferredList(check_blob_ds)
def verify_have_blob(blob_hash, blob_size):
d = self.server_blob_manager.get_blob(blob_hash)
d.addCallback(lambda blob: verify_blob_completed(blob, blob_size))
return d
def send_to_server(blob_hashes_to_send):
factory = BlobReflectorClientFactory(
self.client_blob_manager,
blob_hashes_to_send
)
factory.protocol_version = 0
from twisted.internet import reactor
reactor.connectTCP('localhost', self.port, factory)
return factory.finished_deferred
def verify_blob_completed(blob, blob_size):
self.assertTrue(blob.get_is_verified())
self.assertEqual(blob_size, blob.length)
d = send_to_server([x[0] for x in self.expected_blobs])
d.addCallback(lambda _: verify_data_on_reflector())
return d
# test case when we reflect blob, and than that same blob
# is reflected as stream
def test_blob_reflect_and_stream(self):
def verify_blob_on_reflector():
check_blob_ds = []
for blob_hash, blob_size in self.expected_blobs:
check_blob_ds.append(verify_have_blob(blob_hash, blob_size))
return defer.DeferredList(check_blob_ds)
@defer.inlineCallbacks
def verify_stream_on_reflector():
# check stream_info_manager has all the right information
streams = yield self.server_storage.get_all_streams()
self.assertEqual(1, len(streams))
self.assertEqual(self.stream_hash, streams[0])
blobs = yield self.server_storage.get_blobs_for_stream(self.stream_hash)
blob_hashes = [b.blob_hash for b in blobs if b.blob_hash is not None]
expected_blob_hashes = [b[0] for b in self.expected_blobs[:-1] if b[0] is not None]
self.assertEqual(expected_blob_hashes, blob_hashes)
sd_hash = yield self.server_storage.get_sd_blob_hash_for_stream(
self.stream_hash)
self.assertEqual(self.sd_hash, sd_hash)
# check should_announce blobs on blob_manager
to_announce = yield self.server_storage.get_all_should_announce_blobs()
self.assertSetEqual(set(to_announce), {self.sd_hash, expected_blob_hashes[0]})
def verify_have_blob(blob_hash, blob_size):
d = self.server_blob_manager.get_blob(blob_hash)
d.addCallback(lambda blob: verify_blob_completed(blob, blob_size))
return d
def send_to_server_as_blobs(blob_hashes_to_send):
factory = BlobReflectorClientFactory(
self.client_blob_manager,
blob_hashes_to_send
)
factory.protocol_version = 0
from twisted.internet import reactor
reactor.connectTCP('localhost', self.port, factory)
return factory.finished_deferred
def send_to_server_as_stream(result):
factory = EncryptedFileReflectorClientFactory(self.client_blob_manager, self.stream_hash, self.sd_hash)
from twisted.internet import reactor
reactor.connectTCP('localhost', self.port, factory)
return factory.finished_deferred
def verify_blob_completed(blob, blob_size):
self.assertTrue(blob.get_is_verified())
self.assertEqual(blob_size, blob.length)
# Modify this to change which blobs to send
blobs_to_send = self.expected_blobs
d = send_to_server_as_blobs([x[0] for x in self.expected_blobs])
d.addCallback(send_to_server_as_stream)
d.addCallback(lambda _: verify_blob_on_reflector())
d.addCallback(lambda _: verify_stream_on_reflector())
return d
def iv_generator():
iv = 0
while True:
iv += 1
yield b"%016d" % iv