parent
7ec4e96c6b
commit
ec92578194
9 changed files with 1468 additions and 1997 deletions
108
account.go
108
account.go
|
@ -19,7 +19,6 @@ package main
|
|||
import (
|
||||
"bytes"
|
||||
"encoding/base64"
|
||||
"encoding/hex"
|
||||
"fmt"
|
||||
"github.com/conformal/btcjson"
|
||||
"github.com/conformal/btcutil"
|
||||
|
@ -43,7 +42,7 @@ type Account struct {
|
|||
func (a *Account) Lock() error {
|
||||
switch err := a.Wallet.Lock(); err {
|
||||
case nil:
|
||||
NotifyWalletLockStateChange(a.Name(), true)
|
||||
server.NotifyWalletLockStateChange(a.Name(), true)
|
||||
return nil
|
||||
|
||||
case wallet.ErrWalletLocked:
|
||||
|
@ -61,7 +60,7 @@ func (a *Account) Unlock(passphrase []byte) error {
|
|||
return err
|
||||
}
|
||||
|
||||
NotifyWalletLockStateChange(a.Name(), false)
|
||||
server.NotifyWalletLockStateChange(a.Name(), false)
|
||||
return nil
|
||||
}
|
||||
|
||||
|
@ -436,15 +435,24 @@ func (a *Account) exportBase64() (map[string]string, error) {
|
|||
// Track requests btcd to send notifications of new transactions for
|
||||
// each address stored in a wallet.
|
||||
func (a *Account) Track() {
|
||||
// Request notifications for transactions sending to all wallet
|
||||
// addresses.
|
||||
addrs := a.ActiveAddresses()
|
||||
addrstrs := make([]string, 0, len(addrs))
|
||||
for addr := range addrs {
|
||||
addrstrs = append(addrstrs, addr.EncodeAddress())
|
||||
client, err := accessClient()
|
||||
if err != nil {
|
||||
log.Errorf("No chain server client to track addresses.")
|
||||
return
|
||||
}
|
||||
|
||||
if err := NotifyReceived(CurrentServerConn(), addrstrs); err != nil {
|
||||
// Request notifications for transactions sending to all wallet
|
||||
// addresses.
|
||||
//
|
||||
// TODO: return as slice? (doesn't have to be ordered, or
|
||||
// SortedActiveAddresses would be fine.)
|
||||
addrMap := a.ActiveAddresses()
|
||||
addrs := make([]btcutil.Address, 0, len(addrMap))
|
||||
for addr := range addrMap {
|
||||
addrs = append(addrs, addr)
|
||||
}
|
||||
|
||||
if err := client.NotifyReceived(addrs); err != nil {
|
||||
log.Error("Unable to request transaction updates for address.")
|
||||
}
|
||||
|
||||
|
@ -492,25 +500,23 @@ func (a *Account) RescanActiveJob() (*RescanJob, error) {
|
|||
// credits that are not known to have been mined into a block, and attempts
|
||||
// to send each to the chain server for relay.
|
||||
func (a *Account) ResendUnminedTxs() {
|
||||
txs := a.TxStore.UnminedDebitTxs()
|
||||
txBuf := bytes.Buffer{}
|
||||
for _, tx := range txs {
|
||||
if err := tx.MsgTx().Serialize(&txBuf); err != nil {
|
||||
// Writing to a bytes.Buffer panics for OOM, and should
|
||||
// not return any other errors.
|
||||
panic(err)
|
||||
client, err := accessClient()
|
||||
if err != nil {
|
||||
log.Errorf("No chain server client to resend txs.")
|
||||
return
|
||||
}
|
||||
hextx := hex.EncodeToString(txBuf.Bytes())
|
||||
txsha, err := SendRawTransaction(CurrentServerConn(), hextx)
|
||||
|
||||
txs := a.TxStore.UnminedDebitTxs()
|
||||
for _, tx := range txs {
|
||||
txsha, err := client.SendRawTransaction(tx.MsgTx(), false)
|
||||
if err != nil {
|
||||
// TODO(jrick): Check error for if this tx is a double spend,
|
||||
// remove it if so.
|
||||
log.Warnf("Could not resend transaction %v: %v",
|
||||
txsha, err)
|
||||
} else {
|
||||
log.Debugf("Resent unmined transaction %v", txsha)
|
||||
continue
|
||||
}
|
||||
txBuf.Reset()
|
||||
log.Debugf("Resent unmined transaction %v", txsha)
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -564,7 +570,13 @@ func (a *Account) NewAddress() (btcutil.Address, error) {
|
|||
AcctMgr.MarkAddressForAccount(addr, a)
|
||||
|
||||
// Request updates from btcd for new transactions sent to this address.
|
||||
a.ReqNewTxsForAddress(addr)
|
||||
client, err := accessClient()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if err := client.NotifyReceived([]btcutil.Address{addr}); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return addr, nil
|
||||
}
|
||||
|
@ -593,7 +605,13 @@ func (a *Account) NewChangeAddress() (btcutil.Address, error) {
|
|||
AcctMgr.MarkAddressForAccount(addr, a)
|
||||
|
||||
// Request updates from btcd for new transactions sent to this address.
|
||||
a.ReqNewTxsForAddress(addr)
|
||||
client, err := accessClient()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if err := client.NotifyReceived([]btcutil.Address{addr}); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return addr, nil
|
||||
}
|
||||
|
@ -612,41 +630,25 @@ func (a *Account) RecoverAddresses(n int) error {
|
|||
if err != nil {
|
||||
return err
|
||||
}
|
||||
addrStrs := make([]string, 0, len(addrs))
|
||||
for i := range addrs {
|
||||
addrStrs = append(addrStrs, addrs[i].EncodeAddress())
|
||||
}
|
||||
|
||||
// Run a goroutine to rescan blockchain for recovered addresses.
|
||||
go func(addrs []string) {
|
||||
err := Rescan(CurrentServerConn(), lastInfo.FirstBlock(),
|
||||
addrs, nil)
|
||||
go func() {
|
||||
client, err := accessClient()
|
||||
if err != nil {
|
||||
log.Errorf("Cannot access chain server client to " +
|
||||
"rescan recovered addresses.")
|
||||
return
|
||||
}
|
||||
err = client.Rescan(lastInfo.FirstBlock(), addrs, nil)
|
||||
if err != nil {
|
||||
log.Errorf("Rescanning for recovered addresses "+
|
||||
"failed: %v", err)
|
||||
}
|
||||
}(addrStrs)
|
||||
}()
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// ReqNewTxsForAddress sends a message to btcd to request tx updates
|
||||
// for addr for each new block that is added to the blockchain.
|
||||
func (a *Account) ReqNewTxsForAddress(addr btcutil.Address) {
|
||||
// Only support P2PKH addresses currently.
|
||||
apkh, ok := addr.(*btcutil.AddressPubKeyHash)
|
||||
if !ok {
|
||||
return
|
||||
}
|
||||
|
||||
log.Debugf("Requesting notifications of TXs sending to address %v", apkh)
|
||||
|
||||
err := NotifyReceived(CurrentServerConn(), []string{apkh.EncodeAddress()})
|
||||
if err != nil {
|
||||
log.Error("Unable to request transaction updates for address.")
|
||||
}
|
||||
}
|
||||
|
||||
// ReqSpentUtxoNtfns sends a message to btcd to request updates for when
|
||||
// a stored UTXO has been spent.
|
||||
func ReqSpentUtxoNtfns(credits []*txstore.Credit) {
|
||||
|
@ -658,7 +660,13 @@ func ReqSpentUtxoNtfns(credits []*txstore.Credit) {
|
|||
ops = append(ops, op)
|
||||
}
|
||||
|
||||
if err := NotifySpent(CurrentServerConn(), ops); err != nil {
|
||||
client, err := accessClient()
|
||||
if err != nil {
|
||||
log.Errorf("Cannot access chain server client to " +
|
||||
"request spent output notifications.")
|
||||
return
|
||||
}
|
||||
if err := client.NotifySpent(ops); err != nil {
|
||||
log.Errorf("Cannot request notifications for spent outputs: %v",
|
||||
err)
|
||||
}
|
||||
|
|
20
acctmgr.go
20
acctmgr.go
|
@ -496,6 +496,10 @@ func (am *AccountManager) rescanListener() {
|
|||
|
||||
noun := pickNoun(n, "address", "addresses")
|
||||
log.Infof("Finished rescan for %d %s", n, noun)
|
||||
|
||||
default:
|
||||
// Unexpected rescan message type.
|
||||
panic(e)
|
||||
}
|
||||
AcctMgr.Release()
|
||||
}
|
||||
|
@ -636,9 +640,8 @@ func (am *AccountManager) BlockNotify(bs *wallet.BlockStamp) {
|
|||
// changes, or sending these notifications as the utxos are added.
|
||||
confirmed := a.CalculateBalance(1)
|
||||
unconfirmed := a.CalculateBalance(0) - confirmed
|
||||
NotifyWalletBalance(allClients, a.name, confirmed)
|
||||
NotifyWalletBalanceUnconfirmed(allClients, a.name,
|
||||
unconfirmed)
|
||||
server.NotifyWalletBalance(a.name, confirmed)
|
||||
server.NotifyWalletBalanceUnconfirmed(a.name, unconfirmed)
|
||||
|
||||
// If this is the default account, update the block all accounts
|
||||
// are synced with, and schedule a wallet write.
|
||||
|
@ -824,17 +827,6 @@ func (am *AccountManager) DumpWIFPrivateKey(addr btcutil.Address) (string, error
|
|||
return a.DumpWIFPrivateKey(addr)
|
||||
}
|
||||
|
||||
// NotifyBalances notifies a wallet frontend of all confirmed and unconfirmed
|
||||
// account balances.
|
||||
func (am *AccountManager) NotifyBalances(frontend chan []byte) {
|
||||
for _, a := range am.AllAccounts() {
|
||||
balance := a.CalculateBalance(1)
|
||||
unconfirmed := a.CalculateBalance(0) - balance
|
||||
NotifyWalletBalance(frontend, a.name, balance)
|
||||
NotifyWalletBalanceUnconfirmed(frontend, a.name, unconfirmed)
|
||||
}
|
||||
}
|
||||
|
||||
// ListAccounts returns a map of account names to their current account
|
||||
// balances. The balances are calculated using minconf confirmations.
|
||||
func (am *AccountManager) ListAccounts(minconf int) map[string]float64 {
|
||||
|
|
184
cmd.go
184
cmd.go
|
@ -17,10 +17,7 @@
|
|||
package main
|
||||
|
||||
import (
|
||||
"github.com/conformal/btcjson"
|
||||
"github.com/conformal/btcutil"
|
||||
"github.com/conformal/btcwallet/wallet"
|
||||
"github.com/conformal/btcwire"
|
||||
"errors"
|
||||
"io/ioutil"
|
||||
"net"
|
||||
"net/http"
|
||||
|
@ -28,10 +25,15 @@ import (
|
|||
"os"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/conformal/btcutil"
|
||||
"github.com/conformal/btcwallet/wallet"
|
||||
"github.com/conformal/btcwire"
|
||||
)
|
||||
|
||||
var (
|
||||
cfg *config
|
||||
server *rpcServer
|
||||
|
||||
curBlock = struct {
|
||||
sync.RWMutex
|
||||
|
@ -54,7 +56,12 @@ func GetCurBlock() (wallet.BlockStamp, error) {
|
|||
return bs, nil
|
||||
}
|
||||
|
||||
bb, err := GetBestBlock(CurrentServerConn())
|
||||
var bbHash *btcwire.ShaHash
|
||||
var bbHeight int32
|
||||
client, err := accessClient()
|
||||
if err == nil {
|
||||
bbHash, bbHeight, err = client.GetBestBlock()
|
||||
}
|
||||
if err != nil {
|
||||
unknown := wallet.BlockStamp{
|
||||
Height: int32(btcutil.BlockHeightUnknown),
|
||||
|
@ -62,18 +69,11 @@ func GetCurBlock() (wallet.BlockStamp, error) {
|
|||
return unknown, err
|
||||
}
|
||||
|
||||
hash, err := btcwire.NewShaHashFromStr(bb.Hash)
|
||||
if err != nil {
|
||||
return wallet.BlockStamp{
|
||||
Height: int32(btcutil.BlockHeightUnknown),
|
||||
}, err
|
||||
}
|
||||
|
||||
curBlock.Lock()
|
||||
if bb.Height > curBlock.BlockStamp.Height {
|
||||
if bbHeight > curBlock.BlockStamp.Height {
|
||||
bs = wallet.BlockStamp{
|
||||
Height: bb.Height,
|
||||
Hash: *hash,
|
||||
Height: bbHeight,
|
||||
Hash: *bbHash,
|
||||
}
|
||||
curBlock.BlockStamp = bs
|
||||
}
|
||||
|
@ -81,19 +81,49 @@ func GetCurBlock() (wallet.BlockStamp, error) {
|
|||
return bs, nil
|
||||
}
|
||||
|
||||
// NewJSONID is used to receive the next unique JSON ID for btcd
|
||||
// requests, starting from zero and incrementing by one after each
|
||||
// read.
|
||||
var NewJSONID = make(chan uint64)
|
||||
var clientAccessChan = make(chan *rpcClient)
|
||||
|
||||
// JSONIDGenerator sends incremental integers across a channel. This
|
||||
// is meant to provide a unique value for the JSON ID field for btcd
|
||||
// messages.
|
||||
func JSONIDGenerator(c chan uint64) {
|
||||
var n uint64
|
||||
func clientAccess(newClient <-chan *rpcClient) {
|
||||
var client *rpcClient
|
||||
for {
|
||||
c <- n
|
||||
n++
|
||||
select {
|
||||
case c := <-newClient:
|
||||
client = c
|
||||
case clientAccessChan <- client:
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func accessClient() (*rpcClient, error) {
|
||||
c := <-clientAccessChan
|
||||
if c == nil {
|
||||
return nil, errors.New("chain server disconnected")
|
||||
}
|
||||
return c, nil
|
||||
}
|
||||
|
||||
func clientConnect(certs []byte, newClient chan<- *rpcClient) {
|
||||
const initialWait = 5 * time.Second
|
||||
wait := initialWait
|
||||
for {
|
||||
client, err := newRPCClient(certs)
|
||||
if err != nil {
|
||||
log.Warnf("Unable to open chain server client "+
|
||||
"connection: %v", err)
|
||||
time.Sleep(wait)
|
||||
wait <<= 1
|
||||
if wait > time.Minute {
|
||||
wait = time.Minute
|
||||
}
|
||||
continue
|
||||
}
|
||||
|
||||
wait = initialWait
|
||||
|
||||
client.Start()
|
||||
newClient <- client
|
||||
|
||||
client.WaitForShutdown()
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -129,21 +159,20 @@ func main() {
|
|||
}()
|
||||
}
|
||||
|
||||
// Read CA file to verify a btcd TLS connection.
|
||||
certs, err := ioutil.ReadFile(cfg.CAFile)
|
||||
if err != nil {
|
||||
log.Errorf("cannot open CA file: %v", err)
|
||||
os.Exit(1)
|
||||
}
|
||||
|
||||
// Check and update any old file locations.
|
||||
updateOldFileLocations()
|
||||
|
||||
// Start account manager and open accounts.
|
||||
AcctMgr.Start()
|
||||
|
||||
// Read CA file to verify a btcd TLS connection.
|
||||
cafile, err := ioutil.ReadFile(cfg.CAFile)
|
||||
if err != nil {
|
||||
log.Errorf("cannot open CA file: %v", err)
|
||||
os.Exit(1)
|
||||
}
|
||||
|
||||
go func() {
|
||||
s, err := newServer(cfg.SvrListeners)
|
||||
server, err = newRPCServer(cfg.SvrListeners)
|
||||
if err != nil {
|
||||
log.Errorf("Unable to create HTTP server: %v", err)
|
||||
os.Exit(1)
|
||||
|
@ -151,15 +180,7 @@ func main() {
|
|||
|
||||
// Start HTTP server to listen and send messages to frontend and btcd
|
||||
// backend. Try reconnection if connection failed.
|
||||
s.Start()
|
||||
}()
|
||||
|
||||
// Begin generating new IDs for JSON calls.
|
||||
go JSONIDGenerator(NewJSONID)
|
||||
|
||||
// Begin RPC server goroutines.
|
||||
go RPCGateway()
|
||||
go WalletRequestProcessor()
|
||||
server.Start()
|
||||
|
||||
// Begin maintanence goroutines.
|
||||
go SendBeforeReceiveHistorySync(SendTxHistSyncChans.add,
|
||||
|
@ -173,76 +194,7 @@ func main() {
|
|||
NotifyBalanceSyncerChans.remove,
|
||||
NotifyBalanceSyncerChans.access)
|
||||
|
||||
updateBtcd := make(chan *BtcdRPCConn)
|
||||
go func() {
|
||||
// Create an RPC connection and close the closed channel.
|
||||
//
|
||||
// It might be a better idea to create a new concrete type
|
||||
// just for an always disconnected RPC connection and begin
|
||||
// with that.
|
||||
btcd := NewBtcdRPCConn(nil)
|
||||
close(btcd.closed)
|
||||
|
||||
// Maintain the current btcd connection. After reconnects,
|
||||
// the current connection should be updated.
|
||||
for {
|
||||
select {
|
||||
case conn := <-updateBtcd:
|
||||
btcd = conn
|
||||
|
||||
case access := <-accessServer:
|
||||
access.server <- btcd
|
||||
}
|
||||
}
|
||||
}()
|
||||
|
||||
for {
|
||||
btcd, err := BtcdConnect(cafile)
|
||||
if err != nil {
|
||||
log.Info("Retrying btcd connection in 5 seconds")
|
||||
time.Sleep(5 * time.Second)
|
||||
continue
|
||||
}
|
||||
updateBtcd <- btcd
|
||||
|
||||
NotifyBtcdConnection(allClients)
|
||||
log.Info("Established connection to btcd")
|
||||
|
||||
// Perform handshake.
|
||||
if err := Handshake(btcd); err != nil {
|
||||
var message string
|
||||
if jsonErr, ok := err.(btcjson.Error); ok {
|
||||
message = jsonErr.Message
|
||||
} else {
|
||||
message = err.Error()
|
||||
}
|
||||
log.Errorf("Cannot complete handshake: %v", message)
|
||||
log.Info("Retrying btcd connection in 5 seconds")
|
||||
time.Sleep(5 * time.Second)
|
||||
continue
|
||||
}
|
||||
|
||||
// Block goroutine until the connection is lost.
|
||||
<-btcd.closed
|
||||
NotifyBtcdConnection(allClients)
|
||||
log.Info("Lost btcd connection")
|
||||
}
|
||||
}
|
||||
|
||||
var accessServer = make(chan *AccessCurrentServerConn)
|
||||
|
||||
// AccessCurrentServerConn is used to access the current RPC connection
|
||||
// from the goroutine managing btcd-side RPC connections.
|
||||
type AccessCurrentServerConn struct {
|
||||
server chan ServerConn
|
||||
}
|
||||
|
||||
// CurrentServerConn returns the most recently-connected btcd-side
|
||||
// RPC connection.
|
||||
func CurrentServerConn() ServerConn {
|
||||
access := &AccessCurrentServerConn{
|
||||
server: make(chan ServerConn),
|
||||
}
|
||||
accessServer <- access
|
||||
return <-access.server
|
||||
clientChan := make(chan *rpcClient)
|
||||
go clientAccess(clientChan)
|
||||
clientConnect(certs, clientChan)
|
||||
}
|
||||
|
|
290
ntfns.go
290
ntfns.go
|
@ -1,290 +0,0 @@
|
|||
/*
|
||||
* Copyright (c) 2013, 2014 Conformal Systems LLC <info@conformal.com>
|
||||
*
|
||||
* Permission to use, copy, modify, and distribute this software for any
|
||||
* purpose with or without fee is hereby granted, provided that the above
|
||||
* copyright notice and this permission notice appear in all copies.
|
||||
*
|
||||
* THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
|
||||
* WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
|
||||
* MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
|
||||
* ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
|
||||
* WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
|
||||
* ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
|
||||
* OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
|
||||
*/
|
||||
|
||||
// This file implements the notification handlers for btcd-side notifications.
|
||||
|
||||
package main
|
||||
|
||||
import (
|
||||
"encoding/hex"
|
||||
"fmt"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/conformal/btcjson"
|
||||
"github.com/conformal/btcscript"
|
||||
"github.com/conformal/btcutil"
|
||||
"github.com/conformal/btcwallet/txstore"
|
||||
"github.com/conformal/btcwallet/wallet"
|
||||
"github.com/conformal/btcwire"
|
||||
"github.com/conformal/btcws"
|
||||
)
|
||||
|
||||
func parseBlock(block *btcws.BlockDetails) (*txstore.Block, int, error) {
|
||||
if block == nil {
|
||||
return nil, btcutil.TxIndexUnknown, nil
|
||||
}
|
||||
blksha, err := btcwire.NewShaHashFromStr(block.Hash)
|
||||
if err != nil {
|
||||
return nil, btcutil.TxIndexUnknown, err
|
||||
}
|
||||
b := &txstore.Block{
|
||||
Height: block.Height,
|
||||
Hash: *blksha,
|
||||
Time: time.Unix(block.Time, 0),
|
||||
}
|
||||
return b, block.Index, nil
|
||||
}
|
||||
|
||||
type notificationHandler func(btcjson.Cmd) error
|
||||
|
||||
var notificationHandlers = map[string]notificationHandler{
|
||||
btcws.BlockConnectedNtfnMethod: NtfnBlockConnected,
|
||||
btcws.BlockDisconnectedNtfnMethod: NtfnBlockDisconnected,
|
||||
btcws.RecvTxNtfnMethod: NtfnRecvTx,
|
||||
btcws.RedeemingTxNtfnMethod: NtfnRedeemingTx,
|
||||
btcws.RescanProgressNtfnMethod: NtfnRescanProgress,
|
||||
}
|
||||
|
||||
// NtfnRecvTx handles the btcws.RecvTxNtfn notification.
|
||||
func NtfnRecvTx(n btcjson.Cmd) error {
|
||||
rtx, ok := n.(*btcws.RecvTxNtfn)
|
||||
if !ok {
|
||||
return fmt.Errorf("%v handler: unexpected type", n.Method())
|
||||
}
|
||||
|
||||
bs, err := GetCurBlock()
|
||||
if err != nil {
|
||||
return fmt.Errorf("%v handler: cannot get current block: %v", n.Method(), err)
|
||||
}
|
||||
|
||||
rawTx, err := hex.DecodeString(rtx.HexTx)
|
||||
if err != nil {
|
||||
return fmt.Errorf("%v handler: bad hexstring: %v", n.Method(), err)
|
||||
}
|
||||
tx, err := btcutil.NewTxFromBytes(rawTx)
|
||||
if err != nil {
|
||||
return fmt.Errorf("%v handler: bad transaction bytes: %v", n.Method(), err)
|
||||
}
|
||||
|
||||
block, txIdx, err := parseBlock(rtx.Block)
|
||||
if err != nil {
|
||||
return fmt.Errorf("%v handler: bad block: %v", n.Method(), err)
|
||||
}
|
||||
tx.SetIndex(txIdx)
|
||||
|
||||
// For transactions originating from this wallet, the sent tx history should
|
||||
// be recorded before the received history. If wallet created this tx, wait
|
||||
// for the sent history to finish being recorded before continuing.
|
||||
//
|
||||
// TODO(jrick) this is wrong due to tx malleability. Cannot safely use the
|
||||
// txsha as an identifier.
|
||||
req := SendTxHistSyncRequest{
|
||||
txsha: *tx.Sha(),
|
||||
response: make(chan SendTxHistSyncResponse),
|
||||
}
|
||||
SendTxHistSyncChans.access <- req
|
||||
resp := <-req.response
|
||||
if resp.ok {
|
||||
// Wait until send history has been recorded.
|
||||
<-resp.c
|
||||
SendTxHistSyncChans.remove <- *tx.Sha()
|
||||
}
|
||||
|
||||
// For every output, find all accounts handling that output address (if any)
|
||||
// and record the received txout.
|
||||
for outIdx, txout := range tx.MsgTx().TxOut {
|
||||
var accounts []*Account
|
||||
// Errors don't matter here. If addrs is nil, the range below
|
||||
// does nothing.
|
||||
_, addrs, _, _ := btcscript.ExtractPkScriptAddrs(txout.PkScript,
|
||||
activeNet.Params)
|
||||
for _, addr := range addrs {
|
||||
a, err := AcctMgr.AccountByAddress(addr)
|
||||
if err != nil {
|
||||
continue
|
||||
}
|
||||
accounts = append(accounts, a)
|
||||
}
|
||||
|
||||
for _, a := range accounts {
|
||||
txr, err := a.TxStore.InsertTx(tx, block)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
cred, err := txr.AddCredit(uint32(outIdx), false)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
AcctMgr.ds.ScheduleTxStoreWrite(a)
|
||||
|
||||
// Notify frontends of tx. If the tx is unconfirmed, it is always
|
||||
// notified and the outpoint is marked as notified. If the outpoint
|
||||
// has already been notified and is now in a block, a txmined notifiction
|
||||
// should be sent once to let frontends that all previous send/recvs
|
||||
// for this unconfirmed tx are now confirmed.
|
||||
op := *cred.OutPoint()
|
||||
previouslyNotifiedReq := NotifiedRecvTxRequest{
|
||||
op: op,
|
||||
response: make(chan NotifiedRecvTxResponse),
|
||||
}
|
||||
NotifiedRecvTxChans.access <- previouslyNotifiedReq
|
||||
if <-previouslyNotifiedReq.response {
|
||||
NotifiedRecvTxChans.remove <- op
|
||||
} else {
|
||||
// Notify frontends of new recv tx and mark as notified.
|
||||
NotifiedRecvTxChans.add <- op
|
||||
|
||||
ltr, err := cred.ToJSON(a.Name(), bs.Height, a.Wallet.Net())
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
NotifyNewTxDetails(allClients, a.Name(), ltr)
|
||||
}
|
||||
|
||||
// Notify frontends of new account balance.
|
||||
confirmed := a.CalculateBalance(1)
|
||||
unconfirmed := a.CalculateBalance(0) - confirmed
|
||||
NotifyWalletBalance(allClients, a.name, confirmed)
|
||||
NotifyWalletBalanceUnconfirmed(allClients, a.name, unconfirmed)
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// NtfnBlockConnected handles btcd notifications resulting from newly
|
||||
// connected blocks to the main blockchain.
|
||||
//
|
||||
// TODO(jrick): Send block time with notification. This will be used
|
||||
// to mark wallet files with a possibly-better earliest block height,
|
||||
// and will greatly reduce rescan times for wallets created with an
|
||||
// out of sync btcd.
|
||||
func NtfnBlockConnected(n btcjson.Cmd) error {
|
||||
bcn, ok := n.(*btcws.BlockConnectedNtfn)
|
||||
if !ok {
|
||||
return fmt.Errorf("%v handler: unexpected type", n.Method())
|
||||
}
|
||||
hash, err := btcwire.NewShaHashFromStr(bcn.Hash)
|
||||
if err != nil {
|
||||
return fmt.Errorf("%v handler: invalid hash string", n.Method())
|
||||
}
|
||||
|
||||
// Update the blockstamp for the newly-connected block.
|
||||
bs := &wallet.BlockStamp{
|
||||
Height: bcn.Height,
|
||||
Hash: *hash,
|
||||
}
|
||||
curBlock.Lock()
|
||||
curBlock.BlockStamp = *bs
|
||||
curBlock.Unlock()
|
||||
|
||||
// btcd notifies btcwallet about transactions first, and then sends
|
||||
// the new block notification. New balance notifications for txs
|
||||
// in blocks are therefore sent here after all tx notifications
|
||||
// have arrived and finished being processed by the handlers.
|
||||
workers := NotifyBalanceRequest{
|
||||
block: *hash,
|
||||
wg: make(chan *sync.WaitGroup),
|
||||
}
|
||||
NotifyBalanceSyncerChans.access <- workers
|
||||
if wg := <-workers.wg; wg != nil {
|
||||
wg.Wait()
|
||||
NotifyBalanceSyncerChans.remove <- *hash
|
||||
}
|
||||
AcctMgr.BlockNotify(bs)
|
||||
|
||||
// Pass notification to frontends too.
|
||||
marshaled, err := n.MarshalJSON()
|
||||
// The parsed notification is expected to be marshalable.
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
allClients <- marshaled
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// NtfnBlockDisconnected handles btcd notifications resulting from
|
||||
// blocks disconnected from the main chain in the event of a chain
|
||||
// switch and notifies frontends of the new blockchain height.
|
||||
func NtfnBlockDisconnected(n btcjson.Cmd) error {
|
||||
bdn, ok := n.(*btcws.BlockDisconnectedNtfn)
|
||||
if !ok {
|
||||
return fmt.Errorf("%v handler: unexpected type", n.Method())
|
||||
}
|
||||
hash, err := btcwire.NewShaHashFromStr(bdn.Hash)
|
||||
if err != nil {
|
||||
return fmt.Errorf("%v handler: invalid hash string", n.Method())
|
||||
}
|
||||
|
||||
// Rollback Utxo and Tx data stores.
|
||||
if err = AcctMgr.Rollback(bdn.Height, hash); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// Pass notification to frontends too.
|
||||
marshaled, err := n.MarshalJSON()
|
||||
// A btcws.BlockDisconnectedNtfn is expected to marshal without error.
|
||||
// If it does, it indicates that one of its struct fields is of a
|
||||
// non-marshalable type.
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
allClients <- marshaled
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// NtfnRedeemingTx handles btcd redeemingtx notifications resulting from a
|
||||
// transaction spending a watched outpoint.
|
||||
func NtfnRedeemingTx(n btcjson.Cmd) error {
|
||||
cn, ok := n.(*btcws.RedeemingTxNtfn)
|
||||
if !ok {
|
||||
return fmt.Errorf("%v handler: unexpected type", n.Method())
|
||||
}
|
||||
|
||||
rawTx, err := hex.DecodeString(cn.HexTx)
|
||||
if err != nil {
|
||||
return fmt.Errorf("%v handler: bad hexstring: %v", n.Method(), err)
|
||||
}
|
||||
tx, err := btcutil.NewTxFromBytes(rawTx)
|
||||
if err != nil {
|
||||
return fmt.Errorf("%v handler: bad transaction bytes: %v", n.Method(), err)
|
||||
}
|
||||
|
||||
block, txIdx, err := parseBlock(cn.Block)
|
||||
if err != nil {
|
||||
return fmt.Errorf("%v handler: bad block: %v", n.Method(), err)
|
||||
}
|
||||
tx.SetIndex(txIdx)
|
||||
return AcctMgr.RecordSpendingTx(tx, block)
|
||||
}
|
||||
|
||||
// NtfnRescanProgress handles btcd rescanprogress notifications resulting
|
||||
// from a partially completed rescan.
|
||||
func NtfnRescanProgress(n btcjson.Cmd) error {
|
||||
cn, ok := n.(*btcws.RescanProgressNtfn)
|
||||
if !ok {
|
||||
return fmt.Errorf("%v handler: unexpected type", n.Method())
|
||||
}
|
||||
|
||||
// Notify the rescan manager of the completed partial progress for
|
||||
// the current rescan.
|
||||
AcctMgr.rm.MarkProgress(cn.LastProcessed)
|
||||
|
||||
return nil
|
||||
}
|
33
rescan.go
33
rescan.go
|
@ -140,9 +140,9 @@ func (b *rescanBatch) merge(job *RescanJob) {
|
|||
}
|
||||
}
|
||||
|
||||
// Status types for the handler.
|
||||
type rescanProgress int32
|
||||
type rescanFinished error
|
||||
type rescanFinished struct {
|
||||
error
|
||||
}
|
||||
|
||||
// jobHandler runs the RescanManager's for-select loop to manage rescan jobs
|
||||
// and dispatch requests.
|
||||
|
@ -190,7 +190,7 @@ func (m *RescanManager) jobHandler() {
|
|||
if m.msgs != nil {
|
||||
m.msgs <- &RescanFinishedMsg{
|
||||
Addresses: curBatch.addrs,
|
||||
Error: error(s),
|
||||
Error: s.error,
|
||||
}
|
||||
}
|
||||
curBatch.done()
|
||||
|
@ -204,6 +204,10 @@ func (m *RescanManager) jobHandler() {
|
|||
m.msgs <- (*RescanStartedMsg)(job)
|
||||
}
|
||||
}
|
||||
|
||||
default:
|
||||
// Unexpected status message
|
||||
panic(s)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -214,16 +218,17 @@ func (m *RescanManager) jobHandler() {
|
|||
// The jobHandler is notified when the processing the rescan finishes.
|
||||
func (m *RescanManager) rpcHandler() {
|
||||
for job := range m.sendJob {
|
||||
var addrStrs []string
|
||||
for _, addrs := range job.Addresses {
|
||||
for i := range addrs {
|
||||
addrStrs = append(addrStrs, addrs[i].EncodeAddress())
|
||||
var addrs []btcutil.Address
|
||||
for _, accountAddrs := range job.Addresses {
|
||||
addrs = append(addrs, accountAddrs...)
|
||||
}
|
||||
client, err := accessClient()
|
||||
if err != nil {
|
||||
m.status <- rescanFinished{err}
|
||||
return
|
||||
}
|
||||
|
||||
c := CurrentServerConn()
|
||||
err := Rescan(c, job.StartHeight, addrStrs, job.OutPoints)
|
||||
m.status <- rescanFinished(err)
|
||||
err = client.Rescan(job.StartHeight, addrs, job.OutPoints)
|
||||
m.status <- rescanFinished{err}
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -260,6 +265,6 @@ func (m *RescanManager) SubmitJob(job *RescanJob) <-chan struct{} {
|
|||
|
||||
// MarkProgress messages the RescanManager with the height of the block
|
||||
// last processed by a running rescan.
|
||||
func (m *RescanManager) MarkProgress(height int32) {
|
||||
m.status <- rescanProgress(height)
|
||||
func (m *RescanManager) MarkProgress(height rescanProgress) {
|
||||
m.status <- height
|
||||
}
|
||||
|
|
101
rpc.go
101
rpc.go
|
@ -1,101 +0,0 @@
|
|||
/*
|
||||
* Copyright (c) 2013, 2014 Conformal Systems LLC <info@conformal.com>
|
||||
*
|
||||
* Permission to use, copy, modify, and distribute this software for any
|
||||
* purpose with or without fee is hereby granted, provided that the above
|
||||
* copyright notice and this permission notice appear in all copies.
|
||||
*
|
||||
* THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
|
||||
* WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
|
||||
* MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
|
||||
* ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
|
||||
* WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
|
||||
* ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
|
||||
* OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
|
||||
*/
|
||||
|
||||
package main
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
"github.com/conformal/btcjson"
|
||||
)
|
||||
|
||||
// RawRPCResponse is a response to a JSON-RPC request with delayed
|
||||
// unmarshaling.
|
||||
type RawRPCResponse struct {
|
||||
Id *uint64
|
||||
Result *json.RawMessage `json:"result"`
|
||||
Error *json.RawMessage `json:"error"`
|
||||
}
|
||||
|
||||
// FinishUnmarshal unmarshals the result and error of a raw RPC response.
|
||||
// If v is non-nil, the result is unmarshaled into the variable pointed
|
||||
// to by the interface rather than using the rules in the encoding/json
|
||||
// package to allocate a new variable for the result. The final result
|
||||
// and JSON-RPC error is returned.
|
||||
//
|
||||
// If the returned error is non-nil, it will be a btcjson.Error.
|
||||
func (r *RawRPCResponse) FinishUnmarshal(v interface{}) (interface{}, error) {
|
||||
// JSON-RPC spec makes this handling easier-ish because both result and
|
||||
// error cannot be non-nil.
|
||||
if r.Error != nil {
|
||||
var jsonErr btcjson.Error
|
||||
if err := json.Unmarshal([]byte(*r.Error), &jsonErr); err != nil {
|
||||
return nil, btcjson.Error{
|
||||
Code: btcjson.ErrParse.Code,
|
||||
Message: err.Error(),
|
||||
}
|
||||
}
|
||||
return nil, jsonErr
|
||||
}
|
||||
if r.Result != nil {
|
||||
if err := json.Unmarshal([]byte(*r.Result), &v); err != nil {
|
||||
return nil, btcjson.Error{
|
||||
Code: btcjson.ErrParse.Code,
|
||||
Message: err.Error(),
|
||||
}
|
||||
}
|
||||
return v, nil
|
||||
}
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
// ClientRequest is a type holding a bitcoin client's request and
|
||||
// a channel to send the response.
|
||||
type ClientRequest struct {
|
||||
ws bool
|
||||
request btcjson.Cmd
|
||||
response chan RawRPCResponse
|
||||
}
|
||||
|
||||
// NewClientRequest creates a new ClientRequest from a btcjson.Cmd.
|
||||
func NewClientRequest(request btcjson.Cmd, ws bool) *ClientRequest {
|
||||
return &ClientRequest{
|
||||
ws: ws,
|
||||
request: request,
|
||||
response: make(chan RawRPCResponse),
|
||||
}
|
||||
}
|
||||
|
||||
// Handle sends a client request to the RPC gateway for processing,
|
||||
// and returns the result when handling is finished.
|
||||
func (r *ClientRequest) Handle() RawRPCResponse {
|
||||
clientRequests <- r
|
||||
return <-r.response
|
||||
}
|
||||
|
||||
// ServerRequest is a type responsible for handling requests to a bitcoin
|
||||
// server and providing a method to access the response.
|
||||
type ServerRequest struct {
|
||||
request btcjson.Cmd
|
||||
response chan RawRPCResponse
|
||||
}
|
||||
|
||||
// NewServerRequest creates a new ServerRequest from a btcjson.Cmd.
|
||||
func NewServerRequest(request btcjson.Cmd) *ServerRequest {
|
||||
return &ServerRequest{
|
||||
request: request,
|
||||
response: make(chan RawRPCResponse, 1),
|
||||
}
|
||||
}
|
789
rpcclient.go
789
rpcclient.go
|
@ -14,405 +14,490 @@
|
|||
* OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
|
||||
*/
|
||||
|
||||
// This file implements the websocket client connection to a bitcoin RPC
|
||||
// server.
|
||||
|
||||
package main
|
||||
|
||||
import (
|
||||
"code.google.com/p/go.net/websocket"
|
||||
"encoding/hex"
|
||||
"encoding/json"
|
||||
"errors"
|
||||
"github.com/conformal/btcjson"
|
||||
"fmt"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/conformal/btcrpcclient"
|
||||
"github.com/conformal/btcscript"
|
||||
"github.com/conformal/btcutil"
|
||||
"github.com/conformal/btcwallet/txstore"
|
||||
"github.com/conformal/btcwallet/wallet"
|
||||
"github.com/conformal/btcwire"
|
||||
"github.com/conformal/btcws"
|
||||
"io"
|
||||
)
|
||||
|
||||
// ServerConn is an interface representing a client connection to a bitcoin RPC
|
||||
// server.
|
||||
type ServerConn interface {
|
||||
// SendRequest sends a bitcoin RPC request, returning a channel to
|
||||
// read the reply. A channel is used so both synchronous and
|
||||
// asynchronous RPC can be supported.
|
||||
SendRequest(request *ServerRequest) chan RawRPCResponse
|
||||
// InvalidNotificationError describes an error due to an invalid chain server
|
||||
// notification and should be warned by wallet, but does not indicate an
|
||||
// problem with the current wallet state.
|
||||
type InvalidNotificationError struct {
|
||||
error
|
||||
}
|
||||
|
||||
// ErrBtcdDisconnected describes an error where an operation cannot
|
||||
// successfully complete due to btcwallet not being connected to
|
||||
// btcd.
|
||||
var ErrBtcdDisconnected = btcjson.Error{
|
||||
Code: -1,
|
||||
Message: "btcd disconnected",
|
||||
var (
|
||||
// MismatchingNetworks represents an error where a client connection
|
||||
// to btcd cannot succeed due to btcwallet and btcd operating on
|
||||
// different bitcoin networks.
|
||||
ErrMismatchedNets = errors.New("mismatched networks")
|
||||
)
|
||||
|
||||
const (
|
||||
// maxConcurrentClientRequests is the maximum number of
|
||||
// unhandled/running requests that the server will run for a websocket
|
||||
// client at a time. Beyond this limit, additional request reads will
|
||||
// block until a running request handler finishes. This limit exists to
|
||||
// prevent a single connection from causing a denial of service attack
|
||||
// with an unnecessarily large number of requests.
|
||||
maxConcurrentClientRequests = 20
|
||||
|
||||
// maxUnhandledNotifications is the maximum number of still marshaled
|
||||
// and unhandled notifications. If this limit is reached, the
|
||||
// btcrpcclient client notification handlers will begin blocking until
|
||||
// an unhandled notification is processed.
|
||||
maxUnhandledNotifications = 50
|
||||
)
|
||||
|
||||
type notificationChan chan notification
|
||||
|
||||
type blockSummary struct {
|
||||
hash *btcwire.ShaHash
|
||||
height int32
|
||||
}
|
||||
|
||||
// ErrBtcdDisconnectedRaw is the raw JSON encoding of ErrBtcdDisconnected.
|
||||
var ErrBtcdDisconnectedRaw = json.RawMessage(`{"code":-1,"message":"btcd disconnected"}`)
|
||||
|
||||
// BtcdRPCConn is a type managing a client connection to a btcd RPC server
|
||||
// over websockets.
|
||||
type BtcdRPCConn struct {
|
||||
ws *websocket.Conn
|
||||
addRequest chan *AddRPCRequest
|
||||
closed chan struct{}
|
||||
type acceptedTx struct {
|
||||
tx *btcutil.Tx
|
||||
block *btcws.BlockDetails // nil if unmined
|
||||
}
|
||||
|
||||
// Ensure that BtcdRPCConn can be used as an RPCConn.
|
||||
var _ ServerConn = &BtcdRPCConn{}
|
||||
|
||||
// NewBtcdRPCConn creates a new RPC connection from a btcd websocket
|
||||
// connection to btcd.
|
||||
func NewBtcdRPCConn(ws *websocket.Conn) *BtcdRPCConn {
|
||||
conn := &BtcdRPCConn{
|
||||
ws: ws,
|
||||
addRequest: make(chan *AddRPCRequest),
|
||||
closed: make(chan struct{}),
|
||||
}
|
||||
return conn
|
||||
// Notification types. These are defined here and processed from from reading
|
||||
// a notificationChan to avoid handling these notifications directly in
|
||||
// btcrpcclient callbacks, which isn't very go-like and doesn't allow
|
||||
// blocking client calls.
|
||||
type (
|
||||
// Container type for any notification.
|
||||
notification interface {
|
||||
handleNotification() error
|
||||
}
|
||||
|
||||
// SendRequest sends an RPC request and returns a channel to read the response's
|
||||
// result and error. Part of the RPCConn interface.
|
||||
func (btcd *BtcdRPCConn) SendRequest(request *ServerRequest) chan RawRPCResponse {
|
||||
select {
|
||||
case <-btcd.closed:
|
||||
// The connection has closed, so instead of adding and sending
|
||||
// a request, return a channel that just replies with the
|
||||
// error for a disconnected btcd.
|
||||
responseChan := make(chan RawRPCResponse, 1)
|
||||
responseChan <- RawRPCResponse{Error: &ErrBtcdDisconnectedRaw}
|
||||
return responseChan
|
||||
blockConnected blockSummary
|
||||
blockDisconnected blockSummary
|
||||
recvTx acceptedTx
|
||||
redeemingTx acceptedTx
|
||||
rescanProgress int32
|
||||
)
|
||||
|
||||
default:
|
||||
addRequest := &AddRPCRequest{
|
||||
Request: request,
|
||||
ResponseChan: make(chan chan RawRPCResponse, 1),
|
||||
}
|
||||
btcd.addRequest <- addRequest
|
||||
return <-addRequest.ResponseChan
|
||||
}
|
||||
func (c notificationChan) onBlockConnected(hash *btcwire.ShaHash, height int32) {
|
||||
c <- (blockConnected)(blockSummary{hash, height})
|
||||
}
|
||||
|
||||
// Connected returns whether the connection remains established to the RPC
|
||||
// server.
|
||||
func (c notificationChan) onBlockDisconnected(hash *btcwire.ShaHash, height int32) {
|
||||
c <- (blockDisconnected)(blockSummary{hash, height})
|
||||
}
|
||||
|
||||
func (c notificationChan) onRecvTx(tx *btcutil.Tx, block *btcws.BlockDetails) {
|
||||
c <- recvTx{tx, block}
|
||||
}
|
||||
|
||||
func (c notificationChan) onRedeemingTx(tx *btcutil.Tx, block *btcws.BlockDetails) {
|
||||
c <- redeemingTx{tx, block}
|
||||
}
|
||||
|
||||
func (c notificationChan) onRescanProgress(height int32) {
|
||||
c <- rescanProgress(height)
|
||||
}
|
||||
|
||||
func (n blockConnected) handleNotification() error {
|
||||
// Update the blockstamp for the newly-connected block.
|
||||
bs := &wallet.BlockStamp{
|
||||
Height: n.height,
|
||||
Hash: *n.hash,
|
||||
}
|
||||
curBlock.Lock()
|
||||
curBlock.BlockStamp = *bs
|
||||
curBlock.Unlock()
|
||||
|
||||
// btcd notifies btcwallet about transactions first, and then sends
|
||||
// the new block notification. New balance notifications for txs
|
||||
// in blocks are therefore sent here after all tx notifications
|
||||
// have arrived and finished being processed by the handlers.
|
||||
workers := NotifyBalanceRequest{
|
||||
block: *n.hash,
|
||||
wg: make(chan *sync.WaitGroup),
|
||||
}
|
||||
NotifyBalanceSyncerChans.access <- workers
|
||||
if wg := <-workers.wg; wg != nil {
|
||||
wg.Wait()
|
||||
NotifyBalanceSyncerChans.remove <- *n.hash
|
||||
}
|
||||
AcctMgr.BlockNotify(bs)
|
||||
|
||||
// Pass notification to frontends too.
|
||||
if server != nil {
|
||||
// TODO: marshaling should be perfomred by the server, and
|
||||
// sent only to client that have requested the notification.
|
||||
marshaled, err := n.MarshalJSON()
|
||||
// The parsed notification is expected to be marshalable.
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
server.broadcasts <- marshaled
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// MarshalJSON creates the JSON encoding of the chain notification to pass
|
||||
// to any connected wallet clients. This should never error.
|
||||
func (n blockConnected) MarshalJSON() ([]byte, error) {
|
||||
nn := btcws.NewBlockConnectedNtfn(n.hash.String(), n.height)
|
||||
return nn.MarshalJSON()
|
||||
}
|
||||
|
||||
func (n blockDisconnected) handleNotification() error {
|
||||
// Rollback Utxo and Tx data stores.
|
||||
if err := AcctMgr.Rollback(n.height, n.hash); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// Pass notification to frontends too.
|
||||
if server != nil {
|
||||
// TODO: marshaling should be perfomred by the server, and
|
||||
// sent only to client that have requested the notification.
|
||||
marshaled, err := n.MarshalJSON()
|
||||
// A btcws.BlockDisconnectedNtfn is expected to marshal without error.
|
||||
// If it does, it indicates that one of its struct fields is of a
|
||||
// non-marshalable type.
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
server.broadcasts <- marshaled
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// MarshalJSON creates the JSON encoding of the chain notification to pass
|
||||
// to any connected wallet clients. This should never error.
|
||||
func (n blockDisconnected) MarshalJSON() ([]byte, error) {
|
||||
nn := btcws.NewBlockDisconnectedNtfn(n.hash.String(), n.height)
|
||||
return nn.MarshalJSON()
|
||||
}
|
||||
|
||||
func parseBlock(block *btcws.BlockDetails) (*txstore.Block, int, error) {
|
||||
if block == nil {
|
||||
return nil, btcutil.TxIndexUnknown, nil
|
||||
}
|
||||
blksha, err := btcwire.NewShaHashFromStr(block.Hash)
|
||||
if err != nil {
|
||||
return nil, btcutil.TxIndexUnknown, err
|
||||
}
|
||||
b := &txstore.Block{
|
||||
Height: block.Height,
|
||||
Hash: *blksha,
|
||||
Time: time.Unix(block.Time, 0),
|
||||
}
|
||||
return b, block.Index, nil
|
||||
}
|
||||
|
||||
func (n recvTx) handleNotification() error {
|
||||
block, txIdx, err := parseBlock(n.block)
|
||||
if err != nil {
|
||||
return InvalidNotificationError{err}
|
||||
}
|
||||
n.tx.SetIndex(txIdx)
|
||||
|
||||
bs, err := GetCurBlock()
|
||||
if err != nil {
|
||||
return fmt.Errorf("cannot get current block: %v", err)
|
||||
}
|
||||
|
||||
// For transactions originating from this wallet, the sent tx history should
|
||||
// be recorded before the received history. If wallet created this tx, wait
|
||||
// for the sent history to finish being recorded before continuing.
|
||||
//
|
||||
// This function probably should be removed, as any checks for confirming
|
||||
// the connection are no longer valid after the check and may result in
|
||||
// races.
|
||||
func (btcd *BtcdRPCConn) Connected() bool {
|
||||
select {
|
||||
case <-btcd.closed:
|
||||
return false
|
||||
// TODO(jrick) this is wrong due to tx malleability. Cannot safely use the
|
||||
// txsha as an identifier.
|
||||
req := SendTxHistSyncRequest{
|
||||
txsha: *n.tx.Sha(),
|
||||
response: make(chan SendTxHistSyncResponse),
|
||||
}
|
||||
SendTxHistSyncChans.access <- req
|
||||
resp := <-req.response
|
||||
if resp.ok {
|
||||
// Wait until send history has been recorded.
|
||||
<-resp.c
|
||||
SendTxHistSyncChans.remove <- *n.tx.Sha()
|
||||
}
|
||||
|
||||
// For every output, find all accounts handling that output address (if any)
|
||||
// and record the received txout.
|
||||
for outIdx, txout := range n.tx.MsgTx().TxOut {
|
||||
var accounts []*Account
|
||||
// Errors don't matter here. If addrs is nil, the range below
|
||||
// does nothing.
|
||||
_, addrs, _, _ := btcscript.ExtractPkScriptAddrs(txout.PkScript,
|
||||
activeNet.Params)
|
||||
for _, addr := range addrs {
|
||||
a, err := AcctMgr.AccountByAddress(addr)
|
||||
if err != nil {
|
||||
continue
|
||||
}
|
||||
accounts = append(accounts, a)
|
||||
}
|
||||
|
||||
for _, a := range accounts {
|
||||
txr, err := a.TxStore.InsertTx(n.tx, block)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
cred, err := txr.AddCredit(uint32(outIdx), false)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
AcctMgr.ds.ScheduleTxStoreWrite(a)
|
||||
|
||||
// Notify frontends of tx. If the tx is unconfirmed, it is always
|
||||
// notified and the outpoint is marked as notified. If the outpoint
|
||||
// has already been notified and is now in a block, a txmined notifiction
|
||||
// should be sent once to let frontends that all previous send/recvs
|
||||
// for this unconfirmed tx are now confirmed.
|
||||
op := *cred.OutPoint()
|
||||
previouslyNotifiedReq := NotifiedRecvTxRequest{
|
||||
op: op,
|
||||
response: make(chan NotifiedRecvTxResponse),
|
||||
}
|
||||
NotifiedRecvTxChans.access <- previouslyNotifiedReq
|
||||
if <-previouslyNotifiedReq.response {
|
||||
NotifiedRecvTxChans.remove <- op
|
||||
} else {
|
||||
// Notify frontends of new recv tx and mark as notified.
|
||||
NotifiedRecvTxChans.add <- op
|
||||
|
||||
ltr, err := cred.ToJSON(a.Name(), bs.Height, a.Wallet.Net())
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
server.NotifyNewTxDetails(a.Name(), ltr)
|
||||
}
|
||||
|
||||
// Notify frontends of new account balance.
|
||||
confirmed := a.CalculateBalance(1)
|
||||
unconfirmed := a.CalculateBalance(0) - confirmed
|
||||
server.NotifyWalletBalance(a.name, confirmed)
|
||||
server.NotifyWalletBalanceUnconfirmed(a.name, unconfirmed)
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (n redeemingTx) handleNotification() error {
|
||||
block, txIdx, err := parseBlock(n.block)
|
||||
if err != nil {
|
||||
return InvalidNotificationError{err}
|
||||
}
|
||||
n.tx.SetIndex(txIdx)
|
||||
return AcctMgr.RecordSpendingTx(n.tx, block)
|
||||
}
|
||||
|
||||
func (n rescanProgress) handleNotification() error {
|
||||
AcctMgr.rm.MarkProgress(n)
|
||||
return nil
|
||||
}
|
||||
|
||||
type rpcClient struct {
|
||||
*btcrpcclient.Client // client to btcd
|
||||
chainNotifications notificationChan
|
||||
wg sync.WaitGroup
|
||||
}
|
||||
|
||||
func newRPCClient(certs []byte) (*rpcClient, error) {
|
||||
ntfns := make(notificationChan, maxUnhandledNotifications)
|
||||
client := rpcClient{
|
||||
chainNotifications: ntfns,
|
||||
}
|
||||
initializedClient := make(chan struct{})
|
||||
ntfnCallbacks := btcrpcclient.NotificationHandlers{
|
||||
OnClientConnected: func() {
|
||||
log.Info("Established connection to btcd")
|
||||
<-initializedClient
|
||||
|
||||
// nil client to broadcast to all connected clients
|
||||
server.NotifyConnectionStatus(nil)
|
||||
|
||||
err := client.Handshake()
|
||||
if err != nil {
|
||||
log.Errorf("Cannot complete handshake: %v", err)
|
||||
client.Stop()
|
||||
}
|
||||
},
|
||||
OnBlockConnected: ntfns.onBlockConnected,
|
||||
OnBlockDisconnected: ntfns.onBlockDisconnected,
|
||||
OnRecvTx: ntfns.onRecvTx,
|
||||
OnRedeemingTx: ntfns.onRedeemingTx,
|
||||
OnRescanProgress: ntfns.onRescanProgress,
|
||||
}
|
||||
conf := btcrpcclient.ConnConfig{
|
||||
Host: cfg.RPCConnect,
|
||||
Endpoint: "ws",
|
||||
User: cfg.BtcdUsername,
|
||||
Pass: cfg.BtcdPassword,
|
||||
Certificates: certs,
|
||||
}
|
||||
c, err := btcrpcclient.New(&conf, &ntfnCallbacks)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
client.Client = c
|
||||
close(initializedClient)
|
||||
return &client, nil
|
||||
}
|
||||
|
||||
func (c *rpcClient) Start() {
|
||||
c.wg.Add(1)
|
||||
go c.handleNotifications()
|
||||
}
|
||||
|
||||
func (c *rpcClient) Stop() {
|
||||
if !c.Client.Disconnected() {
|
||||
log.Warn("Disconnecting chain server client connection")
|
||||
c.Client.Shutdown()
|
||||
}
|
||||
close(c.chainNotifications)
|
||||
}
|
||||
|
||||
func (c *rpcClient) WaitForShutdown() {
|
||||
c.Client.WaitForShutdown()
|
||||
c.wg.Wait()
|
||||
}
|
||||
|
||||
func (c *rpcClient) handleNotifications() {
|
||||
for n := range c.chainNotifications {
|
||||
AcctMgr.Grab()
|
||||
err := n.handleNotification()
|
||||
if err != nil {
|
||||
switch e := err.(type) {
|
||||
case InvalidNotificationError:
|
||||
log.Warnf("Ignoring invalid notification: %v", e)
|
||||
default:
|
||||
return true
|
||||
log.Errorf("Cannot handle notification: %v", e)
|
||||
}
|
||||
}
|
||||
AcctMgr.Release()
|
||||
}
|
||||
c.wg.Done()
|
||||
}
|
||||
|
||||
// Close forces closing the current btcd connection.
|
||||
func (btcd *BtcdRPCConn) Close() {
|
||||
select {
|
||||
case <-btcd.closed:
|
||||
default:
|
||||
close(btcd.closed)
|
||||
}
|
||||
}
|
||||
|
||||
// AddRPCRequest is used to add an RPCRequest to the pool of requests
|
||||
// being manaaged by a btcd RPC connection.
|
||||
type AddRPCRequest struct {
|
||||
Request *ServerRequest
|
||||
ResponseChan chan chan RawRPCResponse
|
||||
}
|
||||
|
||||
// send performs the actual send of the marshaled request over the btcd
|
||||
// websocket connection.
|
||||
func (btcd *BtcdRPCConn) send(rpcrequest *ServerRequest) error {
|
||||
// btcjson.Cmds define their own MarshalJSON which returns an error
|
||||
// to satisify the json.Marshaler interface, but should never error.
|
||||
// If an error does occur, it is due to a struct containing a type
|
||||
// that is not marshalable, so panic here rather than silently
|
||||
// ignoring it.
|
||||
mrequest, err := rpcrequest.request.MarshalJSON()
|
||||
// Handshake first checks that the websocket connection between btcwallet and
|
||||
// btcd is valid, that is, that there are no mismatching settings between
|
||||
// the two processes (such as running on different Bitcoin networks). If the
|
||||
// sanity checks pass, all wallets are set to be tracked against chain
|
||||
// notifications from this btcd connection.
|
||||
//
|
||||
// TODO(jrick): Track and Rescan commands should be replaced with a
|
||||
// single TrackSince function (or similar) which requests address
|
||||
// notifications and performs the rescan since some block height.
|
||||
func (c *rpcClient) Handshake() error {
|
||||
net, err := c.GetCurrentNet()
|
||||
if err != nil {
|
||||
panic(err)
|
||||
return err
|
||||
}
|
||||
return websocket.Message.Send(btcd.ws, mrequest)
|
||||
if net != activeNet.Net {
|
||||
return ErrMismatchedNets
|
||||
}
|
||||
|
||||
// Start starts the goroutines required to send RPC requests and listen for
|
||||
// replies.
|
||||
func (btcd *BtcdRPCConn) Start() {
|
||||
done := btcd.closed
|
||||
responses := make(chan RawRPCResponse)
|
||||
|
||||
// Maintain a map of JSON IDs to RPCRequests currently being waited on.
|
||||
go func() {
|
||||
m := make(map[uint64]*ServerRequest)
|
||||
for {
|
||||
select {
|
||||
case addrequest := <-btcd.addRequest:
|
||||
rpcrequest := addrequest.Request
|
||||
m[rpcrequest.request.Id().(uint64)] = rpcrequest
|
||||
|
||||
if err := btcd.send(rpcrequest); err != nil {
|
||||
// Connection lost.
|
||||
log.Infof("Cannot complete btcd websocket send: %v",
|
||||
err)
|
||||
if err := btcd.ws.Close(); err != nil {
|
||||
log.Warnf("Cannot close btcd "+
|
||||
"websocket connection: %v", err)
|
||||
}
|
||||
close(done)
|
||||
// Request notifications for connected and disconnected blocks.
|
||||
if err := c.NotifyBlocks(); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
addrequest.ResponseChan <- rpcrequest.response
|
||||
|
||||
case rawResponse, ok := <-responses:
|
||||
if !ok {
|
||||
responses = nil
|
||||
close(done)
|
||||
break
|
||||
// Get current best block. If this is before than the oldest
|
||||
// saved block hash, assume that this btcd instance is not yet
|
||||
// synced up to a previous btcd that was last used with this
|
||||
// wallet.
|
||||
bs, err := GetCurBlock()
|
||||
if err != nil {
|
||||
return fmt.Errorf("cannot get best block: %v", err)
|
||||
}
|
||||
rpcrequest, ok := m[*rawResponse.Id]
|
||||
if !ok {
|
||||
log.Warnf("Received unexpected btcd response")
|
||||
continue
|
||||
}
|
||||
delete(m, *rawResponse.Id)
|
||||
|
||||
rpcrequest.response <- rawResponse
|
||||
|
||||
case <-done:
|
||||
resp := RawRPCResponse{Error: &ErrBtcdDisconnectedRaw}
|
||||
for _, request := range m {
|
||||
request.response <- resp
|
||||
}
|
||||
return
|
||||
}
|
||||
}
|
||||
}()
|
||||
|
||||
// Listen for replies/notifications from btcd, and decide how to handle them.
|
||||
go func() {
|
||||
// Idea: instead of reading btcd messages from just one websocket
|
||||
// connection, maybe use two so the same connection isn't used
|
||||
// for both notifications and responses? Should make handling
|
||||
// must faster as unnecessary unmarshal attempts could be avoided.
|
||||
|
||||
for {
|
||||
var m string
|
||||
if err := websocket.Message.Receive(btcd.ws, &m); err != nil {
|
||||
// Log warning if btcd did not disconnect.
|
||||
if err != io.EOF {
|
||||
log.Infof("Cannot receive btcd websocket message: %v",
|
||||
err)
|
||||
}
|
||||
if err := btcd.ws.Close(); err != nil {
|
||||
log.Warnf("Cannot close btcd "+
|
||||
"websocket connection: %v", err)
|
||||
}
|
||||
close(responses)
|
||||
return
|
||||
if server != nil {
|
||||
server.NotifyNewBlockChainHeight(&bs)
|
||||
server.NotifyBalances(nil)
|
||||
}
|
||||
|
||||
// Try notifications (requests with nil ids) first.
|
||||
n, err := unmarshalNotification(m)
|
||||
if err == nil {
|
||||
svrNtfns <- n
|
||||
// Get default account. Only the default account is used to
|
||||
// track recently-seen blocks.
|
||||
a, err := AcctMgr.Account("")
|
||||
if err != nil {
|
||||
// No account yet is not a handshake error, but means our
|
||||
// handshake is done.
|
||||
return nil
|
||||
}
|
||||
|
||||
// TODO(jrick): if height is less than the earliest-saved block
|
||||
// height, should probably wait for btcd to catch up.
|
||||
|
||||
// Check that there was not any reorgs done since last connection.
|
||||
// If so, rollback and rescan to catch up.
|
||||
it := a.Wallet.NewIterateRecentBlocks()
|
||||
for cont := it != nil; cont; cont = it.Prev() {
|
||||
bs := it.BlockStamp()
|
||||
log.Debugf("Checking for previous saved block with height %v hash %v",
|
||||
bs.Height, bs.Hash)
|
||||
|
||||
if _, err := c.GetBlock(&bs.Hash); err != nil {
|
||||
continue
|
||||
}
|
||||
|
||||
// Must be a response.
|
||||
r, err := unmarshalResponse(m)
|
||||
if err == nil {
|
||||
responses <- r
|
||||
continue
|
||||
}
|
||||
log.Debug("Found matching block.")
|
||||
|
||||
// Not sure what was received but it isn't correct.
|
||||
log.Warnf("Received invalid message from btcd")
|
||||
}
|
||||
}()
|
||||
}
|
||||
|
||||
// unmarshalResponse attempts to unmarshal a marshaled JSON-RPC response.
|
||||
func unmarshalResponse(s string) (RawRPCResponse, error) {
|
||||
var r RawRPCResponse
|
||||
if err := json.Unmarshal([]byte(s), &r); err != nil {
|
||||
return r, err
|
||||
}
|
||||
|
||||
// Check for a valid ID.
|
||||
if r.Id == nil {
|
||||
return r, errors.New("id is null")
|
||||
}
|
||||
return r, nil
|
||||
}
|
||||
|
||||
// unmarshalNotification attempts to unmarshal a marshaled JSON-RPC
|
||||
// notification (Request with a nil or no ID).
|
||||
func unmarshalNotification(s string) (btcjson.Cmd, error) {
|
||||
req, err := btcjson.ParseMarshaledCmd([]byte(s))
|
||||
// If we had to go back to any previous blocks (it.Next
|
||||
// returns true), then rollback the next and all child blocks.
|
||||
// This rollback is done here instead of in the blockMissing
|
||||
// check above for each removed block because Rollback will
|
||||
// try to write new tx and utxo files on each rollback.
|
||||
if it.Next() {
|
||||
bs := it.BlockStamp()
|
||||
err := AcctMgr.Rollback(bs.Height, &bs.Hash)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
if req.Id() != nil {
|
||||
return nil, errors.New("id is non-nil")
|
||||
}
|
||||
|
||||
return req, nil
|
||||
}
|
||||
|
||||
// GetBestBlock gets both the block height and hash of the best block
|
||||
// in the main chain.
|
||||
func GetBestBlock(rpc ServerConn) (*btcws.GetBestBlockResult, error) {
|
||||
cmd := btcws.NewGetBestBlockCmd(<-NewJSONID)
|
||||
response := <-rpc.SendRequest(NewServerRequest(cmd))
|
||||
|
||||
var resultData btcws.GetBestBlockResult
|
||||
if _, err := response.FinishUnmarshal(&resultData); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return &resultData, nil
|
||||
}
|
||||
|
||||
// GetBlock requests details about a block with the given hash.
|
||||
func GetBlock(rpc ServerConn, blockHash string) (*btcjson.BlockResult, error) {
|
||||
// NewGetBlockCmd should never fail with no optargs. If this does fail,
|
||||
// panic now rather than later.
|
||||
cmd, err := btcjson.NewGetBlockCmd(<-NewJSONID, blockHash)
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
response := <-rpc.SendRequest(NewServerRequest(cmd))
|
||||
|
||||
var resultData btcjson.BlockResult
|
||||
if _, err := response.FinishUnmarshal(&resultData); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return &resultData, nil
|
||||
}
|
||||
|
||||
// GetCurrentNet requests the network a bitcoin RPC server is running on.
|
||||
func GetCurrentNet(rpc ServerConn) (btcwire.BitcoinNet, error) {
|
||||
cmd := btcws.NewGetCurrentNetCmd(<-NewJSONID)
|
||||
response := <-rpc.SendRequest(NewServerRequest(cmd))
|
||||
|
||||
var resultData uint32
|
||||
if _, err := response.FinishUnmarshal(&resultData); err != nil {
|
||||
return 0, err
|
||||
}
|
||||
return btcwire.BitcoinNet(resultData), nil
|
||||
}
|
||||
|
||||
// NotifyBlocks requests blockconnected and blockdisconnected notifications.
|
||||
func NotifyBlocks(rpc ServerConn) error {
|
||||
cmd := btcws.NewNotifyBlocksCmd(<-NewJSONID)
|
||||
response := <-rpc.SendRequest(NewServerRequest(cmd))
|
||||
_, err := response.FinishUnmarshal(nil)
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
// NotifyReceived requests notifications for new transactions that spend
|
||||
// to any of the addresses in addrs.
|
||||
func NotifyReceived(rpc ServerConn, addrs []string) error {
|
||||
cmd := btcws.NewNotifyReceivedCmd(<-NewJSONID, addrs)
|
||||
response := <-rpc.SendRequest(NewServerRequest(cmd))
|
||||
_, err := response.FinishUnmarshal(nil)
|
||||
// Set default account to be marked in sync with the current
|
||||
// blockstamp. This invalidates the iterator.
|
||||
a.Wallet.SetSyncedWith(bs)
|
||||
|
||||
// Begin tracking wallets against this btcd instance.
|
||||
AcctMgr.Track()
|
||||
if err := AcctMgr.RescanActiveAddresses(); err != nil {
|
||||
return err
|
||||
}
|
||||
// TODO: Only begin tracking new unspent outputs as a result
|
||||
// of the rescan. This is also pretty racy, as a new block
|
||||
// could arrive between rescan and by the time the new outpoint
|
||||
// is added to btcd's websocket's unspent output set.
|
||||
AcctMgr.Track()
|
||||
|
||||
// NotifySpent requests notifications for when a transaction is processed which
|
||||
// spends op.
|
||||
func NotifySpent(rpc ServerConn, outpoints []*btcwire.OutPoint) error {
|
||||
ops := make([]btcws.OutPoint, 0, len(outpoints))
|
||||
for _, op := range outpoints {
|
||||
ops = append(ops, *btcws.NewOutPointFromWire(op))
|
||||
// (Re)send any unmined transactions to btcd in case of a btcd restart.
|
||||
AcctMgr.ResendUnminedTxs()
|
||||
|
||||
// Get current blockchain height and best block hash.
|
||||
return nil
|
||||
}
|
||||
cmd := btcws.NewNotifySpentCmd(<-NewJSONID, ops)
|
||||
response := <-rpc.SendRequest(NewServerRequest(cmd))
|
||||
_, err := response.FinishUnmarshal(nil)
|
||||
|
||||
// Iterator was invalid (wallet has never been synced) or there was a
|
||||
// huge chain fork + reorg (more than 20 blocks).
|
||||
AcctMgr.Track()
|
||||
if err := AcctMgr.RescanActiveAddresses(); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// Rescan requests a blockchain rescan for transactions to any number of
|
||||
// addresses and notifications to inform wallet about such transactions.
|
||||
func Rescan(rpc ServerConn, beginBlock int32, addrs []string,
|
||||
outpoints []*btcwire.OutPoint) error {
|
||||
|
||||
ops := make([]btcws.OutPoint, 0, len(outpoints))
|
||||
for _, op := range outpoints {
|
||||
ops = append(ops, *btcws.NewOutPointFromWire(op))
|
||||
}
|
||||
// NewRescanCmd should never fail with no optargs. If this does fail,
|
||||
// panic now rather than later.
|
||||
cmd, err := btcws.NewRescanCmd(<-NewJSONID, beginBlock, addrs, ops)
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
response := <-rpc.SendRequest(NewServerRequest(cmd))
|
||||
_, err = response.FinishUnmarshal(nil)
|
||||
return err
|
||||
}
|
||||
|
||||
// SendRawTransaction sends a hex-encoded transaction for relay.
|
||||
func SendRawTransaction(rpc ServerConn, hextx string) (txid string, err error) {
|
||||
// NewSendRawTransactionCmd should never fail. In the exceptional case
|
||||
// where it does, panic here rather than later.
|
||||
cmd, err := btcjson.NewSendRawTransactionCmd(<-NewJSONID, hextx)
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
response := <-rpc.SendRequest(NewServerRequest(cmd))
|
||||
|
||||
var resultData string
|
||||
_, err = response.FinishUnmarshal(&resultData)
|
||||
return resultData, err
|
||||
}
|
||||
|
||||
// GetRawTransaction returns a future representing a pending GetRawTransaction
|
||||
// command for txsha.. When the result of the request is required it may be
|
||||
// collected with GetRawTRansactionAsyncResult.
|
||||
func GetRawTransactionAsync(rpc ServerConn, txsha *btcwire.ShaHash) chan RawRPCResponse {
|
||||
// NewGetRawTransactionCmd should never fail with no optargs. If this
|
||||
// does fail, panic now rather than later.
|
||||
cmd, err := btcjson.NewGetRawTransactionCmd(<-NewJSONID, txsha.String())
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
return rpc.SendRequest(NewServerRequest(cmd))
|
||||
}
|
||||
|
||||
// GetRawTransactionAsyncResult waits for the pending command in request -
|
||||
// the reqsult of a previous GetRawTransactionAsync() call - and returns either
|
||||
// the requested transaction, or an error.
|
||||
func GetRawTransactionAsyncResult(request chan RawRPCResponse) (*btcutil.Tx, error) {
|
||||
response := <-request
|
||||
|
||||
var resultData string
|
||||
_, err := response.FinishUnmarshal(&resultData)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
serializedTx, err := hex.DecodeString(resultData)
|
||||
if err != nil {
|
||||
return nil, btcjson.ErrDecodeHexString
|
||||
}
|
||||
utx, err := btcutil.NewTxFromBytes(serializedTx)
|
||||
if err != nil {
|
||||
return nil, btcjson.ErrDeserialization
|
||||
}
|
||||
return utx, nil
|
||||
}
|
||||
|
||||
// GetRawTransaction sends the non-verbose version of a getrawtransaction
|
||||
// request to receive the serialized transaction referenced by txsha. If
|
||||
// successful, the transaction is decoded and returned as a btcutil.Tx.
|
||||
func GetRawTransaction(rpc ServerConn, txsha *btcwire.ShaHash) (*btcutil.Tx, error) {
|
||||
resp := GetRawTransactionAsync(rpc, txsha)
|
||||
return GetRawTransactionAsyncResult(resp)
|
||||
// TODO: only begin tracking new unspent outputs as a result of the
|
||||
// rescan. This is also racy (see comment for second Track above).
|
||||
AcctMgr.Track()
|
||||
AcctMgr.ResendUnminedTxs()
|
||||
return nil
|
||||
}
|
||||
|
|
1074
rpcserver.go
1074
rpcserver.go
File diff suppressed because it is too large
Load diff
814
sockets.go
814
sockets.go
|
@ -1,814 +0,0 @@
|
|||
/*
|
||||
* Copyright (c) 2013, 2014 Conformal Systems LLC <info@conformal.com>
|
||||
*
|
||||
* Permission to use, copy, modify, and distribute this software for any
|
||||
* purpose with or without fee is hereby granted, provided that the above
|
||||
* copyright notice and this permission notice appear in all copies.
|
||||
*
|
||||
* THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
|
||||
* WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
|
||||
* MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
|
||||
* ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
|
||||
* WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
|
||||
* ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
|
||||
* OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
|
||||
*/
|
||||
|
||||
package main
|
||||
|
||||
import (
|
||||
"code.google.com/p/go.net/websocket"
|
||||
"crypto/sha256"
|
||||
"crypto/subtle"
|
||||
"crypto/tls"
|
||||
"crypto/x509"
|
||||
"encoding/base64"
|
||||
"encoding/json"
|
||||
"errors"
|
||||
"fmt"
|
||||
"github.com/conformal/btcjson"
|
||||
"github.com/conformal/btcutil"
|
||||
"github.com/conformal/btcwallet/wallet"
|
||||
"github.com/conformal/btcws"
|
||||
"github.com/conformal/go-socks"
|
||||
"io"
|
||||
"io/ioutil"
|
||||
"net"
|
||||
"net/http"
|
||||
"os"
|
||||
"path/filepath"
|
||||
"runtime"
|
||||
"sync"
|
||||
"time"
|
||||
)
|
||||
|
||||
var (
|
||||
// ErrBadAuth represents an error where a request is denied due to
|
||||
// a missing, incorrect, or duplicate authentication request.
|
||||
ErrBadAuth = errors.New("bad auth")
|
||||
|
||||
// ErrNoAuth represents an error where authentication could not succeed
|
||||
// due to a missing Authorization HTTP header.
|
||||
ErrNoAuth = errors.New("no auth")
|
||||
|
||||
// ErrConnRefused represents an error where a connection to another
|
||||
// process cannot be established.
|
||||
ErrConnRefused = errors.New("connection refused")
|
||||
|
||||
// ErrConnLost represents an error where a connection to another
|
||||
// process cannot be established.
|
||||
ErrConnLost = errors.New("connection lost")
|
||||
|
||||
// Adds a frontend listener channel
|
||||
addClient = make(chan clientContext)
|
||||
|
||||
// Messages sent to this channel are sent to each connected frontend.
|
||||
allClients = make(chan []byte, 100)
|
||||
)
|
||||
|
||||
// server holds the items the RPC server may need to access (auth,
|
||||
// config, shutdown, etc.)
|
||||
type server struct {
|
||||
wg sync.WaitGroup
|
||||
listeners []net.Listener
|
||||
authsha [sha256.Size]byte
|
||||
}
|
||||
|
||||
type clientContext struct {
|
||||
send chan []byte
|
||||
quit chan struct{} // closed on disconnect
|
||||
}
|
||||
|
||||
// parseListeners splits the list of listen addresses passed in addrs into
|
||||
// IPv4 and IPv6 slices and returns them. This allows easy creation of the
|
||||
// listeners on the correct interface "tcp4" and "tcp6". It also properly
|
||||
// detects addresses which apply to "all interfaces" and adds the address to
|
||||
// both slices.
|
||||
func parseListeners(addrs []string) ([]string, []string, error) {
|
||||
ipv4ListenAddrs := make([]string, 0, len(addrs)*2)
|
||||
ipv6ListenAddrs := make([]string, 0, len(addrs)*2)
|
||||
for _, addr := range addrs {
|
||||
host, _, err := net.SplitHostPort(addr)
|
||||
if err != nil {
|
||||
// Shouldn't happen due to already being normalized.
|
||||
return nil, nil, err
|
||||
}
|
||||
|
||||
// Empty host or host of * on plan9 is both IPv4 and IPv6.
|
||||
if host == "" || (host == "*" && runtime.GOOS == "plan9") {
|
||||
ipv4ListenAddrs = append(ipv4ListenAddrs, addr)
|
||||
ipv6ListenAddrs = append(ipv6ListenAddrs, addr)
|
||||
continue
|
||||
}
|
||||
|
||||
// Parse the IP.
|
||||
ip := net.ParseIP(host)
|
||||
if ip == nil {
|
||||
return nil, nil, fmt.Errorf("'%s' is not a valid IP "+
|
||||
"address", host)
|
||||
}
|
||||
|
||||
// To4 returns nil when the IP is not an IPv4 address, so use
|
||||
// this determine the address type.
|
||||
if ip.To4() == nil {
|
||||
ipv6ListenAddrs = append(ipv6ListenAddrs, addr)
|
||||
} else {
|
||||
ipv4ListenAddrs = append(ipv4ListenAddrs, addr)
|
||||
}
|
||||
}
|
||||
return ipv4ListenAddrs, ipv6ListenAddrs, nil
|
||||
}
|
||||
|
||||
// newServer returns a new instance of the server struct.
|
||||
func newServer(listenAddrs []string) (*server, error) {
|
||||
login := cfg.Username + ":" + cfg.Password
|
||||
auth := "Basic " + base64.StdEncoding.EncodeToString([]byte(login))
|
||||
s := server{
|
||||
authsha: sha256.Sum256([]byte(auth)),
|
||||
}
|
||||
|
||||
// Check for existence of cert file and key file
|
||||
if !fileExists(cfg.RPCKey) && !fileExists(cfg.RPCCert) {
|
||||
// if both files do not exist, we generate them.
|
||||
err := genCertPair(cfg.RPCCert, cfg.RPCKey)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
keypair, err := tls.LoadX509KeyPair(cfg.RPCCert, cfg.RPCKey)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
tlsConfig := tls.Config{
|
||||
Certificates: []tls.Certificate{keypair},
|
||||
}
|
||||
|
||||
ipv4ListenAddrs, ipv6ListenAddrs, err := parseListeners(listenAddrs)
|
||||
listeners := make([]net.Listener, 0,
|
||||
len(ipv6ListenAddrs)+len(ipv4ListenAddrs))
|
||||
for _, addr := range ipv4ListenAddrs {
|
||||
listener, err := tls.Listen("tcp4", addr, &tlsConfig)
|
||||
if err != nil {
|
||||
log.Warnf("RPCS: Can't listen on %s: %v", addr,
|
||||
err)
|
||||
continue
|
||||
}
|
||||
listeners = append(listeners, listener)
|
||||
}
|
||||
|
||||
for _, addr := range ipv6ListenAddrs {
|
||||
listener, err := tls.Listen("tcp6", addr, &tlsConfig)
|
||||
if err != nil {
|
||||
log.Warnf("RPCS: Can't listen on %s: %v", addr,
|
||||
err)
|
||||
continue
|
||||
}
|
||||
listeners = append(listeners, listener)
|
||||
}
|
||||
if len(listeners) == 0 {
|
||||
return nil, errors.New("no valid listen address")
|
||||
}
|
||||
|
||||
s.listeners = listeners
|
||||
|
||||
return &s, nil
|
||||
}
|
||||
|
||||
// genCertPair generates a key/cert pair to the paths provided.
|
||||
func genCertPair(certFile, keyFile string) error {
|
||||
log.Infof("Generating TLS certificates...")
|
||||
|
||||
// Create directories for cert and key files if they do not yet exist.
|
||||
certDir, _ := filepath.Split(certFile)
|
||||
keyDir, _ := filepath.Split(keyFile)
|
||||
if err := os.MkdirAll(certDir, 0700); err != nil {
|
||||
return err
|
||||
}
|
||||
if err := os.MkdirAll(keyDir, 0700); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// Generate cert pair.
|
||||
org := "btcwallet autogenerated cert"
|
||||
validUntil := time.Now().Add(10 * 365 * 24 * time.Hour)
|
||||
cert, key, err := btcutil.NewTLSCertPair(org, validUntil, nil)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// Write cert and key files.
|
||||
if err = ioutil.WriteFile(certFile, cert, 0666); err != nil {
|
||||
return err
|
||||
}
|
||||
if err = ioutil.WriteFile(keyFile, key, 0600); err != nil {
|
||||
if rmErr := os.Remove(certFile); rmErr != nil {
|
||||
log.Warnf("Cannot remove written certificates: %v", rmErr)
|
||||
}
|
||||
return err
|
||||
}
|
||||
|
||||
log.Info("Done generating TLS certificates")
|
||||
return nil
|
||||
}
|
||||
|
||||
// ReplyToFrontend responds to a marshaled JSON-RPC request with a
|
||||
// marshaled JSON-RPC response for both standard and extension
|
||||
// (websocket) clients. The returned error is ErrBadAuth if a
|
||||
// missing, incorrect, or duplicate authentication request is
|
||||
// received.
|
||||
func (s *server) ReplyToFrontend(msg []byte, ws, authenticated bool) ([]byte, error) {
|
||||
cmd, parseErr := btcjson.ParseMarshaledCmd(msg)
|
||||
var id interface{}
|
||||
if cmd != nil {
|
||||
id = cmd.Id()
|
||||
}
|
||||
|
||||
// If client is not already authenticated, the parsed request must
|
||||
// be for authentication.
|
||||
authCmd, ok := cmd.(*btcws.AuthenticateCmd)
|
||||
if authenticated {
|
||||
if ok {
|
||||
// Duplicate auth request.
|
||||
return nil, ErrBadAuth
|
||||
}
|
||||
} else {
|
||||
if !ok {
|
||||
// The first unauthenticated request must be an auth request.
|
||||
return nil, ErrBadAuth
|
||||
}
|
||||
|
||||
// Check credentials.
|
||||
login := authCmd.Username + ":" + authCmd.Passphrase
|
||||
auth := "Basic " + base64.StdEncoding.EncodeToString([]byte(login))
|
||||
authSha := sha256.Sum256([]byte(auth))
|
||||
cmp := subtle.ConstantTimeCompare(authSha[:], s.authsha[:])
|
||||
if cmp != 1 {
|
||||
return nil, ErrBadAuth
|
||||
}
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
if parseErr != nil {
|
||||
response := btcjson.Reply{
|
||||
Id: &id,
|
||||
Error: &btcjson.ErrInvalidRequest,
|
||||
}
|
||||
mresponse, err := json.Marshal(response)
|
||||
// We expect the marshal to succeed. If it doesn't, it
|
||||
// indicates that either jsonErr (which is created by us) or
|
||||
// the id itself (which was successfully unmashaled) are of
|
||||
// some non-marshalable type.
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
return mresponse, nil
|
||||
}
|
||||
|
||||
cReq := NewClientRequest(cmd, ws)
|
||||
rawResp := cReq.Handle()
|
||||
|
||||
response := struct {
|
||||
Jsonrpc string `json:"jsonrpc"`
|
||||
Id interface{} `json:"id"`
|
||||
Result *json.RawMessage `json:"result"`
|
||||
Error *json.RawMessage `json:"error"`
|
||||
}{
|
||||
Jsonrpc: "1.0",
|
||||
Id: id,
|
||||
Result: rawResp.Result,
|
||||
Error: rawResp.Error,
|
||||
}
|
||||
mresponse, err := json.Marshal(response)
|
||||
if err != nil {
|
||||
log.Errorf("Cannot marshal response: %v", err)
|
||||
response := btcjson.Reply{
|
||||
Id: &id,
|
||||
Error: &btcjson.ErrInternal,
|
||||
}
|
||||
mresponse, err = json.Marshal(&response)
|
||||
// We expect this marshal to succeed. If it doesn't, btcjson
|
||||
// returned an id with an non-marshalable type or ErrInternal
|
||||
// is just plain wrong.
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
}
|
||||
|
||||
return mresponse, nil
|
||||
}
|
||||
|
||||
// ServeRPCRequest processes and replies to a JSON-RPC client request.
|
||||
func (s *server) ServeRPCRequest(w http.ResponseWriter, r *http.Request) {
|
||||
body, err := btcjson.GetRaw(r.Body)
|
||||
if err != nil {
|
||||
log.Errorf("RPCS: Error getting JSON message: %v", err)
|
||||
}
|
||||
|
||||
resp, err := s.ReplyToFrontend(body, false, true)
|
||||
if err == ErrBadAuth {
|
||||
http.Error(w, "401 Unauthorized.", http.StatusUnauthorized)
|
||||
return
|
||||
}
|
||||
if _, err := w.Write(resp); err != nil {
|
||||
log.Warnf("RPCS: could not respond to RPC request: %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
// clientResponseDuplicator listens for new wallet listener channels
|
||||
// and duplicates messages sent to allClients to all connected clients.
|
||||
func clientResponseDuplicator() {
|
||||
clients := make(map[clientContext]struct{})
|
||||
|
||||
for {
|
||||
select {
|
||||
case cc := <-addClient:
|
||||
clients[cc] = struct{}{}
|
||||
|
||||
case n := <-allClients:
|
||||
for cc := range clients {
|
||||
select {
|
||||
case <-cc.quit:
|
||||
delete(clients, cc)
|
||||
case cc.send <- n:
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// NotifyBtcdConnection notifies a frontend of the current connection
|
||||
// status of btcwallet to btcd.
|
||||
func NotifyBtcdConnection(reply chan []byte) {
|
||||
if btcd, ok := CurrentServerConn().(*BtcdRPCConn); ok {
|
||||
ntfn := btcws.NewBtcdConnectedNtfn(btcd.Connected())
|
||||
mntfn, err := ntfn.MarshalJSON()
|
||||
// btcws notifications must always marshal without error.
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
reply <- mntfn
|
||||
}
|
||||
}
|
||||
|
||||
// stringQueue manages a queue of strings, reading from in and sending
|
||||
// the oldest unsent to out. This handler closes out and returns after
|
||||
// in is closed and any queued items are sent. Any reads on quit result
|
||||
// in immediate shutdown of the handler.
|
||||
func stringQueue(in <-chan string, out chan<- string, quit <-chan struct{}) {
|
||||
var q []string
|
||||
var dequeue chan<- string
|
||||
skipQueue := out
|
||||
var next string
|
||||
out:
|
||||
for {
|
||||
select {
|
||||
case n, ok := <-in:
|
||||
if !ok {
|
||||
// Sender closed input channel. Nil channel
|
||||
// and continue so the remaining queued
|
||||
// items may be sent. If the queue is empty,
|
||||
// break out of the loop.
|
||||
in = nil
|
||||
if dequeue == nil {
|
||||
break out
|
||||
}
|
||||
continue
|
||||
}
|
||||
|
||||
// Either send to out immediately if skipQueue is
|
||||
// non-nil (queue is empty) and reader is ready,
|
||||
// or append to the queue and send later.
|
||||
select {
|
||||
case skipQueue <- n:
|
||||
default:
|
||||
q = append(q, n)
|
||||
dequeue = out
|
||||
skipQueue = nil
|
||||
next = q[0]
|
||||
}
|
||||
|
||||
case dequeue <- next:
|
||||
copy(q, q[1:])
|
||||
q[len(q)-1] = "" // avoid leak
|
||||
q = q[:len(q)-1]
|
||||
if len(q) == 0 {
|
||||
// If the input chan was closed and nil'd,
|
||||
// break out of the loop.
|
||||
if in == nil {
|
||||
break out
|
||||
}
|
||||
dequeue = nil
|
||||
skipQueue = out
|
||||
} else {
|
||||
next = q[0]
|
||||
}
|
||||
|
||||
case <-quit:
|
||||
break out
|
||||
}
|
||||
}
|
||||
close(out)
|
||||
}
|
||||
|
||||
// WSSendRecv is the handler for websocket client connections. It loops
|
||||
// forever (until disconnected), reading JSON-RPC requests and sending
|
||||
// sending responses and notifications.
|
||||
func (s *server) WSSendRecv(ws *websocket.Conn, remoteAddr string, authenticated bool) {
|
||||
// Clear the read deadline set before the websocket hijacked
|
||||
// the connection.
|
||||
if err := ws.SetReadDeadline(time.Time{}); err != nil {
|
||||
log.Warnf("Cannot remove read deadline: %v", err)
|
||||
}
|
||||
|
||||
// Add client context so notifications duplicated to each
|
||||
// client are received by this client.
|
||||
recvQuit := make(chan struct{})
|
||||
sendQuit := make(chan struct{})
|
||||
cc := clientContext{
|
||||
send: make(chan []byte, 1), // buffer size is number of initial notifications
|
||||
quit: make(chan struct{}),
|
||||
}
|
||||
go func() {
|
||||
select {
|
||||
case <-recvQuit:
|
||||
case <-sendQuit:
|
||||
}
|
||||
log.Infof("Disconnected websocket client %s", remoteAddr)
|
||||
close(cc.quit)
|
||||
}()
|
||||
log.Infof("New websocket client %s", remoteAddr)
|
||||
|
||||
NotifyBtcdConnection(cc.send) // TODO(jrick): clients should explicitly request this.
|
||||
addClient <- cc
|
||||
|
||||
// received passes all received messages from the currently connected
|
||||
// frontend to the for-select loop. It is closed when reading a
|
||||
// message from the websocket connection fails (presumably due to
|
||||
// a disconnected client).
|
||||
recvQueueIn := make(chan string)
|
||||
|
||||
// Receive messages from websocket and send across jsonMsgs until
|
||||
// connection is lost
|
||||
go func() {
|
||||
for {
|
||||
var m string
|
||||
if err := websocket.Message.Receive(ws, &m); err != nil {
|
||||
select {
|
||||
case <-sendQuit:
|
||||
// Do not log error.
|
||||
|
||||
default:
|
||||
if err != io.EOF {
|
||||
log.Warnf("Websocket receive failed from client %s: %v",
|
||||
remoteAddr, err)
|
||||
}
|
||||
}
|
||||
close(recvQueueIn)
|
||||
close(recvQuit)
|
||||
return
|
||||
}
|
||||
recvQueueIn <- m
|
||||
}
|
||||
}()
|
||||
|
||||
// Manage queue of received messages for LIFO processing.
|
||||
recvQueueOut := make(chan string)
|
||||
go stringQueue(recvQueueIn, recvQueueOut, cc.quit)
|
||||
|
||||
badAuth := make(chan struct{})
|
||||
sendResp := make(chan []byte)
|
||||
go func() {
|
||||
out:
|
||||
for m := range recvQueueOut {
|
||||
resp, err := s.ReplyToFrontend([]byte(m), true, authenticated)
|
||||
if err == ErrBadAuth {
|
||||
select {
|
||||
case badAuth <- struct{}{}:
|
||||
case <-cc.quit:
|
||||
}
|
||||
break out
|
||||
}
|
||||
|
||||
// Authentication passed.
|
||||
authenticated = true
|
||||
|
||||
select {
|
||||
case sendResp <- resp:
|
||||
case <-cc.quit:
|
||||
break out
|
||||
}
|
||||
}
|
||||
close(sendResp)
|
||||
}()
|
||||
|
||||
const deadline time.Duration = 2 * time.Second
|
||||
|
||||
out:
|
||||
for {
|
||||
var m []byte
|
||||
var ok bool
|
||||
|
||||
select {
|
||||
case <-badAuth:
|
||||
// Bad auth. Disconnect.
|
||||
log.Warnf("Disconnecting unauthorized websocket client %s", remoteAddr)
|
||||
break out
|
||||
|
||||
case m = <-cc.send: // sends from external writers. never closes.
|
||||
case m, ok = <-sendResp:
|
||||
if !ok {
|
||||
// Nothing left to send. Return so the handler exits.
|
||||
break out
|
||||
}
|
||||
case <-cc.quit:
|
||||
break out
|
||||
}
|
||||
|
||||
err := ws.SetWriteDeadline(time.Now().Add(deadline))
|
||||
if err != nil {
|
||||
log.Errorf("Cannot set write deadline on client %s: %v", remoteAddr, err)
|
||||
break out
|
||||
}
|
||||
err = websocket.Message.Send(ws, string(m))
|
||||
if err != nil {
|
||||
log.Warnf("Websocket send failed to client %s: %v", remoteAddr, err)
|
||||
break out
|
||||
}
|
||||
}
|
||||
close(sendQuit)
|
||||
log.Tracef("Leaving function WSSendRecv")
|
||||
}
|
||||
|
||||
// NotifyNewBlockChainHeight notifies all frontends of a new
|
||||
// blockchain height. This sends the same notification as
|
||||
// btcd, so this can probably be removed.
|
||||
func NotifyNewBlockChainHeight(reply chan []byte, bs wallet.BlockStamp) {
|
||||
ntfn := btcws.NewBlockConnectedNtfn(bs.Hash.String(), bs.Height)
|
||||
mntfn, err := ntfn.MarshalJSON()
|
||||
// btcws notifications must always marshal without error.
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
reply <- mntfn
|
||||
}
|
||||
|
||||
var duplicateOnce sync.Once
|
||||
|
||||
// Start starts a HTTP server to provide standard RPC and extension
|
||||
// websocket connections for any number of btcwallet frontends.
|
||||
func (s *server) Start() {
|
||||
// A duplicator for notifications intended for all clients runs
|
||||
// in another goroutines. Any such notifications are sent to
|
||||
// the allClients channel and then sent to each connected client.
|
||||
//
|
||||
// Use a sync.Once to insure no extra duplicators run.
|
||||
go duplicateOnce.Do(clientResponseDuplicator)
|
||||
|
||||
log.Trace("Starting RPC server")
|
||||
|
||||
serveMux := http.NewServeMux()
|
||||
const rpcAuthTimeoutSeconds = 10
|
||||
httpServer := &http.Server{
|
||||
Handler: serveMux,
|
||||
|
||||
// Timeout connections which don't complete the initial
|
||||
// handshake within the allowed timeframe.
|
||||
ReadTimeout: time.Second * rpcAuthTimeoutSeconds,
|
||||
}
|
||||
serveMux.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) {
|
||||
if err := s.checkAuth(r); err != nil {
|
||||
log.Warnf("Unauthorized client connection attempt")
|
||||
http.Error(w, "401 Unauthorized.", http.StatusUnauthorized)
|
||||
return
|
||||
}
|
||||
s.ServeRPCRequest(w, r)
|
||||
})
|
||||
serveMux.HandleFunc("/frontend", func(w http.ResponseWriter, r *http.Request) {
|
||||
authenticated := false
|
||||
if err := s.checkAuth(r); err != nil {
|
||||
// If auth was supplied but incorrect, rather than simply being
|
||||
// missing, immediately terminate the connection.
|
||||
if err != ErrNoAuth {
|
||||
log.Warnf("Disconnecting improperly authorized websocket client")
|
||||
http.Error(w, "401 Unauthorized.", http.StatusUnauthorized)
|
||||
return
|
||||
}
|
||||
} else {
|
||||
authenticated = true
|
||||
}
|
||||
|
||||
// A new Server instance is created rather than just creating the
|
||||
// handler closure since the default server will disconnect the
|
||||
// client if the origin is unset.
|
||||
wsServer := websocket.Server{
|
||||
Handler: websocket.Handler(func(ws *websocket.Conn) {
|
||||
s.WSSendRecv(ws, r.RemoteAddr, authenticated)
|
||||
}),
|
||||
}
|
||||
wsServer.ServeHTTP(w, r)
|
||||
})
|
||||
for _, listener := range s.listeners {
|
||||
s.wg.Add(1)
|
||||
go func(listener net.Listener) {
|
||||
log.Infof("RPCS: RPC server listening on %s", listener.Addr())
|
||||
if err := httpServer.Serve(listener); err != nil {
|
||||
log.Errorf("Listener for %s exited with error: %v",
|
||||
listener.Addr(), err)
|
||||
}
|
||||
log.Tracef("RPCS: RPC listener done for %s", listener.Addr())
|
||||
s.wg.Done()
|
||||
}(listener)
|
||||
}
|
||||
}
|
||||
|
||||
// checkAuth checks the HTTP Basic authentication supplied by a frontend
|
||||
// in the HTTP request r. If the frontend's supplied authentication does
|
||||
// not match the username and password expected, a non-nil error is
|
||||
// returned.
|
||||
//
|
||||
// This check is time-constant.
|
||||
func (s *server) checkAuth(r *http.Request) error {
|
||||
authhdr := r.Header["Authorization"]
|
||||
if len(authhdr) == 0 {
|
||||
return ErrNoAuth
|
||||
}
|
||||
|
||||
authsha := sha256.Sum256([]byte(authhdr[0]))
|
||||
cmp := subtle.ConstantTimeCompare(authsha[:], s.authsha[:])
|
||||
if cmp != 1 {
|
||||
return ErrBadAuth
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// BtcdWS opens a websocket connection to a btcd instance.
|
||||
func BtcdWS(certificates []byte) (*websocket.Conn, error) {
|
||||
url := fmt.Sprintf("wss://%s/ws", cfg.RPCConnect)
|
||||
config, err := websocket.NewConfig(url, "https://localhost/")
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// btcd uses a self-signed TLS certifiate which is used as the CA.
|
||||
pool := x509.NewCertPool()
|
||||
pool.AppendCertsFromPEM(certificates)
|
||||
config.TlsConfig = &tls.Config{
|
||||
RootCAs: pool,
|
||||
MinVersion: tls.VersionTLS12,
|
||||
}
|
||||
|
||||
// btcd requires basic authorization, so set the Authorization header.
|
||||
login := cfg.BtcdUsername + ":" + cfg.BtcdPassword
|
||||
auth := "Basic " + base64.StdEncoding.EncodeToString([]byte(login))
|
||||
config.Header.Add("Authorization", auth)
|
||||
|
||||
// Dial connection.
|
||||
var ws *websocket.Conn
|
||||
var cerr error
|
||||
if cfg.Proxy != "" {
|
||||
proxy := &socks.Proxy{
|
||||
Addr: cfg.Proxy,
|
||||
Username: cfg.ProxyUser,
|
||||
Password: cfg.ProxyPass,
|
||||
}
|
||||
conn, err := proxy.Dial("tcp", cfg.RPCConnect)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
tlsConn := tls.Client(conn, config.TlsConfig)
|
||||
ws, cerr = websocket.NewClient(config, tlsConn)
|
||||
} else {
|
||||
ws, cerr = websocket.DialConfig(config)
|
||||
}
|
||||
if cerr != nil {
|
||||
return nil, cerr
|
||||
}
|
||||
return ws, nil
|
||||
}
|
||||
|
||||
// BtcdConnect connects to a running btcd instance over a websocket
|
||||
// for sending and receiving chain-related messages, failing if the
|
||||
// connection cannot be established or is lost.
|
||||
func BtcdConnect(certificates []byte) (*BtcdRPCConn, error) {
|
||||
// Open websocket connection.
|
||||
ws, err := BtcdWS(certificates)
|
||||
if err != nil {
|
||||
log.Errorf("Cannot open websocket connection to btcd: %v", err)
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// Create and start RPC connection using the btcd websocket.
|
||||
rpc := NewBtcdRPCConn(ws)
|
||||
rpc.Start()
|
||||
return rpc, nil
|
||||
}
|
||||
|
||||
// Handshake first checks that the websocket connection between btcwallet and
|
||||
// btcd is valid, that is, that there are no mismatching settings between
|
||||
// the two processes (such as running on different Bitcoin networks). If the
|
||||
// sanity checks pass, all wallets are set to be tracked against chain
|
||||
// notifications from this btcd connection.
|
||||
//
|
||||
// TODO(jrick): Track and Rescan commands should be replaced with a
|
||||
// single TrackSince function (or similar) which requests address
|
||||
// notifications and performs the rescan since some block height.
|
||||
func Handshake(rpc ServerConn) error {
|
||||
net, err := GetCurrentNet(rpc)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if net != activeNet.Net {
|
||||
return errors.New("btcd and btcwallet running on different Bitcoin networks")
|
||||
}
|
||||
|
||||
// Request notifications for connected and disconnected blocks.
|
||||
if err := NotifyBlocks(rpc); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// Get current best block. If this is before than the oldest
|
||||
// saved block hash, assume that this btcd instance is not yet
|
||||
// synced up to a previous btcd that was last used with this
|
||||
// wallet.
|
||||
bs, err := GetCurBlock()
|
||||
if err != nil {
|
||||
return fmt.Errorf("cannot get best block: %v", err)
|
||||
}
|
||||
NotifyNewBlockChainHeight(allClients, bs)
|
||||
NotifyBalances(allClients)
|
||||
|
||||
// Get default account. Only the default account is used to
|
||||
// track recently-seen blocks.
|
||||
a, err := AcctMgr.Account("")
|
||||
if err != nil {
|
||||
// No account yet is not a handshake error, but means our
|
||||
// handshake is done.
|
||||
return nil
|
||||
}
|
||||
|
||||
// TODO(jrick): if height is less than the earliest-saved block
|
||||
// height, should probably wait for btcd to catch up.
|
||||
|
||||
// Check that there was not any reorgs done since last connection.
|
||||
// If so, rollback and rescan to catch up.
|
||||
it := a.Wallet.NewIterateRecentBlocks()
|
||||
for cont := it != nil; cont; cont = it.Prev() {
|
||||
bs := it.BlockStamp()
|
||||
log.Debugf("Checking for previous saved block with height %v hash %v",
|
||||
bs.Height, bs.Hash)
|
||||
|
||||
if _, err := GetBlock(rpc, bs.Hash.String()); err != nil {
|
||||
continue
|
||||
}
|
||||
|
||||
log.Debug("Found matching block.")
|
||||
|
||||
// If we had to go back to any previous blocks (it.Next
|
||||
// returns true), then rollback the next and all child blocks.
|
||||
// This rollback is done here instead of in the blockMissing
|
||||
// check above for each removed block because Rollback will
|
||||
// try to write new tx and utxo files on each rollback.
|
||||
if it.Next() {
|
||||
bs := it.BlockStamp()
|
||||
err := AcctMgr.Rollback(bs.Height, &bs.Hash)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
// Set default account to be marked in sync with the current
|
||||
// blockstamp. This invalidates the iterator.
|
||||
a.Wallet.SetSyncedWith(bs)
|
||||
|
||||
// Begin tracking wallets against this btcd instance.
|
||||
AcctMgr.Track()
|
||||
if err := AcctMgr.RescanActiveAddresses(); err != nil {
|
||||
return err
|
||||
}
|
||||
// TODO: Only begin tracking new unspent outputs as a result
|
||||
// of the rescan. This is also pretty racy, as a new block
|
||||
// could arrive between rescan and by the time the new outpoint
|
||||
// is added to btcd's websocket's unspent output set.
|
||||
AcctMgr.Track()
|
||||
|
||||
// (Re)send any unmined transactions to btcd in case of a btcd restart.
|
||||
AcctMgr.ResendUnminedTxs()
|
||||
|
||||
// Get current blockchain height and best block hash.
|
||||
return nil
|
||||
}
|
||||
|
||||
// Iterator was invalid (wallet has never been synced) or there was a
|
||||
// huge chain fork + reorg (more than 20 blocks).
|
||||
AcctMgr.Track()
|
||||
if err := AcctMgr.RescanActiveAddresses(); err != nil {
|
||||
return err
|
||||
}
|
||||
// TODO: only begin tracking new unspent outputs as a result of the
|
||||
// rescan. This is also racy (see comment for second Track above).
|
||||
AcctMgr.Track()
|
||||
AcctMgr.ResendUnminedTxs()
|
||||
return nil
|
||||
}
|
Loading…
Reference in a new issue