From 967700dc2863dfe09324d548d99cde319c53bcbd Mon Sep 17 00:00:00 2001 From: Jack Date: Mon, 3 Oct 2016 02:44:58 -0400 Subject: [PATCH] BlobRequestHandler and Strategy tests --- lbrynet/core/Offer.py | 16 ++++- lbrynet/core/PriceModel.py | 15 +++-- lbrynet/core/Strategy.py | 18 ++++-- lbrynet/core/server/BlobRequestHandler.py | 24 +++++--- .../core/server/test_BlobRequestHandler.py | 31 +++++----- tests/unit/core/test_Strategy.py | 61 +++++++++++++++---- 6 files changed, 112 insertions(+), 53 deletions(-) diff --git a/lbrynet/core/Offer.py b/lbrynet/core/Offer.py index f19a62fc3..c236c9fa2 100644 --- a/lbrynet/core/Offer.py +++ b/lbrynet/core/Offer.py @@ -19,6 +19,8 @@ class Offer(object): self.accept() elif offer == Offer.RATE_TOO_LOW: self.reject() + else: + self.unset() @property def accepted(self): @@ -28,19 +30,27 @@ class Offer(object): def too_low(self): return self._state is Offer.RATE_TOO_LOW + @property + def is_unset(self): + return self._state is Offer.RATE_UNSET + @property def message(self): if self.accepted: return Offer.RATE_ACCEPTED elif self.too_low: return Offer.RATE_TOO_LOW - elif self.rate is None: + elif self.is_unset: return Offer.RATE_UNSET + return None def accept(self): - if self._state is None: + if self._state is None or self.is_unset: self._state = Offer.RATE_ACCEPTED def reject(self): - if self._state is None: + if self._state is None or self.is_unset: self._state = Offer.RATE_TOO_LOW + + def unset(self): + self._state = Offer.RATE_UNSET \ No newline at end of file diff --git a/lbrynet/core/PriceModel.py b/lbrynet/core/PriceModel.py index 1c1373727..f9468e85d 100644 --- a/lbrynet/core/PriceModel.py +++ b/lbrynet/core/PriceModel.py @@ -1,6 +1,10 @@ from lbrynet.conf import MIN_BLOB_DATA_PAYMENT_RATE +def get_default_price_model(blob_tracker, **kwargs): + return MeanAvailabilityWeightedPrice(blob_tracker, **kwargs) + + class MeanAvailabilityWeightedPrice(object): """ Calculate mean-blob-availability and stream-position weighted price for a blob @@ -13,18 +17,17 @@ class MeanAvailabilityWeightedPrice(object): blob_tracker (BlobAvailabilityTracker): blob availability tracker """ - def __init__(self, tracker, min_price=MIN_BLOB_DATA_PAYMENT_RATE, base_price=None, alpha=1.0): + def __init__(self, tracker, base_price=MIN_BLOB_DATA_PAYMENT_RATE, alpha=1.0): self.blob_tracker = tracker - self.min_price = min_price - self.base_price = base_price if base_price is not None else min_price * 10 + self.base_price = base_price self.alpha = alpha def calculate_price(self, blob): mean_availability = self.blob_tracker.last_mean_availability availability = self.blob_tracker.availability.get(blob, []) - index = 0 # blob.index + index = 0 # blob.index price = self.base_price * (mean_availability / max(1, len(availability))) / self._frontload(index) - return round(max(self.min_price, price), 5) + return round(price, 5) def _frontload(self, index): """ @@ -34,4 +37,4 @@ class MeanAvailabilityWeightedPrice(object): @return: frontload multipler """ - return 2.0 - (self.alpha ** index) \ No newline at end of file + return 2.0 - (self.alpha ** index) diff --git a/lbrynet/core/Strategy.py b/lbrynet/core/Strategy.py index ead935dc4..71d4ca0ea 100644 --- a/lbrynet/core/Strategy.py +++ b/lbrynet/core/Strategy.py @@ -1,7 +1,7 @@ import logging from lbrynet.core.Offer import Offer -from lbrynet.core.PriceModel import MeanAvailabilityWeightedPrice +from lbrynet.core.PriceModel import get_default_price_model log = logging.getLogger(__name__) @@ -18,7 +18,7 @@ class BasicAvailabilityWeightedStrategy(object): until the rate is accepted or a threshold is reached """ - def __init__(self, blob_tracker, acceleration=1.25, deceleration=0.9, max_rate=0.005, min_rate=0.0): + def __init__(self, blob_tracker, acceleration=1.25, deceleration=0.9, max_rate=0.005, min_rate=0.0, **kwargs): self._acceleration = acceleration # rate of how quickly to ramp offer self._deceleration = deceleration self._min_rate = min_rate @@ -27,7 +27,7 @@ class BasicAvailabilityWeightedStrategy(object): self._count_down = {} self._requested = {} self._offers_to_peers = {} - self.model = MeanAvailabilityWeightedPrice(blob_tracker) + self.model = get_default_price_model(blob_tracker, **kwargs) def respond_to_offer(self, offer, peer, blobs): request_count = self._count_up.get(peer, 0) @@ -53,18 +53,24 @@ class BasicAvailabilityWeightedStrategy(object): def make_offer(self, peer, blobs): # use mean turn-discounted price for all the blobs requested + # if there was a previous offer replied to, use the same rate if it was accepted + last_offer = self._offers_to_peers.get(peer, False) + if last_offer: + if last_offer.rate is not None and last_offer.accepted: + return last_offer + request_count = self._count_down.get(peer, 0) self._inc_down_count(peer) if request_count == 0: # Try asking for it for free - offer = Offer(0.0) + self._offers_to_peers.update({peer: Offer(0.0)}) else: rates = [self._calculate_price(blob) for blob in blobs] mean_rate = sum(rates) / max(len(blobs), 1) with_premium = self._premium(mean_rate, request_count) price = self._bounded_price(with_premium) - offer = Offer(price) - return offer + self._offers_to_peers.update({peer: Offer(price)}) + return self._offers_to_peers[peer] def _bounded_price(self, price): price_for_return = min(self._max_rate, max(price, self._min_rate)) diff --git a/lbrynet/core/server/BlobRequestHandler.py b/lbrynet/core/server/BlobRequestHandler.py index 266ff82c0..0b9b5cb6b 100644 --- a/lbrynet/core/server/BlobRequestHandler.py +++ b/lbrynet/core/server/BlobRequestHandler.py @@ -71,8 +71,11 @@ class BlobRequestHandler(object): response.addCallback(lambda r: self._handle_payment_rate_query(offer, r)) if self.BLOB_QUERY in queries: - incoming = queries[self.BLOB_QUERY] - response.addCallback(lambda r: self._reply_to_send_request(r, incoming)) + if self.PAYMENT_RATE_QUERY in queries: + incoming = queries[self.BLOB_QUERY] + response.addCallback(lambda r: self._reply_to_send_request(r, incoming)) + else: + response.addCallback(lambda _: {'incoming_blob': {'error': 'RATE_UNSET'}}) return response @@ -80,11 +83,14 @@ class BlobRequestHandler(object): blobs = request.get("available_blobs", []) log.info("Offered rate %f/mb for %i blobs", offer.rate, len(blobs)) accepted = self.payment_rate_manager.accept_rate_blob_data(self.peer, blobs, offer) - if accepted: + if offer.accepted: self.blob_data_payment_rate = offer.rate request[self.PAYMENT_RATE_QUERY] = "RATE_ACCEPTED" - else: + elif offer.too_low: request[self.PAYMENT_RATE_QUERY] = "RATE_TOO_LOW" + offer.unset() + elif offer.is_unset: + request['incoming_blob'] = {'error': 'RATE_UNSET'} return request def _handle_blob_query(self, response, query): @@ -92,8 +98,8 @@ class BlobRequestHandler(object): response['incoming_blob'] = {} if self.blob_data_payment_rate is None: - response['incoming_blob']['error'] = "RATE_UNSET" - return defer.succeed(response) + response['incoming_blob'] = {'error': "RATE_UNSET"} + return response else: return self._send_blob(response, query) @@ -105,7 +111,7 @@ class BlobRequestHandler(object): def open_blob_for_reading(self, blob, response): def failure(msg): log.warning("We can not send %s: %s", blob, msg) - response['incoming_blob']['error'] = "BLOB_UNAVAILABLE" + response['incoming_blob'] = {'error': 'BLOB_UNAVAILABLE'} return response if not blob.is_validated(): return failure("blob can't be validated") @@ -163,7 +169,7 @@ class BlobRequestHandler(object): d.addCallback(lambda _: response) return d log.warning("We can not send %s", str(blob)) - response['error'] = "BLOB_UNAVAILABLE" + response['incoming_blob'] = {'error': 'BLOB_UNAVAILABLE'} d.addCallback(lambda _: response) return d @@ -178,7 +184,7 @@ class BlobRequestHandler(object): if self.blob_data_payment_rate is None: log.warning("Rate not set yet") - response['error'] = "RATE_UNSET" + response['incoming_blob'] = {'error': 'RATE_UNSET'} return defer.succeed(response) else: log.debug("Requested blob: %s", str(incoming)) diff --git a/tests/unit/core/server/test_BlobRequestHandler.py b/tests/unit/core/server/test_BlobRequestHandler.py index 5c55af574..a54ca3303 100644 --- a/tests/unit/core/server/test_BlobRequestHandler.py +++ b/tests/unit/core/server/test_BlobRequestHandler.py @@ -7,18 +7,18 @@ from twisted.trial import unittest from lbrynet.core import Peer from lbrynet.core.server import BlobRequestHandler +from lbrynet.core.PaymentRateManager import NegotiatedPaymentRateManager, BasePaymentRateManager +from lbrynet.core.BlobAvailability import DummyBlobAvailabilityTracker class TestBlobRequestHandlerQueries(unittest.TestCase): def setUp(self): self.blob_manager = mock.Mock() - self.payment_rate_manager = mock.Mock() - self.handler = BlobRequestHandler.BlobRequestHandler( - self.blob_manager, None, self.payment_rate_manager) + self.payment_rate_manager = NegotiatedPaymentRateManager(BasePaymentRateManager(0.001), DummyBlobAvailabilityTracker()) + self.handler = BlobRequestHandler.BlobRequestHandler(self.blob_manager, None, self.payment_rate_manager) def test_empty_response_when_empty_query(self): - self.assertEqual( - {}, self.successResultOf(self.handler.handle_queries({}))) + self.assertEqual({}, self.successResultOf(self.handler.handle_queries({}))) def test_error_set_when_rate_is_missing(self): query = {'requested_blob': 'blob'} @@ -27,9 +27,8 @@ class TestBlobRequestHandlerQueries(unittest.TestCase): self.assertEqual(response, self.successResultOf(deferred)) def test_error_set_when_rate_too_low(self): - self.payment_rate_manager.accept_rate_blob_data.return_value = False query = { - 'blob_data_payment_rate': 'way_too_low', + 'blob_data_payment_rate': '-1.0', 'requested_blob': 'blob' } deferred = self.handler.handle_queries(query) @@ -40,9 +39,8 @@ class TestBlobRequestHandlerQueries(unittest.TestCase): self.assertEqual(response, self.successResultOf(deferred)) def test_response_when_rate_too_low(self): - self.payment_rate_manager.accept_rate_blob_data.return_value = False query = { - 'blob_data_payment_rate': 'way_too_low', + 'blob_data_payment_rate': '-1.0', } deferred = self.handler.handle_queries(query) response = { @@ -51,12 +49,11 @@ class TestBlobRequestHandlerQueries(unittest.TestCase): self.assertEqual(response, self.successResultOf(deferred)) def test_blob_unavailable_when_blob_not_validated(self): - self.payment_rate_manager.accept_rate_blob_data.return_value = True blob = mock.Mock() blob.is_validated.return_value = False self.blob_manager.get_blob.return_value = defer.succeed(blob) query = { - 'blob_data_payment_rate': 'rate', + 'blob_data_payment_rate': 1.0, 'requested_blob': 'blob' } deferred = self.handler.handle_queries(query) @@ -67,13 +64,12 @@ class TestBlobRequestHandlerQueries(unittest.TestCase): self.assertEqual(response, self.successResultOf(deferred)) def test_blob_unavailable_when_blob_cannot_be_opened(self): - self.payment_rate_manager.accept_rate_blob_data.return_value = True blob = mock.Mock() blob.is_validated.return_value = True blob.open_for_reading.return_value = None self.blob_manager.get_blob.return_value = defer.succeed(blob) query = { - 'blob_data_payment_rate': 'rate', + 'blob_data_payment_rate': 0.0, 'requested_blob': 'blob' } deferred = self.handler.handle_queries(query) @@ -84,15 +80,17 @@ class TestBlobRequestHandlerQueries(unittest.TestCase): self.assertEqual(response, self.successResultOf(deferred)) def test_blob_details_are_set_when_all_conditions_are_met(self): - self.payment_rate_manager.accept_rate_blob_data.return_value = True blob = mock.Mock() blob.is_validated.return_value = True blob.open_for_reading.return_value = True blob.blob_hash = 'DEADBEEF' blob.length = 42 + peer = mock.Mock() + peer.host = "1.2.3.4" + self.handler.peer = peer self.blob_manager.get_blob.return_value = defer.succeed(blob) query = { - 'blob_data_payment_rate': 'rate', + 'blob_data_payment_rate': 1.0, 'requested_blob': 'blob' } deferred = self.handler.handle_queries(query) @@ -103,7 +101,8 @@ class TestBlobRequestHandlerQueries(unittest.TestCase): 'length': 42 } } - self.assertEqual(response, self.successResultOf(deferred)) + result = self.successResultOf(deferred) + self.assertEqual(response, result) class TestBlobRequestHandlerSender(unittest.TestCase): diff --git a/tests/unit/core/test_Strategy.py b/tests/unit/core/test_Strategy.py index 4f52c1754..a443c36e9 100644 --- a/tests/unit/core/test_Strategy.py +++ b/tests/unit/core/test_Strategy.py @@ -1,9 +1,16 @@ from twisted.trial import unittest +import random from lbrynet.core.Strategy import BasicAvailabilityWeightedStrategy from lbrynet.core.BlobAvailability import DummyBlobAvailabilityTracker +def get_random_sample(list_to_sample): + result = list_to_sample[random.randint(1, len(list_to_sample)):random.randint(1, len(list_to_sample))] + if not result: + return get_random_sample(list_to_sample) + return result -class StrategyTests(unittest.TestCase): + +class AvailabilityWeightedStrategyTests(unittest.TestCase): def test_first_offer_is_zero_and_second_isnt(self): strategy = BasicAvailabilityWeightedStrategy(DummyBlobAvailabilityTracker()) peer = "1.1.1.1" @@ -13,23 +20,51 @@ class StrategyTests(unittest.TestCase): self.assertEquals(offer1.rate, 0.0) self.assertNotEqual(offer2.rate, 0.0) - def test_accept_zero_for_first_offer_and_reject_after(self): + def test_accept_zero_and_persist(self): host_strategy = BasicAvailabilityWeightedStrategy(DummyBlobAvailabilityTracker()) client = "1.1.1.1" host = "1.1.1.2" blobs = host_strategy.model.blob_tracker.availability.keys() client_strategy = BasicAvailabilityWeightedStrategy(DummyBlobAvailabilityTracker()) - client_offer1 = client_strategy.make_offer(host, blobs) - client_strategy = BasicAvailabilityWeightedStrategy(DummyBlobAvailabilityTracker()) - client_offer2 = client_strategy.make_offer(host, blobs) + offer = client_strategy.make_offer(host, blobs) + response1 = host_strategy.respond_to_offer(offer, client, blobs) + offer = client_strategy.make_offer(host, blobs) + response2 = host_strategy.respond_to_offer(offer, client, blobs) - host_response1 = host_strategy.respond_to_offer(client_offer1, client, blobs) - host_response2 = host_strategy.respond_to_offer(client_offer2, client, blobs) + self.assertEquals(response1.too_low, False) + self.assertEquals(response1.accepted, True) + self.assertEquals(response1.rate, 0.0) - self.assertEquals(host_response2.too_low, False) - self.assertEquals(host_response1.accepted, True) - self.assertEquals(host_response1.rate, 0.0) + self.assertEquals(response2.too_low, False) + self.assertEquals(response2.accepted, True) + self.assertEquals(response2.rate, 0.0) - self.assertEquals(host_response2.too_low, True) - self.assertEquals(host_response2.accepted, False) - self.assertEquals(host_response2.rate, 0.0) \ No newline at end of file + def test_turns_before_accept_with_similar_rate_settings(self): + blobs = [ + 'b2e48bb4c88cf46b76adf0d47a72389fae0cd1f19ed27dc509138c99509a25423a4cef788d571dca7988e1dca69e6fa0', + 'd7c82e6cac093b3f16107d2ae2b2c75424f1fcad2c7fbdbe66e4a13c0b6bd27b67b3a29c403b82279ab0f7c1c48d6787', + '5a450b416275da4bdff604ee7b58eaedc7913c5005b7184fc3bc5ef0b1add00613587f54217c91097fc039ed9eace9dd', + 'f99d24cd50d4bfd77c2598bfbeeb8415bf0feef21200bdf0b8fbbde7751a77b7a2c68e09c25465a2f40fba8eecb0b4e0', + '9dbda74a472a2e5861a5d18197aeba0f5de67c67e401124c243d2f0f41edf01d7a26aeb0b5fc9bf47f6361e0f0968e2c', + '91dc64cf1ff42e20d627b033ad5e4c3a4a96856ed8a6e3fb4cd5fa1cfba4bf72eefd325f579db92f45f4355550ace8e7', + '6d8017aba362e5c5d0046625a039513419810a0397d728318c328a5cc5d96efb589fbca0728e54fe5adbf87e9545ee07', + '6af95cd062b4a179576997ef1054c9d2120f8592eea045e9667bea411d520262cd5a47b137eabb7a7871f5f8a79c92dd', + '8c70d5e2f5c3a6085006198e5192d157a125d92e7378794472007a61947992768926513fc10924785bdb1761df3c37e6', + 'c84aa1fd8f5009f7c4e71e444e40d95610abc1480834f835eefb267287aeb10025880a3ce22580db8c6d92efb5bc0c9c' + ] + for x in range(10): + client_base = 0.001 * x + for y in range(10): + host_base = 0.001 * y + client_strat = BasicAvailabilityWeightedStrategy(DummyBlobAvailabilityTracker(), base_price=client_base) + host_strat = BasicAvailabilityWeightedStrategy(DummyBlobAvailabilityTracker(), base_price=host_base) + for z in range(100): + blobs_to_query = get_random_sample(blobs) + accepted = False + turns = 0 + while not accepted: + offer = client_strat.make_offer("2.3.4.5", blobs_to_query) + response = host_strat.respond_to_offer(offer, "3.4.5.6", blobs_to_query) + accepted = response.accepted + turns += 1 + self.assertGreater(5, turns)