Merge pull request #363 from lbryio/wallet-storage

Wallet storage
This commit is contained in:
Job Evers‐Meltzer 2016-12-30 11:41:38 -06:00 committed by GitHub
commit 4574e3317c
4 changed files with 175 additions and 102 deletions

View file

@ -18,12 +18,14 @@ from lbryum.lbrycrd import COIN, RECOMMENDED_CLAIMTRIE_HASH_CONFIRMS
import lbryum.wallet import lbryum.wallet
from lbryum.commands import known_commands, Commands from lbryum.commands import known_commands, Commands
from lbrynet.core.sqlite_helpers import rerun_if_locked
from lbrynet.interfaces import IRequestCreator, IQueryHandlerFactory, IQueryHandler, IWallet from lbrynet.interfaces import IRequestCreator, IQueryHandlerFactory, IQueryHandler, IWallet
from lbrynet.core.client.ClientRequest import ClientRequest from lbrynet.core.client.ClientRequest import ClientRequest
from lbrynet.core.Error import UnknownNameError, InvalidStreamInfoError, RequestCanceledError from lbrynet.core.Error import UnknownNameError, InvalidStreamInfoError, RequestCanceledError
from lbrynet.db_migrator.migrate1to2 import UNSET_NOUT from lbrynet.db_migrator.migrate1to2 import UNSET_NOUT
from lbrynet.metadata.Metadata import Metadata from lbrynet.metadata.Metadata import Metadata
log = logging.getLogger(__name__) log = logging.getLogger(__name__)
alert = logging.getLogger("lbryalert." + __name__) alert = logging.getLogger("lbryalert." + __name__)
@ -57,6 +59,123 @@ class ClaimOutpoint(dict):
return not self.__eq__(compare) return not self.__eq__(compare)
class MetaDataStorage(object):
def load(self):
return defer.succeed(True)
def clean_bad_records(self):
return defer.succeed(True)
def save_name_metadata(self, name, claim_outpoint, sd_hash):
return defer.succeed(True)
def get_claim_metadata_for_sd_hash(self, sd_hash):
return defer.succeed(True)
def update_claimid(self, claim_id, name, claim_outpoint):
return defer.succeed(True)
def get_claimid_for_tx(self, name, claim_outpoint):
return defer.succeed(True)
class InMemoryStorage(MetaDataStorage):
def __init__(self):
self.metadata = {}
self.claimids = {}
MetaDataStorage.__init__(self)
def save_name_metadata(self, name, claim_outpoint, sd_hash):
self.metadata[sd_hash] = (name, claim_outpoint)
return defer.succeed(True)
def get_claim_metadata_for_sd_hash(self, sd_hash):
try:
name, claim_outpoint = self.metadata[sd_hash]
return defer.succeed((name, claim_outpoint['txid'], claim_outpoint['nout']))
except KeyError:
return defer.succeed(None)
def update_claimid(self, claim_id, name, claim_outpoint):
self.claimids[(name, claim_outpoint['txid'], claim_outpoint['nout'])] = claim_id
return defer.succeed(True)
def get_claimid_for_tx(self, name, claim_outpoint):
try:
return defer.succeed(
self.claimids[(name, claim_outpoint['txid'], claim_outpoint['nout'])])
except KeyError:
return defer.succeed(None)
class SqliteStorage(MetaDataStorage):
def __init__(self, db_dir):
self.db_dir = db_dir
self.db = None
MetaDataStorage.__init__(self)
def load(self):
self.db = adbapi.ConnectionPool('sqlite3', os.path.join(self.db_dir, "blockchainname.db"),
check_same_thread=False)
def create_tables(transaction):
transaction.execute("create table if not exists name_metadata (" +
" name text, " +
" txid text, " +
" n integer, " +
" sd_hash text)")
transaction.execute("create table if not exists claim_ids (" +
" claimId text, " +
" name text, " +
" txid text, " +
" n integer)")
return self.db.runInteraction(create_tables)
def clean_bad_records(self):
d = self.db.runQuery("delete from name_metadata where length(txid) > 64 or txid is null")
return d
def save_name_metadata(self, name, claim_outpoint, sd_hash):
d = self.db.runQuery(
"delete from name_metadata where name=? and txid=? and n=? and sd_hash=?",
(name, claim_outpoint['txid'], claim_outpoint['nout'], sd_hash))
d.addCallback(
lambda _: self.db.runQuery(
"delete from name_metadata where name=? and txid=? and n=? and sd_hash=?",
(name, claim_outpoint['txid'], UNSET_NOUT, sd_hash)))
d.addCallback(
lambda _: self.db.runQuery(
"insert into name_metadata values (?, ?, ?, ?)",
(name, claim_outpoint['txid'], claim_outpoint['nout'], sd_hash)))
return d
@rerun_if_locked
def get_claim_metadata_for_sd_hash(self, sd_hash):
d = self.db.runQuery("select name, txid, n from name_metadata where sd_hash=?", (sd_hash,))
d.addCallback(lambda r: r[0] if r else None)
return d
def update_claimid(self, claim_id, name, claim_outpoint):
d = self.db.runQuery(
"delete from claim_ids where claimId=? and name=? and txid=? and n=?",
(claim_id, name, claim_outpoint['txid'], claim_outpoint['nout']))
d.addCallback(
lambda _: self.db.runQuery(
"delete from claim_ids where claimId=? and name=? and txid=? and n=?",
(claim_id, name, claim_outpoint['txid'], UNSET_NOUT)))
d.addCallback(
lambda r: self.db.runQuery(
"insert into claim_ids values (?, ?, ?, ?)",
(claim_id, name, claim_outpoint['txid'], claim_outpoint['nout'])))
d.addCallback(lambda _: claim_id)
return d
def get_claimid_for_tx(self, name, claim_outpoint):
d = self.db.runQuery(
"select claimId from claim_ids where name=? and txid=? and n=?",
(name, claim_outpoint['txid'], claim_outpoint['nout']))
d.addCallback(lambda r: r[0][0] if r else None)
return d
class Wallet(object): class Wallet(object):
@ -67,9 +186,10 @@ class Wallet(object):
_FIRST_RUN_YES = 1 _FIRST_RUN_YES = 1
_FIRST_RUN_NO = 2 _FIRST_RUN_NO = 2
def __init__(self, db_dir): def __init__(self, storage):
self.db_dir = db_dir if not isinstance(storage, MetaDataStorage):
self.db = None raise ValueError('storage must be an instance of MetaDataStorage')
self._storage = storage
self.next_manage_call = None self.next_manage_call = None
self.wallet_balance = Decimal(0.0) self.wallet_balance = Decimal(0.0)
self.total_reserved_points = Decimal(0.0) self.total_reserved_points = Decimal(0.0)
@ -97,12 +217,27 @@ class Wallet(object):
self.manage() self.manage()
return True return True
d = self._open_db() d = self._storage.load()
d.addCallback(lambda _: self._clean_bad_records()) d.addCallback(lambda _: self._clean_bad_records())
d.addCallback(lambda _: self._start()) d.addCallback(lambda _: self._start())
d.addCallback(lambda _: start_manage()) d.addCallback(lambda _: start_manage())
return d return d
def _clean_bad_records(self):
self._storage.clean_bad_records()
def _save_name_metadata(self, name, claim_outpoint, sd_hash):
return self._storage.save_name_metadata(name, claim_outpoint, sd_hash)
def _get_claim_metadata_for_sd_hash(self, sd_hash):
return self._storage.get_claim_metadata_for_sd_hash(sd_hash)
def _update_claimid(self, claim_id, name, claim_outpoint):
return self._storage.update_claimid(claim_id, name, claim_outpoint)
def _get_claimid_for_tx(self, name, claim_outpoint):
return self._storage.get_claimid_for_tx(name, claim_outpoint)
@staticmethod @staticmethod
def log_stop_error(err): def log_stop_error(err):
log.error("An error occurred stopping the wallet: %s", err.getTraceback()) log.error("An error occurred stopping the wallet: %s", err.getTraceback())
@ -645,62 +780,6 @@ class Wallet(object):
dl.addCallback(handle_checks) dl.addCallback(handle_checks)
return dl return dl
def _open_db(self):
self.db = adbapi.ConnectionPool('sqlite3', os.path.join(self.db_dir, "blockchainname.db"),
check_same_thread=False)
def create_tables(transaction):
transaction.execute("create table if not exists name_metadata (" +
" name text, " +
" txid text, " +
" n integer, " +
" sd_hash text)")
transaction.execute("create table if not exists claim_ids (" +
" claimId text, " +
" name text, " +
" txid text, " +
" n integer)")
return self.db.runInteraction(create_tables)
def _clean_bad_records(self):
d = self.db.runQuery("delete from name_metadata where length(txid) > 64 or txid is null")
return d
def _save_name_metadata(self, name, claim_outpoint, sd_hash):
d = self.db.runQuery("delete from name_metadata where name=? and txid=? and n=? and sd_hash=?",
(name, claim_outpoint['txid'], claim_outpoint['nout'], sd_hash))
d.addCallback(
lambda _: self.db.runQuery("delete from name_metadata where name=? and txid=? and n=? and sd_hash=?",
(name, claim_outpoint['txid'], UNSET_NOUT, sd_hash)))
d.addCallback(lambda _: self.db.runQuery("insert into name_metadata values (?, ?, ?, ?)",
(name, claim_outpoint['txid'], claim_outpoint['nout'], sd_hash)))
return d
def _get_claim_metadata_for_sd_hash(self, sd_hash):
d = self.db.runQuery("select name, txid, n from name_metadata where sd_hash=?", (sd_hash,))
d.addCallback(lambda r: r[0] if r else None)
return d
def _update_claimid(self, claim_id, name, claim_outpoint):
d = self.db.runQuery("delete from claim_ids where claimId=? and name=? and txid=? and n=?",
(claim_id, name, claim_outpoint['txid'], claim_outpoint['nout']))
d.addCallback(
lambda _: self.db.runQuery("delete from claim_ids where claimId=? and name=? and txid=? and n=?",
(claim_id, name, claim_outpoint['txid'], UNSET_NOUT)))
d.addCallback(lambda r: self.db.runQuery("insert into claim_ids values (?, ?, ?, ?)",
(claim_id, name, claim_outpoint['txid'], claim_outpoint['nout'])))
d.addCallback(lambda _: claim_id)
return d
def _get_claimid_for_tx(self, name, claim_outpoint):
d = self.db.runQuery("select claimId from claim_ids where name=? and txid=? and n=?", (name, claim_outpoint['txid'], claim_outpoint['nout']))
d.addCallback(lambda r: r[0][0] if r else None)
return d
######### Must be overridden ######### ######### Must be overridden #########
def get_balance(self): def get_balance(self):
@ -765,9 +844,8 @@ class Wallet(object):
class LBRYumWallet(Wallet): class LBRYumWallet(Wallet):
def __init__(self, storage, config=None):
def __init__(self, db_dir, config=None): Wallet.__init__(self, storage)
Wallet.__init__(self, db_dir)
self._config = config self._config = config
self.network = None self.network = None
self.wallet = None self.wallet = None

View file

@ -43,7 +43,7 @@ from lbrynet.core import system_info
from lbrynet.core.StreamDescriptor import StreamDescriptorIdentifier, download_sd_blob from lbrynet.core.StreamDescriptor import StreamDescriptorIdentifier, download_sd_blob
from lbrynet.core.StreamDescriptor import BlobStreamDescriptorReader from lbrynet.core.StreamDescriptor import BlobStreamDescriptorReader
from lbrynet.core.Session import Session from lbrynet.core.Session import Session
from lbrynet.core.Wallet import LBRYumWallet from lbrynet.core.Wallet import LBRYumWallet, SqliteStorage
from lbrynet.core.looping_call_manager import LoopingCallManager from lbrynet.core.looping_call_manager import LoopingCallManager
from lbrynet.core.server.BlobRequestHandler import BlobRequestHandlerFactory from lbrynet.core.server.BlobRequestHandler import BlobRequestHandlerFactory
from lbrynet.core.server.ServerProtocol import ServerProtocolFactory from lbrynet.core.server.ServerProtocol import ServerProtocolFactory
@ -290,13 +290,10 @@ class Daemon(AuthJSONRPCServer):
self.lbry_file_metadata_manager = None self.lbry_file_metadata_manager = None
self.lbry_file_manager = None self.lbry_file_manager = None
@defer.inlineCallbacks
def setup(self): def setup(self):
self._modify_loggly_formatter() self._modify_loggly_formatter()
def _log_starting_vals():
log.info("Starting balance: " + str(self.session.wallet.wallet_balance))
return defer.succeed(None)
def _announce_startup(): def _announce_startup():
def _wait_for_credits(): def _wait_for_credits():
if float(self.session.wallet.wallet_balance) == 0.0: if float(self.session.wallet.wallet_balance) == 0.0:
@ -330,26 +327,23 @@ class Daemon(AuthJSONRPCServer):
self.looping_call_manager.start(Checker.CONNECTION_PROBLEM, 1) self.looping_call_manager.start(Checker.CONNECTION_PROBLEM, 1)
self.exchange_rate_manager.start() self.exchange_rate_manager.start()
d = defer.Deferred()
if conf.settings.host_ui: if conf.settings.host_ui:
self.lbry_ui_manager.update_checker.start(1800, now=False) self.lbry_ui_manager.update_checker.start(1800, now=False)
d.addCallback(lambda _: self.lbry_ui_manager.setup()) yield self.lbry_ui_manager.setup()
d.addCallback(lambda _: self._initial_setup()) yield self._initial_setup()
d.addCallback(lambda _: threads.deferToThread(self._setup_data_directory)) yield threads.deferToThread(self._setup_data_directory)
d.addCallback(lambda _: self._check_db_migration()) yield self._check_db_migration()
d.addCallback(lambda _: self._load_caches()) yield self._load_caches()
d.addCallback(lambda _: self._set_events()) yield self._set_events()
d.addCallback(lambda _: self._get_session()) yield self._get_session()
d.addCallback(lambda _: self._get_analytics()) yield self._get_analytics()
d.addCallback(lambda _: add_lbry_file_to_sd_identifier(self.sd_identifier)) yield add_lbry_file_to_sd_identifier(self.sd_identifier)
d.addCallback(lambda _: self._setup_stream_identifier()) yield self._setup_stream_identifier()
d.addCallback(lambda _: self._setup_lbry_file_manager()) yield self._setup_lbry_file_manager()
d.addCallback(lambda _: self._setup_query_handlers()) yield self._setup_query_handlers()
d.addCallback(lambda _: self._setup_server()) yield self._setup_server()
d.addCallback(lambda _: _log_starting_vals()) log.info("Starting balance: " + str(self.session.wallet.wallet_balance))
d.addCallback(lambda _: _announce_startup()) yield _announce_startup()
d.callback(None)
return d
def _get_platform(self): def _get_platform(self):
if self.platform is None: if self.platform is None:
@ -727,7 +721,9 @@ class Daemon(AuthJSONRPCServer):
config = {'auto_connect': True} config = {'auto_connect': True}
if conf.settings.lbryum_wallet_dir: if conf.settings.lbryum_wallet_dir:
config['lbryum_path'] = conf.settings.lbryum_wallet_dir config['lbryum_path'] = conf.settings.lbryum_wallet_dir
return defer.succeed(LBRYumWallet(self.db_dir, config)) storage = SqliteStorage(self.db_dir)
wallet = LBRYumWallet(storage, config)
return defer.succeed(wallet)
elif self.wallet_type == PTC_WALLET: elif self.wallet_type == PTC_WALLET:
log.info("Using PTC wallet") log.info("Using PTC wallet")
from lbrynet.core.PTCWallet import PTCWallet from lbrynet.core.PTCWallet import PTCWallet
@ -754,7 +750,6 @@ class Daemon(AuthJSONRPCServer):
d.addCallback(create_session) d.addCallback(create_session)
d.addCallback(lambda _: self.session.setup()) d.addCallback(lambda _: self.session.setup())
return d return d
def _setup_stream_identifier(self): def _setup_stream_identifier(self):

View file

@ -117,11 +117,7 @@ def update_settings_from_args(args):
settings.update(to_pass) settings.update(to_pass)
def kill(failure, analytics_manager): @defer.inlineCallbacks
analytics_manager.send_server_startup_error(failure.getErrorMessage() + " " + str(failure))
reactor.callFromThread(reactor.stop)
def start_server_and_listen(launchui, use_auth, analytics_manager): def start_server_and_listen(launchui, use_auth, analytics_manager):
"""The primary entry point for launching the daemon. """The primary entry point for launching the daemon.
@ -130,12 +126,16 @@ def start_server_and_listen(launchui, use_auth, analytics_manager):
use_auth: set to true to enable http authentication use_auth: set to true to enable http authentication
analytics_manager: to send analytics analytics_manager: to send analytics
""" """
daemon_server = DaemonServer(analytics_manager) try:
d = daemon_server.start(use_auth) daemon_server = DaemonServer(analytics_manager)
if launchui: yield daemon_server.start(use_auth)
d.addCallback(lambda _: webbrowser.open(settings.UI_ADDRESS)) if launchui:
d.addCallback(lambda _: analytics_manager.send_server_startup_success()) yield webbrowser.open(settings.UI_ADDRESS)
d.addErrback(log.fail(kill, analytics_manager), 'Failed to startup') yield analytics_manager.send_server_startup_success()
except Exception as e:
log.exception('Failed to startup')
analytics_manager.send_server_startup_error(str(e))
reactor.callFromThread(reactor.stop)
if __name__ == "__main__": if __name__ == "__main__":

View file

@ -41,10 +41,10 @@ class DaemonServer(object):
return defer.succeed(True) return defer.succeed(True)
@defer.inlineCallbacks
def start(self, use_auth): def start(self, use_auth):
d = self._setup_server(use_auth) yield self._setup_server(use_auth)
d.addCallback(lambda _: self._api.setup()) yield self._api.setup()
return d
def get_site_base(use_auth, root): def get_site_base(use_auth, root):