Merge pull request #137 from lbryio/html5

More reliable stream producer
This commit is contained in:
Jack Robison 2016-10-02 03:31:54 -04:00 committed by GitHub
commit 7f980446db

View file

@ -11,11 +11,12 @@ import cgi
from appdirs import user_data_dir
from twisted.web import server, static, resource
from twisted.internet import defer, interfaces, error, reactor, threads
from twisted.internet import abstract, defer, interfaces, error, reactor, task
from zope.interface import implements
from zope.interface import implementer
from lbrynet.lbrynet_daemon.Daemon import Daemon
from lbrynet.lbryfilemanager.EncryptedFileDownloader import ManagedEncryptedFileDownloader
from lbrynet.conf import API_ADDRESS, UI_ADDRESS, DEFAULT_UI_BRANCH, LOG_FILE_NAME
@ -231,123 +232,112 @@ class LBRYindex(resource.Resource):
return static.File(os.path.join(self.ui_dir, "index.html")).render_GET(request)
@implementer(interfaces.IPushProducer)
class EncryptedFileStreamer(object):
"""
Writes downloaded LBRY file to request as the download comes in, pausing and resuming as requested
used for Chrome
Writes LBRY stream to request; will pause to wait for new data if the file
is downloading.
No support for range requests (some browser players can't handle it when
the full video data isn't available on request).
"""
implements(interfaces.IPushProducer)
bufferSize = abstract.FileDescriptor.bufferSize
# How long to wait between sending blocks (needed because some
# video players freeze up if you try to send data too fast)
stream_interval = 0.005
# How long to wait before checking if new data has been appended to the file
new_data_check_interval = 0.25
def __init__(self, request, path, stream, file_manager):
def _set_content_length_header(length):
self._request.setHeader('content-length', length)
return defer.succeed(None)
def __init__(self, request, path, start, stop, size):
self._request = request
self._fileObject = file(path)
self._content_type = mimetypes.guess_type(path)[0]
self._stop_pos = size - 1 if stop == '' else int(stop) #chrome and firefox send range requests for "0-"
self._cursor = self._start_pos = int(start)
self._file_size = size
self._depth = 0
self._file = open(path, 'rb')
self._stream = stream
self._file_manager = file_manager
self._headers_sent = False
self._paused = self._sent_bytes = self._stopped = False
self._delay = 0.25
self._deferred = defer.succeed(None)
self._running = True
self._request.setResponseCode(206)
self._request.setHeader('accept-ranges', 'bytes')
self._request.setHeader('content-type', self._content_type)
self._request.setResponseCode(200)
self._request.setHeader('accept-ranges', 'none')
self._request.setHeader('content-type', mimetypes.guess_type(path)[0])
self._request.setHeader("Content-Security-Policy", "sandbox")
self.resumeProducing()
self._deferred = stream.get_total_bytes()
self._deferred.addCallback(_set_content_length_header)
self._deferred.addCallback(lambda _: self.resumeProducing())
def _check_for_new_data(self):
def _recurse_or_stop(stream_status):
if not self._running:
return
if stream_status != ManagedEncryptedFileDownloader.STATUS_FINISHED:
self._deferred.addCallback(lambda _: task.deferLater(reactor, self.new_data_check_interval, self._check_for_new_data))
else:
self.stopProducing()
if not self._running:
return
# Clear the file's EOF indicator by seeking to current position
self._file.seek(self._file.tell())
data = self._file.read(self.bufferSize)
if data:
self._request.write(data)
if self._running: # .write() can trigger a pause
self._deferred.addCallback(lambda _: task.deferLater(reactor, self.stream_interval, self._check_for_new_data))
else:
self._deferred.addCallback(lambda _: self._file_manager.get_lbry_file_status(self._stream))
self._deferred.addCallback(_recurse_or_stop)
def pauseProducing(self):
self._paused = True
log.info("Pausing producer")
return defer.succeed(None)
self._running = False
def resumeProducing(self):
def _check_for_new_data():
self._depth += 1
self._fileObject.seek(self._start_pos, os.SEEK_END)
readable_bytes = self._fileObject.tell()
self._fileObject.seek(self._cursor)
self._sent_bytes = False
if (readable_bytes > self._cursor) and not (self._stopped or self._paused):
read_length = min(readable_bytes, self._stop_pos) - self._cursor + 1
self._request.setHeader('content-range', 'bytes %s-%s/%s' % (self._cursor, self._cursor + read_length - 1, self._file_size))
self._request.setHeader('content-length', str(read_length))
start_cur = self._cursor
for i in range(read_length):
if self._paused or self._stopped:
break
else:
data = self._fileObject.read(1)
self._request.write(data)
self._cursor += 1
log.info("Wrote range %s-%s/%s, length: %s, readable: %s, depth: %s" %
(start_cur, self._cursor, self._file_size, self._cursor - start_cur, readable_bytes, self._depth))
self._sent_bytes = True
if self._cursor == self._stop_pos + 1:
self.stopProducing()
return defer.succeed(None)
elif self._paused or self._stopped:
return defer.succeed(None)
else:
self._deferred.addCallback(lambda _: threads.deferToThread(reactor.callLater, self._delay, _check_for_new_data))
return defer.succeed(None)
log.info("Resuming producer")
self._paused = False
self._deferred.addCallback(lambda _: _check_for_new_data())
self._running = True
self._check_for_new_data()
def stopProducing(self):
log.info("Stopping producer")
self._stopped = True
# self._fileObject.close()
self._running = False
self._file.close()
self._deferred.addErrback(lambda err: err.trap(defer.CancelledError))
self._deferred.addErrback(lambda err: err.trap(error.ConnectionDone))
self._deferred.cancel()
# self._request.finish()
self._request.unregisterProducer()
return defer.succeed(None)
self._request.finish()
class HostedEncryptedFile(resource.Resource):
def __init__(self, api):
self._api = api
self._producer = None
resource.Resource.__init__(self)
# todo: fix EncryptedFileStreamer and use it instead of static.File
# def makeProducer(self, request, stream):
# def _save_producer(producer):
# self._producer = producer
# return defer.succeed(None)
#
# range_header = request.getAllHeaders()['range'].replace('bytes=', '').split('-')
# start, stop = int(range_header[0]), range_header[1]
# log.info("GET range %s-%s" % (start, stop))
# path = os.path.join(self._api.download_directory, stream.file_name)
#
# d = stream.get_total_bytes()
# d.addCallback(lambda size: _save_producer(EncryptedFileStreamer(request, path, start, stop, size)))
# d.addCallback(lambda _: request.registerProducer(self._producer, streaming=True))
# # request.notifyFinish().addCallback(lambda _: self._producer.stopProducing())
# request.notifyFinish().addErrback(self._responseFailed, d)
# return d
def _make_stream_producer(self, request, stream):
path = os.path.join(self._api.download_directory, stream.file_name)
producer = EncryptedFileStreamer(request, path, stream, self._api.lbry_file_manager)
request.registerProducer(producer, streaming=True)
d = request.notifyFinish()
d.addErrback(self._responseFailed, d)
return d
def render_GET(self, request):
request.setHeader("Content-Security-Policy", "sandbox")
if 'name' in request.args.keys():
if request.args['name'][0] != 'lbry' and request.args['name'][0] not in self._api.waiting_on.keys():
d = self._api._download_name(request.args['name'][0])
# d.addCallback(lambda stream: self.makeProducer(request, stream))
d.addCallback(lambda stream: static.File(os.path.join(self._api.download_directory,
stream.file_name)).render_GET(request))
d.addCallback(lambda stream: self._make_stream_producer(request, stream))
elif request.args['name'][0] in self._api.waiting_on.keys():
request.redirect(UI_ADDRESS + "/?watch=" + request.args['name'][0])
request.finish()
@ -356,11 +346,11 @@ class HostedEncryptedFile(resource.Resource):
request.finish()
return server.NOT_DONE_YET
# def _responseFailed(self, err, call):
# call.addErrback(lambda err: err.trap(error.ConnectionDone))
# call.addErrback(lambda err: err.trap(defer.CancelledError))
# call.addErrback(lambda err: log.info("Error: " + str(err)))
# call.cancel()
def _responseFailed(self, err, call):
call.addErrback(lambda err: err.trap(error.ConnectionDone))
call.addErrback(lambda err: err.trap(defer.CancelledError))
call.addErrback(lambda err: log.info("Error: " + str(err)))
call.cancel()
class EncryptedFileUpload(resource.Resource):
"""