speed up streaming, off by one errors
This commit is contained in:
parent
46368f52f5
commit
04ee9894c9
3 changed files with 43 additions and 24 deletions
|
@ -991,6 +991,7 @@ class LBRYDaemon(jsonrpc.JSONRPC):
|
||||||
return d
|
return d
|
||||||
|
|
||||||
d.addCallback(lambda _: finish_deletion(lbry_file))
|
d.addCallback(lambda _: finish_deletion(lbry_file))
|
||||||
|
d.addCallback(lambda _: log.info("[" + str(datetime.now()) + "] Delete lbry file"))
|
||||||
return d
|
return d
|
||||||
|
|
||||||
def _get_est_cost(self, name):
|
def _get_est_cost(self, name):
|
||||||
|
|
|
@ -132,3 +132,6 @@ def start():
|
||||||
if not args.logtoconsole:
|
if not args.logtoconsole:
|
||||||
print "Not connected to internet, unable to start"
|
print "Not connected to internet, unable to start"
|
||||||
return
|
return
|
||||||
|
|
||||||
|
if __name__ == "__main__":
|
||||||
|
start()
|
|
@ -4,6 +4,7 @@ import os
|
||||||
import shutil
|
import shutil
|
||||||
import json
|
import json
|
||||||
import sys
|
import sys
|
||||||
|
import mimetypes
|
||||||
|
|
||||||
from StringIO import StringIO
|
from StringIO import StringIO
|
||||||
from zipfile import ZipFile
|
from zipfile import ZipFile
|
||||||
|
@ -71,18 +72,21 @@ class LBRYFileStreamer(object):
|
||||||
def __init__(self, request, path, start, stop, size):
|
def __init__(self, request, path, start, stop, size):
|
||||||
self._request = request
|
self._request = request
|
||||||
self._fileObject = file(path)
|
self._fileObject = file(path)
|
||||||
self._stop_pos = size if stop == '' else int(stop) #chrome and firefox send range requests for "0-"
|
self._content_type = mimetypes.guess_type(path)[0]
|
||||||
|
self._stop_pos = size - 1 if stop == '' else int(stop) #chrome and firefox send range requests for "0-"
|
||||||
self._cursor = self._start_pos = int(start)
|
self._cursor = self._start_pos = int(start)
|
||||||
self._file_size = size
|
self._file_size = size
|
||||||
|
|
||||||
self._paused = self._sent_bytes = False
|
self._paused = self._sent_bytes = self._stopped = False
|
||||||
self._delay = 0.1
|
self._delay = 0.1
|
||||||
|
self._deferred = defer.succeed(None)
|
||||||
|
|
||||||
self._request.setResponseCode(206)
|
self._request.setResponseCode(206)
|
||||||
self._request.setHeader('accept-ranges', 'bytes')
|
self._request.setHeader('accept-ranges', 'bytes')
|
||||||
self._request.setHeader('content-type', 'application/octet-stream')
|
# self._request.setHeader('content-type', 'application/octet-stream')
|
||||||
self._request.setHeader('content-range', 'bytes %s-%s/%s' % (self._start_pos, self._stop_pos, self._file_size))
|
self._request.setHeader('content-type', self._content_type)
|
||||||
|
|
||||||
|
# self._request.setHeader('content-range', 'bytes %s-%s/%s' % (self._start_pos, self._stop_pos, self._file_size))
|
||||||
self.resumeProducing()
|
self.resumeProducing()
|
||||||
|
|
||||||
def pauseProducing(self):
|
def pauseProducing(self):
|
||||||
|
@ -98,57 +102,68 @@ class LBRYFileStreamer(object):
|
||||||
|
|
||||||
self._sent_bytes = False
|
self._sent_bytes = False
|
||||||
|
|
||||||
if readable_bytes > self._cursor:
|
if (readable_bytes > self._cursor) and not (self._stopped or self._paused):
|
||||||
read_length = min(readable_bytes, self._stop_pos) - self._cursor
|
read_length = min(readable_bytes, self._stop_pos) - self._cursor + 1
|
||||||
log.info('Writing range %s-%s/%s' % (self._cursor, self._cursor + read_length, self._file_size))
|
self._request.setHeader('content-range', 'bytes %s-%s/%s' % (self._cursor, self._cursor + read_length - 1, self._file_size))
|
||||||
self._request.setHeader('content-range', 'bytes %s-%s/%s' % (self._cursor, self._cursor + read_length, self._file_size))
|
self._request.setHeader('content-length', str(read_length))
|
||||||
for i in range(read_length + 1):
|
start_cur = self._cursor
|
||||||
if self._paused:
|
for i in range(read_length):
|
||||||
|
if self._paused or self._stopped:
|
||||||
break
|
break
|
||||||
else:
|
else:
|
||||||
data = self._fileObject.read(1)
|
data = self._fileObject.read(1)
|
||||||
self._request.write(data)
|
self._request.write(data)
|
||||||
self._cursor += 1
|
self._cursor += 1
|
||||||
self._sent_bytes = True
|
|
||||||
return defer.succeed(None)
|
|
||||||
|
|
||||||
def _write_reply():
|
log.info("[" + str(datetime.now()) + "] Wrote range %s-%s/%s, length: %s" % (start_cur, self._cursor - 1, self._file_size, self._cursor - start_cur))
|
||||||
|
self._sent_bytes = True
|
||||||
|
|
||||||
if self._cursor == self._stop_pos + 1:
|
if self._cursor == self._stop_pos + 1:
|
||||||
self.stopProducing()
|
self.stopProducing()
|
||||||
return defer.succeed(None)
|
return defer.succeed(None)
|
||||||
elif self._paused:
|
elif self._paused or self._stopped:
|
||||||
return defer.succeed(None)
|
return defer.succeed(None)
|
||||||
else:
|
else:
|
||||||
d = task.deferLater(reactor, self._delay, _check_for_new_data)
|
self._deferred.addCallback(lambda _: task.deferLater(reactor, self._delay, _check_for_new_data))
|
||||||
d.addCallback(lambda _: _write_reply())
|
return defer.succeed(None)
|
||||||
return d
|
|
||||||
|
|
||||||
log.info("[" + str(datetime.now()) + "] Resuming producer")
|
log.info("[" + str(datetime.now()) + "] Resuming producer")
|
||||||
|
|
||||||
self._paused = False
|
self._paused = False
|
||||||
_write_reply()
|
self._deferred.addCallback(lambda _: _check_for_new_data())
|
||||||
return defer.succeed(None)
|
return defer.succeed(None)
|
||||||
|
|
||||||
def stopProducing(self):
|
def stopProducing(self):
|
||||||
log.info("Stopping producer")
|
log.info("[" + str(datetime.now()) + "] Stopping producer")
|
||||||
|
self._stopped = True
|
||||||
# self._fileObject.close()
|
# self._fileObject.close()
|
||||||
self._request.finish()
|
self._deferred.addErrback(lambda err: err.trap(defer.CancelledError))
|
||||||
|
self._deferred.addErrback(lambda err: err.trap(error.ConnectionDone))
|
||||||
|
self._deferred.cancel()
|
||||||
|
# self._request.finish()
|
||||||
self._request.unregisterProducer()
|
self._request.unregisterProducer()
|
||||||
|
|
||||||
|
|
||||||
class HostedLBRYFile(resource.Resource):
|
class HostedLBRYFile(resource.Resource):
|
||||||
def __init__(self, api):
|
def __init__(self, api):
|
||||||
self._api = api
|
self._api = api
|
||||||
|
self._producer = None
|
||||||
resource.Resource.__init__(self)
|
resource.Resource.__init__(self)
|
||||||
|
|
||||||
def makeProducer(self, request, stream):
|
def makeProducer(self, request, stream):
|
||||||
|
def _save_producer(producer):
|
||||||
|
self._producer = producer
|
||||||
|
return defer.succeed(None)
|
||||||
|
|
||||||
range_header = request.getAllHeaders()['range'].replace('bytes=', '').split('-')
|
range_header = request.getAllHeaders()['range'].replace('bytes=', '').split('-')
|
||||||
start, stop = int(range_header[0]), range_header[1]
|
start, stop = int(range_header[0]), range_header[1]
|
||||||
|
log.info("[" + str(datetime.now()) + "] GET range %s-%s" % (start, stop))
|
||||||
path = os.path.join(self._api.download_directory, stream.file_name)
|
path = os.path.join(self._api.download_directory, stream.file_name)
|
||||||
|
|
||||||
d = stream.get_total_bytes()
|
d = stream.get_total_bytes()
|
||||||
d.addCallback(lambda size: request.registerProducer(LBRYFileStreamer(request, path, start, stop, size), streaming=True))
|
d.addCallback(lambda size: _save_producer(LBRYFileStreamer(request, path, start, stop, size)))
|
||||||
|
d.addCallback(lambda _: request.registerProducer(self._producer, streaming=True))
|
||||||
|
# request.notifyFinish().addCallback(lambda _: self._producer.stopProducing())
|
||||||
|
request.notifyFinish().addErrback(self._responseFailed, d)
|
||||||
return d
|
return d
|
||||||
|
|
||||||
def render_GET(self, request):
|
def render_GET(self, request):
|
||||||
|
@ -156,7 +171,7 @@ class HostedLBRYFile(resource.Resource):
|
||||||
if request.args['name'][0] != 'lbry' and request.args['name'][0] not in self._api.waiting_on.keys():
|
if request.args['name'][0] != 'lbry' and request.args['name'][0] not in self._api.waiting_on.keys():
|
||||||
d = self._api._download_name(request.args['name'][0])
|
d = self._api._download_name(request.args['name'][0])
|
||||||
d.addCallback(lambda stream: self.makeProducer(request, stream))
|
d.addCallback(lambda stream: self.makeProducer(request, stream))
|
||||||
request.notifyFinish().addErrback(self._responseFailed, d)
|
|
||||||
elif request.args['name'][0] in self._api.waiting_on.keys():
|
elif request.args['name'][0] in self._api.waiting_on.keys():
|
||||||
request.redirect(UI_ADDRESS + "/?watch=" + request.args['name'][0])
|
request.redirect(UI_ADDRESS + "/?watch=" + request.args['name'][0])
|
||||||
request.finish()
|
request.finish()
|
||||||
|
|
Loading…
Reference in a new issue