Merge branch 'master' into blob-prices

# Conflicts:
#	lbrynet/core/server/BlobRequestHandler.py
This commit is contained in:
Jack 2016-09-30 00:11:53 -04:00
commit b853656aa6

View file

@ -5,8 +5,7 @@ from twisted.protocols.basic import FileSender
from twisted.python.failure import Failure from twisted.python.failure import Failure
from zope.interface import implements from zope.interface import implements
from lbrynet.core.Offer import Offer, Negotiate from lbrynet.core.Offer import Offer
from lbrynet.core.Strategy import get_default_strategy
from lbrynet.interfaces import IQueryHandlerFactory, IQueryHandler, IBlobSender from lbrynet.interfaces import IQueryHandlerFactory, IQueryHandler, IBlobSender
@ -36,12 +35,15 @@ class BlobRequestHandlerFactory(object):
class BlobRequestHandler(object): class BlobRequestHandler(object):
implements(IQueryHandler, IBlobSender) implements(IQueryHandler, IBlobSender)
PAYMENT_RATE_QUERY = 'blob_data_payment_rate'
BLOB_QUERY = 'requested_blob'
AVAILABILITY_QUERY = 'requested_blobs'
def __init__(self, blob_manager, wallet, payment_rate_manager): def __init__(self, blob_manager, wallet, payment_rate_manager):
self.blob_manager = blob_manager self.blob_manager = blob_manager
self.payment_rate_manager = payment_rate_manager self.payment_rate_manager = payment_rate_manager
self.wallet = wallet self.wallet = wallet
self.query_identifiers = ['blob_data_payment_rate', 'requested_blob', 'requested_blobs'] self.query_identifiers = [self.PAYMENT_RATE_QUERY, self.BLOB_QUERY, self.AVAILABILITY_QUERY]
self.peer = None self.peer = None
self.blob_data_payment_rate = None self.blob_data_payment_rate = None
self.read_handle = None self.read_handle = None
@ -60,20 +62,64 @@ class BlobRequestHandler(object):
def handle_queries(self, queries): def handle_queries(self, queries):
response = defer.succeed({}) response = defer.succeed({})
if self.query_identifiers[2] in queries: if self.AVAILABILITY_QUERY in queries:
self._blobs_requested = queries[self.query_identifiers[2]] self._blobs_requested = queries[self.AVAILABILITY_QUERY]
response.addCallback(lambda r: self._reply_to_availability(r, self._blobs_requested)) response.addCallback(lambda r: self._reply_to_availability(r, self._blobs_requested))
if self.query_identifiers[0] in queries: if self.PAYMENT_RATE_QUERY in queries:
offer = Offer(queries[self.query_identifiers[0]]) offer = Offer(queries[self.PAYMENT_RATE_QUERY])
response.addCallback(lambda r: self.reply_to_offer(offer, r)) response.addCallback(lambda r: self._handle_payment_rate_query(offer, r))
if self.query_identifiers[1] in queries: if self.BLOB_QUERY in queries:
incoming = queries[self.query_identifiers[1]] incoming = queries[self.BLOB_QUERY]
response.addCallback(lambda r: self._reply_to_send_request(r, incoming)) response.addCallback(lambda r: self._reply_to_send_request(r, incoming))
return response return response
def _handle_payment_rate_query(self, offer, request):
blobs = request.get("available_blobs", [])
log.info("Offered rate %f/mb for %i blobs", offer.rate, len(blobs))
accepted = self.payment_rate_manager.accept_rate_blob_data(self.peer, blobs, offer)
if accepted:
self.blob_data_payment_rate = offer.rate
request[self.PAYMENT_RATE_QUERY] = "RATE_ACCEPTED"
else:
request[self.PAYMENT_RATE_QUERY] = "RATE_TOO_LOW"
return request
def _handle_blob_query(self, response, query):
log.debug("Received the client's request to send a blob")
response['incoming_blob'] = {}
if self.blob_data_payment_rate is None:
response['incoming_blob']['error'] = "RATE_UNSET"
return defer.succeed(response)
else:
return self._send_blob(response, query)
def _send_blob(self, response, query):
d = self.blob_manager.get_blob(query, True)
d.addCallback(self.open_blob_for_reading, response)
return d
def open_blob_for_reading(self, blob, response):
def failure(msg):
log.warning("We can not send %s: %s", blob, msg)
response['incoming_blob']['error'] = "BLOB_UNAVAILABLE"
return response
if not blob.is_validated():
return failure("blob can't be validated")
read_handle = blob.open_for_reading()
if read_handle is None:
return failure("blob can't be opened")
self.currently_uploading = blob
self.read_handle = read_handle
log.info("Sending %s to client", blob)
response['incoming_blob']['blob_hash'] = blob.blob_hash
response['incoming_blob']['length'] = blob.length
return response
######### IBlobSender ######### ######### IBlobSender #########
def send_blob_if_requested(self, consumer): def send_blob_if_requested(self, consumer):
@ -140,16 +186,6 @@ class BlobRequestHandler(object):
d.addCallback(lambda blob: self.open_blob_for_reading(blob, response)) d.addCallback(lambda blob: self.open_blob_for_reading(blob, response))
return d return d
def reply_to_offer(self, offer, request):
blobs = request.get("available_blobs", [])
log.info("Offered rate %f/mb for %i blobs", offer.rate, len(blobs))
accepted = self.payment_rate_manager.accept_rate_blob_data(self.peer, blobs, offer)
if accepted:
self.blob_data_payment_rate = offer.rate
r = Negotiate.make_dict_from_offer(offer)
request.update(r)
return request
def _get_available_blobs(self, requested_blobs): def _get_available_blobs(self, requested_blobs):
d = self.blob_manager.completed_blobs(requested_blobs) d = self.blob_manager.completed_blobs(requested_blobs)
return d return d