streaming files

still needs work, still has problems in safari and is otherwise slow
This commit is contained in:
Jack 2016-05-01 05:17:59 -04:00
parent 226e9084c9
commit 46368f52f5

View file

@ -2,10 +2,8 @@ import logging
import subprocess import subprocess
import os import os
import shutil import shutil
from twisted.internet.task import LoopingCall
from txjsonrpc.web import jsonrpc
import json import json
import sys
from StringIO import StringIO from StringIO import StringIO
from zipfile import ZipFile from zipfile import ZipFile
@ -13,15 +11,21 @@ from urllib import urlopen
from datetime import datetime from datetime import datetime
from appdirs import user_data_dir from appdirs import user_data_dir
from twisted.web import server, static, resource from twisted.web import server, static, resource
from twisted.internet import defer from twisted.internet import defer, interfaces, error, reactor, task
from twisted.web.static import StaticProducer from twisted.python.failure import Failure
from txjsonrpc.web import jsonrpc
from zope.interface import implements
from lbrynet.lbrynet_daemon.LBRYDaemon import LBRYDaemon from lbrynet.lbrynet_daemon.LBRYDaemon import LBRYDaemon
from lbrynet.conf import API_CONNECTION_STRING, API_ADDRESS, DEFAULT_WALLET, UI_ADDRESS from lbrynet.conf import API_CONNECTION_STRING, API_ADDRESS, DEFAULT_WALLET, UI_ADDRESS
log = logging.getLogger(__name__)
data_dir = user_data_dir("LBRY") if sys.platform != "darwin":
data_dir = os.path.join(os.path.expanduser("~"), ".lbrynet")
else:
data_dir = user_data_dir("LBRY")
if not os.path.isdir(data_dir): if not os.path.isdir(data_dir):
os.mkdir(data_dir) os.mkdir(data_dir)
version_dir = os.path.join(data_dir, "ui_version_history") version_dir = os.path.join(data_dir, "ui_version_history")
@ -31,6 +35,9 @@ if not os.path.isdir(version_dir):
version_log = logging.getLogger("lbry_version") version_log = logging.getLogger("lbry_version")
version_log.addHandler(logging.FileHandler(os.path.join(version_dir, "lbry_version.log"))) version_log.addHandler(logging.FileHandler(os.path.join(version_dir, "lbry_version.log")))
version_log.setLevel(logging.INFO) version_log.setLevel(logging.INFO)
log = logging.getLogger(__name__)
log.addHandler(logging.FileHandler(os.path.join(data_dir, 'lbrynet-daemon.log')))
log.setLevel(logging.INFO)
class LBRYindex(resource.Resource): class LBRYindex(resource.Resource):
@ -53,82 +60,103 @@ class LBRYindex(resource.Resource):
return static.File(os.path.join(self.ui_dir, "index.html")).render_GET(request) return static.File(os.path.join(self.ui_dir, "index.html")).render_GET(request)
class LBRYFileProducer(StaticProducer): class LBRYFileStreamer(object):
def __init__(self, request, lbry_stream, api): """
self._api = api Writes downloaded LBRY file to request as the download comes in, pausing and resuming as requested
self.stream = lbry_stream used for Chrome
self.updater = LoopingCall(self._check_for_data) """
self.total_bytes = 0
if lbry_stream.file_written_to:
file_name = lbry_stream.file_written_to
else:
file_name = os.path.join(self._api.download_directory, lbry_stream.file_name)
StaticProducer.__init__(self, request, fileObject=file(file_name))
def start(self): implements(interfaces.IPushProducer)
d = self._set_size()
self.fileObject.seek(0)
self.updater.start(1)
def _set_size(self): def __init__(self, request, path, start, stop, size):
def _set(size): self._request = request
self.request.setHeader('content-length', str(size)) self._fileObject = file(path)
self.request.setHeader('content-type', 'application/octet-stream') self._stop_pos = size if stop == '' else int(stop) #chrome and firefox send range requests for "0-"
return defer.succeed(None) self._cursor = self._start_pos = int(start)
self._file_size = size
d = self.stream.get_total_bytes() self._paused = self._sent_bytes = False
d.addCallback(_set) self._delay = 0.1
return d
def _check_for_data(self): self._request.setResponseCode(206)
def _write_new_data_to_request(): self._request.setHeader('accept-ranges', 'bytes')
self.fileObject.seek(self.fileObject.tell()) self._request.setHeader('content-type', 'application/octet-stream')
data = self.fileObject.read() self._request.setHeader('content-range', 'bytes %s-%s/%s' % (self._start_pos, self._stop_pos, self._file_size))
self.total_bytes += len(data)
log.info(str(self.total_bytes))
if data: self.resumeProducing()
self.request.write(data)
return defer.succeed(None)
def _check_status(stream_status): def pauseProducing(self):
if stream_status.running_status == "completed": self._paused = True
self.stopProducing() log.info("[" + str(datetime.now()) + "] Pausing producer")
return defer.succeed(None)
return defer.succeed(None)
d = _write_new_data_to_request()
d.addCallback(lambda _: self.stream.status())
d.addCallback(_check_status)
def resumeProducing(self): def resumeProducing(self):
self.updater.start(1) def _check_for_new_data():
self._fileObject.seek(self._start_pos, os.SEEK_END)
readable_bytes = self._fileObject.tell()
self._fileObject.seek(self._cursor)
self._sent_bytes = False
if readable_bytes > self._cursor:
read_length = min(readable_bytes, self._stop_pos) - self._cursor
log.info('Writing range %s-%s/%s' % (self._cursor, self._cursor + read_length, self._file_size))
self._request.setHeader('content-range', 'bytes %s-%s/%s' % (self._cursor, self._cursor + read_length, self._file_size))
for i in range(read_length + 1):
if self._paused:
break
else:
data = self._fileObject.read(1)
self._request.write(data)
self._cursor += 1
self._sent_bytes = True
return defer.succeed(None)
def _write_reply():
if self._cursor == self._stop_pos + 1:
self.stopProducing()
return defer.succeed(None)
elif self._paused:
return defer.succeed(None)
else:
d = task.deferLater(reactor, self._delay, _check_for_new_data)
d.addCallback(lambda _: _write_reply())
return d
log.info("[" + str(datetime.now()) + "] Resuming producer")
self._paused = False
_write_reply()
return defer.succeed(None)
def stopProducing(self): def stopProducing(self):
self.updater.stop() log.info("Stopping producer")
self.fileObject.close() # self._fileObject.close()
self.stream.stop() self._request.finish()
self.request.finish() self._request.unregisterProducer()
class HostedLBRYFile(resource.Resource): class HostedLBRYFile(resource.Resource):
def __init__(self, api): def __init__(self, api):
self._api = api self._api = api
self.stream = None
self.streaming_file = None
self.producer = None
resource.Resource.__init__(self) resource.Resource.__init__(self)
def makeProducer(self, request, stream): def makeProducer(self, request, stream):
self.producer = LBRYFileProducer(request, stream, self._api) range_header = request.getAllHeaders()['range'].replace('bytes=', '').split('-')
return self.producer start, stop = int(range_header[0]), range_header[1]
path = os.path.join(self._api.download_directory, stream.file_name)
d = stream.get_total_bytes()
d.addCallback(lambda size: request.registerProducer(LBRYFileStreamer(request, path, start, stop, size), streaming=True))
return d
def render_GET(self, request): def render_GET(self, request):
if 'name' in request.args.keys(): if 'name' in request.args.keys():
if request.args['name'][0] != 'lbry' and request.args['name'][0] not in self._api.waiting_on.keys(): if request.args['name'][0] != 'lbry' and request.args['name'][0] not in self._api.waiting_on.keys():
d = self._api._download_name(request.args['name'][0]) d = self._api._download_name(request.args['name'][0])
d.addCallback(lambda stream: self.makeProducer(request, stream)) d.addCallback(lambda stream: self.makeProducer(request, stream))
d.addCallback(lambda producer: producer.start()) request.notifyFinish().addErrback(self._responseFailed, d)
elif request.args['name'][0] in self._api.waiting_on.keys(): elif request.args['name'][0] in self._api.waiting_on.keys():
request.redirect(UI_ADDRESS + "/?watch=" + request.args['name'][0]) request.redirect(UI_ADDRESS + "/?watch=" + request.args['name'][0])
request.finish() request.finish()
@ -137,24 +165,55 @@ class HostedLBRYFile(resource.Resource):
request.finish() request.finish()
return server.NOT_DONE_YET return server.NOT_DONE_YET
def _responseFailed(self, err, call):
call.addErrback(lambda err: err.trap(error.ConnectionDone))
call.addErrback(lambda err: err.trap(defer.CancelledError))
call.addErrback(lambda err: log.info("Error: " + str(err)))
call.cancel()
class LBRYFileRender(resource.Resource):
class MyLBRYFiles(resource.Resource):
isLeaf = False isLeaf = False
def __init__(self):
resource.Resource.__init__(self)
self.files_table = None
def delayed_render(self, request, result):
request.write(result.encode('utf-8'))
request.finish()
def render_GET(self, request): def render_GET(self, request):
if 'name' in request.args.keys(): self.files_table = None
api = jsonrpc.Proxy(API_CONNECTION_STRING) api = jsonrpc.Proxy(API_CONNECTION_STRING)
if request.args['name'][0] != 'lbry': d = api.callRemote("get_lbry_files", {})
d = api.callRemote("get", {'name': request.args['name'][0]}) d.addCallback(self._get_table)
d.addCallback(lambda results: static.File(results['path'], defaultType='video/octet-stream')) d.addCallback(lambda results: self.delayed_render(request, results))
d.addCallback(lambda static_file: static_file.render_GET(request) if static_file.getFileSize() > 0
else server.failure) return server.NOT_DONE_YET
else:
request.redirect(UI_ADDRESS) def _get_table(self, files):
request.finish() if not self.files_table:
return server.NOT_DONE_YET self.files_table = r'<html><head><title>My LBRY files</title></head><body><table border="1">'
self.files_table += r'<tr>'
self.files_table += r'<td>Stream name</td>'
self.files_table += r'<td>Completed</td>'
self.files_table += r'<td>Toggle</td>'
self.files_table += r'<td>Remove</td>'
self.files_table += r'</tr>'
return self._get_table(files)
if not len(files):
self.files_table += r'</table></body></html>'
return self.files_table
else: else:
return server.failure f = files.pop()
self.files_table += r'<tr>'
self.files_table += r'<td>%s</td>' % (f['stream_name'])
self.files_table += r'<td>%s</td>' % (f['completed'])
self.files_table += r'<td>Start</td>' if f['stopped'] else r'<td>Stop</td>'
self.files_table += r'<td>Delete</td>'
self.files_table += r'</tr>'
return self._get_table(files)
class LBRYDaemonServer(object): class LBRYDaemonServer(object):
@ -267,8 +326,8 @@ class LBRYDaemonServer(object):
self.root.putChild("font", static.File(os.path.join(self.ui_dir, "font"))) self.root.putChild("font", static.File(os.path.join(self.ui_dir, "font")))
self.root.putChild("img", static.File(os.path.join(self.ui_dir, "img"))) self.root.putChild("img", static.File(os.path.join(self.ui_dir, "img")))
self.root.putChild("js", static.File(os.path.join(self.ui_dir, "js"))) self.root.putChild("js", static.File(os.path.join(self.ui_dir, "js")))
# self.root.putChild("view", LBRYFileRender())
self.root.putChild("view", HostedLBRYFile(self._api)) self.root.putChild("view", HostedLBRYFile(self._api))
self.root.putChild("files", MyLBRYFiles())
self.root.putChild(API_ADDRESS, self._api) self.root.putChild(API_ADDRESS, self._api)
return defer.succeed(True) return defer.succeed(True)