Refactor of LBRYStreamProducer

This commit is contained in:
Alex Liebowitz 2016-09-30 13:46:43 -04:00
parent aa3aff91d0
commit f245822814

View file

@ -11,7 +11,7 @@ import cgi
from appdirs import user_data_dir from appdirs import user_data_dir
from twisted.web import server, static, resource from twisted.web import server, static, resource
from twisted.internet import abstract, defer, interfaces, error, reactor, threads from twisted.internet import abstract, defer, interfaces, error, reactor, task, threads
from zope.interface import implementer from zope.interface import implementer
@ -261,6 +261,7 @@ class EncryptedFileStreamer(object):
self._file = open(path, 'rb') self._file = open(path, 'rb')
self._stream = stream self._stream = stream
self._file_manager = file_manager self._file_manager = file_manager
self._headers_sent = False
self._running = True self._running = True
@ -273,29 +274,35 @@ class EncryptedFileStreamer(object):
self._deferred.addCallback(_set_content_length_header) self._deferred.addCallback(_set_content_length_header)
self._deferred.addCallback(lambda _: self.resumeProducing()) self._deferred.addCallback(lambda _: self.resumeProducing())
def _check_for_new_data(self):
def _recurse_or_stop(stream_status):
if not self._running:
return
if stream_status != ManagedLBRYFileDownloader.STATUS_FINISHED:
self._deferred.addCallback(lambda _: task.deferLater(reactor, self.new_data_check_interval, self._check_for_new_data))
else:
self.stopProducing()
if not self._running:
return
data = self._file.read(self.bufferSize)
if data:
self._request.write(data)
self._deferred.addCallback(lambda _: task.deferLater(reactor, self.stream_interval, self._check_for_new_data))
else:
# We've written all the bytes currently in the file, but we may
# still be downloading, so check file status to see if we're done.
self._deferred.addCallback(lambda _: self._file_manager.get_lbry_file_status(self._stream))
self._deferred.addCallback(_recurse_or_stop)
def pauseProducing(self): def pauseProducing(self):
self._running = False self._running = False
return defer.succeed(None)
def resumeProducing(self): def resumeProducing(self):
def _check_for_new_data():
if not self._running:
return defer.succeed(None)
data = self._file.read(self.bufferSize)
if data:
self._request.write(data)
self._deferred.addCallback(lambda _: threads.deferToThread(reactor.callLater, self.stream_interval, _check_for_new_data))
else:
status = self._file_manager.get_lbry_file_status(self._stream)
if status != ManagedLBRYFileDownloader.STATUS_FINISHED:
self._deferred.addCallback(lambda _: threads.deferToThread(reactor.callLater, self.new_data_check_interval, _check_for_new_data))
else:
self.stopProducing()
self._running = True self._running = True
self._deferred.addCallback(lambda _: _check_for_new_data()) self._check_for_new_data()
return defer.succeed(None)
def stopProducing(self): def stopProducing(self):
self._running = False self._running = False
@ -305,7 +312,6 @@ class EncryptedFileStreamer(object):
self._deferred.cancel() self._deferred.cancel()
self._request.finish() self._request.finish()
self._request.unregisterProducer() self._request.unregisterProducer()
return defer.succeed(None)
class HostedEncryptedFile(resource.Resource): class HostedEncryptedFile(resource.Resource):
@ -317,8 +323,8 @@ class HostedEncryptedFile(resource.Resource):
path = os.path.join(self._api.download_directory, stream.file_name) path = os.path.join(self._api.download_directory, stream.file_name)
producer = EncryptedFileStreamer(request, path, stream, self._api.lbry_file_manager) producer = EncryptedFileStreamer(request, path, stream, self._api.lbry_file_manager)
d = defer.Deferred(None) request.registerProducer(producer, streaming=True)
d.addCallback(lambda _: request.registerProducer(producer, streaming=True))
request.notifyFinish().addCallback(lambda _: producer.stopProducing()) request.notifyFinish().addCallback(lambda _: producer.stopProducing())
request.notifyFinish().addErrback(self._responseFailed, d) request.notifyFinish().addErrback(self._responseFailed, d)
return d return d