From 3ddf8680fb902b99092d95357d12e7e45e43d0de Mon Sep 17 00:00:00 2001 From: Jack Robison Date: Wed, 15 Feb 2017 23:39:17 -0500 Subject: [PATCH] blob commands refactor blob_get add descriptor_get add blob_delete refactor blob_list --- lbrynet/core/Error.py | 2 +- .../lbryfile/EncryptedFileMetadataManager.py | 23 +- .../EncryptedFileDownloader.py | 4 +- lbrynet/lbrylive/LiveStreamMetadataManager.py | 6 +- lbrynet/lbrynet_daemon/Daemon.py | 252 +++++++++++++++--- 5 files changed, 245 insertions(+), 42 deletions(-) diff --git a/lbrynet/core/Error.py b/lbrynet/core/Error.py index 79a982740..cd7959235 100644 --- a/lbrynet/core/Error.py +++ b/lbrynet/core/Error.py @@ -80,7 +80,7 @@ class NoSuchBlobError(Exception): pass -class NoSuchStreamHashError(Exception): +class NoSuchStreamHash(Exception): pass diff --git a/lbrynet/lbryfile/EncryptedFileMetadataManager.py b/lbrynet/lbryfile/EncryptedFileMetadataManager.py index e9b2e7335..883b3a49d 100644 --- a/lbrynet/lbryfile/EncryptedFileMetadataManager.py +++ b/lbrynet/lbryfile/EncryptedFileMetadataManager.py @@ -4,7 +4,7 @@ import os from twisted.internet import defer from twisted.python.failure import Failure from twisted.enterprise import adbapi -from lbrynet.core.Error import DuplicateStreamHashError, NoSuchStreamHashError +from lbrynet.core.Error import DuplicateStreamHashError, NoSuchStreamHash, NoSuchSDHash from lbrynet.core.sqlite_helpers import rerun_if_locked @@ -93,6 +93,9 @@ class DBEncryptedFileMetadataManager(object): def get_sd_blob_hashes_for_stream(self, stream_hash): return self._get_sd_blob_hashes_for_stream(stream_hash) + def get_stream_hash_for_sd_hash(self, sd_hash): + return self._get_stream_hash_for_sd_blob_hash(sd_hash) + def _open_db(self): # check_same_thread=False is solely to quiet a spurious error that appears to be due # to a bug in twisted, where the connection is closed by a different thread than the @@ -131,7 +134,7 @@ class DBEncryptedFileMetadataManager(object): d = self.db_conn.runQuery( "select stream_hash from lbry_files where stream_hash = ?", (stream_hash,)) d.addCallback( - lambda result: result[0][0] if result else Failure(NoSuchStreamHashError(stream_hash))) + lambda result: result[0][0] if result else Failure(NoSuchStreamHash(stream_hash))) def do_delete(transaction, s_h): transaction.execute("delete from lbry_files where stream_hash = ?", (s_h,)) @@ -166,7 +169,7 @@ class DBEncryptedFileMetadataManager(object): if res: return res[0] else: - raise NoSuchStreamHashError(stream_hash) + raise NoSuchStreamHash(stream_hash) d = self.db_conn.runQuery( "select key, stream_name, suggested_file_name from lbry_files where stream_hash = ?", @@ -257,6 +260,20 @@ class DBEncryptedFileMetadataManager(object): d.addCallback(lambda results: [r[0] for r in results]) return d + @rerun_if_locked + def _get_stream_hash_for_sd_blob_hash(self, sd_blob_hash): + def _handle_result(result): + if not result: + raise NoSuchSDHash(sd_blob_hash) + return result[0][0] + + log.debug("Looking up sd blob hashes for sd blob hash %s", str(sd_blob_hash)) + d = self.db_conn.runQuery( + "select stream_hash from lbry_file_descriptors where sd_blob_hash = ?", + (sd_blob_hash,)) + d.addCallback(_handle_result) + return d + class TempEncryptedFileMetadataManager(object): def __init__(self): diff --git a/lbrynet/lbryfilemanager/EncryptedFileDownloader.py b/lbrynet/lbryfilemanager/EncryptedFileDownloader.py index 3b3f27a39..ec2110807 100644 --- a/lbrynet/lbryfilemanager/EncryptedFileDownloader.py +++ b/lbrynet/lbryfilemanager/EncryptedFileDownloader.py @@ -7,7 +7,7 @@ from zope.interface import implements from twisted.internet import defer from lbrynet.core.client.StreamProgressManager import FullStreamProgressManager -from lbrynet.core.Error import NoSuchSDHash, NoSuchStreamHashError +from lbrynet.core.Error import NoSuchSDHash, NoSuchStreamHash from lbrynet.core.utils import short_hash from lbrynet.core.StreamDescriptor import StreamMetadata from lbrynet.lbryfile.client.EncryptedFileDownloader import EncryptedFileSaver @@ -111,7 +111,7 @@ class ManagedEncryptedFileDownloader(EncryptedFileSaver): if sd_hash: self.sd_hash = sd_hash[0] else: - raise NoSuchStreamHashError(self.stream_hash) + raise NoSuchStreamHash(self.stream_hash) stream_metadata = yield self.wallet.get_claim_metadata_for_sd_hash(self.sd_hash) if stream_metadata: name, txid, nout = stream_metadata diff --git a/lbrynet/lbrylive/LiveStreamMetadataManager.py b/lbrynet/lbrylive/LiveStreamMetadataManager.py index 76bb6f906..b809148cd 100644 --- a/lbrynet/lbrylive/LiveStreamMetadataManager.py +++ b/lbrynet/lbrylive/LiveStreamMetadataManager.py @@ -7,7 +7,7 @@ import sqlite3 from twisted.internet import defer from twisted.python.failure import Failure from lbrynet.core.server.DHTHashAnnouncer import DHTHashSupplier -from lbrynet.core.Error import DuplicateStreamHashError, NoSuchStreamHashError +from lbrynet.core.Error import DuplicateStreamHashError, NoSuchStreamHash from lbrynet.core.sqlite_helpers import rerun_if_locked @@ -150,7 +150,7 @@ class DBLiveStreamMetadataManager(DHTHashSupplier): def _delete_stream(self, stream_hash): d = self.db_conn.runQuery("select stream_hash from live_streams where stream_hash = ?", (stream_hash,)) - d.addCallback(lambda result: result[0][0] if len(result) else Failure(NoSuchStreamHashError(stream_hash))) + d.addCallback(lambda result: result[0][0] if len(result) else Failure(NoSuchStreamHash(stream_hash))) def do_delete(transaction, s_h): transaction.execute("delete from live_streams where stream_hash = ?", (s_h,)) @@ -183,7 +183,7 @@ class DBLiveStreamMetadataManager(DHTHashSupplier): def _get_stream_info(self, stream_hash): d = self.db_conn.runQuery("select public_key, key, stream_name from live_streams where stream_hash = ?", (stream_hash,)) - d.addCallback(lambda result: result[0] if len(result) else Failure(NoSuchStreamHashError(stream_hash))) + d.addCallback(lambda result: result[0] if len(result) else Failure(NoSuchStreamHash(stream_hash))) return d @rerun_if_locked diff --git a/lbrynet/lbrynet_daemon/Daemon.py b/lbrynet/lbrynet_daemon/Daemon.py index 1bf47a1a2..2589d3887 100644 --- a/lbrynet/lbrynet_daemon/Daemon.py +++ b/lbrynet/lbrynet_daemon/Daemon.py @@ -29,6 +29,7 @@ from lbrynet.metadata.Metadata import verify_name_characters from lbrynet.lbryfile.client.EncryptedFileDownloader import EncryptedFileSaverFactory from lbrynet.lbryfile.client.EncryptedFileDownloader import EncryptedFileOpenerFactory from lbrynet.lbryfile.client.EncryptedFileOptions import add_lbry_file_to_sd_identifier +from lbrynet.lbryfile.StreamDescriptor import save_sd_info from lbrynet.lbryfile.EncryptedFileMetadataManager import DBEncryptedFileMetadataManager from lbrynet.lbryfile.StreamDescriptor import EncryptedFileStreamType from lbrynet.lbryfilemanager.EncryptedFileManager import EncryptedFileManager @@ -37,16 +38,17 @@ from lbrynet.lbrynet_daemon.Downloader import GetStream from lbrynet.lbrynet_daemon.Publisher import Publisher from lbrynet.lbrynet_daemon.ExchangeRateManager import ExchangeRateManager from lbrynet.lbrynet_daemon.auth.server import AuthJSONRPCServer +from lbrynet.core.PaymentRateManager import OnlyFreePaymentsManager from lbrynet.core import log_support, utils, file_utils from lbrynet.core import system_info from lbrynet.core.StreamDescriptor import StreamDescriptorIdentifier, download_sd_blob -from lbrynet.core.StreamDescriptor import BlobStreamDescriptorReader from lbrynet.core.Session import Session from lbrynet.core.Wallet import LBRYumWallet, SqliteStorage from lbrynet.core.looping_call_manager import LoopingCallManager from lbrynet.core.server.BlobRequestHandler import BlobRequestHandlerFactory from lbrynet.core.server.ServerProtocol import ServerProtocolFactory from lbrynet.core.Error import InsufficientFundsError, UnknownNameError, NoSuchSDHash +from lbrynet.core.Error import NoSuchStreamHash log = logging.getLogger(__name__) @@ -736,26 +738,78 @@ class Daemon(AuthJSONRPCServer): EncryptedFileStreamType, file_opener_factory) return defer.succeed(None) - def _download_sd_blob(self, sd_hash, timeout=None): - timeout = timeout if timeout is not None else conf.settings['sd_download_timeout'] + def _download_sd_blob(self, sd_blob_hash, rate_manager=None, timeout=None): + """ + Download a sd blob and register it with the stream info manager + Use this when downloading a sd blob as part of a stream download - def cb(result): - if not r.called: - r.callback(result) + :param sd_blob_hash (str): sd blob hash + :param rate_manager (PaymentRateManager), optional: the payment rate manager to use, + defaults to session.payment_rate_manager + :param timeout (int): sd blob timeout + + :return: decoded sd blob + """ + timeout = timeout if timeout is not None else conf.settings['sd_download_timeout'] + rate_manager = rate_manager or self.session.payment_rate_manager + + def cb(sd_blob): + if not finished_d.called: + finished_d.callback(sd_blob) def eb(): - if not r.called: - log.error("sd blob download timed out: %s", sd_hash) - r.errback(Exception("sd timeout")) + if not finished_d.called: + finished_d.errback(Exception("Blob (%s) download timed out" % + sd_blob_hash[:SHORT_ID_LEN])) + + def save_sd_blob(sd_blob): + d = defer.succeed(read_sd_blob(sd_blob)) + d.addCallback(lambda decoded: save_sd_info(self.stream_info_manager, decoded)) + d.addCallback(self.stream_info_manager.save_sd_blob_hash_to_stream, sd_blob_hash) + d.addCallback(lambda _: sd_blob) + return d + + def read_sd_blob(sd_blob): + sd_blob_file = sd_blob.open_for_reading() + decoded_sd_blob = json.loads(sd_blob_file.read()) + sd_blob.close_read_handle(sd_blob_file) + return decoded_sd_blob + + finished_d = defer.Deferred() + finished_d.addCallback(save_sd_blob) - r = defer.Deferred(None) reactor.callLater(timeout, eb) - d = download_sd_blob(self.session, sd_hash, self.session.payment_rate_manager) - d.addErrback(log.fail(), "Error downloading sd blob: %s", sd_hash) - d.addCallback(BlobStreamDescriptorReader) - d.addCallback(lambda blob: blob.get_info()) + d = download_sd_blob(self.session, sd_blob_hash, rate_manager) d.addCallback(cb) - return r + return finished_d + + def _download_blob(self, blob_hash, rate_manager=None, timeout=None): + """ + Download a blob + + :param blob_hash (str): blob hash + :param rate_manager (PaymentRateManager), optional: the payment rate manager to use, + defaults to session.payment_rate_manager + :param timeout (int): blob timeout + :return: BlobFile + """ + + def cb(blob): + if not finished_d.called: + finished_d.callback(blob) + + def eb(): + if not finished_d.called: + finished_d.errback(Exception("Blob (%s) download timed out" % + blob_hash[:SHORT_ID_LEN])) + + rate_manager = rate_manager or self.session.payment_rate_manager + timeout = timeout or 30 + finished_d = defer.Deferred(None) + reactor.callLater(timeout, eb) + d = download_sd_blob(self.session, blob_hash, rate_manager) + d.addCallback(cb) + return finished_d @defer.inlineCallbacks def _download_name(self, name, timeout=None, download_directory=None, @@ -1018,6 +1072,26 @@ class Daemon(AuthJSONRPCServer): ]) return d + def get_blobs_for_stream_hash(self, stream_hash): + def _iter_blobs(blob_hashes): + for blob_hash, blob_num, blob_iv, blob_length in blob_hashes: + if blob_hash: + yield self.session.blob_manager.get_blob(blob_hash, length=blob_length) + + def _get_blobs(blob_hashes): + dl = defer.DeferredList(list(_iter_blobs(blob_hashes)), consumeErrors=True) + dl.addCallback(lambda blobs: [blob[1] for blob in blobs if blob[0]]) + return dl + + d = self.stream_info_manager.get_blobs_for_stream(stream_hash) + d.addCallback(_get_blobs) + return d + + def get_blobs_for_sd_hash(self, sd_hash): + d = self.stream_info_manager.get_stream_hash_for_sd_hash(sd_hash) + d.addCallback(self.get_blobs_for_stream_hash) + return d + ############################################################################ # # # JSON-RPC API methods start here # @@ -1308,7 +1382,6 @@ class Daemon(AuthJSONRPCServer): ) return self._render_response(textwrap.dedent(fn.__doc__)) - def jsonrpc_commands(self): """ Return a list of available commands @@ -2002,28 +2075,94 @@ class Daemon(AuthJSONRPCServer): @AuthJSONRPCServer.auth_required def jsonrpc_download_descriptor(self, **kwargs): """ - DEPRECATED. Use `blob_get` instead + DEPRECATED. Use `descriptor_get` instead """ - return self.jsonrpc_blob_get(**kwargs) + return self.jsonrpc_descriptor_get(**kwargs) @AuthJSONRPCServer.auth_required - def jsonrpc_blob_get(self, sd_hash, timeout=None): + @defer.inlineCallbacks + def jsonrpc_descriptor_get(self, sd_hash, timeout=None, payment_rate_manager=None): """ Download and return a sd blob Args: sd_hash timeout (optional) + payment_rate_manager (optional): if not given the default payment rate manager + will be used. supported alternative rate managers: + only-free + Returns - sd blob, dict + Success/Fail message or decoded data """ - if timeout is None: - timeout = conf.settings['sd_download_timeout'] - d = self._download_sd_blob(sd_hash, timeout) - d.addCallbacks( - lambda r: self._render_response(r), - lambda _: self._render_response(False)) - return d + + payment_rate_manager = get_blob_payment_rate_manager(self.session, payment_rate_manager) + decoded_sd_blob = yield self._download_sd_blob(sd_hash, payment_rate_manager, + timeout=timeout) + result = yield self._render_response(decoded_sd_blob) + defer.returnValue(result) + + @AuthJSONRPCServer.auth_required + @defer.inlineCallbacks + def jsonrpc_blob_get(self, blob_hash, timeout=None, encoding=None, payment_rate_manager=None): + """ + Download and return a blob + + Args: + blob_hash + timeout (optional) + encoding (optional): by default no attempt at decoding is made + can be set to one of the following decoders: + json + payment_rate_manager (optional): if not given the default payment rate manager + will be used. supported alternative rate managers: + only-free + + Returns + Success/Fail message or decoded data + """ + + decoders = { + 'json': json.loads + } + + timeout = timeout or 30 + payment_rate_manager = get_blob_payment_rate_manager(self.session, payment_rate_manager) + blob = yield self._download_blob(blob_hash, rate_manager=payment_rate_manager, + timeout=timeout) + if encoding and encoding in decoders: + blob_file = blob.open_for_reading() + result = decoders[encoding](blob_file.read()) + blob.close_read_handle(blob_file) + else: + result = "Downloaded blob %s" % blob_hash + + response = yield self._render_response(result) + defer.returnValue(response) + + @AuthJSONRPCServer.auth_required + @defer.inlineCallbacks + def jsonrpc_blob_delete(self, blob_hash): + """ + Delete a blob + + Args: + blob_hash + Returns: + Success/fail message + """ + + if blob_hash not in self.session.blob_manager.blobs: + response = yield self._render_response("Don't have that blob") + defer.returnValue(response) + try: + stream_hash = yield self.stream_info_manager.get_stream_hash_for_sd_hash(blob_hash) + yield self.stream_info_manager.delete_stream(stream_hash) + except Exception as err: + pass + yield self.session.blob_manager.delete_blobs([blob_hash]) + response = yield self._render_response("Deleted %s" % blob_hash) + defer.returnValue(response) def jsonrpc_get_nametrie(self): """ @@ -2219,19 +2358,54 @@ class Daemon(AuthJSONRPCServer): """ return self.jsonrpc_blob_list() - def jsonrpc_blob_list(self): + @defer.inlineCallbacks + def jsonrpc_blob_list(self, uri=None, stream_hash=None, sd_hash=None, needed=None, + finished=None, page_size=None, page=None): """ - Returns all blob hashes + Returns blob hashes, if not given filters returns all blobs known by the blob manager Args: - None + uri (str, optional): filter by blobs in stream for winning claim + stream_hash (str, optional): filter by blobs in given stream hash + sd_hash (str, optional): filter by blobs in given sd hash + needed (bool, optional): only return needed blobs + finished (bool, optional): only return finished blobs + page_size (int, optional): limit number of results returned + page (int, optional): filter to page x of [page_size] results Returns: list of blob hashes """ - d = self.session.blob_manager.get_all_verified_blobs() - d.addCallback(lambda r: self._render_response(r)) - return d + if uri: + metadata = yield self._resolve_name(uri) + sd_hash = get_sd_hash(metadata) + blobs = yield self.get_blobs_for_sd_hash(sd_hash) + elif stream_hash: + try: + blobs = yield self.get_blobs_for_stream_hash(stream_hash) + except NoSuchStreamHash: + blobs = [] + elif sd_hash: + try: + blobs = yield self.get_blobs_for_sd_hash(sd_hash) + except NoSuchSDHash: + blobs = [] + else: + blobs = self.session.blob_manager.blobs.itervalues() + + if needed: + blobs = [blob for blob in blobs if not blob.is_validated()] + if finished: + blobs = [blob for blob in blobs if blob.is_validated()] + + blob_hashes = [blob.blob_hash for blob in blobs] + page_size = page_size or len(blob_hashes) + page = page or 0 + start_index = page * page_size + stop_index = start_index + page_size + blob_hashes_for_return = blob_hashes[start_index:stop_index] + response = yield self._render_response(blob_hashes_for_return) + defer.returnValue(response) def jsonrpc_reflect_all_blobs(self): """ @@ -2295,6 +2469,7 @@ class Daemon(AuthJSONRPCServer): sd_blob_file = sd_blob.open_for_reading() decoded_sd_blob = json.loads(sd_blob_file.read()) sd_blob.close_read_handle(sd_blob_file) + return decoded_sd_blob metadata = yield self._resolve_name(name) sd_hash = get_sd_hash(metadata) @@ -2698,3 +2873,14 @@ def get_lbry_file_search_value(search_fields): if value: return searchtype, value raise NoValidSearch('{} is missing a valid search type'.format(search_fields)) + + +def get_blob_payment_rate_manager(session, payment_rate_manager=None): + if payment_rate_manager: + rate_managers = { + 'only-free': OnlyFreePaymentsManager() + } + if payment_rate_manager in rate_managers: + payment_rate_manager = rate_managers[payment_rate_manager] + log.info("Downloading blob with rate manager: %s", payment_rate_manager) + return payment_rate_manager or session.payment_rate_manager