Merge pull request #457 from lbryio/stream-availability

Reflector v2
This commit is contained in:
Jack Robison 2017-02-06 16:05:14 -05:00 committed by GitHub
commit a167beaad1
8 changed files with 589 additions and 238 deletions

View file

@ -29,7 +29,6 @@ from lbrynet.lbryfile.client.EncryptedFileDownloader import EncryptedFileSaverFa
from lbrynet.lbryfile.client.EncryptedFileDownloader import EncryptedFileOpenerFactory
from lbrynet.lbryfile.client.EncryptedFileOptions import add_lbry_file_to_sd_identifier
from lbrynet.lbryfile.EncryptedFileMetadataManager import DBEncryptedFileMetadataManager
from lbrynet.lbryfile.EncryptedFileMetadataManager import TempEncryptedFileMetadataManager
from lbrynet.lbryfile.StreamDescriptor import EncryptedFileStreamType
from lbrynet.lbryfilemanager.EncryptedFileManager import EncryptedFileManager
from lbrynet.lbrynet_daemon.UIManager import UIManager
@ -266,9 +265,8 @@ class Daemon(AuthJSONRPCServer):
}
self.looping_call_manager = LoopingCallManager(calls)
self.sd_identifier = StreamDescriptorIdentifier()
self.stream_info_manager = TempEncryptedFileMetadataManager()
self.stream_info_manager = DBEncryptedFileMetadataManager(self.db_dir)
self.lbry_ui_manager = UIManager(root)
self.lbry_file_metadata_manager = None
self.lbry_file_manager = None
@defer.inlineCallbacks
@ -653,12 +651,11 @@ class Daemon(AuthJSONRPCServer):
def _setup_lbry_file_manager(self):
self.startup_status = STARTUP_STAGES[3]
self.lbry_file_metadata_manager = DBEncryptedFileMetadataManager(self.db_dir)
d = self.lbry_file_metadata_manager.setup()
d = self.stream_info_manager.setup()
def set_lbry_file_manager():
self.lbry_file_manager = EncryptedFileManager(
self.session, self.lbry_file_metadata_manager,
self.session, self.stream_info_manager,
self.sd_identifier, download_directory=self.download_directory)
return self.lbry_file_manager.setup()

View file

@ -1,3 +1,69 @@
"""
Reflector is a protocol to re-host lbry blobs and streams
Client queries and server responses follow; all dicts are encoded as JSON
############# Handshake request and response #############
Upon connecting, the client sends a version handshake:
{
'version': int,
}
The server replies with the same version
{
'version': int,
}
############# Stream descriptor requests and responses #############
(if sending blobs directly this is skipped)
If the client is reflecting a whole stream, it sends a stream descriptor request:
{
'sd_blob_hash': str,
'sd_blob_size': int
}
The server indicates if it's aware of this stream already by requesting (or not requesting)
the stream descriptor blob. If the server has a validated copy of the sd blob, it will
include the needed_blobs field (a list of blob hashes missing from reflector) in the response.
If the server does not have the sd blob the needed_blobs field will not be included, as the
server does not know what blobs it is missing - so the client should send all of the blobs
in the stream.
{
'send_sd_blob': bool
'needed_blobs': list, conditional
}
The client may begin the file transfer of the sd blob if send_sd_blob was True.
If the client sends the blob, after receiving it the server indicates if the
transfer was successful:
{
'received_sd_blob': bool
}
If the transfer was not successful (False), the blob is added to the needed_blobs queue
############# Blob requests and responses #############
A client with blobs to reflect (either populated by the client or by the stream descriptor
response) queries if the server is ready to begin transferring a blob
{
'blob_hash': str,
'blob_size': int
}
The server replies, send_blob will be False if the server has a validated copy of the blob:
{
'send_blob': bool
}
The client may begin the raw blob file transfer if the server replied True.
If the client sends the blob, the server replies:
{
'received_blob': bool
}
If the transfer was not successful (False), the blob is re-added to the needed_blobs queue
Blob requests continue for each of the blobs the client has queued to send; when they are
completed, the client disconnects.
"""
from lbrynet.reflector.server.server import ReflectorServerFactory as ServerFactory
from lbrynet.reflector.client.client import EncryptedFileReflectorClientFactory as ClientFactory
from lbrynet.reflector.client.blob import BlobReflectorClientFactory as BlobClientFactory
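To make the message flow above concrete, here is an illustrative sketch (not part of this commit) of the JSON payloads a client writes at each step; the constants mirror REFLECTOR_V1 and REFLECTOR_V2 from lbrynet.reflector.common, and the helper names are hypothetical.

import json

# Version constants as defined in lbrynet.reflector.common
REFLECTOR_V1 = 0
REFLECTOR_V2 = 1

def handshake_request(version=REFLECTOR_V2):
    # Client -> server on connect; the server echoes {'version': int} back.
    return json.dumps({'version': version})

def sd_blob_request(sd_blob_hash, sd_blob_size):
    # Only sent when reflecting a whole stream. The reply is {'send_sd_blob': bool},
    # plus 'needed_blobs' when the server already holds a validated sd blob.
    return json.dumps({'sd_blob_hash': sd_blob_hash, 'sd_blob_size': sd_blob_size})

def blob_request(blob_hash, blob_size):
    # The reply is {'send_blob': bool}; after the raw transfer, {'received_blob': bool}.
    return json.dumps({'blob_hash': blob_hash, 'blob_size': blob_size})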

View file

@ -5,7 +5,7 @@ from twisted.protocols.basic import FileSender
from twisted.internet.protocol import Protocol, ClientFactory
from twisted.internet import defer, error
from lbrynet.reflector.common import IncompleteResponse
from lbrynet.reflector.common import IncompleteResponse, REFLECTOR_V2
log = logging.getLogger(__name__)
@ -22,7 +22,7 @@ class BlobReflectorClient(Protocol):
self.next_blob_to_send = None
self.blob_read_handle = None
self.received_handshake_response = False
self.protocol_version = None
self.protocol_version = self.factory.protocol_version
self.file_sender = None
self.producer = None
self.streaming = False
@ -32,7 +32,7 @@ class BlobReflectorClient(Protocol):
lambda err: log.warning("An error occurred immediately: %s", err.getTraceback()))
def dataReceived(self, data):
log.debug('Recieved %s', data)
log.debug('Received %s', data)
self.response_buff += data
try:
msg = self.parse_response(self.response_buff)
@ -74,7 +74,7 @@ class BlobReflectorClient(Protocol):
def send_handshake(self):
log.debug('Sending handshake')
self.write(json.dumps({'version': 0}))
self.write(json.dumps({'version': self.protocol_version}))
return defer.succeed(None)
def parse_response(self, buff):
@ -102,7 +102,6 @@ class BlobReflectorClient(Protocol):
def start_transfer(self):
self.sent_blobs = True
self.write(json.dumps({}))
assert self.read_handle is not None, \
"self.read_handle was None when trying to start the transfer"
d = self.file_sender.beginFileTransfer(self.read_handle, self)
@ -111,8 +110,8 @@ class BlobReflectorClient(Protocol):
def handle_handshake_response(self, response_dict):
if 'version' not in response_dict:
raise ValueError("Need protocol version number!")
self.protocol_version = int(response_dict['version'])
if self.protocol_version != 0:
server_version = int(response_dict['version'])
if self.protocol_version != server_version:
raise ValueError("I can't handle protocol version {}!".format(self.protocol_version))
self.received_handshake_response = True
return defer.succeed(True)
@ -184,6 +183,7 @@ class BlobReflectorClientFactory(ClientFactory):
protocol = BlobReflectorClient
def __init__(self, blob_manager, blobs):
self.protocol_version = REFLECTOR_V2
self.blob_manager = blob_manager
self.blobs = blobs
self.p = None
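As a usage sketch (assuming a running reactor and a blob manager from an existing session, much like the tests at the bottom of this commit), the blob client factory is pointed at a reflector host and its finished_deferred fires once the queued blobs have been handled; the reflect_blobs wrapper below is hypothetical.

from twisted.internet import reactor
from lbrynet.reflector import BlobClientFactory

def reflect_blobs(blob_manager, blob_hashes, host, port):
    # blob_manager and blob_hashes are assumed to come from an initialized session
    factory = BlobClientFactory(blob_manager, blob_hashes)
    reactor.connectTCP(host, port, factory)
    return factory.finished_deferred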

View file

@ -1,61 +1,13 @@
"""
The reflector protocol (all dicts encoded in json):
Client Handshake (sent once per connection, at the start of the connection):
{
'version': 0,
}
Server Handshake (sent once per connection, after receiving the client handshake):
{
'version': 0,
}
Client Info Request:
{
'blob_hash': "<blob_hash>",
'blob_size': <blob_size>
}
Server Info Response (sent in response to Client Info Request):
{
'send_blob': True|False
}
If response is 'YES', client may send a Client Blob Request or a Client Info Request.
If response is 'NO', client may only send a Client Info Request
Client Blob Request:
{} # Yes, this is an empty dictionary, in case something needs to go here in the future
<raw blob_data> # this blob data must match the info sent in the most recent Client Info Request
Server Blob Response (sent in response to Client Blob Request):
{
'received_blob': True
}
Client may now send another Client Info Request
"""
import json
import logging
from twisted.internet.error import ConnectionRefusedError
from twisted.protocols.basic import FileSender
from twisted.internet.protocol import Protocol, ClientFactory
from twisted.internet import defer, error
from lbrynet.reflector.common import IncompleteResponse
from lbrynet.reflector.common import IncompleteResponse, ReflectorRequestError
from lbrynet.reflector.common import REFLECTOR_V1, REFLECTOR_V2
log = logging.getLogger(__name__)
@ -63,24 +15,31 @@ log = logging.getLogger(__name__)
class EncryptedFileReflectorClient(Protocol):
# Protocol stuff
def connectionMade(self):
log.debug("Connected to reflector")
self.blob_manager = self.factory.blob_manager
self.response_buff = ''
self.outgoing_buff = ''
self.blob_hashes_to_send = []
self.next_blob_to_send = None
self.blob_read_handle = None
self.received_handshake_response = False
self.protocol_version = None
self.read_handle = None
self.sent_stream_info = False
self.received_descriptor_response = False
self.protocol_version = self.factory.protocol_version
self.received_server_version = False
self.server_version = None
self.stream_descriptor = None
self.descriptor_needed = None
self.needed_blobs = []
self.reflected_blobs = []
self.file_sender = None
self.producer = None
self.streaming = False
d = self.get_blobs_to_send(self.factory.stream_info_manager, self.factory.stream_hash)
d = self.load_descriptor()
d.addCallback(lambda _: self.send_handshake())
d.addErrback(
lambda err: log.warning("An error occurred immediately: %s", err.getTraceback()))
def dataReceived(self, data):
log.debug('Received %s', data)
self.response_buff += data
try:
msg = self.parse_response(self.response_buff)
@ -94,8 +53,8 @@ class EncryptedFileReflectorClient(Protocol):
def connectionLost(self, reason):
if reason.check(error.ConnectionDone):
log.info('Finished sending data via reflector')
self.factory.finished_deferred.callback(True)
log.debug('Finished sending data via reflector')
self.factory.finished_deferred.callback(self.reflected_blobs)
else:
log.debug('Reflector finished: %s', reason)
self.factory.finished_deferred.callback(reason)
@ -118,31 +77,54 @@ class EncryptedFileReflectorClient(Protocol):
from twisted.internet import reactor
reactor.callLater(0, self.producer.resumeProducing)
def get_blobs_to_send(self, stream_info_manager, stream_hash):
log.debug('Getting blobs from stream hash: %s', stream_hash)
d = stream_info_manager.get_blobs_for_stream(stream_hash)
def get_validated_blobs(self, blobs_in_stream):
def get_blobs(blobs):
for (blob, _, _, blob_len) in blobs:
if blob:
yield self.blob_manager.get_blob(blob, True, blob_len)
def set_blobs(blob_hashes):
for blob_hash, position, iv, length in blob_hashes:
if blob_hash is not None:
self.blob_hashes_to_send.append(blob_hash)
log.debug("Preparing to send %i blobs", len(self.blob_hashes_to_send))
dl = defer.DeferredList(list(get_blobs(blobs_in_stream)), consumeErrors=True)
dl.addCallback(lambda blobs: [blob for r, blob in blobs if r and blob.is_validated()])
return dl
d.addCallback(set_blobs)
def set_blobs_to_send(self, blobs_to_send):
for blob in blobs_to_send:
if blob not in self.blob_hashes_to_send:
self.blob_hashes_to_send.append(blob)
d.addCallback(lambda _: stream_info_manager.get_sd_blob_hashes_for_stream(stream_hash))
def get_blobs_to_send(self):
def _show_missing_blobs(filtered):
if filtered:
needs_desc = "" if not self.descriptor_needed else "descriptor and "
log.info("Reflector needs %s%i blobs for %s",
needs_desc,
len(filtered),
str(self.stream_descriptor)[:16])
return filtered
def set_sd_blobs(sd_blob_hashes):
for sd_blob_hash in sd_blob_hashes:
self.blob_hashes_to_send.append(sd_blob_hash)
log.debug("Preparing to send %i sd blobs", len(sd_blob_hashes))
d.addCallback(set_sd_blobs)
d = self.factory.stream_info_manager.get_blobs_for_stream(self.factory.stream_hash)
d.addCallback(self.get_validated_blobs)
if not self.descriptor_needed:
d.addCallback(lambda filtered:
[blob for blob in filtered if blob.blob_hash in self.needed_blobs])
d.addCallback(_show_missing_blobs)
d.addCallback(self.set_blobs_to_send)
return d
def send_request(self, request_dict):
self.write(json.dumps(request_dict))
def send_handshake(self):
log.debug('Sending handshake')
self.write(json.dumps({'version': 0}))
self.send_request({'version': self.protocol_version})
def load_descriptor(self):
def _save_descriptor_blob(sd_blob):
self.stream_descriptor = sd_blob
d = self.factory.stream_info_manager.get_sd_blob_hashes_for_stream(self.factory.stream_hash)
d.addCallback(lambda sd: self.factory.blob_manager.get_blob(sd[0], True))
d.addCallback(_save_descriptor_blob)
return d
def parse_response(self, buff):
try:
@ -154,8 +136,10 @@ class EncryptedFileReflectorClient(Protocol):
log.warning("An error occurred handling the response: %s", err.getTraceback())
def handle_response(self, response_dict):
if self.received_handshake_response is False:
if not self.received_server_version:
return self.handle_handshake_response(response_dict)
elif not self.received_descriptor_response and self.server_version == REFLECTOR_V2:
return self.handle_descriptor_response(response_dict)
else:
return self.handle_normal_response(response_dict)
@ -168,7 +152,6 @@ class EncryptedFileReflectorClient(Protocol):
return defer.succeed(None)
def start_transfer(self):
self.write(json.dumps({}))
assert self.read_handle is not None, \
"self.read_handle was None when trying to start the transfer"
d = self.file_sender.beginFileTransfer(self.read_handle, self)
@ -177,12 +160,37 @@ class EncryptedFileReflectorClient(Protocol):
def handle_handshake_response(self, response_dict):
if 'version' not in response_dict:
raise ValueError("Need protocol version number!")
self.protocol_version = int(response_dict['version'])
if self.protocol_version != 0:
raise ValueError("I can't handle protocol version {}!".format(self.protocol_version))
self.received_handshake_response = True
self.server_version = int(response_dict['version'])
if self.server_version not in [REFLECTOR_V1, REFLECTOR_V2]:
raise ValueError("I can't handle protocol version {}!".format(self.server_version))
self.received_server_version = True
return defer.succeed(True)
def handle_descriptor_response(self, response_dict):
if self.file_sender is None: # Expecting a send_sd_blob response
if 'send_sd_blob' not in response_dict:
raise ReflectorRequestError("I don't know whether to send the sd blob or not!")
if response_dict['send_sd_blob'] is True:
self.file_sender = FileSender()
else:
self.received_descriptor_response = True
self.descriptor_needed = response_dict['send_sd_blob']
self.needed_blobs = response_dict.get('needed_blobs', [])
return self.get_blobs_to_send()
else: # Expecting a received_sd_blob response
if 'received_sd_blob' not in response_dict:
raise ValueError("I don't know if the sd blob made it to the intended destination!")
else:
self.received_descriptor_response = True
if response_dict['received_sd_blob']:
self.reflected_blobs.append(self.next_blob_to_send.blob_hash)
log.info("Sent reflector descriptor %s", self.next_blob_to_send.blob_hash[:16])
else:
log.warning("Reflector failed to receive descriptor %s, trying again later",
self.next_blob_to_send.blob_hash[:16])
self.blob_hashes_to_send.append(self.next_blob_to_send.blob_hash)
return self.set_not_uploading()
def handle_normal_response(self, response_dict):
if self.file_sender is None: # Expecting Server Info Response
if 'send_blob' not in response_dict:
@ -191,13 +199,19 @@ class EncryptedFileReflectorClient(Protocol):
self.file_sender = FileSender()
return defer.succeed(True)
else:
log.debug("Reflector already has %s", str(self.next_blob_to_send.blob_hash)[:16])
log.warning("Reflector already has %s", self.next_blob_to_send.blob_hash[:16])
return self.set_not_uploading()
else: # Expecting Server Blob Response
if 'received_blob' not in response_dict:
raise ValueError("I don't know if the blob made it to the intended destination!")
else:
log.info("Reflector received %s", str(self.next_blob_to_send.blob_hash)[:16])
if response_dict['received_blob']:
self.reflected_blobs.append(self.next_blob_to_send.blob_hash)
log.info("Sent reflector blob %s", self.next_blob_to_send.blob_hash[:16])
else:
log.warning("Reflector failed to receive blob %s, trying again later",
self.next_blob_to_send.blob_hash[:16])
self.blob_hashes_to_send.append(self.next_blob_to_send.blob_hash)
return self.set_not_uploading()
def open_blob_for_reading(self, blob):
@ -207,50 +221,59 @@ class EncryptedFileReflectorClient(Protocol):
log.debug('Getting ready to send %s', blob.blob_hash)
self.next_blob_to_send = blob
self.read_handle = read_handle
return None
else:
log.warning("Can't reflect blob %s", str(blob.blob_hash)[:16])
return defer.fail(ValueError(
return defer.succeed(None)
return defer.fail(ValueError(
"Couldn't open that blob for some reason. blob_hash: {}".format(blob.blob_hash)))
def send_blob_info(self):
assert self.next_blob_to_send is not None, "need to have a next blob to send at this point"
log.debug("Send blob info for %s", self.next_blob_to_send.blob_hash)
self.write(json.dumps({
r = {
'blob_hash': self.next_blob_to_send.blob_hash,
'blob_size': self.next_blob_to_send.length
}))
}
self.send_request(r)
def send_descriptor_info(self):
assert self.stream_descriptor is not None, "need to have a sd blob to send at this point"
r = {
'sd_blob_hash': self.stream_descriptor.blob_hash,
'sd_blob_size': self.stream_descriptor.length
}
self.sent_stream_info = True
self.send_request(r)
def skip_missing_blob(self, err, blob_hash):
log.warning("Can't reflect blob %s", str(blob_hash)[:16])
err.trap(ValueError)
return self.send_next_request()
def send_next_request(self):
if self.file_sender is not None:
# send the blob
log.debug('Sending %s to reflector', str(self.next_blob_to_send.blob_hash)[:16])
return self.start_transfer()
elif not self.sent_stream_info:
# open the sd blob to send
blob = self.stream_descriptor
d = self.open_blob_for_reading(blob)
d.addCallback(lambda _: self.send_descriptor_info())
return d
elif self.blob_hashes_to_send:
# open the next blob to send
blob_hash = self.blob_hashes_to_send[0]
log.debug('No current blob, sending the next one: %s', blob_hash)
blob = self.blob_hashes_to_send[0]
self.blob_hashes_to_send = self.blob_hashes_to_send[1:]
d = self.blob_manager.get_blob(blob_hash, True)
d.addCallback(self.open_blob_for_reading)
# send the server the next blob hash + length
d = self.open_blob_for_reading(blob)
d.addCallbacks(lambda _: self.send_blob_info(),
lambda err: self.skip_missing_blob(err, blob_hash))
lambda err: self.skip_missing_blob(err, blob.blob_hash))
return d
else:
# close connection
log.debug('No more blob hashes, closing connection')
self.transport.loseConnection()
# close connection
self.transport.loseConnection()
class EncryptedFileReflectorClientFactory(ClientFactory):
protocol = EncryptedFileReflectorClient
def __init__(self, blob_manager, stream_info_manager, stream_hash):
self.protocol_version = REFLECTOR_V2
self.blob_manager = blob_manager
self.stream_info_manager = stream_info_manager
self.stream_hash = stream_hash
@ -268,11 +291,13 @@ class EncryptedFileReflectorClientFactory(ClientFactory):
ClientFactory.startFactory(self)
def startedConnecting(self, connector):
log.debug('Started connecting')
log.debug('Connecting to reflector')
def clientConnectionLost(self, connector, reason):
"""If we get disconnected, reconnect to server."""
log.debug("connection lost: %s", reason)
def clientConnectionFailed(self, connector, reason):
log.debug("connection failed: %s", reason)
if reason.check(ConnectionRefusedError):
log.warning("Could not connect to reflector server")
else:
log.error("Reflector connection failed: %s", reason)

View file

@ -1,2 +1,27 @@
REFLECTOR_V1 = 0
REFLECTOR_V2 = 1
class ReflectorClientVersionError(Exception):
"""
Raised by reflector server if client sends an incompatible or unknown version
"""
class ReflectorRequestError(Exception):
"""
Raised by reflector server if client sends a message without the required fields
"""
class ReflectorRequestDecodeError(Exception):
"""
Raised by reflector server if client sends an invalid json request
"""
class IncompleteResponse(Exception):
pass
"""
Raised by reflector server when client sends a portion of a json request,
used when buffering the incoming request
"""

View file

@ -1,5 +1,6 @@
import logging
from twisted.internet import reactor, defer
from twisted.internet import reactor
from twisted.internet.error import ConnectionLost, ConnectionDone
from lbrynet.reflector import BlobClientFactory, ClientFactory
log = logging.getLogger(__name__)
@ -27,19 +28,13 @@ def _reflect_stream(lbry_file, reflector_server):
)
d = reactor.resolve(reflector_address)
d.addCallback(lambda ip: reactor.connectTCP(ip, reflector_port, factory))
d.addCallback(lambda _: log.info("Connected to %s", reflector_address))
d.addCallback(lambda _: factory.finished_deferred)
d.addCallback(lambda reflected_blobs: log.info("Reflected %i blobs for lbry://%s",
len(reflected_blobs),
lbry_file.uri))
return d
def _reflect_if_unavailable(reflector_has_stream, lbry_file, reflector_server):
if reflector_has_stream:
log.info("lbry://%s is available", lbry_file.uri)
return defer.succeed(False)
log.info("lbry://%s is unavailable, reflecting it", lbry_file.uri)
return _reflect_stream(lbry_file, reflector_server)
def _catch_error(err, uri):
msg = "An error occurred while checking availability for lbry://%s: %s"
log.error(msg, uri, err.getTraceback())
@ -47,5 +42,6 @@ def _catch_error(err, uri):
def check_and_restore_availability(lbry_file, reflector_server):
d = _reflect_stream(lbry_file, reflector_server)
d.addErrback(lambda err: err.trap(ConnectionDone, ConnectionLost))
d.addErrback(_catch_error, lbry_file.uri)
return d
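check_and_restore_availability now reflects the stream unconditionally and swallows clean disconnects; a hedged example of a call site (lbry_file and reflector_server are assumed to come from the daemon's file manager and settings, and reflect_file is a hypothetical wrapper):

def reflect_file(lbry_file, reflector_server):
    # reflector_server is passed straight through to _reflect_stream,
    # which splits it into an address and port
    d = check_and_restore_availability(lbry_file, reflector_server)
    d.addCallback(lambda _: log.info("Reflector pass finished for lbry://%s", lbry_file.uri))
    return d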

View file

@ -1,21 +1,36 @@
import logging
import json
from twisted.python import failure
from twisted.internet import error, defer
from twisted.internet.protocol import Protocol, ServerFactory
import json
from lbrynet.core.utils import is_valid_blobhash
from lbrynet.core.Error import DownloadCanceledError, InvalidBlobHashError
from lbrynet.reflector.common import REFLECTOR_V1, REFLECTOR_V2
from lbrynet.reflector.common import ReflectorRequestError, ReflectorClientVersionError
log = logging.getLogger(__name__)
MAXIMUM_QUERY_SIZE = 200
SEND_SD_BLOB = 'send_sd_blob'
SEND_BLOB = 'send_blob'
RECEIVED_SD_BLOB = 'received_sd_blob'
RECEIVED_BLOB = 'received_blob'
NEEDED_BLOBS = 'needed_blobs'
VERSION = 'version'
BLOB_SIZE = 'blob_size'
BLOB_HASH = 'blob_hash'
SD_BLOB_SIZE = 'sd_blob_size'
SD_BLOB_HASH = 'sd_blob_hash'
class ReflectorServer(Protocol):
def connectionMade(self):
peer_info = self.transport.getPeer()
log.debug('Connection made to %s', peer_info)
self.peer = self.factory.peer_manager.get_peer(peer_info.host, peer_info.port)
self.blob_manager = self.factory.blob_manager
self.protocol_version = self.factory.protocol_version
self.received_handshake = False
self.peer_version = None
self.receiving_blob = False
@ -28,6 +43,60 @@ class ReflectorServer(Protocol):
def connectionLost(self, reason=failure.Failure(error.ConnectionDone())):
log.info("Reflector upload from %s finished" % self.peer.host)
def handle_error(self, err):
log.error(err.getTraceback())
self.transport.loseConnection()
def send_response(self, response_dict):
self.transport.write(json.dumps(response_dict))
############################
# Incoming blob file stuff #
############################
def clean_up_failed_upload(self, err, blob):
log.warning("Failed to receive %s", blob)
if err.check(DownloadCanceledError):
self.blob_manager.delete_blobs([blob.blob_hash])
else:
log.exception(err)
@defer.inlineCallbacks
def _on_completed_blob(self, blob, response_key):
yield self.blob_manager.blob_completed(blob)
yield self.close_blob()
log.info("Received %s", blob)
yield self.send_response({response_key: True})
@defer.inlineCallbacks
def _on_failed_blob(self, err, response_key):
yield self.clean_up_failed_upload(err, self.incoming_blob)
yield self.send_response({response_key: False})
def handle_incoming_blob(self, response_key):
"""
Open blob for writing and send a response indicating if the transfer was
successful when finished.
response_key will either be received_blob or received_sd_blob
"""
blob = self.incoming_blob
self.blob_finished_d, self.blob_write, self.cancel_write = blob.open_for_writing(self.peer)
self.blob_finished_d.addCallback(self._on_completed_blob, response_key)
self.blob_finished_d.addErrback(self._on_failed_blob, response_key)
def close_blob(self):
self.blob_finished_d = None
self.blob_write = None
self.cancel_write = None
self.incoming_blob = None
self.receiving_blob = False
####################
# Request handling #
####################
def dataReceived(self, data):
if self.receiving_blob:
self.blob_write(data)
@ -38,7 +107,7 @@ class ReflectorServer(Protocol):
if msg is not None:
self.request_buff = ''
d = self.handle_request(msg)
d.addCallbacks(self.send_response, self.handle_error)
d.addErrback(self.handle_error)
if self.receiving_blob and extra_data:
log.debug('Writing extra data to blob')
self.blob_write(extra_data)
@ -47,15 +116,15 @@ class ReflectorServer(Protocol):
extra_data = None
response = None
curr_pos = 0
while True:
while not self.receiving_blob:
next_close_paren = response_msg.find('}', curr_pos)
if next_close_paren != -1:
curr_pos = next_close_paren + 1
try:
response = json.loads(response_msg[:curr_pos])
except ValueError:
if curr_pos > 100:
raise Exception("error decoding response")
if curr_pos > MAXIMUM_QUERY_SIZE:
raise ValueError("Error decoding response: %s" % str(response_msg))
else:
pass
else:
@ -65,73 +134,185 @@ class ReflectorServer(Protocol):
break
return response, extra_data
def need_handshake(self):
return self.received_handshake is False
def is_descriptor_request(self, request_dict):
if SD_BLOB_HASH not in request_dict or SD_BLOB_SIZE not in request_dict:
return False
if not is_valid_blobhash(request_dict[SD_BLOB_HASH]):
raise InvalidBlobHashError(request_dict[SD_BLOB_HASH])
return True
def is_blob_request(self, request_dict):
if BLOB_HASH not in request_dict or BLOB_SIZE not in request_dict:
return False
if not is_valid_blobhash(request_dict[BLOB_HASH]):
raise InvalidBlobHashError(request_dict[BLOB_HASH])
return True
def handle_request(self, request_dict):
if self.received_handshake is False:
if self.need_handshake():
return self.handle_handshake(request_dict)
else:
return self.handle_normal_request(request_dict)
if self.is_descriptor_request(request_dict):
return self.handle_descriptor_request(request_dict)
if self.is_blob_request(request_dict):
return self.handle_blob_request(request_dict)
raise ReflectorRequestError("Invalid request")
def handle_handshake(self, request_dict):
log.debug('Handling handshake')
if 'version' not in request_dict:
raise ValueError("Client should send version")
self.peer_version = int(request_dict['version'])
if self.peer_version != 0:
raise ValueError("I don't know that version!")
"""
Upon connecting, the client sends a version handshake:
{
'version': int,
}
The server replies with the same version if it is supported
{
'version': int,
}
"""
if VERSION not in request_dict:
raise ReflectorRequestError("Client should send version")
if int(request_dict[VERSION]) not in [REFLECTOR_V1, REFLECTOR_V2]:
raise ReflectorClientVersionError("Unknown version: %i" % int(request_dict[VERSION]))
self.peer_version = int(request_dict[VERSION])
log.debug('Handling handshake for client version %i', self.peer_version)
self.received_handshake = True
return defer.succeed({'version': 0})
return self.send_handshake_response()
def determine_blob_needed(self, blob):
if blob.is_validated():
return {'send_blob': False}
else:
self.incoming_blob = blob
self.blob_finished_d, self.blob_write, self.cancel_write = blob.open_for_writing(self.peer) # pylint: disable=line-too-long
self.blob_finished_d.addCallback(lambda _: self.blob_manager.blob_completed(blob))
return {'send_blob': True}
def send_handshake_response(self):
d = defer.succeed({VERSION: self.peer_version})
d.addCallback(self.send_response)
return d
def close_blob(self):
self.blob_finished_d = None
self.blob_write = None
self.cancel_write = None
self.incoming_blob = None
self.receiving_blob = False
def handle_descriptor_request(self, request_dict):
"""
If the client is reflecting a whole stream, it sends a stream descriptor request:
{
'sd_blob_hash': str,
'sd_blob_size': int
}
The server indicates if it's aware of this stream already by requesting (or not requesting)
the stream descriptor blob. If the server has a validated copy of the sd blob, it will
include the needed_blobs field (a list of blob hashes missing from reflector) in the
response. If the server does not have the sd blob the needed_blobs field will not be
included, as the server does not know what blobs it is missing - so the client should send
all of the blobs in the stream.
{
'send_sd_blob': bool
'needed_blobs': list, conditional
}
The client may begin the file transfer of the sd blob if send_sd_blob was True.
If the client sends the blob, after receiving it the server indicates if the
transfer was successful:
{
'received_sd_blob': bool
}
"""
sd_blob_hash = request_dict[SD_BLOB_HASH]
sd_blob_size = request_dict[SD_BLOB_SIZE]
def handle_normal_request(self, request_dict):
if self.blob_write is None:
# we haven't opened a blob yet, meaning we must be waiting for the
# next message containing a blob hash and a length. this message
# should be it. if it's one we want, open the blob for writing, and
# return a nice response dict (in a Deferred) saying go ahead
if not 'blob_hash' in request_dict or not 'blob_size' in request_dict:
raise ValueError("Expected a blob hash and a blob size")
if not is_valid_blobhash(request_dict['blob_hash']):
raise ValueError("Got a bad blob hash: {}".format(request_dict['blob_hash']))
log.debug('Recieved info for blob: %s', request_dict['blob_hash'])
d = self.blob_manager.get_blob(
request_dict['blob_hash'],
True,
int(request_dict['blob_size'])
)
d.addCallback(self.determine_blob_needed)
d = self.blob_manager.get_blob(sd_blob_hash, True, sd_blob_size)
d.addCallback(self.get_descriptor_response)
d.addCallback(self.send_response)
else:
self.receiving_blob = True
d = self.blob_finished_d
return d
def get_descriptor_response(self, sd_blob):
if sd_blob.is_validated():
d = defer.succeed({SEND_SD_BLOB: False})
d.addCallback(self.request_needed_blobs, sd_blob)
else:
self.incoming_blob = sd_blob
self.receiving_blob = True
self.handle_incoming_blob(RECEIVED_SD_BLOB)
d = defer.succeed({SEND_SD_BLOB: True})
return d
def request_needed_blobs(self, response, sd_blob):
def _add_needed_blobs_to_response(needed_blobs):
response.update({NEEDED_BLOBS: needed_blobs})
return response
d = self.determine_missing_blobs(sd_blob)
d.addCallback(_add_needed_blobs_to_response)
return d
def determine_missing_blobs(self, sd_blob):
with sd_blob.open_for_reading() as sd_file:
sd_blob_data = sd_file.read()
decoded_sd_blob = json.loads(sd_blob_data)
return self.get_unvalidated_blobs_in_stream(decoded_sd_blob)
def get_unvalidated_blobs_in_stream(self, sd_blob):
dl = defer.DeferredList(list(self._iter_unvalidated_blobs_in_stream(sd_blob)),
consumeErrors=True)
dl.addCallback(lambda needed: [blob[1] for blob in needed if blob[1]])
return dl
def _iter_unvalidated_blobs_in_stream(self, sd_blob):
for blob in sd_blob['blobs']:
if 'blob_hash' in blob and 'length' in blob:
blob_hash, blob_len = blob['blob_hash'], blob['length']
d = self.blob_manager.get_blob(blob_hash, True, blob_len)
# bind blob_hash per iteration to avoid the late-binding closure pitfall
d.addCallback(lambda blob, blob_hash=blob_hash: blob_hash if not blob.is_validated() else None)
yield d
def handle_blob_request(self, request_dict):
"""
A client queries if the server will accept a blob
{
'blob_hash': str,
'blob_size': int
}
The server replies, send_blob will be False if the server has a validated copy of the blob:
{
'send_blob': bool
}
The client may begin the raw blob file transfer if the server replied True.
If the client sends the blob, the server replies:
{
'received_blob': bool
}
"""
blob_hash = request_dict[BLOB_HASH]
blob_size = request_dict[BLOB_SIZE]
if self.blob_write is None:
log.debug('Received info for blob: %s', blob_hash[:16])
d = self.blob_manager.get_blob(blob_hash, True, blob_size)
d.addCallback(self.get_blob_response)
d.addCallback(self.send_response)
else:
# we have a blob open already, so this message should have nothing
# important in it. to the deferred that fires when the blob is done,
# add a callback which returns a nice response dict saying to keep
# sending, and then return that deferred
log.debug('blob is already open')
self.receiving_blob = True
d = self.blob_finished_d
d.addCallback(lambda _: self.close_blob())
d.addCallback(lambda _: {'received_blob': True})
return d
def send_response(self, response_dict):
self.transport.write(json.dumps(response_dict))
def handle_error(self, err):
log.error(err.getTraceback())
self.transport.loseConnection()
def get_blob_response(self, blob):
if blob.is_validated():
return defer.succeed({SEND_BLOB: False})
else:
self.incoming_blob = blob
self.receiving_blob = True
self.handle_incoming_blob(RECEIVED_BLOB)
d = defer.succeed({SEND_BLOB: True})
return d
class ReflectorServerFactory(ServerFactory):
@ -140,6 +321,7 @@ class ReflectorServerFactory(ServerFactory):
def __init__(self, peer_manager, blob_manager):
self.peer_manager = peer_manager
self.blob_manager = blob_manager
self.protocol_version = REFLECTOR_V2
def buildProtocol(self, addr):
log.debug('Creating a protocol for %s', addr)
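The server frames incoming requests by scanning for a closing brace until the buffered bytes decode as JSON, giving up past MAXIMUM_QUERY_SIZE (200 bytes); anything left over after the JSON is treated as the start of raw blob data. A simplified, standalone sketch of that loop (the parse_request name is hypothetical):

import json

MAXIMUM_QUERY_SIZE = 200  # same cap the server uses above

def parse_request(buff):
    # Returns (request_dict, leftover_bytes), or (None, buff) if the request is incomplete
    curr_pos = 0
    while True:
        next_close_paren = buff.find('}', curr_pos)
        if next_close_paren == -1:
            return None, buff  # no complete JSON object yet, keep buffering
        curr_pos = next_close_paren + 1
        try:
            request = json.loads(buff[:curr_pos])
        except ValueError:
            if curr_pos > MAXIMUM_QUERY_SIZE:
                raise ValueError("Error decoding request: %s" % buff)
            continue  # not valid JSON yet, keep scanning for the next '}'
        return request, buff[curr_pos:]  # leftover may be the start of raw blob data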

View file

@ -29,32 +29,8 @@ class TestReflector(unittest.TestCase):
self.lbry_file_manager = None
self.server_blob_manager = None
self.reflector_port = None
self.port = None
self.addCleanup(self.take_down_env)
def take_down_env(self):
d = defer.succeed(True)
if self.lbry_file_manager is not None:
d.addCallback(lambda _: self.lbry_file_manager.stop())
if self.session is not None:
d.addCallback(lambda _: self.session.shut_down())
if self.stream_info_manager is not None:
d.addCallback(lambda _: self.stream_info_manager.stop())
if self.server_blob_manager is not None:
d.addCallback(lambda _: self.server_blob_manager.stop())
if self.reflector_port is not None:
d.addCallback(lambda _: self.reflector_port.stopListening())
def delete_test_env():
try:
shutil.rmtree('client')
except:
raise unittest.SkipTest("TODO: fix this for windows")
d.addCallback(lambda _: threads.deferToThread(delete_test_env))
d.addErrback(lambda err: str(err))
return d
def test_reflector(self):
wallet = mocks.Wallet()
peer_manager = PeerManager.PeerManager()
peer_finder = mocks.PeerFinder(5553, peer_manager, 2)
@ -98,7 +74,7 @@ class TestReflector(unittest.TestCase):
dht_node_class=Node
)
self.stream_info_manager = EncryptedFileMetadataManager.TempEncryptedFileMetadataManager()
self.stream_info_manager = EncryptedFileMetadataManager.DBEncryptedFileMetadataManager(db_dir)
self.lbry_file_manager = EncryptedFileManager.EncryptedFileManager(
self.session, self.stream_info_manager, sd_identifier)
@ -118,6 +94,7 @@ class TestReflector(unittest.TestCase):
self.expected_blobs.append((sd_hash, 923))
def verify_stream_descriptor_file(stream_hash):
self.stream_hash = stream_hash
d = lbryfile.get_sd_info(self.lbry_file_manager.stream_info_manager, stream_hash, True)
d.addCallback(verify_equal)
d.addCallback(
@ -127,7 +104,6 @@ class TestReflector(unittest.TestCase):
)
)
d.addCallback(save_sd_blob_hash)
d.addCallback(lambda _: stream_hash)
return d
def create_stream():
@ -149,45 +125,129 @@ class TestReflector(unittest.TestCase):
while self.reflector_port is None:
try:
self.reflector_port = reactor.listenTCP(port, server_factory)
self.port = port
except error.CannotListenError:
port += 1
return defer.succeed(port)
def send_to_server(port, stream_hash):
factory = reflector.ClientFactory(
self.session.blob_manager,
self.stream_info_manager,
stream_hash
)
d.addCallback(lambda _: create_stream())
d.addCallback(verify_stream_descriptor_file)
d.addCallback(lambda _: start_server())
return d
from twisted.internet import reactor
reactor.connectTCP('localhost', port, factory)
return factory.finished_deferred
def take_down_env(self):
d = defer.succeed(True)
if self.lbry_file_manager is not None:
d.addCallback(lambda _: self.lbry_file_manager.stop())
if self.session is not None:
d.addCallback(lambda _: self.session.shut_down())
if self.stream_info_manager is not None:
d.addCallback(lambda _: self.stream_info_manager.stop())
if self.server_blob_manager is not None:
d.addCallback(lambda _: self.server_blob_manager.stop())
if self.reflector_port is not None:
d.addCallback(lambda _: self.reflector_port.stopListening())
def verify_blob_completed(blob, blob_size):
self.assertTrue(blob.is_validated())
self.assertEqual(blob_size, blob.length)
def delete_test_env():
try:
shutil.rmtree('client')
except:
raise unittest.SkipTest("TODO: fix this for windows")
def verify_have_blob(blob_hash, blob_size):
d = self.server_blob_manager.get_blob(blob_hash, True)
d.addCallback(lambda blob: verify_blob_completed(blob, blob_size))
return d
d.addCallback(lambda _: threads.deferToThread(delete_test_env))
d.addErrback(lambda err: str(err))
return d
def test_stream_reflector(self):
def verify_data_on_reflector():
check_blob_ds = []
for blob_hash, blob_size in self.expected_blobs:
check_blob_ds.append(verify_have_blob(blob_hash, blob_size))
return defer.DeferredList(check_blob_ds)
def upload_to_reflector(stream_hash):
d = start_server()
d.addCallback(lambda port: send_to_server(port, stream_hash))
d.addCallback(lambda _: verify_data_on_reflector())
def verify_have_blob(blob_hash, blob_size):
d = self.server_blob_manager.get_blob(blob_hash, True)
d.addCallback(lambda blob: verify_blob_completed(blob, blob_size))
return d
d.addCallback(lambda _: create_stream())
d.addCallback(verify_stream_descriptor_file)
d.addCallback(upload_to_reflector)
def send_to_server():
factory = reflector.ClientFactory(
self.session.blob_manager,
self.stream_info_manager,
self.stream_hash
)
from twisted.internet import reactor
reactor.connectTCP('localhost', self.port, factory)
return factory.finished_deferred
def verify_blob_completed(blob, blob_size):
self.assertTrue(blob.is_validated())
self.assertEqual(blob_size, blob.length)
return
d = send_to_server()
d.addCallback(lambda _: verify_data_on_reflector())
return d
def test_blob_reflector(self):
def verify_data_on_reflector():
check_blob_ds = []
for blob_hash, blob_size in self.expected_blobs:
check_blob_ds.append(verify_have_blob(blob_hash, blob_size))
return defer.DeferredList(check_blob_ds)
def verify_have_blob(blob_hash, blob_size):
d = self.server_blob_manager.get_blob(blob_hash, True)
d.addCallback(lambda blob: verify_blob_completed(blob, blob_size))
return d
def send_to_server(blob_hashes_to_send):
factory = reflector.BlobClientFactory(
self.session.blob_manager,
blob_hashes_to_send
)
from twisted.internet import reactor
reactor.connectTCP('localhost', self.port, factory)
return factory.finished_deferred
def verify_blob_completed(blob, blob_size):
self.assertTrue(blob.is_validated())
self.assertEqual(blob_size, blob.length)
d = send_to_server([x[0] for x in self.expected_blobs])
d.addCallback(lambda _: verify_data_on_reflector())
return d
def test_blob_reflector_v1(self):
def verify_data_on_reflector():
check_blob_ds = []
for blob_hash, blob_size in self.expected_blobs:
check_blob_ds.append(verify_have_blob(blob_hash, blob_size))
return defer.DeferredList(check_blob_ds)
def verify_have_blob(blob_hash, blob_size):
d = self.server_blob_manager.get_blob(blob_hash, True)
d.addCallback(lambda blob: verify_blob_completed(blob, blob_size))
return d
def send_to_server(blob_hashes_to_send):
factory = reflector.BlobClientFactory(
self.session.blob_manager,
blob_hashes_to_send
)
factory.protocol_version = 0
from twisted.internet import reactor
reactor.connectTCP('localhost', self.port, factory)
return factory.finished_deferred
def verify_blob_completed(blob, blob_size):
self.assertTrue(blob.is_validated())
self.assertEqual(blob_size, blob.length)
d = send_to_server([x[0] for x in self.expected_blobs])
d.addCallback(lambda _: verify_data_on_reflector())
return d