Merge pull request #198 from lbryio/fix-blobrequester

fix problems in blobrequester
This commit is contained in:
Jack Robison 2016-10-20 12:41:26 -04:00 committed by GitHub
commit ea3225c897
12 changed files with 99 additions and 49 deletions

View file

@ -3,7 +3,7 @@ Some network wide and also application specific parameters
"""
import os
is_generous_host = True
IS_DEVELOPMENT_VERSION = False
MAX_HANDSHAKE_SIZE = 2**16

View file

@ -17,7 +17,7 @@ class BlobAvailabilityTracker(object):
def __init__(self, blob_manager, peer_finder, dht_node):
self.availability = {}
self.last_mean_availability = Decimal(0.0)
self._last_mean_availability = Decimal(0.0)
self._blob_manager = blob_manager
self._peer_finder = peer_finder
self._dht_node = dht_node
@ -50,6 +50,11 @@ class BlobAvailabilityTracker(object):
d.addCallback(lambda results: [val for success, val in results if success])
return d
@property
def last_mean_availability(self):
return max(Decimal(0.01), self._last_mean_availability)
def _update_peers_for_blob(self, blob):
def _save_peer_info(blob_hash, peers):
v = {blob_hash: peers}
@ -86,4 +91,4 @@ class BlobAvailabilityTracker(object):
def _get_mean_peers(self):
num_peers = [len(self.availability[blob]) for blob in self.availability]
mean = Decimal(sum(num_peers)) / Decimal(max(1, len(num_peers)))
self.last_mean_availability = mean
self._last_mean_availability = mean

View file

@ -1,5 +1,7 @@
from lbrynet.core.Strategy import get_default_strategy
from lbrynet.conf import MIN_BLOB_DATA_PAYMENT_RATE, MIN_BLOB_INFO_PAYMENT_RATE
from lbrynet.conf import MIN_BLOB_DATA_PAYMENT_RATE, MIN_BLOB_INFO_PAYMENT_RATE, is_generous_host
from decimal import Decimal
class BasePaymentRateManager(object):
def __init__(self, rate=MIN_BLOB_DATA_PAYMENT_RATE, info_rate=MIN_BLOB_INFO_PAYMENT_RATE):
@ -34,7 +36,7 @@ class PaymentRateManager(object):
class NegotiatedPaymentRateManager(object):
def __init__(self, base, availability_tracker, generous=True):
def __init__(self, base, availability_tracker, generous=is_generous_host):
"""
@param base: a BasePaymentRateManager
@param availability_tracker: a BlobAvailabilityTracker
@ -71,3 +73,9 @@ class NegotiatedPaymentRateManager(object):
def record_offer_reply(self, peer, offer):
self.strategy.update_accepted_offers(peer, offer)
def price_limit_reached(self, peer):
if peer in self.strategy.pending_sent_offers:
offer = self.strategy.pending_sent_offers[peer]
return offer.is_too_low and round(Decimal.from_float(offer.rate), 5) >= round(self.strategy.max_rate, 5)
return False

View file

@ -30,9 +30,12 @@ class MeanAvailabilityWeightedPrice(object):
mean_availability = self.blob_tracker.last_mean_availability
availability = self.blob_tracker.availability.get(blob, [])
index = 0 # blob.index
price = self.base_price * (mean_availability / Decimal(max(1, len(availability)))) / self._frontload(index)
price = self.base_price * self._get_availability_multiplier(mean_availability, availability) / self._frontload(index)
return round(price, 5)
def _get_availability_multiplier(self, mean_availability, availability):
return Decimal(max(1, mean_availability) / Decimal(max(1, len(availability))))
def _frontload(self, index):
"""
Get front-load multiplier, used to weight prices of blobs in a stream towards the front of the stream.

View file

@ -29,7 +29,7 @@ class Session(object):
def __init__(self, blob_data_payment_rate, db_dir=None, lbryid=None, peer_manager=None, dht_node_port=None,
known_dht_nodes=None, peer_finder=None, hash_announcer=None, blob_dir=None, blob_manager=None,
peer_port=None, use_upnp=True, rate_limiter=None, wallet=None, dht_node_class=node.Node,
blob_tracker_class=None, payment_rate_manager_class=None):
blob_tracker_class=None, payment_rate_manager_class=None, is_generous=True):
"""
@param blob_data_payment_rate: The default payment rate for blob data
@ -109,6 +109,7 @@ class Session(object):
self.base_payment_rate_manager = BasePaymentRateManager(blob_data_payment_rate)
self.payment_rate_manager = None
self.payment_rate_manager_class = payment_rate_manager_class or NegotiatedPaymentRateManager
self.is_generous = is_generous
def setup(self):
"""Create the blob directory and database if necessary, start all desired services"""
@ -271,7 +272,8 @@ class Session(object):
self.dht_node)
if self.payment_rate_manager is None:
self.payment_rate_manager = self.payment_rate_manager_class(self.base_payment_rate_manager,
self.blob_tracker)
self.blob_tracker,
self.is_generous)
self.rate_limiter.start()
d1 = self.blob_manager.setup()

View file

@ -1,5 +1,6 @@
from zope.interface import implementer
from decimal import Decimal
from lbrynet.conf import is_generous_host
from lbrynet.interfaces import INegotiationStrategy
from lbrynet.core.Offer import Offer
from lbrynet.core.PriceModel import MeanAvailabilityWeightedPrice
@ -15,13 +16,14 @@ class Strategy(object):
"""
implementer(INegotiationStrategy)
def __init__(self, price_model, max_rate, min_rate, is_generous=True):
def __init__(self, price_model, max_rate, min_rate, is_generous=is_generous_host):
self.price_model = price_model
self.is_generous = is_generous
self.accepted_offers = {}
self.pending_sent_offers = {}
self.offers_sent = {}
self.offers_received = {}
self.max_rate = max_rate or Decimal(self.price_model.base_price * 100)
self.max_rate = max_rate or Decimal(self.price_model.base_price * 50)
self.min_rate = Decimal(min_rate)
def _make_rate_offer(self, rates, offer_count):
@ -36,13 +38,17 @@ class Strategy(object):
if peer in self.accepted_offers:
# if there was a previous accepted offer, use that
offer = self.accepted_offers[peer]
if peer in self.pending_sent_offers:
del self.pending_sent_offers[peer]
elif offer_count == 0 and self.is_generous:
# Try asking for it for free
offer = Offer(Decimal(0.0))
self.pending_sent_offers.update({peer: offer})
else:
rates = [self.price_model.calculate_price(blob) for blob in blobs]
price = self._make_rate_offer(rates, offer_count)
offer = Offer(price)
self.pending_sent_offers.update({peer: offer})
return offer
def respond_to_offer(self, offer, peer, blobs):
@ -50,7 +56,6 @@ class Strategy(object):
self._add_offer_received(peer)
rates = [self.price_model.calculate_price(blob) for blob in blobs]
price = self._get_response_rate(rates, offer_count)
if peer in self.accepted_offers:
offer = self.accepted_offers[peer]
elif offer.rate == 0.0 and offer_count == 0 and self.is_generous:
@ -71,6 +76,7 @@ class Strategy(object):
del self.accepted_offers[peer]
if offer.is_accepted:
self.accepted_offers.update({peer: offer})
self.pending_sent_offers.update({peer: offer})
def _add_offer_sent(self, peer):
turn = self.offers_sent.get(peer, 0) + 1
@ -95,7 +101,7 @@ class BasicAvailabilityWeightedStrategy(Strategy):
implementer(INegotiationStrategy)
def __init__(self, blob_tracker, acceleration=1.25, deceleration=0.9, max_rate=None, min_rate=0.0,
is_generous=True, base_price=0.0001, alpha=1.0):
is_generous=is_generous_host, base_price=0.0001, alpha=1.0):
price_model = MeanAvailabilityWeightedPrice(blob_tracker, base_price=base_price, alpha=alpha)
Strategy.__init__(self, price_model, max_rate, min_rate, is_generous)
self._acceleration = Decimal(acceleration) # rate of how quickly to ramp offer

View file

@ -385,7 +385,7 @@ class Wallet(object):
def _get_my_unspent_claim(claims):
for claim in claims:
if claim['name'] == name and not claim['is spent']:
if claim['name'] == name and not claim['is spent'] and not claim.get('supported_claimid'):
return claim
return False

View file

@ -51,8 +51,10 @@ class BlobRequester(object):
self._available_blobs = defaultdict(list) # {Peer: [blob_hash]}
self._unavailable_blobs = defaultdict(list) # {Peer: [blob_hash]}}
self._protocol_prices = {} # {ClientProtocol: price}
self._protocol_offers = {}
self._price_disagreements = [] # [Peer]
self._protocol_tries = {}
self._maxed_out_peers = []
self._incompatible_peers = []
######## IRequestCreator #########
@ -84,17 +86,18 @@ class BlobRequester(object):
if availability.can_make_request():
availability.make_request_and_handle_response()
sent_request = True
if price.can_make_request():
# TODO: document why a PriceRequest is only made if an
# Availability or Download request was made
price.make_request_and_handle_response()
sent_request = True
if download.can_make_request():
try:
download.make_request_and_handle_response()
sent_request = True
except InsufficientFundsError as err:
return defer.fail(err)
if sent_request and price.can_make_request():
# TODO: document why a PriceRequest is only made if an
# Availability or Download request was made
price.make_request_and_handle_response()
return defer.succeed(sent_request)
def _get_hash_for_peer_search(self):
@ -118,7 +121,9 @@ class BlobRequester(object):
def choose_best_peers(peers):
bad_peers = self._get_bad_peers()
return [p for p in peers if not p in bad_peers]
without_bad_peers = [p for p in peers if not p in bad_peers]
without_maxed_out_peers = [p for p in without_bad_peers if p not in self._maxed_out_peers]
return without_maxed_out_peers
d.addCallback(choose_best_peers)
@ -182,6 +187,10 @@ class RequestHelper(object):
def protocol_prices(self):
return self.requestor._protocol_prices
@property
def protocol_offers(self):
return self.requestor._protocol_offers
@property
def available_blobs(self):
return self.requestor._available_blobs[self.peer]
@ -190,6 +199,10 @@ class RequestHelper(object):
def unavailable_blobs(self):
return self.requestor._unavailable_blobs[self.peer]
@property
def maxed_out_peers(self):
return self.requestor._maxed_out_peers
def update_local_score(self, score):
self.requestor._update_local_score(self.peer, score)
@ -210,10 +223,18 @@ class RequestHelper(object):
return reason
def get_and_save_rate(self):
if self.payment_rate_manager.price_limit_reached(self.peer):
if self.peer not in self.maxed_out_peers:
self.maxed_out_peers.append(self.peer)
return None
rate = self.protocol_prices.get(self.protocol)
if rate is None:
if self.peer in self.payment_rate_manager.strategy.pending_sent_offers:
pending = self.payment_rate_manager.strategy.pending_sent_offers[self.peer]
if not pending.is_too_low and not pending.is_accepted:
return pending.rate
rate = self.payment_rate_manager.get_rate_blob_data(self.peer, self.available_blobs)
self.protocol_prices[self.protocol] = rate
self.protocol_offers[self.protocol] = rate
return rate
@ -337,7 +358,9 @@ class AvailabilityRequest(RequestHelper):
class PriceRequest(RequestHelper):
"""Ask a peer if a certain price is acceptable"""
def can_make_request(self):
if len(self.available_blobs) and not self.protocol in self.protocol_prices:
return self.get_and_save_rate() is not None
return False
def make_request_and_handle_response(self):
request = self._get_price_request()
@ -362,22 +385,19 @@ class PriceRequest(RequestHelper):
assert request.response_identifier == 'blob_data_payment_rate'
if 'blob_data_payment_rate' not in response_dict:
return InvalidResponseError("response identifier not in response")
assert self.protocol in self.protocol_prices
rate = self.protocol_prices[self.protocol]
offer = Offer(rate)
assert self.protocol in self.protocol_offers
offer = Offer(self.protocol_offers[self.protocol])
offer.handle(response_dict['blob_data_payment_rate'])
self.payment_rate_manager.record_offer_reply(self.peer.host, offer)
self.payment_rate_manager.record_offer_reply(self.peer, offer)
if offer.is_accepted:
log.debug("Offered rate %f/mb accepted by %s", rate, str(self.peer.host))
log.info("Offered rate %f/mb accepted by %s", offer.rate, self.peer.host)
self.protocol_prices[self.protocol] = offer.rate
return True
elif offer.is_too_low:
log.debug("Offered rate %f/mb rejected by %s", rate, str(self.peer.host))
del self.protocol_prices[self.protocol]
return True
log.debug("Offered rate %f/mb rejected by %s", offer.rate, self.peer.host)
return not self.payment_rate_manager.price_limit_reached(self.peer)
else:
log.warning("Price disagreement")
del self.protocol_prices[self.protocol]
self.requestor._price_disagreements.append(self.peer)
return False
@ -389,7 +409,9 @@ class DownloadRequest(RequestHelper):
self.wallet = wallet
def can_make_request(self):
if self.protocol in self.protocol_prices:
return self.get_blob_details()
return False
def make_request_and_handle_response(self):
request = self._get_request()

View file

@ -69,6 +69,8 @@ class BlobRequestHandler(object):
if self.PAYMENT_RATE_QUERY in queries:
offered_rate = queries[self.PAYMENT_RATE_QUERY]
offer = Offer(offered_rate)
if offer.rate is None:
log.warning("Empty rate offer")
response.addCallback(lambda r: self._handle_payment_rate_query(offer, r))
if self.BLOB_QUERY in queries:
incoming = queries[self.BLOB_QUERY]

View file

@ -48,7 +48,7 @@ from lbrynet.conf import MIN_BLOB_DATA_PAYMENT_RATE, DEFAULT_MAX_SEARCH_RESULTS,
DEFAULT_SEARCH_TIMEOUT, DEFAULT_CACHE_TIME, DEFAULT_UI_BRANCH, \
LOG_POST_URL, LOG_FILE_NAME, REFLECTOR_SERVERS, SEARCH_SERVERS
from lbrynet.conf import DEFAULT_SD_DOWNLOAD_TIMEOUT
from lbrynet.conf import DEFAULT_TIMEOUT
from lbrynet.conf import DEFAULT_TIMEOUT, is_generous_host
from lbrynet import conf
from lbrynet.core.StreamDescriptor import StreamDescriptorIdentifier, download_sd_blob, BlobStreamDescriptorReader
from lbrynet.core.Session import Session
@ -1063,7 +1063,8 @@ class Daemon(jsonrpc.JSONRPC):
self.session = Session(results['default_data_payment_rate'], db_dir=self.db_dir, lbryid=self.lbryid,
blob_dir=self.blobfile_dir, dht_node_port=self.dht_node_port,
known_dht_nodes=self.known_dht_nodes, peer_port=self.peer_port,
use_upnp=self.use_upnp, wallet=results['wallet'])
use_upnp=self.use_upnp, wallet=results['wallet'],
is_generous=is_generous_host)
self.startup_status = STARTUP_STAGES[2]
dl = defer.DeferredList([d1, d2], fireOnOneErrback=True)

View file

@ -11,7 +11,6 @@ from Crypto.PublicKey import RSA
from Crypto import Random
from Crypto.Hash import MD5
from lbrynet.conf import MIN_BLOB_DATA_PAYMENT_RATE
from lbrynet.conf import MIN_BLOB_INFO_PAYMENT_RATE
from lbrynet.lbrylive.LiveStreamCreator import FileLiveStreamCreator
from lbrynet.lbrylive.LiveStreamMetadataManager import DBLiveStreamMetadataManager
from lbrynet.lbrylive.LiveStreamMetadataManager import TempLiveStreamMetadataManager
@ -208,7 +207,7 @@ test_create_stream_sd_file = {
'stream_hash': '6d27fbe10c86d81aacfb897c7a426d0a2214f5a299455a6d315c0f998c4b3545c2dc60906122d94653c23b1898229e3f'}
def start_lbry_uploader(sd_hash_queue, kill_event, dead_event, file_size, ul_rate_limit=None):
def start_lbry_uploader(sd_hash_queue, kill_event, dead_event, file_size, ul_rate_limit=None, is_generous=False):
if sys.platform.startswith("linux"):
sys.modules = sys.modules.copy()
del sys.modules['twisted.internet.reactor']
@ -239,7 +238,7 @@ def start_lbry_uploader(sd_hash_queue, kill_event, dead_event, file_size, ul_rat
session = Session(MIN_BLOB_DATA_PAYMENT_RATE, db_dir=db_dir, lbryid="abcd",
peer_finder=peer_finder, hash_announcer=hash_announcer, peer_port=5553,
use_upnp=False, rate_limiter=rate_limiter, wallet=wallet, blob_tracker_class=DummyBlobAvailabilityTracker,
dht_node_class=Node)
dht_node_class=Node, is_generous=is_generous)
stream_info_manager = TempEncryptedFileMetadataManager()
@ -320,7 +319,7 @@ def start_lbry_uploader(sd_hash_queue, kill_event, dead_event, file_size, ul_rat
reactor.run()
def start_lbry_reuploader(sd_hash, kill_event, dead_event, ready_event, n, ul_rate_limit=None):
def start_lbry_reuploader(sd_hash, kill_event, dead_event, ready_event, n, ul_rate_limit=None, is_generous=False):
if sys.platform.startswith("linux"):
sys.modules = sys.modules.copy()
@ -354,7 +353,8 @@ def start_lbry_reuploader(sd_hash, kill_event, dead_event, ready_event, n, ul_ra
session = Session(MIN_BLOB_DATA_PAYMENT_RATE, db_dir=db_dir, lbryid="abcd" + str(n),
peer_finder=peer_finder, hash_announcer=hash_announcer,
blob_dir=None, peer_port=peer_port,
use_upnp=False, rate_limiter=rate_limiter, wallet=wallet, blob_tracker_class=DummyBlobAvailabilityTracker)
use_upnp=False, rate_limiter=rate_limiter, wallet=wallet,
blob_tracker_class=DummyBlobAvailabilityTracker, is_generous=is_generous)
stream_info_manager = TempEncryptedFileMetadataManager()
@ -565,7 +565,7 @@ def start_live_server(sd_hash_queue, kill_event, dead_event):
reactor.run()
def start_blob_uploader(blob_hash_queue, kill_event, dead_event, slow):
def start_blob_uploader(blob_hash_queue, kill_event, dead_event, slow, is_generous=False):
if sys.platform.startswith("linux"):
sys.modules = sys.modules.copy()
@ -599,7 +599,8 @@ def start_blob_uploader(blob_hash_queue, kill_event, dead_event, slow):
session = Session(MIN_BLOB_DATA_PAYMENT_RATE, db_dir=db_dir, lbryid="efgh",
peer_finder=peer_finder, hash_announcer=hash_announcer,
blob_dir=blob_dir, peer_port=peer_port,
use_upnp=False, rate_limiter=rate_limiter, wallet=wallet, blob_tracker_class=DummyBlobAvailabilityTracker)
use_upnp=False, rate_limiter=rate_limiter, wallet=wallet,
blob_tracker_class=DummyBlobAvailabilityTracker, is_generous=is_generous)
if slow is True:
session.rate_limiter.set_ul_limit(2**11)
@ -672,6 +673,7 @@ class TestTransfer(TestCase):
self.session = None
self.stream_info_manager = None
self.lbry_file_manager = None
self.is_generous = True
self.addCleanup(self.take_down_env)
def take_down_env(self):
@ -772,7 +774,7 @@ class TestTransfer(TestCase):
peer_finder=peer_finder, hash_announcer=hash_announcer,
blob_dir=blob_dir, peer_port=5553,
use_upnp=False, rate_limiter=rate_limiter, wallet=wallet, blob_tracker_class=DummyBlobAvailabilityTracker,
dht_node_class=Node)
dht_node_class=Node, is_generous=self.is_generous)
self.stream_info_manager = TempEncryptedFileMetadataManager()
@ -1235,6 +1237,7 @@ class TestStreamify(TestCase):
self.stream_info_manager = None
self.lbry_file_manager = None
self.addCleanup(self.take_down_env)
self.is_generous = True
def take_down_env(self):
@ -1255,7 +1258,6 @@ class TestStreamify(TestCase):
return d
def test_create_stream(self):
wallet = FakeWallet()
peer_manager = PeerManager()
peer_finder = FakePeerFinder(5553, peer_manager, 2)
@ -1272,7 +1274,8 @@ class TestStreamify(TestCase):
self.session = Session(MIN_BLOB_DATA_PAYMENT_RATE, db_dir=db_dir, lbryid="abcd",
peer_finder=peer_finder, hash_announcer=hash_announcer,
blob_dir=blob_dir, peer_port=5553,
use_upnp=False, rate_limiter=rate_limiter, wallet=wallet, blob_tracker_class=DummyBlobAvailabilityTracker)
use_upnp=False, rate_limiter=rate_limiter, wallet=wallet,
blob_tracker_class=DummyBlobAvailabilityTracker, is_generous=self.is_generous)
self.stream_info_manager = TempEncryptedFileMetadataManager()
@ -1316,7 +1319,6 @@ class TestStreamify(TestCase):
rate_limiter = DummyRateLimiter()
sd_identifier = StreamDescriptorIdentifier()
db_dir = "client"
blob_dir = os.path.join(db_dir, "blobfiles")
os.mkdir(db_dir)

View file

@ -157,7 +157,6 @@ class DummyBlobAvailabilityTracker(BlobAvailabilityTracker):
'f99d24cd50d4bfd77c2598bfbeeb8415bf0feef21200bdf0b8fbbde7751a77b7a2c68e09c25465a2f40fba8eecb0b4e0': ['1.2.3.4', '1.2.3.4', '1.2.3.4', '1.2.3.4', '1.2.3.4', '1.2.3.4', '1.2.3.4', '1.2.3.4', '1.2.3.4'],
'c84aa1fd8f5009f7c4e71e444e40d95610abc1480834f835eefb267287aeb10025880a3ce22580db8c6d92efb5bc0c9c': ['1.2.3.4', '1.2.3.4', '1.2.3.4', '1.2.3.4', '1.2.3.4', '1.2.3.4', '1.2.3.4', '1.2.3.4', '1.2.3.4', '1.2.3.4'],
}
self.last_mean_availability = Decimal(0.0)
self._blob_manager = None
self._peer_finder = PeerFinder(11223, 11224, 2)
self._dht_node = None