diff --git a/lbrynet/conf.py b/lbrynet/conf.py index 785d3d42f..4f895d69b 100644 --- a/lbrynet/conf.py +++ b/lbrynet/conf.py @@ -3,7 +3,7 @@ Some network wide and also application specific parameters """ import os - +is_generous_host = True IS_DEVELOPMENT_VERSION = False MAX_HANDSHAKE_SIZE = 2**16 diff --git a/lbrynet/core/BlobAvailability.py b/lbrynet/core/BlobAvailability.py index f003b2770..6cfe52852 100644 --- a/lbrynet/core/BlobAvailability.py +++ b/lbrynet/core/BlobAvailability.py @@ -17,7 +17,7 @@ class BlobAvailabilityTracker(object): def __init__(self, blob_manager, peer_finder, dht_node): self.availability = {} - self.last_mean_availability = Decimal(0.0) + self._last_mean_availability = Decimal(0.0) self._blob_manager = blob_manager self._peer_finder = peer_finder self._dht_node = dht_node @@ -50,6 +50,11 @@ class BlobAvailabilityTracker(object): d.addCallback(lambda results: [val for success, val in results if success]) return d + @property + def last_mean_availability(self): + return max(Decimal(0.01), self._last_mean_availability) + + def _update_peers_for_blob(self, blob): def _save_peer_info(blob_hash, peers): v = {blob_hash: peers} @@ -86,4 +91,4 @@ class BlobAvailabilityTracker(object): def _get_mean_peers(self): num_peers = [len(self.availability[blob]) for blob in self.availability] mean = Decimal(sum(num_peers)) / Decimal(max(1, len(num_peers))) - self.last_mean_availability = mean + self._last_mean_availability = mean diff --git a/lbrynet/core/PaymentRateManager.py b/lbrynet/core/PaymentRateManager.py index 914e21947..6e7c02047 100644 --- a/lbrynet/core/PaymentRateManager.py +++ b/lbrynet/core/PaymentRateManager.py @@ -1,5 +1,7 @@ from lbrynet.core.Strategy import get_default_strategy -from lbrynet.conf import MIN_BLOB_DATA_PAYMENT_RATE, MIN_BLOB_INFO_PAYMENT_RATE +from lbrynet.conf import MIN_BLOB_DATA_PAYMENT_RATE, MIN_BLOB_INFO_PAYMENT_RATE, is_generous_host +from decimal import Decimal + class BasePaymentRateManager(object): def __init__(self, rate=MIN_BLOB_DATA_PAYMENT_RATE, info_rate=MIN_BLOB_INFO_PAYMENT_RATE): @@ -34,7 +36,7 @@ class PaymentRateManager(object): class NegotiatedPaymentRateManager(object): - def __init__(self, base, availability_tracker, generous=True): + def __init__(self, base, availability_tracker, generous=is_generous_host): """ @param base: a BasePaymentRateManager @param availability_tracker: a BlobAvailabilityTracker @@ -70,4 +72,10 @@ class NegotiatedPaymentRateManager(object): self.points_paid += amount def record_offer_reply(self, peer, offer): - self.strategy.update_accepted_offers(peer, offer) \ No newline at end of file + self.strategy.update_accepted_offers(peer, offer) + + def price_limit_reached(self, peer): + if peer in self.strategy.pending_sent_offers: + offer = self.strategy.pending_sent_offers[peer] + return offer.is_too_low and round(Decimal.from_float(offer.rate), 5) >= round(self.strategy.max_rate, 5) + return False \ No newline at end of file diff --git a/lbrynet/core/PriceModel.py b/lbrynet/core/PriceModel.py index 299d50223..71ec87769 100644 --- a/lbrynet/core/PriceModel.py +++ b/lbrynet/core/PriceModel.py @@ -30,9 +30,12 @@ class MeanAvailabilityWeightedPrice(object): mean_availability = self.blob_tracker.last_mean_availability availability = self.blob_tracker.availability.get(blob, []) index = 0 # blob.index - price = self.base_price * (mean_availability / Decimal(max(1, len(availability)))) / self._frontload(index) + price = self.base_price * self._get_availability_multiplier(mean_availability, availability) / self._frontload(index) return round(price, 5) + def _get_availability_multiplier(self, mean_availability, availability): + return Decimal(max(1, mean_availability) / Decimal(max(1, len(availability)))) + def _frontload(self, index): """ Get front-load multiplier, used to weight prices of blobs in a stream towards the front of the stream. diff --git a/lbrynet/core/Session.py b/lbrynet/core/Session.py index 1b0fb700b..ff5cf80d7 100644 --- a/lbrynet/core/Session.py +++ b/lbrynet/core/Session.py @@ -29,7 +29,7 @@ class Session(object): def __init__(self, blob_data_payment_rate, db_dir=None, lbryid=None, peer_manager=None, dht_node_port=None, known_dht_nodes=None, peer_finder=None, hash_announcer=None, blob_dir=None, blob_manager=None, peer_port=None, use_upnp=True, rate_limiter=None, wallet=None, dht_node_class=node.Node, - blob_tracker_class=None, payment_rate_manager_class=None): + blob_tracker_class=None, payment_rate_manager_class=None, is_generous=True): """ @param blob_data_payment_rate: The default payment rate for blob data @@ -109,6 +109,7 @@ class Session(object): self.base_payment_rate_manager = BasePaymentRateManager(blob_data_payment_rate) self.payment_rate_manager = None self.payment_rate_manager_class = payment_rate_manager_class or NegotiatedPaymentRateManager + self.is_generous = is_generous def setup(self): """Create the blob directory and database if necessary, start all desired services""" @@ -271,7 +272,8 @@ class Session(object): self.dht_node) if self.payment_rate_manager is None: self.payment_rate_manager = self.payment_rate_manager_class(self.base_payment_rate_manager, - self.blob_tracker) + self.blob_tracker, + self.is_generous) self.rate_limiter.start() d1 = self.blob_manager.setup() diff --git a/lbrynet/core/Strategy.py b/lbrynet/core/Strategy.py index 80fff00cb..f25531652 100644 --- a/lbrynet/core/Strategy.py +++ b/lbrynet/core/Strategy.py @@ -1,5 +1,6 @@ from zope.interface import implementer from decimal import Decimal +from lbrynet.conf import is_generous_host from lbrynet.interfaces import INegotiationStrategy from lbrynet.core.Offer import Offer from lbrynet.core.PriceModel import MeanAvailabilityWeightedPrice @@ -15,13 +16,14 @@ class Strategy(object): """ implementer(INegotiationStrategy) - def __init__(self, price_model, max_rate, min_rate, is_generous=True): + def __init__(self, price_model, max_rate, min_rate, is_generous=is_generous_host): self.price_model = price_model self.is_generous = is_generous self.accepted_offers = {} + self.pending_sent_offers = {} self.offers_sent = {} self.offers_received = {} - self.max_rate = max_rate or Decimal(self.price_model.base_price * 100) + self.max_rate = max_rate or Decimal(self.price_model.base_price * 50) self.min_rate = Decimal(min_rate) def _make_rate_offer(self, rates, offer_count): @@ -36,13 +38,17 @@ class Strategy(object): if peer in self.accepted_offers: # if there was a previous accepted offer, use that offer = self.accepted_offers[peer] + if peer in self.pending_sent_offers: + del self.pending_sent_offers[peer] elif offer_count == 0 and self.is_generous: # Try asking for it for free offer = Offer(Decimal(0.0)) + self.pending_sent_offers.update({peer: offer}) else: rates = [self.price_model.calculate_price(blob) for blob in blobs] price = self._make_rate_offer(rates, offer_count) offer = Offer(price) + self.pending_sent_offers.update({peer: offer}) return offer def respond_to_offer(self, offer, peer, blobs): @@ -50,7 +56,6 @@ class Strategy(object): self._add_offer_received(peer) rates = [self.price_model.calculate_price(blob) for blob in blobs] price = self._get_response_rate(rates, offer_count) - if peer in self.accepted_offers: offer = self.accepted_offers[peer] elif offer.rate == 0.0 and offer_count == 0 and self.is_generous: @@ -71,6 +76,7 @@ class Strategy(object): del self.accepted_offers[peer] if offer.is_accepted: self.accepted_offers.update({peer: offer}) + self.pending_sent_offers.update({peer: offer}) def _add_offer_sent(self, peer): turn = self.offers_sent.get(peer, 0) + 1 @@ -95,7 +101,7 @@ class BasicAvailabilityWeightedStrategy(Strategy): implementer(INegotiationStrategy) def __init__(self, blob_tracker, acceleration=1.25, deceleration=0.9, max_rate=None, min_rate=0.0, - is_generous=True, base_price=0.0001, alpha=1.0): + is_generous=is_generous_host, base_price=0.0001, alpha=1.0): price_model = MeanAvailabilityWeightedPrice(blob_tracker, base_price=base_price, alpha=alpha) Strategy.__init__(self, price_model, max_rate, min_rate, is_generous) self._acceleration = Decimal(acceleration) # rate of how quickly to ramp offer diff --git a/lbrynet/core/Wallet.py b/lbrynet/core/Wallet.py index 896d71de8..528ad983f 100644 --- a/lbrynet/core/Wallet.py +++ b/lbrynet/core/Wallet.py @@ -385,7 +385,7 @@ class Wallet(object): def _get_my_unspent_claim(claims): for claim in claims: - if claim['name'] == name and not claim['is spent']: + if claim['name'] == name and not claim['is spent'] and not claim.get('supported_claimid'): return claim return False diff --git a/lbrynet/core/client/BlobRequester.py b/lbrynet/core/client/BlobRequester.py index 8c59bc04f..a0e724c3c 100644 --- a/lbrynet/core/client/BlobRequester.py +++ b/lbrynet/core/client/BlobRequester.py @@ -51,8 +51,10 @@ class BlobRequester(object): self._available_blobs = defaultdict(list) # {Peer: [blob_hash]} self._unavailable_blobs = defaultdict(list) # {Peer: [blob_hash]}} self._protocol_prices = {} # {ClientProtocol: price} + self._protocol_offers = {} self._price_disagreements = [] # [Peer] self._protocol_tries = {} + self._maxed_out_peers = [] self._incompatible_peers = [] ######## IRequestCreator ######### @@ -84,17 +86,18 @@ class BlobRequester(object): if availability.can_make_request(): availability.make_request_and_handle_response() sent_request = True - + if price.can_make_request(): + # TODO: document why a PriceRequest is only made if an + # Availability or Download request was made + price.make_request_and_handle_response() + sent_request = True if download.can_make_request(): try: download.make_request_and_handle_response() sent_request = True except InsufficientFundsError as err: return defer.fail(err) - if sent_request and price.can_make_request(): - # TODO: document why a PriceRequest is only made if an - # Availability or Download request was made - price.make_request_and_handle_response() + return defer.succeed(sent_request) def _get_hash_for_peer_search(self): @@ -118,7 +121,9 @@ class BlobRequester(object): def choose_best_peers(peers): bad_peers = self._get_bad_peers() - return [p for p in peers if not p in bad_peers] + without_bad_peers = [p for p in peers if not p in bad_peers] + without_maxed_out_peers = [p for p in without_bad_peers if p not in self._maxed_out_peers] + return without_maxed_out_peers d.addCallback(choose_best_peers) @@ -182,6 +187,10 @@ class RequestHelper(object): def protocol_prices(self): return self.requestor._protocol_prices + @property + def protocol_offers(self): + return self.requestor._protocol_offers + @property def available_blobs(self): return self.requestor._available_blobs[self.peer] @@ -190,6 +199,10 @@ class RequestHelper(object): def unavailable_blobs(self): return self.requestor._unavailable_blobs[self.peer] + @property + def maxed_out_peers(self): + return self.requestor._maxed_out_peers + def update_local_score(self, score): self.requestor._update_local_score(self.peer, score) @@ -210,10 +223,18 @@ class RequestHelper(object): return reason def get_and_save_rate(self): + if self.payment_rate_manager.price_limit_reached(self.peer): + if self.peer not in self.maxed_out_peers: + self.maxed_out_peers.append(self.peer) + return None rate = self.protocol_prices.get(self.protocol) if rate is None: + if self.peer in self.payment_rate_manager.strategy.pending_sent_offers: + pending = self.payment_rate_manager.strategy.pending_sent_offers[self.peer] + if not pending.is_too_low and not pending.is_accepted: + return pending.rate rate = self.payment_rate_manager.get_rate_blob_data(self.peer, self.available_blobs) - self.protocol_prices[self.protocol] = rate + self.protocol_offers[self.protocol] = rate return rate @@ -337,7 +358,9 @@ class AvailabilityRequest(RequestHelper): class PriceRequest(RequestHelper): """Ask a peer if a certain price is acceptable""" def can_make_request(self): - return self.get_and_save_rate() is not None + if len(self.available_blobs) and not self.protocol in self.protocol_prices: + return self.get_and_save_rate() is not None + return False def make_request_and_handle_response(self): request = self._get_price_request() @@ -362,22 +385,19 @@ class PriceRequest(RequestHelper): assert request.response_identifier == 'blob_data_payment_rate' if 'blob_data_payment_rate' not in response_dict: return InvalidResponseError("response identifier not in response") - assert self.protocol in self.protocol_prices - rate = self.protocol_prices[self.protocol] - offer = Offer(rate) + assert self.protocol in self.protocol_offers + offer = Offer(self.protocol_offers[self.protocol]) offer.handle(response_dict['blob_data_payment_rate']) - self.payment_rate_manager.record_offer_reply(self.peer.host, offer) - + self.payment_rate_manager.record_offer_reply(self.peer, offer) if offer.is_accepted: - log.debug("Offered rate %f/mb accepted by %s", rate, str(self.peer.host)) + log.info("Offered rate %f/mb accepted by %s", offer.rate, self.peer.host) + self.protocol_prices[self.protocol] = offer.rate return True elif offer.is_too_low: - log.debug("Offered rate %f/mb rejected by %s", rate, str(self.peer.host)) - del self.protocol_prices[self.protocol] - return True + log.debug("Offered rate %f/mb rejected by %s", offer.rate, self.peer.host) + return not self.payment_rate_manager.price_limit_reached(self.peer) else: log.warning("Price disagreement") - del self.protocol_prices[self.protocol] self.requestor._price_disagreements.append(self.peer) return False @@ -389,7 +409,9 @@ class DownloadRequest(RequestHelper): self.wallet = wallet def can_make_request(self): - return self.get_blob_details() + if self.protocol in self.protocol_prices: + return self.get_blob_details() + return False def make_request_and_handle_response(self): request = self._get_request() diff --git a/lbrynet/core/server/BlobRequestHandler.py b/lbrynet/core/server/BlobRequestHandler.py index 94e329da4..15874c215 100644 --- a/lbrynet/core/server/BlobRequestHandler.py +++ b/lbrynet/core/server/BlobRequestHandler.py @@ -69,6 +69,8 @@ class BlobRequestHandler(object): if self.PAYMENT_RATE_QUERY in queries: offered_rate = queries[self.PAYMENT_RATE_QUERY] offer = Offer(offered_rate) + if offer.rate is None: + log.warning("Empty rate offer") response.addCallback(lambda r: self._handle_payment_rate_query(offer, r)) if self.BLOB_QUERY in queries: incoming = queries[self.BLOB_QUERY] diff --git a/lbrynet/lbrynet_daemon/Daemon.py b/lbrynet/lbrynet_daemon/Daemon.py index 4b26f113c..44fa1902b 100644 --- a/lbrynet/lbrynet_daemon/Daemon.py +++ b/lbrynet/lbrynet_daemon/Daemon.py @@ -48,7 +48,7 @@ from lbrynet.conf import MIN_BLOB_DATA_PAYMENT_RATE, DEFAULT_MAX_SEARCH_RESULTS, DEFAULT_SEARCH_TIMEOUT, DEFAULT_CACHE_TIME, DEFAULT_UI_BRANCH, \ LOG_POST_URL, LOG_FILE_NAME, REFLECTOR_SERVERS, SEARCH_SERVERS from lbrynet.conf import DEFAULT_SD_DOWNLOAD_TIMEOUT -from lbrynet.conf import DEFAULT_TIMEOUT +from lbrynet.conf import DEFAULT_TIMEOUT, is_generous_host from lbrynet import conf from lbrynet.core.StreamDescriptor import StreamDescriptorIdentifier, download_sd_blob, BlobStreamDescriptorReader from lbrynet.core.Session import Session @@ -1063,7 +1063,8 @@ class Daemon(jsonrpc.JSONRPC): self.session = Session(results['default_data_payment_rate'], db_dir=self.db_dir, lbryid=self.lbryid, blob_dir=self.blobfile_dir, dht_node_port=self.dht_node_port, known_dht_nodes=self.known_dht_nodes, peer_port=self.peer_port, - use_upnp=self.use_upnp, wallet=results['wallet']) + use_upnp=self.use_upnp, wallet=results['wallet'], + is_generous=is_generous_host) self.startup_status = STARTUP_STAGES[2] dl = defer.DeferredList([d1, d2], fireOnOneErrback=True) diff --git a/tests/functional/test_misc.py b/tests/functional/test_misc.py index 9bfe49ffd..8b8ec0de8 100644 --- a/tests/functional/test_misc.py +++ b/tests/functional/test_misc.py @@ -11,7 +11,6 @@ from Crypto.PublicKey import RSA from Crypto import Random from Crypto.Hash import MD5 from lbrynet.conf import MIN_BLOB_DATA_PAYMENT_RATE -from lbrynet.conf import MIN_BLOB_INFO_PAYMENT_RATE from lbrynet.lbrylive.LiveStreamCreator import FileLiveStreamCreator from lbrynet.lbrylive.LiveStreamMetadataManager import DBLiveStreamMetadataManager from lbrynet.lbrylive.LiveStreamMetadataManager import TempLiveStreamMetadataManager @@ -208,7 +207,7 @@ test_create_stream_sd_file = { 'stream_hash': '6d27fbe10c86d81aacfb897c7a426d0a2214f5a299455a6d315c0f998c4b3545c2dc60906122d94653c23b1898229e3f'} -def start_lbry_uploader(sd_hash_queue, kill_event, dead_event, file_size, ul_rate_limit=None): +def start_lbry_uploader(sd_hash_queue, kill_event, dead_event, file_size, ul_rate_limit=None, is_generous=False): if sys.platform.startswith("linux"): sys.modules = sys.modules.copy() del sys.modules['twisted.internet.reactor'] @@ -237,9 +236,9 @@ def start_lbry_uploader(sd_hash_queue, kill_event, dead_event, file_size, ul_rat os.mkdir(db_dir) session = Session(MIN_BLOB_DATA_PAYMENT_RATE, db_dir=db_dir, lbryid="abcd", - peer_finder=peer_finder, hash_announcer=hash_announcer, peer_port=5553, - use_upnp=False, rate_limiter=rate_limiter, wallet=wallet, blob_tracker_class=DummyBlobAvailabilityTracker, - dht_node_class=Node) + peer_finder=peer_finder, hash_announcer=hash_announcer, peer_port=5553, + use_upnp=False, rate_limiter=rate_limiter, wallet=wallet, blob_tracker_class=DummyBlobAvailabilityTracker, + dht_node_class=Node, is_generous=is_generous) stream_info_manager = TempEncryptedFileMetadataManager() @@ -320,7 +319,7 @@ def start_lbry_uploader(sd_hash_queue, kill_event, dead_event, file_size, ul_rat reactor.run() -def start_lbry_reuploader(sd_hash, kill_event, dead_event, ready_event, n, ul_rate_limit=None): +def start_lbry_reuploader(sd_hash, kill_event, dead_event, ready_event, n, ul_rate_limit=None, is_generous=False): if sys.platform.startswith("linux"): sys.modules = sys.modules.copy() @@ -354,7 +353,8 @@ def start_lbry_reuploader(sd_hash, kill_event, dead_event, ready_event, n, ul_ra session = Session(MIN_BLOB_DATA_PAYMENT_RATE, db_dir=db_dir, lbryid="abcd" + str(n), peer_finder=peer_finder, hash_announcer=hash_announcer, blob_dir=None, peer_port=peer_port, - use_upnp=False, rate_limiter=rate_limiter, wallet=wallet, blob_tracker_class=DummyBlobAvailabilityTracker) + use_upnp=False, rate_limiter=rate_limiter, wallet=wallet, + blob_tracker_class=DummyBlobAvailabilityTracker, is_generous=is_generous) stream_info_manager = TempEncryptedFileMetadataManager() @@ -565,7 +565,7 @@ def start_live_server(sd_hash_queue, kill_event, dead_event): reactor.run() -def start_blob_uploader(blob_hash_queue, kill_event, dead_event, slow): +def start_blob_uploader(blob_hash_queue, kill_event, dead_event, slow, is_generous=False): if sys.platform.startswith("linux"): sys.modules = sys.modules.copy() @@ -599,7 +599,8 @@ def start_blob_uploader(blob_hash_queue, kill_event, dead_event, slow): session = Session(MIN_BLOB_DATA_PAYMENT_RATE, db_dir=db_dir, lbryid="efgh", peer_finder=peer_finder, hash_announcer=hash_announcer, blob_dir=blob_dir, peer_port=peer_port, - use_upnp=False, rate_limiter=rate_limiter, wallet=wallet, blob_tracker_class=DummyBlobAvailabilityTracker) + use_upnp=False, rate_limiter=rate_limiter, wallet=wallet, + blob_tracker_class=DummyBlobAvailabilityTracker, is_generous=is_generous) if slow is True: session.rate_limiter.set_ul_limit(2**11) @@ -672,6 +673,7 @@ class TestTransfer(TestCase): self.session = None self.stream_info_manager = None self.lbry_file_manager = None + self.is_generous = True self.addCleanup(self.take_down_env) def take_down_env(self): @@ -772,7 +774,7 @@ class TestTransfer(TestCase): peer_finder=peer_finder, hash_announcer=hash_announcer, blob_dir=blob_dir, peer_port=5553, use_upnp=False, rate_limiter=rate_limiter, wallet=wallet, blob_tracker_class=DummyBlobAvailabilityTracker, - dht_node_class=Node) + dht_node_class=Node, is_generous=self.is_generous) self.stream_info_manager = TempEncryptedFileMetadataManager() @@ -1235,6 +1237,7 @@ class TestStreamify(TestCase): self.stream_info_manager = None self.lbry_file_manager = None self.addCleanup(self.take_down_env) + self.is_generous = True def take_down_env(self): @@ -1255,7 +1258,6 @@ class TestStreamify(TestCase): return d def test_create_stream(self): - wallet = FakeWallet() peer_manager = PeerManager() peer_finder = FakePeerFinder(5553, peer_manager, 2) @@ -1272,7 +1274,8 @@ class TestStreamify(TestCase): self.session = Session(MIN_BLOB_DATA_PAYMENT_RATE, db_dir=db_dir, lbryid="abcd", peer_finder=peer_finder, hash_announcer=hash_announcer, blob_dir=blob_dir, peer_port=5553, - use_upnp=False, rate_limiter=rate_limiter, wallet=wallet, blob_tracker_class=DummyBlobAvailabilityTracker) + use_upnp=False, rate_limiter=rate_limiter, wallet=wallet, + blob_tracker_class=DummyBlobAvailabilityTracker, is_generous=self.is_generous) self.stream_info_manager = TempEncryptedFileMetadataManager() @@ -1316,7 +1319,6 @@ class TestStreamify(TestCase): rate_limiter = DummyRateLimiter() sd_identifier = StreamDescriptorIdentifier() - db_dir = "client" blob_dir = os.path.join(db_dir, "blobfiles") os.mkdir(db_dir) diff --git a/tests/mocks.py b/tests/mocks.py index 40e37dcca..1dbb3fdf3 100644 --- a/tests/mocks.py +++ b/tests/mocks.py @@ -157,7 +157,6 @@ class DummyBlobAvailabilityTracker(BlobAvailabilityTracker): 'f99d24cd50d4bfd77c2598bfbeeb8415bf0feef21200bdf0b8fbbde7751a77b7a2c68e09c25465a2f40fba8eecb0b4e0': ['1.2.3.4', '1.2.3.4', '1.2.3.4', '1.2.3.4', '1.2.3.4', '1.2.3.4', '1.2.3.4', '1.2.3.4', '1.2.3.4'], 'c84aa1fd8f5009f7c4e71e444e40d95610abc1480834f835eefb267287aeb10025880a3ce22580db8c6d92efb5bc0c9c': ['1.2.3.4', '1.2.3.4', '1.2.3.4', '1.2.3.4', '1.2.3.4', '1.2.3.4', '1.2.3.4', '1.2.3.4', '1.2.3.4', '1.2.3.4'], } - self.last_mean_availability = Decimal(0.0) self._blob_manager = None self._peer_finder = PeerFinder(11223, 11224, 2) self._dht_node = None