From 01cc4f28e01c3ac8793009614484e85c803955e5 Mon Sep 17 00:00:00 2001
From: Job Evers-Meltzer <jobevers@users.noreply.github.com>
Date: Fri, 30 Dec 2016 00:12:20 -0600
Subject: [PATCH] Refactor jsonrpc_get

Convert some of the calls to inlineCallbacks, which
allowed the code to be cleaned up and made more clear
---
 .../lbryfile/EncryptedFileMetadataManager.py  |   2 +-
 lbrynet/lbrynet_daemon/Daemon.py              | 174 +++++++++---------
 lbrynet/lbrynet_daemon/Downloader.py          |  28 +--
 3 files changed, 102 insertions(+), 102 deletions(-)

diff --git a/lbrynet/lbryfile/EncryptedFileMetadataManager.py b/lbrynet/lbryfile/EncryptedFileMetadataManager.py
index bae6291b6..e9b2e7335 100644
--- a/lbrynet/lbryfile/EncryptedFileMetadataManager.py
+++ b/lbrynet/lbryfile/EncryptedFileMetadataManager.py
@@ -49,7 +49,7 @@ class DBEncryptedFileMetadataManager(object):
 
     def get_blobs_for_stream(self, stream_hash, start_blob=None,
                              end_blob=None, count=None, reverse=False):
-        log.debug("Getting blobs for a stream. Count is %s", str(count))
+        log.debug("Getting blobs for stream %s. Count is %s", stream_hash, count)
 
         def get_positions_of_start_and_end():
             if start_blob is not None:
diff --git a/lbrynet/lbrynet_daemon/Daemon.py b/lbrynet/lbrynet_daemon/Daemon.py
index 5eac402a7..295494087 100644
--- a/lbrynet/lbrynet_daemon/Daemon.py
+++ b/lbrynet/lbrynet_daemon/Daemon.py
@@ -70,6 +70,7 @@ STARTUP_STAGES = [
                     (WAITING_FOR_FIRST_RUN_CREDITS, 'Waiting for first run credits...')
                   ]
 
+# TODO: make this consistent with the stages in Downloader.py
 DOWNLOAD_METADATA_CODE = 'downloading_metadata'
 DOWNLOAD_TIMEOUT_CODE = 'timeout'
 DOWNLOAD_RUNNING_CODE = 'running'
@@ -779,9 +780,9 @@ class Daemon(AuthJSONRPCServer):
         d.addCallback(BlobStreamDescriptorReader)
         d.addCallback(lambda blob: blob.get_info())
         d.addCallback(cb)
-
         return r
 
+    @defer.inlineCallbacks
     def _download_name(self, name, timeout=None, download_directory=None,
                        file_name=None, stream_info=None, wait_for_write=True):
         """
@@ -795,14 +796,11 @@ class Daemon(AuthJSONRPCServer):
 
         if not stream_info:
             self.waiting_on[name] = True
-            d = self._resolve_name(name)
-        else:
-            d = defer.succeed(stream_info)
-        d.addCallback(helper.setup_stream)
-        d.addCallback(helper.wait_or_get_stream)
-        if not stream_info:
-            d.addCallback(helper.remove_from_wait)
-        return d
+            stream_info = yield self._resolve_name(name)
+            del self.waiting_on[name]
+        lbry_file = yield helper.setup_stream(stream_info)
+        sd_hash, file_path = yield helper.wait_or_get_stream(stream_info, lbry_file)
+        defer.returnValue((sd_hash, file_path))
 
     def add_stream(self, name, timeout, download_directory, file_name, stream_info):
         """Makes, adds and starts a stream"""
@@ -1400,8 +1398,6 @@ class Daemon(AuthJSONRPCServer):
             return self._render_response(None, BAD_REQUEST)
 
         d = self._resolve_name(name, force_refresh=force)
-        # TODO: this is the rpc call that returns a server.failure.
-        #       what is up with that?
         d.addCallbacks(
             lambda info: self._render_response(info, OK_CODE),
             # TODO: Is server.failure a module? It looks like it:
@@ -1484,6 +1480,7 @@ class Daemon(AuthJSONRPCServer):
         )
 
     @AuthJSONRPCServer.auth_required
+    @defer.inlineCallbacks
     def jsonrpc_get(self, p):
         """Download stream from a LBRY uri.
 
@@ -1493,28 +1490,40 @@ class Daemon(AuthJSONRPCServer):
             'file_name': optional, a user specified name for the downloaded file
             'stream_info': optional, specified stream info overrides name
             'timeout': optional
-            'wait_for_write': optional, defaults to True
+            'wait_for_write': optional, defaults to True. When set, waits for the file to
+                only start to be written before returning any results.
         Returns:
             'stream_hash': hex string
             'path': path of download
         """
         params = self._process_get_parameters(p)
         if not params.name:
-            return server.failure
+            # TODO: return a useful error message here, like "name argument is required"
+            defer.returnValue(server.failure)
         if params.name in self.waiting_on:
-            return server.failure
-        d = self._download_name(name=params.name,
-                                timeout=params.timeout,
-                                download_directory=params.download_directory,
-                                stream_info=params.stream_info,
-                                file_name=params.file_name,
-                                wait_for_write=params.wait_for_write)
-        # TODO: downloading can timeout.  Not sure what to do when that happens
-        d.addCallbacks(
-            get_output_callback(params),
-            lambda err: str(err))
-        d.addCallback(lambda message: self._render_response(message, OK_CODE))
-        return d
+            # TODO: return a useful error message here, like "already
+            # waiting for name to be resolved"
+            defer.returnValue(server.failure)
+        try:
+            sd_hash, file_path = yield self._download_name(
+                name=params.name,
+                timeout=params.timeout,
+                download_directory=params.download_directory,
+                stream_info=params.stream_info,
+                file_name=params.file_name,
+                wait_for_write=params.wait_for_write
+            )
+        except Exception as e:
+            log.exception('Failed to get %s', params.name)
+            response = yield self._render_response(str(e), OK_CODE)
+        else:
+            # TODO: should stream_hash key be changed to sd_hash?
+            message = {
+                'stream_hash': params.sd_hash if params.stream_info else sd_hash,
+                'path': file_path
+            }
+            response = yield self._render_response(message, OK_CODE)
+        defer.returnValue(response)
 
     @AuthJSONRPCServer.auth_required
     def jsonrpc_stop_lbry_file(self, p):
@@ -1722,6 +1731,7 @@ class Daemon(AuthJSONRPCServer):
             txid = p['txid']
             nout = p['nout']
         else:
+            # TODO: return a useful error message
             return server.failure
 
         def _disp(x):
@@ -1916,6 +1926,7 @@ class Daemon(AuthJSONRPCServer):
             amount = p['amount']
             address = p['address']
         else:
+            # TODO: return a useful error message
             return server.failure
 
         reserved_points = self.session.wallet.reserve_points(address, amount)
@@ -1957,6 +1968,7 @@ class Daemon(AuthJSONRPCServer):
             d = self.session.wallet.get_block_info(height)
             d.addCallback(lambda blockhash: self.session.wallet.get_block(blockhash))
         else:
+            # TODO: return a useful error message
             return server.failure
         d.addCallback(lambda r: self._render_response(r, OK_CODE))
         return d
@@ -1974,6 +1986,7 @@ class Daemon(AuthJSONRPCServer):
         if 'txid' in p.keys():
             txid = p['txid']
         else:
+            # TODO: return a useful error message
             return server.failure
 
         d = self.session.wallet.get_claims_from_tx(txid)
@@ -2318,15 +2331,6 @@ def get_sd_hash(stream_info):
         return stream_info.get('stream_hash')
 
 
-def get_output_callback(params):
-    def callback(l):
-        return {
-            'stream_hash': params.sd_hash if params.stream_info else l.sd_hash,
-            'path': os.path.join(params.download_directory, l.file_name)
-        }
-    return callback
-
-
 class _DownloadNameHelper(object):
     def __init__(self, daemon, name,
                  timeout=None,
@@ -2342,52 +2346,54 @@ class _DownloadNameHelper(object):
         self.file_name = file_name
         self.wait_for_write = wait_for_write
 
+    @defer.inlineCallbacks
     def setup_stream(self, stream_info):
-        stream_hash = get_sd_hash(stream_info)
-        d = self.daemon._get_lbry_file_by_sd_hash(stream_hash)
-        d.addCallback(self._prepend_stream_info, stream_info)
-        return d
+        sd_hash = get_sd_hash(stream_info)
+        lbry_file = yield self.daemon._get_lbry_file_by_sd_hash(sd_hash)
+        if self._does_lbry_file_exists(lbry_file):
+            defer.returnValue(lbry_file)
+        else:
+            defer.returnValue(None)
 
-    def _prepend_stream_info(self, lbry_file, stream_info):
-        if lbry_file:
-            if os.path.isfile(os.path.join(self.download_directory, lbry_file.file_name)):
-                return defer.succeed((stream_info, lbry_file))
-        return defer.succeed((stream_info, None))
+    def _does_lbry_file_exists(self, lbry_file):
+        return lbry_file and os.path.isfile(self._full_path(lbry_file))
 
-    def wait_or_get_stream(self, args):
-        stream_info, lbry_file = args
+    def _full_path(self, lbry_file):
+        return os.path.join(self.download_directory, lbry_file.file_name)
+
+    @defer.inlineCallbacks
+    def wait_or_get_stream(self, stream_info, lbry_file):
         if lbry_file:
             log.debug('Wait on lbry_file')
-            return self._wait_on_lbry_file(lbry_file)
+            # returns the lbry_file
+            yield self._wait_on_lbry_file(lbry_file)
+            defer.returnValue((lbry_file.sd_hash, self._full_path(lbry_file)))
         else:
             log.debug('No lbry_file, need to get stream')
-            return self._get_stream(stream_info)
+            # returns an instance of ManagedEncryptedFileDownloaderFactory
+            sd_hash, file_path = yield self._get_stream(stream_info)
+            defer.returnValue((sd_hash, file_path))
 
+    def _wait_on_lbry_file(self, f):
+        file_path = self._full_path(f)
+        written_bytes = self._get_written_bytes(file_path)
+        if written_bytes:
+            log.info("File has bytes: %s --> %s", f.sd_hash, file_path)
+            return defer.succeed(True)
+        return task.deferLater(reactor, 1, self._wait_on_lbry_file, f)
+
+    @defer.inlineCallbacks
     def _get_stream(self, stream_info):
-        d = self.daemon.add_stream(
+        was_successful, sd_hash, download_path = yield self.daemon.add_stream(
             self.name, self.timeout, self.download_directory, self.file_name, stream_info)
-
-        def _handle_timeout(args):
-            was_successful, _, _ = args
-            if not was_successful:
-                log.warning("lbry://%s timed out, removing from streams", self.name)
-                del self.daemon.streams[self.name]
-
-        d.addCallback(_handle_timeout)
-
+        if not was_successful:
+            log.warning("lbry://%s timed out, removing from streams", self.name)
+            del self.daemon.streams[self.name]
+            self.remove_from_wait("Timed out")
+            raise Exception("Timed out")
         if self.wait_for_write:
-            d.addCallback(lambda _: self._wait_for_write())
-
-        def _get_stream_for_return():
-            stream = self.daemon.streams.get(self.name, None)
-            if stream:
-                return stream.downloader
-            else:
-                self.remove_from_wait("Timed out")
-                return defer.fail(Exception("Timed out"))
-
-        d.addCallback(lambda _: _get_stream_for_return())
-        return d
+            yield self._wait_for_write()
+        defer.returnValue((sd_hash, download_path))
 
     def _wait_for_write(self):
         d = defer.succeed(None)
@@ -2398,39 +2404,27 @@ class _DownloadNameHelper(object):
     def _has_downloader_wrote(self):
         stream = self.daemon.streams.get(self.name, False)
         if stream:
-            return self._get_written_bytes(stream.downloader.file_name)
+            file_path = self._full_path(stream.downloader)
+            return self._get_written_bytes(file_path)
         else:
             return False
 
-    def _wait_on_lbry_file(self, f):
-        written_bytes = self._get_written_bytes(f.file_name)
-        if written_bytes:
-            return defer.succeed(self._disp_file(f))
-        return task.deferLater(reactor, 1, self._wait_on_lbry_file, f)
+    def _get_written_bytes(self, file_path):
+        """Returns the number of bytes written to `file_path`.
 
-    def _get_written_bytes(self, file_name):
-        """Returns the number of bytes written to `file_name`.
-
-        Returns False if there were issues reading `file_name`.
+        Returns False if there were issues reading `file_path`.
         """
         try:
-            file_path = os.path.join(self.download_directory, file_name)
             if os.path.isfile(file_path):
-                written_file = file(file_path)
-                written_file.seek(0, os.SEEK_END)
-                written_bytes = written_file.tell()
-                written_file.close()
+                with open(file_path) as written_file:
+                    written_file.seek(0, os.SEEK_END)
+                    written_bytes = written_file.tell()
             else:
                 written_bytes = False
         except Exception:
             writen_bytes = False
         return written_bytes
 
-    def _disp_file(self, f):
-        file_path = os.path.join(self.download_directory, f.file_name)
-        log.info("Already downloaded: %s --> %s", f.sd_hash, file_path)
-        return f
-
     def remove_from_wait(self, reason):
         if self.name in self.daemon.waiting_on:
             del self.daemon.waiting_on[self.name]
diff --git a/lbrynet/lbrynet_daemon/Downloader.py b/lbrynet/lbrynet_daemon/Downloader.py
index 3f59e175e..88eb10163 100644
--- a/lbrynet/lbrynet_daemon/Downloader.py
+++ b/lbrynet/lbrynet_daemon/Downloader.py
@@ -15,6 +15,7 @@ INITIALIZING_CODE = 'initializing'
 DOWNLOAD_METADATA_CODE = 'downloading_metadata'
 DOWNLOAD_TIMEOUT_CODE = 'timeout'
 DOWNLOAD_RUNNING_CODE = 'running'
+# TODO: is this ever used?
 DOWNLOAD_STOPPED_CODE = 'stopped'
 STREAM_STAGES = [
     (INITIALIZING_CODE, 'Initializing...'),
@@ -46,7 +47,7 @@ class GetStream(object):
         self.payment_rate_manager = self.session.payment_rate_manager
         self.lbry_file_manager = lbry_file_manager
         self.sd_identifier = sd_identifier
-        self.stream_hash = None
+        self.sd_hash = None
         self.max_key_fee = max_key_fee
         self.stream_info = None
         self.stream_info_manager = None
@@ -56,6 +57,8 @@ class GetStream(object):
         self.download_directory = download_directory
         self.download_path = None
         self.downloader = None
+        # fired after the metadata has been downloaded and the
+        # actual file has been started
         self.finished = defer.Deferred(None)
         self.checker = LoopingCall(self.check_status)
         self.code = STREAM_STAGES[0]
@@ -63,10 +66,10 @@ class GetStream(object):
     def check_status(self):
         self.timeout_counter += 1
 
-        # TODO: Why is this the stopping condition for the finished callback?
+        # download_path is set after the sd blob has been downloaded
         if self.download_path:
             self.checker.stop()
-            self.finished.callback((True, self.stream_hash, self.download_path))
+            self.finished.callback((True, self.sd_hash, self.download_path))
 
         elif self.timeout_counter >= self.timeout:
             log.info("Timeout downloading lbry://%s" % self.resolved_name)
@@ -108,20 +111,20 @@ class GetStream(object):
         self.resolved_name = name
         self.stream_info = deepcopy(stream_info)
         self.description = self.stream_info['description']
-        self.stream_hash = self.stream_info['sources']['lbry_sd_hash']
+        self.sd_hash = self.stream_info['sources']['lbry_sd_hash']
 
         if 'fee' in self.stream_info:
             self.fee = FeeValidator(self.stream_info['fee'])
             max_key_fee = self._convert_max_fee()
             converted_fee = self.exchange_rate_manager.to_lbc(self.fee).amount
             if converted_fee > self.wallet.wallet_balance:
-                log.warning("Insufficient funds to download lbry://%s", self.resolved_name)
-                return defer.fail(InsufficientFundsError())
+                msg = "Insufficient funds to download lbry://{}. Need {:0.2f}, have {:0.2f}".format(
+                    self.resolved_name, converted_fee, self.wallet.wallet_balance)
+                raise InsufficientFundsError(msg)
             if converted_fee > max_key_fee:
-                log.warning(
-                    "Key fee %f above limit of %f didn't download lbry://%s",
+                msg = "Key fee {:0.2f} above limit of {:0.2f} didn't download lbry://{}".format(
                     converted_fee, max_key_fee, self.resolved_name)
-                return defer.fail(KeyFeeAboveMaxAllowed())
+                raise KeyFeeAboveMaxAllowed(msg)
             log.info(
                 "Key fee %f below limit of %f, downloading lbry://%s",
                 converted_fee, max_key_fee, self.resolved_name)
@@ -130,7 +133,7 @@ class GetStream(object):
 
         self._d.addCallback(lambda _: _set_status(None, DOWNLOAD_METADATA_CODE))
         self._d.addCallback(lambda _: download_sd_blob(
-            self.session, self.stream_hash, self.payment_rate_manager))
+            self.session, self.sd_hash, self.payment_rate_manager))
         self._d.addCallback(self.sd_identifier.get_metadata_for_sd_blob)
         self._d.addCallback(lambda r: _set_status(r, DOWNLOAD_RUNNING_CODE))
         self._d.addCallback(get_downloader_factory)
@@ -147,7 +150,7 @@ class GetStream(object):
 
         d = self._pay_key_fee()
         d.addCallback(lambda _: log.info(
-            "Downloading %s --> %s", self.stream_hash, self.downloader.file_name))
+            "Downloading %s --> %s", self.sd_hash, self.downloader.file_name))
         d.addCallback(lambda _: self.downloader.start())
 
     def _pay_key_fee(self):
@@ -155,6 +158,9 @@ class GetStream(object):
             fee_lbc = self.exchange_rate_manager.to_lbc(self.fee).amount
             reserved_points = self.wallet.reserve_points(self.fee.address, fee_lbc)
             if reserved_points is None:
+                log.warning('Unable to pay the key fee of %s for %s', fee_lbc, self.resolved_name)
+                # TODO: If we get here, nobody will know that there was an error
+                #       as nobody actually cares about self._d
                 return defer.fail(InsufficientFundsError())
             return self.wallet.send_points_to_address(reserved_points, fee_lbc)
         return defer.succeed(None)