blob commands

refactor blob_get
add descriptor_get
add blob_delete
refactor blob_list
This commit is contained in:
Jack Robison 2017-02-15 23:39:17 -05:00
parent 25ec8fde23
commit 3ddf8680fb
5 changed files with 245 additions and 42 deletions

View file

@ -80,7 +80,7 @@ class NoSuchBlobError(Exception):
pass pass
class NoSuchStreamHashError(Exception): class NoSuchStreamHash(Exception):
pass pass

View file

@ -4,7 +4,7 @@ import os
from twisted.internet import defer from twisted.internet import defer
from twisted.python.failure import Failure from twisted.python.failure import Failure
from twisted.enterprise import adbapi from twisted.enterprise import adbapi
from lbrynet.core.Error import DuplicateStreamHashError, NoSuchStreamHashError from lbrynet.core.Error import DuplicateStreamHashError, NoSuchStreamHash, NoSuchSDHash
from lbrynet.core.sqlite_helpers import rerun_if_locked from lbrynet.core.sqlite_helpers import rerun_if_locked
@ -93,6 +93,9 @@ class DBEncryptedFileMetadataManager(object):
def get_sd_blob_hashes_for_stream(self, stream_hash): def get_sd_blob_hashes_for_stream(self, stream_hash):
return self._get_sd_blob_hashes_for_stream(stream_hash) return self._get_sd_blob_hashes_for_stream(stream_hash)
def get_stream_hash_for_sd_hash(self, sd_hash):
return self._get_stream_hash_for_sd_blob_hash(sd_hash)
def _open_db(self): def _open_db(self):
# check_same_thread=False is solely to quiet a spurious error that appears to be due # check_same_thread=False is solely to quiet a spurious error that appears to be due
# to a bug in twisted, where the connection is closed by a different thread than the # to a bug in twisted, where the connection is closed by a different thread than the
@ -131,7 +134,7 @@ class DBEncryptedFileMetadataManager(object):
d = self.db_conn.runQuery( d = self.db_conn.runQuery(
"select stream_hash from lbry_files where stream_hash = ?", (stream_hash,)) "select stream_hash from lbry_files where stream_hash = ?", (stream_hash,))
d.addCallback( d.addCallback(
lambda result: result[0][0] if result else Failure(NoSuchStreamHashError(stream_hash))) lambda result: result[0][0] if result else Failure(NoSuchStreamHash(stream_hash)))
def do_delete(transaction, s_h): def do_delete(transaction, s_h):
transaction.execute("delete from lbry_files where stream_hash = ?", (s_h,)) transaction.execute("delete from lbry_files where stream_hash = ?", (s_h,))
@ -166,7 +169,7 @@ class DBEncryptedFileMetadataManager(object):
if res: if res:
return res[0] return res[0]
else: else:
raise NoSuchStreamHashError(stream_hash) raise NoSuchStreamHash(stream_hash)
d = self.db_conn.runQuery( d = self.db_conn.runQuery(
"select key, stream_name, suggested_file_name from lbry_files where stream_hash = ?", "select key, stream_name, suggested_file_name from lbry_files where stream_hash = ?",
@ -257,6 +260,20 @@ class DBEncryptedFileMetadataManager(object):
d.addCallback(lambda results: [r[0] for r in results]) d.addCallback(lambda results: [r[0] for r in results])
return d return d
@rerun_if_locked
def _get_stream_hash_for_sd_blob_hash(self, sd_blob_hash):
def _handle_result(result):
if not result:
raise NoSuchSDHash(sd_blob_hash)
return result[0][0]
log.debug("Looking up sd blob hashes for sd blob hash %s", str(sd_blob_hash))
d = self.db_conn.runQuery(
"select stream_hash from lbry_file_descriptors where sd_blob_hash = ?",
(sd_blob_hash,))
d.addCallback(_handle_result)
return d
class TempEncryptedFileMetadataManager(object): class TempEncryptedFileMetadataManager(object):
def __init__(self): def __init__(self):

View file

@ -7,7 +7,7 @@ from zope.interface import implements
from twisted.internet import defer from twisted.internet import defer
from lbrynet.core.client.StreamProgressManager import FullStreamProgressManager from lbrynet.core.client.StreamProgressManager import FullStreamProgressManager
from lbrynet.core.Error import NoSuchSDHash, NoSuchStreamHashError from lbrynet.core.Error import NoSuchSDHash, NoSuchStreamHash
from lbrynet.core.utils import short_hash from lbrynet.core.utils import short_hash
from lbrynet.core.StreamDescriptor import StreamMetadata from lbrynet.core.StreamDescriptor import StreamMetadata
from lbrynet.lbryfile.client.EncryptedFileDownloader import EncryptedFileSaver from lbrynet.lbryfile.client.EncryptedFileDownloader import EncryptedFileSaver
@ -111,7 +111,7 @@ class ManagedEncryptedFileDownloader(EncryptedFileSaver):
if sd_hash: if sd_hash:
self.sd_hash = sd_hash[0] self.sd_hash = sd_hash[0]
else: else:
raise NoSuchStreamHashError(self.stream_hash) raise NoSuchStreamHash(self.stream_hash)
stream_metadata = yield self.wallet.get_claim_metadata_for_sd_hash(self.sd_hash) stream_metadata = yield self.wallet.get_claim_metadata_for_sd_hash(self.sd_hash)
if stream_metadata: if stream_metadata:
name, txid, nout = stream_metadata name, txid, nout = stream_metadata

View file

@ -7,7 +7,7 @@ import sqlite3
from twisted.internet import defer from twisted.internet import defer
from twisted.python.failure import Failure from twisted.python.failure import Failure
from lbrynet.core.server.DHTHashAnnouncer import DHTHashSupplier from lbrynet.core.server.DHTHashAnnouncer import DHTHashSupplier
from lbrynet.core.Error import DuplicateStreamHashError, NoSuchStreamHashError from lbrynet.core.Error import DuplicateStreamHashError, NoSuchStreamHash
from lbrynet.core.sqlite_helpers import rerun_if_locked from lbrynet.core.sqlite_helpers import rerun_if_locked
@ -150,7 +150,7 @@ class DBLiveStreamMetadataManager(DHTHashSupplier):
def _delete_stream(self, stream_hash): def _delete_stream(self, stream_hash):
d = self.db_conn.runQuery("select stream_hash from live_streams where stream_hash = ?", (stream_hash,)) d = self.db_conn.runQuery("select stream_hash from live_streams where stream_hash = ?", (stream_hash,))
d.addCallback(lambda result: result[0][0] if len(result) else Failure(NoSuchStreamHashError(stream_hash))) d.addCallback(lambda result: result[0][0] if len(result) else Failure(NoSuchStreamHash(stream_hash)))
def do_delete(transaction, s_h): def do_delete(transaction, s_h):
transaction.execute("delete from live_streams where stream_hash = ?", (s_h,)) transaction.execute("delete from live_streams where stream_hash = ?", (s_h,))
@ -183,7 +183,7 @@ class DBLiveStreamMetadataManager(DHTHashSupplier):
def _get_stream_info(self, stream_hash): def _get_stream_info(self, stream_hash):
d = self.db_conn.runQuery("select public_key, key, stream_name from live_streams where stream_hash = ?", d = self.db_conn.runQuery("select public_key, key, stream_name from live_streams where stream_hash = ?",
(stream_hash,)) (stream_hash,))
d.addCallback(lambda result: result[0] if len(result) else Failure(NoSuchStreamHashError(stream_hash))) d.addCallback(lambda result: result[0] if len(result) else Failure(NoSuchStreamHash(stream_hash)))
return d return d
@rerun_if_locked @rerun_if_locked

View file

@ -29,6 +29,7 @@ from lbrynet.metadata.Metadata import verify_name_characters
from lbrynet.lbryfile.client.EncryptedFileDownloader import EncryptedFileSaverFactory from lbrynet.lbryfile.client.EncryptedFileDownloader import EncryptedFileSaverFactory
from lbrynet.lbryfile.client.EncryptedFileDownloader import EncryptedFileOpenerFactory from lbrynet.lbryfile.client.EncryptedFileDownloader import EncryptedFileOpenerFactory
from lbrynet.lbryfile.client.EncryptedFileOptions import add_lbry_file_to_sd_identifier from lbrynet.lbryfile.client.EncryptedFileOptions import add_lbry_file_to_sd_identifier
from lbrynet.lbryfile.StreamDescriptor import save_sd_info
from lbrynet.lbryfile.EncryptedFileMetadataManager import DBEncryptedFileMetadataManager from lbrynet.lbryfile.EncryptedFileMetadataManager import DBEncryptedFileMetadataManager
from lbrynet.lbryfile.StreamDescriptor import EncryptedFileStreamType from lbrynet.lbryfile.StreamDescriptor import EncryptedFileStreamType
from lbrynet.lbryfilemanager.EncryptedFileManager import EncryptedFileManager from lbrynet.lbryfilemanager.EncryptedFileManager import EncryptedFileManager
@ -37,16 +38,17 @@ from lbrynet.lbrynet_daemon.Downloader import GetStream
from lbrynet.lbrynet_daemon.Publisher import Publisher from lbrynet.lbrynet_daemon.Publisher import Publisher
from lbrynet.lbrynet_daemon.ExchangeRateManager import ExchangeRateManager from lbrynet.lbrynet_daemon.ExchangeRateManager import ExchangeRateManager
from lbrynet.lbrynet_daemon.auth.server import AuthJSONRPCServer from lbrynet.lbrynet_daemon.auth.server import AuthJSONRPCServer
from lbrynet.core.PaymentRateManager import OnlyFreePaymentsManager
from lbrynet.core import log_support, utils, file_utils from lbrynet.core import log_support, utils, file_utils
from lbrynet.core import system_info from lbrynet.core import system_info
from lbrynet.core.StreamDescriptor import StreamDescriptorIdentifier, download_sd_blob from lbrynet.core.StreamDescriptor import StreamDescriptorIdentifier, download_sd_blob
from lbrynet.core.StreamDescriptor import BlobStreamDescriptorReader
from lbrynet.core.Session import Session from lbrynet.core.Session import Session
from lbrynet.core.Wallet import LBRYumWallet, SqliteStorage from lbrynet.core.Wallet import LBRYumWallet, SqliteStorage
from lbrynet.core.looping_call_manager import LoopingCallManager from lbrynet.core.looping_call_manager import LoopingCallManager
from lbrynet.core.server.BlobRequestHandler import BlobRequestHandlerFactory from lbrynet.core.server.BlobRequestHandler import BlobRequestHandlerFactory
from lbrynet.core.server.ServerProtocol import ServerProtocolFactory from lbrynet.core.server.ServerProtocol import ServerProtocolFactory
from lbrynet.core.Error import InsufficientFundsError, UnknownNameError, NoSuchSDHash from lbrynet.core.Error import InsufficientFundsError, UnknownNameError, NoSuchSDHash
from lbrynet.core.Error import NoSuchStreamHash
log = logging.getLogger(__name__) log = logging.getLogger(__name__)
@ -736,26 +738,78 @@ class Daemon(AuthJSONRPCServer):
EncryptedFileStreamType, file_opener_factory) EncryptedFileStreamType, file_opener_factory)
return defer.succeed(None) return defer.succeed(None)
def _download_sd_blob(self, sd_hash, timeout=None): def _download_sd_blob(self, sd_blob_hash, rate_manager=None, timeout=None):
timeout = timeout if timeout is not None else conf.settings['sd_download_timeout'] """
Download a sd blob and register it with the stream info manager
Use this when downloading a sd blob as part of a stream download
def cb(result): :param sd_blob_hash (str): sd blob hash
if not r.called: :param rate_manager (PaymentRateManager), optional: the payment rate manager to use,
r.callback(result) defaults to session.payment_rate_manager
:param timeout (int): sd blob timeout
:return: decoded sd blob
"""
timeout = timeout if timeout is not None else conf.settings['sd_download_timeout']
rate_manager = rate_manager or self.session.payment_rate_manager
def cb(sd_blob):
if not finished_d.called:
finished_d.callback(sd_blob)
def eb(): def eb():
if not r.called: if not finished_d.called:
log.error("sd blob download timed out: %s", sd_hash) finished_d.errback(Exception("Blob (%s) download timed out" %
r.errback(Exception("sd timeout")) sd_blob_hash[:SHORT_ID_LEN]))
def save_sd_blob(sd_blob):
d = defer.succeed(read_sd_blob(sd_blob))
d.addCallback(lambda decoded: save_sd_info(self.stream_info_manager, decoded))
d.addCallback(self.stream_info_manager.save_sd_blob_hash_to_stream, sd_blob_hash)
d.addCallback(lambda _: sd_blob)
return d
def read_sd_blob(sd_blob):
sd_blob_file = sd_blob.open_for_reading()
decoded_sd_blob = json.loads(sd_blob_file.read())
sd_blob.close_read_handle(sd_blob_file)
return decoded_sd_blob
finished_d = defer.Deferred()
finished_d.addCallback(save_sd_blob)
r = defer.Deferred(None)
reactor.callLater(timeout, eb) reactor.callLater(timeout, eb)
d = download_sd_blob(self.session, sd_hash, self.session.payment_rate_manager) d = download_sd_blob(self.session, sd_blob_hash, rate_manager)
d.addErrback(log.fail(), "Error downloading sd blob: %s", sd_hash)
d.addCallback(BlobStreamDescriptorReader)
d.addCallback(lambda blob: blob.get_info())
d.addCallback(cb) d.addCallback(cb)
return r return finished_d
def _download_blob(self, blob_hash, rate_manager=None, timeout=None):
"""
Download a blob
:param blob_hash (str): blob hash
:param rate_manager (PaymentRateManager), optional: the payment rate manager to use,
defaults to session.payment_rate_manager
:param timeout (int): blob timeout
:return: BlobFile
"""
def cb(blob):
if not finished_d.called:
finished_d.callback(blob)
def eb():
if not finished_d.called:
finished_d.errback(Exception("Blob (%s) download timed out" %
blob_hash[:SHORT_ID_LEN]))
rate_manager = rate_manager or self.session.payment_rate_manager
timeout = timeout or 30
finished_d = defer.Deferred(None)
reactor.callLater(timeout, eb)
d = download_sd_blob(self.session, blob_hash, rate_manager)
d.addCallback(cb)
return finished_d
@defer.inlineCallbacks @defer.inlineCallbacks
def _download_name(self, name, timeout=None, download_directory=None, def _download_name(self, name, timeout=None, download_directory=None,
@ -1018,6 +1072,26 @@ class Daemon(AuthJSONRPCServer):
]) ])
return d return d
def get_blobs_for_stream_hash(self, stream_hash):
def _iter_blobs(blob_hashes):
for blob_hash, blob_num, blob_iv, blob_length in blob_hashes:
if blob_hash:
yield self.session.blob_manager.get_blob(blob_hash, length=blob_length)
def _get_blobs(blob_hashes):
dl = defer.DeferredList(list(_iter_blobs(blob_hashes)), consumeErrors=True)
dl.addCallback(lambda blobs: [blob[1] for blob in blobs if blob[0]])
return dl
d = self.stream_info_manager.get_blobs_for_stream(stream_hash)
d.addCallback(_get_blobs)
return d
def get_blobs_for_sd_hash(self, sd_hash):
d = self.stream_info_manager.get_stream_hash_for_sd_hash(sd_hash)
d.addCallback(self.get_blobs_for_stream_hash)
return d
############################################################################ ############################################################################
# # # #
# JSON-RPC API methods start here # # JSON-RPC API methods start here #
@ -1308,7 +1382,6 @@ class Daemon(AuthJSONRPCServer):
) )
return self._render_response(textwrap.dedent(fn.__doc__)) return self._render_response(textwrap.dedent(fn.__doc__))
def jsonrpc_commands(self): def jsonrpc_commands(self):
""" """
Return a list of available commands Return a list of available commands
@ -2002,28 +2075,94 @@ class Daemon(AuthJSONRPCServer):
@AuthJSONRPCServer.auth_required @AuthJSONRPCServer.auth_required
def jsonrpc_download_descriptor(self, **kwargs): def jsonrpc_download_descriptor(self, **kwargs):
""" """
DEPRECATED. Use `blob_get` instead DEPRECATED. Use `descriptor_get` instead
""" """
return self.jsonrpc_blob_get(**kwargs) return self.jsonrpc_descriptor_get(**kwargs)
@AuthJSONRPCServer.auth_required @AuthJSONRPCServer.auth_required
def jsonrpc_blob_get(self, sd_hash, timeout=None): @defer.inlineCallbacks
def jsonrpc_descriptor_get(self, sd_hash, timeout=None, payment_rate_manager=None):
""" """
Download and return a sd blob Download and return a sd blob
Args: Args:
sd_hash sd_hash
timeout (optional) timeout (optional)
payment_rate_manager (optional): if not given the default payment rate manager
will be used. supported alternative rate managers:
only-free
Returns Returns
sd blob, dict Success/Fail message or decoded data
""" """
if timeout is None:
timeout = conf.settings['sd_download_timeout'] payment_rate_manager = get_blob_payment_rate_manager(self.session, payment_rate_manager)
d = self._download_sd_blob(sd_hash, timeout) decoded_sd_blob = yield self._download_sd_blob(sd_hash, payment_rate_manager,
d.addCallbacks( timeout=timeout)
lambda r: self._render_response(r), result = yield self._render_response(decoded_sd_blob)
lambda _: self._render_response(False)) defer.returnValue(result)
return d
@AuthJSONRPCServer.auth_required
@defer.inlineCallbacks
def jsonrpc_blob_get(self, blob_hash, timeout=None, encoding=None, payment_rate_manager=None):
"""
Download and return a blob
Args:
blob_hash
timeout (optional)
encoding (optional): by default no attempt at decoding is made
can be set to one of the following decoders:
json
payment_rate_manager (optional): if not given the default payment rate manager
will be used. supported alternative rate managers:
only-free
Returns
Success/Fail message or decoded data
"""
decoders = {
'json': json.loads
}
timeout = timeout or 30
payment_rate_manager = get_blob_payment_rate_manager(self.session, payment_rate_manager)
blob = yield self._download_blob(blob_hash, rate_manager=payment_rate_manager,
timeout=timeout)
if encoding and encoding in decoders:
blob_file = blob.open_for_reading()
result = decoders[encoding](blob_file.read())
blob.close_read_handle(blob_file)
else:
result = "Downloaded blob %s" % blob_hash
response = yield self._render_response(result)
defer.returnValue(response)
@AuthJSONRPCServer.auth_required
@defer.inlineCallbacks
def jsonrpc_blob_delete(self, blob_hash):
"""
Delete a blob
Args:
blob_hash
Returns:
Success/fail message
"""
if blob_hash not in self.session.blob_manager.blobs:
response = yield self._render_response("Don't have that blob")
defer.returnValue(response)
try:
stream_hash = yield self.stream_info_manager.get_stream_hash_for_sd_hash(blob_hash)
yield self.stream_info_manager.delete_stream(stream_hash)
except Exception as err:
pass
yield self.session.blob_manager.delete_blobs([blob_hash])
response = yield self._render_response("Deleted %s" % blob_hash)
defer.returnValue(response)
def jsonrpc_get_nametrie(self): def jsonrpc_get_nametrie(self):
""" """
@ -2219,19 +2358,54 @@ class Daemon(AuthJSONRPCServer):
""" """
return self.jsonrpc_blob_list() return self.jsonrpc_blob_list()
def jsonrpc_blob_list(self): @defer.inlineCallbacks
def jsonrpc_blob_list(self, uri=None, stream_hash=None, sd_hash=None, needed=None,
finished=None, page_size=None, page=None):
""" """
Returns all blob hashes Returns blob hashes, if not given filters returns all blobs known by the blob manager
Args: Args:
None uri (str, optional): filter by blobs in stream for winning claim
stream_hash (str, optional): filter by blobs in given stream hash
sd_hash (str, optional): filter by blobs in given sd hash
needed (bool, optional): only return needed blobs
finished (bool, optional): only return finished blobs
page_size (int, optional): limit number of results returned
page (int, optional): filter to page x of [page_size] results
Returns: Returns:
list of blob hashes list of blob hashes
""" """
d = self.session.blob_manager.get_all_verified_blobs() if uri:
d.addCallback(lambda r: self._render_response(r)) metadata = yield self._resolve_name(uri)
return d sd_hash = get_sd_hash(metadata)
blobs = yield self.get_blobs_for_sd_hash(sd_hash)
elif stream_hash:
try:
blobs = yield self.get_blobs_for_stream_hash(stream_hash)
except NoSuchStreamHash:
blobs = []
elif sd_hash:
try:
blobs = yield self.get_blobs_for_sd_hash(sd_hash)
except NoSuchSDHash:
blobs = []
else:
blobs = self.session.blob_manager.blobs.itervalues()
if needed:
blobs = [blob for blob in blobs if not blob.is_validated()]
if finished:
blobs = [blob for blob in blobs if blob.is_validated()]
blob_hashes = [blob.blob_hash for blob in blobs]
page_size = page_size or len(blob_hashes)
page = page or 0
start_index = page * page_size
stop_index = start_index + page_size
blob_hashes_for_return = blob_hashes[start_index:stop_index]
response = yield self._render_response(blob_hashes_for_return)
defer.returnValue(response)
def jsonrpc_reflect_all_blobs(self): def jsonrpc_reflect_all_blobs(self):
""" """
@ -2295,6 +2469,7 @@ class Daemon(AuthJSONRPCServer):
sd_blob_file = sd_blob.open_for_reading() sd_blob_file = sd_blob.open_for_reading()
decoded_sd_blob = json.loads(sd_blob_file.read()) decoded_sd_blob = json.loads(sd_blob_file.read())
sd_blob.close_read_handle(sd_blob_file) sd_blob.close_read_handle(sd_blob_file)
return decoded_sd_blob
metadata = yield self._resolve_name(name) metadata = yield self._resolve_name(name)
sd_hash = get_sd_hash(metadata) sd_hash = get_sd_hash(metadata)
@ -2698,3 +2873,14 @@ def get_lbry_file_search_value(search_fields):
if value: if value:
return searchtype, value return searchtype, value
raise NoValidSearch('{} is missing a valid search type'.format(search_fields)) raise NoValidSearch('{} is missing a valid search type'.format(search_fields))
def get_blob_payment_rate_manager(session, payment_rate_manager=None):
if payment_rate_manager:
rate_managers = {
'only-free': OnlyFreePaymentsManager()
}
if payment_rate_manager in rate_managers:
payment_rate_manager = rate_managers[payment_rate_manager]
log.info("Downloading blob with rate manager: %s", payment_rate_manager)
return payment_rate_manager or session.payment_rate_manager