Merge branch 'master' into update_fix
This commit is contained in:
commit
dafa80ce29
11 changed files with 106 additions and 118 deletions
|
@ -22,12 +22,12 @@ at anytime.
|
||||||
* handling decryption error for blobs encrypted with an invalid key
|
* handling decryption error for blobs encrypted with an invalid key
|
||||||
* handling stream with no data blob (https://github.com/lbryio/lbry/issues/905)
|
* handling stream with no data blob (https://github.com/lbryio/lbry/issues/905)
|
||||||
* fetching the external ip
|
* fetching the external ip
|
||||||
* `blob_list` failing with --uri parameter (https://github.com/lbryio/lbry/issues/895)
|
* `blob_list` returning an error with --uri parameter and incorrectly returning `[]` for streams where blobs are known (https://github.com/lbryio/lbry/issues/895)
|
||||||
* `get` failing with a non-useful error message when given a uri for a channel claim
|
* `get` failing with a non-useful error message when given a uri for a channel claim
|
||||||
* exception checking in several wallet unit tests
|
* exception checking in several wallet unit tests
|
||||||
* daemon not erring properly for non-numeric values being passed to the `bid` parameter for the `publish` method
|
* daemon not erring properly for non-numeric values being passed to the `bid` parameter for the `publish` method
|
||||||
* `publish` command to allow updating claims with a `bid` amount higher than the wallet balance, so long as the amount is less than the wallet balance plus the bid amount of the claim being updated (https://github.com/lbryio/lbry/issues/748)
|
* `publish` command to allow updating claims with a `bid` amount higher than the wallet balance, so long as the amount is less than the wallet balance plus the bid amount of the claim being updated (https://github.com/lbryio/lbry/issues/748)
|
||||||
*
|
* incorrect `blob_num` for the stream terminator blob, which would result in creating invalid streams. Such invalid streams are detected on startup and are automatically removed (https://github.com/lbryio/lbry/issues/1124)
|
||||||
|
|
||||||
### Deprecated
|
### Deprecated
|
||||||
* `channel_list_mine`, replaced with `channel_list`
|
* `channel_list_mine`, replaced with `channel_list`
|
||||||
|
|
|
@ -1,6 +1,6 @@
|
||||||
import logging
|
import logging
|
||||||
|
|
||||||
__version__ = "0.19.0rc35"
|
__version__ = "0.19.0rc37"
|
||||||
version = tuple(__version__.split('.'))
|
version = tuple(__version__.split('.'))
|
||||||
|
|
||||||
logging.getLogger(__name__).addHandler(logging.NullHandler())
|
logging.getLogger(__name__).addHandler(logging.NullHandler())
|
||||||
|
|
|
@ -1,4 +1,3 @@
|
||||||
import os
|
|
||||||
import binascii
|
import binascii
|
||||||
from collections import defaultdict
|
from collections import defaultdict
|
||||||
import json
|
import json
|
||||||
|
@ -335,25 +334,6 @@ def get_sd_info(storage, stream_hash, include_blobs):
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
||||||
@defer.inlineCallbacks
|
|
||||||
def create_plain_sd(storage, stream_hash, file_name, overwrite_existing=False):
|
|
||||||
def _get_file_name():
|
|
||||||
actual_file_name = file_name
|
|
||||||
if os.path.exists(actual_file_name):
|
|
||||||
ext_num = 1
|
|
||||||
while os.path.exists(actual_file_name + "_" + str(ext_num)):
|
|
||||||
ext_num += 1
|
|
||||||
actual_file_name = actual_file_name + "_" + str(ext_num)
|
|
||||||
return actual_file_name
|
|
||||||
|
|
||||||
if overwrite_existing is False:
|
|
||||||
file_name = yield threads.deferToThread(_get_file_name())
|
|
||||||
descriptor_writer = PlainStreamDescriptorWriter(file_name)
|
|
||||||
sd_info = yield get_sd_info(storage, stream_hash, True)
|
|
||||||
sd_hash = yield descriptor_writer.create_descriptor(sd_info)
|
|
||||||
defer.returnValue(sd_hash)
|
|
||||||
|
|
||||||
|
|
||||||
def get_blob_hashsum(b):
|
def get_blob_hashsum(b):
|
||||||
length = b['length']
|
length = b['length']
|
||||||
if length != 0:
|
if length != 0:
|
||||||
|
@ -377,13 +357,8 @@ def get_stream_hash(hex_stream_name, key, hex_suggested_file_name, blob_infos):
|
||||||
h.update(key)
|
h.update(key)
|
||||||
h.update(hex_suggested_file_name)
|
h.update(hex_suggested_file_name)
|
||||||
blobs_hashsum = get_lbry_hash_obj()
|
blobs_hashsum = get_lbry_hash_obj()
|
||||||
sorted_blob_infos = sorted(blob_infos, key=lambda x: x['blob_num'])
|
for blob in blob_infos:
|
||||||
for blob in sorted_blob_infos:
|
|
||||||
blobs_hashsum.update(get_blob_hashsum(blob))
|
blobs_hashsum.update(get_blob_hashsum(blob))
|
||||||
if sorted_blob_infos[-1]['length'] != 0:
|
|
||||||
raise InvalidStreamDescriptorError("Does not end with a zero-length blob.")
|
|
||||||
if 'blob_hash' in sorted_blob_infos[-1]:
|
|
||||||
raise InvalidStreamDescriptorError("Stream terminator blob should not have a hash")
|
|
||||||
h.update(blobs_hashsum.digest())
|
h.update(blobs_hashsum.digest())
|
||||||
return h.hexdigest()
|
return h.hexdigest()
|
||||||
|
|
||||||
|
@ -403,6 +378,12 @@ def validate_descriptor(stream_info):
|
||||||
blobs = stream_info['blobs']
|
blobs = stream_info['blobs']
|
||||||
except KeyError as e:
|
except KeyError as e:
|
||||||
raise InvalidStreamDescriptorError("Missing '%s'" % (e.args[0]))
|
raise InvalidStreamDescriptorError("Missing '%s'" % (e.args[0]))
|
||||||
|
if stream_info['blobs'][-1]['length'] != 0:
|
||||||
|
raise InvalidStreamDescriptorError("Does not end with a zero-length blob.")
|
||||||
|
if any([False if blob_info['length'] > 0 else True for blob_info in stream_info['blobs'][:-1]]):
|
||||||
|
raise InvalidStreamDescriptorError("Contains zero-length data blob")
|
||||||
|
if 'blob_hash' in stream_info['blobs'][-1]:
|
||||||
|
raise InvalidStreamDescriptorError("Stream terminator blob should not have a hash")
|
||||||
|
|
||||||
verify_hex(key, "key")
|
verify_hex(key, "key")
|
||||||
verify_hex(hex_suggested_file_name, "suggested file name")
|
verify_hex(hex_suggested_file_name, "suggested file name")
|
||||||
|
@ -467,6 +448,11 @@ def download_sd_blob(session, blob_hash, payment_rate_manager, timeout=None):
|
||||||
sd_blob = yield downloader.download()
|
sd_blob = yield downloader.download()
|
||||||
sd_reader = BlobStreamDescriptorReader(sd_blob)
|
sd_reader = BlobStreamDescriptorReader(sd_blob)
|
||||||
sd_info = yield sd_reader.get_info()
|
sd_info = yield sd_reader.get_info()
|
||||||
|
try:
|
||||||
|
validate_descriptor(sd_info)
|
||||||
|
except InvalidStreamDescriptorError as err:
|
||||||
|
yield session.blob_manager.delete_blobs([blob_hash])
|
||||||
|
raise err
|
||||||
raw_sd = yield sd_reader._get_raw_data()
|
raw_sd = yield sd_reader._get_raw_data()
|
||||||
yield session.blob_manager.storage.add_known_blob(blob_hash, len(raw_sd))
|
yield session.blob_manager.storage.add_known_blob(blob_hash, len(raw_sd))
|
||||||
yield save_sd_info(session.blob_manager, sd_blob.blob_hash, sd_info)
|
yield save_sd_info(session.blob_manager, sd_blob.blob_hash, sd_info)
|
||||||
|
|
|
@ -86,10 +86,9 @@ class CryptStreamCreator(object):
|
||||||
self.stopped = True
|
self.stopped = True
|
||||||
if self.current_blob is not None:
|
if self.current_blob is not None:
|
||||||
self._close_current_blob()
|
self._close_current_blob()
|
||||||
self._finalize()
|
d = self._finalize()
|
||||||
dl = defer.DeferredList(self.finished_deferreds)
|
d.addCallback(lambda _: self._finished())
|
||||||
dl.addCallback(lambda _: self._finished())
|
return d
|
||||||
return dl
|
|
||||||
|
|
||||||
# TODO: move the stream creation process to its own thread and
|
# TODO: move the stream creation process to its own thread and
|
||||||
# remove the reactor from this process.
|
# remove the reactor from this process.
|
||||||
|
@ -112,6 +111,7 @@ class CryptStreamCreator(object):
|
||||||
|
|
||||||
return defer.succeed(True)
|
return defer.succeed(True)
|
||||||
|
|
||||||
|
@defer.inlineCallbacks
|
||||||
def _finalize(self):
|
def _finalize(self):
|
||||||
"""
|
"""
|
||||||
Finalize a stream by adding an empty
|
Finalize a stream by adding an empty
|
||||||
|
@ -119,14 +119,14 @@ class CryptStreamCreator(object):
|
||||||
the stream has ended. This empty blob is not
|
the stream has ended. This empty blob is not
|
||||||
saved to the blob manager
|
saved to the blob manager
|
||||||
"""
|
"""
|
||||||
log.debug("_finalize has been called")
|
|
||||||
|
yield defer.DeferredList(self.finished_deferreds)
|
||||||
self.blob_count += 1
|
self.blob_count += 1
|
||||||
iv = self.iv_generator.next()
|
iv = self.iv_generator.next()
|
||||||
final_blob_creator = self.blob_manager.get_blob_creator()
|
final_blob = self._get_blob_maker(iv, self.blob_manager.get_blob_creator())
|
||||||
final_blob = self._get_blob_maker(iv, final_blob_creator)
|
stream_terminator = yield final_blob.close()
|
||||||
d = final_blob.close()
|
terminator_info = yield self._blob_finished(stream_terminator)
|
||||||
d.addCallback(self._blob_finished)
|
defer.returnValue(terminator_info)
|
||||||
self.finished_deferreds.append(d)
|
|
||||||
|
|
||||||
def _write(self, data):
|
def _write(self, data):
|
||||||
while len(data) > 0:
|
while len(data) > 0:
|
||||||
|
|
|
@ -45,11 +45,12 @@ from lbrynet.core.Wallet import LBRYumWallet, ClaimOutpoint
|
||||||
from lbrynet.core.looping_call_manager import LoopingCallManager
|
from lbrynet.core.looping_call_manager import LoopingCallManager
|
||||||
from lbrynet.core.server.BlobRequestHandler import BlobRequestHandlerFactory
|
from lbrynet.core.server.BlobRequestHandler import BlobRequestHandlerFactory
|
||||||
from lbrynet.core.server.ServerProtocol import ServerProtocolFactory
|
from lbrynet.core.server.ServerProtocol import ServerProtocolFactory
|
||||||
from lbrynet.core.Error import InsufficientFundsError, UnknownNameError, NoSuchSDHash
|
from lbrynet.core.Error import InsufficientFundsError, UnknownNameError
|
||||||
from lbrynet.core.Error import NoSuchStreamHash, DownloadDataTimeout, DownloadSDTimeout
|
from lbrynet.core.Error import DownloadDataTimeout, DownloadSDTimeout
|
||||||
from lbrynet.core.Error import NullFundsError, NegativeFundsError
|
from lbrynet.core.Error import NullFundsError, NegativeFundsError
|
||||||
from lbrynet.core.Peer import Peer
|
from lbrynet.core.Peer import Peer
|
||||||
from lbrynet.core.SinglePeerDownloader import SinglePeerDownloader
|
from lbrynet.core.SinglePeerDownloader import SinglePeerDownloader
|
||||||
|
from lbrynet.core.client.StandaloneBlobDownloader import StandaloneBlobDownloader
|
||||||
|
|
||||||
log = logging.getLogger(__name__)
|
log = logging.getLogger(__name__)
|
||||||
|
|
||||||
|
@ -161,16 +162,6 @@ class AlwaysSend(object):
|
||||||
return d
|
return d
|
||||||
|
|
||||||
|
|
||||||
# If an instance has a lot of blobs, this call might get very expensive.
|
|
||||||
# For reflector, with 50k blobs, it definitely has an impact on the first run
|
|
||||||
# But doesn't seem to impact performance after that.
|
|
||||||
@defer.inlineCallbacks
|
|
||||||
def calculate_available_blob_size(blob_manager):
|
|
||||||
blob_hashes = yield blob_manager.get_all_verified_blobs()
|
|
||||||
blobs = yield defer.DeferredList([blob_manager.get_blob(b) for b in blob_hashes])
|
|
||||||
defer.returnValue(sum(b.length for success, b in blobs if success and b.length))
|
|
||||||
|
|
||||||
|
|
||||||
class Daemon(AuthJSONRPCServer):
|
class Daemon(AuthJSONRPCServer):
|
||||||
"""
|
"""
|
||||||
LBRYnet daemon, a jsonrpc interface to lbry functions
|
LBRYnet daemon, a jsonrpc interface to lbry functions
|
||||||
|
@ -622,7 +613,11 @@ class Daemon(AuthJSONRPCServer):
|
||||||
|
|
||||||
rate_manager = rate_manager or self.session.payment_rate_manager
|
rate_manager = rate_manager or self.session.payment_rate_manager
|
||||||
timeout = timeout or 30
|
timeout = timeout or 30
|
||||||
return download_sd_blob(self.session, blob_hash, rate_manager, timeout)
|
downloader = StandaloneBlobDownloader(
|
||||||
|
blob_hash, self.session.blob_manager, self.session.peer_finder, self.session.rate_limiter,
|
||||||
|
rate_manager, self.session.wallet, timeout
|
||||||
|
)
|
||||||
|
return downloader.download()
|
||||||
|
|
||||||
@defer.inlineCallbacks
|
@defer.inlineCallbacks
|
||||||
def _get_stream_analytics_report(self, claim_dict):
|
def _get_stream_analytics_report(self, claim_dict):
|
||||||
|
@ -949,27 +944,6 @@ class Daemon(AuthJSONRPCServer):
|
||||||
log.debug("Collected %i lbry files", len(lbry_files))
|
log.debug("Collected %i lbry files", len(lbry_files))
|
||||||
defer.returnValue(lbry_files)
|
defer.returnValue(lbry_files)
|
||||||
|
|
||||||
# TODO: do this and get_blobs_for_sd_hash in the stream info manager
|
|
||||||
def get_blobs_for_stream_hash(self, stream_hash):
|
|
||||||
def _iter_blobs(blob_hashes):
|
|
||||||
for blob_hash, blob_num, blob_iv, blob_length in blob_hashes:
|
|
||||||
if blob_hash:
|
|
||||||
yield self.session.blob_manager.get_blob(blob_hash, length=blob_length)
|
|
||||||
|
|
||||||
def _get_blobs(blob_hashes):
|
|
||||||
dl = defer.DeferredList(list(_iter_blobs(blob_hashes)), consumeErrors=True)
|
|
||||||
dl.addCallback(lambda blobs: [blob[1] for blob in blobs if blob[0]])
|
|
||||||
return dl
|
|
||||||
|
|
||||||
d = self.session.storage.get_blobs_for_stream(stream_hash)
|
|
||||||
d.addCallback(_get_blobs)
|
|
||||||
return d
|
|
||||||
|
|
||||||
def get_blobs_for_sd_hash(self, sd_hash):
|
|
||||||
d = self.session.storage.get_stream_hash_for_sd_hash(sd_hash)
|
|
||||||
d.addCallback(self.get_blobs_for_stream_hash)
|
|
||||||
return d
|
|
||||||
|
|
||||||
def _get_single_peer_downloader(self):
|
def _get_single_peer_downloader(self):
|
||||||
downloader = SinglePeerDownloader()
|
downloader = SinglePeerDownloader()
|
||||||
downloader.setup(self.session.wallet)
|
downloader.setup(self.session.wallet)
|
||||||
|
@ -2828,17 +2802,19 @@ class Daemon(AuthJSONRPCServer):
|
||||||
if announce_all:
|
if announce_all:
|
||||||
yield self.session.blob_manager.immediate_announce_all_blobs()
|
yield self.session.blob_manager.immediate_announce_all_blobs()
|
||||||
else:
|
else:
|
||||||
|
blob_hashes = []
|
||||||
if blob_hash:
|
if blob_hash:
|
||||||
blob_hashes = [blob_hash]
|
blob_hashes = blob_hashes.append(blob_hashes)
|
||||||
elif stream_hash:
|
elif stream_hash:
|
||||||
blobs = yield self.get_blobs_for_stream_hash(stream_hash)
|
pass
|
||||||
blob_hashes = [blob.blob_hash for blob in blobs if blob.get_is_verified()]
|
|
||||||
elif sd_hash:
|
elif sd_hash:
|
||||||
blobs = yield self.get_blobs_for_sd_hash(sd_hash)
|
stream_hash = yield self.storage.get_stream_hash_for_sd_hash(sd_hash)
|
||||||
blob_hashes = [sd_hash] + [blob.blob_hash for blob in blobs if
|
|
||||||
blob.get_is_verified()]
|
|
||||||
else:
|
else:
|
||||||
raise Exception('single argument must be specified')
|
raise Exception('single argument must be specified')
|
||||||
|
if not blob_hash:
|
||||||
|
blobs = yield self.storage.get_blobs_for_stream(stream_hash)
|
||||||
|
blob_hashes.extend([blob.blob_hash for blob in blobs if blob.get_is_verified()])
|
||||||
|
|
||||||
yield self.session.blob_manager._immediate_announce(blob_hashes)
|
yield self.session.blob_manager._immediate_announce(blob_hashes)
|
||||||
|
|
||||||
response = yield self._render_response(True)
|
response = yield self._render_response(True)
|
||||||
|
@ -2916,24 +2892,23 @@ class Daemon(AuthJSONRPCServer):
|
||||||
Returns:
|
Returns:
|
||||||
(list) List of blob hashes
|
(list) List of blob hashes
|
||||||
"""
|
"""
|
||||||
|
if uri or stream_hash or sd_hash:
|
||||||
if uri:
|
if uri:
|
||||||
metadata = yield self._resolve_name(uri)
|
metadata = yield self._resolve_name(uri)
|
||||||
sd_hash = utils.get_sd_hash(metadata)
|
sd_hash = utils.get_sd_hash(metadata)
|
||||||
try:
|
stream_hash = yield self.session.storage.get_stream_hash_for_sd_hash(sd_hash)
|
||||||
blobs = yield self.get_blobs_for_sd_hash(sd_hash)
|
|
||||||
except NoSuchSDHash:
|
|
||||||
blobs = []
|
|
||||||
elif stream_hash:
|
elif stream_hash:
|
||||||
try:
|
sd_hash = yield self.session.storage.get_sd_blob_hash_for_stream(stream_hash)
|
||||||
blobs = yield self.get_blobs_for_stream_hash(stream_hash)
|
|
||||||
except NoSuchStreamHash:
|
|
||||||
blobs = []
|
|
||||||
elif sd_hash:
|
elif sd_hash:
|
||||||
try:
|
stream_hash = yield self.session.storage.get_stream_hash_for_sd_hash(sd_hash)
|
||||||
blobs = yield self.get_blobs_for_sd_hash(sd_hash)
|
sd_hash = yield self.session.storage.get_sd_blob_hash_for_stream(stream_hash)
|
||||||
except NoSuchSDHash:
|
if stream_hash:
|
||||||
|
blobs = yield self.session.storage.get_blobs_for_stream(stream_hash)
|
||||||
|
else:
|
||||||
blobs = []
|
blobs = []
|
||||||
|
# get_blobs_for_stream does not include the sd blob, so we'll add it manually
|
||||||
|
if sd_hash in self.session.blob_manager.blobs:
|
||||||
|
blobs = [self.session.blob_manager.blobs[sd_hash]] + blobs
|
||||||
else:
|
else:
|
||||||
blobs = self.session.blob_manager.blobs.itervalues()
|
blobs = self.session.blob_manager.blobs.itervalues()
|
||||||
|
|
||||||
|
@ -2942,7 +2917,7 @@ class Daemon(AuthJSONRPCServer):
|
||||||
if finished:
|
if finished:
|
||||||
blobs = [blob for blob in blobs if blob.get_is_verified()]
|
blobs = [blob for blob in blobs if blob.get_is_verified()]
|
||||||
|
|
||||||
blob_hashes = [blob.blob_hash for blob in blobs]
|
blob_hashes = [blob.blob_hash for blob in blobs if blob.blob_hash]
|
||||||
page_size = page_size or len(blob_hashes)
|
page_size = page_size or len(blob_hashes)
|
||||||
page = page or 0
|
page = page or 0
|
||||||
start_index = page * page_size
|
start_index = page * page_size
|
||||||
|
|
|
@ -5,7 +5,7 @@ from twisted.internet.task import LoopingCall
|
||||||
|
|
||||||
from lbryschema.fee import Fee
|
from lbryschema.fee import Fee
|
||||||
|
|
||||||
from lbrynet.core.Error import InsufficientFundsError, KeyFeeAboveMaxAllowed
|
from lbrynet.core.Error import InsufficientFundsError, KeyFeeAboveMaxAllowed, InvalidStreamDescriptorError
|
||||||
from lbrynet.core.Error import DownloadDataTimeout, DownloadCanceledError, DownloadSDTimeout
|
from lbrynet.core.Error import DownloadDataTimeout, DownloadCanceledError, DownloadSDTimeout
|
||||||
from lbrynet.core.utils import safe_start_looping_call, safe_stop_looping_call
|
from lbrynet.core.utils import safe_start_looping_call, safe_stop_looping_call
|
||||||
from lbrynet.core.StreamDescriptor import download_sd_blob
|
from lbrynet.core.StreamDescriptor import download_sd_blob
|
||||||
|
@ -204,14 +204,12 @@ class GetStream(object):
|
||||||
|
|
||||||
safe_start_looping_call(self.checker, 1)
|
safe_start_looping_call(self.checker, 1)
|
||||||
self.set_status(DOWNLOAD_METADATA_CODE, name)
|
self.set_status(DOWNLOAD_METADATA_CODE, name)
|
||||||
|
try:
|
||||||
sd_blob = yield self._download_sd_blob()
|
sd_blob = yield self._download_sd_blob()
|
||||||
|
|
||||||
yield self._download(sd_blob, name, key_fee, txid, nout, file_name)
|
yield self._download(sd_blob, name, key_fee, txid, nout, file_name)
|
||||||
self.set_status(DOWNLOAD_RUNNING_CODE, name)
|
self.set_status(DOWNLOAD_RUNNING_CODE, name)
|
||||||
|
|
||||||
try:
|
|
||||||
yield self.data_downloading_deferred
|
yield self.data_downloading_deferred
|
||||||
except DownloadDataTimeout as err:
|
except (DownloadDataTimeout, InvalidStreamDescriptorError) as err:
|
||||||
safe_stop_looping_call(self.checker)
|
safe_stop_looping_call(self.checker)
|
||||||
raise err
|
raise err
|
||||||
|
|
||||||
|
|
|
@ -10,7 +10,7 @@ from twisted.internet import defer
|
||||||
from twisted.protocols.basic import FileSender
|
from twisted.protocols.basic import FileSender
|
||||||
|
|
||||||
from lbrynet.core.StreamDescriptor import BlobStreamDescriptorWriter, EncryptedFileStreamType
|
from lbrynet.core.StreamDescriptor import BlobStreamDescriptorWriter, EncryptedFileStreamType
|
||||||
from lbrynet.core.StreamDescriptor import format_sd_info, get_stream_hash
|
from lbrynet.core.StreamDescriptor import format_sd_info, get_stream_hash, validate_descriptor
|
||||||
from lbrynet.cryptstream.CryptStreamCreator import CryptStreamCreator
|
from lbrynet.cryptstream.CryptStreamCreator import CryptStreamCreator
|
||||||
|
|
||||||
log = logging.getLogger(__name__)
|
log = logging.getLogger(__name__)
|
||||||
|
@ -40,11 +40,15 @@ class EncryptedFileStreamCreator(CryptStreamCreator):
|
||||||
hexlify(self.name), hexlify(self.key), hexlify(self.name),
|
hexlify(self.name), hexlify(self.key), hexlify(self.name),
|
||||||
self.blob_infos
|
self.blob_infos
|
||||||
)
|
)
|
||||||
|
|
||||||
# generate the sd info
|
# generate the sd info
|
||||||
self.sd_info = format_sd_info(
|
self.sd_info = format_sd_info(
|
||||||
EncryptedFileStreamType, hexlify(self.name), hexlify(self.key),
|
EncryptedFileStreamType, hexlify(self.name), hexlify(self.key),
|
||||||
hexlify(self.name), self.stream_hash, self.blob_infos
|
hexlify(self.name), self.stream_hash, self.blob_infos
|
||||||
)
|
)
|
||||||
|
|
||||||
|
# sanity check
|
||||||
|
validate_descriptor(self.sd_info)
|
||||||
return defer.succeed(self.stream_hash)
|
return defer.succeed(self.stream_hash)
|
||||||
|
|
||||||
|
|
||||||
|
|
|
@ -6,12 +6,12 @@ import logging
|
||||||
|
|
||||||
from twisted.internet import defer, task, reactor
|
from twisted.internet import defer, task, reactor
|
||||||
from twisted.python.failure import Failure
|
from twisted.python.failure import Failure
|
||||||
|
from lbrynet.core.Error import InvalidStreamDescriptorError
|
||||||
from lbrynet.reflector.reupload import reflect_stream
|
from lbrynet.reflector.reupload import reflect_stream
|
||||||
from lbrynet.core.PaymentRateManager import NegotiatedPaymentRateManager
|
from lbrynet.core.PaymentRateManager import NegotiatedPaymentRateManager
|
||||||
from lbrynet.file_manager.EncryptedFileDownloader import ManagedEncryptedFileDownloader
|
from lbrynet.file_manager.EncryptedFileDownloader import ManagedEncryptedFileDownloader
|
||||||
from lbrynet.file_manager.EncryptedFileDownloader import ManagedEncryptedFileDownloaderFactory
|
from lbrynet.file_manager.EncryptedFileDownloader import ManagedEncryptedFileDownloaderFactory
|
||||||
from lbrynet.core.StreamDescriptor import EncryptedFileStreamType, get_sd_info
|
from lbrynet.core.StreamDescriptor import EncryptedFileStreamType, get_sd_info, validate_descriptor
|
||||||
from lbrynet.cryptstream.client.CryptStreamDownloader import AlreadyStoppedError
|
from lbrynet.cryptstream.client.CryptStreamDownloader import AlreadyStoppedError
|
||||||
from lbrynet.cryptstream.client.CryptStreamDownloader import CurrentlyStoppingError
|
from lbrynet.cryptstream.client.CryptStreamDownloader import CurrentlyStoppingError
|
||||||
from lbrynet.core.utils import safe_start_looping_call, safe_stop_looping_call
|
from lbrynet.core.utils import safe_start_looping_call, safe_stop_looping_call
|
||||||
|
@ -113,13 +113,23 @@ class EncryptedFileManager(object):
|
||||||
file_info['suggested_file_name']
|
file_info['suggested_file_name']
|
||||||
)
|
)
|
||||||
yield lbry_file.get_claim_info()
|
yield lbry_file.get_claim_info()
|
||||||
|
try:
|
||||||
|
# verify the stream is valid (we might have downloaded an invalid stream
|
||||||
|
# in the past when the validation check didn't work)
|
||||||
|
stream_info = yield get_sd_info(self.storage, file_info['stream_hash'], include_blobs=True)
|
||||||
|
validate_descriptor(stream_info)
|
||||||
|
except InvalidStreamDescriptorError as err:
|
||||||
|
log.warning("Stream for descriptor %s is invalid (%s), cleaning it up",
|
||||||
|
lbry_file.sd_hash, err.message)
|
||||||
|
yield lbry_file.delete_data()
|
||||||
|
yield self.session.storage.delete_stream(lbry_file.stream_hash)
|
||||||
|
else:
|
||||||
try:
|
try:
|
||||||
# restore will raise an Exception if status is unknown
|
# restore will raise an Exception if status is unknown
|
||||||
lbry_file.restore(file_info['status'])
|
lbry_file.restore(file_info['status'])
|
||||||
self.lbry_files.append(lbry_file)
|
self.lbry_files.append(lbry_file)
|
||||||
except Exception:
|
except Exception:
|
||||||
log.warning("Failed to start %i", file_info['rowid'])
|
log.warning("Failed to start %i", file_info.get('rowid'))
|
||||||
continue
|
|
||||||
log.info("Started %i lbry files", len(self.lbry_files))
|
log.info("Started %i lbry files", len(self.lbry_files))
|
||||||
if self.auto_re_reflect is True:
|
if self.auto_re_reflect is True:
|
||||||
safe_start_looping_call(self.lbry_file_reflector, self.auto_re_reflect_interval)
|
safe_start_looping_call(self.lbry_file_reflector, self.auto_re_reflect_interval)
|
||||||
|
|
|
@ -5,6 +5,7 @@ from twisted.trial import unittest
|
||||||
from twisted.internet import defer
|
from twisted.internet import defer
|
||||||
|
|
||||||
from lbrynet.database.storage import SQLiteStorage
|
from lbrynet.database.storage import SQLiteStorage
|
||||||
|
from lbrynet.core.StreamDescriptor import get_sd_info, BlobStreamDescriptorReader
|
||||||
from lbrynet.core import BlobManager
|
from lbrynet.core import BlobManager
|
||||||
from lbrynet.core import Session
|
from lbrynet.core import Session
|
||||||
from lbrynet.core.server import DHTHashAnnouncer
|
from lbrynet.core.server import DHTHashAnnouncer
|
||||||
|
@ -15,6 +16,7 @@ from lbrynet.tests.util import mk_db_and_blob_dir, rm_db_and_blob_dir
|
||||||
|
|
||||||
MB = 2**20
|
MB = 2**20
|
||||||
|
|
||||||
|
|
||||||
def iv_generator():
|
def iv_generator():
|
||||||
while True:
|
while True:
|
||||||
yield '3' * AES.block_size
|
yield '3' * AES.block_size
|
||||||
|
@ -22,6 +24,7 @@ def iv_generator():
|
||||||
|
|
||||||
class CreateEncryptedFileTest(unittest.TestCase):
|
class CreateEncryptedFileTest(unittest.TestCase):
|
||||||
timeout = 5
|
timeout = 5
|
||||||
|
|
||||||
@defer.inlineCallbacks
|
@defer.inlineCallbacks
|
||||||
def setUp(self):
|
def setUp(self):
|
||||||
mocks.mock_conf_settings(self)
|
mocks.mock_conf_settings(self)
|
||||||
|
@ -57,16 +60,28 @@ class CreateEncryptedFileTest(unittest.TestCase):
|
||||||
def test_can_create_file(self):
|
def test_can_create_file(self):
|
||||||
expected_stream_hash = "41e6b247d923d191b154fb6f1b8529d6ddd6a73d65c35" \
|
expected_stream_hash = "41e6b247d923d191b154fb6f1b8529d6ddd6a73d65c35" \
|
||||||
"7b1acb742dd83151fb66393a7709e9f346260a4f4db6de10c25"
|
"7b1acb742dd83151fb66393a7709e9f346260a4f4db6de10c25"
|
||||||
expected_sd_hash = "bc435ae0c4659635e6514e05bb1fcd0d365b234f6f0e78002" \
|
expected_sd_hash = "db043b44384c149126685990f6bb6563aa565ae331303d522" \
|
||||||
"d2576ff84a0b8710a9847757a9aa8cbeda5a8e1aeafa48b"
|
"c8728fe0534dd06fbcacae92b0891787ad9b68ffc8d20c1"
|
||||||
filename = 'test.file'
|
filename = 'test.file'
|
||||||
lbry_file = yield self.create_file(filename)
|
lbry_file = yield self.create_file(filename)
|
||||||
sd_hash = yield self.session.storage.get_sd_blob_hash_for_stream(lbry_file.stream_hash)
|
sd_hash = yield self.session.storage.get_sd_blob_hash_for_stream(lbry_file.stream_hash)
|
||||||
|
|
||||||
|
# read the sd blob file
|
||||||
|
sd_blob = self.blob_manager.blobs[sd_hash]
|
||||||
|
sd_reader = BlobStreamDescriptorReader(sd_blob)
|
||||||
|
sd_file_info = yield sd_reader.get_info()
|
||||||
|
|
||||||
|
# this comes from the database, the blobs returned are sorted
|
||||||
|
sd_info = yield get_sd_info(self.session.storage, lbry_file.stream_hash, include_blobs=True)
|
||||||
|
self.assertDictEqual(sd_info, sd_file_info)
|
||||||
|
self.assertEqual(sd_info['stream_hash'], expected_stream_hash)
|
||||||
|
self.assertEqual(len(sd_info['blobs']), 3)
|
||||||
|
self.assertNotEqual(sd_info['blobs'][0]['length'], 0)
|
||||||
|
self.assertNotEqual(sd_info['blobs'][1]['length'], 0)
|
||||||
|
self.assertEqual(sd_info['blobs'][2]['length'], 0)
|
||||||
self.assertEqual(expected_stream_hash, lbry_file.stream_hash)
|
self.assertEqual(expected_stream_hash, lbry_file.stream_hash)
|
||||||
self.assertEqual(sd_hash, lbry_file.sd_hash)
|
self.assertEqual(sd_hash, lbry_file.sd_hash)
|
||||||
self.assertEqual(sd_hash, expected_sd_hash)
|
self.assertEqual(sd_hash, expected_sd_hash)
|
||||||
|
|
||||||
blobs = yield self.blob_manager.get_all_verified_blobs()
|
blobs = yield self.blob_manager.get_all_verified_blobs()
|
||||||
self.assertEqual(3, len(blobs))
|
self.assertEqual(3, len(blobs))
|
||||||
num_should_announce_blobs = yield self.blob_manager.count_should_announce_blobs()
|
num_should_announce_blobs = yield self.blob_manager.count_should_announce_blobs()
|
||||||
|
|
|
@ -14,7 +14,7 @@ jsonrpc==1.2
|
||||||
jsonrpclib==0.1.7
|
jsonrpclib==0.1.7
|
||||||
jsonschema==2.6.0
|
jsonschema==2.6.0
|
||||||
keyring==10.4.0
|
keyring==10.4.0
|
||||||
git+https://github.com/lbryio/lbryum.git@v3.2.0rc16#egg=lbryum
|
git+https://github.com/lbryio/lbryum.git@v3.2.0rc17#egg=lbryum
|
||||||
git+https://github.com/lbryio/lbryschema.git@v0.0.15rc2#egg=lbryschema
|
git+https://github.com/lbryio/lbryschema.git@v0.0.15rc2#egg=lbryschema
|
||||||
miniupnpc==1.9
|
miniupnpc==1.9
|
||||||
pbkdf2==1.3
|
pbkdf2==1.3
|
||||||
|
|
2
setup.py
2
setup.py
|
@ -21,7 +21,7 @@ requires = [
|
||||||
'envparse',
|
'envparse',
|
||||||
'jsonrpc',
|
'jsonrpc',
|
||||||
'jsonschema',
|
'jsonschema',
|
||||||
'lbryum==3.2.0rc16',
|
'lbryum==3.2.0rc17',
|
||||||
'lbryschema==0.0.15rc2',
|
'lbryschema==0.0.15rc2',
|
||||||
'miniupnpc',
|
'miniupnpc',
|
||||||
'pycrypto',
|
'pycrypto',
|
||||||
|
|
Loading…
Reference in a new issue