From fc24d9b5aacf086b2d5592f58ffae982c7d2a5b6 Mon Sep 17 00:00:00 2001 From: Job Evers-Meltzer Date: Wed, 30 Nov 2016 23:28:25 -0600 Subject: [PATCH] pull out metadata storage from the wallet --- lbrynet/core/Wallet.py | 204 +++++++++++++++++++++---------- lbrynet/lbrynet_daemon/Daemon.py | 6 +- 2 files changed, 145 insertions(+), 65 deletions(-) diff --git a/lbrynet/core/Wallet.py b/lbrynet/core/Wallet.py index 6af123df8..76c567ba0 100644 --- a/lbrynet/core/Wallet.py +++ b/lbrynet/core/Wallet.py @@ -18,12 +18,14 @@ from lbryum.lbrycrd import COIN, RECOMMENDED_CLAIMTRIE_HASH_CONFIRMS import lbryum.wallet from lbryum.commands import known_commands, Commands +from lbrynet.core.sqlite_helpers import rerun_if_locked from lbrynet.interfaces import IRequestCreator, IQueryHandlerFactory, IQueryHandler, IWallet from lbrynet.core.client.ClientRequest import ClientRequest from lbrynet.core.Error import UnknownNameError, InvalidStreamInfoError, RequestCanceledError from lbrynet.db_migrator.migrate1to2 import UNSET_NOUT from lbrynet.metadata.Metadata import Metadata + log = logging.getLogger(__name__) alert = logging.getLogger("lbryalert." + __name__) @@ -57,6 +59,123 @@ class ClaimOutpoint(dict): return not self.__eq__(compare) +class MetaDataStorage(object): + def load(self): + return defer.succeed(True) + + def clean_bad_records(self): + return defer.succeed(True) + + def save_name_metadata(self, name, claim_outpoint, sd_hash): + return defer.succeed(True) + + def get_claim_metadata_for_sd_hash(self, sd_hash): + return defer.succeed(True) + + def update_claimid(self, claim_id, name, claim_outpoint): + return defer.succeed(True) + + def get_claimid_for_tx(self, name, claim_outpoint): + return defer.succeed(True) + + +class InMemoryStorage(MetaDataStorage): + def __init__(self): + self.metadata = {} + self.claimids = {} + MetaDataStorage.__init__(self) + + def save_name_metadata(self, name, claim_outpoint, sd_hash): + self.metadata[sd_hash] = (name, claim_outpoint) + return defer.succeed(True) + + def get_claim_metadata_for_sd_hash(self, sd_hash): + try: + name, claim_outpoint = self.metadata[sd_hash] + return defer.succeed((name, claim_outpoint['txid'], claim_outpoint['nout'])) + except KeyError: + return defer.succeed(None) + + def update_claimid(self, claim_id, name, claim_outpoint): + self.claimids[(name, claim_outpoint['txid'], claim_outpoint['nout'])] = claim_id + return defer.succeed(True) + + def get_claimid_for_tx(self, name, claim_outpoint): + try: + return defer.succeed( + self.claimids[(name, claim_outpoint['txid'], claim_outpoint['nout'])]) + except KeyError: + return defer.succeed(None) + + +class SqliteStorage(MetaDataStorage): + def __init__(self, db_dir): + self.db_dir = db_dir + self.db = None + MetaDataStorage.__init__(self) + + def load(self): + self.db = adbapi.ConnectionPool('sqlite3', os.path.join(self.db_dir, "blockchainname.db"), + check_same_thread=False) + + def create_tables(transaction): + transaction.execute("create table if not exists name_metadata (" + + " name text, " + + " txid text, " + + " n integer, " + + " sd_hash text)") + transaction.execute("create table if not exists claim_ids (" + + " claimId text, " + + " name text, " + + " txid text, " + + " n integer)") + return self.db.runInteraction(create_tables) + + def clean_bad_records(self): + d = self.db.runQuery("delete from name_metadata where length(txid) > 64 or txid is null") + return d + + def save_name_metadata(self, name, claim_outpoint, sd_hash): + d = self.db.runQuery( + "delete from name_metadata where name=? and txid=? and n=? and sd_hash=?", + (name, claim_outpoint['txid'], claim_outpoint['nout'], sd_hash)) + d.addCallback( + lambda _: self.db.runQuery( + "delete from name_metadata where name=? and txid=? and n=? and sd_hash=?", + (name, claim_outpoint['txid'], UNSET_NOUT, sd_hash))) + d.addCallback( + lambda _: self.db.runQuery( + "insert into name_metadata values (?, ?, ?, ?)", + (name, claim_outpoint['txid'], claim_outpoint['nout'], sd_hash))) + return d + + @rerun_if_locked + def get_claim_metadata_for_sd_hash(self, sd_hash): + d = self.db.runQuery("select name, txid, n from name_metadata where sd_hash=?", (sd_hash,)) + d.addCallback(lambda r: r[0] if r else None) + return d + + def update_claimid(self, claim_id, name, claim_outpoint): + d = self.db.runQuery( + "delete from claim_ids where claimId=? and name=? and txid=? and n=?", + (claim_id, name, claim_outpoint['txid'], claim_outpoint['nout'])) + d.addCallback( + lambda _: self.db.runQuery( + "delete from claim_ids where claimId=? and name=? and txid=? and n=?", + (claim_id, name, claim_outpoint['txid'], UNSET_NOUT))) + d.addCallback( + lambda r: self.db.runQuery( + "insert into claim_ids values (?, ?, ?, ?)", + (claim_id, name, claim_outpoint['txid'], claim_outpoint['nout']))) + d.addCallback(lambda _: claim_id) + return d + + def get_claimid_for_tx(self, name, claim_outpoint): + d = self.db.runQuery( + "select claimId from claim_ids where name=? and txid=? and n=?", + (name, claim_outpoint['txid'], claim_outpoint['nout'])) + d.addCallback(lambda r: r[0][0] if r else None) + return d class Wallet(object): @@ -67,9 +186,10 @@ class Wallet(object): _FIRST_RUN_YES = 1 _FIRST_RUN_NO = 2 - def __init__(self, db_dir): - self.db_dir = db_dir - self.db = None + def __init__(self, storage): + if not isinstance(storage, MetaDataStorage): + raise ValueError('storage must be an instance of MetaDataStorage') + self._storage = storage self.next_manage_call = None self.wallet_balance = Decimal(0.0) self.total_reserved_points = Decimal(0.0) @@ -97,12 +217,27 @@ class Wallet(object): self.manage() return True - d = self._open_db() + d = self._storage.load() d.addCallback(lambda _: self._clean_bad_records()) d.addCallback(lambda _: self._start()) d.addCallback(lambda _: start_manage()) return d + def _clean_bad_records(self): + self._storage.clean_bad_records() + + def _save_name_metadata(self, name, claim_outpoint, sd_hash): + return self._storage.save_name_metadata(name, claim_outpoint, sd_hash) + + def _get_claim_metadata_for_sd_hash(self, sd_hash): + return self._storage.get_claim_metadata_for_sd_hash(sd_hash) + + def _update_claimid(self, claim_id, name, claim_outpoint): + return self._storage.update_claimid(claim_id, name, claim_outpoint) + + def _get_claimid_for_tx(self, name, claim_outpoint): + return self._storage.get_claimid_for_tx(name, claim_outpoint) + @staticmethod def log_stop_error(err): log.error("An error occurred stopping the wallet: %s", err.getTraceback()) @@ -645,62 +780,6 @@ class Wallet(object): dl.addCallback(handle_checks) return dl - def _open_db(self): - self.db = adbapi.ConnectionPool('sqlite3', os.path.join(self.db_dir, "blockchainname.db"), - check_same_thread=False) - - def create_tables(transaction): - transaction.execute("create table if not exists name_metadata (" + - " name text, " + - " txid text, " + - " n integer, " + - " sd_hash text)") - transaction.execute("create table if not exists claim_ids (" + - " claimId text, " + - " name text, " + - " txid text, " + - " n integer)") - - return self.db.runInteraction(create_tables) - - def _clean_bad_records(self): - d = self.db.runQuery("delete from name_metadata where length(txid) > 64 or txid is null") - return d - - - def _save_name_metadata(self, name, claim_outpoint, sd_hash): - d = self.db.runQuery("delete from name_metadata where name=? and txid=? and n=? and sd_hash=?", - (name, claim_outpoint['txid'], claim_outpoint['nout'], sd_hash)) - d.addCallback( - lambda _: self.db.runQuery("delete from name_metadata where name=? and txid=? and n=? and sd_hash=?", - (name, claim_outpoint['txid'], UNSET_NOUT, sd_hash))) - - d.addCallback(lambda _: self.db.runQuery("insert into name_metadata values (?, ?, ?, ?)", - (name, claim_outpoint['txid'], claim_outpoint['nout'], sd_hash))) - return d - - def _get_claim_metadata_for_sd_hash(self, sd_hash): - d = self.db.runQuery("select name, txid, n from name_metadata where sd_hash=?", (sd_hash,)) - d.addCallback(lambda r: r[0] if r else None) - return d - - def _update_claimid(self, claim_id, name, claim_outpoint): - d = self.db.runQuery("delete from claim_ids where claimId=? and name=? and txid=? and n=?", - (claim_id, name, claim_outpoint['txid'], claim_outpoint['nout'])) - d.addCallback( - lambda _: self.db.runQuery("delete from claim_ids where claimId=? and name=? and txid=? and n=?", - (claim_id, name, claim_outpoint['txid'], UNSET_NOUT))) - - d.addCallback(lambda r: self.db.runQuery("insert into claim_ids values (?, ?, ?, ?)", - (claim_id, name, claim_outpoint['txid'], claim_outpoint['nout']))) - d.addCallback(lambda _: claim_id) - return d - - def _get_claimid_for_tx(self, name, claim_outpoint): - d = self.db.runQuery("select claimId from claim_ids where name=? and txid=? and n=?", (name, claim_outpoint['txid'], claim_outpoint['nout'])) - d.addCallback(lambda r: r[0][0] if r else None) - return d - ######### Must be overridden ######### def get_balance(self): @@ -765,9 +844,8 @@ class Wallet(object): class LBRYumWallet(Wallet): - - def __init__(self, db_dir, config=None): - Wallet.__init__(self, db_dir) + def __init__(self, storage, config=None): + Wallet.__init__(self, storage) self._config = config self.network = None self.wallet = None diff --git a/lbrynet/lbrynet_daemon/Daemon.py b/lbrynet/lbrynet_daemon/Daemon.py index 232d2f7b0..7bc77770b 100644 --- a/lbrynet/lbrynet_daemon/Daemon.py +++ b/lbrynet/lbrynet_daemon/Daemon.py @@ -43,7 +43,7 @@ from lbrynet.core import system_info from lbrynet.core.StreamDescriptor import StreamDescriptorIdentifier, download_sd_blob from lbrynet.core.StreamDescriptor import BlobStreamDescriptorReader from lbrynet.core.Session import Session -from lbrynet.core.Wallet import LBRYumWallet +from lbrynet.core.Wallet import LBRYumWallet, SqliteStorage from lbrynet.core.looping_call_manager import LoopingCallManager from lbrynet.core.server.BlobRequestHandler import BlobRequestHandlerFactory from lbrynet.core.server.ServerProtocol import ServerProtocolFactory @@ -727,7 +727,9 @@ class Daemon(AuthJSONRPCServer): config = {'auto_connect': True} if conf.settings.lbryum_wallet_dir: config['lbryum_path'] = conf.settings.lbryum_wallet_dir - return defer.succeed(LBRYumWallet(self.db_dir, config)) + storage = SqliteStorage(self.db_dir) + wallet = LBRYumWallet(storage, config) + return defer.succeed(wallet) elif self.wallet_type == PTC_WALLET: log.info("Using PTC wallet") from lbrynet.core.PTCWallet import PTCWallet