This commit is contained in:
Jack 2016-10-13 13:35:55 -04:00
parent 782e197e66
commit 19c2264429
19 changed files with 235 additions and 179 deletions

View file

@ -2,7 +2,6 @@ import logging
from twisted.internet import defer from twisted.internet import defer
from twisted.internet.task import LoopingCall from twisted.internet.task import LoopingCall
from lbrynet.core.PeerFinder import DummyPeerFinder
from decimal import Decimal from decimal import Decimal
log = logging.getLogger(__name__) log = logging.getLogger(__name__)
@ -48,7 +47,7 @@ class BlobAvailabilityTracker(object):
def get_availability_for_blobs(self, blobs): def get_availability_for_blobs(self, blobs):
dl = [self.get_blob_availability(blob) for blob in blobs if blob] dl = [self.get_blob_availability(blob) for blob in blobs if blob]
d = defer.DeferredList(dl) d = defer.DeferredList(dl)
d.addCallback(lambda results: [r[1] for r in results]) d.addCallback(lambda results: [val for success, val in results if success])
return d return d
def _update_peers_for_blob(self, blob): def _update_peers_for_blob(self, blob):
@ -62,15 +61,15 @@ class BlobAvailabilityTracker(object):
d.addCallback(lambda peers: _save_peer_info(blob, peers)) d.addCallback(lambda peers: _save_peer_info(blob, peers))
return d return d
def _update_most_popular(self): def _get_most_popular(self):
def _get_most_popular(): dl = []
dl = [] for (hash, _) in self._dht_node.get_most_popular_hashes(100):
for (hash, _) in self._dht_node.get_most_popular_hashes(100): encoded = hash.encode('hex')
encoded = hash.encode('hex') dl.append(self._update_peers_for_blob(encoded))
dl.append(self._update_peers_for_blob(encoded)) return defer.DeferredList(dl)
return defer.DeferredList(dl)
d = _get_most_popular() def _update_most_popular(self):
d = self._get_most_popular()
d.addCallback(lambda _: self._get_mean_peers()) d.addCallback(lambda _: self._get_mean_peers())
def _update_mine(self): def _update_mine(self):
@ -88,39 +87,3 @@ class BlobAvailabilityTracker(object):
num_peers = [len(self.availability[blob]) for blob in self.availability] num_peers = [len(self.availability[blob]) for blob in self.availability]
mean = Decimal(sum(num_peers)) / Decimal(max(1, len(num_peers))) mean = Decimal(sum(num_peers)) / Decimal(max(1, len(num_peers)))
self.last_mean_availability = mean self.last_mean_availability = mean
class DummyBlobAvailabilityTracker(BlobAvailabilityTracker):
"""
Class to track peer counts for known blobs, and to discover new popular blobs
Attributes:
availability (dict): dictionary of peers for known blobs
"""
def __init__(self, blob_manager=None, peer_finder=None, dht_node=None):
self.availability = {
'91dc64cf1ff42e20d627b033ad5e4c3a4a96856ed8a6e3fb4cd5fa1cfba4bf72eefd325f579db92f45f4355550ace8e7': ['1.2.3.4'],
'b2e48bb4c88cf46b76adf0d47a72389fae0cd1f19ed27dc509138c99509a25423a4cef788d571dca7988e1dca69e6fa0': ['1.2.3.4', '1.2.3.4'],
'6af95cd062b4a179576997ef1054c9d2120f8592eea045e9667bea411d520262cd5a47b137eabb7a7871f5f8a79c92dd': ['1.2.3.4', '1.2.3.4', '1.2.3.4'],
'6d8017aba362e5c5d0046625a039513419810a0397d728318c328a5cc5d96efb589fbca0728e54fe5adbf87e9545ee07': ['1.2.3.4', '1.2.3.4', '1.2.3.4', '1.2.3.4'],
'5a450b416275da4bdff604ee7b58eaedc7913c5005b7184fc3bc5ef0b1add00613587f54217c91097fc039ed9eace9dd': ['1.2.3.4', '1.2.3.4', '1.2.3.4', '1.2.3.4', '1.2.3.4'],
'd7c82e6cac093b3f16107d2ae2b2c75424f1fcad2c7fbdbe66e4a13c0b6bd27b67b3a29c403b82279ab0f7c1c48d6787': ['1.2.3.4', '1.2.3.4', '1.2.3.4', '1.2.3.4', '1.2.3.4', '1.2.3.4'],
'9dbda74a472a2e5861a5d18197aeba0f5de67c67e401124c243d2f0f41edf01d7a26aeb0b5fc9bf47f6361e0f0968e2c': ['1.2.3.4', '1.2.3.4', '1.2.3.4', '1.2.3.4', '1.2.3.4', '1.2.3.4', '1.2.3.4'],
'8c70d5e2f5c3a6085006198e5192d157a125d92e7378794472007a61947992768926513fc10924785bdb1761df3c37e6': ['1.2.3.4', '1.2.3.4', '1.2.3.4', '1.2.3.4', '1.2.3.4', '1.2.3.4', '1.2.3.4', '1.2.3.4'],
'f99d24cd50d4bfd77c2598bfbeeb8415bf0feef21200bdf0b8fbbde7751a77b7a2c68e09c25465a2f40fba8eecb0b4e0': ['1.2.3.4', '1.2.3.4', '1.2.3.4', '1.2.3.4', '1.2.3.4', '1.2.3.4', '1.2.3.4', '1.2.3.4', '1.2.3.4'],
'c84aa1fd8f5009f7c4e71e444e40d95610abc1480834f835eefb267287aeb10025880a3ce22580db8c6d92efb5bc0c9c': ['1.2.3.4', '1.2.3.4', '1.2.3.4', '1.2.3.4', '1.2.3.4', '1.2.3.4', '1.2.3.4', '1.2.3.4', '1.2.3.4', '1.2.3.4'],
}
self.last_mean_availability = Decimal(0.0)
self._blob_manager = blob_manager
self._peer_finder = DummyPeerFinder()
self._dht_node = dht_node
self._check_popular = None
self._check_mine = None
self._get_mean_peers()
def start(self):
pass
def stop(self):
pass

View file

@ -2,9 +2,7 @@ from decimal import Decimal
class Offer(object): class Offer(object):
""" """A rate offer to download blobs from a host."""
A rate offer to download blobs from a host
"""
RATE_ACCEPTED = "RATE_ACCEPTED" RATE_ACCEPTED = "RATE_ACCEPTED"
RATE_TOO_LOW = "RATE_TOO_LOW" RATE_TOO_LOW = "RATE_TOO_LOW"
@ -21,11 +19,11 @@ class Offer(object):
self.unset() self.unset()
@property @property
def accepted(self): def is_accepted(self):
return self._state is Offer.RATE_ACCEPTED return self._state is Offer.RATE_ACCEPTED
@property @property
def too_low(self): def is_too_low(self):
return self._state is Offer.RATE_TOO_LOW return self._state is Offer.RATE_TOO_LOW
@property @property
@ -34,9 +32,9 @@ class Offer(object):
@property @property
def message(self): def message(self):
if self.accepted: if self.is_accepted:
return Offer.RATE_ACCEPTED return Offer.RATE_ACCEPTED
elif self.too_low: elif self.is_too_low:
return Offer.RATE_TOO_LOW return Offer.RATE_TOO_LOW
elif self.is_unset: elif self.is_unset:
return Offer.RATE_UNSET return Offer.RATE_UNSET

View file

@ -1,9 +1,10 @@
from lbrynet.core.Strategy import get_default_strategy from lbrynet.core.Strategy import get_default_strategy
from lbrynet.conf import MIN_BLOB_DATA_PAYMENT_RATE, MIN_BLOB_INFO_PAYMENT_RATE
class BasePaymentRateManager(object): class BasePaymentRateManager(object):
def __init__(self, rate): def __init__(self, rate=MIN_BLOB_DATA_PAYMENT_RATE, info_rate=MIN_BLOB_INFO_PAYMENT_RATE):
self.min_blob_data_payment_rate = rate self.min_blob_data_payment_rate = rate
self.min_blob_info_payment_rate = info_rate
class PaymentRateManager(object): class PaymentRateManager(object):
@ -41,11 +42,12 @@ class NegotiatedPaymentRateManager(object):
""" """
self.base = base self.base = base
self.min_blob_data_payment_rate = self.base.min_blob_data_payment_rate
self.points_paid = 0.0 self.points_paid = 0.0
self.blob_tracker = availability_tracker self.blob_tracker = availability_tracker
self.generous = generous self.generous = generous
self.strategy = get_default_strategy(self.blob_tracker, base_price=self.min_blob_data_payment_rate, is_generous=generous) self.strategy = get_default_strategy(self.blob_tracker,
base_price=self.base.min_blob_data_payment_rate,
is_generous=generous)
def get_rate_blob_data(self, peer, blobs): def get_rate_blob_data(self, peer, blobs):
response = self.strategy.make_offer(peer, blobs) response = self.strategy.make_offer(peer, blobs)
@ -53,12 +55,12 @@ class NegotiatedPaymentRateManager(object):
def accept_rate_blob_data(self, peer, blobs, offer): def accept_rate_blob_data(self, peer, blobs, offer):
offer = self.strategy.respond_to_offer(offer, peer, blobs) offer = self.strategy.respond_to_offer(offer, peer, blobs)
self.strategy.offer_accepted(peer, offer) self.strategy.update_accepted_offers(peer, offer)
return offer.accepted return offer.is_accepted
def reply_to_offer(self, peer, blobs, offer): def reply_to_offer(self, peer, blobs, offer):
reply = self.strategy.respond_to_offer(offer, peer, blobs) reply = self.strategy.respond_to_offer(offer, peer, blobs)
self.strategy.offer_accepted(peer, reply) self.strategy.update_accepted_offers(peer, reply)
return reply return reply
def get_rate_for_peer(self, peer): def get_rate_for_peer(self, peer):
@ -68,4 +70,4 @@ class NegotiatedPaymentRateManager(object):
self.points_paid += amount self.points_paid += amount
def record_offer_reply(self, peer, offer): def record_offer_reply(self, peer, offer):
self.strategy.offer_accepted(peer, offer) self.strategy.update_accepted_offers(peer, offer)

View file

@ -1,5 +1,6 @@
from collections import defaultdict
import datetime import datetime
from collections import defaultdict
from lbrynet.core import utils
class Peer(object): class Peer(object):
@ -12,7 +13,7 @@ class Peer(object):
self.stats = defaultdict(float) # {string stat_type, float count} self.stats = defaultdict(float) # {string stat_type, float count}
def is_available(self): def is_available(self):
if self.attempt_connection_at is None or datetime.datetime.today() > self.attempt_connection_at: if self.attempt_connection_at is None or utils.today() > self.attempt_connection_at:
return True return True
return False return False
@ -23,7 +24,7 @@ class Peer(object):
def report_down(self): def report_down(self):
self.down_count += 1 self.down_count += 1
timeout_time = datetime.timedelta(seconds=60 * self.down_count) timeout_time = datetime.timedelta(seconds=60 * self.down_count)
self.attempt_connection_at = datetime.datetime.today() + timeout_time self.attempt_connection_at = utils.today() + timeout_time
def update_score(self, score_change): def update_score(self, score_change):
self.score += score_change self.score += score_change

View file

@ -1,9 +1,12 @@
from lbrynet.conf import MIN_BLOB_DATA_PAYMENT_RATE from zope.interface import implementer
from decimal import Decimal from decimal import Decimal
from lbrynet.interfaces import IBlobPriceModel
from lbrynet.conf import MIN_BLOB_DATA_PAYMENT_RATE
def get_default_price_model(blob_tracker, **kwargs):
return MeanAvailabilityWeightedPrice(blob_tracker, **kwargs) def get_default_price_model(blob_tracker, base_price, **kwargs):
return MeanAvailabilityWeightedPrice(blob_tracker, base_price, **kwargs)
class MeanAvailabilityWeightedPrice(object): class MeanAvailabilityWeightedPrice(object):
@ -11,12 +14,12 @@ class MeanAvailabilityWeightedPrice(object):
Calculate mean-blob-availability and stream-position weighted price for a blob Calculate mean-blob-availability and stream-position weighted price for a blob
Attributes: Attributes:
min_price (float): minimum accepted price base_price (float): base price
base_price (float): base price to shift from alpha (float): constant, > 0.0 and <= 1.0, used to more highly value blobs at the beginning of a stream.
alpha (float): constant used to more highly value blobs at the beginning of a stream alpha defaults to 1.0, which has a null effect
alpha defaults to 1.0, which has a null effect
blob_tracker (BlobAvailabilityTracker): blob availability tracker blob_tracker (BlobAvailabilityTracker): blob availability tracker
""" """
implementer(IBlobPriceModel)
def __init__(self, tracker, base_price=MIN_BLOB_DATA_PAYMENT_RATE, alpha=1.0): def __init__(self, tracker, base_price=MIN_BLOB_DATA_PAYMENT_RATE, alpha=1.0):
self.blob_tracker = tracker self.blob_tracker = tracker
@ -32,10 +35,13 @@ class MeanAvailabilityWeightedPrice(object):
def _frontload(self, index): def _frontload(self, index):
""" """
Get frontload multipler Get front-load multiplier, used to weight prices of blobs in a stream towards the front of the stream.
At index 0, returns 1.0
As index increases, return value approaches 2.0
@param index: blob position in stream @param index: blob position in stream
@return: frontload multipler @return: front-load multiplier
""" """
return Decimal(2.0) - (self.alpha ** index) return Decimal(2.0) - (self.alpha ** index)

View file

@ -27,9 +27,9 @@ class Session(object):
the rate limiter, which attempts to ensure download and upload rates stay below a set maximum, the rate limiter, which attempts to ensure download and upload rates stay below a set maximum,
and upnp, which opens holes in compatible firewalls so that remote peers can connect to this peer.""" and upnp, which opens holes in compatible firewalls so that remote peers can connect to this peer."""
def __init__(self, blob_data_payment_rate, db_dir=None, lbryid=None, peer_manager=None, dht_node_port=None, def __init__(self, blob_data_payment_rate, db_dir=None, lbryid=None, peer_manager=None, dht_node_port=None,
known_dht_nodes=None, peer_finder=None, hash_announcer=None, known_dht_nodes=None, peer_finder=None, hash_announcer=None, blob_dir=None, blob_manager=None,
blob_dir=None, blob_manager=None, peer_port=None, use_upnp=True, peer_port=None, use_upnp=True, rate_limiter=None, wallet=None, dht_node_class=node.Node,
rate_limiter=None, wallet=None, dht_node_class=node.Node, blob_tracker_class=None): blob_tracker_class=None, payment_rate_manager_class=None):
""" """
@param blob_data_payment_rate: The default payment rate for blob data @param blob_data_payment_rate: The default payment rate for blob data
@ -108,6 +108,7 @@ class Session(object):
self.base_payment_rate_manager = BasePaymentRateManager(blob_data_payment_rate) self.base_payment_rate_manager = BasePaymentRateManager(blob_data_payment_rate)
self.payment_rate_manager = None self.payment_rate_manager = None
self.payment_rate_manager_class = payment_rate_manager_class or NegotiatedPaymentRateManager
def setup(self): def setup(self):
"""Create the blob directory and database if necessary, start all desired services""" """Create the blob directory and database if necessary, start all desired services"""
@ -260,12 +261,17 @@ class Session(object):
if self.blob_dir is None: if self.blob_dir is None:
self.blob_manager = TempBlobManager(self.hash_announcer) self.blob_manager = TempBlobManager(self.hash_announcer)
else: else:
self.blob_manager = DiskBlobManager(self.hash_announcer, self.blob_dir, self.db_dir) self.blob_manager = DiskBlobManager(self.hash_announcer,
self.blob_dir,
self.db_dir)
if self.blob_tracker is None: if self.blob_tracker is None:
self.blob_tracker = self.blob_tracker_class(self.blob_manager, self.peer_finder, self.dht_node) self.blob_tracker = self.blob_tracker_class(self.blob_manager,
self.peer_finder,
self.dht_node)
if self.payment_rate_manager is None: if self.payment_rate_manager is None:
self.payment_rate_manager = NegotiatedPaymentRateManager(self.base_payment_rate_manager, self.blob_tracker) self.payment_rate_manager = self.payment_rate_manager_class(self.base_payment_rate_manager,
self.blob_tracker)
self.rate_limiter.start() self.rate_limiter.start()
d1 = self.blob_manager.setup() d1 = self.blob_manager.setup()

View file

@ -1,17 +1,20 @@
import logging from zope.interface import implementer
from decimal import Decimal from decimal import Decimal
from lbrynet.conf import MIN_BLOB_DATA_PAYMENT_RATE from lbrynet.interfaces import INegotiationStrategy
from lbrynet.core.Offer import Offer from lbrynet.core.Offer import Offer
from lbrynet.core.PriceModel import MeanAvailabilityWeightedPrice from lbrynet.core.PriceModel import MeanAvailabilityWeightedPrice
log = logging.getLogger(__name__)
def get_default_strategy(blob_tracker, **kwargs): def get_default_strategy(blob_tracker, **kwargs):
return BasicAvailabilityWeightedStrategy(blob_tracker, **kwargs) return BasicAvailabilityWeightedStrategy(blob_tracker, **kwargs)
class BaseStrategy(object): class Strategy(object):
"""
Base for negotiation strategies
"""
implementer(INegotiationStrategy)
def __init__(self, price_model, max_rate, min_rate, is_generous=True): def __init__(self, price_model, max_rate, min_rate, is_generous=True):
self.price_model = price_model self.price_model = price_model
self.is_generous = is_generous self.is_generous = is_generous
@ -21,24 +24,15 @@ class BaseStrategy(object):
self.max_rate = max_rate or Decimal(self.price_model.base_price * 100) self.max_rate = max_rate or Decimal(self.price_model.base_price * 100)
self.min_rate = Decimal(min_rate) self.min_rate = Decimal(min_rate)
def add_offer_sent(self, peer): def _make_rate_offer(self, rates, offer_count):
turn = self.offers_sent.get(peer, 0) + 1 return NotImplementedError()
self.offers_sent.update({peer: turn})
def add_offer_received(self, peer): def _get_response_rate(self, rates, offer_count):
turn = self.offers_received.get(peer, 0) + 1 return NotImplementedError()
self.offers_received.update({peer: turn})
def calculate_price_target(self, *args):
return self.price_model.calculate_price(*args)
def bounded_price(self, price):
price_for_return = Decimal(min(self.max_rate, max(price, self.min_rate)))
return price_for_return
def make_offer(self, peer, blobs): def make_offer(self, peer, blobs):
offer_count = self.offers_sent.get(peer, 0) offer_count = self.offers_sent.get(peer, 0)
self.add_offer_sent(peer) self._add_offer_sent(peer)
if peer in self.accepted_offers: if peer in self.accepted_offers:
# if there was a previous accepted offer, use that # if there was a previous accepted offer, use that
offer = self.accepted_offers[peer] offer = self.accepted_offers[peer]
@ -46,67 +40,65 @@ class BaseStrategy(object):
# Try asking for it for free # Try asking for it for free
offer = Offer(Decimal(0.0)) offer = Offer(Decimal(0.0))
else: else:
rates = [self.calculate_price_target(blob) for blob in blobs] rates = [self.price_model.calculate_price(blob) for blob in blobs]
price = self._make_offer(rates, offer_count) price = self._make_rate_offer(rates, offer_count)
bounded_price = self.bounded_price(price) offer = Offer(price)
offer = Offer(bounded_price)
log.debug("Offering: %s", offer.rate)
return offer return offer
def offer_accepted(self, peer, offer):
if not offer.accepted and peer in self.accepted_offers:
del self.accepted_offers[peer]
log.debug("Throwing out old accepted offer")
if offer.accepted:
self.accepted_offers.update({peer: offer})
log.debug("Updated accepted offer %f", offer.rate)
def respond_to_offer(self, offer, peer, blobs): def respond_to_offer(self, offer, peer, blobs):
offer_count = self.offers_received.get(peer, 0) offer_count = self.offers_received.get(peer, 0)
self.add_offer_received(peer) self._add_offer_received(peer)
rates = [self.calculate_price_target(blob) for blob in blobs] rates = [self.price_model.calculate_price(blob) for blob in blobs]
price = self._respond_to_offer(rates, offer_count) price = self._get_response_rate(rates, offer_count)
bounded_price = self.bounded_price(price)
log.debug("Price target: %f", price)
if peer in self.accepted_offers: if peer in self.accepted_offers:
offer = self.accepted_offers[peer] offer = self.accepted_offers[peer]
log.debug("Already accepted %f", offer.rate)
elif offer.rate == 0.0 and offer_count == 0 and self.is_generous: elif offer.rate == 0.0 and offer_count == 0 and self.is_generous:
# give blobs away for free by default on the first request # give blobs away for free by default on the first request
offer.accept() offer.accept()
self.accepted_offers.update({peer: offer}) self.accepted_offers.update({peer: offer})
elif offer.rate >= bounded_price: elif offer.rate >= price:
log.debug("Accept: %f", offer.rate)
offer.accept() offer.accept()
self.accepted_offers.update({peer: offer}) self.accepted_offers.update({peer: offer})
else: else:
log.debug("Reject: %f", offer.rate)
offer.reject() offer.reject()
if peer in self.accepted_offers: if peer in self.accepted_offers:
del self.accepted_offers[peer] del self.accepted_offers[peer]
return offer return offer
def _make_offer(self, rates, offer_count): def update_accepted_offers(self, peer, offer):
return NotImplementedError() if not offer.is_accepted and peer in self.accepted_offers:
del self.accepted_offers[peer]
if offer.is_accepted:
self.accepted_offers.update({peer: offer})
def _respond_to_offer(self, rates, offer_count): def _add_offer_sent(self, peer):
return NotImplementedError() turn = self.offers_sent.get(peer, 0) + 1
self.offers_sent.update({peer: turn})
def _add_offer_received(self, peer):
turn = self.offers_received.get(peer, 0) + 1
self.offers_received.update({peer: turn})
def _bounded_price(self, price):
price_for_return = Decimal(min(self.max_rate, max(price, self.min_rate)))
return price_for_return
class BasicAvailabilityWeightedStrategy(BaseStrategy): class BasicAvailabilityWeightedStrategy(Strategy):
""" """
Basic strategy to target blob prices based on supply relative to mean supply Basic strategy to target blob prices based on supply relative to mean supply
Discount price target with each incoming request, and raise it with each outgoing from the modeled price Discount price target with each incoming request, and raise it with each outgoing from the modeled price
until the rate is accepted or a threshold is reached until the rate is accepted or a threshold is reached
""" """
implementer(INegotiationStrategy)
def __init__(self, blob_tracker, acceleration=1.25, deceleration=0.9, max_rate=None, min_rate=0.0, def __init__(self, blob_tracker, acceleration=1.25, deceleration=0.9, max_rate=None, min_rate=0.0,
is_generous=True, base_price=MIN_BLOB_DATA_PAYMENT_RATE, alpha=1.0): is_generous=True, base_price=0.0001, alpha=1.0):
price_model = MeanAvailabilityWeightedPrice(blob_tracker, base_price=base_price, alpha=alpha) price_model = MeanAvailabilityWeightedPrice(blob_tracker, base_price=base_price, alpha=alpha)
BaseStrategy.__init__(self, price_model, max_rate, min_rate, is_generous) Strategy.__init__(self, price_model, max_rate, min_rate, is_generous)
self._acceleration = Decimal(acceleration) # rate of how quickly to ramp offer self._acceleration = Decimal(acceleration) # rate of how quickly to ramp offer
self._deceleration = Decimal(deceleration) self._deceleration = Decimal(deceleration)
def _get_mean_rate(self, rates): def _get_mean_rate(self, rates):
@ -119,12 +111,14 @@ class BasicAvailabilityWeightedStrategy(BaseStrategy):
def _discount(self, rate, turn): def _discount(self, rate, turn):
return rate * (self._deceleration ** Decimal(turn)) return rate * (self._deceleration ** Decimal(turn))
def _respond_to_offer(self, rates, offer_count): def _get_response_rate(self, rates, offer_count):
rate = self._get_mean_rate(rates) rate = self._get_mean_rate(rates)
discounted = self._discount(rate, offer_count) discounted = self._discount(rate, offer_count)
return round(discounted, 5) rounded_price = round(discounted, 5)
return self._bounded_price(rounded_price)
def _make_offer(self, rates, offer_count): def _make_rate_offer(self, rates, offer_count):
rate = self._get_mean_rate(rates) rate = self._get_mean_rate(rates)
with_premium = self._premium(rate, offer_count) with_premium = self._premium(rate, offer_count)
return round(with_premium, 5) rounded_price = round(with_premium, 5)
return self._bounded_price(rounded_price)

View file

@ -29,6 +29,7 @@ def get_points(num_bytes, rate):
def cache(fn): def cache(fn):
"""Caches the function call for each instance""" """Caches the function call for each instance"""
attr = '__{}_value'.format(fn.__name__) attr = '__{}_value'.format(fn.__name__)
def helper(self): def helper(self):
if not hasattr(self, attr): if not hasattr(self, attr):
value = fn(self) value = fn(self)
@ -345,9 +346,9 @@ class PriceRequest(RequestHelper):
def _get_price_request(self): def _get_price_request(self):
rate = self.get_and_save_rate() rate = self.get_and_save_rate()
if rate is None: if rate is None:
log.debug("No blobs to request from %s", str(self.peer)) log.debug("No blobs to request from %s", self.peer)
raise Exception('Cannot make a price request without a payment rate') raise Exception('Cannot make a price request without a payment rate')
log.debug("Offer rate %s to %s for %i blobs", str(rate), str(self.peer), len(self.available_blobs)) log.debug("Offer rate %s to %s for %i blobs", rate, self.peer, len(self.available_blobs))
request_dict = {'blob_data_payment_rate': rate} request_dict = {'blob_data_payment_rate': rate}
return ClientRequest(request_dict, 'blob_data_payment_rate') return ClientRequest(request_dict, 'blob_data_payment_rate')
@ -367,11 +368,11 @@ class PriceRequest(RequestHelper):
offer.handle(response_dict['blob_data_payment_rate']) offer.handle(response_dict['blob_data_payment_rate'])
self.payment_rate_manager.record_offer_reply(self.peer.host, offer) self.payment_rate_manager.record_offer_reply(self.peer.host, offer)
if offer.accepted: if offer.is_accepted:
log.info("Offered rate %f/mb accepted by %s", rate, str(self.peer.host)) log.debug("Offered rate %f/mb accepted by %s", rate, str(self.peer.host))
return True return True
elif offer.too_low: elif offer.is_too_low:
log.info("Offered rate %f/mb rejected by %s", rate, str(self.peer.host)) log.debug("Offered rate %f/mb rejected by %s", rate, str(self.peer.host))
del self.protocol_prices[self.protocol] del self.protocol_prices[self.protocol]
return True return True
else: else:

View file

@ -104,15 +104,15 @@ class BlobRequestHandler(object):
def _handle_payment_rate_query(self, offer, request): def _handle_payment_rate_query(self, offer, request):
blobs = self._blobs_requested blobs = self._blobs_requested
log.info("Offered rate %f LBC/mb for %i blobs", offer.rate, len(blobs)) log.debug("Offered rate %f LBC/mb for %i blobs", offer.rate, len(blobs))
reply = self.payment_rate_manager.reply_to_offer(self.peer, blobs, offer) reply = self.payment_rate_manager.reply_to_offer(self.peer, blobs, offer)
if reply.accepted: if reply.is_accepted:
self.blob_data_payment_rate = offer.rate self.blob_data_payment_rate = offer.rate
request[self.PAYMENT_RATE_QUERY] = "RATE_ACCEPTED" request[self.PAYMENT_RATE_QUERY] = "RATE_ACCEPTED"
log.info("Accepted rate: %f", offer.rate) log.debug("Accepted rate: %f", offer.rate)
elif reply.too_low: elif reply.is_too_low:
request[self.PAYMENT_RATE_QUERY] = "RATE_TOO_LOW" request[self.PAYMENT_RATE_QUERY] = "RATE_TOO_LOW"
log.info("Reject rate: %f", offer.rate) log.debug("Reject rate: %f", offer.rate)
elif reply.is_unset: elif reply.is_unset:
log.warning("Rate unset") log.warning("Rate unset")
request['incoming_blob'] = {'error': 'RATE_UNSET'} request['incoming_blob'] = {'error': 'RATE_UNSET'}

View file

@ -4,6 +4,7 @@ import random
import os import os
import json import json
import yaml import yaml
import datetime
from lbrynet.core.cryptoutils import get_lbry_hash_obj from lbrynet.core.cryptoutils import get_lbry_hash_obj
@ -76,4 +77,8 @@ def save_settings(path, settings):
assert encoder is not False, "Unknown settings format .%s" % ext assert encoder is not False, "Unknown settings format .%s" % ext
f = open(path, 'w') f = open(path, 'w')
f.write(encoder(settings)) f.write(encoder(settings))
f.close() f.close()
def today():
return datetime.datetime.today()

View file

@ -647,4 +647,60 @@ class IWallet(Interface):
@type amount: float @type amount: float
@return: None @return: None
"""
class IBlobPriceModel(Interface):
"""
A blob price model
Used by INegotiationStrategy classes
"""
def calculate_price(self, blob):
"""
Calculate the price for a blob
@param blob: a blob hash
@type blob: str
@return: blob price target
@type: Decimal
"""
class INegotiationStrategy(Interface):
"""
Strategy to negotiate download payment rates
"""
def make_offer(self, peer, blobs):
"""
Make a rate offer for the given peer and blobs
@param peer: peer to make an offer to
@type: str
@param blobs: blob hashes to make an offer for
@type: list
@return: rate offer
@rtype: Offer
"""
def respond_to_offer(self, offer, peer, blobs):
"""
Respond to a rate offer given by a peer
@param offer: offer to reply to
@type: Offer
@param peer: peer to make an offer to
@type: str
@param blobs: blob hashes to make an offer for
@type: list
@return: accepted, rejected, or unset offer
@rtype: Offer
""" """

View file

@ -14,16 +14,16 @@ class EncryptedFileOptions(object):
prm = payment_rate_manager prm = payment_rate_manager
def get_default_data_rate_description(): def get_default_data_rate_description():
if prm.min_blob_data_payment_rate is None: if prm.base.min_blob_data_payment_rate is None:
return "Application default (%s LBC/MB)" % str(prm.base.min_blob_data_payment_rate) return "Application default (%s LBC/MB)" % str(prm.base.min_blob_data_payment_rate)
else: else:
return "%f LBC/MB" % prm.min_blob_data_payment_rate return "%f LBC/MB" % prm.base.min_blob_data_payment_rate
rate_choices = [] rate_choices = []
rate_choices.append(DownloadOptionChoice(prm.min_blob_data_payment_rate, rate_choices.append(DownloadOptionChoice(prm.base.min_blob_data_payment_rate,
"No change - %s" % get_default_data_rate_description(), "No change - %s" % get_default_data_rate_description(),
"No change - %s" % get_default_data_rate_description())) "No change - %s" % get_default_data_rate_description()))
if prm.min_blob_data_payment_rate is not None: if prm.base.min_blob_data_payment_rate is not None:
rate_choices.append(DownloadOptionChoice(None, rate_choices.append(DownloadOptionChoice(None,
"Application default (%s LBC/MB)" % str(prm.base.min_blob_data_payment_rate), "Application default (%s LBC/MB)" % str(prm.base.min_blob_data_payment_rate),
"Application default (%s LBC/MB)" % str(prm.base.min_blob_data_payment_rate))) "Application default (%s LBC/MB)" % str(prm.base.min_blob_data_payment_rate)))
@ -36,7 +36,7 @@ class EncryptedFileOptions(object):
rate_choices, rate_choices,
"Rate which will be paid for data", "Rate which will be paid for data",
"data payment rate", "data payment rate",
prm.min_blob_data_payment_rate, prm.base.min_blob_data_payment_rate,
get_default_data_rate_description() get_default_data_rate_description()
), ),
DownloadOption( DownloadOption(

View file

@ -20,7 +20,6 @@ from lbrynet.conf import MIN_BLOB_DATA_PAYMENT_RATE, API_CONNECTION_STRING # ,
from lbrynet.core.utils import generate_id from lbrynet.core.utils import generate_id
from lbrynet.core.StreamDescriptor import StreamDescriptorIdentifier from lbrynet.core.StreamDescriptor import StreamDescriptorIdentifier
from lbrynet.core.PaymentRateManager import PaymentRateManager from lbrynet.core.PaymentRateManager import PaymentRateManager
from lbrynet.core.server.BlobAvailabilityHandler import BlobAvailabilityHandlerFactory
from lbrynet.core.server.BlobRequestHandler import BlobRequestHandlerFactory from lbrynet.core.server.BlobRequestHandler import BlobRequestHandlerFactory
from lbrynet.core.server.ServerProtocol import ServerProtocolFactory from lbrynet.core.server.ServerProtocol import ServerProtocolFactory
from lbrynet.core.PTCWallet import PTCWallet from lbrynet.core.PTCWallet import PTCWallet
@ -363,19 +362,14 @@ class Console():
def _setup_query_handlers(self): def _setup_query_handlers(self):
handlers = [ handlers = [
#CryptBlobInfoQueryHandlerFactory(self.lbry_file_metadata_manager, self.session.wallet,
# self._server_payment_rate_manager),
BlobAvailabilityHandlerFactory(self.session.blob_manager),
#BlobRequestHandlerFactory(self.session.blob_manager, self.session.wallet,
# self._server_payment_rate_manager),
self.session.wallet.get_wallet_info_query_handler_factory(), self.session.wallet.get_wallet_info_query_handler_factory(),
] ]
def get_blob_request_handler_factory(rate): def get_blob_request_handler_factory(rate):
self.blob_request_payment_rate_manager = PaymentRateManager( self.blob_request_payment_rate_manager = PaymentRateManager(self.session.base_payment_rate_manager,
self.session.base_payment_rate_manager, rate rate)
) handlers.append(BlobRequestHandlerFactory(self.session.blob_manager,
handlers.append(BlobRequestHandlerFactory(self.session.blob_manager, self.session.wallet, self.session.wallet,
self.blob_request_payment_rate_manager)) self.blob_request_payment_rate_manager))
d1 = self.settings.get_server_data_payment_rate() d1 = self.settings.get_server_data_payment_rate()

View file

@ -773,9 +773,6 @@ class Daemon(jsonrpc.JSONRPC):
def _setup_query_handlers(self): def _setup_query_handlers(self):
handlers = [ handlers = [
# CryptBlobInfoQueryHandlerFactory(self.lbry_file_metadata_manager, self.session.wallet,
# self._server_payment_rate_manager),
# BlobAvailabilityHandlerFactory(self.session.blob_manager),
BlobRequestHandlerFactory(self.session.blob_manager, self.session.wallet, BlobRequestHandlerFactory(self.session.blob_manager, self.session.wallet,
self.session.payment_rate_manager), self.session.payment_rate_manager),
self.session.wallet.get_wallet_info_query_handler_factory(), self.session.wallet.get_wallet_info_query_handler_factory(),

View file

@ -33,10 +33,9 @@ from twisted.trial.unittest import TestCase
from twisted.python.failure import Failure from twisted.python.failure import Failure
import os import os
from lbrynet.dht.node import Node from lbrynet.dht.node import Node
from lbrynet.core.BlobAvailability import DummyBlobAvailabilityTracker from tests.mocks import DummyBlobAvailabilityTracker
from lbrynet.core.PeerManager import PeerManager from lbrynet.core.PeerManager import PeerManager
from lbrynet.core.RateLimiter import DummyRateLimiter, RateLimiter from lbrynet.core.RateLimiter import DummyRateLimiter, RateLimiter
from lbrynet.core.server.BlobAvailabilityHandler import BlobAvailabilityHandlerFactory
from lbrynet.core.server.BlobRequestHandler import BlobRequestHandlerFactory from lbrynet.core.server.BlobRequestHandler import BlobRequestHandlerFactory
from lbrynet.core.server.ServerProtocol import ServerProtocolFactory from lbrynet.core.server.ServerProtocol import ServerProtocolFactory
from lbrynet.lbrylive.server.LiveBlobInfoQueryHandler import CryptBlobInfoQueryHandlerFactory from lbrynet.lbrylive.server.LiveBlobInfoQueryHandler import CryptBlobInfoQueryHandlerFactory
@ -272,7 +271,6 @@ def start_lbry_uploader(sd_hash_queue, kill_event, dead_event, file_size, ul_rat
server_port = None server_port = None
query_handler_factories = { query_handler_factories = {
BlobAvailabilityHandlerFactory(session.blob_manager): True,
BlobRequestHandlerFactory(session.blob_manager, session.wallet, BlobRequestHandlerFactory(session.blob_manager, session.wallet,
session.payment_rate_manager): True, session.payment_rate_manager): True,
session.wallet.get_wallet_info_query_handler_factory(): True, session.wallet.get_wallet_info_query_handler_factory(): True,
@ -398,7 +396,6 @@ def start_lbry_reuploader(sd_hash, kill_event, dead_event, ready_event, n, ul_ra
server_port = None server_port = None
query_handler_factories = { query_handler_factories = {
BlobAvailabilityHandlerFactory(session.blob_manager): True,
BlobRequestHandlerFactory(session.blob_manager, session.wallet, BlobRequestHandlerFactory(session.blob_manager, session.wallet,
session.payment_rate_manager): True, session.payment_rate_manager): True,
session.wallet.get_wallet_info_query_handler_factory(): True, session.wallet.get_wallet_info_query_handler_factory(): True,

View file

@ -12,7 +12,6 @@ from lbrynet.core import PeerManager
from lbrynet.core import RateLimiter from lbrynet.core import RateLimiter
from lbrynet.core import Session from lbrynet.core import Session
from lbrynet.core import StreamDescriptor from lbrynet.core import StreamDescriptor
from lbrynet.core import BlobAvailability
from lbrynet.dht.node import Node from lbrynet.dht.node import Node
from lbrynet.lbryfile import EncryptedFileMetadataManager from lbrynet.lbryfile import EncryptedFileMetadataManager
from lbrynet.lbryfile.client import EncryptedFileOptions from lbrynet.lbryfile.client import EncryptedFileOptions
@ -94,7 +93,7 @@ class TestReflector(unittest.TestCase):
use_upnp=False, use_upnp=False,
rate_limiter=rate_limiter, rate_limiter=rate_limiter,
wallet=wallet, wallet=wallet,
blob_tracker_class=BlobAvailability.DummyBlobAvailabilityTracker, blob_tracker_class=mocks.DummyBlobAvailabilityTracker,
dht_node_class=Node dht_node_class=Node
) )

View file

@ -1,9 +1,11 @@
import io import io
from Crypto.PublicKey import RSA from Crypto.PublicKey import RSA
from decimal import Decimal
from twisted.internet import defer, threads, task, error from twisted.internet import defer, threads, task, error
from lbrynet.core import PTCWallet from lbrynet.core import PTCWallet
from lbrynet.core.BlobAvailability import BlobAvailabilityTracker
class Node(object): class Node(object):
@ -134,6 +136,43 @@ class GenFile(io.RawIOBase):
return output return output
class DummyBlobAvailabilityTracker(BlobAvailabilityTracker):
"""
Class to track peer counts for known blobs, and to discover new popular blobs
Attributes:
availability (dict): dictionary of peers for known blobs
"""
def __init__(self, blob_manager=None, peer_finder=None, dht_node=None):
self.availability = {
'91dc64cf1ff42e20d627b033ad5e4c3a4a96856ed8a6e3fb4cd5fa1cfba4bf72eefd325f579db92f45f4355550ace8e7': ['1.2.3.4'],
'b2e48bb4c88cf46b76adf0d47a72389fae0cd1f19ed27dc509138c99509a25423a4cef788d571dca7988e1dca69e6fa0': ['1.2.3.4', '1.2.3.4'],
'6af95cd062b4a179576997ef1054c9d2120f8592eea045e9667bea411d520262cd5a47b137eabb7a7871f5f8a79c92dd': ['1.2.3.4', '1.2.3.4', '1.2.3.4'],
'6d8017aba362e5c5d0046625a039513419810a0397d728318c328a5cc5d96efb589fbca0728e54fe5adbf87e9545ee07': ['1.2.3.4', '1.2.3.4', '1.2.3.4', '1.2.3.4'],
'5a450b416275da4bdff604ee7b58eaedc7913c5005b7184fc3bc5ef0b1add00613587f54217c91097fc039ed9eace9dd': ['1.2.3.4', '1.2.3.4', '1.2.3.4', '1.2.3.4', '1.2.3.4'],
'd7c82e6cac093b3f16107d2ae2b2c75424f1fcad2c7fbdbe66e4a13c0b6bd27b67b3a29c403b82279ab0f7c1c48d6787': ['1.2.3.4', '1.2.3.4', '1.2.3.4', '1.2.3.4', '1.2.3.4', '1.2.3.4'],
'9dbda74a472a2e5861a5d18197aeba0f5de67c67e401124c243d2f0f41edf01d7a26aeb0b5fc9bf47f6361e0f0968e2c': ['1.2.3.4', '1.2.3.4', '1.2.3.4', '1.2.3.4', '1.2.3.4', '1.2.3.4', '1.2.3.4'],
'8c70d5e2f5c3a6085006198e5192d157a125d92e7378794472007a61947992768926513fc10924785bdb1761df3c37e6': ['1.2.3.4', '1.2.3.4', '1.2.3.4', '1.2.3.4', '1.2.3.4', '1.2.3.4', '1.2.3.4', '1.2.3.4'],
'f99d24cd50d4bfd77c2598bfbeeb8415bf0feef21200bdf0b8fbbde7751a77b7a2c68e09c25465a2f40fba8eecb0b4e0': ['1.2.3.4', '1.2.3.4', '1.2.3.4', '1.2.3.4', '1.2.3.4', '1.2.3.4', '1.2.3.4', '1.2.3.4', '1.2.3.4'],
'c84aa1fd8f5009f7c4e71e444e40d95610abc1480834f835eefb267287aeb10025880a3ce22580db8c6d92efb5bc0c9c': ['1.2.3.4', '1.2.3.4', '1.2.3.4', '1.2.3.4', '1.2.3.4', '1.2.3.4', '1.2.3.4', '1.2.3.4', '1.2.3.4', '1.2.3.4'],
}
self.last_mean_availability = Decimal(0.0)
self._blob_manager = None
self._peer_finder = PeerFinder(11223, 11224, 2)
self._dht_node = None
self._check_popular = None
self._check_mine = None
self._get_mean_peers()
def start(self):
pass
def stop(self):
pass
create_stream_sd_file = { create_stream_sd_file = {
'stream_name': '746573745f66696c65', 'stream_name': '746573745f66696c65',
'blobs': [ 'blobs': [

View file

@ -1,14 +1,14 @@
import StringIO import StringIO
import mock import mock
from twisted.internet import defer, protocol from twisted.internet import defer
from twisted.test import proto_helpers from twisted.test import proto_helpers
from twisted.trial import unittest from twisted.trial import unittest
from lbrynet.core import Peer from lbrynet.core import Peer
from lbrynet.core.server import BlobRequestHandler from lbrynet.core.server import BlobRequestHandler
from lbrynet.core.PaymentRateManager import NegotiatedPaymentRateManager, BasePaymentRateManager from lbrynet.core.PaymentRateManager import NegotiatedPaymentRateManager, BasePaymentRateManager
from lbrynet.core.BlobAvailability import DummyBlobAvailabilityTracker from tests.mocks import DummyBlobAvailabilityTracker
class TestBlobRequestHandlerQueries(unittest.TestCase): class TestBlobRequestHandlerQueries(unittest.TestCase):

View file

@ -4,8 +4,7 @@ import mock
from lbrynet.core.PaymentRateManager import NegotiatedPaymentRateManager, BasePaymentRateManager from lbrynet.core.PaymentRateManager import NegotiatedPaymentRateManager, BasePaymentRateManager
from lbrynet.core.Strategy import BasicAvailabilityWeightedStrategy from lbrynet.core.Strategy import BasicAvailabilityWeightedStrategy
from lbrynet.core.Offer import Offer from lbrynet.core.Offer import Offer
from lbrynet.core.BlobAvailability import DummyBlobAvailabilityTracker from tests.mocks import DummyBlobAvailabilityTracker
MAX_NEGOTIATION_TURNS = 10 MAX_NEGOTIATION_TURNS = 10
@ -40,18 +39,18 @@ class AvailabilityWeightedStrategyTests(unittest.TestCase):
offer = client_strategy.make_offer(host, blobs) offer = client_strategy.make_offer(host, blobs)
response1 = host_strategy.respond_to_offer(offer, client, blobs) response1 = host_strategy.respond_to_offer(offer, client, blobs)
client_strategy.offer_accepted(host, response1) client_strategy.update_accepted_offers(host, response1)
offer = client_strategy.make_offer(host, blobs) offer = client_strategy.make_offer(host, blobs)
response2 = host_strategy.respond_to_offer(offer, client, blobs) response2 = host_strategy.respond_to_offer(offer, client, blobs)
client_strategy.offer_accepted(host, response2) client_strategy.update_accepted_offers(host, response2)
self.assertEquals(response1.too_low, False) self.assertEquals(response1.is_too_low, False)
self.assertEquals(response1.accepted, True) self.assertEquals(response1.is_accepted, True)
self.assertEquals(response1.rate, 0.0) self.assertEquals(response1.rate, 0.0)
self.assertEquals(response2.too_low, False) self.assertEquals(response2.is_too_low, False)
self.assertEquals(response2.accepted, True) self.assertEquals(response2.is_accepted, True)
self.assertEquals(response2.rate, 0.0) self.assertEquals(response2.rate, 0.0)
def test_how_many_turns_before_accept_with_similar_rate_settings(self): def test_how_many_turns_before_accept_with_similar_rate_settings(self):
@ -169,7 +168,6 @@ class AvailabilityWeightedStrategyTests(unittest.TestCase):
turns += 1 turns += 1
self.assertGreater(MAX_NEGOTIATION_TURNS, turns) self.assertGreater(MAX_NEGOTIATION_TURNS, turns)
def test_how_many_turns_with_generous_host(self): def test_how_many_turns_with_generous_host(self):
blobs = [ blobs = [
'b2e48bb4c88cf46b76adf0d47a72389fae0cd1f19ed27dc509138c99509a25423a4cef788d571dca7988e1dca69e6fa0', 'b2e48bb4c88cf46b76adf0d47a72389fae0cd1f19ed27dc509138c99509a25423a4cef788d571dca7988e1dca69e6fa0',