delete old reflector code

This commit is contained in:
Jack Robison 2019-01-22 16:14:35 -05:00 committed by Lex Berezhny
parent 34a725f7ad
commit 273e614715
8 changed files with 0 additions and 1032 deletions

View file

@ -1,65 +0,0 @@
__doc__ = """
Reflector is a protocol to re-host lbry blobs and streams
Client queries and server responses follow, all dicts are encoded as json
############# Handshake request and response #############
Upon connecting, the client sends a version handshake:
{
'version': int,
}
The server replies with the same version
{
'version': int,
}
############# Stream descriptor requests and responses #############
(if sending blobs directly this is skipped)
If the client is reflecting a whole stream, they send a stream descriptor request:
{
'sd_blob_hash': str,
'sd_blob_size': int
}
The server indicates if it's aware of this stream already by requesting (or not requesting)
the stream descriptor blob. If the server has a validated copy of the sd blob, it will
include the needed_blobs field (a list of blob hashes missing from reflector) in the response.
If the server does not have the sd blob the needed_blobs field will not be included, as the
server does not know what blobs it is missing - so the client should send all of the blobs
in the stream.
{
'send_sd_blob': bool
'needed_blobs': list, conditional
}
The client may begin the file transfer of the sd blob if send_sd_blob was True.
If the client sends the blob, after receiving it the server indicates if the
transfer was successful:
{
'received_sd_blob': bool
}
If the transfer was not successful (False), the blob is added to the needed_blobs queue
############# Blob requests and responses #############
A client with blobs to reflect (either populated by the client or by the stream descriptor
response) queries if the server is ready to begin transferring a blob
{
'blob_hash': str,
'blob_size': int
}
The server replies, send_blob will be False if the server has a validated copy of the blob:
{
'send_blob': bool
}
The client may begin the raw blob file transfer if the server replied True.
If the client sends the blob, the server replies:
{
'received_blob': bool
}
If the transfer was not successful (False), the blob is re-added to the needed_blobs queue
Blob requests continue for each of the blobs the client has queued to send, when completed
the client disconnects.
"""

View file

@ -1,206 +0,0 @@
import json
import logging
from twisted.protocols.basic import FileSender
from twisted.internet.protocol import Protocol, ClientFactory
from twisted.internet import defer, error
from lbrynet.extras.reflector.common import IncompleteResponse, REFLECTOR_V2
log = logging.getLogger(__name__)
class BlobReflectorClient(Protocol):
# Protocol stuff
def connectionMade(self):
self.blob_manager = self.factory.blob_manager
self.response_buff = b''
self.outgoing_buff = ''
self.blob_hashes_to_send = self.factory.blobs
self.next_blob_to_send = None
self.blob_read_handle = None
self.received_handshake_response = False
self.protocol_version = self.factory.protocol_version
self.file_sender = None
self.producer = None
self.streaming = False
self.reflected_blobs = []
d = self.send_handshake()
d.addErrback(
lambda err: log.warning("An error occurred immediately: %s", err.getTraceback()))
def dataReceived(self, data):
log.debug('Received %s', data)
self.response_buff += data
try:
msg = self.parse_response(self.response_buff)
except IncompleteResponse:
pass
else:
self.response_buff = b''
d = self.handle_response(msg)
d.addCallback(lambda _: self.send_next_request())
d.addErrback(self.response_failure_handler)
def connectionLost(self, reason):
if reason.check(error.ConnectionDone):
if self.reflected_blobs:
log.info('Finished sending data via reflector')
self.factory.finished_deferred.callback(self.reflected_blobs)
else:
log.info('Reflector finished: %s', reason)
self.factory.finished_deferred.callback(reason)
# IConsumer stuff
def registerProducer(self, producer, streaming):
self.producer = producer
self.streaming = streaming
if self.streaming is False:
from twisted.internet import reactor
reactor.callLater(0, self.producer.resumeProducing)
def unregisterProducer(self):
self.producer = None
def write(self, data):
self.transport.write(data)
if self.producer is not None and self.streaming is False:
from twisted.internet import reactor
reactor.callLater(0, self.producer.resumeProducing)
def send_handshake(self):
log.debug('Sending handshake')
self.write(json.dumps({'version': self.protocol_version}).encode())
return defer.succeed(None)
def parse_response(self, buff):
try:
return json.loads(buff)
except ValueError:
raise IncompleteResponse()
def response_failure_handler(self, err):
log.warning("An error occurred handling the response: %s", err.getTraceback())
def handle_response(self, response_dict):
if self.received_handshake_response is False:
return self.handle_handshake_response(response_dict)
else:
return self.handle_normal_response(response_dict)
def set_not_uploading(self):
if self.next_blob_to_send is not None:
self.read_handle.close()
self.read_handle = None
self.next_blob_to_send = None
self.file_sender = None
return defer.succeed(None)
def start_transfer(self):
assert self.read_handle is not None, \
"self.read_handle was None when trying to start the transfer"
d = self.file_sender.beginFileTransfer(self.read_handle, self)
d.addCallback(lambda _: self.read_handle.close())
return d
def handle_handshake_response(self, response_dict):
if 'version' not in response_dict:
raise ValueError("Need protocol version number!")
server_version = int(response_dict['version'])
if self.protocol_version != server_version:
raise ValueError(f"I can't handle protocol version {self.protocol_version}!")
self.received_handshake_response = True
return defer.succeed(True)
def handle_normal_response(self, response_dict):
if self.file_sender is None: # Expecting Server Info Response
if 'send_blob' not in response_dict:
raise ValueError("I don't know whether to send the blob or not!")
if response_dict['send_blob'] is True:
self.file_sender = FileSender()
return defer.succeed(True)
else:
return self.set_not_uploading()
else: # Expecting Server Blob Response
if 'received_blob' not in response_dict:
raise ValueError("I don't know if the blob made it to the intended destination!")
else:
if response_dict['received_blob']:
self.reflected_blobs.append(self.next_blob_to_send.blob_hash)
return self.set_not_uploading()
def open_blob_for_reading(self, blob):
if blob.get_is_verified():
read_handle = blob.open_for_reading()
if read_handle is not None:
log.debug('Getting ready to send %s', blob.blob_hash)
self.next_blob_to_send = blob
self.read_handle = read_handle
return None
raise ValueError(
f"Couldn't open that blob for some reason. blob_hash: {blob.blob_hash}")
def send_blob_info(self):
log.debug("Send blob info for %s", self.next_blob_to_send.blob_hash)
assert self.next_blob_to_send is not None, "need to have a next blob to send at this point"
log.debug('sending blob info')
self.write(json.dumps({
'blob_hash': self.next_blob_to_send.blob_hash,
'blob_size': self.next_blob_to_send.length
}).encode())
def disconnect(self, err):
self.transport.loseConnection()
def send_next_request(self):
if self.file_sender is not None:
# send the blob
log.debug('Sending the blob')
return self.start_transfer()
elif self.blob_hashes_to_send:
# open the next blob to send
blob_hash = self.blob_hashes_to_send[0]
log.debug('No current blob, sending the next one: %s', blob_hash)
self.blob_hashes_to_send = self.blob_hashes_to_send[1:]
blob = self.blob_manager.get_blob(blob_hash)
self.open_blob_for_reading(blob)
# send the server the next blob hash + length
return self.send_blob_info()
else:
# close connection
log.debug('No more blob hashes, closing connection')
self.transport.loseConnection()
class BlobReflectorClientFactory(ClientFactory):
protocol = BlobReflectorClient
def __init__(self, blob_manager, blobs):
self.protocol_version = REFLECTOR_V2
self.blob_manager = blob_manager
self.blobs = blobs
self.p = None
self.finished_deferred = defer.Deferred()
def buildProtocol(self, addr):
p = self.protocol()
p.factory = self
self.p = p
return p
def startFactory(self):
log.debug('Starting reflector factory')
ClientFactory.startFactory(self)
def startedConnecting(self, connector):
log.debug('Started connecting')
def clientConnectionLost(self, connector, reason):
"""If we get disconnected, reconnect to server."""
log.debug("connection lost: %s", reason.getErrorMessage())
def clientConnectionFailed(self, connector, reason):
log.debug("connection failed: %s", reason.getErrorMessage())

View file

@ -1,346 +0,0 @@
import json
import logging
from twisted.internet.error import ConnectionRefusedError
from twisted.protocols.basic import FileSender
from twisted.internet.protocol import Protocol, ClientFactory
from twisted.internet import defer, error
from lbrynet.extras.compat import f2d
from lbrynet.extras.reflector.common import IncompleteResponse, ReflectorRequestError
from lbrynet.extras.reflector.common import REFLECTOR_V1, REFLECTOR_V2
log = logging.getLogger(__name__)
class EncryptedFileReflectorClient(Protocol):
# Protocol stuff
def connectionMade(self):
log.debug("Connected to reflector")
self.response_buff = b''
self.outgoing_buff = b''
self.blob_hashes_to_send = []
self.failed_blob_hashes = []
self.next_blob_to_send = None
self.read_handle = None
self.sent_stream_info = False
self.received_descriptor_response = False
self.received_server_version = False
self.server_version = None
self.stream_descriptor = None
self.descriptor_needed = None
self.needed_blobs = []
self.reflected_blobs = []
self.file_sender = None
self.producer = None
self.streaming = False
self.blob_manager = self.factory.blob_manager
self.protocol_version = self.factory.protocol_version
self.stream_hash = self.factory.stream_hash
self.sd_hash = self.factory.sd_hash
d = self.load_descriptor()
d.addCallback(lambda _: self.send_handshake())
d.addErrback(lambda err: log.warning("An error occurred immediately: %s", err.getTraceback()))
def dataReceived(self, data):
self.response_buff += data
try:
msg = self.parse_response(self.response_buff)
except IncompleteResponse:
pass
else:
self.response_buff = b''
d = self.handle_response(msg)
d.addCallback(lambda _: self.send_next_request())
d.addErrback(self.response_failure_handler)
def store_result(self, result):
if not self.needed_blobs or len(self.reflected_blobs) == len(self.needed_blobs):
reflected = True
else:
reflected = False
d = f2d(self.blob_manager.storage.update_reflected_stream(
self.sd_hash, self.transport.getPeer().host, reflected
))
d.addCallback(lambda _: result)
return d
def connectionLost(self, reason):
# make sure blob file readers get closed
self.set_not_uploading()
if reason.check(error.ConnectionDone):
if not self.needed_blobs:
log.info("Reflector has all blobs for %s", self.stream_descriptor)
elif not self.reflected_blobs:
log.info("No more completed blobs for %s to reflect, %i are still needed",
self.stream_descriptor, len(self.needed_blobs))
else:
log.info('Finished sending reflector %i blobs for %s',
len(self.reflected_blobs), self.stream_descriptor)
result = self.reflected_blobs
elif reason.check(error.ConnectionLost):
log.warning("Stopped reflecting %s after sending %i blobs",
self.stream_descriptor, len(self.reflected_blobs))
result = self.reflected_blobs
else:
log.info('Reflector finished for %s: %s', self.stream_descriptor,
reason)
result = reason
self.factory.finished_deferred.addCallback(self.store_result)
self.factory.finished_deferred.callback(result)
# IConsumer stuff
def registerProducer(self, producer, streaming):
self.producer = producer
self.streaming = streaming
if self.streaming is False:
from twisted.internet import reactor
reactor.callLater(0, self.producer.resumeProducing)
def unregisterProducer(self):
self.producer = None
def write(self, data):
self.transport.write(data)
if self.producer is not None and self.streaming is False:
from twisted.internet import reactor
reactor.callLater(0, self.producer.resumeProducing)
def get_validated_blobs(self, blobs_in_stream):
def get_blobs(blobs):
for crypt_blob in blobs:
if crypt_blob.blob_hash and crypt_blob.length:
yield self.blob_manager.get_blob(crypt_blob.blob_hash, crypt_blob.length)
return [blob for blob in get_blobs(blobs_in_stream) if blob.get_is_verified()]
def set_blobs_to_send(self, blobs_to_send):
for blob in blobs_to_send:
if blob.blob_hash not in self.blob_hashes_to_send:
self.blob_hashes_to_send.append(blob.blob_hash)
def get_blobs_to_send(self):
def _show_missing_blobs(filtered):
if filtered:
needs_desc = "" if not self.descriptor_needed else "descriptor and "
log.info("Reflector needs %s%i blobs for stream",
needs_desc,
len(filtered))
return filtered
d = f2d(self.factory.blob_manager.storage.get_blobs_for_stream(self.stream_hash))
d.addCallback(self.get_validated_blobs)
if not self.descriptor_needed:
d.addCallback(lambda filtered:
[blob for blob in filtered if blob.blob_hash in self.needed_blobs])
d.addCallback(_show_missing_blobs)
d.addCallback(self.set_blobs_to_send)
d.addCallback(lambda _: None if self.descriptor_needed else self.set_not_uploading())
return d
def send_request(self, request_dict):
self.write(json.dumps(request_dict).encode())
def send_handshake(self):
self.send_request({'version': self.protocol_version})
@defer.inlineCallbacks
def load_descriptor(self):
if self.sd_hash:
self.stream_descriptor = yield self.factory.blob_manager.get_blob(self.sd_hash)
else:
raise ValueError("no sd hash for stream %s" % self.stream_hash)
def parse_response(self, buff):
try:
return json.loads(buff)
except ValueError:
raise IncompleteResponse()
def response_failure_handler(self, err):
log.warning("An error occurred handling the response: %s", err.getTraceback())
def handle_response(self, response_dict):
if not self.received_server_version:
return self.handle_handshake_response(response_dict)
elif not self.received_descriptor_response and self.server_version == REFLECTOR_V2:
return self.handle_descriptor_response(response_dict)
else:
return self.handle_normal_response(response_dict)
def set_not_uploading(self):
if self.next_blob_to_send is not None:
log.debug("Close %s", self.next_blob_to_send)
self.read_handle.close()
self.read_handle = None
self.next_blob_to_send = None
if self.file_sender is not None:
self.file_sender.stopProducing()
self.file_sender = None
return defer.succeed(None)
def start_transfer(self):
assert self.read_handle is not None, \
"self.read_handle was None when trying to start the transfer"
d = self.file_sender.beginFileTransfer(self.read_handle, self)
d.addCallback(lambda _: self.read_handle.close())
return d
def handle_handshake_response(self, response_dict):
if 'version' not in response_dict:
raise ValueError("Need protocol version number!")
self.server_version = int(response_dict['version'])
if self.server_version not in [REFLECTOR_V1, REFLECTOR_V2]:
raise ValueError(f"I can't handle protocol version {self.server_version}!")
self.received_server_version = True
return defer.succeed(True)
def handle_descriptor_response(self, response_dict):
if self.file_sender is None: # Expecting Server Info Response
if 'send_sd_blob' not in response_dict:
raise ReflectorRequestError("I don't know whether to send the sd blob or not!")
if response_dict['send_sd_blob'] is True:
self.file_sender = FileSender()
else:
self.received_descriptor_response = True
self.descriptor_needed = response_dict['send_sd_blob']
self.needed_blobs = response_dict.get('needed_blobs', [])
return self.get_blobs_to_send()
else: # Expecting Server Blob Response
if 'received_sd_blob' not in response_dict:
raise ValueError("I don't know if the sd blob made it to the intended destination!")
else:
self.received_descriptor_response = True
disconnect = False
if response_dict['received_sd_blob']:
self.reflected_blobs.append(self.next_blob_to_send.blob_hash)
log.info("Sent reflector descriptor %s", self.next_blob_to_send)
else:
log.warning("Reflector failed to receive descriptor %s",
self.next_blob_to_send)
disconnect = True
d = self.set_not_uploading()
if disconnect:
d.addCallback(lambda _: self.transport.loseConnection())
return d
def handle_normal_response(self, response_dict):
if self.file_sender is None: # Expecting Server Info Response
if 'send_blob' not in response_dict:
raise ValueError("I don't know whether to send the blob or not!")
if response_dict['send_blob'] is True:
self.file_sender = FileSender()
return defer.succeed(True)
else:
log.info("Reflector already has %s", self.next_blob_to_send)
return self.set_not_uploading()
else: # Expecting Server Blob Response
if 'received_blob' not in response_dict:
raise ValueError("I don't know if the blob made it to the intended destination!")
else:
if response_dict['received_blob']:
self.reflected_blobs.append(self.next_blob_to_send.blob_hash)
log.debug("Sent reflector blob %s", self.next_blob_to_send)
else:
log.warning("Reflector failed to receive blob %s", self.next_blob_to_send)
return self.set_not_uploading()
def open_blob_for_reading(self, blob):
if blob.get_is_verified():
read_handle = blob.open_for_reading()
if read_handle is not None:
log.debug('Getting ready to send %s', blob.blob_hash)
self.next_blob_to_send = blob
self.read_handle = read_handle
return defer.succeed(None)
return defer.fail(ValueError(
f"Couldn't open that blob for some reason. blob_hash: {blob.blob_hash}"))
def send_blob_info(self):
assert self.next_blob_to_send is not None, "need to have a next blob to send at this point"
r = {
'blob_hash': self.next_blob_to_send.blob_hash,
'blob_size': self.next_blob_to_send.length
}
self.send_request(r)
def send_descriptor_info(self):
assert self.stream_descriptor is not None, "need to have a sd blob to send at this point"
r = {
'sd_blob_hash': self.stream_descriptor.blob_hash,
'sd_blob_size': self.stream_descriptor.length
}
self.sent_stream_info = True
self.send_request(r)
def skip_missing_blob(self, err, blob_hash):
err.trap(ValueError)
if blob_hash not in self.failed_blob_hashes:
log.warning("Failed to reflect blob %s, reason: %s",
str(blob_hash)[:16], err.getTraceback())
self.blob_hashes_to_send.append(blob_hash)
self.failed_blob_hashes.append(blob_hash)
else:
log.warning("Failed second try reflecting blob %s, giving up, reason: %s",
str(blob_hash)[:16], err.getTraceback())
def send_next_request(self):
if self.file_sender is not None:
# send the blob
return self.start_transfer()
elif not self.sent_stream_info:
# open the sd blob to send
blob = self.stream_descriptor
d = self.open_blob_for_reading(blob)
d.addCallbacks(lambda _: self.send_descriptor_info(),
lambda err: self.skip_missing_blob(err, blob.blob_hash))
return d
elif self.blob_hashes_to_send:
# open the next blob to send
blob_hash = self.blob_hashes_to_send[0]
self.blob_hashes_to_send = self.blob_hashes_to_send[1:]
d = defer.succeed(self.blob_manager.get_blob(blob_hash))
d.addCallback(self.open_blob_for_reading)
d.addCallbacks(lambda _: self.send_blob_info(),
lambda err: self.skip_missing_blob(err, blob.blob_hash))
return d
# close connection
self.transport.loseConnection()
class EncryptedFileReflectorClientFactory(ClientFactory):
protocol = EncryptedFileReflectorClient
protocol_version = REFLECTOR_V2
def __init__(self, blob_manager, stream_hash, sd_hash):
self.blob_manager = blob_manager
self.stream_hash = stream_hash
self.sd_hash = sd_hash
self.p = None
self.finished_deferred = defer.Deferred()
def buildProtocol(self, addr):
p = self.protocol()
p.factory = self
self.p = p
return p
def startFactory(self):
log.debug('Starting reflector factory')
ClientFactory.startFactory(self)
def startedConnecting(self, connector):
log.debug('Connecting to reflector')
def clientConnectionLost(self, connector, reason):
"""If we get disconnected, reconnect to server."""
def clientConnectionFailed(self, connector, reason):
if reason.check(ConnectionRefusedError):
log.warning("Could not connect to reflector server")
else:
log.error("Reflector connection failed: %s", reason)

View file

@ -1,27 +0,0 @@
REFLECTOR_V1 = 0
REFLECTOR_V2 = 1
class ReflectorClientVersionError(Exception):
"""
Raised by reflector server if client sends an incompatible or unknown version
"""
class ReflectorRequestError(Exception):
"""
Raised by reflector server if client sends a message without the required fields
"""
class ReflectorRequestDecodeError(Exception):
"""
Raised by reflector server if client sends an invalid json request
"""
class IncompleteResponse(Exception):
"""
Raised by reflector server when client sends a portion of a json request,
used buffering the incoming request
"""

View file

@ -1,63 +0,0 @@
from twisted.internet import reactor, defer
from lbrynet.extras.reflector.client.client import EncryptedFileReflectorClientFactory
from lbrynet.extras.reflector.client.blob import BlobReflectorClientFactory
def _is_ip(host):
try:
if len(host.split(".")) == 4 and all([0 <= int(x) <= 255 for x in host.split(".")]):
return True
return False
except ValueError:
return False
@defer.inlineCallbacks
def resolve(host):
if _is_ip(host):
ip = host
else:
ip = yield reactor.resolve(host)
defer.returnValue(ip)
@defer.inlineCallbacks
def _reflect_stream(blob_manager, stream_hash, sd_hash, reflector_server):
reflector_address, reflector_port = reflector_server[0], reflector_server[1]
factory = EncryptedFileReflectorClientFactory(blob_manager, stream_hash, sd_hash)
ip = yield resolve(reflector_address)
yield reactor.connectTCP(ip, reflector_port, factory)
result = yield factory.finished_deferred
defer.returnValue(result)
def _reflect_file(lbry_file, reflector_server):
return _reflect_stream(lbry_file.blob_manager, lbry_file.stream_hash, lbry_file.sd_hash, reflector_server)
@defer.inlineCallbacks
def _reflect_blobs(blob_manager, blob_hashes, reflector_server):
reflector_address, reflector_port = reflector_server[0], reflector_server[1]
factory = BlobReflectorClientFactory(blob_manager, blob_hashes)
ip = yield resolve(reflector_address)
yield reactor.connectTCP(ip, reflector_port, factory)
result = yield factory.finished_deferred
return result
def reflect_file(lbry_file, reflector_server):
if len(reflector_server.split(":")) == 2:
host, port = tuple(reflector_server.split(":"))
reflector_server = host, int(port)
else:
reflector_server = reflector_server, 5566
return _reflect_file(lbry_file, reflector_server)
def reflect_blob_hashes(blob_hashes, blob_manager, reflector_server):
if len(reflector_server.split(":")) == 2:
host, port = tuple(reflector_server.split(":"))
reflector_server = host, int(port)
else:
reflector_server = reflector_server, 5566
return _reflect_blobs(blob_manager, blob_hashes, reflector_server)

View file

@ -1,325 +0,0 @@
import logging
import json
from twisted.python import failure
from twisted.internet import error, defer
from twisted.internet.protocol import Protocol, ServerFactory
from lbrynet.blob.blob_file import is_valid_blobhash
from lbrynet.p2p.Error import DownloadCanceledError, InvalidBlobHashError
from lbrynet.p2p.StreamDescriptor import BlobStreamDescriptorReader
from lbrynet.p2p.StreamDescriptor import save_sd_info
from lbrynet.extras.reflector.common import REFLECTOR_V1, REFLECTOR_V2
from lbrynet.extras.reflector.common import ReflectorRequestError, ReflectorClientVersionError
log = logging.getLogger(__name__)
MAXIMUM_QUERY_SIZE = 200
SEND_SD_BLOB = 'send_sd_blob'
SEND_BLOB = 'send_blob'
RECEIVED_SD_BLOB = 'received_sd_blob'
RECEIVED_BLOB = 'received_blob'
NEEDED_BLOBS = 'needed_blobs'
VERSION = 'version'
BLOB_SIZE = 'blob_size'
BLOB_HASH = 'blob_hash'
SD_BLOB_SIZE = 'sd_blob_size'
SD_BLOB_HASH = 'sd_blob_hash'
class ReflectorServer(Protocol):
def connectionMade(self):
peer_info = self.transport.getPeer()
log.debug('Connection made to %s', peer_info)
self.peer = self.factory.peer_manager.get_peer(peer_info.host, peer_info.port)
self.blob_manager = self.factory.blob_manager
self.storage = self.factory.blob_manager.storage
self.lbry_file_manager = self.factory.lbry_file_manager
self.protocol_version = self.factory.protocol_version
self.received_handshake = False
self.peer_version = None
self.receiving_blob = False
self.incoming_blob = None
self.blob_finished_d = None
self.request_buff = b""
self.blob_writer = None
def connectionLost(self, reason=failure.Failure(error.ConnectionDone())):
log.info("Reflector upload from %s finished" % self.peer.host)
def handle_error(self, err):
log.error(err.getTraceback())
self.transport.loseConnection()
def send_response(self, response_dict):
self.transport.write(json.dumps(response_dict).encode())
############################
# Incoming blob file stuff #
############################
def clean_up_failed_upload(self, err, blob):
log.warning("Failed to receive %s", blob)
if err.check(DownloadCanceledError):
self.blob_manager.delete_blobs([blob.blob_hash])
else:
log.exception(err)
@defer.inlineCallbacks
def _on_completed_blob(self, blob, response_key):
yield self.blob_manager.blob_completed(blob, should_announce=False)
if response_key == RECEIVED_SD_BLOB:
sd_info = yield BlobStreamDescriptorReader(blob).get_info()
yield save_sd_info(self.blob_manager, blob.blob_hash, sd_info)
yield self.blob_manager.set_should_announce(blob.blob_hash, True)
else:
stream_hash = yield self.storage.get_stream_of_blob(blob.blob_hash)
if stream_hash is not None:
blob_num = yield self.storage.get_blob_num_by_hash(stream_hash,
blob.blob_hash)
if blob_num == 0:
yield self.blob_manager.set_should_announce(blob.blob_hash, True)
yield self.close_blob()
log.info("Received %s", blob)
yield self.send_response({response_key: True})
@defer.inlineCallbacks
def _on_failed_blob(self, err, response_key):
yield self.clean_up_failed_upload(err, self.incoming_blob)
yield self.send_response({response_key: False})
def handle_incoming_blob(self, response_key):
"""
Open blob for writing and send a response indicating if the transfer was
successful when finished.
response_key will either be received_blob or received_sd_blob
"""
blob = self.incoming_blob
self.blob_writer, self.blob_finished_d = blob.open_for_writing(self.peer)
self.blob_finished_d.addCallback(self._on_completed_blob, response_key)
self.blob_finished_d.addErrback(self._on_failed_blob, response_key)
def close_blob(self):
self.blob_writer.close()
self.blob_writer = None
self.blob_finished_d = None
self.incoming_blob = None
self.receiving_blob = False
####################
# Request handling #
####################
def dataReceived(self, data):
if self.receiving_blob:
self.blob_writer.write(data)
else:
log.debug('Not yet receiving blob, data needs further processing')
self.request_buff += data
msg, extra_data = self._get_valid_response(self.request_buff)
if msg is not None:
self.request_buff = b''
d = self.handle_request(msg)
d.addErrback(self.handle_error)
if self.receiving_blob and extra_data:
log.debug('Writing extra data to blob')
self.blob_writer.write(extra_data)
def _get_valid_response(self, response_msg):
extra_data = None
response = None
curr_pos = 0
while not self.receiving_blob:
next_close_paren = response_msg.find(b'}', curr_pos)
if next_close_paren != -1:
curr_pos = next_close_paren + 1
try:
response = json.loads(response_msg[:curr_pos])
except ValueError:
if curr_pos > MAXIMUM_QUERY_SIZE:
raise ValueError("Error decoding response: %s" % str(response_msg))
else:
pass
else:
extra_data = response_msg[curr_pos:]
break
else:
break
return response, extra_data
def need_handshake(self):
return self.received_handshake is False
def is_descriptor_request(self, request_dict):
if SD_BLOB_HASH not in request_dict or SD_BLOB_SIZE not in request_dict:
return False
if not is_valid_blobhash(request_dict[SD_BLOB_HASH]):
raise InvalidBlobHashError(request_dict[SD_BLOB_HASH])
return True
def is_blob_request(self, request_dict):
if BLOB_HASH not in request_dict or BLOB_SIZE not in request_dict:
return False
if not is_valid_blobhash(request_dict[BLOB_HASH]):
raise InvalidBlobHashError(request_dict[BLOB_HASH])
return True
def handle_request(self, request_dict):
if self.need_handshake():
return self.handle_handshake(request_dict)
if self.is_descriptor_request(request_dict):
return self.handle_descriptor_request(request_dict)
if self.is_blob_request(request_dict):
return self.handle_blob_request(request_dict)
raise ReflectorRequestError("Invalid request")
def handle_handshake(self, request_dict):
"""
Upon connecting, the client sends a version handshake:
{
'version': int,
}
The server replies with the same version if it is supported
{
'version': int,
}
"""
if VERSION not in request_dict:
raise ReflectorRequestError("Client should send version")
if int(request_dict[VERSION]) not in [REFLECTOR_V1, REFLECTOR_V2]:
raise ReflectorClientVersionError("Unknown version: %i" % int(request_dict[VERSION]))
self.peer_version = int(request_dict[VERSION])
log.debug('Handling handshake for client version %i', self.peer_version)
self.received_handshake = True
return self.send_handshake_response()
def send_handshake_response(self):
d = defer.succeed({VERSION: self.peer_version})
d.addCallback(self.send_response)
return d
def handle_descriptor_request(self, request_dict):
"""
If the client is reflecting a whole stream, they send a stream descriptor request:
{
'sd_blob_hash': str,
'sd_blob_size': int
}
The server indicates if it's aware of this stream already by requesting (or not requesting)
the stream descriptor blob. If the server has a validated copy of the sd blob, it will
include the needed_blobs field (a list of blob hashes missing from reflector) in the
response. If the server does not have the sd blob the needed_blobs field will not be
included, as the server does not know what blobs it is missing - so the client should send
all of the blobs in the stream.
{
'send_sd_blob': bool
'needed_blobs': list, conditional
}
The client may begin the file transfer of the sd blob if send_sd_blob was True.
If the client sends the blob, after receiving it the server indicates if the
transfer was successful:
{
'received_sd_blob': bool
}
"""
sd_blob_hash = request_dict[SD_BLOB_HASH]
sd_blob_size = request_dict[SD_BLOB_SIZE]
if self.blob_writer is None:
d = self.blob_manager.get_blob(sd_blob_hash, sd_blob_size)
d.addCallback(self.get_descriptor_response)
d.addCallback(self.send_response)
else:
self.receiving_blob = True
d = self.blob_finished_d
return d
@defer.inlineCallbacks
def get_descriptor_response(self, sd_blob):
if sd_blob.get_is_verified():
sd_info = yield BlobStreamDescriptorReader(sd_blob).get_info()
yield save_sd_info(self.blob_manager, sd_blob.blob_hash, sd_info)
yield self.storage.verify_will_announce_head_and_sd_blobs(sd_info['stream_hash'])
response = yield self.request_needed_blobs({SEND_SD_BLOB: False}, sd_info['stream_hash'])
else:
self.incoming_blob = sd_blob
self.receiving_blob = True
self.handle_incoming_blob(RECEIVED_SD_BLOB)
response = {SEND_SD_BLOB: True}
defer.returnValue(response)
@defer.inlineCallbacks
def request_needed_blobs(self, response, stream_hash):
needed_blobs = yield self.storage.get_pending_blobs_for_stream(stream_hash)
response.update({NEEDED_BLOBS: needed_blobs})
defer.returnValue(response)
def handle_blob_request(self, request_dict):
"""
A client queries if the server will accept a blob
{
'blob_hash': str,
'blob_size': int
}
The server replies, send_blob will be False if the server has a validated copy of the blob:
{
'send_blob': bool
}
The client may begin the raw blob file transfer if the server replied True.
If the client sends the blob, the server replies:
{
'received_blob': bool
}
"""
blob_hash = request_dict[BLOB_HASH]
blob_size = request_dict[BLOB_SIZE]
if self.blob_writer is None:
log.debug('Received info for blob: %s', blob_hash[:16])
d = self.blob_manager.get_blob(blob_hash, blob_size)
d.addCallback(self.get_blob_response)
d.addCallback(self.send_response)
else:
log.debug('blob is already open')
self.receiving_blob = True
d = self.blob_finished_d
return d
def get_blob_response(self, blob):
if blob.get_is_verified():
return defer.succeed({SEND_BLOB: False})
else:
self.incoming_blob = blob
self.receiving_blob = True
self.handle_incoming_blob(RECEIVED_BLOB)
d = defer.succeed({SEND_BLOB: True})
return d
class ReflectorServerFactory(ServerFactory):
protocol = ReflectorServer
def __init__(self, peer_manager, blob_manager, lbry_file_manager):
self.peer_manager = peer_manager
self.blob_manager = blob_manager
self.lbry_file_manager = lbry_file_manager
self.protocol_version = REFLECTOR_V2
def buildProtocol(self, addr):
log.debug('Creating a protocol for %s', addr)
return ServerFactory.buildProtocol(self, addr)