diff --git a/lbrynet/lbrynet_daemon/Daemon.py b/lbrynet/lbrynet_daemon/Daemon.py index 649070269..4975c86da 100644 --- a/lbrynet/lbrynet_daemon/Daemon.py +++ b/lbrynet/lbrynet_daemon/Daemon.py @@ -16,6 +16,11 @@ from twisted.internet import defer, threads, error, reactor, task from twisted.internet.task import LoopingCall from twisted.python.failure import Failure +from lbryschema.decode import smart_decode +from lbryschema.claim import ClaimDict +from lbryschema.uri import parse_lbry_uri +from lbryschema.error import DecodeError + # TODO: importing this when internet is disabled raises a socket.gaierror from lbryum.version import LBRYUM_VERSION from lbrynet import __version__ as LBRYNET_VERSION @@ -695,20 +700,41 @@ class Daemon(AuthJSONRPCServer): return finished_d @defer.inlineCallbacks - def _download_name(self, name, stream_info, timeout=None, download_directory=None, - file_name=None, wait_for_write=True): + def _download_name(self, name, stream_info, claim_id, timeout=None, download_directory=None, + file_name=None): """ Add a lbry file to the file manager, start the download, and return the new lbry file. If it already exists in the file manager, return the existing lbry file """ - timeout = timeout if timeout is not None else conf.settings['download_timeout'] + if claim_id in self.streams: + downloader = self.streams[claim_id] + result = yield downloader.finished_deferred + defer.returnValue(result) + else: + download_id = utils.random_string() + self.analytics_manager.send_download_started(download_id, name, stream_info) - helper = _DownloadNameHelper(self, name, timeout, download_directory, file_name, - wait_for_write) - lbry_file = yield helper.setup_stream(stream_info) - sd_hash, file_path = yield helper.wait_or_get_stream(stream_info, lbry_file) - defer.returnValue((sd_hash, file_path)) + self.streams[claim_id] = GetStream(self.sd_identifier, self.session, + self.session.wallet, self.lbry_file_manager, + self.exchange_rate_manager, self.max_key_fee, + conf.settings['data_rate'], timeout, + download_directory, file_name) + try: + download = self.streams[claim_id].start(stream_info, name) + self.streams[claim_id].finished_deferred.addCallback( + lambda _: self.analytics_manager.send_download_finished(download_id, + name, + stream_info)) + lbry_file = yield download + result = yield self._get_lbry_file_dict(lbry_file, full_status=True) + del self.streams[claim_id] + except Exception as err: + log.warning('Failed to get %s: %s', name, err) + self.analytics_manager.send_download_errored(download_id, name, stream_info) + del self.streams[claim_id] + result = {'error': err.message} + defer.returnValue(result) @defer.inlineCallbacks def _publish_stream(self, name, bid, claim_dict, file_path=None): @@ -1386,19 +1412,17 @@ class Daemon(AuthJSONRPCServer): @AuthJSONRPCServer.auth_required @defer.inlineCallbacks - def jsonrpc_get(self, name, claim_id=None, file_name=None, timeout=None, - download_directory=None, wait_for_write=True): + @AuthJSONRPCServer.auth_required + @defer.inlineCallbacks + def jsonrpc_get(self, uri, file_name=None, timeout=None, download_directory=None): """ Download stream from a LBRY name. Args: - 'name': (str) name to download - 'claim_id' (optional): (str) claim id for claim to download + 'uri': (str) lbry uri to download 'file_name'(optional): (str) a user specified name for the downloaded file 'timeout'(optional): (int) download timeout in number of seconds 'download_directory'(optional): (str) path to directory where file will be saved - 'wait_for_write'(optional): (bool) defaults to True. When set, waits for the file to - only start to be written before returning any results. Returns: (dict) Dictionary contaning information about the stream @@ -1426,34 +1450,26 @@ class Daemon(AuthJSONRPCServer): """ - def _get_claim(_claim_id, _claims): - #TODO: do this in Wallet class - for claim in _claims['claims']: - if claim['claim_id'] == _claim_id: - return smart_decode(claim['value']).claim_dict - - log.info("Received request to get %s", name) - timeout = timeout if timeout is not None else self.download_timeout download_directory = download_directory or self.download_directory - if name in self.streams: + + resolved = yield self.session.wallet.resolve_uri(uri) + + if 'value' not in resolved: + if 'claim' not in resolved: + raise Exception("Nothing to download") + else: + resolved = resolved['claim'] + + name = resolved['name'] + claim_id = resolved['claim_id'] + stream_info = resolved['value'] + + if claim_id in self.streams: log.info("Already waiting on lbry://%s to start downloading", name) - yield self.streams[name].data_downloading_deferred + yield self.streams[claim_id].data_downloading_deferred - stream_info = None - lbry_file = None - - if claim_id: - lbry_file = yield self._get_lbry_file(FileID.CLAIM_ID, claim_id, return_json=False) - claims = yield self.session.wallet.get_claims_for_name(name) - formatted_claims = format_json_out_amount_as_float(claims) - stream_info = _get_claim(claim_id, formatted_claims) - if not stream_info: - log.error("No claim %s for lbry://%s, using winning claim", claim_id, name) - - if not stream_info: - lbry_file = yield self._get_lbry_file(FileID.NAME, name, return_json=False) - stream_info = yield self._resolve_name(name) + lbry_file = yield self._get_lbry_file(FileID.CLAIM_ID, claim_id, return_json=False) if lbry_file: if not os.path.isfile(os.path.join(lbry_file.download_directory, lbry_file.file_name)): @@ -1464,26 +1480,9 @@ class Daemon(AuthJSONRPCServer): log.info('Already have a file for %s', name) result = yield self._get_lbry_file_dict(lbry_file, full_status=True) else: - download_id = utils.random_string() - - self.analytics_manager.send_download_started(download_id, name, stream_info) - try: - yield self._download_name(name=name, stream_info=stream_info, timeout=timeout, - download_directory=download_directory, - file_name=file_name, wait_for_write=wait_for_write) - stream = self.streams[name] - stream.finished_deferred.addCallback( - lambda _: self.analytics_manager.send_download_finished( - download_id, name, stream_info) - ) - result = yield self._get_lbry_file_dict(self.streams[name].downloader, - full_status=True) - except Exception as e: - # TODO: should reraise here, instead of returning e.message - log.warning('Failed to get %s', name) - self.analytics_manager.send_download_errored(download_id, name, stream_info) - result = e.message - + result = yield self._download_name(name, stream_info, claim_id, timeout=timeout, + download_directory=download_directory, + file_name=file_name) response = yield self._render_response(result) defer.returnValue(response) @@ -2401,114 +2400,6 @@ class Daemon(AuthJSONRPCServer): return d -class _DownloadNameHelper(object): - def __init__(self, daemon, name, timeout=None, download_directory=None, file_name=None, - wait_for_write=True): - self.daemon = daemon - self.name = name - self.timeout = timeout if timeout is not None else conf.settings['download_timeout'] - if not download_directory or not os.path.isdir(download_directory): - self.download_directory = daemon.download_directory - else: - self.download_directory = download_directory - self.file_name = file_name - self.wait_for_write = wait_for_write - - @defer.inlineCallbacks - def setup_stream(self, stream_info): - sd_hash = utils.get_sd_hash(stream_info) - lbry_file = yield self.daemon._get_lbry_file(FileID.SD_HASH, sd_hash, return_json=False) - if self._does_lbry_file_exists(lbry_file): - defer.returnValue(lbry_file) - else: - defer.returnValue(None) - - def _does_lbry_file_exists(self, lbry_file): - return lbry_file and os.path.isfile(self._full_path(lbry_file)) - - def _full_path(self, lbry_file): - return os.path.join(self.download_directory, lbry_file.file_name) - - @defer.inlineCallbacks - def wait_or_get_stream(self, stream_info, lbry_file): - if lbry_file: - log.debug('Wait on lbry_file') - # returns the lbry_file - yield self._wait_on_lbry_file(lbry_file) - defer.returnValue((lbry_file.sd_hash, self._full_path(lbry_file))) - else: - log.debug('No lbry_file, need to get stream') - # returns an instance of ManagedEncryptedFileDownloaderFactory - sd_hash, file_path = yield self._get_stream(stream_info) - defer.returnValue((sd_hash, file_path)) - - def _wait_on_lbry_file(self, f): - file_path = self._full_path(f) - written_bytes = self._get_written_bytes(file_path) - if written_bytes: - log.info("File has bytes: %s --> %s", f.sd_hash, file_path) - return defer.succeed(True) - return task.deferLater(reactor, 1, self._wait_on_lbry_file, f) - - @defer.inlineCallbacks - def _get_stream(self, stream_info): - try: - download_path = yield self.daemon.add_stream( - self.name, self.timeout, self.download_directory, self.file_name, stream_info) - self.remove_from_wait(None) - except (InsufficientFundsError, Exception) as err: - if Failure(err).check(InsufficientFundsError): - log.warning("Insufficient funds to download lbry://%s", self.name) - self.remove_from_wait("Insufficient funds") - else: - log.warning("lbry://%s timed out, removing from streams", self.name) - self.remove_from_wait("Timed out") - if self.daemon.streams[self.name].downloader is not None: - yield self.daemon.lbry_file_manager.delete_lbry_file( - self.daemon.streams[self.name].downloader) - del self.daemon.streams[self.name] - raise err - - if self.wait_for_write: - yield self._wait_for_write() - defer.returnValue((self.daemon.streams[self.name].sd_hash, download_path)) - - def _wait_for_write(self): - d = defer.succeed(None) - if not self._has_downloader_wrote(): - d.addCallback(lambda _: reactor.callLater(1, self._wait_for_write)) - return d - - def _has_downloader_wrote(self): - stream = self.daemon.streams.get(self.name, False) - if stream: - file_path = self._full_path(stream.downloader) - return self._get_written_bytes(file_path) - else: - return False - - def _get_written_bytes(self, file_path): - """Returns the number of bytes written to `file_path`. - - Returns False if there were issues reading `file_path`. - """ - try: - if os.path.isfile(file_path): - with open(file_path) as written_file: - written_file.seek(0, os.SEEK_END) - written_bytes = written_file.tell() - else: - written_bytes = False - except Exception: - writen_bytes = False - return written_bytes - - def remove_from_wait(self, reason): - if self.name in self.daemon.waiting_on: - del self.daemon.waiting_on[self.name] - return reason - - class _ResolveNameHelper(object): def __init__(self, daemon, name, force_refresh): self.daemon = daemon diff --git a/lbrynet/lbrynet_daemon/Downloader.py b/lbrynet/lbrynet_daemon/Downloader.py index de7852125..826ee58ef 100644 --- a/lbrynet/lbrynet_daemon/Downloader.py +++ b/lbrynet/lbrynet_daemon/Downloader.py @@ -1,6 +1,6 @@ import logging import os -from twisted.internet import defer +from twisted.internet import defer, threads from twisted.internet.task import LoopingCall from lbrynet.core import utils @@ -63,6 +63,8 @@ class GetStream(object): # fired after the metadata and the first data blob have been downloaded self.data_downloading_deferred = defer.Deferred(None) + self._running = False + @property def download_path(self): return os.path.join(self.download_directory, self.downloader.file_name) @@ -88,7 +90,7 @@ class GetStream(object): elif self.downloader: d = self.downloader.status() d.addCallback(self._check_status) - else: + elif self._running: log.info("Downloading stream descriptor blob (%i seconds)", self.timeout_counter) def convert_max_fee(self): @@ -158,10 +160,21 @@ class GetStream(object): @defer.inlineCallbacks def download(self, stream_info, name): + if self._running: + raise Exception("Already running") + self._running = True + self.set_status(INITIALIZING_CODE, name) self.sd_hash = utils.get_sd_hash(stream_info) + if 'fee' in stream_info['stream']['metadata']: - fee = self.check_fee(stream_info['stream']['metadata']['fee']) + try: + fee = yield threads.deferToThread(self.check_fee, + stream_info['stream']['metadata']['fee']) + except Exception as err: + self._running = False + self.finished_deferred.errback(err) + raise err else: fee = None @@ -184,7 +197,7 @@ class GetStream(object): safe_start(self.checker) self.download(stream_info, name) yield self.data_downloading_deferred - defer.returnValue(self.download_path) + defer.returnValue(self.downloader) except Exception as err: safe_stop(self.checker) raise err