diff --git a/lbrynet/__init__.py b/lbrynet/__init__.py index 8b1a36331..8f010329d 100644 --- a/lbrynet/__init__.py +++ b/lbrynet/__init__.py @@ -4,5 +4,5 @@ import logging logging.getLogger(__name__).addHandler(logging.NullHandler()) -version = (0, 2, 2) +version = (0, 2, 3) __version__ = ".".join([str(x) for x in version]) diff --git a/lbrynet/conf.py b/lbrynet/conf.py index f3903ae6a..dfae3d274 100644 --- a/lbrynet/conf.py +++ b/lbrynet/conf.py @@ -18,8 +18,6 @@ MIN_BLOB_INFO_PAYMENT_RATE = .02 # points/1000 infos MIN_VALUABLE_BLOB_INFO_PAYMENT_RATE = .05 # points/1000 infos MIN_VALUABLE_BLOB_HASH_PAYMENT_RATE = .05 # points/1000 infos MAX_CONNECTIONS_PER_STREAM = 5 -DEFAULT_MAX_SEARCH_RESULTS = 25 -DEFAULT_MAX_KEY_FEE = 100.0 KNOWN_DHT_NODES = [('104.236.42.182', 4000)] @@ -33,11 +31,13 @@ API_ADDRESS = "lbryapi" API_PORT = 5279 ICON_PATH = "app.icns" APP_NAME = "LBRY" -DEFAULT_WALLET = "lbryum" - API_CONNECTION_STRING = "http://%s:%i/%s" % (API_INTERFACE, API_PORT, API_ADDRESS) UI_ADDRESS = "http://" + API_INTERFACE + ":" + str(API_PORT) - PROTOCOL_PREFIX = "lbry" -DEFAULT_TIMEOUT = 30 \ No newline at end of file +DEFAULT_WALLET = "lbryum" +DEFAULT_TIMEOUT = 30 +DEFAULT_MAX_SEARCH_RESULTS = 25 +DEFAULT_MAX_KEY_FEE = 100.0 +DEFAULT_SEARCH_TIMEOUT = 3.0 +DEFAULT_CACHE_TIME = 3600 \ No newline at end of file diff --git a/lbrynet/core/LBRYcrdWallet.py b/lbrynet/core/LBRYcrdWallet.py index bdf35af6b..5691d291f 100644 --- a/lbrynet/core/LBRYcrdWallet.py +++ b/lbrynet/core/LBRYcrdWallet.py @@ -1014,7 +1014,7 @@ class LBRYumWallet(LBRYWallet): self.max_behind = self.blocks_behind_alert self.catchup_progress = int(100 * (self.blocks_behind_alert / (5 + self.max_behind))) if self._caught_up_counter == 0: - alert.info('Catching up to the blockchain...showing blocks left...') + alert.info('Catching up with the blockchain...showing blocks left...') if self._caught_up_counter % 30 == 0: alert.info('%d...', (remote_height - local_height)) @@ -1136,6 +1136,39 @@ class LBRYumWallet(LBRYWallet): func = getattr(self.cmd_runner, cmd.name) return threads.deferToThread(func) + def get_history(self): + cmd = known_commands['history'] + func = getattr(self.cmd_runner, cmd.name) + return threads.deferToThread(func) + + def get_tx_json(self, txid): + def _decode(raw_tx): + tx = Transaction(raw_tx).deserialize() + decoded_tx = {} + for txkey in tx.keys(): + if isinstance(tx[txkey], list): + decoded_tx[txkey] = [] + for i in tx[txkey]: + tmp = {} + for k in i.keys(): + if isinstance(i[k], Decimal): + tmp[k] = float(i[k] / 1e8) + else: + tmp[k] = i[k] + decoded_tx[txkey].append(tmp) + else: + decoded_tx[txkey] = tx[txkey] + return decoded_tx + + d = self._get_raw_tx(txid) + d.addCallback(_decode) + return d + + def get_pub_keys(self, wallet): + cmd = known_commands['getpubkeys'] + func = getattr(self.cmd_runner, cmd.name) + return threads.deferToThread(func, wallet) + def _save_wallet(self, val): d = threads.deferToThread(self.wallet.storage.write) d.addCallback(lambda _: val) diff --git a/lbrynet/lbryfile/client/LBRYFileDownloader.py b/lbrynet/lbryfile/client/LBRYFileDownloader.py index a65f49484..16ee9425d 100644 --- a/lbrynet/lbryfile/client/LBRYFileDownloader.py +++ b/lbrynet/lbryfile/client/LBRYFileDownloader.py @@ -211,10 +211,20 @@ class LBRYFileSaver(LBRYFileDownloader): file_name = "_" if os.path.exists(os.path.join(self.download_directory, file_name)): ext_num = 1 + + def _get_file_name(ext): + if len(file_name.split(".")): + fn = ''.join(file_name.split(".")[:-1]) + file_ext = ''.join(file_name.split(".")[-1]) + return fn + "-" + str(ext) + "." + file_ext + else: + return file_name + "_" + str(ext) + while os.path.exists(os.path.join(self.download_directory, - file_name + "_" + str(ext_num))): + _get_file_name(ext_num))): ext_num += 1 - file_name = file_name + "_" + str(ext_num) + + file_name = _get_file_name(ext_num) try: self.file_handle = open(os.path.join(self.download_directory, file_name), 'wb') self.file_written_to = os.path.join(self.download_directory, file_name) diff --git a/lbrynet/lbryfilemanager/LBRYFileCreator.py b/lbrynet/lbryfilemanager/LBRYFileCreator.py index 1b0fcb5b0..770d1ba6b 100644 --- a/lbrynet/lbryfilemanager/LBRYFileCreator.py +++ b/lbrynet/lbryfilemanager/LBRYFileCreator.py @@ -131,7 +131,8 @@ def create_lbry_file(session, lbry_file_manager, file_name, file_handle, key=Non def make_stream_desc_file(stream_hash): log.debug("creating the stream descriptor file") - descriptor_writer = PlainStreamDescriptorWriter(file_name + conf.CRYPTSD_FILE_EXTENSION) + descriptor_file_path = os.path.join(session.db_dir, file_name + conf.CRYPTSD_FILE_EXTENSION) + descriptor_writer = PlainStreamDescriptorWriter(descriptor_file_path) d = get_sd_info(lbry_file_manager.stream_info_manager, stream_hash, True) diff --git a/lbrynet/lbryfilemanager/LBRYFileDownloader.py b/lbrynet/lbryfilemanager/LBRYFileDownloader.py index 2afc17588..fa3ce530e 100644 --- a/lbrynet/lbryfilemanager/LBRYFileDownloader.py +++ b/lbrynet/lbryfilemanager/LBRYFileDownloader.py @@ -24,12 +24,22 @@ class ManagedLBRYFileDownloader(LBRYFileSaver): LBRYFileSaver.__init__(self, stream_hash, peer_finder, rate_limiter, blob_manager, stream_info_manager, payment_rate_manager, wallet, download_directory, upload_allowed, file_name) + self.sd_hash = None self.rowid = rowid self.lbry_file_manager = lbry_file_manager self.saving_status = False def restore(self): - d = self.lbry_file_manager.get_lbry_file_status(self) + d = self.stream_info_manager._get_sd_blob_hashes_for_stream(self.stream_hash) + + def _save_sd_hash(sd_hash): + if len(sd_hash): + self.sd_hash = sd_hash[0] + return defer.succeed(None) + + d.addCallback(_save_sd_hash) + + d.addCallback(lambda _: self.lbry_file_manager.get_lbry_file_status(self)) def restore_status(status): if status == ManagedLBRYFileDownloader.STATUS_RUNNING: @@ -87,6 +97,14 @@ class ManagedLBRYFileDownloader(LBRYFileSaver): d = LBRYFileSaver._start(self) + d.addCallback(lambda _: self.stream_info_manager._get_sd_blob_hashes_for_stream(self.stream_hash)) + + def _save_sd_hash(sd_hash): + self.sd_hash = sd_hash[0] + return defer.succeed(None) + + d.addCallback(_save_sd_hash) + d.addCallback(lambda _: self._save_status()) return d @@ -119,7 +137,7 @@ class ManagedLBRYFileDownloaderFactory(object): def can_download(self, sd_validator): return True - def make_downloader(self, metadata, options, payment_rate_manager, download_directory=None): + def make_downloader(self, metadata, options, payment_rate_manager, download_directory=None, file_name=None): data_rate = options[0] upload_allowed = options[1] @@ -138,7 +156,8 @@ class ManagedLBRYFileDownloaderFactory(object): payment_rate_manager, data_rate, upload_allowed, - download_directory=download_directory)) + download_directory=download_directory, + file_name=file_name)) return d @staticmethod diff --git a/lbrynet/lbryfilemanager/LBRYFileManager.py b/lbrynet/lbryfilemanager/LBRYFileManager.py index d0dbca1ae..d805fb38d 100644 --- a/lbrynet/lbryfilemanager/LBRYFileManager.py +++ b/lbrynet/lbryfilemanager/LBRYFileManager.py @@ -4,10 +4,7 @@ Keep track of which LBRY Files are downloading and store their LBRY File specifi import logging import os -import sys -from datetime import datetime -from twisted.internet.task import LoopingCall from twisted.enterprise import adbapi from twisted.internet import defer, task, reactor from twisted.python.failure import Failure @@ -28,14 +25,12 @@ class LBRYFileManager(object): Keeps track of currently opened LBRY Files, their options, and their LBRY File specific metadata. """ - def __init__(self, session, stream_info_manager, sd_identifier, delete_data=False, download_directory=None): + def __init__(self, session, stream_info_manager, sd_identifier, download_directory=None): self.session = session self.stream_info_manager = stream_info_manager self.sd_identifier = sd_identifier self.lbry_files = [] self.sql_db = None - # self.delete_data = delete_data - # self.check_exists_loop = LoopingCall(self.check_files_exist) if download_directory: self.download_directory = download_directory else: @@ -43,35 +38,11 @@ class LBRYFileManager(object): log.debug("Download directory for LBRYFileManager: %s", str(self.download_directory)) def setup(self): - # self.check_exists_loop.start(10) - d = self._open_db() d.addCallback(lambda _: self._add_to_sd_identifier()) d.addCallback(lambda _: self._start_lbry_files()) return d - # def check_files_exist(self): - # def _disp(deleted_files): - # if deleted_files[0][0]: - # for file in bad_files: - # log.info("[" + str(datetime.now()) + "] Detected " + file.file_name + " was deleted, removing from file manager") - # - # def _delete_stream_data(lbry_file): - # s_h = lbry_file.stream_hash - # d = self.get_count_for_stream_hash(s_h) - # # TODO: could possibly be a timing issue here - # d.addCallback(lambda c: self.stream_info_manager.delete_stream(s_h) if c == 0 else True) - # return d - # - # bad_files = [lbry_file for lbry_file in self.lbry_files - # if lbry_file.completed == True and - # os.path.isfile(os.path.join(self.download_directory, lbry_file.file_name)) == False] - # d = defer.DeferredList([self.delete_lbry_file(lbry_file) for lbry_file in bad_files], consumeErrors=True) - # d.addCallback(lambda files: _disp(files) if len(files) else defer.succeed(None)) - # - # if self.delete_data: - # d2 = defer.DeferredList([_delete_stream_data(lbry_file) for lbry_file in bad_files], consumeErrors=True) - def get_lbry_file_status(self, lbry_file): return self._get_lbry_file_status(lbry_file.rowid) @@ -123,7 +94,7 @@ class LBRYFileManager(object): return d def start_lbry_file(self, rowid, stream_hash, payment_rate_manager, blob_data_rate=None, upload_allowed=True, - download_directory=None): + download_directory=None, file_name=None): if not download_directory: download_directory = self.download_directory payment_rate_manager.min_blob_data_payment_rate = blob_data_rate @@ -134,16 +105,18 @@ class LBRYFileManager(object): self.stream_info_manager, self, payment_rate_manager, self.session.wallet, download_directory, - upload_allowed) + upload_allowed, + file_name=file_name) self.lbry_files.append(lbry_file_downloader) d = lbry_file_downloader.set_stream_info() d.addCallback(lambda _: lbry_file_downloader) return d - def add_lbry_file(self, stream_hash, payment_rate_manager, blob_data_rate=None, upload_allowed=True, download_directory=None): + def add_lbry_file(self, stream_hash, payment_rate_manager, blob_data_rate=None, upload_allowed=True, + download_directory=None, file_name=None): d = self._save_lbry_file(stream_hash, blob_data_rate) d.addCallback(lambda rowid: self.start_lbry_file(rowid, stream_hash, payment_rate_manager, - blob_data_rate, upload_allowed, download_directory)) + blob_data_rate, upload_allowed, download_directory, file_name)) return d def delete_lbry_file(self, lbry_file): @@ -183,7 +156,6 @@ class LBRYFileManager(object): return defer.fail(Failure(ValueError("Could not find that LBRY file"))) def stop(self): - # self.check_exists_loop.stop() ds = [] diff --git a/lbrynet/lbrynet_daemon/Apps/LBRYURIHandler.py b/lbrynet/lbrynet_daemon/Apps/LBRYURIHandler.py index c748a1015..f6990cfea 100644 --- a/lbrynet/lbrynet_daemon/Apps/LBRYURIHandler.py +++ b/lbrynet/lbrynet_daemon/Apps/LBRYURIHandler.py @@ -11,73 +11,39 @@ API_CONNECTION_STRING = "http://localhost:5279/lbryapi" UI_ADDRESS = "http://localhost:5279" -class Timeout(Exception): - def __init__(self, value): - self.parameter = value - - def __str__(self): - return repr(self.parameter) - - class LBRYURIHandler(object): def __init__(self): self.started_daemon = False - self.start_timeout = 0 self.daemon = JSONRPCProxy.from_url(API_CONNECTION_STRING) - def check_status(self): - status = None - try: - status = self.daemon.is_running() - if self.start_timeout < 30 and not status: - sleep(1) - self.start_timeout += 1 - self.check_status() - elif status: - return True - else: - raise Timeout("LBRY daemon is running, but connection timed out") - except: - if self.start_timeout < 30: - sleep(1) - self.start_timeout += 1 - self.check_status() - else: - raise Timeout("Timed out trying to start LBRY daemon") - def handle_osx(self, lbry_name): - lbry_process = [d for d in subprocess.Popen(['ps','aux'], stdout=subprocess.PIPE).stdout.readlines() - if 'LBRY.app' in d and 'LBRYURIHandler' not in d] try: status = self.daemon.is_running() except: - status = None - - if lbry_process or status: - self.check_status() - started = False - else: os.system("open /Applications/LBRY.app") - self.check_status() - started = True + sleep(3) - if lbry_name == "lbry" or lbry_name == "" and not started: + if lbry_name == "lbry" or lbry_name == "": webbrowser.open(UI_ADDRESS) else: - webbrowser.open(UI_ADDRESS + "/view?name=" + lbry_name) + webbrowser.open(UI_ADDRESS + "/?watch=" + lbry_name) def handle_linux(self, lbry_name): try: - is_running = self.daemon.is_running() - if not is_running: - sys.exit(0) + status = self.daemon.is_running() except: - sys.exit(0) + cmd = r'DIR = "$( cd "$(dirname "${BASH_SOURCE[0]}" )" && pwd )"' \ + r'if [-z "$(pgrep lbrynet-daemon)"]; then' \ + r'echo "running lbrynet-daemon..."' \ + r'$DIR / lbrynet - daemon &' \ + r'sleep 3 # let the daemon load before connecting' \ + r'fi' + subprocess.Popen(cmd, shell=True) - if lbry_name == "lbry": + if lbry_name == "lbry" or lbry_name == "": webbrowser.open(UI_ADDRESS) else: - webbrowser.open(UI_ADDRESS + "/view?name=" + lbry_name) + webbrowser.open(UI_ADDRESS + "/?watch=" + lbry_name) def main(args): diff --git a/lbrynet/lbrynet_daemon/LBRYDaemon.py b/lbrynet/lbrynet_daemon/LBRYDaemon.py index ff0bfd87a..f16f5562a 100644 --- a/lbrynet/lbrynet_daemon/LBRYDaemon.py +++ b/lbrynet/lbrynet_daemon/LBRYDaemon.py @@ -3,17 +3,15 @@ import os import sys import simplejson as json import binascii -import subprocess -import logging import logging.handlers import requests -import base64 import base58 import platform -import json +import socket -from twisted.web import server, resource, static +from twisted.web import server from twisted.internet import defer, threads, error, reactor +from twisted.internet.task import LoopingCall from txjsonrpc import jsonrpclib from txjsonrpc.web import jsonrpc from txjsonrpc.web.jsonrpc import Handler @@ -29,16 +27,16 @@ from lbrynet.core.PaymentRateManager import PaymentRateManager from lbrynet.core.server.BlobAvailabilityHandler import BlobAvailabilityHandlerFactory from lbrynet.core.server.BlobRequestHandler import BlobRequestHandlerFactory from lbrynet.core.server.ServerProtocol import ServerProtocolFactory -from lbrynet.lbrynet_console.ControlHandlers import get_time_behind_blockchain -from lbrynet.core.Error import UnknownNameError +from lbrynet.core.Error import UnknownNameError, InsufficientFundsError from lbrynet.lbryfile.StreamDescriptor import LBRYFileStreamType from lbrynet.lbryfile.client.LBRYFileDownloader import LBRYFileSaverFactory, LBRYFileOpenerFactory from lbrynet.lbryfile.client.LBRYFileOptions import add_lbry_file_to_sd_identifier -from lbrynet.lbrynet_daemon.LBRYDownloader import GetStream, FetcherDaemon +from lbrynet.lbrynet_daemon.LBRYDownloader import GetStream from lbrynet.lbrynet_daemon.LBRYPublisher import Publisher from lbrynet.core.utils import generate_id from lbrynet.lbrynet_console.LBRYSettings import LBRYSettings -from lbrynet.conf import MIN_BLOB_DATA_PAYMENT_RATE, DEFAULT_MAX_SEARCH_RESULTS, KNOWN_DHT_NODES, DEFAULT_MAX_KEY_FEE +from lbrynet.conf import MIN_BLOB_DATA_PAYMENT_RATE, DEFAULT_MAX_SEARCH_RESULTS, KNOWN_DHT_NODES, DEFAULT_MAX_KEY_FEE, \ + DEFAULT_WALLET, DEFAULT_SEARCH_TIMEOUT, DEFAULT_CACHE_TIME from lbrynet.conf import API_CONNECTION_STRING, API_PORT, API_ADDRESS, DEFAULT_TIMEOUT, UI_ADDRESS from lbrynet.core.StreamDescriptor import StreamDescriptorIdentifier, download_sd_blob from lbrynet.core.Session import LBRYSession @@ -66,19 +64,46 @@ else: PREVIOUS_LOG = 0 log = logging.getLogger(__name__) -handler = logging.handlers.RotatingFileHandler(LOG_FILENAME, maxBytes=262144, backupCount=5) +handler = logging.handlers.RotatingFileHandler(LOG_FILENAME, maxBytes=2097152, backupCount=5) log.addHandler(handler) log.setLevel(logging.INFO) +INITIALIZING_CODE = 'initializing' +LOADING_DB_CODE = 'loading_db' +LOADING_WALLET_CODE = 'loading_wallet' +LOADING_FILE_MANAGER_CODE = 'loading_file_manager' +LOADING_SERVER_CODE = 'loading_server' +STARTED_CODE = 'started' STARTUP_STAGES = [ - ('initializing', 'Initializing...'), - ('loading_db', 'Loading databases...'), - ('loading_wallet', 'Catching up with the blockchain... %s'), - ('loading_file_manager', 'Setting up file manager'), - ('loading_server', 'Starting lbrynet'), - ('started', 'Started lbrynet') + (INITIALIZING_CODE, 'Initializing...'), + (LOADING_DB_CODE, 'Loading databases...'), + (LOADING_WALLET_CODE, 'Catching up with the blockchain... %s'), + (LOADING_FILE_MANAGER_CODE, 'Setting up file manager'), + (LOADING_SERVER_CODE, 'Starting lbrynet'), + (STARTED_CODE, 'Started lbrynet') ] +DOWNLOAD_METADATA_CODE = 'downloading_metadata' +DOWNLOAD_TIMEOUT_CODE = 'timeout' +DOWNLOAD_RUNNING_CODE = 'running' +DOWNLOAD_STOPPED_CODE = 'stopped' +STREAM_STAGES = [ + (INITIALIZING_CODE, 'Initializing...'), + (DOWNLOAD_METADATA_CODE, 'Downloading metadata'), + (DOWNLOAD_RUNNING_CODE, 'Started %s, got %s/%s blobs, stream status: %s'), + (DOWNLOAD_STOPPED_CODE, 'Paused stream'), + (DOWNLOAD_TIMEOUT_CODE, 'Stream timed out') + ] + +CONNECT_CODE_VERSION_CHECK = 'version_check' +CONNECT_CODE_NETWORK = 'network_connection' +CONNECT_CODE_WALLET = 'wallet_catchup_lag' +CONNECTION_PROBLEM_CODES = [ + (CONNECT_CODE_VERSION_CHECK, "There was a problem checking for updates on github"), + (CONNECT_CODE_NETWORK, "Your internet connection appears to have been interrupted"), + (CONNECT_CODE_WALLET, "Synchronization with the blockchain is lagging... if this continues try restarting LBRY") + ] + ALLOWED_DURING_STARTUP = ['is_running', 'is_first_run', 'get_time_behind_blockchain', 'stop', 'daemon_status', 'get_start_notice', @@ -87,10 +112,14 @@ ALLOWED_DURING_STARTUP = ['is_running', 'is_first_run', BAD_REQUEST = 400 NOT_FOUND = 404 OK_CODE = 200 + # TODO add login credentials in a conf file # TODO alert if your copy of a lbry file is out of date with the name record +REMOTE_SERVER = "www.google.com" + + class LBRYDaemon(jsonrpc.JSONRPC): """ LBRYnet daemon, a jsonrpc interface to lbry functions @@ -98,25 +127,27 @@ class LBRYDaemon(jsonrpc.JSONRPC): isLeaf = True - def __init__(self, ui_version_info, wallet_type="lbryum"): + def __init__(self, ui_version_info, wallet_type=DEFAULT_WALLET): jsonrpc.JSONRPC.__init__(self) reactor.addSystemEventTrigger('before', 'shutdown', self._shutdown) self.startup_status = STARTUP_STAGES[0] self.startup_message = None self.announced_startup = False + self.connected_to_internet = True + self.connection_problem = None self.query_handlers = {} self.ui_version = ui_version_info.replace('\n', '') self.git_lbrynet_version = None self.git_lbryum_version = None self.wallet_type = wallet_type - self.session_settings = None self.first_run = None self.log_file = LOG_FILENAME - self.fetcher = None self.current_db_revision = 1 self.run_server = True self.session = None + self.waiting_on = {} + self.streams = {} self.known_dht_nodes = KNOWN_DHT_NODES self.platform_info = { "processor": platform.processor(), @@ -135,7 +166,7 @@ class LBRYDaemon(jsonrpc.JSONRPC): if os.name == "nt": from lbrynet.winhelpers.knownpaths import get_path, FOLDERID, UserHandle - self.download_directory = get_path(FOLDERID.Downloads, UserHandle.current) + default_download_directory = get_path(FOLDERID.Downloads, UserHandle.current) self.db_dir = os.path.join(get_path(FOLDERID.RoamingAppData, UserHandle.current), "lbrynet") self.lbrycrdd_path = "lbrycrdd.exe" if wallet_type == "lbrycrd": @@ -143,7 +174,7 @@ class LBRYDaemon(jsonrpc.JSONRPC): else: self.wallet_dir = os.path.join(get_path(FOLDERID.RoamingAppData, UserHandle.current), "lbryum") elif sys.platform == "darwin": - self.download_directory = os.path.join(os.path.expanduser("~"), 'Downloads') + default_download_directory = os.path.join(os.path.expanduser("~"), 'Downloads') self.db_dir = user_data_dir("LBRY") self.lbrycrdd_path = "./lbrycrdd" if wallet_type == "lbrycrd": @@ -151,7 +182,7 @@ class LBRYDaemon(jsonrpc.JSONRPC): else: self.wallet_dir = user_data_dir("LBRY") else: - self.download_directory = os.getcwd() + default_download_directory = os.getcwd() self.db_dir = os.path.join(os.path.expanduser("~"), ".lbrynet") self.lbrycrdd_path = "./lbrycrdd" if wallet_type == "lbrycrd": @@ -172,6 +203,11 @@ class LBRYDaemon(jsonrpc.JSONRPC): self.wallet_user = None self.wallet_password = None + self.internet_connection_checker = LoopingCall(self._check_network_connection) + self.version_checker = LoopingCall(self._check_remote_versions) + self.connection_problem_checker = LoopingCall(self._check_connection_problems) + # self.lbrynet_connection_checker = LoopingCall(self._check_lbrynet_connection) + self.sd_identifier = StreamDescriptorIdentifier() self.stream_info_manager = TempLBRYFileMetadataManager() self.settings = LBRYSettings(self.db_dir) @@ -179,16 +215,16 @@ class LBRYDaemon(jsonrpc.JSONRPC): self.lbry_file_metadata_manager = None self.lbry_file_manager = None - #defaults for settings otherwise loaded from daemon_settings.json self.default_settings = { 'run_on_startup': False, 'data_rate': MIN_BLOB_DATA_PAYMENT_RATE, 'max_key_fee': DEFAULT_MAX_KEY_FEE, - 'default_download_directory': self.download_directory, + 'download_directory': default_download_directory, 'max_upload': 0.0, 'max_download': 0.0, 'upload_log': True, - 'search_timeout': 3.0, + 'search_timeout': DEFAULT_SEARCH_TIMEOUT, + 'download_timeout': DEFAULT_TIMEOUT, 'max_search_results': DEFAULT_MAX_SEARCH_RESULTS, 'wallet_type': wallet_type, 'delete_blobs_on_remove': True, @@ -196,9 +232,71 @@ class LBRYDaemon(jsonrpc.JSONRPC): 'dht_node_port': 4444, 'use_upnp': True, 'start_lbrycrdd': True, - 'requested_first_run_credits': False + 'requested_first_run_credits': False, + 'cache_time': DEFAULT_CACHE_TIME } + if os.path.isfile(self.daemon_conf): + f = open(self.daemon_conf, "r") + loaded_settings = json.loads(f.read()) + f.close() + missing_settings = {} + removed_settings = {} + for k in self.default_settings.keys(): + if k not in loaded_settings.keys(): + missing_settings[k] = self.default_settings[k] + for k in loaded_settings.keys(): + if not k in self.default_settings.keys(): + log.info("Removing unused setting: " + k + " with value: " + str(loaded_settings[k])) + removed_settings[k] = loaded_settings[k] + del loaded_settings[k] + for k in missing_settings.keys(): + log.info("Adding missing setting: " + k + " with default value: " + str(missing_settings[k])) + loaded_settings[k] = missing_settings[k] + if missing_settings or removed_settings: + f = open(self.daemon_conf, "w") + f.write(json.dumps(loaded_settings)) + f.close() + else: + log.info("Loaded lbrynet-daemon configuration") + settings_dict = loaded_settings + else: + log.info( + "Writing default settings : " + json.dumps(self.default_settings) + " --> " + str(self.daemon_conf)) + f = open(self.daemon_conf, "w") + f.write(json.dumps(self.default_settings)) + f.close() + settings_dict = self.default_settings + + self.session_settings = settings_dict + + self.run_on_startup = self.session_settings['run_on_startup'] + self.data_rate = self.session_settings['data_rate'] + self.max_key_fee = self.session_settings['max_key_fee'] + self.download_directory = self.session_settings['download_directory'] + self.max_upload = self.session_settings['max_upload'] + self.max_download = self.session_settings['max_download'] + self.upload_log = self.session_settings['upload_log'] + self.search_timeout = self.session_settings['search_timeout'] + self.download_timeout = self.session_settings['download_timeout'] + self.max_search_results = self.session_settings['max_search_results'] + self.wallet_type = self.session_settings['wallet_type'] + self.delete_blobs_on_remove = self.session_settings['delete_blobs_on_remove'] + self.peer_port = self.session_settings['peer_port'] + self.dht_node_port = self.session_settings['dht_node_port'] + self.use_upnp = self.session_settings['use_upnp'] + self.start_lbrycrdd = self.session_settings['start_lbrycrdd'] + self.requested_first_run_credits = self.session_settings['requested_first_run_credits'] + self.cache_time = self.session_settings['cache_time'] + + if os.path.isfile(os.path.join(self.db_dir, "stream_info_cache.json")): + f = open(os.path.join(self.db_dir, "stream_info_cache.json"), "r") + self.name_cache = json.loads(f.read()) + f.close() + log.info("Loaded claim info cache") + else: + self.name_cache = {} + def render(self, request): request.content.seek(0, 0) # Unmarshal the JSON-RPC data. @@ -267,41 +365,18 @@ class LBRYDaemon(jsonrpc.JSONRPC): def setup(self): def _log_starting_vals(): - def _get_lbry_files_json(): - r = self._get_lbry_files() - return json.dumps(r) - - def _get_lbryum_version(): - r = urlopen("https://raw.githubusercontent.com/lbryio/lbryum/master/lib/version.py").read().split('\n') - version = next(line.split("=")[1].split("#")[0].replace(" ", "") - for line in r if "ELECTRUM_VERSION" in line) - version = version.replace("'", "") - log.info("remote lbryum " + str(version) + " > local lbryum " + str(lbryum_version) + " = " + str( - version > lbryum_version)) - return version - - def _get_lbrynet_version(): - r = urlopen("https://raw.githubusercontent.com/lbryio/lbry/master/lbrynet/__init__.py").read().split('\n') - vs = next(i for i in r if 'version =' in i).split("=")[1].replace(" ", "") - vt = tuple(int(x) for x in vs[1:-1].split(',')) - vr = ".".join([str(x) for x in vt]) - log.info("remote lbrynet " + str(vr) + " > local lbrynet " + str(lbrynet_version) + " = " + str( - vr > lbrynet_version)) - return vr - - self.git_lbrynet_version = _get_lbrynet_version() - self.git_lbryum_version = _get_lbryum_version() - - log.info("LBRY Files: " + _get_lbry_files_json()) - log.info("Starting balance: " + str(self.session.wallet.wallet_balance)) - - return defer.succeed(None) + d = self._get_lbry_files() + d.addCallback(lambda r: json.dumps([d[1] for d in r])) + d.addCallback(lambda r: log.info("LBRY Files: " + r)) + d.addCallback(lambda _: log.info("Starting balance: " + str(self.session.wallet.wallet_balance))) + return d def _announce_startup(): def _announce(): self.announced_startup = True self.startup_status = STARTUP_STAGES[5] log.info("[" + str(datetime.now()) + "] Started lbrynet-daemon") + # self.lbrynet_connection_checker.start(3600) if self.first_run: d = self._upload_log(name_prefix="fr") @@ -317,9 +392,12 @@ class LBRYDaemon(jsonrpc.JSONRPC): log.info("[" + str(datetime.now()) + "] Starting lbrynet-daemon") + self.internet_connection_checker.start(3600) + self.version_checker.start(3600 * 12) + self.connection_problem_checker.start(1) + d = defer.Deferred() d.addCallback(lambda _: self._initial_setup()) - d.addCallback(self._set_daemon_settings) d.addCallback(lambda _: threads.deferToThread(self._setup_data_directory)) d.addCallback(lambda _: self._check_db_migration()) d.addCallback(lambda _: self._get_settings()) @@ -331,7 +409,6 @@ class LBRYDaemon(jsonrpc.JSONRPC): d.addCallback(lambda _: self._setup_lbry_file_opener()) d.addCallback(lambda _: self._setup_query_handlers()) d.addCallback(lambda _: self._setup_server()) - d.addCallback(lambda _: self._setup_fetcher()) d.addCallback(lambda _: _log_starting_vals()) d.addCallback(lambda _: _announce_startup()) d.callback(None) @@ -343,44 +420,77 @@ class LBRYDaemon(jsonrpc.JSONRPC): log.info("Platform: " + json.dumps(self.platform_info)) return defer.succeed(None) - def _load_daemon_conf(): - if os.path.isfile(self.daemon_conf): - f = open(self.daemon_conf, "r") - loaded_settings = json.loads(f.read()) - f.close() - missing_settings = {} - for k in self.default_settings.keys(): - if k not in loaded_settings.keys(): - missing_settings[k] = self.default_settings[k] - if missing_settings != {}: - for k in missing_settings.keys(): - log.info("Adding missing setting: " + k + " with default value: " + str(missing_settings[k])) - loaded_settings[k] = missing_settings[k] - f = open(self.daemon_conf, "w") - f.write(json.dumps(loaded_settings)) - f.close() - rsettings = loaded_settings - else: - log.info("Writing default settings : " + json.dumps(self.default_settings) + " --> " + str(self.daemon_conf)) - f = open(self.daemon_conf, "w") - f.write(json.dumps(self.default_settings)) - f.close() - rsettings = self.default_settings - for k in rsettings.keys(): - self.__dict__[k] = rsettings[k] - return rsettings - d = _log_platform() - d.addCallback(lambda _: _load_daemon_conf()) return d - def _set_daemon_settings(self, settings): - self.session_settings = settings - return defer.succeed(None) + def _check_network_connection(self): + try: + host = socket.gethostbyname(REMOTE_SERVER) + s = socket.create_connection((host, 80), 2) + self.connected_to_internet = True + except: + log.info("[" + str(datetime.now()) + "] Internet connection not working") + self.connected_to_internet = False + + def _check_lbrynet_connection(self): + def _log_success(): + log.info("[" + str(datetime.now()) + "] lbrynet connectivity test passed") + def _log_failure(): + log.info("[" + str(datetime.now()) + "] lbrynet connectivity test failed") + + wonderfullife_sh = "6f3af0fa3924be98a54766aa2715d22c6c1509c3f7fa32566df4899a41f3530a9f97b2ecb817fa1dcbf1b30553aefaa7" + d = download_sd_blob(self.session, wonderfullife_sh, self.session.base_payment_rate_manager) + d.addCallbacks(lambda _: _log_success, lambda _: _log_failure) + + def _check_remote_versions(self): + def _get_lbryum_version(): + try: + r = urlopen("https://raw.githubusercontent.com/lbryio/lbryum/master/lib/version.py").read().split('\n') + version = next(line.split("=")[1].split("#")[0].replace(" ", "") + for line in r if "ELECTRUM_VERSION" in line) + version = version.replace("'", "") + log.info("remote lbryum " + str(version) + " > local lbryum " + str(lbryum_version) + " = " + str( + version > lbryum_version)) + self.git_lbryum_version = version + return defer.succeed(None) + except: + log.info("[" + str(datetime.now()) + "] Failed to get lbryum version from git") + self.git_lbryum_version = None + return defer.fail(None) + + def _get_lbrynet_version(): + try: + r = urlopen("https://raw.githubusercontent.com/lbryio/lbry/master/lbrynet/__init__.py").read().split('\n') + vs = next(i for i in r if 'version =' in i).split("=")[1].replace(" ", "") + vt = tuple(int(x) for x in vs[1:-1].split(',')) + vr = ".".join([str(x) for x in vt]) + log.info("remote lbrynet " + str(vr) + " > local lbrynet " + str(lbrynet_version) + " = " + str( + vr > lbrynet_version)) + self.git_lbrynet_version = vr + return defer.succeed(None) + except: + log.info("[" + str(datetime.now()) + "] Failed to get lbrynet version from git") + self.git_lbrynet_version = None + return defer.fail(None) + + d = _get_lbrynet_version() + d.addCallback(lambda _: _get_lbryum_version()) + + def _check_connection_problems(self): + if not self.git_lbrynet_version or not self.git_lbryum_version: + self.connection_problem = CONNECTION_PROBLEM_CODES[0] + + elif self.startup_status[0] == 'loading_wallet': + if self.session.wallet.is_lagging: + self.connection_problem = CONNECTION_PROBLEM_CODES[2] + else: + self.connection_problem = None + + if not self.connected_to_internet: + self.connection_problem = CONNECTION_PROBLEM_CODES[1] def _start_server(self): - if self.peer_port is not None: server_factory = ServerProtocolFactory(self.session.rate_limiter, @@ -402,7 +512,6 @@ class LBRYDaemon(jsonrpc.JSONRPC): return defer.succeed(True) def _setup_server(self): - def restore_running_status(running): if running is True: return self._start_server() @@ -439,7 +548,6 @@ class LBRYDaemon(jsonrpc.JSONRPC): return dl def _add_query_handlers(self, query_handlers): - def _set_query_handlers(statuses): from future_builtins import zip for handler, (success, status) in zip(query_handlers, statuses): @@ -453,13 +561,13 @@ class LBRYDaemon(jsonrpc.JSONRPC): dl.addCallback(_set_query_handlers) return dl - def _upload_log(self, name_prefix=None, exclude_previous=False): + def _upload_log(self, name_prefix=None, exclude_previous=False, force=False): if name_prefix: name_prefix = name_prefix + "-" + platform.system() else: name_prefix = platform.system() - if self.session_settings['upload_log']: + if self.upload_log or force: LOG_URL = "https://lbry.io/log-upload" if exclude_previous: f = open(self.log_file, "r") @@ -475,8 +583,9 @@ class LBRYDaemon(jsonrpc.JSONRPC): params = {'name': log_name, 'log': log_contents} requests.post(LOG_URL, params) - - return defer.succeed(None) + return defer.succeed(None) + else: + return defer.succeed(None) def _shutdown(self): log.info("Closing lbrynet session") @@ -511,10 +620,10 @@ class LBRYDaemon(jsonrpc.JSONRPC): self.session_settings['max_key_fee'] = float(settings['max_key_fee']) else: return defer.fail() - elif k == 'default_download_directory': - if type(settings['default_download_directory']) is unicode: - if os.path.isdir(settings['default_download_directory']): - self.session_settings['default_download_directory'] = settings['default_download_directory'] + elif k == 'download_directory': + if type(settings['download_directory']) is unicode: + if os.path.isdir(settings['download_directory']): + self.session_settings['download_directory'] = settings['download_directory'] else: pass else: @@ -538,6 +647,31 @@ class LBRYDaemon(jsonrpc.JSONRPC): self.session_settings['upload_log'] = settings['upload_log'] else: return defer.fail() + elif k == 'download_timeout': + if type(settings['download_timeout']) is int: + self.session_settings['download_timeout'] = settings['download_timeout'] + else: + return defer.fail() + elif k == 'search_timeout': + if type(settings['search_timeout']) is float: + self.session_settings['search_timeout'] = settings['search_timeout'] + else: + return defer.fail() + elif k == 'cache_time': + if type(settings['cache_time']) is int: + self.session_settings['cache_time'] = settings['cache_time'] + else: + return defer.fail() + self.run_on_startup = self.session_settings['run_on_startup'] + self.data_rate = self.session_settings['data_rate'] + self.max_key_fee = self.session_settings['max_key_fee'] + self.download_directory = self.session_settings['download_directory'] + self.max_upload = self.session_settings['max_upload'] + self.max_download = self.session_settings['max_download'] + self.upload_log = self.session_settings['upload_log'] + self.download_timeout = self.session_settings['download_timeout'] + self.search_timeout = self.session_settings['search_timeout'] + self.cache_time = self.session_settings['cache_time'] f = open(self.daemon_conf, "w") f.write(json.dumps(self.session_settings)) @@ -545,11 +679,6 @@ class LBRYDaemon(jsonrpc.JSONRPC): return defer.succeed(True) - def _setup_fetcher(self): - self.fetcher = FetcherDaemon(self.session, self.lbry_file_manager, self.lbry_file_metadata_manager, - self.session.wallet, self.sd_identifier, self.autofetcher_conf) - return defer.succeed(None) - def _setup_data_directory(self): self.startup_status = STARTUP_STAGES[1] log.info("Loading databases...") @@ -615,7 +744,7 @@ class LBRYDaemon(jsonrpc.JSONRPC): self.lbry_file_manager = LBRYFileManager(self.session, self.lbry_file_metadata_manager, self.sd_identifier, - delete_data=True) + download_directory=self.download_directory) return self.lbry_file_manager.setup() d.addCallback(lambda _: set_lbry_file_manager()) @@ -692,7 +821,6 @@ class LBRYDaemon(jsonrpc.JSONRPC): r = requests.post(url, json=data) if r.status_code == 200: self.requested_first_run_credits = True - self.session_settings['requested_first_run_credits'] = True f = open(self.daemon_conf, "w") f.write(json.dumps(self.session_settings)) f.close() @@ -745,7 +873,7 @@ class LBRYDaemon(jsonrpc.JSONRPC): def _setup_stream_identifier(self): file_saver_factory = LBRYFileSaverFactory(self.session.peer_finder, self.session.rate_limiter, self.session.blob_manager, self.stream_info_manager, - self.session.wallet, self.session_settings['default_download_directory']) + self.session.wallet, self.download_directory) self.sd_identifier.add_stream_downloader_factory(LBRYFileStreamType, file_saver_factory) file_opener_factory = LBRYFileOpenerFactory(self.session.peer_finder, self.session.rate_limiter, self.session.blob_manager, self.stream_info_manager, @@ -761,94 +889,88 @@ class LBRYDaemon(jsonrpc.JSONRPC): self.sd_identifier.add_stream_downloader_factory(LBRYFileStreamType, downloader_factory) return defer.succeed(True) - def _download_name(self, name, timeout=DEFAULT_TIMEOUT, download_directory=None): + def _download_name(self, name, timeout=DEFAULT_TIMEOUT, download_directory=None, file_name=None, stream_info=None): + """ + Add a lbry file to the file manager, start the download, and return the new lbry file. + If it already exists in the file manager, return the existing lbry file + """ + if not download_directory: - download_directory = self.session_settings['default_download_directory'] + download_directory = self.download_directory elif not os.path.isdir(download_directory): - download_directory = self.session_settings['default_download_directory'] + download_directory = self.download_directory + + def _remove_from_wait(r): + del self.waiting_on[name] + return r + + def _setup_stream(stream_info): + if 'sources' in stream_info.keys(): + stream_hash = stream_info['sources']['lbry_sd_hash'] + else: + stream_hash = stream_info['stream_hash'] + + d = self._get_lbry_file_by_sd_hash(stream_hash) + def _add_results(l): + return defer.succeed((stream_info, l)) + d.addCallback(_add_results) + return d def _disp_file(f): - file_path = os.path.join(self.session_settings['default_download_directory'], f.file_name) - log.info("[" + str(datetime.now()) + "] Already downloaded: " + str(f.stream_hash) + " --> " + file_path) - return defer.succeed(f) + file_path = os.path.join(self.download_directory, f.file_name) + log.info("[" + str(datetime.now()) + "] Already downloaded: " + str(f.sd_hash) + " --> " + file_path) + return f - def _get_stream(name): - def _disp(stream): - stream_hash = stream['stream_hash'] - if isinstance(stream_hash, dict): - stream_hash = stream_hash['sd_hash'] - - log.info("[" + str(datetime.now()) + "] Start stream: " + stream_hash) - return stream - - d = self.session.wallet.get_stream_info_for_name(name) - stream = GetStream(self.sd_identifier, self.session, self.session.wallet, self.lbry_file_manager, - max_key_fee=self.max_key_fee, data_rate=self.data_rate, timeout=timeout, - download_directory=download_directory) - d.addCallback(_disp) - d.addCallback(lambda stream_info: stream.start(stream_info)) - d.addCallback(lambda _: self._path_from_name(name)) + def _get_stream(stream_info): + self.streams[name] = GetStream(self.sd_identifier, self.session, self.session.wallet, + self.lbry_file_manager, max_key_fee=self.max_key_fee, + data_rate=self.data_rate, timeout=timeout, + download_directory=download_directory, file_name=file_name) + d = self.streams[name].start(stream_info, name) + d.addCallback(lambda _: self.streams[name].downloader) return d - d = self._check_history(name) - d.addCallback(lambda lbry_file: _get_stream(name) if not lbry_file else _disp_file(lbry_file)) - d.addCallback(lambda _: self._path_from_name(name)) - d.addErrback(lambda err: defer.fail(NOT_FOUND)) + if not stream_info: + self.waiting_on[name] = True + d = self._resolve_name(name) + else: + d = defer.succeed(stream_info) + d.addCallback(_setup_stream) + d.addCallback(lambda (stream_info, lbry_file): _get_stream(stream_info) if not lbry_file else _disp_file(lbry_file)) + if not stream_info: + d.addCallback(_remove_from_wait) return d + def _get_long_count_timestamp(self): + return int((datetime.utcnow() - (datetime(year=2012, month=12, day=21))).total_seconds()) + + def _update_claim_cache(self): + f = open(os.path.join(self.db_dir, "stream_info_cache.json"), "w") + f.write(json.dumps(self.name_cache)) + f.close() + return defer.succeed(True) + def _resolve_name(self, name): - d = defer.Deferred() - d.addCallback(lambda _: self.session.wallet.get_stream_info_for_name(name)) - d.addErrback(lambda _: defer.fail(UnknownNameError)) + def _cache_stream_info(stream_info): + self.name_cache[name] = {'claim_metadata': stream_info, 'timestamp': self._get_long_count_timestamp()} + d = self._update_claim_cache() + d.addCallback(lambda _: self.name_cache[name]['claim_metadata']) + return d - return d - - def _resolve_name_wc(self, name): - d = defer.Deferred() - d.addCallback(lambda _: self.session.wallet.get_stream_info_for_name(name)) - d.addErrback(lambda _: defer.fail(UnknownNameError)) - d.callback(None) - - return d - - def _check_history(self, name): - def _get_lbry_file(path): - f = open(path, 'r') - l = json.loads(f.read()) - f.close() - - file_name = l['stream_name'].decode('hex') - - for lbry_file in self.lbry_file_manager.lbry_files: - if lbry_file.stream_name == file_name: - if sys.platform == "darwin": - if os.path.isfile(os.path.join(self.session_settings['default_download_directory'], lbry_file.stream_name)): - return lbry_file - else: - return False - else: - return lbry_file + if name in self.name_cache.keys(): + if (self._get_long_count_timestamp() - self.name_cache[name]['timestamp']) < self.cache_time: + log.info("[" + str(datetime.now()) + "] Returning cached stream info for lbry://" + name) + d = defer.succeed(self.name_cache[name]['claim_metadata']) else: - return False - - def _check(info): - stream_hash = info['stream_hash'] - if isinstance(stream_hash, dict): - stream_hash = stream_hash['sd_hash'] - - path = os.path.join(self.blobfile_dir, stream_hash) - if os.path.isfile(path): - log.info("[" + str(datetime.now()) + "] Search for lbry_file, returning: " + stream_hash) - return defer.succeed(_get_lbry_file(path)) - else: - log.info("[" + str(datetime.now()) + "] Search for lbry_file didn't return anything") - return defer.succeed(False) - - d = self._resolve_name(name) - d.addCallback(_check) - d.callback(None) + log.info("[" + str(datetime.now()) + "] Refreshing stream info for lbry://" + name) + d = self.session.wallet.get_stream_info_for_name(name) + d.addCallbacks(_cache_stream_info, lambda _: defer.fail(UnknownNameError)) + else: + log.info("[" + str(datetime.now()) + "] Resolving stream info for lbry://" + name) + d = self.session.wallet.get_stream_info_for_name(name) + d.addCallbacks(_cache_stream_info, lambda _: defer.fail(UnknownNameError)) return d @@ -865,31 +987,17 @@ class LBRYDaemon(jsonrpc.JSONRPC): d = self.lbry_file_manager.get_count_for_stream_hash(s_h) # TODO: could possibly be a timing issue here d.addCallback(lambda c: self.stream_info_manager.delete_stream(s_h) if c == 0 else True) - d.addCallback(lambda _: os.remove(os.path.join(self.session_settings['default_download_directory'], lbry_file.file_name)) if - os.path.isfile(os.path.join(self.session_settings['default_download_directory'], lbry_file.file_name)) else defer.succeed(None)) + d.addCallback(lambda _: os.remove(os.path.join(self.download_directory, lbry_file.file_name)) if + os.path.isfile(os.path.join(self.download_directory, lbry_file.file_name)) else defer.succeed(None)) return d d.addCallback(lambda _: finish_deletion(lbry_file)) + d.addCallback(lambda _: log.info("[" + str(datetime.now()) + "] Delete lbry file")) return d - def _path_from_name(self, name): - d = self._check_history(name) - d.addCallback(lambda lbry_file: {'stream_hash': lbry_file.stream_hash, - 'path': os.path.join(self.session_settings['default_download_directory'], lbry_file.file_name)} - if lbry_file else defer.fail(UnknownNameError)) - return d - - def _path_from_lbry_file(self, lbry_file): - if lbry_file: - r = {'stream_hash': lbry_file.stream_hash, - 'path': os.path.join(self.session_settings['default_download_directory'], lbry_file.file_name)} - return defer.succeed(r) - else: - return defer.fail(UnknownNameError) - def _get_est_cost(self, name): def _check_est(d, name): - if type(d.result) is float: + if isinstance(d.result, float): log.info("[" + str(datetime.now()) + "] Cost est for lbry://" + name + ": " + str(d.result) + "LBC") else: log.info("[" + str(datetime.now()) + "] Timeout estimating cost for lbry://" + name + ", using key fee") @@ -897,11 +1005,11 @@ class LBRYDaemon(jsonrpc.JSONRPC): return defer.succeed(None) def _add_key_fee(data_cost): - d = self.session.wallet.get_stream_info_for_name(name) + d = self._resolve_name(name) d.addCallback(lambda info: data_cost + info['key_fee'] if 'key_fee' in info.keys() else data_cost) return d - d = self.session.wallet.get_stream_info_for_name(name) + d = self._resolve_name(name) d.addCallback(lambda info: info['stream_hash'] if isinstance(info['stream_hash'], str) else info['stream_hash']['sd_hash']) d.addCallback(lambda sd_hash: download_sd_blob(self.session, sd_hash, @@ -915,23 +1023,123 @@ class LBRYDaemon(jsonrpc.JSONRPC): return d - def _get_lbry_files(self): - r = [] - for f in self.lbry_file_manager.lbry_files: - if f.key: - t = {'completed': f.completed, 'file_name': f.file_name, 'key': binascii.b2a_hex(f.key), - 'points_paid': f.points_paid, 'stopped': f.stopped, 'stream_hash': f.stream_hash, - 'stream_name': f.stream_name, 'suggested_file_name': f.suggested_file_name, - 'upload_allowed': f.upload_allowed} + def _get_lbry_file_by_uri(self, name): + def _get_file(stream_info): + if isinstance(stream_info['stream_hash'], str) or isinstance(stream_info['stream_hash'], unicode): + sd = stream_info['stream_hash'] + elif isinstance(stream_info['stream_hash'], dict): + sd = stream_info['stream_hash']['sd_hash'] + for l in self.lbry_file_manager.lbry_files: + if l.sd_hash == sd: + return defer.succeed(l) + return defer.succeed(None) + + d = self._resolve_name(name) + d.addCallback(_get_file) + + return d + + def _get_lbry_file_by_sd_hash(self, sd_hash): + for l in self.lbry_file_manager.lbry_files: + if l.sd_hash == sd_hash: + return defer.succeed(l) + return defer.succeed(None) + + def _get_lbry_file_by_file_name(self, file_name): + for l in self.lbry_file_manager.lbry_files: + if l.file_name == file_name: + return defer.succeed(l) + return defer.succeed(None) + + def _get_lbry_file(self, search_by, val, return_json=True): + def _log_get_lbry_file(f): + if f and val: + log.info("Found LBRY file for " + search_by + ": " + val) + elif val: + log.info("Did not find LBRY file for " + search_by + ": " + val) + return f + + def _get_json_for_return(f): + def _get_file_status(file_status): + message = STREAM_STAGES[2][1] % (file_status.name, file_status.num_completed, file_status.num_known, file_status.running_status) + return defer.succeed(message) + + def _generate_reply(size): + if f.key: + key = binascii.b2a_hex(f.key) + else: + key = None + + if os.path.isfile(os.path.join(self.download_directory, f.file_name)): + written_file = file(os.path.join(self.download_directory, f.file_name)) + written_file.seek(0, os.SEEK_END) + written_bytes = written_file.tell() + written_file.close() + else: + written_bytes = False + + if search_by == "name": + if val in self.streams.keys(): + status = self.streams[val].code + elif f in self.lbry_file_manager.lbry_files: + # if f.stopped: + # status = STREAM_STAGES[3] + # else: + status = STREAM_STAGES[2] + else: + status = [False, False] + else: + status = [False, False] + + if status[0] == DOWNLOAD_RUNNING_CODE: + d = f.status() + d.addCallback(_get_file_status) + d.addCallback(lambda message: {'completed': f.completed, 'file_name': f.file_name, 'key': key, + 'points_paid': f.points_paid, 'stopped': f.stopped, + 'stream_hash': f.stream_hash, + 'stream_name': f.stream_name, + 'suggested_file_name': f.suggested_file_name, + 'upload_allowed': f.upload_allowed, 'sd_hash': f.sd_hash, + 'total_bytes': size, + 'written_bytes': written_bytes, 'code': status[0], + 'message': message}) + else: + d = defer.succeed({'completed': f.completed, 'file_name': f.file_name, 'key': key, + 'points_paid': f.points_paid, 'stopped': f.stopped, 'stream_hash': f.stream_hash, + 'stream_name': f.stream_name, 'suggested_file_name': f.suggested_file_name, + 'upload_allowed': f.upload_allowed, 'sd_hash': f.sd_hash, 'total_bytes': size, + 'written_bytes': written_bytes, 'code': status[0], 'message': status[1]}) + + return d + + if f: + d = f.get_total_bytes() + d.addCallback(_generate_reply) + return d else: - t = {'completed': f.completed, 'file_name': f.file_name, 'key': None, - 'points_paid': f.points_paid, - 'stopped': f.stopped, 'stream_hash': f.stream_hash, 'stream_name': f.stream_name, - 'suggested_file_name': f.suggested_file_name, 'upload_allowed': f.upload_allowed} + return False - r.append(t) - return r + if search_by == "name": + d = self._get_lbry_file_by_uri(val) + elif search_by == "sd_hash": + d = self._get_lbry_file_by_sd_hash(val) + elif search_by == "file_name": + d = self._get_lbry_file_by_file_name(val) + d.addCallback(_log_get_lbry_file) + if return_json: + d.addCallback(_get_json_for_return) + return d + + def _get_lbry_files(self): + d = defer.DeferredList([self._get_lbry_file('sd_hash', l.sd_hash) for l in self.lbry_file_manager.lbry_files]) + return d + + def _log_to_slack(self, msg): + URL = "https://hooks.slack.com/services/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" + msg = platform.platform() + ": " + base58.b58encode(self.lbryid)[:20] + ", " + msg + requests.post(URL, json.dumps({"text": msg})) + return defer.succeed(None) def _render_response(self, result, code): return defer.succeed({'result': result, 'code': code}) @@ -959,20 +1167,26 @@ class LBRYDaemon(jsonrpc.JSONRPC): Args: None Returns: - 'status_message': startup status message - 'status_code': status_code - if status_code is 'loading_wallet', also contains key 'progress': blockchain catchup progress + 'message': startup status message + 'code': status_code + 'progress': progress, only used in loading_wallet + 'is_lagging': flag set to indicate lag, if set message will contain relevant message """ - r = {'code': self.startup_status[0], 'message': self.startup_status[1], 'progress': None, 'is_lagging': None} - if self.startup_status[0] == 'loading_wallet': - r['is_lagging'] = self.session.wallet.is_lagging - if r['is_lagging'] == True: - r['message'] = "Synchronization with the blockchain is lagging... if this continues try restarting LBRY" - else: - r['message'] = r['message'] % (str(self.session.wallet.blocks_behind_alert) + " blocks behind") - r['progress'] = self.session.wallet.catchup_progress + r = {'code': self.startup_status[0], 'message': self.startup_status[1], + 'progress': None, 'is_lagging': None, 'problem_code': None} + if self.connection_problem: + r['problem_code'] = self.connection_problem[0] + r['message'] = self.connection_problem[1] + r['is_lagging'] = True + elif self.startup_status[0] == LOADING_WALLET_CODE: + if self.session.wallet.blocks_behind_alert != 0: + r['message'] = r['message'] % (str(self.session.wallet.blocks_behind_alert) + " blocks behind") + r['progress'] = self.session.wallet.catchup_progress + else: + r['message'] = "Catching up with the blockchain" + r['progress'] = 0 log.info("[" + str(datetime.now()) + "] daemon status: " + str(r)) return self._render_response(r, OK_CODE) @@ -1056,11 +1270,12 @@ class LBRYDaemon(jsonrpc.JSONRPC): 'run_on_startup': bool, 'data_rate': float, 'max_key_fee': float, - 'default_download_directory': string, + 'download_directory': string, 'max_upload': float, 0.0 for unlimited 'max_download': float, 0.0 for unlimited 'upload_log': bool, 'search_timeout': float, + 'download_timeout': int 'max_search_results': int, 'wallet_type': string, 'delete_blobs_on_remove': bool, @@ -1081,27 +1296,20 @@ class LBRYDaemon(jsonrpc.JSONRPC): 'run_on_startup': bool, 'data_rate': float, 'max_key_fee': float, - 'default_download_directory': string, + 'download_directory': string, 'max_upload': float, 0.0 for unlimited 'max_download': float, 0.0 for unlimited 'upload_log': bool, - 'search_timeout': float, - 'max_search_results': int, - 'wallet_type': string, - 'delete_blobs_on_remove': bool, - 'peer_port': int, - 'dht_node_port': int, - 'use_upnp': bool, - 'start_lbrycrdd': bool, + 'download_timeout': int Returns: settings dict """ - def _log_settings_change(params): - log.info("[" + str(datetime.now()) + "] Set daemon settings to " + str(params)) + def _log_settings_change(): + log.info("[" + str(datetime.now()) + "] Set daemon settings to " + json.dumps(self.session_settings)) d = self._update_settings(p) - d.addCallback(lambda _: _log_settings_change(p)) + d.addCallback(lambda _: _log_settings_change()) d.addCallback(lambda _: self._render_response(self.session_settings, OK_CODE)) return d @@ -1130,48 +1338,6 @@ class LBRYDaemon(jsonrpc.JSONRPC): else: return self._render_response(self.jsonrpc_help.__doc__, OK_CODE) - def jsonrpc_start_fetcher(self): - """ - Start automatically downloading new name claims as they occur (off by default) - - Args: - None - Returns: - confirmation message - """ - - self.fetcher.start() - log.info('[' + str(datetime.now()) + '] Start autofetcher') - # self._log_to_slack('[' + str(datetime.now()) + '] Start autofetcher') - return self._render_response("Started autofetching claims", OK_CODE) - - def jsonrpc_stop_fetcher(self): - """ - Stop automatically downloading new name claims as they occur - - Args: - None - Returns: - confirmation message - """ - - self.fetcher.stop() - log.info('[' + str(datetime.now()) + '] Stop autofetcher') - return self._render_response("Stopped autofetching claims", OK_CODE) - - def jsonrpc_fetcher_status(self): - """ - Get fetcher status - - Args: - None - Returns: - True/False - """ - - log.info("[" + str(datetime.now()) + "] Get fetcher status") - return self._render_response(self.fetcher.check_if_running(), OK_CODE) - def jsonrpc_get_balance(self): """ Get balance @@ -1200,7 +1366,7 @@ class LBRYDaemon(jsonrpc.JSONRPC): d = self._shutdown() d.addCallback(lambda _: _disp_shutdown()) - d.addCallback(lambda _: reactor.callLater(1.0, reactor.stop)) + d.addCallback(lambda _: reactor.callLater(0.0, reactor.stop)) return self._render_response("Shutting down", OK_CODE) @@ -1221,11 +1387,43 @@ class LBRYDaemon(jsonrpc.JSONRPC): 'stream_name': string 'suggested_file_name': string 'upload_allowed': bool + 'sd_hash': string """ - r = self._get_lbry_files() - log.info("[" + str(datetime.now()) + "] Get LBRY files") - return self._render_response(r, OK_CODE) + d = self._get_lbry_files() + d.addCallback(lambda r: [d[1] for d in r]) + d.addCallback(lambda r: self._render_response(r, OK_CODE) if len(r) else self._render_response(False, OK_CODE)) + + return d + + def jsonrpc_get_lbry_file(self, p): + """ + Get lbry file + + Args: + 'name': get file by lbry uri, + 'sd_hash': get file by the hash in the name claim, + 'file_name': get file by its name in the downloads folder, + Returns: + 'completed': bool + 'file_name': string + 'key': hex string + 'points_paid': float + 'stopped': bool + 'stream_hash': base 58 string + 'stream_name': string + 'suggested_file_name': string + 'upload_allowed': bool + 'sd_hash': string + """ + + if p.keys()[0] in ['name', 'sd_hash', 'file_name']: + search_type = p.keys()[0] + d = self._get_lbry_file(search_type, p[search_type]) + else: + d = defer.fail() + d.addCallback(lambda r: self._render_response(r, OK_CODE)) + return d def jsonrpc_resolve_name(self, p): """ @@ -1242,18 +1440,8 @@ class LBRYDaemon(jsonrpc.JSONRPC): else: return self._render_response(None, BAD_REQUEST) - def _disp(info): - stream_hash = info['stream_hash'] - if isinstance(stream_hash, dict): - stream_hash = stream_hash['sd_hash'] - - log.info("[" + str(datetime.now()) + "] Resolved info: " + stream_hash) - - return self._render_response(info, OK_CODE) - d = self._resolve_name(name) - d.addCallbacks(_disp, lambda _: server.failure) - d.callback(None) + d.addCallbacks(lambda info: self._render_response(info, OK_CODE), lambda _: server.failure) return d def jsonrpc_get(self, p): @@ -1262,62 +1450,104 @@ class LBRYDaemon(jsonrpc.JSONRPC): Args: 'name': name to download, string - optional 'download_directory': path to directory where file will be saved, string + 'download_directory': optional, path to directory where file will be saved, string + 'file_name': optional, a user specified name for the downloaded file + 'stream_info': optional, specified stream info overrides name Returns: 'stream_hash': hex string 'path': path of download """ if 'timeout' not in p.keys(): - timeout = DEFAULT_TIMEOUT + timeout = self.download_timeout else: timeout = p['timeout'] if 'download_directory' not in p.keys(): - download_directory = self.session_settings['default_download_directory'] + download_directory = self.download_directory else: download_directory = p['download_directory'] + if 'file_name' in p.keys(): + file_name = p['file_name'] + else: + file_name = None + + if 'stream_info' in p.keys(): + stream_info = p['stream_info'] + if 'sources' in stream_info.keys(): + sd_hash = stream_info['sources']['lbry_sd_hash'] + else: + sd_hash = stream_info['stream_hash'] + else: + stream_info = None + if 'name' in p.keys(): name = p['name'] - d = self._download_name(name=name, timeout=timeout, download_directory=download_directory) - d.addCallbacks(lambda message: self._render_response(message, OK_CODE), - lambda err: self._render_response('error', NOT_FOUND)) + if p['name'] not in self.waiting_on.keys(): + d = self._download_name(name=name, timeout=timeout, download_directory=download_directory, + stream_info=stream_info, file_name=file_name) + d.addCallback(lambda l: {'stream_hash': sd_hash, + 'path': os.path.join(self.download_directory, l.file_name)} + if stream_info else + {'stream_hash': l.sd_hash, + 'path': os.path.join(self.download_directory, l.file_name)}) + d.addCallback(lambda message: self._render_response(message, OK_CODE)) + else: + d = server.failure else: - d = self._render_response('error', BAD_REQUEST) + d = server.failure return d - # def jsonrpc_stop_lbry_file(self, p): - # params = Bunch(p) - # - # try: - # lbry_file = [f for f in self.lbry_file_manager.lbry_files if f.stream_hash == params.stream_hash][0] - # except IndexError: - # return defer.fail(UnknownNameError) - # - # if not lbry_file.stopped: - # d = self.lbry_file_manager.toggle_lbry_file_running(lbry_file) - # d.addCallback(lambda _: self._render_response("Stream has been stopped", OK_CODE)) - # d.addErrback(lambda err: self._render_response(err.getTraceback(), )) - # return d - # else: - # return json.dumps({'result': 'Stream was already stopped'}) - # - # def jsonrpc_start_lbry_file(self, p): - # params = Bunch(p) - # - # try: - # lbry_file = [f for f in self.lbry_file_manager.lbry_files if f.stream_hash == params.stream_hash][0] - # except IndexError: - # return defer.fail(UnknownNameError) - # - # if lbry_file.stopped: - # d = self.lbry_file_manager.toggle_lbry_file_running(lbry_file) - # d.callback(None) - # return json.dumps({'result': 'Stream started'}) - # else: - # return json.dumps({'result': 'Stream was already running'}) + def jsonrpc_stop_lbry_file(self, p): + """ + Stop lbry file + + Args: + 'name': stop file by lbry uri, + 'sd_hash': stop file by the hash in the name claim, + 'file_name': stop file by its name in the downloads folder, + Returns: + confirmation message + """ + + def _stop_file(f): + d = self.lbry_file_manager.toggle_lbry_file_running(f) + d.addCallback(lambda _: "Stopped LBRY file") + return d + + if p.keys()[0] in ['name', 'sd_hash', 'file_name']: + search_type = p.keys()[0] + d = self._get_lbry_file(search_type, p[search_type], return_json=False) + d.addCallback(lambda l: _stop_file(l) if not l.stopped else "LBRY file wasn't running") + + d.addCallback(lambda r: self._render_response(r, OK_CODE)) + return d + + def jsonrpc_start_lbry_file(self, p): + """ + Stop lbry file + + Args: + 'name': stop file by lbry uri, + 'sd_hash': stop file by the hash in the name claim, + 'file_name': stop file by its name in the downloads folder, + Returns: + confirmation message + """ + + def _start_file(f): + d = self.lbry_file_manager.toggle_lbry_file_running(f) + return defer.succeed("Started LBRY file") + + if p.keys()[0] in ['name', 'sd_hash', 'file_name']: + search_type = p.keys()[0] + d = self._get_lbry_file(search_type, p[search_type], return_json=False) + d.addCallback(lambda l: _start_file(l) if l.stopped else "LBRY file was already running") + + d.addCallback(lambda r: self._render_response(r, OK_CODE)) + return d def jsonrpc_search_nametrie(self, p): """ @@ -1347,7 +1577,7 @@ class LBRYDaemon(jsonrpc.JSONRPC): ds = [] for claim in claims: d1 = defer.succeed(claim) - d2 = self._resolve_name_wc(claim['name']) + d2 = self._resolve_name(claim['name']) d3 = self._get_est_cost(claim['name']) dl = defer.DeferredList([d1, d2, d3], consumeErrors=True) ds.append(dl) @@ -1364,6 +1594,8 @@ class LBRYDaemon(jsonrpc.JSONRPC): del r[1]['name'] t.update(r[1]) t['cost_est'] = r[2] + if not 'thumbnail' in t.keys(): + t['thumbnail'] = "img/Free-speech-flag.svg" consolidated_results.append(t) # log.info(str(t)) return consolidated_results @@ -1390,17 +1622,20 @@ class LBRYDaemon(jsonrpc.JSONRPC): confirmation message """ - def _disp(file_name): - log.info("[" + str(datetime.now()) + "] Deleted: " + file_name) - return self._render_response("Deleted: " + file_name, OK_CODE) - - if "file_name" in p.keys(): - lbry_files = [self._delete_lbry_file(f) for f in self.lbry_file_manager.lbry_files - if p['file_name'] == f.file_name] - d = defer.DeferredList(lbry_files) - d.addCallback(lambda _: _disp(p['file_name'])) + def _delete_file(f): + file_name = f.file_name + d = self._delete_lbry_file(f) + d.addCallback(lambda _: "Deleted LBRY file" + file_name) return d + if p.keys()[0] in ['name', 'sd_hash', 'file_name']: + search_type = p.keys()[0] + d = self._get_lbry_file(search_type, p[search_type], return_json=False) + d.addCallback(lambda l: _delete_file(l) if l else False) + + d.addCallback(lambda r: self._render_response(r, OK_CODE)) + return d + def jsonrpc_publish(self, p): """ Make a new name claim @@ -1497,6 +1732,50 @@ class LBRYDaemon(jsonrpc.JSONRPC): return d + def jsonrpc_get_transaction_history(self): + """ + Get transaction history + + Args: + None + Returns: + list of transactions + """ + + d = self.session.wallet.get_history() + d.addCallback(lambda r: self._render_response(r, OK_CODE)) + return d + + def jsonrpc_get_transaction(self, p): + """ + Get a decoded transaction from a txid + + Args: + txid: txid hex string + Returns: + JSON formatted transaction + """ + + + txid = p['txid'] + d = self.session.wallet.get_tx_json(txid) + d.addCallback(lambda r: self._render_response(r, OK_CODE)) + return d + + def jsonrpc_get_public_key_from_wallet(self, p): + """ + Get public key from wallet address + + Args: + wallet: wallet address, base58 + Returns: + public key + """ + + wallet = p['wallet'] + d = self.session.wallet.get_pub_keys(wallet) + d.addCallback(lambda r: self._render_response(r, OK_CODE)) + def jsonrpc_get_time_behind_blockchain(self): """ Get number of blocks behind the blockchain @@ -1539,6 +1818,97 @@ class LBRYDaemon(jsonrpc.JSONRPC): d.addCallback(lambda address: self._render_response(address, OK_CODE)) return d + def jsonrpc_send_amount_to_address(self, p): + """ + Send credits to an address + + Args: + amount: the amount to send + address: the address of the recipient + Returns: + True if payment successfully scheduled + """ + + if 'amount' in p.keys() and 'address' in p.keys(): + amount = p['amount'] + address = p['address'] + else: + return server.failure + + reserved_points = self.session.wallet.reserve_points(address, amount) + if reserved_points is None: + return defer.fail(InsufficientFundsError()) + d = self.session.wallet.send_points_to_address(reserved_points, amount) + d.addCallback(lambda _: self._render_response(True, OK_CODE)) + return d + + def jsonrpc_get_best_blockhash(self): + """ + Get hash of most recent block + + Args: + None + Returns: + Hash of most recent block + """ + + d = self.session.wallet.get_best_blockhash() + d.addCallback(lambda r: self._render_response(r, OK_CODE)) + return d + + def jsonrpc_get_block(self, p): + """ + Get contents of a block + + Args: + blockhash: hash of the block to look up + Returns: + requested block + """ + + if 'blockhash' in p.keys(): + blockhash = p['blockhash'] + else: + return server.failure + + d = self.session.wallet.get_block(blockhash) + d.addCallback(lambda r: self._render_response(r, OK_CODE)) + return d + + def jsonrpc_get_claims_for_tx(self, p): + """ + Get claims for tx + + Args: + txid: txid of a name claim transaction + Returns: + any claims contained in the requested tx + """ + + if 'txid' in p.keys(): + txid = p['txid'] + else: + return server.failure + + d = self.session.wallet.get_claims_from_tx(txid) + d.addCallback(lambda r: self._render_response(r, OK_CODE)) + return d + + def jsonrpc_get_nametrie(self): + """ + Get the nametrie + + Args: + None + Returns: + Name claim trie + """ + + d = self.session.wallet.get_nametrie() + d.addCallback(lambda r: [i for i in r if 'txid' in i.keys()]) + d.addCallback(lambda r: self._render_response(r, OK_CODE)) + return d + # def jsonrpc_update_name(self, metadata): # def _disp(x): # print x @@ -1559,23 +1929,6 @@ class LBRYDaemon(jsonrpc.JSONRPC): # # return d - def jsonrpc_toggle_fetcher_verbose(self): - """ - Toggle fetcher verbose mode - - Args: - None - Returns: - Fetcher verbose status, bool - """ - - if self.fetcher.verbose: - self.fetcher.verbose = False - else: - self.fetcher.verbose = True - - return self._render_response(self.fetcher.verbose, OK_CODE) - def jsonrpc_check_for_new_version(self): """ Checks local version against versions in __init__.py and version.py in the lbrynet and lbryum repos @@ -1586,8 +1939,6 @@ class LBRYDaemon(jsonrpc.JSONRPC): true/false, true meaning that there is a new version available """ - - def _check_version(): if (lbrynet_version >= self.git_lbrynet_version) and (lbryum_version >= self.git_lbryum_version): log.info("[" + str(datetime.now()) + "] Up to date") @@ -1599,69 +1950,40 @@ class LBRYDaemon(jsonrpc.JSONRPC): return _check_version() def jsonrpc_upload_log(self, p=None): + """ + Upload log + + Args, optional: + 'name_prefix': prefix to indicate what is requesting the log upload + 'exclude_previous': true/false, whether or not to exclude previous sessions from upload, defaults on true + Returns + True + """ + if p: if 'name_prefix' in p.keys(): prefix = p['name_prefix'] + '_api' else: prefix = None - if 'exclude_previous' in p.keys: + + if 'exclude_previous' in p.keys(): exclude_previous = p['exclude_previous'] else: exclude_previous = True + + if 'message' in p.keys(): + log.info("[" + str(datetime.now()) + "] Upload log message: " + str(p['message'])) + + if 'force' in p.keys(): + force = p['force'] + else: + force = False else: prefix = "api" exclude_previous = True - d = self._upload_log(name_prefix=prefix, exclude_previous=exclude_previous) + d = self._upload_log(name_prefix=prefix, exclude_previous=exclude_previous, force=force) + if 'message' in p.keys(): + d.addCallback(lambda _: self._log_to_slack(p['message'])) d.addCallback(lambda _: self._render_response(True, OK_CODE)) return d - - -class LBRYDaemonCommandHandler(object): - def __init__(self, command): - self._api = jsonrpc.Proxy(API_CONNECTION_STRING) - self.command = command - - def run(self, params=None): - if params: - d = self._api.callRemote(self.command, params) - else: - d = self._api.callRemote(self.command) - return d - - -class LBRYindex(resource.Resource): - def __init__(self, ui_dir): - resource.Resource.__init__(self) - self.ui_dir = ui_dir - - isLeaf = False - - def _delayed_render(self, request, results): - request.write(str(results)) - request.finish() - - def getChild(self, name, request): - if name == '': - return self - return resource.Resource.getChild(self, name, request) - - def render_GET(self, request): - return static.File(os.path.join(self.ui_dir, "index.html")).render_GET(request) - - -class LBRYFileRender(resource.Resource): - isLeaf = False - - def render_GET(self, request): - if 'name' in request.args.keys(): - api = jsonrpc.Proxy(API_CONNECTION_STRING) - if request.args['name'][0] != 'lbry': - d = api.callRemote("get", {'name': request.args['name'][0]}) - d.addCallback(lambda results: static.File(results['path']).render_GET(request)) - else: - request.redirect(UI_ADDRESS) - request.finish() - return server.NOT_DONE_YET - else: - return server.failure diff --git a/lbrynet/lbrynet_daemon/LBRYDaemonControl.py b/lbrynet/lbrynet_daemon/LBRYDaemonControl.py index d04133144..c9e9acf75 100644 --- a/lbrynet/lbrynet_daemon/LBRYDaemonControl.py +++ b/lbrynet/lbrynet_daemon/LBRYDaemonControl.py @@ -1,25 +1,21 @@ import argparse import logging import logging.handlers -import subprocess import os -import shutil import webbrowser import sys import socket +import platform -from StringIO import StringIO -from zipfile import ZipFile -from urllib import urlopen -from datetime import datetime from appdirs import user_data_dir -from twisted.web import server, static +from twisted.web import server from twisted.internet import reactor, defer from jsonrpc.proxy import JSONRPCProxy -from lbrynet.lbrynet_daemon.LBRYDaemon import LBRYDaemon, LBRYindex, LBRYFileRender +from lbrynet.lbrynet_daemon.LBRYDaemonServer import LBRYDaemonServer from lbrynet.conf import API_CONNECTION_STRING, API_INTERFACE, API_ADDRESS, API_PORT, DEFAULT_WALLET, UI_ADDRESS + if sys.platform != "darwin": log_dir = os.path.join(os.path.expanduser("~"), ".lbrynet") else: @@ -30,10 +26,11 @@ if not os.path.isdir(log_dir): LOG_FILENAME = os.path.join(log_dir, 'lbrynet-daemon.log') log = logging.getLogger(__name__) -handler = logging.handlers.RotatingFileHandler(LOG_FILENAME, maxBytes=262144, backupCount=5) +handler = logging.handlers.RotatingFileHandler(LOG_FILENAME, maxBytes=2097152, backupCount=5) log.addHandler(handler) log.setLevel(logging.INFO) + REMOTE_SERVER = "www.google.com" @@ -48,9 +45,11 @@ def test_internet_connection(): def stop(): def _disp_shutdown(): + print "Shutting down lbrynet-daemon from command line" log.info("Shutting down lbrynet-daemon from command line") def _disp_not_running(): + print "Attempt to shut down lbrynet-daemon from command line when daemon isn't running" log.info("Attempt to shut down lbrynet-daemon from command line when daemon isn't running") d = defer.Deferred(None) @@ -67,18 +66,26 @@ def start(): default=DEFAULT_WALLET) parser.add_argument("--ui", help="path to custom UI folder", - default="") + default=None) parser.add_argument("--branch", - help="Branch of lbry-web-ui repo to use, defaults on HEAD", - default="HEAD") + help="Branch of lbry-web-ui repo to use, defaults on master", + default="master") parser.add_argument('--no-launch', dest='launchui', action="store_false") - parser.set_defaults(launchui=True) + parser.add_argument('--log-to-console', dest='logtoconsole', action="store_true") + parser.add_argument('--quiet', dest='quiet', action="store_true") + parser.set_defaults(launchui=True, logtoconsole=False, quiet=False) + args = parser.parse_args() + + if args.logtoconsole: + logging.basicConfig(level=logging.INFO) args = parser.parse_args() try: JSONRPCProxy.from_url(API_CONNECTION_STRING).is_running() log.info("lbrynet-daemon is already running") + if not args.logtoconsole: + print "lbrynet-daemon is already running" if args.launchui: webbrowser.open(UI_ADDRESS) return @@ -86,95 +93,31 @@ def start(): pass log.info("Starting lbrynet-daemon from command line") - print "Starting lbrynet-daemon from command line" - print "To view activity, view the log file here: " + LOG_FILENAME - print "Web UI is available at http://%s:%i" %(API_INTERFACE, API_PORT) - print "JSONRPC API is available at " + API_CONNECTION_STRING - print "To quit press ctrl-c or call 'stop' via the API" - if args.branch == "HEAD": - GIT_CMD_STRING = "git ls-remote https://github.com/lbryio/lbry-web-ui.git | grep %s | cut -f 1" % args.branch - DIST_URL = "https://raw.githubusercontent.com/lbryio/lbry-web-ui/master/dist.zip" - else: - log.info("Using UI branch: " + args.branch) - GIT_CMD_STRING = "git ls-remote https://github.com/lbryio/lbry-web-ui.git | grep refs/heads/%s | cut -f 1" % args.branch - DIST_URL = "https://raw.githubusercontent.com/lbryio/lbry-web-ui/%s/dist.zip" % args.branch - - def getui(ui_dir=None): - if ui_dir: - if os.path.isdir(ui_dir): - log.info("Using user specified UI directory: " + str(ui_dir)) - ui_version_info = "user-specified" - return defer.succeed([ui_dir, ui_version_info]) - else: - log.info("User specified UI directory doesn't exist: " + str(ui_dir)) - - def download_ui(dest_dir, ui_version): - url = urlopen(DIST_URL) - z = ZipFile(StringIO(url.read())) - names = [i for i in z.namelist() if '.DS_Store' not in i and '__MACOSX' not in i] - z.extractall(dest_dir, members=names) - return defer.succeed([dest_dir, ui_version]) - - data_dir = user_data_dir("LBRY") - version_dir = os.path.join(data_dir, "ui_version_history") - - git_version = subprocess.check_output(GIT_CMD_STRING, shell=True) - if not git_version: - log.info("You should have been notified to install xcode command line tools, once it's installed you can start LBRY") - print "You should have been notified to install xcode command line tools, once it's installed you can start LBRY" - sys.exit(0) - - ui_version_info = git_version - - if not os.path.isdir(data_dir): - os.mkdir(data_dir) - - if not os.path.isdir(os.path.join(data_dir, "ui_version_history")): - os.mkdir(version_dir) - - if not os.path.isfile(os.path.join(version_dir, git_version)): - f = open(os.path.join(version_dir, git_version), "w") - version_message = "[" + str(datetime.now()) + "] Updating UI --> " + git_version - f.write(version_message) - f.close() - log.info(version_message) - - if os.path.isdir(os.path.join(data_dir, "lbry-web-ui")): - shutil.rmtree(os.path.join(data_dir, "lbry-web-ui")) - else: - version_message = "[" + str(datetime.now()) + "] UI version " + git_version + " up to date" - log.info(version_message) - - if os.path.isdir(os.path.join(data_dir, "lbry-web-ui")): - return defer.succeed([os.path.join(data_dir, "lbry-web-ui"), ui_version_info]) - else: - return download_ui(os.path.join(data_dir, "lbry-web-ui"), ui_version_info) - - def setupserver(ui_dir, ui_version): - root = LBRYindex(ui_dir) - root.putChild("css", static.File(os.path.join(ui_dir, "css"))) - root.putChild("font", static.File(os.path.join(ui_dir, "font"))) - root.putChild("img", static.File(os.path.join(ui_dir, "img"))) - root.putChild("js", static.File(os.path.join(ui_dir, "js"))) - root.putChild("view", LBRYFileRender()) - return defer.succeed([root, ui_version]) - - def setupapi(root, wallet, ui_version): - daemon = LBRYDaemon(ui_version, wallet_type=wallet) - root.putChild(API_ADDRESS, daemon) - reactor.listenTCP(API_PORT, server.Site(root), interface=API_INTERFACE) - return daemon.setup() + if not args.logtoconsole and not args.quiet: + print "Starting lbrynet-daemon from command line" + print "To view activity, view the log file here: " + LOG_FILENAME + print "Web UI is available at http://%s:%i" % (API_INTERFACE, API_PORT) + print "JSONRPC API is available at " + API_CONNECTION_STRING + print "To quit press ctrl-c or call 'stop' via the API" if test_internet_connection(): - d = getui(args.ui) - d.addCallback(lambda r: setupserver(r[0], r[1])) - d.addCallback(lambda r: setupapi(r[0], args.wallet, r[1])) + lbry = LBRYDaemonServer() + + d = lbry.start(branch=args.branch, user_specified=args.ui) if args.launchui: d.addCallback(lambda _: webbrowser.open(UI_ADDRESS)) + + reactor.listenTCP(API_PORT, server.Site(lbry.root), interface=API_INTERFACE) reactor.run() - print "\nClosing lbrynet-daemon" + + if not args.logtoconsole and not args.quiet: + print "\nClosing lbrynet-daemon" else: log.info("Not connected to internet, unable to start") - print "Not connected to internet, unable to start" + if not args.logtoconsole: + print "Not connected to internet, unable to start" return + +if __name__ == "__main__": + start() \ No newline at end of file diff --git a/lbrynet/lbrynet_daemon/LBRYDaemonServer.py b/lbrynet/lbrynet_daemon/LBRYDaemonServer.py new file mode 100644 index 000000000..85494d21c --- /dev/null +++ b/lbrynet/lbrynet_daemon/LBRYDaemonServer.py @@ -0,0 +1,353 @@ +import logging +import os +import shutil +import json +import sys +import mimetypes + +from StringIO import StringIO +from zipfile import ZipFile +from urllib import urlopen +from datetime import datetime +from appdirs import user_data_dir +from twisted.web import server, static, resource +from twisted.internet import defer, interfaces, error, reactor, task, threads +from twisted.python.failure import Failure +from txjsonrpc.web import jsonrpc + +from zope.interface import implements + +from lbrynet.lbrynet_daemon.LBRYDaemon import LBRYDaemon +from lbrynet.conf import API_CONNECTION_STRING, API_ADDRESS, DEFAULT_WALLET, UI_ADDRESS + + +if sys.platform != "darwin": + data_dir = os.path.join(os.path.expanduser("~"), ".lbrynet") +else: + data_dir = user_data_dir("LBRY") + +if not os.path.isdir(data_dir): + os.mkdir(data_dir) +version_dir = os.path.join(data_dir, "ui_version_history") +if not os.path.isdir(version_dir): + os.mkdir(version_dir) + +version_log = logging.getLogger("lbry_version") +version_log.addHandler(logging.FileHandler(os.path.join(version_dir, "lbry_version.log"))) +version_log.setLevel(logging.INFO) +log = logging.getLogger(__name__) +log.addHandler(logging.FileHandler(os.path.join(data_dir, 'lbrynet-daemon.log'))) +log.setLevel(logging.INFO) + + +class LBRYindex(resource.Resource): + def __init__(self, ui_dir): + resource.Resource.__init__(self) + self.ui_dir = ui_dir + + isLeaf = False + + def _delayed_render(self, request, results): + request.write(str(results)) + request.finish() + + def getChild(self, name, request): + if name == '': + return self + return resource.Resource.getChild(self, name, request) + + def render_GET(self, request): + return static.File(os.path.join(self.ui_dir, "index.html")).render_GET(request) + + +class LBRYFileStreamer(object): + """ + Writes downloaded LBRY file to request as the download comes in, pausing and resuming as requested + used for Chrome + """ + + implements(interfaces.IPushProducer) + + def __init__(self, request, path, start, stop, size): + self._request = request + self._fileObject = file(path) + self._content_type = mimetypes.guess_type(path)[0] + self._stop_pos = size - 1 if stop == '' else int(stop) #chrome and firefox send range requests for "0-" + self._cursor = self._start_pos = int(start) + self._file_size = size + self._depth = 0 + + self._paused = self._sent_bytes = self._stopped = False + self._delay = 0.25 + self._deferred = defer.succeed(None) + + self._request.setResponseCode(206) + self._request.setHeader('accept-ranges', 'bytes') + self._request.setHeader('content-type', self._content_type) + + self.resumeProducing() + + def pauseProducing(self): + self._paused = True + log.info("[" + str(datetime.now()) + "] Pausing producer") + return defer.succeed(None) + + def resumeProducing(self): + def _check_for_new_data(): + self._depth += 1 + self._fileObject.seek(self._start_pos, os.SEEK_END) + readable_bytes = self._fileObject.tell() + self._fileObject.seek(self._cursor) + + self._sent_bytes = False + + if (readable_bytes > self._cursor) and not (self._stopped or self._paused): + read_length = min(readable_bytes, self._stop_pos) - self._cursor + 1 + self._request.setHeader('content-range', 'bytes %s-%s/%s' % (self._cursor, self._cursor + read_length - 1, self._file_size)) + self._request.setHeader('content-length', str(read_length)) + start_cur = self._cursor + for i in range(read_length): + if self._paused or self._stopped: + break + else: + data = self._fileObject.read(1) + self._request.write(data) + self._cursor += 1 + + log.info("[" + str(datetime.now()) + "] Wrote range %s-%s/%s, length: %s, readable: %s, depth: %s" % + (start_cur, self._cursor, self._file_size, self._cursor - start_cur, readable_bytes, self._depth)) + self._sent_bytes = True + + if self._cursor == self._stop_pos + 1: + self.stopProducing() + return defer.succeed(None) + elif self._paused or self._stopped: + return defer.succeed(None) + else: + self._deferred.addCallback(lambda _: threads.deferToThread(reactor.callLater, self._delay, _check_for_new_data)) + return defer.succeed(None) + + log.info("[" + str(datetime.now()) + "] Resuming producer") + self._paused = False + self._deferred.addCallback(lambda _: _check_for_new_data()) + + def stopProducing(self): + log.info("[" + str(datetime.now()) + "] Stopping producer") + self._stopped = True + # self._fileObject.close() + self._deferred.addErrback(lambda err: err.trap(defer.CancelledError)) + self._deferred.addErrback(lambda err: err.trap(error.ConnectionDone)) + self._deferred.cancel() + # self._request.finish() + self._request.unregisterProducer() + return defer.succeed(None) + + +class HostedLBRYFile(resource.Resource): + def __init__(self, api): + self._api = api + self._producer = None + resource.Resource.__init__(self) + + def makeProducer(self, request, stream): + def _save_producer(producer): + self._producer = producer + return defer.succeed(None) + + range_header = request.getAllHeaders()['range'].replace('bytes=', '').split('-') + start, stop = int(range_header[0]), range_header[1] + log.info("[" + str(datetime.now()) + "] GET range %s-%s" % (start, stop)) + path = os.path.join(self._api.download_directory, stream.file_name) + + d = stream.get_total_bytes() + d.addCallback(lambda size: _save_producer(LBRYFileStreamer(request, path, start, stop, size))) + d.addCallback(lambda _: request.registerProducer(self._producer, streaming=True)) + # request.notifyFinish().addCallback(lambda _: self._producer.stopProducing()) + request.notifyFinish().addErrback(self._responseFailed, d) + return d + + def render_GET(self, request): + if 'name' in request.args.keys(): + if request.args['name'][0] != 'lbry' and request.args['name'][0] not in self._api.waiting_on.keys(): + d = self._api._download_name(request.args['name'][0]) + # d.addCallback(lambda stream: self.makeProducer(request, stream)) + d.addCallback(lambda stream: static.File(os.path.join(self._api.download_directory, + stream.file_name)).render_GET(request)) + + elif request.args['name'][0] in self._api.waiting_on.keys(): + request.redirect(UI_ADDRESS + "/?watch=" + request.args['name'][0]) + request.finish() + else: + request.redirect(UI_ADDRESS) + request.finish() + return server.NOT_DONE_YET + + def _responseFailed(self, err, call): + call.addErrback(lambda err: err.trap(error.ConnectionDone)) + call.addErrback(lambda err: err.trap(defer.CancelledError)) + call.addErrback(lambda err: log.info("Error: " + str(err))) + call.cancel() + + +class MyLBRYFiles(resource.Resource): + isLeaf = False + + def __init__(self): + resource.Resource.__init__(self) + self.files_table = None + + def delayed_render(self, request, result): + request.write(result.encode('utf-8')) + request.finish() + + def render_GET(self, request): + self.files_table = None + api = jsonrpc.Proxy(API_CONNECTION_STRING) + d = api.callRemote("get_lbry_files", {}) + d.addCallback(self._get_table) + d.addCallback(lambda results: self.delayed_render(request, results)) + + return server.NOT_DONE_YET + + def _get_table(self, files): + if not self.files_table: + self.files_table = r'My LBRY files' + self.files_table += r'' + self.files_table += r'' + self.files_table += r'' + self.files_table += r'' + self.files_table += r'' + self.files_table += r'' + return self._get_table(files) + if not len(files): + self.files_table += r'
Stream nameCompletedToggleRemove
' + return self.files_table + else: + f = files.pop() + self.files_table += r'' + self.files_table += r'%s' % (f['stream_name']) + self.files_table += r'%s' % (f['completed']) + self.files_table += r'Start' if f['stopped'] else r'Stop' + self.files_table += r'Delete' + self.files_table += r'' + return self._get_table(files) + + +class LBRYDaemonServer(object): + def __init__(self): + self.data_dir = user_data_dir("LBRY") + if not os.path.isdir(self.data_dir): + os.mkdir(self.data_dir) + self.version_dir = os.path.join(self.data_dir, "ui_version_history") + if not os.path.isdir(self.version_dir): + os.mkdir(self.version_dir) + self.config = os.path.join(self.version_dir, "active.json") + self.ui_dir = os.path.join(self.data_dir, "lbry-web-ui") + self.git_version = None + self._api = None + self.root = None + + if not os.path.isfile(os.path.join(self.config)): + self.loaded_git_version = None + else: + try: + f = open(self.config, "r") + loaded_ui = json.loads(f.read()) + f.close() + self.loaded_git_version = loaded_ui['commit'] + self.loaded_branch = loaded_ui['branch'] + version_log.info("[" + str(datetime.now()) + "] Last used " + self.loaded_branch + " commit " + str(self.loaded_git_version).replace("\n", "")) + except: + self.loaded_git_version = None + self.loaded_branch = None + + def setup(self, branch="master", user_specified=None): + self.branch = branch + if user_specified: + if os.path.isdir(user_specified): + log.info("Using user specified UI directory: " + str(user_specified)) + self.branch = "user-specified" + self.loaded_git_version = "user-specified" + self.ui_dir = user_specified + return defer.succeed("user-specified") + else: + log.info("User specified UI directory doesn't exist, using " + branch) + else: + log.info("Using UI branch: " + branch) + self._git_url = "https://api.github.com/repos/lbryio/lbry-web-ui/git/refs/heads/%s" % branch + self._dist_url = "https://raw.githubusercontent.com/lbryio/lbry-web-ui/%s/dist.zip" % branch + + d = self._up_to_date() + d.addCallback(lambda r: self._download_ui() if not r else self.branch) + return d + + def _up_to_date(self): + def _get_git_info(): + response = urlopen(self._git_url) + data = json.loads(response.read()) + return defer.succeed(data['object']['sha']) + + def _set_git(version): + self.git_version = version + version_log.info("[" + str(datetime.now()) + "] UI branch " + self.branch + " has a most recent commit of: " + str(self.git_version).replace("\n", "")) + + if self.git_version == self.loaded_git_version and os.path.isdir(self.ui_dir): + version_log.info("[" + str(datetime.now()) + "] local copy of " + self.branch + " is up to date") + return defer.succeed(True) + else: + if self.git_version == self.loaded_git_version: + version_log.info("[" + str(datetime.now()) + "] Can't find ui files, downloading them again") + else: + version_log.info("[" + str(datetime.now()) + "] local copy of " + self.branch + " branch is out of date, updating") + f = open(self.config, "w") + f.write(json.dumps({'commit': self.git_version, + 'time': str(datetime.now()), + 'branch': self.branch})) + f.close() + return defer.succeed(False) + + d = _get_git_info() + d.addCallback(_set_git) + return d + + def _download_ui(self): + def _delete_ui_dir(): + if os.path.isdir(self.ui_dir): + if self.loaded_git_version: + version_log.info("[" + str(datetime.now()) + "] Removed ui files for commit " + str(self.loaded_git_version).replace("\n", "")) + log.info("Removing out of date ui files") + shutil.rmtree(self.ui_dir) + return defer.succeed(None) + + def _dl_ui(): + url = urlopen(self._dist_url) + z = ZipFile(StringIO(url.read())) + names = [i for i in z.namelist() if '.DS_exStore' not in i and '__MACOSX' not in i] + z.extractall(self.ui_dir, members=names) + version_log.info("[" + str(datetime.now()) + "] Updated branch " + self.branch + ": " + str(self.loaded_git_version).replace("\n", "") + " --> " + self.git_version.replace("\n", "")) + log.info("Downloaded files for UI commit " + str(self.git_version).replace("\n", "")) + self.loaded_git_version = self.git_version + return self.branch + + d = _delete_ui_dir() + d.addCallback(lambda _: _dl_ui()) + return d + + def _setup_server(self, ui_ver, wallet): + self._api = LBRYDaemon(ui_ver, wallet_type=wallet) + self.root = LBRYindex(self.ui_dir) + self.root.putChild("css", static.File(os.path.join(self.ui_dir, "css"))) + self.root.putChild("font", static.File(os.path.join(self.ui_dir, "font"))) + self.root.putChild("img", static.File(os.path.join(self.ui_dir, "img"))) + self.root.putChild("js", static.File(os.path.join(self.ui_dir, "js"))) + self.root.putChild("view", HostedLBRYFile(self._api)) + self.root.putChild("files", MyLBRYFiles()) + self.root.putChild(API_ADDRESS, self._api) + return defer.succeed(True) + + def start(self, branch="master", user_specified=False, wallet=DEFAULT_WALLET): + d = self.setup(branch=branch, user_specified=user_specified) + d.addCallback(lambda v: self._setup_server(v, wallet)) + d.addCallback(lambda _: self._api.setup()) + + return d diff --git a/lbrynet/lbrynet_daemon/LBRYDownloader.py b/lbrynet/lbrynet_daemon/LBRYDownloader.py index 5ebc4b59e..d62bdc3b1 100644 --- a/lbrynet/lbrynet_daemon/LBRYDownloader.py +++ b/lbrynet/lbrynet_daemon/LBRYDownloader.py @@ -1,7 +1,9 @@ import json import logging import os +import sys +from appdirs import user_data_dir from datetime import datetime from twisted.internet import defer from twisted.internet.task import LoopingCall @@ -12,12 +14,37 @@ from lbrynet.core.StreamDescriptor import download_sd_blob from lbrynet.lbryfilemanager.LBRYFileDownloader import ManagedLBRYFileDownloaderFactory from lbrynet.conf import DEFAULT_TIMEOUT +INITIALIZING_CODE = 'initializing' +DOWNLOAD_METADATA_CODE = 'downloading_metadata' +DOWNLOAD_TIMEOUT_CODE = 'timeout' +DOWNLOAD_RUNNING_CODE = 'running' +DOWNLOAD_STOPPED_CODE = 'stopped' +STREAM_STAGES = [ + (INITIALIZING_CODE, 'Initializing...'), + (DOWNLOAD_METADATA_CODE, 'Downloading metadata'), + (DOWNLOAD_RUNNING_CODE, 'Started stream'), + (DOWNLOAD_STOPPED_CODE, 'Paused stream'), + (DOWNLOAD_TIMEOUT_CODE, 'Stream timed out') + ] + +if sys.platform != "darwin": + log_dir = os.path.join(os.path.expanduser("~"), ".lbrynet") +else: + log_dir = user_data_dir("LBRY") + +if not os.path.isdir(log_dir): + os.mkdir(log_dir) + +LOG_FILENAME = os.path.join(log_dir, 'lbrynet-daemon.log') log = logging.getLogger(__name__) +handler = logging.handlers.RotatingFileHandler(LOG_FILENAME, maxBytes=2097152, backupCount=5) +log.addHandler(handler) +log.setLevel(logging.INFO) class GetStream(object): def __init__(self, sd_identifier, session, wallet, lbry_file_manager, max_key_fee, pay_key=True, data_rate=0.5, - timeout=DEFAULT_TIMEOUT, download_directory=None): + timeout=DEFAULT_TIMEOUT, download_directory=None, file_name=None): self.wallet = wallet self.resolved_name = None self.description = None @@ -26,6 +53,7 @@ class GetStream(object): self.data_rate = data_rate self.pay_key = pay_key self.name = None + self.file_name = file_name self.session = session self.payment_rate_manager = PaymentRateManager(self.session.base_payment_rate_manager) self.lbry_file_manager = lbry_file_manager @@ -39,43 +67,45 @@ class GetStream(object): self.timeout_counter = 0 self.download_directory = download_directory self.download_path = None + self.downloader = None + self.finished = defer.Deferred() self.checker = LoopingCall(self.check_status) - + self.code = STREAM_STAGES[0] def check_status(self): self.timeout_counter += 1 - if self.download_path and os.path.isfile(self.download_path): + if self.download_path: self.checker.stop() - return defer.succeed(True) + self.finished.callback((self.stream_hash, self.download_path)) elif self.timeout_counter >= self.timeout: - log.info("Timeout downloading " + str(self.stream_info)) + log.info("Timeout downloading lbry://" + self.resolved_name + ", " + str(self.stream_info)) self.checker.stop() self.d.cancel() + self.code = STREAM_STAGES[4] + self.finished.callback(False) - def start(self, stream_info): + def start(self, stream_info, name): + self.resolved_name = name self.stream_info = stream_info if 'stream_hash' in self.stream_info.keys(): - self.description = self.stream_info['description'] - if 'key_fee' in self.stream_info.keys(): - self.key_fee = float(self.stream_info['key_fee']) - if 'key_fee_address' in self.stream_info.keys(): - self.key_fee_address = self.stream_info['key_fee_address'] - else: - self.key_fee_address = None - else: - self.key_fee = None - self.key_fee_address = None - self.stream_hash = self.stream_info['stream_hash'] - if isinstance(self.stream_hash, dict): - self.stream_hash = self.stream_hash['sd_hash'] - + elif 'sources' in self.stream_info.keys(): + self.stream_hash = self.stream_info['sources']['lbry_sd_hash'] else: - log.error("InvalidStreamInfoError in autofetcher: ", stream_info) raise InvalidStreamInfoError(self.stream_info) - + if 'description' in self.stream_info.keys(): + self.description = self.stream_info['description'] + if 'key_fee' in self.stream_info.keys(): + self.key_fee = float(self.stream_info['key_fee']) + if 'key_fee_address' in self.stream_info.keys(): + self.key_fee_address = self.stream_info['key_fee_address'] + else: + self.key_fee_address = None + else: + self.key_fee = None + self.key_fee_address = None if self.key_fee > self.max_key_fee: if self.pay_key: log.info("Key fee (" + str(self.key_fee) + ") above limit of " + str( @@ -84,21 +114,31 @@ class GetStream(object): else: pass + def _cause_timeout(): + self.timeout_counter = self.timeout * 2 + + def _set_status(x, status): + self.code = next(s for s in STREAM_STAGES if s[0] == status) + return x + self.checker.start(1) + self.d.addCallback(lambda _: _set_status(None, DOWNLOAD_METADATA_CODE)) self.d.addCallback(lambda _: download_sd_blob(self.session, self.stream_hash, self.payment_rate_manager)) self.d.addCallback(self.sd_identifier.get_metadata_for_sd_blob) - self.d.addCallback(lambda metadata: (next(factory for factory in metadata.factories if isinstance(factory, ManagedLBRYFileDownloaderFactory)), metadata)) + self.d.addCallback(lambda r: _set_status(r, DOWNLOAD_RUNNING_CODE)) + self.d.addCallback(lambda metadata: ( + next(factory for factory in metadata.factories if isinstance(factory, ManagedLBRYFileDownloaderFactory)), + metadata)) self.d.addCallback(lambda (factory, metadata): factory.make_downloader(metadata, [self.data_rate, True], self.payment_rate_manager, - download_directory=self.download_directory)) - self.d.addErrback(lambda err: err.trap(defer.CancelledError)) - self.d.addErrback(lambda err: log.error("An exception occurred attempting to load the stream descriptor: %s", err.getTraceback())) - self.d.addCallback(self._start_download) + download_directory=self.download_directory, + file_name=self.file_name)) + self.d.addCallbacks(self._start_download, lambda _: _cause_timeout()) self.d.callback(None) - return self.d + return self.finished def _start_download(self, downloader): def _pay_key_fee(): @@ -114,126 +154,7 @@ class GetStream(object): d = _pay_key_fee() else: d = defer.Deferred() - - downloader.start() - + self.downloader = downloader self.download_path = os.path.join(downloader.download_directory, downloader.file_name) - d.addCallback(lambda _: log.info("Downloading " + str(self.stream_hash) + " --> " + str(self.download_path))) - - return d - - -class FetcherDaemon(object): - def __init__(self, session, lbry_file_manager, lbry_file_metadata_manager, wallet, sd_identifier, autofetcher_conf, - verbose=False): - self.autofetcher_conf = autofetcher_conf - self.max_key_fee = 0.0 - self.sd_identifier = sd_identifier - self.wallet = wallet - self.session = session - self.lbry_file_manager = lbry_file_manager - self.lbry_metadata_manager = lbry_file_metadata_manager - self.seen = [] - self.lastbestblock = None - self.search = None - self.first_run = True - self.is_running = False - self.verbose = verbose - self._get_autofetcher_conf() - - def start(self): - if not self.is_running: - self.is_running = True - self.search = LoopingCall(self._looped_search) - self.search.start(1) - log.info("Starting autofetcher") - else: - log.info("Autofetcher is already running") - - def stop(self): - if self.is_running: - self.search.stop() - self.is_running = False - else: - log.info("Autofetcher isn't running, there's nothing to stop") - - def check_if_running(self): - if self.is_running: - msg = "Autofetcher is running\n" - msg += "Last block hash: " + str(self.lastbestblock) - else: - msg = "Autofetcher is not running" - return msg - - def _get_names(self): - d = self.wallet.get_best_blockhash() - d.addCallback(lambda blockhash: get_new_streams(blockhash) if blockhash != self.lastbestblock else []) - - def get_new_streams(blockhash): - self.lastbestblock = blockhash - d = self.wallet.get_block(blockhash) - d.addCallback(lambda block: get_new_streams_in_txes(block['tx'], blockhash)) - return d - - def get_new_streams_in_txes(txids, blockhash): - ds = [] - for t in txids: - d = self.wallet.get_claims_from_tx(t) - d.addCallback(get_new_streams_in_tx, t, blockhash) - ds.append(d) - d = defer.DeferredList(ds, consumeErrors=True) - d.addCallback(lambda result: [r[1] for r in result if r[0]]) - d.addCallback(lambda stream_lists: [stream for streams in stream_lists for stream in streams]) - return d - - def get_new_streams_in_tx(claims, t, blockhash): - rtn = [] - if claims: - for claim in claims: - if claim not in self.seen: - msg = "[" + str(datetime.now()) + "] New claim | lbry://" + str(claim['name']) + \ - " | stream hash: " + str(json.loads(claim['value'])['stream_hash']) - log.info(msg) - if self.verbose: - print msg - rtn.append((claim['name'], t)) - self.seen.append(claim) - else: - if self.verbose: - print "[" + str(datetime.now()) + "] No claims in block", blockhash - return rtn - - d.addCallback(lambda streams: defer.DeferredList( - [self.wallet.get_stream_info_from_txid(name, t) for name, t in streams])) - return d - - def _download_claims(self, claims): - if claims: - for claim in claims: - stream = GetStream(self.sd_identifier, self.session, self.wallet, self.lbry_file_manager, - self.max_key_fee, pay_key=False) - stream.start(claim[1]) - - return defer.succeed(None) - - def _looped_search(self): - d = self._get_names() - d.addCallback(self._download_claims) - return d - - def _get_autofetcher_conf(self): - settings = {"maxkey": "0.0"} - if os.path.exists(self.autofetcher_conf): - conf = open(self.autofetcher_conf) - for l in conf: - if l.startswith("maxkey="): - settings["maxkey"] = float(l[7:].rstrip('\n')) - conf.close() - else: - conf = open(self.autofetcher_conf, "w") - conf.write("maxkey=10.0") - conf.close() - settings["maxkey"] = 10.0 - log.info("No autofetcher conf file found, making one with max key fee of 10.0") - - self.max_key_fee = settings["maxkey"] + d.addCallback(lambda _: log.info("[" + str(datetime.now()) + "] Downloading " + str(self.stream_hash) + " --> " + str(self.download_path))) + d.addCallback(lambda _: self.downloader.start()) diff --git a/lbrynet/lbrynet_daemon/LBRYPublisher.py b/lbrynet/lbrynet_daemon/LBRYPublisher.py index eeb8bbed3..e68a06498 100644 --- a/lbrynet/lbrynet_daemon/LBRYPublisher.py +++ b/lbrynet/lbrynet_daemon/LBRYPublisher.py @@ -1,15 +1,30 @@ +import logging +import os +import sys + +from appdirs import user_data_dir +from datetime import datetime + from lbrynet.core.Error import InsufficientFundsError from lbrynet.lbryfilemanager.LBRYFileCreator import create_lbry_file from lbrynet.lbryfile.StreamDescriptor import publish_sd_blob from lbrynet.core.PaymentRateManager import PaymentRateManager from lbrynet.lbryfilemanager.LBRYFileDownloader import ManagedLBRYFileDownloader from twisted.internet import threads, defer -import os -import logging -from datetime import datetime +if sys.platform != "darwin": + log_dir = os.path.join(os.path.expanduser("~"), ".lbrynet") +else: + log_dir = user_data_dir("LBRY") +if not os.path.isdir(log_dir): + os.mkdir(log_dir) + +LOG_FILENAME = os.path.join(log_dir, 'lbrynet-daemon.log') log = logging.getLogger(__name__) +handler = logging.handlers.RotatingFileHandler(LOG_FILENAME, maxBytes=2097152, backupCount=5) +log.addHandler(handler) +log.setLevel(logging.INFO) class Publisher(object): diff --git a/packaging/ubuntu/lbry b/packaging/ubuntu/lbry index 15cf9d171..d24574f32 100755 --- a/packaging/ubuntu/lbry +++ b/packaging/ubuntu/lbry @@ -28,7 +28,12 @@ ARG=${1:-} if [ -z "$ARG" ]; then URL="" else - URL="view?name=$(urlencode "$(echo "$ARG" | cut -c 8-)")" + NAME=$(echo "$ARG" | cut -c 8-) + if [ -z "$NAME" -o "$NAME" == "lbry" ]; then + URL="" + else + URL="/?watch=$(urlencode "$NAME")" + fi fi -/usr/bin/xdg-open "http://localhost:5279/$URL" +/usr/bin/xdg-open "http://localhost:5279$URL"