pull out metadata storage from the wallet

This commit is contained in:
Job Evers-Meltzer 2016-11-30 23:28:25 -06:00
parent 552ed77882
commit fc24d9b5aa
2 changed files with 145 additions and 65 deletions

View file

@ -18,12 +18,14 @@ from lbryum.lbrycrd import COIN, RECOMMENDED_CLAIMTRIE_HASH_CONFIRMS
import lbryum.wallet
from lbryum.commands import known_commands, Commands
from lbrynet.core.sqlite_helpers import rerun_if_locked
from lbrynet.interfaces import IRequestCreator, IQueryHandlerFactory, IQueryHandler, IWallet
from lbrynet.core.client.ClientRequest import ClientRequest
from lbrynet.core.Error import UnknownNameError, InvalidStreamInfoError, RequestCanceledError
from lbrynet.db_migrator.migrate1to2 import UNSET_NOUT
from lbrynet.metadata.Metadata import Metadata
log = logging.getLogger(__name__)
alert = logging.getLogger("lbryalert." + __name__)
@ -57,6 +59,123 @@ class ClaimOutpoint(dict):
return not self.__eq__(compare)
class MetaDataStorage(object):
def load(self):
return defer.succeed(True)
def clean_bad_records(self):
return defer.succeed(True)
def save_name_metadata(self, name, claim_outpoint, sd_hash):
return defer.succeed(True)
def get_claim_metadata_for_sd_hash(self, sd_hash):
return defer.succeed(True)
def update_claimid(self, claim_id, name, claim_outpoint):
return defer.succeed(True)
def get_claimid_for_tx(self, name, claim_outpoint):
return defer.succeed(True)
class InMemoryStorage(MetaDataStorage):
def __init__(self):
self.metadata = {}
self.claimids = {}
MetaDataStorage.__init__(self)
def save_name_metadata(self, name, claim_outpoint, sd_hash):
self.metadata[sd_hash] = (name, claim_outpoint)
return defer.succeed(True)
def get_claim_metadata_for_sd_hash(self, sd_hash):
try:
name, claim_outpoint = self.metadata[sd_hash]
return defer.succeed((name, claim_outpoint['txid'], claim_outpoint['nout']))
except KeyError:
return defer.succeed(None)
def update_claimid(self, claim_id, name, claim_outpoint):
self.claimids[(name, claim_outpoint['txid'], claim_outpoint['nout'])] = claim_id
return defer.succeed(True)
def get_claimid_for_tx(self, name, claim_outpoint):
try:
return defer.succeed(
self.claimids[(name, claim_outpoint['txid'], claim_outpoint['nout'])])
except KeyError:
return defer.succeed(None)
class SqliteStorage(MetaDataStorage):
def __init__(self, db_dir):
self.db_dir = db_dir
self.db = None
MetaDataStorage.__init__(self)
def load(self):
self.db = adbapi.ConnectionPool('sqlite3', os.path.join(self.db_dir, "blockchainname.db"),
check_same_thread=False)
def create_tables(transaction):
transaction.execute("create table if not exists name_metadata (" +
" name text, " +
" txid text, " +
" n integer, " +
" sd_hash text)")
transaction.execute("create table if not exists claim_ids (" +
" claimId text, " +
" name text, " +
" txid text, " +
" n integer)")
return self.db.runInteraction(create_tables)
def clean_bad_records(self):
d = self.db.runQuery("delete from name_metadata where length(txid) > 64 or txid is null")
return d
def save_name_metadata(self, name, claim_outpoint, sd_hash):
d = self.db.runQuery(
"delete from name_metadata where name=? and txid=? and n=? and sd_hash=?",
(name, claim_outpoint['txid'], claim_outpoint['nout'], sd_hash))
d.addCallback(
lambda _: self.db.runQuery(
"delete from name_metadata where name=? and txid=? and n=? and sd_hash=?",
(name, claim_outpoint['txid'], UNSET_NOUT, sd_hash)))
d.addCallback(
lambda _: self.db.runQuery(
"insert into name_metadata values (?, ?, ?, ?)",
(name, claim_outpoint['txid'], claim_outpoint['nout'], sd_hash)))
return d
@rerun_if_locked
def get_claim_metadata_for_sd_hash(self, sd_hash):
d = self.db.runQuery("select name, txid, n from name_metadata where sd_hash=?", (sd_hash,))
d.addCallback(lambda r: r[0] if r else None)
return d
def update_claimid(self, claim_id, name, claim_outpoint):
d = self.db.runQuery(
"delete from claim_ids where claimId=? and name=? and txid=? and n=?",
(claim_id, name, claim_outpoint['txid'], claim_outpoint['nout']))
d.addCallback(
lambda _: self.db.runQuery(
"delete from claim_ids where claimId=? and name=? and txid=? and n=?",
(claim_id, name, claim_outpoint['txid'], UNSET_NOUT)))
d.addCallback(
lambda r: self.db.runQuery(
"insert into claim_ids values (?, ?, ?, ?)",
(claim_id, name, claim_outpoint['txid'], claim_outpoint['nout'])))
d.addCallback(lambda _: claim_id)
return d
def get_claimid_for_tx(self, name, claim_outpoint):
d = self.db.runQuery(
"select claimId from claim_ids where name=? and txid=? and n=?",
(name, claim_outpoint['txid'], claim_outpoint['nout']))
d.addCallback(lambda r: r[0][0] if r else None)
return d
class Wallet(object):
@ -67,9 +186,10 @@ class Wallet(object):
_FIRST_RUN_YES = 1
_FIRST_RUN_NO = 2
def __init__(self, db_dir):
self.db_dir = db_dir
self.db = None
def __init__(self, storage):
if not isinstance(storage, MetaDataStorage):
raise ValueError('storage must be an instance of MetaDataStorage')
self._storage = storage
self.next_manage_call = None
self.wallet_balance = Decimal(0.0)
self.total_reserved_points = Decimal(0.0)
@ -97,12 +217,27 @@ class Wallet(object):
self.manage()
return True
d = self._open_db()
d = self._storage.load()
d.addCallback(lambda _: self._clean_bad_records())
d.addCallback(lambda _: self._start())
d.addCallback(lambda _: start_manage())
return d
def _clean_bad_records(self):
self._storage.clean_bad_records()
def _save_name_metadata(self, name, claim_outpoint, sd_hash):
return self._storage.save_name_metadata(name, claim_outpoint, sd_hash)
def _get_claim_metadata_for_sd_hash(self, sd_hash):
return self._storage.get_claim_metadata_for_sd_hash(sd_hash)
def _update_claimid(self, claim_id, name, claim_outpoint):
return self._storage.update_claimid(claim_id, name, claim_outpoint)
def _get_claimid_for_tx(self, name, claim_outpoint):
return self._storage.get_claimid_for_tx(name, claim_outpoint)
@staticmethod
def log_stop_error(err):
log.error("An error occurred stopping the wallet: %s", err.getTraceback())
@ -645,62 +780,6 @@ class Wallet(object):
dl.addCallback(handle_checks)
return dl
def _open_db(self):
self.db = adbapi.ConnectionPool('sqlite3', os.path.join(self.db_dir, "blockchainname.db"),
check_same_thread=False)
def create_tables(transaction):
transaction.execute("create table if not exists name_metadata (" +
" name text, " +
" txid text, " +
" n integer, " +
" sd_hash text)")
transaction.execute("create table if not exists claim_ids (" +
" claimId text, " +
" name text, " +
" txid text, " +
" n integer)")
return self.db.runInteraction(create_tables)
def _clean_bad_records(self):
d = self.db.runQuery("delete from name_metadata where length(txid) > 64 or txid is null")
return d
def _save_name_metadata(self, name, claim_outpoint, sd_hash):
d = self.db.runQuery("delete from name_metadata where name=? and txid=? and n=? and sd_hash=?",
(name, claim_outpoint['txid'], claim_outpoint['nout'], sd_hash))
d.addCallback(
lambda _: self.db.runQuery("delete from name_metadata where name=? and txid=? and n=? and sd_hash=?",
(name, claim_outpoint['txid'], UNSET_NOUT, sd_hash)))
d.addCallback(lambda _: self.db.runQuery("insert into name_metadata values (?, ?, ?, ?)",
(name, claim_outpoint['txid'], claim_outpoint['nout'], sd_hash)))
return d
def _get_claim_metadata_for_sd_hash(self, sd_hash):
d = self.db.runQuery("select name, txid, n from name_metadata where sd_hash=?", (sd_hash,))
d.addCallback(lambda r: r[0] if r else None)
return d
def _update_claimid(self, claim_id, name, claim_outpoint):
d = self.db.runQuery("delete from claim_ids where claimId=? and name=? and txid=? and n=?",
(claim_id, name, claim_outpoint['txid'], claim_outpoint['nout']))
d.addCallback(
lambda _: self.db.runQuery("delete from claim_ids where claimId=? and name=? and txid=? and n=?",
(claim_id, name, claim_outpoint['txid'], UNSET_NOUT)))
d.addCallback(lambda r: self.db.runQuery("insert into claim_ids values (?, ?, ?, ?)",
(claim_id, name, claim_outpoint['txid'], claim_outpoint['nout'])))
d.addCallback(lambda _: claim_id)
return d
def _get_claimid_for_tx(self, name, claim_outpoint):
d = self.db.runQuery("select claimId from claim_ids where name=? and txid=? and n=?", (name, claim_outpoint['txid'], claim_outpoint['nout']))
d.addCallback(lambda r: r[0][0] if r else None)
return d
######### Must be overridden #########
def get_balance(self):
@ -765,9 +844,8 @@ class Wallet(object):
class LBRYumWallet(Wallet):
def __init__(self, db_dir, config=None):
Wallet.__init__(self, db_dir)
def __init__(self, storage, config=None):
Wallet.__init__(self, storage)
self._config = config
self.network = None
self.wallet = None

View file

@ -43,7 +43,7 @@ from lbrynet.core import system_info
from lbrynet.core.StreamDescriptor import StreamDescriptorIdentifier, download_sd_blob
from lbrynet.core.StreamDescriptor import BlobStreamDescriptorReader
from lbrynet.core.Session import Session
from lbrynet.core.Wallet import LBRYumWallet
from lbrynet.core.Wallet import LBRYumWallet, SqliteStorage
from lbrynet.core.looping_call_manager import LoopingCallManager
from lbrynet.core.server.BlobRequestHandler import BlobRequestHandlerFactory
from lbrynet.core.server.ServerProtocol import ServerProtocolFactory
@ -727,7 +727,9 @@ class Daemon(AuthJSONRPCServer):
config = {'auto_connect': True}
if conf.settings.lbryum_wallet_dir:
config['lbryum_path'] = conf.settings.lbryum_wallet_dir
return defer.succeed(LBRYumWallet(self.db_dir, config))
storage = SqliteStorage(self.db_dir)
wallet = LBRYumWallet(storage, config)
return defer.succeed(wallet)
elif self.wallet_type == PTC_WALLET:
log.info("Using PTC wallet")
from lbrynet.core.PTCWallet import PTCWallet