forked from LBRYCommunity/lbry-sdk
delete old tests
This commit is contained in:
parent
db75f0baef
commit
bb6112276f
18 changed files with 0 additions and 2844 deletions
|
@ -1,358 +0,0 @@
|
|||
import os
|
||||
from unittest import skip
|
||||
from hashlib import md5
|
||||
from twisted.internet import defer, reactor
|
||||
from twisted.trial import unittest
|
||||
from lbrynet import conf
|
||||
from lbrynet.p2p.server.BlobAvailabilityHandler import BlobAvailabilityHandlerFactory
|
||||
from lbrynet.p2p.StreamDescriptor import StreamDescriptorIdentifier
|
||||
from lbrynet.p2p.StreamDescriptor import download_sd_blob
|
||||
from lbrynet.p2p.PaymentRateManager import OnlyFreePaymentsManager
|
||||
from lbrynet.p2p.BlobManager import DiskBlobManager
|
||||
from lbrynet.extras.daemon.PeerManager import PeerManager
|
||||
from lbrynet.p2p.RateLimiter import RateLimiter
|
||||
from lbrynet.p2p.server.BlobRequestHandler import BlobRequestHandlerFactory
|
||||
from lbrynet.p2p.server.ServerProtocol import ServerProtocolFactory
|
||||
from lbrynet.extras.daemon.storage import SQLiteStorage
|
||||
from lbrynet.blob.EncryptedFileCreator import create_lbry_file
|
||||
from lbrynet.blob.EncryptedFileManager import EncryptedFileManager
|
||||
from lbrynet.blob.client.EncryptedFileOptions import add_lbry_file_to_sd_identifier
|
||||
from lbrynet.extras.compat import f2d
|
||||
|
||||
from tests import mocks
|
||||
from tests.test_utils import mk_db_and_blob_dir, rm_db_and_blob_dir
|
||||
|
||||
FakeNode = mocks.Node
|
||||
FakeWallet = mocks.Wallet
|
||||
FakePeerFinder = mocks.PeerFinder
|
||||
FakeAnnouncer = mocks.Announcer
|
||||
GenFile = mocks.GenFile
|
||||
test_create_stream_sd_file = mocks.create_stream_sd_file
|
||||
DummyBlobAvailabilityTracker = mocks.BlobAvailabilityTracker
|
||||
|
||||
|
||||
def init_conf_windows(settings={}):
|
||||
"""
|
||||
There is no fork on windows, so imports
|
||||
are freshly initialized in new processes.
|
||||
So conf needs to be initialized for new processes
|
||||
"""
|
||||
if os.name == 'nt':
|
||||
original_settings = conf.settings
|
||||
conf.settings = conf.Config(conf.FIXED_SETTINGS, conf.ADJUSTABLE_SETTINGS)
|
||||
conf.settings.installation_id = conf.settings.get_installation_id()
|
||||
conf.settings.update(settings)
|
||||
|
||||
|
||||
class LbryUploader:
|
||||
def __init__(self, file_size, ul_rate_limit=None):
|
||||
self.file_size = file_size
|
||||
self.ul_rate_limit = ul_rate_limit
|
||||
self.kill_check = None
|
||||
# these attributes get defined in `start`
|
||||
self.db_dir = None
|
||||
self.blob_dir = None
|
||||
self.wallet = None
|
||||
self.peer_manager = None
|
||||
self.rate_limiter = None
|
||||
self.prm = None
|
||||
self.storage = None
|
||||
self.blob_manager = None
|
||||
self.lbry_file_manager = None
|
||||
self.server_port = None
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def setup(self):
|
||||
init_conf_windows()
|
||||
|
||||
self.db_dir, self.blob_dir = mk_db_and_blob_dir()
|
||||
self.wallet = FakeWallet()
|
||||
self.peer_manager = PeerManager()
|
||||
self.rate_limiter = RateLimiter()
|
||||
if self.ul_rate_limit is not None:
|
||||
self.rate_limiter.set_ul_limit(self.ul_rate_limit)
|
||||
self.prm = OnlyFreePaymentsManager()
|
||||
self.storage = SQLiteStorage(':memory:')
|
||||
self.blob_manager = DiskBlobManager(self.blob_dir, self.storage)
|
||||
self.lbry_file_manager = EncryptedFileManager(FakePeerFinder(5553, self.peer_manager, 1), self.rate_limiter,
|
||||
self.blob_manager, self.wallet, self.prm, self.storage,
|
||||
StreamDescriptorIdentifier())
|
||||
|
||||
yield f2d(self.storage.open())
|
||||
yield f2d(self.blob_manager.setup())
|
||||
yield f2d(self.lbry_file_manager.setup())
|
||||
|
||||
query_handler_factories = {
|
||||
1: BlobAvailabilityHandlerFactory(self.blob_manager),
|
||||
2: BlobRequestHandlerFactory(
|
||||
self.blob_manager, self.wallet,
|
||||
self.prm,
|
||||
None),
|
||||
3: self.wallet.get_wallet_info_query_handler_factory(),
|
||||
}
|
||||
server_factory = ServerProtocolFactory(self.rate_limiter,
|
||||
query_handler_factories,
|
||||
self.peer_manager)
|
||||
self.server_port = reactor.listenTCP(5553, server_factory, interface="localhost")
|
||||
test_file = GenFile(self.file_size, bytes(i for i in range(0, 64, 6)))
|
||||
lbry_file = yield create_lbry_file(self.blob_manager, self.storage, self.prm, self.lbry_file_manager,
|
||||
"test_file", test_file)
|
||||
defer.returnValue(lbry_file.sd_hash)
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def stop(self):
|
||||
lbry_files = self.lbry_file_manager.lbry_files
|
||||
for lbry_file in lbry_files:
|
||||
yield self.lbry_file_manager.delete_lbry_file(lbry_file)
|
||||
yield self.lbry_file_manager.stop()
|
||||
yield f2d(self.blob_manager.stop())
|
||||
yield f2d(self.storage.close())
|
||||
self.server_port.stopListening()
|
||||
rm_db_and_blob_dir(self.db_dir, self.blob_dir)
|
||||
if os.path.exists("test_file"):
|
||||
os.remove("test_file")
|
||||
|
||||
|
||||
@skip
|
||||
class TestTransfer(unittest.TestCase):
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def setUp(self):
|
||||
mocks.mock_conf_settings(self)
|
||||
self.db_dir, self.blob_dir = mk_db_and_blob_dir()
|
||||
self.wallet = FakeWallet()
|
||||
self.peer_manager = PeerManager()
|
||||
self.peer_finder = FakePeerFinder(5553, self.peer_manager, 1)
|
||||
self.rate_limiter = RateLimiter()
|
||||
self.prm = OnlyFreePaymentsManager()
|
||||
self.storage = SQLiteStorage(':memory:')
|
||||
self.blob_manager = DiskBlobManager(self.blob_dir, self.storage)
|
||||
self.sd_identifier = StreamDescriptorIdentifier()
|
||||
self.lbry_file_manager = EncryptedFileManager(self.peer_finder, self.rate_limiter,
|
||||
self.blob_manager, self.wallet, self.prm, self.storage,
|
||||
self.sd_identifier)
|
||||
|
||||
self.uploader = LbryUploader(5209343)
|
||||
self.sd_hash = yield self.uploader.setup()
|
||||
yield f2d(self.storage.open())
|
||||
yield f2d(self.blob_manager.setup())
|
||||
yield f2d(self.lbry_file_manager.setup())
|
||||
yield add_lbry_file_to_sd_identifier(self.sd_identifier)
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def tearDown(self):
|
||||
yield self.uploader.stop()
|
||||
lbry_files = self.lbry_file_manager.lbry_files
|
||||
for lbry_file in lbry_files:
|
||||
yield self.lbry_file_manager.delete_lbry_file(lbry_file)
|
||||
yield self.lbry_file_manager.stop()
|
||||
yield self.blob_manager.stop()
|
||||
yield f2d(self.storage.close())
|
||||
rm_db_and_blob_dir(self.db_dir, self.blob_dir)
|
||||
if os.path.exists("test_file"):
|
||||
os.remove("test_file")
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def test_lbry_transfer(self):
|
||||
sd_blob = yield download_sd_blob(
|
||||
self.conf, self.sd_hash, self.blob_manager, self.peer_finder, self.rate_limiter, self.prm, self.wallet
|
||||
)
|
||||
metadata = yield self.sd_identifier.get_metadata_for_sd_blob(sd_blob)
|
||||
downloader = yield metadata.factories[0].make_downloader(
|
||||
metadata, self.prm.min_blob_data_payment_rate, self.prm, self.db_dir, download_mirrors=None
|
||||
)
|
||||
yield downloader.start()
|
||||
with open(os.path.join(self.db_dir, 'test_file'), 'rb') as f:
|
||||
hashsum = md5()
|
||||
hashsum.update(f.read())
|
||||
self.assertEqual(hashsum.hexdigest(), "4ca2aafb4101c1e42235aad24fbb83be")
|
||||
|
||||
# TODO: update these
|
||||
# def test_last_blob_retrieval(self):
|
||||
# kill_event = Event()
|
||||
# dead_event_1 = Event()
|
||||
# blob_hash_queue_1 = Queue()
|
||||
# blob_hash_queue_2 = Queue()
|
||||
# fast_uploader = Process(target=start_blob_uploader,
|
||||
# args=(blob_hash_queue_1, kill_event, dead_event_1, False))
|
||||
# fast_uploader.start()
|
||||
# self.server_processes.append(fast_uploader)
|
||||
# dead_event_2 = Event()
|
||||
# slow_uploader = Process(target=start_blob_uploader,
|
||||
# args=(blob_hash_queue_2, kill_event, dead_event_2, True))
|
||||
# slow_uploader.start()
|
||||
# self.server_processes.append(slow_uploader)
|
||||
#
|
||||
# logging.debug("Testing transfer")
|
||||
#
|
||||
# wallet = FakeWallet()
|
||||
# peer_manager = PeerManager()
|
||||
# peer_finder = FakePeerFinder(5553, peer_manager, 2)
|
||||
# hash_announcer = FakeAnnouncer()
|
||||
# rate_limiter = DummyRateLimiter()
|
||||
# dht_node = FakeNode(peer_finder=peer_finder, peer_manager=peer_manager, udpPort=4445, peerPort=5553,
|
||||
# node_id="abcd", externalIP="127.0.0.1")
|
||||
#
|
||||
# db_dir, blob_dir = mk_db_and_blob_dir()
|
||||
# self.session = Session(
|
||||
# conf.ADJUSTABLE_SETTINGS['data_rate'][1], db_dir=db_dir, node_id="abcd",
|
||||
# peer_finder=peer_finder, hash_announcer=hash_announcer,
|
||||
# blob_dir=blob_dir, peer_port=5553, dht_node_port=4445,
|
||||
# rate_limiter=rate_limiter, wallet=wallet,
|
||||
# dht_node=dht_node, external_ip="127.0.0.1")
|
||||
#
|
||||
# d1 = self.wait_for_hash_from_queue(blob_hash_queue_1)
|
||||
# d2 = self.wait_for_hash_from_queue(blob_hash_queue_2)
|
||||
# d = defer.DeferredList([d1, d2], fireOnOneErrback=True)
|
||||
#
|
||||
# def get_blob_hash(results):
|
||||
# self.assertEqual(results[0][1], results[1][1])
|
||||
# return results[0][1]
|
||||
#
|
||||
# d.addCallback(get_blob_hash)
|
||||
#
|
||||
# def download_blob(blob_hash):
|
||||
# prm = self.session.payment_rate_manager
|
||||
# downloader = StandaloneBlobDownloader(
|
||||
# blob_hash, self.session.blob_manager, peer_finder, rate_limiter, prm, wallet)
|
||||
# d = downloader.download()
|
||||
# return d
|
||||
#
|
||||
# def start_transfer(blob_hash):
|
||||
#
|
||||
# logging.debug("Starting the transfer")
|
||||
#
|
||||
# d = self.session.setup()
|
||||
# d.addCallback(lambda _: download_blob(blob_hash))
|
||||
#
|
||||
# return d
|
||||
#
|
||||
# d.addCallback(start_transfer)
|
||||
#
|
||||
# def stop(arg):
|
||||
# if isinstance(arg, Failure):
|
||||
# logging.debug("Client is stopping due to an error. Error: %s", arg.getTraceback())
|
||||
# else:
|
||||
# logging.debug("Client is stopping normally.")
|
||||
# kill_event.set()
|
||||
# logging.debug("Set the kill event")
|
||||
# d1 = self.wait_for_event(dead_event_1, 15)
|
||||
# d2 = self.wait_for_event(dead_event_2, 15)
|
||||
# dl = defer.DeferredList([d1, d2])
|
||||
#
|
||||
# def print_shutting_down():
|
||||
# logging.info("Client is shutting down")
|
||||
#
|
||||
# dl.addCallback(lambda _: print_shutting_down())
|
||||
# dl.addCallback(lambda _: rm_db_and_blob_dir(db_dir, blob_dir))
|
||||
# dl.addCallback(lambda _: arg)
|
||||
# return dl
|
||||
#
|
||||
# d.addBoth(stop)
|
||||
# return d
|
||||
#
|
||||
# def test_double_download(self):
|
||||
# sd_hash_queue = Queue()
|
||||
# kill_event = Event()
|
||||
# dead_event = Event()
|
||||
# lbry_uploader = LbryUploader(sd_hash_queue, kill_event, dead_event, 5209343)
|
||||
# uploader = Process(target=lbry_uploader.start)
|
||||
# uploader.start()
|
||||
# self.server_processes.append(uploader)
|
||||
#
|
||||
# logging.debug("Testing double download")
|
||||
#
|
||||
# wallet = FakeWallet()
|
||||
# peer_manager = PeerManager()
|
||||
# peer_finder = FakePeerFinder(5553, peer_manager, 1)
|
||||
# hash_announcer = FakeAnnouncer()
|
||||
# rate_limiter = DummyRateLimiter()
|
||||
# sd_identifier = StreamDescriptorIdentifier()
|
||||
# dht_node = FakeNode(peer_finder=peer_finder, peer_manager=peer_manager, udpPort=4445, peerPort=5553,
|
||||
# node_id="abcd", externalIP="127.0.0.1")
|
||||
#
|
||||
# downloaders = []
|
||||
#
|
||||
# db_dir, blob_dir = mk_db_and_blob_dir()
|
||||
# self.session = Session(conf.ADJUSTABLE_SETTINGS['data_rate'][1], db_dir=db_dir,
|
||||
# node_id="abcd", peer_finder=peer_finder, dht_node_port=4445,
|
||||
# hash_announcer=hash_announcer, blob_dir=blob_dir, peer_port=5553,
|
||||
# rate_limiter=rate_limiter, wallet=wallet,
|
||||
# external_ip="127.0.0.1", dht_node=dht_node)
|
||||
#
|
||||
# self.lbry_file_manager = EncryptedFileManager(self.session, sd_identifier)
|
||||
#
|
||||
# @defer.inlineCallbacks
|
||||
# def make_downloader(metadata, prm):
|
||||
# factories = metadata.factories
|
||||
# downloader = yield factories[0].make_downloader(metadata, prm.min_blob_data_payment_rate, prm, db_dir)
|
||||
# defer.returnValue(downloader)
|
||||
#
|
||||
# @defer.inlineCallbacks
|
||||
# def download_file(sd_hash):
|
||||
# prm = self.session.payment_rate_manager
|
||||
# sd_blob = yield download_sd_blob(self.session, sd_hash, prm)
|
||||
# metadata = yield sd_identifier.get_metadata_for_sd_blob(sd_blob)
|
||||
# downloader = yield make_downloader(metadata, prm)
|
||||
# downloaders.append(downloader)
|
||||
# yield downloader.start()
|
||||
# defer.returnValue(downloader)
|
||||
#
|
||||
# def check_md5_sum():
|
||||
# f = open(os.path.join(db_dir, 'test_file'))
|
||||
# hashsum = md5()
|
||||
# hashsum.update(f.read())
|
||||
# self.assertEqual(hashsum.hexdigest(), "4ca2aafb4101c1e42235aad24fbb83be")
|
||||
#
|
||||
# def delete_lbry_file(downloader):
|
||||
# logging.debug("deleting the file")
|
||||
# return self.lbry_file_manager.delete_lbry_file(downloader)
|
||||
#
|
||||
# def check_lbry_file(downloader):
|
||||
# d = downloader.status()
|
||||
#
|
||||
# def check_status_report(status_report):
|
||||
# self.assertEqual(status_report.num_known, status_report.num_completed)
|
||||
# self.assertEqual(status_report.num_known, 3)
|
||||
#
|
||||
# d.addCallback(check_status_report)
|
||||
# return d
|
||||
#
|
||||
# @defer.inlineCallbacks
|
||||
# def start_transfer(sd_hash):
|
||||
# # download a file, delete it, and download it again
|
||||
#
|
||||
# logging.debug("Starting the transfer")
|
||||
# yield self.session.setup()
|
||||
# yield add_lbry_file_to_sd_identifier(sd_identifier)
|
||||
# yield self.lbry_file_manager.setup()
|
||||
# downloader = yield download_file(sd_hash)
|
||||
# yield check_md5_sum()
|
||||
# yield check_lbry_file(downloader)
|
||||
# yield delete_lbry_file(downloader)
|
||||
# downloader = yield download_file(sd_hash)
|
||||
# yield check_lbry_file(downloader)
|
||||
# yield check_md5_sum()
|
||||
# yield delete_lbry_file(downloader)
|
||||
#
|
||||
# def stop(arg):
|
||||
# if isinstance(arg, Failure):
|
||||
# logging.debug("Client is stopping due to an error. Error: %s", arg.getTraceback())
|
||||
# else:
|
||||
# logging.debug("Client is stopping normally.")
|
||||
# kill_event.set()
|
||||
# logging.debug("Set the kill event")
|
||||
# d = self.wait_for_event(dead_event, 15)
|
||||
#
|
||||
# def print_shutting_down():
|
||||
# logging.info("Client is shutting down")
|
||||
#
|
||||
# d.addCallback(lambda _: print_shutting_down())
|
||||
# d.addCallback(lambda _: rm_db_and_blob_dir(db_dir, blob_dir))
|
||||
# d.addCallback(lambda _: arg)
|
||||
# return d
|
||||
#
|
||||
# d = self.wait_for_hash_from_queue(sd_hash_queue)
|
||||
# d.addCallback(start_transfer)
|
||||
# d.addBoth(stop)
|
||||
# return d
|
|
@ -1,343 +0,0 @@
|
|||
import os
|
||||
from unittest import skip
|
||||
from binascii import hexlify
|
||||
|
||||
from twisted.internet import defer, error
|
||||
from twisted.trial import unittest
|
||||
from lbrynet.p2p.StreamDescriptor import get_sd_info
|
||||
from lbrynet.extras.compat import f2d
|
||||
from lbrynet.extras.reflector.server.server import ReflectorServerFactory
|
||||
from lbrynet.extras.reflector.client.client import EncryptedFileReflectorClientFactory
|
||||
from lbrynet.extras.reflector.client.blob import BlobReflectorClientFactory
|
||||
from lbrynet.extras.daemon.PeerManager import PeerManager
|
||||
from lbrynet.p2p import BlobManager
|
||||
from lbrynet.p2p import StreamDescriptor
|
||||
from lbrynet.blob import EncryptedFileCreator
|
||||
from lbrynet.blob.EncryptedFileManager import EncryptedFileManager
|
||||
from lbrynet.p2p.RateLimiter import DummyRateLimiter
|
||||
from lbrynet.extras.daemon.storage import SQLiteStorage
|
||||
from lbrynet.p2p.PaymentRateManager import OnlyFreePaymentsManager
|
||||
from tests import mocks
|
||||
from tests.test_utils import mk_db_and_blob_dir, rm_db_and_blob_dir
|
||||
|
||||
|
||||
@skip
|
||||
class TestReflector(unittest.TestCase):
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def setUp(self):
|
||||
self.reflector_port = None
|
||||
self.port = None
|
||||
mocks.mock_conf_settings(self)
|
||||
self.server_db_dir, self.server_blob_dir = mk_db_and_blob_dir()
|
||||
self.client_db_dir, self.client_blob_dir = mk_db_and_blob_dir()
|
||||
prm = OnlyFreePaymentsManager()
|
||||
wallet = mocks.Wallet()
|
||||
peer_manager = PeerManager()
|
||||
peer_finder = mocks.PeerFinder(5553, peer_manager, 2)
|
||||
self.server_storage = SQLiteStorage(':memory:')
|
||||
self.server_blob_manager = BlobManager.DiskBlobManager(self.server_blob_dir, self.server_storage)
|
||||
self.client_storage = SQLiteStorage(':memory:')
|
||||
self.client_blob_manager = BlobManager.DiskBlobManager(self.client_blob_dir, self.client_storage)
|
||||
self.server_lbry_file_manager = EncryptedFileManager(
|
||||
peer_finder, DummyRateLimiter(), self.server_blob_manager, wallet, prm, self.server_storage,
|
||||
StreamDescriptor.StreamDescriptorIdentifier()
|
||||
)
|
||||
self.client_lbry_file_manager = EncryptedFileManager(
|
||||
peer_finder, DummyRateLimiter(), self.client_blob_manager, wallet, prm, self.client_storage,
|
||||
StreamDescriptor.StreamDescriptorIdentifier()
|
||||
)
|
||||
|
||||
self.expected_blobs = [
|
||||
(
|
||||
'dc4708f76a5e7af0f1cae0ee96b824e2ed9250c9346c093b'
|
||||
'441f0a20d3607c17948b6fcfb4bc62020fe5286693d08586',
|
||||
2097152
|
||||
),
|
||||
(
|
||||
'f4067522c1b49432a2a679512e3917144317caa1abba0c04'
|
||||
'1e0cd2cf9f635d4cf127ce1824fa04189b63916174951f70',
|
||||
2097152
|
||||
),
|
||||
(
|
||||
'305486c434260484fcb2968ce0e963b72f81ba56c11b08b1'
|
||||
'af0789b55b44d78422600f9a38e3cf4f2e9569897e5646a9',
|
||||
1015056
|
||||
),
|
||||
]
|
||||
|
||||
yield f2d(self.server_storage.open())
|
||||
yield f2d(self.server_blob_manager.setup())
|
||||
yield f2d(self.server_lbry_file_manager.setup())
|
||||
yield f2d(self.client_storage.open())
|
||||
yield f2d(self.client_blob_manager.setup())
|
||||
yield f2d(self.client_lbry_file_manager.setup())
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def verify_equal(sd_info, stream_hash):
|
||||
self.assertDictEqual(mocks.create_stream_sd_file, sd_info)
|
||||
sd_hash = yield f2d(self.client_storage.get_sd_blob_hash_for_stream(stream_hash))
|
||||
defer.returnValue(sd_hash)
|
||||
|
||||
def save_sd_blob_hash(sd_hash):
|
||||
self.sd_hash = sd_hash
|
||||
self.expected_blobs.append((sd_hash, 923))
|
||||
|
||||
def verify_stream_descriptor_file(stream_hash):
|
||||
self.stream_hash = stream_hash
|
||||
d = f2d(get_sd_info(self.client_storage, stream_hash, True))
|
||||
d.addCallback(verify_equal, stream_hash)
|
||||
d.addCallback(save_sd_blob_hash)
|
||||
return d
|
||||
|
||||
def create_stream():
|
||||
test_file = mocks.GenFile(5209343, bytes((i + 3) for i in range(0, 64, 6)))
|
||||
d = EncryptedFileCreator.create_lbry_file(
|
||||
self.client_blob_manager, self.client_storage, prm, self.client_lbry_file_manager,
|
||||
"test_file",
|
||||
test_file,
|
||||
key=b"0123456701234567",
|
||||
iv_generator=iv_generator()
|
||||
)
|
||||
d.addCallback(lambda lbry_file: lbry_file.stream_hash)
|
||||
return d
|
||||
|
||||
def start_server():
|
||||
server_factory = ReflectorServerFactory(peer_manager, self.server_blob_manager,
|
||||
self.server_lbry_file_manager)
|
||||
from twisted.internet import reactor
|
||||
port = 8943
|
||||
while self.reflector_port is None:
|
||||
try:
|
||||
self.reflector_port = reactor.listenTCP(port, server_factory)
|
||||
self.port = port
|
||||
except error.CannotListenError:
|
||||
port += 1
|
||||
|
||||
stream_hash = yield create_stream()
|
||||
yield verify_stream_descriptor_file(stream_hash)
|
||||
yield start_server()
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def tearDown(self):
|
||||
lbry_files = self.client_lbry_file_manager.lbry_files
|
||||
for lbry_file in lbry_files:
|
||||
yield self.client_lbry_file_manager.delete_lbry_file(lbry_file)
|
||||
yield self.client_lbry_file_manager.stop()
|
||||
yield f2d(self.client_storage.close())
|
||||
self.reflector_port.stopListening()
|
||||
lbry_files = self.server_lbry_file_manager.lbry_files
|
||||
for lbry_file in lbry_files:
|
||||
yield self.server_lbry_file_manager.delete_lbry_file(lbry_file)
|
||||
yield self.server_lbry_file_manager.stop()
|
||||
yield f2d(self.server_storage.close())
|
||||
try:
|
||||
rm_db_and_blob_dir(self.client_db_dir, self.client_blob_dir)
|
||||
except Exception as err:
|
||||
raise unittest.SkipTest("TODO: fix this for windows")
|
||||
try:
|
||||
rm_db_and_blob_dir(self.server_db_dir, self.server_blob_dir)
|
||||
except Exception as err:
|
||||
raise unittest.SkipTest("TODO: fix this for windows")
|
||||
if os.path.exists("test_file"):
|
||||
os.remove("test_file")
|
||||
|
||||
def test_stream_reflector(self):
|
||||
def verify_blob_on_reflector():
|
||||
check_blob_ds = []
|
||||
for blob_hash, blob_size in self.expected_blobs:
|
||||
check_blob_ds.append(verify_have_blob(blob_hash, blob_size))
|
||||
return defer.DeferredList(check_blob_ds)
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def verify_stream_on_reflector():
|
||||
# check stream_info_manager has all the right information
|
||||
streams = yield f2d(self.server_storage.get_all_streams())
|
||||
self.assertEqual(1, len(streams))
|
||||
self.assertEqual(self.stream_hash, streams[0])
|
||||
|
||||
blobs = yield f2d(self.server_storage.get_blobs_for_stream(self.stream_hash))
|
||||
blob_hashes = [b.blob_hash for b in blobs if b.blob_hash is not None]
|
||||
expected_blob_hashes = [b[0] for b in self.expected_blobs[:-1] if b[0] is not None]
|
||||
self.assertEqual(expected_blob_hashes, blob_hashes)
|
||||
sd_hash = yield f2d(self.server_storage.get_sd_blob_hash_for_stream(streams[0]))
|
||||
self.assertEqual(self.sd_hash, sd_hash)
|
||||
|
||||
# check lbry file manager has the file
|
||||
files = yield self.server_lbry_file_manager.lbry_files
|
||||
|
||||
self.assertEqual(0, len(files))
|
||||
|
||||
streams = yield f2d(self.server_storage.get_all_streams())
|
||||
self.assertEqual(1, len(streams))
|
||||
stream_info = yield f2d(self.server_storage.get_stream_info(self.stream_hash))
|
||||
self.assertEqual(self.sd_hash, stream_info[3])
|
||||
self.assertEqual(hexlify(b'test_file').decode(), stream_info[0])
|
||||
|
||||
# check should_announce blobs on blob_manager
|
||||
blob_hashes = yield f2d(self.server_storage.get_all_should_announce_blobs())
|
||||
self.assertSetEqual({self.sd_hash, expected_blob_hashes[0]}, set(blob_hashes))
|
||||
|
||||
def verify_have_blob(blob_hash, blob_size):
|
||||
d = self.server_blob_manager.get_blob(blob_hash)
|
||||
d.addCallback(lambda blob: verify_blob_completed(blob, blob_size))
|
||||
return d
|
||||
|
||||
def send_to_server():
|
||||
factory = EncryptedFileReflectorClientFactory(self.client_blob_manager, self.stream_hash, self.sd_hash)
|
||||
|
||||
from twisted.internet import reactor
|
||||
reactor.connectTCP('localhost', self.port, factory)
|
||||
return factory.finished_deferred
|
||||
|
||||
def verify_blob_completed(blob, blob_size):
|
||||
self.assertTrue(blob.get_is_verified())
|
||||
self.assertEqual(blob_size, blob.length)
|
||||
return
|
||||
|
||||
d = send_to_server()
|
||||
d.addCallback(lambda _: verify_blob_on_reflector())
|
||||
d.addCallback(lambda _: verify_stream_on_reflector())
|
||||
return d
|
||||
|
||||
def test_blob_reflector(self):
|
||||
def verify_data_on_reflector():
|
||||
check_blob_ds = []
|
||||
for blob_hash, blob_size in self.expected_blobs:
|
||||
check_blob_ds.append(verify_have_blob(blob_hash, blob_size))
|
||||
return defer.DeferredList(check_blob_ds)
|
||||
|
||||
def verify_have_blob(blob_hash, blob_size):
|
||||
d = self.server_blob_manager.get_blob(blob_hash)
|
||||
d.addCallback(lambda blob: verify_blob_completed(blob, blob_size))
|
||||
return d
|
||||
|
||||
def send_to_server(blob_hashes_to_send):
|
||||
factory = BlobReflectorClientFactory(
|
||||
self.client_blob_manager,
|
||||
blob_hashes_to_send
|
||||
)
|
||||
|
||||
from twisted.internet import reactor
|
||||
reactor.connectTCP('localhost', self.port, factory)
|
||||
return factory.finished_deferred
|
||||
|
||||
def verify_blob_completed(blob, blob_size):
|
||||
self.assertTrue(blob.get_is_verified())
|
||||
self.assertEqual(blob_size, blob.length)
|
||||
|
||||
d = send_to_server([x[0] for x in self.expected_blobs])
|
||||
d.addCallback(lambda _: verify_data_on_reflector())
|
||||
return d
|
||||
|
||||
def test_blob_reflector_v1(self):
|
||||
@defer.inlineCallbacks
|
||||
def verify_stream_on_reflector():
|
||||
# this protocol should not have any impact on stream info manager
|
||||
streams = yield f2d(self.server_storage.get_all_streams())
|
||||
self.assertEqual(0, len(streams))
|
||||
# there should be no should announce blobs here
|
||||
blob_hashes = yield f2d(self.server_storage.get_all_should_announce_blobs())
|
||||
self.assertEqual(0, len(blob_hashes))
|
||||
|
||||
def verify_data_on_reflector():
|
||||
check_blob_ds = []
|
||||
for blob_hash, blob_size in self.expected_blobs:
|
||||
check_blob_ds.append(verify_have_blob(blob_hash, blob_size))
|
||||
return defer.DeferredList(check_blob_ds)
|
||||
|
||||
def verify_have_blob(blob_hash, blob_size):
|
||||
d = self.server_blob_manager.get_blob(blob_hash)
|
||||
d.addCallback(lambda blob: verify_blob_completed(blob, blob_size))
|
||||
return d
|
||||
|
||||
def send_to_server(blob_hashes_to_send):
|
||||
factory = BlobReflectorClientFactory(
|
||||
self.client_blob_manager,
|
||||
blob_hashes_to_send
|
||||
)
|
||||
factory.protocol_version = 0
|
||||
|
||||
from twisted.internet import reactor
|
||||
reactor.connectTCP('localhost', self.port, factory)
|
||||
return factory.finished_deferred
|
||||
|
||||
def verify_blob_completed(blob, blob_size):
|
||||
self.assertTrue(blob.get_is_verified())
|
||||
self.assertEqual(blob_size, blob.length)
|
||||
|
||||
d = send_to_server([x[0] for x in self.expected_blobs])
|
||||
d.addCallback(lambda _: verify_data_on_reflector())
|
||||
return d
|
||||
|
||||
# test case when we reflect blob, and than that same blob
|
||||
# is reflected as stream
|
||||
@defer.inlineCallbacks
|
||||
def test_blob_reflect_and_stream(self):
|
||||
|
||||
def verify_blob_on_reflector():
|
||||
check_blob_ds = []
|
||||
for blob_hash, blob_size in self.expected_blobs:
|
||||
check_blob_ds.append(verify_have_blob(blob_hash, blob_size))
|
||||
return defer.DeferredList(check_blob_ds)
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def verify_stream_on_reflector():
|
||||
# check stream_info_manager has all the right information
|
||||
|
||||
streams = yield f2d(self.server_storage.get_all_streams())
|
||||
self.assertEqual(1, len(streams))
|
||||
self.assertEqual(self.stream_hash, streams[0])
|
||||
|
||||
blobs = yield f2d(self.server_storage.get_blobs_for_stream(self.stream_hash))
|
||||
blob_hashes = [b.blob_hash for b in blobs if b.blob_hash is not None]
|
||||
expected_blob_hashes = [b[0] for b in self.expected_blobs[:-1] if b[0] is not None]
|
||||
self.assertEqual(expected_blob_hashes, blob_hashes)
|
||||
sd_hash = yield f2d(self.server_storage.get_sd_blob_hash_for_stream(self.stream_hash))
|
||||
self.assertEqual(self.sd_hash, sd_hash)
|
||||
|
||||
# check should_announce blobs on blob_manager
|
||||
to_announce = yield f2d(self.server_storage.get_all_should_announce_blobs())
|
||||
self.assertSetEqual(set(to_announce), {self.sd_hash, expected_blob_hashes[0]})
|
||||
|
||||
def verify_have_blob(blob_hash, blob_size):
|
||||
d = self.server_blob_manager.get_blob(blob_hash)
|
||||
d.addCallback(lambda blob: verify_blob_completed(blob, blob_size))
|
||||
return d
|
||||
|
||||
def send_to_server_as_blobs(blob_hashes_to_send):
|
||||
factory = BlobReflectorClientFactory(
|
||||
self.client_blob_manager,
|
||||
blob_hashes_to_send
|
||||
)
|
||||
factory.protocol_version = 0
|
||||
|
||||
from twisted.internet import reactor
|
||||
reactor.connectTCP('localhost', self.port, factory)
|
||||
return factory.finished_deferred
|
||||
|
||||
def send_to_server_as_stream(result):
|
||||
factory = EncryptedFileReflectorClientFactory(self.client_blob_manager, self.stream_hash, self.sd_hash)
|
||||
|
||||
from twisted.internet import reactor
|
||||
reactor.connectTCP('localhost', self.port, factory)
|
||||
return factory.finished_deferred
|
||||
|
||||
def verify_blob_completed(blob, blob_size):
|
||||
self.assertTrue(blob.get_is_verified())
|
||||
self.assertEqual(blob_size, blob.length)
|
||||
|
||||
# Modify this to change which blobs to send
|
||||
blobs_to_send = self.expected_blobs
|
||||
|
||||
finished = yield send_to_server_as_blobs([x[0] for x in self.expected_blobs])
|
||||
yield send_to_server_as_stream(finished)
|
||||
yield verify_blob_on_reflector()
|
||||
yield verify_stream_on_reflector()
|
||||
|
||||
|
||||
def iv_generator():
|
||||
iv = 0
|
||||
while True:
|
||||
iv += 1
|
||||
yield b"%016d" % iv
|
|
@ -1,109 +0,0 @@
|
|||
import os
|
||||
import shutil
|
||||
import tempfile
|
||||
from hashlib import md5
|
||||
from twisted.trial.unittest import TestCase
|
||||
from twisted.internet import defer, threads
|
||||
from lbrynet.conf import Config
|
||||
from lbrynet.p2p.StreamDescriptor import StreamDescriptorIdentifier
|
||||
from lbrynet.p2p.BlobManager import DiskBlobManager
|
||||
from lbrynet.p2p.StreamDescriptor import get_sd_info
|
||||
from lbrynet.p2p.RateLimiter import DummyRateLimiter
|
||||
from lbrynet.extras.compat import f2d
|
||||
from lbrynet.extras.daemon.PeerManager import PeerManager
|
||||
from lbrynet.extras.daemon.storage import SQLiteStorage
|
||||
from lbrynet.p2p.PaymentRateManager import OnlyFreePaymentsManager
|
||||
from lbrynet.blob.EncryptedFileCreator import create_lbry_file
|
||||
from lbrynet.blob.EncryptedFileManager import EncryptedFileManager
|
||||
from tests import mocks
|
||||
|
||||
|
||||
FakeNode = mocks.Node
|
||||
FakeWallet = mocks.Wallet
|
||||
FakePeerFinder = mocks.PeerFinder
|
||||
FakeAnnouncer = mocks.Announcer
|
||||
GenFile = mocks.GenFile
|
||||
test_create_stream_sd_file = mocks.create_stream_sd_file
|
||||
DummyBlobAvailabilityTracker = mocks.BlobAvailabilityTracker
|
||||
|
||||
|
||||
class TestStreamify(TestCase):
|
||||
maxDiff = 5000
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def setUp(self):
|
||||
self.session = None
|
||||
self.lbry_file_manager = None
|
||||
self.is_generous = True
|
||||
self.db_dir = tempfile.mkdtemp()
|
||||
self.blob_dir = os.path.join(self.db_dir, "blobfiles")
|
||||
conf = Config(data_dir=self.blob_dir)
|
||||
os.mkdir(self.blob_dir)
|
||||
self.dht_node = FakeNode()
|
||||
self.wallet = FakeWallet()
|
||||
self.peer_manager = PeerManager()
|
||||
self.peer_finder = FakePeerFinder(5553, self.peer_manager, 2)
|
||||
self.rate_limiter = DummyRateLimiter()
|
||||
self.sd_identifier = StreamDescriptorIdentifier()
|
||||
self.storage = SQLiteStorage(conf, ':memory:')
|
||||
self.blob_manager = DiskBlobManager(self.blob_dir, self.storage, self.dht_node._dataStore)
|
||||
self.prm = OnlyFreePaymentsManager()
|
||||
self.lbry_file_manager = EncryptedFileManager(
|
||||
conf, self.peer_finder, self.rate_limiter, self.blob_manager, self.wallet, self.prm, self.storage,
|
||||
self.sd_identifier
|
||||
)
|
||||
yield f2d(self.storage.open())
|
||||
yield f2d(self.lbry_file_manager.setup())
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def tearDown(self):
|
||||
lbry_files = self.lbry_file_manager.lbry_files
|
||||
for lbry_file in lbry_files:
|
||||
yield self.lbry_file_manager.delete_lbry_file(lbry_file)
|
||||
yield self.lbry_file_manager.stop()
|
||||
yield f2d(self.storage.close())
|
||||
shutil.rmtree(self.db_dir, ignore_errors=True)
|
||||
if os.path.exists("test_file"):
|
||||
os.remove("test_file")
|
||||
|
||||
def test_create_stream(self):
|
||||
|
||||
def verify_equal(sd_info):
|
||||
self.assertEqual(sd_info, test_create_stream_sd_file)
|
||||
|
||||
def verify_stream_descriptor_file(stream_hash):
|
||||
d = f2d(get_sd_info(self.storage, stream_hash, True))
|
||||
d.addCallback(verify_equal)
|
||||
return d
|
||||
|
||||
def iv_generator():
|
||||
iv = 0
|
||||
while 1:
|
||||
iv += 1
|
||||
yield b"%016d" % iv
|
||||
|
||||
def create_stream():
|
||||
test_file = GenFile(5209343, bytes((i + 3) for i in range(0, 64, 6)))
|
||||
d = create_lbry_file(
|
||||
self.blob_manager, self.storage, self.prm, self.lbry_file_manager, "test_file", test_file,
|
||||
key=b'0123456701234567', iv_generator=iv_generator()
|
||||
)
|
||||
d.addCallback(lambda lbry_file: lbry_file.stream_hash)
|
||||
return d
|
||||
|
||||
d = create_stream()
|
||||
d.addCallback(verify_stream_descriptor_file)
|
||||
return d
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def test_create_and_combine_stream(self):
|
||||
test_file = GenFile(53209343, bytes((i + 5) for i in range(0, 64, 6)))
|
||||
lbry_file = yield create_lbry_file(self.blob_manager, self.storage, self.prm, self.lbry_file_manager,
|
||||
"test_file", test_file)
|
||||
sd_hash = yield f2d(self.storage.get_sd_blob_hash_for_stream(lbry_file.stream_hash))
|
||||
self.assertTrue(lbry_file.sd_hash, sd_hash)
|
||||
yield lbry_file.start()
|
||||
f = open('test_file', 'rb')
|
||||
hashsum = md5()
|
||||
hashsum.update(f.read())
|
||||
self.assertEqual(hashsum.hexdigest(), "68959747edc73df45e45db6379dd7b3b")
|
474
tests/mocks.py
474
tests/mocks.py
|
@ -1,474 +0,0 @@
|
|||
import asyncio
|
||||
import base64
|
||||
import io
|
||||
from unittest import mock
|
||||
|
||||
from cryptography.hazmat.backends import default_backend
|
||||
from cryptography.hazmat.primitives.asymmetric import rsa
|
||||
from cryptography.hazmat.primitives import serialization
|
||||
from twisted.internet import defer
|
||||
from twisted.python.failure import Failure
|
||||
from lbrynet.conf import Config
|
||||
from lbrynet.p2p.client.ClientRequest import ClientRequest
|
||||
from lbrynet.p2p.Error import RequestCanceledError
|
||||
from lbrynet.p2p import BlobAvailability
|
||||
from lbrynet.blob.EncryptedFileManager import EncryptedFileManager
|
||||
from lbrynet.dht.node import Node as RealNode
|
||||
from lbrynet.extras.daemon import exchange_rate_manager as ERM
|
||||
|
||||
KB = 2**10
|
||||
PUBLIC_EXPONENT = 65537 # http://www.daemonology.net/blog/2009-06-11-cryptographic-right-answers.html
|
||||
|
||||
|
||||
def decode_rsa_key(pem_key):
|
||||
decoded = base64.b64decode(''.join(pem_key.splitlines()[1:-1]))
|
||||
return serialization.load_der_public_key(decoded, default_backend())
|
||||
|
||||
|
||||
class FakeLBRYFile:
|
||||
def __init__(self, blob_manager, stream_info_manager, stream_hash, uri="fake_uri"):
|
||||
self.blob_manager = blob_manager
|
||||
self.stream_info_manager = stream_info_manager
|
||||
self.stream_hash = stream_hash
|
||||
self.file_name = 'fake_lbry_file'
|
||||
|
||||
|
||||
class Node(RealNode):
|
||||
def joinNetwork(self, known_node_addresses=None):
|
||||
return defer.succeed(None)
|
||||
|
||||
def stop(self):
|
||||
return defer.succeed(None)
|
||||
|
||||
def start(self, known_node_addresses=None):
|
||||
return self.joinNetwork(known_node_addresses)
|
||||
|
||||
|
||||
class FakeNetwork:
|
||||
@staticmethod
|
||||
def get_local_height():
|
||||
return 1
|
||||
|
||||
@staticmethod
|
||||
def get_server_height():
|
||||
return 1
|
||||
|
||||
|
||||
class BTCLBCFeed(ERM.MarketFeed):
|
||||
def __init__(self):
|
||||
ERM.MarketFeed.__init__(
|
||||
self,
|
||||
"BTCLBC",
|
||||
"market name",
|
||||
"derp.com",
|
||||
None,
|
||||
0.0
|
||||
)
|
||||
|
||||
|
||||
class USDBTCFeed(ERM.MarketFeed):
|
||||
def __init__(self):
|
||||
ERM.MarketFeed.__init__(
|
||||
self,
|
||||
"USDBTC",
|
||||
"market name",
|
||||
"derp.com",
|
||||
None,
|
||||
0.0
|
||||
)
|
||||
|
||||
|
||||
class ExchangeRateManager(ERM.ExchangeRateManager):
|
||||
def __init__(self, market_feeds, rates):
|
||||
self.market_feeds = market_feeds
|
||||
for feed in self.market_feeds:
|
||||
feed.rate = ERM.ExchangeRate(
|
||||
feed.market, rates[feed.market]['spot'], rates[feed.market]['ts'])
|
||||
|
||||
|
||||
class PointTraderKeyExchanger:
|
||||
|
||||
def __init__(self, wallet):
|
||||
self.wallet = wallet
|
||||
self._protocols = []
|
||||
|
||||
def send_next_request(self, peer, protocol):
|
||||
if not protocol in self._protocols:
|
||||
r = ClientRequest({'public_key': self.wallet.encoded_public_key.decode()},
|
||||
'public_key')
|
||||
d = protocol.add_request(r)
|
||||
d.addCallback(self._handle_exchange_response, peer, r, protocol)
|
||||
d.addErrback(self._request_failed, peer)
|
||||
self._protocols.append(protocol)
|
||||
return defer.succeed(True)
|
||||
else:
|
||||
return defer.succeed(False)
|
||||
|
||||
def _handle_exchange_response(self, response_dict, peer, request, protocol):
|
||||
assert request.response_identifier in response_dict, \
|
||||
"Expected %s in dict but did not get it" % request.response_identifier
|
||||
assert protocol in self._protocols, "Responding protocol is not in our list of protocols"
|
||||
peer_pub_key = response_dict[request.response_identifier]
|
||||
self.wallet.set_public_key_for_peer(peer, peer_pub_key)
|
||||
return True
|
||||
|
||||
def _request_failed(self, err, peer):
|
||||
if not err.check(RequestCanceledError):
|
||||
return err
|
||||
|
||||
|
||||
class PointTraderKeyQueryHandlerFactory:
|
||||
|
||||
def __init__(self, wallet):
|
||||
self.wallet = wallet
|
||||
|
||||
def build_query_handler(self):
|
||||
q_h = PointTraderKeyQueryHandler(self.wallet)
|
||||
return q_h
|
||||
|
||||
def get_primary_query_identifier(self):
|
||||
return 'public_key'
|
||||
|
||||
def get_description(self):
|
||||
return ("Point Trader Address - an address for receiving payments on the "
|
||||
"point trader testing network")
|
||||
|
||||
|
||||
class PointTraderKeyQueryHandler:
|
||||
|
||||
def __init__(self, wallet):
|
||||
self.wallet = wallet
|
||||
self.query_identifiers = ['public_key']
|
||||
self.public_key = None
|
||||
self.peer = None
|
||||
|
||||
def register_with_request_handler(self, request_handler, peer):
|
||||
self.peer = peer
|
||||
request_handler.register_query_handler(self, self.query_identifiers)
|
||||
|
||||
def handle_queries(self, queries):
|
||||
if self.query_identifiers[0] in queries:
|
||||
new_encoded_pub_key = queries[self.query_identifiers[0]]
|
||||
try:
|
||||
decode_rsa_key(new_encoded_pub_key)
|
||||
except (ValueError, TypeError, IndexError):
|
||||
raise ValueError(f"Client sent an invalid public key: {new_encoded_pub_key}")
|
||||
self.public_key = new_encoded_pub_key
|
||||
self.wallet.set_public_key_for_peer(self.peer, self.public_key)
|
||||
fields = {'public_key': self.wallet.encoded_public_key.decode()}
|
||||
return fields
|
||||
if self.public_key is None:
|
||||
raise ValueError("Expected but did not receive a public key")
|
||||
else:
|
||||
return {}
|
||||
|
||||
|
||||
class Wallet:
|
||||
def __init__(self):
|
||||
self.private_key = rsa.generate_private_key(public_exponent=PUBLIC_EXPONENT,
|
||||
key_size=1024, backend=default_backend())
|
||||
self.encoded_public_key = self.private_key.public_key().public_bytes(serialization.Encoding.PEM,
|
||||
serialization.PublicFormat.PKCS1)
|
||||
self._config = None
|
||||
self.network = None
|
||||
self.wallet = None
|
||||
self.is_first_run = False
|
||||
self.printed_retrieving_headers = False
|
||||
self._start_check = None
|
||||
self._catch_up_check = None
|
||||
self._caught_up_counter = 0
|
||||
self._lag_counter = 0
|
||||
self.blocks_behind = 0
|
||||
self.catchup_progress = 0
|
||||
self.max_behind = 0
|
||||
|
||||
def start(self):
|
||||
return defer.succeed(True)
|
||||
|
||||
def stop(self):
|
||||
return defer.succeed(True)
|
||||
|
||||
def get_info_exchanger(self):
|
||||
return PointTraderKeyExchanger(self)
|
||||
|
||||
def update_peer_address(self, peer, address):
|
||||
pass
|
||||
|
||||
def get_wallet_info_query_handler_factory(self):
|
||||
return PointTraderKeyQueryHandlerFactory(self)
|
||||
|
||||
def get_unused_address_for_peer(self, peer):
|
||||
return defer.succeed("bDtL6qriyimxz71DSYjojTBsm6cpM1bqmj")
|
||||
|
||||
def reserve_points(self, *args):
|
||||
return True
|
||||
|
||||
def cancel_point_reservation(self, *args):
|
||||
pass
|
||||
|
||||
def send_points(self, *args):
|
||||
return defer.succeed(True)
|
||||
|
||||
def add_expected_payment(self, *args):
|
||||
pass
|
||||
|
||||
def get_balance(self):
|
||||
return defer.succeed(1000)
|
||||
|
||||
def set_public_key_for_peer(self, peer, public_key):
|
||||
pass
|
||||
|
||||
def get_claim_metadata_for_sd_hash(self, sd_hash):
|
||||
return "fakeuri", "aa04a949348f9f094d503e5816f0cfb57ee68a22f6d08d149217d071243e0377", 1
|
||||
|
||||
def get_claimid(self, name, txid=None, nout=None):
|
||||
return "aa04a949348f9f094d503e5816f0cfb57ee68a22f6d08d149217d071243e0378"
|
||||
|
||||
|
||||
class PeerFinder:
|
||||
def __init__(self, start_port, peer_manager, num_peers):
|
||||
self.start_port = start_port
|
||||
self.peer_manager = peer_manager
|
||||
self.num_peers = num_peers
|
||||
self.count = 0
|
||||
|
||||
def find_peers_for_blob(self, h, filter_self=False):
|
||||
peer_port = self.start_port + self.count
|
||||
self.count += 1
|
||||
if self.count >= self.num_peers:
|
||||
self.count = 0
|
||||
return defer.succeed([self.peer_manager.get_peer("127.0.0.1", peer_port)])
|
||||
|
||||
def run_manage_loop(self):
|
||||
pass
|
||||
|
||||
def stop(self):
|
||||
pass
|
||||
|
||||
|
||||
class Announcer:
|
||||
def __init__(self, *args):
|
||||
pass
|
||||
|
||||
def hash_queue_size(self):
|
||||
return 0
|
||||
|
||||
def immediate_announce(self, *args):
|
||||
pass
|
||||
|
||||
def start(self):
|
||||
pass
|
||||
|
||||
def stop(self):
|
||||
pass
|
||||
|
||||
|
||||
class GenFile(io.RawIOBase):
|
||||
def __init__(self, size, pattern):
|
||||
io.RawIOBase.__init__(self)
|
||||
self.size = size
|
||||
self.pattern = pattern
|
||||
self.read_so_far = 0
|
||||
self.buff = b''
|
||||
self.last_offset = 0
|
||||
self.name = "."
|
||||
|
||||
def readable(self):
|
||||
return True
|
||||
|
||||
def writable(self):
|
||||
return False
|
||||
|
||||
def read(self, n=-1):
|
||||
if n > -1:
|
||||
bytes_to_read = min(n, self.size - self.read_so_far)
|
||||
else:
|
||||
bytes_to_read = self.size - self.read_so_far
|
||||
output, self.buff = self.buff[:bytes_to_read], self.buff[bytes_to_read:]
|
||||
bytes_to_read -= len(output)
|
||||
while bytes_to_read > 0:
|
||||
self.buff = self._generate_chunk()
|
||||
new_output, self.buff = self.buff[:bytes_to_read], self.buff[bytes_to_read:]
|
||||
bytes_to_read -= len(new_output)
|
||||
output += new_output
|
||||
self.read_so_far += len(output)
|
||||
return output
|
||||
|
||||
def readall(self):
|
||||
return self.read()
|
||||
|
||||
def _generate_chunk(self, size=KB):
|
||||
output = self.pattern[self.last_offset:self.last_offset + size]
|
||||
n_left = size - len(output)
|
||||
whole_patterns = n_left // len(self.pattern)
|
||||
output += self.pattern * whole_patterns
|
||||
self.last_offset = size - len(output)
|
||||
output += self.pattern[:self.last_offset]
|
||||
return output
|
||||
|
||||
|
||||
class BlobAvailabilityTracker(BlobAvailability.BlobAvailabilityTracker):
|
||||
"""
|
||||
Class to track peer counts for known blobs, and to discover new popular blobs
|
||||
|
||||
Attributes:
|
||||
availability (dict): dictionary of peers for known blobs
|
||||
"""
|
||||
|
||||
def __init__(self, blob_manager=None, peer_finder=None, dht_node=None):
|
||||
self.availability = {
|
||||
'91dc64cf1ff42e20d627b033ad5e4c3a4a96856ed8a6e3fb'
|
||||
'4cd5fa1cfba4bf72eefd325f579db92f45f4355550ace8e7': ['1.2.3.4'],
|
||||
'b2e48bb4c88cf46b76adf0d47a72389fae0cd1f19ed27dc5'
|
||||
'09138c99509a25423a4cef788d571dca7988e1dca69e6fa0': ['1.2.3.4', '1.2.3.4'],
|
||||
'6af95cd062b4a179576997ef1054c9d2120f8592eea045e9'
|
||||
'667bea411d520262cd5a47b137eabb7a7871f5f8a79c92dd':
|
||||
['1.2.3.4', '1.2.3.4', '1.2.3.4'],
|
||||
'6d8017aba362e5c5d0046625a039513419810a0397d72831'
|
||||
'8c328a5cc5d96efb589fbca0728e54fe5adbf87e9545ee07':
|
||||
['1.2.3.4', '1.2.3.4', '1.2.3.4', '1.2.3.4'],
|
||||
'5a450b416275da4bdff604ee7b58eaedc7913c5005b7184f'
|
||||
'c3bc5ef0b1add00613587f54217c91097fc039ed9eace9dd':
|
||||
['1.2.3.4', '1.2.3.4', '1.2.3.4', '1.2.3.4', '1.2.3.4'],
|
||||
'd7c82e6cac093b3f16107d2ae2b2c75424f1fcad2c7fbdbe'
|
||||
'66e4a13c0b6bd27b67b3a29c403b82279ab0f7c1c48d6787':
|
||||
['1.2.3.4', '1.2.3.4', '1.2.3.4', '1.2.3.4', '1.2.3.4', '1.2.3.4'],
|
||||
'9dbda74a472a2e5861a5d18197aeba0f5de67c67e401124c'
|
||||
'243d2f0f41edf01d7a26aeb0b5fc9bf47f6361e0f0968e2c':
|
||||
['1.2.3.4', '1.2.3.4', '1.2.3.4', '1.2.3.4', '1.2.3.4', '1.2.3.4', '1.2.3.4'],
|
||||
'8c70d5e2f5c3a6085006198e5192d157a125d92e73787944'
|
||||
'72007a61947992768926513fc10924785bdb1761df3c37e6':
|
||||
['1.2.3.4', '1.2.3.4', '1.2.3.4', '1.2.3.4', '1.2.3.4', '1.2.3.4', '1.2.3.4',
|
||||
'1.2.3.4'],
|
||||
'f99d24cd50d4bfd77c2598bfbeeb8415bf0feef21200bdf0'
|
||||
'b8fbbde7751a77b7a2c68e09c25465a2f40fba8eecb0b4e0':
|
||||
['1.2.3.4', '1.2.3.4', '1.2.3.4', '1.2.3.4', '1.2.3.4', '1.2.3.4', '1.2.3.4',
|
||||
'1.2.3.4', '1.2.3.4'],
|
||||
'c84aa1fd8f5009f7c4e71e444e40d95610abc1480834f835'
|
||||
'eefb267287aeb10025880a3ce22580db8c6d92efb5bc0c9c':
|
||||
['1.2.3.4', '1.2.3.4', '1.2.3.4', '1.2.3.4', '1.2.3.4', '1.2.3.4', '1.2.3.4',
|
||||
'1.2.3.4', '1.2.3.4', '1.2.3.4'],
|
||||
}
|
||||
self._blob_manager = None
|
||||
self._peer_finder = PeerFinder(11223, 11224, 2)
|
||||
self._dht_node = None
|
||||
self._check_popular = None
|
||||
self._check_mine = None
|
||||
self._set_mean_peers()
|
||||
|
||||
def start(self):
|
||||
pass
|
||||
|
||||
def stop(self):
|
||||
pass
|
||||
|
||||
|
||||
# The components below viz. FakeWallet, FakeSession, FakeFileManager are just for testing Component Manager's
|
||||
# startup and stop
|
||||
class FakeComponent:
|
||||
depends_on = []
|
||||
component_name = None
|
||||
|
||||
def __init__(self, component_manager):
|
||||
self.component_manager = component_manager
|
||||
self._running = False
|
||||
|
||||
@property
|
||||
def running(self):
|
||||
return self._running
|
||||
|
||||
async def start(self):
|
||||
pass
|
||||
|
||||
async def stop(self):
|
||||
pass
|
||||
|
||||
@property
|
||||
def component(self):
|
||||
return self
|
||||
|
||||
async def _setup(self):
|
||||
result = await self.start()
|
||||
self._running = True
|
||||
return result
|
||||
|
||||
async def _stop(self):
|
||||
result = await self.stop()
|
||||
self._running = False
|
||||
return result
|
||||
|
||||
async def get_status(self):
|
||||
return {}
|
||||
|
||||
def __lt__(self, other):
|
||||
return self.component_name < other.component_name
|
||||
|
||||
|
||||
class FakeDelayedWallet(FakeComponent):
|
||||
component_name = "wallet"
|
||||
depends_on = []
|
||||
|
||||
async def stop(self):
|
||||
await asyncio.sleep(1)
|
||||
|
||||
|
||||
class FakeDelayedBlobManager(FakeComponent):
|
||||
component_name = "blob_manager"
|
||||
depends_on = [FakeDelayedWallet.component_name]
|
||||
|
||||
async def start(self):
|
||||
await asyncio.sleep(1)
|
||||
|
||||
async def stop(self):
|
||||
await asyncio.sleep(1)
|
||||
|
||||
|
||||
class FakeDelayedFileManager(FakeComponent):
|
||||
component_name = "file_manager"
|
||||
depends_on = [FakeDelayedBlobManager.component_name]
|
||||
|
||||
async def start(self):
|
||||
await asyncio.sleep(1)
|
||||
|
||||
|
||||
class FakeFileManager(FakeComponent):
|
||||
component_name = "file_manager"
|
||||
depends_on = []
|
||||
|
||||
@property
|
||||
def component(self):
|
||||
return mock.Mock(spec=EncryptedFileManager)
|
||||
|
||||
|
||||
create_stream_sd_file = {
|
||||
'stream_name': '746573745f66696c65',
|
||||
'blobs': [
|
||||
{
|
||||
'length': 2097152,
|
||||
'blob_num': 0,
|
||||
'blob_hash': 'dc4708f76a5e7af0f1cae0ee96b824e2ed9250c9346c093b'
|
||||
'441f0a20d3607c17948b6fcfb4bc62020fe5286693d08586',
|
||||
'iv': '30303030303030303030303030303031'
|
||||
},
|
||||
{
|
||||
'length': 2097152,
|
||||
'blob_num': 1,
|
||||
'blob_hash': 'f4067522c1b49432a2a679512e3917144317caa1abba0c04'
|
||||
'1e0cd2cf9f635d4cf127ce1824fa04189b63916174951f70',
|
||||
'iv': '30303030303030303030303030303032'
|
||||
},
|
||||
{
|
||||
'length': 1015056,
|
||||
'blob_num': 2,
|
||||
'blob_hash': '305486c434260484fcb2968ce0e963b72f81ba56c11b08b1'
|
||||
'af0789b55b44d78422600f9a38e3cf4f2e9569897e5646a9',
|
||||
'iv': '30303030303030303030303030303033'
|
||||
},
|
||||
{'length': 0, 'blob_num': 3, 'iv': '30303030303030303030303030303034'}
|
||||
],
|
||||
'stream_type': 'lbryfile',
|
||||
'key': '30313233343536373031323334353637',
|
||||
'suggested_file_name': '746573745f66696c65',
|
||||
'stream_hash': '6d27fbe10c86d81aacfb897c7a426d0a2214f5a299455a6d'
|
||||
'315c0f998c4b3545c2dc60906122d94653c23b1898229e3f'
|
||||
}
|
|
@ -1,269 +0,0 @@
|
|||
from twisted.trial.unittest import TestCase
|
||||
from twisted.internet import defer, reactor, task
|
||||
from twisted.internet.task import deferLater
|
||||
from twisted.internet.protocol import ServerFactory
|
||||
|
||||
from lbrynet import conf, utils
|
||||
from lbrynet.p2p.client.ClientRequest import ClientRequest
|
||||
from lbrynet.p2p.server.ServerProtocol import ServerProtocol
|
||||
from lbrynet.p2p.client.ClientProtocol import ClientProtocol
|
||||
from lbrynet.p2p.RateLimiter import RateLimiter
|
||||
from lbrynet.p2p.Peer import Peer
|
||||
from lbrynet.p2p.Error import NoResponseError
|
||||
from lbrynet.extras.daemon.PeerManager import PeerManager
|
||||
from lbrynet.conf import Config
|
||||
|
||||
PEER_PORT = 5551
|
||||
LOCAL_HOST = '127.0.0.1'
|
||||
|
||||
|
||||
class MocDownloader:
|
||||
def insufficient_funds(self):
|
||||
pass
|
||||
|
||||
|
||||
class MocRequestCreator:
|
||||
|
||||
def __init__(self, peers_to_return, peers_to_return_head_blob=None):
|
||||
self.peers_to_return = peers_to_return
|
||||
self.peers_to_return_head_blob = peers_to_return_head_blob or []
|
||||
self.sent_request = False
|
||||
|
||||
def send_next_request(self, peer, protocol):
|
||||
if self.sent_request is True:
|
||||
return defer.succeed(False)
|
||||
response_identifier = 'moc_request'
|
||||
r_dict = {'moc_request':0}
|
||||
request = ClientRequest(r_dict, response_identifier)
|
||||
d = protocol.add_request(request) # ClientRequest here
|
||||
d.addErrback(self.request_err, peer)
|
||||
d.addCallback(self.request_success)
|
||||
self.sent_request = True
|
||||
return defer.succeed(True)
|
||||
|
||||
def request_success(self, suc):
|
||||
pass
|
||||
|
||||
def request_err(self, err, peer):
|
||||
if isinstance(err.value, NoResponseError):
|
||||
return err
|
||||
|
||||
def get_new_peers_for_next_unavailable(self):
|
||||
return self.peers_to_return
|
||||
|
||||
def get_new_peers_for_head_blob(self):
|
||||
return self.peers_to_return_head_blob
|
||||
|
||||
|
||||
class MocFunctionalQueryHandler:
|
||||
|
||||
def __init__(self, clock, is_good=True, is_delayed=False):
|
||||
self.query_identifiers = ['moc_request']
|
||||
self.is_good = is_good
|
||||
self.is_delayed = is_delayed
|
||||
self.clock = clock
|
||||
|
||||
def register_with_request_handler(self, request_handler, peer):
|
||||
request_handler.register_query_handler(self, self.query_identifiers)
|
||||
|
||||
def handle_queries(self, queries):
|
||||
if self.query_identifiers[0] in queries:
|
||||
if self.is_delayed:
|
||||
delay = ClientProtocol.PROTOCOL_TIMEOUT+1
|
||||
out = deferLater(self.clock, delay, lambda: {'moc_request':0})
|
||||
self.clock.advance(delay)
|
||||
return out
|
||||
if self.is_good:
|
||||
return defer.succeed({'moc_request':0})
|
||||
else:
|
||||
return defer.succeed({'bad_request':0})
|
||||
else:
|
||||
return defer.succeed({})
|
||||
|
||||
|
||||
class MocQueryHandlerFactory:
|
||||
# is is_good, the query handler works as expectd,
|
||||
# is is_delayed, the query handler will delay its resposne
|
||||
def __init__(self, clock, is_good=True, is_delayed=False):
|
||||
self.is_good = is_good
|
||||
self.is_delayed = is_delayed
|
||||
self.clock = clock
|
||||
|
||||
def build_query_handler(self):
|
||||
return MocFunctionalQueryHandler(self.clock, self.is_good, self.is_delayed)
|
||||
|
||||
def get_primary_query_identifier(self):
|
||||
return 'moc_query'
|
||||
|
||||
def get_description(self):
|
||||
return "This is a Moc Query"
|
||||
|
||||
|
||||
class MocServerProtocolFactory(ServerFactory):
|
||||
protocol = ServerProtocol
|
||||
|
||||
def __init__(self, clock, is_good=True, is_delayed=False, has_moc_query_handler=True):
|
||||
self.rate_limiter = RateLimiter()
|
||||
query_handler_factory = MocQueryHandlerFactory(clock, is_good, is_delayed)
|
||||
if has_moc_query_handler:
|
||||
self.query_handler_factories = {
|
||||
query_handler_factory.get_primary_query_identifier():query_handler_factory
|
||||
}
|
||||
else:
|
||||
self.query_handler_factories = {}
|
||||
self.peer_manager = PeerManager()
|
||||
|
||||
|
||||
class TestIntegrationConnectionManager(TestCase):
|
||||
skip = 'times out, needs to be refactored to work with py3'
|
||||
|
||||
def setUp(self):
|
||||
|
||||
conf = Config()
|
||||
|
||||
self.TEST_PEER = Peer(LOCAL_HOST, PEER_PORT)
|
||||
self.downloader = MocDownloader()
|
||||
self.downloader.conf = conf
|
||||
self.rate_limiter = RateLimiter()
|
||||
self.primary_request_creator = MocRequestCreator([self.TEST_PEER])
|
||||
self.clock = task.Clock()
|
||||
utils.call_later = self.clock.callLater
|
||||
self.server_port = None
|
||||
|
||||
def _init_connection_manager(self, seek_head_blob_first=False):
|
||||
# this import is required here so utils.call_later is replaced by self.clock.callLater
|
||||
from lbrynet.p2p.client.ConnectionManager import ConnectionManager
|
||||
self.connection_manager = ConnectionManager(self.downloader, self.rate_limiter,
|
||||
[self.primary_request_creator], [])
|
||||
self.connection_manager.seek_head_blob_first = seek_head_blob_first
|
||||
self.connection_manager._start()
|
||||
|
||||
def tearDown(self):
|
||||
if self.server_port is not None:
|
||||
self.server_port.stopListening()
|
||||
self.connection_manager.stop()
|
||||
conf.settings = None
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def test_success(self):
|
||||
self._init_connection_manager()
|
||||
# test to see that if we setup a server, we get a connection
|
||||
self.server = MocServerProtocolFactory(self.clock)
|
||||
self.server_port = reactor.listenTCP(PEER_PORT, self.server, interface=LOCAL_HOST)
|
||||
yield self.connection_manager.manage(schedule_next_call=False)
|
||||
self.assertEqual(1, self.connection_manager.num_peer_connections())
|
||||
connection_made = yield self.connection_manager._peer_connections[self.TEST_PEER].\
|
||||
factory.connection_was_made_deferred
|
||||
self.assertEqual(0, self.connection_manager.num_peer_connections())
|
||||
self.assertTrue(connection_made)
|
||||
self.assertEqual(1, self.TEST_PEER.success_count)
|
||||
self.assertEqual(0, self.TEST_PEER.down_count)
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def test_server_with_improper_reply(self):
|
||||
self._init_connection_manager()
|
||||
self.server = MocServerProtocolFactory(self.clock, is_good=False)
|
||||
self.server_port = reactor.listenTCP(PEER_PORT, self.server, interface=LOCAL_HOST)
|
||||
yield self.connection_manager.manage(schedule_next_call=False)
|
||||
self.assertEqual(1, self.connection_manager.num_peer_connections())
|
||||
connection_made = yield self.connection_manager._peer_connections[self.TEST_PEER].\
|
||||
factory.connection_was_made_deferred
|
||||
self.assertEqual(0, self.connection_manager.num_peer_connections())
|
||||
self.assertTrue(connection_made)
|
||||
self.assertEqual(0, self.TEST_PEER.success_count)
|
||||
self.assertEqual(1, self.TEST_PEER.down_count)
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def test_non_existing_server(self):
|
||||
# Test to see that if we don't setup a server, we don't get a connection
|
||||
|
||||
self._init_connection_manager()
|
||||
yield self.connection_manager.manage(schedule_next_call=False)
|
||||
self.assertEqual(1, self.connection_manager.num_peer_connections())
|
||||
connection_made = yield self.connection_manager._peer_connections[self.TEST_PEER].\
|
||||
factory.connection_was_made_deferred
|
||||
self.assertEqual(0, self.connection_manager.num_peer_connections())
|
||||
self.assertFalse(connection_made)
|
||||
self.assertEqual(0, self.connection_manager.num_peer_connections())
|
||||
self.assertEqual(0, self.TEST_PEER.success_count)
|
||||
self.assertEqual(1, self.TEST_PEER.down_count)
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def test_parallel_connections(self):
|
||||
# Test to see that we make two new connections at a manage call,
|
||||
# without it waiting for the connection to complete
|
||||
|
||||
self._init_connection_manager()
|
||||
test_peer2 = Peer(LOCAL_HOST, PEER_PORT+1)
|
||||
self.primary_request_creator.peers_to_return = [self.TEST_PEER, test_peer2]
|
||||
yield self.connection_manager.manage(schedule_next_call=False)
|
||||
self.assertEqual(2, self.connection_manager.num_peer_connections())
|
||||
self.assertIn(self.TEST_PEER, self.connection_manager._peer_connections)
|
||||
self.assertIn(test_peer2, self.connection_manager._peer_connections)
|
||||
|
||||
deferred_conn_made_peer1 = self.connection_manager._peer_connections[self.TEST_PEER].\
|
||||
factory.connection_was_made_deferred
|
||||
deferred_conn_made_peer1.addCallback(lambda conn_made: self.assertFalse(conn_made))
|
||||
|
||||
deferred_conn_made_peer2 = self.connection_manager._peer_connections[test_peer2].\
|
||||
factory.connection_was_made_deferred
|
||||
deferred_conn_made_peer2.addCallback(lambda conn_made: self.assertFalse(conn_made))
|
||||
|
||||
yield deferred_conn_made_peer1
|
||||
yield deferred_conn_made_peer2
|
||||
|
||||
self.assertEqual(0, self.connection_manager.num_peer_connections())
|
||||
self.assertEqual(0, self.TEST_PEER.success_count)
|
||||
self.assertEqual(1, self.TEST_PEER.down_count)
|
||||
self.assertEqual(0, test_peer2.success_count)
|
||||
self.assertEqual(1, test_peer2.down_count)
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def test_stop(self):
|
||||
# test to see that when we call stop, the ConnectionManager waits for the
|
||||
# current manage call to finish, closes connections,
|
||||
# and removes scheduled manage calls
|
||||
self._init_connection_manager()
|
||||
self.connection_manager.manage(schedule_next_call=True)
|
||||
yield self.connection_manager.stop()
|
||||
self.assertEqual(0, self.TEST_PEER.success_count)
|
||||
self.assertEqual(1, self.TEST_PEER.down_count)
|
||||
self.assertEqual(0, self.connection_manager.num_peer_connections())
|
||||
self.assertIsNone(self.connection_manager._next_manage_call)
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def test_closed_connection_when_server_is_slow(self):
|
||||
self._init_connection_manager()
|
||||
self.server = MocServerProtocolFactory(
|
||||
self.clock, has_moc_query_handler=True, is_delayed=True)
|
||||
self.server_port = reactor.listenTCP(PEER_PORT, self.server, interface=LOCAL_HOST)
|
||||
|
||||
yield self.connection_manager.manage(schedule_next_call=False)
|
||||
self.assertEqual(1, self.connection_manager.num_peer_connections())
|
||||
connection_made = yield self.connection_manager._peer_connections[self.TEST_PEER].\
|
||||
factory.connection_was_made_deferred
|
||||
self.assertEqual(0, self.connection_manager.num_peer_connections())
|
||||
self.assertTrue(connection_made)
|
||||
self.assertEqual(0, self.TEST_PEER.success_count)
|
||||
self.assertEqual(1, self.TEST_PEER.down_count)
|
||||
|
||||
# test header first seeks
|
||||
@defer.inlineCallbacks
|
||||
def test_no_peer_for_head_blob(self):
|
||||
# test that if we can't find blobs for the head blob,
|
||||
# it looks at the next unavailable and makes connection
|
||||
self._init_connection_manager(seek_head_blob_first=True)
|
||||
self.server = MocServerProtocolFactory(self.clock)
|
||||
self.server_port = reactor.listenTCP(PEER_PORT, self.server, interface=LOCAL_HOST)
|
||||
|
||||
self.primary_request_creator.peers_to_return_head_blob = []
|
||||
self.primary_request_creator.peers_to_return = [self.TEST_PEER]
|
||||
|
||||
yield self.connection_manager.manage(schedule_next_call=False)
|
||||
self.assertEqual(1, self.connection_manager.num_peer_connections())
|
||||
connection_made = yield self.connection_manager._peer_connections[self.TEST_PEER].\
|
||||
factory.connection_was_made_deferred
|
||||
self.assertEqual(0, self.connection_manager.num_peer_connections())
|
||||
self.assertTrue(connection_made)
|
||||
self.assertEqual(1, self.TEST_PEER.success_count)
|
||||
self.assertEqual(0, self.TEST_PEER.down_count)
|
|
@ -1,126 +0,0 @@
|
|||
from io import BytesIO
|
||||
from unittest import mock
|
||||
|
||||
from twisted.internet import defer
|
||||
from twisted.test import proto_helpers
|
||||
from twisted.trial import unittest
|
||||
|
||||
from lbrynet.p2p import Peer
|
||||
from lbrynet.p2p.PaymentRateManager import NegotiatedPaymentRateManager, BasePaymentRateManager
|
||||
from lbrynet.conf import Config
|
||||
from tests.mocks import BlobAvailabilityTracker as DummyBlobAvailabilityTracker
|
||||
|
||||
|
||||
class TestBlobRequestHandlerQueries(unittest.TestCase):
|
||||
def setUp(self):
|
||||
conf = Config()
|
||||
self.blob_manager = mock.Mock()
|
||||
self.payment_rate_manager = NegotiatedPaymentRateManager(
|
||||
BasePaymentRateManager(0.001, conf.min_info_rate), DummyBlobAvailabilityTracker(), conf.is_generous_host
|
||||
)
|
||||
from lbrynet.p2p.server import BlobRequestHandler
|
||||
self.handler = BlobRequestHandler.BlobRequestHandler(
|
||||
self.blob_manager, None, self.payment_rate_manager, None)
|
||||
|
||||
def test_empty_response_when_empty_query(self):
|
||||
self.assertEqual({}, self.handler.handle_queries({}))
|
||||
|
||||
def test_error_set_when_rate_is_missing(self):
|
||||
query = {'requested_blob': 'blob'}
|
||||
response = {'incoming_blob': {'error': 'RATE_UNSET'}}
|
||||
self.assertEqual(response, self.handler.handle_queries(query))
|
||||
|
||||
def test_error_set_when_rate_too_low(self):
|
||||
query = {
|
||||
'blob_data_payment_rate': -1.0,
|
||||
'requested_blob': 'blob'
|
||||
}
|
||||
response = {
|
||||
'blob_data_payment_rate': 'RATE_TOO_LOW',
|
||||
'incoming_blob': {'error': 'RATE_UNSET'}
|
||||
}
|
||||
self.assertEqual(response, self.handler.handle_queries(query))
|
||||
|
||||
def test_response_when_rate_too_low(self):
|
||||
query = {
|
||||
'blob_data_payment_rate': -1.0,
|
||||
}
|
||||
response = {
|
||||
'blob_data_payment_rate': 'RATE_TOO_LOW',
|
||||
}
|
||||
self.assertEqual(response, self.handler.handle_queries(query))
|
||||
|
||||
def test_blob_unavailable_when_blob_not_validated(self):
|
||||
blob = mock.Mock()
|
||||
blob.get_is_verified.return_value = False
|
||||
self.blob_manager.get_blob.return_value = defer.succeed(blob)
|
||||
query = {
|
||||
'blob_data_payment_rate': 1.0,
|
||||
'requested_blob': 'blob'
|
||||
}
|
||||
response = {
|
||||
'blob_data_payment_rate': 'RATE_ACCEPTED',
|
||||
'incoming_blob': {'error': 'BLOB_UNAVAILABLE'}
|
||||
}
|
||||
self.assertEqual(response, self.handler.handle_queries(query))
|
||||
|
||||
def test_blob_unavailable_when_blob_cannot_be_opened(self):
|
||||
blob = mock.Mock()
|
||||
blob.get_is_verified.return_value = True
|
||||
blob.open_for_reading.return_value = None
|
||||
self.blob_manager.get_blob.return_value = defer.succeed(blob)
|
||||
query = {
|
||||
'blob_data_payment_rate': 0.0,
|
||||
'requested_blob': 'blob'
|
||||
}
|
||||
response = {
|
||||
'blob_data_payment_rate': 'RATE_ACCEPTED',
|
||||
'incoming_blob': {'error': 'BLOB_UNAVAILABLE'}
|
||||
}
|
||||
self.assertEqual(response, self.handler.handle_queries(query))
|
||||
|
||||
def test_blob_details_are_set_when_all_conditions_are_met(self):
|
||||
blob = mock.Mock()
|
||||
blob.get_is_verified.return_value = True
|
||||
blob.open_for_reading.return_value = True
|
||||
blob.blob_hash = 'DEADBEEF'
|
||||
blob.length = 42
|
||||
peer = mock.Mock()
|
||||
peer.host = "1.2.3.4"
|
||||
self.handler.peer = peer
|
||||
self.blob_manager.get_blob.return_value = defer.succeed(blob)
|
||||
query = {
|
||||
'blob_data_payment_rate': 1.0,
|
||||
'requested_blob': 'blob'
|
||||
}
|
||||
response = {
|
||||
'blob_data_payment_rate': 'RATE_ACCEPTED',
|
||||
'incoming_blob': {
|
||||
'blob_hash': 'DEADBEEF',
|
||||
'length': 42
|
||||
}
|
||||
}
|
||||
self.assertEqual(response, self.handler.handle_queries(query))
|
||||
|
||||
|
||||
class TestBlobRequestHandlerSender(unittest.TestCase):
|
||||
def test_nothing_happens_if_not_currently_uploading(self):
|
||||
from lbrynet.p2p.server import BlobRequestHandler
|
||||
handler = BlobRequestHandler.BlobRequestHandler(None, None, None, None)
|
||||
handler.currently_uploading = None
|
||||
deferred = handler.send_blob_if_requested(None)
|
||||
self.assertTrue(self.successResultOf(deferred))
|
||||
|
||||
def test_file_is_sent_to_consumer(self):
|
||||
# TODO: also check that the expected payment values are set
|
||||
consumer = proto_helpers.StringTransport()
|
||||
test_file = BytesIO(b'test')
|
||||
from lbrynet.p2p.server import BlobRequestHandler
|
||||
handler = BlobRequestHandler.BlobRequestHandler(None, None, None, None)
|
||||
handler.peer = mock.create_autospec(Peer.Peer)
|
||||
handler.currently_uploading = mock.Mock()
|
||||
handler.read_handle = test_file
|
||||
handler.send_blob_if_requested(consumer)
|
||||
while consumer.producer:
|
||||
consumer.producer.resumeProducing()
|
||||
self.assertEqual(consumer.value(), b'test')
|
|
@ -1,136 +0,0 @@
|
|||
import tempfile
|
||||
import shutil
|
||||
import os
|
||||
import random
|
||||
import string
|
||||
from twisted.trial import unittest
|
||||
from twisted.internet import defer
|
||||
|
||||
from tests.test_utils import random_lbry_hash
|
||||
from lbrynet.p2p.BlobManager import DiskBlobManager
|
||||
from lbrynet.extras.compat import f2d
|
||||
from lbrynet.extras.daemon.storage import SQLiteStorage
|
||||
from lbrynet.p2p.Peer import Peer
|
||||
from lbrynet.cryptoutils import get_lbry_hash_obj
|
||||
from lbrynet.conf import Config
|
||||
|
||||
|
||||
class BlobManagerTest(unittest.TestCase):
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def setUp(self):
|
||||
self.blob_dir = tempfile.mkdtemp()
|
||||
self.db_dir = tempfile.mkdtemp()
|
||||
self.bm = DiskBlobManager(self.blob_dir, SQLiteStorage(Config(data_dir=self.blob_dir), ':memory:'))
|
||||
self.peer = Peer('somehost', 22)
|
||||
yield f2d(self.bm.storage.open())
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def tearDown(self):
|
||||
yield f2d(self.bm.storage.close())
|
||||
shutil.rmtree(self.blob_dir)
|
||||
shutil.rmtree(self.db_dir)
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def _create_and_add_blob(self, should_announce=False):
|
||||
# create and add blob to blob manager
|
||||
data_len = random.randint(1, 1000)
|
||||
data = b''.join(random.choice(string.ascii_lowercase).encode() for _ in range(data_len))
|
||||
|
||||
hashobj = get_lbry_hash_obj()
|
||||
hashobj.update(data)
|
||||
out = hashobj.hexdigest()
|
||||
blob_hash = out
|
||||
|
||||
# create new blob
|
||||
yield f2d(self.bm.setup())
|
||||
blob = self.bm.get_blob(blob_hash, len(data))
|
||||
|
||||
writer, finished_d = blob.open_for_writing(self.peer)
|
||||
yield writer.write(data)
|
||||
yield self.bm.blob_completed(blob, should_announce)
|
||||
|
||||
# check to see if blob is there
|
||||
self.assertTrue(os.path.isfile(os.path.join(self.blob_dir, blob_hash)))
|
||||
blobs = yield self.bm.get_all_verified_blobs()
|
||||
self.assertIn(blob_hash, blobs)
|
||||
defer.returnValue(blob_hash)
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def test_create_blob(self):
|
||||
blob_hashes = []
|
||||
|
||||
# create a bunch of blobs
|
||||
for i in range(0, 10):
|
||||
blob_hash = yield self._create_and_add_blob()
|
||||
blob_hashes.append(blob_hash)
|
||||
blobs = yield self.bm.get_all_verified_blobs()
|
||||
self.assertEqual(10, len(blobs))
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def test_delete_blob(self):
|
||||
# create blob
|
||||
blob_hash = yield self._create_and_add_blob()
|
||||
blobs = yield self.bm.get_all_verified_blobs()
|
||||
self.assertEqual(len(blobs), 1)
|
||||
|
||||
# delete blob
|
||||
yield self.bm.delete_blobs([blob_hash])
|
||||
self.assertFalse(os.path.isfile(os.path.join(self.blob_dir, blob_hash)))
|
||||
blobs = yield self.bm.get_all_verified_blobs()
|
||||
self.assertEqual(len(blobs), 0)
|
||||
blobs = yield f2d(self.bm.storage.get_all_blob_hashes())
|
||||
self.assertEqual(len(blobs), 0)
|
||||
self.assertNotIn(blob_hash, self.bm.blobs)
|
||||
|
||||
# delete blob that was already deleted once
|
||||
yield self.bm.delete_blobs([blob_hash])
|
||||
|
||||
# delete blob that does not exist, nothing will
|
||||
# happen
|
||||
blob_hash = random_lbry_hash()
|
||||
yield self.bm.delete_blobs([blob_hash])
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def test_delete_open_blob(self):
|
||||
# Test that a blob that is opened for writing will not be deleted
|
||||
|
||||
# create blobs
|
||||
blob_hash = None
|
||||
blob_hashes = []
|
||||
for i in range(0, 10):
|
||||
blob_hash = yield self._create_and_add_blob()
|
||||
blob_hashes.append(blob_hash)
|
||||
blobs = yield self.bm.get_all_verified_blobs()
|
||||
self.assertEqual(len(blobs), 10)
|
||||
|
||||
# open the last blob
|
||||
blob = self.bm.get_blob(blob_hashes[-1])
|
||||
w, finished_d = blob.open_for_writing(self.peer)
|
||||
|
||||
# schedule a close, just to leave the reactor clean
|
||||
finished_d.addBoth(lambda x: None)
|
||||
self.addCleanup(w.close)
|
||||
|
||||
# delete the last blob and check if it still exists
|
||||
yield self.bm.delete_blobs([blob_hash])
|
||||
blobs = yield self.bm.get_all_verified_blobs()
|
||||
self.assertEqual(len(blobs), 10)
|
||||
self.assertIn(blob_hashes[-1], blobs)
|
||||
self.assertTrue(os.path.isfile(os.path.join(self.blob_dir, blob_hashes[-1])))
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def test_should_announce(self):
|
||||
# create blob with should announce
|
||||
blob_hash = yield self._create_and_add_blob(should_announce=True)
|
||||
out = yield self.bm.get_should_announce(blob_hash)
|
||||
self.assertTrue(out)
|
||||
count = yield self.bm.count_should_announce_blobs()
|
||||
self.assertEqual(1, count)
|
||||
|
||||
# set should announce to False
|
||||
yield self.bm.set_should_announce(blob_hash, should_announce=False)
|
||||
out = yield self.bm.get_should_announce(blob_hash)
|
||||
self.assertFalse(out)
|
||||
count = yield self.bm.count_should_announce_blobs()
|
||||
self.assertEqual(0, count)
|
|
@ -1,97 +0,0 @@
|
|||
from unittest.mock import MagicMock
|
||||
|
||||
from twisted.trial import unittest
|
||||
from twisted.internet import defer
|
||||
|
||||
from lbrynet.blob.blob_file import BlobFile
|
||||
from lbrynet.p2p.HTTPBlobDownloader import HTTPBlobDownloader
|
||||
from tests.test_utils import mk_db_and_blob_dir, rm_db_and_blob_dir
|
||||
|
||||
|
||||
class HTTPBlobDownloaderTest(unittest.TestCase):
|
||||
def setUp(self):
|
||||
self.db_dir, self.blob_dir = mk_db_and_blob_dir()
|
||||
self.blob_manager = MagicMock()
|
||||
self.client = MagicMock()
|
||||
self.blob_hash = ('d17272b17a1ad61c4316ac13a651c2b0952063214a81333e'
|
||||
'838364b01b2f07edbd165bb7ec60d2fb2f337a2c02923852')
|
||||
self.blob = BlobFile(self.blob_dir, self.blob_hash)
|
||||
self.blob_manager.get_blob.side_effect = lambda _: self.blob
|
||||
self.response = MagicMock(code=200, length=400)
|
||||
self.client.get.side_effect = lambda uri: defer.succeed(self.response)
|
||||
self.downloader = HTTPBlobDownloader(
|
||||
self.blob_manager, [self.blob_hash], [('server1', 80)], self.client, retry=False
|
||||
)
|
||||
self.downloader.interval = 0
|
||||
|
||||
def tearDown(self):
|
||||
self.downloader.stop()
|
||||
rm_db_and_blob_dir(self.db_dir, self.blob_dir)
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def test_download_successful(self):
|
||||
self.client.collect.side_effect = collect
|
||||
yield self.downloader.start()
|
||||
self.blob_manager.get_blob.assert_called_with(self.blob_hash)
|
||||
self.client.get.assert_called_with(f'http://server1:80/{self.blob_hash}')
|
||||
self.client.collect.assert_called()
|
||||
self.assertEqual(self.blob.get_length(), self.response.length)
|
||||
self.assertTrue(self.blob.get_is_verified())
|
||||
self.assertEqual(self.blob.writers, {})
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def test_download_invalid_content(self):
|
||||
self.client.collect.side_effect = bad_collect
|
||||
yield self.downloader.start()
|
||||
self.assertEqual(self.blob.get_length(), self.response.length)
|
||||
self.assertFalse(self.blob.get_is_verified())
|
||||
self.assertEqual(self.blob.writers, {})
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def test_peer_finished_first_causing_a_write_on_closed_handle(self):
|
||||
self.client.collect.side_effect = lambda response, write: defer.fail(IOError('I/O operation on closed file'))
|
||||
yield self.downloader.start()
|
||||
self.blob_manager.get_blob.assert_called_with(self.blob_hash)
|
||||
self.client.get.assert_called_with(f'http://server1:80/{self.blob_hash}')
|
||||
self.client.collect.assert_called()
|
||||
self.assertEqual(self.blob.get_length(), self.response.length)
|
||||
self.assertEqual(self.blob.writers, {})
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def test_download_transfer_failed(self):
|
||||
self.client.collect.side_effect = lambda response, write: defer.fail(Exception())
|
||||
yield self.downloader.start()
|
||||
self.assertEqual(len(self.client.collect.mock_calls), self.downloader.max_failures)
|
||||
self.blob_manager.get_blob.assert_called_with(self.blob_hash)
|
||||
self.assertEqual(self.blob.get_length(), self.response.length)
|
||||
self.assertFalse(self.blob.get_is_verified())
|
||||
self.assertEqual(self.blob.writers, {})
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def test_blob_not_found(self):
|
||||
self.response.code = 404
|
||||
yield self.downloader.start()
|
||||
self.blob_manager.get_blob.assert_called_with(self.blob_hash)
|
||||
self.client.get.assert_called_with(f'http://server1:80/{self.blob_hash}')
|
||||
self.client.collect.assert_not_called()
|
||||
self.assertFalse(self.blob.get_is_verified())
|
||||
self.assertEqual(self.blob.writers, {})
|
||||
|
||||
def test_stop(self):
|
||||
self.client.collect.side_effect = lambda response, write: defer.Deferred()
|
||||
self.downloader.start() # hangs if yielded, as intended, to simulate a long ongoing write while we call stop
|
||||
self.downloader.stop()
|
||||
self.blob_manager.get_blob.assert_called_with(self.blob_hash)
|
||||
self.client.get.assert_called_with(f'http://server1:80/{self.blob_hash}')
|
||||
self.client.collect.assert_called()
|
||||
self.assertEqual(self.blob.get_length(), self.response.length)
|
||||
self.assertFalse(self.blob.get_is_verified())
|
||||
self.assertEqual(self.blob.writers, {})
|
||||
|
||||
|
||||
def collect(response, write):
|
||||
write(b'f' * response.length)
|
||||
|
||||
|
||||
def bad_collect(response, write):
|
||||
write('0' * response.length)
|
|
@ -1,162 +0,0 @@
|
|||
from lbrynet.blob.blob_file import BlobFile
|
||||
from lbrynet.p2p.Error import DownloadCanceledError, InvalidDataError
|
||||
|
||||
from tests.test_utils import mk_db_and_blob_dir, rm_db_and_blob_dir, random_lbry_hash
|
||||
from twisted.trial import unittest
|
||||
from twisted.internet import defer
|
||||
|
||||
|
||||
class BlobFileTest(unittest.TestCase):
|
||||
def setUp(self):
|
||||
self.db_dir, self.blob_dir = mk_db_and_blob_dir()
|
||||
self.fake_content_len = 64
|
||||
self.fake_content = b'0'*self.fake_content_len
|
||||
self.fake_content_hash = '53871b26a08e90cb62142f2a39f0b80de41792322b0ca560' \
|
||||
'2b6eb7b5cf067c49498a7492bb9364bbf90f40c1c5412105'
|
||||
|
||||
def tearDown(self):
|
||||
rm_db_and_blob_dir(self.db_dir, self.blob_dir)
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def test_good_write_and_read(self):
|
||||
# test a write that should succeed
|
||||
blob_file = BlobFile(self.blob_dir, self.fake_content_hash, self.fake_content_len)
|
||||
self.assertFalse(blob_file.verified)
|
||||
|
||||
writer, finished_d = blob_file.open_for_writing(peer=1)
|
||||
writer.write(self.fake_content)
|
||||
writer.close()
|
||||
out = yield finished_d
|
||||
self.assertIsInstance(out, BlobFile)
|
||||
self.assertTrue(out.verified)
|
||||
self.assertEqual(self.fake_content_len, out.get_length())
|
||||
|
||||
# read from the instance used to write to, and verify content
|
||||
f = blob_file.open_for_reading()
|
||||
c = f.read()
|
||||
self.assertEqual(c, self.fake_content)
|
||||
self.assertFalse(out.is_downloading())
|
||||
|
||||
# read from newly declared instance, and verify content
|
||||
del blob_file
|
||||
blob_file = BlobFile(self.blob_dir, self.fake_content_hash, self.fake_content_len)
|
||||
self.assertTrue(blob_file.verified)
|
||||
f = blob_file.open_for_reading()
|
||||
self.assertEqual(1, blob_file.readers)
|
||||
c = f.read()
|
||||
self.assertEqual(c, self.fake_content)
|
||||
|
||||
# close reader
|
||||
f.close()
|
||||
self.assertEqual(0, blob_file.readers)
|
||||
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def test_delete(self):
|
||||
blob_file = BlobFile(self.blob_dir, self.fake_content_hash, self.fake_content_len)
|
||||
writer, finished_d = blob_file.open_for_writing(peer=1)
|
||||
writer.write(self.fake_content)
|
||||
out = yield finished_d
|
||||
out = yield blob_file.delete()
|
||||
|
||||
blob_file = BlobFile(self.blob_dir, self.fake_content_hash)
|
||||
self.assertFalse(blob_file.verified)
|
||||
|
||||
def test_delete_fail(self):
|
||||
# deletes should fail if being written to
|
||||
blob_file = BlobFile(self.blob_dir, self.fake_content_hash, self.fake_content_len)
|
||||
writer, finished_d = blob_file.open_for_writing(peer=1)
|
||||
with self.assertRaises(ValueError):
|
||||
blob_file.delete()
|
||||
writer.write(self.fake_content)
|
||||
writer.close()
|
||||
|
||||
# deletes should fail if being read and not closed
|
||||
blob_file = BlobFile(self.blob_dir, self.fake_content_hash, self.fake_content_len)
|
||||
self.assertTrue(blob_file.verified)
|
||||
r = blob_file.open_for_reading() # must be set to variable otherwise it gets garbage collected
|
||||
with self.assertRaises(ValueError):
|
||||
blob_file.delete()
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def test_too_much_write(self):
|
||||
# writing too much data should result in failure
|
||||
expected_length = 16
|
||||
content = b'0'*32
|
||||
blob_hash = random_lbry_hash()
|
||||
blob_file = BlobFile(self.blob_dir, blob_hash, expected_length)
|
||||
writer, finished_d = blob_file.open_for_writing(peer=1)
|
||||
writer.write(content)
|
||||
out = yield self.assertFailure(finished_d, InvalidDataError)
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def test_bad_hash(self):
|
||||
# test a write that should fail because its content's hash
|
||||
# does not equal the blob_hash
|
||||
length = 64
|
||||
content = b'0'*length
|
||||
blob_hash = random_lbry_hash()
|
||||
blob_file = BlobFile(self.blob_dir, blob_hash, length)
|
||||
writer, finished_d = blob_file.open_for_writing(peer=1)
|
||||
writer.write(content)
|
||||
yield self.assertFailure(finished_d, InvalidDataError)
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def test_close_on_incomplete_write(self):
|
||||
# write all but 1 byte of data,
|
||||
blob_file = BlobFile(self.blob_dir, self.fake_content_hash, self.fake_content_len)
|
||||
writer, finished_d = blob_file.open_for_writing(peer=1)
|
||||
writer.write(self.fake_content[:self.fake_content_len-1])
|
||||
writer.close()
|
||||
yield self.assertFailure(finished_d, DownloadCanceledError)
|
||||
|
||||
# writes after close will throw a IOError exception
|
||||
with self.assertRaises(IOError):
|
||||
writer.write(self.fake_content)
|
||||
|
||||
# another call to close will do nothing
|
||||
writer.close()
|
||||
|
||||
# file should not exist, since we did not finish write
|
||||
blob_file_2 = BlobFile(self.blob_dir, self.fake_content_hash, self.fake_content_len)
|
||||
out = blob_file_2.open_for_reading()
|
||||
self.assertIsNone(out)
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def test_multiple_writers(self):
|
||||
# start first writer and write half way, and then start second writer and write everything
|
||||
blob_hash = self.fake_content_hash
|
||||
blob_file = BlobFile(self.blob_dir, blob_hash, self.fake_content_len)
|
||||
writer_1, finished_d_1 = blob_file.open_for_writing(peer=1)
|
||||
writer_1.write(self.fake_content[:self.fake_content_len//2])
|
||||
|
||||
writer_2, finished_d_2 = blob_file.open_for_writing(peer=2)
|
||||
writer_2.write(self.fake_content)
|
||||
out_2 = yield finished_d_2
|
||||
out_1 = yield self.assertFailure(finished_d_1, DownloadCanceledError)
|
||||
|
||||
self.assertIsInstance(out_2, BlobFile)
|
||||
self.assertTrue(out_2.verified)
|
||||
self.assertEqual(self.fake_content_len, out_2.get_length())
|
||||
|
||||
f = blob_file.open_for_reading()
|
||||
c = f.read()
|
||||
self.assertEqual(self.fake_content_len, len(c))
|
||||
self.assertEqual(bytearray(c), self.fake_content)
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def test_multiple_writers_save_at_same_time(self):
|
||||
blob_hash = self.fake_content_hash
|
||||
blob_file = BlobFile(self.blob_dir, blob_hash, self.fake_content_len)
|
||||
writer_1, finished_d_1 = blob_file.open_for_writing(peer=1)
|
||||
writer_2, finished_d_2 = blob_file.open_for_writing(peer=2)
|
||||
|
||||
blob_file.save_verified_blob(writer_1)
|
||||
# second write should fail to save
|
||||
yield self.assertFailure(blob_file.save_verified_blob(writer_2), DownloadCanceledError)
|
||||
|
||||
# schedule a close, just to leave the reactor clean
|
||||
finished_d_1.addBoth(lambda x:None)
|
||||
finished_d_2.addBoth(lambda x:None)
|
||||
self.addCleanup(writer_1.close)
|
||||
self.addCleanup(writer_2.close)
|
|
@ -1,151 +0,0 @@
|
|||
import itertools
|
||||
import random
|
||||
from unittest import mock
|
||||
|
||||
from twisted.trial import unittest
|
||||
|
||||
from lbrynet.p2p.PaymentRateManager import NegotiatedPaymentRateManager, BasePaymentRateManager
|
||||
from lbrynet.p2p.Strategy import BasicAvailabilityWeightedStrategy
|
||||
from lbrynet.p2p.Offer import Offer
|
||||
from lbrynet.conf import Config
|
||||
from tests.mocks import BlobAvailabilityTracker as DummyBlobAvailabilityTracker
|
||||
|
||||
MAX_NEGOTIATION_TURNS = 10
|
||||
random.seed(12345)
|
||||
|
||||
|
||||
def get_random_sample(list_to_sample):
|
||||
result = list_to_sample[
|
||||
random.randint(1, len(list_to_sample)):random.randint(1, len(list_to_sample))]
|
||||
if not result:
|
||||
return get_random_sample(list_to_sample)
|
||||
return result
|
||||
|
||||
|
||||
def calculate_negotation_turns(client_base, host_base, host_is_generous=True,
|
||||
client_is_generous=True):
|
||||
blobs = [
|
||||
'b2e48bb4c88cf46b76adf0d47a72389fae0cd1f19ed27dc5'
|
||||
'09138c99509a25423a4cef788d571dca7988e1dca69e6fa0',
|
||||
'd7c82e6cac093b3f16107d2ae2b2c75424f1fcad2c7fbdbe'
|
||||
'66e4a13c0b6bd27b67b3a29c403b82279ab0f7c1c48d6787',
|
||||
'5a450b416275da4bdff604ee7b58eaedc7913c5005b7184f'
|
||||
'c3bc5ef0b1add00613587f54217c91097fc039ed9eace9dd',
|
||||
'f99d24cd50d4bfd77c2598bfbeeb8415bf0feef21200bdf0'
|
||||
'b8fbbde7751a77b7a2c68e09c25465a2f40fba8eecb0b4e0',
|
||||
'9dbda74a472a2e5861a5d18197aeba0f5de67c67e401124c'
|
||||
'243d2f0f41edf01d7a26aeb0b5fc9bf47f6361e0f0968e2c',
|
||||
'91dc64cf1ff42e20d627b033ad5e4c3a4a96856ed8a6e3fb'
|
||||
'4cd5fa1cfba4bf72eefd325f579db92f45f4355550ace8e7',
|
||||
'6d8017aba362e5c5d0046625a039513419810a0397d72831'
|
||||
'8c328a5cc5d96efb589fbca0728e54fe5adbf87e9545ee07',
|
||||
'6af95cd062b4a179576997ef1054c9d2120f8592eea045e9'
|
||||
'667bea411d520262cd5a47b137eabb7a7871f5f8a79c92dd',
|
||||
'8c70d5e2f5c3a6085006198e5192d157a125d92e73787944'
|
||||
'72007a61947992768926513fc10924785bdb1761df3c37e6',
|
||||
'c84aa1fd8f5009f7c4e71e444e40d95610abc1480834f835'
|
||||
'eefb267287aeb10025880a3ce22580db8c6d92efb5bc0c9c'
|
||||
]
|
||||
|
||||
host = mock.Mock()
|
||||
host.host = "1.2.3.4"
|
||||
client = mock.Mock()
|
||||
client.host = "1.2.3.5"
|
||||
|
||||
conf = Config()
|
||||
|
||||
client_base_prm = BasePaymentRateManager(client_base, conf.min_info_rate)
|
||||
client_prm = NegotiatedPaymentRateManager(client_base_prm,
|
||||
DummyBlobAvailabilityTracker(),
|
||||
client_is_generous)
|
||||
host_base_prm = BasePaymentRateManager(host_base, conf.min_info_rate)
|
||||
host_prm = NegotiatedPaymentRateManager(host_base_prm,
|
||||
DummyBlobAvailabilityTracker(),
|
||||
host_is_generous)
|
||||
blobs_to_query = get_random_sample(blobs)
|
||||
accepted = False
|
||||
turns = 0
|
||||
while not accepted:
|
||||
rate = client_prm.get_rate_blob_data(host, blobs_to_query)
|
||||
offer = Offer(rate)
|
||||
accepted = host_prm.accept_rate_blob_data(client, blobs_to_query, offer)
|
||||
turns += 1
|
||||
return turns
|
||||
|
||||
|
||||
class AvailabilityWeightedStrategyTests(unittest.TestCase):
|
||||
|
||||
def test_first_offer_is_zero_and_second_is_not_if_offer_not_accepted(self):
|
||||
conf = Config()
|
||||
strategy = BasicAvailabilityWeightedStrategy(
|
||||
DummyBlobAvailabilityTracker(), conf.data_rate, conf.is_generous_host
|
||||
)
|
||||
peer = "1.1.1.1"
|
||||
|
||||
blobs = strategy.price_model.blob_tracker.availability.keys()
|
||||
offer1 = strategy.make_offer(peer, blobs)
|
||||
|
||||
offer2 = strategy.make_offer(peer, blobs)
|
||||
|
||||
self.assertEqual(offer1.rate, 0.0)
|
||||
self.assertNotEqual(offer2.rate, 0.0)
|
||||
|
||||
def test_accept_zero_and_persist_if_accepted(self):
|
||||
conf = Config()
|
||||
host_strategy = BasicAvailabilityWeightedStrategy(
|
||||
DummyBlobAvailabilityTracker(), conf.data_rate, conf.is_generous_host
|
||||
)
|
||||
client_strategy = BasicAvailabilityWeightedStrategy(
|
||||
DummyBlobAvailabilityTracker(), conf.data_rate, conf.is_generous_host
|
||||
)
|
||||
|
||||
client = "1.1.1.1"
|
||||
host = "1.1.1.2"
|
||||
blobs = host_strategy.price_model.blob_tracker.availability.keys()
|
||||
|
||||
offer = client_strategy.make_offer(host, blobs)
|
||||
response1 = host_strategy.respond_to_offer(offer, client, blobs)
|
||||
client_strategy.update_accepted_offers(host, response1)
|
||||
|
||||
offer = client_strategy.make_offer(host, blobs)
|
||||
response2 = host_strategy.respond_to_offer(offer, client, blobs)
|
||||
client_strategy.update_accepted_offers(host, response2)
|
||||
|
||||
self.assertFalse(response1.is_too_low)
|
||||
self.assertTrue(response1.is_accepted)
|
||||
self.assertEqual(response1.rate, 0.0)
|
||||
|
||||
self.assertFalse(response2.is_too_low)
|
||||
self.assertTrue(response2.is_accepted)
|
||||
self.assertEqual(response2.rate, 0.0)
|
||||
|
||||
def test_how_many_turns_before_accept_with_similar_rate_settings(self):
|
||||
base_rates = [0.0001 * n for n in range(1, 10)]
|
||||
for host_base, client_base in itertools.product(base_rates, base_rates):
|
||||
turns = calculate_negotation_turns(host_base,
|
||||
client_base,
|
||||
client_is_generous=False,
|
||||
host_is_generous=False)
|
||||
self.assertGreater(MAX_NEGOTIATION_TURNS, turns)
|
||||
|
||||
def test_generous_connects_in_one_turn(self):
|
||||
base_rates = [0.0001 * n for n in range(1, 10)]
|
||||
for host_base, client_base in itertools.product(base_rates, base_rates):
|
||||
turns = calculate_negotation_turns(host_base, client_base)
|
||||
self.assertEqual(1, turns)
|
||||
|
||||
def test_how_many_turns_with_generous_client(self):
|
||||
base_rates = [0.0001 * n for n in range(1, 10)]
|
||||
for host_base, client_base in itertools.product(base_rates, base_rates):
|
||||
turns = calculate_negotation_turns(host_base,
|
||||
client_base,
|
||||
host_is_generous=False)
|
||||
self.assertGreater(MAX_NEGOTIATION_TURNS, turns)
|
||||
|
||||
def test_how_many_turns_with_generous_host(self):
|
||||
base_rates = [0.0001 * n for n in range(1, 10)]
|
||||
for host_base, client_base in itertools.product(base_rates, base_rates):
|
||||
turns = calculate_negotation_turns(host_base,
|
||||
client_base,
|
||||
client_is_generous=False)
|
||||
self.assertGreater(MAX_NEGOTIATION_TURNS, turns)
|
|
@ -1,241 +0,0 @@
|
|||
# pylint: skip-file
|
||||
import tempfile
|
||||
|
||||
from decimal import Decimal
|
||||
from twisted.trial import unittest
|
||||
from twisted.internet import defer
|
||||
from lbrynet.p2p.Error import InsufficientFundsError
|
||||
from lbrynet.schema.claim import ClaimDict
|
||||
|
||||
test_metadata = {
|
||||
'license': 'NASA',
|
||||
'version': '_0_1_0',
|
||||
'description': 'test',
|
||||
'language': 'en',
|
||||
'author': 'test',
|
||||
'title': 'test',
|
||||
'nsfw': False,
|
||||
'thumbnail': 'test'
|
||||
}
|
||||
|
||||
test_claim_dict = {
|
||||
'version': '_0_0_1',
|
||||
'claimType': 'streamType',
|
||||
'stream': {'metadata': test_metadata, 'version': '_0_0_1', 'source':
|
||||
{'source': '8655f713819344980a9a0d67b198344e2c462c90f813e86f'
|
||||
'0c63789ab0868031f25c54d0bb31af6658e997e2041806eb',
|
||||
'sourceType': 'lbry_sd_hash', 'contentType': 'video/mp4', 'version': '_0_0_1'},
|
||||
}}
|
||||
|
||||
|
||||
#class MocLbryumWallet(LBRYumWallet):
|
||||
# def __init__(self, db_dir, max_usable_balance=3):
|
||||
# LBRYumWallet.__init__(self, SQLiteStorage(db_dir), SimpleConfig(
|
||||
# {"lbryum_path": db_dir, "wallet_path": os.path.join(db_dir, "testwallet")}
|
||||
# ))
|
||||
# self.db_dir = db_dir
|
||||
# self.wallet_balance = Decimal(10.0)
|
||||
# self.total_reserved_points = Decimal(0.0)
|
||||
# self.queued_payments = defaultdict(Decimal)
|
||||
# self.network = FakeNetwork()
|
||||
# self._mock_max_usable_balance = max_usable_balance
|
||||
# assert self.config.get_wallet_path() == os.path.join(self.db_dir, "testwallet")
|
||||
#
|
||||
# @defer.inlineCallbacks
|
||||
# def setup(self, password=None, seed=None):
|
||||
# yield self.storage.setup()
|
||||
# seed = seed or "travel nowhere air position hill peace suffer parent beautiful rise " \
|
||||
# "blood power home crumble teach"
|
||||
# storage = lbryum.wallet.WalletStorage(self.config.get_wallet_path())
|
||||
# self.wallet = lbryum.wallet.NewWallet(storage)
|
||||
# self.wallet.add_seed(seed, password)
|
||||
# self.wallet.create_master_keys(password)
|
||||
# self.wallet.create_main_account()
|
||||
#
|
||||
# @defer.inlineCallbacks
|
||||
# def stop(self):
|
||||
# yield self.storage.stop()
|
||||
# yield threads.deferToThread(shutil.rmtree, self.db_dir)
|
||||
#
|
||||
# def get_least_used_address(self, account=None, for_change=False, max_count=100):
|
||||
# return defer.succeed(None)
|
||||
#
|
||||
# def get_name_claims(self):
|
||||
# return threads.deferToThread(lambda: [])
|
||||
#
|
||||
# def _save_name_metadata(self, name, claim_outpoint, sd_hash):
|
||||
# return defer.succeed(True)
|
||||
#
|
||||
# def get_max_usable_balance_for_claim(self, name):
|
||||
# # The amount is returned on the basis of test_point_reservation_and_claim unittest
|
||||
# # Also affects test_successful_send_name_claim
|
||||
# return defer.succeed(self._mock_max_usable_balance)
|
||||
|
||||
|
||||
class WalletTest(unittest.TestCase):
|
||||
skip = "Needs to be ported to the new wallet."
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def setUp(self):
|
||||
user_dir = tempfile.mkdtemp()
|
||||
self.wallet = MocLbryumWallet(user_dir)
|
||||
yield self.wallet.setup()
|
||||
self.assertEqual(self.wallet.get_balance(), Decimal(10))
|
||||
|
||||
def tearDown(self):
|
||||
return self.wallet.stop()
|
||||
|
||||
def test_failed_send_name_claim(self):
|
||||
def not_enough_funds_send_name_claim(self, name, val, amount):
|
||||
claim_out = {'success': False, 'reason': 'Not enough funds'}
|
||||
return claim_out
|
||||
|
||||
self.wallet._send_name_claim = not_enough_funds_send_name_claim
|
||||
d = self.wallet.claim_name('test', 1, test_claim_dict)
|
||||
self.assertFailure(d, Exception)
|
||||
return d
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def test_successful_send_name_claim(self):
|
||||
expected_claim_out = {
|
||||
"claim_id": "f43dc06256a69988bdbea09a58c80493ba15dcfa",
|
||||
"fee": "0.00012",
|
||||
"nout": 0,
|
||||
"success": True,
|
||||
"txid": "6f8180002ef4d21f5b09ca7d9648a54d213c666daf8639dc283e2fd47450269e",
|
||||
"value": ClaimDict.load_dict(test_claim_dict).serialized.encode('hex'),
|
||||
"claim_address": "",
|
||||
"channel_claim_id": "",
|
||||
"channel_name": ""
|
||||
}
|
||||
|
||||
def success_send_name_claim(self, name, val, amount, certificate_id=None,
|
||||
claim_address=None, change_address=None):
|
||||
return defer.succeed(expected_claim_out)
|
||||
|
||||
self.wallet._send_name_claim = success_send_name_claim
|
||||
claim_out = yield self.wallet.claim_name('test', 1, test_claim_dict)
|
||||
self.assertNotIn('success', claim_out)
|
||||
self.assertEqual(expected_claim_out['claim_id'], claim_out['claim_id'])
|
||||
self.assertEqual(expected_claim_out['fee'], claim_out['fee'])
|
||||
self.assertEqual(expected_claim_out['nout'], claim_out['nout'])
|
||||
self.assertEqual(expected_claim_out['txid'], claim_out['txid'])
|
||||
self.assertEqual(expected_claim_out['value'], claim_out['value'])
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def test_failed_support(self):
|
||||
# wallet.support_claim will check the balance before calling _support_claim
|
||||
try:
|
||||
yield self.wallet.support_claim('test', "f43dc06256a69988bdbea09a58c80493ba15dcfa", 1000)
|
||||
except InsufficientFundsError:
|
||||
pass
|
||||
|
||||
def test_succesful_support(self):
|
||||
expected_support_out = {
|
||||
"fee": "0.000129",
|
||||
"nout": 0,
|
||||
"success": True,
|
||||
"txid": "11030a76521e5f552ca87ad70765d0cc52e6ea4c0dc0063335e6cf2a9a85085f"
|
||||
}
|
||||
|
||||
expected_result = {
|
||||
"fee": 0.000129,
|
||||
"nout": 0,
|
||||
"txid": "11030a76521e5f552ca87ad70765d0cc52e6ea4c0dc0063335e6cf2a9a85085f"
|
||||
}
|
||||
|
||||
def check_out(claim_out):
|
||||
self.assertDictEqual(expected_result, claim_out)
|
||||
|
||||
def success_support_claim(name, val, amount):
|
||||
return defer.succeed(expected_support_out)
|
||||
|
||||
self.wallet._support_claim = success_support_claim
|
||||
d = self.wallet.support_claim('test', "f43dc06256a69988bdbea09a58c80493ba15dcfa", 1)
|
||||
d.addCallback(lambda claim_out: check_out(claim_out))
|
||||
return d
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def test_failed_abandon(self):
|
||||
try:
|
||||
yield self.wallet.abandon_claim("f43dc06256a69988bdbea09a58c80493ba15dcfa", None, None)
|
||||
raise Exception("test failed")
|
||||
except Exception as err:
|
||||
self.assertSubstring("claim not found", err.message)
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def test_successful_abandon(self):
|
||||
expected_abandon_out = {
|
||||
"fee": "0.000096",
|
||||
"success": True,
|
||||
"txid": "0578c161ad8d36a7580c557d7444f967ea7f988e194c20d0e3c42c3cabf110dd"
|
||||
}
|
||||
|
||||
expected_abandon_result = {
|
||||
"fee": 0.000096,
|
||||
"txid": "0578c161ad8d36a7580c557d7444f967ea7f988e194c20d0e3c42c3cabf110dd"
|
||||
}
|
||||
|
||||
def success_abandon_claim(claim_outpoint, txid, nout):
|
||||
return defer.succeed(expected_abandon_out)
|
||||
|
||||
self.wallet._abandon_claim = success_abandon_claim
|
||||
claim_out = yield self.wallet.abandon_claim("f43dc06256a69988bdbea09a58c80493ba15dcfa", None, None)
|
||||
self.assertDictEqual(expected_abandon_result, claim_out)
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def test_point_reservation_and_balance(self):
|
||||
# check that point reservations and cancellation changes the balance
|
||||
# properly
|
||||
def update_balance():
|
||||
return defer.succeed(5)
|
||||
|
||||
self.wallet._update_balance = update_balance
|
||||
yield self.wallet.update_balance()
|
||||
self.assertEqual(5, self.wallet.get_balance())
|
||||
|
||||
# test point reservation
|
||||
yield self.wallet.reserve_points('testid', 2)
|
||||
self.assertEqual(3, self.wallet.get_balance())
|
||||
self.assertEqual(2, self.wallet.total_reserved_points)
|
||||
|
||||
# test reserved points cancellation
|
||||
yield self.wallet.cancel_point_reservation(ReservedPoints('testid', 2))
|
||||
self.assertEqual(5, self.wallet.get_balance())
|
||||
self.assertEqual(0, self.wallet.total_reserved_points)
|
||||
|
||||
# test point sending
|
||||
reserve_points = yield self.wallet.reserve_points('testid', 2)
|
||||
yield self.wallet.send_points_to_address(reserve_points, 1)
|
||||
self.assertEqual(3, self.wallet.get_balance())
|
||||
# test failed point reservation
|
||||
out = yield self.wallet.reserve_points('testid', 4)
|
||||
self.assertIsNone(out)
|
||||
|
||||
def test_point_reservation_and_claim(self):
|
||||
# check that claims take into consideration point reservations
|
||||
def update_balance():
|
||||
return defer.succeed(5)
|
||||
|
||||
self.wallet._update_balance = update_balance
|
||||
d = self.wallet.update_balance()
|
||||
d.addCallback(lambda _: self.assertEqual(5, self.wallet.get_balance()))
|
||||
d.addCallback(lambda _: self.wallet.reserve_points('testid', 2))
|
||||
d.addCallback(lambda _: self.wallet.claim_name('test', 4, test_claim_dict))
|
||||
self.assertFailure(d, InsufficientFundsError)
|
||||
return d
|
||||
|
||||
def test_point_reservation_and_support(self):
|
||||
# check that supports take into consideration point reservations
|
||||
def update_balance():
|
||||
return defer.succeed(5)
|
||||
|
||||
self.wallet._update_balance = update_balance
|
||||
d = self.wallet.update_balance()
|
||||
d.addCallback(lambda _: self.assertEqual(5, self.wallet.get_balance()))
|
||||
d.addCallback(lambda _: self.wallet.reserve_points('testid', 2))
|
||||
d.addCallback(lambda _: self.wallet.support_claim(
|
||||
'test', "f43dc06256a69988bdbea09a58c80493ba15dcfa", 4))
|
||||
self.assertFailure(d, InsufficientFundsError)
|
||||
return d
|
||||
|
|
@ -1,78 +0,0 @@
|
|||
from twisted.trial import unittest
|
||||
from twisted.internet import defer
|
||||
from lbrynet.blob import CryptBlob
|
||||
from lbrynet.blob.blob_file import MAX_BLOB_SIZE
|
||||
|
||||
from cryptography.hazmat.primitives.ciphers.algorithms import AES
|
||||
import random
|
||||
import string
|
||||
from io import BytesIO
|
||||
import os
|
||||
|
||||
AES_BLOCK_SIZE_BYTES = int(AES.block_size / 8)
|
||||
|
||||
class MocBlob:
|
||||
def __init__(self):
|
||||
self.data = b''
|
||||
|
||||
def read(self, write_func):
|
||||
data = self.data
|
||||
write_func(data)
|
||||
return defer.succeed(True)
|
||||
|
||||
def open_for_reading(self):
|
||||
return BytesIO(self.data)
|
||||
|
||||
def write(self, data):
|
||||
if not isinstance(data, bytes):
|
||||
data = data.encode()
|
||||
self.data += data
|
||||
|
||||
def close(self):
|
||||
return defer.succeed(True)
|
||||
|
||||
|
||||
def random_string(length):
|
||||
return ''.join(random.choice(string.ascii_lowercase) for i in range(length))
|
||||
|
||||
|
||||
class TestCryptBlob(unittest.TestCase):
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def _test_encrypt_decrypt(self, size_of_data):
|
||||
# max blob size is 2*2**20 -1 ( -1 due to required padding in the end )
|
||||
blob = MocBlob()
|
||||
blob_num = 0
|
||||
key = os.urandom(AES_BLOCK_SIZE_BYTES)
|
||||
iv = os.urandom(AES_BLOCK_SIZE_BYTES)
|
||||
maker = CryptBlob.CryptStreamBlobMaker(key, iv, blob_num, blob)
|
||||
write_size = size_of_data
|
||||
string_to_encrypt = random_string(size_of_data).encode()
|
||||
|
||||
# encrypt string
|
||||
done, num_bytes = maker.write(string_to_encrypt)
|
||||
yield maker.close()
|
||||
self.assertEqual(size_of_data, num_bytes)
|
||||
expected_encrypted_blob_size = int((size_of_data / AES_BLOCK_SIZE_BYTES) + 1) * AES_BLOCK_SIZE_BYTES
|
||||
self.assertEqual(expected_encrypted_blob_size, len(blob.data))
|
||||
|
||||
if size_of_data < MAX_BLOB_SIZE-1:
|
||||
self.assertFalse(done)
|
||||
else:
|
||||
self.assertTrue(done)
|
||||
self.data_buf = b''
|
||||
|
||||
def write_func(data):
|
||||
self.data_buf += data
|
||||
|
||||
# decrypt string
|
||||
decryptor = CryptBlob.StreamBlobDecryptor(blob, key, iv, size_of_data)
|
||||
yield decryptor.decrypt(write_func)
|
||||
self.assertEqual(self.data_buf, string_to_encrypt)
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def test_encrypt_decrypt(self):
|
||||
yield self._test_encrypt_decrypt(1)
|
||||
yield self._test_encrypt_decrypt(16*2)
|
||||
yield self._test_encrypt_decrypt(2000)
|
||||
yield self._test_encrypt_decrypt(2*2**20-1)
|
|
@ -1,115 +0,0 @@
|
|||
import json
|
||||
from twisted.trial import unittest
|
||||
from twisted.internet import defer
|
||||
|
||||
from cryptography.hazmat.primitives.ciphers.algorithms import AES
|
||||
from lbrynet.conf import Config
|
||||
from lbrynet.extras.compat import f2d
|
||||
from lbrynet.extras.daemon.PeerManager import PeerManager
|
||||
from lbrynet.p2p.StreamDescriptor import get_sd_info, BlobStreamDescriptorReader
|
||||
from lbrynet.p2p.StreamDescriptor import StreamDescriptorIdentifier
|
||||
from lbrynet.p2p.BlobManager import DiskBlobManager
|
||||
from lbrynet.p2p.RateLimiter import DummyRateLimiter
|
||||
from lbrynet.p2p.PaymentRateManager import OnlyFreePaymentsManager
|
||||
from lbrynet.extras.daemon.storage import SQLiteStorage
|
||||
from lbrynet.blob import EncryptedFileCreator
|
||||
from lbrynet.blob.EncryptedFileManager import EncryptedFileManager
|
||||
from lbrynet.p2p.StreamDescriptor import JSONBytesEncoder
|
||||
from tests import mocks
|
||||
from tests.test_utils import mk_db_and_blob_dir, rm_db_and_blob_dir
|
||||
|
||||
|
||||
FakeNode = mocks.Node
|
||||
FakeWallet = mocks.Wallet
|
||||
FakePeerFinder = mocks.PeerFinder
|
||||
FakeAnnouncer = mocks.Announcer
|
||||
GenFile = mocks.GenFile
|
||||
test_create_stream_sd_file = mocks.create_stream_sd_file
|
||||
DummyBlobAvailabilityTracker = mocks.BlobAvailabilityTracker
|
||||
|
||||
MB = 2**20
|
||||
|
||||
|
||||
def iv_generator():
|
||||
while True:
|
||||
yield b'3' * (AES.block_size // 8)
|
||||
|
||||
|
||||
class CreateEncryptedFileTest(unittest.TestCase):
|
||||
timeout = 5
|
||||
|
||||
def setUp(self):
|
||||
self.tmp_db_dir, self.tmp_blob_dir = mk_db_and_blob_dir()
|
||||
conf = Config(data_dir=self.tmp_blob_dir)
|
||||
self.wallet = FakeWallet()
|
||||
self.peer_manager = PeerManager()
|
||||
self.peer_finder = FakePeerFinder(5553, self.peer_manager, 2)
|
||||
self.rate_limiter = DummyRateLimiter()
|
||||
self.sd_identifier = StreamDescriptorIdentifier()
|
||||
self.storage = SQLiteStorage(conf, ':memory:')
|
||||
self.blob_manager = DiskBlobManager(self.tmp_blob_dir, self.storage)
|
||||
self.prm = OnlyFreePaymentsManager()
|
||||
self.lbry_file_manager = EncryptedFileManager(
|
||||
conf, self.peer_finder, self.rate_limiter, self.blob_manager,
|
||||
self.wallet, self.prm, self.storage, self.sd_identifier
|
||||
)
|
||||
d = f2d(self.storage.open())
|
||||
d.addCallback(lambda _: f2d(self.lbry_file_manager.setup()))
|
||||
return d
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def tearDown(self):
|
||||
yield self.lbry_file_manager.stop()
|
||||
yield f2d(self.blob_manager.stop())
|
||||
yield f2d(self.storage.close())
|
||||
rm_db_and_blob_dir(self.tmp_db_dir, self.tmp_blob_dir)
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def create_file(self, filename):
|
||||
handle = mocks.GenFile(3*MB, b'1')
|
||||
key = b'2' * (AES.block_size // 8)
|
||||
out = yield EncryptedFileCreator.create_lbry_file(
|
||||
self.blob_manager, self.storage, self.prm, self.lbry_file_manager, filename, handle, key, iv_generator()
|
||||
)
|
||||
defer.returnValue(out)
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def test_can_create_file(self):
|
||||
expected_stream_hash = "41e6b247d923d191b154fb6f1b8529d6ddd6a73d65c35" \
|
||||
"7b1acb742dd83151fb66393a7709e9f346260a4f4db6de10c25"
|
||||
expected_sd_hash = "40c485432daec586c1a2d247e6c08d137640a5af6e81f3f652" \
|
||||
"3e62e81a2e8945b0db7c94f1852e70e371d917b994352c"
|
||||
filename = 'test.file'
|
||||
lbry_file = yield self.create_file(filename)
|
||||
sd_hash = yield f2d(self.storage.get_sd_blob_hash_for_stream(lbry_file.stream_hash))
|
||||
|
||||
# read the sd blob file
|
||||
sd_blob = self.blob_manager.blobs[sd_hash]
|
||||
sd_reader = BlobStreamDescriptorReader(sd_blob)
|
||||
sd_file_info = yield sd_reader.get_info()
|
||||
|
||||
# this comes from the database, the blobs returned are sorted
|
||||
sd_info = yield f2d(get_sd_info(self.storage, lbry_file.stream_hash, include_blobs=True))
|
||||
self.maxDiff = None
|
||||
unicode_sd_info = json.loads(json.dumps(sd_info, sort_keys=True, cls=JSONBytesEncoder))
|
||||
self.assertDictEqual(unicode_sd_info, sd_file_info)
|
||||
self.assertEqual(sd_info['stream_hash'], expected_stream_hash)
|
||||
self.assertEqual(len(sd_info['blobs']), 3)
|
||||
self.assertNotEqual(sd_info['blobs'][0]['length'], 0)
|
||||
self.assertNotEqual(sd_info['blobs'][1]['length'], 0)
|
||||
self.assertEqual(sd_info['blobs'][2]['length'], 0)
|
||||
self.assertEqual(expected_stream_hash, lbry_file.stream_hash)
|
||||
self.assertEqual(sd_hash, lbry_file.sd_hash)
|
||||
self.assertEqual(sd_hash, expected_sd_hash)
|
||||
blobs = yield self.blob_manager.get_all_verified_blobs()
|
||||
self.assertEqual(3, len(blobs))
|
||||
num_should_announce_blobs = yield self.blob_manager.count_should_announce_blobs()
|
||||
self.assertEqual(2, num_should_announce_blobs)
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def test_can_create_file_with_unicode_filename(self):
|
||||
expected_stream_hash = ('d1da4258f3ce12edb91d7e8e160d091d3ab1432c2e55a6352dce0'
|
||||
'2fd5adb86fe144e93e110075b5865fff8617776c6c0')
|
||||
filename = '☃.file'
|
||||
lbry_file = yield self.create_file(filename)
|
||||
self.assertEqual(expected_stream_hash, lbry_file.stream_hash)
|
|
@ -1,185 +0,0 @@
|
|||
import types
|
||||
from unittest import mock
|
||||
from twisted.trial import unittest
|
||||
from twisted.internet import defer, task
|
||||
|
||||
|
||||
from lbrynet.p2p import PaymentRateManager
|
||||
from lbrynet.p2p.Error import DownloadDataTimeout, DownloadSDTimeout
|
||||
from lbrynet.p2p.StreamDescriptor import StreamDescriptorIdentifier
|
||||
from lbrynet.p2p.BlobManager import DiskBlobManager
|
||||
from lbrynet.p2p.RateLimiter import DummyRateLimiter
|
||||
from lbrynet.p2p.client.DownloadManager import DownloadManager
|
||||
from lbrynet.extras.daemon import Downloader
|
||||
from lbrynet.extras.daemon.exchange_rate_manager import ExchangeRateManager
|
||||
from lbrynet.extras.daemon.storage import SQLiteStorage
|
||||
from lbrynet.extras.daemon.PeerFinder import DummyPeerFinder
|
||||
from lbrynet.blob.EncryptedFileStatusReport import EncryptedFileStatusReport
|
||||
from lbrynet.blob.EncryptedFileDownloader import ManagedEncryptedFileDownloader
|
||||
from lbrynet.extras.wallet import LbryWalletManager
|
||||
from lbrynet.conf import Config
|
||||
|
||||
|
||||
class MocDownloader:
|
||||
def __init__(self):
|
||||
self.finish_deferred = defer.Deferred(None)
|
||||
self.stop_called = False
|
||||
self.file_name = 'test'
|
||||
self.name = 'test'
|
||||
self.num_completed = 0
|
||||
self.num_known = 1
|
||||
self.running_status = ManagedEncryptedFileDownloader.STATUS_RUNNING
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def status(self):
|
||||
out = yield EncryptedFileStatusReport(
|
||||
self.name, self.num_completed, self.num_known, self.running_status)
|
||||
defer.returnValue(out)
|
||||
|
||||
def start(self):
|
||||
return self.finish_deferred
|
||||
|
||||
def stop(self):
|
||||
self.stop_called = True
|
||||
self.finish_deferred.callback(True)
|
||||
|
||||
|
||||
def moc_initialize(self, stream_info):
|
||||
self.sd_hash = "d5169241150022f996fa7cd6a9a1c421937276a3275eb912" \
|
||||
"790bd07ba7aec1fac5fd45431d226b8fb402691e79aeb24b"
|
||||
return None
|
||||
|
||||
|
||||
def moc_download_sd_blob(self):
|
||||
return None
|
||||
|
||||
|
||||
def moc_download(self, sd_blob, name, txid, nout, key_fee, file_name):
|
||||
self.pay_key_fee(key_fee, name)
|
||||
self.downloader = MocDownloader()
|
||||
self.downloader.start()
|
||||
|
||||
|
||||
def moc_pay_key_fee(d):
|
||||
def _moc_pay_key_fee(self, key_fee, name):
|
||||
d.callback(True)
|
||||
return _moc_pay_key_fee
|
||||
|
||||
|
||||
class GetStreamTests(unittest.TestCase):
|
||||
|
||||
def init_getstream_with_mocs(self):
|
||||
conf = Config()
|
||||
|
||||
sd_identifier = mock.Mock(spec=StreamDescriptorIdentifier)
|
||||
wallet = mock.Mock(spec=LbryWalletManager)
|
||||
prm = mock.Mock(spec=PaymentRateManager.NegotiatedPaymentRateManager)
|
||||
exchange_rate_manager = mock.Mock(spec=ExchangeRateManager)
|
||||
storage = mock.Mock(spec=SQLiteStorage)
|
||||
peer_finder = DummyPeerFinder()
|
||||
blob_manager = mock.Mock(spec=DiskBlobManager)
|
||||
max_key_fee = {'currency': "LBC", 'amount': 10, 'address': ''}
|
||||
disable_max_key_fee = False
|
||||
data_rate = {'currency': "LBC", 'amount': 0, 'address': ''}
|
||||
getstream = Downloader.GetStream(
|
||||
conf, sd_identifier, wallet, exchange_rate_manager, blob_manager, peer_finder, DummyRateLimiter(), prm,
|
||||
storage, max_key_fee, disable_max_key_fee, timeout=3, data_rate=data_rate
|
||||
)
|
||||
getstream.download_manager = mock.Mock(spec=DownloadManager)
|
||||
return getstream
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def test_init_exception(self):
|
||||
"""
|
||||
test that if initialization would fail, by giving it invalid
|
||||
stream_info, that an exception is thrown
|
||||
"""
|
||||
|
||||
getstream = self.init_getstream_with_mocs()
|
||||
name = 'test'
|
||||
stream_info = None
|
||||
|
||||
with self.assertRaises(AttributeError):
|
||||
yield getstream.start(stream_info, name, "deadbeef" * 12, 0)
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def test_sd_blob_download_timeout(self):
|
||||
"""
|
||||
test that if download_sd_blob fails due to timeout,
|
||||
DownloadTimeoutError is raised
|
||||
"""
|
||||
def download_sd_blob(self):
|
||||
raise DownloadSDTimeout(self)
|
||||
|
||||
called_pay_fee = defer.Deferred()
|
||||
|
||||
getstream = self.init_getstream_with_mocs()
|
||||
getstream._initialize = types.MethodType(moc_initialize, getstream)
|
||||
getstream._download_sd_blob = types.MethodType(download_sd_blob, getstream)
|
||||
getstream._download = types.MethodType(moc_download, getstream)
|
||||
getstream.pay_key_fee = types.MethodType(moc_pay_key_fee(called_pay_fee), getstream)
|
||||
name = 'test'
|
||||
stream_info = None
|
||||
with self.assertRaises(DownloadSDTimeout):
|
||||
yield getstream.start(stream_info, name, "deadbeef" * 12, 0)
|
||||
self.assertFalse(called_pay_fee.called)
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def test_timeout(self):
|
||||
"""
|
||||
test that timeout (set to 3 here) exception is raised
|
||||
when download times out while downloading first blob, and key fee is paid
|
||||
"""
|
||||
called_pay_fee = defer.Deferred()
|
||||
|
||||
getstream = self.init_getstream_with_mocs()
|
||||
getstream._initialize = types.MethodType(moc_initialize, getstream)
|
||||
getstream._download_sd_blob = types.MethodType(moc_download_sd_blob, getstream)
|
||||
getstream._download = types.MethodType(moc_download, getstream)
|
||||
getstream.pay_key_fee = types.MethodType(moc_pay_key_fee(called_pay_fee), getstream)
|
||||
name = 'test'
|
||||
stream_info = None
|
||||
start = getstream.start(stream_info, name, "deadbeef" * 12, 0)
|
||||
with self.assertRaises(DownloadDataTimeout):
|
||||
yield start
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def test_finish_one_blob(self):
|
||||
"""
|
||||
test that if we have 1 completed blob, start() returns
|
||||
and key fee is paid
|
||||
"""
|
||||
called_pay_fee = defer.Deferred()
|
||||
|
||||
getstream = self.init_getstream_with_mocs()
|
||||
getstream._initialize = types.MethodType(moc_initialize, getstream)
|
||||
|
||||
getstream._download_sd_blob = types.MethodType(moc_download_sd_blob, getstream)
|
||||
getstream._download = types.MethodType(moc_download, getstream)
|
||||
getstream.pay_key_fee = types.MethodType(moc_pay_key_fee(called_pay_fee), getstream)
|
||||
name = 'test'
|
||||
stream_info = None
|
||||
self.assertFalse(getstream.wrote_data)
|
||||
getstream.data_downloading_deferred.callback(None)
|
||||
yield getstream.start(stream_info, name, "deadbeef" * 12, 0)
|
||||
self.assertTrue(getstream.wrote_data)
|
||||
|
||||
# self.assertTrue(getstream.pay_key_fee_called)
|
||||
|
||||
# @defer.inlineCallbacks
|
||||
# def test_finish_stopped_downloader(self):
|
||||
# """
|
||||
# test that if we have a stopped downloader, beforfe a blob is downloaded,
|
||||
# start() returns
|
||||
# """
|
||||
# getstream = self.init_getstream_with_mocs()
|
||||
# getstream._initialize = types.MethodType(moc_initialize, getstream)
|
||||
# getstream._download_sd_blob = types.MethodType(moc_download_sd_blob, getstream)
|
||||
# getstream._download = types.MethodType(moc_download, getstream)
|
||||
# name='test'
|
||||
# stream_info = None
|
||||
# start = getstream.start(stream_info,name)
|
||||
#
|
||||
# getstream.downloader.running_status = ManagedEncryptedFileDownloader.STATUS_STOPPED
|
||||
# self.clock.advance(1)
|
||||
# downloader, f_deferred = yield start
|
Loading…
Add table
Reference in a new issue