Merge pull request #73 from lbryio/refactor-dl-name

Refactor jsonrpc_get, _download_name, _resolve_name
This commit is contained in:
Job Evers‐Meltzer 2016-07-20 07:07:02 -05:00 committed by GitHub
commit 34b8841f5c
2 changed files with 226 additions and 161 deletions

View file

@ -298,7 +298,7 @@ ignored-classes=twisted.internet,RequestMessage
# List of members which are set dynamically and missed by pylint inference # List of members which are set dynamically and missed by pylint inference
# system, and so shouldn't trigger E1101 when accessed. Python regular # system, and so shouldn't trigger E1101 when accessed. Python regular
# expressions are accepted. # expressions are accepted.
generated-members= generated-members=lbrynet.lbrynet_daemon.LBRYDaemon.Parameters
[IMPORTS] [IMPORTS]

View file

@ -138,6 +138,11 @@ OK_CODE = 200
REMOTE_SERVER = "www.google.com" REMOTE_SERVER = "www.google.com"
class Parameters(object):
def __init__(self, **kwargs):
self.__dict__.update(kwargs)
class LBRYDaemon(jsonrpc.JSONRPC): class LBRYDaemon(jsonrpc.JSONRPC):
""" """
LBRYnet daemon, a jsonrpc interface to lbry functions LBRYnet daemon, a jsonrpc interface to lbry functions
@ -1013,97 +1018,37 @@ class LBRYDaemon(jsonrpc.JSONRPC):
return defer.succeed(True) return defer.succeed(True)
def _download_name(self, name, timeout=DEFAULT_TIMEOUT, download_directory=None, def _download_name(self, name, timeout=DEFAULT_TIMEOUT, download_directory=None,
file_name=None, stream_info=None, wait_for_write=True): file_name=None, stream_info=None, wait_for_write=True):
""" """
Add a lbry file to the file manager, start the download, and return the new lbry file. Add a lbry file to the file manager, start the download, and return the new lbry file.
If it already exists in the file manager, return the existing lbry file If it already exists in the file manager, return the existing lbry file
""" """
helper = _DownloadNameHelper(
if not download_directory: self, name, timeout, download_directory, file_name, wait_for_write)
download_directory = self.download_directory
elif not os.path.isdir(download_directory):
download_directory = self.download_directory
def _remove_from_wait(r):
del self.waiting_on[name]
return r
def _setup_stream(stream_info):
if 'sources' in stream_info.keys():
stream_hash = stream_info['sources']['lbry_sd_hash']
else:
stream_hash = stream_info['stream_hash']
d = self._get_lbry_file_by_sd_hash(stream_hash)
def _add_results(l):
if l:
if os.path.isfile(os.path.join(self.download_directory, l.file_name)):
return defer.succeed((stream_info, l))
return defer.succeed((stream_info, None))
d.addCallback(_add_results)
return d
def _wait_on_lbry_file(f):
if os.path.isfile(os.path.join(self.download_directory, f.file_name)):
written_file = file(os.path.join(self.download_directory, f.file_name))
written_file.seek(0, os.SEEK_END)
written_bytes = written_file.tell()
written_file.close()
else:
written_bytes = False
if not written_bytes:
d = defer.succeed(None)
d.addCallback(lambda _: reactor.callLater(1, _wait_on_lbry_file, f))
return d
else:
return defer.succeed(_disp_file(f))
def _disp_file(f):
file_path = os.path.join(self.download_directory, f.file_name)
log.info("[" + str(datetime.now()) + "] Already downloaded: " + str(f.sd_hash) + " --> " + file_path)
return f
def _get_stream(stream_info):
def _wait_for_write():
try:
if os.path.isfile(os.path.join(self.download_directory, self.streams[name].downloader.file_name)):
written_file = file(os.path.join(self.download_directory, self.streams[name].downloader.file_name))
written_file.seek(0, os.SEEK_END)
written_bytes = written_file.tell()
written_file.close()
else:
written_bytes = False
except:
written_bytes = False
if not written_bytes:
d = defer.succeed(None)
d.addCallback(lambda _: reactor.callLater(1, _wait_for_write))
return d
else:
return defer.succeed(None)
self.streams[name] = GetStream(self.sd_identifier, self.session, self.session.wallet,
self.lbry_file_manager, max_key_fee=self.max_key_fee,
data_rate=self.data_rate, timeout=timeout,
download_directory=download_directory, file_name=file_name)
d = self.streams[name].start(stream_info, name)
if wait_for_write:
d.addCallback(lambda _: _wait_for_write())
d.addCallback(lambda _: self.streams[name].downloader)
return d
if not stream_info: if not stream_info:
self.waiting_on[name] = True self.waiting_on[name] = True
d = self._resolve_name(name) d = self._resolve_name(name)
else: else:
d = defer.succeed(stream_info) d = defer.succeed(stream_info)
d.addCallback(_setup_stream) d.addCallback(helper._setup_stream)
d.addCallback(lambda (stream_info, lbry_file): _get_stream(stream_info) if not lbry_file else _wait_on_lbry_file(lbry_file)) d.addCallback(helper.wait_or_get_stream)
if not stream_info: if not stream_info:
d.addCallback(_remove_from_wait) d.addCallback(helper._remove_from_wait)
return d
def add_stream(self, name, timeout, download_directory, file_name, stream_info):
"""Makes, adds and starts a stream"""
self.streams[name] = GetStream(self.sd_identifier,
self.session,
self.session.wallet,
self.lbry_file_manager,
max_key_fee=self.max_key_fee,
data_rate=self.data_rate,
timeout=timeout,
download_directory=download_directory,
file_name=file_name)
d = self.streams[name].start(stream_info, name)
return d return d
def _get_long_count_timestamp(self): def _get_long_count_timestamp(self):
@ -1116,38 +1061,16 @@ class LBRYDaemon(jsonrpc.JSONRPC):
return defer.succeed(True) return defer.succeed(True)
def _resolve_name(self, name, force_refresh=False): def _resolve_name(self, name, force_refresh=False):
def _cache_stream_info(stream_info): """Resolves a name. Checks the cache first before going out to the blockchain.
def _add_txid(txid):
self.name_cache[name]['txid'] = txid
return defer.succeed(None)
self.name_cache[name] = {'claim_metadata': stream_info, 'timestamp': self._get_long_count_timestamp()} Args:
d = self.session.wallet.get_txid_for_name(name) name: the lbry://<name> to resolve
d.addCallback(_add_txid) force_refresh: if True, always go out to the blockchain to resolve.
d.addCallback(lambda _: self._update_claim_cache()) """
d.addCallback(lambda _: self.name_cache[name]['claim_metadata']) if name.startswith('lbry://'):
raise ValueError('name %s should not start with lbry://')
return d helper = _ResolveNameHelper(self, name, force_refresh)
return helper.get_deferred()
if not force_refresh:
if name in self.name_cache.keys():
if (self._get_long_count_timestamp() - self.name_cache[name]['timestamp']) < self.cache_time:
log.info("[" + str(datetime.now()) + "] Returning cached stream info for lbry://" + name)
d = defer.succeed(self.name_cache[name]['claim_metadata'])
else:
log.info("[" + str(datetime.now()) + "] Refreshing stream info for lbry://" + name)
d = self.session.wallet.get_stream_info_for_name(name)
d.addCallbacks(_cache_stream_info, lambda _: defer.fail(UnknownNameError))
else:
log.info("[" + str(datetime.now()) + "] Resolving stream info for lbry://" + name)
d = self.session.wallet.get_stream_info_for_name(name)
d.addCallbacks(_cache_stream_info, lambda _: defer.fail(UnknownNameError))
else:
log.info("[" + str(datetime.now()) + "] Resolving stream info for lbry://" + name)
d = self.session.wallet.get_stream_info_for_name(name)
d.addCallbacks(_cache_stream_info, lambda _: defer.fail(UnknownNameError))
return d
def _delete_lbry_file(self, lbry_file, delete_file=True): def _delete_lbry_file(self, lbry_file, delete_file=True):
d = self.lbry_file_manager.delete_lbry_file(lbry_file) d = self.lbry_file_manager.delete_lbry_file(lbry_file)
@ -1670,65 +1593,54 @@ class LBRYDaemon(jsonrpc.JSONRPC):
d.addCallbacks(lambda info: self._render_response(info, OK_CODE), lambda _: server.failure) d.addCallbacks(lambda info: self._render_response(info, OK_CODE), lambda _: server.failure)
return d return d
def _process_get_parameters(self, p):
"""Extract info from input parameters and fill in default values for `get` call."""
# TODO: this process can be abstracted s.t. each method
# can spec what parameters it expects and how to set default values
timeout = p.get('timeout', self.download_timeout)
download_directory = p.get('download_directory', self.download_directory)
file_name = p.get('file_name')
stream_info = p.get('stream_info')
sd_hash = get_sd_hash(stream_info)
wait_for_write = p.get('wait_for_write', True)
name = p.get('name')
return Parameters(
timout=timeout,
download_directory=download_directory,
file_name=file_name,
stream_info=stream_info,
sd_hash=sd_hash,
wait_for_write=wait_for_write,
name=name
)
def jsonrpc_get(self, p): def jsonrpc_get(self, p):
""" """Download stream from a LBRY uri.
Download stream from a LBRY uri
Args: Args:
'name': name to download, string 'name': name to download, string
'download_directory': optional, path to directory where file will be saved, string 'download_directory': optional, path to directory where file will be saved, string
'file_name': optional, a user specified name for the downloaded file 'file_name': optional, a user specified name for the downloaded file
'stream_info': optional, specified stream info overrides name 'stream_info': optional, specified stream info overrides name
'timout': optional
'wait_for_write': optional, defaults to True
Returns: Returns:
'stream_hash': hex string 'stream_hash': hex string
'path': path of download 'path': path of download
""" """
params = self._process_get_parameters(p)
if 'timeout' not in p.keys(): if not params.name:
timeout = self.download_timeout return server.failure
else: if params.name in self.waiting_on:
timeout = p['timeout'] return server.failure
d = self._download_name(name=params.name,
if 'download_directory' not in p.keys(): timeout=params.timeout,
download_directory = self.download_directory download_directory=params.download_directory,
else: stream_info=params.stream_info,
download_directory = p['download_directory'] file_name=params.file_name,
wait_for_write=params.wait_for_write)
if 'file_name' in p.keys(): d.addCallback(get_output_callback(params))
file_name = p['file_name'] d.addCallback(lambda message: self._render_response(message, OK_CODE))
else:
file_name = None
if 'stream_info' in p.keys():
stream_info = p['stream_info']
if 'sources' in stream_info.keys():
sd_hash = stream_info['sources']['lbry_sd_hash']
else:
sd_hash = stream_info['stream_hash']
else:
stream_info = None
if 'wait_for_write' in p.keys():
wait_for_write = p['wait_for_write']
else:
wait_for_write = True
if 'name' in p.keys():
name = p['name']
if p['name'] not in self.waiting_on.keys():
d = self._download_name(name=name, timeout=timeout, download_directory=download_directory,
stream_info=stream_info, file_name=file_name, wait_for_write=wait_for_write)
d.addCallback(lambda l: {'stream_hash': sd_hash,
'path': os.path.join(self.download_directory, l.file_name)}
if stream_info else
{'stream_hash': l.sd_hash,
'path': os.path.join(self.download_directory, l.file_name)})
d.addCallback(lambda message: self._render_response(message, OK_CODE))
else:
d = server.failure
else:
d = server.failure
return d return d
def jsonrpc_stop_lbry_file(self, p): def jsonrpc_stop_lbry_file(self, p):
@ -2275,8 +2187,161 @@ class LBRYDaemon(jsonrpc.JSONRPC):
d = threads.deferToThread(subprocess.Popen, ['open', '-R', path]) d = threads.deferToThread(subprocess.Popen, ['open', '-R', path])
else: else:
# No easy way to reveal specific files on Linux, so just open the containing directory # No easy way to reveal specific files on Linux, so just open the containing directory
d = threads.deferToThread(subprocess.Popen, ['xdg-open', os.dirname(path)]) d = threads.deferToThread(subprocess.Popen, ['xdg-open', os.path.dirname(path)])
d.addCallback(lambda _: self._render_response(True, OK_CODE)) d.addCallback(lambda _: self._render_response(True, OK_CODE))
return d return d
def get_sd_hash(stream_info):
if not stream_info:
return None
try:
return stream_info['sources']['lbry_sd_hash']
except KeyError:
return stream_info.get('stream_hash')
def get_output_callback(params):
def callback(l):
return {
'stream_hash': params.sd_hash if params.stream_info else l.sd_hash,
'path': os.path.join(params.download_directory, l.file_name)
}
return callback
class _DownloadNameHelper(object):
def __init__(self, daemon, name, timeout=DEFAULT_TIMEOUT, download_directory=None,
file_name=None, wait_for_write=True):
self.daemon = daemon
self.name = name
self.timeout = timeout
if not download_directory or not os.path.isdir(download_directory):
self.download_directory = daemon.download_directory
else:
self.download_directory = download_directory
self.file_name = file_name
self.wait_for_write = wait_for_write
def _setup_stream(self, stream_info):
stream_hash = get_sd_hash(stream_info)
d = self.daemon._get_lbry_file_by_sd_hash(stream_hash)
d.addCallback(self._add_results_callback(stream_info))
return d
def _add_results_callback(self, stream_info):
def add_results(l):
if l:
if os.path.isfile(os.path.join(self.download_directory, l.file_name)):
return defer.succeed((stream_info, l))
return defer.succeed((stream_info, None))
return add_results
def wait_or_get_stream(self, args):
stream_info, lbry_file = args
if lbry_file:
return self._get_stream(stream_info)
else:
return self._wait_on_lbry_file(lbry_file)
def _get_stream(self, stream_info):
d = self.daemon.add_stream(
self.name, self.timeout, self.download_directory, self.file_name, stream_info)
if self.wait_for_write:
d.addCallback(lambda _: self._wait_for_write())
d.addCallback(lambda _: self.daemon.streams[self.name].downloader)
return d
def _wait_for_write(self):
file_name = self.daemon.streams[self.name].downloader.file_name
written_bytes = self.get_written_bytes(file_name)
d = defer.succeed(None)
if not written_bytes:
d.addCallback(lambda _: reactor.callLater(1, self._wait_for_write))
return d
def _wait_on_lbry_file(self, f):
written_bytes = self.get_written_bytes(f.file_name)
if not written_bytes:
d = defer.succeed(None)
d.addCallback(lambda _: reactor.callLater(1, self._wait_on_lbry_file, f))
return d
else:
return defer.succeed(self._disp_file(f))
def get_written_bytes(self, file_name):
try:
file_path = os.path.join(self.download_directory, file_name)
if os.path.isfile(file_path):
written_file = file(file_path)
written_file.seek(0, os.SEEK_END)
written_bytes = written_file.tell()
written_file.close()
else:
written_bytes = False
except Exception:
writen_bytes = False
return written_bytes
def _disp_file(self, f):
file_path = os.path.join(self.download_directory, f.file_name)
log.info("[%s] Already downloaded: %s --> %s", datetime.now(), f.sd_hash, file_path)
return f
def _remove_from_wait(self, r):
del self.daemon.waiting_on[self.name]
return r
class _ResolveNameHelper(object):
def __init__(self, daemon, name, force_refresh):
self.daemon = daemon
self.name = name
self.force_refresh = force_refresh
def get_deferred(self):
if self.need_fresh_stream():
log.info("Resolving stream info for lbry://%s", self.name)
d = self.wallet.get_stream_info_for_name(self.name)
d.addCallbacks(self._cache_stream_info, lambda _: defer.fail(UnknownNameError))
else:
log.info("Returning cached stream info for lbry://%s", self.name)
d = defer.succeed(self.name_data['claim_metadata'])
return d
@property
def name_data(self):
return self.daemon.name_cache[self.name]
@property
def wallet(self):
return self.daemon.session.wallet
def now(self):
return self.daemon._get_long_count_timestamp()
def _add_txid(self, txid):
self.name_data['txid'] = txid
return defer.succeed(None)
def _cache_stream_info(self, stream_info):
self.daemon.name_cache[self.name] = {
'claim_metadata': stream_info,
'timestamp': self.now()
}
d = self.wallet.get_txid_for_name(self.name)
d.addCallback(self._add_txid)
d.addCallback(lambda _: self.daemon._update_claim_cache())
d.addCallback(lambda _: self.name_data['claim_metadata'])
return d
def need_fresh_stream(self):
return self.force_refresh or not self.is_in_cache() or self.is_cached_name_expired()
def is_in_cache(self):
return self.name in self.daemon.name_cache
def is_cached_name_expired(self):
time_in_cache = self.now() - self.name_data['timestamp']
return time_in_cache >= self.daemon.cache_time