Merge pull request #99 from lbryio/refactor-refresh

Add DownloadNameHelper and ResolveNameHelper
This commit is contained in:
Jack Robison 2016-08-06 20:06:43 -04:00 committed by GitHub
commit 01131da5b7
2 changed files with 245 additions and 178 deletions

View file

@ -132,6 +132,11 @@ OK_CODE = 200
REMOTE_SERVER = "www.google.com"
class Parameters(object):
def __init__(self, **kwargs):
self.__dict__.update(kwargs)
class LBRYDaemon(jsonrpc.JSONRPC):
"""
LBRYnet daemon, a jsonrpc interface to lbry functions
@ -1102,97 +1107,38 @@ class LBRYDaemon(jsonrpc.JSONRPC):
return r
def _download_name(self, name, timeout=DEFAULT_TIMEOUT, download_directory=None,
file_name=None, stream_info=None, wait_for_write=True):
file_name=None, stream_info=None, wait_for_write=True):
"""
Add a lbry file to the file manager, start the download, and return the new lbry file.
If it already exists in the file manager, return the existing lbry file
"""
if not download_directory:
download_directory = self.download_directory
elif not os.path.isdir(download_directory):
download_directory = self.download_directory
def _remove_from_wait(r):
del self.waiting_on[name]
return r
def _setup_stream(stream_info):
if 'sources' in stream_info.keys():
stream_hash = stream_info['sources']['lbry_sd_hash']
else:
stream_hash = stream_info['stream_hash']
d = self._get_lbry_file_by_sd_hash(stream_hash)
def _add_results(l):
if l:
if os.path.isfile(os.path.join(self.download_directory, l.file_name)):
return defer.succeed((stream_info, l))
return defer.succeed((stream_info, None))
d.addCallback(_add_results)
return d
def _wait_on_lbry_file(f):
if os.path.isfile(os.path.join(self.download_directory, f.file_name)):
written_file = file(os.path.join(self.download_directory, f.file_name))
written_file.seek(0, os.SEEK_END)
written_bytes = written_file.tell()
written_file.close()
else:
written_bytes = False
if not written_bytes:
d = defer.succeed(None)
d.addCallback(lambda _: reactor.callLater(1, _wait_on_lbry_file, f))
return d
else:
return defer.succeed(_disp_file(f))
def _disp_file(f):
file_path = os.path.join(self.download_directory, f.file_name)
log.info("Already downloaded: " + str(f.sd_hash) + " --> " + file_path)
return f
def _get_stream(stream_info):
def _wait_for_write():
try:
if os.path.isfile(os.path.join(self.download_directory, self.streams[name].downloader.file_name)):
written_file = file(os.path.join(self.download_directory, self.streams[name].downloader.file_name))
written_file.seek(0, os.SEEK_END)
written_bytes = written_file.tell()
written_file.close()
else:
written_bytes = False
except:
written_bytes = False
if not written_bytes:
d = defer.succeed(None)
d.addCallback(lambda _: reactor.callLater(1, _wait_for_write))
return d
else:
return defer.succeed(None)
self.streams[name] = GetStream(self.sd_identifier, self.session, self.session.wallet,
self.lbry_file_manager, self.exchange_rate_manager,
max_key_fee=self.max_key_fee, data_rate=self.data_rate, timeout=timeout,
download_directory=download_directory, file_name=file_name)
d = self.streams[name].start(stream_info, name)
if wait_for_write:
d.addCallback(lambda _: _wait_for_write())
d.addCallback(lambda _: self.streams[name].downloader)
return d
helper = _DownloadNameHelper(
self, name, timeout, download_directory, file_name, wait_for_write)
if not stream_info:
self.waiting_on[name] = True
d = self._resolve_name(name)
else:
d = defer.succeed(stream_info)
d.addCallback(_setup_stream)
d.addCallback(lambda (stream_info, lbry_file): _get_stream(stream_info) if not lbry_file else _wait_on_lbry_file(lbry_file))
d.addCallback(helper._setup_stream)
d.addCallback(helper.wait_or_get_stream)
if not stream_info:
d.addCallback(_remove_from_wait)
d.addCallback(helper._remove_from_wait)
return d
def add_stream(self, name, timeout, download_directory, file_name, stream_info):
"""Makes, adds and starts a stream"""
self.streams[name] = GetStream(self.sd_identifier,
self.session,
self.session.wallet,
self.lbry_file_manager,
self.exchange_rate_manager,
max_key_fee=self.max_key_fee,
data_rate=self.data_rate,
timeout=timeout,
download_directory=download_directory,
file_name=file_name)
d = self.streams[name].start(stream_info, name)
return d
def _get_long_count_timestamp(self):
@ -1205,44 +1151,16 @@ class LBRYDaemon(jsonrpc.JSONRPC):
return defer.succeed(True)
def _resolve_name(self, name, force_refresh=False):
try:
verify_name_characters(name)
except AssertionError:
log.error("Bad name")
return defer.fail(InvalidNameError("Bad name"))
"""Resolves a name. Checks the cache first before going out to the blockchain.
def _cache_stream_info(stream_info):
def _add_txid(txid):
self.name_cache[name]['txid'] = txid
return defer.succeed(None)
self.name_cache[name] = {'claim_metadata': stream_info, 'timestamp': self._get_long_count_timestamp()}
d = self.session.wallet.get_txid_for_name(name)
d.addCallback(_add_txid)
d.addCallback(lambda _: self._update_claim_cache())
d.addCallback(lambda _: self.name_cache[name]['claim_metadata'])
return d
if not force_refresh:
if name in self.name_cache.keys():
if (self._get_long_count_timestamp() - self.name_cache[name]['timestamp']) < self.cache_time:
log.info("Returning cached stream info for lbry://" + name)
d = defer.succeed(self.name_cache[name]['claim_metadata'])
else:
log.info("Refreshing stream info for lbry://" + name)
d = self.session.wallet.get_stream_info_for_name(name)
d.addCallbacks(_cache_stream_info, lambda _: defer.fail(UnknownNameError))
else:
log.info("Resolving stream info for lbry://" + name)
d = self.session.wallet.get_stream_info_for_name(name)
d.addCallbacks(_cache_stream_info, lambda _: defer.fail(UnknownNameError))
else:
log.info("Resolving stream info for lbry://" + name)
d = self.session.wallet.get_stream_info_for_name(name)
d.addCallbacks(_cache_stream_info, lambda _: defer.fail(UnknownNameError))
return d
Args:
name: the lbry://<name> to resolve
force_refresh: if True, always go out to the blockchain to resolve.
"""
if name.startswith('lbry://'):
raise ValueError('name %s should not start with lbry://')
helper = _ResolveNameHelper(self, name, force_refresh)
return helper.get_deferred()
def _delete_lbry_file(self, lbry_file, delete_file=True):
d = self.lbry_file_manager.delete_lbry_file(lbry_file)
@ -1790,65 +1708,54 @@ class LBRYDaemon(jsonrpc.JSONRPC):
d.addCallback(lambda r: self._render_response(r, OK_CODE))
return d
def _process_get_parameters(self, p):
"""Extract info from input parameters and fill in default values for `get` call."""
# TODO: this process can be abstracted s.t. each method
# can spec what parameters it expects and how to set default values
timeout = p.get('timeout', self.download_timeout)
download_directory = p.get('download_directory', self.download_directory)
file_name = p.get('file_name')
stream_info = p.get('stream_info')
sd_hash = get_sd_hash(stream_info)
wait_for_write = p.get('wait_for_write', True)
name = p.get('name')
return Parameters(
timeout=timeout,
download_directory=download_directory,
file_name=file_name,
stream_info=stream_info,
sd_hash=sd_hash,
wait_for_write=wait_for_write,
name=name
)
def jsonrpc_get(self, p):
"""
Download stream from a LBRY uri
"""Download stream from a LBRY uri.
Args:
'name': name to download, string
'download_directory': optional, path to directory where file will be saved, string
'file_name': optional, a user specified name for the downloaded file
'stream_info': optional, specified stream info overrides name
'timeout': optional
'wait_for_write': optional, defaults to True
Returns:
'stream_hash': hex string
'path': path of download
"""
if 'timeout' not in p.keys():
timeout = self.download_timeout
else:
timeout = p['timeout']
if 'download_directory' not in p.keys():
download_directory = self.download_directory
else:
download_directory = p['download_directory']
if 'file_name' in p.keys():
file_name = p['file_name']
else:
file_name = None
if 'stream_info' in p.keys():
stream_info = p['stream_info']
if 'sources' in stream_info.keys():
sd_hash = stream_info['sources']['lbry_sd_hash']
else:
sd_hash = stream_info['stream_hash']
else:
stream_info = None
if 'wait_for_write' in p.keys():
wait_for_write = p['wait_for_write']
else:
wait_for_write = True
if 'name' in p.keys():
name = p['name']
if p['name'] not in self.waiting_on.keys():
d = self._download_name(name=name, timeout=timeout, download_directory=download_directory,
stream_info=stream_info, file_name=file_name, wait_for_write=wait_for_write)
d.addCallback(lambda l: {'stream_hash': sd_hash,
'path': os.path.join(self.download_directory, l.file_name)}
if stream_info else
{'stream_hash': l.sd_hash,
'path': os.path.join(self.download_directory, l.file_name)})
d.addCallback(lambda message: self._render_response(message, OK_CODE))
else:
d = server.failure
else:
d = server.failure
params = self._process_get_parameters(p)
if not params.name:
return server.failure
if params.name in self.waiting_on:
return server.failure
d = self._download_name(name=params.name,
timeout=params.timeout,
download_directory=params.download_directory,
stream_info=params.stream_info,
file_name=params.file_name,
wait_for_write=params.wait_for_write)
d.addCallback(get_output_callback(params))
d.addCallback(lambda message: self._render_response(message, OK_CODE))
return d
def jsonrpc_stop_lbry_file(self, p):
@ -2407,7 +2314,7 @@ class LBRYDaemon(jsonrpc.JSONRPC):
d = threads.deferToThread(subprocess.Popen, ['open', '-R', path])
else:
# No easy way to reveal specific files on Linux, so just open the containing directory
d = threads.deferToThread(subprocess.Popen, ['xdg-open', os.dirname(path)])
d = threads.deferToThread(subprocess.Popen, ['xdg-open', os.path.dirname(path)])
d.addCallback(lambda _: self._render_response(True, OK_CODE))
return d
@ -2447,3 +2354,165 @@ def get_version_from_tag(tag):
return match.group(1)
else:
raise Exception('Failed to parse version from tag {}'.format(tag))
def get_sd_hash(stream_info):
if not stream_info:
return None
try:
return stream_info['sources']['lbry_sd_hash']
except KeyError:
return stream_info.get('stream_hash')
def get_output_callback(params):
def callback(l):
return {
'stream_hash': params.sd_hash if params.stream_info else l.sd_hash,
'path': os.path.join(params.download_directory, l.file_name)
}
return callback
class _DownloadNameHelper(object):
def __init__(self, daemon, name, timeout=DEFAULT_TIMEOUT, download_directory=None,
file_name=None, wait_for_write=True):
self.daemon = daemon
self.name = name
self.timeout = timeout
if not download_directory or not os.path.isdir(download_directory):
self.download_directory = daemon.download_directory
else:
self.download_directory = download_directory
self.file_name = file_name
self.wait_for_write = wait_for_write
def _setup_stream(self, stream_info):
stream_hash = get_sd_hash(stream_info)
d = self.daemon._get_lbry_file_by_sd_hash(stream_hash)
d.addCallback(self._add_results_callback(stream_info))
return d
def _add_results_callback(self, stream_info):
def add_results(l):
if l:
if os.path.isfile(os.path.join(self.download_directory, l.file_name)):
return defer.succeed((stream_info, l))
return defer.succeed((stream_info, None))
return add_results
def wait_or_get_stream(self, args):
stream_info, lbry_file = args
if lbry_file:
return self._wait_on_lbry_file(lbry_file)
else:
return self._get_stream(stream_info)
def _get_stream(self, stream_info):
d = self.daemon.add_stream(
self.name, self.timeout, self.download_directory, self.file_name, stream_info)
if self.wait_for_write:
d.addCallback(lambda _: self._wait_for_write())
d.addCallback(lambda _: self.daemon.streams[self.name].downloader)
return d
def _wait_for_write(self):
d = defer.succeed(None)
if not self.has_downloader_wrote():
d.addCallback(lambda _: reactor.callLater(1, self._wait_for_write))
return d
def has_downloader_wrote(self):
downloader = self.daemon.streams[self.name].downloader
if not downloader:
return False
return self.get_written_bytes(downloader.file_name)
def _wait_on_lbry_file(self, f):
written_bytes = self.get_written_bytes(f.file_name)
if written_bytes:
return defer.succeed(self._disp_file(f))
d = defer.succeed(None)
d.addCallback(lambda _: reactor.callLater(1, self._wait_on_lbry_file, f))
return d
def get_written_bytes(self, file_name):
"""Returns the number of bytes written to `file_name`.
Returns False if there were issues reading `file_name`.
"""
try:
file_path = os.path.join(self.download_directory, file_name)
if os.path.isfile(file_path):
written_file = file(file_path)
written_file.seek(0, os.SEEK_END)
written_bytes = written_file.tell()
written_file.close()
else:
written_bytes = False
except Exception:
writen_bytes = False
return written_bytes
def _disp_file(self, f):
file_path = os.path.join(self.download_directory, f.file_name)
log.info("Already downloaded: %s --> %s", f.sd_hash, file_path)
return f
def _remove_from_wait(self, r):
del self.daemon.waiting_on[self.name]
return r
class _ResolveNameHelper(object):
def __init__(self, daemon, name, force_refresh):
self.daemon = daemon
self.name = name
self.force_refresh = force_refresh
def get_deferred(self):
if self.need_fresh_stream():
log.info("Resolving stream info for lbry://%s", self.name)
d = self.wallet.get_stream_info_for_name(self.name)
d.addCallbacks(self._cache_stream_info, lambda _: defer.fail(UnknownNameError))
else:
log.info("Returning cached stream info for lbry://%s", self.name)
d = defer.succeed(self.name_data['claim_metadata'])
return d
@property
def name_data(self):
return self.daemon.name_cache[self.name]
@property
def wallet(self):
return self.daemon.session.wallet
def now(self):
return self.daemon._get_long_count_timestamp()
def _add_txid(self, txid):
self.name_data['txid'] = txid
return defer.succeed(None)
def _cache_stream_info(self, stream_info):
self.daemon.name_cache[self.name] = {
'claim_metadata': stream_info,
'timestamp': self.now()
}
d = self.wallet.get_txid_for_name(self.name)
d.addCallback(self._add_txid)
d.addCallback(lambda _: self.daemon._update_claim_cache())
d.addCallback(lambda _: self.name_data['claim_metadata'])
return d
def need_fresh_stream(self):
return self.force_refresh or not self.is_in_cache() or self.is_cached_name_expired()
def is_in_cache(self):
return self.name in self.daemon.name_cache
def is_cached_name_expired(self):
time_in_cache = self.now() - self.name_data['timestamp']
return time_in_cache >= self.daemon.cache_time

View file

@ -150,21 +150,19 @@ class GetStream(object):
return self.finished
def _start_download(self, downloader):
def _pay_key_fee():
if self.fee is not None:
fee_lbc = self.exchange_rate_manager.to_lbc(self.fee).amount
reserved_points = self.wallet.reserve_points(self.fee.address, fee_lbc)
if reserved_points is None:
return defer.fail(InsufficientFundsError())
return self.wallet.send_points_to_address(reserved_points, fee_lbc)
return defer.succeed(None)
d = _pay_key_fee()
log.info('Starting download for %s', self.name)
self.downloader = downloader
self.download_path = os.path.join(downloader.download_directory, downloader.file_name)
d = self._pay_key_fee()
d.addCallback(lambda _: log.info("Downloading %s --> %s", self.stream_hash, self.downloader.file_name))
d.addCallback(lambda _: self.downloader.start())
def _pay_key_fee(self):
if self.fee is not None:
fee_lbc = self.exchange_rate_manager.to_lbc(self.fee).amount
reserved_points = self.wallet.reserve_points(self.fee.address, fee_lbc)
if reserved_points is None:
return defer.fail(InsufficientFundsError())
return self.wallet.send_points_to_address(reserved_points, fee_lbc)
return defer.succeed(None)