lbry-sdk/lbrynet/reflector/client/client.py

465 lines
16 KiB
Python
Raw Normal View History

2016-08-04 05:03:14 +02:00
"""
The reflector protocol (all dicts encoded in json):
Client Handshake (sent once per connection, at the start of the connection):
{
'version': 0,
}
Server Handshake (sent once per connection, after receiving the client handshake):
{
'version': 0,
}
Client Info Request:
{
'blob_hash': "<blob_hash>",
'blob_size': <blob_size>
}
Server Info Response (sent in response to Client Info Request):
{
'send_blob': True|False
2016-08-04 05:03:14 +02:00
}
If response is 'YES', client may send a Client Blob Request or a Client Info Request.
If response is 'NO', client may only send a Client Info Request
Client Blob Request:
{} # Yes, this is an empty dictionary, in case something needs to go here in the future
<raw blob_data> # this blob data must match the info sent in the most recent Client Info Request
Server Blob Response (sent in response to Client Blob Request):
{
'received_blob': True
2016-08-04 05:03:14 +02:00
}
Client may now send another Client Info Request
"""
import json
import logging
2016-11-10 23:16:35 +01:00
from twisted.protocols.basic import FileSender
2016-08-04 05:03:14 +02:00
from twisted.internet.protocol import Protocol, ClientFactory
from twisted.internet import defer, error
2016-08-04 05:03:14 +02:00
log = logging.getLogger(__name__)
class IncompleteResponseError(Exception):
pass
2016-09-27 20:18:16 +02:00
class EncryptedFileReflectorClient(Protocol):
# Protocol stuff
2016-08-04 05:03:14 +02:00
def connectionMade(self):
self.blob_manager = self.factory.blob_manager
2016-08-04 05:03:14 +02:00
self.response_buff = ''
self.outgoing_buff = ''
self.blob_hashes_to_send = []
self.next_blob_to_send = None
self.blob_read_handle = None
2016-08-04 05:03:14 +02:00
self.received_handshake_response = False
self.protocol_version = None
self.file_sender = None
self.producer = None
self.streaming = False
2016-08-04 05:03:14 +02:00
d = self.get_blobs_to_send(self.factory.stream_info_manager, self.factory.stream_hash)
d.addCallback(lambda _: self.send_handshake())
2016-11-10 23:16:35 +01:00
d.addErrback(
lambda err: log.warning("An error occurred immediately: %s", err.getTraceback()))
2016-08-04 05:03:14 +02:00
def dataReceived(self, data):
2016-08-11 02:04:03 +02:00
log.debug('Recieved %s', data)
2016-08-04 05:03:14 +02:00
self.response_buff += data
try:
msg = self.parse_response(self.response_buff)
except IncompleteResponseError:
pass
else:
self.response_buff = ''
d = self.handle_response(msg)
d.addCallback(lambda _: self.send_next_request())
d.addErrback(self.response_failure_handler)
2016-08-04 05:03:14 +02:00
def connectionLost(self, reason):
if reason.check(error.ConnectionDone):
2016-08-11 02:04:03 +02:00
log.debug('Finished sending data via reflector')
self.factory.finished_deferred.callback(True)
else:
2016-11-01 18:05:19 +01:00
log.debug('Reflector finished: %s', reason)
self.factory.finished_deferred.callback(reason)
2016-08-04 05:03:14 +02:00
# IConsumer stuff
def registerProducer(self, producer, streaming):
self.producer = producer
self.streaming = streaming
if self.streaming is False:
from twisted.internet import reactor
reactor.callLater(0, self.producer.resumeProducing)
def unregisterProducer(self):
self.producer = None
def write(self, data):
self.transport.write(data)
if self.producer is not None and self.streaming is False:
from twisted.internet import reactor
reactor.callLater(0, self.producer.resumeProducing)
2016-08-04 05:03:14 +02:00
def get_blobs_to_send(self, stream_info_manager, stream_hash):
2016-08-11 02:04:03 +02:00
log.debug('Getting blobs from stream hash: %s', stream_hash)
2016-08-04 05:03:14 +02:00
d = stream_info_manager.get_blobs_for_stream(stream_hash)
def set_blobs(blob_hashes):
for blob_hash, position, iv, length in blob_hashes:
log.debug("Preparing to send %s", blob_hash)
if blob_hash is not None:
self.blob_hashes_to_send.append(blob_hash)
2016-08-04 05:03:14 +02:00
d.addCallback(set_blobs)
d.addCallback(lambda _: stream_info_manager.get_sd_blob_hashes_for_stream(stream_hash))
def set_sd_blobs(sd_blob_hashes):
for sd_blob_hash in sd_blob_hashes:
self.blob_hashes_to_send.append(sd_blob_hash)
d.addCallback(set_sd_blobs)
2016-08-04 05:03:14 +02:00
return d
def send_handshake(self):
2016-08-11 02:04:03 +02:00
log.debug('Sending handshake')
self.write(json.dumps({'version': 0}))
2016-08-04 05:03:14 +02:00
def parse_response(self, buff):
try:
return json.loads(buff)
except ValueError:
raise IncompleteResponseError()
def response_failure_handler(self, err):
log.warning("An error occurred handling the response: %s", err.getTraceback())
2016-08-04 05:03:14 +02:00
def handle_response(self, response_dict):
if self.received_handshake_response is False:
return self.handle_handshake_response(response_dict)
2016-08-04 05:03:14 +02:00
else:
return self.handle_normal_response(response_dict)
def set_not_uploading(self):
if self.next_blob_to_send is not None:
self.next_blob_to_send.close_read_handle(self.read_handle)
self.read_handle = None
self.next_blob_to_send = None
self.file_sender = None
return defer.succeed(None)
def start_transfer(self):
self.write(json.dumps({}))
assert self.read_handle is not None, "self.read_handle was None when trying to start the transfer"
d = self.file_sender.beginFileTransfer(self.read_handle, self)
return d
2016-08-04 05:03:14 +02:00
def handle_handshake_response(self, response_dict):
if 'version' not in response_dict:
raise ValueError("Need protocol version number!")
self.protocol_version = int(response_dict['version'])
if self.protocol_version != 0:
raise ValueError("I can't handle protocol version {}!".format(self.protocol_version))
self.received_handshake_response = True
return defer.succeed(True)
2016-08-04 05:03:14 +02:00
def handle_normal_response(self, response_dict):
if self.file_sender is None: # Expecting Server Info Response
if 'send_blob' not in response_dict:
raise ValueError("I don't know whether to send the blob or not!")
if response_dict['send_blob'] is True:
self.file_sender = FileSender()
return defer.succeed(True)
else:
return self.set_not_uploading()
else: # Expecting Server Blob Response
if 'received_blob' not in response_dict:
raise ValueError("I don't know if the blob made it to the intended destination!")
else:
return self.set_not_uploading()
def open_blob_for_reading(self, blob):
if blob.is_validated():
read_handle = blob.open_for_reading()
if read_handle is not None:
2016-08-11 02:04:03 +02:00
log.debug('Getting ready to send %s', blob.blob_hash)
self.next_blob_to_send = blob
self.read_handle = read_handle
return None
2016-11-10 23:16:35 +01:00
raise ValueError(
"Couldn't open that blob for some reason. blob_hash: {}".format(blob.blob_hash))
def send_blob_info(self):
log.debug("Send blob info for %s", self.next_blob_to_send.blob_hash)
assert self.next_blob_to_send is not None, "need to have a next blob to send at this point"
2016-08-11 02:04:03 +02:00
log.debug('sending blob info')
self.write(json.dumps({
'blob_hash': self.next_blob_to_send.blob_hash,
'blob_size': self.next_blob_to_send.length
}))
2016-08-04 05:03:14 +02:00
def send_next_request(self):
if self.file_sender is not None:
2016-08-04 05:03:14 +02:00
# send the blob
2016-08-11 02:04:03 +02:00
log.debug('Sending the blob')
return self.start_transfer()
elif self.blob_hashes_to_send:
# open the next blob to send
blob_hash = self.blob_hashes_to_send[0]
2016-08-11 02:04:03 +02:00
log.debug('No current blob, sending the next one: %s', blob_hash)
self.blob_hashes_to_send = self.blob_hashes_to_send[1:]
d = self.blob_manager.get_blob(blob_hash, True)
d.addCallback(self.open_blob_for_reading)
2016-08-04 05:03:14 +02:00
# send the server the next blob hash + length
d.addCallback(lambda _: self.send_blob_info())
return d
2016-08-04 05:03:14 +02:00
else:
# close connection
2016-08-11 02:04:03 +02:00
log.debug('No more blob hashes, closing connection')
2016-08-11 02:36:52 +02:00
self.transport.loseConnection()
2016-08-04 05:03:14 +02:00
2016-09-27 20:18:16 +02:00
class EncryptedFileReflectorClientFactory(ClientFactory):
protocol = EncryptedFileReflectorClient
2016-08-04 05:03:14 +02:00
def __init__(self, blob_manager, stream_info_manager, stream_hash):
self.blob_manager = blob_manager
2016-08-04 05:03:14 +02:00
self.stream_info_manager = stream_info_manager
self.stream_hash = stream_hash
self.p = None
self.finished_deferred = defer.Deferred()
2016-08-04 05:03:14 +02:00
def buildProtocol(self, addr):
p = self.protocol()
p.factory = self
self.p = p
2016-08-11 02:04:03 +02:00
return p
def startFactory(self):
log.debug('Starting reflector factory')
ClientFactory.startFactory(self)
def startedConnecting(self, connector):
log.debug('Started connecting')
def clientConnectionLost(self, connector, reason):
"""If we get disconnected, reconnect to server."""
2016-08-11 02:39:28 +02:00
log.debug("connection lost: %s", reason)
2016-08-11 02:04:03 +02:00
def clientConnectionFailed(self, connector, reason):
2016-08-11 02:39:28 +02:00
log.debug("connection failed: %s", reason)
2016-08-27 01:58:53 +02:00
class BlobReflectorClient(Protocol):
2016-08-27 01:58:53 +02:00
# Protocol stuff
def connectionMade(self):
self.blob_manager = self.factory.blob_manager
self.response_buff = ''
self.outgoing_buff = ''
self.blob_hashes_to_send = self.factory.blobs
self.next_blob_to_send = None
self.blob_read_handle = None
self.received_handshake_response = False
self.protocol_version = None
self.file_sender = None
self.producer = None
self.streaming = False
self.sent_blobs = False
2016-08-27 01:58:53 +02:00
d = self.send_handshake()
d.addErrback(lambda err: log.warning("An error occurred immediately: %s", err.getTraceback()))
def dataReceived(self, data):
log.debug('Recieved %s', data)
self.response_buff += data
try:
msg = self.parse_response(self.response_buff)
except IncompleteResponseError:
pass
else:
self.response_buff = ''
d = self.handle_response(msg)
d.addCallback(lambda _: self.send_next_request())
d.addErrback(self.response_failure_handler)
def connectionLost(self, reason):
if reason.check(error.ConnectionDone):
self.factory.sent_blobs = self.sent_blobs
if self.factory.sent_blobs:
log.info('Finished sending data via reflector')
2016-08-27 01:58:53 +02:00
self.factory.finished_deferred.callback(True)
else:
log.info('Reflector finished: %s', reason)
2016-08-27 01:58:53 +02:00
self.factory.finished_deferred.callback(reason)
# IConsumer stuff
def registerProducer(self, producer, streaming):
self.producer = producer
self.streaming = streaming
if self.streaming is False:
from twisted.internet import reactor
reactor.callLater(0, self.producer.resumeProducing)
def unregisterProducer(self):
self.producer = None
def write(self, data):
self.transport.write(data)
if self.producer is not None and self.streaming is False:
from twisted.internet import reactor
reactor.callLater(0, self.producer.resumeProducing)
def send_handshake(self):
log.debug('Sending handshake')
self.write(json.dumps({'version': 0}))
2016-08-27 02:13:10 +02:00
return defer.succeed(None)
2016-08-27 01:58:53 +02:00
def parse_response(self, buff):
try:
return json.loads(buff)
except ValueError:
raise IncompleteResponseError()
def response_failure_handler(self, err):
log.warning("An error occurred handling the response: %s", err.getTraceback())
def handle_response(self, response_dict):
if self.received_handshake_response is False:
return self.handle_handshake_response(response_dict)
else:
return self.handle_normal_response(response_dict)
def set_not_uploading(self):
if self.next_blob_to_send is not None:
self.next_blob_to_send.close_read_handle(self.read_handle)
self.read_handle = None
self.next_blob_to_send = None
self.file_sender = None
return defer.succeed(None)
def start_transfer(self):
self.sent_blobs = True
2016-08-27 01:58:53 +02:00
self.write(json.dumps({}))
assert self.read_handle is not None, "self.read_handle was None when trying to start the transfer"
d = self.file_sender.beginFileTransfer(self.read_handle, self)
return d
def handle_handshake_response(self, response_dict):
if 'version' not in response_dict:
raise ValueError("Need protocol version number!")
self.protocol_version = int(response_dict['version'])
if self.protocol_version != 0:
raise ValueError("I can't handle protocol version {}!".format(self.protocol_version))
self.received_handshake_response = True
return defer.succeed(True)
def handle_normal_response(self, response_dict):
if self.file_sender is None: # Expecting Server Info Response
if 'send_blob' not in response_dict:
raise ValueError("I don't know whether to send the blob or not!")
if response_dict['send_blob'] is True:
self.file_sender = FileSender()
return defer.succeed(True)
else:
return self.set_not_uploading()
else: # Expecting Server Blob Response
if 'received_blob' not in response_dict:
raise ValueError("I don't know if the blob made it to the intended destination!")
else:
return self.set_not_uploading()
def open_blob_for_reading(self, blob):
if blob.is_validated():
read_handle = blob.open_for_reading()
if read_handle is not None:
log.debug('Getting ready to send %s', blob.blob_hash)
self.next_blob_to_send = blob
self.read_handle = read_handle
return None
raise ValueError("Couldn't open that blob for some reason. blob_hash: {}".format(blob.blob_hash))
def send_blob_info(self):
log.debug("Send blob info for %s", self.next_blob_to_send.blob_hash)
2016-08-27 01:58:53 +02:00
assert self.next_blob_to_send is not None, "need to have a next blob to send at this point"
log.debug('sending blob info')
self.write(json.dumps({
'blob_hash': self.next_blob_to_send.blob_hash,
'blob_size': self.next_blob_to_send.length
}))
2016-11-01 21:32:34 +01:00
def log_fail_and_disconnect(self, err, blob_hash):
log.error("Error reflecting blob %s", blob_hash)
self.transport.loseConnection()
2016-08-27 01:58:53 +02:00
def send_next_request(self):
if self.file_sender is not None:
# send the blob
log.debug('Sending the blob')
return self.start_transfer()
elif self.blob_hashes_to_send:
# open the next blob to send
blob_hash = self.blob_hashes_to_send[0]
log.debug('No current blob, sending the next one: %s', blob_hash)
self.blob_hashes_to_send = self.blob_hashes_to_send[1:]
d = self.blob_manager.get_blob(blob_hash, True)
d.addCallback(self.open_blob_for_reading)
# send the server the next blob hash + length
2016-11-01 21:32:34 +01:00
d.addCallbacks(lambda _: self.send_blob_info(), lambda err: self.log_fail_and_disconnect(err, blob_hash))
2016-08-27 01:58:53 +02:00
return d
else:
# close connection
log.debug('No more blob hashes, closing connection')
self.transport.loseConnection()
class BlobReflectorClientFactory(ClientFactory):
protocol = BlobReflectorClient
2016-08-27 01:58:53 +02:00
def __init__(self, blob_manager, blobs):
self.blob_manager = blob_manager
self.blobs = blobs
self.p = None
self.sent_blobs = False
2016-08-27 01:58:53 +02:00
self.finished_deferred = defer.Deferred()
def buildProtocol(self, addr):
p = self.protocol()
p.factory = self
self.p = p
return p
def startFactory(self):
log.debug('Starting reflector factory')
ClientFactory.startFactory(self)
def startedConnecting(self, connector):
log.debug('Started connecting')
def clientConnectionLost(self, connector, reason):
"""If we get disconnected, reconnect to server."""
log.debug("connection lost: %s", reason)
def clientConnectionFailed(self, connector, reason):
log.debug("connection failed: %s", reason)