/* * Copyright (c) 2013 Conformal Systems LLC * * Permission to use, copy, modify, and distribute this software for any * purpose with or without fee is hereby granted, provided that the above * copyright notice and this permission notice appear in all copies. * * THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES * WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF * MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR * ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES * WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN * ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF * OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. */ package main import ( "code.google.com/p/go.net/websocket" "encoding/base64" "encoding/json" "errors" "fmt" "github.com/conformal/btcjson" "github.com/conformal/btcwallet/wallet" "github.com/conformal/btcwire" "github.com/conformal/btcws" "net" "net/http" "sync" ) var ( // ErrConnRefused represents an error where a connection to another // process cannot be established. ErrConnRefused = errors.New("connection refused") // ErrConnLost represents an error where a connection to another // process cannot be established. ErrConnLost = errors.New("connection lost") // Channel for updates and boolean with the most recent update of // whether the connection to btcd is active or not. btcdConnected = struct { b bool c chan bool }{ c: make(chan bool), } // Channel to send messages btcwallet does not understand and requests // from btcwallet to btcd. btcdMsgs = make(chan []byte, 100) // Adds a frontend listener channel addFrontendListener = make(chan (chan []byte)) // Removes a frontend listener channel deleteFrontendListener = make(chan (chan []byte)) // Messages sent to this channel are sent to each connected frontend. frontendNotificationMaster = make(chan []byte, 100) // replyHandlers maps between a unique number (passed as part of // the JSON Id field) and a function to handle a reply or notification // from btcd. As requests are received, this map is checked for a // handler function to route the reply to. If the function returns // true, the handler is removed from the map. replyHandlers = struct { sync.Mutex m map[uint64]func(interface{}, *btcjson.Error) bool }{ m: make(map[uint64]func(interface{}, *btcjson.Error) bool), } // replyRouter maps unique uint64 ids to reply channels, so btcd // replies can be routed to the correct frontend. replyRouter = struct { sync.Mutex m map[uint64]chan []byte }{ m: make(map[uint64]chan []byte), } ) // frontendListenerDuplicator listens for new wallet listener channels // and duplicates messages sent to frontendNotificationMaster to all // connected listeners. func frontendListenerDuplicator() { // frontendListeners is a map holding each currently connected frontend // listener as the key. The value is ignored, as this is only used as // a set. frontendListeners := make(map[chan []byte]bool) // Don't want to add or delete a wallet listener while iterating // through each to propigate to every attached wallet. Use a mutex to // prevent this. var mtx sync.Mutex // Check for listener channels to add or remove from set. go func() { for { select { case c := <-addFrontendListener: mtx.Lock() frontendListeners[c] = true mtx.Unlock() // TODO(jrick): these notifications belong somewhere better. // Probably want to copy AddWalletListener from btcd, and // place these notifications in that function. NotifyBtcdConnected(frontendNotificationMaster, btcdConnected.b) if bs, err := GetCurBlock(); err == nil { NotifyNewBlockChainHeight(c, bs.Height) NotifyBalances(c) } case c := <-deleteFrontendListener: mtx.Lock() delete(frontendListeners, c) mtx.Unlock() } } }() // Duplicate all messages sent across frontendNotificationMaster, as // well as internal btcwallet notifications, to each listening wallet. for { var ntfn []byte select { case conn := <-btcdConnected.c: NotifyBtcdConnected(frontendNotificationMaster, conn) continue case ntfn = <-frontendNotificationMaster: } mtx.Lock() for c := range frontendListeners { c <- ntfn } mtx.Unlock() } } // NotifyBtcdConnected notifies all frontends of a new btcd connection. func NotifyBtcdConnected(reply chan []byte, conn bool) { btcdConnected.b = conn var idStr interface{} = "btcwallet:btcdconnected" r := btcjson.Reply{ Result: conn, Id: &idStr, } ntfn, _ := json.Marshal(r) frontendNotificationMaster <- ntfn } // frontendReqsNotifications is the handler function for websocket // connections from a btcwallet instance. It reads messages from wallet and // sends back replies, as well as notififying wallets of chain updates. // There can possibly be many of these running, one for each currently // connected frontend. func frontendReqsNotifications(ws *websocket.Conn) { // Add frontend notification channel to set so this handler receives // updates. frontendNotification := make(chan []byte) addFrontendListener <- frontendNotification defer func() { deleteFrontendListener <- frontendNotification }() // jsonMsgs receives JSON messages from the currently connected frontend. jsonMsgs := make(chan []byte) // Receive messages from websocket and send across jsonMsgs until // connection is lost go func() { for { var m []byte if err := websocket.Message.Receive(ws, &m); err != nil { close(jsonMsgs) return } jsonMsgs <- m } }() for { select { case m, ok := <-jsonMsgs: if !ok { // frontend disconnected. return } // Handle JSON message here. go ProcessFrontendMsg(frontendNotification, m) case ntfn, _ := <-frontendNotification: if err := websocket.Message.Send(ws, ntfn); err != nil { // Frontend disconnected. return } } } } // BtcdHandler listens for replies and notifications from btcd over a // websocket and sends messages that btcwallet does not understand to // btcd. Unlike FrontendHandler, exactly one BtcdHandler goroutine runs. func BtcdHandler(ws *websocket.Conn) { // Notification channel to return from listener goroutine when // btcd disconnects. disconnected := make(chan int) defer func() { close(disconnected) }() // Listen for replies/notifications from btcd, and decide how to handle them. replies := make(chan []byte) go func() { defer close(replies) for { select { case <-disconnected: return default: var m []byte if err := websocket.Message.Receive(ws, &m); err != nil { return } replies <- m } } }() for { select { case rply, ok := <-replies: if !ok { // btcd disconnected return } // Handle message here. go ProcessBtcdNotificationReply(rply) case r := <-btcdMsgs: if err := websocket.Message.Send(ws, r); err != nil { // btcd disconnected. log.Errorf("Unable to send message to btcd: %v", err) return } } } } type notificationHandler func(btcws.Notification) var notificationHandlers = map[string]notificationHandler{ btcws.BlockConnectedNtfnId: NtfnBlockConnected, btcws.BlockDisconnectedNtfnId: NtfnBlockDisconnected, btcws.TxMinedNtfnId: NtfnTxMined, } // ProcessBtcdNotificationReply unmarshalls the JSON notification or // reply received from btcd and decides how to handle it. Replies are // routed back to the frontend who sent the message, and wallet // notifications are processed by btcwallet, and frontend notifications // are sent to every connected frontend. func ProcessBtcdNotificationReply(b []byte) { // Check if the json id field was set by btcwallet. var routeID uint64 var origID string var r btcjson.Reply if err := json.Unmarshal(b, &r); err != nil { log.Errorf("Unable to unmarshal btcd message: %v", err) return } if r.Id == nil { // btcd should only ever be sending JSON messages with a string in // the id field. Log the error and drop the message. log.Error("Unable to process btcd notification or reply.") return } idStr, ok := (*r.Id).(string) if !ok { // btcd should only ever be sending JSON messages with a string in // the id field. Log the error and drop the message. log.Error("Incorrect btcd notification id type.") return } n, _ := fmt.Sscanf(idStr, "btcwallet(%d)-%s", &routeID, &origID) if n == 1 { // Request originated from btcwallet. Run and remove correct // handler. replyHandlers.Lock() f := replyHandlers.m[routeID] replyHandlers.Unlock() if f != nil { go func() { if f(r.Result, r.Error) { replyHandlers.Lock() delete(replyHandlers.m, routeID) replyHandlers.Unlock() } }() } } else if n == 2 { // Attempt to route btcd reply to correct frontend. replyRouter.Lock() c := replyRouter.m[routeID] if c != nil { delete(replyRouter.m, routeID) } else { // Can't route to a frontend, drop reply. log.Info("Unable to route btcd reply to frontend. Dropping.") return } replyRouter.Unlock() // Convert string back to number if possible. var origIDNum float64 n, _ := fmt.Sscanf(origID, "%f", &origIDNum) var id interface{} if n == 1 { id = origIDNum } else { id = origID } r.Id = &id b, err := json.Marshal(r) if err != nil { log.Error("Error marshalling btcd reply. Dropping.") return } c <- b } else { // Message is a btcd notification. Check the id and dispatch // correct handler, or if no handler, pass up to each wallet. if ntfnHandler, ok := notificationHandlers[idStr]; ok { n, err := btcws.ParseMarshaledNtfn(idStr, b) if err != nil { log.Errorf("Error unmarshaling expected "+ "notification: %v", err) return } ntfnHandler(n) return } frontendNotificationMaster <- b } } // NotifyNewBlockChainHeight notifies all frontends of a new // blockchain height. func NotifyNewBlockChainHeight(reply chan []byte, height int32) { var id interface{} = "btcwallet:newblockchainheight" msgRaw := &btcjson.Reply{ Result: height, Id: &id, } msg, _ := json.Marshal(msgRaw) reply <- msg } // NtfnBlockConnected handles btcd notifications resulting from newly // connected blocks to the main blockchain. func NtfnBlockConnected(n btcws.Notification) { bcn, ok := n.(*btcws.BlockConnectedNtfn) if !ok { log.Errorf("%v handler: unexpected type", n.Id()) return } hash, err := btcwire.NewShaHashFromStr(bcn.Hash) if err != nil { log.Errorf("%v handler: invalid hash string", n.Id()) return } // Update the blockstamp for the newly-connected block. bs := &wallet.BlockStamp{ Height: bcn.Height, Hash: *hash, } curBlock.Lock() curBlock.BlockStamp = *bs curBlock.Unlock() // btcd notifies btcwallet about transactions first, and then sends // the new block notification. New balance notifications for txs // in blocks are therefore sent here after all tx notifications // have arrived. // // TODO(jrick): send frontend tx notifications once that's // implemented. for _, w := range wallets.m { // Mark wallet as being synced with the new blockstamp. w.mtx.Lock() w.Wallet.SetSyncedWith(bs) w.mtx.Unlock() // The UTXO store will be dirty if it was modified // from a tx notification. if w.UtxoStore.dirty { // Notify all frontends of account's new unconfirmed // and confirmed balance. confirmed := w.CalculateBalance(1) unconfirmed := w.CalculateBalance(0) - confirmed NotifyWalletBalance(frontendNotificationMaster, w.name, confirmed) NotifyWalletBalanceUnconfirmed(frontendNotificationMaster, w.name, unconfirmed) } // The account is intentionaly not immediately synced to disk. // If btcd is performing an IBD, writing the wallet file for // each newly-connected block would result in too many // unnecessary disk writes. The UTXO and transaction stores // could be written, but in the case of btcwallet closing // before writing the dirty wallet, both would have to be // pruned anyways. // // Instead, the wallet is queued to be written to disk at the // next scheduled disk sync. w.dirty = true dirtyWallets.Lock() dirtyWallets.m[w] = true dirtyWallets.Unlock() } // Notify frontends of new blockchain height. NotifyNewBlockChainHeight(frontendNotificationMaster, bcn.Height) } // NtfnBlockDisconnected handles btcd notifications resulting from // blocks disconnected from the main chain in the event of a chain // switch and notifies frontends of the new blockchain height. // // TODO(jrick): Rollback Utxo and Tx data func NtfnBlockDisconnected(n btcws.Notification) { bdn, ok := n.(*btcws.BlockDisconnectedNtfn) if !ok { log.Errorf("%v handler: unexpected type", n.Id()) return } hash, err := btcwire.NewShaHashFromStr(bdn.Hash) if err != nil { log.Errorf("%v handler: invalid hash string", n.Id()) return } // Rollback Utxo and Tx data stores. go func() { wallets.Rollback(bdn.Height, hash) }() // Notify frontends of new blockchain height. NotifyNewBlockChainHeight(frontendNotificationMaster, bdn.Height) } // NtfnTxMined handles btcd notifications resulting from newly // mined transactions that originated from this wallet. func NtfnTxMined(n btcws.Notification) { tmn, ok := n.(*btcws.TxMinedNtfn) if !ok { log.Errorf("%v handler: unexpected type", n.Id()) return } hash, err := btcwire.NewShaHashFromStr(tmn.Hash) if err != nil { log.Errorf("%v handler: invalid hash string", n.Id()) return } // Remove mined transaction from pool. UnminedTxs.Lock() delete(UnminedTxs.m, TXID(hash[:])) UnminedTxs.Unlock() } var duplicateOnce sync.Once // FrontendListenAndServe starts a HTTP server to provide websocket // connections for any number of btcwallet frontends. func FrontendListenAndServe() error { // We'll need to duplicate replies to frontends to each frontend. // Replies are sent to frontendReplyMaster, and duplicated to each valid // channel in frontendReplySet. This runs a goroutine to duplicate // requests for each channel in the set. // // Use a sync.Once to insure no extra duplicators run. go duplicateOnce.Do(frontendListenerDuplicator) // TODO(jrick): We need some sort of authentication before websocket // connections are allowed, and perhaps TLS on the server as well. http.Handle("/frontend", websocket.Handler(frontendReqsNotifications)) return http.ListenAndServe(net.JoinHostPort("", cfg.SvrPort), nil) } // BtcdConnect connects to a running btcd instance over a websocket // for sending and receiving chain-related messages, failing if the // connection cannot be established or is lost. func BtcdConnect(reply chan error) { // btcd requires basic authorization, so we use a custom config with // the Authorization header set. server := fmt.Sprintf("ws://%s/wallet", net.JoinHostPort("localhost", cfg.BtcdPort)) login := cfg.Username + ":" + cfg.Password auth := "Basic " + base64.StdEncoding.EncodeToString([]byte(login)) config, err := websocket.NewConfig(server, "http://localhost/") if err != nil { reply <- ErrConnRefused return } config.Header.Add("Authorization", auth) // Attempt to connect to running btcd instance. Bail if it fails. btcdws, err := websocket.DialConfig(config) if err != nil { reply <- ErrConnRefused return } reply <- nil // Remove all reply handlers (if any exist from an old connection). replyHandlers.Lock() for k := range replyHandlers.m { delete(replyHandlers.m, k) } replyHandlers.Unlock() handlerClosed := make(chan int) go func() { BtcdHandler(btcdws) close(handlerClosed) }() BtcdHandshake(btcdws) <-handlerClosed reply <- ErrConnLost } // resendUnminedTxs resends any transactions in the unmined // transaction pool to btcd using the 'sendrawtransaction' RPC // command. func resendUnminedTxs() { for _, createdTx := range UnminedTxs.m { n := <-NewJSONID var id interface{} = fmt.Sprintf("btcwallet(%v)", n) m, err := btcjson.CreateMessageWithId("sendrawtransaction", id, string(createdTx.rawTx)) if err != nil { log.Errorf("cannot create resend request: %v", err) continue } replyHandlers.Lock() replyHandlers.m[n] = func(result interface{}, err *btcjson.Error) bool { // Do nothing, just remove the handler. return true } replyHandlers.Unlock() btcdMsgs <- m } } // BtcdHandshake first checks that the websocket connection between // btcwallet and btcd is valid, that is, that there are no mismatching // settings between the two processes (such as running on different // Bitcoin networks). If the sanity checks pass, all wallets are set to // be tracked against chain notifications from this btcd connection. func BtcdHandshake(ws *websocket.Conn) { n := <-NewJSONID cmd := btcws.NewGetCurrentNetCmd(fmt.Sprintf("btcwallet(%v)", n)) mcmd, err := cmd.MarshalJSON() if err != nil { log.Errorf("Cannot complete btcd handshake: %v", err) ws.Close() return } correctNetwork := make(chan bool) replyHandlers.Lock() replyHandlers.m[n] = func(result interface{}, err *btcjson.Error) bool { fnet, ok := result.(float64) if !ok { log.Error("btcd handshake: result is not a number") ws.Close() correctNetwork <- false return true } var walletNetwork btcwire.BitcoinNet if cfg.MainNet { walletNetwork = btcwire.MainNet } else { walletNetwork = btcwire.TestNet3 } correctNetwork <- btcwire.BitcoinNet(fnet) == walletNetwork // No additional replies expected, remove handler. return true } replyHandlers.Unlock() btcdMsgs <- mcmd if !<-correctNetwork { log.Error("btcd and btcwallet running on different Bitcoin networks") ws.Close() return } // TODO(jrick): Check that there was not any reorgs done // since last connection. If so, rollback and rescan to // catch up. for _, w := range wallets.m { w.RescanToBestBlock() } // Begin tracking wallets against this btcd instance. for _, w := range wallets.m { w.Track() } // (Re)send any unmined transactions to btcd in case of a btcd restart. resendUnminedTxs() // Get current blockchain height and best block hash. if bs, err := GetCurBlock(); err == nil { NotifyNewBlockChainHeight(frontendNotificationMaster, bs.Height) NotifyBalances(frontendNotificationMaster) } }