diff --git a/lbrynet/lbrynet_daemon/DaemonServer.py b/lbrynet/lbrynet_daemon/DaemonServer.py index 7ddad04ef..7ede73313 100644 --- a/lbrynet/lbrynet_daemon/DaemonServer.py +++ b/lbrynet/lbrynet_daemon/DaemonServer.py @@ -11,9 +11,9 @@ import cgi from appdirs import user_data_dir from twisted.web import server, static, resource -from twisted.internet import defer, interfaces, error, reactor, threads +from twisted.internet import abstract, defer, interfaces, error, reactor, threads -from zope.interface import implements +from zope.interface import implementer from lbrynet.lbrynet_daemon.Daemon import Daemon from lbrynet.conf import API_ADDRESS, UI_ADDRESS, DEFAULT_UI_BRANCH, LOG_FILE_NAME @@ -231,29 +231,31 @@ class LBRYindex(resource.Resource): return static.File(os.path.join(self.ui_dir, "index.html")).render_GET(request) +@implementer(interfaces.IPushProducer) class EncryptedFileStreamer(object): """ - Writes downloaded LBRY file to request as the download comes in, pausing and resuming as requested - used for Chrome + Writes LBRY stream to request; will pause to wait for new data if the file + is downloading. + + No support for range requests (some browser players can't handle it when + the full video data isn't available on request). """ - implements(interfaces.IPushProducer) + bufferSize = abstract.FileDescriptor.bufferSize + delay = 0.25 - def __init__(self, request, path, start, stop, size): + def __init__(self, request, path, total_bytes): self._request = request - self._fileObject = file(path) + self._fileObject = open(path, 'rb') self._content_type = mimetypes.guess_type(path)[0] - self._stop_pos = size - 1 if stop == '' else int(stop) #chrome and firefox send range requests for "0-" - self._cursor = self._start_pos = int(start) - self._file_size = size - self._depth = 0 + self._bytes_written = 0 + self._stopped = False + self._total_bytes = total_bytes - self._paused = self._sent_bytes = self._stopped = False - self._delay = 0.25 self._deferred = defer.succeed(None) - self._request.setResponseCode(206) - self._request.setHeader('accept-ranges', 'bytes') + self._request.setResponseCode(200) + self._request.setHeader('accept-ranges', 'none') self._request.setHeader('content-type', self._content_type) self._request.setHeader("Content-Security-Policy", "sandbox") @@ -266,47 +268,28 @@ class EncryptedFileStreamer(object): def resumeProducing(self): def _check_for_new_data(): - self._depth += 1 - self._fileObject.seek(self._start_pos, os.SEEK_END) - readable_bytes = self._fileObject.tell() - self._fileObject.seek(self._cursor) + data = self._fileObject.read(self.bufferSize) - self._sent_bytes = False + self._request.write(data) + log.info('wrote to request') + self._bytes_written += len(data) - if (readable_bytes > self._cursor) and not (self._stopped or self._paused): - read_length = min(readable_bytes, self._stop_pos) - self._cursor + 1 - self._request.setHeader('content-range', 'bytes %s-%s/%s' % (self._cursor, self._cursor + read_length - 1, self._file_size)) - self._request.setHeader('content-length', str(read_length)) - start_cur = self._cursor - for i in range(read_length): - if self._paused or self._stopped: - break - else: - data = self._fileObject.read(1) - self._request.write(data) - self._cursor += 1 - - log.info("Wrote range %s-%s/%s, length: %s, readable: %s, depth: %s" % - (start_cur, self._cursor, self._file_size, self._cursor - start_cur, readable_bytes, self._depth)) - self._sent_bytes = True - - if self._cursor == self._stop_pos + 1: + if self._bytes_written >= self._total_bytes: self.stopProducing() return defer.succeed(None) - elif self._paused or self._stopped: + elif self._stopped: return defer.succeed(None) else: - self._deferred.addCallback(lambda _: threads.deferToThread(reactor.callLater, self._delay, _check_for_new_data)) + self._deferred.addCallback(lambda _: threads.deferToThread(reactor.callLater, self.delay, _check_for_new_data)) return defer.succeed(None) log.info("Resuming producer") - self._paused = False self._deferred.addCallback(lambda _: _check_for_new_data()) def stopProducing(self): log.info("Stopping producer") self._stopped = True - # self._fileObject.close() + self._fileObject.close() self._deferred.addErrback(lambda err: err.trap(defer.CancelledError)) self._deferred.addErrback(lambda err: err.trap(error.ConnectionDone)) self._deferred.cancel() @@ -321,33 +304,26 @@ class HostedEncryptedFile(resource.Resource): self._producer = None resource.Resource.__init__(self) - # todo: fix EncryptedFileStreamer and use it instead of static.File - # def makeProducer(self, request, stream): - # def _save_producer(producer): - # self._producer = producer - # return defer.succeed(None) - # - # range_header = request.getAllHeaders()['range'].replace('bytes=', '').split('-') - # start, stop = int(range_header[0]), range_header[1] - # log.info("GET range %s-%s" % (start, stop)) - # path = os.path.join(self._api.download_directory, stream.file_name) - # - # d = stream.get_total_bytes() - # d.addCallback(lambda size: _save_producer(EncryptedFileStreamer(request, path, start, stop, size))) - # d.addCallback(lambda _: request.registerProducer(self._producer, streaming=True)) - # # request.notifyFinish().addCallback(lambda _: self._producer.stopProducing()) - # request.notifyFinish().addErrback(self._responseFailed, d) - # return d + def makeProducer(self, request, stream): + def _save_producer(producer): + self._producer = producer + return defer.succeed(None) + + path = os.path.join(self._api.download_directory, stream.file_name) + + d = stream.get_total_bytes() + d.addCallback(lambda total_bytes: _save_producer(EncryptedFileStreamer(request, path, total_bytes))) + d.addCallback(lambda _: request.registerProducer(self._producer, streaming=True)) + ##request.notifyFinish().addCallback(lambda _: self._producer.stopProducing()) + request.notifyFinish().addErrback(self._responseFailed, d) + return d def render_GET(self, request): request.setHeader("Content-Security-Policy", "sandbox") if 'name' in request.args.keys(): if request.args['name'][0] != 'lbry' and request.args['name'][0] not in self._api.waiting_on.keys(): d = self._api._download_name(request.args['name'][0]) - # d.addCallback(lambda stream: self.makeProducer(request, stream)) - d.addCallback(lambda stream: static.File(os.path.join(self._api.download_directory, - stream.file_name)).render_GET(request)) - + d.addCallback(lambda stream: self.makeProducer(request, stream)) elif request.args['name'][0] in self._api.waiting_on.keys(): request.redirect(UI_ADDRESS + "/?watch=" + request.args['name'][0]) request.finish() @@ -356,11 +332,13 @@ class HostedEncryptedFile(resource.Resource): request.finish() return server.NOT_DONE_YET - # def _responseFailed(self, err, call): - # call.addErrback(lambda err: err.trap(error.ConnectionDone)) - # call.addErrback(lambda err: err.trap(defer.CancelledError)) - # call.addErrback(lambda err: log.info("Error: " + str(err))) - # call.cancel() + def _responseFailed(self, err, call): + log.error("Hosted file response failed with error: " + str(err)) + + #call.addErrback(lambda err: err.trap(error.ConnectionDone)) + #call.addErrback(lambda err: err.trap(defer.CancelledError)) + #call.addErrback(lambda err: log.info("Error: " + str(err))) + #call.cancel() class EncryptedFileUpload(resource.Resource): """