remove functions and attributes in Session and Daemon that are now part of components

-rename attributes in daemon to use components
This commit is contained in:
hackrush 2018-07-24 12:22:11 -04:00 committed by Jack Robison
parent 7e8ca842a2
commit 40d8e96811
No known key found for this signature in database
GPG key ID: DF25C68FE0239BB2
2 changed files with 125 additions and 478 deletions

View file

@ -1,11 +1,8 @@
import logging
import miniupnpc
from twisted.internet import threads, defer
from twisted.internet import defer
from lbrynet.core.BlobManager import DiskBlobManager
from lbrynet.dht import node, hashannouncer
from lbrynet.database.storage import SQLiteStorage
from lbrynet.core.RateLimiter import RateLimiter
from lbrynet.core.utils import generate_id
from lbrynet.core.PaymentRateManager import BasePaymentRateManager, OnlyFreePaymentsManager
log = logging.getLogger(__name__)
@ -32,11 +29,10 @@ class Session(object):
peers can connect to this peer.
"""
def __init__(self, blob_data_payment_rate, db_dir=None, node_id=None, peer_manager=None, dht_node_port=None,
def __init__(self, blob_data_payment_rate, db_dir=None, node_id=None, dht_node_port=None,
known_dht_nodes=None, peer_finder=None, hash_announcer=None, blob_dir=None, blob_manager=None,
peer_port=None, use_upnp=True, rate_limiter=None, wallet=None, dht_node_class=node.Node,
blob_tracker_class=None, payment_rate_manager_class=None, is_generous=True, external_ip=None,
storage=None):
peer_port=None, rate_limiter=None, wallet=None, external_ip=None, storage=None,
dht_node=None, peer_manager=None):
"""@param blob_data_payment_rate: The default payment rate for blob data
@param db_dir: The directory in which levelDB files should be stored
@ -78,10 +74,6 @@ class Session(object):
@param peer_port: The port on which other peers should connect
to this peer
@param use_upnp: Whether or not to try to open a hole in the
firewall so that outside peers can connect to this peer's
peer_port and dht_node_port
@param rate_limiter: An object which keeps track of the amount
of data transferred to and from this peer, and can limit
that rate if desired
@ -103,20 +95,14 @@ class Session(object):
self.known_dht_nodes = []
self.blob_dir = blob_dir
self.blob_manager = blob_manager
# self.blob_tracker = None
# self.blob_tracker_class = blob_tracker_class or BlobAvailabilityTracker
self.peer_port = peer_port
self.use_upnp = use_upnp
self.rate_limiter = rate_limiter
self.external_ip = external_ip
self.upnp_redirects = []
self.wallet = wallet
self.dht_node_class = dht_node_class
self.dht_node = None
self.dht_node = dht_node
self.base_payment_rate_manager = BasePaymentRateManager(blob_data_payment_rate)
self.payment_rate_manager = OnlyFreePaymentsManager()
# self.payment_rate_manager_class = payment_rate_manager_class or NegotiatedPaymentRateManager
# self.is_generous = is_generous
self.storage = storage or SQLiteStorage(self.db_dir)
def setup(self):
@ -124,15 +110,14 @@ class Session(object):
log.debug("Starting session.")
if self.node_id is None:
self.node_id = generate_id()
if self.dht_node is not None:
if self.peer_manager is None:
self.peer_manager = self.dht_node.peer_manager
if self.use_upnp is True:
d = self._try_upnp()
else:
d = defer.succeed(True)
d.addCallback(lambda _: self.storage.setup())
d.addCallback(lambda _: self._setup_dht())
if self.peer_finder is None:
self.peer_finder = self.dht_node.peer_finder
d = self.storage.setup()
d.addCallback(lambda _: self._setup_other_components())
return d
@ -140,97 +125,12 @@ class Session(object):
"""Stop all services"""
log.info('Stopping session.')
ds = []
if self.hash_announcer:
self.hash_announcer.stop()
# if self.blob_tracker is not None:
# ds.append(defer.maybeDeferred(self.blob_tracker.stop))
if self.dht_node is not None:
ds.append(defer.maybeDeferred(self.dht_node.stop))
if self.rate_limiter is not None:
ds.append(defer.maybeDeferred(self.rate_limiter.stop))
if self.wallet is not None:
ds.append(defer.maybeDeferred(self.wallet.stop))
if self.blob_manager is not None:
ds.append(defer.maybeDeferred(self.blob_manager.stop))
if self.use_upnp is True:
ds.append(defer.maybeDeferred(self._unset_upnp))
return defer.DeferredList(ds)
def _try_upnp(self):
log.debug("In _try_upnp")
def get_free_port(upnp, port, protocol):
# returns an existing mapping if it exists
mapping = upnp.getspecificportmapping(port, protocol)
if not mapping:
return port
if upnp.lanaddr == mapping[0]:
return mapping[1]
return get_free_port(upnp, port + 1, protocol)
def get_port_mapping(upnp, port, protocol, description):
# try to map to the requested port, if there is already a mapping use the next external
# port available
if protocol not in ['UDP', 'TCP']:
raise Exception("invalid protocol")
port = get_free_port(upnp, port, protocol)
if isinstance(port, tuple):
log.info("Found existing UPnP redirect %s:%i (%s) to %s:%i, using it",
self.external_ip, port, protocol, upnp.lanaddr, port)
return port
upnp.addportmapping(port, protocol, upnp.lanaddr, port,
description, '')
log.info("Set UPnP redirect %s:%i (%s) to %s:%i", self.external_ip, port,
protocol, upnp.lanaddr, port)
return port
def threaded_try_upnp():
if self.use_upnp is False:
log.debug("Not using upnp")
return False
u = miniupnpc.UPnP()
num_devices_found = u.discover()
if num_devices_found > 0:
u.selectigd()
external_ip = u.externalipaddress()
if external_ip != '0.0.0.0' and not self.external_ip:
# best not to rely on this external ip, the router can be behind layers of NATs
self.external_ip = external_ip
if self.peer_port:
self.peer_port = get_port_mapping(u, self.peer_port, 'TCP', 'LBRY peer port')
self.upnp_redirects.append((self.peer_port, 'TCP'))
if self.dht_node_port:
self.dht_node_port = get_port_mapping(u, self.dht_node_port, 'UDP', 'LBRY DHT port')
self.upnp_redirects.append((self.dht_node_port, 'UDP'))
return True
return False
def upnp_failed(err):
log.warning("UPnP failed. Reason: %s", err.getErrorMessage())
return False
d = threads.deferToThread(threaded_try_upnp)
d.addErrback(upnp_failed)
return d
def _setup_dht(self): # does not block startup, the dht will re-attempt if necessary
self.dht_node = self.dht_node_class(
node_id=self.node_id,
udpPort=self.dht_node_port,
externalIP=self.external_ip,
peerPort=self.peer_port,
peer_manager=self.peer_manager,
peer_finder=self.peer_finder,
)
if not self.hash_announcer:
self.hash_announcer = hashannouncer.DHTHashAnnouncer(self.dht_node, self.storage)
self.peer_manager = self.dht_node.peer_manager
self.peer_finder = self.dht_node.peer_finder
d = self.dht_node.start(self.known_dht_nodes)
d.addCallback(lambda _: log.info("Joined the dht"))
d.addCallback(lambda _: self.hash_announcer.start())
def _setup_other_components(self):
log.debug("Setting up the rest of the components")
@ -244,39 +144,6 @@ class Session(object):
else:
self.blob_manager = DiskBlobManager(self.blob_dir, self.storage, self.dht_node._dataStore)
# if self.blob_tracker is None:
# self.blob_tracker = self.blob_tracker_class(
# self.blob_manager, self.dht_node.peer_finder, self.dht_node
# )
# if self.payment_rate_manager is None:
# self.payment_rate_manager = self.payment_rate_manager_class(
# self.base_payment_rate_manager, self.blob_tracker, self.is_generous
# )
self.rate_limiter.start()
d = self.blob_manager.setup()
d.addCallback(lambda _: self.wallet.start())
# d.addCallback(lambda _: self.blob_tracker.start())
return d
def _unset_upnp(self):
log.info("Unsetting upnp for session")
def threaded_unset_upnp():
u = miniupnpc.UPnP()
num_devices_found = u.discover()
if num_devices_found > 0:
u.selectigd()
for port, protocol in self.upnp_redirects:
if u.getspecificportmapping(port, protocol) is None:
log.warning(
"UPnP redirect for %s %d was removed by something else.",
protocol, port)
else:
u.deleteportmapping(port, protocol)
log.info("Removed UPnP redirect for %s %d.", protocol, port)
self.upnp_redirects = []
d = threads.deferToThread(threaded_unset_upnp)
d.addErrback(lambda err: str(err))
return d

View file

@ -8,11 +8,10 @@ import urllib
import json
import textwrap
import signal
import six
from copy import deepcopy
from decimal import Decimal, InvalidOperation
from twisted.web import server
from twisted.internet import defer, threads, error, reactor
from twisted.internet import defer, reactor
from twisted.internet.task import LoopingCall
from twisted.python.failure import Failure
@ -25,28 +24,20 @@ from lbryschema.decode import smart_decode
# TODO: importing this when internet is disabled raises a socket.gaierror
from lbrynet.core.system_info import get_lbrynet_version
from lbrynet.database.storage import SQLiteStorage
from lbrynet import conf
from lbrynet.conf import LBRYCRD_WALLET, LBRYUM_WALLET
from lbrynet.reflector import reupload
from lbrynet.reflector import ServerFactory as reflector_server_factory
from lbrynet.core.log_support import configure_loggly_handler
from lbrynet.lbry_file.client.EncryptedFileDownloader import EncryptedFileSaverFactory
from lbrynet.lbry_file.client.EncryptedFileOptions import add_lbry_file_to_sd_identifier
from lbrynet.file_manager.EncryptedFileManager import EncryptedFileManager
from lbrynet.daemon.Component import ComponentManager
from lbrynet.daemon.Components import WALLET_COMPONENT, DATABASE_COMPONENT, SESSION_COMPONENT, DHT_COMPONENT
from lbrynet.daemon.Components import STREAM_IDENTIFIER_COMPONENT, FILE_MANAGER_COMPONENT
from lbrynet.daemon.Components import EXCHANGE_RATE_MANAGER_COMPONENT
from lbrynet.daemon.Downloader import GetStream
from lbrynet.daemon.Publisher import Publisher
from lbrynet.daemon.ExchangeRateManager import ExchangeRateManager
from lbrynet.daemon.auth.server import AuthJSONRPCServer
from lbrynet.core.PaymentRateManager import OnlyFreePaymentsManager
from lbrynet.core import utils, system_info
from lbrynet.core.StreamDescriptor import StreamDescriptorIdentifier, download_sd_blob
from lbrynet.core.StreamDescriptor import EncryptedFileStreamType
from lbrynet.core.Session import Session
from lbrynet.core.Wallet import LBRYumWallet
from lbrynet.core.StreamDescriptor import download_sd_blob
from lbrynet.core.looping_call_manager import LoopingCallManager
from lbrynet.core.server.BlobRequestHandler import BlobRequestHandlerFactory
from lbrynet.core.server.ServerProtocol import ServerProtocolFactory
from lbrynet.core.Error import InsufficientFundsError, UnknownNameError
from lbrynet.core.Error import DownloadDataTimeout, DownloadSDTimeout
from lbrynet.core.Error import NullFundsError, NegativeFundsError
@ -58,23 +49,6 @@ from lbrynet.core.client.StandaloneBlobDownloader import StandaloneBlobDownloade
log = logging.getLogger(__name__)
INITIALIZING_CODE = 'initializing'
LOADING_DB_CODE = 'loading_db'
LOADING_WALLET_CODE = 'loading_wallet'
LOADING_FILE_MANAGER_CODE = 'loading_file_manager'
LOADING_SERVER_CODE = 'loading_server'
STARTED_CODE = 'started'
WAITING_FOR_FIRST_RUN_CREDITS = 'waiting_for_credits'
WAITING_FOR_UNLOCK = 'waiting_for_wallet_unlock'
STARTUP_STAGES = [
(INITIALIZING_CODE, 'Initializing'),
(LOADING_DB_CODE, 'Loading databases'),
(LOADING_WALLET_CODE, 'Catching up with the blockchain'),
(LOADING_FILE_MANAGER_CODE, 'Setting up file manager'),
(LOADING_SERVER_CODE, 'Starting lbrynet'),
(STARTED_CODE, 'Started lbrynet'),
(WAITING_FOR_FIRST_RUN_CREDITS, 'Waiting for first run credits'),
(WAITING_FOR_UNLOCK, 'Waiting for user to unlock the wallet using the wallet_unlock command')
]
# TODO: make this consistent with the stages in Downloader.py
DOWNLOAD_METADATA_CODE = 'downloading_metadata'
@ -178,39 +152,20 @@ class Daemon(AuthJSONRPCServer):
LBRYnet daemon, a jsonrpc interface to lbry functions
"""
allowed_during_startup = [
'daemon_stop', 'status', 'version', 'wallet_unlock'
]
def __init__(self, analytics_manager):
def __init__(self, analytics_manager, component_manager=None):
AuthJSONRPCServer.__init__(self, conf.settings['use_auth_http'])
self.db_dir = conf.settings['data_dir']
self.download_directory = conf.settings['download_directory']
if conf.settings['BLOBFILES_DIR'] == "blobfiles":
self.blobfile_dir = os.path.join(self.db_dir, "blobfiles")
else:
log.info("Using non-default blobfiles directory: %s", conf.settings['BLOBFILES_DIR'])
self.blobfile_dir = conf.settings['BLOBFILES_DIR']
self.data_rate = conf.settings['data_rate']
self.max_key_fee = conf.settings['max_key_fee']
self.disable_max_key_fee = conf.settings['disable_max_key_fee']
self.download_timeout = conf.settings['download_timeout']
self.run_reflector_server = conf.settings['run_reflector_server']
self.wallet_type = conf.settings['wallet']
self.delete_blobs_on_remove = conf.settings['delete_blobs_on_remove']
self.peer_port = conf.settings['peer_port']
self.reflector_port = conf.settings['reflector_port']
self.dht_node_port = conf.settings['dht_node_port']
self.use_upnp = conf.settings['use_upnp']
self.auto_renew_claim_height_delta = conf.settings['auto_renew_claim_height_delta']
self.startup_status = STARTUP_STAGES[0]
self.connected_to_internet = True
self.connection_status_code = None
self.platform = None
self.current_db_revision = 9
self.db_revision_file = conf.settings.get_db_revision_filename()
self.session = None
self._session_id = conf.settings.get_session_id()
# TODO: this should probably be passed into the daemon, or
# possibly have the entire log upload functionality taken out
@ -219,20 +174,28 @@ class Daemon(AuthJSONRPCServer):
self.analytics_manager = analytics_manager
self.node_id = conf.settings.node_id
# components
self.storage = None
self.dht_node = None
self.wallet = None
self.sd_identifier = None
self.session = None
self.file_manager = None
self.exchange_rate_manager = None
self.wallet_user = None
self.wallet_password = None
self.query_handlers = {}
self.waiting_on = {}
self.streams = {}
self.exchange_rate_manager = ExchangeRateManager()
calls = {
Checker.INTERNET_CONNECTION: LoopingCall(CheckInternetConnection(self)),
Checker.CONNECTION_STATUS: LoopingCall(self._update_connection_status),
}
self.looping_call_manager = LoopingCallManager(calls)
self.sd_identifier = StreamDescriptorIdentifier()
self.lbry_file_manager = None
self.storage = None
self.component_manager = component_manager or ComponentManager(
analytics_manager=self.analytics_manager,
skip_components=conf.settings['components_to_skip']
)
@defer.inlineCallbacks
def setup(self):
@ -243,34 +206,21 @@ class Daemon(AuthJSONRPCServer):
self.looping_call_manager.start(Checker.INTERNET_CONNECTION, 3600)
self.looping_call_manager.start(Checker.CONNECTION_STATUS, 30)
self.exchange_rate_manager.start()
yield self._initial_setup()
yield self.component_manager.setup()
self.storage = self.component_manager.get_component("database").storage
yield self._get_session()
yield self._check_wallet_locked()
self.exchange_rate_manager = self.component_manager.get_component(EXCHANGE_RATE_MANAGER_COMPONENT)
self.storage = self.component_manager.get_component(DATABASE_COMPONENT)
self.session = self.component_manager.get_component(SESSION_COMPONENT)
self.wallet = self.component_manager.get_component(WALLET_COMPONENT)
self.dht_node = self.component_manager.get_component(DHT_COMPONENT)
yield self._start_analytics()
yield add_lbry_file_to_sd_identifier(self.sd_identifier)
yield self._setup_stream_identifier()
yield self._setup_lbry_file_manager()
yield self._setup_query_handlers()
yield self._setup_server()
log.info("Starting balance: " + str(self.session.wallet.get_balance()))
self.sd_identifier = self.component_manager.get_component(STREAM_IDENTIFIER_COMPONENT)
self.file_manager = self.component_manager.get_component(FILE_MANAGER_COMPONENT)
log.info("Starting balance: " + str(self.wallet.get_balance()))
self.announced_startup = True
self.startup_status = STARTUP_STAGES[5]
log.info("Started lbrynet-daemon")
# ###
# # this should be removed with the next db revision
# if migrated:
# missing_channel_claim_ids = yield self.storage.get_unknown_certificate_ids()
# while missing_channel_claim_ids: # in case there are a crazy amount lets batch to be safe
# batch = missing_channel_claim_ids[:100]
# _ = yield self.session.wallet.get_claims_by_ids(*batch)
# missing_channel_claim_ids = missing_channel_claim_ids[100:]
# ###
self._auto_renew()
def _get_platform(self):
@ -301,12 +251,12 @@ class Daemon(AuthJSONRPCServer):
# auto renew is turned off if 0 or some negative number
if self.auto_renew_claim_height_delta < 1:
defer.returnValue(None)
if not self.session.wallet.network.get_remote_height():
if not self.wallet.network.get_remote_height():
log.warning("Failed to get remote height, aborting auto renew")
defer.returnValue(None)
log.debug("Renewing claim")
h = self.session.wallet.network.get_remote_height() + self.auto_renew_claim_height_delta
results = yield self.session.wallet.claim_renew_all_before_expiration(h)
h = self.wallet.network.get_remote_height() + self.auto_renew_claim_height_delta
results = yield self.wallet.claim_renew_all_before_expiration(h)
for outpoint, result in results.iteritems():
if result['success']:
log.info("Renewed claim at outpoint:%s claim ID:%s, paid fee:%s",
@ -315,93 +265,6 @@ class Daemon(AuthJSONRPCServer):
log.info("Failed to renew claim at outpoint:%s, reason:%s",
outpoint, result['reason'])
def _start_server(self):
if self.peer_port is not None:
server_factory = ServerProtocolFactory(self.session.rate_limiter,
self.query_handlers,
self.session.peer_manager)
try:
log.info("Peer protocol listening on TCP %d", self.peer_port)
self.lbry_server_port = reactor.listenTCP(self.peer_port, server_factory)
except error.CannotListenError as e:
import traceback
log.error("Couldn't bind to port %d. Visit lbry.io/faq/how-to-change-port for"
" more details.", self.peer_port)
log.error("%s", traceback.format_exc())
raise ValueError("%s lbrynet may already be running on your computer." % str(e))
return defer.succeed(True)
def _start_reflector(self):
if self.run_reflector_server:
log.info("Starting reflector server")
if self.reflector_port is not None:
reflector_factory = reflector_server_factory(
self.session.peer_manager,
self.session.blob_manager,
self.lbry_file_manager
)
try:
self.reflector_server_port = reactor.listenTCP(self.reflector_port,
reflector_factory)
log.info('Started reflector on port %s', self.reflector_port)
except error.CannotListenError as e:
log.exception("Couldn't bind reflector to port %d", self.reflector_port)
raise ValueError(
"{} lbrynet may already be running on your computer.".format(e))
return defer.succeed(True)
def _stop_reflector(self):
if self.run_reflector_server:
log.info("Stopping reflector server")
try:
if self.reflector_server_port is not None:
self.reflector_server_port, p = None, self.reflector_server_port
return defer.maybeDeferred(p.stopListening)
except AttributeError:
return defer.succeed(True)
return defer.succeed(True)
def _stop_file_manager(self):
if self.lbry_file_manager:
self.lbry_file_manager.stop()
return defer.succeed(True)
def _stop_server(self):
try:
if self.lbry_server_port is not None:
self.lbry_server_port, old_port = None, self.lbry_server_port
log.info('Stop listening on port %s', old_port.port)
return defer.maybeDeferred(old_port.stopListening)
else:
return defer.succeed(True)
except AttributeError:
return defer.succeed(True)
def _setup_server(self):
self.startup_status = STARTUP_STAGES[4]
d = self._start_server()
d.addCallback(lambda _: self._start_reflector())
return d
def _setup_query_handlers(self):
handlers = [
BlobRequestHandlerFactory(
self.session.blob_manager,
self.session.wallet,
self.session.payment_rate_manager,
self.analytics_manager
),
self.session.wallet.get_wallet_info_query_handler_factory(),
]
return self._add_query_handlers(handlers)
def _add_query_handlers(self, query_handlers):
for handler in query_handlers:
query_id = handler.get_primary_query_identifier()
self.query_handlers[query_id] = handler
return defer.succeed(None)
@staticmethod
def _already_shutting_down(sig_num, frame):
log.info("Already shutting down")
@ -417,21 +280,14 @@ class Daemon(AuthJSONRPCServer):
signal.signal(signal.SIGTERM, self._already_shutting_down)
log.info("Closing lbrynet session")
log.info("Status at time of shutdown: " + self.startup_status[0])
self._stop_streams()
self.looping_call_manager.shutdown()
if self.analytics_manager:
self.analytics_manager.shutdown()
d = self._stop_server()
d.addErrback(log.fail(), 'Failure while shutting down')
d.addCallback(lambda _: self._stop_reflector())
d.addErrback(log.fail(), 'Failure while shutting down')
d.addCallback(lambda _: self._stop_file_manager())
d.addErrback(log.fail(), 'Failure while shutting down')
if self.session is not None:
d.addCallback(lambda _: self.session.shut_down())
if self.component_manager is not None:
d = self.component_manager.stop()
d.addErrback(log.fail(), 'Failure while shutting down')
return d
@ -476,88 +332,10 @@ class Daemon(AuthJSONRPCServer):
return defer.succeed(True)
@defer.inlineCallbacks
def _setup_lbry_file_manager(self):
log.info('Starting the file manager')
self.startup_status = STARTUP_STAGES[3]
self.lbry_file_manager = EncryptedFileManager(self.session, self.sd_identifier)
yield self.lbry_file_manager.setup()
log.info('Done setting up file manager')
def _start_analytics(self):
if not self.analytics_manager.is_started:
self.analytics_manager.start()
def _get_session(self):
def get_wallet():
if self.wallet_type == LBRYCRD_WALLET:
raise ValueError('LBRYcrd Wallet is no longer supported')
elif self.wallet_type == LBRYUM_WALLET:
log.info("Using lbryum wallet")
lbryum_servers = {address: {'t': str(port)}
for address, port in conf.settings['lbryum_servers']}
config = {
'auto_connect': True,
'chain': conf.settings['blockchain_name'],
'default_servers': lbryum_servers
}
if 'use_keyring' in conf.settings:
config['use_keyring'] = conf.settings['use_keyring']
if conf.settings['lbryum_wallet_dir']:
config['lbryum_path'] = conf.settings['lbryum_wallet_dir']
wallet = LBRYumWallet(self.storage, config)
return defer.succeed(wallet)
else:
raise ValueError('Wallet Type {} is not valid'.format(self.wallet_type))
d = get_wallet()
def create_session(wallet):
self.session = Session(
conf.settings['data_rate'],
db_dir=self.db_dir,
node_id=self.node_id,
blob_dir=self.blobfile_dir,
dht_node_port=self.dht_node_port,
known_dht_nodes=conf.settings['known_dht_nodes'],
peer_port=self.peer_port,
use_upnp=self.use_upnp,
wallet=wallet,
is_generous=conf.settings['is_generous_host'],
external_ip=self.platform['ip'],
storage=self.storage
)
self.startup_status = STARTUP_STAGES[2]
d.addCallback(create_session)
d.addCallback(lambda _: self.session.setup())
return d
@defer.inlineCallbacks
def _check_wallet_locked(self):
wallet = self.session.wallet
if wallet.wallet.use_encryption:
self.startup_status = STARTUP_STAGES[7]
yield wallet.check_locked()
def _setup_stream_identifier(self):
file_saver_factory = EncryptedFileSaverFactory(
self.session.peer_finder,
self.session.rate_limiter,
self.session.blob_manager,
self.session.storage,
self.session.wallet,
self.download_directory
)
self.sd_identifier.add_stream_downloader_factory(EncryptedFileStreamType,
file_saver_factory)
return defer.succeed(None)
def _download_blob(self, blob_hash, rate_manager=None, timeout=None):
"""
Download a blob
@ -575,7 +353,7 @@ class Daemon(AuthJSONRPCServer):
timeout = timeout or 30
downloader = StandaloneBlobDownloader(
blob_hash, self.session.blob_manager, self.session.peer_finder, self.session.rate_limiter,
rate_manager, self.session.wallet, timeout
rate_manager, self.wallet, timeout
)
return downloader.download()
@ -583,7 +361,7 @@ class Daemon(AuthJSONRPCServer):
def _get_stream_analytics_report(self, claim_dict):
sd_hash = claim_dict.source_hash
try:
stream_hash = yield self.session.storage.get_stream_hash_for_sd_hash(sd_hash)
stream_hash = yield self.storage.get_stream_hash_for_sd_hash(sd_hash)
except Exception:
stream_hash = None
report = {
@ -597,7 +375,7 @@ class Daemon(AuthJSONRPCServer):
sd_host = None
report["sd_blob"] = sd_host
if stream_hash:
blob_infos = yield self.session.storage.get_blobs_for_stream(stream_hash)
blob_infos = yield self.storage.get_blobs_for_stream(stream_hash)
report["known_blobs"] = len(blob_infos)
else:
blob_infos = []
@ -668,11 +446,12 @@ class Daemon(AuthJSONRPCServer):
def _publish_stream(self, name, bid, claim_dict, file_path=None, certificate_id=None,
claim_address=None, change_address=None):
publisher = Publisher(self.session, self.lbry_file_manager, self.session.wallet,
publisher = Publisher(self.session, self.file_manager, self.wallet,
certificate_id)
parse_lbry_uri(name)
if not file_path:
stream_hash = yield self.storage.get_stream_hash_for_sd_hash(claim_dict['stream']['source']['source'])
stream_hash = yield self.storage.get_stream_hash_for_sd_hash(
claim_dict['stream']['source']['source'])
claim_out = yield publisher.publish_stream(name, bid, claim_dict, stream_hash, claim_address,
change_address)
else:
@ -697,7 +476,7 @@ class Daemon(AuthJSONRPCServer):
"""
parsed = parse_lbry_uri(name)
resolution = yield self.session.wallet.resolve(parsed.name, check_cache=not force_refresh)
resolution = yield self.wallet.resolve(parsed.name, check_cache=not force_refresh)
if parsed.name in resolution:
result = resolution[parsed.name]
defer.returnValue(result)
@ -752,7 +531,7 @@ class Daemon(AuthJSONRPCServer):
cost = self._get_est_cost_from_stream_size(size)
resolved = yield self.session.wallet.resolve(uri)
resolved = yield self.wallet.resolve(uri)
if uri in resolved and 'claim' in resolved[uri]:
claim = ClaimDict.load_dict(resolved[uri]['claim']['value'])
@ -799,7 +578,7 @@ class Daemon(AuthJSONRPCServer):
Resolve a name and return the estimated stream cost
"""
resolved = yield self.session.wallet.resolve(uri)
resolved = yield self.wallet.resolve(uri)
if resolved:
claim_response = resolved[uri]
else:
@ -879,7 +658,7 @@ class Daemon(AuthJSONRPCServer):
def _get_lbry_file(self, search_by, val, return_json=False, full_status=False):
lbry_file = None
if search_by in FileID:
for l_f in self.lbry_file_manager.lbry_files:
for l_f in self.file_manager.lbry_files:
if l_f.__dict__.get(search_by) == val:
lbry_file = l_f
break
@ -891,7 +670,7 @@ class Daemon(AuthJSONRPCServer):
@defer.inlineCallbacks
def _get_lbry_files(self, return_json=False, full_status=True, **kwargs):
lbry_files = list(self.lbry_file_manager.lbry_files)
lbry_files = list(self.file_manager.lbry_files)
if kwargs:
for search_type, value in iter_lbry_file_search_values(kwargs):
lbry_files = [l_f for l_f in lbry_files if l_f.__dict__[search_type] == value]
@ -928,7 +707,7 @@ class Daemon(AuthJSONRPCServer):
def _get_single_peer_downloader(self):
downloader = SinglePeerDownloader()
downloader.setup(self.session.wallet)
downloader.setup(self.wallet)
return downloader
@defer.inlineCallbacks
@ -1060,7 +839,7 @@ class Daemon(AuthJSONRPCServer):
should_announce_blobs = yield self.session.blob_manager.count_should_announce_blobs()
response['session_status'] = {
'managed_blobs': len(blobs),
'managed_streams': len(self.lbry_file_manager.lbry_files),
'managed_streams': len(self.file_manager.lbry_files),
'announce_queue_size': announce_queue_size,
'should_announce_blobs': should_announce_blobs,
}
@ -1255,10 +1034,10 @@ class Daemon(AuthJSONRPCServer):
(float) amount of lbry credits in wallet
"""
if address is None:
return self._render_response(float(self.session.wallet.get_balance()))
return self._render_response(float(self.wallet.get_balance()))
else:
return self._render_response(float(
self.session.wallet.get_address_balance(address, include_unconfirmed)))
self.wallet.get_address_balance(address, include_unconfirmed)))
@defer.inlineCallbacks
def jsonrpc_wallet_unlock(self, password):
@ -1275,9 +1054,10 @@ class Daemon(AuthJSONRPCServer):
(bool) true if wallet is unlocked, otherwise false
"""
cmd_runner = self.session.wallet.get_cmd_runner()
if cmd_runner.locked:
d = self.session.wallet.wallet_unlocked_d
# the check_locked() in the if statement is needed because that is what sets
# the wallet_unlocked_d deferred ¯\_(ツ)_/¯
if not self.wallet.check_locked():
d = self.wallet.wallet_unlocked_d
d.callback(password)
result = yield d
else:
@ -1300,7 +1080,7 @@ class Daemon(AuthJSONRPCServer):
(bool) true if wallet is decrypted, otherwise false
"""
result = self.session.wallet.decrypt_wallet()
result = self.wallet.decrypt_wallet()
response = yield self._render_response(result)
defer.returnValue(response)
@ -1320,8 +1100,8 @@ class Daemon(AuthJSONRPCServer):
(bool) true if wallet is decrypted, otherwise false
"""
self.session.wallet.encrypt_wallet(new_password)
response = yield self._render_response(self.session.wallet.wallet.use_encryption)
self.wallet.encrypt_wallet(new_password)
response = yield self._render_response(self.wallet.wallet.use_encryption)
defer.returnValue(response)
@defer.inlineCallbacks
@ -1477,9 +1257,9 @@ class Daemon(AuthJSONRPCServer):
"""
if claim_id is not None and txid is None and nout is None:
claim_results = yield self.session.wallet.get_claim_by_claim_id(claim_id)
claim_results = yield self.wallet.get_claim_by_claim_id(claim_id)
elif txid is not None and nout is not None and claim_id is None:
claim_results = yield self.session.wallet.get_claim_by_outpoint(txid, int(nout))
claim_results = yield self.wallet.get_claim_by_outpoint(txid, int(nout))
else:
raise Exception("Must specify either txid/nout, or claim_id")
response = yield self._render_response(claim_results)
@ -1568,7 +1348,7 @@ class Daemon(AuthJSONRPCServer):
except URIParseError:
results[u] = {"error": "%s is not a valid uri" % u}
resolved = yield self.session.wallet.resolve(*valid_uris, check_cache=not force)
resolved = yield self.wallet.resolve(*valid_uris, check_cache=not force)
for resolved_uri in resolved:
results[resolved_uri] = resolved[resolved_uri]
@ -1626,7 +1406,7 @@ class Daemon(AuthJSONRPCServer):
if parsed_uri.is_channel and not parsed_uri.path:
raise Exception("cannot download a channel claim, specify a /path")
resolved_result = yield self.session.wallet.resolve(uri)
resolved_result = yield self.wallet.resolve(uri)
if resolved_result and uri in resolved_result:
resolved = resolved_result[uri]
else:
@ -1693,7 +1473,7 @@ class Daemon(AuthJSONRPCServer):
raise Exception('Unable to find a file for {}:{}'.format(search_type, value))
if status == 'start' and lbry_file.stopped or status == 'stop' and not lbry_file.stopped:
yield self.lbry_file_manager.toggle_lbry_file_running(lbry_file)
yield self.file_manager.toggle_lbry_file_running(lbry_file)
msg = "Started downloading file" if status == 'start' else "Stopped downloading file"
else:
msg = (
@ -1755,8 +1535,8 @@ class Daemon(AuthJSONRPCServer):
file_name, stream_hash = lbry_file.file_name, lbry_file.stream_hash
if lbry_file.sd_hash in self.streams:
del self.streams[lbry_file.sd_hash]
yield self.lbry_file_manager.delete_lbry_file(lbry_file,
delete_file=delete_from_download_dir)
yield self.file_manager.delete_lbry_file(lbry_file,
delete_file=delete_from_download_dir)
log.info("Deleted file: %s", file_name)
result = True
@ -1818,14 +1598,14 @@ class Daemon(AuthJSONRPCServer):
if amount <= 0:
raise Exception("Invalid amount")
yield self.session.wallet.update_balance()
if amount >= self.session.wallet.get_balance():
balance = yield self.session.wallet.get_max_usable_balance_for_claim(channel_name)
yield self.wallet.update_balance()
if amount >= self.wallet.get_balance():
balance = yield self.wallet.get_max_usable_balance_for_claim(channel_name)
max_bid_amount = balance - MAX_UPDATE_FEE_ESTIMATE
if balance <= MAX_UPDATE_FEE_ESTIMATE:
raise InsufficientFundsError(
"Insufficient funds, please deposit additional LBC. Minimum additional LBC needed {}"
. format(MAX_UPDATE_FEE_ESTIMATE - balance))
.format(MAX_UPDATE_FEE_ESTIMATE - balance))
elif amount > max_bid_amount:
raise InsufficientFundsError(
"Please wait for any pending bids to resolve or lower the bid value. "
@ -1833,7 +1613,7 @@ class Daemon(AuthJSONRPCServer):
.format(max_bid_amount)
)
result = yield self.session.wallet.claim_new_channel(channel_name, amount)
result = yield self.wallet.claim_new_channel(channel_name, amount)
self.analytics_manager.send_new_channel()
log.info("Claimed a new channel! Result: %s", result)
response = yield self._render_response(result)
@ -1855,7 +1635,7 @@ class Daemon(AuthJSONRPCServer):
is in the wallet.
"""
result = yield self.session.wallet.channel_list()
result = yield self.wallet.channel_list()
response = yield self._render_response(result)
defer.returnValue(response)
@ -1891,7 +1671,7 @@ class Daemon(AuthJSONRPCServer):
(str) Serialized certificate information
"""
result = yield self.session.wallet.export_certificate_info(claim_id)
result = yield self.wallet.export_certificate_info(claim_id)
defer.returnValue(result)
@defer.inlineCallbacks
@ -1909,7 +1689,7 @@ class Daemon(AuthJSONRPCServer):
(dict) Result dictionary
"""
result = yield self.session.wallet.import_certificate_info(serialized_certificate_info)
result = yield self.wallet.import_certificate_info(serialized_certificate_info)
defer.returnValue(result)
@defer.inlineCallbacks
@ -2003,9 +1783,9 @@ class Daemon(AuthJSONRPCServer):
if bid <= 0.0:
raise ValueError("Bid value must be greater than 0.0")
yield self.session.wallet.update_balance()
if bid >= self.session.wallet.get_balance():
balance = yield self.session.wallet.get_max_usable_balance_for_claim(name)
yield self.wallet.update_balance()
if bid >= self.wallet.get_balance():
balance = yield self.wallet.get_max_usable_balance_for_claim(name)
max_bid_amount = balance - MAX_UPDATE_FEE_ESTIMATE
if balance <= MAX_UPDATE_FEE_ESTIMATE:
raise InsufficientFundsError(
@ -2052,7 +1832,7 @@ class Daemon(AuthJSONRPCServer):
log.warning("Stripping empty fee from published metadata")
del metadata['fee']
elif 'address' not in metadata['fee']:
address = yield self.session.wallet.get_least_used_address()
address = yield self.wallet.get_least_used_address()
metadata['fee']['address'] = address
if 'fee' in metadata and 'version' not in metadata['fee']:
metadata['fee']['version'] = '_0_0_1'
@ -2108,7 +1888,7 @@ class Daemon(AuthJSONRPCServer):
certificate_id = channel_id
elif channel_name:
certificate_id = None
my_certificates = yield self.session.wallet.channel_list()
my_certificates = yield self.wallet.channel_list()
for certificate in my_certificates:
if channel_name == certificate['name']:
certificate_id = certificate['claim_id']
@ -2151,7 +1931,7 @@ class Daemon(AuthJSONRPCServer):
if nout is None and txid is not None:
raise Exception('Must specify nout')
result = yield self.session.wallet.abandon_claim(claim_id, txid, nout)
result = yield self.wallet.abandon_claim(claim_id, txid, nout)
self.analytics_manager.send_claim_action('abandon')
defer.returnValue(result)
@ -2178,7 +1958,7 @@ class Daemon(AuthJSONRPCServer):
}
"""
result = yield self.session.wallet.support_claim(name, claim_id, amount)
result = yield self.wallet.support_claim(name, claim_id, amount)
self.analytics_manager.send_claim_action('new_support')
defer.returnValue(result)
@ -2217,11 +1997,11 @@ class Daemon(AuthJSONRPCServer):
nout = int(nout)
else:
raise Exception("invalid outpoint")
result = yield self.session.wallet.claim_renew(txid, nout)
result = yield self.wallet.claim_renew(txid, nout)
result = {outpoint: result}
else:
height = int(height)
result = yield self.session.wallet.claim_renew_all_before_expiration(height)
result = yield self.wallet.claim_renew_all_before_expiration(height)
defer.returnValue(result)
@defer.inlineCallbacks
@ -2251,7 +2031,7 @@ class Daemon(AuthJSONRPCServer):
}
"""
result = yield self.session.wallet.send_claim_to_address(claim_id, address, amount)
result = yield self.wallet.send_claim_to_address(claim_id, address, amount)
response = yield self._render_response(result)
defer.returnValue(response)
@ -2289,7 +2069,7 @@ class Daemon(AuthJSONRPCServer):
]
"""
d = self.session.wallet.get_name_claims()
d = self.wallet.get_name_claims()
d.addCallback(lambda claims: self._render_response(claims))
return d
@ -2327,7 +2107,7 @@ class Daemon(AuthJSONRPCServer):
}
"""
claims = yield self.session.wallet.get_claims_for_name(name) # type: dict
claims = yield self.wallet.get_claims_for_name(name) # type: dict
sort_claim_results(claims['claims'])
defer.returnValue(claims)
@ -2404,8 +2184,8 @@ class Daemon(AuthJSONRPCServer):
except URIParseError:
results[chan_uri] = {"error": "%s is not a valid uri" % chan_uri}
resolved = yield self.session.wallet.resolve(*valid_uris, check_cache=False, page=page,
page_size=page_size)
resolved = yield self.wallet.resolve(*valid_uris, check_cache=False, page=page,
page_size=page_size)
for u in resolved:
if 'error' in resolved[u]:
results[u] = resolved[u]
@ -2477,7 +2257,7 @@ class Daemon(AuthJSONRPCServer):
"""
d = self.session.wallet.get_history()
d = self.wallet.get_history()
d.addCallback(lambda r: self._render_response(r))
return d
@ -2495,7 +2275,7 @@ class Daemon(AuthJSONRPCServer):
(dict) JSON formatted transaction
"""
d = self.session.wallet.get_transaction(txid)
d = self.wallet.get_transaction(txid)
d.addCallback(lambda r: self._render_response(r))
return d
@ -2513,7 +2293,7 @@ class Daemon(AuthJSONRPCServer):
(bool) true, if address is associated with current wallet
"""
d = self.session.wallet.address_is_mine(address)
d = self.wallet.address_is_mine(address)
d.addCallback(lambda is_mine: self._render_response(is_mine))
return d
@ -2532,7 +2312,7 @@ class Daemon(AuthJSONRPCServer):
Could contain more than one public key if multisig.
"""
d = self.session.wallet.get_pub_keys(address)
d = self.wallet.get_pub_keys(address)
d.addCallback(lambda r: self._render_response(r))
return d
@ -2551,7 +2331,7 @@ class Daemon(AuthJSONRPCServer):
List of wallet addresses
"""
addresses = yield self.session.wallet.list_addresses()
addresses = yield self.wallet.list_addresses()
response = yield self._render_response(addresses)
defer.returnValue(response)
@ -2573,7 +2353,7 @@ class Daemon(AuthJSONRPCServer):
log.info("Got new wallet address: " + address)
return defer.succeed(address)
d = self.session.wallet.get_new_address()
d = self.wallet.get_new_address()
d.addCallback(_disp)
d.addCallback(lambda address: self._render_response(address))
return d
@ -2597,7 +2377,7 @@ class Daemon(AuthJSONRPCServer):
log.info("Got unused wallet address: " + address)
return defer.succeed(address)
d = self.session.wallet.get_unused_address()
d = self.wallet.get_unused_address()
d.addCallback(_disp)
d.addCallback(lambda address: self._render_response(address))
return d
@ -2624,10 +2404,10 @@ class Daemon(AuthJSONRPCServer):
elif not amount:
raise NullFundsError()
reserved_points = self.session.wallet.reserve_points(address, amount)
reserved_points = self.wallet.reserve_points(address, amount)
if reserved_points is None:
raise InsufficientFundsError()
yield self.session.wallet.send_points_to_address(reserved_points, amount)
yield self.wallet.send_points_to_address(reserved_points, amount)
self.analytics_manager.send_credits_sent()
defer.returnValue(True)
@ -2675,7 +2455,7 @@ class Daemon(AuthJSONRPCServer):
result = yield self.jsonrpc_send_amount_to_address(amount, address)
else:
validate_claim_id(claim_id)
result = yield self.session.wallet.tip_claim(claim_id, amount)
result = yield self.wallet.tip_claim(claim_id, amount)
self.analytics_manager.send_claim_action('new_support')
defer.returnValue(result)
@ -2704,7 +2484,7 @@ class Daemon(AuthJSONRPCServer):
raise NullFundsError()
broadcast = not no_broadcast
tx = yield self.session.wallet.create_addresses_with_balance(
tx = yield self.wallet.create_addresses_with_balance(
num_addresses, amount, broadcast=broadcast)
tx['broadcast'] = broadcast
defer.returnValue(tx)
@ -2738,7 +2518,7 @@ class Daemon(AuthJSONRPCServer):
]
"""
unspent = yield self.session.wallet.list_unspent()
unspent = yield self.wallet.list_unspent()
for i, utxo in enumerate(unspent):
utxo['txid'] = utxo.pop('prevout_hash')
utxo['nout'] = utxo.pop('prevout_n')
@ -2764,10 +2544,10 @@ class Daemon(AuthJSONRPCServer):
"""
if blockhash is not None:
d = self.session.wallet.get_block(blockhash)
d = self.wallet.get_block(blockhash)
elif height is not None:
d = self.session.wallet.get_block_info(height)
d.addCallback(lambda b: self.session.wallet.get_block(b))
d = self.wallet.get_block_info(height)
d.addCallback(lambda b: self.wallet.get_block(b))
else:
# TODO: return a useful error message
return server.failure
@ -2837,8 +2617,8 @@ class Daemon(AuthJSONRPCServer):
response = yield self._render_response("Don't have that blob")
defer.returnValue(response)
try:
stream_hash = yield self.session.storage.get_stream_hash_for_sd_hash(blob_hash)
yield self.session.storage.delete_stream(stream_hash)
stream_hash = yield self.storage.get_stream_hash_for_sd_hash(blob_hash)
yield self.storage.delete_stream(stream_hash)
except Exception as err:
pass
yield self.session.blob_manager.delete_blobs([blob_hash])
@ -2864,7 +2644,7 @@ class Daemon(AuthJSONRPCServer):
if not utils.is_valid_blobhash(blob_hash):
raise Exception("invalid blob hash")
finished_deferred = self.session.dht_node.iterativeFindValue(binascii.unhexlify(blob_hash))
finished_deferred = self.dht_node.iterativeFindValue(binascii.unhexlify(blob_hash))
def trap_timeout(err):
err.trap(defer.TimeoutError)
@ -2983,14 +2763,14 @@ class Daemon(AuthJSONRPCServer):
if uri:
metadata = yield self._resolve_name(uri)
sd_hash = utils.get_sd_hash(metadata)
stream_hash = yield self.session.storage.get_stream_hash_for_sd_hash(sd_hash)
stream_hash = yield self.storage.get_stream_hash_for_sd_hash(sd_hash)
elif stream_hash:
sd_hash = yield self.session.storage.get_sd_blob_hash_for_stream(stream_hash)
sd_hash = yield self.storage.get_sd_blob_hash_for_stream(stream_hash)
elif sd_hash:
stream_hash = yield self.session.storage.get_stream_hash_for_sd_hash(sd_hash)
sd_hash = yield self.session.storage.get_sd_blob_hash_for_stream(stream_hash)
stream_hash = yield self.storage.get_stream_hash_for_sd_hash(sd_hash)
sd_hash = yield self.storage.get_sd_blob_hash_for_stream(stream_hash)
if stream_hash:
crypt_blobs = yield self.session.storage.get_blobs_for_stream(stream_hash)
crypt_blobs = yield self.storage.get_blobs_for_stream(stream_hash)
blobs = yield defer.gatherResults([
self.session.blob_manager.get_blob(crypt_blob.blob_hash, crypt_blob.length)
for crypt_blob in crypt_blobs if crypt_blob.blob_hash is not None
@ -3071,7 +2851,7 @@ class Daemon(AuthJSONRPCServer):
contact = None
try:
contact = yield self.session.dht_node.findContact(node_id.decode('hex'))
contact = yield self.dht_node.findContact(node_id.decode('hex'))
except TimeoutError:
result = {'error': 'timeout finding peer'}
defer.returnValue(result)
@ -3113,7 +2893,7 @@ class Daemon(AuthJSONRPCServer):
"""
result = {}
data_store = self.session.dht_node._dataStore._dict
data_store = self.dht_node._dataStore._dict
datastore_len = len(data_store)
hosts = {}
@ -3131,8 +2911,8 @@ class Daemon(AuthJSONRPCServer):
blob_hashes = []
result['buckets'] = {}
for i in range(len(self.session.dht_node._routingTable._buckets)):
for contact in self.session.dht_node._routingTable._buckets[i]._contacts:
for i in range(len(self.dht_node._routingTable._buckets)):
for contact in self.dht_node._routingTable._buckets[i]._contacts:
contacts = result['buckets'].get(i, [])
if contact in hosts:
blobs = hosts[contact]
@ -3155,7 +2935,7 @@ class Daemon(AuthJSONRPCServer):
result['contacts'] = contact_set
result['blob_hashes'] = blob_hashes
result['node_id'] = self.session.dht_node.node_id.encode('hex')
result['node_id'] = self.dht_node.node_id.encode('hex')
return self._render_response(result)
def jsonrpc_blob_availability(self, blob_hash, search_timeout=None, blob_timeout=None):
@ -3254,7 +3034,7 @@ class Daemon(AuthJSONRPCServer):
}
try:
resolved_result = yield self.session.wallet.resolve(uri)
resolved_result = yield self.wallet.resolve(uri)
response['did_resolve'] = True
except UnknownNameError:
response['error'] = "Failed to resolve name"