From f999073fb401c5bb1e1240c9d4e3178c61d7b0b4 Mon Sep 17 00:00:00 2001 From: Jack Date: Wed, 20 Jul 2016 20:34:02 -0400 Subject: [PATCH] revert download refactor to be re-merged after fixes --- lbrynet/lbrynet_daemon/LBRYDaemon.py | 385 +++++++++++---------------- 1 file changed, 160 insertions(+), 225 deletions(-) diff --git a/lbrynet/lbrynet_daemon/LBRYDaemon.py b/lbrynet/lbrynet_daemon/LBRYDaemon.py index 888406e09..b3b234736 100644 --- a/lbrynet/lbrynet_daemon/LBRYDaemon.py +++ b/lbrynet/lbrynet_daemon/LBRYDaemon.py @@ -143,11 +143,6 @@ OK_CODE = 200 REMOTE_SERVER = "www.google.com" -class Parameters(object): - def __init__(self, **kwargs): - self.__dict__.update(kwargs) - - class LBRYDaemon(jsonrpc.JSONRPC): """ LBRYnet daemon, a jsonrpc interface to lbry functions @@ -1023,37 +1018,97 @@ class LBRYDaemon(jsonrpc.JSONRPC): return defer.succeed(True) def _download_name(self, name, timeout=DEFAULT_TIMEOUT, download_directory=None, - file_name=None, stream_info=None, wait_for_write=True): + file_name=None, stream_info=None, wait_for_write=True): """ Add a lbry file to the file manager, start the download, and return the new lbry file. If it already exists in the file manager, return the existing lbry file """ - helper = _DownloadNameHelper( - self, name, timeout, download_directory, file_name, wait_for_write) + + if not download_directory: + download_directory = self.download_directory + elif not os.path.isdir(download_directory): + download_directory = self.download_directory + + def _remove_from_wait(r): + del self.waiting_on[name] + return r + + def _setup_stream(stream_info): + if 'sources' in stream_info.keys(): + stream_hash = stream_info['sources']['lbry_sd_hash'] + else: + stream_hash = stream_info['stream_hash'] + + d = self._get_lbry_file_by_sd_hash(stream_hash) + def _add_results(l): + if l: + if os.path.isfile(os.path.join(self.download_directory, l.file_name)): + return defer.succeed((stream_info, l)) + return defer.succeed((stream_info, None)) + d.addCallback(_add_results) + return d + + def _wait_on_lbry_file(f): + if os.path.isfile(os.path.join(self.download_directory, f.file_name)): + written_file = file(os.path.join(self.download_directory, f.file_name)) + written_file.seek(0, os.SEEK_END) + written_bytes = written_file.tell() + written_file.close() + else: + written_bytes = False + + if not written_bytes: + d = defer.succeed(None) + d.addCallback(lambda _: reactor.callLater(1, _wait_on_lbry_file, f)) + return d + else: + return defer.succeed(_disp_file(f)) + + def _disp_file(f): + file_path = os.path.join(self.download_directory, f.file_name) + log.info("[" + str(datetime.now()) + "] Already downloaded: " + str(f.sd_hash) + " --> " + file_path) + return f + + def _get_stream(stream_info): + def _wait_for_write(): + try: + if os.path.isfile(os.path.join(self.download_directory, self.streams[name].downloader.file_name)): + written_file = file(os.path.join(self.download_directory, self.streams[name].downloader.file_name)) + written_file.seek(0, os.SEEK_END) + written_bytes = written_file.tell() + written_file.close() + else: + written_bytes = False + except: + written_bytes = False + + if not written_bytes: + d = defer.succeed(None) + d.addCallback(lambda _: reactor.callLater(1, _wait_for_write)) + return d + else: + return defer.succeed(None) + + self.streams[name] = GetStream(self.sd_identifier, self.session, self.session.wallet, + self.lbry_file_manager, max_key_fee=self.max_key_fee, + data_rate=self.data_rate, timeout=timeout, + download_directory=download_directory, file_name=file_name) + d = self.streams[name].start(stream_info, name) + if wait_for_write: + d.addCallback(lambda _: _wait_for_write()) + d.addCallback(lambda _: self.streams[name].downloader) + + return d if not stream_info: self.waiting_on[name] = True d = self._resolve_name(name) else: d = defer.succeed(stream_info) - d.addCallback(helper._setup_stream) - d.addCallback(helper.wait_or_get_stream) + d.addCallback(_setup_stream) + d.addCallback(lambda (stream_info, lbry_file): _get_stream(stream_info) if not lbry_file else _wait_on_lbry_file(lbry_file)) if not stream_info: - d.addCallback(helper._remove_from_wait) - return d - - def add_stream(self, name, timeout, download_directory, file_name, stream_info): - """Makes, adds and starts a stream""" - self.streams[name] = GetStream(self.sd_identifier, - self.session, - self.session.wallet, - self.lbry_file_manager, - max_key_fee=self.max_key_fee, - data_rate=self.data_rate, - timeout=timeout, - download_directory=download_directory, - file_name=file_name) - d = self.streams[name].start(stream_info, name) + d.addCallback(_remove_from_wait) return d def _get_long_count_timestamp(self): @@ -1066,16 +1121,38 @@ class LBRYDaemon(jsonrpc.JSONRPC): return defer.succeed(True) def _resolve_name(self, name, force_refresh=False): - """Resolves a name. Checks the cache first before going out to the blockchain. + def _cache_stream_info(stream_info): + def _add_txid(txid): + self.name_cache[name]['txid'] = txid + return defer.succeed(None) - Args: - name: the lbry:// to resolve - force_refresh: if True, always go out to the blockchain to resolve. - """ - if name.startswith('lbry://'): - raise ValueError('name %s should not start with lbry://') - helper = _ResolveNameHelper(self, name, force_refresh) - return helper.get_deferred() + self.name_cache[name] = {'claim_metadata': stream_info, 'timestamp': self._get_long_count_timestamp()} + d = self.session.wallet.get_txid_for_name(name) + d.addCallback(_add_txid) + d.addCallback(lambda _: self._update_claim_cache()) + d.addCallback(lambda _: self.name_cache[name]['claim_metadata']) + + return d + + if not force_refresh: + if name in self.name_cache.keys(): + if (self._get_long_count_timestamp() - self.name_cache[name]['timestamp']) < self.cache_time: + log.info("[" + str(datetime.now()) + "] Returning cached stream info for lbry://" + name) + d = defer.succeed(self.name_cache[name]['claim_metadata']) + else: + log.info("[" + str(datetime.now()) + "] Refreshing stream info for lbry://" + name) + d = self.session.wallet.get_stream_info_for_name(name) + d.addCallbacks(_cache_stream_info, lambda _: defer.fail(UnknownNameError)) + else: + log.info("[" + str(datetime.now()) + "] Resolving stream info for lbry://" + name) + d = self.session.wallet.get_stream_info_for_name(name) + d.addCallbacks(_cache_stream_info, lambda _: defer.fail(UnknownNameError)) + else: + log.info("[" + str(datetime.now()) + "] Resolving stream info for lbry://" + name) + d = self.session.wallet.get_stream_info_for_name(name) + d.addCallbacks(_cache_stream_info, lambda _: defer.fail(UnknownNameError)) + + return d def _delete_lbry_file(self, lbry_file, delete_file=True): d = self.lbry_file_manager.delete_lbry_file(lbry_file) @@ -1598,54 +1675,65 @@ class LBRYDaemon(jsonrpc.JSONRPC): d.addCallbacks(lambda info: self._render_response(info, OK_CODE), lambda _: server.failure) return d - def _process_get_parameters(self, p): - """Extract info from input parameters and fill in default values for `get` call.""" - # TODO: this process can be abstracted s.t. each method - # can spec what parameters it expects and how to set default values - timeout = p.get('timeout', self.download_timeout) - download_directory = p.get('download_directory', self.download_directory) - file_name = p.get('file_name') - stream_info = p.get('stream_info') - sd_hash = get_sd_hash(stream_info) - wait_for_write = p.get('wait_for_write', True) - name = p.get('name') - return Parameters( - timout=timeout, - download_directory=download_directory, - file_name=file_name, - stream_info=stream_info, - sd_hash=sd_hash, - wait_for_write=wait_for_write, - name=name - ) - def jsonrpc_get(self, p): - """Download stream from a LBRY uri. + """ + Download stream from a LBRY uri Args: 'name': name to download, string 'download_directory': optional, path to directory where file will be saved, string 'file_name': optional, a user specified name for the downloaded file 'stream_info': optional, specified stream info overrides name - 'timout': optional - 'wait_for_write': optional, defaults to True Returns: 'stream_hash': hex string 'path': path of download """ - params = self._process_get_parameters(p) - if not params.name: - return server.failure - if params.name in self.waiting_on: - return server.failure - d = self._download_name(name=params.name, - timeout=params.timeout, - download_directory=params.download_directory, - stream_info=params.stream_info, - file_name=params.file_name, - wait_for_write=params.wait_for_write) - d.addCallback(get_output_callback(params)) - d.addCallback(lambda message: self._render_response(message, OK_CODE)) + + if 'timeout' not in p.keys(): + timeout = self.download_timeout + else: + timeout = p['timeout'] + + if 'download_directory' not in p.keys(): + download_directory = self.download_directory + else: + download_directory = p['download_directory'] + + if 'file_name' in p.keys(): + file_name = p['file_name'] + else: + file_name = None + + if 'stream_info' in p.keys(): + stream_info = p['stream_info'] + if 'sources' in stream_info.keys(): + sd_hash = stream_info['sources']['lbry_sd_hash'] + else: + sd_hash = stream_info['stream_hash'] + else: + stream_info = None + + if 'wait_for_write' in p.keys(): + wait_for_write = p['wait_for_write'] + else: + wait_for_write = True + + if 'name' in p.keys(): + name = p['name'] + if p['name'] not in self.waiting_on.keys(): + d = self._download_name(name=name, timeout=timeout, download_directory=download_directory, + stream_info=stream_info, file_name=file_name, wait_for_write=wait_for_write) + d.addCallback(lambda l: {'stream_hash': sd_hash, + 'path': os.path.join(self.download_directory, l.file_name)} + if stream_info else + {'stream_hash': l.sd_hash, + 'path': os.path.join(self.download_directory, l.file_name)}) + d.addCallback(lambda message: self._render_response(message, OK_CODE)) + else: + d = server.failure + else: + d = server.failure + return d def jsonrpc_stop_lbry_file(self, p): @@ -2192,161 +2280,8 @@ class LBRYDaemon(jsonrpc.JSONRPC): d = threads.deferToThread(subprocess.Popen, ['open', '-R', path]) else: # No easy way to reveal specific files on Linux, so just open the containing directory - d = threads.deferToThread(subprocess.Popen, ['xdg-open', os.path.dirname(path)]) + d = threads.deferToThread(subprocess.Popen, ['xdg-open', os.dirname(path)]) d.addCallback(lambda _: self._render_response(True, OK_CODE)) return d - - -def get_sd_hash(stream_info): - if not stream_info: - return None - try: - return stream_info['sources']['lbry_sd_hash'] - except KeyError: - return stream_info.get('stream_hash') - - -def get_output_callback(params): - def callback(l): - return { - 'stream_hash': params.sd_hash if params.stream_info else l.sd_hash, - 'path': os.path.join(params.download_directory, l.file_name) - } - return callback - - -class _DownloadNameHelper(object): - def __init__(self, daemon, name, timeout=DEFAULT_TIMEOUT, download_directory=None, - file_name=None, wait_for_write=True): - self.daemon = daemon - self.name = name - self.timeout = timeout - if not download_directory or not os.path.isdir(download_directory): - self.download_directory = daemon.download_directory - else: - self.download_directory = download_directory - self.file_name = file_name - self.wait_for_write = wait_for_write - - def _setup_stream(self, stream_info): - stream_hash = get_sd_hash(stream_info) - d = self.daemon._get_lbry_file_by_sd_hash(stream_hash) - d.addCallback(self._add_results_callback(stream_info)) - return d - - def _add_results_callback(self, stream_info): - def add_results(l): - if l: - if os.path.isfile(os.path.join(self.download_directory, l.file_name)): - return defer.succeed((stream_info, l)) - return defer.succeed((stream_info, None)) - return add_results - - def wait_or_get_stream(self, args): - stream_info, lbry_file = args - if lbry_file: - return self._get_stream(stream_info) - else: - return self._wait_on_lbry_file(lbry_file) - - def _get_stream(self, stream_info): - d = self.daemon.add_stream( - self.name, self.timeout, self.download_directory, self.file_name, stream_info) - if self.wait_for_write: - d.addCallback(lambda _: self._wait_for_write()) - d.addCallback(lambda _: self.daemon.streams[self.name].downloader) - return d - - def _wait_for_write(self): - file_name = self.daemon.streams[self.name].downloader.file_name - written_bytes = self.get_written_bytes(file_name) - d = defer.succeed(None) - if not written_bytes: - d.addCallback(lambda _: reactor.callLater(1, self._wait_for_write)) - return d - - def _wait_on_lbry_file(self, f): - written_bytes = self.get_written_bytes(f.file_name) - if not written_bytes: - d = defer.succeed(None) - d.addCallback(lambda _: reactor.callLater(1, self._wait_on_lbry_file, f)) - return d - else: - return defer.succeed(self._disp_file(f)) - - def get_written_bytes(self, file_name): - try: - file_path = os.path.join(self.download_directory, file_name) - if os.path.isfile(file_path): - written_file = file(file_path) - written_file.seek(0, os.SEEK_END) - written_bytes = written_file.tell() - written_file.close() - else: - written_bytes = False - except Exception: - writen_bytes = False - return written_bytes - - def _disp_file(self, f): - file_path = os.path.join(self.download_directory, f.file_name) - log.info("Already downloaded: %s --> %s", f.sd_hash, file_path) - return f - - def _remove_from_wait(self, r): - del self.daemon.waiting_on[self.name] - return r - -class _ResolveNameHelper(object): - def __init__(self, daemon, name, force_refresh): - self.daemon = daemon - self.name = name - self.force_refresh = force_refresh - - def get_deferred(self): - if self.need_fresh_stream(): - log.info("Resolving stream info for lbry://%s", self.name) - d = self.wallet.get_stream_info_for_name(self.name) - d.addCallbacks(self._cache_stream_info, lambda _: defer.fail(UnknownNameError)) - else: - log.info("Returning cached stream info for lbry://%s", self.name) - d = defer.succeed(self.name_data['claim_metadata']) - return d - - @property - def name_data(self): - return self.daemon.name_cache[self.name] - - @property - def wallet(self): - return self.daemon.session.wallet - - def now(self): - return self.daemon._get_long_count_timestamp() - - def _add_txid(self, txid): - self.name_data['txid'] = txid - return defer.succeed(None) - - def _cache_stream_info(self, stream_info): - self.daemon.name_cache[self.name] = { - 'claim_metadata': stream_info, - 'timestamp': self.now() - } - d = self.wallet.get_txid_for_name(self.name) - d.addCallback(self._add_txid) - d.addCallback(lambda _: self.daemon._update_claim_cache()) - d.addCallback(lambda _: self.name_data['claim_metadata']) - return d - - def need_fresh_stream(self): - return self.force_refresh or not self.is_in_cache() or self.is_cached_name_expired() - - def is_in_cache(self): - return self.name in self.daemon.name_cache - - def is_cached_name_expired(self): - time_in_cache = self.now() - self.name_data['timestamp'] - return time_in_cache >= self.daemon.cache_time