diff --git a/lbrynet/core/LBRYcrdWallet.py b/lbrynet/core/LBRYcrdWallet.py index 872164501..3db07f1b3 100644 --- a/lbrynet/core/LBRYcrdWallet.py +++ b/lbrynet/core/LBRYcrdWallet.py @@ -317,6 +317,47 @@ class LBRYcrdWallet(object): d.addCallback(_save_metadata) return d + def abandon_name(self, txid): + address = self._get_new_address() + raw = self._get_raw_tx(txid) + transaction = self._get_decoded_tx(raw) + amount = float(transaction['vout'][1]['value']) + return self._abandon_name(txid, address, amount) + + def get_tx(self, txid): + raw = self._get_raw_tx(txid) + return self._get_decoded_tx(raw) + + def get_name_claims(self): + return threads.deferToThread(self._get_name_claims) + + def start_miner(self): + if not self._get_gen_status(): + return self._set_gen_status(True) + else: + return "Miner was already running" + + def stop_miner(self): + if self._get_gen_status(): + return self._set_gen_status(False) + else: + return "Miner wasn't running" + + def get_miner_status(self): + return self._get_gen_status() + + def get_block(self, blockhash): + return self._get_block(blockhash) + + def get_blockchain_info(self): + return self._get_blockchain_info() + + def get_claims_for_tx(self, txid): + return self._get_claims_for_tx(txid) + + def get_nametrie(self): + return self._get_nametrie() + def get_name_and_validity_for_sd_hash(self, sd_hash): d = self._get_claim_metadata_for_sd_hash(sd_hash) d.addCallback(lambda name_txid: threads.deferToThread(self._get_status_of_claim, name_txid[1], name_txid[0], sd_hash) if name_txid is not None else None) @@ -355,9 +396,6 @@ class LBRYcrdWallet(object): def _get_rpc_conn(self): return AuthServiceProxy(self.rpc_conn_string) - def get_rpc_conn_x(self): - return AuthServiceProxy(self.rpc_conn_string) - def _start_daemon(self): tries = 0 @@ -491,6 +529,60 @@ class LBRYcrdWallet(object): rpc_conn = self._get_rpc_conn() return rpc_conn.getinfo() + @_catch_connection_error + def _get_name_claims(self): + rpc_conn = self._get_rpc_conn() + return rpc_conn.listnameclaims() + + @_catch_connection_error + def _get_gen_status(self): + rpc_conn = self._get_rpc_conn() + return rpc_conn.getgenerate() + + @_catch_connection_error + def _set_gen_status(self, b): + if b: + log.info("Starting miner") + else: + log.info("Stopping miner") + rpc_conn = self._get_rpc_conn() + return rpc_conn.setgenerate(b) + + @_catch_connection_error + def _get_raw_tx(self, txid): + rpc_conn = self._get_rpc_conn() + return rpc_conn.getrawtransaction(txid) + + @_catch_connection_error + def _get_decoded_tx(self, raw): + rpc_conn = self._get_rpc_conn() + return rpc_conn.decoderawtransaction(raw) + + @_catch_connection_error + def _abandon_name(self, txid, address, amount): + rpc_conn = self._get_rpc_conn() + return rpc_conn.abandonname(txid, address, amount) + + @_catch_connection_error + def _get_blockchain_info(self): + rpc_conn = self._get_rpc_conn() + return rpc_conn.getblockchaininfo() + + @_catch_connection_error + def _get_block(self, blockhash): + rpc_conn = self._get_rpc_conn() + return rpc_conn.getblock(blockhash) + + @_catch_connection_error + def _get_claims_for_tx(self, txid): + rpc_conn = self._get_rpc_conn() + return rpc_conn.getclaimsfortx(txid) + + @_catch_connection_error + def _get_nametrie(self): + rpc_conn = self._get_rpc_conn() + return rpc_conn.getnametrie() + @_catch_connection_error def _get_wallet_balance(self): rpc_conn = self._get_rpc_conn() diff --git a/lbrynet/lbrynet_daemon/LBRYDaemon.py b/lbrynet/lbrynet_daemon/LBRYDaemon.py index 3e9e6aaf6..f43b9847f 100644 --- a/lbrynet/lbrynet_daemon/LBRYDaemon.py +++ b/lbrynet/lbrynet_daemon/LBRYDaemon.py @@ -1,3 +1,8 @@ +from lbrynet.core.PaymentRateManager import PaymentRateManager +from lbrynet.core.server.BlobAvailabilityHandler import BlobAvailabilityHandlerFactory +from lbrynet.core.server.BlobRequestHandler import BlobRequestHandlerFactory +from lbrynet.core.server.ServerProtocol import ServerProtocolFactory +from lbrynet.lbrynet_console.ControlHandlers import get_time_behind_blockchain from lbrynet.core.Error import UnknownNameError from lbrynet.lbryfile.StreamDescriptor import LBRYFileStreamType from lbrynet.lbryfile.client.LBRYFileDownloader import LBRYFileSaverFactory, LBRYFileOpenerFactory @@ -14,7 +19,7 @@ from lbrynet.core.LBRYcrdWallet import LBRYcrdWallet from lbrynet.lbryfilemanager.LBRYFileManager import LBRYFileManager from lbrynet.lbryfile.LBRYFileMetadataManager import DBLBRYFileMetadataManager, TempLBRYFileMetadataManager from twisted.web import xmlrpc, server -from twisted.internet import defer, threads, reactor +from twisted.internet import defer, threads, reactor, error from datetime import datetime import logging import os @@ -22,6 +27,7 @@ import sys import json import binascii import webbrowser +from decimal import Decimal log = logging.getLogger(__name__) @@ -86,7 +92,6 @@ class LBRYDaemon(xmlrpc.XMLRPC): self.wallet_type = "lbrycrd" self.lbrycrd_conf = os.path.join(self.wallet_dir, "lbrycrd.conf") self.autofetcher_conf = os.path.join(self.wallet_dir, "autofetcher.conf") - self.rpc_conn = None self.files = [] self.created_data_dir = False if not os.path.exists(self.db_dir): @@ -95,6 +100,8 @@ class LBRYDaemon(xmlrpc.XMLRPC): self.session_settings = None self.data_rate = 0.5 self.max_key_fee = 100.0 + self.query_handlers = {} + return defer.succeed(None) def _disp_startup(): @@ -113,18 +120,91 @@ class LBRYDaemon(xmlrpc.XMLRPC): d.addCallback(lambda _: self._setup_stream_identifier()) d.addCallback(lambda _: self._setup_lbry_file_manager()) d.addCallback(lambda _: self._setup_lbry_file_opener()) + d.addCallback(lambda _: self._setup_query_handlers()) + d.addCallback(lambda _: self._setup_server()) d.addCallback(lambda _: self._setup_fetcher()) d.addCallback(lambda _: _disp_startup()) d.callback(None) return defer.succeed(None) + def _start_server(self): + + if self.peer_port is not None: + + server_factory = ServerProtocolFactory(self.session.rate_limiter, + self.query_handlers, + self.session.peer_manager) + try: + self.lbry_server_port = reactor.listenTCP(self.peer_port, server_factory) + except error.CannotListenError as e: + import traceback + log.error("Couldn't bind to port %d. %s", self.peer_port, traceback.format_exc()) + raise ValueError("%s lbrynet may already be running on your computer.", str(e)) + return defer.succeed(True) + + def _stop_server(self): + if self.lbry_server_port is not None: + self.lbry_server_port, p = None, self.lbry_server_port + return defer.maybeDeferred(p.stopListening) + else: + return defer.succeed(True) + + def _setup_server(self): + + def restore_running_status(running): + if running is True: + return self._start_server() + return defer.succeed(True) + + dl = self.settings.get_server_running_status() + dl.addCallback(restore_running_status) + return dl + + def _setup_query_handlers(self): + handlers = [ + #CryptBlobInfoQueryHandlerFactory(self.lbry_file_metadata_manager, self.session.wallet, + # self._server_payment_rate_manager), + BlobAvailabilityHandlerFactory(self.session.blob_manager), + #BlobRequestHandlerFactory(self.session.blob_manager, self.session.wallet, + # self._server_payment_rate_manager), + self.session.wallet.get_wallet_info_query_handler_factory(), + ] + + def get_blob_request_handler_factory(rate): + self.blob_request_payment_rate_manager = PaymentRateManager( + self.session.base_payment_rate_manager, rate + ) + handlers.append(BlobRequestHandlerFactory(self.session.blob_manager, self.session.wallet, + self.blob_request_payment_rate_manager)) + + d1 = self.settings.get_server_data_payment_rate() + d1.addCallback(get_blob_request_handler_factory) + + dl = defer.DeferredList([d1]) + dl.addCallback(lambda _: self._add_query_handlers(handlers)) + return dl + + def _add_query_handlers(self, query_handlers): + + def _set_query_handlers(statuses): + from future_builtins import zip + for handler, (success, status) in zip(query_handlers, statuses): + if success is True: + self.query_handlers[handler] = status + + ds = [] + for handler in query_handlers: + ds.append(self.settings.get_query_handler_status(handler.get_primary_query_identifier())) + dl = defer.DeferredList(ds) + dl.addCallback(_set_query_handlers) + return dl + def _shutdown(self): print 'Closing lbrynet session' + d = self._stop_server() if self.session is not None: - d = self.session.shut_down() - else: - d = defer.succeed(True) + d.addCallback(lambda _: self.session.shut_down()) return d def _update_settings(self): @@ -174,11 +254,11 @@ class LBRYDaemon(xmlrpc.XMLRPC): def _get_settings(self): d = self.settings.start() d.addCallback(lambda _: self.settings.get_lbryid()) - d.addCallback(self.set_lbryid) + d.addCallback(self._set_lbryid) d.addCallback(lambda _: self._get_lbrycrdd_path()) return d - def set_lbryid(self, lbryid): + def _set_lbryid(self, lbryid): if lbryid is None: return self._make_lbryid() else: @@ -237,7 +317,6 @@ class LBRYDaemon(xmlrpc.XMLRPC): blob_dir=self.blobfile_dir, dht_node_port=self.dht_node_port, known_dht_nodes=self.known_dht_nodes, peer_port=self.peer_port, use_upnp=self.use_upnp, wallet=results['wallet']) - self.rpc_conn = self.session.wallet.get_rpc_conn_x() dl = defer.DeferredList([d1, d2], fireOnOneErrback=True) dl.addCallback(combine_results) @@ -426,7 +505,7 @@ class LBRYDaemon(xmlrpc.XMLRPC): d.addCallback(lambda info: int(info['stream_size'])/1000000*self.data_rate) d.addCallback(_add_key_fee) d.addErrback(lambda _: _add_key_fee(0.0)) - reactor.callLater(5.0, _check_est, d, name) + reactor.callLater(7.5, _check_est, d, name) return d @@ -500,10 +579,9 @@ class LBRYDaemon(xmlrpc.XMLRPC): d = self._shutdown() d.addCallback(lambda _: _disp_shutdown()) - d.addCallback(lambda _: reactor.stop()) - d.callback(None) + d.addCallback(lambda _: reactor.callLater(1.0, reactor.stop)) - return d + return defer.succeed('Shutting down') def xmlrpc_get_lbry_files(self): """ @@ -680,7 +758,7 @@ class LBRYDaemon(xmlrpc.XMLRPC): print '[' + str(datetime.now()) + '] Search nametrie: ' + search - filtered_results = [n for n in self.rpc_conn.getnametrie() if n['name'].startswith(search)] + filtered_results = [n for n in self.session.wallet.get_nametrie() if n['name'].startswith(search)] if len(filtered_results) > 25: filtered_results = filtered_results[:25] filtered_results = [n for n in filtered_results if 'txid' in n.keys()] @@ -769,6 +847,38 @@ class LBRYDaemon(xmlrpc.XMLRPC): return d + def xmlrpc_abandon_name(self, txid): + def _disp(txid, tx): + print '[' + str(datetime.now()) + '] Spent coins from claim tx ' + txid + ' --> ' + tx + return tx + + d = defer.Deferred() + d.addCallback(lambda _: self.session.wallet.abandon_name(txid)) + d.addCallback(lambda tx: _disp(txid, tx)) + d.addErrback(lambda err: str(err.getTraceback())) + d.callback(None) + + return d + + def xmlrpc_get_name_claims(self): + def _clean(claims): + for c in claims: + for k in c.keys(): + if type(c[k]) == Decimal: + c[k] = float(c[k]) + return claims + + d = self.session.wallet.get_name_claims() + d.addCallback(_clean) + + return d + + def xmlrpc_get_time_behind_blockchain(self): + d = self.session.wallet.get_most_recent_blocktime() + d.addCallback(get_time_behind_blockchain) + + return d + def main(): daemon = LBRYDaemon() @@ -777,4 +887,4 @@ def main(): reactor.run() if __name__ == '__main__': - main() + main() \ No newline at end of file diff --git a/lbrynet/lbrynet_daemon/LBRYDownloader.py b/lbrynet/lbrynet_daemon/LBRYDownloader.py index 6895ded68..93bfbfa09 100644 --- a/lbrynet/lbrynet_daemon/LBRYDownloader.py +++ b/lbrynet/lbrynet_daemon/LBRYDownloader.py @@ -104,7 +104,6 @@ class FetcherDaemon(object): self.lbry_metadata_manager = lbry_file_metadata_manager self.seen = [] self.lastbestblock = None - self.rpc_conn = self.wallet.get_rpc_conn_x() self.search = None self.first_run = True self.is_running = False @@ -134,14 +133,14 @@ class FetcherDaemon(object): return msg def _get_names(self): - c = self.rpc_conn.getblockchaininfo() + c = self.wallet.get_blockchain_info() rtn = [] if self.lastbestblock != c: - block = self.rpc_conn.getblock(c['bestblockhash']) + block = self.wallet.get_block(c['bestblockhash']) txids = block['tx'] - transactions = [self.rpc_conn.decoderawtransaction(self.rpc_conn.getrawtransaction(t)) for t in txids] + transactions = [self.wallet.get_tx(t) for t in txids] for t in transactions: - claims = self.rpc_conn.getclaimsfortx(t['txid']) + claims = self.wallet.get_claims_for_tx(t['txid']) # if self.first_run: # # claims = self.rpc_conn.getclaimsfortx("96aca2c60efded5806b7336430c5987b9092ffbea9c6ed444e3bf8e008993e11") # # claims = self.rpc_conn.getclaimsfortx("cc9c7f5225ecb38877e6ca7574d110b23214ac3556b9d65784065ad3a85b4f74")