diff --git a/lbrynet/core/server/BlobRequestHandler.py b/lbrynet/core/server/BlobRequestHandler.py index a76d3428d..266ff82c0 100644 --- a/lbrynet/core/server/BlobRequestHandler.py +++ b/lbrynet/core/server/BlobRequestHandler.py @@ -5,8 +5,7 @@ from twisted.protocols.basic import FileSender from twisted.python.failure import Failure from zope.interface import implements -from lbrynet.core.Offer import Offer, Negotiate -from lbrynet.core.Strategy import get_default_strategy +from lbrynet.core.Offer import Offer from lbrynet.interfaces import IQueryHandlerFactory, IQueryHandler, IBlobSender @@ -36,12 +35,15 @@ class BlobRequestHandlerFactory(object): class BlobRequestHandler(object): implements(IQueryHandler, IBlobSender) + PAYMENT_RATE_QUERY = 'blob_data_payment_rate' + BLOB_QUERY = 'requested_blob' + AVAILABILITY_QUERY = 'requested_blobs' def __init__(self, blob_manager, wallet, payment_rate_manager): self.blob_manager = blob_manager self.payment_rate_manager = payment_rate_manager self.wallet = wallet - self.query_identifiers = ['blob_data_payment_rate', 'requested_blob', 'requested_blobs'] + self.query_identifiers = [self.PAYMENT_RATE_QUERY, self.BLOB_QUERY, self.AVAILABILITY_QUERY] self.peer = None self.blob_data_payment_rate = None self.read_handle = None @@ -60,20 +62,64 @@ class BlobRequestHandler(object): def handle_queries(self, queries): response = defer.succeed({}) - if self.query_identifiers[2] in queries: - self._blobs_requested = queries[self.query_identifiers[2]] + if self.AVAILABILITY_QUERY in queries: + self._blobs_requested = queries[self.AVAILABILITY_QUERY] response.addCallback(lambda r: self._reply_to_availability(r, self._blobs_requested)) - if self.query_identifiers[0] in queries: - offer = Offer(queries[self.query_identifiers[0]]) - response.addCallback(lambda r: self.reply_to_offer(offer, r)) + if self.PAYMENT_RATE_QUERY in queries: + offer = Offer(queries[self.PAYMENT_RATE_QUERY]) + response.addCallback(lambda r: self._handle_payment_rate_query(offer, r)) - if self.query_identifiers[1] in queries: - incoming = queries[self.query_identifiers[1]] + if self.BLOB_QUERY in queries: + incoming = queries[self.BLOB_QUERY] response.addCallback(lambda r: self._reply_to_send_request(r, incoming)) return response + def _handle_payment_rate_query(self, offer, request): + blobs = request.get("available_blobs", []) + log.info("Offered rate %f/mb for %i blobs", offer.rate, len(blobs)) + accepted = self.payment_rate_manager.accept_rate_blob_data(self.peer, blobs, offer) + if accepted: + self.blob_data_payment_rate = offer.rate + request[self.PAYMENT_RATE_QUERY] = "RATE_ACCEPTED" + else: + request[self.PAYMENT_RATE_QUERY] = "RATE_TOO_LOW" + return request + + def _handle_blob_query(self, response, query): + log.debug("Received the client's request to send a blob") + response['incoming_blob'] = {} + + if self.blob_data_payment_rate is None: + response['incoming_blob']['error'] = "RATE_UNSET" + return defer.succeed(response) + else: + return self._send_blob(response, query) + + def _send_blob(self, response, query): + d = self.blob_manager.get_blob(query, True) + d.addCallback(self.open_blob_for_reading, response) + return d + + def open_blob_for_reading(self, blob, response): + def failure(msg): + log.warning("We can not send %s: %s", blob, msg) + response['incoming_blob']['error'] = "BLOB_UNAVAILABLE" + return response + if not blob.is_validated(): + return failure("blob can't be validated") + read_handle = blob.open_for_reading() + if read_handle is None: + return failure("blob can't be opened") + + self.currently_uploading = blob + self.read_handle = read_handle + log.info("Sending %s to client", blob) + response['incoming_blob']['blob_hash'] = blob.blob_hash + response['incoming_blob']['length'] = blob.length + return response + ######### IBlobSender ######### def send_blob_if_requested(self, consumer): @@ -140,16 +186,6 @@ class BlobRequestHandler(object): d.addCallback(lambda blob: self.open_blob_for_reading(blob, response)) return d - def reply_to_offer(self, offer, request): - blobs = request.get("available_blobs", []) - log.info("Offered rate %f/mb for %i blobs", offer.rate, len(blobs)) - accepted = self.payment_rate_manager.accept_rate_blob_data(self.peer, blobs, offer) - if accepted: - self.blob_data_payment_rate = offer.rate - r = Negotiate.make_dict_from_offer(offer) - request.update(r) - return request - def _get_available_blobs(self, requested_blobs): d = self.blob_manager.completed_blobs(requested_blobs) return d