fix functional tests, add a few unit tests,

-add ‘generous’ parameter for NegotiatedPaymentRateManager to turn free
hosting on/off, by default set to true.
This commit is contained in:
Jack 2016-10-05 22:58:34 -04:00
parent dc2f0adb3e
commit 4f60a98eb2
14 changed files with 406 additions and 194 deletions

View file

@ -14,7 +14,7 @@ MAX_BLOB_INFOS_TO_REQUEST = 20
BLOBFILES_DIR = ".blobfiles" BLOBFILES_DIR = ".blobfiles"
BLOB_SIZE = 2**21 BLOB_SIZE = 2**21
MIN_BLOB_DATA_PAYMENT_RATE = .001 # points/megabyte MIN_BLOB_DATA_PAYMENT_RATE = .0001 # points/megabyte
MIN_BLOB_INFO_PAYMENT_RATE = .02 # points/1000 infos MIN_BLOB_INFO_PAYMENT_RATE = .02 # points/1000 infos
MIN_VALUABLE_BLOB_INFO_PAYMENT_RATE = .05 # points/1000 infos MIN_VALUABLE_BLOB_INFO_PAYMENT_RATE = .05 # points/1000 infos
MIN_VALUABLE_BLOB_HASH_PAYMENT_RATE = .05 # points/1000 infos MIN_VALUABLE_BLOB_HASH_PAYMENT_RATE = .05 # points/1000 infos

View file

@ -3,6 +3,7 @@ import logging
from twisted.internet import defer from twisted.internet import defer
from twisted.internet.task import LoopingCall from twisted.internet.task import LoopingCall
from lbrynet.core.PeerFinder import DummyPeerFinder from lbrynet.core.PeerFinder import DummyPeerFinder
from decimal import Decimal
log = logging.getLogger(__name__) log = logging.getLogger(__name__)
@ -17,7 +18,7 @@ class BlobAvailabilityTracker(object):
def __init__(self, blob_manager, peer_finder, dht_node): def __init__(self, blob_manager, peer_finder, dht_node):
self.availability = {} self.availability = {}
self.last_mean_availability = 0.0 self.last_mean_availability = Decimal(0.0)
self._blob_manager = blob_manager self._blob_manager = blob_manager
self._peer_finder = peer_finder self._peer_finder = peer_finder
self._dht_node = dht_node self._dht_node = dht_node
@ -85,7 +86,7 @@ class BlobAvailabilityTracker(object):
def _get_mean_peers(self): def _get_mean_peers(self):
num_peers = [len(self.availability[blob]) for blob in self.availability] num_peers = [len(self.availability[blob]) for blob in self.availability]
mean = float(sum(num_peers)) / float(max(1, len(num_peers))) mean = Decimal(sum(num_peers)) / Decimal(max(1, len(num_peers)))
self.last_mean_availability = mean self.last_mean_availability = mean
@ -97,7 +98,7 @@ class DummyBlobAvailabilityTracker(BlobAvailabilityTracker):
availability (dict): dictionary of peers for known blobs availability (dict): dictionary of peers for known blobs
""" """
def __init__(self): def __init__(self, blob_manager=None, peer_finder=None, dht_node=None):
self.availability = { self.availability = {
'91dc64cf1ff42e20d627b033ad5e4c3a4a96856ed8a6e3fb4cd5fa1cfba4bf72eefd325f579db92f45f4355550ace8e7': ['1.2.3.4'], '91dc64cf1ff42e20d627b033ad5e4c3a4a96856ed8a6e3fb4cd5fa1cfba4bf72eefd325f579db92f45f4355550ace8e7': ['1.2.3.4'],
'b2e48bb4c88cf46b76adf0d47a72389fae0cd1f19ed27dc509138c99509a25423a4cef788d571dca7988e1dca69e6fa0': ['1.2.3.4', '1.2.3.4'], 'b2e48bb4c88cf46b76adf0d47a72389fae0cd1f19ed27dc509138c99509a25423a4cef788d571dca7988e1dca69e6fa0': ['1.2.3.4', '1.2.3.4'],
@ -110,10 +111,10 @@ class DummyBlobAvailabilityTracker(BlobAvailabilityTracker):
'f99d24cd50d4bfd77c2598bfbeeb8415bf0feef21200bdf0b8fbbde7751a77b7a2c68e09c25465a2f40fba8eecb0b4e0': ['1.2.3.4', '1.2.3.4', '1.2.3.4', '1.2.3.4', '1.2.3.4', '1.2.3.4', '1.2.3.4', '1.2.3.4', '1.2.3.4'], 'f99d24cd50d4bfd77c2598bfbeeb8415bf0feef21200bdf0b8fbbde7751a77b7a2c68e09c25465a2f40fba8eecb0b4e0': ['1.2.3.4', '1.2.3.4', '1.2.3.4', '1.2.3.4', '1.2.3.4', '1.2.3.4', '1.2.3.4', '1.2.3.4', '1.2.3.4'],
'c84aa1fd8f5009f7c4e71e444e40d95610abc1480834f835eefb267287aeb10025880a3ce22580db8c6d92efb5bc0c9c': ['1.2.3.4', '1.2.3.4', '1.2.3.4', '1.2.3.4', '1.2.3.4', '1.2.3.4', '1.2.3.4', '1.2.3.4', '1.2.3.4', '1.2.3.4'], 'c84aa1fd8f5009f7c4e71e444e40d95610abc1480834f835eefb267287aeb10025880a3ce22580db8c6d92efb5bc0c9c': ['1.2.3.4', '1.2.3.4', '1.2.3.4', '1.2.3.4', '1.2.3.4', '1.2.3.4', '1.2.3.4', '1.2.3.4', '1.2.3.4', '1.2.3.4'],
} }
self.last_mean_availability = 0.0 self.last_mean_availability = Decimal(0.0)
self._blob_manager = None self._blob_manager = blob_manager
self._peer_finder = DummyPeerFinder() self._peer_finder = DummyPeerFinder()
self._dht_node = None self._dht_node = dht_node
self._check_popular = None self._check_popular = None
self._check_mine = None self._check_mine = None
self._get_mean_peers() self._get_mean_peers()

View file

@ -1,4 +1,4 @@
from lbrynet.core.Error import NegotiationError from decimal import Decimal
class Offer(object): class Offer(object):
@ -13,13 +13,11 @@ class Offer(object):
def __init__(self, offer): def __init__(self, offer):
self._state = None self._state = None
self.rate = None self.rate = None
if isinstance(offer, float): if isinstance(offer, Decimal):
self.rate = round(offer, 5) self.rate = round(offer, 5)
elif offer == Offer.RATE_ACCEPTED: elif isinstance(offer, float):
self.accept() self.rate = round(Decimal(offer), 5)
elif offer == Offer.RATE_TOO_LOW: if self.rate is None or self.rate < Decimal(0.0):
self.reject()
else:
self.unset() self.unset()
@property @property
@ -45,12 +43,22 @@ class Offer(object):
return None return None
def accept(self): def accept(self):
if self._state is None or self.is_unset: if self.is_unset or self._state is None:
self._state = Offer.RATE_ACCEPTED self._state = Offer.RATE_ACCEPTED
def reject(self): def reject(self):
if self._state is None or self.is_unset: if self.is_unset or self._state is None:
self._state = Offer.RATE_TOO_LOW self._state = Offer.RATE_TOO_LOW
def unset(self): def unset(self):
self._state = Offer.RATE_UNSET self._state = Offer.RATE_UNSET
def handle(self, reply_message):
if reply_message == Offer.RATE_TOO_LOW:
self.reject()
elif reply_message == Offer.RATE_ACCEPTED:
self.accept()
elif reply_message == Offer.RATE_UNSET:
self.unset()
else:
raise Exception("Unknown offer reply %s" % str(reply_message))

View file

@ -1,5 +1,6 @@
from lbrynet.core.Strategy import get_default_strategy from lbrynet.core.Strategy import get_default_strategy
class BasePaymentRateManager(object): class BasePaymentRateManager(object):
def __init__(self, rate): def __init__(self, rate):
self.min_blob_data_payment_rate = rate self.min_blob_data_payment_rate = rate
@ -32,7 +33,7 @@ class PaymentRateManager(object):
class NegotiatedPaymentRateManager(object): class NegotiatedPaymentRateManager(object):
def __init__(self, base, availability_tracker): def __init__(self, base, availability_tracker, generous=True):
""" """
@param base: a BasePaymentRateManager @param base: a BasePaymentRateManager
@param availability_tracker: a BlobAvailabilityTracker @param availability_tracker: a BlobAvailabilityTracker
@ -43,15 +44,28 @@ class NegotiatedPaymentRateManager(object):
self.min_blob_data_payment_rate = self.base.min_blob_data_payment_rate self.min_blob_data_payment_rate = self.base.min_blob_data_payment_rate
self.points_paid = 0.0 self.points_paid = 0.0
self.blob_tracker = availability_tracker self.blob_tracker = availability_tracker
self.strategy = get_default_strategy(self.blob_tracker) self.generous = generous
self.strategy = get_default_strategy(self.blob_tracker, base_price=self.min_blob_data_payment_rate, is_generous=generous)
def get_rate_blob_data(self, peer, blobs): def get_rate_blob_data(self, peer, blobs):
response = self.strategy.make_offer(peer, blobs) response = self.strategy.make_offer(peer, blobs)
return response.rate return response.rate
def accept_rate_blob_data(self, peer, blobs, offer): def accept_rate_blob_data(self, peer, blobs, offer):
response = self.strategy.respond_to_offer(offer, peer, blobs) offer = self.strategy.respond_to_offer(offer, peer, blobs)
return response.accepted self.strategy.offer_accepted(peer, offer)
return offer.accepted
def reply_to_offer(self, peer, blobs, offer):
reply = self.strategy.respond_to_offer(offer, peer, blobs)
self.strategy.offer_accepted(peer, reply)
return reply
def get_rate_for_peer(self, peer):
return self.strategy.accepted_offers.get(peer, False)
def record_points_paid(self, amount): def record_points_paid(self, amount):
self.points_paid += amount self.points_paid += amount
def record_offer_reply(self, peer, offer):
self.strategy.offer_accepted(peer, offer)

View file

@ -1,4 +1,5 @@
from lbrynet.conf import MIN_BLOB_DATA_PAYMENT_RATE from lbrynet.conf import MIN_BLOB_DATA_PAYMENT_RATE
from decimal import Decimal
def get_default_price_model(blob_tracker, **kwargs): def get_default_price_model(blob_tracker, **kwargs):
@ -19,14 +20,14 @@ class MeanAvailabilityWeightedPrice(object):
def __init__(self, tracker, base_price=MIN_BLOB_DATA_PAYMENT_RATE, alpha=1.0): def __init__(self, tracker, base_price=MIN_BLOB_DATA_PAYMENT_RATE, alpha=1.0):
self.blob_tracker = tracker self.blob_tracker = tracker
self.base_price = base_price self.base_price = Decimal(base_price)
self.alpha = alpha self.alpha = Decimal(alpha)
def calculate_price(self, blob): def calculate_price(self, blob):
mean_availability = self.blob_tracker.last_mean_availability mean_availability = self.blob_tracker.last_mean_availability
availability = self.blob_tracker.availability.get(blob, []) availability = self.blob_tracker.availability.get(blob, [])
index = 0 # blob.index index = 0 # blob.index
price = self.base_price * (mean_availability / max(1, len(availability))) / self._frontload(index) price = self.base_price * (mean_availability / Decimal(max(1, len(availability)))) / self._frontload(index)
return round(price, 5) return round(price, 5)
def _frontload(self, index): def _frontload(self, index):
@ -37,4 +38,4 @@ class MeanAvailabilityWeightedPrice(object):
@return: frontload multipler @return: frontload multipler
""" """
return 2.0 - (self.alpha ** index) return Decimal(2.0) - (self.alpha ** index)

View file

@ -29,7 +29,7 @@ class Session(object):
def __init__(self, blob_data_payment_rate, db_dir=None, lbryid=None, peer_manager=None, dht_node_port=None, def __init__(self, blob_data_payment_rate, db_dir=None, lbryid=None, peer_manager=None, dht_node_port=None,
known_dht_nodes=None, peer_finder=None, hash_announcer=None, known_dht_nodes=None, peer_finder=None, hash_announcer=None,
blob_dir=None, blob_manager=None, peer_port=None, use_upnp=True, blob_dir=None, blob_manager=None, peer_port=None, use_upnp=True,
rate_limiter=None, wallet=None, dht_node_class=node.Node, blob_tracker=None): rate_limiter=None, wallet=None, dht_node_class=node.Node, blob_tracker_class=None):
""" """
@param blob_data_payment_rate: The default payment rate for blob data @param blob_data_payment_rate: The default payment rate for blob data
@ -88,7 +88,9 @@ class Session(object):
self.blob_dir = blob_dir self.blob_dir = blob_dir
self.blob_manager = blob_manager self.blob_manager = blob_manager
self.blob_tracker = blob_tracker
self.blob_tracker = None
self.blob_tracker_class = blob_tracker_class or BlobAvailabilityTracker
self.peer_port = peer_port self.peer_port = peer_port
@ -261,7 +263,7 @@ class Session(object):
self.blob_manager = DiskBlobManager(self.hash_announcer, self.blob_dir, self.db_dir) self.blob_manager = DiskBlobManager(self.hash_announcer, self.blob_dir, self.db_dir)
if self.blob_tracker is None: if self.blob_tracker is None:
self.blob_tracker = BlobAvailabilityTracker(self.blob_manager, self.peer_finder, self.dht_node) self.blob_tracker = self.blob_tracker_class(self.blob_manager, self.peer_finder, self.dht_node)
if self.payment_rate_manager is None: if self.payment_rate_manager is None:
self.payment_rate_manager = NegotiatedPaymentRateManager(self.base_payment_rate_manager, self.blob_tracker) self.payment_rate_manager = NegotiatedPaymentRateManager(self.base_payment_rate_manager, self.blob_tracker)

View file

@ -1,7 +1,8 @@
import logging import logging
from decimal import Decimal
from lbrynet.conf import MIN_BLOB_DATA_PAYMENT_RATE
from lbrynet.core.Offer import Offer from lbrynet.core.Offer import Offer
from lbrynet.core.PriceModel import get_default_price_model from lbrynet.core.PriceModel import MeanAvailabilityWeightedPrice
log = logging.getLogger(__name__) log = logging.getLogger(__name__)
@ -10,7 +11,90 @@ def get_default_strategy(blob_tracker, **kwargs):
return BasicAvailabilityWeightedStrategy(blob_tracker, **kwargs) return BasicAvailabilityWeightedStrategy(blob_tracker, **kwargs)
class BasicAvailabilityWeightedStrategy(object): class BaseStrategy(object):
def __init__(self, price_model, max_rate, min_rate, is_generous=True):
self.price_model = price_model
self.is_generous = is_generous
self.accepted_offers = {}
self.offers_sent = {}
self.offers_received = {}
self.max_rate = max_rate or Decimal(self.price_model.base_price * 100)
self.min_rate = Decimal(min_rate)
def add_offer_sent(self, peer):
turn = self.offers_sent.get(peer, 0) + 1
self.offers_sent.update({peer: turn})
def add_offer_received(self, peer):
turn = self.offers_received.get(peer, 0) + 1
self.offers_received.update({peer: turn})
def calculate_price_target(self, *args):
return self.price_model.calculate_price(*args)
def bounded_price(self, price):
price_for_return = Decimal(min(self.max_rate, max(price, self.min_rate)))
return price_for_return
def make_offer(self, peer, blobs):
offer_count = self.offers_sent.get(peer, 0)
self.add_offer_sent(peer)
if peer in self.accepted_offers:
# if there was a previous accepted offer, use that
offer = self.accepted_offers[peer]
elif offer_count == 0 and self.is_generous:
# Try asking for it for free
offer = Offer(Decimal(0.0))
else:
rates = [self.calculate_price_target(blob) for blob in blobs]
price = self._make_offer(rates, offer_count)
bounded_price = self.bounded_price(price)
offer = Offer(bounded_price)
log.debug("Offering: %s", offer.rate)
return offer
def offer_accepted(self, peer, offer):
if not offer.accepted and peer in self.accepted_offers:
del self.accepted_offers[peer]
log.debug("Throwing out old accepted offer")
if offer.accepted:
self.accepted_offers.update({peer: offer})
log.debug("Updated accepted offer %f", offer.rate)
def respond_to_offer(self, offer, peer, blobs):
offer_count = self.offers_received.get(peer, 0)
self.add_offer_received(peer)
rates = [self.calculate_price_target(blob) for blob in blobs]
price = self._respond_to_offer(rates, offer_count)
bounded_price = self.bounded_price(price)
log.debug("Price target: %f", price)
if peer in self.accepted_offers:
offer = self.accepted_offers[peer]
log.debug("Already accepted %f", offer.rate)
elif offer.rate == 0.0 and offer_count == 0 and self.is_generous:
# give blobs away for free by default on the first request
offer.accept()
self.accepted_offers.update({peer: offer})
elif offer.rate >= bounded_price:
log.debug("Accept: %f", offer.rate)
offer.accept()
self.accepted_offers.update({peer: offer})
else:
log.debug("Reject: %f", offer.rate)
offer.reject()
if peer in self.accepted_offers:
del self.accepted_offers[peer]
return offer
def _make_offer(self, rates, offer_count):
return NotImplementedError()
def _respond_to_offer(self, rates, offer_count):
return NotImplementedError()
class BasicAvailabilityWeightedStrategy(BaseStrategy):
""" """
Basic strategy to target blob prices based on supply relative to mean supply Basic strategy to target blob prices based on supply relative to mean supply
@ -18,77 +102,29 @@ class BasicAvailabilityWeightedStrategy(object):
until the rate is accepted or a threshold is reached until the rate is accepted or a threshold is reached
""" """
def __init__(self, blob_tracker, acceleration=1.25, deceleration=0.9, max_rate=0.005, min_rate=0.0, **kwargs): def __init__(self, blob_tracker, acceleration=1.25, deceleration=0.9, max_rate=None, min_rate=0.0,
self._acceleration = acceleration # rate of how quickly to ramp offer is_generous=True, base_price=MIN_BLOB_DATA_PAYMENT_RATE, alpha=1.0):
self._deceleration = deceleration price_model = MeanAvailabilityWeightedPrice(blob_tracker, base_price=base_price, alpha=alpha)
self._min_rate = min_rate BaseStrategy.__init__(self, price_model, max_rate, min_rate, is_generous)
self._max_rate = max_rate self._acceleration = Decimal(acceleration) # rate of how quickly to ramp offer
self._count_up = {} self._deceleration = Decimal(deceleration)
self._count_down = {}
self._requested = {}
self._offers_to_peers = {}
self.model = get_default_price_model(blob_tracker, **kwargs)
def respond_to_offer(self, offer, peer, blobs): def _get_mean_rate(self, rates):
request_count = self._count_up.get(peer, 0) mean_rate = Decimal(sum(rates)) / Decimal(max(len(rates), 1))
rates = [self._calculate_price(blob) for blob in blobs] return mean_rate
rate = sum(rates) / max(len(rates), 1)
discounted = self._discount(rate, request_count)
price = self._bounded_price(discounted)
log.info("Price target: %f, final: %f", discounted, price)
self._inc_up_count(peer)
if offer.rate == 0.0 and request_count == 0:
# give blobs away for free by default on the first request
offer.accept()
return offer
elif offer.rate >= price:
log.info("Accept: %f", offer.rate)
offer.accept()
return offer
else:
log.info("Reject: %f", offer.rate)
offer.reject()
return offer
def make_offer(self, peer, blobs):
# use mean turn-discounted price for all the blobs requested
# if there was a previous offer replied to, use the same rate if it was accepted
last_offer = self._offers_to_peers.get(peer, False)
if last_offer:
if last_offer.rate is not None and last_offer.accepted:
return last_offer
request_count = self._count_down.get(peer, 0)
self._inc_down_count(peer)
if request_count == 0:
# Try asking for it for free
self._offers_to_peers.update({peer: Offer(0.0)})
else:
rates = [self._calculate_price(blob) for blob in blobs]
mean_rate = sum(rates) / max(len(blobs), 1)
with_premium = self._premium(mean_rate, request_count)
price = self._bounded_price(with_premium)
self._offers_to_peers.update({peer: Offer(price)})
return self._offers_to_peers[peer]
def _bounded_price(self, price):
price_for_return = min(self._max_rate, max(price, self._min_rate))
return price_for_return
def _inc_up_count(self, peer):
turn = self._count_up.get(peer, 0) + 1
self._count_up.update({peer: turn})
def _inc_down_count(self, peer):
turn = self._count_down.get(peer, 0) + 1
self._count_down.update({peer: turn})
def _calculate_price(self, blob):
return self.model.calculate_price(blob)
def _premium(self, rate, turn): def _premium(self, rate, turn):
return rate * (self._acceleration ** turn) return rate * (self._acceleration ** Decimal(turn))
def _discount(self, rate, turn): def _discount(self, rate, turn):
return rate * (self._deceleration ** turn) return rate * (self._deceleration ** Decimal(turn))
def _respond_to_offer(self, rates, offer_count):
rate = self._get_mean_rate(rates)
discounted = self._discount(rate, offer_count)
return round(discounted, 5)
def _make_offer(self, rates, offer_count):
rate = self._get_mean_rate(rates)
with_premium = self._premium(rate, offer_count)
return round(with_premium, 5)

View file

@ -1,5 +1,6 @@
import logging import logging
from collections import defaultdict from collections import defaultdict
from decimal import Decimal
from twisted.internet import defer from twisted.internet import defer
from twisted.python.failure import Failure from twisted.python.failure import Failure
@ -17,7 +18,12 @@ log = logging.getLogger(__name__)
def get_points(num_bytes, rate): def get_points(num_bytes, rate):
return 1.0 * num_bytes * rate / 2**20 if isinstance(rate, float):
return 1.0 * num_bytes * rate / 2**20
elif isinstance(rate, Decimal):
return 1.0 * num_bytes * float(rate) / 2**20
else:
raise Exception("Unknown rate type")
def cache(fn): def cache(fn):
@ -356,18 +362,20 @@ class PriceRequest(RequestHelper):
if 'blob_data_payment_rate' not in response_dict: if 'blob_data_payment_rate' not in response_dict:
return InvalidResponseError("response identifier not in response") return InvalidResponseError("response identifier not in response")
assert self.protocol in self.protocol_prices assert self.protocol in self.protocol_prices
response = Offer(response_dict['blob_data_payment_rate'])
rate = self.protocol_prices[self.protocol] rate = self.protocol_prices[self.protocol]
if response.accepted: offer = Offer(rate)
offer.handle(response_dict['blob_data_payment_rate'])
self.payment_rate_manager.record_offer_reply(self.peer.host, offer)
if offer.accepted:
log.info("Offered rate %f/mb accepted by %s", rate, str(self.peer.host)) log.info("Offered rate %f/mb accepted by %s", rate, str(self.peer.host))
return True return True
elif response.too_low: elif offer.too_low:
log.info("Offered rate %f/mb rejected by %s", rate, str(self.peer.host)) log.info("Offered rate %f/mb rejected by %s", rate, str(self.peer.host))
del self.protocol_prices[self.protocol] del self.protocol_prices[self.protocol]
return True return True
else: else:
log.warning("Price disagreement") log.warning("Price disagreement")
log.warning(rate)
del self.protocol_prices[self.protocol] del self.protocol_prices[self.protocol]
self.requestor._price_disagreements.append(self.peer) self.requestor._price_disagreements.append(self.peer)
return False return False

View file

@ -1,5 +1,6 @@
import json import json
import logging import logging
from decimal import Decimal
from twisted.internet import error, defer from twisted.internet import error, defer
from twisted.internet.protocol import Protocol, ClientFactory from twisted.internet.protocol import Protocol, ClientFactory
from twisted.python import failure from twisted.python import failure
@ -14,6 +15,12 @@ from zope.interface import implements
log = logging.getLogger(__name__) log = logging.getLogger(__name__)
def encode_decimal(obj):
if isinstance(obj, Decimal):
return float(obj)
raise TypeError(repr(obj) + " is not JSON serializable")
class ClientProtocol(Protocol): class ClientProtocol(Protocol):
implements(IRequestSender, IRateLimited) implements(IRequestSender, IRateLimited)
@ -132,7 +139,7 @@ class ClientProtocol(Protocol):
def _send_request_message(self, request_msg): def _send_request_message(self, request_msg):
# TODO: compare this message to the last one. If they're the same, # TODO: compare this message to the last one. If they're the same,
# TODO: incrementally delay this message. # TODO: incrementally delay this message.
m = json.dumps(request_msg) m = json.dumps(request_msg, default=encode_decimal)
self.transport.write(m) self.transport.write(m)
def _get_valid_response(self, response_msg): def _get_valid_response(self, response_msg):

View file

@ -61,53 +61,20 @@ class BlobRequestHandler(object):
def handle_queries(self, queries): def handle_queries(self, queries):
response = defer.succeed({}) response = defer.succeed({})
log.debug("Handle query: %s", str(queries))
if self.AVAILABILITY_QUERY in queries: if self.AVAILABILITY_QUERY in queries:
self._blobs_requested = queries[self.AVAILABILITY_QUERY] self._blobs_requested = queries[self.AVAILABILITY_QUERY]
response.addCallback(lambda r: self._reply_to_availability(r, self._blobs_requested)) response.addCallback(lambda r: self._reply_to_availability(r, self._blobs_requested))
if self.PAYMENT_RATE_QUERY in queries: if self.PAYMENT_RATE_QUERY in queries:
offer = Offer(queries[self.PAYMENT_RATE_QUERY]) offered_rate = queries[self.PAYMENT_RATE_QUERY]
offer = Offer(offered_rate)
response.addCallback(lambda r: self._handle_payment_rate_query(offer, r)) response.addCallback(lambda r: self._handle_payment_rate_query(offer, r))
if self.BLOB_QUERY in queries: if self.BLOB_QUERY in queries:
if self.PAYMENT_RATE_QUERY in queries: incoming = queries[self.BLOB_QUERY]
incoming = queries[self.BLOB_QUERY] response.addCallback(lambda r: self._reply_to_send_request(r, incoming))
response.addCallback(lambda r: self._reply_to_send_request(r, incoming))
else:
response.addCallback(lambda _: {'incoming_blob': {'error': 'RATE_UNSET'}})
return response return response
def _handle_payment_rate_query(self, offer, request):
blobs = request.get("available_blobs", [])
log.info("Offered rate %f/mb for %i blobs", offer.rate, len(blobs))
accepted = self.payment_rate_manager.accept_rate_blob_data(self.peer, blobs, offer)
if offer.accepted:
self.blob_data_payment_rate = offer.rate
request[self.PAYMENT_RATE_QUERY] = "RATE_ACCEPTED"
elif offer.too_low:
request[self.PAYMENT_RATE_QUERY] = "RATE_TOO_LOW"
offer.unset()
elif offer.is_unset:
request['incoming_blob'] = {'error': 'RATE_UNSET'}
return request
def _handle_blob_query(self, response, query):
log.debug("Received the client's request to send a blob")
response['incoming_blob'] = {}
if self.blob_data_payment_rate is None:
response['incoming_blob'] = {'error': "RATE_UNSET"}
return response
else:
return self._send_blob(response, query)
def _send_blob(self, response, query):
d = self.blob_manager.get_blob(query, True)
d.addCallback(self.open_blob_for_reading, response)
return d
def open_blob_for_reading(self, blob, response): def open_blob_for_reading(self, blob, response):
def failure(msg): def failure(msg):
log.debug("We can not send %s: %s", blob, msg) log.debug("We can not send %s: %s", blob, msg)
@ -153,6 +120,39 @@ class BlobRequestHandler(object):
d.addCallback(set_available) d.addCallback(set_available)
return d return d
def _handle_payment_rate_query(self, offer, request):
blobs = self._blobs_requested
log.info("Offered rate %f LBC/mb for %i blobs", offer.rate, len(blobs))
reply = self.payment_rate_manager.reply_to_offer(self.peer, blobs, offer)
if reply.accepted:
self.blob_data_payment_rate = offer.rate
request[self.PAYMENT_RATE_QUERY] = "RATE_ACCEPTED"
log.info("Accepted rate: %f", offer.rate)
elif reply.too_low:
request[self.PAYMENT_RATE_QUERY] = "RATE_TOO_LOW"
log.info("Reject rate: %f", offer.rate)
elif reply.is_unset:
log.warning("Rate unset")
request['incoming_blob'] = {'error': 'RATE_UNSET'}
log.debug("Returning rate query result: %s", str(request))
return request
def _handle_blob_query(self, response, query):
log.debug("Received the client's request to send a blob")
response['incoming_blob'] = {}
if self.blob_data_payment_rate is None:
response['incoming_blob'] = {'error': "RATE_UNSET"}
return response
else:
return self._send_blob(response, query)
def _send_blob(self, response, query):
d = self.blob_manager.get_blob(query, True)
d.addCallback(self.open_blob_for_reading, response)
return d
def open_blob_for_reading(self, blob, response): def open_blob_for_reading(self, blob, response):
response_fields = {} response_fields = {}
d = defer.succeed(None) d = defer.succeed(None)
@ -161,7 +161,7 @@ class BlobRequestHandler(object):
if read_handle is not None: if read_handle is not None:
self.currently_uploading = blob self.currently_uploading = blob
self.read_handle = read_handle self.read_handle = read_handle
log.debug("Sending %s to client", str(blob)) log.info("Sending %s to client", str(blob))
response_fields['blob_hash'] = blob.blob_hash response_fields['blob_hash'] = blob.blob_hash
response_fields['length'] = blob.length response_fields['length'] = blob.length
response['incoming_blob'] = response_fields response['incoming_blob'] = response_fields
@ -180,7 +180,6 @@ class BlobRequestHandler(object):
def _reply_to_send_request(self, response, incoming): def _reply_to_send_request(self, response, incoming):
response_fields = {} response_fields = {}
response['incoming_blob'] = response_fields response['incoming_blob'] = response_fields
rate = self.blob_data_payment_rate
if self.blob_data_payment_rate is None: if self.blob_data_payment_rate is None:
log.debug("Rate not set yet") log.debug("Rate not set yet")

View file

@ -144,7 +144,7 @@ class FullLiveStreamDownloaderFactory(object):
def make_downloader(self, metadata, options, payment_rate_manager): def make_downloader(self, metadata, options, payment_rate_manager):
# TODO: check options for payment rate manager parameters # TODO: check options for payment rate manager parameters
payment_rate_manager = LiveStreamPaymentRateManager(self.default_payment_rate_manager, prm = LiveStreamPaymentRateManager(self.default_payment_rate_manager,
payment_rate_manager) payment_rate_manager)
def save_source_if_blob(stream_hash): def save_source_if_blob(stream_hash):
@ -161,7 +161,7 @@ class FullLiveStreamDownloaderFactory(object):
def create_downloader(stream_hash): def create_downloader(stream_hash):
stream_downloader = FullLiveStreamDownloader(stream_hash, self.peer_finder, self.rate_limiter, stream_downloader = FullLiveStreamDownloader(stream_hash, self.peer_finder, self.rate_limiter,
self.blob_manager, self.stream_info_manager, self.blob_manager, self.stream_info_manager,
payment_rate_manager, self.wallet, True) prm, self.wallet, True)
# TODO: change upload_allowed=True above to something better # TODO: change upload_allowed=True above to something better
d = stream_downloader.set_stream_info() d = stream_downloader.set_stream_info()
d.addCallback(lambda _: stream_downloader) d.addCallback(lambda _: stream_downloader)

View file

@ -234,14 +234,14 @@ def start_lbry_uploader(sd_hash_queue, kill_event, dead_event, file_size, ul_rat
hash_announcer = FakeAnnouncer() hash_announcer = FakeAnnouncer()
rate_limiter = RateLimiter() rate_limiter = RateLimiter()
sd_identifier = StreamDescriptorIdentifier() sd_identifier = StreamDescriptorIdentifier()
blob_tracker = DummyBlobAvailabilityTracker()
db_dir = "server" db_dir = "server"
os.mkdir(db_dir) os.mkdir(db_dir)
session = Session(MIN_BLOB_DATA_PAYMENT_RATE, db_dir=db_dir, lbryid="abcd", session = Session(MIN_BLOB_DATA_PAYMENT_RATE, db_dir=db_dir, lbryid="abcd",
peer_finder=peer_finder, hash_announcer=hash_announcer, peer_port=5553, peer_finder=peer_finder, hash_announcer=hash_announcer, peer_port=5553,
use_upnp=False, rate_limiter=rate_limiter, wallet=wallet, blob_tracker=blob_tracker, use_upnp=False, rate_limiter=rate_limiter, wallet=wallet, blob_tracker_class=DummyBlobAvailabilityTracker,
dht_node_class=Node) dht_node_class=Node)
stream_info_manager = TempEncryptedFileMetadataManager() stream_info_manager = TempEncryptedFileMetadataManager()
@ -349,7 +349,6 @@ def start_lbry_reuploader(sd_hash, kill_event, dead_event, ready_event, n, ul_ra
hash_announcer = FakeAnnouncer() hash_announcer = FakeAnnouncer()
rate_limiter = RateLimiter() rate_limiter = RateLimiter()
sd_identifier = StreamDescriptorIdentifier() sd_identifier = StreamDescriptorIdentifier()
blob_tracker = DummyBlobAvailabilityTracker()
db_dir = "server_" + str(n) db_dir = "server_" + str(n)
blob_dir = os.path.join(db_dir, "blobfiles") blob_dir = os.path.join(db_dir, "blobfiles")
@ -359,7 +358,7 @@ def start_lbry_reuploader(sd_hash, kill_event, dead_event, ready_event, n, ul_ra
session = Session(MIN_BLOB_DATA_PAYMENT_RATE, db_dir=db_dir, lbryid="abcd" + str(n), session = Session(MIN_BLOB_DATA_PAYMENT_RATE, db_dir=db_dir, lbryid="abcd" + str(n),
peer_finder=peer_finder, hash_announcer=hash_announcer, peer_finder=peer_finder, hash_announcer=hash_announcer,
blob_dir=None, peer_port=peer_port, blob_dir=None, peer_port=peer_port,
use_upnp=False, rate_limiter=rate_limiter, wallet=wallet, blob_tracker=blob_tracker) use_upnp=False, rate_limiter=rate_limiter, wallet=wallet, blob_tracker_class=DummyBlobAvailabilityTracker)
stream_info_manager = TempEncryptedFileMetadataManager() stream_info_manager = TempEncryptedFileMetadataManager()
@ -463,14 +462,14 @@ def start_live_server(sd_hash_queue, kill_event, dead_event):
hash_announcer = FakeAnnouncer() hash_announcer = FakeAnnouncer()
rate_limiter = DummyRateLimiter() rate_limiter = DummyRateLimiter()
sd_identifier = StreamDescriptorIdentifier() sd_identifier = StreamDescriptorIdentifier()
blob_tracker = DummyBlobAvailabilityTracker()
db_dir = "server" db_dir = "server"
os.mkdir(db_dir) os.mkdir(db_dir)
session = Session(MIN_BLOB_DATA_PAYMENT_RATE, db_dir=db_dir, lbryid="abcd", session = Session(MIN_BLOB_DATA_PAYMENT_RATE, db_dir=db_dir, lbryid="abcd",
peer_finder=peer_finder, hash_announcer=hash_announcer, peer_port=5553, peer_finder=peer_finder, hash_announcer=hash_announcer, peer_port=5553,
use_upnp=False, rate_limiter=rate_limiter, wallet=wallet, blob_tracker=blob_tracker) use_upnp=False, rate_limiter=rate_limiter, wallet=wallet, blob_tracker_class=DummyBlobAvailabilityTracker)
base_payment_rate_manager = BaseLiveStreamPaymentRateManager(MIN_BLOB_INFO_PAYMENT_RATE) base_payment_rate_manager = BaseLiveStreamPaymentRateManager(MIN_BLOB_INFO_PAYMENT_RATE)
data_payment_rate_manager = session.payment_rate_manager data_payment_rate_manager = session.payment_rate_manager
@ -600,7 +599,6 @@ def start_blob_uploader(blob_hash_queue, kill_event, dead_event, slow):
peer_finder = FakePeerFinder(5553, peer_manager, 1) peer_finder = FakePeerFinder(5553, peer_manager, 1)
hash_announcer = FakeAnnouncer() hash_announcer = FakeAnnouncer()
rate_limiter = RateLimiter() rate_limiter = RateLimiter()
blob_tracker = DummyBlobAvailabilityTracker()
if slow is True: if slow is True:
peer_port = 5553 peer_port = 5553
@ -615,7 +613,7 @@ def start_blob_uploader(blob_hash_queue, kill_event, dead_event, slow):
session = Session(MIN_BLOB_DATA_PAYMENT_RATE, db_dir=db_dir, lbryid="efgh", session = Session(MIN_BLOB_DATA_PAYMENT_RATE, db_dir=db_dir, lbryid="efgh",
peer_finder=peer_finder, hash_announcer=hash_announcer, peer_finder=peer_finder, hash_announcer=hash_announcer,
blob_dir=blob_dir, peer_port=peer_port, blob_dir=blob_dir, peer_port=peer_port,
use_upnp=False, rate_limiter=rate_limiter, wallet=wallet, blob_tracker=blob_tracker) use_upnp=False, rate_limiter=rate_limiter, wallet=wallet, blob_tracker_class=DummyBlobAvailabilityTracker)
if slow is True: if slow is True:
session.rate_limiter.set_ul_limit(2**11) session.rate_limiter.set_ul_limit(2**11)
@ -779,7 +777,7 @@ class TestTransfer(TestCase):
hash_announcer = FakeAnnouncer() hash_announcer = FakeAnnouncer()
rate_limiter = DummyRateLimiter() rate_limiter = DummyRateLimiter()
sd_identifier = StreamDescriptorIdentifier() sd_identifier = StreamDescriptorIdentifier()
blob_tracker = DummyBlobAvailabilityTracker()
db_dir = "client" db_dir = "client"
blob_dir = os.path.join(db_dir, "blobfiles") blob_dir = os.path.join(db_dir, "blobfiles")
@ -789,7 +787,7 @@ class TestTransfer(TestCase):
self.session = Session(MIN_BLOB_DATA_PAYMENT_RATE, db_dir=db_dir, lbryid="abcd", self.session = Session(MIN_BLOB_DATA_PAYMENT_RATE, db_dir=db_dir, lbryid="abcd",
peer_finder=peer_finder, hash_announcer=hash_announcer, peer_finder=peer_finder, hash_announcer=hash_announcer,
blob_dir=blob_dir, peer_port=5553, blob_dir=blob_dir, peer_port=5553,
use_upnp=False, rate_limiter=rate_limiter, wallet=wallet, blob_tracker=blob_tracker, use_upnp=False, rate_limiter=rate_limiter, wallet=wallet, blob_tracker_class=DummyBlobAvailabilityTracker,
dht_node_class=Node) dht_node_class=Node)
self.stream_info_manager = TempEncryptedFileMetadataManager() self.stream_info_manager = TempEncryptedFileMetadataManager()
@ -868,7 +866,7 @@ class TestTransfer(TestCase):
hash_announcer = FakeAnnouncer() hash_announcer = FakeAnnouncer()
rate_limiter = DummyRateLimiter() rate_limiter = DummyRateLimiter()
sd_identifier = StreamDescriptorIdentifier() sd_identifier = StreamDescriptorIdentifier()
blob_tracker = DummyBlobAvailabilityTracker()
db_dir = "client" db_dir = "client"
os.mkdir(db_dir) os.mkdir(db_dir)
@ -876,7 +874,7 @@ class TestTransfer(TestCase):
self.session = Session(MIN_BLOB_DATA_PAYMENT_RATE, db_dir=db_dir, lbryid="abcd", self.session = Session(MIN_BLOB_DATA_PAYMENT_RATE, db_dir=db_dir, lbryid="abcd",
peer_finder=peer_finder, hash_announcer=hash_announcer, blob_dir=None, peer_finder=peer_finder, hash_announcer=hash_announcer, blob_dir=None,
peer_port=5553, use_upnp=False, rate_limiter=rate_limiter, wallet=wallet, peer_port=5553, use_upnp=False, rate_limiter=rate_limiter, wallet=wallet,
blob_tracker=blob_tracker, dht_node_class=Node) blob_tracker_class=DummyBlobAvailabilityTracker, dht_node_class=Node)
self.stream_info_manager = TempLiveStreamMetadataManager(hash_announcer) self.stream_info_manager = TempLiveStreamMetadataManager(hash_announcer)
@ -973,7 +971,7 @@ class TestTransfer(TestCase):
peer_finder = FakePeerFinder(5553, peer_manager, 2) peer_finder = FakePeerFinder(5553, peer_manager, 2)
hash_announcer = FakeAnnouncer() hash_announcer = FakeAnnouncer()
rate_limiter = DummyRateLimiter() rate_limiter = DummyRateLimiter()
blob_tracker = DummyBlobAvailabilityTracker()
db_dir = "client" db_dir = "client"
blob_dir = os.path.join(db_dir, "blobfiles") blob_dir = os.path.join(db_dir, "blobfiles")
@ -983,7 +981,7 @@ class TestTransfer(TestCase):
self.session = Session(MIN_BLOB_DATA_PAYMENT_RATE, db_dir=db_dir, lbryid="abcd", self.session = Session(MIN_BLOB_DATA_PAYMENT_RATE, db_dir=db_dir, lbryid="abcd",
peer_finder=peer_finder, hash_announcer=hash_announcer, peer_finder=peer_finder, hash_announcer=hash_announcer,
blob_dir=blob_dir, peer_port=5553, blob_dir=blob_dir, peer_port=5553,
use_upnp=False, rate_limiter=rate_limiter, wallet=wallet, blob_tracker=blob_tracker) use_upnp=False, rate_limiter=rate_limiter, wallet=wallet, blob_tracker_class=DummyBlobAvailabilityTracker)
d1 = self.wait_for_hash_from_queue(blob_hash_queue_1) d1 = self.wait_for_hash_from_queue(blob_hash_queue_1)
d2 = self.wait_for_hash_from_queue(blob_hash_queue_2) d2 = self.wait_for_hash_from_queue(blob_hash_queue_2)
@ -1052,7 +1050,7 @@ class TestTransfer(TestCase):
hash_announcer = FakeAnnouncer() hash_announcer = FakeAnnouncer()
rate_limiter = DummyRateLimiter() rate_limiter = DummyRateLimiter()
sd_identifier = StreamDescriptorIdentifier() sd_identifier = StreamDescriptorIdentifier()
blob_tracker = DummyBlobAvailabilityTracker()
downloaders = [] downloaders = []
@ -1064,7 +1062,7 @@ class TestTransfer(TestCase):
self.session = Session(MIN_BLOB_DATA_PAYMENT_RATE, db_dir=db_dir, lbryid="abcd", self.session = Session(MIN_BLOB_DATA_PAYMENT_RATE, db_dir=db_dir, lbryid="abcd",
peer_finder=peer_finder, hash_announcer=hash_announcer, peer_finder=peer_finder, hash_announcer=hash_announcer,
blob_dir=blob_dir, peer_port=5553, use_upnp=False, blob_dir=blob_dir, peer_port=5553, use_upnp=False,
rate_limiter=rate_limiter, wallet=wallet, blob_tracker=blob_tracker) rate_limiter=rate_limiter, wallet=wallet, blob_tracker_class=DummyBlobAvailabilityTracker)
self.stream_info_manager = DBEncryptedFileMetadataManager(self.session.db_dir) self.stream_info_manager = DBEncryptedFileMetadataManager(self.session.db_dir)
@ -1171,7 +1169,7 @@ class TestTransfer(TestCase):
hash_announcer = FakeAnnouncer() hash_announcer = FakeAnnouncer()
rate_limiter = DummyRateLimiter() rate_limiter = DummyRateLimiter()
sd_identifier = StreamDescriptorIdentifier() sd_identifier = StreamDescriptorIdentifier()
blob_tracker = DummyBlobAvailabilityTracker()
db_dir = "client" db_dir = "client"
blob_dir = os.path.join(db_dir, "blobfiles") blob_dir = os.path.join(db_dir, "blobfiles")
@ -1181,7 +1179,7 @@ class TestTransfer(TestCase):
self.session = Session(MIN_BLOB_DATA_PAYMENT_RATE, db_dir=db_dir, lbryid="abcd", self.session = Session(MIN_BLOB_DATA_PAYMENT_RATE, db_dir=db_dir, lbryid="abcd",
peer_finder=peer_finder, hash_announcer=hash_announcer, peer_finder=peer_finder, hash_announcer=hash_announcer,
blob_dir=None, peer_port=5553, blob_dir=None, peer_port=5553,
use_upnp=False, rate_limiter=rate_limiter, wallet=wallet, blob_tracker=blob_tracker) use_upnp=False, rate_limiter=rate_limiter, wallet=wallet, blob_tracker_class=DummyBlobAvailabilityTracker)
self.stream_info_manager = TempEncryptedFileMetadataManager() self.stream_info_manager = TempEncryptedFileMetadataManager()
@ -1290,7 +1288,7 @@ class TestStreamify(TestCase):
hash_announcer = FakeAnnouncer() hash_announcer = FakeAnnouncer()
rate_limiter = DummyRateLimiter() rate_limiter = DummyRateLimiter()
sd_identifier = StreamDescriptorIdentifier() sd_identifier = StreamDescriptorIdentifier()
blob_tracker = DummyBlobAvailabilityTracker()
db_dir = "client" db_dir = "client"
blob_dir = os.path.join(db_dir, "blobfiles") blob_dir = os.path.join(db_dir, "blobfiles")
@ -1300,7 +1298,7 @@ class TestStreamify(TestCase):
self.session = Session(MIN_BLOB_DATA_PAYMENT_RATE, db_dir=db_dir, lbryid="abcd", self.session = Session(MIN_BLOB_DATA_PAYMENT_RATE, db_dir=db_dir, lbryid="abcd",
peer_finder=peer_finder, hash_announcer=hash_announcer, peer_finder=peer_finder, hash_announcer=hash_announcer,
blob_dir=blob_dir, peer_port=5553, blob_dir=blob_dir, peer_port=5553,
use_upnp=False, rate_limiter=rate_limiter, wallet=wallet, blob_tracker=blob_tracker) use_upnp=False, rate_limiter=rate_limiter, wallet=wallet, blob_tracker_class=DummyBlobAvailabilityTracker)
self.stream_info_manager = TempEncryptedFileMetadataManager() self.stream_info_manager = TempEncryptedFileMetadataManager()
@ -1343,7 +1341,7 @@ class TestStreamify(TestCase):
hash_announcer = FakeAnnouncer() hash_announcer = FakeAnnouncer()
rate_limiter = DummyRateLimiter() rate_limiter = DummyRateLimiter()
sd_identifier = StreamDescriptorIdentifier() sd_identifier = StreamDescriptorIdentifier()
blob_tracker = DummyBlobAvailabilityTracker()
db_dir = "client" db_dir = "client"
blob_dir = os.path.join(db_dir, "blobfiles") blob_dir = os.path.join(db_dir, "blobfiles")
@ -1353,7 +1351,7 @@ class TestStreamify(TestCase):
self.session = Session(MIN_BLOB_DATA_PAYMENT_RATE, db_dir=db_dir, lbryid="abcd", self.session = Session(MIN_BLOB_DATA_PAYMENT_RATE, db_dir=db_dir, lbryid="abcd",
peer_finder=peer_finder, hash_announcer=hash_announcer, peer_finder=peer_finder, hash_announcer=hash_announcer,
blob_dir=blob_dir, peer_port=5553, blob_dir=blob_dir, peer_port=5553,
use_upnp=False, rate_limiter=rate_limiter, wallet=wallet, blob_tracker=blob_tracker) use_upnp=False, rate_limiter=rate_limiter, wallet=wallet, blob_tracker_class=DummyBlobAvailabilityTracker)
self.stream_info_manager = DBEncryptedFileMetadataManager(self.session.db_dir) self.stream_info_manager = DBEncryptedFileMetadataManager(self.session.db_dir)

View file

@ -61,7 +61,6 @@ class TestReflector(unittest.TestCase):
hash_announcer = mocks.Announcer() hash_announcer = mocks.Announcer()
rate_limiter = RateLimiter.DummyRateLimiter() rate_limiter = RateLimiter.DummyRateLimiter()
sd_identifier = StreamDescriptor.StreamDescriptorIdentifier() sd_identifier = StreamDescriptor.StreamDescriptorIdentifier()
blob_tracker = BlobAvailability.DummyBlobAvailabilityTracker()
self.expected_blobs = [ self.expected_blobs = [
( (
@ -95,7 +94,7 @@ class TestReflector(unittest.TestCase):
use_upnp=False, use_upnp=False,
rate_limiter=rate_limiter, rate_limiter=rate_limiter,
wallet=wallet, wallet=wallet,
blob_tracker=blob_tracker, blob_tracker_class=BlobAvailability.DummyBlobAvailabilityTracker,
dht_node_class=Node dht_node_class=Node
) )

View file

@ -1,8 +1,15 @@
from twisted.trial import unittest from twisted.trial import unittest
import random import random
import mock
from lbrynet.core.PaymentRateManager import NegotiatedPaymentRateManager, BasePaymentRateManager
from lbrynet.core.Strategy import BasicAvailabilityWeightedStrategy from lbrynet.core.Strategy import BasicAvailabilityWeightedStrategy
from lbrynet.core.Offer import Offer
from lbrynet.core.BlobAvailability import DummyBlobAvailabilityTracker from lbrynet.core.BlobAvailability import DummyBlobAvailabilityTracker
MAX_NEGOTIATION_TURNS = 10
def get_random_sample(list_to_sample): def get_random_sample(list_to_sample):
result = list_to_sample[random.randint(1, len(list_to_sample)):random.randint(1, len(list_to_sample))] result = list_to_sample[random.randint(1, len(list_to_sample)):random.randint(1, len(list_to_sample))]
if not result: if not result:
@ -11,25 +18,33 @@ def get_random_sample(list_to_sample):
class AvailabilityWeightedStrategyTests(unittest.TestCase): class AvailabilityWeightedStrategyTests(unittest.TestCase):
def test_first_offer_is_zero_and_second_isnt(self): def test_first_offer_is_zero_and_second_is_not_if_offer_not_accepted(self):
strategy = BasicAvailabilityWeightedStrategy(DummyBlobAvailabilityTracker()) strategy = BasicAvailabilityWeightedStrategy(DummyBlobAvailabilityTracker())
peer = "1.1.1.1" peer = "1.1.1.1"
blobs = strategy.model.blob_tracker.availability.keys()
blobs = strategy.price_model.blob_tracker.availability.keys()
offer1 = strategy.make_offer(peer, blobs) offer1 = strategy.make_offer(peer, blobs)
offer2 = strategy.make_offer(peer, blobs) offer2 = strategy.make_offer(peer, blobs)
self.assertEquals(offer1.rate, 0.0) self.assertEquals(offer1.rate, 0.0)
self.assertNotEqual(offer2.rate, 0.0) self.assertNotEqual(offer2.rate, 0.0)
def test_accept_zero_and_persist(self): def test_accept_zero_and_persist_if_accepted(self):
host_strategy = BasicAvailabilityWeightedStrategy(DummyBlobAvailabilityTracker()) host_strategy = BasicAvailabilityWeightedStrategy(DummyBlobAvailabilityTracker())
client_strategy = BasicAvailabilityWeightedStrategy(DummyBlobAvailabilityTracker())
client = "1.1.1.1" client = "1.1.1.1"
host = "1.1.1.2" host = "1.1.1.2"
blobs = host_strategy.model.blob_tracker.availability.keys() blobs = host_strategy.price_model.blob_tracker.availability.keys()
client_strategy = BasicAvailabilityWeightedStrategy(DummyBlobAvailabilityTracker())
offer = client_strategy.make_offer(host, blobs) offer = client_strategy.make_offer(host, blobs)
response1 = host_strategy.respond_to_offer(offer, client, blobs) response1 = host_strategy.respond_to_offer(offer, client, blobs)
client_strategy.offer_accepted(host, response1)
offer = client_strategy.make_offer(host, blobs) offer = client_strategy.make_offer(host, blobs)
response2 = host_strategy.respond_to_offer(offer, client, blobs) response2 = host_strategy.respond_to_offer(offer, client, blobs)
client_strategy.offer_accepted(host, response2)
self.assertEquals(response1.too_low, False) self.assertEquals(response1.too_low, False)
self.assertEquals(response1.accepted, True) self.assertEquals(response1.accepted, True)
@ -39,7 +54,7 @@ class AvailabilityWeightedStrategyTests(unittest.TestCase):
self.assertEquals(response2.accepted, True) self.assertEquals(response2.accepted, True)
self.assertEquals(response2.rate, 0.0) self.assertEquals(response2.rate, 0.0)
def test_turns_before_accept_with_similar_rate_settings(self): def test_how_many_turns_before_accept_with_similar_rate_settings(self):
blobs = [ blobs = [
'b2e48bb4c88cf46b76adf0d47a72389fae0cd1f19ed27dc509138c99509a25423a4cef788d571dca7988e1dca69e6fa0', 'b2e48bb4c88cf46b76adf0d47a72389fae0cd1f19ed27dc509138c99509a25423a4cef788d571dca7988e1dca69e6fa0',
'd7c82e6cac093b3f16107d2ae2b2c75424f1fcad2c7fbdbe66e4a13c0b6bd27b67b3a29c403b82279ab0f7c1c48d6787', 'd7c82e6cac093b3f16107d2ae2b2c75424f1fcad2c7fbdbe66e4a13c0b6bd27b67b3a29c403b82279ab0f7c1c48d6787',
@ -52,19 +67,143 @@ class AvailabilityWeightedStrategyTests(unittest.TestCase):
'8c70d5e2f5c3a6085006198e5192d157a125d92e7378794472007a61947992768926513fc10924785bdb1761df3c37e6', '8c70d5e2f5c3a6085006198e5192d157a125d92e7378794472007a61947992768926513fc10924785bdb1761df3c37e6',
'c84aa1fd8f5009f7c4e71e444e40d95610abc1480834f835eefb267287aeb10025880a3ce22580db8c6d92efb5bc0c9c' 'c84aa1fd8f5009f7c4e71e444e40d95610abc1480834f835eefb267287aeb10025880a3ce22580db8c6d92efb5bc0c9c'
] ]
for x in range(10):
client_base = 0.001 * x host = mock.Mock()
for y in range(10): host.host = "1.2.3.4"
host_base = 0.001 * y client = mock.Mock()
client_strat = BasicAvailabilityWeightedStrategy(DummyBlobAvailabilityTracker(), base_price=client_base) client.host = "1.2.3.5"
host_strat = BasicAvailabilityWeightedStrategy(DummyBlobAvailabilityTracker(), base_price=host_base)
for z in range(100): for x in range(1, 10):
blobs_to_query = get_random_sample(blobs) for y in range(1, 10):
accepted = False host_base = 0.0001 * y
turns = 0 client_base = 0.0001 * x
while not accepted: client_base_prm = BasePaymentRateManager(client_base)
offer = client_strat.make_offer("2.3.4.5", blobs_to_query) client_prm = NegotiatedPaymentRateManager(client_base_prm, DummyBlobAvailabilityTracker(), generous=False)
response = host_strat.respond_to_offer(offer, "3.4.5.6", blobs_to_query) host_base_prm = BasePaymentRateManager(host_base)
accepted = response.accepted host_prm = NegotiatedPaymentRateManager(host_base_prm, DummyBlobAvailabilityTracker(), generous=False)
turns += 1
self.assertGreater(5, turns) blobs_to_query = get_random_sample(blobs)
accepted = False
turns = 0
while not accepted:
rate = client_prm.get_rate_blob_data(host, blobs_to_query)
offer = Offer(rate)
accepted = host_prm.accept_rate_blob_data(client, blobs_to_query, offer)
turns += 1
self.assertGreater(MAX_NEGOTIATION_TURNS, turns)
def test_generous_connects_in_one_turn(self):
blobs = [
'b2e48bb4c88cf46b76adf0d47a72389fae0cd1f19ed27dc509138c99509a25423a4cef788d571dca7988e1dca69e6fa0',
'd7c82e6cac093b3f16107d2ae2b2c75424f1fcad2c7fbdbe66e4a13c0b6bd27b67b3a29c403b82279ab0f7c1c48d6787',
'5a450b416275da4bdff604ee7b58eaedc7913c5005b7184fc3bc5ef0b1add00613587f54217c91097fc039ed9eace9dd',
'f99d24cd50d4bfd77c2598bfbeeb8415bf0feef21200bdf0b8fbbde7751a77b7a2c68e09c25465a2f40fba8eecb0b4e0',
'9dbda74a472a2e5861a5d18197aeba0f5de67c67e401124c243d2f0f41edf01d7a26aeb0b5fc9bf47f6361e0f0968e2c',
'91dc64cf1ff42e20d627b033ad5e4c3a4a96856ed8a6e3fb4cd5fa1cfba4bf72eefd325f579db92f45f4355550ace8e7',
'6d8017aba362e5c5d0046625a039513419810a0397d728318c328a5cc5d96efb589fbca0728e54fe5adbf87e9545ee07',
'6af95cd062b4a179576997ef1054c9d2120f8592eea045e9667bea411d520262cd5a47b137eabb7a7871f5f8a79c92dd',
'8c70d5e2f5c3a6085006198e5192d157a125d92e7378794472007a61947992768926513fc10924785bdb1761df3c37e6',
'c84aa1fd8f5009f7c4e71e444e40d95610abc1480834f835eefb267287aeb10025880a3ce22580db8c6d92efb5bc0c9c'
]
host = mock.Mock()
host.host = "1.2.3.4"
client = mock.Mock()
client.host = "1.2.3.5"
for x in range(1, 10):
for y in range(1, 10):
host_base = 0.0001 * y
client_base = 0.0001 * x
client_base_prm = BasePaymentRateManager(client_base)
client_prm = NegotiatedPaymentRateManager(client_base_prm, DummyBlobAvailabilityTracker())
host_base_prm = BasePaymentRateManager(host_base)
host_prm = NegotiatedPaymentRateManager(host_base_prm, DummyBlobAvailabilityTracker())
blobs_to_query = get_random_sample(blobs)
accepted = False
turns = 0
while not accepted:
rate = client_prm.get_rate_blob_data(host, blobs_to_query)
offer = Offer(rate)
accepted = host_prm.accept_rate_blob_data(client, blobs_to_query, offer)
turns += 1
self.assertEqual(1, turns)
def test_how_many_turns_with_generous_client(self):
blobs = [
'b2e48bb4c88cf46b76adf0d47a72389fae0cd1f19ed27dc509138c99509a25423a4cef788d571dca7988e1dca69e6fa0',
'd7c82e6cac093b3f16107d2ae2b2c75424f1fcad2c7fbdbe66e4a13c0b6bd27b67b3a29c403b82279ab0f7c1c48d6787',
'5a450b416275da4bdff604ee7b58eaedc7913c5005b7184fc3bc5ef0b1add00613587f54217c91097fc039ed9eace9dd',
'f99d24cd50d4bfd77c2598bfbeeb8415bf0feef21200bdf0b8fbbde7751a77b7a2c68e09c25465a2f40fba8eecb0b4e0',
'9dbda74a472a2e5861a5d18197aeba0f5de67c67e401124c243d2f0f41edf01d7a26aeb0b5fc9bf47f6361e0f0968e2c',
'91dc64cf1ff42e20d627b033ad5e4c3a4a96856ed8a6e3fb4cd5fa1cfba4bf72eefd325f579db92f45f4355550ace8e7',
'6d8017aba362e5c5d0046625a039513419810a0397d728318c328a5cc5d96efb589fbca0728e54fe5adbf87e9545ee07',
'6af95cd062b4a179576997ef1054c9d2120f8592eea045e9667bea411d520262cd5a47b137eabb7a7871f5f8a79c92dd',
'8c70d5e2f5c3a6085006198e5192d157a125d92e7378794472007a61947992768926513fc10924785bdb1761df3c37e6',
'c84aa1fd8f5009f7c4e71e444e40d95610abc1480834f835eefb267287aeb10025880a3ce22580db8c6d92efb5bc0c9c'
]
host = mock.Mock()
host.host = "1.2.3.4"
client = mock.Mock()
client.host = "1.2.3.5"
for x in range(1, 10):
for y in range(1, 10):
host_base = 0.0001 * y
client_base = 0.0001 * x
client_base_prm = BasePaymentRateManager(client_base)
client_prm = NegotiatedPaymentRateManager(client_base_prm, DummyBlobAvailabilityTracker())
host_base_prm = BasePaymentRateManager(host_base)
host_prm = NegotiatedPaymentRateManager(host_base_prm, DummyBlobAvailabilityTracker(), generous=False)
blobs_to_query = get_random_sample(blobs)
accepted = False
turns = 0
while not accepted:
rate = client_prm.get_rate_blob_data(host, blobs_to_query)
offer = Offer(rate)
accepted = host_prm.accept_rate_blob_data(client, blobs_to_query, offer)
turns += 1
self.assertGreater(MAX_NEGOTIATION_TURNS, turns)
def test_how_many_turns_with_generous_host(self):
blobs = [
'b2e48bb4c88cf46b76adf0d47a72389fae0cd1f19ed27dc509138c99509a25423a4cef788d571dca7988e1dca69e6fa0',
'd7c82e6cac093b3f16107d2ae2b2c75424f1fcad2c7fbdbe66e4a13c0b6bd27b67b3a29c403b82279ab0f7c1c48d6787',
'5a450b416275da4bdff604ee7b58eaedc7913c5005b7184fc3bc5ef0b1add00613587f54217c91097fc039ed9eace9dd',
'f99d24cd50d4bfd77c2598bfbeeb8415bf0feef21200bdf0b8fbbde7751a77b7a2c68e09c25465a2f40fba8eecb0b4e0',
'9dbda74a472a2e5861a5d18197aeba0f5de67c67e401124c243d2f0f41edf01d7a26aeb0b5fc9bf47f6361e0f0968e2c',
'91dc64cf1ff42e20d627b033ad5e4c3a4a96856ed8a6e3fb4cd5fa1cfba4bf72eefd325f579db92f45f4355550ace8e7',
'6d8017aba362e5c5d0046625a039513419810a0397d728318c328a5cc5d96efb589fbca0728e54fe5adbf87e9545ee07',
'6af95cd062b4a179576997ef1054c9d2120f8592eea045e9667bea411d520262cd5a47b137eabb7a7871f5f8a79c92dd',
'8c70d5e2f5c3a6085006198e5192d157a125d92e7378794472007a61947992768926513fc10924785bdb1761df3c37e6',
'c84aa1fd8f5009f7c4e71e444e40d95610abc1480834f835eefb267287aeb10025880a3ce22580db8c6d92efb5bc0c9c'
]
host = mock.Mock()
host.host = "1.2.3.4"
client = mock.Mock()
client.host = "1.2.3.5"
for x in range(1, 10):
for y in range(1, 10):
host_base = 0.0001 * y
client_base = 0.0001 * x
client_base_prm = BasePaymentRateManager(client_base)
client_prm = NegotiatedPaymentRateManager(client_base_prm, DummyBlobAvailabilityTracker(), generous=False)
host_base_prm = BasePaymentRateManager(host_base)
host_prm = NegotiatedPaymentRateManager(host_base_prm, DummyBlobAvailabilityTracker())
blobs_to_query = get_random_sample(blobs)
accepted = False
turns = 0
while not accepted:
rate = client_prm.get_rate_blob_data(host, blobs_to_query)
offer = Offer(rate)
accepted = host_prm.accept_rate_blob_data(client, blobs_to_query, offer)
turns += 1
self.assertGreater(MAX_NEGOTIATION_TURNS, turns)