diff --git a/lbrynet/core/BlobHistory.py b/lbrynet/core/BlobHistory.py new file mode 100644 index 000000000..eebafd8bc --- /dev/null +++ b/lbrynet/core/BlobHistory.py @@ -0,0 +1,52 @@ +import os +from twisted.enterprise import adbapi +import time + + +class BlobHistoryManager(object): + """ + Class to archive historical blob upload and download information + + This class creates two tables in lbry data folder/blob_history.db, 'download' and 'upload' + The tables store information about what blob was uploaded or downloaded, to or from which peer, + at what price, and when. + """ + + def __init__(self, db_dir): + self.db = None + self.db_dir = db_dir + + def _open_db(self): + self.db = adbapi.ConnectionPool('sqlite3', os.path.join(self.db_dir, "blob_history.db"), + check_same_thread=False) + + def create_tables(transaction): + transaction.execute("create table if not exists download (" + + " id integer primary key autoincrement, " + + " blob text, " + + " host text, " + + " rate float, " + + " ts integer)") + + transaction.execute("create table if not exists upload (" + + " id integer primary key autoincrement, " + + " blob text, " + + " host text, " + + " rate float, " + + " ts integer)") + + return self.db.runInteraction(create_tables) + + def add_transaction(self, blob_hash, host, rate, upload=False): + ts = int(time.time()) + if upload: + d = self.db.runQuery("insert into upload values (null, ?, ?, ?, ?) ", (blob_hash, str(host), float(rate), ts)) + else: + d = self.db.runQuery("insert into download values (null, ?, ?, ?, ?) ", (blob_hash, str(host), float(rate), ts)) + return d + + def start(self): + d = self._open_db() + return d + + diff --git a/lbrynet/core/BlobManager.py b/lbrynet/core/BlobManager.py index fc37a3b7e..1e362b943 100644 --- a/lbrynet/core/BlobManager.py +++ b/lbrynet/core/BlobManager.py @@ -11,6 +11,7 @@ from lbrynet.core.utils import is_valid_blobhash from lbrynet.core.cryptoutils import get_lbry_hash_obj from lbrynet.core.Error import NoSuchBlobError from lbrynet.core.sqlite_helpers import rerun_if_locked +from lbrynet.core.BlobHistory import BlobHistoryManager log = logging.getLogger(__name__) @@ -83,11 +84,13 @@ class DiskBlobManager(BlobManager): self.blobs = {} self.blob_hashes_to_delete = {} # {blob_hash: being_deleted (True/False)} self._next_manage_call = None + self.blob_history_manager = BlobHistoryManager(db_dir) def setup(self): log.info("Setting up the DiskBlobManager. blob_dir: %s, db_file: %s", str(self.blob_dir), str(self.db_file)) d = self._open_db() + d.addCallback(lambda _: self.blob_history_manager.start()) d.addCallback(lambda _: self._manage()) return d diff --git a/lbrynet/core/BlobPrice.py b/lbrynet/core/BlobPrice.py new file mode 100644 index 000000000..a41978e66 --- /dev/null +++ b/lbrynet/core/BlobPrice.py @@ -0,0 +1,111 @@ +import logging + +from zope.interface import Interface, Attribute +from twisted.internet import defer +from twisted.internet.task import LoopingCall +from lbrynet.conf import MIN_BLOB_DATA_PAYMENT_RATE as min_price + +log = logging.getLogger(__name__) + +base_price = min_price * 10 + +# how heavily to value blobs towards the front of the stream +alpha = 1.0 + + +def frontload(index): + """ + Get frontload multipler + + @param index: blob position in stream + @return: frontload multipler + """ + + return 2.0 - (alpha**index) + + +def calculate_price(mean_availability, availability, index_position=0): + """ + Calculate mean availability weighted price for a blob + + @param mean_availability: sum of blob availabilities over the number of known blobs + @param availability: number of known peers for blob + @param index_position: blob index position in stream + @return: price + """ + + price = max(min_price, base_price * (mean_availability/max(1, availability)) * frontload(index_position)) + return price + + +class BlobPriceAndAvailabilityTracker(object): + """ + Class to track peer counts for known blobs and update price targets + + Attributes: + prices (dist): dictionary of blob prices + availability (dict): dictionary of peers for known blobs + """ + + def __init__(self, blob_manager, peer_finder, dht_node): + self.availability = {} + self.prices = {} + self._blob_manager = blob_manager + self._peer_finder = peer_finder + self._dht_node = dht_node + self._check_popular = LoopingCall(self._update_most_popular) + self._check_mine = LoopingCall(self._update_mine) + + def start(self): + log.info("Starting blob tracker") + self._check_popular.start(30) + self._check_mine.start(120) + + def stop(self): + if self._check_popular.running: + self._check_popular.stop() + if self._check_mine.running: + self._check_mine.stop() + + def _update_peers_for_blob(self, blob): + def _save_peer_info(blob_hash, peers): + v = {blob_hash: peers} + self.availability.update(v) + + new_price = self._get_price(blob) + self.prices.update({blob: new_price}) + return v + + d = self._peer_finder.find_peers_for_blob(blob) + d.addCallback(lambda r: [[c.host, c.port, c.is_available()] for c in r]) + d.addCallback(lambda peers: _save_peer_info(blob, peers)) + return d + + def _update_most_popular(self): + def _get_most_popular(): + dl = [] + for (hash, _) in self._dht_node.get_most_popular_hashes(100): + encoded = hash.encode('hex') + dl.append(self._update_peers_for_blob(encoded)) + return defer.DeferredList(dl) + d = _get_most_popular() + + def _update_mine(self): + def _get_peers(blobs): + dl = [] + for hash in blobs: + dl.append(self._update_peers_for_blob(hash)) + return defer.DeferredList(dl) + d = self._blob_manager.get_all_verified_blobs() + d.addCallback(_get_peers) + + def _get_mean_peers(self): + num_peers = [len(self.availability[blob]) for blob in self.availability] + mean = float(sum(num_peers)) / float(max(1, len(num_peers))) + return mean + + def _get_price(self, blob): + mean_available = self._get_mean_peers() + blob_availability = len(self.availability.get(blob, [])) + price = calculate_price(mean_available, blob_availability) + return price \ No newline at end of file diff --git a/lbrynet/core/Session.py b/lbrynet/core/Session.py index a53df0cc1..82ca8028a 100644 --- a/lbrynet/core/Session.py +++ b/lbrynet/core/Session.py @@ -10,6 +10,7 @@ from lbrynet.core.HashAnnouncer import DummyHashAnnouncer from lbrynet.core.server.DHTHashAnnouncer import DHTHashAnnouncer from lbrynet.core.utils import generate_id from lbrynet.core.PaymentRateManager import BasePaymentRateManager +from lbrynet.core.BlobPrice import BlobPriceAndAvailabilityTracker from twisted.internet import threads, defer @@ -103,6 +104,7 @@ class LBRYSession(object): self.dht_node = None self.base_payment_rate_manager = BasePaymentRateManager(blob_data_payment_rate) + self.blob_tracker = None def setup(self): """Create the blob directory and database if necessary, start all desired services""" @@ -136,6 +138,8 @@ class LBRYSession(object): def shut_down(self): """Stop all services""" ds = [] + if self.blob_manager is not None: + ds.append(defer.maybeDeferred(self.blob_tracker.stop)) if self.dht_node is not None: ds.append(defer.maybeDeferred(self.dht_node.stop)) if self.rate_limiter is not None: @@ -255,13 +259,17 @@ class LBRYSession(object): else: self.blob_manager = DiskBlobManager(self.hash_announcer, self.blob_dir, self.db_dir) + if self.blob_tracker is None: + self.blob_tracker = BlobPriceAndAvailabilityTracker(self.blob_manager, self.peer_finder, self.dht_node) + self.rate_limiter.start() d1 = self.blob_manager.setup() d2 = self.wallet.start() dl = defer.DeferredList([d1, d2], fireOnOneErrback=True, consumeErrors=True) + dl.addCallback(lambda _: self.blob_tracker.start()) - dl.addErrback(lambda err: err.value.subFailure) + dl.addErrback(self._subfailure) return dl def _unset_upnp(self): @@ -282,3 +290,9 @@ class LBRYSession(object): d = threads.deferToThread(threaded_unset_upnp) d.addErrback(lambda err: str(err)) return d + + def _subfailure(self, err): + log.warning(err.getTraceback()) + return err.value.subFailure + + diff --git a/lbrynet/core/client/BlobRequester.py b/lbrynet/core/client/BlobRequester.py index add4279a5..e257cbde1 100644 --- a/lbrynet/core/client/BlobRequester.py +++ b/lbrynet/core/client/BlobRequester.py @@ -101,11 +101,16 @@ class BlobRequester(object): self._update_local_score(peer, -10.0) return reason + def _record_blob_aquired(self, blob, host, rate): + self.blob_manager.blob_history_manager.add_transaction(blob, host, rate, upload=False) + def _pay_or_cancel_payment(self, arg, protocol, reserved_points, blob): if blob.length != 0 and (not isinstance(arg, Failure) or arg.check(DownloadCanceledError)): self._pay_peer(protocol, blob.length, reserved_points) + self._record_blob_aquired(str(blob), protocol.peer.host, reserved_points.amount) else: self._cancel_points(reserved_points) + return arg def _handle_download_error(self, err, peer, blob_to_download): diff --git a/lbrynet/core/server/BlobRequestHandler.py b/lbrynet/core/server/BlobRequestHandler.py index 51d630951..69686c5aa 100644 --- a/lbrynet/core/server/BlobRequestHandler.py +++ b/lbrynet/core/server/BlobRequestHandler.py @@ -55,10 +55,12 @@ class BlobRequestHandler(object): def handle_queries(self, queries): response = {} if self.query_identifiers[0] in queries: - if not self.handle_blob_data_payment_rate(queries[self.query_identifiers[0]]): + requested_rate = queries[self.query_identifiers[0]] + if not self.handle_blob_data_payment_rate(requested_rate): response['blob_data_payment_rate'] = "RATE_TOO_LOW" else: response['blob_data_payment_rate'] = 'RATE_ACCEPTED' + log.debug(response['blob_data_payment_rate']) if self.query_identifiers[1] in queries: log.debug("Received the client's request to send a blob") @@ -84,9 +86,15 @@ class BlobRequestHandler(object): return response log.warning("We can not send %s", str(blob)) response_fields['error'] = "BLOB_UNAVAILABLE" - return response + return response, blob + + def record_transaction(response, blob, rate): + d = self.blob_manager.blob_history_manager.add_transaction(str(blob), self.peer.host, rate, upload=True) + d.addCallback(lambda _: response) + return d d.addCallback(open_blob_for_reading) + d.addCallback(lambda (response, blob): record_transaction(response, blob, queries[self.query_identifiers[0]])) return d else: @@ -155,6 +163,6 @@ class BlobRequestHandler(object): self.currently_uploading = None self.file_sender = None if reason is not None and isinstance(reason, Failure): - log.info("Upload has failed. Reason: %s", reason.getErrorMessage()) + log.warning("Upload has failed. Reason: %s", reason.getErrorMessage()) return _send_file()