get reflector client and server working; add func test to prove it

This commit is contained in:
Jimmy Kiselak 2016-08-09 00:59:50 -04:00
parent b7e2e87ac1
commit 7e2ad58edd
3 changed files with 194 additions and 21 deletions

View file

@ -51,6 +51,7 @@ import json
import logging import logging
from twisted.protocols.basic import FileSender from twisted.protocols.basic import FileSender
from twisted.internet.protocol import Protocol, ClientFactory from twisted.internet.protocol import Protocol, ClientFactory
from twisted.internet import defer, error
log = logging.getLogger(__name__) log = logging.getLogger(__name__)
@ -74,8 +75,11 @@ class LBRYFileReflectorClient(Protocol):
self.received_handshake_response = False self.received_handshake_response = False
self.protocol_version = None self.protocol_version = None
self.file_sender = None self.file_sender = None
self.producer = None
self.streaming = False
d = self.get_blobs_to_send(self.factory.stream_info_manager, self.factory.stream_hash) d = self.get_blobs_to_send(self.factory.stream_info_manager, self.factory.stream_hash)
d.addCallback(lambda _: self.send_handshake()) d.addCallback(lambda _: self.send_handshake())
d.addErrback(lambda err: log.warning("An error occurred immediately: %s", err.getTraceback()))
def dataReceived(self, data): def dataReceived(self, data):
self.response_buff += data self.response_buff += data
@ -86,45 +90,64 @@ class LBRYFileReflectorClient(Protocol):
else: else:
self.response_buff = '' self.response_buff = ''
d = self.handle_response(msg) d = self.handle_response(msg)
d.addCallbacks(lambda _: self.send_next_request(), self.response_failure_handler) d.addCallback(lambda _: self.send_next_request())
d.addErrback(self.response_failure_handler)
def connectionLost(self, reason): def connectionLost(self, reason):
pass if reason.check(error.ConnectionDone):
self.factory.finished_deferred.callback(True)
else:
self.factory.finished_deferred.callback(reason)
# IConsumer stuff # IConsumer stuff
def registerProducer(self, producer, streaming): def registerProducer(self, producer, streaming):
assert streaming is True self.producer = producer
self.streaming = streaming
if self.streaming is False:
from twisted.internet import reactor
reactor.callLater(0, self.producer.resumeProducing)
def unregisterProducer(self): def unregisterProducer(self):
self.transport.loseConnection() self.producer = None
def write(self, data): def write(self, data):
self.transport.write(data) self.transport.write(data)
if self.producer is not None and self.streaming is False:
from twisted.internet import reactor
reactor.callLater(0, self.producer.resumeProducing)
def get_blobs_to_send(self, stream_info_manager, stream_hash): def get_blobs_to_send(self, stream_info_manager, stream_hash):
d = stream_info_manager.get_blobs_for_stream(stream_hash) d = stream_info_manager.get_blobs_for_stream(stream_hash)
def set_blobs(blob_hashes): def set_blobs(blob_hashes):
for blob_hash, position, iv, length in blob_hashes: for blob_hash, position, iv, length in blob_hashes:
self.blob_hashes_to_send.append(blob_hash) if blob_hash is not None:
self.blob_hashes_to_send.append(blob_hash)
d.addCallback(set_blobs) d.addCallback(set_blobs)
d.addCallback(lambda _: stream_info_manager.get_sd_blob_hashes_for_stream) d.addCallback(lambda _: stream_info_manager.get_sd_blob_hashes_for_stream(stream_hash))
def set_sd_blobs(sd_blob_hashes): def set_sd_blobs(sd_blob_hashes):
for sd_blob_hash, in sd_blob_hashes: for sd_blob_hash in sd_blob_hashes:
self.blob_hashes_to_send.append(sd_blob_hash) self.blob_hashes_to_send.append(sd_blob_hash)
d.addCallback(set_sd_blobs) d.addCallback(set_sd_blobs)
return d return d
def send_handshake(self):
self.write(json.dumps({'version': 0}))
def parse_response(self, buff): def parse_response(self, buff):
try: try:
return json.loads(buff) return json.loads(buff)
except ValueError: except ValueError:
raise IncompleteResponseError() raise IncompleteResponseError()
def response_failure_handler(self, err):
log.warning("An error occurred handling the response: %s", err.getTraceback())
def handle_response(self, response_dict): def handle_response(self, response_dict):
if self.received_handshake_response is False: if self.received_handshake_response is False:
return self.handle_handshake_response(response_dict) return self.handle_handshake_response(response_dict)
@ -132,15 +155,15 @@ class LBRYFileReflectorClient(Protocol):
return self.handle_normal_response(response_dict) return self.handle_normal_response(response_dict)
def set_not_uploading(self): def set_not_uploading(self):
if self.currently_uploading is not None: if self.next_blob_to_send is not None:
self.currently_uploading.close_read_handle(self.read_handle) self.next_blob_to_send.close_read_handle(self.read_handle)
self.read_handle = None self.read_handle = None
self.currently_uploading = None self.next_blob_to_send = None
self.file_sender = None self.file_sender = None
return defer.succeed(None)
def start_transfer(self): def start_transfer(self):
self.write(json.dumps("{}")) self.write(json.dumps({}))
log.info("Starting the file upload")
assert self.read_handle is not None, "self.read_handle was None when trying to start the transfer" assert self.read_handle is not None, "self.read_handle was None when trying to start the transfer"
d = self.file_sender.beginFileTransfer(self.read_handle, self) d = self.file_sender.beginFileTransfer(self.read_handle, self)
return d return d
@ -152,14 +175,15 @@ class LBRYFileReflectorClient(Protocol):
if self.protocol_version != 0: if self.protocol_version != 0:
raise ValueError("I can't handle protocol version {}!".format(self.protocol_version)) raise ValueError("I can't handle protocol version {}!".format(self.protocol_version))
self.received_handshake_response = True self.received_handshake_response = True
return defer.succeed(True)
def handle_normal_response(self, response_dict): def handle_normal_response(self, response_dict):
if self.next_blob_to_send is not None: # Expecting Server Info Response if self.file_sender is None: # Expecting Server Info Response
if 'send_blob' not in response_dict: if 'send_blob' not in response_dict:
raise ValueError("I don't know whether to send the blob or not!") raise ValueError("I don't know whether to send the blob or not!")
if response_dict['send_blob'] is True: if response_dict['send_blob'] is True:
self.file_sender = FileSender() self.file_sender = FileSender()
return True return defer.succeed(True)
else: else:
return self.set_not_uploading() return self.set_not_uploading()
else: # Expecting Server Blob Response else: # Expecting Server Blob Response
@ -175,13 +199,13 @@ class LBRYFileReflectorClient(Protocol):
self.next_blob_to_send = blob self.next_blob_to_send = blob
self.read_handle = read_handle self.read_handle = read_handle
return None return None
raise ValueError("Couldn't open that blob for some reason") raise ValueError("Couldn't open that blob for some reason. blob_hash: {}".format(blob.blob_hash))
def send_blob_info(self): def send_blob_info(self):
assert self.next_blob_to_send is not None, "need to have a next blob to send at this point" assert self.next_blob_to_send is not None, "need to have a next blob to send at this point"
self.write(json.dumps({ self.write(json.dumps({
'blob_hash': self.next_blob_to_send.blob_hash, 'blob_hash': self.next_blob_to_send.blob_hash,
'blob_length': self.next_blob_to_send.length 'blob_size': self.next_blob_to_send.length
})) }))
def send_next_request(self): def send_next_request(self):
@ -210,6 +234,7 @@ class LBRYFileReflectorClientFactory(ClientFactory):
self.stream_info_manager = stream_info_manager self.stream_info_manager = stream_info_manager
self.stream_hash = stream_hash self.stream_hash = stream_hash
self.p = None self.p = None
self.finished_deferred = defer.Deferred()
def buildProtocol(self, addr): def buildProtocol(self, addr):
p = self.protocol() p = self.protocol()

View file

@ -1,6 +1,6 @@
import logging import logging
from twisted.python import failure from twisted.python import failure
from twisted.internet import error from twisted.internet import error, defer
from twisted.internet.protocol import Protocol, ServerFactory from twisted.internet.protocol import Protocol, ServerFactory
import json import json
@ -72,7 +72,8 @@ class ReflectorServer(Protocol):
self.peer_version = int(request_dict['version']) self.peer_version = int(request_dict['version'])
if self.peer_version != 0: if self.peer_version != 0:
raise ValueError("I don't know that version!") raise ValueError("I don't know that version!")
return {'version': 0} self.received_handshake = True
return defer.succeed({'version': 0})
def determine_blob_needed(self, blob): def determine_blob_needed(self, blob):
if blob.is_validated(): if blob.is_validated():
@ -87,6 +88,7 @@ class ReflectorServer(Protocol):
self.blob_write = None self.blob_write = None
self.cancel_write = None self.cancel_write = None
self.incoming_blob = None self.incoming_blob = None
self.receiving_blob = False
def handle_normal_request(self, request_dict): def handle_normal_request(self, request_dict):
if self.blob_write is None: if self.blob_write is None:
@ -113,11 +115,10 @@ class ReflectorServer(Protocol):
d = self.blob_finished_d d = self.blob_finished_d
d.addCallback(lambda _: self.close_blob()) d.addCallback(lambda _: self.close_blob())
d.addCallback(lambda _: {'received_blob': True}) d.addCallback(lambda _: {'received_blob': True})
d.addCallback(self.send_response)
return d return d
def send_response(self, response_dict): def send_response(self, response_dict):
self.write(json.dumps(response_dict)) self.transport.write(json.dumps(response_dict))
def handle_error(self, err): def handle_error(self, err):
pass pass

View file

@ -26,7 +26,7 @@ from lbrynet.core.StreamDescriptor import download_sd_blob
from lbrynet.lbryfilemanager.LBRYFileCreator import create_lbry_file from lbrynet.lbryfilemanager.LBRYFileCreator import create_lbry_file
from lbrynet.lbryfile.client.LBRYFileOptions import add_lbry_file_to_sd_identifier from lbrynet.lbryfile.client.LBRYFileOptions import add_lbry_file_to_sd_identifier
from lbrynet.lbryfile.StreamDescriptor import get_sd_info from lbrynet.lbryfile.StreamDescriptor import get_sd_info
from twisted.internet import defer, threads, task from twisted.internet import defer, threads, task, error
from twisted.trial.unittest import TestCase from twisted.trial.unittest import TestCase
from twisted.python.failure import Failure from twisted.python.failure import Failure
import os import os
@ -38,6 +38,10 @@ from lbrynet.core.server.ServerProtocol import ServerProtocolFactory
from lbrynet.lbrylive.server.LiveBlobInfoQueryHandler import CryptBlobInfoQueryHandlerFactory from lbrynet.lbrylive.server.LiveBlobInfoQueryHandler import CryptBlobInfoQueryHandlerFactory
from lbrynet.lbrylive.client.LiveStreamOptions import add_live_stream_to_sd_identifier from lbrynet.lbrylive.client.LiveStreamOptions import add_live_stream_to_sd_identifier
from lbrynet.lbrylive.client.LiveStreamDownloader import add_full_live_stream_downloader_to_sd_identifier from lbrynet.lbrylive.client.LiveStreamDownloader import add_full_live_stream_downloader_to_sd_identifier
from lbrynet.core.BlobManager import TempBlobManager
from lbrynet.reflector.client.client import LBRYFileReflectorClientFactory
from lbrynet.reflector.server.server import ReflectorServerFactory
from lbrynet.lbryfile.StreamDescriptor import publish_sd_blob
log_format = "%(funcName)s(): %(message)s" log_format = "%(funcName)s(): %(message)s"
@ -1370,3 +1374,146 @@ class TestStreamify(TestCase):
d.addCallback(lambda _: create_stream()) d.addCallback(lambda _: create_stream())
d.addCallback(combine_stream) d.addCallback(combine_stream)
return d return d
class TestReflector(TestCase):
def setUp(self):
self.session = None
self.stream_info_manager = None
self.lbry_file_manager = None
self.server_blob_manager = None
self.reflector_port = None
self.addCleanup(self.take_down_env)
def take_down_env(self):
d = defer.succeed(True)
if self.lbry_file_manager is not None:
d.addCallback(lambda _: self.lbry_file_manager.stop())
if self.session is not None:
d.addCallback(lambda _: self.session.shut_down())
if self.stream_info_manager is not None:
d.addCallback(lambda _: self.stream_info_manager.stop())
if self.server_blob_manager is not None:
d.addCallback(lambda _: self.server_blob_manager.stop())
if self.reflector_port is not None:
d.addCallback(lambda _: self.reflector_port.stopListening())
def delete_test_env():
shutil.rmtree('client')
d.addCallback(lambda _: threads.deferToThread(delete_test_env))
return d
def test_reflector(self):
wallet = FakeWallet()
peer_manager = PeerManager()
peer_finder = FakePeerFinder(5553, peer_manager, 2)
hash_announcer = FakeAnnouncer()
rate_limiter = DummyRateLimiter()
sd_identifier = StreamDescriptorIdentifier()
self.expected_blobs = [
('dc4708f76a5e7af0f1cae0ee96b824e2ed9250c9346c093b'
'441f0a20d3607c17948b6fcfb4bc62020fe5286693d08586', 2097152),
('f4067522c1b49432a2a679512e3917144317caa1abba0c04'
'1e0cd2cf9f635d4cf127ce1824fa04189b63916174951f70', 2097152),
('305486c434260484fcb2968ce0e963b72f81ba56c11b08b1'
'af0789b55b44d78422600f9a38e3cf4f2e9569897e5646a9', 1015056),
]
db_dir = "client"
os.mkdir(db_dir)
self.session = LBRYSession(MIN_BLOB_DATA_PAYMENT_RATE, db_dir=db_dir, lbryid="abcd",
peer_finder=peer_finder, hash_announcer=hash_announcer,
blob_dir=None, peer_port=5553,
use_upnp=False, rate_limiter=rate_limiter, wallet=wallet)
self.stream_info_manager = TempLBRYFileMetadataManager()
self.lbry_file_manager = LBRYFileManager(self.session, self.stream_info_manager, sd_identifier)
self.server_blob_manager = TempBlobManager(hash_announcer)
d = self.session.setup()
d.addCallback(lambda _: self.stream_info_manager.setup())
d.addCallback(lambda _: add_lbry_file_to_sd_identifier(sd_identifier))
d.addCallback(lambda _: self.lbry_file_manager.setup())
d.addCallback(lambda _: self.server_blob_manager.setup())
def verify_equal(sd_info):
self.assertEqual(sd_info, test_create_stream_sd_file)
def save_sd_blob_hash(sd_hash):
self.expected_blobs.append((sd_hash, 923))
def verify_stream_descriptor_file(stream_hash):
d = get_sd_info(self.lbry_file_manager.stream_info_manager, stream_hash, True)
d.addCallback(verify_equal)
d.addCallback(lambda _: publish_sd_blob(self.lbry_file_manager.stream_info_manager, self.session.blob_manager, stream_hash))
d.addCallback(save_sd_blob_hash)
d.addCallback(lambda _: stream_hash)
return d
def iv_generator():
iv = 0
while 1:
iv += 1
yield "%016d" % iv
def create_stream():
test_file = GenFile(5209343, b''.join([chr(i + 3) for i in xrange(0, 64, 6)]))
d = create_lbry_file(self.session, self.lbry_file_manager, "test_file", test_file,
key="0123456701234567", iv_generator=iv_generator())
return d
def start_server():
server_factory = ReflectorServerFactory(peer_manager, self.server_blob_manager)
from twisted.internet import reactor
port = 8943
while self.reflector_port is None:
try:
self.reflector_port = reactor.listenTCP(port, server_factory)
except error.CannotListenError:
port += 1
return defer.succeed(port)
def send_to_server(port, stream_hash):
factory = LBRYFileReflectorClientFactory(
self.session.blob_manager,
self.stream_info_manager,
stream_hash
)
from twisted.internet import reactor
reactor.connectTCP('localhost', port, factory)
return factory.finished_deferred
def verify_blob_completed(blob, blob_size):
self.assertTrue(blob.is_validated())
self.assertEqual(blob_size, blob.length)
def verify_have_blob(blob_hash, blob_size):
d = self.server_blob_manager.get_blob(blob_hash, True)
d.addCallback(lambda blob: verify_blob_completed(blob, blob_size))
return d
def verify_data_on_reflector():
check_blob_ds = []
for blob_hash, blob_size in self.expected_blobs:
check_blob_ds.append(verify_have_blob(blob_hash, blob_size))
return defer.DeferredList(check_blob_ds)
def upload_to_reflector(stream_hash):
d = start_server()
d.addCallback(lambda port: send_to_server(port, stream_hash))
d.addCallback(lambda _: verify_data_on_reflector())
return d
d.addCallback(lambda _: create_stream())
d.addCallback(verify_stream_descriptor_file)
d.addCallback(upload_to_reflector)
return d