Rework the btcwallet connection.

This changes the protocol between btcd and btcwallet to follow
JSON-RPC specifications sending notifications as requests with an
empty ID.

The notification request context handling has been greatly cleaned up
now that IDs no longer need to be saved when sending notifications.
This commit is contained in:
Josh Rickmar 2014-01-08 11:40:27 -05:00
parent a5cc3196b4
commit cd3084afcd
3 changed files with 142 additions and 224 deletions

View file

@ -5,8 +5,8 @@
package main
import (
"github.com/conformal/btcd/limits"
"fmt"
"github.com/conformal/btcd/limits"
"net"
"net/http"
_ "net/http/pprof"

View file

@ -316,7 +316,7 @@ func newRPCServer(listenAddrs []string, s *server) (*rpcServer, error) {
}
// initialize memory for websocket connections
rpc.ws.connections = make(map[chan []byte]*requestContexts)
rpc.ws.connections = make(map[walletChan]*requestContexts)
rpc.ws.walletNotificationMaster = make(chan []byte)
rpc.ws.txNotifications = make(map[string]*list.List)
rpc.ws.spentNotifications = make(map[btcwire.OutPoint]*list.List)

View file

@ -8,6 +8,7 @@ import (
"bytes"
"code.google.com/p/go.net/websocket"
"container/list"
"encoding/hex"
"encoding/json"
"fmt"
"github.com/conformal/btcdb"
@ -19,7 +20,9 @@ import (
"sync"
)
type wsCommandHandler func(*rpcServer, btcjson.Cmd, chan []byte, *requestContexts) error
type walletChan chan []byte
type wsCommandHandler func(*rpcServer, btcjson.Cmd, walletChan) error
// wsHandlers maps RPC command strings to appropriate websocket handler
// functions.
@ -37,13 +40,13 @@ var wsHandlers = map[string]wsCommandHandler{
type wsContext struct {
sync.RWMutex
// connections holds a map of each currently connected wallet
// listener as the key.
connections map[chan []byte]*requestContexts
// connections holds a map of requests for each wallet using the
// wallet channel as the key.
connections map[walletChan]*requestContexts
// Any chain notifications meant to be received by every connected
// wallet are sent across this channel.
walletNotificationMaster chan []byte
walletNotificationMaster walletChan
// Map of address hash to list of notificationCtx. This is the global
// list we actually use for notifications, we also keep a list in the
@ -58,41 +61,30 @@ type wsContext struct {
minedTxNotifications map[btcwire.ShaHash]*list.List
}
type notificationCtx struct {
id interface{}
connection chan []byte
rc *requestContexts
}
// AddTxRequest adds the request context for new transaction notifications.
func (r *wsContext) AddTxRequest(walletNotification chan []byte, rc *requestContexts, addr string, id interface{}) {
func (r *wsContext) AddTxRequest(wallet walletChan, addr string) {
r.Lock()
defer r.Unlock()
nc := &notificationCtx{
id: id,
connection: walletNotification,
rc: rc,
}
clist, ok := r.txNotifications[addr]
if !ok {
clist = list.New()
r.txNotifications[addr] = clist
}
clist.PushBack(nc)
clist.PushBack(wallet)
rc.txRequests[addr] = id
rc := r.connections[wallet]
rc.txRequests[addr] = struct{}{}
}
func (r *wsContext) removeGlobalTxRequest(walletNotification chan []byte, addr string) {
func (r *wsContext) removeGlobalTxRequest(wallet walletChan, addr string) {
clist := r.txNotifications[addr]
var enext *list.Element
for e := clist.Front(); e != nil; e = enext {
enext = e.Next()
ctx := e.Value.(*notificationCtx)
if ctx.connection == walletNotification {
c := e.Value.(walletChan)
if c == wallet {
clist.Remove(e)
break
}
@ -105,31 +97,28 @@ func (r *wsContext) removeGlobalTxRequest(walletNotification chan []byte, addr s
// AddSpentRequest adds a request context for notifications of a spent
// Outpoint.
func (r *wsContext) AddSpentRequest(walletNotification chan []byte, rc *requestContexts, op *btcwire.OutPoint, id interface{}) {
func (r *wsContext) AddSpentRequest(wallet walletChan, op *btcwire.OutPoint) {
r.Lock()
defer r.Unlock()
nc := &notificationCtx{
id: id,
connection: walletNotification,
rc: rc,
}
clist, ok := r.spentNotifications[*op]
if !ok {
clist = list.New()
r.spentNotifications[*op] = clist
}
clist.PushBack(nc)
rc.spentRequests[*op] = id
clist.PushBack(wallet)
rc := r.connections[wallet]
rc.spentRequests[*op] = struct{}{}
}
func (r *wsContext) removeGlobalSpentRequest(walletNotification chan []byte, op *btcwire.OutPoint) {
func (r *wsContext) removeGlobalSpentRequest(wallet walletChan, op *btcwire.OutPoint) {
clist := r.spentNotifications[*op]
var enext *list.Element
for e := clist.Front(); e != nil; e = enext {
enext = e.Next()
ctx := e.Value.(*notificationCtx)
if ctx.connection == walletNotification {
c := e.Value.(walletChan)
if c == wallet {
clist.Remove(e)
break
}
@ -142,42 +131,39 @@ func (r *wsContext) removeGlobalSpentRequest(walletNotification chan []byte, op
// RemoveSpentRequest removes a request context for notifications of a
// spent Outpoint.
func (r *wsContext) RemoveSpentRequest(walletNotification chan []byte, rc *requestContexts, op *btcwire.OutPoint) {
func (r *wsContext) RemoveSpentRequest(wallet walletChan, op *btcwire.OutPoint) {
r.Lock()
defer r.Unlock()
r.removeGlobalSpentRequest(walletNotification, op)
r.removeGlobalSpentRequest(wallet, op)
rc := r.connections[wallet]
delete(rc.spentRequests, *op)
}
// AddMinedTxRequest adds request contexts for notifications of a
// mined transaction.
func (r *wsContext) AddMinedTxRequest(walletNotification chan []byte, txID *btcwire.ShaHash) {
func (r *wsContext) AddMinedTxRequest(wallet walletChan, txID *btcwire.ShaHash) {
r.Lock()
defer r.Unlock()
rc := r.connections[walletNotification]
nc := &notificationCtx{
connection: walletNotification,
rc: rc,
}
clist, ok := r.minedTxNotifications[*txID]
if !ok {
clist = list.New()
r.minedTxNotifications[*txID] = clist
}
clist.PushBack(nc)
rc.minedTxRequests[*txID] = true
clist.PushBack(wallet)
rc := r.connections[wallet]
rc.minedTxRequests[*txID] = struct{}{}
}
func (r *wsContext) removeGlobalMinedTxRequest(walletNotification chan []byte, txID *btcwire.ShaHash) {
func (r *wsContext) removeGlobalMinedTxRequest(wallet walletChan, txID *btcwire.ShaHash) {
clist := r.minedTxNotifications[*txID]
var enext *list.Element
for e := clist.Front(); e != nil; e = enext {
enext = e.Next()
ctx := e.Value.(*notificationCtx)
if ctx.connection == walletNotification {
c := e.Value.(walletChan)
if c == wallet {
clist.Remove(e)
break
}
@ -190,44 +176,42 @@ func (r *wsContext) removeGlobalMinedTxRequest(walletNotification chan []byte, t
// RemoveMinedTxRequest removes request contexts for notifications of a
// mined transaction.
func (r *wsContext) RemoveMinedTxRequest(walletNotification chan []byte, rc *requestContexts, txID *btcwire.ShaHash) {
func (r *wsContext) RemoveMinedTxRequest(wallet walletChan, txID *btcwire.ShaHash) {
r.Lock()
defer r.Unlock()
r.removeMinedTxRequest(walletNotification, rc, txID)
r.removeMinedTxRequest(wallet, txID)
}
// removeMinedTxRequest removes request contexts for notifications of a
// mined transaction without grabbing any locks.
func (r *wsContext) removeMinedTxRequest(walletNotification chan []byte, rc *requestContexts, txID *btcwire.ShaHash) {
r.removeGlobalMinedTxRequest(walletNotification, txID)
func (r *wsContext) removeMinedTxRequest(wallet walletChan, txID *btcwire.ShaHash) {
r.removeGlobalMinedTxRequest(wallet, txID)
rc := r.connections[wallet]
delete(rc.minedTxRequests, *txID)
}
// CloseListeners removes all request contexts for notifications sent
// to a wallet notification channel and closes the channel to stop all
// goroutines currently serving that wallet.
func (r *wsContext) CloseListeners(walletNotification chan []byte) {
func (r *wsContext) CloseListeners(wallet walletChan) {
r.Lock()
defer r.Unlock()
delete(r.connections, walletNotification)
close(walletNotification)
delete(r.connections, wallet)
close(wallet)
}
// requestContexts holds all requests for a single wallet connection.
type requestContexts struct {
// txRequests maps between a 160-byte pubkey hash and the JSON
// id of the requester so replies can be correctly routed back
// to the correct btcwallet callback. The key must be a stringified
// address hash.
txRequests map[string]interface{}
// txRequests is a set of addresses a wallet has requested transactions
// updates for. It is maintained here so all requests can be removed
// when a wallet disconnects.
txRequests map[string]struct{}
// spentRequests maps between an Outpoint of an unspent
// transaction output and the JSON id of the requester so
// replies can be correctly routed back to the correct
// btcwallet callback.
spentRequests map[btcwire.OutPoint]interface{}
// spentRequests is a set of unspent Outpoints a wallet has requested
// notifications for when they are spent by a processed transaction.
spentRequests map[btcwire.OutPoint]struct{}
// minedTxRequests holds a set of transaction IDs (tx hashes) of
// transactions created by a wallet. A wallet may request
@ -235,35 +219,33 @@ type requestContexts struct {
// removed from the mempool. Once a tx has been mined into a
// block, wallet may remove the raw transaction from its unmined tx
// pool.
minedTxRequests map[btcwire.ShaHash]bool
minedTxRequests map[btcwire.ShaHash]struct{}
}
// respondToAnyCmd checks that a parsed command is a standard or
// extension JSON-RPC command and runs the proper handler to reply to
// the command. Any and all responses are sent to the wallet from
// this function.
func respondToAnyCmd(cmd btcjson.Cmd, s *rpcServer,
walletNotification chan []byte, rc *requestContexts) {
func respondToAnyCmd(cmd btcjson.Cmd, s *rpcServer, wallet walletChan) {
// Lookup the websocket extension for the command and if it doesn't
// exist fallback to handling the command as a standard command.
wsHandler, ok := wsHandlers[cmd.Method()]
if !ok {
reply := standardCmdReply(cmd, s)
mreply, _ := json.Marshal(reply)
walletNotification <- mreply
wallet <- mreply
return
}
// Call the appropriate handler which responds unless there was an
// error in which case the error is marshalled and sent here.
if err := wsHandler(s, cmd, walletNotification, rc); err != nil {
if err := wsHandler(s, cmd, wallet); err != nil {
var reply btcjson.Reply
jsonErr, ok := err.(btcjson.Error)
if ok {
reply.Error = &jsonErr
mreply, _ := json.Marshal(reply)
walletNotification <- mreply
wallet <- mreply
return
}
@ -276,15 +258,13 @@ func respondToAnyCmd(cmd btcjson.Cmd, s *rpcServer,
}
reply.Error = &jsonErr
mreply, _ := json.Marshal(reply)
walletNotification <- mreply
wallet <- mreply
}
}
// handleGetCurrentNet implements the getcurrentnet command extension
// for websocket connections.
func handleGetCurrentNet(s *rpcServer, cmd btcjson.Cmd,
walletNotification chan []byte, rc *requestContexts) error {
func handleGetCurrentNet(s *rpcServer, cmd btcjson.Cmd, wallet walletChan) error {
id := cmd.Id()
reply := &btcjson.Reply{Id: &id}
@ -297,15 +277,13 @@ func handleGetCurrentNet(s *rpcServer, cmd btcjson.Cmd,
reply.Result = float64(net)
mreply, _ := json.Marshal(reply)
walletNotification <- mreply
wallet <- mreply
return nil
}
// handleGetBestBlock implements the getbestblock command extension
// for websocket connections.
func handleGetBestBlock(s *rpcServer, cmd btcjson.Cmd,
walletNotification chan []byte, rc *requestContexts) error {
func handleGetBestBlock(s *rpcServer, cmd btcjson.Cmd, wallet walletChan) error {
id := cmd.Id()
reply := &btcjson.Reply{Id: &id}
@ -322,15 +300,13 @@ func handleGetBestBlock(s *rpcServer, cmd btcjson.Cmd,
"height": height,
}
mreply, _ := json.Marshal(reply)
walletNotification <- mreply
wallet <- mreply
return nil
}
// handleNotifyNewTXs implements the notifynewtxs command extension for
// websocket connections.
func handleNotifyNewTXs(s *rpcServer, cmd btcjson.Cmd,
walletNotification chan []byte, rc *requestContexts) error {
func handleNotifyNewTXs(s *rpcServer, cmd btcjson.Cmd, wallet walletChan) error {
id := cmd.Id()
reply := &btcjson.Reply{Id: &id}
@ -351,20 +327,17 @@ func handleNotifyNewTXs(s *rpcServer, cmd btcjson.Cmd,
return fmt.Errorf("address is not P2PKH: %v", addr.EncodeAddress())
}
s.ws.AddTxRequest(walletNotification, rc, addr.EncodeAddress(),
cmd.Id())
s.ws.AddTxRequest(wallet, addr.EncodeAddress())
}
mreply, _ := json.Marshal(reply)
walletNotification <- mreply
wallet <- mreply
return nil
}
// handleNotifySpent implements the notifyspent command extension for
// websocket connections.
func handleNotifySpent(s *rpcServer, cmd btcjson.Cmd,
walletNotification chan []byte, rc *requestContexts) error {
func handleNotifySpent(s *rpcServer, cmd btcjson.Cmd, wallet walletChan) error {
id := cmd.Id()
reply := &btcjson.Reply{Id: &id}
@ -373,22 +346,16 @@ func handleNotifySpent(s *rpcServer, cmd btcjson.Cmd,
return btcjson.ErrInternal
}
s.ws.AddSpentRequest(walletNotification, rc, notifyCmd.OutPoint,
cmd.Id())
s.ws.AddSpentRequest(wallet, notifyCmd.OutPoint)
mreply, _ := json.Marshal(reply)
walletNotification <- mreply
wallet <- mreply
return nil
}
// handleRescan implements the rescan command extension for websocket
// connections.
func handleRescan(s *rpcServer, cmd btcjson.Cmd,
walletNotification chan []byte, rc *requestContexts) error {
id := cmd.Id()
reply := &btcjson.Reply{Id: &id}
func handleRescan(s *rpcServer, cmd btcjson.Cmd, wallet walletChan) error {
rescanCmd, ok := cmd.(*btcws.RescanCmd)
if !ok {
return btcjson.ErrInternal
@ -425,11 +392,12 @@ func handleRescan(s *rpcServer, cmd btcjson.Cmd,
}
for _, tx := range blk.Transactions() {
var txReply *btcdb.TxListReply
txouts:
for txOutIdx, txout := range tx.MsgTx().TxOut {
_, addrs, _, err := btcscript.ExtractPkScriptAddrs(
txout.PkScript, s.server.btcnet)
if err != nil {
continue
continue txouts
}
for i, addr := range addrs {
@ -443,7 +411,7 @@ func handleRescan(s *rpcServer, cmd btcjson.Cmd,
txReplyList, err := s.server.db.FetchTxBySha(tx.Sha())
if err != nil {
rpcsLog.Errorf("Tx Sha %v not found by db.", tx.Sha())
return err
continue txouts
}
for i := range txReplyList {
if txReplyList[i].Height == blk.Height() {
@ -451,33 +419,23 @@ func handleRescan(s *rpcServer, cmd btcjson.Cmd,
break
}
}
}
reply.Result = struct {
Receiver string `json:"receiver"`
Height int64 `json:"height"`
BlockHash string `json:"blockhash"`
BlockIndex int `json:"blockindex"`
BlockTime int64 `json:"blocktime"`
TxID string `json:"txid"`
TxOutIndex uint32 `json:"txoutindex"`
Amount int64 `json:"amount"`
PkScript string `json:"pkscript"`
Spent bool `json:"spent"`
}{
Receiver: encodedAddr,
Height: blk.Height(),
BlockHash: blkshalist[i].String(),
BlockIndex: tx.Index(),
BlockTime: blk.MsgBlock().Header.Timestamp.Unix(),
TxID: tx.Sha().String(),
TxOutIndex: uint32(txOutIdx),
Amount: txout.Value,
PkScript: btcutil.Base58Encode(txout.PkScript),
Spent: txReply.TxSpent[txOutIdx],
ntfn := &btcws.ProcessedTxNtfn{
Receiver: encodedAddr,
Amount: txout.Value,
TxID: tx.Sha().String(),
TxOutIndex: uint32(txOutIdx),
PkScript: hex.EncodeToString(txout.PkScript),
BlockHash: blkshalist[i].String(),
BlockHeight: int32(blk.Height()),
BlockIndex: tx.Index(),
BlockTime: blk.MsgBlock().Header.Timestamp.Unix(),
Spent: txReply.TxSpent[txOutIdx],
}
mreply, _ := json.Marshal(reply)
walletNotification <- mreply
mntfn, _ := ntfn.MarshalJSON()
wallet <- mntfn
}
}
}
@ -491,19 +449,23 @@ func handleRescan(s *rpcServer, cmd btcjson.Cmd,
}
}
reply.Result = nil
mreply, _ := json.Marshal(reply)
walletNotification <- mreply
rpcsLog.Info("Finished rescan")
id := cmd.Id()
response := &btcjson.Reply{
Id: &id,
Result: nil,
Error: nil,
}
mresponse, _ := json.Marshal(response)
wallet <- mresponse
return nil
}
// handleWalletSendRawTransaction implements the websocket extended version of
// the sendrawtransaction command.
func handleWalletSendRawTransaction(s *rpcServer, cmd btcjson.Cmd,
walletNotification chan []byte, rc *requestContexts) error {
func handleWalletSendRawTransaction(s *rpcServer, cmd btcjson.Cmd, wallet walletChan) error {
result, err := handleSendRawTransaction(s, cmd)
if err != nil {
return err
@ -514,31 +476,28 @@ func handleWalletSendRawTransaction(s *rpcServer, cmd btcjson.Cmd,
txSha, _ := btcwire.NewShaHashFromStr(result.(string))
// Request to be notified when the transaction is mined.
s.ws.AddMinedTxRequest(walletNotification, txSha)
s.ws.AddMinedTxRequest(wallet, txSha)
return nil
}
// AddWalletListener adds a channel to listen for new messages from a
// wallet.
func (s *rpcServer) AddWalletListener(c chan []byte) *requestContexts {
func (s *rpcServer) AddWalletListener(c walletChan) {
s.ws.Lock()
rc := &requestContexts{
// The key is a stringified addressHash.
txRequests: make(map[string]interface{}),
spentRequests: make(map[btcwire.OutPoint]interface{}),
minedTxRequests: make(map[btcwire.ShaHash]bool),
txRequests: make(map[string]struct{}),
spentRequests: make(map[btcwire.OutPoint]struct{}),
minedTxRequests: make(map[btcwire.ShaHash]struct{}),
}
s.ws.connections[c] = rc
s.ws.Unlock()
return rc
}
// RemoveWalletListener removes a wallet listener channel.
func (s *rpcServer) RemoveWalletListener(c chan []byte, rc *requestContexts) {
func (s *rpcServer) RemoveWalletListener(c walletChan) {
s.ws.Lock()
rc := s.ws.connections[c]
for k := range rc.txRequests {
s.ws.removeGlobalTxRequest(c, k)
}
@ -580,9 +539,9 @@ func (s *rpcServer) walletListenerDuplicator() {
func (s *rpcServer) walletReqsNotifications(ws *websocket.Conn) {
// Add wallet notification channel so this handler receives btcd chain
// notifications.
c := make(chan []byte)
rc := s.AddWalletListener(c)
defer s.RemoveWalletListener(c, rc)
c := make(walletChan)
s.AddWalletListener(c)
defer s.RemoveWalletListener(c)
// msgs is a channel for all messages received over the websocket.
msgs := make(chan []byte)
@ -614,7 +573,7 @@ func (s *rpcServer) walletReqsNotifications(ws *websocket.Conn) {
return
}
// Handle request here.
go s.websocketJSONHandler(c, rc, m)
go s.websocketJSONHandler(c, m)
case ntfn, _ := <-c:
// Send btcd notification to btcwallet instance over
// websocket.
@ -631,9 +590,7 @@ func (s *rpcServer) walletReqsNotifications(ws *websocket.Conn) {
// websocketJSONHandler parses and handles a marshalled json message,
// sending the marshalled reply to a wallet notification channel.
func (s *rpcServer) websocketJSONHandler(walletNotification chan []byte,
rc *requestContexts, msg []byte) {
func (s *rpcServer) websocketJSONHandler(wallet walletChan, msg []byte) {
s.wg.Add(1)
defer s.wg.Done()
@ -648,11 +605,11 @@ func (s *rpcServer) websocketJSONHandler(walletNotification chan []byte,
}
reply.Error = jsonErr
mreply, _ := json.Marshal(reply)
walletNotification <- mreply
wallet <- mreply
return
}
respondToAnyCmd(cmd, s, walletNotification, rc)
respondToAnyCmd(cmd, s, wallet)
}
// NotifyBlockConnected creates and marshalls a JSON message to notify
@ -678,7 +635,7 @@ func (s *rpcServer) NotifyBlockConnected(block *btcutil.Block) {
var enext *list.Element
for e := clist.Front(); e != nil; e = enext {
enext = e.Next()
ctx := e.Value.(*notificationCtx)
c := e.Value.(walletChan)
// TODO: remove int32 type conversion after
// the int64 -> int32 switch is made.
ntfn := btcws.NewTxMinedNtfn(tx.Sha().String(),
@ -686,9 +643,8 @@ func (s *rpcServer) NotifyBlockConnected(block *btcutil.Block) {
block.MsgBlock().Header.Timestamp.Unix(),
tx.Index())
mntfn, _ := json.Marshal(ntfn)
ctx.connection <- mntfn
s.ws.removeMinedTxRequest(ctx.connection, ctx.rc,
tx.Sha())
c <- mntfn
s.ws.removeMinedTxRequest(c, tx.Sha())
}
}
}
@ -722,39 +678,19 @@ func (s *rpcServer) NotifyBlockTXs(db btcdb.Db, block *btcutil.Block) {
}
}
func notifySpentData(ctx *notificationCtx, txhash *btcwire.ShaHash, index uint32,
func notifySpentData(wallet walletChan, txhash *btcwire.ShaHash, index uint32,
spender *btcutil.Tx) {
txStr := ""
if spender != nil {
var buf bytes.Buffer
err := spender.MsgTx().Serialize(&buf)
if err != nil {
// This really shouldn't ever happen...
rpcsLog.Warnf("Can't serialize tx: %v", err)
return
}
txStr = string(buf.Bytes())
}
reply := &btcjson.Reply{
Result: struct {
TxHash string `json:"txhash"`
Index uint32 `json:"index"`
SpendingTx string `json:"spender,omitempty"`
}{
TxHash: txhash.String(),
Index: index,
SpendingTx: txStr,
},
Error: nil,
Id: &ctx.id,
}
replyBytes, err := json.Marshal(reply)
if err != nil {
rpcsLog.Errorf("Unable to marshal spent notification: %v", err)
return
}
ctx.connection <- replyBytes
var buf bytes.Buffer
// Ignore Serialize's error, as writing to a bytes.buffer
// cannot fail.
spender.MsgTx().Serialize(&buf)
txStr := hex.EncodeToString(buf.Bytes())
// TODO(jrick): create a new notification in btcws and use that.
ntfn := btcws.NewTxSpentNtfn(txhash.String(), int(index), txStr)
mntfn, _ := ntfn.MarshalJSON()
wallet <- mntfn
}
// newBlockNotifyCheckTxIn is a helper function to iterate through
@ -766,11 +702,10 @@ func (s *rpcServer) newBlockNotifyCheckTxIn(tx *btcutil.Tx) {
var enext *list.Element
for e := clist.Front(); e != nil; e = enext {
enext = e.Next()
ctx := e.Value.(*notificationCtx)
notifySpentData(ctx, &txin.PreviousOutpoint.Hash,
uint32(txin.PreviousOutpoint.Index), tx)
s.ws.RemoveSpentRequest(ctx.connection, ctx.rc,
&txin.PreviousOutpoint)
c := e.Value.(walletChan)
notifySpentData(c, &txin.PreviousOutpoint.Hash,
txin.PreviousOutpoint.Index, tx)
s.ws.RemoveSpentRequest(c, &txin.PreviousOutpoint)
}
}
}
@ -796,25 +731,17 @@ func (s *rpcServer) NotifyForTxOuts(tx *btcutil.Tx, block *btcutil.Block) {
encodedAddr := addr.EncodeAddress()
if idlist, ok := s.ws.txNotifications[encodedAddr]; ok {
for e := idlist.Front(); e != nil; e = e.Next() {
ctx := e.Value.(*notificationCtx)
wallet := e.Value.(walletChan)
// TODO(jrick): shove this in btcws
result := struct {
Receiver string `json:"receiver"`
Height int64 `json:"height"`
BlockHash string `json:"blockhash"`
BlockIndex int `json:"blockindex"`
BlockTime int64 `json:"blocktime"`
TxID string `json:"txid"`
TxOutIndex uint32 `json:"txoutindex"`
Amount int64 `json:"amount"`
PkScript string `json:"pkscript"`
}{
ntfn := &btcws.ProcessedTxNtfn{
Receiver: encodedAddr,
TxID: tx.Sha().String(),
TxOutIndex: uint32(i),
Amount: txout.Value,
PkScript: btcutil.Base58Encode(txout.PkScript),
PkScript: hex.EncodeToString(txout.PkScript),
// TODO(jrick): hardcoding unspent is WRONG and needs
// to be either calculated from other block txs, or dropped.
Spent: false,
}
if block != nil {
@ -823,26 +750,17 @@ func (s *rpcServer) NotifyForTxOuts(tx *btcutil.Tx, block *btcutil.Block) {
rpcsLog.Error("Error getting block sha; dropping Tx notification.")
break
}
result.Height = block.Height()
result.BlockHash = blkhash.String()
result.BlockIndex = tx.Index()
result.BlockTime = block.MsgBlock().Header.Timestamp.Unix()
ntfn.BlockHeight = int32(block.Height())
ntfn.BlockHash = blkhash.String()
ntfn.BlockIndex = tx.Index()
ntfn.BlockTime = block.MsgBlock().Header.Timestamp.Unix()
} else {
result.Height = -1
result.BlockIndex = -1
ntfn.BlockHeight = -1
ntfn.BlockIndex = -1
}
reply := &btcjson.Reply{
Result: result,
Error: nil,
Id: &ctx.id,
}
mreply, err := json.Marshal(reply)
if err != nil {
rpcsLog.Errorf("Unable to marshal tx notification: %v", err)
continue
}
ctx.connection <- mreply
mntfn, _ := ntfn.MarshalJSON()
wallet <- mntfn
}
}
}