refactor Publisher and jsonrpc_publish

-add get_read_handle to file_utils

-don’t leave read handle hanging after creating lbry file

-get rid of threaded weirdness

-remove reflector functionality from Publisher

-fix updating with an existing stream

-reflect new stream in thread after broadcasting name claim
This commit is contained in:
Jack Robison 2017-02-09 16:12:30 -05:00
parent c7106dfc97
commit 5a1bab2eaa
4 changed files with 92 additions and 211 deletions

View file

@ -482,7 +482,7 @@ class Wallet(object):
return defer.succeed(None)
if 'error' in result:
log.warning("Got an error looking up a name: %s", result['error'])
log.warning("Got an error looking up lbry://%s: %s", name, result['error'])
return Failure(UnknownNameError(name))
_check_result_fields(result)
try:

View file

@ -1,6 +1,8 @@
import os
import sys
import subprocess
from contextlib import contextmanager
def start(path):
"""
@ -33,3 +35,18 @@ def reveal(path):
subprocess.Popen(['xdg-open', os.path.dirname(path)])
elif sys.platform == 'win32':
subprocess.Popen(['explorer', '/select', path])
@contextmanager
def get_read_handle(path):
"""
Get os independent read handle for a file
"""
if os.name == "nt":
file_mode = 'rb'
else:
file_mode = 'r'
read_handle = open(path, file_mode)
yield read_handle
read_handle.close()

View file

@ -16,7 +16,6 @@ from twisted.web import server
from twisted.internet import defer, threads, error, reactor, task
from twisted.internet.task import LoopingCall
from twisted.python.failure import Failure
from jsonschema import ValidationError
# TODO: importing this when internet is disabled raises a socket.gaierror
from lbryum.version import LBRYUM_VERSION as lbryum_version
@ -24,7 +23,7 @@ from lbrynet import __version__ as lbrynet_version
from lbrynet import conf, reflector, analytics
from lbrynet.conf import LBRYCRD_WALLET, LBRYUM_WALLET, PTC_WALLET
from lbrynet.metadata.Fee import FeeValidator
from lbrynet.metadata.Metadata import Metadata, verify_name_characters
from lbrynet.metadata.Metadata import verify_name_characters
from lbrynet.lbryfile.client.EncryptedFileDownloader import EncryptedFileSaverFactory
from lbrynet.lbryfile.client.EncryptedFileDownloader import EncryptedFileOpenerFactory
from lbrynet.lbryfile.client.EncryptedFileOptions import add_lbry_file_to_sd_identifier
@ -369,7 +368,7 @@ class Daemon(AuthJSONRPCServer):
self.connection_status_code = CONNECTION_STATUS_NETWORK
# claim_out is dictionary containing 'txid' and 'nout'
def _add_to_pending_claims(self, name, claim_out):
def _add_to_pending_claims(self, claim_out, name):
txid = claim_out['txid']
nout = claim_out['nout']
log.info("Adding lbry://%s to pending claims, txid %s nout %d" % (name, txid, nout))
@ -397,7 +396,7 @@ class Daemon(AuthJSONRPCServer):
log.warning("Re-add %s to pending claims", name)
txid, nout = self.pending_claims.pop(name)
claim_out = {'txid': txid, 'nout': nout}
self._add_to_pending_claims(name, claim_out)
self._add_to_pending_claims(claim_out, name)
def _process_lbry_file(name, lbry_file):
# lbry_file is an instance of ManagedEncryptedFileDownloader or None
@ -774,6 +773,26 @@ class Daemon(AuthJSONRPCServer):
sd_hash, file_path = yield helper.wait_or_get_stream(stream_info, lbry_file)
defer.returnValue((sd_hash, file_path))
@defer.inlineCallbacks
def _publish_stream(self, name, bid, metadata, file_path=None, fee=None):
publisher = Publisher(self.session, self.lbry_file_manager, self.session.wallet)
verify_name_characters(name)
if bid <= 0.0:
raise Exception("Invalid bid")
if fee:
metadata = yield publisher.add_fee_to_metadata(metadata, fee)
if not file_path:
claim_out = yield publisher.update_stream(name, bid, metadata)
else:
claim_out = yield publisher.publish_stream(name, file_path, bid, metadata)
yield threads.deferToThread(self._reflect, publisher.lbry_file)
log.info("Success! Published to lbry://%s txid: %s nout: %d", name, claim_out['txid'],
claim_out['nout'])
yield self._add_to_pending_claims(claim_out, name)
self.looping_call_manager.start(Checker.PENDING_CLAIM, 30)
defer.returnValue(claim_out)
def add_stream(self, name, timeout, download_directory, file_name, stream_info):
"""Makes, adds and starts a stream"""
self.streams[name] = GetStream(self.sd_identifier,
@ -1640,16 +1659,17 @@ class Daemon(AuthJSONRPCServer):
return d
@AuthJSONRPCServer.auth_required
def jsonrpc_publish(self, name, file_path, bid, metadata, fee=None):
def jsonrpc_publish(self, name, bid, metadata, file_path=None, fee=None):
"""
Make a new name claim and publish associated data to lbrynet
Args:
'name': name to be claimed, string
'file_path': path to file to be associated with name, string
'bid': amount of credits to commit in this claim, float
'metadata': metadata dictionary
optional 'fee'
'name': str, name to be claimed, string
'bid': float, amount of credits to commit in this claim,
'metadata': dict, Metadata compliant (can be missing sources if a file is provided)
'file_path' (optional): str, path to file to be associated with name, if not given
the stream from your existing claim for the name will be used
'fee' (optional): dict, FeeValidator compliant
Returns:
'success' : True if claim was succesful , False otherwise
'reason' : if not succesful, give reason
@ -1659,17 +1679,6 @@ class Daemon(AuthJSONRPCServer):
'claim_id' : claim id of the resulting transaction
"""
def _set_address(address, currency, m):
log.info("Generated new address for key fee: " + str(address))
m['fee'][currency]['address'] = address
return m
def _reflect_if_possible(sd_hash, claim_out):
d = self._get_lbry_file(FileID.SD_HASH, sd_hash, return_json=False)
d.addCallback(self._reflect)
d.addCallback(lambda _: claim_out)
return d
log.info("Publish: %s", {
'name': name,
'file_path': file_path,
@ -1677,49 +1686,9 @@ class Daemon(AuthJSONRPCServer):
'metadata': metadata,
'fee': fee,
})
verify_name_characters(name)
if bid <= 0.0:
return defer.fail(Exception("Invalid bid"))
try:
metadata = Metadata(metadata)
make_lbry_file = False
sd_hash = metadata['sources']['lbry_sd_hash']
log.info("Update publish for %s using existing stream", name)
except ValidationError:
make_lbry_file = True
sd_hash = None
if not file_path:
raise Exception("No file given to publish")
if not os.path.isfile(file_path):
raise Exception("Specified file for publish doesnt exist: %s" % file_path)
self.looping_call_manager.start(Checker.PENDING_CLAIM, 30)
d = self._resolve_name(name, force_refresh=True)
d.addErrback(lambda _: None)
if fee is not None:
metadata['fee'] = fee
assert len(metadata['fee']) == 1, "Too many fees"
for c in metadata['fee']:
if 'address' not in metadata['fee'][c]:
d.addCallback(lambda _: self.session.wallet.get_new_address())
d.addCallback(lambda addr: _set_address(addr, c, metadata))
else:
d.addCallback(lambda _: metadata)
if make_lbry_file:
pub = Publisher(self.session, self.lbry_file_manager, self.session.wallet)
d.addCallback(lambda meta: pub.start(name, file_path, bid, meta))
else:
d.addCallback(lambda meta: self.session.wallet.claim_name(name, bid, meta))
if sd_hash:
d.addCallback(lambda claim_out: _reflect_if_possible(sd_hash, claim_out))
d.addCallback(lambda claim_out: self._add_to_pending_claims(name, claim_out))
d = self._publish_stream(name, bid, metadata, file_path, fee)
d.addCallback(lambda r: self._render_response(r))
return d
@AuthJSONRPCServer.auth_required

View file

@ -1,16 +1,13 @@
import logging
import mimetypes
import os
import random
from twisted.internet import threads, defer, reactor
from twisted.internet import defer
from lbrynet.core import file_utils
from lbrynet.lbryfilemanager.EncryptedFileCreator import create_lbry_file
from lbrynet.lbryfile.StreamDescriptor import publish_sd_blob
from lbrynet.metadata.Metadata import Metadata
from lbrynet.lbryfilemanager.EncryptedFileDownloader import ManagedEncryptedFileDownloader
from lbrynet import reflector
from lbrynet import conf
from lbrynet.metadata.Fee import FeeValidator
log = logging.getLogger(__name__)
@ -21,154 +18,52 @@ class Publisher(object):
self.session = session
self.lbry_file_manager = lbry_file_manager
self.wallet = wallet
self.received_file_name = False
self.file_path = None
self.file_name = None
self.publish_name = None
self.bid_amount = None
self.verified = False
self.lbry_file = None
self.txid = None
self.nout = None
self.claim_id = None
self.fee = None
self.stream_hash = None
# TODO: this needs to be passed into the constructor
reflector_server = random.choice(conf.settings['reflector_servers'])
self.reflector_server, self.reflector_port = reflector_server[0], reflector_server[1]
self.metadata = {}
def start(self, name, file_path, bid, metadata):
@defer.inlineCallbacks
def add_fee_to_metadata(self, metadata, fee):
metadata['fee'] = FeeValidator(fee)
assert len(fee) == 1, "Too many fees"
for currency in fee:
if 'address' not in fee[currency]:
new_address = yield self.session.wallet.get_new_address()
fee[currency]['address'] = new_address
metadata['fee'] = FeeValidator(fee)
defer.returnValue(metadata)
@defer.inlineCallbacks
def publish_stream(self, name, file_path, bid, metadata):
log.info('Starting publish for %s', name)
def _show_result():
log.info(
"Success! Published %s --> lbry://%s txid: %s nout: %d",
self.file_name, self.publish_name, self.txid, self.nout
)
return defer.succeed({
'nout': self.nout,
'txid': self.txid,
'claim_id': self.claim_id,
'fee': self.fee,
})
self.publish_name = name
self.file_path = file_path
self.bid_amount = bid
self.metadata = metadata
# TODO: we cannot have this sort of code scattered throughout
# our code base. Use polymorphism instead
if os.name == "nt":
file_mode = 'rb'
else:
file_mode = 'r'
d = self._check_file_path(self.file_path)
# TODO: ensure that we aren't leaving this resource open
d.addCallback(lambda _: create_lbry_file(self.session, self.lbry_file_manager,
self.file_name, open(self.file_path, file_mode)))
d.addCallback(self.add_to_lbry_files)
d.addCallback(lambda _: self._create_sd_blob())
d.addCallback(lambda _: self._claim_name())
d.addCallback(lambda _: self.set_status())
d.addCallback(lambda _: self.start_reflector())
d.addCallbacks(
lambda _: _show_result(),
errback=log.fail(self._throw_publish_error),
errbackArgs=(
"An error occurred publishing %s to %s", self.file_name, self.publish_name)
)
return d
def start_reflector(self):
# TODO: is self.reflector_server unused?
reflector_server = random.choice(conf.settings['reflector_servers'])
reflector_address, reflector_port = reflector_server[0], reflector_server[1]
log.info("Reflecting new publication")
factory = reflector.ClientFactory(
self.session.blob_manager,
self.lbry_file_manager.stream_info_manager,
self.stream_hash,
self.publish_name
)
d = reactor.resolve(reflector_address)
d.addCallback(lambda ip: reactor.connectTCP(ip, reflector_port, factory))
d.addCallback(lambda _: factory.finished_deferred)
return d
def _check_file_path(self, file_path):
def check_file_threaded():
f = open(file_path)
f.close()
self.file_name = os.path.basename(self.file_path)
return True
return threads.deferToThread(check_file_threaded)
def set_lbry_file(self, lbry_file_downloader):
self.lbry_file = lbry_file_downloader
return defer.succeed(None)
def add_to_lbry_files(self, stream_hash):
self.stream_hash = stream_hash
file_name = os.path.basename(file_path)
with file_utils.get_read_handle(file_path) as read_handle:
stream_hash = yield create_lbry_file(self.session, self.lbry_file_manager, file_name,
read_handle)
prm = self.session.payment_rate_manager
d = self.lbry_file_manager.add_lbry_file(stream_hash, prm)
d.addCallback(self.set_lbry_file)
return d
self.lbry_file = yield self.lbry_file_manager.add_lbry_file(stream_hash, prm)
sd_hash = yield publish_sd_blob(self.lbry_file_manager.stream_info_manager,
self.session.blob_manager, self.lbry_file.stream_hash)
if 'sources' not in metadata:
metadata['sources'] = {}
metadata['sources']['lbry_sd_hash'] = sd_hash
metadata['content_type'] = get_content_type(file_path)
metadata['ver'] = Metadata.current_version
claim_out = yield self.make_claim(name, bid, metadata)
defer.returnValue(claim_out)
def _create_sd_blob(self):
log.debug('Creating stream descriptor blob')
d = publish_sd_blob(self.lbry_file_manager.stream_info_manager,
self.session.blob_manager,
self.lbry_file.stream_hash)
@defer.inlineCallbacks
def update_stream(self, name, bid, metadata):
my_claim = yield self.wallet.get_my_claim(name)
updated_metadata = my_claim['value']
for meta_key in metadata:
updated_metadata[meta_key] = metadata[meta_key]
claim_out = yield self.make_claim(name, bid, updated_metadata)
defer.returnValue(claim_out)
def set_sd_hash(sd_hash):
log.debug('stream descriptor hash: %s', sd_hash)
if 'sources' not in self.metadata:
self.metadata['sources'] = {}
self.metadata['sources']['lbry_sd_hash'] = sd_hash
d.addCallback(set_sd_hash)
return d
def set_status(self):
log.debug('Setting status')
d = self.lbry_file_manager.change_lbry_file_status(
self.lbry_file, ManagedEncryptedFileDownloader.STATUS_FINISHED)
d.addCallback(lambda _: self.lbry_file.restore())
return d
def _claim_name(self):
log.debug('Claiming name')
self._update_metadata()
m = Metadata(self.metadata)
def set_claim_out(claim_out):
log.debug('Name claimed using txid: %s, nout: %d, claim_id: %s, fee :%f',
claim_out['txid'], claim_out['nout'],
claim_out['claim_id'], claim_out['fee'])
self.txid = claim_out['txid']
self.nout = claim_out['nout']
self.claim_id = claim_out['claim_id']
self.fee = claim_out['fee']
d = self.wallet.claim_name(self.publish_name, self.bid_amount, m)
d.addCallback(set_claim_out)
return d
def _update_metadata(self):
filename = os.path.join(self.lbry_file.download_directory, self.lbry_file.file_name)
self.metadata['content_type'] = get_content_type(filename)
self.metadata['ver'] = Metadata.current_version
def _throw_publish_error(self, err):
# TODO: I'm not a fan of the log and re-throw, especially when
# the new exception is more generic. Look over this to
# see if there is a reason not to remove the errback
# handler and allow the original exception to move up
# the stack.
raise Exception("Publish failed")
@defer.inlineCallbacks
def make_claim(self, name, bid, metadata):
validated_metadata = Metadata(metadata)
claim_out = yield self.wallet.claim_name(name, bid, validated_metadata)
defer.returnValue(claim_out)
def get_content_type(filename):