diff --git a/.bumpversion.cfg b/.bumpversion.cfg index 0848536a0..b34ee8dd9 100644 --- a/.bumpversion.cfg +++ b/.bumpversion.cfg @@ -1,5 +1,5 @@ [bumpversion] -current_version = 0.3.7 +current_version = 0.3.8 commit = True tag = True message = Bump version: {current_version} -> {new_version} diff --git a/.pylintrc b/.pylintrc index 5e5aab35c..e49732469 100644 --- a/.pylintrc +++ b/.pylintrc @@ -298,7 +298,7 @@ ignored-classes=twisted.internet,RequestMessage # List of members which are set dynamically and missed by pylint inference # system, and so shouldn't trigger E1101 when accessed. Python regular # expressions are accepted. -generated-members= +generated-members=lbrynet.lbrynet_daemon.LBRYDaemon.Parameters [IMPORTS] diff --git a/lbrynet/__init__.py b/lbrynet/__init__.py index a0576d5af..49bd262cf 100644 --- a/lbrynet/__init__.py +++ b/lbrynet/__init__.py @@ -4,5 +4,5 @@ log = logging.getLogger(__name__) logging.getLogger(__name__).addHandler(logging.NullHandler()) log.setLevel(logging.ERROR) -__version__ = "0.3.7" +__version__ = "0.3.8" version = tuple(__version__.split('.')) \ No newline at end of file diff --git a/lbrynet/lbrynet_daemon/LBRYDaemon.py b/lbrynet/lbrynet_daemon/LBRYDaemon.py index aaf504080..ceb8a87a7 100644 --- a/lbrynet/lbrynet_daemon/LBRYDaemon.py +++ b/lbrynet/lbrynet_daemon/LBRYDaemon.py @@ -138,6 +138,11 @@ OK_CODE = 200 REMOTE_SERVER = "www.google.com" +class Parameters(object): + def __init__(self, **kwargs): + self.__dict__.update(kwargs) + + class LBRYDaemon(jsonrpc.JSONRPC): """ LBRYnet daemon, a jsonrpc interface to lbry functions @@ -1013,97 +1018,37 @@ class LBRYDaemon(jsonrpc.JSONRPC): return defer.succeed(True) def _download_name(self, name, timeout=DEFAULT_TIMEOUT, download_directory=None, - file_name=None, stream_info=None, wait_for_write=True): + file_name=None, stream_info=None, wait_for_write=True): """ Add a lbry file to the file manager, start the download, and return the new lbry file. If it already exists in the file manager, return the existing lbry file """ - - if not download_directory: - download_directory = self.download_directory - elif not os.path.isdir(download_directory): - download_directory = self.download_directory - - def _remove_from_wait(r): - del self.waiting_on[name] - return r - - def _setup_stream(stream_info): - if 'sources' in stream_info.keys(): - stream_hash = stream_info['sources']['lbry_sd_hash'] - else: - stream_hash = stream_info['stream_hash'] - - d = self._get_lbry_file_by_sd_hash(stream_hash) - def _add_results(l): - if l: - if os.path.isfile(os.path.join(self.download_directory, l.file_name)): - return defer.succeed((stream_info, l)) - return defer.succeed((stream_info, None)) - d.addCallback(_add_results) - return d - - def _wait_on_lbry_file(f): - if os.path.isfile(os.path.join(self.download_directory, f.file_name)): - written_file = file(os.path.join(self.download_directory, f.file_name)) - written_file.seek(0, os.SEEK_END) - written_bytes = written_file.tell() - written_file.close() - else: - written_bytes = False - - if not written_bytes: - d = defer.succeed(None) - d.addCallback(lambda _: reactor.callLater(1, _wait_on_lbry_file, f)) - return d - else: - return defer.succeed(_disp_file(f)) - - def _disp_file(f): - file_path = os.path.join(self.download_directory, f.file_name) - log.info("[" + str(datetime.now()) + "] Already downloaded: " + str(f.sd_hash) + " --> " + file_path) - return f - - def _get_stream(stream_info): - def _wait_for_write(): - try: - if os.path.isfile(os.path.join(self.download_directory, self.streams[name].downloader.file_name)): - written_file = file(os.path.join(self.download_directory, self.streams[name].downloader.file_name)) - written_file.seek(0, os.SEEK_END) - written_bytes = written_file.tell() - written_file.close() - else: - written_bytes = False - except: - written_bytes = False - - if not written_bytes: - d = defer.succeed(None) - d.addCallback(lambda _: reactor.callLater(1, _wait_for_write)) - return d - else: - return defer.succeed(None) - - self.streams[name] = GetStream(self.sd_identifier, self.session, self.session.wallet, - self.lbry_file_manager, max_key_fee=self.max_key_fee, - data_rate=self.data_rate, timeout=timeout, - download_directory=download_directory, file_name=file_name) - d = self.streams[name].start(stream_info, name) - if wait_for_write: - d.addCallback(lambda _: _wait_for_write()) - d.addCallback(lambda _: self.streams[name].downloader) - - return d + helper = _DownloadNameHelper( + self, name, timeout, download_directory, file_name, wait_for_write) if not stream_info: self.waiting_on[name] = True d = self._resolve_name(name) else: d = defer.succeed(stream_info) - d.addCallback(_setup_stream) - d.addCallback(lambda (stream_info, lbry_file): _get_stream(stream_info) if not lbry_file else _wait_on_lbry_file(lbry_file)) + d.addCallback(helper._setup_stream) + d.addCallback(helper.wait_or_get_stream) if not stream_info: - d.addCallback(_remove_from_wait) + d.addCallback(helper._remove_from_wait) + return d + + def add_stream(self, name, timeout, download_directory, file_name, stream_info): + """Makes, adds and starts a stream""" + self.streams[name] = GetStream(self.sd_identifier, + self.session, + self.session.wallet, + self.lbry_file_manager, + max_key_fee=self.max_key_fee, + data_rate=self.data_rate, + timeout=timeout, + download_directory=download_directory, + file_name=file_name) + d = self.streams[name].start(stream_info, name) return d def _get_long_count_timestamp(self): @@ -1116,38 +1061,16 @@ class LBRYDaemon(jsonrpc.JSONRPC): return defer.succeed(True) def _resolve_name(self, name, force_refresh=False): - def _cache_stream_info(stream_info): - def _add_txid(txid): - self.name_cache[name]['txid'] = txid - return defer.succeed(None) + """Resolves a name. Checks the cache first before going out to the blockchain. - self.name_cache[name] = {'claim_metadata': stream_info, 'timestamp': self._get_long_count_timestamp()} - d = self.session.wallet.get_txid_for_name(name) - d.addCallback(_add_txid) - d.addCallback(lambda _: self._update_claim_cache()) - d.addCallback(lambda _: self.name_cache[name]['claim_metadata']) - - return d - - if not force_refresh: - if name in self.name_cache.keys(): - if (self._get_long_count_timestamp() - self.name_cache[name]['timestamp']) < self.cache_time: - log.info("[" + str(datetime.now()) + "] Returning cached stream info for lbry://" + name) - d = defer.succeed(self.name_cache[name]['claim_metadata']) - else: - log.info("[" + str(datetime.now()) + "] Refreshing stream info for lbry://" + name) - d = self.session.wallet.get_stream_info_for_name(name) - d.addCallbacks(_cache_stream_info, lambda _: defer.fail(UnknownNameError)) - else: - log.info("[" + str(datetime.now()) + "] Resolving stream info for lbry://" + name) - d = self.session.wallet.get_stream_info_for_name(name) - d.addCallbacks(_cache_stream_info, lambda _: defer.fail(UnknownNameError)) - else: - log.info("[" + str(datetime.now()) + "] Resolving stream info for lbry://" + name) - d = self.session.wallet.get_stream_info_for_name(name) - d.addCallbacks(_cache_stream_info, lambda _: defer.fail(UnknownNameError)) - - return d + Args: + name: the lbry:// to resolve + force_refresh: if True, always go out to the blockchain to resolve. + """ + if name.startswith('lbry://'): + raise ValueError('name %s should not start with lbry://') + helper = _ResolveNameHelper(self, name, force_refresh) + return helper.get_deferred() def _delete_lbry_file(self, lbry_file, delete_file=True): d = self.lbry_file_manager.delete_lbry_file(lbry_file) @@ -1670,65 +1593,54 @@ class LBRYDaemon(jsonrpc.JSONRPC): d.addCallbacks(lambda info: self._render_response(info, OK_CODE), lambda _: server.failure) return d + def _process_get_parameters(self, p): + """Extract info from input parameters and fill in default values for `get` call.""" + # TODO: this process can be abstracted s.t. each method + # can spec what parameters it expects and how to set default values + timeout = p.get('timeout', self.download_timeout) + download_directory = p.get('download_directory', self.download_directory) + file_name = p.get('file_name') + stream_info = p.get('stream_info') + sd_hash = get_sd_hash(stream_info) + wait_for_write = p.get('wait_for_write', True) + name = p.get('name') + return Parameters( + timout=timeout, + download_directory=download_directory, + file_name=file_name, + stream_info=stream_info, + sd_hash=sd_hash, + wait_for_write=wait_for_write, + name=name + ) + def jsonrpc_get(self, p): - """ - Download stream from a LBRY uri + """Download stream from a LBRY uri. Args: 'name': name to download, string 'download_directory': optional, path to directory where file will be saved, string 'file_name': optional, a user specified name for the downloaded file 'stream_info': optional, specified stream info overrides name + 'timout': optional + 'wait_for_write': optional, defaults to True Returns: 'stream_hash': hex string 'path': path of download """ - - if 'timeout' not in p.keys(): - timeout = self.download_timeout - else: - timeout = p['timeout'] - - if 'download_directory' not in p.keys(): - download_directory = self.download_directory - else: - download_directory = p['download_directory'] - - if 'file_name' in p.keys(): - file_name = p['file_name'] - else: - file_name = None - - if 'stream_info' in p.keys(): - stream_info = p['stream_info'] - if 'sources' in stream_info.keys(): - sd_hash = stream_info['sources']['lbry_sd_hash'] - else: - sd_hash = stream_info['stream_hash'] - else: - stream_info = None - - if 'wait_for_write' in p.keys(): - wait_for_write = p['wait_for_write'] - else: - wait_for_write = True - - if 'name' in p.keys(): - name = p['name'] - if p['name'] not in self.waiting_on.keys(): - d = self._download_name(name=name, timeout=timeout, download_directory=download_directory, - stream_info=stream_info, file_name=file_name, wait_for_write=wait_for_write) - d.addCallback(lambda l: {'stream_hash': sd_hash, - 'path': os.path.join(self.download_directory, l.file_name)} - if stream_info else - {'stream_hash': l.sd_hash, - 'path': os.path.join(self.download_directory, l.file_name)}) - d.addCallback(lambda message: self._render_response(message, OK_CODE)) - else: - d = server.failure - else: - d = server.failure - + params = self._process_get_parameters(p) + if not params.name: + return server.failure + if params.name in self.waiting_on: + return server.failure + d = self._download_name(name=params.name, + timeout=params.timeout, + download_directory=params.download_directory, + stream_info=params.stream_info, + file_name=params.file_name, + wait_for_write=params.wait_for_write) + d.addCallback(get_output_callback(params)) + d.addCallback(lambda message: self._render_response(message, OK_CODE)) return d def jsonrpc_stop_lbry_file(self, p): @@ -2275,8 +2187,161 @@ class LBRYDaemon(jsonrpc.JSONRPC): d = threads.deferToThread(subprocess.Popen, ['open', '-R', path]) else: # No easy way to reveal specific files on Linux, so just open the containing directory - d = threads.deferToThread(subprocess.Popen, ['xdg-open', os.dirname(path)]) + d = threads.deferToThread(subprocess.Popen, ['xdg-open', os.path.dirname(path)]) d.addCallback(lambda _: self._render_response(True, OK_CODE)) return d + + +def get_sd_hash(stream_info): + if not stream_info: + return None + try: + return stream_info['sources']['lbry_sd_hash'] + except KeyError: + return stream_info.get('stream_hash') + + +def get_output_callback(params): + def callback(l): + return { + 'stream_hash': params.sd_hash if params.stream_info else l.sd_hash, + 'path': os.path.join(params.download_directory, l.file_name) + } + return callback + + +class _DownloadNameHelper(object): + def __init__(self, daemon, name, timeout=DEFAULT_TIMEOUT, download_directory=None, + file_name=None, wait_for_write=True): + self.daemon = daemon + self.name = name + self.timeout = timeout + if not download_directory or not os.path.isdir(download_directory): + self.download_directory = daemon.download_directory + else: + self.download_directory = download_directory + self.file_name = file_name + self.wait_for_write = wait_for_write + + def _setup_stream(self, stream_info): + stream_hash = get_sd_hash(stream_info) + d = self.daemon._get_lbry_file_by_sd_hash(stream_hash) + d.addCallback(self._add_results_callback(stream_info)) + return d + + def _add_results_callback(self, stream_info): + def add_results(l): + if l: + if os.path.isfile(os.path.join(self.download_directory, l.file_name)): + return defer.succeed((stream_info, l)) + return defer.succeed((stream_info, None)) + return add_results + + def wait_or_get_stream(self, args): + stream_info, lbry_file = args + if lbry_file: + return self._get_stream(stream_info) + else: + return self._wait_on_lbry_file(lbry_file) + + def _get_stream(self, stream_info): + d = self.daemon.add_stream( + self.name, self.timeout, self.download_directory, self.file_name, stream_info) + if self.wait_for_write: + d.addCallback(lambda _: self._wait_for_write()) + d.addCallback(lambda _: self.daemon.streams[self.name].downloader) + return d + + def _wait_for_write(self): + file_name = self.daemon.streams[self.name].downloader.file_name + written_bytes = self.get_written_bytes(file_name) + d = defer.succeed(None) + if not written_bytes: + d.addCallback(lambda _: reactor.callLater(1, self._wait_for_write)) + return d + + def _wait_on_lbry_file(self, f): + written_bytes = self.get_written_bytes(f.file_name) + if not written_bytes: + d = defer.succeed(None) + d.addCallback(lambda _: reactor.callLater(1, self._wait_on_lbry_file, f)) + return d + else: + return defer.succeed(self._disp_file(f)) + + def get_written_bytes(self, file_name): + try: + file_path = os.path.join(self.download_directory, file_name) + if os.path.isfile(file_path): + written_file = file(file_path) + written_file.seek(0, os.SEEK_END) + written_bytes = written_file.tell() + written_file.close() + else: + written_bytes = False + except Exception: + writen_bytes = False + return written_bytes + + def _disp_file(self, f): + file_path = os.path.join(self.download_directory, f.file_name) + log.info("[%s] Already downloaded: %s --> %s", datetime.now(), f.sd_hash, file_path) + return f + + def _remove_from_wait(self, r): + del self.daemon.waiting_on[self.name] + return r + +class _ResolveNameHelper(object): + def __init__(self, daemon, name, force_refresh): + self.daemon = daemon + self.name = name + self.force_refresh = force_refresh + + def get_deferred(self): + if self.need_fresh_stream(): + log.info("Resolving stream info for lbry://%s", self.name) + d = self.wallet.get_stream_info_for_name(self.name) + d.addCallbacks(self._cache_stream_info, lambda _: defer.fail(UnknownNameError)) + else: + log.info("Returning cached stream info for lbry://%s", self.name) + d = defer.succeed(self.name_data['claim_metadata']) + return d + + @property + def name_data(self): + return self.daemon.name_cache[self.name] + + @property + def wallet(self): + return self.daemon.session.wallet + + def now(self): + return self.daemon._get_long_count_timestamp() + + def _add_txid(self, txid): + self.name_data['txid'] = txid + return defer.succeed(None) + + def _cache_stream_info(self, stream_info): + self.daemon.name_cache[self.name] = { + 'claim_metadata': stream_info, + 'timestamp': self.now() + } + d = self.wallet.get_txid_for_name(self.name) + d.addCallback(self._add_txid) + d.addCallback(lambda _: self.daemon._update_claim_cache()) + d.addCallback(lambda _: self.name_data['claim_metadata']) + return d + + def need_fresh_stream(self): + return self.force_refresh or not self.is_in_cache() or self.is_cached_name_expired() + + def is_in_cache(self): + return self.name in self.daemon.name_cache + + def is_cached_name_expired(self): + time_in_cache = self.now() - self.name_data['timestamp'] + return time_in_cache >= self.daemon.cache_time diff --git a/packaging/ubuntu/lbry.desktop b/packaging/ubuntu/lbry.desktop index 1b0f41790..1c28d3553 100644 --- a/packaging/ubuntu/lbry.desktop +++ b/packaging/ubuntu/lbry.desktop @@ -1,5 +1,5 @@ [Desktop Entry] -Version=0.3.7 +Version=0.3.8 Name=LBRY Comment=The world's first user-owned content marketplace Icon=lbry