forked from LBRYCommunity/lbry-sdk
reflector v2
-add {‘sd_blob_hash’: …, ‘sd_blob_size ‘: …} query type with {‘send_sd_blob’: True/False, ‘needed_blobs’: []} response this allows the reflector client to know how much of a stream reflector already has covered, as to minimize the number of subsequent requests and prevent streams from being partially reflected -remove empty {} request
This commit is contained in:
parent
2126f69c93
commit
6fae07d29e
5 changed files with 409 additions and 181 deletions
|
@ -5,7 +5,7 @@ from twisted.protocols.basic import FileSender
|
|||
from twisted.internet.protocol import Protocol, ClientFactory
|
||||
from twisted.internet import defer, error
|
||||
|
||||
from lbrynet.reflector.common import IncompleteResponse
|
||||
from lbrynet.reflector.common import IncompleteResponse, REFLECTOR_V2
|
||||
|
||||
|
||||
log = logging.getLogger(__name__)
|
||||
|
@ -22,7 +22,7 @@ class BlobReflectorClient(Protocol):
|
|||
self.next_blob_to_send = None
|
||||
self.blob_read_handle = None
|
||||
self.received_handshake_response = False
|
||||
self.protocol_version = None
|
||||
self.protocol_version = self.factory.protocol_version
|
||||
self.file_sender = None
|
||||
self.producer = None
|
||||
self.streaming = False
|
||||
|
@ -32,7 +32,7 @@ class BlobReflectorClient(Protocol):
|
|||
lambda err: log.warning("An error occurred immediately: %s", err.getTraceback()))
|
||||
|
||||
def dataReceived(self, data):
|
||||
log.debug('Recieved %s', data)
|
||||
log.debug('Received %s', data)
|
||||
self.response_buff += data
|
||||
try:
|
||||
msg = self.parse_response(self.response_buff)
|
||||
|
@ -74,7 +74,7 @@ class BlobReflectorClient(Protocol):
|
|||
|
||||
def send_handshake(self):
|
||||
log.debug('Sending handshake')
|
||||
self.write(json.dumps({'version': 0}))
|
||||
self.write(json.dumps({'version': self.protocol_version}))
|
||||
return defer.succeed(None)
|
||||
|
||||
def parse_response(self, buff):
|
||||
|
@ -102,7 +102,6 @@ class BlobReflectorClient(Protocol):
|
|||
|
||||
def start_transfer(self):
|
||||
self.sent_blobs = True
|
||||
self.write(json.dumps({}))
|
||||
assert self.read_handle is not None, \
|
||||
"self.read_handle was None when trying to start the transfer"
|
||||
d = self.file_sender.beginFileTransfer(self.read_handle, self)
|
||||
|
@ -111,8 +110,8 @@ class BlobReflectorClient(Protocol):
|
|||
def handle_handshake_response(self, response_dict):
|
||||
if 'version' not in response_dict:
|
||||
raise ValueError("Need protocol version number!")
|
||||
self.protocol_version = int(response_dict['version'])
|
||||
if self.protocol_version != 0:
|
||||
server_version = int(response_dict['version'])
|
||||
if self.protocol_version != server_version:
|
||||
raise ValueError("I can't handle protocol version {}!".format(self.protocol_version))
|
||||
self.received_handshake_response = True
|
||||
return defer.succeed(True)
|
||||
|
@ -184,6 +183,7 @@ class BlobReflectorClientFactory(ClientFactory):
|
|||
protocol = BlobReflectorClient
|
||||
|
||||
def __init__(self, blob_manager, blobs):
|
||||
self.protocol_version = REFLECTOR_V2
|
||||
self.blob_manager = blob_manager
|
||||
self.blobs = blobs
|
||||
self.p = None
|
||||
|
|
|
@ -1,61 +1,13 @@
|
|||
"""
|
||||
The reflector protocol (all dicts encoded in json):
|
||||
|
||||
Client Handshake (sent once per connection, at the start of the connection):
|
||||
|
||||
{
|
||||
'version': 0,
|
||||
}
|
||||
|
||||
|
||||
Server Handshake (sent once per connection, after receiving the client handshake):
|
||||
|
||||
{
|
||||
'version': 0,
|
||||
}
|
||||
|
||||
|
||||
Client Info Request:
|
||||
|
||||
{
|
||||
'blob_hash': "<blob_hash>",
|
||||
'blob_size': <blob_size>
|
||||
}
|
||||
|
||||
|
||||
Server Info Response (sent in response to Client Info Request):
|
||||
|
||||
{
|
||||
'send_blob': True|False
|
||||
}
|
||||
|
||||
If response is 'YES', client may send a Client Blob Request or a Client Info Request.
|
||||
If response is 'NO', client may only send a Client Info Request
|
||||
|
||||
|
||||
Client Blob Request:
|
||||
|
||||
{} # Yes, this is an empty dictionary, in case something needs to go here in the future
|
||||
<raw blob_data> # this blob data must match the info sent in the most recent Client Info Request
|
||||
|
||||
|
||||
Server Blob Response (sent in response to Client Blob Request):
|
||||
{
|
||||
'received_blob': True
|
||||
}
|
||||
|
||||
Client may now send another Client Info Request
|
||||
|
||||
"""
|
||||
import json
|
||||
import logging
|
||||
|
||||
from twisted.internet.error import ConnectionRefusedError
|
||||
from twisted.protocols.basic import FileSender
|
||||
from twisted.internet.protocol import Protocol, ClientFactory
|
||||
from twisted.internet import defer, error
|
||||
|
||||
from lbrynet.reflector.common import IncompleteResponse
|
||||
|
||||
from lbrynet.reflector.common import IncompleteResponse, ReflectorRequestError
|
||||
from lbrynet.reflector.common import REFLECTOR_V1, REFLECTOR_V2
|
||||
|
||||
log = logging.getLogger(__name__)
|
||||
|
||||
|
@ -63,24 +15,31 @@ log = logging.getLogger(__name__)
|
|||
class EncryptedFileReflectorClient(Protocol):
|
||||
# Protocol stuff
|
||||
def connectionMade(self):
|
||||
log.debug("Connected to reflector")
|
||||
self.blob_manager = self.factory.blob_manager
|
||||
self.response_buff = ''
|
||||
self.outgoing_buff = ''
|
||||
self.blob_hashes_to_send = []
|
||||
self.next_blob_to_send = None
|
||||
self.blob_read_handle = None
|
||||
self.received_handshake_response = False
|
||||
self.protocol_version = None
|
||||
self.read_handle = None
|
||||
self.sent_stream_info = False
|
||||
self.received_descriptor_response = False
|
||||
self.protocol_version = self.factory.protocol_version
|
||||
self.received_server_version = False
|
||||
self.server_version = None
|
||||
self.stream_descriptor = None
|
||||
self.descriptor_needed = None
|
||||
self.needed_blobs = []
|
||||
self.reflected_blobs = []
|
||||
self.file_sender = None
|
||||
self.producer = None
|
||||
self.streaming = False
|
||||
d = self.get_blobs_to_send(self.factory.stream_info_manager, self.factory.stream_hash)
|
||||
d = self.load_descriptor()
|
||||
d.addCallback(lambda _: self.send_handshake())
|
||||
d.addErrback(
|
||||
lambda err: log.warning("An error occurred immediately: %s", err.getTraceback()))
|
||||
|
||||
def dataReceived(self, data):
|
||||
log.debug('Recieved %s', data)
|
||||
self.response_buff += data
|
||||
try:
|
||||
msg = self.parse_response(self.response_buff)
|
||||
|
@ -94,8 +53,8 @@ class EncryptedFileReflectorClient(Protocol):
|
|||
|
||||
def connectionLost(self, reason):
|
||||
if reason.check(error.ConnectionDone):
|
||||
log.info('Finished sending data via reflector')
|
||||
self.factory.finished_deferred.callback(True)
|
||||
log.debug('Finished sending data via reflector')
|
||||
self.factory.finished_deferred.callback(self.reflected_blobs)
|
||||
else:
|
||||
log.debug('Reflector finished: %s', reason)
|
||||
self.factory.finished_deferred.callback(reason)
|
||||
|
@ -118,31 +77,54 @@ class EncryptedFileReflectorClient(Protocol):
|
|||
from twisted.internet import reactor
|
||||
reactor.callLater(0, self.producer.resumeProducing)
|
||||
|
||||
def get_blobs_to_send(self, stream_info_manager, stream_hash):
|
||||
log.debug('Getting blobs from stream hash: %s', stream_hash)
|
||||
d = stream_info_manager.get_blobs_for_stream(stream_hash)
|
||||
def get_validated_blobs(self, blobs_in_stream):
|
||||
def get_blobs(blobs):
|
||||
for (blob, _, _, blob_len) in blobs:
|
||||
if blob:
|
||||
yield self.blob_manager.get_blob(blob, True, blob_len)
|
||||
|
||||
def set_blobs(blob_hashes):
|
||||
for blob_hash, position, iv, length in blob_hashes:
|
||||
if blob_hash is not None:
|
||||
self.blob_hashes_to_send.append(blob_hash)
|
||||
log.debug("Preparing to send %i blobs", len(self.blob_hashes_to_send))
|
||||
dl = defer.DeferredList(list(get_blobs(blobs_in_stream)), consumeErrors=True)
|
||||
dl.addCallback(lambda blobs: [blob for r, blob in blobs if r and blob.is_validated()])
|
||||
return dl
|
||||
|
||||
d.addCallback(set_blobs)
|
||||
def set_blobs_to_send(self, blobs_to_send):
|
||||
for blob in blobs_to_send:
|
||||
if blob not in self.blob_hashes_to_send:
|
||||
self.blob_hashes_to_send.append(blob)
|
||||
|
||||
d.addCallback(lambda _: stream_info_manager.get_sd_blob_hashes_for_stream(stream_hash))
|
||||
def get_blobs_to_send(self):
|
||||
def _show_missing_blobs(filtered):
|
||||
if filtered:
|
||||
needs_desc = "" if not self.descriptor_needed else "descriptor and "
|
||||
log.info("Reflector needs %s%i blobs for %s",
|
||||
needs_desc,
|
||||
len(filtered),
|
||||
str(self.stream_descriptor)[:16])
|
||||
return filtered
|
||||
|
||||
def set_sd_blobs(sd_blob_hashes):
|
||||
for sd_blob_hash in sd_blob_hashes:
|
||||
self.blob_hashes_to_send.append(sd_blob_hash)
|
||||
log.debug("Preparing to send %i sd blobs", len(sd_blob_hashes))
|
||||
|
||||
d.addCallback(set_sd_blobs)
|
||||
d = self.factory.stream_info_manager.get_blobs_for_stream(self.factory.stream_hash)
|
||||
d.addCallback(self.get_validated_blobs)
|
||||
if not self.descriptor_needed:
|
||||
d.addCallback(lambda filtered:
|
||||
[blob for blob in filtered if blob.blob_hash in self.needed_blobs])
|
||||
d.addCallback(_show_missing_blobs)
|
||||
d.addCallback(self.set_blobs_to_send)
|
||||
return d
|
||||
|
||||
def send_request(self, request_dict):
|
||||
self.write(json.dumps(request_dict))
|
||||
|
||||
def send_handshake(self):
|
||||
log.debug('Sending handshake')
|
||||
self.write(json.dumps({'version': 0}))
|
||||
self.send_request({'version': self.protocol_version})
|
||||
|
||||
def load_descriptor(self):
|
||||
def _save_descriptor_blob(sd_blob):
|
||||
self.stream_descriptor = sd_blob
|
||||
|
||||
d = self.factory.stream_info_manager.get_sd_blob_hashes_for_stream(self.factory.stream_hash)
|
||||
d.addCallback(lambda sd: self.factory.blob_manager.get_blob(sd[0], True))
|
||||
d.addCallback(_save_descriptor_blob)
|
||||
return d
|
||||
|
||||
def parse_response(self, buff):
|
||||
try:
|
||||
|
@ -154,8 +136,10 @@ class EncryptedFileReflectorClient(Protocol):
|
|||
log.warning("An error occurred handling the response: %s", err.getTraceback())
|
||||
|
||||
def handle_response(self, response_dict):
|
||||
if self.received_handshake_response is False:
|
||||
if not self.received_server_version:
|
||||
return self.handle_handshake_response(response_dict)
|
||||
elif not self.received_descriptor_response and self.server_version == REFLECTOR_V2:
|
||||
return self.handle_descriptor_response(response_dict)
|
||||
else:
|
||||
return self.handle_normal_response(response_dict)
|
||||
|
||||
|
@ -168,7 +152,6 @@ class EncryptedFileReflectorClient(Protocol):
|
|||
return defer.succeed(None)
|
||||
|
||||
def start_transfer(self):
|
||||
self.write(json.dumps({}))
|
||||
assert self.read_handle is not None, \
|
||||
"self.read_handle was None when trying to start the transfer"
|
||||
d = self.file_sender.beginFileTransfer(self.read_handle, self)
|
||||
|
@ -177,12 +160,37 @@ class EncryptedFileReflectorClient(Protocol):
|
|||
def handle_handshake_response(self, response_dict):
|
||||
if 'version' not in response_dict:
|
||||
raise ValueError("Need protocol version number!")
|
||||
self.protocol_version = int(response_dict['version'])
|
||||
if self.protocol_version != 0:
|
||||
raise ValueError("I can't handle protocol version {}!".format(self.protocol_version))
|
||||
self.received_handshake_response = True
|
||||
self.server_version = int(response_dict['version'])
|
||||
if self.server_version not in [REFLECTOR_V1, REFLECTOR_V2]:
|
||||
raise ValueError("I can't handle protocol version {}!".format(self.server_version))
|
||||
self.received_server_version = True
|
||||
return defer.succeed(True)
|
||||
|
||||
def handle_descriptor_response(self, response_dict):
|
||||
if self.file_sender is None: # Expecting Server Info Response
|
||||
if 'send_sd_blob' not in response_dict:
|
||||
raise ReflectorRequestError("I don't know whether to send the sd blob or not!")
|
||||
if response_dict['send_sd_blob'] is True:
|
||||
self.file_sender = FileSender()
|
||||
else:
|
||||
self.received_descriptor_response = True
|
||||
self.descriptor_needed = response_dict['send_sd_blob']
|
||||
self.needed_blobs = response_dict.get('needed_blobs', [])
|
||||
return self.get_blobs_to_send()
|
||||
else: # Expecting Server Blob Response
|
||||
if 'received_sd_blob' not in response_dict:
|
||||
raise ValueError("I don't know if the sd blob made it to the intended destination!")
|
||||
else:
|
||||
self.received_descriptor_response = True
|
||||
if response_dict['received_sd_blob']:
|
||||
self.reflected_blobs.append(self.next_blob_to_send.blob_hash)
|
||||
log.info("Sent reflector descriptor %s", self.next_blob_to_send.blob_hash[:16])
|
||||
else:
|
||||
log.warning("Reflector failed to receive descriptor %s, trying again later",
|
||||
self.next_blob_to_send.blob_hash[:16])
|
||||
self.blob_hashes_to_send.append(self.next_blob_to_send.blob_hash)
|
||||
return self.set_not_uploading()
|
||||
|
||||
def handle_normal_response(self, response_dict):
|
||||
if self.file_sender is None: # Expecting Server Info Response
|
||||
if 'send_blob' not in response_dict:
|
||||
|
@ -191,13 +199,19 @@ class EncryptedFileReflectorClient(Protocol):
|
|||
self.file_sender = FileSender()
|
||||
return defer.succeed(True)
|
||||
else:
|
||||
log.debug("Reflector already has %s", str(self.next_blob_to_send.blob_hash)[:16])
|
||||
log.warning("Reflector already has %s", self.next_blob_to_send.blob_hash[:16])
|
||||
return self.set_not_uploading()
|
||||
else: # Expecting Server Blob Response
|
||||
if 'received_blob' not in response_dict:
|
||||
raise ValueError("I don't know if the blob made it to the intended destination!")
|
||||
else:
|
||||
log.info("Reflector received %s", str(self.next_blob_to_send.blob_hash)[:16])
|
||||
if response_dict['received_blob']:
|
||||
self.reflected_blobs.append(self.next_blob_to_send.blob_hash)
|
||||
log.info("Sent reflector blob %s", self.next_blob_to_send.blob_hash[:16])
|
||||
else:
|
||||
log.warning("Reflector failed to receive blob %s, trying again later",
|
||||
self.next_blob_to_send.blob_hash[:16])
|
||||
self.blob_hashes_to_send.append(self.next_blob_to_send.blob_hash)
|
||||
return self.set_not_uploading()
|
||||
|
||||
def open_blob_for_reading(self, blob):
|
||||
|
@ -207,43 +221,51 @@ class EncryptedFileReflectorClient(Protocol):
|
|||
log.debug('Getting ready to send %s', blob.blob_hash)
|
||||
self.next_blob_to_send = blob
|
||||
self.read_handle = read_handle
|
||||
return None
|
||||
else:
|
||||
log.warning("Can't reflect blob %s", str(blob.blob_hash)[:16])
|
||||
return defer.succeed(None)
|
||||
return defer.fail(ValueError(
|
||||
"Couldn't open that blob for some reason. blob_hash: {}".format(blob.blob_hash)))
|
||||
|
||||
def send_blob_info(self):
|
||||
assert self.next_blob_to_send is not None, "need to have a next blob to send at this point"
|
||||
log.debug("Send blob info for %s", self.next_blob_to_send.blob_hash)
|
||||
self.write(json.dumps({
|
||||
r = {
|
||||
'blob_hash': self.next_blob_to_send.blob_hash,
|
||||
'blob_size': self.next_blob_to_send.length
|
||||
}))
|
||||
}
|
||||
self.send_request(r)
|
||||
|
||||
def send_descriptor_info(self):
|
||||
assert self.stream_descriptor is not None, "need to have a sd blob to send at this point"
|
||||
r = {
|
||||
'sd_blob_hash': self.stream_descriptor.blob_hash,
|
||||
'sd_blob_size': self.stream_descriptor.length
|
||||
}
|
||||
self.sent_stream_info = True
|
||||
self.send_request(r)
|
||||
|
||||
def skip_missing_blob(self, err, blob_hash):
|
||||
log.warning("Can't reflect blob %s", str(blob_hash)[:16])
|
||||
err.trap(ValueError)
|
||||
return self.send_next_request()
|
||||
|
||||
def send_next_request(self):
|
||||
if self.file_sender is not None:
|
||||
# send the blob
|
||||
log.debug('Sending %s to reflector', str(self.next_blob_to_send.blob_hash)[:16])
|
||||
return self.start_transfer()
|
||||
elif not self.sent_stream_info:
|
||||
# open the sd blob to send
|
||||
blob = self.stream_descriptor
|
||||
d = self.open_blob_for_reading(blob)
|
||||
d.addCallback(lambda _: self.send_descriptor_info())
|
||||
return d
|
||||
elif self.blob_hashes_to_send:
|
||||
# open the next blob to send
|
||||
blob_hash = self.blob_hashes_to_send[0]
|
||||
log.debug('No current blob, sending the next one: %s', blob_hash)
|
||||
blob = self.blob_hashes_to_send[0]
|
||||
self.blob_hashes_to_send = self.blob_hashes_to_send[1:]
|
||||
d = self.blob_manager.get_blob(blob_hash, True)
|
||||
d.addCallback(self.open_blob_for_reading)
|
||||
# send the server the next blob hash + length
|
||||
d = self.open_blob_for_reading(blob)
|
||||
d.addCallbacks(lambda _: self.send_blob_info(),
|
||||
lambda err: self.skip_missing_blob(err, blob_hash))
|
||||
lambda err: self.skip_missing_blob(err, blob.blob_hash))
|
||||
return d
|
||||
else:
|
||||
# close connection
|
||||
log.debug('No more blob hashes, closing connection')
|
||||
self.transport.loseConnection()
|
||||
|
||||
|
||||
|
@ -251,6 +273,7 @@ class EncryptedFileReflectorClientFactory(ClientFactory):
|
|||
protocol = EncryptedFileReflectorClient
|
||||
|
||||
def __init__(self, blob_manager, stream_info_manager, stream_hash):
|
||||
self.protocol_version = REFLECTOR_V2
|
||||
self.blob_manager = blob_manager
|
||||
self.stream_info_manager = stream_info_manager
|
||||
self.stream_hash = stream_hash
|
||||
|
@ -268,11 +291,13 @@ class EncryptedFileReflectorClientFactory(ClientFactory):
|
|||
ClientFactory.startFactory(self)
|
||||
|
||||
def startedConnecting(self, connector):
|
||||
log.debug('Started connecting')
|
||||
log.debug('Connecting to reflector')
|
||||
|
||||
def clientConnectionLost(self, connector, reason):
|
||||
"""If we get disconnected, reconnect to server."""
|
||||
log.debug("connection lost: %s", reason)
|
||||
|
||||
def clientConnectionFailed(self, connector, reason):
|
||||
log.debug("connection failed: %s", reason)
|
||||
if reason.check(ConnectionRefusedError):
|
||||
log.warning("Could not connect to reflector server")
|
||||
else:
|
||||
log.error("Reflector connection failed: %s", reason)
|
||||
|
|
|
@ -1,2 +1,27 @@
|
|||
REFLECTOR_V1 = 0
|
||||
REFLECTOR_V2 = 1
|
||||
|
||||
|
||||
class ReflectorClientVersionError(Exception):
|
||||
"""
|
||||
Raised by reflector server if client sends an incompatible or unknown version
|
||||
"""
|
||||
|
||||
|
||||
class ReflectorRequestError(Exception):
|
||||
"""
|
||||
Raised by reflector server if client sends a message without the required fields
|
||||
"""
|
||||
|
||||
|
||||
class ReflectorRequestDecodeError(Exception):
|
||||
"""
|
||||
Raised by reflector server if client sends an invalid json request
|
||||
"""
|
||||
|
||||
|
||||
class IncompleteResponse(Exception):
|
||||
pass
|
||||
"""
|
||||
Raised by reflector server when client sends a portion of a json request,
|
||||
used buffering the incoming request
|
||||
"""
|
||||
|
|
|
@ -1,5 +1,6 @@
|
|||
import logging
|
||||
from twisted.internet import reactor, defer
|
||||
from twisted.internet import reactor
|
||||
from twisted.internet.error import ConnectionLost, ConnectionDone
|
||||
from lbrynet.reflector import BlobClientFactory, ClientFactory
|
||||
|
||||
log = logging.getLogger(__name__)
|
||||
|
@ -27,19 +28,13 @@ def _reflect_stream(lbry_file, reflector_server):
|
|||
)
|
||||
d = reactor.resolve(reflector_address)
|
||||
d.addCallback(lambda ip: reactor.connectTCP(ip, reflector_port, factory))
|
||||
d.addCallback(lambda _: log.info("Connected to %s", reflector_address))
|
||||
d.addCallback(lambda _: factory.finished_deferred)
|
||||
d.addCallback(lambda reflected_blobs: log.info("Reflected %i blobs for lbry://%s",
|
||||
len(reflected_blobs),
|
||||
lbry_file.uri))
|
||||
return d
|
||||
|
||||
|
||||
def _reflect_if_unavailable(reflector_has_stream, lbry_file, reflector_server):
|
||||
if reflector_has_stream:
|
||||
log.info("lbry://%s is available", lbry_file.uri)
|
||||
return defer.succeed(False)
|
||||
log.info("lbry://%s is unavailable, reflecting it", lbry_file.uri)
|
||||
return _reflect_stream(lbry_file, reflector_server)
|
||||
|
||||
|
||||
def _catch_error(err, uri):
|
||||
msg = "An error occurred while checking availability for lbry://%s: %s"
|
||||
log.error(msg, uri, err.getTraceback())
|
||||
|
@ -47,5 +42,6 @@ def _catch_error(err, uri):
|
|||
|
||||
def check_and_restore_availability(lbry_file, reflector_server):
|
||||
d = _reflect_stream(lbry_file, reflector_server)
|
||||
d.addErrback(lambda err: err.trap(ConnectionDone, ConnectionLost))
|
||||
d.addErrback(_catch_error, lbry_file.uri)
|
||||
return d
|
||||
|
|
|
@ -1,21 +1,36 @@
|
|||
import logging
|
||||
import json
|
||||
from twisted.python import failure
|
||||
from twisted.internet import error, defer
|
||||
from twisted.internet.protocol import Protocol, ServerFactory
|
||||
import json
|
||||
from lbrynet.core.utils import is_valid_blobhash
|
||||
from lbrynet.core.Error import DownloadCanceledError, InvalidBlobHashError
|
||||
from lbrynet.reflector.common import REFLECTOR_V1, REFLECTOR_V2
|
||||
from lbrynet.reflector.common import ReflectorRequestError, ReflectorClientVersionError
|
||||
|
||||
|
||||
log = logging.getLogger(__name__)
|
||||
|
||||
MAXIMUM_QUERY_SIZE = 200
|
||||
SEND_SD_BLOB = 'send_sd_blob'
|
||||
SEND_BLOB = 'send_blob'
|
||||
RECEIVED_SD_BLOB = 'received_sd_blob'
|
||||
RECEIVED_BLOB = 'received_blob'
|
||||
NEEDED_BLOBS = 'needed_blobs'
|
||||
VERSION = 'version'
|
||||
BLOB_SIZE = 'blob_size'
|
||||
BLOB_HASH = 'blob_hash'
|
||||
SD_BLOB_SIZE = 'sd_blob_size'
|
||||
SD_BLOB_HASH = 'sd_blob_hash'
|
||||
|
||||
|
||||
class ReflectorServer(Protocol):
|
||||
|
||||
def connectionMade(self):
|
||||
peer_info = self.transport.getPeer()
|
||||
log.debug('Connection made to %s', peer_info)
|
||||
self.peer = self.factory.peer_manager.get_peer(peer_info.host, peer_info.port)
|
||||
self.blob_manager = self.factory.blob_manager
|
||||
self.protocol_version = self.factory.protocol_version
|
||||
self.received_handshake = False
|
||||
self.peer_version = None
|
||||
self.receiving_blob = False
|
||||
|
@ -28,6 +43,60 @@ class ReflectorServer(Protocol):
|
|||
def connectionLost(self, reason=failure.Failure(error.ConnectionDone())):
|
||||
log.info("Reflector upload from %s finished" % self.peer.host)
|
||||
|
||||
def handle_error(self, err):
|
||||
log.error(err.getTraceback())
|
||||
self.transport.loseConnection()
|
||||
|
||||
def send_response(self, response_dict):
|
||||
self.transport.write(json.dumps(response_dict))
|
||||
|
||||
############################
|
||||
# Incoming blob file stuff #
|
||||
############################
|
||||
|
||||
def clean_up_failed_upload(self, err, blob):
|
||||
log.warning("Failed to receive %s", blob)
|
||||
if err.check(DownloadCanceledError):
|
||||
self.blob_manager.delete_blobs([blob.blob_hash])
|
||||
else:
|
||||
log.exception(err)
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def _on_completed_blob(self, blob, response_key):
|
||||
yield self.blob_manager.blob_completed(blob)
|
||||
yield self.close_blob()
|
||||
log.info("Received %s", blob)
|
||||
yield self.send_response({response_key: True})
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def _on_failed_blob(self, err, response_key):
|
||||
yield self.clean_up_failed_upload(err, self.incoming_blob)
|
||||
yield self.send_response({response_key: False})
|
||||
|
||||
def handle_incoming_blob(self, response_key):
|
||||
"""
|
||||
Open blob for writing and send a response indicating if the transfer was
|
||||
successful when finished.
|
||||
|
||||
response_key will either be received_blob or received_sd_blob
|
||||
"""
|
||||
|
||||
blob = self.incoming_blob
|
||||
self.blob_finished_d, self.blob_write, self.cancel_write = blob.open_for_writing(self.peer)
|
||||
self.blob_finished_d.addCallback(self._on_completed_blob, response_key)
|
||||
self.blob_finished_d.addErrback(self._on_failed_blob, response_key)
|
||||
|
||||
def close_blob(self):
|
||||
self.blob_finished_d = None
|
||||
self.blob_write = None
|
||||
self.cancel_write = None
|
||||
self.incoming_blob = None
|
||||
self.receiving_blob = False
|
||||
|
||||
####################
|
||||
# Request handling #
|
||||
####################
|
||||
|
||||
def dataReceived(self, data):
|
||||
if self.receiving_blob:
|
||||
self.blob_write(data)
|
||||
|
@ -38,7 +107,7 @@ class ReflectorServer(Protocol):
|
|||
if msg is not None:
|
||||
self.request_buff = ''
|
||||
d = self.handle_request(msg)
|
||||
d.addCallbacks(self.send_response, self.handle_error)
|
||||
d.addErrback(self.handle_error)
|
||||
if self.receiving_blob and extra_data:
|
||||
log.debug('Writing extra data to blob')
|
||||
self.blob_write(extra_data)
|
||||
|
@ -47,15 +116,15 @@ class ReflectorServer(Protocol):
|
|||
extra_data = None
|
||||
response = None
|
||||
curr_pos = 0
|
||||
while True:
|
||||
while not self.receiving_blob:
|
||||
next_close_paren = response_msg.find('}', curr_pos)
|
||||
if next_close_paren != -1:
|
||||
curr_pos = next_close_paren + 1
|
||||
try:
|
||||
response = json.loads(response_msg[:curr_pos])
|
||||
except ValueError:
|
||||
if curr_pos > 100:
|
||||
raise Exception("error decoding response")
|
||||
if curr_pos > MAXIMUM_QUERY_SIZE:
|
||||
raise ValueError("Error decoding response: %s" % str(response_msg))
|
||||
else:
|
||||
pass
|
||||
else:
|
||||
|
@ -65,73 +134,185 @@ class ReflectorServer(Protocol):
|
|||
break
|
||||
return response, extra_data
|
||||
|
||||
def need_handshake(self):
|
||||
return self.received_handshake is False
|
||||
|
||||
def is_descriptor_request(self, request_dict):
|
||||
if SD_BLOB_HASH not in request_dict or SD_BLOB_SIZE not in request_dict:
|
||||
return False
|
||||
if not is_valid_blobhash(request_dict[SD_BLOB_HASH]):
|
||||
raise InvalidBlobHashError(request_dict[SD_BLOB_HASH])
|
||||
return True
|
||||
|
||||
def is_blob_request(self, request_dict):
|
||||
if BLOB_HASH not in request_dict or BLOB_SIZE not in request_dict:
|
||||
return False
|
||||
if not is_valid_blobhash(request_dict[BLOB_HASH]):
|
||||
raise InvalidBlobHashError(request_dict[BLOB_HASH])
|
||||
return True
|
||||
|
||||
def handle_request(self, request_dict):
|
||||
if self.received_handshake is False:
|
||||
if self.need_handshake():
|
||||
return self.handle_handshake(request_dict)
|
||||
else:
|
||||
return self.handle_normal_request(request_dict)
|
||||
if self.is_descriptor_request(request_dict):
|
||||
return self.handle_descriptor_request(request_dict)
|
||||
if self.is_blob_request(request_dict):
|
||||
return self.handle_blob_request(request_dict)
|
||||
raise ReflectorRequestError("Invalid request")
|
||||
|
||||
def handle_handshake(self, request_dict):
|
||||
log.debug('Handling handshake')
|
||||
if 'version' not in request_dict:
|
||||
raise ValueError("Client should send version")
|
||||
self.peer_version = int(request_dict['version'])
|
||||
if self.peer_version != 0:
|
||||
raise ValueError("I don't know that version!")
|
||||
"""
|
||||
Upon connecting, the client sends a version handshake:
|
||||
{
|
||||
'version': int,
|
||||
}
|
||||
|
||||
The server replies with the same version if it is supported
|
||||
{
|
||||
'version': int,
|
||||
}
|
||||
"""
|
||||
|
||||
if VERSION not in request_dict:
|
||||
raise ReflectorRequestError("Client should send version")
|
||||
|
||||
if int(request_dict[VERSION]) not in [REFLECTOR_V1, REFLECTOR_V2]:
|
||||
raise ReflectorClientVersionError("Unknown version: %i" % int(request_dict[VERSION]))
|
||||
|
||||
log.debug('Handling handshake for client version %i', self.peer_version)
|
||||
|
||||
self.peer_version = int(request_dict[VERSION])
|
||||
self.received_handshake = True
|
||||
return defer.succeed({'version': 0})
|
||||
return self.send_handshake_response()
|
||||
|
||||
def determine_blob_needed(self, blob):
|
||||
if blob.is_validated():
|
||||
return {'send_blob': False}
|
||||
else:
|
||||
self.incoming_blob = blob
|
||||
self.blob_finished_d, self.blob_write, self.cancel_write = blob.open_for_writing(self.peer) # pylint: disable=line-too-long
|
||||
self.blob_finished_d.addCallback(lambda _: self.blob_manager.blob_completed(blob))
|
||||
return {'send_blob': True}
|
||||
def send_handshake_response(self):
|
||||
d = defer.succeed({VERSION: self.peer_version})
|
||||
d.addCallback(self.send_response)
|
||||
return d
|
||||
|
||||
def close_blob(self):
|
||||
self.blob_finished_d = None
|
||||
self.blob_write = None
|
||||
self.cancel_write = None
|
||||
self.incoming_blob = None
|
||||
self.receiving_blob = False
|
||||
def handle_descriptor_request(self, request_dict):
|
||||
"""
|
||||
If the client is reflecting a whole stream, they send a stream descriptor request:
|
||||
{
|
||||
'sd_blob_hash': str,
|
||||
'sd_blob_size': int
|
||||
}
|
||||
|
||||
The server indicates if it's aware of this stream already by requesting (or not requesting)
|
||||
the stream descriptor blob. If the server has a validated copy of the sd blob, it will
|
||||
include the needed_blobs field (a list of blob hashes missing from reflector) in the
|
||||
response. If the server does not have the sd blob the needed_blobs field will not be
|
||||
included, as the server does not know what blobs it is missing - so the client should send
|
||||
all of the blobs in the stream.
|
||||
{
|
||||
'send_sd_blob': bool
|
||||
'needed_blobs': list, conditional
|
||||
}
|
||||
|
||||
|
||||
The client may begin the file transfer of the sd blob if send_sd_blob was True.
|
||||
If the client sends the blob, after receiving it the server indicates if the
|
||||
transfer was successful:
|
||||
{
|
||||
'received_sd_blob': bool
|
||||
}
|
||||
"""
|
||||
|
||||
sd_blob_hash = request_dict[SD_BLOB_HASH]
|
||||
sd_blob_size = request_dict[SD_BLOB_SIZE]
|
||||
|
||||
def handle_normal_request(self, request_dict):
|
||||
if self.blob_write is None:
|
||||
# we haven't opened a blob yet, meaning we must be waiting for the
|
||||
# next message containing a blob hash and a length. this message
|
||||
# should be it. if it's one we want, open the blob for writing, and
|
||||
# return a nice response dict (in a Deferred) saying go ahead
|
||||
if not 'blob_hash' in request_dict or not 'blob_size' in request_dict:
|
||||
raise ValueError("Expected a blob hash and a blob size")
|
||||
if not is_valid_blobhash(request_dict['blob_hash']):
|
||||
raise ValueError("Got a bad blob hash: {}".format(request_dict['blob_hash']))
|
||||
log.debug('Recieved info for blob: %s', request_dict['blob_hash'])
|
||||
d = self.blob_manager.get_blob(
|
||||
request_dict['blob_hash'],
|
||||
True,
|
||||
int(request_dict['blob_size'])
|
||||
)
|
||||
d.addCallback(self.determine_blob_needed)
|
||||
d = self.blob_manager.get_blob(sd_blob_hash, True, sd_blob_size)
|
||||
d.addCallback(self.get_descriptor_response)
|
||||
d.addCallback(self.send_response)
|
||||
else:
|
||||
self.receiving_blob = True
|
||||
d = self.blob_finished_d
|
||||
return d
|
||||
|
||||
def get_descriptor_response(self, sd_blob):
|
||||
if sd_blob.is_validated():
|
||||
d = defer.succeed({SEND_SD_BLOB: False})
|
||||
d.addCallback(self.request_needed_blobs, sd_blob)
|
||||
else:
|
||||
self.incoming_blob = sd_blob
|
||||
self.receiving_blob = True
|
||||
self.handle_incoming_blob(RECEIVED_SD_BLOB)
|
||||
d = defer.succeed({SEND_SD_BLOB: True})
|
||||
return d
|
||||
|
||||
def request_needed_blobs(self, response, sd_blob):
|
||||
def _add_needed_blobs_to_response(needed_blobs):
|
||||
response.update({NEEDED_BLOBS: needed_blobs})
|
||||
return response
|
||||
|
||||
d = self.determine_missing_blobs(sd_blob)
|
||||
d.addCallback(_add_needed_blobs_to_response)
|
||||
return d
|
||||
|
||||
def determine_missing_blobs(self, sd_blob):
|
||||
with sd_blob.open_for_reading() as sd_file:
|
||||
sd_blob_data = sd_file.read()
|
||||
decoded_sd_blob = json.loads(sd_blob_data)
|
||||
return self.get_unvalidated_blobs_in_stream(decoded_sd_blob)
|
||||
|
||||
def get_unvalidated_blobs_in_stream(self, sd_blob):
|
||||
dl = defer.DeferredList(list(self._iter_unvalidated_blobs_in_stream(sd_blob)),
|
||||
consumeErrors=True)
|
||||
dl.addCallback(lambda needed: [blob[1] for blob in needed if blob[1]])
|
||||
return dl
|
||||
|
||||
def _iter_unvalidated_blobs_in_stream(self, sd_blob):
|
||||
for blob in sd_blob['blobs']:
|
||||
if 'blob_hash' in blob and 'length' in blob:
|
||||
blob_hash, blob_len = blob['blob_hash'], blob['length']
|
||||
d = self.blob_manager.get_blob(blob_hash, True, blob_len)
|
||||
d.addCallback(lambda blob: blob_hash if not blob.is_validated() else None)
|
||||
yield d
|
||||
|
||||
def handle_blob_request(self, request_dict):
|
||||
"""
|
||||
A client queries if the server will accept a blob
|
||||
{
|
||||
'blob_hash': str,
|
||||
'blob_size': int
|
||||
}
|
||||
|
||||
The server replies, send_blob will be False if the server has a validated copy of the blob:
|
||||
{
|
||||
'send_blob': bool
|
||||
}
|
||||
|
||||
The client may begin the raw blob file transfer if the server replied True.
|
||||
If the client sends the blob, the server replies:
|
||||
{
|
||||
'received_blob': bool
|
||||
}
|
||||
"""
|
||||
|
||||
blob_hash = request_dict[BLOB_HASH]
|
||||
blob_size = request_dict[BLOB_SIZE]
|
||||
|
||||
if self.blob_write is None:
|
||||
log.debug('Received info for blob: %s', blob_hash[:16])
|
||||
d = self.blob_manager.get_blob(blob_hash, True, blob_size)
|
||||
d.addCallback(self.get_blob_response)
|
||||
d.addCallback(self.send_response)
|
||||
else:
|
||||
# we have a blob open already, so this message should have nothing
|
||||
# important in it. to the deferred that fires when the blob is done,
|
||||
# add a callback which returns a nice response dict saying to keep
|
||||
# sending, and then return that deferred
|
||||
log.debug('blob is already open')
|
||||
self.receiving_blob = True
|
||||
d = self.blob_finished_d
|
||||
d.addCallback(lambda _: self.close_blob())
|
||||
d.addCallback(lambda _: {'received_blob': True})
|
||||
return d
|
||||
|
||||
def send_response(self, response_dict):
|
||||
self.transport.write(json.dumps(response_dict))
|
||||
|
||||
def handle_error(self, err):
|
||||
log.error(err.getTraceback())
|
||||
self.transport.loseConnection()
|
||||
def get_blob_response(self, blob):
|
||||
if blob.is_validated():
|
||||
return defer.succeed({SEND_BLOB: False})
|
||||
else:
|
||||
self.incoming_blob = blob
|
||||
self.receiving_blob = True
|
||||
self.handle_incoming_blob(RECEIVED_BLOB)
|
||||
d = defer.succeed({SEND_BLOB: True})
|
||||
return d
|
||||
|
||||
|
||||
class ReflectorServerFactory(ServerFactory):
|
||||
|
@ -140,6 +321,7 @@ class ReflectorServerFactory(ServerFactory):
|
|||
def __init__(self, peer_manager, blob_manager):
|
||||
self.peer_manager = peer_manager
|
||||
self.blob_manager = blob_manager
|
||||
self.protocol_version = REFLECTOR_V2
|
||||
|
||||
def buildProtocol(self, addr):
|
||||
log.debug('Creating a protocol for %s', addr)
|
||||
|
|
Loading…
Reference in a new issue