fetch claim heights

Jack Robison 2018-02-28 14:59:12 -05:00
parent 6a4b65a796
commit 01c4c6ed97
6 changed files with 81 additions and 19 deletions


@@ -5,6 +5,7 @@ from decimal import Decimal
 from zope.interface import implements
 from twisted.internet import threads, reactor, defer, task
 from twisted.python.failure import Failure
+from twisted.internet.error import ConnectionAborted
 from lbryum import wallet as lbryum_wallet
 from lbryum.network import Network
@@ -19,11 +20,11 @@ from lbryschema.error import DecodeError
 from lbryschema.decode import smart_decode
 from lbrynet.interfaces import IRequestCreator, IQueryHandlerFactory, IQueryHandler, IWallet
+from lbrynet.core.utils import DeferredDict
 from lbrynet.core.client.ClientRequest import ClientRequest
 from lbrynet.core.Error import InsufficientFundsError, UnknownNameError
 from lbrynet.core.Error import UnknownClaimID, UnknownURI, NegativeFundsError, UnknownOutpoint
 from lbrynet.core.Error import DownloadCanceledError, RequestCanceledError
-from twisted.internet.error import ConnectionAborted

 log = logging.getLogger(__name__)
@@ -83,12 +84,15 @@ class Wallet(object):
         self._manage_count = 0
         self._balance_refresh_time = 3
         self._batch_count = 20
+        self._pending_claim_checker = task.LoopingCall(self.fetch_and_save_heights_for_pending_claims)

     def start(self):
         log.info("Starting wallet.")

         def start_manage():
             self.stopped = False
             self.manage()
+            self._pending_claim_checker.start(30)
             return True

         d = self._start()
@@ -102,6 +106,9 @@ class Wallet(object):
     def stop(self):
         log.info("Stopping wallet.")
         self.stopped = True
+        if self._pending_claim_checker.running:
+            self._pending_claim_checker.stop()
         # If self.next_manage_call is None, then manage is currently running or else
         # start has not been called, so set stopped and do nothing else.
         if self.next_manage_call is not None:
@@ -315,6 +322,19 @@ class Wallet(object):
     ######

+    @defer.inlineCallbacks
+    def fetch_and_save_heights_for_pending_claims(self):
+        pending_outpoints = yield self.storage.get_pending_claim_outpoints()
+        if pending_outpoints:
+            tx_heights = yield DeferredDict({txid: self.get_height_for_txid(txid) for txid in pending_outpoints},
+                                            consumeErrors=True)
+            outpoint_heights = {}
+            for txid, outputs in pending_outpoints.iteritems():
+                if txid in tx_heights:
+                    for nout in outputs:
+                        outpoint_heights["%s:%i" % (txid, nout)] = tx_heights[txid]
+            yield self.storage.save_claim_tx_heights(outpoint_heights)
+
     @defer.inlineCallbacks
     def get_claim_by_claim_id(self, claim_id, check_expire=True):
         claim = yield self._get_claim_by_claimid(claim_id)
@@ -765,6 +785,9 @@ class Wallet(object):
     def get_max_usable_balance_for_claim(self, claim_name):
         return defer.fail(NotImplementedError())

+    def get_height_for_txid(self, txid):
+        return defer.fail(NotImplementedError())
+
     def _start(self):
         return defer.fail(NotImplementedError())
@@ -1157,6 +1180,9 @@ class LBRYumWallet(Wallet):
     def claim_renew(self, txid, nout):
         return self._run_cmd_as_defer_succeed('renewclaim', txid, nout)

+    def get_height_for_txid(self, txid):
+        return self._run_cmd_as_defer_to_thread('gettransactionheight', txid)
+
     def decrypt_wallet(self):
         if not self.wallet.use_encryption:
             return False
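
Taken together, the Wallet changes wire a task.LoopingCall into start() and stop() so that fetch_and_save_heights_for_pending_claims runs every 30 seconds while the wallet is up. A minimal standalone sketch of that polling lifecycle, using twisted.internet.task.Clock as a deterministic stand-in for the reactor and a hypothetical check_pending_claims in place of the real method:

from twisted.internet import task

calls = []

def check_pending_claims():
    # stand-in for Wallet.fetch_and_save_heights_for_pending_claims
    calls.append(len(calls))

clock = task.Clock()                 # deterministic replacement for the reactor
checker = task.LoopingCall(check_pending_claims)
checker.clock = clock
checker.start(30)                    # fires immediately, then every 30 seconds
clock.advance(90)                    # simulate a minute and a half passing
assert len(calls) == 4               # calls at t=0, 30, 60 and 90

if checker.running:                  # mirrors the guard in Wallet.stop()
    checker.stop()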


@@ -7,7 +7,7 @@ import string
 import json
 import pkg_resources
+from twisted.internet import defer
 from lbryschema.claim import ClaimDict
 from lbrynet.core.cryptoutils import get_lbry_hash_obj
@@ -146,3 +146,18 @@ def get_sd_hash(stream_info):

 def json_dumps_pretty(obj, **kwargs):
     return json.dumps(obj, sort_keys=True, indent=2, separators=(',', ': '), **kwargs)
+
+
+@defer.inlineCallbacks
+def DeferredDict(d, consumeErrors=False):
+    keys = []
+    dl = []
+    response = {}
+    for k, v in d.iteritems():
+        keys.append(k)
+        dl.append(v)
+    results = yield defer.DeferredList(dl, consumeErrors=consumeErrors)
+    for k, (success, result) in zip(keys, results):
+        if success:
+            response[k] = result
+    defer.returnValue(response)
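
The DeferredDict helper added to lbrynet/core/utils.py collects a dict of Deferreds into a dict of results and, with consumeErrors=True, silently drops any entry whose Deferred failed; that is what lets fetch_and_save_heights_for_pending_claims skip transactions whose height lookup errored instead of aborting the whole batch. A small usage sketch, assuming the helper is importable as in the Wallet.py import above (the txids are made up):

from twisted.internet import defer
from lbrynet.core.utils import DeferredDict

@defer.inlineCallbacks
def demo():
    heights = yield DeferredDict({
        "txid_a": defer.succeed(310000),
        "txid_b": defer.fail(ValueError("height not available yet")),
    }, consumeErrors=True)
    # the failed lookup is dropped rather than raised; only successes remain
    assert heights == {"txid_a": 310000}

demo()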


@@ -1638,9 +1638,9 @@ class Daemon(AuthJSONRPCServer):
         if not resolved or 'value' not in resolved:
             if 'claim' not in resolved:
-                raise Exception(
-                    "Failed to resolve stream at lbry://{}".format(uri.replace("lbry://", ""))
-                )
+                raise Exception(
+                    "Failed to resolve stream at lbry://{}".format(uri.replace("lbry://", ""))
+                )
             else:
                 resolved = resolved['claim']
         txid, nout, name = resolved['txid'], resolved['nout'], resolved['name']


@@ -42,15 +42,13 @@ class Publisher(object):
         claim_out = yield self.make_claim(name, bid, claim_dict, claim_address, change_address)

         # check if we have a file already for this claim (if this is a publish update with a new stream)
-        old_stream_hashes = yield self.session.storage.get_stream_hashes_for_claim_id(claim_out['claim_id'])
+        old_stream_hashes = yield self.session.storage.get_old_stream_hashes_for_claim_id(claim_out['claim_id'],
+                                                                                           self.lbry_file.stream_hash)
         if old_stream_hashes:
-            lbry_files = list(self.lbry_file_manager.lbry_files)
-            for lbry_file in lbry_files:
-                s_h = lbry_file.stream_hash
-                if s_h in old_stream_hashes:
-                    yield self.lbry_file_manager.delete_lbry_file(lbry_file, delete_file=False)
-                    old_stream_hashes.remove(s_h)
-                    log.info("Removed old stream for claim update: %s", s_h)
+            for lbry_file in filter(lambda l: l.stream_hash in old_stream_hashes,
+                                    list(self.lbry_file_manager.lbry_files)):
+                yield self.lbry_file_manager.delete_lbry_file(lbry_file, delete_file=False)
+                log.info("Removed old stream for claim update: %s", lbry_file.stream_hash)

         yield self.session.storage.save_content_claim(
             self.lbry_file.stream_hash, "%s:%i" % (claim_out['txid'], claim_out['nout'])
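
The Publisher rewrite leans on the renamed storage call: get_old_stream_hashes_for_claim_id already excludes the stream being published, so the loop only ever deletes genuinely stale streams and no longer needs to re-check membership or mutate the hash list as it goes. A rough illustration of that filter with hypothetical stand-in objects (FakeLBRYFile and the hash values are not part of the codebase):

class FakeLBRYFile(object):
    # hypothetical stand-in; only the stream_hash attribute matters here
    def __init__(self, stream_hash):
        self.stream_hash = stream_hash

new_stream_hash = "hash_new"
lbry_files = [FakeLBRYFile("hash_old"), FakeLBRYFile(new_stream_hash)]

# the storage query returns hashes for this claim *other than* the new stream
old_stream_hashes = ["hash_old"]

stale = list(filter(lambda l: l.stream_hash in old_stream_hashes, lbry_files))
assert [f.stream_hash for f in stale] == ["hash_old"]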


@@ -579,12 +579,12 @@ class SQLiteStorage(object):
                 log.warning("claim %s contains the same stream as the one already downloaded from claim %s",
                             claim_id, known_claim_id)

-    def get_stream_hashes_for_claim_id(self, claim_id):
+    def get_old_stream_hashes_for_claim_id(self, claim_id, new_stream_hash):
         return self.run_and_return_list(
             "select f.stream_hash from file f "
             "inner join content_claim cc on f.stream_hash=cc.stream_hash "
-            "inner join claim c on c.claim_outpoint=cc.claim_outpoint and c.claim_id=?",
-            claim_id
+            "inner join claim c on c.claim_outpoint=cc.claim_outpoint and c.claim_id=? "
+            "where f.stream_hash!=?", claim_id, new_stream_hash
         )

     @defer.inlineCallbacks
@@ -670,7 +670,7 @@ class SQLiteStorage(object):
         def _get_claim(transaction):
             claim_info = transaction.execute(
-                "select * from claim where claim_id=? order by height, rowid desc", (claim_id, )
+                "select * from claim where claim_id=? order by rowid desc", (claim_id, )
             ).fetchone()
             result = _claim_response(*claim_info)
             if result['channel_claim_id']:
@@ -701,3 +701,25 @@ class SQLiteStorage(object):
                 ).fetchall()
             ]
         return self.db.runInteraction(_get_unknown_certificate_claim_ids)
+
+    @defer.inlineCallbacks
+    def get_pending_claim_outpoints(self):
+        claim_outpoints = yield self.run_and_return_list("select claim_outpoint from claim where height=-1")
+        results = {}  # {txid: [nout, ...]}
+        for outpoint_str in claim_outpoints:
+            txid, nout = outpoint_str.split(":")
+            outputs = results.get(txid, [])
+            outputs.append(int(nout))
+            results[txid] = outputs
+        if results:
+            log.debug("missing transaction heights for %i claims", len(results))
+        defer.returnValue(results)
+
+    def save_claim_tx_heights(self, claim_tx_heights):
+        def _save_claim_heights(transaction):
+            for outpoint, height in claim_tx_heights.iteritems():
+                transaction.execute(
+                    "update claim set height=? where claim_outpoint=? and height=-1",
+                    (height, outpoint)
+                )
+        return self.db.runInteraction(_save_claim_heights)
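
The two new storage methods rely on height=-1 as a "not yet confirmed" sentinel: get_pending_claim_outpoints selects only rows still at the sentinel, and save_claim_tx_heights only updates rows that still carry it, so confirmed heights are never overwritten. A minimal sqlite3 sketch of that flow against a stripped-down claim table (the real table in SQLiteStorage has more columns; the outpoints and heights here are made up):

import sqlite3

db = sqlite3.connect(":memory:")
db.execute("create table claim (claim_outpoint text primary key, height integer)")
db.execute("insert into claim values (?, ?)", ("deadbeef:0", -1))      # pending, sentinel height
db.execute("insert into claim values (?, ?)", ("cafebabe:1", 310000))  # already confirmed

# get_pending_claim_outpoints: only claims still at the -1 sentinel are returned
pending = [row[0] for row in db.execute("select claim_outpoint from claim where height=-1")]
assert pending == ["deadbeef:0"]

# save_claim_tx_heights: the height=-1 guard means confirmed rows are never clobbered
db.execute("update claim set height=? where claim_outpoint=? and height=-1", (310005, "deadbeef:0"))
heights = [r[0] for r in db.execute("select height from claim where claim_outpoint='deadbeef:0'")]
assert heights == [310005]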


@@ -296,8 +296,9 @@ class ContentClaimStorageTests(StorageTest):
         stored_content_claim = yield self.storage.get_content_claim(stream_hash)
         self.assertDictEqual(stored_content_claim, fake_claim_info)

-        stream_hashes = yield self.storage.get_stream_hashes_for_claim_id(fake_claim_info['claim_id'])
-        self.assertListEqual(stream_hashes, [stream_hash])
+        stream_hashes = yield self.storage.get_old_stream_hashes_for_claim_id(fake_claim_info['claim_id'],
+                                                                              stream_hash)
+        self.assertListEqual(stream_hashes, [])

         # test that we can't associate a claim update with a new stream to the file
         second_stream_hash, second_sd_hash = random_lbry_hash(), random_lbry_hash()