diff --git a/lbrynet/core/server/BlobRequestHandler.py b/lbrynet/core/server/BlobRequestHandler.py index 51d630951..4babfa91c 100644 --- a/lbrynet/core/server/BlobRequestHandler.py +++ b/lbrynet/core/server/BlobRequestHandler.py @@ -32,12 +32,14 @@ class BlobRequestHandlerFactory(object): class BlobRequestHandler(object): implements(IQueryHandler, IBlobSender) + PAYMENT_RATE_QUERY = 'blob_data_payment_rate' + BLOB_QUERY = 'requested_blob' def __init__(self, blob_manager, wallet, payment_rate_manager): self.blob_manager = blob_manager self.payment_rate_manager = payment_rate_manager self.wallet = wallet - self.query_identifiers = ['blob_data_payment_rate', 'requested_blob'] + self.query_identifiers = [self.PAYMENT_RATE_QUERY, self.BLOB_QUERY] self.peer = None self.blob_data_payment_rate = None self.read_handle = None @@ -54,44 +56,52 @@ class BlobRequestHandler(object): def handle_queries(self, queries): response = {} - if self.query_identifiers[0] in queries: - if not self.handle_blob_data_payment_rate(queries[self.query_identifiers[0]]): - response['blob_data_payment_rate'] = "RATE_TOO_LOW" - else: - response['blob_data_payment_rate'] = 'RATE_ACCEPTED' - - if self.query_identifiers[1] in queries: - log.debug("Received the client's request to send a blob") - response_fields = {} - response['incoming_blob'] = response_fields - - if self.blob_data_payment_rate is None: - response_fields['error'] = "RATE_UNSET" - return defer.succeed(response) - else: - - d = self.blob_manager.get_blob(queries[self.query_identifiers[1]], True) - - def open_blob_for_reading(blob): - if blob.is_validated(): - read_handle = blob.open_for_reading() - if read_handle is not None: - self.currently_uploading = blob - self.read_handle = read_handle - log.info("Sending %s to client", str(blob)) - response_fields['blob_hash'] = blob.blob_hash - response_fields['length'] = blob.length - return response - log.warning("We can not send %s", str(blob)) - response_fields['error'] = "BLOB_UNAVAILABLE" - return response - - d.addCallback(open_blob_for_reading) - - return d + if self.PAYMENT_RATE_QUERY in queries: + self._handle_payment_rate_query(response, queries[self.PAYMENT_RATE_QUERY]) + if self.BLOB_QUERY in queries: + return self._handle_blob_query(response, queries[self.BLOB_QUERY]) else: return defer.succeed(response) + def _handle_payment_rate_query(self, response, query): + if not self.handle_blob_data_payment_rate(query): + response['blob_data_payment_rate'] = "RATE_TOO_LOW" + else: + response['blob_data_payment_rate'] = 'RATE_ACCEPTED' + + def _handle_blob_query(self, response, query): + log.debug("Received the client's request to send a blob") + response['incoming_blob'] = {} + + if self.blob_data_payment_rate is None: + response['incoming_blob']['error'] = "RATE_UNSET" + return defer.succeed(response) + else: + return self._send_blob(response, query) + + def _send_blob(self, response, query): + d = self.blob_manager.get_blob(query, True) + d.addCallback(self.open_blob_for_reading, response) + return d + + def open_blob_for_reading(self, blob, response): + def failure(msg): + log.warning("We can not send %s: %s", blob, msg) + response['incoming_blob']['error'] = "BLOB_UNAVAILABLE" + return response + if not blob.is_validated(): + return failure("blob can't be validated") + read_handle = blob.open_for_reading() + if read_handle is None: + return failure("blob can't be opened") + + self.currently_uploading = blob + self.read_handle = read_handle + log.info("Sending %s to client", blob) + response['incoming_blob']['blob_hash'] = blob.blob_hash + response['incoming_blob']['length'] = blob.length + return response + ######### IBlobSender ######### def send_blob_if_requested(self, consumer):