Add basic support for streaming partially downloaded files

This commit is contained in:
Alex Liebowitz 2016-08-26 06:15:09 -04:00
parent eb7cc8463b
commit 5537dd878f

View file

@ -11,9 +11,9 @@ import cgi
from appdirs import user_data_dir
from twisted.web import server, static, resource
from twisted.internet import defer, interfaces, error, reactor, threads
from twisted.internet import abstract, defer, interfaces, error, reactor, threads
from zope.interface import implements
from zope.interface import implementer
from lbrynet.lbrynet_daemon.Daemon import Daemon
from lbrynet.conf import API_ADDRESS, UI_ADDRESS, DEFAULT_UI_BRANCH, LOG_FILE_NAME
@ -231,29 +231,31 @@ class LBRYindex(resource.Resource):
return static.File(os.path.join(self.ui_dir, "index.html")).render_GET(request)
@implementer(interfaces.IPushProducer)
class EncryptedFileStreamer(object):
"""
Writes downloaded LBRY file to request as the download comes in, pausing and resuming as requested
used for Chrome
Writes LBRY stream to request; will pause to wait for new data if the file
is downloading.
No support for range requests (some browser players can't handle it when
the full video data isn't available on request).
"""
implements(interfaces.IPushProducer)
bufferSize = abstract.FileDescriptor.bufferSize
delay = 0.25
def __init__(self, request, path, start, stop, size):
def __init__(self, request, path, total_bytes):
self._request = request
self._fileObject = file(path)
self._fileObject = open(path, 'rb')
self._content_type = mimetypes.guess_type(path)[0]
self._stop_pos = size - 1 if stop == '' else int(stop) #chrome and firefox send range requests for "0-"
self._cursor = self._start_pos = int(start)
self._file_size = size
self._depth = 0
self._bytes_written = 0
self._stopped = False
self._total_bytes = total_bytes
self._paused = self._sent_bytes = self._stopped = False
self._delay = 0.25
self._deferred = defer.succeed(None)
self._request.setResponseCode(206)
self._request.setHeader('accept-ranges', 'bytes')
self._request.setResponseCode(200)
self._request.setHeader('accept-ranges', 'none')
self._request.setHeader('content-type', self._content_type)
self._request.setHeader("Content-Security-Policy", "sandbox")
@ -266,47 +268,28 @@ class EncryptedFileStreamer(object):
def resumeProducing(self):
def _check_for_new_data():
self._depth += 1
self._fileObject.seek(self._start_pos, os.SEEK_END)
readable_bytes = self._fileObject.tell()
self._fileObject.seek(self._cursor)
data = self._fileObject.read(self.bufferSize)
self._sent_bytes = False
self._request.write(data)
log.info('wrote to request')
self._bytes_written += len(data)
if (readable_bytes > self._cursor) and not (self._stopped or self._paused):
read_length = min(readable_bytes, self._stop_pos) - self._cursor + 1
self._request.setHeader('content-range', 'bytes %s-%s/%s' % (self._cursor, self._cursor + read_length - 1, self._file_size))
self._request.setHeader('content-length', str(read_length))
start_cur = self._cursor
for i in range(read_length):
if self._paused or self._stopped:
break
else:
data = self._fileObject.read(1)
self._request.write(data)
self._cursor += 1
log.info("Wrote range %s-%s/%s, length: %s, readable: %s, depth: %s" %
(start_cur, self._cursor, self._file_size, self._cursor - start_cur, readable_bytes, self._depth))
self._sent_bytes = True
if self._cursor == self._stop_pos + 1:
if self._bytes_written >= self._total_bytes:
self.stopProducing()
return defer.succeed(None)
elif self._paused or self._stopped:
elif self._stopped:
return defer.succeed(None)
else:
self._deferred.addCallback(lambda _: threads.deferToThread(reactor.callLater, self._delay, _check_for_new_data))
self._deferred.addCallback(lambda _: threads.deferToThread(reactor.callLater, self.delay, _check_for_new_data))
return defer.succeed(None)
log.info("Resuming producer")
self._paused = False
self._deferred.addCallback(lambda _: _check_for_new_data())
def stopProducing(self):
log.info("Stopping producer")
self._stopped = True
# self._fileObject.close()
self._fileObject.close()
self._deferred.addErrback(lambda err: err.trap(defer.CancelledError))
self._deferred.addErrback(lambda err: err.trap(error.ConnectionDone))
self._deferred.cancel()
@ -321,33 +304,26 @@ class HostedEncryptedFile(resource.Resource):
self._producer = None
resource.Resource.__init__(self)
# todo: fix EncryptedFileStreamer and use it instead of static.File
# def makeProducer(self, request, stream):
# def _save_producer(producer):
# self._producer = producer
# return defer.succeed(None)
#
# range_header = request.getAllHeaders()['range'].replace('bytes=', '').split('-')
# start, stop = int(range_header[0]), range_header[1]
# log.info("GET range %s-%s" % (start, stop))
# path = os.path.join(self._api.download_directory, stream.file_name)
#
# d = stream.get_total_bytes()
# d.addCallback(lambda size: _save_producer(EncryptedFileStreamer(request, path, start, stop, size)))
# d.addCallback(lambda _: request.registerProducer(self._producer, streaming=True))
# # request.notifyFinish().addCallback(lambda _: self._producer.stopProducing())
# request.notifyFinish().addErrback(self._responseFailed, d)
# return d
def makeProducer(self, request, stream):
def _save_producer(producer):
self._producer = producer
return defer.succeed(None)
path = os.path.join(self._api.download_directory, stream.file_name)
d = stream.get_total_bytes()
d.addCallback(lambda total_bytes: _save_producer(EncryptedFileStreamer(request, path, total_bytes)))
d.addCallback(lambda _: request.registerProducer(self._producer, streaming=True))
##request.notifyFinish().addCallback(lambda _: self._producer.stopProducing())
request.notifyFinish().addErrback(self._responseFailed, d)
return d
def render_GET(self, request):
request.setHeader("Content-Security-Policy", "sandbox")
if 'name' in request.args.keys():
if request.args['name'][0] != 'lbry' and request.args['name'][0] not in self._api.waiting_on.keys():
d = self._api._download_name(request.args['name'][0])
# d.addCallback(lambda stream: self.makeProducer(request, stream))
d.addCallback(lambda stream: static.File(os.path.join(self._api.download_directory,
stream.file_name)).render_GET(request))
d.addCallback(lambda stream: self.makeProducer(request, stream))
elif request.args['name'][0] in self._api.waiting_on.keys():
request.redirect(UI_ADDRESS + "/?watch=" + request.args['name'][0])
request.finish()
@ -356,11 +332,13 @@ class HostedEncryptedFile(resource.Resource):
request.finish()
return server.NOT_DONE_YET
# def _responseFailed(self, err, call):
# call.addErrback(lambda err: err.trap(error.ConnectionDone))
# call.addErrback(lambda err: err.trap(defer.CancelledError))
# call.addErrback(lambda err: log.info("Error: " + str(err)))
# call.cancel()
def _responseFailed(self, err, call):
log.error("Hosted file response failed with error: " + str(err))
#call.addErrback(lambda err: err.trap(error.ConnectionDone))
#call.addErrback(lambda err: err.trap(defer.CancelledError))
#call.addErrback(lambda err: log.info("Error: " + str(err)))
#call.cancel()
class EncryptedFileUpload(resource.Resource):
"""