forked from LBRYCommunity/lbry-sdk
blob upload/download history and BlobPriceAndAvailabilityTracker
This commit is contained in:
parent
c75d240a65
commit
5dd29da84f
6 changed files with 197 additions and 4 deletions
52
lbrynet/core/BlobHistory.py
Normal file
52
lbrynet/core/BlobHistory.py
Normal file
|
@ -0,0 +1,52 @@
|
||||||
|
import os
|
||||||
|
from twisted.enterprise import adbapi
|
||||||
|
import time
|
||||||
|
|
||||||
|
|
||||||
|
class BlobHistoryManager(object):
|
||||||
|
"""
|
||||||
|
Class to archive historical blob upload and download information
|
||||||
|
|
||||||
|
This class creates two tables in lbry data folder/blob_history.db, 'download' and 'upload'
|
||||||
|
The tables store information about what blob was uploaded or downloaded, to or from which peer,
|
||||||
|
at what price, and when.
|
||||||
|
"""
|
||||||
|
|
||||||
|
def __init__(self, db_dir):
|
||||||
|
self.db = None
|
||||||
|
self.db_dir = db_dir
|
||||||
|
|
||||||
|
def _open_db(self):
|
||||||
|
self.db = adbapi.ConnectionPool('sqlite3', os.path.join(self.db_dir, "blob_history.db"),
|
||||||
|
check_same_thread=False)
|
||||||
|
|
||||||
|
def create_tables(transaction):
|
||||||
|
transaction.execute("create table if not exists download (" +
|
||||||
|
" id integer primary key autoincrement, " +
|
||||||
|
" blob text, " +
|
||||||
|
" host text, " +
|
||||||
|
" rate float, " +
|
||||||
|
" ts integer)")
|
||||||
|
|
||||||
|
transaction.execute("create table if not exists upload (" +
|
||||||
|
" id integer primary key autoincrement, " +
|
||||||
|
" blob text, " +
|
||||||
|
" host text, " +
|
||||||
|
" rate float, " +
|
||||||
|
" ts integer)")
|
||||||
|
|
||||||
|
return self.db.runInteraction(create_tables)
|
||||||
|
|
||||||
|
def add_transaction(self, blob_hash, host, rate, upload=False):
|
||||||
|
ts = int(time.time())
|
||||||
|
if upload:
|
||||||
|
d = self.db.runQuery("insert into upload values (null, ?, ?, ?, ?) ", (blob_hash, str(host), float(rate), ts))
|
||||||
|
else:
|
||||||
|
d = self.db.runQuery("insert into download values (null, ?, ?, ?, ?) ", (blob_hash, str(host), float(rate), ts))
|
||||||
|
return d
|
||||||
|
|
||||||
|
def start(self):
|
||||||
|
d = self._open_db()
|
||||||
|
return d
|
||||||
|
|
||||||
|
|
|
@ -11,6 +11,7 @@ from lbrynet.core.utils import is_valid_blobhash
|
||||||
from lbrynet.core.cryptoutils import get_lbry_hash_obj
|
from lbrynet.core.cryptoutils import get_lbry_hash_obj
|
||||||
from lbrynet.core.Error import NoSuchBlobError
|
from lbrynet.core.Error import NoSuchBlobError
|
||||||
from lbrynet.core.sqlite_helpers import rerun_if_locked
|
from lbrynet.core.sqlite_helpers import rerun_if_locked
|
||||||
|
from lbrynet.core.BlobHistory import BlobHistoryManager
|
||||||
|
|
||||||
|
|
||||||
log = logging.getLogger(__name__)
|
log = logging.getLogger(__name__)
|
||||||
|
@ -83,11 +84,13 @@ class DiskBlobManager(BlobManager):
|
||||||
self.blobs = {}
|
self.blobs = {}
|
||||||
self.blob_hashes_to_delete = {} # {blob_hash: being_deleted (True/False)}
|
self.blob_hashes_to_delete = {} # {blob_hash: being_deleted (True/False)}
|
||||||
self._next_manage_call = None
|
self._next_manage_call = None
|
||||||
|
self.blob_history_manager = BlobHistoryManager(db_dir)
|
||||||
|
|
||||||
def setup(self):
|
def setup(self):
|
||||||
log.info("Setting up the DiskBlobManager. blob_dir: %s, db_file: %s", str(self.blob_dir),
|
log.info("Setting up the DiskBlobManager. blob_dir: %s, db_file: %s", str(self.blob_dir),
|
||||||
str(self.db_file))
|
str(self.db_file))
|
||||||
d = self._open_db()
|
d = self._open_db()
|
||||||
|
d.addCallback(lambda _: self.blob_history_manager.start())
|
||||||
d.addCallback(lambda _: self._manage())
|
d.addCallback(lambda _: self._manage())
|
||||||
return d
|
return d
|
||||||
|
|
||||||
|
|
111
lbrynet/core/BlobPrice.py
Normal file
111
lbrynet/core/BlobPrice.py
Normal file
|
@ -0,0 +1,111 @@
|
||||||
|
import logging
|
||||||
|
|
||||||
|
from zope.interface import Interface, Attribute
|
||||||
|
from twisted.internet import defer
|
||||||
|
from twisted.internet.task import LoopingCall
|
||||||
|
from lbrynet.conf import MIN_BLOB_DATA_PAYMENT_RATE as min_price
|
||||||
|
|
||||||
|
log = logging.getLogger(__name__)
|
||||||
|
|
||||||
|
base_price = min_price * 10
|
||||||
|
|
||||||
|
# how heavily to value blobs towards the front of the stream
|
||||||
|
alpha = 1.0
|
||||||
|
|
||||||
|
|
||||||
|
def frontload(index):
|
||||||
|
"""
|
||||||
|
Get frontload multipler
|
||||||
|
|
||||||
|
@param index: blob position in stream
|
||||||
|
@return: frontload multipler
|
||||||
|
"""
|
||||||
|
|
||||||
|
return 2.0 - (alpha**index)
|
||||||
|
|
||||||
|
|
||||||
|
def calculate_price(mean_availability, availability, index_position=0):
|
||||||
|
"""
|
||||||
|
Calculate mean availability weighted price for a blob
|
||||||
|
|
||||||
|
@param mean_availability: sum of blob availabilities over the number of known blobs
|
||||||
|
@param availability: number of known peers for blob
|
||||||
|
@param index_position: blob index position in stream
|
||||||
|
@return: price
|
||||||
|
"""
|
||||||
|
|
||||||
|
price = max(min_price, base_price * (mean_availability/max(1, availability)) * frontload(index_position))
|
||||||
|
return price
|
||||||
|
|
||||||
|
|
||||||
|
class BlobPriceAndAvailabilityTracker(object):
|
||||||
|
"""
|
||||||
|
Class to track peer counts for known blobs and update price targets
|
||||||
|
|
||||||
|
Attributes:
|
||||||
|
prices (dist): dictionary of blob prices
|
||||||
|
availability (dict): dictionary of peers for known blobs
|
||||||
|
"""
|
||||||
|
|
||||||
|
def __init__(self, blob_manager, peer_finder, dht_node):
|
||||||
|
self.availability = {}
|
||||||
|
self.prices = {}
|
||||||
|
self._blob_manager = blob_manager
|
||||||
|
self._peer_finder = peer_finder
|
||||||
|
self._dht_node = dht_node
|
||||||
|
self._check_popular = LoopingCall(self._update_most_popular)
|
||||||
|
self._check_mine = LoopingCall(self._update_mine)
|
||||||
|
|
||||||
|
def start(self):
|
||||||
|
log.info("Starting blob tracker")
|
||||||
|
self._check_popular.start(30)
|
||||||
|
self._check_mine.start(120)
|
||||||
|
|
||||||
|
def stop(self):
|
||||||
|
if self._check_popular.running:
|
||||||
|
self._check_popular.stop()
|
||||||
|
if self._check_mine.running:
|
||||||
|
self._check_mine.stop()
|
||||||
|
|
||||||
|
def _update_peers_for_blob(self, blob):
|
||||||
|
def _save_peer_info(blob_hash, peers):
|
||||||
|
v = {blob_hash: peers}
|
||||||
|
self.availability.update(v)
|
||||||
|
|
||||||
|
new_price = self._get_price(blob)
|
||||||
|
self.prices.update({blob: new_price})
|
||||||
|
return v
|
||||||
|
|
||||||
|
d = self._peer_finder.find_peers_for_blob(blob)
|
||||||
|
d.addCallback(lambda r: [[c.host, c.port, c.is_available()] for c in r])
|
||||||
|
d.addCallback(lambda peers: _save_peer_info(blob, peers))
|
||||||
|
return d
|
||||||
|
|
||||||
|
def _update_most_popular(self):
|
||||||
|
def _get_most_popular():
|
||||||
|
dl = []
|
||||||
|
for (hash, _) in self._dht_node.get_most_popular_hashes(100):
|
||||||
|
encoded = hash.encode('hex')
|
||||||
|
dl.append(self._update_peers_for_blob(encoded))
|
||||||
|
return defer.DeferredList(dl)
|
||||||
|
d = _get_most_popular()
|
||||||
|
|
||||||
|
def _update_mine(self):
|
||||||
|
def _get_peers(blobs):
|
||||||
|
dl = []
|
||||||
|
for hash in blobs:
|
||||||
|
dl.append(self._update_peers_for_blob(hash))
|
||||||
|
return defer.DeferredList(dl)
|
||||||
|
d = self._blob_manager.get_all_verified_blobs()
|
||||||
|
d.addCallback(_get_peers)
|
||||||
|
|
||||||
|
def _get_mean_peers(self):
|
||||||
|
num_peers = [len(self.availability[blob]) for blob in self.availability]
|
||||||
|
mean = float(sum(num_peers)) / float(max(1, len(num_peers)))
|
||||||
|
return mean
|
||||||
|
|
||||||
|
def _get_price(self, blob):
|
||||||
|
mean_available = self._get_mean_peers()
|
||||||
|
blob_availability = len(self.availability.get(blob, []))
|
||||||
|
price = calculate_price(mean_available, blob_availability)
|
||||||
|
return price
|
|
@ -10,6 +10,7 @@ from lbrynet.core.HashAnnouncer import DummyHashAnnouncer
|
||||||
from lbrynet.core.server.DHTHashAnnouncer import DHTHashAnnouncer
|
from lbrynet.core.server.DHTHashAnnouncer import DHTHashAnnouncer
|
||||||
from lbrynet.core.utils import generate_id
|
from lbrynet.core.utils import generate_id
|
||||||
from lbrynet.core.PaymentRateManager import BasePaymentRateManager
|
from lbrynet.core.PaymentRateManager import BasePaymentRateManager
|
||||||
|
from lbrynet.core.BlobPrice import BlobPriceAndAvailabilityTracker
|
||||||
from twisted.internet import threads, defer
|
from twisted.internet import threads, defer
|
||||||
|
|
||||||
|
|
||||||
|
@ -103,6 +104,7 @@ class LBRYSession(object):
|
||||||
self.dht_node = None
|
self.dht_node = None
|
||||||
|
|
||||||
self.base_payment_rate_manager = BasePaymentRateManager(blob_data_payment_rate)
|
self.base_payment_rate_manager = BasePaymentRateManager(blob_data_payment_rate)
|
||||||
|
self.blob_tracker = None
|
||||||
|
|
||||||
def setup(self):
|
def setup(self):
|
||||||
"""Create the blob directory and database if necessary, start all desired services"""
|
"""Create the blob directory and database if necessary, start all desired services"""
|
||||||
|
@ -136,6 +138,8 @@ class LBRYSession(object):
|
||||||
def shut_down(self):
|
def shut_down(self):
|
||||||
"""Stop all services"""
|
"""Stop all services"""
|
||||||
ds = []
|
ds = []
|
||||||
|
if self.blob_manager is not None:
|
||||||
|
ds.append(defer.maybeDeferred(self.blob_tracker.stop))
|
||||||
if self.dht_node is not None:
|
if self.dht_node is not None:
|
||||||
ds.append(defer.maybeDeferred(self.dht_node.stop))
|
ds.append(defer.maybeDeferred(self.dht_node.stop))
|
||||||
if self.rate_limiter is not None:
|
if self.rate_limiter is not None:
|
||||||
|
@ -255,13 +259,17 @@ class LBRYSession(object):
|
||||||
else:
|
else:
|
||||||
self.blob_manager = DiskBlobManager(self.hash_announcer, self.blob_dir, self.db_dir)
|
self.blob_manager = DiskBlobManager(self.hash_announcer, self.blob_dir, self.db_dir)
|
||||||
|
|
||||||
|
if self.blob_tracker is None:
|
||||||
|
self.blob_tracker = BlobPriceAndAvailabilityTracker(self.blob_manager, self.peer_finder, self.dht_node)
|
||||||
|
|
||||||
self.rate_limiter.start()
|
self.rate_limiter.start()
|
||||||
d1 = self.blob_manager.setup()
|
d1 = self.blob_manager.setup()
|
||||||
d2 = self.wallet.start()
|
d2 = self.wallet.start()
|
||||||
|
|
||||||
dl = defer.DeferredList([d1, d2], fireOnOneErrback=True, consumeErrors=True)
|
dl = defer.DeferredList([d1, d2], fireOnOneErrback=True, consumeErrors=True)
|
||||||
|
dl.addCallback(lambda _: self.blob_tracker.start())
|
||||||
|
|
||||||
dl.addErrback(lambda err: err.value.subFailure)
|
dl.addErrback(self._subfailure)
|
||||||
return dl
|
return dl
|
||||||
|
|
||||||
def _unset_upnp(self):
|
def _unset_upnp(self):
|
||||||
|
@ -282,3 +290,9 @@ class LBRYSession(object):
|
||||||
d = threads.deferToThread(threaded_unset_upnp)
|
d = threads.deferToThread(threaded_unset_upnp)
|
||||||
d.addErrback(lambda err: str(err))
|
d.addErrback(lambda err: str(err))
|
||||||
return d
|
return d
|
||||||
|
|
||||||
|
def _subfailure(self, err):
|
||||||
|
log.warning(err.getTraceback())
|
||||||
|
return err.value.subFailure
|
||||||
|
|
||||||
|
|
||||||
|
|
|
@ -101,11 +101,16 @@ class BlobRequester(object):
|
||||||
self._update_local_score(peer, -10.0)
|
self._update_local_score(peer, -10.0)
|
||||||
return reason
|
return reason
|
||||||
|
|
||||||
|
def _record_blob_aquired(self, blob, host, rate):
|
||||||
|
self.blob_manager.blob_history_manager.add_transaction(blob, host, rate, upload=False)
|
||||||
|
|
||||||
def _pay_or_cancel_payment(self, arg, protocol, reserved_points, blob):
|
def _pay_or_cancel_payment(self, arg, protocol, reserved_points, blob):
|
||||||
if blob.length != 0 and (not isinstance(arg, Failure) or arg.check(DownloadCanceledError)):
|
if blob.length != 0 and (not isinstance(arg, Failure) or arg.check(DownloadCanceledError)):
|
||||||
self._pay_peer(protocol, blob.length, reserved_points)
|
self._pay_peer(protocol, blob.length, reserved_points)
|
||||||
|
self._record_blob_aquired(str(blob), protocol.peer.host, reserved_points.amount)
|
||||||
else:
|
else:
|
||||||
self._cancel_points(reserved_points)
|
self._cancel_points(reserved_points)
|
||||||
|
|
||||||
return arg
|
return arg
|
||||||
|
|
||||||
def _handle_download_error(self, err, peer, blob_to_download):
|
def _handle_download_error(self, err, peer, blob_to_download):
|
||||||
|
|
|
@ -55,10 +55,12 @@ class BlobRequestHandler(object):
|
||||||
def handle_queries(self, queries):
|
def handle_queries(self, queries):
|
||||||
response = {}
|
response = {}
|
||||||
if self.query_identifiers[0] in queries:
|
if self.query_identifiers[0] in queries:
|
||||||
if not self.handle_blob_data_payment_rate(queries[self.query_identifiers[0]]):
|
requested_rate = queries[self.query_identifiers[0]]
|
||||||
|
if not self.handle_blob_data_payment_rate(requested_rate):
|
||||||
response['blob_data_payment_rate'] = "RATE_TOO_LOW"
|
response['blob_data_payment_rate'] = "RATE_TOO_LOW"
|
||||||
else:
|
else:
|
||||||
response['blob_data_payment_rate'] = 'RATE_ACCEPTED'
|
response['blob_data_payment_rate'] = 'RATE_ACCEPTED'
|
||||||
|
log.debug(response['blob_data_payment_rate'])
|
||||||
|
|
||||||
if self.query_identifiers[1] in queries:
|
if self.query_identifiers[1] in queries:
|
||||||
log.debug("Received the client's request to send a blob")
|
log.debug("Received the client's request to send a blob")
|
||||||
|
@ -84,9 +86,15 @@ class BlobRequestHandler(object):
|
||||||
return response
|
return response
|
||||||
log.warning("We can not send %s", str(blob))
|
log.warning("We can not send %s", str(blob))
|
||||||
response_fields['error'] = "BLOB_UNAVAILABLE"
|
response_fields['error'] = "BLOB_UNAVAILABLE"
|
||||||
return response
|
return response, blob
|
||||||
|
|
||||||
|
def record_transaction(response, blob, rate):
|
||||||
|
d = self.blob_manager.blob_history_manager.add_transaction(str(blob), self.peer.host, rate, upload=True)
|
||||||
|
d.addCallback(lambda _: response)
|
||||||
|
return d
|
||||||
|
|
||||||
d.addCallback(open_blob_for_reading)
|
d.addCallback(open_blob_for_reading)
|
||||||
|
d.addCallback(lambda (response, blob): record_transaction(response, blob, queries[self.query_identifiers[0]]))
|
||||||
|
|
||||||
return d
|
return d
|
||||||
else:
|
else:
|
||||||
|
@ -155,6 +163,6 @@ class BlobRequestHandler(object):
|
||||||
self.currently_uploading = None
|
self.currently_uploading = None
|
||||||
self.file_sender = None
|
self.file_sender = None
|
||||||
if reason is not None and isinstance(reason, Failure):
|
if reason is not None and isinstance(reason, Failure):
|
||||||
log.info("Upload has failed. Reason: %s", reason.getErrorMessage())
|
log.warning("Upload has failed. Reason: %s", reason.getErrorMessage())
|
||||||
|
|
||||||
return _send_file()
|
return _send_file()
|
||||||
|
|
Loading…
Add table
Reference in a new issue