From 46368f52f5fd2f4496f71cdf0675321b5a9be79a Mon Sep 17 00:00:00 2001 From: Jack Date: Sun, 1 May 2016 05:17:59 -0400 Subject: [PATCH] streaming files still needs work, still has problems in safari and is otherwise slow --- lbrynet/lbrynet_daemon/LBRYDaemonServer.py | 205 +++++++++++++-------- 1 file changed, 132 insertions(+), 73 deletions(-) diff --git a/lbrynet/lbrynet_daemon/LBRYDaemonServer.py b/lbrynet/lbrynet_daemon/LBRYDaemonServer.py index 740a1bd42..b6d4942cf 100644 --- a/lbrynet/lbrynet_daemon/LBRYDaemonServer.py +++ b/lbrynet/lbrynet_daemon/LBRYDaemonServer.py @@ -2,10 +2,8 @@ import logging import subprocess import os import shutil - -from twisted.internet.task import LoopingCall -from txjsonrpc.web import jsonrpc import json +import sys from StringIO import StringIO from zipfile import ZipFile @@ -13,15 +11,21 @@ from urllib import urlopen from datetime import datetime from appdirs import user_data_dir from twisted.web import server, static, resource -from twisted.internet import defer -from twisted.web.static import StaticProducer +from twisted.internet import defer, interfaces, error, reactor, task +from twisted.python.failure import Failure +from txjsonrpc.web import jsonrpc + +from zope.interface import implements from lbrynet.lbrynet_daemon.LBRYDaemon import LBRYDaemon from lbrynet.conf import API_CONNECTION_STRING, API_ADDRESS, DEFAULT_WALLET, UI_ADDRESS -log = logging.getLogger(__name__) -data_dir = user_data_dir("LBRY") +if sys.platform != "darwin": + data_dir = os.path.join(os.path.expanduser("~"), ".lbrynet") +else: + data_dir = user_data_dir("LBRY") + if not os.path.isdir(data_dir): os.mkdir(data_dir) version_dir = os.path.join(data_dir, "ui_version_history") @@ -31,6 +35,9 @@ if not os.path.isdir(version_dir): version_log = logging.getLogger("lbry_version") version_log.addHandler(logging.FileHandler(os.path.join(version_dir, "lbry_version.log"))) version_log.setLevel(logging.INFO) +log = logging.getLogger(__name__) +log.addHandler(logging.FileHandler(os.path.join(data_dir, 'lbrynet-daemon.log'))) +log.setLevel(logging.INFO) class LBRYindex(resource.Resource): @@ -53,82 +60,103 @@ class LBRYindex(resource.Resource): return static.File(os.path.join(self.ui_dir, "index.html")).render_GET(request) -class LBRYFileProducer(StaticProducer): - def __init__(self, request, lbry_stream, api): - self._api = api - self.stream = lbry_stream - self.updater = LoopingCall(self._check_for_data) - self.total_bytes = 0 - if lbry_stream.file_written_to: - file_name = lbry_stream.file_written_to - else: - file_name = os.path.join(self._api.download_directory, lbry_stream.file_name) - StaticProducer.__init__(self, request, fileObject=file(file_name)) +class LBRYFileStreamer(object): + """ + Writes downloaded LBRY file to request as the download comes in, pausing and resuming as requested + used for Chrome + """ - def start(self): - d = self._set_size() - self.fileObject.seek(0) - self.updater.start(1) + implements(interfaces.IPushProducer) - def _set_size(self): - def _set(size): - self.request.setHeader('content-length', str(size)) - self.request.setHeader('content-type', 'application/octet-stream') - return defer.succeed(None) + def __init__(self, request, path, start, stop, size): + self._request = request + self._fileObject = file(path) + self._stop_pos = size if stop == '' else int(stop) #chrome and firefox send range requests for "0-" + self._cursor = self._start_pos = int(start) + self._file_size = size - d = self.stream.get_total_bytes() - d.addCallback(_set) - return d + self._paused = self._sent_bytes = False + self._delay = 0.1 - def _check_for_data(self): - def _write_new_data_to_request(): - self.fileObject.seek(self.fileObject.tell()) - data = self.fileObject.read() - self.total_bytes += len(data) - log.info(str(self.total_bytes)) + self._request.setResponseCode(206) + self._request.setHeader('accept-ranges', 'bytes') + self._request.setHeader('content-type', 'application/octet-stream') + self._request.setHeader('content-range', 'bytes %s-%s/%s' % (self._start_pos, self._stop_pos, self._file_size)) - if data: - self.request.write(data) - return defer.succeed(None) + self.resumeProducing() - def _check_status(stream_status): - if stream_status.running_status == "completed": - self.stopProducing() - - return defer.succeed(None) - - d = _write_new_data_to_request() - d.addCallback(lambda _: self.stream.status()) - d.addCallback(_check_status) + def pauseProducing(self): + self._paused = True + log.info("[" + str(datetime.now()) + "] Pausing producer") + return defer.succeed(None) def resumeProducing(self): - self.updater.start(1) + def _check_for_new_data(): + self._fileObject.seek(self._start_pos, os.SEEK_END) + readable_bytes = self._fileObject.tell() + self._fileObject.seek(self._cursor) + + self._sent_bytes = False + + if readable_bytes > self._cursor: + read_length = min(readable_bytes, self._stop_pos) - self._cursor + log.info('Writing range %s-%s/%s' % (self._cursor, self._cursor + read_length, self._file_size)) + self._request.setHeader('content-range', 'bytes %s-%s/%s' % (self._cursor, self._cursor + read_length, self._file_size)) + for i in range(read_length + 1): + if self._paused: + break + else: + data = self._fileObject.read(1) + self._request.write(data) + self._cursor += 1 + self._sent_bytes = True + return defer.succeed(None) + + def _write_reply(): + if self._cursor == self._stop_pos + 1: + self.stopProducing() + return defer.succeed(None) + elif self._paused: + return defer.succeed(None) + else: + d = task.deferLater(reactor, self._delay, _check_for_new_data) + d.addCallback(lambda _: _write_reply()) + return d + + log.info("[" + str(datetime.now()) + "] Resuming producer") + + self._paused = False + _write_reply() + return defer.succeed(None) def stopProducing(self): - self.updater.stop() - self.fileObject.close() - self.stream.stop() - self.request.finish() + log.info("Stopping producer") + # self._fileObject.close() + self._request.finish() + self._request.unregisterProducer() class HostedLBRYFile(resource.Resource): def __init__(self, api): self._api = api - self.stream = None - self.streaming_file = None - self.producer = None resource.Resource.__init__(self) def makeProducer(self, request, stream): - self.producer = LBRYFileProducer(request, stream, self._api) - return self.producer + range_header = request.getAllHeaders()['range'].replace('bytes=', '').split('-') + start, stop = int(range_header[0]), range_header[1] + path = os.path.join(self._api.download_directory, stream.file_name) + + d = stream.get_total_bytes() + d.addCallback(lambda size: request.registerProducer(LBRYFileStreamer(request, path, start, stop, size), streaming=True)) + + return d def render_GET(self, request): if 'name' in request.args.keys(): if request.args['name'][0] != 'lbry' and request.args['name'][0] not in self._api.waiting_on.keys(): d = self._api._download_name(request.args['name'][0]) d.addCallback(lambda stream: self.makeProducer(request, stream)) - d.addCallback(lambda producer: producer.start()) + request.notifyFinish().addErrback(self._responseFailed, d) elif request.args['name'][0] in self._api.waiting_on.keys(): request.redirect(UI_ADDRESS + "/?watch=" + request.args['name'][0]) request.finish() @@ -137,24 +165,55 @@ class HostedLBRYFile(resource.Resource): request.finish() return server.NOT_DONE_YET + def _responseFailed(self, err, call): + call.addErrback(lambda err: err.trap(error.ConnectionDone)) + call.addErrback(lambda err: err.trap(defer.CancelledError)) + call.addErrback(lambda err: log.info("Error: " + str(err))) + call.cancel() -class LBRYFileRender(resource.Resource): + +class MyLBRYFiles(resource.Resource): isLeaf = False + def __init__(self): + resource.Resource.__init__(self) + self.files_table = None + + def delayed_render(self, request, result): + request.write(result.encode('utf-8')) + request.finish() + def render_GET(self, request): - if 'name' in request.args.keys(): - api = jsonrpc.Proxy(API_CONNECTION_STRING) - if request.args['name'][0] != 'lbry': - d = api.callRemote("get", {'name': request.args['name'][0]}) - d.addCallback(lambda results: static.File(results['path'], defaultType='video/octet-stream')) - d.addCallback(lambda static_file: static_file.render_GET(request) if static_file.getFileSize() > 0 - else server.failure) - else: - request.redirect(UI_ADDRESS) - request.finish() - return server.NOT_DONE_YET + self.files_table = None + api = jsonrpc.Proxy(API_CONNECTION_STRING) + d = api.callRemote("get_lbry_files", {}) + d.addCallback(self._get_table) + d.addCallback(lambda results: self.delayed_render(request, results)) + + return server.NOT_DONE_YET + + def _get_table(self, files): + if not self.files_table: + self.files_table = r'My LBRY files' + self.files_table += r'' + self.files_table += r'' + self.files_table += r'' + self.files_table += r'' + self.files_table += r'' + self.files_table += r'' + return self._get_table(files) + if not len(files): + self.files_table += r'
Stream nameCompletedToggleRemove
' + return self.files_table else: - return server.failure + f = files.pop() + self.files_table += r'' + self.files_table += r'%s' % (f['stream_name']) + self.files_table += r'%s' % (f['completed']) + self.files_table += r'Start' if f['stopped'] else r'Stop' + self.files_table += r'Delete' + self.files_table += r'' + return self._get_table(files) class LBRYDaemonServer(object): @@ -267,8 +326,8 @@ class LBRYDaemonServer(object): self.root.putChild("font", static.File(os.path.join(self.ui_dir, "font"))) self.root.putChild("img", static.File(os.path.join(self.ui_dir, "img"))) self.root.putChild("js", static.File(os.path.join(self.ui_dir, "js"))) - # self.root.putChild("view", LBRYFileRender()) self.root.putChild("view", HostedLBRYFile(self._api)) + self.root.putChild("files", MyLBRYFiles()) self.root.putChild(API_ADDRESS, self._api) return defer.succeed(True)