From ec92578194bf14e8dc4b2c9555ec442a0d9a552d Mon Sep 17 00:00:00 2001 From: Josh Rickmar Date: Thu, 12 Jun 2014 11:39:26 -0500 Subject: [PATCH] Switch to gorilla websocket and btcrpcclient. Closes #96. --- account.go | 106 ++--- acctmgr.go | 20 +- cmd.go | 192 ++++----- ntfns.go | 290 ------------- rescan.go | 35 +- rpc.go | 101 ----- rpcclient.go | 811 ++++++++++++++++++++----------------- rpcserver.go | 1096 +++++++++++++++++++++++++++++++++++++++----------- sockets.go | 814 ------------------------------------- 9 files changed, 1468 insertions(+), 1997 deletions(-) delete mode 100644 ntfns.go delete mode 100644 rpc.go delete mode 100644 sockets.go diff --git a/account.go b/account.go index 179017b..b862353 100644 --- a/account.go +++ b/account.go @@ -19,7 +19,6 @@ package main import ( "bytes" "encoding/base64" - "encoding/hex" "fmt" "github.com/conformal/btcjson" "github.com/conformal/btcutil" @@ -43,7 +42,7 @@ type Account struct { func (a *Account) Lock() error { switch err := a.Wallet.Lock(); err { case nil: - NotifyWalletLockStateChange(a.Name(), true) + server.NotifyWalletLockStateChange(a.Name(), true) return nil case wallet.ErrWalletLocked: @@ -61,7 +60,7 @@ func (a *Account) Unlock(passphrase []byte) error { return err } - NotifyWalletLockStateChange(a.Name(), false) + server.NotifyWalletLockStateChange(a.Name(), false) return nil } @@ -436,15 +435,24 @@ func (a *Account) exportBase64() (map[string]string, error) { // Track requests btcd to send notifications of new transactions for // each address stored in a wallet. func (a *Account) Track() { - // Request notifications for transactions sending to all wallet - // addresses. - addrs := a.ActiveAddresses() - addrstrs := make([]string, 0, len(addrs)) - for addr := range addrs { - addrstrs = append(addrstrs, addr.EncodeAddress()) + client, err := accessClient() + if err != nil { + log.Errorf("No chain server client to track addresses.") + return } - if err := NotifyReceived(CurrentServerConn(), addrstrs); err != nil { + // Request notifications for transactions sending to all wallet + // addresses. + // + // TODO: return as slice? (doesn't have to be ordered, or + // SortedActiveAddresses would be fine.) + addrMap := a.ActiveAddresses() + addrs := make([]btcutil.Address, 0, len(addrMap)) + for addr := range addrMap { + addrs = append(addrs, addr) + } + + if err := client.NotifyReceived(addrs); err != nil { log.Error("Unable to request transaction updates for address.") } @@ -492,25 +500,23 @@ func (a *Account) RescanActiveJob() (*RescanJob, error) { // credits that are not known to have been mined into a block, and attempts // to send each to the chain server for relay. func (a *Account) ResendUnminedTxs() { + client, err := accessClient() + if err != nil { + log.Errorf("No chain server client to resend txs.") + return + } + txs := a.TxStore.UnminedDebitTxs() - txBuf := bytes.Buffer{} for _, tx := range txs { - if err := tx.MsgTx().Serialize(&txBuf); err != nil { - // Writing to a bytes.Buffer panics for OOM, and should - // not return any other errors. - panic(err) - } - hextx := hex.EncodeToString(txBuf.Bytes()) - txsha, err := SendRawTransaction(CurrentServerConn(), hextx) + txsha, err := client.SendRawTransaction(tx.MsgTx(), false) if err != nil { // TODO(jrick): Check error for if this tx is a double spend, // remove it if so. log.Warnf("Could not resend transaction %v: %v", txsha, err) - } else { - log.Debugf("Resent unmined transaction %v", txsha) + continue } - txBuf.Reset() + log.Debugf("Resent unmined transaction %v", txsha) } } @@ -564,7 +570,13 @@ func (a *Account) NewAddress() (btcutil.Address, error) { AcctMgr.MarkAddressForAccount(addr, a) // Request updates from btcd for new transactions sent to this address. - a.ReqNewTxsForAddress(addr) + client, err := accessClient() + if err != nil { + return nil, err + } + if err := client.NotifyReceived([]btcutil.Address{addr}); err != nil { + return nil, err + } return addr, nil } @@ -593,7 +605,13 @@ func (a *Account) NewChangeAddress() (btcutil.Address, error) { AcctMgr.MarkAddressForAccount(addr, a) // Request updates from btcd for new transactions sent to this address. - a.ReqNewTxsForAddress(addr) + client, err := accessClient() + if err != nil { + return nil, err + } + if err := client.NotifyReceived([]btcutil.Address{addr}); err != nil { + return nil, err + } return addr, nil } @@ -612,41 +630,25 @@ func (a *Account) RecoverAddresses(n int) error { if err != nil { return err } - addrStrs := make([]string, 0, len(addrs)) - for i := range addrs { - addrStrs = append(addrStrs, addrs[i].EncodeAddress()) - } // Run a goroutine to rescan blockchain for recovered addresses. - go func(addrs []string) { - err := Rescan(CurrentServerConn(), lastInfo.FirstBlock(), - addrs, nil) + go func() { + client, err := accessClient() + if err != nil { + log.Errorf("Cannot access chain server client to " + + "rescan recovered addresses.") + return + } + err = client.Rescan(lastInfo.FirstBlock(), addrs, nil) if err != nil { log.Errorf("Rescanning for recovered addresses "+ "failed: %v", err) } - }(addrStrs) + }() return nil } -// ReqNewTxsForAddress sends a message to btcd to request tx updates -// for addr for each new block that is added to the blockchain. -func (a *Account) ReqNewTxsForAddress(addr btcutil.Address) { - // Only support P2PKH addresses currently. - apkh, ok := addr.(*btcutil.AddressPubKeyHash) - if !ok { - return - } - - log.Debugf("Requesting notifications of TXs sending to address %v", apkh) - - err := NotifyReceived(CurrentServerConn(), []string{apkh.EncodeAddress()}) - if err != nil { - log.Error("Unable to request transaction updates for address.") - } -} - // ReqSpentUtxoNtfns sends a message to btcd to request updates for when // a stored UTXO has been spent. func ReqSpentUtxoNtfns(credits []*txstore.Credit) { @@ -658,7 +660,13 @@ func ReqSpentUtxoNtfns(credits []*txstore.Credit) { ops = append(ops, op) } - if err := NotifySpent(CurrentServerConn(), ops); err != nil { + client, err := accessClient() + if err != nil { + log.Errorf("Cannot access chain server client to " + + "request spent output notifications.") + return + } + if err := client.NotifySpent(ops); err != nil { log.Errorf("Cannot request notifications for spent outputs: %v", err) } diff --git a/acctmgr.go b/acctmgr.go index c182aeb..778ba0c 100644 --- a/acctmgr.go +++ b/acctmgr.go @@ -496,6 +496,10 @@ func (am *AccountManager) rescanListener() { noun := pickNoun(n, "address", "addresses") log.Infof("Finished rescan for %d %s", n, noun) + + default: + // Unexpected rescan message type. + panic(e) } AcctMgr.Release() } @@ -636,9 +640,8 @@ func (am *AccountManager) BlockNotify(bs *wallet.BlockStamp) { // changes, or sending these notifications as the utxos are added. confirmed := a.CalculateBalance(1) unconfirmed := a.CalculateBalance(0) - confirmed - NotifyWalletBalance(allClients, a.name, confirmed) - NotifyWalletBalanceUnconfirmed(allClients, a.name, - unconfirmed) + server.NotifyWalletBalance(a.name, confirmed) + server.NotifyWalletBalanceUnconfirmed(a.name, unconfirmed) // If this is the default account, update the block all accounts // are synced with, and schedule a wallet write. @@ -824,17 +827,6 @@ func (am *AccountManager) DumpWIFPrivateKey(addr btcutil.Address) (string, error return a.DumpWIFPrivateKey(addr) } -// NotifyBalances notifies a wallet frontend of all confirmed and unconfirmed -// account balances. -func (am *AccountManager) NotifyBalances(frontend chan []byte) { - for _, a := range am.AllAccounts() { - balance := a.CalculateBalance(1) - unconfirmed := a.CalculateBalance(0) - balance - NotifyWalletBalance(frontend, a.name, balance) - NotifyWalletBalanceUnconfirmed(frontend, a.name, unconfirmed) - } -} - // ListAccounts returns a map of account names to their current account // balances. The balances are calculated using minconf confirmations. func (am *AccountManager) ListAccounts(minconf int) map[string]float64 { diff --git a/cmd.go b/cmd.go index 6018140..c92d7e1 100644 --- a/cmd.go +++ b/cmd.go @@ -17,10 +17,7 @@ package main import ( - "github.com/conformal/btcjson" - "github.com/conformal/btcutil" - "github.com/conformal/btcwallet/wallet" - "github.com/conformal/btcwire" + "errors" "io/ioutil" "net" "net/http" @@ -28,10 +25,15 @@ import ( "os" "sync" "time" + + "github.com/conformal/btcutil" + "github.com/conformal/btcwallet/wallet" + "github.com/conformal/btcwire" ) var ( - cfg *config + cfg *config + server *rpcServer curBlock = struct { sync.RWMutex @@ -54,7 +56,12 @@ func GetCurBlock() (wallet.BlockStamp, error) { return bs, nil } - bb, err := GetBestBlock(CurrentServerConn()) + var bbHash *btcwire.ShaHash + var bbHeight int32 + client, err := accessClient() + if err == nil { + bbHash, bbHeight, err = client.GetBestBlock() + } if err != nil { unknown := wallet.BlockStamp{ Height: int32(btcutil.BlockHeightUnknown), @@ -62,18 +69,11 @@ func GetCurBlock() (wallet.BlockStamp, error) { return unknown, err } - hash, err := btcwire.NewShaHashFromStr(bb.Hash) - if err != nil { - return wallet.BlockStamp{ - Height: int32(btcutil.BlockHeightUnknown), - }, err - } - curBlock.Lock() - if bb.Height > curBlock.BlockStamp.Height { + if bbHeight > curBlock.BlockStamp.Height { bs = wallet.BlockStamp{ - Height: bb.Height, - Hash: *hash, + Height: bbHeight, + Hash: *bbHash, } curBlock.BlockStamp = bs } @@ -81,19 +81,49 @@ func GetCurBlock() (wallet.BlockStamp, error) { return bs, nil } -// NewJSONID is used to receive the next unique JSON ID for btcd -// requests, starting from zero and incrementing by one after each -// read. -var NewJSONID = make(chan uint64) +var clientAccessChan = make(chan *rpcClient) -// JSONIDGenerator sends incremental integers across a channel. This -// is meant to provide a unique value for the JSON ID field for btcd -// messages. -func JSONIDGenerator(c chan uint64) { - var n uint64 +func clientAccess(newClient <-chan *rpcClient) { + var client *rpcClient for { - c <- n - n++ + select { + case c := <-newClient: + client = c + case clientAccessChan <- client: + } + } +} + +func accessClient() (*rpcClient, error) { + c := <-clientAccessChan + if c == nil { + return nil, errors.New("chain server disconnected") + } + return c, nil +} + +func clientConnect(certs []byte, newClient chan<- *rpcClient) { + const initialWait = 5 * time.Second + wait := initialWait + for { + client, err := newRPCClient(certs) + if err != nil { + log.Warnf("Unable to open chain server client "+ + "connection: %v", err) + time.Sleep(wait) + wait <<= 1 + if wait > time.Minute { + wait = time.Minute + } + continue + } + + wait = initialWait + + client.Start() + newClient <- client + + client.WaitForShutdown() } } @@ -129,37 +159,28 @@ func main() { }() } + // Read CA file to verify a btcd TLS connection. + certs, err := ioutil.ReadFile(cfg.CAFile) + if err != nil { + log.Errorf("cannot open CA file: %v", err) + os.Exit(1) + } + // Check and update any old file locations. updateOldFileLocations() // Start account manager and open accounts. AcctMgr.Start() - // Read CA file to verify a btcd TLS connection. - cafile, err := ioutil.ReadFile(cfg.CAFile) + server, err = newRPCServer(cfg.SvrListeners) if err != nil { - log.Errorf("cannot open CA file: %v", err) + log.Errorf("Unable to create HTTP server: %v", err) os.Exit(1) } - go func() { - s, err := newServer(cfg.SvrListeners) - if err != nil { - log.Errorf("Unable to create HTTP server: %v", err) - os.Exit(1) - } - - // Start HTTP server to listen and send messages to frontend and btcd - // backend. Try reconnection if connection failed. - s.Start() - }() - - // Begin generating new IDs for JSON calls. - go JSONIDGenerator(NewJSONID) - - // Begin RPC server goroutines. - go RPCGateway() - go WalletRequestProcessor() + // Start HTTP server to listen and send messages to frontend and btcd + // backend. Try reconnection if connection failed. + server.Start() // Begin maintanence goroutines. go SendBeforeReceiveHistorySync(SendTxHistSyncChans.add, @@ -173,76 +194,7 @@ func main() { NotifyBalanceSyncerChans.remove, NotifyBalanceSyncerChans.access) - updateBtcd := make(chan *BtcdRPCConn) - go func() { - // Create an RPC connection and close the closed channel. - // - // It might be a better idea to create a new concrete type - // just for an always disconnected RPC connection and begin - // with that. - btcd := NewBtcdRPCConn(nil) - close(btcd.closed) - - // Maintain the current btcd connection. After reconnects, - // the current connection should be updated. - for { - select { - case conn := <-updateBtcd: - btcd = conn - - case access := <-accessServer: - access.server <- btcd - } - } - }() - - for { - btcd, err := BtcdConnect(cafile) - if err != nil { - log.Info("Retrying btcd connection in 5 seconds") - time.Sleep(5 * time.Second) - continue - } - updateBtcd <- btcd - - NotifyBtcdConnection(allClients) - log.Info("Established connection to btcd") - - // Perform handshake. - if err := Handshake(btcd); err != nil { - var message string - if jsonErr, ok := err.(btcjson.Error); ok { - message = jsonErr.Message - } else { - message = err.Error() - } - log.Errorf("Cannot complete handshake: %v", message) - log.Info("Retrying btcd connection in 5 seconds") - time.Sleep(5 * time.Second) - continue - } - - // Block goroutine until the connection is lost. - <-btcd.closed - NotifyBtcdConnection(allClients) - log.Info("Lost btcd connection") - } -} - -var accessServer = make(chan *AccessCurrentServerConn) - -// AccessCurrentServerConn is used to access the current RPC connection -// from the goroutine managing btcd-side RPC connections. -type AccessCurrentServerConn struct { - server chan ServerConn -} - -// CurrentServerConn returns the most recently-connected btcd-side -// RPC connection. -func CurrentServerConn() ServerConn { - access := &AccessCurrentServerConn{ - server: make(chan ServerConn), - } - accessServer <- access - return <-access.server + clientChan := make(chan *rpcClient) + go clientAccess(clientChan) + clientConnect(certs, clientChan) } diff --git a/ntfns.go b/ntfns.go deleted file mode 100644 index af298a4..0000000 --- a/ntfns.go +++ /dev/null @@ -1,290 +0,0 @@ -/* - * Copyright (c) 2013, 2014 Conformal Systems LLC - * - * Permission to use, copy, modify, and distribute this software for any - * purpose with or without fee is hereby granted, provided that the above - * copyright notice and this permission notice appear in all copies. - * - * THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES - * WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF - * MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR - * ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES - * WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN - * ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF - * OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. - */ - -// This file implements the notification handlers for btcd-side notifications. - -package main - -import ( - "encoding/hex" - "fmt" - "sync" - "time" - - "github.com/conformal/btcjson" - "github.com/conformal/btcscript" - "github.com/conformal/btcutil" - "github.com/conformal/btcwallet/txstore" - "github.com/conformal/btcwallet/wallet" - "github.com/conformal/btcwire" - "github.com/conformal/btcws" -) - -func parseBlock(block *btcws.BlockDetails) (*txstore.Block, int, error) { - if block == nil { - return nil, btcutil.TxIndexUnknown, nil - } - blksha, err := btcwire.NewShaHashFromStr(block.Hash) - if err != nil { - return nil, btcutil.TxIndexUnknown, err - } - b := &txstore.Block{ - Height: block.Height, - Hash: *blksha, - Time: time.Unix(block.Time, 0), - } - return b, block.Index, nil -} - -type notificationHandler func(btcjson.Cmd) error - -var notificationHandlers = map[string]notificationHandler{ - btcws.BlockConnectedNtfnMethod: NtfnBlockConnected, - btcws.BlockDisconnectedNtfnMethod: NtfnBlockDisconnected, - btcws.RecvTxNtfnMethod: NtfnRecvTx, - btcws.RedeemingTxNtfnMethod: NtfnRedeemingTx, - btcws.RescanProgressNtfnMethod: NtfnRescanProgress, -} - -// NtfnRecvTx handles the btcws.RecvTxNtfn notification. -func NtfnRecvTx(n btcjson.Cmd) error { - rtx, ok := n.(*btcws.RecvTxNtfn) - if !ok { - return fmt.Errorf("%v handler: unexpected type", n.Method()) - } - - bs, err := GetCurBlock() - if err != nil { - return fmt.Errorf("%v handler: cannot get current block: %v", n.Method(), err) - } - - rawTx, err := hex.DecodeString(rtx.HexTx) - if err != nil { - return fmt.Errorf("%v handler: bad hexstring: %v", n.Method(), err) - } - tx, err := btcutil.NewTxFromBytes(rawTx) - if err != nil { - return fmt.Errorf("%v handler: bad transaction bytes: %v", n.Method(), err) - } - - block, txIdx, err := parseBlock(rtx.Block) - if err != nil { - return fmt.Errorf("%v handler: bad block: %v", n.Method(), err) - } - tx.SetIndex(txIdx) - - // For transactions originating from this wallet, the sent tx history should - // be recorded before the received history. If wallet created this tx, wait - // for the sent history to finish being recorded before continuing. - // - // TODO(jrick) this is wrong due to tx malleability. Cannot safely use the - // txsha as an identifier. - req := SendTxHistSyncRequest{ - txsha: *tx.Sha(), - response: make(chan SendTxHistSyncResponse), - } - SendTxHistSyncChans.access <- req - resp := <-req.response - if resp.ok { - // Wait until send history has been recorded. - <-resp.c - SendTxHistSyncChans.remove <- *tx.Sha() - } - - // For every output, find all accounts handling that output address (if any) - // and record the received txout. - for outIdx, txout := range tx.MsgTx().TxOut { - var accounts []*Account - // Errors don't matter here. If addrs is nil, the range below - // does nothing. - _, addrs, _, _ := btcscript.ExtractPkScriptAddrs(txout.PkScript, - activeNet.Params) - for _, addr := range addrs { - a, err := AcctMgr.AccountByAddress(addr) - if err != nil { - continue - } - accounts = append(accounts, a) - } - - for _, a := range accounts { - txr, err := a.TxStore.InsertTx(tx, block) - if err != nil { - return err - } - cred, err := txr.AddCredit(uint32(outIdx), false) - if err != nil { - return err - } - AcctMgr.ds.ScheduleTxStoreWrite(a) - - // Notify frontends of tx. If the tx is unconfirmed, it is always - // notified and the outpoint is marked as notified. If the outpoint - // has already been notified and is now in a block, a txmined notifiction - // should be sent once to let frontends that all previous send/recvs - // for this unconfirmed tx are now confirmed. - op := *cred.OutPoint() - previouslyNotifiedReq := NotifiedRecvTxRequest{ - op: op, - response: make(chan NotifiedRecvTxResponse), - } - NotifiedRecvTxChans.access <- previouslyNotifiedReq - if <-previouslyNotifiedReq.response { - NotifiedRecvTxChans.remove <- op - } else { - // Notify frontends of new recv tx and mark as notified. - NotifiedRecvTxChans.add <- op - - ltr, err := cred.ToJSON(a.Name(), bs.Height, a.Wallet.Net()) - if err != nil { - return err - } - NotifyNewTxDetails(allClients, a.Name(), ltr) - } - - // Notify frontends of new account balance. - confirmed := a.CalculateBalance(1) - unconfirmed := a.CalculateBalance(0) - confirmed - NotifyWalletBalance(allClients, a.name, confirmed) - NotifyWalletBalanceUnconfirmed(allClients, a.name, unconfirmed) - } - } - - return nil -} - -// NtfnBlockConnected handles btcd notifications resulting from newly -// connected blocks to the main blockchain. -// -// TODO(jrick): Send block time with notification. This will be used -// to mark wallet files with a possibly-better earliest block height, -// and will greatly reduce rescan times for wallets created with an -// out of sync btcd. -func NtfnBlockConnected(n btcjson.Cmd) error { - bcn, ok := n.(*btcws.BlockConnectedNtfn) - if !ok { - return fmt.Errorf("%v handler: unexpected type", n.Method()) - } - hash, err := btcwire.NewShaHashFromStr(bcn.Hash) - if err != nil { - return fmt.Errorf("%v handler: invalid hash string", n.Method()) - } - - // Update the blockstamp for the newly-connected block. - bs := &wallet.BlockStamp{ - Height: bcn.Height, - Hash: *hash, - } - curBlock.Lock() - curBlock.BlockStamp = *bs - curBlock.Unlock() - - // btcd notifies btcwallet about transactions first, and then sends - // the new block notification. New balance notifications for txs - // in blocks are therefore sent here after all tx notifications - // have arrived and finished being processed by the handlers. - workers := NotifyBalanceRequest{ - block: *hash, - wg: make(chan *sync.WaitGroup), - } - NotifyBalanceSyncerChans.access <- workers - if wg := <-workers.wg; wg != nil { - wg.Wait() - NotifyBalanceSyncerChans.remove <- *hash - } - AcctMgr.BlockNotify(bs) - - // Pass notification to frontends too. - marshaled, err := n.MarshalJSON() - // The parsed notification is expected to be marshalable. - if err != nil { - panic(err) - } - allClients <- marshaled - - return nil -} - -// NtfnBlockDisconnected handles btcd notifications resulting from -// blocks disconnected from the main chain in the event of a chain -// switch and notifies frontends of the new blockchain height. -func NtfnBlockDisconnected(n btcjson.Cmd) error { - bdn, ok := n.(*btcws.BlockDisconnectedNtfn) - if !ok { - return fmt.Errorf("%v handler: unexpected type", n.Method()) - } - hash, err := btcwire.NewShaHashFromStr(bdn.Hash) - if err != nil { - return fmt.Errorf("%v handler: invalid hash string", n.Method()) - } - - // Rollback Utxo and Tx data stores. - if err = AcctMgr.Rollback(bdn.Height, hash); err != nil { - return err - } - - // Pass notification to frontends too. - marshaled, err := n.MarshalJSON() - // A btcws.BlockDisconnectedNtfn is expected to marshal without error. - // If it does, it indicates that one of its struct fields is of a - // non-marshalable type. - if err != nil { - panic(err) - } - allClients <- marshaled - - return nil -} - -// NtfnRedeemingTx handles btcd redeemingtx notifications resulting from a -// transaction spending a watched outpoint. -func NtfnRedeemingTx(n btcjson.Cmd) error { - cn, ok := n.(*btcws.RedeemingTxNtfn) - if !ok { - return fmt.Errorf("%v handler: unexpected type", n.Method()) - } - - rawTx, err := hex.DecodeString(cn.HexTx) - if err != nil { - return fmt.Errorf("%v handler: bad hexstring: %v", n.Method(), err) - } - tx, err := btcutil.NewTxFromBytes(rawTx) - if err != nil { - return fmt.Errorf("%v handler: bad transaction bytes: %v", n.Method(), err) - } - - block, txIdx, err := parseBlock(cn.Block) - if err != nil { - return fmt.Errorf("%v handler: bad block: %v", n.Method(), err) - } - tx.SetIndex(txIdx) - return AcctMgr.RecordSpendingTx(tx, block) -} - -// NtfnRescanProgress handles btcd rescanprogress notifications resulting -// from a partially completed rescan. -func NtfnRescanProgress(n btcjson.Cmd) error { - cn, ok := n.(*btcws.RescanProgressNtfn) - if !ok { - return fmt.Errorf("%v handler: unexpected type", n.Method()) - } - - // Notify the rescan manager of the completed partial progress for - // the current rescan. - AcctMgr.rm.MarkProgress(cn.LastProcessed) - - return nil -} diff --git a/rescan.go b/rescan.go index 8ba78eb..c8e44a8 100644 --- a/rescan.go +++ b/rescan.go @@ -140,9 +140,9 @@ func (b *rescanBatch) merge(job *RescanJob) { } } -// Status types for the handler. -type rescanProgress int32 -type rescanFinished error +type rescanFinished struct { + error +} // jobHandler runs the RescanManager's for-select loop to manage rescan jobs // and dispatch requests. @@ -190,7 +190,7 @@ func (m *RescanManager) jobHandler() { if m.msgs != nil { m.msgs <- &RescanFinishedMsg{ Addresses: curBatch.addrs, - Error: error(s), + Error: s.error, } } curBatch.done() @@ -204,6 +204,10 @@ func (m *RescanManager) jobHandler() { m.msgs <- (*RescanStartedMsg)(job) } } + + default: + // Unexpected status message + panic(s) } } } @@ -214,16 +218,17 @@ func (m *RescanManager) jobHandler() { // The jobHandler is notified when the processing the rescan finishes. func (m *RescanManager) rpcHandler() { for job := range m.sendJob { - var addrStrs []string - for _, addrs := range job.Addresses { - for i := range addrs { - addrStrs = append(addrStrs, addrs[i].EncodeAddress()) - } + var addrs []btcutil.Address + for _, accountAddrs := range job.Addresses { + addrs = append(addrs, accountAddrs...) } - - c := CurrentServerConn() - err := Rescan(c, job.StartHeight, addrStrs, job.OutPoints) - m.status <- rescanFinished(err) + client, err := accessClient() + if err != nil { + m.status <- rescanFinished{err} + return + } + err = client.Rescan(job.StartHeight, addrs, job.OutPoints) + m.status <- rescanFinished{err} } } @@ -260,6 +265,6 @@ func (m *RescanManager) SubmitJob(job *RescanJob) <-chan struct{} { // MarkProgress messages the RescanManager with the height of the block // last processed by a running rescan. -func (m *RescanManager) MarkProgress(height int32) { - m.status <- rescanProgress(height) +func (m *RescanManager) MarkProgress(height rescanProgress) { + m.status <- height } diff --git a/rpc.go b/rpc.go deleted file mode 100644 index d761107..0000000 --- a/rpc.go +++ /dev/null @@ -1,101 +0,0 @@ -/* - * Copyright (c) 2013, 2014 Conformal Systems LLC - * - * Permission to use, copy, modify, and distribute this software for any - * purpose with or without fee is hereby granted, provided that the above - * copyright notice and this permission notice appear in all copies. - * - * THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES - * WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF - * MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR - * ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES - * WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN - * ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF - * OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. - */ - -package main - -import ( - "encoding/json" - "github.com/conformal/btcjson" -) - -// RawRPCResponse is a response to a JSON-RPC request with delayed -// unmarshaling. -type RawRPCResponse struct { - Id *uint64 - Result *json.RawMessage `json:"result"` - Error *json.RawMessage `json:"error"` -} - -// FinishUnmarshal unmarshals the result and error of a raw RPC response. -// If v is non-nil, the result is unmarshaled into the variable pointed -// to by the interface rather than using the rules in the encoding/json -// package to allocate a new variable for the result. The final result -// and JSON-RPC error is returned. -// -// If the returned error is non-nil, it will be a btcjson.Error. -func (r *RawRPCResponse) FinishUnmarshal(v interface{}) (interface{}, error) { - // JSON-RPC spec makes this handling easier-ish because both result and - // error cannot be non-nil. - if r.Error != nil { - var jsonErr btcjson.Error - if err := json.Unmarshal([]byte(*r.Error), &jsonErr); err != nil { - return nil, btcjson.Error{ - Code: btcjson.ErrParse.Code, - Message: err.Error(), - } - } - return nil, jsonErr - } - if r.Result != nil { - if err := json.Unmarshal([]byte(*r.Result), &v); err != nil { - return nil, btcjson.Error{ - Code: btcjson.ErrParse.Code, - Message: err.Error(), - } - } - return v, nil - } - return nil, nil -} - -// ClientRequest is a type holding a bitcoin client's request and -// a channel to send the response. -type ClientRequest struct { - ws bool - request btcjson.Cmd - response chan RawRPCResponse -} - -// NewClientRequest creates a new ClientRequest from a btcjson.Cmd. -func NewClientRequest(request btcjson.Cmd, ws bool) *ClientRequest { - return &ClientRequest{ - ws: ws, - request: request, - response: make(chan RawRPCResponse), - } -} - -// Handle sends a client request to the RPC gateway for processing, -// and returns the result when handling is finished. -func (r *ClientRequest) Handle() RawRPCResponse { - clientRequests <- r - return <-r.response -} - -// ServerRequest is a type responsible for handling requests to a bitcoin -// server and providing a method to access the response. -type ServerRequest struct { - request btcjson.Cmd - response chan RawRPCResponse -} - -// NewServerRequest creates a new ServerRequest from a btcjson.Cmd. -func NewServerRequest(request btcjson.Cmd) *ServerRequest { - return &ServerRequest{ - request: request, - response: make(chan RawRPCResponse, 1), - } -} diff --git a/rpcclient.go b/rpcclient.go index 0dbadf1..46fba67 100644 --- a/rpcclient.go +++ b/rpcclient.go @@ -14,405 +14,490 @@ * OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. */ -// This file implements the websocket client connection to a bitcoin RPC -// server. - package main import ( - "code.google.com/p/go.net/websocket" - "encoding/hex" - "encoding/json" "errors" - "github.com/conformal/btcjson" + "fmt" + "sync" + "time" + + "github.com/conformal/btcrpcclient" + "github.com/conformal/btcscript" "github.com/conformal/btcutil" + "github.com/conformal/btcwallet/txstore" + "github.com/conformal/btcwallet/wallet" "github.com/conformal/btcwire" "github.com/conformal/btcws" - "io" ) -// ServerConn is an interface representing a client connection to a bitcoin RPC -// server. -type ServerConn interface { - // SendRequest sends a bitcoin RPC request, returning a channel to - // read the reply. A channel is used so both synchronous and - // asynchronous RPC can be supported. - SendRequest(request *ServerRequest) chan RawRPCResponse +// InvalidNotificationError describes an error due to an invalid chain server +// notification and should be warned by wallet, but does not indicate an +// problem with the current wallet state. +type InvalidNotificationError struct { + error } -// ErrBtcdDisconnected describes an error where an operation cannot -// successfully complete due to btcwallet not being connected to -// btcd. -var ErrBtcdDisconnected = btcjson.Error{ - Code: -1, - Message: "btcd disconnected", +var ( + // MismatchingNetworks represents an error where a client connection + // to btcd cannot succeed due to btcwallet and btcd operating on + // different bitcoin networks. + ErrMismatchedNets = errors.New("mismatched networks") +) + +const ( + // maxConcurrentClientRequests is the maximum number of + // unhandled/running requests that the server will run for a websocket + // client at a time. Beyond this limit, additional request reads will + // block until a running request handler finishes. This limit exists to + // prevent a single connection from causing a denial of service attack + // with an unnecessarily large number of requests. + maxConcurrentClientRequests = 20 + + // maxUnhandledNotifications is the maximum number of still marshaled + // and unhandled notifications. If this limit is reached, the + // btcrpcclient client notification handlers will begin blocking until + // an unhandled notification is processed. + maxUnhandledNotifications = 50 +) + +type notificationChan chan notification + +type blockSummary struct { + hash *btcwire.ShaHash + height int32 } -// ErrBtcdDisconnectedRaw is the raw JSON encoding of ErrBtcdDisconnected. -var ErrBtcdDisconnectedRaw = json.RawMessage(`{"code":-1,"message":"btcd disconnected"}`) - -// BtcdRPCConn is a type managing a client connection to a btcd RPC server -// over websockets. -type BtcdRPCConn struct { - ws *websocket.Conn - addRequest chan *AddRPCRequest - closed chan struct{} +type acceptedTx struct { + tx *btcutil.Tx + block *btcws.BlockDetails // nil if unmined } -// Ensure that BtcdRPCConn can be used as an RPCConn. -var _ ServerConn = &BtcdRPCConn{} - -// NewBtcdRPCConn creates a new RPC connection from a btcd websocket -// connection to btcd. -func NewBtcdRPCConn(ws *websocket.Conn) *BtcdRPCConn { - conn := &BtcdRPCConn{ - ws: ws, - addRequest: make(chan *AddRPCRequest), - closed: make(chan struct{}), +// Notification types. These are defined here and processed from from reading +// a notificationChan to avoid handling these notifications directly in +// btcrpcclient callbacks, which isn't very go-like and doesn't allow +// blocking client calls. +type ( + // Container type for any notification. + notification interface { + handleNotification() error } - return conn + + blockConnected blockSummary + blockDisconnected blockSummary + recvTx acceptedTx + redeemingTx acceptedTx + rescanProgress int32 +) + +func (c notificationChan) onBlockConnected(hash *btcwire.ShaHash, height int32) { + c <- (blockConnected)(blockSummary{hash, height}) } -// SendRequest sends an RPC request and returns a channel to read the response's -// result and error. Part of the RPCConn interface. -func (btcd *BtcdRPCConn) SendRequest(request *ServerRequest) chan RawRPCResponse { - select { - case <-btcd.closed: - // The connection has closed, so instead of adding and sending - // a request, return a channel that just replies with the - // error for a disconnected btcd. - responseChan := make(chan RawRPCResponse, 1) - responseChan <- RawRPCResponse{Error: &ErrBtcdDisconnectedRaw} - return responseChan +func (c notificationChan) onBlockDisconnected(hash *btcwire.ShaHash, height int32) { + c <- (blockDisconnected)(blockSummary{hash, height}) +} - default: - addRequest := &AddRPCRequest{ - Request: request, - ResponseChan: make(chan chan RawRPCResponse, 1), +func (c notificationChan) onRecvTx(tx *btcutil.Tx, block *btcws.BlockDetails) { + c <- recvTx{tx, block} +} + +func (c notificationChan) onRedeemingTx(tx *btcutil.Tx, block *btcws.BlockDetails) { + c <- redeemingTx{tx, block} +} + +func (c notificationChan) onRescanProgress(height int32) { + c <- rescanProgress(height) +} + +func (n blockConnected) handleNotification() error { + // Update the blockstamp for the newly-connected block. + bs := &wallet.BlockStamp{ + Height: n.height, + Hash: *n.hash, + } + curBlock.Lock() + curBlock.BlockStamp = *bs + curBlock.Unlock() + + // btcd notifies btcwallet about transactions first, and then sends + // the new block notification. New balance notifications for txs + // in blocks are therefore sent here after all tx notifications + // have arrived and finished being processed by the handlers. + workers := NotifyBalanceRequest{ + block: *n.hash, + wg: make(chan *sync.WaitGroup), + } + NotifyBalanceSyncerChans.access <- workers + if wg := <-workers.wg; wg != nil { + wg.Wait() + NotifyBalanceSyncerChans.remove <- *n.hash + } + AcctMgr.BlockNotify(bs) + + // Pass notification to frontends too. + if server != nil { + // TODO: marshaling should be perfomred by the server, and + // sent only to client that have requested the notification. + marshaled, err := n.MarshalJSON() + // The parsed notification is expected to be marshalable. + if err != nil { + panic(err) } - btcd.addRequest <- addRequest - return <-addRequest.ResponseChan + server.broadcasts <- marshaled } + + return nil } -// Connected returns whether the connection remains established to the RPC -// server. +// MarshalJSON creates the JSON encoding of the chain notification to pass +// to any connected wallet clients. This should never error. +func (n blockConnected) MarshalJSON() ([]byte, error) { + nn := btcws.NewBlockConnectedNtfn(n.hash.String(), n.height) + return nn.MarshalJSON() +} + +func (n blockDisconnected) handleNotification() error { + // Rollback Utxo and Tx data stores. + if err := AcctMgr.Rollback(n.height, n.hash); err != nil { + return err + } + + // Pass notification to frontends too. + if server != nil { + // TODO: marshaling should be perfomred by the server, and + // sent only to client that have requested the notification. + marshaled, err := n.MarshalJSON() + // A btcws.BlockDisconnectedNtfn is expected to marshal without error. + // If it does, it indicates that one of its struct fields is of a + // non-marshalable type. + if err != nil { + panic(err) + } + server.broadcasts <- marshaled + } + + return nil +} + +// MarshalJSON creates the JSON encoding of the chain notification to pass +// to any connected wallet clients. This should never error. +func (n blockDisconnected) MarshalJSON() ([]byte, error) { + nn := btcws.NewBlockDisconnectedNtfn(n.hash.String(), n.height) + return nn.MarshalJSON() +} + +func parseBlock(block *btcws.BlockDetails) (*txstore.Block, int, error) { + if block == nil { + return nil, btcutil.TxIndexUnknown, nil + } + blksha, err := btcwire.NewShaHashFromStr(block.Hash) + if err != nil { + return nil, btcutil.TxIndexUnknown, err + } + b := &txstore.Block{ + Height: block.Height, + Hash: *blksha, + Time: time.Unix(block.Time, 0), + } + return b, block.Index, nil +} + +func (n recvTx) handleNotification() error { + block, txIdx, err := parseBlock(n.block) + if err != nil { + return InvalidNotificationError{err} + } + n.tx.SetIndex(txIdx) + + bs, err := GetCurBlock() + if err != nil { + return fmt.Errorf("cannot get current block: %v", err) + } + + // For transactions originating from this wallet, the sent tx history should + // be recorded before the received history. If wallet created this tx, wait + // for the sent history to finish being recorded before continuing. + // + // TODO(jrick) this is wrong due to tx malleability. Cannot safely use the + // txsha as an identifier. + req := SendTxHistSyncRequest{ + txsha: *n.tx.Sha(), + response: make(chan SendTxHistSyncResponse), + } + SendTxHistSyncChans.access <- req + resp := <-req.response + if resp.ok { + // Wait until send history has been recorded. + <-resp.c + SendTxHistSyncChans.remove <- *n.tx.Sha() + } + + // For every output, find all accounts handling that output address (if any) + // and record the received txout. + for outIdx, txout := range n.tx.MsgTx().TxOut { + var accounts []*Account + // Errors don't matter here. If addrs is nil, the range below + // does nothing. + _, addrs, _, _ := btcscript.ExtractPkScriptAddrs(txout.PkScript, + activeNet.Params) + for _, addr := range addrs { + a, err := AcctMgr.AccountByAddress(addr) + if err != nil { + continue + } + accounts = append(accounts, a) + } + + for _, a := range accounts { + txr, err := a.TxStore.InsertTx(n.tx, block) + if err != nil { + return err + } + cred, err := txr.AddCredit(uint32(outIdx), false) + if err != nil { + return err + } + AcctMgr.ds.ScheduleTxStoreWrite(a) + + // Notify frontends of tx. If the tx is unconfirmed, it is always + // notified and the outpoint is marked as notified. If the outpoint + // has already been notified and is now in a block, a txmined notifiction + // should be sent once to let frontends that all previous send/recvs + // for this unconfirmed tx are now confirmed. + op := *cred.OutPoint() + previouslyNotifiedReq := NotifiedRecvTxRequest{ + op: op, + response: make(chan NotifiedRecvTxResponse), + } + NotifiedRecvTxChans.access <- previouslyNotifiedReq + if <-previouslyNotifiedReq.response { + NotifiedRecvTxChans.remove <- op + } else { + // Notify frontends of new recv tx and mark as notified. + NotifiedRecvTxChans.add <- op + + ltr, err := cred.ToJSON(a.Name(), bs.Height, a.Wallet.Net()) + if err != nil { + return err + } + server.NotifyNewTxDetails(a.Name(), ltr) + } + + // Notify frontends of new account balance. + confirmed := a.CalculateBalance(1) + unconfirmed := a.CalculateBalance(0) - confirmed + server.NotifyWalletBalance(a.name, confirmed) + server.NotifyWalletBalanceUnconfirmed(a.name, unconfirmed) + } + } + + return nil +} + +func (n redeemingTx) handleNotification() error { + block, txIdx, err := parseBlock(n.block) + if err != nil { + return InvalidNotificationError{err} + } + n.tx.SetIndex(txIdx) + return AcctMgr.RecordSpendingTx(n.tx, block) +} + +func (n rescanProgress) handleNotification() error { + AcctMgr.rm.MarkProgress(n) + return nil +} + +type rpcClient struct { + *btcrpcclient.Client // client to btcd + chainNotifications notificationChan + wg sync.WaitGroup +} + +func newRPCClient(certs []byte) (*rpcClient, error) { + ntfns := make(notificationChan, maxUnhandledNotifications) + client := rpcClient{ + chainNotifications: ntfns, + } + initializedClient := make(chan struct{}) + ntfnCallbacks := btcrpcclient.NotificationHandlers{ + OnClientConnected: func() { + log.Info("Established connection to btcd") + <-initializedClient + + // nil client to broadcast to all connected clients + server.NotifyConnectionStatus(nil) + + err := client.Handshake() + if err != nil { + log.Errorf("Cannot complete handshake: %v", err) + client.Stop() + } + }, + OnBlockConnected: ntfns.onBlockConnected, + OnBlockDisconnected: ntfns.onBlockDisconnected, + OnRecvTx: ntfns.onRecvTx, + OnRedeemingTx: ntfns.onRedeemingTx, + OnRescanProgress: ntfns.onRescanProgress, + } + conf := btcrpcclient.ConnConfig{ + Host: cfg.RPCConnect, + Endpoint: "ws", + User: cfg.BtcdUsername, + Pass: cfg.BtcdPassword, + Certificates: certs, + } + c, err := btcrpcclient.New(&conf, &ntfnCallbacks) + if err != nil { + return nil, err + } + client.Client = c + close(initializedClient) + return &client, nil +} + +func (c *rpcClient) Start() { + c.wg.Add(1) + go c.handleNotifications() +} + +func (c *rpcClient) Stop() { + if !c.Client.Disconnected() { + log.Warn("Disconnecting chain server client connection") + c.Client.Shutdown() + } + close(c.chainNotifications) +} + +func (c *rpcClient) WaitForShutdown() { + c.Client.WaitForShutdown() + c.wg.Wait() +} + +func (c *rpcClient) handleNotifications() { + for n := range c.chainNotifications { + AcctMgr.Grab() + err := n.handleNotification() + if err != nil { + switch e := err.(type) { + case InvalidNotificationError: + log.Warnf("Ignoring invalid notification: %v", e) + default: + log.Errorf("Cannot handle notification: %v", e) + } + } + AcctMgr.Release() + } + c.wg.Done() +} + +// Handshake first checks that the websocket connection between btcwallet and +// btcd is valid, that is, that there are no mismatching settings between +// the two processes (such as running on different Bitcoin networks). If the +// sanity checks pass, all wallets are set to be tracked against chain +// notifications from this btcd connection. // -// This function probably should be removed, as any checks for confirming -// the connection are no longer valid after the check and may result in -// races. -func (btcd *BtcdRPCConn) Connected() bool { - select { - case <-btcd.closed: - return false - - default: - return true - } -} - -// Close forces closing the current btcd connection. -func (btcd *BtcdRPCConn) Close() { - select { - case <-btcd.closed: - default: - close(btcd.closed) - } -} - -// AddRPCRequest is used to add an RPCRequest to the pool of requests -// being manaaged by a btcd RPC connection. -type AddRPCRequest struct { - Request *ServerRequest - ResponseChan chan chan RawRPCResponse -} - -// send performs the actual send of the marshaled request over the btcd -// websocket connection. -func (btcd *BtcdRPCConn) send(rpcrequest *ServerRequest) error { - // btcjson.Cmds define their own MarshalJSON which returns an error - // to satisify the json.Marshaler interface, but should never error. - // If an error does occur, it is due to a struct containing a type - // that is not marshalable, so panic here rather than silently - // ignoring it. - mrequest, err := rpcrequest.request.MarshalJSON() +// TODO(jrick): Track and Rescan commands should be replaced with a +// single TrackSince function (or similar) which requests address +// notifications and performs the rescan since some block height. +func (c *rpcClient) Handshake() error { + net, err := c.GetCurrentNet() if err != nil { - panic(err) + return err + } + if net != activeNet.Net { + return ErrMismatchedNets } - return websocket.Message.Send(btcd.ws, mrequest) -} -// Start starts the goroutines required to send RPC requests and listen for -// replies. -func (btcd *BtcdRPCConn) Start() { - done := btcd.closed - responses := make(chan RawRPCResponse) + // Request notifications for connected and disconnected blocks. + if err := c.NotifyBlocks(); err != nil { + return err + } - // Maintain a map of JSON IDs to RPCRequests currently being waited on. - go func() { - m := make(map[uint64]*ServerRequest) - for { - select { - case addrequest := <-btcd.addRequest: - rpcrequest := addrequest.Request - m[rpcrequest.request.Id().(uint64)] = rpcrequest + // Get current best block. If this is before than the oldest + // saved block hash, assume that this btcd instance is not yet + // synced up to a previous btcd that was last used with this + // wallet. + bs, err := GetCurBlock() + if err != nil { + return fmt.Errorf("cannot get best block: %v", err) + } + if server != nil { + server.NotifyNewBlockChainHeight(&bs) + server.NotifyBalances(nil) + } - if err := btcd.send(rpcrequest); err != nil { - // Connection lost. - log.Infof("Cannot complete btcd websocket send: %v", - err) - if err := btcd.ws.Close(); err != nil { - log.Warnf("Cannot close btcd "+ - "websocket connection: %v", err) - } - close(done) - } + // Get default account. Only the default account is used to + // track recently-seen blocks. + a, err := AcctMgr.Account("") + if err != nil { + // No account yet is not a handshake error, but means our + // handshake is done. + return nil + } - addrequest.ResponseChan <- rpcrequest.response + // TODO(jrick): if height is less than the earliest-saved block + // height, should probably wait for btcd to catch up. - case rawResponse, ok := <-responses: - if !ok { - responses = nil - close(done) - break - } - rpcrequest, ok := m[*rawResponse.Id] - if !ok { - log.Warnf("Received unexpected btcd response") - continue - } - delete(m, *rawResponse.Id) + // Check that there was not any reorgs done since last connection. + // If so, rollback and rescan to catch up. + it := a.Wallet.NewIterateRecentBlocks() + for cont := it != nil; cont; cont = it.Prev() { + bs := it.BlockStamp() + log.Debugf("Checking for previous saved block with height %v hash %v", + bs.Height, bs.Hash) - rpcrequest.response <- rawResponse + if _, err := c.GetBlock(&bs.Hash); err != nil { + continue + } - case <-done: - resp := RawRPCResponse{Error: &ErrBtcdDisconnectedRaw} - for _, request := range m { - request.response <- resp - } - return + log.Debug("Found matching block.") + + // If we had to go back to any previous blocks (it.Next + // returns true), then rollback the next and all child blocks. + // This rollback is done here instead of in the blockMissing + // check above for each removed block because Rollback will + // try to write new tx and utxo files on each rollback. + if it.Next() { + bs := it.BlockStamp() + err := AcctMgr.Rollback(bs.Height, &bs.Hash) + if err != nil { + return err } } - }() - // Listen for replies/notifications from btcd, and decide how to handle them. - go func() { - // Idea: instead of reading btcd messages from just one websocket - // connection, maybe use two so the same connection isn't used - // for both notifications and responses? Should make handling - // must faster as unnecessary unmarshal attempts could be avoided. + // Set default account to be marked in sync with the current + // blockstamp. This invalidates the iterator. + a.Wallet.SetSyncedWith(bs) - for { - var m string - if err := websocket.Message.Receive(btcd.ws, &m); err != nil { - // Log warning if btcd did not disconnect. - if err != io.EOF { - log.Infof("Cannot receive btcd websocket message: %v", - err) - } - if err := btcd.ws.Close(); err != nil { - log.Warnf("Cannot close btcd "+ - "websocket connection: %v", err) - } - close(responses) - return - } - - // Try notifications (requests with nil ids) first. - n, err := unmarshalNotification(m) - if err == nil { - svrNtfns <- n - continue - } - - // Must be a response. - r, err := unmarshalResponse(m) - if err == nil { - responses <- r - continue - } - - // Not sure what was received but it isn't correct. - log.Warnf("Received invalid message from btcd") + // Begin tracking wallets against this btcd instance. + AcctMgr.Track() + if err := AcctMgr.RescanActiveAddresses(); err != nil { + return err } - }() -} - -// unmarshalResponse attempts to unmarshal a marshaled JSON-RPC response. -func unmarshalResponse(s string) (RawRPCResponse, error) { - var r RawRPCResponse - if err := json.Unmarshal([]byte(s), &r); err != nil { - return r, err - } - - // Check for a valid ID. - if r.Id == nil { - return r, errors.New("id is null") - } - return r, nil -} - -// unmarshalNotification attempts to unmarshal a marshaled JSON-RPC -// notification (Request with a nil or no ID). -func unmarshalNotification(s string) (btcjson.Cmd, error) { - req, err := btcjson.ParseMarshaledCmd([]byte(s)) - if err != nil { - return nil, err - } - - if req.Id() != nil { - return nil, errors.New("id is non-nil") - } - - return req, nil -} - -// GetBestBlock gets both the block height and hash of the best block -// in the main chain. -func GetBestBlock(rpc ServerConn) (*btcws.GetBestBlockResult, error) { - cmd := btcws.NewGetBestBlockCmd(<-NewJSONID) - response := <-rpc.SendRequest(NewServerRequest(cmd)) - - var resultData btcws.GetBestBlockResult - if _, err := response.FinishUnmarshal(&resultData); err != nil { - return nil, err - } - return &resultData, nil -} - -// GetBlock requests details about a block with the given hash. -func GetBlock(rpc ServerConn, blockHash string) (*btcjson.BlockResult, error) { - // NewGetBlockCmd should never fail with no optargs. If this does fail, - // panic now rather than later. - cmd, err := btcjson.NewGetBlockCmd(<-NewJSONID, blockHash) - if err != nil { - panic(err) - } - response := <-rpc.SendRequest(NewServerRequest(cmd)) - - var resultData btcjson.BlockResult - if _, err := response.FinishUnmarshal(&resultData); err != nil { - return nil, err - } - return &resultData, nil -} - -// GetCurrentNet requests the network a bitcoin RPC server is running on. -func GetCurrentNet(rpc ServerConn) (btcwire.BitcoinNet, error) { - cmd := btcws.NewGetCurrentNetCmd(<-NewJSONID) - response := <-rpc.SendRequest(NewServerRequest(cmd)) - - var resultData uint32 - if _, err := response.FinishUnmarshal(&resultData); err != nil { - return 0, err - } - return btcwire.BitcoinNet(resultData), nil -} - -// NotifyBlocks requests blockconnected and blockdisconnected notifications. -func NotifyBlocks(rpc ServerConn) error { - cmd := btcws.NewNotifyBlocksCmd(<-NewJSONID) - response := <-rpc.SendRequest(NewServerRequest(cmd)) - _, err := response.FinishUnmarshal(nil) - return err -} - -// NotifyReceived requests notifications for new transactions that spend -// to any of the addresses in addrs. -func NotifyReceived(rpc ServerConn, addrs []string) error { - cmd := btcws.NewNotifyReceivedCmd(<-NewJSONID, addrs) - response := <-rpc.SendRequest(NewServerRequest(cmd)) - _, err := response.FinishUnmarshal(nil) - return err -} - -// NotifySpent requests notifications for when a transaction is processed which -// spends op. -func NotifySpent(rpc ServerConn, outpoints []*btcwire.OutPoint) error { - ops := make([]btcws.OutPoint, 0, len(outpoints)) - for _, op := range outpoints { - ops = append(ops, *btcws.NewOutPointFromWire(op)) - } - cmd := btcws.NewNotifySpentCmd(<-NewJSONID, ops) - response := <-rpc.SendRequest(NewServerRequest(cmd)) - _, err := response.FinishUnmarshal(nil) - return err -} - -// Rescan requests a blockchain rescan for transactions to any number of -// addresses and notifications to inform wallet about such transactions. -func Rescan(rpc ServerConn, beginBlock int32, addrs []string, - outpoints []*btcwire.OutPoint) error { - - ops := make([]btcws.OutPoint, 0, len(outpoints)) - for _, op := range outpoints { - ops = append(ops, *btcws.NewOutPointFromWire(op)) - } - // NewRescanCmd should never fail with no optargs. If this does fail, - // panic now rather than later. - cmd, err := btcws.NewRescanCmd(<-NewJSONID, beginBlock, addrs, ops) - if err != nil { - panic(err) - } - response := <-rpc.SendRequest(NewServerRequest(cmd)) - _, err = response.FinishUnmarshal(nil) - return err -} - -// SendRawTransaction sends a hex-encoded transaction for relay. -func SendRawTransaction(rpc ServerConn, hextx string) (txid string, err error) { - // NewSendRawTransactionCmd should never fail. In the exceptional case - // where it does, panic here rather than later. - cmd, err := btcjson.NewSendRawTransactionCmd(<-NewJSONID, hextx) - if err != nil { - panic(err) - } - response := <-rpc.SendRequest(NewServerRequest(cmd)) - - var resultData string - _, err = response.FinishUnmarshal(&resultData) - return resultData, err -} - -// GetRawTransaction returns a future representing a pending GetRawTransaction -// command for txsha.. When the result of the request is required it may be -// collected with GetRawTRansactionAsyncResult. -func GetRawTransactionAsync(rpc ServerConn, txsha *btcwire.ShaHash) chan RawRPCResponse { - // NewGetRawTransactionCmd should never fail with no optargs. If this - // does fail, panic now rather than later. - cmd, err := btcjson.NewGetRawTransactionCmd(<-NewJSONID, txsha.String()) - if err != nil { - panic(err) - } - return rpc.SendRequest(NewServerRequest(cmd)) -} - -// GetRawTransactionAsyncResult waits for the pending command in request - -// the reqsult of a previous GetRawTransactionAsync() call - and returns either -// the requested transaction, or an error. -func GetRawTransactionAsyncResult(request chan RawRPCResponse) (*btcutil.Tx, error) { - response := <-request - - var resultData string - _, err := response.FinishUnmarshal(&resultData) - if err != nil { - return nil, err - } - serializedTx, err := hex.DecodeString(resultData) - if err != nil { - return nil, btcjson.ErrDecodeHexString - } - utx, err := btcutil.NewTxFromBytes(serializedTx) - if err != nil { - return nil, btcjson.ErrDeserialization - } - return utx, nil -} - -// GetRawTransaction sends the non-verbose version of a getrawtransaction -// request to receive the serialized transaction referenced by txsha. If -// successful, the transaction is decoded and returned as a btcutil.Tx. -func GetRawTransaction(rpc ServerConn, txsha *btcwire.ShaHash) (*btcutil.Tx, error) { - resp := GetRawTransactionAsync(rpc, txsha) - return GetRawTransactionAsyncResult(resp) + // TODO: Only begin tracking new unspent outputs as a result + // of the rescan. This is also pretty racy, as a new block + // could arrive between rescan and by the time the new outpoint + // is added to btcd's websocket's unspent output set. + AcctMgr.Track() + + // (Re)send any unmined transactions to btcd in case of a btcd restart. + AcctMgr.ResendUnminedTxs() + + // Get current blockchain height and best block hash. + return nil + } + + // Iterator was invalid (wallet has never been synced) or there was a + // huge chain fork + reorg (more than 20 blocks). + AcctMgr.Track() + if err := AcctMgr.RescanActiveAddresses(); err != nil { + return err + } + // TODO: only begin tracking new unspent outputs as a result of the + // rescan. This is also racy (see comment for second Track above). + AcctMgr.Track() + AcctMgr.ResendUnminedTxs() + return nil } diff --git a/rpcserver.go b/rpcserver.go index ce26131..bd8b78b 100644 --- a/rpcserver.go +++ b/rpcserver.go @@ -19,21 +19,34 @@ package main import ( "bytes" "crypto/ecdsa" + "crypto/sha256" + "crypto/subtle" + "crypto/tls" "encoding/base64" "encoding/hex" "encoding/json" "errors" "fmt" + "io" + "io/ioutil" + "net" + "net/http" + "os" + "path/filepath" + "runtime" + "sync" + "time" + "github.com/conformal/btcec" "github.com/conformal/btcjson" + "github.com/conformal/btcrpcclient" "github.com/conformal/btcscript" "github.com/conformal/btcutil" "github.com/conformal/btcwallet/txstore" "github.com/conformal/btcwallet/wallet" "github.com/conformal/btcwire" "github.com/conformal/btcws" - "sync" - "time" + "github.com/conformal/websocket" ) // Error types to simplify the reporting of specific categories of @@ -80,14 +93,725 @@ var ( } ) -// cmdHandler is a handler function to handle an unmarshaled and parsed +type websocketClient struct { + conn *websocket.Conn + authenticated bool + remoteAddr string + allRequests chan []byte + unauthedRequests chan unauthedRequest + responses chan []byte + quit chan struct{} // closed on disconnect +} + +func newWebsocketClient(c *websocket.Conn, authenticated bool, remoteAddr string) *websocketClient { + return &websocketClient{ + conn: c, + authenticated: authenticated, + remoteAddr: remoteAddr, + allRequests: make(chan []byte), + unauthedRequests: make(chan unauthedRequest, maxConcurrentClientRequests), + responses: make(chan []byte), + quit: make(chan struct{}), + } +} + +var errDisconnected = errors.New("websocket client disconnected") + +func (c *websocketClient) send(b []byte) error { + select { + case c.responses <- b: + return nil + case <-c.quit: + return errDisconnected + } +} + +// parseListeners splits the list of listen addresses passed in addrs into +// IPv4 and IPv6 slices and returns them. This allows easy creation of the +// listeners on the correct interface "tcp4" and "tcp6". It also properly +// detects addresses which apply to "all interfaces" and adds the address to +// both slices. +func parseListeners(addrs []string) ([]string, []string, error) { + ipv4ListenAddrs := make([]string, 0, len(addrs)*2) + ipv6ListenAddrs := make([]string, 0, len(addrs)*2) + for _, addr := range addrs { + host, _, err := net.SplitHostPort(addr) + if err != nil { + // Shouldn't happen due to already being normalized. + return nil, nil, err + } + + // Empty host or host of * on plan9 is both IPv4 and IPv6. + if host == "" || (host == "*" && runtime.GOOS == "plan9") { + ipv4ListenAddrs = append(ipv4ListenAddrs, addr) + ipv6ListenAddrs = append(ipv6ListenAddrs, addr) + continue + } + + // Parse the IP. + ip := net.ParseIP(host) + if ip == nil { + return nil, nil, fmt.Errorf("'%s' is not a valid IP "+ + "address", host) + } + + // To4 returns nil when the IP is not an IPv4 address, so use + // this determine the address type. + if ip.To4() == nil { + ipv6ListenAddrs = append(ipv6ListenAddrs, addr) + } else { + ipv4ListenAddrs = append(ipv4ListenAddrs, addr) + } + } + return ipv4ListenAddrs, ipv6ListenAddrs, nil +} + +// genCertPair generates a key/cert pair to the paths provided. +func genCertPair(certFile, keyFile string) error { + log.Infof("Generating TLS certificates...") + + // Create directories for cert and key files if they do not yet exist. + certDir, _ := filepath.Split(certFile) + keyDir, _ := filepath.Split(keyFile) + if err := os.MkdirAll(certDir, 0700); err != nil { + return err + } + if err := os.MkdirAll(keyDir, 0700); err != nil { + return err + } + + // Generate cert pair. + org := "btcwallet autogenerated cert" + validUntil := time.Now().Add(10 * 365 * 24 * time.Hour) + cert, key, err := btcutil.NewTLSCertPair(org, validUntil, nil) + if err != nil { + return err + } + + // Write cert and key files. + if err = ioutil.WriteFile(certFile, cert, 0666); err != nil { + return err + } + if err = ioutil.WriteFile(keyFile, key, 0600); err != nil { + if rmErr := os.Remove(certFile); rmErr != nil { + log.Warnf("Cannot remove written certificates: %v", rmErr) + } + return err + } + + log.Info("Done generating TLS certificates") + return nil +} + +// rpcServer holds the items the RPC server may need to access (auth, +// config, shutdown, etc.) +type rpcServer struct { + wg sync.WaitGroup + listeners []net.Listener + authsha [sha256.Size]byte + wsClients map[*websocketClient]struct{} + + upgrader websocket.Upgrader + + requests requestChan + + addWSClient chan *websocketClient + removeWSClient chan *websocketClient + broadcasts chan []byte +} + +// newRPCServer creates a new server for serving RPC client connections, both +// HTTP POST and websocket. +func newRPCServer(listenAddrs []string) (*rpcServer, error) { + login := cfg.Username + ":" + cfg.Password + auth := "Basic " + base64.StdEncoding.EncodeToString([]byte(login)) + s := rpcServer{ + authsha: sha256.Sum256([]byte(auth)), + wsClients: map[*websocketClient]struct{}{}, + upgrader: websocket.Upgrader{ + // Allow all origins. + CheckOrigin: func(r *http.Request) bool { return true }, + }, + requests: make(requestChan), + addWSClient: make(chan *websocketClient), + removeWSClient: make(chan *websocketClient), + broadcasts: make(chan []byte), + } + + // Check for existence of cert file and key file + if !fileExists(cfg.RPCKey) && !fileExists(cfg.RPCCert) { + // if both files do not exist, we generate them. + err := genCertPair(cfg.RPCCert, cfg.RPCKey) + if err != nil { + return nil, err + } + } + keypair, err := tls.LoadX509KeyPair(cfg.RPCCert, cfg.RPCKey) + if err != nil { + return nil, err + } + + tlsConfig := tls.Config{ + Certificates: []tls.Certificate{keypair}, + } + + ipv4ListenAddrs, ipv6ListenAddrs, err := parseListeners(listenAddrs) + listeners := make([]net.Listener, 0, + len(ipv6ListenAddrs)+len(ipv4ListenAddrs)) + for _, addr := range ipv4ListenAddrs { + listener, err := tls.Listen("tcp4", addr, &tlsConfig) + if err != nil { + log.Warnf("RPCS: Can't listen on %s: %v", addr, + err) + continue + } + listeners = append(listeners, listener) + } + + for _, addr := range ipv6ListenAddrs { + listener, err := tls.Listen("tcp6", addr, &tlsConfig) + if err != nil { + log.Warnf("RPCS: Can't listen on %s: %v", addr, + err) + continue + } + listeners = append(listeners, listener) + } + if len(listeners) == 0 { + return nil, errors.New("no valid listen address") + } + + s.listeners = listeners + + return &s, nil +} + +// Start starts a HTTP server to provide standard RPC and extension +// websocket connections for any number of btcwallet clients. +func (s *rpcServer) Start() { + // A duplicator for notifications intended for all clients runs + // in another goroutines. Any such notifications are sent to + // the allClients channel and then sent to each connected client. + go s.NotificationHandler() + go s.requests.handler() + + log.Trace("Starting RPC server") + + serveMux := http.NewServeMux() + const rpcAuthTimeoutSeconds = 10 + httpServer := &http.Server{ + Handler: serveMux, + + // Timeout connections which don't complete the initial + // handshake within the allowed timeframe. + ReadTimeout: time.Second * rpcAuthTimeoutSeconds, + } + serveMux.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) { + w.Header().Set("Connection", "close") + w.Header().Set("Content-Type", "application/json") + r.Close = true + + // TODO: Limit number of active connections. + + if err := s.checkAuthHeader(r); err != nil { + log.Warnf("Unauthorized client connection attempt") + http.Error(w, "401 Unauthorized.", http.StatusUnauthorized) + return + } + s.PostClientRPC(w, r) + }) + serveMux.HandleFunc("/frontend", func(w http.ResponseWriter, r *http.Request) { + authenticated := false + switch s.checkAuthHeader(r) { + case nil: + authenticated = true + case ErrNoAuth: + // nothing + default: + // If auth was supplied but incorrect, rather than simply + // being missing, immediately terminate the connection. + log.Warnf("Disconnecting improperly authorized " + + "websocket client") + http.Error(w, "401 Unauthorized.", http.StatusUnauthorized) + return + } + + conn, err := s.upgrader.Upgrade(w, r, nil) + if err != nil { + log.Warnf("Cannot websocket upgrade client %s: %v", + r.RemoteAddr, err) + return + } + wsc := newWebsocketClient(conn, authenticated, r.RemoteAddr) + s.WebsocketClientRPC(wsc) + }) + for _, listener := range s.listeners { + s.wg.Add(1) + go func(listener net.Listener) { + log.Infof("RPCS: RPC server listening on %s", listener.Addr()) + if err := httpServer.Serve(listener); err != nil { + log.Errorf("Listener for %s exited with error: %v", + listener.Addr(), err) + } + log.Tracef("RPCS: RPC listener done for %s", listener.Addr()) + s.wg.Done() + }(listener) + } +} + +// ErrNoAuth represents an error where authentication could not succeed +// due to a missing Authorization HTTP header. +var ErrNoAuth = errors.New("no auth") + +// checkAuthHeader checks the HTTP Basic authentication supplied by a client +// in the HTTP request r. It errors with ErrNoAuth if the request does not +// contain the Authorization header, or another non-nil error if the +// authentication was provided but incorrect. +// +// This check is time-constant. +func (s *rpcServer) checkAuthHeader(r *http.Request) error { + authhdr := r.Header["Authorization"] + if len(authhdr) == 0 { + return ErrNoAuth + } + + authsha := sha256.Sum256([]byte(authhdr[0])) + cmp := subtle.ConstantTimeCompare(authsha[:], s.authsha[:]) + if cmp != 1 { + return errors.New("bad auth") + } + return nil +} + +func (s *rpcServer) WebsocketClientRead(wsc *websocketClient) { + for { + _, request, err := wsc.conn.ReadMessage() + if err != nil { + if err != io.EOF && err != io.ErrUnexpectedEOF { + log.Warnf("Websocket receive failed from client %s: %v", + wsc.remoteAddr, err) + } + close(wsc.allRequests) + break + } + wsc.allRequests <- request + } + s.wg.Done() +} + +type rawRequest struct { + // "jsonrpc" value isn't checked so we exclude it. + ID interface{} `json:"id"` + Method string `json:"method"` + Params []json.RawMessage `json:"params"` +} + +// idPointer returns a pointer to the passed ID, or nil if the interface is nil. +// Interface pointers are usually a red flag of doing something incorrectly, +// but this is only implemented here to work around an oddity with btcjson, +// which uses empty interface pointers for request and response IDs. +func idPointer(id interface{}) (p *interface{}) { + if id != nil { + p = &id + } + return +} + +func marshalError(id *interface{}) []byte { + response := btcjson.Reply{ + Id: id, + Error: &btcjson.ErrInvalidRequest, + } + mresponse, err := json.Marshal(response) + // We expect the marshal to succeed. If it doesn't, it indicates some + // non-marshalable type in the response. + if err != nil { + panic(err) + } + return mresponse +} + +// websocketPassthrough pass a websocket client's raw request to the connected +// chain server. +func (s *rpcServer) websocketPassthrough(wsc *websocketClient, request rawRequest) { + resp := passthrough(request) + _ = wsc.send(resp) +} + +// postPassthrough pass a websocket client's raw request to the connected +// chain server. +func (s *rpcServer) postPassthrough(w http.ResponseWriter, request rawRequest) { + resp := passthrough(request) + if _, err := w.Write(resp); err != nil { + log.Warnf("Unable to respond to client with passthrough "+ + "response: %v", err) + } +} + +// passthrough is a helper function for websocketPassthrough and postPassthrough +// to request and receive the chain server's marshaled response to an +// unhandled-by-wallet request. The marshaled response includes the original +// request's ID. +func passthrough(request rawRequest) []byte { + var res json.RawMessage + client, err := accessClient() + if err == nil { + res, err = client.RawRequest(request.Method, request.Params) + } + var jsonErr *btcjson.Error + if err != nil { + switch e := err.(type) { + case *btcjson.Error: + jsonErr = e + case btcjson.Error: + jsonErr = &e + default: + jsonErr = &btcjson.Error{ + Code: btcjson.ErrWallet.Code, + Message: err.Error(), + } + } + } + + // The raw result will only marshal correctly if called with the + // MarshalJSON method, and that method requires a pointer receiver. + var pres *json.RawMessage + if res != nil { + pres = &res + } + + resp := btcjson.Reply{ + Id: idPointer(request.ID), + Result: pres, + Error: jsonErr, + } + mresp, err := json.Marshal(resp) + // The chain server response was successfully unmarshaled or we created + // our own error, so a marshal can never error. + if err != nil { + panic(err) + } + return mresp +} + +type unauthedRequest struct { + marshaledRequest []byte + handler requestHandler +} + +func (s *rpcServer) WebsocketClientGateway(wsc *websocketClient) { +out: + for request := range wsc.allRequests { + // Get the method of the request and check whether it should be + // handled by wallet or passed down to btcd. If the latter, + // handle in a new goroutine (to not block or be blocked by + // the handling of actual wallet requests). + // + // This is done by unmarshaling the JSON bytes into a rawRequest + // to avoid the mangling of unmarshaling and re-marshaling of + // large JSON numbers, as well as the overhead of unneeded + // unmarshals and marshals. + var raw rawRequest + if err := json.Unmarshal(request, &raw); err != nil { + if !wsc.authenticated { + // Disconnect immediately. + break out + } + err = wsc.send(marshalError(idPointer(raw.ID))) + if err != nil { + break out + } + continue + } + + f, ok := handlerFunc(raw.Method, true) + if ok || raw.Method == "authenticate" { + // unauthedRequests is buffered to the max number of + // concurrent websocket client requests so as to not + // block the passthrough of later btcd requests. + wsc.unauthedRequests <- unauthedRequest{request, f} + } else { + // websocketPassthrough is run as a goroutine to + // send an unhandled request to the chain server without + // blocking the handling of later wallet requests. + go s.websocketPassthrough(wsc, raw) + } + } + close(wsc.unauthedRequests) + s.wg.Done() +} + +// invalidAuth checks whether a websocket request is allowed for the current +// authentication state. If an unauthenticated client submitted an +// authenticate request, the authentication is verified and the client's +// authentication state is modified. +func (s *rpcServer) invalidAuth(wsc *websocketClient, request btcjson.Cmd) (invalid, checked bool) { + if authCmd, ok := request.(*btcws.AuthenticateCmd); ok { + // Duplication authentication is not allowed. + if wsc.authenticated { + return true, false + } + + // Check credentials. + login := authCmd.Username + ":" + authCmd.Passphrase + auth := "Basic " + base64.StdEncoding.EncodeToString([]byte(login)) + authSha := sha256.Sum256([]byte(auth)) + cmp := subtle.ConstantTimeCompare(authSha[:], s.authsha[:]) + wsc.authenticated = cmp == 1 + return cmp != 1, true + } + // Unauthorized clients must first issue an authenticate request. If + // not already authenticated, the auth is invalid. + return !wsc.authenticated, false +} + +func (s *rpcServer) WebsocketClientRespond(wsc *websocketClient) { +out: + for r := range wsc.unauthedRequests { + cmd, parseErr := btcjson.ParseMarshaledCmd(r.marshaledRequest) + var id interface{} + if cmd != nil { + id = cmd.Id() + } + + // Verify that the websocket is authenticated and not send an + // unnecessary authentication request, or perform the check + // if unauthenticated and this is an authentication request. + // Disconnect the client immediately if the authentication is + // invalid or disallowed. + switch invalid, checked := s.invalidAuth(wsc, cmd); { + case invalid: + log.Warnf("Disconnecting improperly authenticated "+ + "websocket client %s", wsc.remoteAddr) + break out + case checked: + // Marshal and send a successful auth response. The + // marshal is expected to never fail. + response := btcjson.Reply{Id: idPointer(id)} + mresponse, err := json.Marshal(response) + if err != nil { + panic(err) + } + if err := wsc.send(mresponse); err != nil { + break out + } + continue + } + + // The parse error is checked after the authentication check + // so we don't respond back for invalid requests sent by + // unauthenticated clients. + if parseErr != nil { + if wsc.send(marshalError(idPointer(id))) != nil { + break out + } + continue + } + + // Send request and the handler func (already looked up) to the + // server's global request handler. This serializes the + // execution of all handlers from all connections (both + // websocket and HTTP POST), and runs the handler with exclusive + // access of the account manager. + responseChan := make(chan handlerResponse) + s.requests <- handlerJob{ + request: cmd, + handler: r.handler, + response: responseChan, + } + response := <-responseChan + resp := btcjson.Reply{ + Id: idPointer(id), + Result: response.result, + Error: response.jsonErr, + } + mresp, err := json.Marshal(resp) + // All responses originating from us must be marshalable. + if err != nil { + panic(err) + } + // Send marshaled response to client. + if err := wsc.send(mresp); err != nil { + break out + } + } + close(wsc.responses) + s.wg.Done() +} + +func (s *rpcServer) WebsocketClientSend(wsc *websocketClient) { + const deadline time.Duration = 2 * time.Second + for response := range wsc.responses { + err := wsc.conn.SetWriteDeadline(time.Now().Add(deadline)) + if err != nil { + log.Warnf("Cannot set write deadline on client %s: %v", + wsc.remoteAddr, err) + } + err = wsc.conn.WriteMessage(websocket.TextMessage, response) + if err != nil { + log.Warnf("Failed websocket send to client %s: %v", + wsc.remoteAddr, err) + break + } + } + close(wsc.quit) + log.Infof("Disconnected websocket client %s", wsc.remoteAddr) + s.removeWSClient <- wsc + s.wg.Done() +} + +// WebsocketClientRPC starts the goroutines to serve JSON-RPC requests and +// notifications over a websocket connection for a single client. +func (s *rpcServer) WebsocketClientRPC(wsc *websocketClient) { + log.Infof("New websocket client %s", wsc.remoteAddr) + + // Clear the read deadline set before the websocket hijacked + // the connection. + if err := wsc.conn.SetReadDeadline(time.Time{}); err != nil { + log.Warnf("Cannot remove read deadline: %v", err) + } + + // Add client context so notifications duplicated to each + // client are received by this client. + s.addWSClient <- wsc + + s.wg.Add(4) + go s.WebsocketClientRead(wsc) + go s.WebsocketClientGateway(wsc) + go s.WebsocketClientRespond(wsc) + go s.WebsocketClientSend(wsc) + + // Send initial unsolicited notifications. + // TODO: these should be requested by the client first. + s.NotifyConnectionStatus(wsc) +} + +// maxRequestSize specifies the maximum number of bytes in the request body +// that may be read from a client. This is currently limited to 4MB. +const maxRequestSize = 1024 * 1024 * 4 + +// PostClientRPC processes and replies to a JSON-RPC client request. +func (s *rpcServer) PostClientRPC(w http.ResponseWriter, r *http.Request) { + body := http.MaxBytesReader(w, r.Body, maxRequestSize) + rpcRequest, err := ioutil.ReadAll(body) + if err != nil { + // TODO: what if the underlying reader errored? + http.Error(w, "413 Request Too Large.", + http.StatusRequestEntityTooLarge) + return + } + + // First check whether wallet has a handler for this request's method. + // If unfound, the request is sent to the chain server for further + // processing. While checking the methods, disallow authenticate + // requests, as they are invalid for HTTP POST clients. + var raw rawRequest + err = json.Unmarshal(rpcRequest, &raw) + if err != nil || raw.Method == "authenticate" { + _, err := w.Write(marshalError(idPointer(raw.ID))) + if err != nil { + log.Warnf("Cannot write invalid request request to "+ + "client: %v", err) + } + return + } + f, ok := handlerFunc(raw.Method, false) + if !ok { + s.postPassthrough(w, raw) + return + } + + // Parse the full request since it must be handled by wallet. + cmd, err := btcjson.ParseMarshaledCmd(rpcRequest) + var id interface{} + if cmd != nil { + id = cmd.Id() + } + if err != nil { + _, err := w.Write(marshalError(idPointer(cmd.Id()))) + if err != nil { + log.Warnf("Client sent invalid request but unable "+ + "to respond with error: %v", err) + } + return + } + + // Send request and the handler func (already looked up) to the + // server's global request handler. This serializes the + // execution of all handlers from all connections (both + // websocket and HTTP POST), and runs the handler with exclusive + // access of the account manager. + responseChan := make(chan handlerResponse) + s.requests <- handlerJob{ + request: cmd, + handler: f, + response: responseChan, + } + response := <-responseChan + resp := btcjson.Reply{ + Id: idPointer(id), + Result: response.result, + Error: response.jsonErr, + } + mresp, err := json.Marshal(resp) + // All responses originating from us must be marshalable. + if err != nil { + panic(err) + } + // Send marshaled response to client. + if _, err := w.Write(mresp); err != nil { + log.Warnf("Unable to respond to client: %v", err) + } +} + +// NotifyConnectionStatus notifies all connected websocket clients of the +// current connection status of btcwallet to btcd. +func (s *rpcServer) NotifyConnectionStatus(wsc *websocketClient) { + connected := false + client, err := accessClient() + if err == nil { + connected = !client.Disconnected() + } + ntfn := btcws.NewBtcdConnectedNtfn(connected) + mntfn, err := ntfn.MarshalJSON() + // btcws notifications must always marshal without error. + if err != nil { + panic(err) + } + if wsc == nil { + s.broadcasts <- mntfn + } else { + // Don't care whether the client disconnected at this + // point, so discard error. + _ = wsc.send(mntfn) + } +} + +func (s *rpcServer) NotificationHandler() { + for { + select { + case c := <-s.addWSClient: + s.wsClients[c] = struct{}{} + case c := <-s.removeWSClient: + delete(s.wsClients, c) + case b := <-s.broadcasts: + for wsc := range s.wsClients { + if err := wsc.send(b); err != nil { + delete(s.wsClients, wsc) + } + } + } + } +} + +// requestHandler is a handler function to handle an unmarshaled and parsed // request into a marshalable response. If the error is a btcjson.Error // or any of the above special error classes, the server will respond with // the JSON-RPC appropiate error code. All other errors use the wallet // catch-all error code, btcjson.ErrWallet.Code. -type cmdHandler func(btcjson.Cmd) (interface{}, error) +type requestHandler func(btcjson.Cmd) (interface{}, error) -var rpcHandlers = map[string]cmdHandler{ +var rpcHandlers = map[string]requestHandler{ // Standard bitcoind methods (implemented) "addmultisigaddress": AddMultiSigAddress, "createmultisig": CreateMultiSig, @@ -127,6 +851,7 @@ var rpcHandlers = map[string]cmdHandler{ "getreceivedbyaddress": Unimplemented, "gettxout": Unimplemented, "gettxoutsetinfo": Unimplemented, + "getwalletinfo": Unimplemented, "getwork": Unimplemented, "importwallet": Unimplemented, "listaddressgroupings": Unimplemented, @@ -145,7 +870,7 @@ var rpcHandlers = map[string]cmdHandler{ } // Extensions exclusive to websocket connections. -var wsHandlers = map[string]cmdHandler{ +var wsHandlers = map[string]requestHandler{ "exportwatchingwallet": ExportWatchingWallet, "getaddressbalance": GetAddressBalance, "getunconfirmedbalance": GetUnconfirmedBalance, @@ -155,194 +880,76 @@ var wsHandlers = map[string]cmdHandler{ "walletislocked": WalletIsLocked, } -// Channels to control RPCGateway -var ( - // Incoming requests from frontends - clientRequests = make(chan *ClientRequest) - - // Incoming notifications from a bitcoin server (btcd) - svrNtfns = make(chan btcjson.Cmd) -) - -// ErrServerBusy is a custom JSON-RPC error for when a client's request -// could not be added to the server request queue for handling. -var ErrServerBusy = btcjson.Error{ - Code: -32000, - Message: "Server busy", -} - -// ErrServerBusyRaw is the raw JSON encoding of ErrServerBusy. -var ErrServerBusyRaw = json.RawMessage(`{"code":-32000,"message":"Server busy"}`) - -// RPCGateway is the common entry point for all client RPC requests and -// server notifications. If a request needs to be handled by btcwallet, -// it is sent to WalletRequestProcessor's request queue, or dropped if the -// queue is full. If a request is unhandled, it is recreated with a new -// JSON-RPC id and sent to btcd for handling. Notifications are also queued -// if they cannot be immediately handled, but are never dropped (queue may -// grow infinitely large). -func RPCGateway() { - var ntfnQueue []btcjson.Cmd - unreadChan := make(chan btcjson.Cmd) - - for { - var ntfnOut chan btcjson.Cmd - var oldestNtfn btcjson.Cmd - if len(ntfnQueue) > 0 { - ntfnOut = handleNtfn - oldestNtfn = ntfnQueue[0] - } else { - ntfnOut = unreadChan - } - - select { - case r := <-clientRequests: - // Check whether to handle request or send to btcd. - _, std := rpcHandlers[r.request.Method()] - _, ext := wsHandlers[r.request.Method()] - if std || ext { - select { - case requestQueue <- r: - default: - // Server busy with too many requests. - resp := RawRPCResponse{ - Error: &ErrServerBusyRaw, - } - r.response <- resp - } - } else { - r.request.SetId(<-NewJSONID) - request := &ServerRequest{ - request: r.request, - response: r.response, - } - CurrentServerConn().SendRequest(request) - } - - case n := <-svrNtfns: - ntfnQueue = append(ntfnQueue, n) - - case ntfnOut <- oldestNtfn: - ntfnQueue = ntfnQueue[1:] - } +// handlerFunc looks up a request handler func for the passed method from +// the http post and (if the request is from a websocket connection) websocket +// handler maps. If a suitable handler could not be found, ok is false. +func handlerFunc(method string, ws bool) (f requestHandler, ok bool) { + f, ok = rpcHandlers[method] + if !ok && ws { + f, ok = wsHandlers[method] } + return f, ok } -// Channels to control WalletRequestProcessor -var ( - requestQueue = make(chan *ClientRequest, 100) - handleNtfn = make(chan btcjson.Cmd) -) +type handlerResponse struct { + result interface{} + jsonErr *btcjson.Error +} -// WalletRequestProcessor processes client requests and btcd notifications. -func WalletRequestProcessor() { - for { - select { - case r := <-requestQueue: - method := r.request.Method() - f, ok := rpcHandlers[method] - if !ok && r.ws { - f, ok = wsHandlers[method] - } - if !ok { - f = Unimplemented - } +type handlerJob struct { + request btcjson.Cmd + handler requestHandler + response chan<- handlerResponse +} - AcctMgr.Grab() - result, err := f(r.request) - AcctMgr.Release() +type requestChan chan handlerJob - if err != nil { - jsonErr := btcjson.Error{Message: err.Error()} - switch e := err.(type) { - case btcjson.Error: - jsonErr = e - case DeserializationError: - jsonErr.Code = btcjson.ErrDeserialization.Code - case InvalidParameterError: - jsonErr.Code = btcjson.ErrInvalidParameter.Code - case ParseError: - jsonErr.Code = btcjson.ErrParse.Code - case InvalidAddressOrKeyError: - jsonErr.Code = btcjson.ErrInvalidAddressOrKey.Code - default: // All other errors get the wallet error code. - jsonErr.Code = btcjson.ErrWallet.Code - } +// handler reads and processes client requests from the channel. Each +// request is run with exclusive access to the account manager. +func (c requestChan) handler() { + for r := range c { + AcctMgr.Grab() + result, err := r.handler(r.request) + AcctMgr.Release() - b, err := json.Marshal(jsonErr) - // Marshal should only fail if jsonErr contains - // vars of an non-mashalable type, which would - // indicate a source code issue with btcjson. - if err != nil { - panic(err) - } - r.response <- RawRPCResponse{ - Error: (*json.RawMessage)(&b), - } - continue - } - - b, err := json.Marshal(result) - // Marshal should only fail if result contains vars of - // an unmashalable type. This may indicate an bug with - // the calle RPC handler, and should be logged. - if err != nil { - log.Errorf("Cannot marshal result: %v", err) - } - r.response <- RawRPCResponse{ - Result: (*json.RawMessage)(&b), - } - - case n := <-handleNtfn: - f, ok := notificationHandlers[n.Method()] - if !ok { - // Ignore unhandled notifications. - continue - } - - AcctMgr.Grab() - err := f(n) - AcctMgr.Release() - switch err { - case txstore.ErrInconsistentStore: - // Assume this is a broken btcd reordered - // notifications. Restart the connection - // to reload accounts files from their last - // known good state. - log.Warn("Reconnecting to recover from " + - "out-of-order btcd notification") - s := CurrentServerConn() - if btcd, ok := s.(*BtcdRPCConn); ok { - AcctMgr.Grab() - btcd.Close() - AcctMgr.OpenAccounts() - AcctMgr.Release() - } - - case nil: // ignore - default: - log.Warn(err) + var jsonErr *btcjson.Error + if err != nil { + jsonErr = &btcjson.Error{Message: err.Error()} + switch e := err.(type) { + case btcjson.Error: + *jsonErr = e + case DeserializationError: + jsonErr.Code = btcjson.ErrDeserialization.Code + case InvalidParameterError: + jsonErr.Code = btcjson.ErrInvalidParameter.Code + case ParseError: + jsonErr.Code = btcjson.ErrParse.Code + case InvalidAddressOrKeyError: + jsonErr.Code = btcjson.ErrInvalidAddressOrKey.Code + default: // All other errors get the wallet error code. + jsonErr.Code = btcjson.ErrWallet.Code } } + r.response <- handlerResponse{result, jsonErr} } } // Unimplemented handles an unimplemented RPC request with the // appropiate error. -func Unimplemented(icmd btcjson.Cmd) (interface{}, error) { +func Unimplemented(btcjson.Cmd) (interface{}, error) { return nil, btcjson.ErrUnimplemented } // Unsupported handles a standard bitcoind RPC request which is // unsupported by btcwallet due to design differences. -func Unsupported(icmd btcjson.Cmd) (interface{}, error) { +func Unsupported(btcjson.Cmd) (interface{}, error) { return nil, btcjson.Error{ Code: -1, Message: "Request unsupported by btcwallet", } } -// makeMultiSigScript is a heper function to combine common logic for +// makeMultiSigScript is a helper function to combine common logic for // AddMultiSig and CreateMultiSig. // all error codes are rpc parse error here to match bitcoind which just throws // a runtime exception. *sigh*. @@ -560,17 +1167,14 @@ func GetBalance(icmd btcjson.Cmd) (interface{}, error) { // exist. func GetInfo(icmd btcjson.Cmd) (interface{}, error) { // Call down to btcd for all of the information in this command known - // by them. This call is expected to always succeed. - gicmd, err := btcjson.NewGetInfoCmd(<-NewJSONID) + // by them. + client, err := accessClient() if err != nil { - panic(err) + return nil, err } - response := <-CurrentServerConn().SendRequest(NewServerRequest(gicmd)) - - var info btcjson.InfoResult - _, jsonErr := response.FinishUnmarshal(&info) - if jsonErr != nil { - return nil, jsonErr + info, err := client.GetInfo() + if err != nil { + return nil, err } balance := float64(0.0) @@ -586,12 +1190,10 @@ func GetInfo(icmd btcjson.Cmd) (interface{}, error) { TxFeeIncrement.Lock() info.PaytxFee = float64(TxFeeIncrement.i) / float64(btcutil.SatoshiPerBitcoin) TxFeeIncrement.Unlock() - /* - * We don't set the following since they don't make much sense in the - * wallet architecture: - * - unlocked_until - * - errors - */ + // We don't set the following since they don't make much sense in the + // wallet architecture: + // - unlocked_until + // - errors return info, nil } @@ -748,14 +1350,32 @@ func KeypoolRefill(icmd btcjson.Cmd) (interface{}, error) { return nil, nil } +// NotifyNewBlockChainHeight notifies all frontends of a new +// blockchain height. This sends the same notification as +// btcd, so this can probably be removed. +func (s *rpcServer) NotifyNewBlockChainHeight(bs *wallet.BlockStamp) { + ntfn := btcws.NewBlockConnectedNtfn(bs.Hash.String(), bs.Height) + mntfn, err := ntfn.MarshalJSON() + // btcws notifications must always marshal without error. + if err != nil { + panic(err) + } + s.broadcasts <- mntfn +} + // NotifyBalances notifies an attached frontend of the current confirmed // and unconfirmed account balances. // // TODO(jrick): Switch this to return a single JSON object // (map[string]interface{}) of all accounts and their balances, instead of // separate notifications for each account. -func NotifyBalances(frontend chan []byte) { - AcctMgr.NotifyBalances(frontend) +func (s *rpcServer) NotifyBalances(wsc *websocketClient) { + for _, a := range AcctMgr.AllAccounts() { + balance := a.CalculateBalance(1) + unconfirmed := a.CalculateBalance(0) - balance + s.NotifyWalletBalance(a.name, balance) + s.NotifyWalletBalanceUnconfirmed(a.name, unconfirmed) + } } // GetNewAddress handlesa getnewaddress request by returning a new @@ -1078,13 +1698,22 @@ func ListSinceBlock(icmd btcjson.Cmd) (interface{}, error) { return nil, btcjson.ErrInternal } + client, err := accessClient() + if err != nil { + return nil, err + } + height := int32(-1) if cmd.BlockHash != "" { - br, err := GetBlock(CurrentServerConn(), cmd.BlockHash) + hash, err := btcwire.NewShaHashFromStr(cmd.BlockHash) + if err != nil { + return nil, DeserializationError{err} + } + block, err := client.GetBlock(hash) if err != nil { return nil, err } - height = int32(br.Height) + height = int32(block.Height()) } bs, err := GetCurBlock() @@ -1095,14 +1724,11 @@ func ListSinceBlock(icmd btcjson.Cmd) (interface{}, error) { // For the result we need the block hash for the last block counted // in the blockchain due to confirmations. We send this off now so that // it can arrive asynchronously while we figure out the rest. - gbh, err := btcjson.NewGetBlockHashCmd(<-NewJSONID, - int64(bs.Height)+1-int64(cmd.TargetConfirmations)) + gbh := client.GetBlockHashAsync(int64(bs.Height) + 1 - int64(cmd.TargetConfirmations)) if err != nil { return nil, err } - bhChan := CurrentServerConn().SendRequest(NewServerRequest(gbh)) - txInfoList, err := AcctMgr.ListSinceBlock(height, bs.Height, cmd.TargetConfirmations) if err != nil { @@ -1110,16 +1736,14 @@ func ListSinceBlock(icmd btcjson.Cmd) (interface{}, error) { } // Done with work, get the response. - response := <-bhChan - var hash string - _, err = response.FinishUnmarshal(&hash) + blockHash, err := gbh.Receive() if err != nil { return nil, err } res := btcjson.ListSinceBlockResult{ Transactions: txInfoList, - LastBlock: hash, + LastBlock: blockHash.String(), } return res, nil } @@ -1234,6 +1858,12 @@ func ListUnspent(icmd btcjson.Cmd) (interface{}, error) { // sending payment transactions. func sendPairs(icmd btcjson.Cmd, account string, amounts map[string]btcutil.Amount, minconf int) (interface{}, error) { + + client, err := accessClient() + if err != nil { + return nil, err + } + // Check that the account specified in the request exists. a, err := AcctMgr.Account(account) if err != nil { @@ -1265,24 +1895,22 @@ func sendPairs(icmd btcjson.Cmd, account string, amounts map[string]btcutil.Amou if err := AcctMgr.ds.FlushAccount(a); err != nil { return nil, fmt.Errorf("Cannot write account: %v", err) } - a.ReqNewTxsForAddress(createdTx.changeAddr) + err := client.NotifyReceived([]btcutil.Address{createdTx.changeAddr}) + if err != nil { + return nil, err + } } - serializedTx := bytes.Buffer{} - serializedTx.Grow(createdTx.tx.MsgTx().SerializeSize()) - if err := createdTx.tx.MsgTx().Serialize(&serializedTx); err != nil { - // Hitting OOM writing to a bytes.Buffer already panics, and - // all other errors are unexpected. - panic(err) - } - hextx := hex.EncodeToString(serializedTx.Bytes()) - txSha, err := SendRawTransaction(CurrentServerConn(), hextx) + txSha, err := client.SendRawTransaction(createdTx.tx.MsgTx(), false) if err != nil { SendTxHistSyncChans.remove <- *createdTx.tx.Sha() return nil, err } - return handleSendRawTxReply(icmd, txSha, a, createdTx) + if err := handleSendRawTxReply(icmd, txSha, a, createdTx); err != nil { + return nil, err + } + return txSha.String(), nil } // SendFrom handles a sendfrom RPC request by creating a new transaction @@ -1418,17 +2046,17 @@ func SendBeforeReceiveHistorySync(add, done, remove chan btcwire.ShaHash, } } -func handleSendRawTxReply(icmd btcjson.Cmd, txIDStr string, a *Account, txInfo *CreatedTx) (interface{}, error) { +func handleSendRawTxReply(icmd btcjson.Cmd, txSha *btcwire.ShaHash, a *Account, txInfo *CreatedTx) error { // Add to transaction store. txr, err := a.TxStore.InsertTx(txInfo.tx, nil) if err != nil { log.Errorf("Error adding sent tx history: %v", err) - return nil, btcjson.ErrInternal + return btcjson.ErrInternal } debits, err := txr.AddDebits(txInfo.inputs) if err != nil { log.Errorf("Error adding sent tx history: %v", err) - return nil, btcjson.ErrInternal + return btcjson.ErrInternal } AcctMgr.ds.ScheduleTxStoreWrite(a) @@ -1438,10 +2066,10 @@ func handleSendRawTxReply(icmd btcjson.Cmd, txIDStr string, a *Account, txInfo * ltr, err := debits.ToJSON(a.Name(), bs.Height, a.Net()) if err != nil { log.Errorf("Error adding sent tx history: %v", err) - return nil, btcjson.ErrInternal + return btcjson.ErrInternal } for _, details := range ltr { - NotifyNewTxDetails(allClients, a.Name(), details) + server.NotifyNewTxDetails(a.Name(), details) } } @@ -1451,15 +2079,15 @@ func handleSendRawTxReply(icmd btcjson.Cmd, txIDStr string, a *Account, txInfo * // Disk sync tx and utxo stores. if err := AcctMgr.ds.FlushAccount(a); err != nil { log.Errorf("Cannot write account: %v", err) - return nil, err + return err } // Notify all frontends of account's new unconfirmed and // confirmed balance. confirmed := a.CalculateBalance(1) unconfirmed := a.CalculateBalance(0) - confirmed - NotifyWalletBalance(allClients, a.name, confirmed) - NotifyWalletBalanceUnconfirmed(allClients, a.name, unconfirmed) + server.NotifyWalletBalance(a.name, confirmed) + server.NotifyWalletBalanceUnconfirmed(a.name, unconfirmed) // The comments to be saved differ based on the underlying type // of the cmd, so switch on the type to check whether it is a @@ -1479,8 +2107,8 @@ func handleSendRawTxReply(icmd btcjson.Cmd, txIDStr string, a *Account, txInfo * _ = cmd.CommentTo } - log.Infof("Successfully sent transaction %v", txIDStr) - return txIDStr, nil + log.Infof("Successfully sent transaction %v", txSha) + return nil } // SetTxFee sets the transaction fee per kilobyte added to transactions. @@ -1588,7 +2216,7 @@ func RecoverAddresses(icmd btcjson.Cmd) (interface{}, error) { // pendingTx is used for async fetching of transaction dependancies in // SignRawTransaction. type pendingTx struct { - resp chan RawRPCResponse + resp btcrpcclient.FutureGetRawTransactionResult inputs []uint32 // list of inputs that care about this tx. } @@ -1651,6 +2279,8 @@ func SignRawTransaction(icmd btcjson.Cmd) (interface{}, error) { }] = script } + var client *rpcClient + // Now we go and look for any inputs that we were not provided by // querying btcd with getrawtransaction. We queue up a bunch of async // requests and will wait for replies after we have checked the rest of @@ -1674,9 +2304,15 @@ func SignRawTransaction(icmd btcjson.Cmd) (interface{}, error) { } // Never heard of this one before, request it. + if client == nil { + client, err = accessClient() + if err != nil { + return nil, err + } + } + prevHash := &txIn.PreviousOutpoint.Hash requested[txIn.PreviousOutpoint.Hash] = &pendingTx{ - resp: GetRawTransactionAsync(CurrentServerConn(), - &txIn.PreviousOutpoint.Hash), + resp: client.GetRawTransactionAsync(prevHash), inputs: []uint32{txIn.PreviousOutpoint.Index}, } } @@ -1737,7 +2373,7 @@ func SignRawTransaction(icmd btcjson.Cmd) (interface{}, error) { // could move waiting to the following loop and be slightly more // asynchronous. for txid, ptx := range requested { - tx, err := GetRawTransactionAsyncResult(ptx.resp) + tx, err := ptx.resp.Receive() if err != nil { return nil, err } @@ -2067,7 +2703,7 @@ type AccountNtfn struct { // NotifyWalletLockStateChange sends a notification to all frontends // that the wallet has just been locked or unlocked. -func NotifyWalletLockStateChange(account string, locked bool) { +func (s *rpcServer) NotifyWalletLockStateChange(account string, locked bool) { ntfn := btcws.NewWalletLockStateNtfn(account, locked) mntfn, err := ntfn.MarshalJSON() // If the marshal failed, it indicates that the btcws notification @@ -2075,12 +2711,12 @@ func NotifyWalletLockStateChange(account string, locked bool) { if err != nil { panic(err) } - allClients <- mntfn + s.broadcasts <- mntfn } // NotifyWalletBalance sends a confirmed account balance notification // to a frontend. -func NotifyWalletBalance(frontend chan []byte, account string, balance float64) { +func (s *rpcServer) NotifyWalletBalance(account string, balance float64) { ntfn := btcws.NewAccountBalanceNtfn(account, balance, true) mntfn, err := ntfn.MarshalJSON() // If the marshal failed, it indicates that the btcws notification @@ -2088,12 +2724,12 @@ func NotifyWalletBalance(frontend chan []byte, account string, balance float64) if err != nil { panic(err) } - frontend <- mntfn + s.broadcasts <- mntfn } // NotifyWalletBalanceUnconfirmed sends a confirmed account balance // notification to a frontend. -func NotifyWalletBalanceUnconfirmed(frontend chan []byte, account string, balance float64) { +func (s *rpcServer) NotifyWalletBalanceUnconfirmed(account string, balance float64) { ntfn := btcws.NewAccountBalanceNtfn(account, balance, false) mntfn, err := ntfn.MarshalJSON() // If the marshal failed, it indicates that the btcws notification @@ -2101,13 +2737,11 @@ func NotifyWalletBalanceUnconfirmed(frontend chan []byte, account string, balanc if err != nil { panic(err) } - frontend <- mntfn + s.broadcasts <- mntfn } // NotifyNewTxDetails sends details of a new transaction to a frontend. -func NotifyNewTxDetails(frontend chan []byte, account string, - details btcjson.ListTransactionsResult) { - +func (s *rpcServer) NotifyNewTxDetails(account string, details btcjson.ListTransactionsResult) { ntfn := btcws.NewTxNtfn(account, &details) mntfn, err := ntfn.MarshalJSON() // If the marshal failed, it indicates that the btcws notification @@ -2115,7 +2749,7 @@ func NotifyNewTxDetails(frontend chan []byte, account string, if err != nil { panic(err) } - frontend <- mntfn + s.broadcasts <- mntfn } // NotifiedRecvTxRequest is used to check whether the outpoint of diff --git a/sockets.go b/sockets.go deleted file mode 100644 index cb82be1..0000000 --- a/sockets.go +++ /dev/null @@ -1,814 +0,0 @@ -/* - * Copyright (c) 2013, 2014 Conformal Systems LLC - * - * Permission to use, copy, modify, and distribute this software for any - * purpose with or without fee is hereby granted, provided that the above - * copyright notice and this permission notice appear in all copies. - * - * THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES - * WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF - * MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR - * ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES - * WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN - * ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF - * OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. - */ - -package main - -import ( - "code.google.com/p/go.net/websocket" - "crypto/sha256" - "crypto/subtle" - "crypto/tls" - "crypto/x509" - "encoding/base64" - "encoding/json" - "errors" - "fmt" - "github.com/conformal/btcjson" - "github.com/conformal/btcutil" - "github.com/conformal/btcwallet/wallet" - "github.com/conformal/btcws" - "github.com/conformal/go-socks" - "io" - "io/ioutil" - "net" - "net/http" - "os" - "path/filepath" - "runtime" - "sync" - "time" -) - -var ( - // ErrBadAuth represents an error where a request is denied due to - // a missing, incorrect, or duplicate authentication request. - ErrBadAuth = errors.New("bad auth") - - // ErrNoAuth represents an error where authentication could not succeed - // due to a missing Authorization HTTP header. - ErrNoAuth = errors.New("no auth") - - // ErrConnRefused represents an error where a connection to another - // process cannot be established. - ErrConnRefused = errors.New("connection refused") - - // ErrConnLost represents an error where a connection to another - // process cannot be established. - ErrConnLost = errors.New("connection lost") - - // Adds a frontend listener channel - addClient = make(chan clientContext) - - // Messages sent to this channel are sent to each connected frontend. - allClients = make(chan []byte, 100) -) - -// server holds the items the RPC server may need to access (auth, -// config, shutdown, etc.) -type server struct { - wg sync.WaitGroup - listeners []net.Listener - authsha [sha256.Size]byte -} - -type clientContext struct { - send chan []byte - quit chan struct{} // closed on disconnect -} - -// parseListeners splits the list of listen addresses passed in addrs into -// IPv4 and IPv6 slices and returns them. This allows easy creation of the -// listeners on the correct interface "tcp4" and "tcp6". It also properly -// detects addresses which apply to "all interfaces" and adds the address to -// both slices. -func parseListeners(addrs []string) ([]string, []string, error) { - ipv4ListenAddrs := make([]string, 0, len(addrs)*2) - ipv6ListenAddrs := make([]string, 0, len(addrs)*2) - for _, addr := range addrs { - host, _, err := net.SplitHostPort(addr) - if err != nil { - // Shouldn't happen due to already being normalized. - return nil, nil, err - } - - // Empty host or host of * on plan9 is both IPv4 and IPv6. - if host == "" || (host == "*" && runtime.GOOS == "plan9") { - ipv4ListenAddrs = append(ipv4ListenAddrs, addr) - ipv6ListenAddrs = append(ipv6ListenAddrs, addr) - continue - } - - // Parse the IP. - ip := net.ParseIP(host) - if ip == nil { - return nil, nil, fmt.Errorf("'%s' is not a valid IP "+ - "address", host) - } - - // To4 returns nil when the IP is not an IPv4 address, so use - // this determine the address type. - if ip.To4() == nil { - ipv6ListenAddrs = append(ipv6ListenAddrs, addr) - } else { - ipv4ListenAddrs = append(ipv4ListenAddrs, addr) - } - } - return ipv4ListenAddrs, ipv6ListenAddrs, nil -} - -// newServer returns a new instance of the server struct. -func newServer(listenAddrs []string) (*server, error) { - login := cfg.Username + ":" + cfg.Password - auth := "Basic " + base64.StdEncoding.EncodeToString([]byte(login)) - s := server{ - authsha: sha256.Sum256([]byte(auth)), - } - - // Check for existence of cert file and key file - if !fileExists(cfg.RPCKey) && !fileExists(cfg.RPCCert) { - // if both files do not exist, we generate them. - err := genCertPair(cfg.RPCCert, cfg.RPCKey) - if err != nil { - return nil, err - } - } - keypair, err := tls.LoadX509KeyPair(cfg.RPCCert, cfg.RPCKey) - if err != nil { - return nil, err - } - - tlsConfig := tls.Config{ - Certificates: []tls.Certificate{keypair}, - } - - ipv4ListenAddrs, ipv6ListenAddrs, err := parseListeners(listenAddrs) - listeners := make([]net.Listener, 0, - len(ipv6ListenAddrs)+len(ipv4ListenAddrs)) - for _, addr := range ipv4ListenAddrs { - listener, err := tls.Listen("tcp4", addr, &tlsConfig) - if err != nil { - log.Warnf("RPCS: Can't listen on %s: %v", addr, - err) - continue - } - listeners = append(listeners, listener) - } - - for _, addr := range ipv6ListenAddrs { - listener, err := tls.Listen("tcp6", addr, &tlsConfig) - if err != nil { - log.Warnf("RPCS: Can't listen on %s: %v", addr, - err) - continue - } - listeners = append(listeners, listener) - } - if len(listeners) == 0 { - return nil, errors.New("no valid listen address") - } - - s.listeners = listeners - - return &s, nil -} - -// genCertPair generates a key/cert pair to the paths provided. -func genCertPair(certFile, keyFile string) error { - log.Infof("Generating TLS certificates...") - - // Create directories for cert and key files if they do not yet exist. - certDir, _ := filepath.Split(certFile) - keyDir, _ := filepath.Split(keyFile) - if err := os.MkdirAll(certDir, 0700); err != nil { - return err - } - if err := os.MkdirAll(keyDir, 0700); err != nil { - return err - } - - // Generate cert pair. - org := "btcwallet autogenerated cert" - validUntil := time.Now().Add(10 * 365 * 24 * time.Hour) - cert, key, err := btcutil.NewTLSCertPair(org, validUntil, nil) - if err != nil { - return err - } - - // Write cert and key files. - if err = ioutil.WriteFile(certFile, cert, 0666); err != nil { - return err - } - if err = ioutil.WriteFile(keyFile, key, 0600); err != nil { - if rmErr := os.Remove(certFile); rmErr != nil { - log.Warnf("Cannot remove written certificates: %v", rmErr) - } - return err - } - - log.Info("Done generating TLS certificates") - return nil -} - -// ReplyToFrontend responds to a marshaled JSON-RPC request with a -// marshaled JSON-RPC response for both standard and extension -// (websocket) clients. The returned error is ErrBadAuth if a -// missing, incorrect, or duplicate authentication request is -// received. -func (s *server) ReplyToFrontend(msg []byte, ws, authenticated bool) ([]byte, error) { - cmd, parseErr := btcjson.ParseMarshaledCmd(msg) - var id interface{} - if cmd != nil { - id = cmd.Id() - } - - // If client is not already authenticated, the parsed request must - // be for authentication. - authCmd, ok := cmd.(*btcws.AuthenticateCmd) - if authenticated { - if ok { - // Duplicate auth request. - return nil, ErrBadAuth - } - } else { - if !ok { - // The first unauthenticated request must be an auth request. - return nil, ErrBadAuth - } - - // Check credentials. - login := authCmd.Username + ":" + authCmd.Passphrase - auth := "Basic " + base64.StdEncoding.EncodeToString([]byte(login)) - authSha := sha256.Sum256([]byte(auth)) - cmp := subtle.ConstantTimeCompare(authSha[:], s.authsha[:]) - if cmp != 1 { - return nil, ErrBadAuth - } - return nil, nil - } - - if parseErr != nil { - response := btcjson.Reply{ - Id: &id, - Error: &btcjson.ErrInvalidRequest, - } - mresponse, err := json.Marshal(response) - // We expect the marshal to succeed. If it doesn't, it - // indicates that either jsonErr (which is created by us) or - // the id itself (which was successfully unmashaled) are of - // some non-marshalable type. - if err != nil { - panic(err) - } - return mresponse, nil - } - - cReq := NewClientRequest(cmd, ws) - rawResp := cReq.Handle() - - response := struct { - Jsonrpc string `json:"jsonrpc"` - Id interface{} `json:"id"` - Result *json.RawMessage `json:"result"` - Error *json.RawMessage `json:"error"` - }{ - Jsonrpc: "1.0", - Id: id, - Result: rawResp.Result, - Error: rawResp.Error, - } - mresponse, err := json.Marshal(response) - if err != nil { - log.Errorf("Cannot marshal response: %v", err) - response := btcjson.Reply{ - Id: &id, - Error: &btcjson.ErrInternal, - } - mresponse, err = json.Marshal(&response) - // We expect this marshal to succeed. If it doesn't, btcjson - // returned an id with an non-marshalable type or ErrInternal - // is just plain wrong. - if err != nil { - panic(err) - } - } - - return mresponse, nil -} - -// ServeRPCRequest processes and replies to a JSON-RPC client request. -func (s *server) ServeRPCRequest(w http.ResponseWriter, r *http.Request) { - body, err := btcjson.GetRaw(r.Body) - if err != nil { - log.Errorf("RPCS: Error getting JSON message: %v", err) - } - - resp, err := s.ReplyToFrontend(body, false, true) - if err == ErrBadAuth { - http.Error(w, "401 Unauthorized.", http.StatusUnauthorized) - return - } - if _, err := w.Write(resp); err != nil { - log.Warnf("RPCS: could not respond to RPC request: %v", err) - } -} - -// clientResponseDuplicator listens for new wallet listener channels -// and duplicates messages sent to allClients to all connected clients. -func clientResponseDuplicator() { - clients := make(map[clientContext]struct{}) - - for { - select { - case cc := <-addClient: - clients[cc] = struct{}{} - - case n := <-allClients: - for cc := range clients { - select { - case <-cc.quit: - delete(clients, cc) - case cc.send <- n: - } - } - } - } -} - -// NotifyBtcdConnection notifies a frontend of the current connection -// status of btcwallet to btcd. -func NotifyBtcdConnection(reply chan []byte) { - if btcd, ok := CurrentServerConn().(*BtcdRPCConn); ok { - ntfn := btcws.NewBtcdConnectedNtfn(btcd.Connected()) - mntfn, err := ntfn.MarshalJSON() - // btcws notifications must always marshal without error. - if err != nil { - panic(err) - } - reply <- mntfn - } -} - -// stringQueue manages a queue of strings, reading from in and sending -// the oldest unsent to out. This handler closes out and returns after -// in is closed and any queued items are sent. Any reads on quit result -// in immediate shutdown of the handler. -func stringQueue(in <-chan string, out chan<- string, quit <-chan struct{}) { - var q []string - var dequeue chan<- string - skipQueue := out - var next string -out: - for { - select { - case n, ok := <-in: - if !ok { - // Sender closed input channel. Nil channel - // and continue so the remaining queued - // items may be sent. If the queue is empty, - // break out of the loop. - in = nil - if dequeue == nil { - break out - } - continue - } - - // Either send to out immediately if skipQueue is - // non-nil (queue is empty) and reader is ready, - // or append to the queue and send later. - select { - case skipQueue <- n: - default: - q = append(q, n) - dequeue = out - skipQueue = nil - next = q[0] - } - - case dequeue <- next: - copy(q, q[1:]) - q[len(q)-1] = "" // avoid leak - q = q[:len(q)-1] - if len(q) == 0 { - // If the input chan was closed and nil'd, - // break out of the loop. - if in == nil { - break out - } - dequeue = nil - skipQueue = out - } else { - next = q[0] - } - - case <-quit: - break out - } - } - close(out) -} - -// WSSendRecv is the handler for websocket client connections. It loops -// forever (until disconnected), reading JSON-RPC requests and sending -// sending responses and notifications. -func (s *server) WSSendRecv(ws *websocket.Conn, remoteAddr string, authenticated bool) { - // Clear the read deadline set before the websocket hijacked - // the connection. - if err := ws.SetReadDeadline(time.Time{}); err != nil { - log.Warnf("Cannot remove read deadline: %v", err) - } - - // Add client context so notifications duplicated to each - // client are received by this client. - recvQuit := make(chan struct{}) - sendQuit := make(chan struct{}) - cc := clientContext{ - send: make(chan []byte, 1), // buffer size is number of initial notifications - quit: make(chan struct{}), - } - go func() { - select { - case <-recvQuit: - case <-sendQuit: - } - log.Infof("Disconnected websocket client %s", remoteAddr) - close(cc.quit) - }() - log.Infof("New websocket client %s", remoteAddr) - - NotifyBtcdConnection(cc.send) // TODO(jrick): clients should explicitly request this. - addClient <- cc - - // received passes all received messages from the currently connected - // frontend to the for-select loop. It is closed when reading a - // message from the websocket connection fails (presumably due to - // a disconnected client). - recvQueueIn := make(chan string) - - // Receive messages from websocket and send across jsonMsgs until - // connection is lost - go func() { - for { - var m string - if err := websocket.Message.Receive(ws, &m); err != nil { - select { - case <-sendQuit: - // Do not log error. - - default: - if err != io.EOF { - log.Warnf("Websocket receive failed from client %s: %v", - remoteAddr, err) - } - } - close(recvQueueIn) - close(recvQuit) - return - } - recvQueueIn <- m - } - }() - - // Manage queue of received messages for LIFO processing. - recvQueueOut := make(chan string) - go stringQueue(recvQueueIn, recvQueueOut, cc.quit) - - badAuth := make(chan struct{}) - sendResp := make(chan []byte) - go func() { - out: - for m := range recvQueueOut { - resp, err := s.ReplyToFrontend([]byte(m), true, authenticated) - if err == ErrBadAuth { - select { - case badAuth <- struct{}{}: - case <-cc.quit: - } - break out - } - - // Authentication passed. - authenticated = true - - select { - case sendResp <- resp: - case <-cc.quit: - break out - } - } - close(sendResp) - }() - - const deadline time.Duration = 2 * time.Second - -out: - for { - var m []byte - var ok bool - - select { - case <-badAuth: - // Bad auth. Disconnect. - log.Warnf("Disconnecting unauthorized websocket client %s", remoteAddr) - break out - - case m = <-cc.send: // sends from external writers. never closes. - case m, ok = <-sendResp: - if !ok { - // Nothing left to send. Return so the handler exits. - break out - } - case <-cc.quit: - break out - } - - err := ws.SetWriteDeadline(time.Now().Add(deadline)) - if err != nil { - log.Errorf("Cannot set write deadline on client %s: %v", remoteAddr, err) - break out - } - err = websocket.Message.Send(ws, string(m)) - if err != nil { - log.Warnf("Websocket send failed to client %s: %v", remoteAddr, err) - break out - } - } - close(sendQuit) - log.Tracef("Leaving function WSSendRecv") -} - -// NotifyNewBlockChainHeight notifies all frontends of a new -// blockchain height. This sends the same notification as -// btcd, so this can probably be removed. -func NotifyNewBlockChainHeight(reply chan []byte, bs wallet.BlockStamp) { - ntfn := btcws.NewBlockConnectedNtfn(bs.Hash.String(), bs.Height) - mntfn, err := ntfn.MarshalJSON() - // btcws notifications must always marshal without error. - if err != nil { - panic(err) - } - reply <- mntfn -} - -var duplicateOnce sync.Once - -// Start starts a HTTP server to provide standard RPC and extension -// websocket connections for any number of btcwallet frontends. -func (s *server) Start() { - // A duplicator for notifications intended for all clients runs - // in another goroutines. Any such notifications are sent to - // the allClients channel and then sent to each connected client. - // - // Use a sync.Once to insure no extra duplicators run. - go duplicateOnce.Do(clientResponseDuplicator) - - log.Trace("Starting RPC server") - - serveMux := http.NewServeMux() - const rpcAuthTimeoutSeconds = 10 - httpServer := &http.Server{ - Handler: serveMux, - - // Timeout connections which don't complete the initial - // handshake within the allowed timeframe. - ReadTimeout: time.Second * rpcAuthTimeoutSeconds, - } - serveMux.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) { - if err := s.checkAuth(r); err != nil { - log.Warnf("Unauthorized client connection attempt") - http.Error(w, "401 Unauthorized.", http.StatusUnauthorized) - return - } - s.ServeRPCRequest(w, r) - }) - serveMux.HandleFunc("/frontend", func(w http.ResponseWriter, r *http.Request) { - authenticated := false - if err := s.checkAuth(r); err != nil { - // If auth was supplied but incorrect, rather than simply being - // missing, immediately terminate the connection. - if err != ErrNoAuth { - log.Warnf("Disconnecting improperly authorized websocket client") - http.Error(w, "401 Unauthorized.", http.StatusUnauthorized) - return - } - } else { - authenticated = true - } - - // A new Server instance is created rather than just creating the - // handler closure since the default server will disconnect the - // client if the origin is unset. - wsServer := websocket.Server{ - Handler: websocket.Handler(func(ws *websocket.Conn) { - s.WSSendRecv(ws, r.RemoteAddr, authenticated) - }), - } - wsServer.ServeHTTP(w, r) - }) - for _, listener := range s.listeners { - s.wg.Add(1) - go func(listener net.Listener) { - log.Infof("RPCS: RPC server listening on %s", listener.Addr()) - if err := httpServer.Serve(listener); err != nil { - log.Errorf("Listener for %s exited with error: %v", - listener.Addr(), err) - } - log.Tracef("RPCS: RPC listener done for %s", listener.Addr()) - s.wg.Done() - }(listener) - } -} - -// checkAuth checks the HTTP Basic authentication supplied by a frontend -// in the HTTP request r. If the frontend's supplied authentication does -// not match the username and password expected, a non-nil error is -// returned. -// -// This check is time-constant. -func (s *server) checkAuth(r *http.Request) error { - authhdr := r.Header["Authorization"] - if len(authhdr) == 0 { - return ErrNoAuth - } - - authsha := sha256.Sum256([]byte(authhdr[0])) - cmp := subtle.ConstantTimeCompare(authsha[:], s.authsha[:]) - if cmp != 1 { - return ErrBadAuth - } - return nil -} - -// BtcdWS opens a websocket connection to a btcd instance. -func BtcdWS(certificates []byte) (*websocket.Conn, error) { - url := fmt.Sprintf("wss://%s/ws", cfg.RPCConnect) - config, err := websocket.NewConfig(url, "https://localhost/") - if err != nil { - return nil, err - } - - // btcd uses a self-signed TLS certifiate which is used as the CA. - pool := x509.NewCertPool() - pool.AppendCertsFromPEM(certificates) - config.TlsConfig = &tls.Config{ - RootCAs: pool, - MinVersion: tls.VersionTLS12, - } - - // btcd requires basic authorization, so set the Authorization header. - login := cfg.BtcdUsername + ":" + cfg.BtcdPassword - auth := "Basic " + base64.StdEncoding.EncodeToString([]byte(login)) - config.Header.Add("Authorization", auth) - - // Dial connection. - var ws *websocket.Conn - var cerr error - if cfg.Proxy != "" { - proxy := &socks.Proxy{ - Addr: cfg.Proxy, - Username: cfg.ProxyUser, - Password: cfg.ProxyPass, - } - conn, err := proxy.Dial("tcp", cfg.RPCConnect) - if err != nil { - return nil, err - } - - tlsConn := tls.Client(conn, config.TlsConfig) - ws, cerr = websocket.NewClient(config, tlsConn) - } else { - ws, cerr = websocket.DialConfig(config) - } - if cerr != nil { - return nil, cerr - } - return ws, nil -} - -// BtcdConnect connects to a running btcd instance over a websocket -// for sending and receiving chain-related messages, failing if the -// connection cannot be established or is lost. -func BtcdConnect(certificates []byte) (*BtcdRPCConn, error) { - // Open websocket connection. - ws, err := BtcdWS(certificates) - if err != nil { - log.Errorf("Cannot open websocket connection to btcd: %v", err) - return nil, err - } - - // Create and start RPC connection using the btcd websocket. - rpc := NewBtcdRPCConn(ws) - rpc.Start() - return rpc, nil -} - -// Handshake first checks that the websocket connection between btcwallet and -// btcd is valid, that is, that there are no mismatching settings between -// the two processes (such as running on different Bitcoin networks). If the -// sanity checks pass, all wallets are set to be tracked against chain -// notifications from this btcd connection. -// -// TODO(jrick): Track and Rescan commands should be replaced with a -// single TrackSince function (or similar) which requests address -// notifications and performs the rescan since some block height. -func Handshake(rpc ServerConn) error { - net, err := GetCurrentNet(rpc) - if err != nil { - return err - } - if net != activeNet.Net { - return errors.New("btcd and btcwallet running on different Bitcoin networks") - } - - // Request notifications for connected and disconnected blocks. - if err := NotifyBlocks(rpc); err != nil { - return err - } - - // Get current best block. If this is before than the oldest - // saved block hash, assume that this btcd instance is not yet - // synced up to a previous btcd that was last used with this - // wallet. - bs, err := GetCurBlock() - if err != nil { - return fmt.Errorf("cannot get best block: %v", err) - } - NotifyNewBlockChainHeight(allClients, bs) - NotifyBalances(allClients) - - // Get default account. Only the default account is used to - // track recently-seen blocks. - a, err := AcctMgr.Account("") - if err != nil { - // No account yet is not a handshake error, but means our - // handshake is done. - return nil - } - - // TODO(jrick): if height is less than the earliest-saved block - // height, should probably wait for btcd to catch up. - - // Check that there was not any reorgs done since last connection. - // If so, rollback and rescan to catch up. - it := a.Wallet.NewIterateRecentBlocks() - for cont := it != nil; cont; cont = it.Prev() { - bs := it.BlockStamp() - log.Debugf("Checking for previous saved block with height %v hash %v", - bs.Height, bs.Hash) - - if _, err := GetBlock(rpc, bs.Hash.String()); err != nil { - continue - } - - log.Debug("Found matching block.") - - // If we had to go back to any previous blocks (it.Next - // returns true), then rollback the next and all child blocks. - // This rollback is done here instead of in the blockMissing - // check above for each removed block because Rollback will - // try to write new tx and utxo files on each rollback. - if it.Next() { - bs := it.BlockStamp() - err := AcctMgr.Rollback(bs.Height, &bs.Hash) - if err != nil { - return err - } - } - - // Set default account to be marked in sync with the current - // blockstamp. This invalidates the iterator. - a.Wallet.SetSyncedWith(bs) - - // Begin tracking wallets against this btcd instance. - AcctMgr.Track() - if err := AcctMgr.RescanActiveAddresses(); err != nil { - return err - } - // TODO: Only begin tracking new unspent outputs as a result - // of the rescan. This is also pretty racy, as a new block - // could arrive between rescan and by the time the new outpoint - // is added to btcd's websocket's unspent output set. - AcctMgr.Track() - - // (Re)send any unmined transactions to btcd in case of a btcd restart. - AcctMgr.ResendUnminedTxs() - - // Get current blockchain height and best block hash. - return nil - } - - // Iterator was invalid (wallet has never been synced) or there was a - // huge chain fork + reorg (more than 20 blocks). - AcctMgr.Track() - if err := AcctMgr.RescanActiveAddresses(); err != nil { - return err - } - // TODO: only begin tracking new unspent outputs as a result of the - // rescan. This is also racy (see comment for second Track above). - AcctMgr.Track() - AcctMgr.ResendUnminedTxs() - return nil -}