BlobRequestHandler and Strategy tests

This commit is contained in:
Jack 2016-10-03 02:44:58 -04:00
parent 5e4e619708
commit 967700dc28
6 changed files with 112 additions and 53 deletions

View file

@ -19,6 +19,8 @@ class Offer(object):
self.accept() self.accept()
elif offer == Offer.RATE_TOO_LOW: elif offer == Offer.RATE_TOO_LOW:
self.reject() self.reject()
else:
self.unset()
@property @property
def accepted(self): def accepted(self):
@ -28,19 +30,27 @@ class Offer(object):
def too_low(self): def too_low(self):
return self._state is Offer.RATE_TOO_LOW return self._state is Offer.RATE_TOO_LOW
@property
def is_unset(self):
return self._state is Offer.RATE_UNSET
@property @property
def message(self): def message(self):
if self.accepted: if self.accepted:
return Offer.RATE_ACCEPTED return Offer.RATE_ACCEPTED
elif self.too_low: elif self.too_low:
return Offer.RATE_TOO_LOW return Offer.RATE_TOO_LOW
elif self.rate is None: elif self.is_unset:
return Offer.RATE_UNSET return Offer.RATE_UNSET
return None
def accept(self): def accept(self):
if self._state is None: if self._state is None or self.is_unset:
self._state = Offer.RATE_ACCEPTED self._state = Offer.RATE_ACCEPTED
def reject(self): def reject(self):
if self._state is None: if self._state is None or self.is_unset:
self._state = Offer.RATE_TOO_LOW self._state = Offer.RATE_TOO_LOW
def unset(self):
self._state = Offer.RATE_UNSET

View file

@ -1,6 +1,10 @@
from lbrynet.conf import MIN_BLOB_DATA_PAYMENT_RATE from lbrynet.conf import MIN_BLOB_DATA_PAYMENT_RATE
def get_default_price_model(blob_tracker, **kwargs):
return MeanAvailabilityWeightedPrice(blob_tracker, **kwargs)
class MeanAvailabilityWeightedPrice(object): class MeanAvailabilityWeightedPrice(object):
""" """
Calculate mean-blob-availability and stream-position weighted price for a blob Calculate mean-blob-availability and stream-position weighted price for a blob
@ -13,18 +17,17 @@ class MeanAvailabilityWeightedPrice(object):
blob_tracker (BlobAvailabilityTracker): blob availability tracker blob_tracker (BlobAvailabilityTracker): blob availability tracker
""" """
def __init__(self, tracker, min_price=MIN_BLOB_DATA_PAYMENT_RATE, base_price=None, alpha=1.0): def __init__(self, tracker, base_price=MIN_BLOB_DATA_PAYMENT_RATE, alpha=1.0):
self.blob_tracker = tracker self.blob_tracker = tracker
self.min_price = min_price self.base_price = base_price
self.base_price = base_price if base_price is not None else min_price * 10
self.alpha = alpha self.alpha = alpha
def calculate_price(self, blob): def calculate_price(self, blob):
mean_availability = self.blob_tracker.last_mean_availability mean_availability = self.blob_tracker.last_mean_availability
availability = self.blob_tracker.availability.get(blob, []) availability = self.blob_tracker.availability.get(blob, [])
index = 0 # blob.index index = 0 # blob.index
price = self.base_price * (mean_availability / max(1, len(availability))) / self._frontload(index) price = self.base_price * (mean_availability / max(1, len(availability))) / self._frontload(index)
return round(max(self.min_price, price), 5) return round(price, 5)
def _frontload(self, index): def _frontload(self, index):
""" """
@ -34,4 +37,4 @@ class MeanAvailabilityWeightedPrice(object):
@return: frontload multipler @return: frontload multipler
""" """
return 2.0 - (self.alpha ** index) return 2.0 - (self.alpha ** index)

View file

@ -1,7 +1,7 @@
import logging import logging
from lbrynet.core.Offer import Offer from lbrynet.core.Offer import Offer
from lbrynet.core.PriceModel import MeanAvailabilityWeightedPrice from lbrynet.core.PriceModel import get_default_price_model
log = logging.getLogger(__name__) log = logging.getLogger(__name__)
@ -18,7 +18,7 @@ class BasicAvailabilityWeightedStrategy(object):
until the rate is accepted or a threshold is reached until the rate is accepted or a threshold is reached
""" """
def __init__(self, blob_tracker, acceleration=1.25, deceleration=0.9, max_rate=0.005, min_rate=0.0): def __init__(self, blob_tracker, acceleration=1.25, deceleration=0.9, max_rate=0.005, min_rate=0.0, **kwargs):
self._acceleration = acceleration # rate of how quickly to ramp offer self._acceleration = acceleration # rate of how quickly to ramp offer
self._deceleration = deceleration self._deceleration = deceleration
self._min_rate = min_rate self._min_rate = min_rate
@ -27,7 +27,7 @@ class BasicAvailabilityWeightedStrategy(object):
self._count_down = {} self._count_down = {}
self._requested = {} self._requested = {}
self._offers_to_peers = {} self._offers_to_peers = {}
self.model = MeanAvailabilityWeightedPrice(blob_tracker) self.model = get_default_price_model(blob_tracker, **kwargs)
def respond_to_offer(self, offer, peer, blobs): def respond_to_offer(self, offer, peer, blobs):
request_count = self._count_up.get(peer, 0) request_count = self._count_up.get(peer, 0)
@ -53,18 +53,24 @@ class BasicAvailabilityWeightedStrategy(object):
def make_offer(self, peer, blobs): def make_offer(self, peer, blobs):
# use mean turn-discounted price for all the blobs requested # use mean turn-discounted price for all the blobs requested
# if there was a previous offer replied to, use the same rate if it was accepted
last_offer = self._offers_to_peers.get(peer, False)
if last_offer:
if last_offer.rate is not None and last_offer.accepted:
return last_offer
request_count = self._count_down.get(peer, 0) request_count = self._count_down.get(peer, 0)
self._inc_down_count(peer) self._inc_down_count(peer)
if request_count == 0: if request_count == 0:
# Try asking for it for free # Try asking for it for free
offer = Offer(0.0) self._offers_to_peers.update({peer: Offer(0.0)})
else: else:
rates = [self._calculate_price(blob) for blob in blobs] rates = [self._calculate_price(blob) for blob in blobs]
mean_rate = sum(rates) / max(len(blobs), 1) mean_rate = sum(rates) / max(len(blobs), 1)
with_premium = self._premium(mean_rate, request_count) with_premium = self._premium(mean_rate, request_count)
price = self._bounded_price(with_premium) price = self._bounded_price(with_premium)
offer = Offer(price) self._offers_to_peers.update({peer: Offer(price)})
return offer return self._offers_to_peers[peer]
def _bounded_price(self, price): def _bounded_price(self, price):
price_for_return = min(self._max_rate, max(price, self._min_rate)) price_for_return = min(self._max_rate, max(price, self._min_rate))

View file

@ -71,8 +71,11 @@ class BlobRequestHandler(object):
response.addCallback(lambda r: self._handle_payment_rate_query(offer, r)) response.addCallback(lambda r: self._handle_payment_rate_query(offer, r))
if self.BLOB_QUERY in queries: if self.BLOB_QUERY in queries:
incoming = queries[self.BLOB_QUERY] if self.PAYMENT_RATE_QUERY in queries:
response.addCallback(lambda r: self._reply_to_send_request(r, incoming)) incoming = queries[self.BLOB_QUERY]
response.addCallback(lambda r: self._reply_to_send_request(r, incoming))
else:
response.addCallback(lambda _: {'incoming_blob': {'error': 'RATE_UNSET'}})
return response return response
@ -80,11 +83,14 @@ class BlobRequestHandler(object):
blobs = request.get("available_blobs", []) blobs = request.get("available_blobs", [])
log.info("Offered rate %f/mb for %i blobs", offer.rate, len(blobs)) log.info("Offered rate %f/mb for %i blobs", offer.rate, len(blobs))
accepted = self.payment_rate_manager.accept_rate_blob_data(self.peer, blobs, offer) accepted = self.payment_rate_manager.accept_rate_blob_data(self.peer, blobs, offer)
if accepted: if offer.accepted:
self.blob_data_payment_rate = offer.rate self.blob_data_payment_rate = offer.rate
request[self.PAYMENT_RATE_QUERY] = "RATE_ACCEPTED" request[self.PAYMENT_RATE_QUERY] = "RATE_ACCEPTED"
else: elif offer.too_low:
request[self.PAYMENT_RATE_QUERY] = "RATE_TOO_LOW" request[self.PAYMENT_RATE_QUERY] = "RATE_TOO_LOW"
offer.unset()
elif offer.is_unset:
request['incoming_blob'] = {'error': 'RATE_UNSET'}
return request return request
def _handle_blob_query(self, response, query): def _handle_blob_query(self, response, query):
@ -92,8 +98,8 @@ class BlobRequestHandler(object):
response['incoming_blob'] = {} response['incoming_blob'] = {}
if self.blob_data_payment_rate is None: if self.blob_data_payment_rate is None:
response['incoming_blob']['error'] = "RATE_UNSET" response['incoming_blob'] = {'error': "RATE_UNSET"}
return defer.succeed(response) return response
else: else:
return self._send_blob(response, query) return self._send_blob(response, query)
@ -105,7 +111,7 @@ class BlobRequestHandler(object):
def open_blob_for_reading(self, blob, response): def open_blob_for_reading(self, blob, response):
def failure(msg): def failure(msg):
log.warning("We can not send %s: %s", blob, msg) log.warning("We can not send %s: %s", blob, msg)
response['incoming_blob']['error'] = "BLOB_UNAVAILABLE" response['incoming_blob'] = {'error': 'BLOB_UNAVAILABLE'}
return response return response
if not blob.is_validated(): if not blob.is_validated():
return failure("blob can't be validated") return failure("blob can't be validated")
@ -163,7 +169,7 @@ class BlobRequestHandler(object):
d.addCallback(lambda _: response) d.addCallback(lambda _: response)
return d return d
log.warning("We can not send %s", str(blob)) log.warning("We can not send %s", str(blob))
response['error'] = "BLOB_UNAVAILABLE" response['incoming_blob'] = {'error': 'BLOB_UNAVAILABLE'}
d.addCallback(lambda _: response) d.addCallback(lambda _: response)
return d return d
@ -178,7 +184,7 @@ class BlobRequestHandler(object):
if self.blob_data_payment_rate is None: if self.blob_data_payment_rate is None:
log.warning("Rate not set yet") log.warning("Rate not set yet")
response['error'] = "RATE_UNSET" response['incoming_blob'] = {'error': 'RATE_UNSET'}
return defer.succeed(response) return defer.succeed(response)
else: else:
log.debug("Requested blob: %s", str(incoming)) log.debug("Requested blob: %s", str(incoming))

View file

@ -7,18 +7,18 @@ from twisted.trial import unittest
from lbrynet.core import Peer from lbrynet.core import Peer
from lbrynet.core.server import BlobRequestHandler from lbrynet.core.server import BlobRequestHandler
from lbrynet.core.PaymentRateManager import NegotiatedPaymentRateManager, BasePaymentRateManager
from lbrynet.core.BlobAvailability import DummyBlobAvailabilityTracker
class TestBlobRequestHandlerQueries(unittest.TestCase): class TestBlobRequestHandlerQueries(unittest.TestCase):
def setUp(self): def setUp(self):
self.blob_manager = mock.Mock() self.blob_manager = mock.Mock()
self.payment_rate_manager = mock.Mock() self.payment_rate_manager = NegotiatedPaymentRateManager(BasePaymentRateManager(0.001), DummyBlobAvailabilityTracker())
self.handler = BlobRequestHandler.BlobRequestHandler( self.handler = BlobRequestHandler.BlobRequestHandler(self.blob_manager, None, self.payment_rate_manager)
self.blob_manager, None, self.payment_rate_manager)
def test_empty_response_when_empty_query(self): def test_empty_response_when_empty_query(self):
self.assertEqual( self.assertEqual({}, self.successResultOf(self.handler.handle_queries({})))
{}, self.successResultOf(self.handler.handle_queries({})))
def test_error_set_when_rate_is_missing(self): def test_error_set_when_rate_is_missing(self):
query = {'requested_blob': 'blob'} query = {'requested_blob': 'blob'}
@ -27,9 +27,8 @@ class TestBlobRequestHandlerQueries(unittest.TestCase):
self.assertEqual(response, self.successResultOf(deferred)) self.assertEqual(response, self.successResultOf(deferred))
def test_error_set_when_rate_too_low(self): def test_error_set_when_rate_too_low(self):
self.payment_rate_manager.accept_rate_blob_data.return_value = False
query = { query = {
'blob_data_payment_rate': 'way_too_low', 'blob_data_payment_rate': '-1.0',
'requested_blob': 'blob' 'requested_blob': 'blob'
} }
deferred = self.handler.handle_queries(query) deferred = self.handler.handle_queries(query)
@ -40,9 +39,8 @@ class TestBlobRequestHandlerQueries(unittest.TestCase):
self.assertEqual(response, self.successResultOf(deferred)) self.assertEqual(response, self.successResultOf(deferred))
def test_response_when_rate_too_low(self): def test_response_when_rate_too_low(self):
self.payment_rate_manager.accept_rate_blob_data.return_value = False
query = { query = {
'blob_data_payment_rate': 'way_too_low', 'blob_data_payment_rate': '-1.0',
} }
deferred = self.handler.handle_queries(query) deferred = self.handler.handle_queries(query)
response = { response = {
@ -51,12 +49,11 @@ class TestBlobRequestHandlerQueries(unittest.TestCase):
self.assertEqual(response, self.successResultOf(deferred)) self.assertEqual(response, self.successResultOf(deferred))
def test_blob_unavailable_when_blob_not_validated(self): def test_blob_unavailable_when_blob_not_validated(self):
self.payment_rate_manager.accept_rate_blob_data.return_value = True
blob = mock.Mock() blob = mock.Mock()
blob.is_validated.return_value = False blob.is_validated.return_value = False
self.blob_manager.get_blob.return_value = defer.succeed(blob) self.blob_manager.get_blob.return_value = defer.succeed(blob)
query = { query = {
'blob_data_payment_rate': 'rate', 'blob_data_payment_rate': 1.0,
'requested_blob': 'blob' 'requested_blob': 'blob'
} }
deferred = self.handler.handle_queries(query) deferred = self.handler.handle_queries(query)
@ -67,13 +64,12 @@ class TestBlobRequestHandlerQueries(unittest.TestCase):
self.assertEqual(response, self.successResultOf(deferred)) self.assertEqual(response, self.successResultOf(deferred))
def test_blob_unavailable_when_blob_cannot_be_opened(self): def test_blob_unavailable_when_blob_cannot_be_opened(self):
self.payment_rate_manager.accept_rate_blob_data.return_value = True
blob = mock.Mock() blob = mock.Mock()
blob.is_validated.return_value = True blob.is_validated.return_value = True
blob.open_for_reading.return_value = None blob.open_for_reading.return_value = None
self.blob_manager.get_blob.return_value = defer.succeed(blob) self.blob_manager.get_blob.return_value = defer.succeed(blob)
query = { query = {
'blob_data_payment_rate': 'rate', 'blob_data_payment_rate': 0.0,
'requested_blob': 'blob' 'requested_blob': 'blob'
} }
deferred = self.handler.handle_queries(query) deferred = self.handler.handle_queries(query)
@ -84,15 +80,17 @@ class TestBlobRequestHandlerQueries(unittest.TestCase):
self.assertEqual(response, self.successResultOf(deferred)) self.assertEqual(response, self.successResultOf(deferred))
def test_blob_details_are_set_when_all_conditions_are_met(self): def test_blob_details_are_set_when_all_conditions_are_met(self):
self.payment_rate_manager.accept_rate_blob_data.return_value = True
blob = mock.Mock() blob = mock.Mock()
blob.is_validated.return_value = True blob.is_validated.return_value = True
blob.open_for_reading.return_value = True blob.open_for_reading.return_value = True
blob.blob_hash = 'DEADBEEF' blob.blob_hash = 'DEADBEEF'
blob.length = 42 blob.length = 42
peer = mock.Mock()
peer.host = "1.2.3.4"
self.handler.peer = peer
self.blob_manager.get_blob.return_value = defer.succeed(blob) self.blob_manager.get_blob.return_value = defer.succeed(blob)
query = { query = {
'blob_data_payment_rate': 'rate', 'blob_data_payment_rate': 1.0,
'requested_blob': 'blob' 'requested_blob': 'blob'
} }
deferred = self.handler.handle_queries(query) deferred = self.handler.handle_queries(query)
@ -103,7 +101,8 @@ class TestBlobRequestHandlerQueries(unittest.TestCase):
'length': 42 'length': 42
} }
} }
self.assertEqual(response, self.successResultOf(deferred)) result = self.successResultOf(deferred)
self.assertEqual(response, result)
class TestBlobRequestHandlerSender(unittest.TestCase): class TestBlobRequestHandlerSender(unittest.TestCase):

View file

@ -1,9 +1,16 @@
from twisted.trial import unittest from twisted.trial import unittest
import random
from lbrynet.core.Strategy import BasicAvailabilityWeightedStrategy from lbrynet.core.Strategy import BasicAvailabilityWeightedStrategy
from lbrynet.core.BlobAvailability import DummyBlobAvailabilityTracker from lbrynet.core.BlobAvailability import DummyBlobAvailabilityTracker
def get_random_sample(list_to_sample):
result = list_to_sample[random.randint(1, len(list_to_sample)):random.randint(1, len(list_to_sample))]
if not result:
return get_random_sample(list_to_sample)
return result
class StrategyTests(unittest.TestCase):
class AvailabilityWeightedStrategyTests(unittest.TestCase):
def test_first_offer_is_zero_and_second_isnt(self): def test_first_offer_is_zero_and_second_isnt(self):
strategy = BasicAvailabilityWeightedStrategy(DummyBlobAvailabilityTracker()) strategy = BasicAvailabilityWeightedStrategy(DummyBlobAvailabilityTracker())
peer = "1.1.1.1" peer = "1.1.1.1"
@ -13,23 +20,51 @@ class StrategyTests(unittest.TestCase):
self.assertEquals(offer1.rate, 0.0) self.assertEquals(offer1.rate, 0.0)
self.assertNotEqual(offer2.rate, 0.0) self.assertNotEqual(offer2.rate, 0.0)
def test_accept_zero_for_first_offer_and_reject_after(self): def test_accept_zero_and_persist(self):
host_strategy = BasicAvailabilityWeightedStrategy(DummyBlobAvailabilityTracker()) host_strategy = BasicAvailabilityWeightedStrategy(DummyBlobAvailabilityTracker())
client = "1.1.1.1" client = "1.1.1.1"
host = "1.1.1.2" host = "1.1.1.2"
blobs = host_strategy.model.blob_tracker.availability.keys() blobs = host_strategy.model.blob_tracker.availability.keys()
client_strategy = BasicAvailabilityWeightedStrategy(DummyBlobAvailabilityTracker()) client_strategy = BasicAvailabilityWeightedStrategy(DummyBlobAvailabilityTracker())
client_offer1 = client_strategy.make_offer(host, blobs) offer = client_strategy.make_offer(host, blobs)
client_strategy = BasicAvailabilityWeightedStrategy(DummyBlobAvailabilityTracker()) response1 = host_strategy.respond_to_offer(offer, client, blobs)
client_offer2 = client_strategy.make_offer(host, blobs) offer = client_strategy.make_offer(host, blobs)
response2 = host_strategy.respond_to_offer(offer, client, blobs)
host_response1 = host_strategy.respond_to_offer(client_offer1, client, blobs) self.assertEquals(response1.too_low, False)
host_response2 = host_strategy.respond_to_offer(client_offer2, client, blobs) self.assertEquals(response1.accepted, True)
self.assertEquals(response1.rate, 0.0)
self.assertEquals(host_response2.too_low, False) self.assertEquals(response2.too_low, False)
self.assertEquals(host_response1.accepted, True) self.assertEquals(response2.accepted, True)
self.assertEquals(host_response1.rate, 0.0) self.assertEquals(response2.rate, 0.0)
self.assertEquals(host_response2.too_low, True) def test_turns_before_accept_with_similar_rate_settings(self):
self.assertEquals(host_response2.accepted, False) blobs = [
self.assertEquals(host_response2.rate, 0.0) 'b2e48bb4c88cf46b76adf0d47a72389fae0cd1f19ed27dc509138c99509a25423a4cef788d571dca7988e1dca69e6fa0',
'd7c82e6cac093b3f16107d2ae2b2c75424f1fcad2c7fbdbe66e4a13c0b6bd27b67b3a29c403b82279ab0f7c1c48d6787',
'5a450b416275da4bdff604ee7b58eaedc7913c5005b7184fc3bc5ef0b1add00613587f54217c91097fc039ed9eace9dd',
'f99d24cd50d4bfd77c2598bfbeeb8415bf0feef21200bdf0b8fbbde7751a77b7a2c68e09c25465a2f40fba8eecb0b4e0',
'9dbda74a472a2e5861a5d18197aeba0f5de67c67e401124c243d2f0f41edf01d7a26aeb0b5fc9bf47f6361e0f0968e2c',
'91dc64cf1ff42e20d627b033ad5e4c3a4a96856ed8a6e3fb4cd5fa1cfba4bf72eefd325f579db92f45f4355550ace8e7',
'6d8017aba362e5c5d0046625a039513419810a0397d728318c328a5cc5d96efb589fbca0728e54fe5adbf87e9545ee07',
'6af95cd062b4a179576997ef1054c9d2120f8592eea045e9667bea411d520262cd5a47b137eabb7a7871f5f8a79c92dd',
'8c70d5e2f5c3a6085006198e5192d157a125d92e7378794472007a61947992768926513fc10924785bdb1761df3c37e6',
'c84aa1fd8f5009f7c4e71e444e40d95610abc1480834f835eefb267287aeb10025880a3ce22580db8c6d92efb5bc0c9c'
]
for x in range(10):
client_base = 0.001 * x
for y in range(10):
host_base = 0.001 * y
client_strat = BasicAvailabilityWeightedStrategy(DummyBlobAvailabilityTracker(), base_price=client_base)
host_strat = BasicAvailabilityWeightedStrategy(DummyBlobAvailabilityTracker(), base_price=host_base)
for z in range(100):
blobs_to_query = get_random_sample(blobs)
accepted = False
turns = 0
while not accepted:
offer = client_strat.make_offer("2.3.4.5", blobs_to_query)
response = host_strat.respond_to_offer(offer, "3.4.5.6", blobs_to_query)
accepted = response.accepted
turns += 1
self.assertGreater(5, turns)