forked from LBRYCommunity/lbry-sdk
fix problems in blobrequester
-also move is_generous_host into a conf setting
This commit is contained in:
parent
9ba38c84cc
commit
3f5efb1fa3
10 changed files with 64 additions and 39 deletions
|
@ -3,7 +3,7 @@ Some network wide and also application specific parameters
|
|||
"""
|
||||
import os
|
||||
|
||||
|
||||
is_generous_host = True
|
||||
IS_DEVELOPMENT_VERSION = False
|
||||
|
||||
MAX_HANDSHAKE_SIZE = 2**16
|
||||
|
|
|
@ -17,7 +17,7 @@ class BlobAvailabilityTracker(object):
|
|||
|
||||
def __init__(self, blob_manager, peer_finder, dht_node):
|
||||
self.availability = {}
|
||||
self.last_mean_availability = Decimal(0.0)
|
||||
self._last_mean_availability = Decimal(0.0)
|
||||
self._blob_manager = blob_manager
|
||||
self._peer_finder = peer_finder
|
||||
self._dht_node = dht_node
|
||||
|
@ -50,6 +50,11 @@ class BlobAvailabilityTracker(object):
|
|||
d.addCallback(lambda results: [val for success, val in results if success])
|
||||
return d
|
||||
|
||||
@property
|
||||
def last_mean_availability(self):
|
||||
return max(Decimal(0.01), self._last_mean_availability)
|
||||
|
||||
|
||||
def _update_peers_for_blob(self, blob):
|
||||
def _save_peer_info(blob_hash, peers):
|
||||
v = {blob_hash: peers}
|
||||
|
@ -86,4 +91,4 @@ class BlobAvailabilityTracker(object):
|
|||
def _get_mean_peers(self):
|
||||
num_peers = [len(self.availability[blob]) for blob in self.availability]
|
||||
mean = Decimal(sum(num_peers)) / Decimal(max(1, len(num_peers)))
|
||||
self.last_mean_availability = mean
|
||||
self._last_mean_availability = mean
|
||||
|
|
|
@ -30,9 +30,12 @@ class MeanAvailabilityWeightedPrice(object):
|
|||
mean_availability = self.blob_tracker.last_mean_availability
|
||||
availability = self.blob_tracker.availability.get(blob, [])
|
||||
index = 0 # blob.index
|
||||
price = self.base_price * (mean_availability / Decimal(max(1, len(availability)))) / self._frontload(index)
|
||||
price = self.base_price * self._get_availability_multiplier(mean_availability, availability) / self._frontload(index)
|
||||
return round(price, 5)
|
||||
|
||||
def _get_availability_multiplier(self, mean_availability, availability):
|
||||
return Decimal(max(1, mean_availability) / Decimal(max(1, len(availability))))
|
||||
|
||||
def _frontload(self, index):
|
||||
"""
|
||||
Get front-load multiplier, used to weight prices of blobs in a stream towards the front of the stream.
|
||||
|
|
|
@ -29,7 +29,7 @@ class Session(object):
|
|||
def __init__(self, blob_data_payment_rate, db_dir=None, lbryid=None, peer_manager=None, dht_node_port=None,
|
||||
known_dht_nodes=None, peer_finder=None, hash_announcer=None, blob_dir=None, blob_manager=None,
|
||||
peer_port=None, use_upnp=True, rate_limiter=None, wallet=None, dht_node_class=node.Node,
|
||||
blob_tracker_class=None, payment_rate_manager_class=None):
|
||||
blob_tracker_class=None, payment_rate_manager_class=None, is_generous=True):
|
||||
"""
|
||||
@param blob_data_payment_rate: The default payment rate for blob data
|
||||
|
||||
|
@ -109,6 +109,7 @@ class Session(object):
|
|||
self.base_payment_rate_manager = BasePaymentRateManager(blob_data_payment_rate)
|
||||
self.payment_rate_manager = None
|
||||
self.payment_rate_manager_class = payment_rate_manager_class or NegotiatedPaymentRateManager
|
||||
self.is_generous = is_generous
|
||||
|
||||
def setup(self):
|
||||
"""Create the blob directory and database if necessary, start all desired services"""
|
||||
|
@ -271,7 +272,8 @@ class Session(object):
|
|||
self.dht_node)
|
||||
if self.payment_rate_manager is None:
|
||||
self.payment_rate_manager = self.payment_rate_manager_class(self.base_payment_rate_manager,
|
||||
self.blob_tracker)
|
||||
self.blob_tracker,
|
||||
self.is_generous)
|
||||
|
||||
self.rate_limiter.start()
|
||||
d1 = self.blob_manager.setup()
|
||||
|
|
|
@ -19,6 +19,7 @@ class Strategy(object):
|
|||
self.price_model = price_model
|
||||
self.is_generous = is_generous
|
||||
self.accepted_offers = {}
|
||||
self.pending_sent_offers = {}
|
||||
self.offers_sent = {}
|
||||
self.offers_received = {}
|
||||
self.max_rate = max_rate or Decimal(self.price_model.base_price * 100)
|
||||
|
@ -36,13 +37,17 @@ class Strategy(object):
|
|||
if peer in self.accepted_offers:
|
||||
# if there was a previous accepted offer, use that
|
||||
offer = self.accepted_offers[peer]
|
||||
if peer in self.pending_sent_offers:
|
||||
del self.pending_sent_offers[peer]
|
||||
elif offer_count == 0 and self.is_generous:
|
||||
# Try asking for it for free
|
||||
offer = Offer(Decimal(0.0))
|
||||
self.pending_sent_offers.update({peer: offer})
|
||||
else:
|
||||
rates = [self.price_model.calculate_price(blob) for blob in blobs]
|
||||
price = self._make_rate_offer(rates, offer_count)
|
||||
offer = Offer(price)
|
||||
self.pending_sent_offers.update({peer: offer})
|
||||
return offer
|
||||
|
||||
def respond_to_offer(self, offer, peer, blobs):
|
||||
|
@ -50,7 +55,6 @@ class Strategy(object):
|
|||
self._add_offer_received(peer)
|
||||
rates = [self.price_model.calculate_price(blob) for blob in blobs]
|
||||
price = self._get_response_rate(rates, offer_count)
|
||||
|
||||
if peer in self.accepted_offers:
|
||||
offer = self.accepted_offers[peer]
|
||||
elif offer.rate == 0.0 and offer_count == 0 and self.is_generous:
|
||||
|
|
|
@ -51,6 +51,7 @@ class BlobRequester(object):
|
|||
self._available_blobs = defaultdict(list) # {Peer: [blob_hash]}
|
||||
self._unavailable_blobs = defaultdict(list) # {Peer: [blob_hash]}}
|
||||
self._protocol_prices = {} # {ClientProtocol: price}
|
||||
self._protocol_offers = {}
|
||||
self._price_disagreements = [] # [Peer]
|
||||
self._protocol_tries = {}
|
||||
self._incompatible_peers = []
|
||||
|
@ -84,17 +85,18 @@ class BlobRequester(object):
|
|||
if availability.can_make_request():
|
||||
availability.make_request_and_handle_response()
|
||||
sent_request = True
|
||||
|
||||
if price.can_make_request():
|
||||
# TODO: document why a PriceRequest is only made if an
|
||||
# Availability or Download request was made
|
||||
price.make_request_and_handle_response()
|
||||
sent_request = True
|
||||
if download.can_make_request():
|
||||
try:
|
||||
download.make_request_and_handle_response()
|
||||
sent_request = True
|
||||
except InsufficientFundsError as err:
|
||||
return defer.fail(err)
|
||||
if sent_request and price.can_make_request():
|
||||
# TODO: document why a PriceRequest is only made if an
|
||||
# Availability or Download request was made
|
||||
price.make_request_and_handle_response()
|
||||
|
||||
return defer.succeed(sent_request)
|
||||
|
||||
def _get_hash_for_peer_search(self):
|
||||
|
@ -182,6 +184,10 @@ class RequestHelper(object):
|
|||
def protocol_prices(self):
|
||||
return self.requestor._protocol_prices
|
||||
|
||||
@property
|
||||
def protocol_offers(self):
|
||||
return self.requestor._protocol_offers
|
||||
|
||||
@property
|
||||
def available_blobs(self):
|
||||
return self.requestor._available_blobs[self.peer]
|
||||
|
@ -213,7 +219,7 @@ class RequestHelper(object):
|
|||
rate = self.protocol_prices.get(self.protocol)
|
||||
if rate is None:
|
||||
rate = self.payment_rate_manager.get_rate_blob_data(self.peer, self.available_blobs)
|
||||
self.protocol_prices[self.protocol] = rate
|
||||
self.protocol_offers[self.protocol] = rate
|
||||
return rate
|
||||
|
||||
|
||||
|
@ -337,7 +343,9 @@ class AvailabilityRequest(RequestHelper):
|
|||
class PriceRequest(RequestHelper):
|
||||
"""Ask a peer if a certain price is acceptable"""
|
||||
def can_make_request(self):
|
||||
return self.get_and_save_rate() is not None
|
||||
if self.requestor._available_blobs:
|
||||
return self.get_and_save_rate() is not None
|
||||
return False
|
||||
|
||||
def make_request_and_handle_response(self):
|
||||
request = self._get_price_request()
|
||||
|
@ -362,22 +370,19 @@ class PriceRequest(RequestHelper):
|
|||
assert request.response_identifier == 'blob_data_payment_rate'
|
||||
if 'blob_data_payment_rate' not in response_dict:
|
||||
return InvalidResponseError("response identifier not in response")
|
||||
assert self.protocol in self.protocol_prices
|
||||
rate = self.protocol_prices[self.protocol]
|
||||
offer = Offer(rate)
|
||||
assert self.protocol in self.protocol_offers
|
||||
offer = Offer(self.protocol_offers[self.protocol])
|
||||
offer.handle(response_dict['blob_data_payment_rate'])
|
||||
self.payment_rate_manager.record_offer_reply(self.peer.host, offer)
|
||||
|
||||
if offer.is_accepted:
|
||||
log.debug("Offered rate %f/mb accepted by %s", rate, str(self.peer.host))
|
||||
log.info("Offered rate %f/mb accepted by %s", offer.rate, self.peer.host)
|
||||
self.protocol_prices[self.protocol] = offer.rate
|
||||
return True
|
||||
elif offer.is_too_low:
|
||||
log.debug("Offered rate %f/mb rejected by %s", rate, str(self.peer.host))
|
||||
del self.protocol_prices[self.protocol]
|
||||
log.debug("Offered rate %f/mb rejected by %s", offer.rate, self.peer.host)
|
||||
return True
|
||||
else:
|
||||
log.warning("Price disagreement")
|
||||
del self.protocol_prices[self.protocol]
|
||||
self.requestor._price_disagreements.append(self.peer)
|
||||
return False
|
||||
|
||||
|
@ -389,7 +394,9 @@ class DownloadRequest(RequestHelper):
|
|||
self.wallet = wallet
|
||||
|
||||
def can_make_request(self):
|
||||
return self.get_blob_details()
|
||||
if self.protocol in self.protocol_prices:
|
||||
return self.get_blob_details()
|
||||
return False
|
||||
|
||||
def make_request_and_handle_response(self):
|
||||
request = self._get_request()
|
||||
|
|
|
@ -69,6 +69,8 @@ class BlobRequestHandler(object):
|
|||
if self.PAYMENT_RATE_QUERY in queries:
|
||||
offered_rate = queries[self.PAYMENT_RATE_QUERY]
|
||||
offer = Offer(offered_rate)
|
||||
if offer.rate is None:
|
||||
log.warning("Empty rate offer")
|
||||
response.addCallback(lambda r: self._handle_payment_rate_query(offer, r))
|
||||
if self.BLOB_QUERY in queries:
|
||||
incoming = queries[self.BLOB_QUERY]
|
||||
|
|
|
@ -48,7 +48,7 @@ from lbrynet.conf import MIN_BLOB_DATA_PAYMENT_RATE, DEFAULT_MAX_SEARCH_RESULTS,
|
|||
DEFAULT_SEARCH_TIMEOUT, DEFAULT_CACHE_TIME, DEFAULT_UI_BRANCH, \
|
||||
LOG_POST_URL, LOG_FILE_NAME, REFLECTOR_SERVERS, SEARCH_SERVERS
|
||||
from lbrynet.conf import DEFAULT_SD_DOWNLOAD_TIMEOUT
|
||||
from lbrynet.conf import DEFAULT_TIMEOUT
|
||||
from lbrynet.conf import DEFAULT_TIMEOUT, is_generous_host
|
||||
from lbrynet import conf
|
||||
from lbrynet.core.StreamDescriptor import StreamDescriptorIdentifier, download_sd_blob, BlobStreamDescriptorReader
|
||||
from lbrynet.core.Session import Session
|
||||
|
@ -1063,7 +1063,8 @@ class Daemon(jsonrpc.JSONRPC):
|
|||
self.session = Session(results['default_data_payment_rate'], db_dir=self.db_dir, lbryid=self.lbryid,
|
||||
blob_dir=self.blobfile_dir, dht_node_port=self.dht_node_port,
|
||||
known_dht_nodes=self.known_dht_nodes, peer_port=self.peer_port,
|
||||
use_upnp=self.use_upnp, wallet=results['wallet'])
|
||||
use_upnp=self.use_upnp, wallet=results['wallet'],
|
||||
is_generous=is_generous_host)
|
||||
self.startup_status = STARTUP_STAGES[2]
|
||||
|
||||
dl = defer.DeferredList([d1, d2], fireOnOneErrback=True)
|
||||
|
|
|
@ -11,7 +11,6 @@ from Crypto.PublicKey import RSA
|
|||
from Crypto import Random
|
||||
from Crypto.Hash import MD5
|
||||
from lbrynet.conf import MIN_BLOB_DATA_PAYMENT_RATE
|
||||
from lbrynet.conf import MIN_BLOB_INFO_PAYMENT_RATE
|
||||
from lbrynet.lbrylive.LiveStreamCreator import FileLiveStreamCreator
|
||||
from lbrynet.lbrylive.LiveStreamMetadataManager import DBLiveStreamMetadataManager
|
||||
from lbrynet.lbrylive.LiveStreamMetadataManager import TempLiveStreamMetadataManager
|
||||
|
@ -208,7 +207,7 @@ test_create_stream_sd_file = {
|
|||
'stream_hash': '6d27fbe10c86d81aacfb897c7a426d0a2214f5a299455a6d315c0f998c4b3545c2dc60906122d94653c23b1898229e3f'}
|
||||
|
||||
|
||||
def start_lbry_uploader(sd_hash_queue, kill_event, dead_event, file_size, ul_rate_limit=None):
|
||||
def start_lbry_uploader(sd_hash_queue, kill_event, dead_event, file_size, ul_rate_limit=None, is_generous=False):
|
||||
if sys.platform.startswith("linux"):
|
||||
sys.modules = sys.modules.copy()
|
||||
del sys.modules['twisted.internet.reactor']
|
||||
|
@ -237,9 +236,9 @@ def start_lbry_uploader(sd_hash_queue, kill_event, dead_event, file_size, ul_rat
|
|||
os.mkdir(db_dir)
|
||||
|
||||
session = Session(MIN_BLOB_DATA_PAYMENT_RATE, db_dir=db_dir, lbryid="abcd",
|
||||
peer_finder=peer_finder, hash_announcer=hash_announcer, peer_port=5553,
|
||||
use_upnp=False, rate_limiter=rate_limiter, wallet=wallet, blob_tracker_class=DummyBlobAvailabilityTracker,
|
||||
dht_node_class=Node)
|
||||
peer_finder=peer_finder, hash_announcer=hash_announcer, peer_port=5553,
|
||||
use_upnp=False, rate_limiter=rate_limiter, wallet=wallet, blob_tracker_class=DummyBlobAvailabilityTracker,
|
||||
dht_node_class=Node, is_generous=is_generous)
|
||||
|
||||
stream_info_manager = TempEncryptedFileMetadataManager()
|
||||
|
||||
|
@ -320,7 +319,7 @@ def start_lbry_uploader(sd_hash_queue, kill_event, dead_event, file_size, ul_rat
|
|||
reactor.run()
|
||||
|
||||
|
||||
def start_lbry_reuploader(sd_hash, kill_event, dead_event, ready_event, n, ul_rate_limit=None):
|
||||
def start_lbry_reuploader(sd_hash, kill_event, dead_event, ready_event, n, ul_rate_limit=None, is_generous=False):
|
||||
|
||||
if sys.platform.startswith("linux"):
|
||||
sys.modules = sys.modules.copy()
|
||||
|
@ -354,7 +353,8 @@ def start_lbry_reuploader(sd_hash, kill_event, dead_event, ready_event, n, ul_ra
|
|||
session = Session(MIN_BLOB_DATA_PAYMENT_RATE, db_dir=db_dir, lbryid="abcd" + str(n),
|
||||
peer_finder=peer_finder, hash_announcer=hash_announcer,
|
||||
blob_dir=None, peer_port=peer_port,
|
||||
use_upnp=False, rate_limiter=rate_limiter, wallet=wallet, blob_tracker_class=DummyBlobAvailabilityTracker)
|
||||
use_upnp=False, rate_limiter=rate_limiter, wallet=wallet,
|
||||
blob_tracker_class=DummyBlobAvailabilityTracker, is_generous=is_generous)
|
||||
|
||||
stream_info_manager = TempEncryptedFileMetadataManager()
|
||||
|
||||
|
@ -565,7 +565,7 @@ def start_live_server(sd_hash_queue, kill_event, dead_event):
|
|||
reactor.run()
|
||||
|
||||
|
||||
def start_blob_uploader(blob_hash_queue, kill_event, dead_event, slow):
|
||||
def start_blob_uploader(blob_hash_queue, kill_event, dead_event, slow, is_generous=False):
|
||||
|
||||
if sys.platform.startswith("linux"):
|
||||
sys.modules = sys.modules.copy()
|
||||
|
@ -599,7 +599,8 @@ def start_blob_uploader(blob_hash_queue, kill_event, dead_event, slow):
|
|||
session = Session(MIN_BLOB_DATA_PAYMENT_RATE, db_dir=db_dir, lbryid="efgh",
|
||||
peer_finder=peer_finder, hash_announcer=hash_announcer,
|
||||
blob_dir=blob_dir, peer_port=peer_port,
|
||||
use_upnp=False, rate_limiter=rate_limiter, wallet=wallet, blob_tracker_class=DummyBlobAvailabilityTracker)
|
||||
use_upnp=False, rate_limiter=rate_limiter, wallet=wallet,
|
||||
blob_tracker_class=DummyBlobAvailabilityTracker, is_generous=is_generous)
|
||||
|
||||
if slow is True:
|
||||
session.rate_limiter.set_ul_limit(2**11)
|
||||
|
@ -672,6 +673,7 @@ class TestTransfer(TestCase):
|
|||
self.session = None
|
||||
self.stream_info_manager = None
|
||||
self.lbry_file_manager = None
|
||||
self.is_generous = True
|
||||
self.addCleanup(self.take_down_env)
|
||||
|
||||
def take_down_env(self):
|
||||
|
@ -772,7 +774,7 @@ class TestTransfer(TestCase):
|
|||
peer_finder=peer_finder, hash_announcer=hash_announcer,
|
||||
blob_dir=blob_dir, peer_port=5553,
|
||||
use_upnp=False, rate_limiter=rate_limiter, wallet=wallet, blob_tracker_class=DummyBlobAvailabilityTracker,
|
||||
dht_node_class=Node)
|
||||
dht_node_class=Node, is_generous=self.is_generous)
|
||||
|
||||
self.stream_info_manager = TempEncryptedFileMetadataManager()
|
||||
|
||||
|
@ -1235,6 +1237,7 @@ class TestStreamify(TestCase):
|
|||
self.stream_info_manager = None
|
||||
self.lbry_file_manager = None
|
||||
self.addCleanup(self.take_down_env)
|
||||
self.is_generous = True
|
||||
|
||||
def take_down_env(self):
|
||||
|
||||
|
@ -1255,7 +1258,6 @@ class TestStreamify(TestCase):
|
|||
return d
|
||||
|
||||
def test_create_stream(self):
|
||||
|
||||
wallet = FakeWallet()
|
||||
peer_manager = PeerManager()
|
||||
peer_finder = FakePeerFinder(5553, peer_manager, 2)
|
||||
|
@ -1272,7 +1274,8 @@ class TestStreamify(TestCase):
|
|||
self.session = Session(MIN_BLOB_DATA_PAYMENT_RATE, db_dir=db_dir, lbryid="abcd",
|
||||
peer_finder=peer_finder, hash_announcer=hash_announcer,
|
||||
blob_dir=blob_dir, peer_port=5553,
|
||||
use_upnp=False, rate_limiter=rate_limiter, wallet=wallet, blob_tracker_class=DummyBlobAvailabilityTracker)
|
||||
use_upnp=False, rate_limiter=rate_limiter, wallet=wallet,
|
||||
blob_tracker_class=DummyBlobAvailabilityTracker, is_generous=self.is_generous)
|
||||
|
||||
self.stream_info_manager = TempEncryptedFileMetadataManager()
|
||||
|
||||
|
@ -1316,7 +1319,6 @@ class TestStreamify(TestCase):
|
|||
rate_limiter = DummyRateLimiter()
|
||||
sd_identifier = StreamDescriptorIdentifier()
|
||||
|
||||
|
||||
db_dir = "client"
|
||||
blob_dir = os.path.join(db_dir, "blobfiles")
|
||||
os.mkdir(db_dir)
|
||||
|
|
|
@ -157,7 +157,6 @@ class DummyBlobAvailabilityTracker(BlobAvailabilityTracker):
|
|||
'f99d24cd50d4bfd77c2598bfbeeb8415bf0feef21200bdf0b8fbbde7751a77b7a2c68e09c25465a2f40fba8eecb0b4e0': ['1.2.3.4', '1.2.3.4', '1.2.3.4', '1.2.3.4', '1.2.3.4', '1.2.3.4', '1.2.3.4', '1.2.3.4', '1.2.3.4'],
|
||||
'c84aa1fd8f5009f7c4e71e444e40d95610abc1480834f835eefb267287aeb10025880a3ce22580db8c6d92efb5bc0c9c': ['1.2.3.4', '1.2.3.4', '1.2.3.4', '1.2.3.4', '1.2.3.4', '1.2.3.4', '1.2.3.4', '1.2.3.4', '1.2.3.4', '1.2.3.4'],
|
||||
}
|
||||
self.last_mean_availability = Decimal(0.0)
|
||||
self._blob_manager = None
|
||||
self._peer_finder = PeerFinder(11223, 11224, 2)
|
||||
self._dht_node = None
|
||||
|
|
Loading…
Reference in a new issue