diff --git a/lbrynet/core/Session.py b/lbrynet/core/Session.py deleted file mode 100644 index d3a7c758d..000000000 --- a/lbrynet/core/Session.py +++ /dev/null @@ -1,150 +0,0 @@ -import logging -from twisted.internet import defer -from lbrynet.core.BlobManager import DiskBlobManager -from lbrynet.database.storage import SQLiteStorage -from lbrynet.core.RateLimiter import RateLimiter -from lbrynet.core.PaymentRateManager import BasePaymentRateManager, OnlyFreePaymentsManager - -log = logging.getLogger(__name__) - - -class Session(object): - """This class manages all important services common to any application that uses the network. - - the hash announcer, which informs other peers that this peer is - associated with some hash. Usually, this means this peer has a - blob identified by the hash in question, but it can be used for - other purposes. - - the peer finder, which finds peers that are associated with some - hash. - - the blob manager, which keeps track of which blobs have been - downloaded and provides access to them, - - the rate limiter, which attempts to ensure download and upload - rates stay below a set maximum - - upnp, which opens holes in compatible firewalls so that remote - peers can connect to this peer. - """ - - def __init__(self, blob_data_payment_rate, db_dir=None, node_id=None, dht_node_port=None, - known_dht_nodes=None, peer_finder=None, hash_announcer=None, blob_dir=None, blob_manager=None, - peer_port=None, rate_limiter=None, wallet=None, external_ip=None, storage=None, - dht_node=None, peer_manager=None, download_mirrors=None): - """@param blob_data_payment_rate: The default payment rate for blob data - - @param db_dir: The directory in which levelDB files should be stored - - @param node_id: The unique ID of this node - - @param peer_manager: An object which keeps track of all known - peers. If None, a PeerManager will be created - - @param dht_node_port: The port on which the dht node should - listen for incoming connections - - @param known_dht_nodes: A list of nodes which the dht node - should use to bootstrap into the dht - - @param peer_finder: An object which is used to look up peers - that are associated with some hash. If None, a - DHTPeerFinder will be used, which looks for peers in the - distributed hash table. - - @param hash_announcer: An object which announces to other - peers that this peer is associated with some hash. If - None, and peer_port is not None, a DHTHashAnnouncer will - be used. If None and peer_port is None, a - DummyHashAnnouncer will be used, which will not actually - announce anything. - - @param blob_dir: The directory in which blobs will be - stored. If None and blob_manager is None, blobs will be - stored in memory only. - - @param blob_manager: An object which keeps track of downloaded - blobs and provides access to them. If None, and blob_dir - is not None, a DiskBlobManager will be used, with the - given blob_dir. If None and blob_dir is None, a - TempBlobManager will be used, which stores blobs in memory - only. - - @param peer_port: The port on which other peers should connect - to this peer - - @param rate_limiter: An object which keeps track of the amount - of data transferred to and from this peer, and can limit - that rate if desired - - @param wallet: An object which will be used to keep track of - expected payments and which will pay peers. If None, a - wallet which uses the Point Trader system will be used, - which is meant for testing only - - """ - self.db_dir = db_dir - self.node_id = node_id - self.peer_manager = peer_manager - self.peer_finder = peer_finder - self.hash_announcer = hash_announcer - self.dht_node_port = dht_node_port - self.known_dht_nodes = known_dht_nodes - if self.known_dht_nodes is None: - self.known_dht_nodes = [] - self.blob_dir = blob_dir - self.blob_manager = blob_manager - self.peer_port = peer_port - self.rate_limiter = rate_limiter - self.external_ip = external_ip - self.upnp_redirects = [] - self.wallet = wallet - self.dht_node = dht_node - self.base_payment_rate_manager = BasePaymentRateManager(blob_data_payment_rate) - self.payment_rate_manager = OnlyFreePaymentsManager() - self.storage = storage or SQLiteStorage(self.db_dir) - self.download_mirrors = download_mirrors - - def setup(self): - """Create the blob directory and database if necessary, start all desired services""" - - log.debug("Starting session.") - - if self.dht_node is not None: - if self.peer_manager is None: - self.peer_manager = self.dht_node.peer_manager - - if self.peer_finder is None: - self.peer_finder = self.dht_node.peer_finder - - d = self.storage.setup() - d.addCallback(lambda _: self._setup_other_components()) - return d - - def shut_down(self): - """Stop all services""" - log.info('Stopping session.') - ds = [] - if self.rate_limiter is not None: - ds.append(defer.maybeDeferred(self.rate_limiter.stop)) - if self.blob_manager is not None: - ds.append(defer.maybeDeferred(self.blob_manager.stop)) - return defer.DeferredList(ds) - - def _setup_other_components(self): - log.debug("Setting up the rest of the components") - - if self.rate_limiter is None: - self.rate_limiter = RateLimiter() - - if self.blob_manager is None: - if self.blob_dir is None: - raise Exception( - "TempBlobManager is no longer supported, specify BlobManager or db_dir") - else: - self.blob_manager = DiskBlobManager(self.blob_dir, self.storage, self.dht_node._dataStore) - - self.rate_limiter.start() - d = self.blob_manager.setup() - return d diff --git a/lbrynet/core/StreamDescriptor.py b/lbrynet/core/StreamDescriptor.py index 7a4303308..89831a3ba 100644 --- a/lbrynet/core/StreamDescriptor.py +++ b/lbrynet/core/StreamDescriptor.py @@ -425,7 +425,8 @@ class EncryptedFileStreamDescriptorValidator(object): @defer.inlineCallbacks -def download_sd_blob(session, blob_hash, payment_rate_manager, timeout=None): +def download_sd_blob(blob_hash, blob_manager, peer_finder, rate_limiter, payment_rate_manager, wallet, timeout=None, + download_mirrors=None): """ Downloads a single blob from the network @@ -439,13 +440,13 @@ def download_sd_blob(session, blob_hash, payment_rate_manager, timeout=None): """ downloader = StandaloneBlobDownloader(blob_hash, - session.blob_manager, - session.peer_finder, - session.rate_limiter, + blob_manager, + peer_finder, + rate_limiter, payment_rate_manager, - session.wallet, + wallet, timeout) - mirror = HTTPBlobDownloader(session.blob_manager, [blob_hash], session.download_mirrors) + mirror = HTTPBlobDownloader(blob_manager, [blob_hash], download_mirrors or []) mirror.start() sd_blob = yield downloader.download() mirror.stop() @@ -454,9 +455,9 @@ def download_sd_blob(session, blob_hash, payment_rate_manager, timeout=None): try: validate_descriptor(sd_info) except InvalidStreamDescriptorError as err: - yield session.blob_manager.delete_blobs([blob_hash]) + yield blob_manager.delete_blobs([blob_hash]) raise err raw_sd = yield sd_reader._get_raw_data() - yield session.blob_manager.storage.add_known_blob(blob_hash, len(raw_sd)) - yield save_sd_info(session.blob_manager, sd_blob.blob_hash, sd_info) + yield blob_manager.storage.add_known_blob(blob_hash, len(raw_sd)) + yield save_sd_info(blob_manager, sd_blob.blob_hash, sd_info) defer.returnValue(sd_blob) diff --git a/lbrynet/daemon/Component.py b/lbrynet/daemon/Component.py index 8909df65e..a323ff7f1 100644 --- a/lbrynet/daemon/Component.py +++ b/lbrynet/daemon/Component.py @@ -37,6 +37,9 @@ class Component(object): def running(self): return self._running + def get_status(self): + return + def start(self): raise NotImplementedError() diff --git a/lbrynet/daemon/Components.py b/lbrynet/daemon/Components.py index 19183411f..1de589cf8 100644 --- a/lbrynet/daemon/Components.py +++ b/lbrynet/daemon/Components.py @@ -8,7 +8,9 @@ from twisted.internet import defer, threads, reactor, error from lbryum.simple_config import SimpleConfig from lbryum.constants import HEADERS_URL, HEADER_SIZE from lbrynet import conf -from lbrynet.core.Session import Session +from lbrynet.core.PaymentRateManager import OnlyFreePaymentsManager +from lbrynet.core.RateLimiter import RateLimiter +from lbrynet.core.BlobManager import DiskBlobManager from lbrynet.core.StreamDescriptor import StreamDescriptorIdentifier, EncryptedFileStreamType from lbrynet.core.Wallet import LBRYumWallet from lbrynet.core.server.BlobRequestHandler import BlobRequestHandlerFactory @@ -30,9 +32,9 @@ log = logging.getLogger(__name__) # settings must be initialized before this file is imported DATABASE_COMPONENT = "database" +BLOB_COMPONENT = "blob_manager" HEADERS_COMPONENT = "blockchain_headers" WALLET_COMPONENT = "wallet" -SESSION_COMPONENT = "session" DHT_COMPONENT = "dht" HASH_ANNOUNCER_COMPONENT = "hash_announcer" STREAM_IDENTIFIER_COMPONENT = "stream_identifier" @@ -41,6 +43,10 @@ PEER_PROTOCOL_SERVER_COMPONENT = "peer_protocol_server" REFLECTOR_COMPONENT = "reflector" UPNP_COMPONENT = "upnp" EXCHANGE_RATE_MANAGER_COMPONENT = "exchange_rate_manager" +RATE_LIMITER_COMPONENT = "rate_limiter" +PAYMENT_RATE_COMPONENT = "payment_rate_manager" + + def get_wallet_config(): wallet_type = GCS('wallet') if wallet_type == conf.LBRYCRD_WALLET: @@ -334,40 +340,26 @@ class WalletComponent(Component): self.wallet = None -class SessionComponent(Component): - component_name = SESSION_COMPONENT - depends_on = [DATABASE_COMPONENT, WALLET_COMPONENT, DHT_COMPONENT, HASH_ANNOUNCER_COMPONENT] +class BlobComponent(Component): + component_name = BLOB_COMPONENT + depends_on = [DATABASE_COMPONENT, DHT_COMPONENT] def __init__(self, component_manager): Component.__init__(self, component_manager) - self.session = None + self.blob_manager = None @property def component(self): - return self.session + return self.blob_manager - @defer.inlineCallbacks def start(self): - self.session = Session( - GCS('data_rate'), - db_dir=GCS('data_dir'), - node_id=CS.get_node_id(), - blob_dir=CS.get_blobfiles_dir(), - dht_node=self.component_manager.get_component(DHT_COMPONENT), - hash_announcer=self.component_manager.get_component(HASH_ANNOUNCER_COMPONENT), - dht_node_port=GCS('dht_node_port'), - known_dht_nodes=GCS('known_dht_nodes'), - peer_port=GCS('peer_port'), - wallet=self.component_manager.get_component(WALLET_COMPONENT), - external_ip=CS.get_external_ip(), - storage=self.component_manager.get_component(DATABASE_COMPONENT), - download_mirrors=GCS('download_mirrors') - ) - yield self.session.setup() + storage = self.component_manager.get_component(DATABASE_COMPONENT) + dht_node = self.component_manager.get_component(DHT_COMPONENT) + self.blob_manager = DiskBlobManager(CS.get_blobfiles_dir(), storage, dht_node._dataStore) + return self.blob_manager.setup() - @defer.inlineCallbacks def stop(self): - yield self.session.shut_down() + return self.blob_manager.stop() class DHTComponent(Component): @@ -384,6 +376,12 @@ class DHTComponent(Component): def component(self): return self.dht_node + def get_status(self): + return { + 'node_id': CS.get_node_id().encode('hex'), + 'peers_in_routing_table': 0 if not self.dht_node else len(self.dht_node.contacts) + } + @defer.inlineCallbacks def start(self): self.upnp_component = self.component_manager.get_component(UPNP_COMPONENT) @@ -435,9 +433,29 @@ class HashAnnouncerComponent(Component): yield self.hash_announcer.stop() +class RateLimiterComponent(Component): + component_name = RATE_LIMITER_COMPONENT + + def __init__(self, component_manager): + Component.__init__(self, component_manager) + self.rate_limiter = RateLimiter() + + @property + def component(self): + return self.rate_limiter + + def start(self): + self.rate_limiter.start() + return defer.succeed(None) + + def stop(self): + self.rate_limiter.stop() + return defer.succeed(None) + + class StreamIdentifierComponent(Component): component_name = STREAM_IDENTIFIER_COMPONENT - depends_on = [SESSION_COMPONENT] + depends_on = [DHT_COMPONENT, RATE_LIMITER_COMPONENT, BLOB_COMPONENT, DATABASE_COMPONENT, WALLET_COMPONENT] def __init__(self, component_manager): Component.__init__(self, component_manager) @@ -449,14 +467,19 @@ class StreamIdentifierComponent(Component): @defer.inlineCallbacks def start(self): - session = self.component_manager.get_component(SESSION_COMPONENT) + dht_node = self.component_manager.get_component(DHT_COMPONENT) + rate_limiter = self.component_manager.get_component(RATE_LIMITER_COMPONENT) + blob_manager = self.component_manager.get_component(BLOB_COMPONENT) + storage = self.component_manager.get_component(DATABASE_COMPONENT) + wallet = self.component_manager.get_component(WALLET_COMPONENT) + add_lbry_file_to_sd_identifier(self.sd_identifier) file_saver_factory = EncryptedFileSaverFactory( - session.peer_finder, - session.rate_limiter, - session.blob_manager, - session.storage, - session.wallet, + dht_node.peer_finder, + rate_limiter, + blob_manager, + storage, + wallet, GCS('download_directory') ) yield self.sd_identifier.add_stream_downloader_factory(EncryptedFileStreamType, file_saver_factory) @@ -465,9 +488,28 @@ class StreamIdentifierComponent(Component): pass +class PaymentRateComponent(Component): + component_name = PAYMENT_RATE_COMPONENT + + def __init__(self, component_manager): + Component.__init__(self, component_manager) + self.payment_rate_manager = OnlyFreePaymentsManager() + + @property + def component(self): + return self.payment_rate_manager + + def start(self): + return defer.succeed(None) + + def stop(self): + return defer.succeed(None) + + class FileManagerComponent(Component): component_name = FILE_MANAGER_COMPONENT - depends_on = [SESSION_COMPONENT, STREAM_IDENTIFIER_COMPONENT] + depends_on = [DHT_COMPONENT, RATE_LIMITER_COMPONENT, BLOB_COMPONENT, DATABASE_COMPONENT, WALLET_COMPONENT, + STREAM_IDENTIFIER_COMPONENT, PAYMENT_RATE_COMPONENT] def __init__(self, component_manager): Component.__init__(self, component_manager) @@ -477,12 +519,25 @@ class FileManagerComponent(Component): def component(self): return self.file_manager + def get_status(self): + if not self.file_manager: + return + return { + 'managed_streams': len(self.file_manager.lbry_files) + } + @defer.inlineCallbacks def start(self): - session = self.component_manager.get_component(SESSION_COMPONENT) + dht_node = self.component_manager.get_component(DHT_COMPONENT) + rate_limiter = self.component_manager.get_component(RATE_LIMITER_COMPONENT) + blob_manager = self.component_manager.get_component(BLOB_COMPONENT) + storage = self.component_manager.get_component(DATABASE_COMPONENT) + wallet = self.component_manager.get_component(WALLET_COMPONENT) sd_identifier = self.component_manager.get_component(STREAM_IDENTIFIER_COMPONENT) + payment_rate_manager = self.component_manager.get_component(PAYMENT_RATE_COMPONENT) log.info('Starting the file manager') - self.file_manager = EncryptedFileManager(session, sd_identifier) + self.file_manager = EncryptedFileManager(dht_node.peer_finder, rate_limiter, blob_manager, wallet, + payment_rate_manager, storage, sd_identifier) yield self.file_manager.setup() log.info('Done setting up file manager') @@ -493,7 +548,8 @@ class FileManagerComponent(Component): class PeerProtocolServerComponent(Component): component_name = PEER_PROTOCOL_SERVER_COMPONENT - depends_on = [SESSION_COMPONENT, UPNP_COMPONENT] + depends_on = [UPNP_COMPONENT, DHT_COMPONENT, RATE_LIMITER_COMPONENT, BLOB_COMPONENT, WALLET_COMPONENT, + PAYMENT_RATE_COMPONENT] def __init__(self, component_manager): Component.__init__(self, component_manager) @@ -507,17 +563,22 @@ class PeerProtocolServerComponent(Component): def start(self): query_handlers = {} upnp_component = self.component_manager.get_component(UPNP_COMPONENT) + dht_node = self.component_manager.get_component(DHT_COMPONENT) + rate_limiter = self.component_manager.get_component(RATE_LIMITER_COMPONENT) + blob_manager = self.component_manager.get_component(BLOB_COMPONENT) + wallet = self.component_manager.get_component(WALLET_COMPONENT) + payment_rate_manager = self.component_manager.get_component(PAYMENT_RATE_COMPONENT) + peer_port, udp_port = upnp_component.get_redirects() - session = self.component_manager.get_component(SESSION_COMPONENT) handlers = [ BlobRequestHandlerFactory( - session.blob_manager, - session.wallet, - session.payment_rate_manager, + blob_manager, + wallet, + payment_rate_manager, self.component_manager.analytics_manager ), - session.wallet.get_wallet_info_query_handler_factory(), + wallet.get_wallet_info_query_handler_factory(), ] for handler in handlers: @@ -525,7 +586,7 @@ class PeerProtocolServerComponent(Component): query_handlers[query_id] = handler if peer_port is not None: - server_factory = ServerProtocolFactory(session.rate_limiter, query_handlers, session.peer_manager) + server_factory = ServerProtocolFactory(rate_limiter, query_handlers, dht_node.peer_manager) try: log.info("Peer protocol listening on TCP %d", peer_port) @@ -547,7 +608,7 @@ class PeerProtocolServerComponent(Component): class ReflectorComponent(Component): component_name = REFLECTOR_COMPONENT - depends_on = [SESSION_COMPONENT, FILE_MANAGER_COMPONENT] + depends_on = [DHT_COMPONENT, BLOB_COMPONENT, FILE_MANAGER_COMPONENT] def __init__(self, component_manager): Component.__init__(self, component_manager) @@ -561,11 +622,10 @@ class ReflectorComponent(Component): @defer.inlineCallbacks def start(self): log.info("Starting reflector server") - - session = self.component_manager.get_component(SESSION_COMPONENT) + dht_node = self.component_manager.get_component(DHT_COMPONENT) + blob_manager = self.component_manager.get_component(BLOB_COMPONENT) file_manager = self.component_manager.get_component(FILE_MANAGER_COMPONENT) - reflector_factory = reflector_server_factory(session.peer_manager, session.blob_manager, file_manager) - + reflector_factory = reflector_server_factory(dht_node.peer_manager, blob_manager, file_manager) try: self.reflector_server = yield reactor.listenTCP(self.reflector_server_port, reflector_factory) log.info('Started reflector on port %s', self.reflector_server_port) diff --git a/lbrynet/daemon/Daemon.py b/lbrynet/daemon/Daemon.py index 6b99a101b..4cd73469d 100644 --- a/lbrynet/daemon/Daemon.py +++ b/lbrynet/daemon/Daemon.py @@ -25,14 +25,13 @@ from lbryschema.decode import smart_decode from lbrynet.core.system_info import get_lbrynet_version from lbrynet import conf from lbrynet.reflector import reupload -from lbrynet.daemon.Components import WALLET_COMPONENT, DATABASE_COMPONENT, SESSION_COMPONENT, DHT_COMPONENT -from lbrynet.daemon.Components import STREAM_IDENTIFIER_COMPONENT, FILE_MANAGER_COMPONENT -from lbrynet.daemon.Components import EXCHANGE_RATE_MANAGER_COMPONENT +from lbrynet.daemon.Components import WALLET_COMPONENT, DATABASE_COMPONENT, DHT_COMPONENT, BLOB_COMPONENT +from lbrynet.daemon.Components import STREAM_IDENTIFIER_COMPONENT, FILE_MANAGER_COMPONENT, RATE_LIMITER_COMPONENT +from lbrynet.daemon.Components import EXCHANGE_RATE_MANAGER_COMPONENT, PAYMENT_RATE_COMPONENT, UPNP_COMPONENT from lbrynet.daemon.ComponentManager import RequiredCondition from lbrynet.daemon.Downloader import GetStream from lbrynet.daemon.Publisher import Publisher from lbrynet.daemon.auth.server import AuthJSONRPCServer -from lbrynet.core.PaymentRateManager import OnlyFreePaymentsManager from lbrynet.core import utils, system_info from lbrynet.core.StreamDescriptor import download_sd_blob from lbrynet.core.Error import InsufficientFundsError, UnknownNameError @@ -186,13 +185,16 @@ class Daemon(AuthJSONRPCServer): """ component_attributes = { - EXCHANGE_RATE_MANAGER_COMPONENT: "exchange_rate_manager", DATABASE_COMPONENT: "storage", - SESSION_COMPONENT: "session", - WALLET_COMPONENT: "wallet", DHT_COMPONENT: "dht_node", + WALLET_COMPONENT: "wallet", STREAM_IDENTIFIER_COMPONENT: "sd_identifier", FILE_MANAGER_COMPONENT: "file_manager", + EXCHANGE_RATE_MANAGER_COMPONENT: "exchange_rate_manager", + PAYMENT_RATE_COMPONENT: "payment_rate_manager", + RATE_LIMITER_COMPONENT: "rate_limiter", + BLOB_COMPONENT: "blob_manager", + UPNP_COMPONENT: "upnp" } def __init__(self, analytics_manager=None, component_manager=None): @@ -218,9 +220,12 @@ class Daemon(AuthJSONRPCServer): self.dht_node = None self.wallet = None self.sd_identifier = None - self.session = None self.file_manager = None self.exchange_rate_manager = None + self.payment_rate_manager = None + self.rate_limiter = None + self.blob_manager = None + self.upnp = None # TODO: delete this self.streams = {} @@ -254,10 +259,10 @@ class Daemon(AuthJSONRPCServer): if not blob_hash: raise Exception("Nothing to download") - rate_manager = rate_manager or self.session.payment_rate_manager + rate_manager = rate_manager or self.payment_rate_manager timeout = timeout or 30 downloader = StandaloneBlobDownloader( - blob_hash, self.session.blob_manager, self.session.peer_finder, self.session.rate_limiter, + blob_hash, self.blob_manager, self.dht_node.peer_finder, self.rate_limiter, rate_manager, self.wallet, timeout ) return downloader.download() @@ -275,7 +280,7 @@ class Daemon(AuthJSONRPCServer): } blobs = {} try: - sd_host = yield self.session.blob_manager.get_host_downloaded_from(sd_hash) + sd_host = yield self.blob_manager.get_host_downloaded_from(sd_hash) except Exception: sd_host = None report["sd_blob"] = sd_host @@ -320,11 +325,11 @@ class Daemon(AuthJSONRPCServer): else: download_id = utils.random_string() self.analytics_manager.send_download_started(download_id, name, claim_dict) - - self.streams[sd_hash] = GetStream(self.sd_identifier, self.session, - self.exchange_rate_manager, conf.settings['max_key_fee'], - conf.settings['disable_max_key_fee'], - conf.settings['data_rate'], timeout) + self.streams[sd_hash] = GetStream(self.sd_identifier, self.wallet, self.exchange_rate_manager, + self.blob_manager, self.dht_node.peer_finder, self.rate_limiter, + self.payment_rate_manager, self.storage, conf.settings['max_key_fee'], + conf.settings['disable_max_key_fee'], conf.settings['data_rate'], + timeout) try: lbry_file, finished_deferred = yield self.streams[sd_hash].start( claim_dict, name, txid, nout, file_name @@ -350,9 +355,8 @@ class Daemon(AuthJSONRPCServer): @defer.inlineCallbacks def _publish_stream(self, name, bid, claim_dict, file_path=None, certificate_id=None, claim_address=None, change_address=None): - - publisher = Publisher(self.session, self.file_manager, self.wallet, - certificate_id) + publisher = Publisher(self.blob_manager, self.payment_rate_manager, self.storage, self.file_manager, + self.wallet, certificate_id) parse_lbry_uri(name) if not file_path: stream_hash = yield self.storage.get_stream_hash_for_sd_hash( @@ -388,16 +392,17 @@ class Daemon(AuthJSONRPCServer): def _get_or_download_sd_blob(self, blob, sd_hash): if blob: - return self.session.blob_manager.get_blob(blob[0]) + return self.blob_manager.get_blob(blob[0]) return download_sd_blob( - self.session, sd_hash, self.session.payment_rate_manager, conf.settings['search_timeout'] + sd_hash, self.blob_manager, self.dht_node.peer_finder, self.rate_limiter, self.payment_rate_manager, + self.wallet, timeout=conf.settings['search_timeout'], download_mirrors=conf.settings['download_mirrors'] ) def get_or_download_sd_blob(self, sd_hash): """Return previously downloaded sd blob if already in the blob manager, otherwise download and return it """ - d = self.session.blob_manager.completed_blobs([sd_hash]) + d = self.blob_manager.completed_blobs([sd_hash]) d.addCallback(self._get_or_download_sd_blob, sd_hash) return d @@ -416,7 +421,7 @@ class Daemon(AuthJSONRPCServer): Calculate estimated LBC cost for a stream given its size in bytes """ - if self.session.payment_rate_manager.generous: + if self.payment_rate_manager.generous: return 0.0 return size / (10 ** 6) * conf.settings['data_rate'] @@ -693,7 +698,7 @@ class Daemon(AuthJSONRPCServer): 'blocks_behind': (int) remote_height - local_height, 'best_blockhash': (str) block hash of most recent block, }, - 'dht_node_status': { + 'dht': { 'node_id': (str) lbry dht node id - hex encoded, 'peers_in_routing_table': (int) the number of peers in the routing table, }, @@ -708,14 +713,6 @@ class Daemon(AuthJSONRPCServer): } """ - # on startup, the wallet or network won't be available but we still need this call to work - has_wallet = self.session and self.wallet and self.wallet.network - local_height = self.wallet.network.get_local_height() if has_wallet else 0 - remote_height = self.wallet.network.get_server_height() if has_wallet else 0 - best_hash = (yield self.wallet.get_best_blockhash()) if has_wallet else None - wallet_is_encrypted = has_wallet and self.wallet.wallet and \ - self.wallet.wallet.use_encryption - connection_code = CONNECTION_STATUS_CONNECTED if utils.check_connection() else CONNECTION_STATUS_NETWORK response = { 'installation_id': conf.settings.installation_id, @@ -1285,7 +1282,9 @@ class Daemon(AuthJSONRPCServer): response = yield self._render_response(results) defer.returnValue(response) - @requires(WALLET_COMPONENT, FILE_MANAGER_COMPONENT, SESSION_COMPONENT, conditions=[WALLET_IS_UNLOCKED]) + @requires(STREAM_IDENTIFIER_COMPONENT, WALLET_COMPONENT, EXCHANGE_RATE_MANAGER_COMPONENT, BLOB_COMPONENT, + DHT_COMPONENT, RATE_LIMITER_COMPONENT, PAYMENT_RATE_COMPONENT, DATABASE_COMPONENT, + conditions=[WALLET_IS_UNLOCKED]) @defer.inlineCallbacks def jsonrpc_get(self, uri, file_name=None, timeout=None): """ @@ -1476,7 +1475,9 @@ class Daemon(AuthJSONRPCServer): response = yield self._render_response(result) defer.returnValue(response) - @requires(WALLET_COMPONENT, SESSION_COMPONENT, conditions=[WALLET_IS_UNLOCKED]) + @requires(STREAM_IDENTIFIER_COMPONENT, WALLET_COMPONENT, EXCHANGE_RATE_MANAGER_COMPONENT, BLOB_COMPONENT, + DHT_COMPONENT, RATE_LIMITER_COMPONENT, PAYMENT_RATE_COMPONENT, DATABASE_COMPONENT, + conditions=[WALLET_IS_UNLOCKED]) @defer.inlineCallbacks def jsonrpc_stream_cost_estimate(self, uri, size=None): """ @@ -1631,7 +1632,8 @@ class Daemon(AuthJSONRPCServer): result = yield self.wallet.import_certificate_info(serialized_certificate_info) defer.returnValue(result) - @requires(WALLET_COMPONENT, FILE_MANAGER_COMPONENT, SESSION_COMPONENT, conditions=[WALLET_IS_UNLOCKED]) + @requires(WALLET_COMPONENT, FILE_MANAGER_COMPONENT, BLOB_COMPONENT, PAYMENT_RATE_COMPONENT, DATABASE_COMPONENT, + conditions=[WALLET_IS_UNLOCKED]) @defer.inlineCallbacks def jsonrpc_publish(self, name, bid, metadata=None, file_path=None, fee=None, title=None, description=None, author=None, language=None, license=None, @@ -2514,7 +2516,8 @@ class Daemon(AuthJSONRPCServer): d.addCallback(lambda r: self._render_response(r)) return d - @requires(WALLET_COMPONENT, SESSION_COMPONENT, conditions=[WALLET_IS_UNLOCKED]) + @requires(WALLET_COMPONENT, DHT_COMPONENT, BLOB_COMPONENT, RATE_LIMITER_COMPONENT, PAYMENT_RATE_COMPONENT, + conditions=[WALLET_IS_UNLOCKED]) @defer.inlineCallbacks def jsonrpc_blob_get(self, blob_hash, timeout=None, encoding=None, payment_rate_manager=None): """ @@ -2545,8 +2548,7 @@ class Daemon(AuthJSONRPCServer): } timeout = timeout or 30 - payment_rate_manager = get_blob_payment_rate_manager(self.session, payment_rate_manager) - blob = yield self._download_blob(blob_hash, rate_manager=payment_rate_manager, + blob = yield self._download_blob(blob_hash, rate_manager=self.payment_rate_manager, timeout=timeout) if encoding and encoding in decoders: blob_file = blob.open_for_reading() @@ -2558,7 +2560,7 @@ class Daemon(AuthJSONRPCServer): response = yield self._render_response(result) defer.returnValue(response) - @requires(SESSION_COMPONENT) + @requires(BLOB_COMPONENT, DATABASE_COMPONENT) @defer.inlineCallbacks def jsonrpc_blob_delete(self, blob_hash): """ @@ -2574,7 +2576,7 @@ class Daemon(AuthJSONRPCServer): (str) Success/fail message """ - if blob_hash not in self.session.blob_manager.blobs: + if blob_hash not in self.blob_manager.blobs: response = yield self._render_response("Don't have that blob") defer.returnValue(response) try: @@ -2582,7 +2584,7 @@ class Daemon(AuthJSONRPCServer): yield self.storage.delete_stream(stream_hash) except Exception as err: pass - yield self.session.blob_manager.delete_blobs([blob_hash]) + yield self.blob_manager.delete_blobs([blob_hash]) response = yield self._render_response("Deleted %s" % blob_hash) defer.returnValue(response) @@ -2612,7 +2614,7 @@ class Daemon(AuthJSONRPCServer): err.trap(defer.TimeoutError) return [] - finished_deferred.addTimeout(timeout or conf.settings['peer_search_timeout'], self.session.dht_node.clock) + finished_deferred.addTimeout(timeout or conf.settings['peer_search_timeout'], self.dht_node.clock) finished_deferred.addErrback(trap_timeout) peers = yield finished_deferred results = [ @@ -2625,7 +2627,7 @@ class Daemon(AuthJSONRPCServer): ] defer.returnValue(results) - @requires(SESSION_COMPONENT, DHT_COMPONENT, conditions=[DHT_HAS_CONTACTS]) + @requires(DATABASE_COMPONENT) @defer.inlineCallbacks def jsonrpc_blob_announce(self, blob_hash=None, stream_hash=None, sd_hash=None): """ @@ -2698,7 +2700,7 @@ class Daemon(AuthJSONRPCServer): results = yield reupload.reflect_file(lbry_file, reflector_server=reflector_server) defer.returnValue(results) - @requires(SESSION_COMPONENT, WALLET_COMPONENT) + @requires(BLOB_COMPONENT, WALLET_COMPONENT) @defer.inlineCallbacks def jsonrpc_blob_list(self, uri=None, stream_hash=None, sd_hash=None, needed=None, finished=None, page_size=None, page=None): @@ -2737,16 +2739,16 @@ class Daemon(AuthJSONRPCServer): if stream_hash: crypt_blobs = yield self.storage.get_blobs_for_stream(stream_hash) blobs = yield defer.gatherResults([ - self.session.blob_manager.get_blob(crypt_blob.blob_hash, crypt_blob.length) + self.blob_manager.get_blob(crypt_blob.blob_hash, crypt_blob.length) for crypt_blob in crypt_blobs if crypt_blob.blob_hash is not None ]) else: blobs = [] # get_blobs_for_stream does not include the sd blob, so we'll add it manually - if sd_hash in self.session.blob_manager.blobs: - blobs = [self.session.blob_manager.blobs[sd_hash]] + blobs + if sd_hash in self.blob_manager.blobs: + blobs = [self.blob_manager.blobs[sd_hash]] + blobs else: - blobs = self.session.blob_manager.blobs.itervalues() + blobs = self.blob_manager.blobs.itervalues() if needed: blobs = [blob for blob in blobs if not blob.get_is_verified()] @@ -2762,7 +2764,7 @@ class Daemon(AuthJSONRPCServer): response = yield self._render_response(blob_hashes_for_return) defer.returnValue(response) - @requires(SESSION_COMPONENT) + @requires(BLOB_COMPONENT) def jsonrpc_blob_reflect(self, blob_hashes, reflector_server=None): """ Reflects specified blobs @@ -2777,11 +2779,11 @@ class Daemon(AuthJSONRPCServer): (list) reflected blob hashes """ - d = reupload.reflect_blob_hashes(blob_hashes, self.session.blob_manager, reflector_server) + d = reupload.reflect_blob_hashes(blob_hashes, self.blob_manager, reflector_server) d.addCallback(lambda r: self._render_response(r)) return d - @requires(SESSION_COMPONENT) + @requires(BLOB_COMPONENT) def jsonrpc_blob_reflect_all(self): """ Reflects all saved blobs @@ -2796,8 +2798,8 @@ class Daemon(AuthJSONRPCServer): (bool) true if successful """ - d = self.session.blob_manager.get_all_verified_blobs() - d.addCallback(reupload.reflect_blob_hashes, self.session.blob_manager) + d = self.blob_manager.get_all_verified_blobs() + d.addCallback(reupload.reflect_blob_hashes, self.blob_manager) d.addCallback(lambda r: self._render_response(r)) return d @@ -2943,7 +2945,7 @@ class Daemon(AuthJSONRPCServer): return self._blob_availability(blob_hash, search_timeout, blob_timeout) - @requires(SESSION_COMPONENT, WALLET_COMPONENT, DHT_COMPONENT, conditions=[WALLET_IS_UNLOCKED]) + @requires(UPNP_COMPONENT, WALLET_COMPONENT, DHT_COMPONENT, conditions=[WALLET_IS_UNLOCKED]) @AuthJSONRPCServer.deprecated("stream_availability") def jsonrpc_get_availability(self, uri, sd_timeout=None, peer_timeout=None): """ @@ -2964,7 +2966,7 @@ class Daemon(AuthJSONRPCServer): return self.jsonrpc_stream_availability(uri, peer_timeout, sd_timeout) - @requires(SESSION_COMPONENT, WALLET_COMPONENT, DHT_COMPONENT, conditions=[WALLET_IS_UNLOCKED]) + @requires(UPNP_COMPONENT, WALLET_COMPONENT, DHT_COMPONENT, conditions=[WALLET_IS_UNLOCKED]) @defer.inlineCallbacks def jsonrpc_stream_availability(self, uri, search_timeout=None, blob_timeout=None): """ @@ -3012,7 +3014,7 @@ class Daemon(AuthJSONRPCServer): 'head_blob_hash': None, 'head_blob_availability': {}, 'use_upnp': conf.settings['use_upnp'], - 'upnp_redirect_is_set': len(self.session.upnp_redirects) > 0, + 'upnp_redirect_is_set': len(self.upnp.get_redirects()) > 0, 'error': None } @@ -3042,7 +3044,7 @@ class Daemon(AuthJSONRPCServer): response['sd_hash'] = sd_hash head_blob_hash = None downloader = self._get_single_peer_downloader() - have_sd_blob = sd_hash in self.session.blob_manager.blobs + have_sd_blob = sd_hash in self.blob_manager.blobs try: sd_blob = yield self.jsonrpc_blob_get(sd_hash, timeout=blob_timeout, encoding="json") @@ -3141,17 +3143,6 @@ def iter_lbry_file_search_values(search_fields): yield searchtype, value -def get_blob_payment_rate_manager(session, payment_rate_manager=None): - if payment_rate_manager: - rate_managers = { - 'only-free': OnlyFreePaymentsManager() - } - if payment_rate_manager in rate_managers: - payment_rate_manager = rate_managers[payment_rate_manager] - log.info("Downloading blob with rate manager: %s", payment_rate_manager) - return payment_rate_manager or session.payment_rate_manager - - def create_key_getter(field): search_path = field.split('.') def key_getter(value): diff --git a/lbrynet/daemon/Downloader.py b/lbrynet/daemon/Downloader.py index 67873218a..83063b5e4 100644 --- a/lbrynet/daemon/Downloader.py +++ b/lbrynet/daemon/Downloader.py @@ -30,8 +30,8 @@ log = logging.getLogger(__name__) class GetStream(object): - def __init__(self, sd_identifier, session, exchange_rate_manager, - max_key_fee, disable_max_key_fee, data_rate=None, timeout=None): + def __init__(self, sd_identifier, wallet, exchange_rate_manager, blob_manager, peer_finder, rate_limiter, + payment_rate_manager, storage, max_key_fee, disable_max_key_fee, data_rate=None, timeout=None): self.timeout = timeout or conf.settings['download_timeout'] self.data_rate = data_rate or conf.settings['data_rate'] @@ -41,11 +41,14 @@ class GetStream(object): self.timeout_counter = 0 self.code = None self.sd_hash = None - self.session = session - self.wallet = self.session.wallet + self.blob_manager = blob_manager + self.peer_finder = peer_finder + self.rate_limiter = rate_limiter + self.wallet = wallet self.exchange_rate_manager = exchange_rate_manager - self.payment_rate_manager = self.session.payment_rate_manager + self.payment_rate_manager = payment_rate_manager self.sd_identifier = sd_identifier + self.storage = storage self.downloader = None self.checker = LoopingCall(self.check_status) @@ -174,15 +177,16 @@ class GetStream(object): @defer.inlineCallbacks def _download_sd_blob(self): - sd_blob = yield download_sd_blob(self.session, self.sd_hash, - self.payment_rate_manager, self.timeout) + sd_blob = yield download_sd_blob(self.sd_hash, self.blob_manager, self.peer_finder, self.rate_limiter, + self.payment_rate_manager, self.wallet, self.timeout, + conf.settings['download_mirrors']) defer.returnValue(sd_blob) @defer.inlineCallbacks def _download(self, sd_blob, name, key_fee, txid, nout, file_name=None): self.downloader = yield self._create_downloader(sd_blob, file_name=file_name) yield self.pay_key_fee(key_fee, name) - yield self.session.storage.save_content_claim(self.downloader.stream_hash, "%s:%i" % (txid, nout)) + yield self.storage.save_content_claim(self.downloader.stream_hash, "%s:%i" % (txid, nout)) log.info("Downloading lbry://%s (%s) --> %s", name, self.sd_hash[:6], self.download_path) self.finished_deferred = self.downloader.start() self.finished_deferred.addCallbacks(lambda result: self.finish(result, name), self.fail) diff --git a/lbrynet/daemon/Publisher.py b/lbrynet/daemon/Publisher.py index 3dc01664c..fd8dad73b 100644 --- a/lbrynet/daemon/Publisher.py +++ b/lbrynet/daemon/Publisher.py @@ -11,8 +11,10 @@ log = logging.getLogger(__name__) class Publisher(object): - def __init__(self, session, lbry_file_manager, wallet, certificate_id): - self.session = session + def __init__(self, blob_manager, payment_rate_manager, storage, lbry_file_manager, wallet, certificate_id): + self.blob_manager = blob_manager + self.payment_rate_manager = payment_rate_manager + self.storage = storage self.lbry_file_manager = lbry_file_manager self.wallet = wallet self.certificate_id = certificate_id @@ -30,8 +32,8 @@ class Publisher(object): file_name = os.path.basename(file_path) with file_utils.get_read_handle(file_path) as read_handle: - self.lbry_file = yield create_lbry_file(self.session, self.lbry_file_manager, file_name, - read_handle) + self.lbry_file = yield create_lbry_file(self.blob_manager, self.storage, self.payment_rate_manager, + self.lbry_file_manager, file_name, read_handle) if 'source' not in claim_dict['stream']: claim_dict['stream']['source'] = {} @@ -42,7 +44,7 @@ class Publisher(object): claim_out = yield self.make_claim(name, bid, claim_dict, claim_address, change_address) # check if we have a file already for this claim (if this is a publish update with a new stream) - old_stream_hashes = yield self.session.storage.get_old_stream_hashes_for_claim_id(claim_out['claim_id'], + old_stream_hashes = yield self.storage.get_old_stream_hashes_for_claim_id(claim_out['claim_id'], self.lbry_file.stream_hash) if old_stream_hashes: for lbry_file in filter(lambda l: l.stream_hash in old_stream_hashes, @@ -50,7 +52,7 @@ class Publisher(object): yield self.lbry_file_manager.delete_lbry_file(lbry_file, delete_file=False) log.info("Removed old stream for claim update: %s", lbry_file.stream_hash) - yield self.session.storage.save_content_claim( + yield self.storage.save_content_claim( self.lbry_file.stream_hash, "%s:%i" % (claim_out['txid'], claim_out['nout']) ) defer.returnValue(claim_out) @@ -60,7 +62,7 @@ class Publisher(object): """Make a claim without creating a lbry file""" claim_out = yield self.make_claim(name, bid, claim_dict, claim_address, change_address) if stream_hash: # the stream_hash returned from the db will be None if this isn't a stream we have - yield self.session.storage.save_content_claim(stream_hash, "%s:%i" % (claim_out['txid'], + yield self.storage.save_content_claim(stream_hash, "%s:%i" % (claim_out['txid'], claim_out['nout'])) self.lbry_file = [f for f in self.lbry_file_manager.lbry_files if f.stream_hash == stream_hash][0] defer.returnValue(claim_out) diff --git a/lbrynet/file_manager/EncryptedFileCreator.py b/lbrynet/file_manager/EncryptedFileCreator.py index 49f8ce5f4..a5411d2ec 100644 --- a/lbrynet/file_manager/EncryptedFileCreator.py +++ b/lbrynet/file_manager/EncryptedFileCreator.py @@ -59,7 +59,8 @@ class EncryptedFileStreamCreator(CryptStreamCreator): # we can simply read the file from the disk without needing to # involve reactor. @defer.inlineCallbacks -def create_lbry_file(session, lbry_file_manager, file_name, file_handle, key=None, iv_generator=None): +def create_lbry_file(blob_manager, storage, payment_rate_manager, lbry_file_manager, file_name, file_handle, + key=None, iv_generator=None): """Turn a plain file into an LBRY File. An LBRY File is a collection of encrypted blobs of data and the metadata that binds them @@ -98,7 +99,7 @@ def create_lbry_file(session, lbry_file_manager, file_name, file_handle, key=Non file_directory = os.path.dirname(file_handle.name) lbry_file_creator = EncryptedFileStreamCreator( - session.blob_manager, lbry_file_manager, base_file_name, key, iv_generator + blob_manager, lbry_file_manager, base_file_name, key, iv_generator ) yield lbry_file_creator.setup() @@ -114,18 +115,18 @@ def create_lbry_file(session, lbry_file_manager, file_name, file_handle, key=Non log.debug("making the sd blob") sd_info = lbry_file_creator.sd_info - descriptor_writer = BlobStreamDescriptorWriter(session.blob_manager) + descriptor_writer = BlobStreamDescriptorWriter(blob_manager) sd_hash = yield descriptor_writer.create_descriptor(sd_info) log.debug("saving the stream") - yield session.storage.store_stream( + yield storage.store_stream( sd_info['stream_hash'], sd_hash, sd_info['stream_name'], sd_info['key'], sd_info['suggested_file_name'], sd_info['blobs'] ) log.debug("adding to the file manager") lbry_file = yield lbry_file_manager.add_published_file( - sd_info['stream_hash'], sd_hash, binascii.hexlify(file_directory), session.payment_rate_manager, - session.payment_rate_manager.min_blob_data_payment_rate + sd_info['stream_hash'], sd_hash, binascii.hexlify(file_directory), payment_rate_manager, + payment_rate_manager.min_blob_data_payment_rate ) defer.returnValue(lbry_file) diff --git a/lbrynet/file_manager/EncryptedFileManager.py b/lbrynet/file_manager/EncryptedFileManager.py index abff82fef..437b474f3 100644 --- a/lbrynet/file_manager/EncryptedFileManager.py +++ b/lbrynet/file_manager/EncryptedFileManager.py @@ -28,15 +28,18 @@ class EncryptedFileManager(object): # when reflecting files, reflect up to this many files at a time CONCURRENT_REFLECTS = 5 - def __init__(self, session, sd_identifier): - + def __init__(self, peer_finder, rate_limiter, blob_manager, wallet, payment_rate_manager, storage, sd_identifier): self.auto_re_reflect = conf.settings['reflect_uploads'] and conf.settings['auto_re_reflect_interval'] > 0 self.auto_re_reflect_interval = conf.settings['auto_re_reflect_interval'] - self.session = session - self.storage = session.storage + self.download_mirrors = conf.settings['download_mirrors'] + self.peer_finder = peer_finder + self.rate_limiter = rate_limiter + self.blob_manager = blob_manager + self.wallet = wallet + self.payment_rate_manager = payment_rate_manager + self.storage = storage # TODO: why is sd_identifier part of the file manager? self.sd_identifier = sd_identifier - assert sd_identifier self.lbry_files = [] self.lbry_file_reflector = task.LoopingCall(self.reflect_lbry_files) @@ -47,14 +50,14 @@ class EncryptedFileManager(object): log.info("Started file manager") def get_lbry_file_status(self, lbry_file): - return self.session.storage.get_lbry_file_status(lbry_file.rowid) + return self.storage.get_lbry_file_status(lbry_file.rowid) def set_lbry_file_data_payment_rate(self, lbry_file, new_rate): - return self.session.storage(lbry_file.rowid, new_rate) + return self.storage(lbry_file.rowid, new_rate) def change_lbry_file_status(self, lbry_file, status): log.debug("Changing status of %s to %s", lbry_file.stream_hash, status) - return self.session.storage.change_file_status(lbry_file.rowid, status) + return self.storage.change_file_status(lbry_file.rowid, status) def get_lbry_file_status_reports(self): ds = [] @@ -80,20 +83,20 @@ class EncryptedFileManager(object): return ManagedEncryptedFileDownloader( rowid, stream_hash, - self.session.peer_finder, - self.session.rate_limiter, - self.session.blob_manager, - self.session.storage, + self.peer_finder, + self.rate_limiter, + self.blob_manager, + self.storage, self, payment_rate_manager, - self.session.wallet, + self.wallet, download_directory, file_name, stream_name=stream_name, sd_hash=sd_hash, key=key, suggested_file_name=suggested_file_name, - download_mirrors=self.session.download_mirrors + download_mirrors=self.download_mirrors ) def _start_lbry_file(self, file_info, payment_rate_manager, claim_info): @@ -116,9 +119,9 @@ class EncryptedFileManager(object): @defer.inlineCallbacks def _start_lbry_files(self): - files = yield self.session.storage.get_all_lbry_files() - claim_infos = yield self.session.storage.get_claims_from_stream_hashes([file['stream_hash'] for file in files]) - prm = self.session.payment_rate_manager + files = yield self.storage.get_all_lbry_files() + claim_infos = yield self.storage.get_claims_from_stream_hashes([file['stream_hash'] for file in files]) + prm = self.payment_rate_manager log.info("Starting %i files", len(files)) for file_info in files: @@ -154,7 +157,7 @@ class EncryptedFileManager(object): @defer.inlineCallbacks def add_published_file(self, stream_hash, sd_hash, download_directory, payment_rate_manager, blob_data_rate): status = ManagedEncryptedFileDownloader.STATUS_FINISHED - stream_metadata = yield get_sd_info(self.session.storage, stream_hash, include_blobs=False) + stream_metadata = yield get_sd_info(self.storage, stream_hash, include_blobs=False) key = stream_metadata['key'] stream_name = stream_metadata['stream_name'] file_name = stream_metadata['suggested_file_name'] @@ -175,9 +178,9 @@ class EncryptedFileManager(object): def add_downloaded_file(self, stream_hash, sd_hash, download_directory, payment_rate_manager=None, blob_data_rate=None, status=None, file_name=None): status = status or ManagedEncryptedFileDownloader.STATUS_STOPPED - payment_rate_manager = payment_rate_manager or self.session.payment_rate_manager + payment_rate_manager = payment_rate_manager or self.payment_rate_manager blob_data_rate = blob_data_rate or payment_rate_manager.min_blob_data_payment_rate - stream_metadata = yield get_sd_info(self.session.storage, stream_hash, include_blobs=False) + stream_metadata = yield get_sd_info(self.storage, stream_hash, include_blobs=False) key = stream_metadata['key'] stream_name = stream_metadata['stream_name'] file_name = file_name or stream_metadata['suggested_file_name'] @@ -187,7 +190,7 @@ class EncryptedFileManager(object): rowid = yield self.storage.save_downloaded_file( stream_hash, os.path.basename(file_name.decode('hex')).encode('hex'), download_directory, blob_data_rate ) - file_name = yield self.session.storage.get_filename_for_rowid(rowid) + file_name = yield self.storage.get_filename_for_rowid(rowid) lbry_file = self._get_lbry_file( rowid, stream_hash, payment_rate_manager, sd_hash, key, stream_name, file_name, download_directory, stream_metadata['suggested_file_name'] @@ -222,7 +225,7 @@ class EncryptedFileManager(object): del self.storage.content_claim_callbacks[lbry_file.stream_hash] yield lbry_file.delete_data() - yield self.session.storage.delete_stream(lbry_file.stream_hash) + yield self.storage.delete_stream(lbry_file.stream_hash) if delete_file and os.path.isfile(full_path): os.remove(full_path)