diff --git a/lbrynet/conf.py b/lbrynet/conf.py index d593c51f5..cfab59b08 100644 --- a/lbrynet/conf.py +++ b/lbrynet/conf.py @@ -203,7 +203,7 @@ class ApplicationSettings(Settings): self.BLOBFILES_DIR = "blobfiles" self.BLOB_SIZE = 2*MB self.LOG_FILE_NAME = "lbrynet.log" - self.LOG_POST_URL = "https://lbry.io/log-upload" + self.LOG_POST_URL = "https://lbry.io/log-upload" self.CRYPTSD_FILE_EXTENSION = ".cryptsd" self.API_INTERFACE = "localhost" self.API_ADDRESS = "lbryapi" @@ -220,6 +220,7 @@ class ApplicationSettings(Settings): self.LOGGLY_TOKEN = 'LJEzATH4AzRgAwxjAP00LwZ2YGx3MwVgZTMuBQZ3MQuxLmOv' self.ANALYTICS_ENDPOINT = 'https://api.segment.io/v1' self.ANALYTICS_TOKEN = 'Ax5LZzR1o3q3Z3WjATASDwR5rKyHH0qOIRIbLmMXn2H=' + self.DB_REVISION_FILE_NAME = 'db_revision' Settings.__init__(self) @@ -278,6 +279,9 @@ class Config(DefaultSettings): """ return os.path.join(self.ensure_data_dir(), self.LOG_FILE_NAME) + def get_db_revision_filename(self): + return os.path.join(self.ensure_data_dir(), self.DB_REVISION_FILE_NAME) + def get_conf_filename(self): return get_settings_file_ext(self.ensure_data_dir()) diff --git a/lbrynet/core/Wallet.py b/lbrynet/core/Wallet.py index 47ae0f384..2db3b6ab2 100644 --- a/lbrynet/core/Wallet.py +++ b/lbrynet/core/Wallet.py @@ -26,6 +26,7 @@ from lbrynet.interfaces import IRequestCreator, IQueryHandlerFactory, IQueryHand from lbrynet.core.client.ClientRequest import ClientRequest from lbrynet.core.Error import UnknownNameError, InvalidStreamInfoError, RequestCanceledError from lbrynet.core.Error import InsufficientFundsError +from lbrynet.db_migrator.migrate1to2 import UNSET_NOUT from lbrynet.metadata.Metadata import Metadata log = logging.getLogger(__name__) @@ -38,6 +39,29 @@ class ReservedPoints(object): self.amount = amount +class ClaimOutpoint(dict): + def __init__(self, txid, nout): + if len(txid) != 64: + raise TypeError('{} is not a txid'.format(txid)) + self['txid'] = txid + self['nout'] = nout + + def __repr__(self): + return "{}:{}".format(txid,nout) + + def __eq__(self, compare): + if isinstance(compare,dict): + # TODO: lbryum returns nout's in dicts as "nOut" , need to fix this + if 'nOut' in compare: + return (self['txid'],self['nout']) == (compare['txid'],compare['nOut']) + elif 'nout' in compare: + return (self['txid'],self['nout']) == (compare['txid'],compare['nout']) + else: + raise TypeError('cannot compare {}'.format(type(compare))) + def __ne__(self, compare): + return not self.__eq__(compare) + + def _catch_connection_error(f): def w(*args): try: @@ -312,12 +336,13 @@ class Wallet(object): d.addCallback(lambda r: None if 'txid' not in r else r['txid']) return d - def get_stream_info_from_txid(self, name, txid): - d = self.get_claims_from_tx(txid) + def get_stream_info_from_claim_outpoint(self, name, txid, nout): + claim_outpoint = ClaimOutpoint(txid, nout) + d = self.get_claims_from_tx(claim_outpoint['txid']) def get_claim_for_name(claims): for claim in claims: - if claim['name'] == name: + if claim_outpoint == claim: claim['txid'] = txid return claim return Failure(UnknownNameError(name)) @@ -342,10 +367,10 @@ class Wallet(object): metadata = Metadata(json.loads(result['value'])) except (TypeError, ValueError, ValidationError): return Failure(InvalidStreamInfoError(name, result['value'])) - txid = result['txid'] sd_hash = metadata['sources']['lbry_sd_hash'] - d = self._save_name_metadata(name, txid, sd_hash) - d.addCallback(lambda _: self.get_claimid(name, txid)) + claim_outpoint = ClaimOutpoint(result['txid'], result['n']) + d = self._save_name_metadata(name, claim_outpoint, sd_hash) + d.addCallback(lambda _: self.get_claimid(name, result['txid'],result['n'])) d.addCallback(lambda cid: _log_success(cid)) d.addCallback(lambda _: metadata) return d @@ -355,17 +380,17 @@ class Wallet(object): d.addCallback(lambda claims: next(claim for claim in claims['claims'] if claim['claimId'] == claim_id)) return d - def get_claimid(self, name, txid): + def get_claimid(self, name, txid, nout): def _get_id_for_return(claim_id): if claim_id: return defer.succeed(claim_id) else: d = self.get_claims_from_tx(txid) - d.addCallback(lambda claims: next(c['claimId'] for c in claims if c['name'] == name)) - d.addCallback(lambda cid: self._update_claimid(cid, name, txid)) + d.addCallback(lambda claims: next(c for c in claims if c['name'] == name and c['nOut'] == claim_outpoint['nout'])) + d.addCallback(lambda claim: self._update_claimid(claim['claimId'], name, ClaimOutpoint(txid, claim['nOut']))) return d - - d = self._get_claimid_for_tx(name, txid) + claim_outpoint = ClaimOutpoint(txid, nout) + d = self._get_claimid_for_tx(name, claim_outpoint) d.addCallback(_get_id_for_return) return d @@ -376,6 +401,7 @@ class Wallet(object): claim['value'] = json.loads(claim['value']) return claim + def _get_my_unspent_claim(claims): for claim in claims: if claim['name'] == name and not claim['is spent'] and not claim.get('supported_claimid', False): @@ -387,46 +413,49 @@ class Wallet(object): d.addCallback(_get_claim_for_return) return d - def get_claim_info(self, name, txid=None): - if not txid: + def get_claim_info(self, name, txid=None, nout=None): + if txid is None or nout is None: d = self._get_value_for_name(name) - d.addCallback(lambda r: self._get_claim_info(name, r['txid'])) - else: - d = self._get_claim_info(name, txid) + d.addCallback(lambda r: self._get_claim_info(name, ClaimOutpoint(r['txid'],r['n']))) + else: + d = self._get_claim_info(name, ClaimOutpoint(txid,nout)) d.addErrback(lambda _: False) return d - def _format_claim_for_return(self, name, claim, txid, metadata=None, meta_version=None): + def _format_claim_for_return(self, name, claim, metadata=None, meta_version=None): result = {} result['claim_id'] = claim['claimId'] result['amount'] = claim['nEffectiveAmount'] result['height'] = claim['nHeight'] result['name'] = name - result['txid'] = txid + result['txid'] = claim['txid'] + result['nout'] = claim['n'] result['value'] = metadata if metadata else json.loads(claim['value']) result['supports'] = [{'txid': support['txid'], 'n': support['n']} for support in claim['supports']] result['meta_version'] = meta_version if meta_version else result['value'].get('ver', '0.0.1') return result - def _get_claim_info(self, name, txid): + def _get_claim_info(self, name, claim_outpoint): def _build_response(claim): try: metadata = Metadata(json.loads(claim['value'])) meta_ver = metadata.version sd_hash = metadata['sources']['lbry_sd_hash'] - d = self._save_name_metadata(name, txid, sd_hash) + d = self._save_name_metadata(name, claim_outpoint, sd_hash) except (TypeError, ValueError, ValidationError): metadata = claim['value'] meta_ver = "Non-compliant" d = defer.succeed(None) - d.addCallback(lambda _: self._format_claim_for_return(name, claim, txid, - metadata=metadata, meta_version=meta_ver)) + d.addCallback(lambda _: self._format_claim_for_return(name, + claim, + metadata=metadata, + meta_version=meta_ver)) log.info("get claim info lbry://%s metadata: %s, claimid: %s", name, meta_ver, claim['claimId']) return d - d = self.get_claimid(name, txid) + d = self.get_claimid(name, claim_outpoint['txid'], claim_outpoint['nout']) d.addCallback(lambda claim_id: self.get_claim(name, claim_id)) d.addCallback(_build_response) return d @@ -442,10 +471,14 @@ class Wallet(object): return defer.succeed(Metadata(meta_for_return)) def claim_name(self, name, bid, m): - def _save_metadata(txid, metadata): - log.info("Saving metadata for claim %s" % txid) - d = self._save_name_metadata(name, txid, metadata['sources']['lbry_sd_hash']) - d.addCallback(lambda _: txid) + def _save_metadata(claim_out, metadata): + if not claim_out['success']: + msg = 'Claim to name {} failed: {}'.format(name,claim_out['reason']) + defer.fail(Exception(msg)) + claim_outpoint = ClaimOutpoint(claim_out['txid'],claim_out['nout']) + log.info("Saving metadata for claim %s %d" % (claim_outpoint['txid'], claim_outpoint['nout'])) + d = self._save_name_metadata(name, claim_outpoint, metadata['sources']['lbry_sd_hash']) + d.addCallback(lambda _: claim_out) return d def _claim_or_update(claim, metadata, _bid): @@ -455,47 +488,21 @@ class Wallet(object): else: log.info("Updating over own claim") d = self.update_metadata(metadata, claim['value']) - d.addCallback(lambda new_metadata: self._send_name_claim_update(name, claim['claim_id'], claim['txid'], new_metadata, _bid)) + claim_outpoint = ClaimOutpoint(claim['txid'],claim['nOut']) + d.addCallback(lambda new_metadata: self._send_name_claim_update(name, claim['claim_id'], + claim_outpoint, + new_metadata, _bid)) return d meta = Metadata(m) d = self.get_my_claim(name) d.addCallback(lambda claim: _claim_or_update(claim, meta, bid)) - d.addCallback(lambda txid: _save_metadata(txid, meta)) + d.addCallback(lambda claim_out: _save_metadata(claim_out, meta)) return d - def abandon_name(self, txid): - d1 = self.get_new_address() - d2 = self.get_claims_from_tx(txid) - - def get_txout_of_claim(claims): - for claim in claims: - if 'name' in claim and 'nOut' in claim: - return claim['nOut'] - return defer.fail(ValueError("No claims in tx")) - - def get_value_of_txout(nOut): - d = self._get_raw_tx(txid) - d.addCallback(self._get_decoded_tx) - d.addCallback(lambda tx: tx['vout'][nOut]['value']) - return d - - d2.addCallback(get_txout_of_claim) - d2.addCallback(get_value_of_txout) - dl = defer.DeferredList([d1, d2], consumeErrors=True) - - def abandon(results): - if results[0][0] and results[1][0]: - address = results[0][1] - amount = float(results[1][1]) - return self._send_abandon(txid, address, amount) - elif results[0][0] is False: - return defer.fail(Failure(ValueError("Couldn't get a new address"))) - else: - return results[1][1] - - dl.addCallback(abandon) - return dl + def abandon_claim(self, txid, nout): + claim_outpoint = ClaimOutpoint(txid, nout) + return self._abandon_claim(claim_outpoint) def support_claim(self, name, claim_id, amount): return self._support_claim(name, claim_id, amount) @@ -517,35 +524,24 @@ class Wallet(object): d = self._address_is_mine(address) return d - def get_tx_json(self, txid): - def _decode(raw_tx): - tx = Transaction(raw_tx).deserialize() - decoded_tx = {} - for txkey in tx.keys(): - if isinstance(tx[txkey], list): - decoded_tx[txkey] = [] - for i in tx[txkey]: - tmp = {} - for k in i.keys(): - if isinstance(i[k], Decimal): - tmp[k] = float(i[k] / 1e8) - else: - tmp[k] = i[k] - decoded_tx[txkey].append(tmp) - else: - decoded_tx[txkey] = tx[txkey] - return decoded_tx - - d = self._get_raw_tx(txid) - d.addCallback(_decode) + def get_transaction(self, txid): + d = self._get_transaction(txid) return d def get_claim_metadata_for_sd_hash(self, sd_hash): return self._get_claim_metadata_for_sd_hash(sd_hash) def get_name_and_validity_for_sd_hash(self, sd_hash): + def _get_status_of_claim(name_txid, sd_hash): + if name_txid: + claim_outpoint = ClaimOutpoint(name_txid[1],name_txid[2]) + name = name_txid[0] + return self._get_status_of_claim(claim_outpoint, name, sd_hash) + else: + return None + d = self._get_claim_metadata_for_sd_hash(sd_hash) - d.addCallback(lambda name_txid: self._get_status_of_claim(name_txid[1], name_txid[0], sd_hash) if name_txid is not None else None) + d.addCallback(lambda name_txid: _get_status_of_claim(name_txid, sd_hash)) return d def get_available_balance(self): @@ -565,15 +561,17 @@ class Wallet(object): d.addCallback(lambda _: self._first_run == self._FIRST_RUN_YES) return d - def _get_status_of_claim(self, txid, name, sd_hash): - d = self.get_claims_from_tx(txid) + def _get_status_of_claim(self, claim_outpoint, name, sd_hash): + d = self.get_claims_from_tx(claim_outpoint['txid']) def get_status(claims): if claims is None: claims = [] for claim in claims: if 'in claim trie' in claim: - if 'name' in claim and str(claim['name']) == name and 'value' in claim: + name_is_equal = 'name' in claim and str(claim['name']) == name + nout_is_equal = 'nOut' in claim and claim['nOut'] == claim_outpoint['nout'] + if name_is_equal and nout_is_equal and 'value' in claim: try: value_dict = json.loads(claim['value']) except (ValueError, TypeError): @@ -650,11 +648,13 @@ class Wallet(object): transaction.execute("create table if not exists name_metadata (" + " name text, " + " txid text, " + + " n integer, " + " sd_hash text)") transaction.execute("create table if not exists claim_ids (" + " claimId text, " + " name text, " + - " txid text)") + " txid text, " + + " n integer)") return self.db.runInteraction(create_tables) @@ -662,27 +662,37 @@ class Wallet(object): d = self.db.runQuery("delete from name_metadata where length(txid) > 64 or txid is null") return d - def _save_name_metadata(self, name, txid, sd_hash): - assert len(txid) == 64, "That's not a txid: %s" % str(txid) - d = self.db.runQuery("delete from name_metadata where name=? and txid=? and sd_hash=?", (name, txid, sd_hash)) - d.addCallback(lambda _: self.db.runQuery("insert into name_metadata values (?, ?, ?)", (name, txid, sd_hash))) + + def _save_name_metadata(self, name, claim_outpoint, sd_hash): + d = self.db.runQuery("delete from name_metadata where name=? and txid=? and n=? and sd_hash=?", + (name, claim_outpoint['txid'], claim_outpoint['nout'], sd_hash)) + d.addCallback( + lambda _: self.db.runQuery("delete from name_metadata where name=? and txid=? and n=? and sd_hash=?", + (name, claim_outpoint['txid'], UNSET_NOUT, sd_hash))) + + d.addCallback(lambda _: self.db.runQuery("insert into name_metadata values (?, ?, ?, ?)", + (name, claim_outpoint['txid'], claim_outpoint['nout'], sd_hash))) return d def _get_claim_metadata_for_sd_hash(self, sd_hash): - d = self.db.runQuery("select name, txid from name_metadata where sd_hash=?", (sd_hash,)) + d = self.db.runQuery("select name, txid, n from name_metadata where sd_hash=?", (sd_hash,)) d.addCallback(lambda r: r[0] if r else None) return d - def _update_claimid(self, claim_id, name, txid): - assert len(txid) == 64, "That's not a txid: %s" % str(txid) - d = self.db.runQuery("delete from claim_ids where claimId=? and name=? and txid=?", (claim_id, name, txid)) - d.addCallback(lambda r: self.db.runQuery("insert into claim_ids values (?, ?, ?)", (claim_id, name, txid))) + def _update_claimid(self, claim_id, name, claim_outpoint): + d = self.db.runQuery("delete from claim_ids where claimId=? and name=? and txid=? and n=?", + (claim_id, name, claim_outpoint['txid'], claim_outpoint['nout'])) + d.addCallback( + lambda _: self.db.runQuery("delete from claim_ids where claimId=? and name=? and txid=? and n=?", + (claim_id, name, claim_outpoint['txid'], UNSET_NOUT))) + + d.addCallback(lambda r: self.db.runQuery("insert into claim_ids values (?, ?, ?, ?)", + (claim_id, name, claim_outpoint['txid'], claim_outpoint['nout']))) d.addCallback(lambda _: claim_id) return d - def _get_claimid_for_tx(self, name, txid): - assert len(txid) == 64, "That's not a txid: %s" % str(txid) - d = self.db.runQuery("select claimId from claim_ids where name=? and txid=?", (name, txid)) + def _get_claimid_for_tx(self, name, claim_outpoint): + d = self.db.runQuery("select claimId from claim_ids where name=? and txid=? and n=?", (name, claim_outpoint['txid'], claim_outpoint['nout'])) d.addCallback(lambda r: r[0][0] if r else None) return d @@ -721,10 +731,10 @@ class Wallet(object): def _get_decoded_tx(self, raw_tx): return defer.fail(NotImplementedError()) - def _send_abandon(self, txid, address, amount): + def _abandon_claim(self, claim_outpoint): return defer.fail(NotImplementedError()) - def _send_name_claim_update(self, name, claim_id, txid, value, amount): + def _send_name_claim_update(self, name, claim_id, claim_outpoint, value, amount): return defer.fail(NotImplementedError()) def _support_claim(self, name, claim_id, amount): @@ -748,6 +758,9 @@ class Wallet(object): def _address_is_mine(self, address): return defer.fail(NotImplementedError()) + def _get_transaction(self, txid): + return defer.fail(NotImplementedError()) + def _start(self): pass @@ -865,11 +878,14 @@ class LBRYcrdWallet(Wallet): def _get_decoded_tx(self, raw_tx): return threads.deferToThread(self._get_decoded_tx_rpc, raw_tx) + def _get_transaction(self, txid): + return threads.deferToThread(self._get_raw_tx_rpc, txid, 1) + def _send_abandon(self, txid, address, amount): return threads.deferToThread(self._send_abandon_rpc, txid, address, amount) - def _send_name_claim_update(self, name, claim_id, txid, value, amount): - return threads.deferToThread(self._update_name_rpc, txid, value, amount) + def _send_name_claim_update(self, name, claim_id, claim_outpoint, value, amount): + return threads.deferToThread(self._update_name_rpc, claim_outpoint, value, amount) def _support_claim(self, name, claim_id, amount): return threads.deferToThread(self._support_claim_rpc, name, claim_id, amount) @@ -991,9 +1007,9 @@ class LBRYcrdWallet(Wallet): return rpc_conn.setgenerate(b) @_catch_connection_error - def _get_raw_tx_rpc(self, txid): + def _get_raw_tx_rpc(self, txid, raw=0): rpc_conn = self._get_rpc_conn() - return rpc_conn.getrawtransaction(txid) + return rpc_conn.getrawtransaction(txid, raw) @_catch_connection_error def _get_decoded_tx_rpc(self, raw): @@ -1051,9 +1067,10 @@ class LBRYcrdWallet(Wallet): return rpc_conn.getvalueforname(name) @_catch_connection_error - def _update_name_rpc(self, txid, value, amount): + def _update_name_rpc(self, claim_outpoint, value, amount): + # TODO use nout in updateclaim once lbrycrdd uses it rpc_conn = self._get_rpc_conn() - return rpc_conn.updateclaim(txid, json.dumps(value), amount) + return rpc_conn.updateclaim(claim_outpoint['txid'], json.dumps(value), amount) @_catch_connection_error def _send_name_claim_rpc(self, name, value, amount): @@ -1295,34 +1312,34 @@ class LBRYumWallet(Wallet): func = getattr(self.cmd_runner, cmd.name) return threads.deferToThread(func, txid) - def _send_name_claim(self, name, val, amount): - def send_claim(address): - cmd = known_commands['claimname'] - func = getattr(self.cmd_runner, cmd.name) - return threads.deferToThread(func, address, amount, name, json.dumps(val)) - d = self.get_new_address() - d.addCallback(send_claim) - d.addCallback(self._broadcast_transaction) + def _get_transaction(self, txid): + def _add_confirms(tx): + tx['confirmations'] = self.wallet.get_confirmations(txid) + return tx + + d = self._get_raw_tx(txid) + d.addCallback(self._get_decoded_tx) + d.addCallback(_add_confirms) return d + def _send_name_claim(self, name, val, amount): + cmd = known_commands['claim'] + func = getattr(self.cmd_runner, cmd.name) + return threads.deferToThread(func, name, json.dumps(val), amount) + def _get_claims_for_name(self, name): cmd = known_commands['getclaimsforname'] func = getattr(self.cmd_runner, cmd.name) return threads.deferToThread(func, name) - def _send_name_claim_update(self, name, claim_id, txid, value, amount): - def send_claim_update(address): - decoded_claim_id = claim_id.decode('hex')[::-1] - metadata = json.dumps(value) - log.info("updateclaim %s %s %f %s %s '%s'", txid, address, amount, name, decoded_claim_id.encode('hex'), metadata) - cmd = known_commands['updateclaim'] - func = getattr(self.cmd_runner, cmd.name) - return threads.deferToThread(func, txid, address, amount, name, decoded_claim_id, metadata) + def _send_name_claim_update(self, name, claim_id, claim_outpoint, value, amount): + metadata = json.dumps(value) + log.info("updateclaim %s %d %f %s %s '%s'", claim_outpoint['txid'], claim_outpoint['nout'], + amount, name, claim_id, metadata) + cmd = known_commands['update'] + func = getattr(self.cmd_runner, cmd.name) + return threads.deferToThread(func, claim_outpoint['txid'], claim_outpoint['nout'], name, claim_id, metadata, amount) - d = self.get_new_address() - d.addCallback(send_claim_update) - d.addCallback(self._broadcast_transaction) - return d def _get_decoded_tx(self, raw_tx): tx = Transaction(raw_tx) @@ -1334,23 +1351,18 @@ class LBRYumWallet(Wallet): decoded_tx['vout'].append(out) return decoded_tx - def _send_abandon(self, txid, address, amount): - log.info("Abandon %s %s %f" % (txid, address, amount)) - cmd = known_commands['abandonclaim'] + def _abandon_claim(self, claim_outpoint): + log.info("Abandon %s %s" % (claim_outpoint['txid'],claim_outpoint['nout'])) + cmd = known_commands['abandon'] func = getattr(self.cmd_runner, cmd.name) - d = threads.deferToThread(func, txid, address, amount) - d.addCallback(self._broadcast_transaction) + d = threads.deferToThread(func, claim_outpoint['txid'], claim_outpoint['nout']) return d def _support_claim(self, name, claim_id, amount): - def _send_support(d, a, n, c): - cmd = known_commands['supportclaim'] - func = getattr(self.cmd_runner, cmd.name) - d = threads.deferToThread(func, d, a, n, c) - return d - d = self.get_new_address() - d.addCallback(lambda address: _send_support(address, amount, name, claim_id)) - d.addCallback(self._broadcast_transaction) + log.info("Support %s %s %f" % (name, claim_id, amount)) + cmd = known_commands['support'] + func = getattr(self.cmd_runner, cmd.name) + d = threads.deferToThread(func, name, claim_id, amount) return d def _broadcast_transaction(self, raw_tx): diff --git a/lbrynet/db_migrator/dbmigrator.py b/lbrynet/db_migrator/dbmigrator.py index 2eedf917a..152f24476 100644 --- a/lbrynet/db_migrator/dbmigrator.py +++ b/lbrynet/db_migrator/dbmigrator.py @@ -1,18 +1,17 @@ import logging -# import os def migrate_db(db_dir, start, end): current = start - old_dirs = [] - # if os.name == "nt": - # return old_dirs - # while current < end: - # if current == 0: - # from lbrynet.db_migrator.migrate0to1 import do_migration - # old_dirs.append(do_migration(db_dir)) - # current += 1 - return old_dirs + while current < end: + if current == 1: + from lbrynet.db_migrator.migrate1to2 import do_migration + do_migration(db_dir) + else: + raise Exception("DB migration of version {} to {} is not available".format(current,current+1)) + current += 1 + + return None def run_migration_script(): @@ -25,4 +24,4 @@ def run_migration_script(): if __name__ == "__main__": - run_migration_script() \ No newline at end of file + run_migration_script() diff --git a/lbrynet/db_migrator/migrate1to2.py b/lbrynet/db_migrator/migrate1to2.py new file mode 100644 index 000000000..7ec6b5d79 --- /dev/null +++ b/lbrynet/db_migrator/migrate1to2.py @@ -0,0 +1,72 @@ +import sqlite3 +import os +import logging + +log = logging.getLogger(__name__) +UNSET_NOUT = -1 + +def do_migration(db_dir): + log.info("Doing the migration") + migrate_blockchainname_db(db_dir) + log.info("Migration succeeded") + + +def migrate_blockchainname_db(db_dir): + blockchainname_db = os.path.join(db_dir,"blockchainname.db") + # skip migration on fresh installs + if not os.path.isfile(blockchainname_db): + return + temp_db = sqlite3.connect(":memory:") + db_file = sqlite3.connect(blockchainname_db) + file_cursor = db_file.cursor() + mem_cursor = temp_db.cursor() + + mem_cursor.execute("create table if not exists name_metadata (" + " name text, " + " txid text, " + " n integer, " + " sd_hash text)") + mem_cursor.execute("create table if not exists claim_ids (" + " claimId text, " + " name text, " + " txid text, " + " n integer)") + temp_db.commit() + + name_metadata = file_cursor.execute("select * from name_metadata").fetchall() + claim_metadata = file_cursor.execute("select * from claim_ids").fetchall() + + # fill n as V1_UNSET_NOUT, Wallet.py will be responsible for filling in correct n + for name, txid, sd_hash in name_metadata: + mem_cursor.execute("insert into name_metadata values (?, ?, ?, ?) ", (name, txid, UNSET_NOUT, sd_hash)) + + for claim_id, name, txid in claim_metadata: + mem_cursor.execute("insert into claim_ids values (?, ?, ?, ?)", (claim_id, name, txid, UNSET_NOUT)) + temp_db.commit() + + new_name_metadata = mem_cursor.execute("select * from name_metadata").fetchall() + new_claim_metadata = mem_cursor.execute("select * from claim_ids").fetchall() + + file_cursor.execute("drop table name_metadata") + file_cursor.execute("create table name_metadata (" + " name text, " + " txid text, " + " n integer, " + " sd_hash text)") + + for name, txid, n, sd_hash in new_name_metadata: + file_cursor.execute("insert into name_metadata values (?, ?, ?, ?) ", (name, txid, n, sd_hash)) + + file_cursor.execute("drop table claim_ids") + file_cursor.execute("create table claim_ids (" + " claimId text, " + " name text, " + " txid text, " + " n integer)") + + for claim_id, name, txid, n in new_claim_metadata: + file_cursor.execute("insert into claim_ids values (?, ?, ?, ?)", (claim_id, name, txid, n)) + + db_file.commit() + db_file.close() + temp_db.close() diff --git a/lbrynet/lbryfilemanager/EncryptedFileDownloader.py b/lbrynet/lbryfilemanager/EncryptedFileDownloader.py index 7ce5adb9d..bf0b49d49 100644 --- a/lbrynet/lbryfilemanager/EncryptedFileDownloader.py +++ b/lbrynet/lbryfilemanager/EncryptedFileDownloader.py @@ -20,7 +20,6 @@ log = logging.getLogger(__name__) class ManagedEncryptedFileDownloader(EncryptedFileSaver): - STATUS_RUNNING = "running" STATUS_STOPPED = "stopped" STATUS_FINISHED = "finished" @@ -29,10 +28,11 @@ class ManagedEncryptedFileDownloader(EncryptedFileSaver): lbry_file_manager, payment_rate_manager, wallet, download_directory, upload_allowed, file_name=None): EncryptedFileSaver.__init__(self, stream_hash, peer_finder, rate_limiter, blob_manager, - stream_info_manager, payment_rate_manager, wallet, download_directory, - upload_allowed, file_name) + stream_info_manager, payment_rate_manager, wallet, download_directory, + upload_allowed, file_name) self.sd_hash = None self.txid = None + self.nout = None self.uri = None self.claim_id = None self.rowid = rowid @@ -42,35 +42,36 @@ class ManagedEncryptedFileDownloader(EncryptedFileSaver): def restore(self): d = self.stream_info_manager._get_sd_blob_hashes_for_stream(self.stream_hash) - def _save_sd_hash(sd_hash): - if len(sd_hash): + def _save_stream_info(sd_hash): + if sd_hash: self.sd_hash = sd_hash[0] d = self.wallet.get_claim_metadata_for_sd_hash(self.sd_hash) + d.addCallback(lambda r: _save_claim(r[0], r[1], r[2])) + return d else: - d = defer.succeed(None) - - return d + return None def _save_claim_id(claim_id): self.claim_id = claim_id return defer.succeed(None) - def _notify_bad_claim(name, txid): - log.error("Error loading name claim for lbry file: lbry://%s, tx %s does not contain a valid claim", name, txid) - log.warning("lbry file for lbry://%s, tx %s has no claim, deleting it", name, txid) + def _notify_bad_claim(name, txid, nout): + err_msg = "Error loading name claim for lbry file: \ + lbry://%s, tx %s output %i does not contain a valid claim, deleting it" + log.error(err_msg, name, txid, nout) return self.lbry_file_manager.delete_lbry_file(self) - def _save_claim(name, txid): + def _save_claim(name, txid, nout): self.uri = name self.txid = txid - d = self.wallet.get_claimid(name, txid) - d.addCallbacks(_save_claim_id, lambda err: _notify_bad_claim(name, txid)) + self.nout = nout + d = self.wallet.get_claimid(name, txid, nout) + d.addCallbacks(_save_claim_id, lambda err: _notify_bad_claim(name, txid, nout)) return d reflector_server = random.choice(settings.reflector_servers) - d.addCallback(_save_sd_hash) - d.addCallback(lambda r: _save_claim(r[0], r[1]) if r else None) + d.addCallback(_save_stream_info) d.addCallback(lambda _: reupload.check_and_restore_availability(self, reflector_server)) d.addCallback(lambda _: self.lbry_file_manager.get_lbry_file_status(self)) @@ -91,7 +92,8 @@ class ManagedEncryptedFileDownloader(EncryptedFileSaver): def set_saving_status_done(): self.saving_status = False - d = EncryptedFileDownloader.stop(self, err=err) # EncryptedFileSaver deletes metadata when it's stopped. We don't want that here. + # EncryptedFileSaver deletes metadata when it's stopped. We don't want that here. + d = EncryptedFileDownloader.stop(self, err=err) if change_status is True: self.saving_status = True d.addCallback(lambda _: self._save_status()) @@ -140,13 +142,14 @@ class ManagedEncryptedFileDownloader(EncryptedFileSaver): return d - def _save_claim(name, txid): + def _save_claim(name, txid, nout): self.uri = name self.txid = txid + self.nout = nout return defer.succeed(None) d.addCallback(_save_sd_hash) - d.addCallback(lambda r: _save_claim(r[0], r[1]) if r else None) + d.addCallback(lambda r: _save_claim(r[0], r[1], r[2]) if r else None) d.addCallback(lambda _: self._save_status()) return d @@ -204,4 +207,4 @@ class ManagedEncryptedFileDownloaderFactory(object): @staticmethod def get_description(): - return "Save the file to disk" \ No newline at end of file + return "Save the file to disk" diff --git a/lbrynet/lbrynet_console/ControlHandlers.py b/lbrynet/lbrynet_console/ControlHandlers.py index 876912fcb..547b1ef3a 100644 --- a/lbrynet/lbrynet_console/ControlHandlers.py +++ b/lbrynet/lbrynet_console/ControlHandlers.py @@ -1348,7 +1348,7 @@ class CreatePlainStreamDescriptor(CommandHandler): self.overwrite_old = True else: file_name = self.lbry_file.file_name - file_name = file_name + ".cryptsd" + file_name += ".cryptsd" return defer.succeed(file_name) diff --git a/lbrynet/lbrynet_daemon/Daemon.py b/lbrynet/lbrynet_daemon/Daemon.py index ed9198482..8b2d0eaf0 100644 --- a/lbrynet/lbrynet_daemon/Daemon.py +++ b/lbrynet/lbrynet_daemon/Daemon.py @@ -47,7 +47,7 @@ from lbrynet.core.Wallet import LBRYcrdWallet, LBRYumWallet from lbrynet.core.looping_call_manager import LoopingCallManager from lbrynet.core.server.BlobRequestHandler import BlobRequestHandlerFactory from lbrynet.core.server.ServerProtocol import ServerProtocolFactory -from lbrynet.core.Error import InsufficientFundsError, InvalidNameError +from lbrynet.core.Error import InsufficientFundsError, InvalidNameError, UnknownNameError log = logging.getLogger(__name__) @@ -253,7 +253,8 @@ class Daemon(AuthJSONRPCServer): self.platform = None self.first_run = None self.log_file = conf.settings.get_log_filename() - self.current_db_revision = 1 + self.current_db_revision = 2 + self.db_revision_file = conf.settings.get_db_revision_filename() self.session = None self.uploaded_temp_files = [] self._session_id = base58.b58encode(utils.generate_id()) @@ -436,10 +437,13 @@ class Daemon(AuthJSONRPCServer): if not self.connected_to_internet: self.connection_problem = CONNECTION_PROBLEM_CODES[1] - def _add_to_pending_claims(self, name, txid): - log.info("Adding lbry://%s to pending claims, txid %s" % (name, txid)) - self.pending_claims[name] = txid - return txid + # claim_out is dictionary containing 'txid' and 'nout' + def _add_to_pending_claims(self, name, claim_out): + txid = claim_out['txid'] + nout = claim_out['nout'] + log.info("Adding lbry://%s to pending claims, txid %s nout %d" % (name, txid, nout)) + self.pending_claims[name] = (txid,nout) + return claim_out def _check_pending_claims(self): # TODO: this was blatantly copied from jsonrpc_start_lbry_file. Be DRY. @@ -453,15 +457,17 @@ class Daemon(AuthJSONRPCServer): d.addCallback(lambda l: _start_file(l) if l.stopped else "LBRY file was already running") def re_add_to_pending_claims(name): - txid = self.pending_claims.pop(name) - self._add_to_pending_claims(name, txid) + log.warning("Re-add %s to pending claims", name) + txid, nout = self.pending_claims.pop(name) + claim_out = {'txid':txid,'nout':nout} + self._add_to_pending_claims(name, claim_out) def _process_lbry_file(name, lbry_file): # lbry_file is an instance of ManagedEncryptedFileDownloader or None # TODO: check for sd_hash in addition to txid ready_to_start = ( lbry_file and - self.pending_claims[name] == lbry_file.txid + self.pending_claims[name] == (lbry_file.txid,lbry_file.nout) ) if ready_to_start: _get_and_start_file(name) @@ -673,41 +679,41 @@ class Daemon(AuthJSONRPCServer): return defer.succeed(True) + def _write_db_revision_file(self,version_num): + with open(self.db_revision_file, mode='w') as db_revision: + db_revision.write(str(version_num)) + def _setup_data_directory(self): + old_revision = 1 self.startup_status = STARTUP_STAGES[1] log.info("Loading databases...") if self.created_data_dir: - db_revision_path = os.path.join(self.db_dir, "db_revision") - with open(db_revision_path, mode='w') as db_revision: - db_revision.write(str(self.current_db_revision)) - log.debug("Created the db revision file: %s", db_revision_path) + self._write_db_revision_file(self.current_db_revision) + log.debug("Created the db revision file: %s", self.db_revision_file) if not os.path.exists(self.blobfile_dir): os.mkdir(self.blobfile_dir) log.debug("Created the blobfile directory: %s", str(self.blobfile_dir)) + if not os.path.exists(self.db_revision_file): + log.warning("db_revision file not found. Creating it") + self._write_db_revision_file(old_revision) def _check_db_migration(self): old_revision = 1 - db_revision_file = os.path.join(self.db_dir, "db_revision") - if os.path.exists(db_revision_file): - old_revision = int(open(db_revision_file).read().strip()) + if os.path.exists(self.db_revision_file): + old_revision = int(open(self.db_revision_file).read().strip()) + if old_revision > self.current_db_revision: return defer.fail(Exception('This version of lbrynet is not compatible with the database')) + + def update_version_file_and_print_success(): + self._write_db_revision_file(self.current_db_revision) + log.info("Finished upgrading the databases.") + if old_revision < self.current_db_revision: from lbrynet.db_migrator import dbmigrator log.info("Upgrading your databases...") d = threads.deferToThread(dbmigrator.migrate_db, self.db_dir, old_revision, self.current_db_revision) - - def print_success(old_dirs): - success_string = "Finished upgrading the databases. It is now safe to delete the" - success_string += " following directories, if you feel like it. It won't make any" - success_string += " difference.\nAnyway here they are: " - for i, old_dir in enumerate(old_dirs): - success_string += old_dir - if i + 1 < len(old_dir): - success_string += ", " - log.info(success_string) - - d.addCallback(print_success) + d.addCallback(lambda _: update_version_file_and_print_success()) return d return defer.succeed(True) @@ -970,7 +976,6 @@ class Daemon(AuthJSONRPCServer): if l.sd_hash == sd: return defer.succeed(l) return defer.succeed(None) - d = self._resolve_name(name) d.addCallback(_get_file) @@ -1082,7 +1087,7 @@ class Daemon(AuthJSONRPCServer): elif self.startup_status[0] == LOADING_wallet_CODE: if self.wallet_type == LBRYUM_WALLET: if self.session.wallet.blocks_behind_alert != 0: - r['message'] = r['message'] % (str(self.session.wallet.blocks_behind_alert) + " blocks behind") + r['message'] %= str(self.session.wallet.blocks_behind_alert) + " blocks behind" r['progress'] = self.session.wallet.catchup_progress else: r['message'] = "Catching up with the blockchain" @@ -1390,6 +1395,9 @@ class Daemon(AuthJSONRPCServer): Args: 'name': name to look up, string, do not include lbry:// prefix + 'txid': optional, if specified, look for claim with this txid + 'nout': optional, if specified, look for claim with this nout + Returns: txid, amount, value, n, height """ @@ -1403,7 +1411,8 @@ class Daemon(AuthJSONRPCServer): name = p[FileID.NAME] txid = p.get('txid', None) - d = self.session.wallet.get_claim_info(name, txid) + nout = p.get('nout', None) + d = self.session.wallet.get_claim_info(name, txid, nout) d.addCallback(_convert_amount_to_float) d.addCallback(lambda r: self._render_response(r, OK_CODE)) return d @@ -1585,7 +1594,12 @@ class Daemon(AuthJSONRPCServer): 'metadata': metadata dictionary optional 'fee' Returns: - Claim txid + 'success' : True if claim was succesful , False otherwise + 'reason' : if not succesful, give reason + 'txid' : txid of resulting transaction if succesful + 'nout' : nout of the resulting support claim if succesful + 'fee' : fee paid for the claim transaction if succesful + 'claimid' : claimid of the resulting transaction """ def _set_address(address, currency, m): @@ -1593,10 +1607,10 @@ class Daemon(AuthJSONRPCServer): m['fee'][currency]['address'] = address return m - def _reflect_if_possible(sd_hash, txid): + def _reflect_if_possible(sd_hash, claim_out): d = self._get_lbry_file(FileID.SD_HASH, sd_hash, return_json=False) d.addCallback(self._reflect) - d.addCallback(lambda _: txid) + d.addCallback(lambda _: claim_out) return d name = p[FileID.NAME] @@ -1647,9 +1661,9 @@ class Daemon(AuthJSONRPCServer): else: d.addCallback(lambda meta: self.session.wallet.claim_name(name, bid, meta)) if sd_hash: - d.addCallback(lambda txid: _reflect_if_possible(sd_hash, txid)) + d.addCallback(lambda claim_out: _reflect_if_possible(sd_hash, claim_out)) - d.addCallback(lambda txid: self._add_to_pending_claims(name, txid)) + d.addCallback(lambda claim_out: self._add_to_pending_claims(name, claim_out)) d.addCallback(lambda r: self._render_response(r, OK_CODE)) return d @@ -1658,15 +1672,18 @@ class Daemon(AuthJSONRPCServer): def jsonrpc_abandon_claim(self, p): """ Abandon a name and reclaim credits from the claim - Args: 'txid': txid of claim, string + 'nout': nout of claim, integer Return: - txid + success : True if succesful , False otherwise + reason : if not succesful, give reason + txid : txid of resulting transaction if succesful + fee : fee paid for the transaction if succesful """ - - if 'txid' in p.keys(): + if 'txid' in p.keys() and 'nout' in p.keys(): txid = p['txid'] + nout = p['nout'] else: return server.failure @@ -1675,7 +1692,7 @@ class Daemon(AuthJSONRPCServer): return self._render_response(x, OK_CODE) d = defer.Deferred() - d.addCallback(lambda _: self.session.wallet.abandon_name(txid)) + d.addCallback(lambda _: self.session.wallet.abandon_claim(txid,nout)) d.addCallback(_disp) d.callback(None) @@ -1704,7 +1721,12 @@ class Daemon(AuthJSONRPCServer): 'claim_id': claim id of claim to support 'amount': amount to support by Return: - txid + success : True if succesful , False otherwise + reason : if not succesful, give reason + txid : txid of resulting transaction if succesful + nout : nout of the resulting support claim if succesful + fee : fee paid for the transaction if succesful + """ name = p[FileID.NAME] @@ -1780,7 +1802,7 @@ class Daemon(AuthJSONRPCServer): txid = p['txid'] - d = self.session.wallet.get_tx_json(txid) + d = self.session.wallet.get_transaction(txid) d.addCallback(lambda r: self._render_response(r, OK_CODE)) return d @@ -2289,6 +2311,7 @@ def get_darwin_lbrycrdd_path(): default = "./lbrycrdd" try: import Foundation + # TODO: require pyobjc and pyobjc-core on os x except ImportError: log.warning('Foundation module not installed, falling back to default lbrycrdd path') return default diff --git a/lbrynet/lbrynet_daemon/Publisher.py b/lbrynet/lbrynet_daemon/Publisher.py index 5c717cde0..f62772ecf 100644 --- a/lbrynet/lbrynet_daemon/Publisher.py +++ b/lbrynet/lbrynet_daemon/Publisher.py @@ -30,6 +30,7 @@ class Publisher(object): self.verified = False self.lbry_file = None self.txid = None + self.nout = None self.stream_hash = None # TODO: this needs to be passed into the constructor reflector_server = random.choice(settings.reflector_servers) @@ -39,10 +40,12 @@ class Publisher(object): def start(self, name, file_path, bid, metadata): log.info('Starting publish for %s', name) def _show_result(): - log.info( - "Success! Published %s --> lbry://%s txid: %s", - self.file_name, self.publish_name, self.txid) - return defer.succeed(self.txid) + log.info("Success! Published %s --> lbry://%s txid: %s nout: %d", + self.file_name, self.publish_name, self.txid, self.nout) + out = {} + out['nout'] = self.nout + out['txid'] = self.txid + return defer.succeed(out) self.publish_name = name self.file_path = file_path @@ -128,12 +131,18 @@ class Publisher(object): self._update_metadata() m = Metadata(self.metadata) - def set_tx_hash(txid): - log.debug('Name claimed using txid: %s', txid) + def set_txid_nout(claim_out): + if not claim_out['success']: + msg = 'Failed to claim name:{}'.format(claim_out['reason']) + defer.fail(Exception(msg)) + txid = claim_out['txid'] + nout = claim_out['nout'] + log.debug('Name claimed using txid: %s, nout: %d', txid, nout) self.txid = txid + self.nout = nout d = self.wallet.claim_name(self.publish_name, self.bid_amount, m) - d.addCallback(set_tx_hash) + d.addCallback(set_txid_nout) return d def _update_metadata(self): diff --git a/tests/dht/testNode.py b/tests/dht/testNode.py index e196d2c95..f4839da10 100644 --- a/tests/dht/testNode.py +++ b/tests/dht/testNode.py @@ -132,13 +132,14 @@ class NodeContactTest(unittest.TestCase): """ Some scaffolding for the NodeLookupTest class. Allows isolated node testing by simulating remote node responses""" from twisted.internet import protocol, defer, selectreactor from lbrynet.dht.msgtypes import ResponseMessage + + class FakeRPCProtocol(protocol.DatagramProtocol): def __init__(self): self.reactor = selectreactor.SelectReactor() self.testResponse = None self.network = None - - + def createNetwork(self, contactNetwork): """ set up a list of contacts together with their closest contacts @param contactNetwork: a sequence of tuples, each containing a contact together with its closest @@ -198,12 +199,7 @@ class FakeRPCProtocol(protocol.DatagramProtocol): df = defer.Deferred() df.callback((message,(contact.address, contact.port))) return df - - - - - print "findValue" - + def _send(self, data, rpcID, address): """ fake sending data """ @@ -242,8 +238,7 @@ class NodeLookupTest(unittest.TestCase): for i in range(self.contactsAmount): # create the testNodeIDs in ascending order, away from the actual node ID, with regards to the distance metric self.testNodeIDs.append(idNum + i + 1) - - + # generate contacts self.contacts = [] for i in range(self.contactsAmount):