return cached resolve result for 1 block

This commit is contained in:
hackrush 2019-01-05 00:17:05 +05:30
parent e6528963ad
commit ba8bd627a8
7 changed files with 97 additions and 15 deletions

View file

@ -86,7 +86,7 @@ class DatabaseComponent(Component):
@staticmethod
def get_current_db_revision():
return 9
return 10
@staticmethod
def get_revision_filename():

View file

@ -22,6 +22,8 @@ def migrate_db(db_dir, start, end):
from .migrate7to8 import do_migration
elif current == 8:
from .migrate8to9 import do_migration
elif current == 9:
from .migrate9to10 import do_migration
else:
raise Exception("DB migration of version {} to {} is not available".format(current,
current+1))

View file

@ -0,0 +1,11 @@
import sqlite3
import os
def do_migration(db_dir):
db_path = os.path.join(db_dir, "lbrynet.sqlite")
connection = sqlite3.connect(db_path)
cursor = connection.cursor()
cursor.executescript("ALTER TABLE claim ADD resolved_at int DEFAULT 0 NOT NULL;")
connection.commit()
connection.close()

View file

@ -12,6 +12,7 @@ from lbrynet.schema.claim import ClaimDict
from lbrynet.schema.decode import smart_decode
from lbrynet.blob.CryptBlob import CryptBlobInfo
from lbrynet.dht.constants import dataExpireTimeout
from lbrynet.schema.uri import parse_lbry_uri
log = logging.getLogger(__name__)
@ -386,8 +387,8 @@ class SQLiteStorage:
def _store_stream(transaction):
transaction.execute("insert into stream values (?, ?, ?, ?, ?);",
(stream_hash, sd_hash, stream_key, stream_name,
suggested_file_name))
(stream_hash, sd_hash, stream_key, stream_name,
suggested_file_name))
for blob_info in stream_blob_infos:
transaction.execute("insert into stream_blob values (?, ?, ?, ?)",
@ -568,13 +569,11 @@ class SQLiteStorage:
return self.db.runInteraction(_save_support)
def get_supports(self, *claim_ids):
def _format_support(outpoint, supported_id, amount, address):
def _format_support(outpoint, amount):
return {
"txid": outpoint.split(":")[0],
"nout": int(outpoint.split(":")[1]),
"claim_id": supported_id,
"amount": dewies_to_lbc(amount),
"address": address,
}
def _get_supports(transaction):
@ -582,7 +581,7 @@ class SQLiteStorage:
_format_support(*support_info)
for support_info in _batched_select(
transaction,
"select * from support where claim_id in {}",
"select support_outpoint, amount from support where claim_id in {}",
tuple(claim_ids)
)
]
@ -604,6 +603,7 @@ class SQLiteStorage:
height = claim_info['height']
address = claim_info['address']
sequence = claim_info['claim_sequence']
resolved_at = claim_info['height'] + claim_info['depth']
try:
certificate_id = claim_info['value'].get('publisherSignature', {}).get('certificateId')
except AttributeError:
@ -617,8 +617,9 @@ class SQLiteStorage:
source_hash = None
serialized = claim_info.get('hex') or hexlify(smart_decode(claim_info['value']).serialized)
transaction.execute(
"insert or replace into claim values (?, ?, ?, ?, ?, ?, ?, ?, ?)",
(outpoint, claim_id, name, amount, height, serialized, certificate_id, address, sequence)
"insert or replace into claim values (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)",
(outpoint, claim_id, name, amount, height, serialized, certificate_id, address, sequence,
resolved_at)
)
if 'supports' in claim_info: # if this response doesn't have support info don't overwrite the existing
# support info
@ -794,6 +795,24 @@ class SQLiteStorage:
claims[stream_hash] = claim
defer.returnValue(claims)
def get_resolved_at_for_outpoint(self, outpoint):
return self.run_and_return_one_or_none(
"select resolved_at from claim "
"where claim_outpoint = ?",
outpoint
)
def get_outpoint_for_uri(self, uri):
uri = parse_lbry_uri(uri)
claim_id_param = "%" if not uri.claim_id else uri.claim_id
return self.run_and_return_one_or_none(
"select claim_outpoint from claim "
"where claim_name=? and claim_id like ?"
"order by height desc, amount desc "
"limit 1",
uri.name, claim_id_param
)
@defer.inlineCallbacks
def get_claim(self, claim_outpoint, include_supports=True):
def _get_claim(transaction):
@ -872,19 +891,19 @@ class SQLiteStorage:
# Helper functions
def _format_claim_response(outpoint, claim_id, name, amount, height, serialized, channel_id, address, claim_sequence):
def _format_claim_response(outpoint, claim_id, name, amount, height, serialized, _channel_id, address, claim_sequence,
_resolved_at):
r = {
"name": name,
"claim_id": claim_id,
"address": address,
"claim_sequence": claim_sequence,
"hex": serialized,
"value": ClaimDict.deserialize(unhexlify(serialized)).claim_dict,
"height": height,
"amount": dewies_to_lbc(amount),
"nout": int(outpoint.split(":")[1]),
"txid": outpoint.split(":")[0],
"channel_claim_id": channel_id,
"channel_name": None
}
return r

View file

@ -30,4 +30,4 @@ def lbc_to_dewies(lbc: str) -> int:
def dewies_to_lbc(dewies) -> str:
return satoshis_to_coins(dewies)
return dewies if isinstance(dewies, str) else satoshis_to_coins(dewies)

View file

@ -10,6 +10,7 @@ from typing import Optional
from twisted.internet import defer
from lbrynet.schema.schema import SECP256k1
from lbrynet.schema.uri import parse_lbry_uri
from torba.client.basemanager import BaseWalletManager
from torba.rpc.jsonrpc import CodeMessageError
@ -25,6 +26,10 @@ from lbrynet.extras.wallet.dewies import dewies_to_lbc
log = logging.getLogger(__name__)
def d2f(deferred):
return deferred.asFuture(asyncio.get_event_loop())
class ReservedPoints:
def __init__(self, identifier, amount):
self.identifier = identifier
@ -268,16 +273,55 @@ class LbryWalletManager(BaseWalletManager):
def get_info_exchanger(self):
return LBRYcrdAddressRequester(self)
async def fetch_from_cache(self, uri):
resolver = self.ledger.resolver
is_channel = parse_lbry_uri(uri).is_channel
claim_outpoint = await d2f(self.old_db.get_outpoint_for_uri(uri))
last_resolved_at = await d2f(self.old_db.get_resolved_at_for_outpoint(claim_outpoint))
if not last_resolved_at or resolver.height - last_resolved_at > 1:
return False
log.info("trying to fetch %s from cache", uri)
claim = await d2f(self.old_db.get_claim(claim_outpoint))
claim['depth'] = resolver.height - claim['height']
result = {"claim": claim}
if claim.get("channel_name", False):
certificate_uri = "{}#{}".format(claim.get("channel_name", ""),
claim['value'].get('publisherSignature', {}).get('certificateId'))
certificate_outpoint = await d2f(self.old_db.get_outpoint_for_uri(certificate_uri))
certificate = await d2f(self.old_db.get_claim(certificate_outpoint))
certificate['depth'] = resolver.height - certificate['height']
result["certificate"] = await resolver.parse_and_validate_claim_result(certificate)
result["claim"] = await resolver.parse_and_validate_claim_result(result["claim"],
certificate=result.get(
"certificate", None))
if is_channel:
result = {"certificate": result["claim"]}
return result
async def resolve(self, *uris, **kwargs):
page = kwargs.get('page', 0)
page_size = kwargs.get('page_size', 10)
check_cache = kwargs.get('check_cache', False) # TODO: put caching back (was force_refresh parameter)
cached_results = dict()
needed = list()
if check_cache:
for uri in uris:
cached_results[uri] = await self.fetch_from_cache(uri)
needed = [key for key in cached_results if not cached_results[key]]
ledger: MainNetLedger = self.default_account.ledger
results = await ledger.resolve(page, page_size, *uris)
results = await ledger.resolve(page, page_size, *needed)
if 'error' not in results:
await self.old_db.save_claims_for_resolve([
value for value in results.values() if 'error' not in value
]).asFuture(asyncio.get_event_loop())
results = {**cached_results, **results}
return results
async def get_claims_for_name(self, name: str):

View file

@ -180,7 +180,8 @@ class Resolver:
if decoded.has_signature:
if certificate is None:
log.info("fetching certificate to check claim signature")
certificate = await self.network.get_claims_by_ids(decoded.certificate_id.decode())
certificate_id = decoded.certificate_id.decode()
certificate = (await self.network.get_claims_by_ids(certificate_id)).get(certificate_id, {})
if not certificate:
log.warning('Certificate %s not found', decoded.certificate_id)
claim_result['has_signature'] = True
@ -298,6 +299,11 @@ def format_amount_value(obj):
if not isinstance(obj[k], float):
obj[k] = dewies_to_lbc(obj[k])
elif k == 'supports' and isinstance(v, list):
# supports are already in the desired format
# the following 2 lines are bad as supports should be passed in a consistent manner
# but it is unavoidable at the moment because of so much other code linked to this :-(
if v and isinstance(v[0], dict):
continue
obj[k] = [{'txid': txid, 'nout': nout, 'amount': dewies_to_lbc(amount)}
for (txid, nout, amount) in v]
elif isinstance(v, (list, dict)):