Merge branch 'use-storage-functions'
commit a01dd0d21d
2 changed files with 28 additions and 58 deletions
@@ -22,7 +22,7 @@ at anytime.
 * handling decryption error for blobs encrypted with an invalid key
 * handling stream with no data blob (https://github.com/lbryio/lbry/issues/905)
 * fetching the external ip
-* `blob_list` failing with --uri parameter (https://github.com/lbryio/lbry/issues/895)
+* `blob_list` returning an error with --uri parameter and incorrectly returning `[]` for streams where blobs are known (https://github.com/lbryio/lbry/issues/895)
 * `get` failing with a non-useful error message when given a uri for a channel claim
 * exception checking in several wallet unit tests
 * daemon not erring properly for non-numeric values being passed to the `bid` parameter for the `publish` method
@@ -45,8 +45,8 @@ from lbrynet.core.Wallet import LBRYumWallet, ClaimOutpoint
 from lbrynet.core.looping_call_manager import LoopingCallManager
 from lbrynet.core.server.BlobRequestHandler import BlobRequestHandlerFactory
 from lbrynet.core.server.ServerProtocol import ServerProtocolFactory
-from lbrynet.core.Error import InsufficientFundsError, UnknownNameError, NoSuchSDHash
-from lbrynet.core.Error import NoSuchStreamHash, DownloadDataTimeout, DownloadSDTimeout
+from lbrynet.core.Error import InsufficientFundsError, UnknownNameError
+from lbrynet.core.Error import DownloadDataTimeout, DownloadSDTimeout
 from lbrynet.core.Error import NullFundsError, NegativeFundsError
 from lbrynet.core.Peer import Peer
 from lbrynet.core.SinglePeerDownloader import SinglePeerDownloader
@@ -161,16 +161,6 @@ class AlwaysSend(object):
         return d


-# If an instance has a lot of blobs, this call might get very expensive.
-# For reflector, with 50k blobs, it definitely has an impact on the first run
-# But doesn't seem to impact performance after that.
-@defer.inlineCallbacks
-def calculate_available_blob_size(blob_manager):
-    blob_hashes = yield blob_manager.get_all_verified_blobs()
-    blobs = yield defer.DeferredList([blob_manager.get_blob(b) for b in blob_hashes])
-    defer.returnValue(sum(b.length for success, b in blobs if success and b.length))
-
-
 class Daemon(AuthJSONRPCServer):
     """
     LBRYnet daemon, a jsonrpc interface to lbry functions
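The hunk above drops the unused module-level calculate_available_blob_size() helper. For reference, a minimal sketch of an equivalent helper a caller could keep locally; it is not part of this commit, the function name is hypothetical, and it uses only the blob manager calls the deleted code already relied on (get_all_verified_blobs, get_blob):

from twisted.internet import defer

@defer.inlineCallbacks
def available_blob_size(blob_manager):
    # hashes of blobs that are fully written and verified on disk
    blob_hashes = yield blob_manager.get_all_verified_blobs()
    # fetch the blob objects; DeferredList yields (success, result) pairs
    blobs = yield defer.DeferredList([blob_manager.get_blob(b) for b in blob_hashes])
    # total the lengths of blobs that resolved and report a known length
    defer.returnValue(sum(b.length for success, b in blobs if success and b.length))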
@@ -955,27 +945,6 @@ class Daemon(AuthJSONRPCServer):
         log.debug("Collected %i lbry files", len(lbry_files))
         defer.returnValue(lbry_files)

-    # TODO: do this and get_blobs_for_sd_hash in the stream info manager
-    def get_blobs_for_stream_hash(self, stream_hash):
-        def _iter_blobs(blob_hashes):
-            for blob_hash, blob_num, blob_iv, blob_length in blob_hashes:
-                if blob_hash:
-                    yield self.session.blob_manager.get_blob(blob_hash, length=blob_length)
-
-        def _get_blobs(blob_hashes):
-            dl = defer.DeferredList(list(_iter_blobs(blob_hashes)), consumeErrors=True)
-            dl.addCallback(lambda blobs: [blob[1] for blob in blobs if blob[0]])
-            return dl
-
-        d = self.session.storage.get_blobs_for_stream(stream_hash)
-        d.addCallback(_get_blobs)
-        return d
-
-    def get_blobs_for_sd_hash(self, sd_hash):
-        d = self.session.storage.get_stream_hash_for_sd_hash(sd_hash)
-        d.addCallback(self.get_blobs_for_stream_hash)
-        return d
-
     def _get_single_peer_downloader(self):
         downloader = SinglePeerDownloader()
         downloader.setup(self.session.wallet)
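With the Daemon-level get_blobs_for_stream_hash/get_blobs_for_sd_hash helpers removed above, callers go through session.storage directly. A rough sketch of that pattern with inlineCallbacks, using only calls that appear elsewhere in this diff (storage.get_blobs_for_stream, storage.get_stream_hash_for_sd_hash, blob.get_is_verified); the helper names here are hypothetical:

from twisted.internet import defer

@defer.inlineCallbacks
def verified_blobs_for_stream(session, stream_hash):
    # storage returns the stream's data blobs (the sd blob is not included)
    blobs = yield session.storage.get_blobs_for_stream(stream_hash)
    defer.returnValue([blob for blob in blobs if blob.get_is_verified()])

@defer.inlineCallbacks
def verified_blobs_for_sd_hash(session, sd_hash):
    # resolve the stream hash first, then reuse the stream lookup
    stream_hash = yield session.storage.get_stream_hash_for_sd_hash(sd_hash)
    blobs = yield verified_blobs_for_stream(session, stream_hash)
    defer.returnValue(blobs)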
@@ -2832,17 +2801,19 @@ class Daemon(AuthJSONRPCServer):
         if announce_all:
             yield self.session.blob_manager.immediate_announce_all_blobs()
         else:
+            blob_hashes = []
             if blob_hash:
-                blob_hashes = [blob_hash]
+                blob_hashes.append(blob_hash)
             elif stream_hash:
-                blobs = yield self.get_blobs_for_stream_hash(stream_hash)
-                blob_hashes = [blob.blob_hash for blob in blobs if blob.get_is_verified()]
+                pass
             elif sd_hash:
-                blobs = yield self.get_blobs_for_sd_hash(sd_hash)
-                blob_hashes = [sd_hash] + [blob.blob_hash for blob in blobs if
-                                           blob.get_is_verified()]
+                stream_hash = yield self.storage.get_stream_hash_for_sd_hash(sd_hash)
             else:
                 raise Exception('single argument must be specified')
+            if not blob_hash:
+                blobs = yield self.storage.get_blobs_for_stream(stream_hash)
+                blob_hashes.extend([blob.blob_hash for blob in blobs if blob.get_is_verified()])
+
             yield self.session.blob_manager._immediate_announce(blob_hashes)

         response = yield self._render_response(True)
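The blob_announce branch above now normalizes every input to a flat list of blob hashes: a bare blob hash is announced as-is, an sd hash is first resolved to its stream hash, and stream blobs are then pulled from storage before the call to blob_manager._immediate_announce(). A condensed sketch of that flow as a standalone coroutine; the function name and the bare storage parameter are assumptions for illustration, not part of the commit:

from twisted.internet import defer

@defer.inlineCallbacks
def collect_hashes_to_announce(storage, blob_hash=None, stream_hash=None, sd_hash=None):
    blob_hashes = []
    if blob_hash:
        blob_hashes.append(blob_hash)
    elif stream_hash:
        pass  # already have the stream hash; its blobs are gathered below
    elif sd_hash:
        stream_hash = yield storage.get_stream_hash_for_sd_hash(sd_hash)
    else:
        raise Exception('single argument must be specified')
    if not blob_hash:
        blobs = yield storage.get_blobs_for_stream(stream_hash)
        blob_hashes.extend([blob.blob_hash for blob in blobs if blob.get_is_verified()])
    defer.returnValue(blob_hashes)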
@@ -2920,24 +2891,23 @@ class Daemon(AuthJSONRPCServer):
         Returns:
             (list) List of blob hashes
         """
-        if uri:
-            metadata = yield self._resolve_name(uri)
-            sd_hash = utils.get_sd_hash(metadata)
-            try:
-                blobs = yield self.get_blobs_for_sd_hash(sd_hash)
-            except NoSuchSDHash:
-                blobs = []
-        elif stream_hash:
-            try:
-                blobs = yield self.get_blobs_for_stream_hash(stream_hash)
-            except NoSuchStreamHash:
-                blobs = []
-        elif sd_hash:
-            try:
-                blobs = yield self.get_blobs_for_sd_hash(sd_hash)
-            except NoSuchSDHash:
-                blobs = []
-
+        if uri or stream_hash or sd_hash:
+            if uri:
+                metadata = yield self._resolve_name(uri)
+                sd_hash = utils.get_sd_hash(metadata)
+                stream_hash = yield self.session.storage.get_stream_hash_for_sd_hash(sd_hash)
+            elif stream_hash:
+                sd_hash = yield self.session.storage.get_sd_blob_hash_for_stream(stream_hash)
+            elif sd_hash:
+                stream_hash = yield self.session.storage.get_stream_hash_for_sd_hash(sd_hash)
+                sd_hash = yield self.session.storage.get_sd_blob_hash_for_stream(stream_hash)
+            if stream_hash:
+                blobs = yield self.session.storage.get_blobs_for_stream(stream_hash)
+            else:
+                blobs = []
+            # get_blobs_for_stream does not include the sd blob, so we'll add it manually
+            if sd_hash in self.session.blob_manager.blobs:
+                blobs = [self.session.blob_manager.blobs[sd_hash]] + blobs
         else:
             blobs = self.session.blob_manager.blobs.itervalues()

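Note the comment added above: storage.get_blobs_for_stream() returns only the stream's data blobs, so blob_list prepends the sd blob itself whenever the blob manager knows about it. A tiny illustration of that step in isolation; the helper name is hypothetical and not in the commit:

def with_sd_blob(blob_manager, sd_hash, blobs):
    # storage.get_blobs_for_stream() omits the sd blob, so prepend it when known
    if sd_hash in blob_manager.blobs:
        return [blob_manager.blobs[sd_hash]] + blobs
    return blobs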
@@ -2946,7 +2916,7 @@ class Daemon(AuthJSONRPCServer):
         if finished:
             blobs = [blob for blob in blobs if blob.get_is_verified()]

-        blob_hashes = [blob.blob_hash for blob in blobs]
+        blob_hashes = [blob.blob_hash for blob in blobs if blob.blob_hash]
         page_size = page_size or len(blob_hashes)
         page = page or 0
         start_index = page * page_size
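The last hunk only tightens the hash list to skip blobs without a blob_hash; the paging that follows is unchanged. For reference, a sketch of that paging arithmetic: page is zero-based, page_size defaults to the whole list, and the stop index is an assumption inferred from the start_index computation shown, since the hunk ends there:

def page_of_hashes(blob_hashes, page=None, page_size=None):
    page_size = page_size or len(blob_hashes)  # default: everything on one page
    page = page or 0                           # zero-based page index
    start_index = page * page_size
    stop_index = start_index + page_size       # assumed; not shown in the hunk
    return blob_hashes[start_index:stop_index]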