Merge pull request #593 from lbryio/channels

Channels
This commit is contained in:
Jack Robison 2017-04-07 15:55:11 -04:00 committed by GitHub
commit dc0699b929
10 changed files with 396 additions and 435 deletions

View file

@ -9,17 +9,19 @@ at anytime.
## [Unreleased] ## [Unreleased]
### Added ### Added
* * Use `claim_id` instead of outpoint for `claim_abandon`
* * Add `channel_name` parameter to `publish`
* * Add `delete_all` parameter to `file_delete` to allow deleting multiple files
* Add `channel_list_mine`
* Add `channel_new`
* Add `resolve` to resolve lbry uris
### Changed ### Changed
* * Use `uri` instead of `name` for `get`, remove explicit `claim_id` parameter
* * Increase default download timeout
*
### Fixed ### Fixed
* *
* *
* *

View file

@ -162,7 +162,7 @@ ADJUSTABLE_SETTINGS = {
'delete_blobs_on_remove': (bool, True), 'delete_blobs_on_remove': (bool, True),
'dht_node_port': (int, 4444), 'dht_node_port': (int, 4444),
'download_directory': (str, default_download_directory), 'download_directory': (str, default_download_directory),
'download_timeout': (int, 30), 'download_timeout': (int, 180),
'host_ui': (bool, True), 'host_ui': (bool, True),
'is_generous_host': (bool, True), 'is_generous_host': (bool, True),
'known_dht_nodes': (list, DEFAULT_DHT_NODES, server_port), 'known_dht_nodes': (list, DEFAULT_DHT_NODES, server_port),

View file

@ -10,20 +10,18 @@ from zope.interface import implements
from decimal import Decimal from decimal import Decimal
from lbryum import SimpleConfig, Network from lbryum import SimpleConfig, Network
from lbryum.lbrycrd import COIN, RECOMMENDED_CLAIMTRIE_HASH_CONFIRMS from lbryum.lbrycrd import COIN
import lbryum.wallet import lbryum.wallet
from lbryum.commands import known_commands, Commands from lbryum.commands import known_commands, Commands
from lbryschema.uri import parse_lbry_uri
from lbryschema.claim import ClaimDict from lbryschema.claim import ClaimDict
from lbryschema.decode import smart_decode
from lbryschema.error import DecodeError from lbryschema.error import DecodeError
from lbrynet.core import utils
from lbrynet.core.sqlite_helpers import rerun_if_locked from lbrynet.core.sqlite_helpers import rerun_if_locked
from lbrynet.interfaces import IRequestCreator, IQueryHandlerFactory, IQueryHandler, IWallet from lbrynet.interfaces import IRequestCreator, IQueryHandlerFactory, IQueryHandler, IWallet
from lbrynet.core.client.ClientRequest import ClientRequest from lbrynet.core.client.ClientRequest import ClientRequest
from lbrynet.core.Error import (UnknownNameError, InvalidStreamInfoError, RequestCanceledError, from lbrynet.core.Error import RequestCanceledError, InsufficientFundsError
InsufficientFundsError)
from lbrynet.db_migrator.migrate1to2 import UNSET_NOUT from lbrynet.db_migrator.migrate1to2 import UNSET_NOUT
log = logging.getLogger(__name__) log = logging.getLogger(__name__)
@ -450,65 +448,10 @@ class Wallet(object):
log.debug("There were no payments to send") log.debug("There were no payments to send")
return defer.succeed(True) return defer.succeed(True)
def get_stream_info_for_name(self, name): ######
d = self._get_value_for_name(name)
d.addCallback(self._get_stream_info_from_value, name)
return d
def get_txid_for_name(self, name): def get_claim(self, claim_id):
d = self._get_value_for_name(name) return self._get_claim_by_claimid(claim_id)
d.addCallback(lambda r: None if 'txid' not in r else r['txid'])
return d
def get_stream_info_from_claim_outpoint(self, name, txid, nout):
claim_outpoint = ClaimOutpoint(txid, nout)
d = self.get_claims_from_tx(claim_outpoint['txid'])
def get_claim_for_name(claims):
for claim in claims:
if claim_outpoint == claim:
claim['txid'] = txid
return claim
return Failure(UnknownNameError(name))
d.addCallback(get_claim_for_name)
d.addCallback(self._get_stream_info_from_value, name)
return d
def _get_stream_info_from_value(self, result, name):
def _check_result_fields(r):
for k in ['value', 'txid', 'nout', 'height', 'amount']:
assert k in r, "getvalueforname response missing field %s" % k
def _log_success(claim_id):
log.debug("lbry://%s complies with %s, claimid: %s",
name, claim_dict.claim_dict['version'], claim_id)
return defer.succeed(None)
if 'error' in result:
log.warning("Got an error looking up lbry://%s: %s", name, result['error'])
return Failure(UnknownNameError(name))
_check_result_fields(result)
try:
claim_dict = smart_decode(result['value'].decode('hex'))
except (TypeError, ValueError, DecodeError):
return Failure(InvalidStreamInfoError(name, result['value']))
#TODO: what if keys don't exist here,
# probablly need get_sd_hash() function fro ClaimDict
sd_hash = utils.get_sd_hash(claim_dict.claim_dict)
claim_outpoint = ClaimOutpoint(result['txid'], result['nout'])
d = self._save_name_metadata(name, claim_outpoint, sd_hash)
d.addCallback(lambda _: self.get_claimid(name, result['txid'], result['nout']))
d.addCallback(lambda cid: _log_success(cid))
d.addCallback(lambda _: claim_dict.claim_dict)
return d
def get_claim(self, name, claim_id):
d = self.get_claims_for_name(name)
d.addCallback(
lambda claims: next(
claim for claim in claims['claims'] if claim['claim_id'] == claim_id))
return d
def get_claimid(self, name, txid, nout): def get_claimid(self, name, txid, nout):
def _get_id_for_return(claim_id): def _get_id_for_return(claim_id):
@ -530,96 +473,135 @@ class Wallet(object):
d.addCallback(_get_id_for_return) d.addCallback(_get_id_for_return)
return d return d
@defer.inlineCallbacks
def get_my_claim(self, name): def get_my_claim(self, name):
def _get_claim_for_return(claim): my_claims = yield self.get_name_claims()
if not claim: my_claim = False
return False for claim in my_claims:
claim['value'] = smart_decode(claim['value'].decode('hex')).claim_dict if claim['name'] == name:
return claim claim['value'] = ClaimDict.load_dict(claim['value'])
my_claim = claim
def _get_my_unspent_claim(claims): break
for claim in claims: defer.returnValue(my_claim)
is_unspent = (
claim['name'] == name and
not claim['is_spent'] and
not claim.get('supported_claimid', False)
)
if is_unspent:
return claim
return False
d = self.get_name_claims()
d.addCallback(_get_my_unspent_claim)
d.addCallback(_get_claim_for_return)
return d
def get_claim_info(self, name, txid=None, nout=None): def get_claim_info(self, name, txid=None, nout=None):
if txid is None or nout is None: if txid is None or nout is None:
d = self._get_value_for_name(name) return self.get_claim_by_name(name)
d.addCallback(lambda r: self._get_claim_info(name, ClaimOutpoint(r['txid'], r['nout'])))
else: else:
d = self._get_claim_info(name, ClaimOutpoint(txid, nout)) return self.get_claim_by_outpoint(ClaimOutpoint(txid, nout))
# TODO: this catches every exception, fix this
d.addErrback(lambda _: False)
return d
def _format_claim_for_return(self, name, claim, claim_dict, meta_version): @defer.inlineCallbacks
result = {} def _handle_claim_result(self, results):
result['claim_id'] = claim['claim_id'] if 'error' in results:
result['amount'] = claim['effective_amount'] raise Exception(results['error'])
result['height'] = claim['height'] elif 'claim' in results:
result['name'] = name claim = results['claim']
result['txid'] = claim['txid'] if 'has_signature' in claim and claim['has_signature']:
result['nout'] = claim['nout'] if not claim['signature_is_valid']:
result['value'] = claim_dict raise Exception("Claim has invalid signature")
result['supports'] = [
{'txid': support['txid'], 'nout': support['nout']} for support in claim['supports']]
result['meta_version'] = meta_version
return result
def _get_claim_info(self, name, claim_outpoint):
def _build_response(claim):
try: try:
claim_dict = smart_decode(claim['value'].decode('hex')).claim_dict decoded = ClaimDict.load_dict(claim['value'])
meta_ver = claim_dict['stream']['metadata']['version'] claim_dict = decoded.claim_dict
sd_hash = utils.get_sd_hash(claim_dict) outpoint = ClaimOutpoint(claim['txid'], claim['nout'])
d = self._save_name_metadata(name, claim_outpoint, sd_hash) name = claim['name']
claim['value'] = claim_dict
yield self._save_name_metadata(name, outpoint, decoded.source_hash)
yield self._update_claimid(claim['claim_id'], name, outpoint)
except (TypeError, ValueError, KeyError, DecodeError): except (TypeError, ValueError, KeyError, DecodeError):
claim_dict = claim['value'] claim = claim['value']
meta_ver = "Non-compliant" log.warning(results)
d = defer.succeed(None) results = claim
elif 'value' in results:
if 'has_signature' in results and results['has_signature']:
if not results['signature_is_valid']:
raise Exception("Claim has invalid signature")
try:
decoded = ClaimDict.load_dict(results['value'])
claim_dict = decoded.claim_dict
outpoint = ClaimOutpoint(results['txid'], results['nout'])
name = results['name']
yield self._save_name_metadata(name, outpoint, decoded.source_hash)
yield self._update_claimid(results['claim_id'], name, outpoint)
except (TypeError, ValueError, KeyError, DecodeError):
claim_dict = results['value']
log.warning(results)
results['value'] = claim_dict
log.info("get claim info lbry://%s#%s", results['name'], results['claim_id'])
defer.returnValue(results)
d.addCallback(lambda _: self._format_claim_for_return(name, @defer.inlineCallbacks
claim, def resolve_uri(self, uri):
claim_dict=claim_dict, resolve_results = yield self._get_value_for_uri(uri)
meta_version=meta_ver)) if 'claim' in resolve_results:
log.info( formatted = yield self._handle_claim_result(resolve_results)
"get claim info lbry://%s metadata: %s, claimid: %s", resolve_results['claim'] = formatted
name, meta_ver, claim['claim_id']) result = resolve_results
return d elif 'claims_in_channel' in resolve_results:
claims_for_return = []
for claim in resolve_results['claims_in_channel']:
formatted = yield self._handle_claim_result(claim)
claims_for_return.append(formatted)
resolve_results['claims_in_channel'] = claims_for_return
result = resolve_results
else:
result = None
defer.returnValue(result)
d = self.get_claimid(name, claim_outpoint['txid'], claim_outpoint['nout']) @defer.inlineCallbacks
d.addCallback(lambda claim_id: self.get_claim(name, claim_id)) def get_claim_by_outpoint(self, claim_outpoint):
d.addCallback(_build_response) claim = yield self._get_claim_by_outpoint(claim_outpoint['txid'], claim_outpoint['nout'])
return d result = yield self._handle_claim_result(claim)
defer.returnValue(result)
@defer.inlineCallbacks
def get_claim_by_name(self, name):
get_name_result = yield self._get_value_for_name(name)
result = yield self._handle_claim_result(get_name_result)
defer.returnValue(result)
@defer.inlineCallbacks
def get_claims_for_name(self, name): def get_claims_for_name(self, name):
d = self._get_claims_for_name(name) result = yield self._get_claims_for_name(name)
return d claims = result['claims']
claims_for_return = []
for claim in claims:
claim['value'] = ClaimDict.load_dict(claim['value']).claim_dict
claims_for_return.append(claim)
result['claims'] = claims_for_return
defer.returnValue(result)
def _process_claim_out(self, claim_out): def _process_claim_out(self, claim_out):
claim_out.pop('success') claim_out.pop('success')
claim_out['fee'] = float(claim_out['fee']) claim_out['fee'] = float(claim_out['fee'])
return claim_out return claim_out
def claim_new_channel(self, channel_name, amount):
parsed_channel_name = parse_lbry_uri(channel_name)
if not parsed_channel_name.is_channel:
raise Exception("Invalid channel name")
elif (parsed_channel_name.path or parsed_channel_name.claim_id or
parsed_channel_name.bid_position or parsed_channel_name.claim_sequence):
raise Exception("New channel claim should have no fields other than name")
return self._claim_certificate(parsed_channel_name.name, amount)
@defer.inlineCallbacks @defer.inlineCallbacks
def claim_name(self, name, bid, metadata): def channel_list(self):
certificates = yield self._get_certificate_claims()
results = []
for claim in certificates:
formatted = yield self._handle_claim_result(claim)
results.append(formatted)
defer.returnValue(results)
@defer.inlineCallbacks
def claim_name(self, name, bid, metadata, certificate_id=None):
""" """
Claim a name, or update if name already claimed by user Claim a name, or update if name already claimed by user
@param name: str, name to claim @param name: str, name to claim
@param bid: float, bid amount @param bid: float, bid amount
@param metadata: Metadata compliant dict @param metadata: ClaimDict compliant dict
@param certificate_id: str (optional), claim id of channel certificate
@return: Deferred which returns a dict containing below items @return: Deferred which returns a dict containing below items
txid - txid of the resulting transaction txid - txid of the resulting transaction
@ -627,22 +609,14 @@ class Wallet(object):
fee - transaction fee paid to make claim fee - transaction fee paid to make claim
claim_id - claim id of the claim claim_id - claim id of the claim
""" """
claim_dict = ClaimDict.load_dict(metadata)
my_claim = yield self.get_my_claim(name)
if my_claim: decoded = ClaimDict.load_dict(metadata)
log.info("Updating claim") serialized = decoded.serialized
if self.get_balance() < Decimal(bid) - Decimal(my_claim['amount']):
raise InsufficientFundsError() if self.get_balance() < Decimal(bid):
old_claim_outpoint = ClaimOutpoint(my_claim['txid'], my_claim['nout']) raise InsufficientFundsError()
claim = yield self._send_name_claim_update(name, my_claim['claim_id'],
old_claim_outpoint, claim_dict.serialized, bid) claim = yield self._send_name_claim(name, serialized.encode('hex'), bid, certificate_id)
claim['claim_id'] = my_claim['claim_id']
else:
log.info("Making a new claim")
if self.get_balance() < bid:
raise InsufficientFundsError()
claim = yield self._send_name_claim(name, claim_dict.serialized, bid)
if not claim['success']: if not claim['success']:
msg = 'Claim to name {} failed: {}'.format(name, claim['reason']) msg = 'Claim to name {} failed: {}'.format(name, claim['reason'])
@ -651,24 +625,20 @@ class Wallet(object):
claim = self._process_claim_out(claim) claim = self._process_claim_out(claim)
claim_outpoint = ClaimOutpoint(claim['txid'], claim['nout']) claim_outpoint = ClaimOutpoint(claim['txid'], claim['nout'])
log.info("Saving metadata for claim %s %d", claim['txid'], claim['nout']) log.info("Saving metadata for claim %s %d", claim['txid'], claim['nout'])
yield self._save_name_metadata(name, claim_outpoint, yield self._update_claimid(claim['claim_id'], name, claim_outpoint)
utils.get_sd_hash(claim_dict.claim_dict)) yield self._save_name_metadata(name, claim_outpoint, decoded.source_hash)
defer.returnValue(claim) defer.returnValue(claim)
@defer.inlineCallbacks @defer.inlineCallbacks
def abandon_claim(self, txid, nout): def abandon_claim(self, claim_id):
def _parse_abandon_claim_out(claim_out): claim_out = yield self._abandon_claim(claim_id)
if not claim_out['success']:
msg = 'Abandon of {}:{} failed: {}'.format(txid, nout, claim_out['reason'])
raise Exception(msg)
claim_out = self._process_claim_out(claim_out)
log.info("Abandoned claim tx %s (n: %i) --> %s", txid, nout, claim_out)
return defer.succeed(claim_out)
claim_outpoint = ClaimOutpoint(txid, nout) if not claim_out['success']:
claim_out = yield self._abandon_claim(claim_outpoint) msg = 'Abandon of {} failed: {}'.format(claim_id, claim_out['reason'])
result = yield _parse_abandon_claim_out(claim_out) raise Exception(msg)
defer.returnValue(result)
claim_out = self._process_claim_out(claim_out)
defer.returnValue(claim_out)
def support_claim(self, name, claim_id, amount): def support_claim(self, name, claim_id, amount):
def _parse_support_claim_out(claim_out): def _parse_support_claim_out(claim_out):
@ -778,13 +748,13 @@ class Wallet(object):
def _get_claims_for_name(self, name): def _get_claims_for_name(self, name):
return defer.fail(NotImplementedError()) return defer.fail(NotImplementedError())
def _send_name_claim(self, name, val, amount): def _claim_certificate(self, name, amount):
return defer.fail(NotImplementedError()) return defer.fail(NotImplementedError())
def _abandon_claim(self, claim_outpoint): def _send_name_claim(self, name, val, amount, certificate_id=None):
return defer.fail(NotImplementedError()) return defer.fail(NotImplementedError())
def _send_name_claim_update(self, name, claim_id, claim_outpoint, value, amount): def _abandon_claim(self, claim_id):
return defer.fail(NotImplementedError()) return defer.fail(NotImplementedError())
def _support_claim(self, name, claim_id, amount): def _support_claim(self, name, claim_id, amount):
@ -808,6 +778,18 @@ class Wallet(object):
def _address_is_mine(self, address): def _address_is_mine(self, address):
return defer.fail(NotImplementedError()) return defer.fail(NotImplementedError())
def _get_value_for_uri(self, uri):
return defer.fail(NotImplementedError())
def _get_certificate_claims(self):
return defer.fail(NotImplementedError())
def _get_claim_by_outpoint(self, txid, nout):
return defer.fail(NotImplementedError())
def _get_claim_by_claimid(self, claim_id):
return defer.fail(NotImplementedError())
def _start(self): def _start(self):
pass pass
@ -947,21 +929,21 @@ class LBRYumWallet(Wallet):
# run commands as a defer.succeed, # run commands as a defer.succeed,
# lbryum commands should be run this way , unless if the command # lbryum commands should be run this way , unless if the command
# only makes a lbrum server query, use _run_cmd_as_defer_to_thread() # only makes a lbrum server query, use _run_cmd_as_defer_to_thread()
def _run_cmd_as_defer_succeed(self, command_name, *args): def _run_cmd_as_defer_succeed(self, command_name, *args, **kwargs):
cmd_runner = self._get_cmd_runner() cmd_runner = self._get_cmd_runner()
cmd = known_commands[command_name] cmd = known_commands[command_name]
func = getattr(cmd_runner, cmd.name) func = getattr(cmd_runner, cmd.name)
return defer.succeed(func(*args)) return defer.succeed(func(*args, **kwargs))
# run commands as a deferToThread, lbryum commands that only make # run commands as a deferToThread, lbryum commands that only make
# queries to lbryum server should be run this way # queries to lbryum server should be run this way
# TODO: keep track of running threads and cancel them on `stop` # TODO: keep track of running threads and cancel them on `stop`
# otherwise the application will hang, waiting for threads to complete # otherwise the application will hang, waiting for threads to complete
def _run_cmd_as_defer_to_thread(self, command_name, *args): def _run_cmd_as_defer_to_thread(self, command_name, *args, **kwargs):
cmd_runner = self._get_cmd_runner() cmd_runner = self._get_cmd_runner()
cmd = known_commands[command_name] cmd = known_commands[command_name]
func = getattr(cmd_runner, cmd.name) func = getattr(cmd_runner, cmd.name)
return threads.deferToThread(func, *args) return threads.deferToThread(func, *args, **kwargs)
def _update_balance(self): def _update_balance(self):
accounts = None accounts = None
@ -1019,35 +1001,17 @@ class LBRYumWallet(Wallet):
return self._run_cmd_as_defer_to_thread('getclaimsforname', name) return self._run_cmd_as_defer_to_thread('getclaimsforname', name)
@defer.inlineCallbacks @defer.inlineCallbacks
def _send_name_claim(self, name, value, amount): def _send_name_claim(self, name, value, amount, certificate_id=None):
broadcast = False log.info("Send claim: %s for %s: %s ", name, amount, value)
log.debug("Name claim %s %f", name, amount) claim_out = yield self._run_cmd_as_defer_succeed('claim', name, value, amount,
tx = yield self._run_cmd_as_defer_succeed('claim', name, value, amount, broadcast) certificate_id=certificate_id)
claim_out = yield self._broadcast_claim_transaction(tx)
defer.returnValue(claim_out) defer.returnValue(claim_out)
@defer.inlineCallbacks @defer.inlineCallbacks
def _send_name_claim_update(self, name, claim_id, claim_outpoint, value, amount): def _abandon_claim(self, claim_id):
log.debug("Update %s %d %f %s %s", claim_outpoint['txid'], claim_outpoint['nout'], log.debug("Abandon %s" % claim_id)
amount, name, claim_id) tx_out = yield self._run_cmd_as_defer_succeed('abandon', claim_id)
broadcast = False defer.returnValue(tx_out)
tx = yield self._run_cmd_as_defer_succeed(
'update', claim_outpoint['txid'], claim_outpoint['nout'],
name, claim_id, value, amount, broadcast
)
claim_out = yield self._broadcast_claim_transaction(tx)
defer.returnValue(claim_out)
@defer.inlineCallbacks
def _abandon_claim(self, claim_outpoint):
log.debug("Abandon %s %s" % (claim_outpoint['txid'], claim_outpoint['nout']))
broadcast = False
abandon_tx = yield self._run_cmd_as_defer_succeed(
'abandon', claim_outpoint['txid'], claim_outpoint['nout'], broadcast
)
claim_out = yield self._broadcast_claim_transaction(abandon_tx)
defer.returnValue(claim_out)
@defer.inlineCallbacks @defer.inlineCallbacks
def _support_claim(self, name, claim_id, amount): def _support_claim(self, name, claim_id, amount):
@ -1085,20 +1049,30 @@ class LBRYumWallet(Wallet):
return d return d
def _get_value_for_name(self, name): def _get_value_for_name(self, name):
height_to_check = self.network.get_local_height() - RECOMMENDED_CLAIMTRIE_HASH_CONFIRMS + 1 if not name:
if height_to_check < 0: raise Exception("No name given")
msg = "Height to check is less than 0, blockchain headers are likely not initialized" return self._run_cmd_as_defer_to_thread('getvalueforname', name)
raise Exception(msg)
block_header = self.network.blockchain.read_header(height_to_check) def _get_value_for_uri(self, uri):
block_hash = self.network.blockchain.hash_header(block_header) if not uri:
d = self._run_cmd_as_defer_to_thread('requestvalueforname', name, block_hash) raise Exception("No uri given")
d.addCallback(lambda response: Commands._verify_proof(name, block_header['claim_trie_root'], return self._run_cmd_as_defer_to_thread('getvalueforuri', uri)
response))
return d def _claim_certificate(self, name, amount):
return self._run_cmd_as_defer_to_thread('claimcertificate', name, amount)
def _get_certificate_claims(self):
return self._run_cmd_as_defer_succeed('getcertificateclaims')
def get_claims_from_tx(self, txid): def get_claims_from_tx(self, txid):
return self._run_cmd_as_defer_to_thread('getclaimsfromtx', txid) return self._run_cmd_as_defer_to_thread('getclaimsfromtx', txid)
def _get_claim_by_outpoint(self, txid, nout):
return self._run_cmd_as_defer_to_thread('getclaimbyoutpoint', txid, nout)
def get_claim_by_claimid(self, claim_id):
return self._run_cmd_as_defer_to_thread('getclaimbyid', claim_id)
def _get_balance_for_address(self, address): def _get_balance_for_address(self, address):
return defer.succeed(Decimal(self.wallet.get_addr_received(address)) / COIN) return defer.succeed(Decimal(self.wallet.get_addr_received(address)) / COIN)

View file

@ -8,6 +8,7 @@ import json
import pkg_resources import pkg_resources
from lbryschema.claim import ClaimDict
from lbrynet.core.cryptoutils import get_lbry_hash_obj from lbrynet.core.cryptoutils import get_lbry_hash_obj
# digest_size is in bytes, and blob hashes are hex encoded # digest_size is in bytes, and blob hashes are hex encoded
@ -112,6 +113,8 @@ def short_hash(hash_str):
def get_sd_hash(stream_info): def get_sd_hash(stream_info):
if not stream_info: if not stream_info:
return None return None
if isinstance(stream_info, ClaimDict):
return stream_info.source_hash
return stream_info['stream']['source']['source'] return stream_info['stream']['source']['source']

View file

@ -135,7 +135,7 @@ class EncryptedFileManager(object):
rowid, stream_hash, payment_rate_manager, blob_data_rate=options) rowid, stream_hash, payment_rate_manager, blob_data_rate=options)
yield downloader.restore() yield downloader.restore()
except Exception: except Exception:
log.exception('An error occurred while starting a lbry file (%s, %s, %s)', log.error('An error occurred while starting a lbry file (%s, %s, %s)',
rowid, stream_hash, options) rowid, stream_hash, options)
@defer.inlineCallbacks @defer.inlineCallbacks

View file

@ -12,7 +12,7 @@ from requests import exceptions as requests_exceptions
import random import random
from twisted.web import server from twisted.web import server
from twisted.internet import defer, threads, error, reactor, task from twisted.internet import defer, threads, error, reactor
from twisted.internet.task import LoopingCall from twisted.internet.task import LoopingCall
from twisted.python.failure import Failure from twisted.python.failure import Failure
@ -25,7 +25,7 @@ from lbrynet.reflector import reupload
from lbrynet.reflector import ServerFactory as reflector_server_factory from lbrynet.reflector import ServerFactory as reflector_server_factory
from lbrynet.metadata.Fee import FeeValidator from lbrynet.metadata.Fee import FeeValidator
from lbrynet.metadata.Metadata import verify_name_characters from lbrynet.metadata.Metadata import verify_name_characters
from lbryschema.decode import smart_decode
from lbrynet.lbryfile.client.EncryptedFileDownloader import EncryptedFileSaverFactory from lbrynet.lbryfile.client.EncryptedFileDownloader import EncryptedFileSaverFactory
from lbrynet.lbryfile.client.EncryptedFileDownloader import EncryptedFileOpenerFactory from lbrynet.lbryfile.client.EncryptedFileDownloader import EncryptedFileOpenerFactory
from lbrynet.lbryfile.client.EncryptedFileOptions import add_lbry_file_to_sd_identifier from lbrynet.lbryfile.client.EncryptedFileOptions import add_lbry_file_to_sd_identifier
@ -695,25 +695,47 @@ class Daemon(AuthJSONRPCServer):
return finished_d return finished_d
@defer.inlineCallbacks @defer.inlineCallbacks
def _download_name(self, name, stream_info, timeout=None, download_directory=None, def _download_name(self, name, stream_info, claim_id, timeout=None, download_directory=None,
file_name=None, wait_for_write=True): file_name=None):
""" """
Add a lbry file to the file manager, start the download, and return the new lbry file. Add a lbry file to the file manager, start the download, and return the new lbry file.
If it already exists in the file manager, return the existing lbry file If it already exists in the file manager, return the existing lbry file
""" """
timeout = timeout if timeout is not None else conf.settings['download_timeout'] if claim_id in self.streams:
downloader = self.streams[claim_id]
result = yield downloader.finished_deferred
defer.returnValue(result)
else:
download_id = utils.random_string()
self.analytics_manager.send_download_started(download_id, name, stream_info)
helper = _DownloadNameHelper(self, name, timeout, download_directory, file_name, self.streams[claim_id] = GetStream(self.sd_identifier, self.session,
wait_for_write) self.session.wallet, self.lbry_file_manager,
lbry_file = yield helper.setup_stream(stream_info) self.exchange_rate_manager, self.max_key_fee,
sd_hash, file_path = yield helper.wait_or_get_stream(stream_info, lbry_file) conf.settings['data_rate'], timeout,
defer.returnValue((sd_hash, file_path)) download_directory, file_name)
try:
download = self.streams[claim_id].start(stream_info, name)
self.streams[claim_id].finished_deferred.addCallback(
lambda _: self.analytics_manager.send_download_finished(download_id,
name,
stream_info))
lbry_file = yield download
result = yield self._get_lbry_file_dict(lbry_file, full_status=True)
del self.streams[claim_id]
except Exception as err:
log.warning('Failed to get %s: %s', name, err)
self.analytics_manager.send_download_errored(download_id, name, stream_info)
del self.streams[claim_id]
result = {'error': err.message}
defer.returnValue(result)
@defer.inlineCallbacks @defer.inlineCallbacks
def _publish_stream(self, name, bid, claim_dict, file_path=None): def _publish_stream(self, name, bid, claim_dict, file_path=None, certificate_id=None):
publisher = Publisher(self.session, self.lbry_file_manager, self.session.wallet) publisher = Publisher(self.session, self.lbry_file_manager, self.session.wallet,
certificate_id)
verify_name_characters(name) verify_name_characters(name)
if bid <= 0.0: if bid <= 0.0:
raise Exception("Invalid bid") raise Exception("Invalid bid")
@ -890,7 +912,7 @@ class Daemon(AuthJSONRPCServer):
lbry_file.txid, lbry_file.txid,
lbry_file.nout) lbry_file.nout)
try: try:
metadata = smart_decode(claim['value']).claim_dict metadata = claim['value']
except: except:
metadata = None metadata = None
try: try:
@ -946,6 +968,7 @@ class Daemon(AuthJSONRPCServer):
lbry_file_dict = yield self._get_lbry_file_dict(lbry_file, full_status=full_status) lbry_file_dict = yield self._get_lbry_file_dict(lbry_file, full_status=full_status)
file_dicts.append(lbry_file_dict) file_dicts.append(lbry_file_dict)
lbry_files = file_dicts lbry_files = file_dicts
log.info("Collected %i lbry files", len(lbry_files))
defer.returnValue(lbry_files) defer.returnValue(lbry_files)
# TODO: do this and get_blobs_for_sd_hash in the stream info manager # TODO: do this and get_blobs_for_sd_hash in the stream info manager
@ -1337,10 +1360,6 @@ class Daemon(AuthJSONRPCServer):
resolvable resolvable
""" """
if not name:
# TODO: seems like we should raise an error here
defer.returnValue(None)
try: try:
metadata = yield self._resolve_name(name, force_refresh=force) metadata = yield self._resolve_name(name, force_refresh=force)
except UnknownNameError: except UnknownNameError:
@ -1386,19 +1405,42 @@ class Daemon(AuthJSONRPCServer):
@AuthJSONRPCServer.auth_required @AuthJSONRPCServer.auth_required
@defer.inlineCallbacks @defer.inlineCallbacks
def jsonrpc_get(self, name, claim_id=None, file_name=None, timeout=None, def jsonrpc_resolve(self, uri):
download_directory=None, wait_for_write=True): """
Resolve a LBRY URI
Args:
'uri': (str) uri to download
Returns:
{
'claim_id': (str) claim id,
'claim_sequence': (int) claim sequence number,
'decoded_claim': (bool) whether or not the claim value was decoded,
'depth': (int) claim depth,
'has_signature': (bool) included if decoded_claim
'name': (str) claim name,
'txid': (str) claim txid,
'nout': (str) claim nout,
'signature_is_valid': (bool), included if has_signature,
'value': ClaimDict if decoded, otherwise hex string
}
"""
resolved = yield self.session.wallet.resolve_uri(uri)
results = yield self._render_response(resolved)
defer.returnValue(results)
@AuthJSONRPCServer.auth_required
@defer.inlineCallbacks
def jsonrpc_get(self, uri, file_name=None, timeout=None, download_directory=None):
""" """
Download stream from a LBRY name. Download stream from a LBRY name.
Args: Args:
'name': (str) name to download 'uri': (str) lbry uri to download
'claim_id' (optional): (str) claim id for claim to download
'file_name'(optional): (str) a user specified name for the downloaded file 'file_name'(optional): (str) a user specified name for the downloaded file
'timeout'(optional): (int) download timeout in number of seconds 'timeout'(optional): (int) download timeout in number of seconds
'download_directory'(optional): (str) path to directory where file will be saved 'download_directory'(optional): (str) path to directory where file will be saved
'wait_for_write'(optional): (bool) defaults to True. When set, waits for the file to
only start to be written before returning any results.
Returns: Returns:
(dict) Dictionary contaning information about the stream (dict) Dictionary contaning information about the stream
@ -1426,34 +1468,26 @@ class Daemon(AuthJSONRPCServer):
""" """
def _get_claim(_claim_id, _claims):
#TODO: do this in Wallet class
for claim in _claims['claims']:
if claim['claim_id'] == _claim_id:
return smart_decode(claim['value']).claim_dict
log.info("Received request to get %s", name)
timeout = timeout if timeout is not None else self.download_timeout timeout = timeout if timeout is not None else self.download_timeout
download_directory = download_directory or self.download_directory download_directory = download_directory or self.download_directory
if name in self.streams:
resolved = yield self.session.wallet.resolve_uri(uri)
if 'value' not in resolved:
if 'claim' not in resolved:
raise Exception("Nothing to download")
else:
resolved = resolved['claim']
name = resolved['name']
claim_id = resolved['claim_id']
stream_info = resolved['value']
if claim_id in self.streams:
log.info("Already waiting on lbry://%s to start downloading", name) log.info("Already waiting on lbry://%s to start downloading", name)
yield self.streams[name].data_downloading_deferred yield self.streams[claim_id].data_downloading_deferred
stream_info = None lbry_file = yield self._get_lbry_file(FileID.CLAIM_ID, claim_id, return_json=False)
lbry_file = None
if claim_id:
lbry_file = yield self._get_lbry_file(FileID.CLAIM_ID, claim_id, return_json=False)
claims = yield self.session.wallet.get_claims_for_name(name)
formatted_claims = format_json_out_amount_as_float(claims)
stream_info = _get_claim(claim_id, formatted_claims)
if not stream_info:
log.error("No claim %s for lbry://%s, using winning claim", claim_id, name)
if not stream_info:
lbry_file = yield self._get_lbry_file(FileID.NAME, name, return_json=False)
stream_info = yield self._resolve_name(name)
if lbry_file: if lbry_file:
if not os.path.isfile(os.path.join(lbry_file.download_directory, lbry_file.file_name)): if not os.path.isfile(os.path.join(lbry_file.download_directory, lbry_file.file_name)):
@ -1464,26 +1498,9 @@ class Daemon(AuthJSONRPCServer):
log.info('Already have a file for %s', name) log.info('Already have a file for %s', name)
result = yield self._get_lbry_file_dict(lbry_file, full_status=True) result = yield self._get_lbry_file_dict(lbry_file, full_status=True)
else: else:
download_id = utils.random_string() result = yield self._download_name(name, stream_info, claim_id, timeout=timeout,
download_directory=download_directory,
self.analytics_manager.send_download_started(download_id, name, stream_info) file_name=file_name)
try:
yield self._download_name(name=name, stream_info=stream_info, timeout=timeout,
download_directory=download_directory,
file_name=file_name, wait_for_write=wait_for_write)
stream = self.streams[name]
stream.finished_deferred.addCallback(
lambda _: self.analytics_manager.send_download_finished(
download_id, name, stream_info)
)
result = yield self._get_lbry_file_dict(self.streams[name].downloader,
full_status=True)
except Exception as e:
# TODO: should reraise here, instead of returning e.message
log.warning('Failed to get %s', name)
self.analytics_manager.send_download_errored(download_id, name, stream_info)
result = e.message
response = yield self._render_response(result) response = yield self._render_response(result)
defer.returnValue(response) defer.returnValue(response)
@ -1537,7 +1554,7 @@ class Daemon(AuthJSONRPCServer):
@AuthJSONRPCServer.auth_required @AuthJSONRPCServer.auth_required
@defer.inlineCallbacks @defer.inlineCallbacks
def jsonrpc_file_delete(self, delete_target_file=True, **kwargs): def jsonrpc_file_delete(self, delete_target_file=True, delete_all=False, **kwargs):
""" """
Delete a lbry file Delete a lbry file
@ -1557,21 +1574,27 @@ class Daemon(AuthJSONRPCServer):
""" """
lbry_files = yield self._get_lbry_files(return_json=False, **kwargs) lbry_files = yield self._get_lbry_files(return_json=False, **kwargs)
if len(lbry_files) > 1: if len(lbry_files) > 1:
log.warning("There are %i files to delete, use narrower filters to select one", if not delete_all:
len(lbry_files)) log.warning("There are %i files to delete, use narrower filters to select one",
result = False len(lbry_files))
elif not lbry_files: result = False
else:
log.warning("Deleting %i files",
len(lbry_files))
if not lbry_files:
log.warning("There is no file to delete") log.warning("There is no file to delete")
result = False result = False
else: else:
lbry_file = lbry_files[0] for lbry_file in lbry_files:
file_name, stream_hash = lbry_file.file_name, lbry_file.stream_hash file_name, stream_hash = lbry_file.file_name, lbry_file.stream_hash
if lbry_file.claim_id in self.streams: if lbry_file.claim_id in self.streams:
del self.streams[lbry_file.claim_id] del self.streams[lbry_file.claim_id]
yield self.lbry_file_manager.delete_lbry_file(lbry_file, yield self.lbry_file_manager.delete_lbry_file(lbry_file,
delete_file=delete_target_file) delete_file=delete_target_file)
log.info("Deleted %s (%s)", file_name, utils.short_hash(stream_hash)) log.info("Deleted %s (%s)", file_name, utils.short_hash(stream_hash))
result = True result = True
response = yield self._render_response(result) response = yield self._render_response(result)
defer.returnValue(response) defer.returnValue(response)
@ -1597,11 +1620,51 @@ class Daemon(AuthJSONRPCServer):
cost = yield self.get_est_cost(name, size) cost = yield self.get_est_cost(name, size)
defer.returnValue(cost) defer.returnValue(cost)
@AuthJSONRPCServer.auth_required
@defer.inlineCallbacks
def jsonrpc_channel_new(self, channel_name, amount):
"""
Generate a publisher key and create a new certificate claim
Args:
'name': (str) '@' prefixed name
'amount': (float) amount to claim name
Returns:
(dict) Dictionary containing result of the claim
{
'tx' : (str) hex encoded transaction
'txid' : (str) txid of resulting claim
'nout' : (int) nout of the resulting claim
'fee' : (float) fee paid for the claim transaction
'claim_id' : (str) claim ID of the resulting claim
}
"""
result = yield self.session.wallet.claim_new_channel(channel_name, amount)
response = yield self._render_response(result)
defer.returnValue(response)
@AuthJSONRPCServer.auth_required
@defer.inlineCallbacks
def jsonrpc_channel_list_mine(self):
"""
Get my channels
Returns:
(list) ClaimDict
"""
result = yield self.session.wallet.channel_list()
response = yield self._render_response(result)
defer.returnValue(response)
@AuthJSONRPCServer.auth_required @AuthJSONRPCServer.auth_required
@defer.inlineCallbacks @defer.inlineCallbacks
def jsonrpc_publish(self, name, bid, metadata=None, file_path=None, fee=None, title=None, def jsonrpc_publish(self, name, bid, metadata=None, file_path=None, fee=None, title=None,
description=None, author=None, language=None, license=None, description=None, author=None, language=None, license=None,
license_url=None, thumbnail=None, preview=None, nsfw=None, sources=None): license_url=None, thumbnail=None, preview=None, nsfw=None, sources=None,
channel_name=None):
""" """
Make a new name claim and publish associated data to lbrynet, Make a new name claim and publish associated data to lbrynet,
update over existing claim if user already has a claim for name. update over existing claim if user already has a claim for name.
@ -1641,6 +1704,7 @@ class Daemon(AuthJSONRPCServer):
'preview'(optional): (str) preview URL for the file 'preview'(optional): (str) preview URL for the file
'nsfw'(optional): (bool) True if not safe for work 'nsfw'(optional): (bool) True if not safe for work
'sources'(optional): (dict){'lbry_sd_hash':sd_hash} specifies sd hash of file 'sources'(optional): (dict){'lbry_sd_hash':sd_hash} specifies sd hash of file
'channel_name' (optional): (str) name of the publisher channel
Returns: Returns:
(dict) Dictionary containing result of the claim (dict) Dictionary containing result of the claim
@ -1691,10 +1755,11 @@ class Daemon(AuthJSONRPCServer):
else: else:
address = fee_dict['address'] address = fee_dict['address']
new_fee_dict = { new_fee_dict = {
'version':'_0_0_1', 'version': '_0_0_1',
'currency': currency, 'currency': currency,
'address':address, 'address': address,
'amount':fee_dict['amount']} 'amount': fee_dict['amount']
}
metadata['fee'] = new_fee_dict metadata['fee'] = new_fee_dict
log.info("Publish: %s", { log.info("Publish: %s", {
@ -1706,14 +1771,30 @@ class Daemon(AuthJSONRPCServer):
}) })
claim_dict = { claim_dict = {
'version':'_0_0_1', 'version': '_0_0_1',
'claimType':'streamType', 'claimType': 'streamType',
'stream':{'metadata':metadata, 'version':'_0_0_1'}} 'stream': {
'metadata': metadata,
'version': '_0_0_1'
}
}
if sources is not None: if sources is not None:
claim_dict['stream']['source'] = sources claim_dict['stream']['source'] = sources
result = yield self._publish_stream(name, bid, claim_dict, file_path) if channel_name:
certificate_id = None
my_certificates = yield self.session.wallet.channel_list()
for certificate in my_certificates:
if channel_name == certificate['name']:
certificate_id = certificate['claim_id']
break
if not certificate_id:
raise Exception("Cannot publish using channel %s" % channel_name)
else:
certificate_id = None
result = yield self._publish_stream(name, bid, claim_dict, file_path, certificate_id)
response = yield self._render_response(result) response = yield self._render_response(result)
defer.returnValue(response) defer.returnValue(response)
@ -1726,13 +1807,12 @@ class Daemon(AuthJSONRPCServer):
@AuthJSONRPCServer.auth_required @AuthJSONRPCServer.auth_required
@defer.inlineCallbacks @defer.inlineCallbacks
def jsonrpc_claim_abandon(self, txid, nout): def jsonrpc_claim_abandon(self, claim_id):
""" """
Abandon a name and reclaim credits from the claim Abandon a name and reclaim credits from the claim
Args: Args:
'txid': (str) txid of claim 'claim_id': (str) claim_id of claim
'nout': (int) nout of claim
Return: Return:
(dict) Dictionary containing result of the claim (dict) Dictionary containing result of the claim
{ {
@ -1742,7 +1822,7 @@ class Daemon(AuthJSONRPCServer):
""" """
try: try:
abandon_claim_tx = yield self.session.wallet.abandon_claim(txid, nout) abandon_claim_tx = yield self.session.wallet.abandon_claim(claim_id)
response = yield self._render_response(abandon_claim_tx) response = yield self._render_response(abandon_claim_tx)
except BaseException as err: except BaseException as err:
log.warning(err) log.warning(err)
@ -1862,6 +1942,7 @@ class Daemon(AuthJSONRPCServer):
""" """
return self.jsonrpc_claim_list(**kwargs) return self.jsonrpc_claim_list(**kwargs)
@defer.inlineCallbacks
def jsonrpc_claim_list(self, name): def jsonrpc_claim_list(self, name):
""" """
Get claims for a name Get claims for a name
@ -1890,10 +1971,8 @@ class Daemon(AuthJSONRPCServer):
} }
""" """
d = self.session.wallet.get_claims_for_name(name) claims = yield self.session.wallet.get_claims_for_name(name)
d.addCallback(format_json_out_amount_as_float) defer.returnValue(claims)
d.addCallback(lambda r: self._render_response(r))
return d
@AuthJSONRPCServer.auth_required @AuthJSONRPCServer.auth_required
def jsonrpc_get_transaction_history(self): def jsonrpc_get_transaction_history(self):
@ -2401,114 +2480,6 @@ class Daemon(AuthJSONRPCServer):
return d return d
class _DownloadNameHelper(object):
def __init__(self, daemon, name, timeout=None, download_directory=None, file_name=None,
wait_for_write=True):
self.daemon = daemon
self.name = name
self.timeout = timeout if timeout is not None else conf.settings['download_timeout']
if not download_directory or not os.path.isdir(download_directory):
self.download_directory = daemon.download_directory
else:
self.download_directory = download_directory
self.file_name = file_name
self.wait_for_write = wait_for_write
@defer.inlineCallbacks
def setup_stream(self, stream_info):
sd_hash = utils.get_sd_hash(stream_info)
lbry_file = yield self.daemon._get_lbry_file(FileID.SD_HASH, sd_hash, return_json=False)
if self._does_lbry_file_exists(lbry_file):
defer.returnValue(lbry_file)
else:
defer.returnValue(None)
def _does_lbry_file_exists(self, lbry_file):
return lbry_file and os.path.isfile(self._full_path(lbry_file))
def _full_path(self, lbry_file):
return os.path.join(self.download_directory, lbry_file.file_name)
@defer.inlineCallbacks
def wait_or_get_stream(self, stream_info, lbry_file):
if lbry_file:
log.debug('Wait on lbry_file')
# returns the lbry_file
yield self._wait_on_lbry_file(lbry_file)
defer.returnValue((lbry_file.sd_hash, self._full_path(lbry_file)))
else:
log.debug('No lbry_file, need to get stream')
# returns an instance of ManagedEncryptedFileDownloaderFactory
sd_hash, file_path = yield self._get_stream(stream_info)
defer.returnValue((sd_hash, file_path))
def _wait_on_lbry_file(self, f):
file_path = self._full_path(f)
written_bytes = self._get_written_bytes(file_path)
if written_bytes:
log.info("File has bytes: %s --> %s", f.sd_hash, file_path)
return defer.succeed(True)
return task.deferLater(reactor, 1, self._wait_on_lbry_file, f)
@defer.inlineCallbacks
def _get_stream(self, stream_info):
try:
download_path = yield self.daemon.add_stream(
self.name, self.timeout, self.download_directory, self.file_name, stream_info)
self.remove_from_wait(None)
except (InsufficientFundsError, Exception) as err:
if Failure(err).check(InsufficientFundsError):
log.warning("Insufficient funds to download lbry://%s", self.name)
self.remove_from_wait("Insufficient funds")
else:
log.warning("lbry://%s timed out, removing from streams", self.name)
self.remove_from_wait("Timed out")
if self.daemon.streams[self.name].downloader is not None:
yield self.daemon.lbry_file_manager.delete_lbry_file(
self.daemon.streams[self.name].downloader)
del self.daemon.streams[self.name]
raise err
if self.wait_for_write:
yield self._wait_for_write()
defer.returnValue((self.daemon.streams[self.name].sd_hash, download_path))
def _wait_for_write(self):
d = defer.succeed(None)
if not self._has_downloader_wrote():
d.addCallback(lambda _: reactor.callLater(1, self._wait_for_write))
return d
def _has_downloader_wrote(self):
stream = self.daemon.streams.get(self.name, False)
if stream:
file_path = self._full_path(stream.downloader)
return self._get_written_bytes(file_path)
else:
return False
def _get_written_bytes(self, file_path):
"""Returns the number of bytes written to `file_path`.
Returns False if there were issues reading `file_path`.
"""
try:
if os.path.isfile(file_path):
with open(file_path) as written_file:
written_file.seek(0, os.SEEK_END)
written_bytes = written_file.tell()
else:
written_bytes = False
except Exception:
writen_bytes = False
return written_bytes
def remove_from_wait(self, reason):
if self.name in self.daemon.waiting_on:
del self.daemon.waiting_on[self.name]
return reason
class _ResolveNameHelper(object): class _ResolveNameHelper(object):
def __init__(self, daemon, name, force_refresh): def __init__(self, daemon, name, force_refresh):
self.daemon = daemon self.daemon = daemon
@ -2518,7 +2489,7 @@ class _ResolveNameHelper(object):
def get_deferred(self): def get_deferred(self):
if self.need_fresh_stream(): if self.need_fresh_stream():
log.info("Resolving stream info for lbry://%s", self.name) log.info("Resolving stream info for lbry://%s", self.name)
d = self.wallet.get_stream_info_for_name(self.name) d = self.wallet.get_claim_by_name(self.name)
d.addCallback(self._cache_stream_info) d.addCallback(self._cache_stream_info)
else: else:
log.debug("Returning cached stream info for lbry://%s", self.name) log.debug("Returning cached stream info for lbry://%s", self.name)
@ -2542,11 +2513,10 @@ class _ResolveNameHelper(object):
def _cache_stream_info(self, stream_info): def _cache_stream_info(self, stream_info):
self.daemon.name_cache[self.name] = { self.daemon.name_cache[self.name] = {
'claim_metadata': stream_info, 'claim_metadata': stream_info['value'],
'timestamp': self.now() 'timestamp': self.now()
} }
d = self.wallet.get_txid_for_name(self.name) d = self._add_txid(stream_info['txid'])
d.addCallback(self._add_txid)
d.addCallback(lambda _: self.daemon._update_claim_cache()) d.addCallback(lambda _: self.daemon._update_claim_cache())
d.addCallback(lambda _: self.name_data['claim_metadata']) d.addCallback(lambda _: self.name_data['claim_metadata'])
return d return d

View file

@ -1,6 +1,6 @@
import logging import logging
import os import os
from twisted.internet import defer from twisted.internet import defer, threads
from twisted.internet.task import LoopingCall from twisted.internet.task import LoopingCall
from lbrynet.core import utils from lbrynet.core import utils
@ -63,6 +63,8 @@ class GetStream(object):
# fired after the metadata and the first data blob have been downloaded # fired after the metadata and the first data blob have been downloaded
self.data_downloading_deferred = defer.Deferred(None) self.data_downloading_deferred = defer.Deferred(None)
self._running = False
@property @property
def download_path(self): def download_path(self):
return os.path.join(self.download_directory, self.downloader.file_name) return os.path.join(self.download_directory, self.downloader.file_name)
@ -88,7 +90,7 @@ class GetStream(object):
elif self.downloader: elif self.downloader:
d = self.downloader.status() d = self.downloader.status()
d.addCallback(self._check_status) d.addCallback(self._check_status)
else: elif self._running:
log.info("Downloading stream descriptor blob (%i seconds)", self.timeout_counter) log.info("Downloading stream descriptor blob (%i seconds)", self.timeout_counter)
def convert_max_fee(self): def convert_max_fee(self):
@ -158,10 +160,21 @@ class GetStream(object):
@defer.inlineCallbacks @defer.inlineCallbacks
def download(self, stream_info, name): def download(self, stream_info, name):
if self._running:
raise Exception("Already running")
self._running = True
self.set_status(INITIALIZING_CODE, name) self.set_status(INITIALIZING_CODE, name)
self.sd_hash = utils.get_sd_hash(stream_info) self.sd_hash = utils.get_sd_hash(stream_info)
if 'fee' in stream_info['stream']['metadata']: if 'fee' in stream_info['stream']['metadata']:
fee = self.check_fee(stream_info['stream']['metadata']['fee']) try:
fee = yield threads.deferToThread(self.check_fee,
stream_info['stream']['metadata']['fee'])
except Exception as err:
self._running = False
self.finished_deferred.errback(err)
raise err
else: else:
fee = None fee = None
@ -184,7 +197,7 @@ class GetStream(object):
safe_start(self.checker) safe_start(self.checker)
self.download(stream_info, name) self.download(stream_info, name)
yield self.data_downloading_deferred yield self.data_downloading_deferred
defer.returnValue(self.download_path) defer.returnValue(self.downloader)
except Exception as err: except Exception as err:
safe_stop(self.checker) safe_stop(self.checker)
raise err raise err

View file

@ -64,7 +64,6 @@ class MarketFeed(object):
self.rate = ExchangeRate(self.market, price, int(time.time())) self.rate = ExchangeRate(self.market, price, int(time.time()))
def _log_error(self, err): def _log_error(self, err):
log.error(err)
log.warning( log.warning(
"There was a problem updating %s exchange rate information from %s", "There was a problem updating %s exchange rate information from %s",
self.market, self.name) self.market, self.name)

View file

@ -13,10 +13,11 @@ log = logging.getLogger(__name__)
class Publisher(object): class Publisher(object):
def __init__(self, session, lbry_file_manager, wallet): def __init__(self, session, lbry_file_manager, wallet, certificate_id):
self.session = session self.session = session
self.lbry_file_manager = lbry_file_manager self.lbry_file_manager = lbry_file_manager
self.wallet = wallet self.wallet = wallet
self.certificate_id = certificate_id
self.lbry_file = None self.lbry_file = None
""" """
@ -56,7 +57,8 @@ class Publisher(object):
@defer.inlineCallbacks @defer.inlineCallbacks
def make_claim(self, name, bid, claim_dict): def make_claim(self, name, bid, claim_dict):
claim_out = yield self.wallet.claim_name(name, bid, claim_dict) claim_out = yield self.wallet.claim_name(name, bid, claim_dict,
certificate_id=self.certificate_id)
defer.returnValue(claim_out) defer.returnValue(claim_out)

View file

@ -4,7 +4,7 @@ from twisted.trial import unittest
from twisted.internet import threads, defer from twisted.internet import threads, defer
from lbrynet.core.Error import InsufficientFundsError from lbrynet.core.Error import InsufficientFundsError
from lbrynet.core.Wallet import Wallet, ReservedPoints from lbrynet.core.Wallet import Wallet, ReservedPoints, InMemoryStorage
test_metadata = { test_metadata = {
'license': 'NASA', 'license': 'NASA',
@ -29,6 +29,8 @@ class MocLbryumWallet(Wallet):
self.wallet_balance = Decimal(10.0) self.wallet_balance = Decimal(10.0)
self.total_reserved_points = Decimal(0.0) self.total_reserved_points = Decimal(0.0)
self.queued_payments = defaultdict(Decimal) self.queued_payments = defaultdict(Decimal)
self._storage = InMemoryStorage()
def get_name_claims(self): def get_name_claims(self):
return threads.deferToThread(lambda: []) return threads.deferToThread(lambda: [])
@ -50,7 +52,7 @@ class WalletTest(unittest.TestCase):
def test_successful_send_name_claim(self): def test_successful_send_name_claim(self):
expected_claim_out = { expected_claim_out = {
"claimid": "f43dc06256a69988bdbea09a58c80493ba15dcfa", "claim_id": "f43dc06256a69988bdbea09a58c80493ba15dcfa",
"fee": "0.00012", "fee": "0.00012",
"nout": 0, "nout": 0,
"success": True, "success": True,
@ -59,12 +61,12 @@ class WalletTest(unittest.TestCase):
def check_out(claim_out): def check_out(claim_out):
self.assertTrue('success' not in claim_out) self.assertTrue('success' not in claim_out)
self.assertEqual(expected_claim_out['claimid'], claim_out['claimid']) self.assertEqual(expected_claim_out['claim_id'], claim_out['claim_id'])
self.assertEqual(expected_claim_out['fee'], claim_out['fee']) self.assertEqual(expected_claim_out['fee'], claim_out['fee'])
self.assertEqual(expected_claim_out['nout'], claim_out['nout']) self.assertEqual(expected_claim_out['nout'], claim_out['nout'])
self.assertEqual(expected_claim_out['txid'], claim_out['txid']) self.assertEqual(expected_claim_out['txid'], claim_out['txid'])
def success_send_name_claim(self, name, val, amount): def success_send_name_claim(self, name, val, amount, certificate_id=None):
return expected_claim_out return expected_claim_out
MocLbryumWallet._send_name_claim = success_send_name_claim MocLbryumWallet._send_name_claim = success_send_name_claim
@ -111,8 +113,8 @@ class WalletTest(unittest.TestCase):
return threads.deferToThread(lambda: claim_out) return threads.deferToThread(lambda: claim_out)
MocLbryumWallet._abandon_claim = failed_abandon_claim MocLbryumWallet._abandon_claim = failed_abandon_claim
wallet = MocLbryumWallet() wallet = MocLbryumWallet()
d = wallet.abandon_claim("11030a76521e5f552ca87ad70765d0cc52e6ea4c0dc0063335e6cf2a9a85085f", 1) d = wallet.abandon_claim("f43dc06256a69988bdbea09a58c80493ba15dcfa")
self.assertFailure(d,Exception) self.assertFailure(d, Exception)
return d return d
def test_successful_abandon(self): def test_successful_abandon(self):
@ -132,7 +134,7 @@ class WalletTest(unittest.TestCase):
MocLbryumWallet._abandon_claim = success_abandon_claim MocLbryumWallet._abandon_claim = success_abandon_claim
wallet = MocLbryumWallet() wallet = MocLbryumWallet()
d = wallet.abandon_claim("0578c161ad8d36a7580c557d7444f967ea7f988e194c20d0e3c42c3cabf110dd", 1) d = wallet.abandon_claim("f43dc06256a69988bdbea09a58c80493ba15dcfa")
d.addCallback(lambda claim_out: check_out(claim_out)) d.addCallback(lambda claim_out: check_out(claim_out))
return d return d
@ -187,7 +189,3 @@ class WalletTest(unittest.TestCase):
d.addCallback(lambda _: wallet.support_claim('test', "f43dc06256a69988bdbea09a58c80493ba15dcfa", 4)) d.addCallback(lambda _: wallet.support_claim('test', "f43dc06256a69988bdbea09a58c80493ba15dcfa", 4))
self.assertFailure(d,InsufficientFundsError) self.assertFailure(d,InsufficientFundsError)
return d return d