diff --git a/CHANGELOG.md b/CHANGELOG.md index 524978a45..edbaa951c 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -9,15 +9,17 @@ at anytime. ## [Unreleased] ### Added - * + * Add file filters: `claim_id`, `outpoint`, and `rowid` * * ### Changed - * - * - * - + * Change file filter `uri` to `name` and return field `lbry_uri` to `name` + * Refactor file_list, add `full_status` argument to populate resource intensive fields + * Remove deprecated file commands: `get_lbry_files`, `get_lbry_file`, and `file_get` + * Remove deprecated `delete_lbry_file` command + * Return standard file json from `get` + ### Fixed * Added string comparison to ClaimOutpoint (needed to look things up by outpoint) * @@ -27,6 +29,10 @@ at anytime. ### Fixed * Fixed ExchangeRateManager freezing the app * Fixed download not timing out properly when downloading sd blob + * Fixed ExchangeRateManager freezing the app + * Fixed download not timing out properly when downloading sd blob + * Fixed get not reassembling an already downloaded file that was deleted from download directory + * ## [0.9.0rc11] - 2017-02-27 ### Fixed diff --git a/lbrynet/core/client/BlobRequester.py b/lbrynet/core/client/BlobRequester.py index 53c5866a0..14a707fc3 100644 --- a/lbrynet/core/client/BlobRequester.py +++ b/lbrynet/core/client/BlobRequester.py @@ -464,7 +464,7 @@ class DownloadRequest(RequestHelper): blob_details.cancel_func, blob ) - log.info("Requesting blob %s from %s", blob.blob_hash, self.peer) + log.debug("Requesting blob %s from %s", blob.blob_hash, self.peer) return request def _handle_download_request(self, client_blob_request): diff --git a/lbrynet/core/client/StreamProgressManager.py b/lbrynet/core/client/StreamProgressManager.py index 504093240..29aea9d1a 100644 --- a/lbrynet/core/client/StreamProgressManager.py +++ b/lbrynet/core/client/StreamProgressManager.py @@ -146,7 +146,7 @@ class FullStreamProgressManager(StreamProgressManager): current_blob_num = self.last_blob_outputted + 1 if current_blob_num in blobs and blobs[current_blob_num].is_validated(): - log.info("Outputting blob %s", str(self.last_blob_outputted + 1)) + log.debug("Outputting blob %s", str(self.last_blob_outputted + 1)) self.provided_blob_nums.append(self.last_blob_outputted + 1) d = self.download_manager.handle_blob(self.last_blob_outputted + 1) d.addCallback(lambda _: finished_outputting_blob()) diff --git a/lbrynet/lbryfilemanager/EncryptedFileDownloader.py b/lbrynet/lbryfilemanager/EncryptedFileDownloader.py index ec2110807..5f9a9712d 100644 --- a/lbrynet/lbryfilemanager/EncryptedFileDownloader.py +++ b/lbrynet/lbryfilemanager/EncryptedFileDownloader.py @@ -19,7 +19,7 @@ from lbrynet.lbryfile.StreamDescriptor import save_sd_info log = logging.getLogger(__name__) -def log_status(uri, sd_hash, status): +def log_status(name, sd_hash, status): if status == ManagedEncryptedFileDownloader.STATUS_RUNNING: status_string = "running" elif status == ManagedEncryptedFileDownloader.STATUS_STOPPED: @@ -28,7 +28,7 @@ def log_status(uri, sd_hash, status): status_string = "finished" else: status_string = "unknown" - log.info("lbry://%s (%s) is %s", uri, short_hash(sd_hash), status_string) + log.info("lbry://%s (%s) is %s", name, short_hash(sd_hash), status_string) class ManagedEncryptedFileDownloader(EncryptedFileSaver): @@ -49,7 +49,7 @@ class ManagedEncryptedFileDownloader(EncryptedFileSaver): self.sd_hash = None self.txid = None self.nout = None - self.uri = None + self.name = None self.claim_id = None self.rowid = rowid self.lbry_file_manager = lbry_file_manager @@ -64,7 +64,7 @@ class ManagedEncryptedFileDownloader(EncryptedFileSaver): yield self.load_file_attributes() status = yield self.lbry_file_manager.get_lbry_file_status(self) - log_status(self.uri, self.sd_hash, status) + log_status(self.name, self.sd_hash, status) if status == ManagedEncryptedFileDownloader.STATUS_RUNNING: # start returns self.finished_deferred @@ -115,12 +115,12 @@ class ManagedEncryptedFileDownloader(EncryptedFileSaver): stream_metadata = yield self.wallet.get_claim_metadata_for_sd_hash(self.sd_hash) if stream_metadata: name, txid, nout = stream_metadata - self.uri = name + self.name = name self.txid = txid self.nout = nout else: raise NoSuchSDHash(self.sd_hash) - self.claim_id = yield self.wallet.get_claimid(self.uri, self.txid, self.nout) + self.claim_id = yield self.wallet.get_claimid(self.name, self.txid, self.nout) defer.returnValue(None) @defer.inlineCallbacks @@ -128,7 +128,7 @@ class ManagedEncryptedFileDownloader(EncryptedFileSaver): yield EncryptedFileSaver._start(self) yield self.load_file_attributes() status = yield self._save_status() - log_status(self.uri, self.sd_hash, status) + log_status(self.name, self.sd_hash, status) defer.returnValue(status) def _get_finished_deferred_callback_value(self): diff --git a/lbrynet/lbryfilemanager/EncryptedFileManager.py b/lbrynet/lbryfilemanager/EncryptedFileManager.py index d40fb8af3..a6a68a11c 100644 --- a/lbrynet/lbryfilemanager/EncryptedFileManager.py +++ b/lbrynet/lbryfilemanager/EncryptedFileManager.py @@ -190,13 +190,10 @@ class EncryptedFileManager(object): file_name) defer.returnValue(lbry_file) - def delete_lbry_file(self, lbry_file): - for l in self.lbry_files: - if l == lbry_file: - lbry_file = l - break - else: - return defer.fail(Failure(ValueError("Could not find that LBRY file"))) + @defer.inlineCallbacks + def delete_lbry_file(self, lbry_file, delete_file=False): + if lbry_file not in self.lbry_files: + raise ValueError("Could not find that LBRY file") def wait_for_finished(count=2): if count <= 0 or lbry_file.saving_status is False: @@ -204,19 +201,36 @@ class EncryptedFileManager(object): else: return task.deferLater(reactor, 1, wait_for_finished, count=count - 1) - def ignore_stopped(err): - err.trap(AlreadyStoppedError, CurrentlyStoppingError) - return wait_for_finished() + full_path = os.path.join(lbry_file.download_directory, lbry_file.file_name) - d = lbry_file.stop() - d.addErrback(ignore_stopped) + try: + yield lbry_file.stop() + except (AlreadyStoppedError, CurrentlyStoppingError): + yield wait_for_finished() - def remove_from_list(): - self.lbry_files.remove(lbry_file) + self.lbry_files.remove(lbry_file) - d.addCallback(lambda _: remove_from_list()) - d.addCallback(lambda _: self._delete_lbry_file_options(lbry_file.rowid)) - return d + yield self._delete_lbry_file_options(lbry_file.rowid) + + yield lbry_file.delete_data() + + # TODO: delete this + # get count for stream hash returns the count of the lbry files with the stream hash + # in the lbry_file_options table, which will soon be removed. + + stream_count = yield self.get_count_for_stream_hash(lbry_file.stream_hash) + if stream_count == 0: + yield self.stream_info_manager.delete_stream(lbry_file.stream_hash) + else: + msg = ("Can't delete stream info for %s, count is %i\n" + "The call that resulted in this warning will\n" + "be removed in the database refactor") + log.warning(msg, lbry_file.stream_hash, stream_count) + + if delete_file and os.path.isfile(full_path): + os.remove(full_path) + + defer.returnValue(True) def toggle_lbry_file_running(self, lbry_file): """Toggle whether a stream reader is currently running""" @@ -238,7 +252,8 @@ class EncryptedFileManager(object): def stop(self): safe_stop_looping_call(self.lbry_file_reflector) yield defer.DeferredList(list(self._stop_lbry_files())) - yield self.sql_db.close() + if self.sql_db: + yield self.sql_db.close() self.sql_db = None log.info("Stopped %s", self) defer.returnValue(True) diff --git a/lbrynet/lbrynet_daemon/Daemon.py b/lbrynet/lbrynet_daemon/Daemon.py index 9a4e63ac8..77a50e29c 100644 --- a/lbrynet/lbrynet_daemon/Daemon.py +++ b/lbrynet/lbrynet_daemon/Daemon.py @@ -42,7 +42,7 @@ from lbrynet.core import log_support, utils, file_utils from lbrynet.core import system_info from lbrynet.core.StreamDescriptor import StreamDescriptorIdentifier, download_sd_blob from lbrynet.core.Session import Session -from lbrynet.core.Wallet import LBRYumWallet, SqliteStorage +from lbrynet.core.Wallet import LBRYumWallet, SqliteStorage, ClaimOutpoint from lbrynet.core.looping_call_manager import LoopingCallManager from lbrynet.core.server.BlobRequestHandler import BlobRequestHandlerFactory from lbrynet.core.server.ServerProtocol import ServerProtocolFactory @@ -92,6 +92,19 @@ PENDING_ID = "not set" SHORT_ID_LEN = 20 +class IterableContainer(object): + def __iter__(self): + for attr in dir(self): + if not attr.startswith("_"): + yield getattr(self, attr) + + def __contains__(self, item): + for attr in self: + if item == attr: + return True + return False + + class Checker: """The looping calls the daemon runs""" INTERNET_CONNECTION = 'internet_connection_checker' @@ -100,12 +113,18 @@ class Checker: PENDING_CLAIM = 'pending_claim_checker' -class FileID: +class _FileID(IterableContainer): """The different ways a file can be identified""" NAME = 'name' SD_HASH = 'sd_hash' FILE_NAME = 'file_name' STREAM_HASH = 'stream_hash' + CLAIM_ID = "claim_id" + OUTPOINT = "outpoint" + ROWID = "rowid" + + +FileID = _FileID() # TODO add login credentials in a conf file @@ -420,7 +439,7 @@ class Daemon(AuthJSONRPCServer): for name in self.pending_claims: log.info("Checking if new claim for lbry://%s is confirmed" % name) d = self._resolve_name(name, force_refresh=True) - d.addCallback(lambda _: self._get_lbry_file_by_uri(name)) + d.addCallback(lambda _: self._get_lbry_file(FileID.NAME, name)) d.addCallbacks( lambda lbry_file: _process_lbry_file(name, lbry_file), lambda _: re_add_to_pending_claims(name) @@ -888,25 +907,6 @@ class Daemon(AuthJSONRPCServer): helper = _ResolveNameHelper(self, name, force_refresh) return helper.get_deferred() - @defer.inlineCallbacks - def _delete_lbry_file(self, lbry_file, delete_file=True): - stream_hash = lbry_file.stream_hash - filename = os.path.join(self.download_directory, lbry_file.file_name) - - yield self.lbry_file_manager.delete_lbry_file(lbry_file) - yield lbry_file.delete_data() - stream_count = yield self.lbry_file_manager.get_count_for_stream_hash(stream_hash) - if stream_count == 0: - yield self.stream_info_manager.delete_stream(stream_hash) - else: - log.warning("Can't delete stream info for %s, count is %i", stream_hash, stream_count) - if delete_file: - if os.path.isfile(filename): - os.remove(filename) - log.info("Deleted file %s", filename) - log.info("Deleted stream %s", stream_hash) - defer.returnValue(True) - def _get_or_download_sd_blob(self, blob, sd_hash): if blob: return self.session.blob_manager.get_blob(blob[0]) @@ -1008,77 +1008,89 @@ class Daemon(AuthJSONRPCServer): return self.get_est_cost_using_known_size(name, size) return self.get_est_cost_from_name(name) - def _find_lbry_file_by_uri(self, uri): - for lbry_file in self.lbry_file_manager.lbry_files: - if uri == lbry_file.uri: - return lbry_file - raise UnknownNameError(uri) - - def _find_lbry_file_by_sd_hash(self, sd_hash): - for lbry_file in self.lbry_file_manager.lbry_files: - if lbry_file.sd_hash == sd_hash: - return lbry_file - raise NoSuchSDHash(sd_hash) - - def _find_lbry_file_by_file_name(self, file_name): - for lbry_file in self.lbry_file_manager.lbry_files: - if lbry_file.file_name == file_name: - return lbry_file - raise Exception("File %s not found" % file_name) - - def _find_lbry_file_by_stream_hash(self, stream_hash): - for lbry_file in self.lbry_file_manager.lbry_files: - if lbry_file.stream_hash == stream_hash: - return lbry_file - raise NoSuchStreamHash(stream_hash) - @defer.inlineCallbacks - def _get_lbry_file_by_uri(self, name): + def _get_lbry_file_dict(self, lbry_file, full_status=False): + key = binascii.b2a_hex(lbry_file.key) if lbry_file.key else None + full_path = os.path.join(lbry_file.download_directory, lbry_file.file_name) + mime_type = mimetypes.guess_type(full_path)[0] + if os.path.isfile(full_path): + with open(full_path) as written_file: + written_file.seek(0, os.SEEK_END) + written_bytes = written_file.tell() + else: + written_bytes = False + + if full_status: + size = yield lbry_file.get_total_bytes() + file_status = yield lbry_file.status() + message = STREAM_STAGES[2][1] % (file_status.name, file_status.num_completed, + file_status.num_known, file_status.running_status) + else: + size = None + message = None + claim = yield self.session.wallet.get_claim_info(lbry_file.name, + lbry_file.txid, + lbry_file.nout) try: - stream_info = yield self._resolve_name(name) - sd_hash = stream_info['sources']['lbry_sd_hash'] - lbry_file = yield self._get_lbry_file_by_sd_hash(sd_hash) - except (UnknownNameError, NoSuchSDHash): - lbry_file = yield self._find_lbry_file_by_uri(name) - defer.returnValue(lbry_file) - - @defer.inlineCallbacks - def _get_lbry_file_by_sd_hash(self, sd_hash): - lbry_file = yield self._find_lbry_file_by_sd_hash(sd_hash) - defer.returnValue(lbry_file) - - @defer.inlineCallbacks - def _get_lbry_file_by_file_name(self, file_name): - lbry_file = yield self._get_lbry_file_by_file_name(file_name) - defer.returnValue(lbry_file) - - @defer.inlineCallbacks - def _get_lbry_file_by_stream_hash(self, stream_hash): - lbry_file = yield self._find_lbry_file_by_stream_hash(stream_hash) - defer.returnValue(lbry_file) - - @defer.inlineCallbacks - def _get_lbry_file(self, search_by, val, return_json=True): - helper = _GetFileHelper(self, search_by, val, return_json) + metadata = claim['value'] + except: + metadata = None try: - lbry_file = yield helper.retrieve_file() - defer.returnValue(lbry_file) - except Exception as err: - # TODO: do something with the error, don't return None when a file isn't found - defer.returnValue(False) + outpoint = repr(ClaimOutpoint(lbry_file.txid, lbry_file.nout)) + except TypeError: + outpoint = None - def _get_lbry_files(self): - def safe_get(sd_hash): - d = self._get_lbry_file(FileID.SD_HASH, sd_hash) - d.addErrback(log.fail(), 'Failed to get file for hash: %s', sd_hash) - return d + defer.returnValue({ + 'completed': lbry_file.completed, + 'file_name': lbry_file.file_name, + 'download_directory': lbry_file.download_directory, + 'points_paid': lbry_file.points_paid, + 'stopped': lbry_file.stopped, + 'stream_hash': lbry_file.stream_hash, + 'stream_name': lbry_file.stream_name, + 'suggested_file_name': lbry_file.suggested_file_name, + 'sd_hash': lbry_file.sd_hash, + 'name': lbry_file.name, + 'outpoint': outpoint, + 'claim_id': lbry_file.claim_id, + 'download_path': full_path, + 'mime_type': mime_type, + 'key': key, + 'total_bytes': size, + 'written_bytes': written_bytes, + 'message': message, + 'metadata': metadata + }) - d = defer.DeferredList([ - safe_get(l.sd_hash) - for l in self.lbry_file_manager.lbry_files - ]) - return d + @defer.inlineCallbacks + def _get_lbry_file(self, search_by, val, return_json=True, full_status=False): + lbry_file = None + if search_by in FileID: + for l_f in self.lbry_file_manager.lbry_files: + if l_f.__dict__.get(search_by) == val: + lbry_file = l_f + break + else: + raise NoValidSearch('{} is not a valid search operation'.format(search_by)) + if return_json and lbry_file: + lbry_file = yield self._get_lbry_file_dict(lbry_file, full_status=full_status) + defer.returnValue(lbry_file) + @defer.inlineCallbacks + def _get_lbry_files(self, as_json=False, full_status=False, **kwargs): + lbry_files = list(self.lbry_file_manager.lbry_files) + if kwargs: + for search_type, value in iter_lbry_file_search_values(kwargs): + lbry_files = [l_f for l_f in lbry_files if l_f.__dict__[search_type] == value] + if as_json: + file_dicts = [] + for lbry_file in lbry_files: + lbry_file_dict = yield self._get_lbry_file_dict(lbry_file, full_status=full_status) + file_dicts.append(lbry_file_dict) + lbry_files = file_dicts + defer.returnValue(lbry_files) + + # TODO: do this and get_blobs_for_sd_hash in the stream info manager def get_blobs_for_stream_hash(self, stream_hash): def _iter_blobs(blob_hashes): for blob_hash, blob_num, blob_iv, blob_length in blob_hashes: @@ -1112,7 +1124,6 @@ class Daemon(AuthJSONRPCServer): Args: session_status: bool - blockchain_status: bool Returns: daemon status """ @@ -1152,7 +1163,6 @@ class Daemon(AuthJSONRPCServer): 'managed_blobs': len(blobs), 'managed_streams': len(self.lbry_file_manager.lbry_files), } - defer.returnValue(response) def jsonrpc_get_best_blockhash(self): @@ -1426,90 +1436,55 @@ class Daemon(AuthJSONRPCServer): d.addCallback(lambda _: reactor.callLater(0.0, reactor.stop)) return self._render_response("Shutting down") - def jsonrpc_get_lbry_files(self): + @defer.inlineCallbacks + def jsonrpc_file_list(self, **kwargs): """ - DEPRECATED. Use `file_list` instead. - """ - return self.jsonrpc_file_list() - - def jsonrpc_file_list(self): - """ - List files + List files limited by optional filters Args: - None + 'name' (optional): filter files by lbry name, + 'sd_hash' (optional): filter files by sd hash, + 'file_name' (optional): filter files by the name in the downloads folder, + 'stream_hash' (optional): filter files by stream hash, + 'claim_id' (optional): filter files by claim id, + 'outpoint' (optional): filter files by claim outpoint, + 'rowid' (optional): filter files by internal row id, + 'full_status': (optional): bool, if true populate the 'message' and 'size' fields + Returns: - List of files, with the following keys: - 'completed': bool - 'file_name': string - 'key': hex string - 'points_paid': float - 'stopped': bool - 'stream_hash': base 58 string - 'stream_name': string - 'suggested_file_name': string - 'sd_hash': string + [ + { + 'completed': bool, + 'file_name': str, + 'download_directory': str, + 'points_paid': float, + 'stopped': bool, + 'stream_hash': str (hex), + 'stream_name': str, + 'suggested_file_name': str, + 'sd_hash': str (hex), + 'name': str, + 'outpoint': str, (txid:nout) + 'claim_id': str (hex), + 'download_path': str, + 'mime_type': str, + 'key': str (hex), + 'total_bytes': int, None if full_status is False + 'written_bytes': int, + 'message': str, None if full_status is False + 'metadata': Metadata dict + } + ] """ - d = self._get_lbry_files() - d.addCallback(lambda r: self._render_response([d[1] for d in r if d[0]])) - - return d - - def jsonrpc_get_lbry_file(self, **kwargs): - """ - DEPRECATED. Use `file_get` instead. - """ - return self.jsonrpc_file_get(**kwargs) - - def jsonrpc_file_get(self, **kwargs): - """ - Get a file, if no matching file exists returns False - - Args: - 'name': get file by lbry uri, - 'sd_hash': get file by the hash in the name claim, - 'file_name': get file by its name in the downloads folder, - 'stream_hash': get file by its stream hash - Returns: - 'completed': bool, - 'file_name': str, - 'download_directory': str, - 'points_paid': float, - 'stopped': bool, - 'stream_hash': str (hex), - 'stream_name': str, - 'suggested_file_name': str, - 'sd_hash': str (hex), - 'lbry_uri': str, - 'txid': str (b58), - 'claim_id': str (b58), - 'download_path': str, - 'mime_type': str, - 'key': str (hex), - 'total_bytes': int, - 'written_bytes': int, - 'code': str, - 'message': str - 'metadata': Metadata dict if claim is valid, otherwise status str - } - """ - d = self._get_deferred_for_lbry_file(kwargs) - d.addCallback(lambda r: self._render_response(r)) - return d - - def _get_deferred_for_lbry_file(self, search_fields): - try: - searchtype, value = get_lbry_file_search_value(search_fields) - except NoValidSearch: - return defer.fail() - else: - return self._get_lbry_file(searchtype, value) + result = yield self._get_lbry_files(as_json=True, **kwargs) + response = yield self._render_response(result) + defer.returnValue(response) @defer.inlineCallbacks def jsonrpc_resolve_name(self, name, force=False): """ - Resolve stream info from a LBRY uri + Resolve stream info from a LBRY name Args: 'name': name to look up, string, do not include lbry:// prefix @@ -1538,7 +1513,7 @@ class Daemon(AuthJSONRPCServer): def jsonrpc_claim_show(self, name, txid=None, nout=None): """ - Resolve claim info from a LBRY uri + Resolve claim info from a LBRY name Args: 'name': name to look up, string, do not include lbry:// prefix @@ -1567,7 +1542,7 @@ class Daemon(AuthJSONRPCServer): self, name, file_name=None, stream_info=None, timeout=None, download_directory=None, wait_for_write=True): """ - Download stream from a LBRY uri. + Download stream from a LBRY name. Args: 'name': name to download, string @@ -1578,64 +1553,65 @@ class Daemon(AuthJSONRPCServer): 'wait_for_write': optional, defaults to True. When set, waits for the file to only start to be written before returning any results. Returns: - 'stream_hash': hex string - 'path': path of download + { + 'completed': bool, + 'file_name': str, + 'download_directory': str, + 'points_paid': float, + 'stopped': bool, + 'stream_hash': str (hex), + 'stream_name': str, + 'suggested_file_name': str, + 'sd_hash': str (hex), + 'name': str, + 'outpoint': str, (txid:nout) + 'claim_id': str (hex), + 'download_path': str, + 'mime_type': str, + 'key': str (hex), + 'total_bytes': int + 'written_bytes': int, + 'message': str + 'metadata': Metadata dict + } """ timeout = timeout if timeout is not None else self.download_timeout download_directory = download_directory or self.download_directory - sd_hash = get_sd_hash(stream_info) if name in self.waiting_on: - # TODO: return a useful error message here, like "already - # waiting for name to be resolved" - defer.returnValue(server.failure) + log.info("Already waiting on lbry://%s to start downloading", name) + yield self.streams[name].data_downloading_deferred - # first check if we already have this lbry_file = yield self._get_lbry_file(FileID.NAME, name, return_json=False) - if lbry_file: - log.info('Already have a file for %s', name) - message = { - 'stream_hash': sd_hash if stream_info else lbry_file.sd_hash, - 'path': os.path.join(lbry_file.download_directory, lbry_file.file_name) - } - response = yield self._render_response(message) - defer.returnValue(response) - download_id = utils.random_string() - self.analytics_manager.send_download_started(download_id, name, stream_info) - tries = 1 - max_tries = 3 - while tries <= max_tries: + if lbry_file: + if not os.path.isfile(os.path.join(lbry_file.download_directory, lbry_file.file_name)): + log.info("Already have lbry file but missing file in %s, rebuilding it", + lbry_file.download_directory) + yield lbry_file.start() + else: + log.info('Already have a file for %s', name) + result = yield self._get_lbry_file_dict(lbry_file, full_status=True) + else: + download_id = utils.random_string() + self.analytics_manager.send_download_started(download_id, name, stream_info) try: - log.info('Making try %s / %s to start download of %s', tries, max_tries, name) - new_sd_hash, file_path = yield self._download_name( - name=name, - timeout=timeout, - download_directory=download_directory, - stream_info=stream_info, - file_name=file_name, - wait_for_write=wait_for_write + yield self._download_name(name=name, timeout=timeout, + download_directory=download_directory, + stream_info=stream_info, file_name=file_name, + wait_for_write=wait_for_write) + stream = self.streams[name] + stream.finished_deferred.addCallback( + lambda _: self.analytics_manager.send_download_finished( + download_id, name, stream_info) ) - break + result = yield self._get_lbry_file_dict(self.streams[name].downloader, + full_status=True) except Exception as e: log.warning('Failed to get %s', name) - if tries == max_tries: - self.analytics_manager.send_download_errored(download_id, name, stream_info) - response = yield self._render_response(e.message) - defer.returnValue(response) - tries += 1 - # TODO: should stream_hash key be changed to sd_hash? - message = { - 'stream_hash': sd_hash if stream_info else new_sd_hash, - 'path': file_path - } - stream = self.streams.get(name) - if stream: - stream.finished_deferred.addCallback( - lambda _: self.analytics_manager.send_download_finished( - download_id, name, stream_info) - ) - response = yield self._render_response(message) + self.analytics_manager.send_download_errored(download_id, name, stream_info) + result = e.message + response = yield self._render_response(result) defer.returnValue(response) @AuthJSONRPCServer.auth_required @@ -1660,7 +1636,7 @@ class Daemon(AuthJSONRPCServer): Args: 'status': "start" or "stop" - 'name': start file by lbry uri, + 'name': start file by lbry name, 'sd_hash': start file by the hash in the name claim, 'file_name': start file by its name in the downloads folder, Returns: @@ -1686,41 +1662,45 @@ class Daemon(AuthJSONRPCServer): defer.returnValue(response) @AuthJSONRPCServer.auth_required - def jsonrpc_delete_lbry_file(self, **kwargs): - """ - DEPRECATED. Use `file_delete` instead - """ - return self.jsonrpc_file_delete(**kwargs) - - @AuthJSONRPCServer.auth_required + @defer.inlineCallbacks def jsonrpc_file_delete(self, delete_target_file=True, **kwargs): """ Delete a lbry file Args: - 'file_name': downloaded file name, string + 'name' (optional): delete files by lbry name, + 'sd_hash' (optional): delete files by sd hash, + 'file_name' (optional): delete files by the name in the downloads folder, + 'stream_hash' (optional): delete files by stream hash, + 'claim_id' (optional): delete files by claim id, + 'outpoint' (optional): delete files by claim outpoint, + 'rowid': (optional): delete file by rowid in the file manager + 'delete_target_file' (optional): delete file from downloads folder, defaults to True + if False only the blobs and db entries will be deleted Returns: - confirmation message + True if deletion was successful, otherwise False """ - def _delete_file(f): - if not f: - return False - file_name = f.file_name - d = self._delete_lbry_file(f, delete_file=delete_target_file) - d.addCallback(lambda _: "Deleted file: " + file_name) - return d - - try: - searchtype, value = get_lbry_file_search_value(kwargs) - except NoValidSearch: - d = defer.fail() + searchtype, value = get_lbry_file_search_value(kwargs) + lbry_files = yield self._get_lbry_files(searchtype, value, return_json=False) + if len(lbry_files) > 1: + log.warning("There are %i files to delete, use narrower filters to select one", + len(lbry_files)) + result = False + elif not lbry_files: + log.warning("There is no file to delete for '%s'", value) + result = False else: - d = self._get_lbry_file(searchtype, value, return_json=False) - d.addCallback(_delete_file) - - d.addCallback(lambda r: self._render_response(r)) - return d + lbry_file = lbry_files[0] + file_name, stream_hash = lbry_file.file_name, lbry_file.stream_hash + if lbry_file.claim_id in self.streams: + del self.streams[lbry_file.claim_id] + yield self.lbry_file_manager.delete_lbry_file(lbry_file, + delete_file=delete_target_file) + log.info("Deleted %s (%s)", file_name, utils.short_hash(stream_hash)) + result = True + response = yield self._render_response(result) + defer.returnValue(response) def jsonrpc_get_est_cost(self, **kwargs): """ @@ -1734,7 +1714,7 @@ class Daemon(AuthJSONRPCServer): Get estimated cost for a lbry stream Args: - 'name': lbry uri + 'name': lbry name 'size': stream size, in bytes. if provided an sd blob won't be downloaded. Returns: estimated cost @@ -2368,7 +2348,7 @@ class Daemon(AuthJSONRPCServer): if uri: metadata = yield self._resolve_name(uri) - sd_hash = get_sd_hash(metadata) + sd_hash = utils.get_sd_hash(metadata) blobs = yield self.get_blobs_for_sd_hash(sd_hash) elif stream_hash: try: @@ -2437,7 +2417,7 @@ class Daemon(AuthJSONRPCServer): Get stream availability for a winning claim Arg: - name (str): lbry uri + name (str): lbry name sd_timeout (int, optional): sd blob download timeout peer_timeout (int, optional): how long to look for peers @@ -2462,7 +2442,7 @@ class Daemon(AuthJSONRPCServer): return decoded_sd_blob metadata = yield self._resolve_name(name) - sd_hash = get_sd_hash(metadata) + sd_hash = utils.get_sd_hash(metadata) sd_timeout = sd_timeout or conf.settings['sd_download_timeout'] peer_timeout = peer_timeout or conf.settings['peer_search_timeout'] blobs = [] @@ -2574,6 +2554,7 @@ class _DownloadNameHelper(object): try: download_path = yield self.daemon.add_stream( self.name, self.timeout, self.download_directory, self.file_name, stream_info) + self.remove_from_wait(None) except (InsufficientFundsError, Exception) as err: if Failure(err).check(InsufficientFundsError): log.warning("Insufficient funds to download lbry://%s", self.name) @@ -2582,7 +2563,8 @@ class _DownloadNameHelper(object): log.warning("lbry://%s timed out, removing from streams", self.name) self.remove_from_wait("Timed out") if self.daemon.streams[self.name].downloader is not None: - yield self.daemon._delete_lbry_file(self.daemon.streams[self.name].downloader) + yield self.daemon.lbry_file_manager.delete_lbry_file( + self.daemon.streams[self.name].downloader) del self.daemon.streams[self.name] raise err @@ -2679,126 +2661,6 @@ class _ResolveNameHelper(object): return time_in_cache >= self.daemon.cache_time -class _GetFileHelper(object): - def __init__(self, daemon, search_by, val, return_json=True): - self.daemon = daemon - self.search_by = search_by - self.val = val - self.return_json = return_json - - def retrieve_file(self): - d = self.search_for_file() - if self.return_json: - d.addCallback(self._get_json) - return d - - def search_for_file(self): - if self.search_by == FileID.NAME: - return self.daemon._get_lbry_file_by_uri(self.val) - elif self.search_by == FileID.SD_HASH: - return self.daemon._get_lbry_file_by_sd_hash(self.val) - elif self.search_by == FileID.FILE_NAME: - return self.daemon._get_lbry_file_by_file_name(self.val) - elif self.search_by == FileID.STREAM_HASH: - return self.daemon._get_lbry_file_by_stream_hash(self.val) - raise Exception('{} is not a valid search operation'.format(self.search_by)) - - def _get_json(self, lbry_file): - if lbry_file: - d = lbry_file.get_total_bytes() - d.addCallback(self._generate_reply, lbry_file) - d.addCallback(self._add_metadata, lbry_file) - return d - else: - return False - - def _generate_reply(self, size, lbry_file): - written_bytes = self._get_written_bytes(lbry_file) - code, message = self._get_status(lbry_file) - - if code == DOWNLOAD_RUNNING_CODE: - d = lbry_file.status() - d.addCallback(self._get_msg_for_file_status) - d.addCallback( - lambda msg: self._get_properties_dict(lbry_file, code, msg, written_bytes, size)) - else: - d = defer.succeed( - self._get_properties_dict(lbry_file, code, message, written_bytes, size)) - return d - - def _get_msg_for_file_status(self, file_status): - message = STREAM_STAGES[2][1] % ( - file_status.name, file_status.num_completed, file_status.num_known, - file_status.running_status) - return defer.succeed(message) - - def _get_key(self, lbry_file): - return binascii.b2a_hex(lbry_file.key) if lbry_file.key else None - - def _full_path(self, lbry_file): - return os.path.join(lbry_file.download_directory, lbry_file.file_name) - - def _get_status(self, lbry_file): - if self.search_by == FileID.NAME: - if self.val in self.daemon.streams.keys(): - status = self.daemon.streams[self.val].code - elif lbry_file in self.daemon.lbry_file_manager.lbry_files: - status = STREAM_STAGES[2] - else: - status = [False, False] - else: - status = [False, False] - return status - - def _get_written_bytes(self, lbry_file): - full_path = self._full_path(lbry_file) - if os.path.isfile(full_path): - with open(full_path) as written_file: - written_file.seek(0, os.SEEK_END) - written_bytes = written_file.tell() - else: - written_bytes = False - return written_bytes - - def _get_properties_dict(self, lbry_file, code, message, written_bytes, size): - key = self._get_key(lbry_file) - full_path = self._full_path(lbry_file) - mime_type = mimetypes.guess_type(full_path)[0] - return { - 'completed': lbry_file.completed, - 'file_name': lbry_file.file_name, - 'download_directory': lbry_file.download_directory, - 'points_paid': lbry_file.points_paid, - 'stopped': lbry_file.stopped, - 'stream_hash': lbry_file.stream_hash, - 'stream_name': lbry_file.stream_name, - 'suggested_file_name': lbry_file.suggested_file_name, - 'sd_hash': lbry_file.sd_hash, - 'lbry_uri': lbry_file.uri, - 'txid': lbry_file.txid, - 'claim_id': lbry_file.claim_id, - 'download_path': full_path, - 'mime_type': mime_type, - 'key': key, - 'total_bytes': size, - 'written_bytes': written_bytes, - 'code': code, - 'message': message - } - - def _add_metadata(self, message, lbry_file): - def _add_to_dict(metadata): - message['metadata'] = metadata - return defer.succeed(message) - - if lbry_file.txid: - d = self.daemon._resolve_name(lbry_file.uri) - d.addCallbacks(_add_to_dict, lambda _: _add_to_dict("Pending confirmation")) - else: - d = defer.succeed(message) - return d - - def loggly_time_string(dt): formatted_dt = dt.strftime("%Y-%m-%dT%H:%M:%S") milliseconds = str(round(dt.microsecond * (10.0 ** -5), 3)) @@ -2834,13 +2696,20 @@ def report_bug_to_slack(message, installation_id, platform_name, app_version): def get_lbry_file_search_value(search_fields): - for searchtype in (FileID.SD_HASH, FileID.NAME, FileID.FILE_NAME, FileID.STREAM_HASH): - value = search_fields.get(searchtype) - if value: + for searchtype in FileID: + value = search_fields.get(searchtype, None) + if value is not None: return searchtype, value raise NoValidSearch('{} is missing a valid search type'.format(search_fields)) +def iter_lbry_file_search_values(search_fields): + for searchtype in FileID: + value = search_fields.get(searchtype, None) + if value is not None: + yield searchtype, value + + def get_blob_payment_rate_manager(session, payment_rate_manager=None): if payment_rate_manager: rate_managers = { diff --git a/lbrynet/reflector/client/client.py b/lbrynet/reflector/client/client.py index eb39e6bc4..26882d186 100644 --- a/lbrynet/reflector/client/client.py +++ b/lbrynet/reflector/client/client.py @@ -39,8 +39,8 @@ class EncryptedFileReflectorClient(Protocol): lambda err: log.warning("An error occurred immediately: %s", err.getTraceback())) @property - def lbry_uri(self): - return "lbry://%s" % self.factory.lbry_uri + def name(self): + return "lbry://%s" % self.factory.name @property def blob_manager(self): @@ -74,20 +74,20 @@ class EncryptedFileReflectorClient(Protocol): if reason.check(error.ConnectionDone): if not self.needed_blobs: log.info("Reflector has all blobs for %s (%s)", - self.lbry_uri, self.stream_descriptor) + self.name, self.stream_descriptor) elif not self.reflected_blobs: log.info("No more completed blobs for %s (%s) to reflect, %i are still needed", - self.lbry_uri, self.stream_descriptor, len(self.needed_blobs)) + self.name, self.stream_descriptor, len(self.needed_blobs)) else: log.info('Finished sending reflector %i blobs for %s (%s)', - len(self.reflected_blobs), self.lbry_uri, self.stream_descriptor) + len(self.reflected_blobs), self.name, self.stream_descriptor) self.factory.finished_deferred.callback(self.reflected_blobs) elif reason.check(error.ConnectionLost): - log.warning("Stopped reflecting %s (%s) after sending %i blobs", self.lbry_uri, + log.warning("Stopped reflecting %s (%s) after sending %i blobs", self.name, self.stream_descriptor, len(self.reflected_blobs)) self.factory.finished_deferred.callback(self.reflected_blobs) else: - log.info('Reflector finished for %s (%s): %s', self.lbry_uri, self.stream_descriptor, + log.info('Reflector finished for %s (%s): %s', self.name, self.stream_descriptor, reason) self.factory.finished_deferred.callback(reason) @@ -131,7 +131,7 @@ class EncryptedFileReflectorClient(Protocol): log.info("Reflector needs %s%i blobs for %s", needs_desc, len(filtered), - self.lbry_uri) + self.name) return filtered d = self.factory.stream_info_manager.get_blobs_for_stream(self.factory.stream_hash) @@ -223,7 +223,7 @@ class EncryptedFileReflectorClient(Protocol): log.info("Sent reflector descriptor %s", self.next_blob_to_send) else: log.warning("Reflector failed to receive descriptor %s for %s", - self.next_blob_to_send, self.lbry_uri) + self.next_blob_to_send, self.name) self.blob_hashes_to_send.append(self.next_blob_to_send.blob_hash) return self.set_not_uploading() @@ -236,7 +236,7 @@ class EncryptedFileReflectorClient(Protocol): return defer.succeed(True) else: log.warning("Reflector already has %s for %s", self.next_blob_to_send, - self.lbry_uri) + self.name) return self.set_not_uploading() else: # Expecting Server Blob Response if 'received_blob' not in response_dict: @@ -245,10 +245,10 @@ class EncryptedFileReflectorClient(Protocol): if response_dict['received_blob']: self.reflected_blobs.append(self.next_blob_to_send.blob_hash) log.debug("Sent reflector blob %s for %s", self.next_blob_to_send, - self.lbry_uri) + self.name) else: log.warning("Reflector failed to receive blob %s for %s", - self.next_blob_to_send, self.lbry_uri) + self.next_blob_to_send, self.name) self.blob_hashes_to_send.append(self.next_blob_to_send.blob_hash) return self.set_not_uploading() @@ -284,12 +284,12 @@ class EncryptedFileReflectorClient(Protocol): err.trap(ValueError) if blob_hash not in self.failed_blob_hashes: log.warning("Failed to reflect blob %s for %s, reason: %s", - str(blob_hash)[:16], self.lbry_uri, err.getTraceback()) + str(blob_hash)[:16], self.name, err.getTraceback()) self.blob_hashes_to_send.append(blob_hash) self.failed_blob_hashes.append(blob_hash) else: log.warning("Failed second try reflecting blob %s for %s, giving up, reason: %s", - str(blob_hash)[:16], self.lbry_uri, err.getTraceback()) + str(blob_hash)[:16], self.name, err.getTraceback()) def send_next_request(self): if self.file_sender is not None: @@ -335,8 +335,8 @@ class EncryptedFileReflectorClientFactory(ClientFactory): return self._lbry_file.stream_hash @property - def lbry_uri(self): - return self._lbry_file.uri + def name(self): + return self._lbry_file.name @property def protocol_version(self): diff --git a/tests/functional/test_misc.py b/tests/functional/test_misc.py index a9ef36d08..5421f10a8 100644 --- a/tests/functional/test_misc.py +++ b/tests/functional/test_misc.py @@ -61,7 +61,7 @@ DummyBlobAvailabilityTracker = mocks.BlobAvailabilityTracker log_format = "%(funcName)s(): %(message)s" -logging.basicConfig(level=logging.WARNING, format=log_format) +logging.basicConfig(level=logging.CRITICAL, format=log_format) def require_system(system): diff --git a/tests/mocks.py b/tests/mocks.py index 9dea430d7..cf8e5cf7a 100644 --- a/tests/mocks.py +++ b/tests/mocks.py @@ -16,7 +16,7 @@ class FakeLBRYFile(object): self.blob_manager = blob_manager self.stream_info_manager = stream_info_manager self.stream_hash = stream_hash - self.uri = "fake_uri" + self.name = "fake_uri" class Node(object):