diff --git a/lbrynet/core/BlobAvailability.py b/lbrynet/core/BlobAvailability.py index e7c8c2b44..f003b2770 100644 --- a/lbrynet/core/BlobAvailability.py +++ b/lbrynet/core/BlobAvailability.py @@ -2,7 +2,6 @@ import logging from twisted.internet import defer from twisted.internet.task import LoopingCall -from lbrynet.core.PeerFinder import DummyPeerFinder from decimal import Decimal log = logging.getLogger(__name__) @@ -48,7 +47,7 @@ class BlobAvailabilityTracker(object): def get_availability_for_blobs(self, blobs): dl = [self.get_blob_availability(blob) for blob in blobs if blob] d = defer.DeferredList(dl) - d.addCallback(lambda results: [r[1] for r in results]) + d.addCallback(lambda results: [val for success, val in results if success]) return d def _update_peers_for_blob(self, blob): @@ -62,15 +61,15 @@ class BlobAvailabilityTracker(object): d.addCallback(lambda peers: _save_peer_info(blob, peers)) return d - def _update_most_popular(self): - def _get_most_popular(): - dl = [] - for (hash, _) in self._dht_node.get_most_popular_hashes(100): - encoded = hash.encode('hex') - dl.append(self._update_peers_for_blob(encoded)) - return defer.DeferredList(dl) + def _get_most_popular(self): + dl = [] + for (hash, _) in self._dht_node.get_most_popular_hashes(100): + encoded = hash.encode('hex') + dl.append(self._update_peers_for_blob(encoded)) + return defer.DeferredList(dl) - d = _get_most_popular() + def _update_most_popular(self): + d = self._get_most_popular() d.addCallback(lambda _: self._get_mean_peers()) def _update_mine(self): @@ -88,39 +87,3 @@ class BlobAvailabilityTracker(object): num_peers = [len(self.availability[blob]) for blob in self.availability] mean = Decimal(sum(num_peers)) / Decimal(max(1, len(num_peers))) self.last_mean_availability = mean - - -class DummyBlobAvailabilityTracker(BlobAvailabilityTracker): - """ - Class to track peer counts for known blobs, and to discover new popular blobs - - Attributes: - availability (dict): dictionary of peers for known blobs - """ - - def __init__(self, blob_manager=None, peer_finder=None, dht_node=None): - self.availability = { - '91dc64cf1ff42e20d627b033ad5e4c3a4a96856ed8a6e3fb4cd5fa1cfba4bf72eefd325f579db92f45f4355550ace8e7': ['1.2.3.4'], - 'b2e48bb4c88cf46b76adf0d47a72389fae0cd1f19ed27dc509138c99509a25423a4cef788d571dca7988e1dca69e6fa0': ['1.2.3.4', '1.2.3.4'], - '6af95cd062b4a179576997ef1054c9d2120f8592eea045e9667bea411d520262cd5a47b137eabb7a7871f5f8a79c92dd': ['1.2.3.4', '1.2.3.4', '1.2.3.4'], - '6d8017aba362e5c5d0046625a039513419810a0397d728318c328a5cc5d96efb589fbca0728e54fe5adbf87e9545ee07': ['1.2.3.4', '1.2.3.4', '1.2.3.4', '1.2.3.4'], - '5a450b416275da4bdff604ee7b58eaedc7913c5005b7184fc3bc5ef0b1add00613587f54217c91097fc039ed9eace9dd': ['1.2.3.4', '1.2.3.4', '1.2.3.4', '1.2.3.4', '1.2.3.4'], - 'd7c82e6cac093b3f16107d2ae2b2c75424f1fcad2c7fbdbe66e4a13c0b6bd27b67b3a29c403b82279ab0f7c1c48d6787': ['1.2.3.4', '1.2.3.4', '1.2.3.4', '1.2.3.4', '1.2.3.4', '1.2.3.4'], - '9dbda74a472a2e5861a5d18197aeba0f5de67c67e401124c243d2f0f41edf01d7a26aeb0b5fc9bf47f6361e0f0968e2c': ['1.2.3.4', '1.2.3.4', '1.2.3.4', '1.2.3.4', '1.2.3.4', '1.2.3.4', '1.2.3.4'], - '8c70d5e2f5c3a6085006198e5192d157a125d92e7378794472007a61947992768926513fc10924785bdb1761df3c37e6': ['1.2.3.4', '1.2.3.4', '1.2.3.4', '1.2.3.4', '1.2.3.4', '1.2.3.4', '1.2.3.4', '1.2.3.4'], - 'f99d24cd50d4bfd77c2598bfbeeb8415bf0feef21200bdf0b8fbbde7751a77b7a2c68e09c25465a2f40fba8eecb0b4e0': ['1.2.3.4', '1.2.3.4', '1.2.3.4', '1.2.3.4', '1.2.3.4', '1.2.3.4', '1.2.3.4', '1.2.3.4', '1.2.3.4'], - 'c84aa1fd8f5009f7c4e71e444e40d95610abc1480834f835eefb267287aeb10025880a3ce22580db8c6d92efb5bc0c9c': ['1.2.3.4', '1.2.3.4', '1.2.3.4', '1.2.3.4', '1.2.3.4', '1.2.3.4', '1.2.3.4', '1.2.3.4', '1.2.3.4', '1.2.3.4'], - } - self.last_mean_availability = Decimal(0.0) - self._blob_manager = blob_manager - self._peer_finder = DummyPeerFinder() - self._dht_node = dht_node - self._check_popular = None - self._check_mine = None - self._get_mean_peers() - - def start(self): - pass - - def stop(self): - pass diff --git a/lbrynet/core/Offer.py b/lbrynet/core/Offer.py index 4528d0c2e..48b4b56fa 100644 --- a/lbrynet/core/Offer.py +++ b/lbrynet/core/Offer.py @@ -2,9 +2,7 @@ from decimal import Decimal class Offer(object): - """ - A rate offer to download blobs from a host - """ + """A rate offer to download blobs from a host.""" RATE_ACCEPTED = "RATE_ACCEPTED" RATE_TOO_LOW = "RATE_TOO_LOW" @@ -21,11 +19,11 @@ class Offer(object): self.unset() @property - def accepted(self): + def is_accepted(self): return self._state is Offer.RATE_ACCEPTED @property - def too_low(self): + def is_too_low(self): return self._state is Offer.RATE_TOO_LOW @property @@ -34,9 +32,9 @@ class Offer(object): @property def message(self): - if self.accepted: + if self.is_accepted: return Offer.RATE_ACCEPTED - elif self.too_low: + elif self.is_too_low: return Offer.RATE_TOO_LOW elif self.is_unset: return Offer.RATE_UNSET diff --git a/lbrynet/core/PaymentRateManager.py b/lbrynet/core/PaymentRateManager.py index a74ac0ebd..914e21947 100644 --- a/lbrynet/core/PaymentRateManager.py +++ b/lbrynet/core/PaymentRateManager.py @@ -1,9 +1,10 @@ from lbrynet.core.Strategy import get_default_strategy - +from lbrynet.conf import MIN_BLOB_DATA_PAYMENT_RATE, MIN_BLOB_INFO_PAYMENT_RATE class BasePaymentRateManager(object): - def __init__(self, rate): + def __init__(self, rate=MIN_BLOB_DATA_PAYMENT_RATE, info_rate=MIN_BLOB_INFO_PAYMENT_RATE): self.min_blob_data_payment_rate = rate + self.min_blob_info_payment_rate = info_rate class PaymentRateManager(object): @@ -41,11 +42,12 @@ class NegotiatedPaymentRateManager(object): """ self.base = base - self.min_blob_data_payment_rate = self.base.min_blob_data_payment_rate self.points_paid = 0.0 self.blob_tracker = availability_tracker self.generous = generous - self.strategy = get_default_strategy(self.blob_tracker, base_price=self.min_blob_data_payment_rate, is_generous=generous) + self.strategy = get_default_strategy(self.blob_tracker, + base_price=self.base.min_blob_data_payment_rate, + is_generous=generous) def get_rate_blob_data(self, peer, blobs): response = self.strategy.make_offer(peer, blobs) @@ -53,12 +55,12 @@ class NegotiatedPaymentRateManager(object): def accept_rate_blob_data(self, peer, blobs, offer): offer = self.strategy.respond_to_offer(offer, peer, blobs) - self.strategy.offer_accepted(peer, offer) - return offer.accepted + self.strategy.update_accepted_offers(peer, offer) + return offer.is_accepted def reply_to_offer(self, peer, blobs, offer): reply = self.strategy.respond_to_offer(offer, peer, blobs) - self.strategy.offer_accepted(peer, reply) + self.strategy.update_accepted_offers(peer, reply) return reply def get_rate_for_peer(self, peer): @@ -68,4 +70,4 @@ class NegotiatedPaymentRateManager(object): self.points_paid += amount def record_offer_reply(self, peer, offer): - self.strategy.offer_accepted(peer, offer) \ No newline at end of file + self.strategy.update_accepted_offers(peer, offer) \ No newline at end of file diff --git a/lbrynet/core/Peer.py b/lbrynet/core/Peer.py index 60f5cfdd4..3705f6e68 100644 --- a/lbrynet/core/Peer.py +++ b/lbrynet/core/Peer.py @@ -1,5 +1,6 @@ -from collections import defaultdict import datetime +from collections import defaultdict +from lbrynet.core import utils class Peer(object): @@ -12,7 +13,7 @@ class Peer(object): self.stats = defaultdict(float) # {string stat_type, float count} def is_available(self): - if self.attempt_connection_at is None or datetime.datetime.today() > self.attempt_connection_at: + if self.attempt_connection_at is None or utils.today() > self.attempt_connection_at: return True return False @@ -23,7 +24,7 @@ class Peer(object): def report_down(self): self.down_count += 1 timeout_time = datetime.timedelta(seconds=60 * self.down_count) - self.attempt_connection_at = datetime.datetime.today() + timeout_time + self.attempt_connection_at = utils.today() + timeout_time def update_score(self, score_change): self.score += score_change diff --git a/lbrynet/core/PriceModel.py b/lbrynet/core/PriceModel.py index 887440434..299d50223 100644 --- a/lbrynet/core/PriceModel.py +++ b/lbrynet/core/PriceModel.py @@ -1,9 +1,12 @@ -from lbrynet.conf import MIN_BLOB_DATA_PAYMENT_RATE +from zope.interface import implementer from decimal import Decimal +from lbrynet.interfaces import IBlobPriceModel +from lbrynet.conf import MIN_BLOB_DATA_PAYMENT_RATE -def get_default_price_model(blob_tracker, **kwargs): - return MeanAvailabilityWeightedPrice(blob_tracker, **kwargs) + +def get_default_price_model(blob_tracker, base_price, **kwargs): + return MeanAvailabilityWeightedPrice(blob_tracker, base_price, **kwargs) class MeanAvailabilityWeightedPrice(object): @@ -11,12 +14,12 @@ class MeanAvailabilityWeightedPrice(object): Calculate mean-blob-availability and stream-position weighted price for a blob Attributes: - min_price (float): minimum accepted price - base_price (float): base price to shift from - alpha (float): constant used to more highly value blobs at the beginning of a stream - alpha defaults to 1.0, which has a null effect + base_price (float): base price + alpha (float): constant, > 0.0 and <= 1.0, used to more highly value blobs at the beginning of a stream. + alpha defaults to 1.0, which has a null effect blob_tracker (BlobAvailabilityTracker): blob availability tracker """ + implementer(IBlobPriceModel) def __init__(self, tracker, base_price=MIN_BLOB_DATA_PAYMENT_RATE, alpha=1.0): self.blob_tracker = tracker @@ -32,10 +35,13 @@ class MeanAvailabilityWeightedPrice(object): def _frontload(self, index): """ - Get frontload multipler + Get front-load multiplier, used to weight prices of blobs in a stream towards the front of the stream. + + At index 0, returns 1.0 + As index increases, return value approaches 2.0 @param index: blob position in stream - @return: frontload multipler + @return: front-load multiplier """ return Decimal(2.0) - (self.alpha ** index) diff --git a/lbrynet/core/Session.py b/lbrynet/core/Session.py index 1c62276b4..1b0fb700b 100644 --- a/lbrynet/core/Session.py +++ b/lbrynet/core/Session.py @@ -27,9 +27,9 @@ class Session(object): the rate limiter, which attempts to ensure download and upload rates stay below a set maximum, and upnp, which opens holes in compatible firewalls so that remote peers can connect to this peer.""" def __init__(self, blob_data_payment_rate, db_dir=None, lbryid=None, peer_manager=None, dht_node_port=None, - known_dht_nodes=None, peer_finder=None, hash_announcer=None, - blob_dir=None, blob_manager=None, peer_port=None, use_upnp=True, - rate_limiter=None, wallet=None, dht_node_class=node.Node, blob_tracker_class=None): + known_dht_nodes=None, peer_finder=None, hash_announcer=None, blob_dir=None, blob_manager=None, + peer_port=None, use_upnp=True, rate_limiter=None, wallet=None, dht_node_class=node.Node, + blob_tracker_class=None, payment_rate_manager_class=None): """ @param blob_data_payment_rate: The default payment rate for blob data @@ -108,6 +108,7 @@ class Session(object): self.base_payment_rate_manager = BasePaymentRateManager(blob_data_payment_rate) self.payment_rate_manager = None + self.payment_rate_manager_class = payment_rate_manager_class or NegotiatedPaymentRateManager def setup(self): """Create the blob directory and database if necessary, start all desired services""" @@ -260,12 +261,17 @@ class Session(object): if self.blob_dir is None: self.blob_manager = TempBlobManager(self.hash_announcer) else: - self.blob_manager = DiskBlobManager(self.hash_announcer, self.blob_dir, self.db_dir) + self.blob_manager = DiskBlobManager(self.hash_announcer, + self.blob_dir, + self.db_dir) if self.blob_tracker is None: - self.blob_tracker = self.blob_tracker_class(self.blob_manager, self.peer_finder, self.dht_node) + self.blob_tracker = self.blob_tracker_class(self.blob_manager, + self.peer_finder, + self.dht_node) if self.payment_rate_manager is None: - self.payment_rate_manager = NegotiatedPaymentRateManager(self.base_payment_rate_manager, self.blob_tracker) + self.payment_rate_manager = self.payment_rate_manager_class(self.base_payment_rate_manager, + self.blob_tracker) self.rate_limiter.start() d1 = self.blob_manager.setup() diff --git a/lbrynet/core/Strategy.py b/lbrynet/core/Strategy.py index 128c0127c..80fff00cb 100644 --- a/lbrynet/core/Strategy.py +++ b/lbrynet/core/Strategy.py @@ -1,17 +1,20 @@ -import logging +from zope.interface import implementer from decimal import Decimal -from lbrynet.conf import MIN_BLOB_DATA_PAYMENT_RATE +from lbrynet.interfaces import INegotiationStrategy from lbrynet.core.Offer import Offer from lbrynet.core.PriceModel import MeanAvailabilityWeightedPrice -log = logging.getLogger(__name__) - def get_default_strategy(blob_tracker, **kwargs): return BasicAvailabilityWeightedStrategy(blob_tracker, **kwargs) -class BaseStrategy(object): +class Strategy(object): + """ + Base for negotiation strategies + """ + implementer(INegotiationStrategy) + def __init__(self, price_model, max_rate, min_rate, is_generous=True): self.price_model = price_model self.is_generous = is_generous @@ -21,24 +24,15 @@ class BaseStrategy(object): self.max_rate = max_rate or Decimal(self.price_model.base_price * 100) self.min_rate = Decimal(min_rate) - def add_offer_sent(self, peer): - turn = self.offers_sent.get(peer, 0) + 1 - self.offers_sent.update({peer: turn}) + def _make_rate_offer(self, rates, offer_count): + return NotImplementedError() - def add_offer_received(self, peer): - turn = self.offers_received.get(peer, 0) + 1 - self.offers_received.update({peer: turn}) - - def calculate_price_target(self, *args): - return self.price_model.calculate_price(*args) - - def bounded_price(self, price): - price_for_return = Decimal(min(self.max_rate, max(price, self.min_rate))) - return price_for_return + def _get_response_rate(self, rates, offer_count): + return NotImplementedError() def make_offer(self, peer, blobs): offer_count = self.offers_sent.get(peer, 0) - self.add_offer_sent(peer) + self._add_offer_sent(peer) if peer in self.accepted_offers: # if there was a previous accepted offer, use that offer = self.accepted_offers[peer] @@ -46,67 +40,65 @@ class BaseStrategy(object): # Try asking for it for free offer = Offer(Decimal(0.0)) else: - rates = [self.calculate_price_target(blob) for blob in blobs] - price = self._make_offer(rates, offer_count) - bounded_price = self.bounded_price(price) - offer = Offer(bounded_price) - log.debug("Offering: %s", offer.rate) + rates = [self.price_model.calculate_price(blob) for blob in blobs] + price = self._make_rate_offer(rates, offer_count) + offer = Offer(price) return offer - def offer_accepted(self, peer, offer): - if not offer.accepted and peer in self.accepted_offers: - del self.accepted_offers[peer] - log.debug("Throwing out old accepted offer") - if offer.accepted: - self.accepted_offers.update({peer: offer}) - log.debug("Updated accepted offer %f", offer.rate) - def respond_to_offer(self, offer, peer, blobs): offer_count = self.offers_received.get(peer, 0) - self.add_offer_received(peer) - rates = [self.calculate_price_target(blob) for blob in blobs] - price = self._respond_to_offer(rates, offer_count) - bounded_price = self.bounded_price(price) - log.debug("Price target: %f", price) + self._add_offer_received(peer) + rates = [self.price_model.calculate_price(blob) for blob in blobs] + price = self._get_response_rate(rates, offer_count) if peer in self.accepted_offers: offer = self.accepted_offers[peer] - log.debug("Already accepted %f", offer.rate) elif offer.rate == 0.0 and offer_count == 0 and self.is_generous: # give blobs away for free by default on the first request offer.accept() self.accepted_offers.update({peer: offer}) - elif offer.rate >= bounded_price: - log.debug("Accept: %f", offer.rate) + elif offer.rate >= price: offer.accept() self.accepted_offers.update({peer: offer}) else: - log.debug("Reject: %f", offer.rate) offer.reject() if peer in self.accepted_offers: del self.accepted_offers[peer] return offer - def _make_offer(self, rates, offer_count): - return NotImplementedError() + def update_accepted_offers(self, peer, offer): + if not offer.is_accepted and peer in self.accepted_offers: + del self.accepted_offers[peer] + if offer.is_accepted: + self.accepted_offers.update({peer: offer}) - def _respond_to_offer(self, rates, offer_count): - return NotImplementedError() + def _add_offer_sent(self, peer): + turn = self.offers_sent.get(peer, 0) + 1 + self.offers_sent.update({peer: turn}) + + def _add_offer_received(self, peer): + turn = self.offers_received.get(peer, 0) + 1 + self.offers_received.update({peer: turn}) + + def _bounded_price(self, price): + price_for_return = Decimal(min(self.max_rate, max(price, self.min_rate))) + return price_for_return -class BasicAvailabilityWeightedStrategy(BaseStrategy): +class BasicAvailabilityWeightedStrategy(Strategy): """ Basic strategy to target blob prices based on supply relative to mean supply Discount price target with each incoming request, and raise it with each outgoing from the modeled price until the rate is accepted or a threshold is reached """ + implementer(INegotiationStrategy) def __init__(self, blob_tracker, acceleration=1.25, deceleration=0.9, max_rate=None, min_rate=0.0, - is_generous=True, base_price=MIN_BLOB_DATA_PAYMENT_RATE, alpha=1.0): + is_generous=True, base_price=0.0001, alpha=1.0): price_model = MeanAvailabilityWeightedPrice(blob_tracker, base_price=base_price, alpha=alpha) - BaseStrategy.__init__(self, price_model, max_rate, min_rate, is_generous) - self._acceleration = Decimal(acceleration) # rate of how quickly to ramp offer + Strategy.__init__(self, price_model, max_rate, min_rate, is_generous) + self._acceleration = Decimal(acceleration) # rate of how quickly to ramp offer self._deceleration = Decimal(deceleration) def _get_mean_rate(self, rates): @@ -119,12 +111,14 @@ class BasicAvailabilityWeightedStrategy(BaseStrategy): def _discount(self, rate, turn): return rate * (self._deceleration ** Decimal(turn)) - def _respond_to_offer(self, rates, offer_count): + def _get_response_rate(self, rates, offer_count): rate = self._get_mean_rate(rates) discounted = self._discount(rate, offer_count) - return round(discounted, 5) + rounded_price = round(discounted, 5) + return self._bounded_price(rounded_price) - def _make_offer(self, rates, offer_count): + def _make_rate_offer(self, rates, offer_count): rate = self._get_mean_rate(rates) with_premium = self._premium(rate, offer_count) - return round(with_premium, 5) + rounded_price = round(with_premium, 5) + return self._bounded_price(rounded_price) diff --git a/lbrynet/core/client/BlobRequester.py b/lbrynet/core/client/BlobRequester.py index b24cd9ac7..8c59bc04f 100644 --- a/lbrynet/core/client/BlobRequester.py +++ b/lbrynet/core/client/BlobRequester.py @@ -29,6 +29,7 @@ def get_points(num_bytes, rate): def cache(fn): """Caches the function call for each instance""" attr = '__{}_value'.format(fn.__name__) + def helper(self): if not hasattr(self, attr): value = fn(self) @@ -345,9 +346,9 @@ class PriceRequest(RequestHelper): def _get_price_request(self): rate = self.get_and_save_rate() if rate is None: - log.debug("No blobs to request from %s", str(self.peer)) + log.debug("No blobs to request from %s", self.peer) raise Exception('Cannot make a price request without a payment rate') - log.debug("Offer rate %s to %s for %i blobs", str(rate), str(self.peer), len(self.available_blobs)) + log.debug("Offer rate %s to %s for %i blobs", rate, self.peer, len(self.available_blobs)) request_dict = {'blob_data_payment_rate': rate} return ClientRequest(request_dict, 'blob_data_payment_rate') @@ -367,11 +368,11 @@ class PriceRequest(RequestHelper): offer.handle(response_dict['blob_data_payment_rate']) self.payment_rate_manager.record_offer_reply(self.peer.host, offer) - if offer.accepted: - log.info("Offered rate %f/mb accepted by %s", rate, str(self.peer.host)) + if offer.is_accepted: + log.debug("Offered rate %f/mb accepted by %s", rate, str(self.peer.host)) return True - elif offer.too_low: - log.info("Offered rate %f/mb rejected by %s", rate, str(self.peer.host)) + elif offer.is_too_low: + log.debug("Offered rate %f/mb rejected by %s", rate, str(self.peer.host)) del self.protocol_prices[self.protocol] return True else: diff --git a/lbrynet/core/server/BlobRequestHandler.py b/lbrynet/core/server/BlobRequestHandler.py index 53203cad9..94e329da4 100644 --- a/lbrynet/core/server/BlobRequestHandler.py +++ b/lbrynet/core/server/BlobRequestHandler.py @@ -104,15 +104,15 @@ class BlobRequestHandler(object): def _handle_payment_rate_query(self, offer, request): blobs = self._blobs_requested - log.info("Offered rate %f LBC/mb for %i blobs", offer.rate, len(blobs)) + log.debug("Offered rate %f LBC/mb for %i blobs", offer.rate, len(blobs)) reply = self.payment_rate_manager.reply_to_offer(self.peer, blobs, offer) - if reply.accepted: + if reply.is_accepted: self.blob_data_payment_rate = offer.rate request[self.PAYMENT_RATE_QUERY] = "RATE_ACCEPTED" - log.info("Accepted rate: %f", offer.rate) - elif reply.too_low: + log.debug("Accepted rate: %f", offer.rate) + elif reply.is_too_low: request[self.PAYMENT_RATE_QUERY] = "RATE_TOO_LOW" - log.info("Reject rate: %f", offer.rate) + log.debug("Reject rate: %f", offer.rate) elif reply.is_unset: log.warning("Rate unset") request['incoming_blob'] = {'error': 'RATE_UNSET'} diff --git a/lbrynet/core/utils.py b/lbrynet/core/utils.py index d71e01e3a..f9f114233 100644 --- a/lbrynet/core/utils.py +++ b/lbrynet/core/utils.py @@ -4,6 +4,7 @@ import random import os import json import yaml +import datetime from lbrynet.core.cryptoutils import get_lbry_hash_obj @@ -76,4 +77,8 @@ def save_settings(path, settings): assert encoder is not False, "Unknown settings format .%s" % ext f = open(path, 'w') f.write(encoder(settings)) - f.close() \ No newline at end of file + f.close() + + +def today(): + return datetime.datetime.today() \ No newline at end of file diff --git a/lbrynet/interfaces.py b/lbrynet/interfaces.py index 5597ae8a5..10fbc8c66 100644 --- a/lbrynet/interfaces.py +++ b/lbrynet/interfaces.py @@ -647,4 +647,60 @@ class IWallet(Interface): @type amount: float @return: None + """ + + +class IBlobPriceModel(Interface): + """ + A blob price model + + Used by INegotiationStrategy classes + """ + + def calculate_price(self, blob): + """ + Calculate the price for a blob + + @param blob: a blob hash + @type blob: str + + @return: blob price target + @type: Decimal + """ + + +class INegotiationStrategy(Interface): + """ + Strategy to negotiate download payment rates + """ + + def make_offer(self, peer, blobs): + """ + Make a rate offer for the given peer and blobs + + @param peer: peer to make an offer to + @type: str + + @param blobs: blob hashes to make an offer for + @type: list + + @return: rate offer + @rtype: Offer + """ + + def respond_to_offer(self, offer, peer, blobs): + """ + Respond to a rate offer given by a peer + + @param offer: offer to reply to + @type: Offer + + @param peer: peer to make an offer to + @type: str + + @param blobs: blob hashes to make an offer for + @type: list + + @return: accepted, rejected, or unset offer + @rtype: Offer """ \ No newline at end of file diff --git a/lbrynet/lbryfile/client/EncryptedFileOptions.py b/lbrynet/lbryfile/client/EncryptedFileOptions.py index fc3423708..067e3437d 100644 --- a/lbrynet/lbryfile/client/EncryptedFileOptions.py +++ b/lbrynet/lbryfile/client/EncryptedFileOptions.py @@ -14,16 +14,16 @@ class EncryptedFileOptions(object): prm = payment_rate_manager def get_default_data_rate_description(): - if prm.min_blob_data_payment_rate is None: + if prm.base.min_blob_data_payment_rate is None: return "Application default (%s LBC/MB)" % str(prm.base.min_blob_data_payment_rate) else: - return "%f LBC/MB" % prm.min_blob_data_payment_rate + return "%f LBC/MB" % prm.base.min_blob_data_payment_rate rate_choices = [] - rate_choices.append(DownloadOptionChoice(prm.min_blob_data_payment_rate, + rate_choices.append(DownloadOptionChoice(prm.base.min_blob_data_payment_rate, "No change - %s" % get_default_data_rate_description(), "No change - %s" % get_default_data_rate_description())) - if prm.min_blob_data_payment_rate is not None: + if prm.base.min_blob_data_payment_rate is not None: rate_choices.append(DownloadOptionChoice(None, "Application default (%s LBC/MB)" % str(prm.base.min_blob_data_payment_rate), "Application default (%s LBC/MB)" % str(prm.base.min_blob_data_payment_rate))) @@ -36,7 +36,7 @@ class EncryptedFileOptions(object): rate_choices, "Rate which will be paid for data", "data payment rate", - prm.min_blob_data_payment_rate, + prm.base.min_blob_data_payment_rate, get_default_data_rate_description() ), DownloadOption( diff --git a/lbrynet/lbrynet_console/Console.py b/lbrynet/lbrynet_console/Console.py index cada41cb7..3f700222e 100644 --- a/lbrynet/lbrynet_console/Console.py +++ b/lbrynet/lbrynet_console/Console.py @@ -20,7 +20,6 @@ from lbrynet.conf import MIN_BLOB_DATA_PAYMENT_RATE, API_CONNECTION_STRING # , from lbrynet.core.utils import generate_id from lbrynet.core.StreamDescriptor import StreamDescriptorIdentifier from lbrynet.core.PaymentRateManager import PaymentRateManager -from lbrynet.core.server.BlobAvailabilityHandler import BlobAvailabilityHandlerFactory from lbrynet.core.server.BlobRequestHandler import BlobRequestHandlerFactory from lbrynet.core.server.ServerProtocol import ServerProtocolFactory from lbrynet.core.PTCWallet import PTCWallet @@ -363,19 +362,14 @@ class Console(): def _setup_query_handlers(self): handlers = [ - #CryptBlobInfoQueryHandlerFactory(self.lbry_file_metadata_manager, self.session.wallet, - # self._server_payment_rate_manager), - BlobAvailabilityHandlerFactory(self.session.blob_manager), - #BlobRequestHandlerFactory(self.session.blob_manager, self.session.wallet, - # self._server_payment_rate_manager), self.session.wallet.get_wallet_info_query_handler_factory(), ] def get_blob_request_handler_factory(rate): - self.blob_request_payment_rate_manager = PaymentRateManager( - self.session.base_payment_rate_manager, rate - ) - handlers.append(BlobRequestHandlerFactory(self.session.blob_manager, self.session.wallet, + self.blob_request_payment_rate_manager = PaymentRateManager(self.session.base_payment_rate_manager, + rate) + handlers.append(BlobRequestHandlerFactory(self.session.blob_manager, + self.session.wallet, self.blob_request_payment_rate_manager)) d1 = self.settings.get_server_data_payment_rate() diff --git a/lbrynet/lbrynet_daemon/Daemon.py b/lbrynet/lbrynet_daemon/Daemon.py index faed462d8..47e165eec 100644 --- a/lbrynet/lbrynet_daemon/Daemon.py +++ b/lbrynet/lbrynet_daemon/Daemon.py @@ -773,9 +773,6 @@ class Daemon(jsonrpc.JSONRPC): def _setup_query_handlers(self): handlers = [ - # CryptBlobInfoQueryHandlerFactory(self.lbry_file_metadata_manager, self.session.wallet, - # self._server_payment_rate_manager), - # BlobAvailabilityHandlerFactory(self.session.blob_manager), BlobRequestHandlerFactory(self.session.blob_manager, self.session.wallet, self.session.payment_rate_manager), self.session.wallet.get_wallet_info_query_handler_factory(), diff --git a/tests/functional/test_misc.py b/tests/functional/test_misc.py index 5fb9f65de..26f7c9deb 100644 --- a/tests/functional/test_misc.py +++ b/tests/functional/test_misc.py @@ -33,10 +33,9 @@ from twisted.trial.unittest import TestCase from twisted.python.failure import Failure import os from lbrynet.dht.node import Node -from lbrynet.core.BlobAvailability import DummyBlobAvailabilityTracker +from tests.mocks import DummyBlobAvailabilityTracker from lbrynet.core.PeerManager import PeerManager from lbrynet.core.RateLimiter import DummyRateLimiter, RateLimiter -from lbrynet.core.server.BlobAvailabilityHandler import BlobAvailabilityHandlerFactory from lbrynet.core.server.BlobRequestHandler import BlobRequestHandlerFactory from lbrynet.core.server.ServerProtocol import ServerProtocolFactory from lbrynet.lbrylive.server.LiveBlobInfoQueryHandler import CryptBlobInfoQueryHandlerFactory @@ -272,7 +271,6 @@ def start_lbry_uploader(sd_hash_queue, kill_event, dead_event, file_size, ul_rat server_port = None query_handler_factories = { - BlobAvailabilityHandlerFactory(session.blob_manager): True, BlobRequestHandlerFactory(session.blob_manager, session.wallet, session.payment_rate_manager): True, session.wallet.get_wallet_info_query_handler_factory(): True, @@ -398,7 +396,6 @@ def start_lbry_reuploader(sd_hash, kill_event, dead_event, ready_event, n, ul_ra server_port = None query_handler_factories = { - BlobAvailabilityHandlerFactory(session.blob_manager): True, BlobRequestHandlerFactory(session.blob_manager, session.wallet, session.payment_rate_manager): True, session.wallet.get_wallet_info_query_handler_factory(): True, diff --git a/tests/functional/test_reflector.py b/tests/functional/test_reflector.py index eaaf150fd..7dd0ad6e7 100644 --- a/tests/functional/test_reflector.py +++ b/tests/functional/test_reflector.py @@ -12,7 +12,6 @@ from lbrynet.core import PeerManager from lbrynet.core import RateLimiter from lbrynet.core import Session from lbrynet.core import StreamDescriptor -from lbrynet.core import BlobAvailability from lbrynet.dht.node import Node from lbrynet.lbryfile import EncryptedFileMetadataManager from lbrynet.lbryfile.client import EncryptedFileOptions @@ -94,7 +93,7 @@ class TestReflector(unittest.TestCase): use_upnp=False, rate_limiter=rate_limiter, wallet=wallet, - blob_tracker_class=BlobAvailability.DummyBlobAvailabilityTracker, + blob_tracker_class=mocks.DummyBlobAvailabilityTracker, dht_node_class=Node ) diff --git a/tests/mocks.py b/tests/mocks.py index 6ea183195..40e37dcca 100644 --- a/tests/mocks.py +++ b/tests/mocks.py @@ -1,9 +1,11 @@ import io from Crypto.PublicKey import RSA +from decimal import Decimal from twisted.internet import defer, threads, task, error from lbrynet.core import PTCWallet +from lbrynet.core.BlobAvailability import BlobAvailabilityTracker class Node(object): @@ -134,6 +136,43 @@ class GenFile(io.RawIOBase): return output +class DummyBlobAvailabilityTracker(BlobAvailabilityTracker): + """ + Class to track peer counts for known blobs, and to discover new popular blobs + + Attributes: + availability (dict): dictionary of peers for known blobs + """ + + def __init__(self, blob_manager=None, peer_finder=None, dht_node=None): + self.availability = { + '91dc64cf1ff42e20d627b033ad5e4c3a4a96856ed8a6e3fb4cd5fa1cfba4bf72eefd325f579db92f45f4355550ace8e7': ['1.2.3.4'], + 'b2e48bb4c88cf46b76adf0d47a72389fae0cd1f19ed27dc509138c99509a25423a4cef788d571dca7988e1dca69e6fa0': ['1.2.3.4', '1.2.3.4'], + '6af95cd062b4a179576997ef1054c9d2120f8592eea045e9667bea411d520262cd5a47b137eabb7a7871f5f8a79c92dd': ['1.2.3.4', '1.2.3.4', '1.2.3.4'], + '6d8017aba362e5c5d0046625a039513419810a0397d728318c328a5cc5d96efb589fbca0728e54fe5adbf87e9545ee07': ['1.2.3.4', '1.2.3.4', '1.2.3.4', '1.2.3.4'], + '5a450b416275da4bdff604ee7b58eaedc7913c5005b7184fc3bc5ef0b1add00613587f54217c91097fc039ed9eace9dd': ['1.2.3.4', '1.2.3.4', '1.2.3.4', '1.2.3.4', '1.2.3.4'], + 'd7c82e6cac093b3f16107d2ae2b2c75424f1fcad2c7fbdbe66e4a13c0b6bd27b67b3a29c403b82279ab0f7c1c48d6787': ['1.2.3.4', '1.2.3.4', '1.2.3.4', '1.2.3.4', '1.2.3.4', '1.2.3.4'], + '9dbda74a472a2e5861a5d18197aeba0f5de67c67e401124c243d2f0f41edf01d7a26aeb0b5fc9bf47f6361e0f0968e2c': ['1.2.3.4', '1.2.3.4', '1.2.3.4', '1.2.3.4', '1.2.3.4', '1.2.3.4', '1.2.3.4'], + '8c70d5e2f5c3a6085006198e5192d157a125d92e7378794472007a61947992768926513fc10924785bdb1761df3c37e6': ['1.2.3.4', '1.2.3.4', '1.2.3.4', '1.2.3.4', '1.2.3.4', '1.2.3.4', '1.2.3.4', '1.2.3.4'], + 'f99d24cd50d4bfd77c2598bfbeeb8415bf0feef21200bdf0b8fbbde7751a77b7a2c68e09c25465a2f40fba8eecb0b4e0': ['1.2.3.4', '1.2.3.4', '1.2.3.4', '1.2.3.4', '1.2.3.4', '1.2.3.4', '1.2.3.4', '1.2.3.4', '1.2.3.4'], + 'c84aa1fd8f5009f7c4e71e444e40d95610abc1480834f835eefb267287aeb10025880a3ce22580db8c6d92efb5bc0c9c': ['1.2.3.4', '1.2.3.4', '1.2.3.4', '1.2.3.4', '1.2.3.4', '1.2.3.4', '1.2.3.4', '1.2.3.4', '1.2.3.4', '1.2.3.4'], + } + self.last_mean_availability = Decimal(0.0) + self._blob_manager = None + self._peer_finder = PeerFinder(11223, 11224, 2) + self._dht_node = None + self._check_popular = None + self._check_mine = None + self._get_mean_peers() + + def start(self): + pass + + def stop(self): + pass + + + create_stream_sd_file = { 'stream_name': '746573745f66696c65', 'blobs': [ diff --git a/tests/unit/core/server/test_BlobRequestHandler.py b/tests/unit/core/server/test_BlobRequestHandler.py index a54ca3303..31d7e48ee 100644 --- a/tests/unit/core/server/test_BlobRequestHandler.py +++ b/tests/unit/core/server/test_BlobRequestHandler.py @@ -1,14 +1,14 @@ import StringIO import mock -from twisted.internet import defer, protocol +from twisted.internet import defer from twisted.test import proto_helpers from twisted.trial import unittest from lbrynet.core import Peer from lbrynet.core.server import BlobRequestHandler from lbrynet.core.PaymentRateManager import NegotiatedPaymentRateManager, BasePaymentRateManager -from lbrynet.core.BlobAvailability import DummyBlobAvailabilityTracker +from tests.mocks import DummyBlobAvailabilityTracker class TestBlobRequestHandlerQueries(unittest.TestCase): diff --git a/tests/unit/core/test_Strategy.py b/tests/unit/core/test_Strategy.py index ac98199c5..6d84a0f88 100644 --- a/tests/unit/core/test_Strategy.py +++ b/tests/unit/core/test_Strategy.py @@ -4,8 +4,7 @@ import mock from lbrynet.core.PaymentRateManager import NegotiatedPaymentRateManager, BasePaymentRateManager from lbrynet.core.Strategy import BasicAvailabilityWeightedStrategy from lbrynet.core.Offer import Offer -from lbrynet.core.BlobAvailability import DummyBlobAvailabilityTracker - +from tests.mocks import DummyBlobAvailabilityTracker MAX_NEGOTIATION_TURNS = 10 @@ -40,18 +39,18 @@ class AvailabilityWeightedStrategyTests(unittest.TestCase): offer = client_strategy.make_offer(host, blobs) response1 = host_strategy.respond_to_offer(offer, client, blobs) - client_strategy.offer_accepted(host, response1) + client_strategy.update_accepted_offers(host, response1) offer = client_strategy.make_offer(host, blobs) response2 = host_strategy.respond_to_offer(offer, client, blobs) - client_strategy.offer_accepted(host, response2) + client_strategy.update_accepted_offers(host, response2) - self.assertEquals(response1.too_low, False) - self.assertEquals(response1.accepted, True) + self.assertEquals(response1.is_too_low, False) + self.assertEquals(response1.is_accepted, True) self.assertEquals(response1.rate, 0.0) - self.assertEquals(response2.too_low, False) - self.assertEquals(response2.accepted, True) + self.assertEquals(response2.is_too_low, False) + self.assertEquals(response2.is_accepted, True) self.assertEquals(response2.rate, 0.0) def test_how_many_turns_before_accept_with_similar_rate_settings(self): @@ -169,7 +168,6 @@ class AvailabilityWeightedStrategyTests(unittest.TestCase): turns += 1 self.assertGreater(MAX_NEGOTIATION_TURNS, turns) - def test_how_many_turns_with_generous_host(self): blobs = [ 'b2e48bb4c88cf46b76adf0d47a72389fae0cd1f19ed27dc509138c99509a25423a4cef788d571dca7988e1dca69e6fa0',