update download, handle fee conversion error, use uri instead of name to get()

This commit is contained in:
Jack Robison 2017-04-06 20:40:55 -04:00
parent 1880f64da2
commit bcf7a28fc8
2 changed files with 74 additions and 170 deletions

View file

@ -16,6 +16,11 @@ from twisted.internet import defer, threads, error, reactor, task
from twisted.internet.task import LoopingCall from twisted.internet.task import LoopingCall
from twisted.python.failure import Failure from twisted.python.failure import Failure
from lbryschema.decode import smart_decode
from lbryschema.claim import ClaimDict
from lbryschema.uri import parse_lbry_uri
from lbryschema.error import DecodeError
# TODO: importing this when internet is disabled raises a socket.gaierror # TODO: importing this when internet is disabled raises a socket.gaierror
from lbryum.version import LBRYUM_VERSION from lbryum.version import LBRYUM_VERSION
from lbrynet import __version__ as LBRYNET_VERSION from lbrynet import __version__ as LBRYNET_VERSION
@ -695,20 +700,41 @@ class Daemon(AuthJSONRPCServer):
return finished_d return finished_d
@defer.inlineCallbacks @defer.inlineCallbacks
def _download_name(self, name, stream_info, timeout=None, download_directory=None, def _download_name(self, name, stream_info, claim_id, timeout=None, download_directory=None,
file_name=None, wait_for_write=True): file_name=None):
""" """
Add a lbry file to the file manager, start the download, and return the new lbry file. Add a lbry file to the file manager, start the download, and return the new lbry file.
If it already exists in the file manager, return the existing lbry file If it already exists in the file manager, return the existing lbry file
""" """
timeout = timeout if timeout is not None else conf.settings['download_timeout'] if claim_id in self.streams:
downloader = self.streams[claim_id]
result = yield downloader.finished_deferred
defer.returnValue(result)
else:
download_id = utils.random_string()
self.analytics_manager.send_download_started(download_id, name, stream_info)
helper = _DownloadNameHelper(self, name, timeout, download_directory, file_name, self.streams[claim_id] = GetStream(self.sd_identifier, self.session,
wait_for_write) self.session.wallet, self.lbry_file_manager,
lbry_file = yield helper.setup_stream(stream_info) self.exchange_rate_manager, self.max_key_fee,
sd_hash, file_path = yield helper.wait_or_get_stream(stream_info, lbry_file) conf.settings['data_rate'], timeout,
defer.returnValue((sd_hash, file_path)) download_directory, file_name)
try:
download = self.streams[claim_id].start(stream_info, name)
self.streams[claim_id].finished_deferred.addCallback(
lambda _: self.analytics_manager.send_download_finished(download_id,
name,
stream_info))
lbry_file = yield download
result = yield self._get_lbry_file_dict(lbry_file, full_status=True)
del self.streams[claim_id]
except Exception as err:
log.warning('Failed to get %s: %s', name, err)
self.analytics_manager.send_download_errored(download_id, name, stream_info)
del self.streams[claim_id]
result = {'error': err.message}
defer.returnValue(result)
@defer.inlineCallbacks @defer.inlineCallbacks
def _publish_stream(self, name, bid, claim_dict, file_path=None): def _publish_stream(self, name, bid, claim_dict, file_path=None):
@ -1386,19 +1412,17 @@ class Daemon(AuthJSONRPCServer):
@AuthJSONRPCServer.auth_required @AuthJSONRPCServer.auth_required
@defer.inlineCallbacks @defer.inlineCallbacks
def jsonrpc_get(self, name, claim_id=None, file_name=None, timeout=None, @AuthJSONRPCServer.auth_required
download_directory=None, wait_for_write=True): @defer.inlineCallbacks
def jsonrpc_get(self, uri, file_name=None, timeout=None, download_directory=None):
""" """
Download stream from a LBRY name. Download stream from a LBRY name.
Args: Args:
'name': (str) name to download 'uri': (str) lbry uri to download
'claim_id' (optional): (str) claim id for claim to download
'file_name'(optional): (str) a user specified name for the downloaded file 'file_name'(optional): (str) a user specified name for the downloaded file
'timeout'(optional): (int) download timeout in number of seconds 'timeout'(optional): (int) download timeout in number of seconds
'download_directory'(optional): (str) path to directory where file will be saved 'download_directory'(optional): (str) path to directory where file will be saved
'wait_for_write'(optional): (bool) defaults to True. When set, waits for the file to
only start to be written before returning any results.
Returns: Returns:
(dict) Dictionary contaning information about the stream (dict) Dictionary contaning information about the stream
@ -1426,34 +1450,26 @@ class Daemon(AuthJSONRPCServer):
""" """
def _get_claim(_claim_id, _claims):
#TODO: do this in Wallet class
for claim in _claims['claims']:
if claim['claim_id'] == _claim_id:
return smart_decode(claim['value']).claim_dict
log.info("Received request to get %s", name)
timeout = timeout if timeout is not None else self.download_timeout timeout = timeout if timeout is not None else self.download_timeout
download_directory = download_directory or self.download_directory download_directory = download_directory or self.download_directory
if name in self.streams:
resolved = yield self.session.wallet.resolve_uri(uri)
if 'value' not in resolved:
if 'claim' not in resolved:
raise Exception("Nothing to download")
else:
resolved = resolved['claim']
name = resolved['name']
claim_id = resolved['claim_id']
stream_info = resolved['value']
if claim_id in self.streams:
log.info("Already waiting on lbry://%s to start downloading", name) log.info("Already waiting on lbry://%s to start downloading", name)
yield self.streams[name].data_downloading_deferred yield self.streams[claim_id].data_downloading_deferred
stream_info = None
lbry_file = None
if claim_id:
lbry_file = yield self._get_lbry_file(FileID.CLAIM_ID, claim_id, return_json=False) lbry_file = yield self._get_lbry_file(FileID.CLAIM_ID, claim_id, return_json=False)
claims = yield self.session.wallet.get_claims_for_name(name)
formatted_claims = format_json_out_amount_as_float(claims)
stream_info = _get_claim(claim_id, formatted_claims)
if not stream_info:
log.error("No claim %s for lbry://%s, using winning claim", claim_id, name)
if not stream_info:
lbry_file = yield self._get_lbry_file(FileID.NAME, name, return_json=False)
stream_info = yield self._resolve_name(name)
if lbry_file: if lbry_file:
if not os.path.isfile(os.path.join(lbry_file.download_directory, lbry_file.file_name)): if not os.path.isfile(os.path.join(lbry_file.download_directory, lbry_file.file_name)):
@ -1464,26 +1480,9 @@ class Daemon(AuthJSONRPCServer):
log.info('Already have a file for %s', name) log.info('Already have a file for %s', name)
result = yield self._get_lbry_file_dict(lbry_file, full_status=True) result = yield self._get_lbry_file_dict(lbry_file, full_status=True)
else: else:
download_id = utils.random_string() result = yield self._download_name(name, stream_info, claim_id, timeout=timeout,
self.analytics_manager.send_download_started(download_id, name, stream_info)
try:
yield self._download_name(name=name, stream_info=stream_info, timeout=timeout,
download_directory=download_directory, download_directory=download_directory,
file_name=file_name, wait_for_write=wait_for_write) file_name=file_name)
stream = self.streams[name]
stream.finished_deferred.addCallback(
lambda _: self.analytics_manager.send_download_finished(
download_id, name, stream_info)
)
result = yield self._get_lbry_file_dict(self.streams[name].downloader,
full_status=True)
except Exception as e:
# TODO: should reraise here, instead of returning e.message
log.warning('Failed to get %s', name)
self.analytics_manager.send_download_errored(download_id, name, stream_info)
result = e.message
response = yield self._render_response(result) response = yield self._render_response(result)
defer.returnValue(response) defer.returnValue(response)
@ -2401,114 +2400,6 @@ class Daemon(AuthJSONRPCServer):
return d return d
class _DownloadNameHelper(object):
def __init__(self, daemon, name, timeout=None, download_directory=None, file_name=None,
wait_for_write=True):
self.daemon = daemon
self.name = name
self.timeout = timeout if timeout is not None else conf.settings['download_timeout']
if not download_directory or not os.path.isdir(download_directory):
self.download_directory = daemon.download_directory
else:
self.download_directory = download_directory
self.file_name = file_name
self.wait_for_write = wait_for_write
@defer.inlineCallbacks
def setup_stream(self, stream_info):
sd_hash = utils.get_sd_hash(stream_info)
lbry_file = yield self.daemon._get_lbry_file(FileID.SD_HASH, sd_hash, return_json=False)
if self._does_lbry_file_exists(lbry_file):
defer.returnValue(lbry_file)
else:
defer.returnValue(None)
def _does_lbry_file_exists(self, lbry_file):
return lbry_file and os.path.isfile(self._full_path(lbry_file))
def _full_path(self, lbry_file):
return os.path.join(self.download_directory, lbry_file.file_name)
@defer.inlineCallbacks
def wait_or_get_stream(self, stream_info, lbry_file):
if lbry_file:
log.debug('Wait on lbry_file')
# returns the lbry_file
yield self._wait_on_lbry_file(lbry_file)
defer.returnValue((lbry_file.sd_hash, self._full_path(lbry_file)))
else:
log.debug('No lbry_file, need to get stream')
# returns an instance of ManagedEncryptedFileDownloaderFactory
sd_hash, file_path = yield self._get_stream(stream_info)
defer.returnValue((sd_hash, file_path))
def _wait_on_lbry_file(self, f):
file_path = self._full_path(f)
written_bytes = self._get_written_bytes(file_path)
if written_bytes:
log.info("File has bytes: %s --> %s", f.sd_hash, file_path)
return defer.succeed(True)
return task.deferLater(reactor, 1, self._wait_on_lbry_file, f)
@defer.inlineCallbacks
def _get_stream(self, stream_info):
try:
download_path = yield self.daemon.add_stream(
self.name, self.timeout, self.download_directory, self.file_name, stream_info)
self.remove_from_wait(None)
except (InsufficientFundsError, Exception) as err:
if Failure(err).check(InsufficientFundsError):
log.warning("Insufficient funds to download lbry://%s", self.name)
self.remove_from_wait("Insufficient funds")
else:
log.warning("lbry://%s timed out, removing from streams", self.name)
self.remove_from_wait("Timed out")
if self.daemon.streams[self.name].downloader is not None:
yield self.daemon.lbry_file_manager.delete_lbry_file(
self.daemon.streams[self.name].downloader)
del self.daemon.streams[self.name]
raise err
if self.wait_for_write:
yield self._wait_for_write()
defer.returnValue((self.daemon.streams[self.name].sd_hash, download_path))
def _wait_for_write(self):
d = defer.succeed(None)
if not self._has_downloader_wrote():
d.addCallback(lambda _: reactor.callLater(1, self._wait_for_write))
return d
def _has_downloader_wrote(self):
stream = self.daemon.streams.get(self.name, False)
if stream:
file_path = self._full_path(stream.downloader)
return self._get_written_bytes(file_path)
else:
return False
def _get_written_bytes(self, file_path):
"""Returns the number of bytes written to `file_path`.
Returns False if there were issues reading `file_path`.
"""
try:
if os.path.isfile(file_path):
with open(file_path) as written_file:
written_file.seek(0, os.SEEK_END)
written_bytes = written_file.tell()
else:
written_bytes = False
except Exception:
writen_bytes = False
return written_bytes
def remove_from_wait(self, reason):
if self.name in self.daemon.waiting_on:
del self.daemon.waiting_on[self.name]
return reason
class _ResolveNameHelper(object): class _ResolveNameHelper(object):
def __init__(self, daemon, name, force_refresh): def __init__(self, daemon, name, force_refresh):
self.daemon = daemon self.daemon = daemon

View file

@ -1,6 +1,6 @@
import logging import logging
import os import os
from twisted.internet import defer from twisted.internet import defer, threads
from twisted.internet.task import LoopingCall from twisted.internet.task import LoopingCall
from lbrynet.core import utils from lbrynet.core import utils
@ -63,6 +63,8 @@ class GetStream(object):
# fired after the metadata and the first data blob have been downloaded # fired after the metadata and the first data blob have been downloaded
self.data_downloading_deferred = defer.Deferred(None) self.data_downloading_deferred = defer.Deferred(None)
self._running = False
@property @property
def download_path(self): def download_path(self):
return os.path.join(self.download_directory, self.downloader.file_name) return os.path.join(self.download_directory, self.downloader.file_name)
@ -88,7 +90,7 @@ class GetStream(object):
elif self.downloader: elif self.downloader:
d = self.downloader.status() d = self.downloader.status()
d.addCallback(self._check_status) d.addCallback(self._check_status)
else: elif self._running:
log.info("Downloading stream descriptor blob (%i seconds)", self.timeout_counter) log.info("Downloading stream descriptor blob (%i seconds)", self.timeout_counter)
def convert_max_fee(self): def convert_max_fee(self):
@ -158,10 +160,21 @@ class GetStream(object):
@defer.inlineCallbacks @defer.inlineCallbacks
def download(self, stream_info, name): def download(self, stream_info, name):
if self._running:
raise Exception("Already running")
self._running = True
self.set_status(INITIALIZING_CODE, name) self.set_status(INITIALIZING_CODE, name)
self.sd_hash = utils.get_sd_hash(stream_info) self.sd_hash = utils.get_sd_hash(stream_info)
if 'fee' in stream_info['stream']['metadata']: if 'fee' in stream_info['stream']['metadata']:
fee = self.check_fee(stream_info['stream']['metadata']['fee']) try:
fee = yield threads.deferToThread(self.check_fee,
stream_info['stream']['metadata']['fee'])
except Exception as err:
self._running = False
self.finished_deferred.errback(err)
raise err
else: else:
fee = None fee = None
@ -184,7 +197,7 @@ class GetStream(object):
safe_start(self.checker) safe_start(self.checker)
self.download(stream_info, name) self.download(stream_info, name)
yield self.data_downloading_deferred yield self.data_downloading_deferred
defer.returnValue(self.download_path) defer.returnValue(self.downloader)
except Exception as err: except Exception as err:
safe_stop(self.checker) safe_stop(self.checker)
raise err raise err