From 64e04f8a68b784e59f221b994d75ddb834db0f2c Mon Sep 17 00:00:00 2001 From: Jack Robison Date: Thu, 4 Jan 2018 21:28:09 -0500 Subject: [PATCH] refactor start_lbry_file --- .../client/CryptStreamDownloader.py | 5 +- lbrynet/daemon/Daemon.py | 3 +- lbrynet/daemon/Downloader.py | 5 +- lbrynet/daemon/Publisher.py | 7 +- .../file_manager/EncryptedFileDownloader.py | 14 +-- lbrynet/file_manager/EncryptedFileManager.py | 98 ++++++++----------- .../lbry_file/EncryptedFileMetadataManager.py | 7 +- .../client/EncryptedFileDownloader.py | 47 ++------- lbrynet/reflector/server/server.py | 2 +- lbrynet/tests/functional/test_streamify.py | 10 +- .../client/test_EncryptedFileDownloader.py | 5 +- .../unit/lbrynet_daemon/test_Downloader.py | 2 +- 12 files changed, 81 insertions(+), 124 deletions(-) diff --git a/lbrynet/cryptstream/client/CryptStreamDownloader.py b/lbrynet/cryptstream/client/CryptStreamDownloader.py index bae9fffc7..706c12903 100644 --- a/lbrynet/cryptstream/client/CryptStreamDownloader.py +++ b/lbrynet/cryptstream/client/CryptStreamDownloader.py @@ -1,3 +1,4 @@ +import binascii import logging from zope.interface import implements from lbrynet.interfaces import IStreamDownloader @@ -61,8 +62,8 @@ class CryptStreamDownloader(object): self.blob_manager = blob_manager self.payment_rate_manager = payment_rate_manager self.wallet = wallet - self.key = key - self.stream_name = stream_name + self.key = binascii.unhexlify(key) + self.stream_name = binascii.unhexlify(stream_name) self.completed = False self.stopped = True self.stopping = False diff --git a/lbrynet/daemon/Daemon.py b/lbrynet/daemon/Daemon.py index b9cee0ff4..b628831a2 100644 --- a/lbrynet/daemon/Daemon.py +++ b/lbrynet/daemon/Daemon.py @@ -670,8 +670,7 @@ class Daemon(AuthJSONRPCServer): self.streams[sd_hash] = GetStream(self.sd_identifier, self.session, self.exchange_rate_manager, self.max_key_fee, self.disable_max_key_fee, - conf.settings['data_rate'], timeout, - file_name) + conf.settings['data_rate'], timeout) try: lbry_file, finished_deferred = yield self.streams[sd_hash].start(claim_dict, name) yield self.stream_info_manager.save_outpoint_to_file(lbry_file.rowid, txid, nout) diff --git a/lbrynet/daemon/Downloader.py b/lbrynet/daemon/Downloader.py index e3820521d..0cc4f7454 100644 --- a/lbrynet/daemon/Downloader.py +++ b/lbrynet/daemon/Downloader.py @@ -31,15 +31,13 @@ log = logging.getLogger(__name__) class GetStream(object): def __init__(self, sd_identifier, session, exchange_rate_manager, - max_key_fee, disable_max_key_fee, data_rate=None, timeout=None, - file_name=None): + max_key_fee, disable_max_key_fee, data_rate=None, timeout=None): self.timeout = timeout or conf.settings['download_timeout'] self.data_rate = data_rate or conf.settings['data_rate'] self.max_key_fee = max_key_fee or conf.settings['max_key_fee'][1] self.disable_max_key_fee = disable_max_key_fee or conf.settings['disable_max_key_fee'] self.download_directory = conf.settings['download_directory'] - self.file_name = file_name self.timeout_counter = 0 self.code = None self.sd_hash = None @@ -126,7 +124,6 @@ class GetStream(object): [self.data_rate], self.payment_rate_manager, download_directory=self.download_directory, - file_name=self.file_name ) defer.returnValue(downloader) diff --git a/lbrynet/daemon/Publisher.py b/lbrynet/daemon/Publisher.py index f4e6e5501..569fa64ec 100644 --- a/lbrynet/daemon/Publisher.py +++ b/lbrynet/daemon/Publisher.py @@ -6,6 +6,7 @@ from twisted.internet import defer from lbrynet.core import file_utils from lbrynet.file_manager.EncryptedFileCreator import create_lbry_file +from lbrynet.file_manager.EncryptedFileDownloader import ManagedEncryptedFileDownloader from lbrynet.lbry_file.StreamDescriptor import publish_sd_blob @@ -36,13 +37,15 @@ class Publisher(object): read_handle) sd_hash = yield publish_sd_blob(self.lbry_file_manager.stream_info_manager, self.session.blob_manager, stream_hash) - self.lbry_file = yield self.lbry_file_manager.add_lbry_file(stream_hash) + status = ManagedEncryptedFileDownloader.STATUS_FINISHED + self.lbry_file = yield self.lbry_file_manager.add_lbry_file(stream_hash, sd_hash, + status=status) if 'source' not in claim_dict['stream']: claim_dict['stream']['source'] = {} claim_dict['stream']['source']['source'] = sd_hash claim_dict['stream']['source']['sourceType'] = 'lbry_sd_hash' claim_dict['stream']['source']['contentType'] = get_content_type(file_path) - claim_dict['stream']['source']['version'] = "_0_0_1" # need current version here + claim_dict['stream']['source']['version'] = "_0_0_1" # need current version here claim_out = yield self.make_claim(name, bid, claim_dict, claim_address, change_address) self.lbry_file.completed = True diff --git a/lbrynet/file_manager/EncryptedFileDownloader.py b/lbrynet/file_manager/EncryptedFileDownloader.py index f95757875..6e7491be6 100644 --- a/lbrynet/file_manager/EncryptedFileDownloader.py +++ b/lbrynet/file_manager/EncryptedFileDownloader.py @@ -37,14 +37,13 @@ class ManagedEncryptedFileDownloader(EncryptedFileSaver): def __init__(self, rowid, stream_hash, peer_finder, rate_limiter, blob_manager, stream_info_manager, lbry_file_manager, payment_rate_manager, wallet, - download_directory, file_name=None, sd_hash=None, key=None, stream_name=None, + download_directory, sd_hash=None, key=None, stream_name=None, suggested_file_name=None): EncryptedFileSaver.__init__(self, stream_hash, peer_finder, rate_limiter, blob_manager, stream_info_manager, payment_rate_manager, wallet, - download_directory, - file_name, key, stream_name, suggested_file_name) + download_directory, key, stream_name, suggested_file_name) self.sd_hash = sd_hash self.rowid = rowid self.lbry_file_manager = lbry_file_manager @@ -138,8 +137,7 @@ class ManagedEncryptedFileDownloaderFactory(object): return True @defer.inlineCallbacks - def make_downloader(self, metadata, options, payment_rate_manager, download_directory=None, - file_name=None): + def make_downloader(self, metadata, options, payment_rate_manager, download_directory=None): assert len(options) == 1 data_rate = options[0] stream_hash = yield save_sd_info(self.lbry_file_manager.stream_info_manager, @@ -147,9 +145,11 @@ class ManagedEncryptedFileDownloaderFactory(object): if metadata.metadata_source == StreamMetadata.FROM_BLOB: yield self.lbry_file_manager.save_sd_blob_hash_to_stream(stream_hash, metadata.source_blob_hash) - lbry_file = yield self.lbry_file_manager.add_lbry_file(stream_hash, payment_rate_manager, + lbry_file = yield self.lbry_file_manager.add_lbry_file(stream_hash, + metadata.source_blob_hash, + payment_rate_manager, data_rate, - download_directory, file_name) + download_directory) defer.returnValue(lbry_file) @staticmethod diff --git a/lbrynet/file_manager/EncryptedFileManager.py b/lbrynet/file_manager/EncryptedFileManager.py index 02cb5f8b1..9d022c1d7 100644 --- a/lbrynet/file_manager/EncryptedFileManager.py +++ b/lbrynet/file_manager/EncryptedFileManager.py @@ -12,7 +12,7 @@ from lbrynet.reflector.reupload import reflect_stream from lbrynet.core.PaymentRateManager import NegotiatedPaymentRateManager from lbrynet.file_manager.EncryptedFileDownloader import ManagedEncryptedFileDownloader from lbrynet.file_manager.EncryptedFileDownloader import ManagedEncryptedFileDownloaderFactory -from lbrynet.lbry_file.StreamDescriptor import EncryptedFileStreamType +from lbrynet.lbry_file.StreamDescriptor import EncryptedFileStreamType, get_sd_info from lbrynet.cryptstream.client.CryptStreamDownloader import AlreadyStoppedError from lbrynet.cryptstream.client.CryptStreamDownloader import CurrentlyStoppingError from lbrynet.core.utils import safe_start_looping_call, safe_stop_looping_call @@ -85,6 +85,27 @@ class EncryptedFileManager(object): self.sd_identifier.add_stream_downloader_factory( EncryptedFileStreamType, downloader_factory) + def _get_lbry_file(self, rowid, stream_hash, payment_rate_manager, sd_hash, key, + stream_name, suggested_file_name, download_directory=None): + download_directory = download_directory or self.download_directory + payment_rate_manager = payment_rate_manager or self.session.payment_rate_manager + return ManagedEncryptedFileDownloader( + rowid, + stream_hash, + self.session.peer_finder, + self.session.rate_limiter, + self.session.blob_manager, + self.stream_info_manager, + self, + payment_rate_manager, + self.session.wallet, + download_directory, + sd_hash=sd_hash, + key=key, + stream_name=stream_name, + suggested_file_name=suggested_file_name + ) + @defer.inlineCallbacks def _start_lbry_files(self): files_and_options = yield self._get_all_lbry_files() @@ -96,68 +117,29 @@ class EncryptedFileManager(object): if len(files_and_options) > 500 and i % 500 == 0: log.info("Started %i/%i files", i, len(stream_infos)) if stream_hash in stream_infos: - if stream_infos[stream_hash]['suggested_file_name']: - file_name = os.path.basename(stream_infos[stream_hash]['suggested_file_name']) - else: - file_name = os.path.basename(stream_infos[stream_hash]['stream_name']) - - lbry_file = ManagedEncryptedFileDownloader( - rowid, - stream_hash, - self.session.peer_finder, - self.session.rate_limiter, - self.session.blob_manager, - self.stream_info_manager, - self, - payment_rate_manager, - self.session.wallet, - self.download_directory, - file_name=file_name, - sd_hash=stream_infos[stream_hash]['sd_hash'], - key=stream_infos[stream_hash]['key'], - stream_name=stream_infos[stream_hash]['stream_name'], - suggested_file_name=stream_infos[stream_hash]['suggested_file_name'] - ) + lbry_file = self._get_lbry_file(rowid, stream_hash, payment_rate_manager, + stream_infos[stream_hash]['sd_hash'], + stream_infos[stream_hash]['key'], + stream_infos[stream_hash]['stream_name'], + stream_infos[stream_hash]['suggested_file_name']) + log.info("initialized file %s", lbry_file.stream_name) try: # restore will raise an Exception if status is unknown lbry_file.restore(status) + self.lbry_files.append(lbry_file) except Exception: log.warning("Failed to start %i", rowid) continue - self.lbry_files.append(lbry_file) log.info("Started %i lbry files", len(self.lbry_files)) if self.auto_re_reflect is True: safe_start_looping_call(self.lbry_file_reflector, self.auto_re_reflect_interval) - @defer.inlineCallbacks - def start_lbry_file(self, rowid, stream_hash, - payment_rate_manager, blob_data_rate=None, - download_directory=None, file_name=None): - if not download_directory: - download_directory = self.download_directory - payment_rate_manager.min_blob_data_payment_rate = blob_data_rate - lbry_file_downloader = ManagedEncryptedFileDownloader( - rowid, - stream_hash, - self.session.peer_finder, - self.session.rate_limiter, - self.session.blob_manager, - self.stream_info_manager, - self, - payment_rate_manager, - self.session.wallet, - download_directory, - file_name=file_name - ) - yield lbry_file_downloader.set_stream_info() - self.lbry_files.append(lbry_file_downloader) - defer.returnValue(lbry_file_downloader) - @defer.inlineCallbacks def _stop_lbry_file(self, lbry_file): def wait_for_finished(lbry_file, count=2): if count or lbry_file.saving_status is not False: - return task.deferLater(reactor, 1, self._stop_lbry_file, lbry_file, count=count - 1) + return task.deferLater(reactor, 1, self._stop_lbry_file, lbry_file, + count=count - 1) try: yield lbry_file.stop(change_status=False) self.lbry_files.remove(lbry_file) @@ -175,14 +157,18 @@ class EncryptedFileManager(object): yield self._stop_lbry_file(lbry_file) @defer.inlineCallbacks - def add_lbry_file(self, stream_hash, payment_rate_manager=None, blob_data_rate=None, - download_directory=None, file_name=None): - if not payment_rate_manager: - payment_rate_manager = self.session.payment_rate_manager + def add_lbry_file(self, stream_hash, sd_hash, payment_rate_manager=None, blob_data_rate=None, + download_directory=None, status=None): rowid = yield self._save_lbry_file(stream_hash, blob_data_rate) - lbry_file = yield self.start_lbry_file(rowid, stream_hash, payment_rate_manager, - blob_data_rate, download_directory, - file_name) + stream_metadata = yield get_sd_info(self.stream_info_manager, + stream_hash, False) + key = stream_metadata['key'] + stream_name = stream_metadata['stream_name'] + suggested_file_name = stream_metadata['suggested_file_name'] + lbry_file = self._get_lbry_file(rowid, stream_hash, payment_rate_manager, sd_hash, key, + stream_name, suggested_file_name, download_directory) + lbry_file.restore(status or ManagedEncryptedFileDownloader.STATUS_STOPPED) + self.lbry_files.append(lbry_file) defer.returnValue(lbry_file) @defer.inlineCallbacks diff --git a/lbrynet/lbry_file/EncryptedFileMetadataManager.py b/lbrynet/lbry_file/EncryptedFileMetadataManager.py index b3f82d4eb..be189fdb6 100644 --- a/lbrynet/lbry_file/EncryptedFileMetadataManager.py +++ b/lbrynet/lbry_file/EncryptedFileMetadataManager.py @@ -1,7 +1,6 @@ import os import logging import sqlite3 -import binascii from twisted.internet import defer from twisted.python.failure import Failure from twisted.enterprise import adbapi @@ -227,9 +226,9 @@ class DBEncryptedFileMetadataManager(object): log.warning("Missing sd hash for %s", stream_hash) continue response[stream_hash]['rowid'] = rowid - response[stream_hash]['key'] = binascii.unhexlify(key) - response[stream_hash]['stream_name'] = binascii.unhexlify(stream_name) - response[stream_hash]['suggested_file_name'] = binascii.unhexlify(suggested_file_name) + response[stream_hash]['key'] = key + response[stream_hash]['stream_name'] = stream_name + response[stream_hash]['suggested_file_name'] = suggested_file_name defer.returnValue(response) @rerun_if_locked diff --git a/lbrynet/lbry_file/client/EncryptedFileDownloader.py b/lbrynet/lbry_file/client/EncryptedFileDownloader.py index 8454a7f72..17b3cf501 100644 --- a/lbrynet/lbry_file/client/EncryptedFileDownloader.py +++ b/lbrynet/lbry_file/client/EncryptedFileDownloader.py @@ -6,7 +6,6 @@ from lbrynet.lbry_file.StreamDescriptor import save_sd_info from lbrynet.cryptstream.client.CryptStreamDownloader import CryptStreamDownloader from lbrynet.core.client.StreamProgressManager import FullStreamProgressManager from lbrynet.core.StreamDescriptor import StreamMetadata -from lbrynet.core.Error import NoSuchStreamHash from lbrynet.interfaces import IStreamDownloaderFactory from lbrynet.lbry_file.client.EncryptedFileMetadataHandler import EncryptedFileMetadataHandler import os @@ -28,24 +27,8 @@ class EncryptedFileDownloader(CryptStreamDownloader): payment_rate_manager, wallet, key, stream_name) self.stream_hash = stream_hash self.stream_info_manager = stream_info_manager - self.suggested_file_name = suggested_file_name + self.suggested_file_name = binascii.unhexlify(suggested_file_name) self._calculated_total_bytes = None - self.sd_hash = None - - @defer.inlineCallbacks - def set_stream_info(self): - if self.key is None: - out = yield self.stream_info_manager.get_stream_info(self.stream_hash) - key, stream_name, suggested_file_name = out - self.key = binascii.unhexlify(key) - self.stream_name = binascii.unhexlify(stream_name) - self.suggested_file_name = binascii.unhexlify(suggested_file_name) - - out = yield self.stream_info_manager.get_sd_blob_hashes_for_stream(self.stream_hash) - if out: - self.sd_hash = out[0] - else: - raise NoSuchStreamHash(self.stream_hash) def delete_data(self): d1 = self.stream_info_manager.get_blobs_for_stream(self.stream_hash) @@ -173,12 +156,12 @@ class EncryptedFileDownloaderFactory(object): class EncryptedFileSaver(EncryptedFileDownloader): def __init__(self, stream_hash, peer_finder, rate_limiter, blob_manager, stream_info_manager, payment_rate_manager, wallet, download_directory, key, stream_name, - suggested_file_name, file_name): + suggested_file_name): EncryptedFileDownloader.__init__(self, stream_hash, peer_finder, rate_limiter, blob_manager, stream_info_manager, payment_rate_manager, wallet, key, stream_name, suggested_file_name) self.download_directory = download_directory - self.file_name = file_name + self.file_name = os.path.basename(self.suggested_file_name) self.file_written_to = None self.file_handle = None @@ -188,19 +171,6 @@ class EncryptedFileSaver(EncryptedFileDownloader): else: return str(self.file_name) - def set_stream_info(self): - d = EncryptedFileDownloader.set_stream_info(self) - - def set_file_name(): - if self.file_name is None: - if self.suggested_file_name: - self.file_name = os.path.basename(self.suggested_file_name) - else: - self.file_name = os.path.basename(self.stream_name) - - d.addCallback(lambda _: set_file_name()) - return d - def stop(self, err=None): d = EncryptedFileDownloader.stop(self, err=err) d.addCallback(lambda _: self._delete_from_info_manager()) @@ -274,15 +244,14 @@ class EncryptedFileSaverFactory(EncryptedFileDownloaderFactory): self.download_directory = download_directory def _make_downloader(self, stream_hash, payment_rate_manager, stream_info): - stream_name = binascii.unhexlify(stream_info.raw_info['stream_name']) + stream_name = stream_info.raw_info['stream_name'] key = stream_info.raw_info['key'] - suggested_file_name = binascii.unhexlify(stream_info.raw_info['suggested_file_name']) - file_name = os.path.join(self.download_directory, os.path.basename(suggested_file_name)) - + suggested_file_name = stream_info.raw_info['suggested_file_name'] return EncryptedFileSaver(stream_hash, self.peer_finder, self.rate_limiter, self.blob_manager, self.stream_info_manager, - payment_rate_manager, self.wallet, self.download_directory, key, - stream_name, suggested_file_name, file_name) + payment_rate_manager, self.wallet, self.download_directory, + key=key, stream_name=stream_name, + suggested_file_name=suggested_file_name) @staticmethod def get_description(): diff --git a/lbrynet/reflector/server/server.py b/lbrynet/reflector/server/server.py index 995c35141..74e457c1d 100644 --- a/lbrynet/reflector/server/server.py +++ b/lbrynet/reflector/server/server.py @@ -108,7 +108,7 @@ class ReflectorServer(Protocol): yield save_sd_info(self.stream_info_manager, sd_info) yield self.stream_info_manager.save_sd_blob_hash_to_stream(sd_info['stream_hash'], blob.blob_hash) - self.lbry_file_manager.add_lbry_file(sd_info['stream_hash']) + yield self.lbry_file_manager.add_lbry_file(sd_info['stream_hash'], blob.blob_hash) should_announce = True # if we already have the head blob, set it to be announced now that we know it's diff --git a/lbrynet/tests/functional/test_streamify.py b/lbrynet/tests/functional/test_streamify.py index 9fe4a29c1..31e5a7dad 100644 --- a/lbrynet/tests/functional/test_streamify.py +++ b/lbrynet/tests/functional/test_streamify.py @@ -144,9 +144,10 @@ class TestStreamify(TestCase): d = lbry_file.start() return d - def combine_stream(stream_hash): + def combine_stream(info): + stream_hash, sd_hash = info prm = self.session.payment_rate_manager - d = self.lbry_file_manager.add_lbry_file(stream_hash, prm) + d = self.lbry_file_manager.add_lbry_file(stream_hash, sd_hash, prm) d.addCallback(start_lbry_file) def check_md5_sum(): @@ -163,8 +164,9 @@ class TestStreamify(TestCase): test_file = GenFile(53209343, b''.join([chr(i + 5) for i in xrange(0, 64, 6)])) stream_hash = yield create_lbry_file(self.session, self.lbry_file_manager, "test_file", test_file, suggested_file_name="test_file") - yield publish_sd_blob(self.stream_info_manager, self.session.blob_manager, stream_hash) - defer.returnValue(stream_hash) + sd_hash = yield publish_sd_blob(self.stream_info_manager, self.session.blob_manager, + stream_hash) + defer.returnValue((stream_hash, sd_hash)) d = self.session.setup() d.addCallback(lambda _: self.stream_info_manager.setup()) diff --git a/lbrynet/tests/unit/lbryfile/client/test_EncryptedFileDownloader.py b/lbrynet/tests/unit/lbryfile/client/test_EncryptedFileDownloader.py index b61be5228..bc5a65251 100644 --- a/lbrynet/tests/unit/lbryfile/client/test_EncryptedFileDownloader.py +++ b/lbrynet/tests/unit/lbryfile/client/test_EncryptedFileDownloader.py @@ -10,6 +10,7 @@ class TestEncryptedFileSaver(unittest.TestCase): @defer.inlineCallbacks def test_setup_output(self): file_name = 'encrypted_file_saver_test.tmp' + file_name_hex = file_name.encode('hex') self.assertFalse(os.path.isfile(file_name)) # create file in the temporary trial folder @@ -25,8 +26,8 @@ class TestEncryptedFileSaver(unittest.TestCase): saver = EncryptedFileSaver(stream_hash, peer_finder, rate_limiter, blob_manager, stream_info_manager, payment_rate_manager, wallet, - download_directory, key, file_name, file_name, - file_name) + download_directory, key, + file_name_hex, file_name_hex) yield saver._setup_output() self.assertTrue(os.path.isfile(file_name)) diff --git a/lbrynet/tests/unit/lbrynet_daemon/test_Downloader.py b/lbrynet/tests/unit/lbrynet_daemon/test_Downloader.py index 1a650f495..c8ef2feb4 100644 --- a/lbrynet/tests/unit/lbrynet_daemon/test_Downloader.py +++ b/lbrynet/tests/unit/lbrynet_daemon/test_Downloader.py @@ -103,7 +103,7 @@ class GetStreamTests(unittest.TestCase): DownloadTimeoutError is raised """ def download_sd_blob(self): - raise DownloadSDTimeout(self.file_name) + raise DownloadSDTimeout(self) getstream = self.init_getstream_with_mocs() getstream._initialize = types.MethodType(moc_initialize, getstream)