diff --git a/lbrynet/core/Wallet.py b/lbrynet/core/Wallet.py index f02a4dcfa..61d16ee96 100644 --- a/lbrynet/core/Wallet.py +++ b/lbrynet/core/Wallet.py @@ -482,7 +482,7 @@ class Wallet(object): return defer.succeed(None) if 'error' in result: - log.warning("Got an error looking up a name: %s", result['error']) + log.warning("Got an error looking up lbry://%s: %s", name, result['error']) return Failure(UnknownNameError(name)) _check_result_fields(result) try: diff --git a/lbrynet/core/file_utils.py b/lbrynet/core/file_utils.py index d779fa674..b7611763a 100644 --- a/lbrynet/core/file_utils.py +++ b/lbrynet/core/file_utils.py @@ -1,6 +1,8 @@ import os import sys import subprocess +from contextlib import contextmanager + def start(path): """ @@ -33,3 +35,18 @@ def reveal(path): subprocess.Popen(['xdg-open', os.path.dirname(path)]) elif sys.platform == 'win32': subprocess.Popen(['explorer', '/select', path]) + + +@contextmanager +def get_read_handle(path): + """ + Get os independent read handle for a file + """ + + if os.name == "nt": + file_mode = 'rb' + else: + file_mode = 'r' + read_handle = open(path, file_mode) + yield read_handle + read_handle.close() diff --git a/lbrynet/lbrynet_daemon/Daemon.py b/lbrynet/lbrynet_daemon/Daemon.py index cf4d15ef5..d5958b89d 100644 --- a/lbrynet/lbrynet_daemon/Daemon.py +++ b/lbrynet/lbrynet_daemon/Daemon.py @@ -16,7 +16,6 @@ from twisted.web import server from twisted.internet import defer, threads, error, reactor, task from twisted.internet.task import LoopingCall from twisted.python.failure import Failure -from jsonschema import ValidationError # TODO: importing this when internet is disabled raises a socket.gaierror from lbryum.version import LBRYUM_VERSION as lbryum_version @@ -24,7 +23,7 @@ from lbrynet import __version__ as lbrynet_version from lbrynet import conf, reflector, analytics from lbrynet.conf import LBRYCRD_WALLET, LBRYUM_WALLET, PTC_WALLET from lbrynet.metadata.Fee import FeeValidator -from lbrynet.metadata.Metadata import Metadata, verify_name_characters +from lbrynet.metadata.Metadata import verify_name_characters from lbrynet.lbryfile.client.EncryptedFileDownloader import EncryptedFileSaverFactory from lbrynet.lbryfile.client.EncryptedFileDownloader import EncryptedFileOpenerFactory from lbrynet.lbryfile.client.EncryptedFileOptions import add_lbry_file_to_sd_identifier @@ -369,7 +368,7 @@ class Daemon(AuthJSONRPCServer): self.connection_status_code = CONNECTION_STATUS_NETWORK # claim_out is dictionary containing 'txid' and 'nout' - def _add_to_pending_claims(self, name, claim_out): + def _add_to_pending_claims(self, claim_out, name): txid = claim_out['txid'] nout = claim_out['nout'] log.info("Adding lbry://%s to pending claims, txid %s nout %d" % (name, txid, nout)) @@ -397,7 +396,7 @@ class Daemon(AuthJSONRPCServer): log.warning("Re-add %s to pending claims", name) txid, nout = self.pending_claims.pop(name) claim_out = {'txid': txid, 'nout': nout} - self._add_to_pending_claims(name, claim_out) + self._add_to_pending_claims(claim_out, name) def _process_lbry_file(name, lbry_file): # lbry_file is an instance of ManagedEncryptedFileDownloader or None @@ -774,6 +773,26 @@ class Daemon(AuthJSONRPCServer): sd_hash, file_path = yield helper.wait_or_get_stream(stream_info, lbry_file) defer.returnValue((sd_hash, file_path)) + @defer.inlineCallbacks + def _publish_stream(self, name, bid, metadata, file_path=None, fee=None): + publisher = Publisher(self.session, self.lbry_file_manager, self.session.wallet) + verify_name_characters(name) + if bid <= 0.0: + raise Exception("Invalid bid") + if fee: + metadata = yield publisher.add_fee_to_metadata(metadata, fee) + if not file_path: + claim_out = yield publisher.update_stream(name, bid, metadata) + else: + claim_out = yield publisher.publish_stream(name, file_path, bid, metadata) + yield threads.deferToThread(self._reflect, publisher.lbry_file) + + log.info("Success! Published to lbry://%s txid: %s nout: %d", name, claim_out['txid'], + claim_out['nout']) + yield self._add_to_pending_claims(claim_out, name) + self.looping_call_manager.start(Checker.PENDING_CLAIM, 30) + defer.returnValue(claim_out) + def add_stream(self, name, timeout, download_directory, file_name, stream_info): """Makes, adds and starts a stream""" self.streams[name] = GetStream(self.sd_identifier, @@ -1640,16 +1659,17 @@ class Daemon(AuthJSONRPCServer): return d @AuthJSONRPCServer.auth_required - def jsonrpc_publish(self, name, file_path, bid, metadata, fee=None): + def jsonrpc_publish(self, name, bid, metadata, file_path=None, fee=None): """ Make a new name claim and publish associated data to lbrynet Args: - 'name': name to be claimed, string - 'file_path': path to file to be associated with name, string - 'bid': amount of credits to commit in this claim, float - 'metadata': metadata dictionary - optional 'fee' + 'name': str, name to be claimed, string + 'bid': float, amount of credits to commit in this claim, + 'metadata': dict, Metadata compliant (can be missing sources if a file is provided) + 'file_path' (optional): str, path to file to be associated with name, if not given + the stream from your existing claim for the name will be used + 'fee' (optional): dict, FeeValidator compliant Returns: 'success' : True if claim was succesful , False otherwise 'reason' : if not succesful, give reason @@ -1659,17 +1679,6 @@ class Daemon(AuthJSONRPCServer): 'claim_id' : claim id of the resulting transaction """ - def _set_address(address, currency, m): - log.info("Generated new address for key fee: " + str(address)) - m['fee'][currency]['address'] = address - return m - - def _reflect_if_possible(sd_hash, claim_out): - d = self._get_lbry_file(FileID.SD_HASH, sd_hash, return_json=False) - d.addCallback(self._reflect) - d.addCallback(lambda _: claim_out) - return d - log.info("Publish: %s", { 'name': name, 'file_path': file_path, @@ -1677,49 +1686,9 @@ class Daemon(AuthJSONRPCServer): 'metadata': metadata, 'fee': fee, }) - verify_name_characters(name) - if bid <= 0.0: - return defer.fail(Exception("Invalid bid")) - - try: - metadata = Metadata(metadata) - make_lbry_file = False - sd_hash = metadata['sources']['lbry_sd_hash'] - log.info("Update publish for %s using existing stream", name) - except ValidationError: - make_lbry_file = True - sd_hash = None - if not file_path: - raise Exception("No file given to publish") - if not os.path.isfile(file_path): - raise Exception("Specified file for publish doesnt exist: %s" % file_path) - - self.looping_call_manager.start(Checker.PENDING_CLAIM, 30) - - d = self._resolve_name(name, force_refresh=True) - d.addErrback(lambda _: None) - - if fee is not None: - metadata['fee'] = fee - assert len(metadata['fee']) == 1, "Too many fees" - for c in metadata['fee']: - if 'address' not in metadata['fee'][c]: - d.addCallback(lambda _: self.session.wallet.get_new_address()) - d.addCallback(lambda addr: _set_address(addr, c, metadata)) - else: - d.addCallback(lambda _: metadata) - if make_lbry_file: - pub = Publisher(self.session, self.lbry_file_manager, self.session.wallet) - d.addCallback(lambda meta: pub.start(name, file_path, bid, meta)) - else: - d.addCallback(lambda meta: self.session.wallet.claim_name(name, bid, meta)) - if sd_hash: - d.addCallback(lambda claim_out: _reflect_if_possible(sd_hash, claim_out)) - - d.addCallback(lambda claim_out: self._add_to_pending_claims(name, claim_out)) + d = self._publish_stream(name, bid, metadata, file_path, fee) d.addCallback(lambda r: self._render_response(r)) - return d @AuthJSONRPCServer.auth_required diff --git a/lbrynet/lbrynet_daemon/Publisher.py b/lbrynet/lbrynet_daemon/Publisher.py index 39dcb98e6..ad7078714 100644 --- a/lbrynet/lbrynet_daemon/Publisher.py +++ b/lbrynet/lbrynet_daemon/Publisher.py @@ -1,16 +1,13 @@ import logging import mimetypes import os -import random - -from twisted.internet import threads, defer, reactor +from twisted.internet import defer +from lbrynet.core import file_utils from lbrynet.lbryfilemanager.EncryptedFileCreator import create_lbry_file from lbrynet.lbryfile.StreamDescriptor import publish_sd_blob from lbrynet.metadata.Metadata import Metadata -from lbrynet.lbryfilemanager.EncryptedFileDownloader import ManagedEncryptedFileDownloader -from lbrynet import reflector -from lbrynet import conf +from lbrynet.metadata.Fee import FeeValidator log = logging.getLogger(__name__) @@ -21,154 +18,52 @@ class Publisher(object): self.session = session self.lbry_file_manager = lbry_file_manager self.wallet = wallet - self.received_file_name = False - self.file_path = None - self.file_name = None - self.publish_name = None - self.bid_amount = None - self.verified = False self.lbry_file = None - self.txid = None - self.nout = None - self.claim_id = None - self.fee = None - self.stream_hash = None - # TODO: this needs to be passed into the constructor - reflector_server = random.choice(conf.settings['reflector_servers']) - self.reflector_server, self.reflector_port = reflector_server[0], reflector_server[1] - self.metadata = {} - def start(self, name, file_path, bid, metadata): + @defer.inlineCallbacks + def add_fee_to_metadata(self, metadata, fee): + metadata['fee'] = FeeValidator(fee) + assert len(fee) == 1, "Too many fees" + for currency in fee: + if 'address' not in fee[currency]: + new_address = yield self.session.wallet.get_new_address() + fee[currency]['address'] = new_address + metadata['fee'] = FeeValidator(fee) + defer.returnValue(metadata) + + @defer.inlineCallbacks + def publish_stream(self, name, file_path, bid, metadata): log.info('Starting publish for %s', name) - - def _show_result(): - log.info( - "Success! Published %s --> lbry://%s txid: %s nout: %d", - self.file_name, self.publish_name, self.txid, self.nout - ) - return defer.succeed({ - 'nout': self.nout, - 'txid': self.txid, - 'claim_id': self.claim_id, - 'fee': self.fee, - }) - - self.publish_name = name - self.file_path = file_path - self.bid_amount = bid - self.metadata = metadata - - # TODO: we cannot have this sort of code scattered throughout - # our code base. Use polymorphism instead - if os.name == "nt": - file_mode = 'rb' - else: - file_mode = 'r' - - d = self._check_file_path(self.file_path) - # TODO: ensure that we aren't leaving this resource open - d.addCallback(lambda _: create_lbry_file(self.session, self.lbry_file_manager, - self.file_name, open(self.file_path, file_mode))) - d.addCallback(self.add_to_lbry_files) - d.addCallback(lambda _: self._create_sd_blob()) - d.addCallback(lambda _: self._claim_name()) - d.addCallback(lambda _: self.set_status()) - d.addCallback(lambda _: self.start_reflector()) - d.addCallbacks( - lambda _: _show_result(), - errback=log.fail(self._throw_publish_error), - errbackArgs=( - "An error occurred publishing %s to %s", self.file_name, self.publish_name) - ) - return d - - def start_reflector(self): - # TODO: is self.reflector_server unused? - reflector_server = random.choice(conf.settings['reflector_servers']) - reflector_address, reflector_port = reflector_server[0], reflector_server[1] - log.info("Reflecting new publication") - factory = reflector.ClientFactory( - self.session.blob_manager, - self.lbry_file_manager.stream_info_manager, - self.stream_hash, - self.publish_name - ) - d = reactor.resolve(reflector_address) - d.addCallback(lambda ip: reactor.connectTCP(ip, reflector_port, factory)) - d.addCallback(lambda _: factory.finished_deferred) - return d - - def _check_file_path(self, file_path): - def check_file_threaded(): - f = open(file_path) - f.close() - self.file_name = os.path.basename(self.file_path) - return True - return threads.deferToThread(check_file_threaded) - - def set_lbry_file(self, lbry_file_downloader): - self.lbry_file = lbry_file_downloader - return defer.succeed(None) - - def add_to_lbry_files(self, stream_hash): - self.stream_hash = stream_hash + file_name = os.path.basename(file_path) + with file_utils.get_read_handle(file_path) as read_handle: + stream_hash = yield create_lbry_file(self.session, self.lbry_file_manager, file_name, + read_handle) prm = self.session.payment_rate_manager - d = self.lbry_file_manager.add_lbry_file(stream_hash, prm) - d.addCallback(self.set_lbry_file) - return d + self.lbry_file = yield self.lbry_file_manager.add_lbry_file(stream_hash, prm) + sd_hash = yield publish_sd_blob(self.lbry_file_manager.stream_info_manager, + self.session.blob_manager, self.lbry_file.stream_hash) + if 'sources' not in metadata: + metadata['sources'] = {} + metadata['sources']['lbry_sd_hash'] = sd_hash + metadata['content_type'] = get_content_type(file_path) + metadata['ver'] = Metadata.current_version + claim_out = yield self.make_claim(name, bid, metadata) + defer.returnValue(claim_out) - def _create_sd_blob(self): - log.debug('Creating stream descriptor blob') - d = publish_sd_blob(self.lbry_file_manager.stream_info_manager, - self.session.blob_manager, - self.lbry_file.stream_hash) + @defer.inlineCallbacks + def update_stream(self, name, bid, metadata): + my_claim = yield self.wallet.get_my_claim(name) + updated_metadata = my_claim['value'] + for meta_key in metadata: + updated_metadata[meta_key] = metadata[meta_key] + claim_out = yield self.make_claim(name, bid, updated_metadata) + defer.returnValue(claim_out) - def set_sd_hash(sd_hash): - log.debug('stream descriptor hash: %s', sd_hash) - if 'sources' not in self.metadata: - self.metadata['sources'] = {} - self.metadata['sources']['lbry_sd_hash'] = sd_hash - - d.addCallback(set_sd_hash) - return d - - def set_status(self): - log.debug('Setting status') - d = self.lbry_file_manager.change_lbry_file_status( - self.lbry_file, ManagedEncryptedFileDownloader.STATUS_FINISHED) - d.addCallback(lambda _: self.lbry_file.restore()) - return d - - def _claim_name(self): - log.debug('Claiming name') - self._update_metadata() - m = Metadata(self.metadata) - - def set_claim_out(claim_out): - log.debug('Name claimed using txid: %s, nout: %d, claim_id: %s, fee :%f', - claim_out['txid'], claim_out['nout'], - claim_out['claim_id'], claim_out['fee']) - self.txid = claim_out['txid'] - self.nout = claim_out['nout'] - self.claim_id = claim_out['claim_id'] - self.fee = claim_out['fee'] - - d = self.wallet.claim_name(self.publish_name, self.bid_amount, m) - d.addCallback(set_claim_out) - return d - - def _update_metadata(self): - filename = os.path.join(self.lbry_file.download_directory, self.lbry_file.file_name) - self.metadata['content_type'] = get_content_type(filename) - self.metadata['ver'] = Metadata.current_version - - def _throw_publish_error(self, err): - # TODO: I'm not a fan of the log and re-throw, especially when - # the new exception is more generic. Look over this to - # see if there is a reason not to remove the errback - # handler and allow the original exception to move up - # the stack. - raise Exception("Publish failed") + @defer.inlineCallbacks + def make_claim(self, name, bid, metadata): + validated_metadata = Metadata(metadata) + claim_out = yield self.wallet.claim_name(name, bid, validated_metadata) + defer.returnValue(claim_out) def get_content_type(filename):