forked from LBRYCommunity/lbry-sdk
refactor resources and file streamer into their own files, use NoCacheStaticFile instead of static.File
This commit is contained in:
parent
3ad4ad50ec
commit
fe7ea7c679
7 changed files with 485 additions and 376 deletions
|
@ -7,6 +7,7 @@
|
||||||
# The docstrings in this module contain epytext markup; API documentation
|
# The docstrings in this module contain epytext markup; API documentation
|
||||||
# may be created by processing this file with epydoc: http://epydoc.sf.net
|
# may be created by processing this file with epydoc: http://epydoc.sf.net
|
||||||
|
|
||||||
|
import logging
|
||||||
import binascii
|
import binascii
|
||||||
import time
|
import time
|
||||||
|
|
||||||
|
@ -21,6 +22,8 @@ import msgformat
|
||||||
from contact import Contact
|
from contact import Contact
|
||||||
|
|
||||||
reactor = twisted.internet.reactor
|
reactor = twisted.internet.reactor
|
||||||
|
log = logging.getLogger(__name__)
|
||||||
|
|
||||||
|
|
||||||
class TimeoutError(Exception):
|
class TimeoutError(Exception):
|
||||||
""" Raised when a RPC times out """
|
""" Raised when a RPC times out """
|
||||||
|
@ -118,6 +121,9 @@ class KademliaProtocol(protocol.DatagramProtocol):
|
||||||
except encoding.DecodeError:
|
except encoding.DecodeError:
|
||||||
# We received some rubbish here
|
# We received some rubbish here
|
||||||
return
|
return
|
||||||
|
except IndexError:
|
||||||
|
log.warning("Couldn't decode dht datagram from %s", address)
|
||||||
|
return
|
||||||
|
|
||||||
message = self._translator.fromPrimitive(msgPrimitive)
|
message = self._translator.fromPrimitive(msgPrimitive)
|
||||||
remoteContact = Contact(message.nodeID, address[0], address[1], self)
|
remoteContact = Contact(message.nodeID, address[0], address[1], self)
|
||||||
|
|
|
@ -12,7 +12,8 @@ from twisted.internet import reactor, defer
|
||||||
from jsonrpc.proxy import JSONRPCProxy
|
from jsonrpc.proxy import JSONRPCProxy
|
||||||
|
|
||||||
from lbrynet.core import log_support
|
from lbrynet.core import log_support
|
||||||
from lbrynet.lbrynet_daemon.DaemonServer import DaemonServer, DaemonRequest
|
from lbrynet.lbrynet_daemon.DaemonServer import DaemonServer
|
||||||
|
from lbrynet.lbrynet_daemon.DaemonRequest import DaemonRequest
|
||||||
from lbrynet.conf import API_CONNECTION_STRING, API_INTERFACE, API_PORT, \
|
from lbrynet.conf import API_CONNECTION_STRING, API_INTERFACE, API_PORT, \
|
||||||
UI_ADDRESS, DEFAULT_UI_BRANCH, LOG_FILE_NAME
|
UI_ADDRESS, DEFAULT_UI_BRANCH, LOG_FILE_NAME
|
||||||
|
|
||||||
|
|
187
lbrynet/lbrynet_daemon/DaemonRequest.py
Normal file
187
lbrynet/lbrynet_daemon/DaemonRequest.py
Normal file
|
@ -0,0 +1,187 @@
|
||||||
|
import time
|
||||||
|
import cgi
|
||||||
|
import mimetools
|
||||||
|
import os
|
||||||
|
import tempfile
|
||||||
|
from twisted.web import server
|
||||||
|
|
||||||
|
|
||||||
|
class DaemonRequest(server.Request):
|
||||||
|
"""
|
||||||
|
For LBRY specific request functionality. Currently just provides
|
||||||
|
handling for large multipart POST requests, taken from here:
|
||||||
|
http://sammitch.ca/2013/07/handling-large-requests-in-twisted/
|
||||||
|
|
||||||
|
For multipart POST requests, this populates self.args with temp
|
||||||
|
file objects instead of strings. Note that these files don't auto-delete
|
||||||
|
on close because we want to be able to move and rename them.
|
||||||
|
|
||||||
|
"""
|
||||||
|
|
||||||
|
# max amount of memory to allow any ~single~ request argument [ie: POSTed file]
|
||||||
|
# note: this value seems to be taken with a grain of salt, memory usage may spike
|
||||||
|
# FAR above this value in some cases.
|
||||||
|
# eg: set the memory limit to 5 MB, write 2 blocks of 4MB, mem usage will
|
||||||
|
# have spiked to 8MB before the data is rolled to disk after the
|
||||||
|
# second write completes.
|
||||||
|
memorylimit = 1024*1024*100
|
||||||
|
|
||||||
|
# enable/disable debug logging
|
||||||
|
do_log = False
|
||||||
|
|
||||||
|
# re-defined only for debug/logging purposes
|
||||||
|
def gotLength(self, length):
|
||||||
|
if self.do_log:
|
||||||
|
print '%f Headers received, Content-Length: %d' % (time.time(), length)
|
||||||
|
server.Request.gotLength(self, length)
|
||||||
|
|
||||||
|
# re-definition of twisted.web.server.Request.requestreceived, the only difference
|
||||||
|
# is that self.parse_multipart() is used rather than cgi.parse_multipart()
|
||||||
|
def requestReceived(self, command, path, version):
|
||||||
|
from twisted.web.http import parse_qs
|
||||||
|
if self.do_log:
|
||||||
|
print '%f Request Received' % time.time()
|
||||||
|
|
||||||
|
self.content.seek(0,0)
|
||||||
|
self.args = {}
|
||||||
|
self.stack = []
|
||||||
|
|
||||||
|
self.method, self.uri = command, path
|
||||||
|
self.clientproto = version
|
||||||
|
x = self.uri.split(b'?', 1)
|
||||||
|
|
||||||
|
if len(x) == 1:
|
||||||
|
self.path = self.uri
|
||||||
|
else:
|
||||||
|
self.path, argstring = x
|
||||||
|
self.args = parse_qs(argstring, 1)
|
||||||
|
|
||||||
|
# cache the client and server information, we'll need this later to be
|
||||||
|
# serialized and sent with the request so CGIs will work remotely
|
||||||
|
self.client = self.channel.transport.getPeer()
|
||||||
|
self.host = self.channel.transport.getHost()
|
||||||
|
|
||||||
|
# Argument processing
|
||||||
|
args = self.args
|
||||||
|
ctype = self.requestHeaders.getRawHeaders(b'content-type')
|
||||||
|
if ctype is not None:
|
||||||
|
ctype = ctype[0]
|
||||||
|
|
||||||
|
if self.method == b"POST" and ctype:
|
||||||
|
mfd = b'multipart/form-data'
|
||||||
|
key, pdict = cgi.parse_header(ctype)
|
||||||
|
if key == b'application/x-www-form-urlencoded':
|
||||||
|
args.update(parse_qs(self.content.read(), 1))
|
||||||
|
elif key == mfd:
|
||||||
|
try:
|
||||||
|
self.content.seek(0,0)
|
||||||
|
args.update(self.parse_multipart(self.content, pdict))
|
||||||
|
#args.update(cgi.parse_multipart(self.content, pdict))
|
||||||
|
|
||||||
|
except KeyError as e:
|
||||||
|
if e.args[0] == b'content-disposition':
|
||||||
|
# Parse_multipart can't cope with missing
|
||||||
|
# content-dispostion headers in multipart/form-data
|
||||||
|
# parts, so we catch the exception and tell the client
|
||||||
|
# it was a bad request.
|
||||||
|
self.channel.transport.write(
|
||||||
|
b"HTTP/1.1 400 Bad Request\r\n\r\n")
|
||||||
|
self.channel.transport.loseConnection()
|
||||||
|
return
|
||||||
|
raise
|
||||||
|
|
||||||
|
self.content.seek(0, 0)
|
||||||
|
|
||||||
|
self.process()
|
||||||
|
|
||||||
|
# re-definition of cgi.parse_multipart that uses a single temporary file to store
|
||||||
|
# data rather than storing 2 to 3 copies in various lists.
|
||||||
|
def parse_multipart(self, fp, pdict):
|
||||||
|
if self.do_log:
|
||||||
|
print '%f Parsing Multipart data: ' % time.time()
|
||||||
|
rewind = fp.tell() #save cursor
|
||||||
|
fp.seek(0,0) #reset cursor
|
||||||
|
|
||||||
|
boundary = ""
|
||||||
|
if 'boundary' in pdict:
|
||||||
|
boundary = pdict['boundary']
|
||||||
|
if not cgi.valid_boundary(boundary):
|
||||||
|
raise ValueError, ('Invalid boundary in multipart form: %r'
|
||||||
|
% (boundary,))
|
||||||
|
|
||||||
|
nextpart = "--" + boundary
|
||||||
|
lastpart = "--" + boundary + "--"
|
||||||
|
partdict = {}
|
||||||
|
terminator = ""
|
||||||
|
|
||||||
|
while terminator != lastpart:
|
||||||
|
c_bytes = -1
|
||||||
|
|
||||||
|
data = tempfile.NamedTemporaryFile(delete=False)
|
||||||
|
if terminator:
|
||||||
|
# At start of next part. Read headers first.
|
||||||
|
headers = mimetools.Message(fp)
|
||||||
|
clength = headers.getheader('content-length')
|
||||||
|
if clength:
|
||||||
|
try:
|
||||||
|
c_bytes = int(clength)
|
||||||
|
except ValueError:
|
||||||
|
pass
|
||||||
|
if c_bytes > 0:
|
||||||
|
data.write(fp.read(c_bytes))
|
||||||
|
# Read lines until end of part.
|
||||||
|
while 1:
|
||||||
|
line = fp.readline()
|
||||||
|
if not line:
|
||||||
|
terminator = lastpart # End outer loop
|
||||||
|
break
|
||||||
|
if line[:2] == "--":
|
||||||
|
terminator = line.strip()
|
||||||
|
if terminator in (nextpart, lastpart):
|
||||||
|
break
|
||||||
|
data.write(line)
|
||||||
|
# Done with part.
|
||||||
|
if data.tell() == 0:
|
||||||
|
continue
|
||||||
|
if c_bytes < 0:
|
||||||
|
# if a Content-Length header was not supplied with the MIME part
|
||||||
|
# then the trailing line break must be removed.
|
||||||
|
# we have data, read the last 2 bytes
|
||||||
|
rewind = min(2, data.tell())
|
||||||
|
data.seek(-rewind, os.SEEK_END)
|
||||||
|
line = data.read(2)
|
||||||
|
if line[-2:] == "\r\n":
|
||||||
|
data.seek(-2, os.SEEK_END)
|
||||||
|
data.truncate()
|
||||||
|
elif line[-1:] == "\n":
|
||||||
|
data.seek(-1, os.SEEK_END)
|
||||||
|
data.truncate()
|
||||||
|
|
||||||
|
line = headers['content-disposition']
|
||||||
|
if not line:
|
||||||
|
continue
|
||||||
|
key, params = cgi.parse_header(line)
|
||||||
|
if key != 'form-data':
|
||||||
|
continue
|
||||||
|
if 'name' in params:
|
||||||
|
name = params['name']
|
||||||
|
# kludge in the filename
|
||||||
|
if 'filename' in params:
|
||||||
|
fname_index = name + '_filename'
|
||||||
|
if fname_index in partdict:
|
||||||
|
partdict[fname_index].append(params['filename'])
|
||||||
|
else:
|
||||||
|
partdict[fname_index] = [params['filename']]
|
||||||
|
else:
|
||||||
|
# Unnamed parts are not returned at all.
|
||||||
|
continue
|
||||||
|
data.seek(0,0)
|
||||||
|
if name in partdict:
|
||||||
|
partdict[name].append(data)
|
||||||
|
else:
|
||||||
|
partdict[name] = [data]
|
||||||
|
|
||||||
|
fp.seek(rewind) # Restore cursor
|
||||||
|
return partdict
|
||||||
|
|
||||||
|
|
|
@ -1,23 +1,12 @@
|
||||||
import logging
|
import logging
|
||||||
import os
|
import os
|
||||||
import shutil
|
|
||||||
import json
|
|
||||||
import sys
|
import sys
|
||||||
import mimetypes
|
|
||||||
import mimetools
|
|
||||||
import tempfile
|
|
||||||
import time
|
|
||||||
import cgi
|
|
||||||
|
|
||||||
from appdirs import user_data_dir
|
from appdirs import user_data_dir
|
||||||
from twisted.web import server, static, resource
|
from twisted.internet import defer
|
||||||
from twisted.internet import abstract, defer, interfaces, error, reactor, task
|
|
||||||
|
|
||||||
from zope.interface import implementer
|
|
||||||
|
|
||||||
from lbrynet.lbrynet_daemon.Daemon import Daemon
|
from lbrynet.lbrynet_daemon.Daemon import Daemon
|
||||||
from lbrynet.lbryfilemanager.EncryptedFileDownloader import ManagedEncryptedFileDownloader
|
from lbrynet.lbrynet_daemon.Resources import LBRYindex, HostedEncryptedFile, EncryptedFileUpload
|
||||||
from lbrynet.conf import API_ADDRESS, UI_ADDRESS, DEFAULT_UI_BRANCH, LOG_FILE_NAME
|
from lbrynet.conf import API_ADDRESS, DEFAULT_UI_BRANCH, LOG_FILE_NAME
|
||||||
|
|
||||||
|
|
||||||
# TODO: omg, this code is essentially duplicated in Daemon
|
# TODO: omg, this code is essentially duplicated in Daemon
|
||||||
|
@ -32,365 +21,6 @@ lbrynet_log = os.path.join(data_dir, LOG_FILE_NAME)
|
||||||
log = logging.getLogger(__name__)
|
log = logging.getLogger(__name__)
|
||||||
|
|
||||||
|
|
||||||
class DaemonRequest(server.Request):
|
|
||||||
"""
|
|
||||||
For LBRY specific request functionality. Currently just provides
|
|
||||||
handling for large multipart POST requests, taken from here:
|
|
||||||
http://sammitch.ca/2013/07/handling-large-requests-in-twisted/
|
|
||||||
|
|
||||||
For multipart POST requests, this populates self.args with temp
|
|
||||||
file objects instead of strings. Note that these files don't auto-delete
|
|
||||||
on close because we want to be able to move and rename them.
|
|
||||||
|
|
||||||
"""
|
|
||||||
|
|
||||||
# max amount of memory to allow any ~single~ request argument [ie: POSTed file]
|
|
||||||
# note: this value seems to be taken with a grain of salt, memory usage may spike
|
|
||||||
# FAR above this value in some cases.
|
|
||||||
# eg: set the memory limit to 5 MB, write 2 blocks of 4MB, mem usage will
|
|
||||||
# have spiked to 8MB before the data is rolled to disk after the
|
|
||||||
# second write completes.
|
|
||||||
memorylimit = 1024*1024*100
|
|
||||||
|
|
||||||
# enable/disable debug logging
|
|
||||||
do_log = False
|
|
||||||
|
|
||||||
# re-defined only for debug/logging purposes
|
|
||||||
def gotLength(self, length):
|
|
||||||
if self.do_log:
|
|
||||||
print '%f Headers received, Content-Length: %d' % (time.time(), length)
|
|
||||||
server.Request.gotLength(self, length)
|
|
||||||
|
|
||||||
# re-definition of twisted.web.server.Request.requestreceived, the only difference
|
|
||||||
# is that self.parse_multipart() is used rather than cgi.parse_multipart()
|
|
||||||
def requestReceived(self, command, path, version):
|
|
||||||
from twisted.web.http import parse_qs
|
|
||||||
if self.do_log:
|
|
||||||
print '%f Request Received' % time.time()
|
|
||||||
|
|
||||||
self.content.seek(0,0)
|
|
||||||
self.args = {}
|
|
||||||
self.stack = []
|
|
||||||
|
|
||||||
self.method, self.uri = command, path
|
|
||||||
self.clientproto = version
|
|
||||||
x = self.uri.split(b'?', 1)
|
|
||||||
|
|
||||||
if len(x) == 1:
|
|
||||||
self.path = self.uri
|
|
||||||
else:
|
|
||||||
self.path, argstring = x
|
|
||||||
self.args = parse_qs(argstring, 1)
|
|
||||||
|
|
||||||
# cache the client and server information, we'll need this later to be
|
|
||||||
# serialized and sent with the request so CGIs will work remotely
|
|
||||||
self.client = self.channel.transport.getPeer()
|
|
||||||
self.host = self.channel.transport.getHost()
|
|
||||||
|
|
||||||
# Argument processing
|
|
||||||
args = self.args
|
|
||||||
ctype = self.requestHeaders.getRawHeaders(b'content-type')
|
|
||||||
if ctype is not None:
|
|
||||||
ctype = ctype[0]
|
|
||||||
|
|
||||||
if self.method == b"POST" and ctype:
|
|
||||||
mfd = b'multipart/form-data'
|
|
||||||
key, pdict = cgi.parse_header(ctype)
|
|
||||||
if key == b'application/x-www-form-urlencoded':
|
|
||||||
args.update(parse_qs(self.content.read(), 1))
|
|
||||||
elif key == mfd:
|
|
||||||
try:
|
|
||||||
self.content.seek(0,0)
|
|
||||||
args.update(self.parse_multipart(self.content, pdict))
|
|
||||||
#args.update(cgi.parse_multipart(self.content, pdict))
|
|
||||||
|
|
||||||
except KeyError as e:
|
|
||||||
if e.args[0] == b'content-disposition':
|
|
||||||
# Parse_multipart can't cope with missing
|
|
||||||
# content-dispostion headers in multipart/form-data
|
|
||||||
# parts, so we catch the exception and tell the client
|
|
||||||
# it was a bad request.
|
|
||||||
self.channel.transport.write(
|
|
||||||
b"HTTP/1.1 400 Bad Request\r\n\r\n")
|
|
||||||
self.channel.transport.loseConnection()
|
|
||||||
return
|
|
||||||
raise
|
|
||||||
|
|
||||||
self.content.seek(0, 0)
|
|
||||||
|
|
||||||
self.process()
|
|
||||||
|
|
||||||
# re-definition of cgi.parse_multipart that uses a single temporary file to store
|
|
||||||
# data rather than storing 2 to 3 copies in various lists.
|
|
||||||
def parse_multipart(self, fp, pdict):
|
|
||||||
if self.do_log:
|
|
||||||
print '%f Parsing Multipart data: ' % time.time()
|
|
||||||
rewind = fp.tell() #save cursor
|
|
||||||
fp.seek(0,0) #reset cursor
|
|
||||||
|
|
||||||
boundary = ""
|
|
||||||
if 'boundary' in pdict:
|
|
||||||
boundary = pdict['boundary']
|
|
||||||
if not cgi.valid_boundary(boundary):
|
|
||||||
raise ValueError, ('Invalid boundary in multipart form: %r'
|
|
||||||
% (boundary,))
|
|
||||||
|
|
||||||
nextpart = "--" + boundary
|
|
||||||
lastpart = "--" + boundary + "--"
|
|
||||||
partdict = {}
|
|
||||||
terminator = ""
|
|
||||||
|
|
||||||
while terminator != lastpart:
|
|
||||||
c_bytes = -1
|
|
||||||
|
|
||||||
data = tempfile.NamedTemporaryFile(delete=False)
|
|
||||||
if terminator:
|
|
||||||
# At start of next part. Read headers first.
|
|
||||||
headers = mimetools.Message(fp)
|
|
||||||
clength = headers.getheader('content-length')
|
|
||||||
if clength:
|
|
||||||
try:
|
|
||||||
c_bytes = int(clength)
|
|
||||||
except ValueError:
|
|
||||||
pass
|
|
||||||
if c_bytes > 0:
|
|
||||||
data.write(fp.read(c_bytes))
|
|
||||||
# Read lines until end of part.
|
|
||||||
while 1:
|
|
||||||
line = fp.readline()
|
|
||||||
if not line:
|
|
||||||
terminator = lastpart # End outer loop
|
|
||||||
break
|
|
||||||
if line[:2] == "--":
|
|
||||||
terminator = line.strip()
|
|
||||||
if terminator in (nextpart, lastpart):
|
|
||||||
break
|
|
||||||
data.write(line)
|
|
||||||
# Done with part.
|
|
||||||
if data.tell() == 0:
|
|
||||||
continue
|
|
||||||
if c_bytes < 0:
|
|
||||||
# if a Content-Length header was not supplied with the MIME part
|
|
||||||
# then the trailing line break must be removed.
|
|
||||||
# we have data, read the last 2 bytes
|
|
||||||
rewind = min(2, data.tell())
|
|
||||||
data.seek(-rewind, os.SEEK_END)
|
|
||||||
line = data.read(2)
|
|
||||||
if line[-2:] == "\r\n":
|
|
||||||
data.seek(-2, os.SEEK_END)
|
|
||||||
data.truncate()
|
|
||||||
elif line[-1:] == "\n":
|
|
||||||
data.seek(-1, os.SEEK_END)
|
|
||||||
data.truncate()
|
|
||||||
|
|
||||||
line = headers['content-disposition']
|
|
||||||
if not line:
|
|
||||||
continue
|
|
||||||
key, params = cgi.parse_header(line)
|
|
||||||
if key != 'form-data':
|
|
||||||
continue
|
|
||||||
if 'name' in params:
|
|
||||||
name = params['name']
|
|
||||||
# kludge in the filename
|
|
||||||
if 'filename' in params:
|
|
||||||
fname_index = name + '_filename'
|
|
||||||
if fname_index in partdict:
|
|
||||||
partdict[fname_index].append(params['filename'])
|
|
||||||
else:
|
|
||||||
partdict[fname_index] = [params['filename']]
|
|
||||||
else:
|
|
||||||
# Unnamed parts are not returned at all.
|
|
||||||
continue
|
|
||||||
data.seek(0,0)
|
|
||||||
if name in partdict:
|
|
||||||
partdict[name].append(data)
|
|
||||||
else:
|
|
||||||
partdict[name] = [data]
|
|
||||||
|
|
||||||
fp.seek(rewind) # Restore cursor
|
|
||||||
return partdict
|
|
||||||
|
|
||||||
class LBRYindex(resource.Resource):
|
|
||||||
def __init__(self, ui_dir):
|
|
||||||
resource.Resource.__init__(self)
|
|
||||||
self.ui_dir = ui_dir
|
|
||||||
|
|
||||||
isLeaf = False
|
|
||||||
|
|
||||||
def _delayed_render(self, request, results):
|
|
||||||
request.write(str(results))
|
|
||||||
request.finish()
|
|
||||||
|
|
||||||
def getChild(self, name, request):
|
|
||||||
if name == '':
|
|
||||||
return self
|
|
||||||
return resource.Resource.getChild(self, name, request)
|
|
||||||
|
|
||||||
def render_GET(self, request):
|
|
||||||
request.setHeader('cache-control','no-cache, no-store, must-revalidate')
|
|
||||||
request.setHeader('expires', '0')
|
|
||||||
return static.File(os.path.join(self.ui_dir, "index.html")).render_GET(request)
|
|
||||||
|
|
||||||
|
|
||||||
@implementer(interfaces.IPushProducer)
|
|
||||||
class EncryptedFileStreamer(object):
|
|
||||||
"""
|
|
||||||
Writes LBRY stream to request; will pause to wait for new data if the file
|
|
||||||
is downloading.
|
|
||||||
|
|
||||||
No support for range requests (some browser players can't handle it when
|
|
||||||
the full video data isn't available on request).
|
|
||||||
"""
|
|
||||||
|
|
||||||
bufferSize = abstract.FileDescriptor.bufferSize
|
|
||||||
|
|
||||||
|
|
||||||
# How long to wait between sending blocks (needed because some
|
|
||||||
# video players freeze up if you try to send data too fast)
|
|
||||||
stream_interval = 0.005
|
|
||||||
|
|
||||||
# How long to wait before checking if new data has been appended to the file
|
|
||||||
new_data_check_interval = 0.25
|
|
||||||
|
|
||||||
|
|
||||||
def __init__(self, request, path, stream, file_manager):
|
|
||||||
def _set_content_length_header(length):
|
|
||||||
self._request.setHeader('content-length', length)
|
|
||||||
return defer.succeed(None)
|
|
||||||
|
|
||||||
self._request = request
|
|
||||||
self._file = open(path, 'rb')
|
|
||||||
self._stream = stream
|
|
||||||
self._file_manager = file_manager
|
|
||||||
self._headers_sent = False
|
|
||||||
|
|
||||||
self._running = True
|
|
||||||
|
|
||||||
self._request.setResponseCode(200)
|
|
||||||
self._request.setHeader('accept-ranges', 'none')
|
|
||||||
self._request.setHeader('content-type', mimetypes.guess_type(path)[0])
|
|
||||||
self._request.setHeader("Content-Security-Policy", "sandbox")
|
|
||||||
|
|
||||||
self._deferred = stream.get_total_bytes()
|
|
||||||
self._deferred.addCallback(_set_content_length_header)
|
|
||||||
self._deferred.addCallback(lambda _: self.resumeProducing())
|
|
||||||
|
|
||||||
def _check_for_new_data(self):
|
|
||||||
def _recurse_or_stop(stream_status):
|
|
||||||
if not self._running:
|
|
||||||
return
|
|
||||||
|
|
||||||
if stream_status != ManagedEncryptedFileDownloader.STATUS_FINISHED:
|
|
||||||
self._deferred.addCallback(lambda _: task.deferLater(reactor, self.new_data_check_interval, self._check_for_new_data))
|
|
||||||
else:
|
|
||||||
self.stopProducing()
|
|
||||||
|
|
||||||
if not self._running:
|
|
||||||
return
|
|
||||||
|
|
||||||
# Clear the file's EOF indicator by seeking to current position
|
|
||||||
self._file.seek(self._file.tell())
|
|
||||||
|
|
||||||
data = self._file.read(self.bufferSize)
|
|
||||||
if data:
|
|
||||||
self._request.write(data)
|
|
||||||
if self._running: # .write() can trigger a pause
|
|
||||||
self._deferred.addCallback(lambda _: task.deferLater(reactor, self.stream_interval, self._check_for_new_data))
|
|
||||||
else:
|
|
||||||
self._deferred.addCallback(lambda _: self._file_manager.get_lbry_file_status(self._stream))
|
|
||||||
self._deferred.addCallback(_recurse_or_stop)
|
|
||||||
|
|
||||||
def pauseProducing(self):
|
|
||||||
self._running = False
|
|
||||||
|
|
||||||
def resumeProducing(self):
|
|
||||||
self._running = True
|
|
||||||
self._check_for_new_data()
|
|
||||||
|
|
||||||
def stopProducing(self):
|
|
||||||
self._running = False
|
|
||||||
self._file.close()
|
|
||||||
self._deferred.addErrback(lambda err: err.trap(defer.CancelledError))
|
|
||||||
self._deferred.addErrback(lambda err: err.trap(error.ConnectionDone))
|
|
||||||
self._deferred.cancel()
|
|
||||||
self._request.unregisterProducer()
|
|
||||||
self._request.finish()
|
|
||||||
|
|
||||||
|
|
||||||
class HostedEncryptedFile(resource.Resource):
|
|
||||||
def __init__(self, api):
|
|
||||||
self._api = api
|
|
||||||
resource.Resource.__init__(self)
|
|
||||||
|
|
||||||
def _make_stream_producer(self, request, stream):
|
|
||||||
path = os.path.join(self._api.download_directory, stream.file_name)
|
|
||||||
|
|
||||||
producer = EncryptedFileStreamer(request, path, stream, self._api.lbry_file_manager)
|
|
||||||
request.registerProducer(producer, streaming=True)
|
|
||||||
|
|
||||||
d = request.notifyFinish()
|
|
||||||
d.addErrback(self._responseFailed, d)
|
|
||||||
return d
|
|
||||||
|
|
||||||
def render_GET(self, request):
|
|
||||||
request.setHeader("Content-Security-Policy", "sandbox")
|
|
||||||
if 'name' in request.args.keys():
|
|
||||||
if request.args['name'][0] != 'lbry' and request.args['name'][0] not in self._api.waiting_on.keys():
|
|
||||||
d = self._api._download_name(request.args['name'][0])
|
|
||||||
d.addCallback(lambda stream: self._make_stream_producer(request, stream))
|
|
||||||
elif request.args['name'][0] in self._api.waiting_on.keys():
|
|
||||||
request.redirect(UI_ADDRESS + "/?watch=" + request.args['name'][0])
|
|
||||||
request.finish()
|
|
||||||
else:
|
|
||||||
request.redirect(UI_ADDRESS)
|
|
||||||
request.finish()
|
|
||||||
return server.NOT_DONE_YET
|
|
||||||
|
|
||||||
def _responseFailed(self, err, call):
|
|
||||||
call.addErrback(lambda err: err.trap(error.ConnectionDone))
|
|
||||||
call.addErrback(lambda err: err.trap(defer.CancelledError))
|
|
||||||
call.addErrback(lambda err: log.info("Error: " + str(err)))
|
|
||||||
call.cancel()
|
|
||||||
|
|
||||||
class EncryptedFileUpload(resource.Resource):
|
|
||||||
"""
|
|
||||||
Accepts a file sent via the file upload widget in the web UI, saves
|
|
||||||
it into a temporary dir, and responds with a JSON string containing
|
|
||||||
the path of the newly created file.
|
|
||||||
"""
|
|
||||||
|
|
||||||
def __init__(self, api):
|
|
||||||
self._api = api
|
|
||||||
|
|
||||||
def render_POST(self, request):
|
|
||||||
origfilename = request.args['file_filename'][0]
|
|
||||||
uploaded_file = request.args['file'][0] # Temp file created by request
|
|
||||||
|
|
||||||
# Move to a new temporary dir and restore the original file name
|
|
||||||
newdirpath = tempfile.mkdtemp()
|
|
||||||
newpath = os.path.join(newdirpath, origfilename)
|
|
||||||
if os.name == "nt":
|
|
||||||
shutil.copy(uploaded_file.name, newpath)
|
|
||||||
# TODO Still need to remove the file
|
|
||||||
|
|
||||||
# TODO deal with pylint error in cleaner fashion than this
|
|
||||||
try:
|
|
||||||
from exceptions import WindowsError as win_except
|
|
||||||
except ImportError as e:
|
|
||||||
log.error("This shouldn't happen")
|
|
||||||
win_except = Exception
|
|
||||||
|
|
||||||
try:
|
|
||||||
os.remove(uploaded_file.name)
|
|
||||||
except win_except as e:
|
|
||||||
pass
|
|
||||||
else:
|
|
||||||
shutil.move(uploaded_file.name, newpath)
|
|
||||||
self._api.uploaded_temp_files.append(newpath)
|
|
||||||
|
|
||||||
return json.dumps(newpath)
|
|
||||||
|
|
||||||
|
|
||||||
class DaemonServer(object):
|
class DaemonServer(object):
|
||||||
def _setup_server(self, wallet):
|
def _setup_server(self, wallet):
|
||||||
self.root = LBRYindex(os.path.join(os.path.join(data_dir, "lbry-ui"), "active"))
|
self.root = LBRYindex(os.path.join(os.path.join(data_dir, "lbry-ui"), "active"))
|
||||||
|
|
104
lbrynet/lbrynet_daemon/FileStreamer.py
Normal file
104
lbrynet/lbrynet_daemon/FileStreamer.py
Normal file
|
@ -0,0 +1,104 @@
|
||||||
|
import logging
|
||||||
|
import os
|
||||||
|
import sys
|
||||||
|
import mimetypes
|
||||||
|
|
||||||
|
from appdirs import user_data_dir
|
||||||
|
from zope.interface import implements
|
||||||
|
from twisted.internet import defer, error, interfaces, abstract, task, reactor
|
||||||
|
|
||||||
|
|
||||||
|
# TODO: omg, this code is essentially duplicated in Daemon
|
||||||
|
if sys.platform != "darwin":
|
||||||
|
data_dir = os.path.join(os.path.expanduser("~"), ".lbrynet")
|
||||||
|
else:
|
||||||
|
data_dir = user_data_dir("LBRY")
|
||||||
|
if not os.path.isdir(data_dir):
|
||||||
|
os.mkdir(data_dir)
|
||||||
|
|
||||||
|
log = logging.getLogger(__name__)
|
||||||
|
STATUS_FINISHED = 'finished'
|
||||||
|
|
||||||
|
class EncryptedFileStreamer(object):
|
||||||
|
"""
|
||||||
|
Writes LBRY stream to request; will pause to wait for new data if the file
|
||||||
|
is downloading.
|
||||||
|
|
||||||
|
No support for range requests (some browser players can't handle it when
|
||||||
|
the full video data isn't available on request).
|
||||||
|
"""
|
||||||
|
implements(interfaces.IPushProducer)
|
||||||
|
|
||||||
|
bufferSize = abstract.FileDescriptor.bufferSize
|
||||||
|
|
||||||
|
|
||||||
|
# How long to wait between sending blocks (needed because some
|
||||||
|
# video players freeze up if you try to send data too fast)
|
||||||
|
stream_interval = 0.005
|
||||||
|
|
||||||
|
# How long to wait before checking if new data has been appended to the file
|
||||||
|
new_data_check_interval = 0.25
|
||||||
|
|
||||||
|
|
||||||
|
def __init__(self, request, path, stream, file_manager):
|
||||||
|
def _set_content_length_header(length):
|
||||||
|
self._request.setHeader('content-length', length)
|
||||||
|
return defer.succeed(None)
|
||||||
|
|
||||||
|
self._request = request
|
||||||
|
self._file = open(path, 'rb')
|
||||||
|
self._stream = stream
|
||||||
|
self._file_manager = file_manager
|
||||||
|
self._headers_sent = False
|
||||||
|
|
||||||
|
self._running = True
|
||||||
|
|
||||||
|
self._request.setResponseCode(200)
|
||||||
|
self._request.setHeader('accept-ranges', 'none')
|
||||||
|
self._request.setHeader('content-type', mimetypes.guess_type(path)[0])
|
||||||
|
self._request.setHeader("Content-Security-Policy", "sandbox")
|
||||||
|
|
||||||
|
self._deferred = stream.get_total_bytes()
|
||||||
|
self._deferred.addCallback(_set_content_length_header)
|
||||||
|
self._deferred.addCallback(lambda _: self.resumeProducing())
|
||||||
|
|
||||||
|
def _check_for_new_data(self):
|
||||||
|
def _recurse_or_stop(stream_status):
|
||||||
|
if not self._running:
|
||||||
|
return
|
||||||
|
|
||||||
|
if stream_status != STATUS_FINISHED:
|
||||||
|
self._deferred.addCallback(lambda _: task.deferLater(reactor, self.new_data_check_interval, self._check_for_new_data))
|
||||||
|
else:
|
||||||
|
self.stopProducing()
|
||||||
|
|
||||||
|
if not self._running:
|
||||||
|
return
|
||||||
|
|
||||||
|
# Clear the file's EOF indicator by seeking to current position
|
||||||
|
self._file.seek(self._file.tell())
|
||||||
|
|
||||||
|
data = self._file.read(self.bufferSize)
|
||||||
|
if data:
|
||||||
|
self._request.write(data)
|
||||||
|
if self._running: # .write() can trigger a pause
|
||||||
|
self._deferred.addCallback(lambda _: task.deferLater(reactor, self.stream_interval, self._check_for_new_data))
|
||||||
|
else:
|
||||||
|
self._deferred.addCallback(lambda _: self._file_manager.get_lbry_file_status(self._stream))
|
||||||
|
self._deferred.addCallback(_recurse_or_stop)
|
||||||
|
|
||||||
|
def pauseProducing(self):
|
||||||
|
self._running = False
|
||||||
|
|
||||||
|
def resumeProducing(self):
|
||||||
|
self._running = True
|
||||||
|
self._check_for_new_data()
|
||||||
|
|
||||||
|
def stopProducing(self):
|
||||||
|
self._running = False
|
||||||
|
self._file.close()
|
||||||
|
self._deferred.addErrback(lambda err: err.trap(defer.CancelledError))
|
||||||
|
self._deferred.addErrback(lambda err: err.trap(error.ConnectionDone))
|
||||||
|
self._deferred.cancel()
|
||||||
|
self._request.unregisterProducer()
|
||||||
|
self._request.finish()
|
181
lbrynet/lbrynet_daemon/Resources.py
Normal file
181
lbrynet/lbrynet_daemon/Resources.py
Normal file
|
@ -0,0 +1,181 @@
|
||||||
|
import errno
|
||||||
|
import logging
|
||||||
|
import os
|
||||||
|
import shutil
|
||||||
|
import json
|
||||||
|
import sys
|
||||||
|
import tempfile
|
||||||
|
|
||||||
|
|
||||||
|
from appdirs import user_data_dir
|
||||||
|
from twisted.web import http
|
||||||
|
from twisted.web import server, static, resource
|
||||||
|
from twisted.internet import defer, error
|
||||||
|
from twisted.web.static import getTypeAndEncoding
|
||||||
|
|
||||||
|
from lbrynet.conf import UI_ADDRESS
|
||||||
|
from lbrynet.lbrynet_daemon.FileStreamer import EncryptedFileStreamer
|
||||||
|
|
||||||
|
# TODO: omg, this code is essentially duplicated in Daemon
|
||||||
|
|
||||||
|
if sys.platform != "darwin":
|
||||||
|
data_dir = os.path.join(os.path.expanduser("~"), ".lbrynet")
|
||||||
|
else:
|
||||||
|
data_dir = user_data_dir("LBRY")
|
||||||
|
if not os.path.isdir(data_dir):
|
||||||
|
os.mkdir(data_dir)
|
||||||
|
|
||||||
|
log = logging.getLogger(__name__)
|
||||||
|
|
||||||
|
|
||||||
|
class NoCacheStaticFile(static.File):
|
||||||
|
def render_GET(self, request):
|
||||||
|
"""
|
||||||
|
Begin sending the contents of this L{File} (or a subset of the
|
||||||
|
contents, based on the 'range' header) to the given request.
|
||||||
|
"""
|
||||||
|
self.restat(False)
|
||||||
|
request.setHeader('cache-control', 'no-cache, no-store, must-revalidate')
|
||||||
|
request.setHeader('expires', '0')
|
||||||
|
|
||||||
|
if self.type is None:
|
||||||
|
self.type, self.encoding = getTypeAndEncoding(self.basename(),
|
||||||
|
self.contentTypes,
|
||||||
|
self.contentEncodings,
|
||||||
|
self.defaultType)
|
||||||
|
|
||||||
|
if not self.exists():
|
||||||
|
return self.childNotFound.render(request)
|
||||||
|
|
||||||
|
if self.isdir():
|
||||||
|
return self.redirect(request)
|
||||||
|
|
||||||
|
request.setHeader(b'accept-ranges', b'bytes')
|
||||||
|
|
||||||
|
try:
|
||||||
|
fileForReading = self.openForReading()
|
||||||
|
except IOError as e:
|
||||||
|
if e.errno == errno.EACCES:
|
||||||
|
return self.forbidden.render(request)
|
||||||
|
else:
|
||||||
|
raise
|
||||||
|
|
||||||
|
if request.setLastModified(self.getModificationTime()) is http.CACHED:
|
||||||
|
# `setLastModified` also sets the response code for us, so if the
|
||||||
|
# request is cached, we close the file now that we've made sure that
|
||||||
|
# the request would otherwise succeed and return an empty body.
|
||||||
|
fileForReading.close()
|
||||||
|
return b''
|
||||||
|
|
||||||
|
if request.method == b'HEAD':
|
||||||
|
# Set the content headers here, rather than making a producer.
|
||||||
|
self._setContentHeaders(request)
|
||||||
|
# We've opened the file to make sure it's accessible, so close it
|
||||||
|
# now that we don't need it.
|
||||||
|
fileForReading.close()
|
||||||
|
return b''
|
||||||
|
|
||||||
|
producer = self.makeProducer(request, fileForReading)
|
||||||
|
producer.start()
|
||||||
|
|
||||||
|
# and make sure the connection doesn't get closed
|
||||||
|
return server.NOT_DONE_YET
|
||||||
|
render_HEAD = render_GET
|
||||||
|
|
||||||
|
|
||||||
|
class LBRYindex(resource.Resource):
|
||||||
|
def __init__(self, ui_dir):
|
||||||
|
resource.Resource.__init__(self)
|
||||||
|
self.ui_dir = ui_dir
|
||||||
|
|
||||||
|
isLeaf = False
|
||||||
|
|
||||||
|
def _delayed_render(self, request, results):
|
||||||
|
request.write(str(results))
|
||||||
|
request.finish()
|
||||||
|
|
||||||
|
def getChild(self, name, request):
|
||||||
|
request.setHeader('cache-control', 'no-cache, no-store, must-revalidate')
|
||||||
|
request.setHeader('expires', '0')
|
||||||
|
|
||||||
|
if name == '':
|
||||||
|
return self
|
||||||
|
return resource.Resource.getChild(self, name, request)
|
||||||
|
|
||||||
|
def render_GET(self, request):
|
||||||
|
return NoCacheStaticFile(os.path.join(self.ui_dir, "index.html")).render_GET(request)
|
||||||
|
|
||||||
|
|
||||||
|
class HostedEncryptedFile(resource.Resource):
|
||||||
|
def __init__(self, api):
|
||||||
|
self._api = api
|
||||||
|
resource.Resource.__init__(self)
|
||||||
|
|
||||||
|
def _make_stream_producer(self, request, stream):
|
||||||
|
path = os.path.join(self._api.download_directory, stream.file_name)
|
||||||
|
|
||||||
|
producer = EncryptedFileStreamer(request, path, stream, self._api.lbry_file_manager)
|
||||||
|
request.registerProducer(producer, streaming=True)
|
||||||
|
|
||||||
|
d = request.notifyFinish()
|
||||||
|
d.addErrback(self._responseFailed, d)
|
||||||
|
return d
|
||||||
|
|
||||||
|
def render_GET(self, request):
|
||||||
|
request.setHeader("Content-Security-Policy", "sandbox")
|
||||||
|
if 'name' in request.args.keys():
|
||||||
|
if request.args['name'][0] != 'lbry' and request.args['name'][0] not in self._api.waiting_on.keys():
|
||||||
|
d = self._api._download_name(request.args['name'][0])
|
||||||
|
d.addCallback(lambda stream: self._make_stream_producer(request, stream))
|
||||||
|
elif request.args['name'][0] in self._api.waiting_on.keys():
|
||||||
|
request.redirect(UI_ADDRESS + "/?watch=" + request.args['name'][0])
|
||||||
|
request.finish()
|
||||||
|
else:
|
||||||
|
request.redirect(UI_ADDRESS)
|
||||||
|
request.finish()
|
||||||
|
return server.NOT_DONE_YET
|
||||||
|
|
||||||
|
def _responseFailed(self, err, call):
|
||||||
|
call.addErrback(lambda err: err.trap(error.ConnectionDone))
|
||||||
|
call.addErrback(lambda err: err.trap(defer.CancelledError))
|
||||||
|
call.addErrback(lambda err: log.info("Error: " + str(err)))
|
||||||
|
call.cancel()
|
||||||
|
|
||||||
|
|
||||||
|
class EncryptedFileUpload(resource.Resource):
|
||||||
|
"""
|
||||||
|
Accepts a file sent via the file upload widget in the web UI, saves
|
||||||
|
it into a temporary dir, and responds with a JSON string containing
|
||||||
|
the path of the newly created file.
|
||||||
|
"""
|
||||||
|
|
||||||
|
def __init__(self, api):
|
||||||
|
self._api = api
|
||||||
|
|
||||||
|
def render_POST(self, request):
|
||||||
|
origfilename = request.args['file_filename'][0]
|
||||||
|
uploaded_file = request.args['file'][0] # Temp file created by request
|
||||||
|
|
||||||
|
# Move to a new temporary dir and restore the original file name
|
||||||
|
newdirpath = tempfile.mkdtemp()
|
||||||
|
newpath = os.path.join(newdirpath, origfilename)
|
||||||
|
if os.name == "nt":
|
||||||
|
shutil.copy(uploaded_file.name, newpath)
|
||||||
|
# TODO Still need to remove the file
|
||||||
|
|
||||||
|
# TODO deal with pylint error in cleaner fashion than this
|
||||||
|
try:
|
||||||
|
from exceptions import WindowsError as win_except
|
||||||
|
except ImportError as e:
|
||||||
|
log.error("This shouldn't happen")
|
||||||
|
win_except = Exception
|
||||||
|
|
||||||
|
try:
|
||||||
|
os.remove(uploaded_file.name)
|
||||||
|
except win_except as e:
|
||||||
|
pass
|
||||||
|
else:
|
||||||
|
shutil.move(uploaded_file.name, newpath)
|
||||||
|
self._api.uploaded_temp_files.append(newpath)
|
||||||
|
|
||||||
|
return json.dumps(newpath)
|
|
@ -6,10 +6,10 @@ import json
|
||||||
|
|
||||||
from urllib2 import urlopen
|
from urllib2 import urlopen
|
||||||
from StringIO import StringIO
|
from StringIO import StringIO
|
||||||
from twisted.web import static
|
|
||||||
from twisted.internet import defer
|
from twisted.internet import defer
|
||||||
from twisted.internet.task import LoopingCall
|
from twisted.internet.task import LoopingCall
|
||||||
from lbrynet.conf import DEFAULT_UI_BRANCH, LOG_FILE_NAME
|
from lbrynet.conf import DEFAULT_UI_BRANCH, LOG_FILE_NAME
|
||||||
|
from lbrynet.lbrynet_daemon.Resources import NoCacheStaticFile
|
||||||
from lbrynet import __version__ as lbrynet_version
|
from lbrynet import __version__ as lbrynet_version
|
||||||
from lbryum.version import LBRYUM_VERSION as lbryum_version
|
from lbryum.version import LBRYUM_VERSION as lbryum_version
|
||||||
from zipfile import ZipFile
|
from zipfile import ZipFile
|
||||||
|
@ -235,5 +235,5 @@ class UIManager(object):
|
||||||
|
|
||||||
def _load_ui(self):
|
def _load_ui(self):
|
||||||
for d in [i[0] for i in os.walk(self.active_dir) if os.path.dirname(i[0]) == self.active_dir]:
|
for d in [i[0] for i in os.walk(self.active_dir) if os.path.dirname(i[0]) == self.active_dir]:
|
||||||
self.root.putChild(os.path.basename(d), static.File(d))
|
self.root.putChild(os.path.basename(d), NoCacheStaticFile(d))
|
||||||
return defer.succeed(True)
|
return defer.succeed(True)
|
||||||
|
|
Loading…
Reference in a new issue