From 4f60a98eb2764e75dc56eba547a395fdbc94be6c Mon Sep 17 00:00:00 2001 From: Jack Date: Wed, 5 Oct 2016 22:58:34 -0400 Subject: [PATCH] fix functional tests, add a few unit tests, MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit -add ‘generous’ parameter for NegotiatedPaymentRateManager to turn free hosting on/off, by default set to true. --- lbrynet/conf.py | 2 +- lbrynet/core/BlobAvailability.py | 13 +- lbrynet/core/Offer.py | 28 ++- lbrynet/core/PaymentRateManager.py | 22 ++- lbrynet/core/PriceModel.py | 9 +- lbrynet/core/Session.py | 8 +- lbrynet/core/Strategy.py | 180 ++++++++++------- lbrynet/core/client/BlobRequester.py | 18 +- lbrynet/core/client/ClientProtocol.py | 9 +- lbrynet/core/server/BlobRequestHandler.py | 79 ++++---- .../lbrylive/client/LiveStreamDownloader.py | 4 +- tests/functional/test_misc.py | 42 ++-- tests/functional/test_reflector.py | 3 +- tests/unit/core/test_Strategy.py | 183 +++++++++++++++--- 14 files changed, 406 insertions(+), 194 deletions(-) diff --git a/lbrynet/conf.py b/lbrynet/conf.py index f9a413549..0be6d1c21 100644 --- a/lbrynet/conf.py +++ b/lbrynet/conf.py @@ -14,7 +14,7 @@ MAX_BLOB_INFOS_TO_REQUEST = 20 BLOBFILES_DIR = ".blobfiles" BLOB_SIZE = 2**21 -MIN_BLOB_DATA_PAYMENT_RATE = .001 # points/megabyte +MIN_BLOB_DATA_PAYMENT_RATE = .0001 # points/megabyte MIN_BLOB_INFO_PAYMENT_RATE = .02 # points/1000 infos MIN_VALUABLE_BLOB_INFO_PAYMENT_RATE = .05 # points/1000 infos MIN_VALUABLE_BLOB_HASH_PAYMENT_RATE = .05 # points/1000 infos diff --git a/lbrynet/core/BlobAvailability.py b/lbrynet/core/BlobAvailability.py index 5acd62b89..e7c8c2b44 100644 --- a/lbrynet/core/BlobAvailability.py +++ b/lbrynet/core/BlobAvailability.py @@ -3,6 +3,7 @@ import logging from twisted.internet import defer from twisted.internet.task import LoopingCall from lbrynet.core.PeerFinder import DummyPeerFinder +from decimal import Decimal log = logging.getLogger(__name__) @@ -17,7 +18,7 @@ class BlobAvailabilityTracker(object): def __init__(self, blob_manager, peer_finder, dht_node): self.availability = {} - self.last_mean_availability = 0.0 + self.last_mean_availability = Decimal(0.0) self._blob_manager = blob_manager self._peer_finder = peer_finder self._dht_node = dht_node @@ -85,7 +86,7 @@ class BlobAvailabilityTracker(object): def _get_mean_peers(self): num_peers = [len(self.availability[blob]) for blob in self.availability] - mean = float(sum(num_peers)) / float(max(1, len(num_peers))) + mean = Decimal(sum(num_peers)) / Decimal(max(1, len(num_peers))) self.last_mean_availability = mean @@ -97,7 +98,7 @@ class DummyBlobAvailabilityTracker(BlobAvailabilityTracker): availability (dict): dictionary of peers for known blobs """ - def __init__(self): + def __init__(self, blob_manager=None, peer_finder=None, dht_node=None): self.availability = { '91dc64cf1ff42e20d627b033ad5e4c3a4a96856ed8a6e3fb4cd5fa1cfba4bf72eefd325f579db92f45f4355550ace8e7': ['1.2.3.4'], 'b2e48bb4c88cf46b76adf0d47a72389fae0cd1f19ed27dc509138c99509a25423a4cef788d571dca7988e1dca69e6fa0': ['1.2.3.4', '1.2.3.4'], @@ -110,10 +111,10 @@ class DummyBlobAvailabilityTracker(BlobAvailabilityTracker): 'f99d24cd50d4bfd77c2598bfbeeb8415bf0feef21200bdf0b8fbbde7751a77b7a2c68e09c25465a2f40fba8eecb0b4e0': ['1.2.3.4', '1.2.3.4', '1.2.3.4', '1.2.3.4', '1.2.3.4', '1.2.3.4', '1.2.3.4', '1.2.3.4', '1.2.3.4'], 'c84aa1fd8f5009f7c4e71e444e40d95610abc1480834f835eefb267287aeb10025880a3ce22580db8c6d92efb5bc0c9c': ['1.2.3.4', '1.2.3.4', '1.2.3.4', '1.2.3.4', '1.2.3.4', '1.2.3.4', '1.2.3.4', '1.2.3.4', '1.2.3.4', '1.2.3.4'], } - self.last_mean_availability = 0.0 - self._blob_manager = None + self.last_mean_availability = Decimal(0.0) + self._blob_manager = blob_manager self._peer_finder = DummyPeerFinder() - self._dht_node = None + self._dht_node = dht_node self._check_popular = None self._check_mine = None self._get_mean_peers() diff --git a/lbrynet/core/Offer.py b/lbrynet/core/Offer.py index c236c9fa2..4528d0c2e 100644 --- a/lbrynet/core/Offer.py +++ b/lbrynet/core/Offer.py @@ -1,4 +1,4 @@ -from lbrynet.core.Error import NegotiationError +from decimal import Decimal class Offer(object): @@ -13,13 +13,11 @@ class Offer(object): def __init__(self, offer): self._state = None self.rate = None - if isinstance(offer, float): + if isinstance(offer, Decimal): self.rate = round(offer, 5) - elif offer == Offer.RATE_ACCEPTED: - self.accept() - elif offer == Offer.RATE_TOO_LOW: - self.reject() - else: + elif isinstance(offer, float): + self.rate = round(Decimal(offer), 5) + if self.rate is None or self.rate < Decimal(0.0): self.unset() @property @@ -45,12 +43,22 @@ class Offer(object): return None def accept(self): - if self._state is None or self.is_unset: + if self.is_unset or self._state is None: self._state = Offer.RATE_ACCEPTED def reject(self): - if self._state is None or self.is_unset: + if self.is_unset or self._state is None: self._state = Offer.RATE_TOO_LOW def unset(self): - self._state = Offer.RATE_UNSET \ No newline at end of file + self._state = Offer.RATE_UNSET + + def handle(self, reply_message): + if reply_message == Offer.RATE_TOO_LOW: + self.reject() + elif reply_message == Offer.RATE_ACCEPTED: + self.accept() + elif reply_message == Offer.RATE_UNSET: + self.unset() + else: + raise Exception("Unknown offer reply %s" % str(reply_message)) \ No newline at end of file diff --git a/lbrynet/core/PaymentRateManager.py b/lbrynet/core/PaymentRateManager.py index 319f1e0d0..a74ac0ebd 100644 --- a/lbrynet/core/PaymentRateManager.py +++ b/lbrynet/core/PaymentRateManager.py @@ -1,5 +1,6 @@ from lbrynet.core.Strategy import get_default_strategy + class BasePaymentRateManager(object): def __init__(self, rate): self.min_blob_data_payment_rate = rate @@ -32,7 +33,7 @@ class PaymentRateManager(object): class NegotiatedPaymentRateManager(object): - def __init__(self, base, availability_tracker): + def __init__(self, base, availability_tracker, generous=True): """ @param base: a BasePaymentRateManager @param availability_tracker: a BlobAvailabilityTracker @@ -43,15 +44,28 @@ class NegotiatedPaymentRateManager(object): self.min_blob_data_payment_rate = self.base.min_blob_data_payment_rate self.points_paid = 0.0 self.blob_tracker = availability_tracker - self.strategy = get_default_strategy(self.blob_tracker) + self.generous = generous + self.strategy = get_default_strategy(self.blob_tracker, base_price=self.min_blob_data_payment_rate, is_generous=generous) def get_rate_blob_data(self, peer, blobs): response = self.strategy.make_offer(peer, blobs) return response.rate def accept_rate_blob_data(self, peer, blobs, offer): - response = self.strategy.respond_to_offer(offer, peer, blobs) - return response.accepted + offer = self.strategy.respond_to_offer(offer, peer, blobs) + self.strategy.offer_accepted(peer, offer) + return offer.accepted + + def reply_to_offer(self, peer, blobs, offer): + reply = self.strategy.respond_to_offer(offer, peer, blobs) + self.strategy.offer_accepted(peer, reply) + return reply + + def get_rate_for_peer(self, peer): + return self.strategy.accepted_offers.get(peer, False) def record_points_paid(self, amount): self.points_paid += amount + + def record_offer_reply(self, peer, offer): + self.strategy.offer_accepted(peer, offer) \ No newline at end of file diff --git a/lbrynet/core/PriceModel.py b/lbrynet/core/PriceModel.py index f9468e85d..887440434 100644 --- a/lbrynet/core/PriceModel.py +++ b/lbrynet/core/PriceModel.py @@ -1,4 +1,5 @@ from lbrynet.conf import MIN_BLOB_DATA_PAYMENT_RATE +from decimal import Decimal def get_default_price_model(blob_tracker, **kwargs): @@ -19,14 +20,14 @@ class MeanAvailabilityWeightedPrice(object): def __init__(self, tracker, base_price=MIN_BLOB_DATA_PAYMENT_RATE, alpha=1.0): self.blob_tracker = tracker - self.base_price = base_price - self.alpha = alpha + self.base_price = Decimal(base_price) + self.alpha = Decimal(alpha) def calculate_price(self, blob): mean_availability = self.blob_tracker.last_mean_availability availability = self.blob_tracker.availability.get(blob, []) index = 0 # blob.index - price = self.base_price * (mean_availability / max(1, len(availability))) / self._frontload(index) + price = self.base_price * (mean_availability / Decimal(max(1, len(availability)))) / self._frontload(index) return round(price, 5) def _frontload(self, index): @@ -37,4 +38,4 @@ class MeanAvailabilityWeightedPrice(object): @return: frontload multipler """ - return 2.0 - (self.alpha ** index) + return Decimal(2.0) - (self.alpha ** index) diff --git a/lbrynet/core/Session.py b/lbrynet/core/Session.py index c85b7c0dd..1c62276b4 100644 --- a/lbrynet/core/Session.py +++ b/lbrynet/core/Session.py @@ -29,7 +29,7 @@ class Session(object): def __init__(self, blob_data_payment_rate, db_dir=None, lbryid=None, peer_manager=None, dht_node_port=None, known_dht_nodes=None, peer_finder=None, hash_announcer=None, blob_dir=None, blob_manager=None, peer_port=None, use_upnp=True, - rate_limiter=None, wallet=None, dht_node_class=node.Node, blob_tracker=None): + rate_limiter=None, wallet=None, dht_node_class=node.Node, blob_tracker_class=None): """ @param blob_data_payment_rate: The default payment rate for blob data @@ -88,7 +88,9 @@ class Session(object): self.blob_dir = blob_dir self.blob_manager = blob_manager - self.blob_tracker = blob_tracker + + self.blob_tracker = None + self.blob_tracker_class = blob_tracker_class or BlobAvailabilityTracker self.peer_port = peer_port @@ -261,7 +263,7 @@ class Session(object): self.blob_manager = DiskBlobManager(self.hash_announcer, self.blob_dir, self.db_dir) if self.blob_tracker is None: - self.blob_tracker = BlobAvailabilityTracker(self.blob_manager, self.peer_finder, self.dht_node) + self.blob_tracker = self.blob_tracker_class(self.blob_manager, self.peer_finder, self.dht_node) if self.payment_rate_manager is None: self.payment_rate_manager = NegotiatedPaymentRateManager(self.base_payment_rate_manager, self.blob_tracker) diff --git a/lbrynet/core/Strategy.py b/lbrynet/core/Strategy.py index 71d4ca0ea..128c0127c 100644 --- a/lbrynet/core/Strategy.py +++ b/lbrynet/core/Strategy.py @@ -1,7 +1,8 @@ import logging - +from decimal import Decimal +from lbrynet.conf import MIN_BLOB_DATA_PAYMENT_RATE from lbrynet.core.Offer import Offer -from lbrynet.core.PriceModel import get_default_price_model +from lbrynet.core.PriceModel import MeanAvailabilityWeightedPrice log = logging.getLogger(__name__) @@ -10,7 +11,90 @@ def get_default_strategy(blob_tracker, **kwargs): return BasicAvailabilityWeightedStrategy(blob_tracker, **kwargs) -class BasicAvailabilityWeightedStrategy(object): +class BaseStrategy(object): + def __init__(self, price_model, max_rate, min_rate, is_generous=True): + self.price_model = price_model + self.is_generous = is_generous + self.accepted_offers = {} + self.offers_sent = {} + self.offers_received = {} + self.max_rate = max_rate or Decimal(self.price_model.base_price * 100) + self.min_rate = Decimal(min_rate) + + def add_offer_sent(self, peer): + turn = self.offers_sent.get(peer, 0) + 1 + self.offers_sent.update({peer: turn}) + + def add_offer_received(self, peer): + turn = self.offers_received.get(peer, 0) + 1 + self.offers_received.update({peer: turn}) + + def calculate_price_target(self, *args): + return self.price_model.calculate_price(*args) + + def bounded_price(self, price): + price_for_return = Decimal(min(self.max_rate, max(price, self.min_rate))) + return price_for_return + + def make_offer(self, peer, blobs): + offer_count = self.offers_sent.get(peer, 0) + self.add_offer_sent(peer) + if peer in self.accepted_offers: + # if there was a previous accepted offer, use that + offer = self.accepted_offers[peer] + elif offer_count == 0 and self.is_generous: + # Try asking for it for free + offer = Offer(Decimal(0.0)) + else: + rates = [self.calculate_price_target(blob) for blob in blobs] + price = self._make_offer(rates, offer_count) + bounded_price = self.bounded_price(price) + offer = Offer(bounded_price) + log.debug("Offering: %s", offer.rate) + return offer + + def offer_accepted(self, peer, offer): + if not offer.accepted and peer in self.accepted_offers: + del self.accepted_offers[peer] + log.debug("Throwing out old accepted offer") + if offer.accepted: + self.accepted_offers.update({peer: offer}) + log.debug("Updated accepted offer %f", offer.rate) + + def respond_to_offer(self, offer, peer, blobs): + offer_count = self.offers_received.get(peer, 0) + self.add_offer_received(peer) + rates = [self.calculate_price_target(blob) for blob in blobs] + price = self._respond_to_offer(rates, offer_count) + bounded_price = self.bounded_price(price) + log.debug("Price target: %f", price) + + if peer in self.accepted_offers: + offer = self.accepted_offers[peer] + log.debug("Already accepted %f", offer.rate) + elif offer.rate == 0.0 and offer_count == 0 and self.is_generous: + # give blobs away for free by default on the first request + offer.accept() + self.accepted_offers.update({peer: offer}) + elif offer.rate >= bounded_price: + log.debug("Accept: %f", offer.rate) + offer.accept() + self.accepted_offers.update({peer: offer}) + else: + log.debug("Reject: %f", offer.rate) + offer.reject() + if peer in self.accepted_offers: + del self.accepted_offers[peer] + return offer + + def _make_offer(self, rates, offer_count): + return NotImplementedError() + + def _respond_to_offer(self, rates, offer_count): + return NotImplementedError() + + +class BasicAvailabilityWeightedStrategy(BaseStrategy): """ Basic strategy to target blob prices based on supply relative to mean supply @@ -18,77 +102,29 @@ class BasicAvailabilityWeightedStrategy(object): until the rate is accepted or a threshold is reached """ - def __init__(self, blob_tracker, acceleration=1.25, deceleration=0.9, max_rate=0.005, min_rate=0.0, **kwargs): - self._acceleration = acceleration # rate of how quickly to ramp offer - self._deceleration = deceleration - self._min_rate = min_rate - self._max_rate = max_rate - self._count_up = {} - self._count_down = {} - self._requested = {} - self._offers_to_peers = {} - self.model = get_default_price_model(blob_tracker, **kwargs) + def __init__(self, blob_tracker, acceleration=1.25, deceleration=0.9, max_rate=None, min_rate=0.0, + is_generous=True, base_price=MIN_BLOB_DATA_PAYMENT_RATE, alpha=1.0): + price_model = MeanAvailabilityWeightedPrice(blob_tracker, base_price=base_price, alpha=alpha) + BaseStrategy.__init__(self, price_model, max_rate, min_rate, is_generous) + self._acceleration = Decimal(acceleration) # rate of how quickly to ramp offer + self._deceleration = Decimal(deceleration) - def respond_to_offer(self, offer, peer, blobs): - request_count = self._count_up.get(peer, 0) - rates = [self._calculate_price(blob) for blob in blobs] - rate = sum(rates) / max(len(rates), 1) - discounted = self._discount(rate, request_count) - price = self._bounded_price(discounted) - log.info("Price target: %f, final: %f", discounted, price) - - self._inc_up_count(peer) - if offer.rate == 0.0 and request_count == 0: - # give blobs away for free by default on the first request - offer.accept() - return offer - elif offer.rate >= price: - log.info("Accept: %f", offer.rate) - offer.accept() - return offer - else: - log.info("Reject: %f", offer.rate) - offer.reject() - return offer - - def make_offer(self, peer, blobs): - # use mean turn-discounted price for all the blobs requested - # if there was a previous offer replied to, use the same rate if it was accepted - last_offer = self._offers_to_peers.get(peer, False) - if last_offer: - if last_offer.rate is not None and last_offer.accepted: - return last_offer - - request_count = self._count_down.get(peer, 0) - self._inc_down_count(peer) - if request_count == 0: - # Try asking for it for free - self._offers_to_peers.update({peer: Offer(0.0)}) - else: - rates = [self._calculate_price(blob) for blob in blobs] - mean_rate = sum(rates) / max(len(blobs), 1) - with_premium = self._premium(mean_rate, request_count) - price = self._bounded_price(with_premium) - self._offers_to_peers.update({peer: Offer(price)}) - return self._offers_to_peers[peer] - - def _bounded_price(self, price): - price_for_return = min(self._max_rate, max(price, self._min_rate)) - return price_for_return - - def _inc_up_count(self, peer): - turn = self._count_up.get(peer, 0) + 1 - self._count_up.update({peer: turn}) - - def _inc_down_count(self, peer): - turn = self._count_down.get(peer, 0) + 1 - self._count_down.update({peer: turn}) - - def _calculate_price(self, blob): - return self.model.calculate_price(blob) + def _get_mean_rate(self, rates): + mean_rate = Decimal(sum(rates)) / Decimal(max(len(rates), 1)) + return mean_rate def _premium(self, rate, turn): - return rate * (self._acceleration ** turn) + return rate * (self._acceleration ** Decimal(turn)) def _discount(self, rate, turn): - return rate * (self._deceleration ** turn) \ No newline at end of file + return rate * (self._deceleration ** Decimal(turn)) + + def _respond_to_offer(self, rates, offer_count): + rate = self._get_mean_rate(rates) + discounted = self._discount(rate, offer_count) + return round(discounted, 5) + + def _make_offer(self, rates, offer_count): + rate = self._get_mean_rate(rates) + with_premium = self._premium(rate, offer_count) + return round(with_premium, 5) diff --git a/lbrynet/core/client/BlobRequester.py b/lbrynet/core/client/BlobRequester.py index 74c568b57..b24cd9ac7 100644 --- a/lbrynet/core/client/BlobRequester.py +++ b/lbrynet/core/client/BlobRequester.py @@ -1,5 +1,6 @@ import logging from collections import defaultdict +from decimal import Decimal from twisted.internet import defer from twisted.python.failure import Failure @@ -17,7 +18,12 @@ log = logging.getLogger(__name__) def get_points(num_bytes, rate): - return 1.0 * num_bytes * rate / 2**20 + if isinstance(rate, float): + return 1.0 * num_bytes * rate / 2**20 + elif isinstance(rate, Decimal): + return 1.0 * num_bytes * float(rate) / 2**20 + else: + raise Exception("Unknown rate type") def cache(fn): @@ -356,18 +362,20 @@ class PriceRequest(RequestHelper): if 'blob_data_payment_rate' not in response_dict: return InvalidResponseError("response identifier not in response") assert self.protocol in self.protocol_prices - response = Offer(response_dict['blob_data_payment_rate']) rate = self.protocol_prices[self.protocol] - if response.accepted: + offer = Offer(rate) + offer.handle(response_dict['blob_data_payment_rate']) + self.payment_rate_manager.record_offer_reply(self.peer.host, offer) + + if offer.accepted: log.info("Offered rate %f/mb accepted by %s", rate, str(self.peer.host)) return True - elif response.too_low: + elif offer.too_low: log.info("Offered rate %f/mb rejected by %s", rate, str(self.peer.host)) del self.protocol_prices[self.protocol] return True else: log.warning("Price disagreement") - log.warning(rate) del self.protocol_prices[self.protocol] self.requestor._price_disagreements.append(self.peer) return False diff --git a/lbrynet/core/client/ClientProtocol.py b/lbrynet/core/client/ClientProtocol.py index 121f86099..1989f8c96 100644 --- a/lbrynet/core/client/ClientProtocol.py +++ b/lbrynet/core/client/ClientProtocol.py @@ -1,5 +1,6 @@ import json import logging +from decimal import Decimal from twisted.internet import error, defer from twisted.internet.protocol import Protocol, ClientFactory from twisted.python import failure @@ -14,6 +15,12 @@ from zope.interface import implements log = logging.getLogger(__name__) +def encode_decimal(obj): + if isinstance(obj, Decimal): + return float(obj) + raise TypeError(repr(obj) + " is not JSON serializable") + + class ClientProtocol(Protocol): implements(IRequestSender, IRateLimited) @@ -132,7 +139,7 @@ class ClientProtocol(Protocol): def _send_request_message(self, request_msg): # TODO: compare this message to the last one. If they're the same, # TODO: incrementally delay this message. - m = json.dumps(request_msg) + m = json.dumps(request_msg, default=encode_decimal) self.transport.write(m) def _get_valid_response(self, response_msg): diff --git a/lbrynet/core/server/BlobRequestHandler.py b/lbrynet/core/server/BlobRequestHandler.py index 138a0695a..0b46eea49 100644 --- a/lbrynet/core/server/BlobRequestHandler.py +++ b/lbrynet/core/server/BlobRequestHandler.py @@ -61,53 +61,20 @@ class BlobRequestHandler(object): def handle_queries(self, queries): response = defer.succeed({}) + log.debug("Handle query: %s", str(queries)) if self.AVAILABILITY_QUERY in queries: self._blobs_requested = queries[self.AVAILABILITY_QUERY] response.addCallback(lambda r: self._reply_to_availability(r, self._blobs_requested)) - if self.PAYMENT_RATE_QUERY in queries: - offer = Offer(queries[self.PAYMENT_RATE_QUERY]) + offered_rate = queries[self.PAYMENT_RATE_QUERY] + offer = Offer(offered_rate) response.addCallback(lambda r: self._handle_payment_rate_query(offer, r)) - if self.BLOB_QUERY in queries: - if self.PAYMENT_RATE_QUERY in queries: - incoming = queries[self.BLOB_QUERY] - response.addCallback(lambda r: self._reply_to_send_request(r, incoming)) - else: - response.addCallback(lambda _: {'incoming_blob': {'error': 'RATE_UNSET'}}) - + incoming = queries[self.BLOB_QUERY] + response.addCallback(lambda r: self._reply_to_send_request(r, incoming)) return response - def _handle_payment_rate_query(self, offer, request): - blobs = request.get("available_blobs", []) - log.info("Offered rate %f/mb for %i blobs", offer.rate, len(blobs)) - accepted = self.payment_rate_manager.accept_rate_blob_data(self.peer, blobs, offer) - if offer.accepted: - self.blob_data_payment_rate = offer.rate - request[self.PAYMENT_RATE_QUERY] = "RATE_ACCEPTED" - elif offer.too_low: - request[self.PAYMENT_RATE_QUERY] = "RATE_TOO_LOW" - offer.unset() - elif offer.is_unset: - request['incoming_blob'] = {'error': 'RATE_UNSET'} - return request - - def _handle_blob_query(self, response, query): - log.debug("Received the client's request to send a blob") - response['incoming_blob'] = {} - - if self.blob_data_payment_rate is None: - response['incoming_blob'] = {'error': "RATE_UNSET"} - return response - else: - return self._send_blob(response, query) - - def _send_blob(self, response, query): - d = self.blob_manager.get_blob(query, True) - d.addCallback(self.open_blob_for_reading, response) - return d - def open_blob_for_reading(self, blob, response): def failure(msg): log.debug("We can not send %s: %s", blob, msg) @@ -153,6 +120,39 @@ class BlobRequestHandler(object): d.addCallback(set_available) return d + def _handle_payment_rate_query(self, offer, request): + blobs = self._blobs_requested + log.info("Offered rate %f LBC/mb for %i blobs", offer.rate, len(blobs)) + reply = self.payment_rate_manager.reply_to_offer(self.peer, blobs, offer) + if reply.accepted: + self.blob_data_payment_rate = offer.rate + request[self.PAYMENT_RATE_QUERY] = "RATE_ACCEPTED" + log.info("Accepted rate: %f", offer.rate) + elif reply.too_low: + request[self.PAYMENT_RATE_QUERY] = "RATE_TOO_LOW" + log.info("Reject rate: %f", offer.rate) + elif reply.is_unset: + log.warning("Rate unset") + request['incoming_blob'] = {'error': 'RATE_UNSET'} + log.debug("Returning rate query result: %s", str(request)) + + return request + + def _handle_blob_query(self, response, query): + log.debug("Received the client's request to send a blob") + response['incoming_blob'] = {} + + if self.blob_data_payment_rate is None: + response['incoming_blob'] = {'error': "RATE_UNSET"} + return response + else: + return self._send_blob(response, query) + + def _send_blob(self, response, query): + d = self.blob_manager.get_blob(query, True) + d.addCallback(self.open_blob_for_reading, response) + return d + def open_blob_for_reading(self, blob, response): response_fields = {} d = defer.succeed(None) @@ -161,7 +161,7 @@ class BlobRequestHandler(object): if read_handle is not None: self.currently_uploading = blob self.read_handle = read_handle - log.debug("Sending %s to client", str(blob)) + log.info("Sending %s to client", str(blob)) response_fields['blob_hash'] = blob.blob_hash response_fields['length'] = blob.length response['incoming_blob'] = response_fields @@ -180,7 +180,6 @@ class BlobRequestHandler(object): def _reply_to_send_request(self, response, incoming): response_fields = {} response['incoming_blob'] = response_fields - rate = self.blob_data_payment_rate if self.blob_data_payment_rate is None: log.debug("Rate not set yet") diff --git a/lbrynet/lbrylive/client/LiveStreamDownloader.py b/lbrynet/lbrylive/client/LiveStreamDownloader.py index 1ce9a413e..8b6befb54 100644 --- a/lbrynet/lbrylive/client/LiveStreamDownloader.py +++ b/lbrynet/lbrylive/client/LiveStreamDownloader.py @@ -144,7 +144,7 @@ class FullLiveStreamDownloaderFactory(object): def make_downloader(self, metadata, options, payment_rate_manager): # TODO: check options for payment rate manager parameters - payment_rate_manager = LiveStreamPaymentRateManager(self.default_payment_rate_manager, + prm = LiveStreamPaymentRateManager(self.default_payment_rate_manager, payment_rate_manager) def save_source_if_blob(stream_hash): @@ -161,7 +161,7 @@ class FullLiveStreamDownloaderFactory(object): def create_downloader(stream_hash): stream_downloader = FullLiveStreamDownloader(stream_hash, self.peer_finder, self.rate_limiter, self.blob_manager, self.stream_info_manager, - payment_rate_manager, self.wallet, True) + prm, self.wallet, True) # TODO: change upload_allowed=True above to something better d = stream_downloader.set_stream_info() d.addCallback(lambda _: stream_downloader) diff --git a/tests/functional/test_misc.py b/tests/functional/test_misc.py index 2dcba52ca..2b78fa4c8 100644 --- a/tests/functional/test_misc.py +++ b/tests/functional/test_misc.py @@ -234,14 +234,14 @@ def start_lbry_uploader(sd_hash_queue, kill_event, dead_event, file_size, ul_rat hash_announcer = FakeAnnouncer() rate_limiter = RateLimiter() sd_identifier = StreamDescriptorIdentifier() - blob_tracker = DummyBlobAvailabilityTracker() + db_dir = "server" os.mkdir(db_dir) session = Session(MIN_BLOB_DATA_PAYMENT_RATE, db_dir=db_dir, lbryid="abcd", peer_finder=peer_finder, hash_announcer=hash_announcer, peer_port=5553, - use_upnp=False, rate_limiter=rate_limiter, wallet=wallet, blob_tracker=blob_tracker, + use_upnp=False, rate_limiter=rate_limiter, wallet=wallet, blob_tracker_class=DummyBlobAvailabilityTracker, dht_node_class=Node) stream_info_manager = TempEncryptedFileMetadataManager() @@ -349,7 +349,6 @@ def start_lbry_reuploader(sd_hash, kill_event, dead_event, ready_event, n, ul_ra hash_announcer = FakeAnnouncer() rate_limiter = RateLimiter() sd_identifier = StreamDescriptorIdentifier() - blob_tracker = DummyBlobAvailabilityTracker() db_dir = "server_" + str(n) blob_dir = os.path.join(db_dir, "blobfiles") @@ -359,7 +358,7 @@ def start_lbry_reuploader(sd_hash, kill_event, dead_event, ready_event, n, ul_ra session = Session(MIN_BLOB_DATA_PAYMENT_RATE, db_dir=db_dir, lbryid="abcd" + str(n), peer_finder=peer_finder, hash_announcer=hash_announcer, blob_dir=None, peer_port=peer_port, - use_upnp=False, rate_limiter=rate_limiter, wallet=wallet, blob_tracker=blob_tracker) + use_upnp=False, rate_limiter=rate_limiter, wallet=wallet, blob_tracker_class=DummyBlobAvailabilityTracker) stream_info_manager = TempEncryptedFileMetadataManager() @@ -463,14 +462,14 @@ def start_live_server(sd_hash_queue, kill_event, dead_event): hash_announcer = FakeAnnouncer() rate_limiter = DummyRateLimiter() sd_identifier = StreamDescriptorIdentifier() - blob_tracker = DummyBlobAvailabilityTracker() + db_dir = "server" os.mkdir(db_dir) session = Session(MIN_BLOB_DATA_PAYMENT_RATE, db_dir=db_dir, lbryid="abcd", peer_finder=peer_finder, hash_announcer=hash_announcer, peer_port=5553, - use_upnp=False, rate_limiter=rate_limiter, wallet=wallet, blob_tracker=blob_tracker) + use_upnp=False, rate_limiter=rate_limiter, wallet=wallet, blob_tracker_class=DummyBlobAvailabilityTracker) base_payment_rate_manager = BaseLiveStreamPaymentRateManager(MIN_BLOB_INFO_PAYMENT_RATE) data_payment_rate_manager = session.payment_rate_manager @@ -600,7 +599,6 @@ def start_blob_uploader(blob_hash_queue, kill_event, dead_event, slow): peer_finder = FakePeerFinder(5553, peer_manager, 1) hash_announcer = FakeAnnouncer() rate_limiter = RateLimiter() - blob_tracker = DummyBlobAvailabilityTracker() if slow is True: peer_port = 5553 @@ -615,7 +613,7 @@ def start_blob_uploader(blob_hash_queue, kill_event, dead_event, slow): session = Session(MIN_BLOB_DATA_PAYMENT_RATE, db_dir=db_dir, lbryid="efgh", peer_finder=peer_finder, hash_announcer=hash_announcer, blob_dir=blob_dir, peer_port=peer_port, - use_upnp=False, rate_limiter=rate_limiter, wallet=wallet, blob_tracker=blob_tracker) + use_upnp=False, rate_limiter=rate_limiter, wallet=wallet, blob_tracker_class=DummyBlobAvailabilityTracker) if slow is True: session.rate_limiter.set_ul_limit(2**11) @@ -779,7 +777,7 @@ class TestTransfer(TestCase): hash_announcer = FakeAnnouncer() rate_limiter = DummyRateLimiter() sd_identifier = StreamDescriptorIdentifier() - blob_tracker = DummyBlobAvailabilityTracker() + db_dir = "client" blob_dir = os.path.join(db_dir, "blobfiles") @@ -789,7 +787,7 @@ class TestTransfer(TestCase): self.session = Session(MIN_BLOB_DATA_PAYMENT_RATE, db_dir=db_dir, lbryid="abcd", peer_finder=peer_finder, hash_announcer=hash_announcer, blob_dir=blob_dir, peer_port=5553, - use_upnp=False, rate_limiter=rate_limiter, wallet=wallet, blob_tracker=blob_tracker, + use_upnp=False, rate_limiter=rate_limiter, wallet=wallet, blob_tracker_class=DummyBlobAvailabilityTracker, dht_node_class=Node) self.stream_info_manager = TempEncryptedFileMetadataManager() @@ -868,7 +866,7 @@ class TestTransfer(TestCase): hash_announcer = FakeAnnouncer() rate_limiter = DummyRateLimiter() sd_identifier = StreamDescriptorIdentifier() - blob_tracker = DummyBlobAvailabilityTracker() + db_dir = "client" os.mkdir(db_dir) @@ -876,7 +874,7 @@ class TestTransfer(TestCase): self.session = Session(MIN_BLOB_DATA_PAYMENT_RATE, db_dir=db_dir, lbryid="abcd", peer_finder=peer_finder, hash_announcer=hash_announcer, blob_dir=None, peer_port=5553, use_upnp=False, rate_limiter=rate_limiter, wallet=wallet, - blob_tracker=blob_tracker, dht_node_class=Node) + blob_tracker_class=DummyBlobAvailabilityTracker, dht_node_class=Node) self.stream_info_manager = TempLiveStreamMetadataManager(hash_announcer) @@ -973,7 +971,7 @@ class TestTransfer(TestCase): peer_finder = FakePeerFinder(5553, peer_manager, 2) hash_announcer = FakeAnnouncer() rate_limiter = DummyRateLimiter() - blob_tracker = DummyBlobAvailabilityTracker() + db_dir = "client" blob_dir = os.path.join(db_dir, "blobfiles") @@ -983,7 +981,7 @@ class TestTransfer(TestCase): self.session = Session(MIN_BLOB_DATA_PAYMENT_RATE, db_dir=db_dir, lbryid="abcd", peer_finder=peer_finder, hash_announcer=hash_announcer, blob_dir=blob_dir, peer_port=5553, - use_upnp=False, rate_limiter=rate_limiter, wallet=wallet, blob_tracker=blob_tracker) + use_upnp=False, rate_limiter=rate_limiter, wallet=wallet, blob_tracker_class=DummyBlobAvailabilityTracker) d1 = self.wait_for_hash_from_queue(blob_hash_queue_1) d2 = self.wait_for_hash_from_queue(blob_hash_queue_2) @@ -1052,7 +1050,7 @@ class TestTransfer(TestCase): hash_announcer = FakeAnnouncer() rate_limiter = DummyRateLimiter() sd_identifier = StreamDescriptorIdentifier() - blob_tracker = DummyBlobAvailabilityTracker() + downloaders = [] @@ -1064,7 +1062,7 @@ class TestTransfer(TestCase): self.session = Session(MIN_BLOB_DATA_PAYMENT_RATE, db_dir=db_dir, lbryid="abcd", peer_finder=peer_finder, hash_announcer=hash_announcer, blob_dir=blob_dir, peer_port=5553, use_upnp=False, - rate_limiter=rate_limiter, wallet=wallet, blob_tracker=blob_tracker) + rate_limiter=rate_limiter, wallet=wallet, blob_tracker_class=DummyBlobAvailabilityTracker) self.stream_info_manager = DBEncryptedFileMetadataManager(self.session.db_dir) @@ -1171,7 +1169,7 @@ class TestTransfer(TestCase): hash_announcer = FakeAnnouncer() rate_limiter = DummyRateLimiter() sd_identifier = StreamDescriptorIdentifier() - blob_tracker = DummyBlobAvailabilityTracker() + db_dir = "client" blob_dir = os.path.join(db_dir, "blobfiles") @@ -1181,7 +1179,7 @@ class TestTransfer(TestCase): self.session = Session(MIN_BLOB_DATA_PAYMENT_RATE, db_dir=db_dir, lbryid="abcd", peer_finder=peer_finder, hash_announcer=hash_announcer, blob_dir=None, peer_port=5553, - use_upnp=False, rate_limiter=rate_limiter, wallet=wallet, blob_tracker=blob_tracker) + use_upnp=False, rate_limiter=rate_limiter, wallet=wallet, blob_tracker_class=DummyBlobAvailabilityTracker) self.stream_info_manager = TempEncryptedFileMetadataManager() @@ -1290,7 +1288,7 @@ class TestStreamify(TestCase): hash_announcer = FakeAnnouncer() rate_limiter = DummyRateLimiter() sd_identifier = StreamDescriptorIdentifier() - blob_tracker = DummyBlobAvailabilityTracker() + db_dir = "client" blob_dir = os.path.join(db_dir, "blobfiles") @@ -1300,7 +1298,7 @@ class TestStreamify(TestCase): self.session = Session(MIN_BLOB_DATA_PAYMENT_RATE, db_dir=db_dir, lbryid="abcd", peer_finder=peer_finder, hash_announcer=hash_announcer, blob_dir=blob_dir, peer_port=5553, - use_upnp=False, rate_limiter=rate_limiter, wallet=wallet, blob_tracker=blob_tracker) + use_upnp=False, rate_limiter=rate_limiter, wallet=wallet, blob_tracker_class=DummyBlobAvailabilityTracker) self.stream_info_manager = TempEncryptedFileMetadataManager() @@ -1343,7 +1341,7 @@ class TestStreamify(TestCase): hash_announcer = FakeAnnouncer() rate_limiter = DummyRateLimiter() sd_identifier = StreamDescriptorIdentifier() - blob_tracker = DummyBlobAvailabilityTracker() + db_dir = "client" blob_dir = os.path.join(db_dir, "blobfiles") @@ -1353,7 +1351,7 @@ class TestStreamify(TestCase): self.session = Session(MIN_BLOB_DATA_PAYMENT_RATE, db_dir=db_dir, lbryid="abcd", peer_finder=peer_finder, hash_announcer=hash_announcer, blob_dir=blob_dir, peer_port=5553, - use_upnp=False, rate_limiter=rate_limiter, wallet=wallet, blob_tracker=blob_tracker) + use_upnp=False, rate_limiter=rate_limiter, wallet=wallet, blob_tracker_class=DummyBlobAvailabilityTracker) self.stream_info_manager = DBEncryptedFileMetadataManager(self.session.db_dir) diff --git a/tests/functional/test_reflector.py b/tests/functional/test_reflector.py index 8c26fb364..eaaf150fd 100644 --- a/tests/functional/test_reflector.py +++ b/tests/functional/test_reflector.py @@ -61,7 +61,6 @@ class TestReflector(unittest.TestCase): hash_announcer = mocks.Announcer() rate_limiter = RateLimiter.DummyRateLimiter() sd_identifier = StreamDescriptor.StreamDescriptorIdentifier() - blob_tracker = BlobAvailability.DummyBlobAvailabilityTracker() self.expected_blobs = [ ( @@ -95,7 +94,7 @@ class TestReflector(unittest.TestCase): use_upnp=False, rate_limiter=rate_limiter, wallet=wallet, - blob_tracker=blob_tracker, + blob_tracker_class=BlobAvailability.DummyBlobAvailabilityTracker, dht_node_class=Node ) diff --git a/tests/unit/core/test_Strategy.py b/tests/unit/core/test_Strategy.py index a443c36e9..ac98199c5 100644 --- a/tests/unit/core/test_Strategy.py +++ b/tests/unit/core/test_Strategy.py @@ -1,8 +1,15 @@ from twisted.trial import unittest import random +import mock +from lbrynet.core.PaymentRateManager import NegotiatedPaymentRateManager, BasePaymentRateManager from lbrynet.core.Strategy import BasicAvailabilityWeightedStrategy +from lbrynet.core.Offer import Offer from lbrynet.core.BlobAvailability import DummyBlobAvailabilityTracker + +MAX_NEGOTIATION_TURNS = 10 + + def get_random_sample(list_to_sample): result = list_to_sample[random.randint(1, len(list_to_sample)):random.randint(1, len(list_to_sample))] if not result: @@ -11,25 +18,33 @@ def get_random_sample(list_to_sample): class AvailabilityWeightedStrategyTests(unittest.TestCase): - def test_first_offer_is_zero_and_second_isnt(self): + def test_first_offer_is_zero_and_second_is_not_if_offer_not_accepted(self): strategy = BasicAvailabilityWeightedStrategy(DummyBlobAvailabilityTracker()) peer = "1.1.1.1" - blobs = strategy.model.blob_tracker.availability.keys() + + blobs = strategy.price_model.blob_tracker.availability.keys() offer1 = strategy.make_offer(peer, blobs) + offer2 = strategy.make_offer(peer, blobs) + self.assertEquals(offer1.rate, 0.0) self.assertNotEqual(offer2.rate, 0.0) - def test_accept_zero_and_persist(self): + def test_accept_zero_and_persist_if_accepted(self): host_strategy = BasicAvailabilityWeightedStrategy(DummyBlobAvailabilityTracker()) + client_strategy = BasicAvailabilityWeightedStrategy(DummyBlobAvailabilityTracker()) + client = "1.1.1.1" host = "1.1.1.2" - blobs = host_strategy.model.blob_tracker.availability.keys() - client_strategy = BasicAvailabilityWeightedStrategy(DummyBlobAvailabilityTracker()) + blobs = host_strategy.price_model.blob_tracker.availability.keys() + offer = client_strategy.make_offer(host, blobs) response1 = host_strategy.respond_to_offer(offer, client, blobs) + client_strategy.offer_accepted(host, response1) + offer = client_strategy.make_offer(host, blobs) response2 = host_strategy.respond_to_offer(offer, client, blobs) + client_strategy.offer_accepted(host, response2) self.assertEquals(response1.too_low, False) self.assertEquals(response1.accepted, True) @@ -39,7 +54,7 @@ class AvailabilityWeightedStrategyTests(unittest.TestCase): self.assertEquals(response2.accepted, True) self.assertEquals(response2.rate, 0.0) - def test_turns_before_accept_with_similar_rate_settings(self): + def test_how_many_turns_before_accept_with_similar_rate_settings(self): blobs = [ 'b2e48bb4c88cf46b76adf0d47a72389fae0cd1f19ed27dc509138c99509a25423a4cef788d571dca7988e1dca69e6fa0', 'd7c82e6cac093b3f16107d2ae2b2c75424f1fcad2c7fbdbe66e4a13c0b6bd27b67b3a29c403b82279ab0f7c1c48d6787', @@ -52,19 +67,143 @@ class AvailabilityWeightedStrategyTests(unittest.TestCase): '8c70d5e2f5c3a6085006198e5192d157a125d92e7378794472007a61947992768926513fc10924785bdb1761df3c37e6', 'c84aa1fd8f5009f7c4e71e444e40d95610abc1480834f835eefb267287aeb10025880a3ce22580db8c6d92efb5bc0c9c' ] - for x in range(10): - client_base = 0.001 * x - for y in range(10): - host_base = 0.001 * y - client_strat = BasicAvailabilityWeightedStrategy(DummyBlobAvailabilityTracker(), base_price=client_base) - host_strat = BasicAvailabilityWeightedStrategy(DummyBlobAvailabilityTracker(), base_price=host_base) - for z in range(100): - blobs_to_query = get_random_sample(blobs) - accepted = False - turns = 0 - while not accepted: - offer = client_strat.make_offer("2.3.4.5", blobs_to_query) - response = host_strat.respond_to_offer(offer, "3.4.5.6", blobs_to_query) - accepted = response.accepted - turns += 1 - self.assertGreater(5, turns) + + host = mock.Mock() + host.host = "1.2.3.4" + client = mock.Mock() + client.host = "1.2.3.5" + + for x in range(1, 10): + for y in range(1, 10): + host_base = 0.0001 * y + client_base = 0.0001 * x + client_base_prm = BasePaymentRateManager(client_base) + client_prm = NegotiatedPaymentRateManager(client_base_prm, DummyBlobAvailabilityTracker(), generous=False) + host_base_prm = BasePaymentRateManager(host_base) + host_prm = NegotiatedPaymentRateManager(host_base_prm, DummyBlobAvailabilityTracker(), generous=False) + + blobs_to_query = get_random_sample(blobs) + accepted = False + turns = 0 + while not accepted: + rate = client_prm.get_rate_blob_data(host, blobs_to_query) + offer = Offer(rate) + accepted = host_prm.accept_rate_blob_data(client, blobs_to_query, offer) + turns += 1 + self.assertGreater(MAX_NEGOTIATION_TURNS, turns) + + def test_generous_connects_in_one_turn(self): + blobs = [ + 'b2e48bb4c88cf46b76adf0d47a72389fae0cd1f19ed27dc509138c99509a25423a4cef788d571dca7988e1dca69e6fa0', + 'd7c82e6cac093b3f16107d2ae2b2c75424f1fcad2c7fbdbe66e4a13c0b6bd27b67b3a29c403b82279ab0f7c1c48d6787', + '5a450b416275da4bdff604ee7b58eaedc7913c5005b7184fc3bc5ef0b1add00613587f54217c91097fc039ed9eace9dd', + 'f99d24cd50d4bfd77c2598bfbeeb8415bf0feef21200bdf0b8fbbde7751a77b7a2c68e09c25465a2f40fba8eecb0b4e0', + '9dbda74a472a2e5861a5d18197aeba0f5de67c67e401124c243d2f0f41edf01d7a26aeb0b5fc9bf47f6361e0f0968e2c', + '91dc64cf1ff42e20d627b033ad5e4c3a4a96856ed8a6e3fb4cd5fa1cfba4bf72eefd325f579db92f45f4355550ace8e7', + '6d8017aba362e5c5d0046625a039513419810a0397d728318c328a5cc5d96efb589fbca0728e54fe5adbf87e9545ee07', + '6af95cd062b4a179576997ef1054c9d2120f8592eea045e9667bea411d520262cd5a47b137eabb7a7871f5f8a79c92dd', + '8c70d5e2f5c3a6085006198e5192d157a125d92e7378794472007a61947992768926513fc10924785bdb1761df3c37e6', + 'c84aa1fd8f5009f7c4e71e444e40d95610abc1480834f835eefb267287aeb10025880a3ce22580db8c6d92efb5bc0c9c' + ] + + host = mock.Mock() + host.host = "1.2.3.4" + client = mock.Mock() + client.host = "1.2.3.5" + + for x in range(1, 10): + for y in range(1, 10): + host_base = 0.0001 * y + client_base = 0.0001 * x + client_base_prm = BasePaymentRateManager(client_base) + client_prm = NegotiatedPaymentRateManager(client_base_prm, DummyBlobAvailabilityTracker()) + host_base_prm = BasePaymentRateManager(host_base) + host_prm = NegotiatedPaymentRateManager(host_base_prm, DummyBlobAvailabilityTracker()) + + blobs_to_query = get_random_sample(blobs) + accepted = False + turns = 0 + while not accepted: + rate = client_prm.get_rate_blob_data(host, blobs_to_query) + offer = Offer(rate) + accepted = host_prm.accept_rate_blob_data(client, blobs_to_query, offer) + turns += 1 + self.assertEqual(1, turns) + + + def test_how_many_turns_with_generous_client(self): + blobs = [ + 'b2e48bb4c88cf46b76adf0d47a72389fae0cd1f19ed27dc509138c99509a25423a4cef788d571dca7988e1dca69e6fa0', + 'd7c82e6cac093b3f16107d2ae2b2c75424f1fcad2c7fbdbe66e4a13c0b6bd27b67b3a29c403b82279ab0f7c1c48d6787', + '5a450b416275da4bdff604ee7b58eaedc7913c5005b7184fc3bc5ef0b1add00613587f54217c91097fc039ed9eace9dd', + 'f99d24cd50d4bfd77c2598bfbeeb8415bf0feef21200bdf0b8fbbde7751a77b7a2c68e09c25465a2f40fba8eecb0b4e0', + '9dbda74a472a2e5861a5d18197aeba0f5de67c67e401124c243d2f0f41edf01d7a26aeb0b5fc9bf47f6361e0f0968e2c', + '91dc64cf1ff42e20d627b033ad5e4c3a4a96856ed8a6e3fb4cd5fa1cfba4bf72eefd325f579db92f45f4355550ace8e7', + '6d8017aba362e5c5d0046625a039513419810a0397d728318c328a5cc5d96efb589fbca0728e54fe5adbf87e9545ee07', + '6af95cd062b4a179576997ef1054c9d2120f8592eea045e9667bea411d520262cd5a47b137eabb7a7871f5f8a79c92dd', + '8c70d5e2f5c3a6085006198e5192d157a125d92e7378794472007a61947992768926513fc10924785bdb1761df3c37e6', + 'c84aa1fd8f5009f7c4e71e444e40d95610abc1480834f835eefb267287aeb10025880a3ce22580db8c6d92efb5bc0c9c' + ] + + host = mock.Mock() + host.host = "1.2.3.4" + client = mock.Mock() + client.host = "1.2.3.5" + + for x in range(1, 10): + for y in range(1, 10): + host_base = 0.0001 * y + client_base = 0.0001 * x + client_base_prm = BasePaymentRateManager(client_base) + client_prm = NegotiatedPaymentRateManager(client_base_prm, DummyBlobAvailabilityTracker()) + host_base_prm = BasePaymentRateManager(host_base) + host_prm = NegotiatedPaymentRateManager(host_base_prm, DummyBlobAvailabilityTracker(), generous=False) + + blobs_to_query = get_random_sample(blobs) + accepted = False + turns = 0 + while not accepted: + rate = client_prm.get_rate_blob_data(host, blobs_to_query) + offer = Offer(rate) + accepted = host_prm.accept_rate_blob_data(client, blobs_to_query, offer) + turns += 1 + self.assertGreater(MAX_NEGOTIATION_TURNS, turns) + + + def test_how_many_turns_with_generous_host(self): + blobs = [ + 'b2e48bb4c88cf46b76adf0d47a72389fae0cd1f19ed27dc509138c99509a25423a4cef788d571dca7988e1dca69e6fa0', + 'd7c82e6cac093b3f16107d2ae2b2c75424f1fcad2c7fbdbe66e4a13c0b6bd27b67b3a29c403b82279ab0f7c1c48d6787', + '5a450b416275da4bdff604ee7b58eaedc7913c5005b7184fc3bc5ef0b1add00613587f54217c91097fc039ed9eace9dd', + 'f99d24cd50d4bfd77c2598bfbeeb8415bf0feef21200bdf0b8fbbde7751a77b7a2c68e09c25465a2f40fba8eecb0b4e0', + '9dbda74a472a2e5861a5d18197aeba0f5de67c67e401124c243d2f0f41edf01d7a26aeb0b5fc9bf47f6361e0f0968e2c', + '91dc64cf1ff42e20d627b033ad5e4c3a4a96856ed8a6e3fb4cd5fa1cfba4bf72eefd325f579db92f45f4355550ace8e7', + '6d8017aba362e5c5d0046625a039513419810a0397d728318c328a5cc5d96efb589fbca0728e54fe5adbf87e9545ee07', + '6af95cd062b4a179576997ef1054c9d2120f8592eea045e9667bea411d520262cd5a47b137eabb7a7871f5f8a79c92dd', + '8c70d5e2f5c3a6085006198e5192d157a125d92e7378794472007a61947992768926513fc10924785bdb1761df3c37e6', + 'c84aa1fd8f5009f7c4e71e444e40d95610abc1480834f835eefb267287aeb10025880a3ce22580db8c6d92efb5bc0c9c' + ] + + host = mock.Mock() + host.host = "1.2.3.4" + client = mock.Mock() + client.host = "1.2.3.5" + + for x in range(1, 10): + for y in range(1, 10): + host_base = 0.0001 * y + client_base = 0.0001 * x + client_base_prm = BasePaymentRateManager(client_base) + client_prm = NegotiatedPaymentRateManager(client_base_prm, DummyBlobAvailabilityTracker(), generous=False) + host_base_prm = BasePaymentRateManager(host_base) + host_prm = NegotiatedPaymentRateManager(host_base_prm, DummyBlobAvailabilityTracker()) + + blobs_to_query = get_random_sample(blobs) + accepted = False + turns = 0 + while not accepted: + rate = client_prm.get_rate_blob_data(host, blobs_to_query) + offer = Offer(rate) + accepted = host_prm.accept_rate_blob_data(client, blobs_to_query, offer) + turns += 1 + self.assertGreater(MAX_NEGOTIATION_TURNS, turns)