import logging from lbrynet.core.Session import LBRYSession import os.path import argparse from yapsy.PluginManager import PluginManager from twisted.internet import defer, threads, stdio, task from lbrynet.lbrynet_console.ConsoleControl import ConsoleControl from lbrynet.lbrynet_console.LBRYSettings import LBRYSettings from lbrynet.lbryfilemanager.LBRYFileManager import LBRYFileManager from lbrynet.conf import MIN_BLOB_DATA_PAYMENT_RATE # , MIN_BLOB_INFO_PAYMENT_RATE from lbrynet.core.utils import generate_id from lbrynet.core.StreamDescriptor import StreamDescriptorIdentifier from lbrynet.core.PaymentRateManager import PaymentRateManager from lbrynet.core.server.BlobAvailabilityHandler import BlobAvailabilityHandlerFactory from lbrynet.core.server.BlobRequestHandler import BlobRequestHandlerFactory from lbrynet.core.server.ServerProtocol import ServerProtocolFactory from lbrynet.core.PTCWallet import PTCWallet from lbrynet.lbryfile.client.LBRYFileOptions import add_lbry_file_to_sd_identifier from lbrynet.lbryfile.client.LBRYFileDownloader import LBRYFileOpenerFactory from lbrynet.lbryfile.StreamDescriptor import LBRYFileStreamType from lbrynet.lbryfile.LBRYFileMetadataManager import DBLBRYFileMetadataManager, TempLBRYFileMetadataManager #from lbrynet.lbrylive.PaymentRateManager import LiveStreamPaymentRateManager from lbrynet.lbrynet_console.ControlHandlers import ApplicationStatusFactory, GetWalletBalancesFactory, ShutDownFactory from lbrynet.lbrynet_console.ControlHandlers import LBRYFileStatusFactory, DeleteLBRYFileChooserFactory from lbrynet.lbrynet_console.ControlHandlers import ToggleLBRYFileRunningChooserFactory from lbrynet.lbrynet_console.ControlHandlers import ModifyApplicationDefaultsFactory from lbrynet.lbrynet_console.ControlHandlers import CreateLBRYFileFactory, PublishStreamDescriptorChooserFactory from lbrynet.lbrynet_console.ControlHandlers import ShowPublishedSDHashesChooserFactory from lbrynet.lbrynet_console.ControlHandlers import CreatePlainStreamDescriptorChooserFactory from lbrynet.lbrynet_console.ControlHandlers import ShowLBRYFileStreamHashChooserFactory, AddStreamFromHashFactory from lbrynet.lbrynet_console.ControlHandlers import AddStreamFromSDFactory, AddStreamFromLBRYcrdNameFactory from lbrynet.lbrynet_console.ControlHandlers import ClaimNameFactory, GetNewWalletAddressFactory from lbrynet.lbrynet_console.ControlHandlers import ShowServerStatusFactory, ModifyServerSettingsFactory from lbrynet.lbrynet_console.ControlHandlers import ModifyLBRYFileOptionsChooserFactory from lbrynet.lbrynet_console.ControlHandlers import PeerStatsAndSettingsChooserFactory from lbrynet.core.LBRYcrdWallet import LBRYcrdWallet log = logging.getLogger(__name__) class LBRYConsole(): """A class which can upload and download file streams to and from the network""" def __init__(self, peer_port, dht_node_port, known_dht_nodes, control_class, wallet_type, lbrycrd_conf, use_upnp, data_dir, created_data_dir): """ @param peer_port: the network port on which to listen for peers @param dht_node_port: the network port on which to listen for dht node requests @param known_dht_nodes: a list of (ip_address, dht_port) which will be used to join the DHT network """ self.peer_port = peer_port self.dht_node_port = dht_node_port self.known_dht_nodes = known_dht_nodes self.wallet_type = wallet_type self.lbrycrd_conf = lbrycrd_conf self.use_upnp = use_upnp self.lbry_server_port = None self.control_class = control_class self.session = None self.lbry_file_metadata_manager = None self.lbry_file_manager = None self.db_dir = data_dir self.current_db_revision = 1 self.blobfile_dir = os.path.join(self.db_dir, "blobfiles") self.created_data_dir = created_data_dir self.plugin_manager = PluginManager() self.plugin_manager.setPluginPlaces([ os.path.join(self.db_dir, "plugins"), os.path.join(os.path.dirname(__file__), "plugins"), ]) self.control_handlers = [] self.query_handlers = {} self.settings = LBRYSettings(self.db_dir) self.blob_request_payment_rate_manager = None self.lbryid = None self.sd_identifier = StreamDescriptorIdentifier() self.plugin_objects = [] self.db_migration_revisions = None def start(self): """Initialize the session and restore everything to its saved state""" d = threads.deferToThread(self._setup_data_directory) d.addCallback(lambda _: self._check_db_migration()) d.addCallback(lambda _: self._get_settings()) d.addCallback(lambda _: self._get_session()) d.addCallback(lambda _: add_lbry_file_to_sd_identifier(self.sd_identifier)) d.addCallback(lambda _: self._setup_lbry_file_manager()) d.addCallback(lambda _: self._setup_lbry_file_opener()) d.addCallback(lambda _: self._setup_control_handlers()) d.addCallback(lambda _: self._setup_query_handlers()) d.addCallback(lambda _: self._load_plugins()) d.addCallback(lambda _: self._setup_server()) d.addCallback(lambda _: self._start_controller()) d.addErrback(self._show_start_error) return d def _show_start_error(self, error): print error.getErrorMessage() return error def shut_down(self): """Stop the session, all currently running streams, and stop the server""" d = self._shut_down() if self.session is not None: d.addCallback(lambda _: self.session.shut_down()) return d def add_control_handlers(self, control_handlers): for control_handler in control_handlers: self.control_handlers.append(control_handler) def add_query_handlers(self, query_handlers): def _set_query_handlers(statuses): from future_builtins import zip for handler, (success, status) in zip(query_handlers, statuses): if success is True: self.query_handlers[handler] = status ds = [] for handler in query_handlers: ds.append(self.settings.get_query_handler_status(handler.get_primary_query_identifier())) dl = defer.DeferredList(ds) dl.addCallback(_set_query_handlers) return dl def _setup_data_directory(self): if self.created_data_dir: db_revision = open(os.path.join(self.db_dir, "db_revision"), mode='w') db_revision.write(str(self.current_db_revision)) db_revision.close() log.debug("Created the db revision file: %s", str(os.path.join(self.db_dir, "db_revision"))) if not os.path.exists(self.blobfile_dir): os.mkdir(self.blobfile_dir) log.debug("Created the blobfile directory: %s", str(self.blobfile_dir)) def _check_db_migration(self): old_revision = 0 db_revision_file = os.path.join(self.db_dir, "db_revision") if os.path.exists(db_revision_file): old_revision = int(open(db_revision_file).read().strip()) if old_revision < self.current_db_revision: from lbrynet.db_migrator import dbmigrator print "Upgrading your databases..." d = threads.deferToThread(dbmigrator.migrate_db, self.db_dir, old_revision, self.current_db_revision) def print_success(old_dirs): success_string = "Finished upgrading the databases. It is now safe to delete the" success_string += " following directories, if you feel like it. It won't make any" success_string += " difference.\nAnyway here they are: " for i, old_dir in enumerate(old_dirs): success_string += old_dir if i + 1 < len(old_dir): success_string += ", " print success_string d.addCallback(print_success) return d return defer.succeed(True) def _get_settings(self): d = self.settings.start() d.addCallback(lambda _: self.settings.get_lbryid()) d.addCallback(self.set_lbryid) return d def set_lbryid(self, lbryid): if lbryid is None: return self._make_lbryid() else: self.lbryid = lbryid def _make_lbryid(self): self.lbryid = generate_id() d = self.settings.save_lbryid(self.lbryid) return d def _get_lbrycrd_settings(self): settings = {"username": "rpcuser", "password": "rpcpassword", "rpc_port": 8332} if os.path.exists(self.lbrycrd_conf): lbrycrd_conf = open(self.lbrycrd_conf) for l in lbrycrd_conf: if l.startswith("rpcuser="): settings["username"] = l[8:-1] if l.startswith("rpcpassword="): settings["password"] = l[12:-1] if l.startswith("rpcport="): settings["rpc_port"] = int(l[8:-1]) return settings def _get_session(self): def get_default_data_rate(): d = self.settings.get_default_data_payment_rate() d.addCallback(lambda rate: {"default_data_payment_rate": rate if rate is not None else MIN_BLOB_DATA_PAYMENT_RATE}) return d def create_lbrycrd_wallet(lbrycrd_options): return LBRYcrdWallet(lbrycrd_options['username'], lbrycrd_options['password'], "127.0.0.1", lbrycrd_options['rpc_port']) def get_wallet(): if self.wallet_type == "lbrycrd": d = threads.deferToThread(self._get_lbrycrd_settings) d.addCallback(create_lbrycrd_wallet) else: d = defer.succeed(PTCWallet(self.db_dir)) d.addCallback(lambda wallet: {"wallet": wallet}) return d d1 = get_default_data_rate() d2 = get_wallet() def combine_results(results): r = {} for success, result in results: if success is True: r.update(result) return r def create_session(results): self.session = LBRYSession(results['default_data_payment_rate'], db_dir=self.db_dir, lbryid=self.lbryid, blob_dir=self.blobfile_dir, dht_node_port=self.dht_node_port, known_dht_nodes=self.known_dht_nodes, peer_port=self.peer_port, use_upnp=self.use_upnp, wallet=results['wallet']) dl = defer.DeferredList([d1, d2], fireOnOneErrback=True) dl.addCallback(combine_results) dl.addCallback(create_session) dl.addCallback(lambda _: self.session.setup()) return dl def _setup_lbry_file_manager(self): self.lbry_file_metadata_manager = DBLBRYFileMetadataManager(self.db_dir) d = self.lbry_file_metadata_manager.setup() def set_lbry_file_manager(): self.lbry_file_manager = LBRYFileManager(self.session, self.lbry_file_metadata_manager, self.sd_identifier) return self.lbry_file_manager.setup() d.addCallback(lambda _: set_lbry_file_manager()) return d def _setup_lbry_file_opener(self): stream_info_manager = TempLBRYFileMetadataManager() downloader_factory = LBRYFileOpenerFactory(self.session.peer_finder, self.session.rate_limiter, self.session.blob_manager, stream_info_manager, self.session.wallet) self.sd_identifier.add_stream_downloader_factory(LBRYFileStreamType, downloader_factory) return defer.succeed(True) def _setup_control_handlers(self): handlers = [ ('General', ApplicationStatusFactory(self.session.rate_limiter, self.session.dht_node)), ('General', GetWalletBalancesFactory(self.session.wallet)), ('General', ModifyApplicationDefaultsFactory(self)), ('General', ShutDownFactory(self)), ('General', PeerStatsAndSettingsChooserFactory(self.session.peer_manager)), ('lbryfile', LBRYFileStatusFactory(self.lbry_file_manager)), ('Stream Downloading', AddStreamFromSDFactory(self.sd_identifier, self.session.base_payment_rate_manager)), ('lbryfile', DeleteLBRYFileChooserFactory(self.lbry_file_metadata_manager, self.session.blob_manager, self.lbry_file_manager)), ('lbryfile', ToggleLBRYFileRunningChooserFactory(self.lbry_file_manager)), ('lbryfile', CreateLBRYFileFactory(self.session, self.lbry_file_manager)), ('lbryfile', PublishStreamDescriptorChooserFactory(self.lbry_file_metadata_manager, self.session.blob_manager, self.lbry_file_manager)), ('lbryfile', ShowPublishedSDHashesChooserFactory(self.lbry_file_metadata_manager, self.lbry_file_manager)), ('lbryfile', CreatePlainStreamDescriptorChooserFactory(self.lbry_file_manager)), ('lbryfile', ShowLBRYFileStreamHashChooserFactory(self.lbry_file_manager)), ('lbryfile', ModifyLBRYFileOptionsChooserFactory(self.lbry_file_manager)), ('Stream Downloading', AddStreamFromHashFactory(self.sd_identifier, self.session)) ] self.add_control_handlers(handlers) if self.wallet_type == 'lbrycrd': lbrycrd_handlers = [ ('Stream Downloading', AddStreamFromLBRYcrdNameFactory(self.sd_identifier, self.session, self.session.wallet)), ('General', ClaimNameFactory(self.session. wallet, self.lbry_file_manager, self.session.blob_manager, self.sd_identifier)), ('General', GetNewWalletAddressFactory(self.session.wallet)) ] self.add_control_handlers(lbrycrd_handlers) if self.peer_port is not None: server_handlers = [ ('Server', ShowServerStatusFactory(self)), ('Server', ModifyServerSettingsFactory(self)), ] self.add_control_handlers(server_handlers) def _setup_query_handlers(self): handlers = [ #CryptBlobInfoQueryHandlerFactory(self.lbry_file_metadata_manager, self.session.wallet, # self._server_payment_rate_manager), BlobAvailabilityHandlerFactory(self.session.blob_manager), #BlobRequestHandlerFactory(self.session.blob_manager, self.session.wallet, # self._server_payment_rate_manager), self.session.wallet.get_wallet_info_query_handler_factory(), ] def get_blob_request_handler_factory(rate): self.blob_request_payment_rate_manager = PaymentRateManager( self.session.base_payment_rate_manager, rate ) handlers.append(BlobRequestHandlerFactory(self.session.blob_manager, self.session.wallet, self.blob_request_payment_rate_manager)) d1 = self.settings.get_server_data_payment_rate() d1.addCallback(get_blob_request_handler_factory) dl = defer.DeferredList([d1]) dl.addCallback(lambda _: self.add_query_handlers(handlers)) return dl def _load_plugins(self): d = threads.deferToThread(self.plugin_manager.collectPlugins) def setup_plugins(): ds = [] for plugin in self.plugin_manager.getAllPlugins(): self.plugin_objects.append(plugin.plugin_object) ds.append(plugin.plugin_object.setup(self)) return defer.DeferredList(ds) d.addCallback(lambda _: setup_plugins()) return d def _stop_plugins(self): ds = [] for plugin_object in self.plugin_objects: ds.append(defer.maybeDeferred(plugin_object.stop)) return defer.DeferredList(ds) def _setup_server(self): def restore_running_status(running): if running is True: return self.start_server() return defer.succeed(True) dl = self.settings.get_server_running_status() dl.addCallback(restore_running_status) return dl def start_server(self): if self.peer_port is not None: server_factory = ServerProtocolFactory(self.session.rate_limiter, self.query_handlers, self.session.peer_manager) from twisted.internet import reactor self.lbry_server_port = reactor.listenTCP(self.peer_port, server_factory) return defer.succeed(True) def stop_server(self): if self.lbry_server_port is not None: self.lbry_server_port, p = None, self.lbry_server_port return defer.maybeDeferred(p.stopListening) else: return defer.succeed(True) def _start_controller(self): self.control_class(self.control_handlers) return defer.succeed(True) def _shut_down(self): self.plugin_manager = None ds = [] if self.lbry_file_metadata_manager is not None: d = self.lbry_file_metadata_manager.stop() d.addCallback(lambda _: self.lbry_file_manager.stop()) ds.append(d) ds.append(self.stop_server()) ds.append(self._stop_plugins()) dl = defer.DeferredList(ds) return dl class StdIOControl(): def __init__(self, control_handlers): stdio.StandardIO(ConsoleControl(control_handlers)) def launch_lbry_console(): from twisted.internet import reactor parser = argparse.ArgumentParser(description="Launch a lbrynet console") parser.add_argument("--no_listen_peer", help="Don't listen for incoming data connections.", action="store_true") parser.add_argument("--peer_port", help="The port on which the console will listen for incoming data connections.", type=int, default=3333) parser.add_argument("--no_listen_dht", help="Don't listen for incoming DHT connections.", action="store_true") parser.add_argument("--dht_node_port", help="The port on which the console will listen for DHT connections.", type=int, default=4444) parser.add_argument("--wallet_type", help="Either 'lbrycrd' or 'ptc'.", type=str, default="lbrycrd") parser.add_argument("--lbrycrd_wallet_conf", help="The configuration file for the LBRYcrd wallet. Default: ~/.lbrycrd", type=str) parser.add_argument("--no_dht_bootstrap", help="Don't try to connect to the DHT", action="store_true") parser.add_argument("--dht_bootstrap_host", help="The hostname of a known DHT node, to be used to bootstrap into the DHT. " "Must be used with --dht_bootstrap_port", type=str, default='104.236.42.182') parser.add_argument("--dht_bootstrap_port", help="The port of a known DHT node, to be used to bootstrap into the DHT. Must " "be used with --dht_bootstrap_host", type=int, default=4000) parser.add_argument("--use_upnp", help="Try to use UPnP to enable incoming connections through the firewall", action="store_true") parser.add_argument("--data_dir", help=("The full path to the directory in which lbrynet data and metadata will be stored. " "Default: ~/.lbrynet"), type=str) args = parser.parse_args() if args.no_dht_bootstrap: bootstrap_nodes = [] else: bootstrap_nodes = [(args.dht_bootstrap_host, args.dht_bootstrap_port)] if args.no_listen_peer: peer_port = None else: peer_port = args.peer_port if args.no_listen_dht: dht_node_port = None else: dht_node_port = args.dht_node_port created_data_dir = False if not args.data_dir: data_dir = os.path.join(os.path.expanduser("~"), ".lbrynet") else: data_dir = args.data_dir if not os.path.exists(data_dir): os.mkdir(data_dir) created_data_dir = True if args.lbrycrd_wallet_conf: lbrycrd_conf = args.lbrycrd_wallet_conf else: lbrycrd_conf = os.path.join(os.path.expanduser("~"), ".lbrycrd", "lbrycrd.conf") log_format = "(%(asctime)s)[%(filename)s:%(lineno)s] %(funcName)s(): %(message)s" formatter = logging.Formatter(log_format) logger = logging.getLogger() logger.setLevel(logging.DEBUG) file_handler = logging.FileHandler(os.path.join(data_dir, "console.log")) file_handler.setFormatter(formatter) file_handler.addFilter(logging.Filter("lbrynet")) logger.addHandler(file_handler) console = LBRYConsole(peer_port, dht_node_port, bootstrap_nodes, StdIOControl, wallet_type=args.wallet_type, lbrycrd_conf=lbrycrd_conf, use_upnp=args.use_upnp, data_dir=data_dir, created_data_dir=created_data_dir) d = task.deferLater(reactor, 0, console.start) d.addErrback(lambda _: reactor.stop()) reactor.addSystemEventTrigger('before', 'shutdown', console.shut_down) reactor.run()