revert download refactor

to be re-merged after fixes
This commit is contained in:
Jack 2016-07-20 20:34:02 -04:00
parent c2d5f092b3
commit f999073fb4

View file

@ -143,11 +143,6 @@ OK_CODE = 200
REMOTE_SERVER = "www.google.com" REMOTE_SERVER = "www.google.com"
class Parameters(object):
def __init__(self, **kwargs):
self.__dict__.update(kwargs)
class LBRYDaemon(jsonrpc.JSONRPC): class LBRYDaemon(jsonrpc.JSONRPC):
""" """
LBRYnet daemon, a jsonrpc interface to lbry functions LBRYnet daemon, a jsonrpc interface to lbry functions
@ -1028,32 +1023,92 @@ class LBRYDaemon(jsonrpc.JSONRPC):
Add a lbry file to the file manager, start the download, and return the new lbry file. Add a lbry file to the file manager, start the download, and return the new lbry file.
If it already exists in the file manager, return the existing lbry file If it already exists in the file manager, return the existing lbry file
""" """
helper = _DownloadNameHelper(
self, name, timeout, download_directory, file_name, wait_for_write) if not download_directory:
download_directory = self.download_directory
elif not os.path.isdir(download_directory):
download_directory = self.download_directory
def _remove_from_wait(r):
del self.waiting_on[name]
return r
def _setup_stream(stream_info):
if 'sources' in stream_info.keys():
stream_hash = stream_info['sources']['lbry_sd_hash']
else:
stream_hash = stream_info['stream_hash']
d = self._get_lbry_file_by_sd_hash(stream_hash)
def _add_results(l):
if l:
if os.path.isfile(os.path.join(self.download_directory, l.file_name)):
return defer.succeed((stream_info, l))
return defer.succeed((stream_info, None))
d.addCallback(_add_results)
return d
def _wait_on_lbry_file(f):
if os.path.isfile(os.path.join(self.download_directory, f.file_name)):
written_file = file(os.path.join(self.download_directory, f.file_name))
written_file.seek(0, os.SEEK_END)
written_bytes = written_file.tell()
written_file.close()
else:
written_bytes = False
if not written_bytes:
d = defer.succeed(None)
d.addCallback(lambda _: reactor.callLater(1, _wait_on_lbry_file, f))
return d
else:
return defer.succeed(_disp_file(f))
def _disp_file(f):
file_path = os.path.join(self.download_directory, f.file_name)
log.info("[" + str(datetime.now()) + "] Already downloaded: " + str(f.sd_hash) + " --> " + file_path)
return f
def _get_stream(stream_info):
def _wait_for_write():
try:
if os.path.isfile(os.path.join(self.download_directory, self.streams[name].downloader.file_name)):
written_file = file(os.path.join(self.download_directory, self.streams[name].downloader.file_name))
written_file.seek(0, os.SEEK_END)
written_bytes = written_file.tell()
written_file.close()
else:
written_bytes = False
except:
written_bytes = False
if not written_bytes:
d = defer.succeed(None)
d.addCallback(lambda _: reactor.callLater(1, _wait_for_write))
return d
else:
return defer.succeed(None)
self.streams[name] = GetStream(self.sd_identifier, self.session, self.session.wallet,
self.lbry_file_manager, max_key_fee=self.max_key_fee,
data_rate=self.data_rate, timeout=timeout,
download_directory=download_directory, file_name=file_name)
d = self.streams[name].start(stream_info, name)
if wait_for_write:
d.addCallback(lambda _: _wait_for_write())
d.addCallback(lambda _: self.streams[name].downloader)
return d
if not stream_info: if not stream_info:
self.waiting_on[name] = True self.waiting_on[name] = True
d = self._resolve_name(name) d = self._resolve_name(name)
else: else:
d = defer.succeed(stream_info) d = defer.succeed(stream_info)
d.addCallback(helper._setup_stream) d.addCallback(_setup_stream)
d.addCallback(helper.wait_or_get_stream) d.addCallback(lambda (stream_info, lbry_file): _get_stream(stream_info) if not lbry_file else _wait_on_lbry_file(lbry_file))
if not stream_info: if not stream_info:
d.addCallback(helper._remove_from_wait) d.addCallback(_remove_from_wait)
return d
def add_stream(self, name, timeout, download_directory, file_name, stream_info):
"""Makes, adds and starts a stream"""
self.streams[name] = GetStream(self.sd_identifier,
self.session,
self.session.wallet,
self.lbry_file_manager,
max_key_fee=self.max_key_fee,
data_rate=self.data_rate,
timeout=timeout,
download_directory=download_directory,
file_name=file_name)
d = self.streams[name].start(stream_info, name)
return d return d
def _get_long_count_timestamp(self): def _get_long_count_timestamp(self):
@ -1066,16 +1121,38 @@ class LBRYDaemon(jsonrpc.JSONRPC):
return defer.succeed(True) return defer.succeed(True)
def _resolve_name(self, name, force_refresh=False): def _resolve_name(self, name, force_refresh=False):
"""Resolves a name. Checks the cache first before going out to the blockchain. def _cache_stream_info(stream_info):
def _add_txid(txid):
self.name_cache[name]['txid'] = txid
return defer.succeed(None)
Args: self.name_cache[name] = {'claim_metadata': stream_info, 'timestamp': self._get_long_count_timestamp()}
name: the lbry://<name> to resolve d = self.session.wallet.get_txid_for_name(name)
force_refresh: if True, always go out to the blockchain to resolve. d.addCallback(_add_txid)
""" d.addCallback(lambda _: self._update_claim_cache())
if name.startswith('lbry://'): d.addCallback(lambda _: self.name_cache[name]['claim_metadata'])
raise ValueError('name %s should not start with lbry://')
helper = _ResolveNameHelper(self, name, force_refresh) return d
return helper.get_deferred()
if not force_refresh:
if name in self.name_cache.keys():
if (self._get_long_count_timestamp() - self.name_cache[name]['timestamp']) < self.cache_time:
log.info("[" + str(datetime.now()) + "] Returning cached stream info for lbry://" + name)
d = defer.succeed(self.name_cache[name]['claim_metadata'])
else:
log.info("[" + str(datetime.now()) + "] Refreshing stream info for lbry://" + name)
d = self.session.wallet.get_stream_info_for_name(name)
d.addCallbacks(_cache_stream_info, lambda _: defer.fail(UnknownNameError))
else:
log.info("[" + str(datetime.now()) + "] Resolving stream info for lbry://" + name)
d = self.session.wallet.get_stream_info_for_name(name)
d.addCallbacks(_cache_stream_info, lambda _: defer.fail(UnknownNameError))
else:
log.info("[" + str(datetime.now()) + "] Resolving stream info for lbry://" + name)
d = self.session.wallet.get_stream_info_for_name(name)
d.addCallbacks(_cache_stream_info, lambda _: defer.fail(UnknownNameError))
return d
def _delete_lbry_file(self, lbry_file, delete_file=True): def _delete_lbry_file(self, lbry_file, delete_file=True):
d = self.lbry_file_manager.delete_lbry_file(lbry_file) d = self.lbry_file_manager.delete_lbry_file(lbry_file)
@ -1598,54 +1675,65 @@ class LBRYDaemon(jsonrpc.JSONRPC):
d.addCallbacks(lambda info: self._render_response(info, OK_CODE), lambda _: server.failure) d.addCallbacks(lambda info: self._render_response(info, OK_CODE), lambda _: server.failure)
return d return d
def _process_get_parameters(self, p):
"""Extract info from input parameters and fill in default values for `get` call."""
# TODO: this process can be abstracted s.t. each method
# can spec what parameters it expects and how to set default values
timeout = p.get('timeout', self.download_timeout)
download_directory = p.get('download_directory', self.download_directory)
file_name = p.get('file_name')
stream_info = p.get('stream_info')
sd_hash = get_sd_hash(stream_info)
wait_for_write = p.get('wait_for_write', True)
name = p.get('name')
return Parameters(
timout=timeout,
download_directory=download_directory,
file_name=file_name,
stream_info=stream_info,
sd_hash=sd_hash,
wait_for_write=wait_for_write,
name=name
)
def jsonrpc_get(self, p): def jsonrpc_get(self, p):
"""Download stream from a LBRY uri. """
Download stream from a LBRY uri
Args: Args:
'name': name to download, string 'name': name to download, string
'download_directory': optional, path to directory where file will be saved, string 'download_directory': optional, path to directory where file will be saved, string
'file_name': optional, a user specified name for the downloaded file 'file_name': optional, a user specified name for the downloaded file
'stream_info': optional, specified stream info overrides name 'stream_info': optional, specified stream info overrides name
'timout': optional
'wait_for_write': optional, defaults to True
Returns: Returns:
'stream_hash': hex string 'stream_hash': hex string
'path': path of download 'path': path of download
""" """
params = self._process_get_parameters(p)
if not params.name: if 'timeout' not in p.keys():
return server.failure timeout = self.download_timeout
if params.name in self.waiting_on: else:
return server.failure timeout = p['timeout']
d = self._download_name(name=params.name,
timeout=params.timeout, if 'download_directory' not in p.keys():
download_directory=params.download_directory, download_directory = self.download_directory
stream_info=params.stream_info, else:
file_name=params.file_name, download_directory = p['download_directory']
wait_for_write=params.wait_for_write)
d.addCallback(get_output_callback(params)) if 'file_name' in p.keys():
file_name = p['file_name']
else:
file_name = None
if 'stream_info' in p.keys():
stream_info = p['stream_info']
if 'sources' in stream_info.keys():
sd_hash = stream_info['sources']['lbry_sd_hash']
else:
sd_hash = stream_info['stream_hash']
else:
stream_info = None
if 'wait_for_write' in p.keys():
wait_for_write = p['wait_for_write']
else:
wait_for_write = True
if 'name' in p.keys():
name = p['name']
if p['name'] not in self.waiting_on.keys():
d = self._download_name(name=name, timeout=timeout, download_directory=download_directory,
stream_info=stream_info, file_name=file_name, wait_for_write=wait_for_write)
d.addCallback(lambda l: {'stream_hash': sd_hash,
'path': os.path.join(self.download_directory, l.file_name)}
if stream_info else
{'stream_hash': l.sd_hash,
'path': os.path.join(self.download_directory, l.file_name)})
d.addCallback(lambda message: self._render_response(message, OK_CODE)) d.addCallback(lambda message: self._render_response(message, OK_CODE))
else:
d = server.failure
else:
d = server.failure
return d return d
def jsonrpc_stop_lbry_file(self, p): def jsonrpc_stop_lbry_file(self, p):
@ -2192,161 +2280,8 @@ class LBRYDaemon(jsonrpc.JSONRPC):
d = threads.deferToThread(subprocess.Popen, ['open', '-R', path]) d = threads.deferToThread(subprocess.Popen, ['open', '-R', path])
else: else:
# No easy way to reveal specific files on Linux, so just open the containing directory # No easy way to reveal specific files on Linux, so just open the containing directory
d = threads.deferToThread(subprocess.Popen, ['xdg-open', os.path.dirname(path)]) d = threads.deferToThread(subprocess.Popen, ['xdg-open', os.dirname(path)])
d.addCallback(lambda _: self._render_response(True, OK_CODE)) d.addCallback(lambda _: self._render_response(True, OK_CODE))
return d return d
def get_sd_hash(stream_info):
if not stream_info:
return None
try:
return stream_info['sources']['lbry_sd_hash']
except KeyError:
return stream_info.get('stream_hash')
def get_output_callback(params):
def callback(l):
return {
'stream_hash': params.sd_hash if params.stream_info else l.sd_hash,
'path': os.path.join(params.download_directory, l.file_name)
}
return callback
class _DownloadNameHelper(object):
def __init__(self, daemon, name, timeout=DEFAULT_TIMEOUT, download_directory=None,
file_name=None, wait_for_write=True):
self.daemon = daemon
self.name = name
self.timeout = timeout
if not download_directory or not os.path.isdir(download_directory):
self.download_directory = daemon.download_directory
else:
self.download_directory = download_directory
self.file_name = file_name
self.wait_for_write = wait_for_write
def _setup_stream(self, stream_info):
stream_hash = get_sd_hash(stream_info)
d = self.daemon._get_lbry_file_by_sd_hash(stream_hash)
d.addCallback(self._add_results_callback(stream_info))
return d
def _add_results_callback(self, stream_info):
def add_results(l):
if l:
if os.path.isfile(os.path.join(self.download_directory, l.file_name)):
return defer.succeed((stream_info, l))
return defer.succeed((stream_info, None))
return add_results
def wait_or_get_stream(self, args):
stream_info, lbry_file = args
if lbry_file:
return self._get_stream(stream_info)
else:
return self._wait_on_lbry_file(lbry_file)
def _get_stream(self, stream_info):
d = self.daemon.add_stream(
self.name, self.timeout, self.download_directory, self.file_name, stream_info)
if self.wait_for_write:
d.addCallback(lambda _: self._wait_for_write())
d.addCallback(lambda _: self.daemon.streams[self.name].downloader)
return d
def _wait_for_write(self):
file_name = self.daemon.streams[self.name].downloader.file_name
written_bytes = self.get_written_bytes(file_name)
d = defer.succeed(None)
if not written_bytes:
d.addCallback(lambda _: reactor.callLater(1, self._wait_for_write))
return d
def _wait_on_lbry_file(self, f):
written_bytes = self.get_written_bytes(f.file_name)
if not written_bytes:
d = defer.succeed(None)
d.addCallback(lambda _: reactor.callLater(1, self._wait_on_lbry_file, f))
return d
else:
return defer.succeed(self._disp_file(f))
def get_written_bytes(self, file_name):
try:
file_path = os.path.join(self.download_directory, file_name)
if os.path.isfile(file_path):
written_file = file(file_path)
written_file.seek(0, os.SEEK_END)
written_bytes = written_file.tell()
written_file.close()
else:
written_bytes = False
except Exception:
writen_bytes = False
return written_bytes
def _disp_file(self, f):
file_path = os.path.join(self.download_directory, f.file_name)
log.info("Already downloaded: %s --> %s", f.sd_hash, file_path)
return f
def _remove_from_wait(self, r):
del self.daemon.waiting_on[self.name]
return r
class _ResolveNameHelper(object):
def __init__(self, daemon, name, force_refresh):
self.daemon = daemon
self.name = name
self.force_refresh = force_refresh
def get_deferred(self):
if self.need_fresh_stream():
log.info("Resolving stream info for lbry://%s", self.name)
d = self.wallet.get_stream_info_for_name(self.name)
d.addCallbacks(self._cache_stream_info, lambda _: defer.fail(UnknownNameError))
else:
log.info("Returning cached stream info for lbry://%s", self.name)
d = defer.succeed(self.name_data['claim_metadata'])
return d
@property
def name_data(self):
return self.daemon.name_cache[self.name]
@property
def wallet(self):
return self.daemon.session.wallet
def now(self):
return self.daemon._get_long_count_timestamp()
def _add_txid(self, txid):
self.name_data['txid'] = txid
return defer.succeed(None)
def _cache_stream_info(self, stream_info):
self.daemon.name_cache[self.name] = {
'claim_metadata': stream_info,
'timestamp': self.now()
}
d = self.wallet.get_txid_for_name(self.name)
d.addCallback(self._add_txid)
d.addCallback(lambda _: self.daemon._update_claim_cache())
d.addCallback(lambda _: self.name_data['claim_metadata'])
return d
def need_fresh_stream(self):
return self.force_refresh or not self.is_in_cache() or self.is_cached_name_expired()
def is_in_cache(self):
return self.name in self.daemon.name_cache
def is_cached_name_expired(self):
time_in_cache = self.now() - self.name_data['timestamp']
return time_in_cache >= self.daemon.cache_time