forked from LBRYCommunity/lbry-sdk
Merge pull request #200 from lbryio/cleanup-tests
Start to cleanup functional tests to improve readability
This commit is contained in:
commit
572f1d1f6e
6 changed files with 322 additions and 433 deletions
|
@ -1,10 +1,11 @@
|
|||
import shutil
|
||||
from multiprocessing import Process, Event, Queue
|
||||
import io
|
||||
import logging
|
||||
from multiprocessing import Process, Event, Queue
|
||||
import os
|
||||
import platform
|
||||
import shutil
|
||||
import sys
|
||||
import random
|
||||
import io
|
||||
import unittest
|
||||
|
||||
from Crypto.PublicKey import RSA
|
||||
|
@ -15,7 +16,8 @@ from lbrynet.conf import MIN_BLOB_DATA_PAYMENT_RATE
|
|||
from lbrynet.lbrylive.LiveStreamCreator import FileLiveStreamCreator
|
||||
from lbrynet.lbrylive.LiveStreamMetadataManager import DBLiveStreamMetadataManager
|
||||
from lbrynet.lbrylive.LiveStreamMetadataManager import TempLiveStreamMetadataManager
|
||||
from lbrynet.lbryfile.EncryptedFileMetadataManager import TempEncryptedFileMetadataManager, DBEncryptedFileMetadataManager
|
||||
from lbrynet.lbryfile.EncryptedFileMetadataManager import TempEncryptedFileMetadataManager
|
||||
from lbrynet.lbryfile.EncryptedFileMetadataManager import DBEncryptedFileMetadataManager
|
||||
from lbrynet.lbryfilemanager.EncryptedFileManager import EncryptedFileManager
|
||||
from lbrynet.core.PTCWallet import PointTraderKeyQueryHandlerFactory, PointTraderKeyExchanger
|
||||
from lbrynet.core.Session import Session
|
||||
|
@ -30,17 +32,26 @@ from lbrynet.lbryfile.StreamDescriptor import get_sd_info
|
|||
from twisted.internet import defer, threads, task
|
||||
from twisted.trial.unittest import TestCase
|
||||
from twisted.python.failure import Failure
|
||||
import os
|
||||
|
||||
from lbrynet.dht.node import Node
|
||||
from tests.mocks import DummyBlobAvailabilityTracker
|
||||
from lbrynet.core.PeerManager import PeerManager
|
||||
from lbrynet.core.RateLimiter import DummyRateLimiter, RateLimiter
|
||||
from lbrynet.core.server.BlobRequestHandler import BlobRequestHandlerFactory
|
||||
from lbrynet.core.server.ServerProtocol import ServerProtocolFactory
|
||||
|
||||
from lbrynet.lbrylive.server.LiveBlobInfoQueryHandler import CryptBlobInfoQueryHandlerFactory
|
||||
from lbrynet.lbrylive.client.LiveStreamOptions import add_live_stream_to_sd_identifier
|
||||
from lbrynet.lbrylive.client.LiveStreamDownloader import add_full_live_stream_downloader_to_sd_identifier
|
||||
|
||||
from tests import mocks
|
||||
|
||||
FakeNode = mocks.Node
|
||||
FakeWallet = mocks.Wallet
|
||||
FakePeerFinder = mocks.PeerFinder
|
||||
FakeAnnouncer = mocks.Announcer
|
||||
GenFile = mocks.GenFile
|
||||
test_create_stream_sd_file = mocks.create_stream_sd_file
|
||||
DummyBlobAvailabilityTracker = mocks.BlobAvailabilityTracker
|
||||
|
||||
log_format = "%(funcName)s(): %(message)s"
|
||||
logging.basicConfig(level=logging.WARNING, format=log_format)
|
||||
|
@ -54,162 +65,7 @@ def require_system(system):
|
|||
else:
|
||||
return unittest.skip("Skipping. Test can only be run on " + system)
|
||||
|
||||
|
||||
class FakeNode(object):
|
||||
def __init__(self, *args, **kwargs):
|
||||
pass
|
||||
|
||||
def joinNetwork(self, *args):
|
||||
pass
|
||||
|
||||
def stop(self):
|
||||
pass
|
||||
|
||||
|
||||
class FakeWallet(object):
|
||||
def __init__(self):
|
||||
self.private_key = RSA.generate(1024)
|
||||
self.encoded_public_key = self.private_key.publickey().exportKey()
|
||||
|
||||
def start(self):
|
||||
return defer.succeed(True)
|
||||
|
||||
def stop(self):
|
||||
return defer.succeed(True)
|
||||
|
||||
def get_info_exchanger(self):
|
||||
return PointTraderKeyExchanger(self)
|
||||
|
||||
def get_wallet_info_query_handler_factory(self):
|
||||
return PointTraderKeyQueryHandlerFactory(self)
|
||||
|
||||
def reserve_points(self, *args):
|
||||
return True
|
||||
|
||||
def cancel_point_reservation(self, *args):
|
||||
pass
|
||||
|
||||
def send_points(self, *args):
|
||||
return defer.succeed(True)
|
||||
|
||||
def add_expected_payment(self, *args):
|
||||
pass
|
||||
|
||||
def get_balance(self):
|
||||
return defer.succeed(1000)
|
||||
|
||||
def set_public_key_for_peer(self, peer, public_key):
|
||||
pass
|
||||
|
||||
def get_claim_metadata_for_sd_hash(self, sd_hash):
|
||||
return "fakeuri", "faketxid"
|
||||
|
||||
|
||||
class FakePeerFinder(object):
|
||||
def __init__(self, start_port, peer_manager, num_peers):
|
||||
self.start_port = start_port
|
||||
self.peer_manager = peer_manager
|
||||
self.num_peers = num_peers
|
||||
self.count = 0
|
||||
|
||||
def find_peers_for_blob(self, *args):
|
||||
peer_port = self.start_port + self.count
|
||||
self.count += 1
|
||||
if self.count >= self.num_peers:
|
||||
self.count = 0
|
||||
return defer.succeed([self.peer_manager.get_peer("127.0.0.1", peer_port)])
|
||||
|
||||
def run_manage_loop(self):
|
||||
pass
|
||||
|
||||
def stop(self):
|
||||
pass
|
||||
|
||||
|
||||
class FakeAnnouncer(object):
|
||||
|
||||
def __init__(self, *args):
|
||||
pass
|
||||
|
||||
def add_supplier(self, supplier):
|
||||
pass
|
||||
|
||||
def immediate_announce(self, *args):
|
||||
pass
|
||||
|
||||
def run_manage_loop(self):
|
||||
pass
|
||||
|
||||
def stop(self):
|
||||
pass
|
||||
|
||||
|
||||
class GenFile(io.RawIOBase):
|
||||
def __init__(self, size, pattern):
|
||||
io.RawIOBase.__init__(self)
|
||||
self.size = size
|
||||
self.pattern = pattern
|
||||
self.read_so_far = 0
|
||||
self.buff = b''
|
||||
self.last_offset = 0
|
||||
|
||||
def readable(self):
|
||||
return True
|
||||
|
||||
def writable(self):
|
||||
return False
|
||||
|
||||
def read(self, n=-1):
|
||||
if n > -1:
|
||||
bytes_to_read = min(n, self.size - self.read_so_far)
|
||||
else:
|
||||
bytes_to_read = self.size - self.read_so_far
|
||||
output, self.buff = self.buff[:bytes_to_read], self.buff[bytes_to_read:]
|
||||
bytes_to_read -= len(output)
|
||||
while bytes_to_read > 0:
|
||||
self.buff = self._generate_chunk()
|
||||
new_output, self.buff = self.buff[:bytes_to_read], self.buff[bytes_to_read:]
|
||||
bytes_to_read -= len(new_output)
|
||||
output += new_output
|
||||
self.read_so_far += len(output)
|
||||
return output
|
||||
|
||||
def readall(self):
|
||||
return self.read()
|
||||
|
||||
def _generate_chunk(self, n=2**10):
|
||||
output = self.pattern[self.last_offset:self.last_offset + n]
|
||||
n_left = n - len(output)
|
||||
whole_patterns = n_left / len(self.pattern)
|
||||
output += self.pattern * whole_patterns
|
||||
self.last_offset = n - len(output)
|
||||
output += self.pattern[:self.last_offset]
|
||||
return output
|
||||
|
||||
|
||||
test_create_stream_sd_file = {
|
||||
'stream_name': '746573745f66696c65',
|
||||
'blobs': [
|
||||
{'length': 2097152, 'blob_num': 0,
|
||||
'blob_hash':
|
||||
'dc4708f76a5e7af0f1cae0ee96b824e2ed9250c9346c093b441f0a20d3607c17948b6fcfb4bc62020fe5286693d08586',
|
||||
'iv': '30303030303030303030303030303031'},
|
||||
{'length': 2097152, 'blob_num': 1,
|
||||
'blob_hash':
|
||||
'f4067522c1b49432a2a679512e3917144317caa1abba0c041e0cd2cf9f635d4cf127ce1824fa04189b63916174951f70',
|
||||
'iv': '30303030303030303030303030303032'},
|
||||
{'length': 1015056, 'blob_num': 2,
|
||||
'blob_hash':
|
||||
'305486c434260484fcb2968ce0e963b72f81ba56c11b08b1af0789b55b44d78422600f9a38e3cf4f2e9569897e5646a9',
|
||||
'iv': '30303030303030303030303030303033'},
|
||||
{'length': 0, 'blob_num': 3, 'iv': '30303030303030303030303030303034'}],
|
||||
'stream_type': 'lbryfile',
|
||||
'key': '30313233343536373031323334353637',
|
||||
'suggested_file_name': '746573745f66696c65',
|
||||
'stream_hash': '6d27fbe10c86d81aacfb897c7a426d0a2214f5a299455a6d315c0f998c4b3545c2dc60906122d94653c23b1898229e3f'}
|
||||
|
||||
|
||||
def start_lbry_uploader(sd_hash_queue, kill_event, dead_event, file_size, ul_rate_limit=None, is_generous=False):
|
||||
def use_epoll_on_linux():
|
||||
if sys.platform.startswith("linux"):
|
||||
sys.modules = sys.modules.copy()
|
||||
del sys.modules['twisted.internet.reactor']
|
||||
|
@ -217,47 +73,63 @@ def start_lbry_uploader(sd_hash_queue, kill_event, dead_event, file_size, ul_rat
|
|||
twisted.internet.reactor = twisted.internet.epollreactor.EPollReactor()
|
||||
sys.modules['twisted.internet.reactor'] = twisted.internet.reactor
|
||||
|
||||
from twisted.internet import reactor
|
||||
|
||||
logging.debug("Starting the uploader")
|
||||
class LbryUploader(object):
|
||||
def __init__(self, sd_hash_queue, kill_event, dead_event,
|
||||
file_size, ul_rate_limit=None, is_generous=False):
|
||||
self.sd_hash_queue = sd_hash_queue
|
||||
self.kill_event = kill_event
|
||||
self.dead_event = dead_event
|
||||
self.file_size = file_size
|
||||
self.ul_rate_limit = ul_rate_limit
|
||||
self.is_generous = is_generous
|
||||
# these attributes get defined in `start`
|
||||
self.reactor = None
|
||||
self.sd_identifier = None
|
||||
self.session = None
|
||||
self.lbry_file_manager = None
|
||||
self.server_port = None
|
||||
self.kill_check = None
|
||||
|
||||
Random.atfork()
|
||||
def start(self):
|
||||
use_epoll_on_linux()
|
||||
from twisted.internet import reactor
|
||||
self.reactor = reactor
|
||||
logging.debug("Starting the uploader")
|
||||
Random.atfork()
|
||||
r = random.Random()
|
||||
r.seed("start_lbry_uploader")
|
||||
wallet = FakeWallet()
|
||||
peer_manager = PeerManager()
|
||||
peer_finder = FakePeerFinder(5553, peer_manager, 1)
|
||||
hash_announcer = FakeAnnouncer()
|
||||
rate_limiter = RateLimiter()
|
||||
self.sd_identifier = StreamDescriptorIdentifier()
|
||||
db_dir = "server"
|
||||
os.mkdir(db_dir)
|
||||
self.session = Session(
|
||||
MIN_BLOB_DATA_PAYMENT_RATE, db_dir=db_dir, lbryid="abcd",
|
||||
peer_finder=peer_finder, hash_announcer=hash_announcer, peer_port=5553,
|
||||
use_upnp=False, rate_limiter=rate_limiter, wallet=wallet,
|
||||
blob_tracker_class=DummyBlobAvailabilityTracker,
|
||||
dht_node_class=Node, is_generous=self.is_generous)
|
||||
stream_info_manager = TempEncryptedFileMetadataManager()
|
||||
self.lbry_file_manager = EncryptedFileManager(
|
||||
self.session, stream_info_manager, self.sd_identifier)
|
||||
if self.ul_rate_limit is not None:
|
||||
self.session.rate_limiter.set_ul_limit(self.ul_rate_limit)
|
||||
reactor.callLater(1, self.start_all)
|
||||
if not reactor.running:
|
||||
reactor.run()
|
||||
|
||||
r = random.Random()
|
||||
r.seed("start_lbry_uploader")
|
||||
|
||||
wallet = FakeWallet()
|
||||
peer_manager = PeerManager()
|
||||
peer_finder = FakePeerFinder(5553, peer_manager, 1)
|
||||
hash_announcer = FakeAnnouncer()
|
||||
rate_limiter = RateLimiter()
|
||||
sd_identifier = StreamDescriptorIdentifier()
|
||||
|
||||
|
||||
db_dir = "server"
|
||||
os.mkdir(db_dir)
|
||||
|
||||
session = Session(MIN_BLOB_DATA_PAYMENT_RATE, db_dir=db_dir, lbryid="abcd",
|
||||
peer_finder=peer_finder, hash_announcer=hash_announcer, peer_port=5553,
|
||||
use_upnp=False, rate_limiter=rate_limiter, wallet=wallet, blob_tracker_class=DummyBlobAvailabilityTracker,
|
||||
dht_node_class=Node, is_generous=is_generous)
|
||||
|
||||
stream_info_manager = TempEncryptedFileMetadataManager()
|
||||
|
||||
lbry_file_manager = EncryptedFileManager(session, stream_info_manager, sd_identifier)
|
||||
|
||||
if ul_rate_limit is not None:
|
||||
session.rate_limiter.set_ul_limit(ul_rate_limit)
|
||||
|
||||
def start_all():
|
||||
|
||||
d = session.setup()
|
||||
d.addCallback(lambda _: add_lbry_file_to_sd_identifier(sd_identifier))
|
||||
d.addCallback(lambda _: lbry_file_manager.setup())
|
||||
d.addCallback(lambda _: start_server())
|
||||
d.addCallback(lambda _: create_stream())
|
||||
d.addCallback(create_stream_descriptor)
|
||||
d.addCallback(put_sd_hash_on_queue)
|
||||
def start_all(self):
|
||||
d = self.session.setup()
|
||||
d.addCallback(lambda _: add_lbry_file_to_sd_identifier(self.sd_identifier))
|
||||
d.addCallback(lambda _: self.lbry_file_manager.setup())
|
||||
d.addCallback(lambda _: self.start_server())
|
||||
d.addCallback(lambda _: self.create_stream())
|
||||
d.addCallback(self.create_stream_descriptor)
|
||||
d.addCallback(self.put_sd_hash_on_queue)
|
||||
|
||||
def print_error(err):
|
||||
logging.critical("Server error: %s", err.getErrorMessage())
|
||||
|
@ -265,10 +137,8 @@ def start_lbry_uploader(sd_hash_queue, kill_event, dead_event, file_size, ul_rat
|
|||
d.addErrback(print_error)
|
||||
return d
|
||||
|
||||
def start_server():
|
||||
|
||||
server_port = None
|
||||
|
||||
def start_server(self):
|
||||
session = self.session
|
||||
query_handler_factories = {
|
||||
BlobAvailabilityHandlerFactory(session.blob_manager): True,
|
||||
BlobRequestHandlerFactory(
|
||||
|
@ -277,62 +147,50 @@ def start_lbry_uploader(sd_hash_queue, kill_event, dead_event, file_size, ul_rat
|
|||
analytics.Track()): True,
|
||||
session.wallet.get_wallet_info_query_handler_factory(): True,
|
||||
}
|
||||
|
||||
server_factory = ServerProtocolFactory(session.rate_limiter,
|
||||
query_handler_factories,
|
||||
session.peer_manager)
|
||||
|
||||
server_port = reactor.listenTCP(5553, server_factory)
|
||||
self.server_port = self.reactor.listenTCP(5553, server_factory)
|
||||
logging.debug("Started listening")
|
||||
|
||||
def kill_server():
|
||||
ds = []
|
||||
ds.append(session.shut_down())
|
||||
ds.append(lbry_file_manager.stop())
|
||||
if server_port:
|
||||
ds.append(server_port.stopListening())
|
||||
kill_check.stop()
|
||||
dead_event.set()
|
||||
dl = defer.DeferredList(ds)
|
||||
dl.addCallback(lambda _: reactor.stop())
|
||||
return dl
|
||||
|
||||
def check_for_kill():
|
||||
if kill_event.is_set():
|
||||
kill_server()
|
||||
|
||||
kill_check = task.LoopingCall(check_for_kill)
|
||||
kill_check.start(1.0)
|
||||
self.kill_check = task.LoopingCall(self.check_for_kill)
|
||||
self.kill_check.start(1.0)
|
||||
return True
|
||||
|
||||
def create_stream():
|
||||
test_file = GenFile(file_size, b''.join([chr(i) for i in xrange(0, 64, 6)]))
|
||||
d = create_lbry_file(session, lbry_file_manager, "test_file", test_file)
|
||||
def kill_server(self):
|
||||
session = self.session
|
||||
ds = []
|
||||
ds.append(session.shut_down())
|
||||
ds.append(self.lbry_file_manager.stop())
|
||||
if self.server_port:
|
||||
ds.append(self.server_port.stopListening())
|
||||
self.kill_check.stop()
|
||||
self.dead_event.set()
|
||||
dl = defer.DeferredList(ds)
|
||||
dl.addCallback(lambda _: self.reactor.stop())
|
||||
return dl
|
||||
|
||||
def check_for_kill(self):
|
||||
if self.kill_event.is_set():
|
||||
self.kill_server()
|
||||
|
||||
def create_stream(self):
|
||||
test_file = GenFile(self.file_size, b''.join([chr(i) for i in xrange(0, 64, 6)]))
|
||||
d = create_lbry_file(self.session, self.lbry_file_manager, "test_file", test_file)
|
||||
return d
|
||||
|
||||
def create_stream_descriptor(stream_hash):
|
||||
descriptor_writer = BlobStreamDescriptorWriter(session.blob_manager)
|
||||
d = get_sd_info(lbry_file_manager.stream_info_manager, stream_hash, True)
|
||||
def create_stream_descriptor(self, stream_hash):
|
||||
descriptor_writer = BlobStreamDescriptorWriter(self.session.blob_manager)
|
||||
d = get_sd_info(self.lbry_file_manager.stream_info_manager, stream_hash, True)
|
||||
d.addCallback(descriptor_writer.create_descriptor)
|
||||
return d
|
||||
|
||||
def put_sd_hash_on_queue(sd_hash):
|
||||
sd_hash_queue.put(sd_hash)
|
||||
|
||||
reactor.callLater(1, start_all)
|
||||
if not reactor.running:
|
||||
reactor.run()
|
||||
def put_sd_hash_on_queue(self, sd_hash):
|
||||
self.sd_hash_queue.put(sd_hash)
|
||||
|
||||
|
||||
def start_lbry_reuploader(sd_hash, kill_event, dead_event, ready_event, n, ul_rate_limit=None, is_generous=False):
|
||||
|
||||
if sys.platform.startswith("linux"):
|
||||
sys.modules = sys.modules.copy()
|
||||
del sys.modules['twisted.internet.reactor']
|
||||
import twisted.internet
|
||||
twisted.internet.reactor = twisted.internet.epollreactor.EPollReactor()
|
||||
sys.modules['twisted.internet.reactor'] = twisted.internet.reactor
|
||||
|
||||
def start_lbry_reuploader(sd_hash, kill_event, dead_event,
|
||||
ready_event, n, ul_rate_limit=None, is_generous=False):
|
||||
use_epoll_on_linux()
|
||||
from twisted.internet import reactor
|
||||
|
||||
logging.debug("Starting the uploader")
|
||||
|
@ -340,7 +198,7 @@ def start_lbry_reuploader(sd_hash, kill_event, dead_event, ready_event, n, ul_ra
|
|||
Random.atfork()
|
||||
|
||||
r = random.Random()
|
||||
r.seed("start_lbry_uploader")
|
||||
r.seed("start_lbry_reuploader")
|
||||
|
||||
wallet = FakeWallet()
|
||||
peer_port = 5553 + n
|
||||
|
@ -442,14 +300,7 @@ def start_lbry_reuploader(sd_hash, kill_event, dead_event, ready_event, n, ul_ra
|
|||
|
||||
|
||||
def start_live_server(sd_hash_queue, kill_event, dead_event):
|
||||
|
||||
if sys.platform.startswith("linux"):
|
||||
sys.modules = sys.modules.copy()
|
||||
del sys.modules['twisted.internet.reactor']
|
||||
import twisted.internet
|
||||
twisted.internet.reactor = twisted.internet.epollreactor.EPollReactor()
|
||||
sys.modules['twisted.internet.reactor'] = twisted.internet.reactor
|
||||
|
||||
use_epoll_on_linux()
|
||||
from twisted.internet import reactor
|
||||
|
||||
logging.debug("In start_server.")
|
||||
|
@ -575,14 +426,7 @@ def start_live_server(sd_hash_queue, kill_event, dead_event):
|
|||
|
||||
|
||||
def start_blob_uploader(blob_hash_queue, kill_event, dead_event, slow, is_generous=False):
|
||||
|
||||
if sys.platform.startswith("linux"):
|
||||
sys.modules = sys.modules.copy()
|
||||
del sys.modules['twisted.internet.reactor']
|
||||
import twisted.internet
|
||||
twisted.internet.reactor = twisted.internet.epollreactor.EPollReactor()
|
||||
sys.modules['twisted.internet.reactor'] = twisted.internet.reactor
|
||||
|
||||
use_epoll_on_linux()
|
||||
from twisted.internet import reactor
|
||||
|
||||
logging.debug("Starting the uploader")
|
||||
|
@ -763,7 +607,8 @@ class TestTransfer(TestCase):
|
|||
sd_hash_queue = Queue()
|
||||
kill_event = Event()
|
||||
dead_event = Event()
|
||||
uploader = Process(target=start_lbry_uploader, args=(sd_hash_queue, kill_event, dead_event, 5209343))
|
||||
lbry_uploader = LbryUploader(sd_hash_queue, kill_event, dead_event, 5209343)
|
||||
uploader = Process(target=lbry_uploader.start)
|
||||
uploader.start()
|
||||
self.server_processes.append(uploader)
|
||||
|
||||
|
@ -782,21 +627,25 @@ class TestTransfer(TestCase):
|
|||
os.mkdir(db_dir)
|
||||
os.mkdir(blob_dir)
|
||||
|
||||
self.session = Session(MIN_BLOB_DATA_PAYMENT_RATE, db_dir=db_dir, lbryid="abcd",
|
||||
peer_finder=peer_finder, hash_announcer=hash_announcer,
|
||||
blob_dir=blob_dir, peer_port=5553,
|
||||
use_upnp=False, rate_limiter=rate_limiter, wallet=wallet, blob_tracker_class=DummyBlobAvailabilityTracker,
|
||||
dht_node_class=Node, is_generous=self.is_generous)
|
||||
self.session = Session(
|
||||
MIN_BLOB_DATA_PAYMENT_RATE, db_dir=db_dir, lbryid="abcd",
|
||||
peer_finder=peer_finder, hash_announcer=hash_announcer,
|
||||
blob_dir=blob_dir, peer_port=5553,
|
||||
use_upnp=False, rate_limiter=rate_limiter, wallet=wallet,
|
||||
blob_tracker_class=DummyBlobAvailabilityTracker,
|
||||
dht_node_class=Node, is_generous=self.is_generous)
|
||||
|
||||
self.stream_info_manager = TempEncryptedFileMetadataManager()
|
||||
|
||||
self.lbry_file_manager = EncryptedFileManager(self.session, self.stream_info_manager, sd_identifier)
|
||||
self.lbry_file_manager = EncryptedFileManager(
|
||||
self.session, self.stream_info_manager, sd_identifier)
|
||||
|
||||
def make_downloader(metadata, prm):
|
||||
info_validator = metadata.validator
|
||||
options = metadata.options
|
||||
factories = metadata.factories
|
||||
chosen_options = [o.default_value for o in options.get_downloader_options(info_validator, prm)]
|
||||
chosen_options = [
|
||||
o.default_value for o in options.get_downloader_options(info_validator, prm)]
|
||||
return factories[0].make_downloader(metadata, chosen_options, prm)
|
||||
|
||||
def download_file(sd_hash):
|
||||
|
@ -868,10 +717,12 @@ class TestTransfer(TestCase):
|
|||
db_dir = "client"
|
||||
os.mkdir(db_dir)
|
||||
|
||||
self.session = Session(MIN_BLOB_DATA_PAYMENT_RATE, db_dir=db_dir, lbryid="abcd",
|
||||
peer_finder=peer_finder, hash_announcer=hash_announcer, blob_dir=None,
|
||||
peer_port=5553, use_upnp=False, rate_limiter=rate_limiter, wallet=wallet,
|
||||
blob_tracker_class=DummyBlobAvailabilityTracker, dht_node_class=Node)
|
||||
self.session = Session(
|
||||
MIN_BLOB_DATA_PAYMENT_RATE, db_dir=db_dir, lbryid="abcd",
|
||||
peer_finder=peer_finder, hash_announcer=hash_announcer, blob_dir=None,
|
||||
peer_port=5553, use_upnp=False, rate_limiter=rate_limiter, wallet=wallet,
|
||||
blob_tracker_class=DummyBlobAvailabilityTracker, dht_node_class=Node
|
||||
)
|
||||
|
||||
self.stream_info_manager = TempLiveStreamMetadataManager(hash_announcer)
|
||||
|
||||
|
@ -881,7 +732,8 @@ class TestTransfer(TestCase):
|
|||
info_validator = metadata.validator
|
||||
options = metadata.options
|
||||
factories = metadata.factories
|
||||
chosen_options = [o.default_value for o in options.get_downloader_options(info_validator, prm)]
|
||||
chosen_options = [
|
||||
o.default_value for o in options.get_downloader_options(info_validator, prm)]
|
||||
return factories[0].make_downloader(metadata, chosen_options, prm)
|
||||
|
||||
def start_lbry_file(lbry_file):
|
||||
|
@ -940,7 +792,6 @@ class TestTransfer(TestCase):
|
|||
return d
|
||||
|
||||
def test_last_blob_retrieval(self):
|
||||
|
||||
kill_event = Event()
|
||||
dead_event_1 = Event()
|
||||
blob_hash_queue_1 = Queue()
|
||||
|
@ -969,10 +820,12 @@ class TestTransfer(TestCase):
|
|||
os.mkdir(db_dir)
|
||||
os.mkdir(blob_dir)
|
||||
|
||||
self.session = Session(MIN_BLOB_DATA_PAYMENT_RATE, db_dir=db_dir, lbryid="abcd",
|
||||
peer_finder=peer_finder, hash_announcer=hash_announcer,
|
||||
blob_dir=blob_dir, peer_port=5553,
|
||||
use_upnp=False, rate_limiter=rate_limiter, wallet=wallet, blob_tracker_class=DummyBlobAvailabilityTracker)
|
||||
self.session = Session(
|
||||
MIN_BLOB_DATA_PAYMENT_RATE, db_dir=db_dir, lbryid="abcd",
|
||||
peer_finder=peer_finder, hash_announcer=hash_announcer,
|
||||
blob_dir=blob_dir, peer_port=5553,
|
||||
use_upnp=False, rate_limiter=rate_limiter, wallet=wallet,
|
||||
blob_tracker_class=DummyBlobAvailabilityTracker)
|
||||
|
||||
d1 = self.wait_for_hash_from_queue(blob_hash_queue_1)
|
||||
d2 = self.wait_for_hash_from_queue(blob_hash_queue_2)
|
||||
|
@ -986,8 +839,8 @@ class TestTransfer(TestCase):
|
|||
|
||||
def download_blob(blob_hash):
|
||||
prm = self.session.payment_rate_manager
|
||||
downloader = StandaloneBlobDownloader(blob_hash, self.session.blob_manager, peer_finder,
|
||||
rate_limiter, prm, wallet)
|
||||
downloader = StandaloneBlobDownloader(
|
||||
blob_hash, self.session.blob_manager, peer_finder, rate_limiter, prm, wallet)
|
||||
d = downloader.download()
|
||||
return d
|
||||
|
||||
|
@ -1012,23 +865,20 @@ class TestTransfer(TestCase):
|
|||
d1 = self.wait_for_event(dead_event_1, 15)
|
||||
d2 = self.wait_for_event(dead_event_2, 15)
|
||||
dl = defer.DeferredList([d1, d2])
|
||||
|
||||
def print_shutting_down():
|
||||
logging.info("Client is shutting down")
|
||||
|
||||
dl.addCallback(lambda _: print_shutting_down())
|
||||
dl.addCallback(lambda _: arg)
|
||||
return dl
|
||||
|
||||
d.addBoth(stop)
|
||||
|
||||
return d
|
||||
|
||||
def test_double_download(self):
|
||||
sd_hash_queue = Queue()
|
||||
kill_event = Event()
|
||||
dead_event = Event()
|
||||
uploader = Process(target=start_lbry_uploader, args=(sd_hash_queue, kill_event, dead_event, 5209343))
|
||||
lbry_uploader = LbryUploader(sd_hash_queue, kill_event, dead_event, 5209343)
|
||||
uploader = Process(target=lbry_uploader.start)
|
||||
uploader.start()
|
||||
self.server_processes.append(uploader)
|
||||
|
||||
|
@ -1144,8 +994,9 @@ class TestTransfer(TestCase):
|
|||
kill_event = Event()
|
||||
dead_events = [Event() for _ in range(num_uploaders)]
|
||||
ready_events = [Event() for _ in range(1, num_uploaders)]
|
||||
uploader = Process(target=start_lbry_uploader, args=(sd_hash_queue, kill_event, dead_events[0],
|
||||
9373419, 2**22))
|
||||
lbry_uploader = LbryUploader(
|
||||
sd_hash_queue, kill_event, dead_events[0], 5209343, 9373419, 2**22)
|
||||
uploader = Process(target=lbry_uploader.start)
|
||||
uploader.start()
|
||||
self.server_processes.append(uploader)
|
||||
|
||||
|
@ -1240,140 +1091,3 @@ class TestTransfer(TestCase):
|
|||
d.addBoth(stop)
|
||||
|
||||
return d
|
||||
|
||||
|
||||
class TestStreamify(TestCase):
|
||||
|
||||
def setUp(self):
|
||||
self.session = None
|
||||
self.stream_info_manager = None
|
||||
self.lbry_file_manager = None
|
||||
self.addCleanup(self.take_down_env)
|
||||
self.is_generous = True
|
||||
|
||||
def take_down_env(self):
|
||||
|
||||
d = defer.succeed(True)
|
||||
if self.lbry_file_manager is not None:
|
||||
d.addCallback(lambda _: self.lbry_file_manager.stop())
|
||||
if self.session is not None:
|
||||
d.addCallback(lambda _: self.session.shut_down())
|
||||
if self.stream_info_manager is not None:
|
||||
d.addCallback(lambda _: self.stream_info_manager.stop())
|
||||
|
||||
def delete_test_env():
|
||||
shutil.rmtree('client')
|
||||
if os.path.exists("test_file"):
|
||||
os.remove("test_file")
|
||||
|
||||
d.addCallback(lambda _: threads.deferToThread(delete_test_env))
|
||||
return d
|
||||
|
||||
def test_create_stream(self):
|
||||
wallet = FakeWallet()
|
||||
peer_manager = PeerManager()
|
||||
peer_finder = FakePeerFinder(5553, peer_manager, 2)
|
||||
hash_announcer = FakeAnnouncer()
|
||||
rate_limiter = DummyRateLimiter()
|
||||
sd_identifier = StreamDescriptorIdentifier()
|
||||
|
||||
|
||||
db_dir = "client"
|
||||
blob_dir = os.path.join(db_dir, "blobfiles")
|
||||
os.mkdir(db_dir)
|
||||
os.mkdir(blob_dir)
|
||||
|
||||
self.session = Session(MIN_BLOB_DATA_PAYMENT_RATE, db_dir=db_dir, lbryid="abcd",
|
||||
peer_finder=peer_finder, hash_announcer=hash_announcer,
|
||||
blob_dir=blob_dir, peer_port=5553,
|
||||
use_upnp=False, rate_limiter=rate_limiter, wallet=wallet,
|
||||
blob_tracker_class=DummyBlobAvailabilityTracker, is_generous=self.is_generous)
|
||||
|
||||
self.stream_info_manager = TempEncryptedFileMetadataManager()
|
||||
|
||||
self.lbry_file_manager = EncryptedFileManager(self.session, self.stream_info_manager, sd_identifier)
|
||||
|
||||
d = self.session.setup()
|
||||
d.addCallback(lambda _: self.stream_info_manager.setup())
|
||||
d.addCallback(lambda _: add_lbry_file_to_sd_identifier(sd_identifier))
|
||||
d.addCallback(lambda _: self.lbry_file_manager.setup())
|
||||
|
||||
def verify_equal(sd_info):
|
||||
self.assertEqual(sd_info, test_create_stream_sd_file)
|
||||
|
||||
def verify_stream_descriptor_file(stream_hash):
|
||||
d = get_sd_info(self.lbry_file_manager.stream_info_manager, stream_hash, True)
|
||||
d.addCallback(verify_equal)
|
||||
return d
|
||||
|
||||
def iv_generator():
|
||||
iv = 0
|
||||
while 1:
|
||||
iv += 1
|
||||
yield "%016d" % iv
|
||||
|
||||
def create_stream():
|
||||
test_file = GenFile(5209343, b''.join([chr(i + 3) for i in xrange(0, 64, 6)]))
|
||||
d = create_lbry_file(self.session, self.lbry_file_manager, "test_file", test_file,
|
||||
key="0123456701234567", iv_generator=iv_generator())
|
||||
return d
|
||||
|
||||
d.addCallback(lambda _: create_stream())
|
||||
d.addCallback(verify_stream_descriptor_file)
|
||||
return d
|
||||
|
||||
def test_create_and_combine_stream(self):
|
||||
|
||||
wallet = FakeWallet()
|
||||
peer_manager = PeerManager()
|
||||
peer_finder = FakePeerFinder(5553, peer_manager, 2)
|
||||
hash_announcer = FakeAnnouncer()
|
||||
rate_limiter = DummyRateLimiter()
|
||||
sd_identifier = StreamDescriptorIdentifier()
|
||||
|
||||
db_dir = "client"
|
||||
blob_dir = os.path.join(db_dir, "blobfiles")
|
||||
os.mkdir(db_dir)
|
||||
os.mkdir(blob_dir)
|
||||
|
||||
self.session = Session(MIN_BLOB_DATA_PAYMENT_RATE, db_dir=db_dir, lbryid="abcd",
|
||||
peer_finder=peer_finder, hash_announcer=hash_announcer,
|
||||
blob_dir=blob_dir, peer_port=5553,
|
||||
use_upnp=False, rate_limiter=rate_limiter, wallet=wallet, blob_tracker_class=DummyBlobAvailabilityTracker)
|
||||
|
||||
self.stream_info_manager = DBEncryptedFileMetadataManager(self.session.db_dir)
|
||||
|
||||
self.lbry_file_manager = EncryptedFileManager(self.session, self.stream_info_manager, sd_identifier)
|
||||
|
||||
def start_lbry_file(lbry_file):
|
||||
logging.debug("Calling lbry_file.start()")
|
||||
d = lbry_file.start()
|
||||
return d
|
||||
|
||||
def combine_stream(stream_hash):
|
||||
|
||||
prm = self.session.payment_rate_manager
|
||||
d = self.lbry_file_manager.add_lbry_file(stream_hash, prm)
|
||||
d.addCallback(start_lbry_file)
|
||||
|
||||
def check_md5_sum():
|
||||
f = open('test_file')
|
||||
hashsum = MD5.new()
|
||||
hashsum.update(f.read())
|
||||
self.assertEqual(hashsum.hexdigest(), "68959747edc73df45e45db6379dd7b3b")
|
||||
|
||||
d.addCallback(lambda _: check_md5_sum())
|
||||
return d
|
||||
|
||||
def create_stream():
|
||||
test_file = GenFile(53209343, b''.join([chr(i + 5) for i in xrange(0, 64, 6)]))
|
||||
return create_lbry_file(self.session, self.lbry_file_manager, "test_file", test_file,
|
||||
suggested_file_name="test_file")
|
||||
|
||||
d = self.session.setup()
|
||||
d.addCallback(lambda _: self.stream_info_manager.setup())
|
||||
d.addCallback(lambda _: add_lbry_file_to_sd_identifier(sd_identifier))
|
||||
d.addCallback(lambda _: self.lbry_file_manager.setup())
|
||||
d.addCallback(lambda _: create_stream())
|
||||
d.addCallback(combine_stream)
|
||||
return d
|
||||
|
|
|
@ -93,7 +93,7 @@ class TestReflector(unittest.TestCase):
|
|||
use_upnp=False,
|
||||
rate_limiter=rate_limiter,
|
||||
wallet=wallet,
|
||||
blob_tracker_class=mocks.DummyBlobAvailabilityTracker,
|
||||
blob_tracker_class=mocks.BlobAvailabilityTracker,
|
||||
dht_node_class=Node
|
||||
)
|
||||
|
||||
|
|
172
tests/functional/test_streamify.py
Normal file
172
tests/functional/test_streamify.py
Normal file
|
@ -0,0 +1,172 @@
|
|||
import logging
|
||||
import os
|
||||
import shutil
|
||||
|
||||
from Crypto.Hash import MD5
|
||||
from twisted.trial.unittest import TestCase
|
||||
from twisted.internet import defer, threads
|
||||
|
||||
from lbrynet.conf import MIN_BLOB_DATA_PAYMENT_RATE
|
||||
from lbrynet.lbryfile.EncryptedFileMetadataManager import TempEncryptedFileMetadataManager
|
||||
from lbrynet.lbryfile.EncryptedFileMetadataManager import DBEncryptedFileMetadataManager
|
||||
from lbrynet.lbryfilemanager.EncryptedFileManager import EncryptedFileManager
|
||||
from lbrynet.core.Session import Session
|
||||
from lbrynet.core.StreamDescriptor import StreamDescriptorIdentifier
|
||||
from lbrynet.lbryfilemanager.EncryptedFileCreator import create_lbry_file
|
||||
from lbrynet.lbryfile.client.EncryptedFileOptions import add_lbry_file_to_sd_identifier
|
||||
from lbrynet.lbryfile.StreamDescriptor import get_sd_info
|
||||
from lbrynet.core.PeerManager import PeerManager
|
||||
from lbrynet.core.RateLimiter import DummyRateLimiter, RateLimiter
|
||||
|
||||
from tests import mocks
|
||||
|
||||
|
||||
FakeNode = mocks.Node
|
||||
FakeWallet = mocks.Wallet
|
||||
FakePeerFinder = mocks.PeerFinder
|
||||
FakeAnnouncer = mocks.Announcer
|
||||
GenFile = mocks.GenFile
|
||||
test_create_stream_sd_file = mocks.create_stream_sd_file
|
||||
DummyBlobAvailabilityTracker = mocks.BlobAvailabilityTracker
|
||||
|
||||
|
||||
class TestStreamify(TestCase):
|
||||
def setUp(self):
|
||||
self.session = None
|
||||
self.stream_info_manager = None
|
||||
self.lbry_file_manager = None
|
||||
self.addCleanup(self.take_down_env)
|
||||
self.is_generous = True
|
||||
|
||||
def take_down_env(self):
|
||||
d = defer.succeed(True)
|
||||
if self.lbry_file_manager is not None:
|
||||
d.addCallback(lambda _: self.lbry_file_manager.stop())
|
||||
if self.session is not None:
|
||||
d.addCallback(lambda _: self.session.shut_down())
|
||||
if self.stream_info_manager is not None:
|
||||
d.addCallback(lambda _: self.stream_info_manager.stop())
|
||||
|
||||
def delete_test_env():
|
||||
shutil.rmtree('client')
|
||||
if os.path.exists("test_file"):
|
||||
os.remove("test_file")
|
||||
|
||||
d.addCallback(lambda _: threads.deferToThread(delete_test_env))
|
||||
return d
|
||||
|
||||
def test_create_stream(self):
|
||||
wallet = FakeWallet()
|
||||
peer_manager = PeerManager()
|
||||
peer_finder = FakePeerFinder(5553, peer_manager, 2)
|
||||
hash_announcer = FakeAnnouncer()
|
||||
rate_limiter = DummyRateLimiter()
|
||||
sd_identifier = StreamDescriptorIdentifier()
|
||||
|
||||
|
||||
db_dir = "client"
|
||||
blob_dir = os.path.join(db_dir, "blobfiles")
|
||||
os.mkdir(db_dir)
|
||||
os.mkdir(blob_dir)
|
||||
|
||||
self.session = Session(
|
||||
MIN_BLOB_DATA_PAYMENT_RATE, db_dir=db_dir, lbryid="abcd",
|
||||
peer_finder=peer_finder, hash_announcer=hash_announcer,
|
||||
blob_dir=blob_dir, peer_port=5553,
|
||||
use_upnp=False, rate_limiter=rate_limiter, wallet=wallet,
|
||||
blob_tracker_class=DummyBlobAvailabilityTracker,
|
||||
is_generous=self.is_generous
|
||||
)
|
||||
|
||||
self.stream_info_manager = TempEncryptedFileMetadataManager()
|
||||
|
||||
self.lbry_file_manager = EncryptedFileManager(
|
||||
self.session, self.stream_info_manager, sd_identifier)
|
||||
|
||||
d = self.session.setup()
|
||||
d.addCallback(lambda _: self.stream_info_manager.setup())
|
||||
d.addCallback(lambda _: add_lbry_file_to_sd_identifier(sd_identifier))
|
||||
d.addCallback(lambda _: self.lbry_file_manager.setup())
|
||||
|
||||
def verify_equal(sd_info):
|
||||
self.assertEqual(sd_info, test_create_stream_sd_file)
|
||||
|
||||
def verify_stream_descriptor_file(stream_hash):
|
||||
d = get_sd_info(self.lbry_file_manager.stream_info_manager, stream_hash, True)
|
||||
d.addCallback(verify_equal)
|
||||
return d
|
||||
|
||||
def iv_generator():
|
||||
iv = 0
|
||||
while 1:
|
||||
iv += 1
|
||||
yield "%016d" % iv
|
||||
|
||||
def create_stream():
|
||||
test_file = GenFile(5209343, b''.join([chr(i + 3) for i in xrange(0, 64, 6)]))
|
||||
d = create_lbry_file(self.session, self.lbry_file_manager, "test_file", test_file,
|
||||
key="0123456701234567", iv_generator=iv_generator())
|
||||
return d
|
||||
|
||||
d.addCallback(lambda _: create_stream())
|
||||
d.addCallback(verify_stream_descriptor_file)
|
||||
return d
|
||||
|
||||
def test_create_and_combine_stream(self):
|
||||
wallet = FakeWallet()
|
||||
peer_manager = PeerManager()
|
||||
peer_finder = FakePeerFinder(5553, peer_manager, 2)
|
||||
hash_announcer = FakeAnnouncer()
|
||||
rate_limiter = DummyRateLimiter()
|
||||
sd_identifier = StreamDescriptorIdentifier()
|
||||
|
||||
db_dir = "client"
|
||||
blob_dir = os.path.join(db_dir, "blobfiles")
|
||||
os.mkdir(db_dir)
|
||||
os.mkdir(blob_dir)
|
||||
|
||||
self.session = Session(
|
||||
MIN_BLOB_DATA_PAYMENT_RATE, db_dir=db_dir, lbryid="abcd",
|
||||
peer_finder=peer_finder, hash_announcer=hash_announcer,
|
||||
blob_dir=blob_dir, peer_port=5553,
|
||||
use_upnp=False, rate_limiter=rate_limiter, wallet=wallet,
|
||||
blob_tracker_class=DummyBlobAvailabilityTracker
|
||||
)
|
||||
|
||||
self.stream_info_manager = DBEncryptedFileMetadataManager(self.session.db_dir)
|
||||
|
||||
self.lbry_file_manager = EncryptedFileManager(
|
||||
self.session, self.stream_info_manager, sd_identifier)
|
||||
|
||||
def start_lbry_file(lbry_file):
|
||||
logging.debug("Calling lbry_file.start()")
|
||||
d = lbry_file.start()
|
||||
return d
|
||||
|
||||
def combine_stream(stream_hash):
|
||||
prm = self.session.payment_rate_manager
|
||||
d = self.lbry_file_manager.add_lbry_file(stream_hash, prm)
|
||||
d.addCallback(start_lbry_file)
|
||||
|
||||
def check_md5_sum():
|
||||
f = open('test_file')
|
||||
hashsum = MD5.new()
|
||||
hashsum.update(f.read())
|
||||
self.assertEqual(hashsum.hexdigest(), "68959747edc73df45e45db6379dd7b3b")
|
||||
|
||||
d.addCallback(lambda _: check_md5_sum())
|
||||
return d
|
||||
|
||||
def create_stream():
|
||||
test_file = GenFile(53209343, b''.join([chr(i + 5) for i in xrange(0, 64, 6)]))
|
||||
return create_lbry_file(
|
||||
self.session, self.lbry_file_manager, "test_file", test_file,
|
||||
suggested_file_name="test_file")
|
||||
|
||||
d = self.session.setup()
|
||||
d.addCallback(lambda _: self.stream_info_manager.setup())
|
||||
d.addCallback(lambda _: add_lbry_file_to_sd_identifier(sd_identifier))
|
||||
d.addCallback(lambda _: self.lbry_file_manager.setup())
|
||||
d.addCallback(lambda _: create_stream())
|
||||
d.addCallback(combine_stream)
|
||||
return d
|
|
@ -5,7 +5,7 @@ from decimal import Decimal
|
|||
from twisted.internet import defer, threads, task, error
|
||||
|
||||
from lbrynet.core import PTCWallet
|
||||
from lbrynet.core.BlobAvailability import BlobAvailabilityTracker
|
||||
from lbrynet.core import BlobAvailability
|
||||
|
||||
|
||||
class Node(object):
|
||||
|
@ -54,6 +54,9 @@ class Wallet(object):
|
|||
def set_public_key_for_peer(self, peer, public_key):
|
||||
pass
|
||||
|
||||
def get_claim_metadata_for_sd_hash(self, sd_hash):
|
||||
return "fakeuri", "faketxid"
|
||||
|
||||
|
||||
class PeerFinder(object):
|
||||
def __init__(self, start_port, peer_manager, num_peers):
|
||||
|
@ -136,7 +139,7 @@ class GenFile(io.RawIOBase):
|
|||
return output
|
||||
|
||||
|
||||
class DummyBlobAvailabilityTracker(BlobAvailabilityTracker):
|
||||
class BlobAvailabilityTracker(BlobAvailability.BlobAvailabilityTracker):
|
||||
"""
|
||||
Class to track peer counts for known blobs, and to discover new popular blobs
|
||||
|
||||
|
|
|
@ -9,7 +9,7 @@ from lbrynet import analytics
|
|||
from lbrynet.core import Peer
|
||||
from lbrynet.core.server import BlobRequestHandler
|
||||
from lbrynet.core.PaymentRateManager import NegotiatedPaymentRateManager, BasePaymentRateManager
|
||||
from tests.mocks import DummyBlobAvailabilityTracker
|
||||
from tests.mocks import BlobAvailabilityTracker as DummyBlobAvailabilityTracker
|
||||
|
||||
|
||||
class TestBlobRequestHandlerQueries(unittest.TestCase):
|
||||
|
|
|
@ -5,7 +5,7 @@ import mock
|
|||
from lbrynet.core.PaymentRateManager import NegotiatedPaymentRateManager, BasePaymentRateManager
|
||||
from lbrynet.core.Strategy import BasicAvailabilityWeightedStrategy
|
||||
from lbrynet.core.Offer import Offer
|
||||
from tests.mocks import DummyBlobAvailabilityTracker
|
||||
from tests.mocks import BlobAvailabilityTracker as DummyBlobAvailabilityTracker
|
||||
|
||||
MAX_NEGOTIATION_TURNS = 10
|
||||
random.seed(12345)
|
||||
|
|
Loading…
Reference in a new issue