get uploads working

- add error catching in the exchange rate manager
- give data away for free on the first request with the default negotiation strategy
Jack 2016-09-27 23:56:08 -04:00
parent 1720cce3b0
commit 1cc6b7658c
4 changed files with 57 additions and 44 deletions

View file

@@ -18,9 +18,10 @@ class BasicAvailabilityWeightedStrategy(object):
until the rate is accepted or a threshold is reached
"""
def __init__(self, blob_tracker, acceleration=1.25, deceleration=0.9, max_rate=0.005):
def __init__(self, blob_tracker, acceleration=1.25, deceleration=0.9, max_rate=0.005, min_rate=0.0):
self._acceleration = acceleration # rate of how quickly to ramp offer
self._deceleration = deceleration
self._min_rate = min_rate
self._max_rate = max_rate
self._count_up = {}
self._count_down = {}
@@ -31,13 +32,17 @@ class BasicAvailabilityWeightedStrategy(object):
def respond_to_offer(self, offer, peer, blobs):
request_count = self._count_up.get(peer, 0)
rates = [self._calculate_price(blob) for blob in blobs]
rate = self._discount(sum(rates) / max(len(blobs), 1), request_count)
log.info("Target rate: %s", rate)
rate = sum(rates) / max(len(rates), 1)
discounted = self._discount(rate, request_count)
price = self._bounded_price(discounted)
log.info("Price target: %f, final: %f", discounted, price)
self._inc_up_count(peer)
if offer.accepted:
if offer.rate == 0.0 and request_count == 0:
# give blobs away for free by default on the first request
offer.accept()
return offer
elif offer.rate >= rate:
elif offer.rate >= price:
log.info("Accept: %f", offer.rate)
offer.accept()
return offer
@@ -57,9 +62,14 @@ class BasicAvailabilityWeightedStrategy(object):
rates = [self._calculate_price(blob) for blob in blobs]
mean_rate = sum(rates) / max(len(blobs), 1)
with_premium = self._premium(mean_rate, request_count)
offer = Offer(with_premium)
price = self._bounded_price(with_premium)
offer = Offer(price)
return offer
def _bounded_price(self, price):
price_for_return = min(self._max_rate, max(price, self._min_rate))
return price_for_return
def _inc_up_count(self, peer):
turn = self._count_up.get(peer, 0) + 1
self._count_up.update({peer: turn})
@@ -72,7 +82,7 @@ class BasicAvailabilityWeightedStrategy(object):
return self.model.calculate_price(blob)
def _premium(self, rate, turn):
return min(rate * (self._acceleration ** turn), self._max_rate)
return rate * (self._acceleration ** turn)
def _discount(self, rate, turn):
return min(rate * (self._deceleration ** turn), self._max_rate)
return rate * (self._deceleration ** turn)
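
The strategy changes above move the min/max clamping out of _premium and _discount into a single _bounded_price step, and let a peer's very first request through for free when it offers a rate of 0.0. A minimal sketch of the resulting negotiation flow, with hypothetical stand-ins for the Offer object and the per-blob prices (the real strategy gets these from a blob availability tracker):

    class Offer(object):
        # stand-in for the project's Offer: just a rate and an accepted flag
        def __init__(self, rate):
            self.rate = rate
            self.accepted = False

        def accept(self):
            self.accepted = True

    def bounded_price(price, min_rate=0.0, max_rate=0.005):
        # clamp the negotiated rate into [min_rate, max_rate]
        return min(max_rate, max(price, min_rate))

    def respond_to_offer(offer, request_count, blob_prices, deceleration=0.9):
        # mean price over the requested blobs, discounted a little more on each
        # repeated request from the same peer, then clamped to the allowed range
        mean_rate = sum(blob_prices) / max(len(blob_prices), 1)
        price = bounded_price(mean_rate * (deceleration ** request_count))
        if offer.rate == 0.0 and request_count == 0:
            # give the blobs away for free on the peer's first request
            offer.accept()
        elif offer.rate >= price:
            offer.accept()
        return offer

The offer side is symmetric: make_offer applies the acceleration premium and runs the result through the same clamp, so repeated haggling stays inside the configured bounds instead of running away.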

View file

@@ -16,8 +16,7 @@ log = logging.getLogger(__name__)
class BlobRequestHandlerFactory(object):
implements(IQueryHandlerFactory)
def __init__(self, blob_manager, blob_tracker, wallet, payment_rate_manager):
self.blob_tracker = blob_tracker
def __init__(self, blob_manager, wallet, payment_rate_manager):
self.blob_manager = blob_manager
self.wallet = wallet
self.payment_rate_manager = payment_rate_manager
@@ -25,7 +24,7 @@ class BlobRequestHandlerFactory(object):
######### IQueryHandlerFactory #########
def build_query_handler(self):
q_h = BlobRequestHandler(self.blob_manager, self.blob_tracker, self.wallet, self.payment_rate_manager)
q_h = BlobRequestHandler(self.blob_manager, self.wallet, self.payment_rate_manager)
return q_h
def get_primary_query_identifier(self):
@@ -38,9 +37,8 @@ class BlobRequestHandlerFactory(object):
class BlobRequestHandler(object):
implements(IQueryHandler, IBlobSender)
def __init__(self, blob_manager, blob_tracker, wallet, payment_rate_manager):
def __init__(self, blob_manager, wallet, payment_rate_manager):
self.blob_manager = blob_manager
self.blob_tracker = blob_tracker
self.payment_rate_manager = payment_rate_manager
self.wallet = wallet
self.query_identifiers = ['blob_data_payment_rate', 'requested_blob', 'requested_blobs']
@@ -50,7 +48,6 @@ class BlobRequestHandler(object):
self.currently_uploading = None
self.file_sender = None
self.blob_bytes_uploaded = 0
self.strategy = get_default_strategy(self.blob_tracker)
self._blobs_requested = []
######### IQueryHandler #########
@@ -73,8 +70,7 @@ class BlobRequestHandler(object):
if self.query_identifiers[1] in queries:
incoming = queries[self.query_identifiers[1]]
log.info("Request download: %s", str(incoming))
response.addCallback(lambda r: self._reply_to_send_request({}, incoming))
response.addCallback(lambda r: self._reply_to_send_request(r, incoming))
return response
@@ -94,10 +90,6 @@ class BlobRequestHandler(object):
######### internal #########
def _add_to_response(self, response, to_add):
return response
def _reply_to_availability(self, request, blobs):
d = self._get_available_blobs(blobs)
@@ -111,25 +103,26 @@ class BlobRequestHandler(object):
def open_blob_for_reading(self, blob, response):
response_fields = {}
d = defer.succeed(None)
if blob.is_validated():
read_handle = blob.open_for_reading()
if read_handle is not None:
self.currently_uploading = blob
self.read_handle = read_handle
log.info("Sending %s to client", str(blob))
log.debug("Sending %s to client", str(blob))
response_fields['blob_hash'] = blob.blob_hash
response_fields['length'] = blob.length
response['incoming_blob'] = response_fields
log.info(response)
return response, blob
d.addCallback(lambda _: self.record_transaction(blob))
d.addCallback(lambda _: response)
return d
log.warning("We can not send %s", str(blob))
response['error'] = "BLOB_UNAVAILABLE"
return response, blob
def record_transaction(self, response, blob, rate):
d = self.blob_manager.add_blob_to_upload_history(str(blob), self.peer.host, rate)
d.addCallback(lambda _: response)
log.info(response)
return d
def record_transaction(self, blob):
d = self.blob_manager.add_blob_to_upload_history(str(blob), self.peer.host, self.blob_data_payment_rate)
return d
def _reply_to_send_request(self, response, incoming):
@@ -142,18 +135,18 @@ class BlobRequestHandler(object):
response['error'] = "RATE_UNSET"
return defer.succeed(response)
else:
log.debug("Requested blob: %s", str(incoming))
d = self.blob_manager.get_blob(incoming, True)
d.addCallback(lambda blob: self.open_blob_for_reading(blob, response))
d.addCallback(lambda (r, blob): self.record_transaction(r, blob, rate))
return d
def reply_to_offer(self, offer, request):
blobs = request.get("available_blobs", [])
log.info("Offered rate %f/mb for %i blobs", offer.rate, len(blobs))
reply = self.strategy.respond_to_offer(offer, self.peer, blobs)
if reply.accepted:
self.blob_data_payment_rate = reply.rate
r = Negotiate.make_dict_from_offer(reply)
accepted = self.payment_rate_manager.accept_rate_blob_data(self.peer, blobs, offer)
if accepted:
self.blob_data_payment_rate = offer.rate
r = Negotiate.make_dict_from_offer(offer)
request.update(r)
return request
@@ -178,7 +171,7 @@ class BlobRequestHandler(object):
def start_transfer():
self.file_sender = FileSender()
log.info("Starting the file upload")
log.debug("Starting the file upload")
assert self.read_handle is not None, "self.read_handle was None when trying to start the transfer"
d = self.file_sender.beginFileTransfer(self.read_handle, consumer, count_bytes)
return d
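
With the per-handler strategy gone, accepting a rate is delegated to the payment rate manager, and a successful send request now chains get_blob, open_blob_for_reading, and record_transaction so the blob, the peer, and the agreed rate land in the upload history together. A rough, self-contained sketch of that deferred chain, using a hypothetical blob_manager/blob shape rather than the project's real classes:

    def reply_to_send_request(blob_manager, peer_host, blob_hash, rate, response):
        def open_for_reading(blob):
            # attach the blob metadata to the reply before streaming starts
            response['incoming_blob'] = {'blob_hash': blob.blob_hash,
                                         'length': blob.length}
            return response, blob

        def record(result):
            # log the upload (blob, peer, rate) once the read handle is ready,
            # then hand the original response dict back to the protocol
            r, blob = result
            d = blob_manager.add_blob_to_upload_history(str(blob), peer_host, rate)
            d.addCallback(lambda _: r)
            return d

        d = blob_manager.get_blob(blob_hash, True)
        d.addCallback(open_for_reading)
        d.addCallback(record)
        return d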

View file

@@ -673,7 +673,6 @@ class LBRYDaemon(jsonrpc.JSONRPC):
# TODO: this was blatantly copied from jsonrpc_start_lbry_file. Be DRY.
def _start_file(f):
d = self.lbry_file_manager.toggle_lbry_file_running(f)
d.addCallback(lambda _: self.lighthouse_client.announce_sd(f.sd_hash))
return defer.succeed("Started LBRY file")
def _get_and_start_file(name):
@@ -775,15 +774,13 @@ class LBRYDaemon(jsonrpc.JSONRPC):
# CryptBlobInfoQueryHandlerFactory(self.lbry_file_metadata_manager, self.session.wallet,
# self._server_payment_rate_manager),
# BlobAvailabilityHandlerFactory(self.session.blob_manager),
BlobRequestHandlerFactory(self.session.blob_manager, self.session.blob_tracker, self.session.wallet,
self.session.payment_rate_manager),
BlobRequestHandlerFactory(self.session.blob_manager, self.session.wallet,
self.session.payment_rate_manager),
self.session.wallet.get_wallet_info_query_handler_factory(),
]
def get_blob_request_handler_factory(rate):
self.blob_request_payment_rate_manager = self.session.payment_rate_manager
handlers.append(BlobRequestHandlerFactory(self.session.blob_manager, self.session.blob_tracker, self.session.wallet,
self.blob_request_payment_rate_manager))
d1 = self.settings.get_server_data_payment_rate()
d1.addCallback(get_blob_request_handler_factory)
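
For reference, the daemon-side registration collapses to the three-argument factory call; a small sketch assuming the session attribute names used in the diff (the surrounding handlers list is simplified here):

    handlers = [
        BlobRequestHandlerFactory(session.blob_manager,
                                  session.wallet,
                                  session.payment_rate_manager),
        session.wallet.get_wallet_info_query_handler_factory(),
    ]
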
@@ -2552,7 +2549,6 @@ class LBRYDaemon(jsonrpc.JSONRPC):
d = self._render_response(SEARCH_SERVERS, OK_CODE)
return d
def jsonrpc_get_mean_availability(self):
"""
Get mean blob availability

View file

@@ -37,24 +37,34 @@ class MarketFeed(object):
self._updater = LoopingCall(self._update_price)
def _make_request(self):
r = requests.get(self.url, self.params)
return r.text
try:
r = requests.get(self.url, self.params)
return defer.succeed(r.text)
except Exception as err:
log.error(err)
return defer.fail(err)
def _handle_response(self, response):
return NotImplementedError
def _subtract_fee(self, from_amount):
# increase amount to account for market fees
return defer.succeed(from_amount / (1.0 - self.fee))
def _save_price(self, price):
log.debug("Saving price update %f for %s" % (price, self.market))
self.rate = ExchangeRate(self.market, price, int(time.time()))
def _log_error(self, err):
log.error(err)
log.warning("There was a problem updating %s exchange rate information from %s", self.market, self.name)
def _update_price(self):
d = defer.succeed(self._make_request())
d = self._make_request()
d.addCallback(self._handle_response)
d.addCallback(self._subtract_fee)
d.addCallback(self._save_price)
d.addErrback(self._log_error)
def start(self):
if not self._updater.running:
@@ -94,7 +104,11 @@ class GoogleBTCFeed(MarketFeed):
)
def _make_request(self):
return googlefinance.getQuotes('CURRENCY:USDBTC')[0]
try:
r = googlefinance.getQuotes('CURRENCY:USDBTC')[0]
return defer.succeed(r)
except Exception as err:
return defer.fail(err)
def _handle_response(self, response):
return float(response['LastTradePrice'])
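
The feed changes wrap the blocking request in try/except and hand back an already-fired deferred, so a failed HTTP call or quote lookup flows into the errback (and _log_error) instead of breaking the LoopingCall. A minimal sketch of that pattern; the function names and the way the callbacks are passed in are illustrative, not the module's real layout:

    import logging

    import requests
    from twisted.internet import defer

    log = logging.getLogger(__name__)

    def make_request(url, params=None):
        # run the blocking HTTP call, but surface failures through the
        # deferred's errback chain instead of raising into the caller
        try:
            r = requests.get(url, params)
            return defer.succeed(r.text)
        except Exception as err:
            log.error(err)
            return defer.fail(err)

    def update_price(url, handle_response, save_price):
        d = make_request(url)
        d.addCallback(handle_response)  # parse the feed payload into a float rate
        d.addCallback(save_price)       # store the rate with a timestamp
        d.addErrback(lambda err: log.warning(
            "There was a problem updating exchange rate information: %s", err))
        return d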