Merge pull request #512 from lbryio/file-list-refactor

File list refactor
This commit is contained in:
Jack Robison 2017-03-07 20:38:04 -05:00 committed by GitHub
commit e2cc212eb8
9 changed files with 306 additions and 416 deletions

View file

@ -9,14 +9,16 @@ at anytime.
## [Unreleased] ## [Unreleased]
### Added ### Added
* * Add file filters: `claim_id`, `outpoint`, and `rowid`
* *
* *
### Changed ### Changed
* * Change file filter `uri` to `name` and return field `lbry_uri` to `name`
* * Refactor file_list, add `full_status` argument to populate resource intensive fields
* * Remove deprecated file commands: `get_lbry_files`, `get_lbry_file`, and `file_get`
* Remove deprecated `delete_lbry_file` command
* Return standard file json from `get`
### Fixed ### Fixed
* Added string comparison to ClaimOutpoint (needed to look things up by outpoint) * Added string comparison to ClaimOutpoint (needed to look things up by outpoint)
@ -27,6 +29,10 @@ at anytime.
### Fixed ### Fixed
* Fixed ExchangeRateManager freezing the app * Fixed ExchangeRateManager freezing the app
* Fixed download not timing out properly when downloading sd blob * Fixed download not timing out properly when downloading sd blob
* Fixed ExchangeRateManager freezing the app
* Fixed download not timing out properly when downloading sd blob
* Fixed get not reassembling an already downloaded file that was deleted from download directory
*
## [0.9.0rc11] - 2017-02-27 ## [0.9.0rc11] - 2017-02-27
### Fixed ### Fixed

View file

@ -464,7 +464,7 @@ class DownloadRequest(RequestHelper):
blob_details.cancel_func, blob_details.cancel_func,
blob blob
) )
log.info("Requesting blob %s from %s", blob.blob_hash, self.peer) log.debug("Requesting blob %s from %s", blob.blob_hash, self.peer)
return request return request
def _handle_download_request(self, client_blob_request): def _handle_download_request(self, client_blob_request):

View file

@ -146,7 +146,7 @@ class FullStreamProgressManager(StreamProgressManager):
current_blob_num = self.last_blob_outputted + 1 current_blob_num = self.last_blob_outputted + 1
if current_blob_num in blobs and blobs[current_blob_num].is_validated(): if current_blob_num in blobs and blobs[current_blob_num].is_validated():
log.info("Outputting blob %s", str(self.last_blob_outputted + 1)) log.debug("Outputting blob %s", str(self.last_blob_outputted + 1))
self.provided_blob_nums.append(self.last_blob_outputted + 1) self.provided_blob_nums.append(self.last_blob_outputted + 1)
d = self.download_manager.handle_blob(self.last_blob_outputted + 1) d = self.download_manager.handle_blob(self.last_blob_outputted + 1)
d.addCallback(lambda _: finished_outputting_blob()) d.addCallback(lambda _: finished_outputting_blob())

View file

@ -19,7 +19,7 @@ from lbrynet.lbryfile.StreamDescriptor import save_sd_info
log = logging.getLogger(__name__) log = logging.getLogger(__name__)
def log_status(uri, sd_hash, status): def log_status(name, sd_hash, status):
if status == ManagedEncryptedFileDownloader.STATUS_RUNNING: if status == ManagedEncryptedFileDownloader.STATUS_RUNNING:
status_string = "running" status_string = "running"
elif status == ManagedEncryptedFileDownloader.STATUS_STOPPED: elif status == ManagedEncryptedFileDownloader.STATUS_STOPPED:
@ -28,7 +28,7 @@ def log_status(uri, sd_hash, status):
status_string = "finished" status_string = "finished"
else: else:
status_string = "unknown" status_string = "unknown"
log.info("lbry://%s (%s) is %s", uri, short_hash(sd_hash), status_string) log.info("lbry://%s (%s) is %s", name, short_hash(sd_hash), status_string)
class ManagedEncryptedFileDownloader(EncryptedFileSaver): class ManagedEncryptedFileDownloader(EncryptedFileSaver):
@ -49,7 +49,7 @@ class ManagedEncryptedFileDownloader(EncryptedFileSaver):
self.sd_hash = None self.sd_hash = None
self.txid = None self.txid = None
self.nout = None self.nout = None
self.uri = None self.name = None
self.claim_id = None self.claim_id = None
self.rowid = rowid self.rowid = rowid
self.lbry_file_manager = lbry_file_manager self.lbry_file_manager = lbry_file_manager
@ -64,7 +64,7 @@ class ManagedEncryptedFileDownloader(EncryptedFileSaver):
yield self.load_file_attributes() yield self.load_file_attributes()
status = yield self.lbry_file_manager.get_lbry_file_status(self) status = yield self.lbry_file_manager.get_lbry_file_status(self)
log_status(self.uri, self.sd_hash, status) log_status(self.name, self.sd_hash, status)
if status == ManagedEncryptedFileDownloader.STATUS_RUNNING: if status == ManagedEncryptedFileDownloader.STATUS_RUNNING:
# start returns self.finished_deferred # start returns self.finished_deferred
@ -115,12 +115,12 @@ class ManagedEncryptedFileDownloader(EncryptedFileSaver):
stream_metadata = yield self.wallet.get_claim_metadata_for_sd_hash(self.sd_hash) stream_metadata = yield self.wallet.get_claim_metadata_for_sd_hash(self.sd_hash)
if stream_metadata: if stream_metadata:
name, txid, nout = stream_metadata name, txid, nout = stream_metadata
self.uri = name self.name = name
self.txid = txid self.txid = txid
self.nout = nout self.nout = nout
else: else:
raise NoSuchSDHash(self.sd_hash) raise NoSuchSDHash(self.sd_hash)
self.claim_id = yield self.wallet.get_claimid(self.uri, self.txid, self.nout) self.claim_id = yield self.wallet.get_claimid(self.name, self.txid, self.nout)
defer.returnValue(None) defer.returnValue(None)
@defer.inlineCallbacks @defer.inlineCallbacks
@ -128,7 +128,7 @@ class ManagedEncryptedFileDownloader(EncryptedFileSaver):
yield EncryptedFileSaver._start(self) yield EncryptedFileSaver._start(self)
yield self.load_file_attributes() yield self.load_file_attributes()
status = yield self._save_status() status = yield self._save_status()
log_status(self.uri, self.sd_hash, status) log_status(self.name, self.sd_hash, status)
defer.returnValue(status) defer.returnValue(status)
def _get_finished_deferred_callback_value(self): def _get_finished_deferred_callback_value(self):

View file

@ -190,13 +190,10 @@ class EncryptedFileManager(object):
file_name) file_name)
defer.returnValue(lbry_file) defer.returnValue(lbry_file)
def delete_lbry_file(self, lbry_file): @defer.inlineCallbacks
for l in self.lbry_files: def delete_lbry_file(self, lbry_file, delete_file=False):
if l == lbry_file: if lbry_file not in self.lbry_files:
lbry_file = l raise ValueError("Could not find that LBRY file")
break
else:
return defer.fail(Failure(ValueError("Could not find that LBRY file")))
def wait_for_finished(count=2): def wait_for_finished(count=2):
if count <= 0 or lbry_file.saving_status is False: if count <= 0 or lbry_file.saving_status is False:
@ -204,19 +201,36 @@ class EncryptedFileManager(object):
else: else:
return task.deferLater(reactor, 1, wait_for_finished, count=count - 1) return task.deferLater(reactor, 1, wait_for_finished, count=count - 1)
def ignore_stopped(err): full_path = os.path.join(lbry_file.download_directory, lbry_file.file_name)
err.trap(AlreadyStoppedError, CurrentlyStoppingError)
return wait_for_finished()
d = lbry_file.stop() try:
d.addErrback(ignore_stopped) yield lbry_file.stop()
except (AlreadyStoppedError, CurrentlyStoppingError):
yield wait_for_finished()
def remove_from_list(): self.lbry_files.remove(lbry_file)
self.lbry_files.remove(lbry_file)
d.addCallback(lambda _: remove_from_list()) yield self._delete_lbry_file_options(lbry_file.rowid)
d.addCallback(lambda _: self._delete_lbry_file_options(lbry_file.rowid))
return d yield lbry_file.delete_data()
# TODO: delete this
# get count for stream hash returns the count of the lbry files with the stream hash
# in the lbry_file_options table, which will soon be removed.
stream_count = yield self.get_count_for_stream_hash(lbry_file.stream_hash)
if stream_count == 0:
yield self.stream_info_manager.delete_stream(lbry_file.stream_hash)
else:
msg = ("Can't delete stream info for %s, count is %i\n"
"The call that resulted in this warning will\n"
"be removed in the database refactor")
log.warning(msg, lbry_file.stream_hash, stream_count)
if delete_file and os.path.isfile(full_path):
os.remove(full_path)
defer.returnValue(True)
def toggle_lbry_file_running(self, lbry_file): def toggle_lbry_file_running(self, lbry_file):
"""Toggle whether a stream reader is currently running""" """Toggle whether a stream reader is currently running"""
@ -238,7 +252,8 @@ class EncryptedFileManager(object):
def stop(self): def stop(self):
safe_stop_looping_call(self.lbry_file_reflector) safe_stop_looping_call(self.lbry_file_reflector)
yield defer.DeferredList(list(self._stop_lbry_files())) yield defer.DeferredList(list(self._stop_lbry_files()))
yield self.sql_db.close() if self.sql_db:
yield self.sql_db.close()
self.sql_db = None self.sql_db = None
log.info("Stopped %s", self) log.info("Stopped %s", self)
defer.returnValue(True) defer.returnValue(True)

View file

@ -42,7 +42,7 @@ from lbrynet.core import log_support, utils, file_utils
from lbrynet.core import system_info from lbrynet.core import system_info
from lbrynet.core.StreamDescriptor import StreamDescriptorIdentifier, download_sd_blob from lbrynet.core.StreamDescriptor import StreamDescriptorIdentifier, download_sd_blob
from lbrynet.core.Session import Session from lbrynet.core.Session import Session
from lbrynet.core.Wallet import LBRYumWallet, SqliteStorage from lbrynet.core.Wallet import LBRYumWallet, SqliteStorage, ClaimOutpoint
from lbrynet.core.looping_call_manager import LoopingCallManager from lbrynet.core.looping_call_manager import LoopingCallManager
from lbrynet.core.server.BlobRequestHandler import BlobRequestHandlerFactory from lbrynet.core.server.BlobRequestHandler import BlobRequestHandlerFactory
from lbrynet.core.server.ServerProtocol import ServerProtocolFactory from lbrynet.core.server.ServerProtocol import ServerProtocolFactory
@ -92,6 +92,19 @@ PENDING_ID = "not set"
SHORT_ID_LEN = 20 SHORT_ID_LEN = 20
class IterableContainer(object):
def __iter__(self):
for attr in dir(self):
if not attr.startswith("_"):
yield getattr(self, attr)
def __contains__(self, item):
for attr in self:
if item == attr:
return True
return False
class Checker: class Checker:
"""The looping calls the daemon runs""" """The looping calls the daemon runs"""
INTERNET_CONNECTION = 'internet_connection_checker' INTERNET_CONNECTION = 'internet_connection_checker'
@ -100,12 +113,18 @@ class Checker:
PENDING_CLAIM = 'pending_claim_checker' PENDING_CLAIM = 'pending_claim_checker'
class FileID: class _FileID(IterableContainer):
"""The different ways a file can be identified""" """The different ways a file can be identified"""
NAME = 'name' NAME = 'name'
SD_HASH = 'sd_hash' SD_HASH = 'sd_hash'
FILE_NAME = 'file_name' FILE_NAME = 'file_name'
STREAM_HASH = 'stream_hash' STREAM_HASH = 'stream_hash'
CLAIM_ID = "claim_id"
OUTPOINT = "outpoint"
ROWID = "rowid"
FileID = _FileID()
# TODO add login credentials in a conf file # TODO add login credentials in a conf file
@ -420,7 +439,7 @@ class Daemon(AuthJSONRPCServer):
for name in self.pending_claims: for name in self.pending_claims:
log.info("Checking if new claim for lbry://%s is confirmed" % name) log.info("Checking if new claim for lbry://%s is confirmed" % name)
d = self._resolve_name(name, force_refresh=True) d = self._resolve_name(name, force_refresh=True)
d.addCallback(lambda _: self._get_lbry_file_by_uri(name)) d.addCallback(lambda _: self._get_lbry_file(FileID.NAME, name))
d.addCallbacks( d.addCallbacks(
lambda lbry_file: _process_lbry_file(name, lbry_file), lambda lbry_file: _process_lbry_file(name, lbry_file),
lambda _: re_add_to_pending_claims(name) lambda _: re_add_to_pending_claims(name)
@ -888,25 +907,6 @@ class Daemon(AuthJSONRPCServer):
helper = _ResolveNameHelper(self, name, force_refresh) helper = _ResolveNameHelper(self, name, force_refresh)
return helper.get_deferred() return helper.get_deferred()
@defer.inlineCallbacks
def _delete_lbry_file(self, lbry_file, delete_file=True):
stream_hash = lbry_file.stream_hash
filename = os.path.join(self.download_directory, lbry_file.file_name)
yield self.lbry_file_manager.delete_lbry_file(lbry_file)
yield lbry_file.delete_data()
stream_count = yield self.lbry_file_manager.get_count_for_stream_hash(stream_hash)
if stream_count == 0:
yield self.stream_info_manager.delete_stream(stream_hash)
else:
log.warning("Can't delete stream info for %s, count is %i", stream_hash, stream_count)
if delete_file:
if os.path.isfile(filename):
os.remove(filename)
log.info("Deleted file %s", filename)
log.info("Deleted stream %s", stream_hash)
defer.returnValue(True)
def _get_or_download_sd_blob(self, blob, sd_hash): def _get_or_download_sd_blob(self, blob, sd_hash):
if blob: if blob:
return self.session.blob_manager.get_blob(blob[0]) return self.session.blob_manager.get_blob(blob[0])
@ -1008,77 +1008,89 @@ class Daemon(AuthJSONRPCServer):
return self.get_est_cost_using_known_size(name, size) return self.get_est_cost_using_known_size(name, size)
return self.get_est_cost_from_name(name) return self.get_est_cost_from_name(name)
def _find_lbry_file_by_uri(self, uri):
for lbry_file in self.lbry_file_manager.lbry_files:
if uri == lbry_file.uri:
return lbry_file
raise UnknownNameError(uri)
def _find_lbry_file_by_sd_hash(self, sd_hash):
for lbry_file in self.lbry_file_manager.lbry_files:
if lbry_file.sd_hash == sd_hash:
return lbry_file
raise NoSuchSDHash(sd_hash)
def _find_lbry_file_by_file_name(self, file_name):
for lbry_file in self.lbry_file_manager.lbry_files:
if lbry_file.file_name == file_name:
return lbry_file
raise Exception("File %s not found" % file_name)
def _find_lbry_file_by_stream_hash(self, stream_hash):
for lbry_file in self.lbry_file_manager.lbry_files:
if lbry_file.stream_hash == stream_hash:
return lbry_file
raise NoSuchStreamHash(stream_hash)
@defer.inlineCallbacks @defer.inlineCallbacks
def _get_lbry_file_by_uri(self, name): def _get_lbry_file_dict(self, lbry_file, full_status=False):
key = binascii.b2a_hex(lbry_file.key) if lbry_file.key else None
full_path = os.path.join(lbry_file.download_directory, lbry_file.file_name)
mime_type = mimetypes.guess_type(full_path)[0]
if os.path.isfile(full_path):
with open(full_path) as written_file:
written_file.seek(0, os.SEEK_END)
written_bytes = written_file.tell()
else:
written_bytes = False
if full_status:
size = yield lbry_file.get_total_bytes()
file_status = yield lbry_file.status()
message = STREAM_STAGES[2][1] % (file_status.name, file_status.num_completed,
file_status.num_known, file_status.running_status)
else:
size = None
message = None
claim = yield self.session.wallet.get_claim_info(lbry_file.name,
lbry_file.txid,
lbry_file.nout)
try: try:
stream_info = yield self._resolve_name(name) metadata = claim['value']
sd_hash = stream_info['sources']['lbry_sd_hash'] except:
lbry_file = yield self._get_lbry_file_by_sd_hash(sd_hash) metadata = None
except (UnknownNameError, NoSuchSDHash):
lbry_file = yield self._find_lbry_file_by_uri(name)
defer.returnValue(lbry_file)
@defer.inlineCallbacks
def _get_lbry_file_by_sd_hash(self, sd_hash):
lbry_file = yield self._find_lbry_file_by_sd_hash(sd_hash)
defer.returnValue(lbry_file)
@defer.inlineCallbacks
def _get_lbry_file_by_file_name(self, file_name):
lbry_file = yield self._get_lbry_file_by_file_name(file_name)
defer.returnValue(lbry_file)
@defer.inlineCallbacks
def _get_lbry_file_by_stream_hash(self, stream_hash):
lbry_file = yield self._find_lbry_file_by_stream_hash(stream_hash)
defer.returnValue(lbry_file)
@defer.inlineCallbacks
def _get_lbry_file(self, search_by, val, return_json=True):
helper = _GetFileHelper(self, search_by, val, return_json)
try: try:
lbry_file = yield helper.retrieve_file() outpoint = repr(ClaimOutpoint(lbry_file.txid, lbry_file.nout))
defer.returnValue(lbry_file) except TypeError:
except Exception as err: outpoint = None
# TODO: do something with the error, don't return None when a file isn't found
defer.returnValue(False)
def _get_lbry_files(self): defer.returnValue({
def safe_get(sd_hash): 'completed': lbry_file.completed,
d = self._get_lbry_file(FileID.SD_HASH, sd_hash) 'file_name': lbry_file.file_name,
d.addErrback(log.fail(), 'Failed to get file for hash: %s', sd_hash) 'download_directory': lbry_file.download_directory,
return d 'points_paid': lbry_file.points_paid,
'stopped': lbry_file.stopped,
'stream_hash': lbry_file.stream_hash,
'stream_name': lbry_file.stream_name,
'suggested_file_name': lbry_file.suggested_file_name,
'sd_hash': lbry_file.sd_hash,
'name': lbry_file.name,
'outpoint': outpoint,
'claim_id': lbry_file.claim_id,
'download_path': full_path,
'mime_type': mime_type,
'key': key,
'total_bytes': size,
'written_bytes': written_bytes,
'message': message,
'metadata': metadata
})
d = defer.DeferredList([ @defer.inlineCallbacks
safe_get(l.sd_hash) def _get_lbry_file(self, search_by, val, return_json=True, full_status=False):
for l in self.lbry_file_manager.lbry_files lbry_file = None
]) if search_by in FileID:
return d for l_f in self.lbry_file_manager.lbry_files:
if l_f.__dict__.get(search_by) == val:
lbry_file = l_f
break
else:
raise NoValidSearch('{} is not a valid search operation'.format(search_by))
if return_json and lbry_file:
lbry_file = yield self._get_lbry_file_dict(lbry_file, full_status=full_status)
defer.returnValue(lbry_file)
@defer.inlineCallbacks
def _get_lbry_files(self, as_json=False, full_status=False, **kwargs):
lbry_files = list(self.lbry_file_manager.lbry_files)
if kwargs:
for search_type, value in iter_lbry_file_search_values(kwargs):
lbry_files = [l_f for l_f in lbry_files if l_f.__dict__[search_type] == value]
if as_json:
file_dicts = []
for lbry_file in lbry_files:
lbry_file_dict = yield self._get_lbry_file_dict(lbry_file, full_status=full_status)
file_dicts.append(lbry_file_dict)
lbry_files = file_dicts
defer.returnValue(lbry_files)
# TODO: do this and get_blobs_for_sd_hash in the stream info manager
def get_blobs_for_stream_hash(self, stream_hash): def get_blobs_for_stream_hash(self, stream_hash):
def _iter_blobs(blob_hashes): def _iter_blobs(blob_hashes):
for blob_hash, blob_num, blob_iv, blob_length in blob_hashes: for blob_hash, blob_num, blob_iv, blob_length in blob_hashes:
@ -1112,7 +1124,6 @@ class Daemon(AuthJSONRPCServer):
Args: Args:
session_status: bool session_status: bool
blockchain_status: bool
Returns: Returns:
daemon status daemon status
""" """
@ -1152,7 +1163,6 @@ class Daemon(AuthJSONRPCServer):
'managed_blobs': len(blobs), 'managed_blobs': len(blobs),
'managed_streams': len(self.lbry_file_manager.lbry_files), 'managed_streams': len(self.lbry_file_manager.lbry_files),
} }
defer.returnValue(response) defer.returnValue(response)
def jsonrpc_get_best_blockhash(self): def jsonrpc_get_best_blockhash(self):
@ -1426,90 +1436,55 @@ class Daemon(AuthJSONRPCServer):
d.addCallback(lambda _: reactor.callLater(0.0, reactor.stop)) d.addCallback(lambda _: reactor.callLater(0.0, reactor.stop))
return self._render_response("Shutting down") return self._render_response("Shutting down")
def jsonrpc_get_lbry_files(self): @defer.inlineCallbacks
def jsonrpc_file_list(self, **kwargs):
""" """
DEPRECATED. Use `file_list` instead. List files limited by optional filters
"""
return self.jsonrpc_file_list()
def jsonrpc_file_list(self):
"""
List files
Args: Args:
None 'name' (optional): filter files by lbry name,
'sd_hash' (optional): filter files by sd hash,
'file_name' (optional): filter files by the name in the downloads folder,
'stream_hash' (optional): filter files by stream hash,
'claim_id' (optional): filter files by claim id,
'outpoint' (optional): filter files by claim outpoint,
'rowid' (optional): filter files by internal row id,
'full_status': (optional): bool, if true populate the 'message' and 'size' fields
Returns: Returns:
List of files, with the following keys: [
'completed': bool {
'file_name': string 'completed': bool,
'key': hex string 'file_name': str,
'points_paid': float 'download_directory': str,
'stopped': bool 'points_paid': float,
'stream_hash': base 58 string 'stopped': bool,
'stream_name': string 'stream_hash': str (hex),
'suggested_file_name': string 'stream_name': str,
'sd_hash': string 'suggested_file_name': str,
'sd_hash': str (hex),
'name': str,
'outpoint': str, (txid:nout)
'claim_id': str (hex),
'download_path': str,
'mime_type': str,
'key': str (hex),
'total_bytes': int, None if full_status is False
'written_bytes': int,
'message': str, None if full_status is False
'metadata': Metadata dict
}
]
""" """
d = self._get_lbry_files() result = yield self._get_lbry_files(as_json=True, **kwargs)
d.addCallback(lambda r: self._render_response([d[1] for d in r if d[0]])) response = yield self._render_response(result)
defer.returnValue(response)
return d
def jsonrpc_get_lbry_file(self, **kwargs):
"""
DEPRECATED. Use `file_get` instead.
"""
return self.jsonrpc_file_get(**kwargs)
def jsonrpc_file_get(self, **kwargs):
"""
Get a file, if no matching file exists returns False
Args:
'name': get file by lbry uri,
'sd_hash': get file by the hash in the name claim,
'file_name': get file by its name in the downloads folder,
'stream_hash': get file by its stream hash
Returns:
'completed': bool,
'file_name': str,
'download_directory': str,
'points_paid': float,
'stopped': bool,
'stream_hash': str (hex),
'stream_name': str,
'suggested_file_name': str,
'sd_hash': str (hex),
'lbry_uri': str,
'txid': str (b58),
'claim_id': str (b58),
'download_path': str,
'mime_type': str,
'key': str (hex),
'total_bytes': int,
'written_bytes': int,
'code': str,
'message': str
'metadata': Metadata dict if claim is valid, otherwise status str
}
"""
d = self._get_deferred_for_lbry_file(kwargs)
d.addCallback(lambda r: self._render_response(r))
return d
def _get_deferred_for_lbry_file(self, search_fields):
try:
searchtype, value = get_lbry_file_search_value(search_fields)
except NoValidSearch:
return defer.fail()
else:
return self._get_lbry_file(searchtype, value)
@defer.inlineCallbacks @defer.inlineCallbacks
def jsonrpc_resolve_name(self, name, force=False): def jsonrpc_resolve_name(self, name, force=False):
""" """
Resolve stream info from a LBRY uri Resolve stream info from a LBRY name
Args: Args:
'name': name to look up, string, do not include lbry:// prefix 'name': name to look up, string, do not include lbry:// prefix
@ -1538,7 +1513,7 @@ class Daemon(AuthJSONRPCServer):
def jsonrpc_claim_show(self, name, txid=None, nout=None): def jsonrpc_claim_show(self, name, txid=None, nout=None):
""" """
Resolve claim info from a LBRY uri Resolve claim info from a LBRY name
Args: Args:
'name': name to look up, string, do not include lbry:// prefix 'name': name to look up, string, do not include lbry:// prefix
@ -1567,7 +1542,7 @@ class Daemon(AuthJSONRPCServer):
self, name, file_name=None, stream_info=None, timeout=None, self, name, file_name=None, stream_info=None, timeout=None,
download_directory=None, wait_for_write=True): download_directory=None, wait_for_write=True):
""" """
Download stream from a LBRY uri. Download stream from a LBRY name.
Args: Args:
'name': name to download, string 'name': name to download, string
@ -1578,64 +1553,65 @@ class Daemon(AuthJSONRPCServer):
'wait_for_write': optional, defaults to True. When set, waits for the file to 'wait_for_write': optional, defaults to True. When set, waits for the file to
only start to be written before returning any results. only start to be written before returning any results.
Returns: Returns:
'stream_hash': hex string {
'path': path of download 'completed': bool,
'file_name': str,
'download_directory': str,
'points_paid': float,
'stopped': bool,
'stream_hash': str (hex),
'stream_name': str,
'suggested_file_name': str,
'sd_hash': str (hex),
'name': str,
'outpoint': str, (txid:nout)
'claim_id': str (hex),
'download_path': str,
'mime_type': str,
'key': str (hex),
'total_bytes': int
'written_bytes': int,
'message': str
'metadata': Metadata dict
}
""" """
timeout = timeout if timeout is not None else self.download_timeout timeout = timeout if timeout is not None else self.download_timeout
download_directory = download_directory or self.download_directory download_directory = download_directory or self.download_directory
sd_hash = get_sd_hash(stream_info)
if name in self.waiting_on: if name in self.waiting_on:
# TODO: return a useful error message here, like "already log.info("Already waiting on lbry://%s to start downloading", name)
# waiting for name to be resolved" yield self.streams[name].data_downloading_deferred
defer.returnValue(server.failure)
# first check if we already have this
lbry_file = yield self._get_lbry_file(FileID.NAME, name, return_json=False) lbry_file = yield self._get_lbry_file(FileID.NAME, name, return_json=False)
if lbry_file:
log.info('Already have a file for %s', name)
message = {
'stream_hash': sd_hash if stream_info else lbry_file.sd_hash,
'path': os.path.join(lbry_file.download_directory, lbry_file.file_name)
}
response = yield self._render_response(message)
defer.returnValue(response)
download_id = utils.random_string() if lbry_file:
self.analytics_manager.send_download_started(download_id, name, stream_info) if not os.path.isfile(os.path.join(lbry_file.download_directory, lbry_file.file_name)):
tries = 1 log.info("Already have lbry file but missing file in %s, rebuilding it",
max_tries = 3 lbry_file.download_directory)
while tries <= max_tries: yield lbry_file.start()
else:
log.info('Already have a file for %s', name)
result = yield self._get_lbry_file_dict(lbry_file, full_status=True)
else:
download_id = utils.random_string()
self.analytics_manager.send_download_started(download_id, name, stream_info)
try: try:
log.info('Making try %s / %s to start download of %s', tries, max_tries, name) yield self._download_name(name=name, timeout=timeout,
new_sd_hash, file_path = yield self._download_name( download_directory=download_directory,
name=name, stream_info=stream_info, file_name=file_name,
timeout=timeout, wait_for_write=wait_for_write)
download_directory=download_directory, stream = self.streams[name]
stream_info=stream_info, stream.finished_deferred.addCallback(
file_name=file_name, lambda _: self.analytics_manager.send_download_finished(
wait_for_write=wait_for_write download_id, name, stream_info)
) )
break result = yield self._get_lbry_file_dict(self.streams[name].downloader,
full_status=True)
except Exception as e: except Exception as e:
log.warning('Failed to get %s', name) log.warning('Failed to get %s', name)
if tries == max_tries: self.analytics_manager.send_download_errored(download_id, name, stream_info)
self.analytics_manager.send_download_errored(download_id, name, stream_info) result = e.message
response = yield self._render_response(e.message) response = yield self._render_response(result)
defer.returnValue(response)
tries += 1
# TODO: should stream_hash key be changed to sd_hash?
message = {
'stream_hash': sd_hash if stream_info else new_sd_hash,
'path': file_path
}
stream = self.streams.get(name)
if stream:
stream.finished_deferred.addCallback(
lambda _: self.analytics_manager.send_download_finished(
download_id, name, stream_info)
)
response = yield self._render_response(message)
defer.returnValue(response) defer.returnValue(response)
@AuthJSONRPCServer.auth_required @AuthJSONRPCServer.auth_required
@ -1660,7 +1636,7 @@ class Daemon(AuthJSONRPCServer):
Args: Args:
'status': "start" or "stop" 'status': "start" or "stop"
'name': start file by lbry uri, 'name': start file by lbry name,
'sd_hash': start file by the hash in the name claim, 'sd_hash': start file by the hash in the name claim,
'file_name': start file by its name in the downloads folder, 'file_name': start file by its name in the downloads folder,
Returns: Returns:
@ -1686,41 +1662,45 @@ class Daemon(AuthJSONRPCServer):
defer.returnValue(response) defer.returnValue(response)
@AuthJSONRPCServer.auth_required @AuthJSONRPCServer.auth_required
def jsonrpc_delete_lbry_file(self, **kwargs): @defer.inlineCallbacks
"""
DEPRECATED. Use `file_delete` instead
"""
return self.jsonrpc_file_delete(**kwargs)
@AuthJSONRPCServer.auth_required
def jsonrpc_file_delete(self, delete_target_file=True, **kwargs): def jsonrpc_file_delete(self, delete_target_file=True, **kwargs):
""" """
Delete a lbry file Delete a lbry file
Args: Args:
'file_name': downloaded file name, string 'name' (optional): delete files by lbry name,
'sd_hash' (optional): delete files by sd hash,
'file_name' (optional): delete files by the name in the downloads folder,
'stream_hash' (optional): delete files by stream hash,
'claim_id' (optional): delete files by claim id,
'outpoint' (optional): delete files by claim outpoint,
'rowid': (optional): delete file by rowid in the file manager
'delete_target_file' (optional): delete file from downloads folder, defaults to True
if False only the blobs and db entries will be deleted
Returns: Returns:
confirmation message True if deletion was successful, otherwise False
""" """
def _delete_file(f): searchtype, value = get_lbry_file_search_value(kwargs)
if not f: lbry_files = yield self._get_lbry_files(searchtype, value, return_json=False)
return False if len(lbry_files) > 1:
file_name = f.file_name log.warning("There are %i files to delete, use narrower filters to select one",
d = self._delete_lbry_file(f, delete_file=delete_target_file) len(lbry_files))
d.addCallback(lambda _: "Deleted file: " + file_name) result = False
return d elif not lbry_files:
log.warning("There is no file to delete for '%s'", value)
try: result = False
searchtype, value = get_lbry_file_search_value(kwargs)
except NoValidSearch:
d = defer.fail()
else: else:
d = self._get_lbry_file(searchtype, value, return_json=False) lbry_file = lbry_files[0]
d.addCallback(_delete_file) file_name, stream_hash = lbry_file.file_name, lbry_file.stream_hash
if lbry_file.claim_id in self.streams:
d.addCallback(lambda r: self._render_response(r)) del self.streams[lbry_file.claim_id]
return d yield self.lbry_file_manager.delete_lbry_file(lbry_file,
delete_file=delete_target_file)
log.info("Deleted %s (%s)", file_name, utils.short_hash(stream_hash))
result = True
response = yield self._render_response(result)
defer.returnValue(response)
def jsonrpc_get_est_cost(self, **kwargs): def jsonrpc_get_est_cost(self, **kwargs):
""" """
@ -1734,7 +1714,7 @@ class Daemon(AuthJSONRPCServer):
Get estimated cost for a lbry stream Get estimated cost for a lbry stream
Args: Args:
'name': lbry uri 'name': lbry name
'size': stream size, in bytes. if provided an sd blob won't be downloaded. 'size': stream size, in bytes. if provided an sd blob won't be downloaded.
Returns: Returns:
estimated cost estimated cost
@ -2368,7 +2348,7 @@ class Daemon(AuthJSONRPCServer):
if uri: if uri:
metadata = yield self._resolve_name(uri) metadata = yield self._resolve_name(uri)
sd_hash = get_sd_hash(metadata) sd_hash = utils.get_sd_hash(metadata)
blobs = yield self.get_blobs_for_sd_hash(sd_hash) blobs = yield self.get_blobs_for_sd_hash(sd_hash)
elif stream_hash: elif stream_hash:
try: try:
@ -2437,7 +2417,7 @@ class Daemon(AuthJSONRPCServer):
Get stream availability for a winning claim Get stream availability for a winning claim
Arg: Arg:
name (str): lbry uri name (str): lbry name
sd_timeout (int, optional): sd blob download timeout sd_timeout (int, optional): sd blob download timeout
peer_timeout (int, optional): how long to look for peers peer_timeout (int, optional): how long to look for peers
@ -2462,7 +2442,7 @@ class Daemon(AuthJSONRPCServer):
return decoded_sd_blob return decoded_sd_blob
metadata = yield self._resolve_name(name) metadata = yield self._resolve_name(name)
sd_hash = get_sd_hash(metadata) sd_hash = utils.get_sd_hash(metadata)
sd_timeout = sd_timeout or conf.settings['sd_download_timeout'] sd_timeout = sd_timeout or conf.settings['sd_download_timeout']
peer_timeout = peer_timeout or conf.settings['peer_search_timeout'] peer_timeout = peer_timeout or conf.settings['peer_search_timeout']
blobs = [] blobs = []
@ -2574,6 +2554,7 @@ class _DownloadNameHelper(object):
try: try:
download_path = yield self.daemon.add_stream( download_path = yield self.daemon.add_stream(
self.name, self.timeout, self.download_directory, self.file_name, stream_info) self.name, self.timeout, self.download_directory, self.file_name, stream_info)
self.remove_from_wait(None)
except (InsufficientFundsError, Exception) as err: except (InsufficientFundsError, Exception) as err:
if Failure(err).check(InsufficientFundsError): if Failure(err).check(InsufficientFundsError):
log.warning("Insufficient funds to download lbry://%s", self.name) log.warning("Insufficient funds to download lbry://%s", self.name)
@ -2582,7 +2563,8 @@ class _DownloadNameHelper(object):
log.warning("lbry://%s timed out, removing from streams", self.name) log.warning("lbry://%s timed out, removing from streams", self.name)
self.remove_from_wait("Timed out") self.remove_from_wait("Timed out")
if self.daemon.streams[self.name].downloader is not None: if self.daemon.streams[self.name].downloader is not None:
yield self.daemon._delete_lbry_file(self.daemon.streams[self.name].downloader) yield self.daemon.lbry_file_manager.delete_lbry_file(
self.daemon.streams[self.name].downloader)
del self.daemon.streams[self.name] del self.daemon.streams[self.name]
raise err raise err
@ -2679,126 +2661,6 @@ class _ResolveNameHelper(object):
return time_in_cache >= self.daemon.cache_time return time_in_cache >= self.daemon.cache_time
class _GetFileHelper(object):
def __init__(self, daemon, search_by, val, return_json=True):
self.daemon = daemon
self.search_by = search_by
self.val = val
self.return_json = return_json
def retrieve_file(self):
d = self.search_for_file()
if self.return_json:
d.addCallback(self._get_json)
return d
def search_for_file(self):
if self.search_by == FileID.NAME:
return self.daemon._get_lbry_file_by_uri(self.val)
elif self.search_by == FileID.SD_HASH:
return self.daemon._get_lbry_file_by_sd_hash(self.val)
elif self.search_by == FileID.FILE_NAME:
return self.daemon._get_lbry_file_by_file_name(self.val)
elif self.search_by == FileID.STREAM_HASH:
return self.daemon._get_lbry_file_by_stream_hash(self.val)
raise Exception('{} is not a valid search operation'.format(self.search_by))
def _get_json(self, lbry_file):
if lbry_file:
d = lbry_file.get_total_bytes()
d.addCallback(self._generate_reply, lbry_file)
d.addCallback(self._add_metadata, lbry_file)
return d
else:
return False
def _generate_reply(self, size, lbry_file):
written_bytes = self._get_written_bytes(lbry_file)
code, message = self._get_status(lbry_file)
if code == DOWNLOAD_RUNNING_CODE:
d = lbry_file.status()
d.addCallback(self._get_msg_for_file_status)
d.addCallback(
lambda msg: self._get_properties_dict(lbry_file, code, msg, written_bytes, size))
else:
d = defer.succeed(
self._get_properties_dict(lbry_file, code, message, written_bytes, size))
return d
def _get_msg_for_file_status(self, file_status):
message = STREAM_STAGES[2][1] % (
file_status.name, file_status.num_completed, file_status.num_known,
file_status.running_status)
return defer.succeed(message)
def _get_key(self, lbry_file):
return binascii.b2a_hex(lbry_file.key) if lbry_file.key else None
def _full_path(self, lbry_file):
return os.path.join(lbry_file.download_directory, lbry_file.file_name)
def _get_status(self, lbry_file):
if self.search_by == FileID.NAME:
if self.val in self.daemon.streams.keys():
status = self.daemon.streams[self.val].code
elif lbry_file in self.daemon.lbry_file_manager.lbry_files:
status = STREAM_STAGES[2]
else:
status = [False, False]
else:
status = [False, False]
return status
def _get_written_bytes(self, lbry_file):
full_path = self._full_path(lbry_file)
if os.path.isfile(full_path):
with open(full_path) as written_file:
written_file.seek(0, os.SEEK_END)
written_bytes = written_file.tell()
else:
written_bytes = False
return written_bytes
def _get_properties_dict(self, lbry_file, code, message, written_bytes, size):
key = self._get_key(lbry_file)
full_path = self._full_path(lbry_file)
mime_type = mimetypes.guess_type(full_path)[0]
return {
'completed': lbry_file.completed,
'file_name': lbry_file.file_name,
'download_directory': lbry_file.download_directory,
'points_paid': lbry_file.points_paid,
'stopped': lbry_file.stopped,
'stream_hash': lbry_file.stream_hash,
'stream_name': lbry_file.stream_name,
'suggested_file_name': lbry_file.suggested_file_name,
'sd_hash': lbry_file.sd_hash,
'lbry_uri': lbry_file.uri,
'txid': lbry_file.txid,
'claim_id': lbry_file.claim_id,
'download_path': full_path,
'mime_type': mime_type,
'key': key,
'total_bytes': size,
'written_bytes': written_bytes,
'code': code,
'message': message
}
def _add_metadata(self, message, lbry_file):
def _add_to_dict(metadata):
message['metadata'] = metadata
return defer.succeed(message)
if lbry_file.txid:
d = self.daemon._resolve_name(lbry_file.uri)
d.addCallbacks(_add_to_dict, lambda _: _add_to_dict("Pending confirmation"))
else:
d = defer.succeed(message)
return d
def loggly_time_string(dt): def loggly_time_string(dt):
formatted_dt = dt.strftime("%Y-%m-%dT%H:%M:%S") formatted_dt = dt.strftime("%Y-%m-%dT%H:%M:%S")
milliseconds = str(round(dt.microsecond * (10.0 ** -5), 3)) milliseconds = str(round(dt.microsecond * (10.0 ** -5), 3))
@ -2834,13 +2696,20 @@ def report_bug_to_slack(message, installation_id, platform_name, app_version):
def get_lbry_file_search_value(search_fields): def get_lbry_file_search_value(search_fields):
for searchtype in (FileID.SD_HASH, FileID.NAME, FileID.FILE_NAME, FileID.STREAM_HASH): for searchtype in FileID:
value = search_fields.get(searchtype) value = search_fields.get(searchtype, None)
if value: if value is not None:
return searchtype, value return searchtype, value
raise NoValidSearch('{} is missing a valid search type'.format(search_fields)) raise NoValidSearch('{} is missing a valid search type'.format(search_fields))
def iter_lbry_file_search_values(search_fields):
for searchtype in FileID:
value = search_fields.get(searchtype, None)
if value is not None:
yield searchtype, value
def get_blob_payment_rate_manager(session, payment_rate_manager=None): def get_blob_payment_rate_manager(session, payment_rate_manager=None):
if payment_rate_manager: if payment_rate_manager:
rate_managers = { rate_managers = {

View file

@ -39,8 +39,8 @@ class EncryptedFileReflectorClient(Protocol):
lambda err: log.warning("An error occurred immediately: %s", err.getTraceback())) lambda err: log.warning("An error occurred immediately: %s", err.getTraceback()))
@property @property
def lbry_uri(self): def name(self):
return "lbry://%s" % self.factory.lbry_uri return "lbry://%s" % self.factory.name
@property @property
def blob_manager(self): def blob_manager(self):
@ -74,20 +74,20 @@ class EncryptedFileReflectorClient(Protocol):
if reason.check(error.ConnectionDone): if reason.check(error.ConnectionDone):
if not self.needed_blobs: if not self.needed_blobs:
log.info("Reflector has all blobs for %s (%s)", log.info("Reflector has all blobs for %s (%s)",
self.lbry_uri, self.stream_descriptor) self.name, self.stream_descriptor)
elif not self.reflected_blobs: elif not self.reflected_blobs:
log.info("No more completed blobs for %s (%s) to reflect, %i are still needed", log.info("No more completed blobs for %s (%s) to reflect, %i are still needed",
self.lbry_uri, self.stream_descriptor, len(self.needed_blobs)) self.name, self.stream_descriptor, len(self.needed_blobs))
else: else:
log.info('Finished sending reflector %i blobs for %s (%s)', log.info('Finished sending reflector %i blobs for %s (%s)',
len(self.reflected_blobs), self.lbry_uri, self.stream_descriptor) len(self.reflected_blobs), self.name, self.stream_descriptor)
self.factory.finished_deferred.callback(self.reflected_blobs) self.factory.finished_deferred.callback(self.reflected_blobs)
elif reason.check(error.ConnectionLost): elif reason.check(error.ConnectionLost):
log.warning("Stopped reflecting %s (%s) after sending %i blobs", self.lbry_uri, log.warning("Stopped reflecting %s (%s) after sending %i blobs", self.name,
self.stream_descriptor, len(self.reflected_blobs)) self.stream_descriptor, len(self.reflected_blobs))
self.factory.finished_deferred.callback(self.reflected_blobs) self.factory.finished_deferred.callback(self.reflected_blobs)
else: else:
log.info('Reflector finished for %s (%s): %s', self.lbry_uri, self.stream_descriptor, log.info('Reflector finished for %s (%s): %s', self.name, self.stream_descriptor,
reason) reason)
self.factory.finished_deferred.callback(reason) self.factory.finished_deferred.callback(reason)
@ -131,7 +131,7 @@ class EncryptedFileReflectorClient(Protocol):
log.info("Reflector needs %s%i blobs for %s", log.info("Reflector needs %s%i blobs for %s",
needs_desc, needs_desc,
len(filtered), len(filtered),
self.lbry_uri) self.name)
return filtered return filtered
d = self.factory.stream_info_manager.get_blobs_for_stream(self.factory.stream_hash) d = self.factory.stream_info_manager.get_blobs_for_stream(self.factory.stream_hash)
@ -223,7 +223,7 @@ class EncryptedFileReflectorClient(Protocol):
log.info("Sent reflector descriptor %s", self.next_blob_to_send) log.info("Sent reflector descriptor %s", self.next_blob_to_send)
else: else:
log.warning("Reflector failed to receive descriptor %s for %s", log.warning("Reflector failed to receive descriptor %s for %s",
self.next_blob_to_send, self.lbry_uri) self.next_blob_to_send, self.name)
self.blob_hashes_to_send.append(self.next_blob_to_send.blob_hash) self.blob_hashes_to_send.append(self.next_blob_to_send.blob_hash)
return self.set_not_uploading() return self.set_not_uploading()
@ -236,7 +236,7 @@ class EncryptedFileReflectorClient(Protocol):
return defer.succeed(True) return defer.succeed(True)
else: else:
log.warning("Reflector already has %s for %s", self.next_blob_to_send, log.warning("Reflector already has %s for %s", self.next_blob_to_send,
self.lbry_uri) self.name)
return self.set_not_uploading() return self.set_not_uploading()
else: # Expecting Server Blob Response else: # Expecting Server Blob Response
if 'received_blob' not in response_dict: if 'received_blob' not in response_dict:
@ -245,10 +245,10 @@ class EncryptedFileReflectorClient(Protocol):
if response_dict['received_blob']: if response_dict['received_blob']:
self.reflected_blobs.append(self.next_blob_to_send.blob_hash) self.reflected_blobs.append(self.next_blob_to_send.blob_hash)
log.debug("Sent reflector blob %s for %s", self.next_blob_to_send, log.debug("Sent reflector blob %s for %s", self.next_blob_to_send,
self.lbry_uri) self.name)
else: else:
log.warning("Reflector failed to receive blob %s for %s", log.warning("Reflector failed to receive blob %s for %s",
self.next_blob_to_send, self.lbry_uri) self.next_blob_to_send, self.name)
self.blob_hashes_to_send.append(self.next_blob_to_send.blob_hash) self.blob_hashes_to_send.append(self.next_blob_to_send.blob_hash)
return self.set_not_uploading() return self.set_not_uploading()
@ -284,12 +284,12 @@ class EncryptedFileReflectorClient(Protocol):
err.trap(ValueError) err.trap(ValueError)
if blob_hash not in self.failed_blob_hashes: if blob_hash not in self.failed_blob_hashes:
log.warning("Failed to reflect blob %s for %s, reason: %s", log.warning("Failed to reflect blob %s for %s, reason: %s",
str(blob_hash)[:16], self.lbry_uri, err.getTraceback()) str(blob_hash)[:16], self.name, err.getTraceback())
self.blob_hashes_to_send.append(blob_hash) self.blob_hashes_to_send.append(blob_hash)
self.failed_blob_hashes.append(blob_hash) self.failed_blob_hashes.append(blob_hash)
else: else:
log.warning("Failed second try reflecting blob %s for %s, giving up, reason: %s", log.warning("Failed second try reflecting blob %s for %s, giving up, reason: %s",
str(blob_hash)[:16], self.lbry_uri, err.getTraceback()) str(blob_hash)[:16], self.name, err.getTraceback())
def send_next_request(self): def send_next_request(self):
if self.file_sender is not None: if self.file_sender is not None:
@ -335,8 +335,8 @@ class EncryptedFileReflectorClientFactory(ClientFactory):
return self._lbry_file.stream_hash return self._lbry_file.stream_hash
@property @property
def lbry_uri(self): def name(self):
return self._lbry_file.uri return self._lbry_file.name
@property @property
def protocol_version(self): def protocol_version(self):

View file

@ -61,7 +61,7 @@ DummyBlobAvailabilityTracker = mocks.BlobAvailabilityTracker
log_format = "%(funcName)s(): %(message)s" log_format = "%(funcName)s(): %(message)s"
logging.basicConfig(level=logging.WARNING, format=log_format) logging.basicConfig(level=logging.CRITICAL, format=log_format)
def require_system(system): def require_system(system):

View file

@ -16,7 +16,7 @@ class FakeLBRYFile(object):
self.blob_manager = blob_manager self.blob_manager = blob_manager
self.stream_info_manager = stream_info_manager self.stream_info_manager = stream_info_manager
self.stream_hash = stream_hash self.stream_hash = stream_hash
self.uri = "fake_uri" self.name = "fake_uri"
class Node(object): class Node(object):