diff --git a/lbrynet/conf.py b/lbrynet/conf.py index cc040be6f..785d3d42f 100644 --- a/lbrynet/conf.py +++ b/lbrynet/conf.py @@ -14,7 +14,7 @@ MAX_BLOB_INFOS_TO_REQUEST = 20 BLOBFILES_DIR = ".blobfiles" BLOB_SIZE = 2**21 -MIN_BLOB_DATA_PAYMENT_RATE = .005 # points/megabyte +MIN_BLOB_DATA_PAYMENT_RATE = .0001 # points/megabyte MIN_BLOB_INFO_PAYMENT_RATE = .02 # points/1000 infos MIN_VALUABLE_BLOB_INFO_PAYMENT_RATE = .05 # points/1000 infos MIN_VALUABLE_BLOB_HASH_PAYMENT_RATE = .05 # points/1000 infos @@ -28,12 +28,10 @@ KNOWN_DHT_NODES = [('104.236.42.182', 4000), POINTTRADER_SERVER = 'http://ec2-54-187-192-68.us-west-2.compute.amazonaws.com:2424' #POINTTRADER_SERVER = 'http://127.0.0.1:2424' -if IS_DEVELOPMENT_VERSION: - SEARCH_SERVERS = ["http://107.170.207.64:50005"] -else: - SEARCH_SERVERS = ["http://lighthouse1.lbry.io:50005", - "http://lighthouse2.lbry.io:50005", - "http://lighthouse3.lbry.io:50005"] + +SEARCH_SERVERS = ["http://lighthouse1.lbry.io:50005", + "http://lighthouse2.lbry.io:50005", + "http://lighthouse3.lbry.io:50005"] REFLECTOR_SERVERS = [("reflector.lbry.io", 5566)] diff --git a/lbrynet/core/BlobAvailability.py b/lbrynet/core/BlobAvailability.py new file mode 100644 index 000000000..f003b2770 --- /dev/null +++ b/lbrynet/core/BlobAvailability.py @@ -0,0 +1,89 @@ +import logging + +from twisted.internet import defer +from twisted.internet.task import LoopingCall +from decimal import Decimal + +log = logging.getLogger(__name__) + + +class BlobAvailabilityTracker(object): + """ + Class to track peer counts for known blobs, and to discover new popular blobs + + Attributes: + availability (dict): dictionary of peers for known blobs + """ + + def __init__(self, blob_manager, peer_finder, dht_node): + self.availability = {} + self.last_mean_availability = Decimal(0.0) + self._blob_manager = blob_manager + self._peer_finder = peer_finder + self._dht_node = dht_node + self._check_popular = LoopingCall(self._update_most_popular) + self._check_mine = LoopingCall(self._update_mine) + + def start(self): + log.info("Starting blob tracker") + self._check_popular.start(30) + self._check_mine.start(120) + + def stop(self): + if self._check_popular.running: + self._check_popular.stop() + if self._check_mine.running: + self._check_mine.stop() + + def get_blob_availability(self, blob): + def _get_peer_count(peers): + have_blob = sum(1 for peer in peers if peer.is_available()) + return {blob: have_blob} + + d = self._peer_finder.find_peers_for_blob(blob) + d.addCallback(_get_peer_count) + return d + + def get_availability_for_blobs(self, blobs): + dl = [self.get_blob_availability(blob) for blob in blobs if blob] + d = defer.DeferredList(dl) + d.addCallback(lambda results: [val for success, val in results if success]) + return d + + def _update_peers_for_blob(self, blob): + def _save_peer_info(blob_hash, peers): + v = {blob_hash: peers} + self.availability.update(v) + return v + + d = self._peer_finder.find_peers_for_blob(blob) + d.addCallback(lambda r: [[c.host, c.port, c.is_available()] for c in r]) + d.addCallback(lambda peers: _save_peer_info(blob, peers)) + return d + + def _get_most_popular(self): + dl = [] + for (hash, _) in self._dht_node.get_most_popular_hashes(100): + encoded = hash.encode('hex') + dl.append(self._update_peers_for_blob(encoded)) + return defer.DeferredList(dl) + + def _update_most_popular(self): + d = self._get_most_popular() + d.addCallback(lambda _: self._get_mean_peers()) + + def _update_mine(self): + def _get_peers(blobs): + dl = [] + for hash in blobs: + dl.append(self._update_peers_for_blob(hash)) + return defer.DeferredList(dl) + + d = self._blob_manager.get_all_verified_blobs() + d.addCallback(_get_peers) + d.addCallback(lambda _: self._get_mean_peers()) + + def _get_mean_peers(self): + num_peers = [len(self.availability[blob]) for blob in self.availability] + mean = Decimal(sum(num_peers)) / Decimal(max(1, len(num_peers))) + self.last_mean_availability = mean diff --git a/lbrynet/core/BlobManager.py b/lbrynet/core/BlobManager.py index fc37a3b7e..5ae593fcb 100644 --- a/lbrynet/core/BlobManager.py +++ b/lbrynet/core/BlobManager.py @@ -2,6 +2,7 @@ import logging import os import time import sqlite3 + from twisted.internet import threads, defer from twisted.python.failure import Failure from twisted.enterprise import adbapi @@ -12,7 +13,6 @@ from lbrynet.core.cryptoutils import get_lbry_hash_obj from lbrynet.core.Error import NoSuchBlobError from lbrynet.core.sqlite_helpers import rerun_if_locked - log = logging.getLogger(__name__) @@ -70,6 +70,12 @@ class BlobManager(DHTHashSupplier): def get_all_verified_blobs(self): pass + def add_blob_to_download_history(self, blob_hash, host, rate): + pass + + def add_blob_to_upload_history(self, blob_hash, host, rate): + pass + class DiskBlobManager(BlobManager): """This class stores blobs on the hard disk""" @@ -185,6 +191,14 @@ class DiskBlobManager(BlobManager): d.addCallback(self.completed_blobs) return d + def add_blob_to_download_history(self, blob_hash, host, rate): + d = self._add_blob_to_download_history(blob_hash, host, rate) + return d + + def add_blob_to_upload_history(self, blob_hash, host, rate): + d = self._add_blob_to_upload_history(blob_hash, host, rate) + return d + def _manage(self): from twisted.internet import reactor @@ -243,12 +257,29 @@ class DiskBlobManager(BlobManager): # one that opened it. The individual connections in the pool are not used in multiple # threads. self.db_conn = adbapi.ConnectionPool('sqlite3', self.db_file, check_same_thread=False) - return self.db_conn.runQuery("create table if not exists blobs (" + - " blob_hash text primary key, " + - " blob_length integer, " + - " last_verified_time real, " + - " next_announce_time real" - ")") + + def create_tables(transaction): + transaction.execute("create table if not exists blobs (" + + " blob_hash text primary key, " + + " blob_length integer, " + + " last_verified_time real, " + + " next_announce_time real)") + + transaction.execute("create table if not exists download (" + + " id integer primary key autoincrement, " + + " blob text, " + + " host text, " + + " rate float, " + + " ts integer)") + + transaction.execute("create table if not exists upload (" + + " id integer primary key autoincrement, " + + " blob text, " + + " host text, " + + " rate float, " + + " ts integer)") + + return self.db_conn.runInteraction(create_tables) @rerun_if_locked def _add_completed_blob(self, blob_hash, length, timestamp, next_announce_time=None): @@ -426,6 +457,18 @@ class DiskBlobManager(BlobManager): d.addCallback(lambda blobs: threads.deferToThread(get_verified_blobs, blobs)) return d + @rerun_if_locked + def _add_blob_to_download_history(self, blob_hash, host, rate): + ts = int(time.time()) + d = self.db_conn.runQuery("insert into download values (null, ?, ?, ?, ?) ", (blob_hash, str(host), float(rate), ts)) + return d + + @rerun_if_locked + def _add_blob_to_upload_history(self, blob_hash, host, rate): + ts = int(time.time()) + d = self.db_conn.runQuery("insert into upload values (null, ?, ?, ?, ?) ", (blob_hash, str(host), float(rate), ts)) + return d + class TempBlobManager(BlobManager): """This class stores blobs in memory""" @@ -526,7 +569,6 @@ class TempBlobManager(BlobManager): d.addCallback(lambda _: set_next_manage_call()) def _delete_blobs_marked_for_deletion(self): - def remove_from_list(b_h): del self.blob_hashes_to_delete[b_h] log.info("Deleted blob %s", blob_hash) @@ -555,4 +597,4 @@ class TempBlobManager(BlobManager): d = defer.fail(Failure(NoSuchBlobError(blob_hash))) log.warning("Blob %s cannot be deleted because it is unknown") ds.append(d) - return defer.DeferredList(ds) \ No newline at end of file + return defer.DeferredList(ds) diff --git a/lbrynet/core/Error.py b/lbrynet/core/Error.py index 8146dc169..dfc9bfe98 100644 --- a/lbrynet/core/Error.py +++ b/lbrynet/core/Error.py @@ -87,4 +87,8 @@ class NoSuchStreamHashError(Exception): class InvalidBlobHashError(Exception): + pass + + +class NegotiationError(Exception): pass \ No newline at end of file diff --git a/lbrynet/core/Offer.py b/lbrynet/core/Offer.py new file mode 100644 index 000000000..48b4b56fa --- /dev/null +++ b/lbrynet/core/Offer.py @@ -0,0 +1,62 @@ +from decimal import Decimal + + +class Offer(object): + """A rate offer to download blobs from a host.""" + + RATE_ACCEPTED = "RATE_ACCEPTED" + RATE_TOO_LOW = "RATE_TOO_LOW" + RATE_UNSET = "RATE_UNSET" + + def __init__(self, offer): + self._state = None + self.rate = None + if isinstance(offer, Decimal): + self.rate = round(offer, 5) + elif isinstance(offer, float): + self.rate = round(Decimal(offer), 5) + if self.rate is None or self.rate < Decimal(0.0): + self.unset() + + @property + def is_accepted(self): + return self._state is Offer.RATE_ACCEPTED + + @property + def is_too_low(self): + return self._state is Offer.RATE_TOO_LOW + + @property + def is_unset(self): + return self._state is Offer.RATE_UNSET + + @property + def message(self): + if self.is_accepted: + return Offer.RATE_ACCEPTED + elif self.is_too_low: + return Offer.RATE_TOO_LOW + elif self.is_unset: + return Offer.RATE_UNSET + return None + + def accept(self): + if self.is_unset or self._state is None: + self._state = Offer.RATE_ACCEPTED + + def reject(self): + if self.is_unset or self._state is None: + self._state = Offer.RATE_TOO_LOW + + def unset(self): + self._state = Offer.RATE_UNSET + + def handle(self, reply_message): + if reply_message == Offer.RATE_TOO_LOW: + self.reject() + elif reply_message == Offer.RATE_ACCEPTED: + self.accept() + elif reply_message == Offer.RATE_UNSET: + self.unset() + else: + raise Exception("Unknown offer reply %s" % str(reply_message)) \ No newline at end of file diff --git a/lbrynet/core/PaymentRateManager.py b/lbrynet/core/PaymentRateManager.py index a18882ac0..914e21947 100644 --- a/lbrynet/core/PaymentRateManager.py +++ b/lbrynet/core/PaymentRateManager.py @@ -1,6 +1,10 @@ +from lbrynet.core.Strategy import get_default_strategy +from lbrynet.conf import MIN_BLOB_DATA_PAYMENT_RATE, MIN_BLOB_INFO_PAYMENT_RATE + class BasePaymentRateManager(object): - def __init__(self, rate): + def __init__(self, rate=MIN_BLOB_DATA_PAYMENT_RATE, info_rate=MIN_BLOB_INFO_PAYMENT_RATE): self.min_blob_data_payment_rate = rate + self.min_blob_info_payment_rate = info_rate class PaymentRateManager(object): @@ -26,4 +30,44 @@ class PaymentRateManager(object): return self.min_blob_data_payment_rate def record_points_paid(self, amount): - self.points_paid += amount \ No newline at end of file + self.points_paid += amount + + +class NegotiatedPaymentRateManager(object): + def __init__(self, base, availability_tracker, generous=True): + """ + @param base: a BasePaymentRateManager + @param availability_tracker: a BlobAvailabilityTracker + @param rate: the min blob data payment rate + """ + + self.base = base + self.points_paid = 0.0 + self.blob_tracker = availability_tracker + self.generous = generous + self.strategy = get_default_strategy(self.blob_tracker, + base_price=self.base.min_blob_data_payment_rate, + is_generous=generous) + + def get_rate_blob_data(self, peer, blobs): + response = self.strategy.make_offer(peer, blobs) + return response.rate + + def accept_rate_blob_data(self, peer, blobs, offer): + offer = self.strategy.respond_to_offer(offer, peer, blobs) + self.strategy.update_accepted_offers(peer, offer) + return offer.is_accepted + + def reply_to_offer(self, peer, blobs, offer): + reply = self.strategy.respond_to_offer(offer, peer, blobs) + self.strategy.update_accepted_offers(peer, reply) + return reply + + def get_rate_for_peer(self, peer): + return self.strategy.accepted_offers.get(peer, False) + + def record_points_paid(self, amount): + self.points_paid += amount + + def record_offer_reply(self, peer, offer): + self.strategy.update_accepted_offers(peer, offer) \ No newline at end of file diff --git a/lbrynet/core/Peer.py b/lbrynet/core/Peer.py index ba2cc80ee..3705f6e68 100644 --- a/lbrynet/core/Peer.py +++ b/lbrynet/core/Peer.py @@ -1,5 +1,6 @@ -from collections import defaultdict import datetime +from collections import defaultdict +from lbrynet.core import utils class Peer(object): @@ -12,8 +13,7 @@ class Peer(object): self.stats = defaultdict(float) # {string stat_type, float count} def is_available(self): - if (self.attempt_connection_at is None or - datetime.datetime.today() > self.attempt_connection_at): + if self.attempt_connection_at is None or utils.today() > self.attempt_connection_at: return True return False @@ -24,7 +24,7 @@ class Peer(object): def report_down(self): self.down_count += 1 timeout_time = datetime.timedelta(seconds=60 * self.down_count) - self.attempt_connection_at = datetime.datetime.today() + timeout_time + self.attempt_connection_at = utils.today() + timeout_time def update_score(self, score_change): self.score += score_change @@ -37,4 +37,3 @@ class Peer(object): def __repr__(self): return 'Peer({!r}, {!r})'.format(self.host, self.port) - diff --git a/lbrynet/core/PriceModel.py b/lbrynet/core/PriceModel.py new file mode 100644 index 000000000..299d50223 --- /dev/null +++ b/lbrynet/core/PriceModel.py @@ -0,0 +1,47 @@ +from zope.interface import implementer +from decimal import Decimal + +from lbrynet.interfaces import IBlobPriceModel +from lbrynet.conf import MIN_BLOB_DATA_PAYMENT_RATE + + +def get_default_price_model(blob_tracker, base_price, **kwargs): + return MeanAvailabilityWeightedPrice(blob_tracker, base_price, **kwargs) + + +class MeanAvailabilityWeightedPrice(object): + """ + Calculate mean-blob-availability and stream-position weighted price for a blob + + Attributes: + base_price (float): base price + alpha (float): constant, > 0.0 and <= 1.0, used to more highly value blobs at the beginning of a stream. + alpha defaults to 1.0, which has a null effect + blob_tracker (BlobAvailabilityTracker): blob availability tracker + """ + implementer(IBlobPriceModel) + + def __init__(self, tracker, base_price=MIN_BLOB_DATA_PAYMENT_RATE, alpha=1.0): + self.blob_tracker = tracker + self.base_price = Decimal(base_price) + self.alpha = Decimal(alpha) + + def calculate_price(self, blob): + mean_availability = self.blob_tracker.last_mean_availability + availability = self.blob_tracker.availability.get(blob, []) + index = 0 # blob.index + price = self.base_price * (mean_availability / Decimal(max(1, len(availability)))) / self._frontload(index) + return round(price, 5) + + def _frontload(self, index): + """ + Get front-load multiplier, used to weight prices of blobs in a stream towards the front of the stream. + + At index 0, returns 1.0 + As index increases, return value approaches 2.0 + + @param index: blob position in stream + @return: front-load multiplier + """ + + return Decimal(2.0) - (self.alpha ** index) diff --git a/lbrynet/core/Session.py b/lbrynet/core/Session.py index 34cfdf529..1b0fb700b 100644 --- a/lbrynet/core/Session.py +++ b/lbrynet/core/Session.py @@ -9,7 +9,8 @@ from lbrynet.core.client.DHTPeerFinder import DHTPeerFinder from lbrynet.core.HashAnnouncer import DummyHashAnnouncer from lbrynet.core.server.DHTHashAnnouncer import DHTHashAnnouncer from lbrynet.core.utils import generate_id -from lbrynet.core.PaymentRateManager import BasePaymentRateManager +from lbrynet.core.PaymentRateManager import BasePaymentRateManager, NegotiatedPaymentRateManager +from lbrynet.core.BlobAvailability import BlobAvailabilityTracker from twisted.internet import threads, defer @@ -26,9 +27,9 @@ class Session(object): the rate limiter, which attempts to ensure download and upload rates stay below a set maximum, and upnp, which opens holes in compatible firewalls so that remote peers can connect to this peer.""" def __init__(self, blob_data_payment_rate, db_dir=None, lbryid=None, peer_manager=None, dht_node_port=None, - known_dht_nodes=None, peer_finder=None, hash_announcer=None, - blob_dir=None, blob_manager=None, peer_port=None, use_upnp=True, - rate_limiter=None, wallet=None, dht_node_class=node.Node): + known_dht_nodes=None, peer_finder=None, hash_announcer=None, blob_dir=None, blob_manager=None, + peer_port=None, use_upnp=True, rate_limiter=None, wallet=None, dht_node_class=node.Node, + blob_tracker_class=None, payment_rate_manager_class=None): """ @param blob_data_payment_rate: The default payment rate for blob data @@ -88,6 +89,9 @@ class Session(object): self.blob_dir = blob_dir self.blob_manager = blob_manager + self.blob_tracker = None + self.blob_tracker_class = blob_tracker_class or BlobAvailabilityTracker + self.peer_port = peer_port self.use_upnp = use_upnp @@ -103,6 +107,8 @@ class Session(object): self.dht_node = None self.base_payment_rate_manager = BasePaymentRateManager(blob_data_payment_rate) + self.payment_rate_manager = None + self.payment_rate_manager_class = payment_rate_manager_class or NegotiatedPaymentRateManager def setup(self): """Create the blob directory and database if necessary, start all desired services""" @@ -136,6 +142,8 @@ class Session(object): def shut_down(self): """Stop all services""" ds = [] + if self.blob_manager is not None: + ds.append(defer.maybeDeferred(self.blob_tracker.stop)) if self.dht_node is not None: ds.append(defer.maybeDeferred(self.dht_node.stop)) if self.rate_limiter is not None: @@ -253,15 +261,26 @@ class Session(object): if self.blob_dir is None: self.blob_manager = TempBlobManager(self.hash_announcer) else: - self.blob_manager = DiskBlobManager(self.hash_announcer, self.blob_dir, self.db_dir) + self.blob_manager = DiskBlobManager(self.hash_announcer, + self.blob_dir, + self.db_dir) + + if self.blob_tracker is None: + self.blob_tracker = self.blob_tracker_class(self.blob_manager, + self.peer_finder, + self.dht_node) + if self.payment_rate_manager is None: + self.payment_rate_manager = self.payment_rate_manager_class(self.base_payment_rate_manager, + self.blob_tracker) self.rate_limiter.start() d1 = self.blob_manager.setup() d2 = self.wallet.start() dl = defer.DeferredList([d1, d2], fireOnOneErrback=True, consumeErrors=True) + dl.addCallback(lambda _: self.blob_tracker.start()) - dl.addErrback(lambda err: err.value.subFailure) + dl.addErrback(self._subfailure) return dl def _unset_upnp(self): @@ -282,3 +301,9 @@ class Session(object): d = threads.deferToThread(threaded_unset_upnp) d.addErrback(lambda err: str(err)) return d + + def _subfailure(self, err): + log.error(err.getTraceback()) + return err.value + + diff --git a/lbrynet/core/Strategy.py b/lbrynet/core/Strategy.py new file mode 100644 index 000000000..80fff00cb --- /dev/null +++ b/lbrynet/core/Strategy.py @@ -0,0 +1,124 @@ +from zope.interface import implementer +from decimal import Decimal +from lbrynet.interfaces import INegotiationStrategy +from lbrynet.core.Offer import Offer +from lbrynet.core.PriceModel import MeanAvailabilityWeightedPrice + + +def get_default_strategy(blob_tracker, **kwargs): + return BasicAvailabilityWeightedStrategy(blob_tracker, **kwargs) + + +class Strategy(object): + """ + Base for negotiation strategies + """ + implementer(INegotiationStrategy) + + def __init__(self, price_model, max_rate, min_rate, is_generous=True): + self.price_model = price_model + self.is_generous = is_generous + self.accepted_offers = {} + self.offers_sent = {} + self.offers_received = {} + self.max_rate = max_rate or Decimal(self.price_model.base_price * 100) + self.min_rate = Decimal(min_rate) + + def _make_rate_offer(self, rates, offer_count): + return NotImplementedError() + + def _get_response_rate(self, rates, offer_count): + return NotImplementedError() + + def make_offer(self, peer, blobs): + offer_count = self.offers_sent.get(peer, 0) + self._add_offer_sent(peer) + if peer in self.accepted_offers: + # if there was a previous accepted offer, use that + offer = self.accepted_offers[peer] + elif offer_count == 0 and self.is_generous: + # Try asking for it for free + offer = Offer(Decimal(0.0)) + else: + rates = [self.price_model.calculate_price(blob) for blob in blobs] + price = self._make_rate_offer(rates, offer_count) + offer = Offer(price) + return offer + + def respond_to_offer(self, offer, peer, blobs): + offer_count = self.offers_received.get(peer, 0) + self._add_offer_received(peer) + rates = [self.price_model.calculate_price(blob) for blob in blobs] + price = self._get_response_rate(rates, offer_count) + + if peer in self.accepted_offers: + offer = self.accepted_offers[peer] + elif offer.rate == 0.0 and offer_count == 0 and self.is_generous: + # give blobs away for free by default on the first request + offer.accept() + self.accepted_offers.update({peer: offer}) + elif offer.rate >= price: + offer.accept() + self.accepted_offers.update({peer: offer}) + else: + offer.reject() + if peer in self.accepted_offers: + del self.accepted_offers[peer] + return offer + + def update_accepted_offers(self, peer, offer): + if not offer.is_accepted and peer in self.accepted_offers: + del self.accepted_offers[peer] + if offer.is_accepted: + self.accepted_offers.update({peer: offer}) + + def _add_offer_sent(self, peer): + turn = self.offers_sent.get(peer, 0) + 1 + self.offers_sent.update({peer: turn}) + + def _add_offer_received(self, peer): + turn = self.offers_received.get(peer, 0) + 1 + self.offers_received.update({peer: turn}) + + def _bounded_price(self, price): + price_for_return = Decimal(min(self.max_rate, max(price, self.min_rate))) + return price_for_return + + +class BasicAvailabilityWeightedStrategy(Strategy): + """ + Basic strategy to target blob prices based on supply relative to mean supply + + Discount price target with each incoming request, and raise it with each outgoing from the modeled price + until the rate is accepted or a threshold is reached + """ + implementer(INegotiationStrategy) + + def __init__(self, blob_tracker, acceleration=1.25, deceleration=0.9, max_rate=None, min_rate=0.0, + is_generous=True, base_price=0.0001, alpha=1.0): + price_model = MeanAvailabilityWeightedPrice(blob_tracker, base_price=base_price, alpha=alpha) + Strategy.__init__(self, price_model, max_rate, min_rate, is_generous) + self._acceleration = Decimal(acceleration) # rate of how quickly to ramp offer + self._deceleration = Decimal(deceleration) + + def _get_mean_rate(self, rates): + mean_rate = Decimal(sum(rates)) / Decimal(max(len(rates), 1)) + return mean_rate + + def _premium(self, rate, turn): + return rate * (self._acceleration ** Decimal(turn)) + + def _discount(self, rate, turn): + return rate * (self._deceleration ** Decimal(turn)) + + def _get_response_rate(self, rates, offer_count): + rate = self._get_mean_rate(rates) + discounted = self._discount(rate, offer_count) + rounded_price = round(discounted, 5) + return self._bounded_price(rounded_price) + + def _make_rate_offer(self, rates, offer_count): + rate = self._get_mean_rate(rates) + with_premium = self._premium(rate, offer_count) + rounded_price = round(with_premium, 5) + return self._bounded_price(rounded_price) diff --git a/lbrynet/core/Wallet.py b/lbrynet/core/Wallet.py index e0a11d04f..0ce5c66d3 100644 --- a/lbrynet/core/Wallet.py +++ b/lbrynet/core/Wallet.py @@ -550,6 +550,9 @@ class Wallet(object): d.addCallback(_decode) return d + def get_claim_metadata_for_sd_hash(self, sd_hash): + return self._get_claim_metadata_for_sd_hash(sd_hash) + def get_name_and_validity_for_sd_hash(self, sd_hash): d = self._get_claim_metadata_for_sd_hash(sd_hash) d.addCallback(lambda name_txid: self._get_status_of_claim(name_txid[1], name_txid[0], sd_hash) if name_txid is not None else None) diff --git a/lbrynet/core/client/BlobRequester.py b/lbrynet/core/client/BlobRequester.py index 736a06494..8c59bc04f 100644 --- a/lbrynet/core/client/BlobRequester.py +++ b/lbrynet/core/client/BlobRequester.py @@ -1,25 +1,35 @@ -from collections import defaultdict import logging +from collections import defaultdict +from decimal import Decimal + from twisted.internet import defer from twisted.python.failure import Failure from zope.interface import implements -from lbrynet.core.Error import PriceDisagreementError, DownloadCanceledError, InsufficientFundsError -from lbrynet.core.Error import InvalidResponseError, RequestCanceledError, NoResponseError + from lbrynet.core.Error import ConnectionClosedBeforeResponseError +from lbrynet.core.Error import InvalidResponseError, RequestCanceledError, NoResponseError +from lbrynet.core.Error import PriceDisagreementError, DownloadCanceledError, InsufficientFundsError from lbrynet.core.client.ClientRequest import ClientRequest, ClientBlobRequest from lbrynet.interfaces import IRequestCreator +from lbrynet.core.Offer import Offer log = logging.getLogger(__name__) def get_points(num_bytes, rate): - return 1.0 * num_bytes * rate / 2**20 + if isinstance(rate, float): + return 1.0 * num_bytes * rate / 2**20 + elif isinstance(rate, Decimal): + return 1.0 * num_bytes * float(rate) / 2**20 + else: + raise Exception("Unknown rate type") def cache(fn): """Caches the function call for each instance""" attr = '__{}_value'.format(fn.__name__) + def helper(self): if not hasattr(self, attr): value = fn(self) @@ -42,6 +52,7 @@ class BlobRequester(object): self._unavailable_blobs = defaultdict(list) # {Peer: [blob_hash]}} self._protocol_prices = {} # {ClientProtocol: price} self._price_disagreements = [] # [Peer] + self._protocol_tries = {} self._incompatible_peers = [] ######## IRequestCreator ######### @@ -65,9 +76,9 @@ class BlobRequester(object): def _send_next_request(self, peer, protocol): log.debug('Sending a blob request for %s and %s', peer, protocol) - availability = AvailabilityRequest(self, peer, protocol) - download = DownloadRequest(self, peer, protocol, self.wallet, self.payment_rate_manager) - price = PriceRequest(self, peer, protocol) + availability = AvailabilityRequest(self, peer, protocol, self.payment_rate_manager) + download = DownloadRequest(self, peer, protocol, self.payment_rate_manager, self.wallet) + price = PriceRequest(self, peer, protocol, self.payment_rate_manager) sent_request = False if availability.can_make_request(): @@ -161,10 +172,11 @@ class BlobRequester(object): class RequestHelper(object): - def __init__(self, requestor, peer, protocol): + def __init__(self, requestor, peer, protocol, payment_rate_manager): self.requestor = requestor self.peer = peer self.protocol = protocol + self.payment_rate_manager = payment_rate_manager @property def protocol_prices(self): @@ -197,10 +209,10 @@ class RequestHelper(object): return return reason - def get_and_save_rate_for_protocol(self): + def get_and_save_rate(self): rate = self.protocol_prices.get(self.protocol) if rate is None: - rate = self.requestor.payment_rate_manager.get_rate_blob_data(self.peer) + rate = self.payment_rate_manager.get_rate_blob_data(self.peer, self.available_blobs) self.protocol_prices[self.protocol] = rate return rate @@ -322,12 +334,59 @@ class AvailabilityRequest(RequestHelper): self.unavailable_blobs.remove(blob_hash) +class PriceRequest(RequestHelper): + """Ask a peer if a certain price is acceptable""" + def can_make_request(self): + return self.get_and_save_rate() is not None + + def make_request_and_handle_response(self): + request = self._get_price_request() + self._handle_price_request(request) + + def _get_price_request(self): + rate = self.get_and_save_rate() + if rate is None: + log.debug("No blobs to request from %s", self.peer) + raise Exception('Cannot make a price request without a payment rate') + log.debug("Offer rate %s to %s for %i blobs", rate, self.peer, len(self.available_blobs)) + + request_dict = {'blob_data_payment_rate': rate} + return ClientRequest(request_dict, 'blob_data_payment_rate') + + def _handle_price_request(self, price_request): + d = self.protocol.add_request(price_request) + d.addCallback(self._handle_price_response, price_request) + d.addErrback(self._request_failed, "price request") + + def _handle_price_response(self, response_dict, request): + assert request.response_identifier == 'blob_data_payment_rate' + if 'blob_data_payment_rate' not in response_dict: + return InvalidResponseError("response identifier not in response") + assert self.protocol in self.protocol_prices + rate = self.protocol_prices[self.protocol] + offer = Offer(rate) + offer.handle(response_dict['blob_data_payment_rate']) + self.payment_rate_manager.record_offer_reply(self.peer.host, offer) + + if offer.is_accepted: + log.debug("Offered rate %f/mb accepted by %s", rate, str(self.peer.host)) + return True + elif offer.is_too_low: + log.debug("Offered rate %f/mb rejected by %s", rate, str(self.peer.host)) + del self.protocol_prices[self.protocol] + return True + else: + log.warning("Price disagreement") + del self.protocol_prices[self.protocol] + self.requestor._price_disagreements.append(self.peer) + return False + + class DownloadRequest(RequestHelper): """Choose a blob and download it from a peer and also pay the peer for the data.""" - def __init__(self, requestor, peer, protocol, wallet, payment_rate_manager): - RequestHelper.__init__(self, requestor, peer, protocol) + def __init__(self, requester, peer, protocol, payment_rate_manager, wallet): + RequestHelper.__init__(self, requester, peer, protocol, payment_rate_manager) self.wallet = wallet - self.payment_rate_manager = payment_rate_manager def can_make_request(self): return self.get_blob_details() @@ -413,6 +472,9 @@ class DownloadRequest(RequestHelper): def _pay_or_cancel_payment(self, arg, reserved_points, blob): if self._can_pay_peer(blob, arg): self._pay_peer(blob.length, reserved_points) + d = self.requestor.blob_manager.add_blob_to_download_history(str(blob), + str(self.peer.host), + float(self.protocol_prices[self.protocol])) else: self._cancel_points(reserved_points) return arg @@ -425,7 +487,7 @@ class DownloadRequest(RequestHelper): def _pay_peer(self, num_bytes, reserved_points): assert num_bytes != 0 - rate = self.get_and_save_rate_for_protocol() + rate = self.get_and_save_rate() point_amount = get_points(num_bytes, rate) self.wallet.send_points(reserved_points, point_amount) self.payment_rate_manager.record_points_paid(point_amount) @@ -452,7 +514,7 @@ class DownloadRequest(RequestHelper): # not yet been set for this protocol or for it to have been # removed so instead I switched it to check if a rate has been set # and calculate it if it has not - rate = self.get_and_save_rate_for_protocol() + rate = self.get_and_save_rate() points_to_reserve = get_points(num_bytes, rate) return self.wallet.reserve_points(self.peer, points_to_reserve) @@ -481,39 +543,4 @@ class BlobDownloadDetails(object): def counting_write_func(self, data): self.peer.update_stats('blob_bytes_downloaded', len(data)) - return self.write_func(data) - - -class PriceRequest(RequestHelper): - """Ask a peer if a certain price is acceptable""" - def can_make_request(self): - return self.get_and_save_rate_for_protocol() is not None - - def make_request_and_handle_response(self): - request = self._get_price_request() - self._handle_price_request(request) - - def _get_price_request(self): - rate = self.get_and_save_rate_for_protocol() - if rate is None: - raise Exception('Cannot make a price request without a payment rate') - request_dict = {'blob_data_payment_rate': rate} - return ClientRequest(request_dict, 'blob_data_payment_rate') - - def _handle_price_request(self, price_request): - d = self.protocol.add_request(price_request) - d.addCallback(self._handle_price_response, price_request) - d.addErrback(self._request_failed, "price request") - - def _handle_price_response(self, response_dict, request): - assert request.response_identifier == 'blob_data_payment_rate' - if 'blob_data_payment_rate' not in response_dict: - return InvalidResponseError("response identifier not in response") - assert self.protocol in self.protocol_prices - response = response_dict['blob_data_payment_rate'] - if response == "RATE_ACCEPTED": - return True - else: - del self.protocol_prices[self.protocol] - self.requestor._price_disagreements.append(self.peer) - return True + return self.write_func(data) \ No newline at end of file diff --git a/lbrynet/core/client/ClientProtocol.py b/lbrynet/core/client/ClientProtocol.py index aad0dcdf7..1989f8c96 100644 --- a/lbrynet/core/client/ClientProtocol.py +++ b/lbrynet/core/client/ClientProtocol.py @@ -1,5 +1,6 @@ import json import logging +from decimal import Decimal from twisted.internet import error, defer from twisted.internet.protocol import Protocol, ClientFactory from twisted.python import failure @@ -14,6 +15,12 @@ from zope.interface import implements log = logging.getLogger(__name__) +def encode_decimal(obj): + if isinstance(obj, Decimal): + return float(obj) + raise TypeError(repr(obj) + " is not JSON serializable") + + class ClientProtocol(Protocol): implements(IRequestSender, IRateLimited) @@ -132,7 +139,7 @@ class ClientProtocol(Protocol): def _send_request_message(self, request_msg): # TODO: compare this message to the last one. If they're the same, # TODO: incrementally delay this message. - m = json.dumps(request_msg) + m = json.dumps(request_msg, default=encode_decimal) self.transport.write(m) def _get_valid_response(self, response_msg): @@ -191,7 +198,9 @@ class ClientProtocol(Protocol): for success, result in results: if success is False: failed = True - log.info("The connection is closing due to an error: %s", str(result.getTraceback())) + if not isinstance(result.value, DownloadCanceledError): + log.info(result.value) + log.info("The connection is closing due to an error: %s", str(result.getTraceback())) if failed is False: log.debug("Asking for another request.") from twisted.internet import reactor @@ -215,7 +224,7 @@ class ClientProtocol(Protocol): # TODO: always be this way. it's done this way now because the client has no other way # TODO: of telling the server it wants the download to stop. It would be great if the # TODO: protocol had such a mechanism. - log.info("Closing the connection to %s because the download of blob %s was canceled", + log.debug("Closing the connection to %s because the download of blob %s was canceled", str(self.peer), str(self._blob_download_request.blob)) #self.transport.loseConnection() #return True diff --git a/lbrynet/core/log_support.py b/lbrynet/core/log_support.py index 6684a9153..2f2a5c75b 100644 --- a/lbrynet/core/log_support.py +++ b/lbrynet/core/log_support.py @@ -73,12 +73,12 @@ def _log_decorator(fn): def disable_third_party_loggers(): logging.getLogger('requests').setLevel(logging.WARNING) + logging.getLogger('urllib3').setLevel(logging.WARNING) logging.getLogger('BitcoinRPC').setLevel(logging.INFO) def disable_noisy_loggers(): logging.getLogger('lbrynet.analytics.api').setLevel(logging.INFO) - logging.getLogger('lbrynet.core.client').setLevel(logging.INFO) - logging.getLogger('lbrynet.core.server').setLevel(logging.INFO) + logging.getLogger('lbrynet.core').setLevel(logging.INFO) logging.getLogger('lbrynet.dht').setLevel(logging.INFO) logging.getLogger('lbrynet.lbrynet_daemon').setLevel(logging.INFO) logging.getLogger('lbrynet.core.Wallet').setLevel(logging.INFO) diff --git a/lbrynet/core/server/BlobAvailabilityHandler.py b/lbrynet/core/server/BlobAvailabilityHandler.py index a5d550bdf..dbd373a36 100644 --- a/lbrynet/core/server/BlobAvailabilityHandler.py +++ b/lbrynet/core/server/BlobAvailabilityHandler.py @@ -40,7 +40,7 @@ class BlobAvailabilityHandler(object): def handle_queries(self, queries): if self.query_identifiers[0] in queries: - log.debug("Received the client's list of requested blobs") + log.info("Received the client's list of requested blobs") d = self._get_available_blobs(queries[self.query_identifiers[0]]) def set_field(available_blobs): diff --git a/lbrynet/core/server/BlobRequestHandler.py b/lbrynet/core/server/BlobRequestHandler.py index 4babfa91c..94e329da4 100644 --- a/lbrynet/core/server/BlobRequestHandler.py +++ b/lbrynet/core/server/BlobRequestHandler.py @@ -1,8 +1,11 @@ import logging + from twisted.internet import defer from twisted.protocols.basic import FileSender from twisted.python.failure import Failure from zope.interface import implements + +from lbrynet.core.Offer import Offer from lbrynet.interfaces import IQueryHandlerFactory, IQueryHandler, IBlobSender @@ -34,18 +37,20 @@ class BlobRequestHandler(object): implements(IQueryHandler, IBlobSender) PAYMENT_RATE_QUERY = 'blob_data_payment_rate' BLOB_QUERY = 'requested_blob' + AVAILABILITY_QUERY = 'requested_blobs' def __init__(self, blob_manager, wallet, payment_rate_manager): self.blob_manager = blob_manager self.payment_rate_manager = payment_rate_manager self.wallet = wallet - self.query_identifiers = [self.PAYMENT_RATE_QUERY, self.BLOB_QUERY] + self.query_identifiers = [self.PAYMENT_RATE_QUERY, self.BLOB_QUERY, self.AVAILABILITY_QUERY] self.peer = None self.blob_data_payment_rate = None self.read_handle = None self.currently_uploading = None self.file_sender = None self.blob_bytes_uploaded = 0 + self._blobs_requested = [] ######### IQueryHandler ######### @@ -55,51 +60,19 @@ class BlobRequestHandler(object): request_handler.register_blob_sender(self) def handle_queries(self, queries): - response = {} + response = defer.succeed({}) + log.debug("Handle query: %s", str(queries)) + + if self.AVAILABILITY_QUERY in queries: + self._blobs_requested = queries[self.AVAILABILITY_QUERY] + response.addCallback(lambda r: self._reply_to_availability(r, self._blobs_requested)) if self.PAYMENT_RATE_QUERY in queries: - self._handle_payment_rate_query(response, queries[self.PAYMENT_RATE_QUERY]) + offered_rate = queries[self.PAYMENT_RATE_QUERY] + offer = Offer(offered_rate) + response.addCallback(lambda r: self._handle_payment_rate_query(offer, r)) if self.BLOB_QUERY in queries: - return self._handle_blob_query(response, queries[self.BLOB_QUERY]) - else: - return defer.succeed(response) - - def _handle_payment_rate_query(self, response, query): - if not self.handle_blob_data_payment_rate(query): - response['blob_data_payment_rate'] = "RATE_TOO_LOW" - else: - response['blob_data_payment_rate'] = 'RATE_ACCEPTED' - - def _handle_blob_query(self, response, query): - log.debug("Received the client's request to send a blob") - response['incoming_blob'] = {} - - if self.blob_data_payment_rate is None: - response['incoming_blob']['error'] = "RATE_UNSET" - return defer.succeed(response) - else: - return self._send_blob(response, query) - - def _send_blob(self, response, query): - d = self.blob_manager.get_blob(query, True) - d.addCallback(self.open_blob_for_reading, response) - return d - - def open_blob_for_reading(self, blob, response): - def failure(msg): - log.warning("We can not send %s: %s", blob, msg) - response['incoming_blob']['error'] = "BLOB_UNAVAILABLE" - return response - if not blob.is_validated(): - return failure("blob can't be validated") - read_handle = blob.open_for_reading() - if read_handle is None: - return failure("blob can't be opened") - - self.currently_uploading = blob - self.read_handle = read_handle - log.info("Sending %s to client", blob) - response['incoming_blob']['blob_hash'] = blob.blob_hash - response['incoming_blob']['length'] = blob.length + incoming = queries[self.BLOB_QUERY] + response.addCallback(lambda r: self._reply_to_send_request(r, incoming)) return response ######### IBlobSender ######### @@ -118,12 +91,91 @@ class BlobRequestHandler(object): ######### internal ######### - def handle_blob_data_payment_rate(self, requested_payment_rate): - if not self.payment_rate_manager.accept_rate_blob_data(self.peer, requested_payment_rate): - return False + def _reply_to_availability(self, request, blobs): + d = self._get_available_blobs(blobs) + + def set_available(available_blobs): + log.debug("available blobs: %s", str(available_blobs)) + request.update({'available_blobs': available_blobs}) + return request + + d.addCallback(set_available) + return d + + def _handle_payment_rate_query(self, offer, request): + blobs = self._blobs_requested + log.debug("Offered rate %f LBC/mb for %i blobs", offer.rate, len(blobs)) + reply = self.payment_rate_manager.reply_to_offer(self.peer, blobs, offer) + if reply.is_accepted: + self.blob_data_payment_rate = offer.rate + request[self.PAYMENT_RATE_QUERY] = "RATE_ACCEPTED" + log.debug("Accepted rate: %f", offer.rate) + elif reply.is_too_low: + request[self.PAYMENT_RATE_QUERY] = "RATE_TOO_LOW" + log.debug("Reject rate: %f", offer.rate) + elif reply.is_unset: + log.warning("Rate unset") + request['incoming_blob'] = {'error': 'RATE_UNSET'} + log.debug("Returning rate query result: %s", str(request)) + + return request + + def _handle_blob_query(self, response, query): + log.debug("Received the client's request to send a blob") + response['incoming_blob'] = {} + + if self.blob_data_payment_rate is None: + response['incoming_blob'] = {'error': "RATE_UNSET"} + return response else: - self.blob_data_payment_rate = requested_payment_rate - return True + return self._send_blob(response, query) + + def _send_blob(self, response, query): + d = self.blob_manager.get_blob(query, True) + d.addCallback(self.open_blob_for_reading, response) + return d + + def open_blob_for_reading(self, blob, response): + response_fields = {} + d = defer.succeed(None) + if blob.is_validated(): + read_handle = blob.open_for_reading() + if read_handle is not None: + self.currently_uploading = blob + self.read_handle = read_handle + log.info("Sending %s to client", str(blob)) + response_fields['blob_hash'] = blob.blob_hash + response_fields['length'] = blob.length + response['incoming_blob'] = response_fields + d.addCallback(lambda _: self.record_transaction(blob)) + d.addCallback(lambda _: response) + return d + log.debug("We can not send %s", str(blob)) + response['incoming_blob'] = {'error': 'BLOB_UNAVAILABLE'} + d.addCallback(lambda _: response) + return d + + def record_transaction(self, blob): + d = self.blob_manager.add_blob_to_upload_history(str(blob), self.peer.host, self.blob_data_payment_rate) + return d + + def _reply_to_send_request(self, response, incoming): + response_fields = {} + response['incoming_blob'] = response_fields + + if self.blob_data_payment_rate is None: + log.debug("Rate not set yet") + response['incoming_blob'] = {'error': 'RATE_UNSET'} + return defer.succeed(response) + else: + log.debug("Requested blob: %s", str(incoming)) + d = self.blob_manager.get_blob(incoming, True) + d.addCallback(lambda blob: self.open_blob_for_reading(blob, response)) + return d + + def _get_available_blobs(self, requested_blobs): + d = self.blob_manager.completed_blobs(requested_blobs) + return d def send_file(self, consumer): @@ -165,6 +217,6 @@ class BlobRequestHandler(object): self.currently_uploading = None self.file_sender = None if reason is not None and isinstance(reason, Failure): - log.info("Upload has failed. Reason: %s", reason.getErrorMessage()) + log.warning("Upload has failed. Reason: %s", reason.getErrorMessage()) return _send_file() diff --git a/lbrynet/core/utils.py b/lbrynet/core/utils.py index d71e01e3a..f9f114233 100644 --- a/lbrynet/core/utils.py +++ b/lbrynet/core/utils.py @@ -4,6 +4,7 @@ import random import os import json import yaml +import datetime from lbrynet.core.cryptoutils import get_lbry_hash_obj @@ -76,4 +77,8 @@ def save_settings(path, settings): assert encoder is not False, "Unknown settings format .%s" % ext f = open(path, 'w') f.write(encoder(settings)) - f.close() \ No newline at end of file + f.close() + + +def today(): + return datetime.datetime.today() \ No newline at end of file diff --git a/lbrynet/cryptstream/client/CryptStreamDownloader.py b/lbrynet/cryptstream/client/CryptStreamDownloader.py index c3e8305cb..482f755d7 100644 --- a/lbrynet/cryptstream/client/CryptStreamDownloader.py +++ b/lbrynet/cryptstream/client/CryptStreamDownloader.py @@ -48,7 +48,7 @@ class CryptStreamDownloader(object): @param blob_manager: A BlobManager object - @param payment_rate_manager: A PaymentRateManager object + @param payment_rate_manager: A NegotiatedPaymentRateManager object @param wallet: An object which implements the IWallet interface diff --git a/lbrynet/interfaces.py b/lbrynet/interfaces.py index 5597ae8a5..10fbc8c66 100644 --- a/lbrynet/interfaces.py +++ b/lbrynet/interfaces.py @@ -647,4 +647,60 @@ class IWallet(Interface): @type amount: float @return: None + """ + + +class IBlobPriceModel(Interface): + """ + A blob price model + + Used by INegotiationStrategy classes + """ + + def calculate_price(self, blob): + """ + Calculate the price for a blob + + @param blob: a blob hash + @type blob: str + + @return: blob price target + @type: Decimal + """ + + +class INegotiationStrategy(Interface): + """ + Strategy to negotiate download payment rates + """ + + def make_offer(self, peer, blobs): + """ + Make a rate offer for the given peer and blobs + + @param peer: peer to make an offer to + @type: str + + @param blobs: blob hashes to make an offer for + @type: list + + @return: rate offer + @rtype: Offer + """ + + def respond_to_offer(self, offer, peer, blobs): + """ + Respond to a rate offer given by a peer + + @param offer: offer to reply to + @type: Offer + + @param peer: peer to make an offer to + @type: str + + @param blobs: blob hashes to make an offer for + @type: list + + @return: accepted, rejected, or unset offer + @rtype: Offer """ \ No newline at end of file diff --git a/lbrynet/lbryfile/client/EncryptedFileOptions.py b/lbrynet/lbryfile/client/EncryptedFileOptions.py index fc3423708..067e3437d 100644 --- a/lbrynet/lbryfile/client/EncryptedFileOptions.py +++ b/lbrynet/lbryfile/client/EncryptedFileOptions.py @@ -14,16 +14,16 @@ class EncryptedFileOptions(object): prm = payment_rate_manager def get_default_data_rate_description(): - if prm.min_blob_data_payment_rate is None: + if prm.base.min_blob_data_payment_rate is None: return "Application default (%s LBC/MB)" % str(prm.base.min_blob_data_payment_rate) else: - return "%f LBC/MB" % prm.min_blob_data_payment_rate + return "%f LBC/MB" % prm.base.min_blob_data_payment_rate rate_choices = [] - rate_choices.append(DownloadOptionChoice(prm.min_blob_data_payment_rate, + rate_choices.append(DownloadOptionChoice(prm.base.min_blob_data_payment_rate, "No change - %s" % get_default_data_rate_description(), "No change - %s" % get_default_data_rate_description())) - if prm.min_blob_data_payment_rate is not None: + if prm.base.min_blob_data_payment_rate is not None: rate_choices.append(DownloadOptionChoice(None, "Application default (%s LBC/MB)" % str(prm.base.min_blob_data_payment_rate), "Application default (%s LBC/MB)" % str(prm.base.min_blob_data_payment_rate))) @@ -36,7 +36,7 @@ class EncryptedFileOptions(object): rate_choices, "Rate which will be paid for data", "data payment rate", - prm.min_blob_data_payment_rate, + prm.base.min_blob_data_payment_rate, get_default_data_rate_description() ), DownloadOption( diff --git a/lbrynet/lbryfilemanager/EncryptedFileDownloader.py b/lbrynet/lbryfilemanager/EncryptedFileDownloader.py index 5d9ab5a0d..5546787c4 100644 --- a/lbrynet/lbryfilemanager/EncryptedFileDownloader.py +++ b/lbrynet/lbryfilemanager/EncryptedFileDownloader.py @@ -41,7 +41,7 @@ class ManagedEncryptedFileDownloader(EncryptedFileSaver): def _save_sd_hash(sd_hash): if len(sd_hash): self.sd_hash = sd_hash[0] - d = self.wallet._get_claim_metadata_for_sd_hash(self.sd_hash) + d = self.wallet.get_claim_metadata_for_sd_hash(self.sd_hash) else: d = defer.succeed(None) @@ -122,13 +122,12 @@ class ManagedEncryptedFileDownloader(EncryptedFileSaver): def _start(self): d = EncryptedFileSaver._start(self) - - d.addCallback(lambda _: self.stream_info_manager._get_sd_blob_hashes_for_stream(self.stream_hash)) + d.addCallback(lambda _: self.stream_info_manager.get_sd_blob_hashes_for_stream(self.stream_hash)) def _save_sd_hash(sd_hash): if len(sd_hash): self.sd_hash = sd_hash[0] - d = self.wallet._get_claim_metadata_for_sd_hash(self.sd_hash) + d = self.wallet.get_claim_metadata_for_sd_hash(self.sd_hash) else: d = defer.succeed(None) diff --git a/lbrynet/lbryfilemanager/EncryptedFileManager.py b/lbrynet/lbryfilemanager/EncryptedFileManager.py index 20c9690a7..9c774d3d9 100644 --- a/lbrynet/lbryfilemanager/EncryptedFileManager.py +++ b/lbrynet/lbryfilemanager/EncryptedFileManager.py @@ -9,10 +9,10 @@ from twisted.enterprise import adbapi from twisted.internet import defer, task, reactor from twisted.python.failure import Failure +from lbrynet.core.PaymentRateManager import NegotiatedPaymentRateManager from lbrynet.lbryfilemanager.EncryptedFileDownloader import ManagedEncryptedFileDownloader from lbrynet.lbryfilemanager.EncryptedFileDownloader import ManagedEncryptedFileDownloaderFactory from lbrynet.lbryfile.StreamDescriptor import EncryptedFileStreamType -from lbrynet.core.PaymentRateManager import PaymentRateManager from lbrynet.cryptstream.client.CryptStreamDownloader import AlreadyStoppedError, CurrentlyStoppingError from lbrynet.core.sqlite_helpers import rerun_if_locked @@ -74,7 +74,8 @@ class EncryptedFileManager(object): def _start_lbry_files(self): def set_options_and_restore(rowid, stream_hash, options): - payment_rate_manager = PaymentRateManager(self.session.base_payment_rate_manager) + payment_rate_manager = NegotiatedPaymentRateManager(self.session.base_payment_rate_manager, + self.session.blob_tracker) d = self.start_lbry_file(rowid, stream_hash, payment_rate_manager, blob_data_rate=options) d.addCallback(lambda downloader: downloader.restore()) diff --git a/lbrynet/lbrylive/PaymentRateManager.py b/lbrynet/lbrylive/PaymentRateManager.py index 77ff09030..8a14cf9a8 100644 --- a/lbrynet/lbrylive/PaymentRateManager.py +++ b/lbrynet/lbrylive/PaymentRateManager.py @@ -19,11 +19,18 @@ class LiveStreamPaymentRateManager(object): def accept_rate_live_blob_info(self, peer, payment_rate): return payment_rate >= self.get_effective_min_live_blob_info_payment_rate() - def get_rate_blob_data(self, peer): - return self.get_effective_min_blob_data_payment_rate() + def get_rate_blob_data(self, peer, blobs): + response = self._payment_rate_manager.strategy.make_offer(peer, blobs) + return response.rate - def accept_rate_blob_data(self, peer, payment_rate): - return payment_rate >= self.get_effective_min_blob_data_payment_rate() + def accept_rate_blob_data(self, peer, blobs, offer): + response = self._payment_rate_manager.strategy.respond_to_offer(offer, peer, blobs) + return response.accepted + + def reply_to_offer(self, peer, blobs, offer): + reply = self._payment_rate_manager.strategy.respond_to_offer(offer, peer, blobs) + self._payment_rate_manager.strategy.offer_accepted(peer, reply) + return reply def get_effective_min_blob_data_payment_rate(self): rate = self.min_blob_data_payment_rate @@ -42,4 +49,4 @@ class LiveStreamPaymentRateManager(object): return rate def record_points_paid(self, amount): - self.points_paid += amount \ No newline at end of file + self.points_paid += amount diff --git a/lbrynet/lbrylive/client/LiveStreamDownloader.py b/lbrynet/lbrylive/client/LiveStreamDownloader.py index 1ce9a413e..8b6befb54 100644 --- a/lbrynet/lbrylive/client/LiveStreamDownloader.py +++ b/lbrynet/lbrylive/client/LiveStreamDownloader.py @@ -144,7 +144,7 @@ class FullLiveStreamDownloaderFactory(object): def make_downloader(self, metadata, options, payment_rate_manager): # TODO: check options for payment rate manager parameters - payment_rate_manager = LiveStreamPaymentRateManager(self.default_payment_rate_manager, + prm = LiveStreamPaymentRateManager(self.default_payment_rate_manager, payment_rate_manager) def save_source_if_blob(stream_hash): @@ -161,7 +161,7 @@ class FullLiveStreamDownloaderFactory(object): def create_downloader(stream_hash): stream_downloader = FullLiveStreamDownloader(stream_hash, self.peer_finder, self.rate_limiter, self.blob_manager, self.stream_info_manager, - payment_rate_manager, self.wallet, True) + prm, self.wallet, True) # TODO: change upload_allowed=True above to something better d = stream_downloader.set_stream_info() d.addCallback(lambda _: stream_downloader) diff --git a/lbrynet/lbrynet_console/Console.py b/lbrynet/lbrynet_console/Console.py index cada41cb7..3f700222e 100644 --- a/lbrynet/lbrynet_console/Console.py +++ b/lbrynet/lbrynet_console/Console.py @@ -20,7 +20,6 @@ from lbrynet.conf import MIN_BLOB_DATA_PAYMENT_RATE, API_CONNECTION_STRING # , from lbrynet.core.utils import generate_id from lbrynet.core.StreamDescriptor import StreamDescriptorIdentifier from lbrynet.core.PaymentRateManager import PaymentRateManager -from lbrynet.core.server.BlobAvailabilityHandler import BlobAvailabilityHandlerFactory from lbrynet.core.server.BlobRequestHandler import BlobRequestHandlerFactory from lbrynet.core.server.ServerProtocol import ServerProtocolFactory from lbrynet.core.PTCWallet import PTCWallet @@ -363,19 +362,14 @@ class Console(): def _setup_query_handlers(self): handlers = [ - #CryptBlobInfoQueryHandlerFactory(self.lbry_file_metadata_manager, self.session.wallet, - # self._server_payment_rate_manager), - BlobAvailabilityHandlerFactory(self.session.blob_manager), - #BlobRequestHandlerFactory(self.session.blob_manager, self.session.wallet, - # self._server_payment_rate_manager), self.session.wallet.get_wallet_info_query_handler_factory(), ] def get_blob_request_handler_factory(rate): - self.blob_request_payment_rate_manager = PaymentRateManager( - self.session.base_payment_rate_manager, rate - ) - handlers.append(BlobRequestHandlerFactory(self.session.blob_manager, self.session.wallet, + self.blob_request_payment_rate_manager = PaymentRateManager(self.session.base_payment_rate_manager, + rate) + handlers.append(BlobRequestHandlerFactory(self.session.blob_manager, + self.session.wallet, self.blob_request_payment_rate_manager)) d1 = self.settings.get_server_data_payment_rate() diff --git a/lbrynet/lbrynet_console/ControlHandlers.py b/lbrynet/lbrynet_console/ControlHandlers.py index a7f5d82e7..25054000a 100644 --- a/lbrynet/lbrynet_console/ControlHandlers.py +++ b/lbrynet/lbrynet_console/ControlHandlers.py @@ -644,7 +644,7 @@ class AddStream(CommandHandler): for option, option_value in zip(self.download_options, self.options_chosen): if option.short_description == "data payment rate": if option_value == None: - rate = self.payment_rate_manager.get_effective_min_blob_data_payment_rate() + rate = 0.0 else: rate = option_value stream_size = None diff --git a/lbrynet/lbrynet_daemon/Daemon.py b/lbrynet/lbrynet_daemon/Daemon.py index 0a9bd9b8f..60c29e60b 100644 --- a/lbrynet/lbrynet_daemon/Daemon.py +++ b/lbrynet/lbrynet_daemon/Daemon.py @@ -27,8 +27,6 @@ from txjsonrpc.web.jsonrpc import Handler from lbrynet import __version__ as lbrynet_version from lbryum.version import LBRYUM_VERSION as lbryum_version from lbrynet import analytics -from lbrynet.core.PaymentRateManager import PaymentRateManager -from lbrynet.core.server.BlobAvailabilityHandler import BlobAvailabilityHandlerFactory from lbrynet.core.server.BlobRequestHandler import BlobRequestHandlerFactory from lbrynet.core.server.ServerProtocol import ServerProtocolFactory from lbrynet.core.Error import UnknownNameError, InsufficientFundsError, InvalidNameError @@ -546,7 +544,6 @@ class Daemon(jsonrpc.JSONRPC): d.addCallback(lambda _: add_lbry_file_to_sd_identifier(self.sd_identifier)) d.addCallback(lambda _: self._setup_stream_identifier()) d.addCallback(lambda _: self._setup_lbry_file_manager()) - d.addCallback(lambda _: self._setup_lbry_file_opener()) d.addCallback(lambda _: self._setup_query_handlers()) d.addCallback(lambda _: self._setup_server()) d.addCallback(lambda _: _log_starting_vals()) @@ -679,7 +676,6 @@ class Daemon(jsonrpc.JSONRPC): # TODO: this was blatantly copied from jsonrpc_start_lbry_file. Be DRY. def _start_file(f): d = self.lbry_file_manager.toggle_lbry_file_running(f) - d.addCallback(lambda _: self.lighthouse_client.announce_sd(f.sd_hash)) return defer.succeed("Started LBRY file") def _get_and_start_file(name): @@ -778,20 +774,13 @@ class Daemon(jsonrpc.JSONRPC): def _setup_query_handlers(self): handlers = [ - # CryptBlobInfoQueryHandlerFactory(self.lbry_file_metadata_manager, self.session.wallet, - # self._server_payment_rate_manager), - BlobAvailabilityHandlerFactory(self.session.blob_manager), - # BlobRequestHandlerFactory(self.session.blob_manager, self.session.wallet, - # self._server_payment_rate_manager), + BlobRequestHandlerFactory(self.session.blob_manager, self.session.wallet, + self.session.payment_rate_manager), self.session.wallet.get_wallet_info_query_handler_factory(), ] def get_blob_request_handler_factory(rate): - self.blob_request_payment_rate_manager = PaymentRateManager( - self.session.base_payment_rate_manager, rate - ) - handlers.append(BlobRequestHandlerFactory(self.session.blob_manager, self.session.wallet, - self.blob_request_payment_rate_manager)) + self.blob_request_payment_rate_manager = self.session.payment_rate_manager d1 = self.settings.get_server_data_payment_rate() d1.addCallback(get_blob_request_handler_factory) @@ -1104,14 +1093,6 @@ class Daemon(jsonrpc.JSONRPC): self.sd_identifier.add_stream_downloader_factory(EncryptedFileStreamType, file_opener_factory) return defer.succeed(None) - def _setup_lbry_file_opener(self): - - downloader_factory = EncryptedFileOpenerFactory(self.session.peer_finder, self.session.rate_limiter, - self.session.blob_manager, self.stream_info_manager, - self.session.wallet) - self.sd_identifier.add_stream_downloader_factory(EncryptedFileStreamType, downloader_factory) - return defer.succeed(True) - def _download_sd_blob(self, sd_hash, timeout=DEFAULT_SD_DOWNLOAD_TIMEOUT): def cb(result): if not r.called: @@ -1123,7 +1104,7 @@ class Daemon(jsonrpc.JSONRPC): r = defer.Deferred(None) reactor.callLater(timeout, eb) - d = download_sd_blob(self.session, sd_hash, PaymentRateManager(self.session.base_payment_rate_manager)) + d = download_sd_blob(self.session, sd_hash, self.session.payment_rate_manager) d.addCallback(BlobStreamDescriptorReader) d.addCallback(lambda blob: blob.get_info()) d.addCallback(cb) @@ -1906,13 +1887,8 @@ class Daemon(jsonrpc.JSONRPC): """ name = p['name'] - force = p.get('force', False) - if force: - d = self._get_est_cost(name) - else: - d = self._search(name) - d.addCallback(lambda r: [i['cost'] for i in r][0]) + d = self._get_est_cost(name) d.addCallback(lambda r: self._render_response(r, OK_CODE)) return d @@ -2587,6 +2563,53 @@ class Daemon(jsonrpc.JSONRPC): d = self._render_response(SEARCH_SERVERS, OK_CODE) return d + def jsonrpc_get_mean_availability(self): + """ + Get mean blob availability + + Args: + None + Returns: + Mean peers for a blob + """ + + d = self._render_response(self.session.blob_tracker.last_mean_availability, OK_CODE) + return d + + def jsonrpc_get_availability(self, p): + """ + Get stream availability for a winning claim + + Arg: + name (str): lbry uri + + Returns: + peers per blob / total blobs + """ + + def _get_mean(blob_availabilities): + peer_counts = [] + for blob_availability in blob_availabilities: + for blob, peers in blob_availability.iteritems(): + peer_counts.append(peers) + if peer_counts: + return round(1.0 * sum(peer_counts) / len(peer_counts), 2) + else: + return 0.0 + + name = p['name'] + + d = self._resolve_name(name, force_refresh=True) + d.addCallback(get_sd_hash) + d.addCallback(self._download_sd_blob) + d.addCallbacks(lambda descriptor: [blob.get('blob_hash') for blob in descriptor['blobs']], + lambda _: []) + d.addCallback(self.session.blob_tracker.get_availability_for_blobs) + d.addCallback(_get_mean) + d.addCallback(lambda result: self._render_response(result, OK_CODE)) + + return d + def get_lbrynet_version_from_github(): """Return the latest released version from github.""" diff --git a/lbrynet/lbrynet_daemon/Downloader.py b/lbrynet/lbrynet_daemon/Downloader.py index 99148d656..f4056bc9e 100644 --- a/lbrynet/lbrynet_daemon/Downloader.py +++ b/lbrynet/lbrynet_daemon/Downloader.py @@ -9,7 +9,6 @@ from twisted.internet import defer from twisted.internet.task import LoopingCall from lbrynet.core.Error import InsufficientFundsError, KeyFeeAboveMaxAllowed -from lbrynet.core.PaymentRateManager import PaymentRateManager from lbrynet.core.StreamDescriptor import download_sd_blob from lbrynet.metadata.Fee import FeeValidator from lbrynet.lbryfilemanager.EncryptedFileDownloader import ManagedEncryptedFileDownloaderFactory @@ -48,11 +47,10 @@ class GetStream(object): self.description = None self.fee = None self.data_rate = data_rate - self.name = None self.file_name = file_name self.session = session self.exchange_rate_manager = exchange_rate_manager - self.payment_rate_manager = PaymentRateManager(self.session.base_payment_rate_manager) + self.payment_rate_manager = self.session.payment_rate_manager self.lbry_file_manager = lbry_file_manager self.sd_identifier = sd_identifier self.stream_hash = None @@ -148,7 +146,7 @@ class GetStream(object): return self.finished def _start_download(self, downloader): - log.info('Starting download for %s', self.name) + log.info('Starting download for %s', self.resolved_name) self.downloader = downloader self.download_path = os.path.join(downloader.download_directory, downloader.file_name) diff --git a/lbrynet/lbrynet_daemon/ExchangeRateManager.py b/lbrynet/lbrynet_daemon/ExchangeRateManager.py index af88a6448..fe9b293e1 100644 --- a/lbrynet/lbrynet_daemon/ExchangeRateManager.py +++ b/lbrynet/lbrynet_daemon/ExchangeRateManager.py @@ -37,24 +37,34 @@ class MarketFeed(object): self._updater = LoopingCall(self._update_price) def _make_request(self): - r = requests.get(self.url, self.params) - return r.text + try: + r = requests.get(self.url, self.params) + return defer.succeed(r.text) + except Exception as err: + log.error(err) + return defer.fail(err) def _handle_response(self, response): return NotImplementedError def _subtract_fee(self, from_amount): + # increase amount to account for market fees return defer.succeed(from_amount / (1.0 - self.fee)) def _save_price(self, price): log.debug("Saving price update %f for %s" % (price, self.market)) self.rate = ExchangeRate(self.market, price, int(time.time())) + def _log_error(self, err): + log.error(err) + log.warning("There was a problem updating %s exchange rate information from %s", self.market, self.name) + def _update_price(self): - d = defer.succeed(self._make_request()) + d = self._make_request() d.addCallback(self._handle_response) d.addCallback(self._subtract_fee) d.addCallback(self._save_price) + d.addErrback(self._log_error) def start(self): if not self._updater.running: @@ -94,7 +104,11 @@ class GoogleBTCFeed(MarketFeed): ) def _make_request(self): - return googlefinance.getQuotes('CURRENCY:USDBTC')[0] + try: + r = googlefinance.getQuotes('CURRENCY:USDBTC')[0] + return defer.succeed(r) + except Exception as err: + return defer.fail(err) def _handle_response(self, response): return float(response['LastTradePrice']) diff --git a/lbrynet/lbrynet_daemon/Publisher.py b/lbrynet/lbrynet_daemon/Publisher.py index 6540d6016..7a6e73ee3 100644 --- a/lbrynet/lbrynet_daemon/Publisher.py +++ b/lbrynet/lbrynet_daemon/Publisher.py @@ -9,7 +9,6 @@ from appdirs import user_data_dir from lbrynet.core.Error import InsufficientFundsError from lbrynet.lbryfilemanager.EncryptedFileCreator import create_lbry_file from lbrynet.lbryfile.StreamDescriptor import publish_sd_blob -from lbrynet.core.PaymentRateManager import PaymentRateManager from lbrynet.metadata.Metadata import Metadata from lbrynet.lbryfilemanager.EncryptedFileDownloader import ManagedEncryptedFileDownloader from lbrynet import reflector @@ -102,7 +101,7 @@ class Publisher(object): def add_to_lbry_files(self, stream_hash): self.stream_hash = stream_hash - prm = PaymentRateManager(self.session.base_payment_rate_manager) + prm = self.session.payment_rate_manager d = self.lbry_file_manager.add_lbry_file(stream_hash, prm) d.addCallback(self.set_lbry_file) return d diff --git a/tests/functional/test_misc.py b/tests/functional/test_misc.py index ce3d3164d..9bfe49ffd 100644 --- a/tests/functional/test_misc.py +++ b/tests/functional/test_misc.py @@ -13,13 +13,10 @@ from Crypto.Hash import MD5 from lbrynet.conf import MIN_BLOB_DATA_PAYMENT_RATE from lbrynet.conf import MIN_BLOB_INFO_PAYMENT_RATE from lbrynet.lbrylive.LiveStreamCreator import FileLiveStreamCreator -from lbrynet.lbrylive.PaymentRateManager import BaseLiveStreamPaymentRateManager -from lbrynet.lbrylive.PaymentRateManager import LiveStreamPaymentRateManager from lbrynet.lbrylive.LiveStreamMetadataManager import DBLiveStreamMetadataManager from lbrynet.lbrylive.LiveStreamMetadataManager import TempLiveStreamMetadataManager from lbrynet.lbryfile.EncryptedFileMetadataManager import TempEncryptedFileMetadataManager, DBEncryptedFileMetadataManager from lbrynet.lbryfilemanager.EncryptedFileManager import EncryptedFileManager -from lbrynet.core.PaymentRateManager import PaymentRateManager from lbrynet.core.PTCWallet import PointTraderKeyQueryHandlerFactory, PointTraderKeyExchanger from lbrynet.core.Session import Session from lbrynet.core.client.StandaloneBlobDownloader import StandaloneBlobDownloader @@ -29,22 +26,19 @@ from lbrynet.core.StreamDescriptor import download_sd_blob from lbrynet.lbryfilemanager.EncryptedFileCreator import create_lbry_file from lbrynet.lbryfile.client.EncryptedFileOptions import add_lbry_file_to_sd_identifier from lbrynet.lbryfile.StreamDescriptor import get_sd_info -from twisted.internet import defer, threads, task, error +from twisted.internet import defer, threads, task from twisted.trial.unittest import TestCase from twisted.python.failure import Failure import os +from lbrynet.dht.node import Node +from tests.mocks import DummyBlobAvailabilityTracker from lbrynet.core.PeerManager import PeerManager from lbrynet.core.RateLimiter import DummyRateLimiter, RateLimiter -from lbrynet.core.server.BlobAvailabilityHandler import BlobAvailabilityHandlerFactory from lbrynet.core.server.BlobRequestHandler import BlobRequestHandlerFactory from lbrynet.core.server.ServerProtocol import ServerProtocolFactory from lbrynet.lbrylive.server.LiveBlobInfoQueryHandler import CryptBlobInfoQueryHandlerFactory from lbrynet.lbrylive.client.LiveStreamOptions import add_live_stream_to_sd_identifier from lbrynet.lbrylive.client.LiveStreamDownloader import add_full_live_stream_downloader_to_sd_identifier -from lbrynet.core.BlobManager import TempBlobManager -from lbrynet.reflector.client.client import EncryptedFileReflectorClientFactory -from lbrynet.reflector.server.server import ReflectorServerFactory -from lbrynet.lbryfile.StreamDescriptor import publish_sd_blob log_format = "%(funcName)s(): %(message)s" @@ -106,6 +100,9 @@ class FakeWallet(object): def set_public_key_for_peer(self, peer, public_key): pass + def get_claim_metadata_for_sd_hash(self, sd_hash): + return "fakeuri", "faketxid" + class FakePeerFinder(object): def __init__(self, start_port, peer_manager, num_peers): @@ -212,16 +209,12 @@ test_create_stream_sd_file = { def start_lbry_uploader(sd_hash_queue, kill_event, dead_event, file_size, ul_rate_limit=None): - - sys.modules = sys.modules.copy() - - del sys.modules['twisted.internet.reactor'] - - import twisted.internet - - twisted.internet.reactor = twisted.internet.epollreactor.EPollReactor() - - sys.modules['twisted.internet.reactor'] = twisted.internet.reactor + if sys.platform.startswith("linux"): + sys.modules = sys.modules.copy() + del sys.modules['twisted.internet.reactor'] + import twisted.internet + twisted.internet.reactor = twisted.internet.epollreactor.EPollReactor() + sys.modules['twisted.internet.reactor'] = twisted.internet.reactor from twisted.internet import reactor @@ -239,12 +232,14 @@ def start_lbry_uploader(sd_hash_queue, kill_event, dead_event, file_size, ul_rat rate_limiter = RateLimiter() sd_identifier = StreamDescriptorIdentifier() + db_dir = "server" os.mkdir(db_dir) session = Session(MIN_BLOB_DATA_PAYMENT_RATE, db_dir=db_dir, lbryid="abcd", peer_finder=peer_finder, hash_announcer=hash_announcer, peer_port=5553, - use_upnp=False, rate_limiter=rate_limiter, wallet=wallet) + use_upnp=False, rate_limiter=rate_limiter, wallet=wallet, blob_tracker_class=DummyBlobAvailabilityTracker, + dht_node_class=Node) stream_info_manager = TempEncryptedFileMetadataManager() @@ -274,9 +269,8 @@ def start_lbry_uploader(sd_hash_queue, kill_event, dead_event, file_size, ul_rat server_port = None query_handler_factories = { - BlobAvailabilityHandlerFactory(session.blob_manager): True, BlobRequestHandlerFactory(session.blob_manager, session.wallet, - PaymentRateManager(session.base_payment_rate_manager)): True, + session.payment_rate_manager): True, session.wallet.get_wallet_info_query_handler_factory(): True, } @@ -322,20 +316,18 @@ def start_lbry_uploader(sd_hash_queue, kill_event, dead_event, file_size, ul_rat sd_hash_queue.put(sd_hash) reactor.callLater(1, start_all) - reactor.run() + if not reactor.running: + reactor.run() def start_lbry_reuploader(sd_hash, kill_event, dead_event, ready_event, n, ul_rate_limit=None): - sys.modules = sys.modules.copy() - - del sys.modules['twisted.internet.reactor'] - - import twisted.internet - - twisted.internet.reactor = twisted.internet.epollreactor.EPollReactor() - - sys.modules['twisted.internet.reactor'] = twisted.internet.reactor + if sys.platform.startswith("linux"): + sys.modules = sys.modules.copy() + del sys.modules['twisted.internet.reactor'] + import twisted.internet + twisted.internet.reactor = twisted.internet.epollreactor.EPollReactor() + sys.modules['twisted.internet.reactor'] = twisted.internet.reactor from twisted.internet import reactor @@ -362,7 +354,7 @@ def start_lbry_reuploader(sd_hash, kill_event, dead_event, ready_event, n, ul_ra session = Session(MIN_BLOB_DATA_PAYMENT_RATE, db_dir=db_dir, lbryid="abcd" + str(n), peer_finder=peer_finder, hash_announcer=hash_announcer, blob_dir=None, peer_port=peer_port, - use_upnp=False, rate_limiter=rate_limiter, wallet=wallet) + use_upnp=False, rate_limiter=rate_limiter, wallet=wallet, blob_tracker_class=DummyBlobAvailabilityTracker) stream_info_manager = TempEncryptedFileMetadataManager() @@ -379,7 +371,7 @@ def start_lbry_reuploader(sd_hash, kill_event, dead_event, ready_event, n, ul_ra return factories[0].make_downloader(metadata, chosen_options, prm) def download_file(): - prm = PaymentRateManager(session.base_payment_rate_manager) + prm = session.payment_rate_manager d = download_sd_blob(session, sd_hash, prm) d.addCallback(sd_identifier.get_metadata_for_sd_blob) d.addCallback(make_downloader, prm) @@ -402,9 +394,8 @@ def start_lbry_reuploader(sd_hash, kill_event, dead_event, ready_event, n, ul_ra server_port = None query_handler_factories = { - BlobAvailabilityHandlerFactory(session.blob_manager): True, BlobRequestHandlerFactory(session.blob_manager, session.wallet, - PaymentRateManager(session.base_payment_rate_manager)): True, + session.payment_rate_manager): True, session.wallet.get_wallet_info_query_handler_factory(): True, } @@ -438,21 +429,18 @@ def start_lbry_reuploader(sd_hash, kill_event, dead_event, ready_event, n, ul_ra d = task.deferLater(reactor, 1.0, start_transfer) d.addCallback(lambda _: start_server()) - - reactor.run() + if not reactor.running: + reactor.run() def start_live_server(sd_hash_queue, kill_event, dead_event): - sys.modules = sys.modules.copy() - - del sys.modules['twisted.internet.reactor'] - - import twisted.internet - - twisted.internet.reactor = twisted.internet.epollreactor.EPollReactor() - - sys.modules['twisted.internet.reactor'] = twisted.internet.reactor + if sys.platform.startswith("linux"): + sys.modules = sys.modules.copy() + del sys.modules['twisted.internet.reactor'] + import twisted.internet + twisted.internet.reactor = twisted.internet.epollreactor.EPollReactor() + sys.modules['twisted.internet.reactor'] = twisted.internet.reactor from twisted.internet import reactor @@ -470,18 +458,14 @@ def start_live_server(sd_hash_queue, kill_event, dead_event): rate_limiter = DummyRateLimiter() sd_identifier = StreamDescriptorIdentifier() + db_dir = "server" os.mkdir(db_dir) session = Session(MIN_BLOB_DATA_PAYMENT_RATE, db_dir=db_dir, lbryid="abcd", - peer_finder=peer_finder, hash_announcer=hash_announcer, peer_port=5553, - use_upnp=False, rate_limiter=rate_limiter, wallet=wallet) - - base_payment_rate_manager = BaseLiveStreamPaymentRateManager(MIN_BLOB_INFO_PAYMENT_RATE) - data_payment_rate_manager = PaymentRateManager(session.base_payment_rate_manager) - payment_rate_manager = LiveStreamPaymentRateManager(base_payment_rate_manager, - data_payment_rate_manager) - + peer_finder=peer_finder, hash_announcer=hash_announcer, peer_port=5553, + use_upnp=False, rate_limiter=rate_limiter, wallet=wallet, + blob_tracker_class=DummyBlobAvailabilityTracker) stream_info_manager = DBLiveStreamMetadataManager(session.db_dir, hash_announcer) logging.debug("Created the session") @@ -492,10 +476,9 @@ def start_live_server(sd_hash_queue, kill_event, dead_event): logging.debug("Starting the server protocol") query_handler_factories = { CryptBlobInfoQueryHandlerFactory(stream_info_manager, session.wallet, - payment_rate_manager): True, - BlobAvailabilityHandlerFactory(session.blob_manager): True, + session.payment_rate_manager): True, BlobRequestHandlerFactory(session.blob_manager, session.wallet, - payment_rate_manager): True, + session.payment_rate_manager): True, session.wallet.get_wallet_info_query_handler_factory(): True, } @@ -563,12 +546,9 @@ def start_live_server(sd_hash_queue, kill_event, dead_event): return d def enable_live_stream(): - base_live_stream_payment_rate_manager = BaseLiveStreamPaymentRateManager( - MIN_BLOB_INFO_PAYMENT_RATE - ) - add_live_stream_to_sd_identifier(sd_identifier, base_live_stream_payment_rate_manager) + add_live_stream_to_sd_identifier(sd_identifier, session.base_payment_rate_manager) add_full_live_stream_downloader_to_sd_identifier(session, stream_info_manager, sd_identifier, - base_live_stream_payment_rate_manager) + session.base_payment_rate_manager) def run_server(): d = session.setup() @@ -581,20 +561,18 @@ def start_live_server(sd_hash_queue, kill_event, dead_event): return d reactor.callLater(1, run_server) - reactor.run() + if not reactor.running: + reactor.run() def start_blob_uploader(blob_hash_queue, kill_event, dead_event, slow): - sys.modules = sys.modules.copy() - - del sys.modules['twisted.internet.reactor'] - - import twisted.internet - - twisted.internet.reactor = twisted.internet.epollreactor.EPollReactor() - - sys.modules['twisted.internet.reactor'] = twisted.internet.reactor + if sys.platform.startswith("linux"): + sys.modules = sys.modules.copy() + del sys.modules['twisted.internet.reactor'] + import twisted.internet + twisted.internet.reactor = twisted.internet.epollreactor.EPollReactor() + sys.modules['twisted.internet.reactor'] = twisted.internet.reactor from twisted.internet import reactor @@ -621,7 +599,7 @@ def start_blob_uploader(blob_hash_queue, kill_event, dead_event, slow): session = Session(MIN_BLOB_DATA_PAYMENT_RATE, db_dir=db_dir, lbryid="efgh", peer_finder=peer_finder, hash_announcer=hash_announcer, blob_dir=blob_dir, peer_port=peer_port, - use_upnp=False, rate_limiter=rate_limiter, wallet=wallet) + use_upnp=False, rate_limiter=rate_limiter, wallet=wallet, blob_tracker_class=DummyBlobAvailabilityTracker) if slow is True: session.rate_limiter.set_ul_limit(2**11) @@ -643,9 +621,7 @@ def start_blob_uploader(blob_hash_queue, kill_event, dead_event, slow): server_port = None query_handler_factories = { - BlobAvailabilityHandlerFactory(session.blob_manager): True, - BlobRequestHandlerFactory(session.blob_manager, session.wallet, - PaymentRateManager(session.base_payment_rate_manager)): True, + BlobRequestHandlerFactory(session.blob_manager, session.wallet, session.payment_rate_manager): True, session.wallet.get_wallet_info_query_handler_factory(): True, } @@ -686,7 +662,8 @@ def start_blob_uploader(blob_hash_queue, kill_event, dead_event, slow): logging.debug("blob hash has been added to the queue") reactor.callLater(1, start_all) - reactor.run() + if not reactor.running: + reactor.run() class TestTransfer(TestCase): @@ -768,7 +745,6 @@ class TestTransfer(TestCase): return d - @unittest.skip("Sadly skipping failing test instead of fixing it") def test_lbry_transfer(self): sd_hash_queue = Queue() kill_event = Event() @@ -786,6 +762,7 @@ class TestTransfer(TestCase): rate_limiter = DummyRateLimiter() sd_identifier = StreamDescriptorIdentifier() + db_dir = "client" blob_dir = os.path.join(db_dir, "blobfiles") os.mkdir(db_dir) @@ -794,7 +771,8 @@ class TestTransfer(TestCase): self.session = Session(MIN_BLOB_DATA_PAYMENT_RATE, db_dir=db_dir, lbryid="abcd", peer_finder=peer_finder, hash_announcer=hash_announcer, blob_dir=blob_dir, peer_port=5553, - use_upnp=False, rate_limiter=rate_limiter, wallet=wallet) + use_upnp=False, rate_limiter=rate_limiter, wallet=wallet, blob_tracker_class=DummyBlobAvailabilityTracker, + dht_node_class=Node) self.stream_info_manager = TempEncryptedFileMetadataManager() @@ -808,7 +786,7 @@ class TestTransfer(TestCase): return factories[0].make_downloader(metadata, chosen_options, prm) def download_file(sd_hash): - prm = PaymentRateManager(self.session.base_payment_rate_manager) + prm = self.session.payment_rate_manager d = download_sd_blob(self.session, sd_hash, prm) d.addCallback(sd_identifier.get_metadata_for_sd_blob) d.addCallback(make_downloader, prm) @@ -855,7 +833,7 @@ class TestTransfer(TestCase): return d - @require_system('Linux') + @unittest.skip("Sadly skipping failing test instead of fixing it") def test_live_transfer(self): sd_hash_queue = Queue() @@ -878,7 +856,8 @@ class TestTransfer(TestCase): self.session = Session(MIN_BLOB_DATA_PAYMENT_RATE, db_dir=db_dir, lbryid="abcd", peer_finder=peer_finder, hash_announcer=hash_announcer, blob_dir=None, - peer_port=5553, use_upnp=False, rate_limiter=rate_limiter, wallet=wallet) + peer_port=5553, use_upnp=False, rate_limiter=rate_limiter, wallet=wallet, + blob_tracker_class=DummyBlobAvailabilityTracker, dht_node_class=Node) self.stream_info_manager = TempLiveStreamMetadataManager(hash_announcer) @@ -893,12 +872,10 @@ class TestTransfer(TestCase): def start_lbry_file(lbry_file): lbry_file = lbry_file - logging.debug("Calling lbry_file.start()") return lbry_file.start() def download_stream(sd_blob_hash): - logging.debug("Downloaded the sd blob. Reading it now") - prm = PaymentRateManager(self.session.base_payment_rate_manager) + prm = self.session.payment_rate_manager d = download_sd_blob(self.session, sd_blob_hash, prm) d.addCallback(sd_identifier.get_metadata_for_sd_blob) d.addCallback(create_downloader, prm) @@ -907,20 +884,17 @@ class TestTransfer(TestCase): def do_download(sd_blob_hash): logging.debug("Starting the download") + d = self.session.setup() d.addCallback(lambda _: enable_live_stream()) d.addCallback(lambda _: download_stream(sd_blob_hash)) return d def enable_live_stream(): - base_live_stream_payment_rate_manager = BaseLiveStreamPaymentRateManager( - MIN_BLOB_INFO_PAYMENT_RATE - ) - add_live_stream_to_sd_identifier(sd_identifier, - base_live_stream_payment_rate_manager) + add_live_stream_to_sd_identifier(sd_identifier, self.session.payment_rate_manager) add_full_live_stream_downloader_to_sd_identifier(self.session, self.stream_info_manager, sd_identifier, - base_live_stream_payment_rate_manager) + self.session.payment_rate_manager) d.addCallback(do_download) @@ -951,7 +925,6 @@ class TestTransfer(TestCase): d.addBoth(stop) return d - @require_system('Linux') def test_last_blob_retrieval(self): kill_event = Event() @@ -976,6 +949,7 @@ class TestTransfer(TestCase): hash_announcer = FakeAnnouncer() rate_limiter = DummyRateLimiter() + db_dir = "client" blob_dir = os.path.join(db_dir, "blobfiles") os.mkdir(db_dir) @@ -984,7 +958,7 @@ class TestTransfer(TestCase): self.session = Session(MIN_BLOB_DATA_PAYMENT_RATE, db_dir=db_dir, lbryid="abcd", peer_finder=peer_finder, hash_announcer=hash_announcer, blob_dir=blob_dir, peer_port=5553, - use_upnp=False, rate_limiter=rate_limiter, wallet=wallet) + use_upnp=False, rate_limiter=rate_limiter, wallet=wallet, blob_tracker_class=DummyBlobAvailabilityTracker) d1 = self.wait_for_hash_from_queue(blob_hash_queue_1) d2 = self.wait_for_hash_from_queue(blob_hash_queue_2) @@ -997,7 +971,7 @@ class TestTransfer(TestCase): d.addCallback(get_blob_hash) def download_blob(blob_hash): - prm = PaymentRateManager(self.session.base_payment_rate_manager) + prm = self.session.payment_rate_manager downloader = StandaloneBlobDownloader(blob_hash, self.session.blob_manager, peer_finder, rate_limiter, prm, wallet) d = downloader.download() @@ -1036,7 +1010,6 @@ class TestTransfer(TestCase): return d - @unittest.skip("Sadly skipping failing test instead of fixing it") def test_double_download(self): sd_hash_queue = Queue() kill_event = Event() @@ -1054,6 +1027,7 @@ class TestTransfer(TestCase): rate_limiter = DummyRateLimiter() sd_identifier = StreamDescriptorIdentifier() + downloaders = [] db_dir = "client" @@ -1064,10 +1038,9 @@ class TestTransfer(TestCase): self.session = Session(MIN_BLOB_DATA_PAYMENT_RATE, db_dir=db_dir, lbryid="abcd", peer_finder=peer_finder, hash_announcer=hash_announcer, blob_dir=blob_dir, peer_port=5553, use_upnp=False, - rate_limiter=rate_limiter, wallet=wallet) + rate_limiter=rate_limiter, wallet=wallet, blob_tracker_class=DummyBlobAvailabilityTracker) self.stream_info_manager = DBEncryptedFileMetadataManager(self.session.db_dir) - self.lbry_file_manager = EncryptedFileManager(self.session, self.stream_info_manager, sd_identifier) def make_downloader(metadata, prm): @@ -1082,7 +1055,7 @@ class TestTransfer(TestCase): return downloader def download_file(sd_hash): - prm = PaymentRateManager(self.session.base_payment_rate_manager) + prm = self.session.payment_rate_manager d = download_sd_blob(self.session, sd_hash, prm) d.addCallback(sd_identifier.get_metadata_for_sd_blob) d.addCallback(make_downloader, prm) @@ -1115,7 +1088,6 @@ class TestTransfer(TestCase): return d def start_transfer(sd_hash): - logging.debug("Starting the transfer") d = self.session.setup() @@ -1172,6 +1144,7 @@ class TestTransfer(TestCase): rate_limiter = DummyRateLimiter() sd_identifier = StreamDescriptorIdentifier() + db_dir = "client" blob_dir = os.path.join(db_dir, "blobfiles") os.mkdir(db_dir) @@ -1180,7 +1153,7 @@ class TestTransfer(TestCase): self.session = Session(MIN_BLOB_DATA_PAYMENT_RATE, db_dir=db_dir, lbryid="abcd", peer_finder=peer_finder, hash_announcer=hash_announcer, blob_dir=None, peer_port=5553, - use_upnp=False, rate_limiter=rate_limiter, wallet=wallet) + use_upnp=False, rate_limiter=rate_limiter, wallet=wallet, blob_tracker_class=DummyBlobAvailabilityTracker) self.stream_info_manager = TempEncryptedFileMetadataManager() @@ -1205,7 +1178,7 @@ class TestTransfer(TestCase): return factories[0].make_downloader(metadata, chosen_options, prm) def download_file(sd_hash): - prm = PaymentRateManager(self.session.base_payment_rate_manager) + prm = self.session.payment_rate_manager d = download_sd_blob(self.session, sd_hash, prm) d.addCallback(sd_identifier.get_metadata_for_sd_blob) d.addCallback(make_downloader, prm) @@ -1290,6 +1263,7 @@ class TestStreamify(TestCase): rate_limiter = DummyRateLimiter() sd_identifier = StreamDescriptorIdentifier() + db_dir = "client" blob_dir = os.path.join(db_dir, "blobfiles") os.mkdir(db_dir) @@ -1298,7 +1272,7 @@ class TestStreamify(TestCase): self.session = Session(MIN_BLOB_DATA_PAYMENT_RATE, db_dir=db_dir, lbryid="abcd", peer_finder=peer_finder, hash_announcer=hash_announcer, blob_dir=blob_dir, peer_port=5553, - use_upnp=False, rate_limiter=rate_limiter, wallet=wallet) + use_upnp=False, rate_limiter=rate_limiter, wallet=wallet, blob_tracker_class=DummyBlobAvailabilityTracker) self.stream_info_manager = TempEncryptedFileMetadataManager() @@ -1342,6 +1316,7 @@ class TestStreamify(TestCase): rate_limiter = DummyRateLimiter() sd_identifier = StreamDescriptorIdentifier() + db_dir = "client" blob_dir = os.path.join(db_dir, "blobfiles") os.mkdir(db_dir) @@ -1350,7 +1325,7 @@ class TestStreamify(TestCase): self.session = Session(MIN_BLOB_DATA_PAYMENT_RATE, db_dir=db_dir, lbryid="abcd", peer_finder=peer_finder, hash_announcer=hash_announcer, blob_dir=blob_dir, peer_port=5553, - use_upnp=False, rate_limiter=rate_limiter, wallet=wallet) + use_upnp=False, rate_limiter=rate_limiter, wallet=wallet, blob_tracker_class=DummyBlobAvailabilityTracker) self.stream_info_manager = DBEncryptedFileMetadataManager(self.session.db_dir) @@ -1363,7 +1338,7 @@ class TestStreamify(TestCase): def combine_stream(stream_hash): - prm = PaymentRateManager(self.session.base_payment_rate_manager) + prm = self.session.payment_rate_manager d = self.lbry_file_manager.add_lbry_file(stream_hash, prm) d.addCallback(start_lbry_file) diff --git a/tests/functional/test_reflector.py b/tests/functional/test_reflector.py index c3ce00ca5..7dd0ad6e7 100644 --- a/tests/functional/test_reflector.py +++ b/tests/functional/test_reflector.py @@ -12,6 +12,7 @@ from lbrynet.core import PeerManager from lbrynet.core import RateLimiter from lbrynet.core import Session from lbrynet.core import StreamDescriptor +from lbrynet.dht.node import Node from lbrynet.lbryfile import EncryptedFileMetadataManager from lbrynet.lbryfile.client import EncryptedFileOptions from lbrynet.lbryfilemanager import EncryptedFileCreator @@ -91,7 +92,9 @@ class TestReflector(unittest.TestCase): peer_port=5553, use_upnp=False, rate_limiter=rate_limiter, - wallet=wallet + wallet=wallet, + blob_tracker_class=mocks.DummyBlobAvailabilityTracker, + dht_node_class=Node ) self.stream_info_manager = EncryptedFileMetadataManager.TempEncryptedFileMetadataManager() diff --git a/tests/mocks.py b/tests/mocks.py index 6ea183195..40e37dcca 100644 --- a/tests/mocks.py +++ b/tests/mocks.py @@ -1,9 +1,11 @@ import io from Crypto.PublicKey import RSA +from decimal import Decimal from twisted.internet import defer, threads, task, error from lbrynet.core import PTCWallet +from lbrynet.core.BlobAvailability import BlobAvailabilityTracker class Node(object): @@ -134,6 +136,43 @@ class GenFile(io.RawIOBase): return output +class DummyBlobAvailabilityTracker(BlobAvailabilityTracker): + """ + Class to track peer counts for known blobs, and to discover new popular blobs + + Attributes: + availability (dict): dictionary of peers for known blobs + """ + + def __init__(self, blob_manager=None, peer_finder=None, dht_node=None): + self.availability = { + '91dc64cf1ff42e20d627b033ad5e4c3a4a96856ed8a6e3fb4cd5fa1cfba4bf72eefd325f579db92f45f4355550ace8e7': ['1.2.3.4'], + 'b2e48bb4c88cf46b76adf0d47a72389fae0cd1f19ed27dc509138c99509a25423a4cef788d571dca7988e1dca69e6fa0': ['1.2.3.4', '1.2.3.4'], + '6af95cd062b4a179576997ef1054c9d2120f8592eea045e9667bea411d520262cd5a47b137eabb7a7871f5f8a79c92dd': ['1.2.3.4', '1.2.3.4', '1.2.3.4'], + '6d8017aba362e5c5d0046625a039513419810a0397d728318c328a5cc5d96efb589fbca0728e54fe5adbf87e9545ee07': ['1.2.3.4', '1.2.3.4', '1.2.3.4', '1.2.3.4'], + '5a450b416275da4bdff604ee7b58eaedc7913c5005b7184fc3bc5ef0b1add00613587f54217c91097fc039ed9eace9dd': ['1.2.3.4', '1.2.3.4', '1.2.3.4', '1.2.3.4', '1.2.3.4'], + 'd7c82e6cac093b3f16107d2ae2b2c75424f1fcad2c7fbdbe66e4a13c0b6bd27b67b3a29c403b82279ab0f7c1c48d6787': ['1.2.3.4', '1.2.3.4', '1.2.3.4', '1.2.3.4', '1.2.3.4', '1.2.3.4'], + '9dbda74a472a2e5861a5d18197aeba0f5de67c67e401124c243d2f0f41edf01d7a26aeb0b5fc9bf47f6361e0f0968e2c': ['1.2.3.4', '1.2.3.4', '1.2.3.4', '1.2.3.4', '1.2.3.4', '1.2.3.4', '1.2.3.4'], + '8c70d5e2f5c3a6085006198e5192d157a125d92e7378794472007a61947992768926513fc10924785bdb1761df3c37e6': ['1.2.3.4', '1.2.3.4', '1.2.3.4', '1.2.3.4', '1.2.3.4', '1.2.3.4', '1.2.3.4', '1.2.3.4'], + 'f99d24cd50d4bfd77c2598bfbeeb8415bf0feef21200bdf0b8fbbde7751a77b7a2c68e09c25465a2f40fba8eecb0b4e0': ['1.2.3.4', '1.2.3.4', '1.2.3.4', '1.2.3.4', '1.2.3.4', '1.2.3.4', '1.2.3.4', '1.2.3.4', '1.2.3.4'], + 'c84aa1fd8f5009f7c4e71e444e40d95610abc1480834f835eefb267287aeb10025880a3ce22580db8c6d92efb5bc0c9c': ['1.2.3.4', '1.2.3.4', '1.2.3.4', '1.2.3.4', '1.2.3.4', '1.2.3.4', '1.2.3.4', '1.2.3.4', '1.2.3.4', '1.2.3.4'], + } + self.last_mean_availability = Decimal(0.0) + self._blob_manager = None + self._peer_finder = PeerFinder(11223, 11224, 2) + self._dht_node = None + self._check_popular = None + self._check_mine = None + self._get_mean_peers() + + def start(self): + pass + + def stop(self): + pass + + + create_stream_sd_file = { 'stream_name': '746573745f66696c65', 'blobs': [ diff --git a/tests/unit/core/server/test_BlobRequestHandler.py b/tests/unit/core/server/test_BlobRequestHandler.py index 5c55af574..31d7e48ee 100644 --- a/tests/unit/core/server/test_BlobRequestHandler.py +++ b/tests/unit/core/server/test_BlobRequestHandler.py @@ -1,24 +1,24 @@ import StringIO import mock -from twisted.internet import defer, protocol +from twisted.internet import defer from twisted.test import proto_helpers from twisted.trial import unittest from lbrynet.core import Peer from lbrynet.core.server import BlobRequestHandler +from lbrynet.core.PaymentRateManager import NegotiatedPaymentRateManager, BasePaymentRateManager +from tests.mocks import DummyBlobAvailabilityTracker class TestBlobRequestHandlerQueries(unittest.TestCase): def setUp(self): self.blob_manager = mock.Mock() - self.payment_rate_manager = mock.Mock() - self.handler = BlobRequestHandler.BlobRequestHandler( - self.blob_manager, None, self.payment_rate_manager) + self.payment_rate_manager = NegotiatedPaymentRateManager(BasePaymentRateManager(0.001), DummyBlobAvailabilityTracker()) + self.handler = BlobRequestHandler.BlobRequestHandler(self.blob_manager, None, self.payment_rate_manager) def test_empty_response_when_empty_query(self): - self.assertEqual( - {}, self.successResultOf(self.handler.handle_queries({}))) + self.assertEqual({}, self.successResultOf(self.handler.handle_queries({}))) def test_error_set_when_rate_is_missing(self): query = {'requested_blob': 'blob'} @@ -27,9 +27,8 @@ class TestBlobRequestHandlerQueries(unittest.TestCase): self.assertEqual(response, self.successResultOf(deferred)) def test_error_set_when_rate_too_low(self): - self.payment_rate_manager.accept_rate_blob_data.return_value = False query = { - 'blob_data_payment_rate': 'way_too_low', + 'blob_data_payment_rate': '-1.0', 'requested_blob': 'blob' } deferred = self.handler.handle_queries(query) @@ -40,9 +39,8 @@ class TestBlobRequestHandlerQueries(unittest.TestCase): self.assertEqual(response, self.successResultOf(deferred)) def test_response_when_rate_too_low(self): - self.payment_rate_manager.accept_rate_blob_data.return_value = False query = { - 'blob_data_payment_rate': 'way_too_low', + 'blob_data_payment_rate': '-1.0', } deferred = self.handler.handle_queries(query) response = { @@ -51,12 +49,11 @@ class TestBlobRequestHandlerQueries(unittest.TestCase): self.assertEqual(response, self.successResultOf(deferred)) def test_blob_unavailable_when_blob_not_validated(self): - self.payment_rate_manager.accept_rate_blob_data.return_value = True blob = mock.Mock() blob.is_validated.return_value = False self.blob_manager.get_blob.return_value = defer.succeed(blob) query = { - 'blob_data_payment_rate': 'rate', + 'blob_data_payment_rate': 1.0, 'requested_blob': 'blob' } deferred = self.handler.handle_queries(query) @@ -67,13 +64,12 @@ class TestBlobRequestHandlerQueries(unittest.TestCase): self.assertEqual(response, self.successResultOf(deferred)) def test_blob_unavailable_when_blob_cannot_be_opened(self): - self.payment_rate_manager.accept_rate_blob_data.return_value = True blob = mock.Mock() blob.is_validated.return_value = True blob.open_for_reading.return_value = None self.blob_manager.get_blob.return_value = defer.succeed(blob) query = { - 'blob_data_payment_rate': 'rate', + 'blob_data_payment_rate': 0.0, 'requested_blob': 'blob' } deferred = self.handler.handle_queries(query) @@ -84,15 +80,17 @@ class TestBlobRequestHandlerQueries(unittest.TestCase): self.assertEqual(response, self.successResultOf(deferred)) def test_blob_details_are_set_when_all_conditions_are_met(self): - self.payment_rate_manager.accept_rate_blob_data.return_value = True blob = mock.Mock() blob.is_validated.return_value = True blob.open_for_reading.return_value = True blob.blob_hash = 'DEADBEEF' blob.length = 42 + peer = mock.Mock() + peer.host = "1.2.3.4" + self.handler.peer = peer self.blob_manager.get_blob.return_value = defer.succeed(blob) query = { - 'blob_data_payment_rate': 'rate', + 'blob_data_payment_rate': 1.0, 'requested_blob': 'blob' } deferred = self.handler.handle_queries(query) @@ -103,7 +101,8 @@ class TestBlobRequestHandlerQueries(unittest.TestCase): 'length': 42 } } - self.assertEqual(response, self.successResultOf(deferred)) + result = self.successResultOf(deferred) + self.assertEqual(response, result) class TestBlobRequestHandlerSender(unittest.TestCase): diff --git a/tests/unit/core/test_Strategy.py b/tests/unit/core/test_Strategy.py new file mode 100644 index 000000000..62e18d7f7 --- /dev/null +++ b/tests/unit/core/test_Strategy.py @@ -0,0 +1,125 @@ +import itertools +from twisted.trial import unittest +import random +import mock +from lbrynet.core.PaymentRateManager import NegotiatedPaymentRateManager, BasePaymentRateManager +from lbrynet.core.Strategy import BasicAvailabilityWeightedStrategy +from lbrynet.core.Offer import Offer +from tests.mocks import DummyBlobAvailabilityTracker + +MAX_NEGOTIATION_TURNS = 10 +random.seed(12345) + + +def get_random_sample(list_to_sample): + result = list_to_sample[random.randint(1, len(list_to_sample)):random.randint(1, len(list_to_sample))] + if not result: + return get_random_sample(list_to_sample) + return result + + +def calculate_negotation_turns(client_base, host_base, host_is_generous=True, client_is_generous=True): + blobs = [ + 'b2e48bb4c88cf46b76adf0d47a72389fae0cd1f19ed27dc509138c99509a25423a4cef788d571dca7988e1dca69e6fa0', + 'd7c82e6cac093b3f16107d2ae2b2c75424f1fcad2c7fbdbe66e4a13c0b6bd27b67b3a29c403b82279ab0f7c1c48d6787', + '5a450b416275da4bdff604ee7b58eaedc7913c5005b7184fc3bc5ef0b1add00613587f54217c91097fc039ed9eace9dd', + 'f99d24cd50d4bfd77c2598bfbeeb8415bf0feef21200bdf0b8fbbde7751a77b7a2c68e09c25465a2f40fba8eecb0b4e0', + '9dbda74a472a2e5861a5d18197aeba0f5de67c67e401124c243d2f0f41edf01d7a26aeb0b5fc9bf47f6361e0f0968e2c', + '91dc64cf1ff42e20d627b033ad5e4c3a4a96856ed8a6e3fb4cd5fa1cfba4bf72eefd325f579db92f45f4355550ace8e7', + '6d8017aba362e5c5d0046625a039513419810a0397d728318c328a5cc5d96efb589fbca0728e54fe5adbf87e9545ee07', + '6af95cd062b4a179576997ef1054c9d2120f8592eea045e9667bea411d520262cd5a47b137eabb7a7871f5f8a79c92dd', + '8c70d5e2f5c3a6085006198e5192d157a125d92e7378794472007a61947992768926513fc10924785bdb1761df3c37e6', + 'c84aa1fd8f5009f7c4e71e444e40d95610abc1480834f835eefb267287aeb10025880a3ce22580db8c6d92efb5bc0c9c' + ] + + host = mock.Mock() + host.host = "1.2.3.4" + client = mock.Mock() + client.host = "1.2.3.5" + + client_base_prm = BasePaymentRateManager(client_base) + client_prm = NegotiatedPaymentRateManager(client_base_prm, + DummyBlobAvailabilityTracker(), + generous=client_is_generous) + host_base_prm = BasePaymentRateManager(host_base) + host_prm = NegotiatedPaymentRateManager(host_base_prm, + DummyBlobAvailabilityTracker(), + generous=host_is_generous) + blobs_to_query = get_random_sample(blobs) + accepted = False + turns = 0 + while not accepted: + rate = client_prm.get_rate_blob_data(host, blobs_to_query) + offer = Offer(rate) + accepted = host_prm.accept_rate_blob_data(client, blobs_to_query, offer) + turns += 1 + return turns + + +class AvailabilityWeightedStrategyTests(unittest.TestCase): + def test_first_offer_is_zero_and_second_is_not_if_offer_not_accepted(self): + strategy = BasicAvailabilityWeightedStrategy(DummyBlobAvailabilityTracker()) + peer = "1.1.1.1" + + blobs = strategy.price_model.blob_tracker.availability.keys() + offer1 = strategy.make_offer(peer, blobs) + + offer2 = strategy.make_offer(peer, blobs) + + self.assertEquals(offer1.rate, 0.0) + self.assertNotEqual(offer2.rate, 0.0) + + def test_accept_zero_and_persist_if_accepted(self): + host_strategy = BasicAvailabilityWeightedStrategy(DummyBlobAvailabilityTracker()) + client_strategy = BasicAvailabilityWeightedStrategy(DummyBlobAvailabilityTracker()) + + client = "1.1.1.1" + host = "1.1.1.2" + blobs = host_strategy.price_model.blob_tracker.availability.keys() + + offer = client_strategy.make_offer(host, blobs) + response1 = host_strategy.respond_to_offer(offer, client, blobs) + client_strategy.update_accepted_offers(host, response1) + + offer = client_strategy.make_offer(host, blobs) + response2 = host_strategy.respond_to_offer(offer, client, blobs) + client_strategy.update_accepted_offers(host, response2) + + self.assertEquals(response1.is_too_low, False) + self.assertEquals(response1.is_accepted, True) + self.assertEquals(response1.rate, 0.0) + + self.assertEquals(response2.is_too_low, False) + self.assertEquals(response2.is_accepted, True) + self.assertEquals(response2.rate, 0.0) + + def test_how_many_turns_before_accept_with_similar_rate_settings(self): + base_rates = [0.0001 * n for n in range(1, 10)] + for host_base, client_base in itertools.product(base_rates, base_rates): + turns = calculate_negotation_turns(host_base, + client_base, + client_is_generous=False, + host_is_generous=False) + self.assertGreater(MAX_NEGOTIATION_TURNS, turns) + + def test_generous_connects_in_one_turn(self): + base_rates = [0.0001 * n for n in range(1, 10)] + for host_base, client_base in itertools.product(base_rates, base_rates): + turns = calculate_negotation_turns(host_base, client_base) + self.assertEqual(1, turns) + + def test_how_many_turns_with_generous_client(self): + base_rates = [0.0001 * n for n in range(1, 10)] + for host_base, client_base in itertools.product(base_rates, base_rates): + turns = calculate_negotation_turns(host_base, + client_base, + host_is_generous=False) + self.assertGreater(MAX_NEGOTIATION_TURNS, turns) + + def test_how_many_turns_with_generous_host(self): + base_rates = [0.0001 * n for n in range(1, 10)] + for host_base, client_base in itertools.product(base_rates, base_rates): + turns = calculate_negotation_turns(host_base, + client_base, + client_is_generous=False) + self.assertGreater(MAX_NEGOTIATION_TURNS, turns)