lbcwallet/wallet/notifications.go
Josh Rickmar 497ffc11f0 Modernize the RPC server.
This is a rather monolithic commit that moves the old RPC server to
its own package (rpc/legacyrpc), introduces a new RPC server using
gRPC (rpc/rpcserver), and provides the ability to defer wallet loading
until request at a later time by an RPC (--noinitialload).

The legacy RPC server remains the default for now while the new gRPC
server is not enabled by default.  Enabling the new server requires
setting a listen address (--experimenalrpclisten).  This experimental
flag is used to effectively feature gate the server until it is ready
to use as a default.  Both RPC servers can be run at the same time,
but require binding to different listen addresses.

In theory, with the legacy RPC server now living in its own package it
should become much easier to unit test the handlers.  This will be
useful for any future changes to the package, as compatibility with
Core's wallet is still desired.

Type safety has also been improved in the legacy RPC server.  Multiple
handler types are now used for methods that do and do not require the
RPC client as a dependency.  This can statically help prevent nil
pointer dereferences, and was very useful for catching bugs during
refactoring.

To synchronize the wallet loading process between the main package
(the default) and through the gRPC WalletLoader service (with the
--noinitialload option), as well as increasing the loose coupling of
packages, a new wallet.Loader type has been added.  All creating and
loading of existing wallets is done through a single Loader instance,
and callbacks can be attached to the instance to run after the wallet
has been opened.  This is how the legacy RPC server is associated with
a loaded wallet, even after the wallet is loaded by a gRPC method in a
completely unrelated package.

Documentation for the new RPC server has been added to the
rpc/documentation directory.  The documentation includes a
specification for the new RPC API, addresses how to make changes to
the server implementation, and provides short example clients in
several different languages.

Some of the new RPC methods are not implementated exactly as described
by the specification.  These are considered bugs with the
implementation, not the spec.  Known bugs are commented as such.
2016-01-29 11:18:26 -05:00

652 lines
20 KiB
Go

// Copyright (c) 2015 The btcsuite developers
//
// Permission to use, copy, modify, and distribute this software for any
// purpose with or without fee is hereby granted, provided that the above
// copyright notice and this permission notice appear in all copies.
//
// THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
// WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
// MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
// ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
// WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
// ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
// OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
package wallet
import (
"bytes"
"sync"
"github.com/btcsuite/btcd/txscript"
"github.com/btcsuite/btcd/wire"
"github.com/btcsuite/btcutil"
"github.com/btcsuite/btcwallet/waddrmgr"
"github.com/btcsuite/btcwallet/wtxmgr"
)
// TODO: It would be good to send errors during notification creation to the rpc
// server instead of just logging them here so the client is aware that wallet
// isn't working correctly and notifications are missing.
// TODO: Anything dealing with accounts here is expensive because the database
// is not organized correctly for true account support, but do the slow thing
// instead of the easy thing since the db can be fixed later, and we want the
// api correct now.
// NotificationServer is a server that interested clients may hook into to
// receive notifications of changes in a wallet. A client is created for each
// registered notification. Clients are guaranteed to receive messages in the
// order wallet created them, but there is no guaranteed synchronization between
// different clients.
type NotificationServer struct {
transactions []chan *TransactionNotifications
currentTxNtfn *TransactionNotifications // coalesce this since wallet does not add mined txs together
spentness map[uint32][]chan *SpentnessNotifications
accountClients []chan *AccountNotification
mu sync.Mutex // Only protects registered client channels
wallet *Wallet // smells like hacks
}
func newNotificationServer(wallet *Wallet) *NotificationServer {
return &NotificationServer{
spentness: make(map[uint32][]chan *SpentnessNotifications),
wallet: wallet,
}
}
func lookupInputAccount(w *Wallet, details *wtxmgr.TxDetails, deb wtxmgr.DebitRecord) uint32 {
// TODO: Debits should record which account(s?) they
// debit from so this doesn't need to be looked up.
prevOP := &details.MsgTx.TxIn[deb.Index].PreviousOutPoint
prev, err := w.TxStore.TxDetails(&prevOP.Hash)
if err != nil {
log.Errorf("Cannot query previous transaction details for %v: %v", prevOP.Hash, err)
return 0
}
if prev == nil {
log.Errorf("Missing previous transaction %v", prevOP.Hash)
return 0
}
prevOut := prev.MsgTx.TxOut[prevOP.Index]
_, addrs, _, err := txscript.ExtractPkScriptAddrs(prevOut.PkScript, w.chainParams)
var inputAcct uint32
if err == nil && len(addrs) > 0 {
inputAcct, err = w.Manager.AddrAccount(addrs[0])
}
if err != nil {
log.Errorf("Cannot fetch account for previous output %v: %v", prevOP, err)
inputAcct = 0
}
return inputAcct
}
func lookupOutputChain(w *Wallet, details *wtxmgr.TxDetails, cred wtxmgr.CreditRecord) (account uint32, internal bool) {
output := details.MsgTx.TxOut[cred.Index]
_, addrs, _, err := txscript.ExtractPkScriptAddrs(output.PkScript, w.chainParams)
var ma waddrmgr.ManagedAddress
if err == nil && len(addrs) > 0 {
ma, err = w.Manager.Address(addrs[0])
}
if err != nil {
log.Errorf("Cannot fetch account for wallet output: %v", err)
} else {
account = ma.Account()
internal = ma.Internal()
}
return
}
func makeTxSummary(w *Wallet, details *wtxmgr.TxDetails) TransactionSummary {
serializedTx := details.SerializedTx
if serializedTx == nil {
var buf bytes.Buffer
err := details.MsgTx.Serialize(&buf)
if err != nil {
log.Errorf("Transaction serialization: %v", err)
}
serializedTx = buf.Bytes()
}
var fee btcutil.Amount
if len(details.Debits) == len(details.MsgTx.TxIn) {
for _, deb := range details.Debits {
fee += deb.Amount
}
for _, txOut := range details.MsgTx.TxOut {
fee -= btcutil.Amount(txOut.Value)
}
}
var inputs []TransactionSummaryInput
if len(details.Debits) != 0 {
inputs = make([]TransactionSummaryInput, len(details.Debits))
for i, d := range details.Debits {
inputs[i] = TransactionSummaryInput{
Index: d.Index,
PreviousAccount: lookupInputAccount(w, details, d),
PreviousAmount: d.Amount,
}
}
}
outputs := make([]TransactionSummaryOutput, 0, len(details.MsgTx.TxOut))
var credIndex int
for i, txOut := range details.MsgTx.TxOut {
mine := len(details.Credits) > credIndex && details.Credits[credIndex].Index == uint32(i)
if !mine && len(details.Debits) == 0 {
continue
}
output := TransactionSummaryOutput{
Index: uint32(i),
Amount: btcutil.Amount(txOut.Value),
Mine: mine,
}
if mine {
acct, internal := lookupOutputChain(w, details, details.Credits[credIndex])
output.Account = acct
output.Internal = internal
credIndex++
} else {
_, addresses, _, err := txscript.ExtractPkScriptAddrs(txOut.PkScript, w.chainParams)
if err == nil {
output.Addresses = addresses
}
}
outputs = append(outputs, output)
}
return TransactionSummary{
Hash: &details.Hash,
Transaction: serializedTx,
MyInputs: inputs,
MyOutputs: outputs,
Fee: fee,
Timestamp: details.Received.Unix(),
}
}
func totalBalances(w *Wallet, m map[uint32]btcutil.Amount) error {
unspent, err := w.TxStore.UnspentOutputs()
if err != nil {
return err
}
for i := range unspent {
output := &unspent[i]
var outputAcct uint32
_, addrs, _, err := txscript.ExtractPkScriptAddrs(
output.PkScript, w.chainParams)
if err == nil && len(addrs) > 0 {
outputAcct, err = w.Manager.AddrAccount(addrs[0])
}
if err == nil {
_, ok := m[outputAcct]
if ok {
m[outputAcct] += output.Amount
}
}
}
return nil
}
func flattenBalanceMap(m map[uint32]btcutil.Amount) []AccountBalance {
s := make([]AccountBalance, 0, len(m))
for k, v := range m {
s = append(s, AccountBalance{Account: k, TotalBalance: v})
}
return s
}
func relevantAccounts(w *Wallet, m map[uint32]btcutil.Amount, txs []TransactionSummary) {
for _, tx := range txs {
for _, d := range tx.MyInputs {
m[d.PreviousAccount] = 0
}
for _, c := range tx.MyOutputs {
m[c.Account] = 0
}
}
}
func (s *NotificationServer) notifyUnminedTransaction(details *wtxmgr.TxDetails) {
// Sanity check: should not be currently coalescing a notification for
// mined transactions at the same time that an unmined tx is notified.
if s.currentTxNtfn != nil {
log.Errorf("Notifying unmined tx notification while creating notification for blocks")
}
defer s.mu.Unlock()
s.mu.Lock()
clients := s.transactions
if len(clients) == 0 {
return
}
unminedTxs := []TransactionSummary{makeTxSummary(s.wallet, details)}
unminedHashes, err := s.wallet.TxStore.UnminedTxHashes()
if err != nil {
log.Errorf("Cannot fetch unmined transaction hashes: %v", err)
return
}
bals := make(map[uint32]btcutil.Amount)
relevantAccounts(s.wallet, bals, unminedTxs)
err = totalBalances(s.wallet, bals)
if err != nil {
log.Errorf("Cannot determine balances for relevant accounts: %v", err)
return
}
n := &TransactionNotifications{
UnminedTransactions: unminedTxs,
UnminedTransactionHashes: unminedHashes,
NewBalances: flattenBalanceMap(bals),
}
for _, c := range clients {
c <- n
}
}
func (s *NotificationServer) notifyDetachedBlock(hash *wire.ShaHash) {
if s.currentTxNtfn == nil {
s.currentTxNtfn = &TransactionNotifications{}
}
s.currentTxNtfn.DetachedBlocks = append(s.currentTxNtfn.DetachedBlocks, hash)
}
func (s *NotificationServer) notifyMinedTransaction(details *wtxmgr.TxDetails, block *wtxmgr.BlockMeta) {
if s.currentTxNtfn == nil {
s.currentTxNtfn = &TransactionNotifications{}
}
n := len(s.currentTxNtfn.AttachedBlocks)
if n == 0 || *s.currentTxNtfn.AttachedBlocks[n-1].Hash != block.Hash {
s.currentTxNtfn.AttachedBlocks = append(s.currentTxNtfn.AttachedBlocks, Block{
Hash: &block.Hash,
Height: block.Height,
Timestamp: block.Time.Unix(),
})
n++
}
txs := s.currentTxNtfn.AttachedBlocks[n-1].Transactions
s.currentTxNtfn.AttachedBlocks[n-1].Transactions = append(txs, makeTxSummary(s.wallet, details))
}
func (s *NotificationServer) notifyAttachedBlock(block *wtxmgr.BlockMeta) {
if s.currentTxNtfn == nil {
s.currentTxNtfn = &TransactionNotifications{}
}
// Add block details if it wasn't already included for previously
// notified mined transactions.
n := len(s.currentTxNtfn.AttachedBlocks)
if n == 0 || *s.currentTxNtfn.AttachedBlocks[n-1].Hash != block.Hash {
s.currentTxNtfn.AttachedBlocks = append(s.currentTxNtfn.AttachedBlocks, Block{
Hash: &block.Hash,
Height: block.Height,
Timestamp: block.Time.Unix(),
})
}
// For now (until notification coalescing isn't necessary) just use
// chain length to determine if this is the new best block.
if s.wallet.ChainSynced() {
if len(s.currentTxNtfn.DetachedBlocks) >= len(s.currentTxNtfn.AttachedBlocks) {
return
}
}
defer s.mu.Unlock()
s.mu.Lock()
clients := s.transactions
if len(clients) == 0 {
s.currentTxNtfn = nil
return
}
// The UnminedTransactions field is intentionally not set. Since the
// hashes of all detached blocks are reported, and all transactions
// moved from a mined block back to unconfirmed are either in the
// UnminedTransactionHashes slice or don't exist due to conflicting with
// a mined transaction in the new best chain, there is no possiblity of
// a new, previously unseen transaction appearing in unconfirmed.
unminedHashes, err := s.wallet.TxStore.UnminedTxHashes()
if err != nil {
log.Errorf("Cannot fetch unmined transaction hashes: %v", err)
return
}
s.currentTxNtfn.UnminedTransactionHashes = unminedHashes
bals := make(map[uint32]btcutil.Amount)
for _, b := range s.currentTxNtfn.AttachedBlocks {
relevantAccounts(s.wallet, bals, b.Transactions)
}
err = totalBalances(s.wallet, bals)
if err != nil {
log.Errorf("Cannot determine balances for relevant accounts: %v", err)
return
}
s.currentTxNtfn.NewBalances = flattenBalanceMap(bals)
for _, c := range clients {
c <- s.currentTxNtfn
}
s.currentTxNtfn = nil
}
// TransactionNotifications is a notification of changes to the wallet's
// transaction set and the current chain tip that wallet is considered to be
// synced with. All transactions added to the blockchain are organized by the
// block they were mined in.
//
// During a chain switch, all removed block hashes are included. Detached
// blocks are sorted in the reverse order they were mined. Attached blocks are
// sorted in the order mined.
//
// All newly added unmined transactions are included. Removed unmined
// transactions are not explicitly included. Instead, the hashes of all
// transactions still unmined are included.
//
// If any transactions were involved, each affected account's new total balance
// is included.
//
// TODO: Because this includes stuff about blocks and can be fired without any
// changes to transactions, it needs a better name.
type TransactionNotifications struct {
AttachedBlocks []Block
DetachedBlocks []*wire.ShaHash
UnminedTransactions []TransactionSummary
UnminedTransactionHashes []*wire.ShaHash
NewBalances []AccountBalance
}
// Block contains the properties and all relevant transactions of an attached
// block.
type Block struct {
Hash *wire.ShaHash
Height int32
Timestamp int64
Transactions []TransactionSummary
}
// TransactionSummary contains a transaction relevant to the wallet and marks
// which inputs and outputs were relevant.
type TransactionSummary struct {
Hash *wire.ShaHash
Transaction []byte
MyInputs []TransactionSummaryInput
MyOutputs []TransactionSummaryOutput
Fee btcutil.Amount
Timestamp int64
}
// TransactionSummaryInput describes a transaction input that is relevant to the
// wallet. The Index field marks the transaction input index of the transaction
// (not included here). The PreviousAccount and PreviousAmount fields describe
// how much this input debits from a wallet account.
type TransactionSummaryInput struct {
Index uint32
PreviousAccount uint32
PreviousAmount btcutil.Amount
}
// TransactionSummaryOutput describes a transaction output of a relevant
// transaction. When the transaction is authored by this wallet, all
// transaction outputs are considered relevant. The Mine field describes
// whether outputs to these authored transactions pay back to the wallet
// (e.g. change) or create an uncontrolled output. For convenience, the
// addresses (if any) of an uncontrolled output are included.
type TransactionSummaryOutput struct {
Index uint32
Amount btcutil.Amount
Mine bool
// Only relevant if mine==true.
Account uint32
Internal bool
// Only relevant if mine==false.
Addresses []btcutil.Address
}
// AccountBalance associates a total (zero confirmation) balance with an
// account. Balances for other minimum confirmation counts require more
// expensive logic and it is not clear which minimums a client is interested in,
// so they are not included.
type AccountBalance struct {
Account uint32
TotalBalance btcutil.Amount
}
// TransactionNotificationsClient receives TransactionNotifications from the
// NotificationServer over the channel C.
type TransactionNotificationsClient struct {
C <-chan *TransactionNotifications
server *NotificationServer
}
// TransactionNotifications returns a client for receiving
// TransactionNotifiations notifications over a channel. The channel is
// unbuffered.
//
// When finished, the Done method should be called on the client to disassociate
// it from the server.
func (s *NotificationServer) TransactionNotifications() TransactionNotificationsClient {
c := make(chan *TransactionNotifications)
s.mu.Lock()
s.transactions = append(s.transactions, c)
s.mu.Unlock()
return TransactionNotificationsClient{
C: c,
server: s,
}
}
// Done deregisters the client from the server and drains any remaining
// messages. It must be called exactly once when the client is finished
// receiving notifications.
func (c *TransactionNotificationsClient) Done() {
go func() {
// Drain notifications until the client channel is removed from
// the server and closed.
for range c.C {
}
}()
go func() {
s := c.server
s.mu.Lock()
clients := s.transactions
for i, ch := range clients {
if c.C == ch {
clients[i] = clients[len(clients)-1]
s.transactions = clients[:len(clients)-1]
close(ch)
break
}
}
s.mu.Unlock()
}()
}
// SpentnessNotifications is a notification that is fired for transaction
// outputs controlled by some account's keys. The notification may be about a
// newly added unspent transaction output or that a previously unspent output is
// now spent. When spent, the notification includes the spending transaction's
// hash and input index.
type SpentnessNotifications struct {
hash *wire.ShaHash
spenderHash *wire.ShaHash
index uint32
spenderIndex uint32
}
// Hash returns the transaction hash of the spent output.
func (n *SpentnessNotifications) Hash() *wire.ShaHash {
return n.hash
}
// Index returns the transaction output index of the spent output.
func (n *SpentnessNotifications) Index() uint32 {
return n.index
}
// Spender returns the spending transction's hash and input index, if any. If
// the output is unspent, the final bool return is false.
func (n *SpentnessNotifications) Spender() (*wire.ShaHash, uint32, bool) {
return n.spenderHash, n.spenderIndex, n.spenderHash != nil
}
// notifyUnspentOutput notifies registered clients of a new unspent output that
// is controlled by the wallet.
func (s *NotificationServer) notifyUnspentOutput(account uint32, hash *wire.ShaHash, index uint32) {
defer s.mu.Unlock()
s.mu.Lock()
clients := s.spentness[account]
if len(clients) == 0 {
return
}
n := &SpentnessNotifications{
hash: hash,
index: index,
}
for _, c := range clients {
c <- n
}
}
// notifySpentOutput notifies registered clients that a previously-unspent
// output is now spent, and includes the spender hash and input index in the
// notification.
func (s *NotificationServer) notifySpentOutput(account uint32, op *wire.OutPoint, spenderHash *wire.ShaHash, spenderIndex uint32) {
defer s.mu.Unlock()
s.mu.Lock()
clients := s.spentness[account]
if len(clients) == 0 {
return
}
n := &SpentnessNotifications{
hash: &op.Hash,
index: op.Index,
spenderHash: spenderHash,
spenderIndex: spenderIndex,
}
for _, c := range clients {
c <- n
}
}
// SpentnessNotificationsClient receives SpentnessNotifications from the
// NotificationServer over the channel C.
type SpentnessNotificationsClient struct {
C <-chan *SpentnessNotifications
account uint32
server *NotificationServer
}
// AccountSpentnessNotifications registers a client for spentness changes of
// outputs controlled by the account.
func (s *NotificationServer) AccountSpentnessNotifications(account uint32) SpentnessNotificationsClient {
c := make(chan *SpentnessNotifications)
s.mu.Lock()
s.spentness[account] = append(s.spentness[account], c)
s.mu.Unlock()
return SpentnessNotificationsClient{
C: c,
account: account,
server: s,
}
}
// Done deregisters the client from the server and drains any remaining
// messages. It must be called exactly once when the client is finished
// receiving notifications.
func (c *SpentnessNotificationsClient) Done() {
go func() {
// Drain notifications until the client channel is removed from
// the server and closed.
for range c.C {
}
}()
go func() {
s := c.server
s.mu.Lock()
clients := s.spentness[c.account]
for i, ch := range clients {
if c.C == ch {
clients[i] = clients[len(clients)-1]
s.spentness[c.account] = clients[:len(clients)-1]
close(ch)
break
}
}
s.mu.Unlock()
}()
}
// AccountNotification contains properties regarding an account, such as its
// name and the number of derived and imported keys. When any of these
// properties change, the notification is fired.
type AccountNotification struct {
AccountNumber uint32
AccountName string
ExternalKeyCount uint32
InternalKeyCount uint32
ImportedKeyCount uint32
}
func (s *NotificationServer) notifyAccountProperties(props *waddrmgr.AccountProperties) {
defer s.mu.Unlock()
s.mu.Lock()
clients := s.accountClients
if len(clients) == 0 {
return
}
n := &AccountNotification{
AccountNumber: props.AccountNumber,
AccountName: props.AccountName,
ExternalKeyCount: props.ExternalKeyCount,
InternalKeyCount: props.InternalKeyCount,
ImportedKeyCount: props.ImportedKeyCount,
}
for _, c := range clients {
c <- n
}
}
// AccountNotificationsClient receives AccountNotifications over the channel C.
type AccountNotificationsClient struct {
C chan *AccountNotification
server *NotificationServer
}
// AccountNotifications returns a client for receiving AccountNotifications over
// a channel. The channel is unbuffered. When finished, the client's Done
// method should be called to disassociate the client from the server.
func (s *NotificationServer) AccountNotifications() AccountNotificationsClient {
c := make(chan *AccountNotification)
s.mu.Lock()
s.accountClients = append(s.accountClients, c)
s.mu.Unlock()
return AccountNotificationsClient{
C: c,
server: s,
}
}
// Done deregisters the client from the server and drains any remaining
// messages. It must be called exactly once when the client is finished
// receiving notifications.
func (c *AccountNotificationsClient) Done() {
go func() {
for range c.C {
}
}()
go func() {
s := c.server
s.mu.Lock()
clients := s.accountClients
for i, ch := range clients {
if c.C == ch {
clients[i] = clients[len(clients)-1]
s.accountClients = clients[:len(clients)-1]
close(ch)
break
}
}
s.mu.Unlock()
}()
}