Merge pull request #173 from lbryio/blob-prices

Blob prices
This commit is contained in:
Jack Robison 2016-10-13 14:59:26 -04:00 committed by GitHub
commit 32a973943f
35 changed files with 1091 additions and 329 deletions

View file

@ -14,7 +14,7 @@ MAX_BLOB_INFOS_TO_REQUEST = 20
BLOBFILES_DIR = ".blobfiles"
BLOB_SIZE = 2**21
MIN_BLOB_DATA_PAYMENT_RATE = .005 # points/megabyte
MIN_BLOB_DATA_PAYMENT_RATE = .0001 # points/megabyte
MIN_BLOB_INFO_PAYMENT_RATE = .02 # points/1000 infos
MIN_VALUABLE_BLOB_INFO_PAYMENT_RATE = .05 # points/1000 infos
MIN_VALUABLE_BLOB_HASH_PAYMENT_RATE = .05 # points/1000 infos
@ -28,12 +28,10 @@ KNOWN_DHT_NODES = [('104.236.42.182', 4000),
POINTTRADER_SERVER = 'http://ec2-54-187-192-68.us-west-2.compute.amazonaws.com:2424'
#POINTTRADER_SERVER = 'http://127.0.0.1:2424'
if IS_DEVELOPMENT_VERSION:
SEARCH_SERVERS = ["http://107.170.207.64:50005"]
else:
SEARCH_SERVERS = ["http://lighthouse1.lbry.io:50005",
"http://lighthouse2.lbry.io:50005",
"http://lighthouse3.lbry.io:50005"]
SEARCH_SERVERS = ["http://lighthouse1.lbry.io:50005",
"http://lighthouse2.lbry.io:50005",
"http://lighthouse3.lbry.io:50005"]
REFLECTOR_SERVERS = [("reflector.lbry.io", 5566)]

View file

@ -0,0 +1,89 @@
import logging
from twisted.internet import defer
from twisted.internet.task import LoopingCall
from decimal import Decimal
log = logging.getLogger(__name__)
class BlobAvailabilityTracker(object):
"""
Class to track peer counts for known blobs, and to discover new popular blobs
Attributes:
availability (dict): dictionary of peers for known blobs
"""
def __init__(self, blob_manager, peer_finder, dht_node):
self.availability = {}
self.last_mean_availability = Decimal(0.0)
self._blob_manager = blob_manager
self._peer_finder = peer_finder
self._dht_node = dht_node
self._check_popular = LoopingCall(self._update_most_popular)
self._check_mine = LoopingCall(self._update_mine)
def start(self):
log.info("Starting blob tracker")
self._check_popular.start(30)
self._check_mine.start(120)
def stop(self):
if self._check_popular.running:
self._check_popular.stop()
if self._check_mine.running:
self._check_mine.stop()
def get_blob_availability(self, blob):
def _get_peer_count(peers):
have_blob = sum(1 for peer in peers if peer.is_available())
return {blob: have_blob}
d = self._peer_finder.find_peers_for_blob(blob)
d.addCallback(_get_peer_count)
return d
def get_availability_for_blobs(self, blobs):
dl = [self.get_blob_availability(blob) for blob in blobs if blob]
d = defer.DeferredList(dl)
d.addCallback(lambda results: [val for success, val in results if success])
return d
def _update_peers_for_blob(self, blob):
def _save_peer_info(blob_hash, peers):
v = {blob_hash: peers}
self.availability.update(v)
return v
d = self._peer_finder.find_peers_for_blob(blob)
d.addCallback(lambda r: [[c.host, c.port, c.is_available()] for c in r])
d.addCallback(lambda peers: _save_peer_info(blob, peers))
return d
def _get_most_popular(self):
dl = []
for (hash, _) in self._dht_node.get_most_popular_hashes(100):
encoded = hash.encode('hex')
dl.append(self._update_peers_for_blob(encoded))
return defer.DeferredList(dl)
def _update_most_popular(self):
d = self._get_most_popular()
d.addCallback(lambda _: self._get_mean_peers())
def _update_mine(self):
def _get_peers(blobs):
dl = []
for hash in blobs:
dl.append(self._update_peers_for_blob(hash))
return defer.DeferredList(dl)
d = self._blob_manager.get_all_verified_blobs()
d.addCallback(_get_peers)
d.addCallback(lambda _: self._get_mean_peers())
def _get_mean_peers(self):
num_peers = [len(self.availability[blob]) for blob in self.availability]
mean = Decimal(sum(num_peers)) / Decimal(max(1, len(num_peers)))
self.last_mean_availability = mean

View file

@ -2,6 +2,7 @@ import logging
import os
import time
import sqlite3
from twisted.internet import threads, defer
from twisted.python.failure import Failure
from twisted.enterprise import adbapi
@ -12,7 +13,6 @@ from lbrynet.core.cryptoutils import get_lbry_hash_obj
from lbrynet.core.Error import NoSuchBlobError
from lbrynet.core.sqlite_helpers import rerun_if_locked
log = logging.getLogger(__name__)
@ -70,6 +70,12 @@ class BlobManager(DHTHashSupplier):
def get_all_verified_blobs(self):
pass
def add_blob_to_download_history(self, blob_hash, host, rate):
pass
def add_blob_to_upload_history(self, blob_hash, host, rate):
pass
class DiskBlobManager(BlobManager):
"""This class stores blobs on the hard disk"""
@ -185,6 +191,14 @@ class DiskBlobManager(BlobManager):
d.addCallback(self.completed_blobs)
return d
def add_blob_to_download_history(self, blob_hash, host, rate):
d = self._add_blob_to_download_history(blob_hash, host, rate)
return d
def add_blob_to_upload_history(self, blob_hash, host, rate):
d = self._add_blob_to_upload_history(blob_hash, host, rate)
return d
def _manage(self):
from twisted.internet import reactor
@ -243,12 +257,29 @@ class DiskBlobManager(BlobManager):
# one that opened it. The individual connections in the pool are not used in multiple
# threads.
self.db_conn = adbapi.ConnectionPool('sqlite3', self.db_file, check_same_thread=False)
return self.db_conn.runQuery("create table if not exists blobs (" +
" blob_hash text primary key, " +
" blob_length integer, " +
" last_verified_time real, " +
" next_announce_time real"
")")
def create_tables(transaction):
transaction.execute("create table if not exists blobs (" +
" blob_hash text primary key, " +
" blob_length integer, " +
" last_verified_time real, " +
" next_announce_time real)")
transaction.execute("create table if not exists download (" +
" id integer primary key autoincrement, " +
" blob text, " +
" host text, " +
" rate float, " +
" ts integer)")
transaction.execute("create table if not exists upload (" +
" id integer primary key autoincrement, " +
" blob text, " +
" host text, " +
" rate float, " +
" ts integer)")
return self.db_conn.runInteraction(create_tables)
@rerun_if_locked
def _add_completed_blob(self, blob_hash, length, timestamp, next_announce_time=None):
@ -426,6 +457,18 @@ class DiskBlobManager(BlobManager):
d.addCallback(lambda blobs: threads.deferToThread(get_verified_blobs, blobs))
return d
@rerun_if_locked
def _add_blob_to_download_history(self, blob_hash, host, rate):
ts = int(time.time())
d = self.db_conn.runQuery("insert into download values (null, ?, ?, ?, ?) ", (blob_hash, str(host), float(rate), ts))
return d
@rerun_if_locked
def _add_blob_to_upload_history(self, blob_hash, host, rate):
ts = int(time.time())
d = self.db_conn.runQuery("insert into upload values (null, ?, ?, ?, ?) ", (blob_hash, str(host), float(rate), ts))
return d
class TempBlobManager(BlobManager):
"""This class stores blobs in memory"""
@ -526,7 +569,6 @@ class TempBlobManager(BlobManager):
d.addCallback(lambda _: set_next_manage_call())
def _delete_blobs_marked_for_deletion(self):
def remove_from_list(b_h):
del self.blob_hashes_to_delete[b_h]
log.info("Deleted blob %s", blob_hash)
@ -555,4 +597,4 @@ class TempBlobManager(BlobManager):
d = defer.fail(Failure(NoSuchBlobError(blob_hash)))
log.warning("Blob %s cannot be deleted because it is unknown")
ds.append(d)
return defer.DeferredList(ds)
return defer.DeferredList(ds)

View file

@ -87,4 +87,8 @@ class NoSuchStreamHashError(Exception):
class InvalidBlobHashError(Exception):
pass
class NegotiationError(Exception):
pass

62
lbrynet/core/Offer.py Normal file
View file

@ -0,0 +1,62 @@
from decimal import Decimal
class Offer(object):
"""A rate offer to download blobs from a host."""
RATE_ACCEPTED = "RATE_ACCEPTED"
RATE_TOO_LOW = "RATE_TOO_LOW"
RATE_UNSET = "RATE_UNSET"
def __init__(self, offer):
self._state = None
self.rate = None
if isinstance(offer, Decimal):
self.rate = round(offer, 5)
elif isinstance(offer, float):
self.rate = round(Decimal(offer), 5)
if self.rate is None or self.rate < Decimal(0.0):
self.unset()
@property
def is_accepted(self):
return self._state is Offer.RATE_ACCEPTED
@property
def is_too_low(self):
return self._state is Offer.RATE_TOO_LOW
@property
def is_unset(self):
return self._state is Offer.RATE_UNSET
@property
def message(self):
if self.is_accepted:
return Offer.RATE_ACCEPTED
elif self.is_too_low:
return Offer.RATE_TOO_LOW
elif self.is_unset:
return Offer.RATE_UNSET
return None
def accept(self):
if self.is_unset or self._state is None:
self._state = Offer.RATE_ACCEPTED
def reject(self):
if self.is_unset or self._state is None:
self._state = Offer.RATE_TOO_LOW
def unset(self):
self._state = Offer.RATE_UNSET
def handle(self, reply_message):
if reply_message == Offer.RATE_TOO_LOW:
self.reject()
elif reply_message == Offer.RATE_ACCEPTED:
self.accept()
elif reply_message == Offer.RATE_UNSET:
self.unset()
else:
raise Exception("Unknown offer reply %s" % str(reply_message))

View file

@ -1,6 +1,10 @@
from lbrynet.core.Strategy import get_default_strategy
from lbrynet.conf import MIN_BLOB_DATA_PAYMENT_RATE, MIN_BLOB_INFO_PAYMENT_RATE
class BasePaymentRateManager(object):
def __init__(self, rate):
def __init__(self, rate=MIN_BLOB_DATA_PAYMENT_RATE, info_rate=MIN_BLOB_INFO_PAYMENT_RATE):
self.min_blob_data_payment_rate = rate
self.min_blob_info_payment_rate = info_rate
class PaymentRateManager(object):
@ -26,4 +30,44 @@ class PaymentRateManager(object):
return self.min_blob_data_payment_rate
def record_points_paid(self, amount):
self.points_paid += amount
self.points_paid += amount
class NegotiatedPaymentRateManager(object):
def __init__(self, base, availability_tracker, generous=True):
"""
@param base: a BasePaymentRateManager
@param availability_tracker: a BlobAvailabilityTracker
@param rate: the min blob data payment rate
"""
self.base = base
self.points_paid = 0.0
self.blob_tracker = availability_tracker
self.generous = generous
self.strategy = get_default_strategy(self.blob_tracker,
base_price=self.base.min_blob_data_payment_rate,
is_generous=generous)
def get_rate_blob_data(self, peer, blobs):
response = self.strategy.make_offer(peer, blobs)
return response.rate
def accept_rate_blob_data(self, peer, blobs, offer):
offer = self.strategy.respond_to_offer(offer, peer, blobs)
self.strategy.update_accepted_offers(peer, offer)
return offer.is_accepted
def reply_to_offer(self, peer, blobs, offer):
reply = self.strategy.respond_to_offer(offer, peer, blobs)
self.strategy.update_accepted_offers(peer, reply)
return reply
def get_rate_for_peer(self, peer):
return self.strategy.accepted_offers.get(peer, False)
def record_points_paid(self, amount):
self.points_paid += amount
def record_offer_reply(self, peer, offer):
self.strategy.update_accepted_offers(peer, offer)

View file

@ -1,5 +1,6 @@
from collections import defaultdict
import datetime
from collections import defaultdict
from lbrynet.core import utils
class Peer(object):
@ -12,8 +13,7 @@ class Peer(object):
self.stats = defaultdict(float) # {string stat_type, float count}
def is_available(self):
if (self.attempt_connection_at is None or
datetime.datetime.today() > self.attempt_connection_at):
if self.attempt_connection_at is None or utils.today() > self.attempt_connection_at:
return True
return False
@ -24,7 +24,7 @@ class Peer(object):
def report_down(self):
self.down_count += 1
timeout_time = datetime.timedelta(seconds=60 * self.down_count)
self.attempt_connection_at = datetime.datetime.today() + timeout_time
self.attempt_connection_at = utils.today() + timeout_time
def update_score(self, score_change):
self.score += score_change
@ -37,4 +37,3 @@ class Peer(object):
def __repr__(self):
return 'Peer({!r}, {!r})'.format(self.host, self.port)

View file

@ -0,0 +1,47 @@
from zope.interface import implementer
from decimal import Decimal
from lbrynet.interfaces import IBlobPriceModel
from lbrynet.conf import MIN_BLOB_DATA_PAYMENT_RATE
def get_default_price_model(blob_tracker, base_price, **kwargs):
return MeanAvailabilityWeightedPrice(blob_tracker, base_price, **kwargs)
class MeanAvailabilityWeightedPrice(object):
"""
Calculate mean-blob-availability and stream-position weighted price for a blob
Attributes:
base_price (float): base price
alpha (float): constant, > 0.0 and <= 1.0, used to more highly value blobs at the beginning of a stream.
alpha defaults to 1.0, which has a null effect
blob_tracker (BlobAvailabilityTracker): blob availability tracker
"""
implementer(IBlobPriceModel)
def __init__(self, tracker, base_price=MIN_BLOB_DATA_PAYMENT_RATE, alpha=1.0):
self.blob_tracker = tracker
self.base_price = Decimal(base_price)
self.alpha = Decimal(alpha)
def calculate_price(self, blob):
mean_availability = self.blob_tracker.last_mean_availability
availability = self.blob_tracker.availability.get(blob, [])
index = 0 # blob.index
price = self.base_price * (mean_availability / Decimal(max(1, len(availability)))) / self._frontload(index)
return round(price, 5)
def _frontload(self, index):
"""
Get front-load multiplier, used to weight prices of blobs in a stream towards the front of the stream.
At index 0, returns 1.0
As index increases, return value approaches 2.0
@param index: blob position in stream
@return: front-load multiplier
"""
return Decimal(2.0) - (self.alpha ** index)

View file

@ -9,7 +9,8 @@ from lbrynet.core.client.DHTPeerFinder import DHTPeerFinder
from lbrynet.core.HashAnnouncer import DummyHashAnnouncer
from lbrynet.core.server.DHTHashAnnouncer import DHTHashAnnouncer
from lbrynet.core.utils import generate_id
from lbrynet.core.PaymentRateManager import BasePaymentRateManager
from lbrynet.core.PaymentRateManager import BasePaymentRateManager, NegotiatedPaymentRateManager
from lbrynet.core.BlobAvailability import BlobAvailabilityTracker
from twisted.internet import threads, defer
@ -26,9 +27,9 @@ class Session(object):
the rate limiter, which attempts to ensure download and upload rates stay below a set maximum,
and upnp, which opens holes in compatible firewalls so that remote peers can connect to this peer."""
def __init__(self, blob_data_payment_rate, db_dir=None, lbryid=None, peer_manager=None, dht_node_port=None,
known_dht_nodes=None, peer_finder=None, hash_announcer=None,
blob_dir=None, blob_manager=None, peer_port=None, use_upnp=True,
rate_limiter=None, wallet=None, dht_node_class=node.Node):
known_dht_nodes=None, peer_finder=None, hash_announcer=None, blob_dir=None, blob_manager=None,
peer_port=None, use_upnp=True, rate_limiter=None, wallet=None, dht_node_class=node.Node,
blob_tracker_class=None, payment_rate_manager_class=None):
"""
@param blob_data_payment_rate: The default payment rate for blob data
@ -88,6 +89,9 @@ class Session(object):
self.blob_dir = blob_dir
self.blob_manager = blob_manager
self.blob_tracker = None
self.blob_tracker_class = blob_tracker_class or BlobAvailabilityTracker
self.peer_port = peer_port
self.use_upnp = use_upnp
@ -103,6 +107,8 @@ class Session(object):
self.dht_node = None
self.base_payment_rate_manager = BasePaymentRateManager(blob_data_payment_rate)
self.payment_rate_manager = None
self.payment_rate_manager_class = payment_rate_manager_class or NegotiatedPaymentRateManager
def setup(self):
"""Create the blob directory and database if necessary, start all desired services"""
@ -136,6 +142,8 @@ class Session(object):
def shut_down(self):
"""Stop all services"""
ds = []
if self.blob_manager is not None:
ds.append(defer.maybeDeferred(self.blob_tracker.stop))
if self.dht_node is not None:
ds.append(defer.maybeDeferred(self.dht_node.stop))
if self.rate_limiter is not None:
@ -253,15 +261,26 @@ class Session(object):
if self.blob_dir is None:
self.blob_manager = TempBlobManager(self.hash_announcer)
else:
self.blob_manager = DiskBlobManager(self.hash_announcer, self.blob_dir, self.db_dir)
self.blob_manager = DiskBlobManager(self.hash_announcer,
self.blob_dir,
self.db_dir)
if self.blob_tracker is None:
self.blob_tracker = self.blob_tracker_class(self.blob_manager,
self.peer_finder,
self.dht_node)
if self.payment_rate_manager is None:
self.payment_rate_manager = self.payment_rate_manager_class(self.base_payment_rate_manager,
self.blob_tracker)
self.rate_limiter.start()
d1 = self.blob_manager.setup()
d2 = self.wallet.start()
dl = defer.DeferredList([d1, d2], fireOnOneErrback=True, consumeErrors=True)
dl.addCallback(lambda _: self.blob_tracker.start())
dl.addErrback(lambda err: err.value.subFailure)
dl.addErrback(self._subfailure)
return dl
def _unset_upnp(self):
@ -282,3 +301,9 @@ class Session(object):
d = threads.deferToThread(threaded_unset_upnp)
d.addErrback(lambda err: str(err))
return d
def _subfailure(self, err):
log.error(err.getTraceback())
return err.value

124
lbrynet/core/Strategy.py Normal file
View file

@ -0,0 +1,124 @@
from zope.interface import implementer
from decimal import Decimal
from lbrynet.interfaces import INegotiationStrategy
from lbrynet.core.Offer import Offer
from lbrynet.core.PriceModel import MeanAvailabilityWeightedPrice
def get_default_strategy(blob_tracker, **kwargs):
return BasicAvailabilityWeightedStrategy(blob_tracker, **kwargs)
class Strategy(object):
"""
Base for negotiation strategies
"""
implementer(INegotiationStrategy)
def __init__(self, price_model, max_rate, min_rate, is_generous=True):
self.price_model = price_model
self.is_generous = is_generous
self.accepted_offers = {}
self.offers_sent = {}
self.offers_received = {}
self.max_rate = max_rate or Decimal(self.price_model.base_price * 100)
self.min_rate = Decimal(min_rate)
def _make_rate_offer(self, rates, offer_count):
return NotImplementedError()
def _get_response_rate(self, rates, offer_count):
return NotImplementedError()
def make_offer(self, peer, blobs):
offer_count = self.offers_sent.get(peer, 0)
self._add_offer_sent(peer)
if peer in self.accepted_offers:
# if there was a previous accepted offer, use that
offer = self.accepted_offers[peer]
elif offer_count == 0 and self.is_generous:
# Try asking for it for free
offer = Offer(Decimal(0.0))
else:
rates = [self.price_model.calculate_price(blob) for blob in blobs]
price = self._make_rate_offer(rates, offer_count)
offer = Offer(price)
return offer
def respond_to_offer(self, offer, peer, blobs):
offer_count = self.offers_received.get(peer, 0)
self._add_offer_received(peer)
rates = [self.price_model.calculate_price(blob) for blob in blobs]
price = self._get_response_rate(rates, offer_count)
if peer in self.accepted_offers:
offer = self.accepted_offers[peer]
elif offer.rate == 0.0 and offer_count == 0 and self.is_generous:
# give blobs away for free by default on the first request
offer.accept()
self.accepted_offers.update({peer: offer})
elif offer.rate >= price:
offer.accept()
self.accepted_offers.update({peer: offer})
else:
offer.reject()
if peer in self.accepted_offers:
del self.accepted_offers[peer]
return offer
def update_accepted_offers(self, peer, offer):
if not offer.is_accepted and peer in self.accepted_offers:
del self.accepted_offers[peer]
if offer.is_accepted:
self.accepted_offers.update({peer: offer})
def _add_offer_sent(self, peer):
turn = self.offers_sent.get(peer, 0) + 1
self.offers_sent.update({peer: turn})
def _add_offer_received(self, peer):
turn = self.offers_received.get(peer, 0) + 1
self.offers_received.update({peer: turn})
def _bounded_price(self, price):
price_for_return = Decimal(min(self.max_rate, max(price, self.min_rate)))
return price_for_return
class BasicAvailabilityWeightedStrategy(Strategy):
"""
Basic strategy to target blob prices based on supply relative to mean supply
Discount price target with each incoming request, and raise it with each outgoing from the modeled price
until the rate is accepted or a threshold is reached
"""
implementer(INegotiationStrategy)
def __init__(self, blob_tracker, acceleration=1.25, deceleration=0.9, max_rate=None, min_rate=0.0,
is_generous=True, base_price=0.0001, alpha=1.0):
price_model = MeanAvailabilityWeightedPrice(blob_tracker, base_price=base_price, alpha=alpha)
Strategy.__init__(self, price_model, max_rate, min_rate, is_generous)
self._acceleration = Decimal(acceleration) # rate of how quickly to ramp offer
self._deceleration = Decimal(deceleration)
def _get_mean_rate(self, rates):
mean_rate = Decimal(sum(rates)) / Decimal(max(len(rates), 1))
return mean_rate
def _premium(self, rate, turn):
return rate * (self._acceleration ** Decimal(turn))
def _discount(self, rate, turn):
return rate * (self._deceleration ** Decimal(turn))
def _get_response_rate(self, rates, offer_count):
rate = self._get_mean_rate(rates)
discounted = self._discount(rate, offer_count)
rounded_price = round(discounted, 5)
return self._bounded_price(rounded_price)
def _make_rate_offer(self, rates, offer_count):
rate = self._get_mean_rate(rates)
with_premium = self._premium(rate, offer_count)
rounded_price = round(with_premium, 5)
return self._bounded_price(rounded_price)

View file

@ -550,6 +550,9 @@ class Wallet(object):
d.addCallback(_decode)
return d
def get_claim_metadata_for_sd_hash(self, sd_hash):
return self._get_claim_metadata_for_sd_hash(sd_hash)
def get_name_and_validity_for_sd_hash(self, sd_hash):
d = self._get_claim_metadata_for_sd_hash(sd_hash)
d.addCallback(lambda name_txid: self._get_status_of_claim(name_txid[1], name_txid[0], sd_hash) if name_txid is not None else None)

View file

@ -1,25 +1,35 @@
from collections import defaultdict
import logging
from collections import defaultdict
from decimal import Decimal
from twisted.internet import defer
from twisted.python.failure import Failure
from zope.interface import implements
from lbrynet.core.Error import PriceDisagreementError, DownloadCanceledError, InsufficientFundsError
from lbrynet.core.Error import InvalidResponseError, RequestCanceledError, NoResponseError
from lbrynet.core.Error import ConnectionClosedBeforeResponseError
from lbrynet.core.Error import InvalidResponseError, RequestCanceledError, NoResponseError
from lbrynet.core.Error import PriceDisagreementError, DownloadCanceledError, InsufficientFundsError
from lbrynet.core.client.ClientRequest import ClientRequest, ClientBlobRequest
from lbrynet.interfaces import IRequestCreator
from lbrynet.core.Offer import Offer
log = logging.getLogger(__name__)
def get_points(num_bytes, rate):
return 1.0 * num_bytes * rate / 2**20
if isinstance(rate, float):
return 1.0 * num_bytes * rate / 2**20
elif isinstance(rate, Decimal):
return 1.0 * num_bytes * float(rate) / 2**20
else:
raise Exception("Unknown rate type")
def cache(fn):
"""Caches the function call for each instance"""
attr = '__{}_value'.format(fn.__name__)
def helper(self):
if not hasattr(self, attr):
value = fn(self)
@ -42,6 +52,7 @@ class BlobRequester(object):
self._unavailable_blobs = defaultdict(list) # {Peer: [blob_hash]}}
self._protocol_prices = {} # {ClientProtocol: price}
self._price_disagreements = [] # [Peer]
self._protocol_tries = {}
self._incompatible_peers = []
######## IRequestCreator #########
@ -65,9 +76,9 @@ class BlobRequester(object):
def _send_next_request(self, peer, protocol):
log.debug('Sending a blob request for %s and %s', peer, protocol)
availability = AvailabilityRequest(self, peer, protocol)
download = DownloadRequest(self, peer, protocol, self.wallet, self.payment_rate_manager)
price = PriceRequest(self, peer, protocol)
availability = AvailabilityRequest(self, peer, protocol, self.payment_rate_manager)
download = DownloadRequest(self, peer, protocol, self.payment_rate_manager, self.wallet)
price = PriceRequest(self, peer, protocol, self.payment_rate_manager)
sent_request = False
if availability.can_make_request():
@ -161,10 +172,11 @@ class BlobRequester(object):
class RequestHelper(object):
def __init__(self, requestor, peer, protocol):
def __init__(self, requestor, peer, protocol, payment_rate_manager):
self.requestor = requestor
self.peer = peer
self.protocol = protocol
self.payment_rate_manager = payment_rate_manager
@property
def protocol_prices(self):
@ -197,10 +209,10 @@ class RequestHelper(object):
return
return reason
def get_and_save_rate_for_protocol(self):
def get_and_save_rate(self):
rate = self.protocol_prices.get(self.protocol)
if rate is None:
rate = self.requestor.payment_rate_manager.get_rate_blob_data(self.peer)
rate = self.payment_rate_manager.get_rate_blob_data(self.peer, self.available_blobs)
self.protocol_prices[self.protocol] = rate
return rate
@ -322,12 +334,59 @@ class AvailabilityRequest(RequestHelper):
self.unavailable_blobs.remove(blob_hash)
class PriceRequest(RequestHelper):
"""Ask a peer if a certain price is acceptable"""
def can_make_request(self):
return self.get_and_save_rate() is not None
def make_request_and_handle_response(self):
request = self._get_price_request()
self._handle_price_request(request)
def _get_price_request(self):
rate = self.get_and_save_rate()
if rate is None:
log.debug("No blobs to request from %s", self.peer)
raise Exception('Cannot make a price request without a payment rate')
log.debug("Offer rate %s to %s for %i blobs", rate, self.peer, len(self.available_blobs))
request_dict = {'blob_data_payment_rate': rate}
return ClientRequest(request_dict, 'blob_data_payment_rate')
def _handle_price_request(self, price_request):
d = self.protocol.add_request(price_request)
d.addCallback(self._handle_price_response, price_request)
d.addErrback(self._request_failed, "price request")
def _handle_price_response(self, response_dict, request):
assert request.response_identifier == 'blob_data_payment_rate'
if 'blob_data_payment_rate' not in response_dict:
return InvalidResponseError("response identifier not in response")
assert self.protocol in self.protocol_prices
rate = self.protocol_prices[self.protocol]
offer = Offer(rate)
offer.handle(response_dict['blob_data_payment_rate'])
self.payment_rate_manager.record_offer_reply(self.peer.host, offer)
if offer.is_accepted:
log.debug("Offered rate %f/mb accepted by %s", rate, str(self.peer.host))
return True
elif offer.is_too_low:
log.debug("Offered rate %f/mb rejected by %s", rate, str(self.peer.host))
del self.protocol_prices[self.protocol]
return True
else:
log.warning("Price disagreement")
del self.protocol_prices[self.protocol]
self.requestor._price_disagreements.append(self.peer)
return False
class DownloadRequest(RequestHelper):
"""Choose a blob and download it from a peer and also pay the peer for the data."""
def __init__(self, requestor, peer, protocol, wallet, payment_rate_manager):
RequestHelper.__init__(self, requestor, peer, protocol)
def __init__(self, requester, peer, protocol, payment_rate_manager, wallet):
RequestHelper.__init__(self, requester, peer, protocol, payment_rate_manager)
self.wallet = wallet
self.payment_rate_manager = payment_rate_manager
def can_make_request(self):
return self.get_blob_details()
@ -413,6 +472,9 @@ class DownloadRequest(RequestHelper):
def _pay_or_cancel_payment(self, arg, reserved_points, blob):
if self._can_pay_peer(blob, arg):
self._pay_peer(blob.length, reserved_points)
d = self.requestor.blob_manager.add_blob_to_download_history(str(blob),
str(self.peer.host),
float(self.protocol_prices[self.protocol]))
else:
self._cancel_points(reserved_points)
return arg
@ -425,7 +487,7 @@ class DownloadRequest(RequestHelper):
def _pay_peer(self, num_bytes, reserved_points):
assert num_bytes != 0
rate = self.get_and_save_rate_for_protocol()
rate = self.get_and_save_rate()
point_amount = get_points(num_bytes, rate)
self.wallet.send_points(reserved_points, point_amount)
self.payment_rate_manager.record_points_paid(point_amount)
@ -452,7 +514,7 @@ class DownloadRequest(RequestHelper):
# not yet been set for this protocol or for it to have been
# removed so instead I switched it to check if a rate has been set
# and calculate it if it has not
rate = self.get_and_save_rate_for_protocol()
rate = self.get_and_save_rate()
points_to_reserve = get_points(num_bytes, rate)
return self.wallet.reserve_points(self.peer, points_to_reserve)
@ -481,39 +543,4 @@ class BlobDownloadDetails(object):
def counting_write_func(self, data):
self.peer.update_stats('blob_bytes_downloaded', len(data))
return self.write_func(data)
class PriceRequest(RequestHelper):
"""Ask a peer if a certain price is acceptable"""
def can_make_request(self):
return self.get_and_save_rate_for_protocol() is not None
def make_request_and_handle_response(self):
request = self._get_price_request()
self._handle_price_request(request)
def _get_price_request(self):
rate = self.get_and_save_rate_for_protocol()
if rate is None:
raise Exception('Cannot make a price request without a payment rate')
request_dict = {'blob_data_payment_rate': rate}
return ClientRequest(request_dict, 'blob_data_payment_rate')
def _handle_price_request(self, price_request):
d = self.protocol.add_request(price_request)
d.addCallback(self._handle_price_response, price_request)
d.addErrback(self._request_failed, "price request")
def _handle_price_response(self, response_dict, request):
assert request.response_identifier == 'blob_data_payment_rate'
if 'blob_data_payment_rate' not in response_dict:
return InvalidResponseError("response identifier not in response")
assert self.protocol in self.protocol_prices
response = response_dict['blob_data_payment_rate']
if response == "RATE_ACCEPTED":
return True
else:
del self.protocol_prices[self.protocol]
self.requestor._price_disagreements.append(self.peer)
return True
return self.write_func(data)

View file

@ -1,5 +1,6 @@
import json
import logging
from decimal import Decimal
from twisted.internet import error, defer
from twisted.internet.protocol import Protocol, ClientFactory
from twisted.python import failure
@ -14,6 +15,12 @@ from zope.interface import implements
log = logging.getLogger(__name__)
def encode_decimal(obj):
if isinstance(obj, Decimal):
return float(obj)
raise TypeError(repr(obj) + " is not JSON serializable")
class ClientProtocol(Protocol):
implements(IRequestSender, IRateLimited)
@ -132,7 +139,7 @@ class ClientProtocol(Protocol):
def _send_request_message(self, request_msg):
# TODO: compare this message to the last one. If they're the same,
# TODO: incrementally delay this message.
m = json.dumps(request_msg)
m = json.dumps(request_msg, default=encode_decimal)
self.transport.write(m)
def _get_valid_response(self, response_msg):
@ -191,7 +198,9 @@ class ClientProtocol(Protocol):
for success, result in results:
if success is False:
failed = True
log.info("The connection is closing due to an error: %s", str(result.getTraceback()))
if not isinstance(result.value, DownloadCanceledError):
log.info(result.value)
log.info("The connection is closing due to an error: %s", str(result.getTraceback()))
if failed is False:
log.debug("Asking for another request.")
from twisted.internet import reactor
@ -215,7 +224,7 @@ class ClientProtocol(Protocol):
# TODO: always be this way. it's done this way now because the client has no other way
# TODO: of telling the server it wants the download to stop. It would be great if the
# TODO: protocol had such a mechanism.
log.info("Closing the connection to %s because the download of blob %s was canceled",
log.debug("Closing the connection to %s because the download of blob %s was canceled",
str(self.peer), str(self._blob_download_request.blob))
#self.transport.loseConnection()
#return True

View file

@ -73,12 +73,12 @@ def _log_decorator(fn):
def disable_third_party_loggers():
logging.getLogger('requests').setLevel(logging.WARNING)
logging.getLogger('urllib3').setLevel(logging.WARNING)
logging.getLogger('BitcoinRPC').setLevel(logging.INFO)
def disable_noisy_loggers():
logging.getLogger('lbrynet.analytics.api').setLevel(logging.INFO)
logging.getLogger('lbrynet.core.client').setLevel(logging.INFO)
logging.getLogger('lbrynet.core.server').setLevel(logging.INFO)
logging.getLogger('lbrynet.core').setLevel(logging.INFO)
logging.getLogger('lbrynet.dht').setLevel(logging.INFO)
logging.getLogger('lbrynet.lbrynet_daemon').setLevel(logging.INFO)
logging.getLogger('lbrynet.core.Wallet').setLevel(logging.INFO)

View file

@ -40,7 +40,7 @@ class BlobAvailabilityHandler(object):
def handle_queries(self, queries):
if self.query_identifiers[0] in queries:
log.debug("Received the client's list of requested blobs")
log.info("Received the client's list of requested blobs")
d = self._get_available_blobs(queries[self.query_identifiers[0]])
def set_field(available_blobs):

View file

@ -1,8 +1,11 @@
import logging
from twisted.internet import defer
from twisted.protocols.basic import FileSender
from twisted.python.failure import Failure
from zope.interface import implements
from lbrynet.core.Offer import Offer
from lbrynet.interfaces import IQueryHandlerFactory, IQueryHandler, IBlobSender
@ -34,18 +37,20 @@ class BlobRequestHandler(object):
implements(IQueryHandler, IBlobSender)
PAYMENT_RATE_QUERY = 'blob_data_payment_rate'
BLOB_QUERY = 'requested_blob'
AVAILABILITY_QUERY = 'requested_blobs'
def __init__(self, blob_manager, wallet, payment_rate_manager):
self.blob_manager = blob_manager
self.payment_rate_manager = payment_rate_manager
self.wallet = wallet
self.query_identifiers = [self.PAYMENT_RATE_QUERY, self.BLOB_QUERY]
self.query_identifiers = [self.PAYMENT_RATE_QUERY, self.BLOB_QUERY, self.AVAILABILITY_QUERY]
self.peer = None
self.blob_data_payment_rate = None
self.read_handle = None
self.currently_uploading = None
self.file_sender = None
self.blob_bytes_uploaded = 0
self._blobs_requested = []
######### IQueryHandler #########
@ -55,51 +60,19 @@ class BlobRequestHandler(object):
request_handler.register_blob_sender(self)
def handle_queries(self, queries):
response = {}
response = defer.succeed({})
log.debug("Handle query: %s", str(queries))
if self.AVAILABILITY_QUERY in queries:
self._blobs_requested = queries[self.AVAILABILITY_QUERY]
response.addCallback(lambda r: self._reply_to_availability(r, self._blobs_requested))
if self.PAYMENT_RATE_QUERY in queries:
self._handle_payment_rate_query(response, queries[self.PAYMENT_RATE_QUERY])
offered_rate = queries[self.PAYMENT_RATE_QUERY]
offer = Offer(offered_rate)
response.addCallback(lambda r: self._handle_payment_rate_query(offer, r))
if self.BLOB_QUERY in queries:
return self._handle_blob_query(response, queries[self.BLOB_QUERY])
else:
return defer.succeed(response)
def _handle_payment_rate_query(self, response, query):
if not self.handle_blob_data_payment_rate(query):
response['blob_data_payment_rate'] = "RATE_TOO_LOW"
else:
response['blob_data_payment_rate'] = 'RATE_ACCEPTED'
def _handle_blob_query(self, response, query):
log.debug("Received the client's request to send a blob")
response['incoming_blob'] = {}
if self.blob_data_payment_rate is None:
response['incoming_blob']['error'] = "RATE_UNSET"
return defer.succeed(response)
else:
return self._send_blob(response, query)
def _send_blob(self, response, query):
d = self.blob_manager.get_blob(query, True)
d.addCallback(self.open_blob_for_reading, response)
return d
def open_blob_for_reading(self, blob, response):
def failure(msg):
log.warning("We can not send %s: %s", blob, msg)
response['incoming_blob']['error'] = "BLOB_UNAVAILABLE"
return response
if not blob.is_validated():
return failure("blob can't be validated")
read_handle = blob.open_for_reading()
if read_handle is None:
return failure("blob can't be opened")
self.currently_uploading = blob
self.read_handle = read_handle
log.info("Sending %s to client", blob)
response['incoming_blob']['blob_hash'] = blob.blob_hash
response['incoming_blob']['length'] = blob.length
incoming = queries[self.BLOB_QUERY]
response.addCallback(lambda r: self._reply_to_send_request(r, incoming))
return response
######### IBlobSender #########
@ -118,12 +91,91 @@ class BlobRequestHandler(object):
######### internal #########
def handle_blob_data_payment_rate(self, requested_payment_rate):
if not self.payment_rate_manager.accept_rate_blob_data(self.peer, requested_payment_rate):
return False
def _reply_to_availability(self, request, blobs):
d = self._get_available_blobs(blobs)
def set_available(available_blobs):
log.debug("available blobs: %s", str(available_blobs))
request.update({'available_blobs': available_blobs})
return request
d.addCallback(set_available)
return d
def _handle_payment_rate_query(self, offer, request):
blobs = self._blobs_requested
log.debug("Offered rate %f LBC/mb for %i blobs", offer.rate, len(blobs))
reply = self.payment_rate_manager.reply_to_offer(self.peer, blobs, offer)
if reply.is_accepted:
self.blob_data_payment_rate = offer.rate
request[self.PAYMENT_RATE_QUERY] = "RATE_ACCEPTED"
log.debug("Accepted rate: %f", offer.rate)
elif reply.is_too_low:
request[self.PAYMENT_RATE_QUERY] = "RATE_TOO_LOW"
log.debug("Reject rate: %f", offer.rate)
elif reply.is_unset:
log.warning("Rate unset")
request['incoming_blob'] = {'error': 'RATE_UNSET'}
log.debug("Returning rate query result: %s", str(request))
return request
def _handle_blob_query(self, response, query):
log.debug("Received the client's request to send a blob")
response['incoming_blob'] = {}
if self.blob_data_payment_rate is None:
response['incoming_blob'] = {'error': "RATE_UNSET"}
return response
else:
self.blob_data_payment_rate = requested_payment_rate
return True
return self._send_blob(response, query)
def _send_blob(self, response, query):
d = self.blob_manager.get_blob(query, True)
d.addCallback(self.open_blob_for_reading, response)
return d
def open_blob_for_reading(self, blob, response):
response_fields = {}
d = defer.succeed(None)
if blob.is_validated():
read_handle = blob.open_for_reading()
if read_handle is not None:
self.currently_uploading = blob
self.read_handle = read_handle
log.info("Sending %s to client", str(blob))
response_fields['blob_hash'] = blob.blob_hash
response_fields['length'] = blob.length
response['incoming_blob'] = response_fields
d.addCallback(lambda _: self.record_transaction(blob))
d.addCallback(lambda _: response)
return d
log.debug("We can not send %s", str(blob))
response['incoming_blob'] = {'error': 'BLOB_UNAVAILABLE'}
d.addCallback(lambda _: response)
return d
def record_transaction(self, blob):
d = self.blob_manager.add_blob_to_upload_history(str(blob), self.peer.host, self.blob_data_payment_rate)
return d
def _reply_to_send_request(self, response, incoming):
response_fields = {}
response['incoming_blob'] = response_fields
if self.blob_data_payment_rate is None:
log.debug("Rate not set yet")
response['incoming_blob'] = {'error': 'RATE_UNSET'}
return defer.succeed(response)
else:
log.debug("Requested blob: %s", str(incoming))
d = self.blob_manager.get_blob(incoming, True)
d.addCallback(lambda blob: self.open_blob_for_reading(blob, response))
return d
def _get_available_blobs(self, requested_blobs):
d = self.blob_manager.completed_blobs(requested_blobs)
return d
def send_file(self, consumer):
@ -165,6 +217,6 @@ class BlobRequestHandler(object):
self.currently_uploading = None
self.file_sender = None
if reason is not None and isinstance(reason, Failure):
log.info("Upload has failed. Reason: %s", reason.getErrorMessage())
log.warning("Upload has failed. Reason: %s", reason.getErrorMessage())
return _send_file()

View file

@ -4,6 +4,7 @@ import random
import os
import json
import yaml
import datetime
from lbrynet.core.cryptoutils import get_lbry_hash_obj
@ -76,4 +77,8 @@ def save_settings(path, settings):
assert encoder is not False, "Unknown settings format .%s" % ext
f = open(path, 'w')
f.write(encoder(settings))
f.close()
f.close()
def today():
return datetime.datetime.today()

View file

@ -48,7 +48,7 @@ class CryptStreamDownloader(object):
@param blob_manager: A BlobManager object
@param payment_rate_manager: A PaymentRateManager object
@param payment_rate_manager: A NegotiatedPaymentRateManager object
@param wallet: An object which implements the IWallet interface

View file

@ -647,4 +647,60 @@ class IWallet(Interface):
@type amount: float
@return: None
"""
class IBlobPriceModel(Interface):
"""
A blob price model
Used by INegotiationStrategy classes
"""
def calculate_price(self, blob):
"""
Calculate the price for a blob
@param blob: a blob hash
@type blob: str
@return: blob price target
@type: Decimal
"""
class INegotiationStrategy(Interface):
"""
Strategy to negotiate download payment rates
"""
def make_offer(self, peer, blobs):
"""
Make a rate offer for the given peer and blobs
@param peer: peer to make an offer to
@type: str
@param blobs: blob hashes to make an offer for
@type: list
@return: rate offer
@rtype: Offer
"""
def respond_to_offer(self, offer, peer, blobs):
"""
Respond to a rate offer given by a peer
@param offer: offer to reply to
@type: Offer
@param peer: peer to make an offer to
@type: str
@param blobs: blob hashes to make an offer for
@type: list
@return: accepted, rejected, or unset offer
@rtype: Offer
"""

View file

@ -14,16 +14,16 @@ class EncryptedFileOptions(object):
prm = payment_rate_manager
def get_default_data_rate_description():
if prm.min_blob_data_payment_rate is None:
if prm.base.min_blob_data_payment_rate is None:
return "Application default (%s LBC/MB)" % str(prm.base.min_blob_data_payment_rate)
else:
return "%f LBC/MB" % prm.min_blob_data_payment_rate
return "%f LBC/MB" % prm.base.min_blob_data_payment_rate
rate_choices = []
rate_choices.append(DownloadOptionChoice(prm.min_blob_data_payment_rate,
rate_choices.append(DownloadOptionChoice(prm.base.min_blob_data_payment_rate,
"No change - %s" % get_default_data_rate_description(),
"No change - %s" % get_default_data_rate_description()))
if prm.min_blob_data_payment_rate is not None:
if prm.base.min_blob_data_payment_rate is not None:
rate_choices.append(DownloadOptionChoice(None,
"Application default (%s LBC/MB)" % str(prm.base.min_blob_data_payment_rate),
"Application default (%s LBC/MB)" % str(prm.base.min_blob_data_payment_rate)))
@ -36,7 +36,7 @@ class EncryptedFileOptions(object):
rate_choices,
"Rate which will be paid for data",
"data payment rate",
prm.min_blob_data_payment_rate,
prm.base.min_blob_data_payment_rate,
get_default_data_rate_description()
),
DownloadOption(

View file

@ -41,7 +41,7 @@ class ManagedEncryptedFileDownloader(EncryptedFileSaver):
def _save_sd_hash(sd_hash):
if len(sd_hash):
self.sd_hash = sd_hash[0]
d = self.wallet._get_claim_metadata_for_sd_hash(self.sd_hash)
d = self.wallet.get_claim_metadata_for_sd_hash(self.sd_hash)
else:
d = defer.succeed(None)
@ -122,13 +122,12 @@ class ManagedEncryptedFileDownloader(EncryptedFileSaver):
def _start(self):
d = EncryptedFileSaver._start(self)
d.addCallback(lambda _: self.stream_info_manager._get_sd_blob_hashes_for_stream(self.stream_hash))
d.addCallback(lambda _: self.stream_info_manager.get_sd_blob_hashes_for_stream(self.stream_hash))
def _save_sd_hash(sd_hash):
if len(sd_hash):
self.sd_hash = sd_hash[0]
d = self.wallet._get_claim_metadata_for_sd_hash(self.sd_hash)
d = self.wallet.get_claim_metadata_for_sd_hash(self.sd_hash)
else:
d = defer.succeed(None)

View file

@ -9,10 +9,10 @@ from twisted.enterprise import adbapi
from twisted.internet import defer, task, reactor
from twisted.python.failure import Failure
from lbrynet.core.PaymentRateManager import NegotiatedPaymentRateManager
from lbrynet.lbryfilemanager.EncryptedFileDownloader import ManagedEncryptedFileDownloader
from lbrynet.lbryfilemanager.EncryptedFileDownloader import ManagedEncryptedFileDownloaderFactory
from lbrynet.lbryfile.StreamDescriptor import EncryptedFileStreamType
from lbrynet.core.PaymentRateManager import PaymentRateManager
from lbrynet.cryptstream.client.CryptStreamDownloader import AlreadyStoppedError, CurrentlyStoppingError
from lbrynet.core.sqlite_helpers import rerun_if_locked
@ -74,7 +74,8 @@ class EncryptedFileManager(object):
def _start_lbry_files(self):
def set_options_and_restore(rowid, stream_hash, options):
payment_rate_manager = PaymentRateManager(self.session.base_payment_rate_manager)
payment_rate_manager = NegotiatedPaymentRateManager(self.session.base_payment_rate_manager,
self.session.blob_tracker)
d = self.start_lbry_file(rowid, stream_hash, payment_rate_manager,
blob_data_rate=options)
d.addCallback(lambda downloader: downloader.restore())

View file

@ -19,11 +19,18 @@ class LiveStreamPaymentRateManager(object):
def accept_rate_live_blob_info(self, peer, payment_rate):
return payment_rate >= self.get_effective_min_live_blob_info_payment_rate()
def get_rate_blob_data(self, peer):
return self.get_effective_min_blob_data_payment_rate()
def get_rate_blob_data(self, peer, blobs):
response = self._payment_rate_manager.strategy.make_offer(peer, blobs)
return response.rate
def accept_rate_blob_data(self, peer, payment_rate):
return payment_rate >= self.get_effective_min_blob_data_payment_rate()
def accept_rate_blob_data(self, peer, blobs, offer):
response = self._payment_rate_manager.strategy.respond_to_offer(offer, peer, blobs)
return response.accepted
def reply_to_offer(self, peer, blobs, offer):
reply = self._payment_rate_manager.strategy.respond_to_offer(offer, peer, blobs)
self._payment_rate_manager.strategy.offer_accepted(peer, reply)
return reply
def get_effective_min_blob_data_payment_rate(self):
rate = self.min_blob_data_payment_rate
@ -42,4 +49,4 @@ class LiveStreamPaymentRateManager(object):
return rate
def record_points_paid(self, amount):
self.points_paid += amount
self.points_paid += amount

View file

@ -144,7 +144,7 @@ class FullLiveStreamDownloaderFactory(object):
def make_downloader(self, metadata, options, payment_rate_manager):
# TODO: check options for payment rate manager parameters
payment_rate_manager = LiveStreamPaymentRateManager(self.default_payment_rate_manager,
prm = LiveStreamPaymentRateManager(self.default_payment_rate_manager,
payment_rate_manager)
def save_source_if_blob(stream_hash):
@ -161,7 +161,7 @@ class FullLiveStreamDownloaderFactory(object):
def create_downloader(stream_hash):
stream_downloader = FullLiveStreamDownloader(stream_hash, self.peer_finder, self.rate_limiter,
self.blob_manager, self.stream_info_manager,
payment_rate_manager, self.wallet, True)
prm, self.wallet, True)
# TODO: change upload_allowed=True above to something better
d = stream_downloader.set_stream_info()
d.addCallback(lambda _: stream_downloader)

View file

@ -20,7 +20,6 @@ from lbrynet.conf import MIN_BLOB_DATA_PAYMENT_RATE, API_CONNECTION_STRING # ,
from lbrynet.core.utils import generate_id
from lbrynet.core.StreamDescriptor import StreamDescriptorIdentifier
from lbrynet.core.PaymentRateManager import PaymentRateManager
from lbrynet.core.server.BlobAvailabilityHandler import BlobAvailabilityHandlerFactory
from lbrynet.core.server.BlobRequestHandler import BlobRequestHandlerFactory
from lbrynet.core.server.ServerProtocol import ServerProtocolFactory
from lbrynet.core.PTCWallet import PTCWallet
@ -363,19 +362,14 @@ class Console():
def _setup_query_handlers(self):
handlers = [
#CryptBlobInfoQueryHandlerFactory(self.lbry_file_metadata_manager, self.session.wallet,
# self._server_payment_rate_manager),
BlobAvailabilityHandlerFactory(self.session.blob_manager),
#BlobRequestHandlerFactory(self.session.blob_manager, self.session.wallet,
# self._server_payment_rate_manager),
self.session.wallet.get_wallet_info_query_handler_factory(),
]
def get_blob_request_handler_factory(rate):
self.blob_request_payment_rate_manager = PaymentRateManager(
self.session.base_payment_rate_manager, rate
)
handlers.append(BlobRequestHandlerFactory(self.session.blob_manager, self.session.wallet,
self.blob_request_payment_rate_manager = PaymentRateManager(self.session.base_payment_rate_manager,
rate)
handlers.append(BlobRequestHandlerFactory(self.session.blob_manager,
self.session.wallet,
self.blob_request_payment_rate_manager))
d1 = self.settings.get_server_data_payment_rate()

View file

@ -644,7 +644,7 @@ class AddStream(CommandHandler):
for option, option_value in zip(self.download_options, self.options_chosen):
if option.short_description == "data payment rate":
if option_value == None:
rate = self.payment_rate_manager.get_effective_min_blob_data_payment_rate()
rate = 0.0
else:
rate = option_value
stream_size = None

View file

@ -27,8 +27,6 @@ from txjsonrpc.web.jsonrpc import Handler
from lbrynet import __version__ as lbrynet_version
from lbryum.version import LBRYUM_VERSION as lbryum_version
from lbrynet import analytics
from lbrynet.core.PaymentRateManager import PaymentRateManager
from lbrynet.core.server.BlobAvailabilityHandler import BlobAvailabilityHandlerFactory
from lbrynet.core.server.BlobRequestHandler import BlobRequestHandlerFactory
from lbrynet.core.server.ServerProtocol import ServerProtocolFactory
from lbrynet.core.Error import UnknownNameError, InsufficientFundsError, InvalidNameError
@ -546,7 +544,6 @@ class Daemon(jsonrpc.JSONRPC):
d.addCallback(lambda _: add_lbry_file_to_sd_identifier(self.sd_identifier))
d.addCallback(lambda _: self._setup_stream_identifier())
d.addCallback(lambda _: self._setup_lbry_file_manager())
d.addCallback(lambda _: self._setup_lbry_file_opener())
d.addCallback(lambda _: self._setup_query_handlers())
d.addCallback(lambda _: self._setup_server())
d.addCallback(lambda _: _log_starting_vals())
@ -679,7 +676,6 @@ class Daemon(jsonrpc.JSONRPC):
# TODO: this was blatantly copied from jsonrpc_start_lbry_file. Be DRY.
def _start_file(f):
d = self.lbry_file_manager.toggle_lbry_file_running(f)
d.addCallback(lambda _: self.lighthouse_client.announce_sd(f.sd_hash))
return defer.succeed("Started LBRY file")
def _get_and_start_file(name):
@ -778,20 +774,13 @@ class Daemon(jsonrpc.JSONRPC):
def _setup_query_handlers(self):
handlers = [
# CryptBlobInfoQueryHandlerFactory(self.lbry_file_metadata_manager, self.session.wallet,
# self._server_payment_rate_manager),
BlobAvailabilityHandlerFactory(self.session.blob_manager),
# BlobRequestHandlerFactory(self.session.blob_manager, self.session.wallet,
# self._server_payment_rate_manager),
BlobRequestHandlerFactory(self.session.blob_manager, self.session.wallet,
self.session.payment_rate_manager),
self.session.wallet.get_wallet_info_query_handler_factory(),
]
def get_blob_request_handler_factory(rate):
self.blob_request_payment_rate_manager = PaymentRateManager(
self.session.base_payment_rate_manager, rate
)
handlers.append(BlobRequestHandlerFactory(self.session.blob_manager, self.session.wallet,
self.blob_request_payment_rate_manager))
self.blob_request_payment_rate_manager = self.session.payment_rate_manager
d1 = self.settings.get_server_data_payment_rate()
d1.addCallback(get_blob_request_handler_factory)
@ -1104,14 +1093,6 @@ class Daemon(jsonrpc.JSONRPC):
self.sd_identifier.add_stream_downloader_factory(EncryptedFileStreamType, file_opener_factory)
return defer.succeed(None)
def _setup_lbry_file_opener(self):
downloader_factory = EncryptedFileOpenerFactory(self.session.peer_finder, self.session.rate_limiter,
self.session.blob_manager, self.stream_info_manager,
self.session.wallet)
self.sd_identifier.add_stream_downloader_factory(EncryptedFileStreamType, downloader_factory)
return defer.succeed(True)
def _download_sd_blob(self, sd_hash, timeout=DEFAULT_SD_DOWNLOAD_TIMEOUT):
def cb(result):
if not r.called:
@ -1123,7 +1104,7 @@ class Daemon(jsonrpc.JSONRPC):
r = defer.Deferred(None)
reactor.callLater(timeout, eb)
d = download_sd_blob(self.session, sd_hash, PaymentRateManager(self.session.base_payment_rate_manager))
d = download_sd_blob(self.session, sd_hash, self.session.payment_rate_manager)
d.addCallback(BlobStreamDescriptorReader)
d.addCallback(lambda blob: blob.get_info())
d.addCallback(cb)
@ -1906,13 +1887,8 @@ class Daemon(jsonrpc.JSONRPC):
"""
name = p['name']
force = p.get('force', False)
if force:
d = self._get_est_cost(name)
else:
d = self._search(name)
d.addCallback(lambda r: [i['cost'] for i in r][0])
d = self._get_est_cost(name)
d.addCallback(lambda r: self._render_response(r, OK_CODE))
return d
@ -2587,6 +2563,53 @@ class Daemon(jsonrpc.JSONRPC):
d = self._render_response(SEARCH_SERVERS, OK_CODE)
return d
def jsonrpc_get_mean_availability(self):
"""
Get mean blob availability
Args:
None
Returns:
Mean peers for a blob
"""
d = self._render_response(self.session.blob_tracker.last_mean_availability, OK_CODE)
return d
def jsonrpc_get_availability(self, p):
"""
Get stream availability for a winning claim
Arg:
name (str): lbry uri
Returns:
peers per blob / total blobs
"""
def _get_mean(blob_availabilities):
peer_counts = []
for blob_availability in blob_availabilities:
for blob, peers in blob_availability.iteritems():
peer_counts.append(peers)
if peer_counts:
return round(1.0 * sum(peer_counts) / len(peer_counts), 2)
else:
return 0.0
name = p['name']
d = self._resolve_name(name, force_refresh=True)
d.addCallback(get_sd_hash)
d.addCallback(self._download_sd_blob)
d.addCallbacks(lambda descriptor: [blob.get('blob_hash') for blob in descriptor['blobs']],
lambda _: [])
d.addCallback(self.session.blob_tracker.get_availability_for_blobs)
d.addCallback(_get_mean)
d.addCallback(lambda result: self._render_response(result, OK_CODE))
return d
def get_lbrynet_version_from_github():
"""Return the latest released version from github."""

View file

@ -9,7 +9,6 @@ from twisted.internet import defer
from twisted.internet.task import LoopingCall
from lbrynet.core.Error import InsufficientFundsError, KeyFeeAboveMaxAllowed
from lbrynet.core.PaymentRateManager import PaymentRateManager
from lbrynet.core.StreamDescriptor import download_sd_blob
from lbrynet.metadata.Fee import FeeValidator
from lbrynet.lbryfilemanager.EncryptedFileDownloader import ManagedEncryptedFileDownloaderFactory
@ -48,11 +47,10 @@ class GetStream(object):
self.description = None
self.fee = None
self.data_rate = data_rate
self.name = None
self.file_name = file_name
self.session = session
self.exchange_rate_manager = exchange_rate_manager
self.payment_rate_manager = PaymentRateManager(self.session.base_payment_rate_manager)
self.payment_rate_manager = self.session.payment_rate_manager
self.lbry_file_manager = lbry_file_manager
self.sd_identifier = sd_identifier
self.stream_hash = None
@ -148,7 +146,7 @@ class GetStream(object):
return self.finished
def _start_download(self, downloader):
log.info('Starting download for %s', self.name)
log.info('Starting download for %s', self.resolved_name)
self.downloader = downloader
self.download_path = os.path.join(downloader.download_directory, downloader.file_name)

View file

@ -37,24 +37,34 @@ class MarketFeed(object):
self._updater = LoopingCall(self._update_price)
def _make_request(self):
r = requests.get(self.url, self.params)
return r.text
try:
r = requests.get(self.url, self.params)
return defer.succeed(r.text)
except Exception as err:
log.error(err)
return defer.fail(err)
def _handle_response(self, response):
return NotImplementedError
def _subtract_fee(self, from_amount):
# increase amount to account for market fees
return defer.succeed(from_amount / (1.0 - self.fee))
def _save_price(self, price):
log.debug("Saving price update %f for %s" % (price, self.market))
self.rate = ExchangeRate(self.market, price, int(time.time()))
def _log_error(self, err):
log.error(err)
log.warning("There was a problem updating %s exchange rate information from %s", self.market, self.name)
def _update_price(self):
d = defer.succeed(self._make_request())
d = self._make_request()
d.addCallback(self._handle_response)
d.addCallback(self._subtract_fee)
d.addCallback(self._save_price)
d.addErrback(self._log_error)
def start(self):
if not self._updater.running:
@ -94,7 +104,11 @@ class GoogleBTCFeed(MarketFeed):
)
def _make_request(self):
return googlefinance.getQuotes('CURRENCY:USDBTC')[0]
try:
r = googlefinance.getQuotes('CURRENCY:USDBTC')[0]
return defer.succeed(r)
except Exception as err:
return defer.fail(err)
def _handle_response(self, response):
return float(response['LastTradePrice'])

View file

@ -9,7 +9,6 @@ from appdirs import user_data_dir
from lbrynet.core.Error import InsufficientFundsError
from lbrynet.lbryfilemanager.EncryptedFileCreator import create_lbry_file
from lbrynet.lbryfile.StreamDescriptor import publish_sd_blob
from lbrynet.core.PaymentRateManager import PaymentRateManager
from lbrynet.metadata.Metadata import Metadata
from lbrynet.lbryfilemanager.EncryptedFileDownloader import ManagedEncryptedFileDownloader
from lbrynet import reflector
@ -102,7 +101,7 @@ class Publisher(object):
def add_to_lbry_files(self, stream_hash):
self.stream_hash = stream_hash
prm = PaymentRateManager(self.session.base_payment_rate_manager)
prm = self.session.payment_rate_manager
d = self.lbry_file_manager.add_lbry_file(stream_hash, prm)
d.addCallback(self.set_lbry_file)
return d

View file

@ -13,13 +13,10 @@ from Crypto.Hash import MD5
from lbrynet.conf import MIN_BLOB_DATA_PAYMENT_RATE
from lbrynet.conf import MIN_BLOB_INFO_PAYMENT_RATE
from lbrynet.lbrylive.LiveStreamCreator import FileLiveStreamCreator
from lbrynet.lbrylive.PaymentRateManager import BaseLiveStreamPaymentRateManager
from lbrynet.lbrylive.PaymentRateManager import LiveStreamPaymentRateManager
from lbrynet.lbrylive.LiveStreamMetadataManager import DBLiveStreamMetadataManager
from lbrynet.lbrylive.LiveStreamMetadataManager import TempLiveStreamMetadataManager
from lbrynet.lbryfile.EncryptedFileMetadataManager import TempEncryptedFileMetadataManager, DBEncryptedFileMetadataManager
from lbrynet.lbryfilemanager.EncryptedFileManager import EncryptedFileManager
from lbrynet.core.PaymentRateManager import PaymentRateManager
from lbrynet.core.PTCWallet import PointTraderKeyQueryHandlerFactory, PointTraderKeyExchanger
from lbrynet.core.Session import Session
from lbrynet.core.client.StandaloneBlobDownloader import StandaloneBlobDownloader
@ -29,22 +26,19 @@ from lbrynet.core.StreamDescriptor import download_sd_blob
from lbrynet.lbryfilemanager.EncryptedFileCreator import create_lbry_file
from lbrynet.lbryfile.client.EncryptedFileOptions import add_lbry_file_to_sd_identifier
from lbrynet.lbryfile.StreamDescriptor import get_sd_info
from twisted.internet import defer, threads, task, error
from twisted.internet import defer, threads, task
from twisted.trial.unittest import TestCase
from twisted.python.failure import Failure
import os
from lbrynet.dht.node import Node
from tests.mocks import DummyBlobAvailabilityTracker
from lbrynet.core.PeerManager import PeerManager
from lbrynet.core.RateLimiter import DummyRateLimiter, RateLimiter
from lbrynet.core.server.BlobAvailabilityHandler import BlobAvailabilityHandlerFactory
from lbrynet.core.server.BlobRequestHandler import BlobRequestHandlerFactory
from lbrynet.core.server.ServerProtocol import ServerProtocolFactory
from lbrynet.lbrylive.server.LiveBlobInfoQueryHandler import CryptBlobInfoQueryHandlerFactory
from lbrynet.lbrylive.client.LiveStreamOptions import add_live_stream_to_sd_identifier
from lbrynet.lbrylive.client.LiveStreamDownloader import add_full_live_stream_downloader_to_sd_identifier
from lbrynet.core.BlobManager import TempBlobManager
from lbrynet.reflector.client.client import EncryptedFileReflectorClientFactory
from lbrynet.reflector.server.server import ReflectorServerFactory
from lbrynet.lbryfile.StreamDescriptor import publish_sd_blob
log_format = "%(funcName)s(): %(message)s"
@ -106,6 +100,9 @@ class FakeWallet(object):
def set_public_key_for_peer(self, peer, public_key):
pass
def get_claim_metadata_for_sd_hash(self, sd_hash):
return "fakeuri", "faketxid"
class FakePeerFinder(object):
def __init__(self, start_port, peer_manager, num_peers):
@ -212,16 +209,12 @@ test_create_stream_sd_file = {
def start_lbry_uploader(sd_hash_queue, kill_event, dead_event, file_size, ul_rate_limit=None):
sys.modules = sys.modules.copy()
del sys.modules['twisted.internet.reactor']
import twisted.internet
twisted.internet.reactor = twisted.internet.epollreactor.EPollReactor()
sys.modules['twisted.internet.reactor'] = twisted.internet.reactor
if sys.platform.startswith("linux"):
sys.modules = sys.modules.copy()
del sys.modules['twisted.internet.reactor']
import twisted.internet
twisted.internet.reactor = twisted.internet.epollreactor.EPollReactor()
sys.modules['twisted.internet.reactor'] = twisted.internet.reactor
from twisted.internet import reactor
@ -239,12 +232,14 @@ def start_lbry_uploader(sd_hash_queue, kill_event, dead_event, file_size, ul_rat
rate_limiter = RateLimiter()
sd_identifier = StreamDescriptorIdentifier()
db_dir = "server"
os.mkdir(db_dir)
session = Session(MIN_BLOB_DATA_PAYMENT_RATE, db_dir=db_dir, lbryid="abcd",
peer_finder=peer_finder, hash_announcer=hash_announcer, peer_port=5553,
use_upnp=False, rate_limiter=rate_limiter, wallet=wallet)
use_upnp=False, rate_limiter=rate_limiter, wallet=wallet, blob_tracker_class=DummyBlobAvailabilityTracker,
dht_node_class=Node)
stream_info_manager = TempEncryptedFileMetadataManager()
@ -274,9 +269,8 @@ def start_lbry_uploader(sd_hash_queue, kill_event, dead_event, file_size, ul_rat
server_port = None
query_handler_factories = {
BlobAvailabilityHandlerFactory(session.blob_manager): True,
BlobRequestHandlerFactory(session.blob_manager, session.wallet,
PaymentRateManager(session.base_payment_rate_manager)): True,
session.payment_rate_manager): True,
session.wallet.get_wallet_info_query_handler_factory(): True,
}
@ -322,20 +316,18 @@ def start_lbry_uploader(sd_hash_queue, kill_event, dead_event, file_size, ul_rat
sd_hash_queue.put(sd_hash)
reactor.callLater(1, start_all)
reactor.run()
if not reactor.running:
reactor.run()
def start_lbry_reuploader(sd_hash, kill_event, dead_event, ready_event, n, ul_rate_limit=None):
sys.modules = sys.modules.copy()
del sys.modules['twisted.internet.reactor']
import twisted.internet
twisted.internet.reactor = twisted.internet.epollreactor.EPollReactor()
sys.modules['twisted.internet.reactor'] = twisted.internet.reactor
if sys.platform.startswith("linux"):
sys.modules = sys.modules.copy()
del sys.modules['twisted.internet.reactor']
import twisted.internet
twisted.internet.reactor = twisted.internet.epollreactor.EPollReactor()
sys.modules['twisted.internet.reactor'] = twisted.internet.reactor
from twisted.internet import reactor
@ -362,7 +354,7 @@ def start_lbry_reuploader(sd_hash, kill_event, dead_event, ready_event, n, ul_ra
session = Session(MIN_BLOB_DATA_PAYMENT_RATE, db_dir=db_dir, lbryid="abcd" + str(n),
peer_finder=peer_finder, hash_announcer=hash_announcer,
blob_dir=None, peer_port=peer_port,
use_upnp=False, rate_limiter=rate_limiter, wallet=wallet)
use_upnp=False, rate_limiter=rate_limiter, wallet=wallet, blob_tracker_class=DummyBlobAvailabilityTracker)
stream_info_manager = TempEncryptedFileMetadataManager()
@ -379,7 +371,7 @@ def start_lbry_reuploader(sd_hash, kill_event, dead_event, ready_event, n, ul_ra
return factories[0].make_downloader(metadata, chosen_options, prm)
def download_file():
prm = PaymentRateManager(session.base_payment_rate_manager)
prm = session.payment_rate_manager
d = download_sd_blob(session, sd_hash, prm)
d.addCallback(sd_identifier.get_metadata_for_sd_blob)
d.addCallback(make_downloader, prm)
@ -402,9 +394,8 @@ def start_lbry_reuploader(sd_hash, kill_event, dead_event, ready_event, n, ul_ra
server_port = None
query_handler_factories = {
BlobAvailabilityHandlerFactory(session.blob_manager): True,
BlobRequestHandlerFactory(session.blob_manager, session.wallet,
PaymentRateManager(session.base_payment_rate_manager)): True,
session.payment_rate_manager): True,
session.wallet.get_wallet_info_query_handler_factory(): True,
}
@ -438,21 +429,18 @@ def start_lbry_reuploader(sd_hash, kill_event, dead_event, ready_event, n, ul_ra
d = task.deferLater(reactor, 1.0, start_transfer)
d.addCallback(lambda _: start_server())
reactor.run()
if not reactor.running:
reactor.run()
def start_live_server(sd_hash_queue, kill_event, dead_event):
sys.modules = sys.modules.copy()
del sys.modules['twisted.internet.reactor']
import twisted.internet
twisted.internet.reactor = twisted.internet.epollreactor.EPollReactor()
sys.modules['twisted.internet.reactor'] = twisted.internet.reactor
if sys.platform.startswith("linux"):
sys.modules = sys.modules.copy()
del sys.modules['twisted.internet.reactor']
import twisted.internet
twisted.internet.reactor = twisted.internet.epollreactor.EPollReactor()
sys.modules['twisted.internet.reactor'] = twisted.internet.reactor
from twisted.internet import reactor
@ -470,18 +458,14 @@ def start_live_server(sd_hash_queue, kill_event, dead_event):
rate_limiter = DummyRateLimiter()
sd_identifier = StreamDescriptorIdentifier()
db_dir = "server"
os.mkdir(db_dir)
session = Session(MIN_BLOB_DATA_PAYMENT_RATE, db_dir=db_dir, lbryid="abcd",
peer_finder=peer_finder, hash_announcer=hash_announcer, peer_port=5553,
use_upnp=False, rate_limiter=rate_limiter, wallet=wallet)
base_payment_rate_manager = BaseLiveStreamPaymentRateManager(MIN_BLOB_INFO_PAYMENT_RATE)
data_payment_rate_manager = PaymentRateManager(session.base_payment_rate_manager)
payment_rate_manager = LiveStreamPaymentRateManager(base_payment_rate_manager,
data_payment_rate_manager)
peer_finder=peer_finder, hash_announcer=hash_announcer, peer_port=5553,
use_upnp=False, rate_limiter=rate_limiter, wallet=wallet,
blob_tracker_class=DummyBlobAvailabilityTracker)
stream_info_manager = DBLiveStreamMetadataManager(session.db_dir, hash_announcer)
logging.debug("Created the session")
@ -492,10 +476,9 @@ def start_live_server(sd_hash_queue, kill_event, dead_event):
logging.debug("Starting the server protocol")
query_handler_factories = {
CryptBlobInfoQueryHandlerFactory(stream_info_manager, session.wallet,
payment_rate_manager): True,
BlobAvailabilityHandlerFactory(session.blob_manager): True,
session.payment_rate_manager): True,
BlobRequestHandlerFactory(session.blob_manager, session.wallet,
payment_rate_manager): True,
session.payment_rate_manager): True,
session.wallet.get_wallet_info_query_handler_factory(): True,
}
@ -563,12 +546,9 @@ def start_live_server(sd_hash_queue, kill_event, dead_event):
return d
def enable_live_stream():
base_live_stream_payment_rate_manager = BaseLiveStreamPaymentRateManager(
MIN_BLOB_INFO_PAYMENT_RATE
)
add_live_stream_to_sd_identifier(sd_identifier, base_live_stream_payment_rate_manager)
add_live_stream_to_sd_identifier(sd_identifier, session.base_payment_rate_manager)
add_full_live_stream_downloader_to_sd_identifier(session, stream_info_manager, sd_identifier,
base_live_stream_payment_rate_manager)
session.base_payment_rate_manager)
def run_server():
d = session.setup()
@ -581,20 +561,18 @@ def start_live_server(sd_hash_queue, kill_event, dead_event):
return d
reactor.callLater(1, run_server)
reactor.run()
if not reactor.running:
reactor.run()
def start_blob_uploader(blob_hash_queue, kill_event, dead_event, slow):
sys.modules = sys.modules.copy()
del sys.modules['twisted.internet.reactor']
import twisted.internet
twisted.internet.reactor = twisted.internet.epollreactor.EPollReactor()
sys.modules['twisted.internet.reactor'] = twisted.internet.reactor
if sys.platform.startswith("linux"):
sys.modules = sys.modules.copy()
del sys.modules['twisted.internet.reactor']
import twisted.internet
twisted.internet.reactor = twisted.internet.epollreactor.EPollReactor()
sys.modules['twisted.internet.reactor'] = twisted.internet.reactor
from twisted.internet import reactor
@ -621,7 +599,7 @@ def start_blob_uploader(blob_hash_queue, kill_event, dead_event, slow):
session = Session(MIN_BLOB_DATA_PAYMENT_RATE, db_dir=db_dir, lbryid="efgh",
peer_finder=peer_finder, hash_announcer=hash_announcer,
blob_dir=blob_dir, peer_port=peer_port,
use_upnp=False, rate_limiter=rate_limiter, wallet=wallet)
use_upnp=False, rate_limiter=rate_limiter, wallet=wallet, blob_tracker_class=DummyBlobAvailabilityTracker)
if slow is True:
session.rate_limiter.set_ul_limit(2**11)
@ -643,9 +621,7 @@ def start_blob_uploader(blob_hash_queue, kill_event, dead_event, slow):
server_port = None
query_handler_factories = {
BlobAvailabilityHandlerFactory(session.blob_manager): True,
BlobRequestHandlerFactory(session.blob_manager, session.wallet,
PaymentRateManager(session.base_payment_rate_manager)): True,
BlobRequestHandlerFactory(session.blob_manager, session.wallet, session.payment_rate_manager): True,
session.wallet.get_wallet_info_query_handler_factory(): True,
}
@ -686,7 +662,8 @@ def start_blob_uploader(blob_hash_queue, kill_event, dead_event, slow):
logging.debug("blob hash has been added to the queue")
reactor.callLater(1, start_all)
reactor.run()
if not reactor.running:
reactor.run()
class TestTransfer(TestCase):
@ -768,7 +745,6 @@ class TestTransfer(TestCase):
return d
@unittest.skip("Sadly skipping failing test instead of fixing it")
def test_lbry_transfer(self):
sd_hash_queue = Queue()
kill_event = Event()
@ -786,6 +762,7 @@ class TestTransfer(TestCase):
rate_limiter = DummyRateLimiter()
sd_identifier = StreamDescriptorIdentifier()
db_dir = "client"
blob_dir = os.path.join(db_dir, "blobfiles")
os.mkdir(db_dir)
@ -794,7 +771,8 @@ class TestTransfer(TestCase):
self.session = Session(MIN_BLOB_DATA_PAYMENT_RATE, db_dir=db_dir, lbryid="abcd",
peer_finder=peer_finder, hash_announcer=hash_announcer,
blob_dir=blob_dir, peer_port=5553,
use_upnp=False, rate_limiter=rate_limiter, wallet=wallet)
use_upnp=False, rate_limiter=rate_limiter, wallet=wallet, blob_tracker_class=DummyBlobAvailabilityTracker,
dht_node_class=Node)
self.stream_info_manager = TempEncryptedFileMetadataManager()
@ -808,7 +786,7 @@ class TestTransfer(TestCase):
return factories[0].make_downloader(metadata, chosen_options, prm)
def download_file(sd_hash):
prm = PaymentRateManager(self.session.base_payment_rate_manager)
prm = self.session.payment_rate_manager
d = download_sd_blob(self.session, sd_hash, prm)
d.addCallback(sd_identifier.get_metadata_for_sd_blob)
d.addCallback(make_downloader, prm)
@ -855,7 +833,7 @@ class TestTransfer(TestCase):
return d
@require_system('Linux')
@unittest.skip("Sadly skipping failing test instead of fixing it")
def test_live_transfer(self):
sd_hash_queue = Queue()
@ -878,7 +856,8 @@ class TestTransfer(TestCase):
self.session = Session(MIN_BLOB_DATA_PAYMENT_RATE, db_dir=db_dir, lbryid="abcd",
peer_finder=peer_finder, hash_announcer=hash_announcer, blob_dir=None,
peer_port=5553, use_upnp=False, rate_limiter=rate_limiter, wallet=wallet)
peer_port=5553, use_upnp=False, rate_limiter=rate_limiter, wallet=wallet,
blob_tracker_class=DummyBlobAvailabilityTracker, dht_node_class=Node)
self.stream_info_manager = TempLiveStreamMetadataManager(hash_announcer)
@ -893,12 +872,10 @@ class TestTransfer(TestCase):
def start_lbry_file(lbry_file):
lbry_file = lbry_file
logging.debug("Calling lbry_file.start()")
return lbry_file.start()
def download_stream(sd_blob_hash):
logging.debug("Downloaded the sd blob. Reading it now")
prm = PaymentRateManager(self.session.base_payment_rate_manager)
prm = self.session.payment_rate_manager
d = download_sd_blob(self.session, sd_blob_hash, prm)
d.addCallback(sd_identifier.get_metadata_for_sd_blob)
d.addCallback(create_downloader, prm)
@ -907,20 +884,17 @@ class TestTransfer(TestCase):
def do_download(sd_blob_hash):
logging.debug("Starting the download")
d = self.session.setup()
d.addCallback(lambda _: enable_live_stream())
d.addCallback(lambda _: download_stream(sd_blob_hash))
return d
def enable_live_stream():
base_live_stream_payment_rate_manager = BaseLiveStreamPaymentRateManager(
MIN_BLOB_INFO_PAYMENT_RATE
)
add_live_stream_to_sd_identifier(sd_identifier,
base_live_stream_payment_rate_manager)
add_live_stream_to_sd_identifier(sd_identifier, self.session.payment_rate_manager)
add_full_live_stream_downloader_to_sd_identifier(self.session, self.stream_info_manager,
sd_identifier,
base_live_stream_payment_rate_manager)
self.session.payment_rate_manager)
d.addCallback(do_download)
@ -951,7 +925,6 @@ class TestTransfer(TestCase):
d.addBoth(stop)
return d
@require_system('Linux')
def test_last_blob_retrieval(self):
kill_event = Event()
@ -976,6 +949,7 @@ class TestTransfer(TestCase):
hash_announcer = FakeAnnouncer()
rate_limiter = DummyRateLimiter()
db_dir = "client"
blob_dir = os.path.join(db_dir, "blobfiles")
os.mkdir(db_dir)
@ -984,7 +958,7 @@ class TestTransfer(TestCase):
self.session = Session(MIN_BLOB_DATA_PAYMENT_RATE, db_dir=db_dir, lbryid="abcd",
peer_finder=peer_finder, hash_announcer=hash_announcer,
blob_dir=blob_dir, peer_port=5553,
use_upnp=False, rate_limiter=rate_limiter, wallet=wallet)
use_upnp=False, rate_limiter=rate_limiter, wallet=wallet, blob_tracker_class=DummyBlobAvailabilityTracker)
d1 = self.wait_for_hash_from_queue(blob_hash_queue_1)
d2 = self.wait_for_hash_from_queue(blob_hash_queue_2)
@ -997,7 +971,7 @@ class TestTransfer(TestCase):
d.addCallback(get_blob_hash)
def download_blob(blob_hash):
prm = PaymentRateManager(self.session.base_payment_rate_manager)
prm = self.session.payment_rate_manager
downloader = StandaloneBlobDownloader(blob_hash, self.session.blob_manager, peer_finder,
rate_limiter, prm, wallet)
d = downloader.download()
@ -1036,7 +1010,6 @@ class TestTransfer(TestCase):
return d
@unittest.skip("Sadly skipping failing test instead of fixing it")
def test_double_download(self):
sd_hash_queue = Queue()
kill_event = Event()
@ -1054,6 +1027,7 @@ class TestTransfer(TestCase):
rate_limiter = DummyRateLimiter()
sd_identifier = StreamDescriptorIdentifier()
downloaders = []
db_dir = "client"
@ -1064,10 +1038,9 @@ class TestTransfer(TestCase):
self.session = Session(MIN_BLOB_DATA_PAYMENT_RATE, db_dir=db_dir, lbryid="abcd",
peer_finder=peer_finder, hash_announcer=hash_announcer,
blob_dir=blob_dir, peer_port=5553, use_upnp=False,
rate_limiter=rate_limiter, wallet=wallet)
rate_limiter=rate_limiter, wallet=wallet, blob_tracker_class=DummyBlobAvailabilityTracker)
self.stream_info_manager = DBEncryptedFileMetadataManager(self.session.db_dir)
self.lbry_file_manager = EncryptedFileManager(self.session, self.stream_info_manager, sd_identifier)
def make_downloader(metadata, prm):
@ -1082,7 +1055,7 @@ class TestTransfer(TestCase):
return downloader
def download_file(sd_hash):
prm = PaymentRateManager(self.session.base_payment_rate_manager)
prm = self.session.payment_rate_manager
d = download_sd_blob(self.session, sd_hash, prm)
d.addCallback(sd_identifier.get_metadata_for_sd_blob)
d.addCallback(make_downloader, prm)
@ -1115,7 +1088,6 @@ class TestTransfer(TestCase):
return d
def start_transfer(sd_hash):
logging.debug("Starting the transfer")
d = self.session.setup()
@ -1172,6 +1144,7 @@ class TestTransfer(TestCase):
rate_limiter = DummyRateLimiter()
sd_identifier = StreamDescriptorIdentifier()
db_dir = "client"
blob_dir = os.path.join(db_dir, "blobfiles")
os.mkdir(db_dir)
@ -1180,7 +1153,7 @@ class TestTransfer(TestCase):
self.session = Session(MIN_BLOB_DATA_PAYMENT_RATE, db_dir=db_dir, lbryid="abcd",
peer_finder=peer_finder, hash_announcer=hash_announcer,
blob_dir=None, peer_port=5553,
use_upnp=False, rate_limiter=rate_limiter, wallet=wallet)
use_upnp=False, rate_limiter=rate_limiter, wallet=wallet, blob_tracker_class=DummyBlobAvailabilityTracker)
self.stream_info_manager = TempEncryptedFileMetadataManager()
@ -1205,7 +1178,7 @@ class TestTransfer(TestCase):
return factories[0].make_downloader(metadata, chosen_options, prm)
def download_file(sd_hash):
prm = PaymentRateManager(self.session.base_payment_rate_manager)
prm = self.session.payment_rate_manager
d = download_sd_blob(self.session, sd_hash, prm)
d.addCallback(sd_identifier.get_metadata_for_sd_blob)
d.addCallback(make_downloader, prm)
@ -1290,6 +1263,7 @@ class TestStreamify(TestCase):
rate_limiter = DummyRateLimiter()
sd_identifier = StreamDescriptorIdentifier()
db_dir = "client"
blob_dir = os.path.join(db_dir, "blobfiles")
os.mkdir(db_dir)
@ -1298,7 +1272,7 @@ class TestStreamify(TestCase):
self.session = Session(MIN_BLOB_DATA_PAYMENT_RATE, db_dir=db_dir, lbryid="abcd",
peer_finder=peer_finder, hash_announcer=hash_announcer,
blob_dir=blob_dir, peer_port=5553,
use_upnp=False, rate_limiter=rate_limiter, wallet=wallet)
use_upnp=False, rate_limiter=rate_limiter, wallet=wallet, blob_tracker_class=DummyBlobAvailabilityTracker)
self.stream_info_manager = TempEncryptedFileMetadataManager()
@ -1342,6 +1316,7 @@ class TestStreamify(TestCase):
rate_limiter = DummyRateLimiter()
sd_identifier = StreamDescriptorIdentifier()
db_dir = "client"
blob_dir = os.path.join(db_dir, "blobfiles")
os.mkdir(db_dir)
@ -1350,7 +1325,7 @@ class TestStreamify(TestCase):
self.session = Session(MIN_BLOB_DATA_PAYMENT_RATE, db_dir=db_dir, lbryid="abcd",
peer_finder=peer_finder, hash_announcer=hash_announcer,
blob_dir=blob_dir, peer_port=5553,
use_upnp=False, rate_limiter=rate_limiter, wallet=wallet)
use_upnp=False, rate_limiter=rate_limiter, wallet=wallet, blob_tracker_class=DummyBlobAvailabilityTracker)
self.stream_info_manager = DBEncryptedFileMetadataManager(self.session.db_dir)
@ -1363,7 +1338,7 @@ class TestStreamify(TestCase):
def combine_stream(stream_hash):
prm = PaymentRateManager(self.session.base_payment_rate_manager)
prm = self.session.payment_rate_manager
d = self.lbry_file_manager.add_lbry_file(stream_hash, prm)
d.addCallback(start_lbry_file)

View file

@ -12,6 +12,7 @@ from lbrynet.core import PeerManager
from lbrynet.core import RateLimiter
from lbrynet.core import Session
from lbrynet.core import StreamDescriptor
from lbrynet.dht.node import Node
from lbrynet.lbryfile import EncryptedFileMetadataManager
from lbrynet.lbryfile.client import EncryptedFileOptions
from lbrynet.lbryfilemanager import EncryptedFileCreator
@ -91,7 +92,9 @@ class TestReflector(unittest.TestCase):
peer_port=5553,
use_upnp=False,
rate_limiter=rate_limiter,
wallet=wallet
wallet=wallet,
blob_tracker_class=mocks.DummyBlobAvailabilityTracker,
dht_node_class=Node
)
self.stream_info_manager = EncryptedFileMetadataManager.TempEncryptedFileMetadataManager()

View file

@ -1,9 +1,11 @@
import io
from Crypto.PublicKey import RSA
from decimal import Decimal
from twisted.internet import defer, threads, task, error
from lbrynet.core import PTCWallet
from lbrynet.core.BlobAvailability import BlobAvailabilityTracker
class Node(object):
@ -134,6 +136,43 @@ class GenFile(io.RawIOBase):
return output
class DummyBlobAvailabilityTracker(BlobAvailabilityTracker):
"""
Class to track peer counts for known blobs, and to discover new popular blobs
Attributes:
availability (dict): dictionary of peers for known blobs
"""
def __init__(self, blob_manager=None, peer_finder=None, dht_node=None):
self.availability = {
'91dc64cf1ff42e20d627b033ad5e4c3a4a96856ed8a6e3fb4cd5fa1cfba4bf72eefd325f579db92f45f4355550ace8e7': ['1.2.3.4'],
'b2e48bb4c88cf46b76adf0d47a72389fae0cd1f19ed27dc509138c99509a25423a4cef788d571dca7988e1dca69e6fa0': ['1.2.3.4', '1.2.3.4'],
'6af95cd062b4a179576997ef1054c9d2120f8592eea045e9667bea411d520262cd5a47b137eabb7a7871f5f8a79c92dd': ['1.2.3.4', '1.2.3.4', '1.2.3.4'],
'6d8017aba362e5c5d0046625a039513419810a0397d728318c328a5cc5d96efb589fbca0728e54fe5adbf87e9545ee07': ['1.2.3.4', '1.2.3.4', '1.2.3.4', '1.2.3.4'],
'5a450b416275da4bdff604ee7b58eaedc7913c5005b7184fc3bc5ef0b1add00613587f54217c91097fc039ed9eace9dd': ['1.2.3.4', '1.2.3.4', '1.2.3.4', '1.2.3.4', '1.2.3.4'],
'd7c82e6cac093b3f16107d2ae2b2c75424f1fcad2c7fbdbe66e4a13c0b6bd27b67b3a29c403b82279ab0f7c1c48d6787': ['1.2.3.4', '1.2.3.4', '1.2.3.4', '1.2.3.4', '1.2.3.4', '1.2.3.4'],
'9dbda74a472a2e5861a5d18197aeba0f5de67c67e401124c243d2f0f41edf01d7a26aeb0b5fc9bf47f6361e0f0968e2c': ['1.2.3.4', '1.2.3.4', '1.2.3.4', '1.2.3.4', '1.2.3.4', '1.2.3.4', '1.2.3.4'],
'8c70d5e2f5c3a6085006198e5192d157a125d92e7378794472007a61947992768926513fc10924785bdb1761df3c37e6': ['1.2.3.4', '1.2.3.4', '1.2.3.4', '1.2.3.4', '1.2.3.4', '1.2.3.4', '1.2.3.4', '1.2.3.4'],
'f99d24cd50d4bfd77c2598bfbeeb8415bf0feef21200bdf0b8fbbde7751a77b7a2c68e09c25465a2f40fba8eecb0b4e0': ['1.2.3.4', '1.2.3.4', '1.2.3.4', '1.2.3.4', '1.2.3.4', '1.2.3.4', '1.2.3.4', '1.2.3.4', '1.2.3.4'],
'c84aa1fd8f5009f7c4e71e444e40d95610abc1480834f835eefb267287aeb10025880a3ce22580db8c6d92efb5bc0c9c': ['1.2.3.4', '1.2.3.4', '1.2.3.4', '1.2.3.4', '1.2.3.4', '1.2.3.4', '1.2.3.4', '1.2.3.4', '1.2.3.4', '1.2.3.4'],
}
self.last_mean_availability = Decimal(0.0)
self._blob_manager = None
self._peer_finder = PeerFinder(11223, 11224, 2)
self._dht_node = None
self._check_popular = None
self._check_mine = None
self._get_mean_peers()
def start(self):
pass
def stop(self):
pass
create_stream_sd_file = {
'stream_name': '746573745f66696c65',
'blobs': [

View file

@ -1,24 +1,24 @@
import StringIO
import mock
from twisted.internet import defer, protocol
from twisted.internet import defer
from twisted.test import proto_helpers
from twisted.trial import unittest
from lbrynet.core import Peer
from lbrynet.core.server import BlobRequestHandler
from lbrynet.core.PaymentRateManager import NegotiatedPaymentRateManager, BasePaymentRateManager
from tests.mocks import DummyBlobAvailabilityTracker
class TestBlobRequestHandlerQueries(unittest.TestCase):
def setUp(self):
self.blob_manager = mock.Mock()
self.payment_rate_manager = mock.Mock()
self.handler = BlobRequestHandler.BlobRequestHandler(
self.blob_manager, None, self.payment_rate_manager)
self.payment_rate_manager = NegotiatedPaymentRateManager(BasePaymentRateManager(0.001), DummyBlobAvailabilityTracker())
self.handler = BlobRequestHandler.BlobRequestHandler(self.blob_manager, None, self.payment_rate_manager)
def test_empty_response_when_empty_query(self):
self.assertEqual(
{}, self.successResultOf(self.handler.handle_queries({})))
self.assertEqual({}, self.successResultOf(self.handler.handle_queries({})))
def test_error_set_when_rate_is_missing(self):
query = {'requested_blob': 'blob'}
@ -27,9 +27,8 @@ class TestBlobRequestHandlerQueries(unittest.TestCase):
self.assertEqual(response, self.successResultOf(deferred))
def test_error_set_when_rate_too_low(self):
self.payment_rate_manager.accept_rate_blob_data.return_value = False
query = {
'blob_data_payment_rate': 'way_too_low',
'blob_data_payment_rate': '-1.0',
'requested_blob': 'blob'
}
deferred = self.handler.handle_queries(query)
@ -40,9 +39,8 @@ class TestBlobRequestHandlerQueries(unittest.TestCase):
self.assertEqual(response, self.successResultOf(deferred))
def test_response_when_rate_too_low(self):
self.payment_rate_manager.accept_rate_blob_data.return_value = False
query = {
'blob_data_payment_rate': 'way_too_low',
'blob_data_payment_rate': '-1.0',
}
deferred = self.handler.handle_queries(query)
response = {
@ -51,12 +49,11 @@ class TestBlobRequestHandlerQueries(unittest.TestCase):
self.assertEqual(response, self.successResultOf(deferred))
def test_blob_unavailable_when_blob_not_validated(self):
self.payment_rate_manager.accept_rate_blob_data.return_value = True
blob = mock.Mock()
blob.is_validated.return_value = False
self.blob_manager.get_blob.return_value = defer.succeed(blob)
query = {
'blob_data_payment_rate': 'rate',
'blob_data_payment_rate': 1.0,
'requested_blob': 'blob'
}
deferred = self.handler.handle_queries(query)
@ -67,13 +64,12 @@ class TestBlobRequestHandlerQueries(unittest.TestCase):
self.assertEqual(response, self.successResultOf(deferred))
def test_blob_unavailable_when_blob_cannot_be_opened(self):
self.payment_rate_manager.accept_rate_blob_data.return_value = True
blob = mock.Mock()
blob.is_validated.return_value = True
blob.open_for_reading.return_value = None
self.blob_manager.get_blob.return_value = defer.succeed(blob)
query = {
'blob_data_payment_rate': 'rate',
'blob_data_payment_rate': 0.0,
'requested_blob': 'blob'
}
deferred = self.handler.handle_queries(query)
@ -84,15 +80,17 @@ class TestBlobRequestHandlerQueries(unittest.TestCase):
self.assertEqual(response, self.successResultOf(deferred))
def test_blob_details_are_set_when_all_conditions_are_met(self):
self.payment_rate_manager.accept_rate_blob_data.return_value = True
blob = mock.Mock()
blob.is_validated.return_value = True
blob.open_for_reading.return_value = True
blob.blob_hash = 'DEADBEEF'
blob.length = 42
peer = mock.Mock()
peer.host = "1.2.3.4"
self.handler.peer = peer
self.blob_manager.get_blob.return_value = defer.succeed(blob)
query = {
'blob_data_payment_rate': 'rate',
'blob_data_payment_rate': 1.0,
'requested_blob': 'blob'
}
deferred = self.handler.handle_queries(query)
@ -103,7 +101,8 @@ class TestBlobRequestHandlerQueries(unittest.TestCase):
'length': 42
}
}
self.assertEqual(response, self.successResultOf(deferred))
result = self.successResultOf(deferred)
self.assertEqual(response, result)
class TestBlobRequestHandlerSender(unittest.TestCase):

View file

@ -0,0 +1,125 @@
import itertools
from twisted.trial import unittest
import random
import mock
from lbrynet.core.PaymentRateManager import NegotiatedPaymentRateManager, BasePaymentRateManager
from lbrynet.core.Strategy import BasicAvailabilityWeightedStrategy
from lbrynet.core.Offer import Offer
from tests.mocks import DummyBlobAvailabilityTracker
MAX_NEGOTIATION_TURNS = 10
random.seed(12345)
def get_random_sample(list_to_sample):
result = list_to_sample[random.randint(1, len(list_to_sample)):random.randint(1, len(list_to_sample))]
if not result:
return get_random_sample(list_to_sample)
return result
def calculate_negotation_turns(client_base, host_base, host_is_generous=True, client_is_generous=True):
blobs = [
'b2e48bb4c88cf46b76adf0d47a72389fae0cd1f19ed27dc509138c99509a25423a4cef788d571dca7988e1dca69e6fa0',
'd7c82e6cac093b3f16107d2ae2b2c75424f1fcad2c7fbdbe66e4a13c0b6bd27b67b3a29c403b82279ab0f7c1c48d6787',
'5a450b416275da4bdff604ee7b58eaedc7913c5005b7184fc3bc5ef0b1add00613587f54217c91097fc039ed9eace9dd',
'f99d24cd50d4bfd77c2598bfbeeb8415bf0feef21200bdf0b8fbbde7751a77b7a2c68e09c25465a2f40fba8eecb0b4e0',
'9dbda74a472a2e5861a5d18197aeba0f5de67c67e401124c243d2f0f41edf01d7a26aeb0b5fc9bf47f6361e0f0968e2c',
'91dc64cf1ff42e20d627b033ad5e4c3a4a96856ed8a6e3fb4cd5fa1cfba4bf72eefd325f579db92f45f4355550ace8e7',
'6d8017aba362e5c5d0046625a039513419810a0397d728318c328a5cc5d96efb589fbca0728e54fe5adbf87e9545ee07',
'6af95cd062b4a179576997ef1054c9d2120f8592eea045e9667bea411d520262cd5a47b137eabb7a7871f5f8a79c92dd',
'8c70d5e2f5c3a6085006198e5192d157a125d92e7378794472007a61947992768926513fc10924785bdb1761df3c37e6',
'c84aa1fd8f5009f7c4e71e444e40d95610abc1480834f835eefb267287aeb10025880a3ce22580db8c6d92efb5bc0c9c'
]
host = mock.Mock()
host.host = "1.2.3.4"
client = mock.Mock()
client.host = "1.2.3.5"
client_base_prm = BasePaymentRateManager(client_base)
client_prm = NegotiatedPaymentRateManager(client_base_prm,
DummyBlobAvailabilityTracker(),
generous=client_is_generous)
host_base_prm = BasePaymentRateManager(host_base)
host_prm = NegotiatedPaymentRateManager(host_base_prm,
DummyBlobAvailabilityTracker(),
generous=host_is_generous)
blobs_to_query = get_random_sample(blobs)
accepted = False
turns = 0
while not accepted:
rate = client_prm.get_rate_blob_data(host, blobs_to_query)
offer = Offer(rate)
accepted = host_prm.accept_rate_blob_data(client, blobs_to_query, offer)
turns += 1
return turns
class AvailabilityWeightedStrategyTests(unittest.TestCase):
def test_first_offer_is_zero_and_second_is_not_if_offer_not_accepted(self):
strategy = BasicAvailabilityWeightedStrategy(DummyBlobAvailabilityTracker())
peer = "1.1.1.1"
blobs = strategy.price_model.blob_tracker.availability.keys()
offer1 = strategy.make_offer(peer, blobs)
offer2 = strategy.make_offer(peer, blobs)
self.assertEquals(offer1.rate, 0.0)
self.assertNotEqual(offer2.rate, 0.0)
def test_accept_zero_and_persist_if_accepted(self):
host_strategy = BasicAvailabilityWeightedStrategy(DummyBlobAvailabilityTracker())
client_strategy = BasicAvailabilityWeightedStrategy(DummyBlobAvailabilityTracker())
client = "1.1.1.1"
host = "1.1.1.2"
blobs = host_strategy.price_model.blob_tracker.availability.keys()
offer = client_strategy.make_offer(host, blobs)
response1 = host_strategy.respond_to_offer(offer, client, blobs)
client_strategy.update_accepted_offers(host, response1)
offer = client_strategy.make_offer(host, blobs)
response2 = host_strategy.respond_to_offer(offer, client, blobs)
client_strategy.update_accepted_offers(host, response2)
self.assertEquals(response1.is_too_low, False)
self.assertEquals(response1.is_accepted, True)
self.assertEquals(response1.rate, 0.0)
self.assertEquals(response2.is_too_low, False)
self.assertEquals(response2.is_accepted, True)
self.assertEquals(response2.rate, 0.0)
def test_how_many_turns_before_accept_with_similar_rate_settings(self):
base_rates = [0.0001 * n for n in range(1, 10)]
for host_base, client_base in itertools.product(base_rates, base_rates):
turns = calculate_negotation_turns(host_base,
client_base,
client_is_generous=False,
host_is_generous=False)
self.assertGreater(MAX_NEGOTIATION_TURNS, turns)
def test_generous_connects_in_one_turn(self):
base_rates = [0.0001 * n for n in range(1, 10)]
for host_base, client_base in itertools.product(base_rates, base_rates):
turns = calculate_negotation_turns(host_base, client_base)
self.assertEqual(1, turns)
def test_how_many_turns_with_generous_client(self):
base_rates = [0.0001 * n for n in range(1, 10)]
for host_base, client_base in itertools.product(base_rates, base_rates):
turns = calculate_negotation_turns(host_base,
client_base,
host_is_generous=False)
self.assertGreater(MAX_NEGOTIATION_TURNS, turns)
def test_how_many_turns_with_generous_host(self):
base_rates = [0.0001 * n for n in range(1, 10)]
for host_base, client_base in itertools.product(base_rates, base_rates):
turns = calculate_negotation_turns(host_base,
client_base,
client_is_generous=False)
self.assertGreater(MAX_NEGOTIATION_TURNS, turns)