lbcwallet/btcdrpc.go
Josh Rickmar 830829a79f Add dirty wallets to disc sync schedule.
There were several places where various account files (wallet, tx, or
utxo stores) were being marked as dirty, and then not being either
immediately synced to disk or marked as a dirty account so they would
be scheduled to be synced to disk.  This change adds Account functions
to mark as dirty and add the account to the map of scheduled accounts
so they won't be missed by the disk syncer goroutine.
2014-01-23 12:38:39 -05:00

536 lines
15 KiB
Go

/*
* Copyright (c) 2013, 2014 Conformal Systems LLC <info@conformal.com>
*
* Permission to use, copy, modify, and distribute this software for any
* purpose with or without fee is hereby granted, provided that the above
* copyright notice and this permission notice appear in all copies.
*
* THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
* WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
* MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
* ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
* WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
* ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
* OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
*/
// This file implements the websocket RPC connection to a btcd instance.
package main
import (
"code.google.com/p/go.net/websocket"
"encoding/hex"
"encoding/json"
"errors"
"github.com/conformal/btcjson"
"github.com/conformal/btcutil"
"github.com/conformal/btcwallet/tx"
"github.com/conformal/btcwallet/wallet"
"github.com/conformal/btcwire"
"github.com/conformal/btcws"
"sync"
"time"
)
// ErrBtcdDisconnected describes an error where an operation cannot
// successfully complete due to btcwallet not being connected to
// btcd.
var ErrBtcdDisconnected = btcjson.Error{
Code: -1,
Message: "btcd disconnected",
}
// BtcdRPCConn is a type managing a client connection to a btcd RPC server
// over websockets.
type BtcdRPCConn struct {
ws *websocket.Conn
addRequest chan *AddRPCRequest
closed chan struct{}
}
// Ensure that BtcdRPCConn can be used as an RPCConn.
var _ RPCConn = &BtcdRPCConn{}
// NewBtcdRPCConn creates a new RPC connection from a btcd websocket
// connection to btcd.
func NewBtcdRPCConn(ws *websocket.Conn) *BtcdRPCConn {
conn := &BtcdRPCConn{
ws: ws,
addRequest: make(chan *AddRPCRequest),
closed: make(chan struct{}),
}
return conn
}
// SendRequest sends an RPC request and returns a channel to read the response's
// result and error. Part of the RPCConn interface.
func (btcd *BtcdRPCConn) SendRequest(request *RPCRequest) chan *RPCResponse {
select {
case <-btcd.closed:
// The connection has closed, so instead of adding and sending
// a request, return a channel that just replies with the
// error for a disconnected btcd.
responseChan := make(chan *RPCResponse)
go func() {
response := &RPCResponse{
Err: &ErrBtcdDisconnected,
}
responseChan <- response
}()
return responseChan
default:
addRequest := &AddRPCRequest{
Request: request,
ResponseChan: make(chan chan *RPCResponse),
}
btcd.addRequest <- addRequest
return <-addRequest.ResponseChan
}
}
// Connected returns whether the connection remains established to the RPC
// server.
//
// This function probably should be removed, as any checks for confirming
// the connection are no longer valid after the check and may result in
// races.
func (btcd *BtcdRPCConn) Connected() bool {
select {
case <-btcd.closed:
return false
default:
return true
}
}
// AddRPCRequest is used to add an RPCRequest to the pool of requests
// being manaaged by a btcd RPC connection.
type AddRPCRequest struct {
Request *RPCRequest
ResponseChan chan chan *RPCResponse
}
// send performs the actual send of the marshaled request over the btcd
// websocket connection.
func (btcd *BtcdRPCConn) send(rpcrequest *RPCRequest) error {
// btcjson.Cmds define their own MarshalJSON which returns an error
// to satisify the json.Marshaler interface, but will never error.
mrequest, _ := rpcrequest.request.MarshalJSON()
return websocket.Message.Send(btcd.ws, mrequest)
}
type receivedResponse struct {
id uint64
raw string
reply *btcjson.Reply
}
// Start starts the goroutines required to send RPC requests and listen for
// replies.
func (btcd *BtcdRPCConn) Start() {
done := btcd.closed
responses := make(chan *receivedResponse)
// Maintain a map of JSON IDs to RPCRequests currently being waited on.
go func() {
m := make(map[uint64]*RPCRequest)
for {
select {
case addrequest := <-btcd.addRequest:
rpcrequest := addrequest.Request
m[rpcrequest.request.Id().(uint64)] = rpcrequest
if err := btcd.send(rpcrequest); err != nil {
// Connection lost.
btcd.ws.Close()
close(done)
}
addrequest.ResponseChan <- rpcrequest.response
case recvResponse := <-responses:
rpcrequest, ok := m[recvResponse.id]
if !ok {
log.Warnf("Received unexpected btcd response")
continue
}
delete(m, recvResponse.id)
// If no result var was set, create and send
// send the response unmarshaled by the json
// package.
if rpcrequest.result == nil {
response := &RPCResponse{
Result: recvResponse.reply.Result,
Err: recvResponse.reply.Error,
}
rpcrequest.response <- response
continue
}
// A return var was set, so unmarshal again
// into the var before sending the response.
r := &btcjson.Reply{
Result: rpcrequest.result,
}
json.Unmarshal([]byte(recvResponse.raw), &r)
response := &RPCResponse{
Result: r.Result,
Err: r.Error,
}
rpcrequest.response <- response
case <-done:
for _, request := range m {
response := &RPCResponse{
Err: &ErrBtcdDisconnected,
}
request.response <- response
}
return
}
}
}()
// Listen for replies/notifications from btcd, and decide how to handle them.
go func() {
// Idea: instead of reading btcd messages from just one websocket
// connection, maybe use two so the same connection isn't used
// for both notifications and responses? Should make handling
// must faster as unnecessary unmarshal attempts could be avoided.
for {
var m string
if err := websocket.Message.Receive(btcd.ws, &m); err != nil {
log.Debugf("Cannot receive btcd message: %v", err)
close(done)
return
}
// Try notifications (requests with nil ids) first.
n, err := unmarshalNotification(m)
if err == nil {
// Begin processing the notification.
go processNotification(n, m)
continue
}
// Must be a response.
r, err := unmarshalResponse(m)
if err == nil {
responses <- r
continue
}
// Not sure what was received but it isn't correct.
log.Warnf("Received invalid message from btcd")
}
}()
}
// unmarshalResponse attempts to unmarshal a marshaled JSON-RPC
// response.
func unmarshalResponse(s string) (*receivedResponse, error) {
var r btcjson.Reply
if err := json.Unmarshal([]byte(s), &r); err != nil {
return nil, err
}
// Check for a valid ID.
if r.Id == nil {
return nil, errors.New("id is nil")
}
fid, ok := (*r.Id).(float64)
if !ok {
return nil, errors.New("id is not a number")
}
response := &receivedResponse{
id: uint64(fid),
raw: s,
reply: &r,
}
return response, nil
}
// unmarshalNotification attempts to unmarshal a marshaled JSON-RPC
// notification (Request with a nil or no ID).
func unmarshalNotification(s string) (btcjson.Cmd, error) {
req, err := btcjson.ParseMarshaledCmd([]byte(s))
if err != nil {
return nil, err
}
if req.Id() != nil {
return nil, errors.New("id is non-nil")
}
return req, nil
}
// processNotification checks for a handler for a notification, and sends
func processNotification(n btcjson.Cmd, s string) {
// Message is a btcd notification. Check the method and dispatch
// correct handler, or if no handler, pass up to each wallet.
if ntfnHandler, ok := notificationHandlers[n.Method()]; ok {
log.Debugf("Running notification handler for method %v",
n.Method())
ntfnHandler(n, []byte(s))
} else {
// No handler; send to all wallets.
log.Debugf("Sending notification with method %v to all wallets",
n.Method())
frontendNotificationMaster <- []byte(s)
}
}
type notificationHandler func(btcjson.Cmd, []byte)
var notificationHandlers = map[string]notificationHandler{
btcws.BlockConnectedNtfnMethod: NtfnBlockConnected,
btcws.BlockDisconnectedNtfnMethod: NtfnBlockDisconnected,
btcws.ProcessedTxNtfnMethod: NtfnProcessedTx,
btcws.TxMinedNtfnMethod: NtfnTxMined,
btcws.TxSpentNtfnMethod: NtfnTxSpent,
}
// NtfnProcessedTx handles the btcws.ProcessedTxNtfn notification.
func NtfnProcessedTx(n btcjson.Cmd, marshaled []byte) {
ptn, ok := n.(*btcws.ProcessedTxNtfn)
if !ok {
log.Errorf("%v handler: unexpected type", n.Method())
return
}
// Create useful types from the JSON strings.
receiver, err := btcutil.DecodeAddr(ptn.Receiver)
if err != nil {
log.Errorf("%v handler: error parsing receiver: %v", n.Method(), err)
return
}
txID, err := btcwire.NewShaHashFromStr(ptn.TxID)
if err != nil {
log.Errorf("%v handler: error parsing txid: %v", n.Method(), err)
return
}
blockHash, err := btcwire.NewShaHashFromStr(ptn.BlockHash)
if err != nil {
log.Errorf("%v handler: error parsing block hash: %v", n.Method(), err)
return
}
pkscript, err := hex.DecodeString(ptn.PkScript)
if err != nil {
log.Errorf("%v handler: error parsing pkscript: %v", n.Method(), err)
return
}
// Lookup account for address in result.
aname, err := LookupAccountByAddress(ptn.Receiver)
if err == ErrNotFound {
log.Warnf("Received rescan result for unknown address %v", ptn.Receiver)
return
}
a, err := accountstore.Account(aname)
if err == ErrAcctNotExist {
log.Errorf("Missing account for rescaned address %v", ptn.Receiver)
}
// Create RecvTx to add to tx history.
t := &tx.RecvTx{
TxID: *txID,
TxOutIdx: ptn.TxOutIndex,
TimeReceived: time.Now().Unix(),
BlockHeight: ptn.BlockHeight,
BlockHash: *blockHash,
BlockIndex: int32(ptn.BlockIndex),
BlockTime: ptn.BlockTime,
Amount: ptn.Amount,
ReceiverHash: receiver.ScriptAddress(),
}
// For transactions originating from this wallet, the sent tx history should
// be recorded before the received history. If wallet created this tx, wait
// for the sent history to finish being recorded before continuing.
req := SendTxHistSyncRequest{
txid: *txID,
response: make(chan SendTxHistSyncResponse),
}
SendTxHistSyncChans.access <- req
resp := <-req.response
if resp.ok {
// Wait until send history has been recorded.
<-resp.c
SendTxHistSyncChans.remove <- *txID
}
// Record the tx history.
a.TxStore.Lock()
a.TxStore.s.InsertRecvTx(t)
a.MarkDirtyTxStore()
a.TxStore.Unlock()
// Notify frontends of tx. If the tx is unconfirmed, it is always
// notified and the outpoint is marked as notified. If the outpoint
// has already been notified and is now in a block, a txmined notifiction
// should be sent once to let frontends that all previous send/recvs
// for this unconfirmed tx are now confirmed.
recvTxOP := btcwire.NewOutPoint(txID, ptn.TxOutIndex)
previouslyNotifiedReq := NotifiedRecvTxRequest{
op: *recvTxOP,
response: make(chan NotifiedRecvTxResponse),
}
NotifiedRecvTxChans.access <- previouslyNotifiedReq
if <-previouslyNotifiedReq.response {
NotifyMinedTx <- t
NotifiedRecvTxChans.remove <- *recvTxOP
} else {
// Notify frontends of new recv tx and mark as notified.
NotifiedRecvTxChans.add <- *recvTxOP
NotifyNewTxDetails(frontendNotificationMaster, a.Name(), t.TxInfo(a.Name(),
ptn.BlockHeight, a.Wallet.Net())[0])
}
if !ptn.Spent {
u := &tx.Utxo{
Amt: uint64(ptn.Amount),
Height: ptn.BlockHeight,
Subscript: pkscript,
}
copy(u.Out.Hash[:], txID[:])
u.Out.Index = uint32(ptn.TxOutIndex)
copy(u.AddrHash[:], receiver.ScriptAddress())
copy(u.BlockHash[:], blockHash[:])
a.UtxoStore.Lock()
a.UtxoStore.s.Insert(u)
a.MarkDirtyUtxoStore()
a.UtxoStore.Unlock()
// If this notification came from mempool, notify frontends of
// the new unconfirmed balance immediately. Otherwise, wait until
// the blockconnected notifiation is processed.
if u.Height == -1 {
bal := a.CalculateBalance(0) - a.CalculateBalance(1)
NotifyWalletBalanceUnconfirmed(frontendNotificationMaster,
a.name, bal)
}
}
// Notify frontends of new account balance.
confirmed := a.CalculateBalance(1)
unconfirmed := a.CalculateBalance(0) - confirmed
NotifyWalletBalance(frontendNotificationMaster, a.name, confirmed)
NotifyWalletBalanceUnconfirmed(frontendNotificationMaster, a.name, unconfirmed)
}
// NtfnBlockConnected handles btcd notifications resulting from newly
// connected blocks to the main blockchain.
//
// TODO(jrick): Send block time with notification. This will be used
// to mark wallet files with a possibly-better earliest block height,
// and will greatly reduce rescan times for wallets created with an
// out of sync btcd.
func NtfnBlockConnected(n btcjson.Cmd, marshaled []byte) {
bcn, ok := n.(*btcws.BlockConnectedNtfn)
if !ok {
log.Errorf("%v handler: unexpected type", n.Method())
return
}
hash, err := btcwire.NewShaHashFromStr(bcn.Hash)
if err != nil {
log.Errorf("%v handler: invalid hash string", n.Method())
return
}
// Update the blockstamp for the newly-connected block.
bs := &wallet.BlockStamp{
Height: bcn.Height,
Hash: *hash,
}
curBlock.Lock()
curBlock.BlockStamp = *bs
curBlock.Unlock()
// btcd notifies btcwallet about transactions first, and then sends
// the new block notification. New balance notifications for txs
// in blocks are therefore sent here after all tx notifications
// have arrived and finished being processed by the handlers.
workers := NotifyBalanceRequest{
block: *hash,
wg: make(chan *sync.WaitGroup),
}
NotifyBalanceSyncerChans.access <- workers
if wg := <-workers.wg; wg != nil {
wg.Wait()
NotifyBalanceSyncerChans.remove <- *hash
}
accountstore.BlockNotify(bs)
// Pass notification to frontends too.
frontendNotificationMaster <- marshaled
}
// NtfnBlockDisconnected handles btcd notifications resulting from
// blocks disconnected from the main chain in the event of a chain
// switch and notifies frontends of the new blockchain height.
func NtfnBlockDisconnected(n btcjson.Cmd, marshaled []byte) {
bdn, ok := n.(*btcws.BlockDisconnectedNtfn)
if !ok {
log.Errorf("%v handler: unexpected type", n.Method())
return
}
hash, err := btcwire.NewShaHashFromStr(bdn.Hash)
if err != nil {
log.Errorf("%v handler: invalid hash string", n.Method())
return
}
// Rollback Utxo and Tx data stores.
go func() {
accountstore.Rollback(bdn.Height, hash)
}()
// Pass notification to frontends too.
frontendNotificationMaster <- marshaled
}
// NtfnTxMined handles btcd notifications resulting from newly
// mined transactions that originated from this wallet.
func NtfnTxMined(n btcjson.Cmd, marshaled []byte) {
tmn, ok := n.(*btcws.TxMinedNtfn)
if !ok {
log.Errorf("%v handler: unexpected type", n.Method())
return
}
txid, err := btcwire.NewShaHashFromStr(tmn.TxID)
if err != nil {
log.Errorf("%v handler: invalid hash string", n.Method())
return
}
blockhash, err := btcwire.NewShaHashFromStr(tmn.BlockHash)
if err != nil {
log.Errorf("%v handler: invalid block hash string", n.Method())
return
}
err = accountstore.RecordMinedTx(txid, blockhash,
tmn.BlockHeight, tmn.Index, tmn.BlockTime)
if err != nil {
log.Errorf("%v handler: %v", n.Method(), err)
return
}
// Remove mined transaction from pool.
UnminedTxs.Lock()
delete(UnminedTxs.m, TXID(*txid))
UnminedTxs.Unlock()
}
// NtfnTxSpent handles btcd txspent notifications resulting from a block
// transaction being processed that spents a wallet UTXO.
func NtfnTxSpent(n btcjson.Cmd, marshaled []byte) {
// TODO(jrick): This might actually be useless and maybe it shouldn't
// be implemented.
}