Merge pull request #469 from lbryio/blob-commands

Blob commands
This commit is contained in:
Jack Robison 2017-02-16 12:18:24 -05:00 committed by GitHub
commit 6e310517e0
13 changed files with 390 additions and 76 deletions

View file

@ -16,6 +16,12 @@ at anytime.
* Get lbry files with pending claims * Get lbry files with pending claims
* Add better logging to help track down [#478](https://github.com/lbryio/lbry/issues/478) * Add better logging to help track down [#478](https://github.com/lbryio/lbry/issues/478)
* Catch UnknownNameErrors when resolving a name. [#479](https://github.com/lbryio/lbry/issues/479) * Catch UnknownNameErrors when resolving a name. [#479](https://github.com/lbryio/lbry/issues/479)
### Changed
* Add blob_get, descriptor_get, and blob_delete
* Add filter keyword args to blob_list
* Refactor get_availability
* Add optional peer search timeout, add peer_search_timeout setting
## [0.8.3rc3] - 2017-02-14 ## [0.8.3rc3] - 2017-02-14

View file

@ -197,6 +197,7 @@ ADJUSTABLE_SETTINGS = {
'run_on_startup': (bool, False), 'run_on_startup': (bool, False),
'run_reflector_server': (bool, False), 'run_reflector_server': (bool, False),
'sd_download_timeout': (int, 3), 'sd_download_timeout': (int, 3),
'peer_search_timeout': (int, 3),
'search_servers': (list, ['lighthouse1.lbry.io:50005']), 'search_servers': (list, ['lighthouse1.lbry.io:50005']),
'search_timeout': (float, 5.0), 'search_timeout': (float, 5.0),
'startup_scripts': (list, []), 'startup_scripts': (list, []),

View file

@ -38,17 +38,17 @@ class BlobAvailabilityTracker(object):
if self._check_mine.running: if self._check_mine.running:
self._check_mine.stop() self._check_mine.stop()
def get_blob_availability(self, blob): def get_blob_availability(self, blob, timeout=None):
def _get_peer_count(peers): def _get_peer_count(peers):
have_blob = sum(1 for peer in peers if peer.is_available()) have_blob = sum(1 for peer in peers if peer.is_available())
return {blob: have_blob} return {blob: have_blob}
d = self._peer_finder.find_peers_for_blob(blob) d = self._peer_finder.find_peers_for_blob(blob, timeout)
d.addCallback(_get_peer_count) d.addCallback(_get_peer_count)
return d return d
def get_availability_for_blobs(self, blobs): def get_availability_for_blobs(self, blobs, timeout=None):
dl = [self.get_blob_availability(blob) for blob in blobs if blob] dl = [self.get_blob_availability(blob, timeout) for blob in blobs if blob]
d = defer.DeferredList(dl) d = defer.DeferredList(dl)
d.addCallback(lambda results: [val for success, val in results if success]) d.addCallback(lambda results: [val for success, val in results if success])
return d return d
@ -57,7 +57,6 @@ class BlobAvailabilityTracker(object):
def last_mean_availability(self): def last_mean_availability(self):
return max(Decimal(0.01), self._last_mean_availability) return max(Decimal(0.01), self._last_mean_availability)
def _update_peers_for_blob(self, blob): def _update_peers_for_blob(self, blob):
def _save_peer_info(blob_hash, peers): def _save_peer_info(blob_hash, peers):
v = {blob_hash: peers} v = {blob_hash: peers}

View file

@ -80,7 +80,7 @@ class NoSuchBlobError(Exception):
pass pass
class NoSuchStreamHashError(Exception): class NoSuchStreamHash(Exception):
pass pass

View file

@ -1,4 +1,4 @@
from lbrynet.core.Strategy import get_default_strategy from lbrynet.core.Strategy import get_default_strategy, OnlyFreeStrategy
from lbrynet import conf from lbrynet import conf
from decimal import Decimal from decimal import Decimal
@ -81,3 +81,46 @@ class NegotiatedPaymentRateManager(object):
return (offer.is_too_low and return (offer.is_too_low and
round(Decimal.from_float(offer.rate), 5) >= round(self.strategy.max_rate, 5)) round(Decimal.from_float(offer.rate), 5) >= round(self.strategy.max_rate, 5))
return False return False
class OnlyFreePaymentsManager(object):
def __init__(self, **kwargs):
"""
A payment rate manager that will only ever accept and offer a rate of 0.0,
Used for testing
"""
self.base = BasePaymentRateManager(0.0, 0.0)
self.points_paid = 0.0
self.generous = True
self.strategy = OnlyFreeStrategy()
def get_rate_blob_data(self, peer, blobs):
response = self.strategy.make_offer(peer, blobs)
return response.rate
def accept_rate_blob_data(self, peer, blobs, offer):
offer = self.strategy.respond_to_offer(offer, peer, blobs)
self.strategy.update_accepted_offers(peer, offer)
return offer.is_accepted
def reply_to_offer(self, peer, blobs, offer):
reply = self.strategy.respond_to_offer(offer, peer, blobs)
self.strategy.update_accepted_offers(peer, reply)
return reply
def get_rate_for_peer(self, peer):
return self.strategy.accepted_offers.get(peer, False)
def record_points_paid(self, amount):
self.points_paid += amount
def record_offer_reply(self, peer, offer):
self.strategy.update_accepted_offers(peer, offer)
def price_limit_reached(self, peer):
if peer in self.strategy.pending_sent_offers:
offer = self.strategy.pending_sent_offers[peer]
if offer.rate > 0.0:
return True
return False

View file

@ -9,6 +9,14 @@ def get_default_price_model(blob_tracker, base_price, **kwargs):
return MeanAvailabilityWeightedPrice(blob_tracker, base_price, **kwargs) return MeanAvailabilityWeightedPrice(blob_tracker, base_price, **kwargs)
class ZeroPrice(object):
def __init__(self):
self.base_price = 0.0
def calculate_price(self, blob):
return 0.0
class MeanAvailabilityWeightedPrice(object): class MeanAvailabilityWeightedPrice(object):
"""Calculate mean-blob-availability and stream-position weighted price for a blob """Calculate mean-blob-availability and stream-position weighted price for a blob

View file

@ -3,7 +3,7 @@ from decimal import Decimal
from lbrynet import conf from lbrynet import conf
from lbrynet.interfaces import INegotiationStrategy from lbrynet.interfaces import INegotiationStrategy
from lbrynet.core.Offer import Offer from lbrynet.core.Offer import Offer
from lbrynet.core.PriceModel import MeanAvailabilityWeightedPrice from lbrynet.core.PriceModel import MeanAvailabilityWeightedPrice, ZeroPrice
def get_default_strategy(blob_tracker, **kwargs): def get_default_strategy(blob_tracker, **kwargs):
@ -134,3 +134,19 @@ class BasicAvailabilityWeightedStrategy(Strategy):
with_premium = self._premium(rate, offer_count) with_premium = self._premium(rate, offer_count)
rounded_price = round(with_premium, 5) rounded_price = round(with_premium, 5)
return self._bounded_price(rounded_price) return self._bounded_price(rounded_price)
class OnlyFreeStrategy(Strategy):
implementer(INegotiationStrategy)
def __init__(self, *args, **kwargs):
price_model = ZeroPrice()
Strategy.__init__(self, price_model, 0.0, 0.0, True)
def _get_mean_rate(self, rates):
return 0.0
def _get_response_rate(self, rates, offer_count):
return 0.0
def _make_rate_offer(self, rates, offer_count):
return 0.0

View file

@ -2,7 +2,9 @@ import binascii
import logging import logging
from zope.interface import implements from zope.interface import implements
from twisted.internet import defer, reactor
from lbrynet.interfaces import IPeerFinder from lbrynet.interfaces import IPeerFinder
from lbrynet.core.utils import short_hash
log = logging.getLogger(__name__) log = logging.getLogger(__name__)
@ -34,21 +36,29 @@ class DHTPeerFinder(object):
def _manage_peers(self): def _manage_peers(self):
pass pass
def find_peers_for_blob(self, blob_hash): @defer.inlineCallbacks
def find_peers_for_blob(self, blob_hash, timeout=None):
def _trigger_timeout():
if not finished_deferred.called:
log.warning("Peer search for %s timed out", short_hash(blob_hash))
finished_deferred.cancel()
bin_hash = binascii.unhexlify(blob_hash) bin_hash = binascii.unhexlify(blob_hash)
finished_deferred = self.dht_node.getPeersForBlob(bin_hash)
def filter_peers(peer_list): if timeout is not None:
peers = set(peer_list) reactor.callLater(timeout, _trigger_timeout)
good_peers = []
for host, port in peers:
peer = self.peer_manager.get_peer(host, port)
if peer.is_available() is True:
good_peers.append(peer)
return good_peers
d = self.dht_node.getPeersForBlob(bin_hash) peer_list = yield finished_deferred
d.addCallback(filter_peers)
return d peers = set(peer_list)
good_peers = []
for host, port in peers:
peer = self.peer_manager.get_peer(host, port)
if peer.is_available() is True:
good_peers.append(peer)
defer.returnValue(good_peers)
def get_most_popular_hashes(self, num_to_return): def get_most_popular_hashes(self, num_to_return):
return self.dht_node.get_most_popular_hashes(num_to_return) return self.dht_node.get_most_popular_hashes(num_to_return)

View file

@ -150,7 +150,7 @@ class BlobRequestHandler(object):
if read_handle is not None: if read_handle is not None:
self.currently_uploading = blob self.currently_uploading = blob
self.read_handle = read_handle self.read_handle = read_handle
log.info("Sending %s to client", str(blob)) log.info("Sending %s to %s", str(blob), self.peer)
response_fields['blob_hash'] = blob.blob_hash response_fields['blob_hash'] = blob.blob_hash
response_fields['length'] = blob.length response_fields['length'] = blob.length
response['incoming_blob'] = response_fields response['incoming_blob'] = response_fields

View file

@ -4,7 +4,7 @@ import os
from twisted.internet import defer from twisted.internet import defer
from twisted.python.failure import Failure from twisted.python.failure import Failure
from twisted.enterprise import adbapi from twisted.enterprise import adbapi
from lbrynet.core.Error import DuplicateStreamHashError, NoSuchStreamHashError from lbrynet.core.Error import DuplicateStreamHashError, NoSuchStreamHash, NoSuchSDHash
from lbrynet.core.sqlite_helpers import rerun_if_locked from lbrynet.core.sqlite_helpers import rerun_if_locked
@ -93,6 +93,9 @@ class DBEncryptedFileMetadataManager(object):
def get_sd_blob_hashes_for_stream(self, stream_hash): def get_sd_blob_hashes_for_stream(self, stream_hash):
return self._get_sd_blob_hashes_for_stream(stream_hash) return self._get_sd_blob_hashes_for_stream(stream_hash)
def get_stream_hash_for_sd_hash(self, sd_hash):
return self._get_stream_hash_for_sd_blob_hash(sd_hash)
def _open_db(self): def _open_db(self):
# check_same_thread=False is solely to quiet a spurious error that appears to be due # check_same_thread=False is solely to quiet a spurious error that appears to be due
# to a bug in twisted, where the connection is closed by a different thread than the # to a bug in twisted, where the connection is closed by a different thread than the
@ -131,7 +134,7 @@ class DBEncryptedFileMetadataManager(object):
d = self.db_conn.runQuery( d = self.db_conn.runQuery(
"select stream_hash from lbry_files where stream_hash = ?", (stream_hash,)) "select stream_hash from lbry_files where stream_hash = ?", (stream_hash,))
d.addCallback( d.addCallback(
lambda result: result[0][0] if result else Failure(NoSuchStreamHashError(stream_hash))) lambda result: result[0][0] if result else Failure(NoSuchStreamHash(stream_hash)))
def do_delete(transaction, s_h): def do_delete(transaction, s_h):
transaction.execute("delete from lbry_files where stream_hash = ?", (s_h,)) transaction.execute("delete from lbry_files where stream_hash = ?", (s_h,))
@ -166,7 +169,7 @@ class DBEncryptedFileMetadataManager(object):
if res: if res:
return res[0] return res[0]
else: else:
raise NoSuchStreamHashError(stream_hash) raise NoSuchStreamHash(stream_hash)
d = self.db_conn.runQuery( d = self.db_conn.runQuery(
"select key, stream_name, suggested_file_name from lbry_files where stream_hash = ?", "select key, stream_name, suggested_file_name from lbry_files where stream_hash = ?",
@ -257,6 +260,20 @@ class DBEncryptedFileMetadataManager(object):
d.addCallback(lambda results: [r[0] for r in results]) d.addCallback(lambda results: [r[0] for r in results])
return d return d
@rerun_if_locked
def _get_stream_hash_for_sd_blob_hash(self, sd_blob_hash):
def _handle_result(result):
if not result:
raise NoSuchSDHash(sd_blob_hash)
return result[0][0]
log.debug("Looking up sd blob hashes for sd blob hash %s", str(sd_blob_hash))
d = self.db_conn.runQuery(
"select stream_hash from lbry_file_descriptors where sd_blob_hash = ?",
(sd_blob_hash,))
d.addCallback(_handle_result)
return d
class TempEncryptedFileMetadataManager(object): class TempEncryptedFileMetadataManager(object):
def __init__(self): def __init__(self):

View file

@ -7,7 +7,7 @@ from zope.interface import implements
from twisted.internet import defer from twisted.internet import defer
from lbrynet.core.client.StreamProgressManager import FullStreamProgressManager from lbrynet.core.client.StreamProgressManager import FullStreamProgressManager
from lbrynet.core.Error import NoSuchSDHash, NoSuchStreamHashError from lbrynet.core.Error import NoSuchSDHash, NoSuchStreamHash
from lbrynet.core.utils import short_hash from lbrynet.core.utils import short_hash
from lbrynet.core.StreamDescriptor import StreamMetadata from lbrynet.core.StreamDescriptor import StreamMetadata
from lbrynet.lbryfile.client.EncryptedFileDownloader import EncryptedFileSaver from lbrynet.lbryfile.client.EncryptedFileDownloader import EncryptedFileSaver
@ -111,7 +111,7 @@ class ManagedEncryptedFileDownloader(EncryptedFileSaver):
if sd_hash: if sd_hash:
self.sd_hash = sd_hash[0] self.sd_hash = sd_hash[0]
else: else:
raise NoSuchStreamHashError(self.stream_hash) raise NoSuchStreamHash(self.stream_hash)
stream_metadata = yield self.wallet.get_claim_metadata_for_sd_hash(self.sd_hash) stream_metadata = yield self.wallet.get_claim_metadata_for_sd_hash(self.sd_hash)
if stream_metadata: if stream_metadata:
name, txid, nout = stream_metadata name, txid, nout = stream_metadata

View file

@ -7,7 +7,7 @@ import sqlite3
from twisted.internet import defer from twisted.internet import defer
from twisted.python.failure import Failure from twisted.python.failure import Failure
from lbrynet.core.server.DHTHashAnnouncer import DHTHashSupplier from lbrynet.core.server.DHTHashAnnouncer import DHTHashSupplier
from lbrynet.core.Error import DuplicateStreamHashError, NoSuchStreamHashError from lbrynet.core.Error import DuplicateStreamHashError, NoSuchStreamHash
from lbrynet.core.sqlite_helpers import rerun_if_locked from lbrynet.core.sqlite_helpers import rerun_if_locked
@ -150,7 +150,7 @@ class DBLiveStreamMetadataManager(DHTHashSupplier):
def _delete_stream(self, stream_hash): def _delete_stream(self, stream_hash):
d = self.db_conn.runQuery("select stream_hash from live_streams where stream_hash = ?", (stream_hash,)) d = self.db_conn.runQuery("select stream_hash from live_streams where stream_hash = ?", (stream_hash,))
d.addCallback(lambda result: result[0][0] if len(result) else Failure(NoSuchStreamHashError(stream_hash))) d.addCallback(lambda result: result[0][0] if len(result) else Failure(NoSuchStreamHash(stream_hash)))
def do_delete(transaction, s_h): def do_delete(transaction, s_h):
transaction.execute("delete from live_streams where stream_hash = ?", (s_h,)) transaction.execute("delete from live_streams where stream_hash = ?", (s_h,))
@ -183,7 +183,7 @@ class DBLiveStreamMetadataManager(DHTHashSupplier):
def _get_stream_info(self, stream_hash): def _get_stream_info(self, stream_hash):
d = self.db_conn.runQuery("select public_key, key, stream_name from live_streams where stream_hash = ?", d = self.db_conn.runQuery("select public_key, key, stream_name from live_streams where stream_hash = ?",
(stream_hash,)) (stream_hash,))
d.addCallback(lambda result: result[0] if len(result) else Failure(NoSuchStreamHashError(stream_hash))) d.addCallback(lambda result: result[0] if len(result) else Failure(NoSuchStreamHash(stream_hash)))
return d return d
@rerun_if_locked @rerun_if_locked

View file

@ -10,6 +10,7 @@ import simplejson as json
import textwrap import textwrap
from requests import exceptions as requests_exceptions from requests import exceptions as requests_exceptions
from decimal import Decimal from decimal import Decimal
import random
from twisted.web import server from twisted.web import server
from twisted.internet import defer, threads, error, reactor, task from twisted.internet import defer, threads, error, reactor, task
@ -28,6 +29,7 @@ from lbrynet.metadata.Metadata import verify_name_characters
from lbrynet.lbryfile.client.EncryptedFileDownloader import EncryptedFileSaverFactory from lbrynet.lbryfile.client.EncryptedFileDownloader import EncryptedFileSaverFactory
from lbrynet.lbryfile.client.EncryptedFileDownloader import EncryptedFileOpenerFactory from lbrynet.lbryfile.client.EncryptedFileDownloader import EncryptedFileOpenerFactory
from lbrynet.lbryfile.client.EncryptedFileOptions import add_lbry_file_to_sd_identifier from lbrynet.lbryfile.client.EncryptedFileOptions import add_lbry_file_to_sd_identifier
from lbrynet.lbryfile.StreamDescriptor import save_sd_info
from lbrynet.lbryfile.EncryptedFileMetadataManager import DBEncryptedFileMetadataManager from lbrynet.lbryfile.EncryptedFileMetadataManager import DBEncryptedFileMetadataManager
from lbrynet.lbryfile.StreamDescriptor import EncryptedFileStreamType from lbrynet.lbryfile.StreamDescriptor import EncryptedFileStreamType
from lbrynet.lbryfilemanager.EncryptedFileManager import EncryptedFileManager from lbrynet.lbryfilemanager.EncryptedFileManager import EncryptedFileManager
@ -36,16 +38,17 @@ from lbrynet.lbrynet_daemon.Downloader import GetStream
from lbrynet.lbrynet_daemon.Publisher import Publisher from lbrynet.lbrynet_daemon.Publisher import Publisher
from lbrynet.lbrynet_daemon.ExchangeRateManager import ExchangeRateManager from lbrynet.lbrynet_daemon.ExchangeRateManager import ExchangeRateManager
from lbrynet.lbrynet_daemon.auth.server import AuthJSONRPCServer from lbrynet.lbrynet_daemon.auth.server import AuthJSONRPCServer
from lbrynet.core.PaymentRateManager import OnlyFreePaymentsManager
from lbrynet.core import log_support, utils, file_utils from lbrynet.core import log_support, utils, file_utils
from lbrynet.core import system_info from lbrynet.core import system_info
from lbrynet.core.StreamDescriptor import StreamDescriptorIdentifier, download_sd_blob from lbrynet.core.StreamDescriptor import StreamDescriptorIdentifier, download_sd_blob
from lbrynet.core.StreamDescriptor import BlobStreamDescriptorReader
from lbrynet.core.Session import Session from lbrynet.core.Session import Session
from lbrynet.core.Wallet import LBRYumWallet, SqliteStorage from lbrynet.core.Wallet import LBRYumWallet, SqliteStorage
from lbrynet.core.looping_call_manager import LoopingCallManager from lbrynet.core.looping_call_manager import LoopingCallManager
from lbrynet.core.server.BlobRequestHandler import BlobRequestHandlerFactory from lbrynet.core.server.BlobRequestHandler import BlobRequestHandlerFactory
from lbrynet.core.server.ServerProtocol import ServerProtocolFactory from lbrynet.core.server.ServerProtocol import ServerProtocolFactory
from lbrynet.core.Error import InsufficientFundsError, UnknownNameError, NoSuchSDHash from lbrynet.core.Error import InsufficientFundsError, UnknownNameError, NoSuchSDHash
from lbrynet.core.Error import NoSuchStreamHash
log = logging.getLogger(__name__) log = logging.getLogger(__name__)
@ -285,6 +288,7 @@ class Daemon(AuthJSONRPCServer):
self.announced_startup = True self.announced_startup = True
self.startup_status = STARTUP_STAGES[5] self.startup_status = STARTUP_STAGES[5]
log.info("Started lbrynet-daemon") log.info("Started lbrynet-daemon")
log.info("%i blobs in manager", len(self.session.blob_manager.blobs))
if self.first_run: if self.first_run:
d = self._upload_log(log_type="first_run") d = self._upload_log(log_type="first_run")
@ -292,7 +296,7 @@ class Daemon(AuthJSONRPCServer):
d = self._upload_log(exclude_previous=True, log_type="start") d = self._upload_log(exclude_previous=True, log_type="start")
else: else:
d = defer.succeed(None) d = defer.succeed(None)
d.addCallback(lambda _: self.session.blob_manager.get_all_verified_blobs())
d.addCallback(lambda _: _announce()) d.addCallback(lambda _: _announce())
return d return d
@ -734,26 +738,78 @@ class Daemon(AuthJSONRPCServer):
EncryptedFileStreamType, file_opener_factory) EncryptedFileStreamType, file_opener_factory)
return defer.succeed(None) return defer.succeed(None)
def _download_sd_blob(self, sd_hash, timeout=None): def _download_sd_blob(self, sd_blob_hash, rate_manager=None, timeout=None):
timeout = timeout if timeout is not None else conf.settings['sd_download_timeout'] """
Download a sd blob and register it with the stream info manager
Use this when downloading a sd blob as part of a stream download
def cb(result): :param sd_blob_hash (str): sd blob hash
if not r.called: :param rate_manager (PaymentRateManager), optional: the payment rate manager to use,
r.callback(result) defaults to session.payment_rate_manager
:param timeout (int): sd blob timeout
:return: decoded sd blob
"""
timeout = timeout if timeout is not None else conf.settings['sd_download_timeout']
rate_manager = rate_manager or self.session.payment_rate_manager
def cb(sd_blob):
if not finished_d.called:
finished_d.callback(sd_blob)
def eb(): def eb():
if not r.called: if not finished_d.called:
log.error("sd blob download timed out: %s", sd_hash) finished_d.errback(Exception("Blob (%s) download timed out" %
r.errback(Exception("sd timeout")) sd_blob_hash[:SHORT_ID_LEN]))
def save_sd_blob(sd_blob):
d = defer.succeed(read_sd_blob(sd_blob))
d.addCallback(lambda decoded: save_sd_info(self.stream_info_manager, decoded))
d.addCallback(self.stream_info_manager.save_sd_blob_hash_to_stream, sd_blob_hash)
d.addCallback(lambda _: sd_blob)
return d
def read_sd_blob(sd_blob):
sd_blob_file = sd_blob.open_for_reading()
decoded_sd_blob = json.loads(sd_blob_file.read())
sd_blob.close_read_handle(sd_blob_file)
return decoded_sd_blob
finished_d = defer.Deferred()
finished_d.addCallback(save_sd_blob)
r = defer.Deferred(None)
reactor.callLater(timeout, eb) reactor.callLater(timeout, eb)
d = download_sd_blob(self.session, sd_hash, self.session.payment_rate_manager) d = download_sd_blob(self.session, sd_blob_hash, rate_manager)
d.addErrback(log.fail(), "Error downloading sd blob: %s", sd_hash)
d.addCallback(BlobStreamDescriptorReader)
d.addCallback(lambda blob: blob.get_info())
d.addCallback(cb) d.addCallback(cb)
return r return finished_d
def _download_blob(self, blob_hash, rate_manager=None, timeout=None):
"""
Download a blob
:param blob_hash (str): blob hash
:param rate_manager (PaymentRateManager), optional: the payment rate manager to use,
defaults to session.payment_rate_manager
:param timeout (int): blob timeout
:return: BlobFile
"""
def cb(blob):
if not finished_d.called:
finished_d.callback(blob)
def eb():
if not finished_d.called:
finished_d.errback(Exception("Blob (%s) download timed out" %
blob_hash[:SHORT_ID_LEN]))
rate_manager = rate_manager or self.session.payment_rate_manager
timeout = timeout or 30
finished_d = defer.Deferred(None)
reactor.callLater(timeout, eb)
d = download_sd_blob(self.session, blob_hash, rate_manager)
d.addCallback(cb)
return finished_d
@defer.inlineCallbacks @defer.inlineCallbacks
def _download_name(self, name, timeout=None, download_directory=None, def _download_name(self, name, timeout=None, download_directory=None,
@ -1016,6 +1072,26 @@ class Daemon(AuthJSONRPCServer):
]) ])
return d return d
def get_blobs_for_stream_hash(self, stream_hash):
def _iter_blobs(blob_hashes):
for blob_hash, blob_num, blob_iv, blob_length in blob_hashes:
if blob_hash:
yield self.session.blob_manager.get_blob(blob_hash, length=blob_length)
def _get_blobs(blob_hashes):
dl = defer.DeferredList(list(_iter_blobs(blob_hashes)), consumeErrors=True)
dl.addCallback(lambda blobs: [blob[1] for blob in blobs if blob[0]])
return dl
d = self.stream_info_manager.get_blobs_for_stream(stream_hash)
d.addCallback(_get_blobs)
return d
def get_blobs_for_sd_hash(self, sd_hash):
d = self.stream_info_manager.get_stream_hash_for_sd_hash(sd_hash)
d.addCallback(self.get_blobs_for_stream_hash)
return d
############################################################################ ############################################################################
# # # #
# JSON-RPC API methods start here # # JSON-RPC API methods start here #
@ -1306,7 +1382,6 @@ class Daemon(AuthJSONRPCServer):
) )
return self._render_response(textwrap.dedent(fn.__doc__)) return self._render_response(textwrap.dedent(fn.__doc__))
def jsonrpc_commands(self): def jsonrpc_commands(self):
""" """
Return a list of available commands Return a list of available commands
@ -2000,28 +2075,94 @@ class Daemon(AuthJSONRPCServer):
@AuthJSONRPCServer.auth_required @AuthJSONRPCServer.auth_required
def jsonrpc_download_descriptor(self, **kwargs): def jsonrpc_download_descriptor(self, **kwargs):
""" """
DEPRECATED. Use `blob_get` instead DEPRECATED. Use `descriptor_get` instead
""" """
return self.jsonrpc_blob_get(**kwargs) return self.jsonrpc_descriptor_get(**kwargs)
@AuthJSONRPCServer.auth_required @AuthJSONRPCServer.auth_required
def jsonrpc_blob_get(self, sd_hash, timeout=None): @defer.inlineCallbacks
def jsonrpc_descriptor_get(self, sd_hash, timeout=None, payment_rate_manager=None):
""" """
Download and return a sd blob Download and return a sd blob
Args: Args:
sd_hash sd_hash
timeout (optional) timeout (optional)
payment_rate_manager (optional): if not given the default payment rate manager
will be used. supported alternative rate managers:
only-free
Returns Returns
sd blob, dict Success/Fail message or decoded data
""" """
if timeout is None:
timeout = conf.settings['sd_download_timeout'] payment_rate_manager = get_blob_payment_rate_manager(self.session, payment_rate_manager)
d = self._download_sd_blob(sd_hash, timeout) decoded_sd_blob = yield self._download_sd_blob(sd_hash, payment_rate_manager,
d.addCallbacks( timeout=timeout)
lambda r: self._render_response(r), result = yield self._render_response(decoded_sd_blob)
lambda _: self._render_response(False)) defer.returnValue(result)
return d
@AuthJSONRPCServer.auth_required
@defer.inlineCallbacks
def jsonrpc_blob_get(self, blob_hash, timeout=None, encoding=None, payment_rate_manager=None):
"""
Download and return a blob
Args:
blob_hash
timeout (optional)
encoding (optional): by default no attempt at decoding is made
can be set to one of the following decoders:
json
payment_rate_manager (optional): if not given the default payment rate manager
will be used. supported alternative rate managers:
only-free
Returns
Success/Fail message or decoded data
"""
decoders = {
'json': json.loads
}
timeout = timeout or 30
payment_rate_manager = get_blob_payment_rate_manager(self.session, payment_rate_manager)
blob = yield self._download_blob(blob_hash, rate_manager=payment_rate_manager,
timeout=timeout)
if encoding and encoding in decoders:
blob_file = blob.open_for_reading()
result = decoders[encoding](blob_file.read())
blob.close_read_handle(blob_file)
else:
result = "Downloaded blob %s" % blob_hash
response = yield self._render_response(result)
defer.returnValue(response)
@AuthJSONRPCServer.auth_required
@defer.inlineCallbacks
def jsonrpc_blob_delete(self, blob_hash):
"""
Delete a blob
Args:
blob_hash
Returns:
Success/fail message
"""
if blob_hash not in self.session.blob_manager.blobs:
response = yield self._render_response("Don't have that blob")
defer.returnValue(response)
try:
stream_hash = yield self.stream_info_manager.get_stream_hash_for_sd_hash(blob_hash)
yield self.stream_info_manager.delete_stream(stream_hash)
except Exception as err:
pass
yield self.session.blob_manager.delete_blobs([blob_hash])
response = yield self._render_response("Deleted %s" % blob_hash)
defer.returnValue(response)
def jsonrpc_get_nametrie(self): def jsonrpc_get_nametrie(self):
""" """
@ -2156,17 +2297,20 @@ class Daemon(AuthJSONRPCServer):
""" """
return self.jsonrpc_peer_list(blob_hash) return self.jsonrpc_peer_list(blob_hash)
def jsonrpc_peer_list(self, blob_hash): def jsonrpc_peer_list(self, blob_hash, timeout=None):
""" """
Get peers for blob hash Get peers for blob hash
Args: Args:
'blob_hash': blob hash 'blob_hash': blob hash
'timeout' (int, optional): peer search timeout
Returns: Returns:
List of contacts List of contacts
""" """
d = self.session.peer_finder.find_peers_for_blob(blob_hash) timeout = timeout or conf.settings['peer_search_timeout']
d = self.session.peer_finder.find_peers_for_blob(blob_hash, timeout=timeout)
d.addCallback(lambda r: [[c.host, c.port, c.is_available()] for c in r]) d.addCallback(lambda r: [[c.host, c.port, c.is_available()] for c in r])
d.addCallback(lambda r: self._render_response(r)) d.addCallback(lambda r: self._render_response(r))
return d return d
@ -2214,19 +2358,54 @@ class Daemon(AuthJSONRPCServer):
""" """
return self.jsonrpc_blob_list() return self.jsonrpc_blob_list()
def jsonrpc_blob_list(self): @defer.inlineCallbacks
def jsonrpc_blob_list(self, uri=None, stream_hash=None, sd_hash=None, needed=None,
finished=None, page_size=None, page=None):
""" """
Returns all blob hashes Returns blob hashes, if not given filters returns all blobs known by the blob manager
Args: Args:
None uri (str, optional): filter by blobs in stream for winning claim
stream_hash (str, optional): filter by blobs in given stream hash
sd_hash (str, optional): filter by blobs in given sd hash
needed (bool, optional): only return needed blobs
finished (bool, optional): only return finished blobs
page_size (int, optional): limit number of results returned
page (int, optional): filter to page x of [page_size] results
Returns: Returns:
list of blob hashes list of blob hashes
""" """
d = self.session.blob_manager.get_all_verified_blobs() if uri:
d.addCallback(lambda r: self._render_response(r)) metadata = yield self._resolve_name(uri)
return d sd_hash = get_sd_hash(metadata)
blobs = yield self.get_blobs_for_sd_hash(sd_hash)
elif stream_hash:
try:
blobs = yield self.get_blobs_for_stream_hash(stream_hash)
except NoSuchStreamHash:
blobs = []
elif sd_hash:
try:
blobs = yield self.get_blobs_for_sd_hash(sd_hash)
except NoSuchSDHash:
blobs = []
else:
blobs = self.session.blob_manager.blobs.itervalues()
if needed:
blobs = [blob for blob in blobs if not blob.is_validated()]
if finished:
blobs = [blob for blob in blobs if blob.is_validated()]
blob_hashes = [blob.blob_hash for blob in blobs]
page_size = page_size or len(blob_hashes)
page = page or 0
start_index = page * page_size
stop_index = start_index + page_size
blob_hashes_for_return = blob_hashes[start_index:stop_index]
response = yield self._render_response(blob_hashes_for_return)
defer.returnValue(response)
def jsonrpc_reflect_all_blobs(self): def jsonrpc_reflect_all_blobs(self):
""" """
@ -2262,12 +2441,15 @@ class Daemon(AuthJSONRPCServer):
d = self._render_response(self.session.blob_tracker.last_mean_availability) d = self._render_response(self.session.blob_tracker.last_mean_availability)
return d return d
def jsonrpc_get_availability(self, name): @defer.inlineCallbacks
def jsonrpc_get_availability(self, name, sd_timeout=None, peer_timeout=None):
""" """
Get stream availability for a winning claim Get stream availability for a winning claim
Arg: Arg:
name (str): lbry uri name (str): lbry uri
sd_timeout (int, optional): sd blob download timeout
peer_timeout (int, optional): how long to look for peers
Returns: Returns:
peers per blob / total blobs peers per blob / total blobs
@ -2283,17 +2465,38 @@ class Daemon(AuthJSONRPCServer):
else: else:
return 0.0 return 0.0
d = self._resolve_name(name, force_refresh=True) def read_sd_blob(sd_blob):
d.addCallback(get_sd_hash) sd_blob_file = sd_blob.open_for_reading()
d.addCallback(self._download_sd_blob) decoded_sd_blob = json.loads(sd_blob_file.read())
d.addCallbacks( sd_blob.close_read_handle(sd_blob_file)
lambda descriptor: [blob.get('blob_hash') for blob in descriptor['blobs']], return decoded_sd_blob
lambda _: [])
d.addCallback(self.session.blob_tracker.get_availability_for_blobs)
d.addCallback(_get_mean)
d.addCallback(lambda result: self._render_response(result))
return d metadata = yield self._resolve_name(name)
sd_hash = get_sd_hash(metadata)
sd_timeout = sd_timeout or conf.settings['sd_download_timeout']
peer_timeout = peer_timeout or conf.settings['peer_search_timeout']
blobs = []
try:
blobs = yield self.get_blobs_for_sd_hash(sd_hash)
need_sd_blob = False
log.info("Already have sd blob")
except NoSuchSDHash:
need_sd_blob = True
log.info("Need sd blob")
blob_hashes = [blob.blob_hash for blob in blobs]
if need_sd_blob:
# we don't want to use self._download_descriptor here because it would create a stream
sd_blob = yield self._download_blob(sd_hash, timeout=sd_timeout)
decoded = read_sd_blob(sd_blob)
blob_hashes = [blob.get("blob_hash") for blob in decoded['blobs']
if blob.get("blob_hash")]
sample = random.sample(blob_hashes, min(len(blob_hashes), 5))
log.info("check peers for %i of %i blobs in stream", len(sample), len(blob_hashes))
availabilities = yield self.session.blob_tracker.get_availability_for_blobs(sample,
peer_timeout)
mean_availability = _get_mean(availabilities)
response = yield self._render_response(mean_availability)
defer.returnValue(response)
def jsonrpc_get_start_notice(self): def jsonrpc_get_start_notice(self):
""" """
@ -2670,3 +2873,14 @@ def get_lbry_file_search_value(search_fields):
if value: if value:
return searchtype, value return searchtype, value
raise NoValidSearch('{} is missing a valid search type'.format(search_fields)) raise NoValidSearch('{} is missing a valid search type'.format(search_fields))
def get_blob_payment_rate_manager(session, payment_rate_manager=None):
if payment_rate_manager:
rate_managers = {
'only-free': OnlyFreePaymentsManager()
}
if payment_rate_manager in rate_managers:
payment_rate_manager = rate_managers[payment_rate_manager]
log.info("Downloading blob with rate manager: %s", payment_rate_manager)
return payment_rate_manager or session.payment_rate_manager