forked from LBRYCommunity/lbry-sdk
Refactor _download_name
Nested functions are the devil, especially ones that use variables from the outer scope. Refactoring _download_name to use a helper class helps make the scoping more explicit and will undoubtably prevent bugs in the future. I think this makes _download_name drastically more readable. Also cleaned up some duplicated code and made download_directory respect the passed in parameter instead of being the default.
This commit is contained in:
parent
a90029ec50
commit
a15d7ca543
1 changed files with 109 additions and 80 deletions
|
@ -999,97 +999,37 @@ class LBRYDaemon(jsonrpc.JSONRPC):
|
|||
return defer.succeed(True)
|
||||
|
||||
def _download_name(self, name, timeout=DEFAULT_TIMEOUT, download_directory=None,
|
||||
file_name=None, stream_info=None, wait_for_write=True):
|
||||
file_name=None, stream_info=None, wait_for_write=True):
|
||||
"""
|
||||
Add a lbry file to the file manager, start the download, and return the new lbry file.
|
||||
If it already exists in the file manager, return the existing lbry file
|
||||
"""
|
||||
|
||||
if not download_directory:
|
||||
download_directory = self.download_directory
|
||||
elif not os.path.isdir(download_directory):
|
||||
download_directory = self.download_directory
|
||||
|
||||
def _remove_from_wait(r):
|
||||
del self.waiting_on[name]
|
||||
return r
|
||||
|
||||
def _setup_stream(stream_info):
|
||||
if 'sources' in stream_info.keys():
|
||||
stream_hash = stream_info['sources']['lbry_sd_hash']
|
||||
else:
|
||||
stream_hash = stream_info['stream_hash']
|
||||
|
||||
d = self._get_lbry_file_by_sd_hash(stream_hash)
|
||||
def _add_results(l):
|
||||
if l:
|
||||
if os.path.isfile(os.path.join(self.download_directory, l.file_name)):
|
||||
return defer.succeed((stream_info, l))
|
||||
return defer.succeed((stream_info, None))
|
||||
d.addCallback(_add_results)
|
||||
return d
|
||||
|
||||
def _wait_on_lbry_file(f):
|
||||
if os.path.isfile(os.path.join(self.download_directory, f.file_name)):
|
||||
written_file = file(os.path.join(self.download_directory, f.file_name))
|
||||
written_file.seek(0, os.SEEK_END)
|
||||
written_bytes = written_file.tell()
|
||||
written_file.close()
|
||||
else:
|
||||
written_bytes = False
|
||||
|
||||
if not written_bytes:
|
||||
d = defer.succeed(None)
|
||||
d.addCallback(lambda _: reactor.callLater(1, _wait_on_lbry_file, f))
|
||||
return d
|
||||
else:
|
||||
return defer.succeed(_disp_file(f))
|
||||
|
||||
def _disp_file(f):
|
||||
file_path = os.path.join(self.download_directory, f.file_name)
|
||||
log.info("[" + str(datetime.now()) + "] Already downloaded: " + str(f.sd_hash) + " --> " + file_path)
|
||||
return f
|
||||
|
||||
def _get_stream(stream_info):
|
||||
def _wait_for_write():
|
||||
try:
|
||||
if os.path.isfile(os.path.join(self.download_directory, self.streams[name].downloader.file_name)):
|
||||
written_file = file(os.path.join(self.download_directory, self.streams[name].downloader.file_name))
|
||||
written_file.seek(0, os.SEEK_END)
|
||||
written_bytes = written_file.tell()
|
||||
written_file.close()
|
||||
else:
|
||||
written_bytes = False
|
||||
except:
|
||||
written_bytes = False
|
||||
|
||||
if not written_bytes:
|
||||
d = defer.succeed(None)
|
||||
d.addCallback(lambda _: reactor.callLater(1, _wait_for_write))
|
||||
return d
|
||||
else:
|
||||
return defer.succeed(None)
|
||||
|
||||
self.streams[name] = GetStream(self.sd_identifier, self.session, self.session.wallet,
|
||||
self.lbry_file_manager, max_key_fee=self.max_key_fee,
|
||||
data_rate=self.data_rate, timeout=timeout,
|
||||
download_directory=download_directory, file_name=file_name)
|
||||
d = self.streams[name].start(stream_info, name)
|
||||
if wait_for_write:
|
||||
d.addCallback(lambda _: _wait_for_write())
|
||||
d.addCallback(lambda _: self.streams[name].downloader)
|
||||
|
||||
return d
|
||||
helper = _DownloadNameHelper(
|
||||
self, name, timeout, download_directory, file_name, wait_for_write)
|
||||
|
||||
if not stream_info:
|
||||
self.waiting_on[name] = True
|
||||
d = self._resolve_name(name)
|
||||
else:
|
||||
d = defer.succeed(stream_info)
|
||||
d.addCallback(_setup_stream)
|
||||
d.addCallback(lambda (stream_info, lbry_file): _get_stream(stream_info) if not lbry_file else _wait_on_lbry_file(lbry_file))
|
||||
d.addCallback(helper._setup_stream)
|
||||
d.addCallback(helper.wait_or_get_stream)
|
||||
if not stream_info:
|
||||
d.addCallback(_remove_from_wait)
|
||||
d.addCallback(helper._remove_from_wait)
|
||||
return d
|
||||
|
||||
def add_stream(self, name, timeout, download_directory, file_name, stream_info):
|
||||
"""Makes, adds and starts a stream"""
|
||||
self.streams[name] = GetStream(self.sd_identifier,
|
||||
self.session,
|
||||
self.session.wallet,
|
||||
self.lbry_file_manager,
|
||||
max_key_fee=self.max_key_fee,
|
||||
data_rate=self.data_rate,
|
||||
timeout=timeout,
|
||||
download_directory=download_directory,
|
||||
file_name=file_name)
|
||||
d = self.streams[name].start(stream_info, name)
|
||||
return d
|
||||
|
||||
def _get_long_count_timestamp(self):
|
||||
|
@ -2273,3 +2213,92 @@ def get_output_callback(params):
|
|||
'path': os.path.join(params.download_directory, l.file_name)
|
||||
}
|
||||
return callback
|
||||
|
||||
|
||||
class _DownloadNameHelper(object):
|
||||
def __init__(self, daemon, name, timeout=DEFAULT_TIMEOUT, download_directory=None,
|
||||
file_name=None, wait_for_write=True):
|
||||
self.daemon = daemon
|
||||
self.name = name
|
||||
self.timeout = timeout
|
||||
if not download_directory or not os.path.isdir(download_directory):
|
||||
self.download_directory = daemon.download_directory
|
||||
else:
|
||||
self.download_directory = download_directory
|
||||
self.file_name = file_name
|
||||
self.wait_for_write = wait_for_write
|
||||
|
||||
def _setup_stream(self, stream_info):
|
||||
stream_hash = get_sd_hash(stream_info)
|
||||
d = self.daemon._get_lbry_file_by_sd_hash(stream_hash)
|
||||
d.addCallback(self._add_results_callback(stream_info))
|
||||
return d
|
||||
|
||||
def _add_results_callback(self, stream_info):
|
||||
def add_results(l):
|
||||
if l:
|
||||
if os.path.isfile(os.path.join(self.download_directory, l.file_name)):
|
||||
return defer.succeed((stream_info, l))
|
||||
return defer.succeed((stream_info, None))
|
||||
return add_results
|
||||
|
||||
def wait_or_get_stream(self, args):
|
||||
stream_info, lbry_file = args
|
||||
if lbry_file:
|
||||
return self._get_stream(stream_info)
|
||||
else:
|
||||
return self._wait_on_lbry_file(lbry_file)
|
||||
|
||||
def _get_stream(self, stream_info):
|
||||
d = self.daemon.add_stream(
|
||||
self.name, self.timeout, self.download_directory, self.file_name, stream_info)
|
||||
if self.wait_for_write:
|
||||
d.addCallback(lambda _: self._wait_for_write())
|
||||
d.addCallback(lambda _: self.daemon.streams[self.name].downloader)
|
||||
return d
|
||||
|
||||
def _wait_for_write(self):
|
||||
file_name = self.daemon.streams[self.name].downloader.file_name
|
||||
written_bytes = self.get_written_bytes(file_name)
|
||||
d = defer.succeed(None)
|
||||
if not written_bytes:
|
||||
d.addCallback(lambda _: reactor.callLater(1, self._wait_for_write))
|
||||
return d
|
||||
|
||||
def _wait_on_lbry_file(self, f):
|
||||
written_bytes = self.get_written_bytes(f.file_name)
|
||||
if not written_bytes:
|
||||
d = defer.succeed(None)
|
||||
d.addCallback(lambda _: reactor.callLater(1, self._wait_on_lbry_file, f))
|
||||
return d
|
||||
else:
|
||||
return defer.succeed(self._disp_file(f))
|
||||
|
||||
def get_written_bytes(self, file_name):
|
||||
try:
|
||||
file_path = os.path.join(self.download_directory, file_name)
|
||||
if os.path.isfile(file_path):
|
||||
written_file = file(file_path)
|
||||
written_file.seek(0, os.SEEK_END)
|
||||
written_bytes = written_file.tell()
|
||||
written_file.close()
|
||||
else:
|
||||
written_bytes = False
|
||||
except Exception:
|
||||
writen_bytes = False
|
||||
return written_bytes
|
||||
|
||||
def _disp_file(self, f):
|
||||
file_path = os.path.join(self.download_directory, f.file_name)
|
||||
log.info("[%s] Already downloaded: %s --> %s", datetime.now(), f.sd_hash, file_path)
|
||||
return f
|
||||
|
||||
def _remove_from_wait(self, r):
|
||||
del self.daemon.waiting_on[self.name]
|
||||
return r
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
|
Loading…
Add table
Reference in a new issue