From dfaf51a432e6293600849915ac47754541f69c21 Mon Sep 17 00:00:00 2001 From: Jack Date: Sun, 24 Apr 2016 17:51:24 -0400 Subject: [PATCH] lbry file improvements MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit -fix daemon functions to start/stop lbry files -remove unused stuff in LBRYFileManager -improve and use new get_lbry_file function instead of _check_history, which didn’t use the lbry file manager -use said function to let delete_lbry_file use the same search keys (sd_hash, name, and file_name) -logging in LBRYDownloader --- lbrynet/lbryfilemanager/LBRYFileManager.py | 32 +-- lbrynet/lbrynet_daemon/LBRYDaemon.py | 234 ++++++++++----------- lbrynet/lbrynet_daemon/LBRYDaemonServer.py | 48 +++-- lbrynet/lbrynet_daemon/LBRYDownloader.py | 15 +- 4 files changed, 159 insertions(+), 170 deletions(-) diff --git a/lbrynet/lbryfilemanager/LBRYFileManager.py b/lbrynet/lbryfilemanager/LBRYFileManager.py index d0dbca1ae..144425c6f 100644 --- a/lbrynet/lbryfilemanager/LBRYFileManager.py +++ b/lbrynet/lbryfilemanager/LBRYFileManager.py @@ -4,10 +4,7 @@ Keep track of which LBRY Files are downloading and store their LBRY File specifi import logging import os -import sys -from datetime import datetime -from twisted.internet.task import LoopingCall from twisted.enterprise import adbapi from twisted.internet import defer, task, reactor from twisted.python.failure import Failure @@ -28,14 +25,12 @@ class LBRYFileManager(object): Keeps track of currently opened LBRY Files, their options, and their LBRY File specific metadata. """ - def __init__(self, session, stream_info_manager, sd_identifier, delete_data=False, download_directory=None): + def __init__(self, session, stream_info_manager, sd_identifier, download_directory=None): self.session = session self.stream_info_manager = stream_info_manager self.sd_identifier = sd_identifier self.lbry_files = [] self.sql_db = None - # self.delete_data = delete_data - # self.check_exists_loop = LoopingCall(self.check_files_exist) if download_directory: self.download_directory = download_directory else: @@ -43,35 +38,11 @@ class LBRYFileManager(object): log.debug("Download directory for LBRYFileManager: %s", str(self.download_directory)) def setup(self): - # self.check_exists_loop.start(10) - d = self._open_db() d.addCallback(lambda _: self._add_to_sd_identifier()) d.addCallback(lambda _: self._start_lbry_files()) return d - # def check_files_exist(self): - # def _disp(deleted_files): - # if deleted_files[0][0]: - # for file in bad_files: - # log.info("[" + str(datetime.now()) + "] Detected " + file.file_name + " was deleted, removing from file manager") - # - # def _delete_stream_data(lbry_file): - # s_h = lbry_file.stream_hash - # d = self.get_count_for_stream_hash(s_h) - # # TODO: could possibly be a timing issue here - # d.addCallback(lambda c: self.stream_info_manager.delete_stream(s_h) if c == 0 else True) - # return d - # - # bad_files = [lbry_file for lbry_file in self.lbry_files - # if lbry_file.completed == True and - # os.path.isfile(os.path.join(self.download_directory, lbry_file.file_name)) == False] - # d = defer.DeferredList([self.delete_lbry_file(lbry_file) for lbry_file in bad_files], consumeErrors=True) - # d.addCallback(lambda files: _disp(files) if len(files) else defer.succeed(None)) - # - # if self.delete_data: - # d2 = defer.DeferredList([_delete_stream_data(lbry_file) for lbry_file in bad_files], consumeErrors=True) - def get_lbry_file_status(self, lbry_file): return self._get_lbry_file_status(lbry_file.rowid) @@ -183,7 +154,6 @@ class LBRYFileManager(object): return defer.fail(Failure(ValueError("Could not find that LBRY file"))) def stop(self): - # self.check_exists_loop.stop() ds = [] diff --git a/lbrynet/lbrynet_daemon/LBRYDaemon.py b/lbrynet/lbrynet_daemon/LBRYDaemon.py index c0c487b42..961d6bdca 100644 --- a/lbrynet/lbrynet_daemon/LBRYDaemon.py +++ b/lbrynet/lbrynet_daemon/LBRYDaemon.py @@ -716,7 +716,7 @@ class LBRYDaemon(jsonrpc.JSONRPC): self.lbry_file_manager = LBRYFileManager(self.session, self.lbry_file_metadata_manager, self.sd_identifier, - delete_data=True) + download_directory=self.download_directory) return self.lbry_file_manager.setup() d.addCallback(lambda _: set_lbry_file_manager()) @@ -862,6 +862,10 @@ class LBRYDaemon(jsonrpc.JSONRPC): return defer.succeed(True) def _download_name(self, name, timeout=DEFAULT_TIMEOUT, download_directory=None): + """ + Add a lbry file to the file manager, start the download, and return the new lbry file + if it already exists in the file manager, return the existing lbry file + """ if not download_directory: download_directory = self.download_directory elif not os.path.isdir(download_directory): @@ -871,34 +875,36 @@ class LBRYDaemon(jsonrpc.JSONRPC): del self.waiting_on[name] return r + def _setup_stream(stream_info): + stream_hash = stream_info['stream_hash'] + if isinstance(stream_hash, dict): + stream_hash = stream_hash['sd_hash'] + log.info("[" + str(datetime.now()) + "] Resolved lbry://" + name + " to sd hash: " + stream_hash) + d = self._get_lbry_file_by_sd_hash(stream_hash) + def _add_results(l): + return defer.succeed((stream_info, l)) + d.addCallback(_add_results) + return d + def _disp_file(f): file_path = os.path.join(self.download_directory, f.file_name) - log.info("[" + str(datetime.now()) + "] Already downloaded: " + str(f.stream_hash) + " --> " + file_path) + log.info("[" + str(datetime.now()) + "] Already downloaded: " + str(f.sd_hash) + " --> " + file_path) return f - def _get_stream(name): - def _disp(stream): - stream_hash = stream['stream_hash'] - if isinstance(stream_hash, dict): - stream_hash = stream_hash['sd_hash'] - - log.info("[" + str(datetime.now()) + "] Start stream: " + stream_hash) - return stream - + def _get_stream(stream_info): stream = GetStream(self.sd_identifier, self.session, self.session.wallet, self.lbry_file_manager, max_key_fee=self.max_key_fee, data_rate=self.data_rate, timeout=timeout, download_directory=download_directory) - d = self.session.wallet.get_stream_info_for_name(name) - d.addCallback(_disp) - d.addCallback(lambda stream_info: stream.start(stream_info, name)) + d = stream.start(stream_info, name) d.addCallback(lambda _: stream.downloader) return d self.waiting_on[name] = True - d = self._check_history(name) - d.addCallback(lambda lbry_file: _get_stream(name) if not lbry_file else _disp_file(lbry_file)) + d = self.session.wallet.get_stream_info_for_name(name) + d.addCallback(_setup_stream) + d.addCallback(lambda (stream_info, lbry_file): _get_stream(stream_info) if not lbry_file else _disp_file(lbry_file)) d.addCallback(_remove_from_wait) return d @@ -918,46 +924,6 @@ class LBRYDaemon(jsonrpc.JSONRPC): return d - def _check_history(self, name): - def _get_lbry_file(path): - f = open(path, 'r') - l = json.loads(f.read()) - f.close() - - file_name = l['stream_name'].decode('hex') - - for lbry_file in self.lbry_file_manager.lbry_files: - if lbry_file.stream_name == file_name: - if sys.platform == "darwin": - if os.path.isfile(os.path.join(self.download_directory, lbry_file.stream_name)): - return lbry_file - else: - return False - else: - return lbry_file - else: - return False - - def _check(info): - stream_hash = info['stream_hash'] - if isinstance(stream_hash, dict): - stream_hash = stream_hash['sd_hash'] - - path = os.path.join(self.blobfile_dir, stream_hash) - if os.path.isfile(path): - file_size = os.stat(path).st_size - log.info("[" + str(datetime.now()) + "] Search for lbry_file, found " + str(file_size) + " bytes written from stream hash: " + stream_hash) - return defer.succeed(_get_lbry_file(path)) - else: - log.info("[" + str(datetime.now()) + "] Search for lbry_file didn't return anything") - return defer.succeed(False) - - d = self._resolve_name(name) - d.addCallback(_check) - d.callback(None) - - return d - def _delete_lbry_file(self, lbry_file): d = self.lbry_file_manager.delete_lbry_file(lbry_file) @@ -978,21 +944,6 @@ class LBRYDaemon(jsonrpc.JSONRPC): d.addCallback(lambda _: finish_deletion(lbry_file)) return d - def _path_from_name(self, name): - d = self._check_history(name) - d.addCallback(lambda lbry_file: {'stream_hash': lbry_file.stream_hash, - 'path': os.path.join(self.download_directory, lbry_file.file_name)} - if lbry_file else defer.fail(UnknownNameError)) - return d - - def _path_from_lbry_file(self, lbry_file): - if lbry_file: - r = {'stream_hash': lbry_file.stream_hash, - 'path': os.path.join(self.download_directory, lbry_file.file_name)} - return defer.succeed(r) - else: - return defer.fail(UnknownNameError) - def _get_est_cost(self, name): def _check_est(d, name): if isinstance(d.result, float): @@ -1041,21 +992,43 @@ class LBRYDaemon(jsonrpc.JSONRPC): return r def _get_lbry_file_by_uri(self, name): + def _get_file(stream_info): + if isinstance(stream_info['stream_hash'], str) or isinstance(stream_info['stream_hash'], unicode): + sd = stream_info['stream_hash'] + elif isinstance(stream_info['stream_hash'], dict): + sd = stream_info['stream_hash']['sd_hash'] + + for l in self.lbry_file_manager.lbry_files: + if l.sd_hash == sd: + return defer.succeed(l) + return defer.succeed(None) + d = self.session.wallet.get_stream_info_for_name(name) - d.addCallback(lambda info: info['stream_hash']) - d.addCallback(lambda sd_hash: next(l for l in self.lbry_file_manager.lbry_files if l.sd_hash == sd_hash)) + d.addCallback(_get_file) + return d def _get_lbry_file_by_sd_hash(self, sd_hash): - r = next(l for l in self.lbry_file_manager.lbry_files if l.sd_hash == sd_hash) - return defer.succeed(r) + for l in self.lbry_file_manager.lbry_files: + if l.sd_hash == sd_hash: + return defer.succeed(l) + return defer.succeed(None) def _get_lbry_file_by_file_name(self, file_name): - r = next(l for l in self.lbry_file_manager.lbry_files if l.file_name == file_name) - return defer.succeed(r) + for l in self.lbry_file_manager.lbry_files: + if l.file_name == file_name: + return defer.succeed(l) + return defer.succeed(None) - def _get_lbry_file(self, search_by, val): - def _show_file(f): + def _get_lbry_file(self, search_by, val, return_json=True): + def _log_get_lbry_file(f): + if f: + log.info("Found LBRY file for " + search_by + ": " + val) + else: + log.info("Did not find LBRY file for " + search_by + ": " + val) + return f + + def _get_json_for_return(f): if f: if f.key: t = {'completed': f.completed, 'file_name': f.file_name, 'key': binascii.b2a_hex(f.key), @@ -1079,7 +1052,9 @@ class LBRYDaemon(jsonrpc.JSONRPC): d = self._get_lbry_file_by_sd_hash(val) elif search_by == "file_name": d = self._get_lbry_file_by_file_name(val) - d.addCallback(_show_file) + d.addCallback(_log_get_lbry_file) + if return_json: + d.addCallback(_get_json_for_return) return d def _log_to_slack(self, msg): @@ -1473,36 +1448,54 @@ class LBRYDaemon(jsonrpc.JSONRPC): return d - # def jsonrpc_stop_lbry_file(self, p): - # params = Bunch(p) - # - # try: - # lbry_file = [f for f in self.lbry_file_manager.lbry_files if f.stream_hash == params.stream_hash][0] - # except IndexError: - # return defer.fail(UnknownNameError) - # - # if not lbry_file.stopped: - # d = self.lbry_file_manager.toggle_lbry_file_running(lbry_file) - # d.addCallback(lambda _: self._render_response("Stream has been stopped", OK_CODE)) - # d.addErrback(lambda err: self._render_response(err.getTraceback(), )) - # return d - # else: - # return json.dumps({'result': 'Stream was already stopped'}) - # - # def jsonrpc_start_lbry_file(self, p): - # params = Bunch(p) - # - # try: - # lbry_file = [f for f in self.lbry_file_manager.lbry_files if f.stream_hash == params.stream_hash][0] - # except IndexError: - # return defer.fail(UnknownNameError) - # - # if lbry_file.stopped: - # d = self.lbry_file_manager.toggle_lbry_file_running(lbry_file) - # d.callback(None) - # return json.dumps({'result': 'Stream started'}) - # else: - # return json.dumps({'result': 'Stream was already running'}) + def jsonrpc_stop_lbry_file(self, p): + """ + Stop lbry file + + Args: + 'name': stop file by lbry uri, + 'sd_hash': stop file by the hash in the name claim, + 'file_name': stop file by its name in the downloads folder, + Returns: + confirmation message + """ + + def _stop_file(f): + d = self.lbry_file_manager.toggle_lbry_file_running(f) + d.addCallback(lambda _: "Stopped LBRY file") + return d + + if p.keys()[0] in ['name', 'sd_hash', 'file_name']: + search_type = p.keys()[0] + d = self._get_lbry_file(search_type, p[search_type], return_json=False) + d.addCallback(lambda l: _stop_file(l) if not l.stopped else "LBRY file wasn't running") + + d.addCallback(lambda r: self._render_response(r, OK_CODE)) + return d + + def jsonrpc_start_lbry_file(self, p): + """ + Stop lbry file + + Args: + 'name': stop file by lbry uri, + 'sd_hash': stop file by the hash in the name claim, + 'file_name': stop file by its name in the downloads folder, + Returns: + confirmation message + """ + + def _start_file(f): + d = self.lbry_file_manager.toggle_lbry_file_running(f) + return defer.succeed("Started LBRY file") + + if p.keys()[0] in ['name', 'sd_hash', 'file_name']: + search_type = p.keys()[0] + d = self._get_lbry_file(search_type, p[search_type], return_json=False) + d.addCallback(lambda l: _start_file(l) if l.stopped else "LBRY file was already running") + + d.addCallback(lambda r: self._render_response(r, OK_CODE)) + return d def jsonrpc_search_nametrie(self, p): """ @@ -1575,17 +1568,20 @@ class LBRYDaemon(jsonrpc.JSONRPC): confirmation message """ - def _disp(file_name): - log.info("[" + str(datetime.now()) + "] Deleted: " + file_name) - return self._render_response("Deleted: " + file_name, OK_CODE) - - if "file_name" in p.keys(): - lbry_files = [self._delete_lbry_file(f) for f in self.lbry_file_manager.lbry_files - if p['file_name'] == f.file_name] - d = defer.DeferredList(lbry_files) - d.addCallback(lambda _: _disp(p['file_name'])) + def _delete_file(f): + file_name = f.file_name + d = self._delete_lbry_file(f) + d.addCallback(lambda _: "Deleted LBRY file" + file_name) return d + if p.keys()[0] in ['name', 'sd_hash', 'file_name']: + search_type = p.keys()[0] + d = self._get_lbry_file(search_type, p[search_type], return_json=False) + d.addCallback(lambda l: _delete_file(l) if l else "Couldn't find LBRY file to delete") + + d.addCallback(lambda r: self._render_response(r, OK_CODE)) + return d + def jsonrpc_publish(self, p): """ Make a new name claim diff --git a/lbrynet/lbrynet_daemon/LBRYDaemonServer.py b/lbrynet/lbrynet_daemon/LBRYDaemonServer.py index e68479bbe..167c11e92 100644 --- a/lbrynet/lbrynet_daemon/LBRYDaemonServer.py +++ b/lbrynet/lbrynet_daemon/LBRYDaemonServer.py @@ -54,13 +54,20 @@ class LBRYindex(resource.Resource): class LBRYFileProducer(StaticProducer): - def __init__(self, request, lbry_stream): + def __init__(self, request, lbry_stream, api): + self._api = api self.stream = lbry_stream self.updater = LoopingCall(self._check_for_data) - StaticProducer.__init__(self, request, fileObject=file(lbry_stream.file_written_to)) + self.total_bytes = 0 + if lbry_stream.file_written_to: + file_name = lbry_stream.file_written_to + else: + file_name = os.path.join(self._api.download_directory, lbry_stream.file_name) + StaticProducer.__init__(self, request, fileObject=file(file_name)) def start(self): d = self._set_size() + self.fileObject.seek(0) self.updater.start(5) def _set_size(self): @@ -74,17 +81,22 @@ class LBRYFileProducer(StaticProducer): return d def _check_for_data(self): - self.fileObject.seek(self.fileObject.tell()) - data = self.fileObject.read() - if data: - self.request.write(data) + def _write_new_data_to_request(): + self.fileObject.seek(self.fileObject.tell()) + data = self.fileObject.read() + self.total_bytes += len(data) + + if data: + self.request.write(data) + return defer.succeed(None) def _check_status(stream_status): if stream_status.running_status == "completed": self.stopProducing() return defer.succeed(None) - d = self.stream.status() + d = _write_new_data_to_request() + d.addCallback(lambda _: self.stream.status()) d.addCallback(_check_status) def resumeProducing(self): @@ -102,24 +114,22 @@ class HostedLBRYFile(resource.Resource): self._api = api self.stream = None self.streaming_file = None + self.producer = None resource.Resource.__init__(self) - def _set_stream(self, stream): - self.stream = stream - def makeProducer(self, request, stream): - return LBRYFileProducer(request, stream) + self.producer = LBRYFileProducer(request, stream, self._api) + return self.producer def render_GET(self, request): if 'name' in request.args.keys(): - if request.args['name'][0] != 'lbry': - if request.args['name'][0] != self.streaming_file: - self.streaming_file = request.args['name'][0] - d = self._api._download_name(request.args['name'][0]) - d.addCallback(self._set_stream) - else: - d = defer.succeed(None) - d.addCallback(lambda _: self.makeProducer(request, self.stream).start()) + if request.args['name'][0] != 'lbry' and request.args['name'][0] not in self._api.waiting_on.keys(): + d = self._api._download_name(request.args['name'][0]) + d.addCallback(lambda stream: self.makeProducer(request, stream)) + d.addCallback(lambda producer: producer.start()) + elif request.args['name'][0] in self._api.waiting_on.keys(): + request.redirect(UI_ADDRESS + "/?watch=" + request.args['name'][0]) + request.finish() else: request.redirect(UI_ADDRESS) request.finish() diff --git a/lbrynet/lbrynet_daemon/LBRYDownloader.py b/lbrynet/lbrynet_daemon/LBRYDownloader.py index b52c8c92a..048eb3e5b 100644 --- a/lbrynet/lbrynet_daemon/LBRYDownloader.py +++ b/lbrynet/lbrynet_daemon/LBRYDownloader.py @@ -1,7 +1,9 @@ import json import logging import os +import sys +from appdirs import user_data_dir from datetime import datetime from twisted.internet import defer from twisted.internet.task import LoopingCall @@ -12,7 +14,18 @@ from lbrynet.core.StreamDescriptor import download_sd_blob from lbrynet.lbryfilemanager.LBRYFileDownloader import ManagedLBRYFileDownloaderFactory from lbrynet.conf import DEFAULT_TIMEOUT +if sys.platform != "darwin": + log_dir = os.path.join(os.path.expanduser("~"), ".lbrynet") +else: + log_dir = user_data_dir("LBRY") + +if not os.path.isdir(log_dir): + os.mkdir(log_dir) + +LOG_FILENAME = os.path.join(log_dir, 'lbrynet-daemon.log') log = logging.getLogger(__name__) +handler = logging.handlers.RotatingFileHandler(LOG_FILENAME, maxBytes=2097152, backupCount=5) +log.addHandler(handler) log.setLevel(logging.INFO) @@ -123,7 +136,7 @@ class GetStream(object): d = defer.Deferred() self.downloader = downloader self.download_path = os.path.join(downloader.download_directory, downloader.file_name) - d.addCallback(lambda _: log.info("Downloading " + str(self.stream_hash) + " --> " + str(self.download_path))) + d.addCallback(lambda _: log.info("[" + str(datetime.now()) + "] Downloading " + str(self.stream_hash) + " --> " + str(self.download_path))) d.addCallback(lambda _: self.downloader.start())