diff --git a/lbrynet/lbrynet_daemon/LBRYDaemon.py b/lbrynet/lbrynet_daemon/LBRYDaemon.py index 40f75156f..08bd27d3a 100644 --- a/lbrynet/lbrynet_daemon/LBRYDaemon.py +++ b/lbrynet/lbrynet_daemon/LBRYDaemon.py @@ -132,6 +132,11 @@ OK_CODE = 200 REMOTE_SERVER = "www.google.com" +class Parameters(object): + def __init__(self, **kwargs): + self.__dict__.update(kwargs) + + class LBRYDaemon(jsonrpc.JSONRPC): """ LBRYnet daemon, a jsonrpc interface to lbry functions @@ -1102,97 +1107,38 @@ class LBRYDaemon(jsonrpc.JSONRPC): return r def _download_name(self, name, timeout=DEFAULT_TIMEOUT, download_directory=None, - file_name=None, stream_info=None, wait_for_write=True): + file_name=None, stream_info=None, wait_for_write=True): """ Add a lbry file to the file manager, start the download, and return the new lbry file. If it already exists in the file manager, return the existing lbry file """ - - if not download_directory: - download_directory = self.download_directory - elif not os.path.isdir(download_directory): - download_directory = self.download_directory - - def _remove_from_wait(r): - del self.waiting_on[name] - return r - - def _setup_stream(stream_info): - if 'sources' in stream_info.keys(): - stream_hash = stream_info['sources']['lbry_sd_hash'] - else: - stream_hash = stream_info['stream_hash'] - - d = self._get_lbry_file_by_sd_hash(stream_hash) - def _add_results(l): - if l: - if os.path.isfile(os.path.join(self.download_directory, l.file_name)): - return defer.succeed((stream_info, l)) - return defer.succeed((stream_info, None)) - d.addCallback(_add_results) - return d - - def _wait_on_lbry_file(f): - if os.path.isfile(os.path.join(self.download_directory, f.file_name)): - written_file = file(os.path.join(self.download_directory, f.file_name)) - written_file.seek(0, os.SEEK_END) - written_bytes = written_file.tell() - written_file.close() - else: - written_bytes = False - - if not written_bytes: - d = defer.succeed(None) - d.addCallback(lambda _: reactor.callLater(1, _wait_on_lbry_file, f)) - return d - else: - return defer.succeed(_disp_file(f)) - - def _disp_file(f): - file_path = os.path.join(self.download_directory, f.file_name) - log.info("Already downloaded: " + str(f.sd_hash) + " --> " + file_path) - return f - - def _get_stream(stream_info): - def _wait_for_write(): - try: - if os.path.isfile(os.path.join(self.download_directory, self.streams[name].downloader.file_name)): - written_file = file(os.path.join(self.download_directory, self.streams[name].downloader.file_name)) - written_file.seek(0, os.SEEK_END) - written_bytes = written_file.tell() - written_file.close() - else: - written_bytes = False - except: - written_bytes = False - - if not written_bytes: - d = defer.succeed(None) - d.addCallback(lambda _: reactor.callLater(1, _wait_for_write)) - return d - else: - return defer.succeed(None) - - self.streams[name] = GetStream(self.sd_identifier, self.session, self.session.wallet, - self.lbry_file_manager, self.exchange_rate_manager, - max_key_fee=self.max_key_fee, data_rate=self.data_rate, timeout=timeout, - download_directory=download_directory, file_name=file_name) - d = self.streams[name].start(stream_info, name) - if wait_for_write: - d.addCallback(lambda _: _wait_for_write()) - d.addCallback(lambda _: self.streams[name].downloader) - - return d + helper = _DownloadNameHelper( + self, name, timeout, download_directory, file_name, wait_for_write) if not stream_info: self.waiting_on[name] = True d = self._resolve_name(name) else: d = defer.succeed(stream_info) - d.addCallback(_setup_stream) - d.addCallback(lambda (stream_info, lbry_file): _get_stream(stream_info) if not lbry_file else _wait_on_lbry_file(lbry_file)) + d.addCallback(helper._setup_stream) + d.addCallback(helper.wait_or_get_stream) if not stream_info: - d.addCallback(_remove_from_wait) + d.addCallback(helper._remove_from_wait) + return d + + def add_stream(self, name, timeout, download_directory, file_name, stream_info): + """Makes, adds and starts a stream""" + self.streams[name] = GetStream(self.sd_identifier, + self.session, + self.session.wallet, + self.lbry_file_manager, + self.exchange_rate_manager, + max_key_fee=self.max_key_fee, + data_rate=self.data_rate, + timeout=timeout, + download_directory=download_directory, + file_name=file_name) + d = self.streams[name].start(stream_info, name) return d def _get_long_count_timestamp(self): @@ -1205,44 +1151,16 @@ class LBRYDaemon(jsonrpc.JSONRPC): return defer.succeed(True) def _resolve_name(self, name, force_refresh=False): - try: - verify_name_characters(name) - except AssertionError: - log.error("Bad name") - return defer.fail(InvalidNameError("Bad name")) + """Resolves a name. Checks the cache first before going out to the blockchain. - def _cache_stream_info(stream_info): - def _add_txid(txid): - self.name_cache[name]['txid'] = txid - return defer.succeed(None) - - self.name_cache[name] = {'claim_metadata': stream_info, 'timestamp': self._get_long_count_timestamp()} - d = self.session.wallet.get_txid_for_name(name) - d.addCallback(_add_txid) - d.addCallback(lambda _: self._update_claim_cache()) - d.addCallback(lambda _: self.name_cache[name]['claim_metadata']) - - return d - - if not force_refresh: - if name in self.name_cache.keys(): - if (self._get_long_count_timestamp() - self.name_cache[name]['timestamp']) < self.cache_time: - log.info("Returning cached stream info for lbry://" + name) - d = defer.succeed(self.name_cache[name]['claim_metadata']) - else: - log.info("Refreshing stream info for lbry://" + name) - d = self.session.wallet.get_stream_info_for_name(name) - d.addCallbacks(_cache_stream_info, lambda _: defer.fail(UnknownNameError)) - else: - log.info("Resolving stream info for lbry://" + name) - d = self.session.wallet.get_stream_info_for_name(name) - d.addCallbacks(_cache_stream_info, lambda _: defer.fail(UnknownNameError)) - else: - log.info("Resolving stream info for lbry://" + name) - d = self.session.wallet.get_stream_info_for_name(name) - d.addCallbacks(_cache_stream_info, lambda _: defer.fail(UnknownNameError)) - - return d + Args: + name: the lbry:// to resolve + force_refresh: if True, always go out to the blockchain to resolve. + """ + if name.startswith('lbry://'): + raise ValueError('name %s should not start with lbry://') + helper = _ResolveNameHelper(self, name, force_refresh) + return helper.get_deferred() def _delete_lbry_file(self, lbry_file, delete_file=True): d = self.lbry_file_manager.delete_lbry_file(lbry_file) @@ -1790,65 +1708,54 @@ class LBRYDaemon(jsonrpc.JSONRPC): d.addCallback(lambda r: self._render_response(r, OK_CODE)) return d + def _process_get_parameters(self, p): + """Extract info from input parameters and fill in default values for `get` call.""" + # TODO: this process can be abstracted s.t. each method + # can spec what parameters it expects and how to set default values + timeout = p.get('timeout', self.download_timeout) + download_directory = p.get('download_directory', self.download_directory) + file_name = p.get('file_name') + stream_info = p.get('stream_info') + sd_hash = get_sd_hash(stream_info) + wait_for_write = p.get('wait_for_write', True) + name = p.get('name') + return Parameters( + timeout=timeout, + download_directory=download_directory, + file_name=file_name, + stream_info=stream_info, + sd_hash=sd_hash, + wait_for_write=wait_for_write, + name=name + ) + def jsonrpc_get(self, p): - """ - Download stream from a LBRY uri + """Download stream from a LBRY uri. Args: 'name': name to download, string 'download_directory': optional, path to directory where file will be saved, string 'file_name': optional, a user specified name for the downloaded file 'stream_info': optional, specified stream info overrides name + 'timeout': optional + 'wait_for_write': optional, defaults to True Returns: 'stream_hash': hex string 'path': path of download """ - - if 'timeout' not in p.keys(): - timeout = self.download_timeout - else: - timeout = p['timeout'] - - if 'download_directory' not in p.keys(): - download_directory = self.download_directory - else: - download_directory = p['download_directory'] - - if 'file_name' in p.keys(): - file_name = p['file_name'] - else: - file_name = None - - if 'stream_info' in p.keys(): - stream_info = p['stream_info'] - if 'sources' in stream_info.keys(): - sd_hash = stream_info['sources']['lbry_sd_hash'] - else: - sd_hash = stream_info['stream_hash'] - else: - stream_info = None - - if 'wait_for_write' in p.keys(): - wait_for_write = p['wait_for_write'] - else: - wait_for_write = True - - if 'name' in p.keys(): - name = p['name'] - if p['name'] not in self.waiting_on.keys(): - d = self._download_name(name=name, timeout=timeout, download_directory=download_directory, - stream_info=stream_info, file_name=file_name, wait_for_write=wait_for_write) - d.addCallback(lambda l: {'stream_hash': sd_hash, - 'path': os.path.join(self.download_directory, l.file_name)} - if stream_info else - {'stream_hash': l.sd_hash, - 'path': os.path.join(self.download_directory, l.file_name)}) - d.addCallback(lambda message: self._render_response(message, OK_CODE)) - else: - d = server.failure - else: - d = server.failure - + params = self._process_get_parameters(p) + if not params.name: + return server.failure + if params.name in self.waiting_on: + return server.failure + d = self._download_name(name=params.name, + timeout=params.timeout, + download_directory=params.download_directory, + stream_info=params.stream_info, + file_name=params.file_name, + wait_for_write=params.wait_for_write) + d.addCallback(get_output_callback(params)) + d.addCallback(lambda message: self._render_response(message, OK_CODE)) return d def jsonrpc_stop_lbry_file(self, p): @@ -2407,7 +2314,7 @@ class LBRYDaemon(jsonrpc.JSONRPC): d = threads.deferToThread(subprocess.Popen, ['open', '-R', path]) else: # No easy way to reveal specific files on Linux, so just open the containing directory - d = threads.deferToThread(subprocess.Popen, ['xdg-open', os.dirname(path)]) + d = threads.deferToThread(subprocess.Popen, ['xdg-open', os.path.dirname(path)]) d.addCallback(lambda _: self._render_response(True, OK_CODE)) return d @@ -2447,3 +2354,165 @@ def get_version_from_tag(tag): return match.group(1) else: raise Exception('Failed to parse version from tag {}'.format(tag)) + + +def get_sd_hash(stream_info): + if not stream_info: + return None + try: + return stream_info['sources']['lbry_sd_hash'] + except KeyError: + return stream_info.get('stream_hash') + + +def get_output_callback(params): + def callback(l): + return { + 'stream_hash': params.sd_hash if params.stream_info else l.sd_hash, + 'path': os.path.join(params.download_directory, l.file_name) + } + return callback + + +class _DownloadNameHelper(object): + def __init__(self, daemon, name, timeout=DEFAULT_TIMEOUT, download_directory=None, + file_name=None, wait_for_write=True): + self.daemon = daemon + self.name = name + self.timeout = timeout + if not download_directory or not os.path.isdir(download_directory): + self.download_directory = daemon.download_directory + else: + self.download_directory = download_directory + self.file_name = file_name + self.wait_for_write = wait_for_write + + def _setup_stream(self, stream_info): + stream_hash = get_sd_hash(stream_info) + d = self.daemon._get_lbry_file_by_sd_hash(stream_hash) + d.addCallback(self._add_results_callback(stream_info)) + return d + + def _add_results_callback(self, stream_info): + def add_results(l): + if l: + if os.path.isfile(os.path.join(self.download_directory, l.file_name)): + return defer.succeed((stream_info, l)) + return defer.succeed((stream_info, None)) + return add_results + + def wait_or_get_stream(self, args): + stream_info, lbry_file = args + if lbry_file: + return self._wait_on_lbry_file(lbry_file) + else: + return self._get_stream(stream_info) + + def _get_stream(self, stream_info): + d = self.daemon.add_stream( + self.name, self.timeout, self.download_directory, self.file_name, stream_info) + if self.wait_for_write: + d.addCallback(lambda _: self._wait_for_write()) + d.addCallback(lambda _: self.daemon.streams[self.name].downloader) + return d + + def _wait_for_write(self): + d = defer.succeed(None) + if not self.has_downloader_wrote(): + d.addCallback(lambda _: reactor.callLater(1, self._wait_for_write)) + return d + + def has_downloader_wrote(self): + downloader = self.daemon.streams[self.name].downloader + if not downloader: + return False + return self.get_written_bytes(downloader.file_name) + + def _wait_on_lbry_file(self, f): + written_bytes = self.get_written_bytes(f.file_name) + if written_bytes: + return defer.succeed(self._disp_file(f)) + d = defer.succeed(None) + d.addCallback(lambda _: reactor.callLater(1, self._wait_on_lbry_file, f)) + return d + + def get_written_bytes(self, file_name): + """Returns the number of bytes written to `file_name`. + + Returns False if there were issues reading `file_name`. + """ + try: + file_path = os.path.join(self.download_directory, file_name) + if os.path.isfile(file_path): + written_file = file(file_path) + written_file.seek(0, os.SEEK_END) + written_bytes = written_file.tell() + written_file.close() + else: + written_bytes = False + except Exception: + writen_bytes = False + return written_bytes + + def _disp_file(self, f): + file_path = os.path.join(self.download_directory, f.file_name) + log.info("Already downloaded: %s --> %s", f.sd_hash, file_path) + return f + + def _remove_from_wait(self, r): + del self.daemon.waiting_on[self.name] + return r + + +class _ResolveNameHelper(object): + def __init__(self, daemon, name, force_refresh): + self.daemon = daemon + self.name = name + self.force_refresh = force_refresh + + def get_deferred(self): + if self.need_fresh_stream(): + log.info("Resolving stream info for lbry://%s", self.name) + d = self.wallet.get_stream_info_for_name(self.name) + d.addCallbacks(self._cache_stream_info, lambda _: defer.fail(UnknownNameError)) + else: + log.info("Returning cached stream info for lbry://%s", self.name) + d = defer.succeed(self.name_data['claim_metadata']) + return d + + @property + def name_data(self): + return self.daemon.name_cache[self.name] + + @property + def wallet(self): + return self.daemon.session.wallet + + def now(self): + return self.daemon._get_long_count_timestamp() + + def _add_txid(self, txid): + self.name_data['txid'] = txid + return defer.succeed(None) + + def _cache_stream_info(self, stream_info): + self.daemon.name_cache[self.name] = { + 'claim_metadata': stream_info, + 'timestamp': self.now() + } + d = self.wallet.get_txid_for_name(self.name) + d.addCallback(self._add_txid) + d.addCallback(lambda _: self.daemon._update_claim_cache()) + d.addCallback(lambda _: self.name_data['claim_metadata']) + return d + + def need_fresh_stream(self): + return self.force_refresh or not self.is_in_cache() or self.is_cached_name_expired() + + def is_in_cache(self): + return self.name in self.daemon.name_cache + + def is_cached_name_expired(self): + time_in_cache = self.now() - self.name_data['timestamp'] + return time_in_cache >= self.daemon.cache_time + diff --git a/lbrynet/lbrynet_daemon/LBRYDownloader.py b/lbrynet/lbrynet_daemon/LBRYDownloader.py index 62d9cd33d..323882eaf 100644 --- a/lbrynet/lbrynet_daemon/LBRYDownloader.py +++ b/lbrynet/lbrynet_daemon/LBRYDownloader.py @@ -150,21 +150,19 @@ class GetStream(object): return self.finished def _start_download(self, downloader): - def _pay_key_fee(): - if self.fee is not None: - fee_lbc = self.exchange_rate_manager.to_lbc(self.fee).amount - reserved_points = self.wallet.reserve_points(self.fee.address, fee_lbc) - if reserved_points is None: - return defer.fail(InsufficientFundsError()) - return self.wallet.send_points_to_address(reserved_points, fee_lbc) - - return defer.succeed(None) - - d = _pay_key_fee() - + log.info('Starting download for %s', self.name) self.downloader = downloader self.download_path = os.path.join(downloader.download_directory, downloader.file_name) + d = self._pay_key_fee() d.addCallback(lambda _: log.info("Downloading %s --> %s", self.stream_hash, self.downloader.file_name)) d.addCallback(lambda _: self.downloader.start()) + def _pay_key_fee(self): + if self.fee is not None: + fee_lbc = self.exchange_rate_manager.to_lbc(self.fee).amount + reserved_points = self.wallet.reserve_points(self.fee.address, fee_lbc) + if reserved_points is None: + return defer.fail(InsufficientFundsError()) + return self.wallet.send_points_to_address(reserved_points, fee_lbc) + return defer.succeed(None)