diff --git a/.bumpversion.cfg b/.bumpversion.cfg index 810011d23..8b784ab48 100644 --- a/.bumpversion.cfg +++ b/.bumpversion.cfg @@ -1,5 +1,5 @@ [bumpversion] -current_version = 0.6.2 +current_version = 0.6.3 commit = True tag = True diff --git a/lbrynet/__init__.py b/lbrynet/__init__.py index 5089ac8a4..55b0400f2 100644 --- a/lbrynet/__init__.py +++ b/lbrynet/__init__.py @@ -1,6 +1,6 @@ import logging -__version__ = "0.6.2" +__version__ = "0.6.3" version = tuple(__version__.split('.')) logging.getLogger(__name__).addHandler(logging.NullHandler()) \ No newline at end of file diff --git a/lbrynet/lbrynet_daemon/DaemonServer.py b/lbrynet/lbrynet_daemon/DaemonServer.py index 7ddad04ef..c02ff0cc5 100644 --- a/lbrynet/lbrynet_daemon/DaemonServer.py +++ b/lbrynet/lbrynet_daemon/DaemonServer.py @@ -11,11 +11,12 @@ import cgi from appdirs import user_data_dir from twisted.web import server, static, resource -from twisted.internet import defer, interfaces, error, reactor, threads +from twisted.internet import abstract, defer, interfaces, error, reactor, task -from zope.interface import implements +from zope.interface import implementer from lbrynet.lbrynet_daemon.Daemon import Daemon +from lbrynet.lbryfilemanager.EncryptedFileDownloader import ManagedEncryptedFileDownloader from lbrynet.conf import API_ADDRESS, UI_ADDRESS, DEFAULT_UI_BRANCH, LOG_FILE_NAME @@ -231,123 +232,112 @@ class LBRYindex(resource.Resource): return static.File(os.path.join(self.ui_dir, "index.html")).render_GET(request) +@implementer(interfaces.IPushProducer) class EncryptedFileStreamer(object): """ - Writes downloaded LBRY file to request as the download comes in, pausing and resuming as requested - used for Chrome + Writes LBRY stream to request; will pause to wait for new data if the file + is downloading. + + No support for range requests (some browser players can't handle it when + the full video data isn't available on request). """ - implements(interfaces.IPushProducer) + bufferSize = abstract.FileDescriptor.bufferSize + + + # How long to wait between sending blocks (needed because some + # video players freeze up if you try to send data too fast) + stream_interval = 0.005 + + # How long to wait before checking if new data has been appended to the file + new_data_check_interval = 0.25 + + + def __init__(self, request, path, stream, file_manager): + def _set_content_length_header(length): + self._request.setHeader('content-length', length) + return defer.succeed(None) - def __init__(self, request, path, start, stop, size): self._request = request - self._fileObject = file(path) - self._content_type = mimetypes.guess_type(path)[0] - self._stop_pos = size - 1 if stop == '' else int(stop) #chrome and firefox send range requests for "0-" - self._cursor = self._start_pos = int(start) - self._file_size = size - self._depth = 0 + self._file = open(path, 'rb') + self._stream = stream + self._file_manager = file_manager + self._headers_sent = False - self._paused = self._sent_bytes = self._stopped = False - self._delay = 0.25 - self._deferred = defer.succeed(None) + self._running = True - self._request.setResponseCode(206) - self._request.setHeader('accept-ranges', 'bytes') - self._request.setHeader('content-type', self._content_type) + self._request.setResponseCode(200) + self._request.setHeader('accept-ranges', 'none') + self._request.setHeader('content-type', mimetypes.guess_type(path)[0]) self._request.setHeader("Content-Security-Policy", "sandbox") - self.resumeProducing() + self._deferred = stream.get_total_bytes() + self._deferred.addCallback(_set_content_length_header) + self._deferred.addCallback(lambda _: self.resumeProducing()) + + def _check_for_new_data(self): + def _recurse_or_stop(stream_status): + if not self._running: + return + + if stream_status != ManagedEncryptedFileDownloader.STATUS_FINISHED: + self._deferred.addCallback(lambda _: task.deferLater(reactor, self.new_data_check_interval, self._check_for_new_data)) + else: + self.stopProducing() + + if not self._running: + return + + # Clear the file's EOF indicator by seeking to current position + self._file.seek(self._file.tell()) + + data = self._file.read(self.bufferSize) + if data: + self._request.write(data) + if self._running: # .write() can trigger a pause + self._deferred.addCallback(lambda _: task.deferLater(reactor, self.stream_interval, self._check_for_new_data)) + else: + self._deferred.addCallback(lambda _: self._file_manager.get_lbry_file_status(self._stream)) + self._deferred.addCallback(_recurse_or_stop) def pauseProducing(self): - self._paused = True - log.info("Pausing producer") - return defer.succeed(None) + self._running = False def resumeProducing(self): - def _check_for_new_data(): - self._depth += 1 - self._fileObject.seek(self._start_pos, os.SEEK_END) - readable_bytes = self._fileObject.tell() - self._fileObject.seek(self._cursor) - - self._sent_bytes = False - - if (readable_bytes > self._cursor) and not (self._stopped or self._paused): - read_length = min(readable_bytes, self._stop_pos) - self._cursor + 1 - self._request.setHeader('content-range', 'bytes %s-%s/%s' % (self._cursor, self._cursor + read_length - 1, self._file_size)) - self._request.setHeader('content-length', str(read_length)) - start_cur = self._cursor - for i in range(read_length): - if self._paused or self._stopped: - break - else: - data = self._fileObject.read(1) - self._request.write(data) - self._cursor += 1 - - log.info("Wrote range %s-%s/%s, length: %s, readable: %s, depth: %s" % - (start_cur, self._cursor, self._file_size, self._cursor - start_cur, readable_bytes, self._depth)) - self._sent_bytes = True - - if self._cursor == self._stop_pos + 1: - self.stopProducing() - return defer.succeed(None) - elif self._paused or self._stopped: - return defer.succeed(None) - else: - self._deferred.addCallback(lambda _: threads.deferToThread(reactor.callLater, self._delay, _check_for_new_data)) - return defer.succeed(None) - - log.info("Resuming producer") - self._paused = False - self._deferred.addCallback(lambda _: _check_for_new_data()) + self._running = True + self._check_for_new_data() def stopProducing(self): - log.info("Stopping producer") - self._stopped = True - # self._fileObject.close() + self._running = False + self._file.close() self._deferred.addErrback(lambda err: err.trap(defer.CancelledError)) self._deferred.addErrback(lambda err: err.trap(error.ConnectionDone)) self._deferred.cancel() - # self._request.finish() self._request.unregisterProducer() - return defer.succeed(None) + self._request.finish() class HostedEncryptedFile(resource.Resource): def __init__(self, api): self._api = api - self._producer = None resource.Resource.__init__(self) - # todo: fix EncryptedFileStreamer and use it instead of static.File - # def makeProducer(self, request, stream): - # def _save_producer(producer): - # self._producer = producer - # return defer.succeed(None) - # - # range_header = request.getAllHeaders()['range'].replace('bytes=', '').split('-') - # start, stop = int(range_header[0]), range_header[1] - # log.info("GET range %s-%s" % (start, stop)) - # path = os.path.join(self._api.download_directory, stream.file_name) - # - # d = stream.get_total_bytes() - # d.addCallback(lambda size: _save_producer(EncryptedFileStreamer(request, path, start, stop, size))) - # d.addCallback(lambda _: request.registerProducer(self._producer, streaming=True)) - # # request.notifyFinish().addCallback(lambda _: self._producer.stopProducing()) - # request.notifyFinish().addErrback(self._responseFailed, d) - # return d + def _make_stream_producer(self, request, stream): + path = os.path.join(self._api.download_directory, stream.file_name) + + producer = EncryptedFileStreamer(request, path, stream, self._api.lbry_file_manager) + request.registerProducer(producer, streaming=True) + + d = request.notifyFinish() + d.addErrback(self._responseFailed, d) + return d def render_GET(self, request): request.setHeader("Content-Security-Policy", "sandbox") if 'name' in request.args.keys(): if request.args['name'][0] != 'lbry' and request.args['name'][0] not in self._api.waiting_on.keys(): d = self._api._download_name(request.args['name'][0]) - # d.addCallback(lambda stream: self.makeProducer(request, stream)) - d.addCallback(lambda stream: static.File(os.path.join(self._api.download_directory, - stream.file_name)).render_GET(request)) - + d.addCallback(lambda stream: self._make_stream_producer(request, stream)) elif request.args['name'][0] in self._api.waiting_on.keys(): request.redirect(UI_ADDRESS + "/?watch=" + request.args['name'][0]) request.finish() @@ -356,11 +346,11 @@ class HostedEncryptedFile(resource.Resource): request.finish() return server.NOT_DONE_YET - # def _responseFailed(self, err, call): - # call.addErrback(lambda err: err.trap(error.ConnectionDone)) - # call.addErrback(lambda err: err.trap(defer.CancelledError)) - # call.addErrback(lambda err: log.info("Error: " + str(err))) - # call.cancel() + def _responseFailed(self, err, call): + call.addErrback(lambda err: err.trap(error.ConnectionDone)) + call.addErrback(lambda err: err.trap(defer.CancelledError)) + call.addErrback(lambda err: log.info("Error: " + str(err))) + call.cancel() class EncryptedFileUpload(resource.Resource): """ diff --git a/packaging/ubuntu/lbry.desktop b/packaging/ubuntu/lbry.desktop index 48182d854..bffd1eb28 100644 --- a/packaging/ubuntu/lbry.desktop +++ b/packaging/ubuntu/lbry.desktop @@ -1,5 +1,5 @@ [Desktop Entry] -Version=0.6.2 +Version=0.6.3 Name=LBRY Comment=The world's first user-owned content marketplace Icon=lbry