lbry-sdk/lbrynet/reflector/client/client.py

366 lines
14 KiB
Python
Raw Normal View History

2016-08-04 05:03:14 +02:00
import json
import logging
2016-11-10 23:16:35 +01:00
from twisted.internet.error import ConnectionRefusedError
from twisted.protocols.basic import FileSender
2016-08-04 05:03:14 +02:00
from twisted.internet.protocol import Protocol, ClientFactory
from twisted.internet import defer, error
2016-08-04 05:03:14 +02:00
from lbrynet.reflector.common import IncompleteResponse, ReflectorRequestError
from lbrynet.reflector.common import REFLECTOR_V1, REFLECTOR_V2
2016-08-04 05:03:14 +02:00
# Module-level logger, named after this module via __name__.
log = logging.getLogger(__name__)
2016-08-04 05:03:14 +02:00
2016-09-27 20:18:16 +02:00
class EncryptedFileReflectorClient(Protocol):
    """Client side of the reflector protocol for a single lbry file.

    The exchange, as driven by this class, is a sequence of JSON messages:

    1. handshake: ``{'version': ...}`` is sent; the reply must carry
       ``'version'`` (REFLECTOR_V1 or REFLECTOR_V2).
    2. descriptor: ``{'sd_blob_hash', 'sd_blob_size'}`` is sent; a V2 server
       answers with ``'send_sd_blob'`` (and optionally ``'needed_blobs'``).
       If asked, the sd blob is streamed and the server confirms with
       ``'received_sd_blob'``.
    3. blobs: for every queued blob ``{'blob_hash', 'blob_size'}`` is sent,
       the server answers ``'send_blob'``, the blob is streamed when wanted
       and the server confirms with ``'received_blob'``.

    When nothing is left to send the connection is closed, and
    ``connectionLost`` fires ``factory.finished_deferred``.

    The class also provides ``registerProducer`` / ``unregisterProducer`` /
    ``write`` so that a ``FileSender`` can use it as its consumer.
    """

    # Protocol stuff
    def connectionMade(self):
        """Reset per-connection state, load the sd blob, then send the handshake."""
        log.debug("Connected to reflector")
        self.response_buff = ''
        self.outgoing_buff = ''
        # Queue of blob *objects* (despite the name) still to be offered to
        # the server; send_next_request() pops from the front.
        self.blob_hashes_to_send = []
        # Hashes of blobs that already failed once; used to cap retries.
        self.failed_blob_hashes = []
        # Blob currently being offered/streamed and its open read handle.
        self.next_blob_to_send = None
        self.read_handle = None
        self.sent_stream_info = False
        self.received_descriptor_response = False
        self.received_server_version = False
        self.server_version = None
        self.stream_descriptor = None
        self.descriptor_needed = None
        # Hashes the server reported as missing / hashes it confirmed receiving.
        self.needed_blobs = []
        self.reflected_blobs = []
        # A FileSender exists only between "server wants the blob" and
        # "server acknowledged the blob"; the handlers branch on it.
        self.file_sender = None
        self.producer = None
        self.streaming = False
        d = self.load_descriptor()
        d.addCallback(lambda _: self.send_handshake())
        d.addErrback(
            lambda err: log.warning("An error occurred immediately: %s", err.getTraceback()))

    @property
    def name(self):
        """The file's name rendered as an ``lbry://`` URI, used in log messages."""
        return "lbry://%s" % self.factory.name

    @property
    def blob_manager(self):
        """The factory's blob manager."""
        return self.factory.blob_manager

    @property
    def protocol_version(self):
        """The protocol version this client announces in the handshake."""
        return self.factory.protocol_version

    @property
    def stream_info_manager(self):
        """The factory's stream info manager."""
        return self.factory.stream_info_manager

    @property
    def stream_hash(self):
        """Hash of the stream being reflected."""
        return self.factory.stream_hash

    def dataReceived(self, data):
        """Buffer incoming bytes until they parse as one JSON message, then act on it.

        A buffer that does not parse yet is kept and extended by the next
        call. Once a message parses, the buffer is cleared, the message is
        handled and the next request is sent.
        """
        self.response_buff += data
        try:
            msg = self.parse_response(self.response_buff)
        except IncompleteResponse:
            pass
        else:
            self.response_buff = ''
            d = self.handle_response(msg)
            d.addCallback(lambda _: self.send_next_request())
            d.addErrback(self.response_failure_handler)

    def connectionLost(self, reason):
        """Log the outcome and fire ``factory.finished_deferred``.

        The deferred receives the list of reflected blob hashes for a clean
        close (``ConnectionDone``) or a dropped connection
        (``ConnectionLost``), and ``reason`` itself for anything else.
        """
        if reason.check(error.ConnectionDone):
            if not self.needed_blobs:
                log.info("Reflector has all blobs for %s (%s)",
                         self.name, self.stream_descriptor)
            elif not self.reflected_blobs:
                log.info("No more completed blobs for %s (%s) to reflect, %i are still needed",
                         self.name, self.stream_descriptor, len(self.needed_blobs))
            else:
                log.info('Finished sending reflector %i blobs for %s (%s)',
                         len(self.reflected_blobs), self.name, self.stream_descriptor)
            self.factory.finished_deferred.callback(self.reflected_blobs)
        elif reason.check(error.ConnectionLost):
            log.warning("Stopped reflecting %s (%s) after sending %i blobs", self.name,
                        self.stream_descriptor, len(self.reflected_blobs))
            self.factory.finished_deferred.callback(self.reflected_blobs)
        else:
            log.info('Reflector finished for %s (%s): %s', self.name, self.stream_descriptor,
                     reason)
            self.factory.finished_deferred.callback(reason)

    # IConsumer stuff
    def registerProducer(self, producer, streaming):
        """Remember the producer; a non-streaming one is asked for its first chunk."""
        self.producer = producer
        self.streaming = streaming
        if self.streaming is False:
            # Imported here rather than at module level, as in the original.
            from twisted.internet import reactor
            reactor.callLater(0, self.producer.resumeProducing)

    def unregisterProducer(self):
        """Forget the current producer."""
        self.producer = None

    def write(self, data):
        """Forward ``data`` to the transport and pull the next chunk if needed."""
        self.transport.write(data)
        if self.producer is not None and self.streaming is False:
            # A non-streaming producer only produces when asked, so schedule
            # the next resumeProducing() after every chunk.
            from twisted.internet import reactor
            reactor.callLater(0, self.producer.resumeProducing)

    def get_validated_blobs(self, blobs_in_stream):
        """Resolve the stream's blob records to verified blob objects.

        ``blobs_in_stream`` is iterated as 4-tuples whose first item is the
        blob hash and last item the blob length; records with a falsy hash or
        length are skipped. Returns a Deferred firing with the list of blobs
        whose lookup succeeded and which report ``get_is_verified()``.
        """
        def get_blobs(blobs):
            for (blob, _, _, blob_len) in blobs:
                if blob and blob_len:
                    yield self.blob_manager.get_blob(blob, blob_len)

        # consumeErrors=True: a failed lookup just drops that blob from the result.
        dl = defer.DeferredList(list(get_blobs(blobs_in_stream)), consumeErrors=True)
        dl.addCallback(lambda blobs: [blob for r, blob in blobs if r and blob.get_is_verified()])
        return dl

    def set_blobs_to_send(self, blobs_to_send):
        """Append each blob to the send queue unless it is already queued."""
        for blob in blobs_to_send:
            if blob not in self.blob_hashes_to_send:
                self.blob_hashes_to_send.append(blob)

    def get_blobs_to_send(self):
        """Fill the send queue from the stream's verified blobs.

        When the server does not need the descriptor, only blobs listed in
        ``needed_blobs`` are queued and any upload state is reset. When it
        does need the descriptor, every verified blob is queued and the
        pending descriptor upload is left untouched.
        """
        def _show_missing_blobs(filtered):
            if filtered:
                needs_desc = "" if not self.descriptor_needed else "descriptor and "
                log.info("Reflector needs %s%i blobs for %s",
                         needs_desc,
                         len(filtered),
                         self.name)
            return filtered

        d = self.factory.stream_info_manager.get_blobs_for_stream(self.factory.stream_hash)
        d.addCallback(self.get_validated_blobs)
        if not self.descriptor_needed:
            d.addCallback(lambda filtered:
                          [blob for blob in filtered if blob.blob_hash in self.needed_blobs])
        d.addCallback(_show_missing_blobs)
        d.addCallback(self.set_blobs_to_send)
        d.addCallback(lambda _: None if self.descriptor_needed else self.set_not_uploading())
        return d

    def send_request(self, request_dict):
        """Serialize ``request_dict`` as JSON and write it to the server."""
        self.write(json.dumps(request_dict))

    def send_handshake(self):
        """Announce this client's protocol version."""
        self.send_request({'version': self.protocol_version})

    def load_descriptor(self):
        """Look up the stream's sd blob and store it in ``stream_descriptor``.

        Uses the first sd blob hash returned for the stream. Returns the
        Deferred of that lookup chain.
        """
        def _save_descriptor_blob(sd_blob):
            self.stream_descriptor = sd_blob

        d = self.factory.stream_info_manager.get_sd_blob_hashes_for_stream(self.factory.stream_hash)
        d.addCallback(lambda sd: self.factory.blob_manager.get_blob(sd[0]))
        d.addCallback(_save_descriptor_blob)
        return d

    def parse_response(self, buff):
        """Decode ``buff`` as JSON; raise IncompleteResponse if it does not parse."""
        try:
            return json.loads(buff)
        except ValueError:
            raise IncompleteResponse()

    def response_failure_handler(self, err):
        """Log a failure raised while handling a response or sending the next request."""
        log.warning("An error occurred handling the response: %s", err.getTraceback())

    def handle_response(self, response_dict):
        """Dispatch a decoded message to the handler for the current protocol phase."""
        if not self.received_server_version:
            return self.handle_handshake_response(response_dict)
        elif not self.received_descriptor_response and self.server_version == REFLECTOR_V2:
            return self.handle_descriptor_response(response_dict)
        else:
            return self.handle_normal_response(response_dict)

    def set_not_uploading(self):
        """Close the current blob's read handle and drop the FileSender.

        Returns an already-fired Deferred so it can end a handler chain.
        """
        if self.next_blob_to_send is not None:
            log.debug("Close %s", self.next_blob_to_send)
            self.next_blob_to_send.close_read_handle(self.read_handle)
            self.read_handle = None
            self.next_blob_to_send = None
        if self.file_sender is not None:
            self.file_sender.stopProducing()
        self.file_sender = None
        return defer.succeed(None)

    def start_transfer(self):
        """Stream the currently opened blob to the server through the FileSender."""
        assert self.read_handle is not None, \
            "self.read_handle was None when trying to start the transfer"
        d = self.file_sender.beginFileTransfer(self.read_handle, self)
        return d

    def handle_handshake_response(self, response_dict):
        """Record the server's protocol version.

        Raises ValueError when the version is missing or is neither
        REFLECTOR_V1 nor REFLECTOR_V2.
        """
        if 'version' not in response_dict:
            raise ValueError("Need protocol version number!")
        self.server_version = int(response_dict['version'])
        if self.server_version not in [REFLECTOR_V1, REFLECTOR_V2]:
            raise ValueError("I can't handle protocol version {}!".format(self.server_version))
        self.received_server_version = True
        return defer.succeed(True)

    def handle_descriptor_response(self, response_dict):
        """Handle the server's replies concerning the sd blob.

        Without a FileSender the message is the answer to the descriptor
        offer (``'send_sd_blob'``); with one it is the acknowledgement of the
        streamed sd blob (``'received_sd_blob'``).

        Raises ReflectorRequestError / ValueError when the expected key is
        missing.
        """
        if self.file_sender is None:  # Expecting Server Info Response
            if 'send_sd_blob' not in response_dict:
                raise ReflectorRequestError("I don't know whether to send the sd blob or not!")
            if response_dict['send_sd_blob'] is True:
                self.file_sender = FileSender()
            else:
                self.received_descriptor_response = True
            self.descriptor_needed = response_dict['send_sd_blob']
            self.needed_blobs = response_dict.get('needed_blobs', [])
            return self.get_blobs_to_send()
        else:  # Expecting Server Blob Response
            if 'received_sd_blob' not in response_dict:
                raise ValueError("I don't know if the sd blob made it to the intended destination!")
            self.received_descriptor_response = True
            sd_blob = self.next_blob_to_send
            if response_dict['received_sd_blob']:
                self.reflected_blobs.append(sd_blob.blob_hash)
                log.info("Sent reflector descriptor %s", sd_blob)
            else:
                log.warning("Reflector failed to receive descriptor %s for %s",
                            sd_blob, self.name)
                # Must happen before set_not_uploading() clears next_blob_to_send.
                if not self._retry_blob_once(sd_blob):
                    log.warning("Giving up on descriptor %s for %s after a failed retry",
                                sd_blob, self.name)
            return self.set_not_uploading()

    def handle_normal_response(self, response_dict):
        """Handle the server's replies concerning a regular blob.

        Without a FileSender the message is the answer to a blob offer
        (``'send_blob'``); with one it is the acknowledgement of the streamed
        blob (``'received_blob'``).

        Raises ValueError when the expected key is missing.
        """
        if self.file_sender is None:  # Expecting Server Info Response
            if 'send_blob' not in response_dict:
                raise ValueError("I don't know whether to send the blob or not!")
            if response_dict['send_blob'] is True:
                self.file_sender = FileSender()
                return defer.succeed(True)
            else:
                log.warning("Reflector already has %s for %s", self.next_blob_to_send,
                            self.name)
                return self.set_not_uploading()
        else:  # Expecting Server Blob Response
            if 'received_blob' not in response_dict:
                raise ValueError("I don't know if the blob made it to the intended destination!")
            blob = self.next_blob_to_send
            if response_dict['received_blob']:
                self.reflected_blobs.append(blob.blob_hash)
                log.debug("Sent reflector blob %s for %s", blob, self.name)
            else:
                log.warning("Reflector failed to receive blob %s for %s", blob, self.name)
                # Must happen before set_not_uploading() clears next_blob_to_send.
                if not self._retry_blob_once(blob):
                    log.warning("Giving up on blob %s for %s after a failed retry",
                                blob, self.name)
            return self.set_not_uploading()

    def _retry_blob_once(self, blob):
        """Re-queue ``blob`` for one more attempt unless it already failed before.

        The queue holds blob objects (send_next_request() hands each entry to
        open_blob_for_reading()), so the object itself is queued, never its
        hash. ``failed_blob_hashes`` caps this at a single retry per blob.

        Returns True when the blob was queued, False when it had already
        used its retry.
        """
        if blob.blob_hash in self.failed_blob_hashes:
            return False
        self.failed_blob_hashes.append(blob.blob_hash)
        self.blob_hashes_to_send.append(blob)
        return True

    def open_blob_for_reading(self, blob):
        """Open ``blob`` and make it the blob currently being sent.

        Returns a fired Deferred on success, or a Deferred failed with
        ValueError when the blob is not verified or yields no read handle.
        """
        if blob.get_is_verified():
            read_handle = blob.open_for_reading()
            if read_handle is not None:
                log.debug('Getting ready to send %s', blob.blob_hash)
                self.next_blob_to_send = blob
                self.read_handle = read_handle
                return defer.succeed(None)
        return defer.fail(ValueError(
            "Couldn't open that blob for some reason. blob_hash: {}".format(blob.blob_hash)))

    def send_blob_info(self):
        """Offer the currently opened blob to the server (hash and size)."""
        assert self.next_blob_to_send is not None, "need to have a next blob to send at this point"
        r = {
            'blob_hash': self.next_blob_to_send.blob_hash,
            'blob_size': self.next_blob_to_send.length
        }
        self.send_request(r)

    def send_descriptor_info(self):
        """Offer the sd blob to the server (hash and size) and mark it as announced."""
        assert self.stream_descriptor is not None, "need to have a sd blob to send at this point"
        r = {
            'sd_blob_hash': self.stream_descriptor.blob_hash,
            'sd_blob_size': self.stream_descriptor.length
        }
        self.sent_stream_info = True
        self.send_request(r)

    def skip_missing_blob(self, err, blob_hash, blob=None):
        """Errback for a blob that could not be opened: retry it once, then give up.

        ``err`` must wrap a ValueError (anything else is re-raised by
        ``trap``). ``blob`` is the blob object belonging to ``blob_hash``;
        when given, a first failure puts it back on the send queue. Without
        it the failure is only recorded, because the queue can only hold blob
        objects.

        NOTE(review): nothing is sent after a skip, so the exchange appears
        to wait for the server to speak or disconnect - confirm this is
        intended.
        """
        err.trap(ValueError)
        if blob_hash in self.failed_blob_hashes:
            log.warning("Failed second try reflecting blob %s for %s, giving up, reason: %s",
                        str(blob_hash)[:16], self.name, err.getTraceback())
            return
        log.warning("Failed to reflect blob %s for %s, reason: %s",
                    str(blob_hash)[:16], self.name, err.getTraceback())
        if blob is not None:
            self._retry_blob_once(blob)
        else:
            self.failed_blob_hashes.append(blob_hash)

    def send_next_request(self):
        """Perform the next step of the exchange.

        In order of precedence: stream the blob the server just asked for,
        announce the sd blob, offer the next queued blob, or close the
        connection when nothing is left.
        """
        if self.file_sender is not None:
            # send the blob
            return self.start_transfer()
        elif not self.sent_stream_info:
            # open the sd blob to send
            blob = self.stream_descriptor
            d = self.open_blob_for_reading(blob)
            d.addCallbacks(lambda _: self.send_descriptor_info(),
                           lambda err: self.skip_missing_blob(err, blob.blob_hash, blob))
            return d
        elif self.blob_hashes_to_send:
            # open the next blob to send
            blob = self.blob_hashes_to_send.pop(0)
            d = self.open_blob_for_reading(blob)
            d.addCallbacks(lambda _: self.send_blob_info(),
                           lambda err: self.skip_missing_blob(err, blob.blob_hash, blob))
            return d
        # close connection
        self.transport.loseConnection()
2016-08-04 05:03:14 +02:00
2016-09-27 20:18:16 +02:00
class EncryptedFileReflectorClientFactory(ClientFactory):
    """Builds EncryptedFileReflectorClient protocols for one lbry file.

    ``finished_deferred`` is the Deferred the protocol fires from its
    ``connectionLost``; ``p`` holds the most recently built protocol.
    Everything else the protocol needs is exposed as read-only properties
    that delegate to the wrapped lbry file.
    """

    protocol = EncryptedFileReflectorClient

    def __init__(self, lbry_file):
        """Wrap ``lbry_file`` and create the completion Deferred."""
        self.finished_deferred = defer.Deferred()
        self.p = None
        self._lbry_file = lbry_file

    @property
    def name(self):
        """Name of the wrapped lbry file."""
        return self._lbry_file.name

    @property
    def stream_hash(self):
        """Stream hash of the wrapped lbry file."""
        return self._lbry_file.stream_hash

    @property
    def blob_manager(self):
        """Blob manager of the wrapped lbry file."""
        return self._lbry_file.blob_manager

    @property
    def stream_info_manager(self):
        """Stream info manager of the wrapped lbry file."""
        return self._lbry_file.stream_info_manager

    @property
    def protocol_version(self):
        """Protocol version announced by clients of this factory."""
        return REFLECTOR_V2

    def buildProtocol(self, addr):
        """Create a protocol, attach this factory to it and remember it in ``p``."""
        client = self.protocol()
        client.factory = self
        self.p = client
        return client

    def startFactory(self):
        """Log the start-up, then defer to ClientFactory.startFactory."""
        log.debug('Starting reflector factory')
        ClientFactory.startFactory(self)

    def startedConnecting(self, connector):
        """Log that a connection attempt has begun."""
        log.debug('Connecting to reflector')

    def clientConnectionLost(self, connector, reason):
        """Do nothing: a lost connection is not retried by this factory."""

    def clientConnectionFailed(self, connector, reason):
        """Log a failed connection attempt; a refused connection is only a warning."""
        if not reason.check(ConnectionRefusedError):
            log.error("Reflector connection failed: %s", reason)
            return
        log.warning("Could not connect to reflector server")