lbry file improvements

-fix daemon functions to start/stop lbry files

-remove unused stuff in LBRYFileManager

-improve and use new get_lbry_file function instead of _check_history,
which didn’t use the lbry file manager

-use said function to let delete_lbry_file use the same search keys
(sd_hash, name, and file_name)

-logging in LBRYDownloader
This commit is contained in:
Jack 2016-04-24 17:51:24 -04:00
parent c2ec066c85
commit dfaf51a432
4 changed files with 159 additions and 170 deletions

View file

@ -4,10 +4,7 @@ Keep track of which LBRY Files are downloading and store their LBRY File specifi
import logging
import os
import sys
from datetime import datetime
from twisted.internet.task import LoopingCall
from twisted.enterprise import adbapi
from twisted.internet import defer, task, reactor
from twisted.python.failure import Failure
@ -28,14 +25,12 @@ class LBRYFileManager(object):
Keeps track of currently opened LBRY Files, their options, and their LBRY File specific metadata.
"""
def __init__(self, session, stream_info_manager, sd_identifier, delete_data=False, download_directory=None):
def __init__(self, session, stream_info_manager, sd_identifier, download_directory=None):
self.session = session
self.stream_info_manager = stream_info_manager
self.sd_identifier = sd_identifier
self.lbry_files = []
self.sql_db = None
# self.delete_data = delete_data
# self.check_exists_loop = LoopingCall(self.check_files_exist)
if download_directory:
self.download_directory = download_directory
else:
@ -43,35 +38,11 @@ class LBRYFileManager(object):
log.debug("Download directory for LBRYFileManager: %s", str(self.download_directory))
def setup(self):
# self.check_exists_loop.start(10)
d = self._open_db()
d.addCallback(lambda _: self._add_to_sd_identifier())
d.addCallback(lambda _: self._start_lbry_files())
return d
# def check_files_exist(self):
# def _disp(deleted_files):
# if deleted_files[0][0]:
# for file in bad_files:
# log.info("[" + str(datetime.now()) + "] Detected " + file.file_name + " was deleted, removing from file manager")
#
# def _delete_stream_data(lbry_file):
# s_h = lbry_file.stream_hash
# d = self.get_count_for_stream_hash(s_h)
# # TODO: could possibly be a timing issue here
# d.addCallback(lambda c: self.stream_info_manager.delete_stream(s_h) if c == 0 else True)
# return d
#
# bad_files = [lbry_file for lbry_file in self.lbry_files
# if lbry_file.completed == True and
# os.path.isfile(os.path.join(self.download_directory, lbry_file.file_name)) == False]
# d = defer.DeferredList([self.delete_lbry_file(lbry_file) for lbry_file in bad_files], consumeErrors=True)
# d.addCallback(lambda files: _disp(files) if len(files) else defer.succeed(None))
#
# if self.delete_data:
# d2 = defer.DeferredList([_delete_stream_data(lbry_file) for lbry_file in bad_files], consumeErrors=True)
def get_lbry_file_status(self, lbry_file):
return self._get_lbry_file_status(lbry_file.rowid)
@ -183,7 +154,6 @@ class LBRYFileManager(object):
return defer.fail(Failure(ValueError("Could not find that LBRY file")))
def stop(self):
# self.check_exists_loop.stop()
ds = []

View file

@ -716,7 +716,7 @@ class LBRYDaemon(jsonrpc.JSONRPC):
self.lbry_file_manager = LBRYFileManager(self.session,
self.lbry_file_metadata_manager,
self.sd_identifier,
delete_data=True)
download_directory=self.download_directory)
return self.lbry_file_manager.setup()
d.addCallback(lambda _: set_lbry_file_manager())
@ -862,6 +862,10 @@ class LBRYDaemon(jsonrpc.JSONRPC):
return defer.succeed(True)
def _download_name(self, name, timeout=DEFAULT_TIMEOUT, download_directory=None):
"""
Add a lbry file to the file manager, start the download, and return the new lbry file
if it already exists in the file manager, return the existing lbry file
"""
if not download_directory:
download_directory = self.download_directory
elif not os.path.isdir(download_directory):
@ -871,34 +875,36 @@ class LBRYDaemon(jsonrpc.JSONRPC):
del self.waiting_on[name]
return r
def _disp_file(f):
file_path = os.path.join(self.download_directory, f.file_name)
log.info("[" + str(datetime.now()) + "] Already downloaded: " + str(f.stream_hash) + " --> " + file_path)
return f
def _get_stream(name):
def _disp(stream):
stream_hash = stream['stream_hash']
def _setup_stream(stream_info):
stream_hash = stream_info['stream_hash']
if isinstance(stream_hash, dict):
stream_hash = stream_hash['sd_hash']
log.info("[" + str(datetime.now()) + "] Resolved lbry://" + name + " to sd hash: " + stream_hash)
d = self._get_lbry_file_by_sd_hash(stream_hash)
def _add_results(l):
return defer.succeed((stream_info, l))
d.addCallback(_add_results)
return d
log.info("[" + str(datetime.now()) + "] Start stream: " + stream_hash)
return stream
def _disp_file(f):
file_path = os.path.join(self.download_directory, f.file_name)
log.info("[" + str(datetime.now()) + "] Already downloaded: " + str(f.sd_hash) + " --> " + file_path)
return f
def _get_stream(stream_info):
stream = GetStream(self.sd_identifier, self.session, self.session.wallet,
self.lbry_file_manager,
max_key_fee=self.max_key_fee, data_rate=self.data_rate, timeout=timeout,
download_directory=download_directory)
d = self.session.wallet.get_stream_info_for_name(name)
d.addCallback(_disp)
d.addCallback(lambda stream_info: stream.start(stream_info, name))
d = stream.start(stream_info, name)
d.addCallback(lambda _: stream.downloader)
return d
self.waiting_on[name] = True
d = self._check_history(name)
d.addCallback(lambda lbry_file: _get_stream(name) if not lbry_file else _disp_file(lbry_file))
d = self.session.wallet.get_stream_info_for_name(name)
d.addCallback(_setup_stream)
d.addCallback(lambda (stream_info, lbry_file): _get_stream(stream_info) if not lbry_file else _disp_file(lbry_file))
d.addCallback(_remove_from_wait)
return d
@ -918,46 +924,6 @@ class LBRYDaemon(jsonrpc.JSONRPC):
return d
def _check_history(self, name):
def _get_lbry_file(path):
f = open(path, 'r')
l = json.loads(f.read())
f.close()
file_name = l['stream_name'].decode('hex')
for lbry_file in self.lbry_file_manager.lbry_files:
if lbry_file.stream_name == file_name:
if sys.platform == "darwin":
if os.path.isfile(os.path.join(self.download_directory, lbry_file.stream_name)):
return lbry_file
else:
return False
else:
return lbry_file
else:
return False
def _check(info):
stream_hash = info['stream_hash']
if isinstance(stream_hash, dict):
stream_hash = stream_hash['sd_hash']
path = os.path.join(self.blobfile_dir, stream_hash)
if os.path.isfile(path):
file_size = os.stat(path).st_size
log.info("[" + str(datetime.now()) + "] Search for lbry_file, found " + str(file_size) + " bytes written from stream hash: " + stream_hash)
return defer.succeed(_get_lbry_file(path))
else:
log.info("[" + str(datetime.now()) + "] Search for lbry_file didn't return anything")
return defer.succeed(False)
d = self._resolve_name(name)
d.addCallback(_check)
d.callback(None)
return d
def _delete_lbry_file(self, lbry_file):
d = self.lbry_file_manager.delete_lbry_file(lbry_file)
@ -978,21 +944,6 @@ class LBRYDaemon(jsonrpc.JSONRPC):
d.addCallback(lambda _: finish_deletion(lbry_file))
return d
def _path_from_name(self, name):
d = self._check_history(name)
d.addCallback(lambda lbry_file: {'stream_hash': lbry_file.stream_hash,
'path': os.path.join(self.download_directory, lbry_file.file_name)}
if lbry_file else defer.fail(UnknownNameError))
return d
def _path_from_lbry_file(self, lbry_file):
if lbry_file:
r = {'stream_hash': lbry_file.stream_hash,
'path': os.path.join(self.download_directory, lbry_file.file_name)}
return defer.succeed(r)
else:
return defer.fail(UnknownNameError)
def _get_est_cost(self, name):
def _check_est(d, name):
if isinstance(d.result, float):
@ -1041,21 +992,43 @@ class LBRYDaemon(jsonrpc.JSONRPC):
return r
def _get_lbry_file_by_uri(self, name):
def _get_file(stream_info):
if isinstance(stream_info['stream_hash'], str) or isinstance(stream_info['stream_hash'], unicode):
sd = stream_info['stream_hash']
elif isinstance(stream_info['stream_hash'], dict):
sd = stream_info['stream_hash']['sd_hash']
for l in self.lbry_file_manager.lbry_files:
if l.sd_hash == sd:
return defer.succeed(l)
return defer.succeed(None)
d = self.session.wallet.get_stream_info_for_name(name)
d.addCallback(lambda info: info['stream_hash'])
d.addCallback(lambda sd_hash: next(l for l in self.lbry_file_manager.lbry_files if l.sd_hash == sd_hash))
d.addCallback(_get_file)
return d
def _get_lbry_file_by_sd_hash(self, sd_hash):
r = next(l for l in self.lbry_file_manager.lbry_files if l.sd_hash == sd_hash)
return defer.succeed(r)
for l in self.lbry_file_manager.lbry_files:
if l.sd_hash == sd_hash:
return defer.succeed(l)
return defer.succeed(None)
def _get_lbry_file_by_file_name(self, file_name):
r = next(l for l in self.lbry_file_manager.lbry_files if l.file_name == file_name)
return defer.succeed(r)
for l in self.lbry_file_manager.lbry_files:
if l.file_name == file_name:
return defer.succeed(l)
return defer.succeed(None)
def _get_lbry_file(self, search_by, val):
def _show_file(f):
def _get_lbry_file(self, search_by, val, return_json=True):
def _log_get_lbry_file(f):
if f:
log.info("Found LBRY file for " + search_by + ": " + val)
else:
log.info("Did not find LBRY file for " + search_by + ": " + val)
return f
def _get_json_for_return(f):
if f:
if f.key:
t = {'completed': f.completed, 'file_name': f.file_name, 'key': binascii.b2a_hex(f.key),
@ -1079,7 +1052,9 @@ class LBRYDaemon(jsonrpc.JSONRPC):
d = self._get_lbry_file_by_sd_hash(val)
elif search_by == "file_name":
d = self._get_lbry_file_by_file_name(val)
d.addCallback(_show_file)
d.addCallback(_log_get_lbry_file)
if return_json:
d.addCallback(_get_json_for_return)
return d
def _log_to_slack(self, msg):
@ -1473,36 +1448,54 @@ class LBRYDaemon(jsonrpc.JSONRPC):
return d
# def jsonrpc_stop_lbry_file(self, p):
# params = Bunch(p)
#
# try:
# lbry_file = [f for f in self.lbry_file_manager.lbry_files if f.stream_hash == params.stream_hash][0]
# except IndexError:
# return defer.fail(UnknownNameError)
#
# if not lbry_file.stopped:
# d = self.lbry_file_manager.toggle_lbry_file_running(lbry_file)
# d.addCallback(lambda _: self._render_response("Stream has been stopped", OK_CODE))
# d.addErrback(lambda err: self._render_response(err.getTraceback(), ))
# return d
# else:
# return json.dumps({'result': 'Stream was already stopped'})
#
# def jsonrpc_start_lbry_file(self, p):
# params = Bunch(p)
#
# try:
# lbry_file = [f for f in self.lbry_file_manager.lbry_files if f.stream_hash == params.stream_hash][0]
# except IndexError:
# return defer.fail(UnknownNameError)
#
# if lbry_file.stopped:
# d = self.lbry_file_manager.toggle_lbry_file_running(lbry_file)
# d.callback(None)
# return json.dumps({'result': 'Stream started'})
# else:
# return json.dumps({'result': 'Stream was already running'})
def jsonrpc_stop_lbry_file(self, p):
"""
Stop lbry file
Args:
'name': stop file by lbry uri,
'sd_hash': stop file by the hash in the name claim,
'file_name': stop file by its name in the downloads folder,
Returns:
confirmation message
"""
def _stop_file(f):
d = self.lbry_file_manager.toggle_lbry_file_running(f)
d.addCallback(lambda _: "Stopped LBRY file")
return d
if p.keys()[0] in ['name', 'sd_hash', 'file_name']:
search_type = p.keys()[0]
d = self._get_lbry_file(search_type, p[search_type], return_json=False)
d.addCallback(lambda l: _stop_file(l) if not l.stopped else "LBRY file wasn't running")
d.addCallback(lambda r: self._render_response(r, OK_CODE))
return d
def jsonrpc_start_lbry_file(self, p):
"""
Stop lbry file
Args:
'name': stop file by lbry uri,
'sd_hash': stop file by the hash in the name claim,
'file_name': stop file by its name in the downloads folder,
Returns:
confirmation message
"""
def _start_file(f):
d = self.lbry_file_manager.toggle_lbry_file_running(f)
return defer.succeed("Started LBRY file")
if p.keys()[0] in ['name', 'sd_hash', 'file_name']:
search_type = p.keys()[0]
d = self._get_lbry_file(search_type, p[search_type], return_json=False)
d.addCallback(lambda l: _start_file(l) if l.stopped else "LBRY file was already running")
d.addCallback(lambda r: self._render_response(r, OK_CODE))
return d
def jsonrpc_search_nametrie(self, p):
"""
@ -1575,15 +1568,18 @@ class LBRYDaemon(jsonrpc.JSONRPC):
confirmation message
"""
def _disp(file_name):
log.info("[" + str(datetime.now()) + "] Deleted: " + file_name)
return self._render_response("Deleted: " + file_name, OK_CODE)
def _delete_file(f):
file_name = f.file_name
d = self._delete_lbry_file(f)
d.addCallback(lambda _: "Deleted LBRY file" + file_name)
return d
if "file_name" in p.keys():
lbry_files = [self._delete_lbry_file(f) for f in self.lbry_file_manager.lbry_files
if p['file_name'] == f.file_name]
d = defer.DeferredList(lbry_files)
d.addCallback(lambda _: _disp(p['file_name']))
if p.keys()[0] in ['name', 'sd_hash', 'file_name']:
search_type = p.keys()[0]
d = self._get_lbry_file(search_type, p[search_type], return_json=False)
d.addCallback(lambda l: _delete_file(l) if l else "Couldn't find LBRY file to delete")
d.addCallback(lambda r: self._render_response(r, OK_CODE))
return d
def jsonrpc_publish(self, p):

View file

@ -54,13 +54,20 @@ class LBRYindex(resource.Resource):
class LBRYFileProducer(StaticProducer):
def __init__(self, request, lbry_stream):
def __init__(self, request, lbry_stream, api):
self._api = api
self.stream = lbry_stream
self.updater = LoopingCall(self._check_for_data)
StaticProducer.__init__(self, request, fileObject=file(lbry_stream.file_written_to))
self.total_bytes = 0
if lbry_stream.file_written_to:
file_name = lbry_stream.file_written_to
else:
file_name = os.path.join(self._api.download_directory, lbry_stream.file_name)
StaticProducer.__init__(self, request, fileObject=file(file_name))
def start(self):
d = self._set_size()
self.fileObject.seek(0)
self.updater.start(5)
def _set_size(self):
@ -74,17 +81,22 @@ class LBRYFileProducer(StaticProducer):
return d
def _check_for_data(self):
def _write_new_data_to_request():
self.fileObject.seek(self.fileObject.tell())
data = self.fileObject.read()
self.total_bytes += len(data)
if data:
self.request.write(data)
return defer.succeed(None)
def _check_status(stream_status):
if stream_status.running_status == "completed":
self.stopProducing()
return defer.succeed(None)
d = self.stream.status()
d = _write_new_data_to_request()
d.addCallback(lambda _: self.stream.status())
d.addCallback(_check_status)
def resumeProducing(self):
@ -102,24 +114,22 @@ class HostedLBRYFile(resource.Resource):
self._api = api
self.stream = None
self.streaming_file = None
self.producer = None
resource.Resource.__init__(self)
def _set_stream(self, stream):
self.stream = stream
def makeProducer(self, request, stream):
return LBRYFileProducer(request, stream)
self.producer = LBRYFileProducer(request, stream, self._api)
return self.producer
def render_GET(self, request):
if 'name' in request.args.keys():
if request.args['name'][0] != 'lbry':
if request.args['name'][0] != self.streaming_file:
self.streaming_file = request.args['name'][0]
if request.args['name'][0] != 'lbry' and request.args['name'][0] not in self._api.waiting_on.keys():
d = self._api._download_name(request.args['name'][0])
d.addCallback(self._set_stream)
else:
d = defer.succeed(None)
d.addCallback(lambda _: self.makeProducer(request, self.stream).start())
d.addCallback(lambda stream: self.makeProducer(request, stream))
d.addCallback(lambda producer: producer.start())
elif request.args['name'][0] in self._api.waiting_on.keys():
request.redirect(UI_ADDRESS + "/?watch=" + request.args['name'][0])
request.finish()
else:
request.redirect(UI_ADDRESS)
request.finish()

View file

@ -1,7 +1,9 @@
import json
import logging
import os
import sys
from appdirs import user_data_dir
from datetime import datetime
from twisted.internet import defer
from twisted.internet.task import LoopingCall
@ -12,7 +14,18 @@ from lbrynet.core.StreamDescriptor import download_sd_blob
from lbrynet.lbryfilemanager.LBRYFileDownloader import ManagedLBRYFileDownloaderFactory
from lbrynet.conf import DEFAULT_TIMEOUT
if sys.platform != "darwin":
log_dir = os.path.join(os.path.expanduser("~"), ".lbrynet")
else:
log_dir = user_data_dir("LBRY")
if not os.path.isdir(log_dir):
os.mkdir(log_dir)
LOG_FILENAME = os.path.join(log_dir, 'lbrynet-daemon.log')
log = logging.getLogger(__name__)
handler = logging.handlers.RotatingFileHandler(LOG_FILENAME, maxBytes=2097152, backupCount=5)
log.addHandler(handler)
log.setLevel(logging.INFO)
@ -123,7 +136,7 @@ class GetStream(object):
d = defer.Deferred()
self.downloader = downloader
self.download_path = os.path.join(downloader.download_directory, downloader.file_name)
d.addCallback(lambda _: log.info("Downloading " + str(self.stream_hash) + " --> " + str(self.download_path)))
d.addCallback(lambda _: log.info("[" + str(datetime.now()) + "] Downloading " + str(self.stream_hash) + " --> " + str(self.download_path)))
d.addCallback(lambda _: self.downloader.start())