diff --git a/CHANGELOG.md b/CHANGELOG.md index d436a7750..e7b50de93 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -16,6 +16,12 @@ at anytime. * Get lbry files with pending claims * Add better logging to help track down [#478](https://github.com/lbryio/lbry/issues/478) * Catch UnknownNameErrors when resolving a name. [#479](https://github.com/lbryio/lbry/issues/479) +### Changed + * Add blob_get, descriptor_get, and blob_delete + * Add filter keyword args to blob_list + * Refactor get_availability + * Add optional peer search timeout, add peer_search_timeout setting + ## [0.8.3rc3] - 2017-02-14 diff --git a/lbrynet/conf.py b/lbrynet/conf.py index 020e1447d..a411f0f7a 100644 --- a/lbrynet/conf.py +++ b/lbrynet/conf.py @@ -197,6 +197,7 @@ ADJUSTABLE_SETTINGS = { 'run_on_startup': (bool, False), 'run_reflector_server': (bool, False), 'sd_download_timeout': (int, 3), + 'peer_search_timeout': (int, 3), 'search_servers': (list, ['lighthouse1.lbry.io:50005']), 'search_timeout': (float, 5.0), 'startup_scripts': (list, []), diff --git a/lbrynet/core/BlobAvailability.py b/lbrynet/core/BlobAvailability.py index a15b625db..8175701fe 100644 --- a/lbrynet/core/BlobAvailability.py +++ b/lbrynet/core/BlobAvailability.py @@ -38,17 +38,17 @@ class BlobAvailabilityTracker(object): if self._check_mine.running: self._check_mine.stop() - def get_blob_availability(self, blob): + def get_blob_availability(self, blob, timeout=None): def _get_peer_count(peers): have_blob = sum(1 for peer in peers if peer.is_available()) return {blob: have_blob} - d = self._peer_finder.find_peers_for_blob(blob) + d = self._peer_finder.find_peers_for_blob(blob, timeout) d.addCallback(_get_peer_count) return d - def get_availability_for_blobs(self, blobs): - dl = [self.get_blob_availability(blob) for blob in blobs if blob] + def get_availability_for_blobs(self, blobs, timeout=None): + dl = [self.get_blob_availability(blob, timeout) for blob in blobs if blob] d = defer.DeferredList(dl) d.addCallback(lambda results: [val for success, val in results if success]) return d @@ -57,7 +57,6 @@ class BlobAvailabilityTracker(object): def last_mean_availability(self): return max(Decimal(0.01), self._last_mean_availability) - def _update_peers_for_blob(self, blob): def _save_peer_info(blob_hash, peers): v = {blob_hash: peers} diff --git a/lbrynet/core/Error.py b/lbrynet/core/Error.py index 79a982740..cd7959235 100644 --- a/lbrynet/core/Error.py +++ b/lbrynet/core/Error.py @@ -80,7 +80,7 @@ class NoSuchBlobError(Exception): pass -class NoSuchStreamHashError(Exception): +class NoSuchStreamHash(Exception): pass diff --git a/lbrynet/core/PaymentRateManager.py b/lbrynet/core/PaymentRateManager.py index 491c3ed79..cdda9b630 100644 --- a/lbrynet/core/PaymentRateManager.py +++ b/lbrynet/core/PaymentRateManager.py @@ -1,4 +1,4 @@ -from lbrynet.core.Strategy import get_default_strategy +from lbrynet.core.Strategy import get_default_strategy, OnlyFreeStrategy from lbrynet import conf from decimal import Decimal @@ -81,3 +81,46 @@ class NegotiatedPaymentRateManager(object): return (offer.is_too_low and round(Decimal.from_float(offer.rate), 5) >= round(self.strategy.max_rate, 5)) return False + + +class OnlyFreePaymentsManager(object): + def __init__(self, **kwargs): + """ + A payment rate manager that will only ever accept and offer a rate of 0.0, + Used for testing + """ + + self.base = BasePaymentRateManager(0.0, 0.0) + self.points_paid = 0.0 + self.generous = True + self.strategy = OnlyFreeStrategy() + + def get_rate_blob_data(self, peer, blobs): + response = self.strategy.make_offer(peer, blobs) + return response.rate + + def accept_rate_blob_data(self, peer, blobs, offer): + offer = self.strategy.respond_to_offer(offer, peer, blobs) + self.strategy.update_accepted_offers(peer, offer) + return offer.is_accepted + + def reply_to_offer(self, peer, blobs, offer): + reply = self.strategy.respond_to_offer(offer, peer, blobs) + self.strategy.update_accepted_offers(peer, reply) + return reply + + def get_rate_for_peer(self, peer): + return self.strategy.accepted_offers.get(peer, False) + + def record_points_paid(self, amount): + self.points_paid += amount + + def record_offer_reply(self, peer, offer): + self.strategy.update_accepted_offers(peer, offer) + + def price_limit_reached(self, peer): + if peer in self.strategy.pending_sent_offers: + offer = self.strategy.pending_sent_offers[peer] + if offer.rate > 0.0: + return True + return False diff --git a/lbrynet/core/PriceModel.py b/lbrynet/core/PriceModel.py index 844a0e2c7..aad6eb42f 100644 --- a/lbrynet/core/PriceModel.py +++ b/lbrynet/core/PriceModel.py @@ -9,6 +9,14 @@ def get_default_price_model(blob_tracker, base_price, **kwargs): return MeanAvailabilityWeightedPrice(blob_tracker, base_price, **kwargs) +class ZeroPrice(object): + def __init__(self): + self.base_price = 0.0 + + def calculate_price(self, blob): + return 0.0 + + class MeanAvailabilityWeightedPrice(object): """Calculate mean-blob-availability and stream-position weighted price for a blob diff --git a/lbrynet/core/Strategy.py b/lbrynet/core/Strategy.py index 4ae620f4a..0ee0d1efd 100644 --- a/lbrynet/core/Strategy.py +++ b/lbrynet/core/Strategy.py @@ -3,7 +3,7 @@ from decimal import Decimal from lbrynet import conf from lbrynet.interfaces import INegotiationStrategy from lbrynet.core.Offer import Offer -from lbrynet.core.PriceModel import MeanAvailabilityWeightedPrice +from lbrynet.core.PriceModel import MeanAvailabilityWeightedPrice, ZeroPrice def get_default_strategy(blob_tracker, **kwargs): @@ -134,3 +134,19 @@ class BasicAvailabilityWeightedStrategy(Strategy): with_premium = self._premium(rate, offer_count) rounded_price = round(with_premium, 5) return self._bounded_price(rounded_price) + + +class OnlyFreeStrategy(Strategy): + implementer(INegotiationStrategy) + def __init__(self, *args, **kwargs): + price_model = ZeroPrice() + Strategy.__init__(self, price_model, 0.0, 0.0, True) + + def _get_mean_rate(self, rates): + return 0.0 + + def _get_response_rate(self, rates, offer_count): + return 0.0 + + def _make_rate_offer(self, rates, offer_count): + return 0.0 diff --git a/lbrynet/core/client/DHTPeerFinder.py b/lbrynet/core/client/DHTPeerFinder.py index d4f299d8e..7970a194a 100644 --- a/lbrynet/core/client/DHTPeerFinder.py +++ b/lbrynet/core/client/DHTPeerFinder.py @@ -2,7 +2,9 @@ import binascii import logging from zope.interface import implements +from twisted.internet import defer, reactor from lbrynet.interfaces import IPeerFinder +from lbrynet.core.utils import short_hash log = logging.getLogger(__name__) @@ -34,21 +36,29 @@ class DHTPeerFinder(object): def _manage_peers(self): pass - def find_peers_for_blob(self, blob_hash): + @defer.inlineCallbacks + def find_peers_for_blob(self, blob_hash, timeout=None): + def _trigger_timeout(): + if not finished_deferred.called: + log.warning("Peer search for %s timed out", short_hash(blob_hash)) + finished_deferred.cancel() + bin_hash = binascii.unhexlify(blob_hash) + finished_deferred = self.dht_node.getPeersForBlob(bin_hash) - def filter_peers(peer_list): - peers = set(peer_list) - good_peers = [] - for host, port in peers: - peer = self.peer_manager.get_peer(host, port) - if peer.is_available() is True: - good_peers.append(peer) - return good_peers + if timeout is not None: + reactor.callLater(timeout, _trigger_timeout) - d = self.dht_node.getPeersForBlob(bin_hash) - d.addCallback(filter_peers) - return d + peer_list = yield finished_deferred + + peers = set(peer_list) + good_peers = [] + for host, port in peers: + peer = self.peer_manager.get_peer(host, port) + if peer.is_available() is True: + good_peers.append(peer) + + defer.returnValue(good_peers) def get_most_popular_hashes(self, num_to_return): return self.dht_node.get_most_popular_hashes(num_to_return) diff --git a/lbrynet/core/server/BlobRequestHandler.py b/lbrynet/core/server/BlobRequestHandler.py index 2d08f25c2..a47985bd8 100644 --- a/lbrynet/core/server/BlobRequestHandler.py +++ b/lbrynet/core/server/BlobRequestHandler.py @@ -150,7 +150,7 @@ class BlobRequestHandler(object): if read_handle is not None: self.currently_uploading = blob self.read_handle = read_handle - log.info("Sending %s to client", str(blob)) + log.info("Sending %s to %s", str(blob), self.peer) response_fields['blob_hash'] = blob.blob_hash response_fields['length'] = blob.length response['incoming_blob'] = response_fields diff --git a/lbrynet/lbryfile/EncryptedFileMetadataManager.py b/lbrynet/lbryfile/EncryptedFileMetadataManager.py index e9b2e7335..883b3a49d 100644 --- a/lbrynet/lbryfile/EncryptedFileMetadataManager.py +++ b/lbrynet/lbryfile/EncryptedFileMetadataManager.py @@ -4,7 +4,7 @@ import os from twisted.internet import defer from twisted.python.failure import Failure from twisted.enterprise import adbapi -from lbrynet.core.Error import DuplicateStreamHashError, NoSuchStreamHashError +from lbrynet.core.Error import DuplicateStreamHashError, NoSuchStreamHash, NoSuchSDHash from lbrynet.core.sqlite_helpers import rerun_if_locked @@ -93,6 +93,9 @@ class DBEncryptedFileMetadataManager(object): def get_sd_blob_hashes_for_stream(self, stream_hash): return self._get_sd_blob_hashes_for_stream(stream_hash) + def get_stream_hash_for_sd_hash(self, sd_hash): + return self._get_stream_hash_for_sd_blob_hash(sd_hash) + def _open_db(self): # check_same_thread=False is solely to quiet a spurious error that appears to be due # to a bug in twisted, where the connection is closed by a different thread than the @@ -131,7 +134,7 @@ class DBEncryptedFileMetadataManager(object): d = self.db_conn.runQuery( "select stream_hash from lbry_files where stream_hash = ?", (stream_hash,)) d.addCallback( - lambda result: result[0][0] if result else Failure(NoSuchStreamHashError(stream_hash))) + lambda result: result[0][0] if result else Failure(NoSuchStreamHash(stream_hash))) def do_delete(transaction, s_h): transaction.execute("delete from lbry_files where stream_hash = ?", (s_h,)) @@ -166,7 +169,7 @@ class DBEncryptedFileMetadataManager(object): if res: return res[0] else: - raise NoSuchStreamHashError(stream_hash) + raise NoSuchStreamHash(stream_hash) d = self.db_conn.runQuery( "select key, stream_name, suggested_file_name from lbry_files where stream_hash = ?", @@ -257,6 +260,20 @@ class DBEncryptedFileMetadataManager(object): d.addCallback(lambda results: [r[0] for r in results]) return d + @rerun_if_locked + def _get_stream_hash_for_sd_blob_hash(self, sd_blob_hash): + def _handle_result(result): + if not result: + raise NoSuchSDHash(sd_blob_hash) + return result[0][0] + + log.debug("Looking up sd blob hashes for sd blob hash %s", str(sd_blob_hash)) + d = self.db_conn.runQuery( + "select stream_hash from lbry_file_descriptors where sd_blob_hash = ?", + (sd_blob_hash,)) + d.addCallback(_handle_result) + return d + class TempEncryptedFileMetadataManager(object): def __init__(self): diff --git a/lbrynet/lbryfilemanager/EncryptedFileDownloader.py b/lbrynet/lbryfilemanager/EncryptedFileDownloader.py index 3b3f27a39..ec2110807 100644 --- a/lbrynet/lbryfilemanager/EncryptedFileDownloader.py +++ b/lbrynet/lbryfilemanager/EncryptedFileDownloader.py @@ -7,7 +7,7 @@ from zope.interface import implements from twisted.internet import defer from lbrynet.core.client.StreamProgressManager import FullStreamProgressManager -from lbrynet.core.Error import NoSuchSDHash, NoSuchStreamHashError +from lbrynet.core.Error import NoSuchSDHash, NoSuchStreamHash from lbrynet.core.utils import short_hash from lbrynet.core.StreamDescriptor import StreamMetadata from lbrynet.lbryfile.client.EncryptedFileDownloader import EncryptedFileSaver @@ -111,7 +111,7 @@ class ManagedEncryptedFileDownloader(EncryptedFileSaver): if sd_hash: self.sd_hash = sd_hash[0] else: - raise NoSuchStreamHashError(self.stream_hash) + raise NoSuchStreamHash(self.stream_hash) stream_metadata = yield self.wallet.get_claim_metadata_for_sd_hash(self.sd_hash) if stream_metadata: name, txid, nout = stream_metadata diff --git a/lbrynet/lbrylive/LiveStreamMetadataManager.py b/lbrynet/lbrylive/LiveStreamMetadataManager.py index 76bb6f906..b809148cd 100644 --- a/lbrynet/lbrylive/LiveStreamMetadataManager.py +++ b/lbrynet/lbrylive/LiveStreamMetadataManager.py @@ -7,7 +7,7 @@ import sqlite3 from twisted.internet import defer from twisted.python.failure import Failure from lbrynet.core.server.DHTHashAnnouncer import DHTHashSupplier -from lbrynet.core.Error import DuplicateStreamHashError, NoSuchStreamHashError +from lbrynet.core.Error import DuplicateStreamHashError, NoSuchStreamHash from lbrynet.core.sqlite_helpers import rerun_if_locked @@ -150,7 +150,7 @@ class DBLiveStreamMetadataManager(DHTHashSupplier): def _delete_stream(self, stream_hash): d = self.db_conn.runQuery("select stream_hash from live_streams where stream_hash = ?", (stream_hash,)) - d.addCallback(lambda result: result[0][0] if len(result) else Failure(NoSuchStreamHashError(stream_hash))) + d.addCallback(lambda result: result[0][0] if len(result) else Failure(NoSuchStreamHash(stream_hash))) def do_delete(transaction, s_h): transaction.execute("delete from live_streams where stream_hash = ?", (s_h,)) @@ -183,7 +183,7 @@ class DBLiveStreamMetadataManager(DHTHashSupplier): def _get_stream_info(self, stream_hash): d = self.db_conn.runQuery("select public_key, key, stream_name from live_streams where stream_hash = ?", (stream_hash,)) - d.addCallback(lambda result: result[0] if len(result) else Failure(NoSuchStreamHashError(stream_hash))) + d.addCallback(lambda result: result[0] if len(result) else Failure(NoSuchStreamHash(stream_hash))) return d @rerun_if_locked diff --git a/lbrynet/lbrynet_daemon/Daemon.py b/lbrynet/lbrynet_daemon/Daemon.py index 07d9c0f3f..2589d3887 100644 --- a/lbrynet/lbrynet_daemon/Daemon.py +++ b/lbrynet/lbrynet_daemon/Daemon.py @@ -10,6 +10,7 @@ import simplejson as json import textwrap from requests import exceptions as requests_exceptions from decimal import Decimal +import random from twisted.web import server from twisted.internet import defer, threads, error, reactor, task @@ -28,6 +29,7 @@ from lbrynet.metadata.Metadata import verify_name_characters from lbrynet.lbryfile.client.EncryptedFileDownloader import EncryptedFileSaverFactory from lbrynet.lbryfile.client.EncryptedFileDownloader import EncryptedFileOpenerFactory from lbrynet.lbryfile.client.EncryptedFileOptions import add_lbry_file_to_sd_identifier +from lbrynet.lbryfile.StreamDescriptor import save_sd_info from lbrynet.lbryfile.EncryptedFileMetadataManager import DBEncryptedFileMetadataManager from lbrynet.lbryfile.StreamDescriptor import EncryptedFileStreamType from lbrynet.lbryfilemanager.EncryptedFileManager import EncryptedFileManager @@ -36,16 +38,17 @@ from lbrynet.lbrynet_daemon.Downloader import GetStream from lbrynet.lbrynet_daemon.Publisher import Publisher from lbrynet.lbrynet_daemon.ExchangeRateManager import ExchangeRateManager from lbrynet.lbrynet_daemon.auth.server import AuthJSONRPCServer +from lbrynet.core.PaymentRateManager import OnlyFreePaymentsManager from lbrynet.core import log_support, utils, file_utils from lbrynet.core import system_info from lbrynet.core.StreamDescriptor import StreamDescriptorIdentifier, download_sd_blob -from lbrynet.core.StreamDescriptor import BlobStreamDescriptorReader from lbrynet.core.Session import Session from lbrynet.core.Wallet import LBRYumWallet, SqliteStorage from lbrynet.core.looping_call_manager import LoopingCallManager from lbrynet.core.server.BlobRequestHandler import BlobRequestHandlerFactory from lbrynet.core.server.ServerProtocol import ServerProtocolFactory from lbrynet.core.Error import InsufficientFundsError, UnknownNameError, NoSuchSDHash +from lbrynet.core.Error import NoSuchStreamHash log = logging.getLogger(__name__) @@ -285,6 +288,7 @@ class Daemon(AuthJSONRPCServer): self.announced_startup = True self.startup_status = STARTUP_STAGES[5] log.info("Started lbrynet-daemon") + log.info("%i blobs in manager", len(self.session.blob_manager.blobs)) if self.first_run: d = self._upload_log(log_type="first_run") @@ -292,7 +296,7 @@ class Daemon(AuthJSONRPCServer): d = self._upload_log(exclude_previous=True, log_type="start") else: d = defer.succeed(None) - + d.addCallback(lambda _: self.session.blob_manager.get_all_verified_blobs()) d.addCallback(lambda _: _announce()) return d @@ -734,26 +738,78 @@ class Daemon(AuthJSONRPCServer): EncryptedFileStreamType, file_opener_factory) return defer.succeed(None) - def _download_sd_blob(self, sd_hash, timeout=None): - timeout = timeout if timeout is not None else conf.settings['sd_download_timeout'] + def _download_sd_blob(self, sd_blob_hash, rate_manager=None, timeout=None): + """ + Download a sd blob and register it with the stream info manager + Use this when downloading a sd blob as part of a stream download - def cb(result): - if not r.called: - r.callback(result) + :param sd_blob_hash (str): sd blob hash + :param rate_manager (PaymentRateManager), optional: the payment rate manager to use, + defaults to session.payment_rate_manager + :param timeout (int): sd blob timeout + + :return: decoded sd blob + """ + timeout = timeout if timeout is not None else conf.settings['sd_download_timeout'] + rate_manager = rate_manager or self.session.payment_rate_manager + + def cb(sd_blob): + if not finished_d.called: + finished_d.callback(sd_blob) def eb(): - if not r.called: - log.error("sd blob download timed out: %s", sd_hash) - r.errback(Exception("sd timeout")) + if not finished_d.called: + finished_d.errback(Exception("Blob (%s) download timed out" % + sd_blob_hash[:SHORT_ID_LEN])) + + def save_sd_blob(sd_blob): + d = defer.succeed(read_sd_blob(sd_blob)) + d.addCallback(lambda decoded: save_sd_info(self.stream_info_manager, decoded)) + d.addCallback(self.stream_info_manager.save_sd_blob_hash_to_stream, sd_blob_hash) + d.addCallback(lambda _: sd_blob) + return d + + def read_sd_blob(sd_blob): + sd_blob_file = sd_blob.open_for_reading() + decoded_sd_blob = json.loads(sd_blob_file.read()) + sd_blob.close_read_handle(sd_blob_file) + return decoded_sd_blob + + finished_d = defer.Deferred() + finished_d.addCallback(save_sd_blob) - r = defer.Deferred(None) reactor.callLater(timeout, eb) - d = download_sd_blob(self.session, sd_hash, self.session.payment_rate_manager) - d.addErrback(log.fail(), "Error downloading sd blob: %s", sd_hash) - d.addCallback(BlobStreamDescriptorReader) - d.addCallback(lambda blob: blob.get_info()) + d = download_sd_blob(self.session, sd_blob_hash, rate_manager) d.addCallback(cb) - return r + return finished_d + + def _download_blob(self, blob_hash, rate_manager=None, timeout=None): + """ + Download a blob + + :param blob_hash (str): blob hash + :param rate_manager (PaymentRateManager), optional: the payment rate manager to use, + defaults to session.payment_rate_manager + :param timeout (int): blob timeout + :return: BlobFile + """ + + def cb(blob): + if not finished_d.called: + finished_d.callback(blob) + + def eb(): + if not finished_d.called: + finished_d.errback(Exception("Blob (%s) download timed out" % + blob_hash[:SHORT_ID_LEN])) + + rate_manager = rate_manager or self.session.payment_rate_manager + timeout = timeout or 30 + finished_d = defer.Deferred(None) + reactor.callLater(timeout, eb) + d = download_sd_blob(self.session, blob_hash, rate_manager) + d.addCallback(cb) + return finished_d @defer.inlineCallbacks def _download_name(self, name, timeout=None, download_directory=None, @@ -1016,6 +1072,26 @@ class Daemon(AuthJSONRPCServer): ]) return d + def get_blobs_for_stream_hash(self, stream_hash): + def _iter_blobs(blob_hashes): + for blob_hash, blob_num, blob_iv, blob_length in blob_hashes: + if blob_hash: + yield self.session.blob_manager.get_blob(blob_hash, length=blob_length) + + def _get_blobs(blob_hashes): + dl = defer.DeferredList(list(_iter_blobs(blob_hashes)), consumeErrors=True) + dl.addCallback(lambda blobs: [blob[1] for blob in blobs if blob[0]]) + return dl + + d = self.stream_info_manager.get_blobs_for_stream(stream_hash) + d.addCallback(_get_blobs) + return d + + def get_blobs_for_sd_hash(self, sd_hash): + d = self.stream_info_manager.get_stream_hash_for_sd_hash(sd_hash) + d.addCallback(self.get_blobs_for_stream_hash) + return d + ############################################################################ # # # JSON-RPC API methods start here # @@ -1306,7 +1382,6 @@ class Daemon(AuthJSONRPCServer): ) return self._render_response(textwrap.dedent(fn.__doc__)) - def jsonrpc_commands(self): """ Return a list of available commands @@ -2000,28 +2075,94 @@ class Daemon(AuthJSONRPCServer): @AuthJSONRPCServer.auth_required def jsonrpc_download_descriptor(self, **kwargs): """ - DEPRECATED. Use `blob_get` instead + DEPRECATED. Use `descriptor_get` instead """ - return self.jsonrpc_blob_get(**kwargs) + return self.jsonrpc_descriptor_get(**kwargs) @AuthJSONRPCServer.auth_required - def jsonrpc_blob_get(self, sd_hash, timeout=None): + @defer.inlineCallbacks + def jsonrpc_descriptor_get(self, sd_hash, timeout=None, payment_rate_manager=None): """ Download and return a sd blob Args: sd_hash timeout (optional) + payment_rate_manager (optional): if not given the default payment rate manager + will be used. supported alternative rate managers: + only-free + Returns - sd blob, dict + Success/Fail message or decoded data """ - if timeout is None: - timeout = conf.settings['sd_download_timeout'] - d = self._download_sd_blob(sd_hash, timeout) - d.addCallbacks( - lambda r: self._render_response(r), - lambda _: self._render_response(False)) - return d + + payment_rate_manager = get_blob_payment_rate_manager(self.session, payment_rate_manager) + decoded_sd_blob = yield self._download_sd_blob(sd_hash, payment_rate_manager, + timeout=timeout) + result = yield self._render_response(decoded_sd_blob) + defer.returnValue(result) + + @AuthJSONRPCServer.auth_required + @defer.inlineCallbacks + def jsonrpc_blob_get(self, blob_hash, timeout=None, encoding=None, payment_rate_manager=None): + """ + Download and return a blob + + Args: + blob_hash + timeout (optional) + encoding (optional): by default no attempt at decoding is made + can be set to one of the following decoders: + json + payment_rate_manager (optional): if not given the default payment rate manager + will be used. supported alternative rate managers: + only-free + + Returns + Success/Fail message or decoded data + """ + + decoders = { + 'json': json.loads + } + + timeout = timeout or 30 + payment_rate_manager = get_blob_payment_rate_manager(self.session, payment_rate_manager) + blob = yield self._download_blob(blob_hash, rate_manager=payment_rate_manager, + timeout=timeout) + if encoding and encoding in decoders: + blob_file = blob.open_for_reading() + result = decoders[encoding](blob_file.read()) + blob.close_read_handle(blob_file) + else: + result = "Downloaded blob %s" % blob_hash + + response = yield self._render_response(result) + defer.returnValue(response) + + @AuthJSONRPCServer.auth_required + @defer.inlineCallbacks + def jsonrpc_blob_delete(self, blob_hash): + """ + Delete a blob + + Args: + blob_hash + Returns: + Success/fail message + """ + + if blob_hash not in self.session.blob_manager.blobs: + response = yield self._render_response("Don't have that blob") + defer.returnValue(response) + try: + stream_hash = yield self.stream_info_manager.get_stream_hash_for_sd_hash(blob_hash) + yield self.stream_info_manager.delete_stream(stream_hash) + except Exception as err: + pass + yield self.session.blob_manager.delete_blobs([blob_hash]) + response = yield self._render_response("Deleted %s" % blob_hash) + defer.returnValue(response) def jsonrpc_get_nametrie(self): """ @@ -2156,17 +2297,20 @@ class Daemon(AuthJSONRPCServer): """ return self.jsonrpc_peer_list(blob_hash) - def jsonrpc_peer_list(self, blob_hash): + def jsonrpc_peer_list(self, blob_hash, timeout=None): """ Get peers for blob hash Args: 'blob_hash': blob hash + 'timeout' (int, optional): peer search timeout Returns: List of contacts """ - d = self.session.peer_finder.find_peers_for_blob(blob_hash) + timeout = timeout or conf.settings['peer_search_timeout'] + + d = self.session.peer_finder.find_peers_for_blob(blob_hash, timeout=timeout) d.addCallback(lambda r: [[c.host, c.port, c.is_available()] for c in r]) d.addCallback(lambda r: self._render_response(r)) return d @@ -2214,19 +2358,54 @@ class Daemon(AuthJSONRPCServer): """ return self.jsonrpc_blob_list() - def jsonrpc_blob_list(self): + @defer.inlineCallbacks + def jsonrpc_blob_list(self, uri=None, stream_hash=None, sd_hash=None, needed=None, + finished=None, page_size=None, page=None): """ - Returns all blob hashes + Returns blob hashes, if not given filters returns all blobs known by the blob manager Args: - None + uri (str, optional): filter by blobs in stream for winning claim + stream_hash (str, optional): filter by blobs in given stream hash + sd_hash (str, optional): filter by blobs in given sd hash + needed (bool, optional): only return needed blobs + finished (bool, optional): only return finished blobs + page_size (int, optional): limit number of results returned + page (int, optional): filter to page x of [page_size] results Returns: list of blob hashes """ - d = self.session.blob_manager.get_all_verified_blobs() - d.addCallback(lambda r: self._render_response(r)) - return d + if uri: + metadata = yield self._resolve_name(uri) + sd_hash = get_sd_hash(metadata) + blobs = yield self.get_blobs_for_sd_hash(sd_hash) + elif stream_hash: + try: + blobs = yield self.get_blobs_for_stream_hash(stream_hash) + except NoSuchStreamHash: + blobs = [] + elif sd_hash: + try: + blobs = yield self.get_blobs_for_sd_hash(sd_hash) + except NoSuchSDHash: + blobs = [] + else: + blobs = self.session.blob_manager.blobs.itervalues() + + if needed: + blobs = [blob for blob in blobs if not blob.is_validated()] + if finished: + blobs = [blob for blob in blobs if blob.is_validated()] + + blob_hashes = [blob.blob_hash for blob in blobs] + page_size = page_size or len(blob_hashes) + page = page or 0 + start_index = page * page_size + stop_index = start_index + page_size + blob_hashes_for_return = blob_hashes[start_index:stop_index] + response = yield self._render_response(blob_hashes_for_return) + defer.returnValue(response) def jsonrpc_reflect_all_blobs(self): """ @@ -2262,12 +2441,15 @@ class Daemon(AuthJSONRPCServer): d = self._render_response(self.session.blob_tracker.last_mean_availability) return d - def jsonrpc_get_availability(self, name): + @defer.inlineCallbacks + def jsonrpc_get_availability(self, name, sd_timeout=None, peer_timeout=None): """ Get stream availability for a winning claim Arg: name (str): lbry uri + sd_timeout (int, optional): sd blob download timeout + peer_timeout (int, optional): how long to look for peers Returns: peers per blob / total blobs @@ -2283,17 +2465,38 @@ class Daemon(AuthJSONRPCServer): else: return 0.0 - d = self._resolve_name(name, force_refresh=True) - d.addCallback(get_sd_hash) - d.addCallback(self._download_sd_blob) - d.addCallbacks( - lambda descriptor: [blob.get('blob_hash') for blob in descriptor['blobs']], - lambda _: []) - d.addCallback(self.session.blob_tracker.get_availability_for_blobs) - d.addCallback(_get_mean) - d.addCallback(lambda result: self._render_response(result)) + def read_sd_blob(sd_blob): + sd_blob_file = sd_blob.open_for_reading() + decoded_sd_blob = json.loads(sd_blob_file.read()) + sd_blob.close_read_handle(sd_blob_file) + return decoded_sd_blob - return d + metadata = yield self._resolve_name(name) + sd_hash = get_sd_hash(metadata) + sd_timeout = sd_timeout or conf.settings['sd_download_timeout'] + peer_timeout = peer_timeout or conf.settings['peer_search_timeout'] + blobs = [] + try: + blobs = yield self.get_blobs_for_sd_hash(sd_hash) + need_sd_blob = False + log.info("Already have sd blob") + except NoSuchSDHash: + need_sd_blob = True + log.info("Need sd blob") + blob_hashes = [blob.blob_hash for blob in blobs] + if need_sd_blob: + # we don't want to use self._download_descriptor here because it would create a stream + sd_blob = yield self._download_blob(sd_hash, timeout=sd_timeout) + decoded = read_sd_blob(sd_blob) + blob_hashes = [blob.get("blob_hash") for blob in decoded['blobs'] + if blob.get("blob_hash")] + sample = random.sample(blob_hashes, min(len(blob_hashes), 5)) + log.info("check peers for %i of %i blobs in stream", len(sample), len(blob_hashes)) + availabilities = yield self.session.blob_tracker.get_availability_for_blobs(sample, + peer_timeout) + mean_availability = _get_mean(availabilities) + response = yield self._render_response(mean_availability) + defer.returnValue(response) def jsonrpc_get_start_notice(self): """ @@ -2670,3 +2873,14 @@ def get_lbry_file_search_value(search_fields): if value: return searchtype, value raise NoValidSearch('{} is missing a valid search type'.format(search_fields)) + + +def get_blob_payment_rate_manager(session, payment_rate_manager=None): + if payment_rate_manager: + rate_managers = { + 'only-free': OnlyFreePaymentsManager() + } + if payment_rate_manager in rate_managers: + payment_rate_manager = rate_managers[payment_rate_manager] + log.info("Downloading blob with rate manager: %s", payment_rate_manager) + return payment_rate_manager or session.payment_rate_manager