delete Session.py

split Session into Components
Jack Robison 2018-07-25 15:33:43 -04:00
parent e3c3fafa1e
commit 99207b7221
No known key found for this signature in database
GPG key ID: DF25C68FE0239BB2
9 changed files with 233 additions and 318 deletions
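
The overall pattern in this commit: each service the old Session owned becomes its own component, and consumers ask the ComponentManager for what they need instead of reaching through a shared session object. A rough sketch of the component shape used below; the example class and its body are illustrative, only Component, the component constants, and component_manager.get_component come from this commit:

    # Illustrative sketch of the component pattern introduced here (not part of the commit).
    from twisted.internet import defer

    class ExampleComponent(Component):
        component_name = "example"                 # hypothetical name
        depends_on = [RATE_LIMITER_COMPONENT]      # declared dependencies are started first

        def __init__(self, component_manager):
            Component.__init__(self, component_manager)
            self.example = None

        @property
        def component(self):
            return self.example

        def start(self):
            # collaborators come from the component manager, not a Session attribute
            rate_limiter = self.component_manager.get_component(RATE_LIMITER_COMPONENT)
            self.example = rate_limiter            # placeholder for the real service
            return defer.succeed(None)

        def stop(self):
            return defer.succeed(None)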

View file

@@ -1,150 +0,0 @@
-import logging
-
-from twisted.internet import defer
-from lbrynet.core.BlobManager import DiskBlobManager
-from lbrynet.database.storage import SQLiteStorage
-from lbrynet.core.RateLimiter import RateLimiter
-from lbrynet.core.PaymentRateManager import BasePaymentRateManager, OnlyFreePaymentsManager
-
-log = logging.getLogger(__name__)
-
-
-class Session(object):
-    """This class manages all important services common to any application that uses the network.
-
-    the hash announcer, which informs other peers that this peer is
-    associated with some hash. Usually, this means this peer has a
-    blob identified by the hash in question, but it can be used for
-    other purposes.
-
-    the peer finder, which finds peers that are associated with some
-    hash.
-
-    the blob manager, which keeps track of which blobs have been
-    downloaded and provides access to them,
-
-    the rate limiter, which attempts to ensure download and upload
-    rates stay below a set maximum
-
-    upnp, which opens holes in compatible firewalls so that remote
-    peers can connect to this peer.
-    """
-
-    def __init__(self, blob_data_payment_rate, db_dir=None, node_id=None, dht_node_port=None,
-                 known_dht_nodes=None, peer_finder=None, hash_announcer=None, blob_dir=None, blob_manager=None,
-                 peer_port=None, rate_limiter=None, wallet=None, external_ip=None, storage=None,
-                 dht_node=None, peer_manager=None, download_mirrors=None):
-        """@param blob_data_payment_rate: The default payment rate for blob data
-
-        @param db_dir: The directory in which levelDB files should be stored
-
-        @param node_id: The unique ID of this node
-
-        @param peer_manager: An object which keeps track of all known
-            peers. If None, a PeerManager will be created
-
-        @param dht_node_port: The port on which the dht node should
-            listen for incoming connections
-
-        @param known_dht_nodes: A list of nodes which the dht node
-            should use to bootstrap into the dht
-
-        @param peer_finder: An object which is used to look up peers
-            that are associated with some hash. If None, a
-            DHTPeerFinder will be used, which looks for peers in the
-            distributed hash table.
-
-        @param hash_announcer: An object which announces to other
-            peers that this peer is associated with some hash. If
-            None, and peer_port is not None, a DHTHashAnnouncer will
-            be used. If None and peer_port is None, a
-            DummyHashAnnouncer will be used, which will not actually
-            announce anything.
-
-        @param blob_dir: The directory in which blobs will be
-            stored. If None and blob_manager is None, blobs will be
-            stored in memory only.
-
-        @param blob_manager: An object which keeps track of downloaded
-            blobs and provides access to them. If None, and blob_dir
-            is not None, a DiskBlobManager will be used, with the
-            given blob_dir. If None and blob_dir is None, a
-            TempBlobManager will be used, which stores blobs in memory
-            only.
-
-        @param peer_port: The port on which other peers should connect
-            to this peer
-
-        @param rate_limiter: An object which keeps track of the amount
-            of data transferred to and from this peer, and can limit
-            that rate if desired
-
-        @param wallet: An object which will be used to keep track of
-            expected payments and which will pay peers. If None, a
-            wallet which uses the Point Trader system will be used,
-            which is meant for testing only
-        """
-        self.db_dir = db_dir
-        self.node_id = node_id
-        self.peer_manager = peer_manager
-        self.peer_finder = peer_finder
-        self.hash_announcer = hash_announcer
-        self.dht_node_port = dht_node_port
-        self.known_dht_nodes = known_dht_nodes
-        if self.known_dht_nodes is None:
-            self.known_dht_nodes = []
-        self.blob_dir = blob_dir
-        self.blob_manager = blob_manager
-        self.peer_port = peer_port
-        self.rate_limiter = rate_limiter
-        self.external_ip = external_ip
-        self.upnp_redirects = []
-        self.wallet = wallet
-        self.dht_node = dht_node
-        self.base_payment_rate_manager = BasePaymentRateManager(blob_data_payment_rate)
-        self.payment_rate_manager = OnlyFreePaymentsManager()
-        self.storage = storage or SQLiteStorage(self.db_dir)
-        self.download_mirrors = download_mirrors
-
-    def setup(self):
-        """Create the blob directory and database if necessary, start all desired services"""
-        log.debug("Starting session.")
-        if self.dht_node is not None:
-            if self.peer_manager is None:
-                self.peer_manager = self.dht_node.peer_manager
-            if self.peer_finder is None:
-                self.peer_finder = self.dht_node.peer_finder
-        d = self.storage.setup()
-        d.addCallback(lambda _: self._setup_other_components())
-        return d
-
-    def shut_down(self):
-        """Stop all services"""
-        log.info('Stopping session.')
-        ds = []
-        if self.rate_limiter is not None:
-            ds.append(defer.maybeDeferred(self.rate_limiter.stop))
-        if self.blob_manager is not None:
-            ds.append(defer.maybeDeferred(self.blob_manager.stop))
-        return defer.DeferredList(ds)
-
-    def _setup_other_components(self):
-        log.debug("Setting up the rest of the components")
-
-        if self.rate_limiter is None:
-            self.rate_limiter = RateLimiter()
-
-        if self.blob_manager is None:
-            if self.blob_dir is None:
-                raise Exception(
-                    "TempBlobManager is no longer supported, specify BlobManager or db_dir")
-            else:
-                self.blob_manager = DiskBlobManager(self.blob_dir, self.storage, self.dht_node._dataStore)
-
-        self.rate_limiter.start()
-        d = self.blob_manager.setup()
-        return d

View file

@@ -425,7 +425,8 @@ class EncryptedFileStreamDescriptorValidator(object):

 @defer.inlineCallbacks
-def download_sd_blob(session, blob_hash, payment_rate_manager, timeout=None):
+def download_sd_blob(blob_hash, blob_manager, peer_finder, rate_limiter, payment_rate_manager, wallet, timeout=None,
+                     download_mirrors=None):
     """
     Downloads a single blob from the network
@@ -439,13 +440,13 @@ def download_sd_blob(session, blob_hash, payment_rate_manager, timeout=None):
     """
     downloader = StandaloneBlobDownloader(blob_hash,
-                                          session.blob_manager,
-                                          session.peer_finder,
-                                          session.rate_limiter,
+                                          blob_manager,
+                                          peer_finder,
+                                          rate_limiter,
                                           payment_rate_manager,
-                                          session.wallet,
+                                          wallet,
                                           timeout)
-    mirror = HTTPBlobDownloader(session.blob_manager, [blob_hash], session.download_mirrors)
+    mirror = HTTPBlobDownloader(blob_manager, [blob_hash], download_mirrors or [])
     mirror.start()
     sd_blob = yield downloader.download()
     mirror.stop()
@@ -454,9 +455,9 @@ def download_sd_blob(session, blob_hash, payment_rate_manager, timeout=None):
     try:
         validate_descriptor(sd_info)
     except InvalidStreamDescriptorError as err:
-        yield session.blob_manager.delete_blobs([blob_hash])
+        yield blob_manager.delete_blobs([blob_hash])
         raise err
     raw_sd = yield sd_reader._get_raw_data()
-    yield session.blob_manager.storage.add_known_blob(blob_hash, len(raw_sd))
-    yield save_sd_info(session.blob_manager, sd_blob.blob_hash, sd_info)
+    yield blob_manager.storage.add_known_blob(blob_hash, len(raw_sd))
+    yield save_sd_info(blob_manager, sd_blob.blob_hash, sd_info)
     defer.returnValue(sd_blob)
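
download_sd_blob now takes its collaborators explicitly instead of a session. A usage sketch under that new signature (the wrapper function and its arguments are placeholders):

    from twisted.internet import defer
    from lbrynet.core.StreamDescriptor import download_sd_blob

    @defer.inlineCallbacks
    def fetch_sd_blob(sd_hash, blob_manager, peer_finder, rate_limiter, payment_rate_manager, wallet):
        # download_mirrors may be omitted; it falls back to an empty list inside download_sd_blob
        sd_blob = yield download_sd_blob(sd_hash, blob_manager, peer_finder, rate_limiter,
                                         payment_rate_manager, wallet, timeout=30)
        defer.returnValue(sd_blob)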

View file

@@ -37,6 +37,9 @@ class Component(object):
     def running(self):
         return self._running

+    def get_status(self):
+        return
+
     def start(self):
         raise NotImplementedError()
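
get_status() is a new optional hook on Component: the base class returns nothing, and concrete components (DHTComponent and FileManagerComponent below) override it with a small status dict. A minimal sketch of an override; the example class is illustrative:

    class ExampleComponent(Component):       # Component as modified above
        component_name = "example"

        def get_status(self):
            if not self.running:             # running is the existing property on Component
                return
            return {'running': True}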

View file

@@ -8,7 +8,9 @@ from twisted.internet import defer, threads, reactor, error
 from lbryum.simple_config import SimpleConfig
 from lbryum.constants import HEADERS_URL, HEADER_SIZE
 from lbrynet import conf
-from lbrynet.core.Session import Session
+from lbrynet.core.PaymentRateManager import OnlyFreePaymentsManager
+from lbrynet.core.RateLimiter import RateLimiter
+from lbrynet.core.BlobManager import DiskBlobManager
 from lbrynet.core.StreamDescriptor import StreamDescriptorIdentifier, EncryptedFileStreamType
 from lbrynet.core.Wallet import LBRYumWallet
 from lbrynet.core.server.BlobRequestHandler import BlobRequestHandlerFactory
@@ -30,9 +32,9 @@ log = logging.getLogger(__name__)

 # settings must be initialized before this file is imported
 DATABASE_COMPONENT = "database"
+BLOB_COMPONENT = "blob_manager"
 HEADERS_COMPONENT = "blockchain_headers"
 WALLET_COMPONENT = "wallet"
-SESSION_COMPONENT = "session"
 DHT_COMPONENT = "dht"
 HASH_ANNOUNCER_COMPONENT = "hash_announcer"
 STREAM_IDENTIFIER_COMPONENT = "stream_identifier"
@@ -41,6 +43,10 @@ PEER_PROTOCOL_SERVER_COMPONENT = "peer_protocol_server"
 REFLECTOR_COMPONENT = "reflector"
 UPNP_COMPONENT = "upnp"
 EXCHANGE_RATE_MANAGER_COMPONENT = "exchange_rate_manager"
+RATE_LIMITER_COMPONENT = "rate_limiter"
+PAYMENT_RATE_COMPONENT = "payment_rate_manager"


 def get_wallet_config():
     wallet_type = GCS('wallet')
     if wallet_type == conf.LBRYCRD_WALLET:
@@ -334,40 +340,26 @@ class WalletComponent(Component):
         self.wallet = None


-class SessionComponent(Component):
-    component_name = SESSION_COMPONENT
-    depends_on = [DATABASE_COMPONENT, WALLET_COMPONENT, DHT_COMPONENT, HASH_ANNOUNCER_COMPONENT]
+class BlobComponent(Component):
+    component_name = BLOB_COMPONENT
+    depends_on = [DATABASE_COMPONENT, DHT_COMPONENT]

     def __init__(self, component_manager):
         Component.__init__(self, component_manager)
-        self.session = None
+        self.blob_manager = None

     @property
     def component(self):
-        return self.session
+        return self.blob_manager

-    @defer.inlineCallbacks
     def start(self):
-        self.session = Session(
-            GCS('data_rate'),
-            db_dir=GCS('data_dir'),
-            node_id=CS.get_node_id(),
-            blob_dir=CS.get_blobfiles_dir(),
-            dht_node=self.component_manager.get_component(DHT_COMPONENT),
-            hash_announcer=self.component_manager.get_component(HASH_ANNOUNCER_COMPONENT),
-            dht_node_port=GCS('dht_node_port'),
-            known_dht_nodes=GCS('known_dht_nodes'),
-            peer_port=GCS('peer_port'),
-            wallet=self.component_manager.get_component(WALLET_COMPONENT),
-            external_ip=CS.get_external_ip(),
-            storage=self.component_manager.get_component(DATABASE_COMPONENT),
-            download_mirrors=GCS('download_mirrors')
-        )
-        yield self.session.setup()
+        storage = self.component_manager.get_component(DATABASE_COMPONENT)
+        dht_node = self.component_manager.get_component(DHT_COMPONENT)
+        self.blob_manager = DiskBlobManager(CS.get_blobfiles_dir(), storage, dht_node._dataStore)
+        return self.blob_manager.setup()

-    @defer.inlineCallbacks
     def stop(self):
-        yield self.session.shut_down()
+        return self.blob_manager.stop()


 class DHTComponent(Component):
@@ -384,6 +376,12 @@ class DHTComponent(Component):
     def component(self):
         return self.dht_node

+    def get_status(self):
+        return {
+            'node_id': CS.get_node_id().encode('hex'),
+            'peers_in_routing_table': 0 if not self.dht_node else len(self.dht_node.contacts)
+        }
+
     @defer.inlineCallbacks
     def start(self):
         self.upnp_component = self.component_manager.get_component(UPNP_COMPONENT)
@@ -435,9 +433,29 @@ class HashAnnouncerComponent(Component):
         yield self.hash_announcer.stop()


+class RateLimiterComponent(Component):
+    component_name = RATE_LIMITER_COMPONENT
+
+    def __init__(self, component_manager):
+        Component.__init__(self, component_manager)
+        self.rate_limiter = RateLimiter()
+
+    @property
+    def component(self):
+        return self.rate_limiter
+
+    def start(self):
+        self.rate_limiter.start()
+        return defer.succeed(None)
+
+    def stop(self):
+        self.rate_limiter.stop()
+        return defer.succeed(None)
+
+
 class StreamIdentifierComponent(Component):
     component_name = STREAM_IDENTIFIER_COMPONENT
-    depends_on = [SESSION_COMPONENT]
+    depends_on = [DHT_COMPONENT, RATE_LIMITER_COMPONENT, BLOB_COMPONENT, DATABASE_COMPONENT, WALLET_COMPONENT]

     def __init__(self, component_manager):
         Component.__init__(self, component_manager)
@@ -449,14 +467,19 @@ class StreamIdentifierComponent(Component):

     @defer.inlineCallbacks
     def start(self):
-        session = self.component_manager.get_component(SESSION_COMPONENT)
+        dht_node = self.component_manager.get_component(DHT_COMPONENT)
+        rate_limiter = self.component_manager.get_component(RATE_LIMITER_COMPONENT)
+        blob_manager = self.component_manager.get_component(BLOB_COMPONENT)
+        storage = self.component_manager.get_component(DATABASE_COMPONENT)
+        wallet = self.component_manager.get_component(WALLET_COMPONENT)
         add_lbry_file_to_sd_identifier(self.sd_identifier)
         file_saver_factory = EncryptedFileSaverFactory(
-            session.peer_finder,
-            session.rate_limiter,
-            session.blob_manager,
-            session.storage,
-            session.wallet,
+            dht_node.peer_finder,
+            rate_limiter,
+            blob_manager,
+            storage,
+            wallet,
             GCS('download_directory')
         )
         yield self.sd_identifier.add_stream_downloader_factory(EncryptedFileStreamType, file_saver_factory)
@@ -465,9 +488,28 @@ class StreamIdentifierComponent(Component):
         pass


+class PaymentRateComponent(Component):
+    component_name = PAYMENT_RATE_COMPONENT
+
+    def __init__(self, component_manager):
+        Component.__init__(self, component_manager)
+        self.payment_rate_manager = OnlyFreePaymentsManager()
+
+    @property
+    def component(self):
+        return self.payment_rate_manager
+
+    def start(self):
+        return defer.succeed(None)
+
+    def stop(self):
+        return defer.succeed(None)
+
+
 class FileManagerComponent(Component):
     component_name = FILE_MANAGER_COMPONENT
-    depends_on = [SESSION_COMPONENT, STREAM_IDENTIFIER_COMPONENT]
+    depends_on = [DHT_COMPONENT, RATE_LIMITER_COMPONENT, BLOB_COMPONENT, DATABASE_COMPONENT, WALLET_COMPONENT,
+                  STREAM_IDENTIFIER_COMPONENT, PAYMENT_RATE_COMPONENT]

     def __init__(self, component_manager):
         Component.__init__(self, component_manager)
@@ -477,12 +519,25 @@ class FileManagerComponent(Component):
     def component(self):
         return self.file_manager

+    def get_status(self):
+        if not self.file_manager:
+            return
+        return {
+            'managed_streams': len(self.file_manager.lbry_files)
+        }
+
     @defer.inlineCallbacks
     def start(self):
-        session = self.component_manager.get_component(SESSION_COMPONENT)
+        dht_node = self.component_manager.get_component(DHT_COMPONENT)
+        rate_limiter = self.component_manager.get_component(RATE_LIMITER_COMPONENT)
+        blob_manager = self.component_manager.get_component(BLOB_COMPONENT)
+        storage = self.component_manager.get_component(DATABASE_COMPONENT)
+        wallet = self.component_manager.get_component(WALLET_COMPONENT)
         sd_identifier = self.component_manager.get_component(STREAM_IDENTIFIER_COMPONENT)
+        payment_rate_manager = self.component_manager.get_component(PAYMENT_RATE_COMPONENT)
         log.info('Starting the file manager')
-        self.file_manager = EncryptedFileManager(session, sd_identifier)
+        self.file_manager = EncryptedFileManager(dht_node.peer_finder, rate_limiter, blob_manager, wallet,
+                                                 payment_rate_manager, storage, sd_identifier)
         yield self.file_manager.setup()
         log.info('Done setting up file manager')
@@ -493,7 +548,8 @@ class FileManagerComponent(Component):

 class PeerProtocolServerComponent(Component):
     component_name = PEER_PROTOCOL_SERVER_COMPONENT
-    depends_on = [SESSION_COMPONENT, UPNP_COMPONENT]
+    depends_on = [UPNP_COMPONENT, DHT_COMPONENT, RATE_LIMITER_COMPONENT, BLOB_COMPONENT, WALLET_COMPONENT,
+                  PAYMENT_RATE_COMPONENT]

     def __init__(self, component_manager):
         Component.__init__(self, component_manager)
@@ -507,17 +563,22 @@ class PeerProtocolServerComponent(Component):
     def start(self):
         query_handlers = {}
         upnp_component = self.component_manager.get_component(UPNP_COMPONENT)
+        dht_node = self.component_manager.get_component(DHT_COMPONENT)
+        rate_limiter = self.component_manager.get_component(RATE_LIMITER_COMPONENT)
+        blob_manager = self.component_manager.get_component(BLOB_COMPONENT)
+        wallet = self.component_manager.get_component(WALLET_COMPONENT)
+        payment_rate_manager = self.component_manager.get_component(PAYMENT_RATE_COMPONENT)
         peer_port, udp_port = upnp_component.get_redirects()
-        session = self.component_manager.get_component(SESSION_COMPONENT)

         handlers = [
             BlobRequestHandlerFactory(
-                session.blob_manager,
-                session.wallet,
-                session.payment_rate_manager,
+                blob_manager,
+                wallet,
+                payment_rate_manager,
                 self.component_manager.analytics_manager
             ),
-            session.wallet.get_wallet_info_query_handler_factory(),
+            wallet.get_wallet_info_query_handler_factory(),
         ]

         for handler in handlers:
@@ -525,7 +586,7 @@ class PeerProtocolServerComponent(Component):
             query_handlers[query_id] = handler

         if peer_port is not None:
-            server_factory = ServerProtocolFactory(session.rate_limiter, query_handlers, session.peer_manager)
+            server_factory = ServerProtocolFactory(rate_limiter, query_handlers, dht_node.peer_manager)

             try:
                 log.info("Peer protocol listening on TCP %d", peer_port)
@@ -547,7 +608,7 @@ class PeerProtocolServerComponent(Component):

 class ReflectorComponent(Component):
     component_name = REFLECTOR_COMPONENT
-    depends_on = [SESSION_COMPONENT, FILE_MANAGER_COMPONENT]
+    depends_on = [DHT_COMPONENT, BLOB_COMPONENT, FILE_MANAGER_COMPONENT]

     def __init__(self, component_manager):
         Component.__init__(self, component_manager)
@@ -561,11 +622,10 @@ class ReflectorComponent(Component):
     @defer.inlineCallbacks
     def start(self):
         log.info("Starting reflector server")
-
-        session = self.component_manager.get_component(SESSION_COMPONENT)
+        dht_node = self.component_manager.get_component(DHT_COMPONENT)
+        blob_manager = self.component_manager.get_component(BLOB_COMPONENT)
         file_manager = self.component_manager.get_component(FILE_MANAGER_COMPONENT)
-        reflector_factory = reflector_server_factory(session.peer_manager, session.blob_manager, file_manager)
+        reflector_factory = reflector_server_factory(dht_node.peer_manager, blob_manager, file_manager)

         try:
             self.reflector_server = yield reactor.listenTCP(self.reflector_server_port, reflector_factory)
             log.info('Started reflector on port %s', self.reflector_server_port)

View file

@@ -25,14 +25,13 @@ from lbryschema.decode import smart_decode
 from lbrynet.core.system_info import get_lbrynet_version
 from lbrynet import conf
 from lbrynet.reflector import reupload
-from lbrynet.daemon.Components import WALLET_COMPONENT, DATABASE_COMPONENT, SESSION_COMPONENT, DHT_COMPONENT
-from lbrynet.daemon.Components import STREAM_IDENTIFIER_COMPONENT, FILE_MANAGER_COMPONENT
-from lbrynet.daemon.Components import EXCHANGE_RATE_MANAGER_COMPONENT
+from lbrynet.daemon.Components import WALLET_COMPONENT, DATABASE_COMPONENT, DHT_COMPONENT, BLOB_COMPONENT
+from lbrynet.daemon.Components import STREAM_IDENTIFIER_COMPONENT, FILE_MANAGER_COMPONENT, RATE_LIMITER_COMPONENT
+from lbrynet.daemon.Components import EXCHANGE_RATE_MANAGER_COMPONENT, PAYMENT_RATE_COMPONENT, UPNP_COMPONENT
 from lbrynet.daemon.ComponentManager import RequiredCondition
 from lbrynet.daemon.Downloader import GetStream
 from lbrynet.daemon.Publisher import Publisher
 from lbrynet.daemon.auth.server import AuthJSONRPCServer
-from lbrynet.core.PaymentRateManager import OnlyFreePaymentsManager
 from lbrynet.core import utils, system_info
 from lbrynet.core.StreamDescriptor import download_sd_blob
 from lbrynet.core.Error import InsufficientFundsError, UnknownNameError
@@ -186,13 +185,16 @@ class Daemon(AuthJSONRPCServer):
     """

     component_attributes = {
+        EXCHANGE_RATE_MANAGER_COMPONENT: "exchange_rate_manager",
         DATABASE_COMPONENT: "storage",
-        SESSION_COMPONENT: "session",
-        WALLET_COMPONENT: "wallet",
         DHT_COMPONENT: "dht_node",
+        WALLET_COMPONENT: "wallet",
         STREAM_IDENTIFIER_COMPONENT: "sd_identifier",
         FILE_MANAGER_COMPONENT: "file_manager",
-        EXCHANGE_RATE_MANAGER_COMPONENT: "exchange_rate_manager",
+        PAYMENT_RATE_COMPONENT: "payment_rate_manager",
+        RATE_LIMITER_COMPONENT: "rate_limiter",
+        BLOB_COMPONENT: "blob_manager",
+        UPNP_COMPONENT: "upnp"
     }

     def __init__(self, analytics_manager=None, component_manager=None):
@@ -218,9 +220,12 @@ class Daemon(AuthJSONRPCServer):
         self.dht_node = None
         self.wallet = None
         self.sd_identifier = None
-        self.session = None
         self.file_manager = None
         self.exchange_rate_manager = None
+        self.payment_rate_manager = None
+        self.rate_limiter = None
+        self.blob_manager = None
+        self.upnp = None

         # TODO: delete this
         self.streams = {}
@@ -254,10 +259,10 @@ class Daemon(AuthJSONRPCServer):
         if not blob_hash:
             raise Exception("Nothing to download")

-        rate_manager = rate_manager or self.session.payment_rate_manager
+        rate_manager = rate_manager or self.payment_rate_manager
         timeout = timeout or 30
         downloader = StandaloneBlobDownloader(
-            blob_hash, self.session.blob_manager, self.session.peer_finder, self.session.rate_limiter,
+            blob_hash, self.blob_manager, self.dht_node.peer_finder, self.rate_limiter,
             rate_manager, self.wallet, timeout
         )
         return downloader.download()
@@ -275,7 +280,7 @@ class Daemon(AuthJSONRPCServer):
         }
         blobs = {}
         try:
-            sd_host = yield self.session.blob_manager.get_host_downloaded_from(sd_hash)
+            sd_host = yield self.blob_manager.get_host_downloaded_from(sd_hash)
         except Exception:
             sd_host = None
         report["sd_blob"] = sd_host
@@ -320,11 +325,11 @@ class Daemon(AuthJSONRPCServer):
         else:
             download_id = utils.random_string()
             self.analytics_manager.send_download_started(download_id, name, claim_dict)
-            self.streams[sd_hash] = GetStream(self.sd_identifier, self.session,
-                                              self.exchange_rate_manager, conf.settings['max_key_fee'],
-                                              conf.settings['disable_max_key_fee'],
-                                              conf.settings['data_rate'], timeout)
+            self.streams[sd_hash] = GetStream(self.sd_identifier, self.wallet, self.exchange_rate_manager,
+                                              self.blob_manager, self.dht_node.peer_finder, self.rate_limiter,
+                                              self.payment_rate_manager, self.storage, conf.settings['max_key_fee'],
+                                              conf.settings['disable_max_key_fee'], conf.settings['data_rate'],
+                                              timeout)
             try:
                 lbry_file, finished_deferred = yield self.streams[sd_hash].start(
                     claim_dict, name, txid, nout, file_name
@@ -350,9 +355,8 @@ class Daemon(AuthJSONRPCServer):
     @defer.inlineCallbacks
     def _publish_stream(self, name, bid, claim_dict, file_path=None, certificate_id=None,
                         claim_address=None, change_address=None):
-
-        publisher = Publisher(self.session, self.file_manager, self.wallet,
-                              certificate_id)
+        publisher = Publisher(self.blob_manager, self.payment_rate_manager, self.storage, self.file_manager,
+                              self.wallet, certificate_id)
         parse_lbry_uri(name)
         if not file_path:
             stream_hash = yield self.storage.get_stream_hash_for_sd_hash(
@@ -388,16 +392,17 @@ class Daemon(AuthJSONRPCServer):

     def _get_or_download_sd_blob(self, blob, sd_hash):
         if blob:
-            return self.session.blob_manager.get_blob(blob[0])
+            return self.blob_manager.get_blob(blob[0])
         return download_sd_blob(
-            self.session, sd_hash, self.session.payment_rate_manager, conf.settings['search_timeout']
+            sd_hash, self.blob_manager, self.dht_node.peer_finder, self.rate_limiter, self.payment_rate_manager,
+            self.wallet, timeout=conf.settings['search_timeout'], download_mirrors=conf.settings['download_mirrors']
         )

     def get_or_download_sd_blob(self, sd_hash):
         """Return previously downloaded sd blob if already in the blob
         manager, otherwise download and return it
         """
-        d = self.session.blob_manager.completed_blobs([sd_hash])
+        d = self.blob_manager.completed_blobs([sd_hash])
         d.addCallback(self._get_or_download_sd_blob, sd_hash)
         return d
@@ -416,7 +421,7 @@ class Daemon(AuthJSONRPCServer):
         Calculate estimated LBC cost for a stream given its size in bytes
         """

-        if self.session.payment_rate_manager.generous:
+        if self.payment_rate_manager.generous:
             return 0.0
         return size / (10 ** 6) * conf.settings['data_rate']
@@ -693,7 +698,7 @@ class Daemon(AuthJSONRPCServer):
                 'blocks_behind': (int) remote_height - local_height,
                 'best_blockhash': (str) block hash of most recent block,
             },
-            'dht_node_status': {
+            'dht': {
                 'node_id': (str) lbry dht node id - hex encoded,
                 'peers_in_routing_table': (int) the number of peers in the routing table,
             },
@@ -708,14 +713,6 @@ class Daemon(AuthJSONRPCServer):
             }
         """

-        # on startup, the wallet or network won't be available but we still need this call to work
-        has_wallet = self.session and self.wallet and self.wallet.network
-        local_height = self.wallet.network.get_local_height() if has_wallet else 0
-        remote_height = self.wallet.network.get_server_height() if has_wallet else 0
-        best_hash = (yield self.wallet.get_best_blockhash()) if has_wallet else None
-        wallet_is_encrypted = has_wallet and self.wallet.wallet and \
-                              self.wallet.wallet.use_encryption
-
         connection_code = CONNECTION_STATUS_CONNECTED if utils.check_connection() else CONNECTION_STATUS_NETWORK
         response = {
             'installation_id': conf.settings.installation_id,
@@ -1285,7 +1282,9 @@ class Daemon(AuthJSONRPCServer):
         response = yield self._render_response(results)
         defer.returnValue(response)

-    @requires(WALLET_COMPONENT, FILE_MANAGER_COMPONENT, SESSION_COMPONENT, conditions=[WALLET_IS_UNLOCKED])
+    @requires(STREAM_IDENTIFIER_COMPONENT, WALLET_COMPONENT, EXCHANGE_RATE_MANAGER_COMPONENT, BLOB_COMPONENT,
+              DHT_COMPONENT, RATE_LIMITER_COMPONENT, PAYMENT_RATE_COMPONENT, DATABASE_COMPONENT,
+              conditions=[WALLET_IS_UNLOCKED])
     @defer.inlineCallbacks
     def jsonrpc_get(self, uri, file_name=None, timeout=None):
         """
@@ -1476,7 +1475,9 @@ class Daemon(AuthJSONRPCServer):
         response = yield self._render_response(result)
         defer.returnValue(response)

-    @requires(WALLET_COMPONENT, SESSION_COMPONENT, conditions=[WALLET_IS_UNLOCKED])
+    @requires(STREAM_IDENTIFIER_COMPONENT, WALLET_COMPONENT, EXCHANGE_RATE_MANAGER_COMPONENT, BLOB_COMPONENT,
+              DHT_COMPONENT, RATE_LIMITER_COMPONENT, PAYMENT_RATE_COMPONENT, DATABASE_COMPONENT,
+              conditions=[WALLET_IS_UNLOCKED])
     @defer.inlineCallbacks
     def jsonrpc_stream_cost_estimate(self, uri, size=None):
         """
@@ -1631,7 +1632,8 @@ class Daemon(AuthJSONRPCServer):
         result = yield self.wallet.import_certificate_info(serialized_certificate_info)
         defer.returnValue(result)

-    @requires(WALLET_COMPONENT, FILE_MANAGER_COMPONENT, SESSION_COMPONENT, conditions=[WALLET_IS_UNLOCKED])
+    @requires(WALLET_COMPONENT, FILE_MANAGER_COMPONENT, BLOB_COMPONENT, PAYMENT_RATE_COMPONENT, DATABASE_COMPONENT,
+              conditions=[WALLET_IS_UNLOCKED])
     @defer.inlineCallbacks
     def jsonrpc_publish(self, name, bid, metadata=None, file_path=None, fee=None, title=None,
                         description=None, author=None, language=None, license=None,
@@ -2514,7 +2516,8 @@ class Daemon(AuthJSONRPCServer):
         d.addCallback(lambda r: self._render_response(r))
         return d

-    @requires(WALLET_COMPONENT, SESSION_COMPONENT, conditions=[WALLET_IS_UNLOCKED])
+    @requires(WALLET_COMPONENT, DHT_COMPONENT, BLOB_COMPONENT, RATE_LIMITER_COMPONENT, PAYMENT_RATE_COMPONENT,
+              conditions=[WALLET_IS_UNLOCKED])
    @defer.inlineCallbacks
    def jsonrpc_blob_get(self, blob_hash, timeout=None, encoding=None, payment_rate_manager=None):
         """
@@ -2545,8 +2548,7 @@
         }
         timeout = timeout or 30
-        payment_rate_manager = get_blob_payment_rate_manager(self.session, payment_rate_manager)
-        blob = yield self._download_blob(blob_hash, rate_manager=payment_rate_manager,
+        blob = yield self._download_blob(blob_hash, rate_manager=self.payment_rate_manager,
                                          timeout=timeout)
         if encoding and encoding in decoders:
             blob_file = blob.open_for_reading()
@@ -2558,7 +2560,7 @@ class Daemon(AuthJSONRPCServer):
         response = yield self._render_response(result)
         defer.returnValue(response)

-    @requires(SESSION_COMPONENT)
+    @requires(BLOB_COMPONENT, DATABASE_COMPONENT)
     @defer.inlineCallbacks
     def jsonrpc_blob_delete(self, blob_hash):
         """
@@ -2574,7 +2576,7 @@ class Daemon(AuthJSONRPCServer):
             (str) Success/fail message
         """

-        if blob_hash not in self.session.blob_manager.blobs:
+        if blob_hash not in self.blob_manager.blobs:
             response = yield self._render_response("Don't have that blob")
             defer.returnValue(response)
         try:
@@ -2582,7 +2584,7 @@ class Daemon(AuthJSONRPCServer):
             yield self.storage.delete_stream(stream_hash)
         except Exception as err:
             pass
-        yield self.session.blob_manager.delete_blobs([blob_hash])
+        yield self.blob_manager.delete_blobs([blob_hash])
         response = yield self._render_response("Deleted %s" % blob_hash)
         defer.returnValue(response)
@@ -2612,7 +2614,7 @@ class Daemon(AuthJSONRPCServer):
             err.trap(defer.TimeoutError)
             return []

-        finished_deferred.addTimeout(timeout or conf.settings['peer_search_timeout'], self.session.dht_node.clock)
+        finished_deferred.addTimeout(timeout or conf.settings['peer_search_timeout'], self.dht_node.clock)
         finished_deferred.addErrback(trap_timeout)
         peers = yield finished_deferred
         results = [
@@ -2625,7 +2627,7 @@ class Daemon(AuthJSONRPCServer):
         ]
         defer.returnValue(results)

-    @requires(SESSION_COMPONENT, DHT_COMPONENT, conditions=[DHT_HAS_CONTACTS])
+    @requires(DATABASE_COMPONENT)
     @defer.inlineCallbacks
     def jsonrpc_blob_announce(self, blob_hash=None, stream_hash=None, sd_hash=None):
         """
@@ -2698,7 +2700,7 @@ class Daemon(AuthJSONRPCServer):
         results = yield reupload.reflect_file(lbry_file, reflector_server=reflector_server)
         defer.returnValue(results)

-    @requires(SESSION_COMPONENT, WALLET_COMPONENT)
+    @requires(BLOB_COMPONENT, WALLET_COMPONENT)
     @defer.inlineCallbacks
     def jsonrpc_blob_list(self, uri=None, stream_hash=None, sd_hash=None, needed=None,
                           finished=None, page_size=None, page=None):
@@ -2737,16 +2739,16 @@ class Daemon(AuthJSONRPCServer):
             if stream_hash:
                 crypt_blobs = yield self.storage.get_blobs_for_stream(stream_hash)
                 blobs = yield defer.gatherResults([
-                    self.session.blob_manager.get_blob(crypt_blob.blob_hash, crypt_blob.length)
+                    self.blob_manager.get_blob(crypt_blob.blob_hash, crypt_blob.length)
                     for crypt_blob in crypt_blobs if crypt_blob.blob_hash is not None
                 ])
             else:
                 blobs = []
             # get_blobs_for_stream does not include the sd blob, so we'll add it manually
-            if sd_hash in self.session.blob_manager.blobs:
-                blobs = [self.session.blob_manager.blobs[sd_hash]] + blobs
+            if sd_hash in self.blob_manager.blobs:
+                blobs = [self.blob_manager.blobs[sd_hash]] + blobs
         else:
-            blobs = self.session.blob_manager.blobs.itervalues()
+            blobs = self.blob_manager.blobs.itervalues()

         if needed:
             blobs = [blob for blob in blobs if not blob.get_is_verified()]
@@ -2762,7 +2764,7 @@ class Daemon(AuthJSONRPCServer):
         response = yield self._render_response(blob_hashes_for_return)
         defer.returnValue(response)

-    @requires(SESSION_COMPONENT)
+    @requires(BLOB_COMPONENT)
     def jsonrpc_blob_reflect(self, blob_hashes, reflector_server=None):
         """
         Reflects specified blobs
@@ -2777,11 +2779,11 @@ class Daemon(AuthJSONRPCServer):
             (list) reflected blob hashes
         """

-        d = reupload.reflect_blob_hashes(blob_hashes, self.session.blob_manager, reflector_server)
+        d = reupload.reflect_blob_hashes(blob_hashes, self.blob_manager, reflector_server)
         d.addCallback(lambda r: self._render_response(r))
         return d

-    @requires(SESSION_COMPONENT)
+    @requires(BLOB_COMPONENT)
     def jsonrpc_blob_reflect_all(self):
         """
         Reflects all saved blobs
@@ -2796,8 +2798,8 @@ class Daemon(AuthJSONRPCServer):
             (bool) true if successful
         """

-        d = self.session.blob_manager.get_all_verified_blobs()
-        d.addCallback(reupload.reflect_blob_hashes, self.session.blob_manager)
+        d = self.blob_manager.get_all_verified_blobs()
+        d.addCallback(reupload.reflect_blob_hashes, self.blob_manager)
         d.addCallback(lambda r: self._render_response(r))
         return d
@@ -2943,7 +2945,7 @@ class Daemon(AuthJSONRPCServer):

         return self._blob_availability(blob_hash, search_timeout, blob_timeout)

-    @requires(SESSION_COMPONENT, WALLET_COMPONENT, DHT_COMPONENT, conditions=[WALLET_IS_UNLOCKED])
+    @requires(UPNP_COMPONENT, WALLET_COMPONENT, DHT_COMPONENT, conditions=[WALLET_IS_UNLOCKED])
     @AuthJSONRPCServer.deprecated("stream_availability")
     def jsonrpc_get_availability(self, uri, sd_timeout=None, peer_timeout=None):
         """
@@ -2964,7 +2966,7 @@ class Daemon(AuthJSONRPCServer):

         return self.jsonrpc_stream_availability(uri, peer_timeout, sd_timeout)

-    @requires(SESSION_COMPONENT, WALLET_COMPONENT, DHT_COMPONENT, conditions=[WALLET_IS_UNLOCKED])
+    @requires(UPNP_COMPONENT, WALLET_COMPONENT, DHT_COMPONENT, conditions=[WALLET_IS_UNLOCKED])
     @defer.inlineCallbacks
     def jsonrpc_stream_availability(self, uri, search_timeout=None, blob_timeout=None):
         """
@@ -3012,7 +3014,7 @@ class Daemon(AuthJSONRPCServer):
             'head_blob_hash': None,
             'head_blob_availability': {},
             'use_upnp': conf.settings['use_upnp'],
-            'upnp_redirect_is_set': len(self.session.upnp_redirects) > 0,
+            'upnp_redirect_is_set': len(self.upnp.get_redirects()) > 0,
             'error': None
         }
@@ -3042,7 +3044,7 @@ class Daemon(AuthJSONRPCServer):
         response['sd_hash'] = sd_hash
         head_blob_hash = None
         downloader = self._get_single_peer_downloader()
-        have_sd_blob = sd_hash in self.session.blob_manager.blobs
+        have_sd_blob = sd_hash in self.blob_manager.blobs
         try:
             sd_blob = yield self.jsonrpc_blob_get(sd_hash, timeout=blob_timeout,
                                                   encoding="json")
@@ -3141,17 +3143,6 @@ def iter_lbry_file_search_values(search_fields):
         yield searchtype, value


-def get_blob_payment_rate_manager(session, payment_rate_manager=None):
-    if payment_rate_manager:
-        rate_managers = {
-            'only-free': OnlyFreePaymentsManager()
-        }
-        if payment_rate_manager in rate_managers:
-            payment_rate_manager = rate_managers[payment_rate_manager]
-            log.info("Downloading blob with rate manager: %s", payment_rate_manager)
-    return payment_rate_manager or session.payment_rate_manager
-
-
 def create_key_getter(field):
     search_path = field.split('.')
     def key_getter(value):
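
In Daemon.py the @requires decorators now list the exact components each RPC touches, and component_attributes maps those components onto daemon attributes (BLOB_COMPONENT -> self.blob_manager, DATABASE_COMPONENT -> self.storage, and so on). A sketch of a hypothetical method on Daemon following that pattern; the method itself is not part of this commit:

    @requires(BLOB_COMPONENT, DATABASE_COMPONENT)
    @defer.inlineCallbacks
    def jsonrpc_blob_example(self, blob_hash):
        # self.blob_manager and self.storage exist because component_attributes maps them in
        blob = yield self.blob_manager.get_blob(blob_hash)
        response = yield self._render_response(blob.blob_hash)
        defer.returnValue(response)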

View file

@@ -30,8 +30,8 @@ log = logging.getLogger(__name__)


 class GetStream(object):
-    def __init__(self, sd_identifier, session, exchange_rate_manager,
-                 max_key_fee, disable_max_key_fee, data_rate=None, timeout=None):
+    def __init__(self, sd_identifier, wallet, exchange_rate_manager, blob_manager, peer_finder, rate_limiter,
+                 payment_rate_manager, storage, max_key_fee, disable_max_key_fee, data_rate=None, timeout=None):

         self.timeout = timeout or conf.settings['download_timeout']
         self.data_rate = data_rate or conf.settings['data_rate']
@@ -41,11 +41,14 @@ class GetStream(object):
         self.timeout_counter = 0
         self.code = None
         self.sd_hash = None
-        self.session = session
-        self.wallet = self.session.wallet
+        self.blob_manager = blob_manager
+        self.peer_finder = peer_finder
+        self.rate_limiter = rate_limiter
+        self.wallet = wallet
         self.exchange_rate_manager = exchange_rate_manager
-        self.payment_rate_manager = self.session.payment_rate_manager
+        self.payment_rate_manager = payment_rate_manager
         self.sd_identifier = sd_identifier
+        self.storage = storage
         self.downloader = None
         self.checker = LoopingCall(self.check_status)
@@ -174,15 +177,16 @@ class GetStream(object):

     @defer.inlineCallbacks
     def _download_sd_blob(self):
-        sd_blob = yield download_sd_blob(self.session, self.sd_hash,
-                                         self.payment_rate_manager, self.timeout)
+        sd_blob = yield download_sd_blob(self.sd_hash, self.blob_manager, self.peer_finder, self.rate_limiter,
+                                         self.payment_rate_manager, self.wallet, self.timeout,
+                                         conf.settings['download_mirrors'])
         defer.returnValue(sd_blob)

     @defer.inlineCallbacks
     def _download(self, sd_blob, name, key_fee, txid, nout, file_name=None):
         self.downloader = yield self._create_downloader(sd_blob, file_name=file_name)
         yield self.pay_key_fee(key_fee, name)
-        yield self.session.storage.save_content_claim(self.downloader.stream_hash, "%s:%i" % (txid, nout))
+        yield self.storage.save_content_claim(self.downloader.stream_hash, "%s:%i" % (txid, nout))

         log.info("Downloading lbry://%s (%s) --> %s", name, self.sd_hash[:6], self.download_path)
         self.finished_deferred = self.downloader.start()
         self.finished_deferred.addCallbacks(lambda result: self.finish(result, name), self.fail)
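
GetStream is now constructed with the explicit dependency list above, mirroring the updated call in Daemon._download_name; every variable here stands in for the corresponding daemon component:

    stream = GetStream(sd_identifier, wallet, exchange_rate_manager,
                       blob_manager, peer_finder, rate_limiter,
                       payment_rate_manager, storage,
                       conf.settings['max_key_fee'], conf.settings['disable_max_key_fee'],
                       data_rate=conf.settings['data_rate'], timeout=None)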

View file

@@ -11,8 +11,10 @@ log = logging.getLogger(__name__)


 class Publisher(object):
-    def __init__(self, session, lbry_file_manager, wallet, certificate_id):
-        self.session = session
+    def __init__(self, blob_manager, payment_rate_manager, storage, lbry_file_manager, wallet, certificate_id):
+        self.blob_manager = blob_manager
+        self.payment_rate_manager = payment_rate_manager
+        self.storage = storage
         self.lbry_file_manager = lbry_file_manager
         self.wallet = wallet
         self.certificate_id = certificate_id
@@ -30,8 +32,8 @@ class Publisher(object):
         file_name = os.path.basename(file_path)
         with file_utils.get_read_handle(file_path) as read_handle:
-            self.lbry_file = yield create_lbry_file(self.session, self.lbry_file_manager, file_name,
-                                                    read_handle)
+            self.lbry_file = yield create_lbry_file(self.blob_manager, self.storage, self.payment_rate_manager,
+                                                    self.lbry_file_manager, file_name, read_handle)

         if 'source' not in claim_dict['stream']:
             claim_dict['stream']['source'] = {}
@@ -42,7 +44,7 @@ class Publisher(object):
         claim_out = yield self.make_claim(name, bid, claim_dict, claim_address, change_address)

         # check if we have a file already for this claim (if this is a publish update with a new stream)
-        old_stream_hashes = yield self.session.storage.get_old_stream_hashes_for_claim_id(claim_out['claim_id'],
-                                                                                          self.lbry_file.stream_hash)
+        old_stream_hashes = yield self.storage.get_old_stream_hashes_for_claim_id(claim_out['claim_id'],
+                                                                                  self.lbry_file.stream_hash)
         if old_stream_hashes:
             for lbry_file in filter(lambda l: l.stream_hash in old_stream_hashes,
@@ -50,7 +52,7 @@ class Publisher(object):
                 yield self.lbry_file_manager.delete_lbry_file(lbry_file, delete_file=False)
                 log.info("Removed old stream for claim update: %s", lbry_file.stream_hash)

-        yield self.session.storage.save_content_claim(
+        yield self.storage.save_content_claim(
             self.lbry_file.stream_hash, "%s:%i" % (claim_out['txid'], claim_out['nout'])
         )
         defer.returnValue(claim_out)
@@ -60,7 +62,7 @@ class Publisher(object):
         """Make a claim without creating a lbry file"""
         claim_out = yield self.make_claim(name, bid, claim_dict, claim_address, change_address)
         if stream_hash:  # the stream_hash returned from the db will be None if this isn't a stream we have
-            yield self.session.storage.save_content_claim(stream_hash, "%s:%i" % (claim_out['txid'],
-                                                                                  claim_out['nout']))
+            yield self.storage.save_content_claim(stream_hash, "%s:%i" % (claim_out['txid'],
                                                                           claim_out['nout']))
             self.lbry_file = [f for f in self.lbry_file_manager.lbry_files if f.stream_hash == stream_hash][0]
         defer.returnValue(claim_out)
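
Publisher's constructor likewise takes the blob manager, payment rate manager and storage directly, matching the updated call in Daemon._publish_stream (the variables are placeholders for daemon components):

    publisher = Publisher(blob_manager, payment_rate_manager, storage,
                          lbry_file_manager, wallet, certificate_id=None)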

View file

@@ -59,7 +59,8 @@ class EncryptedFileStreamCreator(CryptStreamCreator):
 # we can simply read the file from the disk without needing to
 # involve reactor.
 @defer.inlineCallbacks
-def create_lbry_file(session, lbry_file_manager, file_name, file_handle, key=None, iv_generator=None):
+def create_lbry_file(blob_manager, storage, payment_rate_manager, lbry_file_manager, file_name, file_handle,
+                     key=None, iv_generator=None):
     """Turn a plain file into an LBRY File.

     An LBRY File is a collection of encrypted blobs of data and the metadata that binds them
@@ -98,7 +99,7 @@ def create_lbry_file(session, lbry_file_manager, file_name, file_handle, key=None, iv_generator=None):
     file_directory = os.path.dirname(file_handle.name)

     lbry_file_creator = EncryptedFileStreamCreator(
-        session.blob_manager, lbry_file_manager, base_file_name, key, iv_generator
+        blob_manager, lbry_file_manager, base_file_name, key, iv_generator
     )

     yield lbry_file_creator.setup()
@@ -114,18 +115,18 @@ def create_lbry_file(session, lbry_file_manager, file_name, file_handle, key=None, iv_generator=None):
     log.debug("making the sd blob")
     sd_info = lbry_file_creator.sd_info
-    descriptor_writer = BlobStreamDescriptorWriter(session.blob_manager)
+    descriptor_writer = BlobStreamDescriptorWriter(blob_manager)
     sd_hash = yield descriptor_writer.create_descriptor(sd_info)

     log.debug("saving the stream")
-    yield session.storage.store_stream(
+    yield storage.store_stream(
         sd_info['stream_hash'], sd_hash, sd_info['stream_name'], sd_info['key'],
         sd_info['suggested_file_name'], sd_info['blobs']
     )

     log.debug("adding to the file manager")
     lbry_file = yield lbry_file_manager.add_published_file(
-        sd_info['stream_hash'], sd_hash, binascii.hexlify(file_directory), session.payment_rate_manager,
-        session.payment_rate_manager.min_blob_data_payment_rate
+        sd_info['stream_hash'], sd_hash, binascii.hexlify(file_directory), payment_rate_manager,
+        payment_rate_manager.min_blob_data_payment_rate
     )
     defer.returnValue(lbry_file)
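
create_lbry_file is now called with the blob manager, storage and payment rate manager in place of the session. A sketch of a caller under the new signature; the wrapper uses a plain open() for brevity, whereas Publisher goes through file_utils.get_read_handle:

    import os
    from twisted.internet import defer

    @defer.inlineCallbacks
    def publish_path(blob_manager, storage, payment_rate_manager, lbry_file_manager, path):
        with open(path, 'rb') as read_handle:
            lbry_file = yield create_lbry_file(blob_manager, storage, payment_rate_manager,
                                               lbry_file_manager, os.path.basename(path), read_handle)
        defer.returnValue(lbry_file)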

View file

@@ -28,15 +28,18 @@ class EncryptedFileManager(object):
     # when reflecting files, reflect up to this many files at a time
     CONCURRENT_REFLECTS = 5

-    def __init__(self, session, sd_identifier):
+    def __init__(self, peer_finder, rate_limiter, blob_manager, wallet, payment_rate_manager, storage, sd_identifier):
         self.auto_re_reflect = conf.settings['reflect_uploads'] and conf.settings['auto_re_reflect_interval'] > 0
         self.auto_re_reflect_interval = conf.settings['auto_re_reflect_interval']
-        self.session = session
-        self.storage = session.storage
+        self.download_mirrors = conf.settings['download_mirrors']
+        self.peer_finder = peer_finder
+        self.rate_limiter = rate_limiter
+        self.blob_manager = blob_manager
+        self.wallet = wallet
+        self.payment_rate_manager = payment_rate_manager
+        self.storage = storage
         # TODO: why is sd_identifier part of the file manager?
         self.sd_identifier = sd_identifier
-        assert sd_identifier
         self.lbry_files = []
         self.lbry_file_reflector = task.LoopingCall(self.reflect_lbry_files)
@@ -47,14 +50,14 @@ class EncryptedFileManager(object):
         log.info("Started file manager")

     def get_lbry_file_status(self, lbry_file):
-        return self.session.storage.get_lbry_file_status(lbry_file.rowid)
+        return self.storage.get_lbry_file_status(lbry_file.rowid)

     def set_lbry_file_data_payment_rate(self, lbry_file, new_rate):
-        return self.session.storage(lbry_file.rowid, new_rate)
+        return self.storage(lbry_file.rowid, new_rate)

     def change_lbry_file_status(self, lbry_file, status):
         log.debug("Changing status of %s to %s", lbry_file.stream_hash, status)
-        return self.session.storage.change_file_status(lbry_file.rowid, status)
+        return self.storage.change_file_status(lbry_file.rowid, status)

     def get_lbry_file_status_reports(self):
         ds = []
@@ -80,20 +83,20 @@ class EncryptedFileManager(object):
         return ManagedEncryptedFileDownloader(
             rowid,
             stream_hash,
-            self.session.peer_finder,
-            self.session.rate_limiter,
-            self.session.blob_manager,
-            self.session.storage,
+            self.peer_finder,
+            self.rate_limiter,
+            self.blob_manager,
+            self.storage,
             self,
             payment_rate_manager,
-            self.session.wallet,
+            self.wallet,
             download_directory,
             file_name,
             stream_name=stream_name,
             sd_hash=sd_hash,
             key=key,
             suggested_file_name=suggested_file_name,
-            download_mirrors=self.session.download_mirrors
+            download_mirrors=self.download_mirrors
         )

     def _start_lbry_file(self, file_info, payment_rate_manager, claim_info):
@@ -116,9 +119,9 @@ class EncryptedFileManager(object):

     @defer.inlineCallbacks
     def _start_lbry_files(self):
-        files = yield self.session.storage.get_all_lbry_files()
-        claim_infos = yield self.session.storage.get_claims_from_stream_hashes([file['stream_hash'] for file in files])
-        prm = self.session.payment_rate_manager
+        files = yield self.storage.get_all_lbry_files()
+        claim_infos = yield self.storage.get_claims_from_stream_hashes([file['stream_hash'] for file in files])
+        prm = self.payment_rate_manager

         log.info("Starting %i files", len(files))
         for file_info in files:
@@ -154,7 +157,7 @@ class EncryptedFileManager(object):
     @defer.inlineCallbacks
     def add_published_file(self, stream_hash, sd_hash, download_directory, payment_rate_manager, blob_data_rate):
         status = ManagedEncryptedFileDownloader.STATUS_FINISHED
-        stream_metadata = yield get_sd_info(self.session.storage, stream_hash, include_blobs=False)
+        stream_metadata = yield get_sd_info(self.storage, stream_hash, include_blobs=False)
         key = stream_metadata['key']
         stream_name = stream_metadata['stream_name']
         file_name = stream_metadata['suggested_file_name']
@@ -175,9 +178,9 @@ class EncryptedFileManager(object):
     def add_downloaded_file(self, stream_hash, sd_hash, download_directory, payment_rate_manager=None,
                             blob_data_rate=None, status=None, file_name=None):
         status = status or ManagedEncryptedFileDownloader.STATUS_STOPPED
-        payment_rate_manager = payment_rate_manager or self.session.payment_rate_manager
+        payment_rate_manager = payment_rate_manager or self.payment_rate_manager
         blob_data_rate = blob_data_rate or payment_rate_manager.min_blob_data_payment_rate
-        stream_metadata = yield get_sd_info(self.session.storage, stream_hash, include_blobs=False)
+        stream_metadata = yield get_sd_info(self.storage, stream_hash, include_blobs=False)
         key = stream_metadata['key']
         stream_name = stream_metadata['stream_name']
         file_name = file_name or stream_metadata['suggested_file_name']
@@ -187,7 +190,7 @@ class EncryptedFileManager(object):
         rowid = yield self.storage.save_downloaded_file(
             stream_hash, os.path.basename(file_name.decode('hex')).encode('hex'), download_directory, blob_data_rate
         )
-        file_name = yield self.session.storage.get_filename_for_rowid(rowid)
+        file_name = yield self.storage.get_filename_for_rowid(rowid)
         lbry_file = self._get_lbry_file(
             rowid, stream_hash, payment_rate_manager, sd_hash, key, stream_name, file_name, download_directory,
             stream_metadata['suggested_file_name']
@@ -222,7 +225,7 @@ class EncryptedFileManager(object):
             del self.storage.content_claim_callbacks[lbry_file.stream_hash]

         yield lbry_file.delete_data()
-        yield self.session.storage.delete_stream(lbry_file.stream_hash)
+        yield self.storage.delete_stream(lbry_file.stream_hash)

         if delete_file and os.path.isfile(full_path):
             os.remove(full_path)
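
EncryptedFileManager is wired up the same way FileManagerComponent.start now does it. A sketch of that construction, where each argument is a component resolved from the ComponentManager and peer_finder comes off the DHT node:

    file_manager = EncryptedFileManager(dht_node.peer_finder, rate_limiter, blob_manager, wallet,
                                        payment_rate_manager, storage, sd_identifier)
    d = file_manager.setup()    # returns a Deferred, yielded by the component's start()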