diff --git a/lbrynet/conf.py b/lbrynet/conf.py index 2005d7784..bd9808e12 100644 --- a/lbrynet/conf.py +++ b/lbrynet/conf.py @@ -14,7 +14,7 @@ MAX_BLOB_INFOS_TO_REQUEST = 20 BLOBFILES_DIR = ".blobfiles" BLOB_SIZE = 2**21 -MIN_BLOB_DATA_PAYMENT_RATE = .005 # points/megabyte +MIN_BLOB_DATA_PAYMENT_RATE = .001 # points/megabyte MIN_BLOB_INFO_PAYMENT_RATE = .02 # points/1000 infos MIN_VALUABLE_BLOB_INFO_PAYMENT_RATE = .05 # points/1000 infos MIN_VALUABLE_BLOB_HASH_PAYMENT_RATE = .05 # points/1000 infos diff --git a/lbrynet/core/BlobPrice.py b/lbrynet/core/BlobAvailability.py similarity index 60% rename from lbrynet/core/BlobPrice.py rename to lbrynet/core/BlobAvailability.py index 18417c5cc..6a17f3eed 100644 --- a/lbrynet/core/BlobPrice.py +++ b/lbrynet/core/BlobAvailability.py @@ -2,53 +2,21 @@ import logging from twisted.internet import defer from twisted.internet.task import LoopingCall -from lbrynet.conf import MIN_BLOB_DATA_PAYMENT_RATE as min_price log = logging.getLogger(__name__) -base_price = min_price * 10 -# how heavily to value blobs towards the front of the stream -alpha = 1.0 - - -def frontload(index): +class BlobAvailabilityTracker(object): """ - Get frontload multipler - - @param index: blob position in stream - @return: frontload multipler - """ - - return 2.0 - (alpha**index) - - -def calculate_price(mean_availability, availability, index_position=0): - """ - Calculate mean-blob-availability and stream-position weighted price for a blob - - @param mean_availability: sum of blob availabilities over the number of known blobs - @param availability: number of known peers for blob - @param index_position: blob index position in stream - @return: price - """ - - price = max(min_price, base_price * (mean_availability/max(1, availability)) * frontload(index_position)) - return price - - -class BlobPriceAndAvailabilityTracker(object): - """ - Class to track peer counts for known blobs and update price targets + Class to track peer counts for known blobs, and to discover new popular blobs Attributes: - prices (dict): dictionary of blob prices availability (dict): dictionary of peers for known blobs """ def __init__(self, blob_manager, peer_finder, dht_node): self.availability = {} - self.prices = {} + self.last_mean_availability = 0.0 self._blob_manager = blob_manager self._peer_finder = peer_finder self._dht_node = dht_node @@ -70,9 +38,6 @@ class BlobPriceAndAvailabilityTracker(object): def _save_peer_info(blob_hash, peers): v = {blob_hash: peers} self.availability.update(v) - - new_price = self._get_price(blob) - self.prices.update({blob: new_price}) return v d = self._peer_finder.find_peers_for_blob(blob) @@ -88,6 +53,7 @@ class BlobPriceAndAvailabilityTracker(object): dl.append(self._update_peers_for_blob(encoded)) return defer.DeferredList(dl) d = _get_most_popular() + d.addCallback(lambda _: self._get_mean_peers()) def _update_mine(self): def _get_peers(blobs): @@ -97,14 +63,9 @@ class BlobPriceAndAvailabilityTracker(object): return defer.DeferredList(dl) d = self._blob_manager.get_all_verified_blobs() d.addCallback(_get_peers) + d.addCallback(lambda _: self._get_mean_peers()) def _get_mean_peers(self): num_peers = [len(self.availability[blob]) for blob in self.availability] mean = float(sum(num_peers)) / float(max(1, len(num_peers))) - return mean - - def _get_price(self, blob): - mean_available = self._get_mean_peers() - blob_availability = len(self.availability.get(blob, [])) - price = calculate_price(mean_available, blob_availability) - return price \ No newline at end of file + self.last_mean_availability = mean \ No newline at end of file diff --git a/lbrynet/core/BlobHistory.py b/lbrynet/core/BlobHistory.py deleted file mode 100644 index eebafd8bc..000000000 --- a/lbrynet/core/BlobHistory.py +++ /dev/null @@ -1,52 +0,0 @@ -import os -from twisted.enterprise import adbapi -import time - - -class BlobHistoryManager(object): - """ - Class to archive historical blob upload and download information - - This class creates two tables in lbry data folder/blob_history.db, 'download' and 'upload' - The tables store information about what blob was uploaded or downloaded, to or from which peer, - at what price, and when. - """ - - def __init__(self, db_dir): - self.db = None - self.db_dir = db_dir - - def _open_db(self): - self.db = adbapi.ConnectionPool('sqlite3', os.path.join(self.db_dir, "blob_history.db"), - check_same_thread=False) - - def create_tables(transaction): - transaction.execute("create table if not exists download (" + - " id integer primary key autoincrement, " + - " blob text, " + - " host text, " + - " rate float, " + - " ts integer)") - - transaction.execute("create table if not exists upload (" + - " id integer primary key autoincrement, " + - " blob text, " + - " host text, " + - " rate float, " + - " ts integer)") - - return self.db.runInteraction(create_tables) - - def add_transaction(self, blob_hash, host, rate, upload=False): - ts = int(time.time()) - if upload: - d = self.db.runQuery("insert into upload values (null, ?, ?, ?, ?) ", (blob_hash, str(host), float(rate), ts)) - else: - d = self.db.runQuery("insert into download values (null, ?, ?, ?, ?) ", (blob_hash, str(host), float(rate), ts)) - return d - - def start(self): - d = self._open_db() - return d - - diff --git a/lbrynet/core/BlobManager.py b/lbrynet/core/BlobManager.py index 1e362b943..5ae593fcb 100644 --- a/lbrynet/core/BlobManager.py +++ b/lbrynet/core/BlobManager.py @@ -2,6 +2,7 @@ import logging import os import time import sqlite3 + from twisted.internet import threads, defer from twisted.python.failure import Failure from twisted.enterprise import adbapi @@ -11,8 +12,6 @@ from lbrynet.core.utils import is_valid_blobhash from lbrynet.core.cryptoutils import get_lbry_hash_obj from lbrynet.core.Error import NoSuchBlobError from lbrynet.core.sqlite_helpers import rerun_if_locked -from lbrynet.core.BlobHistory import BlobHistoryManager - log = logging.getLogger(__name__) @@ -71,6 +70,12 @@ class BlobManager(DHTHashSupplier): def get_all_verified_blobs(self): pass + def add_blob_to_download_history(self, blob_hash, host, rate): + pass + + def add_blob_to_upload_history(self, blob_hash, host, rate): + pass + class DiskBlobManager(BlobManager): """This class stores blobs on the hard disk""" @@ -84,13 +89,11 @@ class DiskBlobManager(BlobManager): self.blobs = {} self.blob_hashes_to_delete = {} # {blob_hash: being_deleted (True/False)} self._next_manage_call = None - self.blob_history_manager = BlobHistoryManager(db_dir) def setup(self): log.info("Setting up the DiskBlobManager. blob_dir: %s, db_file: %s", str(self.blob_dir), str(self.db_file)) d = self._open_db() - d.addCallback(lambda _: self.blob_history_manager.start()) d.addCallback(lambda _: self._manage()) return d @@ -188,6 +191,14 @@ class DiskBlobManager(BlobManager): d.addCallback(self.completed_blobs) return d + def add_blob_to_download_history(self, blob_hash, host, rate): + d = self._add_blob_to_download_history(blob_hash, host, rate) + return d + + def add_blob_to_upload_history(self, blob_hash, host, rate): + d = self._add_blob_to_upload_history(blob_hash, host, rate) + return d + def _manage(self): from twisted.internet import reactor @@ -246,12 +257,29 @@ class DiskBlobManager(BlobManager): # one that opened it. The individual connections in the pool are not used in multiple # threads. self.db_conn = adbapi.ConnectionPool('sqlite3', self.db_file, check_same_thread=False) - return self.db_conn.runQuery("create table if not exists blobs (" + - " blob_hash text primary key, " + - " blob_length integer, " + - " last_verified_time real, " + - " next_announce_time real" - ")") + + def create_tables(transaction): + transaction.execute("create table if not exists blobs (" + + " blob_hash text primary key, " + + " blob_length integer, " + + " last_verified_time real, " + + " next_announce_time real)") + + transaction.execute("create table if not exists download (" + + " id integer primary key autoincrement, " + + " blob text, " + + " host text, " + + " rate float, " + + " ts integer)") + + transaction.execute("create table if not exists upload (" + + " id integer primary key autoincrement, " + + " blob text, " + + " host text, " + + " rate float, " + + " ts integer)") + + return self.db_conn.runInteraction(create_tables) @rerun_if_locked def _add_completed_blob(self, blob_hash, length, timestamp, next_announce_time=None): @@ -429,6 +457,18 @@ class DiskBlobManager(BlobManager): d.addCallback(lambda blobs: threads.deferToThread(get_verified_blobs, blobs)) return d + @rerun_if_locked + def _add_blob_to_download_history(self, blob_hash, host, rate): + ts = int(time.time()) + d = self.db_conn.runQuery("insert into download values (null, ?, ?, ?, ?) ", (blob_hash, str(host), float(rate), ts)) + return d + + @rerun_if_locked + def _add_blob_to_upload_history(self, blob_hash, host, rate): + ts = int(time.time()) + d = self.db_conn.runQuery("insert into upload values (null, ?, ?, ?, ?) ", (blob_hash, str(host), float(rate), ts)) + return d + class TempBlobManager(BlobManager): """This class stores blobs in memory""" @@ -529,7 +569,6 @@ class TempBlobManager(BlobManager): d.addCallback(lambda _: set_next_manage_call()) def _delete_blobs_marked_for_deletion(self): - def remove_from_list(b_h): del self.blob_hashes_to_delete[b_h] log.info("Deleted blob %s", blob_hash) @@ -558,4 +597,4 @@ class TempBlobManager(BlobManager): d = defer.fail(Failure(NoSuchBlobError(blob_hash))) log.warning("Blob %s cannot be deleted because it is unknown") ds.append(d) - return defer.DeferredList(ds) \ No newline at end of file + return defer.DeferredList(ds) diff --git a/lbrynet/core/Error.py b/lbrynet/core/Error.py index 8146dc169..dfc9bfe98 100644 --- a/lbrynet/core/Error.py +++ b/lbrynet/core/Error.py @@ -87,4 +87,8 @@ class NoSuchStreamHashError(Exception): class InvalidBlobHashError(Exception): + pass + + +class NegotiationError(Exception): pass \ No newline at end of file diff --git a/lbrynet/core/Offer.py b/lbrynet/core/Offer.py new file mode 100644 index 000000000..ea8884f88 --- /dev/null +++ b/lbrynet/core/Offer.py @@ -0,0 +1,70 @@ +from lbrynet.core.Error import NegotiationError + + +class Offer(object): + """ + A rate offer to download blobs from a host + """ + + def __init__(self, offer): + self._state = None + self.rate = None + if isinstance(offer, float): + self.rate = round(offer, 5) + elif offer == Negotiate.RATE_ACCEPTED: + self.accept() + elif offer == Negotiate.RATE_TOO_LOW: + self.reject() + + @property + def accepted(self): + return self._state is Negotiate.RATE_ACCEPTED + + @property + def too_low(self): + return self._state is Negotiate.RATE_TOO_LOW + + @property + def message(self): + if self.accepted: + return Negotiate.RATE_ACCEPTED + elif self.too_low: + return Negotiate.RATE_TOO_LOW + elif self.rate is None: + return Negotiate.RATE_UNSET + + def accept(self): + if self._state is None: + self._state = Negotiate.RATE_ACCEPTED + + def reject(self): + if self._state is None: + self._state = Negotiate.RATE_TOO_LOW + + +class Negotiate(object): + """ + Helper class for converting to and from Offers + """ + + RATE_ACCEPTED = "RATE_ACCEPTED" + RATE_TOO_LOW = "RATE_TOO_LOW" + RATE_UNSET = "RATE_UNSET" + + PAYMENT_RATE = "blob_data_payment_rate" + ERROR = "error" + + @staticmethod + def get_offer_from_request(request_dict): + error = request_dict.get(Negotiate.ERROR, False) + if error: + raise NegotiationError() + return Offer(request_dict.get(Negotiate.PAYMENT_RATE)) + + @staticmethod + def make_dict_from_offer(offer): + if offer.message: + request_dict = {Negotiate.PAYMENT_RATE: offer.message} + else: + request_dict = {Negotiate.PAYMENT_RATE: offer.rate} + return request_dict diff --git a/lbrynet/core/PaymentRateManager.py b/lbrynet/core/PaymentRateManager.py index a18882ac0..319f1e0d0 100644 --- a/lbrynet/core/PaymentRateManager.py +++ b/lbrynet/core/PaymentRateManager.py @@ -1,3 +1,5 @@ +from lbrynet.core.Strategy import get_default_strategy + class BasePaymentRateManager(object): def __init__(self, rate): self.min_blob_data_payment_rate = rate @@ -26,4 +28,30 @@ class PaymentRateManager(object): return self.min_blob_data_payment_rate def record_points_paid(self, amount): - self.points_paid += amount \ No newline at end of file + self.points_paid += amount + + +class NegotiatedPaymentRateManager(object): + def __init__(self, base, availability_tracker): + """ + @param base: a BasePaymentRateManager + @param availability_tracker: a BlobAvailabilityTracker + @param rate: the min blob data payment rate + """ + + self.base = base + self.min_blob_data_payment_rate = self.base.min_blob_data_payment_rate + self.points_paid = 0.0 + self.blob_tracker = availability_tracker + self.strategy = get_default_strategy(self.blob_tracker) + + def get_rate_blob_data(self, peer, blobs): + response = self.strategy.make_offer(peer, blobs) + return response.rate + + def accept_rate_blob_data(self, peer, blobs, offer): + response = self.strategy.respond_to_offer(offer, peer, blobs) + return response.accepted + + def record_points_paid(self, amount): + self.points_paid += amount diff --git a/lbrynet/core/Peer.py b/lbrynet/core/Peer.py index c3b4a76ec..06138abd8 100644 --- a/lbrynet/core/Peer.py +++ b/lbrynet/core/Peer.py @@ -12,8 +12,7 @@ class Peer(object): self.stats = defaultdict(float) # {string stat_type, float count} def is_available(self): - if (self.attempt_connection_at is None or - datetime.datetime.today() > self.attempt_connection_at): + if self.attempt_connection_at is None or datetime.datetime.today() > self.attempt_connection_at: return True return False @@ -33,4 +32,4 @@ class Peer(object): self.stats[stat_type] += count def __str__(self): - return self.host + ":" + str(self.port) \ No newline at end of file + return self.host + ":" + str(self.port) diff --git a/lbrynet/core/PriceModel.py b/lbrynet/core/PriceModel.py new file mode 100644 index 000000000..c0bbaf09f --- /dev/null +++ b/lbrynet/core/PriceModel.py @@ -0,0 +1,37 @@ +from lbrynet.conf import MIN_BLOB_DATA_PAYMENT_RATE + + +class MeanAvailabilityWeightedPrice(object): + """ + Calculate mean-blob-availability and stream-position weighted price for a blob + + Attributes: + min_price (float): minimum accepted price + base_price (float): base price to shift from + alpha (float): constant used to more highly value blobs at the beginning of a stream + alpha defaults to 1.0, which has a null effect + blob_tracker (BlobAvailabilityTracker): blob availability tracker + """ + + def __init__(self, tracker, min_price=MIN_BLOB_DATA_PAYMENT_RATE, base_price=None, alpha=1.0): + self.blob_tracker = tracker + self.min_price = min_price + self.base_price = base_price if base_price is not None else min_price * 10 + self.alpha = alpha + + def calculate_price(self, blob): + mean_availability = self.blob_tracker.last_mean_availability + availability = self.blob_tracker.availability.get(blob, []) + index = 0 # blob.index + price = self.base_price * (mean_availability / max(1, len(availability))) * self._frontload(index) + return round(max(self.min_price, price), 5) + + def _frontload(self, index): + """ + Get frontload multipler + + @param index: blob position in stream + @return: frontload multipler + """ + + return 2.0 - (self.alpha ** index) \ No newline at end of file diff --git a/lbrynet/core/Session.py b/lbrynet/core/Session.py index 82ca8028a..30f68d400 100644 --- a/lbrynet/core/Session.py +++ b/lbrynet/core/Session.py @@ -9,8 +9,8 @@ from lbrynet.core.client.DHTPeerFinder import DHTPeerFinder from lbrynet.core.HashAnnouncer import DummyHashAnnouncer from lbrynet.core.server.DHTHashAnnouncer import DHTHashAnnouncer from lbrynet.core.utils import generate_id -from lbrynet.core.PaymentRateManager import BasePaymentRateManager -from lbrynet.core.BlobPrice import BlobPriceAndAvailabilityTracker +from lbrynet.core.PaymentRateManager import BasePaymentRateManager, NegotiatedPaymentRateManager +from lbrynet.core.BlobAvailability import BlobAvailabilityTracker from twisted.internet import threads, defer @@ -29,7 +29,7 @@ class LBRYSession(object): def __init__(self, blob_data_payment_rate, db_dir=None, lbryid=None, peer_manager=None, dht_node_port=None, known_dht_nodes=None, peer_finder=None, hash_announcer=None, blob_dir=None, blob_manager=None, peer_port=None, use_upnp=True, - rate_limiter=None, wallet=None, dht_node_class=node.Node): + rate_limiter=None, wallet=None, dht_node_class=node.Node, blob_tracker=None): """ @param blob_data_payment_rate: The default payment rate for blob data @@ -88,6 +88,7 @@ class LBRYSession(object): self.blob_dir = blob_dir self.blob_manager = blob_manager + self.blob_tracker = blob_tracker self.peer_port = peer_port @@ -104,7 +105,7 @@ class LBRYSession(object): self.dht_node = None self.base_payment_rate_manager = BasePaymentRateManager(blob_data_payment_rate) - self.blob_tracker = None + self.payment_rate_manager = None def setup(self): """Create the blob directory and database if necessary, start all desired services""" @@ -260,7 +261,9 @@ class LBRYSession(object): self.blob_manager = DiskBlobManager(self.hash_announcer, self.blob_dir, self.db_dir) if self.blob_tracker is None: - self.blob_tracker = BlobPriceAndAvailabilityTracker(self.blob_manager, self.peer_finder, self.dht_node) + self.blob_tracker = BlobAvailabilityTracker(self.blob_manager, self.peer_finder, self.dht_node) + if self.payment_rate_manager is None: + self.payment_rate_manager = NegotiatedPaymentRateManager(self.base_payment_rate_manager, self.blob_tracker) self.rate_limiter.start() d1 = self.blob_manager.setup() diff --git a/lbrynet/core/Strategy.py b/lbrynet/core/Strategy.py new file mode 100644 index 000000000..b4d52fc20 --- /dev/null +++ b/lbrynet/core/Strategy.py @@ -0,0 +1,78 @@ +import logging + +from lbrynet.core.Offer import Offer +from lbrynet.core.PriceModel import MeanAvailabilityWeightedPrice + +log = logging.getLogger(__name__) + + +def get_default_strategy(blob_tracker, **kwargs): + return BasicAvailabilityWeightedStrategy(blob_tracker, **kwargs) + + +class BasicAvailabilityWeightedStrategy(object): + """ + Basic strategy to target blob prices based on supply relative to mean supply + + Discount price target with each incoming request, and raise it with each outgoing from the modeled price + until the rate is accepted or a threshold is reached + """ + + def __init__(self, blob_tracker, acceleration=1.25, deceleration=0.9, max_rate=0.005): + self._acceleration = acceleration # rate of how quickly to ramp offer + self._deceleration = deceleration + self._max_rate = max_rate + self._count_up = {} + self._count_down = {} + self._requested = {} + self._offers_to_peers = {} + self.model = MeanAvailabilityWeightedPrice(blob_tracker) + + def respond_to_offer(self, offer, peer, blobs): + request_count = self._count_up.get(peer, 0) + rates = [self._calculate_price(blob) for blob in blobs] + rate = self._discount(sum(rates) / max(len(blobs), 1), request_count) + log.info("Target rate: %s", rate) + + self._inc_up_count(peer) + if offer.accepted: + return offer + elif offer.rate >= rate: + log.info("Accept: %f", offer.rate) + offer.accept() + return offer + else: + log.info("Reject: %f", offer.rate) + offer.reject() + return offer + + def make_offer(self, peer, blobs): + # use mean turn-discounted price for all the blobs requested + request_count = self._count_down.get(peer, 0) + self._inc_down_count(peer) + if request_count == 0: + # Try asking for it for free + offer = Offer(0.0) + else: + rates = [self._calculate_price(blob) for blob in blobs] + mean_rate = sum(rates) / max(len(blobs), 1) + with_premium = self._premium(mean_rate, request_count) + offer = Offer(with_premium) + return offer + + def _inc_up_count(self, peer): + turn = self._count_up.get(peer, 0) + 1 + self._count_up.update({peer: turn}) + + def _inc_down_count(self, peer): + turn = self._count_down.get(peer, 0) + 1 + self._count_down.update({peer: turn}) + + def _calculate_price(self, blob): + return self.model.calculate_price(blob) + + def _premium(self, rate, turn): + return min(rate * (self._acceleration ** turn), self._max_rate) + + def _discount(self, rate, turn): + return min(rate * (self._deceleration ** turn), self._max_rate) \ No newline at end of file diff --git a/lbrynet/core/client/BlobRequester.py b/lbrynet/core/client/BlobRequester.py index e257cbde1..7e956b452 100644 --- a/lbrynet/core/client/BlobRequester.py +++ b/lbrynet/core/client/BlobRequester.py @@ -1,14 +1,16 @@ -from collections import defaultdict import logging +from collections import defaultdict + from twisted.internet import defer from twisted.python.failure import Failure from zope.interface import implements -from lbrynet.core.Error import PriceDisagreementError, DownloadCanceledError, InsufficientFundsError -from lbrynet.core.Error import InvalidResponseError, RequestCanceledError, NoResponseError + from lbrynet.core.Error import ConnectionClosedBeforeResponseError +from lbrynet.core.Error import InvalidResponseError, RequestCanceledError, NoResponseError +from lbrynet.core.Error import PriceDisagreementError, DownloadCanceledError, InsufficientFundsError from lbrynet.core.client.ClientRequest import ClientRequest, ClientBlobRequest from lbrynet.interfaces import IRequestCreator - +from lbrynet.core.Offer import Negotiate, Offer log = logging.getLogger(__name__) @@ -27,6 +29,7 @@ class BlobRequester(object): self._unavailable_blobs = defaultdict(list) # {Peer: [blob_hash]}} self._protocol_prices = {} # {ClientProtocol: price} self._price_disagreements = [] # [Peer] + self._protocol_tries = {} self._incompatible_peers = [] ######## IRequestCreator ######### @@ -46,7 +49,8 @@ class BlobRequester(object): d1.addCallback(self._handle_availability, peer, a_r) d1.addErrback(self._request_failed, "availability request", peer) sent_request = True - if d_r is not None: + + if d_r is not None and protocol in self._protocol_prices: reserved_points = self._reserve_points(peer, protocol, d_r.max_pay_units) if reserved_points is not None: # Note: The following three callbacks will be called when the blob has been @@ -68,17 +72,18 @@ class BlobRequester(object): # downloaded. d2.addCallback(self._handle_incoming_blob, peer, d_r) d2.addErrback(self._request_failed, "download request", peer) - sent_request = True else: d_r.cancel(InsufficientFundsError()) d_r.finished_deferred.addErrback(lambda _: True) return defer.fail(InsufficientFundsError()) + if sent_request is True: if p_r is not None: d3 = protocol.add_request(p_r) d3.addCallback(self._handle_price_response, peer, p_r, protocol) d3.addErrback(self._request_failed, "price request", peer) + return defer.succeed(sent_request) def get_new_peers(self): @@ -88,6 +93,56 @@ class BlobRequester(object): ######### internal calls ######### + def _blobs_to_download(self): + needed_blobs = self.download_manager.needed_blobs() + return sorted(needed_blobs, key=lambda b: b.is_downloading()) + + def _get_blobs_to_request_from_peer(self, peer): + all_needed = [b.blob_hash for b in self._blobs_to_download() if not b.blob_hash in self._available_blobs[peer]] + # sort them so that the peer will be asked first for blobs it hasn't said it doesn't have + to_request = sorted(all_needed, key=lambda b: b in self._unavailable_blobs[peer])[:20] + return to_request + + def _price_settled(self, protocol): + if protocol in self._protocol_prices: + return True + return False + + def _get_price_request(self, peer, protocol): + request = None + response_identifier = Negotiate.PAYMENT_RATE + if protocol not in self._protocol_prices: + blobs_to_request = self._available_blobs[peer] + if blobs_to_request: + rate = self.payment_rate_manager.get_rate_blob_data(peer, blobs_to_request) + self._protocol_prices[protocol] = rate + offer = Offer(rate) + request = ClientRequest(Negotiate.make_dict_from_offer(offer), response_identifier) + log.debug("Offer rate %s to %s for %i blobs", str(rate), str(peer), len(blobs_to_request)) + else: + log.debug("No blobs to request from %s", str(peer)) + return request + + def _handle_price_response(self, response_dict, peer, request, protocol): + if not request.response_identifier in response_dict: + return InvalidResponseError("response identifier not in response") + assert protocol in self._protocol_prices + offer = Negotiate.get_offer_from_request(response_dict) + rate = self._protocol_prices[protocol] + if offer.accepted: + log.info("Offered rate %f/mb accepted by %s", rate, str(peer.host)) + return True + elif offer.too_low: + log.info("Offered rate %f/mb rejected by %s", rate, str(peer.host)) + del self._protocol_prices[protocol] + return True + else: + log.warning("Price disagreement") + log.warning(offer.rate) + del self._protocol_prices[protocol] + self._price_disagreements.append(peer) + return False + def _download_succeeded(self, arg, peer, blob): log.info("Blob %s has been successfully downloaded from %s", str(blob), str(peer)) self._update_local_score(peer, 5.0) @@ -101,13 +156,13 @@ class BlobRequester(object): self._update_local_score(peer, -10.0) return reason - def _record_blob_aquired(self, blob, host, rate): - self.blob_manager.blob_history_manager.add_transaction(blob, host, rate, upload=False) + def _record_blob_acquired(self, blob, host, rate): + d = self.blob_manager.add_blob_to_download_history(blob, host, rate) def _pay_or_cancel_payment(self, arg, protocol, reserved_points, blob): if blob.length != 0 and (not isinstance(arg, Failure) or arg.check(DownloadCanceledError)): self._pay_peer(protocol, blob.length, reserved_points) - self._record_blob_aquired(str(blob), protocol.peer.host, reserved_points.amount) + self._record_blob_acquired(str(blob), protocol.peer.host, reserved_points.amount) else: self._cancel_points(reserved_points) @@ -178,17 +233,11 @@ class BlobRequester(object): return True return False - def _blobs_to_download(self): - needed_blobs = self.download_manager.needed_blobs() - return sorted(needed_blobs, key=lambda b: b.is_downloading()) - def _blobs_without_sources(self): return [b for b in self.download_manager.needed_blobs() if not self._hash_available(b.blob_hash)] def _get_availability_request(self, peer): - all_needed = [b.blob_hash for b in self._blobs_to_download() if not b.blob_hash in self._available_blobs[peer]] - # sort them so that the peer will be asked first for blobs it hasn't said it doesn't have - to_request = sorted(all_needed, key=lambda b: b in self._unavailable_blobs[peer])[:20] + to_request = self._get_blobs_to_request_from_peer(peer) if to_request: r_dict = {'requested_blobs': to_request} response_identifier = 'available_blobs' @@ -217,36 +266,24 @@ class BlobRequester(object): request = ClientBlobRequest(request_dict, response_identifier, counting_write_func, d, cancel_func, blob_to_download) - log.info("Requesting blob %s from %s", str(blob_to_download), str(peer)) - return request - - def _price_settled(self, protocol): - if protocol in self._protocol_prices: - return True - return False - - def _get_price_request(self, peer, protocol): - request = None - if not protocol in self._protocol_prices: - self._protocol_prices[protocol] = self.payment_rate_manager.get_rate_blob_data(peer) - request_dict = {'blob_data_payment_rate': self._protocol_prices[protocol]} - request = ClientRequest(request_dict, 'blob_data_payment_rate') + # log.info("Requesting blob %s from %s", str(blob_to_download), str(peer)) return request def _update_local_score(self, peer, amount): self._peers[peer] += amount def _reserve_points(self, peer, protocol, max_bytes): - assert protocol in self._protocol_prices - points_to_reserve = 1.0 * max_bytes * self._protocol_prices[protocol] / 2**20 - return self.wallet.reserve_points(peer, points_to_reserve) + if protocol in self._protocol_prices: + points_to_reserve = 1.0 * max_bytes * self._protocol_prices[protocol] / 2 ** 20 + return self.wallet.reserve_points(peer, points_to_reserve) + return None def _pay_peer(self, protocol, num_bytes, reserved_points): - assert num_bytes != 0 - assert protocol in self._protocol_prices - point_amount = 1.0 * num_bytes * self._protocol_prices[protocol] / 2**20 - self.wallet.send_points(reserved_points, point_amount) - self.payment_rate_manager.record_points_paid(point_amount) + if num_bytes != 0 and protocol in self._protocol_prices: + point_amount = 1.0 * num_bytes * self._protocol_prices[protocol] / 2**20 + self.wallet.send_points(reserved_points, point_amount) + self.payment_rate_manager.record_points_paid(point_amount) + log.debug("Pay peer %s", str(point_amount)) def _cancel_points(self, reserved_points): self.wallet.cancel_point_reservation(reserved_points) @@ -268,6 +305,7 @@ class BlobRequester(object): return True def _handle_incoming_blob(self, response_dict, peer, request): + log.debug("Handling incoming blob: %s", str(response_dict)) if not request.response_identifier in response_dict: return InvalidResponseError("response identifier not in response") if not type(response_dict[request.response_identifier]) == dict: @@ -295,18 +333,6 @@ class BlobRequester(object): return InvalidResponseError("Could not set the length of the blob") return True - def _handle_price_response(self, response_dict, peer, request, protocol): - if not request.response_identifier in response_dict: - return InvalidResponseError("response identifier not in response") - assert protocol in self._protocol_prices - response = response_dict[request.response_identifier] - if response == "RATE_ACCEPTED": - return True - else: - del self._protocol_prices[protocol] - self._price_disagreements.append(peer) - return True - def _request_failed(self, reason, request_type, peer): if reason.check(RequestCanceledError): return diff --git a/lbrynet/core/client/ClientProtocol.py b/lbrynet/core/client/ClientProtocol.py index aad0dcdf7..121f86099 100644 --- a/lbrynet/core/client/ClientProtocol.py +++ b/lbrynet/core/client/ClientProtocol.py @@ -191,7 +191,9 @@ class ClientProtocol(Protocol): for success, result in results: if success is False: failed = True - log.info("The connection is closing due to an error: %s", str(result.getTraceback())) + if not isinstance(result.value, DownloadCanceledError): + log.info(result.value) + log.info("The connection is closing due to an error: %s", str(result.getTraceback())) if failed is False: log.debug("Asking for another request.") from twisted.internet import reactor @@ -215,7 +217,7 @@ class ClientProtocol(Protocol): # TODO: always be this way. it's done this way now because the client has no other way # TODO: of telling the server it wants the download to stop. It would be great if the # TODO: protocol had such a mechanism. - log.info("Closing the connection to %s because the download of blob %s was canceled", + log.debug("Closing the connection to %s because the download of blob %s was canceled", str(self.peer), str(self._blob_download_request.blob)) #self.transport.loseConnection() #return True diff --git a/lbrynet/core/log_support.py b/lbrynet/core/log_support.py index 517a1406f..3b7fe23c4 100644 --- a/lbrynet/core/log_support.py +++ b/lbrynet/core/log_support.py @@ -78,8 +78,7 @@ def disable_third_party_loggers(): def disable_noisy_loggers(): logging.getLogger('BitcoinRPC').setLevel(logging.INFO) logging.getLogger('lbrynet.analytics.api').setLevel(logging.INFO) - logging.getLogger('lbrynet.core.client').setLevel(logging.INFO) - logging.getLogger('lbrynet.core.server').setLevel(logging.INFO) + logging.getLogger('lbrynet.core').setLevel(logging.INFO) logging.getLogger('lbrynet.dht').setLevel(logging.INFO) logging.getLogger('lbrynet.lbrynet_daemon').setLevel(logging.INFO) logging.getLogger('lbrynet.core.LBRYWallet').setLevel(logging.INFO) diff --git a/lbrynet/core/server/BlobAvailabilityHandler.py b/lbrynet/core/server/BlobAvailabilityHandler.py index a5d550bdf..dbd373a36 100644 --- a/lbrynet/core/server/BlobAvailabilityHandler.py +++ b/lbrynet/core/server/BlobAvailabilityHandler.py @@ -40,7 +40,7 @@ class BlobAvailabilityHandler(object): def handle_queries(self, queries): if self.query_identifiers[0] in queries: - log.debug("Received the client's list of requested blobs") + log.info("Received the client's list of requested blobs") d = self._get_available_blobs(queries[self.query_identifiers[0]]) def set_field(available_blobs): diff --git a/lbrynet/core/server/BlobRequestHandler.py b/lbrynet/core/server/BlobRequestHandler.py index 69686c5aa..a594b9c42 100644 --- a/lbrynet/core/server/BlobRequestHandler.py +++ b/lbrynet/core/server/BlobRequestHandler.py @@ -1,8 +1,12 @@ import logging + from twisted.internet import defer from twisted.protocols.basic import FileSender from twisted.python.failure import Failure from zope.interface import implements + +from lbrynet.core.Offer import Offer, Negotiate +from lbrynet.core.Strategy import get_default_strategy from lbrynet.interfaces import IQueryHandlerFactory, IQueryHandler, IBlobSender @@ -12,7 +16,8 @@ log = logging.getLogger(__name__) class BlobRequestHandlerFactory(object): implements(IQueryHandlerFactory) - def __init__(self, blob_manager, wallet, payment_rate_manager): + def __init__(self, blob_manager, blob_tracker, wallet, payment_rate_manager): + self.blob_tracker = blob_tracker self.blob_manager = blob_manager self.wallet = wallet self.payment_rate_manager = payment_rate_manager @@ -20,7 +25,7 @@ class BlobRequestHandlerFactory(object): ######### IQueryHandlerFactory ######### def build_query_handler(self): - q_h = BlobRequestHandler(self.blob_manager, self.wallet, self.payment_rate_manager) + q_h = BlobRequestHandler(self.blob_manager, self.blob_tracker, self.wallet, self.payment_rate_manager) return q_h def get_primary_query_identifier(self): @@ -33,17 +38,20 @@ class BlobRequestHandlerFactory(object): class BlobRequestHandler(object): implements(IQueryHandler, IBlobSender) - def __init__(self, blob_manager, wallet, payment_rate_manager): + def __init__(self, blob_manager, blob_tracker, wallet, payment_rate_manager): self.blob_manager = blob_manager + self.blob_tracker = blob_tracker self.payment_rate_manager = payment_rate_manager self.wallet = wallet - self.query_identifiers = ['blob_data_payment_rate', 'requested_blob'] + self.query_identifiers = ['blob_data_payment_rate', 'requested_blob', 'requested_blobs'] self.peer = None self.blob_data_payment_rate = None self.read_handle = None self.currently_uploading = None self.file_sender = None self.blob_bytes_uploaded = 0 + self.strategy = get_default_strategy(self.blob_tracker) + self._blobs_requested = [] ######### IQueryHandler ######### @@ -53,52 +61,22 @@ class BlobRequestHandler(object): request_handler.register_blob_sender(self) def handle_queries(self, queries): - response = {} + response = defer.succeed({}) + + if self.query_identifiers[2] in queries: + self._blobs_requested = queries[self.query_identifiers[2]] + response.addCallback(lambda r: self._reply_to_availability(r, self._blobs_requested)) + if self.query_identifiers[0] in queries: - requested_rate = queries[self.query_identifiers[0]] - if not self.handle_blob_data_payment_rate(requested_rate): - response['blob_data_payment_rate'] = "RATE_TOO_LOW" - else: - response['blob_data_payment_rate'] = 'RATE_ACCEPTED' - log.debug(response['blob_data_payment_rate']) + offer = Offer(queries[self.query_identifiers[0]]) + response.addCallback(lambda r: self.reply_to_offer(offer, r)) if self.query_identifiers[1] in queries: - log.debug("Received the client's request to send a blob") - response_fields = {} - response['incoming_blob'] = response_fields + incoming = queries[self.query_identifiers[1]] + log.info("Request download: %s", str(incoming)) + response.addCallback(lambda r: self._reply_to_send_request({}, incoming)) - if self.blob_data_payment_rate is None: - response_fields['error'] = "RATE_UNSET" - return defer.succeed(response) - else: - - d = self.blob_manager.get_blob(queries[self.query_identifiers[1]], True) - - def open_blob_for_reading(blob): - if blob.is_validated(): - read_handle = blob.open_for_reading() - if read_handle is not None: - self.currently_uploading = blob - self.read_handle = read_handle - log.info("Sending %s to client", str(blob)) - response_fields['blob_hash'] = blob.blob_hash - response_fields['length'] = blob.length - return response - log.warning("We can not send %s", str(blob)) - response_fields['error'] = "BLOB_UNAVAILABLE" - return response, blob - - def record_transaction(response, blob, rate): - d = self.blob_manager.blob_history_manager.add_transaction(str(blob), self.peer.host, rate, upload=True) - d.addCallback(lambda _: response) - return d - - d.addCallback(open_blob_for_reading) - d.addCallback(lambda (response, blob): record_transaction(response, blob, queries[self.query_identifiers[0]])) - - return d - else: - return defer.succeed(response) + return response ######### IBlobSender ######### @@ -116,12 +94,72 @@ class BlobRequestHandler(object): ######### internal ######### - def handle_blob_data_payment_rate(self, requested_payment_rate): - if not self.payment_rate_manager.accept_rate_blob_data(self.peer, requested_payment_rate): - return False + def _add_to_response(self, response, to_add): + + return response + + def _reply_to_availability(self, request, blobs): + d = self._get_available_blobs(blobs) + + def set_available(available_blobs): + log.debug("available blobs: %s", str(available_blobs)) + request.update({'available_blobs': available_blobs}) + return request + + d.addCallback(set_available) + return d + + def open_blob_for_reading(self, blob, response): + response_fields = {} + if blob.is_validated(): + read_handle = blob.open_for_reading() + if read_handle is not None: + self.currently_uploading = blob + self.read_handle = read_handle + log.info("Sending %s to client", str(blob)) + response_fields['blob_hash'] = blob.blob_hash + response_fields['length'] = blob.length + response['incoming_blob'] = response_fields + log.info(response) + return response, blob + log.warning("We can not send %s", str(blob)) + response['error'] = "BLOB_UNAVAILABLE" + return response, blob + + def record_transaction(self, response, blob, rate): + d = self.blob_manager.add_blob_to_upload_history(str(blob), self.peer.host, rate) + d.addCallback(lambda _: response) + log.info(response) + return d + + def _reply_to_send_request(self, response, incoming): + response_fields = {} + response['incoming_blob'] = response_fields + rate = self.blob_data_payment_rate + + if self.blob_data_payment_rate is None: + log.warning("Rate not set yet") + response['error'] = "RATE_UNSET" + return defer.succeed(response) else: - self.blob_data_payment_rate = requested_payment_rate - return True + d = self.blob_manager.get_blob(incoming, True) + d.addCallback(lambda blob: self.open_blob_for_reading(blob, response)) + d.addCallback(lambda (r, blob): self.record_transaction(r, blob, rate)) + return d + + def reply_to_offer(self, offer, request): + blobs = request.get("available_blobs", []) + log.info("Offered rate %f/mb for %i blobs", offer.rate, len(blobs)) + reply = self.strategy.respond_to_offer(offer, self.peer, blobs) + if reply.accepted: + self.blob_data_payment_rate = reply.rate + r = Negotiate.make_dict_from_offer(reply) + request.update(r) + return request + + def _get_available_blobs(self, requested_blobs): + d = self.blob_manager.completed_blobs(requested_blobs) + return d def send_file(self, consumer): @@ -140,7 +178,7 @@ class BlobRequestHandler(object): def start_transfer(): self.file_sender = FileSender() - log.debug("Starting the file upload") + log.info("Starting the file upload") assert self.read_handle is not None, "self.read_handle was None when trying to start the transfer" d = self.file_sender.beginFileTransfer(self.read_handle, consumer, count_bytes) return d diff --git a/lbrynet/cryptstream/client/CryptStreamDownloader.py b/lbrynet/cryptstream/client/CryptStreamDownloader.py index e0091598f..031a3f8aa 100644 --- a/lbrynet/cryptstream/client/CryptStreamDownloader.py +++ b/lbrynet/cryptstream/client/CryptStreamDownloader.py @@ -48,7 +48,7 @@ class CryptStreamDownloader(object): @param blob_manager: A BlobManager object - @param payment_rate_manager: A PaymentRateManager object + @param payment_rate_manager: A NegotiatedPaymentRateManager object @param wallet: An object which implements the ILBRYWallet interface diff --git a/lbrynet/lbryfilemanager/LBRYFileManager.py b/lbrynet/lbryfilemanager/LBRYFileManager.py index afde816ea..338e5cefe 100644 --- a/lbrynet/lbryfilemanager/LBRYFileManager.py +++ b/lbrynet/lbryfilemanager/LBRYFileManager.py @@ -12,7 +12,7 @@ from twisted.python.failure import Failure from lbrynet.lbryfilemanager.LBRYFileDownloader import ManagedLBRYFileDownloader from lbrynet.lbryfilemanager.LBRYFileDownloader import ManagedLBRYFileDownloaderFactory from lbrynet.lbryfile.StreamDescriptor import LBRYFileStreamType -from lbrynet.core.PaymentRateManager import PaymentRateManager +from lbrynet.core.PaymentRateManager import NegotiatedPaymentRateManager from lbrynet.cryptstream.client.CryptStreamDownloader import AlreadyStoppedError, CurrentlyStoppingError from lbrynet.core.sqlite_helpers import rerun_if_locked @@ -74,7 +74,8 @@ class LBRYFileManager(object): def _start_lbry_files(self): def set_options_and_restore(rowid, stream_hash, options): - payment_rate_manager = PaymentRateManager(self.session.base_payment_rate_manager) + payment_rate_manager = NegotiatedPaymentRateManager(self.session.base_payment_rate_manager, + self.session.blob_tracker) d = self.start_lbry_file(rowid, stream_hash, payment_rate_manager, blob_data_rate=options) d.addCallback(lambda downloader: downloader.restore()) diff --git a/lbrynet/lbrynet_console/ControlHandlers.py b/lbrynet/lbrynet_console/ControlHandlers.py index 42e9e2476..309d654db 100644 --- a/lbrynet/lbrynet_console/ControlHandlers.py +++ b/lbrynet/lbrynet_console/ControlHandlers.py @@ -644,7 +644,7 @@ class AddStream(CommandHandler): for option, option_value in zip(self.download_options, self.options_chosen): if option.short_description == "data payment rate": if option_value == None: - rate = self.payment_rate_manager.get_effective_min_blob_data_payment_rate() + rate = 0.0 else: rate = option_value stream_size = None diff --git a/lbrynet/lbrynet_daemon/LBRYDaemon.py b/lbrynet/lbrynet_daemon/LBRYDaemon.py index b258d914d..e0d64b783 100644 --- a/lbrynet/lbrynet_daemon/LBRYDaemon.py +++ b/lbrynet/lbrynet_daemon/LBRYDaemon.py @@ -27,8 +27,7 @@ from txjsonrpc.web.jsonrpc import Handler from lbrynet import __version__ as lbrynet_version from lbryum.version import LBRYUM_VERSION as lbryum_version from lbrynet import analytics -from lbrynet.core.PaymentRateManager import PaymentRateManager -from lbrynet.core.server.BlobAvailabilityHandler import BlobAvailabilityHandlerFactory +from lbrynet.core.PaymentRateManager import NegotiatedPaymentRateManager from lbrynet.core.server.BlobRequestHandler import BlobRequestHandlerFactory from lbrynet.core.server.ServerProtocol import ServerProtocolFactory from lbrynet.core.Error import UnknownNameError, InsufficientFundsError, InvalidNameError @@ -545,7 +544,6 @@ class LBRYDaemon(jsonrpc.JSONRPC): d.addCallback(lambda _: add_lbry_file_to_sd_identifier(self.sd_identifier)) d.addCallback(lambda _: self._setup_stream_identifier()) d.addCallback(lambda _: self._setup_lbry_file_manager()) - d.addCallback(lambda _: self._setup_lbry_file_opener()) d.addCallback(lambda _: self._setup_query_handlers()) d.addCallback(lambda _: self._setup_server()) d.addCallback(lambda _: _log_starting_vals()) @@ -776,17 +774,15 @@ class LBRYDaemon(jsonrpc.JSONRPC): handlers = [ # CryptBlobInfoQueryHandlerFactory(self.lbry_file_metadata_manager, self.session.wallet, # self._server_payment_rate_manager), - BlobAvailabilityHandlerFactory(self.session.blob_manager), - # BlobRequestHandlerFactory(self.session.blob_manager, self.session.wallet, - # self._server_payment_rate_manager), + # BlobAvailabilityHandlerFactory(self.session.blob_manager), + BlobRequestHandlerFactory(self.session.blob_manager, self.session.blob_tracker, self.session.wallet, + self.session.payment_rate_manager), self.session.wallet.get_wallet_info_query_handler_factory(), ] def get_blob_request_handler_factory(rate): - self.blob_request_payment_rate_manager = PaymentRateManager( - self.session.base_payment_rate_manager, rate - ) - handlers.append(BlobRequestHandlerFactory(self.session.blob_manager, self.session.wallet, + self.blob_request_payment_rate_manager = self.session.payment_rate_manager + handlers.append(BlobRequestHandlerFactory(self.session.blob_manager, self.session.blob_tracker, self.session.wallet, self.blob_request_payment_rate_manager)) d1 = self.settings.get_server_data_payment_rate() @@ -1097,14 +1093,6 @@ class LBRYDaemon(jsonrpc.JSONRPC): self.sd_identifier.add_stream_downloader_factory(LBRYFileStreamType, file_opener_factory) return defer.succeed(None) - def _setup_lbry_file_opener(self): - - downloader_factory = LBRYFileOpenerFactory(self.session.peer_finder, self.session.rate_limiter, - self.session.blob_manager, self.stream_info_manager, - self.session.wallet) - self.sd_identifier.add_stream_downloader_factory(LBRYFileStreamType, downloader_factory) - return defer.succeed(True) - def _download_sd_blob(self, sd_hash, timeout=DEFAULT_SD_DOWNLOAD_TIMEOUT): def cb(result): if not r.called: @@ -1116,7 +1104,7 @@ class LBRYDaemon(jsonrpc.JSONRPC): r = defer.Deferred(None) reactor.callLater(timeout, eb) - d = download_sd_blob(self.session, sd_hash, PaymentRateManager(self.session.base_payment_rate_manager)) + d = download_sd_blob(self.session, sd_hash, self.session.payment_rate_manager) d.addCallback(BlobStreamDescriptorReader) d.addCallback(lambda blob: blob.get_info()) d.addCallback(cb) @@ -2565,6 +2553,20 @@ class LBRYDaemon(jsonrpc.JSONRPC): return d + def jsonrpc_get_mean_availability(self): + """ + Get mean blob availability + + Args: + None + Returns: + Mean peers for a blob + """ + + d = self._render_response(self.session.blob_tracker.last_mean_availability, OK_CODE) + return d + + def get_lbrynet_version_from_github(): """Return the latest released version from github.""" response = requests.get('https://api.github.com/repos/lbryio/lbry/releases/latest') diff --git a/lbrynet/lbrynet_daemon/LBRYDownloader.py b/lbrynet/lbrynet_daemon/LBRYDownloader.py index cf7289354..7553ff4f0 100644 --- a/lbrynet/lbrynet_daemon/LBRYDownloader.py +++ b/lbrynet/lbrynet_daemon/LBRYDownloader.py @@ -9,7 +9,6 @@ from twisted.internet import defer from twisted.internet.task import LoopingCall from lbrynet.core.Error import InsufficientFundsError, KeyFeeAboveMaxAllowed -from lbrynet.core.PaymentRateManager import PaymentRateManager from lbrynet.core.StreamDescriptor import download_sd_blob from lbrynet.metadata.LBRYFee import LBRYFeeValidator from lbrynet.lbryfilemanager.LBRYFileDownloader import ManagedLBRYFileDownloaderFactory @@ -52,7 +51,7 @@ class GetStream(object): self.file_name = file_name self.session = session self.exchange_rate_manager = exchange_rate_manager - self.payment_rate_manager = PaymentRateManager(self.session.base_payment_rate_manager) + self.payment_rate_manager = self.session.payment_rate_manager self.lbry_file_manager = lbry_file_manager self.sd_identifier = sd_identifier self.stream_hash = None diff --git a/lbrynet/lbrynet_daemon/LBRYPublisher.py b/lbrynet/lbrynet_daemon/LBRYPublisher.py index f162ca24c..622625bd1 100644 --- a/lbrynet/lbrynet_daemon/LBRYPublisher.py +++ b/lbrynet/lbrynet_daemon/LBRYPublisher.py @@ -9,7 +9,6 @@ from appdirs import user_data_dir from lbrynet.core.Error import InsufficientFundsError from lbrynet.lbryfilemanager.LBRYFileCreator import create_lbry_file from lbrynet.lbryfile.StreamDescriptor import publish_sd_blob -from lbrynet.core.PaymentRateManager import PaymentRateManager from lbrynet.metadata.LBRYMetadata import Metadata from lbrynet.lbryfilemanager.LBRYFileDownloader import ManagedLBRYFileDownloader from lbrynet import reflector @@ -102,7 +101,7 @@ class Publisher(object): def add_to_lbry_files(self, stream_hash): self.stream_hash = stream_hash - prm = PaymentRateManager(self.session.base_payment_rate_manager) + prm = self.session.payment_rate_manager d = self.lbry_file_manager.add_lbry_file(stream_hash, prm) d.addCallback(self.set_lbry_file) return d