fix up LBRYcrdWallet and split it into two classes
This commit is contained in:
parent
c245c69389
commit
ec3e365b3c
4 changed files with 461 additions and 304 deletions
|
@ -39,11 +39,11 @@ def _catch_connection_error(f):
|
||||||
return w
|
return w
|
||||||
|
|
||||||
|
|
||||||
class LBRYcrdWallet(object):
|
class LBRYWallet(object):
|
||||||
"""This class implements the LBRYWallet interface for the LBRYcrd payment system"""
|
"""This class implements the LBRYWallet interface for the LBRYcrd payment system"""
|
||||||
implements(ILBRYWallet)
|
implements(ILBRYWallet)
|
||||||
|
|
||||||
def __init__(self, db_dir, wallet_dir=None, wallet_conf=None, lbrycrdd_path=None):
|
def __init__(self, db_dir):
|
||||||
|
|
||||||
self.db_dir = db_dir
|
self.db_dir = db_dir
|
||||||
self.db = None
|
self.db = None
|
||||||
|
@ -58,44 +58,29 @@ class LBRYcrdWallet(object):
|
||||||
# incremental_amount(float))
|
# incremental_amount(float))
|
||||||
self.max_expected_payment_time = datetime.timedelta(minutes=3)
|
self.max_expected_payment_time = datetime.timedelta(minutes=3)
|
||||||
self.stopped = True
|
self.stopped = True
|
||||||
self.started_lbrycrdd = False
|
|
||||||
self.wallet_dir = wallet_dir
|
|
||||||
self.wallet_conf = wallet_conf
|
|
||||||
self.lbrycrdd = None
|
|
||||||
self.manage_running = False
|
|
||||||
self.lbrycrdd_path = lbrycrdd_path
|
|
||||||
|
|
||||||
settings = self.get_rpc_conf()
|
self.manage_running = False
|
||||||
rpc_user = settings["username"]
|
|
||||||
rpc_pass = settings["password"]
|
|
||||||
rpc_port = settings["rpc_port"]
|
|
||||||
rpc_url = "127.0.0.1"
|
|
||||||
self.rpc_conn_string = "http://%s:%s@%s:%s" % (rpc_user, rpc_pass, rpc_url, str(rpc_port))
|
|
||||||
|
|
||||||
def start(self):
|
def start(self):
|
||||||
|
|
||||||
def make_connection():
|
|
||||||
alert.info("Connecting to lbrycrdd...")
|
|
||||||
if self.lbrycrdd_path is not None:
|
|
||||||
self._start_daemon()
|
|
||||||
self._get_info()
|
|
||||||
log.info("Connected!")
|
|
||||||
alert.info("Connected to lbrycrdd.")
|
|
||||||
|
|
||||||
def start_manage():
|
def start_manage():
|
||||||
self.stopped = False
|
self.stopped = False
|
||||||
self.manage()
|
self.manage()
|
||||||
return True
|
return True
|
||||||
|
|
||||||
d = self._open_db()
|
d = self._open_db()
|
||||||
d.addCallback(lambda _: threads.deferToThread(make_connection))
|
d.addCallback(lambda _: self._start())
|
||||||
d.addCallback(lambda _: start_manage())
|
d.addCallback(lambda _: start_manage())
|
||||||
return d
|
return d
|
||||||
|
|
||||||
def stop(self):
|
def _start(self):
|
||||||
|
pass
|
||||||
|
|
||||||
def log_stop_error(err):
|
@staticmethod
|
||||||
log.error("An error occurred stopping the wallet. %s", err.getTraceback())
|
def log_stop_error(err):
|
||||||
|
log.error("An error occurred stopping the wallet: %s", err.getTraceback())
|
||||||
|
|
||||||
|
def stop(self):
|
||||||
|
|
||||||
self.stopped = True
|
self.stopped = True
|
||||||
# If self.next_manage_call is None, then manage is currently running or else
|
# If self.next_manage_call is None, then manage is currently running or else
|
||||||
|
@ -105,12 +90,14 @@ class LBRYcrdWallet(object):
|
||||||
self.next_manage_call = None
|
self.next_manage_call = None
|
||||||
|
|
||||||
d = self.manage()
|
d = self.manage()
|
||||||
d.addErrback(log_stop_error)
|
d.addErrback(self.log_stop_error)
|
||||||
if self.lbrycrdd_path is not None:
|
d.addCallback(lambda _: self._stop())
|
||||||
d.addCallback(lambda _: self._stop_daemon())
|
d.addErrback(self.log_stop_error)
|
||||||
d.addErrback(log_stop_error)
|
|
||||||
return d
|
return d
|
||||||
|
|
||||||
|
def _stop(self):
|
||||||
|
pass
|
||||||
|
|
||||||
def manage(self):
|
def manage(self):
|
||||||
log.info("Doing manage")
|
log.info("Doing manage")
|
||||||
self.next_manage_call = None
|
self.next_manage_call = None
|
||||||
|
@ -137,7 +124,7 @@ class LBRYcrdWallet(object):
|
||||||
|
|
||||||
d.addCallback(lambda _: self._send_payments())
|
d.addCallback(lambda _: self._send_payments())
|
||||||
|
|
||||||
d.addCallback(lambda _: threads.deferToThread(self._get_wallet_balance))
|
d.addCallback(lambda _: self.get_balance())
|
||||||
|
|
||||||
def set_wallet_balance(balance):
|
def set_wallet_balance(balance):
|
||||||
self.wallet_balance = balance
|
self.wallet_balance = balance
|
||||||
|
@ -164,16 +151,6 @@ class LBRYcrdWallet(object):
|
||||||
d.addBoth(set_manage_not_running)
|
d.addBoth(set_manage_not_running)
|
||||||
return d
|
return d
|
||||||
|
|
||||||
def get_info_exchanger(self):
|
|
||||||
return LBRYcrdAddressRequester(self)
|
|
||||||
|
|
||||||
def get_wallet_info_query_handler_factory(self):
|
|
||||||
return LBRYcrdAddressQueryHandlerFactory(self)
|
|
||||||
|
|
||||||
def get_balance(self):
|
|
||||||
d = threads.deferToThread(self._get_wallet_balance)
|
|
||||||
return d
|
|
||||||
|
|
||||||
def reserve_points(self, identifier, amount):
|
def reserve_points(self, identifier, amount):
|
||||||
"""
|
"""
|
||||||
Ensure a certain amount of points are available to be sent as payment, before the service is rendered
|
Ensure a certain amount of points are available to be sent as payment, before the service is rendered
|
||||||
|
@ -263,37 +240,66 @@ class LBRYcrdWallet(object):
|
||||||
def set_address_for_peer(address):
|
def set_address_for_peer(address):
|
||||||
self.current_address_given_to_peer[peer] = address
|
self.current_address_given_to_peer[peer] = address
|
||||||
return address
|
return address
|
||||||
d = threads.deferToThread(self._get_new_address)
|
d = self.get_new_address()
|
||||||
d.addCallback(set_address_for_peer)
|
d.addCallback(set_address_for_peer)
|
||||||
return d
|
return d
|
||||||
|
|
||||||
def get_stream_info_for_name(self, name, txid=None):
|
def _send_payments(self):
|
||||||
|
log.info("Trying to send payments, if there are any to be sent")
|
||||||
|
|
||||||
def get_stream_info_from_value(result):
|
payments_to_send = {}
|
||||||
r_dict = {}
|
for address, points in self.queued_payments.items():
|
||||||
if 'value' in result:
|
log.info("Should be sending %s points to %s", str(points), str(address))
|
||||||
value = result['value']
|
payments_to_send[address] = float(points)
|
||||||
try:
|
self.total_reserved_points -= points
|
||||||
value_dict = json.loads(value)
|
self.wallet_balance -= points
|
||||||
except ValueError:
|
del self.queued_payments[address]
|
||||||
return Failure(InvalidStreamInfoError(name))
|
if payments_to_send:
|
||||||
known_fields = ['stream_hash', 'name', 'description', 'key_fee', 'key_fee_address', 'thumbnail',
|
log.info("Creating a transaction with outputs %s", str(payments_to_send))
|
||||||
'content_license']
|
return self._do_send_many(payments_to_send)
|
||||||
for field in known_fields:
|
log.info("There were no payments to send")
|
||||||
if field in value_dict:
|
return defer.succeed(True)
|
||||||
r_dict[field] = value_dict[field]
|
|
||||||
if 'stream_hash' in r_dict and 'txid' in result:
|
def get_stream_info_for_name(self, name):
|
||||||
d = self._save_name_metadata(name, r_dict['stream_hash'], str(result['txid']))
|
d = self._get_value_for_name(name)
|
||||||
else:
|
d.addCallback(self._get_stream_info_from_value, name)
|
||||||
d = defer.succeed(True)
|
return d
|
||||||
d.addCallback(lambda _: r_dict)
|
|
||||||
return d
|
def get_stream_info_from_txid(self, name, txid):
|
||||||
|
d = self._get_claims_from_tx(txid)
|
||||||
|
|
||||||
|
def get_claim_for_name(claims):
|
||||||
|
for claim in claims:
|
||||||
|
if claim['name'] == name:
|
||||||
|
claim['txid'] = txid
|
||||||
|
return claim
|
||||||
return Failure(UnknownNameError(name))
|
return Failure(UnknownNameError(name))
|
||||||
|
|
||||||
d = threads.deferToThread(self._get_value_for_name, name, txid)
|
d.addCallback(get_claim_for_name)
|
||||||
d.addCallback(get_stream_info_from_value)
|
d.addCallback(self._get_stream_info_from_value, name)
|
||||||
return d
|
return d
|
||||||
|
|
||||||
|
def _get_stream_info_from_value(self, result, name):
|
||||||
|
r_dict = {}
|
||||||
|
if 'value' in result:
|
||||||
|
value = result['value']
|
||||||
|
try:
|
||||||
|
value_dict = json.loads(value)
|
||||||
|
except ValueError:
|
||||||
|
return Failure(InvalidStreamInfoError(name))
|
||||||
|
known_fields = ['stream_hash', 'name', 'description', 'key_fee', 'key_fee_address', 'thumbnail',
|
||||||
|
'content_license']
|
||||||
|
for field in known_fields:
|
||||||
|
if field in value_dict:
|
||||||
|
r_dict[field] = value_dict[field]
|
||||||
|
if 'stream_hash' in r_dict and 'txid' in result:
|
||||||
|
d = self._save_name_metadata(name, r_dict['stream_hash'], str(result['txid']))
|
||||||
|
else:
|
||||||
|
d = defer.succeed(True)
|
||||||
|
d.addCallback(lambda _: r_dict)
|
||||||
|
return d
|
||||||
|
return Failure(UnknownNameError(name))
|
||||||
|
|
||||||
def claim_name(self, name, sd_hash, amount, description=None, key_fee=None,
|
def claim_name(self, name, sd_hash, amount, description=None, key_fee=None,
|
||||||
key_fee_address=None, thumbnail=None, content_license=None):
|
key_fee_address=None, thumbnail=None, content_license=None):
|
||||||
value = {"stream_hash": sd_hash}
|
value = {"stream_hash": sd_hash}
|
||||||
|
@ -308,7 +314,7 @@ class LBRYcrdWallet(object):
|
||||||
if content_license is not None:
|
if content_license is not None:
|
||||||
value['content_license'] = content_license
|
value['content_license'] = content_license
|
||||||
|
|
||||||
d = threads.deferToThread(self._claim_name, name, json.dumps(value), amount)
|
d = self._send_name_claim(name, json.dumps(value), amount)
|
||||||
|
|
||||||
def _save_metadata(txid):
|
def _save_metadata(txid):
|
||||||
d = self._save_name_metadata(name, sd_hash, txid)
|
d = self._save_name_metadata(name, sd_hash, txid)
|
||||||
|
@ -319,70 +325,230 @@ class LBRYcrdWallet(object):
|
||||||
return d
|
return d
|
||||||
|
|
||||||
def abandon_name(self, txid):
|
def abandon_name(self, txid):
|
||||||
address = self._get_new_address()
|
d1 = self.get_new_address()
|
||||||
raw = self._get_raw_tx(txid)
|
d2 = self._get_claims_from_tx(txid)
|
||||||
transaction = self._get_decoded_tx(raw)
|
|
||||||
amount = float(transaction['vout'][1]['value'])
|
def get_txout_of_claim(claims):
|
||||||
return self._abandon_name(txid, address, amount)
|
for claim in claims:
|
||||||
|
if 'name' in claim and 'nOut' in claim:
|
||||||
|
return claim['nOut']
|
||||||
|
return defer.fail(ValueError("No claims in tx"))
|
||||||
|
|
||||||
|
def get_value_of_txout(nOut):
|
||||||
|
d = self._get_raw_tx(txid)
|
||||||
|
d.addCallback(self._get_decoded_tx)
|
||||||
|
d.addCallback(lambda tx: tx['vout'][nOut]['value'])
|
||||||
|
return d
|
||||||
|
|
||||||
|
d2.addCallback(get_txout_of_claim)
|
||||||
|
d2.addCallback(get_value_of_txout)
|
||||||
|
dl = defer.DeferredList([d1, d2], consumeErrors=True)
|
||||||
|
|
||||||
|
def abandon(results):
|
||||||
|
if results[0][0] and results[1][0]:
|
||||||
|
address = results[0][1]
|
||||||
|
amount = results[1][1]
|
||||||
|
return self._send_abandon(txid, address, amount)
|
||||||
|
elif results[0][0] is False:
|
||||||
|
return defer.fail(Failure(ValueError("Couldn't get a new address")))
|
||||||
|
else:
|
||||||
|
return results[1][1]
|
||||||
|
|
||||||
|
dl.addCallback(abandon)
|
||||||
|
return dl
|
||||||
|
|
||||||
def get_tx(self, txid):
|
def get_tx(self, txid):
|
||||||
raw = self._get_raw_tx(txid)
|
d = self._get_raw_tx(txid)
|
||||||
return self._get_decoded_tx(raw)
|
d.addCallback(self._get_decoded_tx)
|
||||||
|
return d
|
||||||
def get_name_claims(self):
|
|
||||||
return threads.deferToThread(self._get_name_claims)
|
|
||||||
|
|
||||||
def start_miner(self):
|
|
||||||
if not self._get_gen_status():
|
|
||||||
return self._set_gen_status(True)
|
|
||||||
else:
|
|
||||||
return "Miner was already running"
|
|
||||||
|
|
||||||
def stop_miner(self):
|
|
||||||
if self._get_gen_status():
|
|
||||||
return self._set_gen_status(False)
|
|
||||||
else:
|
|
||||||
return "Miner wasn't running"
|
|
||||||
|
|
||||||
def get_miner_status(self):
|
|
||||||
return self._get_gen_status()
|
|
||||||
|
|
||||||
def get_block(self, blockhash):
|
|
||||||
return self._get_block(blockhash)
|
|
||||||
|
|
||||||
def get_blockchain_info(self):
|
|
||||||
return self._get_blockchain_info()
|
|
||||||
|
|
||||||
def get_claims_for_tx(self, txid):
|
|
||||||
return self._get_claims_for_tx(txid)
|
|
||||||
|
|
||||||
def get_nametrie(self):
|
|
||||||
return self._get_nametrie()
|
|
||||||
|
|
||||||
# def update_name(self, name_value):
|
# def update_name(self, name_value):
|
||||||
# return self._update_name(name_value)
|
# return self._update_name(name_value)
|
||||||
|
|
||||||
def get_name_and_validity_for_sd_hash(self, sd_hash):
|
def get_name_and_validity_for_sd_hash(self, sd_hash):
|
||||||
d = self._get_claim_metadata_for_sd_hash(sd_hash)
|
d = self._get_claim_metadata_for_sd_hash(sd_hash)
|
||||||
d.addCallback(lambda name_txid: threads.deferToThread(self._get_status_of_claim, name_txid[1], name_txid[0], sd_hash) if name_txid is not None else None)
|
d.addCallback(lambda name_txid: self._get_status_of_claim(name_txid[1], name_txid[0], sd_hash) if name_txid is not None else None)
|
||||||
return d
|
return d
|
||||||
|
|
||||||
def get_available_balance(self):
|
def get_available_balance(self):
|
||||||
return float(self.wallet_balance - self.total_reserved_points)
|
return float(self.wallet_balance - self.total_reserved_points)
|
||||||
|
|
||||||
def get_new_address(self):
|
def _get_status_of_claim(self, txid, name, sd_hash):
|
||||||
return threads.deferToThread(self._get_new_address)
|
d = self._get_claims_from_tx(txid)
|
||||||
|
|
||||||
def check_first_run(self):
|
def get_status(claims):
|
||||||
d = threads.deferToThread(self._get_wallet_balance)
|
if claims is None:
|
||||||
d.addCallback(lambda bal: threads.deferToThread(self._get_num_addresses) if bal == 0 else 2)
|
claims = []
|
||||||
d.addCallback(lambda num_addresses: True if num_addresses <= 1 else False)
|
for claim in claims:
|
||||||
|
if 'in claim trie' in claim:
|
||||||
|
if 'name' in claim and str(claim['name']) == name and 'value' in claim:
|
||||||
|
try:
|
||||||
|
value_dict = json.loads(claim['value'])
|
||||||
|
except ValueError:
|
||||||
|
return None
|
||||||
|
if 'stream_hash' in value_dict and str(value_dict['stream_hash']) == sd_hash:
|
||||||
|
if 'is controlling' in claim and claim['is controlling']:
|
||||||
|
return name, "valid"
|
||||||
|
if claim['in claim trie']:
|
||||||
|
return name, "invalid"
|
||||||
|
if 'in queue' in claim and claim['in queue']:
|
||||||
|
return name, "pending"
|
||||||
|
return name, "unconfirmed"
|
||||||
|
return None
|
||||||
|
|
||||||
|
d.addCallback(get_status)
|
||||||
return d
|
return d
|
||||||
|
|
||||||
def get_most_recent_blocktime(self):
|
def _check_expected_balances(self):
|
||||||
return threads.deferToThread(self._get_best_block_time)
|
now = datetime.datetime.now()
|
||||||
|
balances_to_check = []
|
||||||
|
try:
|
||||||
|
while self.expected_balance_at_time[0][3] < now:
|
||||||
|
balances_to_check.append(self.expected_balance_at_time.popleft())
|
||||||
|
except IndexError:
|
||||||
|
pass
|
||||||
|
ds = []
|
||||||
|
for balance_to_check in balances_to_check:
|
||||||
|
log.info("Checking balance of address %s", str(balance_to_check[1]))
|
||||||
|
d = self._get_balance_for_address(balance_to_check[1])
|
||||||
|
d.addCallback(lambda bal: bal >= balance_to_check[2])
|
||||||
|
ds.append(d)
|
||||||
|
dl = defer.DeferredList(ds)
|
||||||
|
|
||||||
def get_rpc_conf(self):
|
def handle_checks(results):
|
||||||
|
from future_builtins import zip
|
||||||
|
for balance, (success, result) in zip(balances_to_check, results):
|
||||||
|
peer = balance[0]
|
||||||
|
if success is True:
|
||||||
|
if result is False:
|
||||||
|
if balance[4] <= 1: # first or second strike, give them another chance
|
||||||
|
new_expected_balance = (balance[0],
|
||||||
|
balance[1],
|
||||||
|
balance[2],
|
||||||
|
datetime.datetime.now() + self.max_expected_payment_time,
|
||||||
|
balance[4] + 1,
|
||||||
|
balance[5])
|
||||||
|
self.expected_balance_at_time.append(new_expected_balance)
|
||||||
|
peer.update_score(-5.0)
|
||||||
|
else:
|
||||||
|
peer.update_score(-50.0)
|
||||||
|
else:
|
||||||
|
if balance[4] == 0:
|
||||||
|
peer.update_score(balance[5])
|
||||||
|
peer.update_stats('points_received', balance[5])
|
||||||
|
else:
|
||||||
|
log.warning("Something went wrong checking a balance. Peer: %s, account: %s,"
|
||||||
|
"expected balance: %s, expected time: %s, count: %s, error: %s",
|
||||||
|
str(balance[0]), str(balance[1]), str(balance[2]), str(balance[3]),
|
||||||
|
str(balance[4]), str(result.getErrorMessage()))
|
||||||
|
|
||||||
|
dl.addCallback(handle_checks)
|
||||||
|
return dl
|
||||||
|
|
||||||
|
def _open_db(self):
|
||||||
|
self.db = adbapi.ConnectionPool('sqlite3', os.path.join(self.db_dir, "blockchainname.db"),
|
||||||
|
check_same_thread=False)
|
||||||
|
return self.db.runQuery("create table if not exists name_metadata (" +
|
||||||
|
" name text, " +
|
||||||
|
" txid text, " +
|
||||||
|
" sd_hash text)")
|
||||||
|
|
||||||
|
def _save_name_metadata(self, name, sd_hash, txid):
|
||||||
|
d = self.db.runQuery("insert into name_metadata values (?, ?, ?)",
|
||||||
|
(name, txid, sd_hash))
|
||||||
|
return d
|
||||||
|
|
||||||
|
def _get_claim_metadata_for_sd_hash(self, sd_hash):
|
||||||
|
d = self.db.runQuery("select name, txid from name_metadata where sd_hash=?", (sd_hash,))
|
||||||
|
d.addCallback(lambda r: r[0] if len(r) else None)
|
||||||
|
return d
|
||||||
|
|
||||||
|
######### Must be overridden #########
|
||||||
|
|
||||||
|
def get_info_exchanger(self):
|
||||||
|
return defer.fail(NotImplementedError())
|
||||||
|
|
||||||
|
def get_wallet_info_query_handler_factory(self):
|
||||||
|
return defer.fail(NotImplementedError())
|
||||||
|
|
||||||
|
def get_balance(self):
|
||||||
|
return defer.fail(NotImplementedError())
|
||||||
|
|
||||||
|
def get_new_address(self):
|
||||||
|
return defer.fail(NotImplementedError())
|
||||||
|
|
||||||
|
def get_block(self, blockhash):
|
||||||
|
return defer.fail(NotImplementedError())
|
||||||
|
|
||||||
|
def get_most_recent_blocktime(self):
|
||||||
|
return defer.fail(NotImplementedError())
|
||||||
|
|
||||||
|
def get_blockchain_info(self):
|
||||||
|
return defer.fail(NotImplementedError())
|
||||||
|
|
||||||
|
def get_name_claims(self):
|
||||||
|
return defer.fail(NotImplementedError())
|
||||||
|
|
||||||
|
def check_first_run(self):
|
||||||
|
return defer.fail(NotImplementedError())
|
||||||
|
|
||||||
|
def _get_raw_tx(self, txid):
|
||||||
|
return defer.fail(NotImplementedError())
|
||||||
|
|
||||||
|
def _send_name_claim(self, name, val, amount):
|
||||||
|
return defer.fail(NotImplementedError())
|
||||||
|
|
||||||
|
def _get_decoded_tx(self, raw_tx):
|
||||||
|
return defer.fail(NotImplementedError())
|
||||||
|
|
||||||
|
def _send_abandon(self, txid, address, amount):
|
||||||
|
return defer.fail(NotImplementedError())
|
||||||
|
|
||||||
|
def _do_send_many(self, payments_to_send):
|
||||||
|
return defer.fail(NotImplementedError())
|
||||||
|
|
||||||
|
def _get_value_for_name(self, name):
|
||||||
|
return defer.fail(NotImplementedError())
|
||||||
|
|
||||||
|
def _get_claims_from_tx(self, txid):
|
||||||
|
return defer.fail(NotImplementedError())
|
||||||
|
|
||||||
|
def _get_balance_for_address(self, address):
|
||||||
|
return defer.fail(NotImplementedError())
|
||||||
|
|
||||||
|
|
||||||
|
class LBRYcrdWallet(LBRYWallet):
|
||||||
|
def __init__(self, db_dir, wallet_dir=None, wallet_conf=None, lbrycrdd_path=None):
|
||||||
|
LBRYWallet.__init__(self, db_dir)
|
||||||
|
self.started_lbrycrdd = False
|
||||||
|
self.wallet_dir = wallet_dir
|
||||||
|
self.wallet_conf = wallet_conf
|
||||||
|
self.lbrycrdd = None
|
||||||
|
self.lbrycrdd_path = lbrycrdd_path
|
||||||
|
|
||||||
|
settings = self._get_rpc_conf()
|
||||||
|
rpc_user = settings["username"]
|
||||||
|
rpc_pass = settings["password"]
|
||||||
|
rpc_port = settings["rpc_port"]
|
||||||
|
rpc_url = "127.0.0.1"
|
||||||
|
self.rpc_conn_string = "http://%s:%s@%s:%s" % (rpc_user, rpc_pass, rpc_url, str(rpc_port))
|
||||||
|
|
||||||
|
def _start(self):
|
||||||
|
return threads.deferToThread(self._make_connection)
|
||||||
|
|
||||||
|
def _stop(self):
|
||||||
|
if self.lbrycrdd_path is not None:
|
||||||
|
return self._stop_daemon()
|
||||||
|
|
||||||
|
def _make_connection(self):
|
||||||
|
alert.info("Connecting to lbrycrdd...")
|
||||||
|
if self.lbrycrdd_path is not None:
|
||||||
|
self._start_daemon()
|
||||||
|
self._get_info_rpc()
|
||||||
|
log.info("Connected!")
|
||||||
|
alert.info("Connected to lbrycrdd.")
|
||||||
|
|
||||||
|
def _get_rpc_conf(self):
|
||||||
settings = {"username": "rpcuser",
|
settings = {"username": "rpcuser",
|
||||||
"password": "rpcpassword",
|
"password": "rpcpassword",
|
||||||
"rpc_port": 8332}
|
"rpc_port": 8332}
|
||||||
|
@ -397,6 +563,82 @@ class LBRYcrdWallet(object):
|
||||||
settings["rpc_port"] = int(l[8:].rstrip('\n'))
|
settings["rpc_port"] = int(l[8:].rstrip('\n'))
|
||||||
return settings
|
return settings
|
||||||
|
|
||||||
|
def get_info_exchanger(self):
|
||||||
|
return LBRYcrdAddressRequester(self)
|
||||||
|
|
||||||
|
def get_wallet_info_query_handler_factory(self):
|
||||||
|
return LBRYcrdAddressQueryHandlerFactory(self)
|
||||||
|
|
||||||
|
def check_first_run(self):
|
||||||
|
d = self.get_balance()
|
||||||
|
d.addCallback(lambda bal: threads.deferToThread(self._get_num_addresses_rpc) if bal == 0 else 2)
|
||||||
|
d.addCallback(lambda num_addresses: True if num_addresses <= 1 else False)
|
||||||
|
return d
|
||||||
|
|
||||||
|
def get_new_address(self):
|
||||||
|
return threads.deferToThread(self._get_new_address_rpc)
|
||||||
|
|
||||||
|
def get_balance(self):
|
||||||
|
return threads.deferToThread(self._get_wallet_balance_rpc)
|
||||||
|
|
||||||
|
def get_most_recent_blocktime(self):
|
||||||
|
d = threads.deferToThread(self._get_best_blockhash_rpc)
|
||||||
|
d.addCallback(lambda blockhash: threads.deferToThread(self._get_block_rpc, blockhash))
|
||||||
|
d.addCallback(
|
||||||
|
lambda block: block['time'] if 'time' in block else Failure(ValueError("Could not get a block time")))
|
||||||
|
return d
|
||||||
|
|
||||||
|
def get_name_claims(self):
|
||||||
|
return threads.deferToThread(self._get_name_claims_rpc)
|
||||||
|
|
||||||
|
def get_block(self, blockhash):
|
||||||
|
return threads.deferToThread(self._get_block_rpc, blockhash)
|
||||||
|
|
||||||
|
def get_blockchain_info(self):
|
||||||
|
return threads.deferToThread(self._get_blockchain_info_rpc)
|
||||||
|
|
||||||
|
def get_nametrie(self):
|
||||||
|
return threads.deferToThread(self._get_nametrie_rpc)
|
||||||
|
|
||||||
|
def start_miner(self):
|
||||||
|
d = threads.deferToThread(self._get_gen_status_rpc)
|
||||||
|
d.addCallback(lambda status: threads.deferToThread(self._set_gen_status_rpc, True) if not status
|
||||||
|
else "Miner was already running")
|
||||||
|
return d
|
||||||
|
|
||||||
|
def stop_miner(self):
|
||||||
|
d = threads.deferToThread(self._get_gen_status_rpc)
|
||||||
|
d.addCallback(lambda status: threads.deferToThread(self._set_gen_status_rpc, False) if status
|
||||||
|
else "Miner wasn't running")
|
||||||
|
return d
|
||||||
|
|
||||||
|
def get_miner_status(self):
|
||||||
|
return threads.deferToThread(self._get_gen_status_rpc)
|
||||||
|
|
||||||
|
def _get_balance_for_address(self, address):
|
||||||
|
return threads.deferToThread(self._get_balance_for_address_rpc, address)
|
||||||
|
|
||||||
|
def _do_send_many(self, payments_to_send):
|
||||||
|
return threads.deferToThread(self._do_send_many_rpc, payments_to_send)
|
||||||
|
|
||||||
|
def _send_name_claim(self, name, value, amount):
|
||||||
|
return threads.deferToThread(self._send_name_claim_rpc, name, value, amount)
|
||||||
|
|
||||||
|
def _get_raw_tx(self, txid):
|
||||||
|
return threads.deferToThread(self._get_raw_tx_rpc, txid)
|
||||||
|
|
||||||
|
def _get_decoded_tx(self, raw_tx):
|
||||||
|
return threads.deferToThread(self._get_decoded_tx_rpc, raw_tx)
|
||||||
|
|
||||||
|
def _send_abandon(self, txid, address, amount):
|
||||||
|
return threads.deferToThread(self._send_abandon_rpc, txid, address, amount)
|
||||||
|
|
||||||
|
def _get_claims_from_tx(self, txid):
|
||||||
|
return threads.deferToThread(self._get_claims_from_tx_rpc, txid)
|
||||||
|
|
||||||
|
def _get_value_for_name(self, name):
|
||||||
|
return threads.deferToThread(self._get_value_for_name_rpc, name)
|
||||||
|
|
||||||
def _get_rpc_conn(self):
|
def _get_rpc_conn(self):
|
||||||
return AuthServiceProxy(self.rpc_conn_string)
|
return AuthServiceProxy(self.rpc_conn_string)
|
||||||
|
|
||||||
|
@ -450,101 +692,41 @@ class LBRYcrdWallet(object):
|
||||||
def _stop_daemon(self):
|
def _stop_daemon(self):
|
||||||
if self.lbrycrdd is not None and self.started_lbrycrdd is True:
|
if self.lbrycrdd is not None and self.started_lbrycrdd is True:
|
||||||
alert.info("Stopping lbrycrdd...")
|
alert.info("Stopping lbrycrdd...")
|
||||||
d = threads.deferToThread(self._rpc_stop)
|
d = threads.deferToThread(self._stop_rpc)
|
||||||
d.addCallback(lambda _: alert.info("Stopped lbrycrdd."))
|
d.addCallback(lambda _: alert.info("Stopped lbrycrdd."))
|
||||||
return d
|
return d
|
||||||
return defer.succeed(True)
|
return defer.succeed(True)
|
||||||
|
|
||||||
def _check_expected_balances(self):
|
|
||||||
now = datetime.datetime.now()
|
|
||||||
balances_to_check = []
|
|
||||||
try:
|
|
||||||
while self.expected_balance_at_time[0][3] < now:
|
|
||||||
balances_to_check.append(self.expected_balance_at_time.popleft())
|
|
||||||
except IndexError:
|
|
||||||
pass
|
|
||||||
ds = []
|
|
||||||
for balance_to_check in balances_to_check:
|
|
||||||
d = threads.deferToThread(self._check_expected_balance, balance_to_check)
|
|
||||||
ds.append(d)
|
|
||||||
dl = defer.DeferredList(ds)
|
|
||||||
|
|
||||||
def handle_checks(results):
|
|
||||||
from future_builtins import zip
|
|
||||||
for balance, (success, result) in zip(balances_to_check, results):
|
|
||||||
peer = balance[0]
|
|
||||||
if success is True:
|
|
||||||
if result is False:
|
|
||||||
if balance[4] <= 1: # first or second strike, give them another chance
|
|
||||||
new_expected_balance = (balance[0],
|
|
||||||
balance[1],
|
|
||||||
balance[2],
|
|
||||||
datetime.datetime.now() + self.max_expected_payment_time,
|
|
||||||
balance[4] + 1,
|
|
||||||
balance[5])
|
|
||||||
self.expected_balance_at_time.append(new_expected_balance)
|
|
||||||
peer.update_score(-5.0)
|
|
||||||
else:
|
|
||||||
peer.update_score(-50.0)
|
|
||||||
else:
|
|
||||||
if balance[4] == 0:
|
|
||||||
peer.update_score(balance[5])
|
|
||||||
peer.update_stats('points_received', balance[5])
|
|
||||||
else:
|
|
||||||
log.warning("Something went wrong checking a balance. Peer: %s, account: %s,"
|
|
||||||
"expected balance: %s, expected time: %s, count: %s, error: %s",
|
|
||||||
str(balance[0]), str(balance[1]), str(balance[2]), str(balance[3]),
|
|
||||||
str(balance[4]), str(result.getErrorMessage()))
|
|
||||||
|
|
||||||
dl.addCallback(handle_checks)
|
|
||||||
return dl
|
|
||||||
|
|
||||||
@_catch_connection_error
|
@_catch_connection_error
|
||||||
def _check_expected_balance(self, expected_balance):
|
def _get_balance_for_address_rpc(self, address):
|
||||||
rpc_conn = self._get_rpc_conn()
|
rpc_conn = self._get_rpc_conn()
|
||||||
log.info("Checking balance of address %s", str(expected_balance[1]))
|
balance = rpc_conn.getreceivedbyaddress(address)
|
||||||
balance = rpc_conn.getreceivedbyaddress(expected_balance[1])
|
log.debug("received balance for %s: %s", str(address), str(balance))
|
||||||
log.debug("received balance: %s", str(balance))
|
return balance
|
||||||
log.debug("expected balance: %s", str(expected_balance[2]))
|
|
||||||
return balance >= expected_balance[2]
|
|
||||||
|
|
||||||
def _send_payments(self):
|
|
||||||
log.info("Trying to send payments, if there are any to be sent")
|
|
||||||
|
|
||||||
def do_send(payments):
|
|
||||||
rpc_conn = self._get_rpc_conn()
|
|
||||||
rpc_conn.sendmany("", payments)
|
|
||||||
|
|
||||||
payments_to_send = {}
|
|
||||||
for address, points in self.queued_payments.items():
|
|
||||||
log.info("Should be sending %s points to %s", str(points), str(address))
|
|
||||||
payments_to_send[address] = float(points)
|
|
||||||
self.total_reserved_points -= points
|
|
||||||
self.wallet_balance -= points
|
|
||||||
del self.queued_payments[address]
|
|
||||||
if payments_to_send:
|
|
||||||
log.info("Creating a transaction with outputs %s", str(payments_to_send))
|
|
||||||
return threads.deferToThread(do_send, payments_to_send)
|
|
||||||
log.info("There were no payments to send")
|
|
||||||
return defer.succeed(True)
|
|
||||||
|
|
||||||
@_catch_connection_error
|
@_catch_connection_error
|
||||||
def _get_info(self):
|
def _do_send_many_rpc(self, payments):
|
||||||
|
rpc_conn = self._get_rpc_conn()
|
||||||
|
rpc_conn.sendmany("", payments)
|
||||||
|
return True
|
||||||
|
|
||||||
|
@_catch_connection_error
|
||||||
|
def _get_info_rpc(self):
|
||||||
rpc_conn = self._get_rpc_conn()
|
rpc_conn = self._get_rpc_conn()
|
||||||
return rpc_conn.getinfo()
|
return rpc_conn.getinfo()
|
||||||
|
|
||||||
@_catch_connection_error
|
@_catch_connection_error
|
||||||
def _get_name_claims(self):
|
def _get_name_claims_rpc(self):
|
||||||
rpc_conn = self._get_rpc_conn()
|
rpc_conn = self._get_rpc_conn()
|
||||||
return rpc_conn.listnameclaims()
|
return rpc_conn.listnameclaims()
|
||||||
|
|
||||||
@_catch_connection_error
|
@_catch_connection_error
|
||||||
def _get_gen_status(self):
|
def _get_gen_status_rpc(self):
|
||||||
rpc_conn = self._get_rpc_conn()
|
rpc_conn = self._get_rpc_conn()
|
||||||
return rpc_conn.getgenerate()
|
return rpc_conn.getgenerate()
|
||||||
|
|
||||||
@_catch_connection_error
|
@_catch_connection_error
|
||||||
def _set_gen_status(self, b):
|
def _set_gen_status_rpc(self, b):
|
||||||
if b:
|
if b:
|
||||||
log.info("Starting miner")
|
log.info("Starting miner")
|
||||||
else:
|
else:
|
||||||
|
@ -553,69 +735,61 @@ class LBRYcrdWallet(object):
|
||||||
return rpc_conn.setgenerate(b)
|
return rpc_conn.setgenerate(b)
|
||||||
|
|
||||||
@_catch_connection_error
|
@_catch_connection_error
|
||||||
def _get_raw_tx(self, txid):
|
def _get_raw_tx_rpc(self, txid):
|
||||||
rpc_conn = self._get_rpc_conn()
|
rpc_conn = self._get_rpc_conn()
|
||||||
return rpc_conn.getrawtransaction(txid)
|
return rpc_conn.getrawtransaction(txid)
|
||||||
|
|
||||||
@_catch_connection_error
|
@_catch_connection_error
|
||||||
def _get_decoded_tx(self, raw):
|
def _get_decoded_tx_rpc(self, raw):
|
||||||
rpc_conn = self._get_rpc_conn()
|
rpc_conn = self._get_rpc_conn()
|
||||||
return rpc_conn.decoderawtransaction(raw)
|
return rpc_conn.decoderawtransaction(raw)
|
||||||
|
|
||||||
@_catch_connection_error
|
@_catch_connection_error
|
||||||
def _abandon_name(self, txid, address, amount):
|
def _send_abandon_rpc(self, txid, address, amount):
|
||||||
rpc_conn = self._get_rpc_conn()
|
rpc_conn = self._get_rpc_conn()
|
||||||
return rpc_conn.abandonname(txid, address, amount)
|
return rpc_conn.abandonname(txid, address, amount)
|
||||||
|
|
||||||
@_catch_connection_error
|
@_catch_connection_error
|
||||||
def _get_blockchain_info(self):
|
def _get_blockchain_info_rpc(self):
|
||||||
rpc_conn = self._get_rpc_conn()
|
rpc_conn = self._get_rpc_conn()
|
||||||
return rpc_conn.getblockchaininfo()
|
return rpc_conn.getblockchaininfo()
|
||||||
|
|
||||||
@_catch_connection_error
|
@_catch_connection_error
|
||||||
def _get_block(self, blockhash):
|
def _get_block_rpc(self, blockhash):
|
||||||
rpc_conn = self._get_rpc_conn()
|
rpc_conn = self._get_rpc_conn()
|
||||||
return rpc_conn.getblock(blockhash)
|
return rpc_conn.getblock(blockhash)
|
||||||
|
|
||||||
@_catch_connection_error
|
@_catch_connection_error
|
||||||
def _get_claims_for_tx(self, txid):
|
def _get_claims_from_tx_rpc(self, txid):
|
||||||
rpc_conn = self._get_rpc_conn()
|
rpc_conn = self._get_rpc_conn()
|
||||||
return rpc_conn.getclaimsfortx(txid)
|
return rpc_conn.getclaimsfortx(txid)
|
||||||
|
|
||||||
@_catch_connection_error
|
@_catch_connection_error
|
||||||
def _get_nametrie(self):
|
def _get_nametrie_rpc(self):
|
||||||
rpc_conn = self._get_rpc_conn()
|
rpc_conn = self._get_rpc_conn()
|
||||||
return rpc_conn.getnametrie()
|
return rpc_conn.getnametrie()
|
||||||
|
|
||||||
@_catch_connection_error
|
@_catch_connection_error
|
||||||
def _get_wallet_balance(self):
|
def _get_wallet_balance_rpc(self):
|
||||||
rpc_conn = self._get_rpc_conn()
|
rpc_conn = self._get_rpc_conn()
|
||||||
return rpc_conn.getbalance("")
|
return rpc_conn.getbalance("")
|
||||||
|
|
||||||
@_catch_connection_error
|
@_catch_connection_error
|
||||||
def _get_new_address(self):
|
def _get_new_address_rpc(self):
|
||||||
rpc_conn = self._get_rpc_conn()
|
rpc_conn = self._get_rpc_conn()
|
||||||
return rpc_conn.getnewaddress()
|
return rpc_conn.getnewaddress()
|
||||||
|
|
||||||
@_catch_connection_error
|
@_catch_connection_error
|
||||||
def _get_value_for_name(self, name, txid=None):
|
def _get_value_for_name_rpc(self, name):
|
||||||
rpc_conn = self._get_rpc_conn()
|
rpc_conn = self._get_rpc_conn()
|
||||||
if not txid:
|
return rpc_conn.getvalueforname(name)
|
||||||
return rpc_conn.getvalueforname(name)
|
|
||||||
else:
|
|
||||||
claim = rpc_conn.getclaimsfortx(txid)[0]
|
|
||||||
if claim['name'] == name:
|
|
||||||
claim['txid'] = txid
|
|
||||||
return claim
|
|
||||||
else:
|
|
||||||
raise ValueError
|
|
||||||
|
|
||||||
# def _update_name(self, name_value):
|
# def _update_name_rpc(self, name_value):
|
||||||
# rpc_conn = self._get_rpc_conn()
|
# rpc_conn = self._get_rpc_conn()
|
||||||
# return rpc_conn.updatename(name_value)
|
# return rpc_conn.updatename(name_value)
|
||||||
|
|
||||||
@_catch_connection_error
|
@_catch_connection_error
|
||||||
def _claim_name(self, name, value, amount):
|
def _send_name_claim_rpc(self, name, value, amount):
|
||||||
rpc_conn = self._get_rpc_conn()
|
rpc_conn = self._get_rpc_conn()
|
||||||
try:
|
try:
|
||||||
return str(rpc_conn.claimname(name, value, amount))
|
return str(rpc_conn.claimname(name, value, amount))
|
||||||
|
@ -626,45 +800,17 @@ class LBRYcrdWallet(object):
|
||||||
raise ValueError(e.error['message'])
|
raise ValueError(e.error['message'])
|
||||||
|
|
||||||
@_catch_connection_error
|
@_catch_connection_error
|
||||||
def _get_status_of_claim(self, txhash, name, sd_hash):
|
def _get_num_addresses_rpc(self):
|
||||||
rpc_conn = self._get_rpc_conn()
|
|
||||||
claims = rpc_conn.getclaimsfortx(txhash)
|
|
||||||
if claims is None:
|
|
||||||
claims = []
|
|
||||||
for claim in claims:
|
|
||||||
if 'in claim trie' in claim:
|
|
||||||
if 'name' in claim and str(claim['name']) == name and 'value' in claim:
|
|
||||||
try:
|
|
||||||
value_dict = json.loads(claim['value'])
|
|
||||||
except ValueError:
|
|
||||||
return None
|
|
||||||
if 'stream_hash' in value_dict and str(value_dict['stream_hash']) == sd_hash:
|
|
||||||
if 'is controlling' in claim and claim['is controlling']:
|
|
||||||
return name, "valid"
|
|
||||||
if claim['in claim trie']:
|
|
||||||
return name, "invalid"
|
|
||||||
if 'in queue' in claim and claim['in queue']:
|
|
||||||
return name, "pending"
|
|
||||||
return name, "unconfirmed"
|
|
||||||
return None
|
|
||||||
|
|
||||||
@_catch_connection_error
|
|
||||||
def _get_num_addresses(self):
|
|
||||||
rpc_conn = self._get_rpc_conn()
|
rpc_conn = self._get_rpc_conn()
|
||||||
return len(rpc_conn.getaddressesbyaccount(""))
|
return len(rpc_conn.getaddressesbyaccount(""))
|
||||||
|
|
||||||
@_catch_connection_error
|
@_catch_connection_error
|
||||||
def _get_best_block_time(self):
|
def _get_best_blockhash_rpc(self):
|
||||||
rpc_conn = self._get_rpc_conn()
|
rpc_conn = self._get_rpc_conn()
|
||||||
best_block_hash = rpc_conn.getbestblockhash()
|
return rpc_conn.getbestblockhash()
|
||||||
block = rpc_conn.getblock(best_block_hash)
|
|
||||||
if 'time' in block:
|
|
||||||
return block['time']
|
|
||||||
raise ValueError("Could not get a block time")
|
|
||||||
|
|
||||||
|
|
||||||
@_catch_connection_error
|
@_catch_connection_error
|
||||||
def _rpc_stop(self):
|
def _stop_rpc(self):
|
||||||
# check if our lbrycrdd is actually running, or if we connected to one that was already
|
# check if our lbrycrdd is actually running, or if we connected to one that was already
|
||||||
# running and ours failed to start
|
# running and ours failed to start
|
||||||
if self.lbrycrdd.poll() is None:
|
if self.lbrycrdd.poll() is None:
|
||||||
|
@ -672,24 +818,6 @@ class LBRYcrdWallet(object):
|
||||||
rpc_conn.stop()
|
rpc_conn.stop()
|
||||||
self.lbrycrdd.wait()
|
self.lbrycrdd.wait()
|
||||||
|
|
||||||
def _open_db(self):
|
|
||||||
self.db = adbapi.ConnectionPool('sqlite3', os.path.join(self.db_dir, "blockchainname.db"),
|
|
||||||
check_same_thread=False)
|
|
||||||
return self.db.runQuery("create table if not exists name_metadata (" +
|
|
||||||
" name text, " +
|
|
||||||
" txid text, " +
|
|
||||||
" sd_hash text)")
|
|
||||||
|
|
||||||
def _save_name_metadata(self, name, sd_hash, txid):
|
|
||||||
d = self.db.runQuery("insert into name_metadata values (?, ?, ?)",
|
|
||||||
(name, txid, sd_hash))
|
|
||||||
return d
|
|
||||||
|
|
||||||
def _get_claim_metadata_for_sd_hash(self, sd_hash):
|
|
||||||
d = self.db.runQuery("select name, txid from name_metadata where sd_hash=?", (sd_hash,))
|
|
||||||
d.addCallback(lambda r: r[0] if len(r) else None)
|
|
||||||
return d
|
|
||||||
|
|
||||||
|
|
||||||
class LBRYcrdAddressRequester(object):
|
class LBRYcrdAddressRequester(object):
|
||||||
implements([IRequestCreator])
|
implements([IRequestCreator])
|
||||||
|
|
|
@ -766,15 +766,15 @@ class AddStream(CommandHandler):
|
||||||
|
|
||||||
def do_download(stream_downloader):
|
def do_download(stream_downloader):
|
||||||
d = stream_downloader.start()
|
d = stream_downloader.start()
|
||||||
d.addCallback(lambda _: self._download_succeeded(stream_downloader))
|
d.addCallback(lambda result: self._download_succeeded(stream_downloader, result))
|
||||||
return d
|
return d
|
||||||
|
|
||||||
d.addCallback(do_download)
|
d.addCallback(do_download)
|
||||||
d.addErrback(self._handle_download_error)
|
d.addErrback(self._handle_download_error)
|
||||||
return d
|
return d
|
||||||
|
|
||||||
def _download_succeeded(self, stream_downloader):
|
def _download_succeeded(self, stream_downloader, result):
|
||||||
self.console.sendLine("%s has successfully downloaded." % str(stream_downloader))
|
self.console.sendLine("%s: %s." % (str(stream_downloader), str(result)))
|
||||||
|
|
||||||
def _handle_download_error(self, err):
|
def _handle_download_error(self, err):
|
||||||
if err.check(InsufficientFundsError):
|
if err.check(InsufficientFundsError):
|
||||||
|
|
|
@ -799,12 +799,12 @@ class LBRYDaemon(xmlrpc.XMLRPC):
|
||||||
@return:
|
@return:
|
||||||
"""
|
"""
|
||||||
|
|
||||||
def _return_d(x):
|
#def _return_d(x):
|
||||||
d = defer.Deferred()
|
# d = defer.Deferred()
|
||||||
d.addCallback(lambda _: x)
|
# d.addCallback(lambda _: x)
|
||||||
d.callback(None)
|
# d.callback(None)
|
||||||
|
|
||||||
return d
|
# return d
|
||||||
|
|
||||||
def _clean(n):
|
def _clean(n):
|
||||||
t = []
|
t = []
|
||||||
|
@ -835,21 +835,36 @@ class LBRYDaemon(xmlrpc.XMLRPC):
|
||||||
|
|
||||||
return f
|
return f
|
||||||
|
|
||||||
|
def resolve_claims(claims):
|
||||||
|
ds = []
|
||||||
|
for claim in claims:
|
||||||
|
d1 = defer.succeed(claim)
|
||||||
|
d2 = self._resolve_name_wc(claim['name'])
|
||||||
|
d3 = self._get_est_cost(claim['name'])
|
||||||
|
dl = defer.DeferredList([d1, d2, d3], consumeErrors=True)
|
||||||
|
ds.append(dl)
|
||||||
|
return defer.DeferredList(ds)
|
||||||
|
|
||||||
def _disp(results):
|
def _disp(results):
|
||||||
print '[' + str(datetime.now()) + '] Found ' + str(len(results)) + ' results'
|
print '[' + str(datetime.now()) + '] Found ' + str(len(results)) + ' results'
|
||||||
return results
|
return results
|
||||||
|
|
||||||
print '[' + str(datetime.now()) + '] Search nametrie: ' + search
|
print '[' + str(datetime.now()) + '] Search nametrie: ' + search
|
||||||
|
|
||||||
filtered_results = [n for n in self.session.wallet.get_nametrie() if n['name'].startswith(search)]
|
d = self.session.wallet.get_nametrie()
|
||||||
if len(filtered_results) > self.max_search_results:
|
d.addCallback(lambda trie: [claim for claim in trie if claim['name'].startswith(search) and 'txid' in claim])
|
||||||
filtered_results = filtered_results[:self.max_search_results]
|
d.addCallback(lambda claims: claims[:self.max_search_results])
|
||||||
filtered_results = [n for n in filtered_results if 'txid' in n.keys()]
|
d.addCallback(resolve_claims)
|
||||||
resolved_results = [defer.DeferredList([_return_d(n), self._resolve_name_wc(n['name']),
|
|
||||||
self._get_est_cost(n['name'])], consumeErrors=True)
|
|
||||||
for n in filtered_results]
|
|
||||||
|
|
||||||
d = defer.DeferredList(resolved_results)
|
#filtered_results = [n for n in self.session.wallet.get_nametrie() if n['name'].startswith(search)]
|
||||||
|
#if len(filtered_results) > self.max_search_results:
|
||||||
|
# filtered_results = filtered_results[:self.max_search_results]
|
||||||
|
#filtered_results = [n for n in filtered_results if 'txid' in n.keys()]
|
||||||
|
#resolved_results = [defer.DeferredList([_return_d(n), self._resolve_name_wc(n['name']),
|
||||||
|
# self._get_est_cost(n['name'])], consumeErrors=True)
|
||||||
|
# for n in filtered_results]
|
||||||
|
|
||||||
|
#d = defer.DeferredList(resolved_results)
|
||||||
d.addCallback(_clean)
|
d.addCallback(_clean)
|
||||||
d.addCallback(_parse)
|
d.addCallback(_parse)
|
||||||
d.addCallback(_disp)
|
d.addCallback(_disp)
|
||||||
|
|
|
@ -135,52 +135,66 @@ class FetcherDaemon(object):
|
||||||
return msg
|
return msg
|
||||||
|
|
||||||
def _get_names(self):
|
def _get_names(self):
|
||||||
c = self.wallet.get_blockchain_info()
|
d = self.wallet.get_blockchain_info()
|
||||||
rtn = []
|
d.addCallback(lambda c: get_new_streams if c != self.lastbestblock else [])
|
||||||
if self.lastbestblock != c:
|
|
||||||
block = self.wallet.get_block(c['bestblockhash'])
|
def get_new_streams(c):
|
||||||
txids = block['tx']
|
self.lastbestblock = c
|
||||||
transactions = [self.wallet.get_tx(t) for t in txids]
|
d = self.wallet.get_block(c['bestblockhash'])
|
||||||
for t in transactions:
|
d.addCallback(lambda block: get_new_streams_in_txes(block['tx'], c))
|
||||||
claims = self.wallet.get_claims_for_tx(t['txid'])
|
return d
|
||||||
|
|
||||||
|
def get_new_streams_in_txes(txids, c):
|
||||||
|
ds = []
|
||||||
|
for t in txids:
|
||||||
|
d = self.wallet.get_claims_from_tx(t)
|
||||||
|
d.addCallback(get_new_streams_in_tx, t, c)
|
||||||
|
ds.append(d)
|
||||||
|
d = defer.DeferredList(ds, consumeErrors=True)
|
||||||
|
d.addCallback(lambda result: [r[1] for r in result if r[0]])
|
||||||
|
d.addCallback(lambda stream_lists: [stream for streams in stream_lists for stream in streams])
|
||||||
|
return d
|
||||||
|
|
||||||
|
def get_new_streams_in_tx(claims, t, c):
|
||||||
|
#claims = self.wallet.get_claims_for_tx(t['txid'])
|
||||||
# if self.first_run:
|
# if self.first_run:
|
||||||
# # claims = self.rpc_conn.getclaimsfortx("96aca2c60efded5806b7336430c5987b9092ffbea9c6ed444e3bf8e008993e11")
|
# # claims = self.rpc_conn.getclaimsfortx("96aca2c60efded5806b7336430c5987b9092ffbea9c6ed444e3bf8e008993e11")
|
||||||
# # claims = self.rpc_conn.getclaimsfortx("cc9c7f5225ecb38877e6ca7574d110b23214ac3556b9d65784065ad3a85b4f74")
|
# # claims = self.rpc_conn.getclaimsfortx("cc9c7f5225ecb38877e6ca7574d110b23214ac3556b9d65784065ad3a85b4f74")
|
||||||
# self.first_run = False
|
# self.first_run = False
|
||||||
if claims:
|
rtn = []
|
||||||
for claim in claims:
|
if claims:
|
||||||
if claim not in self.seen:
|
for claim in claims:
|
||||||
msg = "[" + str(datetime.now()) + "] New claim | lbry://" + str(claim['name']) + \
|
if claim not in self.seen:
|
||||||
" | stream hash: " + str(json.loads(claim['value'])['stream_hash'])
|
msg = "[" + str(datetime.now()) + "] New claim | lbry://" + str(claim['name']) + \
|
||||||
print msg
|
" | stream hash: " + str(json.loads(claim['value'])['stream_hash'])
|
||||||
log.debug(msg)
|
print msg
|
||||||
rtn.append([claim['name'], t['txid']])
|
log.debug(msg)
|
||||||
self.seen.append(claim)
|
rtn.append((claim['name'], t))
|
||||||
else:
|
self.seen.append(claim)
|
||||||
if self.verbose:
|
else:
|
||||||
print "[" + str(datetime.now()) + "] No claims in block", c['bestblockhash']
|
if self.verbose:
|
||||||
|
print "[" + str(datetime.now()) + "] No claims in block", c['bestblockhash']
|
||||||
|
return rtn
|
||||||
|
|
||||||
self.lastbestblock = c
|
d.addCallback(lambda streams: defer.DeferredList(
|
||||||
|
[self.wallet.get_stream_info_from_txid(name, t) for name, t in streams]))
|
||||||
if len(rtn):
|
# if len(rtn):
|
||||||
return defer.DeferredList([self.wallet.get_stream_info_for_name(name, txid=t) for name, t in rtn])
|
# return defer.DeferredList([self.wallet.get_stream_info_for_name(name, txid=t) for name, t in rtn])
|
||||||
|
return d
|
||||||
|
|
||||||
def _download_claims(self, claims):
|
def _download_claims(self, claims):
|
||||||
if claims:
|
if claims:
|
||||||
for claim in claims:
|
for claim in claims:
|
||||||
download = defer.Deferred()
|
|
||||||
stream = GetStream(self.sd_identifier, self.session, self.wallet, self.lbry_file_manager,
|
stream = GetStream(self.sd_identifier, self.session, self.wallet, self.lbry_file_manager,
|
||||||
self.max_key_fee, pay_key=False)
|
self.max_key_fee, pay_key=False)
|
||||||
download.addCallback(lambda _: stream.start(claim[1]))
|
stream.start(claim[1])
|
||||||
download.callback(None)
|
|
||||||
|
|
||||||
return defer.succeed(None)
|
return defer.succeed(None)
|
||||||
|
|
||||||
def _looped_search(self):
|
def _looped_search(self):
|
||||||
d = defer.Deferred()
|
d = self._get_names()
|
||||||
d.addCallback(lambda _: self._get_names())
|
|
||||||
d.addCallback(self._download_claims)
|
d.addCallback(self._download_claims)
|
||||||
d.callback(None)
|
return d
|
||||||
|
|
||||||
def _get_autofetcher_conf(self):
|
def _get_autofetcher_conf(self):
|
||||||
settings = {"maxkey": "0.0"}
|
settings = {"maxkey": "0.0"}
|
||||||
|
|
Loading…
Add table
Reference in a new issue