From ba8bd627a8454a6f45986b86c63da087341b2ac5 Mon Sep 17 00:00:00 2001 From: hackrush Date: Sat, 5 Jan 2019 00:17:05 +0530 Subject: [PATCH] return cached resolve result for 1 block --- lbrynet/extras/daemon/Components.py | 2 +- lbrynet/extras/daemon/migrator/dbmigrator.py | 2 + .../extras/daemon/migrator/migrate9to10.py | 11 +++++ lbrynet/extras/daemon/storage.py | 41 ++++++++++++----- lbrynet/extras/wallet/dewies.py | 2 +- lbrynet/extras/wallet/manager.py | 46 ++++++++++++++++++- lbrynet/extras/wallet/resolve.py | 8 +++- 7 files changed, 97 insertions(+), 15 deletions(-) create mode 100644 lbrynet/extras/daemon/migrator/migrate9to10.py diff --git a/lbrynet/extras/daemon/Components.py b/lbrynet/extras/daemon/Components.py index 5514a4cae..c10edc3c3 100644 --- a/lbrynet/extras/daemon/Components.py +++ b/lbrynet/extras/daemon/Components.py @@ -86,7 +86,7 @@ class DatabaseComponent(Component): @staticmethod def get_current_db_revision(): - return 9 + return 10 @staticmethod def get_revision_filename(): diff --git a/lbrynet/extras/daemon/migrator/dbmigrator.py b/lbrynet/extras/daemon/migrator/dbmigrator.py index 5ec93712c..0d05ac614 100644 --- a/lbrynet/extras/daemon/migrator/dbmigrator.py +++ b/lbrynet/extras/daemon/migrator/dbmigrator.py @@ -22,6 +22,8 @@ def migrate_db(db_dir, start, end): from .migrate7to8 import do_migration elif current == 8: from .migrate8to9 import do_migration + elif current == 9: + from .migrate9to10 import do_migration else: raise Exception("DB migration of version {} to {} is not available".format(current, current+1)) diff --git a/lbrynet/extras/daemon/migrator/migrate9to10.py b/lbrynet/extras/daemon/migrator/migrate9to10.py new file mode 100644 index 000000000..9ef08bb39 --- /dev/null +++ b/lbrynet/extras/daemon/migrator/migrate9to10.py @@ -0,0 +1,11 @@ +import sqlite3 +import os + + +def do_migration(db_dir): + db_path = os.path.join(db_dir, "lbrynet.sqlite") + connection = sqlite3.connect(db_path) + cursor = connection.cursor() + cursor.executescript("ALTER TABLE claim ADD resolved_at int DEFAULT 0 NOT NULL;") + connection.commit() + connection.close() diff --git a/lbrynet/extras/daemon/storage.py b/lbrynet/extras/daemon/storage.py index 948029309..3956b5fe4 100644 --- a/lbrynet/extras/daemon/storage.py +++ b/lbrynet/extras/daemon/storage.py @@ -12,6 +12,7 @@ from lbrynet.schema.claim import ClaimDict from lbrynet.schema.decode import smart_decode from lbrynet.blob.CryptBlob import CryptBlobInfo from lbrynet.dht.constants import dataExpireTimeout +from lbrynet.schema.uri import parse_lbry_uri log = logging.getLogger(__name__) @@ -386,8 +387,8 @@ class SQLiteStorage: def _store_stream(transaction): transaction.execute("insert into stream values (?, ?, ?, ?, ?);", - (stream_hash, sd_hash, stream_key, stream_name, - suggested_file_name)) + (stream_hash, sd_hash, stream_key, stream_name, + suggested_file_name)) for blob_info in stream_blob_infos: transaction.execute("insert into stream_blob values (?, ?, ?, ?)", @@ -568,13 +569,11 @@ class SQLiteStorage: return self.db.runInteraction(_save_support) def get_supports(self, *claim_ids): - def _format_support(outpoint, supported_id, amount, address): + def _format_support(outpoint, amount): return { "txid": outpoint.split(":")[0], "nout": int(outpoint.split(":")[1]), - "claim_id": supported_id, "amount": dewies_to_lbc(amount), - "address": address, } def _get_supports(transaction): @@ -582,7 +581,7 @@ class SQLiteStorage: _format_support(*support_info) for support_info in _batched_select( transaction, - "select * from support where claim_id in {}", + "select support_outpoint, amount from support where claim_id in {}", tuple(claim_ids) ) ] @@ -604,6 +603,7 @@ class SQLiteStorage: height = claim_info['height'] address = claim_info['address'] sequence = claim_info['claim_sequence'] + resolved_at = claim_info['height'] + claim_info['depth'] try: certificate_id = claim_info['value'].get('publisherSignature', {}).get('certificateId') except AttributeError: @@ -617,8 +617,9 @@ class SQLiteStorage: source_hash = None serialized = claim_info.get('hex') or hexlify(smart_decode(claim_info['value']).serialized) transaction.execute( - "insert or replace into claim values (?, ?, ?, ?, ?, ?, ?, ?, ?)", - (outpoint, claim_id, name, amount, height, serialized, certificate_id, address, sequence) + "insert or replace into claim values (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)", + (outpoint, claim_id, name, amount, height, serialized, certificate_id, address, sequence, + resolved_at) ) if 'supports' in claim_info: # if this response doesn't have support info don't overwrite the existing # support info @@ -794,6 +795,24 @@ class SQLiteStorage: claims[stream_hash] = claim defer.returnValue(claims) + def get_resolved_at_for_outpoint(self, outpoint): + return self.run_and_return_one_or_none( + "select resolved_at from claim " + "where claim_outpoint = ?", + outpoint + ) + + def get_outpoint_for_uri(self, uri): + uri = parse_lbry_uri(uri) + claim_id_param = "%" if not uri.claim_id else uri.claim_id + return self.run_and_return_one_or_none( + "select claim_outpoint from claim " + "where claim_name=? and claim_id like ?" + "order by height desc, amount desc " + "limit 1", + uri.name, claim_id_param + ) + @defer.inlineCallbacks def get_claim(self, claim_outpoint, include_supports=True): def _get_claim(transaction): @@ -872,19 +891,19 @@ class SQLiteStorage: # Helper functions -def _format_claim_response(outpoint, claim_id, name, amount, height, serialized, channel_id, address, claim_sequence): +def _format_claim_response(outpoint, claim_id, name, amount, height, serialized, _channel_id, address, claim_sequence, + _resolved_at): r = { "name": name, "claim_id": claim_id, "address": address, "claim_sequence": claim_sequence, + "hex": serialized, "value": ClaimDict.deserialize(unhexlify(serialized)).claim_dict, "height": height, "amount": dewies_to_lbc(amount), "nout": int(outpoint.split(":")[1]), "txid": outpoint.split(":")[0], - "channel_claim_id": channel_id, - "channel_name": None } return r diff --git a/lbrynet/extras/wallet/dewies.py b/lbrynet/extras/wallet/dewies.py index 3e70c1da9..7100dac3a 100644 --- a/lbrynet/extras/wallet/dewies.py +++ b/lbrynet/extras/wallet/dewies.py @@ -30,4 +30,4 @@ def lbc_to_dewies(lbc: str) -> int: def dewies_to_lbc(dewies) -> str: - return satoshis_to_coins(dewies) + return dewies if isinstance(dewies, str) else satoshis_to_coins(dewies) diff --git a/lbrynet/extras/wallet/manager.py b/lbrynet/extras/wallet/manager.py index 038e7dd87..e85711579 100644 --- a/lbrynet/extras/wallet/manager.py +++ b/lbrynet/extras/wallet/manager.py @@ -10,6 +10,7 @@ from typing import Optional from twisted.internet import defer from lbrynet.schema.schema import SECP256k1 +from lbrynet.schema.uri import parse_lbry_uri from torba.client.basemanager import BaseWalletManager from torba.rpc.jsonrpc import CodeMessageError @@ -25,6 +26,10 @@ from lbrynet.extras.wallet.dewies import dewies_to_lbc log = logging.getLogger(__name__) +def d2f(deferred): + return deferred.asFuture(asyncio.get_event_loop()) + + class ReservedPoints: def __init__(self, identifier, amount): self.identifier = identifier @@ -268,16 +273,55 @@ class LbryWalletManager(BaseWalletManager): def get_info_exchanger(self): return LBRYcrdAddressRequester(self) + async def fetch_from_cache(self, uri): + resolver = self.ledger.resolver + is_channel = parse_lbry_uri(uri).is_channel + + claim_outpoint = await d2f(self.old_db.get_outpoint_for_uri(uri)) + last_resolved_at = await d2f(self.old_db.get_resolved_at_for_outpoint(claim_outpoint)) + + if not last_resolved_at or resolver.height - last_resolved_at > 1: + return False + + log.info("trying to fetch %s from cache", uri) + claim = await d2f(self.old_db.get_claim(claim_outpoint)) + claim['depth'] = resolver.height - claim['height'] + result = {"claim": claim} + + if claim.get("channel_name", False): + certificate_uri = "{}#{}".format(claim.get("channel_name", ""), + claim['value'].get('publisherSignature', {}).get('certificateId')) + certificate_outpoint = await d2f(self.old_db.get_outpoint_for_uri(certificate_uri)) + certificate = await d2f(self.old_db.get_claim(certificate_outpoint)) + certificate['depth'] = resolver.height - certificate['height'] + result["certificate"] = await resolver.parse_and_validate_claim_result(certificate) + + result["claim"] = await resolver.parse_and_validate_claim_result(result["claim"], + certificate=result.get( + "certificate", None)) + if is_channel: + result = {"certificate": result["claim"]} + return result + async def resolve(self, *uris, **kwargs): page = kwargs.get('page', 0) page_size = kwargs.get('page_size', 10) check_cache = kwargs.get('check_cache', False) # TODO: put caching back (was force_refresh parameter) + + cached_results = dict() + needed = list() + if check_cache: + for uri in uris: + cached_results[uri] = await self.fetch_from_cache(uri) + needed = [key for key in cached_results if not cached_results[key]] + ledger: MainNetLedger = self.default_account.ledger - results = await ledger.resolve(page, page_size, *uris) + results = await ledger.resolve(page, page_size, *needed) if 'error' not in results: await self.old_db.save_claims_for_resolve([ value for value in results.values() if 'error' not in value ]).asFuture(asyncio.get_event_loop()) + results = {**cached_results, **results} return results async def get_claims_for_name(self, name: str): diff --git a/lbrynet/extras/wallet/resolve.py b/lbrynet/extras/wallet/resolve.py index c27446a10..a42eb8e29 100644 --- a/lbrynet/extras/wallet/resolve.py +++ b/lbrynet/extras/wallet/resolve.py @@ -180,7 +180,8 @@ class Resolver: if decoded.has_signature: if certificate is None: log.info("fetching certificate to check claim signature") - certificate = await self.network.get_claims_by_ids(decoded.certificate_id.decode()) + certificate_id = decoded.certificate_id.decode() + certificate = (await self.network.get_claims_by_ids(certificate_id)).get(certificate_id, {}) if not certificate: log.warning('Certificate %s not found', decoded.certificate_id) claim_result['has_signature'] = True @@ -298,6 +299,11 @@ def format_amount_value(obj): if not isinstance(obj[k], float): obj[k] = dewies_to_lbc(obj[k]) elif k == 'supports' and isinstance(v, list): + # supports are already in the desired format + # the following 2 lines are bad as supports should be passed in a consistent manner + # but it is unavoidable at the moment because of so much other code linked to this :-( + if v and isinstance(v[0], dict): + continue obj[k] = [{'txid': txid, 'nout': nout, 'amount': dewies_to_lbc(amount)} for (txid, nout, amount) in v] elif isinstance(v, (list, dict)):