Rework the btcd RPC connection.

This change greatly cleans up the RPC connection between btcwallet and
btcd.  Proper (JSON-RPC spec-following) notifications are now expected
rather than Responses with a non-empty IDs.

A new RPCConn interface type has also been introduced with a
BtcdRPCConn concrete type for btcd RPC connections.  Non-btcd-specific
code handles the RPCConn, while the btcd details have been abstracted
away to a handful of functions.  This will make it easier to write
tests by creating a new fake RPC connection with hardcoded expected
replies.
This commit is contained in:
Josh Rickmar 2014-01-03 13:34:37 -05:00
parent 42055d5b7c
commit 15ffc674a9
7 changed files with 1213 additions and 1400 deletions

View file

@ -1,5 +1,5 @@
/* /*
* Copyright (c) 2013 Conformal Systems LLC <info@conformal.com> * Copyright (c) 2013, 2014 Conformal Systems LLC <info@conformal.com>
* *
* Permission to use, copy, modify, and distribute this software for any * Permission to use, copy, modify, and distribute this software for any
* purpose with or without fee is hereby granted, provided that the above * purpose with or without fee is hereby granted, provided that the above
@ -20,15 +20,12 @@ import (
"bytes" "bytes"
"errors" "errors"
"fmt" "fmt"
"github.com/conformal/btcjson"
"github.com/conformal/btcutil" "github.com/conformal/btcutil"
"github.com/conformal/btcwallet/tx" "github.com/conformal/btcwallet/tx"
"github.com/conformal/btcwallet/wallet" "github.com/conformal/btcwallet/wallet"
"github.com/conformal/btcwire" "github.com/conformal/btcwire"
"github.com/conformal/btcws"
"path/filepath" "path/filepath"
"sync" "sync"
"time"
) )
// ErrNotFound describes an error where a map lookup failed due to a // ErrNotFound describes an error where a map lookup failed due to a
@ -70,13 +67,11 @@ func LookupAccountByAddress(address string) (string, error) {
// to prevent against incorrect multiple access. // to prevent against incorrect multiple access.
type Account struct { type Account struct {
*wallet.Wallet *wallet.Wallet
mtx sync.RWMutex mtx sync.RWMutex
name string name string
dirty bool dirty bool
fullRescan bool fullRescan bool
NewBlockTxJSONID uint64 UtxoStore struct {
SpentOutpointJSONID uint64
UtxoStore struct {
sync.RWMutex sync.RWMutex
dirty bool dirty bool
s tx.UtxoStore s tx.UtxoStore
@ -399,7 +394,7 @@ func (a *Account) ImportPrivKey(wif string, rescan bool) error {
addr: struct{}{}, addr: struct{}{},
} }
a.RescanAddresses(bs.Height, addrs) Rescan(CurrentRPCConn(), bs.Height, addrs)
} }
return nil return nil
} }
@ -447,32 +442,26 @@ func (a *Account) ImportWIFPrivateKey(wif string, bs *wallet.BlockStamp) (string
} }
// Track requests btcd to send notifications of new transactions for // Track requests btcd to send notifications of new transactions for
// each address stored in a wallet and sets up a new reply handler for // each address stored in a wallet.
// these notifications.
func (a *Account) Track() { func (a *Account) Track() {
n := <-NewJSONID // Request notifications for transactions sending to all wallet
a.mtx.Lock() // addresses.
a.NewBlockTxJSONID = n addrs := a.ActiveAddresses()
a.mtx.Unlock() addrstrs := make([]string, len(addrs))
i := 0
replyHandlers.Lock() for addr := range addrs {
replyHandlers.m[n] = a.newBlockTxOutHandler addrstrs[i] = addr.EncodeAddress()
replyHandlers.Unlock() i++
for _, addr := range a.ActiveAddresses() {
a.ReqNewTxsForAddress(addr.Address)
} }
n = <-NewJSONID err := NotifyNewTXs(CurrentRPCConn(), addrstrs)
a.mtx.Lock() if err != nil {
a.SpentOutpointJSONID = n log.Error("Unable to request transaction updates for address.")
a.mtx.Unlock() }
replyHandlers.Lock()
replyHandlers.m[n] = a.spentUtxoHandler
replyHandlers.Unlock()
a.UtxoStore.RLock() a.UtxoStore.RLock()
for _, utxo := range a.UtxoStore.s { for _, utxo := range a.UtxoStore.s {
a.ReqSpentUtxoNtfn(utxo) ReqSpentUtxoNtfn(utxo)
} }
a.UtxoStore.RUnlock() a.UtxoStore.RUnlock()
} }
@ -507,58 +496,7 @@ func (a *Account) RescanActiveAddresses() {
} }
// Rescan active addresses starting at the determined block height. // Rescan active addresses starting at the determined block height.
a.RescanAddresses(beginBlock, a.ActivePaymentAddresses()) Rescan(CurrentRPCConn(), beginBlock, a.ActivePaymentAddresses())
}
// RescanAddresses requests btcd to rescan a set of addresses. This
// is needed when, for example, importing private key(s), where btcwallet
// is synced with btcd for all but several address.
func (a *Account) RescanAddresses(beginBlock int32, addrs map[string]struct{}) {
n := <-NewJSONID
cmd, err := btcws.NewRescanCmd(fmt.Sprintf("btcwallet(%v)", n),
beginBlock, addrs)
if err != nil {
log.Errorf("cannot create rescan request: %v", err)
return
}
mcmd, err := cmd.MarshalJSON()
if err != nil {
log.Errorf("cannot create rescan request: %v", err)
return
}
replyHandlers.Lock()
replyHandlers.m[n] = func(result interface{}, e *btcjson.Error) bool {
// Rescan is compatible with new txs from connected block
// notifications, so use that handler.
_ = a.newBlockTxOutHandler(result, e)
if result != nil {
// Notify frontends of new account balance.
confirmed := a.CalculateBalance(1)
unconfirmed := a.CalculateBalance(0) - confirmed
NotifyWalletBalance(frontendNotificationMaster, a.name, confirmed)
NotifyWalletBalanceUnconfirmed(frontendNotificationMaster, a.name, unconfirmed)
return false
}
if bs, err := GetCurBlock(); err == nil {
a.mtx.Lock()
a.Wallet.SetSyncedWith(&bs)
a.dirty = true
a.mtx.Unlock()
if err = a.writeDirtyToDisk(); err != nil {
log.Errorf("cannot sync dirty wallet: %v",
err)
}
}
// If result is nil, the rescan has completed. Returning
// true removes this handler.
return true
}
replyHandlers.Unlock()
btcdMsgs <- mcmd
} }
// SortedActivePaymentAddresses returns a slice of all active payment // SortedActivePaymentAddresses returns a slice of all active payment
@ -638,255 +576,19 @@ func (a *Account) ReqNewTxsForAddress(addr btcutil.Address) {
log.Debugf("Requesting notifications of TXs sending to address %v", apkh) log.Debugf("Requesting notifications of TXs sending to address %v", apkh)
a.mtx.RLock() err := NotifyNewTXs(CurrentRPCConn(), []string{apkh.EncodeAddress()})
n := a.NewBlockTxJSONID
a.mtx.RUnlock()
cmd := btcws.NewNotifyNewTXsCmd(fmt.Sprintf("btcwallet(%d)", n),
[]string{apkh.EncodeAddress()})
mcmd, err := cmd.MarshalJSON()
if err != nil { if err != nil {
log.Errorf("cannot request transaction notifications: %v", err) log.Error("Unable to request transaction updates for address.")
} }
btcdMsgs <- mcmd
} }
// ReqSpentUtxoNtfn sends a message to btcd to request updates for when // ReqSpentUtxoNtfn sends a message to btcd to request updates for when
// a stored UTXO has been spent. // a stored UTXO has been spent.
func (a *Account) ReqSpentUtxoNtfn(u *tx.Utxo) { func ReqSpentUtxoNtfn(u *tx.Utxo) {
log.Debugf("Requesting spent UTXO notifications for Outpoint hash %s index %d", log.Debugf("Requesting spent UTXO notifications for Outpoint hash %s index %d",
u.Out.Hash, u.Out.Index) u.Out.Hash, u.Out.Index)
a.mtx.RLock() NotifySpent(CurrentRPCConn(), (*btcwire.OutPoint)(&u.Out))
n := a.SpentOutpointJSONID
a.mtx.RUnlock()
cmd := btcws.NewNotifySpentCmd(fmt.Sprintf("btcwallet(%d)", n),
(*btcwire.OutPoint)(&u.Out))
mcmd, err := cmd.MarshalJSON()
if err != nil {
log.Errorf("cannot create spent request: %v", err)
return
}
btcdMsgs <- mcmd
}
// spentUtxoHandler is the handler function for btcd spent UTXO notifications
// resulting from transactions in newly-attached blocks.
func (a *Account) spentUtxoHandler(result interface{}, e *btcjson.Error) bool {
if e != nil {
log.Errorf("Spent UTXO Handler: Error %d received from btcd: %s",
e.Code, e.Message)
return false
}
v, ok := result.(map[string]interface{})
if !ok {
return false
}
txHashBE, ok := v["txhash"].(string)
if !ok {
log.Error("Spent UTXO Handler: Unspecified transaction hash.")
return false
}
txHash, err := btcwire.NewShaHashFromStr(txHashBE)
if err != nil {
log.Errorf("Spent UTXO Handler: Bad transaction hash: %s", err)
return false
}
index, ok := v["index"].(float64)
if !ok {
log.Error("Spent UTXO Handler: Unspecified index.")
}
_, _ = txHash, index
// Never remove this handler.
return false
}
// newBlockTxOutHandler is the handler function for btcd transaction
// notifications resulting from newly-attached blocks.
func (a *Account) newBlockTxOutHandler(result interface{}, e *btcjson.Error) bool {
if e != nil {
log.Errorf("Tx Handler: Error %d received from btcd: %s",
e.Code, e.Message)
return false
}
v, ok := result.(map[string]interface{})
if !ok {
// The first result sent from btcd is nil. This could be used to
// indicate that the request for notifications succeeded.
if result != nil {
log.Errorf("Tx Handler: Unexpected result type %T.", result)
}
return false
}
receiverStr, ok := v["receiver"].(string)
if !ok {
log.Error("Tx Handler: Unspecified receiver.")
return false
}
receiver, err := btcutil.DecodeAddr(receiverStr)
if err != nil {
log.Errorf("Tx Handler: receiver address can not be decoded: %v", err)
return false
}
height, ok := v["height"].(float64)
if !ok {
log.Error("Tx Handler: Unspecified height.")
return false
}
blockHashBE, ok := v["blockhash"].(string)
if !ok {
log.Error("Tx Handler: Unspecified block hash.")
return false
}
blockHash, err := btcwire.NewShaHashFromStr(blockHashBE)
if err != nil {
log.Errorf("Tx Handler: Block hash string cannot be parsed: %v", err)
return false
}
fblockIndex, ok := v["blockindex"].(float64)
if !ok {
log.Error("Tx Handler: Unspecified block index.")
return false
}
blockIndex := int32(fblockIndex)
fblockTime, ok := v["blocktime"].(float64)
if !ok {
log.Error("Tx Handler: Unspecified block time.")
return false
}
blockTime := int64(fblockTime)
txhashBE, ok := v["txid"].(string)
if !ok {
log.Error("Tx Handler: Unspecified transaction hash.")
return false
}
txID, err := btcwire.NewShaHashFromStr(txhashBE)
if err != nil {
log.Errorf("Tx Handler: Tx hash string cannot be parsed: %v", err)
return false
}
ftxOutIndex, ok := v["txoutindex"].(float64)
if !ok {
log.Error("Tx Handler: Unspecified transaction output index.")
return false
}
txOutIndex := uint32(ftxOutIndex)
amt, ok := v["amount"].(float64)
if !ok {
log.Error("Tx Handler: Unspecified amount.")
return false
}
pkscript58, ok := v["pkscript"].(string)
if !ok {
log.Error("Tx Handler: Unspecified pubkey script.")
return false
}
pkscript := btcutil.Base58Decode(pkscript58)
spent := false
if tspent, ok := v["spent"].(bool); ok {
spent = tspent
}
if int32(height) != -1 {
worker := NotifyBalanceWorker{
block: *blockHash,
wg: make(chan *sync.WaitGroup),
}
NotifyBalanceSyncerChans.add <- worker
wg := <-worker.wg
defer func() {
wg.Done()
}()
}
// Create RecvTx to add to tx history.
t := &tx.RecvTx{
TxID: *txID,
TxOutIdx: txOutIndex,
TimeReceived: time.Now().Unix(),
BlockHeight: int32(height),
BlockHash: *blockHash,
BlockIndex: blockIndex,
BlockTime: blockTime,
Amount: int64(amt),
ReceiverHash: receiver.ScriptAddress(),
}
// For transactions originating from this wallet, the sent tx history should
// be recorded before the received history. If wallet created this tx, wait
// for the sent history to finish being recorded before continuing.
req := SendTxHistSyncRequest{
txid: *txID,
response: make(chan SendTxHistSyncResponse),
}
SendTxHistSyncChans.access <- req
resp := <-req.response
if resp.ok {
// Wait until send history has been recorded.
<-resp.c
SendTxHistSyncChans.remove <- *txID
}
// Actually record the tx history.
a.TxStore.Lock()
a.TxStore.s.InsertRecvTx(t)
a.TxStore.dirty = true
a.TxStore.Unlock()
// Notify frontends of tx. If the tx is unconfirmed, it is always
// notified and the outpoint is marked as notified. If the outpoint
// has already been notified and is now in a block, a txmined notifiction
// should be sent once to let frontends that all previous send/recvs
// for this unconfirmed tx are now confirmed.
recvTxOP := btcwire.NewOutPoint(txID, txOutIndex)
previouslyNotifiedReq := NotifiedRecvTxRequest{
op: *recvTxOP,
response: make(chan NotifiedRecvTxResponse),
}
NotifiedRecvTxChans.access <- previouslyNotifiedReq
if <-previouslyNotifiedReq.response {
NotifyMinedTx <- t
NotifiedRecvTxChans.remove <- *recvTxOP
} else {
// Notify frontends of new recv tx and mark as notified.
NotifiedRecvTxChans.add <- *recvTxOP
NotifyNewTxDetails(frontendNotificationMaster, a.Name(), t.TxInfo(a.Name(),
int32(height), a.Wallet.Net()))
}
if !spent {
u := &tx.Utxo{
Amt: uint64(amt),
Height: int32(height),
Subscript: pkscript,
}
copy(u.Out.Hash[:], txID[:])
u.Out.Index = uint32(txOutIndex)
copy(u.AddrHash[:], receiver.ScriptAddress())
copy(u.BlockHash[:], blockHash[:])
a.UtxoStore.Lock()
a.UtxoStore.s.Insert(u)
a.UtxoStore.dirty = true
a.UtxoStore.Unlock()
// If this notification came from mempool, notify frontends of
// the new unconfirmed balance immediately. Otherwise, wait until
// the blockconnected notifiation is processed.
if u.Height == -1 {
bal := a.CalculateBalance(0) - a.CalculateBalance(1)
NotifyWalletBalanceUnconfirmed(frontendNotificationMaster,
a.name, bal)
}
}
// Never remove this handler.
return false
} }
// accountdir returns the directory containing an account's wallet, utxo, // accountdir returns the directory containing an account's wallet, utxo,

View file

@ -1,5 +1,5 @@
/* /*
* Copyright (c) 2013 Conformal Systems LLC <info@conformal.com> * Copyright (c) 2013, 2014 Conformal Systems LLC <info@conformal.com>
* *
* Permission to use, copy, modify, and distribute this software for any * Permission to use, copy, modify, and distribute this software for any
* purpose with or without fee is hereby granted, provided that the above * purpose with or without fee is hereby granted, provided that the above
@ -41,7 +41,7 @@ var accountstore = NewAccountStore()
// key. A RWMutex is used to protect against incorrect concurrent // key. A RWMutex is used to protect against incorrect concurrent
// access. // access.
type AccountStore struct { type AccountStore struct {
sync.Mutex sync.RWMutex
accounts map[string]*Account accounts map[string]*Account
} }
@ -55,8 +55,8 @@ func NewAccountStore() *AccountStore {
// Account returns the account specified by name, or ErrAcctNotExist // Account returns the account specified by name, or ErrAcctNotExist
// as an error if the account is not found. // as an error if the account is not found.
func (store *AccountStore) Account(name string) (*Account, error) { func (store *AccountStore) Account(name string) (*Account, error) {
store.Lock() store.RLock()
defer store.Unlock() defer store.RUnlock()
account, ok := store.accounts[name] account, ok := store.accounts[name]
if !ok { if !ok {
@ -70,8 +70,8 @@ func (store *AccountStore) Rollback(height int32, hash *btcwire.ShaHash) {
log.Debugf("Rolling back tx history since block height %v hash %v", log.Debugf("Rolling back tx history since block height %v hash %v",
height, hash) height, hash)
store.Lock() store.RLock()
defer store.Unlock() defer store.RUnlock()
for _, account := range store.accounts { for _, account := range store.accounts {
account.Rollback(height, hash) account.Rollback(height, hash)
@ -83,8 +83,8 @@ func (store *AccountStore) Rollback(height int32, hash *btcwire.ShaHash) {
// block, including changed balances. Each account is then set to be synced // block, including changed balances. Each account is then set to be synced
// with the latest block. // with the latest block.
func (store *AccountStore) BlockNotify(bs *wallet.BlockStamp) { func (store *AccountStore) BlockNotify(bs *wallet.BlockStamp) {
store.Lock() store.RLock()
defer store.Unlock() defer store.RUnlock()
for _, a := range store.accounts { for _, a := range store.accounts {
// The UTXO store will be dirty if it was modified // The UTXO store will be dirty if it was modified
@ -129,8 +129,8 @@ func (store *AccountStore) RecordMinedTx(txid *btcwire.ShaHash,
blkhash *btcwire.ShaHash, blkheight int32, blkindex int, blkhash *btcwire.ShaHash, blkheight int32, blkindex int,
blktime int64) error { blktime int64) error {
store.Lock() store.RLock()
defer store.Unlock() defer store.RUnlock()
for _, account := range store.accounts { for _, account := range store.accounts {
account.TxStore.Lock() account.TxStore.Lock()
@ -175,10 +175,9 @@ func (store *AccountStore) CalculateBalance(account string,
// CreateEncryptedWallet creates a new account with a wallet file // CreateEncryptedWallet creates a new account with a wallet file
// encrypted with passphrase. // encrypted with passphrase.
func (store *AccountStore) CreateEncryptedWallet(name, desc string, passphrase []byte) error { func (store *AccountStore) CreateEncryptedWallet(name, desc string, passphrase []byte) error {
store.Lock() store.RLock()
defer store.Unlock()
_, ok := store.accounts[name] _, ok := store.accounts[name]
store.RUnlock()
if ok { if ok {
return ErrAcctExists return ErrAcctExists
} }
@ -198,16 +197,17 @@ func (store *AccountStore) CreateEncryptedWallet(name, desc string, passphrase [
// Create new account with the wallet. A new JSON ID is set for // Create new account with the wallet. A new JSON ID is set for
// transaction notifications. // transaction notifications.
account := &Account{ account := &Account{
Wallet: wlt, Wallet: wlt,
name: name, name: name,
dirty: true, dirty: true,
NewBlockTxJSONID: <-NewJSONID,
} }
// Save the account in the global account map. The mutex is // Save the account in the global account map. The mutex is
// already held at this point, and will be unlocked when this // already held at this point, and will be unlocked when this
// func returns. // func returns.
store.Lock()
store.accounts[name] = account store.accounts[name] = account
store.Unlock()
// Begin tracking account against a connected btcd. // Begin tracking account against a connected btcd.
// //
@ -226,8 +226,8 @@ func (store *AccountStore) CreateEncryptedWallet(name, desc string, passphrase [
// DumpKeys returns all WIF-encoded private keys associated with all // DumpKeys returns all WIF-encoded private keys associated with all
// accounts. All wallets must be unlocked for this operation to succeed. // accounts. All wallets must be unlocked for this operation to succeed.
func (store *AccountStore) DumpKeys() ([]string, error) { func (store *AccountStore) DumpKeys() ([]string, error) {
store.Lock() store.RLock()
defer store.Unlock() defer store.RUnlock()
var keys []string var keys []string
for _, a := range store.accounts { for _, a := range store.accounts {
@ -249,8 +249,8 @@ func (store *AccountStore) DumpKeys() ([]string, error) {
// DumpWIFPrivateKey searches through all accounts for the bitcoin // DumpWIFPrivateKey searches through all accounts for the bitcoin
// payment address addr and returns the WIF-encdoded private key. // payment address addr and returns the WIF-encdoded private key.
func (store *AccountStore) DumpWIFPrivateKey(addr btcutil.Address) (string, error) { func (store *AccountStore) DumpWIFPrivateKey(addr btcutil.Address) (string, error) {
store.Lock() store.RLock()
defer store.Unlock() defer store.RUnlock()
for _, a := range store.accounts { for _, a := range store.accounts {
switch wif, err := a.DumpWIFPrivateKey(addr); err { switch wif, err := a.DumpWIFPrivateKey(addr); err {
@ -272,8 +272,8 @@ func (store *AccountStore) DumpWIFPrivateKey(addr btcutil.Address) (string, erro
// NotifyBalances notifies a wallet frontend of all confirmed and unconfirmed // NotifyBalances notifies a wallet frontend of all confirmed and unconfirmed
// account balances. // account balances.
func (store *AccountStore) NotifyBalances(frontend chan []byte) { func (store *AccountStore) NotifyBalances(frontend chan []byte) {
store.Lock() store.RLock()
defer store.Unlock() defer store.RUnlock()
for _, account := range store.accounts { for _, account := range store.accounts {
balance := account.CalculateBalance(1) balance := account.CalculateBalance(1)
@ -286,8 +286,8 @@ func (store *AccountStore) NotifyBalances(frontend chan []byte) {
// ListAccounts returns a map of account names to their current account // ListAccounts returns a map of account names to their current account
// balances. The balances are calculated using minconf confirmations. // balances. The balances are calculated using minconf confirmations.
func (store *AccountStore) ListAccounts(minconf int) map[string]float64 { func (store *AccountStore) ListAccounts(minconf int) map[string]float64 {
store.Lock() store.RLock()
defer store.Unlock() defer store.RUnlock()
// Create and fill a map of account names and their balances. // Create and fill a map of account names and their balances.
pairs := make(map[string]float64) pairs := make(map[string]float64)
@ -303,8 +303,8 @@ func (store *AccountStore) ListAccounts(minconf int) map[string]float64 {
// TODO(jrick): batch addresses for all accounts together so multiple // TODO(jrick): batch addresses for all accounts together so multiple
// rescan commands can be avoided. // rescan commands can be avoided.
func (store *AccountStore) RescanActiveAddresses() { func (store *AccountStore) RescanActiveAddresses() {
store.Lock() store.RLock()
defer store.Unlock() defer store.RUnlock()
for _, account := range store.accounts { for _, account := range store.accounts {
account.RescanActiveAddresses() account.RescanActiveAddresses()
@ -314,8 +314,8 @@ func (store *AccountStore) RescanActiveAddresses() {
// Track begins tracking all addresses in all accounts for updates from // Track begins tracking all addresses in all accounts for updates from
// btcd. // btcd.
func (store *AccountStore) Track() { func (store *AccountStore) Track() {
store.Lock() store.RLock()
defer store.Unlock() defer store.RUnlock()
for _, account := range store.accounts { for _, account := range store.accounts {
account.Track() account.Track()
@ -329,9 +329,6 @@ func (store *AccountStore) Track() {
// Wallets opened from this function are not set to track against a // Wallets opened from this function are not set to track against a
// btcd connection. // btcd connection.
func (store *AccountStore) OpenAccount(name string, cfg *config) error { func (store *AccountStore) OpenAccount(name string, cfg *config) error {
store.Lock()
defer store.Unlock()
wlt := new(wallet.Wallet) wlt := new(wallet.Wallet)
a := &Account{ a := &Account{
@ -401,6 +398,7 @@ func (store *AccountStore) OpenAccount(name string, cfg *config) error {
} }
} }
store.Lock()
switch finalErr { switch finalErr {
case ErrNoTxs: case ErrNoTxs:
// Do nothing special for now. This will be implemented when // Do nothing special for now. This will be implemented when
@ -419,6 +417,7 @@ func (store *AccountStore) OpenAccount(name string, cfg *config) error {
default: default:
log.Warnf("cannot open wallet: %v", err) log.Warnf("cannot open wallet: %v", err)
} }
store.Unlock()
// Mark all active payment addresses as belonging to this account. // Mark all active payment addresses as belonging to this account.
for addr := range a.ActivePaymentAddresses() { for addr := range a.ActivePaymentAddresses() {

540
btcdrpc.go Normal file
View file

@ -0,0 +1,540 @@
/*
* Copyright (c) 2013, 2014 Conformal Systems LLC <info@conformal.com>
*
* Permission to use, copy, modify, and distribute this software for any
* purpose with or without fee is hereby granted, provided that the above
* copyright notice and this permission notice appear in all copies.
*
* THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
* WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
* MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
* ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
* WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
* ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
* OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
*/
// This file implements the websocket RPC connection to a btcd instance.
package main
import (
"code.google.com/p/go.net/websocket"
"encoding/hex"
"encoding/json"
"errors"
"github.com/conformal/btcjson"
"github.com/conformal/btcutil"
"github.com/conformal/btcwallet/tx"
"github.com/conformal/btcwallet/wallet"
"github.com/conformal/btcwire"
"github.com/conformal/btcws"
"sync"
"time"
)
// ErrBtcdDisconnected describes an error where an operation cannot
// successfully complete due to btcwallet not being connected to
// btcd.
var ErrBtcdDisconnected = btcjson.Error{
Code: -1,
Message: "btcd disconnected",
}
// BtcdRPCConn is a type managing a client connection to a btcd RPC server
// over websockets.
type BtcdRPCConn struct {
ws *websocket.Conn
addRequest chan *AddRPCRequest
closed chan struct{}
}
// Ensure that BtcdRPCConn can be used as an RPCConn.
var _ RPCConn = &BtcdRPCConn{}
// NewBtcdRPCConn creates a new RPC connection from a btcd websocket
// connection to btcd.
func NewBtcdRPCConn(ws *websocket.Conn) *BtcdRPCConn {
conn := &BtcdRPCConn{
ws: ws,
addRequest: make(chan *AddRPCRequest),
closed: make(chan struct{}),
}
return conn
}
// SendRequest sends an RPC request and returns a channel to read the response's
// result and error. Part of the RPCConn interface.
func (btcd *BtcdRPCConn) SendRequest(request *RPCRequest) chan *RPCResponse {
select {
case <-btcd.closed:
// The connection has closed, so instead of adding and sending
// a request, return a channel that just replies with the
// error for a disconnected btcd.
responseChan := make(chan *RPCResponse)
go func() {
response := &RPCResponse{
Err: &ErrBtcdDisconnected,
}
responseChan <- response
}()
return responseChan
default:
addRequest := &AddRPCRequest{
Request: request,
ResponseChan: make(chan chan *RPCResponse),
}
btcd.addRequest <- addRequest
return <-addRequest.ResponseChan
}
}
// Connected returns whether the connection remains established to the RPC
// server.
//
// This function probably should be removed, as any checks for confirming
// the connection are no longer valid after the check and may result in
// races.
func (btcd *BtcdRPCConn) Connected() bool {
select {
case <-btcd.closed:
return false
default:
return true
}
}
// AddRPCRequest is used to add an RPCRequest to the pool of requests
// being manaaged by a btcd RPC connection.
type AddRPCRequest struct {
Request *RPCRequest
ResponseChan chan chan *RPCResponse
}
// send performs the actual send of the marshaled request over the btcd
// websocket connection.
func (btcd *BtcdRPCConn) send(rpcrequest *RPCRequest) error {
// btcjson.Cmds define their own MarshalJSON which returns an error
// to satisify the json.Marshaler interface, but will never error.
mrequest, _ := rpcrequest.request.MarshalJSON()
return websocket.Message.Send(btcd.ws, mrequest)
}
type receivedResponse struct {
id uint64
raw []byte
reply *btcjson.Reply
}
// Start starts the goroutines required to send RPC requests and listen for
// replies.
func (btcd *BtcdRPCConn) Start() {
done := btcd.closed
responses := make(chan *receivedResponse)
// Maintain a map of JSON IDs to RPCRequests currently being waited on.
go func() {
m := make(map[uint64]*RPCRequest)
for {
select {
case addrequest := <-btcd.addRequest:
rpcrequest := addrequest.Request
m[rpcrequest.request.Id().(uint64)] = rpcrequest
if err := btcd.send(rpcrequest); err != nil {
// Connection lost.
btcd.ws.Close()
close(done)
}
addrequest.ResponseChan <- rpcrequest.response
case recvResponse := <-responses:
rpcrequest, ok := m[recvResponse.id]
if !ok {
log.Warnf("Received unexpected btcd response")
continue
}
delete(m, recvResponse.id)
// If no result var was set, create and send
// send the response unmarshaled by the json
// package.
if rpcrequest.result == nil {
response := &RPCResponse{
Result: recvResponse.reply.Result,
Err: recvResponse.reply.Error,
}
rpcrequest.response <- response
continue
}
// A return var was set, so unmarshal again
// into the var before sending the response.
r := &btcjson.Reply{
Result: rpcrequest.result,
}
json.Unmarshal(recvResponse.raw, &r)
response := &RPCResponse{
Result: r.Result,
Err: r.Error,
}
rpcrequest.response <- response
case <-done:
for _, request := range m {
response := &RPCResponse{
Err: &ErrBtcdDisconnected,
}
request.response <- response
}
return
}
}
}()
// Listen for replies/notifications from btcd, and decide how to handle them.
go func() {
// Idea: instead of reading btcd messages from just one websocket
// connection, maybe use two so the same connection isn't used
// for both notifications and responses? Should make handling
// must faster as unnecessary unmarshal attempts could be avoided.
for {
var m []byte
if err := websocket.Message.Receive(btcd.ws, &m); err != nil {
log.Debugf("Cannot recevie btcd message: %v", err)
close(done)
return
}
// Try notifications (requests with nil ids) first.
n, err := unmarshalNotification(m)
if err == nil {
// Make a copy of the marshaled notification.
mcopy := make([]byte, len(m))
copy(mcopy, m)
// Begin processing the notification.
go processNotification(n, mcopy)
continue
}
// Must be a response.
r, err := unmarshalResponse(m)
if err == nil {
responses <- r
continue
}
// Not sure what was received but it isn't correct.
log.Warnf("Received invalid message from btcd")
}
}()
}
// unmarshalResponse attempts to unmarshal a marshaled JSON-RPC
// response.
func unmarshalResponse(b []byte) (*receivedResponse, error) {
var r btcjson.Reply
if err := json.Unmarshal(b, &r); err != nil {
return nil, err
}
// Check for a valid ID.
if r.Id == nil {
return nil, errors.New("id is nil")
}
fid, ok := (*r.Id).(float64)
if !ok {
return nil, errors.New("id is not a number")
}
response := &receivedResponse{
id: uint64(fid),
raw: b,
reply: &r,
}
return response, nil
}
// unmarshalNotification attempts to unmarshal a marshaled JSON-RPC
// notification (Request with a nil or no ID).
func unmarshalNotification(b []byte) (btcjson.Cmd, error) {
req, err := btcjson.ParseMarshaledCmd(b)
if err != nil {
return nil, err
}
if req.Id() != nil {
return nil, errors.New("id is non-nil")
}
return req, nil
}
// processNotification checks for a handler for a notification, and sends
func processNotification(n btcjson.Cmd, b []byte) {
// Message is a btcd notification. Check the method and dispatch
// correct handler, or if no handler, pass up to each wallet.
if ntfnHandler, ok := notificationHandlers[n.Method()]; ok {
log.Debugf("Running notification handler for method %v",
n.Method())
ntfnHandler(n, b)
} else {
// No handler; send to all wallets.
log.Debugf("Sending notification with method %v to all wallets",
n.Method())
frontendNotificationMaster <- b
}
}
type notificationHandler func(btcjson.Cmd, []byte)
var notificationHandlers = map[string]notificationHandler{
btcws.BlockConnectedNtfnMethod: NtfnBlockConnected,
btcws.BlockDisconnectedNtfnMethod: NtfnBlockDisconnected,
btcws.ProcessedTxNtfnMethod: NtfnProcessedTx,
btcws.TxMinedNtfnMethod: NtfnTxMined,
btcws.TxSpentNtfnMethod: NtfnTxSpent,
}
// NtfnProcessedTx handles the btcws.ProcessedTxNtfn notification.
func NtfnProcessedTx(n btcjson.Cmd, marshaled []byte) {
ptn, ok := n.(*btcws.ProcessedTxNtfn)
if !ok {
log.Errorf("%v handler: unexpected type", n.Method())
return
}
// Create useful types from the JSON strings.
receiver, err := btcutil.DecodeAddr(ptn.Receiver)
if err != nil {
log.Errorf("%v handler: error parsing receiver: %v", n.Method(), err)
return
}
txID, err := btcwire.NewShaHashFromStr(ptn.TxID)
if err != nil {
log.Errorf("%v handler: error parsing txid: %v", n.Method(), err)
return
}
blockHash, err := btcwire.NewShaHashFromStr(ptn.BlockHash)
if err != nil {
log.Errorf("%v handler: error parsing block hash: %v", n.Method(), err)
return
}
pkscript, err := hex.DecodeString(ptn.PkScript)
if err != nil {
log.Errorf("%v handler: error parsing pkscript: %v", n.Method(), err)
return
}
// Lookup account for address in result.
aname, err := LookupAccountByAddress(ptn.Receiver)
if err == ErrNotFound {
log.Warnf("Received rescan result for unknown address %v", ptn.Receiver)
return
}
a, err := accountstore.Account(aname)
if err == ErrAcctNotExist {
log.Errorf("Missing account for rescaned address %v", ptn.Receiver)
}
// Create RecvTx to add to tx history.
t := &tx.RecvTx{
TxID: *txID,
TxOutIdx: ptn.TxOutIndex,
TimeReceived: time.Now().Unix(),
BlockHeight: ptn.BlockHeight,
BlockHash: *blockHash,
BlockIndex: int32(ptn.BlockIndex),
BlockTime: ptn.BlockTime,
Amount: ptn.Amount,
ReceiverHash: receiver.ScriptAddress(),
}
// For transactions originating from this wallet, the sent tx history should
// be recorded before the received history. If wallet created this tx, wait
// for the sent history to finish being recorded before continuing.
req := SendTxHistSyncRequest{
txid: *txID,
response: make(chan SendTxHistSyncResponse),
}
SendTxHistSyncChans.access <- req
resp := <-req.response
if resp.ok {
// Wait until send history has been recorded.
<-resp.c
SendTxHistSyncChans.remove <- *txID
}
// Record the tx history.
a.TxStore.Lock()
a.TxStore.s.InsertRecvTx(t)
a.TxStore.dirty = true
a.TxStore.Unlock()
// Notify frontends of tx. If the tx is unconfirmed, it is always
// notified and the outpoint is marked as notified. If the outpoint
// has already been notified and is now in a block, a txmined notifiction
// should be sent once to let frontends that all previous send/recvs
// for this unconfirmed tx are now confirmed.
recvTxOP := btcwire.NewOutPoint(txID, ptn.TxOutIndex)
previouslyNotifiedReq := NotifiedRecvTxRequest{
op: *recvTxOP,
response: make(chan NotifiedRecvTxResponse),
}
NotifiedRecvTxChans.access <- previouslyNotifiedReq
if <-previouslyNotifiedReq.response {
NotifyMinedTx <- t
NotifiedRecvTxChans.remove <- *recvTxOP
} else {
// Notify frontends of new recv tx and mark as notified.
NotifiedRecvTxChans.add <- *recvTxOP
NotifyNewTxDetails(frontendNotificationMaster, a.Name(), t.TxInfo(a.Name(),
ptn.BlockHeight, a.Wallet.Net()))
}
if !ptn.Spent {
u := &tx.Utxo{
Amt: uint64(ptn.Amount),
Height: ptn.BlockHeight,
Subscript: pkscript,
}
copy(u.Out.Hash[:], txID[:])
u.Out.Index = uint32(ptn.TxOutIndex)
copy(u.AddrHash[:], receiver.ScriptAddress())
copy(u.BlockHash[:], blockHash[:])
a.UtxoStore.Lock()
a.UtxoStore.s.Insert(u)
a.UtxoStore.dirty = true
a.UtxoStore.Unlock()
// If this notification came from mempool, notify frontends of
// the new unconfirmed balance immediately. Otherwise, wait until
// the blockconnected notifiation is processed.
if u.Height == -1 {
bal := a.CalculateBalance(0) - a.CalculateBalance(1)
NotifyWalletBalanceUnconfirmed(frontendNotificationMaster,
a.name, bal)
}
}
// Notify frontends of new account balance.
confirmed := a.CalculateBalance(1)
unconfirmed := a.CalculateBalance(0) - confirmed
NotifyWalletBalance(frontendNotificationMaster, a.name, confirmed)
NotifyWalletBalanceUnconfirmed(frontendNotificationMaster, a.name, unconfirmed)
}
// NtfnBlockConnected handles btcd notifications resulting from newly
// connected blocks to the main blockchain.
//
// TODO(jrick): Send block time with notification. This will be used
// to mark wallet files with a possibly-better earliest block height,
// and will greatly reduce rescan times for wallets created with an
// out of sync btcd.
func NtfnBlockConnected(n btcjson.Cmd, marshaled []byte) {
bcn, ok := n.(*btcws.BlockConnectedNtfn)
if !ok {
log.Errorf("%v handler: unexpected type", n.Method())
return
}
hash, err := btcwire.NewShaHashFromStr(bcn.Hash)
if err != nil {
log.Errorf("%v handler: invalid hash string", n.Method())
return
}
// Update the blockstamp for the newly-connected block.
bs := &wallet.BlockStamp{
Height: bcn.Height,
Hash: *hash,
}
curBlock.Lock()
curBlock.BlockStamp = *bs
curBlock.Unlock()
// btcd notifies btcwallet about transactions first, and then sends
// the new block notification. New balance notifications for txs
// in blocks are therefore sent here after all tx notifications
// have arrived and finished being processed by the handlers.
workers := NotifyBalanceRequest{
block: *hash,
wg: make(chan *sync.WaitGroup),
}
NotifyBalanceSyncerChans.access <- workers
if wg := <-workers.wg; wg != nil {
wg.Wait()
NotifyBalanceSyncerChans.remove <- *hash
}
accountstore.BlockNotify(bs)
// Pass notification to frontends too.
frontendNotificationMaster <- marshaled
}
// NtfnBlockDisconnected handles btcd notifications resulting from
// blocks disconnected from the main chain in the event of a chain
// switch and notifies frontends of the new blockchain height.
func NtfnBlockDisconnected(n btcjson.Cmd, marshaled []byte) {
bdn, ok := n.(*btcws.BlockDisconnectedNtfn)
if !ok {
log.Errorf("%v handler: unexpected type", n.Method())
return
}
hash, err := btcwire.NewShaHashFromStr(bdn.Hash)
if err != nil {
log.Errorf("%v handler: invalid hash string", n.Method())
return
}
// Rollback Utxo and Tx data stores.
go func() {
accountstore.Rollback(bdn.Height, hash)
}()
// Pass notification to frontends too.
frontendNotificationMaster <- marshaled
}
// NtfnTxMined handles btcd notifications resulting from newly
// mined transactions that originated from this wallet.
func NtfnTxMined(n btcjson.Cmd, marshaled []byte) {
tmn, ok := n.(*btcws.TxMinedNtfn)
if !ok {
log.Errorf("%v handler: unexpected type", n.Method())
return
}
txid, err := btcwire.NewShaHashFromStr(tmn.TxID)
if err != nil {
log.Errorf("%v handler: invalid hash string", n.Method())
return
}
blockhash, err := btcwire.NewShaHashFromStr(tmn.BlockHash)
if err != nil {
log.Errorf("%v handler: invalid block hash string", n.Method())
return
}
err = accountstore.RecordMinedTx(txid, blockhash,
tmn.BlockHeight, tmn.Index, tmn.BlockTime)
if err != nil {
log.Errorf("%v handler: %v", n.Method(), err)
return
}
// Remove mined transaction from pool.
UnminedTxs.Lock()
delete(UnminedTxs.m, TXID(*txid))
UnminedTxs.Unlock()
}
// NtfnTxSpent handles btcd txspent notifications resulting from a block
// transaction being processed that spents a wallet UTXO.
func NtfnTxSpent(n btcjson.Cmd, marshaled []byte) {
// TODO(jrick): This might actually be useless and maybe it shouldn't
// be implemented.
}

169
cmd.go
View file

@ -1,5 +1,5 @@
/* /*
* Copyright (c) 2013 Conformal Systems LLC <info@conformal.com> * Copyright (c) 2013, 2014 Conformal Systems LLC <info@conformal.com>
* *
* Permission to use, copy, modify, and distribute this software for any * Permission to use, copy, modify, and distribute this software for any
* purpose with or without fee is hereby granted, provided that the above * purpose with or without fee is hereby granted, provided that the above
@ -18,12 +18,10 @@ package main
import ( import (
"errors" "errors"
"fmt"
"github.com/conformal/btcjson" "github.com/conformal/btcjson"
"github.com/conformal/btcutil" "github.com/conformal/btcutil"
"github.com/conformal/btcwallet/wallet" "github.com/conformal/btcwallet/wallet"
"github.com/conformal/btcwire" "github.com/conformal/btcwire"
"github.com/conformal/btcws"
"io/ioutil" "io/ioutil"
"net" "net"
"net/http" "net/http"
@ -71,83 +69,25 @@ func GetCurBlock() (bs wallet.BlockStamp, err error) {
return bs, nil return bs, nil
} }
// This is a hack and may result in races, but we need to make bb, _ := GetBestBlock(CurrentRPCConn())
// sure that btcd is connected and sending a message will succeed, if bb == nil {
// or this will block forever. A better solution is to return an
// error to the reply handler immediately if btcd is disconnected.
if !btcdConnected.b {
return wallet.BlockStamp{ return wallet.BlockStamp{
Height: int32(btcutil.BlockHeightUnknown), Height: int32(btcutil.BlockHeightUnknown),
}, errors.New("current block unavailable") }, errors.New("current block unavailable")
} }
n := <-NewJSONID hash, err := btcwire.NewShaHashFromStr(bb.Hash)
cmd := btcws.NewGetBestBlockCmd(fmt.Sprintf("btcwallet(%v)", n))
mcmd, err := cmd.MarshalJSON()
if err != nil { if err != nil {
return wallet.BlockStamp{ return wallet.BlockStamp{
Height: int32(btcutil.BlockHeightUnknown), Height: int32(btcutil.BlockHeightUnknown),
}, errors.New("cannot ask for best block") }, err
}
c := make(chan *struct {
hash *btcwire.ShaHash
height int32
})
replyHandlers.Lock()
replyHandlers.m[n] = func(result interface{}, e *btcjson.Error) bool {
if e != nil {
c <- nil
return true
}
m, ok := result.(map[string]interface{})
if !ok {
c <- nil
return true
}
hashBE, ok := m["hash"].(string)
if !ok {
c <- nil
return true
}
hash, err := btcwire.NewShaHashFromStr(hashBE)
if err != nil {
c <- nil
return true
}
fheight, ok := m["height"].(float64)
if !ok {
c <- nil
return true
}
c <- &struct {
hash *btcwire.ShaHash
height int32
}{
hash: hash,
height: int32(fheight),
}
return true
}
replyHandlers.Unlock()
// send message
btcdMsgs <- mcmd
// Block until reply is ready.
reply, ok := <-c
if !ok || reply == nil {
return wallet.BlockStamp{
Height: int32(btcutil.BlockHeightUnknown),
}, errors.New("current block unavailable")
} }
curBlock.Lock() curBlock.Lock()
if reply.height > curBlock.BlockStamp.Height { if bb.Height > curBlock.BlockStamp.Height {
bs = wallet.BlockStamp{ bs = wallet.BlockStamp{
Height: reply.height, Height: bb.Height,
Hash: *reply.hash, Hash: *hash,
} }
curBlock.BlockStamp = bs curBlock.BlockStamp = bs
} }
@ -252,35 +192,76 @@ func main() {
NotifyBalanceSyncerChans.remove, NotifyBalanceSyncerChans.remove,
NotifyBalanceSyncerChans.access) NotifyBalanceSyncerChans.access)
for { updateBtcd := make(chan *BtcdRPCConn)
replies := make(chan error) go func() {
done := make(chan int) // Create an RPC connection and close the closed channel.
go func() { //
BtcdConnect(cafile, replies) // It might be a better idea to create a new concrete type
close(done) // just for an always disconnected RPC connection and begin
}() // with that.
selectLoop: btcd := NewBtcdRPCConn(nil)
close(btcd.closed)
// Maintain the current btcd connection. After reconnects,
// the current connection should be updated.
for { for {
select { select {
case <-done: case conn := <-updateBtcd:
break selectLoop btcd = conn
case err := <-replies:
switch err { case access := <-accessRPC:
case ErrConnRefused: access.rpc <- btcd
btcdConnected.c <- false
log.Info("btcd connection refused, retying in 5 seconds")
time.Sleep(5 * time.Second)
case ErrConnLost:
btcdConnected.c <- false
log.Info("btcd connection lost, retrying in 5 seconds")
time.Sleep(5 * time.Second)
case nil:
btcdConnected.c <- true
log.Info("Established connection to btcd.")
default:
log.Infof("Unhandled error: %v", err)
}
} }
} }
}()
for {
btcd, err := BtcdConnect(cafile)
if err != nil {
log.Info("Retrying btcd connection in 5 seconds")
time.Sleep(5 * time.Second)
continue
}
updateBtcd <- btcd
NotifyBtcdConnection(frontendNotificationMaster)
log.Info("Established connection to btcd")
// Perform handshake.
if err := Handshake(btcd); err != nil {
var message string
if jsonErr, ok := err.(*btcjson.Error); ok {
message = jsonErr.Message
} else {
message = err.Error()
}
log.Errorf("Cannot complete handshake: %v", message)
log.Info("Retrying btcd connection in 5 seconds")
time.Sleep(5 * time.Second)
continue
}
// Block goroutine until the connection is lost.
<-btcd.closed
NotifyBtcdConnection(frontendNotificationMaster)
log.Info("Lost btcd connection")
} }
} }
var accessRPC = make(chan *AccessCurrentRPCConn)
// AccessCurrentRPCConn is used to access the current RPC connection
// from the goroutine managing btcd-side RPC connections.
type AccessCurrentRPCConn struct {
rpc chan RPCConn
}
// CurrentRPCConn returns the most recently-connected btcd-side
// RPC connection.
func CurrentRPCConn() RPCConn {
access := &AccessCurrentRPCConn{
rpc: make(chan RPCConn),
}
accessRPC <- access
return <-access.rpc
}

807
cmdmgr.go

File diff suppressed because it is too large Load diff

154
rpc.go Normal file
View file

@ -0,0 +1,154 @@
/*
* Copyright (c) 2013, 2014 Conformal Systems LLC <info@conformal.com>
*
* Permission to use, copy, modify, and distribute this software for any
* purpose with or without fee is hereby granted, provided that the above
* copyright notice and this permission notice appear in all copies.
*
* THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
* WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
* MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
* ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
* WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
* ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
* OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
*/
// This file implements the RPC connection interface and functions to
// communicate with a bitcoin RPC server.
package main
import (
"github.com/conformal/btcjson"
"github.com/conformal/btcwire"
"github.com/conformal/btcws"
)
// RPCRequest is a type responsible for handling RPC requests and providing
// a method to access the response.
type RPCRequest struct {
request btcjson.Cmd
result interface{}
response chan *RPCResponse
}
// NewRPCRequest creates a new RPCRequest from a btcjson.Cmd. request may be
// nil to create a new var for the result (with types determined by the
// unmarshaling rules described in the json package), or set to a var with
// an expected type (i.e. *btcjson.BlockResult) to directly unmarshal the
// response's result into a convenient type.
func NewRPCRequest(request btcjson.Cmd, result interface{}) *RPCRequest {
return &RPCRequest{
request: request,
result: result,
response: make(chan *RPCResponse),
}
}
// RPCResponse holds a response's result and error returned from sending a
// RPCRequest.
type RPCResponse struct {
// Result will be set to a concrete type (i.e. *btcjson.BlockResult)
// and may be type asserted to that type if a non-nil result was used
// to create the originating RPCRequest. Otherwise, Result will be
// set to new memory allocated by json.Unmarshal, and the type rules
// for unmarshaling described in the json package should be followed
// when type asserting Result.
Result interface{}
// Err points to an unmarshaled error, or nil if result is valid.
Err *btcjson.Error
}
// RPCConn is an interface representing a client connection to a bitcoin RPC
// server.
type RPCConn interface {
// SendRequest sends a bitcoin RPC request, returning a channel to
// read the reply. A channel is used so both synchronous and
// asynchronous RPC can be supported.
SendRequest(request *RPCRequest) chan *RPCResponse
}
// GetBestBlockResult holds the result of a getbestblock response.
//
// TODO(jrick): shove this in btcws.
type GetBestBlockResult struct {
Hash string `json:"hash"`
Height int32 `json:"height"`
}
// GetBestBlock gets both the block height and hash of the best block
// in the main chain.
func GetBestBlock(rpc RPCConn) (*GetBestBlockResult, *btcjson.Error) {
cmd := btcws.NewGetBestBlockCmd(<-NewJSONID)
request := NewRPCRequest(cmd, new(GetBestBlockResult))
response := <-rpc.SendRequest(request)
if response.Err != nil {
return nil, response.Err
}
return response.Result.(*GetBestBlockResult), nil
}
// GetBlock requests details about a block with the given hash.
func GetBlock(rpc RPCConn, blockHash string) (*btcjson.BlockResult, *btcjson.Error) {
// NewGetBlockCmd cannot fail with no optargs, so omit the check.
cmd, _ := btcjson.NewGetBlockCmd(<-NewJSONID, blockHash)
request := NewRPCRequest(cmd, new(btcjson.BlockResult))
response := <-rpc.SendRequest(request)
if response.Err != nil {
return nil, response.Err
}
return response.Result.(*btcjson.BlockResult), nil
}
// GetCurrentNet requests the network a bitcoin RPC server is running on.
func GetCurrentNet(rpc RPCConn) (btcwire.BitcoinNet, *btcjson.Error) {
cmd := btcws.NewGetCurrentNetCmd(<-NewJSONID)
request := NewRPCRequest(cmd, nil)
response := <-rpc.SendRequest(request)
if response.Err != nil {
return 0, response.Err
}
return btcwire.BitcoinNet(uint32(response.Result.(float64))), nil
}
// NotifyNewTXs requests notifications for new transactions that spend
// to any of the addresses in addrs.
func NotifyNewTXs(rpc RPCConn, addrs []string) *btcjson.Error {
cmd := btcws.NewNotifyNewTXsCmd(<-NewJSONID, addrs)
request := NewRPCRequest(cmd, nil)
response := <-rpc.SendRequest(request)
return response.Err
}
// NotifySpent requests notifications for when a transaction is processed which
// spends op.
func NotifySpent(rpc RPCConn, op *btcwire.OutPoint) *btcjson.Error {
cmd := btcws.NewNotifySpentCmd(<-NewJSONID, op)
request := NewRPCRequest(cmd, nil)
response := <-rpc.SendRequest(request)
return response.Err
}
// Rescan requests a blockchain rescan for transactions to any number of
// addresses and notifications to inform wallet about such transactions.
func Rescan(rpc RPCConn, beginBlock int32, addrs map[string]struct{}) *btcjson.Error {
// NewRescanCmd cannot fail with no optargs, so omit the check.
cmd, _ := btcws.NewRescanCmd(<-NewJSONID, beginBlock, addrs)
request := NewRPCRequest(cmd, nil)
response := <-rpc.SendRequest(request)
return response.Err
}
// SendRawTransaction sends a hex-encoded transaction for relay.
func SendRawTransaction(rpc RPCConn, hextx string) (txid string, error *btcjson.Error) {
// NewSendRawTransactionCmd cannot fail, so omit the check.
cmd, _ := btcjson.NewSendRawTransactionCmd(<-NewJSONID, hextx)
request := NewRPCRequest(cmd, new(string))
response := <-rpc.SendRequest(request)
if response.Err != nil {
return "", response.Err
}
return *response.Result.(*string), nil
}

View file

@ -1,5 +1,5 @@
/* /*
* Copyright (c) 2013 Conformal Systems LLC <info@conformal.com> * Copyright (c) 2013, 2014 Conformal Systems LLC <info@conformal.com>
* *
* Permission to use, copy, modify, and distribute this software for any * Permission to use, copy, modify, and distribute this software for any
* purpose with or without fee is hereby granted, provided that the above * purpose with or without fee is hereby granted, provided that the above
@ -28,13 +28,13 @@ import (
"crypto/x509" "crypto/x509"
"crypto/x509/pkix" "crypto/x509/pkix"
"encoding/base64" "encoding/base64"
"encoding/hex"
"encoding/json" "encoding/json"
"encoding/pem" "encoding/pem"
"errors" "errors"
"fmt" "fmt"
"github.com/conformal/btcjson" "github.com/conformal/btcjson"
"github.com/conformal/btcwallet/wallet" "github.com/conformal/btcwallet/wallet"
"github.com/conformal/btcwire"
"github.com/conformal/btcws" "github.com/conformal/btcws"
"github.com/conformal/go-socks" "github.com/conformal/go-socks"
"math/big" "math/big"
@ -55,19 +55,6 @@ var (
// process cannot be established. // process cannot be established.
ErrConnLost = errors.New("connection lost") ErrConnLost = errors.New("connection lost")
// Channel for updates and boolean with the most recent update of
// whether the connection to btcd is active or not.
btcdConnected = struct {
b bool
c chan bool
}{
c: make(chan bool),
}
// Channel to send messages btcwallet does not understand and requests
// from btcwallet to btcd.
btcdMsgs = make(chan []byte)
// Adds a frontend listener channel // Adds a frontend listener channel
addFrontendListener = make(chan (chan []byte)) addFrontendListener = make(chan (chan []byte))
@ -76,27 +63,6 @@ var (
// Messages sent to this channel are sent to each connected frontend. // Messages sent to this channel are sent to each connected frontend.
frontendNotificationMaster = make(chan []byte, 100) frontendNotificationMaster = make(chan []byte, 100)
// replyHandlers maps between a unique number (passed as part of
// the JSON Id field) and a function to handle a reply or notification
// from btcd. As requests are received, this map is checked for a
// handler function to route the reply to. If the function returns
// true, the handler is removed from the map.
replyHandlers = struct {
sync.Mutex
m map[uint64]func(interface{}, *btcjson.Error) bool
}{
m: make(map[uint64]func(interface{}, *btcjson.Error) bool),
}
// replyRouter maps unique uint64 ids to reply channels, so btcd
// replies can be routed to the correct frontend.
replyRouter = struct {
sync.Mutex
m map[uint64]chan []byte
}{
m: make(map[uint64]chan []byte),
}
) )
// server holds the items the RPC server may need to access (auth, // server holds the items the RPC server may need to access (auth,
@ -296,24 +262,25 @@ func genKey(key, cert string) error {
// handleRPCRequest processes a JSON-RPC request from a frontend. // handleRPCRequest processes a JSON-RPC request from a frontend.
func (s *server) handleRPCRequest(w http.ResponseWriter, r *http.Request) { func (s *server) handleRPCRequest(w http.ResponseWriter, r *http.Request) {
frontend := make(chan []byte)
body, err := btcjson.GetRaw(r.Body) body, err := btcjson.GetRaw(r.Body)
if err != nil { if err != nil {
log.Errorf("RPCS: Error getting JSON message: %v", err) log.Errorf("RPCS: Error getting JSON message: %v", err)
} }
done := make(chan struct{}) response := ProcessFrontendRequest(body, false)
go func() { mresponse, err := json.Marshal(response)
if _, err := w.Write(<-frontend); err != nil { if err != nil {
log.Warnf("RPCS: could not respond to RPC request: %v", id := response.Id
err) response = &btcjson.Reply{
Id: id,
Error: &btcjson.ErrInternal,
} }
close(done) mresponse, _ = json.Marshal(response)
}() }
ProcessRequest(frontend, body, false) if _, err := w.Write(mresponse); err != nil {
<-done log.Warnf("RPCS: could not respond to RPC request: %v", err)
}
} }
// frontendListenerDuplicator listens for new wallet listener channels // frontendListenerDuplicator listens for new wallet listener channels
@ -339,12 +306,9 @@ func frontendListenerDuplicator() {
frontendListeners[c] = true frontendListeners[c] = true
mtx.Unlock() mtx.Unlock()
// TODO(jrick): these notifications belong somewhere better. NotifyBtcdConnection(c)
// Probably want to copy AddWalletListener from btcd, and bs, err := GetCurBlock()
// place these notifications in that function. if err == nil {
NotifyBtcdConnected(frontendNotificationMaster,
btcdConnected.b)
if bs, err := GetCurBlock(); err == nil {
NotifyNewBlockChainHeight(c, bs) NotifyNewBlockChainHeight(c, bs)
NotifyBalances(c) NotifyBalances(c)
} }
@ -360,15 +324,7 @@ func frontendListenerDuplicator() {
// Duplicate all messages sent across frontendNotificationMaster, as // Duplicate all messages sent across frontendNotificationMaster, as
// well as internal btcwallet notifications, to each listening wallet. // well as internal btcwallet notifications, to each listening wallet.
for { for {
var ntfn []byte ntfn := <-frontendNotificationMaster
select {
case conn := <-btcdConnected.c:
NotifyBtcdConnected(frontendNotificationMaster, conn)
continue
case ntfn = <-frontendNotificationMaster:
}
mtx.Lock() mtx.Lock()
for c := range frontendListeners { for c := range frontendListeners {
@ -378,13 +334,15 @@ func frontendListenerDuplicator() {
} }
} }
// NotifyBtcdConnected notifies all frontends of a new btcd connection. // NotifyBtcdConnection notifies a frontend of the current connection
func NotifyBtcdConnected(reply chan []byte, conn bool) { // status of btcwallet to btcd.
btcdConnected.b = conn func NotifyBtcdConnection(reply chan []byte) {
if btcd, ok := CurrentRPCConn().(*BtcdRPCConn); ok {
ntfn := btcws.NewBtcdConnectedNtfn(btcd.Connected())
mntfn, _ := ntfn.MarshalJSON()
reply <- mntfn
}
ntfn := btcws.NewBtcdConnectedNtfn(conn)
mntfn, _ := ntfn.MarshalJSON()
frontendNotificationMaster <- mntfn
} }
// frontendSendRecv is the handler function for websocket connections from // frontendSendRecv is the handler function for websocket connections from
@ -425,7 +383,12 @@ func frontendSendRecv(ws *websocket.Conn) {
return return
} }
// Handle request here. // Handle request here.
go ProcessRequest(frontendNotification, m, true) go func() {
reply := ProcessFrontendRequest(m, true)
mreply, _ := json.Marshal(reply)
frontendNotification <- mreply
}()
case ntfn, _ := <-frontendNotification: case ntfn, _ := <-frontendNotification:
if err := websocket.Message.Send(ws, ntfn); err != nil { if err := websocket.Message.Send(ws, ntfn); err != nil {
// Frontend disconnected. // Frontend disconnected.
@ -435,170 +398,6 @@ func frontendSendRecv(ws *websocket.Conn) {
} }
} }
// BtcdHandler listens for replies and notifications from btcd over a
// websocket and sends messages that btcwallet does not understand to
// btcd. Unlike FrontendHandler, exactly one BtcdHandler goroutine runs.
// BtcdHandler spawns goroutines to perform these tasks, and closes the
// done channel once they are finished.
func BtcdHandler(ws *websocket.Conn, done chan struct{}) {
// Listen for replies/notifications from btcd, and decide how to handle them.
replies := make(chan []byte)
go func() {
for {
var m []byte
if err := websocket.Message.Receive(ws, &m); err != nil {
log.Debugf("cannot recevie btcd message: %v", err)
close(replies)
return
}
replies <- m
}
}()
go func() {
defer close(done)
for {
select {
case rply, ok := <-replies:
if !ok {
// btcd disconnected
return
}
// Handle message here.
go ProcessBtcdNotificationReply(rply)
case r := <-btcdMsgs:
if err := websocket.Message.Send(ws, r); err != nil {
// btcd disconnected.
log.Errorf("Unable to send message to btcd: %v", err)
return
}
}
}
}()
}
type notificationHandler func(btcjson.Cmd, []byte)
var notificationHandlers = map[string]notificationHandler{
btcws.BlockConnectedNtfnMethod: NtfnBlockConnected,
btcws.BlockDisconnectedNtfnMethod: NtfnBlockDisconnected,
btcws.TxMinedNtfnMethod: NtfnTxMined,
}
// ProcessBtcdNotificationReply unmarshalls the JSON notification or
// reply received from btcd and decides how to handle it. Replies are
// routed back to the frontend who sent the message, and wallet
// notifications are processed by btcwallet, and frontend notifications
// are sent to every connected frontend.
func ProcessBtcdNotificationReply(b []byte) {
// Idea: instead of reading btcd messages from just one websocket
// connection, maybe use two so the same connection isn't used
// for both notifications and responses? Should make handling
// must faster as unnecessary unmarshal attempts could be avoided.
// Check for notifications first.
if req, err := btcjson.ParseMarshaledCmd(b); err == nil {
// btcd should not be sending Requests except for
// notifications. Check for a nil id.
if req.Id() != nil {
// Invalid response
log.Warnf("btcd sent a non-notification JSON-RPC Request (ID: %v)",
req.Id())
return
}
// Message is a btcd notification. Check the method and dispatch
// correct handler, or if no handler, pass up to each wallet.
if ntfnHandler, ok := notificationHandlers[req.Method()]; ok {
ntfnHandler(req, b)
} else {
// No handler; send to all wallets.
frontendNotificationMaster <- b
}
return
}
// b is not a Request notification, so it must be a Response.
// Attempt to parse it as one and handle.
var r btcjson.Reply
if err := json.Unmarshal(b, &r); err != nil {
log.Warn("Unable to process btcd message as notification or response")
return
}
// Check for a valid ID.
//
// TODO(jrick): Remove this terrible ID overloading. Each
// passed-through request should be given a new unique ID number
// (reading from the NewJSONID channel) and a reply route with the
// frontend's incoming ID should be set.
if r.Id == nil {
// Responses with no IDs cannot be handled.
log.Warn("Unable to process btcd response without ID")
return
}
idStr, ok := (*r.Id).(string)
if !ok {
// btcd's responses to btcwallet should (currently, see TODO above)
// only ever be sending string IDs. If ID is not a string, log the
// error and drop the message.
log.Error("Incorrect btcd notification id type.")
return
}
var routeID uint64
var origID string
n, _ := fmt.Sscanf(idStr, "btcwallet(%d)-%s", &routeID, &origID)
if n == 1 {
// Request originated from btcwallet. Run and remove correct
// handler.
replyHandlers.Lock()
f := replyHandlers.m[routeID]
replyHandlers.Unlock()
if f != nil {
go func() {
if f(r.Result, r.Error) {
replyHandlers.Lock()
delete(replyHandlers.m, routeID)
replyHandlers.Unlock()
}
}()
}
} else if n == 2 {
// Attempt to route btcd reply to correct frontend.
replyRouter.Lock()
c := replyRouter.m[routeID]
if c != nil {
delete(replyRouter.m, routeID)
} else {
// Can't route to a frontend, drop reply.
replyRouter.Unlock()
log.Info("Unable to route btcd reply to frontend. Dropping.")
return
}
replyRouter.Unlock()
// Convert string back to number if possible.
var origIDNum float64
n, _ := fmt.Sscanf(origID, "%f", &origIDNum)
var id interface{}
if n == 1 {
id = origIDNum
} else {
id = origID
}
r.Id = &id
b, err := json.Marshal(r)
if err != nil {
log.Error("Error marshalling btcd reply. Dropping.")
return
}
c <- b
}
}
// NotifyNewBlockChainHeight notifies all frontends of a new // NotifyNewBlockChainHeight notifies all frontends of a new
// blockchain height. This sends the same notification as // blockchain height. This sends the same notification as
// btcd, so this can probably be removed. // btcd, so this can probably be removed.
@ -608,110 +407,6 @@ func NotifyNewBlockChainHeight(reply chan []byte, bs wallet.BlockStamp) {
reply <- mntfn reply <- mntfn
} }
// NtfnBlockConnected handles btcd notifications resulting from newly
// connected blocks to the main blockchain.
//
// TODO(jrick): Send block time with notification. This will be used
// to mark wallet files with a possibly-better earliest block height,
// and will greatly reduce rescan times for wallets created with an
// out of sync btcd.
func NtfnBlockConnected(n btcjson.Cmd, marshaled []byte) {
bcn, ok := n.(*btcws.BlockConnectedNtfn)
if !ok {
log.Errorf("%v handler: unexpected type", n.Method())
return
}
hash, err := btcwire.NewShaHashFromStr(bcn.Hash)
if err != nil {
log.Errorf("%v handler: invalid hash string", n.Method())
return
}
// Update the blockstamp for the newly-connected block.
bs := &wallet.BlockStamp{
Height: bcn.Height,
Hash: *hash,
}
curBlock.Lock()
curBlock.BlockStamp = *bs
curBlock.Unlock()
// btcd notifies btcwallet about transactions first, and then sends
// the new block notification. New balance notifications for txs
// in blocks are therefore sent here after all tx notifications
// have arrived and finished being processed by the handlers.
workers := NotifyBalanceRequest{
block: *hash,
wg: make(chan *sync.WaitGroup),
}
NotifyBalanceSyncerChans.access <- workers
if wg := <-workers.wg; wg != nil {
wg.Wait()
NotifyBalanceSyncerChans.remove <- *hash
}
accountstore.BlockNotify(bs)
// Pass notification to frontends too.
frontendNotificationMaster <- marshaled
}
// NtfnBlockDisconnected handles btcd notifications resulting from
// blocks disconnected from the main chain in the event of a chain
// switch and notifies frontends of the new blockchain height.
func NtfnBlockDisconnected(n btcjson.Cmd, marshaled []byte) {
bdn, ok := n.(*btcws.BlockDisconnectedNtfn)
if !ok {
log.Errorf("%v handler: unexpected type", n.Method())
return
}
hash, err := btcwire.NewShaHashFromStr(bdn.Hash)
if err != nil {
log.Errorf("%v handler: invalid hash string", n.Method())
return
}
// Rollback Utxo and Tx data stores.
go func() {
accountstore.Rollback(bdn.Height, hash)
}()
// Pass notification to frontends too.
frontendNotificationMaster <- marshaled
}
// NtfnTxMined handles btcd notifications resulting from newly
// mined transactions that originated from this wallet.
func NtfnTxMined(n btcjson.Cmd, marshaled []byte) {
tmn, ok := n.(*btcws.TxMinedNtfn)
if !ok {
log.Errorf("%v handler: unexpected type", n.Method())
return
}
txid, err := btcwire.NewShaHashFromStr(tmn.TxID)
if err != nil {
log.Errorf("%v handler: invalid hash string", n.Method())
return
}
blockhash, err := btcwire.NewShaHashFromStr(tmn.BlockHash)
if err != nil {
log.Errorf("%v handler: invalid block hash string", n.Method())
return
}
err = accountstore.RecordMinedTx(txid, blockhash,
tmn.BlockHeight, tmn.Index, tmn.BlockTime)
if err != nil {
log.Errorf("%v handler: %v", n.Method(), err)
return
}
// Remove mined transaction from pool.
UnminedTxs.Lock()
delete(UnminedTxs.m, TXID(*txid))
UnminedTxs.Unlock()
}
var duplicateOnce sync.Once var duplicateOnce sync.Once
// Start starts a HTTP server to provide standard RPC and extension // Start starts a HTTP server to provide standard RPC and extension
@ -776,17 +471,15 @@ func (s *server) checkAuth(r *http.Request) error {
return nil return nil
} }
// BtcdConnect connects to a running btcd instance over a websocket // BtcdWS opens a websocket connection to a btcd instance.
// for sending and receiving chain-related messages, failing if the func BtcdWS(certificates []byte) (*websocket.Conn, error) {
// connection cannot be established or is lost.
func BtcdConnect(certificates []byte, reply chan error) {
url := fmt.Sprintf("wss://%s/wallet", cfg.Connect) url := fmt.Sprintf("wss://%s/wallet", cfg.Connect)
config, err := websocket.NewConfig(url, "https://localhost/") config, err := websocket.NewConfig(url, "https://localhost/")
if err != nil { if err != nil {
reply <- ErrConnRefused return nil, err
return
} }
// btcd uses a self-signed TLS certifiate which is used as the CA.
pool := x509.NewCertPool() pool := x509.NewCertPool()
pool.AppendCertsFromPEM(certificates) pool.AppendCertsFromPEM(certificates)
config.TlsConfig = &tls.Config{ config.TlsConfig = &tls.Config{
@ -794,14 +487,13 @@ func BtcdConnect(certificates []byte, reply chan error) {
MinVersion: tls.VersionTLS12, MinVersion: tls.VersionTLS12,
} }
// btcd requires basic authorization, so we use a custom config with // btcd requires basic authorization, so set the Authorization header.
// the Authorization header set.
login := cfg.Username + ":" + cfg.Password login := cfg.Username + ":" + cfg.Password
auth := "Basic " + base64.StdEncoding.EncodeToString([]byte(login)) auth := "Basic " + base64.StdEncoding.EncodeToString([]byte(login))
config.Header.Add("Authorization", auth) config.Header.Add("Authorization", auth)
// Attempt to connect to running btcd instance. Bail if it fails. // Dial connection.
var btcdws *websocket.Conn var ws *websocket.Conn
var cerr error var cerr error
if cfg.Proxy != "" { if cfg.Proxy != "" {
proxy := &socks.Proxy{ proxy := &socks.Proxy{
@ -811,105 +503,66 @@ func BtcdConnect(certificates []byte, reply chan error) {
} }
conn, err := proxy.Dial("tcp", cfg.Connect) conn, err := proxy.Dial("tcp", cfg.Connect)
if err != nil { if err != nil {
log.Warnf("Error connecting to proxy: %v", err) return nil, err
reply <- ErrConnRefused
return
} }
tlsConn := tls.Client(conn, config.TlsConfig) tlsConn := tls.Client(conn, config.TlsConfig)
btcdws, cerr = websocket.NewClient(config, tlsConn) ws, cerr = websocket.NewClient(config, tlsConn)
} else { } else {
btcdws, cerr = websocket.DialConfig(config) ws, cerr = websocket.DialConfig(config)
} }
if cerr != nil { if cerr != nil {
log.Errorf("%s", cerr) return nil, cerr
reply <- ErrConnRefused
return
} }
reply <- nil return ws, nil
// Remove all reply handlers (if any exist from an old connection).
replyHandlers.Lock()
for k := range replyHandlers.m {
delete(replyHandlers.m, k)
}
replyHandlers.Unlock()
done := make(chan struct{})
BtcdHandler(btcdws, done)
if err := BtcdHandshake(btcdws); err != nil {
log.Errorf("%v", err)
reply <- ErrConnRefused
return
}
// done is closed when BtcdHandler's goroutines are finished.
<-done
reply <- ErrConnLost
} }
// resendUnminedTxs resends any transactions in the unmined // BtcdConnect connects to a running btcd instance over a websocket
// transaction pool to btcd using the 'sendrawtransaction' RPC // for sending and receiving chain-related messages, failing if the
// command. // connection cannot be established or is lost.
func BtcdConnect(certificates []byte) (*BtcdRPCConn, error) {
// Open websocket connection.
ws, err := BtcdWS(certificates)
if err != nil {
log.Errorf("Cannot open websocket connection to btcd: %v", err)
return nil, err
}
// Create and start RPC connection using the btcd websocket.
rpc := NewBtcdRPCConn(ws)
rpc.Start()
return rpc, nil
}
// resendUnminedTxs resends any transactions in the unmined transaction
// pool to btcd using the 'sendrawtransaction' RPC command.
func resendUnminedTxs() { func resendUnminedTxs() {
for _, createdTx := range UnminedTxs.m { for _, createdTx := range UnminedTxs.m {
n := <-NewJSONID hextx := hex.EncodeToString(createdTx.rawTx)
var id interface{} = fmt.Sprintf("btcwallet(%v)", n) if txid, err := SendRawTransaction(CurrentRPCConn(), hextx); err != nil {
m, err := btcjson.CreateMessageWithId("sendrawtransaction", id, string(createdTx.rawTx)) // TODO(jrick): Check error for if this tx is a double spend,
if err != nil { // remove it if so.
log.Errorf("cannot create resend request: %v", err) } else {
continue log.Debugf("Resent unmined transaction %v", txid)
} }
replyHandlers.Lock()
replyHandlers.m[n] = func(result interface{}, err *btcjson.Error) bool {
// Do nothing, just remove the handler.
return true
}
replyHandlers.Unlock()
btcdMsgs <- m
} }
} }
// BtcdHandshake first checks that the websocket connection between // Handshake first checks that the websocket connection between btcwallet and
// btcwallet and btcd is valid, that is, that there are no mismatching // btcd is valid, that is, that there are no mismatching settings between
// settings between the two processes (such as running on different // the two processes (such as running on different Bitcoin networks). If the
// Bitcoin networks). If the sanity checks pass, all wallets are set to // sanity checks pass, all wallets are set to be tracked against chain
// be tracked against chain notifications from this btcd connection. // notifications from this btcd connection.
// //
// TODO(jrick): Track and Rescan commands should be replaced with a // TODO(jrick): Track and Rescan commands should be replaced with a
// single TrackSince function (or similar) which requests address // single TrackSince function (or similar) which requests address
// notifications and performs the rescan since some block height. // notifications and performs the rescan since some block height.
func BtcdHandshake(ws *websocket.Conn) error { func Handshake(rpc RPCConn) error {
n := <-NewJSONID net, jsonErr := GetCurrentNet(rpc)
var cmd btcjson.Cmd if jsonErr != nil {
cmd = btcws.NewGetCurrentNetCmd(fmt.Sprintf("btcwallet(%v)", n)) return jsonErr
mcmd, err := cmd.MarshalJSON()
if err != nil {
return fmt.Errorf("cannot complete btcd handshake: %v", err)
} }
if net != cfg.Net() {
correctNetwork := make(chan bool)
replyHandlers.Lock()
replyHandlers.m[n] = func(result interface{}, err *btcjson.Error) bool {
fnet, ok := result.(float64)
if !ok {
log.Error("btcd handshake: result is not a number")
correctNetwork <- false
return true
}
correctNetwork <- btcwire.BitcoinNet(fnet) == cfg.Net()
// No additional replies expected, remove handler.
return true
}
replyHandlers.Unlock()
btcdMsgs <- mcmd
if !<-correctNetwork {
return errors.New("btcd and btcwallet running on different Bitcoin networks") return errors.New("btcd and btcwallet running on different Bitcoin networks")
} }
@ -928,7 +581,7 @@ func BtcdHandshake(ws *websocket.Conn) error {
// track recently-seen blocks. // track recently-seen blocks.
a, err := accountstore.Account("") a, err := accountstore.Account("")
if err != nil { if err != nil {
return nil return err
} }
// TODO(jrick): if height is less than the earliest-saved block // TODO(jrick): if height is less than the earliest-saved block
@ -942,27 +595,8 @@ func BtcdHandshake(ws *websocket.Conn) error {
log.Debugf("Checking for previous saved block with height %v hash %v", log.Debugf("Checking for previous saved block with height %v hash %v",
bs.Height, bs.Hash) bs.Height, bs.Hash)
n = <-NewJSONID _, err := GetBlock(rpc, bs.Hash.String())
// NewGetBlockCmd can't fail, so don't check error. if err != nil {
// TODO(jrick): probably want to remove the error return value.
cmd, _ = btcjson.NewGetBlockCmd(fmt.Sprintf("btcwallet(%v)", n),
bs.Hash.String())
mcmd, _ = cmd.MarshalJSON()
blockMissing := make(chan bool)
replyHandlers.Lock()
replyHandlers.m[n] = func(result interface{}, err *btcjson.Error) bool {
blockMissing <- err != nil && err.Code == btcjson.ErrBlockNotFound.Code
// No additional replies expected, remove handler.
return true
}
replyHandlers.Unlock()
btcdMsgs <- mcmd
if <-blockMissing {
continue continue
} }