Compare commits

...
Sign in to create a new pull request.

2 commits

Author SHA1 Message Date
hackrush
ba8bd627a8 return cached resolve result for 1 block 2019-01-05 00:29:19 +05:30
hackrush
e6528963ad Actually get the certificate_id 2018-12-25 12:16:06 +05:30
7 changed files with 98 additions and 16 deletions

View file

@@ -86,7 +86,7 @@ class DatabaseComponent(Component):
@staticmethod @staticmethod
def get_current_db_revision(): def get_current_db_revision():
return 9 return 10
@staticmethod @staticmethod
def get_revision_filename(): def get_revision_filename():

View file

@@ -22,6 +22,8 @@ def migrate_db(db_dir, start, end):
from .migrate7to8 import do_migration from .migrate7to8 import do_migration
elif current == 8: elif current == 8:
from .migrate8to9 import do_migration from .migrate8to9 import do_migration
elif current == 9:
from .migrate9to10 import do_migration
else: else:
raise Exception("DB migration of version {} to {} is not available".format(current, raise Exception("DB migration of version {} to {} is not available".format(current,
current+1)) current+1))

View file

@@ -0,0 +1,11 @@
import sqlite3
import os
def do_migration(db_dir):
    """Migrate the lbrynet database from revision 9 to 10.

    Adds a ``resolved_at`` column (int, default 0, NOT NULL) to the ``claim``
    table — presumably the block height at which a claim was last resolved,
    used to serve resolve results from cache.

    :param db_dir: directory containing ``lbrynet.sqlite``
    """
    db_path = os.path.join(db_dir, "lbrynet.sqlite")
    connection = sqlite3.connect(db_path)
    try:
        connection.executescript("ALTER TABLE claim ADD resolved_at int DEFAULT 0 NOT NULL;")
        connection.commit()
    finally:
        # Close even when the ALTER fails, so the db file is not left locked.
        connection.close()

View file

@@ -12,6 +12,7 @@ from lbrynet.schema.claim import ClaimDict
from lbrynet.schema.decode import smart_decode from lbrynet.schema.decode import smart_decode
from lbrynet.blob.CryptBlob import CryptBlobInfo from lbrynet.blob.CryptBlob import CryptBlobInfo
from lbrynet.dht.constants import dataExpireTimeout from lbrynet.dht.constants import dataExpireTimeout
from lbrynet.schema.uri import parse_lbry_uri
log = logging.getLogger(__name__) log = logging.getLogger(__name__)
@@ -386,8 +387,8 @@ class SQLiteStorage:
def _store_stream(transaction): def _store_stream(transaction):
transaction.execute("insert into stream values (?, ?, ?, ?, ?);", transaction.execute("insert into stream values (?, ?, ?, ?, ?);",
(stream_hash, sd_hash, stream_key, stream_name, (stream_hash, sd_hash, stream_key, stream_name,
suggested_file_name)) suggested_file_name))
for blob_info in stream_blob_infos: for blob_info in stream_blob_infos:
transaction.execute("insert into stream_blob values (?, ?, ?, ?)", transaction.execute("insert into stream_blob values (?, ?, ?, ?)",
@@ -568,13 +569,11 @@ class SQLiteStorage:
return self.db.runInteraction(_save_support) return self.db.runInteraction(_save_support)
def get_supports(self, *claim_ids): def get_supports(self, *claim_ids):
def _format_support(outpoint, supported_id, amount, address): def _format_support(outpoint, amount):
return { return {
"txid": outpoint.split(":")[0], "txid": outpoint.split(":")[0],
"nout": int(outpoint.split(":")[1]), "nout": int(outpoint.split(":")[1]),
"claim_id": supported_id,
"amount": dewies_to_lbc(amount), "amount": dewies_to_lbc(amount),
"address": address,
} }
def _get_supports(transaction): def _get_supports(transaction):
@@ -582,7 +581,7 @@ class SQLiteStorage:
_format_support(*support_info) _format_support(*support_info)
for support_info in _batched_select( for support_info in _batched_select(
transaction, transaction,
"select * from support where claim_id in {}", "select support_outpoint, amount from support where claim_id in {}",
tuple(claim_ids) tuple(claim_ids)
) )
] ]
@@ -604,8 +603,9 @@ class SQLiteStorage:
height = claim_info['height'] height = claim_info['height']
address = claim_info['address'] address = claim_info['address']
sequence = claim_info['claim_sequence'] sequence = claim_info['claim_sequence']
resolved_at = claim_info['height'] + claim_info['depth']
try: try:
certificate_id = claim_info['value'].get('content_claims_to_update', {}).get('certificateId') certificate_id = claim_info['value'].get('publisherSignature', {}).get('certificateId')
except AttributeError: except AttributeError:
certificate_id = None certificate_id = None
try: try:
@@ -617,8 +617,9 @@ class SQLiteStorage:
source_hash = None source_hash = None
serialized = claim_info.get('hex') or hexlify(smart_decode(claim_info['value']).serialized) serialized = claim_info.get('hex') or hexlify(smart_decode(claim_info['value']).serialized)
transaction.execute( transaction.execute(
"insert or replace into claim values (?, ?, ?, ?, ?, ?, ?, ?, ?)", "insert or replace into claim values (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)",
(outpoint, claim_id, name, amount, height, serialized, certificate_id, address, sequence) (outpoint, claim_id, name, amount, height, serialized, certificate_id, address, sequence,
resolved_at)
) )
if 'supports' in claim_info: # if this response doesn't have support info don't overwrite the existing if 'supports' in claim_info: # if this response doesn't have support info don't overwrite the existing
# support info # support info
@@ -794,6 +795,24 @@ class SQLiteStorage:
claims[stream_hash] = claim claims[stream_hash] = claim
defer.returnValue(claims) defer.returnValue(claims)
def get_resolved_at_for_outpoint(self, outpoint):
    """Return the cached resolve marker stored for the claim at *outpoint*.

    Delegates to ``run_and_return_one_or_none``, so the result is a single
    value or None when no row matches.
    """
    query = (
        "select resolved_at from claim "
        "where claim_outpoint = ?"
    )
    return self.run_and_return_one_or_none(query, outpoint)
def get_outpoint_for_uri(self, uri):
    """Return the outpoint ("txid:nout") of the claim matching *uri*, or None.

    When the uri does not pin a claim_id, any claim with the name matches
    ("%" is the SQL LIKE wildcard); ties are broken by highest height, then
    highest amount.
    """
    uri = parse_lbry_uri(uri)
    claim_id_param = "%" if not uri.claim_id else uri.claim_id
    return self.run_and_return_one_or_none(
        "select claim_outpoint from claim "
        # Trailing space added: the original fragments concatenated into
        # "...like ?order by...", which is invalid SQL.
        "where claim_name=? and claim_id like ? "
        "order by height desc, amount desc "
        "limit 1",
        uri.name, claim_id_param
    )
@defer.inlineCallbacks @defer.inlineCallbacks
def get_claim(self, claim_outpoint, include_supports=True): def get_claim(self, claim_outpoint, include_supports=True):
def _get_claim(transaction): def _get_claim(transaction):
@@ -872,19 +891,19 @@ class SQLiteStorage:
# Helper functions # Helper functions
def _format_claim_response(outpoint, claim_id, name, amount, height, serialized, channel_id, address, claim_sequence): def _format_claim_response(outpoint, claim_id, name, amount, height, serialized, _channel_id, address, claim_sequence,
_resolved_at):
r = { r = {
"name": name, "name": name,
"claim_id": claim_id, "claim_id": claim_id,
"address": address, "address": address,
"claim_sequence": claim_sequence, "claim_sequence": claim_sequence,
"hex": serialized,
"value": ClaimDict.deserialize(unhexlify(serialized)).claim_dict, "value": ClaimDict.deserialize(unhexlify(serialized)).claim_dict,
"height": height, "height": height,
"amount": dewies_to_lbc(amount), "amount": dewies_to_lbc(amount),
"nout": int(outpoint.split(":")[1]), "nout": int(outpoint.split(":")[1]),
"txid": outpoint.split(":")[0], "txid": outpoint.split(":")[0],
"channel_claim_id": channel_id,
"channel_name": None
} }
return r return r

View file

@@ -30,4 +30,4 @@ def lbc_to_dewies(lbc: str) -> int:
def dewies_to_lbc(dewies) -> str: def dewies_to_lbc(dewies) -> str:
return satoshis_to_coins(dewies) return dewies if isinstance(dewies, str) else satoshis_to_coins(dewies)

View file

@@ -10,6 +10,7 @@ from typing import Optional
from twisted.internet import defer from twisted.internet import defer
from lbrynet.schema.schema import SECP256k1 from lbrynet.schema.schema import SECP256k1
from lbrynet.schema.uri import parse_lbry_uri
from torba.client.basemanager import BaseWalletManager from torba.client.basemanager import BaseWalletManager
from torba.rpc.jsonrpc import CodeMessageError from torba.rpc.jsonrpc import CodeMessageError
@@ -25,6 +26,10 @@ from lbrynet.extras.wallet.dewies import dewies_to_lbc
log = logging.getLogger(__name__) log = logging.getLogger(__name__)
def d2f(deferred):
    """Bridge a Twisted Deferred into an asyncio Future on the current event loop."""
    loop = asyncio.get_event_loop()
    return deferred.asFuture(loop)
class ReservedPoints: class ReservedPoints:
def __init__(self, identifier, amount): def __init__(self, identifier, amount):
self.identifier = identifier self.identifier = identifier
@@ -268,16 +273,55 @@ class LbryWalletManager(BaseWalletManager):
def get_info_exchanger(self): def get_info_exchanger(self):
return LBRYcrdAddressRequester(self) return LBRYcrdAddressRequester(self)
async def fetch_from_cache(self, uri):
    """Try to serve a resolve result for *uri* from the local claim db.

    Returns False when there is no cached entry or it is stale (resolved
    more than one block ago); otherwise returns a resolver-shaped dict with
    a "claim" key and, when the claim is signed, a "certificate" key.
    For channel uris the validated claim is returned under "certificate".
    """
    resolver = self.ledger.resolver
    is_channel = parse_lbry_uri(uri).is_channel
    claim_outpoint = await d2f(self.old_db.get_outpoint_for_uri(uri))
    last_resolved_at = await d2f(self.old_db.get_resolved_at_for_outpoint(claim_outpoint))
    # Only trust the cache for a single block; anything older may have been
    # superseded by a newer claim/support on chain.
    if not last_resolved_at or resolver.height - last_resolved_at > 1:
        return False
    log.info("trying to fetch %s from cache", uri)
    claim = await d2f(self.old_db.get_claim(claim_outpoint))
    # Recompute depth relative to the current tip; the stored height is fixed.
    claim['depth'] = resolver.height - claim['height']
    result = {"claim": claim}
    if claim.get("channel_name", False):
        # Signed claim: reconstruct the certificate uri ("channel#claim_id")
        # and validate the claim against its certificate.
        certificate_uri = "{}#{}".format(claim.get("channel_name", ""),
                                         claim['value'].get('publisherSignature', {}).get('certificateId'))
        certificate_outpoint = await d2f(self.old_db.get_outpoint_for_uri(certificate_uri))
        certificate = await d2f(self.old_db.get_claim(certificate_outpoint))
        certificate['depth'] = resolver.height - certificate['height']
        result["certificate"] = await resolver.parse_and_validate_claim_result(certificate)
    result["claim"] = await resolver.parse_and_validate_claim_result(result["claim"],
                                                                     certificate=result.get(
                                                                         "certificate", None))
    if is_channel:
        # For channel uris the caller expects the channel claim itself as
        # the certificate entry.
        result = {"certificate": result["claim"]}
    return result
async def resolve(self, *uris, **kwargs): async def resolve(self, *uris, **kwargs):
page = kwargs.get('page', 0) page = kwargs.get('page', 0)
page_size = kwargs.get('page_size', 10) page_size = kwargs.get('page_size', 10)
check_cache = kwargs.get('check_cache', False) # TODO: put caching back (was force_refresh parameter) check_cache = kwargs.get('check_cache', False) # TODO: put caching back (was force_refresh parameter)
cached_results = dict()
needed = list()
if check_cache:
for uri in uris:
cached_results[uri] = await self.fetch_from_cache(uri)
needed = [key for key in cached_results if not cached_results[key]]
ledger: MainNetLedger = self.default_account.ledger ledger: MainNetLedger = self.default_account.ledger
results = await ledger.resolve(page, page_size, *uris) results = await ledger.resolve(page, page_size, *needed)
if 'error' not in results: if 'error' not in results:
await self.old_db.save_claims_for_resolve([ await self.old_db.save_claims_for_resolve([
value for value in results.values() if 'error' not in value value for value in results.values() if 'error' not in value
]).asFuture(asyncio.get_event_loop()) ]).asFuture(asyncio.get_event_loop())
results = {**cached_results, **results}
return results return results
async def get_claims_for_name(self, name: str): async def get_claims_for_name(self, name: str):

View file

@@ -180,7 +180,8 @@ class Resolver:
if decoded.has_signature: if decoded.has_signature:
if certificate is None: if certificate is None:
log.info("fetching certificate to check claim signature") log.info("fetching certificate to check claim signature")
certificate = await self.network.get_claims_by_ids(decoded.certificate_id.decode()) certificate_id = decoded.certificate_id.decode()
certificate = (await self.network.get_claims_by_ids(certificate_id)).get(certificate_id, {})
if not certificate: if not certificate:
log.warning('Certificate %s not found', decoded.certificate_id) log.warning('Certificate %s not found', decoded.certificate_id)
claim_result['has_signature'] = True claim_result['has_signature'] = True
@@ -298,6 +299,11 @@ def format_amount_value(obj):
if not isinstance(obj[k], float): if not isinstance(obj[k], float):
obj[k] = dewies_to_lbc(obj[k]) obj[k] = dewies_to_lbc(obj[k])
elif k == 'supports' and isinstance(v, list): elif k == 'supports' and isinstance(v, list):
# supports are already in the desired format
# the following 2 lines are bad as supports should be passed in a consistent manner
# but it is unavoidable at the moment because of so much other code linked to this :-(
if v and isinstance(v[0], dict):
continue
obj[k] = [{'txid': txid, 'nout': nout, 'amount': dewies_to_lbc(amount)} obj[k] = [{'txid': txid, 'nout': nout, 'amount': dewies_to_lbc(amount)}
for (txid, nout, amount) in v] for (txid, nout, amount) in v]
elif isinstance(v, (list, dict)): elif isinstance(v, (list, dict)):