diff --git a/lbrynet/lbrynet_daemon/DaemonServer.py b/lbrynet/lbrynet_daemon/DaemonServer.py index 230f152ee..80882acf1 100644 --- a/lbrynet/lbrynet_daemon/DaemonServer.py +++ b/lbrynet/lbrynet_daemon/DaemonServer.py @@ -11,7 +11,7 @@ import cgi from appdirs import user_data_dir from twisted.web import server, static, resource -from twisted.internet import abstract, defer, interfaces, error, reactor, threads +from twisted.internet import abstract, defer, interfaces, error, reactor, task, threads from zope.interface import implementer @@ -261,6 +261,7 @@ class EncryptedFileStreamer(object): self._file = open(path, 'rb') self._stream = stream self._file_manager = file_manager + self._headers_sent = False self._running = True @@ -273,29 +274,35 @@ class EncryptedFileStreamer(object): self._deferred.addCallback(_set_content_length_header) self._deferred.addCallback(lambda _: self.resumeProducing()) + def _check_for_new_data(self): + def _recurse_or_stop(stream_status): + if not self._running: + return + + if stream_status != ManagedLBRYFileDownloader.STATUS_FINISHED: + self._deferred.addCallback(lambda _: task.deferLater(reactor, self.new_data_check_interval, self._check_for_new_data)) + else: + self.stopProducing() + + if not self._running: + return + + data = self._file.read(self.bufferSize) + if data: + self._request.write(data) + self._deferred.addCallback(lambda _: task.deferLater(reactor, self.stream_interval, self._check_for_new_data)) + else: + # We've written all the bytes currently in the file, but we may + # still be downloading, so check file status to see if we're done. + self._deferred.addCallback(lambda _: self._file_manager.get_lbry_file_status(self._stream)) + self._deferred.addCallback(_recurse_or_stop) + def pauseProducing(self): self._running = False - return defer.succeed(None) def resumeProducing(self): - def _check_for_new_data(): - if not self._running: - return defer.succeed(None) - - data = self._file.read(self.bufferSize) - if data: - self._request.write(data) - self._deferred.addCallback(lambda _: threads.deferToThread(reactor.callLater, self.stream_interval, _check_for_new_data)) - else: - status = self._file_manager.get_lbry_file_status(self._stream) - if status != ManagedLBRYFileDownloader.STATUS_FINISHED: - self._deferred.addCallback(lambda _: threads.deferToThread(reactor.callLater, self.new_data_check_interval, _check_for_new_data)) - else: - self.stopProducing() - self._running = True - self._deferred.addCallback(lambda _: _check_for_new_data()) - return defer.succeed(None) + self._check_for_new_data() def stopProducing(self): self._running = False @@ -305,7 +312,6 @@ class EncryptedFileStreamer(object): self._deferred.cancel() self._request.finish() self._request.unregisterProducer() - return defer.succeed(None) class HostedEncryptedFile(resource.Resource): @@ -317,8 +323,8 @@ class HostedEncryptedFile(resource.Resource): path = os.path.join(self._api.download_directory, stream.file_name) producer = EncryptedFileStreamer(request, path, stream, self._api.lbry_file_manager) - d = defer.Deferred(None) - d.addCallback(lambda _: request.registerProducer(producer, streaming=True)) + request.registerProducer(producer, streaming=True) + request.notifyFinish().addCallback(lambda _: producer.stopProducing()) request.notifyFinish().addErrback(self._responseFailed, d) return d