Merge pull request #177 from lbryio/blob-request-handler-refactor
refactor BlobRequestHandler
This commit is contained in:
commit
8222f940b4
1 changed files with 46 additions and 36 deletions
|
@ -32,12 +32,14 @@ class BlobRequestHandlerFactory(object):
|
||||||
|
|
||||||
class BlobRequestHandler(object):
|
class BlobRequestHandler(object):
|
||||||
implements(IQueryHandler, IBlobSender)
|
implements(IQueryHandler, IBlobSender)
|
||||||
|
PAYMENT_RATE_QUERY = 'blob_data_payment_rate'
|
||||||
|
BLOB_QUERY = 'requested_blob'
|
||||||
|
|
||||||
def __init__(self, blob_manager, wallet, payment_rate_manager):
|
def __init__(self, blob_manager, wallet, payment_rate_manager):
|
||||||
self.blob_manager = blob_manager
|
self.blob_manager = blob_manager
|
||||||
self.payment_rate_manager = payment_rate_manager
|
self.payment_rate_manager = payment_rate_manager
|
||||||
self.wallet = wallet
|
self.wallet = wallet
|
||||||
self.query_identifiers = ['blob_data_payment_rate', 'requested_blob']
|
self.query_identifiers = [self.PAYMENT_RATE_QUERY, self.BLOB_QUERY]
|
||||||
self.peer = None
|
self.peer = None
|
||||||
self.blob_data_payment_rate = None
|
self.blob_data_payment_rate = None
|
||||||
self.read_handle = None
|
self.read_handle = None
|
||||||
|
@ -54,44 +56,52 @@ class BlobRequestHandler(object):
|
||||||
|
|
||||||
def handle_queries(self, queries):
|
def handle_queries(self, queries):
|
||||||
response = {}
|
response = {}
|
||||||
if self.query_identifiers[0] in queries:
|
if self.PAYMENT_RATE_QUERY in queries:
|
||||||
if not self.handle_blob_data_payment_rate(queries[self.query_identifiers[0]]):
|
self._handle_payment_rate_query(response, queries[self.PAYMENT_RATE_QUERY])
|
||||||
response['blob_data_payment_rate'] = "RATE_TOO_LOW"
|
if self.BLOB_QUERY in queries:
|
||||||
else:
|
return self._handle_blob_query(response, queries[self.BLOB_QUERY])
|
||||||
response['blob_data_payment_rate'] = 'RATE_ACCEPTED'
|
|
||||||
|
|
||||||
if self.query_identifiers[1] in queries:
|
|
||||||
log.debug("Received the client's request to send a blob")
|
|
||||||
response_fields = {}
|
|
||||||
response['incoming_blob'] = response_fields
|
|
||||||
|
|
||||||
if self.blob_data_payment_rate is None:
|
|
||||||
response_fields['error'] = "RATE_UNSET"
|
|
||||||
return defer.succeed(response)
|
|
||||||
else:
|
|
||||||
|
|
||||||
d = self.blob_manager.get_blob(queries[self.query_identifiers[1]], True)
|
|
||||||
|
|
||||||
def open_blob_for_reading(blob):
|
|
||||||
if blob.is_validated():
|
|
||||||
read_handle = blob.open_for_reading()
|
|
||||||
if read_handle is not None:
|
|
||||||
self.currently_uploading = blob
|
|
||||||
self.read_handle = read_handle
|
|
||||||
log.info("Sending %s to client", str(blob))
|
|
||||||
response_fields['blob_hash'] = blob.blob_hash
|
|
||||||
response_fields['length'] = blob.length
|
|
||||||
return response
|
|
||||||
log.warning("We can not send %s", str(blob))
|
|
||||||
response_fields['error'] = "BLOB_UNAVAILABLE"
|
|
||||||
return response
|
|
||||||
|
|
||||||
d.addCallback(open_blob_for_reading)
|
|
||||||
|
|
||||||
return d
|
|
||||||
else:
|
else:
|
||||||
return defer.succeed(response)
|
return defer.succeed(response)
|
||||||
|
|
||||||
|
def _handle_payment_rate_query(self, response, query):
|
||||||
|
if not self.handle_blob_data_payment_rate(query):
|
||||||
|
response['blob_data_payment_rate'] = "RATE_TOO_LOW"
|
||||||
|
else:
|
||||||
|
response['blob_data_payment_rate'] = 'RATE_ACCEPTED'
|
||||||
|
|
||||||
|
def _handle_blob_query(self, response, query):
|
||||||
|
log.debug("Received the client's request to send a blob")
|
||||||
|
response['incoming_blob'] = {}
|
||||||
|
|
||||||
|
if self.blob_data_payment_rate is None:
|
||||||
|
response['incoming_blob']['error'] = "RATE_UNSET"
|
||||||
|
return defer.succeed(response)
|
||||||
|
else:
|
||||||
|
return self._send_blob(response, query)
|
||||||
|
|
||||||
|
def _send_blob(self, response, query):
|
||||||
|
d = self.blob_manager.get_blob(query, True)
|
||||||
|
d.addCallback(self.open_blob_for_reading, response)
|
||||||
|
return d
|
||||||
|
|
||||||
|
def open_blob_for_reading(self, blob, response):
|
||||||
|
def failure(msg):
|
||||||
|
log.warning("We can not send %s: %s", blob, msg)
|
||||||
|
response['incoming_blob']['error'] = "BLOB_UNAVAILABLE"
|
||||||
|
return response
|
||||||
|
if not blob.is_validated():
|
||||||
|
return failure("blob can't be validated")
|
||||||
|
read_handle = blob.open_for_reading()
|
||||||
|
if read_handle is None:
|
||||||
|
return failure("blob can't be opened")
|
||||||
|
|
||||||
|
self.currently_uploading = blob
|
||||||
|
self.read_handle = read_handle
|
||||||
|
log.info("Sending %s to client", blob)
|
||||||
|
response['incoming_blob']['blob_hash'] = blob.blob_hash
|
||||||
|
response['incoming_blob']['length'] = blob.length
|
||||||
|
return response
|
||||||
|
|
||||||
######### IBlobSender #########
|
######### IBlobSender #########
|
||||||
|
|
||||||
def send_blob_if_requested(self, consumer):
|
def send_blob_if_requested(self, consumer):
|
||||||
|
|
Loading…
Reference in a new issue